I have written a class that tries to connect to a TCP server with a custom timeout and a limited number of attempts. It is a callable object that returns a std::future<bool> holding the result.
The problems with my initial implementation are:
- The object has to stay alive until a connection is established, it runs out of attempts, or a stop-case error occurs. So I have to store it inside my own class, which I would like to avoid.
- Asio composed operations let the caller customize the control flow on return: the CompletionToken may be a plain callback, a future, or a coroutine. My implementation binds the user to a future.
This is my initial implementation for a connection attempt with a custom timeout and number of attempts:
#include <cassert>
#include <chrono>
#include <functional>
#include <future>
#include <type_traits>
#include <utility>
#include <asio.hpp>

template<typename Connection>
class connection_attempt
{
public:
    using connection_type = Connection;
    using endpoint_type = typename Connection::endpoint_type;

    // decay so that both lvalue and rvalue endpoints are accepted
    template<typename Endpoint>
    using require_endpoint = typename std::enable_if<
        std::is_same<typename std::decay<Endpoint>::type, endpoint_type>::value>::type;

    constexpr static auto default_timeout()
    {
        return std::chrono::milliseconds(3000);
    }

    constexpr static size_t infinite_attempts()
    {
        return size_t() - 1;
    }

    explicit connection_attempt(Connection &connection)
        : connection_(connection)
    {}

    template<typename Callable>
    explicit connection_attempt(Connection &connection,
                                Callable &&stopOnError)
        : connection_(connection),
          stopOnError_(std::forward<Callable>(stopOnError))
    {}

    // Duration needs a default template argument:
    // it cannot be deduced from a default function argument
    template<typename Endpoint,
             typename Duration = std::chrono::milliseconds,
             typename = require_endpoint<Endpoint>>
    std::future<bool> operator()(Endpoint &&endpoint,
                                 size_t attempts,
                                 Duration &&timeout = default_timeout())
    {
        connectionResult_ = {};
        asyncConnect(std::forward<Endpoint>(endpoint),
                     attempts,
                     std::forward<Duration>(timeout));
        return connectionResult_.get_future();
    }

    // default attempts = infinite_attempts
    // integral types are rejected so that (endpoint, attempts) selects the overload above
    template<typename Endpoint,
             typename Duration = std::chrono::milliseconds,
             typename = require_endpoint<Endpoint>,
             typename = typename std::enable_if<
                 !std::is_integral<typename std::decay<Duration>::type>::value>::type>
    std::future<bool> operator()(Endpoint endpoint,
                                 Duration &&timeout = default_timeout())
    {
        connectionResult_ = {};
        asyncConnect(std::forward<Endpoint>(endpoint),
                     infinite_attempts(),
                     std::forward<Duration>(timeout));
        return connectionResult_.get_future();
    }

private:
    connection_type &connection_;
    asio::steady_timer timer_
        {connection_.get_executor()}; // this does not compile -> {asio::get_associated_executor(connection_)};
    std::function<bool(const asio::error_code &)> stopOnError_;
    std::promise<bool> connectionResult_;

    // cancels the connection on timeout!
    template<typename Duration>
    void startTimer(const Duration &timeout)
    {
        timer_.expires_after(timeout); // it will automatically cancel a pending timer
        timer_.async_wait(
            [this, timeout](const asio::error_code &errorCode)
            {
                // will occur on connection error before timeout
                if (errorCode == asio::error::operation_aborted)
                    return;
                // TODO: handle timer errors? What are the possible errors?
                assert(!errorCode && "unexpected timer error!");
                // stop current connection attempt
                connection_.cancel();
            });
    }

    void stopTimer()
    {
        timer_.cancel();
    }

    /**
     * Will be trying to connect until:<br>
     * - has run out of attempts
     * - has been required to stop by stopOnError callback (if it was set)
     * @param endpoint
     * @param attempts
     */
    template<typename Duration>
    void asyncConnect(endpoint_type endpoint,
                      size_t attempts,
                      Duration &&timeout)
    {
        startTimer(timeout);
        connection_.async_connect(endpoint, [this,
                                             endpoint,
                                             attempts,
                                             timeout = std::forward<Duration>(timeout)](const asio::error_code &errorCode)
        {
            if (!errorCode)
            {
                stopTimer();
                connectionResult_.set_value(true);
                return;
            }
            const auto attemptsLeft = attempts == infinite_attempts() ?
                                      infinite_attempts() :
                                      attempts - 1;
            if ((stopOnError_ &&
                 stopOnError_(errorCode == asio::error::operation_aborted ?
                              // special case for operation_aborted on timer expiration - need to send timed_out explicitly
                              // this should only result from the timer calling cancel()
                              asio::error::timed_out :
                              errorCode)) ||
                !attemptsLeft)
            {
                stopTimer();
                connectionResult_.set_value(false);
                return;
            }
            asyncConnect(endpoint,
                         attemptsLeft,
                         timeout);
        });
    }
};
// this should be an asynchronous function with a custom CompletionToken
template<typename Connection,
         typename Callable>
auto make_connection_attempt(Connection &connection,
                             Callable &&stopOnError) -> connection_attempt<Connection>
{
    return connection_attempt<Connection>(connection,
                                          std::forward<Callable>(stopOnError));
}
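As a side note on the retry counter used above, here is a minimal standard-library sketch of the same rule in isolation. The helper name attempts_left_after_failure is hypothetical; the sentinel is written with std::numeric_limits, which is the same value that size_t() - 1 wraps to.

```cpp
#include <cassert>
#include <cstddef>
#include <limits>

// Sentinel meaning "retry forever"; same value as size_t() - 1.
constexpr std::size_t infinite_attempts()
{
    return std::numeric_limits<std::size_t>::max();
}

// Attempts remaining after one failed try (hypothetical helper name).
// The sentinel is never decremented; any other count loses one attempt.
constexpr std::size_t attempts_left_after_failure(std::size_t attempts)
{
    return attempts == infinite_attempts() ? infinite_attempts() : attempts - 1;
}

static_assert(std::size_t() - 1 == infinite_attempts(),
              "both spellings wrap around to the maximum value");
```

Note that passing 0 attempts would wrap around to the sentinel after the first failure, so this rule assumes the caller passes at least 1.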
Beyond this initial implementation, I want to stay consistent with Asio's Universal Model for Asynchronous Operations: the control flow on return should be customizable.
I followed the example that sends several messages at intervals using a composed operation with a stateful intermediate handler. The handler repeatedly passes itself as the completion handler for the next asynchronous operation, async_wait or async_write. These calls always alternate: one is started only after the other has completed. In my case, however, async_wait and async_connect are outstanding at the same time:
// initiation method, called first
void operator()(args...)
{
    // not valid!
    timer.async_wait(std::move(*this)); // from now on *this is invalid
    connection.async_connect(endpoint, std::move(*this)); // can't move *this twice
}
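To make the double-move problem concrete, here is a small sketch that uses only the standard library, not asio. The types handler and outcome and the function fake_async_op are hypothetical stand-ins: fake_async_op merely takes ownership of the handler the way an initiating function would.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>

// Hypothetical stand-in for a stateful intermediate handler (not asio code).
struct handler
{
    std::unique_ptr<std::string> state = std::make_unique<std::string>("endpoint");
};

// Hypothetical stand-in for an initiating function: it takes ownership of the handler.
handler fake_async_op(handler h)
{
    return h;
}

struct outcome
{
    bool first_has_state;
    bool second_has_state;
};

outcome move_handler_twice()
{
    handler original;
    handler first = fake_async_op(std::move(original));  // the state now lives in `first`
    handler second = fake_async_op(std::move(original)); // moved-from: unique_ptr is null
    return {first.state != nullptr, second.state != nullptr};
}
```

Only the first operation receives the state; the second gets an empty handler, which is why a single move-only handler cannot be handed to two operations that are outstanding at the same time.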
This is the code for the class I am trying to implement as both the initiation and the intermediate handler:
template<typename Connection, typename CompletionToken>
class composed_connection_attempt
{
public:
    using connection_type = Connection;
    using endpoint_type = typename Connection::endpoint_type;

    enum class state
    {
        pending,
        connected,
        timeout
    };

    constexpr static auto default_timeout()
    {
        return std::chrono::milliseconds(3000);
    }

    constexpr static size_t infinite_attempts()
    {
        return size_t() - 1;
    }

    // TODO: executor type
    using executor_type = asio::associated_executor_t<CompletionToken,
                                                      typename connection_type::executor_type>;

    executor_type get_executor() const noexcept
    {
        // TODO: get completion handler executor
        return connection_.get_executor();
    }

    // TODO: allocator type
    using allocator_type = typename asio::associated_allocator_t<CompletionToken,
                                                                 std::allocator<void>>;

    allocator_type get_allocator() const noexcept
    {
        // TODO: get completion handler allocator
        return allocator_type();
    }

    // TODO: constructor to initialize state, pass timeout value?
    explicit composed_connection_attempt(connection_type &connection)
        : connection_(connection)
    {}

    template<typename Callable>
    composed_connection_attempt(connection_type &connection, Callable &&stopOnError)
        : connection_(connection),
          stopOnError_(std::forward<Callable>(stopOnError))
    {}

    // operator for initiation
    // Duration needs a default template argument to make the default timeout usable
    template<typename Endpoint, typename Duration = std::chrono::milliseconds>
    void operator()(Endpoint &&endpoint,
                    size_t attempts,
                    Duration timeout = default_timeout())
    {
        // Start timer: how to pass this
        // Attempt connection
    }

    // intermediate completion handler
    // this may be invoked without an error both by the timer and a connection
    void operator()(const asio::error_code &errorCode)
    {
        if (!errorCode)
        {
        }
    }

private:
    Connection &connection_;
    asio::steady_timer timer_{this->get_executor()};
    std::atomic<state> state_{state::pending};
    std::function<bool(const asio::error_code &)> stopOnError_;
    std::function<void(const asio::error_code &)> completionHandler_;
};
So, the problems I am trying to resolve:
- How can I share ownership of a stateful intermediate handler between a timer and a connection (socket)? Maybe I have to use nested classes (a main class for initiation and nested ones for timer and socket events)?
- How can I determine which of the asynchronous calls resulted in a void operator()(const asio::error_code&) invocation? An empty error code might mean either a successful connection or a timeout. Both asynchronous operations can also return asio::error::operation_aborted on cancellation: the connection attempt is cancelled on timeout, and the timer is cancelled on success or on a connection error.
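One possible direction for both questions, sketched with the standard library only rather than real asio calls: keep the per-attempt state behind a std::shared_ptr and give each operation its own small handler type, tagged with the operation it belongs to. The names source, shared_state, tagged_handler and simulate are hypothetical, and the bool parameter is an assumed stand-in for the check errorCode == asio::error::operation_aborted.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <string>
#include <vector>

// Which operation a handler belongs to (hypothetical tag, not part of asio).
enum class source { timer, connect };

// State shared by both handlers; it lives as long as any handler copy holds it.
struct shared_state
{
    std::vector<std::string> log;
    bool done = false;
};

// One handler template, tagged at compile time with its originating operation.
template<source Source>
struct tagged_handler
{
    std::shared_ptr<shared_state> state;

    void operator()(bool aborted) const
    {
        if (state->done || aborted)
            return; // the other operation already decided this attempt
        state->done = true;
        state->log.push_back(Source == source::timer ? "timed out" : "connected");
    }
};

// Invokes both handlers in the given order; the loser is reported as aborted.
std::vector<std::string> simulate(bool timer_fires_first)
{
    auto state = std::make_shared<shared_state>();
    std::function<void(bool)> on_timer = tagged_handler<source::timer>{state};
    std::function<void(bool)> on_connect = tagged_handler<source::connect>{state};
    if (timer_fires_first)
    {
        on_timer(false);
        on_connect(true);
    }
    else
    {
        on_connect(false);
        on_timer(true);
    }
    return state->log;
}
```

Because the tag is part of the handler type, a successful completion from the timer and one from the connection can no longer be confused. Whether this layout satisfies the composed-operation requirements (associated executor and allocator) is not addressed by this sketch.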
Source: Windows Questions C++