traceiddispatcher.go 1.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263
  1. package main
  2. import (
  3. "context"
  4. "github.com/ClickHouse/clickhouse-go/v2"
  5. "github.com/pkg/errors"
  6. "github.com/sirupsen/logrus"
  7. )
  8. type timeRangedTraceID struct {
  9. TimeRange
  10. TraceID string
  11. }
  12. type TraceIDDispatcher struct {
  13. timeRangeCh chan TimeRange
  14. traceIDCh chan timeRangedTraceID
  15. log *logrus.Entry
  16. chCli clickhouse.Conn
  17. }
  18. func NewTraceIDDispatcher(timeRangeCh chan TimeRange, traceIDCh chan timeRangedTraceID, log *logrus.Logger, chCli clickhouse.Conn) *TraceIDDispatcher {
  19. logger := log.WithField("name", "TraceIDDispatcher")
  20. return &TraceIDDispatcher{timeRangeCh: timeRangeCh, traceIDCh: traceIDCh, log: logger, chCli: chCli}
  21. }
  22. func (td *TraceIDDispatcher) start(parentCtx context.Context) {
  23. go td.consume(parentCtx)
  24. td.log.Infof("started")
  25. }
  26. func (td *TraceIDDispatcher) consume(parentCtx context.Context) {
  27. loop:
  28. for {
  29. select {
  30. case <-parentCtx.Done():
  31. td.log.Infof("stopped")
  32. break loop
  33. case tr := <-td.timeRangeCh:
  34. err := td.handleTimeRange(tr)
  35. if err != nil {
  36. td.log.Errorf("error handling time range:%s, %v", tr.SimpleString(), err)
  37. return
  38. }
  39. }
  40. }
  41. }
  42. func (td *TraceIDDispatcher) handleTimeRange(tr TimeRange) error {
  43. sql := "select distinct TraceId from otel.otel_traces where Timestamp > ? and Timestamp < ?"
  44. rows, errQuery := td.chCli.Query(context.Background(), sql, tr.begin, tr.end)
  45. if errQuery != nil {
  46. return errors.WithMessagef(errQuery, "query with sql:%s", sql)
  47. }
  48. for rows.Next() {
  49. var traceID string
  50. if errScan := rows.Scan(&traceID); errScan != nil {
  51. return errors.WithMessage(errScan, "scan to traceId")
  52. }
  53. td.log.Debugf("trace id:%v", traceID)
  54. td.traceIDCh <- timeRangedTraceID{TimeRange: tr, TraceID: traceID}
  55. }
  56. return nil
  57. }