diff --git a/src/geo_cache/geo_cache.c b/src/geo_cache/geo_cache.c index 502b5b98..81cd10a7 100644 --- a/src/geo_cache/geo_cache.c +++ b/src/geo_cache/geo_cache.c @@ -25,12 +25,6 @@ geo_init(void) geo_shared->u2x_ring_base = push_array_no_zero(arena, U8, geo_shared->u2x_ring_size); geo_shared->u2x_ring_cv = os_condition_variable_alloc(); geo_shared->u2x_ring_mutex = os_mutex_alloc(); - geo_shared->xfer_thread_count = Clamp(1, os_get_system_info()->logical_processor_count-1, 4); - geo_shared->xfer_threads = push_array(arena, OS_Handle, geo_shared->xfer_thread_count); - for(U64 idx = 0; idx < geo_shared->xfer_thread_count; idx += 1) - { - geo_shared->xfer_threads[idx] = os_thread_launch(geo_xfer_thread__entry_point, (void *)idx, 0); - } geo_shared->evictor_thread = os_thread_launch(geo_evictor_thread__entry_point, 0, 0); } @@ -192,6 +186,7 @@ geo_buffer_from_hash(GEO_Scope *scope, U128 hash) if(node_is_new) { geo_u2x_enqueue_req(hash, max_U64); + async_push_work(geo_xfer_work); } } return handle; @@ -259,68 +254,65 @@ geo_u2x_dequeue_req(U128 *hash_out) os_condition_variable_broadcast(geo_shared->u2x_ring_cv); } -internal void -geo_xfer_thread__entry_point(void *p) +ASYNC_WORK_DEF(geo_xfer_work) { - for(;;) + HS_Scope *scope = hs_scope_open(); + + //- rjf: decode + U128 hash = {0}; + geo_u2x_dequeue_req(&hash); + + //- rjf: unpack hash + U64 slot_idx = hash.u64[1]%geo_shared->slots_count; + U64 stripe_idx = slot_idx%geo_shared->stripes_count; + GEO_Slot *slot = &geo_shared->slots[slot_idx]; + GEO_Stripe *stripe = &geo_shared->stripes[stripe_idx]; + + //- rjf: take task + B32 got_task = 0; + OS_MutexScopeR(stripe->rw_mutex) { - HS_Scope *scope = hs_scope_open(); - - //- rjf: decode - U128 hash = {0}; - geo_u2x_dequeue_req(&hash); - - //- rjf: unpack hash - U64 slot_idx = hash.u64[1]%geo_shared->slots_count; - U64 stripe_idx = slot_idx%geo_shared->stripes_count; - GEO_Slot *slot = &geo_shared->slots[slot_idx]; - GEO_Stripe *stripe = &geo_shared->stripes[stripe_idx]; - - //- rjf: take task - B32 got_task = 0; - OS_MutexScopeR(stripe->rw_mutex) + for(GEO_Node *n = slot->first; n != 0; n = n->next) { - for(GEO_Node *n = slot->first; n != 0; n = n->next) + if(u128_match(n->hash, hash)) { - if(u128_match(n->hash, hash)) - { - got_task = !ins_atomic_u32_eval_cond_assign(&n->is_working, 1, 0); - break; - } + got_task = !ins_atomic_u32_eval_cond_assign(&n->is_working, 1, 0); + break; } } - - //- rjf: hash -> data - String8 data = {0}; - if(got_task) - { - data = hs_data_from_hash(scope, hash); - } - - //- rjf: data -> buffer - R_Handle buffer = {0}; - if(got_task && data.size != 0) - { - buffer = r_buffer_alloc(R_ResourceKind_Static, data.size, data.str); - } - - //- rjf: commit results to cache - if(got_task) OS_MutexScopeW(stripe->rw_mutex) - { - for(GEO_Node *n = slot->first; n != 0; n = n->next) - { - if(u128_match(n->hash, hash)) - { - n->buffer = buffer; - ins_atomic_u32_eval_assign(&n->is_working, 0); - ins_atomic_u64_inc_eval(&n->load_count); - break; - } - } - } - - hs_scope_close(scope); } + + //- rjf: hash -> data + String8 data = {0}; + if(got_task) + { + data = hs_data_from_hash(scope, hash); + } + + //- rjf: data -> buffer + R_Handle buffer = {0}; + if(got_task && data.size != 0) + { + buffer = r_buffer_alloc(R_ResourceKind_Static, data.size, data.str); + } + + //- rjf: commit results to cache + if(got_task) OS_MutexScopeW(stripe->rw_mutex) + { + for(GEO_Node *n = slot->first; n != 0; n = n->next) + { + if(u128_match(n->hash, hash)) + { + n->buffer = buffer; + ins_atomic_u32_eval_assign(&n->is_working, 0); + ins_atomic_u64_inc_eval(&n->load_count); + break; + } + } + } + + hs_scope_close(scope); + return 0; } //////////////////////////////// @@ -329,6 +321,7 @@ geo_xfer_thread__entry_point(void *p) internal void geo_evictor_thread__entry_point(void *p) { + ThreadNameF("[geo] evictor thread"); for(;;) { U64 check_time_us = os_now_microseconds(); diff --git a/src/geo_cache/geo_cache.h b/src/geo_cache/geo_cache.h index 62fc29fb..a86c86cf 100644 --- a/src/geo_cache/geo_cache.h +++ b/src/geo_cache/geo_cache.h @@ -90,10 +90,6 @@ struct GEO_Shared OS_Handle u2x_ring_cv; OS_Handle u2x_ring_mutex; - // rjf: transfer threads - U64 xfer_thread_count; - OS_Handle *xfer_threads; - // rjf: evictor thread OS_Handle evictor_thread; }; @@ -138,7 +134,7 @@ internal R_Handle geo_buffer_from_key(GEO_Scope *scope, U128 key); internal B32 geo_u2x_enqueue_req(U128 hash, U64 endt_us); internal void geo_u2x_dequeue_req(U128 *hash_out); -internal void geo_xfer_thread__entry_point(void *p); +ASYNC_WORK_DEF(geo_xfer_work); //////////////////////////////// //~ rjf: Evictor Threads