123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659 |
- package multitenant
- import (
- "context"
- "crypto/md5"
- "fmt"
- "io"
- "strconv"
- "strings"
- "sync"
- "time"
- "github.com/aws/aws-sdk-go/aws"
- "github.com/aws/aws-sdk-go/aws/awserr"
- "github.com/aws/aws-sdk-go/aws/session"
- "github.com/aws/aws-sdk-go/service/dynamodb"
- "github.com/bluele/gcache"
- opentracing "github.com/opentracing/opentracing-go"
- otlog "github.com/opentracing/opentracing-go/log"
- "github.com/prometheus/client_golang/prometheus"
- log "github.com/sirupsen/logrus"
- "github.com/weaveworks/common/instrument"
- "github.com/weaveworks/scope/app"
- "github.com/weaveworks/scope/report"
- )
// Attribute names used in the DynamoDB index table.
const (
	// hourField is the hash key; it holds "<userid>-<hours since epoch>".
	hourField = "hour"
	// tsField is the range key; it holds the report timestamp in nanoseconds.
	tsField = "ts"
	// reportField holds the key under which the report body is stored.
	reportField = "report"
)

// Timing parameters for the collector.
const (
	// natsTimeout is not referenced in this part of the file.
	natsTimeout = 10 * time.Second

	// reportQuantisationInterval is the width of the time quanta into which
	// stored reports are merged and cached.
	reportQuantisationInterval = 3 * time.Second

	// Grace period allows for some gap between the timestamp on reports
	// (assigned when they arrive at collector) and them appearing in DynamoDB query
	gracePeriod = 500 * time.Millisecond
)
- var (
- dynamoRequestDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
- Namespace: "scope",
- Name: "dynamo_request_duration_seconds",
- Help: "Time in seconds spent doing DynamoDB requests.",
- Buckets: prometheus.DefBuckets,
- }, []string{"method", "status_code"})
- dynamoConsumedCapacity = prometheus.NewCounterVec(prometheus.CounterOpts{
- Namespace: "scope",
- Name: "dynamo_consumed_capacity_total",
- Help: "Total count of capacity units consumed per operation.",
- }, []string{"method"})
- dynamoValueSize = prometheus.NewCounterVec(prometheus.CounterOpts{
- Namespace: "scope",
- Name: "dynamo_value_size_bytes_total",
- Help: "Total size of data read / written from DynamoDB in bytes.",
- }, []string{"method"})
- inProcessCacheRequests = prometheus.NewCounter(prometheus.CounterOpts{
- Namespace: "scope",
- Name: "in_process_cache_requests_total",
- Help: "Total count of reports requested from the in-process cache.",
- })
- inProcessCacheHits = prometheus.NewCounter(prometheus.CounterOpts{
- Namespace: "scope",
- Name: "in_process_cache_hits_total",
- Help: "Total count of reports found in the in-process cache.",
- })
- reportSizeHistogram = prometheus.NewHistogram(prometheus.HistogramOpts{
- Namespace: "scope",
- Name: "report_size_bytes",
- Help: "Distribution of memcache report sizes",
- Buckets: prometheus.ExponentialBuckets(4096, 2.0, 10),
- })
- reportsPerUser = prometheus.NewCounterVec(prometheus.CounterOpts{
- Namespace: "scope",
- Name: "reports_stored_total",
- Help: "Total count of stored reports per user.",
- }, []string{"user"})
- reportSizePerUser = prometheus.NewCounterVec(prometheus.CounterOpts{
- Namespace: "scope",
- Name: "reports_bytes_total",
- Help: "Total bytes stored in reports per user.",
- }, []string{"user"})
- flushDuration = instrument.NewHistogramCollectorFromOpts(prometheus.HistogramOpts{
- Namespace: "scope",
- Name: "flush_duration_seconds",
- Help: "Time in seconds spent flushing merged reports.",
- Buckets: prometheus.DefBuckets,
- })
- )
- func registerAWSCollectorMetrics() {
- prometheus.MustRegister(dynamoRequestDuration)
- prometheus.MustRegister(dynamoConsumedCapacity)
- prometheus.MustRegister(dynamoValueSize)
- prometheus.MustRegister(inProcessCacheRequests)
- prometheus.MustRegister(inProcessCacheHits)
- prometheus.MustRegister(reportSizeHistogram)
- prometheus.MustRegister(reportsPerUser)
- prometheus.MustRegister(reportSizePerUser)
- flushDuration.Register()
- }
- var registerAWSCollectorMetricsOnce sync.Once
- // AWSCollector is a Collector which can also CreateTables
- type AWSCollector interface {
- app.Collector
- CreateTables() error
- }
- // ReportStore is a thing that we can get reports from.
- type ReportStore interface {
- FetchReports(context.Context, []string) (map[string]report.Report, []string, error)
- }
- // AWSCollectorConfig has everything we need to make an AWS collector.
- type AWSCollectorConfig struct {
- DynamoDBConfig *aws.Config
- DynamoTable string
- S3Store *S3Store
- }
- type awsCollector struct {
- liveCollector
- awsCfg AWSCollectorConfig
- db *dynamodb.DynamoDB
- inProcess inProcessStore
- }
// Shortcut reports:
// When the UI connects a WS to the query service, a goroutine periodically
// publishes rendered reports to that WS. This process can be interrupted by
// "shortcut" reports, causing the query service to push a rendered report
// immediately. This whole process is controlled by the aforementioned
// goroutine registering a channel with the collector. We store these
// registered channels in a map keyed by the userid and the channel itself,
// which in Go is hashable. We then listen on a NATS topic for any shortcut
// reports coming from the collection service.

