trace_receiver_test.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package sapmreceiver
  4. import (
  5. "bytes"
  6. "compress/gzip"
  7. "context"
  8. "encoding/binary"
  9. "fmt"
  10. "net/http"
  11. "testing"
  12. "time"
  13. "github.com/jaegertracing/jaeger/model"
  14. "github.com/klauspost/compress/zstd"
  15. splunksapm "github.com/signalfx/sapm-proto/gen"
  16. "github.com/signalfx/sapm-proto/sapmprotocol"
  17. "github.com/stretchr/testify/assert"
  18. "github.com/stretchr/testify/require"
  19. "go.opentelemetry.io/collector/component"
  20. "go.opentelemetry.io/collector/component/componenttest"
  21. "go.opentelemetry.io/collector/config/confighttp"
  22. "go.opentelemetry.io/collector/config/configtls"
  23. "go.opentelemetry.io/collector/consumer/consumertest"
  24. "go.opentelemetry.io/collector/pdata/pcommon"
  25. "go.opentelemetry.io/collector/pdata/ptrace"
  26. "go.opentelemetry.io/collector/receiver"
  27. "go.opentelemetry.io/collector/receiver/receivertest"
  28. conventions "go.opentelemetry.io/collector/semconv/v1.6.1"
  29. "github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil"
  30. "github.com/open-telemetry/opentelemetry-collector-contrib/internal/splunk"
  31. )
  32. func expectedTraceData(t1, t2, t3 time.Time) ptrace.Traces {
  33. traceID := pcommon.TraceID(
  34. [16]byte{0xF1, 0xF2, 0xF3, 0xF4, 0xF5, 0xF6, 0xF7, 0xF8, 0xF9, 0xFA, 0xFB, 0xFC, 0xFD, 0xFE, 0xFF, 0x80})
  35. parentSpanID := pcommon.SpanID([8]byte{0x1F, 0x1E, 0x1D, 0x1C, 0x1B, 0x1A, 0x19, 0x18})
  36. childSpanID := pcommon.SpanID([8]byte{0xAF, 0xAE, 0xAD, 0xAC, 0xAB, 0xAA, 0xA9, 0xA8})
  37. traces := ptrace.NewTraces()
  38. rs := traces.ResourceSpans().AppendEmpty()
  39. rs.Resource().Attributes().PutStr(conventions.AttributeServiceName, "issaTest")
  40. rs.Resource().Attributes().PutBool("bool", true)
  41. rs.Resource().Attributes().PutStr("string", "yes")
  42. rs.Resource().Attributes().PutInt("int64", 10000000)
  43. spans := rs.ScopeSpans().AppendEmpty().Spans()
  44. span0 := spans.AppendEmpty()
  45. span0.SetSpanID(childSpanID)
  46. span0.SetParentSpanID(parentSpanID)
  47. span0.SetTraceID(traceID)
  48. span0.SetName("DBSearch")
  49. span0.SetStartTimestamp(pcommon.NewTimestampFromTime(t1))
  50. span0.SetEndTimestamp(pcommon.NewTimestampFromTime(t2))
  51. span0.Status().SetCode(ptrace.StatusCodeError)
  52. span0.Status().SetMessage("Stale indices")
  53. span1 := spans.AppendEmpty()
  54. span1.SetSpanID(parentSpanID)
  55. span1.SetTraceID(traceID)
  56. span1.SetName("ProxyFetch")
  57. span1.SetStartTimestamp(pcommon.NewTimestampFromTime(t2))
  58. span1.SetEndTimestamp(pcommon.NewTimestampFromTime(t3))
  59. span1.Status().SetCode(ptrace.StatusCodeError)
  60. span1.Status().SetMessage("Frontend crash")
  61. return traces
  62. }
  63. func grpcFixture(t1 time.Time) *model.Batch {
  64. traceID := model.TraceID{}
  65. _ = traceID.Unmarshal([]byte{0xF1, 0xF2, 0xF3, 0xF4, 0xF5, 0xF6, 0xF7, 0xF8, 0xF9, 0xFA, 0xFB, 0xFC, 0xFD, 0xFE, 0xFF, 0x80})
  66. parentSpanID := model.NewSpanID(binary.BigEndian.Uint64([]byte{0x1F, 0x1E, 0x1D, 0x1C, 0x1B, 0x1A, 0x19, 0x18}))
  67. childSpanID := model.NewSpanID(binary.BigEndian.Uint64([]byte{0xAF, 0xAE, 0xAD, 0xAC, 0xAB, 0xAA, 0xA9, 0xA8}))
  68. return &model.Batch{
  69. Process: &model.Process{
  70. ServiceName: "issaTest",
  71. Tags: []model.KeyValue{
  72. model.Bool("bool", true),
  73. model.String("string", "yes"),
  74. model.Int64("int64", 1e7),
  75. },
  76. },
  77. Spans: []*model.Span{
  78. {
  79. TraceID: traceID,
  80. SpanID: childSpanID,
  81. OperationName: "DBSearch",
  82. StartTime: t1,
  83. Duration: 10 * time.Minute,
  84. Tags: []model.KeyValue{
  85. model.String(conventions.OtelStatusDescription, "Stale indices"),
  86. model.String(conventions.OtelStatusCode, "ERROR"),
  87. model.Bool("error", true),
  88. },
  89. References: []model.SpanRef{
  90. {
  91. TraceID: traceID,
  92. SpanID: parentSpanID,
  93. RefType: model.SpanRefType_CHILD_OF,
  94. },
  95. },
  96. },
  97. {
  98. TraceID: traceID,
  99. SpanID: parentSpanID,
  100. OperationName: "ProxyFetch",
  101. StartTime: t1.Add(10 * time.Minute),
  102. Duration: 2 * time.Second,
  103. Tags: []model.KeyValue{
  104. model.String(conventions.OtelStatusDescription, "Frontend crash"),
  105. model.String(conventions.OtelStatusCode, "ERROR"),
  106. model.Bool("error", true),
  107. },
  108. },
  109. },
  110. }
  111. }
  112. // sendSapm acts as a client for sending sapm to the receiver. This could be replaced with a sapm exporter in the future.
  113. func sendSapm(
  114. endpoint string,
  115. sapm *splunksapm.PostSpansRequest,
  116. compression string,
  117. tlsEnabled bool,
  118. token string,
  119. ) (*http.Response, error) {
  120. // marshal the sapm
  121. reqBytes, err := sapm.Marshal()
  122. if err != nil {
  123. return nil, fmt.Errorf("failed to marshal sapm %w", err)
  124. }
  125. switch compression {
  126. case "gzip":
  127. reqBytes, err = compressGzip(reqBytes)
  128. if err != nil {
  129. return nil, err
  130. }
  131. case "zstd":
  132. reqBytes, err = compressZstd(reqBytes)
  133. if err != nil {
  134. return nil, err
  135. }
  136. case "":
  137. // no compression
  138. default:
  139. return nil, fmt.Errorf("unknown compression %q", compression)
  140. }
  141. // build the request
  142. url := fmt.Sprintf("http://%s%s", endpoint, sapmprotocol.TraceEndpointV2)
  143. if tlsEnabled {
  144. url = fmt.Sprintf("https://%s%s", endpoint, sapmprotocol.TraceEndpointV2)
  145. }
  146. req, _ := http.NewRequest(http.MethodPost, url, bytes.NewReader(reqBytes))
  147. req.Header.Set(sapmprotocol.ContentTypeHeaderName, sapmprotocol.ContentTypeHeaderValue)
  148. // set headers for gzip
  149. if compression != "" {
  150. req.Header.Set(sapmprotocol.ContentEncodingHeaderName, compression)
  151. req.Header.Set(sapmprotocol.AcceptEncodingHeaderName, sapmprotocol.GZipEncodingHeaderValue)
  152. }
  153. if token != "" {
  154. req.Header.Set("x-sf-token", token)
  155. }
  156. // send the request
  157. client := &http.Client{}
  158. if tlsEnabled {
  159. tlscs := configtls.TLSClientSetting{
  160. TLSSetting: configtls.TLSSetting{
  161. CAFile: "./testdata/ca.crt",
  162. CertFile: "./testdata/client.crt",
  163. KeyFile: "./testdata/client.key",
  164. },
  165. ServerName: "localhost",
  166. }
  167. tls, errTLS := tlscs.LoadTLSConfig()
  168. if errTLS != nil {
  169. return nil, fmt.Errorf("failed to send request to receiver %w", err)
  170. }
  171. client.Transport = &http.Transport{
  172. TLSClientConfig: tls,
  173. }
  174. }
  175. resp, err := client.Do(req)
  176. if err != nil {
  177. return resp, fmt.Errorf("failed to send request to receiver %w", err)
  178. }
  179. return resp, nil
  180. }
  181. func compressGzip(reqBytes []byte) ([]byte, error) {
  182. // create a gzip writer
  183. var buff bytes.Buffer
  184. writer := gzip.NewWriter(&buff)
  185. // run the request bytes through the gzip writer
  186. _, err := writer.Write(reqBytes)
  187. if err != nil {
  188. return nil, fmt.Errorf("failed to write gzip sapm %w", err)
  189. }
  190. // close the writer
  191. err = writer.Close()
  192. if err != nil {
  193. return nil, fmt.Errorf("failed to close the gzip writer %w", err)
  194. }
  195. return buff.Bytes(), nil
  196. }
  197. func compressZstd(reqBytes []byte) ([]byte, error) {
  198. // create a gzip writer
  199. var buff bytes.Buffer
  200. writer, err := zstd.NewWriter(&buff)
  201. if err != nil {
  202. return nil, fmt.Errorf("failed to write zstd sapm %w", err)
  203. }
  204. // run the request bytes through the gzip writer
  205. _, err = writer.Write(reqBytes)
  206. if err != nil {
  207. return nil, fmt.Errorf("failed to write zstd sapm %w", err)
  208. }
  209. // close the writer
  210. err = writer.Close()
  211. if err != nil {
  212. return nil, fmt.Errorf("failed to close the zstd writer %w", err)
  213. }
  214. return buff.Bytes(), nil
  215. }
  216. func setupReceiver(t *testing.T, config *Config, sink *consumertest.TracesSink) receiver.Traces {
  217. params := receivertest.NewNopCreateSettings()
  218. sr, err := newReceiver(params, config, sink)
  219. assert.NoError(t, err, "should not have failed to create the SAPM receiver")
  220. t.Log("Starting")
  221. mh := newAssertNoErrorHost(t)
  222. require.NoError(t, sr.Start(context.Background(), mh), "should not have failed to start trace reception")
  223. require.NoError(t, sr.Start(context.Background(), mh), "should not fail to start log on second Start call")
  224. // If there are errors reported through host.ReportFatalError() this will retrieve it.
  225. <-time.After(500 * time.Millisecond)
  226. t.Log("Trace Reception Started")
  227. return sr
  228. }
  229. func TestReception(t *testing.T) {
  230. now := time.Unix(1542158650, 536343000).UTC()
  231. nowPlus10min := now.Add(10 * time.Minute)
  232. nowPlus10min2sec := now.Add(10 * time.Minute).Add(2 * time.Second)
  233. tlsAddress := testutil.GetAvailableLocalAddress(t)
  234. type args struct {
  235. config *Config
  236. sapm *splunksapm.PostSpansRequest
  237. compression string
  238. useTLS bool
  239. }
  240. tests := []struct {
  241. name string
  242. args args
  243. want ptrace.Traces
  244. }{
  245. {
  246. name: "receive uncompressed sapm",
  247. args: args{
  248. // 1. Create the SAPM receiver aka "server"
  249. config: &Config{
  250. HTTPServerSettings: confighttp.HTTPServerSettings{
  251. Endpoint: defaultEndpoint,
  252. },
  253. },
  254. sapm: &splunksapm.PostSpansRequest{Batches: []*model.Batch{grpcFixture(now)}},
  255. compression: "",
  256. useTLS: false,
  257. },
  258. want: expectedTraceData(now, nowPlus10min, nowPlus10min2sec),
  259. },
  260. {
  261. name: "receive compressed sapm",
  262. args: args{
  263. config: &Config{
  264. HTTPServerSettings: confighttp.HTTPServerSettings{
  265. Endpoint: defaultEndpoint,
  266. },
  267. },
  268. sapm: &splunksapm.PostSpansRequest{Batches: []*model.Batch{grpcFixture(now)}},
  269. compression: "gzip",
  270. useTLS: false,
  271. },
  272. want: expectedTraceData(now, nowPlus10min, nowPlus10min2sec),
  273. },
  274. {
  275. name: "connect via TLS zstd compressed sapm",
  276. args: args{
  277. config: &Config{
  278. HTTPServerSettings: confighttp.HTTPServerSettings{
  279. Endpoint: tlsAddress,
  280. TLSSetting: &configtls.TLSServerSetting{
  281. TLSSetting: configtls.TLSSetting{
  282. CAFile: "./testdata/ca.crt",
  283. CertFile: "./testdata/server.crt",
  284. KeyFile: "./testdata/server.key",
  285. },
  286. },
  287. },
  288. },
  289. sapm: &splunksapm.PostSpansRequest{Batches: []*model.Batch{grpcFixture(now)}},
  290. compression: "zstd",
  291. useTLS: true,
  292. },
  293. want: expectedTraceData(now, nowPlus10min, nowPlus10min2sec),
  294. },
  295. }
  296. for _, tt := range tests {
  297. t.Run(tt.name, func(t *testing.T) {
  298. sink := new(consumertest.TracesSink)
  299. sr := setupReceiver(t, tt.args.config, sink)
  300. defer func() {
  301. require.NoError(t, sr.Shutdown(context.Background()))
  302. }()
  303. t.Log("Sending Sapm Request")
  304. var resp *http.Response
  305. resp, err := sendSapm(tt.args.config.Endpoint, tt.args.sapm, tt.args.compression, tt.args.useTLS, "")
  306. require.NoError(t, err)
  307. assert.Equal(t, 200, resp.StatusCode)
  308. t.Log("SAPM Request Received")
  309. // retrieve received traces
  310. got := sink.AllTraces()
  311. assert.Equal(t, 1, len(got))
  312. // compare what we got to what we wanted
  313. t.Log("Comparing expected data to trace data")
  314. assert.EqualValues(t, tt.want, got[0])
  315. })
  316. }
  317. }
  318. func TestAccessTokenPassthrough(t *testing.T) {
  319. tests := []struct {
  320. name string
  321. accessTokenPassthrough bool
  322. token string
  323. }{
  324. {
  325. name: "no passthrough and no token",
  326. accessTokenPassthrough: false,
  327. token: "",
  328. },
  329. {
  330. name: "no passthrough and token",
  331. accessTokenPassthrough: false,
  332. token: "MyAccessToken",
  333. },
  334. {
  335. name: "passthrough and no token",
  336. accessTokenPassthrough: true,
  337. token: "",
  338. },
  339. {
  340. name: "passthrough and token",
  341. accessTokenPassthrough: true,
  342. token: "MyAccessToken",
  343. },
  344. }
  345. for _, tt := range tests {
  346. t.Run(tt.name, func(t *testing.T) {
  347. config := &Config{
  348. HTTPServerSettings: confighttp.HTTPServerSettings{
  349. Endpoint: defaultEndpoint,
  350. },
  351. AccessTokenPassthroughConfig: splunk.AccessTokenPassthroughConfig{
  352. AccessTokenPassthrough: tt.accessTokenPassthrough,
  353. },
  354. }
  355. sapm := &splunksapm.PostSpansRequest{
  356. Batches: []*model.Batch{grpcFixture(time.Now().UTC())},
  357. }
  358. sink := new(consumertest.TracesSink)
  359. sr := setupReceiver(t, config, sink)
  360. defer func() {
  361. require.NoError(t, sr.Shutdown(context.Background()))
  362. }()
  363. var resp *http.Response
  364. resp, err := sendSapm(config.Endpoint, sapm, "gzip", false, tt.token)
  365. require.NoErrorf(t, err, "should not have failed when sending sapm %v", err)
  366. assert.Equal(t, 200, resp.StatusCode)
  367. got := sink.AllTraces()
  368. assert.Equal(t, 1, len(got))
  369. received := got[0].ResourceSpans()
  370. for i := 0; i < received.Len(); i++ {
  371. rspan := received.At(i)
  372. attrs := rspan.Resource().Attributes()
  373. amap, contains := attrs.Get("com.splunk.signalfx.access_token")
  374. if tt.accessTokenPassthrough && tt.token != "" {
  375. assert.Equal(t, tt.token, amap.Str())
  376. } else {
  377. assert.False(t, contains)
  378. }
  379. }
  380. })
  381. }
  382. }
  383. // assertNoErrorHost implements a component.Host that asserts that there were no errors.
  384. type assertNoErrorHost struct {
  385. component.Host
  386. *testing.T
  387. }
  388. // newAssertNoErrorHost returns a new instance of assertNoErrorHost.
  389. func newAssertNoErrorHost(t *testing.T) component.Host {
  390. return &assertNoErrorHost{
  391. Host: componenttest.NewNopHost(),
  392. T: t,
  393. }
  394. }
  395. func (aneh *assertNoErrorHost) ReportFatalError(err error) {
  396. assert.NoError(aneh, err)
  397. }