123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125 |
- // Copyright The OpenTelemetry Authors
- // SPDX-License-Identifier: Apache-2.0
- package cassandraexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/cassandraexporter"
- import (
- "context"
- "encoding/json"
- "fmt"
- "time"
- "github.com/gocql/gocql"
- "go.opentelemetry.io/collector/component"
- "go.opentelemetry.io/collector/pdata/plog"
- "go.uber.org/zap"
- "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/traceutil"
- )
- type logsExporter struct {
- client *gocql.Session
- logger *zap.Logger
- cfg *Config
- }
- func newLogsExporter(logger *zap.Logger, cfg *Config) (*logsExporter, error) {
- cluster := gocql.NewCluster(cfg.DSN)
- session, err := cluster.CreateSession()
- cluster.Keyspace = cfg.Keyspace
- cluster.Consistency = gocql.Quorum
- cluster.Port = cfg.Port
- cluster.Timeout = cfg.Timeout
- if err != nil {
- return nil, err
- }
- return &logsExporter{logger: logger, client: session, cfg: cfg}, nil
- }
- func initializeLogKernel(cfg *Config) error {
- ctx := context.Background()
- cluster := gocql.NewCluster(cfg.DSN)
- cluster.Consistency = gocql.Quorum
- cluster.Port = cfg.Port
- cluster.Timeout = cfg.Timeout
- session, err := cluster.CreateSession()
- if err != nil {
- return err
- }
- defer session.Close()
- createDatabaseError := session.Query(parseCreateDatabaseSQL(cfg)).WithContext(ctx).Exec()
- if createDatabaseError != nil {
- return createDatabaseError
- }
- createLogTableError := session.Query(parseCreateLogTableSQL(cfg)).WithContext(ctx).Exec()
- if createLogTableError != nil {
- return createLogTableError
- }
- return nil
- }
- func (e *logsExporter) Start(_ context.Context, _ component.Host) error {
- initializeErr := initializeLogKernel(e.cfg)
- return initializeErr
- }
- func (e *logsExporter) Shutdown(_ context.Context) error {
- if e.client != nil {
- e.client.Close()
- }
- return nil
- }
- func parseCreateLogTableSQL(cfg *Config) string {
- return fmt.Sprintf(createLogTableSQL, cfg.Keyspace, cfg.LogsTable, cfg.Compression.Algorithm)
- }
- func (e *logsExporter) pushLogsData(ctx context.Context, ld plog.Logs) error {
- start := time.Now()
- for i := 0; i < ld.ResourceLogs().Len(); i++ {
- logs := ld.ResourceLogs().At(i)
- res := logs.Resource()
- resAttr := attributesToMap(res.Attributes().AsRaw())
- for j := 0; j < logs.ScopeLogs().Len(); j++ {
- rs := logs.ScopeLogs().At(j).LogRecords()
- for k := 0; k < rs.Len(); k++ {
- r := rs.At(k)
- logAttr := attributesToMap(r.Attributes().AsRaw())
- bodyByte, err := json.Marshal(r.Body().AsRaw())
- if err != nil {
- return err
- }
- insertLogError := e.client.Query(fmt.Sprintf(insertLogTableSQL, e.cfg.Keyspace, e.cfg.LogsTable),
- r.Timestamp().AsTime(),
- traceutil.TraceIDToHexOrEmptyString(r.TraceID()),
- traceutil.SpanIDToHexOrEmptyString(r.SpanID()),
- uint32(r.Flags()),
- r.SeverityText(),
- int32(r.SeverityNumber()),
- string(bodyByte),
- resAttr,
- logAttr,
- ).WithContext(ctx).Exec()
- if insertLogError != nil {
- e.logger.Error("insert log error", zap.Error(insertLogError))
- }
- }
- }
- }
- duration := time.Since(start)
- e.logger.Debug("insert logs", zap.Int("records", ld.LogRecordCount()),
- zap.String("cost", duration.String()))
- return nil
- }
|