package main import ( "context" "github.com/ClickHouse/clickhouse-go/v2" "github.com/pkg/errors" "github.com/sirupsen/logrus" ) type timeRangedTraceID struct { TimeRange TraceID string } type TraceIDDispatcher struct { timeRangeCh chan TimeRange traceIDCh chan timeRangedTraceID log *logrus.Entry chCli clickhouse.Conn } func NewTraceIDDispatcher(timeRangeCh chan TimeRange, traceIDCh chan timeRangedTraceID, log *logrus.Logger, chCli clickhouse.Conn) *TraceIDDispatcher { logger := log.WithField("name", "TraceIDDispatcher") return &TraceIDDispatcher{timeRangeCh: timeRangeCh, traceIDCh: traceIDCh, log: logger, chCli: chCli} } func (td *TraceIDDispatcher) start(parentCtx context.Context) { go td.consume(parentCtx) td.log.Infof("started") } func (td *TraceIDDispatcher) consume(parentCtx context.Context) { loop: for { select { case <-parentCtx.Done(): td.log.Infof("stopped") break loop case tr := <-td.timeRangeCh: err := td.handleTimeRange(tr) if err != nil { td.log.Errorf("error handling time range:%s, %v", tr.SimpleString(), err) return } } } } func (td *TraceIDDispatcher) handleTimeRange(tr TimeRange) error { sql := "select distinct TraceId from otel.otel_traces where Timestamp > ? and Timestamp < ?" rows, errQuery := td.chCli.Query(context.Background(), sql, tr.begin, tr.end) if errQuery != nil { return errors.WithMessagef(errQuery, "query with sql:%s", sql) } for rows.Next() { var traceID string if errScan := rows.Scan(&traceID); errScan != nil { return errors.WithMessage(errScan, "scan to traceId") } td.log.Debugf("trace id:%v", traceID) td.traceIDCh <- timeRangedTraceID{TimeRange: tr, TraceID: traceID} } return nil }