service.go 58 KB


  1. package service
  2. import (
  3. "context"
  4. "fmt"
  5. "go-admin/app/observe/models"
  6. "go-admin/app/observe/models/query"
  7. "go-admin/app/observe/service/dto"
  8. cDto "go-admin/common/dto"
  9. "go-admin/common/opentelemetry"
  10. "go-admin/common/prometheus"
  11. "go-admin/utils"
  12. "math"
  13. "strconv"
  14. "strings"
  15. "sync"
  16. "time"
  17. "github.com/go-admin-team/go-admin-core/sdk/config"
  18. "github.com/pkg/errors"
  19. "gorm.io/gorm"
  20. "gorm.io/gorm/clause"
  21. )
  22. const (
  23. ChWithSql = `SpanAttributes['http.url'] != '' AS isHttp,
  24. SpanAttributes['rpc.system'] != '' AS isRpc,
  25. SpanAttributes['rpc.system'] AS rpcSystem,
  26. SpanAttributes['db.system'] != '' AS isDb,
  27. SpanAttributes['db.system'] AS dbSystem,
  28. SpanAttributes['messaging.system'] AS messagingSystem`
  29. FIFTYMINUTE = "toStartOfFifteenMinutes"
  30. ONEMINUTE = "toStartOfMinute"
  31. )
