package metrics

import (
	"context"
	"fmt"
	"sync"
	"time"

	"go-admin/common/database"
	"go-admin/common/opentelemetry"
	"go-admin/common/storage"
	extConfig "go-admin/config"

	"github.com/go-admin-team/go-admin-core/config/source/file"
	log "github.com/go-admin-team/go-admin-core/logger"
	"github.com/go-admin-team/go-admin-core/sdk/config"
	"github.com/panjf2000/ants/v2"
	"github.com/segmentio/kafka-go"
	"github.com/spf13/cobra"
	"go.opentelemetry.io/collector/pdata/ptrace"
)
var configYml string
var groupId string
var observeServerDuration ObserveServerDuration
var observeBizDuration ObserveBizDuration

const METER_NAME = "observe.consumer.metrics"
const DEFAULT_GROUP_ID = "observe-consumer-metrics-group"
// MetricsCmd consumes trace data exported by the otel-collector from Kafka and
// aggregates it into duration metrics.
var MetricsCmd = &cobra.Command{
	Use:   "metrics",
	Short: "Consume otel-collector output from Kafka and aggregate metrics",
	PreRun: func(cmd *cobra.Command, args []string) {
		config.ExtendConfig = &extConfig.ExtConfig
		config.Setup(
			file.NewSource(file.WithPath(configYml)),
			database.SetupWithoutOtel,
			storage.Setup,
			// olap.Setup,
			opentelemetry.Setup,
		)
		observeServerDuration = NewObserveServerDuration("observe.server.duration")
		observeBizDuration = NewObserveBizDuration("observe.biz.duration")
	},
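	// Cobra runs PreRun before Run, so the settings file is already loaded and
	// extConfig.ExtConfig.Kafka is populated by the time Run reads it.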
	Run: func(cmd *cobra.Command, args []string) {
		// Fall back to a fixed consumer group id when none is configured.
		groupId = extConfig.ExtConfig.Kafka.MetricsGroupId
		if groupId == "" {
			groupId = DEFAULT_GROUP_ID
			// // Automatically derive a group id via Redis (kept for reference):
			// rdb := config.GetRedisClient()
			// key := "observe-consumer-metrics-group"
			// locker := fmt.Sprintf("%s:locker", key)
			// groupId, err := rdb.Get(key).Result()
			// if err != nil {
			// 	log.Errorf("failed to get group id: %s", err)
			// 	return
			// }
			// if groupId == "" {
			// 	b, err := rdb.SetNX(locker, 1, time.Second*3).Result()
			// 	if err != nil {
			// 		log.Errorf("failed to acquire distributed lock for group id: %s", err)
			// 		return
			// 	}
			// 	if b {
			// 		groupId = fmt.Sprintf("%s-%s", key, time.Now().Format("200601021504"))
			// 		rdb.Set(key, groupId, 0)
			// 	} else {
			// 		time.Sleep(time.Second * 3)
			// 	}
			// 	groupId, _ := rdb.Get(key).Result()
			// 	if groupId == "" {
			// 		log.Errorf("failed to get group id after waiting for lock: %s", err)
			// 		return
			// 	}
			// }
		}
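		// All consumers below join the same Kafka consumer group (groupId), so
		// partitions are balanced across them and committed offsets survive restarts.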
		var maxConsumerNum = extConfig.ExtConfig.Kafka.Consumers
		var wg sync.WaitGroup
		var p *ants.Pool
		task := func() {
			defer wg.Done()
			run()
		}
		// A panicked consumer (e.g. a failed Kafka read) is logged and resubmitted,
		// keeping the number of running consumers at maxConsumerNum.
		p, err := ants.NewPool(maxConsumerNum, ants.WithPanicHandler(func(i interface{}) {
			log.Errorf("consumer goroutine task panicked: %s, %d goroutines currently running", i, p.Running())
			wg.Add(1)
			if err := p.Submit(task); err == nil {
				log.Infof("task resubmitted, %d goroutines currently running", p.Running())
			} else {
				wg.Done()
				log.Errorf("failed to resubmit task: %s", err.Error())
			}
		}))
		if err != nil {
			log.Fatalf("failed to create ants pool: %s", err)
			return
		}
		defer p.Release()
		go func() {
			for range time.Tick(time.Second * 3) {
				log.Infof("%d goroutines currently running", p.Running())
			}
		}()
		for i := 0; i < maxConsumerNum; i++ {
			wg.Add(1)
			if err := p.Submit(task); err == nil {
				log.Infof("submitted consumer task %d", i)
			} else {
				// Undo the Add so wg.Wait() does not block on a task that never started.
				wg.Done()
				log.Errorf("failed to submit consumer task %d: %s", i, err)
			}
		}
		wg.Wait()
	},
}
func init() {
	MetricsCmd.PersistentFlags().StringVarP(&configYml, "config", "c", "config/settings.yml", "configuration file to load on startup; defaults to config/settings.yml")
}
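// Typical invocation, assuming MetricsCmd is registered on the project's root
// cobra command (that wiring lives outside this file):
//
//	go-admin metrics -c config/settings.yml
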
// run starts a single blocking Kafka consumer that reads trace batches exported
// by the otel-collector and records duration metrics for every span.
func run() {
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers:     extConfig.ExtConfig.Kafka.Brokers,
		Topic:       extConfig.ExtConfig.Kafka.Topic,
		MaxBytes:    10e6, // 10MB
		StartOffset: kafka.LastOffset,
		GroupID:     groupId,
		// CommitInterval: time.Second,
	})
	defer r.Close()
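	// With GroupID set, StartOffset only applies to partitions that have no
	// committed offset yet: kafka.LastOffset makes a brand-new group start at the
	// tail of the topic instead of replaying history.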
	for {
		start := time.Now()
		msg, err := r.ReadMessage(context.Background())
		if err != nil {
			// Panic so the ants panic handler restarts this consumer with a fresh reader.
			panic(fmt.Sprintf("failed to read from kafka: %s", err))
			// log.Errorf("failed to read from kafka: %s", err)
			// break
		}
		consumeDuration := time.Since(start).Milliseconds()
		var jsonUnmarshaler ptrace.JSONUnmarshaler
		td, err := jsonUnmarshaler.UnmarshalTraces(msg.Value)
		if err != nil {
			log.Errorf("failed to unmarshal kafka message: %s", err)
			continue
		}
		scopeTotal, spanTotal := 0, 0
		wg := sync.WaitGroup{}
		max := extConfig.ExtConfig.Kafka.MaxConcurrency
		if max == 0 {
			max = 20
		}
		// Buffered channel used as a semaphore that caps in-flight Record goroutines at max.
		ch := make(chan struct{}, max)
		for i := 0; i < td.ResourceSpans().Len(); i++ {
			spans := td.ResourceSpans().At(i)
			res := spans.Resource()
			// serviceName, _ := res.Attributes().Get("service.name")
			scopeLen := spans.ScopeSpans().Len()
			scopeTotal += scopeLen
			// spanLen := 0
			for j := 0; j < scopeLen; j++ {
				rs := spans.ScopeSpans().At(j).Spans()
				// spanLen += rs.Len()
				spanTotal += rs.Len()
				for k := 0; k < rs.Len(); k++ {
					r := rs.At(k)
					// Acquire one semaphore slot per Record goroutine; each Record call is
					// expected to release its slot and call wg.Done().
					ch <- struct{}{}
					ch <- struct{}{}
					// log.Infof("record concurrency %d", len(ch))
					wg.Add(2)
					go observeServerDuration.Record(&wg, ch, r, res)
					go observeBizDuration.Record(&wg, ch, r, res)
				}
			}
			// log.Infof("running... service.name:%s, scope: %d, span: %d", serviceName.AsString(), scopeLen, spanLen)
		}
		wg.Wait()
		// log.Infof("topic: %s, partition: %d, offset: %d, scopes: %d, spans: %d", msg.Topic, msg.Partition, msg.Offset, scopeTotal, spanTotal)
		totalDuration := time.Since(start).Milliseconds()
		log.Infof("consume took %d ms, processing took %d ms, total %d ms, spans: %d, partition: %d, offset: %d",
			consumeDuration, totalDuration-consumeDuration, totalDuration, spanTotal, msg.Partition, msg.Offset)
	}
}
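// The ObserveServerDuration / ObserveBizDuration types and their Record methods
// are defined elsewhere in this package. Judging from how run() calls them, each
// Record is expected to follow roughly this shape (a sketch of the assumed
// contract, not the actual implementation): it must release its semaphore slot
// and mark the WaitGroup done, otherwise the consume loop above would deadlock.
//
//	func (o ObserveServerDuration) Record(wg *sync.WaitGroup, ch chan struct{}, span ptrace.Span, res pcommon.Resource) {
//		defer wg.Done()
//		defer func() { <-ch }() // free one concurrency slot
//		// ...extract attributes from span/res and record the duration metric...
//	}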