123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110 |
- package main
- import (
- "context"
- "encoding/hex"
- "github.com/ClickHouse/clickhouse-go/v2"
- "github.com/sirupsen/logrus"
- pcommon "go.opentelemetry.io/proto/otlp/common/v1"
- presource "go.opentelemetry.io/proto/otlp/resource/v1"
- mpb "git.cestong.com.cn/cecf/trace-stream-creator/pkg/pb"
- ptrace "go.opentelemetry.io/proto/otlp/trace/v1"
- )
- type TraceLoader struct {
- chCli clickhouse.Conn
- traceIdChan chan timeRangedTraceID
- flatCh chan *mpb.FlatTrace
- logger *logrus.Entry
- parallelism int
- }
- func NewTraceLoader(chCli clickhouse.Conn, traceIdChan chan timeRangedTraceID, flatCh chan *mpb.FlatTrace, parallelism int, logger *logrus.Logger) *TraceLoader {
- log := logger.WithField("name", "TraceLoader")
- return &TraceLoader{chCli: chCli, traceIdChan: traceIdChan, flatCh: flatCh, parallelism: parallelism, logger: log}
- }
- func (tl *TraceLoader) start(parentCtx context.Context) {
- for range tl.parallelism {
- go tl.startHandleTraceID(parentCtx)
- }
- tl.logger.Infof("started, parallelism: %d", tl.parallelism)
- }
- func (tl *TraceLoader) loadTrace(traceID timeRangedTraceID) (*mpb.FlatTrace, error) {
- tl.logger.Debugf("load trace %s", traceID.TraceID)
- sql := ""
- rows, errQuerySpans := tl.chCli.Query(context.Background(), sql, traceID)
- var tb TraceBuilder
- ft := tb.buildTrace()
- return ft, nil
- }
- func (tl *TraceLoader) startHandleTraceID(ctx context.Context) {
- loop:
- for {
- select {
- case traceId := <-tl.traceIdChan:
- if ft, errHandleTrace := tl.loadTrace(traceId); errHandleTrace != nil {
- tl.logger.Errorf("load trace %s failed:%v", traceId, errHandleTrace)
- } else {
- if errHandle := tl.handleFlatTrace(ft); errHandle != nil {
- tl.logger.Errorf("handle trace %s failed:%v", traceId, errHandle)
- }
- }
- case <-ctx.Done():
- tl.logger.Infof("stop sig received, quit")
- break loop
- }
- }
- }
- func (tl *TraceLoader) handleFlatTrace(ft *mpb.FlatTrace) error {
- // todo metric & backpressure
- tl.flatCh <- ft
- return nil
- }
// TraceBuilder assembles FlatTrace values. It holds no state, so the zero
// value is ready to use.
type TraceBuilder struct{}
- func (tb *TraceBuilder) buildTrace() *mpb.FlatTrace {
- span := ptrace.Span{
- TraceId: nil,
- SpanId: nil,
- TraceState: "",
- ParentSpanId: nil,
- Flags: 0,
- Name: "",
- Kind: 0,
- StartTimeUnixNano: 0,
- EndTimeUnixNano: 0,
- Attributes: nil,
- DroppedAttributesCount: 0,
- Events: nil,
- DroppedEventsCount: 0,
- Links: nil,
- DroppedLinksCount: 0,
- Status: nil,
- }
- scope := pcommon.InstrumentationScope{
- Name: "scopeName",
- Version: "scopeVersion",
- DroppedAttributesCount: 0,
- Attributes: make([]*pcommon.KeyValue, 0),
- }
- resource := presource.Resource{Attributes: make([]*pcommon.KeyValue, 0)}
- flatSpan := mpb.FlatSpan{
- Span: &span,
- Scope: &scope,
- Resource: &resource,
- }
- traceID := "traceID"
- traceIDBytes, _ := hex.DecodeString(traceID)
- tr := mpb.FlatTrace{
- TraceId: traceIDBytes,
- FlatSpans: []*mpb.FlatSpan{&flatSpan},
- }
- return &tr
- }
|