123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443 |
- // Copyright The OpenTelemetry Authors
- // SPDX-License-Identifier: Apache-2.0
- package sapmreceiver
- import (
- "bytes"
- "compress/gzip"
- "context"
- "encoding/binary"
- "fmt"
- "net/http"
- "testing"
- "time"
- "github.com/jaegertracing/jaeger/model"
- "github.com/klauspost/compress/zstd"
- splunksapm "github.com/signalfx/sapm-proto/gen"
- "github.com/signalfx/sapm-proto/sapmprotocol"
- "github.com/stretchr/testify/assert"
- "github.com/stretchr/testify/require"
- "go.opentelemetry.io/collector/component"
- "go.opentelemetry.io/collector/component/componenttest"
- "go.opentelemetry.io/collector/config/confighttp"
- "go.opentelemetry.io/collector/config/configtls"
- "go.opentelemetry.io/collector/consumer/consumertest"
- "go.opentelemetry.io/collector/pdata/pcommon"
- "go.opentelemetry.io/collector/pdata/ptrace"
- "go.opentelemetry.io/collector/receiver"
- "go.opentelemetry.io/collector/receiver/receivertest"
- conventions "go.opentelemetry.io/collector/semconv/v1.6.1"
- "github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil"
- "github.com/open-telemetry/opentelemetry-collector-contrib/internal/splunk"
- )
- func expectedTraceData(t1, t2, t3 time.Time) ptrace.Traces {
- traceID := pcommon.TraceID(
- [16]byte{0xF1, 0xF2, 0xF3, 0xF4, 0xF5, 0xF6, 0xF7, 0xF8, 0xF9, 0xFA, 0xFB, 0xFC, 0xFD, 0xFE, 0xFF, 0x80})
- parentSpanID := pcommon.SpanID([8]byte{0x1F, 0x1E, 0x1D, 0x1C, 0x1B, 0x1A, 0x19, 0x18})
- childSpanID := pcommon.SpanID([8]byte{0xAF, 0xAE, 0xAD, 0xAC, 0xAB, 0xAA, 0xA9, 0xA8})
- traces := ptrace.NewTraces()
- rs := traces.ResourceSpans().AppendEmpty()
- rs.Resource().Attributes().PutStr(conventions.AttributeServiceName, "issaTest")
- rs.Resource().Attributes().PutBool("bool", true)
- rs.Resource().Attributes().PutStr("string", "yes")
- rs.Resource().Attributes().PutInt("int64", 10000000)
- spans := rs.ScopeSpans().AppendEmpty().Spans()
- span0 := spans.AppendEmpty()
- span0.SetSpanID(childSpanID)
- span0.SetParentSpanID(parentSpanID)
- span0.SetTraceID(traceID)
- span0.SetName("DBSearch")
- span0.SetStartTimestamp(pcommon.NewTimestampFromTime(t1))
- span0.SetEndTimestamp(pcommon.NewTimestampFromTime(t2))
- span0.Status().SetCode(ptrace.StatusCodeError)
- span0.Status().SetMessage("Stale indices")
- span1 := spans.AppendEmpty()
- span1.SetSpanID(parentSpanID)
- span1.SetTraceID(traceID)
- span1.SetName("ProxyFetch")
- span1.SetStartTimestamp(pcommon.NewTimestampFromTime(t2))
- span1.SetEndTimestamp(pcommon.NewTimestampFromTime(t3))
- span1.Status().SetCode(ptrace.StatusCodeError)
- span1.Status().SetMessage("Frontend crash")
- return traces
- }
- func grpcFixture(t1 time.Time) *model.Batch {
- traceID := model.TraceID{}
- _ = traceID.Unmarshal([]byte{0xF1, 0xF2, 0xF3, 0xF4, 0xF5, 0xF6, 0xF7, 0xF8, 0xF9, 0xFA, 0xFB, 0xFC, 0xFD, 0xFE, 0xFF, 0x80})
- parentSpanID := model.NewSpanID(binary.BigEndian.Uint64([]byte{0x1F, 0x1E, 0x1D, 0x1C, 0x1B, 0x1A, 0x19, 0x18}))
- childSpanID := model.NewSpanID(binary.BigEndian.Uint64([]byte{0xAF, 0xAE, 0xAD, 0xAC, 0xAB, 0xAA, 0xA9, 0xA8}))
- return &model.Batch{
- Process: &model.Process{
- ServiceName: "issaTest",
- Tags: []model.KeyValue{
- model.Bool("bool", true),
- model.String("string", "yes"),
- model.Int64("int64", 1e7),
- },
- },
- Spans: []*model.Span{
- {
- TraceID: traceID,
- SpanID: childSpanID,
- OperationName: "DBSearch",
- StartTime: t1,
- Duration: 10 * time.Minute,
- Tags: []model.KeyValue{
- model.String(conventions.OtelStatusDescription, "Stale indices"),
- model.String(conventions.OtelStatusCode, "ERROR"),
- model.Bool("error", true),
- },
- References: []model.SpanRef{
- {
- TraceID: traceID,
- SpanID: parentSpanID,
- RefType: model.SpanRefType_CHILD_OF,
- },
- },
- },
- {
- TraceID: traceID,
- SpanID: parentSpanID,
- OperationName: "ProxyFetch",
- StartTime: t1.Add(10 * time.Minute),
- Duration: 2 * time.Second,
- Tags: []model.KeyValue{
- model.String(conventions.OtelStatusDescription, "Frontend crash"),
- model.String(conventions.OtelStatusCode, "ERROR"),
- model.Bool("error", true),
- },
- },
- },
- }
- }
- // sendSapm acts as a client for sending sapm to the receiver. This could be replaced with a sapm exporter in the future.
- func sendSapm(
- endpoint string,
- sapm *splunksapm.PostSpansRequest,
- compression string,
- tlsEnabled bool,
- token string,
- ) (*http.Response, error) {
- // marshal the sapm
- reqBytes, err := sapm.Marshal()
- if err != nil {
- return nil, fmt.Errorf("failed to marshal sapm %w", err)
- }
- switch compression {
- case "gzip":
- reqBytes, err = compressGzip(reqBytes)
- if err != nil {
- return nil, err
- }
- case "zstd":
- reqBytes, err = compressZstd(reqBytes)
- if err != nil {
- return nil, err
- }
- case "":
- // no compression
- default:
- return nil, fmt.Errorf("unknown compression %q", compression)
- }
- // build the request
- url := fmt.Sprintf("http://%s%s", endpoint, sapmprotocol.TraceEndpointV2)
- if tlsEnabled {
- url = fmt.Sprintf("https://%s%s", endpoint, sapmprotocol.TraceEndpointV2)
- }
- req, _ := http.NewRequest(http.MethodPost, url, bytes.NewReader(reqBytes))
- req.Header.Set(sapmprotocol.ContentTypeHeaderName, sapmprotocol.ContentTypeHeaderValue)
- // set headers for gzip
- if compression != "" {
- req.Header.Set(sapmprotocol.ContentEncodingHeaderName, compression)
- req.Header.Set(sapmprotocol.AcceptEncodingHeaderName, sapmprotocol.GZipEncodingHeaderValue)
- }
- if token != "" {
- req.Header.Set("x-sf-token", token)
- }
- // send the request
- client := &http.Client{}
- if tlsEnabled {
- tlscs := configtls.TLSClientSetting{
- TLSSetting: configtls.TLSSetting{
- CAFile: "./testdata/ca.crt",
- CertFile: "./testdata/client.crt",
- KeyFile: "./testdata/client.key",
- },
- ServerName: "localhost",
- }
- tls, errTLS := tlscs.LoadTLSConfig()
- if errTLS != nil {
- return nil, fmt.Errorf("failed to send request to receiver %w", err)
- }
- client.Transport = &http.Transport{
- TLSClientConfig: tls,
- }
- }
- resp, err := client.Do(req)
- if err != nil {
- return resp, fmt.Errorf("failed to send request to receiver %w", err)
- }
- return resp, nil
- }
// compressGzip returns reqBytes encoded as a complete gzip stream.
//
// An error is returned if writing to or closing the gzip writer fails.
func compressGzip(reqBytes []byte) ([]byte, error) {
	compressed := new(bytes.Buffer)
	zw := gzip.NewWriter(compressed)

	// Feed the payload through the compressor.
	if _, err := zw.Write(reqBytes); err != nil {
		return nil, fmt.Errorf("failed to write gzip sapm %w", err)
	}

	// The writer must be closed before the buffer is read so that all
	// pending output reaches it.
	if err := zw.Close(); err != nil {
		return nil, fmt.Errorf("failed to close the gzip writer %w", err)
	}

	return compressed.Bytes(), nil
}
- func compressZstd(reqBytes []byte) ([]byte, error) {
- // create a gzip writer
- var buff bytes.Buffer
- writer, err := zstd.NewWriter(&buff)
- if err != nil {
- return nil, fmt.Errorf("failed to write zstd sapm %w", err)
- }
- // run the request bytes through the gzip writer
- _, err = writer.Write(reqBytes)
- if err != nil {
- return nil, fmt.Errorf("failed to write zstd sapm %w", err)
- }
- // close the writer
- err = writer.Close()
- if err != nil {
- return nil, fmt.Errorf("failed to close the zstd writer %w", err)
- }
- return buff.Bytes(), nil
- }
- func setupReceiver(t *testing.T, config *Config, sink *consumertest.TracesSink) receiver.Traces {
- params := receivertest.NewNopCreateSettings()
- sr, err := newReceiver(params, config, sink)
- assert.NoError(t, err, "should not have failed to create the SAPM receiver")
- t.Log("Starting")
- mh := newAssertNoErrorHost(t)
- require.NoError(t, sr.Start(context.Background(), mh), "should not have failed to start trace reception")
- require.NoError(t, sr.Start(context.Background(), mh), "should not fail to start log on second Start call")
- // If there are errors reported through host.ReportFatalError() this will retrieve it.
- <-time.After(500 * time.Millisecond)
- t.Log("Trace Reception Started")
- return sr
- }
- func TestReception(t *testing.T) {
- now := time.Unix(1542158650, 536343000).UTC()
- nowPlus10min := now.Add(10 * time.Minute)
- nowPlus10min2sec := now.Add(10 * time.Minute).Add(2 * time.Second)
- tlsAddress := testutil.GetAvailableLocalAddress(t)
- type args struct {
- config *Config
- sapm *splunksapm.PostSpansRequest
- compression string
- useTLS bool
- }
- tests := []struct {
- name string
- args args
- want ptrace.Traces
- }{
- {
- name: "receive uncompressed sapm",
- args: args{
- // 1. Create the SAPM receiver aka "server"
- config: &Config{
- HTTPServerSettings: confighttp.HTTPServerSettings{
- Endpoint: defaultEndpoint,
- },
- },
- sapm: &splunksapm.PostSpansRequest{Batches: []*model.Batch{grpcFixture(now)}},
- compression: "",
- useTLS: false,
- },
- want: expectedTraceData(now, nowPlus10min, nowPlus10min2sec),
- },
- {
- name: "receive compressed sapm",
- args: args{
- config: &Config{
- HTTPServerSettings: confighttp.HTTPServerSettings{
- Endpoint: defaultEndpoint,
- },
- },
- sapm: &splunksapm.PostSpansRequest{Batches: []*model.Batch{grpcFixture(now)}},
- compression: "gzip",
- useTLS: false,
- },
- want: expectedTraceData(now, nowPlus10min, nowPlus10min2sec),
- },
- {
- name: "connect via TLS zstd compressed sapm",
- args: args{
- config: &Config{
- HTTPServerSettings: confighttp.HTTPServerSettings{
- Endpoint: tlsAddress,
- TLSSetting: &configtls.TLSServerSetting{
- TLSSetting: configtls.TLSSetting{
- CAFile: "./testdata/ca.crt",
- CertFile: "./testdata/server.crt",
- KeyFile: "./testdata/server.key",
- },
- },
- },
- },
- sapm: &splunksapm.PostSpansRequest{Batches: []*model.Batch{grpcFixture(now)}},
- compression: "zstd",
- useTLS: true,
- },
- want: expectedTraceData(now, nowPlus10min, nowPlus10min2sec),
- },
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- sink := new(consumertest.TracesSink)
- sr := setupReceiver(t, tt.args.config, sink)
- defer func() {
- require.NoError(t, sr.Shutdown(context.Background()))
- }()
- t.Log("Sending Sapm Request")
- var resp *http.Response
- resp, err := sendSapm(tt.args.config.Endpoint, tt.args.sapm, tt.args.compression, tt.args.useTLS, "")
- require.NoError(t, err)
- assert.Equal(t, 200, resp.StatusCode)
- t.Log("SAPM Request Received")
- // retrieve received traces
- got := sink.AllTraces()
- assert.Equal(t, 1, len(got))
- // compare what we got to what we wanted
- t.Log("Comparing expected data to trace data")
- assert.EqualValues(t, tt.want, got[0])
- })
- }
- }
- func TestAccessTokenPassthrough(t *testing.T) {
- tests := []struct {
- name string
- accessTokenPassthrough bool
- token string
- }{
- {
- name: "no passthrough and no token",
- accessTokenPassthrough: false,
- token: "",
- },
- {
- name: "no passthrough and token",
- accessTokenPassthrough: false,
- token: "MyAccessToken",
- },
- {
- name: "passthrough and no token",
- accessTokenPassthrough: true,
- token: "",
- },
- {
- name: "passthrough and token",
- accessTokenPassthrough: true,
- token: "MyAccessToken",
- },
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- config := &Config{
- HTTPServerSettings: confighttp.HTTPServerSettings{
- Endpoint: defaultEndpoint,
- },
- AccessTokenPassthroughConfig: splunk.AccessTokenPassthroughConfig{
- AccessTokenPassthrough: tt.accessTokenPassthrough,
- },
- }
- sapm := &splunksapm.PostSpansRequest{
- Batches: []*model.Batch{grpcFixture(time.Now().UTC())},
- }
- sink := new(consumertest.TracesSink)
- sr := setupReceiver(t, config, sink)
- defer func() {
- require.NoError(t, sr.Shutdown(context.Background()))
- }()
- var resp *http.Response
- resp, err := sendSapm(config.Endpoint, sapm, "gzip", false, tt.token)
- require.NoErrorf(t, err, "should not have failed when sending sapm %v", err)
- assert.Equal(t, 200, resp.StatusCode)
- got := sink.AllTraces()
- assert.Equal(t, 1, len(got))
- received := got[0].ResourceSpans()
- for i := 0; i < received.Len(); i++ {
- rspan := received.At(i)
- attrs := rspan.Resource().Attributes()
- amap, contains := attrs.Get("com.splunk.signalfx.access_token")
- if tt.accessTokenPassthrough && tt.token != "" {
- assert.Equal(t, tt.token, amap.Str())
- } else {
- assert.False(t, contains)
- }
- }
- })
- }
- }
- // assertNoErrorHost implements a component.Host that asserts that there were no errors.
- type assertNoErrorHost struct {
- component.Host
- *testing.T
- }
- // newAssertNoErrorHost returns a new instance of assertNoErrorHost.
- func newAssertNoErrorHost(t *testing.T) component.Host {
- return &assertNoErrorHost{
- Host: componenttest.NewNopHost(),
- T: t,
- }
- }
- func (aneh *assertNoErrorHost) ReportFatalError(err error) {
- assert.NoError(aneh, err)
- }
|