exporter_traces.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package clickhouseexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/clickhouseexporter"
  4. import (
  5. "context"
  6. "database/sql"
  7. "fmt"
  8. "regexp"
  9. "strconv"
  10. "strings"
  11. "time"
  12. _ "github.com/ClickHouse/clickhouse-go/v2" // For register database driver.
  13. "go.opentelemetry.io/collector/component"
  14. "go.opentelemetry.io/collector/pdata/ptrace"
  15. conventions "go.opentelemetry.io/collector/semconv/v1.18.0"
  16. "go.uber.org/zap"
  17. "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/traceutil"
  18. )
  19. type tracesExporter struct {
  20. client *sql.DB
  21. insertSQL string
  22. logger *zap.Logger
  23. cfg *Config
  24. }
  25. func newTracesExporter(logger *zap.Logger, cfg *Config) (*tracesExporter, error) {
  26. client, err := newClickhouseClient(cfg)
  27. if err != nil {
  28. return nil, err
  29. }
  30. return &tracesExporter{
  31. client: client,
  32. insertSQL: renderInsertTracesSQL(cfg),
  33. logger: logger,
  34. cfg: cfg,
  35. }, nil
  36. }
  37. func (e *tracesExporter) start(ctx context.Context, _ component.Host) error {
  38. //if err := createDatabase(ctx, e.cfg); err != nil {
  39. // return err
  40. //}
  41. //if err := createTracesTable(ctx, e.cfg, e.client); err != nil {
  42. // return err
  43. //}
  44. return nil
  45. }
  46. // shutdown will shut down the exporter.
  47. func (e *tracesExporter) shutdown(_ context.Context) error {
  48. if e.client != nil {
  49. return e.client.Close()
  50. }
  51. return nil
  52. }
  53. func (e *tracesExporter) pushTraceData(ctx context.Context, td ptrace.Traces) error {
  54. start := time.Now()
  55. err := doWithTx(ctx, e.client, func(tx *sql.Tx) error {
  56. statement, err := tx.PrepareContext(ctx, e.insertSQL)
  57. if err != nil {
  58. return fmt.Errorf("PrepareContext:%w", err)
  59. }
  60. defer func() {
  61. _ = statement.Close()
  62. }()
  63. for i := 0; i < td.ResourceSpans().Len(); i++ {
  64. spans := td.ResourceSpans().At(i)
  65. res := spans.Resource()
  66. resAttr := attributesToMap(res.Attributes())
  67. var serviceName string
  68. if v, ok := res.Attributes().Get(conventions.AttributeServiceName); ok {
  69. serviceName = v.Str()
  70. }
  71. for j := 0; j < spans.ScopeSpans().Len(); j++ {
  72. rs := spans.ScopeSpans().At(j).Spans()
  73. scopeName := spans.ScopeSpans().At(j).Scope().Name()
  74. scopeVersion := spans.ScopeSpans().At(j).Scope().Version()
  75. for k := 0; k < rs.Len(); k++ {
  76. r := rs.At(k)
  77. spanAttr := attributesToMap(r.Attributes())
  78. if spans.ScopeSpans().At(j).Scope().Name() != "" {
  79. spanAttr[conventions.AttributeOtelScopeName] = spans.ScopeSpans().At(j).Scope().Name()
  80. }
  81. if spans.ScopeSpans().At(j).Scope().Version() != "" {
  82. spanAttr[conventions.AttributeOtelScopeVersion] = spans.ScopeSpans().At(j).Scope().Version()
  83. }
  84. status := r.Status()
  85. eventTimes, eventNames, eventAttrs := convertEvents(r.Events())
  86. linksTraceIDs, linksSpanIDs, linksTraceStates, linksAttrs := convertLinks(r.Links())
  87. // rpc.method
  88. // http.status_code/http.method/http.url
  89. // src ip, src port, net.peer.name, net.peer.port
  90. // db.statement, db.connection_string
  91. // container.id
  92. appAlias := getStringFromMapWithDefault(resAttr, "process.command_line", "UNSET")
  93. if appAlias != "UNSET" {
  94. pattern := `-DAPP_NAME=(\w+)`
  95. match := regexp.MustCompile(pattern).FindStringSubmatch(appAlias)
  96. if len(match) > 1 {
  97. appAlias = match[1]
  98. } else {
  99. appAlias = "UNSET"
  100. }
  101. }
  102. httpCode := getIntFromMapWithDefault(spanAttr, "http.status_code", -1)
  103. httpMethod := getStringFromMapWithDefault(spanAttr, "http.method", "UNSET")
  104. var httpURL string
  105. if r.Kind() == ptrace.SpanKindServer {
  106. httpURL = getStringFromMapWithDefault(spanAttr, "http.route", "UNSET")
  107. } else {
  108. httpURL = getStringFromMapWithDefault(spanAttr, "http.url", "UNSET")
  109. }
  110. rpcMethod := getStringFromMapWithDefault(spanAttr, "rpc.method", "UNSET")
  111. srcIP := "UNSET"
  112. var srcPort int32 = -1
  113. targetIP, targetPort := getStringFromMapWithDefault(spanAttr, "net.peer.name", "UNSET"),
  114. getIntFromMapWithDefault(spanAttr, "net.peer.port", -1)
  115. dbStatement, dbConnectionString := getStringFromMapWithDefault(spanAttr, "db.statement", "UNSET"),
  116. getStringFromMapWithDefault(spanAttr, "db.connection_string", "UNSET")
  117. containerID := getStringFromMapWithDefault(resAttr, "container.id", "UNSET")
  118. _, err = statement.ExecContext(ctx,
  119. r.StartTimestamp().AsTime(),
  120. traceutil.TraceIDToHexOrEmptyString(r.TraceID()),
  121. traceutil.SpanIDToHexOrEmptyString(r.SpanID()),
  122. traceutil.SpanIDToHexOrEmptyString(r.ParentSpanID()),
  123. r.TraceState().AsRaw(),
  124. r.Name(),
  125. traceutil.SpanKindStr(r.Kind()),
  126. serviceName,
  127. resAttr,
  128. scopeName,
  129. scopeVersion,
  130. spanAttr,
  131. r.EndTimestamp().AsTime().Sub(r.StartTimestamp().AsTime()).Nanoseconds(),
  132. traceutil.StatusCodeStr(status.Code()),
  133. status.Message(),
  134. eventTimes,
  135. eventNames,
  136. eventAttrs,
  137. linksTraceIDs,
  138. linksSpanIDs,
  139. linksTraceStates,
  140. linksAttrs,
  141. httpCode, httpMethod, httpURL, containerID,
  142. srcIP, srcPort, targetIP, targetPort,
  143. rpcMethod, dbStatement, dbConnectionString, appAlias,
  144. )
  145. if err != nil {
  146. return fmt.Errorf("ExecContext:%w, sql:%s", err, e.insertSQL)
  147. }
  148. }
  149. }
  150. }
  151. return nil
  152. })
  153. duration := time.Since(start)
  154. e.logger.Info("insert traces", zap.Int("records", td.SpanCount()),
  155. zap.String("cost", duration.String()))
  156. return err
  157. }
  158. func getIntFromMapWithDefault(m map[string]string, key string, de int32) int32 {
  159. s := getStringFromMapWithDefault(m, key, "")
  160. i, err := strconv.ParseInt(s, 10, 64)
  161. if err != nil {
  162. return de
  163. }
  164. return int32(i)
  165. }
  166. func getStringFromMapWithDefault(m map[string]string, key string, de string) string {
  167. value, find := m[key]
  168. if find {
  169. return value
  170. }
  171. return de
  172. }
  173. func convertEvents(events ptrace.SpanEventSlice) ([]time.Time, []string, []map[string]string) {
  174. var (
  175. times []time.Time
  176. names []string
  177. attrs []map[string]string
  178. )
  179. for i := 0; i < events.Len(); i++ {
  180. event := events.At(i)
  181. times = append(times, event.Timestamp().AsTime())
  182. names = append(names, event.Name())
  183. attrs = append(attrs, attributesToMap(event.Attributes()))
  184. }
  185. return times, names, attrs
  186. }
  187. func convertLinks(links ptrace.SpanLinkSlice) ([]string, []string, []string, []map[string]string) {
  188. var (
  189. traceIDs []string
  190. spanIDs []string
  191. states []string
  192. attrs []map[string]string
  193. )
  194. for i := 0; i < links.Len(); i++ {
  195. link := links.At(i)
  196. traceIDs = append(traceIDs, traceutil.TraceIDToHexOrEmptyString(link.TraceID()))
  197. spanIDs = append(spanIDs, traceutil.SpanIDToHexOrEmptyString(link.SpanID()))
  198. states = append(states, link.TraceState().AsRaw())
  199. attrs = append(attrs, attributesToMap(link.Attributes()))
  200. }
  201. return traceIDs, spanIDs, states, attrs
  202. }
  203. const (
  204. // language=ClickHouse SQL
  205. createTracesTableSQL = `
  206. CREATE TABLE IF NOT EXISTS %s (
  207. Timestamp DateTime64(9) CODEC(Delta, ZSTD(1)),
  208. TraceId String CODEC(ZSTD(1)),
  209. SpanId String CODEC(ZSTD(1)),
  210. ParentSpanId String CODEC(ZSTD(1)),
  211. TraceState String CODEC(ZSTD(1)),
  212. SpanName LowCardinality(String) CODEC(ZSTD(1)),
  213. SpanKind LowCardinality(String) CODEC(ZSTD(1)),
  214. ServiceName LowCardinality(String) CODEC(ZSTD(1)),
  215. ResourceAttributes Map(LowCardinality(String), String) CODEC(ZSTD(1)),
  216. ScopeName String CODEC(ZSTD(1)),
  217. ScopeVersion String CODEC(ZSTD(1)),
  218. SpanAttributes Map(LowCardinality(String), String) CODEC(ZSTD(1)),
  219. Duration Int64 CODEC(ZSTD(1)),
  220. StatusCode LowCardinality(String) CODEC(ZSTD(1)),
  221. StatusMessage String CODEC(ZSTD(1)),
  222. Events Nested (
  223. Timestamp DateTime64(9),
  224. Name LowCardinality(String),
  225. Attributes Map(LowCardinality(String), String)
  226. ) CODEC(ZSTD(1)),
  227. Links Nested (
  228. TraceId String,
  229. SpanId String,
  230. TraceState String,
  231. Attributes Map(LowCardinality(String), String)
  232. ) CODEC(ZSTD(1)),
  233. HttpCode Int,
  234. HttpMethod String CODEC(ZSTD(1)),
  235. HttpURL String CODEC(ZSTD(1)),
  236. ContainerId String CODEC(ZSTD(1)),
  237. srcIP String CODEC(ZSTD(1)),
  238. srcPort Int CODEC(ZSTD(1)),
  239. targetIP String CODEC(ZSTD(1)),
  240. targetPort Int CODEC(ZSTD(1)),
  241. rpcMethod String CODEC(ZSTD(1)),
  242. dbStatement String CODEC(ZSTD(1)),
  243. dbConnectionString String CODEC(ZSTD(1)),
  244. INDEX idx_trace_id TraceId TYPE bloom_filter(0.001) GRANULARITY 1,
  245. INDEX idx_res_attr_key mapKeys(ResourceAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
  246. INDEX idx_res_attr_value mapValues(ResourceAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
  247. INDEX idx_span_attr_key mapKeys(SpanAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
  248. INDEX idx_span_attr_value mapValues(SpanAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
  249. INDEX idx_duration Duration TYPE minmax GRANULARITY 1
  250. ) ENGINE MergeTree()
  251. %s
  252. PARTITION BY toDate(Timestamp)
  253. ORDER BY (ServiceName, SpanName, toUnixTimestamp(Timestamp), TraceId)
  254. SETTINGS index_granularity=8192, ttl_only_drop_parts = 1;
  255. `
  256. // language=ClickHouse SQL
  257. insertTracesSQLTemplate = `INSERT INTO %s (
  258. Timestamp,
  259. TraceId,
  260. SpanId,
  261. ParentSpanId,
  262. TraceState,
  263. SpanName,
  264. SpanKind,
  265. ServiceName,
  266. ResourceAttributes,
  267. ScopeName,
  268. ScopeVersion,
  269. SpanAttributes,
  270. Duration,
  271. StatusCode,
  272. StatusMessage,
  273. Events.Timestamp,
  274. Events.Name,
  275. Events.Attributes,
  276. Links.TraceId,
  277. Links.SpanId,
  278. Links.TraceState,
  279. Links.Attributes,
  280. HttpCode, HttpMethod, HttpURL, ContainerId,
  281. srcIP, srcPort, targetIP, targetPort,
  282. rpcMethod , dbStatement , dbConnectionString, AppAlias
  283. ) VALUES (
  284. ?, ?, ?, ?, ?,
  285. ?, ?, ?, ?, ?,
  286. ?, ?, ?, ?, ?,
  287. ?, ?, ?, ?, ?,
  288. ?, ?, ?, ?, ?,
  289. ?, ?, ?, ?, ?,
  290. ?, ?, ?, ?
  291. )`
  292. )
  293. const (
  294. createTraceIDTsTableSQL = `
  295. create table IF NOT EXISTS %s_trace_id_ts (
  296. TraceId String CODEC(ZSTD(1)),
  297. Start DateTime64(9) CODEC(Delta, ZSTD(1)),
  298. End DateTime64(9) CODEC(Delta, ZSTD(1)),
  299. INDEX idx_trace_id TraceId TYPE bloom_filter(0.01) GRANULARITY 1
  300. ) ENGINE MergeTree()
  301. %s
  302. ORDER BY (TraceId, toUnixTimestamp(Start))
  303. SETTINGS index_granularity=8192;
  304. `
  305. createTraceIDTsMaterializedViewSQL = `
  306. CREATE MATERIALIZED VIEW IF NOT EXISTS %s_trace_id_ts_mv
  307. TO %s.%s_trace_id_ts
  308. AS SELECT
  309. TraceId,
  310. min(Timestamp) as Start,
  311. max(Timestamp) as End
  312. FROM
  313. %s.%s
  314. WHERE TraceId!=''
  315. GROUP BY TraceId;
  316. `
  317. )
  318. func createTracesTable(ctx context.Context, cfg *Config, db *sql.DB) error {
  319. if _, err := db.ExecContext(ctx, renderCreateTracesTableSQL(cfg)); err != nil {
  320. return fmt.Errorf("exec create traces table sql: %w", err)
  321. }
  322. if _, err := db.ExecContext(ctx, renderCreateTraceIDTsTableSQL(cfg)); err != nil {
  323. return fmt.Errorf("exec create traceIDTs table sql: %w", err)
  324. }
  325. if _, err := db.ExecContext(ctx, renderTraceIDTsMaterializedViewSQL(cfg)); err != nil {
  326. return fmt.Errorf("exec create traceIDTs view sql: %w", err)
  327. }
  328. return nil
  329. }
  330. func renderInsertTracesSQL(cfg *Config) string {
  331. return fmt.Sprintf(strings.ReplaceAll(insertTracesSQLTemplate, "'", "`"), cfg.TracesTableName)
  332. }
  333. func renderCreateTracesTableSQL(cfg *Config) string {
  334. ttlExpr := generateTTLExpr(cfg.TTLDays, cfg.TTL)
  335. return fmt.Sprintf(createTracesTableSQL, cfg.TracesTableName, ttlExpr)
  336. }
  337. func renderCreateTraceIDTsTableSQL(cfg *Config) string {
  338. ttlExpr := generateTTLExpr(cfg.TTLDays, cfg.TTL)
  339. return fmt.Sprintf(createTraceIDTsTableSQL, cfg.TracesTableName, ttlExpr)
  340. }
  341. func renderTraceIDTsMaterializedViewSQL(cfg *Config) string {
  342. return fmt.Sprintf(createTraceIDTsMaterializedViewSQL, cfg.TracesTableName,
  343. cfg.Database, cfg.TracesTableName, cfg.Database, cfg.TracesTableName)
  344. }