Switched to native futex on NetBSD

This commit is contained in:
Andreas T Jonsson
2024-04-25 21:50:34 +02:00
parent ce80c37c75
commit 3000508c02
4 changed files with 61 additions and 174 deletions
+49 -140
View File
@@ -1,165 +1,74 @@
//+private
package sync
import "core:c"
import "base:intrinsics"
import "core:time"
import "core:c"
import "core:sys/unix"
@(private="file")
Wait_Node :: struct {
thread: unix.pthread_t,
futex: ^Futex,
prev, next: ^Wait_Node,
foreign import libc "system:c"
FUTEX_PRIVATE_FLAG :: 128
FUTEX_WAIT_PRIVATE :: 0 | FUTEX_PRIVATE_FLAG
FUTEX_WAKE_PRIVATE :: 1 | FUTEX_PRIVATE_FLAG
EINTR :: 4 /* Interrupted system call */
EAGAIN :: 35 /* Resource temporarily unavailable */
ETIMEDOUT :: 60 /* Operation timed out */
Time_Spec :: struct {
time_sec: uint,
time_nsec: uint,
}
@(private="file")
atomic_flag :: distinct bool
@(private="file")
Wait_Queue :: struct {
lock: atomic_flag,
list: Wait_Node,
}
@(private="file")
waitq_lock :: proc "contextless" (waitq: ^Wait_Queue) {
for cast(bool)atomic_exchange_explicit(&waitq.lock, atomic_flag(true), .Acquire) {
cpu_relax() // spin...
get_last_error :: proc "contextless" () -> int {
foreign libc {
__errno :: proc() -> ^c.int ---
}
}
@(private="file")
waitq_unlock :: proc "contextless" (waitq: ^Wait_Queue) {
atomic_store_explicit(&waitq.lock, atomic_flag(false), .Release)
return int(__errno()^)
}
// FIXME: This approach may scale badly in the future,
// possible solution - hash map (leads to deadlocks now).
@(private="file")
g_waitq: Wait_Queue
@(init, private="file")
g_waitq_init :: proc() {
g_waitq = {
list = {
prev = &g_waitq.list,
next = &g_waitq.list,
},
_futex_wait :: proc "contextless" (futex: ^Futex, expected: u32) -> bool {
if cast(int) intrinsics.syscall(unix.SYS___futex, uintptr(futex), FUTEX_WAIT_PRIVATE, uintptr(expected), 0) == -1 {
switch get_last_error() {
case EINTR, EAGAIN:
return true
case:
_panic("futex_wait failure")
}
}
return true
}
@(private="file")
get_waitq :: #force_inline proc "contextless" (f: ^Futex) -> ^Wait_Queue {
_ = f
return &g_waitq
}
_futex_wait :: proc "contextless" (f: ^Futex, expect: u32) -> (ok: bool) {
waitq := get_waitq(f)
waitq_lock(waitq)
defer waitq_unlock(waitq)
head := &waitq.list
waiter := Wait_Node{
thread = unix.pthread_self(),
futex = f,
prev = head,
next = head.next,
}
waiter.prev.next = &waiter
waiter.next.prev = &waiter
old_mask, mask: unix.sigset_t
unix.sigemptyset(&mask)
unix.sigaddset(&mask, unix.SIGCONT)
unix.pthread_sigmask(unix.SIG_BLOCK, &mask, &old_mask)
if u32(atomic_load_explicit(f, .Acquire)) == expect {
waitq_unlock(waitq)
defer waitq_lock(waitq)
sig: c.int
unix.sigwait(&mask, &sig)
errno := unix.errno()
ok = errno == unix.ERROR_NONE
}
waiter.prev.next = waiter.next
waiter.next.prev = waiter.prev
unix.pthread_sigmask(unix.SIG_SETMASK, &old_mask, nil)
// FIXME: Add error handling!
return
}
_futex_wait_with_timeout :: proc "contextless" (f: ^Futex, expect: u32, duration: time.Duration) -> (ok: bool) {
_futex_wait_with_timeout :: proc "contextless" (futex: ^Futex, expected: u32, duration: time.Duration) -> bool {
if duration <= 0 {
return false
}
waitq := get_waitq(f)
waitq_lock(waitq)
defer waitq_unlock(waitq)
head := &waitq.list
waiter := Wait_Node{
thread = unix.pthread_self(),
futex = f,
prev = head,
next = head.next,
}
waiter.prev.next = &waiter
waiter.next.prev = &waiter
old_mask, mask: unix.sigset_t
unix.sigemptyset(&mask)
unix.sigaddset(&mask, unix.SIGCONT)
unix.pthread_sigmask(unix.SIG_BLOCK, &mask, &old_mask)
if u32(atomic_load_explicit(f, .Acquire)) == expect {
waitq_unlock(waitq)
defer waitq_lock(waitq)
info: unix.siginfo_t
ts := unix.timespec{
tv_sec = i64(duration / 1e9),
tv_nsec = i64(duration % 1e9),
if cast(int) intrinsics.syscall(unix.SYS___futex, uintptr(futex), FUTEX_WAIT_PRIVATE, uintptr(expected), cast(uintptr) &Time_Spec{
time_sec = cast(uint)(duration / 1e9),
time_nsec = cast(uint)(duration % 1e9),
}) == -1 {
switch get_last_error() {
case EINTR, EAGAIN:
return true
case ETIMEDOUT:
return false
case:
_panic("futex_wait_with_timeout failure")
}
unix.sigtimedwait(&mask, &info, &ts)
errno := unix.errno()
ok = errno == unix.EAGAIN || errno == unix.ERROR_NONE
}
waiter.prev.next = waiter.next
waiter.next.prev = waiter.prev
unix.pthread_sigmask(unix.SIG_SETMASK, &old_mask, nil)
// FIXME: Add error handling!
return
return true
}
_futex_signal :: proc "contextless" (f: ^Futex) {
waitq := get_waitq(f)
waitq_lock(waitq)
defer waitq_unlock(waitq)
head := &waitq.list
for waiter := head.next; waiter != head; waiter = waiter.next {
if waiter.futex == f {
unix.pthread_kill(waiter.thread, unix.SIGCONT)
break
}
_futex_signal :: proc "contextless" (futex: ^Futex) {
if cast(int) intrinsics.syscall(unix.SYS___futex, uintptr(futex), FUTEX_WAKE_PRIVATE, 1) == -1 {
_panic("futex_wake_single failure")
}
}
_futex_broadcast :: proc "contextless" (f: ^Futex) {
waitq := get_waitq(f)
waitq_lock(waitq)
defer waitq_unlock(waitq)
head := &waitq.list
for waiter := head.next; waiter != head; waiter = waiter.next {
if waiter.futex == f {
unix.pthread_kill(waiter.thread, unix.SIGCONT)
}
_futex_broadcast :: proc "contextless" (futex: ^Futex) {
if cast(int) intrinsics.syscall(unix.SYS___futex, uintptr(futex), FUTEX_WAKE_PRIVATE, uintptr(max(i32))) == -1 {
_panic("_futex_wake_all failure")
}
}
-30
View File
@@ -1,30 +0,0 @@
package unix
import "core:c"
foreign import libc "system:c"
ERROR_NONE :: 0
EAGAIN :: 35
SIGCONT :: 19
SIG_BLOCK :: 1
SIG_UNBLOCK :: 2
SIG_SETMASK :: 3
siginfo_t :: struct { _: [128]u8 }
sigset_t :: struct { _: [4]u32 }
foreign libc {
@(link_name="__sigemptyset14") sigemptyset :: proc(set: ^sigset_t) -> c.int ---
@(link_name="__sigaddset14") sigaddset :: proc(set: ^sigset_t, _signal: c.int) -> c.int ---
@(link_name="__sigtimedwait50") sigtimedwait :: proc(set: ^sigset_t, info: ^siginfo_t, timeout: ^timespec) -> c.int ---
@(link_name="sigwait") sigwait :: proc(set: ^sigset_t, _signal: ^c.int) -> c.int ---
@(private="file", link_name="__errno") get_error_location :: proc() -> ^c.int ---
}
errno :: #force_inline proc "contextless" () -> int {
return int(get_error_location()^)
}
+3
View File
@@ -0,0 +1,3 @@
package unix
SYS___futex : uintptr : 166
+9 -4
View File
@@ -634,9 +634,15 @@ gb_internal void thread_set_name(Thread *t, char const *name) {
#endif
}
#if defined(GB_SYSTEM_LINUX)
#include <linux/futex.h>
#if defined(GB_SYSTEM_LINUX) || defined(GB_SYSTEM_NETBSD)
#include <sys/syscall.h>
#ifdef GB_SYSTEM_LINUX
#include <linux/futex.h>
#else
#include <sys/futex.h>
#define SYS_futex SYS___futex
#endif
gb_internal void futex_signal(Futex *addr) {
int ret = syscall(SYS_futex, addr, FUTEX_WAKE | FUTEX_PRIVATE_FLAG, 1, NULL, NULL, 0);
@@ -903,11 +909,10 @@ gb_internal void futex_wait(Futex *f, Footex val) {
} while (f->load() == val);
}
#elif defined(GB_SYSTEM_HAIKU) || defined(GB_SYSTEM_NETBSD)
#elif defined(GB_SYSTEM_HAIKU)
// Futex implementation taken from https://tavianator.com/2023/futex.html
#include <signal.h>
#include <pthread.h>
#include <atomic>