|
- // Copyright The OpenTelemetry Authors
- // SPDX-License-Identifier: Apache-2.0
- package redisreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/redisreceiver"
- import (
- "context"
- "strconv"
- "strings"
- "time"
- "github.com/go-redis/redis/v7"
- "go.opentelemetry.io/collector/component"
- "go.opentelemetry.io/collector/pdata/pcommon"
- "go.opentelemetry.io/collector/pdata/pmetric"
- "go.opentelemetry.io/collector/receiver"
- "go.opentelemetry.io/collector/receiver/scraperhelper"
- "go.uber.org/zap"
- "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/redisreceiver/internal/metadata"
- )
- // Runs intermittently, fetching info from Redis, creating metrics/datapoints,
- // and feeding them to a metricsConsumer.
- type redisScraper struct {
- client client
- redisSvc *redisSvc
- settings component.TelemetrySettings
- mb *metadata.MetricsBuilder
- uptime time.Duration
- configInfo configInfo
- }
// redisMaxDbs bounds the database indexes ("db0" through "db15") that are
// probed for keyspace lines on each scrape. 16 is the default value of the
// Redis `databases` setting.
const redisMaxDbs = 16
- func newRedisScraper(cfg *Config, settings receiver.CreateSettings) (scraperhelper.Scraper, error) {
- opts := &redis.Options{
- Addr: cfg.Endpoint,
- Username: cfg.Username,
- Password: string(cfg.Password),
- Network: cfg.Transport,
- }
- var err error
- if opts.TLSConfig, err = cfg.TLS.LoadTLSConfig(); err != nil {
- return nil, err
- }
- return newRedisScraperWithClient(newRedisClient(opts), settings, cfg)
- }
- func newRedisScraperWithClient(client client, settings receiver.CreateSettings, cfg *Config) (scraperhelper.Scraper, error) {
- configInfo, err := newConfigInfo(cfg)
- if err != nil {
- return nil, err
- }
- rs := &redisScraper{
- client: client,
- redisSvc: newRedisSvc(client),
- settings: settings.TelemetrySettings,
- mb: metadata.NewMetricsBuilder(cfg.MetricsBuilderConfig, settings),
- configInfo: configInfo,
- }
- return scraperhelper.NewScraper(
- metadata.Type,
- rs.Scrape,
- scraperhelper.WithShutdown(rs.shutdown),
- )
- }
- func (rs *redisScraper) shutdown(context.Context) error {
- if rs.client != nil {
- return rs.client.close()
- }
- return nil
- }
- // Scrape is called periodically, querying Redis and building Metrics to send to
- // the next consumer. First builds 'fixed' metrics (non-keyspace metrics)
- // defined at startup time. Then builds 'keyspace' metrics if there are any
- // keyspace lines returned by Redis. There should be one keyspace line per
- // active Redis database, of which there can be 16.
- func (rs *redisScraper) Scrape(context.Context) (pmetric.Metrics, error) {
- inf, err := rs.redisSvc.info()
- if err != nil {
- return pmetric.Metrics{}, err
- }
- now := pcommon.NewTimestampFromTime(time.Now())
- currentUptime, err := inf.getUptimeInSeconds()
- if err != nil {
- return pmetric.Metrics{}, err
- }
- if rs.uptime == time.Duration(0) || rs.uptime > currentUptime {
- rs.mb.Reset(metadata.WithStartTime(pcommon.NewTimestampFromTime(now.AsTime().Add(-currentUptime))))
- }
- rs.uptime = currentUptime
- rs.recordCommonMetrics(now, inf)
- rs.recordKeyspaceMetrics(now, inf)
- rs.recordRoleMetrics(now, inf)
- rs.recordCmdMetrics(now, inf)
- rb := rs.mb.NewResourceBuilder()
- rb.SetRedisVersion(rs.getRedisVersion(inf))
- rb.SetServerAddress(rs.configInfo.Address)
- rb.SetServerPort(rs.configInfo.Port)
- return rs.mb.Emit(metadata.WithResource(rb.Emit())), nil
- }
- // recordCommonMetrics records metrics from Redis info key-value pairs.
- func (rs *redisScraper) recordCommonMetrics(ts pcommon.Timestamp, inf info) {
- recorders := rs.dataPointRecorders()
- for infoKey, infoVal := range inf {
- recorder, ok := recorders[infoKey]
- if !ok {
- // Skip unregistered metric.
- continue
- }
- switch recordDataPoint := recorder.(type) {
- case func(pcommon.Timestamp, int64):
- val, err := strconv.ParseInt(infoVal, 10, 64)
- if err != nil {
- rs.settings.Logger.Warn("failed to parse info int val", zap.String("key", infoKey),
- zap.String("val", infoVal), zap.Error(err))
- }
- recordDataPoint(ts, val)
- case func(pcommon.Timestamp, float64):
- val, err := strconv.ParseFloat(infoVal, 64)
- if err != nil {
- rs.settings.Logger.Warn("failed to parse info float val", zap.String("key", infoKey),
- zap.String("val", infoVal), zap.Error(err))
- }
- recordDataPoint(ts, val)
- }
- }
- }
- // recordKeyspaceMetrics records metrics from 'keyspace' Redis info key-value pairs,
- // e.g. "db0: keys=1,expires=2,avg_ttl=3".
- func (rs *redisScraper) recordKeyspaceMetrics(ts pcommon.Timestamp, inf info) {
- for db := 0; db < redisMaxDbs; db++ {
- key := "db" + strconv.Itoa(db)
- str, ok := inf[key]
- if !ok {
- break
- }
- keyspace, parsingError := parseKeyspaceString(db, str)
- if parsingError != nil {
- rs.settings.Logger.Warn("failed to parse keyspace string", zap.String("key", key),
- zap.String("val", str), zap.Error(parsingError))
- continue
- }
- rs.mb.RecordRedisDbKeysDataPoint(ts, int64(keyspace.keys), keyspace.db)
- rs.mb.RecordRedisDbExpiresDataPoint(ts, int64(keyspace.expires), keyspace.db)
- rs.mb.RecordRedisDbAvgTTLDataPoint(ts, int64(keyspace.avgTTL), keyspace.db)
- }
- }
- // getRedisVersion retrieves version string from 'redis_version' Redis info key-value pairs
- // e.g. "redis_version:5.0.7"
- func (rs *redisScraper) getRedisVersion(inf info) string {
- if str, ok := inf["redis_version"]; ok {
- return str
- }
- return "unknown"
- }
- // recordRoleMetrics records metrics from 'role' Redis info key-value pairs
- // e.g. "role:master"
- func (rs *redisScraper) recordRoleMetrics(ts pcommon.Timestamp, inf info) {
- if str, ok := inf["role"]; ok {
- if str == "master" {
- rs.mb.RecordRedisRoleDataPoint(ts, 1, metadata.AttributeRolePrimary)
- } else {
- rs.mb.RecordRedisRoleDataPoint(ts, 1, metadata.AttributeRoleReplica)
- }
- }
- }
- // recordCmdMetrics records per-command metrics from Redis info.
- // These include command stats and command latency percentiles.
- // Examples:
- //
- // "cmdstat_mget:calls=1685,usec=6032,usec_per_call=3.58,rejected_calls=0,failed_calls=0"
- // "latency_percentiles_usec_lastsave:p50=1.003,p99=1.003,p99.9=1.003"
- func (rs *redisScraper) recordCmdMetrics(ts pcommon.Timestamp, inf info) {
- const cmdstatPrefix = "cmdstat_"
- const latencyPrefix = "latency_percentiles_usec_"
- for key, val := range inf {
- if strings.HasPrefix(key, cmdstatPrefix) {
- rs.recordCmdStatsMetrics(ts, key[len(cmdstatPrefix):], val)
- } else if strings.HasPrefix(key, latencyPrefix) {
- rs.recordCmdLatencyMetrics(ts, key[len(latencyPrefix):], val)
- }
- }
- }
- // recordCmdStatsMetrics records metrics for a particlar Redis command.
- // Only 'calls' and 'usec' are recorded at the moment.
- // 'cmd' is the Redis command, 'val' is the values string (e.g. "calls=1685,usec=6032,usec_per_call=3.58,rejected_calls=0,failed_calls=0").
- func (rs *redisScraper) recordCmdStatsMetrics(ts pcommon.Timestamp, cmd, val string) {
- parts := strings.Split(strings.TrimSpace(val), ",")
- for _, element := range parts {
- subParts := strings.Split(element, "=")
- if len(subParts) == 1 {
- continue
- }
- parsed, err := strconv.ParseInt(subParts[1], 10, 64)
- if err != nil { // skip bad items
- continue
- }
- if subParts[0] == "calls" {
- rs.mb.RecordRedisCmdCallsDataPoint(ts, parsed, cmd)
- } else if subParts[0] == "usec" {
- rs.mb.RecordRedisCmdUsecDataPoint(ts, parsed, cmd)
- }
- }
- }
- // recordCmdLatencyMetrics record latency metrics of a particular Redis command.
- // 'cmd' is the Redis command, 'val' is the values string (e.g. "p50=1.003,p99=1.003,p99.9=1.003).
- // Latency values in the values string are expressed in microseconds.
- func (rs *redisScraper) recordCmdLatencyMetrics(ts pcommon.Timestamp, cmd, val string) {
- latencies, err := parseLatencyStats(val)
- if err != nil {
- return
- }
- for percentile, usecs := range latencies {
- if percentileAttr, ok := metadata.MapAttributePercentile[percentile]; ok {
- latency := usecs / 1e6 // metric is in seconds
- rs.mb.RecordRedisCmdLatencyDataPoint(ts, latency, cmd, percentileAttr)
- }
- }
- }
|