Type-erased async operation initiation


A while ago, I posted this question and adopted the accepted solution, which works well. I now want a type-erased version, so I can template the sender on the socket type (i.e. template <typename AsyncStream> class sender_impl;) to support multiple transports, but store them all in the same container.
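To make the goal concrete, here is a minimal sketch of the container side only, with no Asio involved. The names `fake_tcp_stream`, `fake_tls_stream`, `transport()` and `list_transports()` are hypothetical stand-ins invented for illustration; the real `sender_impl` would wrap an actual stream type and expose the async interface instead.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-ins for real transports (e.g. a TCP socket and a TLS stream).
struct fake_tcp_stream { std::string name() const { return "tcp"; } };
struct fake_tls_stream { std::string name() const { return "tls"; } };

// Non-template base class: this is the type the container stores.
class sender {
  public:
    virtual ~sender() = default;
    virtual std::string transport() const = 0; // hypothetical, for illustration only
};

// Templated on the stream type, but usable through the common base.
template <typename AsyncStream>
class sender_impl final : public sender {
  public:
    explicit sender_impl(AsyncStream s) : _stream(std::move(s)) {}
    std::string transport() const override { return _stream.name(); }

  private:
    AsyncStream _stream;
};

// Stores senders for different transports in one container and visits them
// through the base-class interface.
std::vector<std::string> list_transports() {
    std::vector<std::shared_ptr<sender>> senders;
    senders.push_back(std::make_shared<sender_impl<fake_tcp_stream>>(fake_tcp_stream{}));
    senders.push_back(std::make_shared<sender_impl<fake_tls_stream>>(fake_tls_stream{}));

    std::vector<std::string> out;
    for (auto const& s : senders)
        out.push_back(s->transport());
    return out;
}
```

The point is only that the template parameter disappears behind the base class, which is why the async entry point has to be type-erased as well.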

Based on this example, I've sketched out the following (full disclosure, I haven't tried to compile it yet), and I want to make sure I'm on the right track:

    #include <boost/asio.hpp>
    #include <cstddef>
    #include <deque>
    #include <memory>
    #include <utility>

    namespace asio = boost::asio;
    using tcp = asio::ip::tcp;
    using boost::system::error_code;

    class sender {
      public:
        virtual ~sender() = default;

        template <typename Token>
        auto async_send_message(asio::const_buffer buf, Token&& token) {
            // The initiation only forwards to the type-erased virtual function.
            auto init = [this](auto handler, asio::const_buffer buf) {
                do_async_send_message(buf, std::move(handler));
            };
            // async_initiate takes the token as an lvalue.
            return asio::async_initiate<Token, void(error_code)>(init, token, buf);
        }

      private:
        virtual void do_async_send_message(asio::const_buffer buf,
                                           asio::any_completion_handler<void(error_code)> h) = 0;
    };

    class sender_impl : public sender, public std::enable_shared_from_this<sender_impl> {
      public:
        explicit sender_impl(tcp::socket sock) : _socket(std::move(sock)) {}

      private:
        using handler_type = asio::any_completion_handler<void(error_code)>;
        using work_guard   = asio::executor_work_guard<tcp::socket::executor_type>;

        struct entry {
            asio::const_buffer buf;
            handler_type       handler;
            work_guard         work; // travels with the handler until it is invoked
        };

        void do_async_send_message(asio::const_buffer buf, handler_type h) final {
            auto work = asio::make_work_guard(_socket.get_executor());

            // Initiate the async operation on the socket's executor (assumed to be a strand)
            asio::dispatch(_socket.get_executor(),
                           [this, me = shared_from_this(), buf, h = std::move(h),
                            work = std::move(work)]() mutable {
                               async_send_message_impl(buf, std::move(h), std::move(work));
                           });
        }

        void async_send_message_impl(asio::const_buffer buf, handler_type h, work_guard work) {
            _send_queue.push_back(entry{buf, std::move(h), std::move(work)});

            // Queue was empty, meaning there's no active send loop, so start one.
            if (_send_queue.size() == 1)
                send_loop();
        }

        void send_loop() {
            if (_send_queue.empty())
                return;

            asio::async_write(
                _socket, _send_queue.front().buf,
                [this, me = shared_from_this()](error_code ec, std::size_t) {
                    auto& front = _send_queue.front();
                    auto  ex    = asio::get_associated_executor(front.handler, _socket.get_executor());

                    // Complete on the handler's associated executor.
                    asio::dispatch(ex, [h = std::move(front.handler), work = std::move(front.work),
                                        ec]() mutable { h(ec); });

                    if (!ec.failed()) {
                        // Keep sending until no messages left to send.
                        _send_queue.pop_front();
                        send_loop();
                    }
                });
        }

        tcp::socket       _socket;
        std::deque<entry> _send_queue;
    };

In particular, I can see that I'm now calling asio::make_work_guard() and shared_from_this() inside the initiation function, which possibly has implications for lifetimes.

I believe this is fine from a lifetime perspective if I assume that the initiation function is called before async_initiate() returns. However, this doc suggests that some completion tokens may choose to defer execution of the initiation function, which presumably would be a problem.
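To show the timing I'm worried about, here is a toy model in plain C++. It does not use Asio at all: `eager_token`, `lazy_token`, `toy_initiate` and `toy_async_send` are made-up names that only model when the initiation runs, on the assumption that a lazy token behaves roughly like asio::deferred and hands the initiation back to the caller instead of running it.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Toy stand-ins, NOT Asio types: they only model *when* the initiation runs.
struct eager_token {}; // models a plain callback-style token
struct lazy_token {};  // models a token that defers initiation

std::vector<std::string> trace; // records the order of events

// Eager case: the initiation runs before the initiating function returns.
template <typename Init>
void toy_initiate(Init init, eager_token) {
    init();
}

// Lazy case: the initiation is packaged and returned to the caller,
// who decides when (or whether) to run it.
template <typename Init>
std::function<void()> toy_initiate(Init init, lazy_token) {
    return std::function<void()>(std::move(init));
}

// Plays the role of async_send_message(): builds an initiation and hands it
// to the token-specific machinery.
template <typename Token>
auto toy_async_send(Token token) {
    auto init = [] { trace.push_back("initiation ran"); };
    return toy_initiate(init, token);
}

std::vector<std::string> run_demo() {
    trace.clear();

    toy_async_send(eager_token{});
    trace.push_back("eager call returned");

    auto op = toy_async_send(lazy_token{});
    trace.push_back("lazy call returned");
    op(); // only now does the second initiation run

    return trace;
}
```

In the lazy case the initiation runs after the initiating call has returned, so anything it touches through `this` (such as shared_from_this() or the socket's executor) must still be alive at that later point.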

A few questions:

Is this the right approach?

If not, then what?

Do any of the standard completion tokens (callbacks, futures, awaitables) defer the initiation function? If not, can you give me an example of one that might? (EDIT: Silly me, I guess deferred is an obvious example of this? I finally understand what it does now...)
