eliminate txt layer parsing threads; use async layer

This commit is contained in:
Ryan Fleury
2024-11-03 11:12:24 -08:00
parent 61e7aaadeb
commit c2855a9a46
2 changed files with 160 additions and 172 deletions
+159 -167
View File
@@ -1611,12 +1611,6 @@ txt_init(void)
txt_shared->u2p_ring_base = push_array_no_zero(arena, U8, txt_shared->u2p_ring_size);
txt_shared->u2p_ring_cv = os_condition_variable_alloc();
txt_shared->u2p_ring_mutex = os_mutex_alloc();
txt_shared->parse_thread_count = Clamp(1, os_get_system_info()->logical_processor_count-1, 4);
txt_shared->parse_threads = push_array(arena, OS_Handle, txt_shared->parse_thread_count);
for(U64 idx = 0; idx < txt_shared->parse_thread_count; idx += 1)
{
txt_shared->parse_threads[idx] = os_thread_launch(txt_parse_thread__entry_point, (void *)idx, 0);
}
txt_shared->evictor_thread = os_thread_launch(txt_evictor_thread__entry_point, 0, 0);
}
@@ -1782,6 +1776,7 @@ txt_text_info_from_hash_lang(TXT_Scope *scope, U128 hash, TXT_LangKind lang)
if(node_is_new)
{
txt_u2p_enqueue_req(hash, lang, max_U64);
async_push_work(txt_parse_work);
}
}
return info;
@@ -2151,202 +2146,199 @@ txt_u2p_dequeue_req(U128 *hash_out, TXT_LangKind *lang_out)
os_condition_variable_broadcast(txt_shared->u2p_ring_cv);
}
// NOTE(review): this span is a rendered diff hunk whose +/- markers and indentation
// were stripped. Lines of the old thread entry point (`txt_parse_thread__entry_point`,
// built around a `for(;;)` loop) and of the `ASYNC_WORK_DEF(txt_parse_work)` body are
// interleaved, so most statements appear twice. Read it as two overlapping copies of
// the same steps, not as one compilable function.
//
// Steps visible in both copies:
//   1. dequeue a (hash, lang) request and open an HS scope
//   2. find the matching node in the hash's slot (under the stripe's read lock) and
//      try to claim it through `is_working`
//   3. resolve the hash to its data
//   4. detect the line-ending kind, count lines, record line ranges, run the lexer
//   5. copy the finished info into the node under the stripe's write lock
//   6. close the HS scope
internal void
txt_parse_thread__entry_point(void *p)
ASYNC_WORK_DEF(txt_parse_work)
{
// NOTE(review): this `for(;;)` presumably belongs to the removed thread-loop version;
// the later `return 0;` presumably belongs to the async-work version -- confirm
// against the real file.
for(;;)
//- rjf: get next key
U128 hash = {0};
TXT_LangKind lang = TXT_LangKind_Null;
txt_u2p_dequeue_req(&hash, &lang);
HS_Scope *scope = hs_scope_open();
//- rjf: unpack hash
// slot index comes from hash.u64[1]; the stripe index is derived from the slot index
U64 slot_idx = hash.u64[1]%txt_shared->slots_count;
U64 stripe_idx = slot_idx%txt_shared->stripes_count;
TXT_Slot *slot = &txt_shared->slots[slot_idx];
TXT_Stripe *stripe = &txt_shared->stripes[stripe_idx];
//- rjf: take task
B32 got_task = 0;
OS_MutexScopeR(stripe->rw_mutex)
{
//- rjf: get next key
U128 hash = {0};
TXT_LangKind lang = TXT_LangKind_Null;
txt_u2p_dequeue_req(&hash, &lang);
HS_Scope *scope = hs_scope_open();
for(TXT_Node *n = slot->first; n != 0; n = n->next)
{
if(u128_match(n->hash, hash) && n->lang == lang)
{
// got_task is the negation of the helper's result; presumably an atomic
// compare-exchange of is_working from 0 to 1 -- confirm the helper's semantics.
got_task = !ins_atomic_u32_eval_cond_assign(&n->is_working, 1, 0);
break;
}
}
}
//- rjf: hash -> data
String8 data = {0};
if(got_task)
{
data = hs_data_from_hash(scope, hash);
}
//- rjf: data -> text info
Arena *info_arena = 0;
TXT_TextInfo info = {0};
// parsing is skipped for the all-zero hash; `info` then stays zero-initialized and
// `info_arena` stays null, and the commit step still runs when got_task is set.
if(got_task && !u128_match(hash, u128_zero()))
{
info_arena = arena_alloc();
//- rjf: unpack hash
U64 slot_idx = hash.u64[1]%txt_shared->slots_count;
U64 stripe_idx = slot_idx%txt_shared->stripes_count;
TXT_Slot *slot = &txt_shared->slots[slot_idx];
TXT_Stripe *stripe = &txt_shared->stripes[stripe_idx];
//- rjf: take task
B32 got_task = 0;
//- rjf: grab pointers to working counters
// both pointers stay null if no node matches (hash, lang) in the slot
U64 *bytes_processed_ptr = 0;
U64 *bytes_to_process_ptr = 0;
OS_MutexScopeR(stripe->rw_mutex)
{
for(TXT_Node *n = slot->first; n != 0; n = n->next)
{
if(u128_match(n->hash, hash) && n->lang == lang)
{
got_task = !ins_atomic_u32_eval_cond_assign(&n->is_working, 1, 0);
break;
bytes_processed_ptr = &n->info.bytes_processed;
bytes_to_process_ptr = &n->info.bytes_to_process;
}
}
}
//- rjf: hash -> data
String8 data = {0};
if(got_task)
//- rjf: set # of bytes to process
if(bytes_to_process_ptr)
{
data = hs_data_from_hash(scope, hash);
// (line ending calc) (line counting) (line measuring) (lexing)
// the lexing term is only added when a language is set (lang != TXT_LangKind_Null)
ins_atomic_u64_eval_assign(bytes_to_process_ptr, Min(data.size, 1024) + data.size + data.size + data.size*(lang != TXT_LangKind_Null));
}
//- rjf: data -> text info
Arena *info_arena = 0;
TXT_TextInfo info = {0};
if(got_task && !u128_match(hash, u128_zero()))
//- rjf: detect line end kind
TXT_LineEndKind line_end_kind = TXT_LineEndKind_Null;
{
info_arena = arena_alloc();
//- rjf: grab pointers to working counters
U64 *bytes_processed_ptr = 0;
U64 *bytes_to_process_ptr = 0;
OS_MutexScopeR(stripe->rw_mutex)
U64 lf_count = 0;
U64 cr_count = 0;
// only the first 1024 bytes (or fewer) are sampled for '\r' and '\n'
for(U64 idx = 0; idx < data.size && idx < 1024; idx += 1)
{
for(TXT_Node *n = slot->first; n != 0; n = n->next)
if(data.str[idx] == '\r')
{
if(u128_match(n->hash, hash) && n->lang == lang)
{
bytes_processed_ptr = &n->info.bytes_processed;
bytes_to_process_ptr = &n->info.bytes_to_process;
}
cr_count += 1;
}
if(data.str[idx] == '\n')
{
lf_count += 1;
}
}
//- rjf: set # of bytes to process
if(bytes_to_process_ptr)
// CRLF when at least one '\n' was seen and the '\r' count is at least half the
// '\n' count (integer division); LF when any '\n' was seen otherwise; else Null.
if(cr_count >= lf_count/2 && lf_count >= 1)
{
// (line ending calc) (line counting) (line measuring) (lexing)
ins_atomic_u64_eval_assign(bytes_to_process_ptr, Min(data.size, 1024) + data.size + data.size + data.size*(lang != TXT_LangKind_Null));
line_end_kind = TXT_LineEndKind_CRLF;
}
//- rjf: detect line end kind
TXT_LineEndKind line_end_kind = TXT_LineEndKind_Null;
else if(lf_count >= 1)
{
U64 lf_count = 0;
U64 cr_count = 0;
for(U64 idx = 0; idx < data.size && idx < 1024; idx += 1)
{
if(data.str[idx] == '\r')
{
cr_count += 1;
}
if(data.str[idx] == '\n')
{
lf_count += 1;
}
}
if(cr_count >= lf_count/2 && lf_count >= 1)
{
line_end_kind = TXT_LineEndKind_CRLF;
}
else if(lf_count >= 1)
{
line_end_kind = TXT_LineEndKind_LF;
}
info.line_end_kind = line_end_kind;
}
//- rjf: bump progress
if(bytes_processed_ptr)
{
ins_atomic_u64_eval_assign(bytes_processed_ptr, Min(data.size, 1024));
}
//- rjf: count # of lines
// starts at 1, so data without any line break still yields one line
U64 line_count = 1;
// NOTE(review): byte_process_start_idx is not referenced anywhere else in the
// visible span -- looks unused; confirm before removing.
U64 byte_process_start_idx = 0;
for(U64 idx = 0; idx < data.size; idx += 1)
{
if(data.str[idx] == '\n' || data.str[idx] == '\r')
{
line_count += 1;
// after a '\r' the next byte is skipped unconditionally (presumably the '\n' of
// a CRLF pair; the code does not check what that byte is)
if(data.str[idx] == '\r')
{
idx += 1;
}
}
// NOTE(review): bytes_processed_ptr is passed here without the null check that the
// "bump progress" blocks use; it is null if the node lookup above found no match.
// This check is also skipped whenever the '\r' skip above jumps past a multiple
// of 1000.
if(idx && idx%1000 == 0)
{
ins_atomic_u64_add_eval(bytes_processed_ptr, 1000);
}
}
//- rjf: bump progress
if(bytes_processed_ptr)
{
ins_atomic_u64_eval_assign(bytes_processed_ptr, Min(data.size, 1024) + data.size);
}
//- rjf: allocate & store line ranges
info.lines_count = line_count;
info.lines_ranges = push_array_no_zero(info_arena, Rng1U64, info.lines_count);
U64 line_idx = 0;
U64 line_start_idx = 0;
// `<=` so that the iteration at idx == data.size closes the final line
// NOTE(review): when the last byte of data is '\r', the skip below moves idx to
// data.size and the loop increment then exits, so the idx == data.size iteration
// never runs. The counting loop still counted a line for that '\r', so the final
// lines_ranges entry appears to be left unwritten -- confirm.
for(U64 idx = 0; idx <= data.size; idx += 1)
{
if(idx == data.size || data.str[idx] == '\n' || data.str[idx] == '\r')
{
// the recorded range is [line_start_idx, idx), i.e. it excludes the line-break byte
Rng1U64 line_range = r1u64(line_start_idx, idx);
U64 line_size = dim_1u64(line_range);
info.lines_ranges[line_idx] = line_range;
info.lines_max_size = Max(info.lines_max_size, line_size);
line_idx += 1;
line_start_idx = idx+1;
if(idx < data.size && data.str[idx] == '\r')
{
line_start_idx += 1;
idx += 1;
}
}
if(idx && idx%1000 == 0)
{
ins_atomic_u64_add_eval(bytes_processed_ptr, 1000);
}
}
//- rjf: bump progress
if(bytes_processed_ptr)
{
ins_atomic_u64_eval_assign(bytes_processed_ptr, Min(data.size, 1024) + data.size + data.size);
}
//- rjf: lang -> lex function
TXT_LangLexFunctionType *lex_function = txt_lex_function_from_lang_kind(lang);
//- rjf: lex function * data -> tokens
TXT_TokenArray tokens = {0};
// tokens stay zero-initialized when no lex function is returned for lang
if(lex_function != 0)
{
tokens = lex_function(info_arena, bytes_processed_ptr, data);
}
info.tokens = tokens;
//- rjf: bump progress
if(bytes_processed_ptr)
{
ins_atomic_u64_eval_assign(bytes_processed_ptr, Min(data.size, 1024) + data.size + data.size + data.size*(lex_function != 0));
line_end_kind = TXT_LineEndKind_LF;
}
info.line_end_kind = line_end_kind;
}
//- rjf: commit results to cache
if(got_task) OS_MutexScopeW(stripe->rw_mutex)
//- rjf: bump progress
if(bytes_processed_ptr)
{
for(TXT_Node *n = slot->first; n != 0; n = n->next)
ins_atomic_u64_eval_assign(bytes_processed_ptr, Min(data.size, 1024));
}
//- rjf: count # of lines
// (second rendering of the line-counting step; same logic as the copy above)
U64 line_count = 1;
U64 byte_process_start_idx = 0;
for(U64 idx = 0; idx < data.size; idx += 1)
{
if(data.str[idx] == '\n' || data.str[idx] == '\r')
{
if(u128_match(n->hash, hash) && n->lang == lang)
line_count += 1;
if(data.str[idx] == '\r')
{
n->arena = info_arena;
info.bytes_processed = n->info.bytes_processed;
info.bytes_to_process = n->info.bytes_to_process;
MemoryCopyStruct(&n->info, &info);
ins_atomic_u32_eval_assign(&n->is_working, 0);
ins_atomic_u64_inc_eval(&n->load_count);
break;
idx += 1;
}
}
if(idx && idx%1000 == 0)
{
ins_atomic_u64_add_eval(bytes_processed_ptr, 1000);
}
}
hs_scope_close(scope);
//- rjf: bump progress
if(bytes_processed_ptr)
{
ins_atomic_u64_eval_assign(bytes_processed_ptr, Min(data.size, 1024) + data.size);
}
//- rjf: allocate & store line ranges
// (second rendering of the line-range step; the trailing-'\r' note above applies here too)
info.lines_count = line_count;
info.lines_ranges = push_array_no_zero(info_arena, Rng1U64, info.lines_count);
U64 line_idx = 0;
U64 line_start_idx = 0;
for(U64 idx = 0; idx <= data.size; idx += 1)
{
if(idx == data.size || data.str[idx] == '\n' || data.str[idx] == '\r')
{
Rng1U64 line_range = r1u64(line_start_idx, idx);
U64 line_size = dim_1u64(line_range);
info.lines_ranges[line_idx] = line_range;
info.lines_max_size = Max(info.lines_max_size, line_size);
line_idx += 1;
line_start_idx = idx+1;
if(idx < data.size && data.str[idx] == '\r')
{
line_start_idx += 1;
idx += 1;
}
}
if(idx && idx%1000 == 0)
{
ins_atomic_u64_add_eval(bytes_processed_ptr, 1000);
}
}
//- rjf: bump progress
if(bytes_processed_ptr)
{
ins_atomic_u64_eval_assign(bytes_processed_ptr, Min(data.size, 1024) + data.size + data.size);
}
//- rjf: lang -> lex function
TXT_LangLexFunctionType *lex_function = txt_lex_function_from_lang_kind(lang);
//- rjf: lex function * data -> tokens
TXT_TokenArray tokens = {0};
if(lex_function != 0)
{
tokens = lex_function(info_arena, bytes_processed_ptr, data);
}
info.tokens = tokens;
//- rjf: bump progress
if(bytes_processed_ptr)
{
ins_atomic_u64_eval_assign(bytes_processed_ptr, Min(data.size, 1024) + data.size + data.size + data.size*(lex_function != 0));
}
}
//- rjf: commit results to cache
// runs whenever the task was claimed, including the zero-hash case where nothing was parsed
if(got_task) OS_MutexScopeW(stripe->rw_mutex)
{
for(TXT_Node *n = slot->first; n != 0; n = n->next)
{
if(u128_match(n->hash, hash) && n->lang == lang)
{
n->arena = info_arena;
// carry the node's live progress counters into `info` so the struct copy below
// does not overwrite them with the zeros from `info`'s initializer
info.bytes_processed = n->info.bytes_processed;
info.bytes_to_process = n->info.bytes_to_process;
MemoryCopyStruct(&n->info, &info);
// clear the working flag and bump the node's load counter
ins_atomic_u32_eval_assign(&n->is_working, 0);
ins_atomic_u64_inc_eval(&n->load_count);
break;
}
}
}
hs_scope_close(scope);
return 0;
}
////////////////////////////////