123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263 |
- package main
- import (
- "context"
- "github.com/ClickHouse/clickhouse-go/v2"
- "github.com/pkg/errors"
- "github.com/sirupsen/logrus"
- )
- type timeRangedTraceID struct {
- TimeRange
- TraceID string
- }
- type TraceIDDispatcher struct {
- timeRangeCh chan TimeRange
- traceIDCh chan timeRangedTraceID
- log *logrus.Entry
- chCli clickhouse.Conn
- }
- func NewTraceIDDispatcher(timeRangeCh chan TimeRange, traceIDCh chan timeRangedTraceID, log *logrus.Logger, chCli clickhouse.Conn) *TraceIDDispatcher {
- logger := log.WithField("name", "TraceIDDispatcher")
- return &TraceIDDispatcher{timeRangeCh: timeRangeCh, traceIDCh: traceIDCh, log: logger, chCli: chCli}
- }
- func (td *TraceIDDispatcher) start(parentCtx context.Context) {
- go td.consume(parentCtx)
- td.log.Infof("started")
- }
- func (td *TraceIDDispatcher) consume(parentCtx context.Context) {
- loop:
- for {
- select {
- case <-parentCtx.Done():
- td.log.Infof("stopped")
- break loop
- case tr := <-td.timeRangeCh:
- err := td.handleTimeRange(tr)
- if err != nil {
- td.log.Errorf("error handling time range:%s, %v", tr.SimpleString(), err)
- return
- }
- }
- }
- }
- func (td *TraceIDDispatcher) handleTimeRange(tr TimeRange) error {
- sql := "select distinct TraceId from otel.otel_traces where Timestamp > ? and Timestamp < ?"
- rows, errQuery := td.chCli.Query(context.Background(), sql, tr.begin, tr.end)
- if errQuery != nil {
- return errors.WithMessagef(errQuery, "query with sql:%s", sql)
- }
- for rows.Next() {
- var traceID string
- if errScan := rows.Scan(&traceID); errScan != nil {
- return errors.WithMessage(errScan, "scan to traceId")
- }
- td.log.Debugf("trace id:%v", traceID)
- td.traceIDCh <- timeRangedTraceID{TimeRange: tr, TraceID: traceID}
- }
- return nil
- }
|