summaryrefslogtreecommitdiff
path: root/libgo/runtime/go-select.c
diff options
context:
space:
mode:
Diffstat (limited to 'libgo/runtime/go-select.c')
-rw-r--r--libgo/runtime/go-select.c758
1 files changed, 758 insertions, 0 deletions
diff --git a/libgo/runtime/go-select.c b/libgo/runtime/go-select.c
new file mode 100644
index 000000000..9d9f728f2
--- /dev/null
+++ b/libgo/runtime/go-select.c
@@ -0,0 +1,758 @@
+/* go-select.c -- implement select.
+
+ Copyright 2009 The Go Authors. All rights reserved.
+ Use of this source code is governed by a BSD-style
+ license that can be found in the LICENSE file. */
+
+#include <pthread.h>
+#include <stdarg.h>
+#include <stddef.h>
+#include <stdint.h>
+#include <stdlib.h>
+#include <unistd.h>
+
+#include "config.h"
+#include "go-assert.h"
+#include "channel.h"
+
+/* __go_select builds an array of these structures. */
+
+struct select_channel
+{
+ /* The channel being selected. */
+ struct __go_channel* channel;
+ /* If this channel is selected, the value to return. */
+ size_t retval;
+ /* If this channel is a duplicate of one which appears earlier in
+ the array, this is the array index of the earlier channel. This
+ is -1UL if this is not a dup. */
+ size_t dup_index;
+ /* An entry to put on the send or receive queue. */
+ struct __go_channel_select queue_entry;
+ /* True if selected for send. */
+ _Bool is_send;
+ /* True if channel is ready--it has data to receive or space to
+ send. */
+ _Bool is_ready;
+};
+
+/* This mutex controls access to __go_select_cond. This mutex may not
+ be acquired if any channel locks are held. */
+
+static pthread_mutex_t __go_select_mutex = PTHREAD_MUTEX_INITIALIZER;
+
+/* When we have to wait for channels, we tell them to trigger this
+ condition variable when they send or receive something. */
+
+static pthread_cond_t __go_select_cond = PTHREAD_COND_INITIALIZER;
+
+/* Sort the channels by address. This avoids deadlock when multiple
+ selects are running on overlapping sets of channels. */
+
+static int
+channel_sort (const void *p1, const void *p2)
+{
+ const struct select_channel *c1 = (const struct select_channel *) p1;
+ const struct select_channel *c2 = (const struct select_channel *) p2;
+
+ if ((uintptr_t) c1->channel < (uintptr_t) c2->channel)
+ return -1;
+ else if ((uintptr_t) c1->channel > (uintptr_t) c2->channel)
+ return 1;
+ else
+ return 0;
+}
+
+/* Return whether there is an entry on QUEUE which can be used for a
+ synchronous send or receive. */
+
+static _Bool
+is_queue_ready (struct __go_channel_select *queue)
+{
+ int x;
+
+ if (queue == NULL)
+ return 0;
+
+ x = pthread_mutex_lock (&__go_select_data_mutex);
+ __go_assert (x == 0);
+
+ while (queue != NULL)
+ {
+ if (*queue->selected == NULL)
+ break;
+ queue = queue->next;
+ }
+
+ x = pthread_mutex_unlock (&__go_select_data_mutex);
+ __go_assert (x == 0);
+
+ return queue != NULL;
+}
+
+/* Return whether CHAN is ready. If IS_SEND is true check whether it
+ has space to send, otherwise check whether it has a value to
+ receive. */
+
+static _Bool
+is_channel_ready (struct __go_channel* channel, _Bool is_send)
+{
+ if (is_send)
+ {
+ if (channel->selected_for_send)
+ return 0;
+ if (channel->is_closed)
+ return 1;
+ if (channel->num_entries > 0)
+ {
+ /* An asynchronous channel is ready for sending if there is
+ room in the buffer. */
+ return ((channel->next_store + 1) % channel->num_entries
+ != channel->next_fetch);
+ }
+ else
+ {
+ if (channel->waiting_to_send)
+ {
+ /* Some other goroutine is waiting to send on this
+ channel, so we can't. */
+ return 0;
+ }
+ if (channel->waiting_to_receive)
+ {
+ /* Some other goroutine is waiting to receive a value,
+ so we can send one. */
+ return 1;
+ }
+ if (is_queue_ready (channel->select_receive_queue))
+ {
+ /* There is a select statement waiting to synchronize
+ with this one. */
+ return 1;
+ }
+ return 0;
+ }
+ }
+ else
+ {
+ if (channel->selected_for_receive)
+ return 0;
+ if (channel->is_closed)
+ return 1;
+ if (channel->num_entries > 0)
+ {
+ /* An asynchronous channel is ready for receiving if there
+ is a value in the buffer. */
+ return channel->next_fetch != channel->next_store;
+ }
+ else
+ {
+ if (channel->waiting_to_receive)
+ {
+ /* Some other goroutine is waiting to receive from this
+ channel, so it is not ready for us to receive. */
+ return 0;
+ }
+ if (channel->next_store > 0)
+ {
+ /* There is data on the channel. */
+ return 1;
+ }
+ if (is_queue_ready (channel->select_send_queue))
+ {
+ /* There is a select statement waiting to synchronize
+ with this one. */
+ return 1;
+ }
+ return 0;
+ }
+ }
+}
+
+/* Mark a channel as selected. The channel is locked. IS_SELECTED is
+ true if the channel was selected for us by another goroutine. We
+ set *NEEDS_BROADCAST if we need to broadcast on the select
+ condition variable. Return true if we got it. */
+
+static _Bool
+mark_channel_selected (struct __go_channel *channel, _Bool is_send,
+ _Bool is_selected, _Bool *needs_broadcast)
+{
+ if (channel->num_entries == 0)
+ {
+ /* This is a synchronous channel. If there is no goroutine
+ currently waiting, but there is another select waiting, then
+ we need to tell that select to use this channel. That may
+ fail--there may be no other goroutines currently waiting--as
+ a third goroutine may already have claimed the select. */
+ if (!is_selected
+ && !channel->is_closed
+ && (is_send
+ ? !channel->waiting_to_receive
+ : channel->next_store == 0))
+ {
+ int x;
+ struct __go_channel_select *queue;
+
+ x = pthread_mutex_lock (&__go_select_data_mutex);
+ __go_assert (x == 0);
+
+ queue = (is_send
+ ? channel->select_receive_queue
+ : channel->select_send_queue);
+ __go_assert (queue != NULL);
+
+ while (queue != NULL)
+ {
+ if (*queue->selected == NULL)
+ {
+ *queue->selected = channel;
+ *queue->is_read = !is_send;
+ break;
+ }
+ queue = queue->next;
+ }
+
+ x = pthread_mutex_unlock (&__go_select_data_mutex);
+ __go_assert (x == 0);
+
+ if (queue == NULL)
+ return 0;
+
+ if (is_send)
+ channel->selected_for_receive = 1;
+ else
+ channel->selected_for_send = 1;
+
+ /* We are going to have to tell the other select that there
+ is something to do. */
+ *needs_broadcast = 1;
+ }
+ }
+
+ if (is_send)
+ channel->selected_for_send = 1;
+ else
+ channel->selected_for_receive = 1;
+
+ return 1;
+}
+
+/* Mark a channel to indicate that a select is waiting. The channel
+ is locked. */
+
+static void
+mark_select_waiting (struct select_channel *sc,
+ struct __go_channel **selected_pointer,
+ _Bool *selected_for_read_pointer)
+{
+ struct __go_channel *channel = sc->channel;
+ _Bool is_send = sc->is_send;
+
+ if (channel->num_entries == 0)
+ {
+ struct __go_channel_select **pp;
+
+ pp = (is_send
+ ? &channel->select_send_queue
+ : &channel->select_receive_queue);
+
+ /* Add an entry to the queue of selects on this channel. */
+ sc->queue_entry.next = *pp;
+ sc->queue_entry.selected = selected_pointer;
+ sc->queue_entry.is_read = selected_for_read_pointer;
+
+ *pp = &sc->queue_entry;
+ }
+
+ channel->select_mutex = &__go_select_mutex;
+ channel->select_cond = &__go_select_cond;
+
+ /* We never actually clear the select_mutex and select_cond fields.
+ In order to clear them safely, we would need to have some way of
+ knowing when no select is waiting for the channel. Thus we
+ introduce a bit of inefficiency for every channel that select
+ needs to wait for. This is harmless other than the performance
+ cost. */
+}
+
+/* Remove the entry for this select waiting on this channel. The
+ channel is locked. We check both queues, because the channel may
+ be selected for both reading and writing. */
+
+static void
+clear_select_waiting (struct select_channel *sc,
+ struct __go_channel **selected_pointer)
+{
+ struct __go_channel *channel = sc->channel;
+
+ if (channel->num_entries == 0)
+ {
+ _Bool found;
+ struct __go_channel_select **pp;
+
+ found = 0;
+
+ for (pp = &channel->select_send_queue; *pp != NULL; pp = &(*pp)->next)
+ {
+ if ((*pp)->selected == selected_pointer)
+ {
+ *pp = (*pp)->next;
+ found = 1;
+ break;
+ }
+ }
+
+ for (pp = &channel->select_receive_queue; *pp != NULL; pp = &(*pp)->next)
+ {
+ if ((*pp)->selected == selected_pointer)
+ {
+ *pp = (*pp)->next;
+ found = 1;
+ break;
+ }
+ }
+
+ __go_assert (found);
+ }
+}
+
+/* Look through the list of channels to see which ones are ready.
+ Lock each channels, and set the is_ready flag. Return the number
+ of ready channels. */
+
+static size_t
+lock_channels_find_ready (struct select_channel *channels, size_t count)
+{
+ size_t ready_count;
+ size_t i;
+
+ ready_count = 0;
+ for (i = 0; i < count; ++i)
+ {
+ struct __go_channel *channel = channels[i].channel;
+ _Bool is_send = channels[i].is_send;
+ size_t dup_index = channels[i].dup_index;
+ int x;
+
+ if (channel == NULL)
+ continue;
+
+ if (dup_index != (size_t) -1UL)
+ {
+ if (channels[dup_index].is_ready)
+ {
+ channels[i].is_ready = 1;
+ ++ready_count;
+ }
+ continue;
+ }
+
+ x = pthread_mutex_lock (&channel->lock);
+ __go_assert (x == 0);
+
+ if (is_channel_ready (channel, is_send))
+ {
+ channels[i].is_ready = 1;
+ ++ready_count;
+ }
+ }
+
+ return ready_count;
+}
+
+/* The channel we are going to select has been forced by some other
+ goroutine. SELECTED_CHANNEL is the channel we will use,
+ SELECTED_FOR_READ is whether the other goroutine wants to read from
+ the channel. Note that the channel could be specified multiple
+ times in this select, so we must mark each appropriate entry for
+ this channel as ready. Every other channel is marked as not ready.
+ All the channels are locked before this routine is called. This
+ returns the number of ready channels. */
+
+size_t
+force_selected_channel_ready (struct select_channel *channels, size_t count,
+ struct __go_channel *selected_channel,
+ _Bool selected_for_read)
+{
+ size_t ready_count;
+ size_t i;
+
+ ready_count = 0;
+ for (i = 0; i < count; ++i)
+ {
+ struct __go_channel *channel = channels[i].channel;
+ _Bool is_send = channels[i].is_send;
+
+ if (channel == NULL)
+ continue;
+
+ if (channel != selected_channel
+ || (is_send ? !selected_for_read : selected_for_read))
+ channels[i].is_ready = 0;
+ else
+ {
+ channels[i].is_ready = 1;
+ ++ready_count;
+ }
+ }
+ __go_assert (ready_count > 0);
+ return ready_count;
+}
+
+/* Unlock all the channels. */
+
+static void
+unlock_channels (struct select_channel *channels, size_t count)
+{
+ size_t i;
+ int x;
+
+ for (i = 0; i < count; ++i)
+ {
+ struct __go_channel *channel = channels[i].channel;
+
+ if (channel == NULL)
+ continue;
+
+ if (channels[i].dup_index != (size_t) -1UL)
+ continue;
+
+ x = pthread_mutex_unlock (&channel->lock);
+ __go_assert (x == 0);
+ }
+}
+
+/* At least one channel is ready. Randomly pick a channel to return.
+ Unlock all the channels. IS_SELECTED is true if the channel was
+ picked for us by some other goroutine. If SELECTED_POINTER is not
+ NULL, remove it from the queue for all the channels. Return the
+ retval field of the selected channel. This will return 0 if we
+ can't use the selected channel, because it relied on synchronizing
+ with some other select, and that select already synchronized with a
+ different channel. */
+
+static size_t
+unlock_channels_and_select (struct select_channel *channels,
+ size_t count, size_t ready_count,
+ _Bool is_selected,
+ struct __go_channel **selected_pointer)
+{
+ size_t selected;
+ size_t ret;
+ _Bool needs_broadcast;
+ size_t i;
+ int x;
+
+ /* Pick which channel we are going to return. */
+#if defined(HAVE_RANDOM)
+ selected = (size_t) random () % ready_count;
+#else
+ selected = (size_t) rand () % ready_count;
+#endif
+ ret = 0;
+ needs_broadcast = 0;
+
+ /* Look at the channels in reverse order so that we don't unlock a
+ duplicated channel until we have seen all its dups. */
+ for (i = 0; i < count; ++i)
+ {
+ size_t j = count - i - 1;
+ struct __go_channel *channel = channels[j].channel;
+ _Bool is_send = channels[j].is_send;
+
+ if (channel == NULL)
+ continue;
+
+ if (channels[j].is_ready)
+ {
+ if (selected == 0)
+ {
+ if (mark_channel_selected (channel, is_send, is_selected,
+ &needs_broadcast))
+ ret = channels[j].retval;
+ }
+
+ --selected;
+ }
+
+ if (channels[j].dup_index == (size_t) -1UL)
+ {
+ if (selected_pointer != NULL)
+ clear_select_waiting (&channels[j], selected_pointer);
+
+ x = pthread_mutex_unlock (&channel->lock);
+ __go_assert (x == 0);
+ }
+ }
+
+ /* The NEEDS_BROADCAST variable is set if we are synchronizing with
+ some other select statement. We can't do the actual broadcast
+ until we have unlocked all the channels. */
+
+ if (needs_broadcast)
+ {
+ x = pthread_mutex_lock (&__go_select_mutex);
+ __go_assert (x == 0);
+
+ x = pthread_cond_broadcast (&__go_select_cond);
+ __go_assert (x == 0);
+
+ x = pthread_mutex_unlock (&__go_select_mutex);
+ __go_assert (x == 0);
+ }
+
+ return ret;
+}
+
+/* Mark all channels to show that we are waiting for them. This is
+ called with the select mutex held, but none of the channels are
+ locked. This returns true if some channel was found to be
+ ready. */
+
+static _Bool
+mark_all_channels_waiting (struct select_channel* channels, size_t count,
+ struct __go_channel **selected_pointer,
+ _Bool *selected_for_read_pointer)
+{
+ _Bool ret;
+ int x;
+ size_t i;
+
+ ret = 0;
+ for (i = 0; i < count; ++i)
+ {
+ struct __go_channel *channel = channels[i].channel;
+ _Bool is_send = channels[i].is_send;
+
+ if (channel == NULL)
+ continue;
+
+ if (channels[i].dup_index != (size_t) -1UL)
+ {
+ size_t j;
+
+ /* A channel may be selected for both read and write. */
+ if (channels[channels[i].dup_index].is_send != is_send)
+ {
+ for (j = channels[i].dup_index + 1; j < i; ++j)
+ {
+ if (channels[j].channel == channel
+ && channels[j].is_send == is_send)
+ break;
+ }
+ if (j < i)
+ continue;
+ }
+ }
+
+ x = pthread_mutex_lock (&channel->lock);
+ __go_assert (x == 0);
+
+ /* To avoid a race condition, we have to check again whether the
+ channel is ready. It may have become ready since we did the
+ first set of checks but before we acquired the select mutex.
+ If we don't check here, we could sleep forever on the select
+ condition variable. */
+ if (is_channel_ready (channel, is_send))
+ ret = 1;
+
+ /* If SELECTED_POINTER is NULL, then we have already marked the
+ channel as waiting. */
+ if (selected_pointer != NULL)
+ mark_select_waiting (&channels[i], selected_pointer,
+ selected_for_read_pointer);
+
+ x = pthread_mutex_unlock (&channel->lock);
+ __go_assert (x == 0);
+ }
+
+ return ret;
+}
+
+/* Implement select. This is called by the compiler-generated code
+ with pairs of arguments: a pointer to a channel, and an int which
+ is non-zero for send, zero for receive. */
+
+size_t
+__go_select (size_t count, _Bool has_default,
+ struct __go_channel **channel_args, _Bool *is_send_args)
+{
+ struct select_channel stack_buffer[16];
+ struct select_channel *allocated_buffer;
+ struct select_channel *channels;
+ size_t i;
+ int x;
+ struct __go_channel *selected_channel;
+ _Bool selected_for_read;
+ _Bool is_queued;
+
+ if (count < sizeof stack_buffer / sizeof stack_buffer[0])
+ {
+ channels = &stack_buffer[0];
+ allocated_buffer = NULL;
+ }
+ else
+ {
+ allocated_buffer = ((struct select_channel *)
+ malloc (count * sizeof (struct select_channel)));
+ channels = allocated_buffer;
+ }
+
+ for (i = 0; i < count; ++i)
+ {
+ struct __go_channel *channel_arg = channel_args[i];
+ _Bool is_send = is_send_args[i];
+
+ channels[i].channel = (struct __go_channel*) channel_arg;
+ channels[i].retval = i + 1;
+ channels[i].dup_index = (size_t) -1UL;
+ channels[i].queue_entry.next = NULL;
+ channels[i].queue_entry.selected = NULL;
+ channels[i].is_send = is_send;
+ channels[i].is_ready = 0;
+ }
+
+ qsort (channels, count, sizeof (struct select_channel), channel_sort);
+
+ for (i = 0; i < count; ++i)
+ {
+ size_t j;
+
+ for (j = 0; j < i; ++j)
+ {
+ if (channels[j].channel == channels[i].channel)
+ {
+ channels[i].dup_index = j;
+ break;
+ }
+ }
+ }
+
+ /* SELECT_CHANNEL is used to select synchronized channels. If no
+ channels are ready, we store a pointer to this variable on the
+ select queue for each synchronized channel. Because the variable
+ may be set by channel operations running in other goroutines,
+ SELECT_CHANNEL may only be accessed when all the channels are
+ locked and/or when the select_data_mutex is locked. */
+ selected_channel = NULL;
+
+ /* SELECTED_FOR_READ is set to true if SELECTED_CHANNEL was set by a
+ goroutine which wants to read from the channel. The access
+ restrictions for this are like those for SELECTED_CHANNEL. */
+ selected_for_read = 0;
+
+ /* IS_QUEUED is true if we have queued up this select on the queues
+ for any associated synchronous channels. We only do this if no
+ channels are ready the first time around the loop. */
+ is_queued = 0;
+
+ while (1)
+ {
+ int ready_count;
+ _Bool is_selected;
+
+ /* Lock all channels, identify which ones are ready. */
+ ready_count = lock_channels_find_ready (channels, count);
+
+ /* All the channels are locked, so we can look at
+ SELECTED_CHANNEL. If it is not NULL, then our choice has
+ been forced by some other goroutine. This can only happen
+ after the first time through the loop. */
+ is_selected = selected_channel != NULL;
+ if (is_selected)
+ ready_count = force_selected_channel_ready (channels, count,
+ selected_channel,
+ selected_for_read);
+
+ if (ready_count > 0)
+ {
+ size_t ret;
+
+ ret = unlock_channels_and_select (channels, count, ready_count,
+ is_selected,
+ (is_queued
+ ? &selected_channel
+ : NULL));
+
+ /* If RET is zero, it means that the channel we picked
+ turned out not to be ready, because some other select
+ grabbed it during our traversal. Loop around and try
+ again. */
+ if (ret == 0)
+ {
+ is_queued = 0;
+ /* We are no longer on any channel queues, so it is safe
+ to touch SELECTED_CHANNEL here. It must be NULL,
+ because otherwise that would somebody has promised to
+ synch up with us and then failed to do so. */
+ __go_assert (selected_channel == NULL);
+ continue;
+ }
+
+ if (allocated_buffer != NULL)
+ free (allocated_buffer);
+
+ return ret;
+ }
+
+ /* No channels were ready. */
+
+ unlock_channels (channels, count);
+
+ if (has_default)
+ {
+ /* Use the default clause. */
+ if (allocated_buffer != NULL)
+ free (allocated_buffer);
+ return 0;
+ }
+
+ /* This is a blocking select. Grab the select lock, tell all
+ the channels to notify us when something happens, and wait
+ for something to happen. */
+
+ x = pthread_mutex_lock (&__go_select_mutex);
+ __go_assert (x == 0);
+
+ /* Check whether CHANNEL_SELECTED was set while the channels
+ were unlocked. If it was set, then we can simply loop around
+ again. We need to check this while the select mutex is held.
+ It is possible that something will set CHANNEL_SELECTED while
+ we mark the channels as waiting. If this happens, that
+ goroutine is required to signal the select condition
+ variable, which means acquiring the select mutex. Since we
+ have the select mutex locked ourselves, we can not miss that
+ signal. */
+
+ x = pthread_mutex_lock (&__go_select_data_mutex);
+ __go_assert (x == 0);
+
+ is_selected = selected_channel != NULL;
+
+ x = pthread_mutex_unlock (&__go_select_data_mutex);
+ __go_assert (x == 0);
+
+ if (!is_selected)
+ {
+ /* Mark the channels as waiting, and check whether they have
+ become ready. */
+ if (!mark_all_channels_waiting (channels, count,
+ (is_queued
+ ? NULL
+ : &selected_channel),
+ (is_queued
+ ? NULL
+ : &selected_for_read)))
+ {
+ x = pthread_cond_wait (&__go_select_cond, &__go_select_mutex);
+ __go_assert (x == 0);
+ }
+
+ is_queued = 1;
+ }
+
+ x = pthread_mutex_unlock (&__go_select_mutex);
+ __go_assert (x == 0);
+ }
+}