async priorities, strip out incorrect ring position calculations

This commit is contained in:
Ryan Fleury
2024-11-13 18:23:18 -08:00
parent 5a79518dcb
commit 4c0ebc851c
10 changed files with 116 additions and 99 deletions
+47 -17
View File
@@ -10,10 +10,16 @@ async_init(void)
Arena *arena = arena_alloc();
async_shared = push_array(arena, ASYNC_Shared, 1);
async_shared->arena = arena;
async_shared->u2w_ring_size = MB(8);
async_shared->u2w_ring_base = push_array_no_zero(arena, U8, async_shared->u2w_ring_size);
async_shared->u2w_ring_mutex = os_mutex_alloc();
async_shared->u2w_ring_cv = os_condition_variable_alloc();
for EachEnumVal(ASYNC_Priority, p)
{
ASYNC_Ring *ring = &async_shared->rings[p];
ring->ring_size = MB(8);
ring->ring_base = push_array_no_zero(arena, U8, ring->ring_size);
ring->ring_mutex = os_mutex_alloc();
ring->ring_cv = os_condition_variable_alloc();
}
async_shared->ring_mutex = os_mutex_alloc();
async_shared->ring_cv = os_condition_variable_alloc();
async_shared->work_threads_count = Max(1, os_get_system_info()->logical_processor_count-1);
async_shared->work_threads = push_array(arena, OS_Handle, async_shared->work_threads_count);
for EachIndex(idx, async_shared->work_threads_count)
@@ -37,6 +43,9 @@ async_thread_count(void)
internal B32
async_push_work_(ASYNC_WorkFunctionType *work_function, ASYNC_WorkParams *params)
{
// rjf: choose ring
ASYNC_Ring *ring = &async_shared->rings[params->priority];
// rjf: build work package
ASYNC_Work work = {0};
work.work_function = work_function;
@@ -50,7 +59,7 @@ async_push_work_(ASYNC_WorkFunctionType *work_function, ASYNC_WorkParams *params
// thread, and skip ring buffer if so.
B32 queued_in_ring_buffer = 0;
B32 need_to_execute_on_this_thread = 0;
OS_MutexScope(async_shared->u2w_ring_mutex) for(;;)
OS_MutexScope(ring->ring_mutex) for(;;)
{
U64 num_available_work_threads = (async_shared->work_threads_count - ins_atomic_u64_eval(&async_shared->work_threads_live_count));
if(num_available_work_threads == 0 && async_work_thread_depth > 0)
@@ -58,8 +67,8 @@ async_push_work_(ASYNC_WorkFunctionType *work_function, ASYNC_WorkParams *params
need_to_execute_on_this_thread = 1;
break;
}
U64 unconsumed_size = async_shared->u2w_ring_write_pos - async_shared->u2w_ring_read_pos;
U64 available_size = async_shared->u2w_ring_size - unconsumed_size;
U64 unconsumed_size = ring->ring_write_pos - ring->ring_read_pos;
U64 available_size = ring->ring_size - unconsumed_size;
if(available_size >= sizeof(work))
{
queued_in_ring_buffer = 1;
@@ -67,20 +76,21 @@ async_push_work_(ASYNC_WorkFunctionType *work_function, ASYNC_WorkParams *params
{
os_semaphore_take(params->semaphore, max_U64);
}
async_shared->u2w_ring_write_pos += ring_write_struct(async_shared->u2w_ring_base, async_shared->u2w_ring_size, async_shared->u2w_ring_write_pos, &work);
ring->ring_write_pos += ring_write_struct(ring->ring_base, ring->ring_size, ring->ring_write_pos, &work);
break;
}
if(os_now_microseconds() >= params->endt_us)
{
break;
}
os_condition_variable_wait(async_shared->u2w_ring_cv, async_shared->u2w_ring_mutex, params->endt_us);
os_condition_variable_wait(ring->ring_cv, ring->ring_mutex, params->endt_us);
}
// rjf: broadcast ring buffer cv if we wrote successfully
if(queued_in_ring_buffer)
{
os_condition_variable_broadcast(async_shared->u2w_ring_cv);
os_condition_variable_broadcast(ring->ring_cv);
os_condition_variable_broadcast(async_shared->ring_cv);
}
// rjf: if we did not queue successfully, and we have determined that
@@ -145,18 +155,38 @@ async_task_join(ASYNC_Task *task)
// Pops exactly one work package for a work thread, blocking until one is
// available in any priority ring.
//
// Rings are scanned from ASYNC_Priority_High down to ASYNC_Priority_Low, and
// the scan stops at the first ring that yields a package. Without that early
// exit, a package taken from the high-priority ring would be overwritten by a
// second read from the low-priority ring, consuming both entries but only
// returning (and executing) one of them.
//
// Returns the dequeued ASYNC_Work by value.
//
// NOTE(review): producers in async_push_work_ write under the per-ring mutex
// and broadcast async_shared->ring_cv without holding async_shared->ring_mutex,
// so a push that lands between the scan below and the wait may not wake this
// thread until the next broadcast -- confirm whether that is acceptable.
internal ASYNC_Work
async_pop_work(void)
{
  ProfBeginFunction();
  ASYNC_Work work = {0};
  B32 done = 0;
  ASYNC_Priority taken_priority = ASYNC_Priority_Low;
  OS_MutexScope(async_shared->ring_mutex) for(;!done;)
  {
    // scan rings, highest priority first
    for(ASYNC_Priority priority = ASYNC_Priority_High;; priority = (ASYNC_Priority)(priority - 1))
    {
      ASYNC_Ring *ring = &async_shared->rings[priority];
      OS_MutexScope(ring->ring_mutex)
      {
        U64 unconsumed_size = ring->ring_write_pos - ring->ring_read_pos;
        if(unconsumed_size >= sizeof(work))
        {
          ring->ring_read_pos += ring_read_struct(ring->ring_base, ring->ring_size, ring->ring_read_pos, &work);
          done = 1;
          taken_priority = priority;
        }
      }

      // stop once a package was taken (never dequeue two per call), or once
      // the lowest priority has been checked. the explicit Low check also
      // keeps `priority - 1` from stepping below the first enumerator.
      if(done || priority == ASYNC_Priority_Low)
      {
        break;
      }
    }

    // nothing in any ring -> sleep until a producer signals new work
    if(!done)
    {
      os_condition_variable_wait(async_shared->ring_cv, async_shared->ring_mutex, max_U64);
    }
  }

  // wake other consumers, and any producer waiting for space in the ring
  // that was just drained by one entry
  os_condition_variable_broadcast(async_shared->ring_cv);
  os_condition_variable_broadcast(async_shared->rings[taken_priority].ring_cv);
  ProfEnd();
  return work;
}
+25 -8
View File
@@ -14,6 +14,14 @@ typedef ASYNC_WORK_SIG(ASYNC_WorkFunctionType);
////////////////////////////////
//~ rjf: Work Types
// Priority class of a unit of async work. The value is used directly as an
// index into ASYNC_Shared.rings, so enumerators must stay dense and start at
// zero. async_pop_work walks from High down to Low, so a larger value is
// serviced first.
typedef enum ASYNC_Priority
{
  ASYNC_Priority_Low   = 0, // serviced only when no higher-priority ring has work
  ASYNC_Priority_High  = 1, // default used by the async_push_work macro
  ASYNC_Priority_COUNT = 2  // number of priorities; sizes per-priority arrays
}
ASYNC_Priority;
typedef struct ASYNC_WorkParams ASYNC_WorkParams;
struct ASYNC_WorkParams
{
@@ -22,6 +30,7 @@ struct ASYNC_WorkParams
OS_Handle semaphore;
U64 *completion_counter;
U64 endt_us;
ASYNC_Priority priority;
};
typedef struct ASYNC_Work ASYNC_Work;
@@ -62,18 +71,26 @@ struct ASYNC_TaskList
////////////////////////////////
//~ rjf: Shared State Bundle
// One user -> work-thread ring buffer of ASYNC_Work packages. ASYNC_Shared
// holds one of these per ASYNC_Priority value.
typedef struct ASYNC_Ring ASYNC_Ring;
struct ASYNC_Ring
{
// capacity of ring_base in bytes (async_init sets this to MB(8))
U64 ring_size;
// backing storage, allocated from the shared arena in async_init
U8 *ring_base;
// running write/read positions; each is only ever advanced by the byte count
// returned from ring_write_struct / ring_read_struct, and
// (ring_write_pos - ring_read_pos) is the number of unconsumed bytes.
// presumably the ring helpers wrap these into [0, ring_size) -- TODO confirm
U64 ring_write_pos;
U64 ring_read_pos;
// held while the positions above are inspected or advanced
OS_Handle ring_mutex;
// producers wait on this when the ring is full; it is broadcast after a
// push and after a pop from this ring
OS_Handle ring_cv;
};
typedef struct ASYNC_Shared ASYNC_Shared;
struct ASYNC_Shared
{
Arena *arena;
// rjf: user -> work thread ring buffer
U64 u2w_ring_size;
U8 *u2w_ring_base;
U64 u2w_ring_write_pos;
U64 u2w_ring_read_pos;
OS_Handle u2w_ring_mutex;
OS_Handle u2w_ring_cv;
// rjf: user -> work thread ring buffers
ASYNC_Ring rings[ASYNC_Priority_COUNT];
OS_Handle ring_mutex;
OS_Handle ring_cv;
// rjf: work threads
OS_Handle *work_threads;
@@ -102,7 +119,7 @@ internal U64 async_thread_count(void);
//~ rjf: Work Kickoffs
// Low-level entry point: submits `work_function` with the options in `params`.
// The ring the work is queued on is selected by params->priority.
// NOTE(review): the B32 result presumably reports whether the work was
// accepted (queued or run inline) -- confirm against the definition.
internal B32 async_push_work_(ASYNC_WorkFunctionType *work_function, ASYNC_WorkParams *params);

// Convenience wrapper that builds the ASYNC_WorkParams in place from
// designated initializers, e.g.
//   async_push_work(fn, .input = x, .priority = ASYNC_Priority_Low);
// Defaults are .endt_us = max_U64 and .priority = ASYNC_Priority_High; any
// member named again in the variadic arguments overrides its default, since
// the later designated initializer wins.
// Only one definition is kept here: defining the same function-like macro
// twice with different replacement lists is not valid C.
#define async_push_work(function, ...) async_push_work_((function), &(ASYNC_WorkParams){.endt_us = max_U64, .priority = ASYNC_Priority_High, __VA_ARGS__})
////////////////////////////////
//~ rjf: Task-Based Work Helper
+23 -35
View File
@@ -3074,12 +3074,11 @@ ctrl_u2c_push_msgs(CTRL_MsgList *msgs, U64 endt_us)
{
U64 unconsumed_size = (ctrl_state->u2c_ring_write_pos-ctrl_state->u2c_ring_read_pos);
U64 available_size = ctrl_state->u2c_ring_size-unconsumed_size;
if(available_size >= sizeof(U64) + msgs_srlzed_baked.size)
U64 needed_size = sizeof(msgs_srlzed_baked.size) + msgs_srlzed_baked.size;
if(available_size >= needed_size)
{
ctrl_state->u2c_ring_write_pos += ring_write_struct(ctrl_state->u2c_ring_base, ctrl_state->u2c_ring_size, ctrl_state->u2c_ring_write_pos, &msgs_srlzed_baked.size);
ctrl_state->u2c_ring_write_pos += ring_write(ctrl_state->u2c_ring_base, ctrl_state->u2c_ring_size, ctrl_state->u2c_ring_write_pos, msgs_srlzed_baked.str, msgs_srlzed_baked.size);
ctrl_state->u2c_ring_write_pos += 7;
ctrl_state->u2c_ring_write_pos -= ctrl_state->u2c_ring_write_pos%8;
good = 1;
break;
}
@@ -3112,8 +3111,6 @@ ctrl_u2c_pop_msgs(Arena *arena)
msgs_srlzed_baked.size = size_to_decode;
msgs_srlzed_baked.str = push_array_no_zero(scratch.arena, U8, msgs_srlzed_baked.size);
ctrl_state->u2c_ring_read_pos += ring_read(ctrl_state->u2c_ring_base, ctrl_state->u2c_ring_size, ctrl_state->u2c_ring_read_pos, msgs_srlzed_baked.str, size_to_decode);
ctrl_state->u2c_ring_read_pos += 7;
ctrl_state->u2c_ring_read_pos -= ctrl_state->u2c_ring_read_pos%8;
break;
}
os_condition_variable_wait(ctrl_state->u2c_ring_cv, ctrl_state->u2c_ring_mutex, max_U64);
@@ -3140,12 +3137,11 @@ ctrl_c2u_push_events(CTRL_EventList *events)
{
U64 unconsumed_size = (ctrl_state->c2u_ring_write_pos-ctrl_state->c2u_ring_read_pos);
U64 available_size = ctrl_state->c2u_ring_size-unconsumed_size;
if(available_size >= sizeof(U64) + event_srlzed.size)
U64 needed_size = sizeof(event_srlzed.size) + event_srlzed.size;
if(available_size >= needed_size)
{
ctrl_state->c2u_ring_write_pos += ring_write_struct(ctrl_state->c2u_ring_base, ctrl_state->c2u_ring_size, ctrl_state->c2u_ring_write_pos, &event_srlzed.size);
ctrl_state->c2u_ring_write_pos += ring_write(ctrl_state->c2u_ring_base, ctrl_state->c2u_ring_size, ctrl_state->c2u_ring_write_pos, event_srlzed.str, event_srlzed.size);
ctrl_state->c2u_ring_write_pos += 7;
ctrl_state->c2u_ring_write_pos -= ctrl_state->c2u_ring_write_pos%8;
break;
}
os_condition_variable_wait(ctrl_state->c2u_ring_cv, ctrl_state->c2u_ring_mutex, os_now_microseconds()+100);
@@ -3177,8 +3173,6 @@ ctrl_c2u_pop_events(Arena *arena)
event_srlzed.size = size_to_decode;
event_srlzed.str = push_array_no_zero(scratch.arena, U8, event_srlzed.size);
ctrl_state->c2u_ring_read_pos += ring_read(ctrl_state->c2u_ring_base, ctrl_state->c2u_ring_size, ctrl_state->c2u_ring_read_pos, event_srlzed.str, event_srlzed.size);
ctrl_state->c2u_ring_read_pos += 7;
ctrl_state->c2u_ring_read_pos -= ctrl_state->c2u_ring_read_pos%8;
CTRL_Event *new_event = ctrl_event_list_push(arena, &events);
*new_event = ctrl_event_from_serialized_string(arena, event_srlzed);
}
@@ -4191,22 +4185,21 @@ ctrl_thread__next_dmn_event(Arena *arena, DMN_CtrlCtx *ctrl_ctx, CTRL_Msg *msg,
// searched yet, but it has >4 child branches, meaning it looks like
// a project directory
//
DI_KeyList preemptively_loaded_keys = {0};
for(CTRL_DbgDirNode *dir_node = parent_dir_node; dir_node != 0; dir_node = dir_node->parent)
{
if(dir_node->search_count == 0 && (dir_node == parent_dir_node || dir_node->child_count >= 4))
{
Temp temp = temp_begin(scratch.arena);
//- rjf: form full path of this directory node
String8List dir_node_path_parts = {0};
for(CTRL_DbgDirNode *n = dir_node; n != 0; n = n->parent)
{
if(n->name.size != 0)
{
str8_list_push_front(temp.arena, &dir_node_path_parts, n->name);
str8_list_push_front(scratch.arena, &dir_node_path_parts, n->name);
}
}
String8 dir_node_path = str8_list_join(temp.arena, &dir_node_path_parts, &(StringJoin){.sep = str8_lit("/")});
String8 dir_node_path = str8_list_join(scratch.arena, &dir_node_path_parts, &(StringJoin){.sep = str8_lit("/")});
//- rjf: iterate downwards from this directory recursively, locate
// debug infos, and pre-emptively convert
@@ -4232,10 +4225,9 @@ ctrl_thread__next_dmn_event(Arena *arena, DMN_CtrlCtx *ctrl_ctx, CTRL_Msg *msg,
// kick off pre-emptive conversion, and gather key. if folders
// are encountered, then add them to the tree, and kick off a
// sub-search if needed.
DI_KeyList preemptively_loaded_keys = {0};
OS_FileIter *it = os_file_iter_begin(temp.arena, t->path, 0);
OS_FileIter *it = os_file_iter_begin(scratch.arena, t->path, 0);
U64 idx = 0;
for(OS_FileInfo info = {0}; idx < 64 && os_file_iter_next(temp.arena, it, &info); idx += 1)
for(OS_FileInfo info = {0}; idx < 16384 && os_file_iter_next(scratch.arena, it, &info); idx += 1)
{
// rjf: folder -> do sub-search if not duplicative
if(info.props.flags & FilePropertyFlag_IsFolder && task_count < 16384 && !str8_match(str8_prefix(info.name, 1), str8_lit("."), 0))
@@ -4259,9 +4251,9 @@ ctrl_thread__next_dmn_event(Arena *arena, DMN_CtrlCtx *ctrl_ctx, CTRL_Msg *msg,
}
if(existing_dir_child->search_count == 0)
{
Task *task = push_array(temp.arena, Task, 1);
Task *task = push_array(scratch.arena, Task, 1);
task->node = existing_dir_child;
task->path = push_str8f(temp.arena, "%S/%S", t->path, info.name);
task->path = push_str8f(scratch.arena, "%S/%S", t->path, info.name);
SLLQueuePush(first_task, last_task, task);
task_count += 1;
}
@@ -4273,30 +4265,26 @@ ctrl_thread__next_dmn_event(Arena *arena, DMN_CtrlCtx *ctrl_ctx, CTRL_Msg *msg,
str8_match(str8_skip_last_dot(info.name), debug_info_ext, StringMatchFlag_CaseInsensitive) &&
!str8_match(loaded_di_name, info.name, StringMatchFlag_CaseInsensitive))
{
DI_Key key = {push_str8f(temp.arena, "%S/%S", t->path, info.name), info.props.modified};
DI_Key key = {push_str8f(scratch.arena, "%S/%S", t->path, info.name), info.props.modified};
di_open(&key);
di_key_list_push(temp.arena, &preemptively_loaded_keys, &key);
di_key_list_push(scratch.arena, &preemptively_loaded_keys, &key);
}
}
os_file_iter_end(it);
// rjf: for each pre-emptively loaded key, wait for the initial
// load task to be done
for(DI_KeyNode *n = preemptively_loaded_keys.first; n != 0; n = n->next)
{
DI_Scope *di_scope = di_scope_open();
RDI_Parsed *rdi = di_rdi_from_key(di_scope, &n->v, max_U64);
di_scope_close(di_scope);
di_close(&n->v);
}
ProfEnd();
}
temp_end(temp);
}
}
//- rjf: for each pre-emptively loaded key, wait for the initial
// load task to be done
for(DI_KeyNode *n = preemptively_loaded_keys.first; n != 0; n = n->next)
{
DI_Scope *di_scope = di_scope_open();
RDI_Parsed *rdi = di_rdi_from_key(di_scope, &n->v, max_U64);
di_scope_close(di_scope);
di_close(&n->v);
}
}
}
-4
View File
@@ -457,8 +457,6 @@ dasm_u2p_enqueue_req(U128 hash, DASM_Params *params, U64 endt_us)
dasm_shared->u2p_ring_write_pos += ring_write_struct(dasm_shared->u2p_ring_base, dasm_shared->u2p_ring_size, dasm_shared->u2p_ring_write_pos, &params->dbgi_key.path.size);
dasm_shared->u2p_ring_write_pos += ring_write(dasm_shared->u2p_ring_base, dasm_shared->u2p_ring_size, dasm_shared->u2p_ring_write_pos, params->dbgi_key.path.str, params->dbgi_key.path.size);
dasm_shared->u2p_ring_write_pos += ring_write_struct(dasm_shared->u2p_ring_base, dasm_shared->u2p_ring_size, dasm_shared->u2p_ring_write_pos, &params->dbgi_key.min_timestamp);
dasm_shared->u2p_ring_write_pos += 7;
dasm_shared->u2p_ring_write_pos -= dasm_shared->u2p_ring_write_pos%8;
break;
}
if(os_now_microseconds() >= endt_us)
@@ -492,8 +490,6 @@ dasm_u2p_dequeue_req(Arena *arena, U128 *hash_out, DASM_Params *params_out)
params_out->dbgi_key.path.str = push_array(arena, U8, params_out->dbgi_key.path.size);
dasm_shared->u2p_ring_read_pos += ring_read(dasm_shared->u2p_ring_base, dasm_shared->u2p_ring_size, dasm_shared->u2p_ring_read_pos, params_out->dbgi_key.path.str, params_out->dbgi_key.path.size);
dasm_shared->u2p_ring_read_pos += ring_read_struct(dasm_shared->u2p_ring_base, dasm_shared->u2p_ring_size, dasm_shared->u2p_ring_read_pos, &params_out->dbgi_key.min_timestamp);
dasm_shared->u2p_ring_read_pos += 7;
dasm_shared->u2p_ring_read_pos -= dasm_shared->u2p_ring_read_pos%8;
break;
}
os_condition_variable_wait(dasm_shared->u2p_ring_cv, dasm_shared->u2p_ring_mutex, max_U64);
+11 -22
View File
@@ -728,13 +728,12 @@ di_u2p_enqueue_key(DI_Key *key, U64 endt_us)
{
U64 unconsumed_size = di_shared->u2p_ring_write_pos - di_shared->u2p_ring_read_pos;
U64 available_size = di_shared->u2p_ring_size - unconsumed_size;
if(available_size >= sizeof(key->path.size) + key->path.size + sizeof(key->min_timestamp))
U64 needed_size = sizeof(key->min_timestamp) + sizeof(key->path.size) + key->path.size;
if(available_size >= needed_size)
{
di_shared->u2p_ring_write_pos += ring_write_struct(di_shared->u2p_ring_base, di_shared->u2p_ring_size, di_shared->u2p_ring_write_pos, &key->min_timestamp);
di_shared->u2p_ring_write_pos += ring_write_struct(di_shared->u2p_ring_base, di_shared->u2p_ring_size, di_shared->u2p_ring_write_pos, &key->path.size);
di_shared->u2p_ring_write_pos += ring_write(di_shared->u2p_ring_base, di_shared->u2p_ring_size, di_shared->u2p_ring_write_pos, key->path.str, key->path.size);
di_shared->u2p_ring_write_pos += ring_write_struct(di_shared->u2p_ring_base, di_shared->u2p_ring_size, di_shared->u2p_ring_write_pos, &key->min_timestamp);
di_shared->u2p_ring_write_pos += 7;
di_shared->u2p_ring_write_pos -= di_shared->u2p_ring_write_pos%8;
sent = 1;
break;
}
@@ -759,12 +758,10 @@ di_u2p_dequeue_key(Arena *arena, DI_Key *out_key)
U64 unconsumed_size = di_shared->u2p_ring_write_pos - di_shared->u2p_ring_read_pos;
if(unconsumed_size >= sizeof(out_key->path.size) + sizeof(out_key->min_timestamp))
{
di_shared->u2p_ring_read_pos += ring_read_struct(di_shared->u2p_ring_base, di_shared->u2p_ring_size, di_shared->u2p_ring_read_pos, &out_key->min_timestamp);
di_shared->u2p_ring_read_pos += ring_read_struct(di_shared->u2p_ring_base, di_shared->u2p_ring_size, di_shared->u2p_ring_read_pos, &out_key->path.size);
out_key->path.str = push_array(arena, U8, out_key->path.size);
di_shared->u2p_ring_read_pos += ring_read(di_shared->u2p_ring_base, di_shared->u2p_ring_size, di_shared->u2p_ring_read_pos, out_key->path.str, out_key->path.size);
di_shared->u2p_ring_read_pos += ring_read_struct(di_shared->u2p_ring_base, di_shared->u2p_ring_size, di_shared->u2p_ring_read_pos, &out_key->min_timestamp);
di_shared->u2p_ring_read_pos += 7;
di_shared->u2p_ring_read_pos -= di_shared->u2p_ring_read_pos%8;
break;
}
os_condition_variable_wait(di_shared->u2p_ring_cv, di_shared->u2p_ring_mutex, max_U64);
@@ -779,14 +776,12 @@ di_p2u_push_event(DI_Event *event)
{
U64 unconsumed_size = (di_shared->p2u_ring_write_pos-di_shared->p2u_ring_read_pos);
U64 available_size = di_shared->p2u_ring_size-unconsumed_size;
U64 needed_size = sizeof(DI_EventKind) + sizeof(U64) + event->string.size;
U64 needed_size = sizeof(event->kind) + sizeof(event->string.size) + event->string.size;
if(available_size >= needed_size)
{
di_shared->p2u_ring_write_pos += ring_write_struct(di_shared->p2u_ring_base, di_shared->p2u_ring_size, di_shared->p2u_ring_write_pos, &event->kind);
di_shared->p2u_ring_write_pos += ring_write_struct(di_shared->p2u_ring_base, di_shared->p2u_ring_size, di_shared->p2u_ring_write_pos, &event->string.size);
di_shared->p2u_ring_write_pos += ring_write(di_shared->p2u_ring_base, di_shared->p2u_ring_size, di_shared->p2u_ring_write_pos, event->string.str, event->string.size);
di_shared->p2u_ring_write_pos += 7;
di_shared->p2u_ring_write_pos -= di_shared->p2u_ring_write_pos%8;
break;
}
os_condition_variable_wait(di_shared->p2u_ring_cv, di_shared->p2u_ring_mutex, max_U64);
@@ -810,8 +805,6 @@ di_p2u_pop_events(Arena *arena, U64 endt_us)
di_shared->p2u_ring_read_pos += ring_read_struct(di_shared->p2u_ring_base, di_shared->p2u_ring_size, di_shared->p2u_ring_read_pos, &n->v.string.size);
n->v.string.str = push_array_no_zero(arena, U8, n->v.string.size);
di_shared->p2u_ring_read_pos += ring_read(di_shared->p2u_ring_base, di_shared->p2u_ring_size, di_shared->p2u_ring_read_pos, n->v.string.str, n->v.string.size);
di_shared->p2u_ring_read_pos += 7;
di_shared->p2u_ring_read_pos -= di_shared->p2u_ring_read_pos%8;
}
else if(os_now_microseconds() >= endt_us)
{
@@ -1618,24 +1611,21 @@ di_match_store_section_kind_from_name(DI_MatchStore *store, String8 name, U64 en
DLLInsert_NP(store->first_lru_match_name, store->last_lru_match_name, (DI_MatchNameNode *)0, node, lru_next, lru_prev);
// rjf: if this node is new w.r.t. the store's current parameters, request it
if(node->req_params_hash != store->params_hash)
U64 completed_params_hash = ins_atomic_u64_eval(&node->cmp_params_hash);
if(completed_params_hash != store->params_hash && node->req_count == ins_atomic_u64_eval(&node->cmp_count))
{
B32 sent = 0;
OS_MutexScope(store->u2m_ring_mutex) for(;;)
{
U64 unconsumed_size = store->u2m_ring_write_pos - store->u2m_ring_read_pos;
U64 available_size = store->u2m_ring_size - unconsumed_size;
U64 needed_size = sizeof(&node) + sizeof(U64) + sizeof(U64) + name.size;
needed_size += 7;
needed_size -= needed_size%8;
U64 needed_size = sizeof(&node) + sizeof(node->alloc_gen) + sizeof(name.size) + name.size;
if(available_size >= needed_size)
{
store->u2m_ring_write_pos += ring_write_struct(store->u2m_ring_base, store->u2m_ring_size, store->u2m_ring_write_pos, &node);
store->u2m_ring_write_pos += ring_write_struct(store->u2m_ring_base, store->u2m_ring_size, store->u2m_ring_write_pos, &node->alloc_gen);
store->u2m_ring_write_pos += ring_write_struct(store->u2m_ring_base, store->u2m_ring_size, store->u2m_ring_write_pos, &name.size);
store->u2m_ring_write_pos += ring_write(store->u2m_ring_base, store->u2m_ring_size, store->u2m_ring_write_pos, name.str, name.size);
store->u2m_ring_write_pos += 7;
store->u2m_ring_write_pos -= store->u2m_ring_write_pos%8;
sent = 1;
break;
}
@@ -1648,13 +1638,14 @@ di_match_store_section_kind_from_name(DI_MatchStore *store, String8 name, U64 en
if(sent)
{
os_condition_variable_broadcast(store->u2m_ring_cv);
async_push_work(di_match_work, .input = store);
async_push_work(di_match_work, .input = store, .priority = ASYNC_Priority_Low, .completion_counter = &node->cmp_count);
node->req_params_hash = store->params_hash;
node->req_count += 1;
}
}
// rjf: if this node's state is stale, wait for it if we need to
if(os_now_microseconds() < endt_us && node->req_params_hash != ins_atomic_u64_eval(&node->cmp_params_hash))
if(os_now_microseconds() < endt_us && node->req_params_hash != completed_params_hash)
{
OS_MutexScopeR(store->match_rw_mutex) for(;;)
{
@@ -1696,8 +1687,6 @@ ASYNC_WORK_DEF(di_match_work)
store->u2m_ring_read_pos += ring_read_struct(store->u2m_ring_base, store->u2m_ring_size, store->u2m_ring_read_pos, &name.size);
name.str = push_array(scratch.arena, U8, name.size);
store->u2m_ring_read_pos += ring_read(store->u2m_ring_base, store->u2m_ring_size, store->u2m_ring_read_pos, name.str, name.size);
store->u2m_ring_read_pos += 7;
store->u2m_ring_read_pos -= store->u2m_ring_read_pos%8;
break;
}
os_condition_variable_wait(store->u2m_ring_cv, store->u2m_ring_mutex, max_U64);
+2
View File
@@ -278,10 +278,12 @@ struct DI_MatchNameNode
U64 first_gen_touched;
U64 last_gen_touched;
U64 req_params_hash;
U64 req_count;
String8 name;
U64 hash;
// rjf: atomically written by match work
U64 cmp_count;
U64 cmp_params_hash;
RDI_SectionKind section_kind;
// DI_Match *first_match;
+4 -7
View File
@@ -248,15 +248,14 @@ fs_u2s_enqueue_req(Rng1U64 range, String8 path, U64 endt_us)
{
U64 unconsumed_size = fs_shared->u2s_ring_write_pos - fs_shared->u2s_ring_read_pos;
U64 available_size = fs_shared->u2s_ring_size - unconsumed_size;
if(available_size >= sizeof(U64) + path.size)
U64 needed_size = sizeof(range.min) + sizeof(range.max) + sizeof(path.size) + path.size;
if(available_size >= needed_size)
{
result = 1;
fs_shared->u2s_ring_write_pos += ring_write_struct(fs_shared->u2s_ring_base, fs_shared->u2s_ring_size, fs_shared->u2s_ring_write_pos, &range.min);
fs_shared->u2s_ring_write_pos += ring_write_struct(fs_shared->u2s_ring_base, fs_shared->u2s_ring_size, fs_shared->u2s_ring_write_pos, &range.max);
fs_shared->u2s_ring_write_pos += ring_write_struct(fs_shared->u2s_ring_base, fs_shared->u2s_ring_size, fs_shared->u2s_ring_write_pos, &path.size);
fs_shared->u2s_ring_write_pos += ring_write(fs_shared->u2s_ring_base, fs_shared->u2s_ring_size, fs_shared->u2s_ring_write_pos, path.str, path.size);
fs_shared->u2s_ring_write_pos += 7;
fs_shared->u2s_ring_write_pos -= fs_shared->u2s_ring_write_pos%8;
break;
}
os_condition_variable_wait(fs_shared->u2s_ring_cv, fs_shared->u2s_ring_mutex, endt_us);
@@ -281,8 +280,6 @@ fs_u2s_dequeue_req(Arena *arena, Rng1U64 *range_out, String8 *path_out)
fs_shared->u2s_ring_read_pos += ring_read_struct(fs_shared->u2s_ring_base, fs_shared->u2s_ring_size, fs_shared->u2s_ring_read_pos, &path_out->size);
path_out->str = push_array(arena, U8, path_out->size);
fs_shared->u2s_ring_read_pos += ring_read(fs_shared->u2s_ring_base, fs_shared->u2s_ring_size, fs_shared->u2s_ring_read_pos, path_out->str, path_out->size);
fs_shared->u2s_ring_read_pos += 7;
fs_shared->u2s_ring_read_pos -= fs_shared->u2s_ring_read_pos%8;
break;
}
os_condition_variable_wait(fs_shared->u2s_ring_cv, fs_shared->u2s_ring_mutex, max_U64);
@@ -420,9 +417,9 @@ fs_detector_thread__entry_point(void *p)
range_n = range_n->next)
{
if(ins_atomic_u64_eval(&range_n->request_count) == ins_atomic_u64_eval(&range_n->completion_count) &&
async_push_work(fs_stream_work, .endt_us = os_now_microseconds()+100000, .completion_counter = &range_n->completion_count))
fs_u2s_enqueue_req(range_n->range, n->path, os_now_microseconds()+100000))
{
fs_u2s_enqueue_req(range_n->range, n->path, max_U64);
async_push_work(fs_stream_work, .completion_counter = &range_n->completion_count);
ins_atomic_u64_inc_eval(&range_n->request_count);
}
}
+2 -5
View File
@@ -52,14 +52,13 @@ mtx_enqueue_op(MTX_MutThread *thread, U128 buffer_key, MTX_Op op)
{
U64 unconsumed_size = thread->ring_write_pos - thread->ring_read_pos;
U64 available_size = thread->ring_size - unconsumed_size;
if(available_size >= sizeof(buffer_key) + sizeof(op.range) + sizeof(op.replace.size) + op.replace.size)
U64 needed_size = sizeof(buffer_key) + sizeof(op.range) + sizeof(op.replace.size) + op.replace.size;
if(available_size >= needed_size)
{
thread->ring_write_pos += ring_write_struct(thread->ring_base, thread->ring_size, thread->ring_write_pos, &buffer_key);
thread->ring_write_pos += ring_write_struct(thread->ring_base, thread->ring_size, thread->ring_write_pos, &op.range);
thread->ring_write_pos += ring_write_struct(thread->ring_base, thread->ring_size, thread->ring_write_pos, &op.replace.size);
thread->ring_write_pos += ring_write(thread->ring_base, thread->ring_size, thread->ring_write_pos, op.replace.str, op.replace.size);
thread->ring_write_pos += 7;
thread->ring_write_pos -= thread->ring_write_pos%8;
break;
}
os_condition_variable_wait(thread->cv, thread->mutex, max_U64);
@@ -80,8 +79,6 @@ mtx_dequeue_op(Arena *arena, MTX_MutThread *thread, U128 *buffer_key_out, MTX_Op
thread->ring_read_pos += ring_read_struct(thread->ring_base, thread->ring_size, thread->ring_read_pos, &op_out->replace.size);
op_out->replace.str = push_array_no_zero(arena, U8, op_out->replace.size);
thread->ring_read_pos += ring_read(thread->ring_base, thread->ring_size, thread->ring_read_pos, op_out->replace.str, op_out->replace.size);
thread->ring_read_pos += 7;
thread->ring_read_pos -= thread->ring_read_pos%8;
break;
}
os_condition_variable_wait(thread->cv, thread->mutex, max_U64);
+1 -1
View File
@@ -16582,7 +16582,7 @@ rd_frame(void)
//
if(ProfIsCapturing())
{
rd_request_frame();
// rd_request_frame();
}
//////////////////////////////
+1
View File
@@ -2134,6 +2134,7 @@ txt_u2p_dequeue_req(U128 *hash_out, TXT_LangKind *lang_out)
ASYNC_WORK_DEF(txt_parse_work)
{
ProfBeginFunction();
//- rjf: get next key
U128 hash = {0};
TXT_LangKind lang = TXT_LangKind_Null;