12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439 |
- // Copyright The OpenTelemetry Authors
- // SPDX-License-Identifier: Apache-2.0
- package signalfxexporter
- import (
- "compress/gzip"
- "context"
- "crypto/tls"
- "encoding/json"
- "fmt"
- "io"
- "net/http"
- "net/http/httptest"
- "net/url"
- "sort"
- "strconv"
- "strings"
- "sync"
- "testing"
- "time"
- sfxpb "github.com/signalfx/com_signalfx_metrics_protobuf/model"
- "github.com/stretchr/testify/assert"
- "github.com/stretchr/testify/require"
- "go.opentelemetry.io/collector/component/componenttest"
- "go.opentelemetry.io/collector/config/confighttp"
- "go.opentelemetry.io/collector/config/configopaque"
- "go.opentelemetry.io/collector/config/configtls"
- "go.opentelemetry.io/collector/consumer/consumererror"
- "go.opentelemetry.io/collector/exporter/exporterhelper"
- "go.opentelemetry.io/collector/exporter/exportertest"
- "go.opentelemetry.io/collector/pdata/pcommon"
- "go.opentelemetry.io/collector/pdata/plog"
- "go.opentelemetry.io/collector/pdata/pmetric"
- "go.uber.org/zap"
- "go.uber.org/zap/zaptest/observer"
- "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/signalfxexporter/internal/dimensions"
- "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/signalfxexporter/internal/translation"
- "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/signalfxexporter/internal/translation/dpfilters"
- "github.com/open-telemetry/opentelemetry-collector-contrib/internal/splunk"
- metadata "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata"
- )
- func TestNew(t *testing.T) {
- tests := []struct {
- name string
- config *Config
- wantErr bool
- wantErrMessage string
- }{
- {
- name: "nil config fails",
- wantErr: true,
- wantErrMessage: "nil config",
- },
- {
- name: "fails to create metrics converter",
- config: &Config{
- AccessToken: "test",
- Realm: "realm",
- ExcludeMetrics: []dpfilters.MetricFilter{{}},
- },
- wantErr: true,
- },
- {
- name: "successfully create exporter",
- config: &Config{
- AccessToken: "someToken",
- Realm: "xyz",
- HTTPClientSettings: confighttp.HTTPClientSettings{Timeout: 1 * time.Second},
- },
- },
- {
- name: "create exporter with host metadata syncer",
- config: &Config{
- AccessToken: "someToken",
- Realm: "xyz",
- HTTPClientSettings: confighttp.HTTPClientSettings{Timeout: 1 * time.Second},
- SyncHostMetadata: true,
- },
- },
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- got, err := newSignalFxExporter(tt.config, exportertest.NewNopCreateSettings())
- if tt.wantErr {
- require.Error(t, err)
- if tt.wantErrMessage != "" {
- require.EqualError(t, err, tt.wantErrMessage)
- }
- } else {
- require.NotNil(t, got)
- }
- })
- }
- }
// TestConsumeMetrics exercises sfxDPClient.pushMetricsData against a stub
// ingest server and checks the error classification for each HTTP status:
// success (202), 403 with an exact error message, permanent 400 that must
// surface the response body, throttled 429/503 (the latter advertising a
// Retry-After delay), and a large batch that still succeeds.
func TestConsumeMetrics(t *testing.T) {
	// One gauge datapoint with two attributes; drop counts below are in
	// timeseries, so this batch counts as 1 when rejected.
	smallBatch := pmetric.NewMetrics()
	rm := smallBatch.ResourceMetrics().AppendEmpty()
	ilm := rm.ScopeMetrics().AppendEmpty()
	m := ilm.Metrics().AppendEmpty()
	m.SetName("test_gauge")
	dp := m.SetEmptyGauge().DataPoints().AppendEmpty()
	dp.Attributes().PutStr("k0", "v0")
	dp.Attributes().PutStr("k1", "v1")
	dp.SetDoubleValue(123)
	tests := []struct {
		name                 string
		md                   pmetric.Metrics
		httpResponseCode     int
		retryAfter           int // seconds sent via Retry-After when non-zero
		numDroppedTimeSeries int
		wantErr              bool
		wantPermanentErr     bool
		wantThrottleErr      bool
		expectedErrorMsg     string
	}{
		{
			name:             "happy_path",
			md:               smallBatch,
			httpResponseCode: http.StatusAccepted,
		},
		{
			name:                 "response_forbidden",
			md:                   smallBatch,
			httpResponseCode:     http.StatusForbidden,
			numDroppedTimeSeries: 1,
			wantErr:              true,
			expectedErrorMsg:     "HTTP 403 \"Forbidden\"",
		},
		{
			name:                 "response_bad_request",
			md:                   smallBatch,
			httpResponseCode:     http.StatusBadRequest,
			numDroppedTimeSeries: 1,
			wantPermanentErr:     true,
			expectedErrorMsg:     "Permanent error: \"HTTP/1.1 400 Bad Request",
		},
		{
			name:                 "response_throttle",
			md:                   smallBatch,
			httpResponseCode:     http.StatusTooManyRequests,
			numDroppedTimeSeries: 1,
			wantThrottleErr:      true,
		},
		{
			name:                 "response_throttle_with_header",
			md:                   smallBatch,
			retryAfter:           123,
			httpResponseCode:     http.StatusServiceUnavailable,
			numDroppedTimeSeries: 1,
			wantThrottleErr:      true,
		},
		{
			name:             "large_batch",
			md:               generateLargeDPBatch(),
			httpResponseCode: http.StatusAccepted,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
				// Confirms the configured client headers reach the wire.
				assert.Equal(t, "test", r.Header.Get("test_header_"))
				// Only throttling statuses may carry a Retry-After header.
				if (tt.httpResponseCode == http.StatusTooManyRequests ||
					tt.httpResponseCode == http.StatusServiceUnavailable) && tt.retryAfter != 0 {
					w.Header().Add(splunk.HeaderRetryAfter, strconv.Itoa(tt.retryAfter))
				}
				w.WriteHeader(tt.httpResponseCode)
				_, _ = w.Write([]byte("response content"))
			}))
			defer server.Close()
			serverURL, err := url.Parse(server.URL)
			assert.NoError(t, err)
			cfg := &Config{
				HTTPClientSettings: confighttp.HTTPClientSettings{
					Timeout: 1 * time.Second,
					Headers: map[string]configopaque.String{"test_header_": "test"},
				},
			}
			client, err := cfg.ToClient(componenttest.NewNopHost(), exportertest.NewNopCreateSettings().TelemetrySettings)
			require.NoError(t, err)
			c, err := translation.NewMetricsConverter(zap.NewNop(), nil, nil, nil, "", false)
			require.NoError(t, err)
			require.NotNil(t, c)
			// Build the datapoint client directly rather than via the factory
			// so the request goes straight to the stub server.
			dpClient := &sfxDPClient{
				sfxClientBase: sfxClientBase{
					ingestURL: serverURL,
					client:    client,
					zippers: sync.Pool{New: func() any {
						return gzip.NewWriter(nil)
					}},
				},
				logger:    zap.NewNop(),
				converter: c,
			}
			numDroppedTimeSeries, err := dpClient.pushMetricsData(context.Background(), tt.md)
			assert.Equal(t, tt.numDroppedTimeSeries, numDroppedTimeSeries)
			if tt.wantErr {
				assert.Error(t, err)
				assert.EqualError(t, err, tt.expectedErrorMsg)
				return
			}
			if tt.wantPermanentErr {
				assert.Error(t, err)
				assert.True(t, consumererror.IsPermanent(err))
				assert.True(t, strings.HasPrefix(err.Error(), tt.expectedErrorMsg))
				// The 400 response body must be included in the error text.
				assert.Contains(t, err.Error(), "response content")
				return
			}
			if tt.wantThrottleErr {
				// Throttle errors wrap the HTTP error with the advertised
				// (or zero) retry delay.
				expected := fmt.Errorf("HTTP %d %q", tt.httpResponseCode, http.StatusText(tt.httpResponseCode))
				expected = exporterhelper.NewThrottleRetry(expected, time.Duration(tt.retryAfter)*time.Second)
				assert.EqualValues(t, expected, err)
				return
			}
			assert.NoError(t, err)
		})
	}
}
// TestConsumeMetricsWithAccessTokenPassthrough verifies which x-sf-token value
// the exporter sends per request: the token found on the resource attribute
// com.splunk.signalfx.access_token when passthrough is enabled, otherwise the
// token configured on the exporter headers. Mixed-token batches must be split
// into one request per distinct token.
func TestConsumeMetricsWithAccessTokenPassthrough(t *testing.T) {
	fromHeaders := "AccessTokenFromClientHeaders"
	fromLabels := []string{"AccessTokenFromLabel0", "AccessTokenFromLabel1"}
	// validMetricsWithToken builds a one-gauge batch, optionally stamping the
	// access-token resource attribute.
	validMetricsWithToken := func(includeToken bool, token string) pmetric.Metrics {
		out := pmetric.NewMetrics()
		rm := out.ResourceMetrics().AppendEmpty()
		if includeToken {
			rm.Resource().Attributes().PutStr("com.splunk.signalfx.access_token", token)
		}
		ilm := rm.ScopeMetrics().AppendEmpty()
		m := ilm.Metrics().AppendEmpty()
		m.SetName("test_gauge")
		dp := m.SetEmptyGauge().DataPoints().AppendEmpty()
		dp.Attributes().PutStr("k0", "v0")
		dp.Attributes().PutStr("k1", "v1")
		dp.SetDoubleValue(123)
		return out
	}
	tests := []struct {
		name                   string
		accessTokenPassthrough bool
		metrics                pmetric.Metrics
		additionalHeaders      map[string]string
		pushedTokens           []string // one expected token per outgoing request
	}{
		{
			name:                   "passthrough access token and included in md",
			accessTokenPassthrough: true,
			metrics:                validMetricsWithToken(true, fromLabels[0]),
			pushedTokens:           []string{fromLabels[0]},
		},
		{
			name:                   "passthrough access token and not included in md",
			accessTokenPassthrough: true,
			metrics:                validMetricsWithToken(false, fromLabels[0]),
			pushedTokens:           []string{fromHeaders},
		},
		{
			name:                   "don't passthrough access token and included in md",
			accessTokenPassthrough: false,
			metrics: func() pmetric.Metrics {
				// Two resources with different tokens; with passthrough off
				// they collapse into one request using the header token.
				forFirstToken := validMetricsWithToken(true, fromLabels[0])
				tgt := forFirstToken.ResourceMetrics().AppendEmpty()
				validMetricsWithToken(true, fromLabels[1]).ResourceMetrics().At(0).CopyTo(tgt)
				return forFirstToken
			}(),
			pushedTokens: []string{fromHeaders},
		},
		{
			name:                   "don't passthrough access token and not included in md",
			accessTokenPassthrough: false,
			metrics:                validMetricsWithToken(false, fromLabels[0]),
			pushedTokens:           []string{fromHeaders},
		},
		{
			name:                   "override user-specified token-like header",
			accessTokenPassthrough: true,
			metrics:                validMetricsWithToken(true, fromLabels[0]),
			additionalHeaders: map[string]string{
				"x-sf-token": "user-specified",
			},
			// The resource token wins over a user-supplied x-sf-token header.
			pushedTokens: []string{fromLabels[0]},
		},
		{
			name:                   "use token from header when resource is nil",
			accessTokenPassthrough: true,
			metrics: func() pmetric.Metrics {
				// Resource with no attributes at all: falls back to the
				// header token even with passthrough enabled.
				out := pmetric.NewMetrics()
				rm := out.ResourceMetrics().AppendEmpty()
				ilm := rm.ScopeMetrics().AppendEmpty()
				m := ilm.Metrics().AppendEmpty()
				m.SetName("test_gauge")
				dp := m.SetEmptyGauge().DataPoints().AppendEmpty()
				dp.Attributes().PutStr("k0", "v0")
				dp.Attributes().PutStr("k1", "v1")
				dp.SetDoubleValue(123)
				return out
			}(),
			pushedTokens: []string{fromHeaders},
		},
		{
			name:                   "multiple tokens passed through",
			accessTokenPassthrough: true,
			metrics: func() pmetric.Metrics {
				forFirstToken := validMetricsWithToken(true, fromLabels[0])
				forSecondToken := validMetricsWithToken(true, fromLabels[1])
				forSecondToken.ResourceMetrics().EnsureCapacity(2)
				forFirstToken.ResourceMetrics().At(0).CopyTo(forSecondToken.ResourceMetrics().AppendEmpty())
				return forSecondToken
			}(),
			pushedTokens: []string{fromLabels[0], fromLabels[1]},
		},
		{
			name:                   "multiple tokens passed through - multiple md with same token",
			accessTokenPassthrough: true,
			metrics: func() pmetric.Metrics {
				// Three resources, two distinct tokens (interleaved order):
				// still only two requests expected.
				forFirstToken := validMetricsWithToken(true, fromLabels[1])
				forSecondToken := validMetricsWithToken(true, fromLabels[0])
				moreForSecondToken := validMetricsWithToken(true, fromLabels[1])
				forSecondToken.ResourceMetrics().EnsureCapacity(3)
				forFirstToken.ResourceMetrics().At(0).CopyTo(forSecondToken.ResourceMetrics().AppendEmpty())
				moreForSecondToken.ResourceMetrics().At(0).CopyTo(forSecondToken.ResourceMetrics().AppendEmpty())
				return forSecondToken
			}(),
			pushedTokens: []string{fromLabels[0], fromLabels[1]},
		},
		{
			name:                   "multiple tokens passed through - multiple md with same token grouped together",
			accessTokenPassthrough: true,
			metrics: func() pmetric.Metrics {
				forFirstToken := validMetricsWithToken(true, fromLabels[0])
				forSecondToken := validMetricsWithToken(true, fromLabels[1])
				moreForSecondToken := validMetricsWithToken(true, fromLabels[1])
				forSecondToken.ResourceMetrics().EnsureCapacity(3)
				moreForSecondToken.ResourceMetrics().At(0).CopyTo(forSecondToken.ResourceMetrics().AppendEmpty())
				forFirstToken.ResourceMetrics().At(0).CopyTo(forSecondToken.ResourceMetrics().AppendEmpty())
				return forSecondToken
			}(),
			pushedTokens: []string{fromLabels[0], fromLabels[1]},
		},
		{
			name:                   "multiple tokens passed through - one corrupted",
			accessTokenPassthrough: true,
			metrics: func() pmetric.Metrics {
				// One resource without a token: that slice of data must fall
				// back to the header token.
				forFirstToken := validMetricsWithToken(true, fromLabels[0])
				forSecondToken := validMetricsWithToken(false, fromLabels[1])
				forSecondToken.ResourceMetrics().EnsureCapacity(2)
				forFirstToken.ResourceMetrics().At(0).CopyTo(forSecondToken.ResourceMetrics().AppendEmpty())
				return forSecondToken
			}(),
			pushedTokens: []string{fromLabels[0], fromHeaders},
		},
	}
	for _, tt := range tests {
		// Mutex-guarded: the exporter may send concurrently.
		receivedTokens := struct {
			sync.Mutex
			tokens []string
		}{}
		receivedTokens.tokens = []string{}
		t.Run(tt.name, func(t *testing.T) {
			server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
				// The test name doubles as a header to catch cross-test bleed.
				assert.Equal(t, tt.name, r.Header.Get("test_header_"))
				receivedTokens.Lock()
				token := r.Header.Get("x-sf-token")
				receivedTokens.tokens = append(receivedTokens.tokens, token)
				receivedTokens.Unlock()
				w.WriteHeader(http.StatusAccepted)
			}))
			defer server.Close()
			factory := NewFactory()
			cfg := factory.CreateDefaultConfig().(*Config)
			cfg.IngestURL = server.URL
			cfg.APIURL = server.URL
			cfg.HTTPClientSettings.Headers = make(map[string]configopaque.String)
			for k, v := range tt.additionalHeaders {
				cfg.HTTPClientSettings.Headers[k] = configopaque.String(v)
			}
			cfg.HTTPClientSettings.Headers["test_header_"] = configopaque.String(tt.name)
			cfg.AccessToken = configopaque.String(fromHeaders)
			cfg.AccessTokenPassthrough = tt.accessTokenPassthrough
			sfxExp, err := NewFactory().CreateMetricsExporter(context.Background(), exportertest.NewNopCreateSettings(), cfg)
			require.NoError(t, err)
			require.NoError(t, sfxExp.Start(context.Background(), componenttest.NewNopHost()))
			defer func() {
				require.NoError(t, sfxExp.Shutdown(context.Background()))
			}()
			err = sfxExp.ConsumeMetrics(context.Background(), tt.metrics)
			assert.NoError(t, err)
			// Wait until the expected number of requests arrived, then
			// compare tokens order-insensitively.
			require.Eventually(t, func() bool {
				receivedTokens.Lock()
				defer receivedTokens.Unlock()
				return len(tt.pushedTokens) == len(receivedTokens.tokens)
			}, 1*time.Second, 10*time.Millisecond)
			sort.Strings(tt.pushedTokens)
			sort.Strings(receivedTokens.tokens)
			assert.Equal(t, tt.pushedTokens, receivedTokens.tokens)
		})
	}
}
- func TestNewEventExporter(t *testing.T) {
- got, err := newEventExporter(nil, exportertest.NewNopCreateSettings())
- assert.EqualError(t, err, "nil config")
- assert.Nil(t, got)
- got, err = newEventExporter(nil, exportertest.NewNopCreateSettings())
- assert.Error(t, err)
- assert.Nil(t, got)
- cfg := &Config{
- AccessToken: "someToken",
- Realm: "xyz",
- HTTPClientSettings: confighttp.HTTPClientSettings{Timeout: 1 * time.Second},
- }
- got, err = newEventExporter(cfg, exportertest.NewNopCreateSettings())
- assert.NoError(t, err)
- require.NotNil(t, got)
- err = got.startLogs(context.Background(), componenttest.NewNopHost())
- assert.NoError(t, err)
- // This is expected to fail.
- ld := makeSampleResourceLogs()
- err = got.pushLogs(context.Background(), ld)
- assert.Error(t, err)
- }
- func makeSampleResourceLogs() plog.Logs {
- out := plog.NewLogs()
- l := out.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()
- l.SetTimestamp(pcommon.Timestamp(1000))
- attrs := l.Attributes()
- attrs.PutStr("k0", "v0")
- attrs.PutStr("k1", "v1")
- attrs.PutStr("k2", "v2")
- propMap := attrs.PutEmptyMap("com.splunk.signalfx.event_properties")
- propMap.PutStr("env", "prod")
- propMap.PutBool("isActive", true)
- propMap.PutInt("rack", 5)
- propMap.PutDouble("temp", 40.5)
- attrs.PutInt("com.splunk.signalfx.event_category", int64(sfxpb.EventCategory_USER_DEFINED))
- attrs.PutStr("com.splunk.signalfx.event_type", "shutdown")
- return out
- }
// TestConsumeEventData exercises sfxEventClient.pushLogsData against a stub
// ingest server: a valid event, a record missing the event attributes (dropped
// without error), a record with non-convertible nested attributes (sent
// without the bad property), a 403 failure, and a large batch.
func TestConsumeEventData(t *testing.T) {
	tests := []struct {
		name                 string
		resourceLogs         plog.Logs
		reqTestFunc          func(t *testing.T, r *http.Request) // optional extra request inspection
		httpResponseCode     int
		numDroppedLogRecords int
		wantErr              bool
	}{
		{
			name:             "happy_path",
			resourceLogs:     makeSampleResourceLogs(),
			reqTestFunc:      nil,
			httpResponseCode: http.StatusAccepted,
		},
		{
			name: "no_event_attribute",
			resourceLogs: func() plog.Logs {
				// Without category/type the record is not an event and must
				// be counted as dropped.
				out := makeSampleResourceLogs()
				attrs := out.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes()
				attrs.Remove("com.splunk.signalfx.event_category")
				attrs.Remove("com.splunk.signalfx.event_type")
				return out
			}(),
			reqTestFunc:          nil,
			numDroppedLogRecords: 1,
			httpResponseCode:     http.StatusAccepted,
		},
		{
			name: "nonconvertible_log_attrs",
			resourceLogs: func() plog.Logs {
				out := makeSampleResourceLogs()
				attrs := out.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes()
				attrs.PutEmptyMap("map")
				propsAttrs, _ := attrs.Get("com.splunk.signalfx.event_properties")
				propsAttrs.Map().PutEmptyMap("map")
				return out
			}(),
			reqTestFunc: nil,
			// The log does go through, just without that prop
			numDroppedLogRecords: 0,
			httpResponseCode:     http.StatusAccepted,
		},
		{
			name:                 "response_forbidden",
			resourceLogs:         makeSampleResourceLogs(),
			reqTestFunc:          nil,
			httpResponseCode:     http.StatusForbidden,
			numDroppedLogRecords: 1,
			wantErr:              true,
		},
		{
			name:             "large_batch",
			resourceLogs:     generateLargeEventBatch(),
			reqTestFunc:      nil,
			httpResponseCode: http.StatusAccepted,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
				// Confirms configured client headers reach the wire.
				assert.Equal(t, "test", r.Header.Get("test_header_"))
				if tt.reqTestFunc != nil {
					tt.reqTestFunc(t, r)
				}
				w.WriteHeader(tt.httpResponseCode)
			}))
			defer server.Close()
			serverURL, err := url.Parse(server.URL)
			assert.NoError(t, err)
			cfg := &Config{
				HTTPClientSettings: confighttp.HTTPClientSettings{
					Timeout: 1 * time.Second,
					Headers: map[string]configopaque.String{"test_header_": "test"},
				},
			}
			client, err := cfg.ToClient(componenttest.NewNopHost(), exportertest.NewNopCreateSettings().TelemetrySettings)
			require.NoError(t, err)
			// Build the event client directly so requests hit the stub.
			eventClient := &sfxEventClient{
				sfxClientBase: sfxClientBase{
					ingestURL: serverURL,
					client:    client,
					zippers:   newGzipPool(),
				},
				logger: zap.NewNop(),
			}
			numDroppedLogRecords, err := eventClient.pushLogsData(context.Background(), tt.resourceLogs)
			assert.Equal(t, tt.numDroppedLogRecords, numDroppedLogRecords)
			if tt.wantErr {
				assert.Error(t, err)
				return
			}
			assert.NoError(t, err)
		})
	}
}
// TestConsumeLogsDataWithAccessTokenPassthrough verifies which x-sf-token the
// logs exporter sends: the token from the resource attribute when passthrough
// is enabled and present, otherwise the configured header token. Both resource
// logs in the payload carry the same token, so exactly one request results.
func TestConsumeLogsDataWithAccessTokenPassthrough(t *testing.T) {
	fromHeaders := "AccessTokenFromClientHeaders"
	fromLabels := "AccessTokenFromLabel"
	// newLogData builds a two-resource payload, optionally stamping both
	// resources with the label token.
	newLogData := func(includeToken bool) plog.Logs {
		out := makeSampleResourceLogs()
		makeSampleResourceLogs().ResourceLogs().At(0).CopyTo(out.ResourceLogs().AppendEmpty())
		if includeToken {
			out.ResourceLogs().At(0).Resource().Attributes().PutStr("com.splunk.signalfx.access_token", fromLabels)
			out.ResourceLogs().At(1).Resource().Attributes().PutStr("com.splunk.signalfx.access_token", fromLabels)
		}
		return out
	}
	tests := []struct {
		name                   string
		accessTokenPassthrough bool
		includedInLogData      bool
		expectedToken          string
	}{
		{
			name:                   "passthrough access token and included in logs",
			accessTokenPassthrough: true,
			includedInLogData:      true,
			expectedToken:          fromLabels,
		},
		{
			name:                   "passthrough access token and not included in logs",
			accessTokenPassthrough: true,
			includedInLogData:      false,
			expectedToken:          fromHeaders,
		},
		{
			name:                   "don't passthrough access token and included in logs",
			accessTokenPassthrough: false,
			includedInLogData:      true,
			expectedToken:          fromHeaders,
		},
		{
			name:                   "don't passthrough access token and not included in logs",
			accessTokenPassthrough: false,
			includedInLogData:      false,
			expectedToken:          fromHeaders,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			// Mutex-guarded: requests may arrive on a server goroutine.
			receivedTokens := struct {
				sync.Mutex
				tokens []string
			}{}
			server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
				// The test name doubles as a header to catch cross-test bleed.
				assert.Equal(t, tt.name, r.Header.Get("test_header_"))
				receivedTokens.Lock()
				receivedTokens.tokens = append(receivedTokens.tokens, r.Header.Get("x-sf-token"))
				receivedTokens.Unlock()
				w.WriteHeader(http.StatusAccepted)
			}))
			defer server.Close()
			factory := NewFactory()
			cfg := factory.CreateDefaultConfig().(*Config)
			cfg.IngestURL = server.URL
			cfg.APIURL = server.URL
			cfg.Headers = make(map[string]configopaque.String)
			cfg.Headers["test_header_"] = configopaque.String(tt.name)
			cfg.AccessToken = configopaque.String(fromHeaders)
			cfg.AccessTokenPassthrough = tt.accessTokenPassthrough
			sfxExp, err := NewFactory().CreateLogsExporter(context.Background(), exportertest.NewNopCreateSettings(), cfg)
			require.NoError(t, err)
			require.NoError(t, sfxExp.Start(context.Background(), componenttest.NewNopHost()))
			defer func() {
				require.NoError(t, sfxExp.Shutdown(context.Background()))
			}()
			assert.NoError(t, sfxExp.ConsumeLogs(context.Background(), newLogData(tt.includedInLogData)))
			// Exactly one request is expected for both resource logs.
			require.Eventually(t, func() bool {
				receivedTokens.Lock()
				defer receivedTokens.Unlock()
				return len(receivedTokens.tokens) == 1
			}, 1*time.Second, 10*time.Millisecond)
			assert.Equal(t, receivedTokens.tokens[0], tt.expectedToken)
		})
	}
}
- func generateLargeDPBatch() pmetric.Metrics {
- md := pmetric.NewMetrics()
- md.ResourceMetrics().EnsureCapacity(6500)
- ts := time.Now()
- for i := 0; i < 6500; i++ {
- rm := md.ResourceMetrics().AppendEmpty()
- ilm := rm.ScopeMetrics().AppendEmpty()
- m := ilm.Metrics().AppendEmpty()
- m.SetName("test_" + strconv.Itoa(i))
- dp := m.SetEmptyGauge().DataPoints().AppendEmpty()
- dp.SetTimestamp(pcommon.NewTimestampFromTime(ts))
- dp.Attributes().PutStr("k0", "v0")
- dp.Attributes().PutStr("k1", "v1")
- dp.SetIntValue(int64(i))
- }
- return md
- }
- func generateLargeEventBatch() plog.Logs {
- out := plog.NewLogs()
- logs := out.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords()
- batchSize := 65000
- logs.EnsureCapacity(batchSize)
- ts := time.Now()
- for i := 0; i < batchSize; i++ {
- lr := logs.AppendEmpty()
- lr.Attributes().PutStr("k0", "k1")
- lr.Attributes().PutEmpty("com.splunk.signalfx.event_category")
- lr.SetTimestamp(pcommon.NewTimestampFromTime(ts))
- }
- return out
- }
- func TestConsumeMetadataNotStarted(t *testing.T) {
- exporter := &signalfxExporter{}
- err := exporter.pushMetadata([]*metadata.MetadataUpdate{})
- require.ErrorContains(t, err, "exporter has not started")
- }
// TestConsumeMetadata drives metadata updates through the dimension client
// against a stub API server and checks the PATCHed dimension key/value (parsed
// from the request URI) and JSON payload: property add/remove/update with
// exclude filters applied, tag add/remove (unaffected by property filters),
// coalescing of quick successive updates under a send delay, sanitization of
// non-alphanumeric dimension keys, and suppression of empty updates.
func TestConsumeMetadata(t *testing.T) {
	cfg := createDefaultConfig().(*Config)
	converter, err := translation.NewMetricsConverter(
		zap.NewNop(),
		nil,
		cfg.ExcludeMetrics,
		cfg.IncludeMetrics,
		cfg.NonAlphanumericDimensionChars,
		false,
	)
	require.NoError(t, err)
	type args struct {
		metadata []*metadata.MetadataUpdate
	}
	type fields struct {
		// payLoad is the JSON body the stub server must receive.
		payLoad map[string]any
	}
	tests := []struct {
		name                   string
		fields                 fields
		args                   args
		excludeProperties      []dpfilters.PropertyFilter
		expectedDimensionKey   string
		expectedDimensionValue string
		sendDelay              time.Duration
		shouldNotSendUpdate    bool // true when no request at all is expected
	}{
		{
			name: "Test property updates",
			fields: fields{
				map[string]any{
					"customProperties": map[string]any{
						"prop.erty1": "val1",
						"property2":  nil,
						"prop.erty3": "val33",
						"property4":  nil,
					},
					"tags":         nil,
					"tagsToRemove": nil,
				},
			},
			// Filters below drop property5 and property6 from the update;
			// the property2 filter does not match (wrong value pattern).
			excludeProperties: []dpfilters.PropertyFilter{
				{
					DimensionName:  mustStringFilter(t, "/^.*$/"),
					DimensionValue: mustStringFilter(t, "/^.*$/"),
					PropertyName:   mustStringFilter(t, "/^property2$/"),
					PropertyValue:  mustStringFilter(t, "some*value"),
				},
				{
					DimensionName:  mustStringFilter(t, "/^.*$/"),
					DimensionValue: mustStringFilter(t, "/^.*$/"),
					PropertyName:   mustStringFilter(t, "property5"),
					PropertyValue:  mustStringFilter(t, "/^.*$/"),
				},
				{
					DimensionName:  mustStringFilter(t, "*"),
					DimensionValue: mustStringFilter(t, "*"),
					PropertyName:   mustStringFilter(t, "/^pro[op]erty6$/"),
					PropertyValue:  mustStringFilter(t, "property*value"),
				},
			},
			args: args{
				[]*metadata.MetadataUpdate{
					{
						ResourceIDKey: "key",
						ResourceID:    "id",
						MetadataDelta: metadata.MetadataDelta{
							MetadataToAdd: map[string]string{
								"prop.erty1": "val1",
								"property5":  "added.value",
								"property6":  "property6.value",
							},
							MetadataToRemove: map[string]string{
								"property2": "val2",
								"property5": "removed.value",
							},
							MetadataToUpdate: map[string]string{
								"prop.erty3": "val33",
								"property4":  "",
							},
						},
					},
				},
			},
			expectedDimensionKey:   "key",
			expectedDimensionValue: "id",
		},
		{
			name: "Test tag updates",
			fields: fields{
				map[string]any{
					"customProperties": map[string]any{},
					"tags": []any{
						"tag.1",
					},
					"tagsToRemove": []any{
						"tag/2",
					},
				},
			},
			excludeProperties: []dpfilters.PropertyFilter{
				{
					// confirms tags aren't affected by excludeProperties filters
					DimensionName:  mustStringFilter(t, "/^.*$/"),
					DimensionValue: mustStringFilter(t, "/^.*$/"),
					PropertyName:   mustStringFilter(t, "/^.*$/"),
					PropertyValue:  mustStringFilter(t, "/^.*$/"),
				},
			},
			args: args{
				[]*metadata.MetadataUpdate{
					{
						ResourceIDKey: "key",
						ResourceID:    "id",
						MetadataDelta: metadata.MetadataDelta{
							MetadataToAdd: map[string]string{
								"tag.1": "",
							},
							MetadataToRemove: map[string]string{
								"tag/2": "",
							},
							MetadataToUpdate: map[string]string{},
						},
					},
				},
			},
			expectedDimensionKey:   "key",
			expectedDimensionValue: "id",
		},
		{
			name: "Test quick successive updates",
			// Three deltas arriving within sendDelay must be merged into the
			// single net payload below.
			fields: fields{
				map[string]any{
					"customProperties": map[string]any{
						"property1": nil,
						"property2": "val2",
						"property3": nil,
					},
					"tags": []any{
						"tag/2",
					},
					"tagsToRemove": []any{
						"tag.1",
					},
				},
			},
			args: args{
				[]*metadata.MetadataUpdate{
					{
						ResourceIDKey: "key",
						ResourceID:    "id",
						MetadataDelta: metadata.MetadataDelta{
							MetadataToAdd: map[string]string{
								"tag.1":     "",
								"property1": "val1",
								"property3": "val3",
							},
							MetadataToRemove: map[string]string{
								"tag/2": "",
							},
							MetadataToUpdate: map[string]string{
								"property2": "val22",
							},
						},
					},
					{
						ResourceIDKey: "key",
						ResourceID:    "id",
						MetadataDelta: metadata.MetadataDelta{
							MetadataToAdd: map[string]string{
								"tag/2": "",
							},
							MetadataToRemove: map[string]string{
								"tag.1":     "",
								"property1": "val1",
							},
							MetadataToUpdate: map[string]string{
								"property2": "val2",
								"property3": "val33",
							},
						},
					},
					{
						ResourceIDKey: "key",
						ResourceID:    "id",
						MetadataDelta: metadata.MetadataDelta{
							MetadataToAdd: map[string]string{},
							MetadataToRemove: map[string]string{
								"property3": "val33",
							},
							MetadataToUpdate: map[string]string{},
						},
					},
				},
			},
			expectedDimensionKey:   "key",
			expectedDimensionValue: "id",
			sendDelay:              time.Second,
		},
		{
			name: "Test updates on dimensions with nonalphanumeric characters (other than the default allow list)",
			fields: fields{
				map[string]any{
					"customProperties": map[string]any{
						"prop.erty1": "val1",
						"property2":  nil,
						"prop.erty3": "val33",
						"property4":  nil,
					},
					"tags":         nil,
					"tagsToRemove": nil,
				},
			},
			args: args{
				[]*metadata.MetadataUpdate{
					{
						// "k!e=y" must be sanitized to "k_e_y" in the URI.
						ResourceIDKey: "k!e=y",
						ResourceID:    "id",
						MetadataDelta: metadata.MetadataDelta{
							MetadataToAdd: map[string]string{
								"prop.erty1": "val1",
							},
							MetadataToRemove: map[string]string{
								"property2": "val2",
							},
							MetadataToUpdate: map[string]string{
								"prop.erty3": "val33",
								"property4":  "",
							},
						},
					},
				},
			},
			expectedDimensionKey:   "k_e_y",
			expectedDimensionValue: "id",
		},
		{
			name: "no dimension update for empty properties",
			// Every property is excluded by the filters, so no request
			// should be sent at all.
			shouldNotSendUpdate: true,
			excludeProperties: []dpfilters.PropertyFilter{
				{
					DimensionName:  mustStringFilter(t, "key"),
					DimensionValue: mustStringFilter(t, "/^.*$/"),
					PropertyName:   mustStringFilter(t, "/^prop\\.erty[13]$/"),
					PropertyValue:  mustStringFilter(t, "/^.*$/"),
				},
				{
					DimensionName:  mustStringFilter(t, "*"),
					DimensionValue: mustStringFilter(t, "id"),
					PropertyName:   mustStringFilter(t, "property*"),
					PropertyValue:  mustStringFilter(t, "/^.*$/"),
				},
			},
			args: args{
				[]*metadata.MetadataUpdate{
					{
						ResourceIDKey: "key",
						ResourceID:    "id",
						MetadataDelta: metadata.MetadataDelta{
							MetadataToAdd: map[string]string{
								"prop.erty1": "val1",
								"property2":  "val2",
								"property5":  "added.value",
								"property6":  "property6.value",
							},
							MetadataToUpdate: map[string]string{
								"prop.erty3": "val33",
								"property4":  "val",
							},
						},
					},
				},
			},
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			// Use WaitGroup to ensure the mocked server has encountered
			// a request from the exporter.
			wg := sync.WaitGroup{}
			wg.Add(1)
			server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
				b, err := io.ReadAll(r.Body)
				assert.NoError(t, err)
				// Test metadata updates are sent onto the right dimensions.
				dimPair := strings.Split(r.RequestURI, "/")[3:5]
				assert.Equal(t, tt.expectedDimensionKey, dimPair[0])
				assert.Equal(t, tt.expectedDimensionValue, dimPair[1])
				// Decode into typed maps so the payload comparison is exact.
				p := map[string]any{
					"customProperties": map[string]*string{},
					"tags":             []string{},
					"tagsToRemove":     []string{},
				}
				err = json.Unmarshal(b, &p)
				assert.NoError(t, err)
				assert.Equal(t, tt.fields.payLoad, p)
				wg.Done()
			}))
			defer server.Close()
			serverURL, err := url.Parse(server.URL)
			assert.NoError(t, err)
			logger := zap.NewNop()
			dimClient := dimensions.NewDimensionClient(
				context.Background(),
				dimensions.DimensionClientOptions{
					Token:             "foo",
					APIURL:            serverURL,
					LogUpdates:        true,
					Logger:            logger,
					SendDelay:         tt.sendDelay,
					MaxBuffered:       10,
					MetricsConverter:  *converter,
					ExcludeProperties: tt.excludeProperties,
				})
			dimClient.Start()
			se := &signalfxExporter{
				dimClient: dimClient,
			}
			defer func() {
				_ = se.shutdown(context.Background())
			}()
			sme := signalfMetadataExporter{
				exporter: se,
			}
			err = sme.ConsumeMetadata(tt.args.metadata)
			// Wait for the server to observe the request (closing c), or time
			// out — a timeout is only acceptable when no update is expected.
			c := make(chan struct{})
			go func() {
				defer close(c)
				wg.Wait()
			}()
			select {
			case <-c:
			// wait 500ms longer than send delay
			case <-time.After(tt.sendDelay + 500*time.Millisecond):
				require.True(t, tt.shouldNotSendUpdate, "timeout waiting for response")
			}
			require.NoError(t, err)
		})
	}
}
- func BenchmarkExporterConsumeData(b *testing.B) {
- batchSize := 1000
- metrics := pmetric.NewMetrics()
- tmd := testMetricsData()
- for i := 0; i < batchSize; i++ {
- tmd.ResourceMetrics().At(0).CopyTo(metrics.ResourceMetrics().AppendEmpty())
- }
- server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
- w.WriteHeader(http.StatusAccepted)
- }))
- defer server.Close()
- serverURL, err := url.Parse(server.URL)
- assert.NoError(b, err)
- c, err := translation.NewMetricsConverter(zap.NewNop(), nil, nil, nil, "", false)
- require.NoError(b, err)
- require.NotNil(b, c)
- dpClient := &sfxDPClient{
- sfxClientBase: sfxClientBase{
- ingestURL: serverURL,
- client: &http.Client{
- Timeout: 1 * time.Second,
- },
- zippers: sync.Pool{New: func() any {
- return gzip.NewWriter(nil)
- }},
- },
- logger: zap.NewNop(),
- converter: c,
- }
- for i := 0; i < b.N; i++ {
- numDroppedTimeSeries, err := dpClient.pushMetricsData(context.Background(), metrics)
- assert.NoError(b, err)
- assert.Equal(b, 0, numDroppedTimeSeries)
- }
- }
- // Test to ensure SignalFx exporter implements metadata.MetadataExporter in k8s_cluster receiver.
- func TestSignalFxExporterConsumeMetadata(t *testing.T) {
- f := NewFactory()
- cfg := f.CreateDefaultConfig()
- rCfg := cfg.(*Config)
- rCfg.AccessToken = "token"
- rCfg.Realm = "realm"
- exp, err := f.CreateMetricsExporter(context.Background(), exportertest.NewNopCreateSettings(), rCfg)
- require.NoError(t, err)
- kme, ok := exp.(metadata.MetadataExporter)
- require.True(t, ok, "SignalFx exporter does not implement metadata.MetadataExporter")
- require.NotNil(t, kme)
- }
- func TestTLSExporterInit(t *testing.T) {
- tests := []struct {
- name string
- config *Config
- wantErr bool
- wantErrMessage string
- }{
- {
- name: "valid CA",
- config: &Config{
- APIURL: "https://test",
- IngestURL: "https://test",
- IngestTLSSettings: configtls.TLSClientSetting{
- TLSSetting: configtls.TLSSetting{
- CAFile: "./testdata/certs/ca.pem",
- },
- },
- APITLSSettings: configtls.TLSClientSetting{
- TLSSetting: configtls.TLSSetting{
- CAFile: "./testdata/certs/ca.pem",
- },
- },
- AccessToken: "random",
- SyncHostMetadata: true,
- },
- wantErr: false,
- },
- {
- name: "missing CA",
- config: &Config{
- APIURL: "https://test",
- IngestURL: "https://test",
- IngestTLSSettings: configtls.TLSClientSetting{
- TLSSetting: configtls.TLSSetting{
- CAFile: "./testdata/certs/missingfile",
- },
- },
- AccessToken: "random",
- SyncHostMetadata: true,
- },
- wantErr: true,
- wantErrMessage: "failed to load CA CertPool",
- },
- {
- name: "invalid CA",
- config: &Config{
- APIURL: "https://test",
- IngestURL: "https://test",
- IngestTLSSettings: configtls.TLSClientSetting{
- TLSSetting: configtls.TLSSetting{
- CAFile: "./testdata/certs/invalid-ca.pem",
- },
- },
- AccessToken: "random",
- SyncHostMetadata: true,
- },
- wantErr: true,
- wantErrMessage: "failed to load CA CertPool",
- },
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- sfx, err := newSignalFxExporter(tt.config, exportertest.NewNopCreateSettings())
- assert.NoError(t, err)
- err = sfx.start(context.Background(), componenttest.NewNopHost())
- if tt.wantErr {
- require.Error(t, err)
- if tt.wantErrMessage != "" {
- require.ErrorContains(t, err, tt.wantErrMessage)
- }
- } else {
- require.NotNil(t, sfx)
- }
- })
- }
- }
- func TestTLSIngestConnection(t *testing.T) {
- metricsPayload := pmetric.NewMetrics()
- rm := metricsPayload.ResourceMetrics().AppendEmpty()
- ilm := rm.ScopeMetrics().AppendEmpty()
- m := ilm.Metrics().AppendEmpty()
- m.SetName("test_gauge")
- dp := m.SetEmptyGauge().DataPoints().AppendEmpty()
- dp.Attributes().PutStr("k0", "v0")
- dp.Attributes().PutStr("k1", "v1")
- dp.SetDoubleValue(123)
- server, err := newLocalHTTPSTestServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
- fmt.Fprint(w, "connection is successful")
- }))
- require.NoError(t, err)
- defer server.Close()
- serverURL := server.URL
- tests := []struct {
- name string
- config *Config
- wantErr bool
- wantErrMessage string
- }{
- {
- name: "Ingest CA not set",
- config: &Config{
- APIURL: serverURL,
- IngestURL: serverURL,
- AccessToken: "random",
- SyncHostMetadata: true,
- },
- wantErr: true,
- wantErrMessage: "x509.*certificate",
- },
- {
- name: "Ingest CA set",
- config: &Config{
- APIURL: serverURL,
- IngestURL: serverURL,
- IngestTLSSettings: configtls.TLSClientSetting{
- TLSSetting: configtls.TLSSetting{
- CAFile: "./testdata/certs/ca.pem",
- },
- },
- AccessToken: "random",
- SyncHostMetadata: true,
- },
- wantErr: false,
- },
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- sfx, err := newSignalFxExporter(tt.config, exportertest.NewNopCreateSettings())
- assert.NoError(t, err)
- err = sfx.start(context.Background(), componenttest.NewNopHost())
- assert.NoError(t, err)
- _, err = sfx.pushMetricsData(context.Background(), metricsPayload)
- if tt.wantErr {
- require.Error(t, err)
- if tt.wantErrMessage != "" {
- assert.Regexp(t, tt.wantErrMessage, err)
- }
- } else {
- assert.NoError(t, err)
- }
- })
- }
- }
- func TestDefaultSystemCPUTimeExcludedAndTranslated(t *testing.T) {
- translator, err := translation.NewMetricTranslator(defaultTranslationRules, 3600)
- require.NoError(t, err)
- converter, err := translation.NewMetricsConverter(zap.NewNop(), translator, defaultExcludeMetrics, nil, "_-.", false)
- require.NoError(t, err)
- md := pmetric.NewMetrics()
- rm := md.ResourceMetrics().AppendEmpty()
- sm := rm.ScopeMetrics().AppendEmpty()
- m := sm.Metrics().AppendEmpty()
- m.SetName("system.cpu.time")
- sum := m.SetEmptySum()
- for _, state := range []string{"idle", "interrupt", "nice", "softirq", "steal", "system", "user", "wait"} {
- for cpu := 0; cpu < 32; cpu++ {
- dp := sum.DataPoints().AppendEmpty()
- dp.SetDoubleValue(0)
- dp.Attributes().PutStr("cpu", fmt.Sprintf("%d", cpu))
- dp.Attributes().PutStr("state", state)
- }
- }
- dps := converter.MetricsToSignalFxV2(md)
- found := map[string]int64{}
- for _, dp := range dps {
- if dp.Metric == "cpu.num_processors" || dp.Metric == "cpu.idle" {
- intVal := dp.Value.IntValue
- require.NotNil(t, intVal, fmt.Sprintf("unexpected nil IntValue for %q", dp.Metric))
- found[dp.Metric] = *intVal
- } else {
- // account for unexpected w/ test-failing placeholder
- found[dp.Metric] = -1
- }
- }
- require.Equal(t, map[string]int64{
- "cpu.num_processors": 32,
- "cpu.idle": 0,
- }, found)
- }
- func TestTLSAPIConnection(t *testing.T) {
- cfg := createDefaultConfig().(*Config)
- converter, err := translation.NewMetricsConverter(
- zap.NewNop(),
- nil,
- cfg.ExcludeMetrics,
- cfg.IncludeMetrics,
- cfg.NonAlphanumericDimensionChars,
- false)
- require.NoError(t, err)
- metadata := []*metadata.MetadataUpdate{
- {
- ResourceIDKey: "key",
- ResourceID: "id",
- MetadataDelta: metadata.MetadataDelta{
- MetadataToAdd: map[string]string{
- "prop.erty1": "val1",
- },
- },
- },
- }
- server, err := newLocalHTTPSTestServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
- fmt.Fprint(w, "connection is successful")
- }))
- require.NoError(t, err)
- defer server.Close()
- tests := []struct {
- name string
- config *Config
- wantErr bool
- wantErrMessage string
- }{
- {
- name: "API CA set",
- config: &Config{
- APIURL: server.URL,
- IngestURL: server.URL,
- AccessToken: "random",
- SyncHostMetadata: true,
- APITLSSettings: configtls.TLSClientSetting{
- TLSSetting: configtls.TLSSetting{
- CAFile: "./testdata/certs/ca.pem",
- },
- },
- },
- wantErr: false,
- },
- {
- name: "API CA set",
- config: &Config{
- APIURL: server.URL,
- IngestURL: server.URL,
- AccessToken: "random",
- SyncHostMetadata: true,
- },
- wantErr: true,
- wantErrMessage: "error making HTTP request.*x509",
- },
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- observedZapCore, observedLogs := observer.New(zap.DebugLevel)
- logger := zap.New(observedZapCore)
- apiTLSCfg, err := tt.config.APITLSSettings.LoadTLSConfig()
- require.NoError(t, err)
- serverURL, err := url.Parse(tt.config.APIURL)
- assert.NoError(t, err)
- cancellable, cancelFn := context.WithCancel(context.Background())
- defer cancelFn()
- dimClient := dimensions.NewDimensionClient(
- cancellable,
- dimensions.DimensionClientOptions{
- Token: "",
- APIURL: serverURL,
- LogUpdates: true,
- Logger: logger,
- SendDelay: 1,
- MaxBuffered: 10,
- MetricsConverter: *converter,
- APITLSConfig: apiTLSCfg,
- })
- dimClient.Start()
- se := &signalfxExporter{
- dimClient: dimClient,
- }
- sme := signalfMetadataExporter{
- exporter: se,
- }
- err = sme.ConsumeMetadata(metadata)
- time.Sleep(3 * time.Second)
- require.NoError(t, err)
- if tt.wantErr {
- if tt.wantErrMessage != "" {
- assert.Regexp(t, tt.wantErrMessage, observedLogs.All()[0].Context[0].Interface.(error).Error())
- }
- } else {
- require.Equal(t, 1, observedLogs.Len())
- require.Nil(t, observedLogs.All()[0].Context[0].Interface)
- }
- })
- }
- }
// newLocalHTTPSTestServer starts an HTTPS httptest server that serves the
// given handler using the self-signed certificate pair in ./testdata/certs.
// The caller is responsible for calling Close on the returned server.
func newLocalHTTPSTestServer(handler http.Handler) (*httptest.Server, error) {
	// Load the key pair before creating the server so a cert failure does
	// not leak the unstarted server's already-open listener.
	cert, err := tls.LoadX509KeyPair("./testdata/certs/cert.pem", "./testdata/certs/cert-key.pem")
	if err != nil {
		return nil, err
	}
	ts := httptest.NewUnstartedServer(handler)
	ts.TLS = &tls.Config{Certificates: []tls.Certificate{cert}}
	ts.StartTLS()
	return ts, nil
}
|