eventclient.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package signalfxexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/signalfxexporter"
  4. import (
  5. "context"
  6. "io"
  7. "net/http"
  8. "path"
  9. "strings"
  10. sfxpb "github.com/signalfx/com_signalfx_metrics_protobuf/model"
  11. "go.opentelemetry.io/collector/consumer/consumererror"
  12. "go.opentelemetry.io/collector/pdata/pcommon"
  13. "go.opentelemetry.io/collector/pdata/plog"
  14. "go.uber.org/zap"
  15. "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/signalfxexporter/internal/translation"
  16. "github.com/open-telemetry/opentelemetry-collector-contrib/internal/splunk"
  17. )
  18. // sfxEventClient sends the data to the SignalFx backend.
  19. type sfxEventClient struct {
  20. sfxClientBase
  21. logger *zap.Logger
  22. accessTokenPassthrough bool
  23. }
  24. func (s *sfxEventClient) pushLogsData(ctx context.Context, ld plog.Logs) (int, error) {
  25. rls := ld.ResourceLogs()
  26. if rls.Len() == 0 {
  27. return 0, nil
  28. }
  29. accessToken := s.retrieveAccessToken(rls.At(0))
  30. var sfxEvents []*sfxpb.Event
  31. numDroppedLogRecords := 0
  32. for i := 0; i < rls.Len(); i++ {
  33. rl := rls.At(i)
  34. ills := rl.ScopeLogs()
  35. for j := 0; j < ills.Len(); j++ {
  36. sl := ills.At(j)
  37. events, dropped := translation.LogRecordSliceToSignalFxV2(s.logger, sl.LogRecords(), rl.Resource().Attributes())
  38. sfxEvents = append(sfxEvents, events...)
  39. numDroppedLogRecords += dropped
  40. }
  41. }
  42. body, compressed, err := s.encodeBody(sfxEvents)
  43. if err != nil {
  44. return ld.LogRecordCount(), consumererror.NewPermanent(err)
  45. }
  46. eventURL := *s.ingestURL
  47. if !strings.HasSuffix(eventURL.Path, "v2/event") {
  48. eventURL.Path = path.Join(eventURL.Path, "v2/event")
  49. }
  50. req, err := http.NewRequestWithContext(ctx, "POST", eventURL.String(), body)
  51. if err != nil {
  52. return ld.LogRecordCount(), consumererror.NewPermanent(err)
  53. }
  54. for k, v := range s.headers {
  55. req.Header.Set(k, v)
  56. }
  57. if s.accessTokenPassthrough && accessToken != "" {
  58. req.Header.Set(splunk.SFxAccessTokenHeader, accessToken)
  59. }
  60. if compressed {
  61. req.Header.Set("Content-Encoding", "gzip")
  62. }
  63. resp, err := s.client.Do(req)
  64. if err != nil {
  65. return ld.LogRecordCount(), err
  66. }
  67. defer func() {
  68. _, _ = io.Copy(io.Discard, resp.Body)
  69. resp.Body.Close()
  70. }()
  71. err = splunk.HandleHTTPCode(resp)
  72. if err != nil {
  73. return ld.LogRecordCount(), err
  74. }
  75. return numDroppedLogRecords, nil
  76. }
  77. func (s *sfxEventClient) encodeBody(events []*sfxpb.Event) (bodyReader io.Reader, compressed bool, err error) {
  78. msg := sfxpb.EventUploadMessage{
  79. Events: events,
  80. }
  81. body, err := msg.Marshal()
  82. if err != nil {
  83. return nil, false, err
  84. }
  85. return s.getReader(body)
  86. }
  87. func (s *sfxEventClient) retrieveAccessToken(rl plog.ResourceLogs) string {
  88. attrs := rl.Resource().Attributes()
  89. if accessToken, ok := attrs.Get(splunk.SFxAccessTokenLabel); ok && accessToken.Type() == pcommon.ValueTypeStr {
  90. return accessToken.Str()
  91. }
  92. return ""
  93. }