exporter.go 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package signalfxexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/signalfxexporter"
  4. import (
  5. "compress/gzip"
  6. "context"
  7. "errors"
  8. "fmt"
  9. "net/http"
  10. "sync"
  11. "go.opentelemetry.io/collector/component"
  12. "go.opentelemetry.io/collector/consumer"
  13. "go.opentelemetry.io/collector/exporter"
  14. "go.opentelemetry.io/collector/pdata/plog"
  15. "go.opentelemetry.io/collector/pdata/pmetric"
  16. "go.uber.org/zap"
  17. "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/signalfxexporter/internal/dimensions"
  18. "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/signalfxexporter/internal/hostmetadata"
  19. "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/signalfxexporter/internal/translation"
  20. "github.com/open-telemetry/opentelemetry-collector-contrib/internal/splunk"
  21. metadata "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata"
  22. )
  // errNotStarted is the sentinel returned when the exporter is asked to do
  // work that depends on state which is only populated while starting (see
  // pushMetadata, which returns it while the dimension client is still nil).
  var errNotStarted = errors.New("exporter has not started")
  26. // TODO: Find a place for this to be shared.
  27. type baseMetricsExporter struct {
  28. component.Component
  29. consumer.Metrics
  30. }
  31. // TODO: Find a place for this to be shared.
  32. type baseLogsExporter struct {
  33. component.Component
  34. consumer.Logs
  35. }
  36. type signalfMetadataExporter struct {
  37. exporter.Metrics
  38. exporter *signalfxExporter
  39. }
  40. func (sme *signalfMetadataExporter) ConsumeMetadata(metadata []*metadata.MetadataUpdate) error {
  41. return sme.exporter.pushMetadata(metadata)
  42. }
  43. type signalfxExporter struct {
  44. config *Config
  45. version string
  46. logger *zap.Logger
  47. telemetrySettings component.TelemetrySettings
  48. pushMetricsData func(ctx context.Context, md pmetric.Metrics) (droppedTimeSeries int, err error)
  49. pushLogsData func(ctx context.Context, ld plog.Logs) (droppedLogRecords int, err error)
  50. hostMetadataSyncer *hostmetadata.Syncer
  51. converter *translation.MetricsConverter
  52. dimClient *dimensions.DimensionClient
  53. cancelFn func()
  54. }
  55. // newSignalFxExporter returns a new SignalFx exporter.
  56. func newSignalFxExporter(
  57. config *Config,
  58. createSettings exporter.CreateSettings,
  59. ) (*signalfxExporter, error) {
  60. if config == nil {
  61. return nil, errors.New("nil config")
  62. }
  63. metricTranslator, err := config.getMetricTranslator(createSettings.TelemetrySettings.Logger)
  64. if err != nil {
  65. return nil, err
  66. }
  67. converter, err := translation.NewMetricsConverter(
  68. createSettings.TelemetrySettings.Logger,
  69. metricTranslator,
  70. config.ExcludeMetrics,
  71. config.IncludeMetrics,
  72. config.NonAlphanumericDimensionChars,
  73. config.DropHistogramBuckets,
  74. )
  75. if err != nil {
  76. return nil, fmt.Errorf("failed to create metric converter: %w", err)
  77. }
  78. return &signalfxExporter{
  79. config: config,
  80. version: createSettings.BuildInfo.Version,
  81. logger: createSettings.Logger,
  82. telemetrySettings: createSettings.TelemetrySettings,
  83. converter: converter,
  84. }, nil
  85. }
  86. func (se *signalfxExporter) start(ctx context.Context, host component.Host) (err error) {
  87. ingestURL, err := se.config.getIngestURL()
  88. if err != nil {
  89. return err
  90. }
  91. headers := buildHeaders(se.config, se.version)
  92. client, err := se.createClient(host)
  93. if err != nil {
  94. return err
  95. }
  96. dpClient := &sfxDPClient{
  97. sfxClientBase: sfxClientBase{
  98. ingestURL: ingestURL,
  99. headers: headers,
  100. client: client,
  101. zippers: newGzipPool(),
  102. },
  103. logDataPoints: se.config.LogDataPoints,
  104. logger: se.logger,
  105. accessTokenPassthrough: se.config.AccessTokenPassthrough,
  106. converter: se.converter,
  107. }
  108. apiTLSCfg, err := se.config.APITLSSettings.LoadTLSConfig()
  109. if err != nil {
  110. return fmt.Errorf("could not load API TLS config: %w", err)
  111. }
  112. cancellable, cancelFn := context.WithCancel(ctx)
  113. se.cancelFn = cancelFn
  114. apiURL, err := se.config.getAPIURL()
  115. if err != nil {
  116. return err
  117. }
  118. dimClient := dimensions.NewDimensionClient(
  119. cancellable,
  120. dimensions.DimensionClientOptions{
  121. Token: se.config.AccessToken,
  122. APIURL: apiURL,
  123. APITLSConfig: apiTLSCfg,
  124. LogUpdates: se.config.LogDimensionUpdates,
  125. Logger: se.logger,
  126. // Duration to wait between property updates.
  127. SendDelay: se.config.DimensionClient.SendDelay,
  128. MaxBuffered: se.config.DimensionClient.MaxBuffered,
  129. MetricsConverter: *se.converter,
  130. ExcludeProperties: se.config.ExcludeProperties,
  131. MaxConnsPerHost: se.config.DimensionClient.MaxConnsPerHost,
  132. MaxIdleConns: se.config.DimensionClient.MaxIdleConns,
  133. MaxIdleConnsPerHost: se.config.DimensionClient.MaxIdleConnsPerHost,
  134. IdleConnTimeout: se.config.DimensionClient.IdleConnTimeout,
  135. Timeout: se.config.DimensionClient.Timeout,
  136. })
  137. dimClient.Start()
  138. var hms *hostmetadata.Syncer
  139. if se.config.SyncHostMetadata {
  140. hms = hostmetadata.NewSyncer(se.logger, dimClient)
  141. }
  142. se.dimClient = dimClient
  143. se.pushMetricsData = dpClient.pushMetricsData
  144. se.hostMetadataSyncer = hms
  145. return nil
  146. }
  // newGzipPool returns a sync.Pool whose New function produces gzip writers
  // that are not yet attached to any destination (they are created with a nil
  // io.Writer).
  func newGzipPool() sync.Pool {
  	newWriter := func() any {
  		return gzip.NewWriter(nil)
  	}
  	return sync.Pool{New: newWriter}
  }
  152. func newEventExporter(config *Config, createSettings exporter.CreateSettings) (*signalfxExporter, error) {
  153. if config == nil {
  154. return nil, errors.New("nil config")
  155. }
  156. return &signalfxExporter{
  157. config: config,
  158. version: createSettings.BuildInfo.Version,
  159. logger: createSettings.Logger,
  160. telemetrySettings: createSettings.TelemetrySettings,
  161. }, nil
  162. }
  163. func (se *signalfxExporter) startLogs(_ context.Context, host component.Host) error {
  164. ingestURL, err := se.config.getIngestURL()
  165. if err != nil {
  166. return err
  167. }
  168. headers := buildHeaders(se.config, se.version)
  169. client, err := se.createClient(host)
  170. if err != nil {
  171. return err
  172. }
  173. eventClient := &sfxEventClient{
  174. sfxClientBase: sfxClientBase{
  175. ingestURL: ingestURL,
  176. headers: headers,
  177. client: client,
  178. zippers: newGzipPool(),
  179. },
  180. logger: se.logger,
  181. accessTokenPassthrough: se.config.AccessTokenPassthrough,
  182. }
  183. se.pushLogsData = eventClient.pushLogsData
  184. return nil
  185. }
  186. func (se *signalfxExporter) createClient(host component.Host) (*http.Client, error) {
  187. se.config.HTTPClientSettings.TLSSetting = se.config.IngestTLSSettings
  188. return se.config.ToClient(host, se.telemetrySettings)
  189. }
  190. func (se *signalfxExporter) pushMetrics(ctx context.Context, md pmetric.Metrics) error {
  191. _, err := se.pushMetricsData(ctx, md)
  192. if err == nil && se.hostMetadataSyncer != nil {
  193. se.hostMetadataSyncer.Sync(md)
  194. }
  195. return err
  196. }
  197. func (se *signalfxExporter) pushLogs(ctx context.Context, ld plog.Logs) error {
  198. _, err := se.pushLogsData(ctx, ld)
  199. return err
  200. }
  201. func (se *signalfxExporter) shutdown(_ context.Context) error {
  202. if se.cancelFn != nil {
  203. se.cancelFn()
  204. }
  205. return nil
  206. }
  207. func (se *signalfxExporter) pushMetadata(metadata []*metadata.MetadataUpdate) error {
  208. if se.dimClient == nil {
  209. return errNotStarted
  210. }
  211. return se.dimClient.PushMetadata(metadata)
  212. }
  213. func buildHeaders(config *Config, version string) map[string]string {
  214. headers := map[string]string{
  215. "Connection": "keep-alive",
  216. "Content-Type": "application/x-protobuf",
  217. "User-Agent": fmt.Sprintf("OpenTelemetry-Collector SignalFx Exporter/%s", version),
  218. }
  219. if config.AccessToken != "" {
  220. headers[splunk.SFxAccessTokenHeader] = string(config.AccessToken)
  221. }
  222. // Add any custom headers from the config. They will override the pre-defined
  223. // ones above in case of conflict, but, not the content encoding one since
  224. // the latter one is defined according to the payload.
  225. for k, v := range config.HTTPClientSettings.Headers {
  226. headers[k] = string(v)
  227. }
  228. // we want to control how headers are set, overriding user headers with our passthrough.
  229. config.HTTPClientSettings.Headers = nil
  230. return headers
  231. }