123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458 |
- package main
- import (
- "context"
- "git.cestong.com.cn/cecf/i6000pusher/pkg/pb"
- "github.com/golang/protobuf/proto"
- "github.com/prometheus/client_golang/prometheus"
- "github.com/segmentio/kafka-go"
- "github.com/sirupsen/logrus"
- "strconv"
- "time"
- )
- type TraceKafkaSink struct {
- topoCh chan TopoData
- metricChan chan ServiceMetricData
- parallelism int
- logger *logrus.Entry
- kafkaBrokers []string
- topoCounter *prometheus.CounterVec
- metricCounter *prometheus.CounterVec
- kafkaTopic string
- rmqTopoTopic string
- rmqTopoGroupID string
- rmqMetricTopic string
- rmqMetricGroupID string
- }
- func NewTraceKafkaSink(traceCh chan TopoData, metricChan chan ServiceMetricData, parallelism int,
- logger *logrus.Logger, kafkaBrokers []string, kafkaTopic, topoTopic, topoGroupID,
- metricTopic, metricGroupID string, reg *prometheus.Registry) *TraceKafkaSink {
- log := logger.WithField("name", "TraceKafkaSink")
- topoCounter := prometheus.NewCounterVec(prometheus.CounterOpts{
- Name: "i6000pusherTopoPushCounter",
- Help: "推送给i6000Kafka的拓扑数据数量",
- }, []string{
- "txServiceName", "txEndpoint", "srcServiceName", "srcEndpoint", "destServiceName", "destEndpoint",
- })
- metricCounter := prometheus.NewCounterVec(prometheus.CounterOpts{
- Name: "i6000pusherMetricPushCounter",
- Help: "推送给i6000Kafka的指标数据数量",
- }, []string{"serviceName", "instance", "endpoint"})
- reg.MustRegister(topoCounter)
- reg.MustRegister(metricCounter)
- return &TraceKafkaSink{
- topoCh: traceCh,
- metricChan: metricChan,
- parallelism: parallelism, logger: log,
- kafkaBrokers: kafkaBrokers,
- topoCounter: topoCounter,
- metricCounter: metricCounter,
- kafkaTopic: kafkaTopic,
- rmqTopoTopic: topoTopic,
- rmqTopoGroupID: topoGroupID,
- rmqMetricTopic: metricTopic,
- rmqMetricGroupID: metricGroupID,
- }
- }
- func (s *TraceKafkaSink) start(ctx context.Context) {
- for i := 0; i < s.parallelism; i++ {
- go s.consumeRoutine(ctx)
- }
- s.logger.Infof("started, routine:%d", s.parallelism)
- }
- func (s *TraceKafkaSink) parseMetricResources(data ServiceMetricData) *pb.NmsRecvMetricRequestBody_Resource {
- success := true
- metricNameService := "ApmService"
- metricNameEndpointName := "APMEndpointName"
- metricNameEndpointType := "APMEndpointType"
- metricNameCPM := "APMServiceCPM"
- metricNameCallCount := "APMServiceCallCount"
- metricNameFailCount := "APMServiceFailCount"
- metricNameExceptionCount := "APMServiceExceptionCount"
- metricNameAvgResponseTime := "APMServiceAvgResponseTime"
- metricNameApdex := "APMServiceApdex"
- metricNameSlowCount := "APMServiceSlowCount"
- metricNameStatementCount := "SlowStatmentCount"
- metricEndpointType := "APPIF"
- endpointResourceTypeCode := "ApmEndpoint"
- secZoneId := "40003102"
- serviceMetricNameCode := "APMServiceCode"
- serviceMetricNameSecZoneID := "SecZoneId"
- serviceMetricNameApdex := "APMServiceApdex"
- serviceMetricNameCPM := "APMServiceCPM"
- serviceMetricNameCallCount := "APMServiceCallCount"
- serviceMetricNameFailCount := "APMServiceFailCount"
- serviceMetricNameExceptionCount := "APMServiceExceptionCount"
- serviceMetricNameAvgResponseTime := "APMServiceAvgResponseTime"
- serviceInstanceNameInstanceID := "APMServiceInstId"
- serviceInstanceNameInstanceIP := "APMServiceInstIp"
- serviceInstanceNameCPM := "APMServiceCPM"
- serviceInstanceNameCallCount := "APMServiceCallCount"
- serviceInstanceNameFailCount := "APMServiceFailCount"
- serviceInstanceNameExceptionCount := "APMServiceExceptionCount"
- serviceInstanceNameAvgResponseTime := "APMServiceAvgResponseTime"
- serviceInstanceNameInstanceResourceType := "ApmServiceInst"
- serviceMetricResource := pb.NmsRecvMetricRequestBody_Resource{
- CmdbId: &data.CmdbID,
- Success: &success,
- }
- serviceApdex := strconv.FormatFloat(data.GetApdex(), 'f', 2, 64)
- serviceCPM := strconv.FormatInt(data.GetCPM(), 10)
- serviceCallCount := strconv.FormatInt(data.GetCallCount(), 10)
- serviceFailCount := strconv.FormatInt(data.GetFailCount(), 10)
- serviceExceptionCount := strconv.FormatInt(data.GetExceptionCount(), 10)
- serviceAvgResponseTime := strconv.FormatInt(data.GetAvgResponseTimeMS(), 10)
- serviceMetricResource.Metric = append(serviceMetricResource.Metric,
- &pb.NmsRecvMetricRequestBody_Metric{
- MetricCode: &serviceMetricNameCode,
- Success: &success,
- MetricValue: &data.ServiceName,
- },
- &pb.NmsRecvMetricRequestBody_Metric{
- MetricCode: &serviceMetricNameSecZoneID,
- Success: &success,
- MetricValue: &secZoneId,
- },
- &pb.NmsRecvMetricRequestBody_Metric{
- MetricCode: &serviceMetricNameApdex,
- Success: &success,
- MetricValue: &serviceApdex,
- },
- &pb.NmsRecvMetricRequestBody_Metric{
- MetricCode: &serviceMetricNameCPM,
- Success: &success,
- MetricValue: &serviceCPM,
- },
- &pb.NmsRecvMetricRequestBody_Metric{
- MetricCode: &serviceMetricNameCallCount,
- Success: &success,
- MetricValue: &serviceCallCount,
- },
- &pb.NmsRecvMetricRequestBody_Metric{
- MetricCode: &serviceMetricNameFailCount,
- Success: &success,
- MetricValue: &serviceFailCount,
- },
- &pb.NmsRecvMetricRequestBody_Metric{
- MetricCode: &serviceMetricNameExceptionCount,
- Success: &success,
- MetricValue: &serviceExceptionCount,
- },
- &pb.NmsRecvMetricRequestBody_Metric{
- MetricCode: &serviceMetricNameAvgResponseTime,
- Success: &success,
- MetricValue: &serviceAvgResponseTime,
- },
- )
- for _, instanceMetric := range data.InstanceMetrics {
- serviceInstanceResource := pb.NmsRecvMetricRequestBody_Resource{
- CmdbId: &instanceMetric.InstanceID,
- Success: &success,
- }
- instanceCPM := strconv.FormatInt(instanceMetric.GetCPM(), 10)
- instanceCallCount := strconv.FormatInt(instanceMetric.GetCallCount(), 10)
- instanceFailedCount := strconv.FormatInt(instanceMetric.GetFailCount(), 10)
- instanceExceptionCount := strconv.FormatInt(instanceMetric.GetExceptionCount(), 10)
- instanceAvgResponseTime := strconv.FormatInt(instanceMetric.GetAvgResponseTimeMS(), 10)
- serviceInstanceResource.Metric = []*pb.NmsRecvMetricRequestBody_Metric{
- {
- MetricCode: &serviceInstanceNameInstanceID,
- Success: &success,
- MetricValue: &instanceMetric.InstanceID,
- },
- {
- MetricCode: &serviceInstanceNameInstanceIP,
- Success: &success,
- MetricValue: &instanceMetric.InstanceID,
- },
- {
- MetricCode: &serviceInstanceNameCPM,
- Success: &success,
- MetricValue: &instanceCPM,
- },
- {
- MetricCode: &serviceInstanceNameCallCount,
- Success: &success,
- MetricValue: &instanceCallCount,
- },
- {
- MetricCode: &serviceInstanceNameFailCount,
- Success: &success,
- MetricValue: &instanceFailedCount,
- },
- {
- MetricCode: &serviceInstanceNameExceptionCount,
- Success: &success,
- MetricValue: &instanceExceptionCount,
- },
- {
- MetricCode: &serviceInstanceNameAvgResponseTime,
- Success: &success,
- MetricValue: &instanceAvgResponseTime,
- },
- }
- serviceMetricResource.SubResourceType = append(serviceMetricResource.SubResourceType,
- &pb.NmsRecvMetricRequestBody_ResourceType{
- ResourceTypeCode: &serviceInstanceNameInstanceResourceType,
- Success: &success,
- Resource: []*pb.NmsRecvMetricRequestBody_Resource{&serviceInstanceResource},
- })
- for _, endPointMetrics := range instanceMetric.EndpointMetrics {
- endPointResource := pb.NmsRecvMetricRequestBody_Resource{
- CmdbId: &data.CmdbID,
- Success: &success,
- }
- mt := endPointMetrics.Metrics
- epCPM := strconv.FormatInt(mt.GetCallPerMinute(), 10)
- cpCallCount := strconv.FormatInt(mt.GetCallCount(), 10)
- epFailedCount := strconv.FormatInt(mt.GetFailedCount(), 10)
- epExceptionCount := strconv.FormatInt(mt.GetExceptionCount(), 10)
- epAvgResponseTime := strconv.FormatInt(mt.GetAvgResponseTimeMS(), 10)
- epApdex := strconv.FormatFloat(mt.GetApdex(), 'f', 2, 64)
- epSlowCount := strconv.FormatInt(mt.GetSlowCount(), 10)
- epStatementCount := strconv.FormatInt(mt.GetStatementCount(), 10)
- endPointResource.Metric = []*pb.NmsRecvMetricRequestBody_Metric{
- {
- MetricCode: &metricNameEndpointName,
- Success: &success,
- MetricValue: &endPointMetrics.Endpoint,
- },
- {
- MetricCode: &metricNameEndpointType,
- Success: &success,
- MetricValue: &metricEndpointType,
- },
- {
- MetricCode: &metricNameCPM,
- Success: &success,
- MetricValue: &epCPM,
- },
- {
- MetricCode: &metricNameCallCount,
- Success: &success,
- MetricValue: &cpCallCount,
- },
- {
- MetricCode: &metricNameFailCount,
- Success: &success,
- MetricValue: &epFailedCount,
- },
- {
- MetricCode: &metricNameExceptionCount,
- Success: &success,
- MetricValue: &epExceptionCount,
- },
- {
- MetricCode: &metricNameAvgResponseTime,
- Success: &success,
- MetricValue: &epAvgResponseTime,
- },
- {
- MetricCode: &metricNameApdex,
- Success: &success,
- MetricValue: &epApdex,
- },
- {
- MetricCode: &metricNameSlowCount,
- Success: &success,
- MetricValue: &epSlowCount,
- },
- {
- MetricCode: &metricNameStatementCount,
- Success: &success,
- MetricValue: &epStatementCount,
- },
- }
- endpointResourceType := pb.NmsRecvMetricRequestBody_ResourceType{
- ResourceTypeCode: &endpointResourceTypeCode,
- Success: &success,
- }
- endpointResourceType.Resource = []*pb.NmsRecvMetricRequestBody_Resource{&endPointResource}
- serviceMetricResource.SubResourceType = append(serviceMetricResource.SubResourceType, &endpointResourceType)
- }
- }
- appResourceType := pb.NmsRecvMetricRequestBody_ResourceType{
- ResourceTypeCode: &metricNameService,
- Success: &success,
- }
- appResourceType.Resource = append(appResourceType.Resource, &serviceMetricResource)
- appResource := pb.NmsRecvMetricRequestBody_Resource{
- CmdbId: &data.CmdbID,
- Success: &success,
- SubResourceType: []*pb.NmsRecvMetricRequestBody_ResourceType{&appResourceType},
- }
- return &appResource
- }
- func (s *TraceKafkaSink) sendMetric(metricData ServiceMetricData, writer *kafka.Writer) {
- metricCls := "PERF"
- version := "1.0.0"
- serviceCode := "apm"
- userID := ""
- bgID := ""
- userToken := ""
- loc, _ := time.LoadLocation("Asia/Shanghai")
- dtStr := time.Now().In(loc).Format("2006-01-02 15:04:05")
- intervalUnitStr := "MI"
- intervalValue := int32(5)
- requestBody := pb.NmsRecvMetricRequestBody{
- MetricCls: &metricCls,
- DataTime: &dtStr,
- IntervalUnit: &intervalUnitStr,
- IntervalValue: &intervalValue,
- ResourceType: nil,
- }
- success := true
- resourceTypeCode := "App"
- serviceResource := s.parseMetricResources(metricData)
- resourceType := pb.NmsRecvMetricRequestBody_ResourceType{
- ResourceTypeCode: &resourceTypeCode,
- Success: &success,
- }
- resourceType.Resource = append(resourceType.Resource, serviceResource)
- requestBody.ResourceType = append(requestBody.ResourceType, &resourceType)
- head := pb.NmsMonMsRequestHead{
- Version: &version,
- ServiceCode: &serviceCode,
- UserId: &userID,
- BgId: &bgID,
- UserToken: &userToken,
- }
- nmsRequest := pb.NmsRecvMetricRequest{
- Head: &head,
- Body: &requestBody,
- }
- instanceCount := len(metricData.InstanceMetrics)
- for _, instanceMetric := range metricData.InstanceMetrics {
- for _, endpointMetric := range instanceMetric.EndpointMetrics {
- s.metricCounter.With(prometheus.Labels{
- "serviceName": metricData.ServiceName,
- "instance": instanceMetric.InstanceID,
- "endpoint": endpointMetric.Endpoint,
- }).Inc()
- }
- }
- s.logger.Infof("[i6000]: [发送] [metric], 时间: %s 包含ServiceMetricResource:serviceName:%s, "+
- "instanceCount%d: endpoints:%s", dtStr, metricData.ServiceName, instanceCount, metricData.GetEndPointsStr())
- bs, errMarshal := proto.Marshal(&nmsRequest)
- if errMarshal != nil {
- s.logger.Errorf("marshal nmsReceiveMetric failed: %v", errMarshal)
- }
- headers := []kafka.Header{
- {
- Key: "groupID",
- Value: []byte(s.rmqMetricGroupID),
- },
- {
- Key: "topic",
- Value: []byte(s.rmqMetricTopic),
- },
- }
- msg := kafka.Message{
- Value: bs,
- Headers: headers,
- }
- if errWrite := writer.WriteMessages(context.Background(), msg); errWrite != nil {
- s.logger.Errorf("write messages failed: %v", errWrite)
- }
- }
- func (s *TraceKafkaSink) sendTrace(topoData TopoData, wr *kafka.Writer) {
- dt := time.Now().Format("2006-01-02 15:04:05")
- s.topoCounter.With(prometheus.Labels{
- "txServiceName": topoData.TxServiceName,
- "txEndpoint": topoData.TxEndpoint,
- "srcServiceName": topoData.SrcServiceName,
- "srcEndpoint": topoData.SrcEndpoint,
- "destServiceName": topoData.DestServiceName,
- "destEndpoint": topoData.DestEndpoint,
- }).Inc()
- s.logger.Infof("[i6000]: [发送] [topo], 包含topo:,%s,%s,%s,%s,%s,%s,%s", dt,
- topoData.TxServiceName, topoData.TxEndpoint, topoData.SrcServiceName, topoData.SrcEndpoint,
- topoData.DestServiceName, topoData.DestEndpoint)
- intervalMinuteUnit := "MI"
- intervalMinute := int32(5)
- destNodeType := pb.NodeType_SERVICE
- callCount := strconv.FormatInt(topoData.RequestCount, 10)
- failCount := strconv.FormatInt(topoData.FailedCount, 10)
- avgResponseTime := strconv.FormatInt(topoData.AvgResponseTime, 10)
- txLink := pb.ApmTxLink{
- DataTime: &dt,
- IntervalUnit: &intervalMinuteUnit,
- IntervalValue: &intervalMinute,
- TxEndPointName: &topoData.TxEndpoint,
- TxServiceCode: &topoData.TxServiceName,
- TxCmdbId: &topoData.TxCmdbID,
- SrcApmEndpointName: &topoData.SrcEndpoint,
- SrcApmServiceCode: &topoData.SrcServiceName,
- SrcAppCmdbId: &topoData.SrcCmdbID,
- DestNodeType: &destNodeType,
- DestApmEndpointName: &topoData.DestEndpoint,
- DestApmServiceCode: &topoData.DestServiceName,
- DestAppCmdbId: &topoData.DestCmdbID,
- APMServiceCallCount: &callCount,
- APMServiceErrorCount: &failCount,
- APMServiceAvgResponseTime: &avgResponseTime,
- }
- linkType := pb.LinkMessage_tx
- linkMessage := pb.LinkMessage{
- ApmTxLink: &txLink,
- LinkType: &linkType,
- }
- bs, errMarshal := proto.Marshal(&linkMessage)
- if errMarshal != nil {
- s.logger.Errorf("marshal flat trace failed: %v", errMarshal)
- }
- headers := []kafka.Header{
- {
- Key: "groupID",
- Value: []byte(s.rmqTopoGroupID),
- },
- {
- Key: "topic",
- Value: []byte(s.rmqTopoTopic),
- },
- }
- msg := kafka.Message{
- Value: bs,
- Headers: headers,
- }
- if errWrite := wr.WriteMessages(context.Background(), msg); errWrite != nil {
- s.logger.Errorf("write messages failed: %v", errWrite)
- }
- }
- func (s *TraceKafkaSink) consumeRoutine(ctx context.Context) {
- wr := kafka.Writer{
- Addr: kafka.TCP(s.kafkaBrokers...),
- Topic: s.kafkaTopic,
- Balancer: &kafka.LeastBytes{},
- WriteTimeout: 10 * time.Second,
- BatchSize: 100,
- }
- loop:
- for {
- select {
- case <-ctx.Done():
- break loop
- case trace := <-s.topoCh:
- s.sendTrace(trace, &wr)
- case metricData := <-s.metricChan:
- s.sendMetric(metricData, &wr)
- }
- }
- s.logger.Infof("stop consume routine")
- }
|