diff --git a/src/artifact_cache/artifact_cache.c b/src/artifact_cache/artifact_cache.c index 16f7a221..1910b345 100644 --- a/src/artifact_cache/artifact_cache.c +++ b/src/artifact_cache/artifact_cache.c @@ -18,6 +18,7 @@ ac_init(void) ac_shared->req_batches[idx].mutex = mutex_alloc(); ac_shared->req_batches[idx].arena = arena_alloc(); } + ac_shared->cancel_thread = thread_launch(ac_cancel_thread_entry_point, 0); } //////////////////////////////// @@ -161,8 +162,8 @@ ac_artifact_from_key_(Access *access, String8 key, AC_ArtifactParams *params, U6 } n->v.key = str8_copy(req_batch->arena, key); n->v.gen = params->gen; + n->v.cancel_signal = &node->cancelled; n->v.create = params->create; - n->v.cancel_signal = params->cancel_signal; } cond_var_broadcast(async_tick_start_cond_var); ins_atomic_u32_eval_assign(&async_loop_again, 1); @@ -576,3 +577,60 @@ ac_async_tick(void) scratch_end(scratch); } + +//////////////////////////////// +//~ rjf: Cancel Thread + +internal void +ac_cancel_thread_entry_point(void *p) +{ + for(;;) + { + os_sleep_milliseconds(100); + + //- rjf: scan in-flight nodes for expiration + for EachIndex(cache_slot_idx, ac_shared->cache_slots_count) + { + Stripe *cache_stripe = stripe_from_slot_idx(&ac_shared->cache_stripes, cache_slot_idx); + RWMutexScope(cache_stripe->rw_mutex, 0) + { + for EachNode(cache, AC_Cache, ac_shared->cache_slots[cache_slot_idx]) + { + Rng1U64 slot_range = lane_range(cache->slots_count); + for EachInRange(slot_idx, slot_range) + { + AC_Slot *slot = &cache->slots[slot_idx]; + Stripe *stripe = stripe_from_slot_idx(&cache->stripes, slot_idx); + for(B32 write_mode = 0; write_mode <= 1; write_mode += 1) + { + B32 slot_has_work = 0; + RWMutexScope(stripe->rw_mutex, write_mode) + { + for(AC_Node *n = slot->first, *next = 0; n != 0; n = next) + { + next = n->next; + if(access_pt_is_expired(&n->access_pt, .time = n->evict_threshold_us) && ins_atomic_u64_eval(&n->working_count) > 0) + { + slot_has_work = 1; + if(!write_mode) + { + break; + } + else + { + n->cancelled = 1; + } + } + } + } + if(!slot_has_work) + { + break; + } + } + } + } + } + } + } +} diff --git a/src/artifact_cache/artifact_cache.h b/src/artifact_cache/artifact_cache.h index 1f5bf00c..af651f46 100644 --- a/src/artifact_cache/artifact_cache.h +++ b/src/artifact_cache/artifact_cache.h @@ -37,7 +37,6 @@ struct AC_ArtifactParams U64 gen; U64 evict_threshold_us; B32 *stale_out; - B32 *cancel_signal; AC_Flags flags; }; @@ -127,6 +126,9 @@ struct AC_Shared // rjf: requests AC_RequestBatch req_batches[2]; // 0: high priority, 1: low priority + + // rjf: cancel thread + Thread cancel_thread; }; //////////////////////////////// @@ -139,11 +141,6 @@ global AC_Shared *ac_shared = 0; internal void ac_init(void); -//////////////////////////////// -//~ rjf: Helpers - -internal B32 ac_cancelled(void); - //////////////////////////////// //~ rjf: Cache Lookups @@ -155,4 +152,9 @@ internal AC_Artifact ac_artifact_from_key_(Access *access, String8 key, AC_Artif internal void ac_async_tick(void); +//////////////////////////////// +//~ rjf: Cancel Thread + +internal void ac_cancel_thread_entry_point(void *p); + #endif // ARTIFACT_CACHE_H diff --git a/src/dbg_info/dbg_info.c b/src/dbg_info/dbg_info.c index 994fcb39..c45ed0b3 100644 --- a/src/dbg_info/dbg_info.c +++ b/src/dbg_info/dbg_info.c @@ -1142,9 +1142,10 @@ di_search_artifact_create(String8 key, B32 *cancel_signal, B32 *retry_out, U64 * Rng1U64 range = lane_range(element_count); for EachInRange(idx, range) { - //- rjf: every so often, check if we need to cancel, and cancel + //- rjf: every so often, check if we need to cancel, and cancel + if(idx%10000 == 0 && !!ins_atomic_u32_eval(cancel_signal)) { - // TODO(rjf) + break; } //- rjf: get element, map to string; if empty, continue to next element @@ -1244,7 +1245,15 @@ di_search_artifact_create(String8 key, B32 *cancel_signal, B32 *retry_out, U64 * } } lane_sync(); - + + //- rjf: decide if we cancelled + B32 cancelled = 0; + if(lane_idx() == 0 && !!ins_atomic_u32_eval(cancel_signal)) + { + cancelled = 1; + } + lane_sync_u64(&cancelled, 0); + //- rjf: produce sort records typedef struct SortRecord SortRecord; struct SortRecord @@ -1255,7 +1264,7 @@ di_search_artifact_create(String8 key, B32 *cancel_signal, B32 *retry_out, U64 * U64 sort_records_count = all_items->total_count; SortRecord *sort_records = 0; SortRecord *sort_records__swap = 0; - ProfScope("produce sort records") + if(!cancelled) ProfScope("produce sort records") { if(lane_idx() == 0) { @@ -1283,7 +1292,7 @@ di_search_artifact_create(String8 key, B32 *cancel_signal, B32 *retry_out, U64 * lane_sync(); //- rjf: sort records - ProfScope("sort records") + if(!cancelled) ProfScope("sort records") { //- rjf: set up common data U64 bits_per_digit = 8; @@ -1382,7 +1391,7 @@ di_search_artifact_create(String8 key, B32 *cancel_signal, B32 *retry_out, U64 * //- rjf: produce final array DI_SearchItemArray items = {0}; - ProfScope("produce final array") + if(!cancelled) ProfScope("produce final array") { if(lane_idx() == 0) { @@ -1401,11 +1410,20 @@ di_search_artifact_create(String8 key, B32 *cancel_signal, B32 *retry_out, U64 * } lane_sync(); - //- rjf: bundle as artifact - artifact.u64[0] = (U64)arenas; - artifact.u64[1] = arenas_count; - artifact.u64[2] = (U64)items.v; - artifact.u64[3] = items.count; + //- rjf: bundle as artifact + if(!cancelled) + { + artifact.u64[0] = (U64)arenas; + artifact.u64[1] = arenas_count; + artifact.u64[2] = (U64)items.v; + artifact.u64[3] = items.count; + } + + //- rjf: release results on cancel + else + { + arena_release(arena); + } } scratch_end(scratch); access_close(access);