package service import ( "context" "fmt" "go-admin/app/observe/models" "go-admin/app/observe/models/query" "go-admin/app/observe/service/dto" cDto "go-admin/common/dto" "go-admin/utils" "math" "strings" "time" "github.com/pkg/errors" ) type Trace struct { utils.OtService } func (t *Trace) GetPage(ctx context.Context, req *dto.TraceListReq, result *[]dto.TraceListResp, count *int64) error { // 先查出所有的TraceId, 之所以这么做,是因为如果直接按时间、时延搜索的话,会过滤部分span,导致span数量、service数量不正确 db := t.ChOrm.Debug().Table(models.TableNameTrace).Distinct() if req.AppAlias != "" && req.TraceId == "" { db.Where("AppAlias", req.AppAlias) } if req.ServiceName != "" { if strings.Contains(req.ServiceName, "---") { // 兼容前端 serviceNames := strings.Split(req.ServiceName, "---") req.ServiceName = serviceNames[1] } db.Where("ServiceName", req.ServiceName) } req.CheckFilling(time.Hour * 48) // 默认两天 // 由于之前返回给前端时为2024-03-27T13:19:26.272775882+08:00这种格式,前端将之转成秒级时间戳后会缩小查询范围,导致查询不到,所以下面会+1或-1 db.Where("Timestamp > toDateTime(?)", req.StartTime-1) db.Where("Timestamp < toDateTime(?)", req.EndTime+1) if req.TraceId != "" { db.Where("TraceId = ?", req.TraceId) } if req.MinDuration > 0 { db.Where("Duration >= ?", math.Floor(req.MinDuration)*float64(time.Millisecond)) } if req.MaxDuration > 0 { db.Where("Duration <= ?", math.Ceil(req.MaxDuration)*float64(time.Millisecond)) } if req.Failed { // db.Where("HttpCode >= ?", http.StatusBadRequest) db.Where("StatusCode = 'STATUS_CODE_ERROR' AND StatusMessage != '' AND StatusMessage != 'status code:0'") } if req.Kind != "" && req.SubconditionValue != "" { db.Where("AppAlias", req.AppAlias) switch req.Kind { case "app": case "biz": case "interface": db.Where("") case "service": db.Where("ServiceName = ", req.SubconditionValue) default: } } if req.StatusMessage != "" { db.Where("StatusMessage", req.StatusMessage) } // 这里添加了分页,即使条件为空也不会查询大量数据 db.Scopes(cDto.Paginate(req.GetPageSize(), req.GetPageIndex())).Order("Duration DESC") tids := make([]string, 0) if err := db.Pluck("TraceId", &tids).Limit(-1).Offset(-1).Distinct("TraceId").Count(count).Error; err != nil { return errors.Wrap(err, "查询trace出错") } var res []struct { TraceId string `json:"trace_id"` MinTimestamp time.Time `json:"timestamp"` MaxDuration float64 `json:"duration"` ServiceNum int64 `json:"service_num"` SpanNum int64 `json:"span_num"` } db = t.ChOrm.Debug().Table(models.TableNameTrace) fields := []string{ "TraceId, MIN(Timestamp) AS MinTimestamp", "MAX(Duration)/1e6 AS MaxDuration", "COUNT(DISTINCT ServiceName) AS ServiceNum", "COUNT() AS SpanNum", } db.Select(fields).Where("TraceId IN ?", tids) // db.Where("Timestamp>NOW()-INTERVAL 48 HOURS") // 最多查询48小时以内的,可能有些许误差,因为有的trace链路较长 // 缩小查询范围,正常链路都不会太长,取前后一小时,应该能足够覆盖,特殊情况可能有遗漏 db.Where("Timestamp>=toDateTime(?) AND Timestamp<=toDateTime(?)", req.StartTime-3600, req.EndTime+3600) if req.AppAlias != "" && req.TraceId == "" { db.Where("AppAlias", req.AppAlias) } if err := db.Group("TraceId").Find(&res).Error; err != nil { t.Log.Errorf("执行sql失败: %s", err) return err } *result = make([]dto.TraceListResp, len(res)) for i := range *result { (*result)[i].MinTimestamp = res[i].MinTimestamp.Local().Format(time.DateTime) (*result)[i].MaxDuration = res[i].MaxDuration (*result)[i].ServiceNum = res[i].ServiceNum (*result)[i].SpanNum = res[i].SpanNum (*result)[i].TraceId = res[i].TraceId } return nil } func (t *Trace) GetSpans(ctx context.Context, param *dto.TraceDetailReq, root *dto.TraceDetailResp) error { list := []models.Trace{} db := t.ChOrm.Table(models.TableNameTrace). Distinct("*"). // 由于数据库中存在parent id生蚝的情况,可能是由于网张原因导致的,这里去一下重 // Select("ServiceName, SpanName, SpanKind, SpanId, ParentSpanId, Duration/1e6 AS Duration"). Where("TraceId=?", param.TraceId) param.CheckFilling(30 * time.Minute) db.Where("Timestamp>=toDateTime(?) AND Timestamp<=toDateTime(?)", param.StartTime, param.EndTime) if err := db.Order("Timestamp ASC").Find(&list).Error; err != nil { t.Log.Errorf("执行sql失败: %s", err) return err } if len(list) == 0 { return errors.New("指定范围内不存在span数据") } dict := []models.Dict{} if err := t.Orm.Model(&models.Dict{}).Where("type in ('SpanAttributes', 'ResourceAttributes')").Find(&dict).Error; err != nil { return errors.Wrap(err, "获取字典数据失败") } dictMap := make(map[string]models.Dict, len(dict)) for _, item := range dict { dictMap[item.Label] = item } mp := map[string][]*models.Trace{} s2hs := map[string]map[string]string{} s2hr := map[string]map[string]string{} s2ps := map[string]string{} // SpanId => ParentSpanId for i := range list { // list[i].Duration /= 1e6 item := list[i] spanAttrs, hideSpanAttrs := make(map[string]string), make(map[string]string) resAttrs, hideResAttrs := make(map[string]string), make(map[string]string) for key, val := range item.SpanAttribute { if item, ok := dictMap[key]; ok { title := item.Title if title == "" { title = key } if item.Hide == 0 { spanAttrs[title] = val } else { hideSpanAttrs[title] = val } } else if val != "" { spanAttrs[key] = val } } for key, val := range item.ResourceAttribute { if item, ok := dictMap[key]; ok { title := item.Title if title == "" { title = key } if item.Hide == 0 { resAttrs[title] = val } else { hideResAttrs[title] = val } } else if val != "" { resAttrs[key] = val } } item.SpanAttribute = spanAttrs item.ResourceAttribute = resAttrs list[i].SpanAttribute = spanAttrs list[i].ResourceAttribute = resAttrs s2hs[item.SpanID] = hideSpanAttrs s2hr[item.SpanID] = hideResAttrs if _, ok := mp[item.ParentSpanID]; !ok { mp[item.ParentSpanID] = []*models.Trace{} } // item.Duration /= 1e6 // 转成毫秒 mp[item.ParentSpanID] = append(mp[item.ParentSpanID], &item) s2ps[item.SpanID] = item.ParentSpanID } hasRootSpan := true if _, ok := mp[""]; !ok { *root = dto.TraceDetailResp{} root.SpanID = "UNKNOWN SPAN ID" root.SpanName = "LOST ROOT SPAN" root.ServiceName = "UNKNOWN SERVICE NAME" root.TraceID = param.TraceId root.EventsTimestamp = []time.Time{} root.EventsName = []string{} root.EventsAttribute = []map[string]string{} root.LinksAttribute = []map[string]string{} root.LinksSpanID = []string{} root.LinksTraceID = []string{} root.LinksTraceState = []string{} root.SpanAttribute = map[string]string{} root.ResourceAttribute = map[string]string{} hasRootSpan = false mp[""] = append(mp[""], &root.Trace) } root.Trace = *mp[""][0] // 检测是否存在trace不完整的情况,如果不完整,补充一个中间span用于和root时行链接,使trace完整 lostSpan := make(map[string]*models.Trace, 0) // 用于记录丢失的span lostSpanChildren := map[string][]models.Trace{} // 用于记录丢失的span的直接子span for _, item := range list { if _, ok := s2ps[item.ParentSpanID]; item.ParentSpanID != "" && !ok { // 上级span不存在,说明trace不完整 if _, ok := lostSpan[item.ParentSpanID]; !ok { lostSpan[item.ParentSpanID] = &models.Trace{ TraceID: param.TraceId, ParentSpanID: root.SpanID, SpanID: item.ParentSpanID, // SpanName: "VIRTUAL SPAN: " + item.ParentSpanID, SpanName: "VIRTUAL SPAN", ServiceName: "VIRTUAL SERVICE NAME", StatusCode: "STATUS_CODE_LOST", EventsTimestamp: []time.Time{}, EventsName: []string{}, EventsAttribute: []map[string]string{}, LinksAttribute: []map[string]string{}, LinksSpanID: []string{}, LinksTraceID: []string{}, LinksTraceState: []string{}, SpanAttribute: map[string]string{}, ResourceAttribute: map[string]string{}, } mp[root.SpanID] = append(mp[root.SpanID], lostSpan[item.ParentSpanID]) } lostSpanChildren[item.ParentSpanID] = append(lostSpanChildren[item.ParentSpanID], item) } } if len(lostSpan) > 0 { // 如果存在丢失span的情况, 统计丢失span的duration for i := range lostSpan { for _, span := range lostSpanChildren[lostSpan[i].SpanID] { lostSpan[i].Duration += span.Duration } } } if !hasRootSpan { // 如果root span不存在, 统计Duration for _, span := range mp[root.SpanID] { root.Duration += float64(span.Duration) } root.Trace.Duration = uint64(root.Duration) } totalDuration := root.Trace.Duration var genTraceTree func(*dto.TraceDetailResp) genTraceTree = func(root *dto.TraceDetailResp) { if totalDuration > 0 { root.Duration = math.Round(float64(root.Trace.Duration)/1e6*100) / 100 root.DurationPersent = float64(root.Trace.Duration) / float64(totalDuration) } root.HiddenSpanAttributes = s2hs[root.SpanID] root.HiddenResourceAttributes = s2hr[root.SpanID] if _, ok := mp[root.SpanID]; !ok { // 不存在,说明当前span为叶子结点 root.MaxDepth = 1 return } maxChildDepth := 0 for _, item := range mp[root.SpanID] { child := &dto.TraceDetailResp{Trace: *item} root.Children = append(root.Children, child) genTraceTree(child) if child.MaxDepth > maxChildDepth { maxChildDepth = child.MaxDepth } } root.MaxDepth = 1 + maxChildDepth } genTraceTree(root) return nil } func (t *Trace) GetTraceGraph(ctx context.Context, param *dto.TraceGraphReq, result *dto.TraceGraphResp) error { rawEdges := make([]dto.TraceGraphEdgeRaw, 0) if err := t.getEdges(ctx, param, &rawEdges); err != nil { return err } edges := make([]dto.TraceGraphEdge, 0, len(rawEdges)) for _, row := range rawEdges { edges = append(edges, dto.TraceGraphEdge{ ID: row.SourceSpan + "-" + row.TargetSpan, Source: row.SourceSpan, Target: row.TargetSpan, MainStat: row.RequestType, }) } result.Edges = edges rawNodes := make([]dto.TraceGraphNodeRaw, 0) if err := t.getNodes(ctx, param, &rawNodes); err != nil { return err } nodes := make([]dto.TraceGraphNode, 0, len(rawNodes)) mp := map[string]float64{} // spanid 与 子span对应的duration之和 for _, row := range rawNodes { if _, ok := mp[row.ParentSpanId]; !ok { mp[row.ParentSpanId] = float64(0) } mp[row.ParentSpanId] += row.Duration } for _, row := range rawNodes { node := dto.TraceGraphNode{ ID: row.SpanId, Title: row.ServiceName, SubTitle: row.SpanName, MainStat: fmt.Sprintf("total: %.2fms", row.Duration), SecondaryStat: fmt.Sprintf("current: %.2fms", row.Duration-mp[row.SpanId]), ArcGreen: 1, ArcRed: 0, } if row.SpanKind == "SPAN_KIND_CLIENT" { node.ArcRed = (row.Duration - mp[row.SpanId]) / row.Duration node.ArcGreen = 1 - node.ArcRed } nodes = append(nodes, node) } result.Nodes = nodes return nil } func (t *Trace) getNodes(_ context.Context, param *dto.TraceGraphReq, result *[]dto.TraceGraphNodeRaw) error { db := t.ChOrm.Table(models.TableNameTrace). Select("SpanId, SpanName, ServiceName, ParentSpanId, SpanKind, Duration/1e6 AS Duration"). Where("TraceId = ?", param.TraceId). Find(result) if err := db.Error; err != nil { t.Log.Errorf("查询trace结点信息失败: %s", err) return err } return nil } func (t *Trace) getEdges(_ context.Context, param *dto.TraceGraphReq, result *[]dto.TraceGraphEdgeRaw) error { sql := fmt.Sprintf(`WITH %s SELECT ParentSpanId AS SourceSpan, SpanId AS TargetSpan, IF(SpanAttributes['http.url'] != '', 'http', IF(SpanAttributes['rpc.system'] != '', SpanAttributes['rpc.system'] , IF(SpanAttributes['db.system'] != '', SpanAttributes['db.system'], IF(SpanAttributes['messaging.system']!='', SpanAttributes['messaging.system'], 'internal')))) as RequestType FROM otel_traces WHERE TraceId = ? AND ParentSpanId != ''`, ChWithSql) if err := t.ChOrm.Raw(sql, param.TraceId).Find(result).Error; err != nil { t.Log.Errorf("查询trace边信息失败: %s", err) return err } return nil } func (s *Trace) DBSlowTop(req *dto.TraceDBSlowTopReq, resp *[]dto.TraceDBSlowTopResp) error { req.CheckFilling(time.Hour) if req.EndTime-req.StartTime > 3600 { req.StartTime = req.EndTime - 3600 } if err := s.ChOrm.Table(models.TableNameTrace). Select("formatDateTime(Timestamp, '%F %H:%i:%S', 'PRC') AS Datetime, toUnixTimestamp(Timestamp) AS Timestamp1, TraceId, SpanId, SpanName, ServiceName, SpanAttributes['db.statement'] as Statement, Duration/1e6 Duration"). Where("AppAlias", req.AppAlias). Where("Timestamp>=toDateTime(?) AND Timestamp