emf_exporter.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package awsemfexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awsemfexporter"
  4. import (
  5. "context"
  6. "errors"
  7. "fmt"
  8. "strings"
  9. "sync"
  10. "github.com/aws/aws-sdk-go/aws/awserr"
  11. "github.com/google/uuid"
  12. "go.opentelemetry.io/collector/consumer/consumererror"
  13. "go.opentelemetry.io/collector/exporter"
  14. "go.opentelemetry.io/collector/pdata/pcommon"
  15. "go.opentelemetry.io/collector/pdata/pmetric"
  16. "go.uber.org/zap"
  17. "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/awsutil"
  18. "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/cwlogs"
  19. )
  20. const (
  21. // OutputDestination Options
  22. outputDestinationCloudWatch = "cloudwatch"
  23. outputDestinationStdout = "stdout"
  24. )
  25. type emfExporter struct {
  26. pusherMap map[cwlogs.StreamKey]cwlogs.Pusher
  27. svcStructuredLog *cwlogs.Client
  28. config *Config
  29. metricTranslator metricTranslator
  30. pusherMapLock sync.Mutex
  31. retryCnt int
  32. collectorID string
  33. }
  34. // newEmfExporter creates a new exporter using exporterhelper
  35. func newEmfExporter(config *Config, set exporter.CreateSettings) (*emfExporter, error) {
  36. if config == nil {
  37. return nil, errors.New("emf exporter config is nil")
  38. }
  39. config.logger = set.Logger
  40. // create AWS session
  41. awsConfig, session, err := awsutil.GetAWSConfigSession(set.Logger, &awsutil.Conn{}, &config.AWSSessionSettings)
  42. if err != nil {
  43. return nil, err
  44. }
  45. // create CWLogs client with aws session config
  46. svcStructuredLog := cwlogs.NewClient(set.Logger, awsConfig, set.BuildInfo, config.LogGroupName, config.LogRetention, config.Tags, session)
  47. collectorIdentifier, err := uuid.NewRandom()
  48. if err != nil {
  49. return nil, err
  50. }
  51. emfExporter := &emfExporter{
  52. svcStructuredLog: svcStructuredLog,
  53. config: config,
  54. metricTranslator: newMetricTranslator(*config),
  55. retryCnt: *awsConfig.MaxRetries,
  56. collectorID: collectorIdentifier.String(),
  57. pusherMap: map[cwlogs.StreamKey]cwlogs.Pusher{},
  58. }
  59. config.logger.Warn("the default value for DimensionRollupOption will be changing to NoDimensionRollup" +
  60. "in a future release. See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/23997 for more" +
  61. "information")
  62. return emfExporter, nil
  63. }
  64. func (emf *emfExporter) pushMetricsData(_ context.Context, md pmetric.Metrics) error {
  65. rms := md.ResourceMetrics()
  66. labels := map[string]string{}
  67. for i := 0; i < rms.Len(); i++ {
  68. rm := rms.At(i)
  69. am := rm.Resource().Attributes()
  70. if am.Len() > 0 {
  71. am.Range(func(k string, v pcommon.Value) bool {
  72. labels[k] = v.Str()
  73. return true
  74. })
  75. }
  76. }
  77. emf.config.logger.Debug("Start processing resource metrics", zap.Any("labels", labels))
  78. groupedMetrics := make(map[any]*groupedMetric)
  79. defaultLogStream := fmt.Sprintf("otel-stream-%s", emf.collectorID)
  80. outputDestination := emf.config.OutputDestination
  81. for i := 0; i < rms.Len(); i++ {
  82. err := emf.metricTranslator.translateOTelToGroupedMetric(rms.At(i), groupedMetrics, emf.config)
  83. if err != nil {
  84. return err
  85. }
  86. }
  87. for _, groupedMetric := range groupedMetrics {
  88. putLogEvent, err := translateGroupedMetricToEmf(groupedMetric, emf.config, defaultLogStream)
  89. if err != nil {
  90. return err
  91. }
  92. // Currently we only support two options for "OutputDestination".
  93. if strings.EqualFold(outputDestination, outputDestinationStdout) {
  94. if putLogEvent != nil &&
  95. putLogEvent.InputLogEvent != nil &&
  96. putLogEvent.InputLogEvent.Message != nil {
  97. fmt.Println(*putLogEvent.InputLogEvent.Message)
  98. }
  99. } else if strings.EqualFold(outputDestination, outputDestinationCloudWatch) {
  100. emfPusher := emf.getPusher(putLogEvent.StreamKey)
  101. if emfPusher != nil {
  102. returnError := emfPusher.AddLogEntry(putLogEvent)
  103. if returnError != nil {
  104. return wrapErrorIfBadRequest(returnError)
  105. }
  106. }
  107. }
  108. }
  109. if strings.EqualFold(outputDestination, outputDestinationCloudWatch) {
  110. for _, emfPusher := range emf.listPushers() {
  111. returnError := emfPusher.ForceFlush()
  112. if returnError != nil {
  113. // TODO now we only have one logPusher, so it's ok to return after first error occurred
  114. err := wrapErrorIfBadRequest(returnError)
  115. if err != nil {
  116. emf.config.logger.Error("Error force flushing logs. Skipping to next logPusher.", zap.Error(err))
  117. }
  118. return err
  119. }
  120. }
  121. }
  122. emf.config.logger.Debug("Finish processing resource metrics", zap.Any("labels", labels))
  123. return nil
  124. }
  125. func (emf *emfExporter) getPusher(key cwlogs.StreamKey) cwlogs.Pusher {
  126. var ok bool
  127. if _, ok = emf.pusherMap[key]; !ok {
  128. emf.pusherMap[key] = cwlogs.NewPusher(key, emf.retryCnt, *emf.svcStructuredLog, emf.config.logger)
  129. }
  130. return emf.pusherMap[key]
  131. }
  132. func (emf *emfExporter) listPushers() []cwlogs.Pusher {
  133. emf.pusherMapLock.Lock()
  134. defer emf.pusherMapLock.Unlock()
  135. var pushers []cwlogs.Pusher
  136. for _, pusher := range emf.pusherMap {
  137. pushers = append(pushers, pusher)
  138. }
  139. return pushers
  140. }
  141. // shutdown stops the exporter and is invoked during shutdown.
  142. func (emf *emfExporter) shutdown(_ context.Context) error {
  143. for _, emfPusher := range emf.listPushers() {
  144. returnError := emfPusher.ForceFlush()
  145. if returnError != nil {
  146. err := wrapErrorIfBadRequest(returnError)
  147. if err != nil {
  148. emf.config.logger.Error("Error when gracefully shutting down emf_exporter. Skipping to next logPusher.", zap.Error(err))
  149. }
  150. }
  151. }
  152. return emf.metricTranslator.Shutdown()
  153. }
  154. func wrapErrorIfBadRequest(err error) error {
  155. var rfErr awserr.RequestFailure
  156. if errors.As(err, &rfErr) && rfErr.StatusCode() < 500 {
  157. return consumererror.NewPermanent(err)
  158. }
  159. return err
  160. }