api_topology.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208
  1. package app
  2. import (
  3. "context"
  4. "net/http"
  5. "net/url"
  6. "time"
  7. "github.com/gorilla/mux"
  8. ot "github.com/opentracing/opentracing-go"
  9. otlog "github.com/opentracing/opentracing-go/log"
  10. "github.com/pkg/errors"
  11. log "github.com/sirupsen/logrus"
  12. "github.com/weaveworks/scope/common/xfer"
  13. "github.com/weaveworks/scope/render"
  14. "github.com/weaveworks/scope/render/detailed"
  15. "github.com/weaveworks/scope/report"
  16. )
  17. const (
  18. websocketLoop = 1 * time.Second
  19. )
// APITopology is returned by the /api/topology/{name} handler.
type APITopology struct {
	// Nodes holds the node summaries of the requested topology; it is
	// serialised under the JSON key "nodes".
	Nodes detailed.NodeSummaries `json:"nodes"`
}
// APINode is returned by the /api/topology/{name}/{id} handler.
type APINode struct {
	// Node is the detailed view of the requested node; it is serialised
	// under the JSON key "node".
	Node detailed.Node `json:"node"`
}
  28. // RenderContextForReporter creates the rendering context for the given reporter.
  29. func RenderContextForReporter(rep Reporter, r report.Report) detailed.RenderContext {
  30. rc := detailed.RenderContext{Report: r}
  31. if wrep, ok := rep.(WebReporter); ok {
  32. rc.MetricsGraphURL = wrep.MetricsGraphURL
  33. }
  34. return rc
  35. }
  36. type rendererHandler func(context.Context, render.Renderer, render.Transformer, detailed.RenderContext, http.ResponseWriter, *http.Request)
  37. // Full topology.
  38. func handleTopology(ctx context.Context, renderer render.Renderer, transformer render.Transformer, rc detailed.RenderContext, w http.ResponseWriter, r *http.Request) {
  39. censorCfg := report.GetCensorConfigFromRequest(r)
  40. nodeSummaries := detailed.Summaries(ctx, rc, render.Render(ctx, rc.Report, renderer, transformer).Nodes)
  41. respondWith(ctx, w, http.StatusOK, APITopology{
  42. Nodes: detailed.CensorNodeSummaries(nodeSummaries, censorCfg),
  43. })
  44. }
  45. // Individual nodes.
  46. func handleNode(ctx context.Context, renderer render.Renderer, transformer render.Transformer, rc detailed.RenderContext, w http.ResponseWriter, r *http.Request) {
  47. var (
  48. censorCfg = report.GetCensorConfigFromRequest(r)
  49. vars = mux.Vars(r)
  50. topologyID = vars["topology"]
  51. nodeID = vars["id"]
  52. )
  53. // We must not lose the node during filtering. We achieve that by
  54. // (1) rendering the report with the base renderer, without
  55. // filtering, which gives us the node (if it exists at all), and
  56. // then (2) applying the filter separately to that result. If the
  57. // node is lost in the second step, we simply put it back.
  58. nodes := renderer.Render(ctx, rc.Report)
  59. node, ok := nodes.Nodes[nodeID]
  60. if !ok {
  61. http.NotFound(w, r)
  62. return
  63. }
  64. nodes = transformer.Transform(nodes)
  65. if filteredNode, ok := nodes.Nodes[nodeID]; ok {
  66. node = filteredNode
  67. } else { // we've lost the node during filtering; put it back
  68. nodes.Nodes[nodeID] = node
  69. nodes.Filtered--
  70. }
  71. rawNode := detailed.MakeNode(topologyID, rc, nodes.Nodes, node)
  72. respondWith(ctx, w, http.StatusOK, APINode{Node: detailed.CensorNode(rawNode, censorCfg)})
  73. }
  74. // Websocket for the full topology.
  75. func handleWebsocket(
  76. ctx context.Context,
  77. rep Reporter,
  78. w http.ResponseWriter,
  79. r *http.Request,
  80. ) {
  81. if err := r.ParseForm(); err != nil {
  82. respondWith(ctx, w, http.StatusInternalServerError, err)
  83. return
  84. }
  85. loop := websocketLoop
  86. if t := r.Form.Get("t"); t != "" {
  87. var err error
  88. if loop, err = time.ParseDuration(t); err != nil {
  89. respondWith(ctx, w, http.StatusBadRequest, t)
  90. return
  91. }
  92. }
  93. conn, err := xfer.Upgrade(w, r, nil)
  94. if err != nil {
  95. // log.Info("Upgrade:", err)
  96. return
  97. }
  98. defer conn.Close()
  99. quit := make(chan struct{})
  100. go func(c xfer.Websocket) {
  101. for { // just discard everything the browser sends
  102. if _, _, err := c.ReadMessage(); err != nil {
  103. if !xfer.IsExpectedWSCloseError(err) {
  104. log.Error("err:", err)
  105. }
  106. close(quit)
  107. break
  108. }
  109. }
  110. }(conn)
  111. wc := websocketState{
  112. rep: rep,
  113. values: r.Form,
  114. conn: conn,
  115. topologyID: mux.Vars(r)["topology"],
  116. startReportingAt: deserializeTimestamp(r.Form.Get("timestamp")),
  117. censorCfg: report.GetCensorConfigFromRequest(r),
  118. channelOpenedAt: time.Now(),
  119. }
  120. wait := make(chan struct{}, 1)
  121. rep.WaitOn(ctx, wait)
  122. defer rep.UnWait(ctx, wait)
  123. tick := time.Tick(loop)
  124. for {
  125. if err := wc.update(ctx); err != nil {
  126. log.Errorf("%v", err)
  127. return
  128. }
  129. select {
  130. case <-wait:
  131. case <-tick:
  132. case <-quit:
  133. return
  134. }
  135. }
  136. }
