pushtask.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
  1. package main
  2. import (
  3. "context"
  4. "github.com/prometheus/client_golang/prometheus"
  5. "github.com/sirupsen/logrus"
  6. "sync"
  7. "time"
  8. )
  9. type PushTask struct {
  10. runInterval time.Duration
  11. logger *logrus.Entry
  12. chTopoData chan TopoData
  13. chMetric chan ServiceMetricData
  14. serviceName2Endpoints map[string][]string
  15. agg *Aggregator
  16. pushLatencyDuration time.Duration
  17. taskRunCounter prometheus.Counter
  18. topoChanWait prometheus.Counter
  19. metricChanWait prometheus.Counter
  20. debugQueryDuration time.Duration
  21. includeApps []string
  22. }
  23. func NewPushTask(pushTaskCfg PushTaskConfig, logger *logrus.Logger, chTopoData chan TopoData, ch chan ServiceMetricData,
  24. includeApps []string, agg *Aggregator, reg *prometheus.Registry) *PushTask {
  25. log := logger.WithField("name", "PushTask")
  26. sn2Ep := make(map[string][]string)
  27. taskRunCounter := prometheus.NewCounter(prometheus.CounterOpts{
  28. Name: "i6000pusherPushTaskTaskRunCounter",
  29. Help: "Counter of push task run events",
  30. })
  31. topoChanWait := prometheus.NewCounter(prometheus.CounterOpts{
  32. Name: "i6000pusherPushTaskTopoChanWait",
  33. Help: "写入topo channel等待的毫秒数",
  34. })
  35. metricChanWait := prometheus.NewCounter(prometheus.CounterOpts{
  36. Name: "i6000pusherPushTaskMetricChanWait",
  37. Help: "写入metric channel等待的秒数",
  38. })
  39. reg.MustRegister(taskRunCounter)
  40. reg.MustRegister(topoChanWait)
  41. reg.MustRegister(metricChanWait)
  42. return &PushTask{runInterval: pushTaskCfg.RunInterval(),
  43. pushLatencyDuration: pushTaskCfg.LatencyDuration(),
  44. debugQueryDuration: pushTaskCfg.GetDebugQueryDuration(),
  45. agg: agg,
  46. logger: log,
  47. chTopoData: chTopoData,
  48. chMetric: ch,
  49. serviceName2Endpoints: sn2Ep,
  50. taskRunCounter: taskRunCounter,
  51. topoChanWait: topoChanWait,
  52. metricChanWait: metricChanWait,
  53. includeApps: includeApps,
  54. }
  55. }
  56. func (pt *PushTask) Start(ctx context.Context) {
  57. lastEndTime := time.Now().Add(-pt.pushLatencyDuration)
  58. tk := time.NewTicker(pt.runInterval)
  59. forLoop:
  60. for {
  61. select {
  62. case <-ctx.Done():
  63. pt.logger.Infof("context done, quit")
  64. break forLoop
  65. case <-tk.C:
  66. du := pt.runInterval
  67. if pt.debugQueryDuration > 0 {
  68. du = pt.debugQueryDuration
  69. }
  70. thisTimeEnd := lastEndTime.Add(du)
  71. go pt.runTask(lastEndTime, thisTimeEnd)
  72. lastEndTime = thisTimeEnd
  73. }
  74. }
  75. }
  76. func (pt *PushTask) sendTraceData(appAlias string, start, end time.Time) error {
  77. traceTopoDataList, errLoadBranch := pt.agg.LoadTraceData(start, end, appAlias)
  78. if errLoadBranch != nil {
  79. pt.logger.WithError(errLoadBranch).Errorf("load topo data for appAlias: %s failed", appAlias)
  80. return errLoadBranch
  81. }
  82. writeBefore := time.Now()
  83. for _, topoData := range traceTopoDataList {
  84. pt.chTopoData <- topoData
  85. }
  86. pt.topoChanWait.Add(float64(time.Now().Sub(writeBefore).Milliseconds()))
  87. return nil
  88. }
  89. func (pt *PushTask) sendMetricsData(appAlias string, start, end time.Time) error {
  90. sds, errLoadServiceMetric := pt.agg.LoadMetricData(start, end, appAlias)
  91. if errLoadServiceMetric != nil {
  92. pt.logger.WithError(errLoadServiceMetric).Errorf("load metric data for appAlias:%s failed", appAlias)
  93. return errLoadServiceMetric
  94. }
  95. metricWriteBefore := time.Now()
  96. for _, sd := range sds {
  97. pt.chMetric <- sd
  98. pt.logger.WithField("metricData", sd).Debugf("pushMetricDatas")
  99. }
  100. pt.metricChanWait.Add(float64(time.Now().Sub(metricWriteBefore).Milliseconds()))
  101. return nil
  102. }
  103. func (pt *PushTask) runTask(start time.Time, end time.Time) {
  104. pt.logger.
  105. WithField("start", start).
  106. WithField("end", end).
  107. Infof("run task begin")
  108. pt.taskRunCounter.Inc()
  109. pt.logger.Infof("start run task: time range:[%s, %s]", start.Format("15:04:05"), end.Format("15:04:05"))
  110. runBefore := time.Now()
  111. var wg sync.WaitGroup
  112. for _, appAlias := range pt.includeApps {
  113. aa := appAlias
  114. wg.Add(1)
  115. go func() {
  116. defer wg.Done()
  117. if err := pt.sendTraceData(aa, start, end); err != nil {
  118. pt.logger.WithError(err).WithField("appAlias", aa).Errorf("send trace data failed")
  119. }
  120. }()
  121. }
  122. for _, appAlias := range pt.includeApps {
  123. aa := appAlias
  124. wg.Add(1)
  125. go func() {
  126. defer wg.Done()
  127. if err := pt.sendMetricsData(aa, start, end); err != nil {
  128. pt.logger.WithError(err).WithField("appAlias", aa).Errorf("send metric data failed")
  129. }
  130. }()
  131. }
  132. wg.Wait()
  133. pt.logger.
  134. WithField("start", start).
  135. WithField("costSeconds", time.Now().Sub(runBefore).Seconds()).
  136. WithField("end", end).
  137. Infof("run task done")
  138. }