|
- // Copyright The OpenTelemetry Authors
- // SPDX-License-Identifier: Apache-2.0
- package signalfxexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/signalfxexporter"
- import (
- "compress/gzip"
- "context"
- "errors"
- "fmt"
- "net/http"
- "sync"
- "go.opentelemetry.io/collector/component"
- "go.opentelemetry.io/collector/consumer"
- "go.opentelemetry.io/collector/exporter"
- "go.opentelemetry.io/collector/pdata/plog"
- "go.opentelemetry.io/collector/pdata/pmetric"
- "go.uber.org/zap"
- "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/signalfxexporter/internal/dimensions"
- "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/signalfxexporter/internal/hostmetadata"
- "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/signalfxexporter/internal/translation"
- "github.com/open-telemetry/opentelemetry-collector-contrib/internal/splunk"
- metadata "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata"
- )
- var (
- errNotStarted = errors.New("exporter has not started")
- )
- // TODO: Find a place for this to be shared.
- type baseMetricsExporter struct {
- component.Component
- consumer.Metrics
- }
- // TODO: Find a place for this to be shared.
- type baseLogsExporter struct {
- component.Component
- consumer.Logs
- }
- type signalfMetadataExporter struct {
- exporter.Metrics
- exporter *signalfxExporter
- }
- func (sme *signalfMetadataExporter) ConsumeMetadata(metadata []*metadata.MetadataUpdate) error {
- return sme.exporter.pushMetadata(metadata)
- }
- type signalfxExporter struct {
- config *Config
- version string
- logger *zap.Logger
- telemetrySettings component.TelemetrySettings
- pushMetricsData func(ctx context.Context, md pmetric.Metrics) (droppedTimeSeries int, err error)
- pushLogsData func(ctx context.Context, ld plog.Logs) (droppedLogRecords int, err error)
- hostMetadataSyncer *hostmetadata.Syncer
- converter *translation.MetricsConverter
- dimClient *dimensions.DimensionClient
- cancelFn func()
- }
- // newSignalFxExporter returns a new SignalFx exporter.
- func newSignalFxExporter(
- config *Config,
- createSettings exporter.CreateSettings,
- ) (*signalfxExporter, error) {
- if config == nil {
- return nil, errors.New("nil config")
- }
- metricTranslator, err := config.getMetricTranslator(createSettings.TelemetrySettings.Logger)
- if err != nil {
- return nil, err
- }
- converter, err := translation.NewMetricsConverter(
- createSettings.TelemetrySettings.Logger,
- metricTranslator,
- config.ExcludeMetrics,
- config.IncludeMetrics,
- config.NonAlphanumericDimensionChars,
- config.DropHistogramBuckets,
- )
- if err != nil {
- return nil, fmt.Errorf("failed to create metric converter: %w", err)
- }
- return &signalfxExporter{
- config: config,
- version: createSettings.BuildInfo.Version,
- logger: createSettings.Logger,
- telemetrySettings: createSettings.TelemetrySettings,
- converter: converter,
- }, nil
- }
- func (se *signalfxExporter) start(ctx context.Context, host component.Host) (err error) {
- ingestURL, err := se.config.getIngestURL()
- if err != nil {
- return err
- }
- headers := buildHeaders(se.config, se.version)
- client, err := se.createClient(host)
- if err != nil {
- return err
- }
- dpClient := &sfxDPClient{
- sfxClientBase: sfxClientBase{
- ingestURL: ingestURL,
- headers: headers,
- client: client,
- zippers: newGzipPool(),
- },
- logDataPoints: se.config.LogDataPoints,
- logger: se.logger,
- accessTokenPassthrough: se.config.AccessTokenPassthrough,
- converter: se.converter,
- }
- apiTLSCfg, err := se.config.APITLSSettings.LoadTLSConfig()
- if err != nil {
- return fmt.Errorf("could not load API TLS config: %w", err)
- }
- cancellable, cancelFn := context.WithCancel(ctx)
- se.cancelFn = cancelFn
- apiURL, err := se.config.getAPIURL()
- if err != nil {
- return err
- }
- dimClient := dimensions.NewDimensionClient(
- cancellable,
- dimensions.DimensionClientOptions{
- Token: se.config.AccessToken,
- APIURL: apiURL,
- APITLSConfig: apiTLSCfg,
- LogUpdates: se.config.LogDimensionUpdates,
- Logger: se.logger,
- // Duration to wait between property updates.
- SendDelay: se.config.DimensionClient.SendDelay,
- MaxBuffered: se.config.DimensionClient.MaxBuffered,
- MetricsConverter: *se.converter,
- ExcludeProperties: se.config.ExcludeProperties,
- MaxConnsPerHost: se.config.DimensionClient.MaxConnsPerHost,
- MaxIdleConns: se.config.DimensionClient.MaxIdleConns,
- MaxIdleConnsPerHost: se.config.DimensionClient.MaxIdleConnsPerHost,
- IdleConnTimeout: se.config.DimensionClient.IdleConnTimeout,
- Timeout: se.config.DimensionClient.Timeout,
- })
- dimClient.Start()
- var hms *hostmetadata.Syncer
- if se.config.SyncHostMetadata {
- hms = hostmetadata.NewSyncer(se.logger, dimClient)
- }
- se.dimClient = dimClient
- se.pushMetricsData = dpClient.pushMetricsData
- se.hostMetadataSyncer = hms
- return nil
- }
- func newGzipPool() sync.Pool {
- return sync.Pool{New: func() any {
- return gzip.NewWriter(nil)
- }}
- }
- func newEventExporter(config *Config, createSettings exporter.CreateSettings) (*signalfxExporter, error) {
- if config == nil {
- return nil, errors.New("nil config")
- }
- return &signalfxExporter{
- config: config,
- version: createSettings.BuildInfo.Version,
- logger: createSettings.Logger,
- telemetrySettings: createSettings.TelemetrySettings,
- }, nil
- }
- func (se *signalfxExporter) startLogs(_ context.Context, host component.Host) error {
- ingestURL, err := se.config.getIngestURL()
- if err != nil {
- return err
- }
- headers := buildHeaders(se.config, se.version)
- client, err := se.createClient(host)
- if err != nil {
- return err
- }
- eventClient := &sfxEventClient{
- sfxClientBase: sfxClientBase{
- ingestURL: ingestURL,
- headers: headers,
- client: client,
- zippers: newGzipPool(),
- },
- logger: se.logger,
- accessTokenPassthrough: se.config.AccessTokenPassthrough,
- }
- se.pushLogsData = eventClient.pushLogsData
- return nil
- }
- func (se *signalfxExporter) createClient(host component.Host) (*http.Client, error) {
- se.config.HTTPClientSettings.TLSSetting = se.config.IngestTLSSettings
- return se.config.ToClient(host, se.telemetrySettings)
- }
- func (se *signalfxExporter) pushMetrics(ctx context.Context, md pmetric.Metrics) error {
- _, err := se.pushMetricsData(ctx, md)
- if err == nil && se.hostMetadataSyncer != nil {
- se.hostMetadataSyncer.Sync(md)
- }
- return err
- }
- func (se *signalfxExporter) pushLogs(ctx context.Context, ld plog.Logs) error {
- _, err := se.pushLogsData(ctx, ld)
- return err
- }
- func (se *signalfxExporter) shutdown(_ context.Context) error {
- if se.cancelFn != nil {
- se.cancelFn()
- }
- return nil
- }
- func (se *signalfxExporter) pushMetadata(metadata []*metadata.MetadataUpdate) error {
- if se.dimClient == nil {
- return errNotStarted
- }
- return se.dimClient.PushMetadata(metadata)
- }
- func buildHeaders(config *Config, version string) map[string]string {
- headers := map[string]string{
- "Connection": "keep-alive",
- "Content-Type": "application/x-protobuf",
- "User-Agent": fmt.Sprintf("OpenTelemetry-Collector SignalFx Exporter/%s", version),
- }
- if config.AccessToken != "" {
- headers[splunk.SFxAccessTokenHeader] = string(config.AccessToken)
- }
- // Add any custom headers from the config. They will override the pre-defined
- // ones above in case of conflict, but, not the content encoding one since
- // the latter one is defined according to the payload.
- for k, v := range config.HTTPClientSettings.Headers {
- headers[k] = string(v)
- }
- // we want to control how headers are set, overriding user headers with our passthrough.
- config.HTTPClientSettings.Headers = nil
- return headers
- }
|