package main

import (
	"context"
	"strconv"
	"time"

	"git.cestong.com.cn/cecf/i6000pusher/pkg/pb"
	"github.com/golang/protobuf/proto"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/segmentio/kafka-go"
	"github.com/sirupsen/logrus"
)

// TraceKafkaSink drains topology and service-metric data from its input
// channels, encodes each item as a protobuf message and writes it to Kafka.
type TraceKafkaSink struct {
	topoCh           chan TopoData
	metricChan       chan ServiceMetricData
	parallelism      int
	logger           *logrus.Entry
	kafkaBrokers     []string
	topoCounter      *prometheus.CounterVec
	metricCounter    *prometheus.CounterVec
	kafkaTopic       string
	rmqTopoTopic     string
	rmqTopoGroupID   string
	rmqMetricTopic   string
	rmqMetricGroupID string
}

// NewTraceKafkaSink builds a sink reading from traceCh and metricChan.
//
// parallelism is the number of consumer goroutines started by start.
// kafkaTopic is the Kafka topic every message is written to; topoTopic,
// topoGroupID, metricTopic and metricGroupID are attached to each message as
// the "topic" / "groupID" Kafka headers. The two push counters are registered
// on reg; MustRegister panics if they are already registered there.
func NewTraceKafkaSink(traceCh chan TopoData, metricChan chan ServiceMetricData, parallelism int,
	logger *logrus.Logger, kafkaBrokers []string,
	kafkaTopic, topoTopic, topoGroupID, metricTopic, metricGroupID string,
	reg *prometheus.Registry) *TraceKafkaSink {
	log := logger.WithField("name", "TraceKafkaSink")

	topoCounter := prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "i6000pusherTopoPushCounter",
		Help: "推送给i6000Kafka的拓扑数据数量",
	}, []string{
		"txServiceName",
		"txEndpoint",
		"srcServiceName",
		"srcEndpoint",
		"destServiceName",
		"destEndpoint",
	})
	metricCounter := prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "i6000pusherMetricPushCounter",
		Help: "推送给i6000Kafka的指标数据数量",
	}, []string{"serviceName", "instance", "endpoint"})

	reg.MustRegister(topoCounter)
	reg.MustRegister(metricCounter)

	return &TraceKafkaSink{
		topoCh:           traceCh,
		metricChan:       metricChan,
		parallelism:      parallelism,
		logger:           log,
		kafkaBrokers:     kafkaBrokers,
		topoCounter:      topoCounter,
		metricCounter:    metricCounter,
		kafkaTopic:       kafkaTopic,
		rmqTopoTopic:     topoTopic,
		rmqTopoGroupID:   topoGroupID,
		rmqMetricTopic:   metricTopic,
		rmqMetricGroupID: metricGroupID,
	}
}

// start launches s.parallelism consumer goroutines. Each one runs until ctx
// is cancelled.
func (s *TraceKafkaSink) start(ctx context.Context) {
	for i := 0; i < s.parallelism; i++ {
		go s.consumeRoutine(ctx)
	}
	s.logger.Infof("started, routine:%d", s.parallelism)
}

// nowDataTime returns the current time formatted as "2006-01-02 15:04:05" in
// the Asia/Shanghai zone. It is used for the DataTime field of both metric
// and topology messages so the two streams carry consistent timestamps.
func (s *TraceKafkaSink) nowDataTime() string {
	loc, err := time.LoadLocation("Asia/Shanghai")
	if err != nil {
		// The tz database may be absent (e.g. minimal container images).
		// Time.In panics on a nil location, so fall back to a fixed UTC+8 zone.
		loc = time.FixedZone("CST", 8*60*60)
	}
	return time.Now().In(loc).Format("2006-01-02 15:04:05")
}

