package xfer import ( "fmt" "net/rpc" "strconv" "sync" ) // ErrInvalidMessage is the error returned when the on-wire message is unexpected. var ErrInvalidMessage = fmt.Errorf("Invalid Message") // Request is the UI -> App -> Probe message type for control RPCs type Request struct { AppID string // filled in by the probe on receiving this request NodeID string Control string ControlArgs map[string]string } // Response is the Probe -> App -> UI message type for the control RPCs. type Response struct { Value interface{} `json:"value,omitempty"` Error string `json:"error,omitempty"` // Pipe specific fields Pipe string `json:"pipe,omitempty"` RawTTY bool `json:"raw_tty,omitempty"` ResizeTTYControl string `json:"resize_tty_control,omitempty"` // Remove specific fields RemovedNode string `json:"removedNode,omitempty"` // Set if node was removed } // Message is the unions of Request, Response and arbitrary Value. type Message struct { Request *rpc.Request Response *rpc.Response Value interface{} } // ControlHandler is interface used in the app and the probe to represent // a control RPC. type ControlHandler interface { Handle(req Request, res *Response) error } // ControlHandlerFunc is a adapter (ala golang's http RequestHandlerFunc) // for ControlHandler type ControlHandlerFunc func(Request) Response // Handle is an adapter method to make ControlHandlers exposable via golang rpc func (c ControlHandlerFunc) Handle(req Request, res *Response) error { *res = c(req) return nil } // ResizeTTYControlWrapper extracts the arguments needed by the resize tty control handler func ResizeTTYControlWrapper(next func(pipeID string, height, width uint) Response) ControlHandlerFunc { return func(req Request) Response { var ( height, width uint64 err error ) pipeID, ok := req.ControlArgs["pipeID"] if !ok { return ResponseErrorf("Missing argument: pipeID") } heightS, ok := req.ControlArgs["height"] if !ok { return ResponseErrorf("Missing argument: height") } widthS, ok := req.ControlArgs["width"] if !ok { return ResponseErrorf("Missing argument: width") } height, err = strconv.ParseUint(heightS, 10, 32) if err != nil { return ResponseErrorf("Bad parameter: height (%q): %v", heightS, err) } width, err = strconv.ParseUint(widthS, 10, 32) if err != nil { return ResponseErrorf("Bad parameter: width (%q): %v", widthS, err) } return next(pipeID, uint(height), uint(width)) } } // ResponseErrorf creates a new Response with the given formatted error string. func ResponseErrorf(format string, a ...interface{}) Response { return Response{ Error: fmt.Sprintf(format, a...), } } // ResponseError creates a new Response with the given error. func ResponseError(err error) Response { if err != nil { return Response{ Error: err.Error(), } } return Response{} } // JSONWebsocketCodec is golang rpc compatible Server and Client Codec // that transmits and receives RPC messages over a websocker, as JSON. type JSONWebsocketCodec struct { sync.Mutex conn Websocket err chan error } // NewJSONWebsocketCodec makes a new JSONWebsocketCodec func NewJSONWebsocketCodec(conn Websocket) *JSONWebsocketCodec { return &JSONWebsocketCodec{ conn: conn, err: make(chan error, 1), } } // WaitForReadError blocks until any read on this codec returns an error. // This is useful to know when the server has disconnected from the client. func (j *JSONWebsocketCodec) WaitForReadError() error { return <-j.err } // WriteRequest implements rpc.ClientCodec func (j *JSONWebsocketCodec) WriteRequest(r *rpc.Request, v interface{}) error { j.Lock() defer j.Unlock() if err := j.conn.WriteJSON(Message{Request: r}); err != nil { return err } return j.conn.WriteJSON(Message{Value: v}) } // WriteResponse implements rpc.ServerCodec func (j *JSONWebsocketCodec) WriteResponse(r *rpc.Response, v interface{}) error { j.Lock() defer j.Unlock() if err := j.conn.WriteJSON(Message{Response: r}); err != nil { return err } return j.conn.WriteJSON(Message{Value: v}) } func (j *JSONWebsocketCodec) readMessage(v interface{}) (*Message, error) { m := Message{Value: v} if err := j.conn.ReadJSON(&m); err != nil { j.err <- err close(j.err) return nil, err } return &m, nil } // ReadResponseHeader implements rpc.ClientCodec func (j *JSONWebsocketCodec) ReadResponseHeader(r *rpc.Response) error { m, err := j.readMessage(nil) if err != nil { return err } if m.Response == nil { return ErrInvalidMessage } *r = *m.Response return nil } // ReadResponseBody implements rpc.ClientCodec func (j *JSONWebsocketCodec) ReadResponseBody(v interface{}) error { _, err := j.readMessage(v) if err != nil { return err } if v == nil { return ErrInvalidMessage } return nil } // Close implements rpc.ClientCodec and rpc.ServerCodec func (j *JSONWebsocketCodec) Close() error { return j.conn.Close() } // ReadRequestHeader implements rpc.ServerCodec func (j *JSONWebsocketCodec) ReadRequestHeader(r *rpc.Request) error { m, err := j.readMessage(nil) if err != nil { return err } if m.Request == nil { return ErrInvalidMessage } *r = *m.Request return nil } // ReadRequestBody implements rpc.ServerCodec func (j *JSONWebsocketCodec) ReadRequestBody(v interface{}) error { _, err := j.readMessage(v) if err != nil { return err } if v == nil { return ErrInvalidMessage } return nil }