dougbinks / enkiTS
A permissively licensed C and C++ Task Scheduler for creating parallel programs. Requires C++11 support.
License: zlib License
Hi! First off, thanks for this lib.
On to the issue: I have index buffers which I want to manipulate, so I am operating on triplets of consecutive cells in the flat index buffer. I am failing, though, to manage the ranges.
How would you go about it?
Hello,
in the C++11 branch there is a small oversight: in the TaskScheduler.cpp constructor, m_pPinnedTaskListPerThread is not initialized to NULL. When the Initialize() method is then called, the delete[] operator tries to delete m_pPinnedTaskListPerThread (whatever it happens to point to) and crashes at runtime when compiled under Linux. MSVC (Visual Studio 15.8.6) seems to tolerate this (it at least runs without crashing).
On the master branch m_pPinnedTaskListPerThread is correctly set to NULL in the constructor.
I would normally suggest (for the C++11 branch) switching to std::unique_ptr, but I understand if you do not want too much divergence between the master and C++11 branches.
Thanks for this great and easy to use library.
Best wishes,
Fred
When adding more than (1<<PIPESIZE_LOG2) elements using AddTaskSetToPipe, once the pipe is full it starts executing subtasks in place, but if the other worker threads are asleep they will not be woken.
My simple fix was to add
// wake up threads early
if( ( numAdded % 128 ) == 0 )
{
if( m_NumThreadsActive.load( std::memory_order_relaxed ) < m_NumThreadsRunning.load( std::memory_order_relaxed ) )
{
m_NewTaskEvent.notify_all( );
}
}
just after the if( !m_pPipesPerThread[gtl_threadNum].WriterTryWriteFront(...) ) { ... } block.
Nice library!
Suppose I have a task task123 that can only execute when task1, task2 and task3 have completed. Can enkiTS support this workflow?
May I ask if you are planning on adding events in the near future? If you are, I would really like to hear about how you plan on designing them. If not, I would be interested in adding them as a PR.
My motivation is: I am designing a completely asynchronous image decompressor, so I need to connect different task sets with events, so that the completion of one task set causes a waiting set to enqueue itself.
Interested to hear your plans for this feature.
I noticed that the return codes for semaphore creation etc. aren't being checked for errors, so creation may fail without the caller being notified. What do you think is the best way of handling error conditions: throw an exception, or return false? Thanks!
I have been testing latest master (4f9941b).
ThreadSanitizer, enabled under XCode 11.0, is reporting some data races when running unmodified samples.
I am reporting the output of one data race report for ParallelSum as an example.
==================
WARNING: ThreadSanitizer: data race (pid=56086)
Read of size 4 at 0x7ffeefbff49c by thread T4:
#0 enki::TaskScheduler::TryRunTask(unsigned int, unsigned int, unsigned int&) TaskScheduler.cpp:412 (ParallelSum:x86_64+0x100007204)
#1 enki::TaskScheduler::TryRunTask(unsigned int, unsigned int&) TaskScheduler.cpp:377 (ParallelSum:x86_64+0x1000050d0)
#2 enki::TaskScheduler::TaskingThreadFunction(enki::ThreadArgs const&) TaskScheduler.cpp:236 (ParallelSum:x86_64+0x100004e04)
#3 decltype(std::__1::forward<void (*)(enki::ThreadArgs const&)>(fp)(std::__1::forward<enki::ThreadArgs>(fp0))) std::__1::__invoke<void (*)(enki::ThreadArgs const&), enki::ThreadArgs>(void (*&&)(enki::ThreadArgs const&), enki::ThreadArgs&&) type_traits:4361 (ParallelSum:x86_64+0x10000d06d)
#4 void std::__1::__thread_execute<std::__1::unique_ptr<std::__1::__thread_struct, std::__1::default_delete<std::__1::__thread_struct> >, void (*)(enki::ThreadArgs const&), enki::ThreadArgs, 2ul>(std::__1::tuple<std::__1::unique_ptr<std::__1::__thread_struct, std::__1::default_delete<std::__1::__thread_struct> >, void (*)(enki::ThreadArgs const&), enki::ThreadArgs>&, std::__1::__tuple_indices<2ul>) thread:342 (ParallelSum:x86_64+0x10000ceb1)
#5 void* std::__1::__thread_proxy<std::__1::tuple<std::__1::unique_ptr<std::__1::__thread_struct, std::__1::default_delete<std::__1::__thread_struct> >, void (*)(enki::ThreadArgs const&), enki::ThreadArgs> >(void*) thread:352 (ParallelSum:x86_64+0x10000bf09)
Previous write of size 4 at 0x7ffeefbff49c by main thread:
#0 enki::ITaskSet::ITaskSet() TaskScheduler.h:122 (ParallelSum:x86_64+0x100003288)
#1 ParallelReductionSumTaskSet::ParallelReductionSumTaskSet(unsigned int) ParallelSum.cpp:81 (ParallelSum:x86_64+0x100003ba8)
#2 ParallelReductionSumTaskSet::ParallelReductionSumTaskSet(unsigned int) ParallelSum.cpp:82 (ParallelSum:x86_64+0x100002e04)
#3 main ParallelSum.cpp:146 (ParallelSum:x86_64+0x100002390)
Location is stack of main thread.
Thread T4 (tid=3398714, running) created by main thread at:
#0 pthread_create <null>:2673040 (libclang_rt.tsan_osx_dynamic.dylib:x86_64h+0x2aa2d)
#1 std::__1::__libcpp_thread_create(_opaque_pthread_t**, void* (*)(void*), void*) __threading_support:328 (ParallelSum:x86_64+0x10000be4e)
#2 std::__1::thread::thread<void (&)(enki::ThreadArgs const&), enki::ThreadArgs, void>(void (&&&)(enki::ThreadArgs const&), enki::ThreadArgs&&) thread:368 (ParallelSum:x86_64+0x10000ba71)
#3 std::__1::thread::thread<void (&)(enki::ThreadArgs const&), enki::ThreadArgs, void>(void (&&&)(enki::ThreadArgs const&), enki::ThreadArgs&&) thread:360 (ParallelSum:x86_64+0x100006238)
#4 enki::TaskScheduler::StartThreads() TaskScheduler.cpp:298 (ParallelSum:x86_64+0x100005901)
#5 enki::TaskScheduler::Initialize(unsigned int) TaskScheduler.cpp:924 (ParallelSum:x86_64+0x10000a687)
#6 main ParallelSum.cpp:136 (ParallelSum:x86_64+0x10000231a)
SUMMARY: ThreadSanitizer: data race TaskScheduler.cpp:412 in enki::TaskScheduler::TryRunTask(unsigned int, unsigned int, unsigned int&)
==================
ThreadSanitizer report breakpoint hit. Use 'thread info -s' to get extended information about the report.
This is reporting that reading subTask.pTask->m_RangeToRun is a data race:
bool TaskScheduler::TryRunTask( uint32_t threadNum_, uint32_t priority_, uint32_t& hintPipeToCheck_io_ )
{
// ...
if( subTask.pTask->m_RangeToRun < partitionSize )
{
SubTaskSet taskToRun = SplitTask( subTask, subTask.pTask->m_RangeToRun );
}
When declaring ParallelSumTaskSet m_ParallelSumTaskSet; inside struct ParallelReductionSumTaskSet:
struct ParallelReductionSumTaskSet : ITaskSet
{
ParallelSumTaskSet m_ParallelSumTaskSet;
uint64_t m_FinalSum;
ParallelReductionSumTaskSet( uint32_t size_ ) : m_ParallelSumTaskSet( size_ ), m_FinalSum(0)
{
m_ParallelSumTaskSet.Init( g_TS.GetNumTaskThreads() );
}
virtual void ExecuteRange( TaskSetPartition range_, uint32_t threadnum_ )
{
g_TS.AddTaskSetToPipe( &m_ParallelSumTaskSet );
g_TS.WaitforTask( &m_ParallelSumTaskSet );
for( uint32_t i = 0; i < m_ParallelSumTaskSet.m_NumPartialSums; ++i )
{
m_FinalSum += m_ParallelSumTaskSet.m_pPartialSums[i].count;
}
}
};
will initialize ParallelSumTaskSet::m_RangeToRun
in the constructor:
class ITaskSet : public ICompletable
{
public:
ITaskSet()
: m_SetSize(1)
, m_MinRange(1)
, m_RangeToRun(1)
{}
};
I am not an expert in the field, but it looks like a potential false positive, because TryRunTask is executed only after AddTaskSetToPipe.
I try to keep our software clean of all sanitizer reports, so that I can catch real bugs ;)
For this reason, if this or other reports are safe, I suggest adding annotations that disable TSAN where appropriate (using no_sanitize("thread")).
Does it make sense for me to report all data races found by the TSAN output?
Oh, and thanks for your excellent work on the library :)
Is anyone using the C++98 backwards compatibility branch?
Some of the recent and future changes are going to take a fair amount of time to port, which preferably I'd like to avoid. So I'm considering deprecating the branch.
enkiCreateTaskScheduler -> enkiNewTaskScheduler in the C documentation.
/projects/enkiTS/src/TaskScheduler.cpp:38:8: error: thread-local storage is unsupported for the current target
static thread_local uint32_t gtl_threadNum = 0;
^
1 error generated.
Based off of what I read here, http://stackoverflow.com/questions/23791060/c-thread-local-storage-clang-503-0-40-mac-osx , I was able to get it to compile by changing thread_local to __thread .
I think there is a problem with TaskScheduler::WakeThreadsForNewTasks() and pinned tasks.
Consider a possible case: what if the number of suspended threads (those waiting for m_pNewTaskSemaphore to be signalled) increases just before SemaphoreSignal() is called, i.e. the value of waiting was not accurate because some threads fell asleep between the check and the signal? In this case some task threads would idle.
Most of the time it's not a big deal; those threads would wake when the next task arrives.
(Not sure if it's possible, but even if we are so unlucky that all the task threads fall asleep just after the check, the calling thread would handle the task itself.)
Now, when using AddPinnedTask() there's a subtle chance that the thread we pinned the task to was suspended as described above:
- AddPinnedTask() passes the m_NumThreadsWaitingForNewTasks check.
- WaitforTask() then hangs, as the thread the task is pinned to can't handle the request.
This is the problem I ran into while trying to port my code to enkiTS.
Though to be honest I'm not quite sure if that is indeed the case and if my assumption is accurate. Parallel programming is hard.
Would you consider wrapping the pointer data members in the class template std::unique_ptr?
Would it be useful to have an Initialized flag, and log a warning if the user tries to call methods without having initialized?
This was covered in #38, but I don't see how to accomplish it. When submitting ~20 tasks on an 8-thread machine, AddTaskSetToPipe will stall the main thread, consistent with its documentation:
// Adds the TaskSet to pipe and returns if the pipe is not full.
// If the pipe is full, pTaskSet is run.
// should only be called from main thread, or within a task
ENKITS_API void AddTaskSetToPipe( ITaskSet* pTaskSet_ );
I need the main thread to never stall, and AddTaskSetToPipe() to always return immediately. I feel like there's probably a simple solution to this (as you imply in #38) but I don't see it yet.
Not sure if this is a false positive, but:
==25938== Conditional jump or move depends on uninitialised value(s)
==25938== at 0x48C4973: enki::TaskScheduler::WakeSuspendedThreadsWithPinnedTasks() (TaskScheduler.cpp:558)
==25938== by 0x48C43E4: enki::TaskScheduler::WaitForNewTasks(unsigned int) (TaskScheduler.cpp:456)
==25938== by 0x48C3953: enki::TaskScheduler::TaskingThreadFunction(enki::ThreadArgs const&) (TaskScheduler.cpp:234)
==25938== by 0x48C7AB9: void std::__invoke_impl<void, void (*)(enki::ThreadArgs const&), enki::ThreadArgs>(std::__invoke_other, void (*&&)(enki::ThreadArgs const&), enki::ThreadArgs&&) (invoke.h:60)
Hi! I have been using enkiTS in a game engine and it was extremely easy to set up and use! I have an array of about 500 objects on which I want to call a function, and a total of 16 worker threads (including the main thread, I guess), but when I benchmark it, it is about 50% slower than running single-threaded. This was worse than expected, so I put a breakpoint inside the lambda and noticed that range.start and range.end were 4 objects apart. So it seems enkiTS split the work into 125 chunks. I would prefer to split it into fewer, larger chunks to get better performance. How would I go about doing this? Ideally I would like WORKER_THREADS * 1 or 2 chunks of work.
Here's the code inside of my lambda scope
[&component_list_pair, delta_time](enki::TaskSetPartition range, uint32_t threadnum)
{
for(auto i : Range(range.start, range.end))
{
component_list_pair.second[i]->ParallelUpdate(delta_time);
}
}
Does enkiTS support timed scheduling, for example scheduling a task to run at a specific date and time in the future?
enkiTS is an excellent library to serve my project, which is running multiple OpenGL contexts on multiple GPUs.
In my design, the master thread runs lightweight workloads, meaning it should never be scheduled to run any ITaskSet tasks.
Is it possible?
Hi.
I wrote some simple test code for pinned tasks. After running for a few seconds, it hits this assert:
Assertion failed: pTailPlus1->pNext, file E:\Source\enkiTS-master\src\LockLessMultiReadPipe.h, line 275
Here is my code:
TaskScheduler g_TS;
std::atomic<uint32_t> foo( 0 );
struct PinnedTaskHelloWorld : IPinnedTask
{
PinnedTaskHelloWorld()
: IPinnedTask(1) // set pinned thread to 1
{}
virtual void Execute()
{
printf("This will run on the thread 1, %d\n", foo++);
}
};
PinnedTaskHelloWorld task[100];
int main(int argc, const char * argv[])
{
g_TS.Initialize(2);
while (true)
{
for (int i = 0; i < 100; ++i)
{
g_TS.AddPinnedTask(&task[i]);
if (i == 50)
{
g_TS.WaitforAll();
}
}
}
return 0;
}
Did I get something wrong? I'm on a VS2019 x64 build.
Thanks.
From this line
memset(&m_ProfilerCallbacks, 0, sizeof(m_ProfilerCallbacks));
TaskScheduler.cpp:525:64: warning: ‘void* memset(void*, int, size_t)’ clearing an object of type ‘struct enki::ProfilerCallbacks’ with no trivial copy-assignment; use assignment or value-initialization instead [-Wclass-memaccess]
memset(&m_ProfilerCallbacks, 0, sizeof(m_ProfilerCallbacks));
Hi,
It looks like all WaitFor* methods in enkiTS busy-wait. In some situations this results in long spin times. What is the preferred/proper way to sleep-wait for task completion in enkiTS?
enkiCreateTaskSet needs a reciprocal delete function.
Hi Doug,
enkiTS does not allow running tasks or waiting for completion from threads other than the main/task threads, as I understand it.
For example, I would like to be able to use the system from rendering thread, which itself is not a task thread, but a full-fledged thread typically running in parallel with the main one. Or from background loading thread which is mostly idle waiting for IO, but uses tasks to decompress/finalize assets.
It could be cool if enkiTS was able to support that. What do you think? Thanks!
-- Aleksei
Can I use it to bind multiple events at once? Do they not affect each other?
I have an encoder and a decoder that both use the API, but they run in a video pipeline, so encoding and decoding happen concurrently. Is there a way to use the API safely in this situation?
This is almost certainly an issue on Android's side, not yours, but I wanted to at least bring things to attention. In enki::DefaultAllocFunc(), non-Win32 programs use posix_memalign(). Our 64-bit Android app is segfaulting at launch, and it seems that the call to posix_memalign() is involved in whatever's going wrong. If we replace that with a call to plain malloc(), our app carries along running "fine". By "fine", I mean this isn't significantly tested or shipped beyond my local build going from segfault-at-launch to looking like all's well.
Like I said, it's probably an Android 11 beta issue (I'm testing on a Pixel phone), but I at least wanted to make sure you were aware.
Edit: correction/clarification. It's not the call to posix_memalign() itself that's segfaulting. When TaskScheduler::StartThreads() runs m_pTaskCompleteSemaphore = SemaphoreNew() (the second call to SemaphoreNew()), the resulting placement-new in TaskScheduler::New() is what's segfaulting, even though posix_memalign() returned a "success" error code of 0 and the pointer to memory is non-null.
Line 147 in 175238f
Hi Doug,
I ran my image codec, grok, which uses enkiTS, on OSX with valgrind, and I see this
error:
==3380== Process terminating with default action of signal 11 (SIGSEGV)
==3380== Access not within mapped region at address 0x18
==3380== at 0x100CC05BA: _pthread_body (in /usr/lib/system/libsystem_pthread.dylib)
==3380== by 0x100CC050C: _pthread_start (in /usr/lib/system/libsystem_pthread.dylib)
==3380== by 0x100CBFBF8: thread_start (in /usr/lib/system/libsystem_pthread.dylib)
==3380== If you believe this happened as a result of a stack
==3380== overflow in your program's main thread (unlikely but
==3380== possible), you can try to increase the size of the
==3380== main thread stack using the --main-stacksize= flag.
==3380== The main thread stack size used in this run was 8388608.
This is for an earlier version of enkiTS; with the latest version my program crashes with a BAD_ACCESS error.
Have you run any valgrind tests on OSX ? Everything looks good on Linux.
Thanks.
Hello!
I've been successfully using enkiTS on different platforms based on x86_64 and it works just fine. After porting my project to ARM (NVIDIA Tegra) I noticed inconsistent behavior: running the examples on the target, TasksThroughput either deadlocks or reports a wrong check count.
It appears to be an issue related to memory reordering. Has this library been used/tested on ARM before?
Thanks
Is this an alternative to Intel TBB, firing multiple threads with different priorities for different tasks?
What problems does it solve?
P.S. For me the only reason for using some custom scheduling system is to reduce latency for very small tasks (because yield tends to switch off the thread for 5-60ms under heavy load from other threads).
Hi Doug,
hopefully I'm not misunderstanding the intent of MinRange, but I found that it doesn't behave as expected.
There's a comment saying that the last partition will be smaller than MinRange if SetSize is not a multiple of MinRange, but this is only true if MinRange is larger than NumPartitions of the TaskScheduler.
So you will get partitions smaller than MinRange even if SetSize is a multiple of MinRange.
Furthermore you often (most of the time?) want partitions that are a multiple of MinRange e.g. for alignment reasons.
One way of fixing this is to align NumPartitions and NumInitialPartitions up to the next multiple of MinRange in AddTaskSetToPipe().
So you'd have something like this:
diff --git a/src/TaskScheduler.cpp b/src/TaskScheduler.cpp
index 187673a..7b1e394 100644
--- a/src/TaskScheduler.cpp
+++ b/src/TaskScheduler.cpp
@@ -327,16 +327,19 @@ void TaskScheduler::SplitAndAddTask( uint32_t threadNum_, SubTaskSet subTask_,
WakeThreads();
}
+#define IS_POW2(n) ((((n) & ((n) - 1)) == 0) && ((n) != 0))
+#define ALIGN_UP(n, alignment) (((n) + (alignment) - 1) & ~((alignment) - 1))
void TaskScheduler::AddTaskSetToPipe( ITaskSet* pTaskSet )
{
+ assert(IS_POW2(pTaskSet->m_MinRange));
// set running count to -1 to guarantee it won't be found complete until all subtasks added
pTaskSet->m_RunningCount = -1;
// divide task up and add to pipe
- pTaskSet->m_RangeToRun = pTaskSet->m_SetSize / m_NumPartitions;
+ pTaskSet->m_RangeToRun = pTaskSet->m_SetSize / ALIGN_UP(m_NumPartitions, pTaskSet->m_MinRange);
if( pTaskSet->m_RangeToRun < pTaskSet->m_MinRange ) { pTaskSet->m_RangeToRun = pTaskSet->m_MinRange; }
- uint32_t rangeToSplit = pTaskSet->m_SetSize / m_NumInitialPartitions;
+ uint32_t rangeToSplit = pTaskSet->m_SetSize / ALIGN_UP(m_NumInitialPartitions, pTaskSet->m_MinRange);
if( rangeToSplit < pTaskSet->m_MinRange ) { rangeToSplit = pTaskSet->m_MinRange; }
SubTaskSet subTask;
This way things would work as before when MinRange is 1 and as long as SetSize is a multiple of MinRange so will be the partitions.
If MinRange is a power of 2 then this doesn't add much overhead, which is an acceptable restriction for me, not sure it is for others though.
What do you think, is there a better way of dealing with this?
Simple repro case:
#include "TaskScheduler.h"
#include "Timer.h"
#include <stdio.h>
#include <inttypes.h>
#include <assert.h>
#ifndef _WIN32
#include <string.h>
#endif
using namespace enki;
TaskScheduler g_TS;
std::atomic<int> count( 0 );
struct TestTask : ITaskSet
{
void ExecuteRange( TaskSetPartition range, uint32_t threadnum ) override
{
count++;
(void)range;
(void)threadnum;
}
};
void func()
{
TestTask t;
g_TS.AddTaskSetToPipe(&t);
g_TS.WaitforTask(&t);
}
int main(int argc, const char * argv[])
{
g_TS.Initialize();
while (true) {
func();
}
return 0;
}
This will eventually lead to a crash with the following callstack
> Dependencies.exe!enki::TaskScheduler::TaskComplete(enki::ICompletable * pTask_, bool bWakeThreads_, unsigned int threadNum_) Line 450 C++
Dependencies.exe!enki::TaskScheduler::TryRunTask(unsigned int threadNum_, unsigned int priority_, unsigned int & hintPipeToCheck_io_) Line 436 C++
Dependencies.exe!enki::TaskScheduler::TryRunTask(unsigned int threadNum_, unsigned int & hintPipeToCheck_io_) Line 378 C++
Dependencies.exe!enki::TaskScheduler::TaskingThreadFunction(const enki::ThreadArgs & args_) Line 237 C++
at this point:
Dependency* pDependent = pTask_->m_pDependents;
while( pDependent )
{
int prevDeps = pDependent->pTaskToRunOnCompletion->m_DependenciesCompletedCount.fetch_add( 1, std::memory_order_release );
It seems to be easier to trigger the crash by running with a lower thread count (with 4 threads, I hit the crash in 10-15 seconds), and running in debug.
See the attached code for a minimal repro. I am new to this API, so if I am making an obvious mistake please let me know. This bug appears to occur reliably when TASK_COUNT >= 50, and when TASK_COUNT is 5000 it always happens.
Context / use case: I am developing a terrain streaming system. On world load I spawn about 5000~ tasks which procedurally generate each terrain cell's heightmap, then another 5000~ with various dependencies to generate the meshes.
#include "TaskScheduler.h"
int main(int, char**)
{
static enki::TaskScheduler s_scheduler;
s_scheduler.Initialize();
static constexpr size_t TASK_RANGE = 65*65;
static constexpr size_t TASK_COUNT = 50;
struct Repro : public enki::ITaskSet
{
Repro() : enki::ITaskSet(TASK_RANGE) {};
virtual void ExecuteRange( enki::TaskSetPartition range, uint32_t) override
{
if (range.start > TASK_RANGE || range.end > TASK_RANGE)
{
__debugbreak(); // bug?!
}
}
};
std::vector<std::unique_ptr<Repro>> jobs;
for (size_t i = 0; i < TASK_COUNT; ++i)
{
jobs.emplace_back(std::make_unique<Repro>());
}
for (std::unique_ptr<Repro>& job : jobs)
{
s_scheduler.AddTaskSetToPipe(job.get());
}
s_scheduler.WaitforAll();
}
From a preliminary search of the library code, the problem appears to occur only when the task buffer is full and the task must be executed immediately. The code does not properly handle ranges that are not divisible by the range step, so what should be the final step overruns into an infinite loop.
https://github.com/dougbinks/enkiTS/blob/master/src/TaskScheduler.cpp#L656
I have removed this block; it doesn't make sense to me: if we are simply executing the same job immediately because there was no room, why would we have to tweak a range we've already calculated? This fixed the problem for me. I verified that every element of the range was generated with the following test:
struct Repro : public enki::ITaskSet
{
std::vector<int> data;
Repro() : enki::ITaskSet(TASK_RANGE) { data.resize(TASK_RANGE); };
virtual void ExecuteRange( enki::TaskSetPartition range, uint32_t) override
{
if (range.start > TASK_RANGE || range.end > TASK_RANGE)
{
__debugbreak(); // bug?!
}
for (size_t i = range.start; i < range.end; ++i)
{
data[i] = 1;
}
}
};
... later
for (std::unique_ptr<Repro>& job : jobs)
{
int count = 0;
for (size_t i = 0; i < TASK_RANGE; ++i)
{
count += job->data[i];
}
ASSERT(count == TASK_RANGE);
}
Hey, just noticed this using _CrtMemCheckpoint/_CrtMemDifference/_CrtDumpMemoryLeaks: in the C++11 version, the std::thread objects are created on the heap in TaskScheduler.cpp line 148:
m_pThreads[thread] = new std::thread( TaskingThreadFunction, m_pThreadNumStore[thread] );
but there's no corresponding delete. I've added it to StopThreads just before m_NumThreads = 0, updated function:
void TaskScheduler::StopThreads( bool bWait_ )
{
if( m_bHaveThreads )
{
// wait for the threads to quit before deleting data
m_bRunning = 0;
while( bWait_ && m_NumThreadsRunning )
{
// keep firing event to ensure all threads pick up state of m_bRunning
m_NewTaskEvent.notify_all();
}
for( uint32_t thread = 1; thread < m_NumThreads; ++thread )
{
m_pThreads[thread]->join();
delete m_pThreads[thread];
}
m_NumThreads = 0;
delete[] m_pThreadNumStore;
delete[] m_pThreads;
m_pThreadNumStore = 0;
m_pThreads = 0;
m_bHaveThreads = false;
m_NumThreadsActive = 0;
m_NumThreadsRunning = 0;
}
}
->join() is needed to prevent asserts. Also, maybe m_pThreads could be just a * instead of **, so there's no need to create each std::thread object separately on the heap?
std::thread* m_pThreads;
....
m_pThreads[thread] = std::thread( TaskingThreadFunction, m_pThreadNumStore[thread] );
should work too? I'm not 100% sure, my C++11-fu isn't that great :)
https://github.com/dougbinks/enkiTS/blob/master/src/LockLessMultiReadPipe.h#L277
this line of code can't compile on VS2015; it generates error C2593: 'operator ==' is ambiguous.
I have to change it to while( (T*)NULL == pTailPlus1->pNext ) {;} to make it compile.
Hello.
Minor problem: as far as I can understand, there is a race condition between checking for tasks and incrementing m_NumThreadsWaiting in TaskScheduler::WaitForTasks.
In my opinion, the best way to fix this would be
This algorithm is very similar to benaphore (atomic counter + kernel semaphore) described in http://preshing.com/20150316/semaphores-are-surprisingly-versatile/
I will happily prepare a pull-request with a fix if you'd like. Note that this also fixes #12.
Short version:
ICompletable, ITaskSet and IPinnedTask are missing virtual destructors, which can be problematic when allocating tasks dynamically on the heap.
Detailed version:
The following pseudocode shows how a program might crash due to the missing virtual destructors.
The offending line of code is commented with CRASH:
enki::TaskScheduler jobSystem;
class MyTask1 : public ITaskSet { ...}
class MyTask2 : public ITaskSet { ...}
class MyPinnedTask : public IPinnedTask { ...}
// An array where I store all my running tasks
myArray<ICompletable*> runningTasks;
// I allocate my tasks on the main thread
MyTask1* task = new MyTask1;
jobSystem.AddTaskSetToPipe(task);
// And store the tasks in an array to be able to check them for completeness later
runningTasks.push_back(task);
// Once every frame I loop through my array of tasks and delete the ones that are completed:
void GameUpdate()
{
for(int i=0; i<runningTasks.size(); i++)
{
ICompletable* job = runningTasks[i];
if (job->GetIsComplete())
{
runningTasks.remove(i);
delete job; // CRASH
}
}
}
Once I submit a task, an exception appears.
I noticed you initialize m_NumPartitions to 0 here:
Line 522 in 13aa878
The problem then is basically here
Line 407 in 13aa878
I'm creating the scheduler just doing
g_TS.Initialize();
And then i submit a task to it doing
g_TS.AddTaskSetToPipe(&task)
So why is that value 0? Did I do something wrong?
Title is self-explanatory, the formatting is all messed-up :-)
See for example:
Line 72 in 5b135fd
GCC complains that size_, userData_, etc. are never used in this method:
ENKITS_API void enki::DefaultFreeFunc( void* ptr_, size_t size_, void* userData_, const char* file_, int line_ )
{
#ifdef _WIN32
_aligned_free( ptr_ );
#else
free( ptr_ );
#endif
};
Not a big deal, but the fewer warnings the better :)
Hi Doug,
I updated enkiTS to 1.4 and the new constant CACHE_LINE_SIZE ends up clashing with a macro defined in the SDK of a famous video game console which I cannot name due to an NDA.
As you target game development with enkiTS I thought you might like to know this.
Personally I see 3 ways to deal with this:
- SIZE_OF_CACHE_LINE
- ENKI_
On an unrelated note, I would also recommend moving all your variables into your enki namespace.
Regards,
Bruno
Hey,
I've been using enkiTS for a while now and I've really been enjoying it. After using it on Win32, it worked on Android/Clang with only one unused-variable warning when _DEBUG is not defined: int err = sem_init( &semaphoreid.sem, 0, 0 );. Happy to open another issue / PR for this.
I then went ahead and moved it over to iOS and ran into this Issue.
I understand this is avoidable by adding enkiTS as its own lib/project in xcode, which I plan to do, however I wanted to bring this to your attention either way.
When precompiled headers are turned on and the precompiled header happens to include malloc.h, the compiler throws this error:
/enkits/TaskScheduler.cpp:87:9: error: expected identifier
THREAD_STATE_NONE, // shouldn't get this value
^
In file included from /precompiled-header.pch:14:
In file included from /Applications/Xcode.app/Contents/Developer/Platforms/iPhoneOS.platform/Developer/SDKs/iPhoneOS13.2.sdk/usr/include/malloc/malloc.h:28:
In file included from /Applications/Xcode.app/Contents/Developer/Platforms/iPhoneOS.platform/Developer/SDKs/iPhoneOS13.2.sdk/usr/include/mach/mach_types.h:87:
In file included from /Applications/Xcode.app/Contents/Developer/Platforms/iPhoneOS.platform/Developer/SDKs/iPhoneOS13.2.sdk/usr/include/mach/exception_types.h:182:
In file included from /Applications/Xcode.app/Contents/Developer/Platforms/iPhoneOS.platform/Developer/SDKs/iPhoneOS13.2.sdk/usr/include/mach/thread_status.h:76:
In file included from /Applications/Xcode.app/Contents/Developer/Platforms/iPhoneOS.platform/Developer/SDKs/iPhoneOS13.2.sdk/usr/include/mach/machine/thread_status.h:33:
/Applications/Xcode.app/Contents/Developer/Platforms/iPhoneOS.platform/Developer/SDKs/iPhoneOS13.2.sdk/usr/include/mach/arm/thread_status.h:55:34: note: expanded from macro 'THREAD_STATE_NONE'
#define THREAD_STATE_NONE 5
To get around it I renamed THREAD_STATE_NONE; however, you could also undefine THREAD_STATE_NONE when __APPLE__ is defined, since TaskScheduler.cpp doesn't use it.
Hello, thank you for your library!
Minor problem: as far as I can understand, the current implementation of EventSignal/EventWait on POSIX systems allows for a race condition:
There are two ways to fix this behavior: either add a "signaled" variable to the event, checked and modified with the mutex locked, or use anonymous semaphores instead (http://man7.org/linux/man-pages/man3/sem_init.3.html with pshared = 0).
The latter is generally preferred, at least on Linux and OS X, as it is simpler, faster, and matches the Windows implementation.
I will prepare a pull-request with a fix if you'd like.
Hi Doug,
In your README file you reference Dependency.cpp/_c.c, but the files are named Dependencies.cpp/_c.c, so the links are broken.
I am not sure which you wanted, so I will let you choose :)
StopThreads goes into an infinite loop. I suppose this has something to do with when the static object is destroyed.
What is the reason for not returning a std::future (C++11) upon adding a task, instead of having to call WaitforTask?
Using Microprofile on Windows, I noticed that SetEvent can occasionally take longer than usual. enkiTS calls it to wake the worker threads in AddTaskSetToPipe.
SetEvent usually blocks for less than 1 ms, but I've seen spikes into the 20s of ms, which is a problem if the game waits on a task that is blocked by AddTaskSetToPipe. It ends up producing noticeable frame spikes every few seconds.
By changing the Event Object to auto-reset in EventCreate (https://msdn.microsoft.com/en-us/library/windows/desktop/ms682655(v=vs.85).aspx) these spikes disappear. However, this only wakes one thread at a time, and may decrease thread utilization.
I could also avoid the issue by increasing the spin count; however, this of course increases power consumption.
This is probably not a big issue if the threads rarely wait, so it depends on the workload of the scheduler, as well as the number of cores in use. Maybe auto-reset mode should be an option?
WaitForAll() adds a dummy pinned task and then waits:
Line 796 in 4f9941b
Sometimes WaitForAll() never wakes up.
Not sure if it is due to recent changes, but I had never experienced this problem until today, when I merged master into my fork.
It seems that when this happens, the thread we're pinning the dummy task to is an external thread (added via RegisterExternalTaskThread()). In my case this was the dedicated rendering thread: it handles main-thread requests and sleeps, never running tasks explicitly, so there's no guarantee it will run the dummy task at all.
Whilst enkiTS only allocates at initialization time, a custom allocator would be useful to some users for tracking memory consumption.
https://twitter.com/serhii_rieznik/status/1187011358220541952
Would you like to add more error handling for return values from functions like the following?