bucket active conversion tasks by priority; adjust thread cap for bg conversion processes, don't flood cores with work while target is doing things

This commit is contained in:
Ryan Fleury
2025-10-15 17:56:10 -07:00
parent 410837d26d
commit 891eaec5cb
5 changed files with 492 additions and 242 deletions
+62
View File
@@ -0,0 +1,62 @@
// Copyright (c) Epic Games Tools
// Licensed under the MIT license (https://opensource.org/license/mit/)
////////////////////////////////
//~ rjf: ID Functions
internal void cfg_id_list_push(Arena *arena, CFG_IDList *list, CFG_ID id);
internal CFG_IDList cfg_id_list_copy(Arena *arena, CFG_IDList *src);
////////////////////////////////
//~ rjf: Node Pointer Data Structure Functions
internal void cfg_node_ptr_list_push(Arena *arena, CFG_NodePtrList *list, CFG_Node *node);
internal void cfg_node_ptr_list_push_front(Arena *arena, CFG_NodePtrList *list, CFG_Node *node);
#define cfg_node_ptr_list_first(list) ((list)->count ? (list)->first->v : &cfg_nil_node)
#define cfg_node_ptr_list_last(list) ((list)->count ? (list)->last->v : &cfg_nil_node)
internal CFG_NodePtrArray cfg_node_ptr_array_from_list(Arena *arena, CFG_NodePtrList *list);
////////////////////////////////
//~ rjf: Config Reading Functions
//- rjf: context selection
internal void cfg_ctx_select(CFG_Ctx *ctx);
//- rjf: tree navigations
internal CFG_Node *cfg_node_from_id(CFG_ID id);
internal CFG_Node *cfg_node_child_from_string(CFG_Node *parent, String8 string);
internal CFG_Node *cfg_node_child_from_string_or_parent(CFG_Node *parent, String8 string);
internal CFG_NodePtrList cfg_node_child_list_from_string(Arena *arena, CFG_Node *parent, String8 string);
internal CFG_NodePtrList cfg_node_top_level_list_from_string(Arena *arena, String8 string);
internal CFG_NodeRec cfg_node_rec__depth_first(CFG_Node *root, CFG_Node *node);
//- rjf: serialization
internal String8 cfg_string_from_tree(Arena *arena, String8 root_path, CFG_Node *root);
////////////////////////////////
//~ rjf: Config Writing Functions
//- rjf: state creation / destroying
internal CFG_State *cfg_state_alloc(void);
internal void cfg_state_release(CFG_State *state);
//- rjf: state -> ctx
internal CFG_Ctx *cfg_state_ctx(CFG_State *state);
//- rjf: tree building
internal CFG_Node *cfg_node_alloc(CFG_State *state);
internal void cfg_node_release(CFG_State *state, CFG_Node *node);
internal void cfg_node_release_all_children(CFG_State *state, CFG_Node *node);
internal CFG_Node *cfg_node_new(CFG_State *state, CFG_Node *parent, String8 string);
internal CFG_Node *cfg_node_newf(CFG_State *state, CFG_Node *parent, char *fmt, ...);
internal CFG_Node *cfg_node_new_replace(CFG_State *state, CFG_Node *parent, String8 string);
internal CFG_Node *cfg_node_new_replacef(CFG_State *state, CFG_Node *parent, char *fmt, ...);
internal CFG_Node *cfg_node_deep_copy(CFG_State *state, CFG_Node *src_root);
internal void cfg_node_equip_string(CFG_State *state, CFG_Node *node, String8 string);
internal void cfg_node_equip_stringf(CFG_State *state, CFG_Node *node, char *fmt, ...);
internal void cfg_node_insert_child(CFG_State *state, CFG_Node *parent, CFG_Node *prev_child, CFG_Node *new_child);
internal void cfg_node_unhook(CFG_State *state, CFG_Node *parent, CFG_Node *child);
internal CFG_Node *cfg_node_child_from_string_or_alloc(CFG_State *state, CFG_Node *parent, String8 string);
//- rjf: deserialization
internal CFG_NodePtrList cfg_node_ptr_list_from_string(Arena *arena, String8 root_path, String8 string);
+178
View File
@@ -0,0 +1,178 @@
// Copyright (c) Epic Games Tools
// Licensed under the MIT license (https://opensource.org/license/mit/)
#ifndef CONFIG_H
#define CONFIG_H
////////////////////////////////
//~ rjf: IDs
typedef U64 CFG_ID;
typedef struct CFG_IDNode CFG_IDNode;
struct CFG_IDNode
{
CFG_IDNode *next;
CFG_ID v;
};
typedef struct CFG_IDList CFG_IDList;
struct CFG_IDList
{
CFG_IDNode *first;
CFG_IDNode *last;
U64 count;
};
////////////////////////////////
//~ rjf: Tree Types
typedef struct CFG_Node CFG_Node;
struct CFG_Node
{
CFG_Node *first;
CFG_Node *last;
CFG_Node *next;
CFG_Node *prev;
CFG_Node *parent;
CFG_ID id;
String8 string;
};
typedef struct CFG_NodePtrNode CFG_NodePtrNode;
struct CFG_NodePtrNode
{
CFG_NodePtrNode *next;
CFG_NodePtrNode *prev;
CFG_Node *v;
};
typedef struct CFG_NodePtrSlot CFG_NodePtrSlot;
struct CFG_NodePtrSlot
{
CFG_NodePtrNode *first;
CFG_NodePtrNode *last;
};
typedef struct CFG_NodePtrList CFG_NodePtrList;
struct CFG_NodePtrList
{
CFG_NodePtrNode *first;
CFG_NodePtrNode *last;
U64 count;
};
typedef struct CFG_NodePtrArray CFG_NodePtrArray;
struct CFG_NodePtrArray
{
CFG_Node **v;
U64 count;
};
typedef struct CFG_NodeRec CFG_NodeRec;
struct CFG_NodeRec
{
CFG_Node *next;
S32 push_count;
S32 pop_count;
};
////////////////////////////////
//~ rjf: Config State Bundles
typedef struct CFG_Ctx CFG_Ctx;
struct CFG_Ctx
{
CFG_Node *root;
U64 id_slots_count;
CFG_NodePtrSlot *id_slots;
U64 change_gen;
CFG_ID last_accessed_id;
CFG_Node *last_accessed;
};
typedef struct CFG_State CFG_State;
struct CFG_State
{
Arena *arena;
CFG_Node *free;
CFG_NodePtrNode *free_id_node;
U64 id_gen;
CFG_Ctx ctx;
};
////////////////////////////////
//~ rjf: Globals
read_only global CFG_Node cfg_nil_node =
{
&cfg_nil_node,
&cfg_nil_node,
&cfg_nil_node,
&cfg_nil_node,
&cfg_nil_node,
};
thread_static CFG_Ctx *cfg_ctx = 0;
////////////////////////////////
//~ rjf: ID Functions
internal void cfg_id_list_push(Arena *arena, CFG_IDList *list, CFG_ID id);
internal CFG_IDList cfg_id_list_copy(Arena *arena, CFG_IDList *src);
////////////////////////////////
//~ rjf: Node Pointer Data Structure Functions
internal void cfg_node_ptr_list_push(Arena *arena, CFG_NodePtrList *list, CFG_Node *node);
internal void cfg_node_ptr_list_push_front(Arena *arena, CFG_NodePtrList *list, CFG_Node *node);
#define cfg_node_ptr_list_first(list) ((list)->count ? (list)->first->v : &cfg_nil_node)
#define cfg_node_ptr_list_last(list) ((list)->count ? (list)->last->v : &cfg_nil_node)
internal CFG_NodePtrArray cfg_node_ptr_array_from_list(Arena *arena, CFG_NodePtrList *list);
////////////////////////////////
//~ rjf: Config Reading Functions
//- rjf: context selection
internal void cfg_ctx_select(CFG_Ctx *ctx);
//- rjf: tree navigations
internal CFG_Node *cfg_node_from_id(CFG_ID id);
internal CFG_Node *cfg_node_child_from_string(CFG_Node *parent, String8 string);
internal CFG_Node *cfg_node_child_from_string_or_parent(CFG_Node *parent, String8 string);
internal CFG_NodePtrList cfg_node_child_list_from_string(Arena *arena, CFG_Node *parent, String8 string);
internal CFG_NodePtrList cfg_node_top_level_list_from_string(Arena *arena, String8 string);
internal CFG_NodeRec cfg_node_rec__depth_first(CFG_Node *root, CFG_Node *node);
//- rjf: serialization
internal String8 cfg_string_from_tree(Arena *arena, String8 root_path, CFG_Node *root);
////////////////////////////////
//~ rjf: Config Writing Functions
//- rjf: state creation / destroying
internal CFG_State *cfg_state_alloc(void);
internal void cfg_state_release(CFG_State *state);
//- rjf: state -> ctx
internal CFG_Ctx *cfg_state_ctx(CFG_State *state);
//- rjf: tree building
internal CFG_Node *cfg_node_alloc(CFG_State *state);
internal void cfg_node_release(CFG_State *state, CFG_Node *node);
internal void cfg_node_release_all_children(CFG_State *state, CFG_Node *node);
internal CFG_Node *cfg_node_new(CFG_State *state, CFG_Node *parent, String8 string);
internal CFG_Node *cfg_node_newf(CFG_State *state, CFG_Node *parent, char *fmt, ...);
internal CFG_Node *cfg_node_new_replace(CFG_State *state, CFG_Node *parent, String8 string);
internal CFG_Node *cfg_node_new_replacef(CFG_State *state, CFG_Node *parent, char *fmt, ...);
internal CFG_Node *cfg_node_deep_copy(CFG_State *state, CFG_Node *src_root);
internal void cfg_node_equip_string(CFG_State *state, CFG_Node *node, String8 string);
internal void cfg_node_equip_stringf(CFG_State *state, CFG_Node *node, char *fmt, ...);
internal void cfg_node_insert_child(CFG_State *state, CFG_Node *parent, CFG_Node *prev_child, CFG_Node *new_child);
internal void cfg_node_unhook(CFG_State *state, CFG_Node *parent, CFG_Node *child);
internal CFG_Node *cfg_node_child_from_string_or_alloc(CFG_State *state, CFG_Node *parent, String8 string);
//- rjf: deserialization
internal CFG_NodePtrList cfg_node_ptr_list_from_string(Arena *arena, String8 root_path, String8 string);
#endif // CONFIG_H
+246 -236
View File
@@ -553,8 +553,8 @@ di_async_tick(void)
////////////////////////////
//- rjf: pop all requests, high priority first
//
DI_RequestNode *first_req = 0;
DI_RequestNode *last_req = 0;
DI_RequestNode *first_req[2] = {0};
DI_RequestNode *last_req[2] = {0};
for EachElement(idx, di_shared->req_batches)
{
DI_RequestBatch *b = &di_shared->req_batches[idx];
@@ -564,7 +564,7 @@ di_async_tick(void)
{
DI_RequestNode *n_copy = push_array(scratch.arena, DI_RequestNode, 1);
MemoryCopyStruct(&n_copy->v, &n->v);
SLLQueuePush(first_req, last_req, n_copy);
SLLQueuePush(first_req[idx], last_req[idx], n_copy);
}
arena_clear(b->arena);
b->first = b->last = 0;
@@ -591,261 +591,271 @@ di_async_tick(void)
////////////////////////////
//- rjf: generate load tasks for all unique requests
//
for EachNode(n, DI_RequestNode, first_req)
{
// rjf: unpack request
DI_Key key = n->v.key;
// rjf: determine if this request is a duplicate
B32 request_is_duplicate = 1;
//
for EachElement(priority_idx, first_req)
{
for EachNode(n, DI_RequestNode, first_req[priority_idx])
{
U64 hash = u64_hash_from_str8(str8_struct(&key));
U64 slot_idx = hash%di_shared->slots_count;
DI_Slot *slot = &di_shared->slots[slot_idx];
Stripe *stripe = stripe_from_slot_idx(&di_shared->stripes, slot_idx);
RWMutexScope(stripe->rw_mutex, 0)
// rjf: unpack request
DI_Key key = n->v.key;
// rjf: determine if this request is a duplicate
B32 request_is_duplicate = 1;
{
for(DI_Node *n = slot->first; n != 0; n = n->next)
U64 hash = u64_hash_from_str8(str8_struct(&key));
U64 slot_idx = hash%di_shared->slots_count;
DI_Slot *slot = &di_shared->slots[slot_idx];
Stripe *stripe = stripe_from_slot_idx(&di_shared->stripes, slot_idx);
RWMutexScope(stripe->rw_mutex, 0)
{
if(di_key_match(n->key, key) && ins_atomic_u64_eval(&n->completion_count) == 0)
for(DI_Node *n = slot->first; n != 0; n = n->next)
{
request_is_duplicate = (ins_atomic_u64_eval_cond_assign(&n->working_count, 1, 0) != 0);
break;
if(di_key_match(n->key, key) && ins_atomic_u64_eval(&n->completion_count) == 0)
{
request_is_duplicate = (ins_atomic_u64_eval_cond_assign(&n->working_count, 1, 0) != 0);
break;
}
}
}
}
}
// rjf: if not a duplicate, create new task
if(!request_is_duplicate)
{
DI_LoadTask *t = di_shared->free_load_task;
if(t)
// rjf: if not a duplicate, create new task
if(!request_is_duplicate)
{
SLLStackPop(di_shared->free_load_task);
DI_LoadTask *t = di_shared->free_load_task;
if(t)
{
SLLStackPop(di_shared->free_load_task);
}
else
{
t = push_array_no_zero(di_shared->arena, DI_LoadTask, 1);
}
MemoryZeroStruct(t);
DLLPushBack(di_shared->first_load_task[priority_idx], di_shared->last_load_task[priority_idx], t);
t->key = key;
}
else
{
t = push_array_no_zero(di_shared->arena, DI_LoadTask, 1);
}
MemoryZeroStruct(t);
DLLPushBack(di_shared->first_load_task, di_shared->last_load_task, t);
t->key = key;
}
}
}
}
////////////////////////////
//- rjf: update tasks: configure, launch if we can, & retire if we can
//
for(DI_LoadTask *t = di_shared->first_load_task, *next = 0; t != 0; t = next)
{
next = t->next;
//- rjf: unpack key
DI_Key key = t->key;
U64 key_hash = u64_hash_from_str8(str8_struct(&key));
U64 key_slot_idx = key_hash%di_shared->key2path_slots_count;
DI_KeySlot *key_slot = &di_shared->key2path_slots[key_slot_idx];
Stripe *key_stripe = stripe_from_slot_idx(&di_shared->key2path_stripes, key_slot_idx);
//- rjf: get key's O.G. path
String8 og_path = {0};
U64 og_min_timestamp = 0;
RWMutexScope(key_stripe->rw_mutex, 0)
//
for EachElement(priority_idx, di_shared->first_load_task)
{
for(DI_LoadTask *t = di_shared->first_load_task[priority_idx], *next = 0; t != 0; t = next)
{
for(DI_KeyPathNode *n = key_slot->first; n != 0; n = n->next)
next = t->next;
//- rjf: unpack key
DI_Key key = t->key;
U64 key_hash = u64_hash_from_str8(str8_struct(&key));
U64 key_slot_idx = key_hash%di_shared->key2path_slots_count;
DI_KeySlot *key_slot = &di_shared->key2path_slots[key_slot_idx];
Stripe *key_stripe = stripe_from_slot_idx(&di_shared->key2path_stripes, key_slot_idx);
//- rjf: get key's O.G. path
String8 og_path = {0};
U64 og_min_timestamp = 0;
RWMutexScope(key_stripe->rw_mutex, 0)
{
if(di_key_match(n->key, key))
for(DI_KeyPathNode *n = key_slot->first; n != 0; n = n->next)
{
og_path = str8_copy(scratch.arena, n->path);
og_min_timestamp = n->min_timestamp;
break;
}
}
}
//- rjf: analyze O.G. debug info
if(!t->og_analyzed)
{
t->og_analyzed = 1;
OS_Handle file = os_file_open(OS_AccessFlag_ShareRead|OS_AccessFlag_Read, og_path);
FileProperties props = os_properties_from_file(file);
t->og_size = props.size;
U64 rdi_magic_maybe = 0;
if(os_file_read_struct(file, 0, &rdi_magic_maybe) == 8 &&
rdi_magic_maybe == RDI_MAGIC_CONSTANT)
{
t->og_is_rdi = 1;
}
os_file_close(file);
}
U64 og_size = t->og_size;
B32 og_is_rdi = t->og_is_rdi;
//- rjf: compute key's RDI path
String8 rdi_path = {0};
{
if(og_is_rdi)
{
rdi_path = og_path;
}
else
{
rdi_path = str8f(scratch.arena, "%S.rdi", str8_chop_last_dot(og_path));
}
}
//- rjf: determine if RDI is stale
if(!t->rdi_analyzed)
{
t->rdi_analyzed = 1;
OS_Handle file = os_file_open(OS_AccessFlag_ShareRead|OS_AccessFlag_Read, rdi_path);
FileProperties props = os_properties_from_file(file);
if(props.modified < og_min_timestamp)
{
t->rdi_is_stale = 1;
}
else
{
t->rdi_is_stale = 1;
RDI_Header header = {0};
if(os_file_read_struct(file, 0, &header) == sizeof(header))
{
t->rdi_is_stale = (header.encoding_version != RDI_ENCODING_VERSION);
}
}
os_file_close(file);
}
B32 rdi_is_stale = t->rdi_is_stale;
//- rjf: calculate thread counts for conversion processes
if(!og_is_rdi && rdi_is_stale && t->thread_count == 0)
{
U64 thread_count = 1;
U64 max_thread_count = os_get_system_info()->logical_processor_count;
{
if(0){}
else if(og_size <= MB(4)) {thread_count = 1;}
else if(og_size <= MB(256)) {thread_count = max_thread_count/4;}
else if(og_size <= MB(512)) {thread_count = max_thread_count/3;}
else if(og_size <= GB(1)) {thread_count = max_thread_count/2;}
else {thread_count = max_thread_count;}
}
thread_count = Max(1, thread_count);
t->thread_count = thread_count;
}
//- rjf: determine if there are threads available
B32 threads_available = 0;
{
U64 max_threads = os_get_system_info()->logical_processor_count*2;
U64 current_threads = di_shared->conversion_thread_count;
U64 needed_threads = (current_threads + t->thread_count);
threads_available = (max_threads >= needed_threads);
}
//- rjf: launch conversion processes
if(threads_available && !og_is_rdi && rdi_is_stale && t->thread_count != 0 && t->status != DI_LoadTaskStatus_Active)
{
B32 should_compress = 0;
OS_ProcessLaunchParams params = {0};
params.path = os_get_process_info()->binary_path;
params.inherit_env = 1;
params.consoleless = 1;
str8_list_pushf(scratch.arena, &params.cmd_line, "raddbg");
str8_list_pushf(scratch.arena, &params.cmd_line, "--bin");
str8_list_pushf(scratch.arena, &params.cmd_line, "--quiet");
if(should_compress)
{
str8_list_pushf(scratch.arena, &params.cmd_line, "--compress");
}
// str8_list_pushf(scratch.arena, &params.cmd_line, "--capture");
str8_list_pushf(scratch.arena, &params.cmd_line, "--rdi");
str8_list_pushf(scratch.arena, &params.cmd_line, "--out:%S", rdi_path);
str8_list_pushf(scratch.arena, &params.cmd_line, "--thread_count:%I64u", t->thread_count);
str8_list_pushf(scratch.arena, &params.cmd_line, "--signal_pid:%I64u", (U64)os_get_process_info()->pid);
str8_list_pushf(scratch.arena, &params.cmd_line, "--signal_code:%I64u", (U64)t);
str8_list_pushf(scratch.arena, &params.cmd_line, "%S", og_path);
ProfMsg("launch creation for %.*s", str8_varg(rdi_path));
t->process = os_process_launch(&params);
t->status = DI_LoadTaskStatus_Active;
di_shared->conversion_process_count += 1;
di_shared->conversion_thread_count += t->thread_count;
// rjf: send event
MutexScope(di_shared->event_mutex)
{
DI_EventNode *n = push_array(di_shared->event_arena, DI_EventNode, 1);
SLLQueuePush(di_shared->events.first, di_shared->events.last, n);
di_shared->events.count += 1;
n->v.kind = DI_EventKind_ConversionStarted;
n->v.string = str8_copy(di_shared->event_arena, rdi_path);
}
}
//- rjf: if active & process has completed, mark as done
{
U64 exit_code = 0;
if(t->status == DI_LoadTaskStatus_Active)
{
B32 task_is_done = 0;
for(DI_LoadCompletion *c = first_completion; c != 0; c = c->next)
{
if(c->code == (U64)t)
if(di_key_match(n->key, key))
{
task_is_done = 1;
og_path = str8_copy(scratch.arena, n->path);
og_min_timestamp = n->min_timestamp;
break;
}
}
if(!task_is_done)
{
task_is_done = os_process_join(t->process, 0, 0);
}
if(task_is_done)
{
t->status = DI_LoadTaskStatus_Done;
di_shared->conversion_process_count -= 1;
di_shared->conversion_thread_count -= t->thread_count;
}
}
}
//- rjf: if the RDI for this task is not stale, then we're already done - mark this
// task as done & prepped for storing into the cache
if(!rdi_is_stale)
{
t->status = DI_LoadTaskStatus_Done;
}
//- rjf: if the RDI for this task *is* stale, but the O.G. path is actually RDI,
// then we can't actually re-convert to produce a non-stale RDI. in this case, just
// mark as done.
if(rdi_is_stale && og_is_rdi)
{
t->status = DI_LoadTaskStatus_Done;
}
//- rjf: if task is done, retire & recycle task; gather path to load
if(t->status == DI_LoadTaskStatus_Done)
{
if(!os_handle_match(t->process, os_handle_zero())) MutexScope(di_shared->event_mutex)
//- rjf: analyze O.G. debug info
if(!t->og_analyzed)
{
DI_EventNode *n = push_array(di_shared->event_arena, DI_EventNode, 1);
SLLQueuePush(di_shared->events.first, di_shared->events.last, n);
di_shared->events.count += 1;
n->v.kind = DI_EventKind_ConversionEnded;
n->v.string = str8_copy(di_shared->event_arena, rdi_path);
t->og_analyzed = 1;
OS_Handle file = os_file_open(OS_AccessFlag_ShareRead|OS_AccessFlag_Read, og_path);
FileProperties props = os_properties_from_file(file);
t->og_size = props.size;
U64 rdi_magic_maybe = 0;
if(os_file_read_struct(file, 0, &rdi_magic_maybe) == 8 &&
rdi_magic_maybe == RDI_MAGIC_CONSTANT)
{
t->og_is_rdi = 1;
}
os_file_close(file);
}
DLLRemove(di_shared->first_load_task, di_shared->last_load_task, t);
SLLStackPush(di_shared->free_load_task, t);
ParseTaskNode *n = push_array(scratch.arena, ParseTaskNode, 1);
n->v.key = key;
n->v.rdi_path = rdi_path;
SLLQueuePush(first_parse_task, last_parse_task, n);
parse_tasks_count += 1;
}
}
U64 og_size = t->og_size;
B32 og_is_rdi = t->og_is_rdi;
//- rjf: compute key's RDI path
String8 rdi_path = {0};
{
if(og_is_rdi)
{
rdi_path = og_path;
}
else
{
rdi_path = str8f(scratch.arena, "%S.rdi", str8_chop_last_dot(og_path));
}
}
//- rjf: determine if RDI is stale
if(!t->rdi_analyzed)
{
t->rdi_analyzed = 1;
OS_Handle file = os_file_open(OS_AccessFlag_ShareRead|OS_AccessFlag_Read, rdi_path);
FileProperties props = os_properties_from_file(file);
if(props.modified < og_min_timestamp)
{
t->rdi_is_stale = 1;
}
else
{
t->rdi_is_stale = 1;
RDI_Header header = {0};
if(os_file_read_struct(file, 0, &header) == sizeof(header))
{
t->rdi_is_stale = (header.encoding_version != RDI_ENCODING_VERSION);
}
}
os_file_close(file);
}
B32 rdi_is_stale = t->rdi_is_stale;
//- rjf: calculate thread counts for conversion processes
if(!og_is_rdi && rdi_is_stale && t->thread_count == 0)
{
U64 thread_count = 1;
U64 max_thread_count = os_get_system_info()->logical_processor_count;
if(priority_idx > 0)
{
max_thread_count = Max(1, max_thread_count/2);
}
{
if(0){}
else if(og_size <= MB(4)) {thread_count = 1;}
else if(og_size <= MB(256)) {thread_count = max_thread_count/4;}
else if(og_size <= MB(512)) {thread_count = max_thread_count/3;}
else if(og_size <= GB(1)) {thread_count = max_thread_count/2;}
else {thread_count = max_thread_count;}
}
thread_count = Max(1, thread_count);
t->thread_count = thread_count;
}
//- rjf: determine if there are threads available
B32 threads_available = 0;
{
U64 max_threads = os_get_system_info()->logical_processor_count;
U64 current_threads = di_shared->conversion_thread_count;
U64 needed_threads = (current_threads + t->thread_count);
threads_available = (max_threads >= needed_threads);
}
//- rjf: launch conversion processes
if(threads_available && !og_is_rdi && rdi_is_stale && t->thread_count != 0 && t->status != DI_LoadTaskStatus_Active)
{
B32 should_compress = 0;
OS_ProcessLaunchParams params = {0};
params.path = os_get_process_info()->binary_path;
params.inherit_env = 1;
params.consoleless = 1;
str8_list_pushf(scratch.arena, &params.cmd_line, "raddbg");
str8_list_pushf(scratch.arena, &params.cmd_line, "--bin");
str8_list_pushf(scratch.arena, &params.cmd_line, "--quiet");
if(should_compress)
{
str8_list_pushf(scratch.arena, &params.cmd_line, "--compress");
}
// str8_list_pushf(scratch.arena, &params.cmd_line, "--capture");
str8_list_pushf(scratch.arena, &params.cmd_line, "--rdi");
str8_list_pushf(scratch.arena, &params.cmd_line, "--out:%S", rdi_path);
str8_list_pushf(scratch.arena, &params.cmd_line, "--thread_count:%I64u", t->thread_count);
str8_list_pushf(scratch.arena, &params.cmd_line, "--signal_pid:%I64u", (U64)os_get_process_info()->pid);
str8_list_pushf(scratch.arena, &params.cmd_line, "--signal_code:%I64u", (U64)t);
str8_list_pushf(scratch.arena, &params.cmd_line, "%S", og_path);
ProfMsg("launch creation for %.*s", str8_varg(rdi_path));
t->process = os_process_launch(&params);
t->status = DI_LoadTaskStatus_Active;
di_shared->conversion_process_count += 1;
di_shared->conversion_thread_count += t->thread_count;
// rjf: send event
MutexScope(di_shared->event_mutex)
{
DI_EventNode *n = push_array(di_shared->event_arena, DI_EventNode, 1);
SLLQueuePush(di_shared->events.first, di_shared->events.last, n);
di_shared->events.count += 1;
n->v.kind = DI_EventKind_ConversionStarted;
n->v.string = str8_copy(di_shared->event_arena, rdi_path);
}
}
//- rjf: if active & process has completed, mark as done
{
U64 exit_code = 0;
if(t->status == DI_LoadTaskStatus_Active)
{
B32 task_is_done = 0;
for(DI_LoadCompletion *c = first_completion; c != 0; c = c->next)
{
if(c->code == (U64)t)
{
task_is_done = 1;
break;
}
}
if(!task_is_done)
{
task_is_done = os_process_join(t->process, 0, 0);
}
if(task_is_done)
{
t->status = DI_LoadTaskStatus_Done;
di_shared->conversion_process_count -= 1;
di_shared->conversion_thread_count -= t->thread_count;
}
}
}
//- rjf: if the RDI for this task is not stale, then we're already done - mark this
// task as done & prepped for storing into the cache
if(!rdi_is_stale)
{
t->status = DI_LoadTaskStatus_Done;
}
//- rjf: if the RDI for this task *is* stale, but the O.G. path is actually RDI,
// then we can't actually re-convert to produce a non-stale RDI. in this case, just
// mark as done.
if(rdi_is_stale && og_is_rdi)
{
t->status = DI_LoadTaskStatus_Done;
}
//- rjf: if task is done, retire & recycle task; gather path to load
if(t->status == DI_LoadTaskStatus_Done)
{
if(!os_handle_match(t->process, os_handle_zero())) MutexScope(di_shared->event_mutex)
{
DI_EventNode *n = push_array(di_shared->event_arena, DI_EventNode, 1);
SLLQueuePush(di_shared->events.first, di_shared->events.last, n);
di_shared->events.count += 1;
n->v.kind = DI_EventKind_ConversionEnded;
n->v.string = str8_copy(di_shared->event_arena, rdi_path);
}
DLLRemove(di_shared->first_load_task[priority_idx], di_shared->last_load_task[priority_idx], t);
SLLStackPush(di_shared->free_load_task, t);
ParseTaskNode *n = push_array(scratch.arena, ParseTaskNode, 1);
n->v.key = key;
n->v.rdi_path = rdi_path;
SLLQueuePush(first_parse_task, last_parse_task, n);
parse_tasks_count += 1;
}
}
}
////////////////////////////
//- rjf: join all parse tasks
+2 -2
View File
@@ -267,8 +267,8 @@ struct DI_Shared
DI_RequestBatch req_batches[2]; // [0] -> high priority, [1] -> low priority
// rjf: conversion tasks
DI_LoadTask *first_load_task;
DI_LoadTask *last_load_task;
DI_LoadTask *first_load_task[2];
DI_LoadTask *last_load_task[2];
DI_LoadTask *free_load_task;
U64 conversion_process_count;
U64 conversion_thread_count;
+4 -4
View File
@@ -148,13 +148,13 @@
// [ ] step-out-of-loop
//
//- late-conversion performance improvements
// [ ] investigate wide-conversion performance
// [ ] oversubscribing cores?
// [ ] conversion crashes?
// [x] investigate wide-conversion performance
// [x] oversubscribing cores?
// [x] conversion crashes?
// [ ] live++ investigations - ctrl+alt+f11 in UE?
//
//- memory usage improvements
// [ ] "root" concept in hash store, which buckets keys & allows usage code to
// [x] "root" concept in hash store, which buckets keys & allows usage code to
// jettison a collection of keys in retained mode fashion
//
//- short-to-medium term future features