traces_exporter.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package mockdatadogagentexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/testbed/mockdatasenders/mockdatadogagentexporter"
  4. import (
  5. "bytes"
  6. "context"
  7. "encoding/binary"
  8. "fmt"
  9. "net/http"
  10. "github.com/DataDog/datadog-agent/pkg/trace/exportable/pb"
  11. "github.com/tinylib/msgp/msgp"
  12. "go.opentelemetry.io/collector/component"
  13. "go.opentelemetry.io/collector/component/componenttest"
  14. "go.opentelemetry.io/collector/config/confighttp"
  15. "go.opentelemetry.io/collector/consumer/consumererror"
  16. "go.opentelemetry.io/collector/pdata/pcommon"
  17. "go.opentelemetry.io/collector/pdata/ptrace"
  18. )
  19. type ddExporter struct {
  20. endpoint string
  21. client *http.Client
  22. clientSettings *confighttp.HTTPClientSettings
  23. }
  24. func createExporter(c *Config) *ddExporter {
  25. dd := &ddExporter{
  26. endpoint: c.Endpoint,
  27. clientSettings: &c.HTTPClientSettings,
  28. client: nil,
  29. }
  30. return dd
  31. }
  32. // start creates the http client
  33. func (dd *ddExporter) start(_ context.Context, host component.Host) (err error) {
  34. dd.client, err = dd.clientSettings.ToClient(host, componenttest.NewNopTelemetrySettings())
  35. return
  36. }
  37. func (dd *ddExporter) pushTraces(ctx context.Context, td ptrace.Traces) error {
  38. var traces pb.Traces
  39. for i := 0; i < td.ResourceSpans().Len(); i++ {
  40. resSpans := td.ResourceSpans().At(i)
  41. var trace pb.Trace
  42. for l := 0; l < resSpans.ScopeSpans().Len(); l++ {
  43. ils := resSpans.ScopeSpans().At(i)
  44. for s := 0; s < ils.Spans().Len(); s++ {
  45. span := ils.Spans().At(s)
  46. var newSpan = pb.Span{
  47. Service: "test",
  48. Name: "test",
  49. Resource: "test",
  50. Start: int64(span.StartTimestamp()),
  51. Duration: int64(span.EndTimestamp() - span.StartTimestamp()),
  52. Error: 0,
  53. Metrics: nil,
  54. Meta: map[string]string{},
  55. Type: "custom",
  56. }
  57. span.Attributes().Range(func(k string, v pcommon.Value) bool {
  58. newSpan.GetMeta()[k] = v.AsString()
  59. return true
  60. })
  61. var traceIDBytes [16]byte
  62. var spanIDBytes [8]byte
  63. var parentIDBytes [8]byte
  64. traceIDBytes = span.TraceID()
  65. spanIDBytes = span.SpanID()
  66. parentIDBytes = span.ParentSpanID()
  67. binary.BigEndian.PutUint64(traceIDBytes[:], newSpan.TraceID)
  68. binary.BigEndian.PutUint64(spanIDBytes[:], newSpan.SpanID)
  69. binary.BigEndian.PutUint64(parentIDBytes[:], newSpan.ParentID)
  70. trace = append(trace, &newSpan)
  71. }
  72. traces = append(traces, trace)
  73. }
  74. }
  75. var buf bytes.Buffer
  76. err := msgp.Encode(&buf, &traces)
  77. if err != nil {
  78. return consumererror.NewPermanent(fmt.Errorf("failed to encode msgp: %w", err))
  79. }
  80. req, err := http.NewRequestWithContext(ctx, "POST", dd.endpoint, &buf)
  81. if err != nil {
  82. return fmt.Errorf("failed to push trace data via DD exporter: %w", err)
  83. }
  84. req.Header.Set("Content-Type", "application/msgpack")
  85. resp, err := dd.client.Do(req)
  86. if err != nil {
  87. return fmt.Errorf("failed to push trace data via DD exporter: %w", err)
  88. }
  89. _ = resp.Body.Close()
  90. if resp.StatusCode < 200 || resp.StatusCode > 299 {
  91. return fmt.Errorf("failed the request with status code %d", resp.StatusCode)
  92. }
  93. return nil
  94. }