package multitenant

// Collect reports from probes per-tenant, and supply them to queriers on demand

import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"net"
	"net/http"
	"strconv"
	"sync"
	"time"

	"github.com/nats-io/nats"
	"github.com/opentracing-contrib/go-stdlib/nethttp"
	opentracing "github.com/opentracing/opentracing-go"
	otlog "github.com/opentracing/opentracing-go/log"
	"github.com/prometheus/client_golang/prometheus"
	log "github.com/sirupsen/logrus"
	"github.com/weaveworks/common/instrument"
	"github.com/weaveworks/common/user"
	"github.com/weaveworks/scope/app"
	"github.com/weaveworks/scope/report"
	"golang.org/x/sync/errgroup"
)

var (
	topologiesDropped = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "scope",
		Name:      "topologies_dropped_total",
		Help:      "Total count of topologies dropped for being over limit.",
	}, []string{"user", "topology"})
	natsRequests = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "scope",
		Name:      "nats_requests_total",
		Help:      "Total count of NATS requests.",
	}, []string{"method", "status_code"})
	reportReceivedSizeHistogram = prometheus.NewHistogram(prometheus.HistogramOpts{
		Namespace: "scope",
		Name:      "report_received_size_bytes",
		Help:      "Distribution of received report sizes",
		Buckets:   prometheus.ExponentialBuckets(4096, 2.0, 10),
	})
	reportsReceivedPerUser = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "scope",
		Name:      "reports_received_total",
		Help:      "Total count of received reports per user.",
	}, []string{"user"})
	shortcutsReceivedPerUser = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "scope",
		Name:      "shortcut_reports_received_total",
		Help:      "Total count of received shortcut reports per user.",
	}, []string{"user"})
	reportReceivedSizePerUser = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "scope",
		Name:      "reports_received_bytes_total",
		Help:      "Total bytes received in reports per user.",
	}, []string{"user"})
)

func registerLiveCollectorMetrics() {
	prometheus.MustRegister(topologiesDropped)
	prometheus.MustRegister(natsRequests)
	prometheus.MustRegister(reportReceivedSizeHistogram)
	prometheus.MustRegister(reportsReceivedPerUser)
	prometheus.MustRegister(shortcutsReceivedPerUser)
	prometheus.MustRegister(reportReceivedSizePerUser)
}

var registerLiveCollectorMetricsOnce sync.Once

// LiveCollectorConfig has everything we need to make a collector for live multitenant data.
type LiveCollectorConfig struct {
	UserIDer       UserIDer
	NatsHost       string
	MemcacheClient *MemcacheClient
	Window         time.Duration
	TickInterval   time.Duration
	MaxTopNodes    int
	CollectorAddr  string
}

type liveCollector struct {
	cfg           LiveCollectorConfig
	merger        app.Merger
	pending       sync.Map
	ticker        *time.Ticker
	tickCallbacks []func(context.Context)
	nats          *nats.Conn
	waitersLock   sync.Mutex
	waiters       map[watchKey]*nats.Subscription
	collectors    []string
	lastResolved  time.Time
}

// if StoreInterval is set, reports are merged into here and held until flushed to store
type pendingEntry struct {
	sync.Mutex
	report *report.Report
	older  []*report.Report
}

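// A minimal construction sketch (all values here are illustrative, and
// someUserIDer stands in for whatever UserIDer implementation the app wires
// up). Leaving CollectorAddr empty selects collector mode; pointing it at a
// SRV name selects querier mode:
//
//	c, err := NewLiveCollector(LiveCollectorConfig{
//		UserIDer:     someUserIDer,
//		Window:       15 * time.Second,
//		TickInterval: 3 * time.Second,
//	})
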
// NewLiveCollector makes a new LiveCollector from the supplied config.
func NewLiveCollector(config LiveCollectorConfig) (app.Collector, error) {
	c := &liveCollector{
		cfg: config,
	}
	return c, c.init()
}

func (c *liveCollector) init() error {
	registerLiveCollectorMetricsOnce.Do(registerLiveCollectorMetrics)
	var nc *nats.Conn
	if c.cfg.NatsHost != "" {
		if c.cfg.MemcacheClient == nil {
			return fmt.Errorf("must supply memcache client when using nats")
		}
		var err error
		nc, err = nats.Connect(c.cfg.NatsHost)
		if err != nil {
			return err
		}
	}
	c.nats = nc
	c.merger = app.NewFastMerger()
	c.waiters = make(map[watchKey]*nats.Subscription)
	// Register the tick callback before the ticker goroutine starts, so the
	// goroutine never races with this append.
	c.tickCallbacks = append(c.tickCallbacks, c.bumpPending)
	if c.isCollector() {
		if c.cfg.TickInterval == 0 {
			return fmt.Errorf("--app.collector.tick-interval or --app.collector.store-interval must be non-zero for a collector")
		}
		c.ticker = time.NewTicker(c.cfg.TickInterval)
		go c.tickLoop()
	}
	return nil
}

func (c *liveCollector) tickLoop() {
	for range c.ticker.C {
		for _, f := range c.tickCallbacks {
			f(context.Background())
		}
	}
}

// Close will close things down
func (c *liveCollector) Close() {
	if c.ticker != nil { // queriers never create the ticker
		c.ticker.Stop() // note this doesn't close the chan; goroutine keeps running
	}
}

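// Each tick shifts the live window along: for example, with Window=15s and
// TickInterval=3s, entry.older holds 15/3 = 5 slots; the freshly merged
// report moves into older[0] and the oldest report falls off the end.
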
// Range over all users (instances) that have pending reports and shift the data back in the array
func (c *liveCollector) bumpPending(ctx context.Context) {
	c.pending.Range(func(key, value interface{}) bool {
		entry := value.(*pendingEntry)
		entry.Lock()
		rpt := entry.report
		entry.report = nil
		if entry.older == nil {
			entry.older = make([]*report.Report, c.cfg.Window/c.cfg.TickInterval)
		} else {
			copy(entry.older[1:], entry.older) // move everything down one
		}
		entry.older[0] = rpt
		entry.Unlock()
		return true
	})
}

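// HasHistoricReports indicates whether reports older than the live window
// can be fetched; the live collector keeps no history.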
func (c *liveCollector) HasHistoricReports() bool {
	return false
}

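// HasReports tells whether any reports are available for the user in ctx;
// only timestamps inside the live window can have reports.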
func (c *liveCollector) HasReports(ctx context.Context, timestamp time.Time) (bool, error) {
	userid, err := c.cfg.UserIDer(ctx)
	if err != nil {
		return false, err
	}
	if time.Since(timestamp) < c.cfg.Window {
		return c.hasReportsFromLive(ctx, userid)
	}
	return false, nil
}

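// Add ingests a report from a probe. Shortcut reports are stashed in
// memcache and announced over NATS so waiting queriers pick them up quickly;
// full reports are merged into the user's pending entry.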
func (c *liveCollector) Add(ctx context.Context, rep report.Report, buf []byte) error {
	userid, err := c.cfg.UserIDer(ctx)
	if err != nil {
		return err
	}
	reportReceivedSizeHistogram.Observe(float64(len(buf)))
	reportReceivedSizePerUser.WithLabelValues(userid).Add(float64(len(buf)))
	reportsReceivedPerUser.WithLabelValues(userid).Inc()

	// Shortcut reports are published to nats but not persisted -
	// we'll get a full report from the same probe in a few seconds
	if rep.Shortcut {
		shortcutsReceivedPerUser.WithLabelValues(userid).Inc()
		if c.nats != nil {
			_, _, reportKey := calculateReportKeys(userid, time.Now())
			_, err = c.cfg.MemcacheClient.StoreReportBytes(ctx, reportKey, buf)
			if err != nil {
				log.Warningf("Could not store shortcut %v in memcache: %v", reportKey, err)
				// No point publishing on nats if cache store failed
				return nil
			}
			err := c.nats.Publish(userid, []byte(reportKey))
			natsRequests.WithLabelValues("Publish", instrument.ErrorCode(err)).Add(1)
			if err != nil {
				log.Errorf("Error sending shortcut report: %v", err)
			}
		}
		return nil
	}

	rep = c.massageReport(userid, rep)
	c.addToLive(ctx, userid, rep)
	return nil
}

// process a report from a probe which may be at an older version or overloaded
func (c *liveCollector) massageReport(userid string, rpt report.Report) report.Report {
	if c.cfg.MaxTopNodes > 0 {
		max := c.cfg.MaxTopNodes
		if len(rpt.Host.Nodes) > 1 {
			max = max * len(rpt.Host.Nodes) // higher limit for merged reports
		}
		var dropped []string
		rpt, dropped = rpt.DropTopologiesOver(max)
		for _, name := range dropped {
			topologiesDropped.WithLabelValues(userid, name).Inc()
		}
	}
	rpt = rpt.Upgrade()
	return rpt
}

// We are building up a report in memory; merge into that (for awsCollector it will be saved shortly)
// NOTE: may retain a reference to rep; must not be used by caller after this.
func (c *liveCollector) addToLive(ctx context.Context, userid string, rep report.Report) {
	entry := &pendingEntry{}
	if e, found := c.pending.LoadOrStore(userid, entry); found {
		entry = e.(*pendingEntry)
	}
	entry.Lock()
	if entry.report == nil {
		entry.report = &rep
	} else {
		entry.report.UnsafeMerge(rep)
	}
	entry.Unlock()
}

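// isCollector distinguishes the two roles this type can play: with no
// CollectorAddr configured we are a collector ingesting probe reports;
// otherwise we are a querier fanning out to the collectors behind that
// address.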
func (c *liveCollector) isCollector() bool {
	return c.cfg.CollectorAddr == ""
}

func (c *liveCollector) hasReportsFromLive(ctx context.Context, userid string) (bool, error) {
	span, ctx := opentracing.StartSpanFromContext(ctx, "hasReportsFromLive")
	defer span.Finish()
	if c.isCollector() {
		e, found := c.pending.Load(userid)
		if !found {
			return false, nil
		}
		entry := e.(*pendingEntry)
		entry.Lock()
		defer entry.Unlock()
		if entry.report != nil {
			return true, nil
		}
		for _, v := range entry.older {
			if v != nil {
				return true, nil
			}
		}
		return false, nil
	}
	// We are a querier: ask each collector if it has any
	// (serially, since we will bail out on the first one that has reports)
	addrs := resolve(c.cfg.CollectorAddr)
	for _, addr := range addrs {
		body, err := oneCall(ctx, addr, "/api/probes?sparse=true", userid)
		if err != nil {
			return false, err
		}
		var hasReports bool
		decoder := json.NewDecoder(body)
		if err := decoder.Decode(&hasReports); err != nil {
			log.Errorf("Error decoding response: %v", err)
		}
		body.Close()
		if hasReports {
			return true, nil
		}
	}
	return false, nil
}

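// Report returns the live report for the user in ctx, merged from all
// in-window data as of timestamp.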
func (c *liveCollector) Report(ctx context.Context, timestamp time.Time) (report.Report, error) {
	span, ctx := opentracing.StartSpanFromContext(ctx, "liveCollector.Report")
	defer span.Finish()
	userid, err := c.cfg.UserIDer(ctx)
	if err != nil {
		return report.MakeReport(), err
	}
	span.SetTag("userid", userid)
	var reports []report.Report
	if time.Since(timestamp) < c.cfg.Window {
		reports, err = c.reportsFromLive(ctx, userid)
	}
	if err != nil {
		return report.MakeReport(), err
	}
	span.LogFields(otlog.Int("merging", len(reports)))
	return c.merger.Merge(reports), nil
}

func (c *liveCollector) reportsFromLive(ctx context.Context, userid string) ([]report.Report, error) {
	span, ctx := opentracing.StartSpanFromContext(ctx, "reportsFromLive")
	defer span.Finish()
	if c.isCollector() {
		e, found := c.pending.Load(userid)
		if !found {
			return nil, nil
		}
		entry := e.(*pendingEntry)
		entry.Lock()
		ret := make([]report.Report, 0, len(entry.older)+1)
		if entry.report != nil {
			ret = append(ret, entry.report.Copy()) // Copy contents because this report is being unsafe-merged to
		}
		for _, v := range entry.older {
			if v != nil {
				ret = append(ret, *v) // no copy because older reports are immutable
			}
		}
		entry.Unlock()
		return ret, nil
	}
	// We are a querier: fetch the most up-to-date reports from collectors
	if time.Since(c.lastResolved) > time.Second*5 {
		c.collectors = resolve(c.cfg.CollectorAddr)
		c.lastResolved = time.Now()
	}
	reports := make([]*report.Report, len(c.collectors))
	// make a call to each collector and fetch its data for this userid
	g, ctx := errgroup.WithContext(ctx)
	for i, addr := range c.collectors {
		i, addr := i, addr // https://golang.org/doc/faq#closures_and_goroutines
		g.Go(func() error {
			body, err := oneCall(ctx, addr, "/api/report", userid)
			if err != nil {
				log.Warnf("error calling '%s': %v", addr, err)
				return nil
			}
			reports[i], err = report.MakeFromBinary(ctx, body, false, true)
			body.Close()
			if err != nil {
				log.Warnf("error decoding: %v", err)
				return nil
			}
			return nil
		})
	}
	if err := g.Wait(); err != nil {
		return nil, err
	}
	// dereference pointers into the expected return format
	ret := make([]report.Report, 0, len(reports))
	for _, rpt := range reports {
		if rpt != nil {
			ret = append(ret, *rpt)
		}
	}
	return ret, nil
}

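// resolve expands a DNS SRV name into host:port endpoints; lookup failures
// yield an empty slice so callers degrade gracefully. As an illustration, a
// hypothetical name like "collector.scope.local" might resolve to entries
// such as "collector-0.scope.local:4040".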
func resolve(name string) []string {
	_, addrs, err := net.LookupSRV("", "", name)
	if err != nil {
		log.Warnf("Cannot resolve '%s': %v", name, err)
		return []string{}
	}
	endpoints := make([]string, 0, len(addrs))
	for _, addr := range addrs {
		port := strconv.Itoa(int(addr.Port))
		endpoints = append(endpoints, net.JoinHostPort(addr.Target, port))
	}
	return endpoints
}

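// oneCall issues a GET against a single collector on behalf of userid,
// propagating the org ID header and any tracing span. On success the caller
// owns the returned body and must close it.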
func oneCall(ctx context.Context, endpoint, path, userid string) (io.ReadCloser, error) {
	fullPath := "http://" + endpoint + path
	req, err := http.NewRequest("GET", fullPath, nil)
	if err != nil {
		return nil, fmt.Errorf("error making request %s: %w", fullPath, err)
	}
	req = req.WithContext(ctx)
	req.Header.Set(user.OrgIDHeaderName, userid)
	req.Header.Set("Accept", "application/msgpack")
	req.Header.Set("Accept-Encoding", "identity") // disable compression
	if parentSpan := opentracing.SpanFromContext(ctx); parentSpan != nil {
		var ht *nethttp.Tracer
		req, ht = nethttp.TraceRequest(parentSpan.Tracer(), req, nethttp.OperationName("Collector Fetch"))
		defer ht.Finish()
	}
	client := &http.Client{Transport: &nethttp.Transport{}}
	res, err := client.Do(req)
	if err != nil {
		return nil, fmt.Errorf("error getting %s: %w", fullPath, err)
	}
	if res.StatusCode != http.StatusOK {
		content, _ := io.ReadAll(res.Body)
		res.Body.Close()
		return nil, fmt.Errorf("error from collector: %s (%s)", res.Status, string(content))
	}
	return res.Body, nil
}

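// WaitOn subscribes the waiter channel to NATS shortcut notifications for
// the calling user; each published report key triggers a non-blocking send
// on waiter.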
func (c *liveCollector) WaitOn(ctx context.Context, waiter chan struct{}) {
	userid, err := c.cfg.UserIDer(ctx)
	if err != nil {
		log.Errorf("Error getting user id in WaitOn: %v", err)
		return
	}
	if c.nats == nil {
		return
	}
	sub, err := c.nats.SubscribeSync(userid)
	natsRequests.WithLabelValues("SubscribeSync", instrument.ErrorCode(err)).Add(1)
	if err != nil {
		log.Errorf("Error subscribing for shortcuts: %v", err)
		return
	}
	c.waitersLock.Lock()
	c.waiters[watchKey{userid, waiter}] = sub
	c.waitersLock.Unlock()
	go func() {
		for {
			_, err := sub.NextMsg(natsTimeout)
			if err == nats.ErrTimeout {
				continue
			}
			natsRequests.WithLabelValues("NextMsg", instrument.ErrorCode(err)).Add(1)
			if err != nil {
				log.Debugf("NextMsg error: %v", err)
				return
			}
			select {
			case waiter <- struct{}{}:
			default:
			}
		}
	}()
}

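// UnWait removes a waiter registered by WaitOn and unsubscribes it from NATS.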
func (c *liveCollector) UnWait(ctx context.Context, waiter chan struct{}) {
	userid, err := c.cfg.UserIDer(ctx)
	if err != nil {
		log.Errorf("Error getting user id in UnWait: %v", err)
		return
	}
	if c.nats == nil {
		return
	}
	c.waitersLock.Lock()
	key := watchKey{userid, waiter}
	sub := c.waiters[key]
	delete(c.waiters, key)
	c.waitersLock.Unlock()
	if sub == nil { // nothing was registered for this waiter
		return
	}
	err = sub.Unsubscribe()
	natsRequests.WithLabelValues("Unsubscribe", instrument.ErrorCode(err)).Add(1)
	if err != nil {
		log.Errorf("Error on unsubscribe: %v", err)
	}
}

// AdminSummary returns a string with some internal information about
// the report, which may be useful to troubleshoot.
func (c *liveCollector) AdminSummary(ctx context.Context, timestamp time.Time) (string, error) {
	span, ctx := opentracing.StartSpanFromContext(ctx, "liveCollector.AdminSummary")
	defer span.Finish()
	userid, err := c.cfg.UserIDer(ctx)
	if err != nil {
		return "", err
	}
	_ = userid
	// TODO: finish implementation
	return "TODO", nil
}