diff --git a/src/artifact_cache/artifact_cache.c b/src/artifact_cache/artifact_cache.c index 94a64706..5606adfe 100644 --- a/src/artifact_cache/artifact_cache.c +++ b/src/artifact_cache/artifact_cache.c @@ -319,7 +319,17 @@ ac_async_tick(void) lane_sync(); RequestBatchTask *task = &tasks[task_idx]; + //- rjf: set up cancellation signal + U64 cancelled = 0; + U64 *cancelled_ptr = 0; + if(lane_idx() == 0) + { + cancelled_ptr = &cancelled; + } + lane_sync_u64(&cancelled_ptr, 0); + //- rjf: do all wide requests for this priority + U64 done_wide_count = 0; ProfScope("wide requests (p%I64u)", task_idx) { for EachIndex(idx, task->wide_count) @@ -327,6 +337,21 @@ ac_async_tick(void) lane_sync(); AC_Request *r = &task->wide[idx]; + // rjf: any new higher priority tasks? -> cancel + if(task_idx == 1 && idx != 0) MutexScope(ac_shared->req_batches[0].mutex) + { + if(ac_shared->req_batches[0].wide_count != 0 || ac_shared->req_batches[0].thin_count != 0) + { + ins_atomic_u64_eval_assign(cancelled_ptr, 1); + } + } + + // rjf: cancelled? -> exit + if(ins_atomic_u64_eval(cancelled_ptr)) + { + break; + } + // rjf: compute val B32 retry = 0; AC_Artifact val = r->create(r->key, r->gen, r->last_requested_gen, &retry); @@ -388,11 +413,16 @@ ac_async_tick(void) } cond_var_broadcast(stripe->cv); } + + // rjf: increment count + lane_sync(); + done_wide_count += 1; } } lane_sync(); //- rjf: do all thin requests for this priority + U64 done_thin_count = 0; ProfScope("thin requests (p%I64u)", task_idx) { U64 req_take_counter = 0; @@ -404,6 +434,22 @@ ac_async_tick(void) lane_sync_u64(&req_take_counter_ptr, 0); for(;;) { + // rjf: any new higher priority tasks? -> cancel + if(task_idx == 1 && ins_atomic_u64_eval(req_take_counter_ptr) != 0) MutexScope(ac_shared->req_batches[0].mutex) + { + if(ac_shared->req_batches[0].wide_count != 0 || ac_shared->req_batches[0].thin_count != 0) + { + ins_atomic_u64_eval_assign(cancelled_ptr, 1); + } + } + + // rjf: cancelled? -> exit + if(ins_atomic_u64_eval(cancelled_ptr)) + { + break; + } + + // rjf: take next task U64 req_idx = ins_atomic_u64_inc_eval(req_take_counter_ptr) - 1; if(req_idx >= task->thin_count) { break; } AC_Request *r = &task->thin[req_idx]; @@ -471,6 +517,39 @@ ac_async_tick(void) } } lane_sync(); + done_thin_count = ins_atomic_u64_eval(req_take_counter_ptr); + lane_sync(); + } + + //- rjf: cancelled early, unfinished tasks? -> defer to next tick + if(lane_idx() == 0 && task_idx > 0) + { + AC_RequestBatch *batch = &ac_shared->req_batches[task_idx]; + MutexScope(batch->mutex) + { + // rjf: push leftover wide tasks + for(U64 idx = done_wide_count; idx < task->wide_count; idx += 1) + { + AC_Request *r = &task->wide[idx]; + AC_RequestNode *n = push_array(batch->arena, AC_RequestNode, 1); + SLLQueuePush(batch->first_wide, batch->last_wide, n); + batch->wide_count += 1; + MemoryCopyStruct(&n->v, r); + n->v.key = str8_copy(batch->arena, n->v.key); + } + + // rjf: push leftover thin tasks + for(U64 idx = done_thin_count; idx < task->thin_count; idx += 1) + { + AC_Request *r = &task->thin[idx]; + AC_RequestNode *n = push_array(batch->arena, AC_RequestNode, 1); + SLLQueuePush(batch->first_thin, batch->last_thin, n); + batch->thin_count += 1; + MemoryCopyStruct(&n->v, r); + n->v.key = str8_copy(batch->arena, n->v.key); + } + } + ins_atomic_u32_eval_assign(&async_loop_again, 1); } lane_sync(); }