From a0f51913dce3b3f8a9770591d70ac949c00a4021 Mon Sep 17 00:00:00 2001 From: Ed_ Date: Wed, 15 Oct 2025 01:59:19 -0400 Subject: [PATCH] initial job queue load test during exit, works with hot-reload. --- code2/host/host.odin | 18 ++++++-- code2/sectr/engine/client_api.odin | 68 ++++++++++++++++++++++++------ code2/sectr/engine/host_api.odin | 8 ++++ code2/sectr/engine/job_system.odin | 31 +++++++------- code2/sectr/pkg_mappings.odin | 13 ++++-- 5 files changed, 102 insertions(+), 36 deletions(-) diff --git a/code2/host/host.odin b/code2/host/host.odin index 6d18a37..b359175 100644 --- a/code2/host/host.odin +++ b/code2/host/host.odin @@ -125,6 +125,9 @@ main :: proc() { host_memory.job_system.running = true host_memory.job_system.worker_num = THREAD_JOB_WORKERS + for & list in host_memory.job_system.job_lists { + list = {} + } // Determine number of physical cores barrier_init(& host_memory.job_hot_reload_sync, THREAD_JOB_WORKERS + 1) for id in THREAD_JOB_WORKER_ID_START ..< THREAD_JOB_WORKER_ID_END { @@ -138,7 +141,6 @@ main :: proc() host_tick_lane() } - sync_store(& host_memory.job_system.running, false, .Release) if thread_memory.id == .Master_Prepper { thread_join_multiple(.. host_memory.threads[1:THREAD_TICK_LANES + THREAD_JOB_WORKERS]) } @@ -184,19 +186,26 @@ host_tick_lane :: proc() sync_client_api() running: b64 = host_memory.client_api.tick_lane( duration_seconds(delta_ns), delta_ns ) == false - if thread_memory.id == .Master_Prepper { sync_store(& host_memory.tick_running, running, .Release) } + if thread_memory.id == .Master_Prepper { + sync_store(& host_memory.tick_running, running, .Release) + } // host_memory.client_api.clean_frame() delta_ns = time_tick_lap_time( & host_tick ) host_tick = time_tick_now() + + leader := barrier_wait(& host_memory.lane_sync) } - leader := barrier_wait(& host_memory.lane_sync) host_lane_shutdown() } host_lane_shutdown :: proc() { profile(#procedure) - spall_buffer_destroy( & host_memory.spall_context, & thread_memory.spall_buffer ) + if thread_memory.id == .Master_Prepper { + sync_store(& host_memory.job_system.running, false, .Release) + } + leader := barrier_wait(& host_memory.lane_job_sync) + // spall_buffer_destroy( & host_memory.spall_context, & thread_memory.spall_buffer ) } host_job_worker_entrypoint :: proc(worker_thread: ^SysThread) @@ -223,6 +232,7 @@ host_job_worker_entrypoint :: proc(worker_thread: ^SysThread) host_memory.client_api.hot_reload(& host_memory, & thread_memory) } } + leader := barrier_wait(& host_memory.lane_job_sync) } @export diff --git a/code2/sectr/engine/client_api.odin b/code2/sectr/engine/client_api.odin index bc13361..ae8a475 100644 --- a/code2/sectr/engine/client_api.odin +++ b/code2/sectr/engine/client_api.odin @@ -34,8 +34,13 @@ then prepare for multi-threaded "laned" tick: thread_wide_startup. @export startup :: proc(host_mem: ^ProcessMemory, thread_mem: ^ThreadMemory) { - memory = host_mem - thread = thread_mem + // Rad Debugger driving me crazy.. + for ; memory == nil; { + memory = host_mem + } + for ; thread == nil; { + thread = thread_mem + } grime_set_profiler_module_context(& memory.spall_context) grime_set_profiler_thread_buffer(& thread.spall_buffer) profile(#procedure) @@ -120,23 +125,36 @@ tick_lane :: proc(host_delta_time_ms: f64, host_delta_ns: Duration) -> (should_c @thread_local dummy: int = 0 dummy += 1 + EXIT_TIME :: 1 + // profile_begin("sokol_app: pre_client_tick") // should_close |= cast(b64) sokol_app.pre_client_frame() @static timer: f64 if thread.id == .Master_Prepper { - timer += host_delta_time_ms - sync_store(& should_close, timer > 5, .Release) + timer += host_delta_time_ms + sync_store(& should_close, timer > EXIT_TIME, .Release) + + // Test dispatching 64 jobs during hot_reload loop (when the above store is uncommented) + for job_id := 1; job_id < 64; job_id += 1 { + memory.job_info_reload[job_id].id = job_id + memory.job_reload[job_id] = make_job_raw(& memory.job_group_reload, & memory.job_info_reload[job_id], test_job, {}, "Job Test (Hot-Reload)") + job_dispatch_single(& memory.job_reload[job_id], .Normal) + } } // profile_end() - // profile_begin("Client Tick") + profile_begin("Client Tick") - // @thread_local test_job: TestJobInfo - // for job_id := 1; job_id < 64; job_id += 1 { - // job_dispatch(test_job, & test_job, .Medium, "Job Test") - // } + if thread.id == .Master_Prepper && timer > EXIT_TIME { + // Test dispatching 64 jobs during the last iteration before exiting. + for job_id := 1; job_id < 64; job_id += 1 { + memory.job_info_exit[job_id].id = job_id + memory.job_exit[job_id] = make_job_raw(& memory.job_group_exit, & memory.job_info_exit[job_id], test_job, {}, "Job Test (Exit)") + job_dispatch_single(& memory.job_exit[job_id], .Normal) + } + } - // profile_end() + profile_end() // profile_begin("sokol_app: post_client_tick") // profile_end() @@ -146,11 +164,35 @@ tick_lane :: proc(host_delta_time_ms: f64, host_delta_ns: Duration) -> (should_c } @export -jobsys_worker_tick :: proc() { +jobsys_worker_tick :: proc() +{ profile("Worker Tick") - @thread_local dummy: int = 0; - dummy += 1 + ORDERED_PRIORITIES :: [len(JobPriority)]JobPriority{.High, .Normal, .Low} + block: for priority in ORDERED_PRIORITIES + { + if memory.job_system.job_lists[priority].head == nil do continue + if sync_mutex_try_lock(& memory.job_system.job_lists[priority].mutex) + { + if job := memory.job_system.job_lists[priority].head; job != nil + { + if int(thread.id) in job.ignored { + sync_mutex_unlock(& memory.job_system.job_lists[priority].mutex) + continue + } + memory.job_system.job_lists[priority].head = job.next + sync_mutex_unlock(& memory.job_system.job_lists[priority].mutex) + + assert(job.group != nil) + assert(job.cb != nil) + job.cb(job.data) + + sync_sub(& job.group.counter, 1, .Seq_Cst) + break block + } + sync_mutex_unlock(& memory.job_system.job_lists[priority].mutex) + } + } } TestJobInfo :: struct { diff --git a/code2/sectr/engine/host_api.odin b/code2/sectr/engine/host_api.odin index 738b4fd..4273f9e 100644 --- a/code2/sectr/engine/host_api.odin +++ b/code2/sectr/engine/host_api.odin @@ -42,6 +42,14 @@ ProcessMemory :: struct { client_api_hot_reloaded: b64, // Used to signal to threads when hot-reload paths should be taken. client_api: ModuleAPI, // Host -> Client Interface client_memory: State, + + // Testing + job_group_reload: JobGroup, + job_info_reload: [64]TestJobInfo, + job_reload: [64]Job, + job_group_exit: JobGroup, + job_info_exit: [64]TestJobInfo, + job_exit: [64]Job, } Host_API :: struct { diff --git a/code2/sectr/engine/job_system.odin b/code2/sectr/engine/job_system.odin index e16602f..11fe44a 100644 --- a/code2/sectr/engine/job_system.odin +++ b/code2/sectr/engine/job_system.odin @@ -8,20 +8,20 @@ JobGroup :: struct { counter: u64, } -JobPriority :: enum { +JobPriority :: enum (u32) { Normal = 0, Low, High, } Job :: struct { - next: ^Job, - cb: JobProc, - data: rawptr, - // scratch: ^CArena, - group: ^JobGroup, - ignored: JobIgnoredThreads, - dbg_lbl: string, + next: ^Job, + cb: JobProc, + data: rawptr, + // scratch: ^CArena, + group: ^JobGroup, + ignored: JobIgnoredThreads, + dbg_label: string, } JobList :: struct { @@ -188,18 +188,20 @@ WorkerID :: enum int { @(private) div_ceil :: #force_inline proc(a, b: int) -> int { return (a + b - 1) / b } -make_job_raw :: proc(group: ^JobGroup, data: rawptr, cb: JobProc, ignored_threads: JobIgnoredThreads = {}) -> Job { +make_job_raw :: proc(group: ^JobGroup, data: rawptr, cb: JobProc, ignored_threads: JobIgnoredThreads = {}, dbg_label: string = "") -> Job { assert(group != nil) assert(cb != nil) - return {cb = cb, data = data, group = group} + return {cb = cb, data = data, group = group, ignored = {}, dbg_label = dbg_label} } -job_dispatch :: proc(job: Job, priorty: JobPriority = .Normal) { +job_dispatch_single :: proc(job: ^Job, priority: JobPriority = .Normal) { assert(job.group != nil) - // sync_add(& job.group.atomic_counter, 1) - // if + sync_add(& job.group.counter, 1, .Seq_Cst) - // sync_mutex_lock + sync_mutex_lock(& memory.job_system.job_lists[priority].mutex) + job.next = memory.job_system.job_lists[priority].head + memory.job_system.job_lists[priority].head = job + sync_mutex_unlock(& memory.job_system.job_lists[priority].mutex) } // Note: it's on you to clean up the memory after the jobs if you use a custom allocator. @@ -240,4 +242,3 @@ job_dispatch :: proc(job: Job, priorty: JobPriority = .Normal) { // group_is_finished :: #force_inline proc(group: ^Group) -> bool { // return intrinsics.atomic_load(&group.atomic_counter) <= 0 // } - diff --git a/code2/sectr/pkg_mappings.odin b/code2/sectr/pkg_mappings.odin index b4f78db..430949f 100644 --- a/code2/sectr/pkg_mappings.odin +++ b/code2/sectr/pkg_mappings.odin @@ -32,10 +32,15 @@ import "core:prof/spall" Spall_Buffer :: spall.Buffer import "core:sync" - AtomicMutex :: sync.Atomic_Mutex - barrier_wait :: sync.barrier_wait - sync_store :: sync.atomic_store_explicit - sync_load :: sync.atomic_load_explicit + AtomicMutex :: sync.Atomic_Mutex + barrier_wait :: sync.barrier_wait + sync_store :: sync.atomic_store_explicit + sync_load :: sync.atomic_load_explicit + sync_add :: sync.atomic_add_explicit + sync_sub :: sync.atomic_sub_explicit + sync_mutex_lock :: sync.atomic_mutex_lock + sync_mutex_unlock :: sync.atomic_mutex_unlock + sync_mutex_try_lock :: sync.atomic_mutex_try_lock import threading "core:thread" SysThread :: threading.Thread