// parseMetricResources converts one service's metric data into the nested
// protobuf resource tree:
//
//	resource(CmdbID)
//	  └─ resource type "ApmService"
//	       └─ service resource (service-level metrics)
//	            ├─ "ApmServiceInst" sub resource type, one per instance
//	            └─ "ApmEndpoint" sub resource type, one per endpoint
//
// Sub resource types are appended in iteration order: each instance is
// followed by that instance's endpoints.
func (s *TraceKafkaSink) parseMetricResources(data ServiceMetricData) *pb.NmsRecvMetricRequestBody_Resource {
	success := true

	// The protobuf fields are optional (pointer) values, so every code and
	// constant value needs an addressable variable.
	var (
		metricNameService         = "ApmService"
		metricNameEndpointName    = "APMEndpointName"
		metricNameEndpointType    = "APMEndpointType"
		metricNameCPM             = "APMServiceCPM"
		metricNameCallCount       = "APMServiceCallCount"
		metricNameFailCount       = "APMServiceFailCount"
		metricNameExceptionCount  = "APMServiceExceptionCount"
		metricNameAvgResponseTime = "APMServiceAvgResponseTime"
		metricNameApdex           = "APMServiceApdex"
		metricNameSlowCount       = "APMServiceSlowCount"
		metricNameStatementCount  = "SlowStatmentCount"
		metricEndpointType        = "APPIF"
		endpointResourceTypeCode  = "ApmEndpoint"
		secZoneId                 = "40003102"

		serviceMetricNameCode            = "APMServiceCode"
		serviceMetricNameSecZoneID       = "SecZoneId"
		serviceMetricNameApdex           = "APMServiceApdex"
		serviceMetricNameCPM             = "APMServiceCPM"
		serviceMetricNameCallCount       = "APMServiceCallCount"
		serviceMetricNameFailCount       = "APMServiceFailCount"
		serviceMetricNameExceptionCount  = "APMServiceExceptionCount"
		serviceMetricNameAvgResponseTime = "APMServiceAvgResponseTime"

		serviceInstanceNameInstanceID           = "APMServiceInstId"
		serviceInstanceNameInstanceIP           = "APMServiceInstIp"
		serviceInstanceNameCPM                  = "APMServiceCPM"
		serviceInstanceNameCallCount            = "APMServiceCallCount"
		serviceInstanceNameFailCount            = "APMServiceFailCount"
		serviceInstanceNameExceptionCount       = "APMServiceExceptionCount"
		serviceInstanceNameAvgResponseTime      = "APMServiceAvgResponseTime"
		serviceInstanceNameInstanceResourceType = "ApmServiceInst"
	)

	// newMetric builds one successful metric entry from a code/value pair.
	newMetric := func(code, value *string) *pb.NmsRecvMetricRequestBody_Metric {
		return &pb.NmsRecvMetricRequestBody_Metric{
			MetricCode:  code,
			Success:     &success,
			MetricValue: value,
		}
	}

	serviceMetricResource := pb.NmsRecvMetricRequestBody_Resource{
		CmdbId:  &data.CmdbID,
		Success: &success,
	}

	serviceApdex := strconv.FormatFloat(data.GetApdex(), 'f', 2, 64)
	serviceCPM := strconv.FormatInt(data.GetCPM(), 10)
	serviceCallCount := strconv.FormatInt(data.GetCallCount(), 10)
	serviceFailCount := strconv.FormatInt(data.GetFailCount(), 10)
	serviceExceptionCount := strconv.FormatInt(data.GetExceptionCount(), 10)
	serviceAvgResponseTime := strconv.FormatInt(data.GetAvgResponseTimeMS(), 10)

	serviceMetricResource.Metric = append(serviceMetricResource.Metric,
		newMetric(&serviceMetricNameCode, &data.ServiceName),
		newMetric(&serviceMetricNameSecZoneID, &secZoneId),
		newMetric(&serviceMetricNameApdex, &serviceApdex),
		newMetric(&serviceMetricNameCPM, &serviceCPM),
		newMetric(&serviceMetricNameCallCount, &serviceCallCount),
		newMetric(&serviceMetricNameFailCount, &serviceFailCount),
		newMetric(&serviceMetricNameExceptionCount, &serviceExceptionCount),
		newMetric(&serviceMetricNameAvgResponseTime, &serviceAvgResponseTime),
	)

	for _, instanceMetric := range data.InstanceMetrics {
		// Pointers into the loop variable are stored in the result. Before
		// Go 1.22 the loop variable is shared by all iterations, so take a
		// per-iteration copy to keep each resource pointing at its own data.
		instanceMetric := instanceMetric

		instanceCPM := strconv.FormatInt(instanceMetric.GetCPM(), 10)
		instanceCallCount := strconv.FormatInt(instanceMetric.GetCallCount(), 10)
		instanceFailedCount := strconv.FormatInt(instanceMetric.GetFailCount(), 10)
		instanceExceptionCount := strconv.FormatInt(instanceMetric.GetExceptionCount(), 10)
		instanceAvgResponseTime := strconv.FormatInt(instanceMetric.GetAvgResponseTimeMS(), 10)

		serviceInstanceResource := pb.NmsRecvMetricRequestBody_Resource{
			CmdbId:  &instanceMetric.InstanceID,
			Success: &success,
			Metric: []*pb.NmsRecvMetricRequestBody_Metric{
				newMetric(&serviceInstanceNameInstanceID, &instanceMetric.InstanceID),
				// NOTE(review): the instance IP metric is filled with the
				// instance ID; confirm whether a dedicated IP value exists.
				newMetric(&serviceInstanceNameInstanceIP, &instanceMetric.InstanceID),
				newMetric(&serviceInstanceNameCPM, &instanceCPM),
				newMetric(&serviceInstanceNameCallCount, &instanceCallCount),
				newMetric(&serviceInstanceNameFailCount, &instanceFailedCount),
				newMetric(&serviceInstanceNameExceptionCount, &instanceExceptionCount),
				newMetric(&serviceInstanceNameAvgResponseTime, &instanceAvgResponseTime),
			},
		}
		serviceMetricResource.SubResourceType = append(serviceMetricResource.SubResourceType,
			&pb.NmsRecvMetricRequestBody_ResourceType{
				ResourceTypeCode: &serviceInstanceNameInstanceResourceType,
				Success:          &success,
				Resource:         []*pb.NmsRecvMetricRequestBody_Resource{&serviceInstanceResource},
			})

		for _, endPointMetrics := range instanceMetric.EndpointMetrics {
			// Same per-iteration copy as above: &endPointMetrics.Endpoint
			// is retained in the output.
			endPointMetrics := endPointMetrics
			mt := endPointMetrics.Metrics

			epCPM := strconv.FormatInt(mt.GetCallPerMinute(), 10)
			epCallCount := strconv.FormatInt(mt.GetCallCount(), 10)
			epFailedCount := strconv.FormatInt(mt.GetFailedCount(), 10)
			epExceptionCount := strconv.FormatInt(mt.GetExceptionCount(), 10)
			epAvgResponseTime := strconv.FormatInt(mt.GetAvgResponseTimeMS(), 10)
			epApdex := strconv.FormatFloat(mt.GetApdex(), 'f', 2, 64)
			epSlowCount := strconv.FormatInt(mt.GetSlowCount(), 10)
			epStatementCount := strconv.FormatInt(mt.GetStatementCount(), 10)

			endPointResource := pb.NmsRecvMetricRequestBody_Resource{
				CmdbId:  &data.CmdbID,
				Success: &success,
				Metric: []*pb.NmsRecvMetricRequestBody_Metric{
					newMetric(&metricNameEndpointName, &endPointMetrics.Endpoint),
					newMetric(&metricNameEndpointType, &metricEndpointType),
					newMetric(&metricNameCPM, &epCPM),
					newMetric(&metricNameCallCount, &epCallCount),
					newMetric(&metricNameFailCount, &epFailedCount),
					newMetric(&metricNameExceptionCount, &epExceptionCount),
					newMetric(&metricNameAvgResponseTime, &epAvgResponseTime),
					newMetric(&metricNameApdex, &epApdex),
					newMetric(&metricNameSlowCount, &epSlowCount),
					newMetric(&metricNameStatementCount, &epStatementCount),
				},
			}
			serviceMetricResource.SubResourceType = append(serviceMetricResource.SubResourceType,
				&pb.NmsRecvMetricRequestBody_ResourceType{
					ResourceTypeCode: &endpointResourceTypeCode,
					Success:          &success,
					Resource:         []*pb.NmsRecvMetricRequestBody_Resource{&endPointResource},
				})
		}
	}

	appResourceType := pb.NmsRecvMetricRequestBody_ResourceType{
		ResourceTypeCode: &metricNameService,
		Success:          &success,
		Resource:         []*pb.NmsRecvMetricRequestBody_Resource{&serviceMetricResource},
	}
	return &pb.NmsRecvMetricRequestBody_Resource{
		CmdbId:          &data.CmdbID,
		Success:         &success,
		SubResourceType: []*pb.NmsRecvMetricRequestBody_ResourceType{&appResourceType},
	}
}

