Merge pull request #4232 from Feoramund/test-sync

Add test suites for `core:sync` and `core:sync/chan`
This commit is contained in:
gingerBill
2024-09-16 17:27:20 +01:00
committed by GitHub
23 changed files with 1168 additions and 174 deletions
+1 -1
View File
@@ -920,7 +920,7 @@ get_page_size :: proc() -> int {
_processor_core_count :: proc() -> int {
count : int = 0
count_size := size_of(count)
if _sysctlbyname("hw.logicalcpu", &count, &count_size, nil, 0) == 0 {
if _sysctlbyname("hw.ncpu", &count, &count_size, nil, 0) == 0 {
if count > 0 {
return count
}
+1 -1
View File
@@ -978,7 +978,7 @@ get_page_size :: proc() -> int {
_processor_core_count :: proc() -> int {
count : int = 0
count_size := size_of(count)
if _sysctlbyname("hw.logicalcpu", &count, &count_size, nil, 0) == 0 {
if _sysctlbyname("hw.ncpu", &count, &count_size, nil, 0) == 0 {
if count > 0 {
return count
}
+42 -41
View File
@@ -22,19 +22,17 @@ Raw_Chan :: struct {
allocator: runtime.Allocator,
allocation_size: int,
msg_size: u16,
closed: b16, // atomic
closed: b16, // guarded by `mutex`
mutex: sync.Mutex,
r_cond: sync.Cond,
w_cond: sync.Cond,
r_waiting: int, // atomic
w_waiting: int, // atomic
r_waiting: int, // guarded by `mutex`
w_waiting: int, // guarded by `mutex`
// Buffered
queue: ^Raw_Queue,
// Unbuffered
r_mutex: sync.Mutex,
w_mutex: sync.Mutex,
unbuffered_data: rawptr,
}
@@ -164,27 +162,30 @@ send_raw :: proc "contextless" (c: ^Raw_Chan, msg_in: rawptr) -> (ok: bool) {
}
if c.queue != nil { // buffered
sync.guard(&c.mutex)
for c.queue.len == c.queue.cap {
sync.atomic_add(&c.w_waiting, 1)
for !c.closed && c.queue.len == c.queue.cap {
c.w_waiting += 1
sync.wait(&c.w_cond, &c.mutex)
sync.atomic_sub(&c.w_waiting, 1)
c.w_waiting -= 1
}
if c.closed {
return false
}
ok = raw_queue_push(c.queue, msg_in)
if sync.atomic_load(&c.r_waiting) > 0 {
if c.r_waiting > 0 {
sync.signal(&c.r_cond)
}
} else if c.unbuffered_data != nil { // unbuffered
sync.guard(&c.w_mutex)
sync.guard(&c.mutex)
if sync.atomic_load(&c.closed) {
if c.closed {
return false
}
mem.copy(c.unbuffered_data, msg_in, int(c.msg_size))
sync.atomic_add(&c.w_waiting, 1)
if sync.atomic_load(&c.r_waiting) > 0 {
c.w_waiting += 1
if c.r_waiting > 0 {
sync.signal(&c.r_cond)
}
sync.wait(&c.w_cond, &c.mutex)
@@ -201,13 +202,13 @@ recv_raw :: proc "contextless" (c: ^Raw_Chan, msg_out: rawptr) -> (ok: bool) {
if c.queue != nil { // buffered
sync.guard(&c.mutex)
for c.queue.len == 0 {
if sync.atomic_load(&c.closed) {
if c.closed {
return
}
sync.atomic_add(&c.r_waiting, 1)
c.r_waiting += 1
sync.wait(&c.r_cond, &c.mutex)
sync.atomic_sub(&c.r_waiting, 1)
c.r_waiting -= 1
}
msg := raw_queue_pop(c.queue)
@@ -215,27 +216,26 @@ recv_raw :: proc "contextless" (c: ^Raw_Chan, msg_out: rawptr) -> (ok: bool) {
mem.copy(msg_out, msg, int(c.msg_size))
}
if sync.atomic_load(&c.w_waiting) > 0 {
if c.w_waiting > 0 {
sync.signal(&c.w_cond)
}
ok = true
} else if c.unbuffered_data != nil { // unbuffered
sync.guard(&c.r_mutex)
sync.guard(&c.mutex)
for !sync.atomic_load(&c.closed) &&
sync.atomic_load(&c.w_waiting) == 0 {
sync.atomic_add(&c.r_waiting, 1)
for !c.closed &&
c.w_waiting == 0 {
c.r_waiting += 1
sync.wait(&c.r_cond, &c.mutex)
sync.atomic_sub(&c.r_waiting, 1)
c.r_waiting -= 1
}
if sync.atomic_load(&c.closed) {
if c.closed {
return
}
mem.copy(msg_out, c.unbuffered_data, int(c.msg_size))
sync.atomic_sub(&c.w_waiting, 1)
c.w_waiting -= 1
sync.signal(&c.w_cond)
ok = true
@@ -255,21 +255,24 @@ try_send_raw :: proc "contextless" (c: ^Raw_Chan, msg_in: rawptr) -> (ok: bool)
return false
}
if c.closed {
return false
}
ok = raw_queue_push(c.queue, msg_in)
if sync.atomic_load(&c.r_waiting) > 0 {
if c.r_waiting > 0 {
sync.signal(&c.r_cond)
}
} else if c.unbuffered_data != nil { // unbuffered
sync.guard(&c.w_mutex)
sync.guard(&c.mutex)
if sync.atomic_load(&c.closed) {
if c.closed {
return false
}
mem.copy(c.unbuffered_data, msg_in, int(c.msg_size))
sync.atomic_add(&c.w_waiting, 1)
if sync.atomic_load(&c.r_waiting) > 0 {
c.w_waiting += 1
if c.r_waiting > 0 {
sync.signal(&c.r_cond)
}
sync.wait(&c.w_cond, &c.mutex)
@@ -294,21 +297,19 @@ try_recv_raw :: proc "contextless" (c: ^Raw_Chan, msg_out: rawptr) -> bool {
mem.copy(msg_out, msg, int(c.msg_size))
}
if sync.atomic_load(&c.w_waiting) > 0 {
if c.w_waiting > 0 {
sync.signal(&c.w_cond)
}
return true
} else if c.unbuffered_data != nil { // unbuffered
sync.guard(&c.r_mutex)
sync.guard(&c.mutex)
if sync.atomic_load(&c.closed) ||
sync.atomic_load(&c.w_waiting) == 0 {
if c.closed || c.w_waiting == 0 {
return false
}
mem.copy(msg_out, c.unbuffered_data, int(c.msg_size))
sync.atomic_sub(&c.w_waiting, 1)
c.w_waiting -= 1
sync.signal(&c.w_cond)
return true
@@ -351,10 +352,10 @@ close :: proc "contextless" (c: ^Raw_Chan) -> bool {
return false
}
sync.guard(&c.mutex)
if sync.atomic_load(&c.closed) {
if c.closed {
return false
}
sync.atomic_store(&c.closed, true)
c.closed = true
sync.broadcast(&c.r_cond)
sync.broadcast(&c.w_cond)
return true
@@ -366,7 +367,7 @@ is_closed :: proc "contextless" (c: ^Raw_Chan) -> bool {
return true
}
sync.guard(&c.mutex)
return bool(sync.atomic_load(&c.closed))
return bool(c.closed)
}
@@ -423,9 +424,9 @@ raw_queue_pop :: proc "contextless" (q: ^Raw_Queue) -> (data: rawptr) {
can_recv :: proc "contextless" (c: ^Raw_Chan) -> bool {
sync.guard(&c.mutex)
if is_buffered(c) {
return len(c) > 0
return c.queue.len > 0
}
return sync.atomic_load(&c.w_waiting) > 0
return c.w_waiting > 0
}
@@ -435,7 +436,7 @@ can_send :: proc "contextless" (c: ^Raw_Chan) -> bool {
if is_buffered(c) {
return c.queue.len < c.queue.cap
}
return sync.atomic_load(&c.r_waiting) > 0
return c.w_waiting == 0
}
@@ -484,4 +485,4 @@ select_raw :: proc "odin" (recvs: []^Raw_Chan, sends: []^Raw_Chan, send_msgs: []
ok = send_raw(sends[sel.idx], send_msgs[sel.idx])
}
return
}
}
+38 -37
View File
@@ -8,7 +8,7 @@ _ :: vg
Wait group.
Wait group is a synchronization primitive used by the waiting thread to wait,
until a all working threads finish work.
until all working threads finish work.
The waiting thread first sets the number of working threads it will expect to
wait for using `wait_group_add` call, and start waiting using `wait_group_wait`
@@ -35,7 +35,7 @@ Wait_Group :: struct #no_copy {
/*
Increment an internal counter of a wait group.
This procedure atomicaly increments a number to the specified wait group's
This procedure atomically increments a number to the specified wait group's
internal counter by a specified amount. This operation can be done on any
thread.
*/
@@ -48,12 +48,12 @@ wait_group_add :: proc "contextless" (wg: ^Wait_Group, delta: int) {
atomic_add(&wg.counter, delta)
if wg.counter < 0 {
_panic("sync.Wait_Group negative counter")
panic_contextless("sync.Wait_Group negative counter")
}
if wg.counter == 0 {
cond_broadcast(&wg.cond)
if wg.counter != 0 {
_panic("sync.Wait_Group misuse: sync.wait_group_add called concurrently with sync.wait_group_wait")
panic_contextless("sync.Wait_Group misuse: sync.wait_group_add called concurrently with sync.wait_group_wait")
}
}
}
@@ -81,7 +81,7 @@ wait_group_wait :: proc "contextless" (wg: ^Wait_Group) {
if wg.counter != 0 {
cond_wait(&wg.cond, &wg.mutex)
if wg.counter != 0 {
_panic("sync.Wait_Group misuse: sync.wait_group_add called concurrently with sync.wait_group_wait")
panic_contextless("sync.Wait_Group misuse: sync.wait_group_add called concurrently with sync.wait_group_wait")
}
}
}
@@ -105,7 +105,7 @@ wait_group_wait_with_timeout :: proc "contextless" (wg: ^Wait_Group, duration: t
return false
}
if wg.counter != 0 {
_panic("sync.Wait_Group misuse: sync.wait_group_add called concurrently with sync.wait_group_wait")
panic_contextless("sync.Wait_Group misuse: sync.wait_group_add called concurrently with sync.wait_group_wait")
}
}
return true
@@ -121,7 +121,7 @@ When `barrier_wait` procedure is called by any thread, that thread will block
the execution, until all threads associated with the barrier reach the same
point of execution and also call `barrier_wait`.
when barrier is initialized, a `thread_count` parameter is passed, signifying
When a barrier is initialized, a `thread_count` parameter is passed, signifying
the amount of participant threads of the barrier. The barrier also keeps track
of an internal atomic counter. When a thread calls `barrier_wait`, the internal
counter is incremented. When the internal counter reaches `thread_count`, it is
@@ -208,7 +208,7 @@ Represents a thread synchronization primitive that, when signalled, releases one
single waiting thread and then resets automatically to a state where it can be
signalled again.
When a thread calls `auto_reset_event_wait`, it's execution will be blocked,
When a thread calls `auto_reset_event_wait`, its execution will be blocked,
until the event is signalled by another thread. The call to
`auto_reset_event_signal` wakes up exactly one thread waiting for the event.
*/
@@ -228,15 +228,15 @@ thread.
*/
auto_reset_event_signal :: proc "contextless" (e: ^Auto_Reset_Event) {
old_status := atomic_load_explicit(&e.status, .Relaxed)
new_status := old_status + 1 if old_status < 1 else 1
for {
new_status := old_status + 1 if old_status < 1 else 1
if _, ok := atomic_compare_exchange_weak_explicit(&e.status, old_status, new_status, .Release, .Relaxed); ok {
break
}
if old_status < 0 {
sema_post(&e.sema)
}
cpu_relax()
}
if old_status < 0 {
sema_post(&e.sema)
}
}
@@ -297,7 +297,7 @@ waiting to acquire the lock, exactly one of those threads is unblocked and
allowed into the critical section.
*/
ticket_mutex_unlock :: #force_inline proc "contextless" (m: ^Ticket_Mutex) {
atomic_add_explicit(&m.serving, 1, .Relaxed)
atomic_add_explicit(&m.serving, 1, .Release)
}
/*
@@ -331,8 +331,8 @@ Benaphore.
A benaphore is a combination of an atomic variable and a semaphore that can
improve locking efficiency in a no-contention system. Acquiring a benaphore
lock doesn't call into an internal semaphore, if no other thread in a middle of
a critical section.
lock doesn't call into an internal semaphore, if no other thread is in the
middle of a critical section.
Once a lock on a benaphore is acquired by a thread, no other thread is allowed
into any critical sections, associted with the same benaphore, until the lock
@@ -355,7 +355,7 @@ from entering any critical sections associated with the same benaphore, until
until the lock is released.
*/
benaphore_lock :: proc "contextless" (b: ^Benaphore) {
if atomic_add_explicit(&b.counter, 1, .Acquire) > 1 {
if atomic_add_explicit(&b.counter, 1, .Acquire) > 0 {
sema_wait(&b.sema)
}
}
@@ -381,10 +381,10 @@ Release a lock on a benaphore.
This procedure releases a lock on the specified benaphore. If any of the threads
are waiting on the lock, exactly one thread is allowed into a critical section
associated with the same banaphore.
associated with the same benaphore.
*/
benaphore_unlock :: proc "contextless" (b: ^Benaphore) {
if atomic_sub_explicit(&b.counter, 1, .Release) > 0 {
if atomic_sub_explicit(&b.counter, 1, .Release) > 1 {
sema_post(&b.sema)
}
}
@@ -418,8 +418,8 @@ benaphore_guard :: proc "contextless" (m: ^Benaphore) -> bool {
/*
Recursive benaphore.
Recurisve benaphore is just like a plain benaphore, except it allows reentrancy
into the critical section.
A recursive benaphore is just like a plain benaphore, except it allows
reentrancy into the critical section.
When a lock is acquired on a benaphore, all other threads attempting to
acquire a lock on the same benaphore will be blocked from any critical sections,
@@ -449,13 +449,15 @@ recursive benaphore, until the lock is released.
*/
recursive_benaphore_lock :: proc "contextless" (b: ^Recursive_Benaphore) {
tid := current_thread_id()
if atomic_add_explicit(&b.counter, 1, .Acquire) > 1 {
if tid != b.owner {
sema_wait(&b.sema)
check_owner: if tid != atomic_load_explicit(&b.owner, .Acquire) {
atomic_add_explicit(&b.counter, 1, .Relaxed)
if _, ok := atomic_compare_exchange_strong_explicit(&b.owner, 0, tid, .Release, .Relaxed); ok {
break check_owner
}
sema_wait(&b.sema)
atomic_store_explicit(&b.owner, tid, .Release)
}
// inside the lock
b.owner = tid
b.recursion += 1
}
@@ -472,15 +474,14 @@ benaphore, until the lock is released.
*/
recursive_benaphore_try_lock :: proc "contextless" (b: ^Recursive_Benaphore) -> bool {
tid := current_thread_id()
if b.owner == tid {
atomic_add_explicit(&b.counter, 1, .Acquire)
}
if v, _ := atomic_compare_exchange_strong_explicit(&b.counter, 0, 1, .Acquire, .Acquire); v != 0 {
check_owner: if tid != atomic_load_explicit(&b.owner, .Acquire) {
if _, ok := atomic_compare_exchange_strong_explicit(&b.owner, 0, tid, .Release, .Relaxed); ok {
atomic_add_explicit(&b.counter, 1, .Relaxed)
break check_owner
}
return false
}
// inside the lock
b.owner = tid
b.recursion += 1
return true
}
@@ -494,14 +495,14 @@ for other threads for entering.
*/
recursive_benaphore_unlock :: proc "contextless" (b: ^Recursive_Benaphore) {
tid := current_thread_id()
_assert(tid == b.owner, "tid != b.owner")
assert_contextless(tid == atomic_load_explicit(&b.owner, .Relaxed), "tid != b.owner")
b.recursion -= 1
recursion := b.recursion
if recursion == 0 {
b.owner = 0
}
if atomic_sub_explicit(&b.counter, 1, .Release) > 0 {
if recursion == 0 {
if atomic_sub_explicit(&b.counter, 1, .Relaxed) == 1 {
atomic_store_explicit(&b.owner, 0, .Release)
} else {
sema_post(&b.sema)
}
}
@@ -740,4 +741,4 @@ Make event available.
one_shot_event_signal :: proc "contextless" (e: ^One_Shot_Event) {
atomic_store_explicit(&e.state, 1, .Release)
futex_broadcast(&e.state)
}
}
+6 -6
View File
@@ -48,7 +48,7 @@ _futex_wait_with_timeout :: proc "contextless" (f: ^Futex, expected: u32, durati
case -ETIMEDOUT:
return false
case:
_panic("darwin.os_sync_wait_on_address_with_timeout failure")
panic_contextless("darwin.os_sync_wait_on_address_with_timeout failure")
}
} else {
@@ -63,7 +63,7 @@ _futex_wait_with_timeout :: proc "contextless" (f: ^Futex, expected: u32, durati
case ETIMEDOUT:
return false
case:
_panic("futex_wait failure")
panic_contextless("futex_wait failure")
}
return true
@@ -83,7 +83,7 @@ _futex_signal :: proc "contextless" (f: ^Futex) {
case -ENOENT:
return
case:
_panic("darwin.os_sync_wake_by_address_any failure")
panic_contextless("darwin.os_sync_wake_by_address_any failure")
}
}
} else {
@@ -99,7 +99,7 @@ _futex_signal :: proc "contextless" (f: ^Futex) {
case ENOENT:
return
case:
_panic("futex_wake_single failure")
panic_contextless("futex_wake_single failure")
}
}
@@ -119,7 +119,7 @@ _futex_broadcast :: proc "contextless" (f: ^Futex) {
case -ENOENT:
return
case:
_panic("darwin.os_sync_wake_by_address_all failure")
panic_contextless("darwin.os_sync_wake_by_address_all failure")
}
}
} else {
@@ -135,7 +135,7 @@ _futex_broadcast :: proc "contextless" (f: ^Futex) {
case ENOENT:
return
case:
_panic("futex_wake_all failure")
panic_contextless("futex_wake_all failure")
}
}
+4 -4
View File
@@ -21,7 +21,7 @@ _futex_wait :: proc "contextless" (f: ^Futex, expected: u32) -> bool {
continue
}
_panic("_futex_wait failure")
panic_contextless("_futex_wait failure")
}
unreachable()
@@ -44,14 +44,14 @@ _futex_wait_with_timeout :: proc "contextless" (f: ^Futex, expected: u32, durati
return false
}
_panic("_futex_wait_with_timeout failure")
panic_contextless("_futex_wait_with_timeout failure")
}
_futex_signal :: proc "contextless" (f: ^Futex) {
errno := freebsd._umtx_op(f, .WAKE, 1, nil, nil)
if errno != nil {
_panic("_futex_signal failure")
panic_contextless("_futex_signal failure")
}
}
@@ -59,6 +59,6 @@ _futex_broadcast :: proc "contextless" (f: ^Futex) {
errno := freebsd._umtx_op(f, .WAKE, cast(c.ulong)max(i32), nil, nil)
if errno != nil {
_panic("_futex_broadcast failure")
panic_contextless("_futex_broadcast failure")
}
}
+4 -4
View File
@@ -15,7 +15,7 @@ _futex_wait :: proc "contextless" (futex: ^Futex, expected: u32) -> bool {
return true
case:
// TODO(flysand): More descriptive panic messages based on the vlaue of `errno`
_panic("futex_wait failure")
panic_contextless("futex_wait failure")
}
}
@@ -34,7 +34,7 @@ _futex_wait_with_timeout :: proc "contextless" (futex: ^Futex, expected: u32, du
case .NONE, .EINTR, .EAGAIN:
return true
case:
_panic("futex_wait_with_timeout failure")
panic_contextless("futex_wait_with_timeout failure")
}
}
@@ -44,7 +44,7 @@ _futex_signal :: proc "contextless" (futex: ^Futex) {
case .NONE:
return
case:
_panic("futex_wake_single failure")
panic_contextless("futex_wake_single failure")
}
}
@@ -57,6 +57,6 @@ _futex_broadcast :: proc "contextless" (futex: ^Futex) {
case .NONE:
return
case:
_panic("_futex_wake_all failure")
panic_contextless("_futex_wake_all failure")
}
}
+4 -4
View File
@@ -35,7 +35,7 @@ _futex_wait :: proc "contextless" (futex: ^Futex, expected: u32) -> bool {
case EINTR, EAGAIN:
return true
case:
_panic("futex_wait failure")
panic_contextless("futex_wait failure")
}
}
return true
@@ -55,7 +55,7 @@ _futex_wait_with_timeout :: proc "contextless" (futex: ^Futex, expected: u32, du
case ETIMEDOUT:
return false
case:
_panic("futex_wait_with_timeout failure")
panic_contextless("futex_wait_with_timeout failure")
}
}
return true
@@ -63,12 +63,12 @@ _futex_wait_with_timeout :: proc "contextless" (futex: ^Futex, expected: u32, du
_futex_signal :: proc "contextless" (futex: ^Futex) {
if _, ok := intrinsics.syscall_bsd(unix.SYS___futex, uintptr(futex), FUTEX_WAKE_PRIVATE, 1, 0, 0, 0); !ok {
_panic("futex_wake_single failure")
panic_contextless("futex_wake_single failure")
}
}
_futex_broadcast :: proc "contextless" (futex: ^Futex) {
if _, ok := intrinsics.syscall_bsd(unix.SYS___futex, uintptr(futex), FUTEX_WAKE_PRIVATE, uintptr(max(i32)), 0, 0, 0); !ok {
_panic("_futex_wake_all failure")
panic_contextless("_futex_wake_all failure")
}
}
+4 -4
View File
@@ -36,7 +36,7 @@ _futex_wait :: proc "contextless" (f: ^Futex, expected: u32) -> bool {
return false
}
_panic("futex_wait failure")
panic_contextless("futex_wait failure")
}
_futex_wait_with_timeout :: proc "contextless" (f: ^Futex, expected: u32, duration: time.Duration) -> bool {
@@ -62,14 +62,14 @@ _futex_wait_with_timeout :: proc "contextless" (f: ^Futex, expected: u32, durati
return false
}
_panic("futex_wait_with_timeout failure")
panic_contextless("futex_wait_with_timeout failure")
}
_futex_signal :: proc "contextless" (f: ^Futex) {
res := _unix_futex(f, FUTEX_WAKE_PRIVATE, 1, nil)
if res == -1 {
_panic("futex_wake_single failure")
panic_contextless("futex_wake_single failure")
}
}
@@ -77,6 +77,6 @@ _futex_broadcast :: proc "contextless" (f: ^Futex) {
res := _unix_futex(f, FUTEX_WAKE_PRIVATE, u32(max(i32)), nil)
if res == -1 {
_panic("_futex_wake_all failure")
panic_contextless("_futex_wake_all failure")
}
}
+4 -4
View File
@@ -10,7 +10,7 @@ import "core:time"
_futex_wait :: proc "contextless" (f: ^Futex, expected: u32) -> bool {
when !intrinsics.has_target_feature("atomics") {
_panic("usage of `core:sync` requires the `-target-feature:\"atomics\"` or a `-microarch` that supports it")
panic_contextless("usage of `core:sync` requires the `-target-feature:\"atomics\"` or a `-microarch` that supports it")
} else {
s := intrinsics.wasm_memory_atomic_wait32((^u32)(f), expected, -1)
return s != 0
@@ -19,7 +19,7 @@ _futex_wait :: proc "contextless" (f: ^Futex, expected: u32) -> bool {
_futex_wait_with_timeout :: proc "contextless" (f: ^Futex, expected: u32, duration: time.Duration) -> bool {
when !intrinsics.has_target_feature("atomics") {
_panic("usage of `core:sync` requires the `-target-feature:\"atomics\"` or a `-microarch` that supports it")
panic_contextless("usage of `core:sync` requires the `-target-feature:\"atomics\"` or a `-microarch` that supports it")
} else {
s := intrinsics.wasm_memory_atomic_wait32((^u32)(f), expected, i64(duration))
return s != 0
@@ -28,7 +28,7 @@ _futex_wait_with_timeout :: proc "contextless" (f: ^Futex, expected: u32, durati
_futex_signal :: proc "contextless" (f: ^Futex) {
when !intrinsics.has_target_feature("atomics") {
_panic("usage of `core:sync` requires the `-target-feature:\"atomics\"` or a `-microarch` that supports it")
panic_contextless("usage of `core:sync` requires the `-target-feature:\"atomics\"` or a `-microarch` that supports it")
} else {
loop: for {
s := intrinsics.wasm_memory_atomic_notify32((^u32)(f), 1)
@@ -41,7 +41,7 @@ _futex_signal :: proc "contextless" (f: ^Futex) {
_futex_broadcast :: proc "contextless" (f: ^Futex) {
when !intrinsics.has_target_feature("atomics") {
_panic("usage of `core:sync` requires the `-target-feature:\"atomics\"` or a `-microarch` that supports it")
panic_contextless("usage of `core:sync` requires the `-target-feature:\"atomics\"` or a `-microarch` that supports it")
} else {
loop: for {
s := intrinsics.wasm_memory_atomic_notify32((^u32)(f), ~u32(0))
+2 -18
View File
@@ -1,6 +1,5 @@
package sync
import "base:runtime"
import "core:time"
/*
@@ -390,7 +389,7 @@ recursive_mutex_guard :: proc "contextless" (m: ^Recursive_Mutex) -> bool {
A condition variable.
`Cond` implements a condition variable, a rendezvous point for threads waiting
for signalling the occurence of an event. Condition variables are used on
for signalling the occurence of an event. Condition variables are used in
conjuction with mutexes to provide a shared access to one or more shared
variable.
@@ -560,7 +559,7 @@ futex_wait :: proc "contextless" (f: ^Futex, expected: u32) {
return
}
ok := _futex_wait(f, expected)
_assert(ok, "futex_wait failure")
assert_contextless(ok, "futex_wait failure")
}
/*
@@ -597,18 +596,3 @@ Wake up multiple threads waiting on a futex.
futex_broadcast :: proc "contextless" (f: ^Futex) {
_futex_broadcast(f)
}
@(private)
_assert :: proc "contextless" (cond: bool, msg: string) {
if !cond {
_panic(msg)
}
}
@(private)
_panic :: proc "contextless" (msg: string) -> ! {
runtime.print_string(msg)
runtime.print_byte('\n')
runtime.trap()
}
+2 -2
View File
@@ -240,7 +240,7 @@ atomic_recursive_mutex_lock :: proc "contextless" (m: ^Atomic_Recursive_Mutex) {
atomic_recursive_mutex_unlock :: proc "contextless" (m: ^Atomic_Recursive_Mutex) {
tid := current_thread_id()
_assert(tid == m.owner, "tid != m.owner")
assert_contextless(tid == m.owner, "tid != m.owner")
m.recursion -= 1
recursion := m.recursion
if recursion == 0 {
@@ -361,7 +361,7 @@ atomic_sema_wait_with_timeout :: proc "contextless" (s: ^Atomic_Sema, duration:
if !futex_wait_with_timeout(&s.count, u32(original_count), remaining) {
return false
}
original_count = s.count
original_count = atomic_load_explicit(&s.count, .Relaxed)
}
if original_count == atomic_compare_exchange_strong_explicit(&s.count, original_count, original_count-1, .Acquire, .Acquire) {
return true
+11
View File
@@ -26,6 +26,8 @@ import "core:os"
@(private="file", thread_local)
local_test_index: libc.sig_atomic_t
@(private="file", thread_local)
local_test_index_set: bool
// Windows does not appear to have a SIGTRAP, so this is defined here, instead
// of in the libc package, just so there's no confusion about it being
@@ -45,6 +47,13 @@ stop_runner_callback :: proc "c" (sig: libc.int) {
@(private="file")
stop_test_callback :: proc "c" (sig: libc.int) {
if !local_test_index_set {
// We're a thread created by a test thread.
//
// There's nothing we can do to inform the test runner about who
// signalled, so hopefully the test will handle their own sub-threads.
return
}
if local_test_index == -1 {
// We're the test runner, and we ourselves have caught a signal from
// which there is no recovery.
@@ -114,6 +123,7 @@ This is a dire bug and should be reported to the Odin developers.
_setup_signal_handler :: proc() {
local_test_index = -1
local_test_index_set = true
// Catch user interrupt / CTRL-C.
libc.signal(libc.SIGINT, stop_runner_callback)
@@ -135,6 +145,7 @@ _setup_signal_handler :: proc() {
_setup_task_signal_handler :: proc(test_index: int) {
local_test_index = cast(libc.sig_atomic_t)test_index
local_test_index_set = true
}
_should_stop_runner :: proc() -> bool {
+6 -6
View File
@@ -272,7 +272,7 @@ create_and_start :: proc(fn: proc(), init_context: Maybe(runtime.Context) = nil,
t := create(thread_proc, priority)
t.data = rawptr(fn)
if self_cleanup {
t.flags += {.Self_Cleanup}
intrinsics.atomic_or(&t.flags, {.Self_Cleanup})
}
t.init_context = init_context
start(t)
@@ -307,7 +307,7 @@ create_and_start_with_data :: proc(data: rawptr, fn: proc(data: rawptr), init_co
t.user_index = 1
t.user_args[0] = data
if self_cleanup {
t.flags += {.Self_Cleanup}
intrinsics.atomic_or(&t.flags, {.Self_Cleanup})
}
t.init_context = init_context
start(t)
@@ -347,7 +347,7 @@ create_and_start_with_poly_data :: proc(data: $T, fn: proc(data: T), init_contex
mem.copy(&t.user_args[0], &data, size_of(T))
if self_cleanup {
t.flags += {.Self_Cleanup}
intrinsics.atomic_or(&t.flags, {.Self_Cleanup})
}
t.init_context = init_context
@@ -394,7 +394,7 @@ create_and_start_with_poly_data2 :: proc(arg1: $T1, arg2: $T2, fn: proc(T1, T2),
_ = copy(user_args[n:], mem.ptr_to_bytes(&arg2))
if self_cleanup {
t.flags += {.Self_Cleanup}
intrinsics.atomic_or(&t.flags, {.Self_Cleanup})
}
t.init_context = init_context
@@ -443,7 +443,7 @@ create_and_start_with_poly_data3 :: proc(arg1: $T1, arg2: $T2, arg3: $T3, fn: pr
_ = copy(user_args[n:], mem.ptr_to_bytes(&arg3))
if self_cleanup {
t.flags += {.Self_Cleanup}
intrinsics.atomic_or(&t.flags, {.Self_Cleanup})
}
t.init_context = init_context
@@ -494,7 +494,7 @@ create_and_start_with_poly_data4 :: proc(arg1: $T1, arg2: $T2, arg3: $T3, arg4:
_ = copy(user_args[n:], mem.ptr_to_bytes(&arg4))
if self_cleanup {
t.flags += {.Self_Cleanup}
intrinsics.atomic_or(&t.flags, {.Self_Cleanup})
}
t.init_context = init_context
+1
View File
@@ -60,6 +60,7 @@ pool_thread_runner :: proc(t: ^Thread) {
if task, ok := pool_pop_waiting(pool); ok {
data.task = task
pool_do_work(pool, task)
sync.guard(&pool.mutex)
data.task = {}
}
}
+8 -24
View File
@@ -5,18 +5,14 @@ package thread
import "base:runtime"
import "core:sync"
import "core:sys/unix"
import "core:time"
_IS_SUPPORTED :: true
CAS :: sync.atomic_compare_exchange_strong
// NOTE(tetra): Aligned here because of core/unix/pthread_linux.odin/pthread_t.
// Also see core/sys/darwin/mach_darwin.odin/semaphore_t.
Thread_Os_Specific :: struct #align(16) {
unix_thread: unix.pthread_t, // NOTE: very large on Darwin, small on Linux.
cond: sync.Cond,
mutex: sync.Mutex,
start_ok: sync.Sema,
}
//
// Creates a thread which will run the given procedure.
@@ -29,14 +25,10 @@ _create :: proc(procedure: Thread_Proc, priority: Thread_Priority) -> ^Thread {
// We need to give the thread a moment to start up before we enable cancellation.
can_set_thread_cancel_state := unix.pthread_setcancelstate(unix.PTHREAD_CANCEL_ENABLE, nil) == 0
sync.lock(&t.mutex)
t.id = sync.current_thread_id()
for (.Started not_in sync.atomic_load(&t.flags)) {
// HACK: use a timeout so in the event that the condition is signalled at THIS comment's exact point
// (after checking flags, before starting the wait) it gets itself out of that deadlock after a ms.
sync.wait_with_timeout(&t.cond, &t.mutex, time.Millisecond)
if .Started not_in sync.atomic_load(&t.flags) {
sync.wait(&t.start_ok)
}
if .Joined in sync.atomic_load(&t.flags) {
@@ -66,8 +58,6 @@ _create :: proc(procedure: Thread_Proc, priority: Thread_Priority) -> ^Thread {
sync.atomic_or(&t.flags, { .Done })
sync.unlock(&t.mutex)
if .Self_Cleanup in sync.atomic_load(&t.flags) {
res := unix.pthread_detach(t.unix_thread)
assert_contextless(res == 0)
@@ -132,7 +122,7 @@ _create :: proc(procedure: Thread_Proc, priority: Thread_Priority) -> ^Thread {
_start :: proc(t: ^Thread) {
sync.atomic_or(&t.flags, { .Started })
sync.signal(&t.cond)
sync.post(&t.start_ok)
}
_is_done :: proc(t: ^Thread) -> bool {
@@ -140,24 +130,18 @@ _is_done :: proc(t: ^Thread) -> bool {
}
_join :: proc(t: ^Thread) {
// sync.guard(&t.mutex)
if unix.pthread_equal(unix.pthread_self(), t.unix_thread) {
return
}
// Preserve other flags besides `.Joined`, like `.Started`.
unjoined := sync.atomic_load(&t.flags) - {.Joined}
joined := unjoined + {.Joined}
// Try to set `t.flags` from unjoined to joined. If it returns joined,
// it means the previous value had that flag set and we can return.
if res, ok := CAS(&t.flags, unjoined, joined); res == joined && !ok {
// If the previous value was already `Joined`, then we can return.
if .Joined in sync.atomic_or(&t.flags, {.Joined}) {
return
}
// Prevent non-started threads from blocking main thread with initial wait
// condition.
if .Started not_in unjoined {
if .Started not_in sync.atomic_load(&t.flags) {
_start(t)
}
unix.pthread_join(t.unix_thread, nil)
+3 -3
View File
@@ -27,7 +27,7 @@ _create :: proc(procedure: Thread_Proc, priority: Thread_Priority) -> ^Thread {
__windows_thread_entry_proc :: proc "system" (t_: rawptr) -> win32.DWORD {
t := (^Thread)(t_)
if .Joined in t.flags {
if .Joined in sync.atomic_load(&t.flags) {
return 0
}
@@ -48,9 +48,9 @@ _create :: proc(procedure: Thread_Proc, priority: Thread_Priority) -> ^Thread {
t.procedure(t)
}
intrinsics.atomic_store(&t.flags, t.flags + {.Done})
intrinsics.atomic_or(&t.flags, {.Done})
if .Self_Cleanup in t.flags {
if .Self_Cleanup in sync.atomic_load(&t.flags) {
win32.CloseHandle(t.win32_thread)
t.win32_thread = win32.INVALID_HANDLE
// NOTE(ftphikari): It doesn't matter which context 'free' received, right?
+10 -2
View File
@@ -3195,11 +3195,11 @@ void gb_affinity_init(gbAffinity *a) {
a->core_count = 1;
a->threads_per_core = 1;
if (sysctlbyname("hw.logicalcpu", &count, &count_size, NULL, 0) == 0) {
if (sysctlbyname("kern.smp.cpus", &count, &count_size, NULL, 0) == 0) {
if (count > 0) {
a->thread_count = count;
// Get # of physical cores
if (sysctlbyname("hw.physicalcpu", &count, &count_size, NULL, 0) == 0) {
if (sysctlbyname("kern.smp.cores", &count, &count_size, NULL, 0) == 0) {
if (count > 0) {
a->core_count = count;
a->threads_per_core = a->thread_count / count;
@@ -3210,6 +3210,14 @@ void gb_affinity_init(gbAffinity *a) {
}
}
}
} else if (sysctlbyname("hw.ncpu", &count, &count_size, NULL, 0) == 0) {
// SMP disabled or unavailable.
if (count > 0) {
a->is_accurate = true;
a->thread_count = count;
a->core_count = count;
a->threads_per_core = 1;
}
}
}
+20 -13
View File
@@ -12,6 +12,26 @@ import "core:strings"
import "core:testing"
import "core:time/datetime"
Custom_Data :: struct {
a: int,
}
@(init)
init_custom_type_setter :: proc() {
// NOTE: This is done here so it can be out of the flow of the
// multi-threaded test runner, to prevent any data races that could be
// reported by using `-sanitize:thread`.
//
// Do mind that this means every test here acknowledges the `Custom_Data` type.
flags.register_type_setter(proc (data: rawptr, data_type: typeid, _, _: string) -> (string, bool, runtime.Allocator_Error) {
if data_type == Custom_Data {
(cast(^Custom_Data)data).a = 32
return "", true, nil
}
return "", false, nil
})
}
@(test)
test_no_args :: proc(t: ^testing.T) {
S :: struct {
@@ -1230,9 +1250,6 @@ test_net :: proc(t: ^testing.T) {
@(test)
test_custom_type_setter :: proc(t: ^testing.T) {
Custom_Bool :: distinct bool
Custom_Data :: struct {
a: int,
}
S :: struct {
a: Custom_Data,
@@ -1240,16 +1257,6 @@ test_custom_type_setter :: proc(t: ^testing.T) {
}
s: S
// NOTE: Mind that this setter is global state, and the test runner is multi-threaded.
// It should be fine so long as all type setter tests are in this one test proc.
flags.register_type_setter(proc (data: rawptr, data_type: typeid, _, _: string) -> (string, bool, runtime.Allocator_Error) {
if data_type == Custom_Data {
(cast(^Custom_Data)data).a = 32
return "", true, nil
}
return "", false, nil
})
defer flags.register_type_setter(nil)
args := [?]string { "-a:hellope", "-b:true" }
result := flags.parse(&s, args[:])
testing.expect_value(t, result, nil)
+2
View File
@@ -39,6 +39,8 @@ download_assets :: proc() {
@(require) import "slice"
@(require) import "strconv"
@(require) import "strings"
@(require) import "sync"
@(require) import "sync/chan"
@(require) import "sys/posix"
@(require) import "sys/windows"
@(require) import "text/i18n"
@@ -0,0 +1,274 @@
package test_core_sync_chan
import "base:runtime"
import "base:intrinsics"
import "core:log"
import "core:math/rand"
import "core:sync/chan"
import "core:testing"
import "core:thread"
import "core:time"
Message_Type :: enum i32 {
Result,
Add,
Multiply,
Subtract,
Divide,
End,
}
Message :: struct {
type: Message_Type,
i: i64,
}
Comm :: struct {
host: chan.Chan(Message),
client: chan.Chan(Message),
manual_buffering: bool,
}
BUFFER_SIZE :: 8
MAX_RAND :: 32
FAIL_TIME :: 1 * time.Second
SLEEP_TIME :: 1 * time.Millisecond
comm_client :: proc(th: ^thread.Thread) {
data := cast(^Comm)th.data
manual_buffering := data.manual_buffering
n: i64
for manual_buffering && !chan.can_recv(data.host) {
thread.yield()
}
recv_loop: for msg in chan.recv(data.host) {
#partial switch msg.type {
case .Add: n += msg.i
case .Multiply: n *= msg.i
case .Subtract: n -= msg.i
case .Divide: n /= msg.i
case .End:
break recv_loop
case:
panic("Unknown message type for client.")
}
for manual_buffering && !chan.can_recv(data.host) {
thread.yield()
}
}
for manual_buffering && !chan.can_send(data.host) {
thread.yield()
}
chan.send(data.client, Message{.Result, n})
chan.close(data.client)
}
send_messages :: proc(t: ^testing.T, host: chan.Chan(Message), manual_buffering: bool = false) -> (expected: i64) {
expected = 1
for manual_buffering && !chan.can_send(host) {
thread.yield()
}
chan.send(host, Message{.Add, 1})
log.debug(Message{.Add, 1})
for _ in 0..<1+2*BUFFER_SIZE {
msg: Message
msg.i = 1 + rand.int63_max(MAX_RAND)
switch rand.int_max(4) {
case 0:
msg.type = .Add
expected += msg.i
case 1:
msg.type = .Multiply
expected *= msg.i
case 2:
msg.type = .Subtract
expected -= msg.i
case 3:
msg.type = .Divide
expected /= msg.i
}
for manual_buffering && !chan.can_send(host) {
thread.yield()
}
if manual_buffering {
testing.expect(t, chan.len(host) == 0)
}
chan.send(host, msg)
log.debug(msg)
}
for manual_buffering && !chan.can_send(host) {
thread.yield()
}
chan.send(host, Message{.End, 0})
log.debug(Message{.End, 0})
chan.close(host)
return
}
@test
test_chan_buffered :: proc(t: ^testing.T) {
testing.set_fail_timeout(t, FAIL_TIME)
comm: Comm
alloc_err: runtime.Allocator_Error
comm.host, alloc_err = chan.create_buffered(chan.Chan(Message), BUFFER_SIZE, context.allocator)
assert(alloc_err == nil, "allocation failed")
comm.client, alloc_err = chan.create_buffered(chan.Chan(Message), BUFFER_SIZE, context.allocator)
assert(alloc_err == nil, "allocation failed")
defer {
chan.destroy(comm.host)
chan.destroy(comm.client)
}
testing.expect(t, chan.is_buffered(comm.host))
testing.expect(t, chan.is_buffered(comm.client))
testing.expect(t, !chan.is_unbuffered(comm.host))
testing.expect(t, !chan.is_unbuffered(comm.client))
testing.expect_value(t, chan.len(comm.host), 0)
testing.expect_value(t, chan.len(comm.client), 0)
testing.expect_value(t, chan.cap(comm.host), BUFFER_SIZE)
testing.expect_value(t, chan.cap(comm.client), BUFFER_SIZE)
reckoner := thread.create(comm_client)
defer thread.destroy(reckoner)
reckoner.data = &comm
thread.start(reckoner)
expected := send_messages(t, comm.host, manual_buffering = false)
// Sleep so we can give the other thread enough time to buffer its message.
time.sleep(SLEEP_TIME)
testing.expect_value(t, chan.len(comm.client), 1)
result, ok := chan.try_recv(comm.client)
// One more sleep to ensure it has enough time to close.
time.sleep(SLEEP_TIME)
testing.expect_value(t, chan.is_closed(comm.client), true)
testing.expect_value(t, ok, true)
testing.expect_value(t, result.i, expected)
log.debug(result, expected)
// Make sure sending to closed channels fails.
testing.expect_value(t, chan.send(comm.host, Message{.End, 0}), false)
testing.expect_value(t, chan.send(comm.client, Message{.End, 0}), false)
testing.expect_value(t, chan.try_send(comm.host, Message{.End, 0}), false)
testing.expect_value(t, chan.try_send(comm.client, Message{.End, 0}), false)
_, ok = chan.recv(comm.host); testing.expect_value(t, ok, false)
_, ok = chan.recv(comm.client); testing.expect_value(t, ok, false)
_, ok = chan.try_recv(comm.host); testing.expect_value(t, ok, false)
_, ok = chan.try_recv(comm.client); testing.expect_value(t, ok, false)
}
@test
test_chan_unbuffered :: proc(t: ^testing.T) {
testing.set_fail_timeout(t, FAIL_TIME)
comm: Comm
comm.manual_buffering = true
alloc_err: runtime.Allocator_Error
comm.host, alloc_err = chan.create_unbuffered(chan.Chan(Message), context.allocator)
assert(alloc_err == nil, "allocation failed")
comm.client, alloc_err = chan.create_unbuffered(chan.Chan(Message), context.allocator)
assert(alloc_err == nil, "allocation failed")
defer {
chan.destroy(comm.host)
chan.destroy(comm.client)
}
testing.expect(t, !chan.is_buffered(comm.host))
testing.expect(t, !chan.is_buffered(comm.client))
testing.expect(t, chan.is_unbuffered(comm.host))
testing.expect(t, chan.is_unbuffered(comm.client))
testing.expect_value(t, chan.len(comm.host), 0)
testing.expect_value(t, chan.len(comm.client), 0)
testing.expect_value(t, chan.cap(comm.host), 0)
testing.expect_value(t, chan.cap(comm.client), 0)
reckoner := thread.create(comm_client)
defer thread.destroy(reckoner)
reckoner.data = &comm
thread.start(reckoner)
for !chan.can_send(comm.client) {
thread.yield()
}
expected := send_messages(t, comm.host)
testing.expect_value(t, chan.is_closed(comm.host), true)
for !chan.can_recv(comm.client) {
thread.yield()
}
result, ok := chan.try_recv(comm.client)
testing.expect_value(t, ok, true)
testing.expect_value(t, result.i, expected)
log.debug(result, expected)
// Sleep so we can give the other thread enough time to close its side
// after we've received its message.
time.sleep(SLEEP_TIME)
testing.expect_value(t, chan.is_closed(comm.client), true)
// Make sure sending and receiving on closed channels fails.
testing.expect_value(t, chan.send(comm.host, Message{.End, 0}), false)
testing.expect_value(t, chan.send(comm.client, Message{.End, 0}), false)
testing.expect_value(t, chan.try_send(comm.host, Message{.End, 0}), false)
testing.expect_value(t, chan.try_send(comm.client, Message{.End, 0}), false)
_, ok = chan.recv(comm.host); testing.expect_value(t, ok, false)
_, ok = chan.recv(comm.client); testing.expect_value(t, ok, false)
_, ok = chan.try_recv(comm.host); testing.expect_value(t, ok, false)
_, ok = chan.try_recv(comm.client); testing.expect_value(t, ok, false)
}
@test
test_full_buffered_closed_chan_deadlock :: proc(t: ^testing.T) {
testing.set_fail_timeout(t, FAIL_TIME)
ch, alloc_err := chan.create_buffered(chan.Chan(int), 1, context.allocator)
assert(alloc_err == nil, "allocation failed")
defer chan.destroy(ch)
testing.expect(t, chan.can_send(ch))
testing.expect(t, chan.send(ch, 32))
testing.expect(t, chan.close(ch))
testing.expect(t, !chan.send(ch, 32))
}
// This test guarantees a buffered channel's messages can still be received
// even after closing. This is currently how the API works. If that changes,
// this test will need to change.
@test
test_accept_message_from_closed_buffered_chan :: proc(t: ^testing.T) {
testing.set_fail_timeout(t, FAIL_TIME)
ch, alloc_err := chan.create_buffered(chan.Chan(int), 2, context.allocator)
assert(alloc_err == nil, "allocation failed")
defer chan.destroy(ch)
testing.expect(t, chan.can_send(ch))
testing.expect(t, chan.send(ch, 32))
testing.expect(t, chan.send(ch, 64))
testing.expect(t, chan.close(ch))
result, ok := chan.recv(ch)
testing.expect_value(t, result, 32)
testing.expect(t, ok)
result, ok = chan.try_recv(ch)
testing.expect_value(t, result, 64)
testing.expect(t, ok)
}
+718
View File
@@ -0,0 +1,718 @@
// NOTE(Feoramund): These tests should be run a few hundred times, with and
// without `-sanitize:thread` enabled, to ensure maximum safety.
//
// Keep in mind that running with the debug logs uncommented can result in
// failures disappearing due to the delay of sending the log message causing
// different synchronization patterns.
//
// These tests are temporarily disabled on Darwin, as there is currently a
// stall occurring which I cannot debug.
//+build !darwin
package test_core_sync
import "base:intrinsics"
// import "core:log"
import "core:sync"
import "core:testing"
import "core:thread"
import "core:time"
FAIL_TIME :: 1 * time.Second
SLEEP_TIME :: 1 * time.Millisecond
SMALL_SLEEP_TIME :: 10 * time.Microsecond
// This needs to be high enough to cause a data race if any of the
// synchronization primitives fail.
THREADS :: 8
// Manually wait on all threads to finish.
//
// This reduces a dependency on a `Wait_Group` or similar primitives.
//
// It's also important that we wait for every thread to finish, as it's
// possible for a thread to finish after the test if we don't check, despite
// joining it to the test thread.
wait_for :: proc(threads: []^thread.Thread) {
wait_loop: for {
count := len(threads)
for v in threads {
if thread.is_done(v) {
count -= 1
}
}
if count == 0 {
break wait_loop
}
thread.yield()
}
for t in threads {
thread.join(t)
thread.destroy(t)
}
}
//
// core:sync/primitives.odin
//
@test
test_mutex :: proc(t: ^testing.T) {
testing.set_fail_timeout(t, FAIL_TIME)
Data :: struct {
m: sync.Mutex,
number: int,
}
p :: proc(th: ^thread.Thread) {
data := cast(^Data)th.data
// log.debugf("MUTEX-%v> locking", th.id)
sync.mutex_lock(&data.m)
data.number += 1
// log.debugf("MUTEX-%v> unlocking", th.id)
sync.mutex_unlock(&data.m)
// log.debugf("MUTEX-%v> leaving", th.id)
}
data: Data
threads: [THREADS]^thread.Thread
for &v in threads {
v = thread.create(p)
v.data = &data
v.init_context = context
thread.start(v)
}
wait_for(threads[:])
testing.expect_value(t, data.number, THREADS)
}
@test
test_rw_mutex :: proc(t: ^testing.T) {
testing.set_fail_timeout(t, FAIL_TIME)
Data :: struct {
m1: sync.RW_Mutex,
m2: sync.RW_Mutex,
number1: int,
number2: int,
}
p :: proc(th: ^thread.Thread) {
data := cast(^Data)th.data
sync.rw_mutex_shared_lock(&data.m1)
n := data.number1
sync.rw_mutex_shared_unlock(&data.m1)
sync.rw_mutex_lock(&data.m2)
data.number2 += n
sync.rw_mutex_unlock(&data.m2)
}
data: Data
threads: [THREADS]^thread.Thread
sync.rw_mutex_lock(&data.m1)
for &v in threads {
v = thread.create(p)
v.data = &data
v.init_context = context
thread.start(v)
}
data.number1 = 1
sync.rw_mutex_unlock(&data.m1)
wait_for(threads[:])
testing.expect_value(t, data.number2, THREADS)
}
@test
test_recursive_mutex :: proc(t: ^testing.T) {
testing.set_fail_timeout(t, FAIL_TIME)
Data :: struct {
m: sync.Recursive_Mutex,
number: int,
}
p :: proc(th: ^thread.Thread) {
data := cast(^Data)th.data
// log.debugf("REC_MUTEX-%v> locking", th.id)
tried1 := sync.recursive_mutex_try_lock(&data.m)
for _ in 0..<3 {
sync.recursive_mutex_lock(&data.m)
}
tried2 := sync.recursive_mutex_try_lock(&data.m)
// log.debugf("REC_MUTEX-%v> locked", th.id)
data.number += 1
// log.debugf("REC_MUTEX-%v> unlocking", th.id)
for _ in 0..<3 {
sync.recursive_mutex_unlock(&data.m)
}
if tried1 { sync.recursive_mutex_unlock(&data.m) }
if tried2 { sync.recursive_mutex_unlock(&data.m) }
// log.debugf("REC_MUTEX-%v> leaving", th.id)
}
data: Data
threads: [THREADS]^thread.Thread
for &v in threads {
v = thread.create(p)
v.data = &data
v.init_context = context
thread.start(v)
}
wait_for(threads[:])
testing.expect_value(t, data.number, THREADS)
}
@test
test_cond :: proc(t: ^testing.T) {
testing.set_fail_timeout(t, FAIL_TIME)
Data :: struct {
c: sync.Cond,
m: sync.Mutex,
i: int,
number: int,
}
p :: proc(th: ^thread.Thread) {
data := cast(^Data)th.data
sync.mutex_lock(&data.m)
for intrinsics.atomic_load(&data.i) != 1 {
sync.cond_wait(&data.c, &data.m)
}
data.number += intrinsics.atomic_load(&data.i)
sync.mutex_unlock(&data.m)
}
data: Data
threads: [THREADS]^thread.Thread
data.i = -1
sync.mutex_lock(&data.m)
for &v in threads {
v = thread.create(p)
v.data = &data
v.init_context = context
thread.start(v)
}
time.sleep(SLEEP_TIME)
data.i = 1
sync.mutex_unlock(&data.m)
sync.cond_broadcast(&data.c)
wait_for(threads[:])
testing.expect_value(t, data.number, THREADS)
}
@test
test_cond_with_timeout :: proc(t: ^testing.T) {
testing.set_fail_timeout(t, FAIL_TIME)
c: sync.Cond
m: sync.Mutex
sync.mutex_lock(&m)
sync.cond_wait_with_timeout(&c, &m, SLEEP_TIME)
}
@test
test_semaphore :: proc(t: ^testing.T) {
testing.set_fail_timeout(t, FAIL_TIME)
Data :: struct {
s: sync.Sema,
number: int,
}
p :: proc(th: ^thread.Thread) {
data := cast(^Data)th.data
// log.debugf("SEM-%v> waiting", th.id)
sync.sema_wait(&data.s)
data.number += 1
// log.debugf("SEM-%v> posting", th.id)
sync.sema_post(&data.s)
// log.debugf("SEM-%v> leaving", th.id)
}
data: Data
threads: [THREADS]^thread.Thread
for &v in threads {
v = thread.create(p)
v.data = &data
v.init_context = context
thread.start(v)
}
sync.sema_post(&data.s)
wait_for(threads[:])
testing.expect_value(t, data.number, THREADS)
}
@test
test_semaphore_with_timeout :: proc(t: ^testing.T) {
testing.set_fail_timeout(t, FAIL_TIME)
s: sync.Sema
sync.sema_wait_with_timeout(&s, SLEEP_TIME)
}
@test
test_futex :: proc(t: ^testing.T) {
testing.set_fail_timeout(t, FAIL_TIME)
Data :: struct {
f: sync.Futex,
i: int,
number: int,
}
p :: proc(th: ^thread.Thread) {
data := cast(^Data)th.data
// log.debugf("FUTEX-%v> waiting", th.id)
sync.futex_wait(&data.f, 3)
// log.debugf("FUTEX-%v> done", th.id)
n := data.i
intrinsics.atomic_add(&data.number, n)
}
data: Data
data.i = -1
data.f = 3
threads: [THREADS]^thread.Thread
for &v in threads {
v = thread.create(p)
v.data = &data
v.init_context = context
thread.start(v)
}
data.i = 1
// Change the futex variable to keep late-starters from stalling.
data.f = 0
sync.futex_broadcast(&data.f)
wait_for(threads[:])
testing.expect_value(t, data.number, THREADS)
}
@test
test_futex_with_timeout :: proc(t: ^testing.T) {
testing.set_fail_timeout(t, FAIL_TIME)
f: sync.Futex = 1
sync.futex_wait_with_timeout(&f, 1, SLEEP_TIME)
}
//
// core:sync/extended.odin
//
@test
test_wait_group :: proc(t: ^testing.T) {
testing.set_fail_timeout(t, FAIL_TIME)
Data :: struct {
step1: sync.Wait_Group,
step2: sync.Wait_Group,
i: int,
number: int,
}
p :: proc(th: ^thread.Thread) {
data := cast(^Data)th.data
sync.wait_group_wait(&data.step1)
n := data.i
intrinsics.atomic_add(&data.number, n)
sync.wait_group_done(&data.step2)
}
data: Data
data.i = -1
threads: [THREADS]^thread.Thread
sync.wait_group_add(&data.step1, 1)
sync.wait_group_add(&data.step2, THREADS)
for &v in threads {
v = thread.create(p)
v.data = &data
v.init_context = context
thread.start(v)
}
time.sleep(SMALL_SLEEP_TIME)
data.i = 1
sync.wait_group_done(&data.step1)
sync.wait_group_wait(&data.step2)
wait_for(threads[:])
testing.expect_value(t, data.step1.counter, 0)
testing.expect_value(t, data.step2.counter, 0)
testing.expect_value(t, data.number, THREADS)
}
@test
test_wait_group_with_timeout :: proc(t: ^testing.T) {
testing.set_fail_timeout(t, FAIL_TIME)
wg: sync.Wait_Group
sync.wait_group_wait_with_timeout(&wg, SLEEP_TIME)
}
@test
test_barrier :: proc(t: ^testing.T) {
testing.set_fail_timeout(t, FAIL_TIME)
Data :: struct {
b: sync.Barrier,
i: int,
number: int,
}
p :: proc(th: ^thread.Thread) {
data := cast(^Data)th.data
sync.barrier_wait(&data.b)
intrinsics.atomic_add(&data.number, data.i)
}
data: Data
data.i = -1
threads: [THREADS]^thread.Thread
sync.barrier_init(&data.b, THREADS + 1) // +1 for this thread, of course.
for &v in threads {
v = thread.create(p)
v.data = &data
v.init_context = context
thread.start(v)
}
time.sleep(SMALL_SLEEP_TIME)
data.i = 1
sync.barrier_wait(&data.b)
wait_for(threads[:])
testing.expect_value(t, data.b.index, 0)
testing.expect_value(t, data.b.generation_id, 1)
testing.expect_value(t, data.b.thread_count, THREADS + 1)
testing.expect_value(t, data.number, THREADS)
}
@test
test_auto_reset :: proc(t: ^testing.T) {
testing.set_fail_timeout(t, FAIL_TIME)
Data :: struct {
a: sync.Auto_Reset_Event,
number: int,
}
p :: proc(th: ^thread.Thread) {
data := cast(^Data)th.data
// log.debugf("AUR-%v> entering", th.id)
sync.auto_reset_event_wait(&data.a)
// log.debugf("AUR-%v> adding", th.id)
data.number += 1
// log.debugf("AUR-%v> signalling", th.id)
sync.auto_reset_event_signal(&data.a)
// log.debugf("AUR-%v> leaving", th.id)
}
data: Data
threads: [THREADS]^thread.Thread
for &v in threads {
v = thread.create(p)
v.data = &data
v.init_context = context
thread.start(v)
}
// There is a chance that this test can stall if a signal is sent before
// all threads are queued, because it's possible for some number of threads
// to get to the waiting state, the signal to fire, all of the waited
// threads to pass successfully, then the other threads come in with no-one
// to run a signal.
//
// So we'll just test a fully-waited queue of cascading threads.
for {
status := intrinsics.atomic_load(&data.a.status)
if status == -THREADS {
// log.debug("All Auto_Reset_Event threads have queued.")
break
}
intrinsics.cpu_relax()
}
sync.auto_reset_event_signal(&data.a)
wait_for(threads[:])
// The last thread should leave this primitive in a signalled state.
testing.expect_value(t, data.a.status, 1)
testing.expect_value(t, data.number, THREADS)
}
@test
test_auto_reset_already_signalled :: proc(t: ^testing.T) {
testing.set_fail_timeout(t, FAIL_TIME)
a: sync.Auto_Reset_Event
sync.auto_reset_event_signal(&a)
sync.auto_reset_event_wait(&a)
testing.expect_value(t, a.status, 0)
}
@test
test_ticket_mutex :: proc(t: ^testing.T) {
testing.set_fail_timeout(t, FAIL_TIME)
Data :: struct {
m: sync.Ticket_Mutex,
number: int,
}
p :: proc(th: ^thread.Thread) {
data := cast(^Data)th.data
// log.debugf("TIC-%i> entering", th.id)
// intrinsics.debug_trap()
sync.ticket_mutex_lock(&data.m)
// log.debugf("TIC-%i> locked", th.id)
data.number += 1
// log.debugf("TIC-%i> unlocking", th.id)
sync.ticket_mutex_unlock(&data.m)
// log.debugf("TIC-%i> leaving", th.id)
}
data: Data
threads: [THREADS]^thread.Thread
for &v in threads {
v = thread.create(p)
v.data = &data
v.init_context = context
thread.start(v)
}
wait_for(threads[:])
testing.expect_value(t, data.m.ticket, THREADS)
testing.expect_value(t, data.m.serving, THREADS)
testing.expect_value(t, data.number, THREADS)
}
@test
test_benaphore :: proc(t: ^testing.T) {
testing.set_fail_timeout(t, FAIL_TIME)
Data :: struct {
b: sync.Benaphore,
number: int,
}
p :: proc(th: ^thread.Thread) {
data := cast(^Data)th.data
sync.benaphore_lock(&data.b)
data.number += 1
sync.benaphore_unlock(&data.b)
}
data: Data
threads: [THREADS]^thread.Thread
for &v in threads {
v = thread.create(p)
v.data = &data
v.init_context = context
thread.start(v)
}
wait_for(threads[:])
testing.expect_value(t, data.b.counter, 0)
testing.expect_value(t, data.number, THREADS)
}
@test
test_recursive_benaphore :: proc(t: ^testing.T) {
testing.set_fail_timeout(t, FAIL_TIME)
Data :: struct {
b: sync.Recursive_Benaphore,
number: int,
}
p :: proc(th: ^thread.Thread) {
data := cast(^Data)th.data
// log.debugf("REC_BEP-%i> entering", th.id)
tried1 := sync.recursive_benaphore_try_lock(&data.b)
for _ in 0..<3 {
sync.recursive_benaphore_lock(&data.b)
}
tried2 := sync.recursive_benaphore_try_lock(&data.b)
// log.debugf("REC_BEP-%i> locked", th.id)
data.number += 1
for _ in 0..<3 {
sync.recursive_benaphore_unlock(&data.b)
}
if tried1 { sync.recursive_benaphore_unlock(&data.b) }
if tried2 { sync.recursive_benaphore_unlock(&data.b) }
// log.debugf("REC_BEP-%i> leaving", th.id)
}
data: Data
threads: [THREADS]^thread.Thread
for &v in threads {
v = thread.create(p)
v.data = &data
v.init_context = context
thread.start(v)
}
wait_for(threads[:])
// The benaphore should be unowned at the end.
testing.expect_value(t, data.b.counter, 0)
testing.expect_value(t, data.b.owner, 0)
testing.expect_value(t, data.b.recursion, 0)
testing.expect_value(t, data.number, THREADS)
}
@test
test_once :: proc(t: ^testing.T) {
testing.set_fail_timeout(t, FAIL_TIME)
Data :: struct {
once: sync.Once,
number: int,
}
write :: proc "contextless" (data: rawptr) {
data := cast(^Data)data
data.number += 1
}
p :: proc(th: ^thread.Thread) {
data := cast(^Data)th.data
// log.debugf("ONCE-%v> entering", th.id)
sync.once_do_with_data_contextless(&data.once, write, data)
// log.debugf("ONCE-%v> leaving", th.id)
}
data: Data
threads: [THREADS]^thread.Thread
for &v in threads {
v = thread.create(p)
v.data = &data
v.init_context = context
thread.start(v)
}
wait_for(threads[:])
testing.expect_value(t, data.once.done, true)
testing.expect_value(t, data.number, 1)
}
@test
test_park :: proc(t: ^testing.T) {
testing.set_fail_timeout(t, FAIL_TIME)
Data :: struct {
car: sync.Parker,
number: int,
}
data: Data
th := thread.create_and_start_with_data(&data, proc(data: rawptr) {
data := cast(^Data)data
time.sleep(SLEEP_TIME)
sync.unpark(&data.car)
data.number += 1
})
sync.park(&data.car)
wait_for([]^thread.Thread{ th })
PARKER_EMPTY :: 0
testing.expect_value(t, data.car.state, PARKER_EMPTY)
testing.expect_value(t, data.number, 1)
}
@test
test_park_with_timeout :: proc(t: ^testing.T) {
testing.set_fail_timeout(t, FAIL_TIME)
car: sync.Parker
sync.park_with_timeout(&car, SLEEP_TIME)
}
@test
test_one_shot_event :: proc(t: ^testing.T) {
testing.set_fail_timeout(t, FAIL_TIME)
Data :: struct {
event: sync.One_Shot_Event,
number: int,
}
data: Data
th := thread.create_and_start_with_data(&data, proc(data: rawptr) {
data := cast(^Data)data
time.sleep(SLEEP_TIME)
sync.one_shot_event_signal(&data.event)
data.number += 1
})
sync.one_shot_event_wait(&data.event)
wait_for([]^thread.Thread{ th })
testing.expect_value(t, data.event.state, 1)
testing.expect_value(t, data.number, 1)
}
+3
View File
@@ -63,6 +63,9 @@ execute_struct_checks :: proc(t: ^testing.T) {
waiting: for {
status: i32
wpid := posix.waitpid(pid, &status, {})
if status == posix.EINTR {
continue
}
if !testing.expectf(t, wpid != -1, "waitpid() failure: %v", posix.strerror()) {
return false
}