|
- package service
- import (
- "fmt"
- "go-admin/app/observe/models"
- "go-admin/app/observe/models/query"
- "go-admin/app/observe/service/dto"
- cDto "go-admin/common/dto"
- "go-admin/common/prometheus"
- cUtils "go-admin/common/utils"
- "go-admin/config"
- "go-admin/utils"
- "math"
- "strings"
- "sync"
- "time"
- adModels "go-admin/app/admin/models"
- log "github.com/go-admin-team/go-admin-core/logger"
- "github.com/pkg/errors"
- )
- type Biz struct {
- // service.Service
- utils.OtService
- }
- func (b *Biz) List(req *dto.BizListReq, resp *[]dto.BizListResp, count *int64) error {
- bizIds := []int64{}
- if req.AppAlias != "" && (req.Route != "" || req.ServiceName != "") {
- db := b.Orm.Model(&models.BizNode{}).
- Where("ot_biz_node.app_alias", req.AppAlias)
- if req.Route != "" {
- db.Where("span_kind='SPAN_KIND_SERVER'").Where("span_name like ?", "%"+req.Route)
- }
- if req.ServiceName != "" {
- serviceNames := strings.Split(req.ServiceName, ",")
- db.Where("service_name in ?", serviceNames)
- }
- err := db.Joins("inner join ot_biz_edge be on be.source=ot_biz_node.id or be.target=ot_biz_node.id").
- Pluck("be.biz_id", &bizIds).Error
- if err != nil {
- return errors.Wrap(err, "获取业务id失败")
- }
- }
- list := []models.Biz{}
- db := b.Orm.Model(&models.Biz{}).
- Scopes(cDto.Paginate(req.GetPageSize(), req.GetPageIndex()))
- if len(bizIds) > 0 {
- db.Where("id in ?", bizIds)
- }
- if req.AppAlias != "" {
- db.Where("ot_biz.app_alias", req.AppAlias)
- } else { // 兼容旧逻辑,后期删除
- db.Where("app_id", req.AppId)
- }
- if err := db.
- Order("favor DESC, id ASC").Find(&list).
- Limit(-1).Offset(-1).Count(count).Error; err != nil {
- return errors.Wrap(err, "获取业务列表失败")
- }
- if len(list) == 0 {
- return nil
- }
- svcMap, err := query.NewService().ServiceMap(list[0].AppAlias)
- if err != nil {
- return err
- }
- for _, item := range list {
- r := new(dto.BizListResp)
- r.ID = int64(item.ID)
- r.Name = item.Name
- r.AppId = item.AppID
- r.ServiceName = item.ServiceName
- r.ServiceNameCN = svcMap[item.ServiceName]
- r.SpanName = item.SpanName
- r.Favor = int8(item.Favor)
- *resp = append(*resp, *r)
- }
- return nil
- }
- func (s *Biz) StatsFromClickhouse(req *dto.BizStatsReq, resp *dto.BizStatsResp) error {
- biz := models.Biz{}
- if err := s.Orm.Model(&models.Biz{}).
- Where("id", req.BizId).
- First(&biz).Error; err != nil {
- return errors.Wrap(err, "获取业务失败")
- }
- req.CheckFilling(time.Minute * 5)
- if err := s.ChOrm.Model(&models.Trace{}).
- Select("ServiceName, count() Total, SUM(IF(StatusCode='STATUS_CODE_ERROR', 1, 0)) as ErrorNum, max(Duration)/1e6 as Max, avg(Duration)/1e6 as Avg").
- Where("AppAlias", biz.AppAlias).
- Where("ServiceName", biz.ServiceName).
- Where("SpanName", biz.SpanName).
- Where("ParentSpanId", "").
- Where("Timestamp>toDateTime(?) and Timestamp<toDateTime(?)", req.StartTime, req.EndTime).
- Group("ServiceName").Find(resp).Error; err != nil {
- return errors.Wrap(err, "获取基础统计信息失败")
- }
- resp.BizId = biz.ID
- resp.BizHash = biz.Hash
- if resp.Total > 0 {
- resp.ErrorRate = float64(resp.ErrorNum) / float64(resp.Total)
- }
- resp.Rpm = math.Round(float64(resp.Total)/float64((req.EndTime-req.StartTime)/60)*100) / 100
- resp.Max = math.Round(resp.Max*100) / 100
- resp.Avg = math.Round(resp.Avg*100) / 100
- timeField := "formatDateTime(toStartOfMinute(Timestamp), '%F %H:%i', 'PRC') as StartTime"
- if req.EndTime-req.StartTime >= 60*60 {
- timeField = "formatDateTime(toStartOfFiveMinutes(Timestamp), '%F %H:%i', 'PRC') as StartTime"
- }
- fields := []string{
- timeField,
- "quantile(0.5)(Duration)/1e6 as P50Duration",
- "quantile(0.90)(Duration)/1e6 as P90Duration",
- "quantile(0.99)(Duration)/1e6 as P99Duration",
- }
- quantiles := []struct {
- StartTime string
- P50Duration float64
- P90Duration float64
- P99Duration float64
- }{}
- if err := s.ChOrm.Model(&models.Trace{}).Select(fields).
- Where("AppAlias", biz.AppAlias).
- Where("ServiceName", biz.ServiceName).
- Where("SpanName", biz.SpanName).
- Where("ParentSpanId", "").
- Where("Timestamp>toDateTime(?) and Timestamp<toDateTime(?)", req.StartTime, req.EndTime).
- Group(timeField).Order(fmt.Sprintf("%s asc", timeField)).Find(&quantiles).Error; err != nil {
- return errors.Wrap(err, "获取分位数数据失败")
- }
- resp.Quantiles = dto.Quantiles{
- Time: make([]string, len(quantiles)),
- P50: make([]float64, len(quantiles)),
- P90: make([]float64, len(quantiles)),
- P99: make([]float64, len(quantiles)),
- }
- for i, quantile := range quantiles {
- resp.Quantiles.Time[i] = quantile.StartTime
- resp.Quantiles.P50[i] = math.Round(quantile.P50Duration*100) / 100
- resp.Quantiles.P90[i] = math.Round(quantile.P90Duration*100) / 100
- resp.Quantiles.P99[i] = math.Round(quantile.P99Duration*100) / 100
- }
- return nil
- }
- func (s *Biz) Stats(req *dto.BizStatsReq, resp *dto.BizStatsResp) error {
- biz := models.Biz{}
- if err := s.Orm.Model(&models.Biz{}).
- Where("id", req.BizId).
- First(&biz).Error; err != nil {
- return errors.Wrap(err, "获取业务失败")
- }
- req.BizHash = biz.Hash // 后面bizHash从参数中传
- wg := sync.WaitGroup{}
- req.CheckFilling(time.Minute * 5)
- mins := (req.EndTime - req.StartTime) / 60
- metric := "observe_biz_node_duration_milliseconds"
- labels := map[string]string{"biz_hash": req.BizHash}
- ts := time.Unix(req.EndTime, 0)
- tb := prometheus.NewHistogram(&wg, metric, labels, ts, mins)
- // 计算错误
- errorRate := float64(0)
- wg.Add(1)
- go tb.ErrorRate(&errorRate)
- // 计算分位数
- times := []string{}
- p50s, p90s, p99s := []float64{}, []float64{}, []float64{}
- wg.Add(3)
- go tb.QuantileMinutes(0.5, &[]string{}, &p50s)
- go tb.QuantileMinutes(0.9, &[]string{}, &p90s)
- go tb.QuantileMinutes(0.99, ×, &p99s)
- appAlias := ""
- if err := s.Orm.Model(&adModels.OtApps{}).Where("alias", biz.AppAlias).Pluck("alias", &appAlias).Error; err != nil {
- return errors.Wrap(err, "获取应用别名失败")
- }
- // 计算 max duration, min duration, avg duration, 从clickhouse读取,效率较低,但max min在prometheus不好算,尤其是min,应该是算不出来从prometheus
- // wg.Add(1)
- // st := struct {
- // MaxDuration float64
- // MinDuration float64
- // AvgDuration float64
- // }{}
- // go func() {
- // defer wg.Done()
- // s.ChOrm.Table(models.TableNameMetricsHistogram).
- // Select("avg(Sum/Count) AvgDuration, min(Min) MinDuration , max(Max) MaxDuration").
- // Where("MetricName", "observe.biz.duration").
- // Where(`Attributes['app.id']=?`, strconv.Itoa(int(biz.ID))).
- // Where("TimeUnix>=? AND TimeUnix<?", req.StartTime, req.EndTime).
- // Find(&st)
- // }()
- // 获取max duration
- wg.Add(1)
- maxDuration := float64(0)
- // go tb.Quantile(1, &maxDuration)
- gauge := prometheus.NewGauge(&wg, "observe_biz_node_duration_max_milliseconds", labels, ts, mins)
- go gauge.Max(&maxDuration)
- // 获取avg duration
- wg.Add(1)
- avgDuration := float64(0)
- go tb.Avg(&avgDuration)
- // 记录 total
- wg.Add(1)
- total := int64(0)
- go tb.Total(&total)
- wg.Wait()
- resp.BizId = int64(biz.ID)
- resp.BizHash = req.BizHash
- // resp.Total = st.Total
- resp.Total = total
- resp.Rpm = math.Round(float64(resp.Total)/float64((mins))*100) / 100
- if maxDuration > 0 {
- resp.Max = maxDuration
- }
- // resp.Min = st.MinDuration
- resp.Avg = avgDuration
- if math.IsNaN(resp.Avg) {
- resp.Avg = 0
- }
- resp.ErrorRate = errorRate // 错误率的计算公式可能有问题
- resp.ErrorNum = int64(errorRate * float64(resp.Total))
- resp.Quantiles.Time = times
- resp.Quantiles.P50 = p50s
- resp.Quantiles.P90 = p90s
- resp.Quantiles.P99 = p99s
- return nil
- }
- func (s *Biz) Detail(req *dto.BizDetailReq, resp *[]dto.BizDetailResp, count *int64) error {
- biz := models.Biz{}
- if err := s.Orm.Model(&models.Biz{}).Where("id", req.BizId).First(&biz).Error; err != nil {
- return errors.Wrap(err, "业务ID不存在")
- }
- appAlias := ""
- if err := s.Orm.Model(&adModels.OtApps{}).Where("id", biz.AppID).Pluck("alias", &appAlias).Error; err != nil {
- return errors.Wrap(err, "获取应用别名失败")
- }
- req.CheckFilling(time.Minute * 5)
- startTime, endTime := req.StartTime, req.EndTime
- if endTime-startTime > 1800 {
- startTime = endTime - 1800
- }
- for *count == 0 {
- traceList := []struct {
- Timestamp time.Time
- TraceId string
- SpanId string
- Duration int64
- }{}
- db := s.ChOrm.Debug().Table(models.TableNameTrace).
- Select("Timestamp, TraceId, SpanId, Duration").
- Where("AppAlias", appAlias).
- Where("ServiceName", biz.ServiceName).
- Where("SpanName", biz.SpanName).
- Where("SpanKind", biz.SpanKind).
- Where("ParentSpanId=''").
- Where("Timestamp>=toDateTime(?) AND Timestamp<toDateTime(?)", startTime, endTime)
- if req.MinDuration > 0 {
- db.Where("Duration>?", req.MinDuration*float64(time.Millisecond))
- }
- if req.MaxDuration > 0 {
- db.Where("Duration<?", req.MaxDuration*float64(time.Millisecond))
- }
- if req.OnlyException {
- db.Where("StatusCode", "STATUS_CODE_ERROR")
- }
- db.Count(count)
- if *count == 0 {
- if startTime <= req.StartTime {
- break
- }
- diff := endTime - startTime
- endTime = startTime
- startTime = startTime - diff*2
- if startTime < req.StartTime {
- startTime = req.StartTime
- }
- continue
- }
- if err := db.Order(req.OrderBy([]string{"Duration", "Timestamp"}, "Duration", "DESC")).
- Scopes(cDto.Paginate(req.GetPageSize(), req.GetPageIndex())).
- Find(&traceList).Offset(-1).Limit(-1).Count(count).Error; err != nil {
- return errors.Wrap(err, "获取业务详情失败")
- }
- *resp = make([]dto.BizDetailResp, len(traceList))
- for i, trace := range traceList {
- (*resp)[i] = dto.BizDetailResp{
- Datetime: trace.Timestamp.Local().Format(time.DateTime),
- Timestamp: trace.Timestamp.Unix(),
- TraceId: trace.TraceId,
- SpanId: trace.SpanId,
- SpanName: biz.SpanName,
- Duration: int64(math.Round(float64(trace.Duration / 1e6))),
- }
- }
- }
- return nil
- }
- func (s *Biz) Graph(req *dto.BizGraphReq, resp *[]dto.BizGraphResp) error {
- biz := models.Biz{}
- if err := s.Orm.First(&biz, req.BizId).Error; err != nil {
- return errors.Wrap(err, "获取业务数据失败")
- }
- edges := []models.BizEdge{}
- if err := s.Orm.Model(&models.BizEdge{}).Where("biz_id", req.BizId).Where("app_id", biz.AppID).Find(&edges).Error; err != nil {
- return errors.Wrap(err, "获取边数据失败")
- }
- if len(edges) == 0 {
- return errors.New("边数据不存在")
- }
- nodeIdMap := map[int64]struct{}{}
- for _, edge := range edges {
- if edge.Source > 0 {
- nodeIdMap[edge.Source] = struct{}{}
- }
- nodeIdMap[edge.Target] = struct{}{}
- }
- nodeIds := []int64{}
- for nodeId := range nodeIdMap {
- nodeIds = append(nodeIds, nodeId)
- }
- nodes := []models.BizNode{}
- if err := s.Orm.Model(&models.BizNode{}).Find(&nodes, nodeIds).Error; err != nil {
- return errors.Wrap(err, "获取结点数据失败")
- }
- if len(nodes) == 0 {
- return errors.New("结点数据不存在")
- }
- appAlias := ""
- if err := s.Orm.Model(&models.App{}).Where("id", biz.AppID).Pluck("alias", &appAlias).Error; err != nil {
- return errors.New("app不存在")
- }
- svcNodes := []struct {
- ServiceName string
- Name string
- }{}
- if err := s.Orm.Model(&models.ServiceNode{}).Where("app_alias", appAlias).Find(&svcNodes).Error; err != nil {
- return errors.Wrap(err, "获取服务列表失败")
- }
- svcMap := map[string]string{}
- for _, node := range svcNodes {
- svcMap[node.ServiceName] = node.Name
- }
- req.CheckFilling(time.Hour)
- nodeMap := map[int64]models.BizNode{} // nodeId => node
- for _, node := range nodes {
- nodeMap[node.ID] = node
- }
- sourceNodeds := map[int64]struct{}{}
- targetNodes := map[int64]struct{}{}
- sourceTargets := map[int64][]int64{} // sourceId => targetIds
- heads := map[int64]struct{}{} // 头结点id,正常情况下仅有一个
- for _, edge := range edges {
- if edge.Source > 0 {
- sourceNodeds[edge.Source] = struct{}{}
- targetNodes[edge.Target] = struct{}{}
- sourceTargets[edge.Source] = append(sourceTargets[edge.Source], edge.Target)
- } else {
- heads[edge.Target] = struct{}{}
- }
- }
- // 正常情况下, heads已经存在一个, 这里仅当业务决策树不完整时才会向heads中继续添加
- for source := range sourceNodeds {
- if _, ok := targetNodes[source]; !ok {
- heads[source] = struct{}{}
- }
- }
- if len(heads) == 0 { // 未找到头结点, 为了显示数据,往heads中加入第一个node结点的id
- heads[nodes[0].ID] = struct{}{}
- }
- wg := &sync.WaitGroup{}
- var genTree func(root *dto.BizGraphResp)
- genTree = func(root *dto.BizGraphResp) {
- wg.Add(1)
- root.Stats = new(dto.BizGraphStatsResp) // 一定要写在外面, 在方法内new的话,有的地方获取不到
- go s.graphStats(wg, appAlias, req.StartTime, req.EndTime, root)
- // s.graphStats(wg, appAlias, req.StartTime, req.EndTime, root)
- children, ok := sourceTargets[root.ID]
- // fmt.Println("-------", ok, len(children), children, root.BizNode.ID)
- if !ok || len(children) == 0 {
- root.MaxDepth = 1
- return
- }
- maxDepth := int64(0)
- for _, child := range children {
- node := new(dto.BizGraphResp)
- // if node.BizNode, ok = nodeMap[child]; ok {
- if node2, ok := nodeMap[child]; ok {
- node.ID = node2.ID
- node.Name = node2.Name
- node.UniqueID = fmt.Sprintf("%s-%d", root.UniqueID, node2.ID)
- node.Type = node2.Type
- node.BizHash = node2.BizHash
- node.AppAlias = node2.AppAlias
- node.ServiceName = node2.ServiceName
- node.ServiceNameCN = svcMap[node.ServiceName]
- node.SpanName = node2.SpanName
- node.SpanKind = node2.SpanKind
- node.SpanType = node2.SpanType
- node.IsVirtual = node2.IsVirtual
- genTree(node)
- root.Children = append(root.Children, node)
- if node.MaxDepth > maxDepth {
- maxDepth = node.MaxDepth
- }
- }
- }
- root.MaxDepth = 1 + maxDepth
- }
- for head := range heads {
- root := new(dto.BizGraphResp)
- if node, ok := nodeMap[head]; ok {
- // root.BizNode = node
- root.ID = node.ID
- root.Name = node.Name
- root.UniqueID = fmt.Sprintf("%d", node.ID)
- root.Type = node.Type
- root.BizHash = node.BizHash
- root.AppAlias = node.AppAlias
- root.ServiceName = node.ServiceName
- root.ServiceNameCN = svcMap[node.ServiceName]
- root.SpanName = node.SpanName
- root.SpanKind = node.SpanKind
- root.SpanType = node.SpanType
- root.IsVirtual = node.IsVirtual
- genTree(root)
- *resp = append(*resp, *root)
- }
- }
- wg.Wait()
- return nil
- }
- func (s *Biz) GraphStats(req *dto.BizGraphStatsReq, resp *dto.BizGraphStatsResp) error {
- bizNode := models.BizNode{}
- if err := s.Orm.Model(&models.BizNode{}).Where("id", req.BizNodeId).First(&bizNode).Error; err != nil {
- return errors.Wrap(err, "获取业务结点信息失败")
- }
- appAlias := ""
- if err := s.Orm.Model(&adModels.OtApps{}).Where("id", bizNode.AppID).Pluck("alias", &appAlias).Error; err != nil {
- return errors.Wrap(err, "获取应用别名失败")
- }
- type stats struct {
- Duration float64
- SuccessRate float64
- }
- st1, st2 := stats{}, stats{}
- errch := make(chan error, 2)
- genStats := func(wg *sync.WaitGroup, ch chan error, startTime, endTime int64, st *stats) {
- defer wg.Done()
- fields := "quantile(0.5)(Duration) AS Duration, SUM(IF(StatusCode!='STATUS_CODE_ERROR', 1, 0))/COUNT() AS SuccessRate"
- if err := s.ChOrm.Table(models.TableNameTrace).
- Select(fields).
- Where("Timestamp>=toDateTime(?) AND Timestamp<toDateTime(?)", startTime, endTime).
- Where("AppAlias", appAlias).
- Where("ServiceName", bizNode.ServiceName).
- Where("SpanName", bizNode.SpanName).
- Where("SpanKind", bizNode.SpanKind).
- Group("ServiceName, SpanName, SpanKind").
- Find(&st).Error; err != nil {
- ch <- errors.Wrap(err, "获取统计数据失败")
- return
- }
- }
- req.CheckFilling(time.Hour)
- wg := &sync.WaitGroup{}
- wg.Add(2)
- diff := req.EndTime - req.StartTime
- go genStats(wg, errch, req.StartTime, req.EndTime, &st1)
- go genStats(wg, errch, req.StartTime-diff, req.EndTime-diff, &st2)
- wg.Wait()
- close(errch)
- if err := <-errch; err != nil {
- return err
- }
- resp.BizNodeId = req.BizNodeId
- resp.Duration = st1.Duration / 1e6
- resp.SuccessRate = st1.SuccessRate
- resp.SuccessRateUp = st1.SuccessRate >= st2.SuccessRate
- return nil
- }
- func (s *Biz) graphStats(wg *sync.WaitGroup, appAlias string, startTime, endTime int64, root *dto.BizGraphResp) error {
- defer wg.Done()
- type stats struct {
- Duration float64
- SuccessRate float64
- }
- st1, st2 := stats{}, stats{}
- errch := make(chan error, 2)
- genStats := func(wg *sync.WaitGroup, startTime, endTime int64, st *stats) {
- defer wg.Done()
- fields := "quantile(0.5)(Duration) / 1e6 AS Duration, SUM(IF(StatusCode!='STATUS_CODE_ERROR', 1, 0))/COUNT() AS SuccessRate"
- if err := s.ChOrm.Table(models.TableNameTrace).
- Select(fields).
- Where("Timestamp>=toDateTime(?) AND Timestamp<toDateTime(?)", startTime, endTime).
- Where("AppAlias", appAlias).
- Where("ServiceName", root.ServiceName).
- Where("SpanName", root.SpanName).
- Where("SpanKind", root.SpanKind).
- Group("ServiceName, SpanName, SpanKind").
- Find(&st).Error; err != nil {
- errch <- errors.Wrap(err, "获取统计数据失败")
- return
- }
- }
- wg2 := &sync.WaitGroup{}
- wg2.Add(2)
- diff := endTime - startTime
- go genStats(wg2, startTime, endTime, &st1)
- go genStats(wg2, startTime-diff, endTime-diff, &st2)
- wg2.Wait()
- root.Stats.BizNodeId = root.ID
- root.Stats.Duration = math.Round(st1.Duration*100) / 100
- root.Stats.SuccessRate = st1.SuccessRate
- root.Stats.SuccessRateUp = st1.SuccessRate >= st2.SuccessRate
- return nil
- }
- func (s *Biz) DigitsFromClickhouse(req *dto.BizDigitsReq, resp *dto.BizDigitsResp) error {
- alias := ""
- s.Orm.Model(&models.App{}).Where("id", req.AppId).Pluck("alias", &alias)
- req.AppAlias = alias // 后期 app alias从参数中传
- req.CheckFilling(time.Hour)
- mins := (req.EndTime - req.StartTime) / 60
- // 计算业务总数和服务总数
- bizTotal, serviceTotal := int64(0), int64(0)
- appAlias := ""
- err := s.Orm.Model(&models.App{}).Where("id", req.AppId).Pluck("alias", &appAlias).Error
- if err != nil {
- return errors.Wrap(err, "未获取到app alias")
- }
- if err := s.Orm.Model(&models.ServiceNode{}).Where("app_alias", appAlias).Count(&serviceTotal).Error; err != nil {
- return errors.Wrap(err, "获取服务总数失败")
- }
- if err := s.Orm.Model(&models.Biz{}).Where("app_alias", appAlias).Count(&bizTotal).Error; err != nil {
- return errors.Wrap(err, "获取业务总数失败")
- }
- resp.Biz = bizTotal
- resp.Service = serviceTotal
- //
- row := struct {
- Total int64
- ErrorNum int64
- DatabaseNum int64
- }{}
- if err := s.ChOrm.Model(&models.Trace{}).
- Select("count() Total, SUM(IF(StatusCode='STATUS_CODE_ERROR', 1, 0)) as ErrorNum, SUM(IF(SpanAttributes['db.system'] != '', 1, 0)) as DatabaseNum").
- Where("AppAlias", alias).
- Where("Timestamp>toDateTime(?) and Timestamp<toDateTime(?)", req.StartTime, req.EndTime).
- Find(&row).Error; err != nil {
- return errors.Wrap(err, "获取基础统计信息失败")
- }
- resp.DatabaseBiz = row.DatabaseNum
- resp.DatabaseBizPerMinute = math.Round(float64(row.DatabaseNum)/float64(mins)*100) / 100
- resp.Error = row.ErrorNum
- if row.Total > 0 {
- resp.ErrorRate = float64(row.ErrorNum) / float64(row.Total)
- }
- resp.ErrorPerMinute = math.Round(float64(row.ErrorNum)/float64(mins)*100) / 100
- resp.Trace = float64(row.Total)
- resp.TracePerMinute = math.Round(float64(row.Total)/float64(mins)*100) / 100
- return nil
- }
- func (s *Biz) Digits(req *dto.BizDigitsReq, resp *dto.BizDigitsResp) error {
- alias := ""
- s.Orm.Model(&models.App{}).Where("id", req.AppId).Pluck("alias", &alias)
- req.AppAlias = alias // 后期 app alias从参数中传
- req.CheckFilling(time.Hour)
- mins := (req.EndTime - req.StartTime) / 60
- metric := "observe_biz_node_duration_milliseconds"
- labels := map[string]string{"app_alias": req.AppAlias}
- ts := time.Unix(req.EndTime, 0)
- wg := sync.WaitGroup{}
- tb := prometheus.NewHistogram(&wg, metric, labels, ts, mins)
- // 计算错误率和trace总数
- total, errRate := int64(0), float64(0)
- wg.Add(2)
- go tb.Total(&total)
- go tb.ErrorRate(&errRate)
- // 计算业务总数和服务总数
- bizTotal, serviceTotal := int64(0), int64(0)
- appAlias := ""
- err := s.Orm.Model(&models.App{}).Where("id", req.AppId).Pluck("alias", &appAlias).Error
- if err != nil {
- return errors.Wrap(err, "未获取到app alias")
- }
- if err := s.Orm.Model(&models.ServiceNode{}).Where("app_alias", appAlias).Count(&serviceTotal).Error; err != nil {
- return errors.Wrap(err, "获取服务总数失败")
- }
- if err := s.Orm.Model(&models.Biz{}).Where("app_alias", appAlias).Count(&bizTotal).Error; err != nil {
- return errors.Wrap(err, "获取业务总数失败")
- }
- // 计算 db 相关的业务结点
- wg.Add(1)
- labels["database"] = "true"
- tb2 := prometheus.NewHistogram(&wg, metric, labels, ts, mins)
- dbTotal := int64(0)
- tb2.Total(&dbTotal)
- // todo log
- wg.Wait()
- resp.Biz = bizTotal
- resp.Service = serviceTotal
- resp.ErrorRate = math.Round(errRate*1e5) / 1e5
- resp.Error = int64(errRate * float64(total))
- resp.ErrorPerMinute = math.Round(errRate*float64(total)/float64(mins)*100) / 100
- resp.Trace = float64(total)
- resp.TracePerMinute = math.Round(float64(total)/float64(mins)*100) / 100
- resp.DatabaseBiz = dbTotal
- resp.DatabaseBizPerMinute = math.Round(float64(dbTotal)/float64(mins)*100) / 100
- return nil
- }
- // 标记
- func (s *Biz) Mark(req *dto.BizMarkReq) error {
- if req.Id <= 0 {
- return errors.New("参数id非法")
- }
- //err := s.Orm.Model(&models.Biz{}).Where("id", req.Id).Update("name", req.Name).Error
- err := s.Orm.Table("ot_biz").Where("id", req.Id).Update("name", req.Name).Error
- if err != nil {
- return errors.Wrap(err, "标记失败")
- }
- return nil
- }
- // 收藏
- func (s *Biz) Favor(req *dto.BizFavorReq) error {
- if req.Id <= 0 {
- return errors.New("参数id非法")
- }
- if req.Favor != 0 && req.Favor != 1 {
- return errors.New("参数favor非法")
- }
- err := s.Orm.Model(&models.Biz{}).Where("id", req.Id).Update("favor", req.Favor).Error
- if err != nil {
- msg := "收藏失败"
- if req.Favor == 0 {
- msg = "取消收藏失败"
- }
- return errors.Wrap(err, msg)
- }
- return nil
- }
- // 删除
- func (s *Biz) Delete(req *dto.BizDeleteReq) error {
- if req.Id <= 0 {
- return errors.New("参数id非法")
- }
- err := s.Orm.Delete(&models.Biz{}, req.Id).Error
- if err != nil {
- return errors.Wrap(err, "删除失败")
- }
- return nil
- }
- func (s *Biz) StatisticBasicGuy(req *dto.BizBasicGuy, resp *map[string]any) error {
- var countBiz int64
- var countService int64
- var countInterface int64
- if err := s.Orm.Table("ot_biz").Where("app_alias", req.AppAlias).Count(&countBiz).Error; err != nil {
- return errors.Wrap(err, "统计业务数错误")
- }
- if err := s.Orm.Table("ot_service_nodes").Where("app_alias", req.AppAlias).Count(&countService).Error; err != nil {
- return errors.Wrap(err, "统计服务总数错误")
- }
- if err := s.Orm.Table("ot_url_mapping").Where("app_alias", req.AppAlias).Where("type", 2).Count(&countInterface).Error; err != nil {
- return errors.Wrap(err, "统计对外接口总数错误")
- }
- (*resp)["biz"] = countBiz
- (*resp)["svrs"] = countService
- (*resp)["interface"] = countInterface
- return nil
- }
// bizItem is the row shape that GenGraph scans grouped trace rows into. A
// business candidate is identified by its app alias, service name and span
// name. SpanKind is declared here, but the query in GenGraph does not select
// it.
type bizItem struct {
	AppAlias    string
	ServiceName string
	SpanName    string
	SpanKind    string
}
- func (s *Biz) GenGraph() error {
- // total := int64(0)
- groups := "AppAlias, ServiceName, SpanName"
- fields := fmt.Sprintf("%s, COUNT() Total", groups)
- // if err := s.ChOrm.Table(models.TableNameTrace).Select(fields).
- // Where("Timestamp>=now()-interval 2 hour AND Timestamp<=now()-interval 1 hour"). // 保证trace完整
- // Group(groups).Having("Total>1").Count(&total).Error; err != nil {
- // return errors.Wrap(err, "获取业务总数失败")
- // }
- pageIndex, pageSize := 1, 100
- // pageTotal := int(math.Ceil(float64(total) / float64(pageSize)))
- for i := pageIndex; ; i++ {
- list := []bizItem{}
- if err := s.ChOrm.Table(models.TableNameTrace).
- Select(fields).
- Scopes(cDto.Paginate(pageSize, i)).
- Where("Timestamp>now()-interval 1 hour").
- Where("ParentSpanId=''").
- Group(groups).Having("Total>1").
- Find(&list).Error; err != nil {
- log.Errorf("获取业务列表失败: %s", err.Error())
- continue
- }
- if len(list) == 0 {
- break
- }
- newBizs := []models.Biz{}
- for _, item := range list {
- appId, err := query.NewApp().Alias2ID(item.AppAlias)
- if err != nil {
- log.Errorf("获取app id失败: %s", err.Error())
- continue
- }
- hash := cUtils.SimpleHash(item.AppAlias, item.ServiceName, item.SpanName)
- if bizId, err := query.NewBiz().BizHash2ID(hash); err != nil {
- log.Errorf("biz id获取失败: %s", err.Error())
- } else if bizId == 0 {
- newBizs = append(newBizs, models.Biz{
- Name: item.SpanName,
- Hash: hash,
- AppID: appId,
- AppAlias: item.AppAlias,
- ServiceName: item.ServiceName,
- SpanName: item.SpanName,
- IsAutoCreated: 1,
- })
- }
- // if _, err := query.NewBiz().CheckAddBiz(appId, item.ServiceName, item.SpanName); err != nil {
- // log.Errorf("check add biz失败: %s", err.Error())
- // continue
- // }
- // go s.genGraphNode(bizId, appId, item.AppAlias, item.ServiceName, item.SpanName)
- }
- if len(newBizs) == 0 {
- continue
- }
- if err := s.Orm.Model(&models.Biz{}).Create(&newBizs).Error; err != nil {
- log.Errorf("biz create失败: %s", err.Error())
- }
- }
- return nil
- }
- // // 时间 限制
- // func (s *Biz) genGraphNode(bizId, appId int64, appAlias, serviceName, spanName string) {
- // traceIds := []string{}
- // if err := s.ChOrm.Table(models.TableNameTrace).Select("DISTINCT TraceId").
- // Where("AppAlias=? AND ServiceName=? AND SpanName=? AND TraceId=''", appAlias, serviceName, spanName).
- // Limit(10).
- // Pluck("TraceId", &traceIds).Error; err != nil {
- // return
- // }
- // fields := "SpanId, ParentSpanId, AppAlias, ServiceName, SpanName, SpanKind"
- // for _, traceId := range traceIds {
- // list := []bizItem{}
- // if err := s.ChOrm.Table(models.TableNameTrace).
- // Select(fields).Where("TraceId", traceId).
- // Find(&list).Error; err != nil {
- // log.Errorf("获取业务结点失败: %s", err)
- // continue
- // }
- // for _, item := range list {
- // bizNode := models.BizNode{
- // Name: item.SpanName,
- // Type: 0, // todo
- // ExternalId: 0, // todo
- // BizId: bizId,
- // AppId: appId,
- // // appAlias: appAlias,
- // ServiceName: serviceName,
- // SpanName: spanName,
- // SpanKind: item.SpanKind,
- // IsVirtual: 0,
- // }
- // query.NewBizNode().CheckAddBizNode(&bizNode)
- // }
- // }
- // }
- func (s *Biz) nodeInfo(id int64, hash string, node *models.BizNode) error {
- if id == 0 && hash == "" {
- return errors.New("无效参数")
- }
- db := s.Orm.Model(&models.BizNode{})
- if hash != "" {
- db.Where("hash", hash)
- } else {
- db.Where("id", id)
- }
- if err := db.Find(node).Error; err != nil {
- return errors.Wrap(err, "")
- }
- return nil
- }
- func (s *Biz) NodeSpans(req *dto.BizNodeSpansReq, resp *[]dto.ServiceSpansResp, count *int64) error {
- node := models.BizNode{}
- if err := s.nodeInfo(req.ID, req.Hash, &node); err != nil {
- return err
- }
- req.CheckFilling(time.Hour)
- req2 := dto.ServiceSpansReq{
- AppAlias: node.AppAlias,
- ServiceName: []string{node.ServiceName},
- SpanName: node.SpanName,
- SpanKind: node.SpanKind,
- TimeRange: req.TimeRange,
- Pagination: req.Pagination,
- SortInfo: req.SortInfo,
- }
- svc := Service{s.OtService}
- return svc.Spans(&req2, resp, count)
- }
- func (s *Biz) NodeStatsFromClickhouse(req *dto.BizNodeStatsReq, resp *dto.BizNodeStatsResp) error {
- node := models.BizNode{}
- if err := s.nodeInfo(req.ID, req.Hash, &node); err != nil {
- return err
- }
- resp.ID = node.ID
- resp.Hash = node.Hash
- req.CheckFilling(time.Hour)
- if err := s.ChOrm.Model(&models.Trace{}).
- Select(`count() Total, SUM(IF(StatusCode='STATUS_CODE_ERROR', 1, 0))/count() as ErrorRate,
- quantile(0.5)(Duration)/1e6 as P50, quantile(0.9)(Duration)/1e6 as P90, quantile(0.99)(Duration)/1e6 as P99`).
- Where("AppAlias", node.AppAlias).
- Where("ServiceName", node.ServiceName).
- Where("SpanKind", node.SpanKind).
- Where("SpanName", node.SpanName).
- Where("Timestamp>toDateTime(?) and Timestamp<toDateTime(?)", req.StartTime, req.EndTime).
- Find(resp).Error; err != nil {
- return errors.Wrap(err, "获取基础统计信息失败")
- }
- resp.ID = int64(node.ID)
- resp.ServiceName = node.ServiceName
- if node.ExternalID > 0 {
- urlmapping := models.UrlMapping{}
- if err := s.Orm.Model(&models.UrlMapping{}).Where("id", node.ExternalID).Find(&urlmapping).Error; err != nil {
- return errors.Wrap(err, "获取接口解析信息失败")
- }
- resp.Name = urlmapping.Name
- resp.Value = urlmapping.Url
- resp.HttpMethod = urlmapping.Method
- }
- timeField := "formatDateTime(toStartOfMinute(Timestamp), '%F %H:%i', 'PRC') as StartTime"
- if req.EndTime-req.StartTime >= 60*60 {
- timeField = "formatDateTime(toStartOfFiveMinutes(Timestamp), '%F %H:%i', 'PRC') as StartTime"
- }
- fields := []string{
- timeField,
- "quantile(0.5)(Duration)/1e6 as P50Duration",
- "quantile(0.90)(Duration)/1e6 as P90Duration",
- "quantile(0.99)(Duration)/1e6 as P99Duration",
- }
- quantiles := []struct {
- StartTime string
- P50Duration float64
- P90Duration float64
- P99Duration float64
- }{}
- if err := s.ChOrm.Model(&models.Trace{}).Select(fields).
- Where("AppAlias", node.AppAlias).
- Where("ServiceName", node.ServiceName).
- Where("SpanKind", node.SpanKind).
- Where("SpanName", node.SpanName).
- Where("Timestamp>toDateTime(?) and Timestamp<toDateTime(?)", req.StartTime, req.EndTime).
- Group(timeField).Order(fmt.Sprintf("%s asc", timeField)).Find(&quantiles).Error; err != nil {
- return errors.Wrap(err, "获取分位数数据失败")
- }
- resp.DurationStats = dto.DurationStats{
- Time: make([]string, len(quantiles)),
- P50: make([]float64, len(quantiles)),
- P90: make([]float64, len(quantiles)),
- P99: make([]float64, len(quantiles)),
- }
- for i, quantile := range quantiles {
- resp.DurationStats.Time[i] = quantile.StartTime
- resp.DurationStats.P50[i] = math.Round(quantile.P50Duration*100) / 100
- resp.DurationStats.P90[i] = math.Round(quantile.P90Duration*100) / 100
- resp.DurationStats.P99[i] = math.Round(quantile.P99Duration*100) / 100
- }
- return nil
- }
- func (s *Biz) NodeStats(req *dto.BizNodeStatsReq, resp *dto.BizNodeStatsResp) error {
- node := models.BizNode{}
- if err := s.nodeInfo(req.ID, req.Hash, &node); err != nil {
- return err
- }
- resp.ID = node.ID
- resp.Hash = node.Hash
- req.CheckFilling(time.Hour)
- wg := sync.WaitGroup{}
- mins := (req.EndTime - req.StartTime) / 60
- metric := "observe_biz_node_duration_milliseconds"
- labels := map[string]string{"biz_node_hash": node.Hash}
- ts := time.Unix(req.EndTime, 0)
- hist := prometheus.NewHistogram(&wg, metric, labels, ts, mins)
- wg.Add(5)
- go hist.Quantile(0.5, &resp.P50)
- go hist.Quantile(0.9, &resp.P90)
- go hist.Quantile(0.99, &resp.P99)
- go hist.ErrorRate(&resp.ErrorRate)
- go hist.Total(&resp.Total)
- times := []string{}
- p50s, p90s, p99s := []float64{}, []float64{}, []float64{}
- wg.Add(3)
- go hist.QuantileMinutes(0.5, &[]string{}, &p50s)
- go hist.QuantileMinutes(0.9, &[]string{}, &p90s)
- go hist.QuantileMinutes(0.99, ×, &p99s)
- wg.Wait()
- resp.ServiceName = node.ServiceName
- resp.DurationStats = dto.DurationStats{
- Time: times,
- P50: p50s,
- P90: p90s,
- P99: p99s,
- }
- if node.ExternalID > 0 {
- urlmapping := models.UrlMapping{}
- if err := s.Orm.Model(&models.UrlMapping{}).Where("id", node.ExternalID).Find(&urlmapping).Error; err != nil {
- return errors.Wrap(err, "获取接口解析信息失败")
- }
- resp.Name = urlmapping.Name
- resp.Value = urlmapping.Url
- resp.HttpMethod = urlmapping.Method
- }
- return nil
- }
- func (s *Biz) NodeUpdate(req *dto.BizNodeUpdateReq) error {
- db := s.Orm.Model(&models.BizNode{})
- if req.ID > 0 {
- db.Where("id", req.ID)
- } else if req.Hash != "" {
- db.Where("hash", req.Hash)
- } else {
- return errors.New("参数非法")
- }
- if err := db.Update("name", req.Name).Error; err != nil {
- return errors.Wrap(err, "更新失败")
- }
- return nil
- }
- func (s *Biz) SlowTop(req *dto.BizSlowTopReq, resp *[]dto.BizSlowTopResp) error {
- req.CheckFilling(time.Hour)
- if req.EndTime-req.StartTime > 3600 {
- req.StartTime = req.EndTime - 3600
- }
- respTemp := []dto.BizSlowTopResp{}
- if err := s.ChOrm.Table(models.TableNameTrace).
- Select("AppAlias, ServiceName, SpanName, SpanKind, avg(Duration)/1e6 Duration, SUM(if(StatusCode='STATUS_CODE_ERROR', 1, 0))/count() ErrorRate").
- Where("Timestamp>=toDateTime(?) AND Timestamp<toDateTime(?)", req.StartTime, req.EndTime).
- Where("AppAlias", req.AppAlias).
- Where("ParentSpanId=''").
- Group("AppAlias, ServiceName, SpanName, SpanKind").
- Order("Duration desc").
- Limit(int(req.Limit)).
- Find(&respTemp).Error; err != nil {
- return errors.Wrap(err, "查询基础业务数据失败")
- }
- svcMap, err := query.NewService().ServiceMap(req.AppAlias)
- if err != nil {
- return err
- }
- mins := (req.EndTime - req.StartTime) / 60
- metric := "observe_biz_node_duration_milliseconds"
- ts := time.Unix(req.EndTime, 0)
- wg := sync.WaitGroup{}
- for i, trace := range respTemp {
- wg.Add(1)
- go func(t dto.BizSlowTopResp, i int) {
- defer wg.Done()
- biz := models.Biz{}
- s.Orm.Model(&models.Biz{}).
- Where("app_alias", t.AppAlias).
- Where("service_name", t.ServiceName).
- Where("span_name", t.SpanName).
- Find(&biz)
- respTemp[i].BizId = biz.ID
- respTemp[i].BizName = biz.Name
- respTemp[i].BizHash = biz.Hash
- respTemp[i].Duration = math.Round(respTemp[i].Duration*100) / 100
- // 使用sql查询出来的ErrorRate, 但sql查询出来的应该不准确, 只是当前节点的错误率,而不包括下游节点错误率
- if !config.ExtConfig.ClickhouseMetrics {
- labels := map[string]string{"biz_hash": biz.Hash}
- wg2 := sync.WaitGroup{}
- tb := prometheus.NewHistogram(&wg2, metric, labels, ts, mins)
- wg2.Add(1)
- go tb.ErrorRate(&respTemp[i].ErrorRate)
- wg2.Wait()
- }
- }(trace, i)
- }
- wg.Wait()
- for _, trace := range respTemp {
- if trace.BizId > 0 {
- trace.ServiceNameCN = svcMap[trace.ServiceName]
- (*resp) = append((*resp), trace)
- }
- }
- return nil
- }
|