package query import ( "fmt" "go-admin/app/observe/models" "go-admin/common/utils" "math/rand" "strconv" "strings" "sync" "time" "github.com/pkg/errors" ) type biz struct { Query } func NewBiz() biz { q := Query{} q.Init() return biz{q} } const appBizIdKey = "observe__biz_idhash_app_%s" func (q biz) BizHash2ID(hash string) (int64, error) { biz := models.Biz{} if err := q.db.Model(&models.Biz{}). Where("hash", hash). Limit(1).Find(&biz).Error; err != nil { return 0, errors.Wrap(err, "获取biz id失败") } return biz.ID, nil } func (q biz) BizHash2IDCache(appAlias, hash string) (int64, error) { key := fmt.Sprintf(appBizIdKey, appAlias) idstr, err := q.rdb.HGet(key, hash).Result() if err == nil && idstr != "" { id, _ := strconv.ParseInt(idstr, 10, 64) return id, nil } if id, err := q.BizHash2ID(hash); err == nil && id > 0 { q.rdb.HSet(key, hash, id) // 即使是0,也进行set, 为sha // 由于可能存在 初始时未被标记为业务(自动生成)后面又被手动标记为业务的情况,所以设置过期时间防止手动添加业务后没有更新 q.rdb.Expire(key, time.Minute*5) return id, nil } else { return 0, err } } // 添加业务(仅用于自动创建),如果对应业务已存在,直接返回业务id func (q biz) CheckAddBiz(appId int64, appAlias, serviceName, spanName, spanKind string, spanAttrs map[string]string) (int64, string, error) { reqMap := map[string]string{ "GET": "GET请求", "HTTP GET": "GET请求", "POST": "POST请求", "HTTP POST": "POST请求", "PUT": "PUT请求", "HTTP PUT": "PUT请求", "DELETE": "DELETE请求", "HTTP DELETE": "DELETE请求", } // oSpanName := spanName // attr := opentelemetry.NewSpanAttributes(spanAttrs) // spanName = attr.WrapSpanName(spanName, spanKind) hash := utils.SimpleHash(appAlias, serviceName, spanName, spanKind) id, _ := q.BizHash2IDCache(appAlias, hash) if id == 0 { bizName := spanName if spanKind == "SPAN_KIND_CLIENT" { // if name, ok := reqMap[oSpanName]; ok { if name, ok := reqMap[spanName]; ok { bizName = fmt.Sprintf("发送%s", name) } // 暂时不重写 span name, 因为这么写的话, 不容易从 otel_traces 表反推出业务来, 导致计算业务相关的统计信息比较困难, 后面考虑直接从源头修改 span name // if spanName != oSpanName { // method, route, _ := attr.UnWrapSpanName(spanName) // if name, err := NewUrlMapping().UrlToNameCache(appAlias, method, route); err == nil && name != "" { // bizName = name // } // } } else if spanKind == "SPAN_KIND_SERVER" { // 针对格式为 GET /api/v1/xxx 这种格式的span name, 自动获取url对应的名称 if name, ok := reqMap[spanName]; ok { bizName = fmt.Sprintf("接收%s", name) } sns := strings.Split(spanName, " ") if len(sns) == 2 { if _, ok := reqMap[sns[0]]; ok { if name, err := NewUrlMapping().UrlToNameCache(appAlias, sns[0], sns[1]); err == nil && name != "" { bizName = name } } } else if len(sns) == 1 { if strings.HasPrefix(spanName, "/") { if name, err := NewUrlMapping().UrlToNameCache(appAlias, "", spanName); err == nil && name != "" { bizName = name } } } } biz := models.Biz{ Name: bizName, Hash: hash, AppID: appId, AppAlias: appAlias, ServiceName: serviceName, SpanName: spanName, SpanKind: spanKind, IsAutoCreated: 1, } id, _ = q.BizHash2ID(hash) if id > 0 { return id, hash, nil } if err := q.db.Model(&models.Biz{}).Create(&biz).Error; err != nil { return 0, "", err } id = int64(biz.ID) q.rdb.HSet(fmt.Sprintf(appBizIdKey, appAlias), hash, id) } return id, hash, nil } type bizNode struct { Query } func NewBizNode() bizNode { q := Query{} q.Init() return bizNode{q} } func (q bizNode) BizNodeHash2ID(hash string) (int64, error) { id := int64(0) if err := q.db.Model(&models.BizNode{}). Where("hash", hash). Limit(1).Pluck("id", &id).Error; err != nil { return 0, errors.Wrap(err, "查询biz node id失败") } return id, nil } // 获取biz node id func (q bizNode) BizNodeHash2IDCache(appAlias, hash string) (int64, error) { key := fmt.Sprintf("observe__biz_node_idhash_app_%s", appAlias) idstr, _ := q.rdb.HGet(key, hash).Result() if idstr != "" { id, err := strconv.Atoi(idstr) return int64(id), err } var err error id := int64(0) if id, err = q.BizNodeHash2ID(hash); err != nil { return id, err } q.rdb.HSet(key, hash, id) q.rdb.Expire(key, time.Hour*24) // node结点不会发生变化,通常只会增加,所以缓存时间长一些 return id, nil } // 添加结点数据 func (q bizNode) CheckAddBizNode(node *models.BizNode) (int64, error) { hash := node.Hash if hash == "" { hash = utils.SimpleHash(node.AppAlias, node.ServiceName, node.SpanName, node.SpanKind) node.Hash = hash } id, err := q.BizNodeHash2IDCache(node.AppAlias, hash) if err != nil || id == 0 { if id, _ := q.BizNodeHash2ID(hash); id > 0 { return id, nil } // 插入数据库,返回node id todo result := q.db.Model(&models.BizNode{}).Create(&node) if result.Error != nil { return 0, result.Error } id = node.ID // 再执行一次, 缓存 id hash q.BizNodeHash2IDCache(node.AppAlias, hash) } node.ID = id return id, nil } func (q bizNode) Hash2BizHash(appAlias, hash string) (string, error) { id := int64(0) if err := q.db.Model(&models.BizNode{}).Where("app_alias", appAlias).Where("hash", hash).Pluck("id", &id).Error; err != nil { return "", errors.Wrap(err, "获取biz node id失败") } if id == 0 { return "", errors.New("未获取到id:" + hash) } bizHashs := []string{} if err := q.db.Model(&models.BizEdge{}).Distinct("biz_hash").Where("app_alias=? and (source=? or target=?)", appAlias, id, id).Pluck("biz_hash", &bizHashs).Error; err != nil { return "", errors.Wrap(err, "获取biz hash失败") } bizHash := strings.Join(bizHashs, ",") // if bizHash == "" { // return "", errors.New("node hash不存在:" + hash) // } return bizHash, nil } func (q bizNode) Hash2BizHashCache(appAlias, hash string) (string, error) { key := "observe__biz_hash2bizhash_" + appAlias + "_" + hash bizHash, err := q.rdb.Get(key).Result() if err == nil { return bizHash, nil } bizHash, err = q.Hash2BizHash(appAlias, hash) q.rdb.Set(key, bizHash, time.Minute*5) // 即使未正确获取到,也缓存起来,防止重复查询 if err != nil { return "", err } return bizHash, nil } type hashMap struct { ExpireAt time.Time SafeMap *sync.Map } var hash2BizHash sync.Map = sync.Map{} func (q bizNode) Hash2BizHashFast(appAlias, bizNodeHash string) (string, error) { if hmi, ok := hash2BizHash.Load(appAlias); ok { hm := hmi.(*hashMap) if time.Now().Before(hm.ExpireAt) { if t, ok := hm.SafeMap.Load(bizNodeHash); ok { return t.(string), nil } else { return "", errors.New(fmt.Sprintf("缓存中未获取到biz hash, appAlias: %s, bizNodeHash: %s", appAlias, bizNodeHash)) } } } // 分布式锁 lock := fmt.Sprintf("hashmap_lock_%s", appAlias) b, err := q.rdb.SetNX(lock, 1, time.Second*30).Result() if err != nil { return "", errors.Wrap(err, fmt.Sprintf("分布式锁设置失败: %s", appAlias)) } if !b { return "", errors.New(fmt.Sprintf("分布式锁生效中: %s", appAlias)) } defer q.rdb.Del(lock) // 批量读取特定app_alias下的所有业务结点hash idhashs := []struct { Id int64 Hash string }{} if err := q.db.Model(&models.BizNode{}).Select("id, hash").Where("app_alias", appAlias).Find(&idhashs).Error; err != nil { return "", errors.Wrap(err, fmt.Sprintf("获取应用 %s 下的 id hash 失败", appAlias)) } id2hash := map[int64]string{} for _, idhash := range idhashs { id2hash[idhash.Id] = idhash.Hash } idBizHashs := []struct { Source int64 Target int64 BizHash string }{} if err := q.db.Model(&models.BizEdge{}).Select("source, target, biz_hash").Where("app_alias", appAlias).Find(&idBizHashs).Error; err != nil { return "", errors.Wrap(err, "获取biz hash失败") } seconds := time.Second * time.Duration(rand.Intn(600)+300) // 缓存 10~15 分钟, 目的就防止同时失效导致多个应用并发读库 hm := &hashMap{ ExpireAt: time.Now().Add(seconds), SafeMap: &sync.Map{}, } bizHash, err := "", errors.New(fmt.Sprintf("bizNodeHash:%s 找不到对应的 bizHash", bizNodeHash)) for _, idBizHash := range idBizHashs { if hash, ok := id2hash[idBizHash.Source]; ok { hm.SafeMap.Store(hash, idBizHash.BizHash) if hash == bizNodeHash { bizHash, err = idBizHash.BizHash, nil } } if hash, ok := id2hash[idBizHash.Target]; ok { hm.SafeMap.Store(hash, idBizHash.BizHash) if hash == bizNodeHash { bizHash, err = idBizHash.BizHash, nil } } } hash2BizHash.Store(appAlias, hm) return bizHash, err }