|
- package service
- import (
- "context"
- "fmt"
- "go-admin/app/observe/models"
- "go-admin/app/observe/models/query"
- "go-admin/app/observe/service/dto"
- cDto "go-admin/common/dto"
- "go-admin/common/opentelemetry"
- "go-admin/common/prometheus"
- "go-admin/utils"
- "math"
- "strconv"
- "strings"
- "sync"
- "time"
- "github.com/go-admin-team/go-admin-core/sdk/config"
- "github.com/pkg/errors"
- "gorm.io/gorm"
- "gorm.io/gorm/clause"
- )
const (
	// ChWithSql is a reusable ClickHouse SELECT-list fragment that derives
	// protocol information from span attributes: boolean flags telling whether
	// the span is an HTTP, RPC or DB call, plus the RPC, DB and messaging
	// system names.
	ChWithSql = `SpanAttributes['http.url'] != '' AS isHttp,
SpanAttributes['rpc.system'] != '' AS isRpc,
SpanAttributes['rpc.system'] AS rpcSystem,
SpanAttributes['db.system'] != '' AS isDb,
SpanAttributes['db.system'] AS dbSystem,
SpanAttributes['messaging.system'] AS messagingSystem`
	// FIFTYMINUTE is the ClickHouse function that rounds a timestamp down to a
	// 15-minute boundary. NOTE(review): despite the identifier the bucket is
	// FIFTEEN minutes, not fifty; the name is kept for existing references.
	FIFTYMINUTE = "toStartOfFifteenMinutes"
	// ONEMINUTE is the ClickHouse function that rounds a timestamp down to the
	// start of its minute.
	ONEMINUTE = "toStartOfMinute"
)
// Service implements the observe module's service-topology, span-chart and
// span-listing queries. It has no fields of its own: the handles used by its
// methods (Orm, ChOrm, OlapConn and Log) are promoted from the embedded
// utils.OtService.
type Service struct {
	utils.OtService
}
- func (s *Service) GetEdges(ctx context.Context, params *dto.ServiceGetEdgesReq, result *map[string]models.GraphEdge) error {
- // 边暂时不计算相关统计值,因为现在的前端用不到,隐藏以提高性能
- // sql := `WITH toDateTime(?) AS StartTime, toDateTime(?) AS EndTime, ? AS seconds, ? AS appAlias
- // SELECT
- // ot2.ServiceName AS SourceService,
- // ot1.ServiceName AS TargetService,
- // ot2.RequestType AS RequestType,
- // COUNT()/seconds AS Qps,
- // SUM(IF(StatusCode == 'STATUS_CODE_ERROR', 1, 0)) AS ErrorNum,
- // SUM(IF(StatusCode == 'STATUS_CODE_ERROR', 1, 0))/COUNT() AS ErrorRate,
- // AVG(Duration)/1e6 AS DurationAverage,
- // quantile(0.5)(Duration)/1e6 AS DurationMedian,
- // quantile(0.9)(Duration)/1e6 AS DurationP90,
- // quantile(0.99)(Duration)/1e6 AS DurationP99
- // FROM
- // (
- // SELECT
- // TraceId,
- // SpanId,
- // ParentSpanId,
- // ServiceName,
- // Duration,
- // StatusCode,
- // AppAlias
- // FROM otel.otel_traces
- // WHERE AppAlias=appAlias AND Timestamp > StartTime AND Timestamp < EndTime
- // ) AS ot1
- // INNER JOIN
- // (
- // SELECT
- // SpanId,
- // ServiceName,
- // IF(SpanAttributes['http.url'] != '', 'http', IF(SpanAttributes['rpc.system'] != '', SpanAttributes['rpc.system'] , IF(SpanAttributes['db.system'] != '', SpanAttributes['db.system'], SpanAttributes['messaging.system']))) as RequestType
- // FROM otel.otel_traces
- // WHERE AppAlias=appAlias and Timestamp > StartTime AND Timestamp < EndTime
- // ) AS ot2 ON ot2.SpanId = ot1.ParentSpanId
- // WHERE SourceService != TargetService
- // GROUP BY SourceService, TargetService, RequestType`
- // seconds := params.EndTime - params.StartTime
- // rows, err := s.OlapConn.Query(ctx, sql, params.StartTime, params.EndTime, seconds, params.AppAlias) // param.AppAlias
- // if err != nil {
- // return err
- // }
- // edgeMap := map[string]models.GraphEdge{}
- // for rows.Next() {
- // row := new(dto.ServiceEdgeRaw)
- // if err := rows.ScanStruct(row); err != nil {
- // s.Log.Errorf("扫描行到结构体失败: %s", err)
- // }
- // edgeMap[s.getEdgeId(row.SourceService, row.TargetService)] = models.GraphEdge{
- // ID: row.SourceService + "-" + row.TargetService,
- // Source: row.SourceService,
- // Target: row.TargetService,
- // MainStat: fmt.Sprintf("total: %d, %s: %.2fr/s, err: %d / %.2f%%", row.TotalNum, row.RequestType, row.Qps, row.ErrorNum, row.ErrorRate), // 展示示例 grpc:12.39req/s
- // SecondaryStat: fmt.Sprintf("avg: %.2fms, med: %.2fms, p90: %.2fms, p99: %.2fms",
- // row.DurationAverage, row.DurationMedian, row.DurationP90, row.DurationP99),
- // }
- // }
- _, defaultEdgeMap := s.getDefaultGraph(params.AppAlias)
- // for k, edge := range edgeMap {
- // defaultEdgeMap[k] = edge
- // }
- *result = defaultEdgeMap
- return nil
- }
- func (s *Service) GetNoSoulEdges(ctx context.Context, params *dto.ServiceGetEdgesReq, result *[]models.GraphEdge) error {
- // nodes := []models.ServiceNode{}
- edgeMap := map[string]models.GraphEdge{}
- _, defaultEdgeMap := s.getDefaultGraphNoSoul(params.AppAlias)
- for k, edge := range edgeMap {
- defaultEdgeMap[k] = edge
- }
- for _, edge := range defaultEdgeMap {
- *result = append(*result, edge)
- }
- return nil
- }
- func (s *Service) GetNodes(ctx context.Context, params *dto.ServiceGetEdgesReq, result *map[string]models.GraphNodeScope) error {
- sql := `SELECT
- ServiceName,
- countIf(Duration <= 1.5 * 1000 * 1000 *1000 ) AS satisfied,
- countIf(Duration > 1.5 * 1000 * 1000 * 1000 AND Duration <= 2.5 * 1000 * 1000 * 1000 ) AS tolerable,
- countIf(Duration > 2.5 * 1000 * 1000 * 1000 ) AS frustrated,
- ROUND((satisfied + tolerable / 2.0 ) / (satisfied + tolerable + frustrated), 4) AS Apdex,
- COUNT(DISTINCT TraceId) AS TraceNum,
- COUNT() AS SpanNum,
- SUM(IF(SpanKind IN ('SPAN_KIND_CLIENT', 'SPAN_KIND_PRODUCER'), 1, 0)) AS SentNum,
- SUM(IF(SpanKind IN ('SPAN_KIND_SERVER', 'SPAN_KIND_CONSUMER'), 1, 0)) AS ReceivedNum,
- SUM(IF(SpanKind='SPAN_KIND_INTERNAL', 1, 0)) AS InternalNum,
- SUM(IF(StatusCode='STATUS_CODE_ERROR', 1, 0)) AS ErrorNum,
- any(ResourceAttributes['telemetry.sdk.language']) AS SdkLang
- FROM
- otel_traces ot
- WHERE
- Timestamp > toDateTime(?)
- AND Timestamp < toDateTime(?)
- AND AppAlias = ?
- GROUP BY ServiceName ORDER BY ServiceName`
- rows, err := s.OlapConn.Query(ctx, sql, params.StartTime, params.EndTime, params.AppAlias)
- if err != nil {
- s.Log.Error("执行sql错误: %s", err)
- return err
- }
- serviceNodes := []models.ServiceNode{}
- s.Orm.Model(&models.ServiceNode{}).Where("app_alias", params.AppAlias).Find(&serviceNodes)
- serviceNameMap := map[string]string{}
- for _, node := range serviceNodes {
- serviceNameMap[node.ServiceName] = node.Name
- }
- nodeMap := map[string]models.GraphNodeScope{}
- s.Log.Debug("-- rows扫描行到graph_node_scope --")
- for rows.Next() {
- row := new(dto.SerivceGraphNodeRaw)
- if err := rows.ScanStruct(row); err != nil {
- s.Log.Errorf("扫描行到结构体失败: %s", err)
- }
- title, ok := serviceNameMap[row.ServiceName]
- if !ok {
- title = row.ServiceName
- }
- nodeMap[row.ServiceName] = models.GraphNodeScope{
- ID: row.ServiceName,
- Title: title,
- SubTitle: "",
- Send: int64(row.SentNum),
- Receive: int64(row.ReceivedNum),
- SecondaryStat: "",
- ArcSuccess: float64(row.SpanNum-row.ErrorNum) / float64(row.SpanNum),
- ArcFaild: float64(row.ErrorNum) / float64(row.SpanNum),
- Icon: row.SdkLang,
- Apdex: row.Apdex,
- // MainStat: fmt.Sprintf("sent: %d, received: %d", row.SentNum, row.ReceivedNum),
- }
- }
- defaultNodeMap, _ := s.getDefaultGraph(params.AppAlias)
- for k, node := range nodeMap {
- defaultNodeMap[k] = node
- }
- *result = defaultNodeMap
- // for _, node := range defaultNodeMap {
- // *result = append(*result, node)
- // }
- return nil
- }
- func (s *Service) GetNoSoulNodes(ctx context.Context, params *dto.ServiceGetEdgesReq, result *[]models.GraphNodeScope) error {
- defaultNodeMap, _ := s.getDefaultGraphNoSoul(params.AppAlias)
- for _, node := range defaultNodeMap {
- *result = append(*result, node)
- }
- return nil
- }
- func (s *Service) GetGraph(ctx context.Context, params *dto.ServiceGetEdgesReq, result *models.Graph) (err error) {
- params.CheckFilling(5 * time.Minute)
- nodes, edges := make(map[string]models.GraphNodeScope, 0), make(map[string]models.GraphEdge, 0)
- if err = s.GetNodes(ctx, params, &nodes); err != nil {
- return
- }
- if err = s.GetEdges(ctx, params, &edges); err != nil {
- return
- }
- // nodes, edges, _ = s.extendNodesAndEdges(params, nodes, edges)
- for _, node := range nodes {
- result.Nodes = append(result.Nodes, node)
- }
- for _, edge := range edges {
- result.Edges = append(result.Edges, edge)
- }
- // result.Edges = edges
- // result.Nodes = nodes
- return
- }
- func (s *Service) GetNoSoulGraph(ctx context.Context, params *dto.ServiceGetEdgesReq, result *models.Graph) (err error) {
- params.CheckFilling(5 * time.Minute)
- nodes, edges := make([]models.GraphNodeScope, 0), make([]models.GraphEdge, 0)
- defaultNodeMap, defaultEdgeMap := s.getDefaultGraphNoSoul(params.AppAlias)
- // defaultNodeMap, defaultEdgeMap, _ = s.extendNodesAndEdges(params, defaultNodeMap, defaultEdgeMap)
- for _, node := range defaultNodeMap {
- nodes = append(nodes, node)
- }
- for _, edge := range defaultEdgeMap {
- edges = append(edges, edge)
- }
- // if err = s.GetNoSoulNodes(ctx, params, &nodes); err != nil {
- // return
- // }
- // if err = s.GetNoSoulEdges(ctx, params, &edges); err != nil {
- // return
- // }
- result.Edges = edges
- result.Nodes = nodes
- return
- }
- func (s *Service) getDefaultGraph(appAlias string) (nodes map[string]models.GraphNodeScope, edges map[string]models.GraphEdge) {
- data := []models.ServiceEdge{}
- edges = make(map[string]models.GraphEdge)
- nodes = make(map[string]models.GraphNodeScope)
- serviceNodes := []models.ServiceNode{}
- s.Orm.Model(&models.ServiceNode{}).Where("app_alias", appAlias).Find(&serviceNodes)
- serviceNameMap := map[string]string{}
- for _, node := range serviceNodes {
- serviceNameMap[node.ServiceName] = node.Name
- }
- if err := s.Orm.Model(&models.ServiceEdge{}).Where("source_app=? or target_app=?", appAlias, appAlias).Find(&data).Error; err == nil {
- for _, item := range data {
- sourceTitle, targetTitle := item.Source, item.Target
- if title, ok := serviceNameMap[sourceTitle]; ok {
- sourceTitle = title
- }
- if title, ok := serviceNameMap[targetTitle]; ok {
- targetTitle = title
- }
- source, target := item.Source, item.Target
- if item.SourceApp == appAlias { // 如果源应用是当前应用
- if item.SourceType != "service" {
- continue
- }
- nodes[source] = models.GraphNodeScope{
- ID: source,
- Title: sourceTitle,
- Icon: item.SourceIcon,
- }
- if item.TargetType == "application" {
- target = "downstream"
- nodes[target] = models.GraphNodeScope{
- ID: target,
- Title: "下游应用",
- Icon: "cloud",
- }
- } else if (item.TargetType == "service" && item.TargetApp == appAlias) || item.TargetType == "database" || item.TargetType == "messaging" {
- nodes[target] = models.GraphNodeScope{
- ID: target,
- Title: targetTitle,
- Icon: item.TargetIcon,
- }
- } else {
- continue
- }
- }
- if item.TargetApp == appAlias {
- if item.TargetType != "service" {
- continue
- }
- nodes[target] = models.GraphNodeScope{
- ID: target,
- Title: target,
- Icon: item.TargetIcon,
- }
- if item.SourceType == "application" || item.SourceType == "client" {
- source = "upstream"
- nodes[source] = models.GraphNodeScope{
- ID: source,
- Title: "上游应用",
- Icon: "cloud",
- }
- } else if item.SourceType == "service" && item.SourceApp == appAlias {
- nodes[source] = models.GraphNodeScope{
- ID: source,
- Title: sourceTitle,
- Icon: item.SourceIcon,
- }
- } else {
- continue
- }
- }
- // if item.SourceType == "client" || item.SourceType == "application" {
- // source = "upstream"
- // nodes[source] = models.GraphNodeScope{
- // ID: source,
- // Title: "上游应用",
- // Icon: "cloud",
- // }
- // } else {
- // nodes[source] = models.GraphNodeScope{
- // ID: source,
- // Title: sourceTitle,
- // MainStat: "sent: 0, received: 0",
- // Icon: item.SourceIcon,
- // }
- // }
- // if item.TargetType == "application" {
- // target = "downstream"
- // nodes[target] = models.GraphNodeScope{
- // ID: target,
- // Title: "下游应用",
- // Icon: "cloud",
- // }
- // } else {
- // nodes[target] = models.GraphNodeScope{
- // ID: target,
- // Title: targetTitle,
- // MainStat: "sent: 0, received: 0",
- // Icon: item.TargetIcon,
- // }
- // }
- edges[s.getEdgeId(source, target)] = models.GraphEdge{
- ID: s.getEdgeId(source, target),
- Source: source,
- Target: target,
- MainStat: "total: 0, http: 0.00r/s, err: 0 / 0.00%",
- }
- }
- }
- return
- }
- func (s *Service) getDefaultGraphNoSoul(appAlias string) (nodes map[string]models.GraphNodeScope, edges map[string]models.GraphEdge) {
- data := []models.ServiceEdge{}
- serviceNodes := []models.ServiceNode{}
- s.Orm.Model(&models.ServiceNode{}).Where("app_alias", appAlias).Find(&serviceNodes)
- serviceNameMap := map[string]string{}
- for _, node := range serviceNodes {
- serviceNameMap[node.ServiceName] = node.Name
- }
- nodes = make(map[string]models.GraphNodeScope)
- if err := s.Orm.Table(models.TableNameServiceEdge).Where("source_app=? or target_app=?", appAlias, appAlias).Find(&data).Error; err == nil {
- edges = make(map[string]models.GraphEdge, len(data))
- for _, item := range data {
- source, target := item.Source, item.Target
- sourceTitle, targetTitle := item.Source, item.Target
- if title, ok := serviceNameMap[sourceTitle]; ok {
- sourceTitle = title
- }
- if title, ok := serviceNameMap[targetTitle]; ok {
- targetTitle = title
- }
- if item.SourceApp == appAlias { // 如果源应用是当前应用
- if item.SourceType != "service" {
- continue
- }
- nodes[source] = models.GraphNodeScope{
- ID: source,
- Title: sourceTitle,
- Icon: item.SourceIcon,
- }
- if item.TargetType == "application" {
- target = "downstream"
- nodes[target] = models.GraphNodeScope{
- ID: target,
- Title: "下游应用",
- Icon: "cloud",
- }
- } else if (item.TargetType == "service" && item.TargetApp == appAlias) || item.TargetType == "database" || item.TargetType == "messaging" {
- nodes[target] = models.GraphNodeScope{
- ID: target,
- Title: targetTitle,
- Icon: item.TargetIcon,
- }
- } else {
- continue
- }
- }
- if item.TargetApp == appAlias {
- if item.TargetType != "service" {
- continue
- }
- nodes[target] = models.GraphNodeScope{
- ID: target,
- Title: targetTitle,
- Icon: item.TargetIcon,
- }
- if item.SourceType == "application" || item.SourceType == "client" {
- source = "upstream"
- nodes[source] = models.GraphNodeScope{
- ID: source,
- Title: "上游应用",
- Icon: "cloud",
- }
- } else if item.SourceType == "service" && item.SourceApp == appAlias {
- nodes[source] = models.GraphNodeScope{
- ID: source,
- Title: sourceTitle,
- Icon: item.SourceIcon,
- }
- } else {
- continue
- }
- }
- // if item.SourceType == "client" || item.SourceType == "application" {
- // source = "upstream"
- // nodes[source] = models.GraphNodeScope{
- // ID: source,
- // Title: "上游应用",
- // Icon: "cloud",
- // }
- // } else {
- // nodes[source] = models.GraphNodeScope{
- // ID: source,
- // Title: source,
- // Icon: source,
- // }
- // }
- // if item.TargetType == "application" {
- // target = "downstream"
- // nodes[target] = models.GraphNodeScope{
- // ID: target,
- // Title: "下游应用",
- // Icon: "cloud",
- // }
- // } else {
- // nodes[target] = models.GraphNodeScope{
- // ID: target,
- // Title: target,
- // Icon: target,
- // }
- // }
- edges[s.getEdgeId(source, target)] = models.GraphEdge{
- ID: s.getEdgeId(source, target),
- Source: source,
- Target: target,
- // MainStat: "total: 0, http: 0.00r/s, err: 0 / 0.00%",
- }
- }
- }
- dn := []models.ServiceNode{}
- err := s.Orm.Table(models.TableNameServiceNode).Where("app_alias", appAlias).Find(&dn).Error
- if err == nil {
- for _, item := range dn {
- title := item.ServiceName
- if val, ok := serviceNameMap[item.ServiceName]; ok {
- title = val
- }
- nodes[item.ServiceName] = models.GraphNodeScope{
- ID: item.ServiceName,
- Title: title,
- Icon: item.Kind,
- }
- }
- }
- return
- }
- // 扩展节点和边
- func (s *Service) extendNodesAndEdges(params *dto.ServiceGetEdgesReq, nodes map[string]models.GraphNodeScope, edges map[string]models.GraphEdge) (map[string]models.GraphNodeScope, map[string]models.GraphEdge, error) {
- components := []models.SystemComponent{}
- err := s.Orm.Model(&models.SystemComponent{}).Where("app_alias", params.AppAlias).Find(&components).Error
- if err != nil {
- return nodes, edges, errors.Wrap(err, "获取系统组件失败")
- }
- dbMp, err := s.databaseNodeMap(params)
- if err != nil {
- return nodes, edges, err
- }
- msgMp, err := s.messagingNodeMap(params)
- if err != nil {
- return nodes, edges, err
- }
- cliMp, err := s.clientNodeMap(params)
- if err != nil {
- return nodes, edges, err
- }
- prevSvcToComponent := map[string]string{}
- nextSvcToComponent := map[string]string{}
- newEdges := map[string]models.GraphEdge{}
- for _, component := range components {
- if component.PrevServiceName != "" {
- prevSvcToComponent[component.PrevServiceName] = component.Component
- newEdges[s.getEdgeId(component.PrevServiceName, component.Component)] = models.GraphEdge{
- Source: component.PrevServiceName,
- Target: component.Component,
- }
- }
- if component.NextServiceName != "" {
- nextSvcToComponent[component.NextServiceName] = component.Component
- newEdges[s.getEdgeId(component.Component, component.NextServiceName)] = models.GraphEdge{
- Source: component.Component,
- Target: component.NextServiceName,
- }
- }
- node := models.GraphNodeScope{
- ID: component.Component,
- Title: component.Name,
- Icon: component.Component,
- }
- if dbStat, ok := dbMp[component.Component]; ok {
- node.Receive = dbStat.Total
- node.Send = 0
- node.ArcFaild = float64(dbStat.ErrorNum) / float64(dbStat.Total)
- node.ArcSuccess = 1 - node.ArcFaild
- }
- if msgStat, ok := msgMp[component.Component]; ok {
- node.Receive = msgStat.ReceivedNum
- node.Send = msgStat.SentNum // 严格来说, kafka通常只能被动接收,无发送量,但为不方便理解,将消费量作为发送量
- node.ArcFaild = float64(msgStat.ErrorNum) / float64(msgStat.Total)
- node.ArcSuccess = 1 - node.ArcFaild
- }
- if cliStat, ok := cliMp[component.Component]; ok {
- node.Receive = 0
- node.Send = cliStat.Total
- node.ArcFaild = float64(cliStat.ErrorNum) / float64(cliStat.Total)
- node.ArcSuccess = 1 - node.ArcFaild
- }
- nodes[component.Component] = node
- }
- dels := []string{}
- for sourceTarget, edge := range edges {
- source, target := s.getSourceTarget(sourceTarget)
- if component, ok := prevSvcToComponent[source]; ok {
- newEdges[s.getEdgeId(source, component)] = models.GraphEdge{
- Source: source,
- Target: component,
- MainStat: edge.MainStat,
- SecondaryStat: edge.SecondaryStat,
- }
- }
- if component, ok := nextSvcToComponent[target]; ok {
- newEdges[s.getEdgeId(component, target)] = models.GraphEdge{
- Source: component,
- Target: target,
- MainStat: edge.MainStat,
- SecondaryStat: edge.SecondaryStat,
- }
- }
- if prevSvcToComponent[source] != "" && nextSvcToComponent[target] != "" {
- // 如果组件既有上游节点又有下游节点, 则该组件为消息中间件, 删除原来两节点直连的边
- dels = append(dels, sourceTarget)
- }
- }
- for _, key := range dels {
- delete(edges, key)
- }
- for key, edge := range newEdges {
- edges[key] = edge
- }
- return nodes, edges, nil
- }
- func (s *Service) databaseNodeMap(params *dto.ServiceGetEdgesReq) (map[string]dto.ServiceComponentStats, error) {
- list := []dto.ServiceComponentStats{}
- if err := s.ChOrm.Model(&models.Trace{}).Select(`
- SpanAttributes['db.system'] AS Component,
- COUNT() AS Total,
- SUM(IF(StatusCode='STATUS_CODE_ERROR', 1, 0)) AS ErrorNum
- `).Where("AppAlias", params.AppAlias).
- Where("Timestamp>=toDateTime(?) and Timestamp<toDateTime(?)", params.StartTime, params.EndTime).
- Where("SpanKind", "SPAN_KIND_CLIENT").
- Where("SpanAttributes['db.system']!=''").Group("Component").Find(&list).Error; err != nil {
- return nil, errors.Wrap(err, "获取数据库统计信息失败")
- }
- mp := make(map[string]dto.ServiceComponentStats, len(list))
- for _, item := range list {
- mp[item.Component] = item
- }
- return mp, nil
- }
- func (s *Service) messagingNodeMap(params *dto.ServiceGetEdgesReq) (map[string]dto.ServiceComponentStats, error) {
- list := []dto.ServiceComponentStats{}
- if err := s.ChOrm.Model(&models.Trace{}).Select(`
- SpanAttributes['messaging.system'] AS Component,
- COUNT() AS Total,
- SUM(IF(SpanKind='SPAN_KIND_CONSUMER', 1, 0)) AS ReceivedNum,
- SUM(IF(SpanKind='SPAN_KIND_PRODUCER', 1, 0)) AS SentNum,
- SUM(IF(StatusCode='STATUS_CODE_ERROR', 1, 0)) AS ErrorNum
- `).Where("AppAlias", params.AppAlias).
- Where("Timestamp>=toDateTime(?) and Timestamp<toDateTime(?)", params.StartTime, params.EndTime).
- Where("SpanKind in ('SPAN_KIND_CONSUMER', 'SPAN_KIND_PRODUCER')").
- Where("SpanAttributes['messaging.system']!=''").Group("Component").Find(&list).Error; err != nil {
- return nil, errors.Wrap(err, "获取数据库统计信息失败")
- }
- mp := make(map[string]dto.ServiceComponentStats, len(list))
- for _, item := range list {
- mp[item.Component] = item
- }
- return mp, nil
- }
- func (s *Service) clientNodeMap(params *dto.ServiceGetEdgesReq) (map[string]dto.ServiceComponentStats, error) {
- list := []dto.ServiceComponentStats{}
- if err := s.ChOrm.Model(&models.Trace{}).Select(`
- splitByChar('/', if(SpanAttributes['user_agent.original']!='', SpanAttributes['user_agent.original'], SpanAttributes['http.user_agent']))[1] AS Component,
- COUNT() AS Total,
- SUM(IF(StatusCode='STATUS_CODE_ERROR', 1, 0)) AS ErrorNum
- `).Where("AppAlias", params.AppAlias).
- Where("Timestamp>=toDateTime(?) and Timestamp<toDateTime(?)", params.StartTime, params.EndTime).
- Where("SpanKind='SPAN_KIND_SERVER'").
- Where("ParentSpanId='' and Component!=''").
- Group("Component").Find(&list).Error; err != nil {
- return nil, errors.Wrap(err, "获取数据库统计信息失败")
- }
- mp := make(map[string]dto.ServiceComponentStats, len(list))
- for _, item := range list {
- mp[item.Component] = item
- }
- return mp, nil
- }
- func (s *Service) getEdgeId(source, target string) string {
- return fmt.Sprintf("%s-%s", source, target)
- }
- func (s *Service) getSourceTarget(edgeId string) (string, string) {
- st := strings.Split(edgeId, "-")
- if len(st) == 2 {
- return st[0], st[1]
- }
- return "", ""
- }
- func (s *Service) getPercentileDuration(ctx context.Context, params *dto.SpanScatterChartReq, p90 *float64) (err error) {
- sql := fmt.Sprintf(`SELECT quantile(%f)(Duration) AS p90 FROM otel_traces
- WHERE Timestamp>toDateTime(?) AND Timestamp<toDateTime(?) AND AppAlias=? AND ServiceName=?`, params.Percentile)
- row := s.OlapConn.QueryRow(ctx, sql, params.StartTime, params.EndTime, params.AppAlias, params.ServiceName)
- err = row.Scan(p90)
- return
- }
- func (s *Service) GetSpanScatterChart(ctx context.Context, params *dto.SpanScatterChartReq, result *models.ScatterChart) (err error) {
- var p90 float64
- if err = s.getPercentileDuration(ctx, params, &p90); err != nil {
- s.Log.Errorf("获取百分位失败:%s", err)
- return
- }
- if params.EndTime == 0 {
- params.EndTime = time.Now().Unix()
- params.StartTime = params.EndTime - 5*60
- }
- sql := `SELECT
- Timestamp,
- Duration/1e6 AS DurationMs,
- StatusCode == 'STATUS_CODE_ERROR' AS Error,
- IF(SpanAttributes['http.url'] != '', 'http', IF(SpanAttributes['rpc.system'] != '', SpanAttributes['rpc.system'] , IF(SpanAttributes['db.system'] != '', SpanAttributes['db.system'], SpanAttributes['messaging.system']))) as RequestType
- FROM otel_traces
- WHERE Timestamp>toDateTime(?) AND Timestamp<toDateTime(?) AND AppAlias=? AND ServiceName=? AND Duration>? AND SpanKind!='SPAN_KIND_INTERNAL'
- ORDER BY Timestamp ASC`
- rows, err := s.OlapConn.Query(ctx, sql, params.StartTime, params.EndTime, params.AppAlias, params.ServiceName, p90)
- if err != nil {
- s.Log.Errorf("执行sql失败:%s", err)
- return
- }
- var t time.Time
- var r string
- var d float64
- var e bool
- for rows.Next() {
- err = rows.Scan(&t, &d, &e, &r)
- if err != nil {
- s.Log.Errorf("扫描行到变量失败:%s", err)
- break
- }
- estr := "success"
- if e {
- estr = "failed"
- }
- if _, ok := (*result)[r]; !ok {
- (*result)[r] = map[string][]models.CoordinatePoint{
- "success": {},
- "failed": {},
- }
- }
- (*result)[r][estr] = append((*result)[r][estr], []any{t, d})
- }
- return
- }
- func (s *Service) GetServiceLiveness(ctx context.Context, params *dto.SpanScatterChartReq, result *[]models.CoordinatePoint) (err error) {
- if params.EndTime == 0 {
- params.EndTime = time.Now().Unix()
- params.StartTime = params.EndTime - 5*60
- }
- sql := fmt.Sprintf(`SELECT
- toStartOfMinute(toTimeZone(Timestamp,'Asia/Hong_Kong')) AS hr,
- COUNT(*),
- median(Duration/1e6),
- FROM otel.otel_traces
- Where ServiceName='%s'
- AND Timestamp > (NOW() - toIntervalHour(3))
- GROUP BY hr
- ORDER BY hr ASC`, params.ServiceName)
- rows, err := s.OlapConn.Query(ctx, sql)
- if err != nil {
- s.Log.Errorf("select liveness service error:%s", err)
- return
- }
- var t time.Time
- var d float64
- var c uint64
- for rows.Next() {
- err = rows.Scan(&t, &c, &d)
- if err != nil {
- s.Log.Errorf("Scan rows errors:%s", err)
- break
- }
- *result = append(*result, []any{t, c, d})
- }
- return
- }
- func (s *Service) CompareServiceLiveness(ctx context.Context, params *dto.SpanScatterChartReq, result *[]models.CoordinatePoint) (err error) {
- startTime := time.Now().Add(-2 * time.Hour * time.Duration(params.HourNum)).Truncate(time.Hour)
- endTime := time.Now().Truncate(time.Hour).Add(time.Hour)
- list := []struct {
- Start string
- Total int64
- }{}
- sql := `SELECT formatDateTime(StartTime, '%F %H:%i', 'PRC') as Start, countMerge(TraceNum) as Total FROM otel_traces_aggbysvc
- WHERE AppAlias=? AND ServiceName=? AND StartTime>=? AND StartTime<? GROUP BY StartTime ORDER BY Start`
- err = s.ChOrm.Raw(sql, params.AppAlias, params.ServiceName, startTime.Unix(), endTime.Unix()).Scan(&list).Error
- // 使用视图效率低,会扫描全表
- // err = s.ChOrm.Model(&models.TracesAggbysvcMerge{}).Debug().
- // Where("AppAlias=? and ServiceName=?", params.AppAlias, params.ServiceName).
- // Where("StartTime>=? and StartTime<?", startTime.Unix(), endTime.Unix()).
- // Select("formatDateTime(StartTime, '%F %H:%i', 'PRC') as Start, TraceNum as Total").
- // Scan(&list).Error
- if err != nil {
- return
- }
- *result = make([]models.CoordinatePoint, 0, len(list))
- for _, item := range list {
- *result = append(*result, []any{item.Start, item.Total})
- }
- return
- // livenessSQL := `SELECT
- // toString(%s(toTimeZone(Timestamp,'Asia/Hong_Kong'))) AS hr,
- // COUNT(*)
- // FROM otel.otel_traces
- // Where ServiceName='%s'
- // AND AppAlias = '%s'
- // AND Timestamp > (NOW() - toIntervalHour(%d))
- // GROUP BY hr
- // ORDER BY hr ASC`
- // if params.HourNum < 6 {
- // livenessSQL = fmt.Sprintf(livenessSQL, ONEMINUTE, params.ServiceName, params.AppAlias, params.HourNum*2)
- // } else {
- // livenessSQL = fmt.Sprintf(livenessSQL, FIFTYMINUTE, params.ServiceName, params.AppAlias, params.HourNum*2)
- // }
- // rows, err := s.OlapConn.Query(ctx, livenessSQL)
- // if err != nil {
- // s.Log.Errorf("select compare liveness service error:%s", err)
- // return
- // }
- // var t string
- // var d uint64
- // for rows.Next() {
- // err = rows.Scan(&t, &d)
- // if err != nil {
- // s.Log.Errorf("Scan rows errors:%s", err)
- // break
- // }
- // *result = append(*result, [2]any{t, d})
- // }
- // return
- }
- func (s *Service) CompareServiceErrors(ctx context.Context, params *dto.SpanScatterChartReq, result *[]models.CoordinatePoint) (err error) {
- db := s.ChOrm.Model(&models.TracesError{}).
- Where("AppAlias=? and ServiceName=? and Timestamp>now()-interval ? hour", params.AppAlias, params.ServiceName, params.HourNum*2)
- if params.HourNum == 1 {
- db.Select("formatDateTime(toStartOfMinute(Timestamp), '%F %H:%i', 'PRC') as StartTime, count() as Total")
- } else if params.HourNum <= 3 {
- db.Select("formatDateTime(toStartOfFiveMinutes(Timestamp), '%F %H:%i', 'PRC') as StartTime, count() as Total")
- } else if params.HourNum <= 6 {
- db.Select("formatDateTime(toStartOfTenMinutes(Timestamp), '%F %H:%i', 'PRC') as StartTime, count() as Total")
- } else {
- db.Select("formatDateTime(toStartOfFifteenMinutes(Timestamp), '%F %H:%i', 'PRC') as StartTime, count() as Total")
- }
- list := []struct {
- StartTime string
- Total int64
- }{}
- err = db.Group("StartTime").Order("StartTime ASC").Scan(&list).Error
- if err != nil {
- return
- }
- *result = make([]models.CoordinatePoint, 0, len(list))
- for _, item := range list {
- *result = append(*result, []any{item.StartTime, item.Total})
- }
- return
- // livenessSQL := `with (if(SpanAttributes['http.status_code']!='', SpanAttributes['http.status_code'], SpanAttributes['http.response.status_code'])) as HttpCode
- // SELECT
- // toString(%s(toTimeZone(Timestamp,'Asia/Hong_Kong'))) AS hr,
- // COUNT(*)
- // FROM otel.otel_traces
- // Where ServiceName='%s'
- // AND (HttpCode >= '400' OR StatusCode = 'STATUS_CODE_ERROR')
- // AND AppAlias = '%s'
- // AND Timestamp > (NOW() - toIntervalHour(%d))
- // GROUP BY hr
- // ORDER BY hr ASC`
- // if params.HourNum < 6 {
- // livenessSQL = fmt.Sprintf(livenessSQL, ONEMINUTE, params.ServiceName, params.AppAlias, params.HourNum*2)
- // } else {
- // livenessSQL = fmt.Sprintf(livenessSQL, FIFTYMINUTE, params.ServiceName, params.AppAlias, params.HourNum*2)
- // }
- // rows, err := s.OlapConn.Query(ctx, livenessSQL)
- // if err != nil {
- // s.Log.Errorf("select compare liveness service error:%s", err)
- // return
- // }
- // var t string
- // var d uint64
- // for rows.Next() {
- // err = rows.Scan(&t, &d)
- // if err != nil {
- // s.Log.Errorf("Scan rows errors:%s", err)
- // break
- // }
- // *result = append(*result, [2]any{t, d})
- // }
- // return
- }
- func (s *Service) Spans(req *dto.ServiceSpansReq, resp *[]dto.ServiceSpansResp, count *int64) error {
- req.CheckFilling(time.Minute * 5)
- list := []struct {
- Datetime time.Time `json:"datetime"`
- TraceId string `json:"trace_id"`
- SpanId string `json:"span_id"`
- ServiceName string `json:"service_name"`
- Method string `json:"method"`
- Code string `json:"code"`
- Duration float64 `json:"duration"`
- }{}
- db := s.ChOrm.Table(models.TableNameTrace).
- Select(`Timestamp as Datetime, TraceId,SpanId,ServiceName,IF(SpanAttributes['http.method'] != '',
- SpanAttributes['http.method'],
- IF(SpanAttributes['http.request.method'] != '', SpanAttributes['http.request.method'],
- IF(SpanAttributes['rpc.system'] != '',
- SpanAttributes['rpc.system'],
- IF(SpanAttributes['db.system'] != '',
- SpanAttributes['db.system'],
- SpanAttributes['messaging.system']
- )
- )
- )
- ) AS Method,
- IF(SpanAttributes['http.status_code']>='200',
- SpanAttributes['http.status_code'],
- IF(SpanAttributes['http.response.status_code'] >= '200',SpanAttributes['http.response.status_code'],
- IF(SpanAttributes[concat('rpc.', SpanAttributes['rpc.system'], '.status_code')] != '',
- SpanAttributes[concat('rpc.', SpanAttributes['rpc.system'], '.status_code')],
- IF(SpanAttributes[concat('rpc.', SpanAttributes['rpc.system'], '.error_code')] != '',
- SpanAttributes[concat('rpc.', SpanAttributes['rpc.system'], '.error_code')],
- IF(SpanAttributes['messaging.operation']!='',
- SpanAttributes['messaging.operation'],
- IF(SpanAttributes['db.operation'] != '',SpanAttributes['db.operation'], SpanName)
- )
- )
- )
- )
- ) AS Code,
- Duration `,
- ).
- Where("AppAlias", req.AppAlias).
- Where("Timestamp>=toDateTime(?) AND Timestamp<toDateTime(?)", req.StartTime, req.EndTime).
- Scopes(cDto.Paginate(req.GetPageSize(), req.GetPageIndex()))
- if len(req.ServiceName) == 1 {
- db.Where("ServiceName", req.ServiceName[0])
- } else if len(req.ServiceName) > 1 {
- db.Where("ServiceName IN ?", req.ServiceName)
- }
- if req.TraceId != "" {
- db.Where("TraceId", req.TraceId)
- }
- if req.MaxDuration > 0 {
- db.Where("Duration<?", req.MaxDuration*1e6)
- }
- if req.MinDuration > 0 {
- db.Where("Duration>=?", req.MinDuration*1e6)
- }
- if req.SpanKind != "" {
- db.Where("SpanKind", req.SpanKind)
- }
- if req.SpanAttributeKey != "" {
- db.Where(fmt.Sprintf("SpanAttributes['%s']", req.SpanAttributeKey), req.SpanAttributeValue)
- }
- if req.OnlyDatabase {
- db.Where("SpanAttributes['db.system'] != ?", "")
- }
- if req.SpanName != "" {
- db.Where("SpanName", req.SpanName)
- }
- if req.OnlyException {
- // db.Where(`StatusCode=? OR
- // SpanAttributes['http.status_code']>=? OR SpanAttributes['http.response.status_code']>=? OR
- // SpanAttributes[concat('rpc.', SpanAttributes['rpc.system'], '.status_code')]>? OR
- // SpanAttributes[concat('rpc.', SpanAttributes['rpc.system'], '.error_code')]!=?
- // `, "STATUS_CODE_ERROR", "400", "400", "0", "")
- // 简化查询, 加速仅异常条件慢问题,这么写也能查到,只是查到的http类错误都为span kind client, 没关系,因为都要点进去看的
- db.Where("StatusCode", "STATUS_CODE_ERROR")
- }
- if req.RequestMethod != "" {
- db.Where(`SpanAttributes['http.method'] = ? OR
- SpanAttributes['http.request.method'] = ? OR
- SpanAttributes['rpc.system'] = ? OR
- SpanAttributes['db.system'] = ? OR
- SpanAttributes['messaging.system'] = ?`,
- req.RequestMethod,
- req.RequestMethod,
- req.RequestMethod,
- req.RequestMethod,
- req.RequestMethod)
- }
- order := req.OrderBy([]string{"Duration", "Timestamp"}, "Duration", "DESC")
- if err := db.Order(order).Find(&list).Limit(-1).Offset(-1).Count(count).Error; err != nil {
- return errors.Wrap(err, "获取service相关span失败")
- }
- *resp = make([]dto.ServiceSpansResp, len(list))
- for i, item := range list {
- // (*resp)[i].Duration /= 1e6
- (*resp)[i].Datetime = item.Datetime.Local().Format(time.DateTime)
- (*resp)[i].Timestamp = item.Datetime.Unix()
- (*resp)[i].TraceId = item.TraceId
- (*resp)[i].SpanId = item.SpanId
- (*resp)[i].Method = item.Method
- (*resp)[i].Code = item.Code
- (*resp)[i].ServiceName = item.ServiceName
- (*resp)[i].Duration = item.Duration / 1e6
- }
- return nil
- }
- func (s *Service) GenService(resp *dto.ServiceJobGenServiceResp) error {
- defer s.genSingleNodeService(resp)
- list := []struct {
- TargetService string
- SourceService string
- TargetAlias string
- SourceAlias string
- SourceIcon string
- TargetIcon string
- }{}
- now := time.Now()
- // start, end := now.Add(-time.Hour), now //数据量太大,调整为分钟级,跨度大于增加的数据量即可
- start, end := now.Add(-5*time.Minute), now
- sub1 := s.ChOrm.Table(models.TableNameTrace).
- Select("SpanId, ParentSpanId, ServiceName, AppAlias, ResourceAttributes['telemetry.sdk.language'] SdkLang").
- Where("Timestamp>=toDateTime(?) AND Timestamp<toDateTime(?)", start.Unix(), end.Unix())
- sub2 := s.ChOrm.Table(models.TableNameTrace).
- Select("SpanId, ServiceName, AppAlias, ResourceAttributes['telemetry.sdk.language'] SdkLang").
- Where("Timestamp>=toDateTime(?) AND Timestamp<toDateTime(?)", start.Unix(), end.Unix())
- if err := s.ChOrm.Table("(?) AS ot1", sub1).Joins("INNER JOIN (?) AS ot2 ON ot2.SpanId = ot1.ParentSpanId", sub2).
- Select([]string{
- "ot1.ServiceName AS TargetService",
- "ot2.ServiceName AS SourceService",
- "any(ot1.AppAlias) AS TargetAlias",
- "any(ot2.AppAlias) AS SourceAlias",
- "any(ot1.SdkLang) AS TargetIcon",
- "any(ot2.SdkLang) AS SourceIcon",
- }).
- Where("TargetService != SourceService").
- Group("TargetService, SourceService").
- Find(&list).Error; err != nil {
- return errors.Wrap(err, "获取service信息失败")
- }
- if len(list) == 0 {
- return nil
- }
- nodes := map[string]models.ServiceNode{}
- edges := map[string]models.ServiceEdge{}
- edgeKey := func(edge models.ServiceEdge) string {
- return fmt.Sprintf("%s-%s-%s", edge.AppAlias, edge.Source, edge.Target)
- }
- // 先处理target结点,因为仅target结点能看出对内或对外
- for _, item := range list {
- edge := models.ServiceEdge{
- AppAlias: item.TargetAlias,
- Source: item.SourceService,
- Target: item.TargetService,
- SourceIcon: item.SourceIcon,
- TargetIcon: item.TargetIcon,
- }
- if item.SourceAlias != item.TargetAlias {
- edge.Source = "InCloud"
- edge.SourceIcon = "cloud"
- edge2 := models.ServiceEdge{
- AppAlias: item.SourceAlias,
- Source: item.SourceService,
- Target: "OutCloud",
- SourceIcon: item.SourceIcon,
- TargetIcon: "cloud",
- }
- // edges = append(edges, edge2)
- edges[edgeKey(edge2)] = edge2
- }
- // edges = append(edges, edge)
- edges[edgeKey(edge)] = edge
- appId, err := query.NewApp().Alias2ID(item.TargetAlias)
- if err != nil {
- continue
- }
- key := fmt.Sprintf("%s-%s", item.TargetService, item.TargetAlias)
- if node, ok := nodes[key]; !ok {
- node := models.ServiceNode{
- AppID: int32(appId),
- AppAlias: item.TargetAlias,
- Name: item.TargetService,
- ServiceName: item.TargetService,
- Kind: item.TargetIcon,
- CreatedAt: now,
- UpdatedAt: now,
- }
- node.Type = 1
- if item.TargetAlias != item.SourceAlias { // 说明是对外结点
- node.Type = 2
- }
- nodes[key] = node
- } else {
- if node.Type == 1 {
- if item.TargetAlias != item.SourceAlias {
- node.Type = 3
- }
- } else if node.Type == 2 {
- if item.TargetAlias == item.SourceAlias {
- node.Type = 3
- }
- }
- }
- }
- // 再处理source结点
- for _, item := range list {
- key := fmt.Sprintf("%s-%s", item.SourceService, item.SourceAlias)
- if _, ok := nodes[key]; ok { // 如果存在,说明其已经作为target存在过,不再处理
- continue
- }
- appId, err := query.NewApp().Alias2ID(item.TargetAlias)
- if err != nil {
- continue
- }
- nodes[key] = models.ServiceNode{
- AppID: int32(appId),
- AppAlias: item.SourceAlias,
- Name: item.SourceService,
- ServiceName: item.SourceService,
- Type: 1, // 固定为对内
- Kind: item.SourceIcon,
- CreatedAt: now,
- UpdatedAt: now,
- }
- }
- nodeList := make([]models.ServiceNode, 0, len(nodes))
- for _, node := range nodes {
- nodeList = append(nodeList, node)
- }
- edgeList := make([]models.ServiceEdge, 0, len(edges))
- for _, edge := range edges {
- edgeList = append(edgeList, edge)
- }
- // fmt.Println("nodelist: ", nodeList)
- s.genVirtualService(&nodeList, &edgeList)
- newNodes := s.getNewNodes(nodeList)
- s.insertNewNodes(newNodes, resp)
- newEdges := s.getNewEdges(edgeList)
- s.insertNewEdges(newEdges, resp)
- return nil
- }
- // 处理仅有一个服务的应用
- func (s *Service) genSingleNodeService(resp *dto.ServiceJobGenServiceResp) error {
- now := time.Now()
- start, end := now.Add(-10*time.Minute), now.Add(-5*time.Minute)
- // 查询区间内的AppAlias
- appAliases := []string{}
- err := s.ChOrm.Table(models.TableNameTrace).Distinct("AppAlias").
- Where("Timestamp>=toDateTime(?) AND Timestamp<toDateTime(?)", start.Unix(), end.Unix()).
- Pluck("AppAlias", &appAliases).Error
- if err != nil {
- return errors.Wrap(err, "查询区间内应用失败")
- }
- wg := sync.WaitGroup{}
- var mu sync.Mutex
- for _, appAlias := range appAliases {
- wg.Add(1)
- go func(wg *sync.WaitGroup, appAlias string) error {
- defer wg.Done()
- list := []struct {
- ServiceName string
- SpanKind string
- SdkLang string
- }{}
- traceIdTotal := []struct {
- TraceId string
- Total int64
- }{}
- if err := s.ChOrm.Table(models.TableNameTrace).
- Select("TraceId, count() as Total").
- Where("Timestamp>=toDateTime(?) AND Timestamp<toDateTime(?)", start.Unix(), end.Unix()).
- Where("AppAlias", appAlias).
- Group("TraceId").
- Having("Total", 1).Limit(10).
- Find(&traceIdTotal).Error; err != nil {
- return errors.Wrap(err, "获取TraceID失败")
- }
- traceIds := make([]string, len(traceIdTotal))
- for i, t := range traceIdTotal {
- traceIds[i] = t.TraceId
- }
- sub := s.ChOrm.Debug().Table(models.TableNameTrace).
- Select("TraceId, any(ServiceName) ServiceName, any(SpanKind) SpanKind, any(ResourceAttributes['telemetry.sdk.language']) SdkLang, COUNT() as Total").
- Where("Timestamp>=toDateTime(?)", start.Add(-10*time.Minute).Unix()).Where("TraceId IN ?", traceIds).
- Where("AppAlias", appAlias).
- Group("TraceId").Having("Total", 1)
- if err := s.ChOrm.Table("(?) as t", sub).Distinct("ServiceName, SpanKind, SdkLang").Find(&list).Error; err != nil {
- return errors.Wrap(err, "获取单服务结点信息失败")
- }
- nodes := map[string]models.ServiceNode{}
- edges := []models.ServiceEdge{}
- for _, item := range list {
- num := int64(0)
- s.Orm.Model(&models.ServiceNode{}).Where("app_alias=? and service_name=?", appAlias, item.ServiceName).Count(&num)
- if num > 0 {
- continue
- }
- // 如果 ot_service_node 表中不存在, 说明这个服务结点未插入过,且属于单服务结点
- key := fmt.Sprintf("%s-%s", item.ServiceName, appAlias)
- appId, err := query.NewApp().Alias2ID(appAlias)
- if err != nil {
- continue
- }
- if item.SpanKind == "SPAN_KIND_SERVER" {
- nodes[key] = models.ServiceNode{
- AppID: int32(appId),
- AppAlias: appAlias,
- Name: item.ServiceName,
- ServiceName: item.ServiceName,
- Kind: item.SdkLang,
- CreatedAt: now,
- UpdatedAt: now,
- }
- edges = append(edges, models.ServiceEdge{
- AppAlias: appAlias,
- Source: "InCloud",
- Target: item.ServiceName,
- SourceIcon: "cloud",
- TargetIcon: item.SdkLang,
- })
- } else if item.SpanKind == "SPAN_KIND_CLIENT" {
- nodes[key] = models.ServiceNode{
- AppID: int32(appId),
- AppAlias: appAlias,
- Name: item.ServiceName,
- ServiceName: item.ServiceName,
- Kind: item.SdkLang,
- CreatedAt: now,
- UpdatedAt: now,
- }
- edges = append(edges, models.ServiceEdge{
- AppAlias: appAlias,
- Source: item.ServiceName,
- Target: "OutCloud",
- SourceIcon: item.SdkLang,
- TargetIcon: "cloud",
- })
- }
- }
- nodeList := make([]models.ServiceNode, 0, len(nodes))
- for _, node := range nodes {
- nodeList = append(nodeList, node)
- }
- // fmt.Println("nodelist: ", nodeList)
- // s.genVirtualService(&nodeList, &edges)
- mu.Lock()
- newNodes := s.getNewNodes(nodeList)
- s.insertNewNodes(newNodes, resp)
- newEdges := s.getNewEdges(edges)
- s.insertNewEdges(newEdges, resp)
- mu.Unlock()
- return nil
- }(&wg, appAlias)
- }
- wg.Wait()
- return nil
- }
- // 插入结点数据
- func (s *Service) insertNewNodes(newNodes []models.ServiceNode, resp *dto.ServiceJobGenServiceResp) error {
- if len(newNodes) > 0 {
- errs := ""
- result := s.Orm.Model(&models.ServiceNode{}).Clauses(clause.OnConflict{DoNothing: true}).Create(&newNodes)
- if result.Error != nil {
- errs = result.Error.Error()
- }
- resp.NodeResult = dto.CreateResult{
- Error: errs,
- RowsAffected: result.RowsAffected + resp.NodeResult.RowsAffected,
- Total: len(newNodes) + resp.NodeResult.Total,
- }
- }
- return nil
- }
- // 插入边数据
- func (s *Service) insertNewEdges(newEdges []models.ServiceEdge, resp *dto.ServiceJobGenServiceResp) error {
- if len(newEdges) > 0 {
- errs := ""
- result := s.Orm.Model(&models.ServiceEdge{}).Clauses(clause.OnConflict{DoNothing: true}).Create(&newEdges)
- if result.Error != nil {
- errs = result.Error.Error()
- }
- resp.EdgeResult = dto.CreateResult{
- Error: errs,
- RowsAffected: result.RowsAffected + resp.EdgeResult.RowsAffected,
- Total: len(newEdges) + resp.EdgeResult.Total,
- }
- }
- return nil
- }
- // 获取服务结点数据 用于在边数据不存在的情况下
- func (s *Service) getNewNodesWithoutEdges() ([]models.ServiceNode, error) {
- list := []struct {
- ServiceName string
- AppAlias string
- SdkLang string
- }{}
- if err := s.ChOrm.Table(models.TableNameTrace).
- Select("ServiceName, AppAlias, any(ResourceAttributes['telemetry.sdk.language']) as SdkLang").
- Where("Timestamp>=now()-interval 5 minute").
- Group("ServiceName, AppAlias").Find(&list).Error; err != nil {
- return []models.ServiceNode{}, errors.Wrap(err, "获取服务结点失败")
- }
- nodes := make([]models.ServiceNode, 0, len(list))
- for _, item := range list {
- if appId, err := query.NewApp().Alias2ID(item.AppAlias); err == nil {
- nodes = append(nodes, models.ServiceNode{
- AppID: int32(appId),
- AppAlias: item.AppAlias,
- Name: item.ServiceName,
- ServiceName: item.ServiceName,
- Type: 1, // 仅有一个结点,肯定是对内;此处有个问题,就是后期如果对外了, 没有地方可以更新
- Kind: item.SdkLang,
- })
- }
- }
- newNodes := s.getNewNodes(nodes)
- return newNodes, nil
- }
- // 过滤nodes中的结点, 只要数据库中不存在的node数据
- func (s *Service) getNewNodes(nodes []models.ServiceNode) []models.ServiceNode {
- newNodes := make([]models.ServiceNode, 0)
- sq := query.NewService()
- for _, node := range nodes {
- has, _ := sq.HasNode(node.AppAlias, node.ServiceName)
- if !has {
- newNodes = append(newNodes, node)
- }
- }
- return newNodes
- }
- func (s *Service) getNewEdges(edges []models.ServiceEdge) []models.ServiceEdge {
- newEdges := make([]models.ServiceEdge, 0)
- sq := query.NewService()
- for _, edge := range edges {
- has, _ := sq.HasEdge(edge.AppAlias, edge.Source, edge.Target)
- if !has {
- newEdges = append(newEdges, edge)
- }
- }
- return newEdges
- }
- // 生成 virtual Service
- func (s *Service) genVirtualService(nodes *[]models.ServiceNode, edges *[]models.ServiceEdge) {
- uniq := func(a, b string) string {
- return fmt.Sprintf("%s-%s", a, b)
- }
- nodeMap := map[string]models.ServiceNode{}
- for _, node := range *nodes {
- nodeMap[uniq(node.AppAlias, node.ServiceName)] = node
- }
- sources, targets := map[string]struct{}{}, map[string]struct{}{}
- for _, edge := range *edges {
- sk, tk := uniq(edge.AppAlias, edge.Source), uniq(edge.AppAlias, edge.Target)
- if _, ok := nodeMap[sk]; ok {
- sources[sk] = struct{}{}
- }
- if _, ok := nodeMap[tk]; ok {
- targets[tk] = struct{}{}
- }
- }
- // 找到所有的root和leaf 结点
- rootNodes, leafNodes := map[string]models.ServiceNode{}, map[string]models.ServiceNode{}
- for _, edge := range *edges {
- sk, tk := uniq(edge.AppAlias, edge.Source), uniq(edge.AppAlias, edge.Target)
- // 如果source在targets中不存在,说明source是root
- if _, ok := targets[sk]; !ok {
- rootNodes[sk] = nodeMap[sk]
- // rootNodes = append(rootNodes, nodeMap[sk])
- }
- // 如果target在sources中不存在,说明target是leaf
- if _, ok := sources[tk]; !ok {
- leafNodes[tk] = nodeMap[tk]
- // leafNodes = append(leafNodes, nodeMap[tk])
- }
- }
- // 对于每个root结点, 检查它对应的root span, 如果root span的span kind为server类型, 则需要在它的上层添加一个virtual service
- for _, node := range rootNodes {
- spans := []models.Trace{}
- if err := s.ChOrm.Table(models.TableNameTrace).
- Where("Timestamp>=NOW()-INTERVAL 1 HOUR").
- Where("ParentSpanId=''").
- Where("SpanKind='SPAN_KIND_SERVER'").
- Where("ServiceName", node.ServiceName).
- Where("AppAlias", node.AppAlias).Limit(100).Find(&spans).Error; err != nil {
- continue
- }
- if len(spans) <= 0 {
- continue
- }
- vnode := models.ServiceNode{
- AppID: node.AppID,
- AppAlias: node.AppAlias,
- ServiceName: fmt.Sprintf("%s:frontend", node.ServiceName),
- Type: 1, // 对内服务
- }
- vnode.Name = vnode.ServiceName
- vedge := models.ServiceEdge{
- AppAlias: node.AppAlias,
- Source: vnode.ServiceName,
- Target: node.ServiceName,
- SourceIcon: "",
- }
- vedge.SourceIcon = "cloud"
- vedge.Source = "InCloud" // 所有virtual source都写作InCloud
- for _, span := range spans {
- spanAttrs := opentelemetry.NewSpanAttributes(span.SpanAttribute)
- if spanAttrs.UserAgent() != "" && vnode.Name == "" {
- vnode.Name = "Browser"
- }
- if lang, ok := span.ResourceAttribute["telemetry.sdk.language"]; ok && vedge.TargetIcon == "" {
- vedge.TargetIcon = lang
- }
- if vnode.Name != "" && vedge.TargetIcon != "" {
- break
- }
- }
- // *nodes = append(*nodes, vnode)
- *edges = append(*edges, vedge)
- }
- // 对于每个leaf结点, 检查它对应的span中,是否有span kind为client且为db相关的
- for _, node := range leafNodes {
- spans := []models.Trace{}
- if err := s.ChOrm.Table(models.TableNameTrace).
- Where("Timestamp>=NOW()-INTERVAL 1 HOUR").
- Where("SpanKind='SPAN_KIND_CLIENT' AND (SpanAttributes['db.system'] != '' OR SpanAttributes['db.type'] != '')").
- Where("ServiceName", node.ServiceName).
- Where("AppAlias", node.AppAlias).Limit(100).Find(&spans).Error; err != nil {
- continue
- }
- if len(spans) <= 0 {
- continue
- }
- vnode := models.ServiceNode{
- AppID: node.AppID,
- AppAlias: node.AppAlias,
- ServiceName: "",
- Type: 1, // 对内服务
- }
- vedge := models.ServiceEdge{
- AppAlias: node.AppAlias,
- Source: node.ServiceName,
- }
- for _, span := range spans {
- spanAttrs := opentelemetry.NewSpanAttributes(span.SpanAttribute)
- if vnode.ServiceName == "" && spanAttrs.IsDB() {
- vnode.ServiceName = spanAttrs.DbSystem()
- }
- if lang, ok := span.ResourceAttribute["telemetry.sdk.language"]; ok {
- vedge.SourceIcon = lang
- }
- if vnode.ServiceName != "" && vedge.SourceIcon != "" {
- break
- }
- }
- // if vnode.ServiceName == "" {
- // vnode.ServiceName = "db:unknown"
- // vedge.TargetIcon = "database"
- // } else {
- // vedge.TargetIcon = vnode.ServiceName
- // }
- vedge.TargetIcon = "cloud"
- vnode.Name = vnode.ServiceName
- // vedge.Target = vnode.ServiceName
- vedge.Target = "OutCloud"
- // *nodes = append(*nodes, vnode) // virtual service node不加入node表
- *edges = append(*edges, vedge)
- }
- }
- func (s *Service) List(req *dto.ServiceListReq, resp *[]dto.ServiceListResp, total *int64) error {
- appAlias := ""
- err := s.Orm.Model(&models.App{}).Where("id", req.AppId).Pluck("alias", &appAlias).Error
- if err != nil {
- return errors.Wrap(err, "未获取到app alias")
- }
- pageScopes := cDto.Paginate(req.GetPageSize(), req.GetPageIndex())
- result := s.Orm.Model(&models.ServiceNode{}).Where("app_alias", appAlias).Scopes(pageScopes).Find(&resp)
- if result.Error != nil {
- return errors.Wrap(result.Error, "获取服务列表失败")
- }
- if err := result.Limit(-1).Offset(-1).Count(total).Error; err != nil {
- return errors.Wrap(err, "获取服务总数量失败")
- }
- return nil
- }
- func (s *Service) ListNoAppAlias(req *dto.ServiceListNoAppAliasReq, resp *[]dto.ServiceListNoAppAliasResp, total *int64) error {
- req.CheckFilling(time.Minute * 5)
- if err := s.ChOrm.Model(&models.Trace{}).
- Select("ServiceName, COUNT() as SpanTotal, COUNT(DISTINCT TraceId) as TraceTotal").
- Where("Timestamp>=toDateTime(?) and Timestamp<toDateTime(?)", req.StartTime, req.EndTime).
- Where("AppAlias", "UNSET").
- Group("ServiceName").
- Scopes(cDto.Paginate(req.GetPageSize(), req.GetPageIndex())).
- Find(resp).Offset(-1).Limit(-1).Count(total).Error; err != nil {
- return errors.Wrap(err, "获取离散服务数据失败")
- }
- return nil
- }
- func (s *Service) Update(c *dto.ServiceUpdateReq) error {
- var err error
- var data = models.ServiceNode{}
- err = s.Orm.First(&data, c.GetId()).Error
- if err != nil {
- if err == gorm.ErrRecordNotFound {
- return fmt.Errorf("更新目标不存在, id: %d", c.GetId())
- }
- return err
- }
- c.Generate(&data)
- db := s.Orm.Save(&data) // 允许自定义重名
- if err = db.Error; err != nil {
- s.Log.Errorf("Service update error:%s \r\n", err)
- return err
- }
- return nil
- }
- func (s *Service) StatsFromClickhouse(req *dto.ServiceStatsReq, resp *dto.ServiceStatsResp) error {
- node := models.ServiceNode{}
- var err error
- if req.ID > 0 {
- err = s.Orm.Find(&node, req.ID).Error
- } else {
- err = s.Orm.Model(&models.ServiceNode{}).Where("app_alias=? and service_name=?", req.AppAlias, req.ServiceName).Find(&node).Error
- }
- if err != nil {
- return errors.Wrap(err, "查询服务结点失败")
- }
- if node.AppAlias == "" || node.ServiceName == "" {
- return errors.New("缺少必要参数 app_alias, service_name")
- }
- req.CheckFilling(time.Minute * 5)
- if err := s.ChOrm.Model(&models.Trace{}).
- Select("ServiceName, count() Total, SUM(IF(StatusCode='STATUS_CODE_ERROR', 1, 0)) as ErrorNum, max(Duration)/1e6 as Max, avg(Duration)/1e6 as Avg").
- Where("AppAlias", node.AppAlias).
- Where("ServiceName", node.ServiceName).
- Where("Timestamp>toDateTime(?) and Timestamp<toDateTime(?)", req.StartTime, req.EndTime).
- Group("ServiceName").Find(resp).Error; err != nil {
- return errors.Wrap(err, "获取基础统计信息失败")
- }
- resp.ID = int64(node.ID)
- resp.ErrorRate = float64(resp.ErrorNum) / float64(resp.Total)
- resp.Rpm = math.Round(float64(resp.Total)/float64((req.EndTime-req.StartTime)/60)*100) / 100
- resp.Max = math.Round(resp.Max*100) / 100
- resp.Avg = math.Round(resp.Avg*100) / 100
- timeField := "formatDateTime(toStartOfMinute(Timestamp), '%F %H:%i', 'PRC') as StartTime"
- if req.EndTime-req.StartTime >= 60*60 {
- timeField = "formatDateTime(toStartOfFifteenMinutes(Timestamp), '%F %H:%i', 'PRC') as StartTime"
- }
- fields := []string{
- timeField,
- "quantile(0.5)(Duration)/1e6 as P50Duration",
- "quantile(0.90)(Duration)/1e6 as P90Duration",
- "quantile(0.99)(Duration)/1e6 as P99Duration",
- }
- quantiles := []struct {
- StartTime string
- P50Duration float64
- P90Duration float64
- P99Duration float64
- }{}
- if err := s.ChOrm.Model(&models.Trace{}).Select(fields).
- Where("AppAlias", node.AppAlias).
- Where("ServiceName", node.ServiceName).
- Where("Timestamp>toDateTime(?) and Timestamp<toDateTime(?)", req.StartTime, req.EndTime).
- Group(timeField).Order(fmt.Sprintf("%s asc", timeField)).Find(&quantiles).Error; err != nil {
- return errors.Wrap(err, "获取分位数数据失败")
- }
- resp.Quantiles = dto.Quantiles{
- Time: make([]string, len(quantiles)),
- P50: make([]float64, len(quantiles)),
- P90: make([]float64, len(quantiles)),
- P99: make([]float64, len(quantiles)),
- }
- for i, quantile := range quantiles {
- resp.Quantiles.Time[i] = quantile.StartTime
- resp.Quantiles.P50[i] = math.Round(quantile.P50Duration*100) / 100
- resp.Quantiles.P90[i] = math.Round(quantile.P90Duration*100) / 100
- resp.Quantiles.P99[i] = math.Round(quantile.P99Duration*100) / 100
- }
- return nil
- }
- func (s *Service) Stats(req *dto.ServiceStatsReq, resp *dto.ServiceStatsResp) error {
- node := models.ServiceNode{}
- if err := s.Orm.Find(&node, req.ID).Error; err != nil {
- return errors.Wrap(err, "查询服务结点失败")
- }
- req.CheckFilling(time.Minute * 5)
- mins := (req.EndTime - req.StartTime) / 60
- metric := "observe_server_duration_milliseconds"
- labels := map[string]string{"service_name": node.ServiceName}
- ts := time.Unix(req.EndTime, 0)
- wg := sync.WaitGroup{}
- tb := prometheus.NewHistogram(&wg, metric, labels, ts, mins)
- // 计算错误
- errorRate := float64(0)
- wg.Add(1)
- go tb.ErrorRate(&errorRate)
- // 计算分位数
- times := []string{}
- p50s, p90s, p99s := []float64{}, []float64{}, []float64{}
- wg.Add(3)
- go tb.QuantileMinutes(0.5, &[]string{}, &p50s)
- go tb.QuantileMinutes(0.9, &[]string{}, &p90s)
- go tb.QuantileMinutes(0.99, ×, &p99s)
- // 计算最大时延
- wg.Add(1)
- maxDuration := float64(0)
- go tb.Quantile(1.0, &maxDuration) // 最大时延就是百分位数为1的值
- // 计算平均时延
- wg.Add(1)
- avgDuration := float64(0)
- go tb.Avg(&avgDuration)
- // 记录 total
- wg.Add(1)
- total := int64(0)
- go tb.Total(&total)
- wg.Wait()
- // resp.ID = int64(node.ID)
- resp.ServiceName = node.ServiceName
- // resp.Total = st.Total
- resp.Total = total
- resp.Rpm = math.Round(float64(resp.Total)/float64((mins))*100) / 100
- resp.Max = maxDuration
- resp.Avg = avgDuration
- if math.IsNaN(resp.Avg) {
- resp.Avg = 0
- }
- if errorRate > 0 {
- resp.ErrorRate = errorRate // 错误率的计算公式可能有问题
- resp.ErrorNum = int64(errorRate * float64(resp.Total))
- }
- resp.Quantiles.Time = times
- resp.Quantiles.P50 = p50s
- resp.Quantiles.P90 = p90s
- resp.Quantiles.P99 = p99s
- return nil
- }
- // 数字视图处理逻辑
- func (s *Service) Digits(req *dto.ServiceDigitsReq, resp *dto.ServiceDigitsResp) error {
- key := fmt.Sprintf("observe__service_digits_%s_%s", req.AppAlias, req.ServiceName)
- if req.ServiceName == "" {
- key = fmt.Sprintf("observe__service_digits_%s", req.AppAlias)
- }
- rdb := config.GetRedisClient()
- res, err := rdb.HGetAll(key).Result()
- if err == nil {
- // 由于当数据不存在时,不会写缓存,所以这里数字视图的缓存数据有可能不存在,属于正常情况
- // if len(res) == 0 {
- // return errors.New("数字视图缓存数据不存在,请稍后再试")
- // }
- resp.Span, _ = strconv.ParseFloat(res["span"], 64)
- resp.Span = math.Round(resp.Span*100) / 100
- resp.Trace, _ = strconv.ParseFloat(res["trace"], 64)
- resp.Trace = math.Round(resp.Trace*100) / 100
- resp.Http, _ = strconv.ParseFloat(res["http"], 64)
- resp.Http = math.Round(resp.Http*100) / 100
- resp.Rpc, _ = strconv.ParseFloat(res["rpc"], 64)
- resp.Rpc = math.Round(resp.Rpc*100) / 100
- resp.DB, _ = strconv.ParseFloat(res["db"], 64)
- resp.DB = math.Round(resp.DB*100) / 100
- resp.Error, _ = strconv.ParseFloat(res["error"], 64)
- resp.Error = math.Round(resp.Error*100) / 100
- return nil
- }
- return errors.Wrap(err, "数字视图缓存获取失败")
- }
- // 服务数字视图生成, 生成最近1小时内,每分钟的数据
- func (s *Service) DigitsGen() error {
- digits := []struct {
- AppAlias string
- ServiceName string
- Span float64
- Trace float64
- Http float64
- Rpc float64
- Error float64
- }{}
- result := s.ChOrm.Table(models.TableNameTrace).Select([]string{
- "AppAlias AS AppAlias",
- "ServiceName AS ServiceName",
- "COUNT()/60 AS Span",
- "COUNT(DISTINCT TraceId)/60 AS Trace",
- "SUM(if(SpanKind='SPAN_KIND_SERVER' AND (SpanAttributes['http.status_code']>='200' OR SpanAttributes['http.response.status_code']>='200'), 1, 0))/60 AS Http",
- "SUM(if(SpanKind='SPAN_KIND_SERVER' AND SpanAttributes['rpc.system']!='', 1, 0))/60 AS Rpc",
- "SUM(if(SpanAttributes['db.type']='' OR SpanAttributes['db.system']!='', 1, 0))/60 AS Db",
- "SUM(if(StatusCode='STATUS_CODE_ERROR' OR SpanAttributes['http.status_code']>='400' OR SpanAttributes['http.response.status_code']>='400', 1, 0))/60 AS Error",
- }).Where("Timestamp>=NOW()-INTERVAL 1 HOUR").Group("AppAlias, ServiceName").Find(&digits)
- if result.Error != nil {
- return errors.Wrap(result.Error, "获取数字视图数据失败")
- }
- rdb := config.GetRedisClient()
- pipe := rdb.Pipeline()
- aliasDigits := map[string]map[string]int{}
- for _, digit := range digits {
- key := fmt.Sprintf("observe__service_digits_%s_%s", digit.AppAlias, digit.ServiceName)
- pipe.HSet(key, map[string]interface{}{
- "span": digit.Span,
- "trace": digit.Trace,
- "http": digit.Http,
- "rpc": digit.Rpc,
- "error": digit.Error,
- })
- pipe.Expire(key, time.Hour) // 缓存1小时
- // 计算 每个应用下 所有service的数字视图数据
- key = fmt.Sprintf("observe__service_digits_%s", digit.AppAlias)
- if _, ok := aliasDigits[key]; !ok {
- aliasDigits[key] = map[string]int{}
- }
- aliasDigits[key]["span"] += int(digit.Span)
- aliasDigits[key]["trace"] += int(digit.Trace)
- aliasDigits[key]["http"] += int(digit.Http)
- aliasDigits[key]["rpc"] += int(digit.Rpc)
- aliasDigits[key]["error"] += int(digit.Error)
- }
- for key, digit := range aliasDigits {
- pipe.HSet(key, map[string]interface{}{
- "span": digit["span"],
- "trace": digit["trace"],
- "http": digit["http"],
- "rpc": digit["rpc"],
- "error": digit["error"],
- })
- }
- pipe.Exec()
- return nil
- }
|