memcache_client.go 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224
  1. package multitenant
  2. import (
  3. "bytes"
  4. "fmt"
  5. "net"
  6. "sort"
  7. "sync"
  8. "time"
  9. "context"
  10. "github.com/bradfitz/gomemcache/memcache"
  11. opentracing "github.com/opentracing/opentracing-go"
  12. otlog "github.com/opentracing/opentracing-go/log"
  13. "github.com/prometheus/client_golang/prometheus"
  14. log "github.com/sirupsen/logrus"
  15. "github.com/weaveworks/common/instrument"
  16. "github.com/weaveworks/scope/report"
  17. )
  18. var (
  19. memcacheRequests = prometheus.NewCounter(prometheus.CounterOpts{
  20. Namespace: "scope",
  21. Name: "memcache_requests_total",
  22. Help: "Total count of reports requested from memcache that were not found in our in-memory cache.",
  23. })
  24. memcacheHits = prometheus.NewCounter(prometheus.CounterOpts{
  25. Namespace: "scope",
  26. Name: "memcache_hits_total",
  27. Help: "Total count of reports found in memcache that were not found in our in-memory cache.",
  28. })
  29. memcacheRequestDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
  30. Namespace: "scope",
  31. Name: "memcache_request_duration_seconds",
  32. Help: "Total time spent in seconds doing memcache requests.",
  33. Buckets: prometheus.DefBuckets,
  34. }, []string{"method", "status_code"})
  35. )
  36. func registerMemcacheClientMetrics() {
  37. prometheus.MustRegister(memcacheRequests)
  38. prometheus.MustRegister(memcacheHits)
  39. prometheus.MustRegister(memcacheRequestDuration)
  40. }
  41. var registerMemcacheClientMetricsOnce sync.Once
  42. // MemcacheClient is a memcache client that gets its server list from SRV
  43. // records, and periodically updates that ServerList.
  44. type MemcacheClient struct {
  45. client *memcache.Client
  46. serverList *memcache.ServerList
  47. expiration int32
  48. hostname string
  49. service string
  50. compressionLevel int
  51. quit chan struct{}
  52. wait sync.WaitGroup
  53. }
  54. // MemcacheConfig defines how a MemcacheClient should be constructed.
  55. type MemcacheConfig struct {
  56. Host string
  57. Service string
  58. Timeout time.Duration
  59. UpdateInterval time.Duration
  60. Expiration time.Duration
  61. CompressionLevel int
  62. }
  63. // NewMemcacheClient creates a new MemcacheClient that gets its server list
  64. // from SRV and updates the server list on a regular basis.
  65. func NewMemcacheClient(config MemcacheConfig) *MemcacheClient {
  66. registerMemcacheClientMetricsOnce.Do(registerMemcacheClientMetrics)
  67. var servers memcache.ServerList
  68. client := memcache.NewFromSelector(&servers)
  69. client.Timeout = config.Timeout
  70. newClient := &MemcacheClient{
  71. client: client,
  72. serverList: &servers,
  73. expiration: int32(config.Expiration.Seconds()),
  74. hostname: config.Host,
  75. service: config.Service,
  76. compressionLevel: config.CompressionLevel,
  77. quit: make(chan struct{}),
  78. }
  79. err := newClient.updateMemcacheServers()
  80. if err != nil {
  81. log.Errorf("Error setting memcache servers to '%v': %v", config.Host, err)
  82. }
  83. newClient.wait.Add(1)
  84. go newClient.updateLoop(config.UpdateInterval)
  85. return newClient
  86. }
  87. // Stop the memcache client.
  88. func (c *MemcacheClient) Stop() {
  89. close(c.quit)
  90. c.wait.Wait()
  91. }
  92. func (c *MemcacheClient) updateLoop(updateInterval time.Duration) error {
  93. defer c.wait.Done()
  94. ticker := time.NewTicker(updateInterval)
  95. var err error
  96. for {
  97. select {
  98. case <-ticker.C:
  99. err = c.updateMemcacheServers()
  100. if err != nil {
  101. log.Warningf("Error updating memcache servers: %v", err)
  102. }
  103. case <-c.quit:
  104. ticker.Stop()
  105. }
  106. }
  107. }
  108. // updateMemcacheServers sets a memcache server list from SRV records. SRV
  109. // priority & weight are ignored.
  110. func (c *MemcacheClient) updateMemcacheServers() error {
  111. _, addrs, err := net.LookupSRV(c.service, "tcp", c.hostname)
  112. if err != nil {
  113. return err
  114. }
  115. var servers []string
  116. for _, srv := range addrs {
  117. servers = append(servers, fmt.Sprintf("%s:%d", srv.Target, srv.Port))
  118. }
  119. // ServerList deterministically maps keys to _index_ of the server list.
  120. // Since DNS returns records in different order each time, we sort to
  121. // guarantee best possible match between nodes.
  122. sort.Strings(servers)
  123. return c.serverList.SetServers(servers...)
  124. }
  125. func memcacheStatusCode(err error) string {
  126. // See https://godoc.org/github.com/bradfitz/gomemcache/memcache#pkg-variables
  127. switch err {
  128. case nil:
  129. return "200"
  130. case memcache.ErrCacheMiss:
  131. return "404"
  132. case memcache.ErrMalformedKey:
  133. return "400"
  134. default:
  135. return "500"
  136. }
  137. }
  138. // FetchReports gets reports from memcache.
  139. func (c *MemcacheClient) FetchReports(ctx context.Context, keys []string) (map[string]report.Report, []string, error) {
  140. span, ctx := opentracing.StartSpanFromContext(ctx, "Memcache.FetchReports")
  141. defer span.Finish()
  142. defer memcacheRequests.Add(float64(len(keys)))
  143. var found map[string]*memcache.Item
  144. err := instrument.TimeRequestHistogramStatus(ctx, "Memcache.GetMulti", memcacheRequestDuration, memcacheStatusCode, func(_ context.Context) error {
  145. var err error
  146. found, err = c.client.GetMulti(keys)
  147. return err
  148. })
  149. span.LogFields(otlog.Int("keys", len(keys)), otlog.Int("hits", len(found)))
  150. if err != nil {
  151. return nil, keys, err
  152. }
  153. // Decode all the reports in parallel.
  154. type result struct {
  155. key string
  156. report *report.Report
  157. }
  158. ch := make(chan result, len(keys))
  159. var missing []string
  160. for _, key := range keys {
  161. item, ok := found[key]
  162. if !ok {
  163. missing = append(missing, key)
  164. continue
  165. }
  166. go func(key string) {
  167. rep, err := report.MakeFromBinary(ctx, bytes.NewBuffer(item.Value), true, true)
  168. if err != nil {
  169. log.Warningf("Corrupt report in memcache %v: %v", key, err)
  170. ch <- result{key: key}
  171. return
  172. }
  173. ch <- result{key: key, report: rep}
  174. }(key)
  175. }
  176. reports := map[string]report.Report{}
  177. lenFound := len(keys) - len(missing)
  178. for i := 0; i < lenFound; i++ {
  179. r := <-ch
  180. if r.report == nil {
  181. missing = append(missing, r.key)
  182. } else {
  183. reports[r.key] = *r.report
  184. }
  185. }
  186. if len(missing) > 0 {
  187. sort.Strings(missing)
  188. log.Warningf("Missing %d reports from memcache: %v", len(missing), missing)
  189. }
  190. memcacheHits.Add(float64(len(reports)))
  191. return reports, missing, nil
  192. }
  193. // StoreReportBytes stores a report.
  194. func (c *MemcacheClient) StoreReportBytes(ctx context.Context, key string, rpt []byte) (int, error) {
  195. err := instrument.TimeRequestHistogramStatus(ctx, "Memcache.Put", memcacheRequestDuration, memcacheStatusCode, func(_ context.Context) error {
  196. item := memcache.Item{Key: key, Value: rpt, Expiration: c.expiration}
  197. return c.client.Set(&item)
  198. })
  199. return len(rpt), err
  200. }