artifact cache waitable cache access; debugging / fixes; start plugging in file stream to artifact cache

This commit is contained in:
Ryan Fleury
2025-09-24 11:25:18 -07:00
parent 7e05a60ffe
commit e7368af35c
11 changed files with 260 additions and 179 deletions
+86 -53
View File
@@ -21,12 +21,12 @@ ac_init(void)
//~ rjf: Cache Lookups
internal AC_Artifact
ac_artifact_from_key(Access *access, String8 key, U64 gen, AC_CreateFunctionType *create, AC_DestroyFunctionType *destroy, U64 slots_count)
ac_artifact_from_key_(Access *access, String8 key, AC_ArtifactParams *params, U64 endt_us)
{
//- rjf: create function -> cache
AC_Cache *cache = 0;
{
U64 cache_hash = u64_hash_from_str8(str8_struct(&create));
U64 cache_hash = u64_hash_from_str8(str8_struct(&params->create));
U64 cache_slot_idx = cache_hash%ac_shared->cache_slots_count;
Stripe *cache_stripe = stripe_from_slot_idx(&ac_shared->cache_stripes, cache_slot_idx);
for(B32 write_mode = 0; write_mode <= 1; write_mode += 1)
@@ -35,7 +35,7 @@ ac_artifact_from_key(Access *access, String8 key, U64 gen, AC_CreateFunctionType
{
for(AC_Cache *c = ac_shared->cache_slots[cache_slot_idx]; c != 0; c = c->next)
{
if(c->create == create)
if(c->create == params->create)
{
cache = c;
break;
@@ -45,10 +45,10 @@ ac_artifact_from_key(Access *access, String8 key, U64 gen, AC_CreateFunctionType
{
cache = push_array(cache_stripe->arena, AC_Cache, 1);
SLLStackPush(ac_shared->cache_slots[cache_slot_idx], cache);
cache->create = create;
cache->destroy = destroy;
cache->slots_count = slots_count;
cache->slots = push_array(cache_stripe->arena, AC_Slot, slots_count);
cache->create = params->create;
cache->destroy = params->destroy;
cache->slots_count = Max(256, params->slots_count);
cache->slots = push_array(cache_stripe->arena, AC_Slot, cache->slots_count);
cache->stripes = stripe_array_alloc(cache_stripe->arena);
}
}
@@ -59,78 +59,109 @@ ac_artifact_from_key(Access *access, String8 key, U64 gen, AC_CreateFunctionType
}
}
//- rjf: cache * key -> artifact
//- rjf: unpack key
U64 hash = u64_hash_from_str8(key);
U64 slot_idx = hash%cache->slots_count;
AC_Slot *slot = &cache->slots[slot_idx];
Stripe *stripe = stripe_from_slot_idx(&cache->stripes, slot_idx);
//- rjf: cache * key -> existing artifact
B32 got_artifact = 0;
B32 need_request = 0;
AC_Artifact artifact = {0};
RWMutexScope(stripe->rw_mutex, 0)
{
U64 hash = u64_hash_from_str8(key);
U64 slot_idx = hash%cache->slots_count;
AC_Slot *slot = &cache->slots[slot_idx];
Stripe *stripe = stripe_from_slot_idx(&cache->stripes, slot_idx);
for(B32 write_mode = 0; write_mode <= 1; write_mode += 1)
for(AC_Node *n = slot->first; n != 0; n = n->next)
{
B32 found = 0;
B32 need_request = 0;
RWMutexScope(stripe->rw_mutex, write_mode)
if(str8_match(n->key, key, 0))
{
// rjf: look up node
for(AC_Node *n = slot->first; n != 0; n = n->next)
if(ins_atomic_u64_eval(&n->completion_count) != 0 && (n->gen == params->gen || !params->wait_for_fresh))
{
if(str8_match(n->key, key, 0))
{
found = 1;
artifact = n->val;
access_touch(access, &n->access_pt, stripe->cv);
if(n->gen != gen)
{
B32 got_task = (ins_atomic_u64_eval_cond_assign(&n->working_count, 1, 0) == 0);
need_request = got_task;
}
break;
}
got_artifact = 1;
artifact = n->val;
access_touch(access, &n->access_pt, stripe->cv);
}
// rjf: no node? -> create
if(write_mode && !found)
if(n->gen != params->gen)
{
need_request = 1;
AC_Node *node = stripe->free;
if(node)
{
stripe->free = node->next;
}
else
{
node = push_array_no_zero(stripe->arena, AC_Node, 1);
}
MemoryZeroStruct(node);
DLLPushBack(slot->first, slot->last, node);
// TODO(rjf): string allocator for keys
node->key = str8_copy(stripe->arena, key);
node->working_count = 1;
B32 got_task = (ins_atomic_u64_eval_cond_assign(&n->working_count, 1, 0) == 0);
need_request = got_task;
}
break;
}
}
}
//- rjf: didn't get artifact we want? -> fall back to slow path
if(!got_artifact || need_request)
{
RWMutexScope(stripe->rw_mutex, 1) for(;;)
{
B32 out_of_time = (os_now_microseconds() >= endt_us);
// rjf: find node in cache
AC_Node *node = 0;
for(AC_Node *n = slot->first; n != 0; n = n->next)
{
if(str8_match(n->key, key, 0))
{
node = n;
break;
}
}
// rjf: need request -> push
// rjf: no node? -> create
if(node == 0)
{
need_request = 1;
node = stripe->free;
if(node)
{
stripe->free = node->next;
}
else
{
node = push_array_no_zero(stripe->arena, AC_Node, 1);
}
MemoryZeroStruct(node);
DLLPushBack(slot->first, slot->last, node);
// TODO(rjf): string allocator for keys
node->key = str8_copy(stripe->arena, key);
node->working_count = 1;
}
// rjf: request
if(need_request)
{
need_request = 0;
MutexScope(ac_shared->req_mutex)
{
AC_RequestNode *n = push_array(ac_shared->req_arena, AC_RequestNode, 1);
SLLQueuePush(ac_shared->first_req, ac_shared->last_req, n);
ac_shared->req_count += 1;
n->v.key = str8_copy(ac_shared->req_arena, key);
n->v.gen = gen;
n->v.create = create;
n->v.gen = params->gen;
n->v.create = params->create;
}
cond_var_broadcast(async_tick_start_cond_var);
ins_atomic_u32_eval_assign(&async_loop_again, 1);
}
// rjf: found node -> break
if(found)
// rjf: get value from node, if possible
if(!got_artifact && ins_atomic_u64_eval(&node->completion_count) != 0 && ((node->gen == params->gen) || !params->wait_for_fresh || out_of_time))
{
got_artifact = 1;
artifact = node->val;
access_touch(access, &node->access_pt, stripe->cv);
}
// rjf: abort if needed
if(out_of_time || got_artifact || is_async_thread)
{
break;
}
// rjf: wait for results
cond_var_wait_rw(stripe->cv, stripe->rw_mutex, 1, endt_us);
}
}
@@ -275,9 +306,11 @@ ac_async_tick(void)
n->gen = reqs[idx].gen;
n->val = val;
ins_atomic_u64_dec_eval(&n->working_count);
ins_atomic_u64_inc_eval(&n->completion_count);
}
}
}
cond_var_broadcast(stripe->cv);
}
}
lane_sync();
+13 -1
View File
@@ -19,6 +19,16 @@ struct AC_Artifact
typedef AC_Artifact AC_CreateFunctionType(String8 key, B32 *retry_out);
typedef void AC_DestroyFunctionType(AC_Artifact artifact);
typedef struct AC_ArtifactParams AC_ArtifactParams;
struct AC_ArtifactParams
{
AC_CreateFunctionType *create;
AC_DestroyFunctionType *destroy;
U64 slots_count;
U64 gen;
B32 wait_for_fresh;
};
////////////////////////////////
//~ rjf: Cache Types
@@ -51,6 +61,7 @@ struct AC_Node
// rjf: metadata
AccessPt access_pt;
U64 working_count;
U64 completion_count;
};
typedef struct AC_Slot AC_Slot;
@@ -105,7 +116,8 @@ internal void ac_init(void);
////////////////////////////////
//~ rjf: Cache Lookups
internal AC_Artifact ac_artifact_from_key(Access *access, String8 key, U64 gen, AC_CreateFunctionType *create, AC_DestroyFunctionType *destroy, U64 slots_count);
internal AC_Artifact ac_artifact_from_key_(Access *access, String8 key, AC_ArtifactParams *params, U64 endt_us);
#define ac_artifact_from_key(access, key, create_fn, destroy_fn, endt_us, ...) ac_artifact_from_key_((access), (key), &(AC_ArtifactParams){.create = (create_fn), .destroy = (destroy_fn), __VA_ARGS__}, (endt_us))
////////////////////////////////
//~ rjf: Asynchronous Tick