switch from static lane distribution in file/dasm to dynamic counter

This commit is contained in:
Ryan Fleury
2025-09-18 14:19:00 -07:00
parent 7c08d6846b
commit 364e15491c
7 changed files with 68 additions and 40 deletions
+1
View File
@@ -6,6 +6,7 @@ global CondVar async_tick_start_cond_var = {0};
global CondVar async_tick_stop_cond_var = {0};
global Mutex async_tick_start_mutex = {0};
global Mutex async_tick_stop_mutex = {0};
global U64 async_wait_timeout = 0;
global B32 global_async_exit = 0;
internal void
+11 -3
View File
@@ -538,13 +538,20 @@ dasm_tick(void)
arena_clear(dasm_shared->req_arena);
dasm_shared->first_req = dasm_shared->last_req = 0;
dasm_shared->req_count = 0;
dasm_shared->lane_req_take_counter = 0;
}
lane_sync();
//- rjf: do requests
Rng1U64 range = lane_range(reqs_count);
for EachInRange(req_idx, range)
for(;;)
{
//- rjf: get next request
U64 req_num = ins_atomic_u64_inc_eval(&dasm_shared->lane_req_take_counter);
if(req_num < 1 || reqs_count < req_num)
{
break;
}
U64 req_idx = req_num-1;
HS_Scope *hs_scope = hs_scope_open();
DI_Scope *di_scope = di_scope_open();
TXT_Scope *txt_scope = txt_scope_open();
@@ -766,7 +773,7 @@ dasm_tick(void)
}
//- rjf: re-request if stale
MutexScope(dasm_shared->req_mutex)
if(stale) MutexScope(dasm_shared->req_mutex)
{
DASM_RequestNode *req_n = push_array(dasm_shared->req_arena, DASM_RequestNode, 1);
SLLQueuePush(dasm_shared->first_req, dasm_shared->last_req, req_n);
@@ -781,6 +788,7 @@ dasm_tick(void)
di_scope_close(di_scope);
hs_scope_close(hs_scope);
}
lane_sync();
scratch_end(scratch);
ProfEnd();
+3
View File
@@ -268,6 +268,9 @@ struct DASM_Shared
DASM_RequestNode *first_req;
DASM_RequestNode *last_req;
U64 req_count;
// rjf: request take counter
U64 lane_req_take_counter;
};
////////////////////////////////
+9 -2
View File
@@ -310,14 +310,20 @@ fs_tick(void)
arena_clear(fs_shared->req_arena);
fs_shared->first_req = fs_shared->last_req = 0;
fs_shared->req_count = 0;
fs_shared->lane_req_take_counter = 0;
}
lane_sync();
//- rjf: do requests
Rng1U64 range = lane_range(reqs_count);
for EachInRange(req_idx, range)
for(;;)
{
//- rjf: unpack
U64 req_num = ins_atomic_u64_inc_eval(&fs_shared->lane_req_take_counter);
if(req_num < 1 || reqs_count < req_num)
{
break;
}
U64 req_idx = req_num-1;
FS_Request *r = &reqs[req_idx];
HS_Key key = r->key;
String8 path = r->path;
@@ -394,6 +400,7 @@ fs_tick(void)
}
cond_var_broadcast(path_stripe->cv);
}
lane_sync();
scratch_end(scratch);
ProfEnd();
+3
View File
@@ -91,6 +91,9 @@ struct FS_Shared
FS_RequestNode *first_req;
FS_RequestNode *last_req;
U64 req_count;
// rjf: request take counter
U64 lane_req_take_counter;
};
////////////////////////////////
+23 -23
View File
@@ -69,9 +69,9 @@ hs_init(void)
hs_shared->arena = arena;
hs_shared->slots_count = 4096;
hs_shared->stripes_count = Min(hs_shared->slots_count, os_get_system_info()->logical_processor_count);
hs_shared->slots = push_array(arena, HS_Slot, hs_shared->slots_count);
hs_shared->slots = push_array(arena, HS_BlobSlot, hs_shared->slots_count);
hs_shared->stripes = push_array(arena, HS_Stripe, hs_shared->stripes_count);
hs_shared->stripes_free_nodes = push_array(arena, HS_Node *, hs_shared->stripes_count);
hs_shared->stripes_free_nodes = push_array(arena, HS_BlobNode *, hs_shared->stripes_count);
for(U64 idx = 0; idx < hs_shared->stripes_count; idx += 1)
{
HS_Stripe *stripe = &hs_shared->stripes[idx];
@@ -186,11 +186,11 @@ hs_root_release(HS_Root root)
U128 hash = n->hash_history[(n->hash_history_gen+history_idx)%ArrayCount(n->hash_history)];
U64 hash_slot_idx = hash.u64[1]%hs_shared->slots_count;
U64 hash_stripe_idx = hash_slot_idx%hs_shared->stripes_count;
HS_Slot *hash_slot = &hs_shared->slots[hash_slot_idx];
HS_BlobSlot *hash_slot = &hs_shared->slots[hash_slot_idx];
HS_Stripe *hash_stripe = &hs_shared->stripes[hash_stripe_idx];
MutexScopeR(hash_stripe->rw_mutex)
{
for(HS_Node *n = hash_slot->first; n != 0; n = n->next)
for(HS_BlobNode *n = hash_slot->first; n != 0; n = n->next)
{
if(u128_match(n->hash, hash))
{
@@ -226,14 +226,14 @@ hs_submit_data(HS_Key key, Arena **data_arena, String8 data)
U128 hash = hs_hash_from_data(data);
U64 slot_idx = hash.u64[1]%hs_shared->slots_count;
U64 stripe_idx = slot_idx%hs_shared->stripes_count;
HS_Slot *slot = &hs_shared->slots[slot_idx];
HS_BlobSlot *slot = &hs_shared->slots[slot_idx];
HS_Stripe *stripe = &hs_shared->stripes[stripe_idx];
//- rjf: commit data to cache - if already there, just bump key refcount
ProfScope("commit data to cache - if already there, just bump key refcount") MutexScopeW(stripe->rw_mutex)
{
HS_Node *existing_node = 0;
for(HS_Node *n = slot->first; n != 0; n = n->next)
HS_BlobNode *existing_node = 0;
for(HS_BlobNode *n = slot->first; n != 0; n = n->next)
{
if(u128_match(n->hash, hash))
{
@@ -243,14 +243,14 @@ hs_submit_data(HS_Key key, Arena **data_arena, String8 data)
}
if(existing_node == 0)
{
HS_Node *node = hs_shared->stripes_free_nodes[stripe_idx];
HS_BlobNode *node = hs_shared->stripes_free_nodes[stripe_idx];
if(node)
{
SLLStackPop(hs_shared->stripes_free_nodes[stripe_idx]);
}
else
{
node = push_array(stripe->arena, HS_Node, 1);
node = push_array(stripe->arena, HS_BlobNode, 1);
}
node->hash = hash;
if(data_arena != 0)
@@ -359,11 +359,11 @@ hs_submit_data(HS_Key key, Arena **data_arena, String8 data)
{
U64 old_hash_slot_idx = key_expired_hash.u64[1]%hs_shared->slots_count;
U64 old_hash_stripe_idx = old_hash_slot_idx%hs_shared->stripes_count;
HS_Slot *old_hash_slot = &hs_shared->slots[old_hash_slot_idx];
HS_BlobSlot *old_hash_slot = &hs_shared->slots[old_hash_slot_idx];
HS_Stripe *old_hash_stripe = &hs_shared->stripes[old_hash_stripe_idx];
MutexScopeR(old_hash_stripe->rw_mutex)
{
for(HS_Node *n = old_hash_slot->first; n != 0; n = n->next)
for(HS_BlobNode *n = old_hash_slot->first; n != 0; n = n->next)
{
if(u128_match(n->hash, key_expired_hash))
{
@@ -411,11 +411,11 @@ hs_scope_close(HS_Scope *scope)
next = touch->next;
U64 slot_idx = hash.u64[1]%hs_shared->slots_count;
U64 stripe_idx = slot_idx%hs_shared->stripes_count;
HS_Slot *slot = &hs_shared->slots[slot_idx];
HS_BlobSlot *slot = &hs_shared->slots[slot_idx];
HS_Stripe *stripe = &hs_shared->stripes[stripe_idx];
MutexScopeR(stripe->rw_mutex)
{
for(HS_Node *n = slot->first; n != 0; n = n->next)
for(HS_BlobNode *n = slot->first; n != 0; n = n->next)
{
if(u128_match(hash, n->hash))
{
@@ -430,7 +430,7 @@ hs_scope_close(HS_Scope *scope)
}
internal void
hs_scope_touch_node__stripe_r_guarded(HS_Scope *scope, HS_Node *node)
hs_scope_touch_node__stripe_r_guarded(HS_Scope *scope, HS_BlobNode *node)
{
HS_Touch *touch = hs_tctx->free_touch;
ins_atomic_u64_inc_eval(&node->scope_ref_count);
@@ -455,11 +455,11 @@ hs_hash_downstream_inc(U128 hash)
{
U64 slot_idx = hash.u64[1]%hs_shared->slots_count;
U64 stripe_idx = slot_idx%hs_shared->stripes_count;
HS_Slot *slot = &hs_shared->slots[slot_idx];
HS_BlobSlot *slot = &hs_shared->slots[slot_idx];
HS_Stripe *stripe = &hs_shared->stripes[stripe_idx];
MutexScopeR(stripe->rw_mutex)
{
for(HS_Node *n = slot->first; n != 0; n = n->next)
for(HS_BlobNode *n = slot->first; n != 0; n = n->next)
{
if(u128_match(hash, n->hash))
{
@@ -475,11 +475,11 @@ hs_hash_downstream_dec(U128 hash)
{
U64 slot_idx = hash.u64[1]%hs_shared->slots_count;
U64 stripe_idx = slot_idx%hs_shared->stripes_count;
HS_Slot *slot = &hs_shared->slots[slot_idx];
HS_BlobSlot *slot = &hs_shared->slots[slot_idx];
HS_Stripe *stripe = &hs_shared->stripes[stripe_idx];
MutexScopeR(stripe->rw_mutex)
{
for(HS_Node *n = slot->first; n != 0; n = n->next)
for(HS_BlobNode *n = slot->first; n != 0; n = n->next)
{
if(u128_match(hash, n->hash))
{
@@ -523,11 +523,11 @@ hs_data_from_hash(HS_Scope *scope, U128 hash)
String8 result = {0};
U64 slot_idx = hash.u64[1]%hs_shared->slots_count;
U64 stripe_idx = slot_idx%hs_shared->stripes_count;
HS_Slot *slot = &hs_shared->slots[slot_idx];
HS_BlobSlot *slot = &hs_shared->slots[slot_idx];
HS_Stripe *stripe = &hs_shared->stripes[stripe_idx];
MutexScopeR(stripe->rw_mutex)
{
for(HS_Node *n = slot->first; n != 0; n = n->next)
for(HS_BlobNode *n = slot->first; n != 0; n = n->next)
{
if(u128_match(n->hash, hash))
{
@@ -552,12 +552,12 @@ hs_tick(void)
for EachInRange(slot_idx, range)
{
U64 stripe_idx = slot_idx%hs_shared->stripes_count;
HS_Slot *slot = &hs_shared->slots[slot_idx];
HS_BlobSlot *slot = &hs_shared->slots[slot_idx];
HS_Stripe *stripe = &hs_shared->stripes[stripe_idx];
B32 slot_has_work = 0;
MutexScopeR(stripe->rw_mutex)
{
for(HS_Node *n = slot->first; n != 0; n = n->next)
for(HS_BlobNode *n = slot->first; n != 0; n = n->next)
{
U64 key_ref_count = ins_atomic_u64_eval(&n->key_ref_count);
U64 scope_ref_count = ins_atomic_u64_eval(&n->scope_ref_count);
@@ -571,7 +571,7 @@ hs_tick(void)
}
if(slot_has_work) MutexScopeW(stripe->rw_mutex)
{
for(HS_Node *n = slot->first, *next = 0; n != 0; n = next)
for(HS_BlobNode *n = slot->first, *next = 0; n != 0; n = next)
{
next = n->next;
U64 key_ref_count = ins_atomic_u64_eval(&n->key_ref_count);
+18 -12
View File
@@ -64,7 +64,7 @@ struct HS_Key
};
////////////////////////////////
//~ rjf: Cache Types
//~ rjf: Root Cache Types
typedef struct HS_RootIDChunkNode HS_RootIDChunkNode;
struct HS_RootIDChunkNode
@@ -101,6 +101,9 @@ struct HS_RootSlot
HS_RootNode *last;
};
////////////////////////////////
//~ rjf: Key Cache Types
#define HS_KEY_HASH_HISTORY_COUNT 64
#define HS_KEY_HASH_HISTORY_STRONG_REF_COUNT 2
@@ -121,11 +124,14 @@ struct HS_KeySlot
HS_KeyNode *last;
};
typedef struct HS_Node HS_Node;
struct HS_Node
////////////////////////////////
//~ rjf: Content Blob Cache Types
typedef struct HS_BlobNode HS_BlobNode;
struct HS_BlobNode
{
HS_Node *next;
HS_Node *prev;
HS_BlobNode *next;
HS_BlobNode *prev;
U128 hash;
Arena *arena;
String8 data;
@@ -134,11 +140,11 @@ struct HS_Node
U64 downstream_ref_count;
};
typedef struct HS_Slot HS_Slot;
struct HS_Slot
// rjf: one slot of the content blob cache table (indexed elsewhere by
// hash.u64[1] % slots_count). `first` is the head of the slot's node list,
// which lookups walk via each node's `next`; `last` is presumably the tail
// (its use is not visible in this view - confirm).
typedef struct HS_BlobSlot HS_BlobSlot;
struct HS_BlobSlot
{
HS_Node *first;
HS_Node *last;
HS_BlobNode *first;
HS_BlobNode *last;
};
typedef struct HS_Stripe HS_Stripe;
@@ -188,9 +194,9 @@ struct HS_Shared
// rjf: main data cache
U64 slots_count;
U64 stripes_count;
HS_Slot *slots;
HS_BlobSlot *slots;
HS_Stripe *stripes;
HS_Node **stripes_free_nodes;
HS_BlobNode **stripes_free_nodes;
// rjf: key cache
U64 key_slots_count;
@@ -245,7 +251,7 @@ internal U128 hs_submit_data(HS_Key key, Arena **data_arena, String8 data);
// rjf: opens a scope and returns its handle; callers pair it with hs_scope_close
internal HS_Scope *hs_scope_open(void);
// rjf: closes `scope`; walks the scope's touch list and looks each touched
// hash's node up under that node's stripe read lock
internal void hs_scope_close(HS_Scope *scope);
// rjf: atomically increments `node`'s scope_ref_count and sets up an HS_Touch
// for it (the rest of the body is not visible in this view).
// NOTE(review): the `__stripe_r_guarded` suffix suggests the caller must
// already hold the stripe's read lock for `node` - confirm against callers.
internal void hs_scope_touch_node__stripe_r_guarded(HS_Scope *scope, HS_Node *node);
internal void hs_scope_touch_node__stripe_r_guarded(HS_Scope *scope, HS_BlobNode *node);
////////////////////////////////
//~ rjf: Downstream Accesses