biz.go 33 KB


  1. package service
  2. import (
  3. "fmt"
  4. "go-admin/app/observe/models"
  5. "go-admin/app/observe/models/query"
  6. "go-admin/app/observe/service/dto"
  7. cDto "go-admin/common/dto"
  8. "go-admin/common/prometheus"
  9. cUtils "go-admin/common/utils"
  10. "go-admin/config"
  11. "go-admin/utils"
  12. "math"
  13. "strings"
  14. "sync"
  15. "time"
  16. adModels "go-admin/app/admin/models"
  17. log "github.com/go-admin-team/go-admin-core/logger"
  18. "github.com/pkg/errors"
  19. )
  20. type Biz struct {
  21. // service.Service
  22. utils.OtService
  23. }
  24. func (b *Biz) List(req *dto.BizListReq, resp *[]dto.BizListResp, count *int64) error {
  25. bizIds := []int64{}
  26. if req.AppAlias != "" && (req.Route != "" || req.ServiceName != "") {
  27. db := b.Orm.Model(&models.BizNode{}).
  28. Where("ot_biz_node.app_alias", req.AppAlias)
  29. if req.Route != "" {
  30. db.Where("span_kind='SPAN_KIND_SERVER'").Where("span_name like ?", "%"+req.Route)
  31. }
  32. if req.ServiceName != "" {
  33. serviceNames := strings.Split(req.ServiceName, ",")
  34. db.Where("service_name in ?", serviceNames)
  35. }
  36. err := db.Joins("inner join ot_biz_edge be on be.source=ot_biz_node.id or be.target=ot_biz_node.id").
  37. Pluck("be.biz_id", &bizIds).Error
  38. if err != nil {
  39. return errors.Wrap(err, "获取业务id失败")
  40. }
  41. }
  42. list := []models.Biz{}
  43. db := b.Orm.Model(&models.Biz{}).
  44. Scopes(cDto.Paginate(req.GetPageSize(), req.GetPageIndex()))
  45. if len(bizIds) > 0 {
  46. db.Where("id in ?", bizIds)
  47. }
  48. if req.AppAlias != "" {
  49. db.Where("ot_biz.app_alias", req.AppAlias)
  50. } else { // 兼容旧逻辑,后期删除
  51. db.Where("app_id", req.AppId)
  52. }
  53. if err := db.
  54. Order("favor DESC, id ASC").Find(&list).
  55. Limit(-1).Offset(-1).Count(count).Error; err != nil {
  56. return errors.Wrap(err, "获取业务列表失败")
  57. }
  58. if len(list) == 0 {
  59. return nil
  60. }
  61. svcMap, err := query.NewService().ServiceMap(list[0].AppAlias)
  62. if err != nil {
  63. return err
  64. }
  65. for _, item := range list {
  66. r := new(dto.BizListResp)
  67. r.ID = int64(item.ID)
  68. r.Name = item.Name
  69. r.AppId = item.AppID
  70. r.ServiceName = item.ServiceName
  71. r.ServiceNameCN = svcMap[item.ServiceName]
  72. r.SpanName = item.SpanName
  73. r.Favor = int8(item.Favor)
  74. *resp = append(*resp, *r)
  75. }
  76. return nil
  77. }
  78. func (s *Biz) StatsFromClickhouse(req *dto.BizStatsReq, resp *dto.BizStatsResp) error {
  79. biz := models.Biz{}
  80. if err := s.Orm.Model(&models.Biz{}).
  81. Where("id", req.BizId).
  82. First(&biz).Error; err != nil {
  83. return errors.Wrap(err, "获取业务失败")
  84. }
  85. req.CheckFilling(time.Minute * 5)
  86. if err := s.ChOrm.Model(&models.Trace{}).
  87. Select("ServiceName, count() Total, SUM(IF(StatusCode='STATUS_CODE_ERROR', 1, 0)) as ErrorNum, max(Duration)/1e6 as Max, avg(Duration)/1e6 as Avg").
  88. Where("AppAlias", biz.AppAlias).
  89. Where("ServiceName", biz.ServiceName).
  90. Where("SpanName", biz.SpanName).
  91. Where("ParentSpanId", "").
  92. Where("Timestamp>toDateTime(?) and Timestamp<toDateTime(?)", req.StartTime, req.EndTime).
  93. Group("ServiceName").Find(resp).Error; err != nil {
  94. return errors.Wrap(err, "获取基础统计信息失败")
  95. }
  96. resp.BizId = biz.ID
  97. resp.BizHash = biz.Hash
  98. if resp.Total > 0 {
  99. resp.ErrorRate = float64(resp.ErrorNum) / float64(resp.Total)
  100. }
  101. resp.Rpm = math.Round(float64(resp.Total)/float64((req.EndTime-req.StartTime)/60)*100) / 100
  102. resp.Max = math.Round(resp.Max*100) / 100
  103. resp.Avg = math.Round(resp.Avg*100) / 100
  104. timeField := "formatDateTime(toStartOfMinute(Timestamp), '%F %H:%i', 'PRC') as StartTime"
  105. if req.EndTime-req.StartTime >= 60*60 {
  106. timeField = "formatDateTime(toStartOfFiveMinutes(Timestamp), '%F %H:%i', 'PRC') as StartTime"
  107. }
  108. fields := []string{
  109. timeField,
  110. "quantile(0.5)(Duration)/1e6 as P50Duration",
  111. "quantile(0.90)(Duration)/1e6 as P90Duration",
  112. "quantile(0.99)(Duration)/1e6 as P99Duration",
  113. }
  114. quantiles := []struct {
  115. StartTime string
  116. P50Duration float64
  117. P90Duration float64
  118. P99Duration float64
  119. }{}
  120. if err := s.ChOrm.Model(&models.Trace{}).Select(fields).
  121. Where("AppAlias", biz.AppAlias).
  122. Where("ServiceName", biz.ServiceName).
  123. Where("SpanName", biz.SpanName).
  124. Where("ParentSpanId", "").
  125. Where("Timestamp>toDateTime(?) and Timestamp<toDateTime(?)", req.StartTime, req.EndTime).
  126. Group(timeField).Order(fmt.Sprintf("%s asc", timeField)).Find(&quantiles).Error; err != nil {
  127. return errors.Wrap(err, "获取分位数数据失败")
  128. }
  129. resp.Quantiles = dto.Quantiles{
  130. Time: make([]string, len(quantiles)),
  131. P50: make([]float64, len(quantiles)),
  132. P90: make([]float64, len(quantiles)),
  133. P99: make([]float64, len(quantiles)),
  134. }
  135. for i, quantile := range quantiles {
  136. resp.Quantiles.Time[i] = quantile.StartTime
  137. resp.Quantiles.P50[i] = math.Round(quantile.P50Duration*100) / 100
  138. resp.Quantiles.P90[i] = math.Round(quantile.P90Duration*100) / 100
  139. resp.Quantiles.P99[i] = math.Round(quantile.P99Duration*100) / 100
  140. }
  141. return nil
  142. }
  143. func (s *Biz) Stats(req *dto.BizStatsReq, resp *dto.BizStatsResp) error {
  144. biz := models.Biz{}
  145. if err := s.Orm.Model(&models.Biz{}).
  146. Where("id", req.BizId).
  147. First(&biz).Error; err != nil {
  148. return errors.Wrap(err, "获取业务失败")
  149. }
  150. req.BizHash = biz.Hash // 后面bizHash从参数中传
  151. wg := sync.WaitGroup{}
  152. req.CheckFilling(time.Minute * 5)
  153. mins := (req.EndTime - req.StartTime) / 60
  154. metric := "observe_biz_node_duration_milliseconds"
  155. labels := map[string]string{"biz_hash": req.BizHash}
  156. ts := time.Unix(req.EndTime, 0)
  157. tb := prometheus.NewHistogram(&wg, metric, labels, ts, mins)
  158. // 计算错误
  159. errorRate := float64(0)
  160. wg.Add(1)
  161. go tb.ErrorRate(&errorRate)
  162. // 计算分位数
  163. times := []string{}
  164. p50s, p90s, p99s := []float64{}, []float64{}, []float64{}
  165. wg.Add(3)
  166. go tb.QuantileMinutes(0.5, &[]string{}, &p50s)
  167. go tb.QuantileMinutes(0.9, &[]string{}, &p90s)
  168. go tb.QuantileMinutes(0.99, &times, &p99s)
  169. appAlias := ""
  170. if err := s.Orm.Model(&adModels.OtApps{}).Where("alias", biz.AppAlias).Pluck("alias", &appAlias).Error; err != nil {
  171. return errors.Wrap(err, "获取应用别名失败")
  172. }
  173. // 计算 max duration, min duration, avg duration, 从clickhouse读取,效率较低,但max min在prometheus不好算,尤其是min,应该是算不出来从prometheus
  174. // wg.Add(1)
  175. // st := struct {
  176. // MaxDuration float64
  177. // MinDuration float64
  178. // AvgDuration float64
  179. // }{}
  180. // go func() {
  181. // defer wg.Done()
  182. // s.ChOrm.Table(models.TableNameMetricsHistogram).
  183. // Select("avg(Sum/Count) AvgDuration, min(Min) MinDuration , max(Max) MaxDuration").
  184. // Where("MetricName", "observe.biz.duration").
  185. // Where(`Attributes['app.id']=?`, strconv.Itoa(int(biz.ID))).
  186. // Where("TimeUnix>=? AND TimeUnix<?", req.StartTime, req.EndTime).
  187. // Find(&st)
  188. // }()
  189. // 获取max duration
  190. wg.Add(1)
  191. maxDuration := float64(0)
  192. // go tb.Quantile(1, &maxDuration)
  193. gauge := prometheus.NewGauge(&wg, "observe_biz_node_duration_max_milliseconds", labels, ts, mins)
  194. go gauge.Max(&maxDuration)
  195. // 获取avg duration
  196. wg.Add(1)
  197. avgDuration := float64(0)
  198. go tb.Avg(&avgDuration)
  199. // 记录 total
  200. wg.Add(1)
  201. total := int64(0)
  202. go tb.Total(&total)
  203. wg.Wait()
  204. resp.BizId = int64(biz.ID)
  205. resp.BizHash = req.BizHash
  206. // resp.Total = st.Total
  207. resp.Total = total
  208. resp.Rpm = math.Round(float64(resp.Total)/float64((mins))*100) / 100
  209. if maxDuration > 0 {
  210. resp.Max = maxDuration
  211. }
  212. // resp.Min = st.MinDuration
  213. resp.Avg = avgDuration
  214. if math.IsNaN(resp.Avg) {
  215. resp.Avg = 0
  216. }
  217. resp.ErrorRate = errorRate // 错误率的计算公式可能有问题
  218. resp.ErrorNum = int64(errorRate * float64(resp.Total))
  219. resp.Quantiles.Time = times
  220. resp.Quantiles.P50 = p50s
  221. resp.Quantiles.P90 = p90s
  222. resp.Quantiles.P99 = p99s
  223. return nil
  224. }
  225. func (s *Biz) Detail(req *dto.BizDetailReq, resp *[]dto.BizDetailResp, count *int64) error {
  226. biz := models.Biz{}
  227. if err := s.Orm.Model(&models.Biz{}).Where("id", req.BizId).First(&biz).Error; err != nil {
  228. return errors.Wrap(err, "业务ID不存在")
  229. }
  230. appAlias := ""
  231. if err := s.Orm.Model(&adModels.OtApps{}).Where("id", biz.AppID).Pluck("alias", &appAlias).Error; err != nil {
  232. return errors.Wrap(err, "获取应用别名失败")
  233. }
  234. req.CheckFilling(time.Minute * 5)
  235. startTime, endTime := req.StartTime, req.EndTime
  236. if endTime-startTime > 1800 {
  237. startTime = endTime - 1800
  238. }
  239. for *count == 0 {
  240. traceList := []struct {
  241. Timestamp time.Time
  242. TraceId string
  243. SpanId string
  244. Duration int64
  245. }{}
  246. db := s.ChOrm.Debug().Table(models.TableNameTrace).
  247. Select("Timestamp, TraceId, SpanId, Duration").
  248. Where("AppAlias", appAlias).
  249. Where("ServiceName", biz.ServiceName).
  250. Where("SpanName", biz.SpanName).
  251. Where("SpanKind", biz.SpanKind).
  252. Where("ParentSpanId=''").
  253. Where("Timestamp>=toDateTime(?) AND Timestamp<toDateTime(?)", startTime, endTime)
  254. if req.MinDuration > 0 {
  255. db.Where("Duration>?", req.MinDuration*float64(time.Millisecond))
  256. }
  257. if req.MaxDuration > 0 {
  258. db.Where("Duration<?", req.MaxDuration*float64(time.Millisecond))
  259. }
  260. if req.OnlyException {
  261. db.Where("StatusCode", "STATUS_CODE_ERROR")
  262. }
  263. db.Count(count)
  264. if *count == 0 {
  265. if startTime <= req.StartTime {
  266. break
  267. }
  268. diff := endTime - startTime
  269. endTime = startTime
  270. startTime = startTime - diff*2
  271. if startTime < req.StartTime {
  272. startTime = req.StartTime
  273. }
  274. continue
  275. }
  276. if err := db.Order(req.OrderBy([]string{"Duration", "Timestamp"}, "Duration", "DESC")).
  277. Scopes(cDto.Paginate(req.GetPageSize(), req.GetPageIndex())).
  278. Find(&traceList).Offset(-1).Limit(-1).Count(count).Error; err != nil {
  279. return errors.Wrap(err, "获取业务详情失败")
  280. }
  281. *resp = make([]dto.BizDetailResp, len(traceList))
  282. for i, trace := range traceList {
  283. (*resp)[i] = dto.BizDetailResp{
  284. Datetime: trace.Timestamp.Local().Format(time.DateTime),
  285. Timestamp: trace.Timestamp.Unix(),
  286. TraceId: trace.TraceId,
  287. SpanId: trace.SpanId,
  288. SpanName: biz.SpanName,
  289. Duration: int64(math.Round(float64(trace.Duration / 1e6))),
  290. }
  291. }
  292. }
  293. return nil
  294. }
  295. func (s *Biz) Graph(req *dto.BizGraphReq, resp *[]dto.BizGraphResp) error {
  296. biz := models.Biz{}
  297. if err := s.Orm.First(&biz, req.BizId).Error; err != nil {
  298. return errors.Wrap(err, "获取业务数据失败")
  299. }
  300. edges := []models.BizEdge{}
  301. if err := s.Orm.Model(&models.BizEdge{}).Where("biz_id", req.BizId).Where("app_id", biz.AppID).Find(&edges).Error; err != nil {
  302. return errors.Wrap(err, "获取边数据失败")
  303. }
  304. if len(edges) == 0 {
  305. return errors.New("边数据不存在")
  306. }
  307. nodeIdMap := map[int64]struct{}{}
  308. for _, edge := range edges {
  309. if edge.Source > 0 {
  310. nodeIdMap[edge.Source] = struct{}{}
  311. }
  312. nodeIdMap[edge.Target] = struct{}{}
  313. }
  314. nodeIds := []int64{}
  315. for nodeId := range nodeIdMap {
  316. nodeIds = append(nodeIds, nodeId)
  317. }
  318. nodes := []models.BizNode{}
  319. if err := s.Orm.Model(&models.BizNode{}).Find(&nodes, nodeIds).Error; err != nil {
  320. return errors.Wrap(err, "获取结点数据失败")
  321. }
  322. if len(nodes) == 0 {
  323. return errors.New("结点数据不存在")
  324. }
  325. appAlias := ""
  326. if err := s.Orm.Model(&models.App{}).Where("id", biz.AppID).Pluck("alias", &appAlias).Error; err != nil {
  327. return errors.New("app不存在")
  328. }
  329. svcNodes := []struct {
  330. ServiceName string
  331. Name string
  332. }{}
  333. if err := s.Orm.Model(&models.ServiceNode{}).Where("app_alias", appAlias).Find(&svcNodes).Error; err != nil {
  334. return errors.Wrap(err, "获取服务列表失败")
  335. }
  336. svcMap := map[string]string{}
  337. for _, node := range svcNodes {
  338. svcMap[node.ServiceName] = node.Name
  339. }
  340. req.CheckFilling(time.Hour)
  341. nodeMap := map[int64]models.BizNode{} // nodeId => node
  342. for _, node := range nodes {
  343. nodeMap[node.ID] = node
  344. }
  345. sourceNodeds := map[int64]struct{}{}
  346. targetNodes := map[int64]struct{}{}
  347. sourceTargets := map[int64][]int64{} // sourceId => targetIds
  348. heads := map[int64]struct{}{} // 头结点id,正常情况下仅有一个
  349. for _, edge := range edges {
  350. if edge.Source > 0 {
  351. sourceNodeds[edge.Source] = struct{}{}
  352. targetNodes[edge.Target] = struct{}{}
  353. sourceTargets[edge.Source] = append(sourceTargets[edge.Source], edge.Target)
  354. } else {
  355. heads[edge.Target] = struct{}{}
  356. }
  357. }
  358. // 正常情况下, heads已经存在一个, 这里仅当业务决策树不完整时才会向heads中继续添加
  359. for source := range sourceNodeds {
  360. if _, ok := targetNodes[source]; !ok {
  361. heads[source] = struct{}{}
  362. }
  363. }
  364. if len(heads) == 0 { // 未找到头结点, 为了显示数据,往heads中加入第一个node结点的id
  365. heads[nodes[0].ID] = struct{}{}
  366. }
  367. wg := &sync.WaitGroup{}
  368. var genTree func(root *dto.BizGraphResp)
  369. genTree = func(root *dto.BizGraphResp) {
  370. wg.Add(1)
  371. root.Stats = new(dto.BizGraphStatsResp) // 一定要写在外面, 在方法内new的话,有的地方获取不到
  372. go s.graphStats(wg, appAlias, req.StartTime, req.EndTime, root)
  373. // s.graphStats(wg, appAlias, req.StartTime, req.EndTime, root)
  374. children, ok := sourceTargets[root.ID]
  375. // fmt.Println("-------", ok, len(children), children, root.BizNode.ID)
  376. if !ok || len(children) == 0 {
  377. root.MaxDepth = 1
  378. return
  379. }
  380. maxDepth := int64(0)
  381. for _, child := range children {
  382. node := new(dto.BizGraphResp)
  383. // if node.BizNode, ok = nodeMap[child]; ok {
  384. if node2, ok := nodeMap[child]; ok {
  385. node.ID = node2.ID
  386. node.Name = node2.Name
  387. node.UniqueID = fmt.Sprintf("%s-%d", root.UniqueID, node2.ID)
  388. node.Type = node2.Type
  389. node.BizHash = node2.BizHash
  390. node.AppAlias = node2.AppAlias
  391. node.ServiceName = node2.ServiceName
  392. node.ServiceNameCN = svcMap[node.ServiceName]
  393. node.SpanName = node2.SpanName
  394. node.SpanKind = node2.SpanKind
  395. node.SpanType = node2.SpanType
  396. node.IsVirtual = node2.IsVirtual
  397. genTree(node)
  398. root.Children = append(root.Children, node)
  399. if node.MaxDepth > maxDepth {
  400. maxDepth = node.MaxDepth
  401. }
  402. }
  403. }
  404. root.MaxDepth = 1 + maxDepth
  405. }
  406. for head := range heads {
  407. root := new(dto.BizGraphResp)
  408. if node, ok := nodeMap[head]; ok {
  409. // root.BizNode = node
  410. root.ID = node.ID
  411. root.Name = node.Name
  412. root.UniqueID = fmt.Sprintf("%d", node.ID)
  413. root.Type = node.Type
  414. root.BizHash = node.BizHash
  415. root.AppAlias = node.AppAlias
  416. root.ServiceName = node.ServiceName
  417. root.ServiceNameCN = svcMap[node.ServiceName]
  418. root.SpanName = node.SpanName
  419. root.SpanKind = node.SpanKind
  420. root.SpanType = node.SpanType
  421. root.IsVirtual = node.IsVirtual
  422. genTree(root)
  423. *resp = append(*resp, *root)
  424. }
  425. }
  426. wg.Wait()
  427. return nil
  428. }
  429. func (s *Biz) GraphStats(req *dto.BizGraphStatsReq, resp *dto.BizGraphStatsResp) error {
  430. bizNode := models.BizNode{}
  431. if err := s.Orm.Model(&models.BizNode{}).Where("id", req.BizNodeId).First(&bizNode).Error; err != nil {
  432. return errors.Wrap(err, "获取业务结点信息失败")
  433. }
  434. appAlias := ""
  435. if err := s.Orm.Model(&adModels.OtApps{}).Where("id", bizNode.AppID).Pluck("alias", &appAlias).Error; err != nil {
  436. return errors.Wrap(err, "获取应用别名失败")
  437. }
  438. type stats struct {
  439. Duration float64
  440. SuccessRate float64
  441. }
  442. st1, st2 := stats{}, stats{}
  443. errch := make(chan error, 2)
  444. genStats := func(wg *sync.WaitGroup, ch chan error, startTime, endTime int64, st *stats) {
  445. defer wg.Done()
  446. fields := "quantile(0.5)(Duration) AS Duration, SUM(IF(StatusCode!='STATUS_CODE_ERROR', 1, 0))/COUNT() AS SuccessRate"
  447. if err := s.ChOrm.Table(models.TableNameTrace).
  448. Select(fields).
  449. Where("Timestamp>=toDateTime(?) AND Timestamp<toDateTime(?)", startTime, endTime).
  450. Where("AppAlias", appAlias).
  451. Where("ServiceName", bizNode.ServiceName).
  452. Where("SpanName", bizNode.SpanName).
  453. Where("SpanKind", bizNode.SpanKind).
  454. Group("ServiceName, SpanName, SpanKind").
  455. Find(&st).Error; err != nil {
  456. ch <- errors.Wrap(err, "获取统计数据失败")
  457. return
  458. }
  459. }
  460. req.CheckFilling(time.Hour)
  461. wg := &sync.WaitGroup{}
  462. wg.Add(2)
  463. diff := req.EndTime - req.StartTime
  464. go genStats(wg, errch, req.StartTime, req.EndTime, &st1)
  465. go genStats(wg, errch, req.StartTime-diff, req.EndTime-diff, &st2)
  466. wg.Wait()
  467. close(errch)
  468. if err := <-errch; err != nil {
  469. return err
  470. }
  471. resp.BizNodeId = req.BizNodeId
  472. resp.Duration = st1.Duration / 1e6
  473. resp.SuccessRate = st1.SuccessRate
  474. resp.SuccessRateUp = st1.SuccessRate >= st2.SuccessRate
  475. return nil
  476. }
  477. func (s *Biz) graphStats(wg *sync.WaitGroup, appAlias string, startTime, endTime int64, root *dto.BizGraphResp) error {
  478. defer wg.Done()
  479. type stats struct {
  480. Duration float64
  481. SuccessRate float64
  482. }
  483. st1, st2 := stats{}, stats{}
  484. errch := make(chan error, 2)
  485. genStats := func(wg *sync.WaitGroup, startTime, endTime int64, st *stats) {
  486. defer wg.Done()
  487. fields := "quantile(0.5)(Duration) / 1e6 AS Duration, SUM(IF(StatusCode!='STATUS_CODE_ERROR', 1, 0))/COUNT() AS SuccessRate"
  488. if err := s.ChOrm.Table(models.TableNameTrace).
  489. Select(fields).
  490. Where("Timestamp>=toDateTime(?) AND Timestamp<toDateTime(?)", startTime, endTime).
  491. Where("AppAlias", appAlias).
  492. Where("ServiceName", root.ServiceName).
  493. Where("SpanName", root.SpanName).
  494. Where("SpanKind", root.SpanKind).
  495. Group("ServiceName, SpanName, SpanKind").
  496. Find(&st).Error; err != nil {
  497. errch <- errors.Wrap(err, "获取统计数据失败")
  498. return
  499. }
  500. }
  501. wg2 := &sync.WaitGroup{}
  502. wg2.Add(2)
  503. diff := endTime - startTime
  504. go genStats(wg2, startTime, endTime, &st1)
  505. go genStats(wg2, startTime-diff, endTime-diff, &st2)
  506. wg2.Wait()
  507. root.Stats.BizNodeId = root.ID
  508. root.Stats.Duration = math.Round(st1.Duration*100) / 100
  509. root.Stats.SuccessRate = st1.SuccessRate
  510. root.Stats.SuccessRateUp = st1.SuccessRate >= st2.SuccessRate
  511. return nil
  512. }
  513. func (s *Biz) DigitsFromClickhouse(req *dto.BizDigitsReq, resp *dto.BizDigitsResp) error {
  514. alias := ""
  515. s.Orm.Model(&models.App{}).Where("id", req.AppId).Pluck("alias", &alias)
  516. req.AppAlias = alias // 后期 app alias从参数中传
  517. req.CheckFilling(time.Hour)
  518. mins := (req.EndTime - req.StartTime) / 60
  519. // 计算业务总数和服务总数
  520. bizTotal, serviceTotal := int64(0), int64(0)
  521. appAlias := ""
  522. err := s.Orm.Model(&models.App{}).Where("id", req.AppId).Pluck("alias", &appAlias).Error
  523. if err != nil {
  524. return errors.Wrap(err, "未获取到app alias")
  525. }
  526. if err := s.Orm.Model(&models.ServiceNode{}).Where("app_alias", appAlias).Count(&serviceTotal).Error; err != nil {
  527. return errors.Wrap(err, "获取服务总数失败")
  528. }
  529. if err := s.Orm.Model(&models.Biz{}).Where("app_alias", appAlias).Count(&bizTotal).Error; err != nil {
  530. return errors.Wrap(err, "获取业务总数失败")
  531. }
  532. resp.Biz = bizTotal
  533. resp.Service = serviceTotal
  534. //
  535. row := struct {
  536. Total int64
  537. ErrorNum int64
  538. DatabaseNum int64
  539. }{}
  540. if err := s.ChOrm.Model(&models.Trace{}).
  541. Select("count() Total, SUM(IF(StatusCode='STATUS_CODE_ERROR', 1, 0)) as ErrorNum, SUM(IF(SpanAttributes['db.system'] != '', 1, 0)) as DatabaseNum").
  542. Where("AppAlias", alias).
  543. Where("Timestamp>toDateTime(?) and Timestamp<toDateTime(?)", req.StartTime, req.EndTime).
  544. Find(&row).Error; err != nil {
  545. return errors.Wrap(err, "获取基础统计信息失败")
  546. }
  547. resp.DatabaseBiz = row.DatabaseNum
  548. resp.DatabaseBizPerMinute = math.Round(float64(row.DatabaseNum)/float64(mins)*100) / 100
  549. resp.Error = row.ErrorNum
  550. if row.Total > 0 {
  551. resp.ErrorRate = float64(row.ErrorNum) / float64(row.Total)
  552. }
  553. resp.ErrorPerMinute = math.Round(float64(row.ErrorNum)/float64(mins)*100) / 100
  554. resp.Trace = float64(row.Total)
  555. resp.TracePerMinute = math.Round(float64(row.Total)/float64(mins)*100) / 100
  556. return nil
  557. }
  558. func (s *Biz) Digits(req *dto.BizDigitsReq, resp *dto.BizDigitsResp) error {
  559. alias := ""
  560. s.Orm.Model(&models.App{}).Where("id", req.AppId).Pluck("alias", &alias)
  561. req.AppAlias = alias // 后期 app alias从参数中传
  562. req.CheckFilling(time.Hour)
  563. mins := (req.EndTime - req.StartTime) / 60
  564. metric := "observe_biz_node_duration_milliseconds"
  565. labels := map[string]string{"app_alias": req.AppAlias}
  566. ts := time.Unix(req.EndTime, 0)
  567. wg := sync.WaitGroup{}
  568. tb := prometheus.NewHistogram(&wg, metric, labels, ts, mins)
  569. // 计算错误率和trace总数
  570. total, errRate := int64(0), float64(0)
  571. wg.Add(2)
  572. go tb.Total(&total)
  573. go tb.ErrorRate(&errRate)
  574. // 计算业务总数和服务总数
  575. bizTotal, serviceTotal := int64(0), int64(0)
  576. appAlias := ""
  577. err := s.Orm.Model(&models.App{}).Where("id", req.AppId).Pluck("alias", &appAlias).Error
  578. if err != nil {
  579. return errors.Wrap(err, "未获取到app alias")
  580. }
  581. if err := s.Orm.Model(&models.ServiceNode{}).Where("app_alias", appAlias).Count(&serviceTotal).Error; err != nil {
  582. return errors.Wrap(err, "获取服务总数失败")
  583. }
  584. if err := s.Orm.Model(&models.Biz{}).Where("app_alias", appAlias).Count(&bizTotal).Error; err != nil {
  585. return errors.Wrap(err, "获取业务总数失败")
  586. }
  587. // 计算 db 相关的业务结点
  588. wg.Add(1)
  589. labels["database"] = "true"
  590. tb2 := prometheus.NewHistogram(&wg, metric, labels, ts, mins)
  591. dbTotal := int64(0)
  592. tb2.Total(&dbTotal)
  593. // todo log
  594. wg.Wait()
  595. resp.Biz = bizTotal
  596. resp.Service = serviceTotal
  597. resp.ErrorRate = math.Round(errRate*1e5) / 1e5
  598. resp.Error = int64(errRate * float64(total))
  599. resp.ErrorPerMinute = math.Round(errRate*float64(total)/float64(mins)*100) / 100
  600. resp.Trace = float64(total)
  601. resp.TracePerMinute = math.Round(float64(total)/float64(mins)*100) / 100
  602. resp.DatabaseBiz = dbTotal
  603. resp.DatabaseBizPerMinute = math.Round(float64(dbTotal)/float64(mins)*100) / 100
  604. return nil
  605. }
  606. // 标记
  607. func (s *Biz) Mark(req *dto.BizMarkReq) error {
  608. if req.Id <= 0 {
  609. return errors.New("参数id非法")
  610. }
  611. //err := s.Orm.Model(&models.Biz{}).Where("id", req.Id).Update("name", req.Name).Error
  612. err := s.Orm.Table("ot_biz").Where("id", req.Id).Update("name", req.Name).Error
  613. if err != nil {
  614. return errors.Wrap(err, "标记失败")
  615. }
  616. return nil
  617. }
  618. // 收藏
  619. func (s *Biz) Favor(req *dto.BizFavorReq) error {
  620. if req.Id <= 0 {
  621. return errors.New("参数id非法")
  622. }
  623. if req.Favor != 0 && req.Favor != 1 {
  624. return errors.New("参数favor非法")
  625. }
  626. err := s.Orm.Model(&models.Biz{}).Where("id", req.Id).Update("favor", req.Favor).Error
  627. if err != nil {
  628. msg := "收藏失败"
  629. if req.Favor == 0 {
  630. msg = "取消收藏失败"
  631. }
  632. return errors.Wrap(err, msg)
  633. }
  634. return nil
  635. }
  636. // 删除
  637. func (s *Biz) Delete(req *dto.BizDeleteReq) error {
  638. if req.Id <= 0 {
  639. return errors.New("参数id非法")
  640. }
  641. err := s.Orm.Delete(&models.Biz{}, req.Id).Error
  642. if err != nil {
  643. return errors.Wrap(err, "删除失败")
  644. }
  645. return nil
  646. }
  647. func (s *Biz) StatisticBasicGuy(req *dto.BizBasicGuy, resp *map[string]any) error {
  648. var countBiz int64
  649. var countService int64
  650. var countInterface int64
  651. if err := s.Orm.Table("ot_biz").Where("app_alias", req.AppAlias).Count(&countBiz).Error; err != nil {
  652. return errors.Wrap(err, "统计业务数错误")
  653. }
  654. if err := s.Orm.Table("ot_service_nodes").Where("app_alias", req.AppAlias).Count(&countService).Error; err != nil {
  655. return errors.Wrap(err, "统计服务总数错误")
  656. }
  657. if err := s.Orm.Table("ot_url_mapping").Where("app_alias", req.AppAlias).Where("type", 2).Count(&countInterface).Error; err != nil {
  658. return errors.Wrap(err, "统计对外接口总数错误")
  659. }
  660. (*resp)["biz"] = countBiz
  661. (*resp)["svrs"] = countService
  662. (*resp)["interface"] = countInterface
  663. return nil
  664. }
  // bizItem is one (AppAlias, ServiceName, SpanName) group read from the trace