// watchKey identifies one registered shortcut channel for one user.
type watchKey struct {
	userid string
	c      chan struct{}
}
- // NewAWSCollector the elastic reaper of souls
- // https://github.com/aws/aws-sdk-go/wiki/common-examples
- func NewAWSCollector(liveConfig LiveCollectorConfig, config AWSCollectorConfig) (AWSCollector, error) {
- registerAWSCollectorMetricsOnce.Do(registerAWSCollectorMetrics)
- // (window * report rate) * number of hosts per user * number of users
- reportCacheSize := (int(liveConfig.Window.Seconds()) / 3) * 10 * 5
- c := &awsCollector{
- liveCollector: liveCollector{cfg: liveConfig},
- awsCfg: config,
- db: dynamodb.New(session.New(config.DynamoDBConfig)),
- inProcess: newInProcessStore(reportCacheSize, liveConfig.Window+reportQuantisationInterval),
- }
- err := c.liveCollector.init()
- if err != nil {
- return nil, err
- }
- c.tickCallbacks = append(c.tickCallbacks, c.flushPending)
- return c, nil
- }
- // Range over all users (instances) that have pending reports and send to store
- func (c *awsCollector) flushPending(ctx context.Context) {
- instrument.CollectedRequest(ctx, "FlushPending", flushDuration, nil, func(ctx context.Context) error {
- type queueEntry struct {
- userid string
- buf []byte
- }
- queue := make(chan queueEntry)
- const numParallel = 10
- var group sync.WaitGroup
- group.Add(numParallel)
- // Run n parallel goroutines fetching reports from the queue and flushing them
- for i := 0; i < numParallel; i++ {
- go func() {
- for entry := range queue {
- rowKey, colKey, reportKey := calculateReportKeys(entry.userid, time.Now())
- err := c.persistReport(ctx, entry.userid, rowKey, colKey, reportKey, entry.buf)
- if err != nil {
- log.Errorf("Could not persist combined report: %v", err)
- }
- }
- group.Done()
- }()
- }
- c.pending.Range(func(key, value interface{}) bool {
- userid := key.(string)
- entry := value.(*pendingEntry)
- entry.Lock()
- rpt := entry.older[0]
- entry.Unlock()
- if rpt != nil {
- // serialise reports on one goroutine to limit CPU usage
- buf, err := rpt.WriteBinary()
- if err != nil {
- log.Errorf("Could not serialise combined report: %v", err)
- return true
- }
- queue <- queueEntry{userid: userid, buf: buf.Bytes()}
- }
- return true
- })
- close(queue)
- group.Wait()
- return nil
- })
- }
- // Close will flush pending data
- func (c *awsCollector) Close() {
- c.liveCollector.Close()
- c.flushPending(context.Background())
- }
- // CreateTables creates the required tables in dynamodb
- func (c *awsCollector) CreateTables() error {
- // see if tableName exists
- resp, err := c.db.ListTables(&dynamodb.ListTablesInput{
- Limit: aws.Int64(10),
- })
- if err != nil {
- return err
- }
- for _, s := range resp.TableNames {
- if *s == c.awsCfg.DynamoTable {
- return nil
- }
- }
- params := &dynamodb.CreateTableInput{
- TableName: aws.String(c.awsCfg.DynamoTable),
- AttributeDefinitions: []*dynamodb.AttributeDefinition{
- {
- AttributeName: aws.String(hourField),
- AttributeType: aws.String("S"),
- },
- {
- AttributeName: aws.String(tsField),
- AttributeType: aws.String("N"),
- },
- // Don't need to specify non-key attributes in schema
- //{
- // AttributeName: aws.String(reportField),
- // AttributeType: aws.String("S"),
- //},
- },
- KeySchema: []*dynamodb.KeySchemaElement{
- {
- AttributeName: aws.String(hourField),
- KeyType: aws.String("HASH"),
- },
- {
- AttributeName: aws.String(tsField),
- KeyType: aws.String("RANGE"),
- },
- },
- ProvisionedThroughput: &dynamodb.ProvisionedThroughput{
- ReadCapacityUnits: aws.Int64(10),
- WriteCapacityUnits: aws.Int64(5),
- },
- }
- log.Infof("Creating table %s", c.awsCfg.DynamoTable)
- _, err = c.db.CreateTable(params)
- return err
- }
// keyInfo pairs a stored report's key with the timestamp it is indexed under.
type keyInfo struct {
	// key is the value of the index row's reportField attribute.
	key string
	// ts is the value of the index row's tsField attribute.
	ts int64
}
- // reportKeysInRange returns the s3 keys for reports in the specified range
- func (c *awsCollector) reportKeysInRange(ctx context.Context, userid string, row int64, start, end time.Time) ([]keyInfo, error) {
- rowKey := fmt.Sprintf("%s-%s", userid, strconv.FormatInt(row, 10))
- var resp *dynamodb.QueryOutput
- err := instrument.TimeRequestHistogram(ctx, "DynamoDB.Query", dynamoRequestDuration, func(_ context.Context) error {
- var err error
- resp, err = c.db.Query(&dynamodb.QueryInput{
- TableName: aws.String(c.awsCfg.DynamoTable),
- KeyConditions: map[string]*dynamodb.Condition{
- hourField: {
- AttributeValueList: []*dynamodb.AttributeValue{
- {S: aws.String(rowKey)},
- },
- ComparisonOperator: aws.String("EQ"),
- },
- tsField: {
- AttributeValueList: []*dynamodb.AttributeValue{
- {N: aws.String(strconv.FormatInt(start.UnixNano(), 10))},
- {N: aws.String(strconv.FormatInt(end.UnixNano(), 10))},
- },
- ComparisonOperator: aws.String("BETWEEN"),
- },
- },
- ReturnConsumedCapacity: aws.String(dynamodb.ReturnConsumedCapacityTotal),
- })
- return err
- })
- if resp.ConsumedCapacity != nil {
- dynamoConsumedCapacity.WithLabelValues("Query").
- Add(float64(*resp.ConsumedCapacity.CapacityUnits))
- }
- if err != nil {
- return nil, err
- }
- result := []keyInfo{}
- for _, item := range resp.Items {
- reportKey := item[reportField].S
- tsValue := item[tsField].N
- if reportKey == nil || tsValue == nil {
- log.Errorf("Empty row!")
- continue
- }
- dynamoValueSize.WithLabelValues("BatchGetItem").
- Add(float64(len(*reportKey)))
- ts, _ := strconv.ParseInt(*tsValue, 10, 64)
- result = append(result, keyInfo{key: *reportKey, ts: ts})
- }
- return result, nil
- }
- // getReportKeys returns the S3 for reports in the interval [start, end].
- func (c *awsCollector) getReportKeys(ctx context.Context, userid string, start, end time.Time) ([]keyInfo, error) {
- var (
- rowStart = start.UnixNano() / time.Hour.Nanoseconds()
- rowEnd = end.UnixNano() / time.Hour.Nanoseconds()
- err error
- )
- // Queries will only every span 2 rows max.
- var reportKeys []keyInfo
- if rowStart != rowEnd {
- reportKeys1, err := c.reportKeysInRange(ctx, userid, rowStart, start, end)
- if err != nil {
- return nil, err
- }
- reportKeys2, err := c.reportKeysInRange(ctx, userid, rowEnd, start, end)
- if err != nil {
- return nil, err
- }
- reportKeys = append(reportKeys, reportKeys1...)
- reportKeys = append(reportKeys, reportKeys2...)
- } else {
- if reportKeys, err = c.reportKeysInRange(ctx, userid, rowEnd, start, end); err != nil {
- return nil, err
- }
- }
- return reportKeys, nil
- }
- func (c *awsCollector) getReports(ctx context.Context, userid string, reportKeys []string) ([]report.Report, error) {
- missing := reportKeys
- stores := []ReportStore{c.inProcess}
- if c.cfg.MemcacheClient != nil {
- stores = append(stores, c.cfg.MemcacheClient)
- }
- stores = append(stores, c.awsCfg.S3Store)
- var reports []report.Report
- for _, store := range stores {
- if store == nil {
- continue
- }
- found, newMissing, err := store.FetchReports(ctx, missing)
- missing = newMissing
- if err != nil {
- log.Warningf("Error fetching from cache: %v", err)
- }
- for key, report := range found {
- report = c.massageReport(userid, report)
- c.inProcess.StoreReport(key, report)
- reports = append(reports, report)
- }
- if len(missing) == 0 {
- return reports, nil
- }
- }
- if len(missing) > 0 {
- return nil, fmt.Errorf("Error fetching from s3, still have missing reports: %v", missing)
- }
- return reports, nil
- }
- // If we are running as a Query service, fetch data and merge into a report
- // If we are running as a Collector and the request is for live data, merge in-memory data and return
- func (c *awsCollector) Report(ctx context.Context, timestamp time.Time) (report.Report, error) {
- span, ctx := opentracing.StartSpanFromContext(ctx, "awsCollector.Report")
- defer span.Finish()
- userid, err := c.cfg.UserIDer(ctx)
- if err != nil {
- return report.MakeReport(), err
- }
- span.SetTag("userid", userid)
- var reports []report.Report
- if time.Since(timestamp) < c.cfg.Window {
- reports, err = c.reportsFromLive(ctx, userid)
- } else {
- reports, err = c.reportsFromStore(ctx, userid, timestamp)
- }
- if err != nil {
- return report.MakeReport(), err
- }
- span.LogFields(otlog.Int("merging", len(reports)))
- return c.merger.Merge(reports), nil
- }
- /*
- Given a timestamp in the past, fetch reports within the window from store or cache
- S3 stores original reports from one probe at the timestamp they arrived at collector.
- Collector also sends every report to memcached.
- The in-memory cache stores:
- - individual reports deserialised, under S3 key for report
- - sets of reports in interval [t,t+3) merged, under key "instance:t"
- - so to check the cache for reports from 14:31:00 to 14:31:15 you would request 5 keys 3 seconds apart
- */
- func (c *awsCollector) reportsFromStore(ctx context.Context, userid string, timestamp time.Time) ([]report.Report, error) {
- span := opentracing.SpanFromContext(ctx)
- end := timestamp
- start := end.Add(-c.cfg.Window)
- reportKeys, err := c.getReportKeys(ctx, userid, start, end)
- if err != nil {
- return nil, err
- }
- span.LogFields(otlog.Int("keys", len(reportKeys)), otlog.String("timestamp", timestamp.String()))
- var reports []report.Report
- // Fetch a merged report for each time quantum covering the window
- startTS, endTS := start.UnixNano(), end.UnixNano()
- ts := startTS - (startTS % reportQuantisationInterval.Nanoseconds())
- for ; ts+(reportQuantisationInterval+gracePeriod).Nanoseconds() < endTS; ts += reportQuantisationInterval.Nanoseconds() {
- quantumReport, err := c.reportForQuantum(ctx, userid, reportKeys, ts)
- if err != nil {
- return nil, err
- }
- reports = append(reports, quantumReport)
- }
- // Fetch individual reports for the period after the last quantum
- last, err := c.reportsForKeysInRange(ctx, userid, reportKeys, ts, endTS)
- if err != nil {
- return nil, err
- }
- reports = append(reports, last...)
- return reports, nil
- }
- // Fetch a merged report either from cache or from store which we put in cache
- func (c *awsCollector) reportForQuantum(ctx context.Context, userid string, reportKeys []keyInfo, start int64) (report.Report, error) {
- key := fmt.Sprintf("%s:%d", userid, start)
- cached, _, err := c.inProcess.FetchReports(ctx, []string{key})
- if len(cached) == 1 {
- return cached[key], nil
- }
- reports, err := c.reportsForKeysInRange(ctx, userid, reportKeys, start, start+reportQuantisationInterval.Nanoseconds())
- if err != nil {
- return report.MakeReport(), err
- }
- merged := c.merger.Merge(reports)
- c.inProcess.StoreReport(key, merged)
- return merged, nil
- }
- // Find the keys relating to this time period then fetch from memcached and/or S3
- func (c *awsCollector) reportsForKeysInRange(ctx context.Context, userid string, reportKeys []keyInfo, start, end int64) ([]report.Report, error) {
- var keys []string
- for _, k := range reportKeys {
- if k.ts >= start && k.ts < end {
- keys = append(keys, k.key)
- }
- }
- if span := opentracing.SpanFromContext(ctx); span != nil {
- span.LogFields(otlog.Int("fetching", len(keys)), otlog.Int64("start", start), otlog.Int64("end", end))
- }
- log.Debugf("Fetching %d reports from %v to %v", len(keys), start, end)
- return c.getReports(ctx, userid, keys)
- }
- func (c *awsCollector) HasReports(ctx context.Context, timestamp time.Time) (bool, error) {
- userid, err := c.cfg.UserIDer(ctx)
- if err != nil {
- return false, err
- }
- if time.Since(timestamp) < c.cfg.Window {
- has, err := c.hasReportsFromLive(ctx, userid)
- return has, err
- }
- start := timestamp.Add(-c.cfg.Window)
- reportKeys, err := c.getReportKeys(ctx, userid, start, timestamp)
- return len(reportKeys) > 0, err
- }
- func (c *awsCollector) HasHistoricReports() bool {
- return true
- }
- // AdminSummary returns a string with some internal information about
- // the report, which may be useful to troubleshoot.
- func (c *awsCollector) AdminSummary(ctx context.Context, timestamp time.Time) (string, error) {
- span, ctx := opentracing.StartSpanFromContext(ctx, "awsCollector.AdminSummary")
- defer span.Finish()
- userid, err := c.cfg.UserIDer(ctx)
- if err != nil {
- return "", err
- }
- end := timestamp
- start := end.Add(-c.cfg.Window)
- reportKeys, err := c.getReportKeys(ctx, userid, start, end)
- if err != nil {
- return "", err
- }
- reports, err := c.reportsForKeysInRange(ctx, userid, reportKeys, start.UnixNano(), end.UnixNano())
- if err != nil {
- return "", err
- }
- var b strings.Builder
- for i := range reports {
- // TODO: print the key - note reports may be in a different order from reportKeys
- b.WriteString(reports[i].Summary())
- b.WriteByte('\n')
- }
- return b.String(), nil
- }
// calculateDynamoKeys generates the row & column keys for Dynamo.
//
// The row key is "<userid>-<whole hours since the Unix epoch>" and the
// column key is the timestamp in nanoseconds since the Unix epoch.
func calculateDynamoKeys(userid string, now time.Time) (string, string) {
	nanos := now.UnixNano()
	hour := nanos / time.Hour.Nanoseconds()
	return fmt.Sprintf("%s-%d", userid, hour), strconv.FormatInt(nanos, 10)
}
- // calculateReportKeys returns DynamoDB row & col keys, and S3/memcached key that we will use for a report
- func calculateReportKeys(userid string, now time.Time) (string, string, string) {
- rowKey, colKey := calculateDynamoKeys(userid, now)
- rowKeyHash := md5.New()
- _, _ = io.WriteString(rowKeyHash, rowKey) // hash write doesn't error
- return rowKey, colKey, fmt.Sprintf("%x/%s", rowKeyHash.Sum(nil), colKey)
- }
- func (c *awsCollector) persistReport(ctx context.Context, userid, rowKey, colKey, reportKey string, buf []byte) error {
- // Put in S3 and cache before index, so it is fetchable before it is discoverable
- reportSize, err := c.awsCfg.S3Store.StoreReportBytes(ctx, reportKey, buf)
- if err != nil {
- return err
- }
- if c.cfg.MemcacheClient != nil {
- _, err = c.cfg.MemcacheClient.StoreReportBytes(ctx, reportKey, buf)
- if err != nil {
- // NOTE: We don't abort here because failing to store in memcache
- // doesn't actually break anything else -- it's just an
- // optimization.
- log.Warningf("Could not store %v in memcache: %v", reportKey, err)
- }
- }
- dynamoValueSize.WithLabelValues("PutItem").Add(float64(len(reportKey)))
- err = instrument.TimeRequestHistogram(ctx, "DynamoDB.PutItem", dynamoRequestDuration, func(_ context.Context) error {
- resp, err := c.putItemInDynamo(rowKey, colKey, reportKey)
- if resp.ConsumedCapacity != nil {
- dynamoConsumedCapacity.WithLabelValues("PutItem").
- Add(float64(*resp.ConsumedCapacity.CapacityUnits))
- }
- return err
- })
- if err != nil {
- return err
- }
- reportSizeHistogram.Observe(float64(reportSize))
- reportSizePerUser.WithLabelValues(userid).Add(float64(reportSize))
- reportsPerUser.WithLabelValues(userid).Inc()
- return nil
- }
- func (c *awsCollector) putItemInDynamo(rowKey, colKey, reportKey string) (*dynamodb.PutItemOutput, error) {
- // Back off on ProvisionedThroughputExceededException
- const (
- maxRetries = 5
- throuputExceededError = "ProvisionedThroughputExceededException"
- )
- var (
- resp *dynamodb.PutItemOutput
- err error
- retries = 0
- backoff = 50 * time.Millisecond
- )
- for {
- resp, err = c.db.PutItem(&dynamodb.PutItemInput{
- TableName: aws.String(c.awsCfg.DynamoTable),
- Item: map[string]*dynamodb.AttributeValue{
- hourField: {
- S: aws.String(rowKey),
- },
- tsField: {
- N: aws.String(colKey),
- },
- reportField: {
- S: aws.String(reportKey),
- },
- },
- ReturnConsumedCapacity: aws.String(dynamodb.ReturnConsumedCapacityTotal),
- })
- if err != nil && retries < maxRetries {
- if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == throuputExceededError {
- time.Sleep(backoff)
- retries++
- backoff *= 2
- continue
- }
- }
- break
- }
- return resp, err
- }
- type inProcessStore struct {
- cache gcache.Cache
- }
- // newInProcessStore creates an in-process store for reports.
- func newInProcessStore(size int, expiration time.Duration) inProcessStore {
- return inProcessStore{gcache.New(size).LRU().Expiration(expiration).Build()}
- }
- // FetchReports retrieves the given reports from the store.
- func (c inProcessStore) FetchReports(_ context.Context, keys []string) (map[string]report.Report, []string, error) {
- found := map[string]report.Report{}
- missing := []string{}
- for _, key := range keys {
- rpt, err := c.cache.Get(key)
- if err == nil {
- found[key] = rpt.(report.Report)
- } else {
- missing = append(missing, key)
- }
- }
- inProcessCacheHits.Add(float64(len(found)))
- inProcessCacheRequests.Add(float64(len(keys)))
- return found, missing, nil
- }
- // StoreReport stores a report in the store.
- func (c inProcessStore) StoreReport(key string, report report.Report) {
- c.cache.Set(key, report)
- }
|