impl shared thread pool mode

This commit is contained in:
Nikita Smith
2025-01-20 21:41:24 -08:00
parent e1e7fb745e
commit d3fbc858b8
5 changed files with 118 additions and 68 deletions
+29 -4
View File
@@ -81,12 +81,25 @@ tp_alloc(Arena *arena, U32 worker_count)
return pool;
}
internal TP_Context *
tp_alloc_shared(Arena *arena, U32 worker_count, String8 name)
{
TP_Context *tp = tp_alloc(arena, worker_count);
tp->shared_mutex_name = name;
tp->shared_mutex_handle = os_shared_mutex_alloc(name);
AssertAlways(!os_handle_match(tp->shared_mutex_handle, os_handle_zero()));
return tp;
}
internal void
tp_release(TP_Context *pool)
{
pool->is_live = 0;
os_semaphore_release(pool->task_semaphore);
os_semaphore_release(pool->main_semaphore);
if (!os_handle_match(pool->shared_mutex_handle, os_handle_zero())) {
os_mutex_release(pool->shared_mutex_handle);
}
for (U64 i = 1; i < pool->worker_count; i += 1) {
os_thread_detach(pool->worker_arr[i].handle);
}
@@ -160,13 +173,20 @@ tp_for_parallel(TP_Context *pool, TP_Arena *arena, U64 task_count, TP_TaskFunc *
{
Assert(!arena || arena->count == pool->worker_count);
// in shared mode take mutex
if (!os_handle_match(pool->shared_mutex_handle, os_handle_zero())) {
if (!os_shared_mutex_take(pool->shared_mutex_handle, max_U64)) {
AssertAlways(!"failed to take shared mutex");
}
}
// setup pool state
pool->worker_arena = arena;
pool->task_count = task_count;
pool->task_func = task_func;
pool->task_data = task_data;
pool->task_count = task_count;
pool->task_func = task_func;
pool->task_data = task_data;
pool->next_task_id = 0;
pool->take_count = 0;
pool->take_count = 0;
// do we have enough work for other workers?
pool->take_count = Min(pool->task_count, pool->worker_count);
@@ -183,6 +203,11 @@ tp_for_parallel(TP_Context *pool, TP_Arena *arena, U64 task_count, TP_TaskFunc *
// wait for workers to finish assigned tasks
os_semaphore_take(pool->main_semaphore, max_U64);
}
// signal other thread pools that we have done our round of tasks
if (!os_handle_match(pool->shared_mutex_handle, os_handle_zero())) {
os_shared_mutex_drop(pool->shared_mutex_handle);
}
}
internal Rng1U64 *
+3
View File
@@ -29,6 +29,8 @@ typedef struct TP_Context
{
OS_Handle task_semaphore;
OS_Handle main_semaphore;
OS_Handle shared_mutex_handle;
String8 shared_mutex_name;
B32 is_live;
U32 worker_count;
TP_Worker *worker_arr;
@@ -41,6 +43,7 @@ typedef struct TP_Context
} TP_Context;
internal TP_Context * tp_alloc(Arena *arena, U32 worker_count);
internal TP_Context * tp_alloc_shared(Arena *arena, U32 worker_count, String8 name);
internal void tp_release(TP_Context *pool);
internal TP_Arena * tp_arena_alloc(TP_Context *pool);
internal void tp_arena_release(TP_Arena **arena_ptr);