redis_scraper.go 7.8 KB


  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package redisreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/redisreceiver"
  4. import (
  5. "context"
  6. "strconv"
  7. "strings"
  8. "time"
  9. "github.com/go-redis/redis/v7"
  10. "go.opentelemetry.io/collector/component"
  11. "go.opentelemetry.io/collector/pdata/pcommon"
  12. "go.opentelemetry.io/collector/pdata/pmetric"
  13. "go.opentelemetry.io/collector/receiver"
  14. "go.opentelemetry.io/collector/receiver/scraperhelper"
  15. "go.uber.org/zap"
  16. "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/redisreceiver/internal/metadata"
  17. )
  18. // Runs intermittently, fetching info from Redis, creating metrics/datapoints,
  19. // and feeding them to a metricsConsumer.
  20. type redisScraper struct {
  21. client client
  22. redisSvc *redisSvc
  23. settings component.TelemetrySettings
  24. mb *metadata.MetricsBuilder
  25. uptime time.Duration
  26. configInfo configInfo
  27. }
  28. const redisMaxDbs = 16 // Maximum possible number of redis databases
  29. func newRedisScraper(cfg *Config, settings receiver.CreateSettings) (scraperhelper.Scraper, error) {
  30. opts := &redis.Options{
  31. Addr: cfg.Endpoint,
  32. Username: cfg.Username,
  33. Password: string(cfg.Password),
  34. Network: cfg.Transport,
  35. }
  36. var err error
  37. if opts.TLSConfig, err = cfg.TLS.LoadTLSConfig(); err != nil {
  38. return nil, err
  39. }
  40. return newRedisScraperWithClient(newRedisClient(opts), settings, cfg)
  41. }
  42. func newRedisScraperWithClient(client client, settings receiver.CreateSettings, cfg *Config) (scraperhelper.Scraper, error) {
  43. configInfo, err := newConfigInfo(cfg)
  44. if err != nil {
  45. return nil, err
  46. }
  47. rs := &redisScraper{
  48. client: client,
  49. redisSvc: newRedisSvc(client),
  50. settings: settings.TelemetrySettings,
  51. mb: metadata.NewMetricsBuilder(cfg.MetricsBuilderConfig, settings),
  52. configInfo: configInfo,
  53. }
  54. return scraperhelper.NewScraper(
  55. metadata.Type,
  56. rs.Scrape,
  57. scraperhelper.WithShutdown(rs.shutdown),
  58. )
  59. }
  60. func (rs *redisScraper) shutdown(context.Context) error {
  61. if rs.client != nil {
  62. return rs.client.close()
  63. }
  64. return nil
  65. }
  66. // Scrape is called periodically, querying Redis and building Metrics to send to
  67. // the next consumer. First builds 'fixed' metrics (non-keyspace metrics)
  68. // defined at startup time. Then builds 'keyspace' metrics if there are any
  69. // keyspace lines returned by Redis. There should be one keyspace line per
  70. // active Redis database, of which there can be 16.
  71. func (rs *redisScraper) Scrape(context.Context) (pmetric.Metrics, error) {
  72. inf, err := rs.redisSvc.info()
  73. if err != nil {
  74. return pmetric.Metrics{}, err
  75. }
  76. now := pcommon.NewTimestampFromTime(time.Now())
  77. currentUptime, err := inf.getUptimeInSeconds()
  78. if err != nil {
  79. return pmetric.Metrics{}, err
  80. }
  81. if rs.uptime == time.Duration(0) || rs.uptime > currentUptime {
  82. rs.mb.Reset(metadata.WithStartTime(pcommon.NewTimestampFromTime(now.AsTime().Add(-currentUptime))))
  83. }
  84. rs.uptime = currentUptime
  85. rs.recordCommonMetrics(now, inf)
  86. rs.recordKeyspaceMetrics(now, inf)
  87. rs.recordRoleMetrics(now, inf)
  88. rs.recordCmdMetrics(now, inf)
  89. rb := rs.mb.NewResourceBuilder()
  90. rb.SetRedisVersion(rs.getRedisVersion(inf))
  91. rb.SetServerAddress(rs.configInfo.Address)
  92. rb.SetServerPort(rs.configInfo.Port)
  93. return rs.mb.Emit(metadata.WithResource(rb.Emit())), nil
  94. }
  95. // recordCommonMetrics records metrics from Redis info key-value pairs.
  96. func (rs *redisScraper) recordCommonMetrics(ts pcommon.Timestamp, inf info) {
  97. recorders := rs.dataPointRecorders()
  98. for infoKey, infoVal := range inf {
  99. recorder, ok := recorders[infoKey]
  100. if !ok {
  101. // Skip unregistered metric.
  102. continue
  103. }
  104. switch recordDataPoint := recorder.(type) {
  105. case func(pcommon.Timestamp, int64):
  106. val, err := strconv.ParseInt(infoVal, 10, 64)
  107. if err != nil {
  108. rs.settings.Logger.Warn("failed to parse info int val", zap.String("key", infoKey),
  109. zap.String("val", infoVal), zap.Error(err))
  110. }
  111. recordDataPoint(ts, val)
  112. case func(pcommon.Timestamp, float64):
  113. val, err := strconv.ParseFloat(infoVal, 64)
  114. if err != nil {
  115. rs.settings.Logger.Warn("failed to parse info float val", zap.String("key", infoKey),
  116. zap.String("val", infoVal), zap.Error(err))
  117. }
  118. recordDataPoint(ts, val)
  119. }
  120. }
  121. }
  122. // recordKeyspaceMetrics records metrics from 'keyspace' Redis info key-value pairs,
  123. // e.g. "db0: keys=1,expires=2,avg_ttl=3".
  124. func (rs *redisScraper) recordKeyspaceMetrics(ts pcommon.Timestamp, inf info) {
  125. for db := 0; db < redisMaxDbs; db++ {
  126. key := "db" + strconv.Itoa(db)
  127. str, ok := inf[key]
  128. if !ok {
  129. break
  130. }
  131. keyspace, parsingError := parseKeyspaceString(db, str)
  132. if parsingError != nil {
  133. rs.settings.Logger.Warn("failed to parse keyspace string", zap.String("key", key),
  134. zap.String("val", str), zap.Error(parsingError))
  135. continue
  136. }
  137. rs.mb.RecordRedisDbKeysDataPoint(ts, int64(keyspace.keys), keyspace.db)
  138. rs.mb.RecordRedisDbExpiresDataPoint(ts, int64(keyspace.expires), keyspace.db)
  139. rs.mb.RecordRedisDbAvgTTLDataPoint(ts, int64(keyspace.avgTTL), keyspace.db)
  140. }
  141. }
  142. // getRedisVersion retrieves version string from 'redis_version' Redis info key-value pairs
  143. // e.g. "redis_version:5.0.7"
  144. func (rs *redisScraper) getRedisVersion(inf info) string {
  145. if str, ok := inf["redis_version"]; ok {
  146. return str
  147. }
  148. return "unknown"
  149. }
  150. // recordRoleMetrics records metrics from 'role' Redis info key-value pairs
  151. // e.g. "role:master"
  152. func (rs *redisScraper) recordRoleMetrics(ts pcommon.Timestamp, inf info) {
  153. if str, ok := inf["role"]; ok {
  154. if str == "master" {
  155. rs.mb.RecordRedisRoleDataPoint(ts, 1, metadata.AttributeRolePrimary)
  156. } else {
  157. rs.mb.RecordRedisRoleDataPoint(ts, 1, metadata.AttributeRoleReplica)
  158. }
  159. }
  160. }
  161. // recordCmdMetrics records per-command metrics from Redis info.
  162. // These include command stats and command latency percentiles.
  163. // Examples:
  164. //
  165. // "cmdstat_mget:calls=1685,usec=6032,usec_per_call=3.58,rejected_calls=0,failed_calls=0"
  166. // "latency_percentiles_usec_lastsave:p50=1.003,p99=1.003,p99.9=1.003"
  167. func (rs *redisScraper) recordCmdMetrics(ts pcommon.Timestamp, inf info) {
  168. const cmdstatPrefix = "cmdstat_"
  169. const latencyPrefix = "latency_percentiles_usec_"
  170. for key, val := range inf {
  171. if strings.HasPrefix(key, cmdstatPrefix) {
  172. rs.recordCmdStatsMetrics(ts, key[len(cmdstatPrefix):], val)
  173. } else if strings.HasPrefix(key, latencyPrefix) {
  174. rs.recordCmdLatencyMetrics(ts, key[len(latencyPrefix):], val)
  175. }
  176. }
  177. }
  178. // recordCmdStatsMetrics records metrics for a particlar Redis command.
  179. // Only 'calls' and 'usec' are recorded at the moment.
  180. // 'cmd' is the Redis command, 'val' is the values string (e.g. "calls=1685,usec=6032,usec_per_call=3.58,rejected_calls=0,failed_calls=0").
  181. func (rs *redisScraper) recordCmdStatsMetrics(ts pcommon.Timestamp, cmd, val string) {
  182. parts := strings.Split(strings.TrimSpace(val), ",")
  183. for _, element := range parts {
  184. subParts := strings.Split(element, "=")
  185. if len(subParts) == 1 {
  186. continue
  187. }
  188. parsed, err := strconv.ParseInt(subParts[1], 10, 64)
  189. if err != nil { // skip bad items
  190. continue
  191. }
  192. if subParts[0] == "calls" {
  193. rs.mb.RecordRedisCmdCallsDataPoint(ts, parsed, cmd)
  194. } else if subParts[0] == "usec" {
  195. rs.mb.RecordRedisCmdUsecDataPoint(ts, parsed, cmd)
  196. }
  197. }
  198. }
  199. // recordCmdLatencyMetrics record latency metrics of a particular Redis command.
  200. // 'cmd' is the Redis command, 'val' is the values string (e.g. "p50=1.003,p99=1.003,p99.9=1.003).
  201. // Latency values in the values string are expressed in microseconds.
  202. func (rs *redisScraper) recordCmdLatencyMetrics(ts pcommon.Timestamp, cmd, val string) {
  203. latencies, err := parseLatencyStats(val)
  204. if err != nil {
  205. return
  206. }
  207. for percentile, usecs := range latencies {
  208. if percentileAttr, ok := metadata.MapAttributePercentile[percentile]; ok {
  209. latency := usecs / 1e6 // metric is in seconds
  210. rs.mb.RecordRedisCmdLatencyDataPoint(ts, latency, cmd, percentileAttr)
  211. }
  212. }
  213. }