1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
|
/* go-select.c -- implement select.
Copyright 2009 The Go Authors. All rights reserved.
Use of this source code is governed by a BSD-style
license that can be found in the LICENSE file. */
#include <pthread.h>
#include <stdarg.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>
#include <unistd.h>
#include "config.h"
#include "go-assert.h"
#include "channel.h"
/* __go_select builds an array of these structures, one per
   channel/direction pair named in the select statement.  */

struct select_channel
{
  /* The channel being selected.  */
  struct __go_channel* channel;
  /* If this channel is selected, the value to return.  This is the
     1-based index of the original argument pair, recorded before the
     array is sorted by channel address.  */
  size_t retval;
  /* If this channel is a duplicate of one which appears earlier in
     the array, this is the array index of the earlier channel.  This
     is -1UL if this is not a dup.  Dup entries share the earlier
     entry's lock and readiness state.  */
  size_t dup_index;
  /* An entry to put on the send or receive queue of a synchronous
     channel while this select is blocked waiting for it.  */
  struct __go_channel_select queue_entry;
  /* True if selected for send.  */
  _Bool is_send;
  /* True if channel is ready--it has data to receive or space to
     send.  */
  _Bool is_ready;
};
/* This mutex controls access to __go_select_cond.  This mutex may not
   be acquired if any channel locks are held: channel locks are always
   taken first, so taking this one afterwards would invert the lock
   order and risk deadlock.  */

static pthread_mutex_t __go_select_mutex = PTHREAD_MUTEX_INITIALIZER;

/* When we have to wait for channels, we tell them to trigger this
   condition variable when they send or receive something.  All
   blocked selects share this one condition variable and re-check
   their own channels on each broadcast.  */

static pthread_cond_t __go_select_cond = PTHREAD_COND_INITIALIZER;
/* Comparison callback for qsort: order select_channel entries by the
   address of the underlying channel.  Giving every select the same
   global locking order avoids deadlock when multiple selects run on
   overlapping sets of channels.  */

static int
channel_sort (const void *p1, const void *p2)
{
  uintptr_t a1 = (uintptr_t) ((const struct select_channel *) p1)->channel;
  uintptr_t a2 = (uintptr_t) ((const struct select_channel *) p2)->channel;

  if (a1 < a2)
    return -1;
  if (a1 > a2)
    return 1;
  return 0;
}
/* Return whether there is an entry on QUEUE which can be used for a
   synchronous send or receive.  An entry is usable while its
   *selected slot is still NULL, i.e. no other channel has claimed
   that select yet.  The scan is done under __go_select_data_mutex
   because other goroutines may be filling in those slots.  */

static _Bool
is_queue_ready (struct __go_channel_select *queue)
{
  struct __go_channel_select *p;
  int err;

  if (queue == NULL)
    return 0;

  err = pthread_mutex_lock (&__go_select_data_mutex);
  __go_assert (err == 0);

  for (p = queue; p != NULL; p = p->next)
    if (*p->selected == NULL)
      break;

  err = pthread_mutex_unlock (&__go_select_data_mutex);
  __go_assert (err == 0);

  return p != NULL;
}
/* Return whether CHAN is ready.  If IS_SEND is true check whether it
   has space to send, otherwise check whether it has a value to
   receive.  The caller must hold the channel lock.  */

