123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105 |
- // Copyright The OpenTelemetry Authors
- // SPDX-License-Identifier: Apache-2.0
- package mockdatadogagentexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/testbed/mockdatasenders/mockdatadogagentexporter"
- import (
- "bytes"
- "context"
- "encoding/binary"
- "fmt"
- "net/http"
- "github.com/DataDog/datadog-agent/pkg/trace/exportable/pb"
- "github.com/tinylib/msgp/msgp"
- "go.opentelemetry.io/collector/component"
- "go.opentelemetry.io/collector/component/componenttest"
- "go.opentelemetry.io/collector/config/confighttp"
- "go.opentelemetry.io/collector/consumer/consumererror"
- "go.opentelemetry.io/collector/pdata/pcommon"
- "go.opentelemetry.io/collector/pdata/ptrace"
- )
- type ddExporter struct {
- endpoint string
- client *http.Client
- clientSettings *confighttp.HTTPClientSettings
- }
- func createExporter(c *Config) *ddExporter {
- dd := &ddExporter{
- endpoint: c.Endpoint,
- clientSettings: &c.HTTPClientSettings,
- client: nil,
- }
- return dd
- }
- // start creates the http client
- func (dd *ddExporter) start(_ context.Context, host component.Host) (err error) {
- dd.client, err = dd.clientSettings.ToClient(host, componenttest.NewNopTelemetrySettings())
- return
- }
- func (dd *ddExporter) pushTraces(ctx context.Context, td ptrace.Traces) error {
- var traces pb.Traces
- for i := 0; i < td.ResourceSpans().Len(); i++ {
- resSpans := td.ResourceSpans().At(i)
- var trace pb.Trace
- for l := 0; l < resSpans.ScopeSpans().Len(); l++ {
- ils := resSpans.ScopeSpans().At(i)
- for s := 0; s < ils.Spans().Len(); s++ {
- span := ils.Spans().At(s)
- var newSpan = pb.Span{
- Service: "test",
- Name: "test",
- Resource: "test",
- Start: int64(span.StartTimestamp()),
- Duration: int64(span.EndTimestamp() - span.StartTimestamp()),
- Error: 0,
- Metrics: nil,
- Meta: map[string]string{},
- Type: "custom",
- }
- span.Attributes().Range(func(k string, v pcommon.Value) bool {
- newSpan.GetMeta()[k] = v.AsString()
- return true
- })
- var traceIDBytes [16]byte
- var spanIDBytes [8]byte
- var parentIDBytes [8]byte
- traceIDBytes = span.TraceID()
- spanIDBytes = span.SpanID()
- parentIDBytes = span.ParentSpanID()
- binary.BigEndian.PutUint64(traceIDBytes[:], newSpan.TraceID)
- binary.BigEndian.PutUint64(spanIDBytes[:], newSpan.SpanID)
- binary.BigEndian.PutUint64(parentIDBytes[:], newSpan.ParentID)
- trace = append(trace, &newSpan)
- }
- traces = append(traces, trace)
- }
- }
- var buf bytes.Buffer
- err := msgp.Encode(&buf, &traces)
- if err != nil {
- return consumererror.NewPermanent(fmt.Errorf("failed to encode msgp: %w", err))
- }
- req, err := http.NewRequestWithContext(ctx, "POST", dd.endpoint, &buf)
- if err != nil {
- return fmt.Errorf("failed to push trace data via DD exporter: %w", err)
- }
- req.Header.Set("Content-Type", "application/msgpack")
- resp, err := dd.client.Do(req)
- if err != nil {
- return fmt.Errorf("failed to push trace data via DD exporter: %w", err)
- }
- _ = resp.Body.Close()
- if resp.StatusCode < 200 || resp.StatusCode > 299 {
- return fmt.Errorf("failed the request with status code %d", resp.StatusCode)
- }
- return nil
- }
|