exporter_test.go 41 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package signalfxexporter
  4. import (
  5. "compress/gzip"
  6. "context"
  7. "crypto/tls"
  8. "encoding/json"
  9. "fmt"
  10. "io"
  11. "net/http"
  12. "net/http/httptest"
  13. "net/url"
  14. "sort"
  15. "strconv"
  16. "strings"
  17. "sync"
  18. "testing"
  19. "time"
  20. sfxpb "github.com/signalfx/com_signalfx_metrics_protobuf/model"
  21. "github.com/stretchr/testify/assert"
  22. "github.com/stretchr/testify/require"
  23. "go.opentelemetry.io/collector/component/componenttest"
  24. "go.opentelemetry.io/collector/config/confighttp"
  25. "go.opentelemetry.io/collector/config/configopaque"
  26. "go.opentelemetry.io/collector/config/configtls"
  27. "go.opentelemetry.io/collector/consumer/consumererror"
  28. "go.opentelemetry.io/collector/exporter/exporterhelper"
  29. "go.opentelemetry.io/collector/exporter/exportertest"
  30. "go.opentelemetry.io/collector/pdata/pcommon"
  31. "go.opentelemetry.io/collector/pdata/plog"
  32. "go.opentelemetry.io/collector/pdata/pmetric"
  33. "go.uber.org/zap"
  34. "go.uber.org/zap/zaptest/observer"
  35. "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/signalfxexporter/internal/dimensions"
  36. "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/signalfxexporter/internal/translation"
  37. "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/signalfxexporter/internal/translation/dpfilters"
  38. "github.com/open-telemetry/opentelemetry-collector-contrib/internal/splunk"
  39. metadata "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata"
  40. )
  41. func TestNew(t *testing.T) {
  42. tests := []struct {
  43. name string
  44. config *Config
  45. wantErr bool
  46. wantErrMessage string
  47. }{
  48. {
  49. name: "nil config fails",
  50. wantErr: true,
  51. wantErrMessage: "nil config",
  52. },
  53. {
  54. name: "fails to create metrics converter",
  55. config: &Config{
  56. AccessToken: "test",
  57. Realm: "realm",
  58. ExcludeMetrics: []dpfilters.MetricFilter{{}},
  59. },
  60. wantErr: true,
  61. },
  62. {
  63. name: "successfully create exporter",
  64. config: &Config{
  65. AccessToken: "someToken",
  66. Realm: "xyz",
  67. HTTPClientSettings: confighttp.HTTPClientSettings{Timeout: 1 * time.Second},
  68. },
  69. },
  70. {
  71. name: "create exporter with host metadata syncer",
  72. config: &Config{
  73. AccessToken: "someToken",
  74. Realm: "xyz",
  75. HTTPClientSettings: confighttp.HTTPClientSettings{Timeout: 1 * time.Second},
  76. SyncHostMetadata: true,
  77. },
  78. },
  79. }
  80. for _, tt := range tests {
  81. t.Run(tt.name, func(t *testing.T) {
  82. got, err := newSignalFxExporter(tt.config, exportertest.NewNopCreateSettings())
  83. if tt.wantErr {
  84. require.Error(t, err)
  85. if tt.wantErrMessage != "" {
  86. require.EqualError(t, err, tt.wantErrMessage)
  87. }
  88. } else {
  89. require.NotNil(t, got)
  90. }
  91. })
  92. }
  93. }
