1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495 |
- // Copyright The OpenTelemetry Authors
- // SPDX-License-Identifier: Apache-2.0
- package clickhouseexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/clickhouseexporter"
- import (
- "context"
- "database/sql"
- "errors"
- "fmt"
- "go.opentelemetry.io/collector/component"
- "go.opentelemetry.io/collector/pdata/pmetric"
- "go.uber.org/zap"
- "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/clickhouseexporter/internal"
- )
- type metricsExporter struct {
- client *sql.DB
- logger *zap.Logger
- cfg *Config
- }
- func newMetricsExporter(logger *zap.Logger, cfg *Config) (*metricsExporter, error) {
- client, err := newClickhouseClient(cfg)
- if err != nil {
- return nil, err
- }
- return &metricsExporter{
- client: client,
- logger: logger,
- cfg: cfg,
- }, nil
- }
- func (e *metricsExporter) start(ctx context.Context, _ component.Host) error {
- if err := createDatabase(ctx, e.cfg); err != nil {
- return err
- }
- internal.SetLogger(e.logger)
- ttlExpr := generateTTLExpr(e.cfg.TTLDays, e.cfg.TTL)
- return internal.NewMetricsTable(ctx, e.cfg.MetricsTableName, ttlExpr, e.client)
- }
- // shutdown will shut down the exporter.
- func (e *metricsExporter) shutdown(_ context.Context) error {
- if e.client != nil {
- return e.client.Close()
- }
- return nil
- }
- func (e *metricsExporter) pushMetricsData(ctx context.Context, md pmetric.Metrics) error {
- metricsMap := internal.NewMetricsModel(e.cfg.MetricsTableName)
- for i := 0; i < md.ResourceMetrics().Len(); i++ {
- metrics := md.ResourceMetrics().At(i)
- resAttr := attributesToMap(metrics.Resource().Attributes())
- for j := 0; j < metrics.ScopeMetrics().Len(); j++ {
- rs := metrics.ScopeMetrics().At(j).Metrics()
- scopeInstr := metrics.ScopeMetrics().At(j).Scope()
- scopeURL := metrics.ScopeMetrics().At(j).SchemaUrl()
- for k := 0; k < rs.Len(); k++ {
- r := rs.At(k)
- var errs error
- //exhaustive:enforce
- switch r.Type() {
- case pmetric.MetricTypeGauge:
- errs = errors.Join(errs, metricsMap[pmetric.MetricTypeGauge].Add(resAttr, metrics.SchemaUrl(), scopeInstr, scopeURL, r.Gauge(), r.Name(), r.Description(), r.Unit()))
- case pmetric.MetricTypeSum:
- errs = errors.Join(errs, metricsMap[pmetric.MetricTypeSum].Add(resAttr, metrics.SchemaUrl(), scopeInstr, scopeURL, r.Sum(), r.Name(), r.Description(), r.Unit()))
- case pmetric.MetricTypeHistogram:
- errs = errors.Join(errs, metricsMap[pmetric.MetricTypeHistogram].Add(resAttr, metrics.SchemaUrl(), scopeInstr, scopeURL, r.Histogram(), r.Name(), r.Description(), r.Unit()))
- case pmetric.MetricTypeExponentialHistogram:
- errs = errors.Join(errs, metricsMap[pmetric.MetricTypeExponentialHistogram].Add(resAttr, metrics.SchemaUrl(), scopeInstr, scopeURL, r.ExponentialHistogram(), r.Name(), r.Description(), r.Unit()))
- case pmetric.MetricTypeSummary:
- errs = errors.Join(errs, metricsMap[pmetric.MetricTypeSummary].Add(resAttr, metrics.SchemaUrl(), scopeInstr, scopeURL, r.Summary(), r.Name(), r.Description(), r.Unit()))
- case pmetric.MetricTypeEmpty:
- return fmt.Errorf("metrics type is unset")
- default:
- return fmt.Errorf("unsupported metrics type")
- }
- if errs != nil {
- return errs
- }
- }
- }
- }
- // batch insert https://clickhouse.com/docs/en/about-us/performance/#performance-when-inserting-data
- return internal.InsertMetrics(ctx, e.client, metricsMap)
- }
|