123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224 |
- package multitenant
- import (
- "bytes"
- "fmt"
- "net"
- "sort"
- "sync"
- "time"
- "context"
- "github.com/bradfitz/gomemcache/memcache"
- opentracing "github.com/opentracing/opentracing-go"
- otlog "github.com/opentracing/opentracing-go/log"
- "github.com/prometheus/client_golang/prometheus"
- log "github.com/sirupsen/logrus"
- "github.com/weaveworks/common/instrument"
- "github.com/weaveworks/scope/report"
- )
- var (
- memcacheRequests = prometheus.NewCounter(prometheus.CounterOpts{
- Namespace: "scope",
- Name: "memcache_requests_total",
- Help: "Total count of reports requested from memcache that were not found in our in-memory cache.",
- })
- memcacheHits = prometheus.NewCounter(prometheus.CounterOpts{
- Namespace: "scope",
- Name: "memcache_hits_total",
- Help: "Total count of reports found in memcache that were not found in our in-memory cache.",
- })
- memcacheRequestDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
- Namespace: "scope",
- Name: "memcache_request_duration_seconds",
- Help: "Total time spent in seconds doing memcache requests.",
- Buckets: prometheus.DefBuckets,
- }, []string{"method", "status_code"})
- )
- func registerMemcacheClientMetrics() {
- prometheus.MustRegister(memcacheRequests)
- prometheus.MustRegister(memcacheHits)
- prometheus.MustRegister(memcacheRequestDuration)
- }
- var registerMemcacheClientMetricsOnce sync.Once
- // MemcacheClient is a memcache client that gets its server list from SRV
- // records, and periodically updates that ServerList.
- type MemcacheClient struct {
- client *memcache.Client
- serverList *memcache.ServerList
- expiration int32
- hostname string
- service string
- compressionLevel int
- quit chan struct{}
- wait sync.WaitGroup
- }
- // MemcacheConfig defines how a MemcacheClient should be constructed.
- type MemcacheConfig struct {
- Host string
- Service string
- Timeout time.Duration
- UpdateInterval time.Duration
- Expiration time.Duration
- CompressionLevel int
- }
- // NewMemcacheClient creates a new MemcacheClient that gets its server list
- // from SRV and updates the server list on a regular basis.
- func NewMemcacheClient(config MemcacheConfig) *MemcacheClient {
- registerMemcacheClientMetricsOnce.Do(registerMemcacheClientMetrics)
- var servers memcache.ServerList
- client := memcache.NewFromSelector(&servers)
- client.Timeout = config.Timeout
- newClient := &MemcacheClient{
- client: client,
- serverList: &servers,
- expiration: int32(config.Expiration.Seconds()),
- hostname: config.Host,
- service: config.Service,
- compressionLevel: config.CompressionLevel,
- quit: make(chan struct{}),
- }
- err := newClient.updateMemcacheServers()
- if err != nil {
- log.Errorf("Error setting memcache servers to '%v': %v", config.Host, err)
- }
- newClient.wait.Add(1)
- go newClient.updateLoop(config.UpdateInterval)
- return newClient
- }
- // Stop the memcache client.
- func (c *MemcacheClient) Stop() {
- close(c.quit)
- c.wait.Wait()
- }
- func (c *MemcacheClient) updateLoop(updateInterval time.Duration) error {
- defer c.wait.Done()
- ticker := time.NewTicker(updateInterval)
- var err error
- for {
- select {
- case <-ticker.C:
- err = c.updateMemcacheServers()
- if err != nil {
- log.Warningf("Error updating memcache servers: %v", err)
- }
- case <-c.quit:
- ticker.Stop()
- }
- }
- }
- // updateMemcacheServers sets a memcache server list from SRV records. SRV
- // priority & weight are ignored.
- func (c *MemcacheClient) updateMemcacheServers() error {
- _, addrs, err := net.LookupSRV(c.service, "tcp", c.hostname)
- if err != nil {
- return err
- }
- var servers []string
- for _, srv := range addrs {
- servers = append(servers, fmt.Sprintf("%s:%d", srv.Target, srv.Port))
- }
- // ServerList deterministically maps keys to _index_ of the server list.
- // Since DNS returns records in different order each time, we sort to
- // guarantee best possible match between nodes.
- sort.Strings(servers)
- return c.serverList.SetServers(servers...)
- }
- func memcacheStatusCode(err error) string {
- // See https://godoc.org/github.com/bradfitz/gomemcache/memcache#pkg-variables
- switch err {
- case nil:
- return "200"
- case memcache.ErrCacheMiss:
- return "404"
- case memcache.ErrMalformedKey:
- return "400"
- default:
- return "500"
- }
- }
- // FetchReports gets reports from memcache.
- func (c *MemcacheClient) FetchReports(ctx context.Context, keys []string) (map[string]report.Report, []string, error) {
- span, ctx := opentracing.StartSpanFromContext(ctx, "Memcache.FetchReports")
- defer span.Finish()
- defer memcacheRequests.Add(float64(len(keys)))
- var found map[string]*memcache.Item
- err := instrument.TimeRequestHistogramStatus(ctx, "Memcache.GetMulti", memcacheRequestDuration, memcacheStatusCode, func(_ context.Context) error {
- var err error
- found, err = c.client.GetMulti(keys)
- return err
- })
- span.LogFields(otlog.Int("keys", len(keys)), otlog.Int("hits", len(found)))
- if err != nil {
- return nil, keys, err
- }
- // Decode all the reports in parallel.
- type result struct {
- key string
- report *report.Report
- }
- ch := make(chan result, len(keys))
- var missing []string
- for _, key := range keys {
- item, ok := found[key]
- if !ok {
- missing = append(missing, key)
- continue
- }
- go func(key string) {
- rep, err := report.MakeFromBinary(ctx, bytes.NewBuffer(item.Value), true, true)
- if err != nil {
- log.Warningf("Corrupt report in memcache %v: %v", key, err)
- ch <- result{key: key}
- return
- }
- ch <- result{key: key, report: rep}
- }(key)
- }
- reports := map[string]report.Report{}
- lenFound := len(keys) - len(missing)
- for i := 0; i < lenFound; i++ {
- r := <-ch
- if r.report == nil {
- missing = append(missing, r.key)
- } else {
- reports[r.key] = *r.report
- }
- }
- if len(missing) > 0 {
- sort.Strings(missing)
- log.Warningf("Missing %d reports from memcache: %v", len(missing), missing)
- }
- memcacheHits.Add(float64(len(reports)))
- return reports, missing, nil
- }
- // StoreReportBytes stores a report.
- func (c *MemcacheClient) StoreReportBytes(ctx context.Context, key string, rpt []byte) (int, error) {
- err := instrument.TimeRequestHistogramStatus(ctx, "Memcache.Put", memcacheRequestDuration, memcacheStatusCode, func(_ context.Context) error {
- item := memcache.Item{Key: key, Value: rpt, Expiration: c.expiration}
- return c.client.Set(&item)
- })
- return len(rpt), err
- }
|