// table by GenGraph. GenGraph's SELECT does not include SpanKind, so that
// field stays empty there.
type bizItem struct {
	AppAlias, ServiceName, SpanName, SpanKind string
}
  671. func (s *Biz) GenGraph() error {
  672. // total := int64(0)
  673. groups := "AppAlias, ServiceName, SpanName"
  674. fields := fmt.Sprintf("%s, COUNT() Total", groups)
  675. // if err := s.ChOrm.Table(models.TableNameTrace).Select(fields).
  676. // Where("Timestamp>=now()-interval 2 hour AND Timestamp<=now()-interval 1 hour"). // 保证trace完整
  677. // Group(groups).Having("Total>1").Count(&total).Error; err != nil {
  678. // return errors.Wrap(err, "获取业务总数失败")
  679. // }
  680. pageIndex, pageSize := 1, 100
  681. // pageTotal := int(math.Ceil(float64(total) / float64(pageSize)))
  682. for i := pageIndex; ; i++ {
  683. list := []bizItem{}
  684. if err := s.ChOrm.Table(models.TableNameTrace).
  685. Select(fields).
  686. Scopes(cDto.Paginate(pageSize, i)).
  687. Where("Timestamp>now()-interval 1 hour").
  688. Where("ParentSpanId=''").
  689. Group(groups).Having("Total>1").
  690. Find(&list).Error; err != nil {
  691. log.Errorf("获取业务列表失败: %s", err.Error())
  692. continue
  693. }
  694. if len(list) == 0 {
  695. break
  696. }
  697. newBizs := []models.Biz{}
  698. for _, item := range list {
  699. appId, err := query.NewApp().Alias2ID(item.AppAlias)
  700. if err != nil {
  701. log.Errorf("获取app id失败: %s", err.Error())
  702. continue
  703. }
  704. hash := cUtils.SimpleHash(item.AppAlias, item.ServiceName, item.SpanName)
  705. if bizId, err := query.NewBiz().BizHash2ID(hash); err != nil {
  706. log.Errorf("biz id获取失败: %s", err.Error())
  707. } else if bizId == 0 {
  708. newBizs = append(newBizs, models.Biz{
  709. Name: item.SpanName,
  710. Hash: hash,
  711. AppID: appId,
  712. AppAlias: item.AppAlias,
  713. ServiceName: item.ServiceName,
  714. SpanName: item.SpanName,
  715. IsAutoCreated: 1,
  716. })
  717. }
  718. // if _, err := query.NewBiz().CheckAddBiz(appId, item.ServiceName, item.SpanName); err != nil {
  719. // log.Errorf("check add biz失败: %s", err.Error())
  720. // continue
  721. // }
  722. // go s.genGraphNode(bizId, appId, item.AppAlias, item.ServiceName, item.SpanName)
  723. }
  724. if len(newBizs) == 0 {
  725. continue
  726. }
  727. if err := s.Orm.Model(&models.Biz{}).Create(&newBizs).Error; err != nil {
  728. log.Errorf("biz create失败: %s", err.Error())
  729. }
  730. }
  731. return nil
  732. }
  733. // // 时间 限制
  734. // func (s *Biz) genGraphNode(bizId, appId int64, appAlias, serviceName, spanName string) {
  735. // traceIds := []string{}
  736. // if err := s.ChOrm.Table(models.TableNameTrace).Select("DISTINCT TraceId").
  737. // Where("AppAlias=? AND ServiceName=? AND SpanName=? AND TraceId=''", appAlias, serviceName, spanName).
  738. // Limit(10).
  739. // Pluck("TraceId", &traceIds).Error; err != nil {
  740. // return
  741. // }
  742. // fields := "SpanId, ParentSpanId, AppAlias, ServiceName, SpanName, SpanKind"
  743. // for _, traceId := range traceIds {
  744. // list := []bizItem{}
  745. // if err := s.ChOrm.Table(models.TableNameTrace).
  746. // Select(fields).Where("TraceId", traceId).
  747. // Find(&list).Error; err != nil {
  748. // log.Errorf("获取业务结点失败: %s", err)
  749. // continue
  750. // }
  751. // for _, item := range list {
  752. // bizNode := models.BizNode{
  753. // Name: item.SpanName,
  754. // Type: 0, // todo
  755. // ExternalId: 0, // todo
  756. // BizId: bizId,
  757. // AppId: appId,
  758. // // appAlias: appAlias,
  759. // ServiceName: serviceName,
  760. // SpanName: spanName,
  761. // SpanKind: item.SpanKind,
  762. // IsVirtual: 0,
  763. // }
  764. // query.NewBizNode().CheckAddBizNode(&bizNode)
  765. // }
  766. // }
  767. // }
  768. func (s *Biz) nodeInfo(id int64, hash string, node *models.BizNode) error {
  769. if id == 0 && hash == "" {
  770. return errors.New("无效参数")
  771. }
  772. db := s.Orm.Model(&models.BizNode{})
  773. if hash != "" {
  774. db.Where("hash", hash)
  775. } else {
  776. db.Where("id", id)
  777. }
  778. if err := db.Find(node).Error; err != nil {
  779. return errors.Wrap(err, "")
  780. }
  781. return nil
  782. }
  783. func (s *Biz) NodeSpans(req *dto.BizNodeSpansReq, resp *[]dto.ServiceSpansResp, count *int64) error {
  784. node := models.BizNode{}
  785. if err := s.nodeInfo(req.ID, req.Hash, &node); err != nil {
  786. return err
  787. }
  788. req.CheckFilling(time.Hour)
  789. req2 := dto.ServiceSpansReq{
  790. AppAlias: node.AppAlias,
  791. ServiceName: []string{node.ServiceName},
  792. SpanName: node.SpanName,
  793. SpanKind: node.SpanKind,
  794. TimeRange: req.TimeRange,
  795. Pagination: req.Pagination,
  796. SortInfo: req.SortInfo,
  797. }
  798. svc := Service{s.OtService}
  799. return svc.Spans(&req2, resp, count)
  800. }
  801. func (s *Biz) NodeStatsFromClickhouse(req *dto.BizNodeStatsReq, resp *dto.BizNodeStatsResp) error {
  802. node := models.BizNode{}
  803. if err := s.nodeInfo(req.ID, req.Hash, &node); err != nil {
  804. return err
  805. }
  806. resp.ID = node.ID
  807. resp.Hash = node.Hash
  808. req.CheckFilling(time.Hour)
  809. if err := s.ChOrm.Model(&models.Trace{}).
  810. Select(`count() Total, SUM(IF(StatusCode='STATUS_CODE_ERROR', 1, 0))/count() as ErrorRate,
  811. quantile(0.5)(Duration)/1e6 as P50, quantile(0.9)(Duration)/1e6 as P90, quantile(0.99)(Duration)/1e6 as P99`).
  812. Where("AppAlias", node.AppAlias).
  813. Where("ServiceName", node.ServiceName).
  814. Where("SpanKind", node.SpanKind).
  815. Where("SpanName", node.SpanName).
  816. Where("Timestamp>toDateTime(?) and Timestamp<toDateTime(?)", req.StartTime, req.EndTime).
  817. Find(resp).Error; err != nil {
  818. return errors.Wrap(err, "获取基础统计信息失败")
  819. }
  820. resp.ID = int64(node.ID)
  821. resp.ServiceName = node.ServiceName
  822. if node.ExternalID > 0 {
  823. urlmapping := models.UrlMapping{}
  824. if err := s.Orm.Model(&models.UrlMapping{}).Where("id", node.ExternalID).Find(&urlmapping).Error; err != nil {
  825. return errors.Wrap(err, "获取接口解析信息失败")
  826. }
  827. resp.Name = urlmapping.Name
  828. resp.Value = urlmapping.Url
  829. resp.HttpMethod = urlmapping.Method
  830. }
  831. timeField := "formatDateTime(toStartOfMinute(Timestamp), '%F %H:%i', 'PRC') as StartTime"
  832. if req.EndTime-req.StartTime >= 60*60 {
  833. timeField = "formatDateTime(toStartOfFiveMinutes(Timestamp), '%F %H:%i', 'PRC') as StartTime"
  834. }
  835. fields := []string{
  836. timeField,
  837. "quantile(0.5)(Duration)/1e6 as P50Duration",
  838. "quantile(0.90)(Duration)/1e6 as P90Duration",
  839. "quantile(0.99)(Duration)/1e6 as P99Duration",
  840. }
  841. quantiles := []struct {
  842. StartTime string
  843. P50Duration float64
  844. P90Duration float64
  845. P99Duration float64
  846. }{}
  847. if err := s.ChOrm.Model(&models.Trace{}).Select(fields).
  848. Where("AppAlias", node.AppAlias).
  849. Where("ServiceName", node.ServiceName).
  850. Where("SpanKind", node.SpanKind).
  851. Where("SpanName", node.SpanName).
  852. Where("Timestamp>toDateTime(?) and Timestamp<toDateTime(?)", req.StartTime, req.EndTime).
  853. Group(timeField).Order(fmt.Sprintf("%s asc", timeField)).Find(&quantiles).Error; err != nil {
  854. return errors.Wrap(err, "获取分位数数据失败")
  855. }
  856. resp.DurationStats = dto.DurationStats{
  857. Time: make([]string, len(quantiles)),
  858. P50: make([]float64, len(quantiles)),
  859. P90: make([]float64, len(quantiles)),
  860. P99: make([]float64, len(quantiles)),
  861. }
  862. for i, quantile := range quantiles {
  863. resp.DurationStats.Time[i] = quantile.StartTime
  864. resp.DurationStats.P50[i] = math.Round(quantile.P50Duration*100) / 100
  865. resp.DurationStats.P90[i] = math.Round(quantile.P90Duration*100) / 100
  866. resp.DurationStats.P99[i] = math.Round(quantile.P99Duration*100) / 100
  867. }
  868. return nil
  869. }
  870. func (s *Biz) NodeStats(req *dto.BizNodeStatsReq, resp *dto.BizNodeStatsResp) error {
  871. node := models.BizNode{}
  872. if err := s.nodeInfo(req.ID, req.Hash, &node); err != nil {
  873. return err
  874. }
  875. resp.ID = node.ID
  876. resp.Hash = node.Hash
  877. req.CheckFilling(time.Hour)
  878. wg := sync.WaitGroup{}
  879. mins := (req.EndTime - req.StartTime) / 60
  880. metric := "observe_biz_node_duration_milliseconds"
  881. labels := map[string]string{"biz_node_hash": node.Hash}
  882. ts := time.Unix(req.EndTime, 0)
  883. hist := prometheus.NewHistogram(&wg, metric, labels, ts, mins)
  884. wg.Add(5)
  885. go hist.Quantile(0.5, &resp.P50)
  886. go hist.Quantile(0.9, &resp.P90)
  887. go hist.Quantile(0.99, &resp.P99)
  888. go hist.ErrorRate(&resp.ErrorRate)
  889. go hist.Total(&resp.Total)
  890. times := []string{}
  891. p50s, p90s, p99s := []float64{}, []float64{}, []float64{}
  892. wg.Add(3)
  893. go hist.QuantileMinutes(0.5, &[]string{}, &p50s)
  894. go hist.QuantileMinutes(0.9, &[]string{}, &p90s)
  895. go hist.QuantileMinutes(0.99, &times, &p99s)
  896. wg.Wait()
  897. resp.ServiceName = node.ServiceName
  898. resp.DurationStats = dto.DurationStats{
  899. Time: times,
  900. P50: p50s,
  901. P90: p90s,
  902. P99: p99s,
  903. }
  904. if node.ExternalID > 0 {
  905. urlmapping := models.UrlMapping{}
  906. if err := s.Orm.Model(&models.UrlMapping{}).Where("id", node.ExternalID).Find(&urlmapping).Error; err != nil {
  907. return errors.Wrap(err, "获取接口解析信息失败")
  908. }
  909. resp.Name = urlmapping.Name
  910. resp.Value = urlmapping.Url
  911. resp.HttpMethod = urlmapping.Method
  912. }
  913. return nil
  914. }
  915. func (s *Biz) NodeUpdate(req *dto.BizNodeUpdateReq) error {
  916. db := s.Orm.Model(&models.BizNode{})
  917. if req.ID > 0 {
  918. db.Where("id", req.ID)
  919. } else if req.Hash != "" {
  920. db.Where("hash", req.Hash)
  921. } else {
  922. return errors.New("参数非法")
  923. }
  924. if err := db.Update("name", req.Name).Error; err != nil {
  925. return errors.Wrap(err, "更新失败")
  926. }
  927. return nil
  928. }
  929. func (s *Biz) SlowTop(req *dto.BizSlowTopReq, resp *[]dto.BizSlowTopResp) error {
  930. req.CheckFilling(time.Hour)
  931. if req.EndTime-req.StartTime > 3600 {
  932. req.StartTime = req.EndTime - 3600
  933. }
  934. respTemp := []dto.BizSlowTopResp{}
  935. if err := s.ChOrm.Table(models.TableNameTrace).
  936. Select("AppAlias, ServiceName, SpanName, SpanKind, avg(Duration)/1e6 Duration, SUM(if(StatusCode='STATUS_CODE_ERROR', 1, 0))/count() ErrorRate").
  937. Where("Timestamp>=toDateTime(?) AND Timestamp<toDateTime(?)", req.StartTime, req.EndTime).
  938. Where("AppAlias", req.AppAlias).
  939. Where("ParentSpanId=''").
  940. Group("AppAlias, ServiceName, SpanName, SpanKind").
  941. Order("Duration desc").
  942. Limit(int(req.Limit)).
  943. Find(&respTemp).Error; err != nil {
  944. return errors.Wrap(err, "查询基础业务数据失败")
  945. }
  946. svcMap, err := query.NewService().ServiceMap(req.AppAlias)
  947. if err != nil {
  948. return err
  949. }
  950. mins := (req.EndTime - req.StartTime) / 60
  951. metric := "observe_biz_node_duration_milliseconds"
  952. ts := time.Unix(req.EndTime, 0)
  953. wg := sync.WaitGroup{}
  954. for i, trace := range respTemp {
  955. wg.Add(1)
  956. go func(t dto.BizSlowTopResp, i int) {
  957. defer wg.Done()
  958. biz := models.Biz{}
  959. s.Orm.Model(&models.Biz{}).
  960. Where("app_alias", t.AppAlias).
  961. Where("service_name", t.ServiceName).
  962. Where("span_name", t.SpanName).
  963. Find(&biz)
  964. respTemp[i].BizId = biz.ID
  965. respTemp[i].BizName = biz.Name
  966. respTemp[i].BizHash = biz.Hash
  967. respTemp[i].Duration = math.Round(respTemp[i].Duration*100) / 100
  968. // 使用sql查询出来的ErrorRate, 但sql查询出来的应该不准确, 只是当前节点的错误率,而不包括下游节点错误率
  969. if !config.ExtConfig.ClickhouseMetrics {
  970. labels := map[string]string{"biz_hash": biz.Hash}
  971. wg2 := sync.WaitGroup{}
  972. tb := prometheus.NewHistogram(&wg2, metric, labels, ts, mins)
  973. wg2.Add(1)
  974. go tb.ErrorRate(&respTemp[i].ErrorRate)
  975. wg2.Wait()
  976. }
  977. }(trace, i)
  978. }
  979. wg.Wait()
  980. for _, trace := range respTemp {
  981. if trace.BizId > 0 {
  982. trace.ServiceNameCN = svcMap[trace.ServiceName]
  983. (*resp) = append((*resp), trace)
  984. }
  985. }
  986. return nil
  987. }