collector.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package prometheusexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/prometheusexporter"
  4. import (
  5. "encoding/hex"
  6. "fmt"
  7. "sort"
  8. "github.com/prometheus/client_golang/prometheus"
  9. "github.com/prometheus/common/model"
  10. "go.opentelemetry.io/collector/pdata/pcommon"
  11. "go.opentelemetry.io/collector/pdata/pmetric"
  12. conventions "go.opentelemetry.io/collector/semconv/v1.6.1"
  13. "go.uber.org/zap"
  14. prometheustranslator "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus"
  15. )
  // targetMetricName is the base name of the metric built by
  // createTargetInfoMetrics (before any namespace prefix is applied).
  const targetMetricName = "target_info"
  19. var (
  20. separatorString = string([]byte{model.SeparatorByte})
  21. )
  22. type collector struct {
  23. accumulator accumulator
  24. logger *zap.Logger
  25. sendTimestamps bool
  26. addMetricSuffixes bool
  27. namespace string
  28. constLabels prometheus.Labels
  29. }
  30. func newCollector(config *Config, logger *zap.Logger) *collector {
  31. return &collector{
  32. accumulator: newAccumulator(logger, config.MetricExpiration),
  33. logger: logger,
  34. namespace: prometheustranslator.CleanUpString(config.Namespace),
  35. sendTimestamps: config.SendTimestamps,
  36. constLabels: config.ConstLabels,
  37. addMetricSuffixes: config.AddMetricSuffixes,
  38. }
  39. }
  40. func convertExemplars(exemplars pmetric.ExemplarSlice) []prometheus.Exemplar {
  41. length := exemplars.Len()
  42. result := make([]prometheus.Exemplar, length)
  43. for i := 0; i < length; i++ {
  44. e := exemplars.At(i)
  45. exemplarLabels := make(prometheus.Labels, 0)
  46. if traceID := e.TraceID(); !traceID.IsEmpty() {
  47. exemplarLabels["trace_id"] = hex.EncodeToString(traceID[:])
  48. }
  49. if spanID := e.SpanID(); !spanID.IsEmpty() {
  50. exemplarLabels["span_id"] = hex.EncodeToString(spanID[:])
  51. }
  52. var value float64
  53. switch e.ValueType() {
  54. case pmetric.ExemplarValueTypeDouble:
  55. value = e.DoubleValue()
  56. case pmetric.ExemplarValueTypeInt:
  57. value = float64(e.IntValue())
  58. }
  59. result[i] = prometheus.Exemplar{
  60. Value: value,
  61. Labels: exemplarLabels,
  62. Timestamp: e.Timestamp().AsTime(),
  63. }
  64. }
  65. return result
  66. }
  67. // Describe is a no-op, because the collector dynamically allocates metrics.
  68. // https://github.com/prometheus/client_golang/blob/v1.9.0/prometheus/collector.go#L28-L40
  69. func (c *collector) Describe(_ chan<- *prometheus.Desc) {}
  70. /*
  71. Processing
  72. */
  73. func (c *collector) processMetrics(rm pmetric.ResourceMetrics) (n int) {
  74. return c.accumulator.Accumulate(rm)
  75. }
  // errUnknownMetricType is returned by convertMetric when the metric's type
  // has no conversion.
  var (
  	errUnknownMetricType = fmt.Errorf("unknown metric type")
  )
  77. func (c *collector) convertMetric(metric pmetric.Metric, resourceAttrs pcommon.Map) (prometheus.Metric, error) {
  78. switch metric.Type() {
  79. case pmetric.MetricTypeGauge:
  80. return c.convertGauge(metric, resourceAttrs)
  81. case pmetric.MetricTypeSum:
  82. return c.convertSum(metric, resourceAttrs)
  83. case pmetric.MetricTypeHistogram:
  84. return c.convertDoubleHistogram(metric, resourceAttrs)
  85. case pmetric.MetricTypeSummary:
  86. return c.convertSummary(metric, resourceAttrs)
  87. }
  88. return nil, errUnknownMetricType
  89. }
  90. func (c *collector) getMetricMetadata(metric pmetric.Metric, attributes pcommon.Map, resourceAttrs pcommon.Map) (*prometheus.Desc, []string) {
  91. keys := make([]string, 0, attributes.Len()+2) // +2 for job and instance labels.
  92. values := make([]string, 0, attributes.Len()+2)
  93. attributes.Range(func(k string, v pcommon.Value) bool {
  94. keys = append(keys, prometheustranslator.NormalizeLabel(k))
  95. values = append(values, v.AsString())
  96. return true
  97. })
  98. if job, ok := extractJob(resourceAttrs); ok {
  99. keys = append(keys, model.JobLabel)
  100. values = append(values, job)
  101. }
  102. if instance, ok := extractInstance(resourceAttrs); ok {
  103. keys = append(keys, model.InstanceLabel)
  104. values = append(values, instance)
  105. }
  106. return prometheus.NewDesc(
  107. prometheustranslator.BuildCompliantName(metric, c.namespace, c.addMetricSuffixes),
  108. metric.Description(),
  109. keys,
  110. c.constLabels,
  111. ), values
  112. }
  113. func (c *collector) convertGauge(metric pmetric.Metric, resourceAttrs pcommon.Map) (prometheus.Metric, error) {
  114. ip := metric.Gauge().DataPoints().At(0)
  115. desc, attributes := c.getMetricMetadata(metric, ip.Attributes(), resourceAttrs)
  116. var value float64
  117. switch ip.ValueType() {
  118. case pmetric.NumberDataPointValueTypeInt:
  119. value = float64(ip.IntValue())
  120. case pmetric.NumberDataPointValueTypeDouble:
  121. value = ip.DoubleValue()
  122. }
  123. m, err := prometheus.NewConstMetric(desc, prometheus.GaugeValue, value, attributes...)
  124. if err != nil {
  125. return nil, err
  126. }
  127. if c.sendTimestamps {
  128. return prometheus.NewMetricWithTimestamp(ip.Timestamp().AsTime(), m), nil
  129. }
  130. return m, nil
  131. }
  132. func (c *collector) convertSum(metric pmetric.Metric, resourceAttrs pcommon.Map) (prometheus.Metric, error) {
  133. ip := metric.Sum().DataPoints().At(0)
  134. metricType := prometheus.GaugeValue
  135. if metric.Sum().IsMonotonic() {
  136. metricType = prometheus.CounterValue
  137. }
  138. desc, attributes := c.getMetricMetadata(metric, ip.Attributes(), resourceAttrs)
  139. var value float64
  140. switch ip.ValueType() {
  141. case pmetric.NumberDataPointValueTypeInt:
  142. value = float64(ip.IntValue())
  143. case pmetric.NumberDataPointValueTypeDouble:
  144. value = ip.DoubleValue()
  145. }
  146. var exemplars []prometheus.Exemplar
  147. // Prometheus currently only supports exporting counters
  148. if metricType == prometheus.CounterValue {
  149. exemplars = convertExemplars(ip.Exemplars())
  150. }
  151. m, err := prometheus.NewConstMetric(desc, metricType, value, attributes...)
  152. if err != nil {
  153. return nil, err
  154. }
  155. if len(exemplars) > 0 {
  156. m, err = prometheus.NewMetricWithExemplars(m, exemplars...)
  157. if err != nil {
  158. return nil, err
  159. }
  160. }
  161. if c.sendTimestamps {
  162. return prometheus.NewMetricWithTimestamp(ip.Timestamp().AsTime(), m), nil
  163. }
  164. return m, nil
  165. }
  166. func (c *collector) convertSummary(metric pmetric.Metric, resourceAttrs pcommon.Map) (prometheus.Metric, error) {
  167. // TODO: In the off chance that we have multiple points
  168. // within the same metric, how should we handle them?
  169. point := metric.Summary().DataPoints().At(0)
  170. quantiles := make(map[float64]float64)
  171. qv := point.QuantileValues()
  172. for j := 0; j < qv.Len(); j++ {
  173. qvj := qv.At(j)
  174. // There should be EXACTLY one quantile value lest it is an invalid exposition.
  175. quantiles[qvj.Quantile()] = qvj.Value()
  176. }
  177. desc, attributes := c.getMetricMetadata(metric, point.Attributes(), resourceAttrs)
  178. m, err := prometheus.NewConstSummary(desc, point.Count(), point.Sum(), quantiles, attributes...)
  179. if err != nil {
  180. return nil, err
  181. }
  182. if c.sendTimestamps {
  183. return prometheus.NewMetricWithTimestamp(point.Timestamp().AsTime(), m), nil
  184. }
  185. return m, nil
  186. }
  187. func (c *collector) convertDoubleHistogram(metric pmetric.Metric, resourceAttrs pcommon.Map) (prometheus.Metric, error) {
  188. ip := metric.Histogram().DataPoints().At(0)
  189. desc, attributes := c.getMetricMetadata(metric, ip.Attributes(), resourceAttrs)
  190. indicesMap := make(map[float64]int)
  191. buckets := make([]float64, 0, ip.BucketCounts().Len())
  192. for index := 0; index < ip.ExplicitBounds().Len(); index++ {
  193. bucket := ip.ExplicitBounds().At(index)
  194. if _, added := indicesMap[bucket]; !added {
  195. indicesMap[bucket] = index
  196. buckets = append(buckets, bucket)
  197. }
  198. }
  199. sort.Float64s(buckets)
  200. cumCount := uint64(0)
  201. points := make(map[float64]uint64)
  202. for _, bucket := range buckets {
  203. index := indicesMap[bucket]
  204. var countPerBucket uint64
  205. if ip.ExplicitBounds().Len() > 0 && index < ip.ExplicitBounds().Len() {
  206. countPerBucket = ip.BucketCounts().At(index)
  207. }
  208. cumCount += countPerBucket
  209. points[bucket] = cumCount
  210. }
  211. exemplars := convertExemplars(ip.Exemplars())
  212. m, err := prometheus.NewConstHistogram(desc, ip.Count(), ip.Sum(), points, attributes...)
  213. if err != nil {
  214. return nil, err
  215. }
  216. if len(exemplars) > 0 {
  217. m, err = prometheus.NewMetricWithExemplars(m, exemplars...)
  218. if err != nil {
  219. return nil, err
  220. }
  221. }
  222. if c.sendTimestamps {
  223. return prometheus.NewMetricWithTimestamp(ip.Timestamp().AsTime(), m), nil
  224. }
  225. return m, nil
  226. }
  227. func (c *collector) createTargetInfoMetrics(resourceAttrs []pcommon.Map) ([]prometheus.Metric, error) {
  228. var lastErr error
  229. // deduplicate resourceAttrs by job and instance
  230. deduplicatedResourceAttrs := make([]pcommon.Map, 0, len(resourceAttrs))
  231. seenResource := map[string]struct{}{}
  232. for _, attrs := range resourceAttrs {
  233. sig := resourceSignature(attrs)
  234. if sig == "" {
  235. continue
  236. }
  237. if _, ok := seenResource[sig]; !ok {
  238. seenResource[sig] = struct{}{}
  239. deduplicatedResourceAttrs = append(deduplicatedResourceAttrs, attrs)
  240. }
  241. }
  242. metrics := make([]prometheus.Metric, 0, len(deduplicatedResourceAttrs))
  243. for _, rAttributes := range deduplicatedResourceAttrs {
  244. // map ensures no duplicate label name
  245. labels := make(map[string]string, rAttributes.Len()+2) // +2 for job and instance labels.
  246. // Use resource attributes (other than those used for job+instance) as the
  247. // metric labels for the target info metric
  248. attributes := pcommon.NewMap()
  249. rAttributes.CopyTo(attributes)
  250. attributes.RemoveIf(func(k string, _ pcommon.Value) bool {
  251. switch k {
  252. case conventions.AttributeServiceName, conventions.AttributeServiceNamespace, conventions.AttributeServiceInstanceID:
  253. // Remove resource attributes used for job + instance
  254. return true
  255. default:
  256. return false
  257. }
  258. })
  259. attributes.Range(func(k string, v pcommon.Value) bool {
  260. finalKey := prometheustranslator.NormalizeLabel(k)
  261. if existingVal, ok := labels[finalKey]; ok {
  262. labels[finalKey] = existingVal + ";" + v.AsString()
  263. } else {
  264. labels[finalKey] = v.AsString()
  265. }
  266. return true
  267. })
  268. // Map service.name + service.namespace to job
  269. if job, ok := extractJob(rAttributes); ok {
  270. labels[model.JobLabel] = job
  271. }
  272. // Map service.instance.id to instance
  273. if instance, ok := extractInstance(rAttributes); ok {
  274. labels[model.InstanceLabel] = instance
  275. }
  276. name := targetMetricName
  277. if len(c.namespace) > 0 {
  278. name = c.namespace + "_" + name
  279. }
  280. keys := make([]string, 0, len(labels))
  281. values := make([]string, 0, len(labels))
  282. for key, value := range labels {
  283. keys = append(keys, key)
  284. values = append(values, value)
  285. }
  286. metric, err := prometheus.NewConstMetric(
  287. prometheus.NewDesc(name, "Target metadata", keys, nil),
  288. prometheus.GaugeValue,
  289. 1,
  290. values...,
  291. )
  292. if err != nil {
  293. lastErr = err
  294. continue
  295. }
  296. metrics = append(metrics, metric)
  297. }
  298. return metrics, lastErr
  299. }
  300. /*
  301. Reporting
  302. */
  303. func (c *collector) Collect(ch chan<- prometheus.Metric) {
  304. c.logger.Debug("collect called")
  305. inMetrics, resourceAttrs := c.accumulator.Collect()
  306. targetMetrics, err := c.createTargetInfoMetrics(resourceAttrs)
  307. if err != nil {
  308. c.logger.Error(fmt.Sprintf("failed to convert metric %s: %s", targetMetricName, err.Error()))
  309. }
  310. for _, m := range targetMetrics {
  311. ch <- m
  312. c.logger.Debug(fmt.Sprintf("metric served: %s", m.Desc().String()))
  313. }
  314. for i := range inMetrics {
  315. pMetric := inMetrics[i]
  316. rAttr := resourceAttrs[i]
  317. m, err := c.convertMetric(pMetric, rAttr)
  318. if err != nil {
  319. c.logger.Error(fmt.Sprintf("failed to convert metric %s: %s", pMetric.Name(), err.Error()))
  320. continue
  321. }
  322. ch <- m
  323. c.logger.Debug(fmt.Sprintf("metric served: %s", m.Desc().String()))
  324. }
  325. }