initial job queue load test during exit, works with hot-reload.

This commit is contained in:
2025-10-15 01:59:19 -04:00
parent 9f75d080a7
commit a0f51913dc
5 changed files with 102 additions and 36 deletions

View File

@@ -125,6 +125,9 @@ main :: proc()
{
host_memory.job_system.running = true
host_memory.job_system.worker_num = THREAD_JOB_WORKERS
for & list in host_memory.job_system.job_lists {
list = {}
}
// Determine number of physical cores
barrier_init(& host_memory.job_hot_reload_sync, THREAD_JOB_WORKERS + 1)
for id in THREAD_JOB_WORKER_ID_START ..< THREAD_JOB_WORKER_ID_END {
@@ -138,7 +141,6 @@ main :: proc()
host_tick_lane()
}
sync_store(& host_memory.job_system.running, false, .Release)
if thread_memory.id == .Master_Prepper {
thread_join_multiple(.. host_memory.threads[1:THREAD_TICK_LANES + THREAD_JOB_WORKERS])
}
@@ -184,19 +186,26 @@ host_tick_lane :: proc()
sync_client_api()
running: b64 = host_memory.client_api.tick_lane( duration_seconds(delta_ns), delta_ns ) == false
if thread_memory.id == .Master_Prepper { sync_store(& host_memory.tick_running, running, .Release) }
if thread_memory.id == .Master_Prepper {
sync_store(& host_memory.tick_running, running, .Release)
}
// host_memory.client_api.clean_frame()
delta_ns = time_tick_lap_time( & host_tick )
host_tick = time_tick_now()
leader := barrier_wait(& host_memory.lane_sync)
}
leader := barrier_wait(& host_memory.lane_sync)
host_lane_shutdown()
}
host_lane_shutdown :: proc()
{
profile(#procedure)
spall_buffer_destroy( & host_memory.spall_context, & thread_memory.spall_buffer )
if thread_memory.id == .Master_Prepper {
sync_store(& host_memory.job_system.running, false, .Release)
}
leader := barrier_wait(& host_memory.lane_job_sync)
// spall_buffer_destroy( & host_memory.spall_context, & thread_memory.spall_buffer )
}
host_job_worker_entrypoint :: proc(worker_thread: ^SysThread)
@@ -223,6 +232,7 @@ host_job_worker_entrypoint :: proc(worker_thread: ^SysThread)
host_memory.client_api.hot_reload(& host_memory, & thread_memory)
}
}
leader := barrier_wait(& host_memory.lane_job_sync)
}
@export

View File

