Alex - 3 months ago
C++ Question

Boost ASIO threadpool with a thread timeout

I'm using the Boost ASIO library as a thread pool, as has been widely described. However, I want to interrupt a thread's current task if it runs for longer than 1 second, so the thread can move on to the next posted task.

I can easily implement this with a separate deadline_timer, which is reset if the task finishes before the deadline, or interrupts the thread if the task runs for too long. However, I assumed this would be built into ASIO, since a task with a timeout seems natural for network operations. But I can't see anything in the API that does this succinctly.

Can anyone tell me if this functionality already exists? Or should I implement it the way I described?

Answer

Here's a quick solution I knocked together.

It requires that your submitted function objects accept an argument of type exec_context.

The task running in the io_service can query the .canceled() accessor (which is atomic) to determine whether it should cancel early.

It can then either throw an exception or return whatever value it intended to return.

The caller submits via the submit function. This function wraps the worker function with the context object and marshals its return value and/or exception into a std::future.

The caller can then query or wait on this future (or ignore it) as appropriate.

The caller gets a handle object with a cancel() method. Using this handle, the caller can cancel, query, or wait on the submitted task.

Hope it helps. It was fun to write.

#include <boost/asio.hpp>
#include <iostream>
#include <atomic>
#include <thread>
#include <chrono>
#include <future>
#include <stdexcept>
#include <exception>
#include <utility>
#include <type_traits>


//
// an object to allow the caller to communicate a cancellation request to the
// submitted task
//
struct exec_controller
{
    /// @returns the previous cancellation request state
    bool notify_cancel()
    {
        return _should_cancel.exchange(true);
    }

    bool should_cancel() const {
        return _should_cancel;
    }

private:
    std::atomic<bool> _should_cancel = { false };
};

template<class Ret>
struct exec_state : std::enable_shared_from_this<exec_state<Ret>>
{
    using return_type = Ret;

    bool notify_cancel() {
        return _controller.notify_cancel();
    }

    // aliasing shared_ptr: shares ownership of the whole exec_state but
    // points at the embedded controller, so the state outlives the task
    std::shared_ptr<exec_controller>
    get_controller_ptr() {
        return std::shared_ptr<exec_controller>(this->shared_from_this(),
                                                std::addressof(_controller));
    }

    std::promise<return_type>& promise() { return _promise; }

private:
    std::promise<return_type> _promise;
    exec_controller _controller;
};

struct applyer;

struct exec_context
{
    exec_context(std::shared_ptr<exec_controller> impl)
    : _impl(impl)
    {}

    bool canceled() const {
        return _impl->should_cancel();
    }

private:
    friend applyer;
    std::shared_ptr<exec_controller> _impl;
};

// invokes the task and marshals its return value (or exception) into the promise
struct applyer
{
    template<class F, class Ret>
    void operator()(F& f, std::shared_ptr<exec_state<Ret>> const& p) const
    {
        try {
            p->promise().set_value(f(exec_context { p->get_controller_ptr() }));
        }
        catch(...) {
            p->promise().set_exception(std::current_exception());
        }
    }

    template<class F>
    void operator()(F& f, std::shared_ptr<exec_state<void>> const& p) const
    {
        try {
            f(exec_context { p->get_controller_ptr() });
            p->promise().set_value();
        }
        catch(...) {
            p->promise().set_exception(std::current_exception());
        }
    }
};

template<class Ret>
struct exec_result
{
    using return_type = Ret;
    exec_result(std::shared_ptr<exec_state<return_type>> p)
    : _impl(p)
    {}

    bool cancel() {
        return _impl->notify_cancel();
    }

    std::future<Ret>& get_future()
    {
        return _future;
    }

private:

    std::shared_ptr<exec_state<return_type>> _impl;
    std::future<return_type> _future { _impl->promise().get_future() };
};


// wraps f with an exec_context, posts it to the executor, and returns a
// handle through which the caller can cancel or collect the result
template<class Executor, class F>
auto submit(Executor& exec, F&& f)
{
    using function_type = std::decay_t<F>;
    using result_type = std::result_of_t<function_type(exec_context)>;
    using state_type = exec_state<result_type>;
    auto shared_state = std::make_shared<state_type>();
    exec.post([shared_state, f = std::forward<F>(f)]
              {
                  applyer()(f, shared_state);
              });
    return exec_result<result_type>(std::move(shared_state));
}


int main()
{
    using namespace std::literals;

    boost::asio::io_service ios;
    boost::asio::io_service::strand strand(ios);
    boost::asio::io_service::work work(ios);

    std::thread runner([&] { ios.run(); });
    std::thread runner2([&] { ios.run(); });

    auto func = [](auto context)
    {
        for(int i = 0 ; i < 1000 ; ++i)
        {
            if (context.canceled())
                throw std::runtime_error("canceled");
            std::this_thread::sleep_for(100ms);
        }
    };

    auto handle = submit(strand, func);
    auto handle2 = submit(ios, [](auto context) { return 2 + 2; });
    // cancel the handle, or wait on it as you wish

    std::this_thread::sleep_for(1s);
    handle.cancel();
    handle2.cancel();   // prove that late cancellation is a nop
    try {
        std::cout << "2 + 2 is " << handle2.get_future().get() << std::endl;
    }
    catch(std::exception& e)
    {
        std::cerr << "failed to add 2 + 2 : " << e.what() << std::endl;
    }
    try {
        handle.get_future().get();
        std::cout << "task completed" << std::endl;
    }
    catch(std::exception const& e) {
        std::cout << "task threw exception: " << e.what() << std::endl;
    }

    ios.stop();
    runner.join();
    runner2.join();
}

Update: v2 makes the classes' internals private and demonstrates two simultaneous tasks.

expected output:

2 + 2 is 4
task threw exception: canceled