From b633a42bc2a4dc0102d8d81c812c98282add86eb Mon Sep 17 00:00:00 2001 From: gingerBill Date: Fri, 26 Jun 2020 19:16:17 +0100 Subject: [PATCH] Revert channel.odin --- core/sync/channel.odin | 191 ++++++++++++++++++++++------------------- 1 file changed, 101 insertions(+), 90 deletions(-) diff --git a/core/sync/channel.odin b/core/sync/channel.odin index 809f1bf39..ae9a196c5 100644 --- a/core/sync/channel.odin +++ b/core/sync/channel.odin @@ -2,7 +2,6 @@ package sync import "core:mem" import "core:time" -import "core:fmt" import "core:math/rand" _, _ :: time, rand; @@ -18,16 +17,13 @@ _Channel_Internal :: struct(T: typeid) { unbuffered_msg: T, // Will be used as the backing to the queue if no `cap` is given - mutex: Mutex, - r_mutex: Mutex, - w_mutex: Mutex, - r_cond: Condition, - w_cond: Condition, + mutex: Mutex, + r_cond: Condition, + w_cond: Condition, - is_buffered: bool, - is_closed: bool, - r_waiting: int, - w_waiting: int, + closed: bool, + r_waiting: int, + w_waiting: int, } channel_init :: proc(c: ^$C/Channel($T), cap: int = 0, allocator := context.allocator) { @@ -42,20 +38,16 @@ channel_make :: proc($T: typeid, cap: int = 0, allocator := context.allocator) - ch.allocator = allocator; mutex_init(&ch.mutex); - mutex_init(&ch.r_mutex); - mutex_init(&ch.w_mutex); condition_init(&ch.r_cond, &ch.mutex); condition_init(&ch.w_cond, &ch.mutex); - ch.is_closed = false; + ch.closed = false; ch.r_waiting = 0; ch.w_waiting = 0; ch.unbuffered_msg = T{}; if cap > 0 { - ch.is_buffered = true; ch.queue = make([dynamic]T, 0, cap, ch.allocator); } else { - ch.is_buffered = false; d := mem.Raw_Dynamic_Array{ data = &ch.unbuffered_msg, len = 0, @@ -75,8 +67,6 @@ channel_destroy :: proc(ch: $C/Channel($T)) { } mutex_destroy(&ch.mutex); - mutex_destroy(&ch.r_mutex); - mutex_destroy(&ch.w_mutex); condition_destroy(&ch.r_cond); condition_destroy(&ch.w_cond); free(ch.internal, ch.allocator); @@ -85,8 +75,8 @@ channel_destroy :: proc(ch: $C/Channel($T)) { channel_close :: proc(ch: $C/Channel($T)) -> (ok: bool) { mutex_lock(&ch.mutex); - if !ch.is_closed { - ch.is_closed = true; + if !ch.closed { + ch.closed = true; condition_broadcast(&ch.r_cond); condition_broadcast(&ch.w_cond); ok = true; @@ -99,45 +89,25 @@ channel_close :: proc(ch: $C/Channel($T)) -> (ok: bool) { channel_write :: proc(ch: $C/Channel($T), msg: T) -> (ok: bool) { mutex_lock(&ch.mutex); defer mutex_unlock(&ch.mutex); - // fmt.println("channel_write"); - // defer fmt.println("channel_write done"); - if ch.is_closed { + if ch.closed { return; } - for !channel_can_write(ch) { + + for len(ch.queue) == cap(ch.queue) { ch.w_waiting += 1; condition_wait_for(&ch.w_cond); ch.w_waiting -= 1; } - if ch.is_buffered { - if len(ch.queue) < cap(ch.queue) { - append(&ch.queue, msg); - ok = true; - } - - if ch.r_waiting > 0 { - condition_signal(&ch.r_cond); - } - } else { - for len(ch.queue) == cap(ch.queue) { - ch.w_waiting += 1; - condition_wait_for(&ch.w_cond); - ch.w_waiting -= 1; - } - assert(len(ch.queue) < cap(ch.queue)); + if len(ch.queue) < cap(ch.queue) { append(&ch.queue, msg); ok = true; - assert(ch.w_waiting >= 0); - ch.w_waiting += 1; + } - if ch.r_waiting > 0 { - condition_signal(&ch.r_cond); - } - - condition_wait_for(&ch.w_cond); + if ch.r_waiting > 0 { + condition_signal(&ch.r_cond); } return; @@ -146,41 +116,27 @@ channel_write :: proc(ch: $C/Channel($T), msg: T) -> (ok: bool) { channel_read :: proc(ch: $C/Channel($T)) -> (msg: T, ok: bool) #optional_ok { mutex_lock(&ch.mutex); defer mutex_unlock(&ch.mutex); - // fmt.println("channel_read"); - // defer fmt.println("channel_read done"); - if ch.is_closed { - return; - } - for !channel_can_read(ch) { + for len(ch.queue) == 0 { + if ch.closed { + return; + } + ch.r_waiting += 1; condition_wait_for(&ch.r_cond); ch.r_waiting -= 1; } - if ch.is_closed { - return; - } - if ch.is_buffered { - assert(len(ch.queue) > 0); - msg, ok = pop_front_safe(&ch.queue); + msg, ok = pop_front(&ch.queue); - if ch.w_waiting > 0 { - condition_signal(&ch.w_cond); - } - } else { - assert(ch.w_waiting > 0); - assert(len(ch.queue) > 0); - msg, ok = pop_front_safe(&ch.queue); - - ch.w_waiting -= 1; + if ch.w_waiting > 0 { condition_signal(&ch.w_cond); } return; } -channel_len :: proc(ch: $C/Channel($T)) -> (size: int) { +channel_size :: proc(ch: $C/Channel($T)) -> (size: int) { if channel_is_buffered(ch) { mutex_lock(&ch.mutex); size = len(ch.queue); @@ -191,56 +147,111 @@ channel_len :: proc(ch: $C/Channel($T)) -> (size: int) { channel_is_closed :: proc(ch: $C/Channel($T)) -> bool { mutex_lock(&ch.mutex); - closed := ch.is_closed; + closed := ch.closed; mutex_unlock(&ch.mutex); return closed; } channel_is_buffered :: proc(ch: $C/Channel($T)) -> bool { - return ch.is_buffered; + q := transmute(mem.Raw_Dynamic_Array)ch.queue; + return q.cap != 0 && (q.data != &ch.unbuffered_msg); } channel_can_write :: proc(ch: $C/Channel($T)) -> bool { mutex_lock(&ch.mutex); defer mutex_unlock(&ch.mutex); - if ch.is_closed { - return false; - } - if ch.is_buffered { - return len(ch.queue) < cap(ch.queue); - } - return ch.r_waiting > 0; + return len(ch.queue) < cap(ch.queue); } channel_can_read :: proc(ch: $C/Channel($T)) -> bool { mutex_lock(&ch.mutex); defer mutex_unlock(&ch.mutex); - if ch.is_buffered { - return len(ch.queue) > 0; - } - return ch.w_waiting > 0; + return len(ch.queue) > 0; } channel_can_read_write :: proc(ch: $C/Channel($T)) -> bool { mutex_lock(&ch.mutex); defer mutex_unlock(&ch.mutex); - if ch.is_buffered { - return 0 < len(ch.queue) && len(ch.queue) < cap(ch.queue); - } - return ch.r_waiting > 0 && ch.w_waiting > 0; + return 0 < len(ch.queue) && len(ch.queue) < cap(ch.queue); } channel_iterator :: proc(ch: $C/Channel($T)) -> (elem: T, ok: bool) { mutex_lock(&ch.mutex); defer mutex_unlock(&ch.mutex); - if ch.is_buffered { - if len(ch.queue) > 0 { - return channel_read(ch); - } - } else if ch.w_waiting > 0 { + if len(ch.queue) > 0 { return channel_read(ch); } return T{}, false; } + + + +channel_select :: proc(readers, writers: []$C/Channel($T), write_msgs: []T) -> (read_msg: T, index: int) { + Candidate :: struct { + ch: C, + msg: T, + index: int, + read: bool, + }; + + count := 0; + candidates := make([]Candidate, len(readers) + len(writers)); + defer delete(candidates); + + for c, i in readers { + if channel_can_read(c) { + candidates[count] = { + ch = c, + index = i, + read = true, + }; + count += 1; + } + } + + for c, i in writers { + if channel_can_write(c) { + candidates[count] = { + ch = c, + index = count, + read = false, + msg = write_msgs[i], + }; + count += 1; + } + } + + if count == 0 { + return T{}, -1; + } + + // Randomize the input + r := rand.create(time.read_cycle_counter()); + s := candidates[rand.int_max(count, &r)]; + if s.read { + ok: bool; + if read_msg, ok = channel_read(s.ch); !ok { + index = -1; + return; + } + } else { + if !channel_write(s.ch, s.msg) { + index = -1; + return; + } + } + + index = s.index; + return; +} + + +channel_select_write :: proc(writers: []$C/Channel($T), write_msgs: []T) -> (read_msg: T, index: int) { + return channel_select([]C{}, writers, msg); +} +channel_select_read :: proc(readers: []$C/Channel($T)) -> (index: int) { + _, index = channel_select(readers, []C{}, nil); + return; +}