metrics.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package sqlqueryreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/sqlqueryreceiver"
  4. import (
  5. "fmt"
  6. "strconv"
  7. "go.opentelemetry.io/collector/pdata/pcommon"
  8. "go.opentelemetry.io/collector/pdata/pmetric"
  9. "go.opentelemetry.io/collector/receiver/scraperhelper"
  10. )
  11. func rowToMetric(row stringMap, cfg MetricCfg, dest pmetric.Metric, startTime pcommon.Timestamp, ts pcommon.Timestamp, scrapeCfg scraperhelper.ScraperControllerSettings) error {
  12. dest.SetName(cfg.MetricName)
  13. dest.SetDescription(cfg.Description)
  14. dest.SetUnit(cfg.Unit)
  15. dataPointSlice := setMetricFields(cfg, dest)
  16. dataPoint := dataPointSlice.AppendEmpty()
  17. if cfg.StartTsColumn != "" {
  18. if val, found := row[cfg.StartTsColumn]; found {
  19. timestamp, err := strconv.ParseInt(val, 10, 64)
  20. if err != nil {
  21. return fmt.Errorf("failed to parse uint64 for %q, value was %q: %w", cfg.StartTsColumn, val, err)
  22. }
  23. startTime = pcommon.Timestamp(timestamp)
  24. } else {
  25. return fmt.Errorf("rowToMetric: start_ts_column not found")
  26. }
  27. }
  28. if cfg.TsColumn != "" {
  29. if val, found := row[cfg.TsColumn]; found {
  30. timestamp, err := strconv.ParseInt(val, 10, 64)
  31. if err != nil {
  32. return fmt.Errorf("failed to parse uint64 for %q, value was %q: %w", cfg.TsColumn, val, err)
  33. }
  34. ts = pcommon.Timestamp(timestamp)
  35. } else {
  36. return fmt.Errorf("rowToMetric: ts_column not found")
  37. }
  38. }
  39. setTimestamp(cfg, dataPoint, startTime, ts, scrapeCfg)
  40. value, found := row[cfg.ValueColumn]
  41. if !found {
  42. return fmt.Errorf("rowToMetric: value_column '%s' not found in result set", cfg.ValueColumn)
  43. }
  44. err := setDataPointValue(cfg, value, dataPoint)
  45. if err != nil {
  46. return fmt.Errorf("rowToMetric: %w", err)
  47. }
  48. attrs := dataPoint.Attributes()
  49. for k, v := range cfg.StaticAttributes {
  50. attrs.PutStr(k, v)
  51. }
  52. for _, columnName := range cfg.AttributeColumns {
  53. if attrVal, found := row[columnName]; found {
  54. attrs.PutStr(columnName, attrVal)
  55. } else {
  56. return fmt.Errorf("rowToMetric: attribute_column not found: '%s'", columnName)
  57. }
  58. }
  59. return nil
  60. }
  61. func setTimestamp(cfg MetricCfg, dp pmetric.NumberDataPoint, startTime pcommon.Timestamp, ts pcommon.Timestamp, scrapeCfg scraperhelper.ScraperControllerSettings) {
  62. dp.SetTimestamp(ts)
  63. // Cumulative sum should have a start time set to the beginning of the data points cumulation
  64. if cfg.Aggregation == MetricAggregationCumulative && cfg.DataType != MetricTypeGauge {
  65. dp.SetStartTimestamp(startTime)
  66. }
  67. // Non-cumulative sum should have a start time set to the previous endpoint
  68. if cfg.Aggregation == MetricAggregationDelta && cfg.DataType != MetricTypeGauge {
  69. dp.SetStartTimestamp(pcommon.NewTimestampFromTime(ts.AsTime().Add(-scrapeCfg.CollectionInterval)))
  70. }
  71. }
  72. func setMetricFields(cfg MetricCfg, dest pmetric.Metric) pmetric.NumberDataPointSlice {
  73. var out pmetric.NumberDataPointSlice
  74. switch cfg.DataType {
  75. case MetricTypeUnspecified, MetricTypeGauge:
  76. out = dest.SetEmptyGauge().DataPoints()
  77. case MetricTypeSum:
  78. sum := dest.SetEmptySum()
  79. sum.SetIsMonotonic(cfg.Monotonic)
  80. sum.SetAggregationTemporality(cfgToAggregationTemporality(cfg.Aggregation))
  81. out = sum.DataPoints()
  82. }
  83. return out
  84. }
  85. func cfgToAggregationTemporality(agg MetricAggregation) pmetric.AggregationTemporality {
  86. var out pmetric.AggregationTemporality
  87. switch agg {
  88. case MetricAggregationUnspecified, MetricAggregationCumulative:
  89. out = pmetric.AggregationTemporalityCumulative
  90. case MetricAggregationDelta:
  91. out = pmetric.AggregationTemporalityDelta
  92. }
  93. return out
  94. }
  95. func setDataPointValue(cfg MetricCfg, str string, dest pmetric.NumberDataPoint) error {
  96. switch cfg.ValueType {
  97. case MetricValueTypeUnspecified, MetricValueTypeInt:
  98. val, err := strconv.Atoi(str)
  99. if err != nil {
  100. return fmt.Errorf("setDataPointValue: col %q: error converting to integer: %w", cfg.ValueColumn, err)
  101. }
  102. dest.SetIntValue(int64(val))
  103. case MetricValueTypeDouble:
  104. val, err := strconv.ParseFloat(str, 64)
  105. if err != nil {
  106. return fmt.Errorf("setDataPointValue: col %q: error converting to double: %w", cfg.ValueColumn, err)
  107. }
  108. dest.SetDoubleValue(val)
  109. }
  110. return nil
  111. }