// trace_kafka_sink.go (15 KB)

  1. package main
  2. import (
  3. "context"
  4. "git.cestong.com.cn/cecf/i6000pusher/pkg/pb"
  5. "github.com/golang/protobuf/proto"
  6. "github.com/prometheus/client_golang/prometheus"
  7. "github.com/segmentio/kafka-go"
  8. "github.com/sirupsen/logrus"
  9. "strconv"
  10. "time"
  11. )
  12. type TraceKafkaSink struct {
  13. topoCh chan TopoData
  14. metricChan chan ServiceMetricData
  15. parallelism int
  16. logger *logrus.Entry
  17. kafkaBrokers []string
  18. topoCounter *prometheus.CounterVec
  19. metricCounter *prometheus.CounterVec
  20. kafkaTopic string
  21. rmqTopoTopic string
  22. rmqTopoGroupID string
  23. rmqMetricTopic string
  24. rmqMetricGroupID string
  25. }
  26. func NewTraceKafkaSink(traceCh chan TopoData, metricChan chan ServiceMetricData, parallelism int,
  27. logger *logrus.Logger, kafkaBrokers []string, kafkaTopic, topoTopic, topoGroupID,
  28. metricTopic, metricGroupID string, reg *prometheus.Registry) *TraceKafkaSink {
  29. log := logger.WithField("name", "TraceKafkaSink")
  30. topoCounter := prometheus.NewCounterVec(prometheus.CounterOpts{
  31. Name: "i6000pusherTopoPushCounter",
  32. Help: "推送给i6000Kafka的拓扑数据数量",
  33. }, []string{
  34. "txServiceName", "txEndpoint", "srcServiceName", "srcEndpoint", "destServiceName", "destEndpoint",
  35. })
  36. metricCounter := prometheus.NewCounterVec(prometheus.CounterOpts{
  37. Name: "i6000pusherMetricPushCounter",
  38. Help: "推送给i6000Kafka的指标数据数量",
  39. }, []string{"serviceName", "instance", "endpoint"})
  40. reg.MustRegister(topoCounter)
  41. reg.MustRegister(metricCounter)
  42. return &TraceKafkaSink{
  43. topoCh: traceCh,
  44. metricChan: metricChan,
  45. parallelism: parallelism, logger: log,
  46. kafkaBrokers: kafkaBrokers,
  47. topoCounter: topoCounter,
  48. metricCounter: metricCounter,
  49. kafkaTopic: kafkaTopic,
  50. rmqTopoTopic: topoTopic,
  51. rmqTopoGroupID: topoGroupID,
  52. rmqMetricTopic: metricTopic,
  53. rmqMetricGroupID: metricGroupID,
  54. }
  55. }
  56. func (s *TraceKafkaSink) start(ctx context.Context) {
  57. for i := 0; i < s.parallelism; i++ {
  58. go s.consumeRoutine(ctx)
  59. }
  60. s.logger.Infof("started, routine:%d", s.parallelism)
  61. }
  62. func (s *TraceKafkaSink) parseMetricResources(data ServiceMetricData) *pb.NmsRecvMetricRequestBody_Resource {
  63. success := true
  64. metricNameService := "ApmService"
  65. metricNameEndpointName := "APMEndpointName"
  66. metricNameEndpointType := "APMEndpointType"
  67. metricNameCPM := "APMServiceCPM"
  68. metricNameCallCount := "APMServiceCallCount"
  69. metricNameFailCount := "APMServiceFailCount"
  70. metricNameExceptionCount := "APMServiceExceptionCount"
  71. metricNameAvgResponseTime := "APMServiceAvgResponseTime"
  72. metricNameApdex := "APMServiceApdex"
  73. metricNameSlowCount := "APMServiceSlowCount"
  74. metricNameStatementCount := "SlowStatmentCount"
  75. metricEndpointType := "APPIF"
  76. endpointResourceTypeCode := "ApmEndpoint"
  77. secZoneId := "40003102"
  78. serviceMetricNameCode := "APMServiceCode"
  79. serviceMetricNameSecZoneID := "SecZoneId"
  80. serviceMetricNameApdex := "APMServiceApdex"
  81. serviceMetricNameCPM := "APMServiceCPM"
  82. serviceMetricNameCallCount := "APMServiceCallCount"
  83. serviceMetricNameFailCount := "APMServiceFailCount"
  84. serviceMetricNameExceptionCount := "APMServiceExceptionCount"
  85. serviceMetricNameAvgResponseTime := "APMServiceAvgResponseTime"
  86. serviceInstanceNameInstanceID := "APMServiceInstId"
  87. serviceInstanceNameInstanceIP := "APMServiceInstIp"
  88. serviceInstanceNameCPM := "APMServiceCPM"
  89. serviceInstanceNameCallCount := "APMServiceCallCount"
  90. serviceInstanceNameFailCount := "APMServiceFailCount"
  91. serviceInstanceNameExceptionCount := "APMServiceExceptionCount"
  92. serviceInstanceNameAvgResponseTime := "APMServiceAvgResponseTime"
  93. serviceInstanceNameInstanceResourceType := "ApmServiceInst"
  94. serviceMetricResource := pb.NmsRecvMetricRequestBody_Resource{
  95. CmdbId: &data.CmdbID,
  96. Success: &success,
  97. }
  98. serviceApdex := strconv.FormatFloat(data.GetApdex(), 'f', 2, 64)
  99. serviceCPM := strconv.FormatInt(data.GetCPM(), 10)
  100. serviceCallCount := strconv.FormatInt(data.GetCallCount(), 10)
  101. serviceFailCount := strconv.FormatInt(data.GetFailCount(), 10)
  102. serviceExceptionCount := strconv.FormatInt(data.GetExceptionCount(), 10)
  103. serviceAvgResponseTime := strconv.FormatInt(data.GetAvgResponseTimeMS(), 10)
  104. serviceMetricResource.Metric = append(serviceMetricResource.Metric,
  105. &pb.NmsRecvMetricRequestBody_Metric{
  106. MetricCode: &serviceMetricNameCode,
  107. Success: &success,
  108. MetricValue: &data.ServiceName,
  109. },
  110. &pb.NmsRecvMetricRequestBody_Metric{
  111. MetricCode: &serviceMetricNameSecZoneID,
  112. Success: &success,
  113. MetricValue: &secZoneId,
  114. },
  115. &pb.NmsRecvMetricRequestBody_Metric{
  116. MetricCode: &serviceMetricNameApdex,
  117. Success: &success,
  118. MetricValue: &serviceApdex,
  119. },
  120. &pb.NmsRecvMetricRequestBody_Metric{
  121. MetricCode: &serviceMetricNameCPM,
  122. Success: &success,
  123. MetricValue: &serviceCPM,
  124. },
  125. &pb.NmsRecvMetricRequestBody_Metric{
  126. MetricCode: &serviceMetricNameCallCount,
  127. Success: &success,
  128. MetricValue: &serviceCallCount,
  129. },
  130. &pb.NmsRecvMetricRequestBody_Metric{
  131. MetricCode: &serviceMetricNameFailCount,
  132. Success: &success,
  133. MetricValue: &serviceFailCount,
  134. },
  135. &pb.NmsRecvMetricRequestBody_Metric{
  136. MetricCode: &serviceMetricNameExceptionCount,
  137. Success: &success,
  138. MetricValue: &serviceExceptionCount,
  139. },
  140. &pb.NmsRecvMetricRequestBody_Metric{
  141. MetricCode: &serviceMetricNameAvgResponseTime,
  142. Success: &success,
  143. MetricValue: &serviceAvgResponseTime,
  144. },
  145. )
  146. for _, instanceMetric := range data.InstanceMetrics {
  147. serviceInstanceResource := pb.NmsRecvMetricRequestBody_Resource{
  148. CmdbId: &instanceMetric.InstanceID,
  149. Success: &success,
  150. }
  151. instanceCPM := strconv.FormatInt(instanceMetric.GetCPM(), 10)
  152. instanceCallCount := strconv.FormatInt(instanceMetric.GetCallCount(), 10)
  153. instanceFailedCount := strconv.FormatInt(instanceMetric.GetFailCount(), 10)
  154. instanceExceptionCount := strconv.FormatInt(instanceMetric.GetExceptionCount(), 10)
  155. instanceAvgResponseTime := strconv.FormatInt(instanceMetric.GetAvgResponseTimeMS(), 10)
  156. serviceInstanceResource.Metric = []*pb.NmsRecvMetricRequestBody_Metric{
  157. {
  158. MetricCode: &serviceInstanceNameInstanceID,
  159. Success: &success,
  160. MetricValue: &instanceMetric.InstanceID,
  161. },
  162. {
  163. MetricCode: &serviceInstanceNameInstanceIP,
  164. Success: &success,
  165. MetricValue: &instanceMetric.InstanceID,
  166. },
  167. {
  168. MetricCode: &serviceInstanceNameCPM,
  169. Success: &success,
  170. MetricValue: &instanceCPM,
  171. },
  172. {
  173. MetricCode: &serviceInstanceNameCallCount,
  174. Success: &success,
  175. MetricValue: &instanceCallCount,
  176. },
  177. {
  178. MetricCode: &serviceInstanceNameFailCount,
  179. Success: &success,
  180. MetricValue: &instanceFailedCount,
  181. },
  182. {
  183. MetricCode: &serviceInstanceNameExceptionCount,
  184. Success: &success,
  185. MetricValue: &instanceExceptionCount,
  186. },
  187. {
  188. MetricCode: &serviceInstanceNameAvgResponseTime,
  189. Success: &success,
  190. MetricValue: &instanceAvgResponseTime,
  191. },
  192. }
  193. serviceMetricResource.SubResourceType = append(serviceMetricResource.SubResourceType,
  194. &pb.NmsRecvMetricRequestBody_ResourceType{
  195. ResourceTypeCode: &serviceInstanceNameInstanceResourceType,
  196. Success: &success,
  197. Resource: []*pb.NmsRecvMetricRequestBody_Resource{&serviceInstanceResource},
  198. })
  199. for _, endPointMetrics := range instanceMetric.EndpointMetrics {
  200. endPointResource := pb.NmsRecvMetricRequestBody_Resource{
  201. CmdbId: &data.CmdbID,
  202. Success: &success,
  203. }
  204. mt := endPointMetrics.Metrics
  205. epCPM := strconv.FormatInt(mt.GetCallPerMinute(), 10)
  206. cpCallCount := strconv.FormatInt(mt.GetCallCount(), 10)
  207. epFailedCount := strconv.FormatInt(mt.GetFailedCount(), 10)
  208. epExceptionCount := strconv.FormatInt(mt.GetExceptionCount(), 10)
  209. epAvgResponseTime := strconv.FormatInt(mt.GetAvgResponseTimeMS(), 10)
  210. epApdex := strconv.FormatFloat(mt.GetApdex(), 'f', 2, 64)
  211. epSlowCount := strconv.FormatInt(mt.GetSlowCount(), 10)
  212. epStatementCount := strconv.FormatInt(mt.GetStatementCount(), 10)
  213. endPointResource.Metric = []*pb.NmsRecvMetricRequestBody_Metric{
  214. {
  215. MetricCode: &metricNameEndpointName,
  216. Success: &success,
  217. MetricValue: &endPointMetrics.Endpoint,
  218. },
  219. {
  220. MetricCode: &metricNameEndpointType,
  221. Success: &success,
  222. MetricValue: &metricEndpointType,
  223. },
  224. {
  225. MetricCode: &metricNameCPM,
  226. Success: &success,
  227. MetricValue: &epCPM,
  228. },
  229. {
  230. MetricCode: &metricNameCallCount,
  231. Success: &success,
  232. MetricValue: &cpCallCount,
  233. },
  234. {
  235. MetricCode: &metricNameFailCount,
  236. Success: &success,
  237. MetricValue: &epFailedCount,
  238. },
  239. {
  240. MetricCode: &metricNameExceptionCount,
  241. Success: &success,
  242. MetricValue: &epExceptionCount,
  243. },
  244. {
  245. MetricCode: &metricNameAvgResponseTime,
  246. Success: &success,
  247. MetricValue: &epAvgResponseTime,
  248. },
  249. {
  250. MetricCode: &metricNameApdex,
  251. Success: &success,
  252. MetricValue: &epApdex,
  253. },
  254. {
  255. MetricCode: &metricNameSlowCount,
  256. Success: &success,
  257. MetricValue: &epSlowCount,
  258. },
  259. {
  260. MetricCode: &metricNameStatementCount,
  261. Success: &success,
  262. MetricValue: &epStatementCount,
  263. },
  264. }
  265. endpointResourceType := pb.NmsRecvMetricRequestBody_ResourceType{
  266. ResourceTypeCode: &endpointResourceTypeCode,
  267. Success: &success,
  268. }
  269. endpointResourceType.Resource = []*pb.NmsRecvMetricRequestBody_Resource{&endPointResource}
  270. serviceMetricResource.SubResourceType = append(serviceMetricResource.SubResourceType, &endpointResourceType)
  271. }
  272. }
  273. appResourceType := pb.NmsRecvMetricRequestBody_ResourceType{
  274. ResourceTypeCode: &metricNameService,
  275. Success: &success,
  276. }
  277. appResourceType.Resource = append(appResourceType.Resource, &serviceMetricResource)
  278. appResource := pb.NmsRecvMetricRequestBody_Resource{
  279. CmdbId: &data.CmdbID,
  280. Success: &success,
  281. SubResourceType: []*pb.NmsRecvMetricRequestBody_ResourceType{&appResourceType},
  282. }
  283. return &appResource
  284. }
  285. func (s *TraceKafkaSink) sendMetric(metricData ServiceMetricData, writer *kafka.Writer) {
  286. metricCls := "PERF"
  287. version := "1.0.0"
  288. serviceCode := "apm"
  289. userID := ""
  290. bgID := ""
  291. userToken := ""
  292. loc, _ := time.LoadLocation("Asia/Shanghai")
  293. dtStr := time.Now().In(loc).Format("2006-01-02 15:04:05")
  294. intervalUnitStr := "MI"
  295. intervalValue := int32(5)
  296. requestBody := pb.NmsRecvMetricRequestBody{
  297. MetricCls: &metricCls,
  298. DataTime: &dtStr,
  299. IntervalUnit: &intervalUnitStr,
  300. IntervalValue: &intervalValue,
  301. ResourceType: nil,
  302. }
  303. success := true
  304. resourceTypeCode := "App"
  305. serviceResource := s.parseMetricResources(metricData)
  306. resourceType := pb.NmsRecvMetricRequestBody_ResourceType{
  307. ResourceTypeCode: &resourceTypeCode,
  308. Success: &success,
  309. }
  310. resourceType.Resource = append(resourceType.Resource, serviceResource)
  311. requestBody.ResourceType = append(requestBody.ResourceType, &resourceType)
  312. head := pb.NmsMonMsRequestHead{
  313. Version: &version,
  314. ServiceCode: &serviceCode,
  315. UserId: &userID,
  316. BgId: &bgID,
  317. UserToken: &userToken,
  318. }
  319. nmsRequest := pb.NmsRecvMetricRequest{
  320. Head: &head,
  321. Body: &requestBody,
  322. }
  323. instanceCount := len(metricData.InstanceMetrics)
  324. for _, instanceMetric := range metricData.InstanceMetrics {
  325. for _, endpointMetric := range instanceMetric.EndpointMetrics {
  326. s.metricCounter.With(prometheus.Labels{
  327. "serviceName": metricData.ServiceName,
  328. "instance": instanceMetric.InstanceID,
  329. "endpoint": endpointMetric.Endpoint,
  330. }).Inc()
  331. }
  332. }
  333. s.logger.Infof("[i6000]: [发送] [metric], 时间: %s 包含ServiceMetricResource:serviceName:%s, "+
  334. "instanceCount%d: endpoints:%s", dtStr, metricData.ServiceName, instanceCount, metricData.GetEndPointsStr())
  335. bs, errMarshal := proto.Marshal(&nmsRequest)
  336. if errMarshal != nil {
  337. s.logger.Errorf("marshal nmsReceiveMetric failed: %v", errMarshal)
  338. }
  339. headers := []kafka.Header{
  340. {
  341. Key: "groupID",
  342. Value: []byte(s.rmqMetricGroupID),
  343. },
  344. {
  345. Key: "topic",
  346. Value: []byte(s.rmqMetricTopic),
  347. },
  348. }
  349. msg := kafka.Message{
  350. Value: bs,
  351. Headers: headers,
  352. }
  353. if errWrite := writer.WriteMessages(context.Background(), msg); errWrite != nil {
  354. s.logger.Errorf("write messages failed: %v", errWrite)
  355. }
  356. }
  357. func (s *TraceKafkaSink) sendTrace(topoData TopoData, wr *kafka.Writer) {
  358. dt := time.Now().Format("2006-01-02 15:04:05")
  359. s.topoCounter.With(prometheus.Labels{
  360. "txServiceName": topoData.TxServiceName,
  361. "txEndpoint": topoData.TxEndpoint,
  362. "srcServiceName": topoData.SrcServiceName,
  363. "srcEndpoint": topoData.SrcEndpoint,
  364. "destServiceName": topoData.DestServiceName,
  365. "destEndpoint": topoData.DestEndpoint,
  366. }).Inc()
  367. s.logger.Infof("[i6000]: [发送] [topo], 包含topo:,%s,%s,%s,%s,%s,%s,%s", dt,
  368. topoData.TxServiceName, topoData.TxEndpoint, topoData.SrcServiceName, topoData.SrcEndpoint,
  369. topoData.DestServiceName, topoData.DestEndpoint)
  370. intervalMinuteUnit := "MI"
  371. intervalMinute := int32(5)
  372. destNodeType := pb.NodeType_SERVICE
  373. callCount := strconv.FormatInt(topoData.RequestCount, 10)
  374. failCount := strconv.FormatInt(topoData.FailedCount, 10)
  375. avgResponseTime := strconv.FormatInt(topoData.AvgResponseTime, 10)
  376. txLink := pb.ApmTxLink{
  377. DataTime: &dt,
  378. IntervalUnit: &intervalMinuteUnit,
  379. IntervalValue: &intervalMinute,
  380. TxEndPointName: &topoData.TxEndpoint,
  381. TxServiceCode: &topoData.TxServiceName,
  382. TxCmdbId: &topoData.TxCmdbID,
  383. SrcApmEndpointName: &topoData.SrcEndpoint,
  384. SrcApmServiceCode: &topoData.SrcServiceName,
  385. SrcAppCmdbId: &topoData.SrcCmdbID,
  386. DestNodeType: &destNodeType,
  387. DestApmEndpointName: &topoData.DestEndpoint,
  388. DestApmServiceCode: &topoData.DestServiceName,
  389. DestAppCmdbId: &topoData.DestCmdbID,
  390. APMServiceCallCount: &callCount,
  391. APMServiceErrorCount: &failCount,
  392. APMServiceAvgResponseTime: &avgResponseTime,
  393. }
  394. linkType := pb.LinkMessage_tx
  395. linkMessage := pb.LinkMessage{
  396. ApmTxLink: &txLink,
  397. LinkType: &linkType,
  398. }
  399. bs, errMarshal := proto.Marshal(&linkMessage)
  400. if errMarshal != nil {
  401. s.logger.Errorf("marshal flat trace failed: %v", errMarshal)
  402. }
  403. headers := []kafka.Header{
  404. {
  405. Key: "groupID",
  406. Value: []byte(s.rmqTopoGroupID),
  407. },
  408. {
  409. Key: "topic",
  410. Value: []byte(s.rmqTopoTopic),
  411. },
  412. }
  413. msg := kafka.Message{
  414. Value: bs,
  415. Headers: headers,
  416. }
  417. if errWrite := wr.WriteMessages(context.Background(), msg); errWrite != nil {
  418. s.logger.Errorf("write messages failed: %v", errWrite)
  419. }
  420. }
  421. func (s *TraceKafkaSink) consumeRoutine(ctx context.Context) {
  422. wr := kafka.Writer{
  423. Addr: kafka.TCP(s.kafkaBrokers...),
  424. Topic: s.kafkaTopic,
  425. Balancer: &kafka.LeastBytes{},
  426. WriteTimeout: 10 * time.Second,
  427. BatchSize: 100,
  428. }
  429. loop:
  430. for {
  431. select {
  432. case <-ctx.Done():
  433. break loop
  434. case trace := <-s.topoCh:
  435. s.sendTrace(trace, &wr)
  436. case metricData := <-s.metricChan:
  437. s.sendMetric(metricData, &wr)
  438. }
  439. }
  440. s.logger.Infof("stop consume routine")
  441. }