exporter_logs.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package cassandraexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/cassandraexporter"
  4. import (
  5. "context"
  6. "encoding/json"
  7. "fmt"
  8. "time"
  9. "github.com/gocql/gocql"
  10. "go.opentelemetry.io/collector/component"
  11. "go.opentelemetry.io/collector/pdata/plog"
  12. "go.uber.org/zap"
  13. "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/traceutil"
  14. )
  15. type logsExporter struct {
  16. client *gocql.Session
  17. logger *zap.Logger
  18. cfg *Config
  19. }
  20. func newLogsExporter(logger *zap.Logger, cfg *Config) (*logsExporter, error) {
  21. cluster := gocql.NewCluster(cfg.DSN)
  22. session, err := cluster.CreateSession()
  23. cluster.Keyspace = cfg.Keyspace
  24. cluster.Consistency = gocql.Quorum
  25. cluster.Port = cfg.Port
  26. cluster.Timeout = cfg.Timeout
  27. if err != nil {
  28. return nil, err
  29. }
  30. return &logsExporter{logger: logger, client: session, cfg: cfg}, nil
  31. }
  32. func initializeLogKernel(cfg *Config) error {
  33. ctx := context.Background()
  34. cluster := gocql.NewCluster(cfg.DSN)
  35. cluster.Consistency = gocql.Quorum
  36. cluster.Port = cfg.Port
  37. cluster.Timeout = cfg.Timeout
  38. session, err := cluster.CreateSession()
  39. if err != nil {
  40. return err
  41. }
  42. defer session.Close()
  43. createDatabaseError := session.Query(parseCreateDatabaseSQL(cfg)).WithContext(ctx).Exec()
  44. if createDatabaseError != nil {
  45. return createDatabaseError
  46. }
  47. createLogTableError := session.Query(parseCreateLogTableSQL(cfg)).WithContext(ctx).Exec()
  48. if createLogTableError != nil {
  49. return createLogTableError
  50. }
  51. return nil
  52. }
  53. func (e *logsExporter) Start(_ context.Context, _ component.Host) error {
  54. initializeErr := initializeLogKernel(e.cfg)
  55. return initializeErr
  56. }
  57. func (e *logsExporter) Shutdown(_ context.Context) error {
  58. if e.client != nil {
  59. e.client.Close()
  60. }
  61. return nil
  62. }
  63. func parseCreateLogTableSQL(cfg *Config) string {
  64. return fmt.Sprintf(createLogTableSQL, cfg.Keyspace, cfg.LogsTable, cfg.Compression.Algorithm)
  65. }
  66. func (e *logsExporter) pushLogsData(ctx context.Context, ld plog.Logs) error {
  67. start := time.Now()
  68. for i := 0; i < ld.ResourceLogs().Len(); i++ {
  69. logs := ld.ResourceLogs().At(i)
  70. res := logs.Resource()
  71. resAttr := attributesToMap(res.Attributes().AsRaw())
  72. for j := 0; j < logs.ScopeLogs().Len(); j++ {
  73. rs := logs.ScopeLogs().At(j).LogRecords()
  74. for k := 0; k < rs.Len(); k++ {
  75. r := rs.At(k)
  76. logAttr := attributesToMap(r.Attributes().AsRaw())
  77. bodyByte, err := json.Marshal(r.Body().AsRaw())
  78. if err != nil {
  79. return err
  80. }
  81. insertLogError := e.client.Query(fmt.Sprintf(insertLogTableSQL, e.cfg.Keyspace, e.cfg.LogsTable),
  82. r.Timestamp().AsTime(),
  83. traceutil.TraceIDToHexOrEmptyString(r.TraceID()),
  84. traceutil.SpanIDToHexOrEmptyString(r.SpanID()),
  85. uint32(r.Flags()),
  86. r.SeverityText(),
  87. int32(r.SeverityNumber()),
  88. string(bodyByte),
  89. resAttr,
  90. logAttr,
  91. ).WithContext(ctx).Exec()
  92. if insertLogError != nil {
  93. e.logger.Error("insert log error", zap.Error(insertLogError))
  94. }
  95. }
  96. }
  97. }
  98. duration := time.Since(start)
  99. e.logger.Debug("insert logs", zap.Int("records", ld.LogRecordCount()),
  100. zap.String("cost", duration.String()))
  101. return nil
  102. }