static _Bool
is_channel_ready (struct __go_channel* channel, _Bool is_send)
{
  if (is_send)
    {
      /* Another select has already claimed this channel for sending;
	 we must not race with it.  */
      if (channel->selected_for_send)
	return 0;
      /* A closed channel never blocks, so it is always "ready".  */
      if (channel->is_closed)
	return 1;
      if (channel->num_entries > 0)
	{
	  /* An asynchronous channel is ready for sending if there is
	     room in the buffer.  */
	  return ((channel->next_store + 1) % channel->num_entries
		  != channel->next_fetch);
	}
      else
	{
	  if (channel->waiting_to_send)
	    {
	      /* Some other goroutine is waiting to send on this
		 channel, so we can't.  */
	      return 0;
	    }
	  if (channel->waiting_to_receive)
	    {
	      /* Some other goroutine is waiting to receive a value,
		 so we can send one.  */
	      return 1;
	    }
	  if (is_queue_ready (channel->select_receive_queue))
	    {
	      /* There is a select statement waiting to synchronize
		 with this one.  */
	      return 1;
	    }
	  return 0;
	}
    }
  else
    {
      /* Another select has already claimed this channel for
	 receiving.  */
      if (channel->selected_for_receive)
	return 0;
      /* A closed channel never blocks a receive either.  */
      if (channel->is_closed)
	return 1;
      if (channel->num_entries > 0)
	{
	  /* An asynchronous channel is ready for receiving if there
	     is a value in the buffer.  */
	  return channel->next_fetch != channel->next_store;
	}
      else
	{
	  if (channel->waiting_to_receive)
	    {
	      /* Some other goroutine is waiting to receive from this
		 channel, so it is not ready for us to receive.  */
	      return 0;
	    }
	  if (channel->next_store > 0)
	    {
	      /* There is data on the channel.  */
	      return 1;
	    }
	  if (is_queue_ready (channel->select_send_queue))
	    {
	      /* There is a select statement waiting to synchronize
		 with this one.  */
	      return 1;
	    }
	  return 0;
	}
    }
}
/* Mark a channel as selected.  The channel is locked.  IS_SELECTED is
   true if the channel was selected for us by another goroutine.  We
   set *NEEDS_BROADCAST if we need to broadcast on the select
   condition variable.  Return true if we got it; return false if a
   third goroutine claimed the only waiting select first, in which
   case the caller must retry with a different channel.  */

static _Bool
mark_channel_selected (struct __go_channel *channel, _Bool is_send,
		       _Bool is_selected, _Bool *needs_broadcast)
{
  if (channel->num_entries == 0)
    {
      /* This is a synchronous channel.  If there is no goroutine
	 currently waiting, but there is another select waiting, then
	 we need to tell that select to use this channel.  That may
	 fail--there may be no other goroutines currently waiting--as
	 a third goroutine may already have claimed the select.  */
      if (!is_selected
	  && !channel->is_closed
	  && (is_send
	      ? !channel->waiting_to_receive
	      : channel->next_store == 0))
	{
	  int x;
	  struct __go_channel_select *queue;

	  /* The queue entries' *selected slots are shared with other
	     selects, so they may only be touched under
	     __go_select_data_mutex.  */
	  x = pthread_mutex_lock (&__go_select_data_mutex);
	  __go_assert (x == 0);

	  /* To send we need a select waiting to receive, and vice
	     versa.  */
	  queue = (is_send
		   ? channel->select_receive_queue
		   : channel->select_send_queue);
	  __go_assert (queue != NULL);

	  /* Claim the first select on the queue that nobody else has
	     claimed yet, telling it which channel and direction to
	     use.  */
	  while (queue != NULL)
	    {
	      if (*queue->selected == NULL)
		{
		  *queue->selected = channel;
		  *queue->is_read = !is_send;
		  break;
		}
	      queue = queue->next;
	    }

	  x = pthread_mutex_unlock (&__go_select_data_mutex);
	  __go_assert (x == 0);

	  /* Every waiting select was already claimed; we failed.  */
	  if (queue == NULL)
	    return 0;

	  /* Reserve the opposite direction on behalf of the other
	     select so no third party can take it first.  */
	  if (is_send)
	    channel->selected_for_receive = 1;
	  else
	    channel->selected_for_send = 1;

	  /* We are going to have to tell the other select that there
	     is something to do.  */
	  *needs_broadcast = 1;
	}
    }

  /* Reserve our own direction on the channel.  */
  if (is_send)
    channel->selected_for_send = 1;
  else
    channel->selected_for_receive = 1;

  return 1;
}
/* Mark a channel to indicate that a select is waiting.  The channel
   is locked.  SELECTED_POINTER and SELECTED_FOR_READ_POINTER live in
   the waiting select's frame; another goroutine fills them in to
   force this select's choice.  */

