|
- // Copyright The OpenTelemetry Authors
- // SPDX-License-Identifier: Apache-2.0
- package clickhouseexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/clickhouseexporter"
- import (
- "context"
- "database/sql"
- "fmt"
- "regexp"
- "strconv"
- "strings"
- "time"
- _ "github.com/ClickHouse/clickhouse-go/v2" // For register database driver.
- "go.opentelemetry.io/collector/component"
- "go.opentelemetry.io/collector/pdata/ptrace"
- conventions "go.opentelemetry.io/collector/semconv/v1.18.0"
- "go.uber.org/zap"
- "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/traceutil"
- )
- type tracesExporter struct {
- client *sql.DB
- insertSQL string
- logger *zap.Logger
- cfg *Config
- }
- func newTracesExporter(logger *zap.Logger, cfg *Config) (*tracesExporter, error) {
- client, err := newClickhouseClient(cfg)
- if err != nil {
- return nil, err
- }
- return &tracesExporter{
- client: client,
- insertSQL: renderInsertTracesSQL(cfg),
- logger: logger,
- cfg: cfg,
- }, nil
- }
- func (e *tracesExporter) start(ctx context.Context, _ component.Host) error {
- //if err := createDatabase(ctx, e.cfg); err != nil {
- // return err
- //}
- //if err := createTracesTable(ctx, e.cfg, e.client); err != nil {
- // return err
- //}
- return nil
- }
- // shutdown will shut down the exporter.
- func (e *tracesExporter) shutdown(_ context.Context) error {
- if e.client != nil {
- return e.client.Close()
- }
- return nil
- }
- func (e *tracesExporter) pushTraceData(ctx context.Context, td ptrace.Traces) error {
- start := time.Now()
- err := doWithTx(ctx, e.client, func(tx *sql.Tx) error {
- statement, err := tx.PrepareContext(ctx, e.insertSQL)
- if err != nil {
- return fmt.Errorf("PrepareContext:%w", err)
- }
- defer func() {
- _ = statement.Close()
- }()
- for i := 0; i < td.ResourceSpans().Len(); i++ {
- spans := td.ResourceSpans().At(i)
- res := spans.Resource()
- resAttr := attributesToMap(res.Attributes())
- var serviceName string
- if v, ok := res.Attributes().Get(conventions.AttributeServiceName); ok {
- serviceName = v.Str()
- }
- for j := 0; j < spans.ScopeSpans().Len(); j++ {
- rs := spans.ScopeSpans().At(j).Spans()
- scopeName := spans.ScopeSpans().At(j).Scope().Name()
- scopeVersion := spans.ScopeSpans().At(j).Scope().Version()
- for k := 0; k < rs.Len(); k++ {
- r := rs.At(k)
- spanAttr := attributesToMap(r.Attributes())
- if spans.ScopeSpans().At(j).Scope().Name() != "" {
- spanAttr[conventions.AttributeOtelScopeName] = spans.ScopeSpans().At(j).Scope().Name()
- }
- if spans.ScopeSpans().At(j).Scope().Version() != "" {
- spanAttr[conventions.AttributeOtelScopeVersion] = spans.ScopeSpans().At(j).Scope().Version()
- }
- status := r.Status()
- eventTimes, eventNames, eventAttrs := convertEvents(r.Events())
- linksTraceIDs, linksSpanIDs, linksTraceStates, linksAttrs := convertLinks(r.Links())
- // rpc.method
- // http.status_code/http.method/http.url
- // src ip, src port, net.peer.name, net.peer.port
- // db.statement, db.connection_string
- // container.id
- appAlias := getStringFromMapWithDefault(resAttr, "process.command_line", "UNSET")
- if appAlias != "UNSET" {
- pattern := `-DAPP_NAME=(\w+)`
- match := regexp.MustCompile(pattern).FindStringSubmatch(appAlias)
- if len(match) > 1 {
- appAlias = match[1]
- } else {
- appAlias = "UNSET"
- }
- }
- httpCode := getIntFromMapWithDefault(spanAttr, "http.status_code", -1)
- httpMethod := getStringFromMapWithDefault(spanAttr, "http.method", "UNSET")
- var httpURL string
- if r.Kind() == ptrace.SpanKindServer {
- httpURL = getStringFromMapWithDefault(spanAttr, "http.route", "UNSET")
- } else {
- httpURL = getStringFromMapWithDefault(spanAttr, "http.url", "UNSET")
- }
- rpcMethod := getStringFromMapWithDefault(spanAttr, "rpc.method", "UNSET")
- srcIP := "UNSET"
- var srcPort int32 = -1
- targetIP, targetPort := getStringFromMapWithDefault(spanAttr, "net.peer.name", "UNSET"),
- getIntFromMapWithDefault(spanAttr, "net.peer.port", -1)
- dbStatement, dbConnectionString := getStringFromMapWithDefault(spanAttr, "db.statement", "UNSET"),
- getStringFromMapWithDefault(spanAttr, "db.connection_string", "UNSET")
- containerID := getStringFromMapWithDefault(resAttr, "container.id", "UNSET")
- _, err = statement.ExecContext(ctx,
- r.StartTimestamp().AsTime(),
- traceutil.TraceIDToHexOrEmptyString(r.TraceID()),
- traceutil.SpanIDToHexOrEmptyString(r.SpanID()),
- traceutil.SpanIDToHexOrEmptyString(r.ParentSpanID()),
- r.TraceState().AsRaw(),
- r.Name(),
- traceutil.SpanKindStr(r.Kind()),
- serviceName,
- resAttr,
- scopeName,
- scopeVersion,
- spanAttr,
- r.EndTimestamp().AsTime().Sub(r.StartTimestamp().AsTime()).Nanoseconds(),
- traceutil.StatusCodeStr(status.Code()),
- status.Message(),
- eventTimes,
- eventNames,
- eventAttrs,
- linksTraceIDs,
- linksSpanIDs,
- linksTraceStates,
- linksAttrs,
- httpCode, httpMethod, httpURL, containerID,
- srcIP, srcPort, targetIP, targetPort,
- rpcMethod, dbStatement, dbConnectionString, appAlias,
- )
- if err != nil {
- return fmt.Errorf("ExecContext:%w, sql:%s", err, e.insertSQL)
- }
- }
- }
- }
- return nil
- })
- duration := time.Since(start)
- e.logger.Info("insert traces", zap.Int("records", td.SpanCount()),
- zap.String("cost", duration.String()))
- return err
- }
// getIntFromMapWithDefault returns m[key] parsed as a base-10 int32.
//
// The default de is returned when the key is missing, when the value is not a
// valid integer, or when it does not fit in an int32. Parsing with a 32-bit
// size makes out-of-range values an error instead of silently wrapping them
// during the int32 conversion.
func getIntFromMapWithDefault(m map[string]string, key string, de int32) int32 {
	s, ok := m[key]
	if !ok {
		return de
	}
	i, err := strconv.ParseInt(s, 10, 32)
	if err != nil {
		return de
	}
	return int32(i)
}
// getStringFromMapWithDefault returns m[key] when the key is present (even if
// its value is the empty string) and de otherwise. A nil map yields de.
func getStringFromMapWithDefault(m map[string]string, key string, de string) string {
	v, ok := m[key]
	if !ok {
		return de
	}
	return v
}
- func convertEvents(events ptrace.SpanEventSlice) ([]time.Time, []string, []map[string]string) {
- var (
- times []time.Time
- names []string
- attrs []map[string]string
- )
- for i := 0; i < events.Len(); i++ {
- event := events.At(i)
- times = append(times, event.Timestamp().AsTime())
- names = append(names, event.Name())
- attrs = append(attrs, attributesToMap(event.Attributes()))
- }
- return times, names, attrs
- }
- func convertLinks(links ptrace.SpanLinkSlice) ([]string, []string, []string, []map[string]string) {
- var (
- traceIDs []string
- spanIDs []string
- states []string
- attrs []map[string]string
- )
- for i := 0; i < links.Len(); i++ {
- link := links.At(i)
- traceIDs = append(traceIDs, traceutil.TraceIDToHexOrEmptyString(link.TraceID()))
- spanIDs = append(spanIDs, traceutil.SpanIDToHexOrEmptyString(link.SpanID()))
- states = append(states, link.TraceState().AsRaw())
- attrs = append(attrs, attributesToMap(link.Attributes()))
- }
- return traceIDs, spanIDs, states, attrs
- }
const (
	// createTracesTableSQL is the DDL template for the traces table.
	// Format verbs, in order: table name, TTL expression.
	//
	// The column set must stay in sync with insertTracesSQLTemplate. AppAlias
	// is declared here because the INSERT statement writes to it.
	// language=ClickHouse SQL
	createTracesTableSQL = `
CREATE TABLE IF NOT EXISTS %s (
Timestamp DateTime64(9) CODEC(Delta, ZSTD(1)),
TraceId String CODEC(ZSTD(1)),
SpanId String CODEC(ZSTD(1)),
ParentSpanId String CODEC(ZSTD(1)),
TraceState String CODEC(ZSTD(1)),
SpanName LowCardinality(String) CODEC(ZSTD(1)),
SpanKind LowCardinality(String) CODEC(ZSTD(1)),
ServiceName LowCardinality(String) CODEC(ZSTD(1)),
ResourceAttributes Map(LowCardinality(String), String) CODEC(ZSTD(1)),
ScopeName String CODEC(ZSTD(1)),
ScopeVersion String CODEC(ZSTD(1)),
SpanAttributes Map(LowCardinality(String), String) CODEC(ZSTD(1)),
Duration Int64 CODEC(ZSTD(1)),
StatusCode LowCardinality(String) CODEC(ZSTD(1)),
StatusMessage String CODEC(ZSTD(1)),
Events Nested (
Timestamp DateTime64(9),
Name LowCardinality(String),
Attributes Map(LowCardinality(String), String)
) CODEC(ZSTD(1)),
Links Nested (
TraceId String,
SpanId String,
TraceState String,
Attributes Map(LowCardinality(String), String)
) CODEC(ZSTD(1)),
HttpCode Int,
HttpMethod String CODEC(ZSTD(1)),
HttpURL String CODEC(ZSTD(1)),
ContainerId String CODEC(ZSTD(1)),
srcIP String CODEC(ZSTD(1)),
srcPort Int CODEC(ZSTD(1)),
targetIP String CODEC(ZSTD(1)),
targetPort Int CODEC(ZSTD(1)),
rpcMethod String CODEC(ZSTD(1)),
dbStatement String CODEC(ZSTD(1)),
dbConnectionString String CODEC(ZSTD(1)),
AppAlias String CODEC(ZSTD(1)),
INDEX idx_trace_id TraceId TYPE bloom_filter(0.001) GRANULARITY 1,
INDEX idx_res_attr_key mapKeys(ResourceAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
INDEX idx_res_attr_value mapValues(ResourceAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
INDEX idx_span_attr_key mapKeys(SpanAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
INDEX idx_span_attr_value mapValues(SpanAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
INDEX idx_duration Duration TYPE minmax GRANULARITY 1
) ENGINE MergeTree()
%s
PARTITION BY toDate(Timestamp)
ORDER BY (ServiceName, SpanName, toUnixTimestamp(Timestamp), TraceId)
SETTINGS index_granularity=8192, ttl_only_drop_parts = 1;
`
	// insertTracesSQLTemplate is the INSERT template for one span row.
	// Format verb: table name. It lists 34 columns and 34 placeholders; the
	// arguments passed to ExecContext must follow the same order.
	// language=ClickHouse SQL
	insertTracesSQLTemplate = `INSERT INTO %s (
Timestamp,
TraceId,
SpanId,
ParentSpanId,
TraceState,
SpanName,
SpanKind,
ServiceName,
ResourceAttributes,
ScopeName,
ScopeVersion,
SpanAttributes,
Duration,
StatusCode,
StatusMessage,
Events.Timestamp,
Events.Name,
Events.Attributes,
Links.TraceId,
Links.SpanId,
Links.TraceState,
Links.Attributes,
HttpCode, HttpMethod, HttpURL, ContainerId,
srcIP, srcPort, targetIP, targetPort,
rpcMethod , dbStatement , dbConnectionString, AppAlias
) VALUES (
?, ?, ?, ?, ?,
?, ?, ?, ?, ?,
?, ?, ?, ?, ?,
?, ?, ?, ?, ?,
?, ?, ?, ?, ?,
?, ?, ?, ?, ?,
?, ?, ?, ?
)`
)
// createTraceIDTsTableSQL is the DDL template for the "<table>_trace_id_ts"
// table, which stores a Start and End timestamp per TraceId.
// Format verbs, in order: traces table name, TTL expression.
const createTraceIDTsTableSQL = `
create table IF NOT EXISTS %s_trace_id_ts (
TraceId String CODEC(ZSTD(1)),
Start DateTime64(9) CODEC(Delta, ZSTD(1)),
End DateTime64(9) CODEC(Delta, ZSTD(1)),
INDEX idx_trace_id TraceId TYPE bloom_filter(0.01) GRANULARITY 1
) ENGINE MergeTree()
%s
ORDER BY (TraceId, toUnixTimestamp(Start))
SETTINGS index_granularity=8192;
`

