liubing 5 bulan lalu
induk
melakukan
cbddf25769
8 mengubah file dengan 212 tambahan dan 52 penghapusan
  1. 2 0
      .gitignore
  2. TEMPAT SAMPAH
      bin/run
  3. TEMPAT SAMPAH
      bin/tracestreamcreator
  4. 50 0
      cmd/tracecreator/chspan.go
  5. 2 3
      cmd/tracecreator/trace_kafka_sink.go
  6. 154 45
      cmd/tracecreator/trace_loader.go
  7. 3 3
      go.mod
  8. 1 1
      go.sum

+ 2 - 0
.gitignore

@@ -0,0 +1,2 @@
+bin
+.idea

TEMPAT SAMPAH
bin/run


TEMPAT SAMPAH
bin/tracestreamcreator


+ 50 - 0
cmd/tracecreator/chspan.go

@@ -0,0 +1,50 @@
+package main
+
+import "time"
+
+type ChSpan struct {
+	Timestamp            time.Time
+	TraceId              string
+	SpanId               string
+	ParentSpanId         string
+	TraceState           string
+	SpanName             string
+	SpanKind             int8
+	ServiceName          string
+	ResourceAttributes   map[string]string
+	ScopeName            string
+	ScopeVersion         string
+	SpanAttributes       map[string]string
+	Duration             int64
+	StatusCode           int32
+	StatusMessage        string
+	EventsTimestamp      []time.Time         `ch:"Events.Timestamp"`
+	EventsName           []string            `ch:"Events.Name"`
+	EventsAttributes     []map[string]string `ch:"Events.Attributes"`
+	LinksTraceId         []string            `ch:"Links.TraceId"`
+	LinksSpanId          []string            `ch:"Links.SpanId"`
+	LinksTraceState      []string            `ch:"Links.TraceState"`
+	LinksAttributes      []map[string]string `ch:"Links.Attributes"`
+	HttpCode             int32               `ch:"HttpCode"`
+	HttpMethod           string              `ch:"HttpMethod"`
+	HttpURL              string              `ch:"HttpURL"`
+	ContainerId          string              `ch:"ContainerId"`
+	SrcIP                string              `ch:"srcIP"`
+	SrcPort              int32               `ch:"srcPort"`
+	TargetIP             string              `ch:"targetIP"`
+	TargetPort           int32               `ch:"targetPort"`
+	RPCType              string              `ch:"RPCType"`
+	RPCName              string              `ch:"RPCName"`
+	RPCRequest           map[string]string   `ch:"RPCRequest"`
+	RPCResult            int8                `ch:"RPCResult"`
+	FuncNameSpace        string              `ch:"FuncNameSpace"`
+	FuncName             string              `ch:"FuncName"`
+	FuncLineNO           int32               `ch:"FuncLineNO"`
+	FuncResult           int32               `ch:"FuncResult"`
+	DbStatement          string              `ch:"dbStatement"`
+	DbConnectionString   string              `ch:"dbConnectionString"`
+	ExceptionsType       []string            `ch:"Exceptions.type"`
+	ExceptionsMessage    []string            `ch:"Exceptions.message"`
+	ExceptionsStacktrace []string            `ch:"Exceptions.stacktrace"`
+	AppAlias             string              `ch:"AppAlias"`
+}

+ 2 - 3
cmd/tracecreator/trace_kafka_sink.go

@@ -3,7 +3,7 @@ package main
 import (
 	"context"
 	"git.cestong.com.cn/cecf/trace-stream-creator/pkg/pb"
-	"github.com/gogo/protobuf/proto"
+	"github.com/golang/protobuf/proto"
 	"github.com/segmentio/kafka-go"
 	"github.com/sirupsen/logrus"
 )
@@ -25,7 +25,7 @@ func NewTraceKafkaSink(traceCh chan *pb.FlatTrace, parallelism int, logger *logr
 }
 
 func (s *TraceKafkaSink) start(ctx context.Context) {
-	for _ = range s.parallelism {
+	for i := 0; i < s.parallelism; i++ {
 		writer := kafka.Writer{Addr: kafka.TCP(s.kafkaBrokers...), Topic: s.kafkaTopic, Balancer: &kafka.LeastBytes{}}
 		go s.consumeRoutine(ctx, &writer)
 	}
@@ -50,7 +50,6 @@ loop:
 			break loop
 		case trace := <-s.traceCh:
 			s.sendTrace(trace, wr)
-			s.sendTrace(trace, wr)
 		}
 	}
 	s.logger.Infof("stop consume routine")

+ 154 - 45
cmd/tracecreator/trace_loader.go

