artifact cache cancellation mechanism - use in dbgi searching, since this gets huge (& is often not needed quickly after querying)

This commit is contained in:
Ryan Fleury
2025-10-17 14:10:51 -07:00
parent 0d8590a2f2
commit 6bf2d2abb1
3 changed files with 96 additions and 18 deletions
+59 -1
View File
@@ -18,6 +18,7 @@ ac_init(void)
ac_shared->req_batches[idx].mutex = mutex_alloc();
ac_shared->req_batches[idx].arena = arena_alloc();
}
ac_shared->cancel_thread = thread_launch(ac_cancel_thread_entry_point, 0);
}
////////////////////////////////
@@ -161,8 +162,8 @@ ac_artifact_from_key_(Access *access, String8 key, AC_ArtifactParams *params, U6
}
n->v.key = str8_copy(req_batch->arena, key);
n->v.gen = params->gen;
n->v.cancel_signal = &node->cancelled;
n->v.create = params->create;
n->v.cancel_signal = params->cancel_signal;
}
cond_var_broadcast(async_tick_start_cond_var);
ins_atomic_u32_eval_assign(&async_loop_again, 1);
@@ -576,3 +577,60 @@ ac_async_tick(void)
scratch_end(scratch);
}
////////////////////////////////
//~ rjf: Cancel Thread
internal void
ac_cancel_thread_entry_point(void *p)
{
for(;;)
{
os_sleep_milliseconds(100);
//- rjf: scan in-flight nodes for expiration
for EachIndex(cache_slot_idx, ac_shared->cache_slots_count)
{
Stripe *cache_stripe = stripe_from_slot_idx(&ac_shared->cache_stripes, cache_slot_idx);
RWMutexScope(cache_stripe->rw_mutex, 0)
{
for EachNode(cache, AC_Cache, ac_shared->cache_slots[cache_slot_idx])
{
Rng1U64 slot_range = lane_range(cache->slots_count);
for EachInRange(slot_idx, slot_range)
{
AC_Slot *slot = &cache->slots[slot_idx];
Stripe *stripe = stripe_from_slot_idx(&cache->stripes, slot_idx);
for(B32 write_mode = 0; write_mode <= 1; write_mode += 1)
{
B32 slot_has_work = 0;
RWMutexScope(stripe->rw_mutex, write_mode)
{
for(AC_Node *n = slot->first, *next = 0; n != 0; n = next)
{
next = n->next;
if(access_pt_is_expired(&n->access_pt, .time = n->evict_threshold_us) && ins_atomic_u64_eval(&n->working_count) > 0)
{
slot_has_work = 1;
if(!write_mode)
{
break;
}
else
{
n->cancelled = 1;
}
}
}
}
if(!slot_has_work)
{
break;
}
}
}
}
}
}
}
}
+8 -6
View File
@@ -37,7 +37,6 @@ struct AC_ArtifactParams
U64 gen;
U64 evict_threshold_us;
B32 *stale_out;
B32 *cancel_signal;
AC_Flags flags;
};
@@ -127,6 +126,9 @@ struct AC_Shared
// rjf: requests
AC_RequestBatch req_batches[2]; // 0: high priority, 1: low priority
// rjf: cancel thread
Thread cancel_thread;
};
////////////////////////////////
@@ -139,11 +141,6 @@ global AC_Shared *ac_shared = 0;
internal void ac_init(void);
////////////////////////////////
//~ rjf: Helpers
internal B32 ac_cancelled(void);
////////////////////////////////
//~ rjf: Cache Lookups
@@ -155,4 +152,9 @@ internal AC_Artifact ac_artifact_from_key_(Access *access, String8 key, AC_Artif
internal void ac_async_tick(void);
////////////////////////////////
//~ rjf: Cancel Thread
internal void ac_cancel_thread_entry_point(void *p);
#endif // ARTIFACT_CACHE_H
+29 -11
View File
@@ -1142,9 +1142,10 @@ di_search_artifact_create(String8 key, B32 *cancel_signal, B32 *retry_out, U64 *
Rng1U64 range = lane_range(element_count);
for EachInRange(idx, range)
{
//- rjf: every so often, check if we need to cancel, and cancel
//- rjf: every so often, check if we need to cancel, and cancel
if(idx%10000 == 0 && !!ins_atomic_u32_eval(cancel_signal))
{
// TODO(rjf)
break;
}
//- rjf: get element, map to string; if empty, continue to next element
@@ -1244,7 +1245,15 @@ di_search_artifact_create(String8 key, B32 *cancel_signal, B32 *retry_out, U64 *
}
}
lane_sync();
//- rjf: decide if we cancelled
B32 cancelled = 0;
if(lane_idx() == 0 && !!ins_atomic_u32_eval(cancel_signal))
{
cancelled = 1;
}
lane_sync_u64(&cancelled, 0);
//- rjf: produce sort records
typedef struct SortRecord SortRecord;
struct SortRecord
@@ -1255,7 +1264,7 @@ di_search_artifact_create(String8 key, B32 *cancel_signal, B32 *retry_out, U64 *
U64 sort_records_count = all_items->total_count;
SortRecord *sort_records = 0;
SortRecord *sort_records__swap = 0;
ProfScope("produce sort records")
if(!cancelled) ProfScope("produce sort records")
{
if(lane_idx() == 0)
{
@@ -1283,7 +1292,7 @@ di_search_artifact_create(String8 key, B32 *cancel_signal, B32 *retry_out, U64 *
lane_sync();
//- rjf: sort records
ProfScope("sort records")
if(!cancelled) ProfScope("sort records")
{
//- rjf: set up common data
U64 bits_per_digit = 8;
@@ -1382,7 +1391,7 @@ di_search_artifact_create(String8 key, B32 *cancel_signal, B32 *retry_out, U64 *
//- rjf: produce final array
DI_SearchItemArray items = {0};
ProfScope("produce final array")
if(!cancelled) ProfScope("produce final array")
{
if(lane_idx() == 0)
{
@@ -1401,11 +1410,20 @@ di_search_artifact_create(String8 key, B32 *cancel_signal, B32 *retry_out, U64 *
}
lane_sync();
//- rjf: bundle as artifact
artifact.u64[0] = (U64)arenas;
artifact.u64[1] = arenas_count;
artifact.u64[2] = (U64)items.v;
artifact.u64[3] = items.count;
//- rjf: bundle as artifact
if(!cancelled)
{
artifact.u64[0] = (U64)arenas;
artifact.u64[1] = arenas_count;
artifact.u64[2] = (U64)items.v;
artifact.u64[3] = items.count;
}
//- rjf: release results on cancel
else
{
arena_release(arena);
}
}
scratch_end(scratch);
access_close(access);