123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296 |
- package query
- import (
- "fmt"
- "go-admin/app/observe/models"
- "go-admin/common/utils"
- "math/rand"
- "strconv"
- "strings"
- "sync"
- "time"
- "github.com/pkg/errors"
- )
- type biz struct {
- Query
- }
- func NewBiz() biz {
- q := Query{}
- q.Init()
- return biz{q}
- }
// appBizIdKey is the format string for the per-application Redis hash that maps
// a biz hash to its id. The single %s verb receives the application alias.
const appBizIdKey = "observe__biz_idhash_app_%s"
- func (q biz) BizHash2ID(hash string) (int64, error) {
- biz := models.Biz{}
- if err := q.db.Model(&models.Biz{}).
- Where("hash", hash).
- Limit(1).Find(&biz).Error; err != nil {
- return 0, errors.Wrap(err, "获取biz id失败")
- }
- return biz.ID, nil
- }
- func (q biz) BizHash2IDCache(appAlias, hash string) (int64, error) {
- key := fmt.Sprintf(appBizIdKey, appAlias)
- idstr, err := q.rdb.HGet(key, hash).Result()
- if err == nil && idstr != "" {
- id, _ := strconv.ParseInt(idstr, 10, 64)
- return id, nil
- }
- if id, err := q.BizHash2ID(hash); err == nil && id > 0 {
- q.rdb.HSet(key, hash, id) // 即使是0,也进行set, 为sha
- // 由于可能存在 初始时未被标记为业务(自动生成)后面又被手动标记为业务的情况,所以设置过期时间防止手动添加业务后没有更新
- q.rdb.Expire(key, time.Minute*5)
- return id, nil
- } else {
- return 0, err
- }
- }
- // 添加业务(仅用于自动创建),如果对应业务已存在,直接返回业务id
- func (q biz) CheckAddBiz(appId int64, appAlias, serviceName, spanName, spanKind string, spanAttrs map[string]string) (int64, string, error) {
- reqMap := map[string]string{
- "GET": "GET请求",
- "HTTP GET": "GET请求",
- "POST": "POST请求",
- "HTTP POST": "POST请求",
- "PUT": "PUT请求",
- "HTTP PUT": "PUT请求",
- "DELETE": "DELETE请求",
- "HTTP DELETE": "DELETE请求",
- }
- // oSpanName := spanName
- // attr := opentelemetry.NewSpanAttributes(spanAttrs)
- // spanName = attr.WrapSpanName(spanName, spanKind)
- hash := utils.SimpleHash(appAlias, serviceName, spanName, spanKind)
- id, _ := q.BizHash2IDCache(appAlias, hash)
- if id == 0 {
- bizName := spanName
- if spanKind == "SPAN_KIND_CLIENT" {
- // if name, ok := reqMap[oSpanName]; ok {
- if name, ok := reqMap[spanName]; ok {
- bizName = fmt.Sprintf("发送%s", name)
- }
- // 暂时不重写 span name, 因为这么写的话, 不容易从 otel_traces 表反推出业务来, 导致计算业务相关的统计信息比较困难, 后面考虑直接从源头修改 span name
- // if spanName != oSpanName {
- // method, route, _ := attr.UnWrapSpanName(spanName)
- // if name, err := NewUrlMapping().UrlToNameCache(appAlias, method, route); err == nil && name != "" {
- // bizName = name
- // }
- // }
- } else if spanKind == "SPAN_KIND_SERVER" { // 针对格式为 GET /api/v1/xxx 这种格式的span name, 自动获取url对应的名称
- if name, ok := reqMap[spanName]; ok {
- bizName = fmt.Sprintf("接收%s", name)
- }
- sns := strings.Split(spanName, " ")
- if len(sns) == 2 {
- if _, ok := reqMap[sns[0]]; ok {
- if name, err := NewUrlMapping().UrlToNameCache(appAlias, sns[0], sns[1]); err == nil && name != "" {
- bizName = name
- }
- }
- } else if len(sns) == 1 {
- if strings.HasPrefix(spanName, "/") {
- if name, err := NewUrlMapping().UrlToNameCache(appAlias, "", spanName); err == nil && name != "" {
- bizName = name
- }
- }
- }
- }
- biz := models.Biz{
- Name: bizName,
- Hash: hash,
- AppID: appId,
- AppAlias: appAlias,
- ServiceName: serviceName,
- SpanName: spanName,
- SpanKind: spanKind,
- IsAutoCreated: 1,
- }
- id, _ = q.BizHash2ID(hash)
- if id > 0 {
- return id, hash, nil
- }
- if err := q.db.Model(&models.Biz{}).Create(&biz).Error; err != nil {
- return 0, "", err
- }
- id = int64(biz.ID)
- q.rdb.HSet(fmt.Sprintf(appBizIdKey, appAlias), hash, id)
- }
- return id, hash, nil
- }
- type bizNode struct {
- Query
- }
- func NewBizNode() bizNode {
- q := Query{}
- q.Init()
- return bizNode{q}
- }
- func (q bizNode) BizNodeHash2ID(hash string) (int64, error) {
- id := int64(0)
- if err := q.db.Model(&models.BizNode{}).
- Where("hash", hash).
- Limit(1).Pluck("id", &id).Error; err != nil {
- return 0, errors.Wrap(err, "查询biz node id失败")
- }
- return id, nil
- }
- // 获取biz node id
- func (q bizNode) BizNodeHash2IDCache(appAlias, hash string) (int64, error) {
- key := fmt.Sprintf("observe__biz_node_idhash_app_%s", appAlias)
- idstr, _ := q.rdb.HGet(key, hash).Result()
- if idstr != "" {
- id, err := strconv.Atoi(idstr)
- return int64(id), err
- }
- var err error
- id := int64(0)
- if id, err = q.BizNodeHash2ID(hash); err != nil {
- return id, err
- }
- q.rdb.HSet(key, hash, id)
- q.rdb.Expire(key, time.Hour*24) // node结点不会发生变化,通常只会增加,所以缓存时间长一些
- return id, nil
- }
- // 添加结点数据
- func (q bizNode) CheckAddBizNode(node *models.BizNode) (int64, error) {
- hash := node.Hash
- if hash == "" {
- hash = utils.SimpleHash(node.AppAlias, node.ServiceName, node.SpanName, node.SpanKind)
- node.Hash = hash
- }
- id, err := q.BizNodeHash2IDCache(node.AppAlias, hash)
- if err != nil || id == 0 {
- if id, _ := q.BizNodeHash2ID(hash); id > 0 {
- return id, nil
- }
- // 插入数据库,返回node id todo
- result := q.db.Model(&models.BizNode{}).Create(&node)
- if result.Error != nil {
- return 0, result.Error
- }
- id = node.ID
- // 再执行一次, 缓存 id hash
- q.BizNodeHash2IDCache(node.AppAlias, hash)
- }
- node.ID = id
- return id, nil
- }
- func (q bizNode) Hash2BizHash(appAlias, hash string) (string, error) {
- id := int64(0)
- if err := q.db.Model(&models.BizNode{}).Where("app_alias", appAlias).Where("hash", hash).Pluck("id", &id).Error; err != nil {
- return "", errors.Wrap(err, "获取biz node id失败")
- }
- if id == 0 {
- return "", errors.New("未获取到id:" + hash)
- }
- bizHashs := []string{}
- if err := q.db.Model(&models.BizEdge{}).Distinct("biz_hash").Where("app_alias=? and (source=? or target=?)", appAlias, id, id).Pluck("biz_hash", &bizHashs).Error; err != nil {
- return "", errors.Wrap(err, "获取biz hash失败")
- }
- bizHash := strings.Join(bizHashs, ",")
- // if bizHash == "" {
- // return "", errors.New("node hash不存在:" + hash)
- // }
- return bizHash, nil
- }
- func (q bizNode) Hash2BizHashCache(appAlias, hash string) (string, error) {
- key := "observe__biz_hash2bizhash_" + appAlias + "_" + hash
- bizHash, err := q.rdb.Get(key).Result()
- if err == nil {
- return bizHash, nil
- }
- bizHash, err = q.Hash2BizHash(appAlias, hash)
- q.rdb.Set(key, bizHash, time.Minute*5) // 即使未正确获取到,也缓存起来,防止重复查询
- if err != nil {
- return "", err
- }
- return bizHash, nil
- }
// hashMap is one cached lookup table together with its expiry time.
type hashMap struct {
	// ExpireAt is the instant after which the table is no longer used.
	ExpireAt time.Time
	// SafeMap holds the cached entries.
	SafeMap *sync.Map
}
// hash2BizHash is the process-wide cache used by Hash2BizHashFast. It maps an
// application alias to the *hashMap built for that application.
var hash2BizHash sync.Map
- func (q bizNode) Hash2BizHashFast(appAlias, bizNodeHash string) (string, error) {
- if hmi, ok := hash2BizHash.Load(appAlias); ok {
- hm := hmi.(*hashMap)
- if time.Now().Before(hm.ExpireAt) {
- if t, ok := hm.SafeMap.Load(bizNodeHash); ok {
- return t.(string), nil
- } else {
- return "", errors.New(fmt.Sprintf("缓存中未获取到biz hash, appAlias: %s, bizNodeHash: %s", appAlias, bizNodeHash))
- }
- }
- }
- // 分布式锁
- lock := fmt.Sprintf("hashmap_lock_%s", appAlias)
- b, err := q.rdb.SetNX(lock, 1, time.Second*30).Result()
- if err != nil {
- return "", errors.Wrap(err, fmt.Sprintf("分布式锁设置失败: %s", appAlias))
- }
- if !b {
- return "", errors.New(fmt.Sprintf("分布式锁生效中: %s", appAlias))
- }
- defer q.rdb.Del(lock)
- // 批量读取特定app_alias下的所有业务结点hash
- idhashs := []struct {
- Id int64
- Hash string
- }{}
- if err := q.db.Model(&models.BizNode{}).Select("id, hash").Where("app_alias", appAlias).Find(&idhashs).Error; err != nil {
- return "", errors.Wrap(err, fmt.Sprintf("获取应用 %s 下的 id hash 失败", appAlias))
- }
- id2hash := map[int64]string{}
- for _, idhash := range idhashs {
- id2hash[idhash.Id] = idhash.Hash
- }
- idBizHashs := []struct {
- Source int64
- Target int64
- BizHash string
- }{}
- if err := q.db.Model(&models.BizEdge{}).Select("source, target, biz_hash").Where("app_alias", appAlias).Find(&idBizHashs).Error; err != nil {
- return "", errors.Wrap(err, "获取biz hash失败")
- }
- seconds := time.Second * time.Duration(rand.Intn(600)+300) // 缓存 10~15 分钟, 目的就防止同时失效导致多个应用并发读库
- hm := &hashMap{
- ExpireAt: time.Now().Add(seconds),
- SafeMap: &sync.Map{},
- }
- bizHash, err := "", errors.New(fmt.Sprintf("bizNodeHash:%s 找不到对应的 bizHash", bizNodeHash))
- for _, idBizHash := range idBizHashs {
- if hash, ok := id2hash[idBizHash.Source]; ok {
- hm.SafeMap.Store(hash, idBizHash.BizHash)
- if hash == bizNodeHash {
- bizHash, err = idBizHash.BizHash, nil
- }
- }
- if hash, ok := id2hash[idBizHash.Target]; ok {
- hm.SafeMap.Store(hash, idBizHash.BizHash)
- if hash == bizNodeHash {
- bizHash, err = idBizHash.BizHash, nil
- }
- }
- }
- hash2BizHash.Store(appAlias, hm)
- return bizHash, err
- }
|