agent.go 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package datadog // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/datadog"
  4. import (
  5. "context"
  6. "net/http"
  7. "runtime"
  8. "sync"
  9. "time"
  10. pb "github.com/DataDog/datadog-agent/pkg/proto/pbgo/trace"
  11. "github.com/DataDog/datadog-agent/pkg/trace/agent"
  12. "github.com/DataDog/datadog-agent/pkg/trace/api"
  13. traceconfig "github.com/DataDog/datadog-agent/pkg/trace/config"
  14. "github.com/DataDog/datadog-agent/pkg/trace/stats"
  15. "github.com/DataDog/datadog-agent/pkg/trace/telemetry"
  16. "github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/metrics"
  17. "go.opentelemetry.io/collector/pdata/ptrace"
  18. )
  19. // keyStatsComputed specifies the resource attribute key which indicates if stats have been
  20. // computed for the resource spans.
  21. const keyStatsComputed = "_dd.stats_computed"
  22. // TraceAgent specifies a minimal trace agent instance that is able to process traces and output stats.
  23. type TraceAgent struct {
  24. *agent.Agent
  25. // pchan specifies the channel that will be used to output Datadog Trace Agent API Payloads
  26. // resulting from ingested OpenTelemetry spans.
  27. pchan chan *api.Payload
  28. // wg waits for all goroutines to exit.
  29. wg sync.WaitGroup
  30. // exit signals the agent to shut down.
  31. exit chan struct{}
  32. }
  33. // newAgent creates a new unstarted traceagent using the given context. Call Start to start the traceagent.
  34. // The out channel will receive outoing stats payloads resulting from spans ingested using the Ingest method.
  35. func NewAgent(ctx context.Context, out chan *pb.StatsPayload) *TraceAgent {
  36. return NewAgentWithConfig(ctx, traceconfig.New(), out)
  37. }
  38. // newAgentWithConfig creates a new traceagent with the given config cfg. Used in tests; use newAgent instead.
  39. func NewAgentWithConfig(ctx context.Context, cfg *traceconfig.AgentConfig, out chan *pb.StatsPayload) *TraceAgent {
  40. // disable the HTTP receiver
  41. cfg.ReceiverPort = 0
  42. // set the API key to succeed startup; it is never used nor needed
  43. cfg.Endpoints[0].APIKey = "skip_check"
  44. // set the default hostname to the translator's placeholder; in the case where no hostname
  45. // can be deduced from incoming traces, we don't know the default hostname (because it is set
  46. // in the exporter). In order to avoid duplicating the hostname setting in the processor and
  47. // exporter, we use a placeholder and fill it in later (in the Datadog Exporter or Agent OTLP
  48. // Ingest). This gives a better user experience.
  49. cfg.Hostname = metrics.UnsetHostnamePlaceholder
  50. pchan := make(chan *api.Payload, 1000)
  51. a := agent.NewAgent(ctx, cfg, telemetry.NewNoopCollector())
  52. // replace the Concentrator (the component which computes and flushes APM Stats from incoming
  53. // traces) with our own, which uses the 'out' channel.
  54. a.Concentrator = stats.NewConcentrator(cfg, out, time.Now())
  55. // ...and the same for the ClientStatsAggregator; we don't use it here, but it is also a source
  56. // of stats which should be available to us.
  57. a.ClientStatsAggregator = stats.NewClientStatsAggregator(cfg, out)
  58. // lastly, start the OTLP receiver, which will be used to introduce ResourceSpans into the traceagent,
  59. // so that we can transform them to Datadog spans and receive stats.
  60. a.OTLPReceiver = api.NewOTLPReceiver(pchan, cfg)
  61. return &TraceAgent{
  62. Agent: a,
  63. exit: make(chan struct{}),
  64. pchan: pchan,
  65. }
  66. }
  67. // Start starts the traceagent, making it ready to ingest spans.
  68. func (p *TraceAgent) Start() {
  69. // we don't need to start the full agent, so we only start a set of minimal
  70. // components needed to compute stats:
  71. for _, starter := range []interface{ Start() }{
  72. p.Concentrator,
  73. p.ClientStatsAggregator,
  74. // we don't need the samplers' nor the processor's functionalities;
  75. // but they are used by the agent nevertheless, so they need to be
  76. // active and functioning.
  77. p.PrioritySampler,
  78. p.ErrorsSampler,
  79. p.NoPrioritySampler,
  80. p.EventProcessor,
  81. } {
  82. starter.Start()
  83. }
  84. p.goDrain()
  85. p.goProcess()
  86. }
  87. // Stop stops the traceagent, making it unable to ingest spans. Do not call Ingest after Stop.
  88. func (p *TraceAgent) Stop() {
  89. for _, stopper := range []interface{ Stop() }{
  90. p.Concentrator,
  91. p.ClientStatsAggregator,
  92. p.PrioritySampler,
  93. p.ErrorsSampler,
  94. p.NoPrioritySampler,
  95. p.EventProcessor,
  96. } {
  97. stopper.Stop()
  98. }
  99. close(p.exit)
  100. p.wg.Wait()
  101. }
  102. // goDrain drains the TraceWriter channel, ensuring it won't block. We don't need the traces,
  103. // nor do we have a running TraceWrite. We just want the outgoing stats.
  104. func (p *TraceAgent) goDrain() {
  105. p.wg.Add(1)
  106. go func() {
  107. defer p.wg.Done()
  108. for {
  109. select {
  110. case <-p.TraceWriter.In:
  111. // we don't write these traces anywhere; drain the channel
  112. case <-p.exit:
  113. return
  114. }
  115. }
  116. }()
  117. }
  118. // Ingest processes the given spans within the traceagent and outputs stats through the output channel
  119. // provided to newAgent. Do not call Ingest on an unstarted or stopped traceagent.
  120. func (p *TraceAgent) Ingest(ctx context.Context, traces ptrace.Traces) {
  121. rspanss := traces.ResourceSpans()
  122. for i := 0; i < rspanss.Len(); i++ {
  123. rspans := rspanss.At(i)
  124. p.OTLPReceiver.ReceiveResourceSpans(ctx, rspans, http.Header{})
  125. // ...the call transforms the OTLP Spans into a Datadog payload and sends the result
  126. // down the p.pchan channel
  127. // Stats will be computed for p. Mark the original resource spans to ensure that they don't
  128. // get computed twice in case these spans pass through here again.
  129. rspans.Resource().Attributes().PutBool(keyStatsComputed, true)
  130. }
  131. }
  132. // goProcesses runs the main loop which takes incoming payloads, processes them and generates stats.
  133. // It then picks up those stats and converts them to metrics.
  134. func (p *TraceAgent) goProcess() {
  135. for i := 0; i < runtime.NumCPU(); i++ {
  136. p.wg.Add(1)
  137. go func() {
  138. defer p.wg.Done()
  139. for {
  140. select {
  141. case payload := <-p.pchan:
  142. p.Process(payload)
  143. // ...the call processes the payload and outputs stats via the 'out' channel
  144. // provided to newAgent
  145. case <-p.exit:
  146. return
  147. }
  148. }
  149. }()
  150. }
  151. }
  152. var _ Ingester = (*TraceAgent)(nil)
  153. // An Ingester is able to ingest traces. Implemented by traceagent.
  154. type Ingester interface {
  155. // Start starts the ingester.
  156. Start()
  157. // Ingest ingests the set of traces.
  158. Ingest(ctx context.Context, traces ptrace.Traces)
  159. // Stop stops the ingester.
  160. Stop()
  161. }