|
- package service
- import (
- "context"
- "fmt"
- "go-admin/app/observe/models"
- "go-admin/app/observe/models/query"
- "go-admin/app/observe/service/dto"
- cDto "go-admin/common/dto"
- "go-admin/utils"
- "math"
- "strings"
- "time"
- "github.com/pkg/errors"
- )
- type Trace struct {
- utils.OtService
- }
- func (t *Trace) GetPage(ctx context.Context, req *dto.TraceListReq, result *[]dto.TraceListResp, count *int64) error {
- // 先查出所有的TraceId, 之所以这么做,是因为如果直接按时间、时延搜索的话,会过滤部分span,导致span数量、service数量不正确
- db := t.ChOrm.Debug().Table(models.TableNameTrace).Distinct()
- if req.AppAlias != "" && req.TraceId == "" {
- db.Where("AppAlias", req.AppAlias)
- }
- if req.ServiceName != "" {
- if strings.Contains(req.ServiceName, "---") { // 兼容前端
- serviceNames := strings.Split(req.ServiceName, "---")
- req.ServiceName = serviceNames[1]
- }
- db.Where("ServiceName", req.ServiceName)
- }
- req.CheckFilling(time.Hour * 48) // 默认两天
- // 由于之前返回给前端时为2024-03-27T13:19:26.272775882+08:00这种格式,前端将之转成秒级时间戳后会缩小查询范围,导致查询不到,所以下面会+1或-1
- db.Where("Timestamp > toDateTime(?)", req.StartTime-1)
- db.Where("Timestamp < toDateTime(?)", req.EndTime+1)
- if req.TraceId != "" {
- db.Where("TraceId = ?", req.TraceId)
- }
- if req.MinDuration > 0 {
- db.Where("Duration >= ?", math.Floor(req.MinDuration)*float64(time.Millisecond))
- }
- if req.MaxDuration > 0 {
- db.Where("Duration <= ?", math.Ceil(req.MaxDuration)*float64(time.Millisecond))
- }
- if req.Failed {
- // db.Where("HttpCode >= ?", http.StatusBadRequest)
- db.Where("StatusCode = 'STATUS_CODE_ERROR' AND StatusMessage != '' AND StatusMessage != 'status code:0'")
- }
- if req.Kind != "" && req.SubconditionValue != "" {
- db.Where("AppAlias", req.AppAlias)
- switch req.Kind {
- case "app":
- case "biz":
- case "interface":
- db.Where("")
- case "service":
- db.Where("ServiceName = ", req.SubconditionValue)
- default:
- }
- }
- if req.StatusMessage != "" {
- db.Where("StatusMessage", req.StatusMessage)
- }
- // 这里添加了分页,即使条件为空也不会查询大量数据
- db.Scopes(cDto.Paginate(req.GetPageSize(), req.GetPageIndex())).Order("Duration DESC")
- tids := make([]string, 0)
- if err := db.Pluck("TraceId", &tids).Limit(-1).Offset(-1).Distinct("TraceId").Count(count).Error; err != nil {
- return errors.Wrap(err, "查询trace出错")
- }
- var res []struct {
- TraceId string `json:"trace_id"`
- MinTimestamp time.Time `json:"timestamp"`
- MaxDuration float64 `json:"duration"`
- ServiceNum int64 `json:"service_num"`
- SpanNum int64 `json:"span_num"`
- }
- db = t.ChOrm.Debug().Table(models.TableNameTrace)
- fields := []string{
- "TraceId, MIN(Timestamp) AS MinTimestamp",
- "MAX(Duration)/1e6 AS MaxDuration",
- "COUNT(DISTINCT ServiceName) AS ServiceNum",
- "COUNT() AS SpanNum",
- }
- db.Select(fields).Where("TraceId IN ?", tids)
- // db.Where("Timestamp>NOW()-INTERVAL 48 HOURS") // 最多查询48小时以内的,可能有些许误差,因为有的trace链路较长
- // 缩小查询范围,正常链路都不会太长,取前后一小时,应该能足够覆盖,特殊情况可能有遗漏
- db.Where("Timestamp>=toDateTime(?) AND Timestamp<=toDateTime(?)", req.StartTime-3600, req.EndTime+3600)
- if req.AppAlias != "" && req.TraceId == "" {
- db.Where("AppAlias", req.AppAlias)
- }
- if err := db.Group("TraceId").Find(&res).Error; err != nil {
- t.Log.Errorf("执行sql失败: %s", err)
- return err
- }
- *result = make([]dto.TraceListResp, len(res))
- for i := range *result {
- (*result)[i].MinTimestamp = res[i].MinTimestamp.Local().Format(time.DateTime)
- (*result)[i].MaxDuration = res[i].MaxDuration
- (*result)[i].ServiceNum = res[i].ServiceNum
- (*result)[i].SpanNum = res[i].SpanNum
- (*result)[i].TraceId = res[i].TraceId
- }
- return nil
- }
- func (t *Trace) GetSpans(ctx context.Context, param *dto.TraceDetailReq, root *dto.TraceDetailResp) error {
- list := []models.Trace{}
- db := t.ChOrm.Table(models.TableNameTrace).
- Distinct("*"). // 由于数据库中存在parent id生蚝的情况,可能是由于网张原因导致的,这里去一下重
- // Select("ServiceName, SpanName, SpanKind, SpanId, ParentSpanId, Duration/1e6 AS Duration").
- Where("TraceId=?", param.TraceId)
- param.CheckFilling(30 * time.Minute)
- db.Where("Timestamp>=toDateTime(?) AND Timestamp<=toDateTime(?)", param.StartTime, param.EndTime)
- if err := db.Order("Timestamp ASC").Find(&list).Error; err != nil {
- t.Log.Errorf("执行sql失败: %s", err)
- return err
- }
- if len(list) == 0 {
- return errors.New("指定范围内不存在span数据")
- }
- dict := []models.Dict{}
- if err := t.Orm.Model(&models.Dict{}).Where("type in ('SpanAttributes', 'ResourceAttributes')").Find(&dict).Error; err != nil {
- return errors.Wrap(err, "获取字典数据失败")
- }
- dictMap := make(map[string]models.Dict, len(dict))
- for _, item := range dict {
- dictMap[item.Label] = item
- }
- mp := map[string][]*models.Trace{}
- s2hs := map[string]map[string]string{}
- s2hr := map[string]map[string]string{}
- s2ps := map[string]string{} // SpanId => ParentSpanId
- for i := range list {
- // list[i].Duration /= 1e6
- item := list[i]
- spanAttrs, hideSpanAttrs := make(map[string]string), make(map[string]string)
- resAttrs, hideResAttrs := make(map[string]string), make(map[string]string)
- for key, val := range item.SpanAttribute {
- if item, ok := dictMap[key]; ok {
- title := item.Title
- if title == "" {
- title = key
- }
- if item.Hide == 0 {
- spanAttrs[title] = val
- } else {
- hideSpanAttrs[title] = val
- }
- } else if val != "" {
- spanAttrs[key] = val
- }
- }
- for key, val := range item.ResourceAttribute {
- if item, ok := dictMap[key]; ok {
- title := item.Title
- if title == "" {
- title = key
- }
- if item.Hide == 0 {
- resAttrs[title] = val
- } else {
- hideResAttrs[title] = val
- }
- } else if val != "" {
- resAttrs[key] = val
- }
- }
- item.SpanAttribute = spanAttrs
- item.ResourceAttribute = resAttrs
- list[i].SpanAttribute = spanAttrs
- list[i].ResourceAttribute = resAttrs
- s2hs[item.SpanID] = hideSpanAttrs
- s2hr[item.SpanID] = hideResAttrs
- if _, ok := mp[item.ParentSpanID]; !ok {
- mp[item.ParentSpanID] = []*models.Trace{}
- }
- // item.Duration /= 1e6 // 转成毫秒
- mp[item.ParentSpanID] = append(mp[item.ParentSpanID], &item)
- s2ps[item.SpanID] = item.ParentSpanID
- }
- hasRootSpan := true
- if _, ok := mp[""]; !ok {
- *root = dto.TraceDetailResp{}
- root.SpanID = "UNKNOWN SPAN ID"
- root.SpanName = "LOST ROOT SPAN"
- root.ServiceName = "UNKNOWN SERVICE NAME"
- root.TraceID = param.TraceId
- root.EventsTimestamp = []time.Time{}
- root.EventsName = []string{}
- root.EventsAttribute = []map[string]string{}
- root.LinksAttribute = []map[string]string{}
- root.LinksSpanID = []string{}
- root.LinksTraceID = []string{}
- root.LinksTraceState = []string{}
- root.SpanAttribute = map[string]string{}
- root.ResourceAttribute = map[string]string{}
- hasRootSpan = false
- mp[""] = append(mp[""], &root.Trace)
- }
- root.Trace = *mp[""][0]
- // 检测是否存在trace不完整的情况,如果不完整,补充一个中间span用于和root时行链接,使trace完整
- lostSpan := make(map[string]*models.Trace, 0) // 用于记录丢失的span
- lostSpanChildren := map[string][]models.Trace{} // 用于记录丢失的span的直接子span
- for _, item := range list {
- if _, ok := s2ps[item.ParentSpanID]; item.ParentSpanID != "" && !ok { // 上级span不存在,说明trace不完整
- if _, ok := lostSpan[item.ParentSpanID]; !ok {
- lostSpan[item.ParentSpanID] = &models.Trace{
- TraceID: param.TraceId,
- ParentSpanID: root.SpanID,
- SpanID: item.ParentSpanID,
- // SpanName: "VIRTUAL SPAN: " + item.ParentSpanID,
- SpanName: "VIRTUAL SPAN",
- ServiceName: "VIRTUAL SERVICE NAME",
- StatusCode: "STATUS_CODE_LOST",
- EventsTimestamp: []time.Time{},
- EventsName: []string{},
- EventsAttribute: []map[string]string{},
- LinksAttribute: []map[string]string{},
- LinksSpanID: []string{},
- LinksTraceID: []string{},
- LinksTraceState: []string{},
- SpanAttribute: map[string]string{},
- ResourceAttribute: map[string]string{},
- }
- mp[root.SpanID] = append(mp[root.SpanID], lostSpan[item.ParentSpanID])
- }
- lostSpanChildren[item.ParentSpanID] = append(lostSpanChildren[item.ParentSpanID], item)
- }
- }
- if len(lostSpan) > 0 { // 如果存在丢失span的情况, 统计丢失span的duration
- for i := range lostSpan {
- for _, span := range lostSpanChildren[lostSpan[i].SpanID] {
- lostSpan[i].Duration += span.Duration
- }
- }
- }
- if !hasRootSpan { // 如果root span不存在, 统计Duration
- for _, span := range mp[root.SpanID] {
- root.Duration += float64(span.Duration)
- }
- root.Trace.Duration = uint64(root.Duration)
- }
- totalDuration := root.Trace.Duration
- var genTraceTree func(*dto.TraceDetailResp)
- genTraceTree = func(root *dto.TraceDetailResp) {
- if totalDuration > 0 {
- root.Duration = math.Round(float64(root.Trace.Duration)/1e6*100) / 100
- root.DurationPersent = float64(root.Trace.Duration) / float64(totalDuration)
- }
- root.HiddenSpanAttributes = s2hs[root.SpanID]
- root.HiddenResourceAttributes = s2hr[root.SpanID]
- if _, ok := mp[root.SpanID]; !ok { // 不存在,说明当前span为叶子结点
- root.MaxDepth = 1
- return
- }
- maxChildDepth := 0
- for _, item := range mp[root.SpanID] {
- child := &dto.TraceDetailResp{Trace: *item}
- root.Children = append(root.Children, child)
- genTraceTree(child)
- if child.MaxDepth > maxChildDepth {
- maxChildDepth = child.MaxDepth
- }
- }
- root.MaxDepth = 1 + maxChildDepth
- }
- genTraceTree(root)
- return nil
- }
- func (t *Trace) GetTraceGraph(ctx context.Context, param *dto.TraceGraphReq, result *dto.TraceGraphResp) error {
- rawEdges := make([]dto.TraceGraphEdgeRaw, 0)
- if err := t.getEdges(ctx, param, &rawEdges); err != nil {
- return err
- }
- edges := make([]dto.TraceGraphEdge, 0, len(rawEdges))
- for _, row := range rawEdges {
- edges = append(edges, dto.TraceGraphEdge{
- ID: row.SourceSpan + "-" + row.TargetSpan,
- Source: row.SourceSpan,
- Target: row.TargetSpan,
- MainStat: row.RequestType,
- })
- }
- result.Edges = edges
- rawNodes := make([]dto.TraceGraphNodeRaw, 0)
- if err := t.getNodes(ctx, param, &rawNodes); err != nil {
- return err
- }
- nodes := make([]dto.TraceGraphNode, 0, len(rawNodes))
- mp := map[string]float64{} // spanid 与 子span对应的duration之和
- for _, row := range rawNodes {
- if _, ok := mp[row.ParentSpanId]; !ok {
- mp[row.ParentSpanId] = float64(0)
- }
- mp[row.ParentSpanId] += row.Duration
- }
- for _, row := range rawNodes {
- node := dto.TraceGraphNode{
- ID: row.SpanId,
- Title: row.ServiceName,
- SubTitle: row.SpanName,
- MainStat: fmt.Sprintf("total: %.2fms", row.Duration),
- SecondaryStat: fmt.Sprintf("current: %.2fms", row.Duration-mp[row.SpanId]),
- ArcGreen: 1,
- ArcRed: 0,
- }
- if row.SpanKind == "SPAN_KIND_CLIENT" {
- node.ArcRed = (row.Duration - mp[row.SpanId]) / row.Duration
- node.ArcGreen = 1 - node.ArcRed
- }
- nodes = append(nodes, node)
- }
- result.Nodes = nodes
- return nil
- }
- func (t *Trace) getNodes(_ context.Context, param *dto.TraceGraphReq, result *[]dto.TraceGraphNodeRaw) error {
- db := t.ChOrm.Table(models.TableNameTrace).
- Select("SpanId, SpanName, ServiceName, ParentSpanId, SpanKind, Duration/1e6 AS Duration").
- Where("TraceId = ?", param.TraceId).
- Find(result)
- if err := db.Error; err != nil {
- t.Log.Errorf("查询trace结点信息失败: %s", err)
- return err
- }
- return nil
- }
- func (t *Trace) getEdges(_ context.Context, param *dto.TraceGraphReq, result *[]dto.TraceGraphEdgeRaw) error {
- sql := fmt.Sprintf(`WITH %s
- SELECT
- ParentSpanId AS SourceSpan,
- SpanId AS TargetSpan,
- IF(SpanAttributes['http.url'] != '', 'http', IF(SpanAttributes['rpc.system'] != '', SpanAttributes['rpc.system'] , IF(SpanAttributes['db.system'] != '', SpanAttributes['db.system'], IF(SpanAttributes['messaging.system']!='', SpanAttributes['messaging.system'], 'internal')))) as RequestType
- FROM
- otel_traces
- WHERE
- TraceId = ? AND ParentSpanId != ''`, ChWithSql)
- if err := t.ChOrm.Raw(sql, param.TraceId).Find(result).Error; err != nil {
- t.Log.Errorf("查询trace边信息失败: %s", err)
- return err
- }
- return nil
- }
- func (s *Trace) DBSlowTop(req *dto.TraceDBSlowTopReq, resp *[]dto.TraceDBSlowTopResp) error {
- req.CheckFilling(time.Hour)
- if req.EndTime-req.StartTime > 3600 {
- req.StartTime = req.EndTime - 3600
- }
- if err := s.ChOrm.Table(models.TableNameTrace).
- Select("formatDateTime(Timestamp, '%F %H:%i:%S', 'PRC') AS Datetime, toUnixTimestamp(Timestamp) AS Timestamp1, TraceId, SpanId, SpanName, ServiceName, SpanAttributes['db.statement'] as Statement, Duration/1e6 Duration").
- Where("AppAlias", req.AppAlias).
- Where("Timestamp>=toDateTime(?) AND Timestamp<toDateTime(?)", req.StartTime, req.EndTime).
- Where("SpanKind='SPAN_KIND_CLIENT'").
- Where("SpanAttributes['db.system']!=''").
- Order("Duration DESC").
- Limit(int(req.Limit)).
- Find(resp).Error; err != nil {
- return errors.Wrap(err, "获取db span排行失败")
- }
- svcMap, err := query.NewService().ServiceMap(req.AppAlias)
- if err != nil {
- return err
- }
- for i := range *resp {
- (*resp)[i].ServiceNameCN = svcMap[(*resp)[i].ServiceName]
- }
- return nil
- }
|