cancellation mechanism for debug info fuzzy searching

This commit is contained in:
Ryan Fleury
2024-11-11 10:38:52 -08:00
parent ff2e9df705
commit 5893ab94cb
2 changed files with 60 additions and 13 deletions
+59 -13
View File
@@ -672,16 +672,21 @@ di_search_items_from_key_params_query(DI_Scope *scope, U128 key, DI_SearchParams
di_scope_touch_search_node__stripe_mutex_r_guarded(scope, node);
items = node->items;
stale = !str8_match(query, node->buckets[node->bucket_read_gen%ArrayCount(node->buckets)].query, 0);
if(stale_out != 0)
{
*stale_out = stale;
}
}
if(stale_out != 0)
{
*stale_out = stale;
}
// rjf: if stale -> request again
if(stale && node->bucket_read_gen <= node->bucket_write_gen && node->bucket_write_gen < node->bucket_read_gen + ArrayCount(node->buckets)-1)
{
node->bucket_write_gen += 1;
if(node->bucket_write_gen >= node->bucket_items_gen + ArrayCount(node->buckets))
{
MemoryZeroStruct(&node->items);
MemoryZeroStruct(&items);
}
U64 new_bucket_idx = node->bucket_write_gen%ArrayCount(node->buckets);
arena_clear(node->buckets[new_bucket_idx].arena);
node->buckets[new_bucket_idx].query = push_str8_copy(node->buckets[new_bucket_idx].arena, query);
@@ -1166,6 +1171,8 @@ di_u2s_dequeue_req(U64 thread_idx)
typedef struct DI_SearchWorkIn DI_SearchWorkIn;
struct DI_SearchWorkIn
{
U128 key;
U64 initial_bucket_write_gen;
Arena **work_thread_arenas;
RDI_Parsed *rdi;
RDI_SectionKind section_kind;
@@ -1176,6 +1183,7 @@ struct DI_SearchWorkIn
typedef struct DI_SearchWorkOut DI_SearchWorkOut;
struct DI_SearchWorkOut
{
B32 cancelled;
DI_SearchItemChunkList items;
};
ASYNC_WORK_DEF(di_search_work)
@@ -1189,6 +1197,11 @@ ASYNC_WORK_DEF(di_search_work)
in->work_thread_arenas[thread_idx] = arena_alloc();
}
Arena *arena = in->work_thread_arenas[thread_idx];
U128 key = in->key;
U64 slot_idx = key.u64[0]%di_shared->search_slots_count;
U64 stripe_idx = slot_idx%di_shared->search_stripes_count;
DI_SearchSlot * slot = &di_shared->search_slots[slot_idx];
DI_SearchStripe * stripe = &di_shared->search_stripes[stripe_idx];
//- rjf: setup output
DI_SearchWorkOut *out = push_array(arena, DI_SearchWorkOut, 1);
@@ -1222,8 +1235,29 @@ ASYNC_WORK_DEF(di_search_work)
}
//- rjf: loop through table, gather matches
for(U64 idx = in->element_range.min; idx < in->element_range.max && idx < element_count; idx += 1)
B32 cancelled = 0;
for(U64 idx = in->element_range.min; (idx < in->element_range.max && idx < element_count); idx += 1)
{
//- rjf: every so often, check the key's write gen - if it has been bumped, then cancel
if(idx%100 == 0)
{
OS_MutexScopeR(stripe->rw_mutex)
{
for(DI_SearchNode *n = slot->first; n != 0; n = n->next)
{
if(u128_match(n->key, key) && n->bucket_write_gen != in->initial_bucket_write_gen)
{
cancelled = 1;
break;
}
}
}
}
if(cancelled)
{
break;
}
//- rjf: get element, map to string; if empty, continue to next element
void *element = (U8 *)table_base + element_size*idx;
U32 *name_idx_ptr = (U32 *)((U8 *)element + element_name_idx_off);
@@ -1263,6 +1297,7 @@ ASYNC_WORK_DEF(di_search_work)
out->items.total_count += 1;
}
}
out->cancelled = cancelled;
ProfEnd();
return out;
}
@@ -1311,6 +1346,7 @@ di_search_thread__entry_point(void *p)
Arena *arena = 0;
String8 query = {0};
DI_SearchParams params = {0};
U64 initial_bucket_write_gen = 0;
OS_MutexScopeR(stripe->rw_mutex)
{
for(DI_SearchNode *n = slot->first; n != 0; n = n->next)
@@ -1321,6 +1357,7 @@ di_search_thread__entry_point(void *p)
arena = n->buckets[bucket_idx].arena;
query = push_str8_copy(scratch.arena, n->buckets[bucket_idx].query);
params = di_search_params_copy(scratch.arena, &n->buckets[bucket_idx].params);
initial_bucket_write_gen = n->bucket_write_gen;
break;
}
}
@@ -1350,28 +1387,33 @@ di_search_thread__entry_point(void *p)
for(U64 task_in_this_rdi_idx = 0; task_in_this_rdi_idx < tasks_per_this_rdi; task_in_this_rdi_idx += 1)
{
DI_SearchWorkIn *in = push_array(scratch.arena, DI_SearchWorkIn, 1);
in->work_thread_arenas = work_thread_arenas;
in->rdi = rdi;
in->section_kind = params.target;
in->element_range = r1u64(task_in_this_rdi_idx*elements_per_task, (task_in_this_rdi_idx+1)*elements_per_task);
in->element_range.max = ClampTop(in->element_range.max, element_count_in_this_rdi);
in->query = query;
in->dbgi_idx = idx;
in->key = key;
in->initial_bucket_write_gen = initial_bucket_write_gen;
in->work_thread_arenas = work_thread_arenas;
in->rdi = rdi;
in->section_kind = params.target;
in->element_range = r1u64(task_in_this_rdi_idx*elements_per_task, (task_in_this_rdi_idx+1)*elements_per_task);
in->element_range.max = ClampTop(in->element_range.max, element_count_in_this_rdi);
in->query = query;
in->dbgi_idx = idx;
async_task_list_push(scratch.arena, &tasks, async_task_launch(scratch.arena, di_search_work, .input = in));
}
}
}
//- rjf: join tasks, form final list
B32 cancelled = 0;
DI_SearchItemChunkList items_list = {0};
for(ASYNC_TaskNode *n = tasks.first; n != 0; n = n->next)
{
DI_SearchWorkOut *out = async_task_join_struct(n->v, DI_SearchWorkOut);
di_search_item_chunk_list_concat_in_place(&items_list, &out->items);
cancelled = (cancelled || out->cancelled);
}
//- rjf: list -> array
DI_SearchItemArray items = {0};
if(!cancelled)
{
items.count = items_list.total_count;
items.v = push_array(arena, DI_SearchItem, items.count);
@@ -1418,7 +1460,11 @@ di_search_thread__entry_point(void *p)
if(n->scope_refcount == 0)
{
n->bucket_read_gen += 1;
n->items = items;
if(!cancelled)
{
n->items = items;
n->bucket_items_gen = initial_bucket_write_gen;
}
done = 1;
}
found = 1;
+1
View File
@@ -190,6 +190,7 @@ struct DI_SearchNode
U64 last_update_tick_idx;
U64 bucket_read_gen;
U64 bucket_write_gen;
U64 bucket_items_gen;
DI_SearchBucket buckets[6];
DI_SearchItemArray items;
};