raftlib / raftlib

The RaftLib C++ library, streaming/dataflow concurrency via C++ iostream-like operators

Home Page: http://raftlib.io

License: Apache License 2.0

C++ 96.78% CMake 2.86% Shell 0.36%
c-plus-plus cmake dataflow dataflow-programming dataflow-structure dataflows dsl hpc ipc machine opencv parallel pthreads qthread-library qthreads raftlib runtime streaming thread thread-library

raftlib's Introduction

Overview

RaftLib is an open-source C++ Library that provides a framework for implementing parallel and concurrent data processing pipelines. It is designed to simplify the development of high-performance data processing applications by abstracting away the complexities of parallelism, concurrency, and data flow management.

It enables stream/data-flow parallel computation by linking parallel compute kernels together using simple right shift operators, similar to C++ streams for string manipulation. RaftLib eliminates the need for explicit usage of traditional threading libraries such as pthreads, std::thread, or OpenMP, which can lead to non-deterministic behavior when misused.

RaftLib's model provides lock-free, FIFO-like access to the communication channels that connect compute kernels. This design ensures efficient and deterministic data transfer between kernels. The library incorporates auto-parallelization, optimization, and convenient features to simplify the development of performant applications.

Key Components:

  1. Pipelines: The core concept in RaftLib is a pipeline, which represents a sequence of data processing stages. Each stage in the pipeline is implemented as a separate computational unit called a "kernel." Kernels are connected together to form a directed acyclic graph (DAG) that represents the data flow between stages.

  2. Kernels: Kernels are the building blocks of a pipeline and encapsulate the computation performed on the input data. Each kernel can have one or more input ports and output ports, allowing data to flow between stages. Kernels are responsible for processing data, applying transformations, and generating output for downstream stages.

  3. Schedulers: RaftLib provides different schedulers to control the execution and parallelism of the pipeline. Schedulers determine how and when kernels are executed, taking into account factors like data dependencies, load balancing, and available computational resources. The library offers various scheduling strategies, including static scheduling, dynamic scheduling, and hybrid scheduling.

  4. Data Flow Management: RaftLib handles the data flow between kernels automatically. It manages the movement of data between stages, ensuring that input data is available when needed and that output data is delivered to the correct destination. The library provides mechanisms for handling backpressure and buffering, allowing efficient processing of data streams.

  5. Parallelism and Concurrency: RaftLib enables parallel execution of pipeline stages by utilizing the available computational resources efficiently. It supports multi-threading and takes advantage of multiple CPU cores to achieve parallelism. Additionally, it can leverage GPU acceleration for certain kernels, further boosting performance.

  6. Integration and Extensibility: RaftLib provides an API and set of tools for integrating the library into existing applications. It supports interoperability with other libraries and frameworks, making it possible to combine RaftLib with domain-specific tools. The library is extensible, allowing developers to define custom kernels and schedulers to fit specific application requirements.

  7. Fault Tolerance: RaftLib offers mechanisms for handling failures and recovering from errors. It supports fault-tolerant execution by providing checkpointing and recovery capabilities, allowing pipelines to resume from a previous state in case of failures.

Overall, RaftLib simplifies the development of parallel and concurrent data processing applications by providing a high-level abstraction for building data flow pipelines. It allows developers to focus on the computation and data transformations, while the library handles the complexities of parallel execution, data flow management, and fault tolerance.

Feel free to give RaftLib a try! If you encounter any issues, please create an issue request. For minor issues, we recommend joining our Slack group for quick resolutions. We also welcome pull requests from the community! If you're interested in benchmarking, you can send the authors an email. We have started a benchmark collection, but it's a work in progress, and we would be delighted to include your code.

User Group / Mailing List: slack channel

Pre-requisites

Linux

  • Compiler: c++17 capable -> Clang, GNU GCC 5.0+, or Intel icc
  • Latest build runs under Linux with above compilers on both x86 and AArch64, with both pthreads and QThreads.

OS X

  • Compiler: c++17 capable -> Clang, GNU GCC 5.0+, or Intel icc
  • OS X on M1 compiles and runs. Some template test cases still have hiccups, but these do not seem to impact functionality.
  • Note for OS X users without a /usr/local: specify an install prefix when using CMake.

Windows

  • Builds and runs under Win10

Cloning repository

Clone using the --recurse-submodules flag to download the library together with all submodules and other libraries:

git clone --recurse-submodules https://github.com/RaftLib/RaftLib.git

Downloading Manually

Building the library by cloning the repository is the preferred option. However, when this cannot be done, such as on offline networks, the package has to be downloaded manually. In such cases, the corresponding dependencies must be downloaded manually as well.

Dependencies

The following submodules are required for building RaftLib, and they need to be placed under their corresponding folders within the git-dep directory:

  • affinity - Provides CPU affinity setting capabilities.

    • Commit: Specify the required commit here.
  • cmdargs - Offers command-line argument parsing functionality.

    • Commit: Specify the required commit here.
  • demangle - Facilitates C++ symbol demangling.

    • Commit: Specify the required commit here.
  • shm - Supports shared memory communication.

    • Commit: Specify the required commit here.

Before building RaftLib, ensure that you download the corresponding commit of each submodule. You can use the following command within the main repository:

git submodule update --init --recursive

After setting up the dependencies, you can proceed with building and using RaftLib as described.

Build and Install

Using a build directory called e.g.: "build":

mkdir build
cd build
cmake ..
make && make test
sudo make install

NOTE: The default prefix in the makefile is:

PREFIX ?= /usr/local

CMAKE flags

OpenCV

If you want to build the OpenCV example, then you'll need to add to your cmake invocation:

-DBUILD_WOPENCV=true 

Examples

Building the examples can be enabled using:

-DBUILD_EXAMPLES=true

Benchmarks

Building the benchmarks can be enabled using:

-DBUILD_BENCHMARKS=true

Tests

Building tests can be disabled using:

-DBUILD_TESTS=false

QThreads

To use the QThreads user-space HPC threading library, you will need to use the version from the RaftLib org and follow the RaftLib-specific readme. This QThreads version has patches for hwloc2.x applied and fixes for test cases. To compile RaftLib with QThreads linked, add the following (assumes the QThreads library is in your path):

-DUSEQTHREAD=1

String names

This is still an experimental feature. Default is to use legacy string-named ports.

-DSTRING_NAMES=1

Pkg-config path

Set the path where the raftlib.pc pkg-config file should be installed. Leave it empty to let the build figure it out.

-DPKG_CONFIG_PATHWAY="<path>"

Generate position independent code

Sometimes the code needs to be integrated into a shared library. For that case, this flag builds the library with position independent code (i.e., with the compiler flag -fPIC, which is supported by both gcc and clang):

-DBUILD_FPIC=1

Using

When building applications with RaftLib on Linux, it is best to use the pkg-config file. For example, using the poc.cpp example:

g++ `pkg-config --cflags raftlib` poc.cpp -o poc `pkg-config --libs raftlib`

Feel free to substitute your favorite build tool. I use Ninja and make depending on which machine I'm on. To change out, use cmake to generate the appropriate build files with the -Gxxx flag.

Pkg-config

The primary use of pkg-config is to provide the necessary details for compiling and linking a program to a library. This metadata is stored in pkg-config files. These files have the suffix .pc and reside in specific locations known to the pkg-config tool. RaftLib provides a configuration file which is installed together with the library. Once the configuration file is installed, the command pkg-config --cflags raftlib can be used to obtain the compile flags.

Following is an example of what is returned by above command:

-std=c++14 -DL1D_CACHE_LINE_SIZE=64 -DPLATFORM_HAS_NUMA=0 -I/usr/local/include

Contribution Guidelines

We welcome contributions to our project! To maintain a clear and organized development history, please adhere to the Conventional Commits message format when making commits.

Conventional Commits

Please follow the guidelines from the Conventional Commits website when crafting your commit messages. This format helps us generate accurate changelogs and automate the release process based on the types of changes you make.

Automatic Releases

We've streamlined our release process to be automated, thanks to the Conventional Commits message format. This ensures that our project maintains a clear versioning scheme and changelog, without the need for manual intervention.

How Automatic Releases Work

When you follow the Conventional Commits message format for your commit messages, our automated release system interprets these messages and determines the appropriate version bump for the project.

  • Commits with fix: in the message trigger a patch version increase.
  • Commits with feat: in the message trigger a minor version increase.
  • Commits with a BREAKING CHANGE: in the message trigger a major version increase.

Here's an example of how it works:

  • If you contribute a bug fix, such as fix: resolve login issue, it will trigger a patch version increase.
  • If you add a new feature, such as feat: implement user profile customization, it will trigger a minor version increase.
  • If your contribution includes a breaking change, such as BREAKING CHANGE: update authentication method, it will trigger a major version increase.
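
The release tooling itself is not shown in this README, so the following is only a minimal C++ sketch of the mapping described above. The names Bump, Version, bump_for and apply are hypothetical, scoped types such as feat(parser): are ignored for brevity, and resetting the lower components on a bump is the usual semantic-versioning convention rather than something stated here.

```cpp
#include <cassert>
#include <string>

// Hypothetical names, for illustration only.
enum class Bump { None, Patch, Minor, Major };

struct Version
{
    int major_;
    int minor_;
    int patch_;
};

// Maps a commit message to the version bump described in the list above.
inline Bump bump_for(const std::string &message)
{
    // A BREAKING CHANGE: marker anywhere in the message takes precedence.
    if (message.find("BREAKING CHANGE:") != std::string::npos) { return Bump::Major; }
    // rfind(prefix, 0) == 0 is true only when the message starts with prefix.
    if (message.rfind("feat:", 0) == 0) { return Bump::Minor; }
    if (message.rfind("fix:", 0) == 0) { return Bump::Patch; }
    return Bump::None;
}

// Applies a bump; lower components are reset (assumed semver convention).
inline Version apply(Version v, Bump b)
{
    assert(v.major_ >= 0 && v.minor_ >= 0 && v.patch_ >= 0);
    switch (b)
    {
    case Bump::Major: return {v.major_ + 1, 0, 0};
    case Bump::Minor: return {v.major_, v.minor_ + 1, 0};
    case Bump::Patch: return {v.major_, v.minor_, v.patch_ + 1};
    case Bump::None: break;
    }
    return v;
}
```

In this sketch a message such as feat!: drop old API yields Bump::None, because only the literal prefixes feat: and fix: and the BREAKING CHANGE: marker are recognized.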

Benefits of Automated Releases

Automated releases offer several benefits to our development workflow:

  • Consistency: Every release follows a standardized versioning scheme.
  • Changelog Generation: Changelogs are automatically generated based on commit messages.
  • Efficiency: Release management is streamlined, saving time and reducing errors.
  • Transparency: Contributors can see how their changes affect the versioning process.

By adhering to the Conventional Commits format, you play a crucial role in ensuring that our project's releases are accurate, well-documented, and hassle-free.

Thank you for your contributions and for helping us maintain a smooth and automated release process!

Warning

We do not support exclamation marks (!) after <type> for triggering breaking changes.

Citation

If you use this framework for something that gets published, please cite it as:

@article{blc16,
  author = {Beard, Jonathan C and Li, Peng and Chamberlain, Roger D},
  title = {RaftLib: A C++ Template Library for High Performance Stream Parallel Processing},
  year = {2016},
  doi = {http://dx.doi.org/10.1177/1094342016672542},
  eprint = {http://hpc.sagepub.com/content/early/2016/10/18/1094342016672542.full.pdf+html},
  journal = {International Journal of High Performance Computing Applications}
}

Other Info Sources

Feel free to e-mail one of the authors of the repo

raftlib's Issues

Only first of many parallel kernel stages is executed.

I have a pipeline laid out something like this:

map += videoSource <= (convertColor >> resize) >= videoSink;

Where videoSource is a multi-port output, convertColor and resize are all 1-1 input-to-output setups that are clonable and videoSink is a parallel_k fan-in stage.

For some reason, when I run my application only videoSource, convertColor and videoSink are run in this configuration. If I use only convertColor or resize individually, they are each run but if I try to use any two together for the joined parallel kernel, only the first is run.

Am I using the incorrect syntax or missing something else?

raft::kset syntax

I want to make building complex topologies simpler than it currently is. Specifically, I want to be able to do something like the following:

raft::map m;
raft::kernel src,dst;
raft::kernel a, b, c, d;
m += src <= raft::kset( a, b, c, d ) >= dst;

This means that there are links as follows:

src -> a
src -> b
src -> c
src -> d
a -> dst
b -> dst
c -> dst
d -> dst

This syntax seems fairly natural to me, but I wanted to get input from a larger audience before building it into the run-time. I should also note that this syntax addition does not modify any of the current semantics.
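
To make the intended expansion concrete, here is a small stand-alone sketch that does not use RaftLib at all. It only enumerates the links listed above from a source, a set, and a destination. The names Link and expand_kset are hypothetical, and kernels are represented by plain strings.

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <vector>

// A link is modelled as a (from, to) pair of kernel names.
using Link = std::pair<std::string, std::string>;

// Expands `src <= kset(...) >= dst` into its individual links:
// first every src -> k, then every k -> dst, in declaration order.
inline std::vector<Link> expand_kset(const std::string &src,
                                     const std::vector<std::string> &kset,
                                     const std::string &dst)
{
    assert(!kset.empty()); // an empty set would leave src and dst unconnected
    std::vector<Link> links;
    links.reserve(2 * kset.size());
    for (const auto &k : kset) { links.emplace_back(src, k); }
    for (const auto &k : kset) { links.emplace_back(k, dst); }
    return links;
}
```

Calling expand_kset("src", {"a", "b", "c", "d"}, "dst") produces the eight links in the same order as the list above.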

-Jonathan

bad alloc exception instead of own library exception when no output ports

E.g. in sumapp.cpp :

input.addPort< T > ( "input_a", "input_b");
// output.addPort< T >( "sum" );

The error message:

terminate called after throwing an instance of 'std::bad_alloc'
what(): std::bad_alloc

The expected behavior is an exception from the library itself pointing out that the problem is the missing output port.

break into multiple packages

So I had a great suggestion over the weekend to break the RaftLib framework into multiple packages that can be downloaded, compiled, and integrated as needed. This could take the form of a CMake setup that selectively compiles pieces, a bootstrap-like solution that builds pieces, or even a "package manager" of sorts that builds and installs the matching dependencies. If there are any thoughts, feel free to comment.

One input split in several outputs

Hey Jonathan, hope you're doing well..

Say we have A >> B >> C
A sends one message to B.
B then splits this message into several hundred other messages that it needs to send to C.

The problem I face is the following:

If I return raft::proceed after B sends a message to C, then the framework does not start B again (because B already consumed the input message from A)

If B sends all messages to C in a loop without returning raft::proceed it might happen that the queue gets saturated with so many messages.

So what is the best way to proceed in this scenario ?

Thanks in advance..

Compiling RaftLib for Windows

Currently RaftLib is using the Makefile system, which works well for Linux and OS X. However, I also need to have RaftLib available in Windows, and available for Visual Studio projects.

At the moment, how can I compile RaftLib in such a way that it can be readily used by Visual-Studio-based projects? If I compile using cygwin Makefile system, then can the library still be usable by Visual Studio?

In the long run, would it be a good idea to migrate RaftLib to CMake, so that it can automatically generate build scripts for any system?

Need to support non-POD pointers such as smart pointers

RaftLib is an actor-oriented framework for stream processing, and one of its most important applications is video stream processing, which uses data structures such as cv::Mat from OpenCV, or similar structures, that hold non-POD members such as smart pointers.

However, as illustrated in Issue #4, non-POD pointers are currently not supported, and the presence of such objects in queues will cause a segfault.

This makes RaftLib unusable in any substantial application that involves complex data structures, and is a showstopper for RaftLib. Therefore we need to add support for non-POD pointers to RaftLib as a top priority.
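
One common way such crashes arise (an assumption here, not a confirmed diagnosis of RaftLib's queues) is assigning into raw buffer memory that never held a constructed object, so that a smart pointer's operator= releases a garbage control block. The sketch below shows the usual remedy in a toy single-threaded buffer: construct with placement new and destroy explicitly. SlotBuffer and its members are hypothetical names and are not part of RaftLib.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <new>
#include <utility>

// Toy fixed-capacity FIFO over raw storage; not thread-safe.
template <class T, std::size_t N>
class SlotBuffer
{
    static_assert(N > 0, "capacity must be positive");

public:
    SlotBuffer() = default;
    SlotBuffer(const SlotBuffer &) = delete;
    SlotBuffer &operator=(const SlotBuffer &) = delete;

    ~SlotBuffer()
    {
        // Destroy whatever is still queued so owned resources are released.
        while (count_ > 0) { drop_front(); }
    }

    bool push(const T &item)
    {
        if (count_ == N) { return false; }
        // The slot holds no live object yet, so copy-construct one in place.
        // Writing `*slot(i) = item` here would run T::operator= on garbage.
        ::new (static_cast<void *>(slot((head_ + count_) % N))) T(item);
        ++count_;
        return true;
    }

    bool pop(T &out)
    {
        if (count_ == 0) { return false; }
        out = std::move(*slot(head_));
        drop_front();
        return true;
    }

private:
    T *slot(std::size_t i) { return reinterpret_cast<T *>(storage_) + i; }

    void drop_front()
    {
        assert(count_ > 0);
        slot(head_)->~T(); // run the destructor explicitly, the memory stays
        head_ = (head_ + 1) % N;
        --count_;
    }

    alignas(T) unsigned char storage_[N * sizeof(T)];
    std::size_t head_ = 0;
    std::size_t count_ = 0;
};
```

With T = std::shared_ptr<int>, the reference count rises by one while an element sits in the buffer and drops again once it is popped or the buffer is destroyed.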

stream persistency, logging/restart

Given that many "big-data" frameworks use some sort of RDD, I think it's time once we get the beta release out to add in selectable persistence and restart zones (won't be as naive as most of the Apache Spark/Storm resilience methods) that are similar to HPC fast checkpoint/restart mechanisms. Currently most frameworks use methods that unnecessarily stress the IO system. The exact implementation is TBD, however I'd like input on how the programmer should specify critical persistency points within the application.

For the example below...we have critical specified however it could be some other descriptor. I'm thinking three would do, to specify low, moderate, critical on restart point. That way the runtime can use the data rates on the edges to dynamically adjust the interval time based on the desired checkpointing criticality.

raft::kernel a,b;
raft::map m;

m += a >> raft::checkpoint::critical >> b;
m.exe();

Would love some input though.

Remove dependency on boost

Currently the entire project depends on the boost library. It looks like the only functionality that is used from boost is

    boost::core::demangle

The boost demangle is just a thin wrapper around <cxxabi.h>. Removing the dependency on boost will make RaftLib easier to integrate. Currently, if boost 1.59 is not installed (for example, I have 1.58 on Ubuntu 16.04), the boost submodules are initialized. In our company we have a strict 'no submodule in submodule' policy. So in order to remove the boost dependency, I propose one of the following:

  • Replace boost demangle with RaftLib Demangle
  • Instead of reporting demangled name in exception, report mangled name.

I know that on most systems installing boost is not a big deal, but imagine I have an embedded computer like a Raspberry Pi with only 4GB of disk space.
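
For reference, the first option could look roughly like the sketch below. It calls abi::__cxa_demangle from <cxxabi.h> directly, so it assumes a GCC- or Clang-style toolchain. The function name demangle and the type demo::Kernel are hypothetical and only serve the illustration.

```cpp
#include <cassert>
#include <cstdlib>
#include <cxxabi.h>
#include <memory>
#include <string>
#include <typeinfo>

namespace demo
{
struct Kernel {}; // hypothetical type used only to show a demangled name
}

// Returns the demangled form of `mangled`, or the input unchanged when
// demangling fails (which corresponds to the second option above).
inline std::string demangle(const char *mangled)
{
    assert(mangled != nullptr);
    int status = 0;
    // __cxa_demangle allocates the result with malloc, so release it with free.
    std::unique_ptr<char, void (*)(void *)> text(
        abi::__cxa_demangle(mangled, nullptr, nullptr, &status), std::free);
    return (status == 0 && text) ? std::string(text.get())
                                 : std::string(mangled);
}
```

A typical call is demangle(typeid(obj).name()), which can be used to build the text of an exception message.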

add diff type stream

Transferring large items is inefficient. A simple way to get around this is to pass a pointer, but now each worker kernel can modify that location. It's also not immediately obvious to the runtime that you can't send that pointer to another networked location (which won't work). A solution would be to add a non-pointer type that saves only the diff changes that worker kernels make to the local "copy", then transmits the "patch" of the diff and the copy only when going over network links. You could also pre-transmit the bulk of the data, then transmit the "patch" as it's ready, minimizing latency for networks.

A bit more detailed documentation needed

I know you guys are focused on bringing more features to the lib, but it would be good to provide a bit better documentation and examples (I sent another post about this), so the ones willing to use the current version can fully understand what they can do with it..

E.g.

  • I am not sure how to use signals, I can see raft::fileread uses raft::eof, but not sure what is done with it.
  • I don't see any documentation for the method send()
  • not sure how to use allocate() with e.g. int
  • How can I find out which kernels are built in out of the box? E.g. raft::fileread, raft::print, etc.
  • ...

make link syntax less clunky, more c++ like

Issue

I've had several requests to make the syntax for linking compute kernels less clunky and more C++ like. That means keeping the map::link syntax but also supporting something with a bit more polish.

Potential Solution

int
main( int argc, char **argv )
{  
   //generic random kernel instantiations
   kernel f;
   kernel g;
   //explicit declaration of map for example, potentially keep hidden one for compatibility
   map m;
   //in order version
   m >> f >> "a"_o >> "b"_i >> g;
   //out order version
   m >> f >> "a"_o >> raft::ooo >> "b"_i >> g;
   m.exe();
   return( EXIT_SUCCESS );
}

data drop / fifo overrun

Are there any plans to offer anything like just latching in the last value on the ports or, somewhat equivalently, a FIFO that replaces the oldest value when it overflows?

Would be useful when you are processing data live. The specific use case I'm looking at is using it to prototype opencv filters.
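
To illustrate the behavior being asked about, here is a minimal stand-alone sketch of a FIFO that drops its oldest element on overflow. It is not a RaftLib feature or API. OverwritingFifo is a hypothetical name, and the class is single-threaded only and requires a default-constructible, copyable T.

```cpp
#include <array>
#include <cassert>
#include <cstddef>

template <class T, std::size_t N>
class OverwritingFifo
{
    static_assert(N > 0, "capacity must be positive");

public:
    // Always accepts the value. Returns true when the oldest element
    // had to be dropped to make room.
    bool push(const T &value)
    {
        const bool dropped = (count_ == N);
        if (dropped)
        {
            head_ = (head_ + 1) % N; // forget the oldest element
            --count_;
        }
        data_[(head_ + count_) % N] = value;
        ++count_;
        assert(count_ <= N);
        return dropped;
    }

    // Removes the oldest remaining element; returns false when empty.
    bool pop(T &out)
    {
        if (count_ == 0) { return false; }
        out = data_[head_];
        head_ = (head_ + 1) % N;
        --count_;
        return true;
    }

    std::size_t size() const { return count_; }

private:
    std::array<T, N> data_{};
    std::size_t head_ = 0;
    std::size_t count_ = 0;
};
```

With N = 1 this degenerates into the "latch the last value" case: every push replaces the single stored element.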

What's the difference between returning raft::proceed and not.

Imagine 2 scenarios:

Scenario 1:

a. Producer sends an object down the stream
b. Sleeps 10 seconds
c. Returns raft::proceed
d. The library calls my producer again
e. Back to (a)

Scenario 2:

a. Producer in a loop while(!terminate)
b. Sends an object down the stream and sleeps 10 seconds.
c. Back to (a)

Is there any difference between 1 and 2?

commit / cleanup / test networking code

Self explanatory. There's a prototype that was built in the course of my thesis....but I need to make sure that it is extensible. Far too often I find code unreadable, messes of spaghetti code...I don't want that in this project.

about running the examples

Hi, I cloned the whole project from GitHub and compiled it, but I still can't build the examples. I want to know whether the examples no longer match the current API?

force thread/process/pool by kernel

In the RaftLib C++Now tutorial (full slide deck here: http://www.jonathanbeard.io/pdf/cppnow2016.pdf) I proposed this syntax for partitioning a VM space:
[Screenshot of the proposed syntax, dated 2016-08-22; image not preserved]

Seems intuitive at first, however when you have multiple ports on a single compute kernel then you begin to have issues (i.e. you need to specify the same thing on multiple stream graph edges). So I'm looking at something that works more like this:

raft::map m;

raft::kernel a, b, c;
/** all of b forced to a single process, call acts as a decorator **/
auto b_proc( raft::parallel::process( b ) );
m += a >> b_proc >> c;
m.exe();

The end goal is of course to be able to force a few kernels to a specific type of resource (i.e., isolated process, thread, etc.). This isn't normally necessary, however, it sometimes comes up when building applications that utilize special IO devices or those that have strange VM behavior.

Any thoughts?

Nested parallelism

I wonder if this is possible:

A <= (B <= C >> D >=E)

E.g. this could work in this way:
A: reads filenames from a file system and send each to B
B: reads file and sends every record to C
C modifies record and sends to D
D modifies record and sends to E
E uploads records to DB and when all records belonging to a certain file are processed, marks file as done.

So the parallelism is on file level and on record level..

I know the documentation says "not yet implemented nested split/joins" but just double checking..

Question about the map of kernels

Hi Jonathan, hope you're doing fine.

Given the following arbitrary map:

a --> b --> c    // I.e.: a >> b >> c
        +-> d    // I.e.: b >> d
  +-> e --> f    // I.e.: a >> e >> f

a is the unique source and c, d and f are three different destinations that receive messages from a.

Does the library provide any way for a to know all its destinations ? I.e.: c, d and f

What I want to do is, whenever c, d and f are ready with certain messages (e.g. "commit"), then a can start sending the next batch of messages.

Thanks in advance...

Addressing Feedback from CppCon presentation proposal

I'm looking for some good feedback, on perhaps the hardest part of any open source software project....documentation.

One review in particular is a bit worrisome for me, since the library is intended to be "intuitive" and easy to use. It was also the only non-accept rating, being "borderline." From the RaftLib org members (and the web in general) are there any improvements we can make to the documentation (either the raftlib.io front page, the wiki, or through more blogposts perhaps) to make RaftLib easier to use and understand from the start? The review in question is below and please respond directly via comments (as always, be respectful and constructive). Thanks!!

I was disappointed in the level of description, examples, etc in the Raftlib information I found. Given that I'm at least somewhat familiar with the topic I expected that I would "get" it right away. It was harder than I thought it should be. Also there is no mention of other approaches - HPX for example. I think I know what he means by "Streams" but maybe I don't. Of course this is partly due to the usage of "stream" in the standard library which has a clear meaning - but I'm hoping the authors "Streams" aren't related. There are a number of data flow implementation methods that I'm thinking are more promising - in particular the Standard library ranges proposal in development. I have the feeling that this is a work in progress rather than something that programmers are expected to be able to use right now.

More memory leaks

Thanks for fixing the previous leak, now it works fine.
I have found a couple more:

valgrind --leak-check=full ./readtest cmake_install.cmake
valgrind --leak-check=full ./rbzip2 -i alice.txt -o alice.bz

prefetch buffer control

Is there any way to control the port buffer size? For example, I have one kernel to download data and another kernel to process the data. Downloading (prefetching) is much faster than processing, so the prefetched data uses a lot of disk space. Is there a way to control the prefetched (port) buffer size and halt the downloading kernel when the port buffer is full?
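
The behavior described here is usually called back-pressure. As a generic illustration, independent of how RaftLib sizes its ports, the sketch below shows a bounded queue whose push blocks the producer while the buffer is full. BoundedQueue and its member names are hypothetical.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <utility>

template <class T>
class BoundedQueue
{
public:
    explicit BoundedQueue(std::size_t capacity) : capacity_(capacity)
    {
        assert(capacity_ > 0);
    }

    // Blocks the calling (producer) thread while the queue is full.
    void push(T item)
    {
        std::unique_lock<std::mutex> lock(mutex_);
        not_full_.wait(lock, [this] { return items_.size() < capacity_; });
        items_.push_back(std::move(item));
        not_empty_.notify_one();
    }

    // Non-blocking variant: returns false instead of waiting.
    bool try_push(T item)
    {
        std::lock_guard<std::mutex> lock(mutex_);
        if (items_.size() >= capacity_) { return false; }
        items_.push_back(std::move(item));
        not_empty_.notify_one();
        return true;
    }

    // Blocks the calling (consumer) thread while the queue is empty.
    T pop()
    {
        std::unique_lock<std::mutex> lock(mutex_);
        not_empty_.wait(lock, [this] { return !items_.empty(); });
        T item = std::move(items_.front());
        items_.pop_front();
        not_full_.notify_one(); // wake a producer waiting for free space
        return item;
    }

private:
    const std::size_t capacity_;
    std::deque<T> items_;
    std::mutex mutex_;
    std::condition_variable not_full_;
    std::condition_variable not_empty_;
};
```

In the download/process example, the downloading side would call push and therefore stall as soon as capacity items are waiting, resuming only when the processing side calls pop.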

Question: Supporting dynamic changes to the topology once the runtime is started?

The use case I am thinking of relates to demultiplexing media transport streams (MPEG Transport Streams).

In this scenario an MPEG TS multiplexed stream can contain multiple elementary video and audio streams (utilising various codecs) that can appear or disappear as the multiplexed stream progresses.

As an example, when a new audio stream is discovered it would be desirable to add an output on the demultiplexer kernel and connect a new decoder kernel to this output. Similarly, it would be desirable to remove kernels when an elementary stream disappears.

How to add a spy to port ?

Hi, I have the following scenario.
I have 4 kernels that do processing of some sort. Each of them has one input and one output ( first and last are single output/ single input).

 m += a >> b >> c >> d;

I am designing a 'Spy' type of kernel that I want to attach to specific ports to monitor the content that passes through. These kernels will not modify anything, just read the content. Imagine this being used for visualization, testing, logging or data dumps.

One solution I found is to create a spy kernel and include it in the production chain.

 m += a >> spy >> b >> c >> d;

However, this has the disadvantage that I have to edit the production chain code.

I would much more like to write something like:

spy.attach( a, b);

Is there any way I can do this with RaftLib? Can you provide an example? Thanks in advance.

Missing include

On the dev branch, raftinc/defs.hpp is missing an #include <memory> for std::unique_ptr. This causes compilation to fail with g++ 6.4 on Linux (Debian sid).

Suggestion : Dynamic Pipelines

Hello,
I have a pipeline defined in a text file like OperationA para1 para2|OperationB para1 para2 para3|OperationC para1 where OperationA takes 2 input parameters and OperationB takes 3 parameters.

(I have already defined these operations as functions, all these functions have same returntype )

Can someone give me pointers on how I can read this from the text file and create a pipeline dynamically in code?

(The operation names can vary, depending on what's defined in the text file.)
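
As a starting point, the text format above can be split into stages with the standard library alone. This sketch only covers the parsing step. Mapping each stage name to an actual function or kernel is left open, and Stage and parse_pipeline are hypothetical names.

```cpp
#include <cassert>
#include <sstream>
#include <string>
#include <utility>
#include <vector>

// One pipeline stage: the operation name followed by its parameters.
struct Stage
{
    std::string name;
    std::vector<std::string> params;
};

// Splits "OpA p1 p2|OpB p1" on '|' into stages and each stage on
// whitespace into a name plus parameters. Empty segments are skipped.
inline std::vector<Stage> parse_pipeline(const std::string &text)
{
    std::vector<Stage> stages;
    std::istringstream line(text);
    std::string segment;
    while (std::getline(line, segment, '|'))
    {
        std::istringstream words(segment);
        Stage stage;
        if (!(words >> stage.name)) { continue; } // nothing but whitespace
        assert(!stage.name.empty());
        for (std::string param; words >> param;)
        {
            stage.params.push_back(param);
        }
        stages.push_back(std::move(stage));
    }
    return stages;
}
```

The resulting vector preserves stage order, so a later step could look up each name in a table of known operations and link the corresponding kernels one after another.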

How to "ZeroCopy" from source to destination ?

Hi there,

Great job with the lib.
Let's say I have A >> B >> C. Is it possible to allocate data on A and just release it on C ?
I don't seem to understand how to do it ..

Thanks..
PS: in a different post I provided a "poc.cpp" where I show (more or less) what I want to do ..

Some macro definitions make problems

The header file /usr/local/include/raftinc/kernelpreempt.hpp

defines the following macro: #define restore( k ) longjmp( k->running_state, 1 )

"restore" is a common identifier that anybody could use in their code.
Defining it as a macro can cause problems later on.

For example when including some boost header files I get following error message:

/usr/include/boost/io/ios_state.hpp:47:11: error: expected identifier before ‘->’ token
void restore()

The code at that line gets messed up with the previous macro
void restore() { s_save_.flags( a_save_ ); }

So maybe Raftlib can define macros in a more unique way, say e.g. with a namespace ? Maybe as
#define RAFTrestore( k ) longjmp( k->running_state, 1 )
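
A stand-alone sketch of the idea, with no RaftLib headers involved: once the macro carries a prefix, an unrelated member function called restore() compiles untouched. kernel_state, flags_saver, RAFT_RESTORE and resume_demo are all hypothetical names chosen for this illustration.

```cpp
#include <cassert>
#include <csetjmp>

// Stand-in for the kernel type; only the member the macro uses is modelled.
struct kernel_state
{
    std::jmp_buf running_state;
};

// Prefixed macro: it no longer rewrites other identifiers named `restore`.
#define RAFT_RESTORE(k) std::longjmp((k)->running_state, 1)

// Mimics the Boost member function that collided with the unprefixed macro.
struct flags_saver
{
    int saved = 0;
    int current = 0;
    void restore() { current = saved; }
};

// Saves a context, jumps back to it once through the macro, and reports
// how many times the code after setjmp was entered.
inline int resume_demo(kernel_state *k)
{
    assert(k != nullptr);
    volatile int passes = 0; // volatile: changed between setjmp and longjmp
    if (setjmp(k->running_state) == 0)
    {
        passes = passes + 1;
        RAFT_RESTORE(k); // control resumes at setjmp, which now returns 1
    }
    passes = passes + 1;
    return passes;
}
```

An inline function inside a namespace would avoid the preprocessor entirely, at the cost of changing how call sites are written.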

Exception when unused output port but segmentation fault when unused input port

Test done on sumapp.cpp example:

if I add one output port
output.addPort< T >( "sum", "test" );

as expected I get an exception:

terminate called after throwing an instance of 'AmbiguousPortAssignmentException'
what(): One port expected, more than one found!
Output port from source kernel (Sum) has more than a single port.

but if I add another input port:
input.addPort< T > ( "input_a", "input_b", "test" );

I get a segmentation fault.

Not sure if that is the right behavior?

seg fault -> bug report via e-mail

abc@abc-virtual-machine:/mnt/hgfs/svn/visionframework/examples$ gdb ./testraft10
GNU gdb (Ubuntu 7.10-0ubuntu1) 7.10
Copyright (C) 2015 Free Software Foundation, Inc.
License GPLv3+: GNU GPL version 3 or later http://gnu.org/licenses/gpl.html
This is free software: you are free to change and redistribute it.
There is NO WARRANTY, to the extent permitted by law. Type "show copying"
and "show warranty" for details.
This GDB was configured as "x86_64-linux-gnu".
Type "show configuration" for configuration details.
For bug reporting instructions, please see:
http://www.gnu.org/software/gdb/bugs/.
Find the GDB manual and other documentation resources online at:
http://www.gnu.org/software/gdb/documentation/.
For help, type "help".
Type "apropos word" to search for commands related to "word"...
Reading symbols from ./testraft10...done.
(gdb) run
Starting program: /mnt/hgfs/svn/visionframework/examples/testraft10
[Thread debugging using libthread_db enabled]
Using host libthread_db library "/lib/x86_64-linux-gnu/libthread_db.so.1".
[New Thread 0x7fffdfa4e700 (LWP 9071)]
[New Thread 0x7fffdf24d700 (LWP 9072)]
[New Thread 0x7fffde6c8700 (LWP 9073)]
[New Thread 0x7fffddec7700 (LWP 9074)]
[New Thread 0x7fffdd6c6700 (LWP 9075)]
[New Thread 0x7fffdcec5700 (LWP 9076)]
[New Thread 0x7fffcffff700 (LWP 9077)]
[New Thread 0x7fffcf7fe700 (LWP 9078)]
[New Thread 0x7fffceffd700 (LWP 9079)]
[New Thread 0x7fffce7fc700 (LWP 9080)]
[New Thread 0x7fffcdffb700 (LWP 9081)]

Program received signal SIGSEGV, Segmentation fault.
[Switching to Thread 0x7fffceffd700 (LWP 9079)]
0x00000000004053ae in __gnu_cxx::__exchange_and_add (__mem=0x81c, __val=-1)
at /usr/include/c++/5/ext/atomicity.h:49
49 { return __atomic_fetch_add(__mem, __val, __ATOMIC_ACQ_REL); }
(gdb)
(gdb) backtrace
#0  0x00000000004053ae in __gnu_cxx::__exchange_and_add (__mem=0x81c, __val=-1)
    at /usr/include/c++/5/ext/atomicity.h:49
#1  0x0000000000405445 in __gnu_cxx::__exchange_and_add_dispatch (__mem=0x81c,
    __val=-1) at /usr/include/c++/5/ext/atomicity.h:82
#2  0x000000000040a395 in
    std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release
    (this=0x814) at /usr/include/c++/5/bits/shared_ptr_base.h:147
#3  0x000000000042c042 in
    std::__shared_count<(__gnu_cxx::_Lock_policy)2>::operator=
    (this=0x7fffd00054d8, __r=...)
    at /usr/include/c++/5/bits/shared_ptr_base.h:678
#4  0x000000000042800d in std::__shared_ptr<cv::Mat,
    (__gnu_cxx::_Lock_policy)2>::operator= (this=0x7fffd00054d0)
    at /usr/include/c++/5/bits/shared_ptr_base.h:867
#5  0x0000000000428037 in std::shared_ptr<cv::Mat>::operator= (
    this=0x7fffd00054d0) at /usr/include/c++/5/bits/shared_ptr.h:93
#6  0x0000000000428061 in Foo<cv::Mat>::operator= (this=0x7fffd00054d0)
    at testraft10.cpp:33
#7  0x0000000000429c1e in RingBufferBase<Foo<cv::Mat>,
    (Type::RingBufferType)0>::local_push (this=0x7fffd0001640,
    ptr=0x7fffceffca30,
    signal=@0x7fffceffc9dc: raft::none)
    at /usr/local/include/raft_dir/ringbufferheap.tcc:576
#8  0x000000000040876c in FIFO::push<Foo<cv::Mat>&> (this=0x7fffd0001640,
    item=..., signal=raft::none) at /usr/local/include/raft_dir/fifo.hpp:202
#9  0x000000000042b31a in OneToMany<cv::Mat, 2ul>::run (this=0x6c77d0)
    at testraft10.cpp:91
#10 0x0000000000439382 in Schedule::kernelRun(raft::kernel*, bool
    volatile&, __jmp_buf_tag () [1], __jmp_buf_tag () [1]) ()
#11 0x000000000043981c in simple_schedule::simple_run(void*) ()
#12 0x00007ffff6b386aa in start_thread (arg=0x7fffceffd700)
    at pthread_create.c:333
#13 0x00007ffff6656eed in clone ()
    at ../sysdeps/unix/sysv/linux/x86_64/clone.S:109
(gdb)

0.pdf

Memory leaks

It seems there are some memory leaks in the library.

valgrind --leak-check=full ./sumapp

add leaky buffer raftmanip stream modifier

Networking pipes have the concept of dropping packets when buffers are full. It seems like this could be useful in many streaming-data applications. It looks fairly easy to implement with what I already built for the parallel process forcing (in one of the branches).
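To make the idea concrete, here is a minimal sketch of a "leaky" bounded FIFO that drops new items once it is full. It is not RaftLib code: the class name `LeakyFifo` and its methods are hypothetical, and it is single-threaded for brevity, whereas a real stream modifier would sit on top of the library's lock-free FIFOs.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>

// Hypothetical bounded FIFO that drops incoming items when full ("leaky").
// Illustration only; not part of RaftLib and not thread-safe.
template <typename T>
class LeakyFifo {
public:
    explicit LeakyFifo(std::size_t capacity) : capacity_(capacity) {
        assert(capacity > 0);
    }

    // Returns true if the item was queued, false if it was dropped.
    bool push(const T& item) {
        if (items_.size() >= capacity_) {
            ++dropped_;  // the producer never blocks, the item is lost
            return false;
        }
        items_.push_back(item);
        return true;
    }

    // Moves the oldest item into `out`; returns false if the FIFO is empty.
    bool pop(T& out) {
        if (items_.empty()) return false;
        out = items_.front();
        items_.pop_front();
        return true;
    }

    std::size_t size() const { return items_.size(); }
    std::size_t dropped() const { return dropped_; }

private:
    std::size_t capacity_;
    std::size_t dropped_ = 0;
    std::deque<T> items_;
};
```

With a capacity of 2, a third `push` before any `pop` returns `false` and increments `dropped()`, which an application could use to monitor how lossy the stream is.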

Need a more real-life-like example

All of the examples provided use kernels that ship with the library distribution, e.g. raft::random_variate(), raft::print(), raft::filereader(), etc.

Having to check the source code to learn how to build a producer shouldn't really be the way to go.

It would be good to have one or (preferably) more examples showing how to create all kinds of kernels.

E.g. like the attached file:

g++ -g -o poc poc.cpp -lraft -pthread

poc.zip

  • edited to fix link - jcb 3 July 2017

Segmentation fault when no input port available

In the example you sent me, if we comment out the input port on the consumer, we get a segmentation fault. It would be nice to get an exception instead.

/**
 *
 * Proof of concept Raftlib
 *
 * Want to have a 3 kernels stream which produces and sends 10 numbers down the stream.
 * The 10 numbers should be created by the first kernel and destroyed by the last.
 *
 */
#include <cstdint>
#include <cstdlib>
#include <iostream>
#include <raft>
#include <raftio>


struct big_t
{
    int i;
    std::uintptr_t start;
#ifdef ZEROCPY
    char padding[ 100 ];
#else
    char padding[ 32 ];
#endif
};



/**
 * Producer: sends down the stream numbers from 1 to 10
 */
class A : public raft::kernel
{
private:
    int i   = 0;
    int cnt = 0;

public:
    A() : raft::kernel()
    {
        output.addPort< big_t >("out");
    }

    virtual raft::kstatus run()
    {
        i++;

        if ( i <= 10 ) 
        {
            auto &c( output["out"].allocate< big_t >() );
            c.i = i;
            c.start = reinterpret_cast< std::uintptr_t >( &(c.i) );
            output["out"].send();
        }
        else
        {   
            return (raft::stop);
        }

        return (raft::proceed);
    };
};

/**
 * Consumer: takes the number from input and dumps it to the console
 */
class C : public raft::kernel
{
private:
    int cnt = 0;
public:
    C() : raft::kernel()
    {
        //input.addPort< big_t >("in");
    }

    virtual raft::kstatus run()
    {
        auto &a( input[ "in" ].peek< big_t >() );
        std::cout << std::dec << a.i << " - " << std::hex << a.start << " - " << std::hex <<  
            reinterpret_cast< std::uintptr_t >( &a.i ) << "\n";
        /** release the single item that was peeked **/
        input[ "in" ].recycle(1);
        return (raft::proceed);
    }
};

int main()
{
    A a;
    C c;

    raft::map m;
    
    m += a >> c;

    m.exe();

    return( EXIT_SUCCESS );
}
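As a sketch of the behavior requested above (an exception rather than a crash), the following standalone container throws when a port name is looked up that was never added. All names here (`PortContainer`, `Port`, `PortNotFoundException`) are hypothetical and only illustrate the check; they are not RaftLib's actual classes.

```cpp
#include <cassert>
#include <map>
#include <stdexcept>
#include <string>

// Hypothetical exception type; the name is an assumption, not a RaftLib class.
struct PortNotFoundException : std::runtime_error {
    explicit PortNotFoundException(const std::string& name)
        : std::runtime_error("Port not found: " + name) {}
};

// Stand-in for a real port; only carries an id for the demonstration.
struct Port {
    int id;
};

class PortContainer {
public:
    void addPort(const std::string& name) {
        assert(!name.empty());
        const int id = static_cast<int>(ports_.size());
        ports_.emplace(name, Port{id});
    }

    // Throws instead of dereferencing a missing entry.
    Port& operator[](const std::string& name) {
        auto it = ports_.find(name);
        if (it == ports_.end()) {
            throw PortNotFoundException(name);
        }
        return it->second;
    }

private:
    std::map<std::string, Port> ports_;
};
```

With this check, the equivalent of `input[ "in" ]` on a kernel whose `addPort` call is commented out would fail with a catchable, descriptive error.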

what is `#include <cmd>`?

source : examples/general/pi/pisim.cpp

error detail:
RaftLib-RaftLib-d6e6fa3/examples/general/pi/pisim.cpp:29:15: fatal error: cmd: No such file or directory
#include <cmd>

I can't find any information about cmd.

Proper behavior for output port being sent to two input ports

I'm not really sure from the documentation what the expected behavior is for this, but it's crashing right now so that likely isn't it.

If you have one output node and you connect it to two different kernels, something like:

m += a >> b; m += a >> c;

what should the behavior be for this construct? My thinking on this was that it should duplicate the stream; it seemed like splitting (send some to B, some to C) had a more explicit syntax.

I have a test case written up for this here: master...jdavidberger:duplicateTest. It also has an assert set up at the point where the undefined behavior (UB) that causes the crash occurs.

I suspect that this is already somewhat on the radar; what I really wanted to know here is whether the behavior I expected is the actual target behavior. If it is, I can also PR the testsuite addition.

For now I'm just using an explicit block that duplicates all inputs to n outputs, which works as expected.
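The workaround can be pictured roughly as follows. This is a standalone sketch using `std::queue` in place of RaftLib ports, and the function name `duplicate` is made up for illustration; it only shows the "copy every item to each of n outputs" semantics.

```cpp
#include <cassert>
#include <cstddef>
#include <queue>
#include <vector>

// Copies every item from `in` to each queue in `outs`, preserving order.
// Returns the number of input items consumed.
template <typename T>
std::size_t duplicate(std::queue<T>& in, std::vector<std::queue<T>>& outs) {
    assert(!outs.empty());
    std::size_t count = 0;
    while (!in.empty()) {
        for (auto& out : outs) {
            out.push(in.front());  // each consumer gets its own copy
        }
        in.pop();
        ++count;
    }
    return count;
}
```

Each output receives the full stream, in contrast to a split, where every item goes to exactly one consumer.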

Mistakenly reports "C++11 alignas unsupported" with g++ 6.3

It seems that __alignof_is_defined isn't defined despite alignof being supported. This causes a truckload of warnings.

Since the ALIGNOF macro is never used, is there any reason not to remove it?

I'm using g++ 6.3 on Debian unstable, if that's helpful. No messages are emitted when using clang++ 4.0.
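One possible alternative check, sketched below, keys off the language version instead: `alignas` and `alignof` are keywords from C++11 onward. As far as I know, `__alignof_is_defined` comes from the C header `<stdalign.h>`, so it is only visible when that header has been included. The macro name `HYPOTHETICAL_HAS_ALIGNAS` and the `cache_line_t` type are made up for this example.

```cpp
// alignas/alignof are keywords in C++11 and later, so testing the
// language version avoids depending on <stdalign.h> macros.
#if defined(__cplusplus) && __cplusplus >= 201103L
#define HYPOTHETICAL_HAS_ALIGNAS 1
#else
#define HYPOTHETICAL_HAS_ALIGNAS 0
#endif

#if HYPOTHETICAL_HAS_ALIGNAS
// Example use: a type padded and aligned to an assumed 64-byte cache line.
struct alignas(64) cache_line_t {
    char bytes[64];
};
static_assert(alignof(cache_line_t) == 64, "expected 64-byte alignment");
#endif
```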

OpenCV / Surveyor pattern / Nanomsg

Hi guys,

Hope you are all well!

I was checking out your cool library for a project I am working on based on OpenCV and Dlib.

In a nutshell, I would like to create a graph of distributed pre- and post-processing tasks on a stream of web-camera images on iOS. I would like to use RaftLib as in your OpenCV example and add a layer using the surveyor protocol from the nanomsg library, to execute a flow like the following:

Events flow example:

  1. Skip frame, execute chain of processes every 3 frames
  2. Resize frame
  3. Filter frame, e.g. if the image is blurry (not of valid quality), stop the chain
  4. Rotate frame
  5. Mirror frame
  6. Distributed Processing frame
    • dlib's facial landmark detection process (timeout 1s)
    • inverted visual dictionary (simple bag of words based on Features2d module) (timeout 1s)
  7. Processing locks
    • if a face is detected, stop the pre-processing steps and block requests for marker detection.
    • if a sub-process returns no results, return to the default processing flow.
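The "skip" and "filter" steps in the list above boil down to stages that can stop the chain for a given frame. Below is a library-independent sketch of that pattern; `Frame`, `Stage`, `every_nth`, `min_sharpness`, and `run_chain` are all hypothetical names, and the `sharpness` score is an assumed stand-in for a real blur measure.

```cpp
#include <cassert>
#include <functional>
#include <vector>

// Hypothetical frame record; a real pipeline would carry image data.
struct Frame {
    int index;         // position in the camera stream
    double sharpness;  // assumed quality score, higher is sharper
};

// A stage returns false to stop the chain for the current frame.
using Stage = std::function<bool(Frame&)>;

// Step 1: let only every n-th frame through.
Stage every_nth(int n) {
    assert(n > 0);
    return [n](Frame& f) { return f.index % n == 0; };
}

// Step 3: reject frames below a quality threshold.
Stage min_sharpness(double threshold) {
    return [threshold](Frame& f) { return f.sharpness >= threshold; };
}

// Runs the stages in order; returns true only if every stage accepted the frame.
bool run_chain(const std::vector<Stage>& stages, Frame& frame) {
    for (const auto& stage : stages) {
        if (!stage(frame)) return false;
    }
    return true;
}
```

In a streaming framework each stage would typically be its own kernel, and "stopping the chain" would mean not forwarding the frame downstream.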

Questions:

  1. Would it be possible to implement such a processing flow with RaftLib?
  2. Is it possible to couple RaftLib with puffin-buffer and puffin-stream to aggregate results?
  3. As I saw that RaftLib is really performance-oriented, what would be the performance bottlenecks in such flow-based processing?

Have a great day!

Cheers,
Richard

Parallelization with more than one port

Hey Jonathan,
hope your 2018 is going well.

I have a quick question about parallel streams. Say I want to have the following configuration:

 m += a <= b["processOne"] >= c;
 m +=      b["processTwo"] >= d;

Then the port processOne is multiplied (e.g. processOne0, processOne1, etc.) as many times as there are threads in c, and likewise for processTwo and d.

As I understand it, the current framework does not support this. Is there any specific reason why it is not supported now? And how much effort do you think it would take to introduce something like this (i.e. do you think I can implement it myself)?

Thanks a lot for your help.
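The port naming described above (processOne0, processOne1, ...) could be generated along these lines. This is only a sketch of the naming scheme; the helper `replicated_port_names` is hypothetical and says nothing about how RaftLib replicates kernels internally.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Builds the names a replicated port would get: base0, base1, ...
std::vector<std::string> replicated_port_names(const std::string& base,
                                               std::size_t replicas) {
    assert(!base.empty());
    std::vector<std::string> names;
    names.reserve(replicas);
    for (std::size_t i = 0; i < replicas; ++i) {
        names.push_back(base + std::to_string(i));
    }
    return names;
}
```

Calling it once per named port (e.g. with "processOne" and the replica count of c, then "processTwo" and the replica count of d) keeps the two groups of replicated ports distinct.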

integrate hwloc

Need to build hwloc -> scotch arch topology within the src/partition_scotch.cpp file to get real topology info.

The exact point hwloc needs to integrate is here (starts at line 188)

#ifndef USE_HWLOC      
   //TODO add hwloc topology call
   //add version of call that uses a tree of the hardware from hwloc
   //might need format conversion
   if( SCOTCH_archCmplt( &archdat, cores /** num cores **/) != 0 )
   {
      /** TODO, add RaftLib Exception **/
      std::cerr << "Failed to create architecture file\n";
      exit( EXIT_FAILURE );
   }
#else

qthreads

Convert qthreads to CMake and add in the pool scheduler. The basic idea is to use qthreads for user-space threading rather than re-coding assembly every time we move to a new platform. Multiple qthreads can run inside a single kernel thread. We will need to re-examine the exception/(OS) signal mechanism once this happens and evaluate whether changes are needed.

This might also open up another avenue of research/development, as it creates another degree of freedom when scheduling. Do we put a kernel inside a kernel thread, or are there any advantages in leaving it as a heavyweight thread? Right now the idea is to keep heavily communicating kernels highly local so they share as much info via cache as possible.

stream consistency modifiers

We don't always need perfect FIFO behavior. We can improve performance quite a bit by adding a stream modifier that lets the user specify the ordering required, especially for the MPMC-type FIFOs. I've talked in the past about having a "fuzzy" or "approximate" stream type.

I'd like to come up with a range for this one that covers everything from normal FIFO behavior, to something that looks like an unordered list, all the way to a FIFO that could potentially do unsafe things such as overwriting valid values in the FIFO if the application can tolerate it (the trade-off being better performance at the cost of data integrity).

namespace raft
{
namespace consistency
{
    enum type : manip_vec_t { 
      seq_cst /** sequentially consistent, default **/,
      relaxed /** no ordering constraint, only atomicity and validity of data from kernel a -> b **/,
      drop_mrt /** drop most recently transmitted, use it to overwrite if queue full **/,
      drop_random /** randomly overwrite elements in queue if full **/,
      approximate  /** this could mean approximate floating points, compression, etc., **/
    };
} /** end namespace consistency **/
} /** end namespace raft **/

raft::kernel a,b;
raft::map m;
/** don't care about ordering between a->b only that the messages get there **/
m += a >> raft::manip< raft::consistency::relaxed >::value >> b;
m.exe();

One design point for things like approximate is the best interface for returning the epsilon of approximation for the FIFO. One thought is to create a std::numeric_limits-like interface to extract it from the port. The other, equally acceptable, is a port function that returns the epsilon as a floating-point number, and zero if the stream is not of the approximate type.
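A rough sketch of the first option, a `std::numeric_limits`-style trait, is shown below. Everything here is hypothetical: the `consistency` enum only mirrors the proposed levels, `stream_limits` is an invented name, and the `1e-3` bound is a placeholder rather than a measured value.

```cpp
// Hypothetical tags mirroring the proposed consistency levels.
enum class consistency { seq_cst, relaxed, drop_mrt, drop_random, approximate };

// numeric_limits-style trait: exact streams report zero error by default.
template <consistency C>
struct stream_limits {
    static constexpr bool is_approximate = false;
    static constexpr double epsilon() noexcept { return 0.0; }
};

// Specialization for the approximate stream type.
template <>
struct stream_limits<consistency::approximate> {
    static constexpr bool is_approximate = true;
    // Placeholder bound chosen for illustration only.
    static constexpr double epsilon() noexcept { return 1e-3; }
};

static_assert(stream_limits<consistency::seq_cst>::epsilon() == 0.0,
              "exact streams report zero error");
```

Because the trait is resolved at compile time, a kernel could branch on `is_approximate` without any runtime cost; the port-function alternative would instead allow the epsilon to vary at runtime.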

Does the peek() function provide any timeout ?

Imagine the following scenario:

m += a1 >> c["in1"];
m += a2 >> c["in2"];

c loops over ports in1 and in2 to consume the corresponding messages from a1 and a2. Now imagine producer a1 does not produce anything. Even though a1 returns raft::proceed, c gets stuck waiting for a message from it.
I searched the documentation but could not find any timeout flag for peek().
How would you recommend proceeding?

Here is a little PoC:

/**
 *
 * Proof of concept Raftlib
 *
 * Want to have 1 consumer and 2 producers. The consumer should timeout and continue with next
 * producer if first producer does not produce..
 *
 */
#include <cstdint>
#include <cstdlib>
#include <iostream>
#include <unistd.h> /** sleep() **/
#include <raft>
#include <raftio>


struct big_t
{
    int i;
    std::uintptr_t start;
#ifdef ZEROCPY
    char padding[ 100 ];
#else
    char padding[ 32 ];
#endif
};



/**
 * Producer: sends down the stream numbers from 1 to 10
 */
class A1 : public raft::kernel
{
private:
    int i   = 0;
    int cnt = 0;

public:
    A1() : raft::kernel()
    {
        output.addPort< big_t >("out");
    }

    virtual raft::kstatus run()
    {
        i++;

        if ( i <= 10 ) 
        {
            auto &c( output["out"].allocate< big_t >() );
            c.i = i;
            c.start = reinterpret_cast< std::uintptr_t >( &(c.i) );
            output["out"].send();
        }
        else 
        {
            return (raft::stop);
        }

        return (raft::proceed);
    };
};

class A2 : public raft::kernel
{
private:
    int i   = 0;
    int cnt = 0;

public:
    A2() : raft::kernel()
    {
        output.addPort< big_t >("out");
    }

    virtual raft::kstatus run()
    {
        i++;

        if ( i <= 10 ) 
        {
            NULL; //
            sleep(1);
        }
        else 
        {
            return (raft::stop);
        }

        return (raft::proceed);
    };
};

/**
 * Consumer: takes the number from input and dumps it to the console
 */
class C : public raft::kernel
{
private:
    int cnt = 0;
public:
    C() : raft::kernel()
    {
        input.addPort< big_t >("in1");
        input.addPort< big_t >("in2");
    }

    virtual raft::kstatus run()
    {
        if (cnt % 2 == 0)
        {
            try {
                std::cout << "in1: " << cnt << "\n";
                auto &a( input[ "in1" ].peek< big_t >() );
                std::cout << std::dec << a.i << " - " << std::hex << a.start << " - " << std::hex <<  
                    reinterpret_cast< std::uintptr_t >( &a.i ) << "\n";
                input[ "in1" ].recycle(1);
            }
            catch(ClosedPortAccessException& cpae)
            {
                NULL; // continue
            }
        }
        else
        {
            try {
                std::cout << "in2: " << cnt << "\n";
                auto &a( input[ "in2" ].peek< big_t >() );
                std::cout << "yeap\n";
                std::cout << std::dec << a.i << " - " << std::hex << a.start << " - " << std::hex <<  
                    reinterpret_cast< std::uintptr_t >( &a.i ) << "\n";
                input[ "in2" ].recycle(1);
            }
            catch(ClosedPortAccessException& cpae)
            {
                NULL; // continue
            }
        }
        ++cnt;
        return (raft::proceed);
    }
};

int main()
{
    A1 a1;
    A2 a2;
    C c;

    raft::map m;
    
    m += a1 >> c["in1"];
    m += a2 >> c["in2"];

    m.exe();

    return( EXIT_SUCCESS );
}
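For reference, the kind of timed peek asked about here can be sketched with standard C++ primitives. This is not RaftLib's FIFO and not an existing RaftLib call: `TimedQueue`, `peek_for`, and `recycle` are hypothetical names for a mutex-and-condition-variable queue whose peek gives up after a timeout.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <utility>

// Hypothetical queue with a timed peek; illustration only.
template <typename T>
class TimedQueue {
public:
    void push(T item) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            items_.push_back(std::move(item));
        }
        ready_.notify_one();
    }

    // Copies the front item into `out` without removing it.
    // Returns false if nothing arrived before the timeout expired.
    bool peek_for(T& out, std::chrono::milliseconds timeout) {
        assert(timeout.count() >= 0);
        std::unique_lock<std::mutex> lock(mutex_);
        const bool has_item = ready_.wait_for(
            lock, timeout, [this] { return !items_.empty(); });
        if (!has_item) return false;
        out = items_.front();
        return true;
    }

    // Drops the front item once the consumer is done with it.
    void recycle() {
        std::lock_guard<std::mutex> lock(mutex_);
        if (!items_.empty()) items_.pop_front();
    }

private:
    std::mutex mutex_;
    std::condition_variable ready_;
    std::deque<T> items_;
};
```

A consumer with two such inputs could call `peek_for` on each in turn and simply move on to the other input when the call returns `false`, instead of blocking on a silent producer.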
