biz.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415
  1. /*
  2. Copyright © 2023 NAME HERE <EMAIL ADDRESS>
  3. */
  4. package biz
  5. import (
  6. "fmt"
  7. "go-admin/app/observe/models"
  8. "go-admin/app/observe/models/query"
  9. "go-admin/common/database"
  10. "go-admin/common/olap"
  11. "go-admin/common/opentelemetry"
  12. "go-admin/common/storage"
  13. "go-admin/common/utils"
  14. extConfig "go-admin/config"
  15. "strings"
  16. "time"
  17. "github.com/go-admin-team/go-admin-core/config/source/file"
  18. log "github.com/go-admin-team/go-admin-core/logger"
  19. "github.com/go-admin-team/go-admin-core/sdk"
  20. "github.com/go-admin-team/go-admin-core/sdk/config"
  21. "github.com/go-redis/redis/v7"
  22. "github.com/spf13/cobra"
  23. "gorm.io/gorm"
  24. "gorm.io/gorm/clause"
  25. )
  26. var configYml string
  27. var rebuilt bool
  28. var chdb *gorm.DB
  29. var mydb *gorm.DB
  30. var rdb *redis.Client
  31. // consumer/biz/bizCmd represents the consumer/biz/biz command
  32. var BizCmd = &cobra.Command{
  33. Use: "biz",
  34. Short: "生成业务",
  35. PreRun: func(cmd *cobra.Command, args []string) {
  36. config.ExtendConfig = &extConfig.ExtConfig
  37. config.Setup(
  38. file.NewSource(file.WithPath(configYml)),
  39. database.SetupWithoutOtel,
  40. storage.Setup,
  41. // olap.Setup,
  42. // opentelemetry.Setup,
  43. )
  44. },
  45. Run: func(cmd *cobra.Command, args []string) {
  46. run()
  47. },
  48. }
  49. func init() {
  50. BizCmd.PersistentFlags().StringVarP(&configYml, "config", "c", "config/settings.yml", "启动命令时指定的配置文件, 默认为config/settings.yml")
  51. BizCmd.Flags().BoolVar(&rebuilt, "rebuilt", false, "如果指定了这个参数,先清空biz相关表与缓存,来重建biz相关表数据")
  52. }
  53. func ReBuilt() error {
  54. if !rebuilt {
  55. return nil
  56. }
  57. return mydb.Transaction(func(tx *gorm.DB) error {
  58. if err := tx.Exec("truncate ot_biz").Error; err != nil {
  59. return err
  60. }
  61. if err := tx.Exec("truncate ot_biz_node").Error; err != nil {
  62. return err
  63. }
  64. if err := tx.Exec("truncate ot_biz_edge").Error; err != nil {
  65. return err
  66. }
  67. // if err := tx.Exec("truncate ot_biz_stats").Error; err != nil {
  68. // return err
  69. // }
  70. keys, err := rdb.Keys("observe__biz*").Result()
  71. if err != nil {
  72. return err
  73. }
  74. pipe := rdb.Pipeline()
  75. for _, key := range keys {
  76. pipe.Del(key)
  77. }
  78. if _, err = pipe.Exec(); err != nil {
  79. return err
  80. }
  81. log.Info("重建biz初始化数据库成功")
  82. return nil
  83. })
  84. }
  85. func getMysqlDB() *gorm.DB {
  86. host := "*"
  87. mysql := sdk.Runtime.GetDbByKey(host)
  88. if config.DatabasesConfig[host].Driver == "mysql" {
  89. mysql.Set("gorm:table_options", "ENGINE=InnoDB CHARSET=utf8mb4")
  90. }
  91. return mysql
  92. }
  93. func run() {
  94. rdb = config.GetRedisClient()
  95. mydb = getMysqlDB()
  96. chdb = olap.GetClickhouseOrm()
  97. if err := ReBuilt(); err != nil {
  98. panic(fmt.Sprintf("重建biz, 初始化失败: %s", err))
  99. }
  100. GenBiz()
  101. }
  // tinySpan is the subset of trace-table columns GenBiz reads for each span.
