Connection attempt with timeout as a composed operation using ASIO

  asynchronous, boost-asio, c++, c++14

I have written a class that attempts to connect to a TCP server, given a custom timeout and a maximum number of attempts. It is a callable object that returns a std::future for the result.
The problems with my initial implementation are:

  • the object has to persist until a connection has been established, it has run out of attempts, or a stop-case error has occurred. So I have to store it inside my class, which I would like to avoid.
  • asio composed operations let the caller customize the control flow on return: the CompletionToken may be a simple callback, a future, or a coroutine. In my case I have bound the user to a future.

This is my initial implementation for a connection attempt with a custom timeout and number of attempts:

    template<typename Connection>
    class connection_attempt
    {
    public:
        using connection_type = Connection;
        using endpoint_type = typename Connection::endpoint_type;

        template<typename Endpoint>
        using require_endpoint = typename std::enable_if<std::is_same<Endpoint, endpoint_type>::value>::type;

        constexpr static auto default_timeout()
        {
            return std::chrono::milliseconds(3000);
        }

        constexpr static size_t infinite_attempts()
        {
            return size_t() - 1;
        }

        explicit connection_attempt(Connection &connection)
                : connection_(connection)
        {}

        template<typename Callable>
        explicit connection_attempt(Connection &connection,
                                    Callable &&stopOnError)
                : connection_(connection),
                  stopOnError_(std::forward<Callable>(stopOnError))
        {}

        template<typename Endpoint,
                typename Duration,
                typename = require_endpoint<Endpoint>>
        std::future<bool> operator()(Endpoint &&endpoint,
                                     size_t attempts,
                                     Duration &&timeout = default_timeout())
        {
            connectionResult_ = {};
            asyncConnect(std::forward<Endpoint>(endpoint),
                         attempts,
                         std::forward<Duration>(timeout));
            return connectionResult_.get_future();
        }

        // default attempts = infinite_attempts
        template<typename Endpoint,
                typename Duration,
                typename = require_endpoint<Endpoint>>
        std::future<bool> operator()(Endpoint endpoint,
                                     Duration &&timeout = default_timeout())
        {
            connectionResult_ = {};
            asyncConnect(std::forward<Endpoint>(endpoint),
                         infinite_attempts(),
                         std::forward<Duration>(timeout));
            return connectionResult_.get_future();
        }

    private:
        connection_type &connection_;
        asio::steady_timer timer_
                {connection_.get_executor()}; // this does not compile -> {asio::get_associated_executor(connection_)};

        std::function<bool(const asio::error_code &)> stopOnError_;
        std::promise<bool> connectionResult_;

        // cancels the connection on timeout!
        template<typename Duration>
        void startTimer(const Duration &timeout)
        {
            timer_.expires_after(timeout); // it will automatically cancel a pending timer

            timer_.async_wait(
                    [this, timeout](const asio::error_code &errorCode)
                    {
                        // will occur on connection error before timeout
                        if (errorCode == asio::error::operation_aborted)
                            return;

                        // TODO: handle timer errors? What are the possible errors?
                        assert(!errorCode && "unexpected timer error!");

                        // stop current connection attempt
                        connection_.cancel();
                    });
        }

        void stopTimer()
        {
            timer_.cancel();
        }

        /**
         * Will be trying to connect until:<br>
         * - has run out of attempts
         * - has been required to stop by stopOnError callback (if it was set)
         * @param endpoint
         * @param attempts
         */
        template<typename Duration>
        void asyncConnect(endpoint_type endpoint,
                          size_t attempts,
                          Duration &&timeout)
        {
            startTimer(timeout);

            connection_.async_connect(endpoint, [this,
                    endpoint,
                    attempts,
                    timeout = std::forward<Duration>(timeout)](const asio::error_code &errorCode)
            {
                if (!errorCode)
                {
                    stopTimer();
                    connectionResult_.set_value(true);
                    return;
                }
                
                const auto attemptsLeft = attempts == infinite_attempts() ?
                                          infinite_attempts() :
                                          attempts - 1;

                if ((stopOnError_ &&
                     stopOnError_(errorCode == asio::error::operation_aborted ?
                                  // special case for operation_aborted on timer expiration - need to send timed_out explicitly
                                  // this should only be resulted from the timer calling cancel()
                                  asio::error::timed_out :
                                  errorCode)) ||
                    !attemptsLeft)
                {
                    stopTimer();
                    connectionResult_.set_value(false);
                    return;
                }

                asyncConnect(endpoint,
                             attemptsLeft,
                             timeout);
            });
        }
    };

    // this should be an asynchronous function with a custom CompletionToken
    template<typename Connection,
            typename Callable>
    auto make_connection_attempt(Connection &connection,
                                 Callable &&stopOnError) -> connection_attempt<Connection>
    {
        return connection_attempt<Connection>(connection,
                                              std::forward<Callable>(stopOnError));
    }

However, I want to be consistent with ASIO and its Universal Model for Asynchronous Operations: control flow on return should be customizable.
I have followed the example for sending several messages at intervals using a composed operation with a stateful intermediate handler. The handler recursively passes itself as the handler for each subsequent asynchronous operation: async_wait and async_write. These calls always alternate: each one is started only after the other has completed. In my case, however, async_wait and async_connect are in flight at the same time:

    // initiation method, called first
    void operator()(args...)
    {
        // not valid!
        timer.async_wait(std::move(*this)); // from now on *this is in a moved-from state
        connection.async_connect(endpoint, std::move(*this)); // error: can't move *this twice
    }
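One possible way around the double move, sketched here with the standard library only, is to keep the operation state in a std::shared_ptr and give each asynchronous operation its own cheap copy of a small handler. Asio handlers only need to be move-constructible, so a copyable handler that co-owns the state is acceptable. Everything named below (fake_queue, attempt_state, shared_handler, start_attempt) is hypothetical: fake_queue merely stands in for the io_context, and the two post calls stand in for timer.async_wait and connection.async_connect.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <memory>
#include <system_error>
#include <utility>
#include <vector>

// Hypothetical stand-in for an io_context: handlers are queued and run later.
struct fake_queue
{
    std::vector<std::function<void()>> pending;

    void post(std::function<void()> f) { pending.push_back(std::move(f)); }

    void run()
    {
        // Index-based loop with a move-out, so a handler may post more work.
        for (std::size_t i = 0; i < pending.size(); ++i)
        {
            auto f = std::move(pending[i]);
            f();
        }
        pending.clear();
    }
};

// State shared by both pending operations (assumed layout, for illustration).
struct attempt_state
{
    int completions = 0;               // how many operations have completed
    std::function<void(int)> on_done;  // called once both have completed
};

// Copyable handler: every copy co-owns the same attempt_state.
struct shared_handler
{
    std::shared_ptr<attempt_state> state;

    void operator()(const std::error_code &) const
    {
        assert(state && "handler must own a state");
        if (++state->completions == 2)
            state->on_done(state->completions);
    }
};

// Initiation: the local shared_ptr dies on return, the queued copies keep
// the state alive until the last handler has run.
std::weak_ptr<attempt_state> start_attempt(fake_queue &queue,
                                           std::function<void(int)> on_done)
{
    auto state = std::make_shared<attempt_state>();
    state->on_done = std::move(on_done);
    shared_handler handler{state};

    // Stands in for timer.async_wait(handler).
    queue.post([handler] { handler(std::error_code{}); });
    // Stands in for connection.async_connect(endpoint, handler).
    queue.post([handler] { handler(std::error_code{}); });

    return state; // weak reference, only so a caller can observe the lifetime
}
```

With this layout nothing has to be stored by the caller: the state is released as soon as the last queued handler has been destroyed.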

This is the code for the class I am trying to implement as both the initiation and the intermediate handler:

    template<typename Connection, typename CompletionToken>
    class composed_connection_attempt
    {
    public:
        using connection_type = Connection;
        using endpoint_type = typename Connection::endpoint_type;

        enum class state
        {
            pending,
            connected,
            timeout
        };

        constexpr static auto default_timeout()
        {
            return std::chrono::milliseconds(3000);
        }

        constexpr static size_t infinite_attempts()
        {
            return size_t() - 1;
        }

        // TODO: executor type
        using executor_type = asio::associated_executor_t<CompletionToken,
                typename connection_type::executor_type>;

        executor_type get_executor() const noexcept
        {
            // TODO: get completion handler executor
            return connection_.get_executor();
        }

        // TODO: allocator type
        using allocator_type = typename asio::associated_allocator_t<CompletionToken,
                std::allocator<void>>;

        allocator_type get_allocator() const noexcept
        {
            // TODO: get completion handler allocator
            return allocator_type();
        }

        // TODO: constructor to initialize state, pass timeout value?
        explicit composed_connection_attempt(connection_type &connection)
                : connection_(connection)
        {}

        template<typename Callable>
        composed_connection_attempt(connection_type &connection, Callable &&stopOnError)
                : connection_(connection),
                  stopOnError_(std::forward<Callable>(stopOnError))
        {}

        // operator for initiation
        template<typename Endpoint, typename Duration>
        void operator()(Endpoint &&endpoint,
                        size_t attempts,
                        Duration timeout = default_timeout())
        {
            // Start timer: how to pass this
            // Attempt connection
        }

        // intermediate completion handler
        // this may be invoked without an error both by the timer and a connection
        void operator()(const asio::error_code &errorCode)
        {
            if (!errorCode)
            {

            }
        }


    private:
        Connection &connection_;
        asio::steady_timer timer_{this->get_executor()};
        std::atomic<state> state_{state::pending};
        std::function<bool(const asio::error_code &)> stopOnError_;
        std::function<void(const asio::error_code &)> completionHandler_;
    };

So, the problems I am trying to resolve:

  1. How to share ownership of a stateful intermediate handler with both a timer and a connection (socket)? Maybe I have to use nested classes (main class for initiation and nested for timer and socket events)?
  2. How to determine which of the asynchronous calls resulted in a void operator()(const asio::error_code&) invocation? A call without an error might mean either a successful connection or an expired timer. Both asynchronous operations can also complete with asio::error::operation_aborted on cancellation: the connection attempt is cancelled on timeout, and the timer is cancelled on success or on a connection error.
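As a rough sketch of the second point, one option is to wrap the shared state in two thin handler types that differ only by a tag, so overload resolution tells the timer completion apart from the connect completion. The names below (outcome, attempt_state, timer_tag, connect_tag, tagged_handler, simulate) are hypothetical, and std::errc::operation_canceled is used as a stand-in for asio::error::operation_aborted so that the example needs only the standard library.

```cpp
#include <cassert>
#include <memory>
#include <system_error>

enum class outcome { pending, connected, timed_out, failed };

// State shared by both handlers (assumed layout, for illustration).
struct attempt_state
{
    outcome result = outcome::pending;
    bool timer_fired = false;
};

struct timer_tag {};
struct connect_tag {};

// One handler template; the tag selects which completion logic runs.
template<typename Tag>
struct tagged_handler
{
    std::shared_ptr<attempt_state> state;

    void operator()(const std::error_code &ec) const { handle(Tag{}, ec); }

private:
    void handle(timer_tag, const std::error_code &ec) const
    {
        // Cancelled timer: the connect completed first, nothing to do.
        if (ec == std::errc::operation_canceled)
            return;
        // Timer expired; real code would call connection_.cancel() here.
        state->timer_fired = true;
    }

    void handle(connect_tag, const std::error_code &ec) const
    {
        if (!ec)
            state->result = outcome::connected;
        else if (ec == std::errc::operation_canceled && state->timer_fired)
            state->result = outcome::timed_out; // cancelled by our own timer
        else
            state->result = outcome::failed;
    }
};

// Drives both handlers by hand in the order the real operations would complete.
outcome simulate(bool timer_first)
{
    auto state = std::make_shared<attempt_state>();
    tagged_handler<timer_tag> on_timer{state};
    tagged_handler<connect_tag> on_connect{state};
    const auto aborted = std::make_error_code(std::errc::operation_canceled);

    if (timer_first)
    {
        on_timer(std::error_code{});   // timer expired first...
        on_connect(aborted);           // ...so the connect was cancelled
    }
    else
    {
        on_connect(std::error_code{}); // connect succeeded first...
        on_timer(aborted);             // ...so the timer was cancelled
    }
    return state->result;
}
```

Here simulate(true) models a timeout and simulate(false) a successful connection. Recording timer_fired in the shared state is what lets the connect handler report timed_out instead of a bare operation_aborted.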

Source: Windows Questions C++
