factory.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. //go:generate mdatagen metadata.yaml
  4. package clickhouseexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/clickhouseexporter"
  5. import (
  6. "context"
  7. "fmt"
  8. "time"
  9. "go.opentelemetry.io/collector/component"
  10. "go.opentelemetry.io/collector/exporter"
  11. "go.opentelemetry.io/collector/exporter/exporterhelper"
  12. "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/clickhouseexporter/internal/metadata"
  13. )
  14. // NewFactory creates a factory for Elastic exporter.
  15. func NewFactory() exporter.Factory {
  16. return exporter.NewFactory(
  17. metadata.Type,
  18. createDefaultConfig,
  19. exporter.WithLogs(createLogsExporter, metadata.LogsStability),
  20. exporter.WithTraces(createTracesExporter, metadata.TracesStability),
  21. exporter.WithMetrics(createMetricExporter, metadata.MetricsStability),
  22. )
  23. }
  24. func createDefaultConfig() component.Config {
  25. queueSettings := exporterhelper.NewDefaultQueueSettings()
  26. queueSettings.NumConsumers = 1
  27. return &Config{
  28. TimeoutSettings: exporterhelper.NewDefaultTimeoutSettings(),
  29. QueueSettings: queueSettings,
  30. RetrySettings: exporterhelper.NewDefaultRetrySettings(),
  31. ConnectionParams: map[string]string{},
  32. Database: defaultDatabase,
  33. LogsTableName: "otel_logs",
  34. TracesTableName: "otel_traces",
  35. MetricsTableName: "otel_metrics",
  36. TTL: 0,
  37. }
  38. }
  39. // createLogsExporter creates a new exporter for logs.
  40. // Logs are directly insert into clickhouse.
  41. func createLogsExporter(
  42. ctx context.Context,
  43. set exporter.CreateSettings,
  44. cfg component.Config,
  45. ) (exporter.Logs, error) {
  46. c := cfg.(*Config)
  47. exporter, err := newLogsExporter(set.Logger, c)
  48. if err != nil {
  49. return nil, fmt.Errorf("cannot configure clickhouse logs exporter: %w", err)
  50. }
  51. return exporterhelper.NewLogsExporter(
  52. ctx,
  53. set,
  54. cfg,
  55. exporter.pushLogsData,
  56. exporterhelper.WithStart(exporter.start),
  57. exporterhelper.WithShutdown(exporter.shutdown),
  58. exporterhelper.WithTimeout(c.TimeoutSettings),
  59. exporterhelper.WithQueue(c.QueueSettings),
  60. exporterhelper.WithRetry(c.RetrySettings),
  61. )
  62. }
  63. // createTracesExporter creates a new exporter for traces.
  64. // Traces are directly insert into clickhouse.
  65. func createTracesExporter(
  66. ctx context.Context,
  67. set exporter.CreateSettings,
  68. cfg component.Config,
  69. ) (exporter.Traces, error) {
  70. c := cfg.(*Config)
  71. exporter, err := newTracesExporter(set.Logger, c)
  72. if err != nil {
  73. return nil, fmt.Errorf("cannot configure clickhouse traces exporter: %w", err)
  74. }
  75. return exporterhelper.NewTracesExporter(
  76. ctx,
  77. set,
  78. cfg,
  79. exporter.pushTraceData,
  80. exporterhelper.WithStart(exporter.start),
  81. exporterhelper.WithShutdown(exporter.shutdown),
  82. exporterhelper.WithTimeout(c.TimeoutSettings),
  83. exporterhelper.WithQueue(c.QueueSettings),
  84. exporterhelper.WithRetry(c.RetrySettings),
  85. )
  86. }
  87. func createMetricExporter(
  88. ctx context.Context,
  89. set exporter.CreateSettings,
  90. cfg component.Config,
  91. ) (exporter.Metrics, error) {
  92. c := cfg.(*Config)
  93. exporter, err := newMetricsExporter(set.Logger, c)
  94. if err != nil {
  95. return nil, fmt.Errorf("cannot configure clickhouse metrics exporter: %w", err)
  96. }
  97. return exporterhelper.NewMetricsExporter(
  98. ctx,
  99. set,
  100. cfg,
  101. exporter.pushMetricsData,
  102. exporterhelper.WithStart(exporter.start),
  103. exporterhelper.WithShutdown(exporter.shutdown),
  104. exporterhelper.WithTimeout(c.TimeoutSettings),
  105. exporterhelper.WithQueue(c.QueueSettings),
  106. exporterhelper.WithRetry(c.RetrySettings),
  107. )
  108. }
  109. func generateTTLExpr(ttlDays uint, ttl time.Duration) string {
  110. if ttlDays > 0 {
  111. return fmt.Sprintf(`TTL toDateTime(Timestamp) + toIntervalDay(%d)`, ttlDays)
  112. }
  113. if ttl > 0 {
  114. switch {
  115. case ttl%(24*time.Hour) == 0:
  116. return fmt.Sprintf(`TTL toDateTime(Timestamp) + toIntervalDay(%d)`, ttl/(24*time.Hour))
  117. case ttl%(time.Hour) == 0:
  118. return fmt.Sprintf(`TTL toDateTime(Timestamp) + toIntervalHour(%d)`, ttl/time.Hour)
  119. case ttl%(time.Minute) == 0:
  120. return fmt.Sprintf(`TTL toDateTime(Timestamp) + toIntervalMinute(%d)`, ttl/time.Minute)
  121. default:
  122. return fmt.Sprintf(`TTL toDateTime(Timestamp) + toIntervalSecond(%d)`, ttl/time.Second)
  123. }
  124. }
  125. return ""
  126. }