mirror of
https://github.com/Ed94/raddebugger.git
synced 2026-06-17 17:42:22 -07:00
bucket artifact cache requests by wideness/priority; do high/wide, high/thin, low/wide, low/thin
This commit is contained in:
@@ -13,8 +13,11 @@ ac_init(void)
|
||||
ac_shared->cache_slots_count = 256;
|
||||
ac_shared->cache_slots = push_array(arena, AC_Cache *, ac_shared->cache_slots_count);
|
||||
ac_shared->cache_stripes = stripe_array_alloc(arena);
|
||||
ac_shared->req_mutex = mutex_alloc();
|
||||
ac_shared->req_arena = arena_alloc();
|
||||
for EachElement(idx, ac_shared->req_batches)
|
||||
{
|
||||
ac_shared->req_batches[idx].mutex = mutex_alloc();
|
||||
ac_shared->req_batches[idx].arena = arena_alloc();
|
||||
}
|
||||
}
|
||||
|
||||
////////////////////////////////
|
||||
@@ -23,6 +26,8 @@ ac_init(void)
|
||||
internal AC_Artifact
|
||||
ac_artifact_from_key_(Access *access, String8 key, AC_ArtifactParams *params, U64 endt_us)
|
||||
{
|
||||
AC_RequestBatch *req_batch = &ac_shared->req_batches[params->flags & AC_Flag_HighPriority ? 0 : 1];
|
||||
|
||||
//- rjf: create function -> cache
|
||||
AC_Cache *cache = 0;
|
||||
{
|
||||
@@ -76,7 +81,7 @@ ac_artifact_from_key_(Access *access, String8 key, AC_ArtifactParams *params, U6
|
||||
{
|
||||
if(str8_match(n->key, key, 0))
|
||||
{
|
||||
if(ins_atomic_u64_eval(&n->completion_count) != 0 && (n->gen == params->gen || !params->wait_for_fresh))
|
||||
if(ins_atomic_u64_eval(&n->completion_count) != 0 && (n->gen == params->gen || !(params->flags & AC_Flag_WaitForFresh)))
|
||||
{
|
||||
got_artifact = 1;
|
||||
artifact_is_stale = (n->gen == params->gen);
|
||||
@@ -129,18 +134,27 @@ ac_artifact_from_key_(Access *access, String8 key, AC_ArtifactParams *params, U6
|
||||
// TODO(rjf): string allocator for keys
|
||||
node->key = str8_copy(stripe->arena, key);
|
||||
node->working_count = 1;
|
||||
node->evict_threshold_us = params->evict_threshold_us;
|
||||
}
|
||||
|
||||
// rjf: request
|
||||
if(need_request)
|
||||
{
|
||||
need_request = 0;
|
||||
MutexScope(ac_shared->req_mutex)
|
||||
MutexScope(req_batch->mutex)
|
||||
{
|
||||
AC_RequestNode *n = push_array(ac_shared->req_arena, AC_RequestNode, 1);
|
||||
SLLQueuePush(ac_shared->first_req, ac_shared->last_req, n);
|
||||
ac_shared->req_count += 1;
|
||||
n->v.key = str8_copy(ac_shared->req_arena, key);
|
||||
AC_RequestNode *n = push_array(req_batch->arena, AC_RequestNode, 1);
|
||||
if(params->flags & AC_Flag_Wide)
|
||||
{
|
||||
SLLQueuePush(req_batch->first_wide, req_batch->last_wide, n);
|
||||
req_batch->wide_count += 1;
|
||||
}
|
||||
else
|
||||
{
|
||||
SLLQueuePush(req_batch->first_thin, req_batch->last_thin, n);
|
||||
req_batch->thin_count += 1;
|
||||
}
|
||||
n->v.key = str8_copy(req_batch->arena, key);
|
||||
n->v.gen = params->gen;
|
||||
n->v.create = params->create;
|
||||
}
|
||||
@@ -149,7 +163,7 @@ ac_artifact_from_key_(Access *access, String8 key, AC_ArtifactParams *params, U6
|
||||
}
|
||||
|
||||
// rjf: get value from node, if possible
|
||||
if(!got_artifact && ins_atomic_u64_eval(&node->completion_count) != 0 && ((node->gen == params->gen) || !params->wait_for_fresh || out_of_time))
|
||||
if(!got_artifact && ins_atomic_u64_eval(&node->completion_count) != 0 && ((node->gen == params->gen) || !(params->flags & AC_Flag_WaitForFresh) || out_of_time))
|
||||
{
|
||||
got_artifact = 1;
|
||||
artifact_is_stale = (node->gen == params->gen);
|
||||
@@ -185,7 +199,9 @@ ac_async_tick(void)
|
||||
{
|
||||
Temp scratch = scratch_begin(0, 0);
|
||||
|
||||
//////////////////////////////
|
||||
//- rjf: do eviction pass across all caches
|
||||
//
|
||||
for EachIndex(cache_slot_idx, ac_shared->cache_slots_count)
|
||||
{
|
||||
Stripe *cache_stripe = stripe_from_slot_idx(&ac_shared->cache_stripes, cache_slot_idx);
|
||||
@@ -206,7 +222,7 @@ ac_async_tick(void)
|
||||
for(AC_Node *n = slot->first, *next = 0; n != 0; n = next)
|
||||
{
|
||||
next = n->next;
|
||||
if(access_pt_is_expired(&n->access_pt) && ins_atomic_u64_eval(&n->working_count) == 0)
|
||||
if(access_pt_is_expired(&n->access_pt, .time = n->evict_threshold_us) && ins_atomic_u64_eval(&n->working_count) == 0)
|
||||
{
|
||||
slot_has_work = 1;
|
||||
if(!write_mode)
|
||||
@@ -236,90 +252,216 @@ ac_async_tick(void)
|
||||
}
|
||||
}
|
||||
|
||||
//- rjf: gather all requests
|
||||
local_persist AC_Request *reqs = 0;
|
||||
local_persist U64 reqs_count = 0;
|
||||
if(lane_idx() == 0) MutexScope(ac_shared->req_mutex)
|
||||
//////////////////////////////
|
||||
//- rjf: gather requests
|
||||
//
|
||||
typedef struct RequestBatchTask RequestBatchTask;
|
||||
struct RequestBatchTask
|
||||
{
|
||||
reqs_count = ac_shared->req_count;
|
||||
reqs = push_array(scratch.arena, AC_Request, reqs_count);
|
||||
U64 idx = 0;
|
||||
for EachNode(r, AC_RequestNode, ac_shared->first_req)
|
||||
AC_Request *wide;
|
||||
U64 wide_count;
|
||||
AC_Request *thin;
|
||||
U64 thin_count;
|
||||
};
|
||||
RequestBatchTask *tasks = 0;
|
||||
U64 tasks_count = 0;
|
||||
if(lane_idx() == 0)
|
||||
{
|
||||
tasks_count = 2;
|
||||
tasks = push_array(scratch.arena, RequestBatchTask, tasks_count);
|
||||
for EachElement(task_idx, ac_shared->req_batches)
|
||||
{
|
||||
MemoryCopyStruct(&reqs[idx], &r->v);
|
||||
reqs[idx].key = str8_copy(scratch.arena, reqs[idx].key);
|
||||
idx += 1;
|
||||
AC_RequestBatch *batch = &ac_shared->req_batches[task_idx];
|
||||
MutexScope(batch->mutex)
|
||||
{
|
||||
tasks[task_idx].wide_count = batch->wide_count;
|
||||
tasks[task_idx].thin_count = batch->thin_count;
|
||||
tasks[task_idx].wide = push_array(scratch.arena, AC_Request, tasks[task_idx].wide_count);
|
||||
tasks[task_idx].thin = push_array(scratch.arena, AC_Request, tasks[task_idx].thin_count);
|
||||
{
|
||||
U64 idx = 0;
|
||||
for EachNode(n, AC_RequestNode, batch->first_wide)
|
||||
{
|
||||
MemoryCopyStruct(&tasks[task_idx].wide[idx], &n->v);
|
||||
tasks[task_idx].wide[idx].key = str8_copy(scratch.arena, tasks[task_idx].wide[idx].key);
|
||||
idx += 1;
|
||||
}
|
||||
}
|
||||
{
|
||||
U64 idx = 0;
|
||||
for EachNode(n, AC_RequestNode, batch->first_thin)
|
||||
{
|
||||
MemoryCopyStruct(&tasks[task_idx].thin[idx], &n->v);
|
||||
tasks[task_idx].thin[idx].key = str8_copy(scratch.arena, tasks[task_idx].thin[idx].key);
|
||||
idx += 1;
|
||||
}
|
||||
}
|
||||
arena_clear(batch->arena);
|
||||
batch->first_wide = batch->last_wide = batch->first_thin = batch->last_thin = 0;
|
||||
batch->wide_count = batch->thin_count = 0;
|
||||
}
|
||||
}
|
||||
arena_clear(ac_shared->req_arena);
|
||||
ac_shared->first_req = ac_shared->last_req = 0;
|
||||
ac_shared->req_count = 0;
|
||||
}
|
||||
lane_sync();
|
||||
lane_sync_u64(&tasks, 0);
|
||||
lane_sync_u64(&tasks_count, 0);
|
||||
|
||||
//- rjf: do all requests on all lanes
|
||||
for EachIndex(idx, reqs_count)
|
||||
//////////////////////////////
|
||||
//- rjf: do all requests
|
||||
//
|
||||
for EachIndex(task_idx, tasks_count)
|
||||
{
|
||||
lane_sync();
|
||||
RequestBatchTask *task = &tasks[task_idx];
|
||||
|
||||
// rjf: compute val
|
||||
B32 retry = 0;
|
||||
AC_Artifact val = reqs[idx].create(reqs[idx].key, &retry);
|
||||
|
||||
// rjf: retry? -> resubmit request
|
||||
if(retry && lane_idx() == 0)
|
||||
//- rjf: do all wide requests for this priority
|
||||
ProfScope("wide requests (p%I64u)", task_idx)
|
||||
{
|
||||
MutexScope(ac_shared->req_mutex)
|
||||
for EachIndex(idx, task->wide_count)
|
||||
{
|
||||
AC_RequestNode *n = push_array(ac_shared->req_arena, AC_RequestNode, 1);
|
||||
SLLQueuePush(ac_shared->first_req, ac_shared->last_req, n);
|
||||
ac_shared->req_count += 1;
|
||||
MemoryCopyStruct(&n->v, &reqs[idx]);
|
||||
n->v.key = str8_copy(ac_shared->req_arena, n->v.key);
|
||||
}
|
||||
ins_atomic_u32_eval_assign(&async_loop_again, 1);
|
||||
}
|
||||
|
||||
// rjf: create function -> cache
|
||||
AC_Cache *cache = 0;
|
||||
if(!retry)
|
||||
{
|
||||
U64 cache_hash = u64_hash_from_str8(str8_struct(&reqs[idx].create));
|
||||
U64 cache_slot_idx = cache_hash%ac_shared->cache_slots_count;
|
||||
Stripe *cache_stripe = stripe_from_slot_idx(&ac_shared->cache_stripes, cache_slot_idx);
|
||||
RWMutexScope(cache_stripe->rw_mutex, 0)
|
||||
{
|
||||
for(AC_Cache *c = ac_shared->cache_slots[cache_slot_idx]; c != 0; c = c->next)
|
||||
AC_Request *r = &task->wide[idx];
|
||||
|
||||
// rjf: compute val
|
||||
B32 retry = 0;
|
||||
AC_Artifact val = r->create(r->key, &retry);
|
||||
|
||||
// rjf: retry? -> resubmit request
|
||||
if(retry && lane_idx() == 0)
|
||||
{
|
||||
if(c->create == reqs[idx].create)
|
||||
AC_RequestBatch *batch = &ac_shared->req_batches[task_idx];
|
||||
MutexScope(batch->mutex)
|
||||
{
|
||||
cache = c;
|
||||
break;
|
||||
AC_RequestNode *n = push_array(batch->arena, AC_RequestNode, 1);
|
||||
SLLQueuePush(batch->first_wide, batch->last_wide, n);
|
||||
batch->wide_count += 1;
|
||||
MemoryCopyStruct(&n->v, r);
|
||||
n->v.key = str8_copy(batch->arena, n->v.key);
|
||||
}
|
||||
ins_atomic_u32_eval_assign(&async_loop_again, 1);
|
||||
}
|
||||
|
||||
// rjf: create function -> cache
|
||||
AC_Cache *cache = 0;
|
||||
if(!retry)
|
||||
{
|
||||
U64 cache_hash = u64_hash_from_str8(str8_struct(&r->create));
|
||||
U64 cache_slot_idx = cache_hash%ac_shared->cache_slots_count;
|
||||
Stripe *cache_stripe = stripe_from_slot_idx(&ac_shared->cache_stripes, cache_slot_idx);
|
||||
RWMutexScope(cache_stripe->rw_mutex, 0)
|
||||
{
|
||||
for(AC_Cache *c = ac_shared->cache_slots[cache_slot_idx]; c != 0; c = c->next)
|
||||
{
|
||||
if(c->create == r->create)
|
||||
{
|
||||
cache = c;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// rjf: write value into cache
|
||||
if(!retry && lane_idx() == 0)
|
||||
{
|
||||
U64 hash = u64_hash_from_str8(r->key);
|
||||
U64 slot_idx = hash%cache->slots_count;
|
||||
AC_Slot *slot = &cache->slots[slot_idx];
|
||||
Stripe *stripe = stripe_from_slot_idx(&cache->stripes, slot_idx);
|
||||
RWMutexScope(stripe->rw_mutex, 1)
|
||||
{
|
||||
for(AC_Node *n = slot->first; n != 0; n = n->next)
|
||||
{
|
||||
if(str8_match(n->key, r->key, 0))
|
||||
{
|
||||
n->gen = r->gen;
|
||||
n->val = val;
|
||||
ins_atomic_u64_dec_eval(&n->working_count);
|
||||
ins_atomic_u64_inc_eval(&n->completion_count);
|
||||
}
|
||||
}
|
||||
}
|
||||
cond_var_broadcast(stripe->cv);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// rjf: write value into cache
|
||||
if(!retry && lane_idx() == 0)
|
||||
//- rjf: do all thin requests for this priority
|
||||
ProfScope("thin requests (p%I64u)", task_idx)
|
||||
{
|
||||
U64 hash = u64_hash_from_str8(reqs[idx].key);
|
||||
U64 slot_idx = hash%cache->slots_count;
|
||||
AC_Slot *slot = &cache->slots[slot_idx];
|
||||
Stripe *stripe = stripe_from_slot_idx(&cache->stripes, slot_idx);
|
||||
RWMutexScope(stripe->rw_mutex, 1)
|
||||
U64 req_take_counter = 0;
|
||||
U64 *req_take_counter_ptr = 0;
|
||||
if(lane_idx() == 0)
|
||||
{
|
||||
for(AC_Node *n = slot->first; n != 0; n = n->next)
|
||||
req_take_counter_ptr = &req_take_counter;
|
||||
}
|
||||
lane_sync_u64(&req_take_counter_ptr, 0);
|
||||
for(;;)
|
||||
{
|
||||
U64 req_idx = ins_atomic_u64_inc_eval(req_take_counter_ptr) - 1;
|
||||
if(req_idx >= task->thin_count) { break; }
|
||||
AC_Request *r = &task->thin[req_idx];
|
||||
|
||||
// rjf: compute val
|
||||
B32 retry = 0;
|
||||
AC_Artifact val = r->create(r->key, &retry);
|
||||
|
||||
// rjf: retry? -> resubmit request
|
||||
if(retry)
|
||||
{
|
||||
if(str8_match(n->key, reqs[idx].key, 0))
|
||||
AC_RequestBatch *batch = &ac_shared->req_batches[task_idx];
|
||||
MutexScope(batch->mutex)
|
||||
{
|
||||
n->gen = reqs[idx].gen;
|
||||
n->val = val;
|
||||
ins_atomic_u64_dec_eval(&n->working_count);
|
||||
ins_atomic_u64_inc_eval(&n->completion_count);
|
||||
AC_RequestNode *n = push_array(batch->arena, AC_RequestNode, 1);
|
||||
SLLQueuePush(batch->first_thin, batch->last_thin, n);
|
||||
batch->thin_count += 1;
|
||||
MemoryCopyStruct(&n->v, r);
|
||||
n->v.key = str8_copy(batch->arena, n->v.key);
|
||||
}
|
||||
ins_atomic_u32_eval_assign(&async_loop_again, 1);
|
||||
}
|
||||
|
||||
// rjf: create function -> cache
|
||||
AC_Cache *cache = 0;
|
||||
if(!retry)
|
||||
{
|
||||
U64 cache_hash = u64_hash_from_str8(str8_struct(&r->create));
|
||||
U64 cache_slot_idx = cache_hash%ac_shared->cache_slots_count;
|
||||
Stripe *cache_stripe = stripe_from_slot_idx(&ac_shared->cache_stripes, cache_slot_idx);
|
||||
RWMutexScope(cache_stripe->rw_mutex, 0)
|
||||
{
|
||||
for(AC_Cache *c = ac_shared->cache_slots[cache_slot_idx]; c != 0; c = c->next)
|
||||
{
|
||||
if(c->create == r->create)
|
||||
{
|
||||
cache = c;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// rjf: write value into cache
|
||||
if(!retry)
|
||||
{
|
||||
U64 hash = u64_hash_from_str8(r->key);
|
||||
U64 slot_idx = hash%cache->slots_count;
|
||||
AC_Slot *slot = &cache->slots[slot_idx];
|
||||
Stripe *stripe = stripe_from_slot_idx(&cache->stripes, slot_idx);
|
||||
RWMutexScope(stripe->rw_mutex, 1)
|
||||
{
|
||||
for(AC_Node *n = slot->first; n != 0; n = n->next)
|
||||
{
|
||||
if(str8_match(n->key, r->key, 0))
|
||||
{
|
||||
n->gen = r->gen;
|
||||
n->val = val;
|
||||
ins_atomic_u64_dec_eval(&n->working_count);
|
||||
ins_atomic_u64_inc_eval(&n->completion_count);
|
||||
}
|
||||
}
|
||||
}
|
||||
cond_var_broadcast(stripe->cv);
|
||||
}
|
||||
}
|
||||
cond_var_broadcast(stripe->cv);
|
||||
}
|
||||
}
|
||||
lane_sync();
|
||||
|
||||
@@ -19,6 +19,15 @@ struct AC_Artifact
|
||||
typedef AC_Artifact AC_CreateFunctionType(String8 key, B32 *retry_out);
|
||||
typedef void AC_DestroyFunctionType(AC_Artifact artifact);
|
||||
|
||||
typedef U32 AC_Flags;
|
||||
typedef enum AC_FlagsEnum
|
||||
{
|
||||
AC_Flag_WaitForFresh = (1<<0),
|
||||
AC_Flag_HighPriority = (1<<1),
|
||||
AC_Flag_Wide = (1<<2),
|
||||
}
|
||||
AC_FlagsEnum;
|
||||
|
||||
typedef struct AC_ArtifactParams AC_ArtifactParams;
|
||||
struct AC_ArtifactParams
|
||||
{
|
||||
@@ -26,8 +35,9 @@ struct AC_ArtifactParams
|
||||
AC_DestroyFunctionType *destroy;
|
||||
U64 slots_count;
|
||||
U64 gen;
|
||||
B32 wait_for_fresh;
|
||||
U64 evict_threshold_us;
|
||||
B32 *stale_out;
|
||||
AC_Flags flags;
|
||||
};
|
||||
|
||||
////////////////////////////////
|
||||
@@ -63,6 +73,7 @@ struct AC_Node
|
||||
AccessPt access_pt;
|
||||
U64 working_count;
|
||||
U64 completion_count;
|
||||
U64 evict_threshold_us;
|
||||
};
|
||||
|
||||
typedef struct AC_Slot AC_Slot;
|
||||
@@ -86,6 +97,19 @@ struct AC_Cache
|
||||
StripeArray stripes;
|
||||
};
|
||||
|
||||
typedef struct AC_RequestBatch AC_RequestBatch;
|
||||
struct AC_RequestBatch
|
||||
{
|
||||
Mutex mutex;
|
||||
Arena *arena;
|
||||
AC_RequestNode *first_wide;
|
||||
AC_RequestNode *last_wide;
|
||||
AC_RequestNode *first_thin;
|
||||
AC_RequestNode *last_thin;
|
||||
U64 wide_count;
|
||||
U64 thin_count;
|
||||
};
|
||||
|
||||
typedef struct AC_Shared AC_Shared;
|
||||
struct AC_Shared
|
||||
{
|
||||
@@ -97,11 +121,7 @@ struct AC_Shared
|
||||
StripeArray cache_stripes;
|
||||
|
||||
// rjf: requests
|
||||
Mutex req_mutex;
|
||||
Arena *req_arena;
|
||||
AC_RequestNode *first_req;
|
||||
AC_RequestNode *last_req;
|
||||
U64 req_count;
|
||||
AC_RequestBatch req_batches[2]; // 0: high priority, 1: low priority
|
||||
};
|
||||
|
||||
////////////////////////////////
|
||||
@@ -118,7 +138,7 @@ internal void ac_init(void);
|
||||
//~ rjf: Cache Lookups
|
||||
|
||||
internal AC_Artifact ac_artifact_from_key_(Access *access, String8 key, AC_ArtifactParams *params, U64 endt_us);
|
||||
#define ac_artifact_from_key(access, key, create_fn, destroy_fn, endt_us, ...) ac_artifact_from_key_((access), (key), &(AC_ArtifactParams){.create = (create_fn), .destroy = (destroy_fn), __VA_ARGS__}, (endt_us))
|
||||
#define ac_artifact_from_key(access, key, create_fn, destroy_fn, endt_us, ...) ac_artifact_from_key_((access), (key), &(AC_ArtifactParams){.create = (create_fn), .destroy = (destroy_fn), .evict_threshold_us = (2000000), __VA_ARGS__}, (endt_us))
|
||||
|
||||
////////////////////////////////
|
||||
//~ rjf: Asynchronous Tick
|
||||
|
||||
Reference in New Issue
Block a user