package main import ( "context" "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" "sync" "time" ) type PushTask struct { runInterval time.Duration logger *logrus.Entry chTopoData chan TopoData chMetric chan ServiceMetricData serviceName2Endpoints map[string][]string agg *Aggregator pushLatencyDuration time.Duration taskRunCounter prometheus.Counter topoChanWait prometheus.Counter metricChanWait prometheus.Counter debugQueryDuration time.Duration includeApps []string } func NewPushTask(pushTaskCfg PushTaskConfig, logger *logrus.Logger, chTopoData chan TopoData, ch chan ServiceMetricData, includeApps []string, agg *Aggregator, reg *prometheus.Registry) *PushTask { log := logger.WithField("name", "PushTask") sn2Ep := make(map[string][]string) taskRunCounter := prometheus.NewCounter(prometheus.CounterOpts{ Name: "i6000pusherPushTaskTaskRunCounter", Help: "Counter of push task run events", }) topoChanWait := prometheus.NewCounter(prometheus.CounterOpts{ Name: "i6000pusherPushTaskTopoChanWait", Help: "写入topo channel等待的毫秒数", }) metricChanWait := prometheus.NewCounter(prometheus.CounterOpts{ Name: "i6000pusherPushTaskMetricChanWait", Help: "写入metric channel等待的秒数", }) reg.MustRegister(taskRunCounter) reg.MustRegister(topoChanWait) reg.MustRegister(metricChanWait) return &PushTask{runInterval: pushTaskCfg.RunInterval(), pushLatencyDuration: pushTaskCfg.LatencyDuration(), debugQueryDuration: pushTaskCfg.GetDebugQueryDuration(), agg: agg, logger: log, chTopoData: chTopoData, chMetric: ch, serviceName2Endpoints: sn2Ep, taskRunCounter: taskRunCounter, topoChanWait: topoChanWait, metricChanWait: metricChanWait, includeApps: includeApps, } } func (pt *PushTask) Start(ctx context.Context) { lastEndTime := time.Now().Add(-pt.pushLatencyDuration) tk := time.NewTicker(pt.runInterval) forLoop: for { select { case <-ctx.Done(): pt.logger.Infof("context done, quit") break forLoop case <-tk.C: du := pt.runInterval if pt.debugQueryDuration > 0 { du = pt.debugQueryDuration } thisTimeEnd := lastEndTime.Add(du) go pt.runTask(lastEndTime, thisTimeEnd) lastEndTime = thisTimeEnd } } } func (pt *PushTask) sendTraceData(appAlias string, start, end time.Time) error { traceTopoDataList, errLoadBranch := pt.agg.LoadTraceData(start, end, appAlias) if errLoadBranch != nil { pt.logger.WithError(errLoadBranch).Errorf("load topo data for appAlias: %s failed", appAlias) return errLoadBranch } writeBefore := time.Now() for _, topoData := range traceTopoDataList { pt.chTopoData <- topoData } pt.topoChanWait.Add(float64(time.Now().Sub(writeBefore).Milliseconds())) return nil } func (pt *PushTask) sendMetricsData(appAlias string, start, end time.Time) error { sds, errLoadServiceMetric := pt.agg.LoadMetricData(start, end, appAlias) if errLoadServiceMetric != nil { pt.logger.WithError(errLoadServiceMetric).Errorf("load metric data for appAlias:%s failed", appAlias) return errLoadServiceMetric } metricWriteBefore := time.Now() for _, sd := range sds { pt.chMetric <- sd pt.logger.WithField("metricData", sd).Debugf("pushMetricDatas") } pt.metricChanWait.Add(float64(time.Now().Sub(metricWriteBefore).Milliseconds())) return nil } func (pt *PushTask) runTask(start time.Time, end time.Time) { pt.logger. WithField("start", start). WithField("end", end). Infof("run task begin") pt.taskRunCounter.Inc() pt.logger.Infof("start run task: time range:[%s, %s]", start.Format("15:04:05"), end.Format("15:04:05")) runBefore := time.Now() var wg sync.WaitGroup for _, appAlias := range pt.includeApps { aa := appAlias wg.Add(1) go func() { defer wg.Done() if err := pt.sendTraceData(aa, start, end); err != nil { pt.logger.WithError(err).WithField("appAlias", aa).Errorf("send trace data failed") } }() } for _, appAlias := range pt.includeApps { aa := appAlias wg.Add(1) go func() { defer wg.Done() if err := pt.sendMetricsData(aa, start, end); err != nil { pt.logger.WithError(err).WithField("appAlias", aa).Errorf("send metric data failed") } }() } wg.Wait() pt.logger. WithField("start", start). WithField("costSeconds", time.Now().Sub(runBefore).Seconds()). WithField("end", end). Infof("run task done") }