123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367 |
- package app
- import (
- "fmt"
- "os"
- "path/filepath"
- "strconv"
- "strings"
- "sync"
- "time"
- "context"
- "github.com/weaveworks/common/mtime"
- "github.com/weaveworks/scope/report"
- )
// reportQuantisationInterval is the width of the time buckets used when
// quantising: all reports received within one interval are merged into
// a single report and the originals are discarded. Higher figures
// improve the performance of Report(), but at the expense of lower time
// resolution, since time is effectively advancing in quanta of this
// size.
//
// The current figure is identical to the default
// probe.publishInterval, which results in performance improvements
// as soon as there is more than one probe.
const reportQuantisationInterval time.Duration = 3 * time.Second
- // Reporter is something that can produce reports on demand. It's a convenient
- // interface for parts of the app, and several experimental components.
- type Reporter interface {
- Report(context.Context, time.Time) (report.Report, error) // must return an object that is OK to modify
- HasReports(context.Context, time.Time) (bool, error)
- HasHistoricReports() bool
- AdminSummary(context.Context, time.Time) (string, error)
- WaitOn(context.Context, chan struct{})
- UnWait(context.Context, chan struct{})
- }
- // WebReporter is a reporter that creates reports whose data is eventually
- // displayed on websites. It carries fields that will be forwarded to the
- // detailed.RenderContext
- type WebReporter struct {
- Reporter
- MetricsGraphURL string
- }
- // Adder is something that can accept reports. It's a convenient interface for
- // parts of the app, and several experimental components. It takes the following
- // arguments:
- // - context.Context: the request context
- // - report.Report: the deserialised report
- // - []byte: the serialised report (as gzip'd msgpack)
- type Adder interface {
- Add(context.Context, report.Report, []byte) error
- }
- // A Collector is a Reporter and an Adder
- type Collector interface {
- Reporter
- Adder
- Close()
- }
- // Collector receives published reports from multiple producers. It yields a
- // single merged report, representing all collected reports.
- type collector struct {
- mtx sync.Mutex
- reports []report.Report
- timestamps []time.Time
- window time.Duration
- cached *report.Report
- merger Merger
- waitableCondition
- }
// waitableCondition is a set of channels that are all notified, without
// blocking, each time Broadcast is called. The zero value is ready to
// use. The embedded mutex guards the waiters map.
type waitableCondition struct {
	sync.Mutex
	waiters map[chan struct{}]struct{}
}

// WaitOn registers waiter so that it is sent a value on every
// subsequent Broadcast. Registering the same channel again has no
// further effect. The context is not used.
func (wc *waitableCondition) WaitOn(_ context.Context, waiter chan struct{}) {
	wc.Lock()
	defer wc.Unlock()
	if wc.waiters == nil {
		// Allocate lazily so that a zero-value waitableCondition does
		// not panic with a write to a nil map.
		wc.waiters = make(map[chan struct{}]struct{})
	}
	wc.waiters[waiter] = struct{}{}
}

// UnWait removes waiter from the set notified by Broadcast. Removing a
// channel that was never registered is a no-op. The context is not used.
func (wc *waitableCondition) UnWait(_ context.Context, waiter chan struct{}) {
	wc.Lock()
	defer wc.Unlock()
	delete(wc.waiters, waiter)
}

