extend artifact cache loop with an early-out cancellation path, when new high priority requests have been pushed, while low priority ones are still being worked on

This commit is contained in:
Ryan Fleury
2025-09-25 15:35:22 -07:00
parent c4bf855af9
commit 366c63e3ce
+79
View File
@@ -319,7 +319,17 @@ ac_async_tick(void)
lane_sync();
RequestBatchTask *task = &tasks[task_idx];
//- rjf: set up cancellation signal
U64 cancelled = 0;
U64 *cancelled_ptr = 0;
if(lane_idx() == 0)
{
cancelled_ptr = &cancelled;
}
lane_sync_u64(&cancelled_ptr, 0);
//- rjf: do all wide requests for this priority
U64 done_wide_count = 0;
ProfScope("wide requests (p%I64u)", task_idx)
{
for EachIndex(idx, task->wide_count)
@@ -327,6 +337,21 @@ ac_async_tick(void)
lane_sync();
AC_Request *r = &task->wide[idx];
// rjf: any new higher priority tasks? -> cancel
if(task_idx == 1 && idx != 0) MutexScope(ac_shared->req_batches[0].mutex)
{
if(ac_shared->req_batches[0].wide_count != 0 || ac_shared->req_batches[0].thin_count != 0)
{
ins_atomic_u64_eval_assign(cancelled_ptr, 1);
}
}
// rjf: cancelled? -> exit
if(ins_atomic_u64_eval(cancelled_ptr))
{
break;
}
// rjf: compute val
B32 retry = 0;
AC_Artifact val = r->create(r->key, r->gen, r->last_requested_gen, &retry);
@@ -388,11 +413,16 @@ ac_async_tick(void)
}
cond_var_broadcast(stripe->cv);
}
// rjf: increment count
lane_sync();
done_wide_count += 1;
}
}
lane_sync();
//- rjf: do all thin requests for this priority
U64 done_thin_count = 0;
ProfScope("thin requests (p%I64u)", task_idx)
{
U64 req_take_counter = 0;
@@ -404,6 +434,22 @@ ac_async_tick(void)
lane_sync_u64(&req_take_counter_ptr, 0);
for(;;)
{
// rjf: any new higher priority tasks? -> cancel
if(task_idx == 1 && ins_atomic_u64_eval(req_take_counter_ptr) != 0) MutexScope(ac_shared->req_batches[0].mutex)
{
if(ac_shared->req_batches[0].wide_count != 0 || ac_shared->req_batches[0].thin_count != 0)
{
ins_atomic_u64_eval_assign(cancelled_ptr, 1);
}
}
// rjf: cancelled? -> exit
if(ins_atomic_u64_eval(cancelled_ptr))
{
break;
}
// rjf: take next task
U64 req_idx = ins_atomic_u64_inc_eval(req_take_counter_ptr) - 1;
if(req_idx >= task->thin_count) { break; }
AC_Request *r = &task->thin[req_idx];
@@ -471,6 +517,39 @@ ac_async_tick(void)
}
}
lane_sync();
done_thin_count = ins_atomic_u64_eval(req_take_counter_ptr);
lane_sync();
}
//- rjf: cancelled early, unfinished tasks? -> defer to next tick
if(lane_idx() == 0 && task_idx > 0)
{
AC_RequestBatch *batch = &ac_shared->req_batches[task_idx];
MutexScope(batch->mutex)
{
// rjf: push leftover wide tasks
for(U64 idx = done_wide_count; idx < task->wide_count; idx += 1)
{
AC_Request *r = &task->wide[idx];
AC_RequestNode *n = push_array(batch->arena, AC_RequestNode, 1);
SLLQueuePush(batch->first_wide, batch->last_wide, n);
batch->wide_count += 1;
MemoryCopyStruct(&n->v, r);
n->v.key = str8_copy(batch->arena, n->v.key);
}
// rjf: push leftover thin tasks
for(U64 idx = done_thin_count; idx < task->thin_count; idx += 1)
{
AC_Request *r = &task->thin[idx];
AC_RequestNode *n = push_array(batch->arena, AC_RequestNode, 1);
SLLQueuePush(batch->first_thin, batch->last_thin, n);
batch->thin_count += 1;
MemoryCopyStruct(&n->v, r);
n->v.key = str8_copy(batch->arena, n->v.key);
}
}
ins_atomic_u32_eval_assign(&async_loop_again, 1);
}
lane_sync();
}