From 554fd8c5195424bdbcabf5de30fdc183aba391bd Mon Sep 17 00:00:00 2001 From: upstream source tree Date: Sun, 15 Mar 2015 20:14:05 -0400 Subject: obtained gcc-4.6.4.tar.bz2 from upstream website; verified gcc-4.6.4.tar.bz2.sig; imported gcc-4.6.4 source tree from verified upstream tarball. downloading a git-generated archive based on the 'upstream' tag should provide you with a source tree that is binary identical to the one extracted from the above tarball. if you have obtained the source via the command 'git clone', however, do note that line-endings of files in your working directory might differ from line-endings of the respective files in the upstream repository. --- libgo/go/netchan/common.go | 325 ++++++++++++++++++++++++ libgo/go/netchan/export.go | 390 +++++++++++++++++++++++++++++ libgo/go/netchan/import.go | 243 ++++++++++++++++++ libgo/go/netchan/netchan_test.go | 515 +++++++++++++++++++++++++++++++++++++++ 4 files changed, 1473 insertions(+) create mode 100644 libgo/go/netchan/common.go create mode 100644 libgo/go/netchan/export.go create mode 100644 libgo/go/netchan/import.go create mode 100644 libgo/go/netchan/netchan_test.go (limited to 'libgo/go/netchan') diff --git a/libgo/go/netchan/common.go b/libgo/go/netchan/common.go new file mode 100644 index 000000000..56c0b2519 --- /dev/null +++ b/libgo/go/netchan/common.go @@ -0,0 +1,325 @@ +// Copyright 2010 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package netchan + +import ( + "gob" + "net" + "os" + "reflect" + "sync" + "time" +) + +// The direction of a connection from the client's perspective. +type Dir int + +const ( + Recv Dir = iota + Send +) + +func (dir Dir) String() string { + switch dir { + case Recv: + return "Recv" + case Send: + return "Send" + } + return "???" +} + +// Payload types +const ( + payRequest = iota // request structure follows + payError // error structure follows + payData // user payload follows + payAck // acknowledgement; no payload + payClosed // channel is now closed + payAckSend // payload has been delivered. +) + +// A header is sent as a prefix to every transmission. It will be followed by +// a request structure, an error structure, or an arbitrary user payload structure. +type header struct { + Id int + PayloadType int + SeqNum int64 +} + +// Sent with a header once per channel from importer to exporter to report +// that it wants to bind to a channel with the specified direction for count +// messages, with space for size buffered values. If count is -1, it means unlimited. +type request struct { + Name string + Count int64 + Size int + Dir Dir +} + +// Sent with a header to report an error. +type error struct { + Error string +} + +// Used to unify management of acknowledgements for import and export. +type unackedCounter interface { + unackedCount() int64 + ack() int64 + seq() int64 +} + +// A channel and its direction. +type chanDir struct { + ch *reflect.ChanValue + dir Dir +} + +// clientSet contains the objects and methods needed for tracking +// clients of an exporter and draining outstanding messages. +type clientSet struct { + mu sync.Mutex // protects access to channel and client maps + names map[string]*chanDir + clients map[unackedCounter]bool +} + +// Mutex-protected encoder and decoder pair. +type encDec struct { + decLock sync.Mutex + dec *gob.Decoder + encLock sync.Mutex + enc *gob.Encoder +} + +func newEncDec(conn net.Conn) *encDec { + return &encDec{ + dec: gob.NewDecoder(conn), + enc: gob.NewEncoder(conn), + } +} + +// Decode an item from the connection. +func (ed *encDec) decode(value reflect.Value) os.Error { + ed.decLock.Lock() + err := ed.dec.DecodeValue(value) + if err != nil { + // TODO: tear down connection? + } + ed.decLock.Unlock() + return err +} + +// Encode a header and payload onto the connection. +func (ed *encDec) encode(hdr *header, payloadType int, payload interface{}) os.Error { + ed.encLock.Lock() + hdr.PayloadType = payloadType + err := ed.enc.Encode(hdr) + if err == nil { + if payload != nil { + err = ed.enc.Encode(payload) + } + } + if err != nil { + // TODO: tear down connection if there is an error? + } + ed.encLock.Unlock() + return err +} + +// See the comment for Exporter.Drain. +func (cs *clientSet) drain(timeout int64) os.Error { + startTime := time.Nanoseconds() + for { + pending := false + cs.mu.Lock() + // Any messages waiting for a client? + for _, chDir := range cs.names { + if chDir.ch.Len() > 0 { + pending = true + } + } + // Any unacknowledged messages? + for client := range cs.clients { + n := client.unackedCount() + if n > 0 { // Check for > rather than != just to be safe. + pending = true + break + } + } + cs.mu.Unlock() + if !pending { + break + } + if timeout > 0 && time.Nanoseconds()-startTime >= timeout { + return os.ErrorString("timeout") + } + time.Sleep(100 * 1e6) // 100 milliseconds + } + return nil +} + +// See the comment for Exporter.Sync. +func (cs *clientSet) sync(timeout int64) os.Error { + startTime := time.Nanoseconds() + // seq remembers the clients and their seqNum at point of entry. + seq := make(map[unackedCounter]int64) + for client := range cs.clients { + seq[client] = client.seq() + } + for { + pending := false + cs.mu.Lock() + // Any unacknowledged messages? Look only at clients that existed + // when we started and are still in this client set. + for client := range seq { + if _, ok := cs.clients[client]; ok { + if client.ack() < seq[client] { + pending = true + break + } + } + } + cs.mu.Unlock() + if !pending { + break + } + if timeout > 0 && time.Nanoseconds()-startTime >= timeout { + return os.ErrorString("timeout") + } + time.Sleep(100 * 1e6) // 100 milliseconds + } + return nil +} + +// A netChan represents a channel imported or exported +// on a single connection. Flow is controlled by the receiving +// side by sending payAckSend messages when values +// are delivered into the local channel. +type netChan struct { + *chanDir + name string + id int + size int // buffer size of channel. + + // sender-specific state + ackCh chan bool // buffered with space for all the acks we need + space int // available space. + + // receiver-specific state + sendCh chan reflect.Value // buffered channel of values received from other end. + ed *encDec // so that we can send acks. + count int64 // number of values still to receive. +} + +// Create a new netChan with the given name (only used for +// messages), id, direction, buffer size, and count. +// The connection to the other side is represented by ed. +func newNetChan(name string, id int, ch *chanDir, ed *encDec, size int, count int64) *netChan { + c := &netChan{chanDir: ch, name: name, id: id, size: size, ed: ed, count: count} + if c.dir == Send { + c.ackCh = make(chan bool, size) + c.space = size + } + return c +} + +// Close the channel. +func (nch *netChan) close() { + if nch.dir == Recv { + if nch.sendCh != nil { + // If the sender goroutine is active, close the channel to it. + // It will close nch.ch when it can. + close(nch.sendCh) + } else { + nch.ch.Close() + } + } else { + nch.ch.Close() + close(nch.ackCh) + } +} + +// Send message from remote side to local receiver. +func (nch *netChan) send(val reflect.Value) { + if nch.dir != Recv { + panic("send on wrong direction of channel") + } + if nch.sendCh == nil { + // If possible, do local send directly and ack immediately. + if nch.ch.TrySend(val) { + nch.sendAck() + return + } + // Start sender goroutine to manage delayed delivery of values. + nch.sendCh = make(chan reflect.Value, nch.size) + go nch.sender() + } + if ok := nch.sendCh <- val; !ok { + // TODO: should this be more resilient? + panic("netchan: remote sender sent more values than allowed") + } +} + +// sendAck sends an acknowledgment that a message has left +// the channel's buffer. If the messages remaining to be sent +// will fit in the channel's buffer, then we don't +// need to send an ack. +func (nch *netChan) sendAck() { + if nch.count < 0 || nch.count > int64(nch.size) { + nch.ed.encode(&header{Id: nch.id}, payAckSend, nil) + } + if nch.count > 0 { + nch.count-- + } +} + +// The sender process forwards items from the sending queue +// to the destination channel, acknowledging each item. +func (nch *netChan) sender() { + if nch.dir != Recv { + panic("sender on wrong direction of channel") + } + // When Exporter.Hangup is called, the underlying channel is closed, + // and so we may get a "too many operations on closed channel" error + // if there are outstanding messages in sendCh. + // Make sure that this doesn't panic the whole program. + defer func() { + if r := recover(); r != nil { + // TODO check that r is "too many operations", otherwise re-panic. + } + }() + for v := range nch.sendCh { + nch.ch.Send(v) + nch.sendAck() + } + nch.ch.Close() +} + +// Receive value from local side for sending to remote side. +func (nch *netChan) recv() (val reflect.Value, closed bool) { + if nch.dir != Send { + panic("recv on wrong direction of channel") + } + + if nch.space == 0 { + // Wait for buffer space. + <-nch.ackCh + nch.space++ + } + nch.space-- + return nch.ch.Recv(), nch.ch.Closed() +} + +// acked is called when the remote side indicates that +// a value has been delivered. +func (nch *netChan) acked() { + if nch.dir != Send { + panic("recv on wrong direction of channel") + } + if ok := nch.ackCh <- true; !ok { + panic("netchan: remote receiver sent too many acks") + // TODO: should this be more resilient? + } +} diff --git a/libgo/go/netchan/export.go b/libgo/go/netchan/export.go new file mode 100644 index 000000000..0f72ca7a9 --- /dev/null +++ b/libgo/go/netchan/export.go @@ -0,0 +1,390 @@ +// Copyright 2010 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +/* + The netchan package implements type-safe networked channels: + it allows the two ends of a channel to appear on different + computers connected by a network. It does this by transporting + data sent to a channel on one machine so it can be recovered + by a receive of a channel of the same type on the other. + + An exporter publishes a set of channels by name. An importer + connects to the exporting machine and imports the channels + by name. After importing the channels, the two machines can + use the channels in the usual way. + + Networked channels are not synchronized; they always behave + as if they are buffered channels of at least one element. +*/ +package netchan + +// BUG: can't use range clause to receive when using ImportNValues to limit the count. + +import ( + "log" + "net" + "os" + "reflect" + "strconv" + "sync" +) + +// Export + +// expLog is a logging convenience function. The first argument must be a string. +func expLog(args ...interface{}) { + args[0] = "netchan export: " + args[0].(string) + log.Print(args...) +} + +// An Exporter allows a set of channels to be published on a single +// network port. A single machine may have multiple Exporters +// but they must use different ports. +type Exporter struct { + *clientSet + listener net.Listener +} + +type expClient struct { + *encDec + exp *Exporter + chans map[int]*netChan // channels in use by client + mu sync.Mutex // protects remaining fields + errored bool // client has been sent an error + seqNum int64 // sequences messages sent to client; has value of highest sent + ackNum int64 // highest sequence number acknowledged + seqLock sync.Mutex // guarantees messages are in sequence, only locked under mu +} + +func newClient(exp *Exporter, conn net.Conn) *expClient { + client := new(expClient) + client.exp = exp + client.encDec = newEncDec(conn) + client.seqNum = 0 + client.ackNum = 0 + client.chans = make(map[int]*netChan) + return client +} + +func (client *expClient) sendError(hdr *header, err string) { + error := &error{err} + expLog("sending error to client:", error.Error) + client.encode(hdr, payError, error) // ignore any encode error, hope client gets it + client.mu.Lock() + client.errored = true + client.mu.Unlock() +} + +func (client *expClient) newChan(hdr *header, dir Dir, name string, size int, count int64) *netChan { + exp := client.exp + exp.mu.Lock() + ech, ok := exp.names[name] + exp.mu.Unlock() + if !ok { + client.sendError(hdr, "no such channel: "+name) + return nil + } + if ech.dir != dir { + client.sendError(hdr, "wrong direction for channel: "+name) + return nil + } + nch := newNetChan(name, hdr.Id, ech, client.encDec, size, count) + client.chans[hdr.Id] = nch + return nch +} + +func (client *expClient) getChan(hdr *header, dir Dir) *netChan { + nch := client.chans[hdr.Id] + if nch == nil { + return nil + } + if nch.dir != dir { + client.sendError(hdr, "wrong direction for channel: "+nch.name) + } + return nch +} + +// The function run manages sends and receives for a single client. For each +// (client Recv) request, this will launch a serveRecv goroutine to deliver +// the data for that channel, while (client Send) requests are handled as +// data arrives from the client. +func (client *expClient) run() { + hdr := new(header) + hdrValue := reflect.NewValue(hdr) + req := new(request) + reqValue := reflect.NewValue(req) + error := new(error) + for { + *hdr = header{} + if err := client.decode(hdrValue); err != nil { + expLog("error decoding client header:", err) + break + } + switch hdr.PayloadType { + case payRequest: + *req = request{} + if err := client.decode(reqValue); err != nil { + expLog("error decoding client request:", err) + break + } + if req.Size < 1 { + panic("netchan: remote requested " + strconv.Itoa(req.Size) + " values") + } + switch req.Dir { + case Recv: + // look up channel before calling serveRecv to + // avoid a lock around client.chans. + if nch := client.newChan(hdr, Send, req.Name, req.Size, req.Count); nch != nil { + go client.serveRecv(nch, *hdr, req.Count) + } + case Send: + client.newChan(hdr, Recv, req.Name, req.Size, req.Count) + // The actual sends will have payload type payData. + // TODO: manage the count? + default: + error.Error = "request: can't handle channel direction" + expLog(error.Error, req.Dir) + client.encode(hdr, payError, error) + } + case payData: + client.serveSend(*hdr) + case payClosed: + client.serveClosed(*hdr) + case payAck: + client.mu.Lock() + if client.ackNum != hdr.SeqNum-1 { + // Since the sequence number is incremented and the message is sent + // in a single instance of locking client.mu, the messages are guaranteed + // to be sent in order. Therefore receipt of acknowledgement N means + // all messages <=N have been seen by the recipient. We check anyway. + expLog("sequence out of order:", client.ackNum, hdr.SeqNum) + } + if client.ackNum < hdr.SeqNum { // If there has been an error, don't back up the count. + client.ackNum = hdr.SeqNum + } + client.mu.Unlock() + case payAckSend: + if nch := client.getChan(hdr, Send); nch != nil { + nch.acked() + } + default: + log.Exit("netchan export: unknown payload type", hdr.PayloadType) + } + } + client.exp.delClient(client) +} + +// Send all the data on a single channel to a client asking for a Recv. +// The header is passed by value to avoid issues of overwriting. +func (client *expClient) serveRecv(nch *netChan, hdr header, count int64) { + for { + val, closed := nch.recv() + if closed { + if err := client.encode(&hdr, payClosed, nil); err != nil { + expLog("error encoding server closed message:", err) + } + break + } + // We hold the lock during transmission to guarantee messages are + // sent in sequence number order. Also, we increment first so the + // value of client.SeqNum is the value of the highest used sequence + // number, not one beyond. + client.mu.Lock() + client.seqNum++ + hdr.SeqNum = client.seqNum + client.seqLock.Lock() // guarantee ordering of messages + client.mu.Unlock() + err := client.encode(&hdr, payData, val.Interface()) + client.seqLock.Unlock() + if err != nil { + expLog("error encoding client response:", err) + client.sendError(&hdr, err.String()) + break + } + // Negative count means run forever. + if count >= 0 { + if count--; count <= 0 { + break + } + } + } +} + +// Receive and deliver locally one item from a client asking for a Send +// The header is passed by value to avoid issues of overwriting. +func (client *expClient) serveSend(hdr header) { + nch := client.getChan(&hdr, Recv) + if nch == nil { + return + } + // Create a new value for each received item. + val := reflect.MakeZero(nch.ch.Type().(*reflect.ChanType).Elem()) + if err := client.decode(val); err != nil { + expLog("value decode:", err, "; type ", nch.ch.Type()) + return + } + nch.send(val) +} + +// Report that client has closed the channel that is sending to us. +// The header is passed by value to avoid issues of overwriting. +func (client *expClient) serveClosed(hdr header) { + nch := client.getChan(&hdr, Recv) + if nch == nil { + return + } + nch.close() +} + +func (client *expClient) unackedCount() int64 { + client.mu.Lock() + n := client.seqNum - client.ackNum + client.mu.Unlock() + return n +} + +func (client *expClient) seq() int64 { + client.mu.Lock() + n := client.seqNum + client.mu.Unlock() + return n +} + +func (client *expClient) ack() int64 { + client.mu.Lock() + n := client.seqNum + client.mu.Unlock() + return n +} + +// Wait for incoming connections, start a new runner for each +func (exp *Exporter) listen() { + for { + conn, err := exp.listener.Accept() + if err != nil { + expLog("listen:", err) + break + } + client := exp.addClient(conn) + go client.run() + } +} + +// NewExporter creates a new Exporter to export channels +// on the network and local address defined as in net.Listen. +func NewExporter(network, localaddr string) (*Exporter, os.Error) { + listener, err := net.Listen(network, localaddr) + if err != nil { + return nil, err + } + e := &Exporter{ + listener: listener, + clientSet: &clientSet{ + names: make(map[string]*chanDir), + clients: make(map[unackedCounter]bool), + }, + } + go e.listen() + return e, nil +} + +// addClient creates a new expClient and records its existence +func (exp *Exporter) addClient(conn net.Conn) *expClient { + client := newClient(exp, conn) + exp.mu.Lock() + exp.clients[client] = true + exp.mu.Unlock() + return client +} + +// delClient forgets the client existed +func (exp *Exporter) delClient(client *expClient) { + exp.mu.Lock() + exp.clients[client] = false, false + exp.mu.Unlock() +} + +// Drain waits until all messages sent from this exporter/importer, including +// those not yet sent to any client and possibly including those sent while +// Drain was executing, have been received by the importer. In short, it +// waits until all the exporter's messages have been received by a client. +// If the timeout (measured in nanoseconds) is positive and Drain takes +// longer than that to complete, an error is returned. +func (exp *Exporter) Drain(timeout int64) os.Error { + // This wrapper function is here so the method's comment will appear in godoc. + return exp.clientSet.drain(timeout) +} + +// Sync waits until all clients of the exporter have received the messages +// that were sent at the time Sync was invoked. Unlike Drain, it does not +// wait for messages sent while it is running or messages that have not been +// dispatched to any client. If the timeout (measured in nanoseconds) is +// positive and Sync takes longer than that to complete, an error is +// returned. +func (exp *Exporter) Sync(timeout int64) os.Error { + // This wrapper function is here so the method's comment will appear in godoc. + return exp.clientSet.sync(timeout) +} + +// Addr returns the Exporter's local network address. +func (exp *Exporter) Addr() net.Addr { return exp.listener.Addr() } + +func checkChan(chT interface{}, dir Dir) (*reflect.ChanValue, os.Error) { + chanType, ok := reflect.Typeof(chT).(*reflect.ChanType) + if !ok { + return nil, os.ErrorString("not a channel") + } + if dir != Send && dir != Recv { + return nil, os.ErrorString("unknown channel direction") + } + switch chanType.Dir() { + case reflect.BothDir: + case reflect.SendDir: + if dir != Recv { + return nil, os.ErrorString("to import/export with Send, must provide <-chan") + } + case reflect.RecvDir: + if dir != Send { + return nil, os.ErrorString("to import/export with Recv, must provide chan<-") + } + } + return reflect.NewValue(chT).(*reflect.ChanValue), nil +} + +// Export exports a channel of a given type and specified direction. The +// channel to be exported is provided in the call and may be of arbitrary +// channel type. +// Despite the literal signature, the effective signature is +// Export(name string, chT chan T, dir Dir) +func (exp *Exporter) Export(name string, chT interface{}, dir Dir) os.Error { + ch, err := checkChan(chT, dir) + if err != nil { + return err + } + exp.mu.Lock() + defer exp.mu.Unlock() + _, present := exp.names[name] + if present { + return os.ErrorString("channel name already being exported:" + name) + } + exp.names[name] = &chanDir{ch, dir} + return nil +} + +// Hangup disassociates the named channel from the Exporter and closes +// the channel. Messages in flight for the channel may be dropped. +func (exp *Exporter) Hangup(name string) os.Error { + exp.mu.Lock() + chDir, ok := exp.names[name] + if ok { + exp.names[name] = nil, false + } + // TODO drop all instances of channel from client sets + exp.mu.Unlock() + if !ok { + return os.ErrorString("netchan export: hangup: no such channel: " + name) + } + chDir.ch.Close() + return nil +} diff --git a/libgo/go/netchan/import.go b/libgo/go/netchan/import.go new file mode 100644 index 000000000..22b0f69ba --- /dev/null +++ b/libgo/go/netchan/import.go @@ -0,0 +1,243 @@ +// Copyright 2010 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package netchan + +import ( + "log" + "net" + "os" + "reflect" + "sync" +) + +// Import + +// impLog is a logging convenience function. The first argument must be a string. +func impLog(args ...interface{}) { + args[0] = "netchan import: " + args[0].(string) + log.Print(args...) +} + +// An Importer allows a set of channels to be imported from a single +// remote machine/network port. A machine may have multiple +// importers, even from the same machine/network port. +type Importer struct { + *encDec + conn net.Conn + chanLock sync.Mutex // protects access to channel map + names map[string]*netChan + chans map[int]*netChan + errors chan os.Error + maxId int +} + +// NewImporter creates a new Importer object to import channels +// from an Exporter at the network and remote address as defined in net.Dial. +// The Exporter must be available and serving when the Importer is +// created. +func NewImporter(network, remoteaddr string) (*Importer, os.Error) { + conn, err := net.Dial(network, "", remoteaddr) + if err != nil { + return nil, err + } + imp := new(Importer) + imp.encDec = newEncDec(conn) + imp.conn = conn + imp.chans = make(map[int]*netChan) + imp.names = make(map[string]*netChan) + imp.errors = make(chan os.Error, 10) + go imp.run() + return imp, nil +} + +// shutdown closes all channels for which we are receiving data from the remote side. +func (imp *Importer) shutdown() { + imp.chanLock.Lock() + for _, ich := range imp.chans { + if ich.dir == Recv { + ich.close() + } + } + imp.chanLock.Unlock() +} + +// Handle the data from a single imported data stream, which will +// have the form +// (response, data)* +// The response identifies by name which channel is transmitting data. +func (imp *Importer) run() { + // Loop on responses; requests are sent by ImportNValues() + hdr := new(header) + hdrValue := reflect.NewValue(hdr) + ackHdr := new(header) + err := new(error) + errValue := reflect.NewValue(err) + for { + *hdr = header{} + if e := imp.decode(hdrValue); e != nil { + impLog("header:", e) + imp.shutdown() + return + } + switch hdr.PayloadType { + case payData: + // done lower in loop + case payError: + if e := imp.decode(errValue); e != nil { + impLog("error:", e) + return + } + if err.Error != "" { + impLog("response error:", err.Error) + if sent := imp.errors <- os.ErrorString(err.Error); !sent { + imp.shutdown() + return + } + continue // errors are not acknowledged. + } + case payClosed: + nch := imp.getChan(hdr.Id, false) + if nch != nil { + nch.close() + } + continue // closes are not acknowledged. + case payAckSend: + // we can receive spurious acks if the channel is + // hung up, so we ask getChan to ignore any errors. + nch := imp.getChan(hdr.Id, true) + if nch != nil { + nch.acked() + } + continue + default: + impLog("unexpected payload type:", hdr.PayloadType) + return + } + nch := imp.getChan(hdr.Id, false) + if nch == nil { + continue + } + if nch.dir != Recv { + impLog("cannot happen: receive from non-Recv channel") + return + } + // Acknowledge receipt + ackHdr.Id = hdr.Id + ackHdr.SeqNum = hdr.SeqNum + imp.encode(ackHdr, payAck, nil) + // Create a new value for each received item. + value := reflect.MakeZero(nch.ch.Type().(*reflect.ChanType).Elem()) + if e := imp.decode(value); e != nil { + impLog("importer value decode:", e) + return + } + nch.send(value) + } +} + +func (imp *Importer) getChan(id int, errOk bool) *netChan { + imp.chanLock.Lock() + ich := imp.chans[id] + imp.chanLock.Unlock() + if ich == nil { + if !errOk { + impLog("unknown id in netchan request: ", id) + } + return nil + } + return ich +} + +// Errors returns a channel from which transmission and protocol errors +// can be read. Clients of the importer are not required to read the error +// channel for correct execution. However, if too many errors occur +// without being read from the error channel, the importer will shut down. +func (imp *Importer) Errors() chan os.Error { + return imp.errors +} + +// Import imports a channel of the given type, size and specified direction. +// It is equivalent to ImportNValues with a count of -1, meaning unbounded. +func (imp *Importer) Import(name string, chT interface{}, dir Dir, size int) os.Error { + return imp.ImportNValues(name, chT, dir, size, -1) +} + +// ImportNValues imports a channel of the given type and specified +// direction and then receives or transmits up to n values on that +// channel. A value of n==-1 implies an unbounded number of values. The +// channel will have buffer space for size values, or 1 value if size < 1. +// The channel to be bound to the remote site's channel is provided +// in the call and may be of arbitrary channel type. +// Despite the literal signature, the effective signature is +// ImportNValues(name string, chT chan T, dir Dir, n int) os.Error +// Example usage: +// imp, err := NewImporter("tcp", "netchanserver.mydomain.com:1234") +// if err != nil { log.Exit(err) } +// ch := make(chan myType) +// err = imp.ImportNValues("name", ch, Recv, 1) +// if err != nil { log.Exit(err) } +// fmt.Printf("%+v\n", <-ch) +func (imp *Importer) ImportNValues(name string, chT interface{}, dir Dir, size, n int) os.Error { + ch, err := checkChan(chT, dir) + if err != nil { + return err + } + imp.chanLock.Lock() + defer imp.chanLock.Unlock() + _, present := imp.names[name] + if present { + return os.ErrorString("channel name already being imported:" + name) + } + if size < 1 { + size = 1 + } + id := imp.maxId + imp.maxId++ + nch := newNetChan(name, id, &chanDir{ch, dir}, imp.encDec, size, int64(n)) + imp.names[name] = nch + imp.chans[id] = nch + // Tell the other side about this channel. + hdr := &header{Id: id} + req := &request{Name: name, Count: int64(n), Dir: dir, Size: size} + if err = imp.encode(hdr, payRequest, req); err != nil { + impLog("request encode:", err) + return err + } + if dir == Send { + go func() { + for i := 0; n == -1 || i < n; i++ { + val, closed := nch.recv() + if closed { + if err = imp.encode(hdr, payClosed, nil); err != nil { + impLog("error encoding client closed message:", err) + } + return + } + if err = imp.encode(hdr, payData, val.Interface()); err != nil { + impLog("error encoding client send:", err) + return + } + } + }() + } + return nil +} + +// Hangup disassociates the named channel from the Importer and closes +// the channel. Messages in flight for the channel may be dropped. +func (imp *Importer) Hangup(name string) os.Error { + imp.chanLock.Lock() + nc, ok := imp.names[name] + if ok { + imp.names[name] = nil, false + imp.chans[nc.id] = nil, false + } + imp.chanLock.Unlock() + if !ok { + return os.ErrorString("netchan import: hangup: no such channel: " + name) + } + nc.close() + return nil +} diff --git a/libgo/go/netchan/netchan_test.go b/libgo/go/netchan/netchan_test.go new file mode 100644 index 000000000..6d7d63f98 --- /dev/null +++ b/libgo/go/netchan/netchan_test.go @@ -0,0 +1,515 @@ +// Copyright 2010 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package netchan + +import ( + "strings" + "testing" + "time" +) + +const count = 10 // number of items in most tests +const closeCount = 5 // number of items when sender closes early + +const base = 23 + +func exportSend(exp *Exporter, n int, t *testing.T, done chan bool) { + ch := make(chan int) + err := exp.Export("exportedSend", ch, Send) + if err != nil { + t.Fatal("exportSend:", err) + } + go func() { + for i := 0; i < n; i++ { + ch <- base+i + } + close(ch) + if done != nil { + done <- true + } + }() +} + +func exportReceive(exp *Exporter, t *testing.T, expDone chan bool) { + ch := make(chan int) + err := exp.Export("exportedRecv", ch, Recv) + expDone <- true + if err != nil { + t.Fatal("exportReceive:", err) + } + for i := 0; i < count; i++ { + v := <-ch + if closed(ch) { + if i != closeCount { + t.Errorf("exportReceive expected close at %d; got one at %d", closeCount, i) + } + break + } + if v != base+i { + t.Errorf("export Receive: bad value: expected %d+%d=%d; got %d", base, i, base+i, v) + } + } +} + +func importSend(imp *Importer, n int, t *testing.T, done chan bool) { + ch := make(chan int) + err := imp.ImportNValues("exportedRecv", ch, Send, 3, -1) + if err != nil { + t.Fatal("importSend:", err) + } + go func() { + for i := 0; i < n; i++ { + ch <- base+i + } + close(ch) + if done != nil { + done <- true + } + }() +} + +func importReceive(imp *Importer, t *testing.T, done chan bool) { + ch := make(chan int) + err := imp.ImportNValues("exportedSend", ch, Recv, 3, count) + if err != nil { + t.Fatal("importReceive:", err) + } + for i := 0; i < count; i++ { + v := <-ch + if closed(ch) { + if i != closeCount { + t.Errorf("importReceive expected close at %d; got one at %d", closeCount, i) + } + break + } + if v != base+i { + t.Errorf("importReceive: bad value: expected %d+%d=%d; got %+d", base, i, base+i, v) + } + } + if done != nil { + done <- true + } +} + +func TestExportSendImportReceive(t *testing.T) { + exp, err := NewExporter("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal("new exporter:", err) + } + imp, err := NewImporter("tcp", exp.Addr().String()) + if err != nil { + t.Fatal("new importer:", err) + } + exportSend(exp, count, t, nil) + importReceive(imp, t, nil) +} + +func TestExportReceiveImportSend(t *testing.T) { + exp, err := NewExporter("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal("new exporter:", err) + } + imp, err := NewImporter("tcp", exp.Addr().String()) + if err != nil { + t.Fatal("new importer:", err) + } + expDone := make(chan bool) + done := make(chan bool) + go func() { + exportReceive(exp, t, expDone) + done <- true + }() + <-expDone + importSend(imp, count, t, nil) + <-done +} + +func TestClosingExportSendImportReceive(t *testing.T) { + exp, err := NewExporter("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal("new exporter:", err) + } + imp, err := NewImporter("tcp", exp.Addr().String()) + if err != nil { + t.Fatal("new importer:", err) + } + exportSend(exp, closeCount, t, nil) + importReceive(imp, t, nil) +} + +func TestClosingImportSendExportReceive(t *testing.T) { + exp, err := NewExporter("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal("new exporter:", err) + } + imp, err := NewImporter("tcp", exp.Addr().String()) + if err != nil { + t.Fatal("new importer:", err) + } + expDone := make(chan bool) + done := make(chan bool) + go func() { + exportReceive(exp, t, expDone) + done <- true + }() + <-expDone + importSend(imp, closeCount, t, nil) + <-done +} + +func TestErrorForIllegalChannel(t *testing.T) { + exp, err := NewExporter("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal("new exporter:", err) + } + imp, err := NewImporter("tcp", exp.Addr().String()) + if err != nil { + t.Fatal("new importer:", err) + } + // Now export a channel. + ch := make(chan int, 1) + err = exp.Export("aChannel", ch, Send) + if err != nil { + t.Fatal("export:", err) + } + ch <- 1234 + close(ch) + // Now try to import a different channel. + ch = make(chan int) + err = imp.Import("notAChannel", ch, Recv, 1) + if err != nil { + t.Fatal("import:", err) + } + // Expect an error now. Start a timeout. + timeout := make(chan bool, 1) // buffered so closure will not hang around. + go func() { + time.Sleep(10e9) // very long, to give even really slow machines a chance. + timeout <- true + }() + select { + case err = <-imp.Errors(): + if strings.Index(err.String(), "no such channel") < 0 { + t.Error("wrong error for nonexistent channel:", err) + } + case <-timeout: + t.Error("import of nonexistent channel did not receive an error") + } +} + +// Not a great test but it does at least invoke Drain. +func TestExportDrain(t *testing.T) { + exp, err := NewExporter("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal("new exporter:", err) + } + imp, err := NewImporter("tcp", exp.Addr().String()) + if err != nil { + t.Fatal("new importer:", err) + } + done := make(chan bool) + go func() { + exportSend(exp, closeCount, t, nil) + done <- true + }() + <-done + go importReceive(imp, t, done) + exp.Drain(0) + <-done +} + +// Not a great test but it does at least invoke Sync. +func TestExportSync(t *testing.T) { + exp, err := NewExporter("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal("new exporter:", err) + } + imp, err := NewImporter("tcp", exp.Addr().String()) + if err != nil { + t.Fatal("new importer:", err) + } + done := make(chan bool) + exportSend(exp, closeCount, t, nil) + go importReceive(imp, t, done) + exp.Sync(0) + <-done +} + +// Test hanging up the send side of an export. +// TODO: test hanging up the receive side of an export. +func TestExportHangup(t *testing.T) { + exp, err := NewExporter("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal("new exporter:", err) + } + imp, err := NewImporter("tcp", exp.Addr().String()) + if err != nil { + t.Fatal("new importer:", err) + } + ech := make(chan int) + err = exp.Export("exportedSend", ech, Send) + if err != nil { + t.Fatal("export:", err) + } + // Prepare to receive two values. We'll actually deliver only one. + ich := make(chan int) + err = imp.ImportNValues("exportedSend", ich, Recv, 1, 2) + if err != nil { + t.Fatal("import exportedSend:", err) + } + // Send one value, receive it. + const Value = 1234 + ech <- Value + v := <-ich + if v != Value { + t.Fatal("expected", Value, "got", v) + } + // Now hang up the channel. Importer should see it close. + exp.Hangup("exportedSend") + v = <-ich + if !closed(ich) { + t.Fatal("expected channel to be closed; got value", v) + } +} + +// Test hanging up the send side of an import. +// TODO: test hanging up the receive side of an import. +func TestImportHangup(t *testing.T) { + exp, err := NewExporter("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal("new exporter:", err) + } + imp, err := NewImporter("tcp", exp.Addr().String()) + if err != nil { + t.Fatal("new importer:", err) + } + ech := make(chan int) + err = exp.Export("exportedRecv", ech, Recv) + if err != nil { + t.Fatal("export:", err) + } + // Prepare to Send two values. We'll actually deliver only one. + ich := make(chan int) + err = imp.ImportNValues("exportedRecv", ich, Send, 1, 2) + if err != nil { + t.Fatal("import exportedRecv:", err) + } + // Send one value, receive it. + const Value = 1234 + ich <- Value + v := <-ech + if v != Value { + t.Fatal("expected", Value, "got", v) + } + // Now hang up the channel. Exporter should see it close. + imp.Hangup("exportedRecv") + v = <-ech + if !closed(ech) { + t.Fatal("expected channel to be closed; got value", v) + } +} + +// loop back exportedRecv to exportedSend, +// but receive a value from ctlch before starting the loop. +func exportLoopback(exp *Exporter, t *testing.T) { + inch := make(chan int) + if err := exp.Export("exportedRecv", inch, Recv); err != nil { + t.Fatal("exportRecv") + } + + outch := make(chan int) + if err := exp.Export("exportedSend", outch, Send); err != nil { + t.Fatal("exportSend") + } + + ctlch := make(chan int) + if err := exp.Export("exportedCtl", ctlch, Recv); err != nil { + t.Fatal("exportRecv") + } + + go func() { + <-ctlch + for i := 0; i < count; i++ { + x := <-inch + if x != base+i { + t.Errorf("exportLoopback expected %d; got %d", i, x) + } + outch <- x + } + }() +} + +// This test checks that channel operations can proceed +// even when other concurrent operations are blocked. +func TestIndependentSends(t *testing.T) { + exp, err := NewExporter("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal("new exporter:", err) + } + imp, err := NewImporter("tcp", exp.Addr().String()) + if err != nil { + t.Fatal("new importer:", err) + } + + exportLoopback(exp, t) + + importSend(imp, count, t, nil) + done := make(chan bool) + go importReceive(imp, t, done) + + // wait for export side to try to deliver some values. + time.Sleep(0.25e9) + + ctlch := make(chan int) + if err := imp.ImportNValues("exportedCtl", ctlch, Send, 1, 1); err != nil { + t.Fatal("importSend:", err) + } + ctlch <- 0 + + <-done +} + +// This test cross-connects a pair of exporter/importer pairs. +type value struct { + I int + Source string +} + +func TestCrossConnect(t *testing.T) { + e1, err := NewExporter("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal("new exporter:", err) + } + i1, err := NewImporter("tcp", e1.Addr().String()) + if err != nil { + t.Fatal("new importer:", err) + } + + e2, err := NewExporter("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal("new exporter:", err) + } + i2, err := NewImporter("tcp", e2.Addr().String()) + if err != nil { + t.Fatal("new importer:", err) + } + + crossExport(e1, e2, t) + crossImport(i1, i2, t) +} + +// Export side of cross-traffic. +func crossExport(e1, e2 *Exporter, t *testing.T) { + s := make(chan value) + err := e1.Export("exportedSend", s, Send) + if err != nil { + t.Fatal("exportSend:", err) + } + + r := make(chan value) + err = e2.Export("exportedReceive", r, Recv) + if err != nil { + t.Fatal("exportReceive:", err) + } + + go crossLoop("export", s, r, t) +} + +// Import side of cross-traffic. +func crossImport(i1, i2 *Importer, t *testing.T) { + s := make(chan value) + err := i2.Import("exportedReceive", s, Send, 2) + if err != nil { + t.Fatal("import of exportedReceive:", err) + } + + r := make(chan value) + err = i1.Import("exportedSend", r, Recv, 2) + if err != nil { + t.Fatal("import of exported Send:", err) + } + + crossLoop("import", s, r, t) +} + +// Cross-traffic: send and receive 'count' numbers. +func crossLoop(name string, s, r chan value, t *testing.T) { + for si, ri := 0, 0; si < count && ri < count; { + select { + case s <- value{si, name}: + si++ + case v := <-r: + if v.I != ri { + t.Errorf("loop: bad value: expected %d, hello; got %+v", ri, v) + } + ri++ + } + } +} + +const flowCount = 100 + +// test flow control from exporter to importer. +func TestExportFlowControl(t *testing.T) { + exp, err := NewExporter("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal("new exporter:", err) + } + imp, err := NewImporter("tcp", exp.Addr().String()) + if err != nil { + t.Fatal("new importer:", err) + } + + sendDone := make(chan bool, 1) + exportSend(exp, flowCount, t, sendDone) + + ch := make(chan int) + err = imp.ImportNValues("exportedSend", ch, Recv, 20, -1) + if err != nil { + t.Fatal("importReceive:", err) + } + + testFlow(sendDone, ch, flowCount, t) +} + +// test flow control from importer to exporter. +func TestImportFlowControl(t *testing.T) { + exp, err := NewExporter("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal("new exporter:", err) + } + imp, err := NewImporter("tcp", exp.Addr().String()) + if err != nil { + t.Fatal("new importer:", err) + } + + ch := make(chan int) + err = exp.Export("exportedRecv", ch, Recv) + if err != nil { + t.Fatal("importReceive:", err) + } + + sendDone := make(chan bool, 1) + importSend(imp, flowCount, t, sendDone) + testFlow(sendDone, ch, flowCount, t) +} + +func testFlow(sendDone chan bool, ch <-chan int, N int, t *testing.T) { + go func() { + time.Sleep(1e9) + sendDone <- false + }() + + if <-sendDone { + t.Fatal("send did not block") + } + n := 0 + for i := range ch { + t.Log("after blocking, got value ", i) + n++ + } + if n != N { + t.Fatalf("expected %d values; got %d", N, n) + } +} -- cgit v1.2.3