transaction.go 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package internal // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver/internal"
  4. import (
  5. "context"
  6. "errors"
  7. "fmt"
  8. "sort"
  9. "github.com/prometheus/common/model"
  10. "github.com/prometheus/prometheus/model/exemplar"
  11. "github.com/prometheus/prometheus/model/histogram"
  12. "github.com/prometheus/prometheus/model/labels"
  13. "github.com/prometheus/prometheus/model/metadata"
  14. "github.com/prometheus/prometheus/model/value"
  15. "github.com/prometheus/prometheus/scrape"
  16. "github.com/prometheus/prometheus/storage"
  17. "go.opentelemetry.io/collector/component"
  18. "go.opentelemetry.io/collector/consumer"
  19. "go.opentelemetry.io/collector/pdata/pcommon"
  20. "go.opentelemetry.io/collector/pdata/pmetric"
  21. "go.opentelemetry.io/collector/receiver"
  22. "go.opentelemetry.io/collector/receiver/receiverhelper"
  23. "go.uber.org/zap"
  24. )
const (
	// targetMetricName is the metric name whose labels Append turns into
	// resource attributes instead of a data point.
	targetMetricName = "target_info"

	// scopeMetricName is the metric name whose labels Append turns into
	// scope attributes instead of a data point.
	scopeMetricName = "otel_scope_info"

	// scopeNameLabel is the label read to obtain the scope name of a series.
	scopeNameLabel = "otel_scope_name"

	// scopeVersionLabel is the label read to obtain the scope version of a series.
	scopeVersionLabel = "otel_scope_version"

	// receiverName is the scope name getMetrics uses for series that carry
	// neither a scope name nor a scope version label.
	receiverName = "otelcol/prometheusreceiver"
)
// transaction accumulates the samples of one scrape, grouped by scope and
// metric family, and hands the resulting metrics to the sink on Commit.
type transaction struct {
	// isNew stays true until initTransaction has succeeded; Commit is a no-op
	// while it is true.
	isNew bool

	// trimSuffixes is forwarded to metricFamily.appendMetric when metrics are built.
	trimSuffixes bool

	// ctx is checked for cancellation on every append and is the source of the
	// scrape target and metadata store looked up in initTransaction.
	ctx context.Context

	// families holds the metric families seen so far, keyed by scope and then
	// by family name.
	families map[scopeID]map[string]*metricFamily

	// mc is the metadata store taken from ctx in initTransaction.
	mc scrape.MetricMetadataStore

	// sink receives the built metrics in Commit.
	sink consumer.Metrics

	// externalLabels are added to the labels of every sample passed to Append.
	externalLabels labels.Labels

	// nodeResource is created in initTransaction and extended by AddTargetInfo.
	nodeResource pcommon.Resource

	// scopeAttributes holds the attributes collected by addScopeInfo, keyed by scope.
	scopeAttributes map[scopeID]pcommon.Map

	// logger receives the warnings emitted while appending samples.
	logger *zap.Logger

	// buildInfo supplies the version reported for the default (receiver) scope.
	buildInfo component.BuildInfo

	// metricAdjuster is applied to the built metrics in Commit before they are
	// sent to the sink.
	metricAdjuster MetricsAdjuster

	// obsrecv brackets the work done in Commit with StartMetricsOp/EndMetricsOp.
	obsrecv *receiverhelper.ObsReport

	// Used as buffer to calculate series ref hash.
	bufBytes []byte
}
// emptyScopeID is the zero scopeID; getMetrics compares against it to detect
// series that carried no scope name or version.
var emptyScopeID scopeID