// Broadcast sends one value to every registered channel. The send is
// non-blocking: a channel that cannot accept a value right now (no
// ready receiver and no free buffer slot) is skipped.
func (wc *waitableCondition) Broadcast() {
	wc.Lock()
	defer wc.Unlock()
	for waiter := range wc.waiters {
		// Non-blocking write to channel.
		select {
		case waiter <- struct{}{}:
		default:
		}
	}
}
- // NewCollector returns a collector ready for use.
- func NewCollector(window time.Duration) Collector {
- return &collector{
- window: window,
- waitableCondition: waitableCondition{
- waiters: map[chan struct{}]struct{}{},
- },
- merger: NewFastMerger(),
- }
- }
- // Close is a no-op for the regular collector
- func (c *collector) Close() {}
- // Add adds a report to the collector's internal state. It implements Adder.
- func (c *collector) Add(_ context.Context, rpt report.Report, _ []byte) error {
- c.mtx.Lock()
- defer c.mtx.Unlock()
- c.reports = append(c.reports, rpt)
- c.timestamps = append(c.timestamps, mtime.Now())
- c.clean()
- c.cached = nil
- if rpt.Shortcut {
- c.Broadcast()
- }
- return nil
- }
- // Report returns a merged report over all added reports. It implements
- // Reporter.
- // Note we copy return a copy in case callers modify the data.
- func (c *collector) Report(_ context.Context, timestamp time.Time) (report.Report, error) {
- c.mtx.Lock()
- defer c.mtx.Unlock()
- // If the oldest report is still within range,
- // and there is a cached report, return that.
- if c.cached != nil && len(c.reports) > 0 {
- oldest := timestamp.Add(-c.window)
- if c.timestamps[0].After(oldest) {
- return c.cached.Copy(), nil
- }
- }
- c.clean()
- c.quantise()
- for i := range c.reports {
- c.reports[i] = c.reports[i].Upgrade()
- }
- rpt := c.merger.Merge(c.reports)
- c.cached = &rpt
- return rpt.Copy(), nil
- }
- // HasReports indicates whether the collector contains reports between
- // timestamp-app.window and timestamp.
- func (c *collector) HasReports(ctx context.Context, timestamp time.Time) (bool, error) {
- c.mtx.Lock()
- defer c.mtx.Unlock()
- if len(c.timestamps) < 1 {
- return false, nil
- }
- return !c.timestamps[0].After(timestamp) && !c.timestamps[len(c.reports)-1].Before(timestamp.Add(-c.window)), nil
- }
- // HasHistoricReports indicates whether the collector contains reports
- // older than now-app.window.
- func (c *collector) HasHistoricReports() bool {
- return false
- }
- // AdminSummary returns a string with some internal information about
- // the report, which may be useful to troubleshoot.
- func (c *collector) AdminSummary(ctx context.Context, timestamp time.Time) (string, error) {
- c.mtx.Lock()
- defer c.mtx.Unlock()
- var b strings.Builder
- for i := range c.reports {
- fmt.Fprintf(&b, "%v: ", c.timestamps[i].Format(time.StampMilli))
- b.WriteString(c.reports[i].Summary())
- b.WriteByte('\n')
- }
- return b.String(), nil
- }
- // remove reports older than the app.window
- func (c *collector) clean() {
- var (
- cleanedReports = make([]report.Report, 0, len(c.reports))
- cleanedTimestamps = make([]time.Time, 0, len(c.timestamps))
- oldest = mtime.Now().Add(-c.window)
- )
- for i, r := range c.reports {
- if c.timestamps[i].After(oldest) {
- cleanedReports = append(cleanedReports, r)
- cleanedTimestamps = append(cleanedTimestamps, c.timestamps[i])
- }
- }
- c.reports = cleanedReports
- c.timestamps = cleanedTimestamps
- }
- // Merge reports received within the same reportQuantisationInterval.
- //
- // Quantisation is relative to the time of the first report in a given
- // interval, rather than absolute time. So, for example, with a
- // reportQuantisationInterval of 3s and reports with timestamps [0, 1,
- // 2, 5, 6, 7], the result contains merged reports with
- // timestamps/content of [0:{0,1,2}, 5:{5,6,7}].
- func (c *collector) quantise() {
- if len(c.reports) == 0 {
- return
- }
- var (
- quantisedReports = make([]report.Report, 0, len(c.reports))
- quantisedTimestamps = make([]time.Time, 0, len(c.timestamps))
- )
- quantumStartIdx := 0
- quantumStartTimestamp := c.timestamps[0]
- for i, t := range c.timestamps {
- if t.Sub(quantumStartTimestamp) < reportQuantisationInterval {
- continue
- }
- quantisedReports = append(quantisedReports, c.merger.Merge(c.reports[quantumStartIdx:i]))
- quantisedTimestamps = append(quantisedTimestamps, quantumStartTimestamp)
- quantumStartIdx = i
- quantumStartTimestamp = t
- }
- c.reports = append(quantisedReports, c.merger.Merge(c.reports[quantumStartIdx:]))
- c.timestamps = append(quantisedTimestamps, c.timestamps[quantumStartIdx])
- }
- // StaticCollector always returns the given report.
- type StaticCollector report.Report
- // Report returns a merged report over all added reports. It implements
- // Reporter.
- func (c StaticCollector) Report(context.Context, time.Time) (report.Report, error) {
- return report.Report(c).Copy(), nil
- }
- // Close is a no-op for the static collector
- func (c StaticCollector) Close() {}
- // HasReports indicates whether the collector contains reports between
- // timestamp-app.window and timestamp.
- func (c StaticCollector) HasReports(context.Context, time.Time) (bool, error) {
- return true, nil
- }
- // HasHistoricReports indicates whether the collector contains reports
- // older than now-app.window.
- func (c StaticCollector) HasHistoricReports() bool {
- return false
- }
- // AdminSummary implements Reporter
- func (c StaticCollector) AdminSummary(ctx context.Context, timestamp time.Time) (string, error) {
- return "not implemented", nil
- }
- // Add adds a report to the collector's internal state. It implements Adder.
- func (c StaticCollector) Add(context.Context, report.Report, []byte) error { return nil }
- // WaitOn lets other components wait on a new report being received. It
- // implements Reporter.
- func (c StaticCollector) WaitOn(context.Context, chan struct{}) {}
- // UnWait lets other components stop waiting on a new report being received. It
- // implements Reporter.
- func (c StaticCollector) UnWait(context.Context, chan struct{}) {}
- // NewFileCollector reads and parses the files at path (a file or
- // directory) as reports. If there are multiple files, and they all
- // have names representing "nanoseconds since epoch" timestamps,
- // e.g. "1488557088545489008.msgpack.gz", then the collector will
- // return merged reports resulting from replaying the file reports in
- // a loop at a sequence and speed determined by the timestamps.
- // Otherwise the collector always returns the merger of all reports.
- func NewFileCollector(path string, window time.Duration) (Collector, error) {
- var (
- timestamps []time.Time
- reports []report.Report
- )
- allTimestamped := true
- if err := filepath.Walk(path,
- func(p string, info os.FileInfo, err error) error {
- if err != nil {
- return err
- }
- if info.IsDir() {
- return nil
- }
- t, err := timestampFromFilepath(p)
- if err != nil {
- allTimestamped = false
- }
- timestamps = append(timestamps, t)
- rpt, err := report.MakeFromFile(context.Background(), p)
- if err != nil {
- return err
- }
- reports = append(reports, *rpt)
- return nil
- }); err != nil {
- return nil, err
- }
- if len(reports) > 1 && allTimestamped {
- collector := NewCollector(window)
- go replay(collector, timestamps, reports)
- return collector, nil
- }
- return StaticCollector(NewFastMerger().Merge(reports).Upgrade()), nil
- }
// timestampFromFilepath interprets the base name of path, with all of
// its extensions removed, as a decimal count of nanoseconds since the
// Unix epoch. For example "dir/1488557088545489008.msgpack.gz" yields
// time.Unix(0, 1488557088545489008).
//
// It returns the zero time.Time and an error if what remains of the
// name is not a valid base-10 int64.
func timestampFromFilepath(path string) (time.Time, error) {
	name := filepath.Base(path)
	// Removing every extension in turn is the same as keeping what
	// precedes the first dot.
	if dot := strings.IndexByte(name, '.'); dot >= 0 {
		name = name[:dot]
	}

	nanos, err := strconv.ParseInt(name, 10, 64)
	if err == nil {
		return time.Unix(0, nanos), nil
	}
	return time.Time{}, fmt.Errorf("filename '%s' is not a number (representing nanoseconds since epoch): %v", name, err)
}
- func replay(a Adder, timestamps []time.Time, reports []report.Report) {
- // calculate delays between report n and n+1
- l := len(timestamps)
- delays := make([]time.Duration, l, l)
- for i, t := range timestamps[0 : l-1] {
- delays[i] = timestamps[i+1].Sub(t)
- if delays[i] < 0 {
- panic(fmt.Errorf("replay timestamps are not in order! %v", timestamps))
- }
- }
- // We don't know how long to wait before looping round, so make a
- // good guess.
- delays[l-1] = timestamps[l-1].Sub(timestamps[0]) / time.Duration(l)
- due := time.Now()
- for {
- for i, r := range reports {
- a.Add(nil, r, nil)
- due = due.Add(delays[i])
- delay := due.Sub(time.Now())
- if delay > 0 {
- time.Sleep(delay)
- }
- }
- }
- }
|