11

Is it possible to perform an asynchronous wait (read : non-blocking) on a conditional variable in boost::asio ? if it isn't directly supported any hints on implementing it would be appreciated.

I could implement a timer and fire a wakeup even every few ms, but this is approach is vastly inferior, I find it hard to believe that condition variable synchronization is not implemented / documented.

Hassan Syed
  • 19,697
  • 11
  • 84
  • 166
  • what do you want to do??? - is maybe the last version of [async_read_until](http://www.boost.org/doc/libs/1_47_0/doc/html/boost_asio/reference/async_read_until.html) what you're looking for? non-blocking waiting is normaly a task for [boost thread](http://www.boost.org/doc/libs/1_47_0/doc/html/thread.html) ... boost thread in combination with boost asio should work ... – jenseb Jul 21 '11 at 13:14
  • I have an alternative implementation in mind, I have outlined it in another question here. http://stackoverflow.com/questions/6776779/boost-asio-multi-io-service-rpc-framework-design-rfc This might give you more insight into what I want to achive. – Hassan Syed Jul 21 '11 at 13:31

3 Answers3

7

If I understand the intent correctly, you want to launch an event handler, when some condition variable is signaled, in context of asio thread pool? I think it would be sufficient to wait on the condition variable in the beginning of the handler, and io_service::post() itself back in the pool in the end, something of this sort:

#include <iostream>
#include <boost/asio.hpp>
#include <boost/thread.hpp>
boost::asio::io_service io;
boost::mutex mx;
boost::condition_variable cv;
void handler()
{
    boost::unique_lock<boost::mutex> lk(mx);
         cv.wait(lk);
    std::cout << "handler awakened\n";
    io.post(handler);
}
void buzzer()
{
    for(;;)
    {
        boost::this_thread::sleep(boost::posix_time::seconds(1));
        boost::lock_guard<boost::mutex> lk(mx);
            cv.notify_all();
    }
}
int main()
{
    io.post(handler);
    boost::thread bt(buzzer);
    io.run();
}
Cubbi
  • 44,959
  • 13
  • 99
  • 162
  • 6
    but the thread that waits will get blocked, isn't there a way of not blocking a thread, but to register a completion handler instead ? I am currently consdiering an alternative mechanism here http://stackoverflow.com/questions/6776779/boost-asio-multi-io-service-rpc-framework-design-rfc – Hassan Syed Jul 21 '11 at 13:27
  • @Hassan Syed : a condition variable is a concept that involves a blocked thread. Perhaps you are looking for async signals instead? boost.asio just added support for signal handlers in 1.47.0: http://www.boost.org/doc/libs/1_47_0/doc/html/boost_asio/history.html – Cubbi Jul 21 '11 at 13:35
  • As far as I can tell, these are signals that are emited by the operating system. It is shown that you can register for these signals, but it is the OS that will emit them. – Hassan Syed Jul 21 '11 at 13:49
  • your answer is correct, I was operating under the assumption that `io_service::run()` is a blocking call for the callee, and that asio takes care of synchronization inately somehow. I'm glad this assumption isn't true. – Hassan Syed Jul 22 '11 at 15:49
  • 1
    The `io_service::post` link in the answer is broken. Was `io_service::post` removed? It doesn't appear in the current asio doc's reference section. – Praxeolitic Jan 06 '17 at 04:56
0

I can suggest solution based on boost::asio::deadline_timer which works fine for me. This is kind of async event in boost::asio environment. One very important thing is that the 'handler' must be serialised through the same 'strand_' as 'cancel', because using 'boost::asio::deadline_timer' from multiple threads is not thread safe.

class async_event
{
public:
    async_event(
        boost::asio::io_service& io_service,
        boost::asio::strand<boost::asio::io_context::executor_type>& strand)
            : strand_(strand)
            , deadline_timer_(io_service, boost::posix_time::ptime(boost::posix_time::pos_infin))
    {}

    // 'handler' must be serialised through the same 'strand_' as 'cancel' or 'cancel_one'
    //  because using 'boost::asio::deadline_timer' from multiple threads is not thread safe
    template<class WaitHandler>
    void async_wait(WaitHandler&& handler) {
        deadline_timer_.async_wait(handler);
    }
    void async_notify_one() {
        boost::asio::post(strand_, boost::bind(&async_event::async_notify_one_serialized, this));
    }
    void async_notify_all() {
        boost::asio::post(strand_, boost::bind(&async_event::async_notify_all_serialized, this));
    }
private:
    void async_notify_one_serialized() {
        deadline_timer_.cancel_one();
    }
    void async_notify_all_serialized() {
        deadline_timer_.cancel();
    }
    boost::asio::strand<boost::asio::io_context::executor_type>& strand_;
    boost::asio::deadline_timer deadline_timer_;
};
0

Unfortunately, Boost ASIO doesn't have an async_wait_for_condvar() method.

In most cases, you also won't need it. Programming the ASIO way usually means, that you use strands, not mutexes or condition variables, to protect shared resources. Except for rare cases, which usually focus around correct construction or destruction order at startup and exit, you won't need mutexes or condition variables at all.

When modifying a shared resource, the classic, partially synchronous threaded way is as follows:

  • Lock the mutex protecting the resource
  • Update whatever needs to be updated
  • Signal a condition variable, if further processing by a waiting thread is required
  • Unlock the mutex

The fully asynchronous ASIO way is though:

  • Generate a message, that contains everything, that is needed to update the resource
  • Post a call to an update handler with that message to the resource's strand
  • If further processing is needed, let that update handler create further message(s) and post them to the apropriate resources' strands.
  • If jobs can be executed on fully private data, then post them directly to the io-context instead.

Here is an example of a class some_shared_resource, that receives a string state and triggers some further processing depending on the state received. Please note, that all processing in the private method some_shared_resource::receive_state() is fully thread-safe, as the strand serializes all calls.

Of course, the example is not complete; some_other_resource needs a similiar send_code_red() method as some_shared_ressource::send_state().

#include <boost/asio>
#include <memory>

using asio_context = boost::asio::io_context;
using asio_executor_type = asio_context::executor_type;
using asio_strand = boost::asio::strand<asio_executor_type>;

class some_other_resource;
class some_shared_resource : public std::enable_shared_from_this<some_shared_resource> {
    asio_strand strand;
    std::shared_ptr<some_other_resource> other;
    std::string state;

    void receive_state(std::string&& new_state) {
        std::string oldstate = std::exchange(state, new_state);
        if(state == "red" && oldstate != "red") {
            // state transition to "red":
            other.send_code_red(true);
        } else if(state != "red" && oldstate == "red") {
            // state transition from "red":
            other.send_code_red(false);
        }
    }

public:
    some_shared_resource(asio_context& ctx, const std::shared_ptr<some_other_resource>& other)
      : strand(ctx.get_executor()), other(other) {}

    void send_state(std::string&& new_state) {
        boost::asio::post(strand, [me = weak_from_this(), new_state = std::move(new_state)]() mutable {
            if(auto self = me.lock(); self) {
                self->receive_state(std::move(new_state));
            }
        });
    }
};

As you see, posting always into ASIO's strands can be a bit tedious at first. But you can move most of that "equip a class with a strand" code into a template.

The good thing about message passing: As you are not using mutexes, you cannot deadlock yourself anymore, even in extreme situations. Also, using message passing, it is often easier to create a high level of parallelity than with classical multithreading. On the downside, moving and copying around all these message objects is time consuming, which can slow down your application.

A last note: Using the weak pointer in the message formed by send_state() facilitates the reliable destruction of some_shared_resource objects: Otherwise, if A calls B and B calls C and C calls A (possibly only after a timeout or similiar), using shared pointers instead of weak pointers in the messages would create cyclic references, which then prevents object destruction. If you are sure, that you never will have cycles, and that processing messages from to-be-deleted objects doesn't pose a problem, you can use shared_from_this() instead of weak_from_this(), of course. If you are sure, that objects won't get deleted before ASIO has been stopped (and all working threads been joined back to the main thread), then you can also directly capture the this pointer instead.

Kai Petzke
  • 1,487
  • 15
  • 24