sso_log_exporter.go 2.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package opensearchexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/opensearchexporter"
  4. import (
  5. "context"
  6. "strings"
  7. "github.com/opensearch-project/opensearch-go/v2"
  8. "go.opentelemetry.io/collector/component"
  9. "go.opentelemetry.io/collector/config/confighttp"
  10. "go.opentelemetry.io/collector/exporter"
  11. "go.opentelemetry.io/collector/pdata/plog"
  12. )
  13. type logExporter struct {
  14. client *opensearch.Client
  15. Index string
  16. bulkAction string
  17. model mappingModel
  18. httpSettings confighttp.HTTPClientSettings
  19. telemetry component.TelemetrySettings
  20. }
  21. func newLogExporter(cfg *Config, set exporter.CreateSettings) (*logExporter, error) {
  22. if err := cfg.Validate(); err != nil {
  23. return nil, err
  24. }
  25. model := &encodeModel{
  26. dedup: cfg.Dedup,
  27. dedot: cfg.Dedot,
  28. sso: cfg.MappingsSettings.Mode == MappingSS4O.String(),
  29. flattenAttributes: cfg.MappingsSettings.Mode == MappingFlattenAttributes.String(),
  30. timestampField: cfg.MappingsSettings.TimestampField,
  31. unixTime: cfg.MappingsSettings.UnixTimestamp,
  32. dataset: cfg.Dataset,
  33. namespace: cfg.Namespace,
  34. }
  35. return &logExporter{
  36. telemetry: set.TelemetrySettings,
  37. Index: getIndexName(cfg.Dataset, cfg.Namespace, cfg.LogsIndex),
  38. bulkAction: cfg.BulkAction,
  39. httpSettings: cfg.HTTPClientSettings,
  40. model: model,
  41. }, nil
  42. }
  43. func (l *logExporter) Start(_ context.Context, host component.Host) error {
  44. httpClient, err := l.httpSettings.ToClient(host, l.telemetry)
  45. if err != nil {
  46. return err
  47. }
  48. client, err := newOpenSearchClient(l.httpSettings.Endpoint, httpClient, l.telemetry.Logger)
  49. if err != nil {
  50. return err
  51. }
  52. l.client = client
  53. return nil
  54. }
  55. func (l *logExporter) pushLogData(ctx context.Context, ld plog.Logs) error {
  56. indexer := newLogBulkIndexer(l.Index, l.bulkAction, l.model)
  57. startErr := indexer.start(l.client)
  58. if startErr != nil {
  59. return startErr
  60. }
  61. indexer.submit(ctx, ld)
  62. indexer.close(ctx)
  63. return indexer.joinedError()
  64. }
  65. func getIndexName(dataset, namespace, index string) string {
  66. if len(index) != 0 {
  67. return index
  68. }
  69. return strings.Join([]string{"ss4o_logs", dataset, namespace}, "-")
  70. }