// controls.go (2.8 KB)
  1. package app
  2. import (
  3. "net/http"
  4. "net/rpc"
  5. "context"
  6. "github.com/gorilla/mux"
  7. log "github.com/sirupsen/logrus"
  8. "github.com/ugorji/go/codec"
  9. "github.com/weaveworks/scope/common/xfer"
  10. )
  11. // RegisterControlRoutes registers the various control routes with a http mux.
  12. func RegisterControlRoutes(router *mux.Router, cr ControlRouter) {
  13. router.
  14. Methods("GET").
  15. Path("/api/control/ws").
  16. HandlerFunc(requestContextDecorator(handleProbeWS(cr)))
  17. router.
  18. Methods("POST").
  19. Name("api_control_probeid_nodeid_control").
  20. MatcherFunc(URLMatcher("/api/control/{probeID}/{nodeID}/{control}")).
  21. HandlerFunc(requestContextDecorator(handleControl(cr)))
  22. }
  23. // handleControl routes control requests from the client to the appropriate
  24. // probe. Its is blocking.
  25. func handleControl(cr ControlRouter) CtxHandlerFunc {
  26. return func(ctx context.Context, w http.ResponseWriter, r *http.Request) {
  27. var (
  28. vars = mux.Vars(r)
  29. probeID = vars["probeID"]
  30. nodeID = vars["nodeID"]
  31. control = vars["control"]
  32. controlArgs map[string]string
  33. )
  34. if r.ContentLength > 0 {
  35. err := codec.NewDecoder(r.Body, &codec.JsonHandle{}).Decode(&controlArgs)
  36. defer r.Body.Close()
  37. if err != nil {
  38. respondWith(ctx, w, http.StatusBadRequest, err)
  39. return
  40. }
  41. }
  42. result, err := cr.Handle(ctx, probeID, xfer.Request{
  43. NodeID: nodeID,
  44. Control: control,
  45. ControlArgs: controlArgs,
  46. })
  47. if err != nil {
  48. respondWith(ctx, w, http.StatusBadRequest, err.Error())
  49. return
  50. }
  51. if result.Error != "" {
  52. respondWith(ctx, w, http.StatusBadRequest, result.Error)
  53. return
  54. }
  55. respondWith(ctx, w, http.StatusOK, result)
  56. }
  57. }
  58. // handleProbeWS accepts websocket connections from the probe and registers
  59. // them in the control router, such that HandleControl calls can find them.
  60. func handleProbeWS(cr ControlRouter) CtxHandlerFunc {
  61. return func(ctx context.Context, w http.ResponseWriter, r *http.Request) {
  62. probeID := r.Header.Get(xfer.ScopeProbeIDHeader)
  63. if probeID == "" {
  64. respondWith(ctx, w, http.StatusBadRequest, xfer.ScopeProbeIDHeader)
  65. return
  66. }
  67. conn, err := xfer.Upgrade(w, r, nil)
  68. if err != nil {
  69. log.Printf("Error upgrading control websocket: %v", err)
  70. return
  71. }
  72. defer conn.Close()
  73. codec := xfer.NewJSONWebsocketCodec(conn)
  74. client := rpc.NewClientWithCodec(codec)
  75. defer client.Close()
  76. id, err := cr.Register(ctx, probeID, func(req xfer.Request) xfer.Response {
  77. var res xfer.Response
  78. if err := client.Call("control.Handle", req, &res); err != nil {
  79. return xfer.ResponseError(err)
  80. }
  81. return res
  82. })
  83. if err != nil {
  84. respondWith(ctx, w, http.StatusBadRequest, err)
  85. return
  86. }
  87. defer cr.Deregister(ctx, probeID, id)
  88. if err := codec.WaitForReadError(); err != nil && !xfer.IsExpectedWSCloseError(err) {
  89. log.Errorf("Error on websocket: %v", err)
  90. }
  91. }
  92. }