jaeger_agent_test.go 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package jaegerreceiver
  4. import (
  5. "context"
  6. "fmt"
  7. "net"
  8. "net/http"
  9. "testing"
  10. "time"
  11. "github.com/apache/thrift/lib/go/thrift"
  12. "github.com/jaegertracing/jaeger/cmd/agent/app/servers/thriftudp"
  13. "github.com/jaegertracing/jaeger/model"
  14. jaegerconvert "github.com/jaegertracing/jaeger/model/converter/thrift/jaeger"
  15. "github.com/jaegertracing/jaeger/proto-gen/api_v2"
  16. "github.com/jaegertracing/jaeger/thrift-gen/agent"
  17. jaegerthrift "github.com/jaegertracing/jaeger/thrift-gen/jaeger"
  18. "github.com/stretchr/testify/assert"
  19. "github.com/stretchr/testify/require"
  20. "go.opentelemetry.io/collector/component"
  21. "go.opentelemetry.io/collector/component/componenttest"
  22. "go.opentelemetry.io/collector/consumer/consumertest"
  23. "go.opentelemetry.io/collector/pdata/ptrace"
  24. "go.opentelemetry.io/collector/receiver/receivertest"
  25. conventions "go.opentelemetry.io/collector/semconv/v1.6.1"
  26. "google.golang.org/grpc"
  27. "github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil"
  28. "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger"
  29. "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/jaegerreceiver/internal/metadata"
  30. )
  31. var jaegerAgent = component.NewIDWithName(metadata.Type, "agent_test")
  32. func TestJaegerAgentUDP_ThriftCompact(t *testing.T) {
  33. addr := testutil.GetAvailableLocalAddress(t)
  34. testJaegerAgent(t, addr, &configuration{
  35. AgentCompactThrift: ProtocolUDP{
  36. Endpoint: addr,
  37. ServerConfigUDP: defaultServerConfigUDP(),
  38. },
  39. })
  40. }
  41. func TestJaegerAgentUDP_ThriftCompact_InvalidPort(t *testing.T) {
  42. config := &configuration{
  43. AgentCompactThrift: ProtocolUDP{
  44. Endpoint: "0.0.0.0:999999",
  45. ServerConfigUDP: defaultServerConfigUDP(),
  46. },
  47. }
  48. set := receivertest.NewNopCreateSettings()
  49. jr, err := newJaegerReceiver(jaegerAgent, config, nil, set)
  50. require.NoError(t, err)
  51. assert.Error(t, jr.Start(context.Background(), componenttest.NewNopHost()), "should not have been able to startTraceReception")
  52. require.NoError(t, jr.Shutdown(context.Background()))
  53. }
  54. func TestJaegerAgentUDP_ThriftBinary(t *testing.T) {
  55. addr := testutil.GetAvailableLocalAddress(t)
  56. testJaegerAgent(t, addr, &configuration{
  57. AgentBinaryThrift: ProtocolUDP{
  58. Endpoint: addr,
  59. ServerConfigUDP: defaultServerConfigUDP(),
  60. },
  61. })
  62. }
  63. func TestJaegerAgentUDP_ThriftBinary_PortInUse(t *testing.T) {
  64. // This test confirms that the thrift binary port is opened correctly. This is all we can test at the moment. See above.
  65. addr := testutil.GetAvailableLocalAddress(t)
  66. config := &configuration{
  67. AgentBinaryThrift: ProtocolUDP{
  68. Endpoint: addr,
  69. ServerConfigUDP: defaultServerConfigUDP(),
  70. },
  71. }
  72. set := receivertest.NewNopCreateSettings()
  73. jr, err := newJaegerReceiver(jaegerAgent, config, nil, set)
  74. require.NoError(t, err)
  75. assert.NoError(t, jr.startAgent(componenttest.NewNopHost()), "Start failed")
  76. t.Cleanup(func() { require.NoError(t, jr.Shutdown(context.Background())) })
  77. l, err := net.Listen("udp", addr)
  78. assert.Error(t, err, "should not have been able to listen to the port")
  79. if l != nil {
  80. l.Close()
  81. }
  82. }
  83. func TestJaegerAgentUDP_ThriftBinary_InvalidPort(t *testing.T) {
  84. config := &configuration{
  85. AgentBinaryThrift: ProtocolUDP{
  86. Endpoint: "0.0.0.0:999999",
  87. ServerConfigUDP: defaultServerConfigUDP(),
  88. },
  89. }
  90. set := receivertest.NewNopCreateSettings()
  91. jr, err := newJaegerReceiver(jaegerAgent, config, nil, set)
  92. require.NoError(t, err)
  93. assert.Error(t, jr.Start(context.Background(), componenttest.NewNopHost()), "should not have been able to startTraceReception")
  94. require.NoError(t, jr.Shutdown(context.Background()))
  95. }
  96. func initializeGRPCTestServer(t *testing.T, beforeServe func(server *grpc.Server), opts ...grpc.ServerOption) (*grpc.Server, net.Addr) {
  97. server := grpc.NewServer(opts...)
  98. lis, err := net.Listen("tcp", "localhost:0")
  99. require.NoError(t, err)
  100. beforeServe(server)
  101. go func() {
  102. err := server.Serve(lis)
  103. require.NoError(t, err)
  104. }()
  105. return server, lis.Addr()
  106. }
  107. type mockSamplingHandler struct {
  108. }
  109. func (*mockSamplingHandler) GetSamplingStrategy(context.Context, *api_v2.SamplingStrategyParameters) (*api_v2.SamplingStrategyResponse, error) {
  110. return &api_v2.SamplingStrategyResponse{StrategyType: api_v2.SamplingStrategyType_PROBABILISTIC}, nil
  111. }
  112. func TestJaegerHTTP(t *testing.T) {
  113. s, _ := initializeGRPCTestServer(t, func(s *grpc.Server) {
  114. api_v2.RegisterSamplingManagerServer(s, &mockSamplingHandler{})
  115. })
  116. defer s.GracefulStop()
  117. endpoint := testutil.GetAvailableLocalAddress(t)
  118. config := &configuration{
  119. AgentHTTPEndpoint: endpoint,
  120. }
  121. set := receivertest.NewNopCreateSettings()
  122. jr, err := newJaegerReceiver(jaegerAgent, config, nil, set)
  123. require.NoError(t, err)
  124. t.Cleanup(func() { require.NoError(t, jr.Shutdown(context.Background())) })
  125. assert.NoError(t, jr.Start(context.Background(), componenttest.NewNopHost()), "Start failed")
  126. // allow http server to start
  127. assert.Eventually(t, func() bool {
  128. var conn net.Conn
  129. conn, err = net.Dial("tcp", endpoint)
  130. if err == nil && conn != nil {
  131. conn.Close()
  132. return true
  133. }
  134. return false
  135. }, 10*time.Second, 5*time.Millisecond, "failed to wait for the port to be open")
  136. resp, err := http.Get(fmt.Sprintf("http://%s/sampling?service=test", endpoint))
  137. assert.NoError(t, err, "should not have failed to make request")
  138. if resp != nil {
  139. assert.Equal(t, 500, resp.StatusCode, "should have returned 200")
  140. return
  141. }
  142. t.Fail()
  143. }
  144. func testJaegerAgent(t *testing.T, agentEndpoint string, receiverConfig *configuration) {
  145. // 1. Create the Jaeger receiver aka "server"
  146. sink := new(consumertest.TracesSink)
  147. set := receivertest.NewNopCreateSettings()
  148. jr, err := newJaegerReceiver(jaegerAgent, receiverConfig, sink, set)
  149. require.NoError(t, err)
  150. t.Cleanup(func() { require.NoError(t, jr.Shutdown(context.Background())) })
  151. for i := 0; i < 3; i++ {
  152. err = jr.Start(context.Background(), componenttest.NewNopHost())
  153. if err == nil {
  154. break
  155. }
  156. time.Sleep(50 * time.Millisecond)
  157. }
  158. require.NoError(t, err, "Start failed")
  159. // 2. Then send spans to the Jaeger receiver.
  160. jexp, err := newClientUDP(agentEndpoint, jr.config.AgentBinaryThrift.Endpoint != "")
  161. require.NoError(t, err, "Failed to create the Jaeger OpenTelemetry exporter for the live application")
  162. // 3. Now finally send some spans
  163. td := generateTraceData()
  164. batches, err := jaeger.ProtoFromTraces(td)
  165. require.NoError(t, err)
  166. for _, batch := range batches {
  167. require.NoError(t, jexp.EmitBatch(context.Background(), modelToThrift(batch)))
  168. }
  169. require.Eventually(t, func() bool {
  170. return sink.SpanCount() > 0
  171. }, 10*time.Second, 5*time.Millisecond)
  172. gotTraces := sink.AllTraces()
  173. require.Equal(t, 1, len(gotTraces))
  174. assert.EqualValues(t, td, gotTraces[0])
  175. }
  176. func newClientUDP(hostPort string, binary bool) (*agent.AgentClient, error) {
  177. clientTransport, err := thriftudp.NewTUDPClientTransport(hostPort, "")
  178. if err != nil {
  179. return nil, err
  180. }
  181. var protocolFactory thrift.TProtocolFactory
  182. if binary {
  183. protocolFactory = thrift.NewTBinaryProtocolFactoryConf(nil)
  184. } else {
  185. protocolFactory = thrift.NewTCompactProtocolFactoryConf(nil)
  186. }
  187. return agent.NewAgentClientFactory(clientTransport, protocolFactory), nil
  188. }
  189. // Cannot use the testdata because timestamps are nanoseconds.
  190. func generateTraceData() ptrace.Traces {
  191. td := ptrace.NewTraces()
  192. rs := td.ResourceSpans().AppendEmpty()
  193. rs.Resource().Attributes().PutStr(conventions.AttributeServiceName, "test")
  194. span := rs.ScopeSpans().AppendEmpty().Spans().AppendEmpty()
  195. span.SetSpanID([8]byte{0, 1, 2, 3, 4, 5, 6, 7})
  196. span.SetTraceID([16]byte{0, 1, 2, 3, 4, 5, 6, 7, 7, 6, 5, 4, 3, 2, 1, 0})
  197. span.SetStartTimestamp(1581452772000000000)
  198. span.SetEndTimestamp(1581452773000000000)
  199. return td
  200. }
  201. func modelToThrift(batch *model.Batch) *jaegerthrift.Batch {
  202. return &jaegerthrift.Batch{
  203. Process: processModelToThrift(batch.Process),
  204. Spans: jaegerconvert.FromDomain(batch.Spans),
  205. }
  206. }
  207. func processModelToThrift(process *model.Process) *jaegerthrift.Process {
  208. if process == nil {
  209. return nil
  210. }
  211. return &jaegerthrift.Process{
  212. ServiceName: process.ServiceName,
  213. }
  214. }