// createTraceIDTsMaterializedViewSQL is the DDL template for the materialized
// view that fills "<table>_trace_id_ts" with min/max Timestamp per non-empty
// TraceId read from the traces table.
// Format verbs, in order: traces table name (view name prefix), database,
// traces table name (target), database, traces table name (source).
const createTraceIDTsMaterializedViewSQL = `
CREATE MATERIALIZED VIEW IF NOT EXISTS %s_trace_id_ts_mv
TO %s.%s_trace_id_ts
AS SELECT
TraceId,
min(Timestamp) as Start,
max(Timestamp) as End
FROM
%s.%s
WHERE TraceId!=''
GROUP BY TraceId;
`
- func createTracesTable(ctx context.Context, cfg *Config, db *sql.DB) error {
- if _, err := db.ExecContext(ctx, renderCreateTracesTableSQL(cfg)); err != nil {
- return fmt.Errorf("exec create traces table sql: %w", err)
- }
- if _, err := db.ExecContext(ctx, renderCreateTraceIDTsTableSQL(cfg)); err != nil {
- return fmt.Errorf("exec create traceIDTs table sql: %w", err)
- }
- if _, err := db.ExecContext(ctx, renderTraceIDTsMaterializedViewSQL(cfg)); err != nil {
- return fmt.Errorf("exec create traceIDTs view sql: %w", err)
- }
- return nil
- }
- func renderInsertTracesSQL(cfg *Config) string {
- return fmt.Sprintf(strings.ReplaceAll(insertTracesSQLTemplate, "'", "`"), cfg.TracesTableName)
- }
- func renderCreateTracesTableSQL(cfg *Config) string {
- ttlExpr := generateTTLExpr(cfg.TTLDays, cfg.TTL)
- return fmt.Sprintf(createTracesTableSQL, cfg.TracesTableName, ttlExpr)
- }
- func renderCreateTraceIDTsTableSQL(cfg *Config) string {
- ttlExpr := generateTTLExpr(cfg.TTLDays, cfg.TTL)
- return fmt.Sprintf(createTraceIDTsTableSQL, cfg.TracesTableName, ttlExpr)
- }
- func renderTraceIDTsMaterializedViewSQL(cfg *Config) string {
- return fmt.Sprintf(createTraceIDTsMaterializedViewSQL, cfg.TracesTableName,
- cfg.Database, cfg.TracesTableName, cfg.Database, cfg.TracesTableName)
- }
|