1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495 |
- package main
- import (
- "context"
- "encoding/hex"
- "github.com/ClickHouse/clickhouse-go/v2"
- "github.com/pkg/errors"
- "github.com/sirupsen/logrus"
- )
- type TraceLoader struct {
- chCli clickhouse.Conn
- traceIdChan chan timeRangedTraceID
- flatCh chan *TopoData
- logger *logrus.Entry
- parallelism int
- }
- func NewTraceLoader(chCli clickhouse.Conn, traceIdChan chan timeRangedTraceID, flatCh chan *TopoData, parallelism int, logger *logrus.Logger) *TraceLoader {
- log := logger.WithField("name", "TraceLoader")
- return &TraceLoader{chCli: chCli, traceIdChan: traceIdChan, flatCh: flatCh, parallelism: parallelism, logger: log}
- }
- func (tl *TraceLoader) start(parentCtx context.Context) {
- for i := 0; i < tl.parallelism; i++ {
- go tl.startHandleTraceID(parentCtx)
- }
- tl.logger.Infof("started, parallelism: %d", tl.parallelism)
- }
- func (tl *TraceLoader) loadTrace(rangedTraceID timeRangedTraceID) (*TopoData, error) {
- tl.logger.Debugf("load trace %s", rangedTraceID.TraceID)
- sql := " select Timestamp, TraceId, SpanId, ParentSpanId, TraceState, SpanName, SpanKindNumber, ServiceName, " +
- "ResourceAttributes, ScopeName, ScopeVersion, SpanAttributes, Duration, StatusCodeNumber, StatusMessage, " +
- "`Events.Timestamp`, `Events.Name`, `Events.Attributes`, `Links.TraceId`, `Links.SpanId`, `Links.TraceState`, " +
- "`Links.Attributes`, HttpCode, HttpMethod, HttpURL, ContainerId, srcIP, srcPort, targetIP, targetPort, " +
- "RPCType, RPCName, RPCRequest, RPCResult, FuncNameSpace, FuncName, FuncLineNO, FuncResult, dbStatement, " +
- "dbConnectionString, `Exceptions.type`, `Exceptions.message`, `Exceptions.stacktrace`, AppAlias " +
- "from otel.otel_traces where Timestamp < ? and Timestamp > ? and TraceId=?"
- rows, errQuerySpans := tl.chCli.Query(context.Background(), sql, rangedTraceID.end, rangedTraceID.begin, rangedTraceID.TraceID)
- if errQuerySpans != nil {
- return nil, errors.WithMessagef(errQuerySpans, "query sql %s", sql)
- }
- chSpans := make([]ChSpan, 0, 100)
- for rows.Next() {
- var span ChSpan
- if errScan := rows.ScanStruct(&span); errScan != nil {
- return nil, errors.WithMessagef(errScan, "scan to ChSpan")
- }
- chSpans = append(chSpans, span)
- }
- ft := tl.buildTrace(rangedTraceID.TraceID, chSpans)
- return ft, nil
- }
- func (tl *TraceLoader) startHandleTraceID(ctx context.Context) {
- loop:
- for {
- select {
- case traceId := <-tl.traceIdChan:
- if ft, errHandleTrace := tl.loadTrace(traceId); errHandleTrace != nil {
- tl.logger.Errorf("load trace %v failed:%v", traceId, errHandleTrace)
- } else {
- if errHandle := tl.handleFlatTrace(ft); errHandle != nil {
- tl.logger.Errorf("handle trace %v failed:%v", traceId, errHandle)
- }
- }
- case <-ctx.Done():
- tl.logger.Infof("stop sig received, quit")
- break loop
- }
- }
- }
- func (tl *TraceLoader) handleFlatTrace(ft *TopoData) error {
- // todo metric & backpressure
- tl.flatCh <- ft
- return nil
- }
- func (tl *TraceLoader) buildTrace(traceID string, chSpans []ChSpan) *TopoData {
- return &TopoData{}
- }
- func hexIDStr2Bytes(idStr string) ([]byte, error) {
- if idStr == "" {
- return nil, nil
- }
- bs, err := hex.DecodeString(idStr)
- if err != nil {
- return nil, errors.WithMessagef(err, "hex decode:[%s]", idStr)
- }
- return bs, nil
- }
|