observe_biz_duration.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. package metrics
  2. import (
  3. "context"
  4. "fmt"
  5. "go-admin/app/observe/models/query"
  6. "go-admin/common/opentelemetry"
  7. "go-admin/common/utils"
  8. "strings"
  9. "sync"
  10. "time"
  11. log "github.com/go-admin-team/go-admin-core/logger"
  12. "github.com/go-admin-team/go-admin-core/sdk/config"
  13. "go.opentelemetry.io/collector/pdata/pcommon"
  14. "go.opentelemetry.io/collector/pdata/ptrace"
  15. conventions "go.opentelemetry.io/collector/semconv/v1.18.0"
  16. "go.opentelemetry.io/otel"
  17. "go.opentelemetry.io/otel/attribute"
  18. "go.opentelemetry.io/otel/metric"
  19. )
  20. type ObserveBizDuration struct {
  21. GaugeValues *sync.Map
  22. metric.Float64Histogram
  23. metric.Float64ObservableGauge
  24. }
  25. func NewObserveBizDuration(name string) ObserveBizDuration {
  26. var err error
  27. result := ObserveBizDuration{
  28. GaugeValues: &sync.Map{},
  29. }
  30. provider := otel.GetMeterProvider()
  31. meter := provider.Meter("observe.consumer.metrics")
  32. h, err := meter.Float64Histogram(
  33. "observe.biz.node.duration",
  34. metric.WithDescription("observe服务端处理时延统计"),
  35. metric.WithUnit("ms"),
  36. )
  37. if err != nil {
  38. panic(err)
  39. }
  40. result.Float64Histogram = h
  41. g, err := meter.Float64ObservableGauge(
  42. "observe.biz.node.duration.max",
  43. metric.WithDescription("observe服务端最大时延统计"),
  44. metric.WithUnit("ms"),
  45. metric.WithFloat64Callback(func(ctx context.Context, fo metric.Float64Observer) error {
  46. result.GaugeValues.Range(func(key, value any) bool {
  47. attrs := key.(metric.MeasurementOption)
  48. duration := value.(float64)
  49. fo.Observe(duration, attrs)
  50. // 如果该attrs如duration没有变化,则删除
  51. // 如果有变化,则必然是一个更大的值,此时不能删除
  52. result.GaugeValues.CompareAndDelete(attrs, duration)
  53. return true
  54. })
  55. log.Infof("gauge回调成功: %s", time.Now())
  56. return nil
  57. }),
  58. )
  59. if err != nil {
  60. panic(err)
  61. }
  62. result.Float64ObservableGauge = g
  63. return result
  64. }
  65. func (h ObserveBizDuration) Record(wg *sync.WaitGroup, ch <-chan struct{}, span ptrace.Span, resource pcommon.Resource) {
  66. defer wg.Done()
  67. defer func() {
  68. <-ch
  69. }()
  70. resAttrs := opentelemetry.NewResourceAttributes(&resource)
  71. spanAttrs := opentelemetry.NewSpanAttributesWithRaw(span.Attributes())
  72. appAlias, _ := resAttrs.AppName() // default UNSET
  73. if appAlias == "UNSET" && config.ApplicationConfig.Mode == "prod" { // 为了降低数据量, 客户生产暂时不再捕获未设置AppAlias的数据
  74. return
  75. }
  76. duration := span.EndTimestamp().AsTime().Sub(span.StartTimestamp().AsTime()).Milliseconds()
  77. serviceName, ok := resource.Attributes().Get(conventions.AttributeServiceName)
  78. if !ok {
  79. log.Errorf("获取service name失败: %s %s", appAlias, span.SpanID().String())
  80. return
  81. }
  82. if serviceName.AsString() == "distribution-rushrepair-job" { // 过滤这个
  83. return
  84. }
  85. spanKind := fmt.Sprintf("SPAN_KIND_%s", strings.ToUpper(span.Kind().String()))
  86. // 通过 appAlias serviceName spanName spanKind计算 bizNodeHash
  87. bizNodeHash := utils.SimpleHash(appAlias, serviceName.AsString(), span.Name(), spanKind)
  88. log.Tracef("%s, %s, %s, %s", appAlias, serviceName.AsString(), span.Name(), spanKind)
  89. bizHash, err := query.NewBizNode().Hash2BizHashFast(appAlias, bizNodeHash)
  90. if err != nil {
  91. log.Errorf("获取biz hash失败: span:%s err:%s", span.SpanID().String(), err.Error())
  92. return
  93. }
  94. if bizHash == "" {
  95. log.Warnf("未获取到biz hash, 可能是由于trace不完整导致的: trace: %s, span: %s", span.TraceID().String(), span.SpanID().String())
  96. }
  97. bizHashs := []string{}
  98. if strings.Contains(bizHash, ",") {
  99. bizHashs = strings.Split(bizHash, ",")
  100. } else {
  101. bizHashs = append(bizHashs, bizHash)
  102. }
  103. log.Tracef("----------- app.alias: %s, biz.hash: %s, biz.node.hash: %s", appAlias, bizHash, bizNodeHash)
  104. for _, bizHash := range bizHashs {
  105. attrs := metric.WithAttributes(
  106. attribute.String("app.alias", appAlias),
  107. attribute.String("biz.hash", bizHash),
  108. attribute.String("biz.node.hash", bizNodeHash),
  109. attribute.String("service.name", serviceName.AsString()),
  110. attribute.String("span.name", span.Name()),
  111. attribute.String("span.kind", spanKind),
  112. attribute.Bool("root", span.ParentSpanID().IsEmpty()),
  113. attribute.Bool("error", span.Status().Code() == ptrace.StatusCodeError || spanAttrs.StatusCode() >= 400),
  114. attribute.Bool("database", spanAttrs.IsDB()),
  115. )
  116. h.Float64Histogram.Record(context.Background(), float64(duration), attrs)
  117. h.setGaugeValue(attrs, float64(duration))
  118. }
  119. }
  120. func (h ObserveBizDuration) setGaugeValue(attrs metric.MeasurementOption, duration float64) {
  121. if value, loaded := h.GaugeValues.LoadOrStore(attrs, duration); loaded {
  122. prevDuration := value.(float64)
  123. if prevDuration < duration {
  124. h.GaugeValues.Store(attrs, duration)
  125. }
  126. }
  127. }