Remove unneeded synchronizations in Chan

Everything was already guarded by `c.mutex`.
This commit is contained in:
Feoramund
2024-09-15 22:00:53 -04:00
parent 7f7cfebc91
commit d38f5ffb49
+35 -43
View File
@@ -22,19 +22,17 @@ Raw_Chan :: struct {
allocator: runtime.Allocator,
allocation_size: int,
msg_size: u16,
closed: b16, // atomic
closed: b16, // guarded by `mutex`
mutex: sync.Mutex,
r_cond: sync.Cond,
w_cond: sync.Cond,
r_waiting: int, // atomic
w_waiting: int, // atomic
r_waiting: int, // guarded by `mutex`
w_waiting: int, // guarded by `mutex`
// Buffered
queue: ^Raw_Queue,
// Unbuffered
r_mutex: sync.Mutex,
w_mutex: sync.Mutex,
unbuffered_data: rawptr,
}
@@ -164,32 +162,30 @@ send_raw :: proc "contextless" (c: ^Raw_Chan, msg_in: rawptr) -> (ok: bool) {
}
if c.queue != nil { // buffered
sync.guard(&c.mutex)
for !sync.atomic_load(&c.closed) &&
c.queue.len == c.queue.cap {
sync.atomic_add(&c.w_waiting, 1)
for !c.closed && c.queue.len == c.queue.cap {
c.w_waiting += 1
sync.wait(&c.w_cond, &c.mutex)
sync.atomic_sub(&c.w_waiting, 1)
c.w_waiting -= 1
}
if sync.atomic_load(&c.closed) {
if c.closed {
return false
}
ok = raw_queue_push(c.queue, msg_in)
if sync.atomic_load(&c.r_waiting) > 0 {
if c.r_waiting > 0 {
sync.signal(&c.r_cond)
}
} else if c.unbuffered_data != nil { // unbuffered
sync.guard(&c.w_mutex)
sync.guard(&c.mutex)
if sync.atomic_load(&c.closed) {
if c.closed {
return false
}
mem.copy(c.unbuffered_data, msg_in, int(c.msg_size))
sync.atomic_add(&c.w_waiting, 1)
if sync.atomic_load(&c.r_waiting) > 0 {
c.w_waiting += 1
if c.r_waiting > 0 {
sync.signal(&c.r_cond)
}
sync.wait(&c.w_cond, &c.mutex)
@@ -206,13 +202,13 @@ recv_raw :: proc "contextless" (c: ^Raw_Chan, msg_out: rawptr) -> (ok: bool) {
if c.queue != nil { // buffered
sync.guard(&c.mutex)
for c.queue.len == 0 {
if sync.atomic_load(&c.closed) {
if c.closed {
return
}
sync.atomic_add(&c.r_waiting, 1)
c.r_waiting += 1
sync.wait(&c.r_cond, &c.mutex)
sync.atomic_sub(&c.r_waiting, 1)
c.r_waiting -= 1
}
msg := raw_queue_pop(c.queue)
@@ -220,27 +216,26 @@ recv_raw :: proc "contextless" (c: ^Raw_Chan, msg_out: rawptr) -> (ok: bool) {
mem.copy(msg_out, msg, int(c.msg_size))
}
if sync.atomic_load(&c.w_waiting) > 0 {
if c.w_waiting > 0 {
sync.signal(&c.w_cond)
}
ok = true
} else if c.unbuffered_data != nil { // unbuffered
sync.guard(&c.r_mutex)
sync.guard(&c.mutex)
for !sync.atomic_load(&c.closed) &&
sync.atomic_load(&c.w_waiting) == 0 {
sync.atomic_add(&c.r_waiting, 1)
for !c.closed &&
c.w_waiting == 0 {
c.r_waiting += 1
sync.wait(&c.r_cond, &c.mutex)
sync.atomic_sub(&c.r_waiting, 1)
c.r_waiting -= 1
}
if sync.atomic_load(&c.closed) {
if c.closed {
return
}
mem.copy(msg_out, c.unbuffered_data, int(c.msg_size))
sync.atomic_sub(&c.w_waiting, 1)
c.w_waiting -= 1
sync.signal(&c.w_cond)
ok = true
@@ -260,25 +255,24 @@ try_send_raw :: proc "contextless" (c: ^Raw_Chan, msg_in: rawptr) -> (ok: bool)
return false
}
if sync.atomic_load(&c.closed) {
if c.closed {
return false
}
ok = raw_queue_push(c.queue, msg_in)
if sync.atomic_load(&c.r_waiting) > 0 {
if c.r_waiting > 0 {
sync.signal(&c.r_cond)
}
} else if c.unbuffered_data != nil { // unbuffered
sync.guard(&c.w_mutex)
sync.guard(&c.mutex)
if sync.atomic_load(&c.closed) {
if c.closed {
return false
}
mem.copy(c.unbuffered_data, msg_in, int(c.msg_size))
sync.atomic_add(&c.w_waiting, 1)
if sync.atomic_load(&c.r_waiting) > 0 {
c.w_waiting += 1
if c.r_waiting > 0 {
sync.signal(&c.r_cond)
}
sync.wait(&c.w_cond, &c.mutex)
@@ -303,21 +297,19 @@ try_recv_raw :: proc "contextless" (c: ^Raw_Chan, msg_out: rawptr) -> bool {
mem.copy(msg_out, msg, int(c.msg_size))
}
if sync.atomic_load(&c.w_waiting) > 0 {
if c.w_waiting > 0 {
sync.signal(&c.w_cond)
}
return true
} else if c.unbuffered_data != nil { // unbuffered
sync.guard(&c.r_mutex)
sync.guard(&c.mutex)
if sync.atomic_load(&c.closed) ||
sync.atomic_load(&c.w_waiting) == 0 {
if c.closed || c.w_waiting == 0 {
return false
}
mem.copy(msg_out, c.unbuffered_data, int(c.msg_size))
sync.atomic_sub(&c.w_waiting, 1)
c.w_waiting -= 1
sync.signal(&c.w_cond)
return true
@@ -360,10 +352,10 @@ close :: proc "contextless" (c: ^Raw_Chan) -> bool {
return false
}
sync.guard(&c.mutex)
if sync.atomic_load(&c.closed) {
if c.closed {
return false
}
sync.atomic_store(&c.closed, true)
c.closed = true
sync.broadcast(&c.r_cond)
sync.broadcast(&c.w_cond)
return true
@@ -375,7 +367,7 @@ is_closed :: proc "contextless" (c: ^Raw_Chan) -> bool {
return true
}
sync.guard(&c.mutex)
return bool(sync.atomic_load(&c.closed))
return bool(c.closed)
}
@@ -434,7 +426,7 @@ can_recv :: proc "contextless" (c: ^Raw_Chan) -> bool {
if is_buffered(c) {
return c.queue.len > 0
}
return sync.atomic_load(&c.w_waiting) > 0
return c.w_waiting > 0
}
@@ -444,7 +436,7 @@ can_send :: proc "contextless" (c: ^Raw_Chan) -> bool {
if is_buffered(c) {
return c.queue.len < c.queue.cap
}
return sync.atomic_load(&c.w_waiting) == 0
return c.w_waiting == 0
}
@@ -493,4 +485,4 @@ select_raw :: proc "odin" (recvs: []^Raw_Chan, sends: []^Raw_Chan, send_msgs: []
ok = send_raw(sends[sel.idx], send_msgs[sel.idx])
}
return
}
}