controls.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217
  1. package xfer
  2. import (
  3. "fmt"
  4. "net/rpc"
  5. "strconv"
  6. "sync"
  7. )
  8. // ErrInvalidMessage is the error returned when the on-wire message is unexpected.
  9. var ErrInvalidMessage = fmt.Errorf("Invalid Message")
  10. // Request is the UI -> App -> Probe message type for control RPCs
  11. type Request struct {
  12. AppID string // filled in by the probe on receiving this request
  13. NodeID string
  14. Control string
  15. ControlArgs map[string]string
  16. }
  17. // Response is the Probe -> App -> UI message type for the control RPCs.
  18. type Response struct {
  19. Value interface{} `json:"value,omitempty"`
  20. Error string `json:"error,omitempty"`
  21. // Pipe specific fields
  22. Pipe string `json:"pipe,omitempty"`
  23. RawTTY bool `json:"raw_tty,omitempty"`
  24. ResizeTTYControl string `json:"resize_tty_control,omitempty"`
  25. // Remove specific fields
  26. RemovedNode string `json:"removedNode,omitempty"` // Set if node was removed
  27. }
  28. // Message is the unions of Request, Response and arbitrary Value.
  29. type Message struct {
  30. Request *rpc.Request
  31. Response *rpc.Response
  32. Value interface{}
  33. }
  34. // ControlHandler is interface used in the app and the probe to represent
  35. // a control RPC.
  36. type ControlHandler interface {
  37. Handle(req Request, res *Response) error
  38. }
  39. // ControlHandlerFunc is a adapter (ala golang's http RequestHandlerFunc)
  40. // for ControlHandler
  41. type ControlHandlerFunc func(Request) Response
  42. // Handle is an adapter method to make ControlHandlers exposable via golang rpc
  43. func (c ControlHandlerFunc) Handle(req Request, res *Response) error {
  44. *res = c(req)
  45. return nil
  46. }
  47. // ResizeTTYControlWrapper extracts the arguments needed by the resize tty control handler
  48. func ResizeTTYControlWrapper(next func(pipeID string, height, width uint) Response) ControlHandlerFunc {
  49. return func(req Request) Response {
  50. var (
  51. height, width uint64
  52. err error
  53. )
  54. pipeID, ok := req.ControlArgs["pipeID"]
  55. if !ok {
  56. return ResponseErrorf("Missing argument: pipeID")
  57. }
  58. heightS, ok := req.ControlArgs["height"]
  59. if !ok {
  60. return ResponseErrorf("Missing argument: height")
  61. }
  62. widthS, ok := req.ControlArgs["width"]
  63. if !ok {
  64. return ResponseErrorf("Missing argument: width")
  65. }
  66. height, err = strconv.ParseUint(heightS, 10, 32)
  67. if err != nil {
  68. return ResponseErrorf("Bad parameter: height (%q): %v", heightS, err)
  69. }
  70. width, err = strconv.ParseUint(widthS, 10, 32)
  71. if err != nil {
  72. return ResponseErrorf("Bad parameter: width (%q): %v", widthS, err)
  73. }
  74. return next(pipeID, uint(height), uint(width))
  75. }
  76. }
  77. // ResponseErrorf creates a new Response with the given formatted error string.
  78. func ResponseErrorf(format string, a ...interface{}) Response {
  79. return Response{
  80. Error: fmt.Sprintf(format, a...),
  81. }
  82. }
  83. // ResponseError creates a new Response with the given error.
  84. func ResponseError(err error) Response {
  85. if err != nil {
  86. return Response{
  87. Error: err.Error(),
  88. }
  89. }
  90. return Response{}
  91. }
  92. // JSONWebsocketCodec is golang rpc compatible Server and Client Codec
  93. // that transmits and receives RPC messages over a websocker, as JSON.
  94. type JSONWebsocketCodec struct {
  95. sync.Mutex
  96. conn Websocket
  97. err chan error
  98. }
  99. // NewJSONWebsocketCodec makes a new JSONWebsocketCodec
  100. func NewJSONWebsocketCodec(conn Websocket) *JSONWebsocketCodec {
  101. return &JSONWebsocketCodec{
  102. conn: conn,
  103. err: make(chan error, 1),
  104. }
  105. }
  106. // WaitForReadError blocks until any read on this codec returns an error.
  107. // This is useful to know when the server has disconnected from the client.
  108. func (j *JSONWebsocketCodec) WaitForReadError() error {
  109. return <-j.err
  110. }
  111. // WriteRequest implements rpc.ClientCodec
  112. func (j *JSONWebsocketCodec) WriteRequest(r *rpc.Request, v interface{}) error {
  113. j.Lock()
  114. defer j.Unlock()
  115. if err := j.conn.WriteJSON(Message{Request: r}); err != nil {
  116. return err
  117. }
  118. return j.conn.WriteJSON(Message{Value: v})
  119. }
  120. // WriteResponse implements rpc.ServerCodec
  121. func (j *JSONWebsocketCodec) WriteResponse(r *rpc.Response, v interface{}) error {
  122. j.Lock()
  123. defer j.Unlock()
  124. if err := j.conn.WriteJSON(Message{Response: r}); err != nil {
  125. return err
  126. }
  127. return j.conn.WriteJSON(Message{Value: v})
  128. }
  129. func (j *JSONWebsocketCodec) readMessage(v interface{}) (*Message, error) {
  130. m := Message{Value: v}
  131. if err := j.conn.ReadJSON(&m); err != nil {
  132. j.err <- err
  133. close(j.err)
  134. return nil, err
  135. }
  136. return &m, nil
  137. }
  138. // ReadResponseHeader implements rpc.ClientCodec
  139. func (j *JSONWebsocketCodec) ReadResponseHeader(r *rpc.Response) error {
  140. m, err := j.readMessage(nil)
  141. if err != nil {
  142. return err
  143. }
  144. if m.Response == nil {
  145. return ErrInvalidMessage
  146. }
  147. *r = *m.Response
  148. return nil
  149. }
  150. // ReadResponseBody implements rpc.ClientCodec
  151. func (j *JSONWebsocketCodec) ReadResponseBody(v interface{}) error {
  152. _, err := j.readMessage(v)
  153. if err != nil {
  154. return err
  155. }
  156. if v == nil {
  157. return ErrInvalidMessage
  158. }
  159. return nil
  160. }
  161. // Close implements rpc.ClientCodec and rpc.ServerCodec
  162. func (j *JSONWebsocketCodec) Close() error {
  163. return j.conn.Close()
  164. }
  165. // ReadRequestHeader implements rpc.ServerCodec
  166. func (j *JSONWebsocketCodec) ReadRequestHeader(r *rpc.Request) error {
  167. m, err := j.readMessage(nil)
  168. if err != nil {
  169. return err
  170. }
  171. if m.Request == nil {
  172. return ErrInvalidMessage
  173. }
  174. *r = *m.Request
  175. return nil
  176. }
  177. // ReadRequestBody implements rpc.ServerCodec
  178. func (j *JSONWebsocketCodec) ReadRequestBody(v interface{}) error {
  179. _, err := j.readMessage(v)
  180. if err != nil {
  181. return err
  182. }
  183. if v == nil {
  184. return ErrInvalidMessage
  185. }
  186. return nil
  187. }