pull out table stripe arrays as base layer primitive; unified 'artifact cache' experiment

This commit is contained in:
Ryan Fleury
2025-09-22 14:07:21 -07:00
parent 6b278e1ccc
commit 9459327687
8 changed files with 467 additions and 25 deletions
+207
View File
@@ -0,0 +1,207 @@
// Copyright (c) Epic Games Tools
// Licensed under the MIT license (https://opensource.org/license/mit/)
////////////////////////////////
//~ rjf: Layer Initialization
internal void
ac_init(void)
{
Arena *arena = arena_alloc();
ac_shared = push_array(arena, AC_Shared, 1);
ac_shared->arena = arena;
ac_shared->cache_slots_count = 256;
ac_shared->cache_slots = push_array(arena, AC_Cache *, ac_shared->cache_slots_count);
ac_shared->cache_stripes = stripe_array_alloc(arena);
}
////////////////////////////////
//~ rjf: Cache Lookups
internal void *
ac_artifact_from_key(Access *access, String8 key, AC_CreateFunctionType *create, AC_DestroyFunctionType *destroy, U64 slots_count)
{
//- rjf: create function -> cache
AC_Cache *cache = 0;
{
U64 cache_hash = u64_hash_from_str8(str8_struct(&create));
U64 cache_slot_idx = cache_hash%ac_shared->cache_slots_count;
Stripe *cache_stripe = stripe_from_slot_idx(&ac_shared->cache_stripes, cache_slot_idx);
for(B32 write_mode = 0; write_mode <= 1; write_mode += 1)
{
RWMutexScope(cache_stripe->rw_mutex, write_mode)
{
for(AC_Cache *c = ac_shared->cache_slots[cache_slot_idx]; c != 0; c = c->next)
{
if(c->create == create)
{
cache = c;
break;
}
}
if(write_mode && cache == 0)
{
cache = push_array(cache_stripe->arena, AC_Cache, 1);
SLLStackPush(ac_shared->cache_slots[cache_slot_idx], cache);
cache->create = create;
cache->destroy = destroy;
cache->slots_count = slots_count;
cache->slots = push_array(cache_stripe->arena, AC_Slot, slots_count);
cache->stripes = stripe_array_alloc(cache_stripe->arena);
}
}
if(cache != 0)
{
break;
}
}
}
//- rjf: cache * key -> artifact
void *artifact = 0;
{
U64 hash = u64_hash_from_str8(key);
U64 slot_idx = hash%cache->slots_count;
AC_Slot *slot = &cache->slots[slot_idx];
Stripe *stripe = stripe_from_slot_idx(&cache->stripes, slot_idx);
for(B32 write_mode = 0; write_mode <= 1; write_mode += 1)
{
B32 found = 0;
RWMutexScope(stripe->rw_mutex, write_mode)
{
for(AC_Node *n = slot->first; n != 0; n = n->next)
{
if(str8_match(n->key, key, 0))
{
found = 1;
artifact = n->val;
access_touch(access, &n->access_pt, stripe->cv);
break;
}
}
if(write_mode && !found)
{
AC_Node *node = stripe->free;
if(node)
{
stripe->free = node->next;
}
else
{
node = push_array(stripe->arena, AC_Node, 1);
DLLPushBack(slot->first, slot->last, node);
// TODO(rjf): string allocator for keys
node->key = str8_copy(stripe->arena, key);
node->working_count = 1;
}
}
}
if(found)
{
break;
}
else if(write_mode)
{
MutexScope(ac_shared->req_mutex)
{
AC_RequestNode *n = push_array(ac_shared->req_arena, AC_RequestNode, 1);
SLLQueuePush(ac_shared->first_req, ac_shared->last_req, n);
ac_shared->req_count += 1;
n->v.key = str8_copy(ac_shared->req_arena, key);
n->v.create = create;
}
}
}
}
return artifact;
}
////////////////////////////////
//~ rjf: Tick
internal void
ac_tick(void)
{
Temp scratch = scratch_begin(0, 0);
//- rjf: do eviction pass across all caches
for EachIndex(cache_slot_idx, ac_shared->cache_slots_count)
{
Stripe *cache_stripe = stripe_from_slot_idx(&ac_shared->cache_stripes, cache_slot_idx);
RWMutexScope(cache_stripe->rw_mutex, 0)
{
for EachNode(cache, AC_Cache, ac_shared->cache_slots[cache_slot_idx])
{
Rng1U64 slot_range = lane_range(cache->slots_count);
for EachInRange(slot_idx, slot_range)
{
AC_Slot *slot = &cache->slots[slot_idx];
Stripe *stripe = stripe_from_slot_idx(&cache->stripes, slot_idx);
for(B32 write_mode = 0; write_mode <= 1; write_mode += 1)
{
B32 slot_has_work = 0;
RWMutexScope(stripe->rw_mutex, write_mode)
{
for(AC_Node *n = slot->first, *next = 0; n != 0; n = next)
{
next = n->next;
if(access_pt_is_expired(&n->access_pt) && ins_atomic_u64_eval(&n->working_count) == 0)
{
slot_has_work = 1;
if(!write_mode)
{
break;
}
else
{
DLLRemove(slot->first, slot->last, n);
n->next = (AC_Node *)stripe->free;
stripe->free = n;
if(cache->destroy)
{
cache->destroy(n->val);
}
}
}
}
}
if(!slot_has_work)
{
break;
}
}
}
}
}
}
//- rjf: gather all requests
local_persist AC_Request *reqs = 0;
local_persist U64 reqs_count = 0;
if(lane_idx() == 0) MutexScope(ac_shared->req_mutex)
{
reqs_count = ac_shared->req_count;
reqs = push_array(scratch.arena, AC_Request, reqs_count);
U64 idx = 0;
for EachNode(r, AC_RequestNode, ac_shared->first_req)
{
MemoryCopyStruct(&reqs[idx], &r->v);
reqs[idx].key = str8_copy(scratch.arena, reqs[idx].key);
idx += 1;
}
arena_clear(ac_shared->req_arena);
ac_shared->first_req = ac_shared->last_req = 0;
ac_shared->req_count = 0;
}
lane_sync();
//- rjf: do all requests on all lanes
for EachIndex(idx, reqs_count)
{
reqs[idx].create(reqs[idx].key);
}
lane_sync();
scratch_end(scratch);
}
+104
View File
@@ -0,0 +1,104 @@
// Copyright (c) Epic Games Tools
// Licensed under the MIT license (https://opensource.org/license/mit/)
#ifndef ARTIFACT_CACHE_H
#define ARTIFACT_CACHE_H
////////////////////////////////
//~ rjf: Artifact Computation Function Types
typedef void *AC_CreateFunctionType(String8 key);
typedef void AC_DestroyFunctionType(void *artifact);
////////////////////////////////
//~ rjf: Cache Types
typedef struct AC_Request AC_Request;
struct AC_Request
{
String8 key;
AC_CreateFunctionType *create;
};
typedef struct AC_RequestNode AC_RequestNode;
struct AC_RequestNode
{
AC_RequestNode *next;
AC_Request v;
};
typedef struct AC_Node AC_Node;
struct AC_Node
{
AC_Node *next;
AC_Node *prev;
// rjf: key/value
String8 key;
void *val;
// rjf: metadata
AccessPt access_pt;
U64 working_count;
};
typedef struct AC_Slot AC_Slot;
struct AC_Slot
{
AC_Node *first;
AC_Node *last;
};
typedef struct AC_Cache AC_Cache;
struct AC_Cache
{
// rjf: link / key for cache-cache
AC_Cache *next;
AC_CreateFunctionType *create;
AC_DestroyFunctionType *destroy;
// rjf: artifact cache
U64 slots_count;
AC_Slot *slots;
StripeArray stripes;
};
typedef struct AC_Shared AC_Shared;
struct AC_Shared
{
Arena *arena;
// rjf: cache cache
U64 cache_slots_count;
AC_Cache **cache_slots;
StripeArray cache_stripes;
// rjf: requests
Mutex req_mutex;
Arena *req_arena;
AC_RequestNode *first_req;
AC_RequestNode *last_req;
U64 req_count;
};
////////////////////////////////
//~ rjf: Globals
global AC_Shared *ac_shared = 0;
////////////////////////////////
//~ rjf: Layer Initialization
internal void ac_init(void);
////////////////////////////////
//~ rjf: Cache Lookups
internal void *ac_artifact_from_key(Access *access, String8 key, AC_CreateFunctionType *create, AC_DestroyFunctionType *destroy, U64 slots_count);
////////////////////////////////
//~ rjf: Tick
internal void ac_tick(void);
#endif // ARTIFACT_CACHE_H
+6
View File
@@ -53,6 +53,9 @@ main_thread_base_entry_point(int arguments_count, char **arguments)
}
//- rjf: initialize all included layers
#if defined(ARTIFACT_CACHE_H) && !defined(AC_INIT_MANUAL)
ac_init();
#endif
#if defined(ASYNC_H) && !defined(ASYNC_INIT_MANUAL)
async_init(&cmdline);
#endif
@@ -198,6 +201,9 @@ async_thread_entry_point(void *params)
{
async_loop_again = 0;
}
#if defined(ARTIFACT_CACHE_H)
ac_tick();
#endif
#if defined(CONTENT_H)
c_tick();
#endif
+36
View File
@@ -64,3 +64,39 @@ internal void semaphore_drop(Semaphore semaphore)
internal Barrier barrier_alloc(U64 count) {return os_barrier_alloc(count);}
internal void barrier_release(Barrier barrier) {os_barrier_release(barrier);}
internal void barrier_wait(Barrier barrier) {os_barrier_wait(barrier);}
////////////////////////////////
//~ rjf: Table Stripe Functions
internal StripeArray
stripe_array_alloc(Arena *arena)
{
StripeArray array = {0};
array.count = os_get_system_info()->logical_processor_count;
array.v = push_array(arena, Stripe, array.count);
for EachIndex(idx, array.count)
{
array.v[idx].arena = arena_alloc();
array.v[idx].rw_mutex = rw_mutex_alloc();
array.v[idx].cv = cond_var_alloc();
}
return array;
}
internal void
stripe_array_release(StripeArray *stripes)
{
for EachIndex(idx, stripes->count)
{
arena_release(stripes->v[idx].arena);
rw_mutex_release(stripes->v[idx].rw_mutex);
cond_var_release(stripes->v[idx].cv);
}
}
internal Stripe *
stripe_from_slot_idx(StripeArray *stripes, U64 slot_idx)
{
Stripe *stripe = &stripes->v[slot_idx%stripes->count];
return stripe;
}
+26
View File
@@ -47,6 +47,25 @@ struct Barrier
U64 u64[1];
};
////////////////////////////////
//~ rjf: Table Stripes
typedef struct Stripe Stripe;
struct Stripe
{
Arena *arena;
RWMutex rw_mutex;
CondVar cv;
void *free;
};
typedef struct StripeArray StripeArray;
struct StripeArray
{
Stripe *v;
U64 count;
};
////////////////////////////////
//~ rjf: Thread Functions
@@ -104,4 +123,11 @@ internal void barrier_wait(Barrier barrier);
#define MutexScopeW(mutex) DeferLoop(rw_mutex_take_w(mutex), rw_mutex_drop_w(mutex))
#define MutexScopeRWPromote(mutex) DeferLoop((rw_mutex_drop_r(mutex), rw_mutex_take_w(mutex)), (rw_mutex_drop_w(mutex), rw_mutex_take_r(mutex)))
////////////////////////////////
//~ rjf: Table Stripe Functions
internal StripeArray stripe_array_alloc(Arena *arena);
internal void stripe_array_release(StripeArray *stripes);
internal Stripe *stripe_from_slot_idx(StripeArray *stripes, U64 slot_idx);
#endif // BASE_THREADS_H
+2
View File
@@ -222,6 +222,7 @@
#include "linker/hash_table.h"
#include "os/os_inc.h"
#include "async/async.h"
#include "artifact_cache/artifact_cache.h"
#include "rdi/rdi_local.h"
#include "rdi_make/rdi_make_local.h"
#include "mdesk/mdesk.h"
@@ -271,6 +272,7 @@
#include "linker/hash_table.c"
#include "os/os_inc.c"
#include "async/async.c"
#include "artifact_cache/artifact_cache.c"
#include "rdi/rdi_local.c"
#include "rdi_make/rdi_make_local.c"
#include "mdesk/mdesk.c"
+59 -15
View File
@@ -1601,15 +1601,8 @@ txt_init(void)
txt_shared->arena = arena;
txt_shared->slots_count = 1024;
txt_shared->slots = push_array(arena, TXT_Slot, txt_shared->slots_count);
txt_shared->stripes_count = Min(txt_shared->slots_count, os_get_system_info()->logical_processor_count);
txt_shared->stripes = push_array(arena, TXT_Stripe, txt_shared->stripes_count);
txt_shared->stripes_free_nodes = push_array(arena, TXT_Node *, txt_shared->stripes_count);
for(U64 idx = 0; idx < txt_shared->stripes_count; idx += 1)
{
txt_shared->stripes[idx].arena = arena_alloc();
txt_shared->stripes[idx].rw_mutex = rw_mutex_alloc();
txt_shared->stripes[idx].cv = cond_var_alloc();
}
txt_shared->stripes = stripe_array_alloc(arena);
txt_shared->stripes_free_nodes = push_array(arena, TXT_Node *, txt_shared->stripes.count);
txt_shared->u2p_ring_size = KB(64);
txt_shared->u2p_ring_base = push_array_no_zero(arena, U8, txt_shared->u2p_ring_size);
txt_shared->u2p_ring_cv = cond_var_alloc();
@@ -1627,9 +1620,9 @@ txt_text_info_from_hash_lang(Access *access, U128 hash, TXT_LangKind lang)
if(!u128_match(hash, u128_zero()))
{
U64 slot_idx = hash.u64[1]%txt_shared->slots_count;
U64 stripe_idx = slot_idx%txt_shared->stripes_count;
TXT_Slot *slot = &txt_shared->slots[slot_idx];
TXT_Stripe *stripe = &txt_shared->stripes[stripe_idx];
Stripe *stripe = stripe_from_slot_idx(&txt_shared->stripes, slot_idx);
U64 stripe_idx = (stripe - txt_shared->stripes.v);
B32 found = 0;
MutexScopeR(stripe->rw_mutex)
{
@@ -2128,9 +2121,8 @@ ASYNC_WORK_DEF(txt_parse_work)
//- rjf: unpack hash
U64 slot_idx = hash.u64[1]%txt_shared->slots_count;
U64 stripe_idx = slot_idx%txt_shared->stripes_count;
TXT_Slot *slot = &txt_shared->slots[slot_idx];
TXT_Stripe *stripe = &txt_shared->stripes[stripe_idx];
Stripe *stripe = stripe_from_slot_idx(&txt_shared->stripes, slot_idx);
//- rjf: take task
B32 got_task = 0;
@@ -2434,9 +2426,9 @@ txt_evictor_thread__entry_point(void *p)
{
for(U64 slot_idx = 0; slot_idx < txt_shared->slots_count; slot_idx += 1)
{
U64 stripe_idx = slot_idx%txt_shared->stripes_count;
TXT_Slot *slot = &txt_shared->slots[slot_idx];
TXT_Stripe *stripe = &txt_shared->stripes[stripe_idx];
Stripe *stripe = stripe_from_slot_idx(&txt_shared->stripes, slot_idx);
U64 stripe_idx = (stripe - txt_shared->stripes.v);
B32 slot_has_work = 0;
MutexScopeR(stripe->rw_mutex)
{
@@ -2474,3 +2466,55 @@ txt_evictor_thread__entry_point(void *p)
os_sleep_milliseconds(500);
}
}
////////////////////////////////
//~ rjf: Tick
internal void
txt_tick(void)
{
//- rjf: do eviction pass
{
Rng1U64 range = lane_range(txt_shared->slots_count);
for EachInRange(slot_idx, range)
{
TXT_Slot *slot = &txt_shared->slots[slot_idx];
Stripe *stripe = stripe_from_slot_idx(&txt_shared->stripes, slot_idx);
for(B32 write_mode = 0; write_mode <= 1; write_mode += 1)
{
B32 slot_has_work = 0;
RWMutexScope(stripe->rw_mutex, write_mode)
{
for(TXT_Node *n = slot->first, *next = 0; n != 0; n = next)
{
next = n->next;
if(access_pt_is_expired(&n->access_pt) &&
n->load_count != 0 &&
n->is_working == 0)
{
slot_has_work = 1;
if(!write_mode)
{
break;
}
else
{
DLLRemove(slot->first, slot->last, n);
c_hash_downstream_dec(n->hash);
if(n->arena != 0)
{
arena_release(n->arena);
}
SLLStackPush(txt_shared->stripes_free_nodes[(stripe - txt_shared->stripes.v)], n);
}
}
}
}
if(!slot_has_work)
{
break;
}
}
}
}
}
+27 -10
View File
@@ -158,6 +158,20 @@ typedef TXT_TokenArray TXT_LangLexFunctionType(Arena *arena, U64 *bytes_processe
////////////////////////////////
//~ rjf: Cache Types
typedef struct TXT_Request TXT_Request;
struct TXT_Request
{
U128 hash;
TXT_LangKind lang;
};
typedef struct TXT_RequestNode TXT_RequestNode;
struct TXT_RequestNode
{
TXT_RequestNode *next;
TXT_Request v;
};
typedef struct TXT_Node TXT_Node;
struct TXT_Node
{
@@ -186,14 +200,6 @@ struct TXT_Slot
TXT_Node *last;
};
typedef struct TXT_Stripe TXT_Stripe;
struct TXT_Stripe
{
Arena *arena;
RWMutex rw_mutex;
CondVar cv;
};
////////////////////////////////
//~ rjf: Shared State
@@ -207,11 +213,17 @@ struct TXT_Shared
// rjf: cache
U64 slots_count;
U64 stripes_count;
TXT_Slot *slots;
TXT_Stripe *stripes;
StripeArray stripes;
TXT_Node **stripes_free_nodes;
// rjf: requests
Mutex req_mutex;
Arena *req_arena;
TXT_RequestNode *first_req;
TXT_RequestNode *last_req;
U64 req_count;
// rjf: user -> parse thread
U64 u2p_ring_size;
U8 *u2p_ring_base;
@@ -293,4 +305,9 @@ ASYNC_WORK_DEF(txt_parse_work);
internal void txt_evictor_thread__entry_point(void *p);
////////////////////////////////
//~ rjf: Tick
internal void txt_tick(void);
#endif // TEXT_CACHE_H