@@ -34,8 +34,13 @@ then prepare for multi-threaded "laned" tick: thread_wide_startup.
@export
startup :: proc(host_mem: ^ProcessMemory, thread_mem: ^ThreadMemory)
{
memory = host_mem
thread = thread_mem
// Rad Debugger driving me crazy..
for ; memory == nil; {
memory = host_mem
}
for ; thread == nil; {
thread = thread_mem
}
grime_set_profiler_module_context(& memory.spall_context)
grime_set_profiler_thread_buffer(& thread.spall_buffer)
profile(#procedure)
@@ -120,23 +125,36 @@ tick_lane :: proc(host_delta_time_ms: f64, host_delta_ns: Duration) -> (should_c
@thread_local dummy: int = 0
dummy += 1
EXIT_TIME :: 1
// profile_begin("sokol_app: pre_client_tick")
// should_close |= cast(b64) sokol_app.pre_client_frame()
@static timer: f64
if thread.id == .Master_Prepper {
timer += host_delta_time_ms
sync_store(& should_close, timer > 5, .Release)
timer += host_delta_time_ms
sync_store(& should_close, timer > EXIT_TIME, .Release)
// Test dispatching 64 jobs during hot_reload loop (when the above store is uncommented)
for job_id := 1; job_id < 64; job_id += 1 {
memory.job_info_reload[job_id].id = job_id
memory.job_reload[job_id] = make_job_raw(& memory.job_group_reload, & memory.job_info_reload[job_id], test_job, {}, "Job Test (Hot-Reload)")
job_dispatch_single(& memory.job_reload[job_id], .Normal)
}
}
// profile_end()
// profile_begin("Client Tick")
profile_begin("Client Tick")
// @thread_local test_job: TestJobInfo
// for job_id := 1; job_id < 64; job_id += 1 {
// job_dispatch(test_job, & test_job, .Medium, "Job Test")
// }
if thread.id == .Master_Prepper && timer > EXIT_TIME {
// Test dispatching 64 jobs during the last iteration before exiting.
for job_id := 1; job_id < 64; job_id += 1 {
memory.job_info_exit[job_id].id = job_id
memory.job_exit[job_id] = make_job_raw(& memory.job_group_exit, & memory.job_info_exit[job_id], test_job, {}, "Job Test (Exit)")
job_dispatch_single(& memory.job_exit[job_id], .Normal)
}
}
// profile_end()
profile_end()
// profile_begin("sokol_app: post_client_tick")
// profile_end()
@@ -146,11 +164,35 @@ tick_lane :: proc(host_delta_time_ms: f64, host_delta_ns: Duration) -> (should_c
}
@export
jobsys_worker_tick :: proc() {
jobsys_worker_tick :: proc()
{
profile("Worker Tick")
@thread_local dummy: int = 0;
dummy += 1
ORDERED_PRIORITIES :: [len(JobPriority)]JobPriority{.High, .Normal, .Low}
block: for priority in ORDERED_PRIORITIES
{
if memory.job_system.job_lists[priority].head == nil do continue
if sync_mutex_try_lock(& memory.job_system.job_lists[priority].mutex)
{
if job := memory.job_system.job_lists[priority].head; job != nil
{
if int(thread.id) in job.ignored {
sync_mutex_unlock(& memory.job_system.job_lists[priority].mutex)
continue
}
memory.job_system.job_lists[priority].head = job.next
sync_mutex_unlock(& memory.job_system.job_lists[priority].mutex)
assert(job.group != nil)
assert(job.cb != nil)
job.cb(job.data)
sync_sub(& job.group.counter, 1, .Seq_Cst)
break block
}
sync_mutex_unlock(& memory.job_system.job_lists[priority].mutex)
}
}
}
TestJobInfo :: struct {

View File

@@ -42,6 +42,14 @@ ProcessMemory :: struct {
client_api_hot_reloaded: b64, // Used to signal to threads when hot-reload paths should be taken.
client_api: ModuleAPI, // Host -> Client Interface
client_memory: State,
// Testing
job_group_reload: JobGroup,
job_info_reload: [64]TestJobInfo,
job_reload: [64]Job,
job_group_exit: JobGroup,
job_info_exit: [64]TestJobInfo,
job_exit: [64]Job,
}
Host_API :: struct {

View File

@@ -8,20 +8,20 @@ JobGroup :: struct {
counter: u64,
}
JobPriority :: enum {
JobPriority :: enum (u32) {
Normal = 0,
Low,
High,
}
Job :: struct {
next: ^Job,
cb: JobProc,
data: rawptr,
// scratch: ^CArena,
group: ^JobGroup,
ignored: JobIgnoredThreads,
dbg_lbl: string,
next: ^Job,
cb: JobProc,
data: rawptr,
// scratch: ^CArena,
group: ^JobGroup,
ignored: JobIgnoredThreads,
dbg_label: string,
}
JobList :: struct {
@@ -188,18 +188,20 @@ WorkerID :: enum int {
@(private) div_ceil :: #force_inline proc(a, b: int) -> int { return (a + b - 1) / b }
make_job_raw :: proc(group: ^JobGroup, data: rawptr, cb: JobProc, ignored_threads: JobIgnoredThreads = {}) -> Job {
make_job_raw :: proc(group: ^JobGroup, data: rawptr, cb: JobProc, ignored_threads: JobIgnoredThreads = {}, dbg_label: string = "") -> Job {
assert(group != nil)
assert(cb != nil)
return {cb = cb, data = data, group = group}
return {cb = cb, data = data, group = group, ignored = {}, dbg_label = dbg_label}
}
job_dispatch :: proc(job: Job, priorty: JobPriority = .Normal) {
job_dispatch_single :: proc(job: ^Job, priority: JobPriority = .Normal) {
assert(job.group != nil)
// sync_add(& job.group.atomic_counter, 1)
// if
sync_add(& job.group.counter, 1, .Seq_Cst)
// sync_mutex_lock
sync_mutex_lock(& memory.job_system.job_lists[priority].mutex)
job.next = memory.job_system.job_lists[priority].head
memory.job_system.job_lists[priority].head = job
sync_mutex_unlock(& memory.job_system.job_lists[priority].mutex)
}
// Note: it's on you to clean up the memory after the jobs if you use a custom allocator.
@@ -240,4 +242,3 @@ job_dispatch :: proc(job: Job, priorty: JobPriority = .Normal) {
// group_is_finished :: #force_inline proc(group: ^Group) -> bool {
// return intrinsics.atomic_load(&group.atomic_counter) <= 0
// }

View File

@@ -32,10 +32,15 @@ import "core:prof/spall"
Spall_Buffer :: spall.Buffer
import "core:sync"
AtomicMutex :: sync.Atomic_Mutex
barrier_wait :: sync.barrier_wait
sync_store :: sync.atomic_store_explicit
sync_load :: sync.atomic_load_explicit
AtomicMutex :: sync.Atomic_Mutex
barrier_wait :: sync.barrier_wait
sync_store :: sync.atomic_store_explicit
sync_load :: sync.atomic_load_explicit
sync_add :: sync.atomic_add_explicit
sync_sub :: sync.atomic_sub_explicit
sync_mutex_lock :: sync.atomic_mutex_lock
sync_mutex_unlock :: sync.atomic_mutex_unlock
sync_mutex_try_lock :: sync.atomic_mutex_try_lock
import threading "core:thread"
SysThread :: threading.Thread