static void
mark_select_waiting (struct select_channel *sc,
		     struct __go_channel **selected_pointer,
		     _Bool *selected_for_read_pointer)
{
  struct __go_channel *channel = sc->channel;
  _Bool is_send = sc->is_send;

  /* Only synchronous channels use the select queues; buffered
     channels synchronize through the buffer state alone.  */
  if (channel->num_entries == 0)
    {
      struct __go_channel_select **pp;

      pp = (is_send
	    ? &channel->select_send_queue
	    : &channel->select_receive_queue);

      /* Add an entry to the queue of selects on this channel.  The
	 entry is owned by SC, so no allocation is needed; it is
	 pushed on the front of the list.  */
      sc->queue_entry.next = *pp;
      sc->queue_entry.selected = selected_pointer;
      sc->queue_entry.is_read = selected_for_read_pointer;
      *pp = &sc->queue_entry;
    }

  /* Tell channel operations to signal us through the shared select
     mutex/condition variable.  */
  channel->select_mutex = &__go_select_mutex;
  channel->select_cond = &__go_select_cond;

  /* We never actually clear the select_mutex and select_cond fields.
     In order to clear them safely, we would need to have some way of
     knowing when no select is waiting for the channel.  Thus we
     introduce a bit of inefficiency for every channel that select
     needs to wait for.  This is harmless other than the performance
     cost.  */
}
/* Remove the entry for this select waiting on this channel.  The
   channel is locked.  We check both queues, because the channel may
   be selected for both reading and writing.  Entries are matched by
   their SELECTED pointer, which uniquely identifies this select.  */

static void
clear_select_waiting (struct select_channel *sc,
		      struct __go_channel **selected_pointer)
{
  struct __go_channel *channel = sc->channel;

  /* Only synchronous channels ever had queue entries added (see
     mark_select_waiting).  */
  if (channel->num_entries == 0)
    {
      _Bool found;
      struct __go_channel_select **pp;

      found = 0;

      /* Unlink at most one matching entry from the send queue.  */
      for (pp = &channel->select_send_queue; *pp != NULL; pp = &(*pp)->next)
	{
	  if ((*pp)->selected == selected_pointer)
	    {
	      *pp = (*pp)->next;
	      found = 1;
	      break;
	    }
	}

      /* And at most one from the receive queue; both may be present
	 when the same channel is selected for both directions.  */
      for (pp = &channel->select_receive_queue; *pp != NULL; pp = &(*pp)->next)
	{
	  if ((*pp)->selected == selected_pointer)
	    {
	      *pp = (*pp)->next;
	      found = 1;
	      break;
	    }
	}

      /* The caller only asks us to clear entries it queued, so one
	 must have been present.  */
      __go_assert (found);
    }
}
/* Look through the list of channels to see which ones are ready.
   Lock each channel, and set the is_ready flag.  Return the number
   of ready channels.  NOTE: every distinct non-NULL channel remains
   LOCKED when this returns; the caller unlocks them via
   unlock_channels or unlock_channels_and_select.  The array is
   sorted by channel address, so the locks are taken in a consistent
   global order.  */

static size_t
lock_channels_find_ready (struct select_channel *channels, size_t count)
{
  size_t ready_count;
  size_t i;

  ready_count = 0;
  for (i = 0; i < count; ++i)
    {
      struct __go_channel *channel = channels[i].channel;
      _Bool is_send = channels[i].is_send;
      size_t dup_index = channels[i].dup_index;
      int x;

      /* A nil channel never becomes ready.  */
      if (channel == NULL)
	continue;

      if (dup_index != (size_t) -1UL)
	{
	  /* A dup shares the earlier entry's lock; just mirror its
	     readiness.  Dups always follow the entry they duplicate,
	     so channels[dup_index].is_ready is already current.  */
	  if (channels[dup_index].is_ready)
	    {
	      channels[i].is_ready = 1;
	      ++ready_count;
	    }
	  continue;
	}

      /* Lock and leave locked (see function comment).  */
      x = pthread_mutex_lock (&channel->lock);
      __go_assert (x == 0);

      if (is_channel_ready (channel, is_send))
	{
	  channels[i].is_ready = 1;
	  ++ready_count;
	}
    }

  return ready_count;
}
/* The channel we are going to select has been forced by some other
   goroutine.  SELECTED_CHANNEL is the channel we will use,
   SELECTED_FOR_READ is whether the other goroutine wants to read from
   the channel (i.e. we must be the writer).  Note that the channel
   could be specified multiple times in this select, so we must mark
   each appropriate entry for this channel as ready.  Every other
   channel is marked as not ready.  All the channels are locked before
   this routine is called.  This returns the number of ready
   channels.  */

/* This helper is file-local like every other helper here; it was
   previously missing `static` and leaked into the global namespace.  */

