tweaks and fixes in thread pool

- created a separate entry point for shared workers
- fixed a race condition in tp_run_tasks where a worker could read a
  modified task count and cause unpredictable behavior
This commit is contained in:
Nikita Smith
2025-01-24 10:33:42 -08:00
parent b18060ef0d
commit 8196ef0a6e
2 changed files with 104 additions and 50 deletions
+96 -45
View File
@@ -2,25 +2,27 @@
// Licensed under the MIT license (https://opensource.org/license/mit/)
internal void
tp_execute_tasks(TP_Worker *worker)
tp_run_tasks(TP_Context *pool, TP_Worker *worker)
{
TP_Context *pool = worker->pool;
for (;;) {
// do we have tasks?
S64 task_left_count = ins_atomic_u64_dec_eval(&pool->task_left_count);
if (task_left_count < 0) {
S64 task_left = ins_atomic_u64_dec_eval(&pool->task_left);
// are there any tasks left to run?
if (task_left < 0) {
break;
}
// invoke task
Arena *arena = pool->worker_arena ? pool->worker_arena->v[worker->id] : 0;
U64 task_id = pool->task_count - (task_left_count+1);
// run task
Arena *arena = pool->task_arena ? pool->task_arena->v[worker->id] : 0;
U64 task_id = pool->task_count - (task_left+1);
pool->task_func(arena, worker->id, task_id, pool->task_data);
// before last worker takes semaphore wake up main thread
U64 task_done_count = ins_atomic_u64_inc_eval(&pool->task_done_count);
if (task_done_count == pool->task_count) {
// cache task count so we don't touch pool memory after the atomic inc
U64 task_count = pool->task_count;
// on last task ping main thread
U64 task_done = ins_atomic_u64_inc_eval(&pool->task_done);
if (task_done == task_count) {
os_semaphore_drop(pool->main_semaphore);
}
}
@@ -29,16 +31,27 @@ tp_execute_tasks(TP_Worker *worker)
internal void
tp_worker_main(void *raw_worker)
{
TCTX tctx_; tctx_init_and_equip(&tctx_);
TCTX tctx_; tctx_init_and_equip(&tctx_);
TP_Worker *worker = raw_worker;
TP_Context *pool = worker->pool;
while (pool->is_live) {
for (; pool->is_live; ) {
if (os_semaphore_take(pool->task_semaphore, max_U64)) {
tp_execute_tasks(worker);
} else {
Assert(!"time out");
tp_run_tasks(pool, worker);
}
}
}
// Thread entry point used for workers of a pool that tp_alloc created with a
// non-empty name (the "shared" case). `raw_worker` is the TP_Worker that was
// handed to os_thread_launch for this thread.
internal void
tp_worker_main_shared(void *raw_worker)
{
  // equip this thread with its own thread context before doing anything else
  TCTX tctx_;
  tctx_init_and_equip(&tctx_);

  TP_Worker  *self  = raw_worker;
  TP_Context *owner = self->pool;

  while (owner->is_live) {
    // first gate: the pool's exec semaphore
    if (!os_semaphore_take(owner->exec_semaphore, max_U64)) {
      continue;
    }

    // second gate: the task semaphore, taken only after the exec semaphore
    if (!os_semaphore_take(owner->task_semaphore, max_U64)) {
      continue;
    }

    // both takes succeeded, pull tasks from the pool until none are left
    tp_run_tasks(owner, self);
  }
}
@@ -47,14 +60,32 @@ internal TP_Context *
tp_alloc(Arena *arena, U32 worker_count, String8 name)
{
ProfBeginDynamic("Alloc Thread Pool [Worker Count: %u]", worker_count);
Assert(worker_count > 0);
// init pool
TP_Context *pool = push_array(arena, TP_Context, 1);
AssertAlways(worker_count > 0);
B32 is_shared = (name.size > 0);
// alloc semaphores
OS_Handle main_semaphore = {0};
OS_Handle task_semaphore = {0};
OS_Handle exec_semaphore = {0};
if (worker_count > 1) {
pool->task_semaphore = os_semaphore_alloc(0, worker_count - 1, name);
pool->main_semaphore = os_semaphore_alloc(0, 1, str8(0,0));
main_semaphore = os_semaphore_alloc(0, 1, str8_zero());
if (is_shared) {
task_semaphore = os_semaphore_alloc(0, worker_count, name);
exec_semaphore = os_semaphore_alloc(0, worker_count, str8_zero());
} else {
task_semaphore = os_semaphore_alloc(0, worker_count, str8_zero());
}
}
// pick entry point for the workers
void *worker_entry = is_shared ? tp_worker_main_shared : tp_worker_main;
// init pool
TP_Context *pool = push_array(arena, TP_Context, 1);
pool->exec_semaphore = exec_semaphore;
pool->task_semaphore = task_semaphore;
pool->main_semaphore = main_semaphore;
pool->is_live = 1;
pool->worker_count = worker_count;
pool->worker_arr = push_array(arena, TP_Worker, worker_count);
@@ -62,14 +93,14 @@ tp_alloc(Arena *arena, U32 worker_count, String8 name)
// init worker data
for (U64 i = 0; i < worker_count; i += 1) {
TP_Worker *worker = &pool->worker_arr[i];
worker->id = i;
worker->pool = pool;
worker->id = i;
worker->pool = pool;
}
// launch worker threads
for (U64 i = 1; i < worker_count; i += 1) {
TP_Worker *worker = &pool->worker_arr[i];
worker->handle = os_thread_launch(tp_worker_main, worker, 0);
worker->handle = os_thread_launch(worker_entry, worker, 0);
}
ProfEnd();
@@ -80,11 +111,25 @@ internal void
tp_release(TP_Context *pool)
{
pool->is_live = 0;
os_semaphore_release(pool->task_semaphore);
os_semaphore_release(pool->main_semaphore);
B32 is_shared = !os_handle_match(pool->exec_semaphore, os_handle_zero());
if (is_shared) {
for (U64 i = 0; i < pool->worker_count; ++i) {
os_semaphore_drop(pool->exec_semaphore);
}
}
for (U64 i = 0; i < pool->worker_count; ++i) {
os_semaphore_drop(pool->task_semaphore);
}
for (U64 i = 1; i < pool->worker_count; i += 1) {
os_thread_detach(pool->worker_arr[i].handle);
}
if (is_shared) {
os_semaphore_release(pool->exec_semaphore);
}
os_semaphore_release(pool->task_semaphore);
os_semaphore_release(pool->main_semaphore);
MemoryZeroStruct(pool);
}
@@ -151,27 +196,33 @@ tp_temp_end(TP_Temp temp)
}
internal void
tp_for_parallel(TP_Context *pool, TP_Arena *arena, U64 task_count, TP_TaskFunc *task_func, void *task_data)
tp_for_parallel(TP_Context *pool, TP_Arena *task_arena, U64 task_count, TP_TaskFunc *task_func, void *task_data)
{
if (task_count > 0) {
Assert(!arena || arena->count == pool->worker_count);
// init run
pool->task_arena = task_arena;
pool->task_func = task_func;
pool->task_data = task_data;
pool->task_count = task_count;
pool->task_done = 0;
ins_atomic_u64_eval_assign(&pool->task_left, task_count);
// init context
pool->worker_arena = arena;
pool->task_count = task_count;
pool->task_func = task_func;
pool->task_data = task_data;
pool->task_done_count = 0;
pool->task_left_count = task_count;
U64 drop_count = Min(task_count, pool->worker_count);
// if we are in shared mode ping local semaphore
if (!os_handle_match(pool->exec_semaphore, os_handle_zero())) {
for (U64 worker_idx = 0; worker_idx < drop_count; worker_idx +=1) {
os_semaphore_drop(pool->exec_semaphore);
}
}
// wake up workers
for (U64 worker_idx = 1; worker_idx < pool->worker_count; worker_idx += 1) {
// ping shared semaphore
for (U64 worker_idx = 0; worker_idx < drop_count; worker_idx += 1) {
os_semaphore_drop(pool->task_semaphore);
}
// execute tasks on main worker
TP_Worker *main_worker = &pool->worker_arr[0];
tp_execute_tasks(main_worker);
// run tasks on main worker
tp_run_tasks(pool, &pool->worker_arr[0]);
// wait for workers to finish tasks
os_semaphore_take(pool->main_semaphore, max_U64);
+8 -5
View File
@@ -27,17 +27,20 @@ typedef struct TP_Worker
typedef struct TP_Context
{
B32 is_live;
OS_Handle exec_semaphore;
OS_Handle task_semaphore;
OS_Handle main_semaphore;
B32 is_live;
U32 worker_count;
TP_Worker *worker_arr;
TP_Arena *worker_arena;
U64 task_count;
TP_Arena *task_arena;
TP_TaskFunc *task_func;
void *task_data;
volatile S64 task_left_count;
volatile U64 task_done_count;
U64 task_count;
U64 task_done;
S64 task_left;
} TP_Context;
internal TP_Context * tp_alloc(Arena *arena, U32 worker_count, String8 name);