// TestConsumeMetrics exercises sfxDPClient.pushMetricsData against a stub
// ingest server: 2xx succeeds, 403 yields a plain error, 400 a permanent
// (non-retryable) error carrying the response body, and 429/503 a
// throttle-retry error that honors the Retry-After header.
func TestConsumeMetrics(t *testing.T) {
	// Single gauge datapoint shared by most cases below.
	smallBatch := pmetric.NewMetrics()
	rm := smallBatch.ResourceMetrics().AppendEmpty()
	ilm := rm.ScopeMetrics().AppendEmpty()
	m := ilm.Metrics().AppendEmpty()
	m.SetName("test_gauge")
	dp := m.SetEmptyGauge().DataPoints().AppendEmpty()
	dp.Attributes().PutStr("k0", "v0")
	dp.Attributes().PutStr("k1", "v1")
	dp.SetDoubleValue(123)
	tests := []struct {
		name                 string
		md                   pmetric.Metrics
		httpResponseCode     int
		retryAfter           int // seconds; sent as Retry-After when non-zero
		numDroppedTimeSeries int
		wantErr              bool
		wantPermanentErr     bool
		wantThrottleErr      bool
		expectedErrorMsg     string
	}{
		{
			name:             "happy_path",
			md:               smallBatch,
			httpResponseCode: http.StatusAccepted,
		},
		{
			name:                 "response_forbidden",
			md:                   smallBatch,
			httpResponseCode:     http.StatusForbidden,
			numDroppedTimeSeries: 1,
			wantErr:              true,
			expectedErrorMsg:     "HTTP 403 \"Forbidden\"",
		},
		{
			name:                 "response_bad_request",
			md:                   smallBatch,
			httpResponseCode:     http.StatusBadRequest,
			numDroppedTimeSeries: 1,
			wantPermanentErr:     true,
			expectedErrorMsg:     "Permanent error: \"HTTP/1.1 400 Bad Request",
		},
		{
			name:                 "response_throttle",
			md:                   smallBatch,
			httpResponseCode:     http.StatusTooManyRequests,
			numDroppedTimeSeries: 1,
			wantThrottleErr:      true,
		},
		{
			name:                 "response_throttle_with_header",
			md:                   smallBatch,
			retryAfter:           123,
			httpResponseCode:     http.StatusServiceUnavailable,
			numDroppedTimeSeries: 1,
			wantThrottleErr:      true,
		},
		{
			name:             "large_batch",
			md:               generateLargeDPBatch(),
			httpResponseCode: http.StatusAccepted,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			// Stub ingest endpoint returning the configured status code and,
			// for throttle cases with retryAfter set, a Retry-After header.
			server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
				// Confirms custom client headers are propagated to requests.
				assert.Equal(t, "test", r.Header.Get("test_header_"))
				if (tt.httpResponseCode == http.StatusTooManyRequests ||
					tt.httpResponseCode == http.StatusServiceUnavailable) && tt.retryAfter != 0 {
					w.Header().Add(splunk.HeaderRetryAfter, strconv.Itoa(tt.retryAfter))
				}
				w.WriteHeader(tt.httpResponseCode)
				_, _ = w.Write([]byte("response content"))
			}))
			defer server.Close()
			serverURL, err := url.Parse(server.URL)
			assert.NoError(t, err)
			cfg := &Config{
				HTTPClientSettings: confighttp.HTTPClientSettings{
					Timeout: 1 * time.Second,
					Headers: map[string]configopaque.String{"test_header_": "test"},
				},
			}
			client, err := cfg.ToClient(componenttest.NewNopHost(), exportertest.NewNopCreateSettings().TelemetrySettings)
			require.NoError(t, err)
			c, err := translation.NewMetricsConverter(zap.NewNop(), nil, nil, nil, "", false)
			require.NoError(t, err)
			require.NotNil(t, c)
			// Assemble the datapoint client by hand, mirroring the wiring the
			// exporter does in production (including the pooled gzip writers).
			dpClient := &sfxDPClient{
				sfxClientBase: sfxClientBase{
					ingestURL: serverURL,
					client:    client,
					zippers: sync.Pool{New: func() any {
						return gzip.NewWriter(nil)
					}},
				},
				logger:    zap.NewNop(),
				converter: c,
			}
			numDroppedTimeSeries, err := dpClient.pushMetricsData(context.Background(), tt.md)
			assert.Equal(t, tt.numDroppedTimeSeries, numDroppedTimeSeries)
			if tt.wantErr {
				assert.Error(t, err)
				assert.EqualError(t, err, tt.expectedErrorMsg)
				return
			}
			if tt.wantPermanentErr {
				assert.Error(t, err)
				assert.True(t, consumererror.IsPermanent(err))
				assert.True(t, strings.HasPrefix(err.Error(), tt.expectedErrorMsg))
				// Permanent errors are expected to embed the response body.
				assert.Contains(t, err.Error(), "response content")
				return
			}
			if tt.wantThrottleErr {
				// Throttle errors must match exporterhelper's throttle-retry
				// wrapper, with the delay taken from Retry-After (0 if unset).
				expected := fmt.Errorf("HTTP %d %q", tt.httpResponseCode, http.StatusText(tt.httpResponseCode))
				expected = exporterhelper.NewThrottleRetry(expected, time.Duration(tt.retryAfter)*time.Second)
				assert.EqualValues(t, expected, err)
				return
			}
			assert.NoError(t, err)
		})
	}
}
  217. func TestConsumeMetricsWithAccessTokenPassthrough(t *testing.T) {
  218. fromHeaders := "AccessTokenFromClientHeaders"
  219. fromLabels := []string{"AccessTokenFromLabel0", "AccessTokenFromLabel1"}
  220. validMetricsWithToken := func(includeToken bool, token string) pmetric.Metrics {
  221. out := pmetric.NewMetrics()
  222. rm := out.ResourceMetrics().AppendEmpty()
  223. if includeToken {
  224. rm.Resource().Attributes().PutStr("com.splunk.signalfx.access_token", token)
  225. }
  226. ilm := rm.ScopeMetrics().AppendEmpty()
  227. m := ilm.Metrics().AppendEmpty()
  228. m.SetName("test_gauge")
  229. dp := m.SetEmptyGauge().DataPoints().AppendEmpty()
  230. dp.Attributes().PutStr("k0", "v0")
  231. dp.Attributes().PutStr("k1", "v1")
  232. dp.SetDoubleValue(123)
  233. return out
  234. }
  235. tests := []struct {
  236. name string
  237. accessTokenPassthrough bool
  238. metrics pmetric.Metrics
  239. additionalHeaders map[string]string
  240. pushedTokens []string
  241. }{
  242. {
  243. name: "passthrough access token and included in md",
  244. accessTokenPassthrough: true,
  245. metrics: validMetricsWithToken(true, fromLabels[0]),
  246. pushedTokens: []string{fromLabels[0]},
  247. },
  248. {
  249. name: "passthrough access token and not included in md",
  250. accessTokenPassthrough: true,
  251. metrics: validMetricsWithToken(false, fromLabels[0]),
  252. pushedTokens: []string{fromHeaders},
  253. },
  254. {
  255. name: "don't passthrough access token and included in md",
  256. accessTokenPassthrough: false,
  257. metrics: func() pmetric.Metrics {
  258. forFirstToken := validMetricsWithToken(true, fromLabels[0])
  259. tgt := forFirstToken.ResourceMetrics().AppendEmpty()
  260. validMetricsWithToken(true, fromLabels[1]).ResourceMetrics().At(0).CopyTo(tgt)
  261. return forFirstToken
  262. }(),
  263. pushedTokens: []string{fromHeaders},
  264. },
  265. {
  266. name: "don't passthrough access token and not included in md",
  267. accessTokenPassthrough: false,
  268. metrics: validMetricsWithToken(false, fromLabels[0]),
  269. pushedTokens: []string{fromHeaders},
  270. },
  271. {
  272. name: "override user-specified token-like header",
  273. accessTokenPassthrough: true,
  274. metrics: validMetricsWithToken(true, fromLabels[0]),
  275. additionalHeaders: map[string]string{
  276. "x-sf-token": "user-specified",
  277. },
  278. pushedTokens: []string{fromLabels[0]},
  279. },
  280. {
  281. name: "use token from header when resource is nil",
  282. accessTokenPassthrough: true,
  283. metrics: func() pmetric.Metrics {
  284. out := pmetric.NewMetrics()
  285. rm := out.ResourceMetrics().AppendEmpty()
  286. ilm := rm.ScopeMetrics().AppendEmpty()
  287. m := ilm.Metrics().AppendEmpty()
  288. m.SetName("test_gauge")
  289. dp := m.SetEmptyGauge().DataPoints().AppendEmpty()
  290. dp.Attributes().PutStr("k0", "v0")
  291. dp.Attributes().PutStr("k1", "v1")
  292. dp.SetDoubleValue(123)
  293. return out
  294. }(),
  295. pushedTokens: []string{fromHeaders},
  296. },
  297. {
  298. name: "multiple tokens passed through",
  299. accessTokenPassthrough: true,
  300. metrics: func() pmetric.Metrics {
  301. forFirstToken := validMetricsWithToken(true, fromLabels[0])
  302. forSecondToken := validMetricsWithToken(true, fromLabels[1])
  303. forSecondToken.ResourceMetrics().EnsureCapacity(2)
  304. forFirstToken.ResourceMetrics().At(0).CopyTo(forSecondToken.ResourceMetrics().AppendEmpty())
  305. return forSecondToken
  306. }(),
  307. pushedTokens: []string{fromLabels[0], fromLabels[1]},
  308. },
  309. {
  310. name: "multiple tokens passed through - multiple md with same token",
  311. accessTokenPassthrough: true,
  312. metrics: func() pmetric.Metrics {
  313. forFirstToken := validMetricsWithToken(true, fromLabels[1])
  314. forSecondToken := validMetricsWithToken(true, fromLabels[0])
  315. moreForSecondToken := validMetricsWithToken(true, fromLabels[1])
  316. forSecondToken.ResourceMetrics().EnsureCapacity(3)
  317. forFirstToken.ResourceMetrics().At(0).CopyTo(forSecondToken.ResourceMetrics().AppendEmpty())
  318. moreForSecondToken.ResourceMetrics().At(0).CopyTo(forSecondToken.ResourceMetrics().AppendEmpty())
  319. return forSecondToken
  320. }(),
  321. pushedTokens: []string{fromLabels[0], fromLabels[1]},
  322. },
  323. {
  324. name: "multiple tokens passed through - multiple md with same token grouped together",
  325. accessTokenPassthrough: true,
  326. metrics: func() pmetric.Metrics {
  327. forFirstToken := validMetricsWithToken(true, fromLabels[0])
  328. forSecondToken := validMetricsWithToken(true, fromLabels[1])
  329. moreForSecondToken := validMetricsWithToken(true, fromLabels[1])
  330. forSecondToken.ResourceMetrics().EnsureCapacity(3)
  331. moreForSecondToken.ResourceMetrics().At(0).CopyTo(forSecondToken.ResourceMetrics().AppendEmpty())
  332. forFirstToken.ResourceMetrics().At(0).CopyTo(forSecondToken.ResourceMetrics().AppendEmpty())
  333. return forSecondToken
  334. }(),
  335. pushedTokens: []string{fromLabels[0], fromLabels[1]},
  336. },
  337. {
  338. name: "multiple tokens passed through - one corrupted",
  339. accessTokenPassthrough: true,
  340. metrics: func() pmetric.Metrics {
  341. forFirstToken := validMetricsWithToken(true, fromLabels[0])
  342. forSecondToken := validMetricsWithToken(false, fromLabels[1])
  343. forSecondToken.ResourceMetrics().EnsureCapacity(2)
  344. forFirstToken.ResourceMetrics().At(0).CopyTo(forSecondToken.ResourceMetrics().AppendEmpty())
  345. return forSecondToken
  346. }(),
  347. pushedTokens: []string{fromLabels[0], fromHeaders},
  348. },
  349. }
  350. for _, tt := range tests {
  351. receivedTokens := struct {
  352. sync.Mutex
  353. tokens []string
  354. }{}
  355. receivedTokens.tokens = []string{}
  356. t.Run(tt.name, func(t *testing.T) {
  357. server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  358. assert.Equal(t, tt.name, r.Header.Get("test_header_"))
  359. receivedTokens.Lock()
  360. token := r.Header.Get("x-sf-token")
  361. receivedTokens.tokens = append(receivedTokens.tokens, token)
  362. receivedTokens.Unlock()
  363. w.WriteHeader(http.StatusAccepted)
  364. }))
  365. defer server.Close()
  366. factory := NewFactory()
  367. cfg := factory.CreateDefaultConfig().(*Config)
  368. cfg.IngestURL = server.URL
  369. cfg.APIURL = server.URL
  370. cfg.HTTPClientSettings.Headers = make(map[string]configopaque.String)
  371. for k, v := range tt.additionalHeaders {
  372. cfg.HTTPClientSettings.Headers[k] = configopaque.String(v)
  373. }
  374. cfg.HTTPClientSettings.Headers["test_header_"] = configopaque.String(tt.name)
  375. cfg.AccessToken = configopaque.String(fromHeaders)
  376. cfg.AccessTokenPassthrough = tt.accessTokenPassthrough
  377. sfxExp, err := NewFactory().CreateMetricsExporter(context.Background(), exportertest.NewNopCreateSettings(), cfg)
  378. require.NoError(t, err)
  379. require.NoError(t, sfxExp.Start(context.Background(), componenttest.NewNopHost()))
  380. defer func() {
  381. require.NoError(t, sfxExp.Shutdown(context.Background()))
  382. }()
  383. err = sfxExp.ConsumeMetrics(context.Background(), tt.metrics)
  384. assert.NoError(t, err)
  385. require.Eventually(t, func() bool {
  386. receivedTokens.Lock()
  387. defer receivedTokens.Unlock()
  388. return len(tt.pushedTokens) == len(receivedTokens.tokens)
  389. }, 1*time.Second, 10*time.Millisecond)
  390. sort.Strings(tt.pushedTokens)
  391. sort.Strings(receivedTokens.tokens)
  392. assert.Equal(t, tt.pushedTokens, receivedTokens.tokens)
  393. })
  394. }
  395. }
  396. func TestNewEventExporter(t *testing.T) {
  397. got, err := newEventExporter(nil, exportertest.NewNopCreateSettings())
  398. assert.EqualError(t, err, "nil config")
  399. assert.Nil(t, got)
  400. got, err = newEventExporter(nil, exportertest.NewNopCreateSettings())
  401. assert.Error(t, err)
  402. assert.Nil(t, got)
  403. cfg := &Config{
  404. AccessToken: "someToken",
  405. Realm: "xyz",
  406. HTTPClientSettings: confighttp.HTTPClientSettings{Timeout: 1 * time.Second},
  407. }
  408. got, err = newEventExporter(cfg, exportertest.NewNopCreateSettings())
  409. assert.NoError(t, err)
  410. require.NotNil(t, got)
  411. err = got.startLogs(context.Background(), componenttest.NewNopHost())
  412. assert.NoError(t, err)
  413. // This is expected to fail.
  414. ld := makeSampleResourceLogs()
  415. err = got.pushLogs(context.Background(), ld)
  416. assert.Error(t, err)
  417. }
  418. func makeSampleResourceLogs() plog.Logs {
  419. out := plog.NewLogs()
  420. l := out.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()
  421. l.SetTimestamp(pcommon.Timestamp(1000))
  422. attrs := l.Attributes()
  423. attrs.PutStr("k0", "v0")
  424. attrs.PutStr("k1", "v1")
  425. attrs.PutStr("k2", "v2")
  426. propMap := attrs.PutEmptyMap("com.splunk.signalfx.event_properties")
  427. propMap.PutStr("env", "prod")
  428. propMap.PutBool("isActive", true)
  429. propMap.PutInt("rack", 5)
  430. propMap.PutDouble("temp", 40.5)
  431. attrs.PutInt("com.splunk.signalfx.event_category", int64(sfxpb.EventCategory_USER_DEFINED))
  432. attrs.PutStr("com.splunk.signalfx.event_type", "shutdown")
  433. return out
  434. }
