metrics.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191
  1. package metrics
  2. import (
  3. "context"
  4. "fmt"
  5. "go-admin/common/database"
  6. "go-admin/common/opentelemetry"
  7. "go-admin/common/storage"
  8. extConfig "go-admin/config"
  9. "sync"
  10. "time"
  11. log "github.com/go-admin-team/go-admin-core/logger"
  12. "github.com/panjf2000/ants/v2"
  13. "github.com/go-admin-team/go-admin-core/config/source/file"
  14. "github.com/go-admin-team/go-admin-core/sdk/config"
  15. "github.com/segmentio/kafka-go"
  16. "github.com/spf13/cobra"
  17. "go.opentelemetry.io/collector/pdata/ptrace"
  18. )
  19. var configYml string
  20. var groupId string
  21. var observeServerDuration ObserveServerDuration
  22. var observeBizDuration ObserveBizDuration
  23. const METER_NAME = "observe.consumer.metrics"
  24. const DEFAULT_GROUP_ID = "observe-consumer-metrics-group"
  25. // consumer/biz/bizCmd represents the consumer/biz/biz command
  26. var MetricsCmd = &cobra.Command{
  27. Use: "metrics",
  28. Short: "消费otel-collector, 用于metrics统计",
  29. PreRun: func(cmd *cobra.Command, args []string) {
  30. config.ExtendConfig = &extConfig.ExtConfig
  31. config.Setup(
  32. file.NewSource(file.WithPath(configYml)),
  33. database.SetupWithoutOtel,
  34. storage.Setup,
  35. // olap.Setup,
  36. opentelemetry.Setup,
  37. )
  38. observeServerDuration = NewObserveServerDuration("observe.server.duration")
  39. observeBizDuration = NewObserveBizDuration("observe.biz.duration")
  40. },
  41. Run: func(cmd *cobra.Command, args []string) {
  42. // run()
  43. groupId = extConfig.ExtConfig.Kafka.MetricsGroupId
  44. if groupId == "" {
  45. groupId = DEFAULT_GROUP_ID
  46. // // 自动设置一个groupId
  47. // rdb := config.GetRedisClient()
  48. // key := "observe-consumer-metrics-group"
  49. // locker := fmt.Sprintf("%s:locker", key)
  50. // groupId, err := rdb.Get(key).Result()
  51. // if err != nil {
  52. // log.Errorf("获取group id失败: %s", err)
  53. // return
  54. // }
  55. // if groupId == "" {
  56. // b, err := rdb.SetNX(locker, 1, time.Second*3).Result()
  57. // if err != nil {
  58. // log.Errorf("设置group id分布式锁失败: %s", err)
  59. // return
  60. // }
  61. // if b {
  62. // groupId = fmt.Sprintf("%s-%s", key, time.Now().Format("200601021504"))
  63. // rdb.Set(key, groupId, 0)
  64. // } else {
  65. // time.Sleep(time.Second * 3)
  66. // }
  67. // groupId, _ := rdb.Get(key).Result()
  68. // if groupId == "" {
  69. // log.Errorf("获取group id失败2: %s", err)
  70. // return
  71. // }
  72. // }
  73. }
  74. var maxConsumerNum = extConfig.ExtConfig.Kafka.Consumers
  75. var wg sync.WaitGroup
  76. var p *ants.Pool
  77. task := func() {
  78. defer wg.Done()
  79. run()
  80. }
  81. p, err := ants.NewPool(maxConsumerNum, ants.WithPanicHandler(func(i interface{}) {
  82. log.Errorf("消费者 goroutine task 出现异常: %s, 当前共有%d个goroutine在运行", i, p.Running())
  83. wg.Add(1)
  84. if err := p.Submit(task); err == nil {
  85. log.Infof("重新提交任务成功,当前共计有%d个goroutine在运行", p.Running())
  86. } else {
  87. log.Errorf("重新提交任务失败: %s", err.Error())
  88. }
  89. }))
  90. if err != nil {
  91. log.Fatalf("创建ants pool失败: %s", err)
  92. return
  93. }
  94. defer p.Release()
  95. go func() {
  96. for range time.Tick(time.Second * 3) {
  97. log.Infof("当前共计有%d个goroutine在运行", p.Running())
  98. }
  99. }()
  100. for i := 0; i < maxConsumerNum; i++ {
  101. wg.Add(1)
  102. if err := p.Submit(task); err == nil {
  103. log.Infof("成功提交第%d个任务", i)
  104. }
  105. }
  106. wg.Wait()
  107. },
  108. }
  109. func init() {
  110. MetricsCmd.PersistentFlags().StringVarP(&configYml, "config", "c", "config/settings.yml", "启动命令时指定的配置文件, 默认为config/settings.yml")
  111. }
  112. func run() {
  113. r := kafka.NewReader(kafka.ReaderConfig{
  114. Brokers: extConfig.ExtConfig.Kafka.Brokers,
  115. Topic: extConfig.ExtConfig.Kafka.Topic,
  116. MaxBytes: 10e6, // 10MB
  117. StartOffset: kafka.LastOffset,
  118. GroupID: groupId,
  119. // CommitInterval: time.Second,
  120. })
  121. defer r.Close()
  122. for {
  123. start := time.Now()
  124. msg, err := r.ReadMessage(context.Background())
  125. if err != nil {
  126. panic(fmt.Sprintf("消费kafka失败:%s", err))
  127. // log.Errorf("消费kafka失败:%s", err)
  128. // break
  129. }
  130. consumeDuration := time.Since(start).Milliseconds()
  131. var td = ptrace.NewTraces()
  132. var jsonUnmarshaler ptrace.JSONUnmarshaler
  133. td, err = jsonUnmarshaler.UnmarshalTraces(msg.Value)
  134. if err != nil {
  135. log.Errorf("解析kafka消息失败: %s", err)
  136. continue
  137. }
  138. scopeTotal, spanTotal := 0, 0
  139. wg := sync.WaitGroup{}
  140. max := extConfig.ExtConfig.Kafka.MaxConcurrency
  141. if max == 0 {
  142. max = 20
  143. }
  144. ch := make(chan struct{}, max)
  145. for i := 0; i < td.ResourceSpans().Len(); i++ {
  146. spans := td.ResourceSpans().At(i)
  147. res := spans.Resource()
  148. // serviceName, _ := res.Attributes().Get("service.name")
  149. scopeLen := spans.ScopeSpans().Len()
  150. scopeTotal += scopeLen
  151. // spanLen := 0
  152. for j := 0; j < scopeLen; j++ {
  153. rs := spans.ScopeSpans().At(j).Spans()
  154. // spanLen += rs.Len()
  155. spanTotal += rs.Len()
  156. for k := 0; k < rs.Len(); k++ {
  157. r := rs.At(k)
  158. ch <- struct{}{}
  159. ch <- struct{}{}
  160. // log.Infof("record 并发数量 %d", len(ch))
  161. wg.Add(2)
  162. go observeServerDuration.Record(&wg, ch, r, res)
  163. go observeBizDuration.Record(&wg, ch, r, res)
  164. }
  165. }
  166. // log.Infof("running... service.name:%s, scope: %d, span: %d", serviceName.AsString(), scopeLen, spanLen)
  167. }
  168. wg.Wait()
  169. // log.Infof("topic: %s, partition: %d, offset: %d, scopes: %d, spans: %d", msg.Topic, msg.Partition, msg.Offset, scopeTotal, spanTotal)
  170. totalDuration := time.Since(start).Milliseconds()
  171. log.Infof("消费耗时 %d ms, 处理耗时 %d ms, 总共耗时 %d ms, spans: %d, partition: %d, offset: %d", consumeDuration, totalDuration-consumeDuration, totalDuration, spanTotal, msg.Partition, msg.Offset)
  172. }
  173. }