// Service groups the service-level observability queries defined in this
// file (service graph, span scatter chart, liveness and error comparisons).
//
// It embeds utils.OtService; the Orm, ChOrm, OlapConn and Log members used by
// the methods below are reached through that embedded value.
type Service struct {
	utils.OtService
}
  35. func (s *Service) GetEdges(ctx context.Context, params *dto.ServiceGetEdgesReq, result *map[string]models.GraphEdge) error {
  36. // 边暂时不计算相关统计值,因为现在的前端用不到,隐藏以提高性能
  37. // sql := `WITH toDateTime(?) AS StartTime, toDateTime(?) AS EndTime, ? AS seconds, ? AS appAlias
  38. // SELECT
  39. // ot2.ServiceName AS SourceService,
  40. // ot1.ServiceName AS TargetService,
  41. // ot2.RequestType AS RequestType,
  42. // COUNT()/seconds AS Qps,
  43. // SUM(IF(StatusCode == 'STATUS_CODE_ERROR', 1, 0)) AS ErrorNum,
  44. // SUM(IF(StatusCode == 'STATUS_CODE_ERROR', 1, 0))/COUNT() AS ErrorRate,
  45. // AVG(Duration)/1e6 AS DurationAverage,
  46. // quantile(0.5)(Duration)/1e6 AS DurationMedian,
  47. // quantile(0.9)(Duration)/1e6 AS DurationP90,
  48. // quantile(0.99)(Duration)/1e6 AS DurationP99
  49. // FROM
  50. // (
  51. // SELECT
  52. // TraceId,
  53. // SpanId,
  54. // ParentSpanId,
  55. // ServiceName,
  56. // Duration,
  57. // StatusCode,
  58. // AppAlias
  59. // FROM otel.otel_traces
  60. // WHERE AppAlias=appAlias AND Timestamp > StartTime AND Timestamp < EndTime
  61. // ) AS ot1
  62. // INNER JOIN
  63. // (
  64. // SELECT
  65. // SpanId,
  66. // ServiceName,
  67. // IF(SpanAttributes['http.url'] != '', 'http', IF(SpanAttributes['rpc.system'] != '', SpanAttributes['rpc.system'] , IF(SpanAttributes['db.system'] != '', SpanAttributes['db.system'], SpanAttributes['messaging.system']))) as RequestType
  68. // FROM otel.otel_traces
  69. // WHERE AppAlias=appAlias and Timestamp > StartTime AND Timestamp < EndTime
  70. // ) AS ot2 ON ot2.SpanId = ot1.ParentSpanId
  71. // WHERE SourceService != TargetService
  72. // GROUP BY SourceService, TargetService, RequestType`
  73. // seconds := params.EndTime - params.StartTime
  74. // rows, err := s.OlapConn.Query(ctx, sql, params.StartTime, params.EndTime, seconds, params.AppAlias) // param.AppAlias
  75. // if err != nil {
  76. // return err
  77. // }
  78. // edgeMap := map[string]models.GraphEdge{}
  79. // for rows.Next() {
  80. // row := new(dto.ServiceEdgeRaw)
  81. // if err := rows.ScanStruct(row); err != nil {
  82. // s.Log.Errorf("扫描行到结构体失败: %s", err)
  83. // }
  84. // edgeMap[s.getEdgeId(row.SourceService, row.TargetService)] = models.GraphEdge{
  85. // ID: row.SourceService + "-" + row.TargetService,
  86. // Source: row.SourceService,
  87. // Target: row.TargetService,
  88. // MainStat: fmt.Sprintf("total: %d, %s: %.2fr/s, err: %d / %.2f%%", row.TotalNum, row.RequestType, row.Qps, row.ErrorNum, row.ErrorRate), // 展示示例 grpc:12.39req/s
  89. // SecondaryStat: fmt.Sprintf("avg: %.2fms, med: %.2fms, p90: %.2fms, p99: %.2fms",
  90. // row.DurationAverage, row.DurationMedian, row.DurationP90, row.DurationP99),
  91. // }
  92. // }
  93. _, defaultEdgeMap := s.getDefaultGraph(params.AppAlias)
  94. // for k, edge := range edgeMap {
  95. // defaultEdgeMap[k] = edge
  96. // }
  97. *result = defaultEdgeMap
  98. return nil
  99. }
  100. func (s *Service) GetNoSoulEdges(ctx context.Context, params *dto.ServiceGetEdgesReq, result *[]models.GraphEdge) error {
  101. // nodes := []models.ServiceNode{}
  102. edgeMap := map[string]models.GraphEdge{}
  103. _, defaultEdgeMap := s.getDefaultGraphNoSoul(params.AppAlias)
  104. for k, edge := range edgeMap {
  105. defaultEdgeMap[k] = edge
  106. }
  107. for _, edge := range defaultEdgeMap {
  108. *result = append(*result, edge)
  109. }
  110. return nil
  111. }
  112. func (s *Service) GetNodes(ctx context.Context, params *dto.ServiceGetEdgesReq, result *map[string]models.GraphNodeScope) error {
  113. sql := `SELECT
  114. ServiceName,
  115. countIf(Duration <= 1.5 * 1000 * 1000 *1000 ) AS satisfied,
  116. countIf(Duration > 1.5 * 1000 * 1000 * 1000 AND Duration <= 2.5 * 1000 * 1000 * 1000 ) AS tolerable,
  117. countIf(Duration > 2.5 * 1000 * 1000 * 1000 ) AS frustrated,
  118. ROUND((satisfied + tolerable / 2.0 ) / (satisfied + tolerable + frustrated), 4) AS Apdex,
  119. COUNT(DISTINCT TraceId) AS TraceNum,
  120. COUNT() AS SpanNum,
  121. SUM(IF(SpanKind IN ('SPAN_KIND_CLIENT', 'SPAN_KIND_PRODUCER'), 1, 0)) AS SentNum,
  122. SUM(IF(SpanKind IN ('SPAN_KIND_SERVER', 'SPAN_KIND_CONSUMER'), 1, 0)) AS ReceivedNum,
  123. SUM(IF(SpanKind='SPAN_KIND_INTERNAL', 1, 0)) AS InternalNum,
  124. SUM(IF(StatusCode='STATUS_CODE_ERROR', 1, 0)) AS ErrorNum,
  125. any(ResourceAttributes['telemetry.sdk.language']) AS SdkLang
  126. FROM
  127. otel_traces ot
  128. WHERE
  129. Timestamp > toDateTime(?)
  130. AND Timestamp < toDateTime(?)
  131. AND AppAlias = ?
  132. GROUP BY ServiceName ORDER BY ServiceName`
  133. rows, err := s.OlapConn.Query(ctx, sql, params.StartTime, params.EndTime, params.AppAlias)
  134. if err != nil {
  135. s.Log.Error("执行sql错误: %s", err)
  136. return err
  137. }
  138. serviceNodes := []models.ServiceNode{}
  139. s.Orm.Model(&models.ServiceNode{}).Where("app_alias", params.AppAlias).Find(&serviceNodes)
  140. serviceNameMap := map[string]string{}
  141. for _, node := range serviceNodes {
  142. serviceNameMap[node.ServiceName] = node.Name
  143. }
  144. nodeMap := map[string]models.GraphNodeScope{}
  145. s.Log.Debug("-- rows扫描行到graph_node_scope --")
  146. for rows.Next() {
  147. row := new(dto.SerivceGraphNodeRaw)
  148. if err := rows.ScanStruct(row); err != nil {
  149. s.Log.Errorf("扫描行到结构体失败: %s", err)
  150. }
  151. title, ok := serviceNameMap[row.ServiceName]
  152. if !ok {
  153. title = row.ServiceName
  154. }
  155. nodeMap[row.ServiceName] = models.GraphNodeScope{
  156. ID: row.ServiceName,
  157. Title: title,
  158. SubTitle: "",
  159. Send: int64(row.SentNum),
  160. Receive: int64(row.ReceivedNum),
  161. SecondaryStat: "",
  162. ArcSuccess: float64(row.SpanNum-row.ErrorNum) / float64(row.SpanNum),
  163. ArcFaild: float64(row.ErrorNum) / float64(row.SpanNum),
  164. Icon: row.SdkLang,
  165. Apdex: row.Apdex,
  166. // MainStat: fmt.Sprintf("sent: %d, received: %d", row.SentNum, row.ReceivedNum),
  167. }
  168. }
  169. defaultNodeMap, _ := s.getDefaultGraph(params.AppAlias)
  170. for k, node := range nodeMap {
  171. defaultNodeMap[k] = node
  172. }
  173. *result = defaultNodeMap
  174. // for _, node := range defaultNodeMap {
  175. // *result = append(*result, node)
  176. // }
  177. return nil
  178. }
  179. func (s *Service) GetNoSoulNodes(ctx context.Context, params *dto.ServiceGetEdgesReq, result *[]models.GraphNodeScope) error {
  180. defaultNodeMap, _ := s.getDefaultGraphNoSoul(params.AppAlias)
  181. for _, node := range defaultNodeMap {
  182. *result = append(*result, node)
  183. }
  184. return nil
  185. }
  186. func (s *Service) GetGraph(ctx context.Context, params *dto.ServiceGetEdgesReq, result *models.Graph) (err error) {
  187. params.CheckFilling(5 * time.Minute)
  188. nodes, edges := make(map[string]models.GraphNodeScope, 0), make(map[string]models.GraphEdge, 0)
  189. if err = s.GetNodes(ctx, params, &nodes); err != nil {
  190. return
  191. }
  192. if err = s.GetEdges(ctx, params, &edges); err != nil {
  193. return
  194. }
  195. // nodes, edges, _ = s.extendNodesAndEdges(params, nodes, edges)
  196. for _, node := range nodes {
  197. result.Nodes = append(result.Nodes, node)
  198. }
  199. for _, edge := range edges {
  200. result.Edges = append(result.Edges, edge)
  201. }
  202. // result.Edges = edges
  203. // result.Nodes = nodes
  204. return
  205. }
  206. func (s *Service) GetNoSoulGraph(ctx context.Context, params *dto.ServiceGetEdgesReq, result *models.Graph) (err error) {
  207. params.CheckFilling(5 * time.Minute)
  208. nodes, edges := make([]models.GraphNodeScope, 0), make([]models.GraphEdge, 0)
  209. defaultNodeMap, defaultEdgeMap := s.getDefaultGraphNoSoul(params.AppAlias)
  210. // defaultNodeMap, defaultEdgeMap, _ = s.extendNodesAndEdges(params, defaultNodeMap, defaultEdgeMap)
  211. for _, node := range defaultNodeMap {
  212. nodes = append(nodes, node)
  213. }
  214. for _, edge := range defaultEdgeMap {
  215. edges = append(edges, edge)
  216. }
  217. // if err = s.GetNoSoulNodes(ctx, params, &nodes); err != nil {
  218. // return
  219. // }
  220. // if err = s.GetNoSoulEdges(ctx, params, &edges); err != nil {
  221. // return
  222. // }
  223. result.Edges = edges
  224. result.Nodes = nodes
  225. return
  226. }
  227. func (s *Service) getDefaultGraph(appAlias string) (nodes map[string]models.GraphNodeScope, edges map[string]models.GraphEdge) {
  228. data := []models.ServiceEdge{}
  229. edges = make(map[string]models.GraphEdge)
  230. nodes = make(map[string]models.GraphNodeScope)
  231. serviceNodes := []models.ServiceNode{}
  232. s.Orm.Model(&models.ServiceNode{}).Where("app_alias", appAlias).Find(&serviceNodes)
  233. serviceNameMap := map[string]string{}
  234. for _, node := range serviceNodes {
  235. serviceNameMap[node.ServiceName] = node.Name
  236. }
  237. if err := s.Orm.Model(&models.ServiceEdge{}).Where("source_app=? or target_app=?", appAlias, appAlias).Find(&data).Error; err == nil {
  238. for _, item := range data {
  239. sourceTitle, targetTitle := item.Source, item.Target
  240. if title, ok := serviceNameMap[sourceTitle]; ok {
  241. sourceTitle = title
  242. }
  243. if title, ok := serviceNameMap[targetTitle]; ok {
  244. targetTitle = title
  245. }
  246. source, target := item.Source, item.Target
  247. if item.SourceApp == appAlias { // 如果源应用是当前应用
  248. if item.SourceType != "service" {
  249. continue
  250. }
  251. nodes[source] = models.GraphNodeScope{
  252. ID: source,
  253. Title: sourceTitle,
  254. Icon: item.SourceIcon,
  255. }
  256. if item.TargetType == "application" {
  257. target = "downstream"
  258. nodes[target] = models.GraphNodeScope{
  259. ID: target,
  260. Title: "下游应用",
  261. Icon: "cloud",
  262. }
  263. } else if (item.TargetType == "service" && item.TargetApp == appAlias) || item.TargetType == "database" || item.TargetType == "messaging" {
  264. nodes[target] = models.GraphNodeScope{
  265. ID: target,
  266. Title: targetTitle,
  267. Icon: item.TargetIcon,
  268. }
  269. } else {
  270. continue
  271. }
  272. }
  273. if item.TargetApp == appAlias {
  274. if item.TargetType != "service" {
  275. continue
  276. }
  277. nodes[target] = models.GraphNodeScope{
  278. ID: target,
  279. Title: target,
  280. Icon: item.TargetIcon,
  281. }
  282. if item.SourceType == "application" || item.SourceType == "client" {
  283. source = "upstream"
  284. nodes[source] = models.GraphNodeScope{
  285. ID: source,
  286. Title: "上游应用",
  287. Icon: "cloud",
  288. }
  289. } else if item.SourceType == "service" && item.SourceApp == appAlias {
  290. nodes[source] = models.GraphNodeScope{
  291. ID: source,
  292. Title: sourceTitle,
  293. Icon: item.SourceIcon,
  294. }
  295. } else {
  296. continue
  297. }
  298. }
  299. // if item.SourceType == "client" || item.SourceType == "application" {
  300. // source = "upstream"
  301. // nodes[source] = models.GraphNodeScope{
  302. // ID: source,
  303. // Title: "上游应用",
  304. // Icon: "cloud",
  305. // }
  306. // } else {
  307. // nodes[source] = models.GraphNodeScope{
  308. // ID: source,
  309. // Title: sourceTitle,
  310. // MainStat: "sent: 0, received: 0",
  311. // Icon: item.SourceIcon,
  312. // }
  313. // }
  314. // if item.TargetType == "application" {
  315. // target = "downstream"
  316. // nodes[target] = models.GraphNodeScope{
  317. // ID: target,
  318. // Title: "下游应用",
  319. // Icon: "cloud",
  320. // }
  321. // } else {
  322. // nodes[target] = models.GraphNodeScope{
  323. // ID: target,
  324. // Title: targetTitle,
  325. // MainStat: "sent: 0, received: 0",
  326. // Icon: item.TargetIcon,
  327. // }
  328. // }
  329. edges[s.getEdgeId(source, target)] = models.GraphEdge{
  330. ID: s.getEdgeId(source, target),
  331. Source: source,
  332. Target: target,
  333. MainStat: "total: 0, http: 0.00r/s, err: 0 / 0.00%",
  334. }
  335. }
  336. }
  337. return
  338. }
  339. func (s *Service) getDefaultGraphNoSoul(appAlias string) (nodes map[string]models.GraphNodeScope, edges map[string]models.GraphEdge) {
  340. data := []models.ServiceEdge{}
  341. serviceNodes := []models.ServiceNode{}
  342. s.Orm.Model(&models.ServiceNode{}).Where("app_alias", appAlias).Find(&serviceNodes)
  343. serviceNameMap := map[string]string{}
  344. for _, node := range serviceNodes {
  345. serviceNameMap[node.ServiceName] = node.Name
  346. }
  347. nodes = make(map[string]models.GraphNodeScope)
  348. if err := s.Orm.Table(models.TableNameServiceEdge).Where("source_app=? or target_app=?", appAlias, appAlias).Find(&data).Error; err == nil {
  349. edges = make(map[string]models.GraphEdge, len(data))
  350. for _, item := range data {
  351. source, target := item.Source, item.Target
  352. sourceTitle, targetTitle := item.Source, item.Target
  353. if title, ok := serviceNameMap[sourceTitle]; ok {
  354. sourceTitle = title
  355. }
  356. if title, ok := serviceNameMap[targetTitle]; ok {
  357. targetTitle = title
  358. }
  359. if item.SourceApp == appAlias { // 如果源应用是当前应用
  360. if item.SourceType != "service" {
  361. continue
  362. }
  363. nodes[source] = models.GraphNodeScope{
  364. ID: source,
  365. Title: sourceTitle,
  366. Icon: item.SourceIcon,
  367. }
  368. if item.TargetType == "application" {
  369. target = "downstream"
  370. nodes[target] = models.GraphNodeScope{
  371. ID: target,
  372. Title: "下游应用",
  373. Icon: "cloud",
  374. }
  375. } else if (item.TargetType == "service" && item.TargetApp == appAlias) || item.TargetType == "database" || item.TargetType == "messaging" {
  376. nodes[target] = models.GraphNodeScope{
  377. ID: target,
  378. Title: targetTitle,
  379. Icon: item.TargetIcon,
  380. }
  381. } else {
  382. continue
  383. }
  384. }
  385. if item.TargetApp == appAlias {
  386. if item.TargetType != "service" {
  387. continue
  388. }
  389. nodes[target] = models.GraphNodeScope{
  390. ID: target,
  391. Title: targetTitle,
  392. Icon: item.TargetIcon,
  393. }
  394. if item.SourceType == "application" || item.SourceType == "client" {
  395. source = "upstream"
  396. nodes[source] = models.GraphNodeScope{
  397. ID: source,
  398. Title: "上游应用",
  399. Icon: "cloud",
  400. }
  401. } else if item.SourceType == "service" && item.SourceApp == appAlias {
  402. nodes[source] = models.GraphNodeScope{
  403. ID: source,
  404. Title: sourceTitle,
  405. Icon: item.SourceIcon,
  406. }
  407. } else {
  408. continue
  409. }
  410. }
  411. // if item.SourceType == "client" || item.SourceType == "application" {
  412. // source = "upstream"
  413. // nodes[source] = models.GraphNodeScope{
  414. // ID: source,
  415. // Title: "上游应用",
  416. // Icon: "cloud",
  417. // }
  418. // } else {
  419. // nodes[source] = models.GraphNodeScope{
  420. // ID: source,
  421. // Title: source,
  422. // Icon: source,
  423. // }
  424. // }
  425. // if item.TargetType == "application" {
  426. // target = "downstream"
  427. // nodes[target] = models.GraphNodeScope{
  428. // ID: target,
  429. // Title: "下游应用",
  430. // Icon: "cloud",
  431. // }
  432. // } else {
  433. // nodes[target] = models.GraphNodeScope{
  434. // ID: target,
  435. // Title: target,
  436. // Icon: target,
  437. // }
  438. // }
  439. edges[s.getEdgeId(source, target)] = models.GraphEdge{
  440. ID: s.getEdgeId(source, target),
  441. Source: source,
  442. Target: target,
  443. // MainStat: "total: 0, http: 0.00r/s, err: 0 / 0.00%",
  444. }
  445. }
  446. }
  447. dn := []models.ServiceNode{}
  448. err := s.Orm.Table(models.TableNameServiceNode).Where("app_alias", appAlias).Find(&dn).Error
  449. if err == nil {
  450. for _, item := range dn {
  451. title := item.ServiceName
  452. if val, ok := serviceNameMap[item.ServiceName]; ok {
  453. title = val
  454. }
  455. nodes[item.ServiceName] = models.GraphNodeScope{
  456. ID: item.ServiceName,
  457. Title: title,
  458. Icon: item.Kind,
  459. }
  460. }
  461. }
  462. return
  463. }
  464. // 扩展节点和边
  465. func (s *Service) extendNodesAndEdges(params *dto.ServiceGetEdgesReq, nodes map[string]models.GraphNodeScope, edges map[string]models.GraphEdge) (map[string]models.GraphNodeScope, map[string]models.GraphEdge, error) {
  466. components := []models.SystemComponent{}
  467. err := s.Orm.Model(&models.SystemComponent{}).Where("app_alias", params.AppAlias).Find(&components).Error
  468. if err != nil {
  469. return nodes, edges, errors.Wrap(err, "获取系统组件失败")
  470. }
  471. dbMp, err := s.databaseNodeMap(params)
  472. if err != nil {
  473. return nodes, edges, err
  474. }
  475. msgMp, err := s.messagingNodeMap(params)
  476. if err != nil {
  477. return nodes, edges, err
  478. }
  479. cliMp, err := s.clientNodeMap(params)
  480. if err != nil {
  481. return nodes, edges, err
  482. }
  483. prevSvcToComponent := map[string]string{}
  484. nextSvcToComponent := map[string]string{}
  485. newEdges := map[string]models.GraphEdge{}
  486. for _, component := range components {
  487. if component.PrevServiceName != "" {
  488. prevSvcToComponent[component.PrevServiceName] = component.Component
  489. newEdges[s.getEdgeId(component.PrevServiceName, component.Component)] = models.GraphEdge{
  490. Source: component.PrevServiceName,
  491. Target: component.Component,
  492. }
  493. }
  494. if component.NextServiceName != "" {
  495. nextSvcToComponent[component.NextServiceName] = component.Component
  496. newEdges[s.getEdgeId(component.Component, component.NextServiceName)] = models.GraphEdge{
  497. Source: component.Component,
  498. Target: component.NextServiceName,
  499. }
  500. }
  501. node := models.GraphNodeScope{
  502. ID: component.Component,
  503. Title: component.Name,
  504. Icon: component.Component,
  505. }
  506. if dbStat, ok := dbMp[component.Component]; ok {
  507. node.Receive = dbStat.Total
  508. node.Send = 0
  509. node.ArcFaild = float64(dbStat.ErrorNum) / float64(dbStat.Total)
  510. node.ArcSuccess = 1 - node.ArcFaild
  511. }
  512. if msgStat, ok := msgMp[component.Component]; ok {
  513. node.Receive = msgStat.ReceivedNum
  514. node.Send = msgStat.SentNum // 严格来说, kafka通常只能被动接收,无发送量,但为不方便理解,将消费量作为发送量
  515. node.ArcFaild = float64(msgStat.ErrorNum) / float64(msgStat.Total)
  516. node.ArcSuccess = 1 - node.ArcFaild
  517. }
  518. if cliStat, ok := cliMp[component.Component]; ok {
  519. node.Receive = 0
  520. node.Send = cliStat.Total
  521. node.ArcFaild = float64(cliStat.ErrorNum) / float64(cliStat.Total)
  522. node.ArcSuccess = 1 - node.ArcFaild
  523. }
  524. nodes[component.Component] = node
  525. }
  526. dels := []string{}
  527. for sourceTarget, edge := range edges {
  528. source, target := s.getSourceTarget(sourceTarget)
  529. if component, ok := prevSvcToComponent[source]; ok {
  530. newEdges[s.getEdgeId(source, component)] = models.GraphEdge{
  531. Source: source,
  532. Target: component,
  533. MainStat: edge.MainStat,
  534. SecondaryStat: edge.SecondaryStat,
  535. }
  536. }
  537. if component, ok := nextSvcToComponent[target]; ok {
  538. newEdges[s.getEdgeId(component, target)] = models.GraphEdge{
  539. Source: component,
  540. Target: target,
  541. MainStat: edge.MainStat,
  542. SecondaryStat: edge.SecondaryStat,
  543. }
  544. }
  545. if prevSvcToComponent[source] != "" && nextSvcToComponent[target] != "" {
  546. // 如果组件既有上游节点又有下游节点, 则该组件为消息中间件, 删除原来两节点直连的边
  547. dels = append(dels, sourceTarget)
  548. }
  549. }
  550. for _, key := range dels {
  551. delete(edges, key)
  552. }
  553. for key, edge := range newEdges {
  554. edges[key] = edge
  555. }
  556. return nodes, edges, nil
  557. }
  558. func (s *Service) databaseNodeMap(params *dto.ServiceGetEdgesReq) (map[string]dto.ServiceComponentStats, error) {
  559. list := []dto.ServiceComponentStats{}
  560. if err := s.ChOrm.Model(&models.Trace{}).Select(`
  561. SpanAttributes['db.system'] AS Component,
  562. COUNT() AS Total,
  563. SUM(IF(StatusCode='STATUS_CODE_ERROR', 1, 0)) AS ErrorNum
  564. `).Where("AppAlias", params.AppAlias).
  565. Where("Timestamp>=toDateTime(?) and Timestamp<toDateTime(?)", params.StartTime, params.EndTime).
  566. Where("SpanKind", "SPAN_KIND_CLIENT").
  567. Where("SpanAttributes['db.system']!=''").Group("Component").Find(&list).Error; err != nil {
  568. return nil, errors.Wrap(err, "获取数据库统计信息失败")
  569. }
  570. mp := make(map[string]dto.ServiceComponentStats, len(list))
  571. for _, item := range list {
  572. mp[item.Component] = item
  573. }
  574. return mp, nil
  575. }
  576. func (s *Service) messagingNodeMap(params *dto.ServiceGetEdgesReq) (map[string]dto.ServiceComponentStats, error) {
  577. list := []dto.ServiceComponentStats{}
  578. if err := s.ChOrm.Model(&models.Trace{}).Select(`
  579. SpanAttributes['messaging.system'] AS Component,
  580. COUNT() AS Total,
  581. SUM(IF(SpanKind='SPAN_KIND_CONSUMER', 1, 0)) AS ReceivedNum,
  582. SUM(IF(SpanKind='SPAN_KIND_PRODUCER', 1, 0)) AS SentNum,
  583. SUM(IF(StatusCode='STATUS_CODE_ERROR', 1, 0)) AS ErrorNum
  584. `).Where("AppAlias", params.AppAlias).
  585. Where("Timestamp>=toDateTime(?) and Timestamp<toDateTime(?)", params.StartTime, params.EndTime).
  586. Where("SpanKind in ('SPAN_KIND_CONSUMER', 'SPAN_KIND_PRODUCER')").
  587. Where("SpanAttributes['messaging.system']!=''").Group("Component").Find(&list).Error; err != nil {
  588. return nil, errors.Wrap(err, "获取数据库统计信息失败")
  589. }
  590. mp := make(map[string]dto.ServiceComponentStats, len(list))
  591. for _, item := range list {
  592. mp[item.Component] = item
  593. }
  594. return mp, nil
  595. }
  596. func (s *Service) clientNodeMap(params *dto.ServiceGetEdgesReq) (map[string]dto.ServiceComponentStats, error) {
  597. list := []dto.ServiceComponentStats{}
  598. if err := s.ChOrm.Model(&models.Trace{}).Select(`
  599. splitByChar('/', if(SpanAttributes['user_agent.original']!='', SpanAttributes['user_agent.original'], SpanAttributes['http.user_agent']))[1] AS Component,
  600. COUNT() AS Total,
  601. SUM(IF(StatusCode='STATUS_CODE_ERROR', 1, 0)) AS ErrorNum
  602. `).Where("AppAlias", params.AppAlias).
  603. Where("Timestamp>=toDateTime(?) and Timestamp<toDateTime(?)", params.StartTime, params.EndTime).
  604. Where("SpanKind='SPAN_KIND_SERVER'").
  605. Where("ParentSpanId='' and Component!=''").
  606. Group("Component").Find(&list).Error; err != nil {
  607. return nil, errors.Wrap(err, "获取数据库统计信息失败")
  608. }
  609. mp := make(map[string]dto.ServiceComponentStats, len(list))
  610. for _, item := range list {
  611. mp[item.Component] = item
  612. }
  613. return mp, nil
  614. }
  615. func (s *Service) getEdgeId(source, target string) string {
  616. return fmt.Sprintf("%s-%s", source, target)
  617. }
  618. func (s *Service) getSourceTarget(edgeId string) (string, string) {
  619. st := strings.Split(edgeId, "-")
  620. if len(st) == 2 {
  621. return st[0], st[1]
  622. }
  623. return "", ""
  624. }
  625. func (s *Service) getPercentileDuration(ctx context.Context, params *dto.SpanScatterChartReq, p90 *float64) (err error) {
  626. sql := fmt.Sprintf(`SELECT quantile(%f)(Duration) AS p90 FROM otel_traces
  627. WHERE Timestamp>toDateTime(?) AND Timestamp<toDateTime(?) AND AppAlias=? AND ServiceName=?`, params.Percentile)
  628. row := s.OlapConn.QueryRow(ctx, sql, params.StartTime, params.EndTime, params.AppAlias, params.ServiceName)
  629. err = row.Scan(p90)
  630. return
  631. }
  632. func (s *Service) GetSpanScatterChart(ctx context.Context, params *dto.SpanScatterChartReq, result *models.ScatterChart) (err error) {
  633. var p90 float64
  634. if err = s.getPercentileDuration(ctx, params, &p90); err != nil {
  635. s.Log.Errorf("获取百分位失败:%s", err)
  636. return
  637. }
  638. if params.EndTime == 0 {
  639. params.EndTime = time.Now().Unix()
  640. params.StartTime = params.EndTime - 5*60
  641. }
  642. sql := `SELECT
  643. Timestamp,
  644. Duration/1e6 AS DurationMs,
  645. StatusCode == 'STATUS_CODE_ERROR' AS Error,
  646. IF(SpanAttributes['http.url'] != '', 'http', IF(SpanAttributes['rpc.system'] != '', SpanAttributes['rpc.system'] , IF(SpanAttributes['db.system'] != '', SpanAttributes['db.system'], SpanAttributes['messaging.system']))) as RequestType
  647. FROM otel_traces
  648. WHERE Timestamp>toDateTime(?) AND Timestamp<toDateTime(?) AND AppAlias=? AND ServiceName=? AND Duration>? AND SpanKind!='SPAN_KIND_INTERNAL'
  649. ORDER BY Timestamp ASC`
  650. rows, err := s.OlapConn.Query(ctx, sql, params.StartTime, params.EndTime, params.AppAlias, params.ServiceName, p90)
  651. if err != nil {
  652. s.Log.Errorf("执行sql失败:%s", err)
  653. return
  654. }
  655. var t time.Time
  656. var r string
  657. var d float64
  658. var e bool
  659. for rows.Next() {
  660. err = rows.Scan(&t, &d, &e, &r)
  661. if err != nil {
  662. s.Log.Errorf("扫描行到变量失败:%s", err)
  663. break
  664. }
  665. estr := "success"
  666. if e {
  667. estr = "failed"
  668. }
  669. if _, ok := (*result)[r]; !ok {
  670. (*result)[r] = map[string][]models.CoordinatePoint{
  671. "success": {},
  672. "failed": {},
  673. }
  674. }
  675. (*result)[r][estr] = append((*result)[r][estr], []any{t, d})
  676. }
  677. return
  678. }
  679. func (s *Service) GetServiceLiveness(ctx context.Context, params *dto.SpanScatterChartReq, result *[]models.CoordinatePoint) (err error) {
  680. if params.EndTime == 0 {
  681. params.EndTime = time.Now().Unix()
  682. params.StartTime = params.EndTime - 5*60
  683. }
  684. sql := fmt.Sprintf(`SELECT
  685. toStartOfMinute(toTimeZone(Timestamp,'Asia/Hong_Kong')) AS hr,
  686. COUNT(*),
  687. median(Duration/1e6),
  688. FROM otel.otel_traces
  689. Where ServiceName='%s'
  690. AND Timestamp > (NOW() - toIntervalHour(3))
  691. GROUP BY hr
  692. ORDER BY hr ASC`, params.ServiceName)
  693. rows, err := s.OlapConn.Query(ctx, sql)
  694. if err != nil {
  695. s.Log.Errorf("select liveness service error:%s", err)
  696. return
  697. }
  698. var t time.Time
  699. var d float64
  700. var c uint64
  701. for rows.Next() {
  702. err = rows.Scan(&t, &c, &d)
  703. if err != nil {
  704. s.Log.Errorf("Scan rows errors:%s", err)
  705. break
  706. }
  707. *result = append(*result, []any{t, c, d})
  708. }
  709. return
  710. }
  711. func (s *Service) CompareServiceLiveness(ctx context.Context, params *dto.SpanScatterChartReq, result *[]models.CoordinatePoint) (err error) {
  712. startTime := time.Now().Add(-2 * time.Hour * time.Duration(params.HourNum)).Truncate(time.Hour)
  713. endTime := time.Now().Truncate(time.Hour).Add(time.Hour)
  714. list := []struct {
  715. Start string
  716. Total int64
  717. }{}
  718. sql := `SELECT formatDateTime(StartTime, '%F %H:%i', 'PRC') as Start, countMerge(TraceNum) as Total FROM otel_traces_aggbysvc
  719. WHERE AppAlias=? AND ServiceName=? AND StartTime>=? AND StartTime<? GROUP BY StartTime ORDER BY Start`
  720. err = s.ChOrm.Raw(sql, params.AppAlias, params.ServiceName, startTime.Unix(), endTime.Unix()).Scan(&list).Error
  721. // 使用视图效率低,会扫描全表
  722. // err = s.ChOrm.Model(&models.TracesAggbysvcMerge{}).Debug().
  723. // Where("AppAlias=? and ServiceName=?", params.AppAlias, params.ServiceName).
  724. // Where("StartTime>=? and StartTime<?", startTime.Unix(), endTime.Unix()).
  725. // Select("formatDateTime(StartTime, '%F %H:%i', 'PRC') as Start, TraceNum as Total").
  726. // Scan(&list).Error
  727. if err != nil {
  728. return
  729. }
  730. *result = make([]models.CoordinatePoint, 0, len(list))
  731. for _, item := range list {
  732. *result = append(*result, []any{item.Start, item.Total})
  733. }
  734. return
  735. // livenessSQL := `SELECT
  736. // toString(%s(toTimeZone(Timestamp,'Asia/Hong_Kong'))) AS hr,
  737. // COUNT(*)
  738. // FROM otel.otel_traces
  739. // Where ServiceName='%s'
  740. // AND AppAlias = '%s'
  741. // AND Timestamp > (NOW() - toIntervalHour(%d))
  742. // GROUP BY hr
  743. // ORDER BY hr ASC`
  744. // if params.HourNum < 6 {
  745. // livenessSQL = fmt.Sprintf(livenessSQL, ONEMINUTE, params.ServiceName, params.AppAlias, params.HourNum*2)
  746. // } else {
  747. // livenessSQL = fmt.Sprintf(livenessSQL, FIFTYMINUTE, params.ServiceName, params.AppAlias, params.HourNum*2)
  748. // }
  749. // rows, err := s.OlapConn.Query(ctx, livenessSQL)
  750. // if err != nil {
  751. // s.Log.Errorf("select compare liveness service error:%s", err)
  752. // return
  753. // }
  754. // var t string
  755. // var d uint64
  756. // for rows.Next() {
  757. // err = rows.Scan(&t, &d)
  758. // if err != nil {
  759. // s.Log.Errorf("Scan rows errors:%s", err)
  760. // break
  761. // }
  762. // *result = append(*result, [2]any{t, d})
  763. // }
  764. // return
  765. }
  766. func (s *Service) CompareServiceErrors(ctx context.Context, params *dto.SpanScatterChartReq, result *[]models.CoordinatePoint) (err error) {
  767. db := s.ChOrm.Model(&models.TracesError{}).
  768. Where("AppAlias=? and ServiceName=? and Timestamp>now()-interval ? hour", params.AppAlias, params.ServiceName, params.HourNum*2)
  769. if params.HourNum == 1 {
  770. db.Select("formatDateTime(toStartOfMinute(Timestamp), '%F %H:%i', 'PRC') as StartTime, count() as Total")
  771. } else if params.HourNum <= 3 {
  772. db.Select("formatDateTime(toStartOfFiveMinutes(Timestamp), '%F %H:%i', 'PRC') as StartTime, count() as Total")
  773. } else if params.HourNum <= 6 {
  774. db.Select("formatDateTime(toStartOfTenMinutes(Timestamp), '%F %H:%i', 'PRC') as StartTime, count() as Total")
  775. } else {
  776. db.Select("formatDateTime(toStartOfFifteenMinutes(Timestamp), '%F %H:%i', 'PRC') as StartTime, count() as Total")
  777. }
  778. list := []struct {
  779. StartTime string
  780. Total int64
  781. }{}
  782. err = db.Group("StartTime").Order("StartTime ASC").Scan(&list).Error
  783. if err != nil {
  784. return
  785. }
  786. *result = make([]models.CoordinatePoint, 0, len(list))
  787. for _, item := range list {
  788. *result = append(*result, []any{item.StartTime, item.Total})
  789. }
  790. return
  791. // livenessSQL := `with (if(SpanAttributes['http.status_code']!='', SpanAttributes['http.status_code'], SpanAttributes['http.response.status_code'])) as HttpCode
  792. // SELECT
  793. // toString(%s(toTimeZone(Timestamp,'Asia/Hong_Kong'))) AS hr,
  794. // COUNT(*)
  795. // FROM otel.otel_traces
  796. // Where ServiceName='%s'
  797. // AND (HttpCode >= '400' OR StatusCode = 'STATUS_CODE_ERROR')
  798. // AND AppAlias = '%s'
  799. // AND Timestamp > (NOW() - toIntervalHour(%d))
  800. // GROUP BY hr
  801. // ORDER BY hr ASC`
  802. // if params.HourNum < 6 {
  803. // livenessSQL = fmt.Sprintf(livenessSQL, ONEMINUTE, params.ServiceName, params.AppAlias, params.HourNum*2)
  804. // } else {
  805. // livenessSQL = fmt.Sprintf(livenessSQL, FIFTYMINUTE, params.ServiceName, params.AppAlias, params.HourNum*2)
  806. // }
  807. // rows, err := s.OlapConn.Query(ctx, livenessSQL)
  808. // if err != nil {
  809. // s.Log.Errorf("select compare liveness service error:%s", err)
  810. // return
  811. // }
  812. // var t string
  813. // var d uint64
  814. // for rows.Next() {
  815. // err = rows.Scan(&t, &d)
  816. // if err != nil {
  817. // s.Log.Errorf("Scan rows errors:%s", err)
  818. // break
  819. // }
  820. // *result = append(*result, [2]any{t, d})
  821. // }
  822. // return
  823. }
  824. func (s *Service) Spans(req *dto.ServiceSpansReq, resp *[]dto.ServiceSpansResp, count *int64) error {
  825. req.CheckFilling(time.Minute * 5)
  826. list := []struct {
  827. Datetime time.Time `json:"datetime"`
  828. TraceId string `json:"trace_id"`
  829. SpanId string `json:"span_id"`
  830. ServiceName string `json:"service_name"`
  831. Method string `json:"method"`
  832. Code string `json:"code"`
  833. Duration float64 `json:"duration"`
  834. }{}
  835. db := s.ChOrm.Table(models.TableNameTrace).
  836. Select(`Timestamp as Datetime, TraceId,SpanId,ServiceName,IF(SpanAttributes['http.method'] != '',
  837. SpanAttributes['http.method'],
  838. IF(SpanAttributes['http.request.method'] != '', SpanAttributes['http.request.method'],
  839. IF(SpanAttributes['rpc.system'] != '',
  840. SpanAttributes['rpc.system'],
  841. IF(SpanAttributes['db.system'] != '',
  842. SpanAttributes['db.system'],
  843. SpanAttributes['messaging.system']
  844. )
  845. )
  846. )
  847. ) AS Method,
  848. IF(SpanAttributes['http.status_code']>='200',
  849. SpanAttributes['http.status_code'],
  850. IF(SpanAttributes['http.response.status_code'] >= '200',SpanAttributes['http.response.status_code'],
  851. IF(SpanAttributes[concat('rpc.', SpanAttributes['rpc.system'], '.status_code')] != '',
  852. SpanAttributes[concat('rpc.', SpanAttributes['rpc.system'], '.status_code')],
  853. IF(SpanAttributes[concat('rpc.', SpanAttributes['rpc.system'], '.error_code')] != '',
  854. SpanAttributes[concat('rpc.', SpanAttributes['rpc.system'], '.error_code')],
  855. IF(SpanAttributes['messaging.operation']!='',
  856. SpanAttributes['messaging.operation'],
  857. IF(SpanAttributes['db.operation'] != '',SpanAttributes['db.operation'], SpanName)
  858. )
  859. )
  860. )
  861. )
  862. ) AS Code,
  863. Duration `,
  864. ).
  865. Where("AppAlias", req.AppAlias).
  866. Where("Timestamp>=toDateTime(?) AND Timestamp<toDateTime(?)", req.StartTime, req.EndTime).
  867. Scopes(cDto.Paginate(req.GetPageSize(), req.GetPageIndex()))
  868. if len(req.ServiceName) == 1 {
  869. db.Where("ServiceName", req.ServiceName[0])
  870. } else if len(req.ServiceName) > 1 {
  871. db.Where("ServiceName IN ?", req.ServiceName)
  872. }
  873. if req.TraceId != "" {
  874. db.Where("TraceId", req.TraceId)
  875. }
  876. if req.MaxDuration > 0 {
  877. db.Where("Duration<?", req.MaxDuration*1e6)
  878. }
  879. if req.MinDuration > 0 {
  880. db.Where("Duration>=?", req.MinDuration*1e6)
  881. }
  882. if req.SpanKind != "" {
  883. db.Where("SpanKind", req.SpanKind)
  884. }
  885. if req.SpanAttributeKey != "" {
  886. db.Where(fmt.Sprintf("SpanAttributes['%s']", req.SpanAttributeKey), req.SpanAttributeValue)
  887. }
  888. if req.OnlyDatabase {
  889. db.Where("SpanAttributes['db.system'] != ?", "")
  890. }
  891. if req.SpanName != "" {
  892. db.Where("SpanName", req.SpanName)
  893. }
  894. if req.OnlyException {
  895. // db.Where(`StatusCode=? OR
  896. // SpanAttributes['http.status_code']>=? OR SpanAttributes['http.response.status_code']>=? OR
  897. // SpanAttributes[concat('rpc.', SpanAttributes['rpc.system'], '.status_code')]>? OR
  898. // SpanAttributes[concat('rpc.', SpanAttributes['rpc.system'], '.error_code')]!=?
  899. // `, "STATUS_CODE_ERROR", "400", "400", "0", "")
  900. // 简化查询, 加速仅异常条件慢问题,这么写也能查到,只是查到的http类错误都为span kind client, 没关系,因为都要点进去看的
  901. db.Where("StatusCode", "STATUS_CODE_ERROR")
  902. }
  903. if req.RequestMethod != "" {
  904. db.Where(`SpanAttributes['http.method'] = ? OR
  905. SpanAttributes['http.request.method'] = ? OR
  906. SpanAttributes['rpc.system'] = ? OR
  907. SpanAttributes['db.system'] = ? OR
  908. SpanAttributes['messaging.system'] = ?`,
  909. req.RequestMethod,
  910. req.RequestMethod,
  911. req.RequestMethod,
  912. req.RequestMethod,
  913. req.RequestMethod)
  914. }
  915. order := req.OrderBy([]string{"Duration", "Timestamp"}, "Duration", "DESC")
  916. if err := db.Order(order).Find(&list).Limit(-1).Offset(-1).Count(count).Error; err != nil {
  917. return errors.Wrap(err, "获取service相关span失败")
  918. }
  919. *resp = make([]dto.ServiceSpansResp, len(list))
  920. for i, item := range list {
  921. // (*resp)[i].Duration /= 1e6
  922. (*resp)[i].Datetime = item.Datetime.Local().Format(time.DateTime)
  923. (*resp)[i].Timestamp = item.Datetime.Unix()
  924. (*resp)[i].TraceId = item.TraceId
  925. (*resp)[i].SpanId = item.SpanId
  926. (*resp)[i].Method = item.Method
  927. (*resp)[i].Code = item.Code
  928. (*resp)[i].ServiceName = item.ServiceName
  929. (*resp)[i].Duration = item.Duration / 1e6
  930. }
  931. return nil
  932. }
  933. func (s *Service) GenService(resp *dto.ServiceJobGenServiceResp) error {
  934. defer s.genSingleNodeService(resp)
  935. list := []struct {
  936. TargetService string
  937. SourceService string
  938. TargetAlias string
  939. SourceAlias string
  940. SourceIcon string
  941. TargetIcon string
  942. }{}
  943. now := time.Now()
  944. // start, end := now.Add(-time.Hour), now //数据量太大,调整为分钟级,跨度大于增加的数据量即可
  945. start, end := now.Add(-5*time.Minute), now
  946. sub1 := s.ChOrm.Table(models.TableNameTrace).
  947. Select("SpanId, ParentSpanId, ServiceName, AppAlias, ResourceAttributes['telemetry.sdk.language'] SdkLang").
  948. Where("Timestamp>=toDateTime(?) AND Timestamp<toDateTime(?)", start.Unix(), end.Unix())
  949. sub2 := s.ChOrm.Table(models.TableNameTrace).
  950. Select("SpanId, ServiceName, AppAlias, ResourceAttributes['telemetry.sdk.language'] SdkLang").
  951. Where("Timestamp>=toDateTime(?) AND Timestamp<toDateTime(?)", start.Unix(), end.Unix())
  952. if err := s.ChOrm.Table("(?) AS ot1", sub1).Joins("INNER JOIN (?) AS ot2 ON ot2.SpanId = ot1.ParentSpanId", sub2).
  953. Select([]string{
  954. "ot1.ServiceName AS TargetService",
  955. "ot2.ServiceName AS SourceService",
  956. "any(ot1.AppAlias) AS TargetAlias",
  957. "any(ot2.AppAlias) AS SourceAlias",
  958. "any(ot1.SdkLang) AS TargetIcon",
  959. "any(ot2.SdkLang) AS SourceIcon",
  960. }).
  961. Where("TargetService != SourceService").
  962. Group("TargetService, SourceService").
  963. Find(&list).Error; err != nil {
  964. return errors.Wrap(err, "获取service信息失败")
  965. }
  966. if len(list) == 0 {
  967. return nil
  968. }
  969. nodes := map[string]models.ServiceNode{}
  970. edges := map[string]models.ServiceEdge{}
  971. edgeKey := func(edge models.ServiceEdge) string {
  972. return fmt.Sprintf("%s-%s-%s", edge.AppAlias, edge.Source, edge.Target)
  973. }
  974. // 先处理target结点,因为仅target结点能看出对内或对外
  975. for _, item := range list {
  976. edge := models.ServiceEdge{
  977. AppAlias: item.TargetAlias,
  978. Source: item.SourceService,
  979. Target: item.TargetService,
  980. SourceIcon: item.SourceIcon,
  981. TargetIcon: item.TargetIcon,
  982. }
  983. if item.SourceAlias != item.TargetAlias {
  984. edge.Source = "InCloud"
  985. edge.SourceIcon = "cloud"
  986. edge2 := models.ServiceEdge{
  987. AppAlias: item.SourceAlias,
  988. Source: item.SourceService,
  989. Target: "OutCloud",
  990. SourceIcon: item.SourceIcon,
  991. TargetIcon: "cloud",
  992. }
  993. // edges = append(edges, edge2)
  994. edges[edgeKey(edge2)] = edge2
  995. }
  996. // edges = append(edges, edge)
  997. edges[edgeKey(edge)] = edge
  998. appId, err := query.NewApp().Alias2ID(item.TargetAlias)
  999. if err != nil {
  1000. continue
  1001. }
  1002. key := fmt.Sprintf("%s-%s", item.TargetService, item.TargetAlias)
  1003. if node, ok := nodes[key]; !ok {
  1004. node := models.ServiceNode{
  1005. AppID: int32(appId),
  1006. AppAlias: item.TargetAlias,
  1007. Name: item.TargetService,
  1008. ServiceName: item.TargetService,
  1009. Kind: item.TargetIcon,
  1010. CreatedAt: now,
  1011. UpdatedAt: now,
  1012. }
  1013. node.Type = 1
  1014. if item.TargetAlias != item.SourceAlias { // 说明是对外结点
  1015. node.Type = 2
  1016. }
  1017. nodes[key] = node
  1018. } else {
  1019. if node.Type == 1 {
  1020. if item.TargetAlias != item.SourceAlias {
  1021. node.Type = 3
  1022. }
  1023. } else if node.Type == 2 {
  1024. if item.TargetAlias == item.SourceAlias {
  1025. node.Type = 3
  1026. }
  1027. }
  1028. }
  1029. }
  1030. // 再处理source结点
  1031. for _, item := range list {
  1032. key := fmt.Sprintf("%s-%s", item.SourceService, item.SourceAlias)
  1033. if _, ok := nodes[key]; ok { // 如果存在,说明其已经作为target存在过,不再处理
  1034. continue
  1035. }
  1036. appId, err := query.NewApp().Alias2ID(item.TargetAlias)
  1037. if err != nil {
  1038. continue
  1039. }
  1040. nodes[key] = models.ServiceNode{
  1041. AppID: int32(appId),
  1042. AppAlias: item.SourceAlias,
  1043. Name: item.SourceService,
  1044. ServiceName: item.SourceService,
  1045. Type: 1, // 固定为对内
  1046. Kind: item.SourceIcon,
  1047. CreatedAt: now,
  1048. UpdatedAt: now,
  1049. }
  1050. }
  1051. nodeList := make([]models.ServiceNode, 0, len(nodes))
  1052. for _, node := range nodes {
  1053. nodeList = append(nodeList, node)
  1054. }
  1055. edgeList := make([]models.ServiceEdge, 0, len(edges))
  1056. for _, edge := range edges {
  1057. edgeList = append(edgeList, edge)
  1058. }
  1059. // fmt.Println("nodelist: ", nodeList)
  1060. s.genVirtualService(&nodeList, &edgeList)
  1061. newNodes := s.getNewNodes(nodeList)
  1062. s.insertNewNodes(newNodes, resp)
  1063. newEdges := s.getNewEdges(edgeList)
  1064. s.insertNewEdges(newEdges, resp)
  1065. return nil
  1066. }
  1067. // 处理仅有一个服务的应用
  1068. func (s *Service) genSingleNodeService(resp *dto.ServiceJobGenServiceResp) error {
  1069. now := time.Now()
  1070. start, end := now.Add(-10*time.Minute), now.Add(-5*time.Minute)
  1071. // 查询区间内的AppAlias
  1072. appAliases := []string{}
  1073. err := s.ChOrm.Table(models.TableNameTrace).Distinct("AppAlias").
  1074. Where("Timestamp>=toDateTime(?) AND Timestamp<toDateTime(?)", start.Unix(), end.Unix()).
  1075. Pluck("AppAlias", &appAliases).Error
  1076. if err != nil {
  1077. return errors.Wrap(err, "查询区间内应用失败")
  1078. }
  1079. wg := sync.WaitGroup{}
  1080. var mu sync.Mutex
  1081. for _, appAlias := range appAliases {
  1082. wg.Add(1)
  1083. go func(wg *sync.WaitGroup, appAlias string) error {
  1084. defer wg.Done()
  1085. list := []struct {
  1086. ServiceName string
  1087. SpanKind string
  1088. SdkLang string
  1089. }{}
  1090. traceIdTotal := []struct {
  1091. TraceId string
  1092. Total int64
  1093. }{}
  1094. if err := s.ChOrm.Table(models.TableNameTrace).
  1095. Select("TraceId, count() as Total").
  1096. Where("Timestamp>=toDateTime(?) AND Timestamp<toDateTime(?)", start.Unix(), end.Unix()).
  1097. Where("AppAlias", appAlias).
  1098. Group("TraceId").
  1099. Having("Total", 1).Limit(10).
  1100. Find(&traceIdTotal).Error; err != nil {
  1101. return errors.Wrap(err, "获取TraceID失败")
  1102. }
  1103. traceIds := make([]string, len(traceIdTotal))
  1104. for i, t := range traceIdTotal {
  1105. traceIds[i] = t.TraceId
  1106. }
  1107. sub := s.ChOrm.Debug().Table(models.TableNameTrace).
  1108. Select("TraceId, any(ServiceName) ServiceName, any(SpanKind) SpanKind, any(ResourceAttributes['telemetry.sdk.language']) SdkLang, COUNT() as Total").
  1109. Where("Timestamp>=toDateTime(?)", start.Add(-10*time.Minute).Unix()).Where("TraceId IN ?", traceIds).
  1110. Where("AppAlias", appAlias).
  1111. Group("TraceId").Having("Total", 1)
  1112. if err := s.ChOrm.Table("(?) as t", sub).Distinct("ServiceName, SpanKind, SdkLang").Find(&list).Error; err != nil {
  1113. return errors.Wrap(err, "获取单服务结点信息失败")
  1114. }
  1115. nodes := map[string]models.ServiceNode{}
  1116. edges := []models.ServiceEdge{}
  1117. for _, item := range list {
  1118. num := int64(0)
  1119. s.Orm.Model(&models.ServiceNode{}).Where("app_alias=? and service_name=?", appAlias, item.ServiceName).Count(&num)
  1120. if num > 0 {
  1121. continue
  1122. }
  1123. // 如果 ot_service_node 表中不存在, 说明这个服务结点未插入过,且属于单服务结点
  1124. key := fmt.Sprintf("%s-%s", item.ServiceName, appAlias)
  1125. appId, err := query.NewApp().Alias2ID(appAlias)
  1126. if err != nil {
  1127. continue
  1128. }
  1129. if item.SpanKind == "SPAN_KIND_SERVER" {
  1130. nodes[key] = models.ServiceNode{
  1131. AppID: int32(appId),
  1132. AppAlias: appAlias,
  1133. Name: item.ServiceName,
  1134. ServiceName: item.ServiceName,
  1135. Kind: item.SdkLang,
  1136. CreatedAt: now,
  1137. UpdatedAt: now,
  1138. }
  1139. edges = append(edges, models.ServiceEdge{
  1140. AppAlias: appAlias,
  1141. Source: "InCloud",
  1142. Target: item.ServiceName,
  1143. SourceIcon: "cloud",
  1144. TargetIcon: item.SdkLang,
  1145. })
  1146. } else if item.SpanKind == "SPAN_KIND_CLIENT" {
  1147. nodes[key] = models.ServiceNode{
  1148. AppID: int32(appId),
  1149. AppAlias: appAlias,
  1150. Name: item.ServiceName,
  1151. ServiceName: item.ServiceName,
  1152. Kind: item.SdkLang,
  1153. CreatedAt: now,
  1154. UpdatedAt: now,
  1155. }
  1156. edges = append(edges, models.ServiceEdge{
  1157. AppAlias: appAlias,
  1158. Source: item.ServiceName,
  1159. Target: "OutCloud",
  1160. SourceIcon: item.SdkLang,
  1161. TargetIcon: "cloud",
  1162. })
  1163. }
  1164. }
  1165. nodeList := make([]models.ServiceNode, 0, len(nodes))
  1166. for _, node := range nodes {
  1167. nodeList = append(nodeList, node)
  1168. }
  1169. // fmt.Println("nodelist: ", nodeList)
  1170. // s.genVirtualService(&nodeList, &edges)
  1171. mu.Lock()
  1172. newNodes := s.getNewNodes(nodeList)
  1173. s.insertNewNodes(newNodes, resp)
  1174. newEdges := s.getNewEdges(edges)
  1175. s.insertNewEdges(newEdges, resp)
  1176. mu.Unlock()
  1177. return nil
  1178. }(&wg, appAlias)
  1179. }
  1180. wg.Wait()
  1181. return nil
  1182. }
  1183. // 插入结点数据
  1184. func (s *Service) insertNewNodes(newNodes []models.ServiceNode, resp *dto.ServiceJobGenServiceResp) error {
  1185. if len(newNodes) > 0 {
  1186. errs := ""
  1187. result := s.Orm.Model(&models.ServiceNode{}).Clauses(clause.OnConflict{DoNothing: true}).Create(&newNodes)
  1188. if result.Error != nil {
  1189. errs = result.Error.Error()
  1190. }
  1191. resp.NodeResult = dto.CreateResult{
  1192. Error: errs,
  1193. RowsAffected: result.RowsAffected + resp.NodeResult.RowsAffected,
  1194. Total: len(newNodes) + resp.NodeResult.Total,
  1195. }
  1196. }
  1197. return nil
  1198. }
  1199. // 插入边数据
  1200. func (s *Service) insertNewEdges(newEdges []models.ServiceEdge, resp *dto.ServiceJobGenServiceResp) error {
  1201. if len(newEdges) > 0 {
  1202. errs := ""
  1203. result := s.Orm.Model(&models.ServiceEdge{}).Clauses(clause.OnConflict{DoNothing: true}).Create(&newEdges)
  1204. if result.Error != nil {
  1205. errs = result.Error.Error()
  1206. }
  1207. resp.EdgeResult = dto.CreateResult{
  1208. Error: errs,
  1209. RowsAffected: result.RowsAffected + resp.EdgeResult.RowsAffected,
  1210. Total: len(newEdges) + resp.EdgeResult.Total,
  1211. }
  1212. }
  1213. return nil
  1214. }
  1215. // 获取服务结点数据 用于在边数据不存在的情况下
  1216. func (s *Service) getNewNodesWithoutEdges() ([]models.ServiceNode, error) {
  1217. list := []struct {
  1218. ServiceName string
  1219. AppAlias string
  1220. SdkLang string
  1221. }{}
  1222. if err := s.ChOrm.Table(models.TableNameTrace).
  1223. Select("ServiceName, AppAlias, any(ResourceAttributes['telemetry.sdk.language']) as SdkLang").
  1224. Where("Timestamp>=now()-interval 5 minute").
  1225. Group("ServiceName, AppAlias").Find(&list).Error; err != nil {
  1226. return []models.ServiceNode{}, errors.Wrap(err, "获取服务结点失败")
  1227. }
  1228. nodes := make([]models.ServiceNode, 0, len(list))
  1229. for _, item := range list {
  1230. if appId, err := query.NewApp().Alias2ID(item.AppAlias); err == nil {
  1231. nodes = append(nodes, models.ServiceNode{
  1232. AppID: int32(appId),
  1233. AppAlias: item.AppAlias,
  1234. Name: item.ServiceName,
  1235. ServiceName: item.ServiceName,
  1236. Type: 1, // 仅有一个结点,肯定是对内;此处有个问题,就是后期如果对外了, 没有地方可以更新
  1237. Kind: item.SdkLang,
  1238. })
  1239. }
  1240. }
  1241. newNodes := s.getNewNodes(nodes)
  1242. return newNodes, nil
  1243. }
  1244. // 过滤nodes中的结点, 只要数据库中不存在的node数据
  1245. func (s *Service) getNewNodes(nodes []models.ServiceNode) []models.ServiceNode {
  1246. newNodes := make([]models.ServiceNode, 0)
  1247. sq := query.NewService()
  1248. for _, node := range nodes {
  1249. has, _ := sq.HasNode(node.AppAlias, node.ServiceName)
  1250. if !has {
  1251. newNodes = append(newNodes, node)
  1252. }
  1253. }
  1254. return newNodes
  1255. }
  1256. func (s *Service) getNewEdges(edges []models.ServiceEdge) []models.ServiceEdge {
  1257. newEdges := make([]models.ServiceEdge, 0)
  1258. sq := query.NewService()
  1259. for _, edge := range edges {
  1260. has, _ := sq.HasEdge(edge.AppAlias, edge.Source, edge.Target)
  1261. if !has {
  1262. newEdges = append(newEdges, edge)
  1263. }
  1264. }
  1265. return newEdges
  1266. }
  1267. // 生成 virtual Service
  1268. func (s *Service) genVirtualService(nodes *[]models.ServiceNode, edges *[]models.ServiceEdge) {
  1269. uniq := func(a, b string) string {
  1270. return fmt.Sprintf("%s-%s", a, b)
  1271. }
  1272. nodeMap := map[string]models.ServiceNode{}
  1273. for _, node := range *nodes {
  1274. nodeMap[uniq(node.AppAlias, node.ServiceName)] = node
  1275. }
  1276. sources, targets := map[string]struct{}{}, map[string]struct{}{}
  1277. for _, edge := range *edges {
  1278. sk, tk := uniq(edge.AppAlias, edge.Source), uniq(edge.AppAlias, edge.Target)
  1279. if _, ok := nodeMap[sk]; ok {
  1280. sources[sk] = struct{}{}
  1281. }
  1282. if _, ok := nodeMap[tk]; ok {
  1283. targets[tk] = struct{}{}
  1284. }
  1285. }
  1286. // 找到所有的root和leaf 结点
  1287. rootNodes, leafNodes := map[string]models.ServiceNode{}, map[string]models.ServiceNode{}
  1288. for _, edge := range *edges {
  1289. sk, tk := uniq(edge.AppAlias, edge.Source), uniq(edge.AppAlias, edge.Target)
  1290. // 如果source在targets中不存在,说明source是root
  1291. if _, ok := targets[sk]; !ok {
  1292. rootNodes[sk] = nodeMap[sk]
  1293. // rootNodes = append(rootNodes, nodeMap[sk])
  1294. }
  1295. // 如果target在sources中不存在,说明target是leaf
  1296. if _, ok := sources[tk]; !ok {
  1297. leafNodes[tk] = nodeMap[tk]
  1298. // leafNodes = append(leafNodes, nodeMap[tk])
  1299. }
  1300. }
  1301. // 对于每个root结点, 检查它对应的root span, 如果root span的span kind为server类型, 则需要在它的上层添加一个virtual service
  1302. for _, node := range rootNodes {
  1303. spans := []models.Trace{}
  1304. if err := s.ChOrm.Table(models.TableNameTrace).
  1305. Where("Timestamp>=NOW()-INTERVAL 1 HOUR").
  1306. Where("ParentSpanId=''").
  1307. Where("SpanKind='SPAN_KIND_SERVER'").
  1308. Where("ServiceName", node.ServiceName).
  1309. Where("AppAlias", node.AppAlias).Limit(100).Find(&spans).Error; err != nil {
  1310. continue
  1311. }
  1312. if len(spans) <= 0 {
  1313. continue
  1314. }
  1315. vnode := models.ServiceNode{
  1316. AppID: node.AppID,
  1317. AppAlias: node.AppAlias,
  1318. ServiceName: fmt.Sprintf("%s:frontend", node.ServiceName),
  1319. Type: 1, // 对内服务
  1320. }
  1321. vnode.Name = vnode.ServiceName
  1322. vedge := models.ServiceEdge{
  1323. AppAlias: node.AppAlias,
  1324. Source: vnode.ServiceName,
  1325. Target: node.ServiceName,
  1326. SourceIcon: "",
  1327. }
  1328. vedge.SourceIcon = "cloud"
  1329. vedge.Source = "InCloud" // 所有virtual source都写作InCloud
  1330. for _, span := range spans {
  1331. spanAttrs := opentelemetry.NewSpanAttributes(span.SpanAttribute)
  1332. if spanAttrs.UserAgent() != "" && vnode.Name == "" {
  1333. vnode.Name = "Browser"
  1334. }
  1335. if lang, ok := span.ResourceAttribute["telemetry.sdk.language"]; ok && vedge.TargetIcon == "" {
  1336. vedge.TargetIcon = lang
  1337. }
  1338. if vnode.Name != "" && vedge.TargetIcon != "" {
  1339. break
  1340. }
  1341. }
  1342. // *nodes = append(*nodes, vnode)
  1343. *edges = append(*edges, vedge)
  1344. }
  1345. // 对于每个leaf结点, 检查它对应的span中,是否有span kind为client且为db相关的
  1346. for _, node := range leafNodes {
  1347. spans := []models.Trace{}
  1348. if err := s.ChOrm.Table(models.TableNameTrace).
  1349. Where("Timestamp>=NOW()-INTERVAL 1 HOUR").
  1350. Where("SpanKind='SPAN_KIND_CLIENT' AND (SpanAttributes['db.system'] != '' OR SpanAttributes['db.type'] != '')").
  1351. Where("ServiceName", node.ServiceName).
  1352. Where("AppAlias", node.AppAlias).Limit(100).Find(&spans).Error; err != nil {
  1353. continue
  1354. }
  1355. if len(spans) <= 0 {
  1356. continue
  1357. }
  1358. vnode := models.ServiceNode{
  1359. AppID: node.AppID,
  1360. AppAlias: node.AppAlias,
  1361. ServiceName: "",
  1362. Type: 1, // 对内服务
  1363. }
  1364. vedge := models.ServiceEdge{
  1365. AppAlias: node.AppAlias,
  1366. Source: node.ServiceName,
  1367. }
  1368. for _, span := range spans {
  1369. spanAttrs := opentelemetry.NewSpanAttributes(span.SpanAttribute)
  1370. if vnode.ServiceName == "" && spanAttrs.IsDB() {
  1371. vnode.ServiceName = spanAttrs.DbSystem()
  1372. }
  1373. if lang, ok := span.ResourceAttribute["telemetry.sdk.language"]; ok {
  1374. vedge.SourceIcon = lang
  1375. }
  1376. if vnode.ServiceName != "" && vedge.SourceIcon != "" {
  1377. break
  1378. }
  1379. }
  1380. // if vnode.ServiceName == "" {
  1381. // vnode.ServiceName = "db:unknown"
  1382. // vedge.TargetIcon = "database"
  1383. // } else {
  1384. // vedge.TargetIcon = vnode.ServiceName
  1385. // }
  1386. vedge.TargetIcon = "cloud"
  1387. vnode.Name = vnode.ServiceName
  1388. // vedge.Target = vnode.ServiceName
  1389. vedge.Target = "OutCloud"
  1390. // *nodes = append(*nodes, vnode) // virtual service node不加入node表
  1391. *edges = append(*edges, vedge)
  1392. }
  1393. }
  1394. func (s *Service) List(req *dto.ServiceListReq, resp *[]dto.ServiceListResp, total *int64) error {
  1395. appAlias := ""
  1396. err := s.Orm.Model(&models.App{}).Where("id", req.AppId).Pluck("alias", &appAlias).Error
  1397. if err != nil {
  1398. return errors.Wrap(err, "未获取到app alias")
  1399. }
  1400. pageScopes := cDto.Paginate(req.GetPageSize(), req.GetPageIndex())
  1401. result := s.Orm.Model(&models.ServiceNode{}).Where("app_alias", appAlias).Scopes(pageScopes).Find(&resp)
  1402. if result.Error != nil {
  1403. return errors.Wrap(result.Error, "获取服务列表失败")
  1404. }
  1405. if err := result.Limit(-1).Offset(-1).Count(total).Error; err != nil {
  1406. return errors.Wrap(err, "获取服务总数量失败")
  1407. }
  1408. return nil
  1409. }
  1410. func (s *Service) ListNoAppAlias(req *dto.ServiceListNoAppAliasReq, resp *[]dto.ServiceListNoAppAliasResp, total *int64) error {
  1411. req.CheckFilling(time.Minute * 5)
  1412. if err := s.ChOrm.Model(&models.Trace{}).
  1413. Select("ServiceName, COUNT() as SpanTotal, COUNT(DISTINCT TraceId) as TraceTotal").
  1414. Where("Timestamp>=toDateTime(?) and Timestamp<toDateTime(?)", req.StartTime, req.EndTime).
  1415. Where("AppAlias", "UNSET").
  1416. Group("ServiceName").
  1417. Scopes(cDto.Paginate(req.GetPageSize(), req.GetPageIndex())).
  1418. Find(resp).Offset(-1).Limit(-1).Count(total).Error; err != nil {
  1419. return errors.Wrap(err, "获取离散服务数据失败")
  1420. }
  1421. return nil
  1422. }
  1423. func (s *Service) Update(c *dto.ServiceUpdateReq) error {
  1424. var err error
  1425. var data = models.ServiceNode{}
  1426. err = s.Orm.First(&data, c.GetId()).Error
  1427. if err != nil {
  1428. if err == gorm.ErrRecordNotFound {
  1429. return fmt.Errorf("更新目标不存在, id: %d", c.GetId())
  1430. }
  1431. return err
  1432. }
  1433. c.Generate(&data)
  1434. db := s.Orm.Save(&data) // 允许自定义重名
  1435. if err = db.Error; err != nil {
  1436. s.Log.Errorf("Service update error:%s \r\n", err)
  1437. return err
  1438. }
  1439. return nil
  1440. }
  1441. func (s *Service) StatsFromClickhouse(req *dto.ServiceStatsReq, resp *dto.ServiceStatsResp) error {
  1442. node := models.ServiceNode{}
  1443. var err error
  1444. if req.ID > 0 {
  1445. err = s.Orm.Find(&node, req.ID).Error
  1446. } else {
  1447. err = s.Orm.Model(&models.ServiceNode{}).Where("app_alias=? and service_name=?", req.AppAlias, req.ServiceName).Find(&node).Error
  1448. }
  1449. if err != nil {
  1450. return errors.Wrap(err, "查询服务结点失败")
  1451. }
  1452. if node.AppAlias == "" || node.ServiceName == "" {
  1453. return errors.New("缺少必要参数 app_alias, service_name")
  1454. }
  1455. req.CheckFilling(time.Minute * 5)
  1456. if err := s.ChOrm.Model(&models.Trace{}).
  1457. Select("ServiceName, count() Total, SUM(IF(StatusCode='STATUS_CODE_ERROR', 1, 0)) as ErrorNum, max(Duration)/1e6 as Max, avg(Duration)/1e6 as Avg").
  1458. Where("AppAlias", node.AppAlias).
  1459. Where("ServiceName", node.ServiceName).
  1460. Where("Timestamp>toDateTime(?) and Timestamp<toDateTime(?)", req.StartTime, req.EndTime).
  1461. Group("ServiceName").Find(resp).Error; err != nil {
  1462. return errors.Wrap(err, "获取基础统计信息失败")
  1463. }
  1464. resp.ID = int64(node.ID)
  1465. resp.ErrorRate = float64(resp.ErrorNum) / float64(resp.Total)
  1466. resp.Rpm = math.Round(float64(resp.Total)/float64((req.EndTime-req.StartTime)/60)*100) / 100
  1467. resp.Max = math.Round(resp.Max*100) / 100
  1468. resp.Avg = math.Round(resp.Avg*100) / 100
  1469. timeField := "formatDateTime(toStartOfMinute(Timestamp), '%F %H:%i', 'PRC') as StartTime"
  1470. if req.EndTime-req.StartTime >= 60*60 {
  1471. timeField = "formatDateTime(toStartOfFifteenMinutes(Timestamp), '%F %H:%i', 'PRC') as StartTime"
  1472. }
  1473. fields := []string{
  1474. timeField,
  1475. "quantile(0.5)(Duration)/1e6 as P50Duration",
  1476. "quantile(0.90)(Duration)/1e6 as P90Duration",
  1477. "quantile(0.99)(Duration)/1e6 as P99Duration",
  1478. }
  1479. quantiles := []struct {
  1480. StartTime string
  1481. P50Duration float64
  1482. P90Duration float64
  1483. P99Duration float64
  1484. }{}
  1485. if err := s.ChOrm.Model(&models.Trace{}).Select(fields).
  1486. Where("AppAlias", node.AppAlias).
  1487. Where("ServiceName", node.ServiceName).
  1488. Where("Timestamp>toDateTime(?) and Timestamp<toDateTime(?)", req.StartTime, req.EndTime).
  1489. Group(timeField).Order(fmt.Sprintf("%s asc", timeField)).Find(&quantiles).Error; err != nil {
  1490. return errors.Wrap(err, "获取分位数数据失败")
  1491. }
  1492. resp.Quantiles = dto.Quantiles{
  1493. Time: make([]string, len(quantiles)),
  1494. P50: make([]float64, len(quantiles)),
  1495. P90: make([]float64, len(quantiles)),
  1496. P99: make([]float64, len(quantiles)),
  1497. }
  1498. for i, quantile := range quantiles {
  1499. resp.Quantiles.Time[i] = quantile.StartTime
  1500. resp.Quantiles.P50[i] = math.Round(quantile.P50Duration*100) / 100
  1501. resp.Quantiles.P90[i] = math.Round(quantile.P90Duration*100) / 100
  1502. resp.Quantiles.P99[i] = math.Round(quantile.P99Duration*100) / 100
  1503. }
  1504. return nil
  1505. }
  1506. func (s *Service) Stats(req *dto.ServiceStatsReq, resp *dto.ServiceStatsResp) error {
  1507. node := models.ServiceNode{}
  1508. if err := s.Orm.Find(&node, req.ID).Error; err != nil {
  1509. return errors.Wrap(err, "查询服务结点失败")
  1510. }
  1511. req.CheckFilling(time.Minute * 5)
  1512. mins := (req.EndTime - req.StartTime) / 60
  1513. metric := "observe_server_duration_milliseconds"
  1514. labels := map[string]string{"service_name": node.ServiceName}
  1515. ts := time.Unix(req.EndTime, 0)
  1516. wg := sync.WaitGroup{}
  1517. tb := prometheus.NewHistogram(&wg, metric, labels, ts, mins)
  1518. // 计算错误
  1519. errorRate := float64(0)
  1520. wg.Add(1)
  1521. go tb.ErrorRate(&errorRate)
  1522. // 计算分位数
  1523. times := []string{}
  1524. p50s, p90s, p99s := []float64{}, []float64{}, []float64{}
  1525. wg.Add(3)
  1526. go tb.QuantileMinutes(0.5, &[]string{}, &p50s)
  1527. go tb.QuantileMinutes(0.9, &[]string{}, &p90s)
  1528. go tb.QuantileMinutes(0.99, &times, &p99s)
  1529. // 计算最大时延
  1530. wg.Add(1)
  1531. maxDuration := float64(0)
  1532. go tb.Quantile(1.0, &maxDuration) // 最大时延就是百分位数为1的值
  1533. // 计算平均时延
  1534. wg.Add(1)
  1535. avgDuration := float64(0)
  1536. go tb.Avg(&avgDuration)
  1537. // 记录 total
  1538. wg.Add(1)
  1539. total := int64(0)
  1540. go tb.Total(&total)
  1541. wg.Wait()
  1542. // resp.ID = int64(node.ID)
  1543. resp.ServiceName = node.ServiceName
  1544. // resp.Total = st.Total
  1545. resp.Total = total
  1546. resp.Rpm = math.Round(float64(resp.Total)/float64((mins))*100) / 100
  1547. resp.Max = maxDuration
  1548. resp.Avg = avgDuration
  1549. if math.IsNaN(resp.Avg) {
  1550. resp.Avg = 0
  1551. }
  1552. if errorRate > 0 {
  1553. resp.ErrorRate = errorRate // 错误率的计算公式可能有问题
  1554. resp.ErrorNum = int64(errorRate * float64(resp.Total))
  1555. }
  1556. resp.Quantiles.Time = times
  1557. resp.Quantiles.P50 = p50s
  1558. resp.Quantiles.P90 = p90s
  1559. resp.Quantiles.P99 = p99s
  1560. return nil
  1561. }
  1562. // 数字视图处理逻辑
  1563. func (s *Service) Digits(req *dto.ServiceDigitsReq, resp *dto.ServiceDigitsResp) error {
  1564. key := fmt.Sprintf("observe__service_digits_%s_%s", req.AppAlias, req.ServiceName)
  1565. if req.ServiceName == "" {
  1566. key = fmt.Sprintf("observe__service_digits_%s", req.AppAlias)
  1567. }
  1568. rdb := config.GetRedisClient()
  1569. res, err := rdb.HGetAll(key).Result()
  1570. if err == nil {
  1571. // 由于当数据不存在时,不会写缓存,所以这里数字视图的缓存数据有可能不存在,属于正常情况
  1572. // if len(res) == 0 {
  1573. // return errors.New("数字视图缓存数据不存在,请稍后再试")
  1574. // }
  1575. resp.Span, _ = strconv.ParseFloat(res["span"], 64)
  1576. resp.Span = math.Round(resp.Span*100) / 100
  1577. resp.Trace, _ = strconv.ParseFloat(res["trace"], 64)
  1578. resp.Trace = math.Round(resp.Trace*100) / 100
  1579. resp.Http, _ = strconv.ParseFloat(res["http"], 64)
  1580. resp.Http = math.Round(resp.Http*100) / 100
  1581. resp.Rpc, _ = strconv.ParseFloat(res["rpc"], 64)
  1582. resp.Rpc = math.Round(resp.Rpc*100) / 100
  1583. resp.DB, _ = strconv.ParseFloat(res["db"], 64)
  1584. resp.DB = math.Round(resp.DB*100) / 100
  1585. resp.Error, _ = strconv.ParseFloat(res["error"], 64)
  1586. resp.Error = math.Round(resp.Error*100) / 100
  1587. return nil
  1588. }
  1589. return errors.Wrap(err, "数字视图缓存获取失败")
  1590. }
  1591. // 服务数字视图生成, 生成最近1小时内,每分钟的数据
  1592. func (s *Service) DigitsGen() error {
  1593. digits := []struct {
  1594. AppAlias string
  1595. ServiceName string
  1596. Span float64
  1597. Trace float64
  1598. Http float64
  1599. Rpc float64
  1600. Error float64
  1601. }{}
  1602. result := s.ChOrm.Table(models.TableNameTrace).Select([]string{
  1603. "AppAlias AS AppAlias",
  1604. "ServiceName AS ServiceName",
  1605. "COUNT()/60 AS Span",
  1606. "COUNT(DISTINCT TraceId)/60 AS Trace",
  1607. "SUM(if(SpanKind='SPAN_KIND_SERVER' AND (SpanAttributes['http.status_code']>='200' OR SpanAttributes['http.response.status_code']>='200'), 1, 0))/60 AS Http",
  1608. "SUM(if(SpanKind='SPAN_KIND_SERVER' AND SpanAttributes['rpc.system']!='', 1, 0))/60 AS Rpc",
  1609. "SUM(if(SpanAttributes['db.type']='' OR SpanAttributes['db.system']!='', 1, 0))/60 AS Db",
  1610. "SUM(if(StatusCode='STATUS_CODE_ERROR' OR SpanAttributes['http.status_code']>='400' OR SpanAttributes['http.response.status_code']>='400', 1, 0))/60 AS Error",
  1611. }).Where("Timestamp>=NOW()-INTERVAL 1 HOUR").Group("AppAlias, ServiceName").Find(&digits)
  1612. if result.Error != nil {
  1613. return errors.Wrap(result.Error, "获取数字视图数据失败")
  1614. }
  1615. rdb := config.GetRedisClient()
  1616. pipe := rdb.Pipeline()
  1617. aliasDigits := map[string]map[string]int{}
  1618. for _, digit := range digits {
  1619. key := fmt.Sprintf("observe__service_digits_%s_%s", digit.AppAlias, digit.ServiceName)
  1620. pipe.HSet(key, map[string]interface{}{
  1621. "span": digit.Span,
  1622. "trace": digit.Trace,
  1623. "http": digit.Http,
  1624. "rpc": digit.Rpc,
  1625. "error": digit.Error,
  1626. })
  1627. pipe.Expire(key, time.Hour) // 缓存1小时
  1628. // 计算 每个应用下 所有service的数字视图数据
  1629. key = fmt.Sprintf("observe__service_digits_%s", digit.AppAlias)
  1630. if _, ok := aliasDigits[key]; !ok {
  1631. aliasDigits[key] = map[string]int{}
  1632. }
  1633. aliasDigits[key]["span"] += int(digit.Span)
  1634. aliasDigits[key]["trace"] += int(digit.Trace)
  1635. aliasDigits[key]["http"] += int(digit.Http)
  1636. aliasDigits[key]["rpc"] += int(digit.Rpc)
  1637. aliasDigits[key]["error"] += int(digit.Error)
  1638. }
  1639. for key, digit := range aliasDigits {
  1640. pipe.HSet(key, map[string]interface{}{
  1641. "span": digit["span"],
  1642. "trace": digit["trace"],
  1643. "http": digit["http"],
  1644. "rpc": digit["rpc"],
  1645. "error": digit["error"],
  1646. })
  1647. }
  1648. pipe.Exec()
  1649. return nil
  1650. }