// The gorm tags map every field to its ClickHouse column by name.
type tinySpan struct {
	TraceId        string            `gorm:"column:TraceId;not null"`                                           // trace the span belongs to
	Timestamp      time.Time         `gorm:"column:Timestamp"`                                                  // span timestamp
	SpanId         string            `gorm:"column:SpanId;not null"`                                            // span identifier
	SpanName       string            `gorm:"column:SpanName;not null"`                                          // operation name
	ParentSpanId   string            `gorm:"column:ParentSpanId;not null"`                                      // empty for a root span
	SpanKind       string            `gorm:"column:SpanKind;not null"`                                          // e.g. "SPAN_KIND_SERVER"
	SpanAttributes map[string]string `gorm:"column:SpanAttributes;type:Map(LowCardinality(String), String);not null"` // raw span attributes
	StatusCode     string            `gorm:"column:StatusCode;not null"`                                        // span status
	Duration       int64             `gorm:"column:Duration;not null"`                                          // NOTE(review): unit not visible here — confirm
	ServiceName    string            `gorm:"column:ServiceName;not null"`                                       // emitting service
	AppAlias       string            `gorm:"column:AppAlias;not null"`                                          // application alias of the span
}
  115. func getSpanType(span tinySpan) string {
  116. spanAttrs := opentelemetry.NewSpanAttributes(span.SpanAttributes)
  117. if span.SpanKind == "SPAN_KIND_INTERNAL" {
  118. return "internal"
  119. } else if span.SpanKind == "SPAN_KIND_SERVER" {
  120. if spanAttrs.RpcSystem() != "" {
  121. return fmt.Sprintf("%s server", spanAttrs.RpcSystem())
  122. } else if spanAttrs.StatusCode() >= 200 {
  123. return "http server"
  124. }
  125. return "server"
  126. } else if span.SpanKind == "SPAN_KIND_CLIENT" {
  127. if spanAttrs.IsDB() {
  128. if spanAttrs.DbSystem() != "" {
  129. return fmt.Sprintf("%s client", spanAttrs.DbSystem())
  130. } else {
  131. return "database client"
  132. }
  133. } else if spanAttrs.RpcSystem() != "" {
  134. return fmt.Sprintf("%s client", spanAttrs.RpcSystem())
  135. } else if spanAttrs.StatusCode() >= 200 {
  136. return "http client"
  137. }
  138. return "client"
  139. } else if span.SpanKind == "SPAN_KIND_CONSUMER" || span.SpanKind == "SPAN_KIND_PRODUCER" {
  140. arr := strings.Split(span.SpanKind, "_")
  141. tail := strings.ToLower(arr[len(arr)-1])
  142. head := "messaging"
  143. if spanAttrs.MessagingSystem() != "" {
  144. head = spanAttrs.MessagingSystem()
  145. }
  146. return fmt.Sprintf("%s %s", head, tail)
  147. }
  148. return "unknown"
  149. }
  150. // 生成业务相关数据
  151. func GenBiz() {
  152. // 1. 获取当前时间前5到前6分钟内所有的AppAlias
  153. // 2. 针对每个AppAlias,取前n条TraceId
  154. // 3. 针对每个TraceId, 取出所有span 并 分析出边关系
  155. for {
  156. now := time.Now().Unix()
  157. start, end := now-360, now-300
  158. earliest := start - 600
  159. appAliases := []string{}
  160. if err := chdb.Debug().Table(models.TableNameTrace).Distinct("AppAlias").
  161. Where("Timestamp>=toDateTime(?) AND Timestamp<toDateTime(?)", start, end).
  162. Where("ParentSpanId=''").Pluck("AppAlias", &appAliases).Error; err != nil {
  163. log.Errorf("查询AppAlias失败: %s", err)
  164. continue
  165. }
  166. for _, appAlias := range appAliases {
  167. appId, _ := query.NewApp().Alias2ID(appAlias) // 在未在后台设置的情况下 appId可能存在0的情况
  168. traceIds := []string{}
  169. if err := chdb.Table(models.TableNameTrace).Distinct("TraceId").
  170. Where("Timestamp>=toDateTime(?) AND Timestamp<toDateTime(?)", start, end).
  171. Where("AppAlias", appAlias).
  172. Where("ParentSpanId=''").
  173. Where("SpanAttributes['db.system']=''"). // 过滤root span为sql查询的
  174. Limit(10).Pluck("TraceId", &traceIds).Error; err != nil {
  175. log.Errorf("查询TraceId失败: %s", err)
  176. continue
  177. }
  178. for _, traceId := range traceIds {
  179. tinySpans := []tinySpan{}
  180. if err := chdb.Table(models.TableNameTrace).Select([]string{
  181. "TraceId", "Timestamp", "SpanId", "SpanName", "ParentSpanId", "SpanKind", "SpanAttributes", "StatusCode", "Duration", "ServiceName", "AppAlias",
  182. }).
  183. Where("Timestamp>=toDateTime(?)", earliest).
  184. Where("TraceId", traceId).
  185. Order("TraceId ASC, Timestamp ASC").Find(&tinySpans).Error; err != nil {
  186. log.Errorf("查询spans失败:%s %s", traceId, err)
  187. continue
  188. }
  189. // 过滤不必要的仅有一个span的业务
  190. if len(tinySpans) == 1 {
  191. span := tinySpans[0]
  192. if span.SpanKind == "SPAN_KIND_INTERNAL" { // 这种业务看起来没有意义
  193. log.Debugf("跳过仅有一个span,且span kind为internal的数据:%s %s", span.SpanName, span.TraceId)
  194. continue
  195. }
  196. }
  197. tinySpanMap := map[string]tinySpan{}
  198. var rootSpanId string
  199. for _, ts := range tinySpans {
  200. tinySpanMap[ts.SpanId] = ts
  201. if ts.ParentSpanId == "" {
  202. rootSpanId = ts.SpanId
  203. }
  204. }
  205. if rootSpanId == "" {
  206. // 由于前端是通过ParentSpanId为空查找到,所以不会走到这里
  207. log.Errorf("未获取到root span: %s", traceId)
  208. continue
  209. }
  210. rootSpan := tinySpanMap[rootSpanId]
  211. // 获取bizId
  212. bizId, bizHash, err := query.NewBiz().CheckAddBiz(appId, appAlias, rootSpan.ServiceName, rootSpan.SpanName, rootSpan.SpanKind, rootSpan.SpanAttributes)
  213. if err != nil {
  214. log.Errorf("创建业务失败:%s", err)
  215. continue
  216. }
  217. edges := []models.BizEdge{}
  218. for _, tinySpan := range tinySpans {
  219. targetNode := genBizNode(appId, appAlias, tinySpan)
  220. targetId, err := query.NewBizNode().CheckAddBizNode(&targetNode)
  221. if err != nil {
  222. log.Errorf("添加目标结点失败:%s", err)
  223. continue
  224. }
  225. if tinySpan.ParentSpanId == "" {
  226. // 如果是root, 其上游添加一个源结点
  227. // 主要用于当只一个结点时 找不到边数据(因为结点不与业务直接关联了, 而边与业务直接关联)问题
  228. edges = append(edges, models.BizEdge{
  229. Source: 0,
  230. Target: targetId,
  231. Type: "",
  232. BizId: bizId,
  233. BizHash: bizHash,
  234. AppId: appId,
  235. AppAlias: appAlias,
  236. })
  237. continue
  238. }
  239. tinySpan2, ok := tinySpanMap[tinySpan.ParentSpanId]
  240. if !ok {
  241. log.Infof("未找到源结点:parent span id = %s", tinySpan.ParentSpanId)
  242. continue
  243. }
  244. sourceNode := genBizNode(appId, appAlias, tinySpan2)
  245. sourceId, err := query.NewBizNode().CheckAddBizNode(&sourceNode)
  246. if err != nil {
  247. log.Errorf("添加源结点失败:%s", err)
  248. continue
  249. }
  250. if sourceId == targetId {
  251. log.Warnf("特殊情况,源结点与目标结点一致,暂时忽略:%s %s", tinySpan.SpanId, tinySpan2.SpanId)
  252. continue
  253. }
  254. edges = append(edges, models.BizEdge{
  255. Source: sourceId,
  256. Target: targetId,
  257. Type: getRelation(tinySpan),
  258. BizId: bizId,
  259. BizHash: bizHash,
  260. AppId: appId,
  261. AppAlias: appAlias,
  262. })
  263. }
  264. log.Infof("trace id: %s, edge len: %d", traceId, len(edges))
  265. edgeCreate(edges)
  266. }
  267. }
  268. diff := time.Now().Unix() - now
  269. log.Infof("耗时:%ds", diff)
  270. if diff < 60 {
  271. time.Sleep(time.Second * time.Duration(60-diff))
  272. }
  273. }
  274. }
  275. // 获取当前结点与下级结点间的关系
  276. func getRelation(span tinySpan) (relation string) {
  277. spanAttrs := opentelemetry.NewSpanAttributes(span.SpanAttributes)
  278. if span.SpanKind == "SPAN_KIND_INTERNAL" { // 当前span为internal, 则上级与当前的关系为internal
  279. relation = "internal"
  280. } else if span.SpanKind == "SPAN_KIND_CLIENT" { // 当前span为client, 上级可能为server或internal,与上级间的关系为internal
  281. relation = "internal"
  282. } else if span.SpanKind == "SPAN_KIND_SERVER" { // 当前span为server,上级必为client,由于当前span不是root,所以上级必定存在
  283. relation = "http"
  284. if span.SpanAttributes["rpc.system"] != "" {
  285. relation = span.SpanAttributes["rpc.system"]
  286. }
  287. } else if span.SpanKind == "SPAN_KIND_CONSUMER" { // 当前span为consumer,上级必为producer
  288. // relation = span.SpanAttributes["messaging.system"]
  289. relation = fmt.Sprintf("%s(%s)", spanAttrs.MessagingSystem(), spanAttrs.MessagingOperation())
  290. } else if span.SpanKind == "SPAN_KIND_PRODUCER" { // 当前span为producer, 上级可能为server或internal,与上级的关系肯定为内部调用
  291. // relation = "internal"
  292. // 为了方便后面清洗node与edge,将当前span与producer span之间的关系定义为 system(operation)
  293. relation = fmt.Sprintf("%s(%s)", spanAttrs.MessagingSystem(), spanAttrs.MessagingOperation())
  294. } else {
  295. relation = "unknown"
  296. log.Warnf("未识别的上下游关系:%s", spanAttrs)
  297. }
  298. return
  299. }
  300. func genBizNode(appId int64, appAlias string, tinySpan tinySpan) models.BizNode {
  301. externalId := 0
  302. if tinySpan.SpanKind == "SPAN_KIND_SERVER" {
  303. spanAttrs := opentelemetry.NewSpanAttributes(tinySpan.SpanAttributes)
  304. if spanAttrs.RpcSystem() == "" && spanAttrs.StatusCode() >= 200 {
  305. externalId = query.NewUrlMapping().UrlMappingID(appAlias, tinySpan.ServiceName, spanAttrs.HttpMethod(), spanAttrs.HttpRoute())
  306. }
  307. }
  308. return models.BizNode{
  309. Name: tinySpan.SpanName,
  310. Hash: utils.SimpleHash(tinySpan.AppAlias, tinySpan.ServiceName, tinySpan.SpanName, tinySpan.SpanKind),
  311. // Type: getNodeType(source.Span, spanMap), // 废弃
  312. // 废弃一下两个字段, node不与业务关连,edge再与业务关联,因为不同业务可能会有相同的结点
  313. // BizID: bzId,
  314. // BizHash: bzHash,
  315. AppID: appId,
  316. AppAlias: appAlias,
  317. ExternalID: int64(externalId),
  318. ServiceName: tinySpan.ServiceName,
  319. SpanName: tinySpan.SpanName,
  320. SpanKind: tinySpan.SpanKind,
  321. SpanType: getSpanType(tinySpan),
  322. IsVirtual: 0,
  323. }
  324. }
  // EdgeRaw describes an edge through the identifying attributes of its source