// scopeID identifies a scope by name and version. It is used as the key of
// the transaction's families and scopeAttributes maps.
type scopeID struct {
	name    string
	version string
}
  54. func newTransaction(
  55. ctx context.Context,
  56. metricAdjuster MetricsAdjuster,
  57. sink consumer.Metrics,
  58. externalLabels labels.Labels,
  59. settings receiver.CreateSettings,
  60. obsrecv *receiverhelper.ObsReport,
  61. trimSuffixes bool) *transaction {
  62. return &transaction{
  63. ctx: ctx,
  64. families: make(map[scopeID]map[string]*metricFamily),
  65. isNew: true,
  66. trimSuffixes: trimSuffixes,
  67. sink: sink,
  68. metricAdjuster: metricAdjuster,
  69. externalLabels: externalLabels,
  70. logger: settings.Logger,
  71. buildInfo: settings.BuildInfo,
  72. obsrecv: obsrecv,
  73. bufBytes: make([]byte, 0, 1024),
  74. scopeAttributes: make(map[scopeID]pcommon.Map),
  75. }
  76. }
  77. // Append always returns 0 to disable label caching.
  78. func (t *transaction) Append(_ storage.SeriesRef, ls labels.Labels, atMs int64, val float64) (storage.SeriesRef, error) {
  79. select {
  80. case <-t.ctx.Done():
  81. return 0, errTransactionAborted
  82. default:
  83. }
  84. if len(t.externalLabels) != 0 {
  85. ls = append(ls, t.externalLabels...)
  86. sort.Sort(ls)
  87. }
  88. if t.isNew {
  89. if err := t.initTransaction(ls); err != nil {
  90. return 0, err
  91. }
  92. }
  93. // Any datapoint with duplicate labels MUST be rejected per:
  94. // * https://github.com/open-telemetry/wg-prometheus/issues/44
  95. // * https://github.com/open-telemetry/opentelemetry-collector/issues/3407
  96. // as Prometheus rejects such too as of version 2.16.0, released on 2020-02-13.
  97. if dupLabel, hasDup := ls.HasDuplicateLabelNames(); hasDup {
  98. return 0, fmt.Errorf("invalid sample: non-unique label names: %q", dupLabel)
  99. }
  100. metricName := ls.Get(model.MetricNameLabel)
  101. if metricName == "" {
  102. return 0, errMetricNameNotFound
  103. }
  104. // See https://www.prometheus.io/docs/concepts/jobs_instances/#automatically-generated-labels-and-time-series
  105. // up: 1 if the instance is healthy, i.e. reachable, or 0 if the scrape failed.
  106. // But it can also be a staleNaN, which is inserted when the target goes away.
  107. if metricName == scrapeUpMetricName && val != 1.0 && !value.IsStaleNaN(val) {
  108. if val == 0.0 {
  109. t.logger.Warn("Failed to scrape Prometheus endpoint",
  110. zap.Int64("scrape_timestamp", atMs),
  111. zap.Stringer("target_labels", ls))
  112. } else {
  113. t.logger.Warn("The 'up' metric contains invalid value",
  114. zap.Float64("value", val),
  115. zap.Int64("scrape_timestamp", atMs),
  116. zap.Stringer("target_labels", ls))
  117. }
  118. }
  119. // For the `target_info` metric we need to convert it to resource attributes.
  120. if metricName == targetMetricName {
  121. t.AddTargetInfo(ls)
  122. return 0, nil
  123. }
  124. // For the `otel_scope_info` metric we need to convert it to scope attributes.
  125. if metricName == scopeMetricName {
  126. t.addScopeInfo(ls)
  127. return 0, nil
  128. }
  129. curMF := t.getOrCreateMetricFamily(getScopeID(ls), metricName)
  130. err := curMF.addSeries(t.getSeriesRef(ls, curMF.mtype), metricName, ls, atMs, val)
  131. if err != nil {
  132. t.logger.Warn("failed to add datapoint", zap.Error(err), zap.String("metric_name", metricName), zap.Any("labels", ls))
  133. }
  134. return 0, nil // never return errors, as that fails the whole scrape
  135. }
  136. func (t *transaction) getOrCreateMetricFamily(scope scopeID, mn string) *metricFamily {
  137. _, ok := t.families[scope]
  138. if !ok {
  139. t.families[scope] = make(map[string]*metricFamily)
  140. }
  141. curMf, ok := t.families[scope][mn]
  142. if !ok {
  143. fn := mn
  144. if _, ok := t.mc.GetMetadata(mn); !ok {
  145. fn = normalizeMetricName(mn)
  146. }
  147. if mf, ok := t.families[scope][fn]; ok && mf.includesMetric(mn) {
  148. curMf = mf
  149. } else {
  150. curMf = newMetricFamily(mn, t.mc, t.logger)
  151. t.families[scope][curMf.name] = curMf
  152. }
  153. }
  154. return curMf
  155. }
  156. func (t *transaction) AppendExemplar(_ storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) {
  157. select {
  158. case <-t.ctx.Done():
  159. return 0, errTransactionAborted
  160. default:
  161. }
  162. if t.isNew {
  163. if err := t.initTransaction(l); err != nil {
  164. return 0, err
  165. }
  166. }
  167. l = l.WithoutEmpty()
  168. if dupLabel, hasDup := l.HasDuplicateLabelNames(); hasDup {
  169. return 0, fmt.Errorf("invalid sample: non-unique label names: %q", dupLabel)
  170. }
  171. mn := l.Get(model.MetricNameLabel)
  172. if mn == "" {
  173. return 0, errMetricNameNotFound
  174. }
  175. mf := t.getOrCreateMetricFamily(getScopeID(l), mn)
  176. mf.addExemplar(t.getSeriesRef(l, mf.mtype), e)
  177. return 0, nil
  178. }
