exporter.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package syslogexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/syslogexporter"
  4. import (
  5. "context"
  6. "crypto/tls"
  7. "fmt"
  8. "strings"
  9. "go.opentelemetry.io/collector/consumer/consumererror"
  10. "go.opentelemetry.io/collector/exporter"
  11. "go.opentelemetry.io/collector/exporter/exporterhelper"
  12. "go.opentelemetry.io/collector/pdata/plog"
  13. "go.uber.org/multierr"
  14. "go.uber.org/zap"
  15. )
  16. type syslogexporter struct {
  17. config *Config
  18. logger *zap.Logger
  19. tlsConfig *tls.Config
  20. formatter formatter
  21. }
  22. func initExporter(cfg *Config, createSettings exporter.CreateSettings) (*syslogexporter, error) {
  23. tlsConfig, err := cfg.TLSSetting.LoadTLSConfig()
  24. if err != nil {
  25. return nil, err
  26. }
  27. cfg.Network = strings.ToLower(cfg.Network)
  28. s := &syslogexporter{
  29. config: cfg,
  30. logger: createSettings.Logger,
  31. tlsConfig: tlsConfig,
  32. formatter: createFormatter(cfg.Protocol),
  33. }
  34. s.logger.Info("Syslog Exporter configured",
  35. zap.String("endpoint", cfg.Endpoint),
  36. zap.String("protocol", cfg.Protocol),
  37. zap.Int("port", cfg.Port),
  38. )
  39. return s, nil
  40. }
  41. func newLogsExporter(
  42. ctx context.Context,
  43. params exporter.CreateSettings,
  44. cfg *Config,
  45. ) (exporter.Logs, error) {
  46. s, err := initExporter(cfg, params)
  47. if err != nil {
  48. return nil, fmt.Errorf("failed to initialize the logs exporter: %w", err)
  49. }
  50. return exporterhelper.NewLogsExporter(
  51. ctx,
  52. params,
  53. cfg,
  54. s.pushLogsData,
  55. exporterhelper.WithTimeout(cfg.TimeoutSettings),
  56. exporterhelper.WithRetry(cfg.RetrySettings),
  57. exporterhelper.WithQueue(cfg.QueueSettings),
  58. )
  59. }
  60. func (se *syslogexporter) pushLogsData(_ context.Context, logs plog.Logs) error {
  61. batchMessages := strings.ToLower(se.config.Network) == "tcp"
  62. var err error
  63. if batchMessages {
  64. err = se.exportBatch(logs)
  65. } else {
  66. err = se.exportNonBatch(logs)
  67. }
  68. return err
  69. }
  70. func (se *syslogexporter) exportBatch(logs plog.Logs) error {
  71. var payload strings.Builder
  72. for i := 0; i < logs.ResourceLogs().Len(); i++ {
  73. resourceLogs := logs.ResourceLogs().At(i)
  74. for j := 0; j < resourceLogs.ScopeLogs().Len(); j++ {
  75. scopeLogs := resourceLogs.ScopeLogs().At(j)
  76. for k := 0; k < scopeLogs.LogRecords().Len(); k++ {
  77. logRecord := scopeLogs.LogRecords().At(k)
  78. formatted := se.formatter.format(logRecord)
  79. payload.WriteString(formatted)
  80. }
  81. }
  82. }
  83. if payload.Len() > 0 {
  84. sender, err := connect(se.logger, se.config, se.tlsConfig)
  85. if err != nil {
  86. return consumererror.NewLogs(err, logs)
  87. }
  88. defer sender.close()
  89. err = sender.Write(payload.String())
  90. if err != nil {
  91. return consumererror.NewLogs(err, logs)
  92. }
  93. }
  94. return nil
  95. }
  96. func (se *syslogexporter) exportNonBatch(logs plog.Logs) error {
  97. sender, err := connect(se.logger, se.config, se.tlsConfig)
  98. if err != nil {
  99. return consumererror.NewLogs(err, logs)
  100. }
  101. defer sender.close()
  102. errs := []error{}
  103. droppedLogs := plog.NewLogs()
  104. for i := 0; i < logs.ResourceLogs().Len(); i++ {
  105. resourceLogs := logs.ResourceLogs().At(i)
  106. droppedResourceLogs := droppedLogs.ResourceLogs().AppendEmpty()
  107. for j := 0; j < resourceLogs.ScopeLogs().Len(); j++ {
  108. scopeLogs := resourceLogs.ScopeLogs().At(j)
  109. droppedScopeLogs := droppedResourceLogs.ScopeLogs().AppendEmpty()
  110. for k := 0; k < scopeLogs.LogRecords().Len(); k++ {
  111. logRecord := scopeLogs.LogRecords().At(k)
  112. formatted := se.formatter.format(logRecord)
  113. err = sender.Write(formatted)
  114. if err != nil {
  115. errs = append(errs, err)
  116. droppedLogRecord := droppedScopeLogs.LogRecords().AppendEmpty()
  117. logRecord.CopyTo(droppedLogRecord)
  118. }
  119. }
  120. }
  121. }
  122. if len(errs) > 0 {
  123. errs = deduplicateErrors(errs)
  124. return consumererror.NewLogs(multierr.Combine(errs...), droppedLogs)
  125. }
  126. return nil
  127. }