// sendMetric wraps metricData in an NmsRecvMetricRequest, bumps the metric
// push counter once per (service, instance, endpoint) and writes the
// marshalled request to Kafka through writer. Failures are logged, not
// returned; nothing is written when marshalling fails.
func (s *TraceKafkaSink) sendMetric(metricData ServiceMetricData, writer *kafka.Writer) {
	metricCls := "PERF"
	version := "1.0.0"
	serviceCode := "apm"
	userID := ""
	bgID := ""
	userToken := ""
	intervalUnitStr := "MI"
	intervalValue := int32(5)
	success := true
	resourceTypeCode := "App"

	dtStr := s.nowDataTime()

	resourceType := pb.NmsRecvMetricRequestBody_ResourceType{
		ResourceTypeCode: &resourceTypeCode,
		Success:          &success,
		Resource: []*pb.NmsRecvMetricRequestBody_Resource{
			s.parseMetricResources(metricData),
		},
	}
	requestBody := pb.NmsRecvMetricRequestBody{
		MetricCls:     &metricCls,
		DataTime:      &dtStr,
		IntervalUnit:  &intervalUnitStr,
		IntervalValue: &intervalValue,
		ResourceType:  []*pb.NmsRecvMetricRequestBody_ResourceType{&resourceType},
	}
	head := pb.NmsMonMsRequestHead{
		Version:     &version,
		ServiceCode: &serviceCode,
		UserId:      &userID,
		BgId:        &bgID,
		UserToken:   &userToken,
	}
	nmsRequest := pb.NmsRecvMetricRequest{
		Head: &head,
		Body: &requestBody,
	}

	instanceCount := len(metricData.InstanceMetrics)
	for _, instanceMetric := range metricData.InstanceMetrics {
		for _, endpointMetric := range instanceMetric.EndpointMetrics {
			s.metricCounter.With(prometheus.Labels{
				"serviceName": metricData.ServiceName,
				"instance":    instanceMetric.InstanceID,
				"endpoint":    endpointMetric.Endpoint,
			}).Inc()
		}
	}
	s.logger.Infof("[i6000]: [发送] [metric], 时间: %s 包含ServiceMetricResource:serviceName:%s, "+
		"instanceCount%d: endpoints:%s", dtStr, metricData.ServiceName, instanceCount, metricData.GetEndPointsStr())

	bs, errMarshal := proto.Marshal(&nmsRequest)
	if errMarshal != nil {
		s.logger.Errorf("marshal nmsReceiveMetric failed: %v", errMarshal)
		// Do not push an empty payload downstream.
		return
	}

	msg := kafka.Message{
		Value: bs,
		Headers: []kafka.Header{
			{Key: "groupID", Value: []byte(s.rmqMetricGroupID)},
			{Key: "topic", Value: []byte(s.rmqMetricTopic)},
		},
	}
	if errWrite := writer.WriteMessages(context.Background(), msg); errWrite != nil {
		s.logger.Errorf("write messages failed: %v", errWrite)
	}
}