// and target spans.
// NOTE(review): EdgeRaw is not referenced elsewhere in this file.
type EdgeRaw struct {
	SourceAppAlias    string
	SourceServiceName string
	SourceSpanName    string
	SourceSpanKind    string

	TargetAppAlias       string
	TargetServiceName    string
	TargetSpanName       string
	TargetSpanKind       string
	TargetSpanAttributes map[string]string
}

// EdgeType returns the relation type of the edge; it is a stub that always returns "".
func (e EdgeRaw) EdgeType() string {
	return ""
}
  // edgeCacheKey names the Redis set that caches already-persisted edges; its
// members are built by edgeMember.
const edgeCacheKey string = "observe__biz_edges"
  340. func edgeCreate(edges []models.BizEdge) {
  341. if len(edges) == 0 {
  342. return
  343. }
  344. newEdges := []models.BizEdge{}
  345. uniq := map[string]struct{}{}
  346. for _, edge := range edges {
  347. if !edgeExists(edge.AppAlias, edge.BizId, edge.Source, edge.Target) {
  348. key := fmt.Sprintf("%d-%d-%d", edge.Source, edge.Target, edge.AppId)
  349. if _, ok := uniq[key]; !ok {
  350. newEdges = append(newEdges, edge)
  351. uniq[key] = struct{}{}
  352. }
  353. }
  354. }
  355. if len(newEdges) == 0 {
  356. return
  357. }
  358. result := mydb.Model(&models.BizEdge{}).Clauses(clause.OnConflict{DoNothing: true}).Create(&newEdges)
  359. if result.Error != nil {
  360. log.Errorf("批量插入边数据失败:%s", result.Error)
  361. for _, edge := range newEdges {
  362. fmt.Println(edge.BizId, edge.Source, edge.Target)
  363. }
  364. return
  365. }
  366. pipe := rdb.Pipeline()
  367. for _, edge := range newEdges {
  368. pipe.SAdd(edgeCacheKey, edgeMember(edge.AppAlias, edge.BizId, edge.Source, edge.Target))
  369. }
  370. // pipe.Expire(edgeCacheKey, time.Minute)
  371. pipe.Exec()
  372. dur, _ := rdb.TTL(edgeCacheKey).Result()
  373. if dur == -1 {
  374. rdb.Expire(edgeCacheKey, time.Minute)
  375. }
  376. }
  377. // 检测edge是否存在
  378. func edgeExists(appAlias string, bizId, sourceId, targetId int64) bool {
  379. exists, _ := rdb.SIsMember(edgeCacheKey, edgeMember(appAlias, bizId, sourceId, targetId)).Result()
  380. if !exists {
  381. cnt := int64(0)
  382. mydb.Model(&models.BizEdge{}).Where("app_alias", appAlias).Where("biz_id", bizId).
  383. Where("source", sourceId).Where("target", targetId).Count(&cnt)
  384. if cnt > 0 {
  385. rdb.SAdd(edgeCacheKey, edgeMember(appAlias, bizId, sourceId, targetId))
  386. }
  387. return cnt > 0
  388. }
  389. return exists
  390. }
  // edgeMember builds the Redis set member "<appAlias>-<bizId>-<sourceId>-<targetId>"
// that identifies an edge in edgeCacheKey.
func edgeMember(appAlias string, bizId, sourceId, targetId int64) string {
	// fmt.Sprint inserts no spaces here: every adjacent operand pair includes a string.
	return fmt.Sprint(appAlias, "-", bizId, "-", sourceId, "-", targetId)
}