collector.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367
  1. package app
  2. import (
  3. "fmt"
  4. "os"
  5. "path/filepath"
  6. "strconv"
  7. "strings"
  8. "sync"
  9. "time"
  10. "context"
  11. "github.com/weaveworks/common/mtime"
  12. "github.com/weaveworks/scope/report"
  13. )
  // reportQuantisationInterval is the width of the time bucket used when
  // compacting stored reports: all reports received within one interval
  // are merged into a single report and the originals are discarded.
  // Higher figures improve the performance of Report(), but at the
  // expense of lower time resolution, since time effectively advances
  // in steps (quanta) of this size.
  //
  // The current figure is identical to the default
  // probe.publishInterval, which results in performance improvements
  // as soon as there is more than one probe.
  const reportQuantisationInterval = 3 * time.Second
  // Reporter is something that can produce reports on demand. It's a convenient
  // interface for parts of the app, and several experimental components.
  type Reporter interface {
  	// Report returns the report for the given time. Callers are allowed
  	// to mutate the result, so implementations must hand out an object
  	// they do not share.
  	Report(context.Context, time.Time) (report.Report, error) // must return an object that is OK to modify
  	// HasReports indicates whether there are reports between
  	// timestamp-window and timestamp.
  	HasReports(context.Context, time.Time) (bool, error)
  	// HasHistoricReports indicates whether reports older than
  	// now-window are available.
  	HasHistoricReports() bool
  	// AdminSummary returns a string with internal information about
  	// the reports, which may be useful for troubleshooting.
  	AdminSummary(context.Context, time.Time) (string, error)
  	// WaitOn registers a channel that is signalled when a new report
  	// is received.
  	WaitOn(context.Context, chan struct{})
  	// UnWait removes a channel previously registered with WaitOn.
  	UnWait(context.Context, chan struct{})
  }
  // WebReporter is a reporter that creates reports whose data is eventually
  // displayed on websites. It carries fields that will be forwarded to the
  // detailed.RenderContext.
  type WebReporter struct {
  	// Reporter is embedded, so a WebReporter can be used wherever a
  	// Reporter is expected.
  	Reporter
  	// MetricsGraphURL is carried along for the render context.
  	// NOTE(review): presumably the URL of an external metrics graph;
  	// confirm against detailed.RenderContext.
  	MetricsGraphURL string
  }
  // Adder is something that can accept reports. It's a convenient interface for
  // parts of the app, and several experimental components.
  type Adder interface {
  	// Add takes the following arguments:
  	// - context.Context: the request context
  	// - report.Report: the deserialised report
  	// - []byte: the serialised report (as gzip'd msgpack)
  	Add(context.Context, report.Report, []byte) error
  }
  // A Collector is a Reporter and an Adder that can additionally be closed.
  type Collector interface {
  	Reporter
  	Adder
  	// Close shuts the collector down. Both implementations in this
  	// file (collector and StaticCollector) treat it as a no-op.
  	Close()
  }
  // collector receives published reports from multiple producers. It yields a
  // single merged report, representing all collected reports.
  type collector struct {
  	// mtx guards reports, timestamps and cached.
  	mtx sync.Mutex
  	// reports and timestamps are parallel slices: timestamps[i] is the
  	// time associated with reports[i] (the time of arrival, or the start
  	// of the quantum once reports have been quantised).
  	reports    []report.Report
  	timestamps []time.Time
  	// window is the maximum age of a report; older ones are dropped by
  	// clean().
  	window time.Duration
  	// cached is the result of the last merge performed by Report(), or
  	// nil if a report has been added since.
  	cached *report.Report
  	// merger combines several reports into one.
  	merger Merger
  	// waitableCondition provides WaitOn/UnWait/Broadcast.
  	waitableCondition
  }
  // waitableCondition keeps a set of channels and notifies all of them on
  // Broadcast. The zero value is ready to use: the set is allocated on
  // the first call to WaitOn.
  type waitableCondition struct {
  	sync.Mutex
  	waiters map[chan struct{}]struct{}
  }

  // WaitOn registers waiter so that subsequent calls to Broadcast signal
  // it. Registering the same channel twice has no additional effect.
  func (wc *waitableCondition) WaitOn(_ context.Context, waiter chan struct{}) {
  	wc.Lock()
  	defer wc.Unlock()
  	// Allocate lazily so that a zero-value waitableCondition does not
  	// panic on a nil-map write.
  	if wc.waiters == nil {
  		wc.waiters = map[chan struct{}]struct{}{}
  	}
  	wc.waiters[waiter] = struct{}{}
  }

  // UnWait removes waiter from the set of channels signalled by
  // Broadcast. Removing a channel that was never registered is a no-op.
  func (wc *waitableCondition) UnWait(_ context.Context, waiter chan struct{}) {
  	wc.Lock()
  	defer wc.Unlock()
  	delete(wc.waiters, waiter)
  }

  // Broadcast signals every registered channel. The send never blocks: a
  // channel that cannot accept a value right now (no ready receiver and
  // no free buffer slot) is skipped.
  func (wc *waitableCondition) Broadcast() {
  	wc.Lock()
  	defer wc.Unlock()
  	for waiter := range wc.waiters {
  		// Non-blocking write to channel
  		select {
  		case waiter <- struct{}{}:
  		default:
  		}
  	}
  }
  91. // NewCollector returns a collector ready for use.
  92. func NewCollector(window time.Duration) Collector {
  93. return &collector{
  94. window: window,
  95. waitableCondition: waitableCondition{
  96. waiters: map[chan struct{}]struct{}{},
  97. },
  98. merger: NewFastMerger(),
  99. }
  100. }
  101. // Close is a no-op for the regular collector
  102. func (c *collector) Close() {}
  103. // Add adds a report to the collector's internal state. It implements Adder.
  104. func (c *collector) Add(_ context.Context, rpt report.Report, _ []byte) error {
  105. c.mtx.Lock()
  106. defer c.mtx.Unlock()
  107. c.reports = append(c.reports, rpt)
  108. c.timestamps = append(c.timestamps, mtime.Now())
  109. c.clean()
  110. c.cached = nil
  111. if rpt.Shortcut {
  112. c.Broadcast()
  113. }
  114. return nil
  115. }
  116. // Report returns a merged report over all added reports. It implements
  117. // Reporter.
  118. // Note we copy return a copy in case callers modify the data.
  119. func (c *collector) Report(_ context.Context, timestamp time.Time) (report.Report, error) {
  120. c.mtx.Lock()
  121. defer c.mtx.Unlock()
  122. // If the oldest report is still within range,
  123. // and there is a cached report, return that.
  124. if c.cached != nil && len(c.reports) > 0 {
  125. oldest := timestamp.Add(-c.window)
  126. if c.timestamps[0].After(oldest) {
  127. return c.cached.Copy(), nil
  128. }
  129. }
  130. c.clean()
  131. c.quantise()
  132. for i := range c.reports {
  133. c.reports[i] = c.reports[i].Upgrade()
  134. }
  135. rpt := c.merger.Merge(c.reports)
  136. c.cached = &rpt
  137. return rpt.Copy(), nil
  138. }
  139. // HasReports indicates whether the collector contains reports between
  140. // timestamp-app.window and timestamp.
  141. func (c *collector) HasReports(ctx context.Context, timestamp time.Time) (bool, error) {
  142. c.mtx.Lock()
  143. defer c.mtx.Unlock()
  144. if len(c.timestamps) < 1 {
  145. return false, nil
  146. }
  147. return !c.timestamps[0].After(timestamp) && !c.timestamps[len(c.reports)-1].Before(timestamp.Add(-c.window)), nil
  148. }
  149. // HasHistoricReports indicates whether the collector contains reports
  150. // older than now-app.window.
  151. func (c *collector) HasHistoricReports() bool {
  152. return false
  153. }
  154. // AdminSummary returns a string with some internal information about
  155. // the report, which may be useful to troubleshoot.
  156. func (c *collector) AdminSummary(ctx context.Context, timestamp time.Time) (string, error) {
  157. c.mtx.Lock()
  158. defer c.mtx.Unlock()
  159. var b strings.Builder
  160. for i := range c.reports {
  161. fmt.Fprintf(&b, "%v: ", c.timestamps[i].Format(time.StampMilli))
  162. b.WriteString(c.reports[i].Summary())
  163. b.WriteByte('\n')
  164. }
  165. return b.String(), nil
  166. }
  167. // remove reports older than the app.window
  168. func (c *collector) clean() {
  169. var (
  170. cleanedReports = make([]report.Report, 0, len(c.reports))
  171. cleanedTimestamps = make([]time.Time, 0, len(c.timestamps))
  172. oldest = mtime.Now().Add(-c.window)
  173. )
  174. for i, r := range c.reports {
  175. if c.timestamps[i].After(oldest) {
  176. cleanedReports = append(cleanedReports, r)
  177. cleanedTimestamps = append(cleanedTimestamps, c.timestamps[i])
  178. }
  179. }
  180. c.reports = cleanedReports
  181. c.timestamps = cleanedTimestamps
  182. }
  183. // Merge reports received within the same reportQuantisationInterval.
  184. //
  185. // Quantisation is relative to the time of the first report in a given
  186. // interval, rather than absolute time. So, for example, with a
  187. // reportQuantisationInterval of 3s and reports with timestamps [0, 1,
  188. // 2, 5, 6, 7], the result contains merged reports with
  189. // timestamps/content of [0:{0,1,2}, 5:{5,6,7}].
  190. func (c *collector) quantise() {
  191. if len(c.reports) == 0 {
  192. return
  193. }
  194. var (
  195. quantisedReports = make([]report.Report, 0, len(c.reports))
  196. quantisedTimestamps = make([]time.Time, 0, len(c.timestamps))
  197. )
  198. quantumStartIdx := 0
  199. quantumStartTimestamp := c.timestamps[0]
  200. for i, t := range c.timestamps {
  201. if t.Sub(quantumStartTimestamp) < reportQuantisationInterval {
  202. continue
  203. }
  204. quantisedReports = append(quantisedReports, c.merger.Merge(c.reports[quantumStartIdx:i]))
  205. quantisedTimestamps = append(quantisedTimestamps, quantumStartTimestamp)
  206. quantumStartIdx = i
  207. quantumStartTimestamp = t
  208. }
  209. c.reports = append(quantisedReports, c.merger.Merge(c.reports[quantumStartIdx:]))
  210. c.timestamps = append(quantisedTimestamps, c.timestamps[quantumStartIdx])
  211. }
  212. // StaticCollector always returns the given report.
  213. type StaticCollector report.Report
  214. // Report returns a merged report over all added reports. It implements
  215. // Reporter.
  216. func (c StaticCollector) Report(context.Context, time.Time) (report.Report, error) {
  217. return report.Report(c).Copy(), nil
  218. }
  219. // Close is a no-op for the static collector
  220. func (c StaticCollector) Close() {}
  221. // HasReports indicates whether the collector contains reports between
  222. // timestamp-app.window and timestamp.
  223. func (c StaticCollector) HasReports(context.Context, time.Time) (bool, error) {
  224. return true, nil
  225. }
  226. // HasHistoricReports indicates whether the collector contains reports
  227. // older than now-app.window.
  228. func (c StaticCollector) HasHistoricReports() bool {
  229. return false
  230. }
  231. // AdminSummary implements Reporter
  232. func (c StaticCollector) AdminSummary(ctx context.Context, timestamp time.Time) (string, error) {
  233. return "not implemented", nil
  234. }
  235. // Add adds a report to the collector's internal state. It implements Adder.
  236. func (c StaticCollector) Add(context.Context, report.Report, []byte) error { return nil }
  237. // WaitOn lets other components wait on a new report being received. It
  238. // implements Reporter.
  239. func (c StaticCollector) WaitOn(context.Context, chan struct{}) {}
  240. // UnWait lets other components stop waiting on a new report being received. It
  241. // implements Reporter.
  242. func (c StaticCollector) UnWait(context.Context, chan struct{}) {}
  243. // NewFileCollector reads and parses the files at path (a file or
  244. // directory) as reports. If there are multiple files, and they all
  245. // have names representing "nanoseconds since epoch" timestamps,
  246. // e.g. "1488557088545489008.msgpack.gz", then the collector will
  247. // return merged reports resulting from replaying the file reports in
  248. // a loop at a sequence and speed determined by the timestamps.
  249. // Otherwise the collector always returns the merger of all reports.
  250. func NewFileCollector(path string, window time.Duration) (Collector, error) {
  251. var (
  252. timestamps []time.Time
  253. reports []report.Report
  254. )
  255. allTimestamped := true
  256. if err := filepath.Walk(path,
  257. func(p string, info os.FileInfo, err error) error {
  258. if err != nil {
  259. return err
  260. }
  261. if info.IsDir() {
  262. return nil
  263. }
  264. t, err := timestampFromFilepath(p)
  265. if err != nil {
  266. allTimestamped = false
  267. }
  268. timestamps = append(timestamps, t)
  269. rpt, err := report.MakeFromFile(context.Background(), p)
  270. if err != nil {
  271. return err
  272. }
  273. reports = append(reports, *rpt)
  274. return nil
  275. }); err != nil {
  276. return nil, err
  277. }
  278. if len(reports) > 1 && allTimestamped {
  279. collector := NewCollector(window)
  280. go replay(collector, timestamps, reports)
  281. return collector, nil
  282. }
  283. return StaticCollector(NewFastMerger().Merge(reports).Upgrade()), nil
  284. }
  // timestampFromFilepath interprets the base name of path, with all of
  // its extensions removed, as a decimal count of nanoseconds since the
  // Unix epoch and returns the corresponding time. For example
  // "1488557088545489008.msgpack.gz" yields time.Unix(0, 1488557088545489008).
  // An error is returned when what is left of the name is not a base-10
  // 64-bit integer.
  func timestampFromFilepath(path string) (time.Time, error) {
  	stem := filepath.Base(path)
  	// Removing every extension amounts to cutting at the first dot.
  	if dot := strings.IndexByte(stem, '.'); dot >= 0 {
  		stem = stem[:dot]
  	}

  	nanos, err := strconv.ParseInt(stem, 10, 64)
  	if err != nil {
  		return time.Time{}, fmt.Errorf("filename '%s' is not a number (representing nanoseconds since epoch): %v", stem, err)
  	}
  	return time.Unix(0, nanos), nil
  }
  300. func replay(a Adder, timestamps []time.Time, reports []report.Report) {
  301. // calculate delays between report n and n+1
  302. l := len(timestamps)
  303. delays := make([]time.Duration, l, l)
  304. for i, t := range timestamps[0 : l-1] {
  305. delays[i] = timestamps[i+1].Sub(t)
  306. if delays[i] < 0 {
  307. panic(fmt.Errorf("replay timestamps are not in order! %v", timestamps))
  308. }
  309. }
  310. // We don't know how long to wait before looping round, so make a
  311. // good guess.
  312. delays[l-1] = timestamps[l-1].Sub(timestamps[0]) / time.Duration(l)
  313. due := time.Now()
  314. for {
  315. for i, r := range reports {
  316. a.Add(nil, r, nil)
  317. due = due.Add(delays[i])
  318. delay := due.Sub(time.Now())
  319. if delay > 0 {
  320. time.Sleep(delay)
  321. }
  322. }
  323. }
  324. }