package multitenant

// Collect reports from probes per-tenant, and supply them to queriers on demand

import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"net"
	"net/http"
	"strconv"
	"sync"
	"time"

	"github.com/nats-io/nats"
	"github.com/opentracing-contrib/go-stdlib/nethttp"
	opentracing "github.com/opentracing/opentracing-go"
	otlog "github.com/opentracing/opentracing-go/log"
	"github.com/prometheus/client_golang/prometheus"
	log "github.com/sirupsen/logrus"
	"github.com/weaveworks/common/instrument"
	"github.com/weaveworks/common/user"
	"github.com/weaveworks/scope/app"
	"github.com/weaveworks/scope/report"
	"golang.org/x/sync/errgroup"
)
var (
	topologiesDropped = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "scope",
		Name:      "topologies_dropped_total",
		Help:      "Total count of topologies dropped for being over limit.",
	}, []string{"user", "topology"})
	natsRequests = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "scope",
		Name:      "nats_requests_total",
		Help:      "Total count of NATS requests.",
	}, []string{"method", "status_code"})
	reportReceivedSizeHistogram = prometheus.NewHistogram(prometheus.HistogramOpts{
		Namespace: "scope",
		Name:      "report_received_size_bytes",
		Help:      "Distribution of received report sizes.",
		Buckets:   prometheus.ExponentialBuckets(4096, 2.0, 10),
	})
	reportsReceivedPerUser = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "scope",
		Name:      "reports_received_total",
		Help:      "Total count of received reports per user.",
	}, []string{"user"})
	shortcutsReceivedPerUser = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "scope",
		Name:      "shortcut_reports_received_total",
		Help:      "Total count of received shortcut reports per user.",
	}, []string{"user"})
	reportReceivedSizePerUser = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "scope",
		Name:      "reports_received_bytes_total",
		Help:      "Total bytes received in reports per user.",
	}, []string{"user"})
)
func registerLiveCollectorMetrics() {
	prometheus.MustRegister(
		topologiesDropped,
		natsRequests,
		reportReceivedSizeHistogram,
		reportsReceivedPerUser,
		shortcutsReceivedPerUser,
		reportReceivedSizePerUser,
	)
}

var registerLiveCollectorMetricsOnce sync.Once
// LiveCollectorConfig has everything we need to make a collector for live multitenant data.
type LiveCollectorConfig struct {
	UserIDer       UserIDer        // extracts the tenant ID from a request context
	NatsHost       string          // optional NATS host for shortcut-report notifications
	MemcacheClient *MemcacheClient // required when NatsHost is set; passes shortcut reports
	Window         time.Duration   // how long reports are retained in memory
	TickInterval   time.Duration   // how often pending reports rotate into the window
	MaxTopNodes    int             // per-topology node limit; 0 disables dropping
	CollectorAddr  string          // if set, act as a querier against these collectors (SRV name)
}
type liveCollector struct {
	cfg LiveCollectorConfig

	merger        app.Merger
	pending       sync.Map // userid -> *pendingEntry
	ticker        *time.Ticker
	tickCallbacks []func(context.Context)

	nats        *nats.Conn
	waitersLock sync.Mutex
	waiters     map[watchKey]*nats.Subscription

	collectors   []string
	lastResolved time.Time
}

// pendingEntry accumulates reports for one tenant: the in-progress merged
// report for the current tick, plus older per-tick snapshots covering the
// window. In store-backed collectors with a store interval set, reports are
// merged into here and held until flushed to the store.
type pendingEntry struct {
	sync.Mutex
	report *report.Report
	older  []*report.Report
}
// NewLiveCollector makes a new LiveCollector from the supplied config.
func NewLiveCollector(config LiveCollectorConfig) (app.Collector, error) {
	c := &liveCollector{
		cfg: config,
	}
	return c, c.init()
}
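// init registers metrics, connects to NATS if configured, and starts the
// tick loop when running as a collector.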
func (c *liveCollector) init() error {
	registerLiveCollectorMetricsOnce.Do(registerLiveCollectorMetrics)
	var nc *nats.Conn
	if c.cfg.NatsHost != "" {
		if c.cfg.MemcacheClient == nil {
			return fmt.Errorf("must supply memcache client when using nats")
		}
		var err error
		nc, err = nats.Connect(c.cfg.NatsHost)
		if err != nil {
			return err
		}
	}
	c.nats = nc
	c.merger = app.NewFastMerger()
	c.waiters = make(map[watchKey]*nats.Subscription)
	// Register callbacks before the tick loop starts, so the goroutine never
	// reads the slice while it is being appended to.
	c.tickCallbacks = append(c.tickCallbacks, c.bumpPending)
	if c.isCollector() {
		if c.cfg.TickInterval == 0 {
			return fmt.Errorf("--app.collector.tick-interval or --app.collector.store-interval must be non-zero for a collector")
		}
		c.ticker = time.NewTicker(c.cfg.TickInterval)
		go c.tickLoop()
	}
	return nil
}
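// tickLoop runs every registered callback once per tick, for the life of the
// process.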
func (c *liveCollector) tickLoop() {
	for range c.ticker.C {
		for _, f := range c.tickCallbacks {
			f(context.Background())
		}
	}
}

// Close will close things down
func (c *liveCollector) Close() {
	if c.ticker != nil { // ticker is only set when running as a collector
		c.ticker.Stop() // note this doesn't close the chan; goroutine keeps running
	}
}
// Range over all users (instances) that have pending reports and shift the
// data back in the array, discarding the oldest entry.
func (c *liveCollector) bumpPending(ctx context.Context) {
	c.pending.Range(func(key, value interface{}) bool {
		entry := value.(*pendingEntry)
		entry.Lock()
		rpt := entry.report
		entry.report = nil
		if entry.older == nil {
			size := c.cfg.Window / c.cfg.TickInterval
			if size < 1 {
				size = 1 // retain at least one tick even if Window < TickInterval
			}
			entry.older = make([]*report.Report, size)
		} else {
			copy(entry.older[1:], entry.older) // move everything down one
		}
		entry.older[0] = rpt
		entry.Unlock()
		return true
	})
}
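// HasHistoricReports indicates whether this collector can serve reports from
// before the in-memory window; the live collector cannot.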
func (c *liveCollector) HasHistoricReports() bool {
	return false
}
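// HasReports tells whether any reports exist for the calling user at the
// given timestamp, which must fall within the live window.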
func (c *liveCollector) HasReports(ctx context.Context, timestamp time.Time) (bool, error) {
	userid, err := c.cfg.UserIDer(ctx)
	if err != nil {
		return false, err
	}
	if time.Since(timestamp) < c.cfg.Window {
		return c.hasReportsFromLive(ctx, userid)
	}
	return false, nil
}
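// Add ingests a report from a probe. Shortcut reports are cached and
// announced over NATS for fast updates; full reports are merged into the
// in-memory window.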
func (c *liveCollector) Add(ctx context.Context, rep report.Report, buf []byte) error {
	userid, err := c.cfg.UserIDer(ctx)
	if err != nil {
		return err
	}
	reportReceivedSizeHistogram.Observe(float64(len(buf)))
	reportReceivedSizePerUser.WithLabelValues(userid).Add(float64(len(buf)))
	reportsReceivedPerUser.WithLabelValues(userid).Inc()

	// Shortcut reports are published to nats but not persisted -
	// we'll get a full report from the same probe in a few seconds
	if rep.Shortcut {
		shortcutsReceivedPerUser.WithLabelValues(userid).Inc()
		if c.nats != nil {
			_, _, reportKey := calculateReportKeys(userid, time.Now())
			_, err = c.cfg.MemcacheClient.StoreReportBytes(ctx, reportKey, buf)
			if err != nil {
				log.Warningf("Could not store shortcut %v in memcache: %v", reportKey, err)
				// No point publishing on nats if cache store failed
				return nil
			}
			err := c.nats.Publish(userid, []byte(reportKey))
			natsRequests.WithLabelValues("Publish", instrument.ErrorCode(err)).Add(1)
			if err != nil {
				log.Errorf("Error sending shortcut report: %v", err)
			}
		}
		return nil
	}

	rep = c.massageReport(userid, rep)
	c.addToLive(ctx, userid, rep)
	return nil
}
// massageReport processes a report from a probe which may be at an older
// version or overloaded: it drops topologies over the node limit and
// upgrades older report formats.
func (c *liveCollector) massageReport(userid string, rpt report.Report) report.Report {
	if c.cfg.MaxTopNodes > 0 {
		max := c.cfg.MaxTopNodes
		if len(rpt.Host.Nodes) > 1 {
			max = max * len(rpt.Host.Nodes) // higher limit for reports merged from multiple hosts
		}
		var dropped []string
		rpt, dropped = rpt.DropTopologiesOver(max)
		for _, name := range dropped {
			topologiesDropped.WithLabelValues(userid, name).Inc()
		}
	}
	return rpt.Upgrade()
}
// We are building up a report in memory; merge into that (for awsCollector it will be saved shortly)
// NOTE: may retain a reference to rep; it must not be used by the caller after this.
func (c *liveCollector) addToLive(ctx context.Context, userid string, rep report.Report) {
	entry := &pendingEntry{}
	if e, found := c.pending.LoadOrStore(userid, entry); found {
		entry = e.(*pendingEntry)
	}
	entry.Lock()
	if entry.report == nil {
		entry.report = &rep
	} else {
		entry.report.UnsafeMerge(rep)
	}
	entry.Unlock()
}
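// isCollector reports whether this process ingests reports itself; with a
// CollectorAddr configured it acts as a querier instead.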
func (c *liveCollector) isCollector() bool {
	return c.cfg.CollectorAddr == ""
}
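// hasReportsFromLive checks for in-window reports: locally when we are a
// collector, otherwise by asking each collector in turn.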
func (c *liveCollector) hasReportsFromLive(ctx context.Context, userid string) (bool, error) {
	span, ctx := opentracing.StartSpanFromContext(ctx, "hasReportsFromLive")
	defer span.Finish()
	if c.isCollector() {
		e, found := c.pending.Load(userid)
		if !found {
			return false, nil
		}
		entry := e.(*pendingEntry)
		entry.Lock()
		defer entry.Unlock()
		if entry.report != nil {
			return true, nil
		}
		for _, v := range entry.older {
			if v != nil {
				return true, nil
			}
		}
		return false, nil
	}
	// We are a querier: ask each collector if it has any
	// (serially, since we will bail out on the first one that has reports)
	addrs := resolve(c.cfg.CollectorAddr)
	for _, addr := range addrs {
		body, err := oneCall(ctx, addr, "/api/probes?sparse=true", userid)
		if err != nil {
			return false, err
		}
		var hasReports bool
		decoder := json.NewDecoder(body)
		if err := decoder.Decode(&hasReports); err != nil {
			log.Errorf("Error decoding response: %v", err)
		}
		body.Close()
		if hasReports {
			return true, nil
		}
	}
	return false, nil
}
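// Report returns the merged report for the calling user at the given
// timestamp, drawn from the live window.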
func (c *liveCollector) Report(ctx context.Context, timestamp time.Time) (report.Report, error) {
	span, ctx := opentracing.StartSpanFromContext(ctx, "liveCollector.Report")
	defer span.Finish()
	userid, err := c.cfg.UserIDer(ctx)
	if err != nil {
		return report.MakeReport(), err
	}
	span.SetTag("userid", userid)
	var reports []report.Report
	if time.Since(timestamp) < c.cfg.Window {
		reports, err = c.reportsFromLive(ctx, userid)
	}
	if err != nil {
		return report.MakeReport(), err
	}
	span.LogFields(otlog.Int("merging", len(reports)))
	return c.merger.Merge(reports), nil
}
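// reportsFromLive gathers the in-window reports for one user: from the local
// pending entry when we are a collector, otherwise by fetching from every
// collector in parallel.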
func (c *liveCollector) reportsFromLive(ctx context.Context, userid string) ([]report.Report, error) {
	span, ctx := opentracing.StartSpanFromContext(ctx, "reportsFromLive")
	defer span.Finish()
	if c.isCollector() {
		e, found := c.pending.Load(userid)
		if !found {
			return nil, nil
		}
		entry := e.(*pendingEntry)
		entry.Lock()
		ret := make([]report.Report, 0, len(entry.older)+1)
		if entry.report != nil {
			ret = append(ret, entry.report.Copy()) // Copy contents because this report is being unsafe-merged to
		}
		for _, v := range entry.older {
			if v != nil {
				ret = append(ret, *v) // no copy because older reports are immutable
			}
		}
		entry.Unlock()
		return ret, nil
	}
	// We are a querier: fetch the most up-to-date reports from collectors,
	// re-resolving their addresses at most every five seconds.
	if time.Since(c.lastResolved) > time.Second*5 {
		c.collectors = resolve(c.cfg.CollectorAddr)
		c.lastResolved = time.Now()
	}
	reports := make([]*report.Report, len(c.collectors))
	// make a call to each collector and fetch its data for this userid
	g, ctx := errgroup.WithContext(ctx)
	for i, addr := range c.collectors {
		i, addr := i, addr // https://golang.org/doc/faq#closures_and_goroutines
		g.Go(func() error {
			body, err := oneCall(ctx, addr, "/api/report", userid)
			if err != nil {
				log.Warnf("error calling '%s': %v", addr, err)
				return nil
			}
			reports[i], err = report.MakeFromBinary(ctx, body, false, true)
			body.Close()
			if err != nil {
				log.Warnf("error decoding: %v", err)
				return nil
			}
			return nil
		})
	}
	// individual fetch errors are logged and tolerated above rather than returned
	if err := g.Wait(); err != nil {
		return nil, err
	}
	// dereference pointers into the expected return format
	ret := make([]report.Report, 0, len(reports))
	for _, rpt := range reports {
		if rpt != nil {
			ret = append(ret, *rpt)
		}
	}
	return ret, nil
}
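// resolve looks up the SRV records for name and returns the corresponding
// host:port endpoints, or an empty slice on failure.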
func resolve(name string) []string {
	_, addrs, err := net.LookupSRV("", "", name)
	if err != nil {
		log.Warnf("Cannot resolve '%s': %v", name, err)
		return []string{}
	}
	endpoints := make([]string, 0, len(addrs))
	for _, addr := range addrs {
		port := strconv.Itoa(int(addr.Port))
		endpoints = append(endpoints, net.JoinHostPort(addr.Target, port))
	}
	return endpoints
}
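// oneCall makes a single authenticated HTTP GET to one collector on behalf
// of userid; on success the caller must close the returned body.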
func oneCall(ctx context.Context, endpoint, path, userid string) (io.ReadCloser, error) {
	fullPath := "http://" + endpoint + path
	req, err := http.NewRequest("GET", fullPath, nil)
	if err != nil {
		return nil, fmt.Errorf("error making request %s: %w", fullPath, err)
	}
	req = req.WithContext(ctx)
	req.Header.Set(user.OrgIDHeaderName, userid)
	req.Header.Set("Accept", "application/msgpack")
	req.Header.Set("Accept-Encoding", "identity") // disable compression
	if parentSpan := opentracing.SpanFromContext(ctx); parentSpan != nil {
		var ht *nethttp.Tracer
		req, ht = nethttp.TraceRequest(parentSpan.Tracer(), req, nethttp.OperationName("Collector Fetch"))
		defer ht.Finish()
	}
	client := &http.Client{Transport: &nethttp.Transport{}}
	res, err := client.Do(req)
	if err != nil {
		return nil, fmt.Errorf("error getting %s: %w", fullPath, err)
	}
	if res.StatusCode != http.StatusOK {
		content, _ := io.ReadAll(res.Body)
		res.Body.Close()
		return nil, fmt.Errorf("error from collector: %s (%s)", res.Status, string(content))
	}
	return res.Body, nil
}
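// WaitOn subscribes to shortcut-report notifications for the calling user
// over NATS, signalling waiter each time one arrives.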
func (c *liveCollector) WaitOn(ctx context.Context, waiter chan struct{}) {
	userid, err := c.cfg.UserIDer(ctx)
	if err != nil {
		log.Errorf("Error getting user id in WaitOn: %v", err)
		return
	}
	if c.nats == nil {
		return
	}
	sub, err := c.nats.SubscribeSync(userid)
	natsRequests.WithLabelValues("SubscribeSync", instrument.ErrorCode(err)).Add(1)
	if err != nil {
		log.Errorf("Error subscribing for shortcuts: %v", err)
		return
	}
	c.waitersLock.Lock()
	c.waiters[watchKey{userid, waiter}] = sub
	c.waitersLock.Unlock()
	go func() {
		for {
			_, err := sub.NextMsg(natsTimeout)
			if err == nats.ErrTimeout {
				continue
			}
			natsRequests.WithLabelValues("NextMsg", instrument.ErrorCode(err)).Add(1)
			if err != nil {
				log.Debugf("NextMsg error: %v", err)
				return
			}
			select {
			case waiter <- struct{}{}: // non-blocking notify; drop if nobody is listening
			default:
			}
		}
	}()
}
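// UnWait reverses WaitOn: it drops the waiter registration and unsubscribes
// from NATS.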
func (c *liveCollector) UnWait(ctx context.Context, waiter chan struct{}) {
	userid, err := c.cfg.UserIDer(ctx)
	if err != nil {
		log.Errorf("Error getting user id in UnWait: %v", err)
		return
	}
	if c.nats == nil {
		return
	}
	c.waitersLock.Lock()
	key := watchKey{userid, waiter}
	sub, found := c.waiters[key]
	delete(c.waiters, key)
	c.waitersLock.Unlock()
	if !found {
		return // nothing was registered for this waiter
	}
	err = sub.Unsubscribe()
	natsRequests.WithLabelValues("Unsubscribe", instrument.ErrorCode(err)).Add(1)
	if err != nil {
		log.Errorf("Error on unsubscribe: %v", err)
	}
}
// AdminSummary returns a string with some internal information about
// the report, which may be useful to troubleshoot.
func (c *liveCollector) AdminSummary(ctx context.Context, timestamp time.Time) (string, error) {
	span, ctx := opentracing.StartSpanFromContext(ctx, "liveCollector.AdminSummary")
	defer span.Finish()
	_, err := c.cfg.UserIDer(ctx)
	if err != nil {
		return "", err
	}
	// TODO: finish implementation
	return "TODO", nil
}