From 95b94a0f5669d8fb1c38da945bd73505f5c112d3 Mon Sep 17 00:00:00 2001 From: gingerBill Date: Tue, 10 Nov 2020 15:00:40 +0000 Subject: [PATCH] Fix sync.Channel code; add `thread.run_with_poly_data` and `run_with_poly_data(2|3|4)` procedures --- core/sync/channel.odin | 24 ++++---- core/thread/thread.odin | 100 +++++++++++++++++++++++++++++--- core/thread/thread_unix.odin | 5 +- core/thread/thread_windows.odin | 8 ++- 4 files changed, 113 insertions(+), 24 deletions(-) diff --git a/core/sync/channel.odin b/core/sync/channel.odin index 142c8f602..7eddbfaf0 100644 --- a/core/sync/channel.odin +++ b/core/sync/channel.odin @@ -145,7 +145,7 @@ channel_close :: proc(ch: $C/Channel($T, $D), loc := #caller_location) { } -channel_iterator :: proc(ch: $C/Channel($T, $D)) -> (msg: T, ok: bool) where D >= .Both { +channel_iterator :: proc(ch: $C/Channel($T, $D)) -> (msg: T, ok: bool) where D <= .Both { c := ch._internal; if c == nil { return; @@ -287,18 +287,19 @@ raw_channel_send_impl :: proc(c: ^Raw_Channel, msg: rawptr, block: bool, loc := for c.len >= c.cap { condition_wait_for(&c.cond); } - } else if c.len > 0 { + } else if c.len > 0 { // TODO(bill): determine correct behaviour if !block { return false; } condition_wait_for(&c.cond); + } else if c.len == 0 && !block { + return false; } send(c, msg); condition_signal(&c.cond); raw_channel_wait_queue_signal(c.recvq); - return true; } @@ -564,7 +565,7 @@ select_recv_msg :: proc(channels: ..$C/Channel($T, $D)) -> (msg: T, index: int) q.state = &state; raw_channel_wait_queue_insert(&c.recvq, q); } - raw_channel_wait_queue_wait_on(&state); + raw_channel_wait_queue_wait_on(&state, SELECT_MAX_TIMEOUT); for c, i in channels { q := &queues[i]; raw_channel_wait_queue_remove(&c.recvq, q); @@ -618,7 +619,7 @@ select_send_msg :: proc(msg: $T, channels: ..$C/Channel(T, $D)) -> (index: int) q.state = &state; raw_channel_wait_queue_insert(&c.recvq, q); } - raw_channel_wait_queue_wait_on(&state); + raw_channel_wait_queue_wait_on(&state, SELECT_MAX_TIMEOUT); for c, i in channels { q := &queues[i]; raw_channel_wait_queue_remove(&c.recvq, q); @@ -813,13 +814,12 @@ select_try_send :: proc(channels: ..^Raw_Channel) -> (index: int) #no_bounds_che select_try_recv_msg :: proc(channels: ..$C/Channel($T, $D)) -> (msg: T, index: int) { switch len(channels) { case 0: - index = 0; + index = -1; return; case 1: - if c := channels[0]; channel_can_recv(c) { + ok: bool; + if msg, ok = channel_try_recv(channels[0]); ok { index = 0; - msg = channel_recv(c); - return; } return; } @@ -850,15 +850,13 @@ select_try_recv_msg :: proc(channels: ..$C/Channel($T, $D)) -> (msg: T, index: i } select_try_send_msg :: proc(msg: $T, channels: ..$C/Channel(T, $D)) -> (index: int) { + index = -1; switch len(channels) { case 0: - index = 0; return; case 1: - if c := channels[0]; channel_can_send(c) { + if channel_try_send(channels[0], msg) { index = 0; - channel_send(c, msg); - return; } return; } diff --git a/core/thread/thread.odin b/core/thread/thread.odin index 85e0cc3fe..b98f4c07c 100644 --- a/core/thread/thread.odin +++ b/core/thread/thread.odin @@ -2,17 +2,26 @@ package thread import "core:runtime" import "core:sync" -import "core:intrinsics" +import "core:mem" +import "intrinsics" + +_ :: intrinsics; Thread_Proc :: #type proc(^Thread); +MAX_USER_ARGUMENTS :: 8; + Thread :: struct { - using specific: Thread_Os_Specific, - procedure: Thread_Proc, - data: rawptr, - user_index: int, + using specific: Thread_Os_Specific, + procedure: Thread_Proc, + data: rawptr, + user_index: int, + user_args: [MAX_USER_ARGUMENTS]rawptr, init_context: Maybe(runtime.Context), + + + creation_allocator: mem.Allocator, } #assert(size_of(Thread{}.user_index) == size_of(uintptr)); @@ -34,17 +43,94 @@ run :: proc(fn: proc(), init_context: Maybe(runtime.Context) = nil, priority := run_with_data :: proc(data: rawptr, fn: proc(data: rawptr), init_context: Maybe(runtime.Context) = nil, priority := Thread_Priority.Normal) { thread_proc :: proc(t: ^Thread) { fn := cast(proc(rawptr))t.data; - data := rawptr(uintptr(t.user_index)); + assert(t.user_index >= 1); + data := t.user_args[0]; fn(data); destroy(t); } t := create(thread_proc, priority); t.data = rawptr(fn); - t.user_index = int(uintptr(data)); + t.user_index = 1; + t.user_args = data; t.init_context = init_context; start(t); } +run_with_poly_data :: proc(data: $T, fn: proc(data: T), init_context: Maybe(runtime.Context) = nil, priority := Thread_Priority.Normal) + where intrinsics.type_is_pointer(T) || size_of(T) == size_of(rawptr) { + thread_proc :: proc(t: ^Thread) { + fn := cast(proc(rawptr))t.data; + assert(t.user_index >= 1); + data := t.user_args[0]; + fn(data); + destroy(t); + } + t := create(thread_proc, priority); + t.data = rawptr(fn); + t.user_index = 1; + t.user_args[0] = transmute(rawptr)data; + t.init_context = init_context; + start(t); +} + +run_with_poly_data2 :: proc(arg1: $T1, arg2: $T2, fn: proc(T1, T2), init_context: Maybe(runtime.Context) = nil, priority := Thread_Priority.Normal) + where intrinsics.type_is_pointer(T1) || size_of(T1) == size_of(rawptr), + intrinsics.type_is_pointer(T2) || size_of(T2) == size_of(rawptr) { + thread_proc :: proc(t: ^Thread) { + fn := cast(proc(rawptr, rawptr))t.data; + assert(t.user_index >= 2); + fn(t.user_args[0], t.user_args[1]); + destroy(t); + } + t := create(thread_proc, priority); + t.data = rawptr(fn); + t.user_index = 2; + t.user_args[0] = transmute(rawptr)arg1; + t.user_args[1] = transmute(rawptr)arg2; + t.init_context = init_context; + start(t); +} + +run_with_poly_data3 :: proc(arg1: $T1, arg2: $T2, arg3: $T3, fn: proc(arg1: T1, arg2: T2, arg3: T3), init_context: Maybe(runtime.Context) = nil, priority := Thread_Priority.Normal) + where intrinsics.type_is_pointer(T1) || size_of(T1) == size_of(rawptr), + intrinsics.type_is_pointer(T2) || size_of(T2) == size_of(rawptr), + intrinsics.type_is_pointer(T3) || size_of(T3) == size_of(rawptr) { + thread_proc :: proc(t: ^Thread) { + fn := cast(proc(rawptr, rawptr, rawptr))t.data; + assert(t.user_index >= 3); + fn(t.user_args[0], t.user_args[1], t.user_args[2]); + destroy(t); + } + t := create(thread_proc, priority); + t.data = rawptr(fn); + t.user_index = 3; + t.user_args[0] = transmute(rawptr)arg1; + t.user_args[1] = transmute(rawptr)arg2; + t.user_args[2] = transmute(rawptr)arg3; + t.init_context = init_context; + start(t); +} +run_with_poly_data4 :: proc(arg1: $T1, arg2: $T2, arg3: $T3, arg4: $T4, fn: proc(arg1: T1, arg2: T2, arg3: T3, arg4: T4), init_context: Maybe(runtime.Context) = nil, priority := Thread_Priority.Normal) + where intrinsics.type_is_pointer(T1) || size_of(T1) == size_of(rawptr), + intrinsics.type_is_pointer(T2) || size_of(T2) == size_of(rawptr), + intrinsics.type_is_pointer(T3) || size_of(T3) == size_of(rawptr) { + thread_proc :: proc(t: ^Thread) { + fn := cast(proc(rawptr, rawptr, rawptr))t.data; + assert(t.user_index >= 3); + fn(t.user_args[0], t.user_args[1], t.user_args[2]); + destroy(t); + } + t := create(thread_proc, priority); + t.data = rawptr(fn); + t.user_index = 3; + t.user_args[0] = transmute(rawptr)arg1; + t.user_args[1] = transmute(rawptr)arg2; + t.user_args[2] = transmute(rawptr)arg3; + t.init_context = init_context; + start(t); +} + + create_and_start :: proc(fn: Thread_Proc, init_context: Maybe(runtime.Context) = nil, priority := Thread_Priority.Normal) -> ^Thread { t := create(fn, priority); diff --git a/core/thread/thread_unix.odin b/core/thread/thread_unix.odin index 027ffe026..d87291c0e 100644 --- a/core/thread/thread_unix.odin +++ b/core/thread/thread_unix.odin @@ -85,6 +85,7 @@ create :: proc(procedure: Thread_Proc, priority := Thread_Priority.Normal) -> ^T if thread == nil { return nil; } + thread.creation_allocator = context.allocator; // Set thread priority. policy: i32; @@ -106,7 +107,7 @@ create :: proc(procedure: Thread_Proc, priority := Thread_Priority.Normal) -> ^T sync.mutex_init(&thread.start_mutex); sync.condition_init(&thread.start_gate, &thread.start_mutex); if unix.pthread_create(&thread.unix_thread, &attrs, __linux_thread_entry_proc, thread) != 0 { - free(thread); + free(thread, thread.creation_allocator); return nil; } thread.procedure = procedure; @@ -172,7 +173,7 @@ join_multiple :: proc(threads: ..^Thread) { destroy :: proc(t: ^Thread) { join(t); t.unix_thread = {}; - free(t); + free(t, t.creation_allocator); } diff --git a/core/thread/thread_windows.odin b/core/thread/thread_windows.odin index f94632b35..27a14c7f6 100644 --- a/core/thread/thread_windows.odin +++ b/core/thread/thread_windows.odin @@ -49,10 +49,14 @@ create :: proc(procedure: Thread_Proc, priority := Thread_Priority.Normal) -> ^T thread := new(Thread); + if thread == nil { + return nil; + } + thread.creation_allocator = context.allocator; win32_thread := win32.CreateThread(nil, 0, __windows_thread_entry_proc, thread, win32.CREATE_SUSPENDED, &win32_thread_id); if win32_thread == nil { - free(thread); + free(thread, thread.creation_allocator); return nil; } thread.procedure = procedure; @@ -111,7 +115,7 @@ join_multiple :: proc(threads: ..^Thread) { destroy :: proc(thread: ^Thread) { join(thread); - free(thread); + free(thread, thread.creation_allocator); } terminate :: proc(using thread : ^Thread, exit_code: u32) {