From 4eb4ae6305720d226f6ccd1ee8d7b18b8436ced0 Mon Sep 17 00:00:00 2001 From: gingerBill Date: Wed, 30 Mar 2022 17:42:44 +0100 Subject: [PATCH] Replace `sync` with `sync2` --- core/mem/virtual/virtual_platform.odin | 2 +- core/os/os2/process.odin | 2 +- core/sync/atomic.odin | 227 ++--- core/sync/barrier.odin | 80 -- core/sync/channel.odin | 889 ------------------ core/sync/channel_unix.odin | 16 - core/sync/channel_windows.odin | 33 - core/sync/{sync2 => }/extended.odin | 0 core/sync/{sync2 => }/futex_darwin.odin | 0 core/sync/{sync2 => }/futex_freebsd.odin | 0 core/sync/{sync2 => }/futex_linux.odin | 0 core/sync/{sync2 => }/futex_openbsd.odin | 0 core/sync/{sync2 => }/futex_windows.odin | 0 core/sync/{sync2 => }/primitives.odin | 0 core/sync/{sync2 => }/primitives_atomic.odin | 0 core/sync/{sync2 => }/primitives_darwin.odin | 0 core/sync/{sync2 => }/primitives_freebsd.odin | 0 .../sync/{sync2 => }/primitives_internal.odin | 0 core/sync/{sync2 => }/primitives_linux.odin | 0 core/sync/{sync2 => }/primitives_openbsd.odin | 0 .../sync/{sync2 => }/primitives_pthreads.odin | 0 core/sync/{sync2 => }/primitives_windows.odin | 0 core/sync/{sync2 => }/sema_internal.odin | 0 core/sync/sync.odin | 123 --- core/sync/sync2/atomic.odin | 79 -- core/sync/sync_darwin.odin | 54 -- core/sync/sync_freebsd.odin | 40 - core/sync/sync_linux.odin | 36 - core/sync/sync_openbsd.odin | 36 - core/sync/sync_unix.odin | 248 ----- core/sync/{sync2 => }/sync_util.odin | 0 core/sync/sync_windows.odin | 180 ---- core/sync/wait_group.odin | 58 -- core/thread/thread_pool.odin | 17 +- core/thread/thread_unix.odin | 2 +- core/thread/thread_windows.odin | 2 +- examples/all/all_main.odin | 2 - 37 files changed, 79 insertions(+), 2047 deletions(-) delete mode 100644 core/sync/barrier.odin delete mode 100644 core/sync/channel.odin delete mode 100644 core/sync/channel_unix.odin delete mode 100644 core/sync/channel_windows.odin rename core/sync/{sync2 => }/extended.odin (100%) rename core/sync/{sync2 => }/futex_darwin.odin (100%) rename core/sync/{sync2 => }/futex_freebsd.odin (100%) rename core/sync/{sync2 => }/futex_linux.odin (100%) rename core/sync/{sync2 => }/futex_openbsd.odin (100%) rename core/sync/{sync2 => }/futex_windows.odin (100%) rename core/sync/{sync2 => }/primitives.odin (100%) rename core/sync/{sync2 => }/primitives_atomic.odin (100%) rename core/sync/{sync2 => }/primitives_darwin.odin (100%) rename core/sync/{sync2 => }/primitives_freebsd.odin (100%) rename core/sync/{sync2 => }/primitives_internal.odin (100%) rename core/sync/{sync2 => }/primitives_linux.odin (100%) rename core/sync/{sync2 => }/primitives_openbsd.odin (100%) rename core/sync/{sync2 => }/primitives_pthreads.odin (100%) rename core/sync/{sync2 => }/primitives_windows.odin (100%) rename core/sync/{sync2 => }/sema_internal.odin (100%) delete mode 100644 core/sync/sync.odin delete mode 100644 core/sync/sync2/atomic.odin delete mode 100644 core/sync/sync_darwin.odin delete mode 100644 core/sync/sync_freebsd.odin delete mode 100644 core/sync/sync_linux.odin delete mode 100644 core/sync/sync_openbsd.odin delete mode 100644 core/sync/sync_unix.odin rename core/sync/{sync2 => }/sync_util.odin (100%) delete mode 100644 core/sync/sync_windows.odin delete mode 100644 core/sync/wait_group.odin diff --git a/core/mem/virtual/virtual_platform.odin b/core/mem/virtual/virtual_platform.odin index d707bc427..367346f63 100644 --- a/core/mem/virtual/virtual_platform.odin +++ b/core/mem/virtual/virtual_platform.odin @@ -1,7 +1,7 @@ //+private package mem_virtual -import sync "core:sync/sync2" +import "core:sync" Platform_Memory_Block :: struct { block: Memory_Block, diff --git a/core/os/os2/process.odin b/core/os/os2/process.odin index 028951fe3..7fc7a4ac0 100644 --- a/core/os/os2/process.odin +++ b/core/os/os2/process.odin @@ -1,6 +1,6 @@ package os2 -import sync "core:sync/sync2" +import "core:sync" import "core:time" import "core:runtime" diff --git a/core/sync/atomic.odin b/core/sync/atomic.odin index 21dcea178..fe19f17c8 100644 --- a/core/sync/atomic.odin +++ b/core/sync/atomic.odin @@ -1,168 +1,79 @@ -package sync +package sync2 import "core:intrinsics" -Ordering :: enum { - Relaxed, // Monotonic - Release, - Acquire, - Acquire_Release, - Sequentially_Consistent, -} +cpu_relax :: intrinsics.cpu_relax -strongest_failure_ordering_table := [Ordering]Ordering{ - .Relaxed = .Relaxed, - .Release = .Relaxed, - .Acquire = .Acquire, - .Acquire_Release = .Acquire, - .Sequentially_Consistent = .Sequentially_Consistent, -} +atomic_fence :: intrinsics.atomic_fence +atomic_fence_acquire :: intrinsics.atomic_fence_acq +atomic_fence_release :: intrinsics.atomic_fence_rel +atomic_fence_acqrel :: intrinsics.atomic_fence_acqrel -strongest_failure_ordering :: #force_inline proc(order: Ordering) -> Ordering { - return strongest_failure_ordering_table[order] -} +atomic_store :: intrinsics.atomic_store +atomic_store_release :: intrinsics.atomic_store_rel +atomic_store_relaxed :: intrinsics.atomic_store_relaxed +atomic_store_unordered :: intrinsics.atomic_store_unordered -fence :: #force_inline proc($order: Ordering) { - when order == .Relaxed { #panic("there is no such thing as a relaxed fence") } - else when order == .Release { intrinsics.atomic_fence_rel() } - else when order == .Acquire { intrinsics.atomic_fence_acq() } - else when order == .Acquire_Release { intrinsics.atomic_fence_acqrel() } - else when order == .Sequentially_Consistent { intrinsics.atomic_fence() } - else { #panic("unknown order") } -} +atomic_load :: intrinsics.atomic_load +atomic_load_acquire :: intrinsics.atomic_load_acq +atomic_load_relaxed :: intrinsics.atomic_load_relaxed +atomic_load_unordered :: intrinsics.atomic_load_unordered +atomic_add :: intrinsics.atomic_add +atomic_add_acquire :: intrinsics.atomic_add_acq +atomic_add_release :: intrinsics.atomic_add_rel +atomic_add_acqrel :: intrinsics.atomic_add_acqrel +atomic_add_relaxed :: intrinsics.atomic_add_relaxed +atomic_sub :: intrinsics.atomic_sub +atomic_sub_acquire :: intrinsics.atomic_sub_acq +atomic_sub_release :: intrinsics.atomic_sub_rel +atomic_sub_acqrel :: intrinsics.atomic_sub_acqrel +atomic_sub_relaxed :: intrinsics.atomic_sub_relaxed +atomic_and :: intrinsics.atomic_and +atomic_and_acquire :: intrinsics.atomic_and_acq +atomic_and_release :: intrinsics.atomic_and_rel +atomic_and_acqrel :: intrinsics.atomic_and_acqrel +atomic_and_relaxed :: intrinsics.atomic_and_relaxed +atomic_nand :: intrinsics.atomic_nand +atomic_nand_acquire :: intrinsics.atomic_nand_acq +atomic_nand_release :: intrinsics.atomic_nand_rel +atomic_nand_acqrel :: intrinsics.atomic_nand_acqrel +atomic_nand_relaxed :: intrinsics.atomic_nand_relaxed +atomic_or :: intrinsics.atomic_or +atomic_or_acquire :: intrinsics.atomic_or_acq +atomic_or_release :: intrinsics.atomic_or_rel +atomic_or_acqrel :: intrinsics.atomic_or_acqrel +atomic_or_relaxed :: intrinsics.atomic_or_relaxed +atomic_xor :: intrinsics.atomic_xor +atomic_xor_acquire :: intrinsics.atomic_xor_acq +atomic_xor_release :: intrinsics.atomic_xor_rel +atomic_xor_acqrel :: intrinsics.atomic_xor_acqrel +atomic_xor_relaxed :: intrinsics.atomic_xor_relaxed -atomic_store :: #force_inline proc(dst: ^$T, val: T, $order: Ordering) { - when order == .Relaxed { intrinsics.atomic_store_relaxed(dst, val) } - else when order == .Release { intrinsics.atomic_store_rel(dst, val) } - else when order == .Sequentially_Consistent { intrinsics.atomic_store(dst, val) } - else when order == .Acquire { #panic("there is not such thing as an acquire store") } - else when order == .Acquire_Release { #panic("there is not such thing as an acquire/release store") } - else { #panic("unknown order") } -} +atomic_exchange :: intrinsics.atomic_xchg +atomic_exchange_acquire :: intrinsics.atomic_xchg_acq +atomic_exchange_release :: intrinsics.atomic_xchg_rel +atomic_exchange_acqrel :: intrinsics.atomic_xchg_acqrel +atomic_exchange_relaxed :: intrinsics.atomic_xchg_relaxed -atomic_load :: #force_inline proc(dst: ^$T, $order: Ordering) -> T { - when order == .Relaxed { return intrinsics.atomic_load_relaxed(dst) } - else when order == .Acquire { return intrinsics.atomic_load_acq(dst) } - else when order == .Sequentially_Consistent { return intrinsics.atomic_load(dst) } - else when order == .Release { #panic("there is no such thing as a release load") } - else when order == .Acquire_Release { #panic("there is no such thing as an acquire/release load") } - else { #panic("unknown order") } -} - -atomic_swap :: #force_inline proc(dst: ^$T, val: T, $order: Ordering) -> T { - when order == .Relaxed { return intrinsics.atomic_xchg_relaxed(dst, val) } - else when order == .Release { return intrinsics.atomic_xchg_rel(dst, val) } - else when order == .Acquire { return intrinsics.atomic_xchg_acq(dst, val) } - else when order == .Acquire_Release { return intrinsics.atomic_xchg_acqrel(dst, val) } - else when order == .Sequentially_Consistent { return intrinsics.atomic_xchg(dst, val) } - else { #panic("unknown order") } -} - -atomic_compare_exchange :: #force_inline proc(dst: ^$T, old, new: T, $success, $failure: Ordering) -> (val: T, ok: bool) { - when failure == .Relaxed { - when success == .Relaxed { return intrinsics.atomic_cxchg_relaxed(dst, old, new) } - else when success == .Acquire { return intrinsics.atomic_cxchg_acq_failrelaxed(dst, old, new) } - else when success == .Acquire_Release { return intrinsics.atomic_cxchg_acqrel_failrelaxed(dst, old, new) } - else when success == .Sequentially_Consistent { return intrinsics.atomic_cxchg_failrelaxed(dst, old, new) } - else when success == .Release { return intrinsics.atomic_cxchg_rel(dst, old, new) } - else { #panic("an unknown ordering combination") } - } else when failure == .Acquire { - when success == .Release { return intrinsics.atomic_cxchg_acqrel(dst, old, new) } - else when success == .Acquire { return intrinsics.atomic_cxchg_acq(dst, old, new) } - else { #panic("an unknown ordering combination") } - } else when failure == .Sequentially_Consistent { - when success == .Sequentially_Consistent { return intrinsics.atomic_cxchg(dst, old, new) } - else { #panic("an unknown ordering combination") } - } else when failure == .Acquire_Release { - #panic("there is not such thing as an acquire/release failure ordering") - } else when failure == .Release { - when success == .Acquire { return instrinsics.atomic_cxchg_failacq(dst, old, new) } - else { #panic("an unknown ordering combination") } - } else { - return T{}, false - } - -} - -atomic_compare_exchange_weak :: #force_inline proc(dst: ^$T, old, new: T, $success, $failure: Ordering) -> (val: T, ok: bool) { - when failure == .Relaxed { - when success == .Relaxed { return intrinsics.atomic_cxchgweak_relaxed(dst, old, new) } - else when success == .Acquire { return intrinsics.atomic_cxchgweak_acq_failrelaxed(dst, old, new) } - else when success == .Acquire_Release { return intrinsics.atomic_cxchgweak_acqrel_failrelaxed(dst, old, new) } - else when success == .Sequentially_Consistent { return intrinsics.atomic_cxchgweak_failrelaxed(dst, old, new) } - else when success == .Release { return intrinsics.atomic_cxchgweak_rel(dst, old, new) } - else { #panic("an unknown ordering combination") } - } else when failure == .Acquire { - when success == .Release { return intrinsics.atomic_cxchgweak_acqrel(dst, old, new) } - else when success == .Acquire { return intrinsics.atomic_cxchgweak_acq(dst, old, new) } - else { #panic("an unknown ordering combination") } - } else when failure == .Sequentially_Consistent { - when success == .Sequentially_Consistent { return intrinsics.atomic_cxchgweak(dst, old, new) } - else { #panic("an unknown ordering combination") } - } else when failure == .Acquire_Release { - #panic("there is not such thing as an acquire/release failure ordering") - } else when failure == .Release { - when success == .Acquire { return intrinsics.atomic_cxchgweak_failacq(dst, old, new) } - else { #panic("an unknown ordering combination") } - } else { - return T{}, false - } - -} - - -atomic_add :: #force_inline proc(dst: ^$T, val: T, $order: Ordering) -> T { - when order == .Relaxed { return intrinsics.atomic_add_relaxed(dst, val) } - else when order == .Release { return intrinsics.atomic_add_rel(dst, val) } - else when order == .Acquire { return intrinsics.atomic_add_acq(dst, val) } - else when order == .Acquire_Release { return intrinsics.atomic_add_acqrel(dst, val) } - else when order == .Sequentially_Consistent { return intrinsics.atomic_add(dst, val) } - else { #panic("unknown order") } -} - -atomic_sub :: #force_inline proc(dst: ^$T, val: T, $order: Ordering) -> T { - when order == .Relaxed { return intrinsics.atomic_sub_relaxed(dst, val) } - else when order == .Release { return intrinsics.atomic_sub_rel(dst, val) } - else when order == .Acquire { return intrinsics.atomic_sub_acq(dst, val) } - else when order == .Acquire_Release { return intrinsics.atomic_sub_acqrel(dst, val) } - else when order == .Sequentially_Consistent { return intrinsics.atomic_sub(dst, val) } - else { #panic("unknown order") } -} - -atomic_and :: #force_inline proc(dst: ^$T, val: T, $order: Ordering) -> T { - when order == .Relaxed { return intrinsics.atomic_and_relaxed(dst, val) } - else when order == .Release { return intrinsics.atomic_and_rel(dst, val) } - else when order == .Acquire { return intrinsics.atomic_and_acq(dst, val) } - else when order == .Acquire_Release { return intrinsics.atomic_and_acqrel(dst, val) } - else when order == .Sequentially_Consistent { return intrinsics.atomic_and(dst, val) } - else { #panic("unknown order") } -} - -atomic_nand :: #force_inline proc(dst: ^$T, val: T, $order: Ordering) -> T { - when order == .Relaxed { return intrinsics.atomic_nand_relaxed(dst, val) } - else when order == .Release { return intrinsics.atomic_nand_rel(dst, val) } - else when order == .Acquire { return intrinsics.atomic_nand_acq(dst, val) } - else when order == .Acquire_Release { return intrinsics.atomic_nand_acqrel(dst, val) } - else when order == .Sequentially_Consistent { return intrinsics.atomic_nand(dst, val) } - else { #panic("unknown order") } -} - -atomic_or :: #force_inline proc(dst: ^$T, val: T, $order: Ordering) -> T { - when order == .Relaxed { return intrinsics.atomic_or_relaxed(dst, val) } - else when order == .Release { return intrinsics.atomic_or_rel(dst, val) } - else when order == .Acquire { return intrinsics.atomic_or_acq(dst, val) } - else when order == .Acquire_Release { return intrinsics.atomic_or_acqrel(dst, val) } - else when order == .Sequentially_Consistent { return intrinsics.atomic_or(dst, val) } - else { #panic("unknown order") } -} - -atomic_xor :: #force_inline proc(dst: ^$T, val: T, $order: Ordering) -> T { - when order == .Relaxed { return intrinsics.atomic_xor_relaxed(dst, val) } - else when order == .Release { return intrinsics.atomic_xor_rel(dst, val) } - else when order == .Acquire { return intrinsics.atomic_xor_acq(dst, val) } - else when order == .Acquire_Release { return intrinsics.atomic_xor_acqrel(dst, val) } - else when order == .Sequentially_Consistent { return intrinsics.atomic_xor(dst, val) } - else { #panic("unknown order") } -} +// Returns value and optional ok boolean +atomic_compare_exchange_strong :: intrinsics.atomic_cxchg +atomic_compare_exchange_strong_acquire :: intrinsics.atomic_cxchg_acq +atomic_compare_exchange_strong_release :: intrinsics.atomic_cxchg_rel +atomic_compare_exchange_strong_acqrel :: intrinsics.atomic_cxchg_acqrel +atomic_compare_exchange_strong_relaxed :: intrinsics.atomic_cxchg_relaxed +atomic_compare_exchange_strong_failrelaxed :: intrinsics.atomic_cxchg_failrelaxed +atomic_compare_exchange_strong_failacquire :: intrinsics.atomic_cxchg_failacq +atomic_compare_exchange_strong_acquire_failrelaxed :: intrinsics.atomic_cxchg_acq_failrelaxed +atomic_compare_exchange_strong_acqrel_failrelaxed :: intrinsics.atomic_cxchg_acqrel_failrelaxed +// Returns value and optional ok boolean +atomic_compare_exchange_weak :: intrinsics.atomic_cxchgweak +atomic_compare_exchange_weak_acquire :: intrinsics.atomic_cxchgweak_acq +atomic_compare_exchange_weak_release :: intrinsics.atomic_cxchgweak_rel +atomic_compare_exchange_weak_acqrel :: intrinsics.atomic_cxchgweak_acqrel +atomic_compare_exchange_weak_relaxed :: intrinsics.atomic_cxchgweak_relaxed +atomic_compare_exchange_weak_failrelaxed :: intrinsics.atomic_cxchgweak_failrelaxed +atomic_compare_exchange_weak_failacquire :: intrinsics.atomic_cxchgweak_failacq +atomic_compare_exchange_weak_acquire_failrelaxed :: intrinsics.atomic_cxchgweak_acq_failrelaxed +atomic_compare_exchange_weak_acqrel_failrelaxed :: intrinsics.atomic_cxchgweak_acqrel_failrelaxed diff --git a/core/sync/barrier.odin b/core/sync/barrier.odin deleted file mode 100644 index 13c870a3b..000000000 --- a/core/sync/barrier.odin +++ /dev/null @@ -1,80 +0,0 @@ -package sync - - -/* -A barrier enabling multiple threads to synchronize the beginning of some computation -Example: - - package example - - import "core:fmt" - import "core:sync" - import "core:thread" - - barrier := &sync.Barrier{}; - - main :: proc() { - fmt.println("Start"); - - THREAD_COUNT :: 4; - threads: [THREAD_COUNT]^thread.Thread; - - sync.barrier_init(barrier, THREAD_COUNT); - defer sync.barrier_destroy(barrier); - - - for _, i in threads { - threads[i] = thread.create_and_start(proc(t: ^thread.Thread) { - // Same messages will be printed together but without any interleaving - fmt.println("Getting ready!"); - sync.barrier_wait(barrier); - fmt.println("Off their marks they go!"); - }); - } - - for t in threads { - thread.destroy(t); // join and free thread - } - fmt.println("Finished"); - } -*/ -Barrier :: struct { - mutex: Blocking_Mutex, - cond: Condition, - index: int, - generation_id: int, - thread_count: int, -} - -barrier_init :: proc(b: ^Barrier, thread_count: int) { - blocking_mutex_init(&b.mutex) - condition_init(&b.cond, &b.mutex) - b.index = 0 - b.generation_id = 0 - b.thread_count = thread_count -} - -barrier_destroy :: proc(b: ^Barrier) { - blocking_mutex_destroy(&b.mutex) - condition_destroy(&b.cond) -} - -// Block the current thread until all threads have rendezvoused -// Barrier can be reused after all threads rendezvoused once, and can be used continuously -barrier_wait :: proc(b: ^Barrier) -> (is_leader: bool) { - blocking_mutex_lock(&b.mutex) - defer blocking_mutex_unlock(&b.mutex) - local_gen := b.generation_id - b.index += 1 - if b.index < b.thread_count { - for local_gen == b.generation_id && b.index < b.thread_count { - condition_wait_for(&b.cond) - } - return false - } - - b.index = 0 - b.generation_id += 1 - condition_broadcast(&b.cond) - return true -} diff --git a/core/sync/channel.odin b/core/sync/channel.odin deleted file mode 100644 index 82b9504f4..000000000 --- a/core/sync/channel.odin +++ /dev/null @@ -1,889 +0,0 @@ -package sync - -import "core:mem" -import "core:time" -import "core:intrinsics" -import "core:math/rand" - -_, _ :: time, rand - -Channel_Direction :: enum i8 { - Both = 0, - Send = +1, - Recv = -1, -} - -Channel :: struct($T: typeid, $Direction := Channel_Direction.Both) { - using _internal: ^Raw_Channel, -} - -channel_init :: proc(ch: ^$C/Channel($T, $D), cap := 0, allocator := context.allocator) { - context.allocator = allocator - ch._internal = raw_channel_create(size_of(T), align_of(T), cap) - return -} - -channel_make :: proc($T: typeid, cap := 0, allocator := context.allocator) -> (ch: Channel(T, .Both)) { - context.allocator = allocator - ch._internal = raw_channel_create(size_of(T), align_of(T), cap) - return -} - -channel_make_send :: proc($T: typeid, cap := 0, allocator := context.allocator) -> (ch: Channel(T, .Send)) { - context.allocator = allocator - ch._internal = raw_channel_create(size_of(T), align_of(T), cap) - return -} -channel_make_recv :: proc($T: typeid, cap := 0, allocator := context.allocator) -> (ch: Channel(T, .Recv)) { - context.allocator = allocator - ch._internal = raw_channel_create(size_of(T), align_of(T), cap) - return -} - -channel_destroy :: proc(ch: $C/Channel($T, $D)) { - raw_channel_destroy(ch._internal) -} - -channel_as_send :: proc(ch: $C/Channel($T, .Both)) -> (res: Channel(T, .Send)) { - res._internal = ch._internal - return -} - -channel_as_recv :: proc(ch: $C/Channel($T, .Both)) -> (res: Channel(T, .Recv)) { - res._internal = ch._internal - return -} - - -channel_len :: proc(ch: $C/Channel($T, $D)) -> int { - return ch._internal.len if ch._internal != nil else 0 -} -channel_cap :: proc(ch: $C/Channel($T, $D)) -> int { - return ch._internal.cap if ch._internal != nil else 0 -} - - -channel_send :: proc(ch: $C/Channel($T, $D), msg: T, loc := #caller_location) where D >= .Both { - msg := msg - _ = raw_channel_send_impl(ch._internal, &msg, /*block*/true, loc) -} -channel_try_send :: proc(ch: $C/Channel($T, $D), msg: T, loc := #caller_location) -> bool where D >= .Both { - msg := msg - return raw_channel_send_impl(ch._internal, &msg, /*block*/false, loc) -} - -channel_recv :: proc(ch: $C/Channel($T, $D), loc := #caller_location) -> (msg: T) where D <= .Both { - c := ch._internal - if c == nil { - panic(message="cannot recv message; channel is nil", loc=loc) - } - mutex_lock(&c.mutex) - raw_channel_recv_impl(c, &msg, loc) - mutex_unlock(&c.mutex) - return -} -channel_try_recv :: proc(ch: $C/Channel($T, $D), loc := #caller_location) -> (msg: T, ok: bool) where D <= .Both { - c := ch._internal - if c != nil && mutex_try_lock(&c.mutex) { - if c.len > 0 { - raw_channel_recv_impl(c, &msg, loc) - ok = true - } - mutex_unlock(&c.mutex) - } - return -} -channel_try_recv_ptr :: proc(ch: $C/Channel($T, $D), msg: ^T, loc := #caller_location) -> (ok: bool) where D <= .Both { - res: T - res, ok = channel_try_recv(ch, loc) - if ok && msg != nil { - msg^ = res - } - return -} - - -channel_is_nil :: proc(ch: $C/Channel($T, $D)) -> bool { - return ch._internal == nil -} -channel_is_open :: proc(ch: $C/Channel($T, $D)) -> bool { - c := ch._internal - return c != nil && !c.closed -} - - -channel_eq :: proc(a, b: $C/Channel($T, $D)) -> bool { - return a._internal == b._internal -} -channel_ne :: proc(a, b: $C/Channel($T, $D)) -> bool { - return a._internal != b._internal -} - - -channel_can_send :: proc(ch: $C/Channel($T, $D)) -> (ok: bool) where D >= .Both { - return raw_channel_can_send(ch._internal) -} -channel_can_recv :: proc(ch: $C/Channel($T, $D)) -> (ok: bool) where D <= .Both { - return raw_channel_can_recv(ch._internal) -} - - -channel_peek :: proc(ch: $C/Channel($T, $D)) -> int { - c := ch._internal - if c == nil { - return -1 - } - if intrinsics.atomic_load(&c.closed) { - return -1 - } - return intrinsics.atomic_load(&c.len) -} - - -channel_close :: proc(ch: $C/Channel($T, $D), loc := #caller_location) { - raw_channel_close(ch._internal, loc) -} - - -channel_iterator :: proc(ch: $C/Channel($T, $D)) -> (msg: T, ok: bool) where D <= .Both { - c := ch._internal - if c == nil { - return - } - - if !c.closed || c.len > 0 { - msg, ok = channel_recv(ch), true - } - return -} -channel_drain :: proc(ch: $C/Channel($T, $D)) where D >= .Both { - raw_channel_drain(ch._internal) -} - - -channel_move :: proc(dst: $C1/Channel($T, $D1) src: $C2/Channel(T, $D2)) where D1 <= .Both, D2 >= .Both { - for msg in channel_iterator(src) { - channel_send(dst, msg) - } -} - - -Raw_Channel_Wait_Queue :: struct { - next: ^Raw_Channel_Wait_Queue, - state: ^uintptr, -} - - -Raw_Channel :: struct { - closed: bool, - ready: bool, // ready to recv - data_offset: u16, // data is stored at the end of this data structure - elem_size: u32, - len, cap: int, - read, write: int, - mutex: Mutex, - cond: Condition, - allocator: mem.Allocator, - - sendq: ^Raw_Channel_Wait_Queue, - recvq: ^Raw_Channel_Wait_Queue, -} - -raw_channel_wait_queue_insert :: proc(head: ^^Raw_Channel_Wait_Queue, val: ^Raw_Channel_Wait_Queue) { - val.next = head^ - head^ = val -} -raw_channel_wait_queue_remove :: proc(head: ^^Raw_Channel_Wait_Queue, val: ^Raw_Channel_Wait_Queue) { - p := head - for p^ != nil && p^ != val { - p = &p^.next - } - if p != nil { - p^ = p^.next - } -} - - -raw_channel_create :: proc(elem_size, elem_align: int, cap := 0) -> ^Raw_Channel { - assert(int(u32(elem_size)) == elem_size) - - s := size_of(Raw_Channel) - s = mem.align_forward_int(s, elem_align) - data_offset := uintptr(s) - s += elem_size * max(cap, 1) - - a := max(elem_align, align_of(Raw_Channel)) - - c := (^Raw_Channel)(mem.alloc(s, a)) - if c == nil { - return nil - } - - c.data_offset = u16(data_offset) - c.elem_size = u32(elem_size) - c.len, c.cap = 0, max(cap, 0) - c.read, c.write = 0, 0 - mutex_init(&c.mutex) - condition_init(&c.cond, &c.mutex) - c.allocator = context.allocator - c.closed = false - - return c -} - - -raw_channel_destroy :: proc(c: ^Raw_Channel) { - if c == nil { - return - } - context.allocator = c.allocator - intrinsics.atomic_store(&c.closed, true) - - condition_destroy(&c.cond) - mutex_destroy(&c.mutex) - free(c) -} - -raw_channel_close :: proc(c: ^Raw_Channel, loc := #caller_location) { - if c == nil { - panic(message="cannot close nil channel", loc=loc) - } - mutex_lock(&c.mutex) - defer mutex_unlock(&c.mutex) - intrinsics.atomic_store(&c.closed, true) - - // Release readers and writers - raw_channel_wait_queue_broadcast(c.recvq) - raw_channel_wait_queue_broadcast(c.sendq) - condition_broadcast(&c.cond) -} - - - -raw_channel_send_impl :: proc(c: ^Raw_Channel, msg: rawptr, block: bool, loc := #caller_location) -> bool { - send :: proc(c: ^Raw_Channel, src: rawptr) { - data := uintptr(c) + uintptr(c.data_offset) - dst := data + uintptr(c.write * int(c.elem_size)) - mem.copy(rawptr(dst), src, int(c.elem_size)) - c.len += 1 - c.write = (c.write + 1) % max(c.cap, 1) - } - - switch { - case c == nil: - panic(message="cannot send message; channel is nil", loc=loc) - case c.closed: - panic(message="cannot send message; channel is closed", loc=loc) - } - - mutex_lock(&c.mutex) - defer mutex_unlock(&c.mutex) - - if c.cap > 0 { - if !block && c.len >= c.cap { - return false - } - - for c.len >= c.cap { - condition_wait_for(&c.cond) - } - } else if c.len > 0 { // TODO(bill): determine correct behaviour - if !block { - return false - } - condition_wait_for(&c.cond) - } else if c.len == 0 && !block { - return false - } - - send(c, msg) - condition_signal(&c.cond) - raw_channel_wait_queue_signal(c.recvq) - - return true -} - -raw_channel_recv_impl :: proc(c: ^Raw_Channel, res: rawptr, loc := #caller_location) { - recv :: proc(c: ^Raw_Channel, dst: rawptr, loc := #caller_location) { - if c.len < 1 { - panic(message="cannot recv message; channel is empty", loc=loc) - } - c.len -= 1 - - data := uintptr(c) + uintptr(c.data_offset) - src := data + uintptr(c.read * int(c.elem_size)) - mem.copy(dst, rawptr(src), int(c.elem_size)) - c.read = (c.read + 1) % max(c.cap, 1) - } - - if c == nil { - panic(message="cannot recv message; channel is nil", loc=loc) - } - intrinsics.atomic_store(&c.ready, true) - for c.len < 1 { - raw_channel_wait_queue_signal(c.sendq) - condition_wait_for(&c.cond) - } - intrinsics.atomic_store(&c.ready, false) - recv(c, res, loc) - if c.cap > 0 { - if c.len == c.cap - 1 { - // NOTE(bill): Only signal on the last one - condition_signal(&c.cond) - } - } else { - condition_signal(&c.cond) - } -} - - -raw_channel_can_send :: proc(c: ^Raw_Channel) -> (ok: bool) { - if c == nil { - return false - } - mutex_lock(&c.mutex) - switch { - case c.closed: - ok = false - case c.cap > 0: - ok = c.ready && c.len < c.cap - case: - ok = c.ready && c.len == 0 - } - mutex_unlock(&c.mutex) - return -} -raw_channel_can_recv :: proc(c: ^Raw_Channel) -> (ok: bool) { - if c == nil { - return false - } - mutex_lock(&c.mutex) - ok = c.len > 0 - mutex_unlock(&c.mutex) - return -} - - -raw_channel_drain :: proc(c: ^Raw_Channel) { - if c == nil { - return - } - mutex_lock(&c.mutex) - c.len = 0 - c.read = 0 - c.write = 0 - mutex_unlock(&c.mutex) -} - - - -MAX_SELECT_CHANNELS :: 64 -SELECT_MAX_TIMEOUT :: max(time.Duration) - -Select_Command :: enum { - Recv, - Send, -} - -Select_Channel :: struct { - channel: ^Raw_Channel, - command: Select_Command, -} - - - -select :: proc(channels: ..Select_Channel) -> (index: int) { - return select_timeout(SELECT_MAX_TIMEOUT, ..channels) -} -select_timeout :: proc(timeout: time.Duration, channels: ..Select_Channel) -> (index: int) { - switch len(channels) { - case 0: - panic("sync: select with no channels") - } - - assert(len(channels) <= MAX_SELECT_CHANNELS) - - backing: [MAX_SELECT_CHANNELS]int - queues: [MAX_SELECT_CHANNELS]Raw_Channel_Wait_Queue - candidates := backing[:] - cap := len(channels) - candidates = candidates[:cap] - - count := u32(0) - for c, i in channels { - if c.channel == nil { - continue - } - switch c.command { - case .Recv: - if raw_channel_can_recv(c.channel) { - candidates[count] = i - count += 1 - } - case .Send: - if raw_channel_can_send(c.channel) { - candidates[count] = i - count += 1 - } - } - } - - if count == 0 { - wait_state: uintptr = 0 - for _, i in channels { - q := &queues[i] - q.state = &wait_state - } - - for c, i in channels { - if c.channel == nil { - continue - } - q := &queues[i] - switch c.command { - case .Recv: raw_channel_wait_queue_insert(&c.channel.recvq, q) - case .Send: raw_channel_wait_queue_insert(&c.channel.sendq, q) - } - } - raw_channel_wait_queue_wait_on(&wait_state, timeout) - for c, i in channels { - if c.channel == nil { - continue - } - q := &queues[i] - switch c.command { - case .Recv: raw_channel_wait_queue_remove(&c.channel.recvq, q) - case .Send: raw_channel_wait_queue_remove(&c.channel.sendq, q) - } - } - - for c, i in channels { - switch c.command { - case .Recv: - if raw_channel_can_recv(c.channel) { - candidates[count] = i - count += 1 - } - case .Send: - if raw_channel_can_send(c.channel) { - candidates[count] = i - count += 1 - } - } - } - if count == 0 && timeout == SELECT_MAX_TIMEOUT { - index = -1 - return - } - - assert(count != 0) - } - - t := time.now() - r := rand.create(transmute(u64)t) - i := rand.uint32(&r) - - index = candidates[i % count] - return -} - -select_recv :: proc(channels: ..^Raw_Channel) -> (index: int) { - switch len(channels) { - case 0: - panic("sync: select with no channels") - } - - assert(len(channels) <= MAX_SELECT_CHANNELS) - - backing: [MAX_SELECT_CHANNELS]int - queues: [MAX_SELECT_CHANNELS]Raw_Channel_Wait_Queue - candidates := backing[:] - cap := len(channels) - candidates = candidates[:cap] - - count := u32(0) - for c, i in channels { - if raw_channel_can_recv(c) { - candidates[count] = i - count += 1 - } - } - - if count == 0 { - state: uintptr - for c, i in channels { - q := &queues[i] - q.state = &state - raw_channel_wait_queue_insert(&c.recvq, q) - } - raw_channel_wait_queue_wait_on(&state, SELECT_MAX_TIMEOUT) - for c, i in channels { - q := &queues[i] - raw_channel_wait_queue_remove(&c.recvq, q) - } - - for c, i in channels { - if raw_channel_can_recv(c) { - candidates[count] = i - count += 1 - } - } - assert(count != 0) - } - - t := time.now() - r := rand.create(transmute(u64)t) - i := rand.uint32(&r) - - index = candidates[i % count] - return -} - -select_recv_msg :: proc(channels: ..$C/Channel($T, $D)) -> (msg: T, index: int) { - switch len(channels) { - case 0: - panic("sync: select with no channels") - } - - assert(len(channels) <= MAX_SELECT_CHANNELS) - - queues: [MAX_SELECT_CHANNELS]Raw_Channel_Wait_Queue - candidates: [MAX_SELECT_CHANNELS]int - - count := u32(0) - for c, i in channels { - if raw_channel_can_recv(c) { - candidates[count] = i - count += 1 - } - } - - if count == 0 { - state: uintptr - for c, i in channels { - q := &queues[i] - q.state = &state - raw_channel_wait_queue_insert(&c.recvq, q) - } - raw_channel_wait_queue_wait_on(&state, SELECT_MAX_TIMEOUT) - for c, i in channels { - q := &queues[i] - raw_channel_wait_queue_remove(&c.recvq, q) - } - - for c, i in channels { - if raw_channel_can_recv(c) { - candidates[count] = i - count += 1 - } - } - assert(count != 0) - } - - t := time.now() - r := rand.create(transmute(u64)t) - i := rand.uint32(&r) - - index = candidates[i % count] - msg = channel_recv(channels[index]) - - return -} - -select_send_msg :: proc(msg: $T, channels: ..$C/Channel(T, $D)) -> (index: int) { - switch len(channels) { - case 0: - panic("sync: select with no channels") - } - - assert(len(channels) <= MAX_SELECT_CHANNELS) - - backing: [MAX_SELECT_CHANNELS]int - queues: [MAX_SELECT_CHANNELS]Raw_Channel_Wait_Queue - candidates := backing[:] - cap := len(channels) - candidates = candidates[:cap] - - count := u32(0) - for c, i in channels { - if raw_channel_can_recv(c) { - candidates[count] = i - count += 1 - } - } - - if count == 0 { - state: uintptr - for c, i in channels { - q := &queues[i] - q.state = &state - raw_channel_wait_queue_insert(&c.recvq, q) - } - raw_channel_wait_queue_wait_on(&state, SELECT_MAX_TIMEOUT) - for c, i in channels { - q := &queues[i] - raw_channel_wait_queue_remove(&c.recvq, q) - } - - for c, i in channels { - if raw_channel_can_recv(c) { - candidates[count] = i - count += 1 - } - } - assert(count != 0) - } - - t := time.now() - r := rand.create(transmute(u64)t) - i := rand.uint32(&r) - - index = candidates[i % count] - - if msg != nil { - channel_send(channels[index], msg) - } - - return -} - -select_send :: proc(channels: ..^Raw_Channel) -> (index: int) { - switch len(channels) { - case 0: - panic("sync: select with no channels") - } - - assert(len(channels) <= MAX_SELECT_CHANNELS) - candidates: [MAX_SELECT_CHANNELS]int - queues: [MAX_SELECT_CHANNELS]Raw_Channel_Wait_Queue - - count := u32(0) - for c, i in channels { - if raw_channel_can_send(c) { - candidates[count] = i - count += 1 - } - } - - if count == 0 { - state: uintptr - for c, i in channels { - q := &queues[i] - q.state = &state - raw_channel_wait_queue_insert(&c.sendq, q) - } - raw_channel_wait_queue_wait_on(&state, SELECT_MAX_TIMEOUT) - for c, i in channels { - q := &queues[i] - raw_channel_wait_queue_remove(&c.sendq, q) - } - - for c, i in channels { - if raw_channel_can_send(c) { - candidates[count] = i - count += 1 - } - } - assert(count != 0) - } - - t := time.now() - r := rand.create(transmute(u64)t) - i := rand.uint32(&r) - - index = candidates[i % count] - return -} - -select_try :: proc(channels: ..Select_Channel) -> (index: int) { - switch len(channels) { - case 0: - panic("sync: select with no channels") - } - - assert(len(channels) <= MAX_SELECT_CHANNELS) - - backing: [MAX_SELECT_CHANNELS]int - candidates := backing[:] - cap := len(channels) - candidates = candidates[:cap] - - count := u32(0) - for c, i in channels { - switch c.command { - case .Recv: - if raw_channel_can_recv(c.channel) { - candidates[count] = i - count += 1 - } - case .Send: - if raw_channel_can_send(c.channel) { - candidates[count] = i - count += 1 - } - } - } - - if count == 0 { - index = -1 - return - } - - t := time.now() - r := rand.create(transmute(u64)t) - i := rand.uint32(&r) - - index = candidates[i % count] - return -} - - -select_try_recv :: proc(channels: ..^Raw_Channel) -> (index: int) { - switch len(channels) { - case 0: - index = -1 - return - case 1: - index = -1 - if raw_channel_can_recv(channels[0]) { - index = 0 - } - return - } - - assert(len(channels) <= MAX_SELECT_CHANNELS) - candidates: [MAX_SELECT_CHANNELS]int - - count := u32(0) - for c, i in channels { - if raw_channel_can_recv(c) { - candidates[count] = i - count += 1 - } - } - - if count == 0 { - index = -1 - return - } - - t := time.now() - r := rand.create(transmute(u64)t) - i := rand.uint32(&r) - - index = candidates[i % count] - return -} - - -select_try_send :: proc(channels: ..^Raw_Channel) -> (index: int) #no_bounds_check { - switch len(channels) { - case 0: - return -1 - case 1: - if raw_channel_can_send(channels[0]) { - return 0 - } - return -1 - } - - assert(len(channels) <= MAX_SELECT_CHANNELS) - candidates: [MAX_SELECT_CHANNELS]int - - count := u32(0) - for c, i in channels { - if raw_channel_can_send(c) { - candidates[count] = i - count += 1 - } - } - - if count == 0 { - index = -1 - return - } - - t := time.now() - r := rand.create(transmute(u64)t) - i := rand.uint32(&r) - - index = candidates[i % count] - return -} - -select_try_recv_msg :: proc(channels: ..$C/Channel($T, $D)) -> (msg: T, index: int) { - switch len(channels) { - case 0: - index = -1 - return - case 1: - ok: bool - if msg, ok = channel_try_recv(channels[0]); ok { - index = 0 - } - return - } - - assert(len(channels) <= MAX_SELECT_CHANNELS) - candidates: [MAX_SELECT_CHANNELS]int - - count := u32(0) - for c, i in channels { - if channel_can_recv(c) { - candidates[count] = i - count += 1 - } - } - - if count == 0 { - index = -1 - return - } - - t := time.now() - r := rand.create(transmute(u64)t) - i := rand.uint32(&r) - - index = candidates[i % count] - msg = channel_recv(channels[index]) - return -} - -select_try_send_msg :: proc(msg: $T, channels: ..$C/Channel(T, $D)) -> (index: int) { - index = -1 - switch len(channels) { - case 0: - return - case 1: - if channel_try_send(channels[0], msg) { - index = 0 - } - return - } - - - assert(len(channels) <= MAX_SELECT_CHANNELS) - candidates: [MAX_SELECT_CHANNELS]int - - count := u32(0) - for c, i in channels { - if raw_channel_can_send(c) { - candidates[count] = i - count += 1 - } - } - - if count == 0 { - index = -1 - return - } - - t := time.now() - r := rand.create(transmute(u64)t) - i := rand.uint32(&r) - - index = candidates[i % count] - channel_send(channels[index], msg) - return -} - diff --git a/core/sync/channel_unix.odin b/core/sync/channel_unix.odin deleted file mode 100644 index 47aa46004..000000000 --- a/core/sync/channel_unix.odin +++ /dev/null @@ -1,16 +0,0 @@ -// +build linux, darwin, freebsd, openbsd -package sync - -import "core:time" - -raw_channel_wait_queue_wait_on :: proc(state: ^uintptr, timeout: time.Duration) { - // stub -} - -raw_channel_wait_queue_signal :: proc(q: ^Raw_Channel_Wait_Queue) { - // stub -} - -raw_channel_wait_queue_broadcast :: proc(q: ^Raw_Channel_Wait_Queue) { - // stub -} diff --git a/core/sync/channel_windows.odin b/core/sync/channel_windows.odin deleted file mode 100644 index 5d469ffff..000000000 --- a/core/sync/channel_windows.odin +++ /dev/null @@ -1,33 +0,0 @@ -package sync - -import "core:intrinsics" -import win32 "core:sys/windows" -import "core:time" - -raw_channel_wait_queue_wait_on :: proc(state: ^uintptr, timeout: time.Duration) { - ms: win32.DWORD = win32.INFINITE - if max(time.Duration) != SELECT_MAX_TIMEOUT { - ms = win32.DWORD((max(time.duration_nanoseconds(timeout), 0) + 999999)/1000000) - } - - v := intrinsics.atomic_load(state) - for v == 0 { - win32.WaitOnAddress(state, &v, size_of(state^), ms) - v = intrinsics.atomic_load(state) - } - intrinsics.atomic_store(state, 0) -} - -raw_channel_wait_queue_signal :: proc(q: ^Raw_Channel_Wait_Queue) { - for x := q; x != nil; x = x.next { - intrinsics.atomic_add(x.state, 1) - win32.WakeByAddressSingle(x.state) - } -} - -raw_channel_wait_queue_broadcast :: proc(q: ^Raw_Channel_Wait_Queue) { - for x := q; x != nil; x = x.next { - intrinsics.atomic_add(x.state, 1) - win32.WakeByAddressAll(x.state) - } -} diff --git a/core/sync/sync2/extended.odin b/core/sync/extended.odin similarity index 100% rename from core/sync/sync2/extended.odin rename to core/sync/extended.odin diff --git a/core/sync/sync2/futex_darwin.odin b/core/sync/futex_darwin.odin similarity index 100% rename from core/sync/sync2/futex_darwin.odin rename to core/sync/futex_darwin.odin diff --git a/core/sync/sync2/futex_freebsd.odin b/core/sync/futex_freebsd.odin similarity index 100% rename from core/sync/sync2/futex_freebsd.odin rename to core/sync/futex_freebsd.odin diff --git a/core/sync/sync2/futex_linux.odin b/core/sync/futex_linux.odin similarity index 100% rename from core/sync/sync2/futex_linux.odin rename to core/sync/futex_linux.odin diff --git a/core/sync/sync2/futex_openbsd.odin b/core/sync/futex_openbsd.odin similarity index 100% rename from core/sync/sync2/futex_openbsd.odin rename to core/sync/futex_openbsd.odin diff --git a/core/sync/sync2/futex_windows.odin b/core/sync/futex_windows.odin similarity index 100% rename from core/sync/sync2/futex_windows.odin rename to core/sync/futex_windows.odin diff --git a/core/sync/sync2/primitives.odin b/core/sync/primitives.odin similarity index 100% rename from core/sync/sync2/primitives.odin rename to core/sync/primitives.odin diff --git a/core/sync/sync2/primitives_atomic.odin b/core/sync/primitives_atomic.odin similarity index 100% rename from core/sync/sync2/primitives_atomic.odin rename to core/sync/primitives_atomic.odin diff --git a/core/sync/sync2/primitives_darwin.odin b/core/sync/primitives_darwin.odin similarity index 100% rename from core/sync/sync2/primitives_darwin.odin rename to core/sync/primitives_darwin.odin diff --git a/core/sync/sync2/primitives_freebsd.odin b/core/sync/primitives_freebsd.odin similarity index 100% rename from core/sync/sync2/primitives_freebsd.odin rename to core/sync/primitives_freebsd.odin diff --git a/core/sync/sync2/primitives_internal.odin b/core/sync/primitives_internal.odin similarity index 100% rename from core/sync/sync2/primitives_internal.odin rename to core/sync/primitives_internal.odin diff --git a/core/sync/sync2/primitives_linux.odin b/core/sync/primitives_linux.odin similarity index 100% rename from core/sync/sync2/primitives_linux.odin rename to core/sync/primitives_linux.odin diff --git a/core/sync/sync2/primitives_openbsd.odin b/core/sync/primitives_openbsd.odin similarity index 100% rename from core/sync/sync2/primitives_openbsd.odin rename to core/sync/primitives_openbsd.odin diff --git a/core/sync/sync2/primitives_pthreads.odin b/core/sync/primitives_pthreads.odin similarity index 100% rename from core/sync/sync2/primitives_pthreads.odin rename to core/sync/primitives_pthreads.odin diff --git a/core/sync/sync2/primitives_windows.odin b/core/sync/primitives_windows.odin similarity index 100% rename from core/sync/sync2/primitives_windows.odin rename to core/sync/primitives_windows.odin diff --git a/core/sync/sync2/sema_internal.odin b/core/sync/sema_internal.odin similarity index 100% rename from core/sync/sync2/sema_internal.odin rename to core/sync/sema_internal.odin diff --git a/core/sync/sync.odin b/core/sync/sync.odin deleted file mode 100644 index 05c86a868..000000000 --- a/core/sync/sync.odin +++ /dev/null @@ -1,123 +0,0 @@ -package sync - -import "core:intrinsics" - -cpu_relax :: #force_inline proc "contextless" () { - intrinsics.cpu_relax() -} - -Condition_Mutex_Ptr :: union{^Mutex, ^Blocking_Mutex} - - -Ticket_Mutex :: struct { - ticket: u64, - serving: u64, -} - -ticket_mutex_init :: proc(m: ^Ticket_Mutex) { - atomic_store(&m.ticket, 0, .Relaxed) - atomic_store(&m.serving, 0, .Relaxed) -} - -ticket_mutex_lock :: #force_inline proc(m: ^Ticket_Mutex) { - ticket := atomic_add(&m.ticket, 1, .Relaxed) - for ticket != atomic_load(&m.serving, .Acquire) { - intrinsics.cpu_relax() - } -} - -ticket_mutex_unlock :: #force_inline proc(m: ^Ticket_Mutex) { - atomic_add(&m.serving, 1, .Relaxed) -} - - -Benaphore :: struct { - counter: int, - sema: Semaphore, -} - -benaphore_init :: proc(b: ^Benaphore) { - intrinsics.atomic_store(&b.counter, 0) - semaphore_init(&b.sema) -} - -benaphore_destroy :: proc(b: ^Benaphore) { - semaphore_destroy(&b.sema) -} - -benaphore_lock :: proc(b: ^Benaphore) { - if intrinsics.atomic_add_acq(&b.counter, 1) > 1 { - semaphore_wait_for(&b.sema) - } -} - -benaphore_try_lock :: proc(b: ^Benaphore) -> bool { - v, _ := intrinsics.atomic_cxchg_acq(&b.counter, 1, 0) - return v == 0 -} - -benaphore_unlock :: proc(b: ^Benaphore) { - if intrinsics.atomic_sub_rel(&b.counter, 1) > 0 { - semaphore_post(&b.sema) - } -} - -Recursive_Benaphore :: struct { - counter: int, - owner: int, - recursion: int, - sema: Semaphore, -} - -recursive_benaphore_init :: proc(b: ^Recursive_Benaphore) { - intrinsics.atomic_store(&b.counter, 0) - semaphore_init(&b.sema) -} - -recursive_benaphore_destroy :: proc(b: ^Recursive_Benaphore) { - semaphore_destroy(&b.sema) -} - -recursive_benaphore_lock :: proc(b: ^Recursive_Benaphore) { - tid := current_thread_id() - if intrinsics.atomic_add_acq(&b.counter, 1) > 1 { - if tid != b.owner { - semaphore_wait_for(&b.sema) - } - } - // inside the lock - b.owner = tid - b.recursion += 1 -} - -recursive_benaphore_try_lock :: proc(b: ^Recursive_Benaphore) -> bool { - tid := current_thread_id() - if b.owner == tid { - intrinsics.atomic_add_acq(&b.counter, 1) - } else { - v, _ := intrinsics.atomic_cxchg_acq(&b.counter, 1, 0) - if v != 0 { - return false - } - // inside the lock - b.owner = tid - } - b.recursion += 1 - return true -} - -recursive_benaphore_unlock :: proc(b: ^Recursive_Benaphore) { - tid := current_thread_id() - assert(tid == b.owner) - b.recursion -= 1 - recursion := b.recursion - if recursion == 0 { - b.owner = 0 - } - if intrinsics.atomic_sub_rel(&b.counter, 1) > 0 { - if recursion == 0 { - semaphore_post(&b.sema) - } - } - // outside the lock -} diff --git a/core/sync/sync2/atomic.odin b/core/sync/sync2/atomic.odin deleted file mode 100644 index fe19f17c8..000000000 --- a/core/sync/sync2/atomic.odin +++ /dev/null @@ -1,79 +0,0 @@ -package sync2 - -import "core:intrinsics" - -cpu_relax :: intrinsics.cpu_relax - -atomic_fence :: intrinsics.atomic_fence -atomic_fence_acquire :: intrinsics.atomic_fence_acq -atomic_fence_release :: intrinsics.atomic_fence_rel -atomic_fence_acqrel :: intrinsics.atomic_fence_acqrel - -atomic_store :: intrinsics.atomic_store -atomic_store_release :: intrinsics.atomic_store_rel -atomic_store_relaxed :: intrinsics.atomic_store_relaxed -atomic_store_unordered :: intrinsics.atomic_store_unordered - -atomic_load :: intrinsics.atomic_load -atomic_load_acquire :: intrinsics.atomic_load_acq -atomic_load_relaxed :: intrinsics.atomic_load_relaxed -atomic_load_unordered :: intrinsics.atomic_load_unordered - -atomic_add :: intrinsics.atomic_add -atomic_add_acquire :: intrinsics.atomic_add_acq -atomic_add_release :: intrinsics.atomic_add_rel -atomic_add_acqrel :: intrinsics.atomic_add_acqrel -atomic_add_relaxed :: intrinsics.atomic_add_relaxed -atomic_sub :: intrinsics.atomic_sub -atomic_sub_acquire :: intrinsics.atomic_sub_acq -atomic_sub_release :: intrinsics.atomic_sub_rel -atomic_sub_acqrel :: intrinsics.atomic_sub_acqrel -atomic_sub_relaxed :: intrinsics.atomic_sub_relaxed -atomic_and :: intrinsics.atomic_and -atomic_and_acquire :: intrinsics.atomic_and_acq -atomic_and_release :: intrinsics.atomic_and_rel -atomic_and_acqrel :: intrinsics.atomic_and_acqrel -atomic_and_relaxed :: intrinsics.atomic_and_relaxed -atomic_nand :: intrinsics.atomic_nand -atomic_nand_acquire :: intrinsics.atomic_nand_acq -atomic_nand_release :: intrinsics.atomic_nand_rel -atomic_nand_acqrel :: intrinsics.atomic_nand_acqrel -atomic_nand_relaxed :: intrinsics.atomic_nand_relaxed -atomic_or :: intrinsics.atomic_or -atomic_or_acquire :: intrinsics.atomic_or_acq -atomic_or_release :: intrinsics.atomic_or_rel -atomic_or_acqrel :: intrinsics.atomic_or_acqrel -atomic_or_relaxed :: intrinsics.atomic_or_relaxed -atomic_xor :: intrinsics.atomic_xor -atomic_xor_acquire :: intrinsics.atomic_xor_acq -atomic_xor_release :: intrinsics.atomic_xor_rel -atomic_xor_acqrel :: intrinsics.atomic_xor_acqrel -atomic_xor_relaxed :: intrinsics.atomic_xor_relaxed - -atomic_exchange :: intrinsics.atomic_xchg -atomic_exchange_acquire :: intrinsics.atomic_xchg_acq -atomic_exchange_release :: intrinsics.atomic_xchg_rel -atomic_exchange_acqrel :: intrinsics.atomic_xchg_acqrel -atomic_exchange_relaxed :: intrinsics.atomic_xchg_relaxed - -// Returns value and optional ok boolean -atomic_compare_exchange_strong :: intrinsics.atomic_cxchg -atomic_compare_exchange_strong_acquire :: intrinsics.atomic_cxchg_acq -atomic_compare_exchange_strong_release :: intrinsics.atomic_cxchg_rel -atomic_compare_exchange_strong_acqrel :: intrinsics.atomic_cxchg_acqrel -atomic_compare_exchange_strong_relaxed :: intrinsics.atomic_cxchg_relaxed -atomic_compare_exchange_strong_failrelaxed :: intrinsics.atomic_cxchg_failrelaxed -atomic_compare_exchange_strong_failacquire :: intrinsics.atomic_cxchg_failacq -atomic_compare_exchange_strong_acquire_failrelaxed :: intrinsics.atomic_cxchg_acq_failrelaxed -atomic_compare_exchange_strong_acqrel_failrelaxed :: intrinsics.atomic_cxchg_acqrel_failrelaxed - -// Returns value and optional ok boolean -atomic_compare_exchange_weak :: intrinsics.atomic_cxchgweak -atomic_compare_exchange_weak_acquire :: intrinsics.atomic_cxchgweak_acq -atomic_compare_exchange_weak_release :: intrinsics.atomic_cxchgweak_rel -atomic_compare_exchange_weak_acqrel :: intrinsics.atomic_cxchgweak_acqrel -atomic_compare_exchange_weak_relaxed :: intrinsics.atomic_cxchgweak_relaxed -atomic_compare_exchange_weak_failrelaxed :: intrinsics.atomic_cxchgweak_failrelaxed -atomic_compare_exchange_weak_failacquire :: intrinsics.atomic_cxchgweak_failacq -atomic_compare_exchange_weak_acquire_failrelaxed :: intrinsics.atomic_cxchgweak_acq_failrelaxed -atomic_compare_exchange_weak_acqrel_failrelaxed :: intrinsics.atomic_cxchgweak_acqrel_failrelaxed diff --git a/core/sync/sync_darwin.odin b/core/sync/sync_darwin.odin deleted file mode 100644 index f3bb4d5a3..000000000 --- a/core/sync/sync_darwin.odin +++ /dev/null @@ -1,54 +0,0 @@ -package sync - -import "core:sys/darwin" - -import "core:c" - -foreign import pthread "System.framework" - -current_thread_id :: proc "contextless" () -> int { - tid: u64 - // NOTE(Oskar): available from OSX 10.6 and iOS 3.2. - // For older versions there is `syscall(SYS_thread_selfid)`, but not really - // the same thing apparently. - foreign pthread { pthread_threadid_np :: proc "c" (rawptr, ^u64) -> c.int --- } - pthread_threadid_np(nil, &tid) - return int(tid) -} - - -// The Darwin docs say it best: -// A semaphore is much like a lock, except that a finite number of threads can hold it simultaneously. -// Semaphores can be thought of as being much like piles of tokens; multiple threads can take these tokens, -// but when there are none left, a thread must wait until another thread returns one. -Semaphore :: struct #align 16 { - handle: darwin.semaphore_t, -} -// TODO(tetra): Only marked with alignment because we cannot mark distinct integers with alignments. -// See core/sys/unix/pthread_linux.odin/pthread_t. - -semaphore_init :: proc(s: ^Semaphore, initial_count := 0) { - ct := darwin.mach_task_self() - res := darwin.semaphore_create(ct, &s.handle, 0, c.int(initial_count)) - assert(res == 0) -} - -semaphore_destroy :: proc(s: ^Semaphore) { - ct := darwin.mach_task_self() - res := darwin.semaphore_destroy(ct, s.handle) - assert(res == 0) - s.handle = {} -} - -semaphore_post :: proc(s: ^Semaphore, count := 1) { - // NOTE: SPEED: If there's one syscall to do this, we should use it instead of the loop. - for in 0.. int { - SYS_GETTID :: 186 - return int(intrinsics.syscall(SYS_GETTID)) -} - - -// The Darwin docs say it best: -// A semaphore is much like a lock, except that a finite number of threads can hold it simultaneously. -// Semaphores can be thought of as being much like piles of tokens; multiple threads can take these tokens, -// but when there are none left, a thread must wait until another thread returns one. -Semaphore :: struct #align 16 { - handle: unix.sem_t, -} - -semaphore_init :: proc(s: ^Semaphore, initial_count := 0) { - assert(unix.sem_init(&s.handle, 0, u32(initial_count)) == 0) -} - -semaphore_destroy :: proc(s: ^Semaphore) { - assert(unix.sem_destroy(&s.handle) == 0) - s.handle = {} -} - -semaphore_post :: proc(s: ^Semaphore, count := 1) { - // NOTE: SPEED: If there's one syscall to do this, we should use it instead of the loop. - for in 0.. int { - return unix.sys_gettid() -} - - -// The Darwin docs say it best: -// A semaphore is much like a lock, except that a finite number of threads can hold it simultaneously. -// Semaphores can be thought of as being much like piles of tokens; multiple threads can take these tokens, -// but when there are none left, a thread must wait until another thread returns one. -Semaphore :: struct #align 16 { - handle: unix.sem_t, -} - -semaphore_init :: proc(s: ^Semaphore, initial_count := 0) { - assert(unix.sem_init(&s.handle, 0, u32(initial_count)) == 0) -} - -semaphore_destroy :: proc(s: ^Semaphore) { - assert(unix.sem_destroy(&s.handle) == 0) - s.handle = {} -} - -semaphore_post :: proc(s: ^Semaphore, count := 1) { - // NOTE: SPEED: If there's one syscall to do this, we should use it instead of the loop. - for in 0.. int { - return os.current_thread_id() -} - -// The Darwin docs say it best: -// A semaphore is much like a lock, except that a finite number of threads can hold it simultaneously. -// Semaphores can be thought of as being much like piles of tokens; multiple threads can take these tokens, -// but when there are none left, a thread must wait until another thread returns one. -Semaphore :: struct #align 16 { - handle: unix.sem_t, -} - -semaphore_init :: proc(s: ^Semaphore, initial_count := 0) { - assert(unix.sem_init(&s.handle, 0, u32(initial_count)) == 0) -} - -semaphore_destroy :: proc(s: ^Semaphore) { - assert(unix.sem_destroy(&s.handle) == 0) - s.handle = {} -} - -semaphore_post :: proc(s: ^Semaphore, count := 1) { - // NOTE: SPEED: If there's one syscall to do this, we should use it instead of the loop. - for in 0.. bool { - return unix.pthread_mutex_trylock(&m.handle) == 0 -} - -mutex_unlock :: proc(m: ^Mutex) { - assert(unix.pthread_mutex_unlock(&m.handle) == 0) -} - - -Blocking_Mutex :: struct { - handle: unix.pthread_mutex_t, -} - - -blocking_mutex_init :: proc(m: ^Blocking_Mutex) { - // NOTE(tetra, 2019-11-01): POSIX OOM if we cannot init the attrs or the mutex. - attrs: unix.pthread_mutexattr_t - assert(unix.pthread_mutexattr_init(&attrs) == 0) - defer unix.pthread_mutexattr_destroy(&attrs) // ignores destruction error - - assert(unix.pthread_mutex_init(&m.handle, &attrs) == 0) -} - -blocking_mutex_destroy :: proc(m: ^Blocking_Mutex) { - assert(unix.pthread_mutex_destroy(&m.handle) == 0) - m.handle = {} -} - -blocking_mutex_lock :: proc(m: ^Blocking_Mutex) { - assert(unix.pthread_mutex_lock(&m.handle) == 0) -} - -// Returns false if someone else holds the lock. -blocking_mutex_try_lock :: proc(m: ^Blocking_Mutex) -> bool { - return unix.pthread_mutex_trylock(&m.handle) == 0 -} - -blocking_mutex_unlock :: proc(m: ^Blocking_Mutex) { - assert(unix.pthread_mutex_unlock(&m.handle) == 0) -} - - - -// Blocks until signalled, and then lets past exactly -// one thread. -Condition :: struct { - handle: unix.pthread_cond_t, - mutex: Condition_Mutex_Ptr, - - // NOTE(tetra, 2019-11-11): Used to mimic the more sane behavior of Windows' AutoResetEvent. - // This means that you may signal the condition before anyone is waiting to cause the - // next thread that tries to wait to just pass by uninterrupted, without sleeping. - // Without this, signalling a condition will only wake up a thread which is already waiting, - // but not one that is about to wait, which can cause your program to become out of sync in - // ways that are hard to debug or fix. - flag: bool, // atomically mutated -} - -condition_init :: proc(c: ^Condition, mutex: Condition_Mutex_Ptr) -> bool { - // NOTE(tetra, 2019-11-01): POSIX OOM if we cannot init the attrs or the condition. - attrs: unix.pthread_condattr_t - if unix.pthread_condattr_init(&attrs) != 0 { - return false - } - defer unix.pthread_condattr_destroy(&attrs) // ignores destruction error - - c.flag = false - c.mutex = mutex - return unix.pthread_cond_init(&c.handle, &attrs) == 0 -} - -condition_destroy :: proc(c: ^Condition) { - assert(unix.pthread_cond_destroy(&c.handle) == 0) - c.handle = {} -} - -// Awaken exactly one thread who is waiting on the condition -condition_signal :: proc(c: ^Condition) -> bool { - switch m in c.mutex { - case ^Mutex: - mutex_lock(m) - defer mutex_unlock(m) - atomic_swap(&c.flag, true, .Sequentially_Consistent) - return unix.pthread_cond_signal(&c.handle) == 0 - case ^Blocking_Mutex: - blocking_mutex_lock(m) - defer blocking_mutex_unlock(m) - atomic_swap(&c.flag, true, .Sequentially_Consistent) - return unix.pthread_cond_signal(&c.handle) == 0 - } - return false -} - -// Awaken all threads who are waiting on the condition -condition_broadcast :: proc(c: ^Condition) -> bool { - return unix.pthread_cond_broadcast(&c.handle) == 0 -} - -// Wait for the condition to be signalled. -// Does not block if the condition has been signalled and no one -// has waited on it yet. -condition_wait_for :: proc(c: ^Condition) -> bool { - switch m in c.mutex { - case ^Mutex: - mutex_lock(m) - defer mutex_unlock(m) - // NOTE(tetra): If a thread comes by and steals the flag immediately after the signal occurs, - // the thread that gets signalled and wakes up, discovers that the flag was taken and goes - // back to sleep. - // Though this overall behavior is the most sane, there may be a better way to do this that means that - // the first thread to wait, gets the flag first. - if atomic_swap(&c.flag, false, .Sequentially_Consistent) { - return true - } - for { - if unix.pthread_cond_wait(&c.handle, &m.handle) != 0 { - return false - } - if atomic_swap(&c.flag, false, .Sequentially_Consistent) { - return true - } - } - return false - - case ^Blocking_Mutex: - blocking_mutex_lock(m) - defer blocking_mutex_unlock(m) - // NOTE(tetra): If a thread comes by and steals the flag immediately after the signal occurs, - // the thread that gets signalled and wakes up, discovers that the flag was taken and goes - // back to sleep. - // Though this overall behavior is the most sane, there may be a better way to do this that means that - // the first thread to wait, gets the flag first. - if atomic_swap(&c.flag, false, .Sequentially_Consistent) { - return true - } - for { - if unix.pthread_cond_wait(&c.handle, &m.handle) != 0 { - return false - } - if atomic_swap(&c.flag, false, .Sequentially_Consistent) { - return true - } - } - return false - } - return false -} - -// Wait for the condition to be signalled. -// Does not block if the condition has been signalled and no one -// has waited on it yet. -condition_wait_for_timeout :: proc(c: ^Condition, duration: time.Duration) -> bool { - switch m in c.mutex { - case ^Mutex: - mutex_lock(m) - defer mutex_unlock(m) - // NOTE(tetra): If a thread comes by and steals the flag immediately after the signal occurs, - // the thread that gets signalled and wakes up, discovers that the flag was taken and goes - // back to sleep. - // Though this overall behavior is the most sane, there may be a better way to do this that means that - // the first thread to wait, gets the flag first. - if atomic_swap(&c.flag, false, .Sequentially_Consistent) { - return true - } - - ns := time.duration_nanoseconds(duration) - timeout: time.TimeSpec - timeout.tv_sec = ns / 1e9 - timeout.tv_nsec = ns % 1e9 - - for { - if unix.pthread_cond_timedwait(&c.handle, &m.handle, &timeout) != 0 { - return false - } - if atomic_swap(&c.flag, false, .Sequentially_Consistent) { - return true - } - } - return false - - case ^Blocking_Mutex: - blocking_mutex_lock(m) - defer blocking_mutex_unlock(m) - // NOTE(tetra): If a thread comes by and steals the flag immediately after the signal occurs, - // the thread that gets signalled and wakes up, discovers that the flag was taken and goes - // back to sleep. - // Though this overall behavior is the most sane, there may be a better way to do this that means that - // the first thread to wait, gets the flag first. - if atomic_swap(&c.flag, false, .Sequentially_Consistent) { - return true - } - - ns := time.duration_nanoseconds(duration) - - timeout: time.TimeSpec - timeout.tv_sec = ns / 1e9 - timeout.tv_nsec = ns % 1e9 - - for { - if unix.pthread_cond_timedwait(&c.handle, &m.handle, &timeout) != 0 { - return false - } - if atomic_swap(&c.flag, false, .Sequentially_Consistent) { - return true - } - } - return false - } - return false -} - - - -thread_yield :: proc() { - unix.sched_yield() -} diff --git a/core/sync/sync2/sync_util.odin b/core/sync/sync_util.odin similarity index 100% rename from core/sync/sync2/sync_util.odin rename to core/sync/sync_util.odin diff --git a/core/sync/sync_windows.odin b/core/sync/sync_windows.odin deleted file mode 100644 index 0a7cf71b2..000000000 --- a/core/sync/sync_windows.odin +++ /dev/null @@ -1,180 +0,0 @@ -// +build windows -package sync - -import win32 "core:sys/windows" -import "core:time" - -current_thread_id :: proc "contextless" () -> int { - return int(win32.GetCurrentThreadId()) -} - - -// When waited upon, blocks until the internal count is greater than zero, then subtracts one. -// Posting to the semaphore increases the count by one, or the provided amount. -Semaphore :: struct { - _handle: win32.HANDLE, -} - -semaphore_init :: proc(s: ^Semaphore, initial_count := 0) { - s._handle = win32.CreateSemaphoreW(nil, i32(initial_count), 1<<31-1, nil) -} - -semaphore_destroy :: proc(s: ^Semaphore) { - win32.CloseHandle(s._handle) -} - -semaphore_post :: proc(s: ^Semaphore, count := 1) { - win32.ReleaseSemaphore(s._handle, i32(count), nil) -} - -semaphore_wait_for :: proc(s: ^Semaphore) { - // NOTE(tetra, 2019-10-30): wait_for_single_object decrements the count before it returns. - result := win32.WaitForSingleObject(s._handle, win32.INFINITE) - assert(result != win32.WAIT_FAILED) -} - - -Mutex :: struct { - _critical_section: win32.CRITICAL_SECTION, -} - - -mutex_init :: proc(m: ^Mutex, spin_count := 0) { - win32.InitializeCriticalSectionAndSpinCount(&m._critical_section, u32(spin_count)) -} - -mutex_destroy :: proc(m: ^Mutex) { - win32.DeleteCriticalSection(&m._critical_section) -} - -mutex_lock :: proc(m: ^Mutex) { - win32.EnterCriticalSection(&m._critical_section) -} - -mutex_try_lock :: proc(m: ^Mutex) -> bool { - return bool(win32.TryEnterCriticalSection(&m._critical_section)) -} - -mutex_unlock :: proc(m: ^Mutex) { - win32.LeaveCriticalSection(&m._critical_section) -} - -Blocking_Mutex :: struct { - _handle: win32.SRWLOCK, -} - - -blocking_mutex_init :: proc(m: ^Blocking_Mutex) { - win32.InitializeSRWLock(&m._handle) -} - -blocking_mutex_destroy :: proc(m: ^Blocking_Mutex) { - // -} - -blocking_mutex_lock :: proc(m: ^Blocking_Mutex) { - win32.AcquireSRWLockExclusive(&m._handle) -} - -blocking_mutex_try_lock :: proc(m: ^Blocking_Mutex) -> bool { - return bool(win32.TryAcquireSRWLockExclusive(&m._handle)) -} - -blocking_mutex_unlock :: proc(m: ^Blocking_Mutex) { - win32.ReleaseSRWLockExclusive(&m._handle) -} - - -// Blocks until signalled. -// When signalled, awakens exactly one waiting thread. -Condition :: struct { - _handle: win32.CONDITION_VARIABLE, - - mutex: Condition_Mutex_Ptr, -} - - -condition_init :: proc(c: ^Condition, mutex: Condition_Mutex_Ptr) -> bool { - assert(mutex != nil) - win32.InitializeConditionVariable(&c._handle) - c.mutex = mutex - return true -} - -condition_destroy :: proc(c: ^Condition) { - // -} - -condition_signal :: proc(c: ^Condition) -> bool { - if c._handle.ptr == nil { - return false - } - win32.WakeConditionVariable(&c._handle) - return true -} - -condition_broadcast :: proc(c: ^Condition) -> bool { - if c._handle.ptr == nil { - return false - } - win32.WakeAllConditionVariable(&c._handle) - return true -} - -condition_wait_for :: proc(c: ^Condition) -> bool { - switch m in &c.mutex { - case ^Mutex: - return cast(bool)win32.SleepConditionVariableCS(&c._handle, &m._critical_section, win32.INFINITE) - case ^Blocking_Mutex: - return cast(bool)win32.SleepConditionVariableSRW(&c._handle, &m._handle, win32.INFINITE, 0) - } - return false -} -condition_wait_for_timeout :: proc(c: ^Condition, duration: time.Duration) -> bool { - ms := win32.DWORD((max(time.duration_nanoseconds(duration), 0) + 999999)/1000000) - switch m in &c.mutex { - case ^Mutex: - return cast(bool)win32.SleepConditionVariableCS(&c._handle, &m._critical_section, ms) - case ^Blocking_Mutex: - return cast(bool)win32.SleepConditionVariableSRW(&c._handle, &m._handle, ms, 0) - } - return false -} - - - - -RW_Lock :: struct { - _handle: win32.SRWLOCK, -} - -rw_lock_init :: proc(l: ^RW_Lock) { - l._handle = win32.SRWLOCK_INIT -} -rw_lock_destroy :: proc(l: ^RW_Lock) { - // -} -rw_lock_read :: proc(l: ^RW_Lock) { - win32.AcquireSRWLockShared(&l._handle) -} -rw_lock_try_read :: proc(l: ^RW_Lock) -> bool { - return bool(win32.TryAcquireSRWLockShared(&l._handle)) -} -rw_lock_write :: proc(l: ^RW_Lock) { - win32.AcquireSRWLockExclusive(&l._handle) -} -rw_lock_try_write :: proc(l: ^RW_Lock) -> bool { - return bool(win32.TryAcquireSRWLockExclusive(&l._handle)) -} -rw_lock_read_unlock :: proc(l: ^RW_Lock) { - win32.ReleaseSRWLockShared(&l._handle) -} -rw_lock_write_unlock :: proc(l: ^RW_Lock) { - win32.ReleaseSRWLockExclusive(&l._handle) -} - - -thread_yield :: proc() { - win32.SwitchToThread() -} - diff --git a/core/sync/wait_group.odin b/core/sync/wait_group.odin deleted file mode 100644 index 63d882ed1..000000000 --- a/core/sync/wait_group.odin +++ /dev/null @@ -1,58 +0,0 @@ -package sync - -import "core:intrinsics" - -Wait_Group :: struct { - counter: int, - mutex: Blocking_Mutex, - cond: Condition, -} - -wait_group_init :: proc(wg: ^Wait_Group) { - wg.counter = 0 - blocking_mutex_init(&wg.mutex) - condition_init(&wg.cond, &wg.mutex) -} - - -wait_group_destroy :: proc(wg: ^Wait_Group) { - condition_destroy(&wg.cond) - blocking_mutex_destroy(&wg.mutex) -} - -wait_group_add :: proc(wg: ^Wait_Group, delta: int) { - if delta == 0 { - return - } - - blocking_mutex_lock(&wg.mutex) - defer blocking_mutex_unlock(&wg.mutex) - - intrinsics.atomic_add(&wg.counter, delta) - if wg.counter < 0 { - panic("sync.Wait_Group negative counter") - } - if wg.counter == 0 { - condition_broadcast(&wg.cond) - if wg.counter != 0 { - panic("sync.Wait_Group misuse: sync.wait_group_add called concurrently with sync.wait_group_wait") - } - } -} - -wait_group_done :: proc(wg: ^Wait_Group) { - wait_group_add(wg, -1) -} - -wait_group_wait :: proc(wg: ^Wait_Group) { - blocking_mutex_lock(&wg.mutex) - defer blocking_mutex_unlock(&wg.mutex) - - if wg.counter != 0 { - condition_wait_for(&wg.cond) - if wg.counter != 0 { - panic("sync.Wait_Group misuse: sync.wait_group_add called concurrently with sync.wait_group_wait") - } - } -} - diff --git a/core/thread/thread_pool.odin b/core/thread/thread_pool.odin index 37ee4fa98..97ea2b77d 100644 --- a/core/thread/thread_pool.odin +++ b/core/thread/thread_pool.odin @@ -26,7 +26,7 @@ INVALID_TASK_ID :: Task_Id(-1) Pool :: struct { allocator: mem.Allocator, mutex: sync.Mutex, - sem_available: sync.Semaphore, + sem_available: sync.Sema, processing_task_count: int, // atomic is_running: bool, @@ -40,14 +40,14 @@ pool_init :: proc(pool: ^Pool, thread_count: int, allocator := context.allocator pool := (^Pool)(t.data) for pool.is_running { - sync.semaphore_wait_for(&pool.sem_available) + sync.sema_wait(&pool.sem_available) if task, ok := pool_try_and_pop_task(pool); ok { pool_do_work(pool, &task) } } - sync.semaphore_post(&pool.sem_available, 1) + sync.sema_post(&pool.sem_available, 1) } @@ -56,8 +56,6 @@ pool_init :: proc(pool: ^Pool, thread_count: int, allocator := context.allocator pool.tasks = make([dynamic]Task) pool.threads = make([]^Thread, thread_count) - sync.mutex_init(&pool.mutex) - sync.semaphore_init(&pool.sem_available) pool.is_running = true for _, i in pool.threads { @@ -76,9 +74,6 @@ pool_destroy :: proc(pool: ^Pool) { } delete(pool.threads, pool.allocator) - - sync.mutex_destroy(&pool.mutex) - sync.semaphore_destroy(&pool.sem_available) } pool_start :: proc(pool: ^Pool) { @@ -90,7 +85,7 @@ pool_start :: proc(pool: ^Pool) { pool_join :: proc(pool: ^Pool) { pool.is_running = false - sync.semaphore_post(&pool.sem_available, len(pool.threads)) + sync.sema_post(&pool.sem_available, len(pool.threads)) yield() @@ -109,7 +104,7 @@ pool_add_task :: proc(pool: ^Pool, procedure: Task_Proc, data: rawptr, user_inde task.user_index = user_index append(&pool.tasks, task) - sync.semaphore_post(&pool.sem_available, 1) + sync.sema_post(&pool.sem_available, 1) } pool_try_and_pop_task :: proc(pool: ^Pool) -> (task: Task, got_task: bool = false) { @@ -140,7 +135,7 @@ pool_wait_and_process :: proc(pool: ^Pool) { // Safety kick if len(pool.tasks) != 0 && intrinsics.atomic_load(&pool.processing_task_count) == 0 { sync.mutex_lock(&pool.mutex) - sync.semaphore_post(&pool.sem_available, len(pool.tasks)) + sync.sema_post(&pool.sem_available, len(pool.tasks)) sync.mutex_unlock(&pool.mutex) } diff --git a/core/thread/thread_unix.odin b/core/thread/thread_unix.odin index fbf89f122..8452df112 100644 --- a/core/thread/thread_unix.odin +++ b/core/thread/thread_unix.odin @@ -4,7 +4,7 @@ package thread import "core:runtime" import "core:intrinsics" -import sync "core:sync/sync2" +import "core:sync" import "core:sys/unix" Thread_State :: enum u8 { diff --git a/core/thread/thread_windows.odin b/core/thread/thread_windows.odin index 428e241d0..210c54a96 100644 --- a/core/thread/thread_windows.odin +++ b/core/thread/thread_windows.odin @@ -3,7 +3,7 @@ package thread import "core:runtime" -import sync "core:sync/sync2" +import "core:sync" import win32 "core:sys/windows" Thread_Os_Specific :: struct { diff --git a/examples/all/all_main.odin b/examples/all/all_main.odin index dafd29ab3..6a039e4dd 100644 --- a/examples/all/all_main.odin +++ b/examples/all/all_main.odin @@ -96,7 +96,6 @@ import sort "core:sort" import strconv "core:strconv" import strings "core:strings" import sync "core:sync" -import sync2 "core:sync/sync2" import testing "core:testing" import scanner "core:text/scanner" import thread "core:thread" @@ -187,7 +186,6 @@ _ :: sort _ :: strconv _ :: strings _ :: sync -_ :: sync2 _ :: testing _ :: scanner _ :: thread