CppCoro - A coroutine library for C++

The 'cppcoro' library provides a large set of general-purpose primitives for making use of the coroutines TS proposal described in N4680.

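These include coroutine types such as task<T> and generator<T>, and awaitable synchronization primitives such as async_mutex and async_manual_reset_event, which are described under Class Details.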

This is an experimental library that explores the space of high-performance, scalable asynchronous programming abstractions that can be built on top of the C++ coroutines proposal.

It has been open-sourced in the hope that others will find it useful and that the C++ community can provide feedback on it and ways to improve it.

It requires a compiler that supports the coroutines TS:

  • Windows + Visual Studio 2017
  • Linux + Clang 5.0/6.0 + libc++

The Linux version is functional except for the io_context and file I/O related classes, which have not yet been implemented for Linux (see issue #15 for more info).

Class Details

task<T>

A task represents an asynchronous computation that is executed lazily in that the execution of the coroutine does not start until the task is awaited.

Example:

#include <cppcoro/read_only_file.hpp>
#include <cppcoro/task.hpp>

#include <algorithm>
#include <cstdint>
#include <iostream>
#include <string>

cppcoro::task<int> count_lines(std::string path)
{
  auto file = co_await cppcoro::read_only_file::open(path);

  int lineCount = 0;

  char buffer[1024];
  size_t bytesRead;
  std::uint64_t offset = 0;
  do
  {
    bytesRead = co_await file.read(offset, buffer, sizeof(buffer));
    lineCount += std::count(buffer, buffer + bytesRead, '\n');
    offset += bytesRead;
  } while (bytesRead > 0);

  co_return lineCount;
}

cppcoro::task<> usage_example()
{
  // Calling the function creates a new task but doesn't start
  // executing the coroutine yet.
  cppcoro::task<int> countTask = count_lines("foo.txt");

  // ...

  // Coroutine is only started when we later co_await the task.
  int lineCount = co_await countTask;

  std::cout << "line count = " << lineCount << std::endl;
}

API Overview:

// <cppcoro/task.hpp>
namespace cppcoro
{
  template<typename T>
  class task
  {
  public:

    using promise_type = <unspecified>;
    using value_type = T;

    task() noexcept;

    task(task&& other) noexcept;
    task& operator=(task&& other);

    // task is a move-only type.
    task(const task& other) = delete;
    task& operator=(const task& other) = delete;

    // Query if the task result is ready.
    bool is_ready() const noexcept;

    // Wait for the task to complete and return the result or rethrow the
    // exception if the operation completed with an unhandled exception.
    //
    // If the task is not yet ready then the awaiting coroutine will be
    // suspended until the task completes. If the task is_ready() then
    // this operation will return the result synchronously without suspending.
    Awaiter<T&> operator co_await() const & noexcept;
    Awaiter<T&&> operator co_await() const && noexcept;

    // Returns an awaitable that can be co_await'ed to suspend the current
    // coroutine until the task completes.
    //
    // The 'co_await t.when_ready()' expression differs from 'co_await t' in
    // that when_ready() only performs synchronization, it does not return
    // the result or rethrow the exception.
    //
    // This can be useful if you want to synchronize with the task without
    // the possibility of it throwing an exception.
    Awaitable<void> when_ready() const noexcept;
  };

  template<typename T>
  void swap(task<T>& a, task<T>& b);

  // Creates a task that yields the result of co_await'ing the specified awaitable.
  //
  // This can be used as a form of type-erasure of the concrete awaitable, allowing
  // different awaitables that return the same await-result type to be stored in
  // the same task<RESULT> type.
  template<
    typename AWAITABLE,
    typename RESULT = typename awaitable_traits<AWAITABLE>::await_result_t>
  task<RESULT> make_task(AWAITABLE awaitable);
}

You can create a task<T> object by calling a coroutine function that returns a task<T>.

The coroutine must contain a usage of either co_await or co_return. Note that a task<T> coroutine may not use the co_yield keyword.

When a coroutine that returns a task<T> is called, a coroutine frame is allocated if necessary and the parameters are captured in the coroutine frame. The coroutine is suspended at the start of the coroutine body, execution returns to the caller, and the function call returns a task<T> value that represents the asynchronous computation.

The coroutine body will start executing when the task<T> value is co_awaited. This will suspend the awaiting coroutine and start execution of the coroutine associated with the awaited task<T> value.

The awaiting coroutine will later be resumed on the thread that completes execution of the awaited task<T>'s coroutine, i.e. the thread that executes the co_return or that throws an unhandled exception that terminates execution of the coroutine.

If the task has already run to completion then awaiting it again will obtain the already-computed result without suspending the awaiting coroutine.

If the task object is destroyed before it is awaited then the coroutine never executes and the destructor simply destructs the captured parameters and frees any memory used by the coroutine frame.

shared_task<T>

The shared_task<T> class is a coroutine type that yields a single value asynchronously.

It is 'lazy' in that execution of the task does not start until it is awaited by some coroutine.

It is 'shared' in that the task value can be copied, allowing multiple references to the result of the task to be created. It also allows multiple coroutines to concurrently await the result.

The task will start executing on the thread that first co_awaits the task. Subsequent awaiters will either be suspended and be queued for resumption when the task completes or will continue synchronously if the task has already run to completion.

If an awaiter is suspended while waiting for the task to complete then it will be resumed on the thread that completes execution of the task, i.e. the thread that executes the co_return or that throws the unhandled exception that terminates execution of the coroutine.

API Summary

namespace cppcoro
{
  template<typename T = void>
  class shared_task
  {
  public:

    using promise_type = <unspecified>;
    using value_type = T;

    shared_task() noexcept;
    shared_task(const shared_task& other) noexcept;
    shared_task(shared_task&& other) noexcept;
    shared_task& operator=(const shared_task& other) noexcept;
    shared_task& operator=(shared_task&& other) noexcept;

    void swap(shared_task& other) noexcept;

    // Query if the task has completed and the result is ready.
    bool is_ready() const noexcept;

    // Returns an operation that when awaited will suspend the
    // current coroutine until the task completes and the result
    // is available.
    //
    // The type of the result of the 'co_await someTask' expression
    // is an l-value reference to the task's result value (unless T
    // is void in which case the expression has type 'void').
    // If the task completed with an unhandled exception then the
    // exception will be rethrown by the co_await expression.
    Awaiter<T&> operator co_await() const noexcept;

    // Returns an operation that when awaited will suspend the
    // calling coroutine until the task completes and the result
    // is available.
    //
    // The result is not returned from the co_await expression.
    // This can be used to synchronize with the task without the
    // possibility of the co_await expression throwing an exception.
    Awaiter<void> when_ready() const noexcept;

  };

  template<typename T>
  bool operator==(const shared_task<T>& a, const shared_task<T>& b) noexcept;
  template<typename T>
  bool operator!=(const shared_task<T>& a, const shared_task<T>& b) noexcept;

  template<typename T>
  void swap(shared_task<T>& a, shared_task<T>& b) noexcept;

  // Wrap an awaitable value in a shared_task to allow multiple coroutines
  // to concurrently await the result.
  template<
    typename AWAITABLE,
    typename RESULT = typename awaitable_traits<AWAITABLE>::await_result_t>
  shared_task<RESULT> make_shared_task(AWAITABLE awaitable);
}

All const-methods on shared_task<T> are safe to call concurrently with other const-methods on the same instance from multiple threads. It is not safe to call non-const methods of shared_task<T> concurrently with any other method on the same instance of a shared_task<T>.

Comparison to task<T>

The shared_task<T> class is similar to task<T> in that the task does not start execution immediately upon the coroutine function being called. The task only starts executing when it is first awaited.

It differs from task<T> in that the resulting task object can be copied, allowing multiple task objects to reference the same asynchronous result. It also supports multiple coroutines concurrently awaiting the result of the task.

The trade-off is that the result is always an l-value reference to the result, never an r-value reference (since the result may be shared), which may limit the ability to move-construct the result into a local variable. It also has a slightly higher run-time cost due to the need to maintain a reference count and support multiple awaiters.
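As a loose analogy for this trade-off, using only standard-library ownership types and no coroutines: a sole owner can move its value out, while shared owners maintain a reference count and can only hand out a reference to the one shared object. The helper names take_result() and peek_result() are hypothetical, and the sketch says nothing about how cppcoro implements either class.

```cpp
#include <memory>
#include <string>
#include <utility>

// Sole owner of a result: the value can be moved out, much as awaiting an
// r-value task<T> can give access to the result as an r-value.
std::string take_result(std::unique_ptr<std::string> owner)
{
  return std::move(*owner);
}

// Shared owners of a result: each holder only gets an l-value reference,
// because other holders may still read the same object.
const std::string& peek_result(const std::shared_ptr<std::string>& holder)
{
  return *holder;
}
```

Two shared_ptr copies report a use_count() of 2 and peek_result() returns a reference to the same object for both, which mirrors why a shared result cannot safely be moved from.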

generator<T>

A generator represents a coroutine type that produces a sequence of values of type T, where values are produced lazily and synchronously.

The coroutine body is able to yield values of type T using the co_yield keyword. Note, however, that the coroutine body is not able to use the co_await keyword; values must be produced synchronously.

For example:

cppcoro::generator<const std::uint64_t> fibonacci()
{
  std::uint64_t a = 0, b = 1;
  while (true)
  {
    co_yield b;
    auto tmp = a;
    a = b;
    b += tmp;
  }
}

void usage()
{
  for (auto i : fibonacci())
  {
    if (i > 1'000'000) break;
    std::cout << i << std::endl;
  }
}

When a coroutine function returning a generator<T> is called, the coroutine is created initially suspended. Execution of the coroutine enters the coroutine body when the generator<T>::begin() method is called and continues until either the first co_yield statement is reached or the coroutine runs to completion.

If the returned iterator is not equal to the end() iterator then dereferencing the iterator will return a reference to the value passed to the co_yield statement.

Calling operator++() on the iterator will resume execution of the coroutine and continue until either the next co_yield point is reached or the coroutine runs to completion.

Any unhandled exceptions thrown by the coroutine will propagate out of the begin() or operator++() calls to the caller.

API Summary:

namespace cppcoro
{
    template<typename T>
    class generator
    {
    public:

        using promise_type = <unspecified>;

        class iterator
        {
        public:
            using iterator_category = std::input_iterator_tag;
            using value_type = std::remove_reference_t<T>;
            using reference = value_type&;
            using pointer = value_type*;
            using difference_type = std::size_t;

            iterator(const iterator& other) noexcept;
            iterator& operator=(const iterator& other) noexcept;

            // If the generator coroutine throws an unhandled exception before producing
            // the next element then the exception will propagate out of this call.
            iterator& operator++();

            reference operator*() const noexcept;
            pointer operator->() const noexcept;

            bool operator==(const iterator& other) const noexcept;
            bool operator!=(const iterator& other) const noexcept;
        };

        // Constructs to the empty sequence.
        generator() noexcept;

        generator(generator&& other) noexcept;
        generator& operator=(generator&& other) noexcept;

        generator(const generator& other) = delete;
        generator& operator=(const generator&) = delete;

        ~generator();

        // Starts executing the generator coroutine which runs until either a value is yielded
        // or the coroutine runs to completion or an unhandled exception propagates out of
        // the coroutine.
        iterator begin();

        iterator end() noexcept;

        // Swap the contents of two generators.
        void swap(generator& other) noexcept;

    };

    template<typename T>
    void swap(generator<T>& a, generator<T>& b) noexcept;

    // Apply function, func, lazily to each element of the source generator
    // and yield a sequence of the results of calls to func().
    template<typename FUNC, typename T>
    generator<std::invoke_result_t<FUNC, T&>> fmap(FUNC func, generator<T> source);
}

recursive_generator<T>

A recursive_generator is similar to a generator except that it is designed to more efficiently support yielding the elements of a nested sequence as elements of an outer sequence.

In addition to being able to co_yield a value of type T you can also co_yield a value of type recursive_generator<T>.

When you co_yield a recursive_generator<T> value, all elements of the yielded generator are yielded as elements of the current generator. The current coroutine is suspended until the consumer has finished consuming all elements of the nested generator, after which the current coroutine resumes to produce the next element.

The benefit of recursive_generator<T> over generator<T> for iterating over recursive data-structures is that the iterator::operator++() is able to directly resume the leaf-most coroutine to produce the next element, rather than having to resume/suspend O(depth) coroutines for each element. The downside is that recursive_generator<T> incurs additional overhead.

For example:

// Lists the immediate contents of a directory.
cppcoro::generator<dir_entry> list_directory(std::filesystem::path path);

cppcoro::recursive_generator<dir_entry> list_directory_recursive(std::filesystem::path path)
{
  for (auto& entry : list_directory(path))
  {
    co_yield entry;
    if (entry.is_directory())
    {
      co_yield list_directory_recursive(entry.path());
    }
  }
}

Note that applying the fmap() operator to a recursive_generator<T> will yield a generator<U> type rather than a recursive_generator<U>. This is because fmap is generally not used in recursive contexts and we try to avoid the extra overhead incurred by recursive_generator.

async_generator<T>

An async_generator represents a coroutine type that produces a sequence of values of type T, where values are produced lazily and may be produced asynchronously.

The coroutine body is able to use both co_await and co_yield expressions.

Consumers of the generator can use a for co_await range-based for-loop to consume the values.

Example

cppcoro::async_generator<int> ticker(int count, threadpool& tp)
{
  for (int i = 0; i < count; ++i)
  {
    co_await tp.delay(std::chrono::seconds(1));
    co_yield i;
  }
}

cppcoro::task<> consumer(threadpool& tp)
{
  auto sequence = ticker(10, tp);
  for co_await(int i : sequence)
  {
    std::cout << "Tick " << i << std::endl;
  }
}

API Summary

// <cppcoro/async_generator.hpp>
namespace cppcoro
{
  template<typename T>
  class async_generator
  {
  public:

    class iterator
    {
    public:
      using iterator_tag = std::forward_iterator_tag;
      using difference_type = std::size_t;
      using value_type = std::remove_reference_t<T>;
      using reference = value_type&;
      using pointer = value_type*;

      iterator(const iterator& other) noexcept;
      iterator& operator=(const iterator& other) noexcept;

      // Resumes the generator coroutine if suspended.
      // Returns an operation object that must be awaited to wait
      // for the increment operation to complete.
      // If the coroutine runs to completion then the iterator
      // will subsequently become equal to the end() iterator.
      // If the coroutine completes with an unhandled exception then
      // that exception will be rethrown from the co_await expression.
      Awaitable<iterator&> operator++() noexcept;

      // Dereference the iterator.
      pointer operator->() const noexcept;
      reference operator*() const noexcept;

      bool operator==(const iterator& other) const noexcept;
      bool operator!=(const iterator& other) const noexcept;
    };

    // Construct to the empty sequence.
    async_generator() noexcept;
    async_generator(const async_generator&) = delete;
    async_generator(async_generator&& other) noexcept;
    ~async_generator();

    async_generator& operator=(const async_generator&) = delete;
    async_generator& operator=(async_generator&& other) noexcept;

    void swap(async_generator& other) noexcept;

    // Starts execution of the coroutine and returns an operation object
    // that must be awaited to wait for the first value to become available.
    // The result of co_await'ing the returned object is an iterator that
    // can be used to advance to subsequent elements of the sequence.
    //
    // This method is not valid to be called once the coroutine has
    // run to completion.
    Awaitable<iterator> begin() noexcept;
    iterator end() noexcept;

  };

  template<typename T>
  void swap(async_generator<T>& a, async_generator<T>& b);

  // Apply 'func' to each element of the source generator, yielding a sequence of
  // the results of calling 'func' on the source elements.
  template<typename FUNC, typename T>
  async_generator<std::invoke_result_t<FUNC, T&>> fmap(FUNC func, async_generator<T> source);
}

Early termination of an async_generator

When the async_generator object is destructed it requests cancellation of the underlying coroutine. If the coroutine has already run to completion or is currently suspended in a co_yield expression then the coroutine is destroyed immediately. Otherwise, the coroutine will continue execution until it either runs to completion or reaches the next co_yield expression.

When the coroutine frame is destroyed the destructors of all variables in scope at that point will be executed to ensure the resources of the generator are cleaned up.

Note that the caller must ensure that the async_generator object is not destroyed while a consumer coroutine is executing a co_await expression waiting for the next item to be produced.

single_consumer_event

This is a simple manual-reset event type that supports only a single coroutine awaiting it at a time. It can be used, for example, to let a producer signal a single consumer coroutine that a value is ready.

API Summary:

// <cppcoro/single_consumer_event.hpp>
namespace cppcoro
{
  class single_consumer_event
  {
  public:
    single_consumer_event(bool initiallySet = false) noexcept;
    bool is_set() const noexcept;
    void set();
    void reset() noexcept;
    Awaiter<void> operator co_await() const noexcept;
  };
}

Example:

#include <cppcoro/single_consumer_event.hpp>
#include <cppcoro/task.hpp>

#include <iostream>
#include <string>

cppcoro::single_consumer_event event;
std::string value;

cppcoro::task<> consumer()
{
  // Coroutine will suspend here until some thread calls event.set(),
  // e.g. inside the producer() function below.
  co_await event;

  std::cout << value << std::endl;
}

void producer()
{
  value = "foo";

  // This will resume the consumer() coroutine inside the call to set()
  // if it is currently suspended.
  event.set();
}

single_consumer_async_auto_reset_event

This class provides an async synchronization primitive that allows a single coroutine to wait until the event is signalled by a call to the set() method.

Once the coroutine that is awaiting the event is released by either a prior or subsequent call to set() the event is automatically reset back to the 'not set' state.

This class is a more efficient version of async_auto_reset_event that can be used in cases where only a single coroutine will be awaiting the event at a time. If you need to support multiple concurrent awaiting coroutines on the event then use the async_auto_reset_event class instead.

API Summary:

// <cppcoro/single_consumer_async_auto_reset_event.hpp>
namespace cppcoro
{
  class single_consumer_async_auto_reset_event
  {
  public:

    single_consumer_async_auto_reset_event(
      bool initiallySet = false) noexcept;

    // Change the event to the 'set' state. If a coroutine is awaiting the
    // event then the event is immediately transitioned back to the 'not set'
    // state and the coroutine is resumed.
    void set() noexcept;

    // Returns an Awaitable type that can be awaited to wait until
    // the event becomes 'set' via a call to the .set() method. If
    // the event is already in the 'set' state then the coroutine
    // continues without suspending.
    // The event is automatically reset back to the 'not set' state
    // before resuming the coroutine.
    Awaiter<void> operator co_await() const noexcept;

  };
}

Example Usage:

std::atomic<int> value;
cppcoro::single_consumer_async_auto_reset_event valueDecreasedEvent;

cppcoro::task<> wait_until_value_is_below(int limit)
{
  while (value.load(std::memory_order_relaxed) >= limit)
  {
    // Wait until there has been some change that we're interested in.
    co_await valueDecreasedEvent;
  }
}

void change_value(int delta)
{
  value.fetch_add(delta, std::memory_order_relaxed);
  // Notify the waiter if there has been some change.
  if (delta < 0) valueDecreasedEvent.set();
}

async_mutex

Provides a simple mutual exclusion abstraction that allows the caller to 'co_await' the mutex from within a coroutine to suspend the coroutine until the mutex lock is acquired.

The implementation is lock-free in that a coroutine that awaits the mutex will not block the thread but will instead suspend the coroutine and later resume it inside the call to unlock() by the previous lock-holder.

API Summary:

// <cppcoro/async_mutex.hpp>
namespace cppcoro
{
  class async_mutex_lock;
  class async_mutex_lock_operation;
  class async_mutex_scoped_lock_operation;

  class async_mutex
  {
  public:
    async_mutex() noexcept;
    ~async_mutex();

    async_mutex(const async_mutex&) = delete;
    async_mutex& operator=(const async_mutex&) = delete;

    bool try_lock() noexcept;
    async_mutex_lock_operation lock_async() noexcept;
    async_mutex_scoped_lock_operation scoped_lock_async() noexcept;
    void unlock();
  };

  class async_mutex_lock_operation
  {
  public:
    bool await_ready() const noexcept;
    bool await_suspend(std::experimental::coroutine_handle<> awaiter) noexcept;
    void await_resume() const noexcept;
  };

  class async_mutex_scoped_lock_operation
  {
  public:
    bool await_ready() const noexcept;
    bool await_suspend(std::experimental::coroutine_handle<> awaiter) noexcept;
    [[nodiscard]] async_mutex_lock await_resume() const noexcept;
  };

  class async_mutex_lock
  {
  public:
    // Takes ownership of the lock.
    async_mutex_lock(async_mutex& mutex, std::adopt_lock_t) noexcept;

    // Transfer ownership of the lock.
    async_mutex_lock(async_mutex_lock&& other) noexcept;

    async_mutex_lock(const async_mutex_lock&) = delete;
    async_mutex_lock& operator=(const async_mutex_lock&) = delete;

    // Releases the lock by calling unlock() on the mutex.
    ~async_mutex_lock();
  };
}

Example usage:

#include <cppcoro/async_mutex.hpp>
#include <cppcoro/task.hpp>
#include <set>
#include <string>

cppcoro::async_mutex mutex;
std::set<std::string> values;

cppcoro::task<> add_item(std::string value)
{
  cppcoro::async_mutex_lock lock = co_await mutex.scoped_lock_async();
  values.insert(std::move(value));
}

async_manual_reset_event

A manual-reset event is a coroutine/thread-synchronization primitive that allows one or more threads to wait until the event is signalled by a thread that calls set().

The event is in one of two states: 'set' and 'not set'.

If the event is in the 'set' state when a coroutine awaits the event then the coroutine continues without suspending. However, if the event is in the 'not set' state then the coroutine is suspended until some thread subsequently calls the set() method.

Any coroutines that were suspended while waiting for the event to become 'set' will be resumed inside the next call to set() by some thread.

Note that you must ensure that no coroutines are awaiting a 'not set' event when the event is destructed as they will not be resumed.

Example:

cppcoro::async_manual_reset_event event;
std::string value;

void producer()
{
  value = get_some_string_value();

  // Publish a value by setting the event.
  event.set();
}

// Can be called many times to create many tasks.
// All consumer tasks will wait until value has been published.
cppcoro::task<> consumer()
{
  // Wait until value has been published by awaiting event.
  co_await event;

  consume_value(value);
}

API Summary:

namespace cppcoro
{
  class async_manual_reset_event_operation;

  class async_manual_reset_event
  {
  public:
    async_manual_reset_event(bool initiallySet = false) noexcept;
    ~async_manual_reset_event();

    async_manual_reset_event(const async_manual_reset_event&) = delete;
    async_manual_reset_event(async_manual_reset_event&&) = delete;
    async_manual_reset_event& operator=(const async_manual_reset_event&) = delete;
    async_manual_reset_event& operator=(async_manual_reset_event&&) = delete;

    // Wait until the event becomes set.
    async_manual_reset_event_operation operator co_await() const noexcept;

    bool is_set() const noexcept;

    void set() noexcept;

    void reset() noexcept;

  };

  class async_manual_reset_event_operation
  {
  public:
    async_manual_reset_event_operation(async_manual_reset_event& event) noexcept;

    bool await_ready() const noexcept;
    bool await_suspend(std::experimental::coroutine_handle<> awaiter) noexcept;
    void await_resume() const noexcept;
  };
}

async_auto_reset_event

An auto-reset event is a coroutine/thread-synchronization primitive that allows one or more threads to wait until the event is signalled by a thread that calls set().

Once a coroutine that is awaiting the event is released by either a prior or subsequent call to set() the event is automatically reset back to the 'not set' state.

API Summary:

// <cppcoro/async_auto_reset_event.hpp>
namespace cppcoro
{
  class async_auto_reset_event_operation;

  class async_auto_reset_event
  {
  public:

    async_auto_reset_event(bool initiallySet = false) noexcept;

    ~async_auto_reset_event();

    async_auto_reset_event(const async_auto_reset_event&) = delete;
    async_auto_reset_event(async_auto_reset_event&&) = delete;
    async_auto_reset_event& operator=(const async_auto_reset_event&) = delete;
    async_auto_reset_event& operator=(async_auto_reset_event&&) = delete;

    // Wait for the event to enter the 'set' state.
    //
    // If the event is already 'set' then the event is set to the 'not set'
    // state and the awaiting coroutine continues without suspending.
    // Otherwise, the coroutine is suspended and later resumed when some
    // thread calls 'set()'.
    //
    // Note that the coroutine may be resumed inside a call to 'set()'
    // or inside another thread's call to 'operator co_await()'.
    async_auto_reset_event_operation operator co_await() const noexcept;

    // Set the state of the event to 'set'.
    //
    // If there are pending coroutines awaiting the event then one
    // pending coroutine is resumed and the state is immediately
    // set back to the 'not set' state.
    //
    // This operation is a no-op if the event was already 'set'.
    void set() noexcept;

    // Set the state of the event to 'not-set'.
    //
    // This is a no-op if the state was already 'not set'.
    void reset() noexcept;

  };

  class async_auto_reset_event_operation
  {
  public:
    explicit async_auto_reset_event_operation(async_auto_reset_event& event) noexcept;
    async_auto_reset_event_operation(const async_auto_reset_event_operation& other) noexcept;

    bool await_ready() const noexcept;
    bool await_suspend(std::experimental::coroutine_handle<> awaiter) noexcept;
    void await_resume() const noexcept;

  };
}

async_latch

An async latch is a synchronization primitive that allows coroutines to asynchronously wait until a counter has been decremented to zero.

The latch is a single-use object. Once the counter reaches zero the latch becomes 'ready' and will remain ready until the latch is destroyed.

API Summary:

// <cppcoro/async_latch.hpp>
namespace cppcoro
{
  class async_latch
  {
  public:

    // Initialise the latch with the specified count.
    async_latch(std::ptrdiff_t initialCount) noexcept;

    // Query if the count has reached zero yet.
    bool is_ready() const noexcept;

    // Decrement the count by n.
    // This will resume any waiting coroutines if the count reaches zero
    // as a result of this call.
    // It is undefined behaviour to decrement the count below zero.
    void count_down(std::ptrdiff_t n = 1) noexcept;

    // Wait until the latch becomes ready.
    // If the latch count is not yet zero then the awaiting coroutine will
    // be suspended and later resumed by a call to count_down() that decrements
    // the count to zero. If the latch count was already zero then the coroutine
    // continues without suspending.
    Awaiter<void> operator co_await() const noexcept;

  };
}

sequence_barrier

A sequence_barrier is a synchronization primitive that allows a single-producer and multiple consumers to coordinate with respect to a monotonically increasing sequence number.

A single producer advances the sequence number by publishing new sequence numbers in a monotonically increasing order. One or more consumers can query the last published sequence number and can wait until a particular sequence number has been published.

A sequence barrier can be used to represent a cursor into a thread-safe producer/consumer ring-buffer.

See the LMAX Disruptor pattern for more background: https://lmax-exchange.github.io/disruptor/files/Disruptor-1.0.pdf

API Synopsis:

namespace cppcoro
{
  template<typename SEQUENCE = std::size_t,
           typename TRAITS = sequence_traits<SEQUENCE>>
  class sequence_barrier
  {
  public:
    sequence_barrier(SEQUENCE initialSequence = TRAITS::initial_sequence) noexcept;
    ~sequence_barrier();

    SEQUENCE last_published() const noexcept;

    // Wait until the specified targetSequence number has been published.
    //
    // If the operation does not complete synchronously then the awaiting
    // coroutine is resumed on the specified scheduler. Otherwise, the
    // coroutine continues without suspending.
    //
    // The co_await expression resumes with the updated last_published()
    // value, which is guaranteed to be at least 'targetSequence'.
    template<typename SCHEDULER>
    [[nodiscard]]
    Awaitable<SEQUENCE> wait_until_published(SEQUENCE targetSequence,
                                             SCHEDULER& scheduler) const noexcept;

    void publish(SEQUENCE sequence) noexcept;
  };
}

single_producer_sequencer

A single_producer_sequencer is a synchronization primitive that can be used to coordinate access to a ring-buffer for a single producer and one or more consumers.

A producer first acquires one or more slots in a ring-buffer, writes to the ring-buffer elements corresponding to those slots, and then finally publishes the values written to those slots. A producer can never produce more than 'bufferSize' elements in advance of where the consumer has consumed up to.

A consumer then waits for certain elements to be published, processes the items and then notifies the producer when it has finished processing items by publishing the sequence number it has finished consuming in a sequence_barrier object.
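The relationship between sequence numbers and ring-buffer slots can be sketched with plain arithmetic. The names slotCount, slotMask, slot_for() and can_claim() are hypothetical helpers for this illustration and are not part of the cppcoro API; the sketch assumes a power-of-two buffer size and unsigned sequence numbers, with lastConsumed being the last sequence number the consumer has finished processing.

```cpp
#include <cstddef>

constexpr std::size_t slotCount = 8;             // must be a power of two
constexpr std::size_t slotMask = slotCount - 1;

static_assert((slotCount & slotMask) == 0, "slotCount must be a power of two");

// Maps a monotonically increasing sequence number to a ring-buffer slot.
// For a power-of-two size this is equivalent to sequence % slotCount.
constexpr std::size_t slot_for(std::size_t sequence) noexcept
{
  return sequence & slotMask;
}

// True if the producer may write 'sequence' without overwriting a slot the
// consumer has not finished with yet, i.e. the producer is at most
// slotCount elements ahead of 'lastConsumed'. Unsigned wrap-around makes
// this work when lastConsumed starts at std::size_t(-1) ("nothing consumed").
constexpr bool can_claim(std::size_t sequence, std::size_t lastConsumed) noexcept
{
  return sequence - lastConsumed <= slotCount;
}
```

With slotCount equal to 8, sequence numbers 0 and 8 share slot 0, so the producer has to wait for the consumer to publish that it has finished with sequence 0 before it can claim sequence 8.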

API Synopsis:

// <cppcoro/single_producer_sequencer.hpp>
namespace cppcoro
{
  template<
    typename SEQUENCE = std::size_t,
    typename TRAITS = sequence_traits<SEQUENCE>>
  class single_producer_sequencer
  {
  public:
    using size_type = typename sequence_range<SEQUENCE, TRAITS>::size_type;

    single_producer_sequencer(
      const sequence_barrier<SEQUENCE, TRAITS>& consumerBarrier,
      std::size_t bufferSize,
      SEQUENCE initialSequence = TRAITS::initial_sequence) noexcept;

    // Publisher API:

    template<typename SCHEDULER>
    [[nodiscard]]
    Awaitable<SEQUENCE> claim_one(SCHEDULER& scheduler) noexcept;

    template<typename SCHEDULER>
    [[nodiscard]]
    Awaitable<sequence_range<SEQUENCE>> claim_up_to(
      std::size_t count,
      SCHEDULER& scheduler) noexcept;

    void publish(SEQUENCE sequence) noexcept;

    // Consumer API:

    SEQUENCE last_published() const noexcept;

    template<typename SCHEDULER>
    [[nodiscard]]
    Awaitable<SEQUENCE> wait_until_published(
      SEQUENCE targetSequence,
      SCHEDULER& scheduler) const noexcept;

  };
}

Example usage:

using namespace cppcoro;
using namespace std::chrono;

struct message
{
  int id;
  steady_clock::time_point timestamp;
  float data;
};

constexpr size_t bufferSize = 16384; // Must be power-of-two
constexpr size_t indexMask = bufferSize - 1;
message buffer[bufferSize];

task<void> producer(
  io_service& ioSvc,
  single_producer_sequencer<size_t>& sequencer)
{
  auto start = steady_clock::now();
  for (int i = 0; i < 1'000'000; ++i)
  {
    // Wait until a slot is free in the buffer.
    size_t seq = co_await sequencer.claim_one(ioSvc);

    // Populate the message.
    auto& msg = buffer[seq & indexMask];
    msg.id = i;
    msg.timestamp = steady_clock::now();
    msg.data = 123;

    // Publish the message.
    sequencer.publish(seq);
  }

  // Publish a sentinel
  auto seq = co_await sequencer.claim_one(ioSvc);
  auto& msg = buffer[seq & indexMask];
  msg.id = -1;
  sequencer.publish(seq);
}

task<void> consumer(
  static_thread_pool& threadPool,
  const single_producer_sequencer<size_t>& sequencer,
  sequence_barrier<size_t>& consumerBarrier)
{
  size_t nextToRead = 0;
  while (true)
  {
    // Wait until the next message is available
    // There may be more than one available.
    const size_t available = co_await sequencer.wait_until_published(nextToRead, threadPool);
    do {
      auto& msg = buffer[nextToRead & indexMask];
      if (msg.id == -1)
      {
        consumerBarrier.publish(nextToRead);
        co_return;
      }

      processMessage(msg);
    } while (nextToRead++ != available);

    // Notify the producer that we've finished processing
    // up to 'nextToRead - 1'.
    consumerBarrier.publish(available);
  }
}

task<void> example(io_service& ioSvc, static_thread_pool& threadPool)
{
  sequence_barrier<size_t> barrier;
  single_producer_sequencer<size_t> sequencer{barrier, bufferSize};

  co_await when_all(
    producer(ioSvc, sequencer),
    consumer(threadPool, sequencer, barrier));
}

multi_producer_sequencer

The multi_producer_sequencer class is a synchronization primitive that coordinates access to a ring-buffer for multiple producers and one or more consumers.

For a single-producer variant see the single_producer_sequencer class.

Note that the ring-buffer must have a size that is a power-of-two. This is because the implementation uses bitmasks instead of integer division/modulo to calculate the offset into the buffer. Also, this allows the sequence number to safely wrap around the 32-bit/64-bit value.
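
The bitmask trick and the wrap-around property can be checked with a small sketch. The helper names `is_power_of_two` and `slot_index` are hypothetical and not part of cppcoro; a 32-bit sequence type is assumed so the wrap-around is easy to see.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// True when n is a non-zero power of two.
constexpr bool is_power_of_two(std::size_t n)
{
  return n != 0 && (n & (n - 1)) == 0;
}

// Map a sequence number to a slot index without division.
// Equivalent to seq % bufferSize when bufferSize is a power of two.
inline std::size_t slot_index(std::uint32_t seq, std::size_t bufferSize)
{
  assert(is_power_of_two(bufferSize));
  return static_cast<std::size_t>(seq) & (bufferSize - 1);
}
```

For a buffer of 8 slots, the largest 32-bit sequence number maps to slot 7 and the next sequence number (which wraps to 0) maps to slot 0, so the slots stay contiguous across the wrap. With a size such as 6, the index would jump from 3 to 0 at the wrap and skip slots.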

API Summary:

// <cppcoro/multi_producer_sequencer.hpp>
namespace cppcoro
{
  template<typename SEQUENCE = std::size_t,
           typename TRAITS = sequence_traits<SEQUENCE>>
  class multi_producer_sequencer
  {
  public:
    multi_producer_sequencer(
      const sequence_barrier<SEQUENCE, TRAITS>& consumerBarrier,
      std::size_t bufferSize,
      SEQUENCE initialSequence = TRAITS::initial_sequence);

    std::size_t buffer_size() const noexcept;

    // Consumer interface
    //
    // Each consumer keeps track of their own 'lastKnownPublished' value
    // and must pass this to the methods that query for an updated last-known
    // published sequence number.

    SEQUENCE last_published_after(SEQUENCE lastKnownPublished) const noexcept;

    template<typename SCHEDULER>
    Awaitable<SEQUENCE> wait_until_published(
      SEQUENCE targetSequence,
      SEQUENCE lastKnownPublished,
      SCHEDULER& scheduler) const noexcept;

    // Producer interface

    // Query whether any slots are available for claiming (approximate).
    bool any_available() const noexcept;

    template<typename SCHEDULER>
    Awaitable<SEQUENCE> claim_one(SCHEDULER& scheduler) noexcept;

    template<typename SCHEDULER>
    Awaitable<sequence_range<SEQUENCE, TRAITS>> claim_up_to(
      std::size_t count,
      SCHEDULER& scheduler) noexcept;

    // Mark the specified sequence number as published.
    void publish(SEQUENCE sequence) noexcept;

    // Mark all sequence numbers in the specified range as published.
    void publish(const sequence_range<SEQUENCE, TRAITS>& range) noexcept;
  };
}

Cancellation

A cancellation_token is a value that can be passed to a function that allows the caller to subsequently communicate a request to cancel the operation to that function.

To obtain a cancellation_token that is able to be cancelled you must first create a cancellation_source object. The cancellation_source::token() method can be used to manufacture new cancellation_token values that are linked to that cancellation_source object.

When you later want to request cancellation of an operation to which you have passed a cancellation_token, call cancellation_source::request_cancellation() on an associated cancellation_source object.

Functions can respond to a request for cancellation in one of two ways:

  1. Poll for cancellation at regular intervals by calling either cancellation_token::is_cancellation_requested() or cancellation_token::throw_if_cancellation_requested().
  2. Register a callback to be executed when cancellation is requested using the cancellation_registration class.
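
The relationship between a source, its tokens and a polling function can be modelled with a shared atomic flag. This is only a sketch of the idea, not how cppcoro implements it; `toy_source`, `toy_token` and `run_steps` are hypothetical names.

```cpp
#include <atomic>
#include <cassert>
#include <memory>
#include <utility>

// Hypothetical stand-in for cancellation_token: observes a shared flag.
class toy_token
{
public:
  toy_token() = default;  // a default-constructed token can never be cancelled
  explicit toy_token(std::shared_ptr<std::atomic<bool>> state)
    : m_state(std::move(state)) {}

  bool can_be_cancelled() const noexcept { return m_state != nullptr; }

  bool is_cancellation_requested() const noexcept
  {
    return m_state && m_state->load(std::memory_order_acquire);
  }

private:
  std::shared_ptr<std::atomic<bool>> m_state;
};

// Hypothetical stand-in for cancellation_source: owns and sets the flag.
class toy_source
{
public:
  toy_source() : m_state(std::make_shared<std::atomic<bool>>(false)) {}

  toy_token token() const { return toy_token{m_state}; }

  void request_cancellation() noexcept
  {
    m_state->store(true, std::memory_order_release);
  }

private:
  std::shared_ptr<std::atomic<bool>> m_state;
};

// Polling approach: check the token between units of work and stop early.
// Returns the number of steps completed.
inline int run_steps(const toy_token& token, int stepCount)
{
  assert(stepCount >= 0);
  int completed = 0;
  for (int i = 0; i < stepCount; ++i)
  {
    if (token.is_cancellation_requested()) break;
    ++completed;  // one unit of work
  }
  return completed;
}
```

A token obtained from `toy_source::token()` sees the request as soon as `request_cancellation()` is called, while a default-constructed token reports that it can never be cancelled.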

API Summary:

namespace cppcoro
{
  class cancellation_source
  {
  public:
    // Construct a new, independently cancellable cancellation source.
    cancellation_source();

    // Construct a new reference to the same cancellation state.
    cancellation_source(const cancellation_source& other) noexcept;
    cancellation_source(cancellation_source&& other) noexcept;

    ~cancellation_source();

    cancellation_source& operator=(const cancellation_source& other) noexcept;
    cancellation_source& operator=(cancellation_source&& other) noexcept;

    bool is_cancellation_requested() const noexcept;
    bool can_be_cancelled() const noexcept;
    void request_cancellation();

    cancellation_token token() const noexcept;
  };

  class cancellation_token
  {
  public:
    // Construct a token that can't be cancelled.
    cancellation_token() noexcept;

    cancellation_token(const cancellation_token& other) noexcept;
    cancellation_token(cancellation_token&& other) noexcept;

    ~cancellation_token();

    cancellation_token& operator=(const cancellation_token& other) noexcept;
    cancellation_token& operator=(cancellation_token&& other) noexcept;

    bool is_cancellation_requested() const noexcept;
    void throw_if_cancellation_requested() const;

    // Query if this token can ever have cancellation requested.
    // Code can use this to take a more efficient code-path in cases
    // that the operation does not need to handle cancellation.
    bool can_be_cancelled() const noexcept;
  };

  // RAII class for registering a callback to be executed if cancellation
  // is requested on a particular cancellation token.
  class cancellation_registration
  {
  public:

    // Register a callback to be executed if cancellation is requested.
    // Callback will be called with no arguments on the thread that calls
    // request_cancellation() if cancellation is not yet requested, or
    // called immediately if cancellation has already been requested.
    // Callback must not throw an unhandled exception when called.
    template<typename CALLBACK>
    cancellation_registration(cancellation_token token, CALLBACK&& callback);

    cancellation_registration(const cancellation_registration& other) = delete;

    ~cancellation_registration();
  };

  class operation_cancelled : public std::exception
  {
  public:
    operation_cancelled();
    const char* what() const noexcept override;
  };
}

Example: Polling Approach

cppcoro::task<> do_something_async(cppcoro::cancellation_token token)
{
  // Explicitly define cancellation points within the function
  // by calling throw_if_cancellation_requested().
  token.throw_if_cancellation_requested();

  co_await do_step_1();

  token.throw_if_cancellation_requested();

  do_step_2();

  // Alternatively, you can query if cancellation has been
  // requested to allow yourself to do some cleanup before
  // returning.
  if (token.is_cancellation_requested())
  {
    display_message_to_user("Cancelling operation...");
    do_cleanup();
    throw cppcoro::operation_cancelled{};
  }

  do_final_step();
}

Example: Callback Approach

// Say we already have a timer abstraction that supports being
// cancelled but it doesn't support cancellation_tokens natively.
// You can use a cancellation_registration to register a callback
// that calls the existing cancellation API. e.g.
cppcoro::task<> cancellable_timer_wait(cppcoro::cancellation_token token)
{
  auto timer = create_timer(10s);

  cppcoro::cancellation_registration registration(token, [&]
  {
    // Call existing timer cancellation API.
    timer.cancel();
  });

  co_await timer;
}

static_thread_pool

The static_thread_pool class provides an abstraction that lets you schedule work on a fixed-size pool of threads.

This class implements the Scheduler concept (see below).

You can enqueue work to the thread-pool by executing co_await threadPool.schedule(). This operation will suspend the current coroutine, enqueue it for execution on the thread-pool and the thread pool will then resume the coroutine when a thread in the thread-pool is next free to run the coroutine. This operation is guaranteed not to throw and, in the common case, will not allocate any memory.

This class makes use of a work-stealing algorithm to load-balance work across multiple threads. Work enqueued to the thread-pool from a thread-pool thread will be scheduled for execution on the same thread in a LIFO queue. Work enqueued to the thread-pool from a remote thread will be enqueued to a global FIFO queue. When a worker thread runs out of work from its local queue it first tries to dequeue work from the global queue. If that queue is empty then it next tries to steal work from the back of the queues of the other worker threads.
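
The dequeue order described above (own queue first, then the global queue, then stealing) can be modelled in a few lines. This is a single-threaded sketch with hypothetical names (`toy_pool`, `enqueue_local`, `enqueue_remote`, `next_item`); a real pool would use concurrent queues and is considerably more involved.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <optional>
#include <vector>

// Hypothetical single-threaded model of the dequeue order.
// Work items are plain ints.
struct toy_pool
{
  std::vector<std::deque<int>> localQueues;  // one per worker, newest at front
  std::deque<int> globalQueue;               // FIFO, oldest at front

  explicit toy_pool(std::size_t workerCount) : localQueues(workerCount) {}

  // Work enqueued from a worker thread goes to that worker's own queue.
  void enqueue_local(std::size_t worker, int item)
  {
    localQueues[worker].push_front(item);
  }

  // Work enqueued from a non-pool thread goes to the global queue.
  void enqueue_remote(int item) { globalQueue.push_back(item); }

  // Next item for 'worker': local queue (LIFO), then global queue (FIFO),
  // then steal from the back of another worker's queue.
  std::optional<int> next_item(std::size_t worker)
  {
    assert(worker < localQueues.size());
    auto& local = localQueues[worker];
    if (!local.empty())
    {
      int item = local.front();
      local.pop_front();
      return item;
    }
    if (!globalQueue.empty())
    {
      int item = globalQueue.front();
      globalQueue.pop_front();
      return item;
    }
    for (std::size_t i = 0; i < localQueues.size(); ++i)
    {
      if (i == worker || localQueues[i].empty()) continue;
      int item = localQueues[i].back();
      localQueues[i].pop_back();
      return item;
    }
    return std::nullopt;
  }
};
```

In this model the owner takes its most recently enqueued item, while a thief takes the oldest item from the other end of the victim's queue, so the two rarely contend for the same element.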

API Summary:

namespace cppcoro
{
  class static_thread_pool
  {
  public:
    // Initialise the thread-pool with a number of threads equal to
    // std::thread::hardware_concurrency().
    static_thread_pool();

    // Initialise the thread pool with the specified number of threads.
    explicit static_thread_pool(std::uint32_t threadCount);

    std::uint32_t thread_count() const noexcept;

    class schedule_operation
    {
    public:
      schedule_operation(static_thread_pool* tp) noexcept;

      bool await_ready() noexcept;
      void await_suspend(std::experimental::coroutine_handle<> h) noexcept;
      void await_resume() noexcept;

    private:
      // unspecified
    };

    // Return an operation that can be awaited by a coroutine.
    [[nodiscard]]
    schedule_operation schedule() noexcept;

  private:

    // Unspecified

  };
}

Example usage: Simple

cppcoro::task<> do_something_on_threadpool(cppcoro::static_thread_pool& tp)
{
  // First schedule the coroutine onto the threadpool.
  co_await tp.schedule();

  // When it resumes, this coroutine is now running on the threadpool.
  do_something();
}

Example usage: Doing things in parallel - using schedule_on() operator with static_thread_pool.

cppcoro::task<double> dot_product(static_thread_pool& tp, double a[], double b[], size_t count)
{
  if (count > 1000)
  {
    // Subdivide the work recursively into two equal tasks
    // The first half is scheduled to the thread pool so it can run concurrently
    // with the second half which continues on this thread.
    size_t halfCount = count / 2;
    auto [first, second] = co_await when_all(
      schedule_on(tp, dot_product(tp, a, b, halfCount)),
      dot_product(tp, a + halfCount, b + halfCount, count - halfCount));
    co_return first + second;
  }
  else
  {
    double sum = 0.0;
    for (size_t i = 0; i < count; ++i)
    {
      sum += a[i] * b[i];
    }
    co_return sum;
  }
}

io_service and io_work_scope

The io_service class provides an abstraction for processing I/O completion events from asynchronous I/O operations.

When an asynchronous I/O operation completes, the coroutine that was awaiting that operation will be resumed on an I/O thread inside a call to one of the event-processing methods: process_events(), process_pending_events(), process_one_event() or process_one_pending_event().

The io_service class does not manage any I/O threads. You must ensure that some thread calls one of the event-processing methods for coroutines awaiting I/O completion events to be dispatched. This can either be a dedicated thread that calls process_events() or mixed in with some other event loop (e.g. a UI event loop) by periodically polling for new events via a call to process_pending_events() or process_one_pending_event().

This allows integration of the io_service event-loop with other event loops, such as a user-interface event loop.

You can multiplex processing of events across multiple threads by having multiple threads call process_events(). You can specify a hint as to the maximum number of threads to have actively processing events via an optional io_service constructor parameter.
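
The non-blocking polling methods can be pictured as draining a queue of ready handlers. This is a minimal single-threaded model under that assumption, not the io_service implementation; `toy_event_loop` and `post` are hypothetical names, and the method names merely mirror the ones described above.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <queue>
#include <utility>

// Hypothetical single-threaded model of a pollable event queue.
class toy_event_loop
{
public:
  // Queue a handler to be run by a later call to a process_* method.
  void post(std::function<void()> handler)
  {
    assert(handler != nullptr);
    m_events.push(std::move(handler));
  }

  // Run one queued event if there is one; never blocks.
  // Returns the number of events processed (0 or 1).
  std::uint64_t process_one_pending_event()
  {
    if (m_events.empty()) return 0;
    auto handler = std::move(m_events.front());
    m_events.pop();
    handler();
    return 1;
  }

  // Run queued events until the queue is empty, including events posted
  // by the handlers themselves; never blocks.
  std::uint64_t process_pending_events()
  {
    std::uint64_t count = 0;
    while (process_one_pending_event() != 0) ++count;
    return count;
  }

private:
  std::queue<std::function<void()>> m_events;
};
```

A host event loop (for example a UI loop) could call `process_pending_events()` once per iteration, which is the integration style the polling methods are intended for.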

On Windows, the implementation makes use of the Windows I/O Completion Port facility to dispatch events to I/O threads in a scalable manner.

API Summary:

namespace cppcoro
{
  class io_service
  {
  public:

    class schedule_operation;
    class timed_schedule_operation;

    io_service();
    io_service(std::uint32_t concurrencyHint);

    io_service(io_service&&) = delete;
    io_service(const io_service&) = delete;
    io_service& operator=(io_service&&) = delete;
    io_service& operator=(const io_service&) = delete;

    ~io_service();

    // Scheduler methods

    [[nodiscard]]
    schedule_operation schedule() noexcept;

    template<typename REP, typename RATIO>
    [[nodiscard]]
    timed_schedule_operation schedule_after(
      std::chrono::duration<REP, RATIO> delay,
      cppcoro::cancellation_token cancellationToken = {}) noexcept;

    // Event-loop methods
    //
    // I/O threads must call these to process I/O events and execute
    // scheduled coroutines.

    std::uint64_t process_events();
    std::uint64_t process_pending_events();
    std::uint64_t process_one_event();
    std::uint64_t process_one_pending_event();

    // Request that all threads processing events exit their event loops.
    void stop() noexcept;

    // Query if some thread has called stop()
    bool is_stop_requested() const noexcept;

    // Reset the event-loop after a call to stop() so that threads can
    // start processing events again.
    void reset();

    // Reference-counting methods for tracking outstanding references
    // to the io_service.
    //
    // The io_service::stop() method will be called when the last work
    // reference is decremented.
    //
    // Use the io_work_scope RAII class to manage calling these methods on
    // entry-to and exit-from a scope.
    void notify_work_started() noexcept;
    void notify_work_finished() noexcept;

  };

  class io_service::schedule_operation
  {
  public:
    schedule_operation(const schedule_operation&) noexcept;
    schedule_operation& operator=(const schedule_operation&) noexcept;

    bool await_ready() const noexcept;
    void await_suspend(std::experimental::coroutine_handle<> awaiter) noexcept;
    void await_resume() noexcept;
  };

  class io_service::timed_schedule_operation
  {
  public:
    timed_schedule_operation(timed_schedule_operation&&) noexcept;

    timed_schedule_operation(const timed_schedule_operation&) = delete;
    timed_schedule_operation& operator=(const timed_schedule_operation&) = delete;
    timed_schedule_operation& operator=(timed_schedule_operation&&) = delete;

    bool await_ready() const noexcept;
    void await_suspend(std::experimental::coroutine_handle<> awaiter);
    void await_resume();
  };

  class io_work_scope
  {
  public:

    io_work_scope(io_service& ioService) noexcept;

    io_work_scope(const io_work_scope& other) noexcept;
    io_work_scope(io_work_scope&& other) noexcept;

    ~io_work_scope();

    io_work_scope& operator=(const io_work_scope& other) noexcept;
    io_work_scope& operator=(io_work_scope&& other) noexcept;

    io_service& service() const noexcept;
  };

}

Example:

#include <cppcoro/task.hpp>
#include <cppcoro/sync_wait.hpp>
#include <cppcoro/when_all_ready.hpp>
#include <cppcoro/io_service.hpp>
#include <cppcoro/read_only_file.hpp>

#include <experimental/filesystem>
#include <memory>
#include <algorithm>
#include <iostream>

namespace fs = std::experimental::filesystem;

cppcoro::task<std::uint64_t> count_lines(cppcoro::io_service& ioService, fs::path path)
{
  auto file = cppcoro::read_only_file::open(ioService, path);

  constexpr size_t bufferSize = 4096;
  auto buffer = std::make_unique<std::uint8_t[]>(bufferSize);

  std::uint64_t newlineCount = 0;

  for (std::uint64_t offset = 0, fileSize = file.size(); offset < fileSize;)
  {
    const auto bytesToRead = static_cast<size_t>(
      std::min<std::uint64_t>(bufferSize, fileSize - offset));

    const auto bytesRead = co_await file.read(offset, buffer.get(), bytesToRead);

    newlineCount += std::count(buffer.get(), buffer.get() + bytesRead, '\n');

    offset += bytesRead;
  }

  co_return newlineCount;
}

cppcoro::task<> run(cppcoro::io_service& ioService)
{
  cppcoro::io_work_scope ioScope(ioService);

  auto lineCount = co_await count_lines(ioService, fs::path{"foo.txt"});

  std::cout << "foo.txt has " << lineCount << " lines." << std::endl;
}

cppcoro::task<> process_events(cppcoro::io_service& ioService)
{
  // Process events until the io_service is stopped.
  // i.e. when the last io_work_scope goes out of scope.
  ioService.process_events();
  co_return;
}

int main()
{
  cppcoro::io_service ioService;

  cppcoro::sync_wait(cppcoro::when_all_ready(
    run(ioService),
    process_events(ioService)));

  return 0;
}

io_service as a scheduler

The io_service class implements the interfaces for the Scheduler and DelayedScheduler concepts.

This allows a coroutine to suspend execution on the current thread and schedule itself for resumption on an I/O thread associated with a particular io_service object.

Example:

cppcoro::task<> do_something(cppcoro::io_service& ioService)
{
  using namespace std::chrono_literals; // for the 100ms literal used below

  // Coroutine starts execution on the thread of the task awaiter.

  // A coroutine can transfer execution to an I/O thread by awaiting the
  // result of io_service::schedule().
  co_await ioService.schedule();

  // At this point, the coroutine is now executing on an I/O thread
  // inside a call to one of the io_service event processing methods.

  // A coroutine can also perform a delayed-schedule that will suspend
  // the coroutine for a specified duration of time before scheduling
  // it for resumption on an I/O thread.
  co_await ioService.schedule_after(100ms);

  // At this point, the coroutine is executing on a potentially different I/O thread.
}

file, readable_file, writable_file

These types are abstract base-classes for performing concrete file I/O.

API Summary:

namespace cppcoro
{
  class file_read_operation;
  class file_write_operation;

  class file
  {
  public:

    virtual ~file();

    std::uint64_t size() const;

  protected:

    file(file&& other) noexcept;

  };

  class readable_file : public virtual file
  {
  public:

    [[nodiscard]]
    file_read_operation read(
      std::uint64_t offset,
      void* buffer,
      std::size_t byteCount,
      cancellation_token ct = {}) const noexcept;

  };

  class writable_file : public virtual file
  {
  public:

    void set_size(std::uint64_t fileSize);

    [[nodiscard]]
    file_write_operation write(
      std::uint64_t offset,
      const void* buffer,
      std::size_t byteCount,
      cancellation_token ct = {}) noexcept;

  };

  class file_read_operation
  {
  public:

    file_read_operation(file_read_operation&& other) noexcept;

    bool await_ready() const noexcept;
    bool await_suspend(std::experimental::coroutine_handle<> awaiter);
    std::size_t await_resume();

  };

  class file_write_operation
  {
  public:

    file_write_operation(file_write_operation&& other) noexcept;

    bool await_ready() const noexcept;
    bool await_suspend(std::experimental::coroutine_handle<> awaiter);
    std::size_t await_resume();

  };
}

read_only_file, write_only_file, read_write_file

These types represent concrete file I/O classes.

API Summary:

namespace cppcoro
{
  class read_only_file : public readable_file
  {
  public:

    [[nodiscard]]
    static read_only_file open(
      io_service& ioService,
      const std::experimental::filesystem::path& path,
      file_share_mode shareMode = file_share_mode::read,
      file_buffering_mode bufferingMode = file_buffering_mode::default_);

  };

  class write_only_file : public writable_file
  {
  public:

    [[nodiscard]]
    static write_only_file open(
      io_service& ioService,
      const std::experimental::filesystem::path& path,
      file_open_mode openMode = file_open_mode::create_or_open,
      file_share_mode shareMode = file_share_mode::none,
      file_buffering_mode bufferingMode = file_buffering_mode::default_);

  };

  class read_write_file : public readable_file, public writable_file
  {
  public:

    [[nodiscard]]
    static read_write_file open(
      io_service& ioService,
      const std::experimental::filesystem::path& path,
      file_open_mode openMode = file_open_mode::create_or_open,
      file_share_mode shareMode = file_share_mode::none,
      file_buffering_mode bufferingMode = file_buffering_mode::default_);

  };
}

All open() functions throw std::system_error on failure.
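
A caller can therefore handle a failed open by catching std::system_error and inspecting its error code. The sketch below uses a hypothetical stand-in, `open_missing_file`, in place of a real open() call so that it runs without the library; the specific error code it throws is an assumption chosen for illustration.

```cpp
#include <cassert>
#include <string>
#include <system_error>

// Hypothetical stand-in for an open() call that fails. It reports the
// failure by throwing std::system_error.
inline void open_missing_file(const std::string& path)
{
  assert(!path.empty());
  throw std::system_error(
    std::make_error_code(std::errc::no_such_file_or_directory),
    "error opening '" + path + "'");
}

// Calls openFunc and returns the error code of a failed open,
// or a default-constructed (zero) code on success.
template<typename OPEN_FUNC>
std::error_code try_open(OPEN_FUNC&& openFunc)
{
  try
  {
    openFunc();
    return {};
  }
  catch (const std::system_error& e)
  {
    return e.code();
  }
}
```

Comparing the returned code against a `std::errc` value, as in `ec == std::errc::no_such_file_or_directory`, keeps the check portable across platforms.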

Networking

NOTE: Networking abstractions are currently only supported on the Windows platform. Linux support will be coming soon.

socket

The socket class can be used to send/receive data over the network asynchronously.

Currently only TCP/IP and UDP/IP over IPv4 and IPv6 are supported.

API Summary:

// <cppcoro/net/socket.hpp>
namespace cppcoro::net
{
  class socket
  {
  public:

    static socket create_tcpv4(io_service& ioSvc);
    static socket create_tcpv6(io_service& ioSvc);
    static socket create_udpv4(io_service& ioSvc);
    static socket create_udpv6(io_service& ioSvc);

    socket(socket&& other) noexcept;

    ~socket();

    socket& operator=(socket&& other) noexcept;

    // Return the native socket handle for the socket
    <platform-specific> native_handle() noexcept;

    const ip_endpoint& local_endpoint() const noexcept;
    const ip_endpoint& remote_endpoint() const noexcept;

    void bind(const ip_endpoint& localEndPoint);

    void listen();

    [[nodiscard]]
    Awaitable<void> connect(const ip_endpoint& remoteEndPoint) noexcept;
    [[nodiscard]]
    Awaitable<void> connect(const ip_endpoint& remoteEndPoint,
                            cancellation_token ct) noexcept;

    [[nodiscard]]
    Awaitable<void> accept(socket& acceptingSocket) noexcept;
    [[nodiscard]]
    Awaitable<void> accept(socket& acceptingSocket,
                           cancellation_token ct) noexcept;

    [[nodiscard]]
    Awaitable<void> disconnect() noexcept;
    [[nodiscard]]
    Awaitable<void> disconnect(cancellation_token ct) noexcept;

    [[nodiscard]]
    Awaitable<std::size_t> send(const void* buffer, std::size_t size) noexcept;
    [[nodiscard]]
    Awaitable<std::size_t> send(const void* buffer,
                                std::size_t size,
                                cancellation_token ct) noexcept;

    [[nodiscard]]
    Awaitable<std::size_t> recv(void* buffer, std::size_t size) noexcept;
    [[nodiscard]]
    Awaitable<std::size_t> recv(void* buffer,
                                std::size_t size,
                                cancellation_token ct) noexcept;

    [[nodiscard]]
    socket_recv_from_operation recv_from(
        void* buffer,
        std::size_t size) noexcept;
    [[nodiscard]]
    socket_recv_from_operation_cancellable recv_from(
        void* buffer,
        std::size_t size,
        cancellation_token ct) noexcept;

    [[nodiscard]]
    socket_send_to_operation send_to(
        const ip_endpoint& destination,
        const void* buffer,
        std::size_t size) noexcept;
    [[nodiscard]]
    socket_send_to_operation_cancellable send_to(
        const ip_endpoint& destination,
        const void* buffer,
        std::size_t size,
        cancellation_token ct) noexcept;

    void close_send();
    void close_recv();

  };
}

Example: Echo Server

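#include <cppcoro/net/socket.hpp>
#include <cppcoro/net/ipv4_endpoint.hpp>
#include <cppcoro/io_service.hpp>
#include <cppcoro/cancellation_source.hpp>
#include <cppcoro/async_scope.hpp>
#include <cppcoro/on_scope_exit.hpp>
#include <cppcoro/sync_wait.hpp>
#include <cppcoro/task.hpp>
#include <cppcoro/when_all.hpp>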

#include <memory>
#include <iostream>

cppcoro::task<void> handle_connection(cppcoro::net::socket s)
{
  try
  {
    const size_t bufferSize = 16384;
    auto buffer = std::make_unique<unsigned char[]>(bufferSize);
    size_t bytesRead;
    do {
      // Read some bytes
      bytesRead = co_await s.recv(buffer.get(), bufferSize);

      // Write some bytes
      size_t bytesWritten = 0;
      while (bytesWritten < bytesRead) {
        bytesWritten += co_await s.send(
          buffer.get() + bytesWritten,
          bytesRead - bytesWritten);
      }
    } while (bytesRead != 0);

    s.close_send();

    co_await s.disconnect();
  }
  catch (...)
  {
    std::cout << "connection failed" << std::endl;
  }
}

cppcoro::task<void> echo_server(
  cppcoro::net::ipv4_endpoint endpoint,
  cppcoro::io_service& ioSvc,
  cppcoro::cancellation_token ct)
{
  cppcoro::async_scope scope;

  std::exception_ptr ex;
  try
  {
    auto listeningSocket = cppcoro::net::socket::create_tcpv4(ioSvc);
    listeningSocket.bind(endpoint);
    listeningSocket.listen();

    while (true) {
      auto connection = cppcoro::net::socket::create_tcpv4(ioSvc);
      co_await listeningSocket.accept(connection, ct);
      scope.spawn(handle_connection(std::move(connection)));
    }
  }
  catch (const cppcoro::operation_cancelled&)
  {
  }
  catch (...)
  {
    ex = std::current_exception();
  }

  // Wait until all handle_connection tasks have finished.
  co_await scope.join();

  if (ex) std::rethrow_exception(ex);
}

int main(int argc, const char* argv[])
{
    cppcoro::io_service ioSvc;

    if (argc != 2) return -1;

    auto endpoint = cppcoro::net::ipv4_endpoint::from_string(argv[1]);
    if (!endpoint) return -1;

    (void)cppcoro::sync_wait(cppcoro::when_all(
        [&]() -> cppcoro::task<>
        {
            // Shutdown the event loop once finished.
            auto stopOnExit = cppcoro::on_scope_exit([&] { ioSvc.stop(); });

            cppcoro::cancellation_source canceller;
            co_await cppcoro::when_all(
                [&]() -> cppcoro::task<>
                {
                    // Run for 30s then stop accepting new connections.
                    co_await ioSvc.schedule_after(std::chrono::seconds(30));
                    canceller.request_cancellation();
                }(),
                echo_server(*endpoint, ioSvc, canceller.token()));
        }(),
        [&]() -> cppcoro::task<>
        {
            // Process events until the io_service is stopped.
            ioSvc.process_events();
            co_return;
        }()));

    return 0;
}

ip_address, ipv4_address, ipv6_address

Helper classes for representing an IP address.

API Synopsis:

namespace cppcoro::net
{
  class ipv4_address
  {
    using bytes_t = std::uint8_t[4];
  public:
    constexpr ipv4_address();
    explicit constexpr ipv4_address(std::uint32_t integer);
    explicit constexpr ipv4_address(const std::uint8_t(&bytes)[4]);
    explicit constexpr ipv4_address(std::uint8_t b0,
                                    std::uint8_t b1,
                                    std::uint8_t b2,
                                    std::uint8_t b3);

    constexpr const bytes_t& bytes() const;

    constexpr std::uint32_t to_integer() const;

    static constexpr ipv4_address loopback();

    constexpr bool is_loopback() const;
    constexpr bool is_private_network() const;

    constexpr bool operator==(ipv4_address other) const;
    constexpr bool operator!=(ipv4_address other) const;
    constexpr bool operator<(ipv4_address other) const;
    constexpr bool operator>(ipv4_address other) const;
    constexpr bool operator<=(ipv4_address other) const;
    constexpr bool operator>=(ipv4_address other) const;

    std::string to_string() const;

    static std::optional<ipv4_address> from_string(std::string_view string) noexcept;
  };

  class ipv6_address
  {
    using bytes_t = std::uint8_t[16];
  public:
    constexpr ipv6_address();

    explicit constexpr ipv6_address(
      std::uint64_t subnetPrefix,
      std::uint64_t interfaceIdentifier);

    constexpr ipv6_address(
      std::uint16_t part0,
      std::uint16_t part1,
      std::uint16_t part2,
      std::uint16_t part3,
      std::uint16_t part4,
      std::uint16_t part5,
      std::uint16_t part6,
      std::uint16_t part7);

    explicit constexpr ipv6_address(
        const std::uint16_t(&parts)[8]);

    explicit constexpr ipv6_address(
        const std::uint8_t(&bytes)[16]);

    constexpr const bytes_t& bytes() const;

    constexpr std::uint64_t subnet_prefix() const;
    constexpr std::uint64_t interface_identifier() const;

    static constexpr ipv6_address unspecified();
    static constexpr ipv6_address loopback();

    static std::optional<ipv6_address> from_string(std::string_view string) noexcept;

    std::string to_string() const;

    constexpr bool operator==(const ipv6_address& other) const;
    constexpr bool operator!=(const ipv6_address& other) const;
    constexpr bool operator<(const ipv6_address& other) const;
    constexpr bool operator>(const ipv6_address& other) const;
    constexpr bool operator<=(const ipv6_address& other) const;
    constexpr bool operator>=(const ipv6_address& other) const;

  };

  class ip_address
  {
  public:

    // Constructs the IPv4 address 0.0.0.0
    ip_address() noexcept;

    ip_address(ipv4_address address) noexcept;
    ip_address(ipv6_address address) noexcept;

    bool is_ipv4() const noexcept;
    bool is_ipv6() const noexcept;

    const ipv4_address& to_ipv4() const;
    const ipv6_address& to_ipv6() const;

    const std::uint8_t* bytes() const noexcept;

    std::string to_string() const;

    static std::optional<ip_address> from_string(std::string_view string) noexcept;

    bool operator==(const ip_address& rhs) const noexcept;
    bool operator!=(const ip_address& rhs) const noexcept;

    //  ipv4_address sorts less than ipv6_address
    bool operator<(const ip_address& rhs) const noexcept;
    bool operator>(const ip_address& rhs) const noexcept;
    bool operator<=(const ip_address& rhs) const noexcept;
    bool operator>=(const ip_address& rhs) const noexcept;

  };
}
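
To make the byte layout and the range queries concrete, here is a small sketch of an IPv4 address stored as four bytes. `toy_ipv4` is a hypothetical type, not the library's ipv4_address, and the exact ranges cppcoro treats as loopback or private are assumed here to be 127.0.0.0/8 and the RFC 1918 blocks.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical model of an IPv4 address stored as four bytes,
// most significant byte first.
struct toy_ipv4
{
  std::uint8_t b[4];

  // Pack the four bytes into a 32-bit integer, b[0] in the top byte.
  constexpr std::uint32_t to_integer() const
  {
    return (std::uint32_t(b[0]) << 24) | (std::uint32_t(b[1]) << 16) |
           (std::uint32_t(b[2]) << 8) | std::uint32_t(b[3]);
  }

  // Assumed range: 127.0.0.0/8
  constexpr bool is_loopback() const { return b[0] == 127; }

  // Assumed ranges (RFC 1918): 10.0.0.0/8, 172.16.0.0/12, 192.168.0.0/16
  constexpr bool is_private_network() const
  {
    return b[0] == 10 ||
           (b[0] == 172 && (b[1] & 0xF0) == 16) ||
           (b[0] == 192 && b[1] == 168);
  }
};
```

Under these assumptions 127.0.0.1 packs to 0x7F000001, and 172.31.0.1 is private while 172.32.0.1 is not, because only the top four bits of the second byte are fixed by the /12 prefix.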

ip_endpoint, ipv4_endpoint, ipv6_endpoint

Helper classes for representing an IP address and port-number.

API Synopsis:

namespace cppcoro::net
{
  class ipv4_endpoint
  {
  public:
    ipv4_endpoint() noexcept;
    explicit ipv4_endpoint(ipv4_address address, std::uint16_t port = 0) noexcept;

    const ipv4_address& address() const noexcept;
    std::uint16_t port() const noexcept;

    std::string to_string() const;
    static std::optional<ipv4_endpoint> from_string(std::string_view string) noexcept;
  };

  bool operator==(const ipv4_endpoint& a, const ipv4_endpoint& b);
  bool operator!=(const ipv4_endpoint& a, const ipv4_endpoint& b);
  bool operator<(const ipv4_endpoint& a, const ipv4_endpoint& b);
  bool operator>(const ipv4_endpoint& a, const ipv4_endpoint& b);
  bool operator<=(const ipv4_endpoint& a, const ipv4_endpoint& b);
  bool operator>=(const ipv4_endpoint& a, const ipv4_endpoint& b);

  class ipv6_endpoint
  {
  public:
    ipv6_endpoint() noexcept;
    explicit ipv6_endpoint(ipv6_address address, std::uint16_t port = 0) noexcept;

    const ipv6_address& address() const noexcept;
    std::uint16_t port() const noexcept;

    std::string to_string() const;
    static std::optional<ipv6_endpoint> from_string(std::string_view string) noexcept;
  };

  bool operator==(const ipv6_endpoint& a, const ipv6_endpoint& b);
  bool operator!=(const ipv6_endpoint& a, const ipv6_endpoint& b);
  bool operator<(const ipv6_endpoint& a, const ipv6_endpoint& b);
  bool operator>(const ipv6_endpoint& a, const ipv6_endpoint& b);
  bool operator<=(const ipv6_endpoint& a, const ipv6_endpoint& b);
  bool operator>=(const ipv6_endpoint& a, const ipv6_endpoint& b);

  class ip_endpoint
  {
  public:
     // Constructs to IPv4 end-point 0.0.0.0:0
     ip_endpoint() noexcept;

     ip_endpoint(ipv4_endpoint endpoint) noexcept;
     ip_endpoint(ipv6_endpoint endpoint) noexcept;

     bool is_ipv4() const noexcept;
     bool is_ipv6() const noexcept;

     const ipv4_endpoint& to_ipv4() const;
     const ipv6_endpoint& to_ipv6() const;

     ip_address address() const noexcept;
     std::uint16_t port() const noexcept;

     std::string to_string() const;

     static std::optional<ip_endpoint> from_string(std::string_view string) noexcept;

     bool operator==(const ip_endpoint& rhs) const noexcept;
     bool operator!=(const ip_endpoint& rhs) const noexcept;

     //  ipv4_endpoint sorts less than ipv6_endpoint
     bool operator<(const ip_endpoint& rhs) const noexcept;
     bool operator>(const ip_endpoint& rhs) const noexcept;
     bool operator<=(const ip_endpoint& rhs) const noexcept;
     bool operator>=(const ip_endpoint& rhs) const noexcept;
  };
}
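The ordering noted in the synopsis above (IPv4 end-points sort before IPv6 end-points) can be pictured with a small standalone sketch. It does not use cppcoro: v4_endpoint, v6_endpoint, any_endpoint and sorts_before are hypothetical names, and ordering by address and then port within one family is an assumption rather than something the synopsis states. The sketch relies on std::variant comparing the alternative index before the held value.

```cpp
#include <array>
#include <cstdint>
#include <tuple>
#include <variant>

// Hypothetical stand-ins for ipv4_endpoint / ipv6_endpoint (not cppcoro's types).
struct v4_endpoint
{
  std::array<std::uint8_t, 4> address{};
  std::uint16_t port = 0;
};

struct v6_endpoint
{
  std::array<std::uint8_t, 16> address{};
  std::uint16_t port = 0;
};

// Assumed ordering within one family: by address first, then by port.
inline bool operator<(const v4_endpoint& a, const v4_endpoint& b)
{
  return std::tie(a.address, a.port) < std::tie(b.address, b.port);
}

inline bool operator<(const v6_endpoint& a, const v6_endpoint& b)
{
  return std::tie(a.address, a.port) < std::tie(b.address, b.port);
}

// std::variant compares the alternative index before the held value, so every
// v4_endpoint (index 0) sorts before every v6_endpoint (index 1).
using any_endpoint = std::variant<v4_endpoint, v6_endpoint>;

inline bool sorts_before(const any_endpoint& a, const any_endpoint& b)
{
  return a < b;
}
```

With this layout, even the largest IPv4 end-point (255.255.255.255:65535) compares less than the smallest IPv6 end-point, which makes a mixed collection of end-points usable as keys in ordered containers.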

Functions

sync_wait()

The sync_wait() function can be used to synchronously wait until the specified awaitable completes.

The specified awaitable will be co_awaited on the current thread inside a newly created coroutine.

The sync_wait() call will block until the operation completes and will return the result of the co_await expression or rethrow the exception if the co_await expression completed with an unhandled exception.

The sync_wait() function is mostly useful for starting a top-level task from within main() and waiting until the task finishes. In practice it is the only way to start the first/top-level task.

API Summary:

// <cppcoro/sync_wait.hpp>
namespace cppcoro
{
  template<typename AWAITABLE>
  auto sync_wait(AWAITABLE&& awaitable)
    -> typename awaitable_traits<AWAITABLE&&>::await_result_t;
}

Examples:

void example_task()
{
  auto makeTask = []() -> task<std::string>
  {
    co_return "foo";
  };

  auto task = makeTask();

  // start the lazy task and wait until it completes
  sync_wait(task); // -> "foo"
  sync_wait(makeTask()); // -> "foo"
}

void example_shared_task()
{
  auto makeTask = []() -> shared_task<std::string>
  {
    co_return "foo";
  };

  auto task = makeTask();
  // start the shared task and wait until it completes
  sync_wait(task) == "foo";
  sync_wait(makeTask()) == "foo";
}

when_all_ready()

The when_all_ready() function can be used to create a new awaitable that completes when all of the input awaitables complete.

Input tasks can be any type of awaitable.

When the returned awaitable is co_awaited it will co_await each of the input awaitables in turn on the awaiting thread in the order they are passed to the when_all_ready() function. If these tasks do not complete synchronously then they will execute concurrently.

Once all of the co_await expressions on input awaitables have run to completion the returned awaitable will complete and resume the awaiting coroutine. The awaiting coroutine will be resumed on the thread of the input awaitable that is last to complete.

The returned awaitable is guaranteed not to throw an exception when co_awaited, even if some of the input awaitables fail with an unhandled exception.

Note, however, that the when_all_ready() call itself may throw std::bad_alloc if it was unable to allocate memory for the coroutine frames required to await each of the input awaitables. It may also throw an exception if any of the input awaitable objects throw from their copy/move constructors.

The result of co_awaiting the returned awaitable is a std::tuple or std::vector of when_all_task<RESULT> objects. These objects allow you to obtain the result (or exception) of each input awaitable separately by calling the when_all_task<RESULT>::result() method of the corresponding output task. This allows the caller to concurrently await multiple awaitables and synchronize on their completion while still retaining the ability to subsequently inspect the results of each of the co_await operations for success/failure.

This differs from when_all() where the failure of any individual co_await operation causes the overall operation to fail with an exception. This means you cannot determine which of the component co_await operations failed and also prevents you from obtaining the results of the other co_await operations.
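The difference in how results and failures are reported can be sketched without coroutines. The following is a synchronous stand-in, not cppcoro code: outcome, run_all_ready and run_all are hypothetical names, and the operations run one after another rather than concurrently. It only illustrates the two reporting styles, where one keeps a separate value-or-exception per operation and the other collapses everything into a single aggregate that fails as a whole.

```cpp
#include <exception>
#include <functional>
#include <optional>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for when_all_task<T>: stores either a value or the
// exception that the operation failed with.
template<typename T>
class outcome
{
public:
  explicit outcome(const std::function<T()>& op) noexcept
  {
    try { m_value.emplace(op()); }
    catch (...) { m_error = std::current_exception(); }
  }

  // Returns the value, or rethrows the captured exception.
  T& result()
  {
    if (m_error) std::rethrow_exception(m_error);
    return *m_value;
  }

private:
  std::optional<T> m_value;
  std::exception_ptr m_error;
};

// "all ready" style: a failed operation never makes this call throw; each
// outcome is inspected separately by the caller.
template<typename T>
std::vector<outcome<T>> run_all_ready(const std::vector<std::function<T()>>& ops)
{
  std::vector<outcome<T>> outcomes;
  outcomes.reserve(ops.size());
  for (const auto& op : ops) outcomes.emplace_back(op);
  return outcomes;
}

// "all" style: runs everything, then either returns every value or rethrows
// the first failure it encounters, discarding the other results.
template<typename T>
std::vector<T> run_all(const std::vector<std::function<T()>>& ops)
{
  std::vector<T> values;
  for (auto& o : run_all_ready(ops)) values.push_back(std::move(o.result()));
  return values;
}
```

Given three operations where the second throws, run_all_ready() still lets the caller read the first and third results, whereas run_all() surfaces only the exception.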

API summary:

// <cppcoro/when_all_ready.hpp>
namespace cppcoro
{
  // Concurrently await multiple awaitables.
  //
  // Returns an awaitable object that, when co_await'ed, will co_await each of the input
  // awaitable objects and will resume the awaiting coroutine only when all of the
  // component co_await operations complete.
  //
  // Result of co_await'ing the returned awaitable is a std::tuple of detail::when_all_task<T>,
  // one for each input awaitable and where T is the result-type of the co_await expression
  // on the corresponding awaitable.
  //
  // AWAITABLES must be awaitable types and must be movable (if passed as rvalue) or copyable
  // (if passed as lvalue). The co_await expression will be executed on an rvalue of the
  // copied awaitable.
  template<typename... AWAITABLES>
  auto when_all_ready(AWAITABLES&&... awaitables)
    -> Awaitable<std::tuple<detail::when_all_task<typename awaitable_traits<AWAITABLES>::await_result_t>...>>;

  // Concurrently await each awaitable in a vector of input awaitables.
  template<
    typename AWAITABLE,
    typename RESULT = typename awaitable_traits<AWAITABLE>::await_result_t>
  auto when_all_ready(std::vector<AWAITABLE> awaitables)
    -> Awaitable<std::vector<detail::when_all_task<RESULT>>>;
}

Example usage:

task<std::string> get_record(int id);

task<> example1()
{
  // Run 3 get_record() operations concurrently and wait until they're all ready.
  // Returns a std::tuple of tasks that can be unpacked using structured bindings.
  auto [task1, task2, task3] = co_await when_all_ready(
    get_record(123),
    get_record(456),
    get_record(789));

  // Unpack the result of each task
  std::string& record1 = task1.result();
  std::string& record2 = task2.result();
  std::string& record3 = task3.result();

  // Use records....
}

task<> example2()
{
  // Create the input tasks. They don't start executing yet.
  std::vector<task<std::string>> tasks;
  for (int i = 0; i < 1000; ++i)
  {
    tasks.emplace_back(get_record(i));
  }

  // Execute all tasks concurrently.
  std::vector<detail::when_all_task<std::string>> resultTasks =
    co_await when_all_ready(std::move(tasks));

  // Unpack and handle each result individually once they're all complete.
  for (int i = 0; i < 1000; ++i)
  {
    try
    {
      std::string& record = resultTasks[i].result();
      std::cout << i << " = " << record << std::endl;
    }
    catch (const std::exception& ex)
    {
      std::cout << i << " : " << ex.what() << std::endl;
    }
  }
}

when_all()

The when_all() function can be used to create a new Awaitable that when co_awaited will co_await each of the input awaitables concurrently and return an aggregate of their individual results.

When the returned awaitable is awaited, it will co_await each of the input awaitables on the current thread. Once the first awaitable suspends, the second task will be started, and so on. The operations execute concurrently until they have all run to completion.

Once all component co_await operations have run to completion, an aggregate of the results is constructed from each individual result. If an exception is thrown by any of the input tasks or if the construction of the aggregate result throws an exception then the exception will propagate out of the co_await of the returned awaitable.

If multiple co_await operations fail with an exception then one of the exceptions will propagate out of the co_await when_all() expression and the other exceptions will be silently ignored. It is not specified which operation's exception will be chosen.

If it is important to know which component co_await operation failed or to retain the ability to obtain results of other operations even if some of them fail then you should use when_all_ready() instead.

API Summary:

// <cppcoro/when_all.hpp>
namespace cppcoro
{
  // Variadic version.
  //
  // Note that if the result of `co_await awaitable` yields a void-type
  // for some awaitables then the corresponding component for that awaitable
  // in the tuple will be an empty struct of type detail::void_value.
  template<typename... AWAITABLES>
  auto when_all(AWAITABLES&&... awaitables)
    -> Awaitable<std::tuple<typename awaitable_traits<AWAITABLES>::await_result_t...>>;

  // Overload for vector<Awaitable<void>>.
  template<
    typename AWAITABLE,
    typename RESULT = typename awaitable_traits<AWAITABLE>::await_result_t,
    std::enable_if_t<std::is_void_v<RESULT>, int> = 0>
  auto when_all(std::vector<AWAITABLE> awaitables)
    -> Awaitable<void>;

  // Overload for vector<Awaitable<NonVoid>> that yield a value when awaited.
  template<
    typename AWAITABLE,
    typename RESULT = typename awaitable_traits<AWAITABLE>::await_result_t,
    std::enable_if_t<!std::is_void_v<RESULT>, int> = 0>
  auto when_all(std::vector<AWAITABLE> awaitables)
    -> Awaitable<std::vector<std::conditional_t<
         std::is_lvalue_reference_v<RESULT>,
         std::reference_wrapper<std::remove_reference_t<RESULT>>,
         std::remove_reference_t<RESULT>>>>;
}

Examples:

task<A> get_a();
task<B> get_b();

task<> example1()
{
  // Run get_a() and get_b() concurrently.
  // Task yields a std::tuple<A, B> which can be unpacked using structured bindings.
  auto [a, b] = co_await when_all(get_a(), get_b());

  // use a, b
}

task<std::string> get_record(int id);

task<> example2()
{
  std::vector<task<std::string>> tasks;
  for (int i = 0; i < 1000; ++i)
  {
    tasks.emplace_back(get_record(i));
  }

  // Concurrently execute all get_record() tasks.
  // If any of them fail with an exception then the exception will propagate
  // out of the co_await expression once they have all completed.
  std::vector<std::string> records = co_await when_all(std::move(tasks));

  // Process results
  for (int i = 0; i < 1000; ++i)
  {
    std::cout << i << " = " << records[i] << std::endl;
  }
}

fmap()

The fmap() function can be used to apply a callable function to the value(s) contained within a container-type, returning a new container-type of the results of applying the function to the contained value(s).

The fmap() function can apply a function to values of type generator<T>, recursive_generator<T> and async_generator<T> as well as any value that supports the Awaitable concept (eg. task<T>).

Each of these types provides an overload for fmap() that takes two arguments; a function to apply and the container value. See documentation for each type for the supported fmap() overloads.

For example, the fmap() function can be used to apply a function to the eventual result of a task<T>, producing a new task<U> that will complete with the return-value of the function.

// Given a function you want to apply that converts
// a value of type A to value of type B.
B a_to_b(A value);

// And a task that yields a value of type A
cppcoro::task<A> get_an_a();

// We can apply the function to the result of the task using fmap()
// and obtain a new task yielding the result.
cppcoro::task<B> bTask = fmap(a_to_b, get_an_a());

// An alternative syntax is to use the pipe notation.
cppcoro::task<B> bTask = get_an_a() | cppcoro::fmap(a_to_b);

API Summary:

// <cppcoro/fmap.hpp>
namespace cppcoro
{
  template<typename FUNC>
  struct fmap_transform
  {
    fmap_transform(FUNC&& func) noexcept(std::is_nothrow_move_constructible_v<FUNC>);
    FUNC func;
  };

  // Type-deducing constructor for fmap_transform object that can be used
  // in conjunction with operator|.
  template<typename FUNC>
  fmap_transform<FUNC> fmap(FUNC&& func);

  // operator| overloads for providing pipe-based syntactic sugar for fmap()
  // such that the expression:
  //   <value-expr> | cppcoro::fmap(<func-expr>)
  // is equivalent to:
  //   fmap(<func-expr>, <value-expr>)

  template<typename T, typename FUNC>
  decltype(auto) operator|(T&& value, fmap_transform<FUNC>&& transform);

  template<typename T, typename FUNC>
  decltype(auto) operator|(T&& value, fmap_transform<FUNC>& transform);

  template<typename T, typename FUNC>
  decltype(auto) operator|(T&& value, const fmap_transform<FUNC>& transform);

  // Generic overload for all awaitable types.
  //
  // Returns an awaitable that when co_awaited, co_awaits the specified awaitable
  // and applies the specified func to the result of the 'co_await awaitable'
  // expression as if by 'std::invoke(func, co_await awaitable)'.
  //
  // If the type of 'co_await awaitable' expression is 'void' then co_awaiting the
  // returned awaitable is equivalent to 'co_await awaitable, func()'.
  template<
    typename FUNC,
    typename AWAITABLE,
    std::enable_if_t<is_awaitable_v<AWAITABLE>, int> = 0>
  auto fmap(FUNC&& func, AWAITABLE&& awaitable)
    -> Awaitable<std::invoke_result_t<FUNC, typename awaitable_traits<AWAITABLE>::await_result_t>>;
}

The fmap() function is designed to look up the correct overload by argument-dependent lookup (ADL) so it should generally be called without the cppcoro:: prefix.
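The relationship between the two-argument form, the pipe form and the unqualified call can be sketched with std::optional<T> as a stand-in container. This is a minimal illustration under stated assumptions, not cppcoro's implementation: the namespace sketch and the std::optional overload are hypothetical, and cppcoro itself only provides overloads for its own generator and awaitable types.

```cpp
#include <optional>
#include <type_traits>
#include <utility>

namespace sketch
{
  // Wraps the function so that it can appear on the right-hand side of operator|.
  template<typename FUNC>
  struct fmap_transform
  {
    FUNC func;
  };

  // One-argument form: builds the transform object for use with operator|.
  template<typename FUNC>
  fmap_transform<std::decay_t<FUNC>> fmap(FUNC&& func)
  {
    return { std::forward<FUNC>(func) };
  }

  // Two-argument form for std::optional<T> (hypothetical stand-in container):
  // applies func to the contained value, or propagates the empty state.
  template<typename FUNC, typename T>
  auto fmap(FUNC&& func, const std::optional<T>& value)
    -> std::optional<std::invoke_result_t<FUNC&, const T&>>
  {
    if (!value) return std::nullopt;
    return func(*value);
  }

  // value | fmap(func) forwards to fmap(func, value).
  template<typename T, typename FUNC>
  decltype(auto) operator|(T&& value, const fmap_transform<FUNC>& transform)
  {
    // Unqualified call, so overloads in the value's own namespace can also be
    // found by argument-dependent lookup.
    return fmap(transform.func, std::forward<T>(value));
  }
}
```

Because operator| lives in the same namespace as fmap_transform, an expression such as std::optional<int>{41} | sketch::fmap(f) finds it through ADL on the right-hand operand, and the unqualified fmap() call inside it is what lets each container type supply its own overload.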

resume_on()

The resume_on() function can be used to control the execution context that an awaitable will resume the awaiting coroutine on when awaited. When applied to an async_generator it controls which execution context the co_await g.begin() and co_await ++it operations resume the awaiting coroutines on.

Normally, the awaiting coroutine of an awaitable (eg. a task) or async_generator will resume execution on whatever thread the operation completed on. In some cases this may not be the thread that you want to continue executing on. In these cases you can use the resume_on() function to create a new awaitable or generator that will resume execution on a thread associated with a specified scheduler.

The resume_on() function can be used either as a normal function returning a new awaitable/generator, or in a pipeline syntax.

Example:

task<record> load_record(int id);

ui_thread_scheduler uiThreadScheduler;

task<> example()
{
  // This will start load_record() on the current thread.
  // Then when load_record() completes (probably on an I/O thread)
  // it will reschedule execution onto thread pool and call to_json
  // Once to_json completes it will transfer execution onto the
  // ui thread before resuming this coroutine and returning the json text.
  task<std::string> jsonTask =
    load_record(123)
    | cppcoro::resume_on(threadpool::default())
    | cppcoro::fmap(to_json)
    | cppcoro::resume_on(uiThreadScheduler);

  // At this point, all we've done is create a pipeline of tasks.
  // The tasks haven't started executing yet.

  // Await the result. Starts the pipeline of tasks.
  std::string jsonText = co_await jsonTask;

  // Guaranteed to be executing on ui thread here.

  someUiControl.set_text(jsonText);
}

API Summary:

// <cppcoro/resume_on.hpp>
namespace cppcoro
{
  template<typename SCHEDULER, typename AWAITABLE>
  auto resume_on(SCHEDULER& scheduler, AWAITABLE awaitable)
    -> Awaitable<typename awaitable_traits<AWAITABLE>::await_result_t>;

  template<typename SCHEDULER, typename T>
  async_generator<T> resume_on(SCHEDULER& scheduler, async_generator<T> source);

  template<typename SCHEDULER>
  struct resume_on_transform
  {
    explicit resume_on_transform(SCHEDULER& scheduler) noexcept;
    SCHEDULER& scheduler;
  };

  // Construct a transform/operation that can be applied to a source object
  // using "pipe" notation (ie. operator|).
  template<typename SCHEDULER>
  resume_on_transform<SCHEDULER> resume_on(SCHEDULER& scheduler) noexcept;

  // Equivalent to 'resume_on(transform.scheduler, std::forward<T>(value))'
  template<typename T, typename SCHEDULER>
  decltype(auto) operator|(T&& value, resume_on_transform<SCHEDULER> transform)
  {
    return resume_on(transform.scheduler, std::forward<T>(value));
  }
}

schedule_on()

The schedule_on() function can be used to change the execution context that a given awaitable or async_generator starts executing on.

When applied to an async_generator it also affects which execution context it resumes on after a co_yield statement.

Note that the schedule_on transform does not specify the thread that the awaitable or async_generator will complete or yield results on; that is up to the implementation of the awaitable or generator.

See the resume_on() operator for a transform that controls the thread the operation completes on.

For example:

task<int> get_value();
io_service ioSvc;

task<> example()
{
  // Starts executing get_value() on the current thread.
  int a = co_await get_value();

  // Starts executing get_value() on a thread associated with ioSvc.
  int b = co_await schedule_on(ioSvc, get_value());
}

API Summary:

// <cppcoro/schedule_on.hpp>
namespace cppcoro
{
  // Return a task that yields the same result as 't' but that
  // ensures that 't' is co_await'ed on a thread associated with
  // the specified scheduler. Resulting task will complete on
  // whatever thread 't' would normally complete on.
  template<typename SCHEDULER, typename AWAITABLE>
  auto schedule_on(SCHEDULER& scheduler, AWAITABLE awaitable)
    -> Awaitable<typename awaitable_traits<AWAITABLE>::await_result_t>;

  // Return a generator that yields the same sequence of results as
  // 'source' but that ensures that execution of the coroutine starts
  // execution on a thread associated with 'scheduler' and resumes
  // after a 'co_yield' on a thread associated with 'scheduler'.
  template<typename SCHEDULER, typename T>
  async_generator<T> schedule_on(SCHEDULER& scheduler, async_generator<T> source);

  template<typename SCHEDULER>
  struct schedule_on_transform
  {
    explicit schedule_on_transform(SCHEDULER& scheduler) noexcept;
    SCHEDULER& scheduler;
  };

  template<typename SCHEDULER>
  schedule_on_transform<SCHEDULER> schedule_on(SCHEDULER& scheduler) noexcept;

  template<typename T, typename SCHEDULER>
  decltype(auto) operator|(T&& value, schedule_on_transform<SCHEDULER> transform);
}

Metafunctions

awaitable_traits<T>

This template metafunction can be used to determine what the resulting type of a co_await expression will be if applied to an expression of type T.

Note that this assumes the value of type T is being awaited in a context where it is unaffected by any await_transform applied by the coroutine's promise object. The results may differ if a value of type T is awaited in such a context.

The awaitable_traits<T> template metafunction does not define the awaiter_t or await_result_t nested typedefs if the type T is not awaitable. This allows its use in SFINAE contexts that disable overloads when T is not awaitable.
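The SFINAE-friendly behaviour described above can be sketched with the std::void_t detection idiom. This is a deliberately simplified illustration, not cppcoro's implementation: awaiter_result, looks_like_awaiter and ready_value are hypothetical names, and the sketch checks only for await_ready() and await_resume() members. It ignores await_suspend() and operator co_await, which would require the coroutine header.

```cpp
#include <type_traits>
#include <utility>

// Primary template: deliberately empty, so naming `awaiter_result<T>::type`
// is a substitution failure (not a hard error) when T lacks the members.
template<typename T, typename = void>
struct awaiter_result {};

// Selected only when both member expressions are well-formed for T.
template<typename T>
struct awaiter_result<T, std::void_t<
  decltype(static_cast<bool>(std::declval<T&>().await_ready())),
  decltype(std::declval<T&>().await_resume())>>
{
  using type = decltype(std::declval<T&>().await_resume());
};

// Overload enabled only for types that expose the nested typedef.
template<typename T, typename = typename awaiter_result<T>::type>
constexpr bool looks_like_awaiter(int) { return true; }

// Fallback chosen when the overload above is removed by SFINAE.
template<typename T>
constexpr bool looks_like_awaiter(long) { return false; }

// A toy type with the two members the sketch checks for.
struct ready_value
{
  bool await_ready() const noexcept { return true; }
  int await_resume() const noexcept { return 42; }
};
```

Calling looks_like_awaiter<ready_value>(0) selects the first overload, while looks_like_awaiter<int>(0) silently falls back to the second, which is the same mechanism that lets overloads constrained on awaitable_traits<T>::await_result_t disappear for non-awaitable types.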

API Summary:

// <cppcoro/awaitable_traits.hpp>
namespace cppcoro
{
  template<typename T>
  struct awaitable_traits
  {
    // The type that results from applying `operator co_await()` to a value
    // of type T, if T supports an `operator co_await()`, otherwise is type `T&&`.
    using awaiter_t = <unspecified>;

    // The type of the result of co_await'ing a value of type T.
    using await_result_t = <unspecified>;
  };
}

is_awaitable<T>

The is_awaitable<T> template metafunction allows you to query whether or not a given type can be co_awaited from within a coroutine.

API Summary:

// <cppcoro/is_awaitable.hpp>
namespace cppcoro
{
  template<typename T>
  struct is_awaitable : std::bool_constant<...>
  {};

  template<typename T>
  constexpr bool is_awaitable_v = is_awaitable<T>::value;
}

Concepts

Awaitable<T> concept

An Awaitable<T> is a concept that indicates that a type can be co_awaited in a coroutine context that has no await_transform overloads and that the result of the co_await expression has type, T.

For example, the type task<T> implements the concept Awaitable<T&&> whereas the type task<T>& implements the concept Awaitable<T&>.

Awaiter<T> concept

An Awaiter<T> is a concept that indicates a type contains the await_ready, await_suspend and await_resume methods required to implement the protocol for suspending/resuming an awaiting coroutine.

A type that satisfies Awaiter<T> must have, for an instance of the type, awaiter:

  • awaiter.await_ready() -> bool
  • awaiter.await_suspend(std::experimental::coroutine_handle<void>{}) -> void or bool or std::experimental::coroutine_handle<P> for some P.
  • awaiter.await_resume() -> T

Any type that implements the Awaiter<T> concept also implements the Awaitable<T> concept.

Scheduler concept

A Scheduler is a concept that allows scheduling execution of coroutines within some execution context.

concept Scheduler
{
  Awaitable<void> schedule();
}

Given a type, S, that implements the Scheduler concept, and an instance, s, of type S:

  • The s.schedule() method returns an awaitable-type such that co_await s.schedule() will unconditionally suspend the current coroutine and schedule it for resumption on the execution context associated with the scheduler, s.
  • The result of the co_await s.schedule() expression has type void.

cppcoro::task<> f(Scheduler& scheduler)
{
  // Execution of the coroutine is initially on the caller's execution context.

  // Suspends execution of the coroutine and schedules it for resumption on
  // the scheduler's execution context.
  co_await scheduler.schedule();

  // At this point the coroutine is now executing on the scheduler's
  // execution context.
}

DelayedScheduler concept

A DelayedScheduler is a concept that allows a coroutine to schedule itself for execution on the scheduler's execution context after a specified duration of time has elapsed.

concept DelayedScheduler : Scheduler
{
  template<typename REP, typename RATIO>
  Awaitable<void> schedule_after(std::chrono::duration<REP, RATIO> delay);

  template<typename REP, typename RATIO>
  Awaitable<void> schedule_after(
    std::chrono::duration<REP, RATIO> delay,
    cppcoro::cancellation_token cancellationToken);
}

Given a type, S, that implements the DelayedScheduler concept, and an instance, s, of type S:

  • The s.schedule_after(delay) method returns an object that can be awaited such that co_await s.schedule_after(delay) suspends the current coroutine for a duration of delay before scheduling the coroutine for resumption on the execution context associated with the scheduler, s.
  • The co_await s.schedule_after(delay) expression has type void.

Building

The cppcoro library supports building under Windows with Visual Studio 2017 and Linux with Clang 5.0+.

This library makes use of the Cake build system (no, not the C# one).

The cake build system is checked out automatically as a git submodule so you don't need to download or install it separately.

Building on Windows

This library currently requires Visual Studio 2017 or later and the Windows 10 SDK.

Support for Clang (#3) and Linux (#15) is planned.

Prerequisites

The Cake build-system is implemented in Python and requires Python 2.7 to be installed.

Ensure Python 2.7 interpreter is in your PATH and available as 'python'.

Ensure Visual Studio 2017 Update 3 or later is installed. Note that there are some known issues with coroutines in Update 2 or earlier that have been fixed in Update 3.

You can also use an experimental version of the Visual Studio compiler by downloading a NuGet package from https://vcppdogfooding.azurewebsites.net/ and unzipping the .nuget file to a directory. Just update the config.cake file to point at the unzipped location by modifying and uncommenting the following line:

nugetPath = None # r'C:\Path\To\VisualCppTools.14.0.25224-Pre'

Ensure that you have the Windows 10 SDK installed. It will use the latest Windows 10 SDK and Universal C Runtime version by default.

Cloning the repository

The cppcoro repository makes use of git submodules to pull in the source for the Cake build system.

This means you need to pass the --recursive flag to the git clone command. eg.

c:\Code> git clone --recursive https://github.com/lewissbaker/cppcoro.git

If you have already cloned cppcoro, then you should update the submodules after pulling changes.

c:\Code\cppcoro> git submodule update --init --recursive

Building from the command-line

To build from the command-line just run 'cake.bat' in the workspace root.

eg.

C:\cppcoro> cake.bat
Building with C:\cppcoro\config.cake - Variant(release='debug', platform='windows', architecture='x86', compilerFamily='msvc', compiler='msvc14.10')
Building with C:\cppcoro\config.cake - Variant(release='optimised', platform='windows', architecture='x64', compilerFamily='msvc', compiler='msvc14.10')
Building with C:\cppcoro\config.cake - Variant(release='debug', platform='windows', architecture='x64', compilerFamily='msvc', compiler='msvc14.10')
Building with C:\cppcoro\config.cake - Variant(release='optimised', platform='windows', architecture='x86', compilerFamily='msvc', compiler='msvc14.10')
Compiling test\main.cpp
Compiling test\main.cpp
Compiling test\main.cpp
Compiling test\main.cpp
...
Linking build\windows_x86_msvc14.10_debug\test\run.exe
Linking build\windows_x64_msvc14.10_optimised\test\run.exe
Linking build\windows_x86_msvc14.10_optimised\test\run.exe
Linking build\windows_x64_msvc14.10_debug\test\run.exe
Generating code
Finished generating code
Generating code
Finished generating code
Build succeeded.
Build took 0:00:02.419.

By default, running cake with no arguments will build all projects with all build variants and execute the unit-tests. You can narrow what is built by passing additional command-line arguments. eg.

c:\cppcoro> cake.bat release=debug architecture=x64 lib/build.cake
Building with C:\Users\Lewis\Code\cppcoro\config.cake - Variant(release='debug', platform='windows', architecture='x64', compilerFamily='msvc', compiler='msvc14.10')
Archiving build\windows_x64_msvc14.10_debug\lib\cppcoro.lib
Build succeeded.
Build took 0:00:00.321.

You can run cake --help to list available command-line options.

Building Visual Studio project files

To develop from within Visual Studio you can build .vcxproj/.sln files by running cake.bat -p.

eg.

c:\cppcoro> cake.bat -p
Building with C:\cppcoro\config.cake - Variant(release='debug', platform='windows', architecture='x86', compilerFamily='msvc', compiler='msvc14.10')
Building with C:\cppcoro\config.cake - Variant(release='optimised', platform='windows', architecture='x64', compilerFamily='msvc', compiler='msvc14.10')
Building with C:\cppcoro\config.cake - Variant(release='debug', platform='windows', architecture='x64', compilerFamily='msvc', compiler='msvc14.10')
Building with C:\cppcoro\config.cake - Variant(release='optimised', platform='windows', architecture='x86', compilerFamily='msvc', compiler='msvc14.10')
Generating Solution build/project/cppcoro.sln
Generating Project build/project/cppcoro_tests.vcxproj
Generating Filters build/project/cppcoro_tests.vcxproj.filters
Generating Project build/project/cppcoro.vcxproj
Generating Filters build/project/cppcoro.vcxproj.filters
Build succeeded.
Build took 0:00:00.247.

When you build these projects from within Visual Studio it will call out to cake to perform the compilation.

Building on Linux

The cppcoro project can also be built under Linux using Clang + libc++ 5.0 or later.

Building cppcoro has been tested under Ubuntu 17.04.

Prerequisites

Ensure you have the following packages installed:

  • Python 2.7
  • Clang >= 5.0
  • LLD >= 5.0
  • libc++ >= 5.0

Building cppcoro

This assumes you have Clang and libc++ built and installed.

If you don't have Clang configured yet, see the following sections for details on setting up Clang for building with cppcoro.

Checkout cppcoro and its submodules:

git clone --recursive https://github.com/lewissbaker/cppcoro.git cppcoro

Run init.sh to set up the cake bash function:

cd cppcoro
source init.sh

Then you can run cake from the workspace root to build cppcoro and run tests:

$ cake

You can specify additional command-line arguments to customise the build:

  • --help will print out help for command-line arguments
  • --debug=run will show the build command-lines being run
  • release=debug or release=optimised will limit the build variant to either debug or optimised (by default it will build both).
  • lib/build.cake will just build the cppcoro library and not the tests.
  • test/build.cake@task_tests.cpp will just compile a particular source file
  • test/build.cake@testresult will build and run the tests

For example:

$ cake --debug=run release=debug lib/build.cake

Customising location of Clang

If your clang compiler is not located at /usr/bin/clang then you can specify an alternative location using one or more of the following command-line options for cake:

  • --clang-executable=<name> - Specify the clang executable name to use instead of clang. eg. to force use of Clang 8.0 pass --clang-executable=clang-8
  • --clang-executable=<abspath> - Specify the full path to clang executable. The build system will also look for other executables in the same directory. If this path has the form <prefix>/bin/<name> then this will also set the default clang-install-prefix to <prefix>.
  • --clang-install-prefix=<path> - Specify path where clang has been installed. This will cause the build system to look for clang under <path>/bin (unless overridden by --clang-executable).
  • --libcxx-install-prefix=<path> - Specify path where libc++ has been installed. By default the build system will look for libc++ in the same location as clang. Use this command-line option if it is installed in a different location.

Example: Use a specific version of clang installed in the default location

$ cake --clang-executable=clang-8

Example: Use the default version of clang from a custom location

$ cake --clang-install-prefix=/path/to/clang-install

Example: Use a specific version of clang, in a custom location, with libc++ from a different location

$ cake --clang-executable=/path/to/clang-install/bin/clang-8 --libcxx-install-prefix=/path/to/libcxx-install

Using a snapshot build of Clang

If your Linux distribution does not have a version of Clang 5.0 or later available, you can install a snapshot build from the LLVM project.

Follow instructions at http://apt.llvm.org/ to set up your package manager to pull from the LLVM package repository.

For example, for Ubuntu 17.04 Zesty:

Edit /etc/apt/sources.list and add the following lines:

deb http://apt.llvm.org/zesty/ llvm-toolchain-zesty main
deb-src http://apt.llvm.org/zesty/ llvm-toolchain-zesty main

Install the PGP key for those packages:

$ wget -O - https://apt.llvm.org/llvm-snapshot.gpg.key | sudo apt-key add -

Install Clang and LLD:

$ sudo apt-get install clang-6.0 lld-6.0

The LLVM snapshot builds do not include libc++ versions so you'll need to build that yourself. See below.

Building your own Clang

You can also use the bleeding-edge Clang version by building Clang from source yourself.

See instructions here:

To do this you will need to install the following pre-requisites:

$ sudo apt-get install git cmake ninja-build clang lld

Note that we are using your distribution's version of clang to build clang from source. GCC could also be used here instead.

Checkout LLVM + Clang + LLD + libc++ repositories:

mkdir llvm
cd llvm
git clone --depth=1 https://github.com/llvm-mirror/llvm.git llvm
git clone --depth=1 https://github.com/llvm-mirror/clang.git llvm/tools/clang
git clone --depth=1 https://github.com/llvm-mirror/lld.git llvm/tools/lld
git clone --depth=1 https://github.com/llvm-mirror/libcxx.git llvm/projects/libcxx
ln -s llvm/tools/clang clang
ln -s llvm/tools/lld lld
ln -s llvm/projects/libcxx libcxx

Configure and build Clang:

mkdir clang-build
cd clang-build
cmake -GNinja \
      -DCMAKE_CXX_COMPILER=/usr/bin/clang++ \
      -DCMAKE_C_COMPILER=/usr/bin/clang \
      -DCMAKE_BUILD_TYPE=MinSizeRel \
      -DCMAKE_INSTALL_PREFIX="/path/to/clang/install" \
      -DCMAKE_BUILD_WITH_INSTALL_RPATH="yes" \
      -DLLVM_TARGETS_TO_BUILD=X86 \
      -DLLVM_ENABLE_PROJECTS="lld;clang" \
      ../llvm
ninja install-clang \
      install-clang-headers \
      install-llvm-ar \
      install-lld

Building libc++

The cppcoro project requires libc++ as it contains the <experimental/coroutine> header required to use C++ coroutines under Clang.

Checkout libc++ + llvm:

mkdir llvm
cd llvm
git clone --depth=1 https://github.com/llvm-mirror/llvm.git llvm
git clone --depth=1 https://github.com/llvm-mirror/libcxx.git llvm/projects/libcxx
ln -s llvm/projects/libcxx libcxx

Build libc++:

mkdir libcxx-build
cd libcxx-build
cmake -GNinja \
      -DCMAKE_CXX_COMPILER="/path/to/clang/install/bin/clang++" \
      -DCMAKE_C_COMPILER="/path/to/clang/install/bin/clang" \
      -DCMAKE_BUILD_TYPE=Release \
      -DCMAKE_INSTALL_PREFIX="/path/to/clang/install" \
      -DLLVM_PATH="../llvm" \
      -DLIBCXX_CXX_ABI=libstdc++ \
      -DLIBCXX_CXX_ABI_INCLUDE_PATHS="/usr/include/c++/6.3.0/;/usr/include/x86_64-linux-gnu/c++/6.3.0/" \
      ../libcxx
ninja cxx
ninja install

This will build and install libc++ into the same install directory where you have clang installed.

Installing from vcpkg

The cppcoro port in vcpkg is kept up to date by Microsoft team members and community contributors. vcpkg is hosted at https://github.com/Microsoft/vcpkg. You can download and install cppcoro using the vcpkg dependency manager:

git clone https://github.com/Microsoft/vcpkg.git
cd vcpkg
./bootstrap-vcpkg.sh  # ./bootstrap-vcpkg.bat for Windows
./vcpkg integrate install
./vcpkg install cppcoro

If the version is out of date, please create an issue or pull request on the vcpkg repository.

Support

GitHub issues are the primary mechanism for support, bug reports and feature requests.

Contributions are welcome and pull-requests will be happily reviewed. I only ask that you agree to license any contributions that you make under the MIT license.

If you have general questions about C++ coroutines, you can generally find someone to help in the #coroutines channel on the Cpplang Slack group.

cppcoro's People

Contributors

alinshans, andreasbuhr, anlongfei, billyoneal, blapid, cheney-w, denchat, fghzxm, gomez-addams, gouravkumar2, grishavanika, lewissbaker, lofcek, lunastorm, menefotto, modocache, mwinterb, nathiss, uran198, yaoxinliu, yehezkelshb


cppcoro's Issues

Make when_all usable with arbitrary awaitables rather than only for task<T> and shared_task<T>

For example, allow passing a cppcoro::file_read_operation into the variadic when_all overload.

Also see if we can get rid of the need for intrusive get_starter() methods on task and shared_task and instead just create N temporary coroutines within when_all to use as the continuation for each of the awaitable operations.

If possible, I'd still like to retain the behaviour such that if when_all successfully returns a task without throwing std::bad_alloc that then you can guarantee that co_awaiting the returned task will start each of the async operations and wait until they all complete.

Add concurrency abstraction for server connection-handling workloads

The when_all concurrency primitive currently requires that all of the tasks are created up front whereas with server workloads we'll typically be dynamically starting new tasks to handle the connections as clients connect.

Consider adding some kind of task_pool or possibly when_all_ready overload that takes async_generator<task<>> and returns a task<> that can start new tasks as they are created but still ensure there is no potential for dangling tasks.

Add support for building with clang

@GorNishanov has been working on adding C++ coroutine support to clang and llvm.

The build system needs to be updated to add support for building this library with clang so that it can be tested on compilers other than MSVC.

As far as I can tell, not all of the changes have been upstreamed to clang yet so I'll need to use a custom build of clang in the meantime.

Deprecate and remove 'eager' task classes `task` and `shared_task`

Once we have a way to synchronously block waiting for a task in #27 and when_all in #10, there shouldn't be any need for the eagerly started task types any more and we can limit use to just the lazy task types (ie. lazy_task and shared_lazy_task).

One of the big motivations for getting rid of eagerly-started tasks is that it is difficult to write exception-safe code with use of eagerly-started tasks that aren't immediately co_awaited.

We don't want to leave dangling tasks/computation as this can lead to ignored/uncaught exceptions or unsynchronised access to shared resources.

For example, consider the case where we want to execute two tasks concurrently and wait for them both to finish before continuing, using their results:

task<A> do1();
task<B> do2();

task<> do1and2concurrently_unsafe()
{
  // The calls to do1() and do2() start the tasks eagerly but are potentially executed
  // in any order and either one of them could fail after the other has already started
  // a concurrent computation.
  auto [a, b] = co_await cppcoro::when_all(do1(), do2());

  // use a and b ...
}

To implement this in an exception-safe way we'd need to modify the function as follows:

task<> do1and2concurrently_safe()
{
  // This implicitly starts executing the task and it executes concurrently with the
  // rest of the logic below until we co_await the task.
  task<A> t1 = do1();

  // Now we need to start the do2() task but since this could fail we need
  // to start it inside a try/catch so we can wait for t1 to finish before it
  // goes out of scope (we don't want to leave any dangling computation).
  task<B> t2;
  std::exception_ptr ex;
  try
  {
    // This call might throw, since it may need to allocate a coroutine frame
    // which could fail with std::bad_alloc.
    t2 = do2();
  }
  catch (...)
  {
    // Don't want to leave t1 still executing but we can't co_await inside catch-block
    // So we capture current exception and do co_await outside catch block.
    ex = std::current_exception();
  }

  if (ex)
  {
    // Wait until t1 completes before rethrowing the exception.
    co_await t1.when_ready();
    std::rethrow_exception(ex);
  }

  // Now that we have both t1 and t2 started successfully we can use when_all() to get the results.
  auto [a, b] = co_await when_all(std::move(t1), std::move(t2));

  // use a, b ...  
}

Compared with lazy_task version which is both concise and exception-safe:

lazy_task<A> do1();
lazy_task<B> do2();

lazy_task<> do1and2concurrently()
{
  // The calls to do1() and do2() can still execute in any order but all they
  // do is allocate coroutine frames, they don't start any computation.
  // If the second call fails then the normal stack-unwinding will ensure the
  // first coroutine frame is destroyed. It doesn't need to worry about waiting
  // for the first task to complete. Either they both start or none of them do.
  auto [a, b] = co_await cppcoro::when_all(do1(), do2());

  // use a and b ...
}

With a lazy_task, the task is either being co_awaited by some other coroutine or it is not executing (it has either not yet started, or has completed executing) and so the lazy_task is always safe to destruct and will free the coroutine frame it owns.

A side-benefit of using lazy_task everywhere is that it can be implemented without the need for std::atomic operations to synchronise coroutine completion and awaiter. This can have some potential benefits for performance by avoiding use of atomic operations for basic sequential flow of execution.

Add generator<T> coroutine type

Add a new type cppcoro::generator<T> that allows you to write a coroutine that yields a sequence of values procedurally from within the coroutine using the co_yield keyword.

eg.

cppcoro::generator<int> range(int n)
{
  for (int i = 0; i < n; ++i) co_yield i;
}

// Outputs: 0 1 2 3 4 5 6 7 8 9
void usage()
{
  for (auto i : range(10))
  {
    std::cout << i << " ";
  }
}

Add recursive_generator<T> coroutine type

Add a new recursive_generator<T> type that allows for efficient enumeration over a sequence that is defined recursively.

This type would allow you to pass either a T or a recursive_generator<T> to the co_yield expression.
The increment operator on the iterator would then directly resume the leaf-most coroutine rather than having to resume each coroutine in the stack until the leaf is resumed and then later suspend every coroutine on the stack for each item.

eg.

generator<directory_entry> list_directory(std::string path);

recursive_generator<directory_entry> recursive_list_directory(std::string path)
{
  for (auto& entry : list_directory(path))
  {
    co_yield entry;
    if (entry.is_directory())
    {
      co_yield recursive_list_directory(entry.path());
    }
  }
}

void usage()
{
  for (auto& entry : recursive_list_directory("foo/bar"))
  {
    std::cout << entry.path() << "\n";
  }
}

Add when_any() for waiting for at least one task to complete

The difficult part of designing when_any() will be how to handle cancellation of the co_await operations of the other tasks.

Currently, the task<T> and shared_task<T> types don't allow the caller to cancel the co_await operation once it has been awaited. We need to wait for the task to complete before the awaiting coroutine returns.

If the tasks themselves are cancellable, we could hook something up using cancellation_tokens.
eg. If we pass the same cancellation_token into each task then concurrently await all of the tasks and when any task completes, we then call request_cancellation() on the cancellation_source to request the other tasks to cancel promptly. Then we could just use when_all() to wait for all of the tasks.

To do this more generally, we'd need to be able to cancel the await operation on a task without necessarily cancelling the task itself. This would require a different data-structure in the promise object for keeping track of awaiters to allow unsubscribing an awaiter from that list in a lock-free way.
Maybe consider a similar data-structure to that used by cancellation_registration?

Don't yield sequence of mutable T& from generator<T> and async_generator<T>

Currently, the generator<T> and async_generator<T> classes return a mutable reference to the yielded value.

This can lead to unexpected consequences:

generator<int> ints(int end)
{
  for (int i = 0; i < end; ++i) co_yield i;
}

void usage()
{
  for (auto&& x : ints(100))
  {
    x *= 3; // Whoops, this just modified the 'i' variable in ints()
  }
}

This behaviour is potentially unexpected and could lead to bugs.

Perhaps a generator<T> should instead yield a sequence of const T&, or possibly just T?

I was initially avoiding having operator* returning a prvalue since it potentially penalises use-cases where the consumer does not need to take a copy of the value and where the generator would otherwise be able to reuse storage for the value. eg.

generator<std::string> read_lines(text_stream source)
{
  std::string value;
  while (true)
  {
    for (char c : source.chars())
    {
      if (c == '\n') break;
      value.push_back(c);
    }
    co_yield value;
    value.clear(); // clear contents, retaining capacity of string.
  }
}

void find_long_lines()
{
  int i = 1;
  // Don't want to force a copy/move of the string for each line here.
  for (const auto& line : read_lines(get_stream()))
  {
    if (line.size() > 80)
      std::cout << i << ": " << line << std::endl;
    ++i;
  }
}

However, making it return const T& could also penalise the consumers that do want to take ownership of the elements since it prevents the use of the move constructor. Perhaps in such cases the function should return a generator<T&&> which yields a sequence of T&& which would allow the consumer to decide whether or not to move the result.

io_service question

It would be good to cover the vision and strategy behind io_service in some doc or wiki. This is needed to have an organized approach to supporting a generic platform.

  1. Why program a platform-specific IO polling backend directly in this project instead of integrating with one of the cross-platform IO polling libraries such as libevent, libev, libuv, or boost asio?

  2. Re boost in particular, as I understand it, Networking TS is based on boost asio, and it has been accepted for C++20(?). So, would it not be best to integrate with boost io_service from this TS?

  3. IO polling is inevitably related to multithreading. There are several popular models: single-threaded event loop (ex: libevent), single-threaded polling with multithreaded event processing (ASIO), fully multithreaded polling and processing (GRPC). Where does this project stand in this regard? Is there some specific set of goals in mind? Perhaps there is a perimeter beyond which this project should not go, leaving the rest for custom integrations?

Add blocking `io_service::process_events_until_complete(task)` function

Once eager tasks are eliminated in #29 it will no longer be possible to start executing a task on a single thread that enters the io_service::process_events() event loop.

The only way to start a task will be sync_wait(task) introduced in #27, however you can't then (easily) enter the process_events() event loop to process I/O completion events that are raised.

Adding an io_service::process_events_until_complete(task) function would allow starting a lazy_task and then entering the io_service event loop in such a way that it will exit from the event loop once the provided task completes.
eg.

lazy_task<> run(io_service& io);

int main()
{
  io_service io;
  io.process_events_until_complete(run(io));
  return 0;
}
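
The intended control flow can be sketched without coroutines. The toy below is only an illustration under stated assumptions: toy_io_service, post() and the callback-based signature are hypothetical, and a real io_service would block waiting for I/O completions instead of returning when its queue runs dry.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <utility>

// Toy single-threaded stand-in for io_service; all names are hypothetical.
class toy_io_service
{
public:
  using callback = std::function<void()>;

  // Queue an "I/O completion" to be run later by the event loop.
  void post(callback cb) { m_queue.push_back(std::move(cb)); }

  // Starts the operation, then runs queued events until the operation
  // reports completion. `start` receives a function that it (or an event
  // it posts) must call when the operation is done.
  // Returns true if completion was reported before the queue ran dry.
  bool process_events_until_complete(const std::function<void(callback)>& start)
  {
    bool done = false;
    start([&done] { done = true; });
    while (!done && !m_queue.empty())
    {
      callback next = std::move(m_queue.front());
      m_queue.pop_front();
      next();
    }
    return done;
  }

  std::size_t pending() const noexcept { return m_queue.size(); }

private:
  std::deque<callback> m_queue;
};
```

The point of the shape is that the loop exits as soon as the operation signals completion, even if other events are still queued, which is the behaviour the proposed function would need.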

Add shared_task<T> and shared_lazy_task<T> classes

The ability to have multiple consumers wait on the result of a task is required for some scenarios.
eg. where you want to pass a prerequisite task into multiple sub-tasks that each need to await that task.

The task<T> and lazy_task<T> classes are move-only and support only a single awaiting coroutine at a time.

This issue is proposing to add a shared_task<T> class and a shared_lazy_task<T> class that support copy-construction and assignment with reference-counting semantics and support multiple concurrent awaiting coroutines.

It should be possible to implement in a lock-free fashion using std::atomic pointers.
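
One possible shape for that lock-free bookkeeping is sketched below. It is an assumption about how such a list could work, not the design this issue settled on: shared_state and waiter are hypothetical names, and a std::function stands in for the coroutine_handle<> that a real awaiter would store.

```cpp
#include <atomic>
#include <cassert>
#include <functional>

// Hypothetical waiter node; 'resume' stands in for a coroutine_handle<>.
struct waiter
{
  std::function<void()> resume;
  waiter* next = nullptr;
};

class shared_state
{
public:
  // Returns true if the waiter was queued. Returns false if the result is
  // already available, in which case the caller continues without suspending.
  bool try_add_waiter(waiter* w) noexcept
  {
    void* const ready = this;
    void* old = m_state.load(std::memory_order_acquire);
    do
    {
      if (old == ready) return false;
      w->next = static_cast<waiter*>(old);
    } while (!m_state.compare_exchange_weak(
      old, w, std::memory_order_acq_rel, std::memory_order_acquire));
    return true;
  }

  // Called once by the producer: marks the state as ready and resumes
  // every waiter that was queued before the exchange.
  void set_ready()
  {
    void* old = m_state.exchange(this, std::memory_order_acq_rel);
    auto* w = static_cast<waiter*>(old);
    while (w != nullptr)
    {
      waiter* next = w->next;  // read first; resuming may destroy the waiter
      w->resume();
      w = next;
    }
  }

  bool is_ready() const noexcept
  {
    return m_state.load(std::memory_order_acquire) == this;
  }

private:
  // nullptr -> not ready, no waiters
  // this    -> ready
  // other   -> not ready, head of a singly linked list of waiters
  std::atomic<void*> m_state{nullptr};
};
```

A single atomic pointer carries both the ready flag and the list head, so a consumer that loses the race with set_ready() sees the sentinel and simply does not suspend.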

Documentation clarification

In task.hpp there is

	/// \brief
	/// A lazy task represents an asynchronous operation that is not started
	/// until it is first awaited.
	///
	/// When you call a coroutine that returns a task, the coroutine
	/// simply captures any passed parameters and returns execution to the
	/// caller. Execution of the coroutine body does not start until the
	/// coroutine is first co_await'ed.
	///
	/// Comparison with task<T>
	/// -----------------------
	/// The lazy task has lower overhead than cppcoro::task<T> as it does not
	/// require the use of atomic operations to synchronise potential races
	/// between the awaiting coroutine suspending and the coroutine completing.
	///
	/// The awaiting coroutine is suspended prior to the task being started
	/// which means that when the task completes it can unconditionally
	/// resume the awaiter.
	///
	/// One limitation of this approach is that if the task completes
	/// synchronously then, unless the compiler is able to perform tail-calls,
	/// the awaiting coroutine will be resumed inside a nested stack-frame.
	/// This can lead to stack-overflow if long chains of tasks complete
	/// synchronously.
	///
	/// The task<T> type does not have this issue as the awaiting coroutine is
	/// not suspended in the case that the task completes synchronously.
	template<typename T = void>
	class task

The section Comparison with task<T> is not clear: what is being compared with task<T>? This IS the task<T> definition after all.

Btw, what is the best venue to ask similar questions in the future?

Add an async disruptor/ring-buffer queue abstraction for buffered communication between producer/consumer coroutines

We have async_generator<T> that can be used for producer/consumer coroutines to communicate, however it has no buffering capability which means that the elements can only be generated and processed one at a time.

We should look at something similar to https://github.com/lewissbaker/disruptorplus but that uses co_await and coroutine suspension as a wait-strategy for handling the full/empty buffer cases.

  • Add support for a single-threaded producer as well as a multi-threaded producer.
  • Add a sequence_barrier abstraction.
  • Needs integration with schedulers to allow producer to resume consumer asynchronously.

I have some code kicking around in a side-project that I can port to cppcoro.

Split documentation into separate files/sections

The documentation is currently all sitting in the README which is getting pretty long now.

The README should be kept pretty high-level and contain a table-of-contents with some motivating examples, status and build instructions.

API docs for individual classes/abstractions should be moved out to separate files under a top-level docs/ folder.

Support running async cleanup operations when an async_generator is destroyed early

@ericniebler's recent blog entry, Ranges, Coroutines, and React: Early Musings on the Future of Async in C++, contained the following code-snippet that showed how async_generator<T> could be used to read a file in chunks:

auto async_file_chunk( const std::string& str ) -> async_generator<static_buf_t<1024>&>
{
  static_buf_t<1024> buffer;
 
  fs_t openreq;
  uv_file file = co_await fs_open(uv_default_loop(), &openreq, str.c_str(), O_RDONLY, 0);
  if (file > 0)
  {
    while (1)
    {
      fs_t readreq;
      int result = co_await fs_read(uv_default_loop(), &readreq, file, &buffer, 1, -1);
      if (result <= 0)
        break;

      buffer.len = result;
      co_yield buffer;
    }
    fs_t closereq;
    (void) co_await fs_close(uv_default_loop(), &closereq, file);
  }
}

Unfortunately, this snippet contains a flaw in that if the consumer stops consuming the elements of the async_generator before it reaches the end() of the sequence then the file handle will never be closed.

Currently, when the async_generator object is destructed it calls coroutine_handle<>::destroy() which destroys the coroutine frame and any object that was in-scope at the co_yield statement where the producer coroutine last produced a value. The coroutine does not get a chance to execute any code to clean up resources other than via running destructors. This allows you to perform synchronous cleanup operations via usage of RAII types but it means you can't perform any async cleanup operations (like the async fs_close example above, or gracefully shutting down a socket connection).

What if, instead of destroying the coroutine frame when the async_generator is destroyed we resume the generator coroutine but instead make the co_yield expression return an error-code (or throw a generation_cancelled exception)? This would then allow the coroutine to respond to the cancellation request and perform any cleanup operations.

For example, if the co_yield expression were to return an enum value of generator_op::move_next or generator_op::cancel then the main-loop of the above snippet could have been modified thus:

auto async_file_chunk( const std::string& str ) -> async_generator<static_buf_t<1024>&>
{
  static_buf_t<1024> buffer;
 
  fs_t openreq;
  uv_file file = co_await fs_open(uv_default_loop(), &openreq, str.c_str(), O_RDONLY, 0);
  if (file > 0)
  {
    while (1)
    {
      fs_t readreq;
      int result = co_await fs_read(uv_default_loop(), &readreq, file, &buffer, 1, -1);
      if (result <= 0)
        break;

      buffer.len = result;

      // The next two lines are the only ones that have changed.
      generator_op yieldResult = co_yield buffer;
      if (yieldResult == generator_op::cancel) break;
    }
    fs_t closereq;
    (void) co_await fs_close(uv_default_loop(), &closereq, file);
  }
}

If the coroutine subsequently tried to execute another co_yield expression after it had been cancelled then the co_yield expression would complete immediately again with the generator_op::cancel result.

There are a couple of issues with this approach, however:

  1. The first is that the syntax for writing a correct producer coroutine is now more complicated (you need to check return-value of co_yield expression).
    This could be somewhat alleviated by throwing an exception instead (which I don't like much either, since I prefer not to use exceptions for expected/normal control-flow).
    Alternatively, it could be an opt-in behaviour (eg. by constructing some scoped object on the stack, or otherwise communicating with the promise that you want to continue after a cancellation request).
  2. The continued execution of the coroutine now represents a detached computation that you cannot synchronise against, or otherwise know when it will complete. eg. If it is closing a file asynchronously I may still want to know when the file has been closed so that I can delete it or open it again once the file lock has been released.

The second issue is perhaps the greater one here.

This could possibly be mitigated by requiring the caller to explicitly start the production of the generator by awaiting a task<> returned by the async_generator<T>::produce() member function.
With such a task, the generator would not start executing until both the produce() task and the task returned by begin() had been awaited. The caller would need to use something like when_all() to concurrently execute both tasks.

The producer task would not complete until the generator coroutine has run to completion.
The consumer task is free to destroy the async_generator and complete early without consuming the entire sequence. This would send a cancel request to the producer task so it can terminate quickly.

There are potential issues with composability of such an async_generator that I haven't yet worked through.

Make async_mutex::lock_async behaviour more explicit by introducing a scoped_lock_async() method

The async_mutex::lock_async() method currently has the dual behaviour of allowing you to lock the mutex in such a way that requires manual unlocking (ie. no RAII) as well as allowing you to encapsulate the lock in an async_mutex_lock object that will ensure the lock is released when the lock object goes out of scope.

The behaviour is currently chosen based on whether the caller assigns the result of the co_await m.lock_async() expression to an async_mutex_lock variable.

task<> f(async_mutex& m)
{
  // Needs manual unlocking
  co_await m.lock_async(); 
  m.unlock();
}

task<> g(async_mutex& m)
{
  // Lock encapsulated in RAII object
  async_mutex_lock lock = co_await m.lock_async(); 
}

This seems a potentially error-prone way of doing things.

Consider having lock_async() always require manual unlocking and introduce a new scoped_lock_async() that returns an async_mutex_lock object.

Remove use of std::atomic in task and async_generator once symmetric transfer is available

The current implementations of task and async_generator make use of atomics to arbitrate a potential race between the awaiting consumer suspending while waiting for a value to be produced and the completion of the next value.

An alternative approach which doesn't require use of atomic operations is to suspend the awaiting coroutine first, attach its coroutine_handle as the continuation of the task/async_generator and then resume the task/async_generator. When the task/async_generator produces a value, it calls .resume() on the coroutine handle. It can do this without needing any conditionals or atomic operations since the continuation is attached before the coroutine starts executing.

The problem with this alternative approach is that it can lead to stack-overflow if the coroutine produces its value synchronously and the awaiting coroutine awaits many tasks in a row that all complete synchronously. This is because the awaiting coroutine resumes the producer coroutine inside an await_suspend() method by calling coroutine_handle::resume(). If the producer coroutine produces a value synchronously then inside the call to coroutine_handle::resume() it will call await_suspend() at either a co_yield or final-suspend point and then resume the continuation by calling coroutine_handle::resume() on the awaiting coroutine.

For example, if you have a coroutine foo that continually awaits task<> bar() which completes synchronously then you can end up with a stack-trace that looks like:

  • foo [resume]
  • task::promise_type::final_awaiter::await_suspend()
  • bar [resume]
  • task::await_suspend()
  • foo [resume]
  • task::promise_type::final_awaiter::await_suspend()
  • bar [resume]
  • task::await_suspend()
  • foo [resume]
  • etc...

Under Clang optimised builds these calls can be performed as tail-calls which avoids the stack-overflow issue (they are all void-returning functions).
However this isn't guaranteed - Clang debug and both MSVC optimised and debug builds are not able to perform tail-calls here.

To work around this, the current implementation first resumes the task's coroutine and waits until it suspends. Then it checks to see if it completes synchronously and if so then returns false from await_suspend() to continue execution of the awaiting coroutine without incurring an extra stack-frame. However, this means we then need to use atomics to decide the race between the producer running to completion on another thread and the consumer suspending on the current thread.

@GorNishanov has recently added an extension to Clang that allows returning a coroutine_handle from await_suspend() that will be immediately resumed, but this time with a guarantee that the compiler will perform a tail-call resumption of the returned coroutine. This is called "symmetric transfer" and allows suspending one coroutine and resuming another without consuming any additional stack-space.

Once this capability is fully implemented in clang and is also available in MSVC we can get rid of the use of atomics in task and async_generator and make use of symmetric transfer to avoid stack-overflow instead. This should improve performance of these classes.

Make when_all_ready return task<tuple<expected<T>...>>

Rather than returning a collection (tuple/vector) of the original awaitable objects from when_all_ready, we should consider returning a collection of the results encapsulated in some kind of expected<T, std::exception_ptr> type.

This would allow us to generalise when_all_ready to support arbitrary awaitables rather than being restricted to just supporting task and shared_task awaitables.

It would also have the benefit of giving the caller synchronous access to the results without needing to subsequently re-co_await the awaitables to extract the values. This would allow use with awaitables that don't support being co_awaited multiple times.
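
A minimal sketch of such a result holder follows, assuming C++17. result<T>, capture() and parse_positive() are hypothetical names chosen for illustration; the eventual type might look quite different.

```cpp
#include <cassert>
#include <exception>
#include <stdexcept>
#include <utility>
#include <variant>

// Hypothetical stand-in for the expected<T, std::exception_ptr> idea.
template<typename T>
class result
{
public:
  result(T value) : m_storage(std::in_place_index<0>, std::move(value)) {}
  result(std::exception_ptr error) : m_storage(std::in_place_index<1>, std::move(error)) {}

  bool has_value() const noexcept { return m_storage.index() == 0; }

  // Returns the stored value, or rethrows the captured exception.
  T& value()
  {
    if (!has_value()) std::rethrow_exception(std::get<1>(m_storage));
    return std::get<0>(m_storage);
  }

private:
  std::variant<T, std::exception_ptr> m_storage;
};

// Runs f() and stores either its return value or the exception it threw.
template<typename F>
auto capture(F&& f) -> result<decltype(f())>
{
  try
  {
    return std::forward<F>(f)();
  }
  catch (...)
  {
    return std::current_exception();
  }
}

// Example operation that can fail.
inline int parse_positive(int x)
{
  if (x <= 0) throw std::invalid_argument("value must be positive");
  return x;
}
```

Each element of the returned tuple or vector would then be inspected synchronously with has_value() and value(), without co_awaiting the original awaitable a second time.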

Add 'bind' operator for task types

Basic usage:

cppcoro::task<A> makeA();
cppcoro::task<B> foo(A a);

cppcoro::task<B> b = makeA() | cppcoro::bind(foo);
cppcoro::task<B> b2 = cppcoro::bind(foo, makeA());

Variadic usage:

cppcoro::task<A> makeA();
cppcoro::task<B> makeB();
cppcoro::task<C> func(A a, B b);

cppcoro::task<C> c = cppcoro::bind(func, makeA(), makeB());

Or alternatively, should this be done with an apply operator composed with bind and when_all?

cppcoro::task<C> c =
  cppcoro::when_all(makeA(), makeB())
  | cppcoro::bind(cppcoro::apply(func));

// Equivalent to
cppcoro::task<C> c = [](cppcoro::task<A> a, cppcoro::task<B> b) -> cppcoro::task<C>
{
  co_return co_await std::apply(func, co_await cppcoro::when_all(std::move(a), std::move(b)));
}(makeA(), makeB());

Use custom allocator for when_all_ready_task objects in when_all_ready(std::vector<AWAITABLE>)

With the when_all_ready(std::vector<AWAITABLE>) overloads we need to allocate N coroutine frames, one for each awaitable in the list. As the number of coroutine frames that needs to be allocated is not known at compile time, the compiler will be unable to elide the allocation of the coroutine frames.

Rather than perform N separate memory allocations, it might be useful to use a custom allocator that allocates space for all N coroutine frames up front so that we only perform a single heap allocation.

We can't know the size of the coroutine frame at compile time so we'll need to initialise the allocator with the number of tasks that will need to be allocated. Then when the first task is created the allocator will be passed the coroutine frame size. We can then allocate a pool of size N * coroutine frame size and allocate subsequent coroutine frames from that pool.
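
The allocation scheme described above could be sketched roughly as follows, assuming C++17. frame_pool is a hypothetical helper, and the sketch assumes every frame has the same size, which would hold if all N coroutines are instances of the same when_all_ready_task function.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <new>

// Hypothetical helper: one heap allocation shared by 'count' equally sized frames.
class frame_pool
{
public:
  explicit frame_pool(std::size_t count) noexcept : m_count(count) {}

  // The frame size is only known on the first call, so the whole pool
  // is allocated at that point and later calls just hand out slots.
  void* allocate(std::size_t frameSize)
  {
    if (!m_buffer)
    {
      // Round each slot up to a multiple of the maximum fundamental alignment.
      constexpr std::size_t align = alignof(std::max_align_t);
      m_slotSize = (frameSize + align - 1) / align * align;
      m_buffer = std::make_unique<std::byte[]>(m_slotSize * m_count);
    }
    if (frameSize > m_slotSize || m_used == m_count) throw std::bad_alloc{};
    return m_buffer.get() + m_slotSize * m_used++;
  }

  // Slots are released together when the pool itself is destroyed.
  void deallocate(void*) noexcept {}

  std::size_t slot_size() const noexcept { return m_slotSize; }

private:
  std::size_t m_count;
  std::size_t m_slotSize = 0;
  std::size_t m_used = 0;
  std::unique_ptr<std::byte[]> m_buffer;
};
```

In a real integration the promise type's operator new would forward to allocate() for a pool passed in as a coroutine argument, and the pool would have to outlive every frame carved out of it.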

Add ability to specify scheduler that coroutine should be resumed on when awaiting various synchronisation operations

task<> f(some_scheduler& scheduler, async_mutex& mutex)
{
  auto lock = co_await mutex.scoped_lock_async();

  // Coroutine is now potentially executing on whatever execution context the
  // prior call to mutex.unlock() that released the mutex occurred on.
  // We don't have any control over this here.

  // We can manually re-schedule ourselves for execution on a particular execution context.
  // This means that the mutex.unlock() call has resumed this coroutine only to immediately
  // suspend it again.
  co_await scheduler.schedule();

  // Also when the lock goes out of scope here and mutex.unlock() is called
  // we will be implicitly resuming the next coroutine that is waiting to
  // acquire the mutex. If that coroutine then unlocks the mutex without
  // suspending then it will recursively resume the next waiting coroutine, etc.,
  // blocking further execution of this coroutine until one of the lock-holding
  // coroutines suspends its execution.
}

Some issues:

  1. It could be more efficient to directly schedule the coroutine for resumption on the scheduler rather than resuming it and suspending it again.
  2. This unconditionally re-schedules the coroutine, which may not be necessary if we were already executing on the right execution context before acquiring the lock and we acquired the lock synchronously.

You could do something like this now to (mostly) solve (2):

task<> f(some_scheduler& scheduler, async_mutex& mutex)
{
  if (!mutex.try_lock())
  {
    // This might still complete synchronously if the lock was released
    // between call to try_lock and lock_async.
    co_await mutex.lock_async();

    // Only reschedule if we (probably) didn't acquire the lock synchronously.
    // NOTE: This needs to be noexcept to be exception-safe.
    co_await scheduler.schedule();
  }

  async_mutex_lock lock(mutex, std::adopt_lock);
}

For solving (1) I'm thinking of something like this:

task<> f(some_scheduler& scheduler, async_mutex& mutex)
{
  auto lock = co_await mutex.scoped_lock_async().resume_on(scheduler);

  // Or possibly more simply
  auto lock2 = co_await mutex.scoped_lock_async(scheduler);
}

It may be possible to make this a general facility that is applicable to other awaitables:

auto lock = co_await mutex.scoped_lock_async() | resume_on(scheduler);
// or
auto lock = co_await resume_on(scheduler, mutex.scoped_lock_async());

Add async_auto_reset_event

Add an implementation of async_auto_reset_event that allows multiple concurrent awaiters.

It differs from async_manual_reset_event in that a call to set() releases at most one pending waiter rather than releasing all pending waiters.

Add 'transform' operator that can be applied to various monad types

Support applying a function that transforms value of type A to value of type B to:

  • task<A> to produce task<B> (etc. for other task types)
  • generator<A> to produce generator<B> (etc. for other generator types)

Example syntax:

B a_to_b(const A& a);
cppcoro::task<A> make_an_a();

cppcoro::task<B> b = make_an_a() | cppcoro::transform(a_to_b);

For task<T> this is kind of similar to the proposed std::future<T>::then() method.
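The pipe syntax itself can be illustrated without any coroutine machinery. The sketch below is an assumption-laden stand-in: `ready<T>` is a hypothetical type that simply holds an already-computed value in place of task<T>, and `transform_adapter` is an illustrative name. It only shows how `source | transform(f)` could dispatch; a real task overload would presumably be a coroutine that awaits the source before applying the function.

```cpp
#include <cassert>
#include <string>
#include <utility>

// Hypothetical stand-in for task<T>: holds an already-computed value.
template<typename T>
struct ready
{
  T value;
};

// Adapter object returned by transform(f); it only remembers the function.
template<typename Func>
struct transform_adapter
{
  Func func;
};

template<typename Func>
transform_adapter<Func> transform(Func func)
{
  return transform_adapter<Func>{ std::move(func) };
}

// ready<A> | transform(f) produces ready<B>, where B is the result of f(A).
template<typename A, typename Func>
auto operator|(ready<A> source, transform_adapter<Func> adapter)
  -> ready<decltype(adapter.func(std::declval<const A&>()))>
{
  return { adapter.func(source.value) };
}

// Example function from int to std::string, playing the role of a_to_b.
std::string describe(const int& n)
{
  return "value " + std::to_string(n);
}

ready<std::string> transform_demo()
{
  return ready<int>{ 42 } | transform(describe);
}
```

Keeping the adapter separate from the source type means further overloads of operator| (for generators, for example) could reuse the same transform(f) call.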

Decide on approach for handling memory allocation failure of coroutine frame

The default approach is for the call to the coroutine function itself to throw an exception (typically std::bad_alloc) if allocation of the coroutine frame fails.

For coroutine types that return a task, should we instead implement the promise_type::get_return_object_on_allocation_failure method to return a special task value that defers throwing the std::bad_alloc exception until the returned task is awaited?

This would make it possible to declare coroutine functions as noexcept (provided captured parameter copies all have noexcept copy/move constructors).

Mac platform support

I tried to compile this library on Mac using the latest clang/c++ and cmake.
After excluding file io and win32 stuff similarly to how it's done for Linux, the only module that does not compile is lightweight_manual_reset_event.cpp, for the reason that futex does not exist on Mac. A natural solution for this is to use std::condition_variable and std::mutex. It's not particularly "lightweight" but at least there won't be any compatibility issues, which may be useful for other platforms.
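A minimal sketch of what such a fallback might look like is below. The class name `portable_manual_reset_event` is hypothetical, and the set()/reset()/wait() shape is an assumption about what callers need rather than a statement of the library's interface.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical portable fallback built only on the standard library.
class portable_manual_reset_event
{
public:
  explicit portable_manual_reset_event(bool initiallySet = false)
    : m_isSet(initiallySet)
  {}

  void set()
  {
    // Notify while still holding the lock so that a waiter cannot wake up,
    // return and destroy the event before notify_all() has finished.
    std::lock_guard<std::mutex> lock(m_mutex);
    m_isSet = true;
    m_cv.notify_all();
  }

  void reset()
  {
    std::lock_guard<std::mutex> lock(m_mutex);
    m_isSet = false;
  }

  // Blocks the calling thread until the event is in the set state.
  void wait()
  {
    std::unique_lock<std::mutex> lock(m_mutex);
    m_cv.wait(lock, [this] { return m_isSet; });
  }

private:
  std::mutex m_mutex;
  std::condition_variable m_cv;
  bool m_isSet;
};

// Usage: block the calling thread until a worker publishes its result.
int wait_for_worker()
{
  portable_manual_reset_event done;
  int result = 0;
  std::thread worker([&] { result = 42; done.set(); });
  done.wait();           // the write to result happens before wait() returns
  assert(result == 42);
  worker.join();
  return result;
}
```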

I can contribute this code if this is seen as useful. But the other part of this puzzle is making the cake build work on Mac. Btw, is there any particular reason cake is used for this project instead of cmake or, say, scons, which is similar ideologically as far as I can tell?

Make lazy_task safe to use in a loop when the task completes synchronously

The lazy_task implementation currently unconditionally suspends the awaiter before starting execution of the task and then unconditionally resumes the awaiter when it reaches the final_suspend point.

This approach means that we don't need any synchronisation (ie. std::atomic usages) to coordinate awaiter and task.

However, it has the downside that if the lazy_task completes synchronously then the awaiter is recursively resumed. This can potentially consume a little bit of stack space every time a coroutine awaits a lazy_task that completes synchronously if the compiler is not able to perform tail-call optimisation on the calls to void await_suspend() and void coroutine_handle<>::resume() (note that MSVC is not currently able to perform this tail-call optimisation and Clang only does this under optimised builds).

If the coroutine is awaiting lazy_task values in a loop and can possibly have a large number of these tasks complete synchronously then this could lead to stack-overflow.

eg.

lazy_task<int> async_one() { co_return 1; }

lazy_task<int> sum()
{
  int result = 0;
  for (int i = 0; i < 1'000'000; ++i)
  {
    result += co_await async_one();
  }
  co_return result;
}

The lazy_task implementation needs to be modified to not recursively resume the awaiter in the case that it completes synchronously. This unfortunately means it's going to need std::atomic to decide the race between the awaiter suspending and the task completing.

Nested coroutines possible??

Hi there. Whilst trying to get my head around how coroutines work I had a naive program that ended up being completely broken. I was wondering if cppcoro has the ability, or could be made to have the ability, to build nested coroutines? Here's a toy, pseudo-code example to highlight the concept I was trying to achieve.

SleepAwaitable sleep_some()
{
    return SleepAwaitable{1s};    // await_suspends launches thread and sleeps, then coro.resume()
}

SleepAwaitable sleep_launcher()
{
    co_await sleep_some();
}
DoAwaitable do_thing()
{
    co_await sleep_launcher();
    co_await sleep_launcher();   // never executed
}

int main()
{
    do_thing();
}

It's obvious now, but the SleepAwaitable{} thread resumes after co_await sleep_some then returns back to the thread. The do_thing progress has completely gone.

My main reason for asking is that I intend to have a coroutine of data transfer, but at some point I might need to pass control to another thread pool, have some work done (potentially with its own co_awaits to other async operations) and then return to the original caller.

Thanks

Add support for compiling with Visual Studio 2017

The Cake build system currently relies on the registry to determine whether a particular Visual Studio version is installed and if so where it is installed.

Once lewissbaker/cake#8 has been addressed in Cake then we need to update the config.cake for cppcoro to detect whether VS 2017 is available and if so then configure a new MsvcCompiler tool that makes use of it.

build problem: stddef.h not found

Sorry if this is not the right venue for this question, but it looks like a problem with the cppcoro build.
On ubuntu 16.04,
I installed clang 6 using apt and manually built libc++ per your installation instructions.

yfinkelstein@ubuntu16:~/cppcoro$ ls -la /usr/lib/llvm-6.0/
total 48
drwxr-xr-x   8 root root  4096 Aug 23 13:49 .
drwxr-xr-x 166 root root 12288 Aug 23 16:19 ..
drwxr-xr-x   2 root root  4096 Aug 23 13:49 bin
drwxr-xr-x   2 root root  4096 Aug 23 13:49 build
lrwxrwxrwx   1 root root    14 Aug  6 21:32 cmake -> lib/cmake/llvm
drwxr-xr-x   4 root root  4096 Aug 23 15:31 include
drwxr-xr-x   4 root root 12288 Aug 23 15:31 lib
drwxr-xr-x   2 root root  4096 Aug 23 13:49 libexec
drwxr-xr-x   7 root root  4096 Aug 23 13:49 share

libc++ is installed under llvm:

yfinkelstein@ubuntu16:~/cppcoro$ ls -la /usr/lib/llvm-6.0/include/c++
total 12
drwxr-xr-x 3 root root 4096 Aug 23 15:31 .
drwxr-xr-x 4 root root 4096 Aug 23 15:31 ..
drwxr-xr-x 6 root root 4096 Aug 23 15:31 v1

But while building cppcoro I inevitably get this error:

/usr/bin/clang: failed with exit code -2
In file included from lib/async_auto_reset_event.cpp:6:
In file included from ./include/cppcoro/async_auto_reset_event.hpp:8:
In file included from /usr/lib/llvm-6.0/bin/../include/c++/v1/experimental/coroutine:50:
In file included from /usr/lib/llvm-6.0/bin/../include/c++/v1/new:89:
In file included from /usr/lib/llvm-6.0/bin/../include/c++/v1/exception:81:
/usr/lib/llvm-6.0/bin/../include/c++/v1/cstddef:44:15: fatal error: 'stddef.h' file not found
#include_next <stddef.h>
              ^~~~~~~~~~

I decided to also copy stdc++ headers under /usr/include just in case but that does not help.

Below are all versions of stddef.h that I have:

finkelstein@ubuntu16:~/cppcoro$ find /usr -name stddef.h
/usr/lib/llvm-6.0/lib/clang/6.0.0/include/stddef.h
/usr/lib/llvm-6.0/include/c++/v1/stddef.h
/usr/lib/gcc/x86_64-linux-gnu/5/include/stddef.h
/usr/lib/gcc/x86_64-linux-gnu/6/include/stddef.h
/usr/include/linux/stddef.h
/usr/include/c++/v1/stddef.h
/usr/src/linux-headers-4.4.0-92/include/uapi/linux/stddef.h
/usr/src/linux-headers-4.4.0-92/include/linux/stddef.h

I left the 2 key properties in config.cake intact:

  clangInstallPrefix = '/usr'

  # Set this to the install-prefix of where libc++ is installed.
  # You only need to set this if it is not installed at the same
  # location as clangInstallPrefix.
  libCxxInstallPrefix = None # '/path/to/install'

Is there something wrong with my setup?

Thanks!

P.S.
Your project is quite exciting and I'm really curious what kind of performance I would get with thousands of coroutines and multiple threads.

Windows 8/Server 2012 is minimum due to WaitOnAddress/WakeByAddressAll

I was doing some test integration of cppcoro into our product but linker errors meant we could not proceed due to the use of WaitOnAddress/WakeByAddressAll. We have a product with a significant number of users on post XP/2K3 but pre Windows 8 and we need to support those platforms.

NOTE: I will code a solution at some point if nothing official is implemented but I wanted to raise this as a potential issue.

Allow 'co_return task' as way of implementing tail-recursion of tasks

If a coroutine wants to return the value of another task as its result then you currently need to co_await the other task first and then co_return the result of that. eg.

lazy_task<T> foo(int n);

lazy_task<T> bar()
{
  co_return co_await foo(123);
}

lazy_task<> usage()
{
  lazy_task<T> t = bar();
  T result = co_await t;
}

However, this means that the coroutine frame of bar() remains alive until foo() completes as the bar() coroutine is registered as the continuation of the foo() task.

If the coroutine frame of bar() is large then this means we're not releasing the memory/resources of the bar coroutine frame as soon as we could which can lead to additional memory pressure in applications.

It is also potentially problematic for recursive tasks where the recursion depth could be arbitrarily deep, as the memory consumption of the call chain could then be unbounded.

It would be nice if instead we could perform a tail-recursion here by using co_return task instead of co_return co_await task. This would effectively have the semantics of moving the returned task into the lazy_task<T> object that is being awaited at the top of the call chain. ie. that the result of the returned task becomes the result of the current coroutine.

This would allow the current coroutine frame to be freed before resuming the returned task. This means that in a purely-tail-recursive set of tasks that we'd have at most 2 coroutine frames in existence at one time, even if the recursion had unbounded depth.

eg.

lazy_task<T> foo(int n);

lazy_task<T> bar()
{
  co_return foo(123); // Don't co_await foo() task before returning.
}

lazy_task<> usage()
{
  lazy_task<T> t = bar();
  T result = co_await t;
}

The co_return foo(123); statement basically moves the lazy_task<T> value returned by foo() call into the t local variable being awaited in usage(). This frees the bar() coroutine frame before then resuming the foo() coroutine.

Split async I/O functionality into a separate 'cppcoro_io' library

As discussed in #46 (comment) the async I/O facilities provided by cppcoro should be split out into a separate library to allow applications to make use of the generic, core components of cppcoro (task, async_generator, when_all etc.) without pulling in the platform-specific async I/O subsystems.

This will allow applications to more easily use other I/O frameworks like libuv, boost::asio or the Networking TS in conjunction with cppcoro.

Add async network/socket capability

Extend I/O support to include support for sockets (at least tcp/ip and udp/ip protocols) using winsock and I/O completion ports on top of cppcoro::io_service.

Needs async methods for: accept, connect, disconnect, send/sendto, recv/recvfrom
Ideally also support gather-send and scatter-recv operations.

This will also need some abstraction for dealing with IP address (IPv4 + IPv6).
Need to look into what the networking TS provides towards this.

Use a proper unit-testing system framework

The tests for cppcoro are currently written using plain functions and standard library asserts.
While these are functional, it would be nice to make use of a system that eliminated the boiler-plate code, made it easier to split tests across different source files, provided better reporting of tests and failures, and offered command-line options for running individual tests.

Add support for Windows thread-pool scheduler/io_context

The io_service class is currently a thin wrapper around Win32 IO completion-port and requires clients to manually manage their own threads to call process_events().

This can be fairly easily used to build a fixed-size thread-pool by spawning N threads and having each thread call io_service::process_events().
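The fixed-size pool described above might be sketched as follows. To keep the example self-contained it does not use cppcoro: `event_queue` is a hypothetical stand-in for an io_service-like object whose process_events() runs queued work until it is stopped, and `fixed_thread_pool` is an illustrative name.

```cpp
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for an io_service-like event source.
class event_queue
{
public:
  void post(std::function<void()> work)
  {
    {
      std::lock_guard<std::mutex> lock(m_mutex);
      m_items.push_back(std::move(work));
    }
    m_cv.notify_one();
  }

  // Runs queued work until stop() has been called and the queue is drained.
  void process_events()
  {
    std::unique_lock<std::mutex> lock(m_mutex);
    for (;;)
    {
      m_cv.wait(lock, [this] { return m_stopped || !m_items.empty(); });
      if (m_items.empty()) return; // stopped and nothing left to run
      std::function<void()> work = std::move(m_items.front());
      m_items.pop_front();
      lock.unlock();
      work();                      // run the item without holding the lock
      lock.lock();
    }
  }

  void stop()
  {
    {
      std::lock_guard<std::mutex> lock(m_mutex);
      m_stopped = true;
    }
    m_cv.notify_all();
  }

private:
  std::mutex m_mutex;
  std::condition_variable m_cv;
  std::deque<std::function<void()>> m_items;
  bool m_stopped = false;
};

// Spawns N threads that each call process_events() on the shared queue.
class fixed_thread_pool
{
public:
  fixed_thread_pool(event_queue& queue, std::size_t threadCount)
    : m_queue(queue)
  {
    m_threads.reserve(threadCount);
    for (std::size_t i = 0; i < threadCount; ++i)
      m_threads.emplace_back([&queue] { queue.process_events(); });
  }

  ~fixed_thread_pool()
  {
    m_queue.stop();
    for (auto& thread : m_threads) thread.join();
  }

private:
  event_queue& m_queue;
  std::vector<std::thread> m_threads;
};

// Usage: post 100 increments and let 4 threads drain them.
int run_pool_demo()
{
  std::atomic<int> counter{0};
  event_queue queue;
  {
    fixed_thread_pool pool(queue, 4);
    for (int i = 0; i < 100; ++i)
      queue.post([&counter] { ++counter; });
  } // the destructor stops the queue and joins once it is drained
  return counter.load();
}
```

The thread count here is fixed for the lifetime of the pool, which is the limitation the OS thread-pool approach is meant to address.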

However, there are advantages to using the built-in Win32 thread-pool facilities to implement the thread-pool instead of manually spawning/managing your own pool of threads.

One benefit is that the OS is able to dynamically spin up more threads or shut down idle threads as required.
Another benefit is that multiple libraries/frameworks can share the same underlying pool of threads without needing to go through the same API.

Otherwise, if your application uses, say, 3 libraries, each of which creates its own thread-pool, then you could end up with 3N threads being created across 3 separate thread-pools. If all of those libraries instead used the OS thread pool then a single pool of threads can be used to multiplex execution of tasks from each of those libraries.

Add support for Linux

While most of cppcoro is platform agnostic, some of the thread-pooling and I/O code is OS-specific.

This issue is for providing an implementation of the cppcoro I/O abstractions for Linux.

Will need to get it building under Clang as a first step. See #3.
Then:

  • Look into writing an io_service implementation based on an epoll I/O event loop.
  • Port file and socket (once written) interfaces to Linux.
  • Add build-system support for Linux