static size_t
force_selected_channel_ready (struct select_channel *channels, size_t count,
			      struct __go_channel *selected_channel,
			      _Bool selected_for_read)
{
  size_t ready_count;
  size_t i;

  ready_count = 0;
  for (i = 0; i < count; ++i)
    {
      struct __go_channel *channel = channels[i].channel;
      _Bool is_send = channels[i].is_send;

      if (channel == NULL)
	continue;

      /* An entry matches only if it is the forced channel AND its
	 direction complements the other goroutine's: they read, we
	 send (and vice versa).  */
      if (channel != selected_channel
	  || (is_send ? !selected_for_read : selected_for_read))
	channels[i].is_ready = 0;
      else
	{
	  channels[i].is_ready = 1;
	  ++ready_count;
	}
    }

  /* The forcing goroutine only picks channels present in this
     select, so at least one entry must match.  */
  __go_assert (ready_count > 0);

  return ready_count;
}
/* Release the lock on every distinct channel in the array.  Dup
   entries and nil channels were never locked, so they are
   skipped.  */

static void
unlock_channels (struct select_channel *channels, size_t count)
{
  size_t i;

  for (i = 0; i < count; ++i)
    {
      struct __go_channel *ch = channels[i].channel;
      int err;

      if (ch == NULL)
	continue;
      if (channels[i].dup_index != (size_t) -1UL)
	continue;

      err = pthread_mutex_unlock (&ch->lock);
      __go_assert (err == 0);
    }
}
/* At least one channel is ready.  Randomly pick a channel to return.
   Unlock all the channels.  IS_SELECTED is true if the channel was
   picked for us by some other goroutine.  If SELECTED_POINTER is not
   NULL, remove it from the queue for all the channels.  Return the
   retval field of the selected channel.  This will return 0 if we
   can't use the selected channel, because it relied on synchronizing
   with some other select, and that select already synchronized with a
   different channel.  */

static size_t
unlock_channels_and_select (struct select_channel *channels,
			    size_t count, size_t ready_count,
			    _Bool is_selected,
			    struct __go_channel **selected_pointer)
{
  size_t selected;
  size_t ret;
  _Bool needs_broadcast;
  size_t i;
  int x;

  /* Pick which channel we are going to return.  SELECTED counts down
     as we pass ready channels; the one that sees it at zero wins.  */
#if defined(HAVE_RANDOM)
  selected = (size_t) random () % ready_count;
#else
  selected = (size_t) rand () % ready_count;
#endif

  ret = 0;
  needs_broadcast = 0;

  /* Look at the channels in reverse order so that we don't unlock a
     duplicated channel until we have seen all its dups.  */
  for (i = 0; i < count; ++i)
    {
      size_t j = count - i - 1;
      struct __go_channel *channel = channels[j].channel;
      _Bool is_send = channels[j].is_send;

      if (channel == NULL)
	continue;

      if (channels[j].is_ready)
	{
	  if (selected == 0)
	    {
	      /* mark_channel_selected can fail when the channel
		 depended on another select that has since been
		 claimed; in that case RET stays 0 and the caller
		 retries.  */
	      if (mark_channel_selected (channel, is_send, is_selected,
					 &needs_broadcast))
		ret = channels[j].retval;
	    }

	  /* When SELECTED was 0 this wraps to (size_t)-1, so no
	     later ready entry can match again--only one channel is
	     ever chosen.  */
	  --selected;
	}

      if (channels[j].dup_index == (size_t) -1UL)
	{
	  /* This entry owns the lock; clear any queue entries we
	     added while blocked, then unlock.  */
	  if (selected_pointer != NULL)
	    clear_select_waiting (&channels[j], selected_pointer);

	  x = pthread_mutex_unlock (&channel->lock);
	  __go_assert (x == 0);
	}
    }

  /* The NEEDS_BROADCAST variable is set if we are synchronizing with
     some other select statement.  We can't do the actual broadcast
     until we have unlocked all the channels.  */
  if (needs_broadcast)
    {
      x = pthread_mutex_lock (&__go_select_mutex);
      __go_assert (x == 0);

      x = pthread_cond_broadcast (&__go_select_cond);
      __go_assert (x == 0);

      x = pthread_mutex_unlock (&__go_select_mutex);
      __go_assert (x == 0);
    }

