diff --git a/src/base/base_entry_point.c b/src/base/base_entry_point.c index d2d1e966..a0de2ba9 100644 --- a/src/base/base_entry_point.c +++ b/src/base/base_entry_point.c @@ -6,6 +6,7 @@ global CondVar async_tick_start_cond_var = {0}; global CondVar async_tick_stop_cond_var = {0}; global Mutex async_tick_start_mutex = {0}; global Mutex async_tick_stop_mutex = {0}; +global U64 async_wait_timeout = 0; global B32 global_async_exit = 0; internal void diff --git a/src/dasm_cache/dasm_cache.c b/src/dasm_cache/dasm_cache.c index c6b10a54..a3754f7d 100644 --- a/src/dasm_cache/dasm_cache.c +++ b/src/dasm_cache/dasm_cache.c @@ -538,13 +538,20 @@ dasm_tick(void) arena_clear(dasm_shared->req_arena); dasm_shared->first_req = dasm_shared->last_req = 0; dasm_shared->req_count = 0; + dasm_shared->lane_req_take_counter = 0; } lane_sync(); //- rjf: do requests - Rng1U64 range = lane_range(reqs_count); - for EachInRange(req_idx, range) + for(;;) { + //- rjf: get next request + U64 req_num = ins_atomic_u64_inc_eval(&dasm_shared->lane_req_take_counter); + if(req_num < 1 || reqs_count < req_num) + { + break; + } + U64 req_idx = req_num-1; HS_Scope *hs_scope = hs_scope_open(); DI_Scope *di_scope = di_scope_open(); TXT_Scope *txt_scope = txt_scope_open(); @@ -766,7 +773,7 @@ dasm_tick(void) } //- rjf: re-request if stale - MutexScope(dasm_shared->req_mutex) + if(stale) MutexScope(dasm_shared->req_mutex) { DASM_RequestNode *req_n = push_array(dasm_shared->req_arena, DASM_RequestNode, 1); SLLQueuePush(dasm_shared->first_req, dasm_shared->last_req, req_n); @@ -781,6 +788,7 @@ dasm_tick(void) di_scope_close(di_scope); hs_scope_close(hs_scope); } + lane_sync(); scratch_end(scratch); ProfEnd(); diff --git a/src/dasm_cache/dasm_cache.h b/src/dasm_cache/dasm_cache.h index 855cf348..26690ec6 100644 --- a/src/dasm_cache/dasm_cache.h +++ b/src/dasm_cache/dasm_cache.h @@ -268,6 +268,9 @@ struct DASM_Shared DASM_RequestNode *first_req; DASM_RequestNode *last_req; U64 req_count; + + // rjf: request take counter + U64 lane_req_take_counter; }; //////////////////////////////// diff --git a/src/file_stream/file_stream.c b/src/file_stream/file_stream.c index d7a0dd02..488a1552 100644 --- a/src/file_stream/file_stream.c +++ b/src/file_stream/file_stream.c @@ -310,14 +310,20 @@ fs_tick(void) arena_clear(fs_shared->req_arena); fs_shared->first_req = fs_shared->last_req = 0; fs_shared->req_count = 0; + fs_shared->lane_req_take_counter = 0; } lane_sync(); //- rjf: do requests - Rng1U64 range = lane_range(reqs_count); - for EachInRange(req_idx, range) + for(;;) { //- rjf: unpack + U64 req_num = ins_atomic_u64_inc_eval(&fs_shared->lane_req_take_counter); + if(req_num < 1 || reqs_count < req_num) + { + break; + } + U64 req_idx = req_num-1; FS_Request *r = &reqs[req_idx]; HS_Key key = r->key; String8 path = r->path; @@ -394,6 +400,7 @@ fs_tick(void) } cond_var_broadcast(path_stripe->cv); } + lane_sync(); scratch_end(scratch); ProfEnd(); diff --git a/src/file_stream/file_stream.h b/src/file_stream/file_stream.h index 8bd6ba3b..ad084ca5 100644 --- a/src/file_stream/file_stream.h +++ b/src/file_stream/file_stream.h @@ -91,6 +91,9 @@ struct FS_Shared FS_RequestNode *first_req; FS_RequestNode *last_req; U64 req_count; + + // rjf: request take counter + U64 lane_req_take_counter; }; //////////////////////////////// diff --git a/src/hash_store/hash_store.c b/src/hash_store/hash_store.c index 48555c7c..7f7771fd 100644 --- a/src/hash_store/hash_store.c +++ b/src/hash_store/hash_store.c @@ -69,9 +69,9 @@ hs_init(void) hs_shared->arena = arena; hs_shared->slots_count = 4096; hs_shared->stripes_count = Min(hs_shared->slots_count, os_get_system_info()->logical_processor_count); - hs_shared->slots = push_array(arena, HS_Slot, hs_shared->slots_count); + hs_shared->slots = push_array(arena, HS_BlobSlot, hs_shared->slots_count); hs_shared->stripes = push_array(arena, HS_Stripe, hs_shared->stripes_count); - hs_shared->stripes_free_nodes = push_array(arena, HS_Node *, hs_shared->stripes_count); + hs_shared->stripes_free_nodes = push_array(arena, HS_BlobNode *, hs_shared->stripes_count); for(U64 idx = 0; idx < hs_shared->stripes_count; idx += 1) { HS_Stripe *stripe = &hs_shared->stripes[idx]; @@ -186,11 +186,11 @@ hs_root_release(HS_Root root) U128 hash = n->hash_history[(n->hash_history_gen+history_idx)%ArrayCount(n->hash_history)]; U64 hash_slot_idx = hash.u64[1]%hs_shared->slots_count; U64 hash_stripe_idx = hash_slot_idx%hs_shared->stripes_count; - HS_Slot *hash_slot = &hs_shared->slots[hash_slot_idx]; + HS_BlobSlot *hash_slot = &hs_shared->slots[hash_slot_idx]; HS_Stripe *hash_stripe = &hs_shared->stripes[hash_stripe_idx]; MutexScopeR(hash_stripe->rw_mutex) { - for(HS_Node *n = hash_slot->first; n != 0; n = n->next) + for(HS_BlobNode *n = hash_slot->first; n != 0; n = n->next) { if(u128_match(n->hash, hash)) { @@ -226,14 +226,14 @@ hs_submit_data(HS_Key key, Arena **data_arena, String8 data) U128 hash = hs_hash_from_data(data); U64 slot_idx = hash.u64[1]%hs_shared->slots_count; U64 stripe_idx = slot_idx%hs_shared->stripes_count; - HS_Slot *slot = &hs_shared->slots[slot_idx]; + HS_BlobSlot *slot = &hs_shared->slots[slot_idx]; HS_Stripe *stripe = &hs_shared->stripes[stripe_idx]; //- rjf: commit data to cache - if already there, just bump key refcount ProfScope("commit data to cache - if already there, just bump key refcount") MutexScopeW(stripe->rw_mutex) { - HS_Node *existing_node = 0; - for(HS_Node *n = slot->first; n != 0; n = n->next) + HS_BlobNode *existing_node = 0; + for(HS_BlobNode *n = slot->first; n != 0; n = n->next) { if(u128_match(n->hash, hash)) { @@ -243,14 +243,14 @@ hs_submit_data(HS_Key key, Arena **data_arena, String8 data) } if(existing_node == 0) { - HS_Node *node = hs_shared->stripes_free_nodes[stripe_idx]; + HS_BlobNode *node = hs_shared->stripes_free_nodes[stripe_idx]; if(node) { SLLStackPop(hs_shared->stripes_free_nodes[stripe_idx]); } else { - node = push_array(stripe->arena, HS_Node, 1); + node = push_array(stripe->arena, HS_BlobNode, 1); } node->hash = hash; if(data_arena != 0) @@ -359,11 +359,11 @@ hs_submit_data(HS_Key key, Arena **data_arena, String8 data) { U64 old_hash_slot_idx = key_expired_hash.u64[1]%hs_shared->slots_count; U64 old_hash_stripe_idx = old_hash_slot_idx%hs_shared->stripes_count; - HS_Slot *old_hash_slot = &hs_shared->slots[old_hash_slot_idx]; + HS_BlobSlot *old_hash_slot = &hs_shared->slots[old_hash_slot_idx]; HS_Stripe *old_hash_stripe = &hs_shared->stripes[old_hash_stripe_idx]; MutexScopeR(old_hash_stripe->rw_mutex) { - for(HS_Node *n = old_hash_slot->first; n != 0; n = n->next) + for(HS_BlobNode *n = old_hash_slot->first; n != 0; n = n->next) { if(u128_match(n->hash, key_expired_hash)) { @@ -411,11 +411,11 @@ hs_scope_close(HS_Scope *scope) next = touch->next; U64 slot_idx = hash.u64[1]%hs_shared->slots_count; U64 stripe_idx = slot_idx%hs_shared->stripes_count; - HS_Slot *slot = &hs_shared->slots[slot_idx]; + HS_BlobSlot *slot = &hs_shared->slots[slot_idx]; HS_Stripe *stripe = &hs_shared->stripes[stripe_idx]; MutexScopeR(stripe->rw_mutex) { - for(HS_Node *n = slot->first; n != 0; n = n->next) + for(HS_BlobNode *n = slot->first; n != 0; n = n->next) { if(u128_match(hash, n->hash)) { @@ -430,7 +430,7 @@ hs_scope_close(HS_Scope *scope) } internal void -hs_scope_touch_node__stripe_r_guarded(HS_Scope *scope, HS_Node *node) +hs_scope_touch_node__stripe_r_guarded(HS_Scope *scope, HS_BlobNode *node) { HS_Touch *touch = hs_tctx->free_touch; ins_atomic_u64_inc_eval(&node->scope_ref_count); @@ -455,11 +455,11 @@ hs_hash_downstream_inc(U128 hash) { U64 slot_idx = hash.u64[1]%hs_shared->slots_count; U64 stripe_idx = slot_idx%hs_shared->stripes_count; - HS_Slot *slot = &hs_shared->slots[slot_idx]; + HS_BlobSlot *slot = &hs_shared->slots[slot_idx]; HS_Stripe *stripe = &hs_shared->stripes[stripe_idx]; MutexScopeR(stripe->rw_mutex) { - for(HS_Node *n = slot->first; n != 0; n = n->next) + for(HS_BlobNode *n = slot->first; n != 0; n = n->next) { if(u128_match(hash, n->hash)) { @@ -475,11 +475,11 @@ hs_hash_downstream_dec(U128 hash) { U64 slot_idx = hash.u64[1]%hs_shared->slots_count; U64 stripe_idx = slot_idx%hs_shared->stripes_count; - HS_Slot *slot = &hs_shared->slots[slot_idx]; + HS_BlobSlot *slot = &hs_shared->slots[slot_idx]; HS_Stripe *stripe = &hs_shared->stripes[stripe_idx]; MutexScopeR(stripe->rw_mutex) { - for(HS_Node *n = slot->first; n != 0; n = n->next) + for(HS_BlobNode *n = slot->first; n != 0; n = n->next) { if(u128_match(hash, n->hash)) { @@ -523,11 +523,11 @@ hs_data_from_hash(HS_Scope *scope, U128 hash) String8 result = {0}; U64 slot_idx = hash.u64[1]%hs_shared->slots_count; U64 stripe_idx = slot_idx%hs_shared->stripes_count; - HS_Slot *slot = &hs_shared->slots[slot_idx]; + HS_BlobSlot *slot = &hs_shared->slots[slot_idx]; HS_Stripe *stripe = &hs_shared->stripes[stripe_idx]; MutexScopeR(stripe->rw_mutex) { - for(HS_Node *n = slot->first; n != 0; n = n->next) + for(HS_BlobNode *n = slot->first; n != 0; n = n->next) { if(u128_match(n->hash, hash)) { @@ -552,12 +552,12 @@ hs_tick(void) for EachInRange(slot_idx, range) { U64 stripe_idx = slot_idx%hs_shared->stripes_count; - HS_Slot *slot = &hs_shared->slots[slot_idx]; + HS_BlobSlot *slot = &hs_shared->slots[slot_idx]; HS_Stripe *stripe = &hs_shared->stripes[stripe_idx]; B32 slot_has_work = 0; MutexScopeR(stripe->rw_mutex) { - for(HS_Node *n = slot->first; n != 0; n = n->next) + for(HS_BlobNode *n = slot->first; n != 0; n = n->next) { U64 key_ref_count = ins_atomic_u64_eval(&n->key_ref_count); U64 scope_ref_count = ins_atomic_u64_eval(&n->scope_ref_count); @@ -571,7 +571,7 @@ hs_tick(void) } if(slot_has_work) MutexScopeW(stripe->rw_mutex) { - for(HS_Node *n = slot->first, *next = 0; n != 0; n = next) + for(HS_BlobNode *n = slot->first, *next = 0; n != 0; n = next) { next = n->next; U64 key_ref_count = ins_atomic_u64_eval(&n->key_ref_count); diff --git a/src/hash_store/hash_store.h b/src/hash_store/hash_store.h index 488b8730..73dce513 100644 --- a/src/hash_store/hash_store.h +++ b/src/hash_store/hash_store.h @@ -64,7 +64,7 @@ struct HS_Key }; //////////////////////////////// -//~ rjf: Cache Types +//~ rjf: Root Cache Types typedef struct HS_RootIDChunkNode HS_RootIDChunkNode; struct HS_RootIDChunkNode @@ -101,6 +101,9 @@ struct HS_RootSlot HS_RootNode *last; }; +//////////////////////////////// +//~ rjf: Key Cache Types + #define HS_KEY_HASH_HISTORY_COUNT 64 #define HS_KEY_HASH_HISTORY_STRONG_REF_COUNT 2 @@ -121,11 +124,14 @@ struct HS_KeySlot HS_KeyNode *last; }; -typedef struct HS_Node HS_Node; -struct HS_Node +//////////////////////////////// +//~ rjf: Content Blob Cache Types + +typedef struct HS_BlobNode HS_BlobNode; +struct HS_BlobNode { - HS_Node *next; - HS_Node *prev; + HS_BlobNode *next; + HS_BlobNode *prev; U128 hash; Arena *arena; String8 data; @@ -134,11 +140,11 @@ struct HS_Node U64 downstream_ref_count; }; -typedef struct HS_Slot HS_Slot; -struct HS_Slot +typedef struct HS_BlobSlot HS_BlobSlot; +struct HS_BlobSlot { - HS_Node *first; - HS_Node *last; + HS_BlobNode *first; + HS_BlobNode *last; }; typedef struct HS_Stripe HS_Stripe; @@ -188,9 +194,9 @@ struct HS_Shared // rjf: main data cache U64 slots_count; U64 stripes_count; - HS_Slot *slots; + HS_BlobSlot *slots; HS_Stripe *stripes; - HS_Node **stripes_free_nodes; + HS_BlobNode **stripes_free_nodes; // rjf: key cache U64 key_slots_count; @@ -245,7 +251,7 @@ internal U128 hs_submit_data(HS_Key key, Arena **data_arena, String8 data); internal HS_Scope *hs_scope_open(void); internal void hs_scope_close(HS_Scope *scope); -internal void hs_scope_touch_node__stripe_r_guarded(HS_Scope *scope, HS_Node *node); +internal void hs_scope_touch_node__stripe_r_guarded(HS_Scope *scope, HS_BlobNode *node); //////////////////////////////// //~ rjf: Downstream Accesses