diff --git a/core/sync/chan/chan.odin b/core/sync/chan/chan.odin new file mode 100644 index 000000000..fbd11be99 --- /dev/null +++ b/core/sync/chan/chan.odin @@ -0,0 +1,401 @@ +package sync_chan + +import "base:builtin" +import "base:intrinsics" +import "base:runtime" +import "core:mem" +import "core:sync" +import "core:math/rand" + +_ :: runtime +_ :: mem +_ :: sync + + + +Direction :: enum { + Send = -1, + Both = 0, + Recv = +1, +} + +Chan :: struct($T: typeid, $D: Direction = Direction.Both) { + #subtype impl: ^Raw_Chan, +} + +Raw_Chan :: struct { + allocator: runtime.Allocator, + allocation_size: int, + + // Buffered + queue: ^Raw_Queue, + + // Unbuffered + r_mutex: sync.Mutex, + w_mutex: sync.Mutex, + unbuffered_data: rawptr, + msg_size: int, + + // Shared + mutex: sync.Mutex, + r_cond: sync.Cond, + w_cond: sync.Cond, + closed: bool, // atomic + r_waiting: int, // atomic + w_waiting: int, // atomic +} + + +create :: proc{ + create_unbuffered, + create_buffered, +} + +@(require_results) +create_unbuffered :: proc($C: typeid/Chan($T), allocator: runtime.Allocator) -> (c: C, err: runtime.Allocator_Error) { + c.impl, err = create_raw_unbuffered(size_of(T), align_of(T), allocator) + return +} + +@(require_results) +create_buffered :: proc($C: typeid/Chan($T), #any_int cap: int, allocator: runtime.Allocator) -> (c: C, err: runtime.Allocator_Error) { + c.impl, err = create_raw_buffered(size_of(T), align_of(T), cap, allocator) + return +} + +create_raw :: proc{ + create_raw_unbuffered, + create_raw_buffered, +} + +@(require_results) +create_raw_unbuffered :: proc(#any_int msg_size, msg_alignment: int, allocator: runtime.Allocator) -> (c: ^Raw_Chan, err: runtime.Allocator_Error) { + align := max(align_of(Raw_Chan), msg_alignment) + + size := mem.align_forward_int(size_of(Raw_Chan), align) + offset := size + size += msg_size + size = mem.align_forward_int(size, align) + + ptr := mem.alloc(size, align, allocator) or_return + c = (^Raw_Chan)(ptr) + c.allocation_size = size + c.unbuffered_data = ([^]byte)(ptr)[offset:] + c.msg_size = msg_size + return +} + +@(require_results) +create_raw_buffered :: proc(#any_int msg_size, msg_alignment: int, #any_int cap: int, allocator: runtime.Allocator) -> (c: ^Raw_Chan, err: runtime.Allocator_Error) { + if cap <= 0 { + return create_raw_unbuffered(msg_size, msg_alignment, allocator) + } + + align := max(align_of(Raw_Chan), msg_alignment, align_of(Raw_Queue)) + + size := mem.align_forward_int(size_of(Raw_Chan), align) + q_offset := size + size = mem.align_forward_int(q_offset + size_of(Raw_Queue), msg_alignment) + offset := size + size += msg_size * (cap+1) + size = mem.align_forward_int(size, align) + + ptr := mem.alloc(size, align, allocator) or_return + c = (^Raw_Chan)(ptr) + c.allocation_size = size + + bptr := ([^]byte)(ptr) + + c.queue = (^Raw_Queue)(bptr[q_offset:]) + c.msg_size = msg_size + + items := ([^]byte)(bptr[offset:]) + c.unbuffered_data = items + raw_queue_init(c.queue, items[msg_size:], cap, msg_size) + return +} + +destroy :: proc(c: ^Raw_Chan) -> runtime.Allocator_Error { + if c != nil { + allocator := c.allocator + return mem.free_with_size(c, c.allocation_size, allocator) + } + return nil +} + +@(require_results) +as_send :: #force_inline proc "contextless" (c: $C/Chan($T, $D)) -> (s: Chan(T, .Send)) where C.D <= .Both { + return transmute(type_of(s))c +} +@(require_results) +as_recv :: #force_inline proc "contextless" (c: $C/Chan($T, $D)) -> (r: Chan(T, .Recv)) where C.D >= .Both { + return transmute(type_of(r))c +} + + +send :: proc "contextless" (c: $C/Chan($T, $D), data: T) -> (ok: bool) where C.D <= .Both { + data := data + ok = send_raw(c, &data) + return +} + +@(require_results) +recv :: proc "contextless" (c: $C/Chan($T)) -> (data: T, ok: bool) where C.D >= .Both { + ok = recv_raw(c, &data) + return +} + + +@(require_results) +send_raw :: proc "contextless" (c: ^Raw_Chan, msg_in: rawptr) -> (ok: bool) { + if c == nil { + return + } + if c.queue != nil { // buffered + sync.guard(&c.mutex) + for c.queue.len == c.queue.cap { + sync.atomic_add(&c.w_waiting, 1) + sync.wait(&c.w_cond, &c.mutex) + sync.atomic_sub(&c.w_waiting, 1) + } + + ok = raw_queue_push(c.queue, msg_in) + if sync.atomic_load(&c.r_waiting) > 0 { + sync.signal(&c.r_cond) + } + } else if c.unbuffered_data != nil { // unbuffered + sync.guard(&c.w_mutex) + sync.guard(&c.mutex) + + if sync.atomic_load(&c.closed) { + return false + } + + mem.copy(c.unbuffered_data, msg_in, c.msg_size) + sync.atomic_add(&c.w_waiting, 1) + if sync.atomic_load(&c.r_waiting) > 0 { + sync.signal(&c.r_cond) + } + sync.wait(&c.w_cond, &c.mutex) + ok = true + } + return +} + +@(require_results) +recv_raw :: proc "contextless" (c: ^Raw_Chan, msg_out: rawptr) -> (ok: bool) { + if c == nil { + return + } + if c.queue != nil { // buffered + sync.guard(&c.mutex) + for c.queue.len == 0 { + if sync.atomic_load(&c.closed) { + return + } + + sync.atomic_add(&c.r_waiting, 1) + sync.wait(&c.r_cond, &c.mutex) + sync.atomic_sub(&c.r_waiting, 1) + } + + msg := raw_queue_pop(c.queue) + if msg != nil { + mem.copy(msg_out, msg, c.msg_size) + } + + if sync.atomic_load(&c.w_waiting) > 0 { + sync.signal(&c.w_cond) + } + ok = true + } else if c.unbuffered_data != nil { // unbuffered + sync.guard(&c.r_mutex) + sync.guard(&c.mutex) + + for !sync.atomic_load(&c.closed) && + sync.atomic_load(&c.w_waiting) == 0 { + sync.atomic_add(&c.r_waiting, 1) + sync.wait(&c.r_cond, &c.mutex) + sync.atomic_sub(&c.r_waiting, 1) + } + + if sync.atomic_load(&c.closed) { + return + } + + mem.copy(msg_out, c.unbuffered_data, c.msg_size) + sync.atomic_sub(&c.w_waiting, 1) + + sync.signal(&c.w_cond) + ok = true + } + return +} + + +@(require_results) +is_buffered :: proc "contextless" (c: ^Raw_Chan) -> bool { + return c != nil && c.queue != nil +} + +@(require_results) +len :: proc "contextless" (c: ^Raw_Chan) -> int { + if c != nil && c.queue != nil { + sync.guard(&c.mutex) + return c.queue.len + } + return 0 +} + +@(require_results) +cap :: proc "contextless" (c: ^Raw_Chan) -> int { + if c != nil && c.queue != nil { + sync.guard(&c.mutex) + return c.queue.cap + } + return 0 +} + +close :: proc "contextless" (c: ^Raw_Chan) -> bool { + if c == nil { + return false + } + sync.guard(&c.mutex) + if sync.atomic_load(&c.closed) { + return false + } + sync.atomic_store(&c.closed, true) + sync.broadcast(&c.r_cond) + sync.broadcast(&c.w_cond) + return true +} + +@(require_results) +is_closed :: proc "contextless" (c: ^Raw_Chan) -> bool { + if c == nil { + return true + } + sync.guard(&c.mutex) + return sync.atomic_load(&c.closed) +} + + + + +Raw_Queue :: struct { + data: [^]byte, + len: int, + cap: int, + next: int, + size: int, // element size +} + +raw_queue_init :: proc "contextless" (q: ^Raw_Queue, data: rawptr, cap: int, size: int) { + q.data = ([^]byte)(data) + q.len = 0 + q.cap = cap + q.next = 0 + q.size = size +} + + +@(require_results) +raw_queue_push :: proc "contextless" (q: ^Raw_Queue, data: rawptr) -> bool { + if q.len == q.cap { + return false + } + pos := q.next + q.len + if pos >= q.cap { + pos -= q.cap + } + + val_ptr := q.data[pos*q.size:] + mem.copy(val_ptr, data, q.size) + q.len += 1 + return true +} + +@(require_results) +raw_queue_pop :: proc "contextless" (q: ^Raw_Queue) -> (data: rawptr) { + if q.len > 0 { + data = q.data[q.next*q.size:] + q.next += 1 + q.len -= 1 + if q.next >= q.cap { + q.next -= q.cap + } + } + return +} + + +@(require_results) +can_recv :: proc "contextless" (c: ^Raw_Chan) -> bool { + if is_buffered(c) { + return len(c) > 0 + } + sync.guard(&c.mutex) + return sync.atomic_load(&c.w_waiting) > 0 +} + + +@(require_results) +can_send :: proc "contextless" (c: ^Raw_Chan) -> bool { + if is_buffered(c) { + sync.guard(&c.mutex) + return len(c) < cap(c) + } + sync.guard(&c.mutex) + return sync.atomic_load(&c.r_waiting) > 0 +} + + + +@(require_results) +select_raw :: proc "odin" (recvs: []^Raw_Chan, sends: []^Raw_Chan, send_msgs: []rawptr, recv_out: rawptr) -> (select_idx: int, ok: bool) #no_bounds_check { + Select_Op :: struct { + idx: int, // local to the slice that was given + is_recv: bool, + } + + candidate_count := builtin.len(recvs)+builtin.len(sends) + candidates := ([^]Select_Op)(intrinsics.alloca(candidate_count*size_of(Select_Op), align_of(Select_Op))) + count := 0 + + for c, i in recvs { + if can_recv(c) { + candidates[count] = { + is_recv = true, + idx = i, + } + count += 1 + } + } + + for c, i in sends { + if can_send(c) { + candidates[count] = { + is_recv = false, + idx = i, + } + count += 1 + } + } + + if count == 0 { + return + } + + r: ^rand.Rand = nil + + + select_idx = rand.int_max(count, r) if count > 0 else 0 + + sel := candidates[select_idx] + if sel.is_recv { + ok = recv_raw(recvs[sel.idx], recv_out) + } else { + ok = send_raw(sends[sel.idx], send_msgs[sel.idx]) + } + return +} \ No newline at end of file