biz.query.go 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296
  1. package query
  2. import (
  3. "fmt"
  4. "go-admin/app/observe/models"
  5. "go-admin/common/utils"
  6. "math/rand"
  7. "strconv"
  8. "strings"
  9. "sync"
  10. "time"
  11. "github.com/pkg/errors"
  12. )
  13. type biz struct {
  14. Query
  15. }
  16. func NewBiz() biz {
  17. q := Query{}
  18. q.Init()
  19. return biz{q}
  20. }
  21. const appBizIdKey = "observe__biz_idhash_app_%s"
  22. func (q biz) BizHash2ID(hash string) (int64, error) {
  23. biz := models.Biz{}
  24. if err := q.db.Model(&models.Biz{}).
  25. Where("hash", hash).
  26. Limit(1).Find(&biz).Error; err != nil {
  27. return 0, errors.Wrap(err, "获取biz id失败")
  28. }
  29. return biz.ID, nil
  30. }
  31. func (q biz) BizHash2IDCache(appAlias, hash string) (int64, error) {
  32. key := fmt.Sprintf(appBizIdKey, appAlias)
  33. idstr, err := q.rdb.HGet(key, hash).Result()
  34. if err == nil && idstr != "" {
  35. id, _ := strconv.ParseInt(idstr, 10, 64)
  36. return id, nil
  37. }
  38. if id, err := q.BizHash2ID(hash); err == nil && id > 0 {
  39. q.rdb.HSet(key, hash, id) // 即使是0,也进行set, 为sha
  40. // 由于可能存在 初始时未被标记为业务(自动生成)后面又被手动标记为业务的情况,所以设置过期时间防止手动添加业务后没有更新
  41. q.rdb.Expire(key, time.Minute*5)
  42. return id, nil
  43. } else {
  44. return 0, err
  45. }
  46. }
  47. // 添加业务(仅用于自动创建),如果对应业务已存在,直接返回业务id
  48. func (q biz) CheckAddBiz(appId int64, appAlias, serviceName, spanName, spanKind string, spanAttrs map[string]string) (int64, string, error) {
  49. reqMap := map[string]string{
  50. "GET": "GET请求",
  51. "HTTP GET": "GET请求",
  52. "POST": "POST请求",
  53. "HTTP POST": "POST请求",
  54. "PUT": "PUT请求",
  55. "HTTP PUT": "PUT请求",
  56. "DELETE": "DELETE请求",
  57. "HTTP DELETE": "DELETE请求",
  58. }
  59. // oSpanName := spanName
  60. // attr := opentelemetry.NewSpanAttributes(spanAttrs)
  61. // spanName = attr.WrapSpanName(spanName, spanKind)
  62. hash := utils.SimpleHash(appAlias, serviceName, spanName, spanKind)
  63. id, _ := q.BizHash2IDCache(appAlias, hash)
  64. if id == 0 {
  65. bizName := spanName
  66. if spanKind == "SPAN_KIND_CLIENT" {
  67. // if name, ok := reqMap[oSpanName]; ok {
  68. if name, ok := reqMap[spanName]; ok {
  69. bizName = fmt.Sprintf("发送%s", name)
  70. }
  71. // 暂时不重写 span name, 因为这么写的话, 不容易从 otel_traces 表反推出业务来, 导致计算业务相关的统计信息比较困难, 后面考虑直接从源头修改 span name
  72. // if spanName != oSpanName {
  73. // method, route, _ := attr.UnWrapSpanName(spanName)
  74. // if name, err := NewUrlMapping().UrlToNameCache(appAlias, method, route); err == nil && name != "" {
  75. // bizName = name
  76. // }
  77. // }
  78. } else if spanKind == "SPAN_KIND_SERVER" { // 针对格式为 GET /api/v1/xxx 这种格式的span name, 自动获取url对应的名称
  79. if name, ok := reqMap[spanName]; ok {
  80. bizName = fmt.Sprintf("接收%s", name)
  81. }
  82. sns := strings.Split(spanName, " ")
  83. if len(sns) == 2 {
  84. if _, ok := reqMap[sns[0]]; ok {
  85. if name, err := NewUrlMapping().UrlToNameCache(appAlias, sns[0], sns[1]); err == nil && name != "" {
  86. bizName = name
  87. }
  88. }
  89. } else if len(sns) == 1 {
  90. if strings.HasPrefix(spanName, "/") {
  91. if name, err := NewUrlMapping().UrlToNameCache(appAlias, "", spanName); err == nil && name != "" {
  92. bizName = name
  93. }
  94. }
  95. }
  96. }
  97. biz := models.Biz{
  98. Name: bizName,
  99. Hash: hash,
  100. AppID: appId,
  101. AppAlias: appAlias,
  102. ServiceName: serviceName,
  103. SpanName: spanName,
  104. SpanKind: spanKind,
  105. IsAutoCreated: 1,
  106. }
  107. id, _ = q.BizHash2ID(hash)
  108. if id > 0 {
  109. return id, hash, nil
  110. }
  111. if err := q.db.Model(&models.Biz{}).Create(&biz).Error; err != nil {
  112. return 0, "", err
  113. }
  114. id = int64(biz.ID)
  115. q.rdb.HSet(fmt.Sprintf(appBizIdKey, appAlias), hash, id)
  116. }
  117. return id, hash, nil
  118. }
  119. type bizNode struct {
  120. Query
  121. }
  122. func NewBizNode() bizNode {
  123. q := Query{}
  124. q.Init()
  125. return bizNode{q}
  126. }
  127. func (q bizNode) BizNodeHash2ID(hash string) (int64, error) {
  128. id := int64(0)
  129. if err := q.db.Model(&models.BizNode{}).
  130. Where("hash", hash).
  131. Limit(1).Pluck("id", &id).Error; err != nil {
  132. return 0, errors.Wrap(err, "查询biz node id失败")
  133. }
  134. return id, nil
  135. }
  136. // 获取biz node id
  137. func (q bizNode) BizNodeHash2IDCache(appAlias, hash string) (int64, error) {
  138. key := fmt.Sprintf("observe__biz_node_idhash_app_%s", appAlias)
  139. idstr, _ := q.rdb.HGet(key, hash).Result()
  140. if idstr != "" {
  141. id, err := strconv.Atoi(idstr)
  142. return int64(id), err
  143. }
  144. var err error
  145. id := int64(0)
  146. if id, err = q.BizNodeHash2ID(hash); err != nil {
  147. return id, err
  148. }
  149. q.rdb.HSet(key, hash, id)
  150. q.rdb.Expire(key, time.Hour*24) // node结点不会发生变化,通常只会增加,所以缓存时间长一些
  151. return id, nil
  152. }
  153. // 添加结点数据
  154. func (q bizNode) CheckAddBizNode(node *models.BizNode) (int64, error) {
  155. hash := node.Hash
  156. if hash == "" {
  157. hash = utils.SimpleHash(node.AppAlias, node.ServiceName, node.SpanName, node.SpanKind)
  158. node.Hash = hash
  159. }
  160. id, err := q.BizNodeHash2IDCache(node.AppAlias, hash)
  161. if err != nil || id == 0 {
  162. if id, _ := q.BizNodeHash2ID(hash); id > 0 {
  163. return id, nil
  164. }
  165. // 插入数据库,返回node id todo
  166. result := q.db.Model(&models.BizNode{}).Create(&node)
  167. if result.Error != nil {
  168. return 0, result.Error
  169. }
  170. id = node.ID
  171. // 再执行一次, 缓存 id hash
  172. q.BizNodeHash2IDCache(node.AppAlias, hash)
  173. }
  174. node.ID = id
  175. return id, nil
  176. }
  177. func (q bizNode) Hash2BizHash(appAlias, hash string) (string, error) {
  178. id := int64(0)
  179. if err := q.db.Model(&models.BizNode{}).Where("app_alias", appAlias).Where("hash", hash).Pluck("id", &id).Error; err != nil {
  180. return "", errors.Wrap(err, "获取biz node id失败")
  181. }
  182. if id == 0 {
  183. return "", errors.New("未获取到id:" + hash)
  184. }
  185. bizHashs := []string{}
  186. if err := q.db.Model(&models.BizEdge{}).Distinct("biz_hash").Where("app_alias=? and (source=? or target=?)", appAlias, id, id).Pluck("biz_hash", &bizHashs).Error; err != nil {
  187. return "", errors.Wrap(err, "获取biz hash失败")
  188. }
  189. bizHash := strings.Join(bizHashs, ",")
  190. // if bizHash == "" {
  191. // return "", errors.New("node hash不存在:" + hash)
  192. // }
  193. return bizHash, nil
  194. }
  195. func (q bizNode) Hash2BizHashCache(appAlias, hash string) (string, error) {
  196. key := "observe__biz_hash2bizhash_" + appAlias + "_" + hash
  197. bizHash, err := q.rdb.Get(key).Result()
  198. if err == nil {
  199. return bizHash, nil
  200. }
  201. bizHash, err = q.Hash2BizHash(appAlias, hash)
  202. q.rdb.Set(key, bizHash, time.Minute*5) // 即使未正确获取到,也缓存起来,防止重复查询
  203. if err != nil {
  204. return "", err
  205. }
  206. return bizHash, nil
  207. }
  208. type hashMap struct {
  209. ExpireAt time.Time
  210. SafeMap *sync.Map
  211. }
  212. var hash2BizHash sync.Map = sync.Map{}
  213. func (q bizNode) Hash2BizHashFast(appAlias, bizNodeHash string) (string, error) {
  214. if hmi, ok := hash2BizHash.Load(appAlias); ok {
  215. hm := hmi.(*hashMap)
  216. if time.Now().Before(hm.ExpireAt) {
  217. if t, ok := hm.SafeMap.Load(bizNodeHash); ok {
  218. return t.(string), nil
  219. } else {
  220. return "", errors.New(fmt.Sprintf("缓存中未获取到biz hash, appAlias: %s, bizNodeHash: %s", appAlias, bizNodeHash))
  221. }
  222. }
  223. }
  224. // 分布式锁
  225. lock := fmt.Sprintf("hashmap_lock_%s", appAlias)
  226. b, err := q.rdb.SetNX(lock, 1, time.Second*30).Result()
  227. if err != nil {
  228. return "", errors.Wrap(err, fmt.Sprintf("分布式锁设置失败: %s", appAlias))
  229. }
  230. if !b {
  231. return "", errors.New(fmt.Sprintf("分布式锁生效中: %s", appAlias))
  232. }
  233. defer q.rdb.Del(lock)
  234. // 批量读取特定app_alias下的所有业务结点hash
  235. idhashs := []struct {
  236. Id int64
  237. Hash string
  238. }{}
  239. if err := q.db.Model(&models.BizNode{}).Select("id, hash").Where("app_alias", appAlias).Find(&idhashs).Error; err != nil {
  240. return "", errors.Wrap(err, fmt.Sprintf("获取应用 %s 下的 id hash 失败", appAlias))
  241. }
  242. id2hash := map[int64]string{}
  243. for _, idhash := range idhashs {
  244. id2hash[idhash.Id] = idhash.Hash
  245. }
  246. idBizHashs := []struct {
  247. Source int64
  248. Target int64
  249. BizHash string
  250. }{}
  251. if err := q.db.Model(&models.BizEdge{}).Select("source, target, biz_hash").Where("app_alias", appAlias).Find(&idBizHashs).Error; err != nil {
  252. return "", errors.Wrap(err, "获取biz hash失败")
  253. }
  254. seconds := time.Second * time.Duration(rand.Intn(600)+300) // 缓存 10~15 分钟, 目的就防止同时失效导致多个应用并发读库
  255. hm := &hashMap{
  256. ExpireAt: time.Now().Add(seconds),
  257. SafeMap: &sync.Map{},
  258. }
  259. bizHash, err := "", errors.New(fmt.Sprintf("bizNodeHash:%s 找不到对应的 bizHash", bizNodeHash))
  260. for _, idBizHash := range idBizHashs {
  261. if hash, ok := id2hash[idBizHash.Source]; ok {
  262. hm.SafeMap.Store(hash, idBizHash.BizHash)
  263. if hash == bizNodeHash {
  264. bizHash, err = idBizHash.BizHash, nil
  265. }
  266. }
  267. if hash, ok := id2hash[idBizHash.Target]; ok {
  268. hm.SafeMap.Store(hash, idBizHash.BizHash)
  269. if hash == bizNodeHash {
  270. bizHash, err = idBizHash.BizHash, nil
  271. }
  272. }
  273. }
  274. hash2BizHash.Store(appAlias, hm)
  275. return bizHash, err
  276. }