diff --git a/src/async/async.c b/src/async/async.c index c3a6b145..8ec60167 100644 --- a/src/async/async.c +++ b/src/async/async.c @@ -40,9 +40,10 @@ async_push_work_(ASYNC_WorkFunctionType *work_function, ASYNC_WorkParams *params B32 result = 0; ASYNC_Work work = {0}; work.work_function = work_function; - work.input = params->input; - work.output = params->output; - work.semaphore = params->semaphore; + work.input = params->input; + work.output = params->output; + work.semaphore = params->semaphore; + work.completion_counter = params->completion_counter; OS_MutexScope(async_shared->u2w_ring_mutex) for(;;) { U64 unconsumed_size = async_shared->u2w_ring_write_pos - async_shared->u2w_ring_read_pos; @@ -63,6 +64,10 @@ async_push_work_(ASYNC_WorkFunctionType *work_function, ASYNC_WorkParams *params } os_condition_variable_wait(async_shared->u2w_ring_cv, async_shared->u2w_ring_mutex, params->endt_us); } + if(result) + { + os_condition_variable_broadcast(async_shared->u2w_ring_cv); + } return result; } @@ -127,6 +132,7 @@ async_work_thread__entry_point(void *p) } os_condition_variable_wait(async_shared->u2w_ring_cv, async_shared->u2w_ring_mutex, max_U64); } + os_condition_variable_broadcast(async_shared->u2w_ring_cv); //- rjf: run work void *work_out = work.work_function(thread_idx, work.input); @@ -142,5 +148,11 @@ async_work_thread__entry_point(void *p) { os_semaphore_drop(work.semaphore); } + + //- rjf: increment completion counter + if(work.completion_counter != 0) + { + ins_atomic_u64_inc_eval(work.completion_counter); + } } } diff --git a/src/async/async.h b/src/async/async.h index 44b171ef..ffdc9953 100644 --- a/src/async/async.h +++ b/src/async/async.h @@ -20,6 +20,7 @@ struct ASYNC_WorkParams void *input; void **output; OS_Handle semaphore; + U64 *completion_counter; U64 endt_us; }; @@ -30,6 +31,7 @@ struct ASYNC_Work void *input; void **output; OS_Handle semaphore; + U64 *completion_counter; }; //////////////////////////////// diff --git a/src/base/base_core.h b/src/base/base_core.h index 8a5fba19..6c7147d6 100644 --- a/src/base/base_core.h +++ b/src/base/base_core.h @@ -214,7 +214,7 @@ # define ins_atomic_ptr_eval_assign(x,c) (void*)ins_atomic_u32_eval_assign((volatile U32 *)(x), (U32)(c)) # define ins_atomic_ptr_eval(x) (void*)ins_atomic_u32_eval((volatile U32 *)x) #else -# error Atomic intrinsics for pointers not defined for this achitecture. +# error Atomic intrinsics for pointers not defined for this architecture. #endif //////////////////////////////// diff --git a/src/base/base_entry_point.c b/src/base/base_entry_point.c index 4d6a6460..205922e6 100644 --- a/src/base/base_entry_point.c +++ b/src/base/base_entry_point.c @@ -27,6 +27,9 @@ main_thread_base_entry_point(void (*entry_point)(CmdLine *cmdline), char **argum } //- rjf: initialize all included layers +#if defined(ASYNC_H) && !defined(ASYNC_INIT_MANUAL) + async_init(); +#endif #if defined(TASK_SYSTEM_H) && !defined(TS_INIT_MANUAL) ts_init(); #endif diff --git a/src/file_stream/file_stream.c b/src/file_stream/file_stream.c index acb7c129..a9cfbe0c 100644 --- a/src/file_stream/file_stream.c +++ b/src/file_stream/file_stream.c @@ -57,7 +57,7 @@ fs_init(void) fs_shared->streamers = push_array(arena, OS_Handle, 1); for(U64 idx = 0; idx < fs_shared->streamer_count; idx += 1) { - fs_shared->streamers[idx] = os_thread_launch(fs_streamer_thread__entry_point, (void *)idx, 0); + // fs_shared->streamers[idx] = os_thread_launch(fs_streamer_thread__entry_point, (void *)idx, 0); } fs_shared->detector_thread = os_thread_launch(fs_detector_thread__entry_point, 0, 0); } @@ -174,10 +174,11 @@ fs_hash_from_path_range(String8 path, Rng1U64 range, U64 endt_us) } // rjf: try to send stream request - if(!ins_atomic_u32_eval_cond_assign(&range_node->is_working, 1, 0) && - !fs_u2s_enqueue_req(range, path, endt_us)) + if(ins_atomic_u64_eval(&range_node->request_count) == ins_atomic_u64_eval(&range_node->completion_count) && + async_push_work(fs_stream_work, .endt_us = endt_us, .completion_counter = &range_node->completion_count)) { - ins_atomic_u32_eval_assign(&range_node->is_working, 0); + fs_u2s_enqueue_req(range, path, max_U64); + ins_atomic_u64_inc_eval(&range_node->request_count); } // rjf: try to reobtain results @@ -317,108 +318,103 @@ fs_u2s_dequeue_req(Arena *arena, Rng1U64 *range_out, String8 *path_out) os_condition_variable_broadcast(fs_shared->u2s_ring_cv); } -internal void -fs_streamer_thread__entry_point(void *p) +ASYNC_WORK_DEF(fs_stream_work) { - ThreadNameF("[fs] streamer #%I64u", (U64)p); - for(;;) + Temp scratch = scratch_begin(0, 0); + + //- rjf: get next request + Rng1U64 range = {0}; + String8 path = {0}; + fs_u2s_dequeue_req(scratch.arena, &range, &path); + + //- rjf: unpack request + U128 key = fs_big_hash_from_string_range(path, range); + U64 path_hash = fs_little_hash_from_string(path); + U64 path_slot_idx = path_hash%fs_shared->slots_count; + U64 path_stripe_idx = path_slot_idx%fs_shared->stripes_count; + FS_Slot *path_slot = &fs_shared->slots[path_slot_idx]; + FS_Stripe *path_stripe = &fs_shared->stripes[path_stripe_idx]; + + //- rjf: load + ProfBegin("load \"%.*s\"", str8_varg(path)); + FileProperties pre_props = os_properties_from_file_path(path); + U64 range_size = dim_1u64(range); + U64 read_size = Min(pre_props.size, range_size); + OS_Handle file = os_file_open(OS_AccessFlag_Read|OS_AccessFlag_ShareRead|OS_AccessFlag_ShareWrite, path); + U64 data_arena_size = read_size+ARENA_HEADER_SIZE; + data_arena_size += KB(4)-1; + data_arena_size -= data_arena_size%KB(4); + ProfBegin("allocate"); + Arena *data_arena = arena_alloc(.reserve_size = data_arena_size, .commit_size = data_arena_size); + ProfEnd(); + ProfBegin("read"); + String8 data = os_string_from_file_range(data_arena, file, r1u64(range.min, range.min+read_size)); + ProfEnd(); + os_file_close(file); + FileProperties post_props = os_properties_from_file_path(path); + + //- rjf: abort if modification timestamps differ - we did not successfully read the file + if(pre_props.modified != post_props.modified) { - Temp scratch = scratch_begin(0, 0); - - //- rjf: get next request - Rng1U64 range = {0}; - String8 path = {0}; - fs_u2s_dequeue_req(scratch.arena, &range, &path); - - //- rjf: unpack request - U128 key = fs_big_hash_from_string_range(path, range); - U64 path_hash = fs_little_hash_from_string(path); - U64 path_slot_idx = path_hash%fs_shared->slots_count; - U64 path_stripe_idx = path_slot_idx%fs_shared->stripes_count; - FS_Slot *path_slot = &fs_shared->slots[path_slot_idx]; - FS_Stripe *path_stripe = &fs_shared->stripes[path_stripe_idx]; - - //- rjf: load - ProfBegin("load \"%.*s\"", str8_varg(path)); - FileProperties pre_props = os_properties_from_file_path(path); - U64 range_size = dim_1u64(range); - U64 read_size = Min(pre_props.size, range_size); - OS_Handle file = os_file_open(OS_AccessFlag_Read|OS_AccessFlag_ShareRead|OS_AccessFlag_ShareWrite, path); - U64 data_arena_size = read_size+ARENA_HEADER_SIZE; - data_arena_size += KB(4)-1; - data_arena_size -= data_arena_size%KB(4); - ProfBegin("allocate"); - Arena *data_arena = arena_alloc(.reserve_size = data_arena_size, .commit_size = data_arena_size); - ProfEnd(); - ProfBegin("read"); - String8 data = os_string_from_file_range(data_arena, file, r1u64(range.min, range.min+read_size)); - ProfEnd(); - os_file_close(file); - FileProperties post_props = os_properties_from_file_path(path); - - //- rjf: abort if modification timestamps differ - we did not successfully read the file - if(pre_props.modified != post_props.modified) + ProfScope("abort") { - ProfScope("abort") + arena_release(data_arena); + MemoryZeroStruct(&data); + data_arena = 0; + } + } + + //- rjf: submit + else + { + ProfScope("submit") + { + hs_submit_data(key, &data_arena, data); + } + } + + //- rjf: commit info to cache + ProfScope("commit to cache") OS_MutexScopeW(path_stripe->rw_mutex) + { + FS_Node *node = 0; + for(FS_Node *n = path_slot->first; n != 0; n = n->next) + { + if(str8_match(n->path, path, 0)) { - arena_release(data_arena); - MemoryZeroStruct(&data); - data_arena = 0; + node = n; + break; } } - - //- rjf: submit - else + if(node != 0) { - ProfScope("submit") + if(node->timestamp != 0) { - hs_submit_data(key, &data_arena, data); + ins_atomic_u64_inc_eval(&fs_shared->change_gen); } - } - - //- rjf: commit info to cache - ProfScope("commit to cache") OS_MutexScopeW(path_stripe->rw_mutex) - { - FS_Node *node = 0; - for(FS_Node *n = path_slot->first; n != 0; n = n->next) + if(post_props.modified == pre_props.modified) { - if(str8_match(n->path, path, 0)) + node->timestamp = post_props.modified; + node->size = post_props.size; + } + U64 range_hash = fs_little_hash_from_string(str8_struct(&range)); + U64 range_slot_idx = range_hash%node->slots_count; + FS_RangeSlot *range_slot = &node->slots[range_slot_idx]; + FS_RangeNode *range_node = 0; + for(FS_RangeNode *n = range_slot->first; n != 0; n = n->next) + { + if(MemoryMatchStruct(&n->range, &range)) { - node = n; + range_node = n; break; } } - if(node != 0) - { - if(node->timestamp != 0) - { - ins_atomic_u64_inc_eval(&fs_shared->change_gen); - } - if(post_props.modified == pre_props.modified) - { - node->timestamp = post_props.modified; - node->size = post_props.size; - } - U64 range_hash = fs_little_hash_from_string(str8_struct(&range)); - U64 range_slot_idx = range_hash%node->slots_count; - FS_RangeSlot *range_slot = &node->slots[range_slot_idx]; - FS_RangeNode *range_node = 0; - for(FS_RangeNode *n = range_slot->first; n != 0; n = n->next) - { - if(MemoryMatchStruct(&n->range, &range)) - { - range_node = n; - break; - } - } - ins_atomic_u32_eval_assign(&range_node->is_working, 0); - } } - os_condition_variable_broadcast(path_stripe->cv); - - ProfEnd(); - scratch_end(scratch); } + os_condition_variable_broadcast(path_stripe->cv); + + ProfEnd(); + scratch_end(scratch); + return 0; } //////////////////////////////// @@ -449,10 +445,11 @@ fs_detector_thread__entry_point(void *p) range_n != 0; range_n = range_n->next) { - if(!ins_atomic_u32_eval_cond_assign(&range_n->is_working, 1, 0) && - !fs_u2s_enqueue_req(range_n->range, n->path, os_now_microseconds()+100000)) + if(ins_atomic_u64_eval(&range_n->request_count) == ins_atomic_u64_eval(&range_n->completion_count) && + async_push_work(fs_stream_work, .endt_us = os_now_microseconds()+100000, .completion_counter = &range_n->completion_count)) { - ins_atomic_u32_eval_assign(&range_n->is_working, 0); + fs_u2s_enqueue_req(range_n->range, n->path, max_U64); + ins_atomic_u64_inc_eval(&range_n->request_count); } } } diff --git a/src/file_stream/file_stream.h b/src/file_stream/file_stream.h index 26d3a6f0..e948c5a1 100644 --- a/src/file_stream/file_stream.h +++ b/src/file_stream/file_stream.h @@ -12,7 +12,8 @@ struct FS_RangeNode { FS_RangeNode *next; Rng1U64 range; - B32 is_working; + U64 request_count; + U64 completion_count; }; typedef struct FS_RangeSlot FS_RangeSlot; @@ -114,12 +115,11 @@ internal U64 fs_timestamp_from_path(String8 path); internal U64 fs_size_from_path(String8 path); //////////////////////////////// -//~ rjf: Streamer Threads +//~ rjf: Streaming Work internal B32 fs_u2s_enqueue_req(Rng1U64 range, String8 path, U64 endt_us); internal void fs_u2s_dequeue_req(Arena *arena, Rng1U64 *range_out, String8 *path_out); - -internal void fs_streamer_thread__entry_point(void *p); +ASYNC_WORK_DEF(fs_stream_work); //////////////////////////////// //~ rjf: Change Detector Thread