exporter_metrics.go 3.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package clickhouseexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/clickhouseexporter"
  4. import (
  5. "context"
  6. "database/sql"
  7. "errors"
  8. "fmt"
  9. "go.opentelemetry.io/collector/component"
  10. "go.opentelemetry.io/collector/pdata/pmetric"
  11. "go.uber.org/zap"
  12. "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/clickhouseexporter/internal"
  13. )
  14. type metricsExporter struct {
  15. client *sql.DB
  16. logger *zap.Logger
  17. cfg *Config
  18. }
  19. func newMetricsExporter(logger *zap.Logger, cfg *Config) (*metricsExporter, error) {
  20. client, err := newClickhouseClient(cfg)
  21. if err != nil {
  22. return nil, err
  23. }
  24. return &metricsExporter{
  25. client: client,
  26. logger: logger,
  27. cfg: cfg,
  28. }, nil
  29. }
  30. func (e *metricsExporter) start(ctx context.Context, _ component.Host) error {
  31. if err := createDatabase(ctx, e.cfg); err != nil {
  32. return err
  33. }
  34. internal.SetLogger(e.logger)
  35. ttlExpr := generateTTLExpr(e.cfg.TTLDays, e.cfg.TTL)
  36. return internal.NewMetricsTable(ctx, e.cfg.MetricsTableName, ttlExpr, e.client)
  37. }
  38. // shutdown will shut down the exporter.
  39. func (e *metricsExporter) shutdown(_ context.Context) error {
  40. if e.client != nil {
  41. return e.client.Close()
  42. }
  43. return nil
  44. }
  45. func (e *metricsExporter) pushMetricsData(ctx context.Context, md pmetric.Metrics) error {
  46. metricsMap := internal.NewMetricsModel(e.cfg.MetricsTableName)
  47. for i := 0; i < md.ResourceMetrics().Len(); i++ {
  48. metrics := md.ResourceMetrics().At(i)
  49. resAttr := attributesToMap(metrics.Resource().Attributes())
  50. for j := 0; j < metrics.ScopeMetrics().Len(); j++ {
  51. rs := metrics.ScopeMetrics().At(j).Metrics()
  52. scopeInstr := metrics.ScopeMetrics().At(j).Scope()
  53. scopeURL := metrics.ScopeMetrics().At(j).SchemaUrl()
  54. for k := 0; k < rs.Len(); k++ {
  55. r := rs.At(k)
  56. var errs error
  57. //exhaustive:enforce
  58. switch r.Type() {
  59. case pmetric.MetricTypeGauge:
  60. errs = errors.Join(errs, metricsMap[pmetric.MetricTypeGauge].Add(resAttr, metrics.SchemaUrl(), scopeInstr, scopeURL, r.Gauge(), r.Name(), r.Description(), r.Unit()))
  61. case pmetric.MetricTypeSum:
  62. errs = errors.Join(errs, metricsMap[pmetric.MetricTypeSum].Add(resAttr, metrics.SchemaUrl(), scopeInstr, scopeURL, r.Sum(), r.Name(), r.Description(), r.Unit()))
  63. case pmetric.MetricTypeHistogram:
  64. errs = errors.Join(errs, metricsMap[pmetric.MetricTypeHistogram].Add(resAttr, metrics.SchemaUrl(), scopeInstr, scopeURL, r.Histogram(), r.Name(), r.Description(), r.Unit()))
  65. case pmetric.MetricTypeExponentialHistogram:
  66. errs = errors.Join(errs, metricsMap[pmetric.MetricTypeExponentialHistogram].Add(resAttr, metrics.SchemaUrl(), scopeInstr, scopeURL, r.ExponentialHistogram(), r.Name(), r.Description(), r.Unit()))
  67. case pmetric.MetricTypeSummary:
  68. errs = errors.Join(errs, metricsMap[pmetric.MetricTypeSummary].Add(resAttr, metrics.SchemaUrl(), scopeInstr, scopeURL, r.Summary(), r.Name(), r.Description(), r.Unit()))
  69. case pmetric.MetricTypeEmpty:
  70. return fmt.Errorf("metrics type is unset")
  71. default:
  72. return fmt.Errorf("unsupported metrics type")
  73. }
  74. if errs != nil {
  75. return errs
  76. }
  77. }
  78. }
  79. }
  80. // batch insert https://clickhouse.com/docs/en/about-us/performance/#performance-when-inserting-data
  81. return internal.InsertMetrics(ctx, e.client, metricsMap)
  82. }