diff --git a/core/sync/channel.odin b/core/sync/channel.odin index 94b21e683..ed9c526ad 100644 --- a/core/sync/channel.odin +++ b/core/sync/channel.odin @@ -1,5 +1,6 @@ package sync +// import "core:fmt" import "core:mem" import "core:time" import "core:intrinsics" @@ -39,11 +40,11 @@ channel_cap :: proc(ch: $C/Channel($T)) -> int { channel_send :: proc(ch: $C/Channel($T), msg: T, loc := #caller_location) { msg := msg; - _ = raw_channel_send_impl(ch._internal, &msg, false, loc); + _ = raw_channel_send_impl(ch._internal, &msg, /*block*/true, loc); } channel_try_send :: proc(ch: $C/Channel($T), msg: T, loc := #caller_location) -> bool { msg := msg; - return raw_channel_send_impl(ch._internal, &msg, true, loc); + return raw_channel_send_impl(ch._internal, &msg, /*block*/false, loc); } channel_recv :: proc(ch: $C/Channel($T), loc := #caller_location) -> (msg: T) { @@ -68,6 +69,10 @@ channel_try_recv :: proc(ch: $C/Channel($T), loc := #caller_location) -> (msg: T channel_is_nil :: proc(ch: $C/Channel($T)) -> bool { return ch._internal == nil; } +channel_is_open :: proc(ch: $C/Channel($T)) -> bool { + c := ch._internal; + return c != nil && !c.closed; +} channel_eq :: proc(a, b: $C/Channel($T)) -> bool { @@ -108,19 +113,14 @@ channel_close :: proc(ch: $C/Channel($T), loc := #caller_location) { } -channel_iterator :: proc(ch: $C/Channel($T)) -> (val: T, open: bool) { +channel_iterator :: proc(ch: $C/Channel($T)) -> (val: T, ok: bool) { c := ch._internal; - switch { - case c == nil: + if c == nil { return; - case intrinsics.atomic_load(&c.closed): - if channel_can_recv(ch) { - val = channel_recv(ch); - open = true; - } - case: - val = channel_recv(ch); - open = true; + } + + if !c.closed || c.len > 0 { + val, ok = channel_recv(ch), true; } return; } @@ -129,8 +129,11 @@ channel_iterator :: proc(ch: $C/Channel($T)) -> (val: T, open: bool) { channel_select_recv :: proc(channels: ..^Raw_Channel) -> (index: int) { backing: [64]int; candidates := backing[:]; - if len(channels) > len(backing) { - candidates = make([]int, len(channels), context.temp_allocator); + cap := len(channels); + if cap > len(backing) { + candidates = make([]int, cap, context.temp_allocator); + } else { + candidates = candidates[:cap]; } count := u32(0); @@ -298,7 +301,7 @@ raw_channel_destroy :: proc(c: ^Raw_Channel) { } -raw_channel_send_impl :: proc(c: ^Raw_Channel, msg: rawptr, no_block: bool, loc := #caller_location) -> bool { +raw_channel_send_impl :: proc(c: ^Raw_Channel, msg: rawptr, block: bool, loc := #caller_location) -> bool { send :: proc(c: ^Raw_Channel, src: rawptr) { dst := uintptr(c.data) + uintptr(c.write * c.elem_size); mem.copy(rawptr(dst), src, c.elem_size); @@ -315,7 +318,7 @@ raw_channel_send_impl :: proc(c: ^Raw_Channel, msg: rawptr, no_block: bool, loc mutex_lock(&c.mutex); if c.cap > 0 { - if no_block && c.len >= c.cap { + if !block && c.len >= c.cap { mutex_unlock(&c.mutex); return false; } @@ -323,6 +326,8 @@ raw_channel_send_impl :: proc(c: ^Raw_Channel, msg: rawptr, no_block: bool, loc for c.len >= c.cap { condition_wait_for(&c.cond); } + } else if c.len > 0 { + condition_wait_for(&c.cond); } send(c, msg); @@ -352,7 +357,12 @@ raw_channel_recv_impl :: proc(c: ^Raw_Channel, res: rawptr, loc := #caller_locat } intrinsics.atomic_store(&c.ready, false); recv(c, res, loc); - if c.cap > 0 && c.len == c.cap - 1 { + if c.cap > 0 { + if c.len == c.cap - 1 { + // NOTE(bill): Only signal on the last one + condition_signal(&c.cond); + } + } else { condition_signal(&c.cond); } }