sketch out new async path for ctrl memory streaming

This commit is contained in:
Ryan Fleury
2025-09-23 10:58:17 -07:00
parent 2ce581fa19
commit 9787c698e6
12 changed files with 263 additions and 56 deletions
+2 -2
View File
@@ -137,10 +137,10 @@ ac_artifact_from_key(Access *access, String8 key, U64 gen, AC_CreateFunctionType
}
////////////////////////////////
//~ rjf: Tick
//~ rjf: Asynchronous Tick
internal void
ac_tick(void)
ac_async_tick(void)
{
Temp scratch = scratch_begin(0, 0);
+2 -2
View File
@@ -99,8 +99,8 @@ internal void ac_init(void);
internal void *ac_artifact_from_key(Access *access, String8 key, U64 gen, AC_CreateFunctionType *create, AC_DestroyFunctionType *destroy, U64 slots_count);
////////////////////////////////
//~ rjf: Tick
//~ rjf: Asynchronous Tick
internal void ac_tick(void);
internal void ac_async_tick(void);
#endif // ARTIFACT_CACHE_H
+7 -4
View File
@@ -198,16 +198,19 @@ async_thread_entry_point(void *params)
async_loop_again = 0;
}
#if defined(ARTIFACT_CACHE_H)
ac_tick();
ac_async_tick();
#endif
#if defined(CONTENT_H)
c_tick();
c_async_tick();
#endif
#if defined(FILE_STREAM_H)
fs_tick();
fs_async_tick();
#endif
#if defined(CTRL_CORE_H)
ctrl_async_tick();
#endif
#if defined(TEXTURE_CACHE_H)
tex_tick();
tex_async_tick();
#endif
cond_var_broadcast(async_tick_stop_cond_var);
}
+6 -13
View File
@@ -13,13 +13,6 @@
# include "third_party/xxHash/xxhash.h"
#endif
// Hashes the bytes of `data` (data.str, data.size) down to a single 64-bit
// value using xxHash's XXH3_64bits.
internal U64
c_little_hash_from_data(String8 data)
{
  return XXH3_64bits(data.str, data.size);
}
internal U128
c_hash_from_data(String8 data)
{
@@ -181,7 +174,7 @@ internal U128
c_submit_data(C_Key key, Arena **data_arena, String8 data)
{
//- rjf: unpack key
U64 key_hash = c_little_hash_from_data(str8_struct(&key));
U64 key_hash = u64_hash_from_str8(str8_struct(&key));
U64 key_slot_idx = key_hash%c_shared->key_slots_count;
U64 key_stripe_idx = key_slot_idx%c_shared->key_stripes_count;
C_KeySlot *key_slot = &c_shared->key_slots[key_slot_idx];
@@ -288,7 +281,7 @@ c_submit_data(C_Key key, Arena **data_arena, String8 data)
// rjf: key is new -> add this key to the associated root
if(key_is_new)
{
U64 root_hash = c_little_hash_from_data(str8_struct(&key.root));
U64 root_hash = u64_hash_from_str8(str8_struct(&key.root));
U64 root_slot_idx = root_hash%c_shared->root_slots_count;
U64 root_stripe_idx = root_slot_idx%c_shared->root_stripes_count;
C_RootSlot *root_slot = &c_shared->root_slots[root_slot_idx];
@@ -348,7 +341,7 @@ c_submit_data(C_Key key, Arena **data_arena, String8 data)
internal void
c_close_key(C_Key key)
{
U64 key_hash = c_little_hash_from_data(str8_struct(&key));
U64 key_hash = u64_hash_from_str8(str8_struct(&key));
U64 key_slot_idx = key_hash%c_shared->key_slots_count;
U64 key_stripe_idx = key_slot_idx%c_shared->key_stripes_count;
C_KeySlot *key_slot = &c_shared->key_slots[key_slot_idx];
@@ -438,7 +431,7 @@ internal U128
c_hash_from_key(C_Key key, U64 rewind_count)
{
U128 result = {0};
U64 key_hash = c_little_hash_from_data(str8_struct(&key));
U64 key_hash = u64_hash_from_str8(str8_struct(&key));
U64 key_slot_idx = key_hash%c_shared->key_slots_count;
U64 key_stripe_idx = key_slot_idx%c_shared->key_stripes_count;
C_KeySlot *key_slot = &c_shared->key_slots[key_slot_idx];
@@ -483,10 +476,10 @@ c_data_from_hash(Access *access, U128 hash)
}
////////////////////////////////
//~ rjf: Tick
//~ rjf: Asynchronous Tick
internal void
c_tick(void)
c_async_tick(void)
{
ProfBeginFunction();
+2 -3
View File
@@ -197,7 +197,6 @@ global C_Shared *c_shared = 0;
////////////////////////////////
//~ rjf: Basic Helpers
internal U64 c_little_hash_from_data(String8 data);
internal U128 c_hash_from_data(String8 data);
internal C_ID c_id_make(U64 u64_0, U64 u64_1);
internal B32 c_id_match(C_ID a, C_ID b);
@@ -238,8 +237,8 @@ internal U128 c_hash_from_key(C_Key key, U64 rewind_count);
internal String8 c_data_from_hash(Access *access, U128 hash);
////////////////////////////////
//~ rjf: Tick
//~ rjf: Asynchronous Tick
internal void c_tick(void);
internal void c_async_tick(void);
#endif // CONTENT_H
+207 -23
View File
@@ -1594,6 +1594,8 @@ ctrl_init(void)
ctrl_state->exception_code_filters[k/64] |= 1ull<<(k%64);
}
}
ctrl_state->mem_req_mutex = mutex_alloc();
ctrl_state->mem_req_arena = arena_alloc();
ctrl_state->u2ms_ring_size = KB(64);
ctrl_state->u2ms_ring_base = push_array(arena, U8, ctrl_state->u2ms_ring_size);
ctrl_state->u2ms_ring_mutex = mutex_alloc();
@@ -1632,36 +1634,28 @@ ctrl_key_from_process_vaddr_range(CTRL_Handle process, Rng1U64 vaddr_range, B32
CTRL_ProcessMemoryCacheSlot *process_slot = &cache->slots[process_slot_idx];
CTRL_ProcessMemoryCacheStripe *process_stripe = &cache->stripes[process_stripe_idx];
//- rjf: get the hash store root for this process; construct process node if it
//- rjf: get the content root for this process; construct process node if it
// doesn't exist
C_Root root = {0};
{
B32 node_found = 0;
MutexScopeR(process_stripe->rw_mutex)
for(B32 write_mode = 0; write_mode <= 1; write_mode += 1)
{
for(CTRL_ProcessMemoryCacheNode *n = process_slot->first; n != 0; n = n->next)
B32 node_found = 0;
RWMutexScope(process_stripe->rw_mutex, write_mode)
{
if(ctrl_handle_match(n->handle, process))
for(CTRL_ProcessMemoryCacheNode *n = process_slot->first; n != 0; n = n->next)
{
node_found = 1;
root = n->root;
break;
if(ctrl_handle_match(n->handle, process))
{
node_found = 1;
root = n->root;
break;
}
}
}
}
if(!node_found) MutexScopeW(process_stripe->rw_mutex)
{
for(CTRL_ProcessMemoryCacheNode *n = process_slot->first; n != 0; n = n->next)
{
if(ctrl_handle_match(n->handle, process))
{
node_found = 1;
root = n->root;
break;
}
}
if(!node_found)
if(write_mode && !node_found)
{
node_found = 1;
Arena *node_arena = arena_alloc();
CTRL_ProcessMemoryCacheNode *node = push_array(node_arena, CTRL_ProcessMemoryCacheNode, 1);
DLLPushBack(process_slot->first, process_slot->last, node);
@@ -1672,6 +1666,10 @@ ctrl_key_from_process_vaddr_range(CTRL_Handle process, Rng1U64 vaddr_range, B32
node->range_hash_slots = push_array(node_arena, CTRL_ProcessMemoryRangeHashSlot, node->range_hash_slots_count);
root = node->root;
}
if(node_found)
{
break;
}
}
}
@@ -1685,7 +1683,7 @@ ctrl_key_from_process_vaddr_range(CTRL_Handle process, Rng1U64 vaddr_range, B32
id.u128[0].u64[0] |= (1ull << 63);
}
}
U64 range_hash = c_little_hash_from_data(str8_struct(&id));
U64 range_hash = u64_hash_from_str8(str8_struct(&id));
//- rjf: form full key
C_Key key = c_key_make(root, id);
@@ -6932,7 +6930,7 @@ ASYNC_WORK_DEF(ctrl_mem_stream_work)
CTRL_ProcessMemoryCacheStripe *process_stripe = &cache->stripes[process_stripe_idx];
//- rjf: unpack address range hash cache key
U64 range_hash = c_little_hash_from_data(str8_struct(&key.id));
U64 range_hash = u64_hash_from_str8(str8_struct(&key.id));
//- rjf: clamp vaddr range
Rng1U64 vaddr_range_clamped = vaddr_range;
@@ -7461,3 +7459,189 @@ ASYNC_WORK_DEF(ctrl_call_stack_tree_build_work)
scratch_end(scratch);
return 0;
}
////////////////////////////////
//~ rjf: Asynchronous Tick
// Asynchronous tick for the control layer's memory streaming path.
//
// Each call:
//  1. copies every queued CTRL_MemRequest out of ctrl_state's request list
//     (under ctrl_state->mem_req_mutex) into a scratch array,
//  2. takes requests one index at a time from a counter, and for each one
//     reads the requested address range from the target process with
//     dmn_process_read,
//  3. submits the bytes via c_submit_data when the read produced data and
//     ctrl_mem_gen() did not change across the read,
//  4. updates the matching range node in the process memory cache,
//     broadcasts the stripe's condition variable, and calls the wakeup hook
//     if a nonzero hash was produced.
//
// NOTE(review): this function only copies requests; nothing here unlinks
// nodes from ctrl_state->first_mem_req or resets ctrl_state->mem_req_count.
// Presumably that is done elsewhere (or is not written yet) -- confirm.
internal void
ctrl_async_tick(void)
{
Temp scratch = scratch_begin(0, 0);
//- rjf: get all memory requests
// The snapshot lives in scratch memory, so it is only valid until the
// scratch_end at the bottom of this function.
U64 mem_reqs_count = 0;
CTRL_MemRequest *mem_reqs = 0;
MutexScope(ctrl_state->mem_req_mutex)
{
mem_reqs_count = ctrl_state->mem_req_count;
mem_reqs = push_array(scratch.arena, CTRL_MemRequest, mem_reqs_count);
U64 idx = 0;
// NOTE(review): idx is not bounded by mem_reqs_count here; this relies on
// mem_req_count matching the number of nodes in the list -- confirm that
// invariant is maintained wherever requests are pushed.
for EachNode(n, CTRL_MemRequestNode, ctrl_state->first_mem_req)
{
MemoryCopyStruct(&mem_reqs[idx], &n->v);
idx += 1;
}
}
//- rjf: do all memory requests
{
CTRL_ProcessMemoryCache *cache = &ctrl_state->process_memory_cache;
// NOTE(review): presumably lane_sync_u64(..., 0) makes every lane use lane
// 0's pointer, so all lanes pull indices from one shared counter -- confirm
// against lane_sync_u64. If so, that counter is a local of one lane's stack
// frame; verify something synchronizes the lanes before that frame returns.
// Also, each lane takes its own snapshot above, so lanes could see different
// mem_reqs_count values if the list changes between snapshots -- confirm.
U64 mem_req_take_counter = 0;
U64 *mem_req_take_counter_ptr = &mem_req_take_counter;
lane_sync_u64(&mem_req_take_counter_ptr, 0);
for(;;)
{
// rjf: take next task
// The taken number is treated as 1-based (index = number - 1); the loop ends
// once the index runs past the snapshot.
U64 mem_req_num = ins_atomic_u64_inc_eval(mem_req_take_counter_ptr);
U64 mem_req_idx = (mem_req_num-1);
if(mem_reqs_count <= mem_req_idx)
{
break;
}
// rjf: unpack request
CTRL_MemRequest *req = &mem_reqs[mem_req_idx];
C_Key key = req->key;
CTRL_Handle process = req->process;
Rng1U64 vaddr_range = req->vaddr_range;
B32 zero_terminated = req->zero_terminated;
// rjf: unpack process key
// Slot is chosen by the process handle's hash; the stripe (which holds the
// rw_mutex and cv used below) is chosen from the slot index.
U64 process_hash = ctrl_hash_from_handle(process);
U64 process_slot_idx = process_hash%cache->slots_count;
U64 process_stripe_idx = process_slot_idx%cache->stripes_count;
CTRL_ProcessMemoryCacheSlot *process_slot = &cache->slots[process_slot_idx];
CTRL_ProcessMemoryCacheStripe *process_stripe = &cache->stripes[process_stripe_idx];
// rjf: unpack little hash of range key
U64 range_hash = u64_hash_from_str8(str8_struct(&key.id));
// rjf: clamp vaddr range
// Forces max >= min, then caps the size at the smaller of GB(1) and the
// distance from min to max_U64, so min+cap cannot wrap around.
Rng1U64 vaddr_range_clamped = vaddr_range;
{
vaddr_range_clamped.max = Max(vaddr_range_clamped.max, vaddr_range_clamped.min);
U64 max_size_cap = Min(max_U64-vaddr_range_clamped.min, GB(1));
vaddr_range_clamped.max = Min(vaddr_range_clamped.max, vaddr_range_clamped.min+max_size_cap);
}
// rjf: read
// The memory generation is sampled before and after the read; results are
// only submitted/committed as current when both samples agree.
// NOTE(review): pre_run_state (and post_run_state below) are assigned but
// never used in this function.
U64 range_size = 0;
Arena *range_arena = 0;
void *range_base = 0;
U64 zero_terminated_size = 0;
U64 pre_read_mem_gen = ctrl_mem_gen();
B32 pre_run_state = ins_atomic_u64_eval(&ctrl_state->ctrl_thread_run_state);
{
range_size = dim_1u64(vaddr_range_clamped);
U64 page_size = os_get_system_info()->page_size;
// NOTE(review): arena_size is computed but never used; the arena_alloc call
// below passes the unaligned range_size+ARENA_HEADER_SIZE instead.
U64 arena_size = AlignPow2(range_size + ARENA_HEADER_SIZE, page_size);
range_arena = arena_alloc(.reserve_size = range_size+ARENA_HEADER_SIZE, .commit_size = range_size+ARENA_HEADER_SIZE);
if(range_arena == 0)
{
range_size = 0;
}
else
{
range_base = push_array_no_zero(range_arena, U8, range_size);
U64 bytes_read = 0;
U64 retry_count = 0;
// Ranges of one page or less get a single read attempt; larger ranges get
// up to 64 retries.
U64 retry_limit = range_size > page_size ? 64 : 0;
// Retry loop: when a read of a non-empty range returns zero bytes, cut the
// range roughly in half (new max rounded down to a page boundary) and try
// again. Any nonzero read, an empty range, or a zero-sized cut ends the loop.
for(Rng1U64 vaddr_range_clamped_retry = vaddr_range_clamped;
retry_count <= retry_limit;
retry_count += 1)
{
bytes_read = dmn_process_read(process.dmn_handle, vaddr_range_clamped_retry, range_base);
if(bytes_read == 0 && vaddr_range_clamped_retry.max > vaddr_range_clamped_retry.min)
{
U64 diff = (vaddr_range_clamped_retry.max-vaddr_range_clamped_retry.min)/2;
vaddr_range_clamped_retry.max -= diff;
vaddr_range_clamped_retry.max = AlignDownPow2(vaddr_range_clamped_retry.max, page_size);
if(diff == 0)
{
break;
}
}
else
{
break;
}
}
// Nothing could be read: drop the buffer entirely so the request yields no
// data. A partial read keeps the full-size buffer with the unread tail zeroed.
if(bytes_read == 0)
{
arena_release(range_arena);
range_base = 0;
range_size = 0;
range_arena = 0;
}
else if(bytes_read < range_size)
{
MemoryZero((U8 *)range_base + bytes_read, range_size-bytes_read);
}
// For zero-terminated requests, the submitted size stops at the first zero
// byte found within the bytes actually read; if none is found it stays at
// range_size.
zero_terminated_size = range_size;
if(zero_terminated)
{
for(U64 idx = 0; idx < bytes_read; idx += 1)
{
if(((U8 *)range_base)[idx] == 0)
{
zero_terminated_size = idx;
break;
}
}
}
}
}
U64 post_read_mem_gen = ctrl_mem_gen();
B32 post_run_state = ins_atomic_u64_eval(&ctrl_state->ctrl_thread_run_state);
// rjf: read successful -> submit to hash store
// Data is submitted only when a buffer exists and the memory generation was
// stable across the read; otherwise any remaining arena is released here.
// NOTE(review): &range_arena is handed to c_submit_data and not released on
// that path -- presumably ownership transfers to the content layer; confirm.
U128 hash = {0};
if(range_base != 0 && pre_read_mem_gen == post_read_mem_gen)
{
hash = c_submit_data(key, &range_arena, str8((U8*)range_base, zero_terminated_size));
}
else if(range_arena != 0)
{
arena_release(range_arena);
}
// rjf: commit new info to cache
// Finds the cache node for this process, then the range node whose id
// matches key.id. working_count is decremented whether or not the read was
// usable; mem_gen is only advanced when the generation was stable. If no
// matching node exists, nothing is changed.
MutexScopeW(process_stripe->rw_mutex)
{
for(CTRL_ProcessMemoryCacheNode *n = process_slot->first; n != 0; n = n->next)
{
if(ctrl_handle_match(n->handle, process))
{
U64 range_slot_idx = range_hash%n->range_hash_slots_count;
CTRL_ProcessMemoryRangeHashSlot *range_slot = &n->range_hash_slots[range_slot_idx];
for(CTRL_ProcessMemoryRangeHashNode *range_n = range_slot->first; range_n != 0; range_n = range_n->next)
{
if(c_id_match(range_n->id, key.id))
{
if(pre_read_mem_gen == post_read_mem_gen)
{
range_n->mem_gen = post_read_mem_gen;
}
range_n->working_count -= 1;
goto commit__break_all;
}
}
}
}
commit__break_all:;
}
// rjf: broadcast changes
// The stripe's condition variable is always broadcast; the wakeup hook (if
// one is installed) is only called when a nonzero hash was produced.
cond_var_broadcast(process_stripe->cv);
if(!u128_match(u128_zero(), hash))
{
if(ctrl_state->wakeup_hook != 0)
{
ctrl_state->wakeup_hook();
}
}
}
}
scratch_end(scratch);
}
+28
View File
@@ -828,6 +828,22 @@ typedef CTRL_WAKEUP_FUNCTION_DEF(CTRL_WakeupFunctionType);
////////////////////////////////
//~ rjf: Main State Types
// One queued request to read a range of a process's memory; consumed by
// ctrl_async_tick.
typedef struct CTRL_MemRequest CTRL_MemRequest;
struct CTRL_MemRequest
{
// content key the read bytes are submitted under (passed to c_submit_data);
// key.id is also used to find the matching range node in the memory cache
C_Key key;
// process to read from (its dmn_handle is passed to dmn_process_read)
CTRL_Handle process;
// virtual address range to read; the consumer clamps it to at most GB(1)
Rng1U64 vaddr_range;
// nonzero -> submitted data is cut at the first zero byte that was read
B32 zero_terminated;
};
// Linked-list node holding one CTRL_MemRequest; CTRL_State keeps the list via
// its first_mem_req / last_mem_req pointers.
typedef struct CTRL_MemRequestNode CTRL_MemRequestNode;
struct CTRL_MemRequestNode
{
// next node in the request list (0 at the end)
CTRL_MemRequestNode *next;
// the request payload; copied out by value when the list is snapshotted
CTRL_MemRequest v;
};
typedef struct CTRL_State CTRL_State;
struct CTRL_State
{
@@ -891,6 +907,13 @@ struct CTRL_State
String8List msg_user_bp_touched_files;
String8List msg_user_bp_touched_symbols;
// rjf: memory requests
Mutex mem_req_mutex;
Arena *mem_req_arena;
CTRL_MemRequestNode *first_mem_req;
CTRL_MemRequestNode *last_mem_req;
U64 mem_req_count;
// rjf: user -> memstream ring buffer
U64 u2ms_ring_size;
U8 *u2ms_ring_base;
@@ -1227,4 +1250,9 @@ ASYNC_WORK_DEF(ctrl_call_stack_build_work);
ASYNC_WORK_DEF(ctrl_call_stack_tree_build_work);
////////////////////////////////
//~ rjf: Asynchronous Tick
internal void ctrl_async_tick(void);
#endif // CTRL_CORE_H
+2 -2
View File
@@ -241,10 +241,10 @@ fs_properties_from_path(String8 path)
}
////////////////////////////////
//~ rjf: Tick
//~ rjf: Asynchronous Tick
internal void
fs_tick(void)
fs_async_tick(void)
{
ProfBeginFunction();
Temp scratch = scratch_begin(0, 0);
+2 -2
View File
@@ -125,8 +125,8 @@ internal U128 fs_hash_from_path_range(String8 path, Rng1U64 range, U64 endt_us);
internal FileProperties fs_properties_from_path(String8 path);
////////////////////////////////
//~ rjf: Tick
//~ rjf: Asynchronous Tick
internal void fs_tick(void);
internal void fs_async_tick(void);
#endif // FILE_STREAM_H
+1 -1
View File
@@ -40,7 +40,7 @@ mtx_init(void)
internal void
mtx_push_op(C_Key buffer_key, MTX_Op op)
{
U64 hash = c_little_hash_from_data(str8_struct(&buffer_key));
U64 hash = u64_hash_from_str8(str8_struct(&buffer_key));
MTX_MutThread *thread = &mtx_shared->mut_threads[hash%mtx_shared->mut_threads_count];
mtx_enqueue_op(thread, buffer_key, op);
}
+2 -2
View File
@@ -137,10 +137,10 @@ tex_texture_from_key_topology(Access *access, C_Key key, TEX_Topology topology,
}
////////////////////////////////
//~ rjf: Tick
//~ rjf: Asynchronous Tick
internal void
tex_tick(void)
tex_async_tick(void)
{
if(ins_atomic_u64_eval(&tex_shared) == 0) { return; }
ProfBeginFunction();
+2 -2
View File
@@ -105,9 +105,9 @@ internal R_Handle tex_texture_from_hash_topology(Access *access, U128 hash, TEX_
internal R_Handle tex_texture_from_key_topology(Access *access, C_Key key, TEX_Topology topology, U128 *hash_out);
////////////////////////////////
//~ rjf: Tick
//~ rjf: Asynchronous Tick
internal void tex_tick(void);
internal void tex_async_tick(void);
////////////////////////////////
//~ rjf: Artifact Cache Hooks / Lookups