From 2ef0e6b8f618a90ada3805cac859d9fe1a77b0a8 Mon Sep 17 00:00:00 2001 From: gingerBill Date: Sat, 9 Oct 2021 16:33:28 +0100 Subject: [PATCH] Update `core:sync/sync2` to have a generic `Futex` interface, and implement the calls appropriately for each platform --- core/sync/sync2/extended.odin | 21 +-- core/sync/sync2/futex_darwin.odin | 83 ++++++++++ core/sync/sync2/futex_linux.odin | 98 ++++++++++++ core/sync/sync2/futex_windows.odin | 41 +++++ core/sync/sync2/primitives.odin | 61 ++++++-- core/sync/sync2/primitives_atomic.odin | 19 +-- core/sync/sync2/primitives_darwin.odin | 106 +------------ core/sync/sync2/primitives_internal.odin | 184 +++++++++++++++++++++++ core/sync/sync2/primitives_linux.odin | 9 +- core/sync/sync2/primitives_pthreads.odin | 181 ---------------------- core/sync/sync2/primitives_windows.odin | 85 ----------- 11 files changed, 486 insertions(+), 402 deletions(-) create mode 100644 core/sync/sync2/futex_darwin.odin create mode 100644 core/sync/sync2/futex_linux.odin create mode 100644 core/sync/sync2/futex_windows.odin create mode 100644 core/sync/sync2/primitives_internal.odin diff --git a/core/sync/sync2/extended.odin b/core/sync/sync2/extended.odin index 8f99fc0e3..e52da3b22 100644 --- a/core/sync/sync2/extended.odin +++ b/core/sync/sync2/extended.odin @@ -251,17 +251,18 @@ Once :: struct { } once_do :: proc(o: ^Once, fn: proc()) { - if atomic_load_acquire(&o.done) == false { - _once_do_slow(o, fn) + @(cold) + do_slow :: proc(o: ^Once, fn: proc()) { + mutex_lock(&o.m) + defer mutex_unlock(&o.m) + if !o.done { + fn() + atomic_store_release(&o.done, true) + } } -} -@(cold) -_once_do_slow :: proc(o: ^Once, fn: proc()) { - mutex_lock(&o.m) - defer mutex_unlock(&o.m) - if !o.done { - fn() - atomic_store_release(&o.done, true) + + if atomic_load_acquire(&o.done) == false { + do_slow(o, fn) } } diff --git a/core/sync/sync2/futex_darwin.odin b/core/sync/sync2/futex_darwin.odin new file mode 100644 index 000000000..3802d8a49 --- /dev/null +++ b/core/sync/sync2/futex_darwin.odin @@ -0,0 +1,83 @@ +//+private +//+build darwin +package sync2 + +import "core:c" +import "core:time" + +foreign import System "System.framework" + +foreign System { + __ulock_wait :: proc "c" (operation: u32, addr: rawptr, value: u64, timeout_ms: u32) -> c.int --- + __ulock_wait2 :: proc "c" (operation: u32, addr: rawptr, value: u64, timeout_ns: u64, value2: u64) -> c.int --- + __ulock_wake :: proc "c" (operation: u32, addr: rawptr, wake_value: u64) -> c.int --- +} + + +UL_COMPARE_AND_WAIT :: 1 +ULF_WAKE_ALL :: 0x00000100 +ULF_NO_ERRNO :: 0x01000000 + +ENOENT :: -2 +EINTR :: -4 +EFAULT :: -14 +ETIMEDOUT :: -60 + +_futex_wait :: proc(f: ^Futex, expected: u32) -> Futex_Error { + return _futex_wait_with_timeout(f, expected, 0) +} + +_futex_wait_with_timeout :: proc(f: ^Futex, expected: u32, duration: time.Duration) -> Futex_Error { + timeout_ns := u64(duration) + timeout_overflowed := false + + s := __ulock_wait2(UL_COMPARE_AND_WAIT | ULF_NO_ERRNO, f, u64(expected), timeout_ns, 0) + if s >= 0 { + return nil + } + switch s { + case EINTR, EFAULT: + return nil + case ETIMEDOUT: + return .Timed_Out + case: + panic("futex_wait failure") + } + return nil + +} + +_futex_wake_single :: proc(f: ^Futex) { + loop: for { + s := __ulock_wake(UL_COMPARE_AND_WAIT | ULF_NO_ERRNO, f, 0) + if s >= 0 { + return + } + switch s { + case EINTR, EFAULT: + continue loop + case ENOENT: + return + case: + panic("futex_wake_single failure") + } + } +} + +_futex_wake_all :: proc(f: ^Futex) { + loop: for { + s := __ulock_wake(UL_COMPARE_AND_WAIT | ULF_NO_ERRNO | ULF_WAKE_ALL, f, 0) + if s >= 0 { + return + } + switch s { + case EINTR, EFAULT: + continue loop + case ENOENT: + return + case: + panic("futex_wake_all failure") + } + } +} + diff --git a/core/sync/sync2/futex_linux.odin b/core/sync/sync2/futex_linux.odin new file mode 100644 index 000000000..44162172a --- /dev/null +++ b/core/sync/sync2/futex_linux.odin @@ -0,0 +1,98 @@ +//+private +//+build linux +package sync2 + +import "core:c" +import "core:time" +import "core:intrinsics" + +FUTEX_WAIT :: 0 +FUTEX_WAKE :: 1 +FUTEX_PRIVATE_FLAG :: 128 + +FUTEX_WAIT_PRIVATE :: (FUTEX_WAIT | FUTEX_PRIVATE_FLAG) +FUTEX_WAKE_PRIVATE :: (FUTEX_WAKE | FUTEX_PRIVATE_FLAG) + +foreign import libc "system:c" + +foreign libc { + __errno_location :: proc "c" () -> ^c.int --- +} + +ESUCCESS :: 0 +EINTR :: -4 +EAGAIN :: -11 +EFAULT :: -14 +EINVAL :: -22 +ETIMEDOUT :: -110 + +get_errno :: proc(r: int) -> int { + if -4096 < r && r < 0 { + return r + } + return 0 +} + +internal_futex :: proc(f: ^Futex, op: uintptr, val: u32, timeout: rawptr) -> int { + code := int(intrinsics.syscall(202, uintptr(f), uintptr(op), uintptr(val), uintptr(timeout), 0, 0)) + return get_errno(code) +} + + +_futex_wait :: proc(f: ^Futex, expected: u32) -> Futex_Error { + err := internal_futex(f, FUTEX_WAIT_PRIVATE | FUTEX_WAIT, expected, nil) + switch err { + case ESUCCESS, EINTR, EAGAIN, EINVAL: + // okay + case ETIMEDOUT: + return .Timed_Out + case EFAULT: + fallthrough + case: + panic("futex_wait failure") + } + return nil +} + +_futex_wait_with_timeout :: proc(f: ^Futex, expected: u32, duration: time.Duration) -> Futex_Error { + timeout: struct { + tv_sec: c.long, + tv_nsec: c.long, + } + + timeout.tv_sec = (c.long)(duration/1e9) + timeout.tv_nsec = (c.long)(duration%1e9) + + err := internal_futex(f, FUTEX_WAIT_PRIVATE | FUTEX_WAIT, expected, &timeout) + switch err { + case ESUCCESS, EINTR, EAGAIN, EINVAL: + // okay + case ETIMEDOUT: + return .Timed_Out + case EFAULT: + fallthrough + case: + panic("futex_wait_with_timeout failure") + } + return nil +} + + +_futex_wake_single :: proc(f: ^Futex) { + err := internal_futex(f, FUTEX_WAKE_PRIVATE | FUTEX_WAKE, 1, nil) + switch err { + case ESUCCESS, EINVAL, EFAULT: + // okay + case: + panic("futex_wake_single failure") + } +} +_futex_wake_all :: proc(f: ^Futex) { + err := internal_futex(f, FUTEX_WAKE_PRIVATE | FUTEX_WAKE, u32(max(i32)), nil) + switch err { + case ESUCCESS, EINVAL, EFAULT: + // okay + case: + panic("_futex_wake_all failure") + } +} diff --git a/core/sync/sync2/futex_windows.odin b/core/sync/sync2/futex_windows.odin new file mode 100644 index 000000000..da9e7c134 --- /dev/null +++ b/core/sync/sync2/futex_windows.odin @@ -0,0 +1,41 @@ +//+private +//+build windows +package sync2 + +import "core:time" + +foreign import Synchronization "system:Synchronization.lib" + +@(default_calling_convention="c") +foreign Synchronization { + WaitOnAddress :: proc(Address: rawptr, CompareAddress: rawptr, AddressSize: uint, dwMilliseconds: u32) -> b32 --- + WakeByAddressSingle :: proc(Address: rawptr) --- + WakeByAddressAll :: proc(Address: rawptr) --- +} + +_futex_wait :: proc(f: ^Futex, expect: u32) -> Futex_Error { + expect := expect + ms :: ~u32(0) // infinite + ok := WaitOnAddress(f, &expect, size_of(expect), ms) + return nil if ok else .Timed_Out +} + +_futex_wait_with_timeout :: proc(f: ^Futex, expect: u32, duration: time.Duration) -> Futex_Error { + expect := expect + + ms: u32 = 0 + if duration >= 0 { + ms = u32(u64(duration)/1e6) + } + + ok := WaitOnAddress(f, &expect, size_of(expect), ms) + return nil if ok else .Timed_Out +} + +_futex_wake_single :: proc(f: ^Futex) { + WakeByAddressSingle(f) +} + +_futex_wake_all :: proc(f: ^Futex) { + WakeByAddressAll(f) +} \ No newline at end of file diff --git a/core/sync/sync2/primitives.odin b/core/sync/sync2/primitives.odin index 7d7115274..5ea17af6b 100644 --- a/core/sync/sync2/primitives.odin +++ b/core/sync/sync2/primitives.odin @@ -153,10 +153,6 @@ cond_wait :: proc(c: ^Cond, m: ^Mutex) { _cond_wait(c, m) } -cond_wait_with_timeout :: proc(c: ^Cond, m: ^Mutex, timeout: time.Duration) -> bool { - return _cond_wait_with_timeout(c, m, timeout) -} - cond_signal :: proc(c: ^Cond) { _cond_signal(c) } @@ -166,20 +162,67 @@ cond_broadcast :: proc(c: ^Cond) { } - // When waited upon, blocks until the internal count is greater than zero, then subtracts one. // Posting to the semaphore increases the count by one, or the provided amount. // // A Sema must not be copied after first use Sema :: struct { - impl: _Sema, + count: Futex, } - sema_wait :: proc(s: ^Sema) { - _sema_wait(s) + for { + original_count := atomic_load(&s.count) + for original_count == 0 { + futex_wait(&s.count, u32(original_count)) + original_count = s.count + } + if original_count == atomic_compare_exchange_strong(&s.count, original_count-1, original_count) { + return + } + } } sema_post :: proc(s: ^Sema, count := 1) { - _sema_post(s, count) + atomic_add(&s.count, Futex(count)) + if count == 1 { + futex_wake_single(&s.count) + } else { + futex_wake_all(&s.count) + } +} + + +// Futex is a fast userspace mutual exclusion lock, using a 32-bit memory address as a hint +// +// An Futex must not be copied after first use +Futex :: distinct u32 + +Futex_Error :: enum { + None, + Timed_Out, +} + +futex_wait :: proc(f: ^Futex, expected: u32) { + if u32(atomic_load(f)) != expected { + return + } + + assert(_futex_wait(f, expected) != nil, "futex_wait failure") +} + +futex_wait_with_timeout :: proc(f: ^Futex, expected: u32, duration: time.Duration) -> Futex_Error { + if u32(atomic_load(f)) != expected { + return nil + } + + return _futex_wait_with_timeout(f, expected, duration) +} + +futex_wake_single :: proc(f: ^Futex) { + _futex_wake_single(f) +} + +futex_wake_all :: proc(f: ^Futex) { + _futex_wake_all(f) } diff --git a/core/sync/sync2/primitives_atomic.odin b/core/sync/sync2/primitives_atomic.odin index c8c049dde..06e89341b 100644 --- a/core/sync/sync2/primitives_atomic.odin +++ b/core/sync/sync2/primitives_atomic.odin @@ -2,7 +2,7 @@ package sync2 import "core:time" -Atomic_Mutex_State :: enum i32 { +Atomic_Mutex_State :: enum Futex { Unlocked = 0, Locked = 1, Waiting = 2, @@ -42,8 +42,8 @@ atomic_mutex_lock :: proc(m: ^Atomic_Mutex) { if atomic_exchange_acquire(&m.state, .Waiting) == .Unlocked { return } - - // TODO(bill): Use a Futex here for Linux to improve performance and error handling + + futex_wait((^Futex)(&m.state), u32(new_state)) cpu_relax() } } @@ -62,7 +62,7 @@ atomic_mutex_lock :: proc(m: ^Atomic_Mutex) { atomic_mutex_unlock :: proc(m: ^Atomic_Mutex) { @(cold) unlock_slow :: proc(m: ^Atomic_Mutex) { - // TODO(bill): Use a Futex here for Linux to improve performance and error handling + futex_wake_single((^Futex)(&m.state)) } @@ -289,20 +289,20 @@ atomic_recursive_mutex_guard :: proc(m: ^Atomic_Recursive_Mutex) -> bool { @(private="file") Queue_Item :: struct { next: ^Queue_Item, - futex: i32, + futex: Futex, } @(private="file") queue_item_wait :: proc(item: ^Queue_Item) { for atomic_load_acquire(&item.futex) == 0 { - // TODO(bill): Use a Futex here for Linux to improve performance and error handling + futex_wait(&item.futex, 0) cpu_relax() } } @(private="file") queue_item_signal :: proc(item: ^Queue_Item) { atomic_store_release(&item.futex, 1) - // TODO(bill): Use a Futex here for Linux to improve performance and error handling + futex_wake_single(&item.futex) } @@ -331,11 +331,6 @@ atomic_cond_wait :: proc(c: ^Atomic_Cond, m: ^Atomic_Mutex) { atomic_mutex_lock(m) } -atomic_cond_wait_with_timeout :: proc(c: ^Atomic_Cond, m: ^Atomic_Mutex, timeout: time.Duration) -> bool { - // TODO(bill): _cond_wait_with_timeout for unix - return false -} - atomic_cond_signal :: proc(c: ^Atomic_Cond) { if !atomic_load(&c.pending) { return diff --git a/core/sync/sync2/primitives_darwin.odin b/core/sync/sync2/primitives_darwin.odin index 309567f75..4c7e80aa2 100644 --- a/core/sync/sync2/primitives_darwin.odin +++ b/core/sync/sync2/primitives_darwin.odin @@ -18,126 +18,36 @@ _current_thread_id :: proc "contextless" () -> int { return int(tid) } -foreign { - @(link_name="usleep") - _darwin_usleep :: proc "c" (us: uint) -> i32 --- - @(link_name="sched_yield") - _darwin_sched_yield :: proc "c" () -> i32 --- -} - -_atomic_try_wait_slow :: proc(ptr: ^u32, val: u32) { - history: uint = 10 - for { - // Exponential wait - _darwin_usleep(history >> 2) - history += history >> 2 - if history > (1 << 10) { - history = 1 << 10 - } - - if atomic_load(ptr) != val { - break - } - } -} - -_atomic_wait :: proc(ptr: ^u32, val: u32) { - if intrinsics.expect(atomic_load(ptr) != val, true) { - return - } - for i in 0..<16 { - if atomic_load(ptr) != val { - return - } - if i < 12 { - intrinsics.cpu_relax() - } else { - _darwin_sched_yield() - } - } - - for val == atomic_load(ptr) { - _atomic_try_wait_slow(ptr, val) - } -} _Mutex :: struct { - + mutex: Atomic_Mutex, } _mutex_lock :: proc(m: ^Mutex) { + atomic_mutex_lock(&m.impl.mutex) } _mutex_unlock :: proc(m: ^Mutex) { + atomic_mutex_unlock(&m.impl.mutex) } _mutex_try_lock :: proc(m: ^Mutex) -> bool { - return false + return atomic_mutex_try_lock(&m.impl.mutex) } -_RW_Mutex :: struct { -} - -_rw_mutex_lock :: proc(rw: ^RW_Mutex) { -} - -_rw_mutex_unlock :: proc(rw: ^RW_Mutex) { -} - -_rw_mutex_try_lock :: proc(rw: ^RW_Mutex) -> bool { - return false -} - -_rw_mutex_shared_lock :: proc(rw: ^RW_Mutex) { -} - -_rw_mutex_shared_unlock :: proc(rw: ^RW_Mutex) { -} - -_rw_mutex_try_shared_lock :: proc(rw: ^RW_Mutex) -> bool { - return false -} - - -_Recursive_Mutex :: struct { -} - -_recursive_mutex_lock :: proc(m: ^Recursive_Mutex) { -} - -_recursive_mutex_unlock :: proc(m: ^Recursive_Mutex) { -} - -_recursive_mutex_try_lock :: proc(m: ^Recursive_Mutex) -> bool { - return false -} - - - - _Cond :: struct { + cond: Atomic_Cond, } _cond_wait :: proc(c: ^Cond, m: ^Mutex) { -} - -_cond_wait_with_timeout :: proc(c: ^Cond, m: ^Mutex, timeout: time.Duration) -> bool { - return false + atomic_cond_wait(&c.impl.cond, &m.impl.mutex) } _cond_signal :: proc(c: ^Cond) { + atomic_cond_signal(&c.impl.cond) } _cond_broadcast :: proc(c: ^Cond) { -} - - -_Sema :: struct { -} - -_sema_wait :: proc(s: ^Sema) { -} - -_sema_post :: proc(s: ^Sema, count := 1) { + atomic_cond_broadcast(&c.impl.cond) } diff --git a/core/sync/sync2/primitives_internal.odin b/core/sync/sync2/primitives_internal.odin new file mode 100644 index 000000000..a6747a737 --- /dev/null +++ b/core/sync/sync2/primitives_internal.odin @@ -0,0 +1,184 @@ +//+private +package sync2 + +when #config(ODIN_SYNC_RECURSIVE_MUTEX_USE_FUTEX, true) { + _Recursive_Mutex :: struct { + owner: Futex, + recursion: i32, + } + + _recursive_mutex_lock :: proc(m: ^Recursive_Mutex) { + tid := Futex(current_thread_id()) + for { + prev_owner := atomic_compare_exchange_strong_acquire(&m.impl.owner, tid, 0) + switch prev_owner { + case 0, tid: + m.impl.recursion += 1 + // inside the lock + return + } + + futex_wait(&m.impl.owner, u32(prev_owner)) + } + } + + _recursive_mutex_unlock :: proc(m: ^Recursive_Mutex) { + m.impl.recursion -= 1 + if m.impl.recursion != 0 { + return + } + atomic_exchange_release(&m.impl.owner, 0) + + futex_wake_single(&m.impl.owner) + // outside the lock + + } + + _recursive_mutex_try_lock :: proc(m: ^Recursive_Mutex) -> bool { + tid := Futex(current_thread_id()) + prev_owner := atomic_compare_exchange_strong_acquire(&m.impl.owner, tid, 0) + switch prev_owner { + case 0, tid: + m.impl.recursion += 1 + // inside the lock + return true + } + return false + } +} else { + _Recursive_Mutex :: struct { + owner: int, + recursion: int, + mutex: Mutex, + } + + _recursive_mutex_lock :: proc(m: ^Recursive_Mutex) { + tid := current_thread_id() + if tid != m.impl.owner { + mutex_lock(&m.impl.mutex) + } + // inside the lock + m.impl.owner = tid + m.impl.recursion += 1 + } + + _recursive_mutex_unlock :: proc(m: ^Recursive_Mutex) { + tid := current_thread_id() + assert(tid == m.impl.owner) + m.impl.recursion -= 1 + recursion := m.impl.recursion + if recursion == 0 { + m.impl.owner = 0 + } + if recursion == 0 { + mutex_unlock(&m.impl.mutex) + } + // outside the lock + + } + + _recursive_mutex_try_lock :: proc(m: ^Recursive_Mutex) -> bool { + tid := current_thread_id() + if m.impl.owner == tid { + return mutex_try_lock(&m.impl.mutex) + } + if !mutex_try_lock(&m.impl.mutex) { + return false + } + // inside the lock + m.impl.owner = tid + m.impl.recursion += 1 + return true + } +} + + +when ODIN_OS != "windows" { + RW_Mutex_State :: distinct uint + RW_Mutex_State_Half_Width :: size_of(RW_Mutex_State)*8/2 + RW_Mutex_State_Is_Writing :: RW_Mutex_State(1) + RW_Mutex_State_Writer :: RW_Mutex_State(1)<<1 + RW_Mutex_State_Reader :: RW_Mutex_State(1)< bool { + if mutex_try_lock(&rw.impl.mutex) { + state := atomic_load(&rw.impl.state) + if state & RW_Mutex_State_Reader_Mask == 0 { + _ = atomic_or(&rw.impl.state, RW_Mutex_State_Is_Writing) + return true + } + + mutex_unlock(&rw.impl.mutex) + } + return false + } + + _rw_mutex_shared_lock :: proc(rw: ^RW_Mutex) { + state := atomic_load(&rw.impl.state) + for state & (RW_Mutex_State_Is_Writing|RW_Mutex_State_Writer_Mask) == 0 { + ok: bool + state, ok = atomic_compare_exchange_weak(&rw.impl.state, state, state + RW_Mutex_State_Reader) + if ok { + return + } + } + + mutex_lock(&rw.impl.mutex) + _ = atomic_add(&rw.impl.state, RW_Mutex_State_Reader) + mutex_unlock(&rw.impl.mutex) + } + + _rw_mutex_shared_unlock :: proc(rw: ^RW_Mutex) { + state := atomic_sub(&rw.impl.state, RW_Mutex_State_Reader) + + if (state & RW_Mutex_State_Reader_Mask == RW_Mutex_State_Reader) && + (state & RW_Mutex_State_Is_Writing != 0) { + sema_post(&rw.impl.sema) + } + } + + _rw_mutex_try_shared_lock :: proc(rw: ^RW_Mutex) -> bool { + state := atomic_load(&rw.impl.state) + if state & (RW_Mutex_State_Is_Writing|RW_Mutex_State_Writer_Mask) == 0 { + _, ok := atomic_compare_exchange_strong(&rw.impl.state, state, state + RW_Mutex_State_Reader) + if ok { + return true + } + } + if mutex_try_lock(&rw.impl.mutex) { + _ = atomic_add(&rw.impl.state, RW_Mutex_State_Reader) + mutex_unlock(&rw.impl.mutex) + return true + } + + return false + } + +} \ No newline at end of file diff --git a/core/sync/sync2/primitives_linux.odin b/core/sync/sync2/primitives_linux.odin index 2fafaed7a..4c81295bd 100644 --- a/core/sync/sync2/primitives_linux.odin +++ b/core/sync/sync2/primitives_linux.odin @@ -2,14 +2,9 @@ //+private package sync2 -// TODO(bill): remove libc -foreign import libc "system:c" +import "core:intrinsics" _current_thread_id :: proc "contextless" () -> int { - foreign libc { - syscall :: proc(number: i32, #c_vararg args: ..any) -> i32 --- - } - SYS_GETTID :: 186 - return int(syscall(SYS_GETTID)) + return int(intrinsics.syscall(SYS_GETTID)) } diff --git a/core/sync/sync2/primitives_pthreads.odin b/core/sync/sync2/primitives_pthreads.odin index 97c453810..276b3a90c 100644 --- a/core/sync/sync2/primitives_pthreads.odin +++ b/core/sync/sync2/primitives_pthreads.odin @@ -2,8 +2,6 @@ //+private package sync2 -when #config(ODIN_SYNC_USE_PTHREADS, true) { - import "core:time" import "core:runtime" import "core:sys/unix" @@ -32,142 +30,6 @@ _mutex_try_lock :: proc(m: ^Mutex) -> bool { return err == 0 } - - -RW_Mutex_State :: distinct uint -RW_Mutex_State_Half_Width :: size_of(RW_Mutex_State)*8/2 -RW_Mutex_State_Is_Writing :: RW_Mutex_State(1) -RW_Mutex_State_Writer :: RW_Mutex_State(1)<<1 -RW_Mutex_State_Reader :: RW_Mutex_State(1)< bool { - if mutex_try_lock(&rw.impl.mutex) { - state := atomic_load(&rw.impl.state) - if state & RW_Mutex_State_Reader_Mask == 0 { - _ = atomic_or(&rw.impl.state, RW_Mutex_State_Is_Writing) - return true - } - - mutex_unlock(&rw.impl.mutex) - } - return false -} - -_rw_mutex_shared_lock :: proc(rw: ^RW_Mutex) { - state := atomic_load(&rw.impl.state) - for state & (RW_Mutex_State_Is_Writing|RW_Mutex_State_Writer_Mask) == 0 { - ok: bool - state, ok = atomic_compare_exchange_weak(&rw.impl.state, state, state + RW_Mutex_State_Reader) - if ok { - return - } - } - - mutex_lock(&rw.impl.mutex) - _ = atomic_add(&rw.impl.state, RW_Mutex_State_Reader) - mutex_unlock(&rw.impl.mutex) -} - -_rw_mutex_shared_unlock :: proc(rw: ^RW_Mutex) { - state := atomic_sub(&rw.impl.state, RW_Mutex_State_Reader) - - if (state & RW_Mutex_State_Reader_Mask == RW_Mutex_State_Reader) && - (state & RW_Mutex_State_Is_Writing != 0) { - sema_post(&rw.impl.sema) - } -} - -_rw_mutex_try_shared_lock :: proc(rw: ^RW_Mutex) -> bool { - state := atomic_load(&rw.impl.state) - if state & (RW_Mutex_State_Is_Writing|RW_Mutex_State_Writer_Mask) == 0 { - _, ok := atomic_compare_exchange_strong(&rw.impl.state, state, state + RW_Mutex_State_Reader) - if ok { - return true - } - } - if mutex_try_lock(&rw.impl.mutex) { - _ = atomic_add(&rw.impl.state, RW_Mutex_State_Reader) - mutex_unlock(&rw.impl.mutex) - return true - } - - return false -} - - -_Recursive_Mutex :: struct { - owner: int, - recursion: int, - mutex: Mutex, -} - -_recursive_mutex_lock :: proc(m: ^Recursive_Mutex) { - tid := _current_thread_id() - if tid != m.impl.owner { - mutex_lock(&m.impl.mutex) - } - // inside the lock - m.impl.owner = tid - m.impl.recursion += 1 -} - -_recursive_mutex_unlock :: proc(m: ^Recursive_Mutex) { - tid := _current_thread_id() - assert(tid == m.impl.owner) - m.impl.recursion -= 1 - recursion := m.impl.recursion - if recursion == 0 { - m.impl.owner = 0 - } - if recursion == 0 { - mutex_unlock(&m.impl.mutex) - } - // outside the lock - -} - -_recursive_mutex_try_lock :: proc(m: ^Recursive_Mutex) -> bool { - tid := _current_thread_id() - if m.impl.owner == tid { - return mutex_try_lock(&m.impl.mutex) - } - if !mutex_try_lock(&m.impl.mutex) { - return false - } - // inside the lock - m.impl.owner = tid - m.impl.recursion += 1 - return true -} - - _Cond :: struct { pthread_cond: unix.pthread_cond_t, } @@ -177,17 +39,6 @@ _cond_wait :: proc(c: ^Cond, m: ^Mutex) { assert(err == 0) } -_cond_wait_with_timeout :: proc(c: ^Cond, m: ^Mutex, timeout: time.Duration) -> bool { - ns := time.duration_nanoseconds(timeout) - timeout_timespec := &time.TimeSpec{ - tv_sec = ns / 1e9, - tv_nsec = ns % 1e9, - } - err := unix.pthread_cond_timedwait(&c.impl.pthread_cond, &m.impl.pthread_mutex, timeout_timespec) - // TODO(bill): - return err == 0 -} - _cond_signal :: proc(c: ^Cond) { err := unix.pthread_cond_signal(&c.impl.pthread_cond) assert(err == 0) @@ -197,35 +48,3 @@ _cond_broadcast :: proc(c: ^Cond) { err := unix.pthread_cond_broadcast(&c.impl.pthread_cond) assert(err == 0) } - -_Sema :: struct { - mutex: Mutex, - cond: Cond, - count: int, -} - -_sema_wait :: proc(s: ^Sema) { - mutex_lock(&s.impl.mutex) - defer mutex_unlock(&s.impl.mutex) - - for s.impl.count == 0 { - cond_wait(&s.impl.cond, &s.impl.mutex) - } - - s.impl.count -= 1 - if s.impl.count > 0 { - cond_signal(&s.impl.cond) - } -} - -_sema_post :: proc(s: ^Sema, count := 1) { - mutex_lock(&s.impl.mutex) - defer mutex_unlock(&s.impl.mutex) - - s.impl.count += count - cond_signal(&s.impl.cond) -} - - - -} // ODIN_SYNC_USE_PTHREADS diff --git a/core/sync/sync2/primitives_windows.odin b/core/sync/sync2/primitives_windows.odin index 934df6a2a..e02bbdfd7 100644 --- a/core/sync/sync2/primitives_windows.odin +++ b/core/sync/sync2/primitives_windows.odin @@ -54,55 +54,6 @@ _rw_mutex_try_shared_lock :: proc(rw: ^RW_Mutex) -> bool { } -_Recursive_Mutex :: struct { - owner: u32, - claim_count: i32, -} - -_recursive_mutex_lock :: proc(m: ^Recursive_Mutex) { - tid := win32.GetCurrentThreadId() - for { - prev_owner := atomic_compare_exchange_strong_acquire(&m.impl.owner, tid, 0) - switch prev_owner { - case 0, tid: - m.impl.claim_count += 1 - // inside the lock - return - } - - win32.WaitOnAddress( - &m.impl.owner, - &prev_owner, - size_of(prev_owner), - win32.INFINITE, - ) - } -} - -_recursive_mutex_unlock :: proc(m: ^Recursive_Mutex) { - m.impl.claim_count -= 1 - if m.impl.claim_count != 0 { - return - } - atomic_exchange_release(&m.impl.owner, 0) - win32.WakeByAddressSingle(&m.impl.owner) - // outside the lock - -} - -_recursive_mutex_try_lock :: proc(m: ^Recursive_Mutex) -> bool { - tid := win32.GetCurrentThreadId() - prev_owner := atomic_compare_exchange_strong_acquire(&m.impl.owner, tid, 0) - switch prev_owner { - case 0, tid: - m.impl.claim_count += 1 - // inside the lock - return true - } - return false -} - - _Cond :: struct { @@ -113,11 +64,6 @@ _cond_wait :: proc(c: ^Cond, m: ^Mutex) { _ = win32.SleepConditionVariableSRW(&c.impl.cond, &m.impl.srwlock, win32.INFINITE, 0) } -_cond_wait_with_timeout :: proc(c: ^Cond, m: ^Mutex, timeout: time.Duration) -> bool { - ms := win32.DWORD((max(time.duration_nanoseconds(timeout), 0) + 999999)/1000000) - return cast(bool)win32.SleepConditionVariableSRW(&c.impl.cond, &m.impl.srwlock, ms, 0) -} - _cond_signal :: proc(c: ^Cond) { win32.WakeConditionVariable(&c.impl.cond) } @@ -126,34 +72,3 @@ _cond_broadcast :: proc(c: ^Cond) { win32.WakeAllConditionVariable(&c.impl.cond) } - -_Sema :: struct { - count: i32, -} - -_sema_wait :: proc(s: ^Sema) { - for { - original_count := s.impl.count - for original_count == 0 { - win32.WaitOnAddress( - &s.impl.count, - &original_count, - size_of(original_count), - win32.INFINITE, - ) - original_count = s.impl.count - } - if original_count == atomic_compare_exchange_strong(&s.impl.count, original_count-1, original_count) { - return - } - } -} - -_sema_post :: proc(s: ^Sema, count := 1) { - atomic_add(&s.impl.count, i32(count)) - if count == 1 { - win32.WakeByAddressSingle(&s.impl.count) - } else { - win32.WakeByAddressAll(&s.impl.count) - } -}