trace_receiver_test.go 13 KB

  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package sapmreceiver
  4. import (
  5. "bytes"
  6. "compress/gzip"
  7. "context"
  8. "encoding/binary"
  9. "fmt"
  10. "net/http"
  11. "testing"
  12. "time"
  13. ""
  14. ""
  15. splunksapm ""
  16. ""
  17. ""
  18. ""
  19. ""
  20. ""
  21. ""
  22. ""
  23. ""
  24. ""
  25. ""
  26. ""
  27. ""
  28. conventions ""
  29. ""
  30. ""
  31. )
  32. func expectedTraceData(t1, t2, t3 time.Time) ptrace.Traces {
  33. traceID := pcommon.TraceID(
  34. [16]byte{0xF1, 0xF2, 0xF3, 0xF4, 0xF5, 0xF6, 0xF7, 0xF8, 0xF9, 0xFA, 0xFB, 0xFC, 0xFD, 0xFE, 0xFF, 0x80})
  35. parentSpanID := pcommon.SpanID([8]byte{0x1F, 0x1E, 0x1D, 0x1C, 0x1B, 0x1A, 0x19, 0x18})
  36. childSpanID := pcommon.SpanID([8]byte{0xAF, 0xAE, 0xAD, 0xAC, 0xAB, 0xAA, 0xA9, 0xA8})
  37. traces := ptrace.NewTraces()
  38. rs := traces.ResourceSpans().AppendEmpty()
  39. rs.Resource().Attributes().PutStr(conventions.AttributeServiceName, "issaTest")
  40. rs.Resource().Attributes().PutBool("bool", true)
  41. rs.Resource().Attributes().PutStr("string", "yes")
  42. rs.Resource().Attributes().PutInt("int64", 10000000)
  43. spans := rs.ScopeSpans().AppendEmpty().Spans()
  44. span0 := spans.AppendEmpty()
  45. span0.SetSpanID(childSpanID)
  46. span0.SetParentSpanID(parentSpanID)
  47. span0.SetTraceID(traceID)
  48. span0.SetName("DBSearch")
  49. span0.SetStartTimestamp(pcommon.NewTimestampFromTime(t1))
  50. span0.SetEndTimestamp(pcommon.NewTimestampFromTime(t2))
  51. span0.Status().SetCode(ptrace.StatusCodeError)
  52. span0.Status().SetMessage("Stale indices")
  53. span1 := spans.AppendEmpty()
  54. span1.SetSpanID(parentSpanID)
  55. span1.SetTraceID(traceID)
  56. span1.SetName("ProxyFetch")
  57. span1.SetStartTimestamp(pcommon.NewTimestampFromTime(t2))
  58. span1.SetEndTimestamp(pcommon.NewTimestampFromTime(t3))
  59. span1.Status().SetCode(ptrace.StatusCodeError)
  60. span1.Status().SetMessage("Frontend crash")
  61. return traces
  62. }
  63. func grpcFixture(t1 time.Time) *model.Batch {
  64. traceID := model.TraceID{}
  65. _ = traceID.Unmarshal([]byte{0xF1, 0xF2, 0xF3, 0xF4, 0xF5, 0xF6, 0xF7, 0xF8, 0xF9, 0xFA, 0xFB, 0xFC, 0xFD, 0xFE, 0xFF, 0x80})
  66. parentSpanID := model.NewSpanID(binary.BigEndian.Uint64([]byte{0x1F, 0x1E, 0x1D, 0x1C, 0x1B, 0x1A, 0x19, 0x18}))
  67. childSpanID := model.NewSpanID(binary.BigEndian.Uint64([]byte{0xAF, 0xAE, 0xAD, 0xAC, 0xAB, 0xAA, 0xA9, 0xA8}))
  68. return &model.Batch{
  69. Process: &model.Process{
  70. ServiceName: "issaTest",
  71. Tags: []model.KeyValue{
  72. model.Bool("bool", true),
  73. model.String("string", "yes"),
  74. model.Int64("int64", 1e7),
  75. },
  76. },
  77. Spans: []*model.Span{
  78. {
  79. TraceID: traceID,
  80. SpanID: childSpanID,
  81. OperationName: "DBSearch",
  82. StartTime: t1,
  83. Duration: 10 * time.Minute,
  84. Tags: []model.KeyValue{
  85. model.String(conventions.OtelStatusDescription, "Stale indices"),
  86. model.String(conventions.OtelStatusCode, "ERROR"),
  87. model.Bool("error", true),
  88. },
  89. References: []model.SpanRef{
  90. {
  91. TraceID: traceID,
  92. SpanID: parentSpanID,
  93. RefType: model.SpanRefType_CHILD_OF,
  94. },
  95. },
  96. },
  97. {
  98. TraceID: traceID,
  99. SpanID: parentSpanID,
  100. OperationName: "ProxyFetch",
  101. StartTime: t1.Add(10 * time.Minute),
  102. Duration: 2 * time.Second,
  103. Tags: []model.KeyValue{
  104. model.String(conventions.OtelStatusDescription, "Frontend crash"),
  105. model.String(conventions.OtelStatusCode, "ERROR"),
  106. model.Bool("error", true),
  107. },
  108. },
  109. },
  110. }
  111. }
  112. // sendSapm acts as a client for sending sapm to the receiver. This could be replaced with a sapm exporter in the future.
  113. func sendSapm(
  114. endpoint string,
  115. sapm *splunksapm.PostSpansRequest,
  116. compression string,
  117. tlsEnabled bool,
  118. token string,
  119. ) (*http.Response, error) {
  120. // marshal the sapm
  121. reqBytes, err := sapm.Marshal()
  122. if err != nil {
  123. return nil, fmt.Errorf("failed to marshal sapm %w", err)
  124. }
  125. switch compression {
  126. case "gzip":
  127. reqBytes, err = compressGzip(reqBytes)
  128. if err != nil {
  129. return nil, err
  130. }
  131. case "zstd":
  132. reqBytes, err = compressZstd(reqBytes)
  133. if err != nil {
  134. return nil, err
  135. }
  136. case "":
  137. // no compression
  138. default:
  139. return nil, fmt.Errorf("unknown compression %q", compression)
  140. }
  141. // build the request
  142. url := fmt.Sprintf("http://%s%s", endpoint, sapmprotocol.TraceEndpointV2)
  143. if tlsEnabled {
  144. url = fmt.Sprintf("https://%s%s", endpoint, sapmprotocol.TraceEndpointV2)
  145. }
  146. req, _ := http.NewRequest(http.MethodPost, url, bytes.NewReader(reqBytes))
  147. req.Header.Set(sapmprotocol.ContentTypeHeaderName, sapmprotocol.ContentTypeHeaderValue)
  148. // set headers for gzip
  149. if compression != "" {
  150. req.Header.Set(sapmprotocol.ContentEncodingHeaderName, compression)
  151. req.Header.Set(sapmprotocol.AcceptEncodingHeaderName, sapmprotocol.GZipEncodingHeaderValue)
  152. }
  153. if token != "" {
  154. req.Header.Set("x-sf-token", token)
  155. }
  156. // send the request
  157. client := &http.Client{}
  158. if tlsEnabled {
  159. tlscs := configtls.TLSClientSetting{
  160. TLSSetting: configtls.TLSSetting{
  161. CAFile: "./testdata/ca.crt",
  162. CertFile: "./testdata/client.crt",
  163. KeyFile: "./testdata/client.key",
  164. },
  165. ServerName: "localhost",
  166. }
  167. tls, errTLS := tlscs.LoadTLSConfig()
  168. if errTLS != nil {
  169. return nil, fmt.Errorf("failed to send request to receiver %w", err)
  170. }
  171. client.Transport = &http.Transport{
  172. TLSClientConfig: tls,
  173. }
  174. }
  175. resp, err := client.Do(req)
  176. if err != nil {
  177. return resp, fmt.Errorf("failed to send request to receiver %w", err)
  178. }
  179. return resp, nil
  180. }
  181. func compressGzip(reqBytes []byte) ([]byte, error) {
  182. // create a gzip writer
  183. var buff bytes.Buffer
  184. writer := gzip.NewWriter(&buff)
  185. // run the request bytes through the gzip writer
  186. _, err := writer.Write(reqBytes)
  187. if err != nil {
  188. return nil, fmt.Errorf("failed to write gzip sapm %w", err)
  189. }
  190. // close the writer
  191. err = writer.Close()
  192. if err != nil {
  193. return nil, fmt.Errorf("failed to close the gzip writer %w", err)
  194. }
  195. return buff.Bytes(), nil
  196. }
  197. func compressZstd(reqBytes []byte) ([]byte, error) {
  198. // create a gzip writer
  199. var buff bytes.Buffer
  200. writer, err := zstd.NewWriter(&buff)
  201. if err != nil {
  202. return nil, fmt.Errorf("failed to write zstd sapm %w", err)
  203. }
  204. // run the request bytes through the gzip writer
  205. _, err = writer.Write(reqBytes)
  206. if err != nil {
  207. return nil, fmt.Errorf("failed to write zstd sapm %w", err)
  208. }
  209. // close the writer
  210. err = writer.Close()
  211. if err != nil {
  212. return nil, fmt.Errorf("failed to close the zstd writer %w", err)
  213. }
  214. return buff.Bytes(), nil
  215. }
  216. func setupReceiver(t *testing.T, config *Config, sink *consumertest.TracesSink) receiver.Traces {
  217. params := receivertest.NewNopCreateSettings()
  218. sr, err := newReceiver(params, config, sink)
  219. assert.NoError(t, err, "should not have failed to create the SAPM receiver")
  220. t.Log("Starting")
  221. mh := newAssertNoErrorHost(t)
  222. require.NoError(t, sr.Start(context.Background(), mh), "should not have failed to start trace reception")
  223. require.NoError(t, sr.Start(context.Background(), mh), "should not fail to start log on second Start call")
  224. // If there are errors reported through host.ReportFatalError() this will retrieve it.
  225. <-time.After(500 * time.Millisecond)
  226. t.Log("Trace Reception Started")
  227. return sr
  228. }
  229. func TestReception(t *testing.T) {
  230. now := time.Unix(1542158650, 536343000).UTC()
  231. nowPlus10min := now.Add(10 * time.Minute)
  232. nowPlus10min2sec := now.Add(10 * time.Minute).Add(2 * time.Second)
  233. tlsAddress := testutil.GetAvailableLocalAddress(t)
  234. type args struct {
  235. config *Config
  236. sapm *splunksapm.PostSpansRequest
  237. compression string
  238. useTLS bool
  239. }
  240. tests := []struct {
  241. name string
  242. args args
  243. want ptrace.Traces
  244. }{
  245. {
  246. name: "receive uncompressed sapm",
  247. args: args{
  248. // 1. Create the SAPM receiver aka "server"
  249. config: &Config{
  250. HTTPServerSettings: confighttp.HTTPServerSettings{
  251. Endpoint: defaultEndpoint,
  252. },
  253. },
  254. sapm: &splunksapm.PostSpansRequest{Batches: []*model.Batch{grpcFixture(now)}},
  255. compression: "",
  256. useTLS: false,
  257. },
  258. want: expectedTraceData(now, nowPlus10min, nowPlus10min2sec),
  259. },
  260. {
  261. name: "receive compressed sapm",
  262. args: args{
  263. config: &Config{
  264. HTTPServerSettings: confighttp.HTTPServerSettings{
  265. Endpoint: defaultEndpoint,
  266. },
  267. },
  268. sapm: &splunksapm.PostSpansRequest{Batches: []*model.Batch{grpcFixture(now)}},
  269. compression: "gzip",
  270. useTLS: false,
  271. },
  272. want: expectedTraceData(now, nowPlus10min, nowPlus10min2sec),
  273. },
  274. {
  275. name: "connect via TLS zstd compressed sapm",
  276. args: args{
  277. config: &Config{
  278. HTTPServerSettings: confighttp.HTTPServerSettings{
  279. Endpoint: tlsAddress,
  280. TLSSetting: &configtls.TLSServerSetting{
  281. TLSSetting: configtls.TLSSetting{
  282. CAFile: "./testdata/ca.crt",
  283. CertFile: "./testdata/server.crt",
  284. KeyFile: "./testdata/server.key",
  285. },
  286. },
  287. },
  288. },
  289. sapm: &splunksapm.PostSpansRequest{Batches: []*model.Batch{grpcFixture(now)}},
  290. compression: "zstd",
  291. useTLS: true,
  292. },
  293. want: expectedTraceData(now, nowPlus10min, nowPlus10min2sec),
  294. },
  295. }
  296. for _, tt := range tests {
  297. t.Run(, func(t *testing.T) {
  298. sink := new(consumertest.TracesSink)
  299. sr := setupReceiver(t, tt.args.config, sink)
  300. defer func() {
  301. require.NoError(t, sr.Shutdown(context.Background()))
  302. }()
  303. t.Log("Sending Sapm Request")
  304. var resp *http.Response
  305. resp, err := sendSapm(tt.args.config.Endpoint, tt.args.sapm, tt.args.compression, tt.args.useTLS, "")
  306. require.NoError(t, err)
  307. assert.Equal(t, 200, resp.StatusCode)
  308. t.Log("SAPM Request Received")
  309. // retrieve received traces
  310. got := sink.AllTraces()
  311. assert.Equal(t, 1, len(got))
  312. // compare what we got to what we wanted
  313. t.Log("Comparing expected data to trace data")
  314. assert.EqualValues(t, tt.want, got[0])
  315. })
  316. }
  317. }
  318. func TestAccessTokenPassthrough(t *testing.T) {
  319. tests := []struct {
  320. name string
  321. accessTokenPassthrough bool
  322. token string
  323. }{
  324. {
  325. name: "no passthrough and no token",
  326. accessTokenPassthrough: false,
  327. token: "",
  328. },
  329. {
  330. name: "no passthrough and token",
  331. accessTokenPassthrough: false,
  332. token: "MyAccessToken",
  333. },
  334. {
  335. name: "passthrough and no token",
  336. accessTokenPassthrough: true,
  337. token: "",
  338. },
  339. {
  340. name: "passthrough and token",
  341. accessTokenPassthrough: true,
  342. token: "MyAccessToken",
  343. },
  344. }
  345. for _, tt := range tests {
  346. t.Run(, func(t *testing.T) {
  347. config := &Config{
  348. HTTPServerSettings: confighttp.HTTPServerSettings{
  349. Endpoint: defaultEndpoint,
  350. },
  351. AccessTokenPassthroughConfig: splunk.AccessTokenPassthroughConfig{
  352. AccessTokenPassthrough: tt.accessTokenPassthrough,
  353. },
  354. }
  355. sapm := &splunksapm.PostSpansRequest{
  356. Batches: []*model.Batch{grpcFixture(time.Now().UTC())},
  357. }
  358. sink := new(consumertest.TracesSink)
  359. sr := setupReceiver(t, config, sink)
  360. defer func() {
  361. require.NoError(t, sr.Shutdown(context.Background()))
  362. }()
  363. var resp *http.Response
  364. resp, err := sendSapm(config.Endpoint, sapm, "gzip", false, tt.token)
  365. require.NoErrorf(t, err, "should not have failed when sending sapm %v", err)
  366. assert.Equal(t, 200, resp.StatusCode)
  367. got := sink.AllTraces()
  368. assert.Equal(t, 1, len(got))
  369. received := got[0].ResourceSpans()
  370. for i := 0; i < received.Len(); i++ {
  371. rspan := received.At(i)
  372. attrs := rspan.Resource().Attributes()
  373. amap, contains := attrs.Get("com.splunk.signalfx.access_token")
  374. if tt.accessTokenPassthrough && tt.token != "" {
  375. assert.Equal(t, tt.token, amap.Str())
  376. } else {
  377. assert.False(t, contains)
  378. }
  379. }
  380. })
  381. }
  382. }
  383. // assertNoErrorHost implements a component.Host that asserts that there were no errors.
  384. type assertNoErrorHost struct {
  385. component.Host
  386. *testing.T
  387. }
  388. // newAssertNoErrorHost returns a new instance of assertNoErrorHost.
  389. func newAssertNoErrorHost(t *testing.T) component.Host {
  390. return &assertNoErrorHost{
  391. Host: componenttest.NewNopHost(),
  392. T: t,
  393. }
  394. }
  395. func (aneh *assertNoErrorHost) ReportFatalError(err error) {
  396. assert.NoError(aneh, err)
  397. }