[geo] move to async layer

This commit is contained in:
Ryan Fleury
2024-11-03 12:17:18 -08:00
parent ffb21277e9
commit 9ce0bc5dbf
2 changed files with 55 additions and 66 deletions
+54 -61
View File
@@ -25,12 +25,6 @@ geo_init(void)
geo_shared->u2x_ring_base = push_array_no_zero(arena, U8, geo_shared->u2x_ring_size);
geo_shared->u2x_ring_cv = os_condition_variable_alloc();
geo_shared->u2x_ring_mutex = os_mutex_alloc();
geo_shared->xfer_thread_count = Clamp(1, os_get_system_info()->logical_processor_count-1, 4);
geo_shared->xfer_threads = push_array(arena, OS_Handle, geo_shared->xfer_thread_count);
for(U64 idx = 0; idx < geo_shared->xfer_thread_count; idx += 1)
{
geo_shared->xfer_threads[idx] = os_thread_launch(geo_xfer_thread__entry_point, (void *)idx, 0);
}
geo_shared->evictor_thread = os_thread_launch(geo_evictor_thread__entry_point, 0, 0);
}
@@ -192,6 +186,7 @@ geo_buffer_from_hash(GEO_Scope *scope, U128 hash)
if(node_is_new)
{
geo_u2x_enqueue_req(hash, max_U64);
async_push_work(geo_xfer_work);
}
}
return handle;
@@ -259,68 +254,65 @@ geo_u2x_dequeue_req(U128 *hash_out)
os_condition_variable_broadcast(geo_shared->u2x_ring_cv);
}
internal void
geo_xfer_thread__entry_point(void *p)
ASYNC_WORK_DEF(geo_xfer_work)
{
for(;;)
HS_Scope *scope = hs_scope_open();
//- rjf: decode
U128 hash = {0};
geo_u2x_dequeue_req(&hash);
//- rjf: unpack hash
U64 slot_idx = hash.u64[1]%geo_shared->slots_count;
U64 stripe_idx = slot_idx%geo_shared->stripes_count;
GEO_Slot *slot = &geo_shared->slots[slot_idx];
GEO_Stripe *stripe = &geo_shared->stripes[stripe_idx];
//- rjf: take task
B32 got_task = 0;
OS_MutexScopeR(stripe->rw_mutex)
{
HS_Scope *scope = hs_scope_open();
//- rjf: decode
U128 hash = {0};
geo_u2x_dequeue_req(&hash);
//- rjf: unpack hash
U64 slot_idx = hash.u64[1]%geo_shared->slots_count;
U64 stripe_idx = slot_idx%geo_shared->stripes_count;
GEO_Slot *slot = &geo_shared->slots[slot_idx];
GEO_Stripe *stripe = &geo_shared->stripes[stripe_idx];
//- rjf: take task
B32 got_task = 0;
OS_MutexScopeR(stripe->rw_mutex)
for(GEO_Node *n = slot->first; n != 0; n = n->next)
{
for(GEO_Node *n = slot->first; n != 0; n = n->next)
if(u128_match(n->hash, hash))
{
if(u128_match(n->hash, hash))
{
got_task = !ins_atomic_u32_eval_cond_assign(&n->is_working, 1, 0);
break;
}
got_task = !ins_atomic_u32_eval_cond_assign(&n->is_working, 1, 0);
break;
}
}
//- rjf: hash -> data
String8 data = {0};
if(got_task)
{
data = hs_data_from_hash(scope, hash);
}
//- rjf: data -> buffer
R_Handle buffer = {0};
if(got_task && data.size != 0)
{
buffer = r_buffer_alloc(R_ResourceKind_Static, data.size, data.str);
}
//- rjf: commit results to cache
if(got_task) OS_MutexScopeW(stripe->rw_mutex)
{
for(GEO_Node *n = slot->first; n != 0; n = n->next)
{
if(u128_match(n->hash, hash))
{
n->buffer = buffer;
ins_atomic_u32_eval_assign(&n->is_working, 0);
ins_atomic_u64_inc_eval(&n->load_count);
break;
}
}
}
hs_scope_close(scope);
}
//- rjf: hash -> data
String8 data = {0};
if(got_task)
{
data = hs_data_from_hash(scope, hash);
}
//- rjf: data -> buffer
R_Handle buffer = {0};
if(got_task && data.size != 0)
{
buffer = r_buffer_alloc(R_ResourceKind_Static, data.size, data.str);
}
//- rjf: commit results to cache
if(got_task) OS_MutexScopeW(stripe->rw_mutex)
{
for(GEO_Node *n = slot->first; n != 0; n = n->next)
{
if(u128_match(n->hash, hash))
{
n->buffer = buffer;
ins_atomic_u32_eval_assign(&n->is_working, 0);
ins_atomic_u64_inc_eval(&n->load_count);
break;
}
}
}
hs_scope_close(scope);
return 0;
}
////////////////////////////////
@@ -329,6 +321,7 @@ geo_xfer_thread__entry_point(void *p)
internal void
geo_evictor_thread__entry_point(void *p)
{
ThreadNameF("[geo] evictor thread");
for(;;)
{
U64 check_time_us = os_now_microseconds();
+1 -5
View File
@@ -90,10 +90,6 @@ struct GEO_Shared
OS_Handle u2x_ring_cv;
OS_Handle u2x_ring_mutex;
// rjf: transfer threads
U64 xfer_thread_count;
OS_Handle *xfer_threads;
// rjf: evictor thread
OS_Handle evictor_thread;
};
@@ -138,7 +134,7 @@ internal R_Handle geo_buffer_from_key(GEO_Scope *scope, U128 key);
internal B32 geo_u2x_enqueue_req(U128 hash, U64 endt_us);
internal void geo_u2x_dequeue_req(U128 *hash_out);
internal void geo_xfer_thread__entry_point(void *p);
ASYNC_WORK_DEF(geo_xfer_work);
////////////////////////////////
//~ rjf: Evictor Threads