diff --git a/src/async/async.c b/src/async/async.c index 9548b577..d67e657c 100644 --- a/src/async/async.c +++ b/src/async/async.c @@ -10,10 +10,16 @@ async_init(void) Arena *arena = arena_alloc(); async_shared = push_array(arena, ASYNC_Shared, 1); async_shared->arena = arena; - async_shared->u2w_ring_size = MB(8); - async_shared->u2w_ring_base = push_array_no_zero(arena, U8, async_shared->u2w_ring_size); - async_shared->u2w_ring_mutex = os_mutex_alloc(); - async_shared->u2w_ring_cv = os_condition_variable_alloc(); + for EachEnumVal(ASYNC_Priority, p) + { + ASYNC_Ring *ring = &async_shared->rings[p]; + ring->ring_size = MB(8); + ring->ring_base = push_array_no_zero(arena, U8, ring->ring_size); + ring->ring_mutex = os_mutex_alloc(); + ring->ring_cv = os_condition_variable_alloc(); + } + async_shared->ring_mutex = os_mutex_alloc(); + async_shared->ring_cv = os_condition_variable_alloc(); async_shared->work_threads_count = Max(1, os_get_system_info()->logical_processor_count-1); async_shared->work_threads = push_array(arena, OS_Handle, async_shared->work_threads_count); for EachIndex(idx, async_shared->work_threads_count) @@ -37,6 +43,9 @@ async_thread_count(void) internal B32 async_push_work_(ASYNC_WorkFunctionType *work_function, ASYNC_WorkParams *params) { + // rjf: choose ring + ASYNC_Ring *ring = &async_shared->rings[params->priority]; + // rjf: build work package ASYNC_Work work = {0}; work.work_function = work_function; @@ -50,7 +59,7 @@ async_push_work_(ASYNC_WorkFunctionType *work_function, ASYNC_WorkParams *params // thread, and skip ring buffer if so. B32 queued_in_ring_buffer = 0; B32 need_to_execute_on_this_thread = 0; - OS_MutexScope(async_shared->u2w_ring_mutex) for(;;) + OS_MutexScope(ring->ring_mutex) for(;;) { U64 num_available_work_threads = (async_shared->work_threads_count - ins_atomic_u64_eval(&async_shared->work_threads_live_count)); if(num_available_work_threads == 0 && async_work_thread_depth > 0) @@ -58,8 +67,8 @@ async_push_work_(ASYNC_WorkFunctionType *work_function, ASYNC_WorkParams *params need_to_execute_on_this_thread = 1; break; } - U64 unconsumed_size = async_shared->u2w_ring_write_pos - async_shared->u2w_ring_read_pos; - U64 available_size = async_shared->u2w_ring_size - unconsumed_size; + U64 unconsumed_size = ring->ring_write_pos - ring->ring_read_pos; + U64 available_size = ring->ring_size - unconsumed_size; if(available_size >= sizeof(work)) { queued_in_ring_buffer = 1; @@ -67,20 +76,21 @@ async_push_work_(ASYNC_WorkFunctionType *work_function, ASYNC_WorkParams *params { os_semaphore_take(params->semaphore, max_U64); } - async_shared->u2w_ring_write_pos += ring_write_struct(async_shared->u2w_ring_base, async_shared->u2w_ring_size, async_shared->u2w_ring_write_pos, &work); + ring->ring_write_pos += ring_write_struct(ring->ring_base, ring->ring_size, ring->ring_write_pos, &work); break; } if(os_now_microseconds() >= params->endt_us) { break; } - os_condition_variable_wait(async_shared->u2w_ring_cv, async_shared->u2w_ring_mutex, params->endt_us); + os_condition_variable_wait(ring->ring_cv, ring->ring_mutex, params->endt_us); } // rjf: broadcast ring buffer cv if we wrote successfully if(queued_in_ring_buffer) { - os_condition_variable_broadcast(async_shared->u2w_ring_cv); + os_condition_variable_broadcast(ring->ring_cv); + os_condition_variable_broadcast(async_shared->ring_cv); } // rjf: if we did not queue successfully, and we have determined that @@ -145,18 +155,38 @@ async_task_join(ASYNC_Task *task) internal ASYNC_Work async_pop_work(void) { + ProfBeginFunction(); ASYNC_Work work = {0}; - OS_MutexScope(async_shared->u2w_ring_mutex) for(;;) + B32 done = 0; + ASYNC_Priority taken_priority = ASYNC_Priority_Low; + OS_MutexScope(async_shared->ring_mutex) for(;!done;) { - U64 unconsumed_size = async_shared->u2w_ring_write_pos - async_shared->u2w_ring_read_pos; - if(unconsumed_size >= sizeof(work)) + for(ASYNC_Priority priority = ASYNC_Priority_High;; priority = (ASYNC_Priority)(priority - 1)) { - async_shared->u2w_ring_read_pos += ring_read_struct(async_shared->u2w_ring_base, async_shared->u2w_ring_size, async_shared->u2w_ring_read_pos, &work); - break; + ASYNC_Ring *ring = &async_shared->rings[priority]; + OS_MutexScope(ring->ring_mutex) + { + U64 unconsumed_size = ring->ring_write_pos - ring->ring_read_pos; + if(unconsumed_size >= sizeof(work)) + { + ring->ring_read_pos += ring_read_struct(ring->ring_base, ring->ring_size, ring->ring_read_pos, &work); + done = 1; + taken_priority = priority; + } + } + if(priority == ASYNC_Priority_Low) + { + break; + } + } + if(!done) + { + os_condition_variable_wait(async_shared->ring_cv, async_shared->ring_mutex, max_U64); } - os_condition_variable_wait(async_shared->u2w_ring_cv, async_shared->u2w_ring_mutex, max_U64); } - os_condition_variable_broadcast(async_shared->u2w_ring_cv); + os_condition_variable_broadcast(async_shared->ring_cv); + os_condition_variable_broadcast(async_shared->rings[taken_priority].ring_cv); + ProfEnd(); return work; } diff --git a/src/async/async.h b/src/async/async.h index 5f2b76d0..4da20144 100644 --- a/src/async/async.h +++ b/src/async/async.h @@ -14,6 +14,14 @@ typedef ASYNC_WORK_SIG(ASYNC_WorkFunctionType); //////////////////////////////// //~ rjf: Work Types +typedef enum ASYNC_Priority +{ + ASYNC_Priority_Low, + ASYNC_Priority_High, + ASYNC_Priority_COUNT +} +ASYNC_Priority; + typedef struct ASYNC_WorkParams ASYNC_WorkParams; struct ASYNC_WorkParams { @@ -22,6 +30,7 @@ struct ASYNC_WorkParams OS_Handle semaphore; U64 *completion_counter; U64 endt_us; + ASYNC_Priority priority; }; typedef struct ASYNC_Work ASYNC_Work; @@ -62,18 +71,26 @@ struct ASYNC_TaskList //////////////////////////////// //~ rjf: Shared State Bundle +typedef struct ASYNC_Ring ASYNC_Ring; +struct ASYNC_Ring +{ + U64 ring_size; + U8 *ring_base; + U64 ring_write_pos; + U64 ring_read_pos; + OS_Handle ring_mutex; + OS_Handle ring_cv; +}; + typedef struct ASYNC_Shared ASYNC_Shared; struct ASYNC_Shared { Arena *arena; - // rjf: user -> work thread ring buffer - U64 u2w_ring_size; - U8 *u2w_ring_base; - U64 u2w_ring_write_pos; - U64 u2w_ring_read_pos; - OS_Handle u2w_ring_mutex; - OS_Handle u2w_ring_cv; + // rjf: user -> work thread ring buffers + ASYNC_Ring rings[ASYNC_Priority_COUNT]; + OS_Handle ring_mutex; + OS_Handle ring_cv; // rjf: work threads OS_Handle *work_threads; @@ -102,7 +119,7 @@ internal U64 async_thread_count(void); //~ rjf: Work Kickoffs internal B32 async_push_work_(ASYNC_WorkFunctionType *work_function, ASYNC_WorkParams *params); -#define async_push_work(function, ...) async_push_work_((function), &(ASYNC_WorkParams){.endt_us = max_U64, __VA_ARGS__}) +#define async_push_work(function, ...) async_push_work_((function), &(ASYNC_WorkParams){.endt_us = max_U64, .priority = ASYNC_Priority_High, __VA_ARGS__}) //////////////////////////////// //~ rjf: Task-Based Work Helper diff --git a/src/ctrl/ctrl_core.c b/src/ctrl/ctrl_core.c index b75ef35b..e282e96c 100644 --- a/src/ctrl/ctrl_core.c +++ b/src/ctrl/ctrl_core.c @@ -3074,12 +3074,11 @@ ctrl_u2c_push_msgs(CTRL_MsgList *msgs, U64 endt_us) { U64 unconsumed_size = (ctrl_state->u2c_ring_write_pos-ctrl_state->u2c_ring_read_pos); U64 available_size = ctrl_state->u2c_ring_size-unconsumed_size; - if(available_size >= sizeof(U64) + msgs_srlzed_baked.size) + U64 needed_size = sizeof(msgs_srlzed_baked.size) + msgs_srlzed_baked.size; + if(available_size >= needed_size) { ctrl_state->u2c_ring_write_pos += ring_write_struct(ctrl_state->u2c_ring_base, ctrl_state->u2c_ring_size, ctrl_state->u2c_ring_write_pos, &msgs_srlzed_baked.size); ctrl_state->u2c_ring_write_pos += ring_write(ctrl_state->u2c_ring_base, ctrl_state->u2c_ring_size, ctrl_state->u2c_ring_write_pos, msgs_srlzed_baked.str, msgs_srlzed_baked.size); - ctrl_state->u2c_ring_write_pos += 7; - ctrl_state->u2c_ring_write_pos -= ctrl_state->u2c_ring_write_pos%8; good = 1; break; } @@ -3112,8 +3111,6 @@ ctrl_u2c_pop_msgs(Arena *arena) msgs_srlzed_baked.size = size_to_decode; msgs_srlzed_baked.str = push_array_no_zero(scratch.arena, U8, msgs_srlzed_baked.size); ctrl_state->u2c_ring_read_pos += ring_read(ctrl_state->u2c_ring_base, ctrl_state->u2c_ring_size, ctrl_state->u2c_ring_read_pos, msgs_srlzed_baked.str, size_to_decode); - ctrl_state->u2c_ring_read_pos += 7; - ctrl_state->u2c_ring_read_pos -= ctrl_state->u2c_ring_read_pos%8; break; } os_condition_variable_wait(ctrl_state->u2c_ring_cv, ctrl_state->u2c_ring_mutex, max_U64); @@ -3140,12 +3137,11 @@ ctrl_c2u_push_events(CTRL_EventList *events) { U64 unconsumed_size = (ctrl_state->c2u_ring_write_pos-ctrl_state->c2u_ring_read_pos); U64 available_size = ctrl_state->c2u_ring_size-unconsumed_size; - if(available_size >= sizeof(U64) + event_srlzed.size) + U64 needed_size = sizeof(event_srlzed.size) + event_srlzed.size; + if(available_size >= needed_size) { ctrl_state->c2u_ring_write_pos += ring_write_struct(ctrl_state->c2u_ring_base, ctrl_state->c2u_ring_size, ctrl_state->c2u_ring_write_pos, &event_srlzed.size); ctrl_state->c2u_ring_write_pos += ring_write(ctrl_state->c2u_ring_base, ctrl_state->c2u_ring_size, ctrl_state->c2u_ring_write_pos, event_srlzed.str, event_srlzed.size); - ctrl_state->c2u_ring_write_pos += 7; - ctrl_state->c2u_ring_write_pos -= ctrl_state->c2u_ring_write_pos%8; break; } os_condition_variable_wait(ctrl_state->c2u_ring_cv, ctrl_state->c2u_ring_mutex, os_now_microseconds()+100); @@ -3177,8 +3173,6 @@ ctrl_c2u_pop_events(Arena *arena) event_srlzed.size = size_to_decode; event_srlzed.str = push_array_no_zero(scratch.arena, U8, event_srlzed.size); ctrl_state->c2u_ring_read_pos += ring_read(ctrl_state->c2u_ring_base, ctrl_state->c2u_ring_size, ctrl_state->c2u_ring_read_pos, event_srlzed.str, event_srlzed.size); - ctrl_state->c2u_ring_read_pos += 7; - ctrl_state->c2u_ring_read_pos -= ctrl_state->c2u_ring_read_pos%8; CTRL_Event *new_event = ctrl_event_list_push(arena, &events); *new_event = ctrl_event_from_serialized_string(arena, event_srlzed); } @@ -4191,22 +4185,21 @@ ctrl_thread__next_dmn_event(Arena *arena, DMN_CtrlCtx *ctrl_ctx, CTRL_Msg *msg, // searched yet, but it has >4 child branches, meaning it looks like // a project directory // + DI_KeyList preemptively_loaded_keys = {0}; for(CTRL_DbgDirNode *dir_node = parent_dir_node; dir_node != 0; dir_node = dir_node->parent) { if(dir_node->search_count == 0 && (dir_node == parent_dir_node || dir_node->child_count >= 4)) { - Temp temp = temp_begin(scratch.arena); - //- rjf: form full path of this directory node String8List dir_node_path_parts = {0}; for(CTRL_DbgDirNode *n = dir_node; n != 0; n = n->parent) { if(n->name.size != 0) { - str8_list_push_front(temp.arena, &dir_node_path_parts, n->name); + str8_list_push_front(scratch.arena, &dir_node_path_parts, n->name); } } - String8 dir_node_path = str8_list_join(temp.arena, &dir_node_path_parts, &(StringJoin){.sep = str8_lit("/")}); + String8 dir_node_path = str8_list_join(scratch.arena, &dir_node_path_parts, &(StringJoin){.sep = str8_lit("/")}); //- rjf: iterate downwards from this directory recursively, locate // debug infos, and pre-emptively convert @@ -4232,10 +4225,9 @@ ctrl_thread__next_dmn_event(Arena *arena, DMN_CtrlCtx *ctrl_ctx, CTRL_Msg *msg, // kick off pre-emptive conversion, and gather key. if folders // are encountered, then add them to the tree, and kick off a // sub-search if needed. - DI_KeyList preemptively_loaded_keys = {0}; - OS_FileIter *it = os_file_iter_begin(temp.arena, t->path, 0); + OS_FileIter *it = os_file_iter_begin(scratch.arena, t->path, 0); U64 idx = 0; - for(OS_FileInfo info = {0}; idx < 64 && os_file_iter_next(temp.arena, it, &info); idx += 1) + for(OS_FileInfo info = {0}; idx < 16384 && os_file_iter_next(scratch.arena, it, &info); idx += 1) { // rjf: folder -> do sub-search if not duplicative if(info.props.flags & FilePropertyFlag_IsFolder && task_count < 16384 && !str8_match(str8_prefix(info.name, 1), str8_lit("."), 0)) @@ -4259,9 +4251,9 @@ ctrl_thread__next_dmn_event(Arena *arena, DMN_CtrlCtx *ctrl_ctx, CTRL_Msg *msg, } if(existing_dir_child->search_count == 0) { - Task *task = push_array(temp.arena, Task, 1); + Task *task = push_array(scratch.arena, Task, 1); task->node = existing_dir_child; - task->path = push_str8f(temp.arena, "%S/%S", t->path, info.name); + task->path = push_str8f(scratch.arena, "%S/%S", t->path, info.name); SLLQueuePush(first_task, last_task, task); task_count += 1; } @@ -4273,30 +4265,26 @@ ctrl_thread__next_dmn_event(Arena *arena, DMN_CtrlCtx *ctrl_ctx, CTRL_Msg *msg, str8_match(str8_skip_last_dot(info.name), debug_info_ext, StringMatchFlag_CaseInsensitive) && !str8_match(loaded_di_name, info.name, StringMatchFlag_CaseInsensitive)) { - DI_Key key = {push_str8f(temp.arena, "%S/%S", t->path, info.name), info.props.modified}; + DI_Key key = {push_str8f(scratch.arena, "%S/%S", t->path, info.name), info.props.modified}; di_open(&key); - di_key_list_push(temp.arena, &preemptively_loaded_keys, &key); + di_key_list_push(scratch.arena, &preemptively_loaded_keys, &key); } } os_file_iter_end(it); - - // rjf: for each pre-emptively loaded key, wait for the initial - // load task to be done - for(DI_KeyNode *n = preemptively_loaded_keys.first; n != 0; n = n->next) - { - DI_Scope *di_scope = di_scope_open(); - RDI_Parsed *rdi = di_rdi_from_key(di_scope, &n->v, max_U64); - di_scope_close(di_scope); - di_close(&n->v); - } - ProfEnd(); } - - temp_end(temp); - } } + + //- rjf: for each pre-emptively loaded key, wait for the initial + // load task to be done + for(DI_KeyNode *n = preemptively_loaded_keys.first; n != 0; n = n->next) + { + DI_Scope *di_scope = di_scope_open(); + RDI_Parsed *rdi = di_rdi_from_key(di_scope, &n->v, max_U64); + di_scope_close(di_scope); + di_close(&n->v); + } } } diff --git a/src/dasm_cache/dasm_cache.c b/src/dasm_cache/dasm_cache.c index 92cdc9d6..a8aa6776 100644 --- a/src/dasm_cache/dasm_cache.c +++ b/src/dasm_cache/dasm_cache.c @@ -457,8 +457,6 @@ dasm_u2p_enqueue_req(U128 hash, DASM_Params *params, U64 endt_us) dasm_shared->u2p_ring_write_pos += ring_write_struct(dasm_shared->u2p_ring_base, dasm_shared->u2p_ring_size, dasm_shared->u2p_ring_write_pos, ¶ms->dbgi_key.path.size); dasm_shared->u2p_ring_write_pos += ring_write(dasm_shared->u2p_ring_base, dasm_shared->u2p_ring_size, dasm_shared->u2p_ring_write_pos, params->dbgi_key.path.str, params->dbgi_key.path.size); dasm_shared->u2p_ring_write_pos += ring_write_struct(dasm_shared->u2p_ring_base, dasm_shared->u2p_ring_size, dasm_shared->u2p_ring_write_pos, ¶ms->dbgi_key.min_timestamp); - dasm_shared->u2p_ring_write_pos += 7; - dasm_shared->u2p_ring_write_pos -= dasm_shared->u2p_ring_write_pos%8; break; } if(os_now_microseconds() >= endt_us) @@ -492,8 +490,6 @@ dasm_u2p_dequeue_req(Arena *arena, U128 *hash_out, DASM_Params *params_out) params_out->dbgi_key.path.str = push_array(arena, U8, params_out->dbgi_key.path.size); dasm_shared->u2p_ring_read_pos += ring_read(dasm_shared->u2p_ring_base, dasm_shared->u2p_ring_size, dasm_shared->u2p_ring_read_pos, params_out->dbgi_key.path.str, params_out->dbgi_key.path.size); dasm_shared->u2p_ring_read_pos += ring_read_struct(dasm_shared->u2p_ring_base, dasm_shared->u2p_ring_size, dasm_shared->u2p_ring_read_pos, ¶ms_out->dbgi_key.min_timestamp); - dasm_shared->u2p_ring_read_pos += 7; - dasm_shared->u2p_ring_read_pos -= dasm_shared->u2p_ring_read_pos%8; break; } os_condition_variable_wait(dasm_shared->u2p_ring_cv, dasm_shared->u2p_ring_mutex, max_U64); diff --git a/src/dbgi/dbgi.c b/src/dbgi/dbgi.c index f73a777b..ea4e65a8 100644 --- a/src/dbgi/dbgi.c +++ b/src/dbgi/dbgi.c @@ -728,13 +728,12 @@ di_u2p_enqueue_key(DI_Key *key, U64 endt_us) { U64 unconsumed_size = di_shared->u2p_ring_write_pos - di_shared->u2p_ring_read_pos; U64 available_size = di_shared->u2p_ring_size - unconsumed_size; - if(available_size >= sizeof(key->path.size) + key->path.size + sizeof(key->min_timestamp)) + U64 needed_size = sizeof(key->min_timestamp) + sizeof(key->path.size) + key->path.size; + if(available_size >= needed_size) { + di_shared->u2p_ring_write_pos += ring_write_struct(di_shared->u2p_ring_base, di_shared->u2p_ring_size, di_shared->u2p_ring_write_pos, &key->min_timestamp); di_shared->u2p_ring_write_pos += ring_write_struct(di_shared->u2p_ring_base, di_shared->u2p_ring_size, di_shared->u2p_ring_write_pos, &key->path.size); di_shared->u2p_ring_write_pos += ring_write(di_shared->u2p_ring_base, di_shared->u2p_ring_size, di_shared->u2p_ring_write_pos, key->path.str, key->path.size); - di_shared->u2p_ring_write_pos += ring_write_struct(di_shared->u2p_ring_base, di_shared->u2p_ring_size, di_shared->u2p_ring_write_pos, &key->min_timestamp); - di_shared->u2p_ring_write_pos += 7; - di_shared->u2p_ring_write_pos -= di_shared->u2p_ring_write_pos%8; sent = 1; break; } @@ -759,12 +758,10 @@ di_u2p_dequeue_key(Arena *arena, DI_Key *out_key) U64 unconsumed_size = di_shared->u2p_ring_write_pos - di_shared->u2p_ring_read_pos; if(unconsumed_size >= sizeof(out_key->path.size) + sizeof(out_key->min_timestamp)) { + di_shared->u2p_ring_read_pos += ring_read_struct(di_shared->u2p_ring_base, di_shared->u2p_ring_size, di_shared->u2p_ring_read_pos, &out_key->min_timestamp); di_shared->u2p_ring_read_pos += ring_read_struct(di_shared->u2p_ring_base, di_shared->u2p_ring_size, di_shared->u2p_ring_read_pos, &out_key->path.size); out_key->path.str = push_array(arena, U8, out_key->path.size); di_shared->u2p_ring_read_pos += ring_read(di_shared->u2p_ring_base, di_shared->u2p_ring_size, di_shared->u2p_ring_read_pos, out_key->path.str, out_key->path.size); - di_shared->u2p_ring_read_pos += ring_read_struct(di_shared->u2p_ring_base, di_shared->u2p_ring_size, di_shared->u2p_ring_read_pos, &out_key->min_timestamp); - di_shared->u2p_ring_read_pos += 7; - di_shared->u2p_ring_read_pos -= di_shared->u2p_ring_read_pos%8; break; } os_condition_variable_wait(di_shared->u2p_ring_cv, di_shared->u2p_ring_mutex, max_U64); @@ -779,14 +776,12 @@ di_p2u_push_event(DI_Event *event) { U64 unconsumed_size = (di_shared->p2u_ring_write_pos-di_shared->p2u_ring_read_pos); U64 available_size = di_shared->p2u_ring_size-unconsumed_size; - U64 needed_size = sizeof(DI_EventKind) + sizeof(U64) + event->string.size; + U64 needed_size = sizeof(event->kind) + sizeof(event->string.size) + event->string.size; if(available_size >= needed_size) { di_shared->p2u_ring_write_pos += ring_write_struct(di_shared->p2u_ring_base, di_shared->p2u_ring_size, di_shared->p2u_ring_write_pos, &event->kind); di_shared->p2u_ring_write_pos += ring_write_struct(di_shared->p2u_ring_base, di_shared->p2u_ring_size, di_shared->p2u_ring_write_pos, &event->string.size); di_shared->p2u_ring_write_pos += ring_write(di_shared->p2u_ring_base, di_shared->p2u_ring_size, di_shared->p2u_ring_write_pos, event->string.str, event->string.size); - di_shared->p2u_ring_write_pos += 7; - di_shared->p2u_ring_write_pos -= di_shared->p2u_ring_write_pos%8; break; } os_condition_variable_wait(di_shared->p2u_ring_cv, di_shared->p2u_ring_mutex, max_U64); @@ -810,8 +805,6 @@ di_p2u_pop_events(Arena *arena, U64 endt_us) di_shared->p2u_ring_read_pos += ring_read_struct(di_shared->p2u_ring_base, di_shared->p2u_ring_size, di_shared->p2u_ring_read_pos, &n->v.string.size); n->v.string.str = push_array_no_zero(arena, U8, n->v.string.size); di_shared->p2u_ring_read_pos += ring_read(di_shared->p2u_ring_base, di_shared->p2u_ring_size, di_shared->p2u_ring_read_pos, n->v.string.str, n->v.string.size); - di_shared->p2u_ring_read_pos += 7; - di_shared->p2u_ring_read_pos -= di_shared->p2u_ring_read_pos%8; } else if(os_now_microseconds() >= endt_us) { @@ -1618,24 +1611,21 @@ di_match_store_section_kind_from_name(DI_MatchStore *store, String8 name, U64 en DLLInsert_NP(store->first_lru_match_name, store->last_lru_match_name, (DI_MatchNameNode *)0, node, lru_next, lru_prev); // rjf: if this node is new w.r.t. the store's current parameters, request it - if(node->req_params_hash != store->params_hash) + U64 completed_params_hash = ins_atomic_u64_eval(&node->cmp_params_hash); + if(completed_params_hash != store->params_hash && node->req_count == ins_atomic_u64_eval(&node->cmp_count)) { B32 sent = 0; OS_MutexScope(store->u2m_ring_mutex) for(;;) { U64 unconsumed_size = store->u2m_ring_write_pos - store->u2m_ring_read_pos; U64 available_size = store->u2m_ring_size - unconsumed_size; - U64 needed_size = sizeof(&node) + sizeof(U64) + sizeof(U64) + name.size; - needed_size += 7; - needed_size -= needed_size%8; + U64 needed_size = sizeof(&node) + sizeof(node->alloc_gen) + sizeof(name.size) + name.size; if(available_size >= needed_size) { store->u2m_ring_write_pos += ring_write_struct(store->u2m_ring_base, store->u2m_ring_size, store->u2m_ring_write_pos, &node); store->u2m_ring_write_pos += ring_write_struct(store->u2m_ring_base, store->u2m_ring_size, store->u2m_ring_write_pos, &node->alloc_gen); store->u2m_ring_write_pos += ring_write_struct(store->u2m_ring_base, store->u2m_ring_size, store->u2m_ring_write_pos, &name.size); store->u2m_ring_write_pos += ring_write(store->u2m_ring_base, store->u2m_ring_size, store->u2m_ring_write_pos, name.str, name.size); - store->u2m_ring_write_pos += 7; - store->u2m_ring_write_pos -= store->u2m_ring_write_pos%8; sent = 1; break; } @@ -1648,13 +1638,14 @@ di_match_store_section_kind_from_name(DI_MatchStore *store, String8 name, U64 en if(sent) { os_condition_variable_broadcast(store->u2m_ring_cv); - async_push_work(di_match_work, .input = store); + async_push_work(di_match_work, .input = store, .priority = ASYNC_Priority_Low, .completion_counter = &node->cmp_count); node->req_params_hash = store->params_hash; + node->req_count += 1; } } // rjf: if this node's state is stale, wait for it if we need to - if(os_now_microseconds() < endt_us && node->req_params_hash != ins_atomic_u64_eval(&node->cmp_params_hash)) + if(os_now_microseconds() < endt_us && node->req_params_hash != completed_params_hash) { OS_MutexScopeR(store->match_rw_mutex) for(;;) { @@ -1696,8 +1687,6 @@ ASYNC_WORK_DEF(di_match_work) store->u2m_ring_read_pos += ring_read_struct(store->u2m_ring_base, store->u2m_ring_size, store->u2m_ring_read_pos, &name.size); name.str = push_array(scratch.arena, U8, name.size); store->u2m_ring_read_pos += ring_read(store->u2m_ring_base, store->u2m_ring_size, store->u2m_ring_read_pos, name.str, name.size); - store->u2m_ring_read_pos += 7; - store->u2m_ring_read_pos -= store->u2m_ring_read_pos%8; break; } os_condition_variable_wait(store->u2m_ring_cv, store->u2m_ring_mutex, max_U64); diff --git a/src/dbgi/dbgi.h b/src/dbgi/dbgi.h index 8c7d35cc..bb567f00 100644 --- a/src/dbgi/dbgi.h +++ b/src/dbgi/dbgi.h @@ -278,10 +278,12 @@ struct DI_MatchNameNode U64 first_gen_touched; U64 last_gen_touched; U64 req_params_hash; + U64 req_count; String8 name; U64 hash; // rjf: atomically written by match work + U64 cmp_count; U64 cmp_params_hash; RDI_SectionKind section_kind; // DI_Match *first_match; diff --git a/src/file_stream/file_stream.c b/src/file_stream/file_stream.c index 3246a0f8..959e6d61 100644 --- a/src/file_stream/file_stream.c +++ b/src/file_stream/file_stream.c @@ -248,15 +248,14 @@ fs_u2s_enqueue_req(Rng1U64 range, String8 path, U64 endt_us) { U64 unconsumed_size = fs_shared->u2s_ring_write_pos - fs_shared->u2s_ring_read_pos; U64 available_size = fs_shared->u2s_ring_size - unconsumed_size; - if(available_size >= sizeof(U64) + path.size) + U64 needed_size = sizeof(range.min) + sizeof(range.max) + sizeof(path.size) + path.size; + if(available_size >= needed_size) { result = 1; fs_shared->u2s_ring_write_pos += ring_write_struct(fs_shared->u2s_ring_base, fs_shared->u2s_ring_size, fs_shared->u2s_ring_write_pos, &range.min); fs_shared->u2s_ring_write_pos += ring_write_struct(fs_shared->u2s_ring_base, fs_shared->u2s_ring_size, fs_shared->u2s_ring_write_pos, &range.max); fs_shared->u2s_ring_write_pos += ring_write_struct(fs_shared->u2s_ring_base, fs_shared->u2s_ring_size, fs_shared->u2s_ring_write_pos, &path.size); fs_shared->u2s_ring_write_pos += ring_write(fs_shared->u2s_ring_base, fs_shared->u2s_ring_size, fs_shared->u2s_ring_write_pos, path.str, path.size); - fs_shared->u2s_ring_write_pos += 7; - fs_shared->u2s_ring_write_pos -= fs_shared->u2s_ring_write_pos%8; break; } os_condition_variable_wait(fs_shared->u2s_ring_cv, fs_shared->u2s_ring_mutex, endt_us); @@ -281,8 +280,6 @@ fs_u2s_dequeue_req(Arena *arena, Rng1U64 *range_out, String8 *path_out) fs_shared->u2s_ring_read_pos += ring_read_struct(fs_shared->u2s_ring_base, fs_shared->u2s_ring_size, fs_shared->u2s_ring_read_pos, &path_out->size); path_out->str = push_array(arena, U8, path_out->size); fs_shared->u2s_ring_read_pos += ring_read(fs_shared->u2s_ring_base, fs_shared->u2s_ring_size, fs_shared->u2s_ring_read_pos, path_out->str, path_out->size); - fs_shared->u2s_ring_read_pos += 7; - fs_shared->u2s_ring_read_pos -= fs_shared->u2s_ring_read_pos%8; break; } os_condition_variable_wait(fs_shared->u2s_ring_cv, fs_shared->u2s_ring_mutex, max_U64); @@ -420,9 +417,9 @@ fs_detector_thread__entry_point(void *p) range_n = range_n->next) { if(ins_atomic_u64_eval(&range_n->request_count) == ins_atomic_u64_eval(&range_n->completion_count) && - async_push_work(fs_stream_work, .endt_us = os_now_microseconds()+100000, .completion_counter = &range_n->completion_count)) + fs_u2s_enqueue_req(range_n->range, n->path, os_now_microseconds()+100000)) { - fs_u2s_enqueue_req(range_n->range, n->path, max_U64); + async_push_work(fs_stream_work, .completion_counter = &range_n->completion_count); ins_atomic_u64_inc_eval(&range_n->request_count); } } diff --git a/src/mutable_text/mutable_text.c b/src/mutable_text/mutable_text.c index af7a46e3..61b6dc9b 100644 --- a/src/mutable_text/mutable_text.c +++ b/src/mutable_text/mutable_text.c @@ -52,14 +52,13 @@ mtx_enqueue_op(MTX_MutThread *thread, U128 buffer_key, MTX_Op op) { U64 unconsumed_size = thread->ring_write_pos - thread->ring_read_pos; U64 available_size = thread->ring_size - unconsumed_size; - if(available_size >= sizeof(buffer_key) + sizeof(op.range) + sizeof(op.replace.size) + op.replace.size) + U64 needed_size = sizeof(buffer_key) + sizeof(op.range) + sizeof(op.replace.size) + op.replace.size; + if(available_size >= needed_size) { thread->ring_write_pos += ring_write_struct(thread->ring_base, thread->ring_size, thread->ring_write_pos, &buffer_key); thread->ring_write_pos += ring_write_struct(thread->ring_base, thread->ring_size, thread->ring_write_pos, &op.range); thread->ring_write_pos += ring_write_struct(thread->ring_base, thread->ring_size, thread->ring_write_pos, &op.replace.size); thread->ring_write_pos += ring_write(thread->ring_base, thread->ring_size, thread->ring_write_pos, op.replace.str, op.replace.size); - thread->ring_write_pos += 7; - thread->ring_write_pos -= thread->ring_write_pos%8; break; } os_condition_variable_wait(thread->cv, thread->mutex, max_U64); @@ -80,8 +79,6 @@ mtx_dequeue_op(Arena *arena, MTX_MutThread *thread, U128 *buffer_key_out, MTX_Op thread->ring_read_pos += ring_read_struct(thread->ring_base, thread->ring_size, thread->ring_read_pos, &op_out->replace.size); op_out->replace.str = push_array_no_zero(arena, U8, op_out->replace.size); thread->ring_read_pos += ring_read(thread->ring_base, thread->ring_size, thread->ring_read_pos, op_out->replace.str, op_out->replace.size); - thread->ring_read_pos += 7; - thread->ring_read_pos -= thread->ring_read_pos%8; break; } os_condition_variable_wait(thread->cv, thread->mutex, max_U64); diff --git a/src/raddbg/raddbg_core.c b/src/raddbg/raddbg_core.c index cb6d13e3..368a3498 100644 --- a/src/raddbg/raddbg_core.c +++ b/src/raddbg/raddbg_core.c @@ -16582,7 +16582,7 @@ rd_frame(void) // if(ProfIsCapturing()) { - rd_request_frame(); + // rd_request_frame(); } ////////////////////////////// diff --git a/src/text_cache/text_cache.c b/src/text_cache/text_cache.c index d948215f..377ee516 100644 --- a/src/text_cache/text_cache.c +++ b/src/text_cache/text_cache.c @@ -2134,6 +2134,7 @@ txt_u2p_dequeue_req(U128 *hash_out, TXT_LangKind *lang_out) ASYNC_WORK_DEF(txt_parse_work) { ProfBeginFunction(); + //- rjf: get next key U128 hash = {0}; TXT_LangKind lang = TXT_LangKind_Null;