// TestConsumeEventData exercises sfxEventClient.pushLogsData against a stub
// ingest server: well-formed events succeed, records missing the SignalFx
// event attributes are dropped, nonconvertible property values are stripped
// without dropping the record, and a 403 response surfaces as an error.
func TestConsumeEventData(t *testing.T) {
	tests := []struct {
		name                 string
		resourceLogs         plog.Logs
		reqTestFunc          func(t *testing.T, r *http.Request) // optional per-request hook
		httpResponseCode     int
		numDroppedLogRecords int
		wantErr              bool
	}{
		{
			name:             "happy_path",
			resourceLogs:     makeSampleResourceLogs(),
			reqTestFunc:      nil,
			httpResponseCode: http.StatusAccepted,
		},
		{
			// Without category/type attributes the record is not an SFx event
			// and must be dropped.
			name: "no_event_attribute",
			resourceLogs: func() plog.Logs {
				out := makeSampleResourceLogs()
				attrs := out.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes()
				attrs.Remove("com.splunk.signalfx.event_category")
				attrs.Remove("com.splunk.signalfx.event_type")
				return out
			}(),
			reqTestFunc:          nil,
			numDroppedLogRecords: 1,
			httpResponseCode:     http.StatusAccepted,
		},
		{
			name: "nonconvertible_log_attrs",
			resourceLogs: func() plog.Logs {
				out := makeSampleResourceLogs()
				attrs := out.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes()
				attrs.PutEmptyMap("map")
				propsAttrs, _ := attrs.Get("com.splunk.signalfx.event_properties")
				propsAttrs.Map().PutEmptyMap("map")
				return out
			}(),
			reqTestFunc: nil,
			// The log does go through, just without that prop
			numDroppedLogRecords: 0,
			httpResponseCode:     http.StatusAccepted,
		},
		{
			name:                 "response_forbidden",
			resourceLogs:         makeSampleResourceLogs(),
			reqTestFunc:          nil,
			httpResponseCode:     http.StatusForbidden,
			numDroppedLogRecords: 1,
			wantErr:              true,
		},
		{
			name:             "large_batch",
			resourceLogs:     generateLargeEventBatch(),
			reqTestFunc:      nil,
			httpResponseCode: http.StatusAccepted,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			// Stub ingest endpoint replying with the configured status code.
			server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
				// Confirms custom client headers are propagated to requests.
				assert.Equal(t, "test", r.Header.Get("test_header_"))
				if tt.reqTestFunc != nil {
					tt.reqTestFunc(t, r)
				}
				w.WriteHeader(tt.httpResponseCode)
			}))
			defer server.Close()
			serverURL, err := url.Parse(server.URL)
			assert.NoError(t, err)
			cfg := &Config{
				HTTPClientSettings: confighttp.HTTPClientSettings{
					Timeout: 1 * time.Second,
					Headers: map[string]configopaque.String{"test_header_": "test"},
				},
			}
			client, err := cfg.ToClient(componenttest.NewNopHost(), exportertest.NewNopCreateSettings().TelemetrySettings)
			require.NoError(t, err)
			// Build the event client by hand, as the exporter would.
			eventClient := &sfxEventClient{
				sfxClientBase: sfxClientBase{
					ingestURL: serverURL,
					client:    client,
					zippers:   newGzipPool(),
				},
				logger: zap.NewNop(),
			}
			numDroppedLogRecords, err := eventClient.pushLogsData(context.Background(), tt.resourceLogs)
			assert.Equal(t, tt.numDroppedLogRecords, numDroppedLogRecords)
			if tt.wantErr {
				assert.Error(t, err)
				return
			}
			assert.NoError(t, err)
		})
	}
}
  531. func TestConsumeLogsDataWithAccessTokenPassthrough(t *testing.T) {
  532. fromHeaders := "AccessTokenFromClientHeaders"
  533. fromLabels := "AccessTokenFromLabel"
  534. newLogData := func(includeToken bool) plog.Logs {
  535. out := makeSampleResourceLogs()
  536. makeSampleResourceLogs().ResourceLogs().At(0).CopyTo(out.ResourceLogs().AppendEmpty())
  537. if includeToken {
  538. out.ResourceLogs().At(0).Resource().Attributes().PutStr("com.splunk.signalfx.access_token", fromLabels)
  539. out.ResourceLogs().At(1).Resource().Attributes().PutStr("com.splunk.signalfx.access_token", fromLabels)
  540. }
  541. return out
  542. }
  543. tests := []struct {
  544. name string
  545. accessTokenPassthrough bool
  546. includedInLogData bool
  547. expectedToken string
  548. }{
  549. {
  550. name: "passthrough access token and included in logs",
  551. accessTokenPassthrough: true,
  552. includedInLogData: true,
  553. expectedToken: fromLabels,
  554. },
  555. {
  556. name: "passthrough access token and not included in logs",
  557. accessTokenPassthrough: true,
  558. includedInLogData: false,
  559. expectedToken: fromHeaders,
  560. },
  561. {
  562. name: "don't passthrough access token and included in logs",
  563. accessTokenPassthrough: false,
  564. includedInLogData: true,
  565. expectedToken: fromHeaders,
  566. },
  567. {
  568. name: "don't passthrough access token and not included in logs",
  569. accessTokenPassthrough: false,
  570. includedInLogData: false,
  571. expectedToken: fromHeaders,
  572. },
  573. }
  574. for _, tt := range tests {
  575. t.Run(tt.name, func(t *testing.T) {
  576. receivedTokens := struct {
  577. sync.Mutex
  578. tokens []string
  579. }{}
  580. server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  581. assert.Equal(t, tt.name, r.Header.Get("test_header_"))
  582. receivedTokens.Lock()
  583. receivedTokens.tokens = append(receivedTokens.tokens, r.Header.Get("x-sf-token"))
  584. receivedTokens.Unlock()
  585. w.WriteHeader(http.StatusAccepted)
  586. }))
  587. defer server.Close()
  588. factory := NewFactory()
  589. cfg := factory.CreateDefaultConfig().(*Config)
  590. cfg.IngestURL = server.URL
  591. cfg.APIURL = server.URL
  592. cfg.Headers = make(map[string]configopaque.String)
  593. cfg.Headers["test_header_"] = configopaque.String(tt.name)
  594. cfg.AccessToken = configopaque.String(fromHeaders)
  595. cfg.AccessTokenPassthrough = tt.accessTokenPassthrough
  596. sfxExp, err := NewFactory().CreateLogsExporter(context.Background(), exportertest.NewNopCreateSettings(), cfg)
  597. require.NoError(t, err)
  598. require.NoError(t, sfxExp.Start(context.Background(), componenttest.NewNopHost()))
  599. defer func() {
  600. require.NoError(t, sfxExp.Shutdown(context.Background()))
  601. }()
  602. assert.NoError(t, sfxExp.ConsumeLogs(context.Background(), newLogData(tt.includedInLogData)))
  603. require.Eventually(t, func() bool {
  604. receivedTokens.Lock()
  605. defer receivedTokens.Unlock()
  606. return len(receivedTokens.tokens) == 1
  607. }, 1*time.Second, 10*time.Millisecond)
  608. assert.Equal(t, receivedTokens.tokens[0], tt.expectedToken)
  609. })
  610. }
  611. }
  612. func generateLargeDPBatch() pmetric.Metrics {
  613. md := pmetric.NewMetrics()
  614. md.ResourceMetrics().EnsureCapacity(6500)
  615. ts := time.Now()
  616. for i := 0; i < 6500; i++ {
  617. rm := md.ResourceMetrics().AppendEmpty()
  618. ilm := rm.ScopeMetrics().AppendEmpty()
  619. m := ilm.Metrics().AppendEmpty()
  620. m.SetName("test_" + strconv.Itoa(i))
  621. dp := m.SetEmptyGauge().DataPoints().AppendEmpty()
  622. dp.SetTimestamp(pcommon.NewTimestampFromTime(ts))
  623. dp.Attributes().PutStr("k0", "v0")
  624. dp.Attributes().PutStr("k1", "v1")
  625. dp.SetIntValue(int64(i))
  626. }
  627. return md
  628. }
  629. func generateLargeEventBatch() plog.Logs {
  630. out := plog.NewLogs()
  631. logs := out.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords()
  632. batchSize := 65000
  633. logs.EnsureCapacity(batchSize)
  634. ts := time.Now()
  635. for i := 0; i < batchSize; i++ {
  636. lr := logs.AppendEmpty()
  637. lr.Attributes().PutStr("k0", "k1")
  638. lr.Attributes().PutEmpty("com.splunk.signalfx.event_category")
  639. lr.SetTimestamp(pcommon.NewTimestampFromTime(ts))
  640. }
  641. return out
  642. }
  643. func TestConsumeMetadataNotStarted(t *testing.T) {
  644. exporter := &signalfxExporter{}
  645. err := exporter.pushMetadata([]*metadata.MetadataUpdate{})
  646. require.ErrorContains(t, err, "exporter has not started")
  647. }
  648. func TestConsumeMetadata(t *testing.T) {
  649. cfg := createDefaultConfig().(*Config)
  650. converter, err := translation.NewMetricsConverter(
  651. zap.NewNop(),
  652. nil,
  653. cfg.ExcludeMetrics,
  654. cfg.IncludeMetrics,
  655. cfg.NonAlphanumericDimensionChars,
  656. false,
  657. )
  658. require.NoError(t, err)
  659. type args struct {
  660. metadata []*metadata.MetadataUpdate
  661. }
  662. type fields struct {
  663. payLoad map[string]any
  664. }
  665. tests := []struct {
  666. name string
  667. fields fields
  668. args args
  669. excludeProperties []dpfilters.PropertyFilter
  670. expectedDimensionKey string
  671. expectedDimensionValue string
  672. sendDelay time.Duration
  673. shouldNotSendUpdate bool
  674. }{
  675. {
  676. name: "Test property updates",
  677. fields: fields{
  678. map[string]any{
  679. "customProperties": map[string]any{
  680. "prop.erty1": "val1",
  681. "property2": nil,
  682. "prop.erty3": "val33",
  683. "property4": nil,
  684. },
  685. "tags": nil,
  686. "tagsToRemove": nil,
  687. },
  688. },
  689. excludeProperties: []dpfilters.PropertyFilter{
  690. {
  691. DimensionName: mustStringFilter(t, "/^.*$/"),
  692. DimensionValue: mustStringFilter(t, "/^.*$/"),
  693. PropertyName: mustStringFilter(t, "/^property2$/"),
  694. PropertyValue: mustStringFilter(t, "some*value"),
  695. },
  696. {
  697. DimensionName: mustStringFilter(t, "/^.*$/"),
  698. DimensionValue: mustStringFilter(t, "/^.*$/"),
  699. PropertyName: mustStringFilter(t, "property5"),
  700. PropertyValue: mustStringFilter(t, "/^.*$/"),
  701. },
  702. {
  703. DimensionName: mustStringFilter(t, "*"),
  704. DimensionValue: mustStringFilter(t, "*"),
  705. PropertyName: mustStringFilter(t, "/^pro[op]erty6$/"),
  706. PropertyValue: mustStringFilter(t, "property*value"),
  707. },
  708. },
  709. args: args{
  710. []*metadata.MetadataUpdate{
  711. {
  712. ResourceIDKey: "key",
  713. ResourceID: "id",
  714. MetadataDelta: metadata.MetadataDelta{
  715. MetadataToAdd: map[string]string{
  716. "prop.erty1": "val1",
  717. "property5": "added.value",
  718. "property6": "property6.value",
  719. },
  720. MetadataToRemove: map[string]string{
  721. "property2": "val2",
  722. "property5": "removed.value",
  723. },
  724. MetadataToUpdate: map[string]string{
  725. "prop.erty3": "val33",
  726. "property4": "",
  727. },
  728. },
  729. },
  730. },
  731. },
  732. expectedDimensionKey: "key",
  733. expectedDimensionValue: "id",
  734. },
  735. {
  736. name: "Test tag updates",
  737. fields: fields{
  738. map[string]any{
  739. "customProperties": map[string]any{},
  740. "tags": []any{
  741. "tag.1",
  742. },
  743. "tagsToRemove": []any{
  744. "tag/2",
  745. },
  746. },
  747. },
  748. excludeProperties: []dpfilters.PropertyFilter{
  749. {
  750. // confirms tags aren't affected by excludeProperties filters
  751. DimensionName: mustStringFilter(t, "/^.*$/"),
  752. DimensionValue: mustStringFilter(t, "/^.*$/"),
  753. PropertyName: mustStringFilter(t, "/^.*$/"),
  754. PropertyValue: mustStringFilter(t, "/^.*$/"),
  755. },
  756. },
  757. args: args{
  758. []*metadata.MetadataUpdate{
  759. {
  760. ResourceIDKey: "key",
  761. ResourceID: "id",
  762. MetadataDelta: metadata.MetadataDelta{
  763. MetadataToAdd: map[string]string{
  764. "tag.1": "",
  765. },
  766. MetadataToRemove: map[string]string{
  767. "tag/2": "",
  768. },
  769. MetadataToUpdate: map[string]string{},
  770. },
  771. },
  772. },
  773. },
  774. expectedDimensionKey: "key",
  775. expectedDimensionValue: "id",
  776. },
  777. {
  778. name: "Test quick successive updates",
  779. fields: fields{
  780. map[string]any{
  781. "customProperties": map[string]any{
  782. "property1": nil,
  783. "property2": "val2",
  784. "property3": nil,
  785. },
  786. "tags": []any{
  787. "tag/2",
  788. },
  789. "tagsToRemove": []any{
  790. "tag.1",
  791. },
  792. },
  793. },
  794. args: args{
  795. []*metadata.MetadataUpdate{
  796. {
  797. ResourceIDKey: "key",
  798. ResourceID: "id",
  799. MetadataDelta: metadata.MetadataDelta{
  800. MetadataToAdd: map[string]string{
  801. "tag.1": "",
  802. "property1": "val1",
  803. "property3": "val3",
  804. },
  805. MetadataToRemove: map[string]string{
  806. "tag/2": "",
  807. },
  808. MetadataToUpdate: map[string]string{
  809. "property2": "val22",
  810. },
  811. },
  812. },
  813. {
  814. ResourceIDKey: "key",
  815. ResourceID: "id",
  816. MetadataDelta: metadata.MetadataDelta{
  817. MetadataToAdd: map[string]string{
  818. "tag/2": "",
  819. },
  820. MetadataToRemove: map[string]string{
  821. "tag.1": "",
  822. "property1": "val1",
  823. },
  824. MetadataToUpdate: map[string]string{
  825. "property2": "val2",
  826. "property3": "val33",
  827. },
  828. },
  829. },
  830. {
  831. ResourceIDKey: "key",
  832. ResourceID: "id",
  833. MetadataDelta: metadata.MetadataDelta{
  834. MetadataToAdd: map[string]string{},
  835. MetadataToRemove: map[string]string{
  836. "property3": "val33",
  837. },
  838. MetadataToUpdate: map[string]string{},
  839. },
  840. },
  841. },
  842. },
  843. expectedDimensionKey: "key",
  844. expectedDimensionValue: "id",
  845. sendDelay: time.Second,
  846. },
  847. {
  848. name: "Test updates on dimensions with nonalphanumeric characters (other than the default allow list)",
  849. fields: fields{
  850. map[string]any{
  851. "customProperties": map[string]any{
  852. "prop.erty1": "val1",
  853. "property2": nil,
  854. "prop.erty3": "val33",
  855. "property4": nil,
  856. },
  857. "tags": nil,
  858. "tagsToRemove": nil,
  859. },
  860. },
  861. args: args{
  862. []*metadata.MetadataUpdate{
  863. {
  864. ResourceIDKey: "k!e=y",
  865. ResourceID: "id",
  866. MetadataDelta: metadata.MetadataDelta{
  867. MetadataToAdd: map[string]string{
  868. "prop.erty1": "val1",
  869. },
  870. MetadataToRemove: map[string]string{
  871. "property2": "val2",
  872. },
  873. MetadataToUpdate: map[string]string{
  874. "prop.erty3": "val33",
  875. "property4": "",
  876. },
  877. },
  878. },
  879. },
  880. },
  881. expectedDimensionKey: "k_e_y",
  882. expectedDimensionValue: "id",
  883. },
  884. {
  885. name: "no dimension update for empty properties",
  886. shouldNotSendUpdate: true,
  887. excludeProperties: []dpfilters.PropertyFilter{
  888. {
  889. DimensionName: mustStringFilter(t, "key"),
  890. DimensionValue: mustStringFilter(t, "/^.*$/"),
  891. PropertyName: mustStringFilter(t, "/^prop\\.erty[13]$/"),
  892. PropertyValue: mustStringFilter(t, "/^.*$/"),
  893. },
  894. {
  895. DimensionName: mustStringFilter(t, "*"),
  896. DimensionValue: mustStringFilter(t, "id"),
  897. PropertyName: mustStringFilter(t, "property*"),
  898. PropertyValue: mustStringFilter(t, "/^.*$/"),
  899. },
  900. },
  901. args: args{
  902. []*metadata.MetadataUpdate{
  903. {
  904. ResourceIDKey: "key",
  905. ResourceID: "id",
  906. MetadataDelta: metadata.MetadataDelta{
  907. MetadataToAdd: map[string]string{
  908. "prop.erty1": "val1",
  909. "property2": "val2",
  910. "property5": "added.value",
  911. "property6": "property6.value",
  912. },
  913. MetadataToUpdate: map[string]string{
  914. "prop.erty3": "val33",
  915. "property4": "val",
  916. },
  917. },
  918. },
  919. },
  920. },
  921. },
  922. }
  923. for _, tt := range tests {
  924. t.Run(tt.name, func(t *testing.T) {
  925. // Use WaitGroup to ensure the mocked server has encountered
  926. // a request from the exporter.
  927. wg := sync.WaitGroup{}
  928. wg.Add(1)
  929. server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  930. b, err := io.ReadAll(r.Body)
  931. assert.NoError(t, err)
  932. // Test metadata updates are sent onto the right dimensions.
  933. dimPair := strings.Split(r.RequestURI, "/")[3:5]
  934. assert.Equal(t, tt.expectedDimensionKey, dimPair[0])
  935. assert.Equal(t, tt.expectedDimensionValue, dimPair[1])
  936. p := map[string]any{
  937. "customProperties": map[string]*string{},
  938. "tags": []string{},
  939. "tagsToRemove": []string{},
  940. }
  941. err = json.Unmarshal(b, &p)
  942. assert.NoError(t, err)
  943. assert.Equal(t, tt.fields.payLoad, p)
  944. wg.Done()
  945. }))
  946. defer server.Close()
  947. serverURL, err := url.Parse(server.URL)
  948. assert.NoError(t, err)
  949. logger := zap.NewNop()
  950. dimClient := dimensions.NewDimensionClient(
  951. context.Background(),
  952. dimensions.DimensionClientOptions{
  953. Token: "foo",
  954. APIURL: serverURL,
  955. LogUpdates: true,
  956. Logger: logger,
  957. SendDelay: tt.sendDelay,
  958. MaxBuffered: 10,
  959. MetricsConverter: *converter,
  960. ExcludeProperties: tt.excludeProperties,
  961. })
  962. dimClient.Start()
  963. se := &signalfxExporter{
  964. dimClient: dimClient,
  965. }
  966. defer func() {
  967. _ = se.shutdown(context.Background())
  968. }()
  969. sme := signalfMetadataExporter{
  970. exporter: se,
  971. }
  972. err = sme.ConsumeMetadata(tt.args.metadata)
  973. c := make(chan struct{})
  974. go func() {
  975. defer close(c)
  976. wg.Wait()
  977. }()
  978. select {
  979. case <-c:
  980. // wait 500ms longer than send delay
  981. case <-time.After(tt.sendDelay + 500*time.Millisecond):
  982. require.True(t, tt.shouldNotSendUpdate, "timeout waiting for response")
  983. }
  984. require.NoError(t, err)
  985. })
  986. }
  987. }
  988. func BenchmarkExporterConsumeData(b *testing.B) {
  989. batchSize := 1000
  990. metrics := pmetric.NewMetrics()
  991. tmd := testMetricsData()
  992. for i := 0; i < batchSize; i++ {
  993. tmd.ResourceMetrics().At(0).CopyTo(metrics.ResourceMetrics().AppendEmpty())
  994. }
  995. server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  996. w.WriteHeader(http.StatusAccepted)
  997. }))
  998. defer server.Close()
  999. serverURL, err := url.Parse(server.URL)
  1000. assert.NoError(b, err)
  1001. c, err := translation.NewMetricsConverter(zap.NewNop(), nil, nil, nil, "", false)
  1002. require.NoError(b, err)
  1003. require.NotNil(b, c)
  1004. dpClient := &sfxDPClient{
  1005. sfxClientBase: sfxClientBase{
  1006. ingestURL: serverURL,
  1007. client: &http.Client{
  1008. Timeout: 1 * time.Second,
  1009. },
  1010. zippers: sync.Pool{New: func() any {
  1011. return gzip.NewWriter(nil)
  1012. }},
  1013. },
  1014. logger: zap.NewNop(),
  1015. converter: c,
  1016. }
  1017. for i := 0; i < b.N; i++ {
  1018. numDroppedTimeSeries, err := dpClient.pushMetricsData(context.Background(), metrics)
  1019. assert.NoError(b, err)
  1020. assert.Equal(b, 0, numDroppedTimeSeries)
  1021. }
  1022. }
  1023. // Test to ensure SignalFx exporter implements metadata.MetadataExporter in k8s_cluster receiver.
  1024. func TestSignalFxExporterConsumeMetadata(t *testing.T) {
  1025. f := NewFactory()
  1026. cfg := f.CreateDefaultConfig()
  1027. rCfg := cfg.(*Config)
  1028. rCfg.AccessToken = "token"
  1029. rCfg.Realm = "realm"
  1030. exp, err := f.CreateMetricsExporter(context.Background(), exportertest.NewNopCreateSettings(), rCfg)
  1031. require.NoError(t, err)
  1032. kme, ok := exp.(metadata.MetadataExporter)
  1033. require.True(t, ok, "SignalFx exporter does not implement metadata.MetadataExporter")
  1034. require.NotNil(t, kme)
  1035. }
  1036. func TestTLSExporterInit(t *testing.T) {
  1037. tests := []struct {
  1038. name string
  1039. config *Config
  1040. wantErr bool
  1041. wantErrMessage string
  1042. }{
  1043. {
  1044. name: "valid CA",
  1045. config: &Config{
  1046. APIURL: "https://test",
  1047. IngestURL: "https://test",
  1048. IngestTLSSettings: configtls.TLSClientSetting{
  1049. TLSSetting: configtls.TLSSetting{
  1050. CAFile: "./testdata/certs/ca.pem",
  1051. },
  1052. },
  1053. APITLSSettings: configtls.TLSClientSetting{
  1054. TLSSetting: configtls.TLSSetting{
  1055. CAFile: "./testdata/certs/ca.pem",
  1056. },
  1057. },
  1058. AccessToken: "random",
  1059. SyncHostMetadata: true,
  1060. },
  1061. wantErr: false,
  1062. },
  1063. {
  1064. name: "missing CA",
  1065. config: &Config{
  1066. APIURL: "https://test",
  1067. IngestURL: "https://test",
  1068. IngestTLSSettings: configtls.TLSClientSetting{
  1069. TLSSetting: configtls.TLSSetting{
  1070. CAFile: "./testdata/certs/missingfile",
  1071. },
  1072. },
  1073. AccessToken: "random",
  1074. SyncHostMetadata: true,
  1075. },
  1076. wantErr: true,
  1077. wantErrMessage: "failed to load CA CertPool",
  1078. },
  1079. {
  1080. name: "invalid CA",
  1081. config: &Config{
  1082. APIURL: "https://test",
  1083. IngestURL: "https://test",
  1084. IngestTLSSettings: configtls.TLSClientSetting{
  1085. TLSSetting: configtls.TLSSetting{
  1086. CAFile: "./testdata/certs/invalid-ca.pem",
  1087. },
  1088. },
  1089. AccessToken: "random",
  1090. SyncHostMetadata: true,
  1091. },
  1092. wantErr: true,
  1093. wantErrMessage: "failed to load CA CertPool",
  1094. },
  1095. }
  1096. for _, tt := range tests {
  1097. t.Run(tt.name, func(t *testing.T) {
  1098. sfx, err := newSignalFxExporter(tt.config, exportertest.NewNopCreateSettings())
  1099. assert.NoError(t, err)
  1100. err = sfx.start(context.Background(), componenttest.NewNopHost())
  1101. if tt.wantErr {
  1102. require.Error(t, err)
  1103. if tt.wantErrMessage != "" {
  1104. require.ErrorContains(t, err, tt.wantErrMessage)
  1105. }
  1106. } else {
  1107. require.NotNil(t, sfx)
  1108. }
  1109. })
  1110. }
  1111. }
  1112. func TestTLSIngestConnection(t *testing.T) {
  1113. metricsPayload := pmetric.NewMetrics()
  1114. rm := metricsPayload.ResourceMetrics().AppendEmpty()
  1115. ilm := rm.ScopeMetrics().AppendEmpty()
  1116. m := ilm.Metrics().AppendEmpty()
  1117. m.SetName("test_gauge")
  1118. dp := m.SetEmptyGauge().DataPoints().AppendEmpty()
  1119. dp.Attributes().PutStr("k0", "v0")
  1120. dp.Attributes().PutStr("k1", "v1")
  1121. dp.SetDoubleValue(123)
  1122. server, err := newLocalHTTPSTestServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  1123. fmt.Fprint(w, "connection is successful")
  1124. }))
  1125. require.NoError(t, err)
  1126. defer server.Close()
  1127. serverURL := server.URL
  1128. tests := []struct {
  1129. name string
  1130. config *Config
  1131. wantErr bool
  1132. wantErrMessage string
  1133. }{
  1134. {
  1135. name: "Ingest CA not set",
  1136. config: &Config{
  1137. APIURL: serverURL,
  1138. IngestURL: serverURL,
  1139. AccessToken: "random",
  1140. SyncHostMetadata: true,
  1141. },
  1142. wantErr: true,
  1143. wantErrMessage: "x509.*certificate",
  1144. },
  1145. {
  1146. name: "Ingest CA set",
  1147. config: &Config{
  1148. APIURL: serverURL,
  1149. IngestURL: serverURL,
  1150. IngestTLSSettings: configtls.TLSClientSetting{
  1151. TLSSetting: configtls.TLSSetting{
  1152. CAFile: "./testdata/certs/ca.pem",
  1153. },
  1154. },
  1155. AccessToken: "random",
  1156. SyncHostMetadata: true,
  1157. },
  1158. wantErr: false,
  1159. },
  1160. }
  1161. for _, tt := range tests {
  1162. t.Run(tt.name, func(t *testing.T) {
  1163. sfx, err := newSignalFxExporter(tt.config, exportertest.NewNopCreateSettings())
  1164. assert.NoError(t, err)
  1165. err = sfx.start(context.Background(), componenttest.NewNopHost())
  1166. assert.NoError(t, err)
  1167. _, err = sfx.pushMetricsData(context.Background(), metricsPayload)
  1168. if tt.wantErr {
  1169. require.Error(t, err)
  1170. if tt.wantErrMessage != "" {
  1171. assert.Regexp(t, tt.wantErrMessage, err)
  1172. }
  1173. } else {
  1174. assert.NoError(t, err)
  1175. }
  1176. })
  1177. }
  1178. }
  1179. func TestDefaultSystemCPUTimeExcludedAndTranslated(t *testing.T) {
  1180. translator, err := translation.NewMetricTranslator(defaultTranslationRules, 3600)
  1181. require.NoError(t, err)
  1182. converter, err := translation.NewMetricsConverter(zap.NewNop(), translator, defaultExcludeMetrics, nil, "_-.", false)
  1183. require.NoError(t, err)
  1184. md := pmetric.NewMetrics()
  1185. rm := md.ResourceMetrics().AppendEmpty()
  1186. sm := rm.ScopeMetrics().AppendEmpty()
  1187. m := sm.Metrics().AppendEmpty()
  1188. m.SetName("system.cpu.time")
  1189. sum := m.SetEmptySum()
  1190. for _, state := range []string{"idle", "interrupt", "nice", "softirq", "steal", "system", "user", "wait"} {
  1191. for cpu := 0; cpu < 32; cpu++ {
  1192. dp := sum.DataPoints().AppendEmpty()
  1193. dp.SetDoubleValue(0)
  1194. dp.Attributes().PutStr("cpu", fmt.Sprintf("%d", cpu))
  1195. dp.Attributes().PutStr("state", state)
  1196. }
  1197. }
  1198. dps := converter.MetricsToSignalFxV2(md)
  1199. found := map[string]int64{}
  1200. for _, dp := range dps {
  1201. if dp.Metric == "cpu.num_processors" || dp.Metric == "cpu.idle" {
  1202. intVal := dp.Value.IntValue
  1203. require.NotNil(t, intVal, fmt.Sprintf("unexpected nil IntValue for %q", dp.Metric))
  1204. found[dp.Metric] = *intVal
  1205. } else {
  1206. // account for unexpected w/ test-failing placeholder
  1207. found[dp.Metric] = -1
  1208. }
  1209. }
  1210. require.Equal(t, map[string]int64{
  1211. "cpu.num_processors": 32,
  1212. "cpu.idle": 0,
  1213. }, found)
  1214. }
  1215. func TestTLSAPIConnection(t *testing.T) {
  1216. cfg := createDefaultConfig().(*Config)
  1217. converter, err := translation.NewMetricsConverter(
  1218. zap.NewNop(),
  1219. nil,
  1220. cfg.ExcludeMetrics,
  1221. cfg.IncludeMetrics,
  1222. cfg.NonAlphanumericDimensionChars,
  1223. false)
  1224. require.NoError(t, err)
  1225. metadata := []*metadata.MetadataUpdate{
  1226. {
  1227. ResourceIDKey: "key",
  1228. ResourceID: "id",
  1229. MetadataDelta: metadata.MetadataDelta{
  1230. MetadataToAdd: map[string]string{
  1231. "prop.erty1": "val1",
  1232. },
  1233. },
  1234. },
  1235. }
  1236. server, err := newLocalHTTPSTestServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  1237. fmt.Fprint(w, "connection is successful")
  1238. }))
  1239. require.NoError(t, err)
  1240. defer server.Close()
  1241. tests := []struct {
  1242. name string
  1243. config *Config
  1244. wantErr bool
  1245. wantErrMessage string
  1246. }{
  1247. {
  1248. name: "API CA set",
  1249. config: &Config{
  1250. APIURL: server.URL,
  1251. IngestURL: server.URL,
  1252. AccessToken: "random",
  1253. SyncHostMetadata: true,
  1254. APITLSSettings: configtls.TLSClientSetting{
  1255. TLSSetting: configtls.TLSSetting{
  1256. CAFile: "./testdata/certs/ca.pem",
  1257. },
  1258. },
  1259. },
  1260. wantErr: false,
  1261. },
  1262. {
  1263. name: "API CA set",
  1264. config: &Config{
  1265. APIURL: server.URL,
  1266. IngestURL: server.URL,
  1267. AccessToken: "random",
  1268. SyncHostMetadata: true,
  1269. },
  1270. wantErr: true,
  1271. wantErrMessage: "error making HTTP request.*x509",
  1272. },
  1273. }
  1274. for _, tt := range tests {
  1275. t.Run(tt.name, func(t *testing.T) {
  1276. observedZapCore, observedLogs := observer.New(zap.DebugLevel)
  1277. logger := zap.New(observedZapCore)
  1278. apiTLSCfg, err := tt.config.APITLSSettings.LoadTLSConfig()
  1279. require.NoError(t, err)
  1280. serverURL, err := url.Parse(tt.config.APIURL)
  1281. assert.NoError(t, err)
  1282. cancellable, cancelFn := context.WithCancel(context.Background())
  1283. defer cancelFn()
  1284. dimClient := dimensions.NewDimensionClient(
  1285. cancellable,
  1286. dimensions.DimensionClientOptions{
  1287. Token: "",
  1288. APIURL: serverURL,
  1289. LogUpdates: true,
  1290. Logger: logger,
  1291. SendDelay: 1,
  1292. MaxBuffered: 10,
  1293. MetricsConverter: *converter,
  1294. APITLSConfig: apiTLSCfg,
  1295. })
  1296. dimClient.Start()
  1297. se := &signalfxExporter{
  1298. dimClient: dimClient,
  1299. }
  1300. sme := signalfMetadataExporter{
  1301. exporter: se,
  1302. }
  1303. err = sme.ConsumeMetadata(metadata)
  1304. time.Sleep(3 * time.Second)
  1305. require.NoError(t, err)
  1306. if tt.wantErr {
  1307. if tt.wantErrMessage != "" {
  1308. assert.Regexp(t, tt.wantErrMessage, observedLogs.All()[0].Context[0].Interface.(error).Error())
  1309. }
  1310. } else {
  1311. require.Equal(t, 1, observedLogs.Len())
  1312. require.Nil(t, observedLogs.All()[0].Context[0].Interface)
  1313. }
  1314. })
  1315. }
  1316. }
  1317. func newLocalHTTPSTestServer(handler http.Handler) (*httptest.Server, error) {
  1318. ts := httptest.NewUnstartedServer(handler)
  1319. cert, err := tls.LoadX509KeyPair("./testdata/certs/cert.pem", "./testdata/certs/cert-key.pem")
  1320. if err != nil {
  1321. return nil, err
  1322. }
  1323. ts.TLS = &tls.Config{Certificates: []tls.Certificate{cert}}
  1324. ts.StartTLS()
  1325. return ts, nil
  1326. }