From 75e3df6da2c9a1b503093f7fa393c9cf95c379ca Mon Sep 17 00:00:00 2001 From: gingerBill Date: Tue, 12 Oct 2021 11:03:52 +0100 Subject: [PATCH] Add utility procedure groups for sync primitives --- core/sync/sync2/extended.odin | 45 ++++++++++----- core/sync/sync2/primitives_atomic.odin | 34 +++++++++-- core/sync/sync2/sync_util.odin | 80 ++++++++++++++++++++++++++ 3 files changed, 142 insertions(+), 17 deletions(-) create mode 100644 core/sync/sync2/sync_util.odin diff --git a/core/sync/sync2/extended.odin b/core/sync/sync2/extended.odin index 3ef4c04e5..d6a99fe04 100644 --- a/core/sync/sync2/extended.odin +++ b/core/sync/sync2/extended.odin @@ -77,31 +77,31 @@ wait_group_wait_with_timeout :: proc(wg: ^Wait_Group, duration: time.Duration) - * import "core:sync" * import "core:thread" * - * barrier := &sync.Barrier{}; + * barrier := &sync.Barrier{} * * main :: proc() { - * fmt.println("Start"); + * fmt.println("Start") * - * THREAD_COUNT :: 4; - * threads: [THREAD_COUNT]^thread.Thread; + * THREAD_COUNT :: 4 + * threads: [THREAD_COUNT]^thread.Thread * - * sync.barrier_init(barrier, THREAD_COUNT); - * defer sync.barrier_destroy(barrier); + * sync.barrier_init(barrier, THREAD_COUNT) + * defer sync.barrier_destroy(barrier) * * * for _, i in threads { * threads[i] = thread.create_and_start(proc(t: ^thread.Thread) { * // Same messages will be printed together but without any interleaving - * fmt.println("Getting ready!"); - * sync.barrier_wait(barrier); - * fmt.println("Off their marks they go!"); - * }); + * fmt.println("Getting ready!") + * sync.barrier_wait(barrier) + * fmt.println("Off their marks they go!") + * }) * } * * for t in threads { - * thread.destroy(t); // join and free thread + * thread.destroy(t) // join and free thread * } - * fmt.println("Finished"); + * fmt.println("Finished") * } * */ @@ -186,7 +186,11 @@ ticket_mutex_lock :: #force_inline proc(m: ^Ticket_Mutex) { ticket_mutex_unlock :: #force_inline proc(m: ^Ticket_Mutex) { atomic_add_relaxed(&m.serving, 1) } - +@(deferred_in=ticket_mutex_unlock) +ticket_mutex_guard :: proc(m: ^Ticket_Mutex) -> bool { + ticket_mutex_lock(m) + return true +} Benaphore :: struct { @@ -211,6 +215,12 @@ benaphore_unlock :: proc(b: ^Benaphore) { } } +@(deferred_in=benaphore_unlock) +benaphore_guard :: proc(m: ^Benaphore) -> bool { + benaphore_lock(m) + return true +} + Recursive_Benaphore :: struct { counter: int, owner: int, @@ -261,15 +271,24 @@ recursive_benaphore_unlock :: proc(b: ^Recursive_Benaphore) { // outside the lock } +@(deferred_in=recursive_benaphore_unlock) +recursive_benaphore_guard :: proc(m: ^Recursive_Benaphore) -> bool { + recursive_benaphore_lock(m) + return true +} +// Once is a data value that will perform exactly on action. +// +// A Once must not be copied after first use. Once :: struct { m: Mutex, done: bool, } +// once_do calls the procedure fn if and only if once_do is being called for the first for this instance of Once. once_do :: proc(o: ^Once, fn: proc()) { @(cold) do_slow :: proc(o: ^Once, fn: proc()) { diff --git a/core/sync/sync2/primitives_atomic.odin b/core/sync/sync2/primitives_atomic.odin index 46be038aa..a13b73a99 100644 --- a/core/sync/sync2/primitives_atomic.odin +++ b/core/sync/sync2/primitives_atomic.odin @@ -409,6 +409,14 @@ Atomic_Sema :: struct { count: int, } +atomic_sema_post :: proc(s: ^Atomic_Sema, count := 1) { + atomic_mutex_lock(&s.mutex) + defer atomic_mutex_unlock(&s.mutex) + + s.count += count + atomic_cond_signal(&s.cond) +} + atomic_sema_wait :: proc(s: ^Atomic_Sema) { atomic_mutex_lock(&s.mutex) defer atomic_mutex_unlock(&s.mutex) @@ -423,11 +431,29 @@ atomic_sema_wait :: proc(s: ^Atomic_Sema) { } } -atomic_sema_post :: proc(s: ^Atomic_Sema, count := 1) { +atomic_sema_wait_with_timeout :: proc(s: ^Atomic_Sema, duration: time.Duration) -> bool { + if duration <= 0 { + return false + } atomic_mutex_lock(&s.mutex) defer atomic_mutex_unlock(&s.mutex) + + start := time.tick_now() - s.count += count - atomic_cond_signal(&s.cond) + for s.count == 0 { + remaining := duration - time.tick_since(start) + if remaining < 0 { + return false + } + + if !atomic_cond_wait_with_timeout(&s.cond, &s.mutex, remaining) { + return false + } + } + + s.count -= 1 + if s.count > 0 { + atomic_cond_signal(&s.cond) + } + return true } - diff --git a/core/sync/sync2/sync_util.odin b/core/sync/sync2/sync_util.odin new file mode 100644 index 000000000..c367c4220 --- /dev/null +++ b/core/sync/sync2/sync_util.odin @@ -0,0 +1,80 @@ +package sync2 + + +guard :: proc{ + mutex_guard, + rw_mutex_guard, + ticket_mutex_guard, + benaphore_guard, + recursive_benaphore_guard, + atomic_mutex_guard, + atomic_recursive_mutex_guard, +} + +shared_guard :: proc{ + rw_mutex_shared_guard, + atomic_rw_mutex_shared_guard, +} + +lock :: proc{ + mutex_lock, + rw_mutex_lock, + ticket_mutex_lock, + benaphore_lock, + recursive_benaphore_lock, + atomic_mutex_lock, + atomic_recursive_mutex_lock, +} + +unlock :: proc{ + mutex_unlock, + rw_mutex_unlock, + ticket_mutex_unlock, + benaphore_unlock, + recursive_benaphore_unlock, + atomic_mutex_unlock, + atomic_recursive_mutex_unlock, +} + +try_lock :: proc{ + mutex_try_lock, + rw_mutex_try_lock, + benaphore_try_lock, + recursive_benaphore_try_lock, + atomic_mutex_try_lock, + atomic_recursive_mutex_try_lock, +} + + +wait :: proc{ + cond_wait, + sema_wait, + atomic_cond_wait, + atomic_sema_wait, + futex_wait, +} + +wait_for_timeout :: proc{ + cond_wait_with_timeout, + sema_wait_with_timeout, + atomic_cond_wait_with_timeout, + atomic_sema_wait_with_timeout, + futex_wait_with_timeout, +} + +post :: proc{ + sema_post, + atomic_sema_post, +} + +signal :: proc{ + cond_signal, + atomic_cond_signal, + futex_signal, +} + +broadcast :: proc{ + cond_broadcast, + atomic_cond_broadcast, + futex_broadcast, +} \ No newline at end of file