pipes.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687
  1. package app
  2. import (
  3. "net/http"
  4. "context"
  5. "github.com/gorilla/mux"
  6. opentracing "github.com/opentracing/opentracing-go"
  7. log "github.com/sirupsen/logrus"
  8. "github.com/weaveworks/scope/common/xfer"
  9. )
  10. // RegisterPipeRoutes registers the pipe routes
  11. func RegisterPipeRoutes(router *mux.Router, pr PipeRouter) {
  12. router.Methods("GET").
  13. Name("api_pipe_pipeid_check").
  14. Path("/api/pipe/{pipeID}/check").
  15. HandlerFunc(requestContextDecorator(checkPipe(pr)))
  16. router.Methods("GET").
  17. Name("api_pipe_pipeid").
  18. Path("/api/pipe/{pipeID}").
  19. HandlerFunc(requestContextDecorator(handlePipeWs(pr, UIEnd)))
  20. router.Methods("GET").
  21. Name("api_pipe_pipeid_probe").
  22. Path("/api/pipe/{pipeID}/probe").
  23. HandlerFunc(requestContextDecorator(handlePipeWs(pr, ProbeEnd)))
  24. router.Methods("DELETE", "POST").
  25. Name("api_pipe_pipeid").
  26. Path("/api/pipe/{pipeID}").
  27. HandlerFunc(requestContextDecorator(deletePipe(pr)))
  28. }
  29. func checkPipe(pr PipeRouter) CtxHandlerFunc {
  30. return func(ctx context.Context, w http.ResponseWriter, r *http.Request) {
  31. id := mux.Vars(r)["pipeID"]
  32. exists, err := pr.Exists(ctx, id)
  33. if err != nil {
  34. respondWith(ctx, w, http.StatusInternalServerError, err)
  35. } else if exists {
  36. w.WriteHeader(http.StatusNoContent)
  37. } else {
  38. http.NotFound(w, r)
  39. }
  40. }
  41. }
  42. func handlePipeWs(pr PipeRouter, end End) CtxHandlerFunc {
  43. return func(ctx context.Context, w http.ResponseWriter, r *http.Request) {
  44. id := mux.Vars(r)["pipeID"]
  45. pipe, endIO, err := pr.Get(ctx, id, end)
  46. if err != nil {
  47. // this usually means the pipe has been closed
  48. log.Debugf("Error getting pipe %s: %v", id, err)
  49. http.NotFound(w, r)
  50. return
  51. }
  52. defer pr.Release(ctx, id, end)
  53. conn, err := xfer.Upgrade(w, r, nil)
  54. if err != nil {
  55. log.Errorf("Error upgrading pipe %s (%d) websocket: %v", id, end, err)
  56. return
  57. }
  58. defer conn.Close()
  59. if _, err := pipe.CopyToWebsocket(endIO, conn); err != nil {
  60. if span := opentracing.SpanFromContext(ctx); span != nil {
  61. span.LogKV("error", err.Error())
  62. }
  63. log.Errorf("Error copying to pipe %s (%d) websocket: %v", id, end, err)
  64. }
  65. }
  66. }
  67. func deletePipe(pr PipeRouter) CtxHandlerFunc {
  68. return func(ctx context.Context, w http.ResponseWriter, r *http.Request) {
  69. pipeID := mux.Vars(r)["pipeID"]
  70. log.Debugf("Deleting pipe %s", pipeID)
  71. if err := pr.Delete(ctx, pipeID); err != nil {
  72. respondWith(ctx, w, http.StatusInternalServerError, err)
  73. }
  74. }
  75. }