enkits's People

Contributors

aaron-boxer, aaronfranke, aras-p, bkmgit, dethraid, dg0yt, dougbinks, erincatto, eugeneko, gpakosz, holyblackcat, kadir014, leonvictor, nxrighthere, pr0g, ruby0x1, sergof, sourceinsight, spnda, turtlesimos

enkits's Issues

Trouble with chunking.

Hi! First off thanks for this lib.
On to the issue: I have index buffers that I want to manipulate, so I operate on triplets of consecutive cells in the flat index buffer. However, I'm failing to manage the ranges.
How would you go about it?
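
One possible approach, sketched under assumptions: size the task set by the number of triangles rather than the number of indices, so that each partition's start and end are triangle numbers and the index buffer offset is the triangle number times 3. The `Partition` struct below is a hypothetical stand-in with the same half-open `start`/`end` shape as `enki::TaskSetPartition`, and `TriangleCount` and `FlipWindingInRange` are illustrative names, not library functions.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

// Hypothetical stand-in for enki::TaskSetPartition (half-open range [start, end)).
struct Partition
{
    uint32_t start;
    uint32_t end;
};

// Number of whole triangles in a flat index buffer; use this as the task set size.
inline uint32_t TriangleCount( const std::vector<uint32_t>& indices )
{
    assert( indices.size() % 3 == 0 );
    return static_cast<uint32_t>( indices.size() / 3 );
}

// Body of a hypothetical ExecuteRange: the range counts triangles, not indices.
// Example manipulation: flip the winding order of each triangle in the range.
inline void FlipWindingInRange( std::vector<uint32_t>& indices, Partition range )
{
    for( uint32_t tri = range.start; tri < range.end; ++tri )
    {
        const std::size_t base = static_cast<std::size_t>( tri ) * 3;
        std::swap( indices[ base + 1 ], indices[ base + 2 ] );
    }
}
```

Because every partition covers whole triangles, no triplet is ever split between two threads, whatever range sizes the scheduler picks.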

m_pPinnedTaskListPerThread not initialized

Hello,
in the C++11 branch there is a small oversight. In the TaskScheduler constructor in TaskScheduler.cpp, m_pPinnedTaskListPerThread is not initialized to NULL. When Initialize() is then called, the delete[] operator will try to delete m_pPinnedTaskListPerThread (whatever it points to...), and it will crash at runtime when compiled under Linux. MSVC (Visual Studio 15.8.6) appears to tolerate this (at least it runs without crashing).

On the master branch, m_pPinnedTaskListPerThread is correctly set to NULL in the constructor.

I would normally suggest (for the C++11 branch) to switch to std::unique_ptr but I understand if you do not want too much divergence in the implementations between the master and C++11 branch.

Thanks for this great and easy to use library.
Best wishes,
Fred
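
As an illustration of the std::unique_ptr suggestion above, here is a minimal sketch. `SchedulerSketch` and `PinnedListSketch` are hypothetical stand-ins, not the enkiTS classes. The point is only that a `std::unique_ptr<T[]>` member starts out as nullptr, so a repeated `Initialize()` never deletes an indeterminate pointer.

```cpp
#include <cassert>
#include <cstdint>
#include <memory>

// Hypothetical stand-in for a per-thread pinned task list.
struct PinnedListSketch
{
    int pending = 0;
};

// Hypothetical scheduler fragment, not the enkiTS class.
class SchedulerSketch
{
public:
    // unique_ptr default-constructs to nullptr, so no explicit initializer is needed.
    void Initialize( uint32_t numThreads )
    {
        // Releases any previous array (a no-op the first time), then allocates.
        m_pLists.reset( new PinnedListSketch[ numThreads ] );
        m_NumThreads = numThreads;
    }

    bool     HasLists() const   { return m_pLists != nullptr; }
    uint32_t NumThreads() const { return m_NumThreads; }

private:
    std::unique_ptr<PinnedListSketch[]> m_pLists;
    uint32_t                            m_NumThreads = 0;
};
```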

Stall in the C++11 when overflowing the pipe in AddTaskSetToPipe

When adding more than (1<<PIPESIZE_LOG2) elements using AddTaskSetToPipe, once the pipe is full it will start executing subtasks in place, but if the other worker threads are asleep, they will not be woken.

My simple fix was to add

        // wake up threads early
        if( ( numAdded % 128 ) == 0 )
        {
            if( m_NumThreadsActive.load( std::memory_order_relaxed ) < m_NumThreadsRunning.load( std::memory_order_relaxed ) )
            {
                m_NewTaskEvent.notify_all( );
            }
        }

just after the if( !m_pPipesPerThread[gtl_threadNum].WriterTryWriteFront(...) ) { ... } block.

Taskset dependencies

May I ask if you are planning on adding events in the near future? If you are, I would really like to hear about how you plan on designing them. If not, I would be interested in adding them as a PR.

My motivation is: I am designing a fully asynchronous image decompressor, so I need to connect different task sets with events, so that the completion of one task set causes a waiting set to enqueue itself.

Interested to hear your plans for this feature.

Feature request: check return codes for semaphore system calls

I noticed that the return codes for semaphore creation etc. aren't being checked for errors. So creation may fail and the caller doesn't get notified. What do you think is the best way of handling error conditions: throw an exception, or return false? Thanks!

ThreadSanitizer reports

I have been testing the latest master (4f9941b).
ThreadSanitizer, enabled under Xcode 11.0, is reporting some data races when running unmodified samples.

I am reporting the output of one Data race report for ParallelSum as an example.

==================
WARNING: ThreadSanitizer: data race (pid=56086)
  Read of size 4 at 0x7ffeefbff49c by thread T4:
    #0 enki::TaskScheduler::TryRunTask(unsigned int, unsigned int, unsigned int&) TaskScheduler.cpp:412 (ParallelSum:x86_64+0x100007204)
    #1 enki::TaskScheduler::TryRunTask(unsigned int, unsigned int&) TaskScheduler.cpp:377 (ParallelSum:x86_64+0x1000050d0)
    #2 enki::TaskScheduler::TaskingThreadFunction(enki::ThreadArgs const&) TaskScheduler.cpp:236 (ParallelSum:x86_64+0x100004e04)
    #3 decltype(std::__1::forward<void (*)(enki::ThreadArgs const&)>(fp)(std::__1::forward<enki::ThreadArgs>(fp0))) std::__1::__invoke<void (*)(enki::ThreadArgs const&), enki::ThreadArgs>(void (*&&)(enki::ThreadArgs const&), enki::ThreadArgs&&) type_traits:4361 (ParallelSum:x86_64+0x10000d06d)
    #4 void std::__1::__thread_execute<std::__1::unique_ptr<std::__1::__thread_struct, std::__1::default_delete<std::__1::__thread_struct> >, void (*)(enki::ThreadArgs const&), enki::ThreadArgs, 2ul>(std::__1::tuple<std::__1::unique_ptr<std::__1::__thread_struct, std::__1::default_delete<std::__1::__thread_struct> >, void (*)(enki::ThreadArgs const&), enki::ThreadArgs>&, std::__1::__tuple_indices<2ul>) thread:342 (ParallelSum:x86_64+0x10000ceb1)
    #5 void* std::__1::__thread_proxy<std::__1::tuple<std::__1::unique_ptr<std::__1::__thread_struct, std::__1::default_delete<std::__1::__thread_struct> >, void (*)(enki::ThreadArgs const&), enki::ThreadArgs> >(void*) thread:352 (ParallelSum:x86_64+0x10000bf09)

  Previous write of size 4 at 0x7ffeefbff49c by main thread:
    #0 enki::ITaskSet::ITaskSet() TaskScheduler.h:122 (ParallelSum:x86_64+0x100003288)
    #1 ParallelReductionSumTaskSet::ParallelReductionSumTaskSet(unsigned int) ParallelSum.cpp:81 (ParallelSum:x86_64+0x100003ba8)
    #2 ParallelReductionSumTaskSet::ParallelReductionSumTaskSet(unsigned int) ParallelSum.cpp:82 (ParallelSum:x86_64+0x100002e04)
    #3 main ParallelSum.cpp:146 (ParallelSum:x86_64+0x100002390)

  Location is stack of main thread.

  Thread T4 (tid=3398714, running) created by main thread at:
    #0 pthread_create <null>:2673040 (libclang_rt.tsan_osx_dynamic.dylib:x86_64h+0x2aa2d)
    #1 std::__1::__libcpp_thread_create(_opaque_pthread_t**, void* (*)(void*), void*) __threading_support:328 (ParallelSum:x86_64+0x10000be4e)
    #2 std::__1::thread::thread<void (&)(enki::ThreadArgs const&), enki::ThreadArgs, void>(void (&&&)(enki::ThreadArgs const&), enki::ThreadArgs&&) thread:368 (ParallelSum:x86_64+0x10000ba71)
    #3 std::__1::thread::thread<void (&)(enki::ThreadArgs const&), enki::ThreadArgs, void>(void (&&&)(enki::ThreadArgs const&), enki::ThreadArgs&&) thread:360 (ParallelSum:x86_64+0x100006238)
    #4 enki::TaskScheduler::StartThreads() TaskScheduler.cpp:298 (ParallelSum:x86_64+0x100005901)
    #5 enki::TaskScheduler::Initialize(unsigned int) TaskScheduler.cpp:924 (ParallelSum:x86_64+0x10000a687)
    #6 main ParallelSum.cpp:136 (ParallelSum:x86_64+0x10000231a)

SUMMARY: ThreadSanitizer: data race TaskScheduler.cpp:412 in enki::TaskScheduler::TryRunTask(unsigned int, unsigned int, unsigned int&)
==================
ThreadSanitizer report breakpoint hit. Use 'thread info -s' to get extended information about the report.

This reports that reading subTask.pTask->m_RangeToRun is a data race:

bool TaskScheduler::TryRunTask( uint32_t threadNum_, uint32_t priority_, uint32_t& hintPipeToCheck_io_ )
{
    // ...

    if( subTask.pTask->m_RangeToRun < partitionSize )
    {
        SubTaskSet taskToRun = SplitTask( subTask, subTask.pTask->m_RangeToRun );
        // ...
    }
    // ...
}

When declaring ParallelSumTaskSet m_ParallelSumTaskSet; inside struct ParallelReductionSumTaskSet

struct ParallelReductionSumTaskSet : ITaskSet
{
    ParallelSumTaskSet m_ParallelSumTaskSet;
    uint64_t m_FinalSum;

    ParallelReductionSumTaskSet( uint32_t size_ ) : m_ParallelSumTaskSet( size_ ), m_FinalSum(0)
    {
        m_ParallelSumTaskSet.Init( g_TS.GetNumTaskThreads() );
    }

    virtual void    ExecuteRange( TaskSetPartition range_, uint32_t threadnum_ )
    {
        g_TS.AddTaskSetToPipe( &m_ParallelSumTaskSet );
        g_TS.WaitforTask( &m_ParallelSumTaskSet );

        for( uint32_t i = 0; i < m_ParallelSumTaskSet.m_NumPartialSums; ++i )
        {
            m_FinalSum += m_ParallelSumTaskSet.m_pPartialSums[i].count;
        }
    }
};

will initialize ParallelSumTaskSet::m_RangeToRun in the constructor:

class ITaskSet : public ICompletable
{
public:
    ITaskSet()
        : m_SetSize(1)
        , m_MinRange(1)
        , m_RangeToRun(1)
    {}
    // ...
};

I am not an expert in this field, but it looks like a potential false positive, because TryRunTask is executed only after AddTaskSetToPipe.

I try to keep our software clean of all sanitizer reports, so that I can catch real bugs ;)
For this reason, if this or other reports look safe, I suggest adding annotations that disable TSAN where appropriate (using no_sanitize("thread")).

Does it make sense for me to report all data races found by the TSAN output?

Oh, and thanks for your excellent work on the library :)
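
For reference, a minimal sketch of what such an annotation could look like. The macro name `SKETCH_NO_TSAN` and the function `ReadRangeUninstrumented` are hypothetical, and the compiler version guard is an assumption. The attribute only stops ThreadSanitizer from instrumenting the annotated function; it does not decide whether the reported race is actually benign.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical macro name. Clang and recent GCC accept this attribute spelling;
// other compilers get an empty definition.
#if defined(__clang__) || ( defined(__GNUC__) && __GNUC__ >= 8 )
    #define SKETCH_NO_TSAN __attribute__(( no_sanitize( "thread" ) ))
#else
    #define SKETCH_NO_TSAN
#endif

// Hypothetical function whose reads ThreadSanitizer should not instrument.
SKETCH_NO_TSAN
inline uint32_t ReadRangeUninstrumented( const uint32_t* pRange )
{
    return *pRange;
}
```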

User advice request: deprecate C++98 branch?

Is anyone using the C++98 backwards compatibility branch?

Some of the recent and future changes are going to take a fair amount of time to port, which preferably I'd like to avoid. So I'm considering deprecating the branch.

Pinned task problem

I think there is a problem with TaskScheduler::WakeThreadsForNewTasks() and pinned tasks.

Consider a possible case: what if the number of suspended threads (those waiting for m_pNewTaskSemaphore to be signalled) increases just before SemaphoreSignal() is called, i.e. the value of waiting was not accurate because some threads fell asleep between the check and the signal? In this case some task threads would idle.
Most of the time it's not a big deal; those threads would wake when the next task arrives.
(I'm not sure if it is possible, but even if we are so unlucky that all the task threads fall asleep just after the check, the calling thread would handle the task itself.)

Now, when using AddPinnedTask() there's a subtle chance that the thread we pinned the task to was suspended as described above:

  • [User thread] calls AddPinnedTask().
  • [Task thread] falls asleep just after the m_NumThreadsWaitingForNewTasks check.
  • The semaphore is either not released at all, or it awakens some threads but not the desired one.
  • [User thread] calls WaitforTask() and hangs as the thread the task is pinned to can't handle the request.

This is the problem I ran into while trying to port my code to enkiTS.
Though to be honest, I'm not quite sure whether this is indeed the case and whether my assumption is accurate. Parallel programming is hard.

AddTaskSetToPipe stalls main thread

This was covered in #38, but I don't see how to accomplish it. When submitting ~20 tasks on an 8-thread machine, AddTaskSetToPipe will stall the main thread, consistent with its documentation:

        // Adds the TaskSet to pipe and returns if the pipe is not full.
        // If the pipe is full, pTaskSet is run.
        // should only be called from main thread, or within a task
        ENKITS_API void            AddTaskSetToPipe( ITaskSet* pTaskSet_ );

I need the main thread to never stall, and AddTaskSetToPipe() to always return immediately. I feel like there's probably a simple solution to this (as you imply in #38) but I don't see it yet.

Valgrind warning

Not sure if this is a false positive, but:

==25938== Conditional jump or move depends on uninitialised value(s)
==25938==    at 0x48C4973: enki::TaskScheduler::WakeSuspendedThreadsWithPinnedTasks() (TaskScheduler.cpp:558)
==25938==    by 0x48C43E4: enki::TaskScheduler::WaitForNewTasks(unsigned int) (TaskScheduler.cpp:456)
==25938==    by 0x48C3953: enki::TaskScheduler::TaskingThreadFunction(enki::ThreadArgs const&) (TaskScheduler.cpp:234)
==25938==    by 0x48C7AB9: void std::__invoke_impl<void, void (*)(enki::ThreadArgs const&), enki::ThreadArgs>(std::__invoke_other, void (*&&)(enki::ThreadArgs const&), enki::ThreadArgs&&) (invoke.h:60)

C++11 change range size?

Hi! I have been using enkiTS in a game engine, and it was extremely easy to set up and use! I have an array of, say, 500 objects that I want to call a function on, and a total of 16 worker threads (including the main thread, I guess). When I benchmark it, though, it seems to be about 50% slower than running single-threaded. This was worse than I expected, so I put a breakpoint inside the lambda and noticed that range.start and range.end were 4 objects apart, so it seems enkiTS split the work into 125 chunks. I would prefer to split this into fewer, larger chunks to get better performance. How would I go about doing this? Ideally I would like to have WORKER_THREADS * 1 or 2 chunks of work.

Here's the code inside of my lambda scope

[&component_list_pair, delta_time](enki::TaskSetPartition range, uint32_t threadnum)
{
    for(auto i : Range(range.start, range.end))
    {
        component_list_pair.second[i]->ParallelUpdate(delta_time);
    }
}
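
A sketch of the arithmetic behind "WORKER_THREADS * 1 or 2 chunks", assuming the task set's minimum range (the MinRange setting discussed elsewhere on this page) is the knob that bounds how small a partition may get. `MinRangeForChunks` is a hypothetical helper, and how the value is handed to the task set should be checked against TaskScheduler.h.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical helper: smallest chunk size that yields at most
// numThreads * chunksPerThread chunks for setSize items.
inline uint32_t MinRangeForChunks( uint32_t setSize, uint32_t numThreads, uint32_t chunksPerThread )
{
    assert( numThreads > 0 && chunksPerThread > 0 );
    const uint32_t maxChunks = numThreads * chunksPerThread;
    // Round up so that maxChunks chunks of this size cover the whole set.
    const uint32_t minRange = ( setSize + maxChunks - 1 ) / maxChunks;
    return minRange > 0 ? minRange : 1;
}
```

For 500 objects on 16 threads this gives a chunk size of 32 with one chunk per thread, or 16 with two chunks per thread.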

How to prevent master thread from being scheduled?

enkiTS is an excellent library for my project, which runs multiple OpenGL contexts on multiple GPUs.
In my design, the master thread runs lightweight workloads, which means it should never be scheduled to run any ITaskSet tasks.
Is this possible?

Pinned task bug ?

Hi.

I wrote some simple test code for pinned tasks. After running for a few seconds, it hits the assert:

Assertion failed: pTailPlus1->pNext, file E:\Source\enkiTS-master\src\LockLessMultiReadPipe.h, line 275

Here is my code:

TaskScheduler g_TS;

std::atomic_uint32_t foo = 0;

struct PinnedTaskHelloWorld : IPinnedTask
{
    PinnedTaskHelloWorld()
        : IPinnedTask(1) // set pinned thread to 1
    {}
    virtual void Execute()
    { 
        printf("This will run on the thread 1, %u\n", foo++);
    }
};

PinnedTaskHelloWorld task[100];

int main(int argc, const char * argv[])
{
    g_TS.Initialize(2);

    while (true)
    {
        for (int i = 0; i < 100; ++i)
        {
            g_TS.AddPinnedTask(&task[i]);

            if (i == 50)
            {
                g_TS.WaitforAll();
            }
        }
    }

    return 0;
}

Did I get something wrong? I'm on a VS2019 x64 build.

Thanks.

GCC Warning

From this line
memset(&m_ProfilerCallbacks, 0, sizeof(m_ProfilerCallbacks));

TaskScheduler.cpp:525:64: warning: ‘void* memset(void*, int, size_t)’ clearing an object of type ‘struct enki::ProfilerCallbacks’ with no trivial copy-assignment; use assignment or value-initialization instead [-Wclass-memaccess]
     memset(&m_ProfilerCallbacks, 0, sizeof(m_ProfilerCallbacks));
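
The warning text itself points at the fix: assignment or value-initialization instead of memset. A minimal sketch, where `CallbacksSketch` is a hypothetical struct and not the real enki::ProfilerCallbacks:

```cpp
#include <cassert>

// Hypothetical callback table, loosely modeled on a struct of function pointers.
struct CallbacksSketch
{
    void (*threadStart)( unsigned ) = nullptr;
    void (*threadStop)( unsigned )  = nullptr;
};

inline void NoOp( unsigned ) {}

// Resets every member to its default without memset, which avoids -Wclass-memaccess.
inline void ResetCallbacks( CallbacksSketch& callbacks )
{
    callbacks = CallbacksSketch();
}
```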

Question: support sleep-waiting?

Hi,
It looks like all WaitFor* methods in enkiTS are busy-waiting. In some situations this results in long spinning times. So what is the preferred/proper way to sleep-wait for tasks to finish in enkiTS?
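
For context, a user-side sketch of sleep-waiting that does not rely on any enkiTS feature: the task signals a latch as the last step of its work, and the waiting thread blocks on a condition variable instead of spinning. `CompletionLatchSketch` is a hypothetical class. Note the assumption that the signal fires from inside the task body, so the scheduler's own completion bookkeeping may finish slightly later.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>

// Hypothetical user-side latch, not part of enkiTS.
class CompletionLatchSketch
{
public:
    // Call as the last step of the task's work.
    void Signal()
    {
        {
            std::lock_guard<std::mutex> lock( m_Mutex );
            m_Done = true;
        }
        m_Condition.notify_all();
    }

    // Blocks the caller in the OS until Signal() has been called.
    void Wait()
    {
        std::unique_lock<std::mutex> lock( m_Mutex );
        m_Condition.wait( lock, [this] { return m_Done; } );
    }

    bool IsDone()
    {
        std::lock_guard<std::mutex> lock( m_Mutex );
        return m_Done;
    }

private:
    std::mutex              m_Mutex;
    std::condition_variable m_Condition;
    bool                    m_Done = false;
};
```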

Feature suggestion: running tasks from non main/task threads

Hi Doug,

As far as I understand, enkiTS does not allow running tasks or waiting for completion from threads other than the main/task threads.

For example, I would like to be able to use the system from a rendering thread, which itself is not a task thread, but a full-fledged thread typically running in parallel with the main one. Or from a background loading thread which is mostly idle waiting for IO, but uses tasks to decompress/finalize assets.
It would be cool if enkiTS were able to support that. What do you think? Thanks!

-- Aleksei

Crash in Android 11 beta

This is almost certainly an issue on Android's side, not yours, but I wanted to at least bring things to attention. In enki::DefaultAllocFunc(), non-Win32 programs use posix_memalign(). Our 64-bit Android app is segfaulting at launch, and it seems that the call to posix_memalign() is involved in whatever's going wrong. If we replace that with a call to plain malloc(), our app carries along running "fine". By "fine", I mean this isn't significantly tested or shipped beyond my local build going from segfault-at-launch to looking like all's well.

Like I said, probably an Android 11 beta issue-- which I'm testing on a Pixel phone-- but at least wanted to make sure you were aware.

Edit: correction/clarification. It's not the call to posix_memalign() itself that's segfaulting. When TaskScheduler::StartThreads() runs m_pTaskCompleteSemaphore = SemaphoreNew()-- the second call to SemaphoreNew()-- the resulting placement-new in TaskScheduler::New() is what's segfaulting. Even though posix_memalign() returned a "success" error code of 0, and the pointer to memory is non-null.

Small tweaks to repository

  • The "example folder" link in the readme is broken; it still looks for a folder that no longer exists
  • There's a small typo here (cab instead of can, nothing dangerous):
    // Subclass IPinnedTask to create tasks which cab be run on a given thread only.

Valgrind errors on OSX High Sierra

Hi Doug,
I ran my image codec, grok, which uses enkiTS, on OSX with valgrind, and I see this
error:

==3380== Process terminating with default action of signal 11 (SIGSEGV)
==3380==  Access not within mapped region at address 0x18
==3380==    at 0x100CC05BA: _pthread_body (in /usr/lib/system/libsystem_pthread.dylib)
==3380==    by 0x100CC050C: _pthread_start (in /usr/lib/system/libsystem_pthread.dylib)
==3380==    by 0x100CBFBF8: thread_start (in /usr/lib/system/libsystem_pthread.dylib)
==3380==  If you believe this happened as a result of a stack
==3380==  overflow in your program's main thread (unlikely but
==3380==  possible), you can try to increase the size of the
==3380==  main thread stack using the --main-stacksize= flag.
==3380==  The main thread stack size used in this run was 8388608.

This is for an earlier version of enkiTS, as the latest version will result in BAD_ACCESS error and my program crashes.

Have you run any valgrind tests on OSX? Everything looks good on Linux.

Thanks.

Inconsistent behavior on ARM architecture

Hello!

I've been successfully using enkiTS on different x86_64-based platforms and it works just fine. After I ported my project to ARM (NVIDIA Tegra), I noticed inconsistent behavior. I tried to run the examples on the target, and TasksThroughput either deadlocks or reports a wrong check count.
It appears to be an issue related to memory reordering. Has this library been used/tested on ARM before?

Thanks

Could you state the purpose of this library more clearly?

Is this an alternative to Intel TBB, firing multiple threads with different priorities for different tasks?

What problems does it solve?

P.S. For me the only reason for using some custom scheduling system is to reduce latency for very small tasks (because yield tends to switch off the thread for 5-60ms under heavy load from other threads).

Unexpected behaviour with ITaskSet MinRange

Hi Doug,

hopefully I'm not misunderstanding the intent of MinRange, but I found that it doesn't behave as expected.
There's a comment saying that the last partition will be smaller than MinRange if SetSize is not a multiple of MinRange, but this is only true if MinRange is larger than NumPartitions of the TaskScheduler.
So you will get partitions smaller than MinRange even if SetSize is a multiple of MinRange.

Furthermore you often (most of the time?) want partitions that are a multiple of MinRange e.g. for alignment reasons.

One way of fixing this is to align NumPartitions and NumInitialPartitions up to the next multiple of MinRange in AddTaskSetToPipe().
So you'd have something like this:

diff --git a/src/TaskScheduler.cpp b/src/TaskScheduler.cpp
index 187673a..7b1e394 100644
--- a/src/TaskScheduler.cpp
+++ b/src/TaskScheduler.cpp
@@ -327,16 +327,19 @@ void TaskScheduler::SplitAndAddTask( uint32_t threadNum_, SubTaskSet subTask_,
        WakeThreads();
 }
 
+#define IS_POW2(n) ((((n) & ((n) - 1)) == 0) && ((n) != 0))
+#define ALIGN_UP(n, alignment) (((n) + (alignment) - 1) & ~((alignment) - 1))
 void    TaskScheduler::AddTaskSetToPipe( ITaskSet* pTaskSet )
 {
+    assert(IS_POW2(pTaskSet->m_MinRange));
        // set running count to -1 to guarantee it won't be found complete until all subtasks added
     pTaskSet->m_RunningCount = -1;
 
     // divide task up and add to pipe
-    pTaskSet->m_RangeToRun = pTaskSet->m_SetSize / m_NumPartitions;
+    pTaskSet->m_RangeToRun = pTaskSet->m_SetSize / ALIGN_UP(m_NumPartitions, pTaskSet->m_MinRange);
     if( pTaskSet->m_RangeToRun < pTaskSet->m_MinRange ) { pTaskSet->m_RangeToRun = pTaskSet->m_MinRange; }
 
-       uint32_t rangeToSplit = pTaskSet->m_SetSize / m_NumInitialPartitions;
+       uint32_t rangeToSplit = pTaskSet->m_SetSize / ALIGN_UP(m_NumInitialPartitions, pTaskSet->m_MinRange);
        if( rangeToSplit < pTaskSet->m_MinRange ) { rangeToSplit = pTaskSet->m_MinRange; }
 
     SubTaskSet subTask;

This way things would work as before when MinRange is 1 and as long as SetSize is a multiple of MinRange so will be the partitions.
If MinRange is a power of 2 then this doesn't add much overhead, which is an acceptable restriction for me, not sure it is for others though.

What do you think, is there a better way of dealing with this?
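
The two helper macros from the diff can also be written as constexpr functions, which lets the compiler check them. This is a sketch with hypothetical names (`IsPow2`, `AlignUp`), not code from the library. As in the diff, the bit trick in `AlignUp` is only valid when the alignment is a power of two.

```cpp
#include <cassert>
#include <cstdint>

// True when n is a non-zero power of two.
constexpr bool IsPow2( uint32_t n )
{
    return n != 0 && ( n & ( n - 1 ) ) == 0;
}

// Rounds n up to the next multiple of alignment; alignment must be a power of two.
constexpr uint32_t AlignUp( uint32_t n, uint32_t alignment )
{
    return ( n + alignment - 1 ) & ~( alignment - 1 );
}

static_assert( IsPow2( 8 ) && !IsPow2( 12 ) && !IsPow2( 0 ), "power-of-two check" );
static_assert( AlignUp( 13, 4 ) == 16 && AlignUp( 16, 4 ) == 16, "align up" );
```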

Occasional crash when waiting on task that's stored as a local variable

Simple repro case:

#include "TaskScheduler.h"
#include "Timer.h"

#include <stdio.h>
#include <inttypes.h>
#include <assert.h>

#ifndef _WIN32
    #include <string.h>
#endif

using namespace enki;

TaskScheduler g_TS;

std::atomic<int> count = 0;

struct TestTask : ITaskSet
{
    void ExecuteRange( TaskSetPartition range, uint32_t threadnum ) override
    {
        count++;
        (void)range;
        (void)threadnum;
    }
};

void func()
{
    TestTask t;
    g_TS.AddTaskSetToPipe(&t);
    g_TS.WaitforTask(&t);
}

int main(int argc, const char * argv[])
{
    g_TS.Initialize();

    while (true) {
        func();
    }

    return 0;

}

This will eventually lead to a crash with the following callstack

>	Dependencies.exe!enki::TaskScheduler::TaskComplete(enki::ICompletable * pTask_, bool bWakeThreads_, unsigned int threadNum_) Line 450	C++
 	Dependencies.exe!enki::TaskScheduler::TryRunTask(unsigned int threadNum_, unsigned int priority_, unsigned int & hintPipeToCheck_io_) Line 436	C++
 	Dependencies.exe!enki::TaskScheduler::TryRunTask(unsigned int threadNum_, unsigned int & hintPipeToCheck_io_) Line 378	C++
 	Dependencies.exe!enki::TaskScheduler::TaskingThreadFunction(const enki::ThreadArgs & args_) Line 237	C++

at this point:

    Dependency* pDependent = pTask_->m_pDependents;
    while( pDependent )
    {
        int prevDeps = pDependent->pTaskToRunOnCompletion->m_DependenciesCompletedCount.fetch_add( 1, std::memory_order_release );

It seems to be easier to trigger the crash by running with a lower thread count (with 4 threads, I hit the crash in 10-15 seconds), and running in debug.

Tasks are dispatched with invalid ranges

See the attached code for a minimal repro. I am new to this API so if I am making an obvious mistake, please do let me know. This bug appears to occur reliably when TASK_COUNT >= 50 and when TASK_COUNT is 5000 it will always happen.

Context / use case: I am developing a terrain streaming system. On world load I spawn about 5000 tasks which procedurally generate each terrain cell's heightmap, then about another 5000 with various dependencies to generate the meshes.

#include "TaskScheduler.h"

#include <memory>
#include <vector>

int main(int, char**)
{
    static enki::TaskScheduler s_scheduler;
    s_scheduler.Initialize();

    static constexpr size_t TASK_RANGE = 65*65;
    static constexpr size_t TASK_COUNT = 50;

    struct Repro : public enki::ITaskSet
    {
        Repro() : enki::ITaskSet(TASK_RANGE) {};
        virtual void ExecuteRange( enki::TaskSetPartition range, uint32_t) override
        {
            if (range.start > TASK_RANGE || range.end > TASK_RANGE)
            {
                __debugbreak(); // bug?!
            }
        }
    };

    std::vector<std::unique_ptr<Repro>> jobs;

    for (size_t i = 0; i < TASK_COUNT; ++i)
    {
        jobs.emplace_back(std::make_unique<Repro>());
    }

    for (std::unique_ptr<Repro>& job : jobs)
    {
        s_scheduler.AddTaskSetToPipe(job.get());
    }

    s_scheduler.WaitforAll();
}

From a preliminary search of the library code, the problem appears to occur only when the task buffer is full and it must be executed immediately. The code does not properly handle ranges that are not divisible by the range step, so what should be the final step overruns into an infinite loop.

https://github.com/dougbinks/enkiTS/blob/master/src/TaskScheduler.cpp#L656

I have removed this block. It doesn't make sense to me: if we are simply executing the same job immediately because there was no room, why would we have to tweak the range that we've already calculated? Removing it fixed the problem for me. I verified that every element of the range was being generated with the following test:

struct Repro : public enki::ITaskSet
{
    std::vector<int> data;
    Repro() : enki::ITaskSet(TASK_RANGE) { data.resize(TASK_RANGE); };
    virtual void ExecuteRange( enki::TaskSetPartition range, uint32_t) override
    {
        if (range.start > TASK_RANGE || range.end > TASK_RANGE)
        {
            __debugbreak(); // bug?!
        }

        for (size_t i = range.start; i < range.end; ++i)
        {
            data[i] = 1;
        }
    }
};

... later

for (std::unique_ptr<Repro>& job : jobs)
{
    int count = 0;
    for (size_t i = 0; i < TASK_RANGE; ++i)
    {
        count += job->data[i];
    }
    ASSERT(count == TASK_RANGE);
}
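
To illustrate the non-divisible case described in this issue, here is a sketch of splitting a range into fixed-size steps where the final step is clamped to the set size. `RangeSketch` and `SplitRange` are hypothetical names; this shows the general technique, not the scheduler's actual splitting code.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for a half-open partition [start, end).
struct RangeSketch
{
    uint32_t start;
    uint32_t end;
};

// Splits [0, setSize) into chunks of at most rangeToRun items.
// The final chunk is clamped to setSize, so it may be shorter but never overruns.
inline std::vector<RangeSketch> SplitRange( uint32_t setSize, uint32_t rangeToRun )
{
    assert( rangeToRun > 0 );
    std::vector<RangeSketch> chunks;
    for( uint32_t start = 0; start < setSize; )
    {
        // Clamp using the remaining count to avoid overflow in start + rangeToRun.
        const uint32_t count = std::min( rangeToRun, setSize - start );
        chunks.push_back( RangeSketch{ start, start + count } );
        start += count;
    }
    return chunks;
}
```

With TASK_RANGE = 65*65 = 4225 and a step of 1000, the last chunk is [4000, 4225) rather than [4000, 5000).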

Minor memory leak

Hey, just noticed this using _CrtMemCheckpoint/_CrtMemDifference/_CrtDumpMemoryLeaks: in the C++11 version, the std::thread objects are created on the heap in TaskScheduler.cpp line 148:

m_pThreads[thread] = new std::thread( TaskingThreadFunction, m_pThreadNumStore[thread] );

but there's no corresponding delete. I've added it to StopThreads just before m_NumThreads = 0, updated function:

void TaskScheduler::StopThreads( bool bWait_ )
{
    if( m_bHaveThreads )
    {
        // wait for the threads to quit before deleting data
        m_bRunning = 0;
        while( bWait_ && m_NumThreadsRunning )
        {
            // keep firing event to ensure all threads pick up state of m_bRunning
            m_NewTaskEvent.notify_all();
        }

        for( uint32_t thread = 1; thread < m_NumThreads; ++thread )
        {
            m_pThreads[thread]->join();
            delete m_pThreads[thread];
        }

        m_NumThreads = 0;
        delete[] m_pThreadNumStore;
        delete[] m_pThreads;
        m_pThreadNumStore = 0;
        m_pThreads = 0;

        m_bHaveThreads = false;
        m_NumThreadsActive = 0;
        m_NumThreadsRunning = 0;
    }
}

->join is needed to prevent asserts. Also, maybe m_pThreads could be just * instead of **, no need to create each std::thread object separately on the heap?

std::thread* m_pThreads;
....
m_pThreads[thread] = std::thread( TaskingThreadFunction, m_pThreadNumStore[thread] );

should work too? I'm not 100% sure, my c++11-fu isn't that great :)

Correctness of TaskScheduler::WakeThreads

Hello.

Minor problem: as far as I can understand, there is a race condition between checking for tasks and incrementing m_NumThreadsWaiting in TaskScheduler::WaitForTasks.

  1. [worker thread] TaskScheduler::WaitForTasks is called when there are no tasks.
  2. [another thread] TaskScheduler::SplitAndAddTask from another thread adds the task, and calls the TaskScheduler::WakeThreads.
  3. [another thread] TaskScheduler::WakeThreads checks the m_NumThreadsWaiting and does not signal event.
  4. [worker thread] No tasks are found, worker thread increments m_NumThreadsWaiting. EventWait is called, sleeping the thread until EventSignal is called next time, even when the thread could be processing the added task.

In my opinion, the best way to fix this would be

  1. Changing the Event interface to Semaphore interface: SemaphorePublish(int count) + SemaphoreWait().
  2. Changing the TaskScheduler::WaitForTasks so that it double-checks the pipes after incrementing m_NumThreadsWaiting.

This algorithm is very similar to benaphore (atomic counter + kernel semaphore) described in http://preshing.com/20150316/semaphores-are-surprisingly-versatile/

I will happily submit a pull request with a fix if you'd like. Note that this also fixes #12.

Dynamically allocating tasks

Short version:
ICompletable, ITaskSet & IPinnedTask are missing virtual destructors, which can be problematic if allocating tasks dynamically on the heap.

Detailed version:
The following pseudocode shows how the program might crash due to missing virtual destructors.
The offending line of code is marked with the comment CRASH:

enki::TaskScheduler jobSystem;
 
class MyTask1 : public ITaskSet { ... };
class MyTask2 : public ITaskSet { ... };
class MyPinnedTask : public IPinnedTask { ... };
 
// An array where I store all my running tasks
myArray<ICompletable*> runningTasks;
 
// I allocate my tasks on the main thread
MyTask1* task = new MyTask1;
jobSystem.AddTaskSetToPipe(task);

// And store the tasks in an array to be able to check them for completeness later
runningTasks.push_back(task);                         
 
// Once every frame I loop through my array of tasks and delete the ones that are completed:
void GameUpdate()
{
    for(int i=0; i<runningTasks.size(); )
    {
       ICompletable* job = runningTasks[i];
       if (job->GetIsComplete())
       {
           runningTasks.remove(i); // do not advance: the next element now sits at index i
           delete job; // CRASH
       }
       else
       {
           i++;
       }
    }
}
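
A self-contained sketch of why the virtual destructor matters when deleting through a base pointer. `CompletableSketch` and `BufferTaskSketch` are hypothetical types, not the enkiTS interfaces. With the virtual destructor in the base, `delete` on a base pointer runs the derived destructor and frees the buffer.

```cpp
#include <cassert>

// Hypothetical base interface, not the enkiTS ICompletable.
struct CompletableSketch
{
    // Virtual destructor: deleting through a base pointer runs the derived destructor.
    virtual ~CompletableSketch() {}
    virtual bool GetIsComplete() const = 0;
};

// Hypothetical derived task that owns a heap buffer.
struct BufferTaskSketch : CompletableSketch
{
    explicit BufferTaskSketch( int* pDestroyedCount )
        : m_pBuffer( new int[ 16 ] ), m_pDestroyedCount( pDestroyedCount ) {}

    // Owning a raw buffer, so copying is disabled to keep the sketch safe.
    BufferTaskSketch( const BufferTaskSketch& ) = delete;
    BufferTaskSketch& operator=( const BufferTaskSketch& ) = delete;

    ~BufferTaskSketch() override
    {
        delete[] m_pBuffer;
        ++( *m_pDestroyedCount );
    }

    bool GetIsComplete() const override { return true; }

    int* m_pBuffer;
    int* m_pDestroyedCount;
};
```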

Problem understanding how to submit a task

Once I submit a task, an exception appears.
I noticed you initialize m_NumPartitions to 0 here:

, m_NumPartitions(0)

The problem then is basically here

pTaskSet_->m_RangeToRun = pTaskSet_->m_SetSize / m_NumPartitions;

because there's a division by zero

I'm creating the scheduler just doing

g_TS.Initialize();

And then I submit a task to it doing

g_TS.AddTaskSetToPipe(&task)

So why is that value 0? Did I do something wrong?

GCC warnings

GCC complains that size_, userData_ etc are never used in this method:

ENKITS_API void  enki::DefaultFreeFunc(  void* ptr_,   size_t size_, void* userData_, const char* file_, int line_ )
{
#ifdef _WIN32
    _aligned_free( ptr_ );
#else
    free( ptr_ );
#endif
}

Not a big deal, but the fewer warnings the better :)
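
One common way to silence this kind of warning is to leave the unused parameters unnamed. A minimal sketch with a hypothetical function `FreeSketch` that mirrors the parameter list above (using plain `std::free` only, without the Windows branch):

```cpp
#include <cassert>
#include <cstddef>
#include <cstdlib>

// Hypothetical free function with the same parameter shape as in the issue.
// Leaving the unused parameters unnamed silences -Wunused-parameter.
inline void FreeSketch( void* ptr_, std::size_t /*size_*/, void* /*userData_*/,
                        const char* /*file_*/, int /*line_*/ )
{
    std::free( ptr_ );
}
```

An alternative with the same effect is to keep the names and add `(void)size_;` style casts in the body, as the TestTask example elsewhere on this page does.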

CACHE_LINE_SIZE name clashes with macro from major console SDK

Hi Doug,

I updated enkiTS to 1.4 and the new constant CACHE_LINE_SIZE ends up clashing with a macro defined in the SDK of a famous video game console which I cannot name due to an NDA.

As you target game development with enkiTS I thought you might like to know this.

Personally I see 3 ways to deal with this:

  1. Dumb arbitrary rename, say to SIZE_OF_CACHE_LINE
  2. Prefix all macro and macro-like names with ENKI_
  3. Use CamelCase for all your non-macro symbols

On an unrelated note, I would also recommend that you move all your variables into your enki namespace.

Regards,
Bruno
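
A small sketch of how the clash arises and how a distinct or namespaced name avoids it. The `#define` below simulates the SDK macro. `enki_sketch`, `gc_CacheLineSize` and `ENKI_SKETCH_CACHE_LINE_SIZE` are hypothetical names chosen for illustration, not names from the library.

```cpp
#include <cassert>
#include <cstdint>

// Simulates an SDK header that defines a macro with the clashing name.
#define CACHE_LINE_SIZE 128

namespace enki_sketch   // hypothetical namespace, not the real enki namespace
{
    // A distinct, namespaced name is not rewritten by the SDK macro.
    static const uint32_t gc_CacheLineSize = 64;
}

// Option 2 from the list: a prefixed macro (hypothetical name).
#define ENKI_SKETCH_CACHE_LINE_SIZE 64
```

A namespace alone would not help if the constant kept the name CACHE_LINE_SIZE, because the preprocessor substitutes macros before namespaces are considered.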

Fails to compile in XCode/iOS with precompiled headers enabled.

Hey,
I've been using enkiTS for a while now and I've really enjoying it. After using it on Win32, it worked on Android/Clang with only 1 unused variable when _DEBUG is not defined int err = sem_init( &semaphoreid.sem, 0, 0 ); Happy to open another issue / PR for this.

I then moved it over to iOS and ran into this issue.
I understand this is avoidable by adding enkiTS as its own lib/project in Xcode, which I plan to do, but I wanted to bring it to your attention either way.
When precompiled headers are turned on and the precompiled header happens to include malloc.h, the compiler throws this error:

/enkits/TaskScheduler.cpp:87:9: error: expected identifier
        THREAD_STATE_NONE,                  // shouldn't get this value
        ^
In file included from /precompiled-header.pch:14:
In file included from /Applications/Xcode.app/Contents/Developer/Platforms/iPhoneOS.platform/Developer/SDKs/iPhoneOS13.2.sdk/usr/include/malloc/malloc.h:28:
In file included from /Applications/Xcode.app/Contents/Developer/Platforms/iPhoneOS.platform/Developer/SDKs/iPhoneOS13.2.sdk/usr/include/mach/mach_types.h:87:
In file included from /Applications/Xcode.app/Contents/Developer/Platforms/iPhoneOS.platform/Developer/SDKs/iPhoneOS13.2.sdk/usr/include/mach/exception_types.h:182:
In file included from /Applications/Xcode.app/Contents/Developer/Platforms/iPhoneOS.platform/Developer/SDKs/iPhoneOS13.2.sdk/usr/include/mach/thread_status.h:76:
In file included from /Applications/Xcode.app/Contents/Developer/Platforms/iPhoneOS.platform/Developer/SDKs/iPhoneOS13.2.sdk/usr/include/mach/machine/thread_status.h:33:
/Applications/Xcode.app/Contents/Developer/Platforms/iPhoneOS.platform/Developer/SDKs/iPhoneOS13.2.sdk/usr/include/mach/arm/thread_status.h:55:34: note: expanded from macro 'THREAD_STATE_NONE'
#define THREAD_STATE_NONE        5

To get around it I renamed THREAD_STATE_NONE. Alternatively, you could undefine THREAD_STATE_NONE when __APPLE__ is defined, since TaskScheduler.cpp doesn't use the macro.
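A minimal sketch of the #undef approach. The first #define only simulates what mach/arm/thread_status.h does on Apple platforms so the example compiles anywhere; the enum and its second enumerator are hypothetical, and the guard tests the macro itself where the suggestion above tests __APPLE__.

```cpp
#include <cassert>

// Simulates the macro from the Apple SDK header; on a real iOS build the
// precompiled header would have pulled it in already.
#define THREAD_STATE_NONE 5

// Workaround sketch: drop the SDK macro before the enum is declared.
#ifdef THREAD_STATE_NONE
#undef THREAD_STATE_NONE
#endif

enum ExampleThreadState   // hypothetical enum for illustration
{
    THREAD_STATE_NONE,    // now an ordinary enumerator with value 0
    THREAD_STATE_RUNNING, // hypothetical second state
};
```

The #undef affects the rest of the translation unit, so it belongs in the .cpp file and not in a public header.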

Correctness of event implementation in POSIX systems

Hello, thank you for your library!

Minor problem: as far as I can understand, the current implementation of EventSignal/EventWait on POSIX systems allows for a race condition:

  1. [worker thread] TaskScheduler::WaitForTasks is called when there are no tasks.
  2. [worker thread] No tasks are found, worker thread increments m_NumThreadsWaiting.
  3. [another thread] TaskScheduler::SplitAndAddTask adds the task and calls TaskScheduler::WakeThreads.
  4. [another thread] TaskScheduler::WakeThreads checks m_NumThreadsWaiting and calls EventSignal, which in turn calls pthread_cond_broadcast (btw, it should rather be called with the mutex unlocked). No threads are waiting on the condition variable yet, so none are woken.
  5. [worker thread] EventWait is called, sleeping the thread until EventSignal is called next time, even when the thread could be processing the added task.

There are two ways to fix this behavior: either add a "signaled" variable to the event, which is checked and modified with the mutex locked, or use anonymous semaphores instead (http://man7.org/linux/man-pages/man3/sem_init.3.html with pshared = 0).

The latter is generally the preferred way, at least on Linux and OS X, as it is simpler, faster, and matches the Windows implementation.

I will prepare a pull-request with a fix if you'd like.
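The first option (a "signaled" variable guarded by the mutex) can be sketched as follows. This is an illustration, not the proposed patch: SignaledEvent is a hypothetical type, and std::mutex / std::condition_variable stand in for the pthread primitives.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>

// Hypothetical event type illustrating the "signaled" flag approach.
struct SignaledEvent
{
    std::mutex              mutex;
    std::condition_variable cond;
    bool                    signaled = false;
};

void EventSignal( SignaledEvent& e )
{
    {
        std::lock_guard<std::mutex> lock( e.mutex );
        e.signaled = true;   // remembered even if no thread is waiting yet
    }
    e.cond.notify_one();     // notify after the mutex has been released
}

void EventWait( SignaledEvent& e )
{
    std::unique_lock<std::mutex> lock( e.mutex );
    // The predicate is checked before sleeping, so a signal that arrived
    // between steps 2 and 5 above is not lost.
    e.cond.wait( lock, [&e] { return e.signaled; } );
    e.signaled = false;      // consume the signal (auto-reset behaviour)
}
```

Because the flag is consumed by a single waiter, this variant behaves like a binary semaphore; waking every waiting thread at once would need a counter or a manual-reset flag instead.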

error in link to dependencies example

Hi Doug,

In your README file, you are referencing Dependency.cpp/_c.c but the files are named Dependencies.cpp/_c.c so the links are broken.

I am not sure which you wanted so I will let you choose :)

[Question] std::future

What is the reason for not returning a std::future (C++11) upon adding a task instead of having to call WaitforTask?

Occasional performance spikes in SetEvent

Using Microprofile on Windows, I noticed that SetEvent can occasionally take longer than usual. enkiTS calls it to wake up the worker threads in AddTaskSetToPipe.
SetEvent usually blocks for less than 1ms, but I've seen spikes way up in the 20s of ms, which is a problem if the game waits on a task which is blocked by AddTaskSetToPipe. It ends up producing noticeable frame spikes every few seconds.
By changing the Event Object to auto-reset in EventCreate (https://msdn.microsoft.com/en-us/library/windows/desktop/ms682655(v=vs.85).aspx) these spikes disappear. However, this will only wake up 1 thread at a time, and may decrease thread utilization.
I could also avoid the issue by increasing the spin count, however this of course increases power consumption.
This is probably not a big issue if the threads rarely wait, so it depends on the workload of the scheduler, as well as the number of cores in use. Maybe auto-reset mode should be an option?

WaitforAll() and external threads

WaitforAll() adds a dummy pinned task and then waits:

WaitforTask( &dummyWaitTask );

Sometimes WaitforAll() never wakes up.

Not sure if it is due to recent changes, but I had never experienced this problem until today, when I merged master into my fork.

It seems that when this happens, the thread we're pinning the dummy task to is an external thread (added via RegisterExternalTaskThread()). In my case this was the dedicated rendering thread: it handles main thread requests and sleeps, never running tasks explicitly, so there's no guarantee it will run the dummy task at all.