  return ret;
}
/* Mark all channels to show that we are waiting for them.  This is
   called with the select mutex held, but none of the channels are
   locked.  This returns true if some channel was found to be
   ready--in which case the caller should not sleep but loop
   around.  */

static _Bool
mark_all_channels_waiting (struct select_channel* channels, size_t count,
			   struct __go_channel **selected_pointer,
			   _Bool *selected_for_read_pointer)
{
  _Bool ret;
  int x;
  size_t i;

  ret = 0;
  for (i = 0; i < count; ++i)
    {
      struct __go_channel *channel = channels[i].channel;
      _Bool is_send = channels[i].is_send;

      if (channel == NULL)
	continue;

      if (channels[i].dup_index != (size_t) -1UL)
	{
	  size_t j;

	  /* A channel may be selected for both read and write.  Only
	     skip this dup if an earlier entry already covers the
	     same channel in the same direction.  */
	  if (channels[channels[i].dup_index].is_send != is_send)
	    {
	      for (j = channels[i].dup_index + 1; j < i; ++j)
		{
		  if (channels[j].channel == channel
		      && channels[j].is_send == is_send)
		    break;
		}
	      if (j < i)
		continue;
	    }
	}

      x = pthread_mutex_lock (&channel->lock);
      __go_assert (x == 0);

      /* To avoid a race condition, we have to check again whether the
	 channel is ready.  It may have become ready since we did the
	 first set of checks but before we acquired the select mutex.
	 If we don't check here, we could sleep forever on the select
	 condition variable.  */
      if (is_channel_ready (channel, is_send))
	ret = 1;

      /* If SELECTED_POINTER is NULL, then we have already marked the
	 channel as waiting.  */
      if (selected_pointer != NULL)
	mark_select_waiting (&channels[i], selected_pointer,
			     selected_for_read_pointer);

      x = pthread_mutex_unlock (&channel->lock);
      __go_assert (x == 0);
    }

  return ret;
}
/* Implement select.  This is called by the compiler-generated code
   with pairs of arguments: a pointer to a channel, and an int which
   is non-zero for send, zero for receive.  COUNT is the number of
   pairs.  If HAS_DEFAULT is true and no channel is ready, return 0
   (the default clause); otherwise block until a channel is chosen.
   On success the return value is the 1-based index of the chosen
   channel/direction pair in the original argument order.  */

size_t
__go_select (size_t count, _Bool has_default,
	     struct __go_channel **channel_args, _Bool *is_send_args)
{
  struct select_channel stack_buffer[16];
  struct select_channel *allocated_buffer;
  struct select_channel *channels;
  size_t i;
  int x;
  struct __go_channel *selected_channel;
  _Bool selected_for_read;
  _Bool is_queued;

  /* Small selects use an on-stack array to avoid heap allocation on
     the common path.  */
  if (count < sizeof stack_buffer / sizeof stack_buffer[0])
    {
      channels = &stack_buffer[0];
      allocated_buffer = NULL;
    }
  else
    {
      allocated_buffer = ((struct select_channel *)
			  malloc (count * sizeof (struct select_channel)));
      /* There is no way to report failure to the caller; abort
	 cleanly rather than dereference a null pointer below.  */
      __go_assert (allocated_buffer != NULL);
      channels = allocated_buffer;
    }

  for (i = 0; i < count; ++i)
    {
      struct __go_channel *channel_arg = channel_args[i];
      _Bool is_send = is_send_args[i];

      channels[i].channel = (struct __go_channel*) channel_arg;
      /* Record the 1-based result index now, before sorting changes
	 the array order.  */
      channels[i].retval = i + 1;
      channels[i].dup_index = (size_t) -1UL;
      channels[i].queue_entry.next = NULL;
      channels[i].queue_entry.selected = NULL;
      channels[i].is_send = is_send;
      channels[i].is_ready = 0;
    }

  /* Sort by channel address to establish a global lock order.  */
  qsort (channels, count, sizeof (struct select_channel), channel_sort);

  /* After sorting, duplicates of the same channel are adjacent or
     nearby; point each dup back at its first occurrence so we lock
     each channel only once.  */
  for (i = 0; i < count; ++i)
    {
      size_t j;

      for (j = 0; j < i; ++j)
	{
	  if (channels[j].channel == channels[i].channel)
	    {
	      channels[i].dup_index = j;
	      break;
	    }
	}
    }

