WIP: tick lanes were working, currently bootstrapping the job system.

This commit is contained in:
2025-10-14 00:04:30 -04:00
parent 0d904fba7c
commit c106d3bc96
12 changed files with 408 additions and 162 deletions

View File

@@ -21,13 +21,14 @@ load_client_api :: proc(version_id: int) -> (loaded_module: Client_API) {
// Get the live dll loaded up
live_file := Path_Sectr_Live_Module
file_copy_sync( Path_Sectr_Module, live_file, allocator = context.temp_allocator )
did_load: bool; lib, did_load = os_lib_load( live_file )
did_load: bool; lib, did_load = os_lib_load( Path_Sectr_Module )
if ! did_load do panic( "Failed to load the sectr module.")
startup = cast( type_of( host_memory.client_api.startup)) os_lib_get_proc(lib, "startup")
tick_lane_startup = cast( type_of( host_memory.client_api.tick_lane_startup)) os_lib_get_proc(lib, "tick_lane_startup")
hot_reload = cast( type_of( host_memory.client_api.hot_reload)) os_lib_get_proc(lib, "hot_reload")
tick_lane = cast( type_of( host_memory.client_api.tick_lane)) os_lib_get_proc(lib, "tick_lane")
clean_frame = cast( type_of( host_memory.client_api.clean_frame)) os_lib_get_proc(lib, "clean_frame")
startup = cast( type_of( host_memory.client_api.startup)) os_lib_get_proc(lib, "startup")
tick_lane_startup = cast( type_of( host_memory.client_api.tick_lane_startup)) os_lib_get_proc(lib, "tick_lane_startup")
hot_reload = cast( type_of( host_memory.client_api.hot_reload)) os_lib_get_proc(lib, "hot_reload")
tick_lane = cast( type_of( host_memory.client_api.tick_lane)) os_lib_get_proc(lib, "tick_lane")
clean_frame = cast( type_of( host_memory.client_api.clean_frame)) os_lib_get_proc(lib, "clean_frame")
jobsys_worker_tick = cast( type_of( host_memory.client_api.jobsys_worker_tick)) os_lib_get_proc(lib, "jobsys_worker_tick")
if startup == nil do panic("Failed to load sectr.startup symbol" )
if tick_lane_startup == nil do panic("Failed to load sectr.tick_lane_startup symbol" )
if hot_reload == nil do panic("Failed to load sectr.hot_reload symbol" )
@@ -45,19 +46,13 @@ main :: proc()
arena_init(& host_memory.host_scratch, host_memory.host_scratch_buf[:])
context.allocator = arena_allocator(& host_memory.host_persist)
context.temp_allocator = arena_allocator(& host_memory.host_scratch)
when SHOULD_SETUP_PROFILERS
{
// Setup profilers
buffer_backing := make([]u8, SPALL_BUFFER_DEFAULT_SIZE * 4)
host_memory.spall_profiler.ctx = spall_context_create(Path_Sectr_Spall_Record)
host_memory.spall_profiler.buffer = spall_buffer_create(buffer_backing)
}
// Setu the "Master Prepper" thread
thread_memory.id = .Master_Prepper
thread_id := thread_current_id()
// Setup the "Master Prepper" thread
{
thread_memory.id = .Master_Prepper
thread_id := thread_current_id()
using thread_memory
system_ctx = & host_memory.threads[WorkerID.Master_Prepper]
host_memory.threads[WorkerID.Master_Prepper] = new(SysThread)
system_ctx = host_memory.threads[WorkerID.Master_Prepper]
system_ctx.creation_allocator = {}
system_ctx.procedure = master_prepper_proc
when ODIN_OS == .Windows {
@@ -65,15 +60,21 @@ main :: proc()
// system_ctx.win32_thread_id = w32_get_current_thread_id()
system_ctx.id = cast(int) system_ctx.win32_thread_id
}
free_all(context.temp_allocator)
}
when SHOULD_SETUP_PROFILERS
{
// Setup main profiler
host_memory.spall_context = spall_context_create(Path_Sectr_Spall_Record)
grime_set_profiler_module_context(& host_memory.spall_context)
thread_memory.spall_buffer = spall_buffer_create(thread_memory.spall_buffer_backing[:], cast(u32) thread_memory.system_ctx.id)
}
// Setup the logger
path_logger_finalized: string
{
fmt_backing := make([]byte, 32 * Kilo)
defer free_all(context.temp_allocator)
profile("Setup the logger")
fmt_backing := make([]byte, 32 * Kilo, allocator = context.temp_allocator);
// Generating the logger's name, it will be used when the app is shutting down.
path_logger_finalized : string
{
startup_time := time_now()
year, month, day := time_date( startup_time)
@@ -82,18 +83,19 @@ main :: proc()
if ! os_is_directory( Path_Logs ) {
os_make_directory( Path_Logs )
}
timestamp := str_pfmt_buffer( fmt_backing, "%04d-%02d-%02d_%02d-%02d-%02d", year, month, day, hour, min, sec)
path_logger_finalized = str_pfmt_buffer( fmt_backing, "%s/sectr_%v.log", Path_Logs, timestamp)
timestamp := str_pfmt_buffer( fmt_backing, "%04d-%02d-%02d_%02d-%02d-%02d", year, month, day, hour, min, sec)
host_memory.path_logger_finalized = str_pfmt_buffer( fmt_backing, "%s/sectr_%v.log", Path_Logs, timestamp)
}
logger_init( & host_memory.logger, "Sectr Host", str_pfmt_buffer( fmt_backing, "%s/sectr.log", Path_Logs ) )
context.logger = to_odin_logger( & host_memory.logger )
{
// Log System Context
builder := strbuilder_from_bytes( fmt_backing )
builder := strbuilder_from_bytes( fmt_backing )
str_pfmt_builder( & builder, "Core Count: %v, ", os_core_count() )
str_pfmt_builder( & builder, "Page Size: %v", os_page_size() )
log_print( to_str(builder) )
}
free_all(context.temp_allocator)
}
context.logger = to_odin_logger( & host_memory.logger )
// Load the Enviornment API for the first-time
@@ -104,47 +106,114 @@ main :: proc()
// Client API Startup
host_memory.client_api.startup(& host_memory, & thread_memory)
// Start the tick lanes
/*thread_wide_startup() :: proc()*/ {
{
profile("thread_wide_startup")
assert(thread_memory.id == .Master_Prepper)
host_memory.tick_lanes = THREAD_TICK_LANES
barrier_init(& host_memory.lane_sync, THREAD_TICK_LANES)
if THREAD_TICK_LANES > 1 {
launch_tick_lane_thread(.Atomic_Accountant)
barrier_init(& host_memory.client_api_sync_lock, THREAD_TICK_LANES)
for id in 1 ..= (THREAD_TICK_LANES - 1) {
launch_tick_lane_thread(cast(WorkerID) id)
}
}
host_tick_lane_startup(thread_memory.system_ctx)
grime_set_profiler_module_context(& host_memory.spall_context)
grime_set_profiler_thread_buffer(& thread_memory.spall_buffer)
// Job System Setup
{
host_memory.job_system.worker_num = THREAD_JOB_WORKERS
// Determine number of physical cores
barrier_init(& host_memory.job_hot_reload_sync, THREAD_JOB_WORKERS + 1)
for id in THREAD_JOB_WORKER_ID_START ..< THREAD_JOB_WORKER_ID_END {
worker_thread := thread_create(host_job_worker_entrypoint, .Normal)
worker_thread.user_index = int(id)
host_memory.threads[worker_thread.user_index] = worker_thread
thread_start(worker_thread)
}
}
host_tick_lane()
}
// We have all threads resolve here (non-laned threads will need to have end-run signal broadcasted)
if thread_memory.id == .Master_Prepper {
thread_join_multiple(.. host_memory.threads[1:])
}
unload_client_api( & host_memory.client_api )
// End profiling
spall_context_destroy( & host_memory.spall_context )
log_print("Succesfuly closed")
file_close( host_memory.logger.file )
file_rename( str_pfmt_tmp( "%s/sectr.log", Path_Logs), host_memory.path_logger_finalized )
}
launch_tick_lane_thread :: proc(id : WorkerID) {
assert_contextless(thread_memory.id == .Master_Prepper)
// TODO(Ed): We need to make our own version of this that doesn't allocate memory.
lane_thread := thread_create(host_tick_lane_startup, .High)
lane_thread := thread_create(host_tick_lane_entrypoint, .High)
lane_thread.user_index = int(id)
host_memory.threads[lane_thread.user_index] = lane_thread
thread_start(lane_thread)
}
host_tick_lane_startup :: proc(lane_thread: ^SysThread) {
host_tick_lane_entrypoint :: proc(lane_thread: ^SysThread) {
thread_memory.system_ctx = lane_thread
thread_memory.id = cast(WorkerID) lane_thread.user_index
host_memory.client_api.tick_lane_startup(& thread_memory)
when SHOULD_SETUP_PROFILERS
{
thread_memory.spall_buffer = spall_buffer_create(thread_memory.spall_buffer_backing[:], cast(u32) thread_memory.system_ctx.id)
host_memory.client_api.tick_lane_startup(& thread_memory)
grime_set_profiler_thread_buffer(& thread_memory.spall_buffer)
}
host_tick_lane()
}
host_tick_lane :: proc()
{
profile(#procedure)
delta_ns: Duration
host_tick := time_tick_now()
running : b64 = true
for ; running ;
{
profile("Host Tick")
leader := barrier_wait(& host_memory.client_api_sync_lock)
sync_client_api()
running = host_memory.client_api.tick_lane( duration_seconds(delta_ns), delta_ns )
running = host_memory.client_api.tick_lane( duration_seconds(delta_ns), delta_ns ) == false
// host_memory.client_api.clean_frame()
delta_ns = time_tick_lap_time( & host_tick )
host_tick = time_tick_now()
delta_ns = time_tick_lap_time( & host_tick )
host_tick = time_tick_now()
}
leader := barrier_wait(& host_memory.lane_sync)
host_lane_shutdown()
}
host_lane_shutdown :: proc()
{
profile(#procedure)
spall_buffer_destroy( & host_memory.spall_context, & thread_memory.spall_buffer )
}
host_job_worker_entrypoint :: proc(worker_thread: ^SysThread)
{
thread_memory.system_ctx = worker_thread
thread_memory.id = cast(WorkerID) worker_thread.user_index
when SHOULD_SETUP_PROFILERS
{
thread_memory.spall_buffer = spall_buffer_create(thread_memory.spall_buffer_backing[:], cast(u32) thread_memory.system_ctx.id)
host_memory.client_api.tick_lane_startup(& thread_memory)
grime_set_profiler_thread_buffer(& thread_memory.spall_buffer)
}
for ; sync_load(& host_memory.job_system.running, .Relaxed);
{
profile("Host Job Tick")
host_memory.client_api.jobsys_worker_tick()
// TODO(Ed): We cannot allow job threads to enter the reload barrier until they have drained their enqueued jobs.
if sync_load(& host_memory.client_api_hot_reloaded, .Acquire) {
leader :=barrier_wait(& host_memory.job_hot_reload_sync)
break
}
}
}
@@ -152,25 +221,32 @@ host_tick_lane :: proc()
sync_client_api :: proc()
{
profile(#procedure)
// We don't want any lanes to be in client callstack during a hot-reload
leader := barrier_wait(& host_memory.lane_sync)
if thread_memory.id == .Master_Prepper
{
if thread_memory.id == .Master_Prepper
write_time, result := file_last_write_time_by_name( Path_Sectr_Module );
if result == OS_ERROR_NONE && host_memory.client_api.write_time != write_time
{
profile("Master_Prepper: Reloading client module")
write_time, result := file_last_write_time_by_name( Path_Sectr_Module );
if result == OS_ERROR_NONE && host_memory.client_api.write_time != write_time
{
sync_store(& host_memory.client_api_hot_reloaded, true, .Release)
version_id := host_memory.client_api.lib_version + 1
unload_client_api( & host_memory.client_api )
// Wait for pdb to unlock (linker may still be writting)
for ; file_is_locked( Path_Sectr_Debug_Symbols ) && file_is_locked( Path_Sectr_Live_Module ); {}
thread_sleep( Millisecond * 100 )
host_memory.client_api = load_client_api( version_id )
verify( host_memory.client_api.lib_version != 0, "Failed to hot-reload the sectr module" )
}
sync_store(& host_memory.client_api_hot_reloaded, true, .Release)
// We nee to wait for the job queue to drain.
barrier_wait(& host_memory.job_hot_reload_sync)
version_id := host_memory.client_api.lib_version + 1
unload_client_api( & host_memory.client_api )
// Wait for pdb to unlock (linker may still be writting)
for ; file_is_locked( Path_Sectr_Debug_Symbols ) && file_is_locked( Path_Sectr_Live_Module ); {}
thread_sleep( Millisecond * 100 )
host_memory.client_api = load_client_api( version_id )
verify( host_memory.client_api.lib_version != 0, "Failed to hot-reload the sectr module" )
// Don't let jobs continue until after we clear loading.
barrier_wait(& host_memory.job_hot_reload_sync)
}
}
leader := barrier_wait(& host_memory.client_api_sync_lock)
leader = barrier_wait(& host_memory.lane_sync)
// Lanes are safe to continue.
if sync_load(& host_memory.client_api_hot_reloaded, .Acquire)
{
host_memory.client_api.hot_reload(& host_memory, & thread_memory)

View File

@@ -11,6 +11,7 @@ import "core:dynlib"
import "core:fmt"
str_pfmt_builder :: fmt.sbprintf
str_pfmt_buffer :: fmt.bprintf
str_pfmt_tmp :: fmt.tprintf
import "core:log"
LoggerLevel :: log.Level
@@ -24,8 +25,11 @@ import "core:os"
OS_ERROR_NONE :: os.ERROR_NONE
OS_Error :: os.Error
FileTime :: os.File_Time
file_close :: os.close
file_last_write_time_by_name :: os.last_write_time_by_name
file_remove :: os.remove
file_rename :: os.rename
file_status :: os.stat
os_is_directory :: os.is_dir
os_make_directory :: os.make_directory
os_core_count :: os.processor_core_count
@@ -33,9 +37,10 @@ import "core:os"
process_exit :: os.exit
import "core:prof/spall"
SPALL_BUFFER_DEFAULT_SIZE :: spall.BUFFER_DEFAULT_SIZE
spall_context_create :: spall.context_create
spall_context_destroy :: spall.context_destroy
spall_buffer_create :: spall.buffer_create
spall_buffer_destroy :: spall.buffer_destroy
import "core:strings"
strbuilder_from_bytes :: strings.builder_from_bytes
@@ -46,6 +51,7 @@ import "core:sync"
barrier_init :: sync.barrier_init
barrier_wait :: sync.barrier_wait
thread_current_id :: sync.current_thread_id
// Cache coherent loads and stores (synchronizes relevant cache blocks/lines)
sync_load :: sync.atomic_load_explicit
sync_store :: sync.atomic_store_explicit
@@ -62,35 +68,72 @@ import "core:time"
time_tick_lap_time :: time.tick_lap_time
import "core:thread"
SysThread :: thread.Thread
thread_create :: thread.create
thread_start :: thread.start
SysThread :: thread.Thread
thread_create :: thread.create
thread_start :: thread.start
thread_destroy :: thread.destroy
thread_join_multiple :: thread.join_multiple
thread_terminate :: thread.terminate
import grime "codebase:grime"
DISABLE_GRIME_PROFILING :: grime.DISABLE_PROFILING
file_copy_sync :: grime.file_copy_sync
grime_set_profiler_module_context :: grime.set_profiler_module_context
grime_set_profiler_thread_buffer :: grime.set_profiler_thread_buffer
file_is_locked :: grime.file_is_locked
logger_init :: grime.logger_init
to_odin_logger :: grime.to_odin_logger
// Need to have it with un-wrapped allocator
// file_copy_sync :: grime.file_copy_sync
file_copy_sync :: proc( path_src, path_dst: string, allocator := context.allocator ) -> b32
{
file_size : i64
{
path_info, result := file_status( path_src, allocator )
if result != OS_ERROR_NONE {
log_print_fmt("Could not get file info: %v", result, LoggerLevel.Error )
return false
}
file_size = path_info.size
}
src_content, result := os.read_entire_file_from_filename( path_src, allocator )
if ! result {
log_print_fmt( "Failed to read file to copy: %v", path_src, LoggerLevel.Error )
debug_trap()
return false
}
result = os.write_entire_file( path_dst, src_content, false )
if ! result {
log_print_fmt( "Failed to copy file: %v", path_dst, LoggerLevel.Error )
debug_trap()
return false
}
return true
}
import "codebase:sectr"
DISABLE_HOST_PROFILING :: sectr.DISABLE_HOST_PROFILING
DISABLE_CLIENT_PROFILING :: sectr.DISABLE_CLIENT_PROFILING
Path_Logs :: sectr.Path_Logs
Path_Sectr_Debug_Symbols :: sectr.Path_Debug_Symbols
Path_Sectr_Live_Module :: sectr.Path_Live_Module
Path_Sectr_Module :: sectr.Path_Module
Path_Sectr_Spall_Record :: sectr.Path_Spall_Record
MAX_THREADS :: sectr.MAX_THREADS
THREAD_TICK_LANES :: sectr.THREAD_TICK_LANES
Path_Logs :: sectr.Path_Logs
Path_Sectr_Debug_Symbols :: sectr.Path_Debug_Symbols
Path_Sectr_Live_Module :: sectr.Path_Live_Module
Path_Sectr_Module :: sectr.Path_Module
Path_Sectr_Spall_Record :: sectr.Path_Spall_Record
MAX_THREADS :: sectr.MAX_THREADS
THREAD_TICK_LANES :: sectr.THREAD_TICK_LANES
THREAD_JOB_WORKERS :: sectr.THREAD_JOB_WORKERS
THREAD_JOB_WORKER_ID_START :: sectr.THREAD_JOB_WORKER_ID_START
THREAD_JOB_WORKER_ID_END :: sectr.THREAD_JOB_WORKER_ID_END
Client_API :: sectr.ModuleAPI
ProcessMemory :: sectr.ProcessMemory
ThreadMemory :: sectr.ThreadMemory
WorkerID :: sectr.WorkerID
SpallProfiler :: sectr.SpallProfiler
Client_API :: sectr.ModuleAPI
ProcessMemory :: sectr.ProcessMemory
ThreadMemory :: sectr.ThreadMemory
WorkerID :: sectr.WorkerID
ensure :: #force_inline proc( condition : b32, msg : string, location := #caller_location ) {
if condition do return
@@ -122,15 +165,24 @@ log_print_fmt :: proc( fmt : string, args : ..any, level := LoggerLevel.Info, l
log.logf( level, fmt, ..args, location = loc )
}
@(deferred_none = profile_end, disabled = DISABLE_HOST_PROFILING) profile :: #force_inline proc "contextless" ( name : string, loc := #caller_location ) { spall._buffer_begin( & host_memory.spall_profiler.ctx, & host_memory.spall_profiler.buffer, name, "", loc ) }
@( disabled = DISABLE_HOST_PROFILING) profile_begin :: #force_inline proc "contextless" ( name : string, loc := #caller_location ) { spall._buffer_begin( & host_memory.spall_profiler.ctx, & host_memory.spall_profiler.buffer, name, "", loc ) }
@( disabled = DISABLE_HOST_PROFILING) profile_end :: #force_inline proc "contextless" () { spall._buffer_end ( & host_memory.spall_profiler.ctx, & host_memory.spall_profiler.buffer) }
SHOULD_SETUP_PROFILERS :: \
DISABLE_GRIME_PROFILING == false ||
DISABLE_CLIENT_PROFILING == false ||
DISABLE_HOST_PROFILING == false
@(deferred_none = profile_end, disabled = DISABLE_HOST_PROFILING)
profile :: #force_inline proc "contextless" ( name : string, loc := #caller_location ) {
spall._buffer_begin( & host_memory.spall_context, & thread_memory.spall_buffer, name, "", loc )
}
@(disabled = DISABLE_HOST_PROFILING)
profile_begin :: #force_inline proc "contextless" ( name : string, loc := #caller_location ) {
spall._buffer_begin( & host_memory.spall_context, & thread_memory.spall_buffer, name, "", loc )
}
@(disabled = DISABLE_HOST_PROFILING)
profile_end :: #force_inline proc "contextless" () {
spall._buffer_end( & host_memory.spall_context, & thread_memory.spall_buffer)
}
Kilo :: 1024
Mega :: Kilo * 1024
Giga :: Mega * 1024