reworked thread pool to share workers via semaphore

This commit is contained in:
Nikita Smith
2025-01-21 01:55:13 -08:00
parent 85ea141b83
commit 61307aefbc
5 changed files with 53 additions and 92 deletions
+36 -68
View File
@@ -6,42 +6,37 @@ tp_execute_tasks(TP_Worker *worker)
{
TP_Context *pool = worker->pool;
Arena *arena = NULL;
if (pool->worker_arena) {
arena = pool->worker_arena->v[worker->id];
}
for (;;) {
// get task id
U64 next_task_id = ins_atomic_u64_inc_eval(&pool->next_task_id);
if (next_task_id > pool->task_count) {
// do we have tasks?
S64 task_left_count = ins_atomic_u64_dec_eval(&pool->task_left_count);
if (task_left_count < 0) {
break;
}
// invoke task func
U64 task_id = next_task_id - 1;
// invoke task
Arena *arena = pool->worker_arena ? pool->worker_arena->v[worker->id] : 0;
U64 task_id = pool->task_count - (task_left_count+1);
pool->task_func(arena, worker->id, task_id, pool->task_data);
// whichever worker completes the final task wakes up the main thread
U64 task_done_count = ins_atomic_u64_inc_eval(&pool->task_done_count);
if (task_done_count == pool->task_count) {
os_semaphore_drop(pool->main_semaphore);
}
}
}
internal void
tp_worker_main(void *raw_worker)
{
TCTX tctx_;
tctx_init_and_equip(&tctx_);
TCTX tctx_; tctx_init_and_equip(&tctx_);
TP_Worker *worker = (TP_Worker *)raw_worker;
TP_Context *pool = worker->pool;
TP_Worker *worker = raw_worker;
TP_Context *pool = worker->pool;
while (pool->is_live) {
if (os_semaphore_take(pool->task_semaphore, max_U64)) {
tp_execute_tasks(worker);
// before last worker takes semaphore wake up main worker
U64 take_count = ins_atomic_u64_dec_eval(&pool->take_count);
if (take_count == 1) {
os_semaphore_drop(pool->main_semaphore);
}
} else {
Assert(!"time out");
}
@@ -49,7 +44,7 @@ tp_worker_main(void *raw_worker)
}
internal TP_Context *
tp_alloc(Arena *arena, U32 worker_count)
tp_alloc(Arena *arena, U32 worker_count, String8 name)
{
ProfBeginDynamic("Alloc Thread Pool [Worker Count: %u]", worker_count);
Assert(worker_count > 0);
@@ -57,7 +52,7 @@ tp_alloc(Arena *arena, U32 worker_count)
// init pool
TP_Context *pool = push_array(arena, TP_Context, 1);
if (worker_count > 1) {
pool->task_semaphore = os_semaphore_alloc(0, worker_count - 1, str8(0,0));
pool->task_semaphore = os_semaphore_alloc(0, worker_count - 1, name);
pool->main_semaphore = os_semaphore_alloc(0, 1, str8(0,0));
}
pool->is_live = 1;
@@ -81,25 +76,12 @@ tp_alloc(Arena *arena, U32 worker_count)
return pool;
}
// Allocates a thread pool through tp_alloc() and attaches a shared mutex to it.
//   arena        - forwarded to tp_alloc()
//   worker_count - forwarded to tp_alloc()
//   name         - passed to os_shared_mutex_alloc() and stored on the pool
// Returns the pool produced by tp_alloc(), with shared_mutex_name and
// shared_mutex_handle filled in. AssertAlways fires when the handle returned
// by os_shared_mutex_alloc() matches os_handle_zero().
internal TP_Context *
tp_alloc_shared(Arena *arena, U32 worker_count, String8 name)
{
  TP_Context *pool = tp_alloc(arena, worker_count);

  // record the mutex name together with the handle allocated for it
  pool->shared_mutex_name   = name;
  pool->shared_mutex_handle = os_shared_mutex_alloc(name);

  // the stored handle must not be the zero handle
  AssertAlways(!os_handle_match(pool->shared_mutex_handle, os_handle_zero()));

  return pool;
}
internal void
tp_release(TP_Context *pool)
{
pool->is_live = 0;
os_semaphore_release(pool->task_semaphore);
os_semaphore_release(pool->main_semaphore);
if (!os_handle_match(pool->shared_mutex_handle, os_handle_zero())) {
os_mutex_release(pool->shared_mutex_handle);
}
for (U64 i = 1; i < pool->worker_count; i += 1) {
os_thread_detach(pool->worker_arr[i].handle);
}
@@ -171,43 +153,29 @@ tp_temp_end(TP_Temp temp)
internal void
tp_for_parallel(TP_Context *pool, TP_Arena *arena, U64 task_count, TP_TaskFunc *task_func, void *task_data)
{
Assert(!arena || arena->count == pool->worker_count);
if (task_count > 0) {
Assert(!arena || arena->count == pool->worker_count);
// in shared mode take mutex
if (!os_handle_match(pool->shared_mutex_handle, os_handle_zero())) {
if (!os_shared_mutex_take(pool->shared_mutex_handle, max_U64)) {
AssertAlways(!"failed to take shared mutex");
// init context
pool->worker_arena = arena;
pool->task_count = task_count;
pool->task_func = task_func;
pool->task_data = task_data;
pool->task_done_count = 0;
pool->task_left_count = task_count;
// wake up workers
for (U64 worker_idx = 1; worker_idx < pool->worker_count; worker_idx += 1) {
os_semaphore_drop(pool->task_semaphore);
}
}
// setup pool state
pool->worker_arena = arena;
pool->task_count = task_count;
pool->task_func = task_func;
pool->task_data = task_data;
pool->next_task_id = 0;
pool->take_count = 0;
// do we have enough work for other workers?
pool->take_count = Min(pool->task_count, pool->worker_count);
U64 drop_count = pool->take_count;
for (U64 worker_idx = 1; worker_idx < drop_count; worker_idx += 1) {
os_semaphore_drop(pool->task_semaphore);
}
// execute tasks on main worker too
TP_Worker *main_worker = &pool->worker_arr[0];
tp_execute_tasks(main_worker);
if (drop_count > 1) {
// wait for workers to finish assigned tasks
// execute tasks on main worker
TP_Worker *main_worker = &pool->worker_arr[0];
tp_execute_tasks(main_worker);
// wait for workers to finish tasks
os_semaphore_take(pool->main_semaphore, max_U64);
}
// signal other thread pools that we have done our round of tasks
if (!os_handle_match(pool->shared_mutex_handle, os_handle_zero())) {
os_shared_mutex_drop(pool->shared_mutex_handle);
}
}
internal Rng1U64 *
+3 -6
View File
@@ -29,8 +29,6 @@ typedef struct TP_Context
{
OS_Handle task_semaphore;
OS_Handle main_semaphore;
OS_Handle shared_mutex_handle;
String8 shared_mutex_name;
B32 is_live;
U32 worker_count;
TP_Worker *worker_arr;
@@ -38,12 +36,11 @@ typedef struct TP_Context
U64 task_count;
TP_TaskFunc *task_func;
void *task_data;
volatile U64 next_task_id;
volatile U64 take_count;
volatile S64 task_left_count;
volatile U64 task_done_count;
} TP_Context;
internal TP_Context * tp_alloc(Arena *arena, U32 worker_count);
internal TP_Context * tp_alloc_shared(Arena *arena, U32 worker_count, String8 name);
internal TP_Context * tp_alloc(Arena *arena, U32 worker_count, String8 name);
internal void tp_release(TP_Context *pool);
internal TP_Arena * tp_arena_alloc(TP_Context *pool);
internal void tp_arena_release(TP_Arena **arena_ptr);