// AppendHistogram is not implemented yet: every argument is ignored and the
// call reports success with a zero series reference.
func (t *transaction) AppendHistogram(_ storage.SeriesRef, _ labels.Labels, _ int64, _ *histogram.Histogram, _ *histogram.FloatHistogram) (storage.SeriesRef, error) {
	// TODO: implement this func
	return 0, nil
}
  183. func (t *transaction) getSeriesRef(ls labels.Labels, mtype pmetric.MetricType) uint64 {
  184. var hash uint64
  185. hash, t.bufBytes = getSeriesRef(t.bufBytes, ls, mtype)
  186. return hash
  187. }
  188. // getMetrics returns all metrics to the given slice.
  189. // The only error returned by this function is errNoDataToBuild.
  190. func (t *transaction) getMetrics(resource pcommon.Resource) (pmetric.Metrics, error) {
  191. if len(t.families) == 0 {
  192. return pmetric.Metrics{}, errNoDataToBuild
  193. }
  194. md := pmetric.NewMetrics()
  195. rms := md.ResourceMetrics().AppendEmpty()
  196. resource.CopyTo(rms.Resource())
  197. for scope, mfs := range t.families {
  198. ils := rms.ScopeMetrics().AppendEmpty()
  199. // If metrics don't include otel_scope_name or otel_scope_version
  200. // labels, use the receiver name and version.
  201. if scope == emptyScopeID {
  202. ils.Scope().SetName(receiverName)
  203. ils.Scope().SetVersion(t.buildInfo.Version)
  204. } else {
  205. // Otherwise, use the scope that was provided with the metrics.
  206. ils.Scope().SetName(scope.name)
  207. ils.Scope().SetVersion(scope.version)
  208. // If we got an otel_scope_info metric for that scope, get scope
  209. // attributes from it.
  210. attributes, ok := t.scopeAttributes[scope]
  211. if ok {
  212. attributes.CopyTo(ils.Scope().Attributes())
  213. }
  214. }
  215. metrics := ils.Metrics()
  216. for _, mf := range mfs {
  217. mf.appendMetric(metrics, t.trimSuffixes)
  218. }
  219. }
  220. return md, nil
  221. }
  222. func getScopeID(ls labels.Labels) scopeID {
  223. var scope scopeID
  224. for _, lbl := range ls {
  225. if lbl.Name == scopeNameLabel {
  226. scope.name = lbl.Value
  227. }
  228. if lbl.Name == scopeVersionLabel {
  229. scope.version = lbl.Value
  230. }
  231. }
  232. return scope
  233. }
  234. func (t *transaction) initTransaction(labels labels.Labels) error {
  235. target, ok := scrape.TargetFromContext(t.ctx)
  236. if !ok {
  237. return errors.New("unable to find target in context")
  238. }
  239. t.mc, ok = scrape.MetricMetadataStoreFromContext(t.ctx)
  240. if !ok {
  241. return errors.New("unable to find MetricMetadataStore in context")
  242. }
  243. job, instance := labels.Get(model.JobLabel), labels.Get(model.InstanceLabel)
  244. if job == "" || instance == "" {
  245. return errNoJobInstance
  246. }
  247. t.nodeResource = CreateResource(job, instance, target.DiscoveredLabels())
  248. t.isNew = false
  249. return nil
  250. }
  251. func (t *transaction) Commit() error {
  252. if t.isNew {
  253. return nil
  254. }
  255. ctx := t.obsrecv.StartMetricsOp(t.ctx)
  256. md, err := t.getMetrics(t.nodeResource)
  257. if err != nil {
  258. t.obsrecv.EndMetricsOp(ctx, dataformat, 0, err)
  259. return err
  260. }
  261. numPoints := md.DataPointCount()
  262. if numPoints == 0 {
  263. return nil
  264. }
  265. if err = t.metricAdjuster.AdjustMetrics(md); err != nil {
  266. t.obsrecv.EndMetricsOp(ctx, dataformat, numPoints, err)
  267. return err
  268. }
  269. err = t.sink.ConsumeMetrics(ctx, md)
  270. t.obsrecv.EndMetricsOp(ctx, dataformat, numPoints, err)
  271. return err
  272. }
// Rollback does nothing and always returns nil.
func (t *transaction) Rollback() error {
	return nil
}
// UpdateMetadata is not implemented yet: every argument is ignored and the
// call reports success with a zero series reference.
func (t *transaction) UpdateMetadata(_ storage.SeriesRef, _ labels.Labels, _ metadata.Metadata) (storage.SeriesRef, error) {
	// TODO: implement this func
	return 0, nil
}
  280. func (t *transaction) AddTargetInfo(labels labels.Labels) {
  281. attrs := t.nodeResource.Attributes()
  282. for _, lbl := range labels {
  283. if lbl.Name == model.JobLabel || lbl.Name == model.InstanceLabel || lbl.Name == model.MetricNameLabel {
  284. continue
  285. }
  286. attrs.PutStr(lbl.Name, lbl.Value)
  287. }
  288. }
  289. func (t *transaction) addScopeInfo(labels labels.Labels) {
  290. attrs := pcommon.NewMap()
  291. scope := scopeID{}
  292. for _, lbl := range labels {
  293. if lbl.Name == model.JobLabel || lbl.Name == model.InstanceLabel || lbl.Name == model.MetricNameLabel {
  294. continue
  295. }
  296. if lbl.Name == scopeNameLabel {
  297. scope.name = lbl.Value
  298. continue
  299. }
  300. if lbl.Name == scopeVersionLabel {
  301. scope.version = lbl.Value
  302. continue
  303. }
  304. attrs.PutStr(lbl.Name, lbl.Value)
  305. }
  306. t.scopeAttributes[scope] = attrs
  307. }
  308. func getSeriesRef(bytes []byte, ls labels.Labels, mtype pmetric.MetricType) (uint64, []byte) {
  309. return ls.HashWithoutLabels(bytes, getSortedNotUsefulLabels(mtype)...)
  310. }