shift file streaming layer to using async layer for background work, remove file stream layer thread pool

This commit is contained in:
Ryan Fleury
2024-10-31 11:44:54 -07:00
parent f3a36ece13
commit 86d9b792d8
6 changed files with 115 additions and 101 deletions
+15 -3
View File
@@ -40,9 +40,10 @@ async_push_work_(ASYNC_WorkFunctionType *work_function, ASYNC_WorkParams *params
B32 result = 0;
ASYNC_Work work = {0};
work.work_function = work_function;
work.input = params->input;
work.output = params->output;
work.semaphore = params->semaphore;
work.input = params->input;
work.output = params->output;
work.semaphore = params->semaphore;
work.completion_counter = params->completion_counter;
OS_MutexScope(async_shared->u2w_ring_mutex) for(;;)
{
U64 unconsumed_size = async_shared->u2w_ring_write_pos - async_shared->u2w_ring_read_pos;
@@ -63,6 +64,10 @@ async_push_work_(ASYNC_WorkFunctionType *work_function, ASYNC_WorkParams *params
}
os_condition_variable_wait(async_shared->u2w_ring_cv, async_shared->u2w_ring_mutex, params->endt_us);
}
if(result)
{
os_condition_variable_broadcast(async_shared->u2w_ring_cv);
}
return result;
}
@@ -127,6 +132,7 @@ async_work_thread__entry_point(void *p)
}
os_condition_variable_wait(async_shared->u2w_ring_cv, async_shared->u2w_ring_mutex, max_U64);
}
os_condition_variable_broadcast(async_shared->u2w_ring_cv);
//- rjf: run work
void *work_out = work.work_function(thread_idx, work.input);
@@ -142,5 +148,11 @@ async_work_thread__entry_point(void *p)
{
os_semaphore_drop(work.semaphore);
}
//- rjf: increment completion counter
if(work.completion_counter != 0)
{
ins_atomic_u64_inc_eval(work.completion_counter);
}
}
}
+2
View File
@@ -20,6 +20,7 @@ struct ASYNC_WorkParams
void *input;
void **output;
OS_Handle semaphore;
U64 *completion_counter;
U64 endt_us;
};
@@ -30,6 +31,7 @@ struct ASYNC_Work
void *input;
void **output;
OS_Handle semaphore;
U64 *completion_counter;
};
////////////////////////////////
+1 -1
View File
@@ -214,7 +214,7 @@
# define ins_atomic_ptr_eval_assign(x,c) (void*)ins_atomic_u32_eval_assign((volatile U32 *)(x), (U32)(c))
# define ins_atomic_ptr_eval(x) (void*)ins_atomic_u32_eval((volatile U32 *)x)
#else
# error Atomic intrinsics for pointers not defined for this achitecture.
# error Atomic intrinsics for pointers not defined for this architecture.
#endif
////////////////////////////////
+3
View File
@@ -27,6 +27,9 @@ main_thread_base_entry_point(void (*entry_point)(CmdLine *cmdline), char **argum
}
//- rjf: initialize all included layers
#if defined(ASYNC_H) && !defined(ASYNC_INIT_MANUAL)
async_init();
#endif
#if defined(TASK_SYSTEM_H) && !defined(TS_INIT_MANUAL)
ts_init();
#endif
+90 -93
View File
@@ -57,7 +57,7 @@ fs_init(void)
fs_shared->streamers = push_array(arena, OS_Handle, 1);
for(U64 idx = 0; idx < fs_shared->streamer_count; idx += 1)
{
fs_shared->streamers[idx] = os_thread_launch(fs_streamer_thread__entry_point, (void *)idx, 0);
// fs_shared->streamers[idx] = os_thread_launch(fs_streamer_thread__entry_point, (void *)idx, 0);
}
fs_shared->detector_thread = os_thread_launch(fs_detector_thread__entry_point, 0, 0);
}
@@ -174,10 +174,11 @@ fs_hash_from_path_range(String8 path, Rng1U64 range, U64 endt_us)
}
// rjf: try to send stream request
if(!ins_atomic_u32_eval_cond_assign(&range_node->is_working, 1, 0) &&
!fs_u2s_enqueue_req(range, path, endt_us))
if(ins_atomic_u64_eval(&range_node->request_count) == ins_atomic_u64_eval(&range_node->completion_count) &&
async_push_work(fs_stream_work, .endt_us = endt_us, .completion_counter = &range_node->completion_count))
{
ins_atomic_u32_eval_assign(&range_node->is_working, 0);
fs_u2s_enqueue_req(range, path, max_U64);
ins_atomic_u64_inc_eval(&range_node->request_count);
}
// rjf: try to reobtain results
@@ -317,108 +318,103 @@ fs_u2s_dequeue_req(Arena *arena, Rng1U64 *range_out, String8 *path_out)
os_condition_variable_broadcast(fs_shared->u2s_ring_cv);
}
internal void
fs_streamer_thread__entry_point(void *p)
ASYNC_WORK_DEF(fs_stream_work)
{
ThreadNameF("[fs] streamer #%I64u", (U64)p);
for(;;)
Temp scratch = scratch_begin(0, 0);
//- rjf: get next request
Rng1U64 range = {0};
String8 path = {0};
fs_u2s_dequeue_req(scratch.arena, &range, &path);
//- rjf: unpack request
U128 key = fs_big_hash_from_string_range(path, range);
U64 path_hash = fs_little_hash_from_string(path);
U64 path_slot_idx = path_hash%fs_shared->slots_count;
U64 path_stripe_idx = path_slot_idx%fs_shared->stripes_count;
FS_Slot *path_slot = &fs_shared->slots[path_slot_idx];
FS_Stripe *path_stripe = &fs_shared->stripes[path_stripe_idx];
//- rjf: load
ProfBegin("load \"%.*s\"", str8_varg(path));
FileProperties pre_props = os_properties_from_file_path(path);
U64 range_size = dim_1u64(range);
U64 read_size = Min(pre_props.size, range_size);
OS_Handle file = os_file_open(OS_AccessFlag_Read|OS_AccessFlag_ShareRead|OS_AccessFlag_ShareWrite, path);
U64 data_arena_size = read_size+ARENA_HEADER_SIZE;
data_arena_size += KB(4)-1;
data_arena_size -= data_arena_size%KB(4);
ProfBegin("allocate");
Arena *data_arena = arena_alloc(.reserve_size = data_arena_size, .commit_size = data_arena_size);
ProfEnd();
ProfBegin("read");
String8 data = os_string_from_file_range(data_arena, file, r1u64(range.min, range.min+read_size));
ProfEnd();
os_file_close(file);
FileProperties post_props = os_properties_from_file_path(path);
//- rjf: abort if modification timestamps differ - we did not successfully read the file
if(pre_props.modified != post_props.modified)
{
Temp scratch = scratch_begin(0, 0);
//- rjf: get next request
Rng1U64 range = {0};
String8 path = {0};
fs_u2s_dequeue_req(scratch.arena, &range, &path);
//- rjf: unpack request
U128 key = fs_big_hash_from_string_range(path, range);
U64 path_hash = fs_little_hash_from_string(path);
U64 path_slot_idx = path_hash%fs_shared->slots_count;
U64 path_stripe_idx = path_slot_idx%fs_shared->stripes_count;
FS_Slot *path_slot = &fs_shared->slots[path_slot_idx];
FS_Stripe *path_stripe = &fs_shared->stripes[path_stripe_idx];
//- rjf: load
ProfBegin("load \"%.*s\"", str8_varg(path));
FileProperties pre_props = os_properties_from_file_path(path);
U64 range_size = dim_1u64(range);
U64 read_size = Min(pre_props.size, range_size);
OS_Handle file = os_file_open(OS_AccessFlag_Read|OS_AccessFlag_ShareRead|OS_AccessFlag_ShareWrite, path);
U64 data_arena_size = read_size+ARENA_HEADER_SIZE;
data_arena_size += KB(4)-1;
data_arena_size -= data_arena_size%KB(4);
ProfBegin("allocate");
Arena *data_arena = arena_alloc(.reserve_size = data_arena_size, .commit_size = data_arena_size);
ProfEnd();
ProfBegin("read");
String8 data = os_string_from_file_range(data_arena, file, r1u64(range.min, range.min+read_size));
ProfEnd();
os_file_close(file);
FileProperties post_props = os_properties_from_file_path(path);
//- rjf: abort if modification timestamps differ - we did not successfully read the file
if(pre_props.modified != post_props.modified)
ProfScope("abort")
{
ProfScope("abort")
arena_release(data_arena);
MemoryZeroStruct(&data);
data_arena = 0;
}
}
//- rjf: submit
else
{
ProfScope("submit")
{
hs_submit_data(key, &data_arena, data);
}
}
//- rjf: commit info to cache
ProfScope("commit to cache") OS_MutexScopeW(path_stripe->rw_mutex)
{
FS_Node *node = 0;
for(FS_Node *n = path_slot->first; n != 0; n = n->next)
{
if(str8_match(n->path, path, 0))
{
arena_release(data_arena);
MemoryZeroStruct(&data);
data_arena = 0;
node = n;
break;
}
}
//- rjf: submit
else
if(node != 0)
{
ProfScope("submit")
if(node->timestamp != 0)
{
hs_submit_data(key, &data_arena, data);
ins_atomic_u64_inc_eval(&fs_shared->change_gen);
}
}
//- rjf: commit info to cache
ProfScope("commit to cache") OS_MutexScopeW(path_stripe->rw_mutex)
{
FS_Node *node = 0;
for(FS_Node *n = path_slot->first; n != 0; n = n->next)
if(post_props.modified == pre_props.modified)
{
if(str8_match(n->path, path, 0))
node->timestamp = post_props.modified;
node->size = post_props.size;
}
U64 range_hash = fs_little_hash_from_string(str8_struct(&range));
U64 range_slot_idx = range_hash%node->slots_count;
FS_RangeSlot *range_slot = &node->slots[range_slot_idx];
FS_RangeNode *range_node = 0;
for(FS_RangeNode *n = range_slot->first; n != 0; n = n->next)
{
if(MemoryMatchStruct(&n->range, &range))
{
node = n;
range_node = n;
break;
}
}
if(node != 0)
{
if(node->timestamp != 0)
{
ins_atomic_u64_inc_eval(&fs_shared->change_gen);
}
if(post_props.modified == pre_props.modified)
{
node->timestamp = post_props.modified;
node->size = post_props.size;
}
U64 range_hash = fs_little_hash_from_string(str8_struct(&range));
U64 range_slot_idx = range_hash%node->slots_count;
FS_RangeSlot *range_slot = &node->slots[range_slot_idx];
FS_RangeNode *range_node = 0;
for(FS_RangeNode *n = range_slot->first; n != 0; n = n->next)
{
if(MemoryMatchStruct(&n->range, &range))
{
range_node = n;
break;
}
}
ins_atomic_u32_eval_assign(&range_node->is_working, 0);
}
}
os_condition_variable_broadcast(path_stripe->cv);
ProfEnd();
scratch_end(scratch);
}
os_condition_variable_broadcast(path_stripe->cv);
ProfEnd();
scratch_end(scratch);
return 0;
}
////////////////////////////////
@@ -449,10 +445,11 @@ fs_detector_thread__entry_point(void *p)
range_n != 0;
range_n = range_n->next)
{
if(!ins_atomic_u32_eval_cond_assign(&range_n->is_working, 1, 0) &&
!fs_u2s_enqueue_req(range_n->range, n->path, os_now_microseconds()+100000))
if(ins_atomic_u64_eval(&range_n->request_count) == ins_atomic_u64_eval(&range_n->completion_count) &&
async_push_work(fs_stream_work, .endt_us = os_now_microseconds()+100000, .completion_counter = &range_n->completion_count))
{
ins_atomic_u32_eval_assign(&range_n->is_working, 0);
fs_u2s_enqueue_req(range_n->range, n->path, max_U64);
ins_atomic_u64_inc_eval(&range_n->request_count);
}
}
}
+4 -4
View File
@@ -12,7 +12,8 @@ struct FS_RangeNode
{
FS_RangeNode *next;
Rng1U64 range;
B32 is_working;
U64 request_count;
U64 completion_count;
};
typedef struct FS_RangeSlot FS_RangeSlot;
@@ -114,12 +115,11 @@ internal U64 fs_timestamp_from_path(String8 path);
internal U64 fs_size_from_path(String8 path);
////////////////////////////////
//~ rjf: Streamer Threads
//~ rjf: Streaming Work
internal B32 fs_u2s_enqueue_req(Rng1U64 range, String8 path, U64 endt_us);
internal void fs_u2s_dequeue_req(Arena *arena, Rng1U64 *range_out, String8 *path_out);
internal void fs_streamer_thread__entry_point(void *p);
ASYNC_WORK_DEF(fs_stream_work);
////////////////////////////////
//~ rjf: Change Detector Thread