trace_loader.go 3.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. package main
  2. import (
  3. "context"
  4. "encoding/hex"
  5. "github.com/ClickHouse/clickhouse-go/v2"
  6. "github.com/pkg/errors"
  7. "github.com/sirupsen/logrus"
  8. )
  9. type TraceLoader struct {
  10. chCli clickhouse.Conn
  11. traceIdChan chan timeRangedTraceID
  12. flatCh chan *TopoData
  13. logger *logrus.Entry
  14. parallelism int
  15. }
  16. func NewTraceLoader(chCli clickhouse.Conn, traceIdChan chan timeRangedTraceID, flatCh chan *TopoData, parallelism int, logger *logrus.Logger) *TraceLoader {
  17. log := logger.WithField("name", "TraceLoader")
  18. return &TraceLoader{chCli: chCli, traceIdChan: traceIdChan, flatCh: flatCh, parallelism: parallelism, logger: log}
  19. }
  20. func (tl *TraceLoader) start(parentCtx context.Context) {
  21. for i := 0; i < tl.parallelism; i++ {
  22. go tl.startHandleTraceID(parentCtx)
  23. }
  24. tl.logger.Infof("started, parallelism: %d", tl.parallelism)
  25. }
  26. func (tl *TraceLoader) loadTrace(rangedTraceID timeRangedTraceID) (*TopoData, error) {
  27. tl.logger.Debugf("load trace %s", rangedTraceID.TraceID)
  28. sql := " select Timestamp, TraceId, SpanId, ParentSpanId, TraceState, SpanName, SpanKindNumber, ServiceName, " +
  29. "ResourceAttributes, ScopeName, ScopeVersion, SpanAttributes, Duration, StatusCodeNumber, StatusMessage, " +
  30. "`Events.Timestamp`, `Events.Name`, `Events.Attributes`, `Links.TraceId`, `Links.SpanId`, `Links.TraceState`, " +
  31. "`Links.Attributes`, HttpCode, HttpMethod, HttpURL, ContainerId, srcIP, srcPort, targetIP, targetPort, " +
  32. "RPCType, RPCName, RPCRequest, RPCResult, FuncNameSpace, FuncName, FuncLineNO, FuncResult, dbStatement, " +
  33. "dbConnectionString, `Exceptions.type`, `Exceptions.message`, `Exceptions.stacktrace`, AppAlias " +
  34. "from otel.otel_traces where Timestamp < ? and Timestamp > ? and TraceId=?"
  35. rows, errQuerySpans := tl.chCli.Query(context.Background(), sql, rangedTraceID.end, rangedTraceID.begin, rangedTraceID.TraceID)
  36. if errQuerySpans != nil {
  37. return nil, errors.WithMessagef(errQuerySpans, "query sql %s", sql)
  38. }
  39. chSpans := make([]ChSpan, 0, 100)
  40. for rows.Next() {
  41. var span ChSpan
  42. if errScan := rows.ScanStruct(&span); errScan != nil {
  43. return nil, errors.WithMessagef(errScan, "scan to ChSpan")
  44. }
  45. chSpans = append(chSpans, span)
  46. }
  47. ft := tl.buildTrace(rangedTraceID.TraceID, chSpans)
  48. return ft, nil
  49. }
  50. func (tl *TraceLoader) startHandleTraceID(ctx context.Context) {
  51. loop:
  52. for {
  53. select {
  54. case traceId := <-tl.traceIdChan:
  55. if ft, errHandleTrace := tl.loadTrace(traceId); errHandleTrace != nil {
  56. tl.logger.Errorf("load trace %v failed:%v", traceId, errHandleTrace)
  57. } else {
  58. if errHandle := tl.handleFlatTrace(ft); errHandle != nil {
  59. tl.logger.Errorf("handle trace %v failed:%v", traceId, errHandle)
  60. }
  61. }
  62. case <-ctx.Done():
  63. tl.logger.Infof("stop sig received, quit")
  64. break loop
  65. }
  66. }
  67. }
  68. func (tl *TraceLoader) handleFlatTrace(ft *TopoData) error {
  69. // todo metric & backpressure
  70. tl.flatCh <- ft
  71. return nil
  72. }
  73. func (tl *TraceLoader) buildTrace(traceID string, chSpans []ChSpan) *TopoData {
  74. return &TopoData{}
  75. }
  76. func hexIDStr2Bytes(idStr string) ([]byte, error) {
  77. if idStr == "" {
  78. return nil, nil
  79. }
  80. bs, err := hex.DecodeString(idStr)
  81. if err != nil {
  82. return nil, errors.WithMessagef(err, "hex decode:[%s]", idStr)
  83. }
  84. return bs, nil
  85. }