
YACLib

Yet Another Concurrency Library

GitHub license FOSSA status

Linux macOS Windows Sanitizers

Test coverage: coveralls Test coverage: codecov

Discord


About YACLib

YACLib is a lightweight C++ library for concurrent and parallel task execution that strives to satisfy the following properties:

  • Zero cost abstractions
  • Easy to use
  • Easy to build
  • Good test coverage

For more details check our design document and documentation.

Getting started

To get started quickly, just add this code to your CMakeLists.txt file.

include(FetchContent)
FetchContent_Declare(yaclib
  GIT_REPOSITORY https://github.com/YACLib/YACLib.git
  GIT_TAG main
)
FetchContent_MakeAvailable(yaclib)
link_libraries(yaclib)

For more details check install guide.

For more details about 'yaclib_std' or fault injection, check doc.

Examples

Here are short examples of using some features from YACLib, for details check documentation.

Asynchronous pipeline

yaclib::FairThreadPool cpu_tp{/*threads=*/4};
yaclib::FairThreadPool io_tp{/*threads=*/1};

yaclib::Run(cpu_tp, [] {  // on cpu_tp
  return 42;
}).ThenInline([](int r) {  // called directly after 'return 42', without Submit to cpu_tp
  return r + 1;
}).Then([](int r) {  // on cpu_tp
  return std::to_string(r);
}).Detach(io_tp, [](std::string&& r) {  // on io_tp
  std::cout << "Pipeline result: <"  << r << ">" << std::endl; // 43
});

We guarantee that no more than one allocation will be made for each step of the pipeline.

Each step (Then/Detach) can run on a given IExecutor, on the previous step's IExecutor, or inline.

Also Future/Promise don't contain shared atomic counters!

C++20 coroutine

yaclib::Future<int> task42() {
  co_return 42;
}

yaclib::Future<int> task43() {
  auto value = co_await task42();
  co_return value + 1;
}

You can combine Future coroutine code with Future callback code at zero cost. This allows using YACLib for a smooth transition from C++17 to C++20 with coroutines.

Also, a coroutine returning Future doesn't make an additional allocation for the Future; the only allocation is the coroutine frame, which is created by the compiler and can be optimized away.

And finally, co_await doesn't require an allocation, so you can combine async operations without allocating.

Lazy pipeline

auto task = yaclib::Schedule(tp1, [] {
  return 1; 
}).Then([] (int x) {
  return x * 2;
});

task.Run(); // Run task on tp1

Same as the asynchronous pipeline, but it starts only after Run/ToFuture/Get. Task can also be used as a coroutine return type.

Also, running a Task that returns a Future doesn't allocate. And it doesn't need synchronization, so it is even faster than the asynchronous pipeline.

Thread pool

yaclib::FairThreadPool tp{/*threads=*/4};
Submit(tp, [] {
  // some computations...
});
Submit(tp, [] {
  // some computations...
});

tp.Stop();
tp.Wait();

Strand, Serial executor

yaclib::FairThreadPool cpu_tp{4};  // thread pool for cpu tasks
yaclib::FairThreadPool io_tp{1};   // thread pool for io tasks
auto strand = yaclib::MakeStrand(&cpu_tp);

for (std::size_t i = 0; i < 100; ++i) {
  yaclib::Run(cpu_tp, [] {
    // ... parallel computations ...
  }).Then(strand, [](auto result) {
    // ... critical section ...
  }).Then(io_tp, [] {
    // ... io tasks ...
  }).Detach();
}

This is much more efficient than a mutex because:

  1. it doesn't block a thread-pool thread.
  2. critical sections are executed in batches (an idea known as flat-combining).

Also, the strand implementation is lock-free and efficient, with no additional allocations.
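The batching idea can be sketched in standard C++ (a simplified, mutex-based illustration of flat-combining; Strand and Submit are toy names here, and YACLib's real strand is lock-free):

```cpp
#include <deque>
#include <functional>
#include <mutex>

// Simplified strand: tasks submitted from any thread run strictly in order.
// The first submitter to find the strand idle becomes the "combiner" and
// drains the whole queue in a batch; later submitters just enqueue and leave.
class Strand {
 public:
  void Submit(std::function<void()> task) {
    std::unique_lock lk{m_};
    q_.push_back(std::move(task));
    if (running_) {
      return;  // an active combiner will pick this task up
    }
    running_ = true;
    while (!q_.empty()) {  // drain in batches
      auto t = std::move(q_.front());
      q_.pop_front();
      lk.unlock();
      t();  // run the task outside the lock
      lk.lock();
    }
    running_ = false;
  }

 private:
  std::mutex m_;
  std::deque<std::function<void()>> q_;
  bool running_ = false;
};
```

The key property is that only one thread executes tasks at a time, so the tasks form a critical section without blocking other submitters.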

Mutex

yaclib::FairThreadPool cpu_tp{4};  // thread pool for cpu tasks
yaclib::FairThreadPool io_tp{1};   // thread pool for io tasks
yaclib::Mutex<> m;

auto compute = [&] () -> yaclib::Future<> {
  co_await On(cpu_tp);
  // ... parallel computations ...
  auto guard = co_await m.Lock();
  // ... critical section ...
  co_await guard.UnlockOn(io_tp);
  // ... io tasks ...
};

for (std::size_t i = 0; i < 100; ++i) {
  compute().Detach();
}

First, as far as we know, this is the only correct mutex implementation for C++20 coroutines (cppcoro, libunifex, and folly::coro implement Unlock incorrectly: it serializes the code after Unlock).

Second, Mutex inherits all the Strand benefits.

Rescheduling

yaclib::Future<> bar(yaclib::IExecutor& cpu, yaclib::IExecutor& io) {
  co_await On(cpu);
  // ... some heavy computation ...
  co_await On(io);
  // ... some io computation ...
}

This is really zero-cost, just suspend the coroutine and submit its resume to another executor, without synchronization inside the coroutine and allocations anywhere.

WhenAll

yaclib::FairThreadPool tp{/*threads=*/4};
std::vector<yaclib::Future<int>> fs;

// Run parallel computations
for (std::size_t i = 0; i < 5; ++i) {
  fs.push_back(yaclib::Run(tp, [i]() -> int {
    return random() * i;
  }));
}

// Will be ready when all futures are ready
yaclib::Future<std::vector<int>> all = WhenAll(fs.begin(), fs.size());
std::vector<int> unique_ints = std::move(all).Then([](std::vector<int> ints) {
  std::sort(ints.begin(), ints.end());  // std::unique only removes adjacent duplicates
  ints.erase(std::unique(ints.begin(), ints.end()), ints.end());
  return ints;
}).Get().Ok();

Doesn't make more than 3 allocations regardless of input size.

WhenAny

yaclib::FairThreadPool tp{/*threads=*/4};
std::vector<yaclib::Future<int>> fs;

// Run parallel computations
for (std::size_t i = 0; i < 5; ++i) {
  fs.push_back(yaclib::Run(tp, [i] {
    // connect with one of the database shards
    return i;
  }));
}

// Will be ready when any future is ready
WhenAny(fs.begin(), fs.size()).Detach([](int i) {
  // some work with database
});

Doesn't make more than 2 allocations regardless of input size.

Future unwrapping

yaclib::FairThreadPool tp_output{/*threads=*/1};
yaclib::FairThreadPool tp_compute{/*threads=CPU cores*/};

auto future = yaclib::Run(tp_output, [] {
  std::cout << "Outer task" << std::endl;
  return yaclib::Run(tp_compute, [] { return 42; });
}).Then(/*tp_compute*/ [](int result) {
  result *= 13;
  return yaclib::Run(tp_output, [result] { 
    std::cout << "Result = " << result << std::endl; 
  });
});

Sometimes an async function needs to return the result of another async function. This is possible with a blocking wait on that result, but it would block a thread while waiting for the task to complete.

This problem can be solved using future unwrapping: when an async function returns a Future object, instead of setting its result to the Future object, the inner Future will "replace" the outer Future. This means that the outer Future will complete when the inner Future finishes and will acquire the result of the inner Future.

It also doesn't require additional allocations.
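The problem is easy to see with standard std::future, where composing one async result from another yields a nested future and the caller must block twice (nested_get is an illustrative helper; deferred launch keeps the sketch single-threaded):

```cpp
#include <future>

// Nested futures arise when an async task itself returns a future; without
// unwrapping, the caller blocks on the outer .get() and then again on the
// inner .get(). YACLib's unwrapping collapses Future<Future<T>> into
// Future<T> so no thread has to block in between.
int nested_get() {
  std::future<std::future<int>> nested = std::async(std::launch::deferred, [] {
    return std::async(std::launch::deferred, [] { return 42; });
  });
  return nested.get().get();  // two blocking waits
}
```

With unwrapping, the outer future simply completes when the inner one does, carrying its result.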

Timed wait

yaclib::FairThreadPool tp{/*threads=*/4};

yaclib::Future<int> f1 = yaclib::Run(tp, [] { return 42; });
yaclib::Future<double> f2 = yaclib::Run(tp, [] { return 15.0; });

using namespace std::chrono_literals;
WaitFor(10ms, f1, f2);  // or Wait / WaitUntil

if (f1.Ready()) {
  Process(std::as_const(f1).Get());
  yaclib::Result<int> res1 = std::as_const(f1).Get();
  assert(f1.Valid());  // f1 valid here
}

if (f2.Ready()) {
  Process(std::move(f2).Get());
  assert(!f2.Valid());  // f2 invalid here
}

We support Wait/WaitFor/WaitUntil. Also, none of them allocate, and we have optimized the path for a single Future (used in Future::Get()).

WaitGroup

yaclib::WaitGroup wg{1};

yaclib::FairThreadPool tp;

wg.Add(2/*default=1*/);
Submit(tp, [&] {
  wg.Done();
});
Submit(tp, [&] {
  wg.Done();
});

yaclib::Future<int> f1 = yaclib::Run(tp, [] {...});
wg.Attach(f1);  // auto Done when the Future becomes Ready

yaclib::Future<> f2 = yaclib::Run(tp, [] {...});
wg.Consume(std::move(f2));  // auto Done when the Future becomes Ready

auto coro = [&] () -> yaclib::Future<> {
  co_await On(tp);
  co_await wg; // alias for co_await wg.Await(CurrentThreadPool());
  std::cout << f1.Touch().Ok(); // Valid access to Result of Ready Future
};

auto coro_f = coro();

wg.Done(/*default=1*/);
wg.Wait();

As efficient as a simple atomic counter in an intrusive pointer; it also doesn't require any allocation.
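The counting behaviour can be sketched in standard C++ (a minimal illustration; this toy WaitGroup has only Add/Done/Wait, without YACLib's future attachment or coroutine await):

```cpp
#include <atomic>
#include <condition_variable>
#include <mutex>

// Minimal WaitGroup: an atomic counter plus a condvar to wake waiters when
// the counter reaches zero.
class WaitGroup {
 public:
  explicit WaitGroup(int n = 0) : count_{n} {}

  void Add(int n = 1) { count_.fetch_add(n, std::memory_order_relaxed); }

  void Done(int n = 1) {
    // fetch_sub returns the previous value; if it equals n, we just hit zero.
    if (count_.fetch_sub(n, std::memory_order_acq_rel) == n) {
      std::lock_guard lk{m_};
      cv_.notify_all();
    }
  }

  void Wait() {
    std::unique_lock lk{m_};
    cv_.wait(lk, [&] { return count_.load(std::memory_order_acquire) == 0; });
  }

 private:
  std::atomic<int> count_;
  std::mutex m_;
  std::condition_variable cv_;
};
```

The mutex is touched only when a waiter exists or the counter hits zero; the common Add/Done path is a single atomic operation.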

Exception recovering

yaclib::FairThreadPool tp{/*threads=*/4};

auto f = yaclib::Run(tp, [] {
  if (random() % 2) {
    throw std::runtime_error{"1"};
  }
  return 42;
}).Then([](int y) {
  if (random() % 2) {
    throw std::runtime_error{"2"};
  }
  return y + 15;
}).Then([](int z) {  // Will not run if we have any error
  return z * 2;
}).Then([](std::exception_ptr e) {  // Recover from exceptions
  try {
    std::rethrow_exception(e);
  } catch (const std::runtime_error& e) {
    std::cout << e.what() << std::endl;
  }
  return 10;  // Some default value
});
int x = std::move(f).Get().Value();

Error recovering

yaclib::FairThreadPool tp{/*threads=*/4};

auto f = yaclib::Run<std::error_code>(tp, [] {
  if (random() % 2) {
    return std::make_error_code(std::errc::io_error);
  }
  return 42;
}).Then([](int y) {
  if (random() % 2) {
    return std::make_error_code(std::errc::invalid_argument);
  }
  return y + 15;
}).Then([](int z) {  // Will not run if we have any error
  return z * 2;
}).Then([](std::error_code ec) {  // Recover from error codes
  std::cout << ec.value() << std::endl;
  return 10;  // Some default value
});
int x = std::move(f).Get().Value();
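Note that the snippet above is schematic: std::make_error_code takes a std::errc, and a single C++ lambda cannot return int on one path and std::error_code on another. A compiling standard-C++ sketch of the same value-or-error recovery pattern, using a hypothetical IntResult alias in place of yaclib::Result:

```cpp
#include <iostream>
#include <system_error>
#include <variant>

// Hypothetical stand-in for yaclib::Result<int>: either a value or an error.
using IntResult = std::variant<int, std::error_code>;

IntResult compute(bool fail) {
  if (fail) {
    return std::make_error_code(std::errc::io_error);
  }
  return 42;
}

IntResult recover(IntResult r) {
  if (auto* ec = std::get_if<std::error_code>(&r)) {
    std::cout << ec->value() << '\n';  // inspect the error
    return 10;                         // some default value
  }
  return std::get<int>(r) + 15;  // continue the pipeline on success
}
```

The recovery step receives the whole result and decides whether to continue with the value or substitute a default, which is exactly the shape of the Then callback above.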

Use Result for smart recovering

yaclib::FairThreadPool tp{/*threads=*/4};

auto f = yaclib::Run(tp, [] {
  if (random() % 2) {
    return std::make_error_code(std::errc::io_error);
  }
  return 42;
}).Then([](int y) {
  if (random() % 2) {
    throw std::runtime_error{"2"};
  }
  return y + 15;
}).Then([](yaclib::Result<int>&& z) {
  if (!z) {
    return 10;  // Some default value
  }
  return std::move(z).Value();
});
int x = std::move(f).Get().Value();

Requirements

YACLib is a static library, that uses CMake as a build system and requires a compiler with C++17 or newer.

If the library doesn't compile on some compiler satisfying this condition, please create an issue. Pull requests with fixes are welcome!

We can also try to support older standards. If you are interested in it, check this discussion.

We test the following configurations:

✅ - CI tested

👌 - manually tested

Compiler\OS   Linux      Windows     macOS      Android
GCC           ✅ 7+      👌 MinGW    ✅ 7+      👌
Clang         ✅ 8+      ✅ ClangCL  ✅ 8+      👌
AppleClang                           ✅ 12+
MSVC                     ✅ 14.20+

MinGW worked in CI earlier; check this.

Releases

YACLib follows the Abseil Live at Head philosophy (update to the latest commit from the main branch as often as possible).

So we recommend using the latest commit in the main branch in your projects.

This is safe because we suggest compiling YACLib from source, and each commit in main goes through dozens of test runs in various configurations. Our test coverage is 100%; to simplify, we run tests on the Cartesian product of possible configurations:

os x compiler x stdlib x sanitizer x fault injection backend

However, we realize this philosophy doesn't work for every project, so we also provide Releases.

We don't believe in SemVer (check this), but we use a year.month.day[.patch] versioning scheme. A new version is released on request, or when we decide there are enough important changes.

Contributing

We are always open for issues and pull requests. Check our good first issues.

For more details you can check the following links:

Thanks

Contacts

You can contact us by my email: [email protected]

Or join our Discord Server

License

YACLib is made available under MIT License. See LICENSE file for details.

We would be glad if you let us know that you're using our library.


yaclib's People

Contributors

fossabot, harashimahashi, jsteemann, kononovk, madoka-wizard, mbkkt, mkornaukhov03, myannyax, ri7ay, thesalvator


yaclib's Issues

Implement IThreadFactory

This class should help set per-thread service attributes for thread-pool threads:

  • Name
  • Priority
  • Callbacks before a thread starts being used by an IThreadPool, and after it stops being used

It is also a cache of threads between the operating system and a set of user IThreadPools whose size is dynamic or whose lifetime differs from the lifetime of the program.

Implement IMailBoxExecutor

This class executes only the last N tasks using IExecutorPtr.

To implement it, use an SPSC lock-free fixed-size queue (ring buffer).
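The ring buffer this issue describes might look like the following in standard C++ (SpscRing is an illustrative name; a power-of-two capacity lets indices wrap with a bit mask):

```cpp
#include <array>
#include <atomic>
#include <cstddef>
#include <optional>
#include <utility>

// Fixed-size single-producer single-consumer lock-free ring buffer.
// head_ is advanced only by the consumer, tail_ only by the producer, so a
// single acquire/release pair per operation is enough for correctness.
template <typename T, std::size_t Capacity>
class SpscRing {
  static_assert((Capacity & (Capacity - 1)) == 0, "Capacity must be a power of two");

 public:
  bool TryPush(T v) {
    auto t = tail_.load(std::memory_order_relaxed);
    if (t - head_.load(std::memory_order_acquire) == Capacity) {
      return false;  // full
    }
    buf_[t & (Capacity - 1)] = std::move(v);
    tail_.store(t + 1, std::memory_order_release);  // publish the element
    return true;
  }

  std::optional<T> TryPop() {
    auto h = head_.load(std::memory_order_relaxed);
    if (h == tail_.load(std::memory_order_acquire)) {
      return std::nullopt;  // empty
    }
    T v = std::move(buf_[h & (Capacity - 1)]);
    head_.store(h + 1, std::memory_order_release);  // free the slot
    return v;
  }

 private:
  std::array<T, Capacity> buf_{};
  std::atomic<std::size_t> head_{0};  // written by the consumer only
  std::atomic<std::size_t> tail_{0};  // written by the producer only
};
```

Indices grow monotonically and are masked on access, which avoids the classic "one slot wasted" problem of wrap-around ring buffers.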

Add WhenAll for futures

WhenAll combinator
Container<Future> -> Future<Container>
Container<Future> -> Future
Container may be any std container, or Args...

Add SharedFuture

First of all, feel free to ask me (@MBkkt) any questions about this task; often the task may not be described in enough detail for devs who don't have the full context.

Make T SharedFuture<T>::Get() const;
"Allocate" the mutex inside the SharedFuture class, not on the stack as is the case with T Future::Get() &&

Write abstraction for void() functor

This abstraction will have to be used in intrusive containers.
Therefore, it needs to be allocated on the heap and there should be pointers to prev/next in it.

And don't forget to write tests

Add WaitGroup

  • Remove CallerCore
  • Test ITask::Cancel
  • Add WaitGroup: wait for many futures without invalidating them and without overhead except one mutex/condvar
struct WaitGroup { // not copy/move
  template<typename T>
  void Add(Future<T>& f) &;

  void Wait() &;
};

Implement IMailBoxTask

IMailBoxTask is an ITask with a few special properties:

  • Its ownership is shared between the user and the IExecutor (if that's not the case, don't use it)
  • Its constructor takes a number N, which is how many times it will execute.
    A usage pattern:
auto task = CreateMailBoxTask(2, lambda)
// when the timer ticks
any_executor->Execute(task);

We want the task to execute the minimum of the number of ticks and N.
N is the strict minimum when ticks happen faster than the task executes.

TODO(MBkkt) Better description

Add WhenAny for futures

WhenAny combinator
Container<Future> -> Future
Container may be any std container, or Args...

Write good callback abstraction

ITask <=> Intrusive node + vtable + virtual void Call() = 0

detail::Task(functor<void()>) -> Functor member + public inheritance ITask, void Call() call functor<void()>

detail::CallbackTask(functor<void(Args...)>) -> Functor member + raw memory for Args + public inheritance ITask, void Call() call functor<void(Args...)>(Args...), method some like SetArgs(Args...)

CreateCallback <=> detail::CallbackTask(functor<void(Args...)>)
when we need to execute the callback and know its args, we do:
callback->SetArgs(args...);
tp->Execute(callback)
or, for an inline callback call:
callback->Call(args...)

Codecov fix

Code coverage doesn't check header files, to avoid covering other libraries.

  • Understand how to fix this for our internal headers

Add clang-tidy

Use all checks; otherwise, we will discuss which ones to disable in the PR

Necessary for #6

Lazy Future 1

Check research and implementations of shared futures (e.g. libunifex, articles, etc.). Try to design and implement this abstraction.

  • Add lazy::Run, which returns a special object (the Sender concept): save the functor on the stack and provide a Get method which returns a Future.
  • Make a method for Sender with the same semantics as Future::Then.

Add CI

  • Use GitHub actions
  • Build library, tests, benchmarks for all checked configurations
    • Linux + Clang
    • Linux + GCC
    • Windows + MSVC
    • OS X + AppleClang
    • Windows + other compilers (if possible)
    • Android + GCC (if possible)
    • Android + Clang (if possible)
    • IOS + AppleClang (if possible)
  • Check with clang-format
  • Check with clang-tidy #8
  • Run tests with sanitizers #7

Improve docs

MAKE IT BETTER

  • #108
  • #107
  • Fix typos, grammar, etc.
  • Fix undocumented API
  • Translate the design doc to English and separate design/done/plan

Fix link with headers in doxygen

If we mix headers and links in markdown, it renders incorrectly in doxygen (for example, check SANITIZERS.md). Check how to fix this doxygen bug.

Implement AsyncMutex

This class executes tasks strictly sequentially using IExecutorPtr.
To implement it, use a wait-free MPSC queue.
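The queue this issue asks for can be sketched as a Vyukov-style intrusive MPSC queue (Node and MpscQueue are illustrative names; each producer finishes in a single atomic exchange, so producers are wait-free, and the single consumer pops in FIFO order):

```cpp
#include <atomic>

// Intrusive node: tasks embed this so no allocation happens on push.
struct Node {
  std::atomic<Node*> next{nullptr};
};

class MpscQueue {
 public:
  void Push(Node* n) {
    n->next.store(nullptr, std::memory_order_relaxed);
    // Wait-free: one exchange claims the tail, one store links the node.
    Node* prev = tail_.exchange(n, std::memory_order_acq_rel);
    prev->next.store(n, std::memory_order_release);
  }

  Node* TryPop() {
    Node* head = head_;
    Node* next = head->next.load(std::memory_order_acquire);
    if (head == &stub_) {  // skip over the stub node
      if (next == nullptr) {
        return nullptr;  // queue is empty
      }
      head_ = next;
      head = next;
      next = next->next.load(std::memory_order_acquire);
    }
    if (next != nullptr) {  // fast path: successor already linked
      head_ = next;
      return head;
    }
    if (head != tail_.load(std::memory_order_acquire)) {
      return nullptr;  // a producer is mid-push; retry later
    }
    Push(&stub_);  // re-insert the stub so head gains a successor
    next = head->next.load(std::memory_order_acquire);
    if (next != nullptr) {
      head_ = next;
      return head;
    }
    return nullptr;
  }

 private:
  Node stub_;                        // dummy node so the list is never empty
  std::atomic<Node*> tail_{&stub_};  // shared among producers
  Node* head_{&stub_};               // owned by the single consumer
};
```

An AsyncMutex can then treat each queued node as a pending critical section and drain them one at a time on its executor.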

Write different implementations of IThreadPool and benchmark it

  • Implement a good test-and-set spinlock
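A spinlock of the kind this bullet asks for might be sketched as follows (Spinlock is an illustrative name; the inner relaxed-load loop makes it test-and-test-and-set, which keeps contending threads reading a shared cache line instead of bouncing it with writes):

```cpp
#include <atomic>
#include <thread>

// Test-and-test-and-set spinlock: only attempt the atomic exchange when the
// lock looks free; otherwise spin on a plain load.
class Spinlock {
 public:
  void lock() {
    for (;;) {
      // Try to grab the lock; exchange returns the previous value.
      if (!locked_.exchange(true, std::memory_order_acquire)) {
        return;
      }
      // Spin read-only until the lock looks free, yielding to be polite.
      while (locked_.load(std::memory_order_relaxed)) {
        std::this_thread::yield();
      }
    }
  }

  void unlock() { locked_.store(false, std::memory_order_release); }

 private:
  std::atomic<bool> locked_{false};
};
```

A production version would add exponential backoff and possibly a PAUSE instruction in the spin loop; this sketch shows only the core protocol.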

Write different implementations of IThreadPool

  • IntrusiveList with std::mutex
  • IntrusiveList with spinlock
  • IntrusiveList with std::mutex per thread
  • IntrusiveList with spinlock per thread
  • Michael Scott queue
  • Michael Scott queue per thread
  • Lock-free MPSC queue with std::mutex for consumers
  • Lock-free MPSC queue with spinlock for consumers
  • Lock-free MPSC queue per thread
  • Lock-free MPSC queue with std::mutex and per-thread SPSC queue for consumers
  • Lock-free MPSC queue with spinlock and per-thread SPSC queue for consumers
  • Lock-free deque
  • Lock-free deque per thread

And whatever else you can invent

  • Benchmark them

P.S. per thread means you should use work-stealing or work-distribution

Write benchmarks for IThreadPool

Add benchmarks relative to master.
It is proposed to measure push and execution of 4 types of tasks:
a task that executes faster than

  1. 1 us
  2. 1 ms
  3. 1 sec
  4. 1 min

Add documentation

Write basic documentation using GitHub Flavored Markdown:

  • /README.md - table of contents = links to other documentation entries.
  • /docs/README.md - library overview.
  • /docs/INSTALL.md - prerequisites and install options.
  • /docs/DEPENDENCIES.md - library build dependencies.
  • /docs/CODESTYLE.md - additional code style information (i.e. entries which couldn't be put in .clang-format).

YACLib

  1. Don't forget to add @MBkkt to all PRs
  2. If the issue has no assignees, or only @MBkkt is listed, you can take it yourself.
  3. If @MBkkt is indicated in the issue as the assignee, you need to additionally discuss the implementation with him.

Add stackful coroutine

We recommend looking at this link: https://www.agner.org/optimize

Write fault injection thread interface for STL

https://en.cppreference.com/w/cpp/thread

yaclib::std::

C++ 17:

  • thread
  • get_id
  • yield
  • sleep_for
  • sleep_until
  • mutex
  • timed_mutex
  • recursive_mutex
  • shared_mutex
  • shared_timed_mutex
  • once_flag
  • call_once
  • condition_variable
  • condition_variable_any

C++ 20:

  • jthread
  • counting_semaphore
  • binary_semaphore
  • latch
  • barrier
  • atomic::wait/notify_one/all

Maybe?

  • notify_all_at_thread_exit
  • jthread::stop_token
  • Thread cancellation

Definitely no.

  • promise
  • packaged_task
  • future
  • shared_future
  • async

Improve WhenAll/Any

The main idea is to create:

  • Future<void> WhenAll(...); ... can be any type of futures
  • Future<vector<T>> TakeAll(...); ... can be same type of futures
  • Future<std::tuple<T...>> TakeAllArgs(...); ... can be any type of futures
  • Future<void> WhenAny(...); ... can be any type of futures
  • Future<vector<T>> TakeAny(...); ... can be same type of futures
  • Future<std::tuple<T...>> TakeAnyArgs(...); ... can be any type of futures

Create library

  • Use CMake
  • Make library static
  • Create project structure like:
    • /                          // root
    • /include/yaclib            // public header files
    • /include/yaclib/executor   // executors
    • /include/yaclib/future     // future/promise
    • /src                       // all source and private header files
    • /src/executor              // executors
    • /src/future                // future/promise
    • /test                      // tests
    • /bench                     // benchmarks
    • /doc                       // library overview and design docs
    • /example                   // example of library usage
