trace.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386
  1. package service
  2. import (
  3. "context"
  4. "fmt"
  5. "go-admin/app/observe/models"
  6. "go-admin/app/observe/models/query"
  7. "go-admin/app/observe/service/dto"
  8. cDto "go-admin/common/dto"
  9. "go-admin/utils"
  10. "math"
  11. "strings"
  12. "time"
  13. "github.com/pkg/errors"
  14. )
// Trace groups the trace-related query methods of the observe service.
// It has no fields of its own; the database handles and logger used by its
// methods (ChOrm, Orm, Log) are promoted from the embedded utils.OtService.
type Trace struct {
	utils.OtService
}
  18. func (t *Trace) GetPage(ctx context.Context, req *dto.TraceListReq, result *[]dto.TraceListResp, count *int64) error {
  19. // 先查出所有的TraceId, 之所以这么做,是因为如果直接按时间、时延搜索的话,会过滤部分span,导致span数量、service数量不正确
  20. db := t.ChOrm.Debug().Table(models.TableNameTrace).Distinct()
  21. if req.AppAlias != "" && req.TraceId == "" {
  22. db.Where("AppAlias", req.AppAlias)
  23. }
  24. if req.ServiceName != "" {
  25. if strings.Contains(req.ServiceName, "---") { // 兼容前端
  26. serviceNames := strings.Split(req.ServiceName, "---")
  27. req.ServiceName = serviceNames[1]
  28. }
  29. db.Where("ServiceName", req.ServiceName)
  30. }
  31. req.CheckFilling(time.Hour * 48) // 默认两天
  32. // 由于之前返回给前端时为2024-03-27T13:19:26.272775882+08:00这种格式,前端将之转成秒级时间戳后会缩小查询范围,导致查询不到,所以下面会+1或-1
  33. db.Where("Timestamp > toDateTime(?)", req.StartTime-1)
  34. db.Where("Timestamp < toDateTime(?)", req.EndTime+1)
  35. if req.TraceId != "" {
  36. db.Where("TraceId = ?", req.TraceId)
  37. }
  38. if req.MinDuration > 0 {
  39. db.Where("Duration >= ?", math.Floor(req.MinDuration)*float64(time.Millisecond))
  40. }
  41. if req.MaxDuration > 0 {
  42. db.Where("Duration <= ?", math.Ceil(req.MaxDuration)*float64(time.Millisecond))
  43. }
  44. if req.Failed {
  45. // db.Where("HttpCode >= ?", http.StatusBadRequest)
  46. db.Where("StatusCode = 'STATUS_CODE_ERROR' AND StatusMessage != '' AND StatusMessage != 'status code:0'")
  47. }
  48. if req.Kind != "" && req.SubconditionValue != "" {
  49. db.Where("AppAlias", req.AppAlias)
  50. switch req.Kind {
  51. case "app":
  52. case "biz":
  53. case "interface":
  54. db.Where("")
  55. case "service":
  56. db.Where("ServiceName = ", req.SubconditionValue)
  57. default:
  58. }
  59. }
  60. if req.StatusMessage != "" {
  61. db.Where("StatusMessage", req.StatusMessage)
  62. }
  63. // 这里添加了分页,即使条件为空也不会查询大量数据
  64. db.Scopes(cDto.Paginate(req.GetPageSize(), req.GetPageIndex())).Order("Duration DESC")
  65. tids := make([]string, 0)
  66. if err := db.Pluck("TraceId", &tids).Limit(-1).Offset(-1).Distinct("TraceId").Count(count).Error; err != nil {
  67. return errors.Wrap(err, "查询trace出错")
  68. }
  69. var res []struct {
  70. TraceId string `json:"trace_id"`
  71. MinTimestamp time.Time `json:"timestamp"`
  72. MaxDuration float64 `json:"duration"`
  73. ServiceNum int64 `json:"service_num"`
  74. SpanNum int64 `json:"span_num"`
  75. }
  76. db = t.ChOrm.Debug().Table(models.TableNameTrace)
  77. fields := []string{
  78. "TraceId, MIN(Timestamp) AS MinTimestamp",
  79. "MAX(Duration)/1e6 AS MaxDuration",
  80. "COUNT(DISTINCT ServiceName) AS ServiceNum",
  81. "COUNT() AS SpanNum",
  82. }
  83. db.Select(fields).Where("TraceId IN ?", tids)
  84. // db.Where("Timestamp>NOW()-INTERVAL 48 HOURS") // 最多查询48小时以内的,可能有些许误差,因为有的trace链路较长
  85. // 缩小查询范围,正常链路都不会太长,取前后一小时,应该能足够覆盖,特殊情况可能有遗漏
  86. db.Where("Timestamp>=toDateTime(?) AND Timestamp<=toDateTime(?)", req.StartTime-3600, req.EndTime+3600)
  87. if req.AppAlias != "" && req.TraceId == "" {
  88. db.Where("AppAlias", req.AppAlias)
  89. }
  90. if err := db.Group("TraceId").Find(&res).Error; err != nil {
  91. t.Log.Errorf("执行sql失败: %s", err)
  92. return err
  93. }
  94. *result = make([]dto.TraceListResp, len(res))
  95. for i := range *result {
  96. (*result)[i].MinTimestamp = res[i].MinTimestamp.Local().Format(time.DateTime)
  97. (*result)[i].MaxDuration = res[i].MaxDuration
  98. (*result)[i].ServiceNum = res[i].ServiceNum
  99. (*result)[i].SpanNum = res[i].SpanNum
  100. (*result)[i].TraceId = res[i].TraceId
  101. }
  102. return nil
  103. }
  104. func (t *Trace) GetSpans(ctx context.Context, param *dto.TraceDetailReq, root *dto.TraceDetailResp) error {
  105. list := []models.Trace{}
  106. db := t.ChOrm.Table(models.TableNameTrace).
  107. Distinct("*"). // 由于数据库中存在parent id生蚝的情况,可能是由于网张原因导致的,这里去一下重
  108. // Select("ServiceName, SpanName, SpanKind, SpanId, ParentSpanId, Duration/1e6 AS Duration").
  109. Where("TraceId=?", param.TraceId)
  110. param.CheckFilling(30 * time.Minute)
  111. db.Where("Timestamp>=toDateTime(?) AND Timestamp<=toDateTime(?)", param.StartTime, param.EndTime)
  112. if err := db.Order("Timestamp ASC").Find(&list).Error; err != nil {
  113. t.Log.Errorf("执行sql失败: %s", err)
  114. return err
  115. }
  116. if len(list) == 0 {
  117. return errors.New("指定范围内不存在span数据")
  118. }
  119. dict := []models.Dict{}
  120. if err := t.Orm.Model(&models.Dict{}).Where("type in ('SpanAttributes', 'ResourceAttributes')").Find(&dict).Error; err != nil {
  121. return errors.Wrap(err, "获取字典数据失败")
  122. }
  123. dictMap := make(map[string]models.Dict, len(dict))
  124. for _, item := range dict {
  125. dictMap[item.Label] = item
  126. }
  127. mp := map[string][]*models.Trace{}
  128. s2hs := map[string]map[string]string{}
  129. s2hr := map[string]map[string]string{}
  130. s2ps := map[string]string{} // SpanId => ParentSpanId
  131. for i := range list {
  132. // list[i].Duration /= 1e6
  133. item := list[i]
  134. spanAttrs, hideSpanAttrs := make(map[string]string), make(map[string]string)
  135. resAttrs, hideResAttrs := make(map[string]string), make(map[string]string)
  136. for key, val := range item.SpanAttribute {
  137. if item, ok := dictMap[key]; ok {
  138. title := item.Title
  139. if title == "" {
  140. title = key
  141. }
  142. if item.Hide == 0 {
  143. spanAttrs[title] = val
  144. } else {
  145. hideSpanAttrs[title] = val
  146. }
  147. } else if val != "" {
  148. spanAttrs[key] = val
  149. }
  150. }
  151. for key, val := range item.ResourceAttribute {
  152. if item, ok := dictMap[key]; ok {
  153. title := item.Title
  154. if title == "" {
  155. title = key
  156. }
  157. if item.Hide == 0 {
  158. resAttrs[title] = val
  159. } else {
  160. hideResAttrs[title] = val
  161. }
  162. } else if val != "" {
  163. resAttrs[key] = val
  164. }
  165. }
  166. item.SpanAttribute = spanAttrs
  167. item.ResourceAttribute = resAttrs
  168. list[i].SpanAttribute = spanAttrs
  169. list[i].ResourceAttribute = resAttrs
  170. s2hs[item.SpanID] = hideSpanAttrs
  171. s2hr[item.SpanID] = hideResAttrs
  172. if _, ok := mp[item.ParentSpanID]; !ok {
  173. mp[item.ParentSpanID] = []*models.Trace{}
  174. }
  175. // item.Duration /= 1e6 // 转成毫秒
  176. mp[item.ParentSpanID] = append(mp[item.ParentSpanID], &item)
  177. s2ps[item.SpanID] = item.ParentSpanID
  178. }
  179. hasRootSpan := true
  180. if _, ok := mp[""]; !ok {
  181. *root = dto.TraceDetailResp{}
  182. root.SpanID = "UNKNOWN SPAN ID"
  183. root.SpanName = "LOST ROOT SPAN"
  184. root.ServiceName = "UNKNOWN SERVICE NAME"
  185. root.TraceID = param.TraceId
  186. root.EventsTimestamp = []time.Time{}
  187. root.EventsName = []string{}
  188. root.EventsAttribute = []map[string]string{}
  189. root.LinksAttribute = []map[string]string{}
  190. root.LinksSpanID = []string{}
  191. root.LinksTraceID = []string{}
  192. root.LinksTraceState = []string{}
  193. root.SpanAttribute = map[string]string{}
  194. root.ResourceAttribute = map[string]string{}
  195. hasRootSpan = false
  196. mp[""] = append(mp[""], &root.Trace)
  197. }
  198. root.Trace = *mp[""][0]
  199. // 检测是否存在trace不完整的情况,如果不完整,补充一个中间span用于和root时行链接,使trace完整
  200. lostSpan := make(map[string]*models.Trace, 0) // 用于记录丢失的span
  201. lostSpanChildren := map[string][]models.Trace{} // 用于记录丢失的span的直接子span
  202. for _, item := range list {
  203. if _, ok := s2ps[item.ParentSpanID]; item.ParentSpanID != "" && !ok { // 上级span不存在,说明trace不完整
  204. if _, ok := lostSpan[item.ParentSpanID]; !ok {
  205. lostSpan[item.ParentSpanID] = &models.Trace{
  206. TraceID: param.TraceId,
  207. ParentSpanID: root.SpanID,
  208. SpanID: item.ParentSpanID,
  209. // SpanName: "VIRTUAL SPAN: " + item.ParentSpanID,
  210. SpanName: "VIRTUAL SPAN",
  211. ServiceName: "VIRTUAL SERVICE NAME",
  212. StatusCode: "STATUS_CODE_LOST",
  213. EventsTimestamp: []time.Time{},
  214. EventsName: []string{},
  215. EventsAttribute: []map[string]string{},
  216. LinksAttribute: []map[string]string{},
  217. LinksSpanID: []string{},
  218. LinksTraceID: []string{},
  219. LinksTraceState: []string{},
  220. SpanAttribute: map[string]string{},
  221. ResourceAttribute: map[string]string{},
  222. }
  223. mp[root.SpanID] = append(mp[root.SpanID], lostSpan[item.ParentSpanID])
  224. }
  225. lostSpanChildren[item.ParentSpanID] = append(lostSpanChildren[item.ParentSpanID], item)
  226. }
  227. }
  228. if len(lostSpan) > 0 { // 如果存在丢失span的情况, 统计丢失span的duration
  229. for i := range lostSpan {
  230. for _, span := range lostSpanChildren[lostSpan[i].SpanID] {
  231. lostSpan[i].Duration += span.Duration
  232. }
  233. }
  234. }
  235. if !hasRootSpan { // 如果root span不存在, 统计Duration
  236. for _, span := range mp[root.SpanID] {
  237. root.Duration += float64(span.Duration)
  238. }
  239. root.Trace.Duration = uint64(root.Duration)
  240. }
  241. totalDuration := root.Trace.Duration
  242. var genTraceTree func(*dto.TraceDetailResp)
  243. genTraceTree = func(root *dto.TraceDetailResp) {
  244. if totalDuration > 0 {
  245. root.Duration = math.Round(float64(root.Trace.Duration)/1e6*100) / 100
  246. root.DurationPersent = float64(root.Trace.Duration) / float64(totalDuration)
  247. }
  248. root.HiddenSpanAttributes = s2hs[root.SpanID]
  249. root.HiddenResourceAttributes = s2hr[root.SpanID]
  250. if _, ok := mp[root.SpanID]; !ok { // 不存在,说明当前span为叶子结点
  251. root.MaxDepth = 1
  252. return
  253. }
  254. maxChildDepth := 0
  255. for _, item := range mp[root.SpanID] {
  256. child := &dto.TraceDetailResp{Trace: *item}
  257. root.Children = append(root.Children, child)
  258. genTraceTree(child)
  259. if child.MaxDepth > maxChildDepth {
  260. maxChildDepth = child.MaxDepth
  261. }
  262. }
  263. root.MaxDepth = 1 + maxChildDepth
  264. }
  265. genTraceTree(root)
  266. return nil
  267. }
  268. func (t *Trace) GetTraceGraph(ctx context.Context, param *dto.TraceGraphReq, result *dto.TraceGraphResp) error {
  269. rawEdges := make([]dto.TraceGraphEdgeRaw, 0)
  270. if err := t.getEdges(ctx, param, &rawEdges); err != nil {
  271. return err
  272. }
  273. edges := make([]dto.TraceGraphEdge, 0, len(rawEdges))
  274. for _, row := range rawEdges {
  275. edges = append(edges, dto.TraceGraphEdge{
  276. ID: row.SourceSpan + "-" + row.TargetSpan,
  277. Source: row.SourceSpan,
  278. Target: row.TargetSpan,
  279. MainStat: row.RequestType,
  280. })
  281. }
  282. result.Edges = edges
  283. rawNodes := make([]dto.TraceGraphNodeRaw, 0)
  284. if err := t.getNodes(ctx, param, &rawNodes); err != nil {
  285. return err
  286. }
  287. nodes := make([]dto.TraceGraphNode, 0, len(rawNodes))
  288. mp := map[string]float64{} // spanid 与 子span对应的duration之和
  289. for _, row := range rawNodes {
  290. if _, ok := mp[row.ParentSpanId]; !ok {
  291. mp[row.ParentSpanId] = float64(0)
  292. }
  293. mp[row.ParentSpanId] += row.Duration
  294. }
  295. for _, row := range rawNodes {
  296. node := dto.TraceGraphNode{
  297. ID: row.SpanId,
  298. Title: row.ServiceName,
  299. SubTitle: row.SpanName,
  300. MainStat: fmt.Sprintf("total: %.2fms", row.Duration),
  301. SecondaryStat: fmt.Sprintf("current: %.2fms", row.Duration-mp[row.SpanId]),
  302. ArcGreen: 1,
  303. ArcRed: 0,
  304. }
  305. if row.SpanKind == "SPAN_KIND_CLIENT" {
  306. node.ArcRed = (row.Duration - mp[row.SpanId]) / row.Duration
  307. node.ArcGreen = 1 - node.ArcRed
  308. }
  309. nodes = append(nodes, node)
  310. }
  311. result.Nodes = nodes
  312. return nil
  313. }
  314. func (t *Trace) getNodes(_ context.Context, param *dto.TraceGraphReq, result *[]dto.TraceGraphNodeRaw) error {
  315. db := t.ChOrm.Table(models.TableNameTrace).
  316. Select("SpanId, SpanName, ServiceName, ParentSpanId, SpanKind, Duration/1e6 AS Duration").
  317. Where("TraceId = ?", param.TraceId).
  318. Find(result)
  319. if err := db.Error; err != nil {
  320. t.Log.Errorf("查询trace结点信息失败: %s", err)
  321. return err
  322. }
  323. return nil
  324. }
  325. func (t *Trace) getEdges(_ context.Context, param *dto.TraceGraphReq, result *[]dto.TraceGraphEdgeRaw) error {
  326. sql := fmt.Sprintf(`WITH %s
  327. SELECT
  328. ParentSpanId AS SourceSpan,
  329. SpanId AS TargetSpan,
  330. IF(SpanAttributes['http.url'] != '', 'http', IF(SpanAttributes['rpc.system'] != '', SpanAttributes['rpc.system'] , IF(SpanAttributes['db.system'] != '', SpanAttributes['db.system'], IF(SpanAttributes['messaging.system']!='', SpanAttributes['messaging.system'], 'internal')))) as RequestType
  331. FROM
  332. otel_traces
  333. WHERE
  334. TraceId = ? AND ParentSpanId != ''`, ChWithSql)
  335. if err := t.ChOrm.Raw(sql, param.TraceId).Find(result).Error; err != nil {
  336. t.Log.Errorf("查询trace边信息失败: %s", err)
  337. return err
  338. }
  339. return nil
  340. }
  341. func (s *Trace) DBSlowTop(req *dto.TraceDBSlowTopReq, resp *[]dto.TraceDBSlowTopResp) error {
  342. req.CheckFilling(time.Hour)
  343. if req.EndTime-req.StartTime > 3600 {
  344. req.StartTime = req.EndTime - 3600
  345. }
  346. if err := s.ChOrm.Table(models.TableNameTrace).
  347. Select("formatDateTime(Timestamp, '%F %H:%i:%S', 'PRC') AS Datetime, toUnixTimestamp(Timestamp) AS Timestamp1, TraceId, SpanId, SpanName, ServiceName, SpanAttributes['db.statement'] as Statement, Duration/1e6 Duration").
  348. Where("AppAlias", req.AppAlias).
  349. Where("Timestamp>=toDateTime(?) AND Timestamp<toDateTime(?)", req.StartTime, req.EndTime).
  350. Where("SpanKind='SPAN_KIND_CLIENT'").
  351. Where("SpanAttributes['db.system']!=''").
  352. Order("Duration DESC").
  353. Limit(int(req.Limit)).
  354. Find(resp).Error; err != nil {
  355. return errors.Wrap(err, "获取db span排行失败")
  356. }
  357. svcMap, err := query.NewService().ServiceMap(req.AppAlias)
  358. if err != nil {
  359. return err
  360. }
  361. for i := range *resp {
  362. (*resp)[i].ServiceNameCN = svcMap[(*resp)[i].ServiceName]
  363. }
  364. return nil
  365. }