trace_receiver.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package jaegerreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/jaegerreceiver"
  4. import (
  5. "context"
  6. "errors"
  7. "fmt"
  8. "html"
  9. "io"
  10. "mime"
  11. "net/http"
  12. "sync"
  13. apacheThrift "github.com/apache/thrift/lib/go/thrift"
  14. "github.com/gorilla/mux"
  15. "github.com/jaegertracing/jaeger/cmd/agent/app/configmanager"
  16. "github.com/jaegertracing/jaeger/cmd/agent/app/httpserver"
  17. "github.com/jaegertracing/jaeger/cmd/agent/app/processors"
  18. "github.com/jaegertracing/jaeger/cmd/agent/app/servers"
  19. "github.com/jaegertracing/jaeger/cmd/agent/app/servers/thriftudp"
  20. "github.com/jaegertracing/jaeger/model"
  21. "github.com/jaegertracing/jaeger/pkg/metrics"
  22. "github.com/jaegertracing/jaeger/proto-gen/api_v2"
  23. "github.com/jaegertracing/jaeger/thrift-gen/agent"
  24. "github.com/jaegertracing/jaeger/thrift-gen/baggage"
  25. "github.com/jaegertracing/jaeger/thrift-gen/jaeger"
  26. "github.com/jaegertracing/jaeger/thrift-gen/zipkincore"
  27. "go.opentelemetry.io/collector/component"
  28. "go.opentelemetry.io/collector/config/configgrpc"
  29. "go.opentelemetry.io/collector/config/confighttp"
  30. "go.opentelemetry.io/collector/consumer"
  31. "go.opentelemetry.io/collector/receiver"
  32. "go.opentelemetry.io/collector/receiver/receiverhelper"
  33. "go.uber.org/multierr"
  34. "google.golang.org/grpc"
  35. jaegertranslator "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger"
  36. )
  37. // configuration defines the behavior and the ports that
  38. // the Jaeger receiver will use.
  39. type configuration struct {
  40. CollectorHTTPSettings confighttp.HTTPServerSettings
  41. CollectorGRPCServerSettings configgrpc.GRPCServerSettings
  42. AgentCompactThrift ProtocolUDP
  43. AgentBinaryThrift ProtocolUDP
  44. AgentHTTPEndpoint string
  45. }
  46. // Receiver type is used to receive spans that were originally intended to be sent to Jaeger.
  47. // This receiver is basically a Jaeger collector.
  48. type jReceiver struct {
  49. nextConsumer consumer.Traces
  50. id component.ID
  51. config *configuration
  52. grpc *grpc.Server
  53. collectorServer *http.Server
  54. agentProcessors []processors.Processor
  55. agentServer *http.Server
  56. goroutines sync.WaitGroup
  57. settings receiver.CreateSettings
  58. grpcObsrecv *receiverhelper.ObsReport
  59. httpObsrecv *receiverhelper.ObsReport
  60. }
  61. const (
  62. agentTransportBinary = "udp_thrift_binary"
  63. agentTransportCompact = "udp_thrift_compact"
  64. collectorHTTPTransport = "collector_http"
  65. grpcTransport = "grpc"
  66. thriftFormat = "thrift"
  67. protobufFormat = "protobuf"
  68. )
  69. var (
  70. acceptedThriftFormats = map[string]struct{}{
  71. "application/x-thrift": {},
  72. "application/vnd.apache.thrift.binary": {},
  73. }
  74. )
  75. // newJaegerReceiver creates a TracesReceiver that receives traffic as a Jaeger collector, and
  76. // also as a Jaeger agent.
  77. func newJaegerReceiver(
  78. id component.ID,
  79. config *configuration,
  80. nextConsumer consumer.Traces,
  81. set receiver.CreateSettings,
  82. ) (*jReceiver, error) {
  83. grpcObsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{
  84. ReceiverID: id,
  85. Transport: grpcTransport,
  86. ReceiverCreateSettings: set,
  87. })
  88. if err != nil {
  89. return nil, err
  90. }
  91. httpObsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{
  92. ReceiverID: id,
  93. Transport: collectorHTTPTransport,
  94. ReceiverCreateSettings: set,
  95. })
  96. if err != nil {
  97. return nil, err
  98. }
  99. return &jReceiver{
  100. config: config,
  101. nextConsumer: nextConsumer,
  102. id: id,
  103. settings: set,
  104. grpcObsrecv: grpcObsrecv,
  105. httpObsrecv: httpObsrecv,
  106. }, nil
  107. }
  108. func (jr *jReceiver) Start(_ context.Context, host component.Host) error {
  109. if err := jr.startAgent(host); err != nil {
  110. return err
  111. }
  112. return jr.startCollector(host)
  113. }
  114. func (jr *jReceiver) Shutdown(ctx context.Context) error {
  115. var errs error
  116. if jr.agentServer != nil {
  117. if aerr := jr.agentServer.Shutdown(ctx); aerr != nil {
  118. errs = multierr.Append(errs, aerr)
  119. }
  120. }
  121. for _, processor := range jr.agentProcessors {
  122. processor.Stop()
  123. }
  124. if jr.collectorServer != nil {
  125. if cerr := jr.collectorServer.Shutdown(ctx); cerr != nil {
  126. errs = multierr.Append(errs, cerr)
  127. }
  128. }
  129. if jr.grpc != nil {
  130. jr.grpc.GracefulStop()
  131. }
  132. jr.goroutines.Wait()
  133. return errs
  134. }
  135. func consumeTraces(ctx context.Context, batch *jaeger.Batch, consumer consumer.Traces) (int, error) {
  136. if batch == nil {
  137. return 0, nil
  138. }
  139. td, err := jaegertranslator.ThriftToTraces(batch)
  140. if err != nil {
  141. return 0, err
  142. }
  143. return len(batch.Spans), consumer.ConsumeTraces(ctx, td)
  144. }
  145. var _ agent.Agent = (*agentHandler)(nil)
  146. var _ api_v2.CollectorServiceServer = (*jReceiver)(nil)
  147. var _ configmanager.ClientConfigManager = (*notImplementedConfigManager)(nil)
  148. var errNotImplemented = fmt.Errorf("not implemented")
  149. type notImplementedConfigManager struct{}
  150. func (notImplementedConfigManager) GetSamplingStrategy(_ context.Context, _ string) (*api_v2.SamplingStrategyResponse, error) {
  151. return nil, errNotImplemented
  152. }
  153. func (notImplementedConfigManager) GetBaggageRestrictions(_ context.Context, _ string) ([]*baggage.BaggageRestriction, error) {
  154. return nil, errNotImplemented
  155. }
  156. type agentHandler struct {
  157. nextConsumer consumer.Traces
  158. obsrecv *receiverhelper.ObsReport
  159. }
  160. // EmitZipkinBatch is unsupported agent's
  161. func (h *agentHandler) EmitZipkinBatch(context.Context, []*zipkincore.Span) (err error) {
  162. panic("unsupported receiver")
  163. }
  164. // EmitBatch implements thrift-gen/agent/Agent and it forwards
  165. // Jaeger spans received by the Jaeger agent processor.
  166. func (h *agentHandler) EmitBatch(ctx context.Context, batch *jaeger.Batch) error {
  167. ctx = h.obsrecv.StartTracesOp(ctx)
  168. numSpans, err := consumeTraces(ctx, batch, h.nextConsumer)
  169. h.obsrecv.EndTracesOp(ctx, thriftFormat, numSpans, err)
  170. return err
  171. }
  172. func (jr *jReceiver) PostSpans(ctx context.Context, r *api_v2.PostSpansRequest) (*api_v2.PostSpansResponse, error) {
  173. ctx = jr.grpcObsrecv.StartTracesOp(ctx)
  174. batch := r.GetBatch()
  175. td, err := jaegertranslator.ProtoToTraces([]*model.Batch{&batch})
  176. if err != nil {
  177. jr.grpcObsrecv.EndTracesOp(ctx, protobufFormat, len(batch.Spans), err)
  178. return nil, err
  179. }
  180. err = jr.nextConsumer.ConsumeTraces(ctx, td)
  181. jr.grpcObsrecv.EndTracesOp(ctx, protobufFormat, len(batch.Spans), err)
  182. if err != nil {
  183. return nil, err
  184. }
  185. return &api_v2.PostSpansResponse{}, nil
  186. }
  187. func (jr *jReceiver) startAgent(host component.Host) error {
  188. if jr.config == nil {
  189. return nil
  190. }
  191. if jr.config.AgentBinaryThrift.Endpoint != "" {
  192. obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{
  193. ReceiverID: jr.id,
  194. Transport: agentTransportBinary,
  195. ReceiverCreateSettings: jr.settings,
  196. })
  197. if err != nil {
  198. return err
  199. }
  200. h := &agentHandler{
  201. nextConsumer: jr.nextConsumer,
  202. obsrecv: obsrecv,
  203. }
  204. processor, err := jr.buildProcessor(jr.config.AgentBinaryThrift.Endpoint, jr.config.AgentBinaryThrift.ServerConfigUDP, apacheThrift.NewTBinaryProtocolFactoryConf(nil), h)
  205. if err != nil {
  206. return err
  207. }
  208. jr.agentProcessors = append(jr.agentProcessors, processor)
  209. }
  210. if jr.config.AgentCompactThrift.Endpoint != "" {
  211. obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{
  212. ReceiverID: jr.id,
  213. Transport: agentTransportCompact,
  214. ReceiverCreateSettings: jr.settings,
  215. })
  216. if err != nil {
  217. return err
  218. }
  219. h := &agentHandler{
  220. nextConsumer: jr.nextConsumer,
  221. obsrecv: obsrecv,
  222. }
  223. processor, err := jr.buildProcessor(jr.config.AgentCompactThrift.Endpoint, jr.config.AgentCompactThrift.ServerConfigUDP, apacheThrift.NewTCompactProtocolFactoryConf(nil), h)
  224. if err != nil {
  225. return err
  226. }
  227. jr.agentProcessors = append(jr.agentProcessors, processor)
  228. }
  229. jr.goroutines.Add(len(jr.agentProcessors))
  230. for _, processor := range jr.agentProcessors {
  231. go func(p processors.Processor) {
  232. defer jr.goroutines.Done()
  233. p.Serve()
  234. }(processor)
  235. }
  236. if jr.config.AgentHTTPEndpoint != "" {
  237. jr.agentServer = httpserver.NewHTTPServer(jr.config.AgentHTTPEndpoint, &notImplementedConfigManager{}, metrics.NullFactory, jr.settings.Logger)
  238. jr.goroutines.Add(1)
  239. go func() {
  240. defer jr.goroutines.Done()
  241. if err := jr.agentServer.ListenAndServe(); !errors.Is(err, http.ErrServerClosed) && err != nil {
  242. host.ReportFatalError(fmt.Errorf("jaeger agent server error: %w", err))
  243. }
  244. }()
  245. }
  246. return nil
  247. }
  248. func (jr *jReceiver) buildProcessor(address string, cfg ServerConfigUDP, factory apacheThrift.TProtocolFactory, a agent.Agent) (processors.Processor, error) {
  249. handler := agent.NewAgentProcessor(a)
  250. transport, err := thriftudp.NewTUDPServerTransport(address)
  251. if err != nil {
  252. return nil, err
  253. }
  254. if cfg.SocketBufferSize > 0 {
  255. if err = transport.SetSocketBufferSize(cfg.SocketBufferSize); err != nil {
  256. return nil, err
  257. }
  258. }
  259. server, err := servers.NewTBufferedServer(transport, cfg.QueueSize, cfg.MaxPacketSize, metrics.NullFactory)
  260. if err != nil {
  261. return nil, err
  262. }
  263. processor, err := processors.NewThriftProcessor(server, cfg.Workers, metrics.NullFactory, factory, handler, jr.settings.Logger)
  264. if err != nil {
  265. return nil, err
  266. }
  267. return processor, nil
  268. }
  269. func (jr *jReceiver) decodeThriftHTTPBody(r *http.Request) (*jaeger.Batch, *httpError) {
  270. bodyBytes, err := io.ReadAll(r.Body)
  271. r.Body.Close()
  272. if err != nil {
  273. return nil, &httpError{
  274. fmt.Sprintf("Unable to process request body: %v", err),
  275. http.StatusInternalServerError,
  276. }
  277. }
  278. contentType, _, err := mime.ParseMediaType(r.Header.Get("Content-Type"))
  279. if err != nil {
  280. return nil, &httpError{
  281. fmt.Sprintf("Cannot parse content type: %v", err),
  282. http.StatusBadRequest,
  283. }
  284. }
  285. if _, ok := acceptedThriftFormats[contentType]; !ok {
  286. return nil, &httpError{
  287. fmt.Sprintf("Unsupported content type: %v", contentType),
  288. http.StatusBadRequest,
  289. }
  290. }
  291. tdes := apacheThrift.NewTDeserializer()
  292. batch := &jaeger.Batch{}
  293. if err = tdes.Read(r.Context(), batch, bodyBytes); err != nil {
  294. return nil, &httpError{
  295. fmt.Sprintf("Unable to process request body: %v", err),
  296. http.StatusBadRequest,
  297. }
  298. }
  299. return batch, nil
  300. }
  301. // HandleThriftHTTPBatch implements Jaeger HTTP Thrift handler.
  302. func (jr *jReceiver) HandleThriftHTTPBatch(w http.ResponseWriter, r *http.Request) {
  303. ctx := jr.httpObsrecv.StartTracesOp(r.Context())
  304. batch, hErr := jr.decodeThriftHTTPBody(r)
  305. if hErr != nil {
  306. http.Error(w, html.EscapeString(hErr.msg), hErr.statusCode)
  307. jr.httpObsrecv.EndTracesOp(ctx, thriftFormat, 0, hErr)
  308. return
  309. }
  310. numSpans, err := consumeTraces(ctx, batch, jr.nextConsumer)
  311. if err != nil {
  312. http.Error(w, fmt.Sprintf("Cannot submit Jaeger batch: %v", err), http.StatusInternalServerError)
  313. } else {
  314. w.WriteHeader(http.StatusAccepted)
  315. }
  316. jr.httpObsrecv.EndTracesOp(ctx, thriftFormat, numSpans, err)
  317. }
  318. func (jr *jReceiver) startCollector(host component.Host) error {
  319. if jr.config == nil {
  320. return nil
  321. }
  322. if jr.config.CollectorHTTPSettings.Endpoint != "" {
  323. cln, err := jr.config.CollectorHTTPSettings.ToListener()
  324. if err != nil {
  325. return fmt.Errorf("failed to bind to Collector address %q: %w",
  326. jr.config.CollectorHTTPSettings.Endpoint, err)
  327. }
  328. nr := mux.NewRouter()
  329. nr.HandleFunc("/api/traces", jr.HandleThriftHTTPBatch).Methods(http.MethodPost)
  330. jr.collectorServer, err = jr.config.CollectorHTTPSettings.ToServer(host, jr.settings.TelemetrySettings, nr)
  331. if err != nil {
  332. return err
  333. }
  334. jr.goroutines.Add(1)
  335. go func() {
  336. defer jr.goroutines.Done()
  337. if errHTTP := jr.collectorServer.Serve(cln); !errors.Is(errHTTP, http.ErrServerClosed) && errHTTP != nil {
  338. host.ReportFatalError(errHTTP)
  339. }
  340. }()
  341. }
  342. if jr.config.CollectorGRPCServerSettings.NetAddr.Endpoint != "" {
  343. var err error
  344. jr.grpc, err = jr.config.CollectorGRPCServerSettings.ToServer(host, jr.settings.TelemetrySettings)
  345. if err != nil {
  346. return fmt.Errorf("failed to build the options for the Jaeger gRPC Collector: %w", err)
  347. }
  348. ln, err := jr.config.CollectorGRPCServerSettings.ToListener()
  349. if err != nil {
  350. return fmt.Errorf("failed to bind to gRPC address %q: %w", jr.config.CollectorGRPCServerSettings.NetAddr, err)
  351. }
  352. api_v2.RegisterCollectorServiceServer(jr.grpc, jr)
  353. jr.goroutines.Add(1)
  354. go func() {
  355. defer jr.goroutines.Done()
  356. if errGrpc := jr.grpc.Serve(ln); !errors.Is(errGrpc, grpc.ErrServerStopped) && errGrpc != nil {
  357. host.ReportFatalError(errGrpc)
  358. }
  359. }()
  360. }
  361. return nil
  362. }