diff --git a/core/sync/barrier.odin b/core/sync/barrier.odin new file mode 100644 index 000000000..51abec536 --- /dev/null +++ b/core/sync/barrier.odin @@ -0,0 +1,81 @@ +package sync + + +// A barrier enabling multiple threads to synchronize the beginning of some computation +/* + * Example: + * + * package example + * + * import "core:fmt" + * import "core:sync" + * import "core:thread" + * + * barrier := &sync.Barrier{}; + * + * main :: proc() { + * fmt.println("Start"); + * + * THREAD_COUNT :: 4; + * threads: [THREAD_COUNT]^thread.Thread; + * + * sync.barrier_init(barrier, THREAD_COUNT); + * defer sync.barrier_destroy(barrier); + * + * + * for _, i in threads { + * threads[i] = thread.create_and_start(proc(t: ^thread.Thread) { + * // Same messages will be printed together but without any interleaving + * fmt.println("Getting ready!"); + * sync.barrier_wait(barrier); + * fmt.println("Off their marks they go!"); + * }); + * } + * + * for t in threads { + * thread.destroy(t); // join and free thread + * } + * fmt.println("Finished"); + * } + * + */ +Barrier :: struct { + mutex: Blocking_Mutex, + cond: Condition, + index: int, + generation_id: int, + thread_count: int, +} + +barrier_init :: proc(b: ^Barrier, thread_count: int) { + blocking_mutex_init(&b.mutex); + condition_init(&b.cond, &b.mutex); + b.index = 0; + b.generation_id = 0; + b.thread_count = thread_count; +} + +barrier_destroy :: proc(b: ^Barrier) { + blocking_mutex_destroy(&b.mutex); + condition_destroy(&b.cond); +} + +// Block the current thread until all threads have rendezvoused +// Barrier can be reused after all threads rendezvoused once, and can be used continuously +barrier_wait :: proc(b: ^Barrier) -> (is_leader: bool) { + blocking_mutex_lock(&b.mutex); + defer blocking_mutex_unlock(&b.mutex); + local_gen := b.generation_id; + b.index += 1; + if b.index < b.thread_count { + for local_gen == b.generation_id && b.index < b.thread_count { + condition_wait_for(&b.cond); + } + return false; + } + + b.index = 0; + b.generation_id += 1; + condition_broadcast(&b.cond); + return true; +} diff --git a/core/sync/sync.odin b/core/sync/sync.odin index 6101cdbc1..6ac349cb4 100644 --- a/core/sync/sync.odin +++ b/core/sync/sync.odin @@ -6,6 +6,9 @@ cpu_relax :: inline proc() { intrinsics.cpu_relax(); } +Condition_Mutex_Ptr :: union{^Mutex, ^Blocking_Mutex}; + + Ticket_Mutex :: struct { ticket: u64, serving: u64, diff --git a/core/sync/sync_unix.odin b/core/sync/sync_unix.odin index 6b0c9b5f8..dd8901872 100644 --- a/core/sync/sync_unix.odin +++ b/core/sync/sync_unix.odin @@ -3,33 +3,18 @@ package sync import "core:sys/unix" -// A lock that can only be held by one thread at once. +// A recursive lock that can only be held by one thread at once Mutex :: struct { handle: unix.pthread_mutex_t, } -// Blocks until signalled, and then lets past exactly -// one thread. -Condition :: struct { - handle: unix.pthread_cond_t, - mutex: ^Mutex, - - // NOTE(tetra, 2019-11-11): Used to mimic the more sane behavior of Windows' AutoResetEvent. - // This means that you may signal the condition before anyone is waiting to cause the - // next thread that tries to wait to just pass by uninterrupted, without sleeping. - // Without this, signalling a condition will only wake up a thread which is already waiting, - // but not one that is about to wait, which can cause your program to become out of sync in - // ways that are hard to debug or fix. - flag: bool, // atomically mutated -} - - mutex_init :: proc(m: ^Mutex) { // NOTE(tetra, 2019-11-01): POSIX OOM if we cannot init the attrs or the mutex. attrs: unix.pthread_mutexattr_t; assert(unix.pthread_mutexattr_init(&attrs) == 0); defer unix.pthread_mutexattr_destroy(&attrs); // ignores destruction error + unix.pthread_mutexattr_settype(&attrs, unix.PTHREAD_MUTEX_RECURSIVE); assert(unix.pthread_mutex_init(&m.handle, &attrs) == 0); } @@ -53,7 +38,56 @@ mutex_unlock :: proc(m: ^Mutex) { } -condition_init :: proc(c: ^Condition, mutex: ^Mutex) -> bool { +Blocking_Mutex :: struct { + handle: unix.pthread_mutex_t, +} + + +blocking_mutex_init :: proc(m: ^Blocking_Mutex) { + // NOTE(tetra, 2019-11-01): POSIX OOM if we cannot init the attrs or the mutex. + attrs: unix.pthread_mutexattr_t; + assert(unix.pthread_mutexattr_init(&attrs) == 0); + defer unix.pthread_mutexattr_destroy(&attrs); // ignores destruction error + + assert(unix.pthread_mutex_init(&m.handle, &attrs) == 0); +} + +blocking_mutex_destroy :: proc(m: ^Blocking_Mutex) { + assert(unix.pthread_mutex_destroy(&m.handle) == 0); + m.handle = {}; +} + +blocking_mutex_lock :: proc(m: ^Blocking_Mutex) { + assert(unix.pthread_mutex_lock(&m.handle) == 0); +} + +// Returns false if someone else holds the lock. +blocking_mutex_try_lock :: proc(m: ^Blocking_Mutex) -> bool { + return unix.pthread_mutex_trylock(&m.handle) == 0; +} + +blocking_mutex_unlock :: proc(m: ^Blocking_Mutex) { + assert(unix.pthread_mutex_unlock(&m.handle) == 0); +} + + + +// Blocks until signalled, and then lets past exactly +// one thread. +Condition :: struct { + handle: unix.pthread_cond_t, + mutex: Condition_Mutex_Ptr, + + // NOTE(tetra, 2019-11-11): Used to mimic the more sane behavior of Windows' AutoResetEvent. + // This means that you may signal the condition before anyone is waiting to cause the + // next thread that tries to wait to just pass by uninterrupted, without sleeping. + // Without this, signalling a condition will only wake up a thread which is already waiting, + // but not one that is about to wait, which can cause your program to become out of sync in + // ways that are hard to debug or fix. + flag: bool, // atomically mutated +} + +condition_init :: proc(c: ^Condition, mutex: Condition_Mutex_Ptr) -> bool { // NOTE(tetra, 2019-11-01): POSIX OOM if we cannot init the attrs or the condition. attrs: unix.pthread_condattr_t; if unix.pthread_condattr_init(&attrs) != 0 { @@ -73,10 +107,19 @@ condition_destroy :: proc(c: ^Condition) { // Awaken exactly one thread who is waiting on the condition condition_signal :: proc(c: ^Condition) -> bool { - mutex_lock(c.mutex); - defer mutex_unlock(c.mutex); - atomic_swap(&c.flag, true, .Sequentially_Consistent); - return unix.pthread_cond_signal(&c.handle) == 0; + switch m in c.mutex { + case ^Mutex: + mutex_lock(m); + defer mutex_unlock(m); + atomic_swap(&c.flag, true, .Sequentially_Consistent); + return unix.pthread_cond_signal(&c.handle) == 0; + case ^Blocking_Mutex: + blocking_mutex_lock(m); + defer blocking_mutex_unlock(m); + atomic_swap(&c.flag, true, .Sequentially_Consistent); + return unix.pthread_cond_signal(&c.handle) == 0; + } + return false; } // Awaken all threads who are waiting on the condition @@ -88,24 +131,48 @@ condition_broadcast :: proc(c: ^Condition) -> bool { // Does not block if the condition has been signalled and no one // has waited on it yet. condition_wait_for :: proc(c: ^Condition) -> bool { - mutex_lock(c.mutex); - defer mutex_unlock(c.mutex); - // NOTE(tetra): If a thread comes by and steals the flag immediately after the signal occurs, - // the thread that gets signalled and wakes up, discovers that the flag was taken and goes - // back to sleep. - // Though this overall behavior is the most sane, there may be a better way to do this that means that - // the first thread to wait, gets the flag first. - if atomic_swap(&c.flag, false, .Sequentially_Consistent) { - return true; - } - for { - if unix.pthread_cond_wait(&c.handle, &c.mutex.handle) != 0 { - return false; - } + switch m in c.mutex { + case ^Mutex: + mutex_lock(m); + defer mutex_unlock(m); + // NOTE(tetra): If a thread comes by and steals the flag immediately after the signal occurs, + // the thread that gets signalled and wakes up, discovers that the flag was taken and goes + // back to sleep. + // Though this overall behavior is the most sane, there may be a better way to do this that means that + // the first thread to wait, gets the flag first. if atomic_swap(&c.flag, false, .Sequentially_Consistent) { return true; } - } + for { + if unix.pthread_cond_wait(&c.handle, &m.handle) != 0 { + return false; + } + if atomic_swap(&c.flag, false, .Sequentially_Consistent) { + return true; + } + } + return false; + case ^Blocking_Mutex: + blocking_mutex_lock(m); + defer blocking_mutex_unlock(m); + // NOTE(tetra): If a thread comes by and steals the flag immediately after the signal occurs, + // the thread that gets signalled and wakes up, discovers that the flag was taken and goes + // back to sleep. + // Though this overall behavior is the most sane, there may be a better way to do this that means that + // the first thread to wait, gets the flag first. + if atomic_swap(&c.flag, false, .Sequentially_Consistent) { + return true; + } + for { + if unix.pthread_cond_wait(&c.handle, &m.handle) != 0 { + return false; + } + if atomic_swap(&c.flag, false, .Sequentially_Consistent) { + return true; + } + } + return false; + } return false; } diff --git a/core/sync/sync_windows.odin b/core/sync/sync_windows.odin index 134e8e806..885174279 100644 --- a/core/sync/sync_windows.odin +++ b/core/sync/sync_windows.odin @@ -4,24 +4,6 @@ package sync import win32 "core:sys/windows" import "core:time" -Mutex :: struct { - _critical_section: win32.CRITICAL_SECTION, -} - -Blocking_Mutex :: struct { - _handle: win32.SRWLOCK, -} - - -Condition_Mutex_Ptr :: union{^Mutex, ^Blocking_Mutex}; - -// Blocks until signalled. -// When signalled, awakens exactly one waiting thread. -Condition :: struct { - _handle: win32.CONDITION_VARIABLE, - - mutex: Condition_Mutex_Ptr, -} // When waited upon, blocks until the internal count is greater than zero, then subtracts one. // Posting to the semaphore increases the count by one, or the provided amount. @@ -29,11 +11,6 @@ Semaphore :: struct { _handle: win32.HANDLE, } -RW_Lock :: struct { - _handle: win32.SRWLOCK, -} - - semaphore_init :: proc(s: ^Semaphore, initial_count := 0) { s._handle = win32.CreateSemaphoreW(nil, i32(initial_count), 1<<31-1, nil); } @@ -53,6 +30,11 @@ semaphore_wait_for :: proc(s: ^Semaphore) { } +Mutex :: struct { + _critical_section: win32.CRITICAL_SECTION, +} + + mutex_init :: proc(m: ^Mutex, spin_count := 0) { win32.InitializeCriticalSectionAndSpinCount(&m._critical_section, u32(spin_count)); } @@ -73,6 +55,11 @@ mutex_unlock :: proc(m: ^Mutex) { win32.LeaveCriticalSection(&m._critical_section); } +Blocking_Mutex :: struct { + _handle: win32.SRWLOCK, +} + + blocking_mutex_init :: proc(m: ^Blocking_Mutex) { // } @@ -94,6 +81,15 @@ blocking_mutex_unlock :: proc(m: ^Blocking_Mutex) { } +// Blocks until signalled. +// When signalled, awakens exactly one waiting thread. +Condition :: struct { + _handle: win32.CONDITION_VARIABLE, + + mutex: Condition_Mutex_Ptr, +} + + condition_init :: proc(c: ^Condition, mutex: Condition_Mutex_Ptr) -> bool { assert(mutex != nil); win32.InitializeConditionVariable(&c._handle); @@ -143,6 +139,11 @@ condition_wait_for_timeout :: proc(c: ^Condition, duration: time.Duration) -> bo + +RW_Lock :: struct { + _handle: win32.SRWLOCK, +} + rw_lock_init :: proc(l: ^RW_Lock) { l._handle = win32.SRWLOCK_INIT; } @@ -167,6 +168,3 @@ rw_lock_read_unlock :: proc(l: ^RW_Lock) { rw_lock_write_unlock :: proc(l: ^RW_Lock) { win32.ReleaseSRWLockExclusive(&l._handle); } - - -