  /* SELECTED_CHANNEL is used to select synchronized channels.  If no
     channels are ready, we store a pointer to this variable on the
     select queue for each synchronized channel.  Because the variable
     may be set by channel operations running in other goroutines,
     SELECTED_CHANNEL may only be accessed when all the channels are
     locked and/or when the select_data_mutex is locked.  */
  selected_channel = NULL;

  /* SELECTED_FOR_READ is set to true if SELECTED_CHANNEL was set by a
     goroutine which wants to read from the channel.  The access
     restrictions for this are like those for SELECTED_CHANNEL.  */
  selected_for_read = 0;

  /* IS_QUEUED is true if we have queued up this select on the queues
     for any associated synchronous channels.  We only do this if no
     channels are ready the first time around the loop.  */
  is_queued = 0;

  while (1)
    {
      /* size_t, matching the return types of lock_channels_find_ready
	 and force_selected_channel_ready (this was previously a
	 truncating int).  */
      size_t ready_count;
      _Bool is_selected;

      /* Lock all channels, identify which ones are ready.  */
      ready_count = lock_channels_find_ready (channels, count);

      /* All the channels are locked, so we can look at
	 SELECTED_CHANNEL.  If it is not NULL, then our choice has
	 been forced by some other goroutine.  This can only happen
	 after the first time through the loop.  */
      is_selected = selected_channel != NULL;
      if (is_selected)
	ready_count = force_selected_channel_ready (channels, count,
						    selected_channel,
						    selected_for_read);

      if (ready_count > 0)
	{
	  size_t ret;

	  ret = unlock_channels_and_select (channels, count, ready_count,
					    is_selected,
					    (is_queued
					     ? &selected_channel
					     : NULL));

	  /* If RET is zero, it means that the channel we picked
	     turned out not to be ready, because some other select
	     grabbed it during our traversal.  Loop around and try
	     again.  */
	  if (ret == 0)
	    {
	      is_queued = 0;
	      /* We are no longer on any channel queues, so it is safe
		 to touch SELECTED_CHANNEL here.  It must be NULL,
		 because otherwise somebody promised to synch up with
		 us and then failed to do so.  */
	      __go_assert (selected_channel == NULL);
	      continue;
	    }

	  if (allocated_buffer != NULL)
	    free (allocated_buffer);

	  return ret;
	}

      /* No channels were ready.  */

      unlock_channels (channels, count);

      if (has_default)
	{
	  /* Use the default clause.  */
	  if (allocated_buffer != NULL)
	    free (allocated_buffer);
	  return 0;
	}

      /* This is a blocking select.  Grab the select lock, tell all
	 the channels to notify us when something happens, and wait
	 for something to happen.  */
      x = pthread_mutex_lock (&__go_select_mutex);
      __go_assert (x == 0);

      /* Check whether SELECTED_CHANNEL was set while the channels
	 were unlocked.  If it was set, then we can simply loop around
	 again.  We need to check this while the select mutex is held.
	 It is possible that something will set SELECTED_CHANNEL while
	 we mark the channels as waiting.  If this happens, that
	 goroutine is required to signal the select condition
	 variable, which means acquiring the select mutex.  Since we
	 have the select mutex locked ourselves, we can not miss that
	 signal.  */
      x = pthread_mutex_lock (&__go_select_data_mutex);
      __go_assert (x == 0);
      is_selected = selected_channel != NULL;
      x = pthread_mutex_unlock (&__go_select_data_mutex);
      __go_assert (x == 0);

      if (!is_selected)
	{
	  /* Mark the channels as waiting, and check whether they have
	     become ready.  Pass NULL pointers once queued, so that we
	     do not queue duplicate entries on later iterations.  */
	  if (!mark_all_channels_waiting (channels, count,
					  (is_queued
					   ? NULL
					   : &selected_channel),
					  (is_queued
					   ? NULL
					   : &selected_for_read)))
	    {
	      x = pthread_cond_wait (&__go_select_cond, &__go_select_mutex);
	      __go_assert (x == 0);
	    }

	  is_queued = 1;
	}

      x = pthread_mutex_unlock (&__go_select_mutex);
      __go_assert (x == 0);
    }
}
|