liubing 6 days ago
parent
commit
b823f3daa5

+ 4 - 4
cmd/i6000pusher/aggregator.go

@@ -100,7 +100,7 @@ WITH '\\b[a-fA-F0-9]{32,}\\b|\\b[a-fA-F0-9\\-]{36}\\b|\\b\\[1-9]\\d*\\b' AS varP
 		var apdexGoodRequestCount uint64
 		var apdexFairRequestCount uint64
 		if errScan := spanRows.Scan(&serviceName, &instance, &em.Endpoint, &failedCount, &requestCount,
-			&em.Metrics.responseTimeSum, &exceptionCount, &apdexGoodRequestCount,
+			&em.Metrics.responseTimeSumNS, &exceptionCount, &apdexGoodRequestCount,
 			&apdexFairRequestCount); errScan != nil {
 			a.logger.WithError(errScan).Error("failed to scan metrics")
 			a.sqlQueryErrorCounter.Inc()
@@ -117,7 +117,7 @@ WITH '\\b[a-fA-F0-9]{32,}\\b|\\b[a-fA-F0-9\\-]{36}\\b|\\b\\[1-9]\\d*\\b' AS varP
 			"instance":       instance,
 			"startTime":      start,
 			"endTime":        end,
-		}).Infof("Query from clickhouse")
+		}).Debugf("Query metric from clickhouse")
 		a.endpointToLoadCounter.With(
 			prometheus.Labels{
 				"service":  serviceName,
@@ -167,7 +167,7 @@ func (a *Aggregator) LoadTraceData(start, end time.Time, appAlias string) ([]Top
     select distinct TraceId from otel.otel_traces_url where 
 		Timestamp > ? and Timestamp < ? and 
 		AppAlias = ? and Route is not null and 
-		Route != '' limit 50
+		Route != ''
 		`
 	traceRows, errQueryTraceId := a.chCli.Query(ctx, traceIdSql, start, end, appAlias,
 		start, end, appAlias)
@@ -190,7 +190,7 @@ func (a *Aggregator) LoadTraceData(start, end time.Time, appAlias string) ([]Top
 		return []TopoData{}, nil
 	}
 	topoData := make([]TopoData, 0, len(traceIds)*10)
-	traceBatchSize := 30
+	traceBatchSize := 50
 	timePadding := 10 * time.Minute
 	for i := 0; i < len(traceIds); i += traceBatchSize {
 		upperBound := i + traceBatchSize

+ 26 - 7
cmd/i6000pusher/chspan.go

@@ -2,6 +2,7 @@ package main
 
 import (
 	"fmt"
+	"strings"
 	"time"
 )
 
@@ -33,7 +34,7 @@ type MetricsMeters struct {
 	failedCount           int64
 	exceptionCount        int64
 	aggDuration           time.Duration
-	responseTimeSum       int64
+	responseTimeSumNS     int64
 	apdexGoodRequestCount int64
 	apdexFairRequestCount int64
 }
@@ -58,11 +59,11 @@ func (m MetricsMeters) GetExceptionCount() int64 {
 	return m.exceptionCount
 }
 
-func (m MetricsMeters) GetAvgResponseTime() int64 {
+func (m MetricsMeters) GetAvgResponseTimeMS() int64 {
 	if m.requestCount == 0 {
 		return 0
 	}
-	return m.responseTimeSum / m.requestCount
+	return (m.responseTimeSumNS / m.requestCount) / 1e6
 }
 
 func (m MetricsMeters) GetApdex() float64 {
@@ -127,14 +128,14 @@ func (m ServiceInstanceMetric) GetExceptionCount() int64 {
 	return cc
 }
 
-func (m ServiceInstanceMetric) GetAvgResponseTime() int64 {
+func (m ServiceInstanceMetric) GetAvgResponseTimeMS() int64 {
 	if len(m.EndpointMetrics) == 0 {
 		return 0
 	}
 	sum := int64(0)
 	count := int64(len(m.EndpointMetrics))
 	for _, ep := range m.EndpointMetrics {
-		sum += ep.Metrics.GetAvgResponseTime()
+		sum += ep.Metrics.GetAvgResponseTimeMS()
 	}
 	return sum / count
 }
@@ -193,14 +194,14 @@ func (m ServiceMetricData) GetExceptionCount() int64 {
 	return cc
 }
 
-func (m ServiceMetricData) GetAvgResponseTime() int64 {
+func (m ServiceMetricData) GetAvgResponseTimeMS() int64 {
 	if len(m.InstanceMetrics) == 0 {
 		return 0
 	}
 	sum := int64(0)
 	count := int64(len(m.InstanceMetrics))
 	for _, ep := range m.InstanceMetrics {
-		sum += ep.GetAvgResponseTime()
+		sum += ep.GetAvgResponseTimeMS()
 	}
 	return sum / count
 }
@@ -238,3 +239,21 @@ func (m *ServiceMetricData) AddMetric(instance string, em ServiceInstanceEndpoin
 		EndpointMetrics: []ServiceInstanceEndpointMetrics{em},
 	})
 }
+
+// GetEndPointsStr returns the distinct endpoint names found across all
+// instance metrics, in first-seen order, joined by commas. The result is
+// the empty string when no endpoint metrics are present.
+func (m *ServiceMetricData) GetEndPointsStr() string {
+	seen := make(map[string]struct{})
+	var names []string
+	for _, im := range m.InstanceMetrics {
+		for _, em := range im.EndpointMetrics {
+			if _, dup := seen[em.Endpoint]; dup {
+				continue
+			}
+			seen[em.Endpoint] = struct{}{}
+			names = append(names, em.Endpoint)
+		}
+	}
+	return strings.Join(names, ",")
+}

+ 60 - 28
cmd/i6000pusher/pushtask.go

@@ -4,6 +4,7 @@ import (
 	"context"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/sirupsen/logrus"
+	"sync"
 	"time"
 )
 
@@ -66,49 +67,80 @@ forLoop:
 			pt.logger.Infof("context done, quit")
 			break forLoop
 		case <-tk.C:
-			lastRun := time.Now()
 			du := pt.runInterval
 			if pt.debugQueryDuration > 0 {
 				du = pt.debugQueryDuration
 			}
 			thisTimeEnd := lastEndTime.Add(du)
-			if errRun := pt.runTask(lastEndTime, thisTimeEnd); errRun != nil {
-				pt.logger.WithTime(lastRun).WithError(errRun).Error("run task failed")
-				continue
-			}
+			go pt.runTask(lastEndTime, thisTimeEnd)
 			lastEndTime = thisTimeEnd
 		}
 	}
 
 }
 
-func (pt *PushTask) runTask(start time.Time, end time.Time) error {
+// sendTraceData loads appAlias's topo data for the start-end window and pushes each item onto chTopoData.
+func (pt *PushTask) sendTraceData(appAlias string, start, end time.Time) error {
+	topoItems, err := pt.agg.LoadTraceData(start, end, appAlias)
+	if err != nil {
+		return err // not logged here: runTask already logs the returned error with appAlias
+	}
+	sendStart := time.Now()
+	for _, item := range topoItems {
+		pt.chTopoData <- item // may block; the wait is accumulated in topoChanWait (ms)
+	}
+	pt.topoChanWait.Add(float64(time.Since(sendStart).Milliseconds()))
+	return nil
+}
+
+// sendMetricsData loads appAlias's service metrics for the start-end window and pushes each onto chMetric.
+func (pt *PushTask) sendMetricsData(appAlias string, start, end time.Time) error {
+	metrics, err := pt.agg.LoadMetricData(start, end, appAlias)
+	if err != nil {
+		return err // not logged here: runTask already logs the returned error with appAlias
+	}
+	sendStart := time.Now()
+	for _, md := range metrics {
+		pt.chMetric <- md // may block; the wait is accumulated in metricChanWait (ms)
+		pt.logger.WithField("metricData", md).Debugf("pushMetricDatas")
+	}
+	pt.metricChanWait.Add(float64(time.Since(sendStart).Milliseconds()))
+	return nil
+}
+
+func (pt *PushTask) runTask(start time.Time, end time.Time) {
+	pt.logger.
+		WithField("start", start).
+		WithField("end", end).
+		Infof("run task begin")
 	pt.taskRunCounter.Inc()
 	pt.logger.Infof("start run task: time range:[%s, %s]", start.Format("15:04:05"), end.Format("15:04:05"))
+	runBefore := time.Now()
+	var wg sync.WaitGroup
 	for _, appAlias := range pt.includeApps {
-		traceTopoDataList, errLoadBranch := pt.agg.LoadTraceData(start, end, appAlias)
-		if errLoadBranch != nil {
-			pt.logger.WithError(errLoadBranch).Errorf("load topo data for appAlias: %s failed", appAlias)
-			continue
-		}
-		writeBefore := time.Now()
-		for _, topoData := range traceTopoDataList {
-			pt.chTopoData <- topoData
-		}
-		pt.topoChanWait.Add(float64(time.Now().Sub(writeBefore).Milliseconds()))
+		aa := appAlias
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			if err := pt.sendTraceData(aa, start, end); err != nil {
+				pt.logger.WithError(err).WithField("appAlias", aa).Errorf("send trace data failed")
+			}
+		}()
 	}
 	for _, appAlias := range pt.includeApps {
-		sds, errLoadServiceMetric := pt.agg.LoadMetricData(start, end, appAlias)
-		if errLoadServiceMetric != nil {
-			pt.logger.WithError(errLoadServiceMetric).Errorf("load metric data for appAlias:%s failed", appAlias)
-			continue
-		}
-		metricWriteBefore := time.Now()
-		for _, sd := range sds {
-			pt.chMetric <- sd
-			pt.logger.WithField("metricData", sd).Infof("pushMetricDatas")
-		}
-		pt.metricChanWait.Add(float64(time.Now().Sub(metricWriteBefore).Milliseconds()))
+		aa := appAlias
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			if err := pt.sendMetricsData(aa, start, end); err != nil {
+				pt.logger.WithError(err).WithField("appAlias", aa).Errorf("send metric data failed")
+			}
+		}()
 	}
-	return nil
+	wg.Wait()
+	pt.logger.
+		WithField("start", start).
+		WithField("costSeconds", time.Now().Sub(runBefore).Seconds()).
+		WithField("end", end).
+		Infof("run task done")
 }

+ 7 - 6
cmd/i6000pusher/trace_kafka_sink.go

@@ -111,7 +111,7 @@ func (s *TraceKafkaSink) parseMetricResources(data ServiceMetricData) *pb.NmsRec
 	serviceCallCount := strconv.FormatInt(data.GetCallCount(), 10)
 	serviceFailCount := strconv.FormatInt(data.GetFailCount(), 10)
 	serviceExceptionCount := strconv.FormatInt(data.GetExceptionCount(), 10)
-	serviceAvgResponseTime := strconv.FormatInt(data.GetAvgResponseTime(), 10)
+	serviceAvgResponseTime := strconv.FormatInt(data.GetAvgResponseTimeMS(), 10)
 	serviceMetricResource.Metric = append(serviceMetricResource.Metric,
 		&pb.NmsRecvMetricRequestBody_Metric{
 			MetricCode:  &serviceMetricNameCode,
@@ -164,7 +164,7 @@ func (s *TraceKafkaSink) parseMetricResources(data ServiceMetricData) *pb.NmsRec
 		instanceCallCount := strconv.FormatInt(instanceMetric.GetCallCount(), 10)
 		instanceFailedCount := strconv.FormatInt(instanceMetric.GetFailCount(), 10)
 		instanceExceptionCount := strconv.FormatInt(instanceMetric.GetExceptionCount(), 10)
-		instanceAvgResponseTime := strconv.FormatInt(instanceMetric.GetAvgResponseTime(), 10)
+		instanceAvgResponseTime := strconv.FormatInt(instanceMetric.GetAvgResponseTimeMS(), 10)
 		serviceInstanceResource.Metric = []*pb.NmsRecvMetricRequestBody_Metric{
 			{
 				MetricCode:  &serviceInstanceNameInstanceID,
@@ -219,7 +219,7 @@ func (s *TraceKafkaSink) parseMetricResources(data ServiceMetricData) *pb.NmsRec
 			cpCallCount := strconv.FormatInt(mt.GetCallCount(), 10)
 			epFailedCount := strconv.FormatInt(mt.GetFailedCount(), 10)
 			epExceptionCount := strconv.FormatInt(mt.GetExceptionCount(), 10)
-			epAvgResponseTime := strconv.FormatInt(mt.GetAvgResponseTime(), 10)
+			epAvgResponseTime := strconv.FormatInt(mt.GetAvgResponseTimeMS(), 10)
 			epApdex := strconv.FormatFloat(mt.GetApdex(), 'f', 2, 64)
 			epSlowCount := strconv.FormatInt(mt.GetSlowCount(), 10)
 			epStatementCount := strconv.FormatInt(mt.GetStatementCount(), 10)
@@ -336,7 +336,6 @@ func (s *TraceKafkaSink) sendMetric(metricData ServiceMetricData, writer *kafka.
 		Body: &requestBody,
 	}
 	instanceCount := len(metricData.InstanceMetrics)
-	endpointCount := metricData.GetEndPointCount()
 	for _, instanceMetric := range metricData.InstanceMetrics {
 		for _, endpointMetric := range instanceMetric.EndpointMetrics {
 			s.metricCounter.With(prometheus.Labels{
@@ -347,7 +346,7 @@ func (s *TraceKafkaSink) sendMetric(metricData ServiceMetricData, writer *kafka.
 		}
 	}
 	s.logger.Infof("[i6000]: [发送] [metric], 时间: %s 包含ServiceMetricResource:serviceName:%s, "+
-		"instanceCount%d: endpointCount:%d", dtStr, metricData.ServiceName, instanceCount, endpointCount)
+		"instanceCount%d: endpoints:%s", dtStr, metricData.ServiceName, instanceCount, metricData.GetEndPointsStr())
 	bs, errMarshal := proto.Marshal(&nmsRequest)
 	if errMarshal != nil {
 		s.logger.Errorf("marshal nmsReceiveMetric failed: %v", errMarshal)
@@ -381,7 +380,9 @@ func (s *TraceKafkaSink) sendTrace(topoData TopoData, wr *kafka.Writer) {
 		"destServiceName": topoData.DestServiceName,
 		"destEndpoint":    topoData.DestEndpoint,
 	}).Inc()
-	s.logger.Infof("[i6000]: [发送] [topo], 时间: %s 包含topo:%+v", dt, topoData)
+	s.logger.Infof("[i6000]: [发送] [topo], 包含topo:,%s,%s,%s,%s,%s,%s,%s", dt,
+		topoData.TxServiceName, topoData.TxEndpoint, topoData.SrcServiceName, topoData.SrcEndpoint,
+		topoData.DestServiceName, topoData.DestEndpoint)
 	intervalMinuteUnit := "MI"
 	intervalMinute := int32(5)
 	destNodeType := pb.NodeType_SERVICE