From 52d38f1788f11dfb0b7e55ec694258cdce012374 Mon Sep 17 00:00:00 2001 From: Jack Mordaunt Date: Thu, 12 Jun 2025 17:15:09 -0300 Subject: [PATCH 1/6] test/core/sync/chan: serialize try_select tests These tests will race access to __global_context_for_test, which can cause the test suite to flake. Even though only a single test actually references the variable, the logic in try_select consumes it. --- tests/core/sync/chan/test_core_sync_chan.odin | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/tests/core/sync/chan/test_core_sync_chan.odin b/tests/core/sync/chan/test_core_sync_chan.odin index e8bb553b1..608d0c3d2 100644 --- a/tests/core/sync/chan/test_core_sync_chan.odin +++ b/tests/core/sync/chan/test_core_sync_chan.odin @@ -35,6 +35,10 @@ MAX_RAND :: 32 FAIL_TIME :: 1 * time.Second SLEEP_TIME :: 1 * time.Millisecond +// Synchronizes try_select tests that require access to global state. +test_lock: sync.Mutex +__global_context_for_test: rawptr + comm_client :: proc(th: ^thread.Thread) { data := cast(^Comm)th.data manual_buffering := data.manual_buffering @@ -277,6 +281,7 @@ test_accept_message_from_closed_buffered_chan :: proc(t: ^testing.T) { // operation will process it. @test test_try_select_raw_happy :: proc(t: ^testing.T) { + sync.guard(&test_lock) testing.set_fail_timeout(t, FAIL_TIME) recv1, recv1_err := chan.create(chan.Chan(int), context.allocator) @@ -348,6 +353,7 @@ test_try_select_raw_happy :: proc(t: ^testing.T) { // try_select_raw operation does not block. @test test_try_select_raw_default_state :: proc(t: ^testing.T) { + sync.guard(&test_lock) testing.set_fail_timeout(t, FAIL_TIME) recv1, recv1_err := chan.create(chan.Chan(int), context.allocator) @@ -374,6 +380,7 @@ test_try_select_raw_default_state :: proc(t: ^testing.T) { // thread between calls to can_{send,recv} and try_{send,recv}_raw. @test test_try_select_raw_no_toctou :: proc(t: ^testing.T) { + sync.guard(&test_lock) testing.set_fail_timeout(t, FAIL_TIME) // Trigger will be used to coordinate between the thief and the try_select. @@ -382,9 +389,6 @@ test_try_select_raw_no_toctou :: proc(t: ^testing.T) { assert(trigger_err == nil, "allocation failed") defer chan.destroy(trigger) - @(static) - __global_context_for_test: rawptr - __global_context_for_test = &trigger defer __global_context_for_test = nil From c29168f76f05e98e7532c65eda253e14992f8ddf Mon Sep 17 00:00:00 2001 From: Jack Mordaunt Date: Thu, 12 Jun 2025 12:39:57 -0300 Subject: [PATCH 2/6] core/sync/chan.try_send: avoid blocking if no reader is available This changes the semantics of try_send to be consistently non-blocking. That is, if the buffered is full OR there are no readers it returns false. The previous behaviour was such that it would block in the latter case of no reader, and it would wait for a reader. That is problematic because it produces inconsistent behaviour between buffered and unbuffered channels which is astonishing and adds complexity to the caller. To illustrate the problem with the old behaviour, consider the try_select operation: if a send-channel happens to be unbuffered the try_select (which wants to never block) can now block, that unbuffered send channel is selected (at random) and there is no reader on the other side. Thus we have unpredictable blocking behaviour, which breaks the guarantee that try_select never blocks. If you want a blocking send you can just call "send" (the blocking variant). In addition, there is some reader/writer math done inside can_{send,recv} such that they only report true if there is sufficient reader/writer capacity. If there is contention we need to ensure that each reader is paired to exactly one writer. Consider try_send: if there is a single reader we can send. If there is a single reader and a single writer, then we cannot send, as that reader will be paired with the existing writer. Therefore can_send is only true if there are more readers than writers at the time of check. NOTE: The original tests don't need to use wait-looping with thread.yield() or heuristic sleep. Instead we can just use blocking channel operations rather than non-blocking operations. --- core/sync/chan/chan.odin | 15 ++--- tests/core/sync/chan/test_core_sync_chan.odin | 63 ++++++------------- 2 files changed, 26 insertions(+), 52 deletions(-) diff --git a/core/sync/chan/chan.odin b/core/sync/chan/chan.odin index c5a4cf317..1d91556b5 100644 --- a/core/sync/chan/chan.odin +++ b/core/sync/chan/chan.odin @@ -779,7 +779,7 @@ try_send_raw :: proc "contextless" (c: ^Raw_Chan, msg_in: rawptr) -> (ok: bool) } else if c.unbuffered_data != nil { // unbuffered sync.guard(&c.mutex) - if c.closed { + if c.closed || c.r_waiting - c.w_waiting <= 0 { return false } @@ -843,7 +843,7 @@ try_recv_raw :: proc "contextless" (c: ^Raw_Chan, msg_out: rawptr) -> bool { } else if c.unbuffered_data != nil { // unbuffered sync.guard(&c.mutex) - if c.closed || c.w_waiting == 0 { + if c.closed || c.w_waiting - c.r_waiting <= 0 { return false } @@ -1046,8 +1046,9 @@ is_closed :: proc "contextless" (c: ^Raw_Chan) -> bool { } /* -Returns whether a message is ready to be read, i.e., -if a call to `recv` or `recv_raw` would block +Returns whether a message can be read without blocking the current +thread. Specifically, it checks if the channel is buffered and not full, +or if there is already a writer attempting to send a message. **Inputs** - `c`: The channel @@ -1075,7 +1076,7 @@ can_recv :: proc "contextless" (c: ^Raw_Chan) -> bool { if is_buffered(c) { return c.queue.len > 0 } - return c.w_waiting > 0 + return c.w_waiting - c.r_waiting > 0 } @@ -1088,7 +1089,7 @@ or if there is already a reader waiting for a message. - `c`: The channel **Returns** -- `true` if a message can be send, `false` otherwise +- `true` if a message can be sent, `false` otherwise Example: @@ -1110,7 +1111,7 @@ can_send :: proc "contextless" (c: ^Raw_Chan) -> bool { if is_buffered(c) { return c.queue.len < c.queue.cap } - return c.w_waiting == 0 + return c.r_waiting - c.w_waiting > 0 } /* diff --git a/tests/core/sync/chan/test_core_sync_chan.odin b/tests/core/sync/chan/test_core_sync_chan.odin index 608d0c3d2..52b1f7d31 100644 --- a/tests/core/sync/chan/test_core_sync_chan.odin +++ b/tests/core/sync/chan/test_core_sync_chan.odin @@ -33,7 +33,6 @@ Comm :: struct { BUFFER_SIZE :: 8 MAX_RAND :: 32 FAIL_TIME :: 1 * time.Second -SLEEP_TIME :: 1 * time.Millisecond // Synchronizes try_select tests that require access to global state. test_lock: sync.Mutex @@ -41,14 +40,9 @@ __global_context_for_test: rawptr comm_client :: proc(th: ^thread.Thread) { data := cast(^Comm)th.data - manual_buffering := data.manual_buffering n: i64 - for manual_buffering && !chan.can_recv(data.host) { - thread.yield() - } - recv_loop: for msg in chan.recv(data.host) { #partial switch msg.type { case .Add: n += msg.i @@ -60,14 +54,6 @@ comm_client :: proc(th: ^thread.Thread) { case: panic("Unknown message type for client.") } - - for manual_buffering && !chan.can_recv(data.host) { - thread.yield() - } - } - - for manual_buffering && !chan.can_send(data.host) { - thread.yield() } chan.send(data.client, Message{.Result, n}) @@ -76,9 +62,6 @@ comm_client :: proc(th: ^thread.Thread) { send_messages :: proc(t: ^testing.T, host: chan.Chan(Message), manual_buffering: bool = false) -> (expected: i64) { expected = 1 - for manual_buffering && !chan.can_send(host) { - thread.yield() - } chan.send(host, Message{.Add, 1}) log.debug(Message{.Add, 1}) @@ -100,9 +83,6 @@ send_messages :: proc(t: ^testing.T, host: chan.Chan(Message), manual_buffering: expected /= msg.i } - for manual_buffering && !chan.can_send(host) { - thread.yield() - } if manual_buffering { testing.expect(t, chan.len(host) == 0) } @@ -111,9 +91,6 @@ send_messages :: proc(t: ^testing.T, host: chan.Chan(Message), manual_buffering: log.debug(msg) } - for manual_buffering && !chan.can_send(host) { - thread.yield() - } chan.send(host, Message{.End, 0}) log.debug(Message{.End, 0}) chan.close(host) @@ -152,18 +129,15 @@ test_chan_buffered :: proc(t: ^testing.T) { expected := send_messages(t, comm.host, manual_buffering = false) - // Sleep so we can give the other thread enough time to buffer its message. - time.sleep(SLEEP_TIME) - - testing.expect_value(t, chan.len(comm.client), 1) - result, ok := chan.try_recv(comm.client) - - // One more sleep to ensure it has enough time to close. - time.sleep(SLEEP_TIME) - - testing.expect_value(t, chan.is_closed(comm.client), true) + result, ok := chan.recv(comm.client) testing.expect_value(t, ok, true) testing.expect_value(t, result.i, expected) + + // Wait for channel to close. + _, ok = chan.recv(comm.client) + testing.expect(t, !ok, "channel should have been closed") + + testing.expect_value(t, chan.is_closed(comm.client), true) log.debug(result, expected) // Make sure sending to closed channels fails. @@ -175,6 +149,8 @@ test_chan_buffered :: proc(t: ^testing.T) { _, ok = chan.recv(comm.client); testing.expect_value(t, ok, false) _, ok = chan.try_recv(comm.host); testing.expect_value(t, ok, false) _, ok = chan.try_recv(comm.client); testing.expect_value(t, ok, false) + + thread.join(reckoner) } @test @@ -197,6 +173,10 @@ test_chan_unbuffered :: proc(t: ^testing.T) { testing.expect(t, !chan.is_buffered(comm.client)) testing.expect(t, chan.is_unbuffered(comm.host)) testing.expect(t, chan.is_unbuffered(comm.client)) + testing.expect(t, !chan.can_send(comm.host)) + testing.expect(t, !chan.can_send(comm.client)) + testing.expect(t, !chan.can_recv(comm.host)) + testing.expect(t, !chan.can_recv(comm.client)) testing.expect_value(t, chan.len(comm.host), 0) testing.expect_value(t, chan.len(comm.client), 0) testing.expect_value(t, chan.cap(comm.host), 0) @@ -207,25 +187,16 @@ test_chan_unbuffered :: proc(t: ^testing.T) { reckoner.data = &comm thread.start(reckoner) - for !chan.can_send(comm.client) { - thread.yield() - } - expected := send_messages(t, comm.host) testing.expect_value(t, chan.is_closed(comm.host), true) - for !chan.can_recv(comm.client) { - thread.yield() - } - - result, ok := chan.try_recv(comm.client) + result, ok := chan.recv(comm.client) testing.expect_value(t, ok, true) testing.expect_value(t, result.i, expected) log.debug(result, expected) - // Sleep so we can give the other thread enough time to close its side - // after we've received its message. - time.sleep(SLEEP_TIME) + _, ok2 := chan.recv(comm.client) + testing.expect(t, !ok2, "read of closed channel should return false") testing.expect_value(t, chan.is_closed(comm.client), true) @@ -238,6 +209,8 @@ test_chan_unbuffered :: proc(t: ^testing.T) { _, ok = chan.recv(comm.client); testing.expect_value(t, ok, false) _, ok = chan.try_recv(comm.host); testing.expect_value(t, ok, false) _, ok = chan.try_recv(comm.client); testing.expect_value(t, ok, false) + + thread.join(reckoner) } @test From 2d12e265ccb51ce6385f56a53e2ea261eb92ac82 Mon Sep 17 00:00:00 2001 From: Jack Mordaunt Date: Thu, 12 Jun 2025 15:06:27 -0300 Subject: [PATCH 3/6] tests/core/sync/chan: add test for contended try_send This test ensures that contending threads racing to try_send against a single blocking read will result in exactly one winner without any senders blocking. --- tests/core/sync/chan/test_core_sync_chan.odin | 149 ++++++++++++++++++ 1 file changed, 149 insertions(+) diff --git a/tests/core/sync/chan/test_core_sync_chan.odin b/tests/core/sync/chan/test_core_sync_chan.odin index 52b1f7d31..ae7456d99 100644 --- a/tests/core/sync/chan/test_core_sync_chan.odin +++ b/tests/core/sync/chan/test_core_sync_chan.odin @@ -4,6 +4,7 @@ import "base:runtime" import "base:intrinsics" import "core:log" import "core:math/rand" +import "core:sync" import "core:sync/chan" import "core:testing" import "core:thread" @@ -227,6 +228,154 @@ test_full_buffered_closed_chan_deadlock :: proc(t: ^testing.T) { testing.expect(t, !chan.send(ch, 32)) } +// Ensures that try_send for unbuffered channels works as expected. +// If 1 reader of a channel, and 3 try_senders, only one of the senders +// will succeed and none of them will block. +@test +test_unbuffered_try_send_chan_contention :: proc(t: ^testing.T) { + testing.set_fail_timeout(t, FAIL_TIME) + + start, start_alloc_err := chan.create(chan.Chan(any), context.allocator) + assert(start_alloc_err == nil, "allocation failed") + defer chan.destroy(start) + + trigger, trigger_alloc_err := chan.create(chan.Chan(any), context.allocator) + assert(trigger_alloc_err == nil, "allocation failed") + defer chan.destroy(trigger) + + results, results_alloc_err := chan.create(chan.Chan(int), 3, context.allocator) + assert(results_alloc_err == nil, "allocation failed") + defer chan.destroy(results) + + ch, ch_alloc_err := chan.create(chan.Chan(int), context.allocator) + assert(ch_alloc_err == nil, "allocation failed") + defer chan.destroy(ch) + + // There are no readers or writers, so calling recv or send would block! + testing.expect_value(t, chan.can_send(ch), false) + testing.expect_value(t, chan.can_recv(ch), false) + + // Non-blocking operations should not block, and should return false. + testing.expect_value(t, chan.try_send(ch, -1), false) + if v, ok := chan.try_recv(ch); ok { + testing.expect_value(t, ok, false) + testing.expect_value(t, v, 0) + } + + // Spinup several threads contending to send on an unbuffered channel. + contenders: [3]^thread.Thread + wait: sync.Wait_Group + + for ii in 0.. Date: Thu, 12 Jun 2025 15:41:48 -0300 Subject: [PATCH 4/6] core/sync/chan.send: return false if channel is closed while blocked This commit makes send behave the same as recv: that the call will return false if the channel is closed while a thread is waiting on the blocking operation. Prior logic would have send return true even if the channel was actually closed rather than read from. Docs adjusted to make this clear. Tests added to lock in this behaviour. --- core/sync/chan/chan.odin | 23 ++++++---- tests/core/sync/chan/test_core_sync_chan.odin | 44 +++++++++++++++++++ 2 files changed, 59 insertions(+), 8 deletions(-) diff --git a/core/sync/chan/chan.odin b/core/sync/chan/chan.odin index 1d91556b5..1f434f004 100644 --- a/core/sync/chan/chan.odin +++ b/core/sync/chan/chan.odin @@ -420,8 +420,8 @@ as_recv :: #force_inline proc "contextless" (c: $C/Chan($T, $D)) -> (r: Chan(T, Sends the specified message, blocking the current thread if: - the channel is unbuffered - the channel's buffer is full -until the channel is being read from. `send` will return -`false` when attempting to send on an already closed channel. +until the channel is being read from or the channel is closed. `send` will +return `false` when attempting to send on an already closed channel. **Inputs** - `c`: The channel @@ -492,8 +492,9 @@ try_send :: proc "contextless" (c: $C/Chan($T, $D), data: T) -> (ok: bool) where Reads a message from the channel, blocking the current thread if: - the channel is unbuffered - the channel's buffer is empty -until the channel is being written to. `recv` will return -`false` when attempting to receive a message on an already closed channel. +until the channel is being written to or the channel is closed. `recv` will +return `false` when attempting to receive a message on an already closed +channel. **Inputs** - `c`: The channel @@ -566,8 +567,8 @@ try_recv :: proc "contextless" (c: $C/Chan($T)) -> (data: T, ok: bool) where C.D Sends the specified message, blocking the current thread if: - the channel is unbuffered - the channel's buffer is full -until the channel is being read from. `send_raw` will return -`false` when attempting to send on an already closed channel. +until the channel is being read from or the channel is closed. `send_raw` will +return `false` when attempting to send on an already closed channel. Note: The message referenced by `msg_out` must match the size and alignment used when the `Raw_Chan` was created. @@ -633,6 +634,11 @@ send_raw :: proc "contextless" (c: ^Raw_Chan, msg_in: rawptr) -> (ok: bool) { sync.signal(&c.r_cond) } sync.wait(&c.w_cond, &c.mutex) + + if c.closed { + return false + } + ok = true } return @@ -642,8 +648,9 @@ send_raw :: proc "contextless" (c: ^Raw_Chan, msg_in: rawptr) -> (ok: bool) { Reads a message from the channel, blocking the current thread if: - the channel is unbuffered - the channel's buffer is empty -until the channel is being written to. `recv_raw` will return -`false` when attempting to receive a message on an already closed channel. +until the channel is being written to or the channel is closed. `recv_raw` +will return `false` when attempting to receive a message on an already closed +channel. Note: The location pointed to by `msg_out` must match the size and alignment used when the `Raw_Chan` was created. diff --git a/tests/core/sync/chan/test_core_sync_chan.odin b/tests/core/sync/chan/test_core_sync_chan.odin index ae7456d99..8267d6bef 100644 --- a/tests/core/sync/chan/test_core_sync_chan.odin +++ b/tests/core/sync/chan/test_core_sync_chan.odin @@ -228,6 +228,50 @@ test_full_buffered_closed_chan_deadlock :: proc(t: ^testing.T) { testing.expect(t, !chan.send(ch, 32)) } +// Ensures that if a thread is doing a blocking send and the channel +// is closed, it will report false to indicate a failure to complete. +@test +test_fail_blocking_send_on_close :: proc(t: ^testing.T) { + ch, ch_alloc_err := chan.create(chan.Chan(int), context.allocator) + assert(ch_alloc_err == nil, "allocation failed") + defer chan.destroy(ch) + + sender := thread.create_and_start_with_poly_data(ch, proc(ch: chan.Chan(int)) { + assert(!chan.send(ch, 42)) + }) + + for !chan.can_recv(ch) { + thread.yield() + } + + testing.expect(t, chan.close(ch)) + thread.join(sender) + thread.destroy(sender) +} + +// Ensures that if a thread is doing a blocking read and the channel +// is closed, it will report false to indicate a failure to complete. +@test +test_fail_blocking_recv_on_close :: proc(t: ^testing.T) { + ch, ch_alloc_err := chan.create(chan.Chan(int), context.allocator) + assert(ch_alloc_err == nil, "allocation failed") + defer chan.destroy(ch) + + reader := thread.create_and_start_with_poly_data(ch, proc(ch: chan.Chan(int)) { + v, ok := chan.recv(ch) + assert(!ok) + assert(v == 0) + }) + + for !chan.can_send(ch) { + thread.yield() + } + + testing.expect(t, chan.close(ch)) + thread.join(reader) + thread.destroy(reader) +} + // Ensures that try_send for unbuffered channels works as expected. // If 1 reader of a channel, and 3 try_senders, only one of the senders // will succeed and none of them will block. From 130b2dc36d6c573b07b0db0454b7e20e4c5ad9c4 Mon Sep 17 00:00:00 2001 From: Jack Mordaunt Date: Fri, 13 Jun 2025 18:07:21 -0300 Subject: [PATCH 5/6] tests/core/sync/chan: test concurrent send/close/recv This test is designed to ensure that a call to send will always correctly report whether the value was transmitted. If recv wins, a close call should not be able to intercept the send thread. --- tests/core/sync/chan/test_core_sync_chan.odin | 55 +++++++++++++++++++ 1 file changed, 55 insertions(+) diff --git a/tests/core/sync/chan/test_core_sync_chan.odin b/tests/core/sync/chan/test_core_sync_chan.odin index 8267d6bef..fdac61550 100644 --- a/tests/core/sync/chan/test_core_sync_chan.odin +++ b/tests/core/sync/chan/test_core_sync_chan.odin @@ -619,3 +619,58 @@ test_try_select_raw_no_toctou :: proc(t: ^testing.T) { thread.join(thief) thread.destroy(thief) } + +// Ensures that a sender will always report correctly whether the value was received +// or not in the event of channel closure. +// +// 1. send thread does a blocking send +// 2. recv and close threads race +// 3. send returns false if close won and reports true if recv won +// +// We know if recv won by whether it sends us the original value on the results channel. +// This test is non-deterministic. +@test +test_send_close_read :: proc(t: ^testing.T) { + trigger, trigger_err := chan.create(chan.Chan(int), context.allocator) + assert(trigger_err == nil, "allocation failed") + defer chan.destroy(trigger) + + ch, alloc_err := chan.create(chan.Chan(int), context.allocator) + assert(alloc_err == nil, "allocation failed") + defer chan.destroy(ch) + + results, results_err := chan.create(chan.Chan(int), 1, context.allocator) + assert(results_err == nil, "allocation failed") + defer chan.destroy(results) + + receiver := thread.create_and_start_with_poly_data3(trigger, results, ch, proc(trigger, results, ch: chan.Chan(int)) { + _, _ = chan.recv(trigger) + v, _ := chan.recv(ch) + assert(chan.send(results, v)) + }) + + closer := thread.create_and_start_with_poly_data2(trigger, ch, proc(trigger, ch: chan.Chan(int)) { + _, _ = chan.recv(trigger) + ok := chan.close(ch) + assert(ok) + }) + + testing.expect(t, chan.close(trigger)) + + did_send := chan.send(ch, 42) + + v, ok := chan.recv(results) + testing.expect(t, ok) + + if v == 42 { + testing.expect(t, did_send) + } else { + testing.expect(t, !did_send) + } + + thread.join_multiple(receiver, closer) + thread.destroy(receiver) + thread.destroy(closer) +} + + From 17927729dd56574de1f547a9d34369bd4731fa41 Mon Sep 17 00:00:00 2001 From: Jack Mordaunt Date: Fri, 13 Jun 2025 18:08:44 -0300 Subject: [PATCH 6/6] core/sync/chan: (unbuffered) ack reads This fixes an issue where a call to close could intercept the dance between send and recv, causing send to report incorrectly that a value was not transmitted (when it actually was). --- core/sync/chan/chan.odin | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/core/sync/chan/chan.odin b/core/sync/chan/chan.odin index 1f434f004..05312e5a2 100644 --- a/core/sync/chan/chan.odin +++ b/core/sync/chan/chan.odin @@ -83,6 +83,8 @@ Raw_Chan :: struct { r_waiting: int, // guarded by `mutex` w_waiting: int, // guarded by `mutex` + did_read: bool, // lets a sender know if the value was read + // Buffered queue: ^Raw_Queue, @@ -628,14 +630,20 @@ send_raw :: proc "contextless" (c: ^Raw_Chan, msg_in: rawptr) -> (ok: bool) { return false } + c.did_read = false + defer c.did_read = false + mem.copy(c.unbuffered_data, msg_in, int(c.msg_size)) + c.w_waiting += 1 + if c.r_waiting > 0 { sync.signal(&c.r_cond) } + sync.wait(&c.w_cond, &c.mutex) - if c.closed { + if c.closed && !c.did_read { return false } @@ -713,8 +721,7 @@ recv_raw :: proc "contextless" (c: ^Raw_Chan, msg_out: rawptr) -> (ok: bool) { } else if c.unbuffered_data != nil { // unbuffered sync.guard(&c.mutex) - for !c.closed && - c.w_waiting == 0 { + for !c.closed && c.w_waiting == 0 { c.r_waiting += 1 sync.wait(&c.r_cond, &c.mutex) c.r_waiting -= 1 @@ -727,6 +734,7 @@ recv_raw :: proc "contextless" (c: ^Raw_Chan, msg_out: rawptr) -> (ok: bool) { mem.copy(msg_out, c.unbuffered_data, int(c.msg_size)) c.w_waiting -= 1 + c.did_read = true sync.signal(&c.w_cond) ok = true }