// websocketState carries the per-connection state of a topology websocket
// between successive calls to update.
type websocketState struct {
	// rep is the reporter asked for a report on every update.
	rep Reporter
	// values is the parsed request form; it is passed on when looking up
	// the renderer for the topology.
	values url.Values
	// conn is the websocket the topology diffs are written to.
	conn xfer.Websocket
	// previousTopo is the topology computed by the previous update; the
	// next diff is taken against it.
	previousTopo detailed.NodeSummaries
	// topologyID is the value of the "topology" route variable.
	topologyID string
	// startReportingAt is the report timestamp corresponding to the moment
	// the channel was opened, derived from the "timestamp" form value.
	startReportingAt time.Time
	// NOTE(review): reportTimestamp is neither read nor assigned in the
	// code visible here (update uses a local variable of the same name) —
	// confirm whether it is still needed.
	reportTimestamp time.Time
	// censorCfg is the censoring configuration taken from the request; it
	// is applied to every topology before diffing.
	censorCfg report.CensorConfig
	// channelOpenedAt is the wall-clock time at which this state was
	// created; update measures elapsed time from it.
	channelOpenedAt time.Time
}
  148. func (wc *websocketState) update(ctx context.Context) error {
  149. span := ot.StartSpan("websocket.Render", ot.Tag{Key: "topology", Value: wc.topologyID})
  150. defer span.Finish()
  151. ctx = ot.ContextWithSpan(ctx, span)
  152. // We measure how much time has passed since the channel was opened
  153. // and add it to the initial report timestamp to get the timestamp
  154. // of the snapshot we want to report right now.
  155. // NOTE: Multiplying `timestampDelta` by a constant factor here
  156. // would have an effect of fast-forward, which is something we
  157. // might be interested in implementing in the future.
  158. timestampDelta := time.Since(wc.channelOpenedAt)
  159. reportTimestamp := wc.startReportingAt.Add(timestampDelta)
  160. span.LogFields(otlog.String("opened-at", wc.channelOpenedAt.String()),
  161. otlog.String("timestamp", reportTimestamp.String()))
  162. re, err := wc.rep.Report(ctx, reportTimestamp)
  163. if err != nil {
  164. return errors.Wrap(err, "Error generating report")
  165. }
  166. re.UnsafeRemovePartMergedNodes(ctx)
  167. renderer, filter, err := topologyRegistry.RendererForTopology(wc.topologyID, wc.values, re)
  168. if err != nil {
  169. return errors.Wrap(err, "Error generating report")
  170. }
  171. newTopo := detailed.CensorNodeSummaries(
  172. detailed.Summaries(
  173. ctx,
  174. RenderContextForReporter(wc.rep, re),
  175. render.Render(ctx, re, renderer, filter).Nodes,
  176. ),
  177. wc.censorCfg,
  178. )
  179. diff := detailed.TopoDiff(wc.previousTopo, newTopo)
  180. wc.previousTopo = newTopo
  181. if err := wc.conn.WriteJSON(diff); err != nil {
  182. if !xfer.IsExpectedWSCloseError(err) {
  183. return errors.Wrap(err, "cannot serialize topology diff")
  184. }
  185. }
  186. return nil
  187. }