error: ‘message_source&’ is not a class, struct, or union type (However, message_source is a class)

  actor, c++, functional-programming, unit-testing

Hi everyone. I’m trying to implement a concurrent system using the actor pattern. The actor’s interface looks like this:

template <class SourceMessageType, class MessageType>
class actor{
public:
    using value_type = MessageType;
    void process_message(SourceMessageType&& message);
    template <class EmitFunction>
    void on_message(EmitFunction emit);
private:
    std::function<void(MessageType&&)> m_emit;
};

In my program there are two actors: a Service Actor and a Sink Actor. Their definitions follow (a minimal working example is at the bottom of this question):

class service {
public:
    using value_type = std::string;
    explicit service(boost::asio::io_service& service,
                     unsigned short port = 42042);
    service(const service&) = delete;
    service(service&& other) = default;

    template <class EmitFunction>
    void on_message(EmitFunction emit) {
        m_emit = emit;
        do_accept();
    }
private:
    void do_accept();
    tcp::acceptor m_acceptor;
    tcp::socket m_socket;
    std::function<void(std::string&&)> m_emit;
};

template <class Sender, class Function,
          class MessageType = typename Sender::value_type>
class sink_impl {
public:
    using value_type = MessageType;
    sink_impl(Sender&& sender, Function function)
        : m_sender(std::move(sender)), m_function(function) {
        m_sender.on_message([this](MessageType&& message) {
            process_message(std::move(message));
        });
    }
    void process_message(MessageType&& message) const {
        std::invoke(m_function, std::move(message));
    }
private:
    Sender m_sender;
    Function m_function;
};

As you can see, the Service Actor’s only duty is to send messages to other actors; it never receives any messages from them. The Sink Actor is the opposite. To test my actors, I wrote the main function below and used telnet to mock clients.

template <class Function>
struct sink_helper {
    Function function;
};

template <class Function>
auto sink(Function&& function) {
    return sink_helper<Function>{std::forward<Function>(function)};
}

template <class Sender, class Function>
auto operator|(Sender&& sender, sink_helper<Function> sink) {
    return sink_impl<Sender, Function>(std::forward<Sender>(sender), sink.function);
}

int main() {
    boost::asio::io_service event_loop;
    auto sink_to_cerr =
        sink([](const auto& message) { std::cerr << message << '\n'; });
    // Starting the Boost.ASIO service
    auto pipeline = service(event_loop) | sink_to_cerr;
    std::cerr << "Service is running...\n";
    event_loop.run();
}

So far so good: my program receives whatever messages I send to it via telnet. Asynchronous operations are hard to test automatically, so I want to add a mock_service class to my program to make testing easier. My mock_service is as follows:

class mock_service{
public:
    mock_service(std::initializer_list<std::string>&& inits) : m_source{inits}{}
    mock_service(const std::vector<std::string>& other) : m_source(other){}
    mock_service(const mock_service&) = delete;
    mock_service(mock_service&& other) : m_source(std::move(other.m_source)){}
    template <class EmitFunction>
    void on_message(EmitFunction emit) {
        m_emit = emit;
    }
private:
    std::function<void(std::string&&)> m_emit;
    std::vector<std::string> m_source;
};
// irrelevant code omitted
int main(){
    boost::asio::io_service event_loop;
    auto sink_to_cerr =
        sink([](const auto& message) { std::cerr << message << '\n'; });
    // Starting the Boost.ASIO service
    auto pipeline = mock_service(event_loop) | sink_to_cerr;
    std::cerr << "Service is running...\n";
    event_loop.run();
}
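
As written, on_message only stores the callback, so the strings held in m_source are never delivered to the sink. Below is a minimal sketch of one way a test double could replay them, assuming that synchronous delivery at subscription time is acceptable for the tests. The names replay_source and collect are hypothetical and are not part of the program above.

```cpp
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical replaying source for tests; not the author's mock_service.
class replay_source {
public:
    // The sink looks this alias up to learn the message type.
    using value_type = std::string;

    explicit replay_source(std::vector<std::string> messages)
        : m_source(std::move(messages)) {}

    // Stores the callback, then immediately emits every stored message in order.
    template <class EmitFunction>
    void on_message(EmitFunction emit) {
        m_emit = emit;
        for (auto& message : m_source) {
            m_emit(std::move(message));
        }
        m_source.clear();  // the strings were moved from, so drop them
    }

private:
    std::function<void(std::string&&)> m_emit;
    std::vector<std::string> m_source;
};

// Hypothetical helper: subscribes to a source and returns what it emitted.
inline std::vector<std::string> collect(replay_source source) {
    std::vector<std::string> received;
    source.on_message([&received](std::string&& message) {
        received.push_back(std::move(message));
    });
    return received;
}
```

With this shape, a unit test can compare the collected vector against the expected messages without starting an event loop.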

But this time, I got a compile error message:

g++ mwe.cc -std=c++17 -lpthread
mwe.cc: In instantiation of ‘auto operator|(Sender&&, sink_helper<Function>) [with Sender = mock_service&; Function = main(int, const char**)::<lambda(const auto:1&)>]’:
mwe.cc:134:30:   required from here
mwe.cc:123:12: error: ‘mock_service&’ is not a class, struct, or union type
  123 |     return sink_impl<Sender, Function>(std::forward<Sender>(sender),
      |            ^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  124 |                                           sink.function);
      |                                           ~~~~~~~~~~~~~~
mwe.cc: In function ‘int main(int, const char**)’:
mwe.cc:134:10: error: ‘void pipeline’ has incomplete type
  134 |     auto pipeline = source | sink_to_cerr;

This error message does not seem to give me any useful information, because mock_service is definitely a class type. So my question is: why does this error happen, and how can I make the code compile?
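
One way to read the diagnostic, offered as a sketch rather than a confirmed answer: Sender&& in operator| is a forwarding reference, so a named object such as source deduces Sender as mock_service&, and typename Sender::value_type is then looked up on a reference type instead of on the class. The names toy_source, deduced_as_reference, and message_type_of below are hypothetical and exist only for this illustration. The stand-in also declares a value_type alias, which the mock_service shown here does not.

```cpp
#include <string>
#include <type_traits>
#include <utility>
#include <vector>

// Hypothetical stand-in for the mock source.
struct toy_source {
    using value_type = std::string;
    std::vector<std::string> messages;
};

// Reports whether a forwarding reference deduced Sender as a reference type.
// Lvalue arguments give Sender = T&, rvalue arguments give Sender = T.
template <class Sender>
constexpr bool deduced_as_reference(Sender&&) {
    return std::is_reference<Sender>::value;
}

// Stripping the reference first makes the nested-type lookup valid
// whether Sender was deduced as T or as T&.
template <class Sender>
using message_type_of =
    typename std::remove_reference_t<Sender>::value_type;

static_assert(
    std::is_same<message_type_of<toy_source&>, std::string>::value,
    "lookup works through an lvalue reference once it is stripped");
static_assert(
    std::is_same<message_type_of<toy_source>, std::string>::value,
    "lookup works on the plain class type");
```

If this reading is right, naming the sink type with std::decay_t<Sender> (or std::remove_reference_t<Sender>) and giving the mock a value_type alias would be the two changes to try.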


For your convenience, here is a minimal working example.

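#include <boost/asio.hpp>
#include <functional>
#include <initializer_list>
#include <iostream>
#include <memory>
#include <string>
#include <utility>
#include <vector>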
using boost::asio::ip::tcp;

template <typename EmitFunction>
class session : public std::enable_shared_from_this<session<EmitFunction>> {
public:
    session(tcp::socket&& socket, EmitFunction emit)
        : m_socket(std::move(socket)), m_emit(emit) {}
    void start() { do_read(); }

private:
    using shared_session = std::enable_shared_from_this<session<EmitFunction>>;
    void do_read() {
        auto self = shared_session::shared_from_this();
        boost::asio::async_read_until(
            m_socket, m_data, '\n',
            [this, self](const boost::system::error_code& error,
                         std::size_t size) {
                if (!error) {
                    std::istream is(&m_data);
                    std::string line;
                    std::getline(is, line);
                    m_emit(std::move(line));
                    do_read();
                }
            });
    }
    tcp::socket m_socket;
    boost::asio::streambuf m_data;
    EmitFunction m_emit;
};

template <class Socket, class EmitFunction>
auto make_shared_session(Socket&& socket, EmitFunction&& emit) {
    return std::make_shared<session<EmitFunction>>(
        std::forward<Socket>(socket), std::forward<EmitFunction>(emit));
}

class service {
public:
    using value_type = std::string;
    explicit service(boost::asio::io_service& service,unsigned short port = 42042)
        :m_acceptor(service, tcp::endpoint(tcp::v4(), port)), m_socket(service) {}
    service(const service&) = delete;
    service(service&& other) = default;

    template <class EmitFunction>
    void on_message(EmitFunction emit) {
        m_emit = emit;
        do_accept();
    }

private:
    void do_accept(){
        m_acceptor.async_accept(
        m_socket, [this](const boost::system::error_code& error) {
            if (!error) {
                make_shared_session(std::move(m_socket), m_emit)->start();
            } else {
                std::cerr << error.message() << '\n';
            }
            do_accept();
        });
    }
    tcp::acceptor m_acceptor;
    tcp::socket m_socket;
    std::function<void(std::string&&)> m_emit;
    friend std::ostream& operator<<(std::ostream& out, const service& service) {
        return out << "service object";
    }
};

class mock_service{
public:
    mock_service(std::initializer_list<std::string>&& inits) : m_source{inits}{}
    mock_service(const std::vector<std::string>& other) : m_source(other){}
    mock_service(const mock_service& other) : m_source(other.m_source){}
    mock_service(mock_service&& other) : m_source(std::move(other.m_source)){}
    template <class EmitFunction>
    void on_message(EmitFunction emit) {
        m_emit = emit;
    }
private:
    std::function<void(std::string&&)> m_emit;
    std::vector<std::string> m_source;
};

template <class Sender, class Function,
          class MessageType = typename Sender::value_type>
class sink_impl {
public:
    using value_type = MessageType;
    sink_impl(Sender&& sender, Function function)
        : m_sender(std::move(sender)), m_function(function) {
        m_sender.on_message([this](MessageType&& message) {
            process_message(std::move(message));
        });
    }

    void process_message(MessageType&& message) const {
        std::invoke(m_function, std::move(message));
    }

private:
    Sender m_sender;
    Function m_function;
};

template <class Function>
struct sink_helper {
    Function function;
};

template <class Function>
auto sink(Function&& function) {
    return sink_helper<Function>{std::forward<Function>(function)};
}

template <class Sender, class Function>
auto operator|(Sender&& sender, sink_helper<Function> sink) {
    return sink_impl<Sender, Function>(std::forward<Sender>(sender),
                                          sink.function);
}

int main(int argc, char const* argv[]) {
    boost::asio::io_service event_loop;
    auto sink_to_cerr =
        sink([](const auto& message) { std::cerr << message << '\n'; });
    auto pipeline = service(event_loop) | sink_to_cerr;
    //auto pipeline = mock_service{"", " ", "\n", "hello world", "hello world\n"} | sink_to_cerr;  //Compile Error
    std::cerr << "Service is running...\n";
    event_loop.run();
}
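
To exercise the pipe syntax without a network, the same composition can be sketched with an in-memory sender. This is an illustrative variant under stated assumptions, not the code above: toy_sender is a hypothetical class, the sink type is named through std::decay_t<Sender> so that both named objects and temporaries are accepted, and sink_impl takes the sender by value.

```cpp
#include <string>
#include <type_traits>
#include <utility>
#include <vector>

// Hypothetical in-memory sender used only for this sketch.
class toy_sender {
public:
    using value_type = std::string;
    explicit toy_sender(std::vector<std::string> messages)
        : m_messages(std::move(messages)) {}

    // Emits every stored message as soon as a callback is registered.
    template <class EmitFunction>
    void on_message(EmitFunction emit) {
        for (auto& message : m_messages) emit(std::move(message));
        m_messages.clear();
    }

private:
    std::vector<std::string> m_messages;
};

template <class Sender, class Function>
class sink_impl {
public:
    using value_type = typename Sender::value_type;

    // By-value parameter: lvalue senders are copied, temporaries are moved.
    sink_impl(Sender sender, Function function)
        : m_sender(std::move(sender)), m_function(std::move(function)) {
        m_sender.on_message([this](value_type&& message) {
            m_function(std::move(message));
        });
    }

private:
    Sender m_sender;
    Function m_function;
};

template <class Function>
struct sink_helper {
    Function function;
};

template <class Function>
auto sink(Function&& function) {
    return sink_helper<std::decay_t<Function>>{std::forward<Function>(function)};
}

// std::decay_t strips the reference that deduction adds for lvalue senders.
template <class Sender, class Function>
auto operator|(Sender&& sender, sink_helper<Function> helper) {
    return sink_impl<std::decay_t<Sender>, Function>(
        std::forward<Sender>(sender), std::move(helper.function));
}
```

Copying an lvalue sender is a design choice of this sketch; a move-only sender such as service would still need to be passed as a temporary or through std::move.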

The compile command is g++ mwe.cc -std=c++17 -lpthread

By the way, my environment is:

OS : Ubuntu-20.04

Compiler: GCC version 9.3.0

Source: Windows Questions C++
