// Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 package clickhouseexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/clickhouseexporter" import ( "context" "database/sql" "fmt" "regexp" "strconv" "strings" "time" _ "github.com/ClickHouse/clickhouse-go/v2" // For register database driver. "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/pdata/ptrace" conventions "go.opentelemetry.io/collector/semconv/v1.18.0" "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/traceutil" ) type tracesExporter struct { client *sql.DB insertSQL string logger *zap.Logger cfg *Config } func newTracesExporter(logger *zap.Logger, cfg *Config) (*tracesExporter, error) { client, err := newClickhouseClient(cfg) if err != nil { return nil, err } return &tracesExporter{ client: client, insertSQL: renderInsertTracesSQL(cfg), logger: logger, cfg: cfg, }, nil } func (e *tracesExporter) start(ctx context.Context, _ component.Host) error { //if err := createDatabase(ctx, e.cfg); err != nil { // return err //} //if err := createTracesTable(ctx, e.cfg, e.client); err != nil { // return err //} return nil } // shutdown will shut down the exporter. func (e *tracesExporter) shutdown(_ context.Context) error { if e.client != nil { return e.client.Close() } return nil } func (e *tracesExporter) pushTraceData(ctx context.Context, td ptrace.Traces) error { start := time.Now() err := doWithTx(ctx, e.client, func(tx *sql.Tx) error { statement, err := tx.PrepareContext(ctx, e.insertSQL) if err != nil { return fmt.Errorf("PrepareContext:%w", err) } defer func() { _ = statement.Close() }() for i := 0; i < td.ResourceSpans().Len(); i++ { spans := td.ResourceSpans().At(i) res := spans.Resource() resAttr := attributesToMap(res.Attributes()) var serviceName string if v, ok := res.Attributes().Get(conventions.AttributeServiceName); ok { serviceName = v.Str() } for j := 0; j < spans.ScopeSpans().Len(); j++ { rs := spans.ScopeSpans().At(j).Spans() scopeName := spans.ScopeSpans().At(j).Scope().Name() scopeVersion := spans.ScopeSpans().At(j).Scope().Version() for k := 0; k < rs.Len(); k++ { r := rs.At(k) spanAttr := attributesToMap(r.Attributes()) if spans.ScopeSpans().At(j).Scope().Name() != "" { spanAttr[conventions.AttributeOtelScopeName] = spans.ScopeSpans().At(j).Scope().Name() } if spans.ScopeSpans().At(j).Scope().Version() != "" { spanAttr[conventions.AttributeOtelScopeVersion] = spans.ScopeSpans().At(j).Scope().Version() } status := r.Status() eventTimes, eventNames, eventAttrs := convertEvents(r.Events()) linksTraceIDs, linksSpanIDs, linksTraceStates, linksAttrs := convertLinks(r.Links()) // rpc.method // http.status_code/http.method/http.url // src ip, src port, net.peer.name, net.peer.port // db.statement, db.connection_string // container.id appAlias := getStringFromMapWithDefault(resAttr, "process.command_line", "UNSET") if appAlias != "UNSET" { pattern := `-DAPP_NAME=(\w+)` match := regexp.MustCompile(pattern).FindStringSubmatch(appAlias) if len(match) > 1 { appAlias = match[1] } else { appAlias = "UNSET" } } httpCode := getIntFromMapWithDefault(spanAttr, "http.status_code", -1) httpMethod := getStringFromMapWithDefault(spanAttr, "http.method", "UNSET") var httpURL string if r.Kind() == ptrace.SpanKindServer { httpURL = getStringFromMapWithDefault(spanAttr, "http.route", "UNSET") } else { httpURL = getStringFromMapWithDefault(spanAttr, "http.url", "UNSET") } rpcMethod := getStringFromMapWithDefault(spanAttr, "rpc.method", "UNSET") srcIP := "UNSET" var srcPort int32 = -1 targetIP, targetPort := getStringFromMapWithDefault(spanAttr, "net.peer.name", "UNSET"), getIntFromMapWithDefault(spanAttr, "net.peer.port", -1) dbStatement, dbConnectionString := getStringFromMapWithDefault(spanAttr, "db.statement", "UNSET"), getStringFromMapWithDefault(spanAttr, "db.connection_string", "UNSET") containerID := getStringFromMapWithDefault(resAttr, "container.id", "UNSET") _, err = statement.ExecContext(ctx, r.StartTimestamp().AsTime(), traceutil.TraceIDToHexOrEmptyString(r.TraceID()), traceutil.SpanIDToHexOrEmptyString(r.SpanID()), traceutil.SpanIDToHexOrEmptyString(r.ParentSpanID()), r.TraceState().AsRaw(), r.Name(), traceutil.SpanKindStr(r.Kind()), serviceName, resAttr, scopeName, scopeVersion, spanAttr, r.EndTimestamp().AsTime().Sub(r.StartTimestamp().AsTime()).Nanoseconds(), traceutil.StatusCodeStr(status.Code()), status.Message(), eventTimes, eventNames, eventAttrs, linksTraceIDs, linksSpanIDs, linksTraceStates, linksAttrs, httpCode, httpMethod, httpURL, containerID, srcIP, srcPort, targetIP, targetPort, rpcMethod, dbStatement, dbConnectionString, appAlias, ) if err != nil { return fmt.Errorf("ExecContext:%w, sql:%s", err, e.insertSQL) } } } } return nil }) duration := time.Since(start) e.logger.Info("insert traces", zap.Int("records", td.SpanCount()), zap.String("cost", duration.String())) return err } func getIntFromMapWithDefault(m map[string]string, key string, de int32) int32 { s := getStringFromMapWithDefault(m, key, "") i, err := strconv.ParseInt(s, 10, 64) if err != nil { return de } return int32(i) } func getStringFromMapWithDefault(m map[string]string, key string, de string) string { value, find := m[key] if find { return value } return de } func convertEvents(events ptrace.SpanEventSlice) ([]time.Time, []string, []map[string]string) { var ( times []time.Time names []string attrs []map[string]string ) for i := 0; i < events.Len(); i++ { event := events.At(i) times = append(times, event.Timestamp().AsTime()) names = append(names, event.Name()) attrs = append(attrs, attributesToMap(event.Attributes())) } return times, names, attrs } func convertLinks(links ptrace.SpanLinkSlice) ([]string, []string, []string, []map[string]string) { var ( traceIDs []string spanIDs []string states []string attrs []map[string]string ) for i := 0; i < links.Len(); i++ { link := links.At(i) traceIDs = append(traceIDs, traceutil.TraceIDToHexOrEmptyString(link.TraceID())) spanIDs = append(spanIDs, traceutil.SpanIDToHexOrEmptyString(link.SpanID())) states = append(states, link.TraceState().AsRaw()) attrs = append(attrs, attributesToMap(link.Attributes())) } return traceIDs, spanIDs, states, attrs } const ( // language=ClickHouse SQL createTracesTableSQL = ` CREATE TABLE IF NOT EXISTS %s ( Timestamp DateTime64(9) CODEC(Delta, ZSTD(1)), TraceId String CODEC(ZSTD(1)), SpanId String CODEC(ZSTD(1)), ParentSpanId String CODEC(ZSTD(1)), TraceState String CODEC(ZSTD(1)), SpanName LowCardinality(String) CODEC(ZSTD(1)), SpanKind LowCardinality(String) CODEC(ZSTD(1)), ServiceName LowCardinality(String) CODEC(ZSTD(1)), ResourceAttributes Map(LowCardinality(String), String) CODEC(ZSTD(1)), ScopeName String CODEC(ZSTD(1)), ScopeVersion String CODEC(ZSTD(1)), SpanAttributes Map(LowCardinality(String), String) CODEC(ZSTD(1)), Duration Int64 CODEC(ZSTD(1)), StatusCode LowCardinality(String) CODEC(ZSTD(1)), StatusMessage String CODEC(ZSTD(1)), Events Nested ( Timestamp DateTime64(9), Name LowCardinality(String), Attributes Map(LowCardinality(String), String) ) CODEC(ZSTD(1)), Links Nested ( TraceId String, SpanId String, TraceState String, Attributes Map(LowCardinality(String), String) ) CODEC(ZSTD(1)), HttpCode Int, HttpMethod String CODEC(ZSTD(1)), HttpURL String CODEC(ZSTD(1)), ContainerId String CODEC(ZSTD(1)), srcIP String CODEC(ZSTD(1)), srcPort Int CODEC(ZSTD(1)), targetIP String CODEC(ZSTD(1)), targetPort Int CODEC(ZSTD(1)), rpcMethod String CODEC(ZSTD(1)), dbStatement String CODEC(ZSTD(1)), dbConnectionString String CODEC(ZSTD(1)), INDEX idx_trace_id TraceId TYPE bloom_filter(0.001) GRANULARITY 1, INDEX idx_res_attr_key mapKeys(ResourceAttributes) TYPE bloom_filter(0.01) GRANULARITY 1, INDEX idx_res_attr_value mapValues(ResourceAttributes) TYPE bloom_filter(0.01) GRANULARITY 1, INDEX idx_span_attr_key mapKeys(SpanAttributes) TYPE bloom_filter(0.01) GRANULARITY 1, INDEX idx_span_attr_value mapValues(SpanAttributes) TYPE bloom_filter(0.01) GRANULARITY 1, INDEX idx_duration Duration TYPE minmax GRANULARITY 1 ) ENGINE MergeTree() %s PARTITION BY toDate(Timestamp) ORDER BY (ServiceName, SpanName, toUnixTimestamp(Timestamp), TraceId) SETTINGS index_granularity=8192, ttl_only_drop_parts = 1; ` // language=ClickHouse SQL insertTracesSQLTemplate = `INSERT INTO %s ( Timestamp, TraceId, SpanId, ParentSpanId, TraceState, SpanName, SpanKind, ServiceName, ResourceAttributes, ScopeName, ScopeVersion, SpanAttributes, Duration, StatusCode, StatusMessage, Events.Timestamp, Events.Name, Events.Attributes, Links.TraceId, Links.SpanId, Links.TraceState, Links.Attributes, HttpCode, HttpMethod, HttpURL, ContainerId, srcIP, srcPort, targetIP, targetPort, rpcMethod , dbStatement , dbConnectionString, AppAlias ) VALUES ( ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? )` ) const ( createTraceIDTsTableSQL = ` create table IF NOT EXISTS %s_trace_id_ts ( TraceId String CODEC(ZSTD(1)), Start DateTime64(9) CODEC(Delta, ZSTD(1)), End DateTime64(9) CODEC(Delta, ZSTD(1)), INDEX idx_trace_id TraceId TYPE bloom_filter(0.01) GRANULARITY 1 ) ENGINE MergeTree() %s ORDER BY (TraceId, toUnixTimestamp(Start)) SETTINGS index_granularity=8192; ` createTraceIDTsMaterializedViewSQL = ` CREATE MATERIALIZED VIEW IF NOT EXISTS %s_trace_id_ts_mv TO %s.%s_trace_id_ts AS SELECT TraceId, min(Timestamp) as Start, max(Timestamp) as End FROM %s.%s WHERE TraceId!='' GROUP BY TraceId; ` ) func createTracesTable(ctx context.Context, cfg *Config, db *sql.DB) error { if _, err := db.ExecContext(ctx, renderCreateTracesTableSQL(cfg)); err != nil { return fmt.Errorf("exec create traces table sql: %w", err) } if _, err := db.ExecContext(ctx, renderCreateTraceIDTsTableSQL(cfg)); err != nil { return fmt.Errorf("exec create traceIDTs table sql: %w", err) } if _, err := db.ExecContext(ctx, renderTraceIDTsMaterializedViewSQL(cfg)); err != nil { return fmt.Errorf("exec create traceIDTs view sql: %w", err) } return nil } func renderInsertTracesSQL(cfg *Config) string { return fmt.Sprintf(strings.ReplaceAll(insertTracesSQLTemplate, "'", "`"), cfg.TracesTableName) } func renderCreateTracesTableSQL(cfg *Config) string { ttlExpr := generateTTLExpr(cfg.TTLDays, cfg.TTL) return fmt.Sprintf(createTracesTableSQL, cfg.TracesTableName, ttlExpr) } func renderCreateTraceIDTsTableSQL(cfg *Config) string { ttlExpr := generateTTLExpr(cfg.TTLDays, cfg.TTL) return fmt.Sprintf(createTraceIDTsTableSQL, cfg.TracesTableName, ttlExpr) } func renderTraceIDTsMaterializedViewSQL(cfg *Config) string { return fmt.Sprintf(createTraceIDTsMaterializedViewSQL, cfg.TracesTableName, cfg.Database, cfg.TracesTableName, cfg.Database, cfg.TracesTableName) }