@@ -3,12 +3,12 @@ package main
 import (
 	"context"
 	"encoding/hex"
+	mpb "git.cestong.com.cn/cecf/trace-stream-creator/pkg/pb"
 	"github.com/ClickHouse/clickhouse-go/v2"
+	"github.com/pkg/errors"
 	"github.com/sirupsen/logrus"
 	pcommon "go.opentelemetry.io/proto/otlp/common/v1"
 	presource "go.opentelemetry.io/proto/otlp/resource/v1"
-
-	mpb "git.cestong.com.cn/cecf/trace-stream-creator/pkg/pb"
 	ptrace "go.opentelemetry.io/proto/otlp/trace/v1"
 )
 
@@ -26,18 +26,35 @@ func NewTraceLoader(chCli clickhouse.Conn, traceIdChan chan timeRangedTraceID, f
 }
 
 func (tl *TraceLoader) start(parentCtx context.Context) {
-	for range tl.parallelism {
+	for i := 0; i < tl.parallelism; i++ {
 		go tl.startHandleTraceID(parentCtx)
 	}
 	tl.logger.Infof("started, parallelism: %d", tl.parallelism)
 }
 
-func (tl *TraceLoader) loadTrace(traceID timeRangedTraceID) (*mpb.FlatTrace, error) {
-	tl.logger.Debugf("load trace %s", traceID.TraceID)
-	sql := ""
-	rows, errQuerySpans := tl.chCli.Query(context.Background(), sql, traceID)
-	var tb TraceBuilder
-	ft := tb.buildTrace()
+func (tl *TraceLoader) loadTrace(rangedTraceID timeRangedTraceID) (*mpb.FlatTrace, error) {
+	tl.logger.Debugf("load trace %s", rangedTraceID.TraceID)
+	sql := " select Timestamp, TraceId, SpanId, ParentSpanId, TraceState, SpanName, SpanKind, ServiceName, " +
+		"ResourceAttributes, ScopeName, ScopeVersion, SpanAttributes, Duration, StatusCode, StatusMessage, " +
+		"`Events.Timestamp`, `Events.Name`, `Events.Attributes`, `Links.TraceId`, `Links.SpanId`, `Links.TraceState`, " +
+		"`Links.Attributes`, HttpCode, HttpMethod, HttpURL, ContainerId, srcIP, srcPort, targetIP, targetPort, " +
+		"RPCType, RPCName, RPCRequest, RPCResult, FuncNameSpace, FuncName, FuncLineNO, FuncResult, dbStatement, " +
+		"dbConnectionString, `Exceptions.type`, `Exceptions.message`, `Exceptions.stacktrace`, AppAlias " +
+		"from otel.traces where Timestamp < ? and Timestamp > ? and TraceId=?"
+
+	rows, errQuerySpans := tl.chCli.Query(context.Background(), sql, rangedTraceID.end, rangedTraceID.begin, rangedTraceID.TraceID)
+	if errQuerySpans != nil {
+		return nil, errors.WithMessagef(errQuerySpans, "query sql %s", sql)
+	}
+	chSpans := make([]ChSpan, 0, 100)
+	for rows.Next() {
+		var span ChSpan
+		if errScan := rows.ScanStruct(&span); errScan != nil {
+			return nil, errors.WithMessagef(errScan, "scan to ChSpan")
+		}
+		chSpans = append(chSpans, span)
+	}
+	ft := tl.buildTrace(rangedTraceID.TraceID, chSpans)
 	return ft, nil
 }
 
@@ -66,45 +83,137 @@ func (tl *TraceLoader) handleFlatTrace(ft *mpb.FlatTrace) error {
 	return nil
 }
 
-type TraceBuilder struct {
-}
-
-func (tb *TraceBuilder) buildTrace() *mpb.FlatTrace {
-	span := ptrace.Span{
-		TraceId:                nil,
-		SpanId:                 nil,
-		TraceState:             "",
-		ParentSpanId:           nil,
-		Flags:                  0,
-		Name:                   "",
-		Kind:                   0,
-		StartTimeUnixNano:      0,
-		EndTimeUnixNano:        0,
-		Attributes:             nil,
-		DroppedAttributesCount: 0,
-		Events:                 nil,
-		DroppedEventsCount:     0,
-		Links:                  nil,
-		DroppedLinksCount:      0,
-		Status:                 nil,
-	}
-	scope := pcommon.InstrumentationScope{
-		Name:                   "scopeName",
-		Version:                "scopeVersion",
-		DroppedAttributesCount: 0,
-		Attributes:             make([]*pcommon.KeyValue, 0),
-	}
-	resource := presource.Resource{Attributes: make([]*pcommon.KeyValue, 0)}
-	flatSpan := mpb.FlatSpan{
-		Span:     &span,
-		Scope:    &scope,
-		Resource: &resource,
+func (tl *TraceLoader) buildTrace(traceID string, chSpans []ChSpan) *mpb.FlatTrace {
+	flatSpans := make([]*mpb.FlatSpan, 0, len(chSpans))
+	for _, chSpan := range chSpans {
+		attr := make([]*pcommon.KeyValue, 0, len(chSpan.SpanAttributes))
+		for sa, saValue := range chSpan.SpanAttributes {
+			attr = append(attr, &pcommon.KeyValue{
+				Key: sa,
+				Value: &pcommon.AnyValue{
+					Value: &pcommon.AnyValue_StringValue{StringValue: saValue},
+				},
+			})
+		}
+		evs := make([]*ptrace.Span_Event, 0, len(chSpan.EventsTimestamp))
+		if !(len(chSpan.EventsTimestamp) == len(chSpan.EventsName) &&
+			len(chSpan.EventsTimestamp) == len(chSpan.EventsAttributes)) {
+			tl.logger.Errorf("events.name,timestamp,attributes长度不相同")
+			continue
+		}
+		for i, et := range chSpan.EventsTimestamp {
+			evName := chSpan.EventsName[i]
+			evAttr := chSpan.EventsAttributes[i]
+			spanEvAttrs := make([]*pcommon.KeyValue, 0, len(evAttr))
+			for evAttrKey, evAttrValue := range evAttr {
+				spanEvAttrs = append(spanEvAttrs, &pcommon.KeyValue{
+					Key:   evAttrKey,
+					Value: &pcommon.AnyValue{Value: &pcommon.AnyValue_StringValue{StringValue: evAttrValue}},
+				})
+			}
+			evs = append(evs, &ptrace.Span_Event{
+				TimeUnixNano:           uint64(et.UnixNano()),
+				Name:                   evName,
+				Attributes:             spanEvAttrs,
+				DroppedAttributesCount: 0,
+			})
+		}
+		linkSize := len(chSpan.LinksSpanId)
+		if !(linkSize == len(chSpan.LinksTraceId) &&
+			linkSize == len(chSpan.LinksTraceState) &&
+			linkSize == len(chSpan.LinksAttributes)) {
+			tl.logger.Errorf("link.spanId,traceId,attributes,state长度不相同")
+			continue
+		}
+		lks := make([]*ptrace.Span_Link, 0, linkSize)
+		for i, linkSpanId := range chSpan.LinksSpanId {
+			linkAttr := make([]*pcommon.KeyValue, 0, linkSize)
+			for linkKey, linkValue := range chSpan.LinksAttributes[i] {
+				linkAttr = append(linkAttr, &pcommon.KeyValue{
+					Key:   linkKey,
+					Value: &pcommon.AnyValue{Value: &pcommon.AnyValue_StringValue{StringValue: linkValue}},
+				})
+			}
+			linkTraceID, errLinkTraceID := hexIDStr2Bytes(chSpan.LinksTraceId[i])
+			linkSpanID, errLinkSpanID := hexIDStr2Bytes(linkSpanId)
+			if errLinkTraceID != nil || errLinkSpanID != nil {
+				tl.logger.Errorf("link has invalid spanID/traceID, err:%v-%v", errLinkSpanID, errLinkSpanID)
+				continue
+			}
+			lks = append(lks, &ptrace.Span_Link{
+				TraceId:                linkTraceID,
+				SpanId:                 linkSpanID,
+				TraceState:             chSpan.LinksTraceState[i],
+				Attributes:             linkAttr,
+				DroppedAttributesCount: 0,
+				Flags:                  0,
+			})
+		}
+		spanTraceID, errSpanTraceID := hexIDStr2Bytes(chSpan.TraceId)
+		spanSpanID, errSpanSpanID := hexIDStr2Bytes(chSpan.SpanId)
+		spanParentSpanID, errSpanParentSpanID := hexIDStr2Bytes(chSpan.ParentSpanId)
+		if errSpanTraceID != nil || errSpanSpanID != nil || errSpanParentSpanID != nil {
+			tl.logger.Errorf("span has invalid tid/sid/psid:%v-%v-%v", errSpanTraceID, errSpanSpanID, errSpanParentSpanID)
+			continue
+		}
+		span := ptrace.Span{
+			TraceId:                spanTraceID,
+			SpanId:                 spanSpanID,
+			TraceState:             "",
+			ParentSpanId:           spanParentSpanID,
+			Flags:                  0,
+			Name:                   chSpan.SpanName,
+			Kind:                   ptrace.Span_SpanKind(chSpan.SpanKind),
+			StartTimeUnixNano:      uint64(chSpan.Timestamp.UnixNano()),
+			EndTimeUnixNano:        uint64(chSpan.Timestamp.UnixNano() + chSpan.Duration),
+			Attributes:             attr,
+			DroppedAttributesCount: 0,
+			Events:                 evs,
+			DroppedEventsCount:     0,
+			Links:                  lks,
+			DroppedLinksCount:      0,
+			Status: &ptrace.Status{
+				Message: chSpan.StatusMessage,
+				Code:    ptrace.Status_StatusCode(chSpan.StatusCode),
+			},
+		}
+		scope := pcommon.InstrumentationScope{
+			Name:                   chSpan.ScopeName,
+			Version:                chSpan.ScopeVersion,
+			DroppedAttributesCount: 0,
+			Attributes:             make([]*pcommon.KeyValue, 0),
+		}
+		resource := presource.Resource{Attributes: make([]*pcommon.KeyValue, 0, len(chSpan.ResourceAttributes)), DroppedAttributesCount: 0}
+		for raKey, raValue := range chSpan.ResourceAttributes {
+			resource.Attributes = append(resource.Attributes, &pcommon.KeyValue{
+				Key: raKey,
+				Value: &pcommon.AnyValue{
+					Value: &pcommon.AnyValue_StringValue{StringValue: raValue},
+				},
+			})
+		}
+		flatSpan := mpb.FlatSpan{
+			Span:     &span,
+			Scope:    &scope,
+			Resource: &resource,
+		}
+		flatSpans = append(flatSpans, &flatSpan)
 	}
-	traceID := "traceID"
-	traceIDBytes, _ := hex.DecodeString(traceID)
+	traceIDBytes, _ := hexIDStr2Bytes(traceID)
 	tr := mpb.FlatTrace{
 		TraceId:   traceIDBytes,
-		FlatSpans: []*mpb.FlatSpan{&flatSpan},
+		FlatSpans: flatSpans,
 	}
 	return &tr
 }
+
+func hexIDStr2Bytes(idStr string) ([]byte, error) {
+	if idStr == "" {
+		return nil, nil
+	}
+	bs, err := hex.DecodeString(idStr)
+	if err != nil {
+		return nil, errors.WithMessagef(err, "hex decode:[%s]", idStr)
+	}
+	return bs, nil
+}

+ 3 - 3
go.mod

@@ -1,10 +1,11 @@
 module git.cestong.com.cn/cecf/trace-stream-creator
 
-go 1.22.1
+go 1.21.10
 
 require (
 	github.com/ClickHouse/clickhouse-go/v2 v2.29.0
-	github.com/gogo/protobuf v1.3.2
+	github.com/golang/protobuf v1.5.0
+	github.com/pkg/errors v0.9.1
 	github.com/segmentio/kafka-go v0.4.47
 	github.com/sirupsen/logrus v1.9.3
 	go.opentelemetry.io/proto/otlp v1.3.1
@@ -21,7 +22,6 @@ require (
 	github.com/klauspost/compress v1.17.7 // indirect
 	github.com/paulmach/orb v0.11.1 // indirect
 	github.com/pierrec/lz4/v4 v4.1.21 // indirect
-	github.com/pkg/errors v0.9.1 // indirect
 	github.com/segmentio/asm v1.2.0 // indirect
 	github.com/shopspring/decimal v1.4.0 // indirect
 	go.opentelemetry.io/otel v1.26.0 // indirect

+ 1 - 1
go.sum

@@ -11,8 +11,8 @@ github.com/go-faster/city v1.0.1 h1:4WAxSZ3V2Ws4QRDrscLEDcibJY8uf41H6AhXDrNDcGw=
 github.com/go-faster/city v1.0.1/go.mod h1:jKcUJId49qdW3L1qKHH/3wPeUstCVpVSXTM6vO3VcTw=
 github.com/go-faster/errors v0.7.1 h1:MkJTnDoEdi9pDabt1dpWf7AA8/BaSYZqibYyhZ20AYg=
 github.com/go-faster/errors v0.7.1/go.mod h1:5ySTjWFiphBs07IKuiL69nxdfd5+fzh1u7FPGZP2quo=
-github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
 github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
+github.com/golang/protobuf v1.5.0 h1:LUVKkCeviFUMKqHa4tXIIij/lbhnMbP7Fn5wKdKkRh4=
 github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
 github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
 github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=