From 61307aefbcae08d826f6d542ecf668d716748b2a Mon Sep 17 00:00:00 2001 From: Nikita Smith Date: Tue, 21 Jan 2025 01:55:13 -0800 Subject: [PATCH] reworked thread pool to share workers via semaphore --- src/linker/lnk.c | 9 +-- src/linker/lnk_config.c | 17 ++--- src/linker/lnk_config.h | 6 +- src/linker/thread_pool/thread_pool.c | 104 ++++++++++----------------- src/linker/thread_pool/thread_pool.h | 9 +-- 5 files changed, 53 insertions(+), 92 deletions(-) diff --git a/src/linker/lnk.c b/src/linker/lnk.c index 764acaf1..ac8bb242 100644 --- a/src/linker/lnk.c +++ b/src/linker/lnk.c @@ -3190,13 +3190,8 @@ lnk_run(int argc, char **argv) LNK_Config *config = lnk_build_config(scratch.arena, argc, argv); - TP_Context *tp; - if (config->shared_thread_pool == LNK_SwitchState_Yes) { - tp = tp_alloc_shared(scratch.arena, config->worker_count, config->shared_thread_pool_mutex_name); - } else { - tp = tp_alloc(scratch.arena, config->worker_count); - } - TP_Arena *tp_arena = tp_arena_alloc(tp); + TP_Context *tp = tp_alloc(scratch.arena, config->worker_count, config->shared_thread_pool_name); + TP_Arena *tp_arena = tp_arena_alloc(tp); #if PROFILE_TELEMETRY { diff --git a/src/linker/lnk_config.c b/src/linker/lnk_config.c index a3151585..6b0a9ed6 100644 --- a/src/linker/lnk_config.c +++ b/src/linker/lnk_config.c @@ -150,8 +150,7 @@ global read_only struct { LNK_CmdSwitch_Rad_PdbHashTypeNameMap, "RAD_PDB_HASH_TYPE_NAME_MAP", ":FILENAME", "Produce map file with hash -> type name mappings." }, { LNK_CmdSwitch_Rad_PdbHashTypeNames, "RAD_PDB_HASH_TYPE_NAMES", ":{NONE|LENIENT|FULL}", "Replace type names in LF_STRUCTURE and LF_CLASS with hashes." }, { LNK_CmdSwitch_Rad_SectVirtOff, "RAD_SECT_VIRT_OFF", ":#", "Set RVA where section data is placed in memory. For internal use only." }, - { LNK_CmdSwitch_Rad_SharedThreadPool, "RAD_SHARED_THREAD_POOL", "[:NO]", "" }, - { LNK_CmdSwitch_Rad_SharedThreadPoolMutexName, "RAD_SHARED_THREAD_POOL_MUTEX_NAME", ":STRING", "" }, + { LNK_CmdSwitch_Rad_SharedThreadPool, "RAD_SHARED_THREAD_POOL", "[:STRING]", "Default value \"" LNK_DEFAULT_THREAD_POOL_NAME "\"" }, { LNK_CmdSwitch_Rad_SuppressError, "RAD_SUPPRESS_ERROR", ":#", "" }, { LNK_CmdSwitch_Rad_SymbolTableCapDefined, "RAD_SYMBOL_TABLE_CAP_DEFINED", ":#", "Number of buckets allocated in the symbol table for defined symbols." }, { LNK_CmdSwitch_Rad_SymbolTableCapInternal, "RAD_SYMBOL_TABLE_CAP_INTERNAL", ":#", "Number of buckets allocated in the symbol table for internal symbols." }, @@ -1677,11 +1676,14 @@ lnk_apply_cmd_option_to_config(Arena *arena, LNK_Config *config, String8 cmd_nam } break; case LNK_CmdSwitch_Rad_SharedThreadPool: { - lnk_cmd_switch_parse_flag(obj_path, lib_path, cmd_switch, value_strings, &config->shared_thread_pool); - } break; - - case LNK_CmdSwitch_Rad_SharedThreadPoolMutexName: { - lnk_cmd_switch_parse_string(obj_path, lib_path, cmd_switch, value_strings, &config->shared_thread_pool_mutex_name); + if (value_strings.node_count == 0) { + config->shared_thread_pool_name = str8_lit(LNK_DEFAULT_THREAD_POOL_NAME); + } else { + lnk_cmd_switch_parse_string(obj_path, lib_path, cmd_switch, value_strings, &config->shared_thread_pool_name); + if (config->shared_thread_pool_name.size == 0) { + lnk_error_cmd_switch(LNK_Error_Cmdl, obj_path, lib_path, cmd_switch, "invalid empty string for thread pool name"); + } + } } break; case LNK_CmdSwitch_Rad_SuppressError: { @@ -1794,7 +1796,6 @@ lnk_config_from_cmd_line(Arena *arena, String8List raw_cmd_line) lnk_cmd_line_push_option_if_not_presentf(scratch.arena, &cmd_line, LNK_CmdSwitch_Rad_SymbolTableCapWeak, "0x3ffff"); lnk_cmd_line_push_option_if_not_presentf(scratch.arena, &cmd_line, LNK_CmdSwitch_Rad_SymbolTableCapLib, "0x3ffff"); lnk_cmd_line_push_option_if_not_presentf(scratch.arena, &cmd_line, LNK_CmdSwitch_Rad_DebugAltPath, "%%_RAD_RDI_PATH%%"); - lnk_cmd_line_push_option_if_not_presentf(scratch.arena, &cmd_line, LNK_CmdSwitch_Rad_SharedThreadPoolMutexName, "RADLINK_THREAD_POOL_MUTEX"); #if BUILD_DEBUG lnk_cmd_line_push_optionf(scratch.arena, &cmd_line, LNK_CmdSwitch_Rad_Log, "debug"); lnk_cmd_line_push_optionf(scratch.arena, &cmd_line, LNK_CmdSwitch_Rad_Log, "io_write"); diff --git a/src/linker/lnk_config.h b/src/linker/lnk_config.h index 6db6f692..130d879b 100644 --- a/src/linker/lnk_config.h +++ b/src/linker/lnk_config.h @@ -148,7 +148,6 @@ typedef enum LNK_CmdSwitch_Rad_PdbHashTypeNameLength, LNK_CmdSwitch_Rad_SectVirtOff, LNK_CmdSwitch_Rad_SharedThreadPool, - LNK_CmdSwitch_Rad_SharedThreadPoolMutexName, LNK_CmdSwitch_Rad_SuppressError, LNK_CmdSwitch_Rad_SymbolTableCapDefined, LNK_CmdSwitch_Rad_SymbolTableCapInternal, @@ -286,6 +285,8 @@ typedef enum # error #endif +#define LNK_DEFAULT_THREAD_POOL_NAME "RADLINK_THREAD_POOL" + typedef struct LNK_Config { LNK_ConfigFlags flags; @@ -312,8 +313,7 @@ typedef struct LNK_Config U64 pdb_page_size; U64 worker_count; U64 idle_worker_count; - LNK_SwitchState shared_thread_pool; - String8 shared_thread_pool_mutex_name; + String8 shared_thread_pool_name; U64 *function_pad_min; U64 *manifest_resource_id; B32 no_default_libs; diff --git a/src/linker/thread_pool/thread_pool.c b/src/linker/thread_pool/thread_pool.c index 29d45ee1..6399cd78 100644 --- a/src/linker/thread_pool/thread_pool.c +++ b/src/linker/thread_pool/thread_pool.c @@ -6,42 +6,37 @@ tp_execute_tasks(TP_Worker *worker) { TP_Context *pool = worker->pool; - Arena *arena = NULL; - if (pool->worker_arena) { - arena = pool->worker_arena->v[worker->id]; - } - for (;;) { - // get task id - U64 next_task_id = ins_atomic_u64_inc_eval(&pool->next_task_id); - if (next_task_id > pool->task_count) { + // do we have tasks? + S64 task_left_count = ins_atomic_u64_dec_eval(&pool->task_left_count); + if (task_left_count < 0) { break; } - // invoke task func - U64 task_id = next_task_id - 1; + // invoke task + Arena *arena = pool->worker_arena ? pool->worker_arena->v[worker->id] : 0; + U64 task_id = pool->task_count - (task_left_count+1); pool->task_func(arena, worker->id, task_id, pool->task_data); + + // before last worker takes semaphore wake up main thread + U64 task_done_count = ins_atomic_u64_inc_eval(&pool->task_done_count); + if (task_done_count == pool->task_count) { + os_semaphore_drop(pool->main_semaphore); + } } } internal void tp_worker_main(void *raw_worker) { - TCTX tctx_; - tctx_init_and_equip(&tctx_); + TCTX tctx_; tctx_init_and_equip(&tctx_); - TP_Worker *worker = (TP_Worker *)raw_worker; - TP_Context *pool = worker->pool; + TP_Worker *worker = raw_worker; + TP_Context *pool = worker->pool; while (pool->is_live) { if (os_semaphore_take(pool->task_semaphore, max_U64)) { tp_execute_tasks(worker); - - // before last worker takes semaphore wake up main worker - U64 take_count = ins_atomic_u64_dec_eval(&pool->take_count); - if (take_count == 1) { - os_semaphore_drop(pool->main_semaphore); - } } else { Assert(!"time out"); } @@ -49,7 +44,7 @@ tp_worker_main(void *raw_worker) } internal TP_Context * -tp_alloc(Arena *arena, U32 worker_count) +tp_alloc(Arena *arena, U32 worker_count, String8 name) { ProfBeginDynamic("Alloc Thread Pool [Worker Count: %u]", worker_count); Assert(worker_count > 0); @@ -57,7 +52,7 @@ tp_alloc(Arena *arena, U32 worker_count) // init pool TP_Context *pool = push_array(arena, TP_Context, 1); if (worker_count > 1) { - pool->task_semaphore = os_semaphore_alloc(0, worker_count - 1, str8(0,0)); + pool->task_semaphore = os_semaphore_alloc(0, worker_count - 1, name); pool->main_semaphore = os_semaphore_alloc(0, 1, str8(0,0)); } pool->is_live = 1; @@ -81,25 +76,12 @@ tp_alloc(Arena *arena, U32 worker_count) return pool; } -internal TP_Context * -tp_alloc_shared(Arena *arena, U32 worker_count, String8 name) -{ - TP_Context *tp = tp_alloc(arena, worker_count); - tp->shared_mutex_name = name; - tp->shared_mutex_handle = os_shared_mutex_alloc(name); - AssertAlways(!os_handle_match(tp->shared_mutex_handle, os_handle_zero())); - return tp; -} - internal void tp_release(TP_Context *pool) { pool->is_live = 0; os_semaphore_release(pool->task_semaphore); os_semaphore_release(pool->main_semaphore); - if (!os_handle_match(pool->shared_mutex_handle, os_handle_zero())) { - os_mutex_release(pool->shared_mutex_handle); - } for (U64 i = 1; i < pool->worker_count; i += 1) { os_thread_detach(pool->worker_arr[i].handle); } @@ -171,43 +153,29 @@ tp_temp_end(TP_Temp temp) internal void tp_for_parallel(TP_Context *pool, TP_Arena *arena, U64 task_count, TP_TaskFunc *task_func, void *task_data) { - Assert(!arena || arena->count == pool->worker_count); + if (task_count > 0) { + Assert(!arena || arena->count == pool->worker_count); - // in shared mode take mutex - if (!os_handle_match(pool->shared_mutex_handle, os_handle_zero())) { - if (!os_shared_mutex_take(pool->shared_mutex_handle, max_U64)) { - AssertAlways(!"failed to take shared mutex"); + // init context + pool->worker_arena = arena; + pool->task_count = task_count; + pool->task_func = task_func; + pool->task_data = task_data; + pool->task_done_count = 0; + pool->task_left_count = task_count; + + // wake up workers + for (U64 worker_idx = 1; worker_idx < pool->worker_count; worker_idx += 1) { + os_semaphore_drop(pool->task_semaphore); } - } - - // setup pool state - pool->worker_arena = arena; - pool->task_count = task_count; - pool->task_func = task_func; - pool->task_data = task_data; - pool->next_task_id = 0; - pool->take_count = 0; - - // do we have enough work for other workers? - pool->take_count = Min(pool->task_count, pool->worker_count); - U64 drop_count = pool->take_count; - for (U64 worker_idx = 1; worker_idx < drop_count; worker_idx += 1) { - os_semaphore_drop(pool->task_semaphore); - } - - // execute tasks on main worker too - TP_Worker *main_worker = &pool->worker_arr[0]; - tp_execute_tasks(main_worker); - - if (drop_count > 1) { - // wait for workers to finish assigned tasks + + // execute tasks on main worker + TP_Worker *main_worker = &pool->worker_arr[0]; + tp_execute_tasks(main_worker); + + // wait for workers to finish tasks os_semaphore_take(pool->main_semaphore, max_U64); } - - // signal other thread pools that we have done our round of tasks - if (!os_handle_match(pool->shared_mutex_handle, os_handle_zero())) { - os_shared_mutex_drop(pool->shared_mutex_handle); - } } internal Rng1U64 * diff --git a/src/linker/thread_pool/thread_pool.h b/src/linker/thread_pool/thread_pool.h index dfc0bdcd..0c31913b 100644 --- a/src/linker/thread_pool/thread_pool.h +++ b/src/linker/thread_pool/thread_pool.h @@ -29,8 +29,6 @@ typedef struct TP_Context { OS_Handle task_semaphore; OS_Handle main_semaphore; - OS_Handle shared_mutex_handle; - String8 shared_mutex_name; B32 is_live; U32 worker_count; TP_Worker *worker_arr; @@ -38,12 +36,11 @@ typedef struct TP_Context U64 task_count; TP_TaskFunc *task_func; void *task_data; - volatile U64 next_task_id; - volatile U64 take_count; + volatile S64 task_left_count; + volatile U64 task_done_count; } TP_Context; -internal TP_Context * tp_alloc(Arena *arena, U32 worker_count); -internal TP_Context * tp_alloc_shared(Arena *arena, U32 worker_count, String8 name); +internal TP_Context * tp_alloc(Arena *arena, U32 worker_count, String8 name); internal void tp_release(TP_Context *pool); internal TP_Arena * tp_arena_alloc(TP_Context *pool); internal void tp_arena_release(TP_Arena **arena_ptr);