123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133 |
- package metrics
- import (
- "context"
- "fmt"
- "go-admin/app/observe/models/query"
- "go-admin/common/opentelemetry"
- "go-admin/common/utils"
- "strings"
- "sync"
- "time"
- log "github.com/go-admin-team/go-admin-core/logger"
- "github.com/go-admin-team/go-admin-core/sdk/config"
- "go.opentelemetry.io/collector/pdata/pcommon"
- "go.opentelemetry.io/collector/pdata/ptrace"
- conventions "go.opentelemetry.io/collector/semconv/v1.18.0"
- "go.opentelemetry.io/otel"
- "go.opentelemetry.io/otel/attribute"
- "go.opentelemetry.io/otel/metric"
- )
- type ObserveBizDuration struct {
- GaugeValues *sync.Map
- metric.Float64Histogram
- metric.Float64ObservableGauge
- }
- func NewObserveBizDuration(name string) ObserveBizDuration {
- var err error
- result := ObserveBizDuration{
- GaugeValues: &sync.Map{},
- }
- provider := otel.GetMeterProvider()
- meter := provider.Meter("observe.consumer.metrics")
- h, err := meter.Float64Histogram(
- "observe.biz.node.duration",
- metric.WithDescription("observe服务端处理时延统计"),
- metric.WithUnit("ms"),
- )
- if err != nil {
- panic(err)
- }
- result.Float64Histogram = h
- g, err := meter.Float64ObservableGauge(
- "observe.biz.node.duration.max",
- metric.WithDescription("observe服务端最大时延统计"),
- metric.WithUnit("ms"),
- metric.WithFloat64Callback(func(ctx context.Context, fo metric.Float64Observer) error {
- result.GaugeValues.Range(func(key, value any) bool {
- attrs := key.(metric.MeasurementOption)
- duration := value.(float64)
- fo.Observe(duration, attrs)
- // 如果该attrs如duration没有变化,则删除
- // 如果有变化,则必然是一个更大的值,此时不能删除
- result.GaugeValues.CompareAndDelete(attrs, duration)
- return true
- })
- log.Infof("gauge回调成功: %s", time.Now())
- return nil
- }),
- )
- if err != nil {
- panic(err)
- }
- result.Float64ObservableGauge = g
- return result
- }
- func (h ObserveBizDuration) Record(wg *sync.WaitGroup, ch <-chan struct{}, span ptrace.Span, resource pcommon.Resource) {
- defer wg.Done()
- defer func() {
- <-ch
- }()
- resAttrs := opentelemetry.NewResourceAttributes(&resource)
- spanAttrs := opentelemetry.NewSpanAttributesWithRaw(span.Attributes())
- appAlias, _ := resAttrs.AppName() // default UNSET
- if appAlias == "UNSET" && config.ApplicationConfig.Mode == "prod" { // 为了降低数据量, 客户生产暂时不再捕获未设置AppAlias的数据
- return
- }
- duration := span.EndTimestamp().AsTime().Sub(span.StartTimestamp().AsTime()).Milliseconds()
- serviceName, ok := resource.Attributes().Get(conventions.AttributeServiceName)
- if !ok {
- log.Errorf("获取service name失败: %s %s", appAlias, span.SpanID().String())
- return
- }
- if serviceName.AsString() == "distribution-rushrepair-job" { // 过滤这个
- return
- }
- spanKind := fmt.Sprintf("SPAN_KIND_%s", strings.ToUpper(span.Kind().String()))
- // 通过 appAlias serviceName spanName spanKind计算 bizNodeHash
- bizNodeHash := utils.SimpleHash(appAlias, serviceName.AsString(), span.Name(), spanKind)
- log.Tracef("%s, %s, %s, %s", appAlias, serviceName.AsString(), span.Name(), spanKind)
- bizHash, err := query.NewBizNode().Hash2BizHashFast(appAlias, bizNodeHash)
- if err != nil {
- log.Errorf("获取biz hash失败: span:%s err:%s", span.SpanID().String(), err.Error())
- return
- }
- if bizHash == "" {
- log.Warnf("未获取到biz hash, 可能是由于trace不完整导致的: trace: %s, span: %s", span.TraceID().String(), span.SpanID().String())
- }
- bizHashs := []string{}
- if strings.Contains(bizHash, ",") {
- bizHashs = strings.Split(bizHash, ",")
- } else {
- bizHashs = append(bizHashs, bizHash)
- }
- log.Tracef("----------- app.alias: %s, biz.hash: %s, biz.node.hash: %s", appAlias, bizHash, bizNodeHash)
- for _, bizHash := range bizHashs {
- attrs := metric.WithAttributes(
- attribute.String("app.alias", appAlias),
- attribute.String("biz.hash", bizHash),
- attribute.String("biz.node.hash", bizNodeHash),
- attribute.String("service.name", serviceName.AsString()),
- attribute.String("span.name", span.Name()),
- attribute.String("span.kind", spanKind),
- attribute.Bool("root", span.ParentSpanID().IsEmpty()),
- attribute.Bool("error", span.Status().Code() == ptrace.StatusCodeError || spanAttrs.StatusCode() >= 400),
- attribute.Bool("database", spanAttrs.IsDB()),
- )
- h.Float64Histogram.Record(context.Background(), float64(duration), attrs)
- h.setGaugeValue(attrs, float64(duration))
- }
- }
- func (h ObserveBizDuration) setGaugeValue(attrs metric.MeasurementOption, duration float64) {
- if value, loaded := h.GaugeValues.LoadOrStore(attrs, duration); loaded {
- prevDuration := value.(float64)
- if prevDuration < duration {
- h.GaugeValues.Store(attrs, duration)
- }
- }
- }
|