diff --git a/src/dbgi/dbgi.c b/src/dbgi/dbgi.c index 3979d149..c6a73889 100644 --- a/src/dbgi/dbgi.c +++ b/src/dbgi/dbgi.c @@ -672,16 +672,21 @@ di_search_items_from_key_params_query(DI_Scope *scope, U128 key, DI_SearchParams di_scope_touch_search_node__stripe_mutex_r_guarded(scope, node); items = node->items; stale = !str8_match(query, node->buckets[node->bucket_read_gen%ArrayCount(node->buckets)].query, 0); - if(stale_out != 0) - { - *stale_out = stale; - } + } + if(stale_out != 0) + { + *stale_out = stale; } // rjf: if stale -> request again if(stale && node->bucket_read_gen <= node->bucket_write_gen && node->bucket_write_gen < node->bucket_read_gen + ArrayCount(node->buckets)-1) { node->bucket_write_gen += 1; + if(node->bucket_write_gen >= node->bucket_items_gen + ArrayCount(node->buckets)) + { + MemoryZeroStruct(&node->items); + MemoryZeroStruct(&items); + } U64 new_bucket_idx = node->bucket_write_gen%ArrayCount(node->buckets); arena_clear(node->buckets[new_bucket_idx].arena); node->buckets[new_bucket_idx].query = push_str8_copy(node->buckets[new_bucket_idx].arena, query); @@ -1166,6 +1171,8 @@ di_u2s_dequeue_req(U64 thread_idx) typedef struct DI_SearchWorkIn DI_SearchWorkIn; struct DI_SearchWorkIn { + U128 key; + U64 initial_bucket_write_gen; Arena **work_thread_arenas; RDI_Parsed *rdi; RDI_SectionKind section_kind; @@ -1176,6 +1183,7 @@ struct DI_SearchWorkIn typedef struct DI_SearchWorkOut DI_SearchWorkOut; struct DI_SearchWorkOut { + B32 cancelled; DI_SearchItemChunkList items; }; ASYNC_WORK_DEF(di_search_work) @@ -1189,6 +1197,11 @@ ASYNC_WORK_DEF(di_search_work) in->work_thread_arenas[thread_idx] = arena_alloc(); } Arena *arena = in->work_thread_arenas[thread_idx]; + U128 key = in->key; + U64 slot_idx = key.u64[0]%di_shared->search_slots_count; + U64 stripe_idx = slot_idx%di_shared->search_stripes_count; + DI_SearchSlot * slot = &di_shared->search_slots[slot_idx]; + DI_SearchStripe * stripe = &di_shared->search_stripes[stripe_idx]; //- rjf: setup output DI_SearchWorkOut *out = push_array(arena, DI_SearchWorkOut, 1); @@ -1222,8 +1235,29 @@ ASYNC_WORK_DEF(di_search_work) } //- rjf: loop through table, gather matches - for(U64 idx = in->element_range.min; idx < in->element_range.max && idx < element_count; idx += 1) + B32 cancelled = 0; + for(U64 idx = in->element_range.min; (idx < in->element_range.max && idx < element_count); idx += 1) { + //- rjf: every so often, check the key's write gen - if it has been bumped, then cancel + if(idx%100 == 0) + { + OS_MutexScopeR(stripe->rw_mutex) + { + for(DI_SearchNode *n = slot->first; n != 0; n = n->next) + { + if(u128_match(n->key, key) && n->bucket_write_gen != in->initial_bucket_write_gen) + { + cancelled = 1; + break; + } + } + } + } + if(cancelled) + { + break; + } + //- rjf: get element, map to string; if empty, continue to next element void *element = (U8 *)table_base + element_size*idx; U32 *name_idx_ptr = (U32 *)((U8 *)element + element_name_idx_off); @@ -1263,6 +1297,7 @@ ASYNC_WORK_DEF(di_search_work) out->items.total_count += 1; } } + out->cancelled = cancelled; ProfEnd(); return out; } @@ -1311,6 +1346,7 @@ di_search_thread__entry_point(void *p) Arena *arena = 0; String8 query = {0}; DI_SearchParams params = {0}; + U64 initial_bucket_write_gen = 0; OS_MutexScopeR(stripe->rw_mutex) { for(DI_SearchNode *n = slot->first; n != 0; n = n->next) @@ -1321,6 +1357,7 @@ di_search_thread__entry_point(void *p) arena = n->buckets[bucket_idx].arena; query = push_str8_copy(scratch.arena, n->buckets[bucket_idx].query); params = di_search_params_copy(scratch.arena, &n->buckets[bucket_idx].params); + initial_bucket_write_gen = n->bucket_write_gen; break; } } @@ -1350,28 +1387,33 @@ di_search_thread__entry_point(void *p) for(U64 task_in_this_rdi_idx = 0; task_in_this_rdi_idx < tasks_per_this_rdi; task_in_this_rdi_idx += 1) { DI_SearchWorkIn *in = push_array(scratch.arena, DI_SearchWorkIn, 1); - in->work_thread_arenas = work_thread_arenas; - in->rdi = rdi; - in->section_kind = params.target; - in->element_range = r1u64(task_in_this_rdi_idx*elements_per_task, (task_in_this_rdi_idx+1)*elements_per_task); - in->element_range.max = ClampTop(in->element_range.max, element_count_in_this_rdi); - in->query = query; - in->dbgi_idx = idx; + in->key = key; + in->initial_bucket_write_gen = initial_bucket_write_gen; + in->work_thread_arenas = work_thread_arenas; + in->rdi = rdi; + in->section_kind = params.target; + in->element_range = r1u64(task_in_this_rdi_idx*elements_per_task, (task_in_this_rdi_idx+1)*elements_per_task); + in->element_range.max = ClampTop(in->element_range.max, element_count_in_this_rdi); + in->query = query; + in->dbgi_idx = idx; async_task_list_push(scratch.arena, &tasks, async_task_launch(scratch.arena, di_search_work, .input = in)); } } } //- rjf: join tasks, form final list + B32 cancelled = 0; DI_SearchItemChunkList items_list = {0}; for(ASYNC_TaskNode *n = tasks.first; n != 0; n = n->next) { DI_SearchWorkOut *out = async_task_join_struct(n->v, DI_SearchWorkOut); di_search_item_chunk_list_concat_in_place(&items_list, &out->items); + cancelled = (cancelled || out->cancelled); } //- rjf: list -> array DI_SearchItemArray items = {0}; + if(!cancelled) { items.count = items_list.total_count; items.v = push_array(arena, DI_SearchItem, items.count); @@ -1418,7 +1460,11 @@ di_search_thread__entry_point(void *p) if(n->scope_refcount == 0) { n->bucket_read_gen += 1; - n->items = items; + if(!cancelled) + { + n->items = items; + n->bucket_items_gen = initial_bucket_write_gen; + } done = 1; } found = 1; diff --git a/src/dbgi/dbgi.h b/src/dbgi/dbgi.h index 8fc799ed..03781f13 100644 --- a/src/dbgi/dbgi.h +++ b/src/dbgi/dbgi.h @@ -190,6 +190,7 @@ struct DI_SearchNode U64 last_update_tick_idx; U64 bucket_read_gen; U64 bucket_write_gen; + U64 bucket_items_gen; DI_SearchBucket buckets[6]; DI_SearchItemArray items; };