sketch out artifact cache hooks for file streaming layer

This commit is contained in:
Ryan Fleury
2025-09-23 11:58:00 -07:00
parent 6cdce22284
commit 141b6c1396
8 changed files with 215 additions and 209 deletions
-3
View File
@@ -206,9 +206,6 @@ async_thread_entry_point(void *params)
#if defined(FILE_STREAM_H)
fs_async_tick();
#endif
#if defined(CTRL_CORE_H)
ctrl_async_tick();
#endif
#if defined(TEXTURE_CACHE_H)
tex_async_tick();
#endif
+16
View File
@@ -2891,3 +2891,19 @@ u64_hash_from_str8(String8 string)
U64 result = u64_hash_from_seed_str8(5381, string);
return result;
}
internal U128
u128_hash_from_seed_str8(U64 seed, String8 string)
{
U128 result = {0};
XXH128_hash_t hash = XXH3_128bits_withSeed(string.str, string.size, seed);
MemoryCopy(&result, &hash, sizeof(result));
return result;
}
internal U128
u128_hash_from_str8(String8 string)
{
U128 result = u128_hash_from_seed_str8(5381, string);
return result;
}
+2
View File
@@ -449,5 +449,7 @@ internal U64 str8_deserial_read_block(String8 string, U64 off, U64 size, Stri
internal U64 u64_hash_from_seed_str8(U64 seed, String8 string);
internal U64 u64_hash_from_str8(String8 string);
internal U128 u128_hash_from_seed_str8(U64 seed, String8 string);
internal U128 u128_hash_from_str8(String8 string);
#endif // BASE_STRINGS_H
+108 -171
View File
@@ -7463,200 +7463,137 @@ ASYNC_WORK_DEF(ctrl_call_stack_tree_build_work)
////////////////////////////////
//~ rjf: Process Memory Artifact Cache Hooks / Lookups
internal void *
internal AC_Artifact
ctrl_memory_artifact_create(String8 key, B32 *retry_out)
{
}
internal void
ctrl_memory_artifact_destroy(void *ptr)
{
}
////////////////////////////////
//~ rjf: Asynchronous Tick
internal void
ctrl_async_tick(void)
{
Temp scratch = scratch_begin(0, 0);
#if 0
//- rjf: get all memory requests
U64 mem_reqs_count = 0;
CTRL_MemRequest *mem_reqs = 0;
MutexScope(ctrl_state->mem_req_mutex)
//- rjf: unpack key
CTRL_Handle process = {0};
Rng1U64 vaddr_range = {0};
B32 zero_terminated = 0;
{
mem_reqs_count = ctrl_state->mem_req_count;
mem_reqs = push_array(scratch.arena, CTRL_MemRequest, mem_reqs_count);
U64 idx = 0;
for EachNode(n, CTRL_MemRequestNode, ctrl_state->first_mem_req)
{
MemoryCopyStruct(&mem_reqs[idx], &n->v);
idx += 1;
}
U64 key_read_off = 0;
key_read_off += str8_deserial_read_struct(key, key_read_off, &process);
key_read_off += str8_deserial_read_struct(key, key_read_off, &vaddr_range);
key_read_off += str8_deserial_read_struct(key, key_read_off, &zero_terminated);
}
//- rjf: do all memory requests
//- rjf: clamp vaddr range
Rng1U64 vaddr_range_clamped = vaddr_range;
{
CTRL_ProcessMemoryCache *cache = &ctrl_state->process_memory_cache;
U64 mem_req_take_counter = 0;
U64 *mem_req_take_counter_ptr = &mem_req_take_counter;
lane_sync_u64(&mem_req_take_counter_ptr, 0);
for(;;)
vaddr_range_clamped.max = Max(vaddr_range_clamped.max, vaddr_range_clamped.min);
U64 max_size_cap = Min(max_U64-vaddr_range_clamped.min, GB(1));
vaddr_range_clamped.max = Min(vaddr_range_clamped.max, vaddr_range_clamped.min+max_size_cap);
}
//- rjf: do read
U64 range_size = 0;
Arena *range_arena = 0;
void *range_base = 0;
U64 zero_terminated_size = 0;
U64 pre_read_mem_gen = ctrl_mem_gen();
B32 pre_run_state = ins_atomic_u64_eval(&ctrl_state->ctrl_thread_run_state);
{
range_size = dim_1u64(vaddr_range_clamped);
U64 page_size = os_get_system_info()->page_size;
U64 arena_size = AlignPow2(range_size + ARENA_HEADER_SIZE, page_size);
range_arena = arena_alloc(.reserve_size = range_size+ARENA_HEADER_SIZE, .commit_size = range_size+ARENA_HEADER_SIZE);
if(range_arena == 0)
{
// rjf: take next task
U64 mem_req_num = ins_atomic_u64_inc_eval(mem_req_take_counter_ptr);
U64 mem_req_idx = (mem_req_num-1);
if(mem_reqs_count <= mem_req_idx)
range_size = 0;
}
else
{
range_base = push_array_no_zero(range_arena, U8, range_size);
U64 bytes_read = 0;
U64 retry_count = 0;
U64 retry_limit = range_size > page_size ? 64 : 0;
for(Rng1U64 vaddr_range_clamped_retry = vaddr_range_clamped;
retry_count <= retry_limit;
retry_count += 1)
{
break;
}
// rjf: unpack request
CTRL_MemRequest *req = &mem_reqs[mem_req_idx];
C_Key key = req->key;
CTRL_Handle process = req->process;
Rng1U64 vaddr_range = req->vaddr_range;
B32 zero_terminated = req->zero_terminated;
// rjf: unpack process key
U64 process_hash = ctrl_hash_from_handle(process);
U64 process_slot_idx = process_hash%cache->slots_count;
U64 process_stripe_idx = process_slot_idx%cache->stripes_count;
CTRL_ProcessMemoryCacheSlot *process_slot = &cache->slots[process_slot_idx];
CTRL_ProcessMemoryCacheStripe *process_stripe = &cache->stripes[process_stripe_idx];
// rjf: unpack little hash of range key
U64 range_hash = u64_hash_from_str8(str8_struct(&key.id));
// rjf: clamp vaddr range
Rng1U64 vaddr_range_clamped = vaddr_range;
{
vaddr_range_clamped.max = Max(vaddr_range_clamped.max, vaddr_range_clamped.min);
U64 max_size_cap = Min(max_U64-vaddr_range_clamped.min, GB(1));
vaddr_range_clamped.max = Min(vaddr_range_clamped.max, vaddr_range_clamped.min+max_size_cap);
}
// rjf: read
U64 range_size = 0;
Arena *range_arena = 0;
void *range_base = 0;
U64 zero_terminated_size = 0;
U64 pre_read_mem_gen = ctrl_mem_gen();
B32 pre_run_state = ins_atomic_u64_eval(&ctrl_state->ctrl_thread_run_state);
{
range_size = dim_1u64(vaddr_range_clamped);
U64 page_size = os_get_system_info()->page_size;
U64 arena_size = AlignPow2(range_size + ARENA_HEADER_SIZE, page_size);
range_arena = arena_alloc(.reserve_size = range_size+ARENA_HEADER_SIZE, .commit_size = range_size+ARENA_HEADER_SIZE);
if(range_arena == 0)
bytes_read = dmn_process_read(process.dmn_handle, vaddr_range_clamped_retry, range_base);
if(bytes_read == 0 && vaddr_range_clamped_retry.max > vaddr_range_clamped_retry.min)
{
range_size = 0;
U64 diff = (vaddr_range_clamped_retry.max-vaddr_range_clamped_retry.min)/2;
vaddr_range_clamped_retry.max -= diff;
vaddr_range_clamped_retry.max = AlignDownPow2(vaddr_range_clamped_retry.max, page_size);
if(diff == 0)
{
break;
}
}
else
{
range_base = push_array_no_zero(range_arena, U8, range_size);
U64 bytes_read = 0;
U64 retry_count = 0;
U64 retry_limit = range_size > page_size ? 64 : 0;
for(Rng1U64 vaddr_range_clamped_retry = vaddr_range_clamped;
retry_count <= retry_limit;
retry_count += 1)
{
bytes_read = dmn_process_read(process.dmn_handle, vaddr_range_clamped_retry, range_base);
if(bytes_read == 0 && vaddr_range_clamped_retry.max > vaddr_range_clamped_retry.min)
{
U64 diff = (vaddr_range_clamped_retry.max-vaddr_range_clamped_retry.min)/2;
vaddr_range_clamped_retry.max -= diff;
vaddr_range_clamped_retry.max = AlignDownPow2(vaddr_range_clamped_retry.max, page_size);
if(diff == 0)
{
break;
}
}
else
{
break;
}
}
if(bytes_read == 0)
{
arena_release(range_arena);
range_base = 0;
range_size = 0;
range_arena = 0;
}
else if(bytes_read < range_size)
{
MemoryZero((U8 *)range_base + bytes_read, range_size-bytes_read);
}
zero_terminated_size = range_size;
if(zero_terminated)
{
for(U64 idx = 0; idx < bytes_read; idx += 1)
{
if(((U8 *)range_base)[idx] == 0)
{
zero_terminated_size = idx;
break;
}
}
}
break;
}
}
U64 post_read_mem_gen = ctrl_mem_gen();
B32 post_run_state = ins_atomic_u64_eval(&ctrl_state->ctrl_thread_run_state);
// rjf: read successful -> submit to hash store
U128 hash = {0};
if(range_base != 0 && pre_read_mem_gen == post_read_mem_gen)
{
hash = c_submit_data(key, &range_arena, str8((U8*)range_base, zero_terminated_size));
}
else if(range_arena != 0)
if(bytes_read == 0)
{
arena_release(range_arena);
range_base = 0;
range_size = 0;
range_arena = 0;
}
// rjf: commit new info to cache
MutexScopeW(process_stripe->rw_mutex)
else if(bytes_read < range_size)
{
for(CTRL_ProcessMemoryCacheNode *n = process_slot->first; n != 0; n = n->next)
MemoryZero((U8 *)range_base + bytes_read, range_size-bytes_read);
}
zero_terminated_size = range_size;
if(zero_terminated)
{
for(U64 idx = 0; idx < bytes_read; idx += 1)
{
if(ctrl_handle_match(n->handle, process))
if(((U8 *)range_base)[idx] == 0)
{
U64 range_slot_idx = range_hash%n->range_hash_slots_count;
CTRL_ProcessMemoryRangeHashSlot *range_slot = &n->range_hash_slots[range_slot_idx];
for(CTRL_ProcessMemoryRangeHashNode *range_n = range_slot->first; range_n != 0; range_n = range_n->next)
{
if(c_id_match(range_n->id, key.id))
{
if(pre_read_mem_gen == post_read_mem_gen)
{
range_n->mem_gen = post_read_mem_gen;
}
range_n->working_count -= 1;
goto commit__break_all;
}
}
zero_terminated_size = idx;
break;
}
}
commit__break_all:;
}
// rjf: broadcast changes
cond_var_broadcast(process_stripe->cv);
if(!u128_match(u128_zero(), hash))
{
if(ctrl_state->wakeup_hook != 0)
{
ctrl_state->wakeup_hook();
}
U64 bytes_overkill = (bytes_read - zero_terminated_size);
arena_pop(range_arena, bytes_overkill);
}
}
}
#endif
scratch_end(scratch);
U64 post_read_mem_gen = ctrl_mem_gen();
B32 post_run_state = ins_atomic_u64_eval(&ctrl_state->ctrl_thread_run_state);
//- rjf: form content key
C_Key content_key = {0};
{
content_key.id.u128[0] = u128_hash_from_str8(key);
}
//- rjf: read successful -> submit to hash store
U128 hash = {0};
if(range_base != 0 && pre_read_mem_gen == post_read_mem_gen)
{
hash = c_submit_data(content_key, &range_arena, str8((U8*)range_base, zero_terminated_size));
}
else if(range_arena != 0)
{
arena_release(range_arena);
}
//- rjf: wakeup on new reads
if(!u128_match(u128_zero(), hash))
{
if(ctrl_state->wakeup_hook != 0)
{
ctrl_state->wakeup_hook();
}
}
//- rjf: return content key bundled as artifact
AC_Artifact artifact = {0};
StaticAssert(sizeof(content_key) == sizeof(artifact), artifact_key_size_check);
MemoryCopyStruct(&artifact, &content_key);
return artifact;
}
internal void
ctrl_memory_artifact_destroy(AC_Artifact artifact)
{
C_Key key = {0};
MemoryCopyStruct(&key, &artifact);
c_close_key(key);
}
+2 -7
View File
@@ -1253,12 +1253,7 @@ ASYNC_WORK_DEF(ctrl_call_stack_tree_build_work);
////////////////////////////////
//~ rjf: Process Memory Artifact Cache Hooks / Lookups
internal void *ctrl_memory_artifact_create(String8 key, B32 *retry_out);
internal void ctrl_memory_artifact_destroy(void *ptr);
////////////////////////////////
//~ rjf: Asynchronous Tick
internal void ctrl_async_tick(void);
internal AC_Artifact ctrl_memory_artifact_create(String8 key, B32 *retry_out);
internal void ctrl_memory_artifact_destroy(AC_Artifact artifact);
#endif // CTRL_CORE_H
+83 -26
View File
@@ -68,6 +68,89 @@ fs_change_gen(void)
////////////////////////////////
//~ rjf: Cache Interaction
internal AC_Artifact
fs_artifact_create(String8 key, B32 *retry_out)
{
Temp scratch = scratch_begin(0, 0);
//- rjf: unpack key
String8 path = {0};
Rng1U64 range = {0};
{
U64 key_read_off = 0;
key_read_off += str8_deserial_read_struct(key, key_read_off, &path.size);
path.str = push_array(scratch.arena, U8, path.size);
key_read_off += str8_deserial_read(key, key_read_off, path.str, path.size, 1);
key_read_off += str8_deserial_read_struct(key, key_read_off, &range);
}
//- rjf: do read
ProfBegin("read \"%.*s\" [0x%I64x, 0x%I64x)", str8_varg(path), range.min, range.max);
FileProperties pre_props = os_properties_from_file_path(path);
U64 range_size = dim_1u64(range);
U64 read_size = Min(pre_props.size - range.min, range_size);
OS_Handle file = os_file_open(OS_AccessFlag_Read|OS_AccessFlag_ShareRead|OS_AccessFlag_ShareWrite, path);
B32 file_handle_is_valid = !os_handle_match(os_handle_zero(), file);
U64 data_arena_size = read_size+ARENA_HEADER_SIZE;
data_arena_size += KB(4)-1;
data_arena_size -= data_arena_size%KB(4);
ProfBegin("allocate");
Arena *data_arena = arena_alloc(.reserve_size = data_arena_size, .commit_size = data_arena_size);
ProfEnd();
ProfBegin("read");
String8 data = os_string_from_file_range(data_arena, file, r1u64(range.min, range.min+read_size));
ProfEnd();
os_file_close(file);
FileProperties post_props = os_properties_from_file_path(path);
//- rjf: form content key
C_Key content_key = {0};
{
content_key.id.u128[0] = u128_hash_from_str8(key);
}
//- rjf: abort if modification timestamps or sizes differ - we did not successfully read the file
B32 read_good = (pre_props.modified == post_props.modified &&
pre_props.size == post_props.size &&
read_size == data.size &&
(file_handle_is_valid || pre_props.flags & FilePropertyFlag_IsFolder));
if(!read_good)
{
retry_out[0] = 1;
ProfScope("abort")
{
arena_release(data_arena);
MemoryZeroStruct(&data);
data_arena = 0;
}
}
//- rjf: submit to content store
else
{
ProfScope("submit")
{
c_submit_data(content_key, &data_arena, data);
}
}
//- rjf: bundle content key as artifact
AC_Artifact artifact = {0};
StaticAssert(sizeof(content_key) == sizeof(artifact), artifact_key_size_check);
MemoryCopyStruct(&artifact, &content_key);
scratch_end(scratch);
return artifact;
}
internal void
fs_artifact_destroy(AC_Artifact artifact)
{
C_Key key = {0};
MemoryCopyStruct(&key, &artifact);
c_close_key(key);
}
internal C_Key
fs_key_from_path_range(String8 path, Rng1U64 range, U64 endt_us)
{
@@ -214,32 +297,6 @@ fs_hash_from_path_range(String8 path, Rng1U64 range, U64 endt_us)
return hash;
}
internal FileProperties
fs_properties_from_path(String8 path)
{
Temp scratch = scratch_begin(0, 0);
FileProperties result = {0};
path = path_normalized_from_string(scratch.arena, path);
U64 path_hash = fs_little_hash_from_string(path);
U64 slot_idx = path_hash%fs_shared->slots_count;
U64 stripe_idx = slot_idx%fs_shared->stripes_count;
FS_Slot *slot = &fs_shared->slots[slot_idx];
FS_Stripe *stripe = &fs_shared->stripes[stripe_idx];
MutexScopeR(stripe->rw_mutex)
{
for(FS_Node *n = slot->first; n != 0; n = n->next)
{
if(str8_match(path, n->path, 0))
{
result = n->props;
break;
}
}
}
scratch_end(scratch);
return result;
}
////////////////////////////////
//~ rjf: Asynchronous Tick
+3 -1
View File
@@ -120,9 +120,11 @@ internal U64 fs_change_gen(void);
////////////////////////////////
//~ rjf: Cache Interaction
internal AC_Artifact fs_artifact_create(String8 key, B32 *retry_out);
internal void fs_artifact_destroy(AC_Artifact artifact);
internal C_Key fs_key_from_path_range(String8 path, Rng1U64 range, U64 endt_us);
internal U128 fs_hash_from_path_range(String8 path, Rng1U64 range, U64 endt_us);
internal FileProperties fs_properties_from_path(String8 path);
////////////////////////////////
//~ rjf: Asynchronous Tick
+1 -1
View File
@@ -2177,7 +2177,7 @@ RD_VIEW_UI_FUNCTION_DEF(text)
B32 file_is_out_of_date = 0;
String8 out_of_date_dbgi_name = {0};
{
U64 file_timestamp = fs_properties_from_path(rd_regs()->file_path).modified;
U64 file_timestamp = os_properties_from_file_path(rd_regs()->file_path).modified;
if(file_timestamp != 0)
{
for(DI_KeyNode *n = dbgi_keys.first; n != 0; n = n->next)