aws_collector.go

package multitenant

import (
	"context"
	"crypto/md5"
	"fmt"
	"io"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/awserr"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/dynamodb"
	"github.com/bluele/gcache"
	opentracing "github.com/opentracing/opentracing-go"
	otlog "github.com/opentracing/opentracing-go/log"
	"github.com/prometheus/client_golang/prometheus"
	log "github.com/sirupsen/logrus"
	"github.com/weaveworks/common/instrument"

	"github.com/weaveworks/scope/app"
	"github.com/weaveworks/scope/report"
)
const (
	hourField                  = "hour"
	tsField                    = "ts"
	reportField                = "report"
	natsTimeout                = 10 * time.Second
	reportQuantisationInterval = 3 * time.Second
	// gracePeriod allows for some gap between the timestamp on reports
	// (assigned when they arrive at the collector) and the time they become
	// visible in a DynamoDB query.
	gracePeriod = 500 * time.Millisecond
)
var (
	dynamoRequestDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
		Namespace: "scope",
		Name:      "dynamo_request_duration_seconds",
		Help:      "Time in seconds spent doing DynamoDB requests.",
		Buckets:   prometheus.DefBuckets,
	}, []string{"method", "status_code"})
	dynamoConsumedCapacity = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "scope",
		Name:      "dynamo_consumed_capacity_total",
		Help:      "Total count of capacity units consumed per operation.",
	}, []string{"method"})
	dynamoValueSize = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "scope",
		Name:      "dynamo_value_size_bytes_total",
		Help:      "Total size of data read / written from DynamoDB in bytes.",
	}, []string{"method"})
	inProcessCacheRequests = prometheus.NewCounter(prometheus.CounterOpts{
		Namespace: "scope",
		Name:      "in_process_cache_requests_total",
		Help:      "Total count of reports requested from the in-process cache.",
	})
	inProcessCacheHits = prometheus.NewCounter(prometheus.CounterOpts{
		Namespace: "scope",
		Name:      "in_process_cache_hits_total",
		Help:      "Total count of reports found in the in-process cache.",
	})
	reportSizeHistogram = prometheus.NewHistogram(prometheus.HistogramOpts{
		Namespace: "scope",
		Name:      "report_size_bytes",
		Help:      "Distribution of memcache report sizes.",
		Buckets:   prometheus.ExponentialBuckets(4096, 2.0, 10),
	})
	reportsPerUser = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "scope",
		Name:      "reports_stored_total",
		Help:      "Total count of stored reports per user.",
	}, []string{"user"})
	reportSizePerUser = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "scope",
		Name:      "reports_bytes_total",
		Help:      "Total bytes stored in reports per user.",
	}, []string{"user"})
	flushDuration = instrument.NewHistogramCollectorFromOpts(prometheus.HistogramOpts{
		Namespace: "scope",
		Name:      "flush_duration_seconds",
		Help:      "Time in seconds spent flushing merged reports.",
		Buckets:   prometheus.DefBuckets,
	})
)
func registerAWSCollectorMetrics() {
	prometheus.MustRegister(dynamoRequestDuration)
	prometheus.MustRegister(dynamoConsumedCapacity)
	prometheus.MustRegister(dynamoValueSize)
	prometheus.MustRegister(inProcessCacheRequests)
	prometheus.MustRegister(inProcessCacheHits)
	prometheus.MustRegister(reportSizeHistogram)
	prometheus.MustRegister(reportsPerUser)
	prometheus.MustRegister(reportSizePerUser)
	flushDuration.Register()
}

var registerAWSCollectorMetricsOnce sync.Once
// AWSCollector is a Collector which can also CreateTables
type AWSCollector interface {
	app.Collector
	CreateTables() error
}

// ReportStore is a thing that we can get reports from.
type ReportStore interface {
	FetchReports(context.Context, []string) (map[string]report.Report, []string, error)
}

// AWSCollectorConfig has everything we need to make an AWS collector.
type AWSCollectorConfig struct {
	DynamoDBConfig *aws.Config
	DynamoTable    string
	S3Store        *S3Store
}

type awsCollector struct {
	liveCollector
	awsCfg    AWSCollectorConfig
	db        *dynamodb.DynamoDB
	inProcess inProcessStore
}
// Shortcut reports:
// When the UI connects a WS to the query service, a goroutine periodically
// publishes rendered reports to that ws. This process can be interrupted by
// "shortcut" reports, causing the query service to push a rendered report
// immediately. This whole process is controlled by the aforementioned
// goroutine registering a channel with the collector. We store these
// registered channels in a map keyed by the userid and the channel itself,
// which in Go is hashable. We then listen on a NATS topic for any shortcut
// reports coming from the collection service.
type watchKey struct {
	userid string
	c      chan struct{}
}
// NewAWSCollector the elastic reaper of souls
// https://github.com/aws/aws-sdk-go/wiki/common-examples
func NewAWSCollector(liveConfig LiveCollectorConfig, config AWSCollectorConfig) (AWSCollector, error) {
	registerAWSCollectorMetricsOnce.Do(registerAWSCollectorMetrics)
	// (window * report rate) * number of hosts per user * number of users
	reportCacheSize := (int(liveConfig.Window.Seconds()) / 3) * 10 * 5
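	// A worked example of the sizing heuristic above, assuming a 15s window:
	// reports arrive roughly every 3s (hence the division by 3), so the cache
	// is sized for (15/3) * 10 * 5 = 250 reports. The 10 hosts/user and 5
	// users multipliers are the heuristics from the comment above.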
	c := &awsCollector{
		liveCollector: liveCollector{cfg: liveConfig},
		awsCfg:        config,
		db:            dynamodb.New(session.New(config.DynamoDBConfig)),
		inProcess:     newInProcessStore(reportCacheSize, liveConfig.Window+reportQuantisationInterval),
	}
	err := c.liveCollector.init()
	if err != nil {
		return nil, err
	}
	c.tickCallbacks = append(c.tickCallbacks, c.flushPending)
	return c, nil
}
// Range over all users (instances) that have pending reports and send to store
func (c *awsCollector) flushPending(ctx context.Context) {
	instrument.CollectedRequest(ctx, "FlushPending", flushDuration, nil, func(ctx context.Context) error {
		type queueEntry struct {
			userid string
			buf    []byte
		}
		queue := make(chan queueEntry)
		const numParallel = 10
		var group sync.WaitGroup
		group.Add(numParallel)
		// Run n parallel goroutines fetching reports from the queue and flushing them
		for i := 0; i < numParallel; i++ {
			go func() {
				for entry := range queue {
					rowKey, colKey, reportKey := calculateReportKeys(entry.userid, time.Now())
					err := c.persistReport(ctx, entry.userid, rowKey, colKey, reportKey, entry.buf)
					if err != nil {
						log.Errorf("Could not persist combined report: %v", err)
					}
				}
				group.Done()
			}()
		}
		c.pending.Range(func(key, value interface{}) bool {
			userid := key.(string)
			entry := value.(*pendingEntry)
			entry.Lock()
			rpt := entry.older[0]
			entry.Unlock()
			if rpt != nil {
				// serialise reports on one goroutine to limit CPU usage
				buf, err := rpt.WriteBinary()
				if err != nil {
					log.Errorf("Could not serialise combined report: %v", err)
					return true
				}
				queue <- queueEntry{userid: userid, buf: buf.Bytes()}
			}
			return true
		})
		close(queue)
		group.Wait()
		return nil
	})
}
// Close will flush pending data
func (c *awsCollector) Close() {
	c.liveCollector.Close()
	c.flushPending(context.Background())
}
// CreateTables creates the required tables in dynamodb
func (c *awsCollector) CreateTables() error {
	// see if tableName exists
	resp, err := c.db.ListTables(&dynamodb.ListTablesInput{
		Limit: aws.Int64(10),
	})
	if err != nil {
		return err
	}
	for _, s := range resp.TableNames {
		if *s == c.awsCfg.DynamoTable {
			return nil
		}
	}
	params := &dynamodb.CreateTableInput{
		TableName: aws.String(c.awsCfg.DynamoTable),
		AttributeDefinitions: []*dynamodb.AttributeDefinition{
			{
				AttributeName: aws.String(hourField),
				AttributeType: aws.String("S"),
			},
			{
				AttributeName: aws.String(tsField),
				AttributeType: aws.String("N"),
			},
			// Don't need to specify non-key attributes in schema
			//{
			//	AttributeName: aws.String(reportField),
			//	AttributeType: aws.String("S"),
			//},
		},
		KeySchema: []*dynamodb.KeySchemaElement{
			{
				AttributeName: aws.String(hourField),
				KeyType:       aws.String("HASH"),
			},
			{
				AttributeName: aws.String(tsField),
				KeyType:       aws.String("RANGE"),
			},
		},
		ProvisionedThroughput: &dynamodb.ProvisionedThroughput{
			ReadCapacityUnits:  aws.Int64(10),
			WriteCapacityUnits: aws.Int64(5),
		},
	}
	log.Infof("Creating table %s", c.awsCfg.DynamoTable)
	_, err = c.db.CreateTable(params)
	return err
}
type keyInfo struct {
	key string
	ts  int64
}
// reportKeysInRange returns the s3 keys for reports in the specified range
func (c *awsCollector) reportKeysInRange(ctx context.Context, userid string, row int64, start, end time.Time) ([]keyInfo, error) {
	rowKey := fmt.Sprintf("%s-%s", userid, strconv.FormatInt(row, 10))
	var resp *dynamodb.QueryOutput
	err := instrument.TimeRequestHistogram(ctx, "DynamoDB.Query", dynamoRequestDuration, func(_ context.Context) error {
		var err error
		resp, err = c.db.Query(&dynamodb.QueryInput{
			TableName: aws.String(c.awsCfg.DynamoTable),
			KeyConditions: map[string]*dynamodb.Condition{
				hourField: {
					AttributeValueList: []*dynamodb.AttributeValue{
						{S: aws.String(rowKey)},
					},
					ComparisonOperator: aws.String("EQ"),
				},
				tsField: {
					AttributeValueList: []*dynamodb.AttributeValue{
						{N: aws.String(strconv.FormatInt(start.UnixNano(), 10))},
						{N: aws.String(strconv.FormatInt(end.UnixNano(), 10))},
					},
					ComparisonOperator: aws.String("BETWEEN"),
				},
			},
			ReturnConsumedCapacity: aws.String(dynamodb.ReturnConsumedCapacityTotal),
		})
		return err
	})
	// Check the error before touching resp, which may not be valid on failure.
	if err != nil {
		return nil, err
	}
	if resp.ConsumedCapacity != nil {
		dynamoConsumedCapacity.WithLabelValues("Query").
			Add(float64(*resp.ConsumedCapacity.CapacityUnits))
	}
	result := []keyInfo{}
	for _, item := range resp.Items {
		reportKey := item[reportField].S
		tsValue := item[tsField].N
		if reportKey == nil || tsValue == nil {
			log.Errorf("Empty row!")
			continue
		}
		dynamoValueSize.WithLabelValues("BatchGetItem").
			Add(float64(len(*reportKey)))
		ts, _ := strconv.ParseInt(*tsValue, 10, 64)
		result = append(result, keyInfo{key: *reportKey, ts: ts})
	}
	return result, nil
}
// getReportKeys returns the S3 keys for reports in the interval [start, end].
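// Rows are keyed by hour, so a window that crosses an hour boundary (say
// 14:59:55 to 15:00:10) spans two rows and needs two queries, while a window
// that falls inside a single hour needs only one; see below.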
func (c *awsCollector) getReportKeys(ctx context.Context, userid string, start, end time.Time) ([]keyInfo, error) {
	var (
		rowStart = start.UnixNano() / time.Hour.Nanoseconds()
		rowEnd   = end.UnixNano() / time.Hour.Nanoseconds()
		err      error
	)
	// Queries will only ever span 2 rows at most.
	var reportKeys []keyInfo
	if rowStart != rowEnd {
		reportKeys1, err := c.reportKeysInRange(ctx, userid, rowStart, start, end)
		if err != nil {
			return nil, err
		}
		reportKeys2, err := c.reportKeysInRange(ctx, userid, rowEnd, start, end)
		if err != nil {
			return nil, err
		}
		reportKeys = append(reportKeys, reportKeys1...)
		reportKeys = append(reportKeys, reportKeys2...)
	} else {
		if reportKeys, err = c.reportKeysInRange(ctx, userid, rowEnd, start, end); err != nil {
			return nil, err
		}
	}
	return reportKeys, nil
}
func (c *awsCollector) getReports(ctx context.Context, userid string, reportKeys []string) ([]report.Report, error) {
	missing := reportKeys
	stores := []ReportStore{c.inProcess}
	if c.cfg.MemcacheClient != nil {
		stores = append(stores, c.cfg.MemcacheClient)
	}
	stores = append(stores, c.awsCfg.S3Store)
	var reports []report.Report
	for _, store := range stores {
		if store == nil {
			continue
		}
		found, newMissing, err := store.FetchReports(ctx, missing)
		missing = newMissing
		if err != nil {
			log.Warningf("Error fetching from cache: %v", err)
		}
		for key, rpt := range found { // rpt, not report: avoid shadowing the report package
			rpt = c.massageReport(userid, rpt)
			c.inProcess.StoreReport(key, rpt)
			reports = append(reports, rpt)
		}
		if len(missing) == 0 {
			return reports, nil
		}
	}
	if len(missing) > 0 {
		return nil, fmt.Errorf("error fetching from S3, still have missing reports: %v", missing)
	}
	return reports, nil
}
// If we are running as a Query service, fetch data and merge into a report.
// If we are running as a Collector and the request is for live data, merge in-memory data and return.
func (c *awsCollector) Report(ctx context.Context, timestamp time.Time) (report.Report, error) {
	span, ctx := opentracing.StartSpanFromContext(ctx, "awsCollector.Report")
	defer span.Finish()
	userid, err := c.cfg.UserIDer(ctx)
	if err != nil {
		return report.MakeReport(), err
	}
	span.SetTag("userid", userid)
	var reports []report.Report
	if time.Since(timestamp) < c.cfg.Window {
		reports, err = c.reportsFromLive(ctx, userid)
	} else {
		reports, err = c.reportsFromStore(ctx, userid, timestamp)
	}
	if err != nil {
		return report.MakeReport(), err
	}
	span.LogFields(otlog.Int("merging", len(reports)))
	return c.merger.Merge(reports), nil
}
/*
Given a timestamp in the past, fetch reports within the window from store or cache.
S3 stores original reports from one probe at the timestamp they arrived at the collector.
The collector also sends every report to memcached.
The in-memory cache stores:
  - individual reports deserialised, under the S3 key for the report
  - sets of reports in the interval [t,t+3) merged, under the key "instance:t"
  - so to check the cache for reports from 14:31:00 to 14:31:15 you would request 5 keys 3 seconds apart
*/
func (c *awsCollector) reportsFromStore(ctx context.Context, userid string, timestamp time.Time) ([]report.Report, error) {
	span := opentracing.SpanFromContext(ctx)
	end := timestamp
	start := end.Add(-c.cfg.Window)
	reportKeys, err := c.getReportKeys(ctx, userid, start, end)
	if err != nil {
		return nil, err
	}
	if span != nil { // SpanFromContext returns nil if there is no parent span
		span.LogFields(otlog.Int("keys", len(reportKeys)), otlog.String("timestamp", timestamp.String()))
	}
	var reports []report.Report
	// Fetch a merged report for each time quantum covering the window
	startTS, endTS := start.UnixNano(), end.UnixNano()
	ts := startTS - (startTS % reportQuantisationInterval.Nanoseconds())
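	// Quantisation arithmetic, spelled out: ts is start rounded down to a 3s
	// quantum boundary (e.g. a start at ...:07 becomes ...:06), and the loop
	// below stops one quantum plus the 500ms grace period short of the end;
	// the remaining tail is fetched as individual reports afterwards.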
	for ; ts+(reportQuantisationInterval+gracePeriod).Nanoseconds() < endTS; ts += reportQuantisationInterval.Nanoseconds() {
		quantumReport, err := c.reportForQuantum(ctx, userid, reportKeys, ts)
		if err != nil {
			return nil, err
		}
		reports = append(reports, quantumReport)
	}
	// Fetch individual reports for the period after the last quantum
	last, err := c.reportsForKeysInRange(ctx, userid, reportKeys, ts, endTS)
	if err != nil {
		return nil, err
	}
	reports = append(reports, last...)
	return reports, nil
}
// Fetch a merged report either from cache or from store which we put in cache
func (c *awsCollector) reportForQuantum(ctx context.Context, userid string, reportKeys []keyInfo, start int64) (report.Report, error) {
	key := fmt.Sprintf("%s:%d", userid, start)
	// the in-process store never returns an error, so it is safe to ignore here
	cached, _, _ := c.inProcess.FetchReports(ctx, []string{key})
	if len(cached) == 1 {
		return cached[key], nil
	}
	reports, err := c.reportsForKeysInRange(ctx, userid, reportKeys, start, start+reportQuantisationInterval.Nanoseconds())
	if err != nil {
		return report.MakeReport(), err
	}
	merged := c.merger.Merge(reports)
	c.inProcess.StoreReport(key, merged)
	return merged, nil
}
// Find the keys relating to this time period then fetch from memcached and/or S3
func (c *awsCollector) reportsForKeysInRange(ctx context.Context, userid string, reportKeys []keyInfo, start, end int64) ([]report.Report, error) {
	var keys []string
	for _, k := range reportKeys {
		if k.ts >= start && k.ts < end {
			keys = append(keys, k.key)
		}
	}
	if span := opentracing.SpanFromContext(ctx); span != nil {
		span.LogFields(otlog.Int("fetching", len(keys)), otlog.Int64("start", start), otlog.Int64("end", end))
	}
	log.Debugf("Fetching %d reports from %v to %v", len(keys), start, end)
	return c.getReports(ctx, userid, keys)
}
func (c *awsCollector) HasReports(ctx context.Context, timestamp time.Time) (bool, error) {
	userid, err := c.cfg.UserIDer(ctx)
	if err != nil {
		return false, err
	}
	if time.Since(timestamp) < c.cfg.Window {
		return c.hasReportsFromLive(ctx, userid)
	}
	start := timestamp.Add(-c.cfg.Window)
	reportKeys, err := c.getReportKeys(ctx, userid, start, timestamp)
	return len(reportKeys) > 0, err
}

func (c *awsCollector) HasHistoricReports() bool {
	return true
}
// AdminSummary returns a string with some internal information about
// the report, which may be useful to troubleshoot.
func (c *awsCollector) AdminSummary(ctx context.Context, timestamp time.Time) (string, error) {
	span, ctx := opentracing.StartSpanFromContext(ctx, "awsCollector.AdminSummary")
	defer span.Finish()
	userid, err := c.cfg.UserIDer(ctx)
	if err != nil {
		return "", err
	}
	end := timestamp
	start := end.Add(-c.cfg.Window)
	reportKeys, err := c.getReportKeys(ctx, userid, start, end)
	if err != nil {
		return "", err
	}
	reports, err := c.reportsForKeysInRange(ctx, userid, reportKeys, start.UnixNano(), end.UnixNano())
	if err != nil {
		return "", err
	}
	var b strings.Builder
	for i := range reports {
		// TODO: print the key - note reports may be in a different order from reportKeys
		b.WriteString(reports[i].Summary())
		b.WriteByte('\n')
	}
	return b.String(), nil
}
// calculateDynamoKeys generates the row & column keys for Dynamo.
func calculateDynamoKeys(userid string, now time.Time) (string, string) {
	rowKey := fmt.Sprintf("%s-%s", userid, strconv.FormatInt(now.UnixNano()/time.Hour.Nanoseconds(), 10))
	colKey := strconv.FormatInt(now.UnixNano(), 10)
	return rowKey, colKey
}
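
// Illustrative example (values hypothetical): for userid "foo" at Unix time
// 1500000000 (2017-07-14 02:40:00 UTC), rowKey is "foo-416666" (hours since
// the epoch) and colKey is "1500000000000000000" (nanoseconds since the epoch).
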
// calculateReportKeys returns DynamoDB row & col keys, and S3/memcached key that we will use for a report
func calculateReportKeys(userid string, now time.Time) (string, string, string) {
	rowKey, colKey := calculateDynamoKeys(userid, now)
	rowKeyHash := md5.New()
	_, _ = io.WriteString(rowKeyHash, rowKey) // hash write doesn't error
	return rowKey, colKey, fmt.Sprintf("%x/%s", rowKeyHash.Sum(nil), colKey)
}
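
// The resulting S3/memcached key therefore has the shape
// "<md5-hex-of-rowKey>/<colKey>". Hashing the row key presumably spreads
// reports across the keyspace rather than clustering them under a raw
// "userid-hour" prefix; the code itself only fixes the format.
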
func (c *awsCollector) persistReport(ctx context.Context, userid, rowKey, colKey, reportKey string, buf []byte) error {
	// Put in S3 and cache before index, so it is fetchable before it is discoverable
	reportSize, err := c.awsCfg.S3Store.StoreReportBytes(ctx, reportKey, buf)
	if err != nil {
		return err
	}
	if c.cfg.MemcacheClient != nil {
		_, err = c.cfg.MemcacheClient.StoreReportBytes(ctx, reportKey, buf)
		if err != nil {
			// NOTE: We don't abort here because failing to store in memcache
			// doesn't actually break anything else -- it's just an
			// optimization.
			log.Warningf("Could not store %v in memcache: %v", reportKey, err)
		}
	}
	dynamoValueSize.WithLabelValues("PutItem").Add(float64(len(reportKey)))
	err = instrument.TimeRequestHistogram(ctx, "DynamoDB.PutItem", dynamoRequestDuration, func(_ context.Context) error {
		resp, err := c.putItemInDynamo(rowKey, colKey, reportKey)
		if resp != nil && resp.ConsumedCapacity != nil { // guard resp, which may be invalid on error
			dynamoConsumedCapacity.WithLabelValues("PutItem").
				Add(float64(*resp.ConsumedCapacity.CapacityUnits))
		}
		return err
	})
	if err != nil {
		return err
	}
	reportSizeHistogram.Observe(float64(reportSize))
	reportSizePerUser.WithLabelValues(userid).Add(float64(reportSize))
	reportsPerUser.WithLabelValues(userid).Inc()
	return nil
}
func (c *awsCollector) putItemInDynamo(rowKey, colKey, reportKey string) (*dynamodb.PutItemOutput, error) {
	// Back off on ProvisionedThroughputExceededException
	const (
		maxRetries              = 5
		throughputExceededError = "ProvisionedThroughputExceededException"
	)
	var (
		resp    *dynamodb.PutItemOutput
		err     error
		retries = 0
		backoff = 50 * time.Millisecond
	)
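	// With these values the sleeps come out to 50ms, 100ms, 200ms, 400ms and
	// 800ms: roughly 1.55s of backoff in total before the final attempt.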
	for {
		resp, err = c.db.PutItem(&dynamodb.PutItemInput{
			TableName: aws.String(c.awsCfg.DynamoTable),
			Item: map[string]*dynamodb.AttributeValue{
				hourField: {
					S: aws.String(rowKey),
				},
				tsField: {
					N: aws.String(colKey),
				},
				reportField: {
					S: aws.String(reportKey),
				},
			},
			ReturnConsumedCapacity: aws.String(dynamodb.ReturnConsumedCapacityTotal),
		})
		if err != nil && retries < maxRetries {
			if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == throughputExceededError {
				time.Sleep(backoff)
				retries++
				backoff *= 2
				continue
			}
		}
		break
	}
	return resp, err
}
type inProcessStore struct {
	cache gcache.Cache
}

// newInProcessStore creates an in-process store for reports.
func newInProcessStore(size int, expiration time.Duration) inProcessStore {
	return inProcessStore{gcache.New(size).LRU().Expiration(expiration).Build()}
}
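
// For reference, NewAWSCollector above builds this store as
// newInProcessStore(reportCacheSize, liveConfig.Window+reportQuantisationInterval),
// presumably so entries survive slightly longer than any window they could
// still be queried in.
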
// FetchReports retrieves the given reports from the store.
func (c inProcessStore) FetchReports(_ context.Context, keys []string) (map[string]report.Report, []string, error) {
	found := map[string]report.Report{}
	missing := []string{}
	for _, key := range keys {
		rpt, err := c.cache.Get(key)
		if err == nil {
			found[key] = rpt.(report.Report)
		} else {
			missing = append(missing, key)
		}
	}
	inProcessCacheHits.Add(float64(len(found)))
	inProcessCacheRequests.Add(float64(len(keys)))
	return found, missing, nil
}
// StoreReport stores a report in the store.
func (c inProcessStore) StoreReport(key string, rpt report.Report) {
	c.cache.Set(key, rpt)
}