trace_loader.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. package main
  2. import (
  3. "context"
  4. "encoding/hex"
  5. "github.com/ClickHouse/clickhouse-go/v2"
  6. "github.com/sirupsen/logrus"
  7. pcommon "go.opentelemetry.io/proto/otlp/common/v1"
  8. presource "go.opentelemetry.io/proto/otlp/resource/v1"
  9. mpb "git.cestong.com.cn/cecf/trace-stream-creator/pkg/pb"
  10. ptrace "go.opentelemetry.io/proto/otlp/trace/v1"
  11. )
  12. type TraceLoader struct {
  13. chCli clickhouse.Conn
  14. traceIdChan chan timeRangedTraceID
  15. flatCh chan *mpb.FlatTrace
  16. logger *logrus.Entry
  17. parallelism int
  18. }
  19. func NewTraceLoader(chCli clickhouse.Conn, traceIdChan chan timeRangedTraceID, flatCh chan *mpb.FlatTrace, parallelism int, logger *logrus.Logger) *TraceLoader {
  20. log := logger.WithField("name", "TraceLoader")
  21. return &TraceLoader{chCli: chCli, traceIdChan: traceIdChan, flatCh: flatCh, parallelism: parallelism, logger: log}
  22. }
  23. func (tl *TraceLoader) start(parentCtx context.Context) {
  24. for range tl.parallelism {
  25. go tl.startHandleTraceID(parentCtx)
  26. }
  27. tl.logger.Infof("started, parallelism: %d", tl.parallelism)
  28. }
  29. func (tl *TraceLoader) loadTrace(traceID timeRangedTraceID) (*mpb.FlatTrace, error) {
  30. tl.logger.Debugf("load trace %s", traceID.TraceID)
  31. sql := ""
  32. rows, errQuerySpans := tl.chCli.Query(context.Background(), sql, traceID)
  33. var tb TraceBuilder
  34. ft := tb.buildTrace()
  35. return ft, nil
  36. }
  37. func (tl *TraceLoader) startHandleTraceID(ctx context.Context) {
  38. loop:
  39. for {
  40. select {
  41. case traceId := <-tl.traceIdChan:
  42. if ft, errHandleTrace := tl.loadTrace(traceId); errHandleTrace != nil {
  43. tl.logger.Errorf("load trace %s failed:%v", traceId, errHandleTrace)
  44. } else {
  45. if errHandle := tl.handleFlatTrace(ft); errHandle != nil {
  46. tl.logger.Errorf("handle trace %s failed:%v", traceId, errHandle)
  47. }
  48. }
  49. case <-ctx.Done():
  50. tl.logger.Infof("stop sig received, quit")
  51. break loop
  52. }
  53. }
  54. }
  55. func (tl *TraceLoader) handleFlatTrace(ft *mpb.FlatTrace) error {
  56. // todo metric & backpressure
  57. tl.flatCh <- ft
  58. return nil
  59. }
  // TraceBuilder produces FlatTrace values through buildTrace.
  // It carries no state, so the zero value is ready to use.
  type TraceBuilder struct{}
  62. func (tb *TraceBuilder) buildTrace() *mpb.FlatTrace {
  63. span := ptrace.Span{
  64. TraceId: nil,
  65. SpanId: nil,
  66. TraceState: "",
  67. ParentSpanId: nil,
  68. Flags: 0,
  69. Name: "",
  70. Kind: 0,
  71. StartTimeUnixNano: 0,
  72. EndTimeUnixNano: 0,
  73. Attributes: nil,
  74. DroppedAttributesCount: 0,
  75. Events: nil,
  76. DroppedEventsCount: 0,
  77. Links: nil,
  78. DroppedLinksCount: 0,
  79. Status: nil,
  80. }
  81. scope := pcommon.InstrumentationScope{
  82. Name: "scopeName",
  83. Version: "scopeVersion",
  84. DroppedAttributesCount: 0,
  85. Attributes: make([]*pcommon.KeyValue, 0),
  86. }
  87. resource := presource.Resource{Attributes: make([]*pcommon.KeyValue, 0)}
  88. flatSpan := mpb.FlatSpan{
  89. Span: &span,
  90. Scope: &scope,
  91. Resource: &resource,
  92. }
  93. traceID := "traceID"
  94. traceIDBytes, _ := hex.DecodeString(traceID)
  95. tr := mpb.FlatTrace{
  96. TraceId: traceIDBytes,
  97. FlatSpans: []*mpb.FlatSpan{&flatSpan},
  98. }
  99. return &tr
  100. }