package app import ( "fmt" "os" "path/filepath" "strconv" "strings" "sync" "time" "context" "github.com/weaveworks/common/mtime" "github.com/weaveworks/scope/report" ) // We merge all reports received within the specified interval, and // discard the orignals. Higher figures improve the performance of // Report(), but at the expense of lower time resolution, since time // is effectively advancing in quantiles. // // The current figure is identical to the default // probe.publishInterval, which results in performance improvements // as soon as there is more than one probe. const reportQuantisationInterval = 3 * time.Second // Reporter is something that can produce reports on demand. It's a convenient // interface for parts of the app, and several experimental components. type Reporter interface { Report(context.Context, time.Time) (report.Report, error) // must return an object that is OK to modify HasReports(context.Context, time.Time) (bool, error) HasHistoricReports() bool AdminSummary(context.Context, time.Time) (string, error) WaitOn(context.Context, chan struct{}) UnWait(context.Context, chan struct{}) } // WebReporter is a reporter that creates reports whose data is eventually // displayed on websites. It carries fields that will be forwarded to the // detailed.RenderContext type WebReporter struct { Reporter MetricsGraphURL string } // Adder is something that can accept reports. It's a convenient interface for // parts of the app, and several experimental components. It takes the following // arguments: // - context.Context: the request context // - report.Report: the deserialised report // - []byte: the serialised report (as gzip'd msgpack) type Adder interface { Add(context.Context, report.Report, []byte) error } // A Collector is a Reporter and an Adder type Collector interface { Reporter Adder Close() } // Collector receives published reports from multiple producers. It yields a // single merged report, representing all collected reports. type collector struct { mtx sync.Mutex reports []report.Report timestamps []time.Time window time.Duration cached *report.Report merger Merger waitableCondition } type waitableCondition struct { sync.Mutex waiters map[chan struct{}]struct{} } func (wc *waitableCondition) WaitOn(_ context.Context, waiter chan struct{}) { wc.Lock() wc.waiters[waiter] = struct{}{} wc.Unlock() } func (wc *waitableCondition) UnWait(_ context.Context, waiter chan struct{}) { wc.Lock() delete(wc.waiters, waiter) wc.Unlock() } func (wc *waitableCondition) Broadcast() { wc.Lock() for waiter := range wc.waiters { // Non-block write to channel select { case waiter <- struct{}{}: default: } } wc.Unlock() } // NewCollector returns a collector ready for use. func NewCollector(window time.Duration) Collector { return &collector{ window: window, waitableCondition: waitableCondition{ waiters: map[chan struct{}]struct{}{}, }, merger: NewFastMerger(), } } // Close is a no-op for the regular collector func (c *collector) Close() {} // Add adds a report to the collector's internal state. It implements Adder. func (c *collector) Add(_ context.Context, rpt report.Report, _ []byte) error { c.mtx.Lock() defer c.mtx.Unlock() c.reports = append(c.reports, rpt) c.timestamps = append(c.timestamps, mtime.Now()) c.clean() c.cached = nil if rpt.Shortcut { c.Broadcast() } return nil } // Report returns a merged report over all added reports. It implements // Reporter. // Note we copy return a copy in case callers modify the data. func (c *collector) Report(_ context.Context, timestamp time.Time) (report.Report, error) { c.mtx.Lock() defer c.mtx.Unlock() // If the oldest report is still within range, // and there is a cached report, return that. if c.cached != nil && len(c.reports) > 0 { oldest := timestamp.Add(-c.window) if c.timestamps[0].After(oldest) { return c.cached.Copy(), nil } } c.clean() c.quantise() for i := range c.reports { c.reports[i] = c.reports[i].Upgrade() } rpt := c.merger.Merge(c.reports) c.cached = &rpt return rpt.Copy(), nil } // HasReports indicates whether the collector contains reports between // timestamp-app.window and timestamp. func (c *collector) HasReports(ctx context.Context, timestamp time.Time) (bool, error) { c.mtx.Lock() defer c.mtx.Unlock() if len(c.timestamps) < 1 { return false, nil } return !c.timestamps[0].After(timestamp) && !c.timestamps[len(c.reports)-1].Before(timestamp.Add(-c.window)), nil } // HasHistoricReports indicates whether the collector contains reports // older than now-app.window. func (c *collector) HasHistoricReports() bool { return false } // AdminSummary returns a string with some internal information about // the report, which may be useful to troubleshoot. func (c *collector) AdminSummary(ctx context.Context, timestamp time.Time) (string, error) { c.mtx.Lock() defer c.mtx.Unlock() var b strings.Builder for i := range c.reports { fmt.Fprintf(&b, "%v: ", c.timestamps[i].Format(time.StampMilli)) b.WriteString(c.reports[i].Summary()) b.WriteByte('\n') } return b.String(), nil } // remove reports older than the app.window func (c *collector) clean() { var ( cleanedReports = make([]report.Report, 0, len(c.reports)) cleanedTimestamps = make([]time.Time, 0, len(c.timestamps)) oldest = mtime.Now().Add(-c.window) ) for i, r := range c.reports { if c.timestamps[i].After(oldest) { cleanedReports = append(cleanedReports, r) cleanedTimestamps = append(cleanedTimestamps, c.timestamps[i]) } } c.reports = cleanedReports c.timestamps = cleanedTimestamps } // Merge reports received within the same reportQuantisationInterval. // // Quantisation is relative to the time of the first report in a given // interval, rather than absolute time. So, for example, with a // reportQuantisationInterval of 3s and reports with timestamps [0, 1, // 2, 5, 6, 7], the result contains merged reports with // timestamps/content of [0:{0,1,2}, 5:{5,6,7}]. func (c *collector) quantise() { if len(c.reports) == 0 { return } var ( quantisedReports = make([]report.Report, 0, len(c.reports)) quantisedTimestamps = make([]time.Time, 0, len(c.timestamps)) ) quantumStartIdx := 0 quantumStartTimestamp := c.timestamps[0] for i, t := range c.timestamps { if t.Sub(quantumStartTimestamp) < reportQuantisationInterval { continue } quantisedReports = append(quantisedReports, c.merger.Merge(c.reports[quantumStartIdx:i])) quantisedTimestamps = append(quantisedTimestamps, quantumStartTimestamp) quantumStartIdx = i quantumStartTimestamp = t } c.reports = append(quantisedReports, c.merger.Merge(c.reports[quantumStartIdx:])) c.timestamps = append(quantisedTimestamps, c.timestamps[quantumStartIdx]) } // StaticCollector always returns the given report. type StaticCollector report.Report // Report returns a merged report over all added reports. It implements // Reporter. func (c StaticCollector) Report(context.Context, time.Time) (report.Report, error) { return report.Report(c).Copy(), nil } // Close is a no-op for the static collector func (c StaticCollector) Close() {} // HasReports indicates whether the collector contains reports between // timestamp-app.window and timestamp. func (c StaticCollector) HasReports(context.Context, time.Time) (bool, error) { return true, nil } // HasHistoricReports indicates whether the collector contains reports // older than now-app.window. func (c StaticCollector) HasHistoricReports() bool { return false } // AdminSummary implements Reporter func (c StaticCollector) AdminSummary(ctx context.Context, timestamp time.Time) (string, error) { return "not implemented", nil } // Add adds a report to the collector's internal state. It implements Adder. func (c StaticCollector) Add(context.Context, report.Report, []byte) error { return nil } // WaitOn lets other components wait on a new report being received. It // implements Reporter. func (c StaticCollector) WaitOn(context.Context, chan struct{}) {} // UnWait lets other components stop waiting on a new report being received. It // implements Reporter. func (c StaticCollector) UnWait(context.Context, chan struct{}) {} // NewFileCollector reads and parses the files at path (a file or // directory) as reports. If there are multiple files, and they all // have names representing "nanoseconds since epoch" timestamps, // e.g. "1488557088545489008.msgpack.gz", then the collector will // return merged reports resulting from replaying the file reports in // a loop at a sequence and speed determined by the timestamps. // Otherwise the collector always returns the merger of all reports. func NewFileCollector(path string, window time.Duration) (Collector, error) { var ( timestamps []time.Time reports []report.Report ) allTimestamped := true if err := filepath.Walk(path, func(p string, info os.FileInfo, err error) error { if err != nil { return err } if info.IsDir() { return nil } t, err := timestampFromFilepath(p) if err != nil { allTimestamped = false } timestamps = append(timestamps, t) rpt, err := report.MakeFromFile(context.Background(), p) if err != nil { return err } reports = append(reports, *rpt) return nil }); err != nil { return nil, err } if len(reports) > 1 && allTimestamped { collector := NewCollector(window) go replay(collector, timestamps, reports) return collector, nil } return StaticCollector(NewFastMerger().Merge(reports).Upgrade()), nil } func timestampFromFilepath(path string) (time.Time, error) { name := filepath.Base(path) for { ext := filepath.Ext(name) if ext == "" { break } name = strings.TrimSuffix(name, ext) } nanosecondsSinceEpoch, err := strconv.ParseInt(name, 10, 64) if err != nil { return time.Time{}, fmt.Errorf("filename '%s' is not a number (representing nanoseconds since epoch): %v", name, err) } return time.Unix(0, nanosecondsSinceEpoch), nil } func replay(a Adder, timestamps []time.Time, reports []report.Report) { // calculate delays between report n and n+1 l := len(timestamps) delays := make([]time.Duration, l, l) for i, t := range timestamps[0 : l-1] { delays[i] = timestamps[i+1].Sub(t) if delays[i] < 0 { panic(fmt.Errorf("replay timestamps are not in order! %v", timestamps)) } } // We don't know how long to wait before looping round, so make a // good guess. delays[l-1] = timestamps[l-1].Sub(timestamps[0]) / time.Duration(l) due := time.Now() for { for i, r := range reports { a.Add(nil, r, nil) due = due.Add(delays[i]) delay := due.Sub(time.Now()) if delay > 0 { time.Sleep(delay) } } } }