123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113 |
- // Copyright The OpenTelemetry Authors
- // SPDX-License-Identifier: Apache-2.0
- package signalfxexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/signalfxexporter"
- import (
- "context"
- "io"
- "net/http"
- "path"
- "strings"
- sfxpb "github.com/signalfx/com_signalfx_metrics_protobuf/model"
- "go.opentelemetry.io/collector/consumer/consumererror"
- "go.opentelemetry.io/collector/pdata/pcommon"
- "go.opentelemetry.io/collector/pdata/plog"
- "go.uber.org/zap"
- "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/signalfxexporter/internal/translation"
- "github.com/open-telemetry/opentelemetry-collector-contrib/internal/splunk"
- )
- // sfxEventClient sends the data to the SignalFx backend.
- type sfxEventClient struct {
- sfxClientBase
- logger *zap.Logger
- accessTokenPassthrough bool
- }
- func (s *sfxEventClient) pushLogsData(ctx context.Context, ld plog.Logs) (int, error) {
- rls := ld.ResourceLogs()
- if rls.Len() == 0 {
- return 0, nil
- }
- accessToken := s.retrieveAccessToken(rls.At(0))
- var sfxEvents []*sfxpb.Event
- numDroppedLogRecords := 0
- for i := 0; i < rls.Len(); i++ {
- rl := rls.At(i)
- ills := rl.ScopeLogs()
- for j := 0; j < ills.Len(); j++ {
- sl := ills.At(j)
- events, dropped := translation.LogRecordSliceToSignalFxV2(s.logger, sl.LogRecords(), rl.Resource().Attributes())
- sfxEvents = append(sfxEvents, events...)
- numDroppedLogRecords += dropped
- }
- }
- body, compressed, err := s.encodeBody(sfxEvents)
- if err != nil {
- return ld.LogRecordCount(), consumererror.NewPermanent(err)
- }
- eventURL := *s.ingestURL
- if !strings.HasSuffix(eventURL.Path, "v2/event") {
- eventURL.Path = path.Join(eventURL.Path, "v2/event")
- }
- req, err := http.NewRequestWithContext(ctx, "POST", eventURL.String(), body)
- if err != nil {
- return ld.LogRecordCount(), consumererror.NewPermanent(err)
- }
- for k, v := range s.headers {
- req.Header.Set(k, v)
- }
- if s.accessTokenPassthrough && accessToken != "" {
- req.Header.Set(splunk.SFxAccessTokenHeader, accessToken)
- }
- if compressed {
- req.Header.Set("Content-Encoding", "gzip")
- }
- resp, err := s.client.Do(req)
- if err != nil {
- return ld.LogRecordCount(), err
- }
- defer func() {
- _, _ = io.Copy(io.Discard, resp.Body)
- resp.Body.Close()
- }()
- err = splunk.HandleHTTPCode(resp)
- if err != nil {
- return ld.LogRecordCount(), err
- }
- return numDroppedLogRecords, nil
- }
- func (s *sfxEventClient) encodeBody(events []*sfxpb.Event) (bodyReader io.Reader, compressed bool, err error) {
- msg := sfxpb.EventUploadMessage{
- Events: events,
- }
- body, err := msg.Marshal()
- if err != nil {
- return nil, false, err
- }
- return s.getReader(body)
- }
- func (s *sfxEventClient) retrieveAccessToken(rl plog.ResourceLogs) string {
- attrs := rl.Resource().Attributes()
- if accessToken, ok := attrs.Get(splunk.SFxAccessTokenLabel); ok && accessToken.Type() == pcommon.ValueTypeStr {
- return accessToken.Str()
- }
- return ""
- }
|