123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146 |
- package main
- import (
- "context"
- "github.com/prometheus/client_golang/prometheus"
- "github.com/sirupsen/logrus"
- "sync"
- "time"
- )
- type PushTask struct {
- runInterval time.Duration
- logger *logrus.Entry
- chTopoData chan TopoData
- chMetric chan ServiceMetricData
- serviceName2Endpoints map[string][]string
- agg *Aggregator
- pushLatencyDuration time.Duration
- taskRunCounter prometheus.Counter
- topoChanWait prometheus.Counter
- metricChanWait prometheus.Counter
- debugQueryDuration time.Duration
- includeApps []string
- }
- func NewPushTask(pushTaskCfg PushTaskConfig, logger *logrus.Logger, chTopoData chan TopoData, ch chan ServiceMetricData,
- includeApps []string, agg *Aggregator, reg *prometheus.Registry) *PushTask {
- log := logger.WithField("name", "PushTask")
- sn2Ep := make(map[string][]string)
- taskRunCounter := prometheus.NewCounter(prometheus.CounterOpts{
- Name: "i6000pusherPushTaskTaskRunCounter",
- Help: "Counter of push task run events",
- })
- topoChanWait := prometheus.NewCounter(prometheus.CounterOpts{
- Name: "i6000pusherPushTaskTopoChanWait",
- Help: "写入topo channel等待的毫秒数",
- })
- metricChanWait := prometheus.NewCounter(prometheus.CounterOpts{
- Name: "i6000pusherPushTaskMetricChanWait",
- Help: "写入metric channel等待的秒数",
- })
- reg.MustRegister(taskRunCounter)
- reg.MustRegister(topoChanWait)
- reg.MustRegister(metricChanWait)
- return &PushTask{runInterval: pushTaskCfg.RunInterval(),
- pushLatencyDuration: pushTaskCfg.LatencyDuration(),
- debugQueryDuration: pushTaskCfg.GetDebugQueryDuration(),
- agg: agg,
- logger: log,
- chTopoData: chTopoData,
- chMetric: ch,
- serviceName2Endpoints: sn2Ep,
- taskRunCounter: taskRunCounter,
- topoChanWait: topoChanWait,
- metricChanWait: metricChanWait,
- includeApps: includeApps,
- }
- }
- func (pt *PushTask) Start(ctx context.Context) {
- lastEndTime := time.Now().Add(-pt.pushLatencyDuration)
- tk := time.NewTicker(pt.runInterval)
- forLoop:
- for {
- select {
- case <-ctx.Done():
- pt.logger.Infof("context done, quit")
- break forLoop
- case <-tk.C:
- du := pt.runInterval
- if pt.debugQueryDuration > 0 {
- du = pt.debugQueryDuration
- }
- thisTimeEnd := lastEndTime.Add(du)
- go pt.runTask(lastEndTime, thisTimeEnd)
- lastEndTime = thisTimeEnd
- }
- }
- }
- func (pt *PushTask) sendTraceData(appAlias string, start, end time.Time) error {
- traceTopoDataList, errLoadBranch := pt.agg.LoadTraceData(start, end, appAlias)
- if errLoadBranch != nil {
- pt.logger.WithError(errLoadBranch).Errorf("load topo data for appAlias: %s failed", appAlias)
- return errLoadBranch
- }
- writeBefore := time.Now()
- for _, topoData := range traceTopoDataList {
- pt.chTopoData <- topoData
- }
- pt.topoChanWait.Add(float64(time.Now().Sub(writeBefore).Milliseconds()))
- return nil
- }
- func (pt *PushTask) sendMetricsData(appAlias string, start, end time.Time) error {
- sds, errLoadServiceMetric := pt.agg.LoadMetricData(start, end, appAlias)
- if errLoadServiceMetric != nil {
- pt.logger.WithError(errLoadServiceMetric).Errorf("load metric data for appAlias:%s failed", appAlias)
- return errLoadServiceMetric
- }
- metricWriteBefore := time.Now()
- for _, sd := range sds {
- pt.chMetric <- sd
- pt.logger.WithField("metricData", sd).Debugf("pushMetricDatas")
- }
- pt.metricChanWait.Add(float64(time.Now().Sub(metricWriteBefore).Milliseconds()))
- return nil
- }
- func (pt *PushTask) runTask(start time.Time, end time.Time) {
- pt.logger.
- WithField("start", start).
- WithField("end", end).
- Infof("run task begin")
- pt.taskRunCounter.Inc()
- pt.logger.Infof("start run task: time range:[%s, %s]", start.Format("15:04:05"), end.Format("15:04:05"))
- runBefore := time.Now()
- var wg sync.WaitGroup
- for _, appAlias := range pt.includeApps {
- aa := appAlias
- wg.Add(1)
- go func() {
- defer wg.Done()
- if err := pt.sendTraceData(aa, start, end); err != nil {
- pt.logger.WithError(err).WithField("appAlias", aa).Errorf("send trace data failed")
- }
- }()
- }
- for _, appAlias := range pt.includeApps {
- aa := appAlias
- wg.Add(1)
- go func() {
- defer wg.Done()
- if err := pt.sendMetricsData(aa, start, end); err != nil {
- pt.logger.WithError(err).WithField("appAlias", aa).Errorf("send metric data failed")
- }
- }()
- }
- wg.Wait()
- pt.logger.
- WithField("start", start).
- WithField("costSeconds", time.Now().Sub(runBefore).Seconds()).
- WithField("end", end).
- Infof("run task done")
- }
|