checkpoint on more artifact cache port of ctrl process memory streaming

This commit is contained in:
Ryan Fleury
2025-09-23 17:05:45 -07:00
parent 141b6c1396
commit 7e05a60ffe
3 changed files with 149 additions and 120 deletions
+147 -120
View File
@@ -7466,127 +7466,134 @@ ASYNC_WORK_DEF(ctrl_call_stack_tree_build_work)
internal AC_Artifact
ctrl_memory_artifact_create(String8 key, B32 *retry_out)
{
//- rjf: unpack key
CTRL_Handle process = {0};
Rng1U64 vaddr_range = {0};
B32 zero_terminated = 0;
{
U64 key_read_off = 0;
key_read_off += str8_deserial_read_struct(key, key_read_off, &process);
key_read_off += str8_deserial_read_struct(key, key_read_off, &vaddr_range);
key_read_off += str8_deserial_read_struct(key, key_read_off, &zero_terminated);
}
//- rjf: clamp vaddr range
Rng1U64 vaddr_range_clamped = vaddr_range;
{
vaddr_range_clamped.max = Max(vaddr_range_clamped.max, vaddr_range_clamped.min);
U64 max_size_cap = Min(max_U64-vaddr_range_clamped.min, GB(1));
vaddr_range_clamped.max = Min(vaddr_range_clamped.max, vaddr_range_clamped.min+max_size_cap);
}
//- rjf: do read
U64 range_size = 0;
Arena *range_arena = 0;
void *range_base = 0;
U64 zero_terminated_size = 0;
U64 pre_read_mem_gen = ctrl_mem_gen();
B32 pre_run_state = ins_atomic_u64_eval(&ctrl_state->ctrl_thread_run_state);
{
range_size = dim_1u64(vaddr_range_clamped);
U64 page_size = os_get_system_info()->page_size;
U64 arena_size = AlignPow2(range_size + ARENA_HEADER_SIZE, page_size);
range_arena = arena_alloc(.reserve_size = range_size+ARENA_HEADER_SIZE, .commit_size = range_size+ARENA_HEADER_SIZE);
if(range_arena == 0)
{
range_size = 0;
}
else
{
range_base = push_array_no_zero(range_arena, U8, range_size);
U64 bytes_read = 0;
U64 retry_count = 0;
U64 retry_limit = range_size > page_size ? 64 : 0;
for(Rng1U64 vaddr_range_clamped_retry = vaddr_range_clamped;
retry_count <= retry_limit;
retry_count += 1)
{
bytes_read = dmn_process_read(process.dmn_handle, vaddr_range_clamped_retry, range_base);
if(bytes_read == 0 && vaddr_range_clamped_retry.max > vaddr_range_clamped_retry.min)
{
U64 diff = (vaddr_range_clamped_retry.max-vaddr_range_clamped_retry.min)/2;
vaddr_range_clamped_retry.max -= diff;
vaddr_range_clamped_retry.max = AlignDownPow2(vaddr_range_clamped_retry.max, page_size);
if(diff == 0)
{
break;
}
}
else
{
break;
}
}
if(bytes_read == 0)
{
arena_release(range_arena);
range_base = 0;
range_size = 0;
range_arena = 0;
}
else if(bytes_read < range_size)
{
MemoryZero((U8 *)range_base + bytes_read, range_size-bytes_read);
}
zero_terminated_size = range_size;
if(zero_terminated)
{
for(U64 idx = 0; idx < bytes_read; idx += 1)
{
if(((U8 *)range_base)[idx] == 0)
{
zero_terminated_size = idx;
break;
}
}
U64 bytes_overkill = (bytes_read - zero_terminated_size);
arena_pop(range_arena, bytes_overkill);
}
}
}
U64 post_read_mem_gen = ctrl_mem_gen();
B32 post_run_state = ins_atomic_u64_eval(&ctrl_state->ctrl_thread_run_state);
//- rjf: form content key
C_Key content_key = {0};
{
content_key.id.u128[0] = u128_hash_from_str8(key);
}
//- rjf: read successful -> submit to hash store
U128 hash = {0};
if(range_base != 0 && pre_read_mem_gen == post_read_mem_gen)
{
hash = c_submit_data(content_key, &range_arena, str8((U8*)range_base, zero_terminated_size));
}
else if(range_arena != 0)
{
arena_release(range_arena);
}
//- rjf: wakeup on new reads
if(!u128_match(u128_zero(), hash))
{
if(ctrl_state->wakeup_hook != 0)
{
ctrl_state->wakeup_hook();
}
}
//- rjf: return content key bundled as artifact
AC_Artifact artifact = {0};
StaticAssert(sizeof(content_key) == sizeof(artifact), artifact_key_size_check);
MemoryCopyStruct(&artifact, &content_key);
if(lane_idx() == 0)
{
//- rjf: unpack key
CTRL_Handle process = {0};
Rng1U64 vaddr_range = {0};
B32 zero_terminated = 0;
{
U64 key_read_off = 0;
key_read_off += str8_deserial_read_struct(key, key_read_off, &process);
key_read_off += str8_deserial_read_struct(key, key_read_off, &vaddr_range);
key_read_off += str8_deserial_read_struct(key, key_read_off, &zero_terminated);
}
//- rjf: clamp vaddr range
Rng1U64 vaddr_range_clamped = vaddr_range;
{
vaddr_range_clamped.max = Max(vaddr_range_clamped.max, vaddr_range_clamped.min);
U64 max_size_cap = Min(max_U64-vaddr_range_clamped.min, GB(1));
vaddr_range_clamped.max = Min(vaddr_range_clamped.max, vaddr_range_clamped.min+max_size_cap);
}
//- rjf: do read
U64 range_size = 0;
Arena *range_arena = 0;
void *range_base = 0;
U64 zero_terminated_size = 0;
U64 pre_read_mem_gen = ctrl_mem_gen();
B32 pre_run_state = ins_atomic_u64_eval(&ctrl_state->ctrl_thread_run_state);
{
range_size = dim_1u64(vaddr_range_clamped);
U64 page_size = os_get_system_info()->page_size;
U64 arena_size = AlignPow2(range_size + ARENA_HEADER_SIZE, page_size);
range_arena = arena_alloc(.reserve_size = range_size+ARENA_HEADER_SIZE, .commit_size = range_size+ARENA_HEADER_SIZE);
if(range_arena == 0)
{
range_size = 0;
}
else
{
range_base = push_array_no_zero(range_arena, U8, range_size);
U64 bytes_read = 0;
U64 retry_count = 0;
U64 retry_limit = range_size > page_size ? 64 : 0;
for(Rng1U64 vaddr_range_clamped_retry = vaddr_range_clamped;
retry_count <= retry_limit;
retry_count += 1)
{
bytes_read = dmn_process_read(process.dmn_handle, vaddr_range_clamped_retry, range_base);
if(bytes_read == 0 && vaddr_range_clamped_retry.max > vaddr_range_clamped_retry.min)
{
U64 diff = (vaddr_range_clamped_retry.max-vaddr_range_clamped_retry.min)/2;
vaddr_range_clamped_retry.max -= diff;
vaddr_range_clamped_retry.max = AlignDownPow2(vaddr_range_clamped_retry.max, page_size);
if(diff == 0)
{
break;
}
}
else
{
break;
}
}
if(bytes_read == 0)
{
arena_release(range_arena);
range_base = 0;
range_size = 0;
range_arena = 0;
}
else if(bytes_read < range_size)
{
MemoryZero((U8 *)range_base + bytes_read, range_size-bytes_read);
}
zero_terminated_size = range_size;
if(zero_terminated)
{
for(U64 idx = 0; idx < bytes_read; idx += 1)
{
if(((U8 *)range_base)[idx] == 0)
{
zero_terminated_size = idx;
break;
}
}
U64 bytes_overkill = (bytes_read - zero_terminated_size);
arena_pop(range_arena, bytes_overkill);
}
}
}
U64 post_read_mem_gen = ctrl_mem_gen();
B32 post_run_state = ins_atomic_u64_eval(&ctrl_state->ctrl_thread_run_state);
//- rjf: form content key
C_Key content_key = {0};
{
content_key.id.u128[0] = u128_hash_from_str8(key);
}
//- rjf: read successful -> submit to hash store
U128 hash = {0};
if(range_base != 0 && pre_read_mem_gen == post_read_mem_gen)
{
hash = c_submit_data(content_key, &range_arena, str8((U8*)range_base, zero_terminated_size));
}
else if(range_arena != 0)
{
arena_release(range_arena);
}
//- rjf: wakeup on new reads
if(!u128_match(u128_zero(), hash))
{
if(ctrl_state->wakeup_hook != 0)
{
ctrl_state->wakeup_hook();
}
}
//- rjf: bundle content key as artifact
StaticAssert(sizeof(content_key) == sizeof(artifact), artifact_key_size_check);
MemoryCopyStruct(&artifact, &content_key);
}
lane_sync_u64(&artifact.u64[0], 0);
lane_sync_u64(&artifact.u64[1], 0);
lane_sync_u64(&artifact.u64[2], 0);
lane_sync_u64(&artifact.u64[3], 0);
return artifact;
}
@@ -7597,3 +7604,23 @@ ctrl_memory_artifact_destroy(AC_Artifact artifact)
MemoryCopyStruct(&key, &artifact);
c_close_key(key);
}
internal C_Key
ctrl_key_from_process_vaddr_range_new(CTRL_Handle process, Rng1U64 vaddr_range, B32 zero_terminated, U64 endt_us, B32 *out_is_stale)
{
ProfBeginFunction();
struct
{
CTRL_Handle process;
Rng1U64 vaddr_range;
B32 zero_terminated;
} key_data = {process, vaddr_range, zero_terminated};
String8 key = str8_struct(&key_data);
Access *access = access_open();
AC_Artifact artifact = ac_artifact_from_key(access, key, ctrl_mem_gen(), ctrl_memory_artifact_create, ctrl_memory_artifact_destroy, 2048);
C_Key content_key = {0};
MemoryCopyStruct(&content_key, &artifact);
access_close(access);
ProfEnd();
return content_key;
}