// sendTrace converts topoData into a tx LinkMessage, bumps the topology push
// counter and writes the marshalled message to Kafka through wr. Failures are
// logged, not returned; nothing is written when marshalling fails.
func (s *TraceKafkaSink) sendTrace(topoData TopoData, wr *kafka.Writer) {
	// Use the same Asia/Shanghai timestamp as sendMetric rather than the
	// process-local zone, so both streams agree regardless of host settings.
	dt := s.nowDataTime()

	s.topoCounter.With(prometheus.Labels{
		"txServiceName":   topoData.TxServiceName,
		"txEndpoint":      topoData.TxEndpoint,
		"srcServiceName":  topoData.SrcServiceName,
		"srcEndpoint":     topoData.SrcEndpoint,
		"destServiceName": topoData.DestServiceName,
		"destEndpoint":    topoData.DestEndpoint,
	}).Inc()
	s.logger.Infof("[i6000]: [发送] [topo], 包含topo:,%s,%s,%s,%s,%s,%s,%s", dt,
		topoData.TxServiceName, topoData.TxEndpoint,
		topoData.SrcServiceName, topoData.SrcEndpoint,
		topoData.DestServiceName, topoData.DestEndpoint)

	intervalMinuteUnit := "MI"
	intervalMinute := int32(5)
	destNodeType := pb.NodeType_SERVICE
	callCount := strconv.FormatInt(topoData.RequestCount, 10)
	failCount := strconv.FormatInt(topoData.FailedCount, 10)
	avgResponseTime := strconv.FormatInt(topoData.AvgResponseTime, 10)

	txLink := pb.ApmTxLink{
		DataTime:                  &dt,
		IntervalUnit:              &intervalMinuteUnit,
		IntervalValue:             &intervalMinute,
		TxEndPointName:            &topoData.TxEndpoint,
		TxServiceCode:             &topoData.TxServiceName,
		TxCmdbId:                  &topoData.TxCmdbID,
		SrcApmEndpointName:        &topoData.SrcEndpoint,
		SrcApmServiceCode:         &topoData.SrcServiceName,
		SrcAppCmdbId:              &topoData.SrcCmdbID,
		DestNodeType:              &destNodeType,
		DestApmEndpointName:       &topoData.DestEndpoint,
		DestApmServiceCode:        &topoData.DestServiceName,
		DestAppCmdbId:             &topoData.DestCmdbID,
		APMServiceCallCount:       &callCount,
		APMServiceErrorCount:      &failCount,
		APMServiceAvgResponseTime: &avgResponseTime,
	}
	linkType := pb.LinkMessage_tx
	linkMessage := pb.LinkMessage{
		ApmTxLink: &txLink,
		LinkType:  &linkType,
	}

	bs, errMarshal := proto.Marshal(&linkMessage)
	if errMarshal != nil {
		s.logger.Errorf("marshal flat trace failed: %v", errMarshal)
		// Do not push an empty payload downstream.
		return
	}

	msg := kafka.Message{
		Value: bs,
		Headers: []kafka.Header{
			{Key: "groupID", Value: []byte(s.rmqTopoGroupID)},
			{Key: "topic", Value: []byte(s.rmqTopoTopic)},
		},
	}
	if errWrite := wr.WriteMessages(context.Background(), msg); errWrite != nil {
		s.logger.Errorf("write messages failed: %v", errWrite)
	}
}

// consumeRoutine owns one Kafka writer and forwards items from the topology
// and metric channels until ctx is cancelled. The writer is closed on exit so
// its connections are released.
func (s *TraceKafkaSink) consumeRoutine(ctx context.Context) {
	wr := &kafka.Writer{
		Addr:         kafka.TCP(s.kafkaBrokers...),
		Topic:        s.kafkaTopic,
		Balancer:     &kafka.LeastBytes{},
		WriteTimeout: 10 * time.Second,
		BatchSize:    100,
	}
	defer func() {
		if errClose := wr.Close(); errClose != nil {
			s.logger.Errorf("close kafka writer failed: %v", errClose)
		}
	}()

	for {
		select {
		case <-ctx.Done():
			s.logger.Infof("stop consume routine")
			return
		case trace := <-s.topoCh:
			s.sendTrace(trace, wr)
		case metricData := <-s.metricChan:
			s.sendMetric(metricData, wr)
		}
	}
}