summaryrefslogtreecommitdiff
path: root/libgo/go/rpc/client.go
diff options
context:
space:
mode:
Diffstat (limited to 'libgo/go/rpc/client.go')
-rw-r--r--libgo/go/rpc/client.go250
1 files changed, 250 insertions, 0 deletions
diff --git a/libgo/go/rpc/client.go b/libgo/go/rpc/client.go
new file mode 100644
index 000000000..601c49715
--- /dev/null
+++ b/libgo/go/rpc/client.go
@@ -0,0 +1,250 @@
+// Copyright 2009 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package rpc
+
+import (
+ "bufio"
+ "gob"
+ "http"
+ "io"
+ "log"
+ "net"
+ "os"
+ "sync"
+)
+
+// Call represents an active RPC.
+type Call struct {
+ ServiceMethod string // The name of the service and method to call.
+ Args interface{} // The argument to the function (*struct).
+ Reply interface{} // The reply from the function (*struct).
+ Error os.Error // After completion, the error status.
+ Done chan *Call // Strobes when call is complete; value is the error status.
+ seq uint64
+}
+
+// Client represents an RPC Client.
+// There may be multiple outstanding Calls associated
+// with a single Client.
+type Client struct {
+ mutex sync.Mutex // protects pending, seq
+ shutdown os.Error // non-nil if the client is shut down
+ sending sync.Mutex
+ seq uint64
+ codec ClientCodec
+ pending map[uint64]*Call
+ closing bool
+}
+
+// A ClientCodec implements writing of RPC requests and
+// reading of RPC responses for the client side of an RPC session.
+// The client calls WriteRequest to write a request to the connection
+// and calls ReadResponseHeader and ReadResponseBody in pairs
+// to read responses. The client calls Close when finished with the
+// connection.
+type ClientCodec interface {
+ WriteRequest(*Request, interface{}) os.Error
+ ReadResponseHeader(*Response) os.Error
+ ReadResponseBody(interface{}) os.Error
+
+ Close() os.Error
+}
+
+func (client *Client) send(c *Call) {
+ // Register this call.
+ client.mutex.Lock()
+ if client.shutdown != nil {
+ c.Error = client.shutdown
+ client.mutex.Unlock()
+ _ = c.Done <- c // do not block
+ return
+ }
+ c.seq = client.seq
+ client.seq++
+ client.pending[c.seq] = c
+ client.mutex.Unlock()
+
+ // Encode and send the request.
+ request := new(Request)
+ client.sending.Lock()
+ defer client.sending.Unlock()
+ request.Seq = c.seq
+ request.ServiceMethod = c.ServiceMethod
+ if err := client.codec.WriteRequest(request, c.Args); err != nil {
+ panic("rpc: client encode error: " + err.String())
+ }
+}
+
+func (client *Client) input() {
+ var err os.Error
+ for err == nil {
+ response := new(Response)
+ err = client.codec.ReadResponseHeader(response)
+ if err != nil {
+ if err == os.EOF && !client.closing {
+ err = io.ErrUnexpectedEOF
+ }
+ break
+ }
+ seq := response.Seq
+ client.mutex.Lock()
+ c := client.pending[seq]
+ client.pending[seq] = c, false
+ client.mutex.Unlock()
+ err = client.codec.ReadResponseBody(c.Reply)
+ if response.Error != "" {
+ c.Error = os.ErrorString(response.Error)
+ } else if err != nil {
+ c.Error = err
+ } else {
+ // Empty strings should turn into nil os.Errors
+ c.Error = nil
+ }
+ // We don't want to block here. It is the caller's responsibility to make
+ // sure the channel has enough buffer space. See comment in Go().
+ _ = c.Done <- c // do not block
+ }
+ // Terminate pending calls.
+ client.mutex.Lock()
+ client.shutdown = err
+ for _, call := range client.pending {
+ call.Error = err
+ _ = call.Done <- call // do not block
+ }
+ client.mutex.Unlock()
+ if err != os.EOF || !client.closing {
+ log.Println("rpc: client protocol error:", err)
+ }
+}
+
+// NewClient returns a new Client to handle requests to the
+// set of services at the other end of the connection.
+func NewClient(conn io.ReadWriteCloser) *Client {
+ return NewClientWithCodec(&gobClientCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(conn)})
+}
+
+// NewClientWithCodec is like NewClient but uses the specified
+// codec to encode requests and decode responses.
+func NewClientWithCodec(codec ClientCodec) *Client {
+ client := &Client{
+ codec: codec,
+ pending: make(map[uint64]*Call),
+ }
+ go client.input()
+ return client
+}
+
+type gobClientCodec struct {
+ rwc io.ReadWriteCloser
+ dec *gob.Decoder
+ enc *gob.Encoder
+}
+
+func (c *gobClientCodec) WriteRequest(r *Request, body interface{}) os.Error {
+ if err := c.enc.Encode(r); err != nil {
+ return err
+ }
+ return c.enc.Encode(body)
+}
+
+func (c *gobClientCodec) ReadResponseHeader(r *Response) os.Error {
+ return c.dec.Decode(r)
+}
+
+func (c *gobClientCodec) ReadResponseBody(body interface{}) os.Error {
+ return c.dec.Decode(body)
+}
+
+func (c *gobClientCodec) Close() os.Error {
+ return c.rwc.Close()
+}
+
+
+// DialHTTP connects to an HTTP RPC server at the specified network address
+// listening on the default HTTP RPC path.
+func DialHTTP(network, address string) (*Client, os.Error) {
+ return DialHTTPPath(network, address, DefaultRPCPath)
+}
+
+// DialHTTPPath connects to an HTTP RPC server
+// at the specified network address and path.
+func DialHTTPPath(network, address, path string) (*Client, os.Error) {
+ var err os.Error
+ conn, err := net.Dial(network, "", address)
+ if err != nil {
+ return nil, err
+ }
+ io.WriteString(conn, "CONNECT "+path+" HTTP/1.0\n\n")
+
+ // Require successful HTTP response
+ // before switching to RPC protocol.
+ resp, err := http.ReadResponse(bufio.NewReader(conn), "CONNECT")
+ if err == nil && resp.Status == connected {
+ return NewClient(conn), nil
+ }
+ if err == nil {
+ err = os.ErrorString("unexpected HTTP response: " + resp.Status)
+ }
+ conn.Close()
+ return nil, &net.OpError{"dial-http", network + " " + address, nil, err}
+}
+
+// Dial connects to an RPC server at the specified network address.
+func Dial(network, address string) (*Client, os.Error) {
+ conn, err := net.Dial(network, "", address)
+ if err != nil {
+ return nil, err
+ }
+ return NewClient(conn), nil
+}
+
+func (client *Client) Close() os.Error {
+ if client.shutdown != nil || client.closing {
+ return os.ErrorString("rpc: already closed")
+ }
+ client.mutex.Lock()
+ client.closing = true
+ client.mutex.Unlock()
+ return client.codec.Close()
+}
+
+// Go invokes the function asynchronously. It returns the Call structure representing
+// the invocation. The done channel will signal when the call is complete by returning
+// the same Call object. If done is nil, Go will allocate a new channel.
+// If non-nil, done must be buffered or Go will deliberately crash.
+func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call {
+ c := new(Call)
+ c.ServiceMethod = serviceMethod
+ c.Args = args
+ c.Reply = reply
+ if done == nil {
+ done = make(chan *Call, 10) // buffered.
+ } else {
+ // If caller passes done != nil, it must arrange that
+ // done has enough buffer for the number of simultaneous
+ // RPCs that will be using that channel. If the channel
+ // is totally unbuffered, it's best not to run at all.
+ if cap(done) == 0 {
+ log.Panic("rpc: done channel is unbuffered")
+ }
+ }
+ c.Done = done
+ if client.shutdown != nil {
+ c.Error = client.shutdown
+ _ = c.Done <- c // do not block
+ return c
+ }
+ client.send(c)
+ return c
+}
+
+// Call invokes the named function, waits for it to complete, and returns its error status.
+func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) os.Error {
+ if client.shutdown != nil {
+ return client.shutdown
+ }
+ call := <-client.Go(serviceMethod, args, reply, nil).Done
+ return call.Error
+}