package service import ( "context" "fmt" "go-admin/app/observe/models" "go-admin/app/observe/models/query" "go-admin/app/observe/service/dto" cDto "go-admin/common/dto" "go-admin/common/opentelemetry" "go-admin/common/prometheus" "go-admin/utils" "math" "strconv" "strings" "sync" "time" "github.com/go-admin-team/go-admin-core/sdk/config" "github.com/pkg/errors" "gorm.io/gorm" "gorm.io/gorm/clause" ) const ( ChWithSql = `SpanAttributes['http.url'] != '' AS isHttp, SpanAttributes['rpc.system'] != '' AS isRpc, SpanAttributes['rpc.system'] AS rpcSystem, SpanAttributes['db.system'] != '' AS isDb, SpanAttributes['db.system'] AS dbSystem, SpanAttributes['messaging.system'] AS messagingSystem` FIFTYMINUTE = "toStartOfFifteenMinutes" ONEMINUTE = "toStartOfMinute" ) type Service struct { utils.OtService } func (s *Service) GetEdges(ctx context.Context, params *dto.ServiceGetEdgesReq, result *map[string]models.GraphEdge) error { // 边暂时不计算相关统计值,因为现在的前端用不到,隐藏以提高性能 // sql := `WITH toDateTime(?) AS StartTime, toDateTime(?) AS EndTime, ? AS seconds, ? AS appAlias // SELECT // ot2.ServiceName AS SourceService, // ot1.ServiceName AS TargetService, // ot2.RequestType AS RequestType, // COUNT()/seconds AS Qps, // SUM(IF(StatusCode == 'STATUS_CODE_ERROR', 1, 0)) AS ErrorNum, // SUM(IF(StatusCode == 'STATUS_CODE_ERROR', 1, 0))/COUNT() AS ErrorRate, // AVG(Duration)/1e6 AS DurationAverage, // quantile(0.5)(Duration)/1e6 AS DurationMedian, // quantile(0.9)(Duration)/1e6 AS DurationP90, // quantile(0.99)(Duration)/1e6 AS DurationP99 // FROM // ( // SELECT // TraceId, // SpanId, // ParentSpanId, // ServiceName, // Duration, // StatusCode, // AppAlias // FROM otel.otel_traces // WHERE AppAlias=appAlias AND Timestamp > StartTime AND Timestamp < EndTime // ) AS ot1 // INNER JOIN // ( // SELECT // SpanId, // ServiceName, // IF(SpanAttributes['http.url'] != '', 'http', IF(SpanAttributes['rpc.system'] != '', SpanAttributes['rpc.system'] , IF(SpanAttributes['db.system'] != '', SpanAttributes['db.system'], SpanAttributes['messaging.system']))) as RequestType // FROM otel.otel_traces // WHERE AppAlias=appAlias and Timestamp > StartTime AND Timestamp < EndTime // ) AS ot2 ON ot2.SpanId = ot1.ParentSpanId // WHERE SourceService != TargetService // GROUP BY SourceService, TargetService, RequestType` // seconds := params.EndTime - params.StartTime // rows, err := s.OlapConn.Query(ctx, sql, params.StartTime, params.EndTime, seconds, params.AppAlias) // param.AppAlias // if err != nil { // return err // } // edgeMap := map[string]models.GraphEdge{} // for rows.Next() { // row := new(dto.ServiceEdgeRaw) // if err := rows.ScanStruct(row); err != nil { // s.Log.Errorf("扫描行到结构体失败: %s", err) // } // edgeMap[s.getEdgeId(row.SourceService, row.TargetService)] = models.GraphEdge{ // ID: row.SourceService + "-" + row.TargetService, // Source: row.SourceService, // Target: row.TargetService, // MainStat: fmt.Sprintf("total: %d, %s: %.2fr/s, err: %d / %.2f%%", row.TotalNum, row.RequestType, row.Qps, row.ErrorNum, row.ErrorRate), // 展示示例 grpc:12.39req/s // SecondaryStat: fmt.Sprintf("avg: %.2fms, med: %.2fms, p90: %.2fms, p99: %.2fms", // row.DurationAverage, row.DurationMedian, row.DurationP90, row.DurationP99), // } // } _, defaultEdgeMap := s.getDefaultGraph(params.AppAlias) // for k, edge := range edgeMap { // defaultEdgeMap[k] = edge // } *result = defaultEdgeMap return nil } func (s *Service) GetNoSoulEdges(ctx context.Context, params *dto.ServiceGetEdgesReq, result *[]models.GraphEdge) error { // nodes := []models.ServiceNode{} edgeMap := map[string]models.GraphEdge{} _, defaultEdgeMap := s.getDefaultGraphNoSoul(params.AppAlias) for k, edge := range edgeMap { defaultEdgeMap[k] = edge } for _, edge := range defaultEdgeMap { *result = append(*result, edge) } return nil } func (s *Service) GetNodes(ctx context.Context, params *dto.ServiceGetEdgesReq, result *map[string]models.GraphNodeScope) error { sql := `SELECT ServiceName, countIf(Duration <= 1.5 * 1000 * 1000 *1000 ) AS satisfied, countIf(Duration > 1.5 * 1000 * 1000 * 1000 AND Duration <= 2.5 * 1000 * 1000 * 1000 ) AS tolerable, countIf(Duration > 2.5 * 1000 * 1000 * 1000 ) AS frustrated, ROUND((satisfied + tolerable / 2.0 ) / (satisfied + tolerable + frustrated), 4) AS Apdex, COUNT(DISTINCT TraceId) AS TraceNum, COUNT() AS SpanNum, SUM(IF(SpanKind IN ('SPAN_KIND_CLIENT', 'SPAN_KIND_PRODUCER'), 1, 0)) AS SentNum, SUM(IF(SpanKind IN ('SPAN_KIND_SERVER', 'SPAN_KIND_CONSUMER'), 1, 0)) AS ReceivedNum, SUM(IF(SpanKind='SPAN_KIND_INTERNAL', 1, 0)) AS InternalNum, SUM(IF(StatusCode='STATUS_CODE_ERROR', 1, 0)) AS ErrorNum, any(ResourceAttributes['telemetry.sdk.language']) AS SdkLang FROM otel_traces ot WHERE Timestamp > toDateTime(?) AND Timestamp < toDateTime(?) AND AppAlias = ? GROUP BY ServiceName ORDER BY ServiceName` rows, err := s.OlapConn.Query(ctx, sql, params.StartTime, params.EndTime, params.AppAlias) if err != nil { s.Log.Error("执行sql错误: %s", err) return err } serviceNodes := []models.ServiceNode{} s.Orm.Model(&models.ServiceNode{}).Where("app_alias", params.AppAlias).Find(&serviceNodes) serviceNameMap := map[string]string{} for _, node := range serviceNodes { serviceNameMap[node.ServiceName] = node.Name } nodeMap := map[string]models.GraphNodeScope{} s.Log.Debug("-- rows扫描行到graph_node_scope --") for rows.Next() { row := new(dto.SerivceGraphNodeRaw) if err := rows.ScanStruct(row); err != nil { s.Log.Errorf("扫描行到结构体失败: %s", err) } title, ok := serviceNameMap[row.ServiceName] if !ok { title = row.ServiceName } nodeMap[row.ServiceName] = models.GraphNodeScope{ ID: row.ServiceName, Title: title, SubTitle: "", Send: int64(row.SentNum), Receive: int64(row.ReceivedNum), SecondaryStat: "", ArcSuccess: float64(row.SpanNum-row.ErrorNum) / float64(row.SpanNum), ArcFaild: float64(row.ErrorNum) / float64(row.SpanNum), Icon: row.SdkLang, Apdex: row.Apdex, // MainStat: fmt.Sprintf("sent: %d, received: %d", row.SentNum, row.ReceivedNum), } } defaultNodeMap, _ := s.getDefaultGraph(params.AppAlias) for k, node := range nodeMap { defaultNodeMap[k] = node } *result = defaultNodeMap // for _, node := range defaultNodeMap { // *result = append(*result, node) // } return nil } func (s *Service) GetNoSoulNodes(ctx context.Context, params *dto.ServiceGetEdgesReq, result *[]models.GraphNodeScope) error { defaultNodeMap, _ := s.getDefaultGraphNoSoul(params.AppAlias) for _, node := range defaultNodeMap { *result = append(*result, node) } return nil } func (s *Service) GetGraph(ctx context.Context, params *dto.ServiceGetEdgesReq, result *models.Graph) (err error) { params.CheckFilling(5 * time.Minute) nodes, edges := make(map[string]models.GraphNodeScope, 0), make(map[string]models.GraphEdge, 0) if err = s.GetNodes(ctx, params, &nodes); err != nil { return } if err = s.GetEdges(ctx, params, &edges); err != nil { return } // nodes, edges, _ = s.extendNodesAndEdges(params, nodes, edges) for _, node := range nodes { result.Nodes = append(result.Nodes, node) } for _, edge := range edges { result.Edges = append(result.Edges, edge) } // result.Edges = edges // result.Nodes = nodes return } func (s *Service) GetNoSoulGraph(ctx context.Context, params *dto.ServiceGetEdgesReq, result *models.Graph) (err error) { params.CheckFilling(5 * time.Minute) nodes, edges := make([]models.GraphNodeScope, 0), make([]models.GraphEdge, 0) defaultNodeMap, defaultEdgeMap := s.getDefaultGraphNoSoul(params.AppAlias) // defaultNodeMap, defaultEdgeMap, _ = s.extendNodesAndEdges(params, defaultNodeMap, defaultEdgeMap) for _, node := range defaultNodeMap { nodes = append(nodes, node) } for _, edge := range defaultEdgeMap { edges = append(edges, edge) } // if err = s.GetNoSoulNodes(ctx, params, &nodes); err != nil { // return // } // if err = s.GetNoSoulEdges(ctx, params, &edges); err != nil { // return // } result.Edges = edges result.Nodes = nodes return } func (s *Service) getDefaultGraph(appAlias string) (nodes map[string]models.GraphNodeScope, edges map[string]models.GraphEdge) { data := []models.ServiceEdge{} edges = make(map[string]models.GraphEdge) nodes = make(map[string]models.GraphNodeScope) serviceNodes := []models.ServiceNode{} s.Orm.Model(&models.ServiceNode{}).Where("app_alias", appAlias).Find(&serviceNodes) serviceNameMap := map[string]string{} for _, node := range serviceNodes { serviceNameMap[node.ServiceName] = node.Name } if err := s.Orm.Model(&models.ServiceEdge{}).Where("source_app=? or target_app=?", appAlias, appAlias).Find(&data).Error; err == nil { for _, item := range data { sourceTitle, targetTitle := item.Source, item.Target if title, ok := serviceNameMap[sourceTitle]; ok { sourceTitle = title } if title, ok := serviceNameMap[targetTitle]; ok { targetTitle = title } source, target := item.Source, item.Target if item.SourceApp == appAlias { // 如果源应用是当前应用 if item.SourceType != "service" { continue } nodes[source] = models.GraphNodeScope{ ID: source, Title: sourceTitle, Icon: item.SourceIcon, } if item.TargetType == "application" { target = "downstream" nodes[target] = models.GraphNodeScope{ ID: target, Title: "下游应用", Icon: "cloud", } } else if (item.TargetType == "service" && item.TargetApp == appAlias) || item.TargetType == "database" || item.TargetType == "messaging" { nodes[target] = models.GraphNodeScope{ ID: target, Title: targetTitle, Icon: item.TargetIcon, } } else { continue } } if item.TargetApp == appAlias { if item.TargetType != "service" { continue } nodes[target] = models.GraphNodeScope{ ID: target, Title: target, Icon: item.TargetIcon, } if item.SourceType == "application" || item.SourceType == "client" { source = "upstream" nodes[source] = models.GraphNodeScope{ ID: source, Title: "上游应用", Icon: "cloud", } } else if item.SourceType == "service" && item.SourceApp == appAlias { nodes[source] = models.GraphNodeScope{ ID: source, Title: sourceTitle, Icon: item.SourceIcon, } } else { continue } } // if item.SourceType == "client" || item.SourceType == "application" { // source = "upstream" // nodes[source] = models.GraphNodeScope{ // ID: source, // Title: "上游应用", // Icon: "cloud", // } // } else { // nodes[source] = models.GraphNodeScope{ // ID: source, // Title: sourceTitle, // MainStat: "sent: 0, received: 0", // Icon: item.SourceIcon, // } // } // if item.TargetType == "application" { // target = "downstream" // nodes[target] = models.GraphNodeScope{ // ID: target, // Title: "下游应用", // Icon: "cloud", // } // } else { // nodes[target] = models.GraphNodeScope{ // ID: target, // Title: targetTitle, // MainStat: "sent: 0, received: 0", // Icon: item.TargetIcon, // } // } edges[s.getEdgeId(source, target)] = models.GraphEdge{ ID: s.getEdgeId(source, target), Source: source, Target: target, MainStat: "total: 0, http: 0.00r/s, err: 0 / 0.00%", } } } return } func (s *Service) getDefaultGraphNoSoul(appAlias string) (nodes map[string]models.GraphNodeScope, edges map[string]models.GraphEdge) { data := []models.ServiceEdge{} serviceNodes := []models.ServiceNode{} s.Orm.Model(&models.ServiceNode{}).Where("app_alias", appAlias).Find(&serviceNodes) serviceNameMap := map[string]string{} for _, node := range serviceNodes { serviceNameMap[node.ServiceName] = node.Name } nodes = make(map[string]models.GraphNodeScope) if err := s.Orm.Table(models.TableNameServiceEdge).Where("source_app=? or target_app=?", appAlias, appAlias).Find(&data).Error; err == nil { edges = make(map[string]models.GraphEdge, len(data)) for _, item := range data { source, target := item.Source, item.Target sourceTitle, targetTitle := item.Source, item.Target if title, ok := serviceNameMap[sourceTitle]; ok { sourceTitle = title } if title, ok := serviceNameMap[targetTitle]; ok { targetTitle = title } if item.SourceApp == appAlias { // 如果源应用是当前应用 if item.SourceType != "service" { continue } nodes[source] = models.GraphNodeScope{ ID: source, Title: sourceTitle, Icon: item.SourceIcon, } if item.TargetType == "application" { target = "downstream" nodes[target] = models.GraphNodeScope{ ID: target, Title: "下游应用", Icon: "cloud", } } else if (item.TargetType == "service" && item.TargetApp == appAlias) || item.TargetType == "database" || item.TargetType == "messaging" { nodes[target] = models.GraphNodeScope{ ID: target, Title: targetTitle, Icon: item.TargetIcon, } } else { continue } } if item.TargetApp == appAlias { if item.TargetType != "service" { continue } nodes[target] = models.GraphNodeScope{ ID: target, Title: targetTitle, Icon: item.TargetIcon, } if item.SourceType == "application" || item.SourceType == "client" { source = "upstream" nodes[source] = models.GraphNodeScope{ ID: source, Title: "上游应用", Icon: "cloud", } } else if item.SourceType == "service" && item.SourceApp == appAlias { nodes[source] = models.GraphNodeScope{ ID: source, Title: sourceTitle, Icon: item.SourceIcon, } } else { continue } } // if item.SourceType == "client" || item.SourceType == "application" { // source = "upstream" // nodes[source] = models.GraphNodeScope{ // ID: source, // Title: "上游应用", // Icon: "cloud", // } // } else { // nodes[source] = models.GraphNodeScope{ // ID: source, // Title: source, // Icon: source, // } // } // if item.TargetType == "application" { // target = "downstream" // nodes[target] = models.GraphNodeScope{ // ID: target, // Title: "下游应用", // Icon: "cloud", // } // } else { // nodes[target] = models.GraphNodeScope{ // ID: target, // Title: target, // Icon: target, // } // } edges[s.getEdgeId(source, target)] = models.GraphEdge{ ID: s.getEdgeId(source, target), Source: source, Target: target, // MainStat: "total: 0, http: 0.00r/s, err: 0 / 0.00%", } } } dn := []models.ServiceNode{} err := s.Orm.Table(models.TableNameServiceNode).Where("app_alias", appAlias).Find(&dn).Error if err == nil { for _, item := range dn { title := item.ServiceName if val, ok := serviceNameMap[item.ServiceName]; ok { title = val } nodes[item.ServiceName] = models.GraphNodeScope{ ID: item.ServiceName, Title: title, Icon: item.Kind, } } } return } // 扩展节点和边 func (s *Service) extendNodesAndEdges(params *dto.ServiceGetEdgesReq, nodes map[string]models.GraphNodeScope, edges map[string]models.GraphEdge) (map[string]models.GraphNodeScope, map[string]models.GraphEdge, error) { components := []models.SystemComponent{} err := s.Orm.Model(&models.SystemComponent{}).Where("app_alias", params.AppAlias).Find(&components).Error if err != nil { return nodes, edges, errors.Wrap(err, "获取系统组件失败") } dbMp, err := s.databaseNodeMap(params) if err != nil { return nodes, edges, err } msgMp, err := s.messagingNodeMap(params) if err != nil { return nodes, edges, err } cliMp, err := s.clientNodeMap(params) if err != nil { return nodes, edges, err } prevSvcToComponent := map[string]string{} nextSvcToComponent := map[string]string{} newEdges := map[string]models.GraphEdge{} for _, component := range components { if component.PrevServiceName != "" { prevSvcToComponent[component.PrevServiceName] = component.Component newEdges[s.getEdgeId(component.PrevServiceName, component.Component)] = models.GraphEdge{ Source: component.PrevServiceName, Target: component.Component, } } if component.NextServiceName != "" { nextSvcToComponent[component.NextServiceName] = component.Component newEdges[s.getEdgeId(component.Component, component.NextServiceName)] = models.GraphEdge{ Source: component.Component, Target: component.NextServiceName, } } node := models.GraphNodeScope{ ID: component.Component, Title: component.Name, Icon: component.Component, } if dbStat, ok := dbMp[component.Component]; ok { node.Receive = dbStat.Total node.Send = 0 node.ArcFaild = float64(dbStat.ErrorNum) / float64(dbStat.Total) node.ArcSuccess = 1 - node.ArcFaild } if msgStat, ok := msgMp[component.Component]; ok { node.Receive = msgStat.ReceivedNum node.Send = msgStat.SentNum // 严格来说, kafka通常只能被动接收,无发送量,但为不方便理解,将消费量作为发送量 node.ArcFaild = float64(msgStat.ErrorNum) / float64(msgStat.Total) node.ArcSuccess = 1 - node.ArcFaild } if cliStat, ok := cliMp[component.Component]; ok { node.Receive = 0 node.Send = cliStat.Total node.ArcFaild = float64(cliStat.ErrorNum) / float64(cliStat.Total) node.ArcSuccess = 1 - node.ArcFaild } nodes[component.Component] = node } dels := []string{} for sourceTarget, edge := range edges { source, target := s.getSourceTarget(sourceTarget) if component, ok := prevSvcToComponent[source]; ok { newEdges[s.getEdgeId(source, component)] = models.GraphEdge{ Source: source, Target: component, MainStat: edge.MainStat, SecondaryStat: edge.SecondaryStat, } } if component, ok := nextSvcToComponent[target]; ok { newEdges[s.getEdgeId(component, target)] = models.GraphEdge{ Source: component, Target: target, MainStat: edge.MainStat, SecondaryStat: edge.SecondaryStat, } } if prevSvcToComponent[source] != "" && nextSvcToComponent[target] != "" { // 如果组件既有上游节点又有下游节点, 则该组件为消息中间件, 删除原来两节点直连的边 dels = append(dels, sourceTarget) } } for _, key := range dels { delete(edges, key) } for key, edge := range newEdges { edges[key] = edge } return nodes, edges, nil } func (s *Service) databaseNodeMap(params *dto.ServiceGetEdgesReq) (map[string]dto.ServiceComponentStats, error) { list := []dto.ServiceComponentStats{} if err := s.ChOrm.Model(&models.Trace{}).Select(` SpanAttributes['db.system'] AS Component, COUNT() AS Total, SUM(IF(StatusCode='STATUS_CODE_ERROR', 1, 0)) AS ErrorNum `).Where("AppAlias", params.AppAlias). Where("Timestamp>=toDateTime(?) and Timestamp=toDateTime(?) and Timestamp=toDateTime(?) and TimestamptoDateTime(?) AND TimestamptoDateTime(?) AND Timestamp? AND SpanKind!='SPAN_KIND_INTERNAL' ORDER BY Timestamp ASC` rows, err := s.OlapConn.Query(ctx, sql, params.StartTime, params.EndTime, params.AppAlias, params.ServiceName, p90) if err != nil { s.Log.Errorf("执行sql失败:%s", err) return } var t time.Time var r string var d float64 var e bool for rows.Next() { err = rows.Scan(&t, &d, &e, &r) if err != nil { s.Log.Errorf("扫描行到变量失败:%s", err) break } estr := "success" if e { estr = "failed" } if _, ok := (*result)[r]; !ok { (*result)[r] = map[string][]models.CoordinatePoint{ "success": {}, "failed": {}, } } (*result)[r][estr] = append((*result)[r][estr], []any{t, d}) } return } func (s *Service) GetServiceLiveness(ctx context.Context, params *dto.SpanScatterChartReq, result *[]models.CoordinatePoint) (err error) { if params.EndTime == 0 { params.EndTime = time.Now().Unix() params.StartTime = params.EndTime - 5*60 } sql := fmt.Sprintf(`SELECT toStartOfMinute(toTimeZone(Timestamp,'Asia/Hong_Kong')) AS hr, COUNT(*), median(Duration/1e6), FROM otel.otel_traces Where ServiceName='%s' AND Timestamp > (NOW() - toIntervalHour(3)) GROUP BY hr ORDER BY hr ASC`, params.ServiceName) rows, err := s.OlapConn.Query(ctx, sql) if err != nil { s.Log.Errorf("select liveness service error:%s", err) return } var t time.Time var d float64 var c uint64 for rows.Next() { err = rows.Scan(&t, &c, &d) if err != nil { s.Log.Errorf("Scan rows errors:%s", err) break } *result = append(*result, []any{t, c, d}) } return } func (s *Service) CompareServiceLiveness(ctx context.Context, params *dto.SpanScatterChartReq, result *[]models.CoordinatePoint) (err error) { startTime := time.Now().Add(-2 * time.Hour * time.Duration(params.HourNum)).Truncate(time.Hour) endTime := time.Now().Truncate(time.Hour).Add(time.Hour) list := []struct { Start string Total int64 }{} sql := `SELECT formatDateTime(StartTime, '%F %H:%i', 'PRC') as Start, countMerge(TraceNum) as Total FROM otel_traces_aggbysvc WHERE AppAlias=? AND ServiceName=? AND StartTime>=? AND StartTime=? and StartTime (NOW() - toIntervalHour(%d)) // GROUP BY hr // ORDER BY hr ASC` // if params.HourNum < 6 { // livenessSQL = fmt.Sprintf(livenessSQL, ONEMINUTE, params.ServiceName, params.AppAlias, params.HourNum*2) // } else { // livenessSQL = fmt.Sprintf(livenessSQL, FIFTYMINUTE, params.ServiceName, params.AppAlias, params.HourNum*2) // } // rows, err := s.OlapConn.Query(ctx, livenessSQL) // if err != nil { // s.Log.Errorf("select compare liveness service error:%s", err) // return // } // var t string // var d uint64 // for rows.Next() { // err = rows.Scan(&t, &d) // if err != nil { // s.Log.Errorf("Scan rows errors:%s", err) // break // } // *result = append(*result, [2]any{t, d}) // } // return } func (s *Service) CompareServiceErrors(ctx context.Context, params *dto.SpanScatterChartReq, result *[]models.CoordinatePoint) (err error) { db := s.ChOrm.Model(&models.TracesError{}). Where("AppAlias=? and ServiceName=? and Timestamp>now()-interval ? hour", params.AppAlias, params.ServiceName, params.HourNum*2) if params.HourNum == 1 { db.Select("formatDateTime(toStartOfMinute(Timestamp), '%F %H:%i', 'PRC') as StartTime, count() as Total") } else if params.HourNum <= 3 { db.Select("formatDateTime(toStartOfFiveMinutes(Timestamp), '%F %H:%i', 'PRC') as StartTime, count() as Total") } else if params.HourNum <= 6 { db.Select("formatDateTime(toStartOfTenMinutes(Timestamp), '%F %H:%i', 'PRC') as StartTime, count() as Total") } else { db.Select("formatDateTime(toStartOfFifteenMinutes(Timestamp), '%F %H:%i', 'PRC') as StartTime, count() as Total") } list := []struct { StartTime string Total int64 }{} err = db.Group("StartTime").Order("StartTime ASC").Scan(&list).Error if err != nil { return } *result = make([]models.CoordinatePoint, 0, len(list)) for _, item := range list { *result = append(*result, []any{item.StartTime, item.Total}) } return // livenessSQL := `with (if(SpanAttributes['http.status_code']!='', SpanAttributes['http.status_code'], SpanAttributes['http.response.status_code'])) as HttpCode // SELECT // toString(%s(toTimeZone(Timestamp,'Asia/Hong_Kong'))) AS hr, // COUNT(*) // FROM otel.otel_traces // Where ServiceName='%s' // AND (HttpCode >= '400' OR StatusCode = 'STATUS_CODE_ERROR') // AND AppAlias = '%s' // AND Timestamp > (NOW() - toIntervalHour(%d)) // GROUP BY hr // ORDER BY hr ASC` // if params.HourNum < 6 { // livenessSQL = fmt.Sprintf(livenessSQL, ONEMINUTE, params.ServiceName, params.AppAlias, params.HourNum*2) // } else { // livenessSQL = fmt.Sprintf(livenessSQL, FIFTYMINUTE, params.ServiceName, params.AppAlias, params.HourNum*2) // } // rows, err := s.OlapConn.Query(ctx, livenessSQL) // if err != nil { // s.Log.Errorf("select compare liveness service error:%s", err) // return // } // var t string // var d uint64 // for rows.Next() { // err = rows.Scan(&t, &d) // if err != nil { // s.Log.Errorf("Scan rows errors:%s", err) // break // } // *result = append(*result, [2]any{t, d}) // } // return } func (s *Service) Spans(req *dto.ServiceSpansReq, resp *[]dto.ServiceSpansResp, count *int64) error { req.CheckFilling(time.Minute * 5) list := []struct { Datetime time.Time `json:"datetime"` TraceId string `json:"trace_id"` SpanId string `json:"span_id"` ServiceName string `json:"service_name"` Method string `json:"method"` Code string `json:"code"` Duration float64 `json:"duration"` }{} db := s.ChOrm.Table(models.TableNameTrace). Select(`Timestamp as Datetime, TraceId,SpanId,ServiceName,IF(SpanAttributes['http.method'] != '', SpanAttributes['http.method'], IF(SpanAttributes['http.request.method'] != '', SpanAttributes['http.request.method'], IF(SpanAttributes['rpc.system'] != '', SpanAttributes['rpc.system'], IF(SpanAttributes['db.system'] != '', SpanAttributes['db.system'], SpanAttributes['messaging.system'] ) ) ) ) AS Method, IF(SpanAttributes['http.status_code']>='200', SpanAttributes['http.status_code'], IF(SpanAttributes['http.response.status_code'] >= '200',SpanAttributes['http.response.status_code'], IF(SpanAttributes[concat('rpc.', SpanAttributes['rpc.system'], '.status_code')] != '', SpanAttributes[concat('rpc.', SpanAttributes['rpc.system'], '.status_code')], IF(SpanAttributes[concat('rpc.', SpanAttributes['rpc.system'], '.error_code')] != '', SpanAttributes[concat('rpc.', SpanAttributes['rpc.system'], '.error_code')], IF(SpanAttributes['messaging.operation']!='', SpanAttributes['messaging.operation'], IF(SpanAttributes['db.operation'] != '',SpanAttributes['db.operation'], SpanName) ) ) ) ) ) AS Code, Duration `, ). Where("AppAlias", req.AppAlias). Where("Timestamp>=toDateTime(?) AND Timestamp 1 { db.Where("ServiceName IN ?", req.ServiceName) } if req.TraceId != "" { db.Where("TraceId", req.TraceId) } if req.MaxDuration > 0 { db.Where("Duration 0 { db.Where("Duration>=?", req.MinDuration*1e6) } if req.SpanKind != "" { db.Where("SpanKind", req.SpanKind) } if req.SpanAttributeKey != "" { db.Where(fmt.Sprintf("SpanAttributes['%s']", req.SpanAttributeKey), req.SpanAttributeValue) } if req.OnlyDatabase { db.Where("SpanAttributes['db.system'] != ?", "") } if req.SpanName != "" { db.Where("SpanName", req.SpanName) } if req.OnlyException { // db.Where(`StatusCode=? OR // SpanAttributes['http.status_code']>=? OR SpanAttributes['http.response.status_code']>=? OR // SpanAttributes[concat('rpc.', SpanAttributes['rpc.system'], '.status_code')]>? OR // SpanAttributes[concat('rpc.', SpanAttributes['rpc.system'], '.error_code')]!=? // `, "STATUS_CODE_ERROR", "400", "400", "0", "") // 简化查询, 加速仅异常条件慢问题,这么写也能查到,只是查到的http类错误都为span kind client, 没关系,因为都要点进去看的 db.Where("StatusCode", "STATUS_CODE_ERROR") } if req.RequestMethod != "" { db.Where(`SpanAttributes['http.method'] = ? OR SpanAttributes['http.request.method'] = ? OR SpanAttributes['rpc.system'] = ? OR SpanAttributes['db.system'] = ? OR SpanAttributes['messaging.system'] = ?`, req.RequestMethod, req.RequestMethod, req.RequestMethod, req.RequestMethod, req.RequestMethod) } order := req.OrderBy([]string{"Duration", "Timestamp"}, "Duration", "DESC") if err := db.Order(order).Find(&list).Limit(-1).Offset(-1).Count(count).Error; err != nil { return errors.Wrap(err, "获取service相关span失败") } *resp = make([]dto.ServiceSpansResp, len(list)) for i, item := range list { // (*resp)[i].Duration /= 1e6 (*resp)[i].Datetime = item.Datetime.Local().Format(time.DateTime) (*resp)[i].Timestamp = item.Datetime.Unix() (*resp)[i].TraceId = item.TraceId (*resp)[i].SpanId = item.SpanId (*resp)[i].Method = item.Method (*resp)[i].Code = item.Code (*resp)[i].ServiceName = item.ServiceName (*resp)[i].Duration = item.Duration / 1e6 } return nil } func (s *Service) GenService(resp *dto.ServiceJobGenServiceResp) error { defer s.genSingleNodeService(resp) list := []struct { TargetService string SourceService string TargetAlias string SourceAlias string SourceIcon string TargetIcon string }{} now := time.Now() // start, end := now.Add(-time.Hour), now //数据量太大,调整为分钟级,跨度大于增加的数据量即可 start, end := now.Add(-5*time.Minute), now sub1 := s.ChOrm.Table(models.TableNameTrace). Select("SpanId, ParentSpanId, ServiceName, AppAlias, ResourceAttributes['telemetry.sdk.language'] SdkLang"). Where("Timestamp>=toDateTime(?) AND Timestamp=toDateTime(?) AND Timestamp=toDateTime(?) AND Timestamp=toDateTime(?) AND Timestamp=toDateTime(?)", start.Add(-10*time.Minute).Unix()).Where("TraceId IN ?", traceIds). Where("AppAlias", appAlias). Group("TraceId").Having("Total", 1) if err := s.ChOrm.Table("(?) as t", sub).Distinct("ServiceName, SpanKind, SdkLang").Find(&list).Error; err != nil { return errors.Wrap(err, "获取单服务结点信息失败") } nodes := map[string]models.ServiceNode{} edges := []models.ServiceEdge{} for _, item := range list { num := int64(0) s.Orm.Model(&models.ServiceNode{}).Where("app_alias=? and service_name=?", appAlias, item.ServiceName).Count(&num) if num > 0 { continue } // 如果 ot_service_node 表中不存在, 说明这个服务结点未插入过,且属于单服务结点 key := fmt.Sprintf("%s-%s", item.ServiceName, appAlias) appId, err := query.NewApp().Alias2ID(appAlias) if err != nil { continue } if item.SpanKind == "SPAN_KIND_SERVER" { nodes[key] = models.ServiceNode{ AppID: int32(appId), AppAlias: appAlias, Name: item.ServiceName, ServiceName: item.ServiceName, Kind: item.SdkLang, CreatedAt: now, UpdatedAt: now, } edges = append(edges, models.ServiceEdge{ AppAlias: appAlias, Source: "InCloud", Target: item.ServiceName, SourceIcon: "cloud", TargetIcon: item.SdkLang, }) } else if item.SpanKind == "SPAN_KIND_CLIENT" { nodes[key] = models.ServiceNode{ AppID: int32(appId), AppAlias: appAlias, Name: item.ServiceName, ServiceName: item.ServiceName, Kind: item.SdkLang, CreatedAt: now, UpdatedAt: now, } edges = append(edges, models.ServiceEdge{ AppAlias: appAlias, Source: item.ServiceName, Target: "OutCloud", SourceIcon: item.SdkLang, TargetIcon: "cloud", }) } } nodeList := make([]models.ServiceNode, 0, len(nodes)) for _, node := range nodes { nodeList = append(nodeList, node) } // fmt.Println("nodelist: ", nodeList) // s.genVirtualService(&nodeList, &edges) mu.Lock() newNodes := s.getNewNodes(nodeList) s.insertNewNodes(newNodes, resp) newEdges := s.getNewEdges(edges) s.insertNewEdges(newEdges, resp) mu.Unlock() return nil }(&wg, appAlias) } wg.Wait() return nil } // 插入结点数据 func (s *Service) insertNewNodes(newNodes []models.ServiceNode, resp *dto.ServiceJobGenServiceResp) error { if len(newNodes) > 0 { errs := "" result := s.Orm.Model(&models.ServiceNode{}).Clauses(clause.OnConflict{DoNothing: true}).Create(&newNodes) if result.Error != nil { errs = result.Error.Error() } resp.NodeResult = dto.CreateResult{ Error: errs, RowsAffected: result.RowsAffected + resp.NodeResult.RowsAffected, Total: len(newNodes) + resp.NodeResult.Total, } } return nil } // 插入边数据 func (s *Service) insertNewEdges(newEdges []models.ServiceEdge, resp *dto.ServiceJobGenServiceResp) error { if len(newEdges) > 0 { errs := "" result := s.Orm.Model(&models.ServiceEdge{}).Clauses(clause.OnConflict{DoNothing: true}).Create(&newEdges) if result.Error != nil { errs = result.Error.Error() } resp.EdgeResult = dto.CreateResult{ Error: errs, RowsAffected: result.RowsAffected + resp.EdgeResult.RowsAffected, Total: len(newEdges) + resp.EdgeResult.Total, } } return nil } // 获取服务结点数据 用于在边数据不存在的情况下 func (s *Service) getNewNodesWithoutEdges() ([]models.ServiceNode, error) { list := []struct { ServiceName string AppAlias string SdkLang string }{} if err := s.ChOrm.Table(models.TableNameTrace). Select("ServiceName, AppAlias, any(ResourceAttributes['telemetry.sdk.language']) as SdkLang"). Where("Timestamp>=now()-interval 5 minute"). Group("ServiceName, AppAlias").Find(&list).Error; err != nil { return []models.ServiceNode{}, errors.Wrap(err, "获取服务结点失败") } nodes := make([]models.ServiceNode, 0, len(list)) for _, item := range list { if appId, err := query.NewApp().Alias2ID(item.AppAlias); err == nil { nodes = append(nodes, models.ServiceNode{ AppID: int32(appId), AppAlias: item.AppAlias, Name: item.ServiceName, ServiceName: item.ServiceName, Type: 1, // 仅有一个结点,肯定是对内;此处有个问题,就是后期如果对外了, 没有地方可以更新 Kind: item.SdkLang, }) } } newNodes := s.getNewNodes(nodes) return newNodes, nil } // 过滤nodes中的结点, 只要数据库中不存在的node数据 func (s *Service) getNewNodes(nodes []models.ServiceNode) []models.ServiceNode { newNodes := make([]models.ServiceNode, 0) sq := query.NewService() for _, node := range nodes { has, _ := sq.HasNode(node.AppAlias, node.ServiceName) if !has { newNodes = append(newNodes, node) } } return newNodes } func (s *Service) getNewEdges(edges []models.ServiceEdge) []models.ServiceEdge { newEdges := make([]models.ServiceEdge, 0) sq := query.NewService() for _, edge := range edges { has, _ := sq.HasEdge(edge.AppAlias, edge.Source, edge.Target) if !has { newEdges = append(newEdges, edge) } } return newEdges } // 生成 virtual Service func (s *Service) genVirtualService(nodes *[]models.ServiceNode, edges *[]models.ServiceEdge) { uniq := func(a, b string) string { return fmt.Sprintf("%s-%s", a, b) } nodeMap := map[string]models.ServiceNode{} for _, node := range *nodes { nodeMap[uniq(node.AppAlias, node.ServiceName)] = node } sources, targets := map[string]struct{}{}, map[string]struct{}{} for _, edge := range *edges { sk, tk := uniq(edge.AppAlias, edge.Source), uniq(edge.AppAlias, edge.Target) if _, ok := nodeMap[sk]; ok { sources[sk] = struct{}{} } if _, ok := nodeMap[tk]; ok { targets[tk] = struct{}{} } } // 找到所有的root和leaf 结点 rootNodes, leafNodes := map[string]models.ServiceNode{}, map[string]models.ServiceNode{} for _, edge := range *edges { sk, tk := uniq(edge.AppAlias, edge.Source), uniq(edge.AppAlias, edge.Target) // 如果source在targets中不存在,说明source是root if _, ok := targets[sk]; !ok { rootNodes[sk] = nodeMap[sk] // rootNodes = append(rootNodes, nodeMap[sk]) } // 如果target在sources中不存在,说明target是leaf if _, ok := sources[tk]; !ok { leafNodes[tk] = nodeMap[tk] // leafNodes = append(leafNodes, nodeMap[tk]) } } // 对于每个root结点, 检查它对应的root span, 如果root span的span kind为server类型, 则需要在它的上层添加一个virtual service for _, node := range rootNodes { spans := []models.Trace{} if err := s.ChOrm.Table(models.TableNameTrace). Where("Timestamp>=NOW()-INTERVAL 1 HOUR"). Where("ParentSpanId=''"). Where("SpanKind='SPAN_KIND_SERVER'"). Where("ServiceName", node.ServiceName). Where("AppAlias", node.AppAlias).Limit(100).Find(&spans).Error; err != nil { continue } if len(spans) <= 0 { continue } vnode := models.ServiceNode{ AppID: node.AppID, AppAlias: node.AppAlias, ServiceName: fmt.Sprintf("%s:frontend", node.ServiceName), Type: 1, // 对内服务 } vnode.Name = vnode.ServiceName vedge := models.ServiceEdge{ AppAlias: node.AppAlias, Source: vnode.ServiceName, Target: node.ServiceName, SourceIcon: "", } vedge.SourceIcon = "cloud" vedge.Source = "InCloud" // 所有virtual source都写作InCloud for _, span := range spans { spanAttrs := opentelemetry.NewSpanAttributes(span.SpanAttribute) if spanAttrs.UserAgent() != "" && vnode.Name == "" { vnode.Name = "Browser" } if lang, ok := span.ResourceAttribute["telemetry.sdk.language"]; ok && vedge.TargetIcon == "" { vedge.TargetIcon = lang } if vnode.Name != "" && vedge.TargetIcon != "" { break } } // *nodes = append(*nodes, vnode) *edges = append(*edges, vedge) } // 对于每个leaf结点, 检查它对应的span中,是否有span kind为client且为db相关的 for _, node := range leafNodes { spans := []models.Trace{} if err := s.ChOrm.Table(models.TableNameTrace). Where("Timestamp>=NOW()-INTERVAL 1 HOUR"). Where("SpanKind='SPAN_KIND_CLIENT' AND (SpanAttributes['db.system'] != '' OR SpanAttributes['db.type'] != '')"). Where("ServiceName", node.ServiceName). Where("AppAlias", node.AppAlias).Limit(100).Find(&spans).Error; err != nil { continue } if len(spans) <= 0 { continue } vnode := models.ServiceNode{ AppID: node.AppID, AppAlias: node.AppAlias, ServiceName: "", Type: 1, // 对内服务 } vedge := models.ServiceEdge{ AppAlias: node.AppAlias, Source: node.ServiceName, } for _, span := range spans { spanAttrs := opentelemetry.NewSpanAttributes(span.SpanAttribute) if vnode.ServiceName == "" && spanAttrs.IsDB() { vnode.ServiceName = spanAttrs.DbSystem() } if lang, ok := span.ResourceAttribute["telemetry.sdk.language"]; ok { vedge.SourceIcon = lang } if vnode.ServiceName != "" && vedge.SourceIcon != "" { break } } // if vnode.ServiceName == "" { // vnode.ServiceName = "db:unknown" // vedge.TargetIcon = "database" // } else { // vedge.TargetIcon = vnode.ServiceName // } vedge.TargetIcon = "cloud" vnode.Name = vnode.ServiceName // vedge.Target = vnode.ServiceName vedge.Target = "OutCloud" // *nodes = append(*nodes, vnode) // virtual service node不加入node表 *edges = append(*edges, vedge) } } func (s *Service) List(req *dto.ServiceListReq, resp *[]dto.ServiceListResp, total *int64) error { appAlias := "" err := s.Orm.Model(&models.App{}).Where("id", req.AppId).Pluck("alias", &appAlias).Error if err != nil { return errors.Wrap(err, "未获取到app alias") } pageScopes := cDto.Paginate(req.GetPageSize(), req.GetPageIndex()) result := s.Orm.Model(&models.ServiceNode{}).Where("app_alias", appAlias).Scopes(pageScopes).Find(&resp) if result.Error != nil { return errors.Wrap(result.Error, "获取服务列表失败") } if err := result.Limit(-1).Offset(-1).Count(total).Error; err != nil { return errors.Wrap(err, "获取服务总数量失败") } return nil } func (s *Service) ListNoAppAlias(req *dto.ServiceListNoAppAliasReq, resp *[]dto.ServiceListNoAppAliasResp, total *int64) error { req.CheckFilling(time.Minute * 5) if err := s.ChOrm.Model(&models.Trace{}). Select("ServiceName, COUNT() as SpanTotal, COUNT(DISTINCT TraceId) as TraceTotal"). Where("Timestamp>=toDateTime(?) and Timestamp 0 { err = s.Orm.Find(&node, req.ID).Error } else { err = s.Orm.Model(&models.ServiceNode{}).Where("app_alias=? and service_name=?", req.AppAlias, req.ServiceName).Find(&node).Error } if err != nil { return errors.Wrap(err, "查询服务结点失败") } if node.AppAlias == "" || node.ServiceName == "" { return errors.New("缺少必要参数 app_alias, service_name") } req.CheckFilling(time.Minute * 5) if err := s.ChOrm.Model(&models.Trace{}). Select("ServiceName, count() Total, SUM(IF(StatusCode='STATUS_CODE_ERROR', 1, 0)) as ErrorNum, max(Duration)/1e6 as Max, avg(Duration)/1e6 as Avg"). Where("AppAlias", node.AppAlias). Where("ServiceName", node.ServiceName). Where("Timestamp>toDateTime(?) and Timestamp= 60*60 { timeField = "formatDateTime(toStartOfFifteenMinutes(Timestamp), '%F %H:%i', 'PRC') as StartTime" } fields := []string{ timeField, "quantile(0.5)(Duration)/1e6 as P50Duration", "quantile(0.90)(Duration)/1e6 as P90Duration", "quantile(0.99)(Duration)/1e6 as P99Duration", } quantiles := []struct { StartTime string P50Duration float64 P90Duration float64 P99Duration float64 }{} if err := s.ChOrm.Model(&models.Trace{}).Select(fields). Where("AppAlias", node.AppAlias). Where("ServiceName", node.ServiceName). Where("Timestamp>toDateTime(?) and Timestamp 0 { resp.ErrorRate = errorRate // 错误率的计算公式可能有问题 resp.ErrorNum = int64(errorRate * float64(resp.Total)) } resp.Quantiles.Time = times resp.Quantiles.P50 = p50s resp.Quantiles.P90 = p90s resp.Quantiles.P99 = p99s return nil } // 数字视图处理逻辑 func (s *Service) Digits(req *dto.ServiceDigitsReq, resp *dto.ServiceDigitsResp) error { key := fmt.Sprintf("observe__service_digits_%s_%s", req.AppAlias, req.ServiceName) if req.ServiceName == "" { key = fmt.Sprintf("observe__service_digits_%s", req.AppAlias) } rdb := config.GetRedisClient() res, err := rdb.HGetAll(key).Result() if err == nil { // 由于当数据不存在时,不会写缓存,所以这里数字视图的缓存数据有可能不存在,属于正常情况 // if len(res) == 0 { // return errors.New("数字视图缓存数据不存在,请稍后再试") // } resp.Span, _ = strconv.ParseFloat(res["span"], 64) resp.Span = math.Round(resp.Span*100) / 100 resp.Trace, _ = strconv.ParseFloat(res["trace"], 64) resp.Trace = math.Round(resp.Trace*100) / 100 resp.Http, _ = strconv.ParseFloat(res["http"], 64) resp.Http = math.Round(resp.Http*100) / 100 resp.Rpc, _ = strconv.ParseFloat(res["rpc"], 64) resp.Rpc = math.Round(resp.Rpc*100) / 100 resp.DB, _ = strconv.ParseFloat(res["db"], 64) resp.DB = math.Round(resp.DB*100) / 100 resp.Error, _ = strconv.ParseFloat(res["error"], 64) resp.Error = math.Round(resp.Error*100) / 100 return nil } return errors.Wrap(err, "数字视图缓存获取失败") } // 服务数字视图生成, 生成最近1小时内,每分钟的数据 func (s *Service) DigitsGen() error { digits := []struct { AppAlias string ServiceName string Span float64 Trace float64 Http float64 Rpc float64 Error float64 }{} result := s.ChOrm.Table(models.TableNameTrace).Select([]string{ "AppAlias AS AppAlias", "ServiceName AS ServiceName", "COUNT()/60 AS Span", "COUNT(DISTINCT TraceId)/60 AS Trace", "SUM(if(SpanKind='SPAN_KIND_SERVER' AND (SpanAttributes['http.status_code']>='200' OR SpanAttributes['http.response.status_code']>='200'), 1, 0))/60 AS Http", "SUM(if(SpanKind='SPAN_KIND_SERVER' AND SpanAttributes['rpc.system']!='', 1, 0))/60 AS Rpc", "SUM(if(SpanAttributes['db.type']='' OR SpanAttributes['db.system']!='', 1, 0))/60 AS Db", "SUM(if(StatusCode='STATUS_CODE_ERROR' OR SpanAttributes['http.status_code']>='400' OR SpanAttributes['http.response.status_code']>='400', 1, 0))/60 AS Error", }).Where("Timestamp>=NOW()-INTERVAL 1 HOUR").Group("AppAlias, ServiceName").Find(&digits) if result.Error != nil { return errors.Wrap(result.Error, "获取数字视图数据失败") } rdb := config.GetRedisClient() pipe := rdb.Pipeline() aliasDigits := map[string]map[string]int{} for _, digit := range digits { key := fmt.Sprintf("observe__service_digits_%s_%s", digit.AppAlias, digit.ServiceName) pipe.HSet(key, map[string]interface{}{ "span": digit.Span, "trace": digit.Trace, "http": digit.Http, "rpc": digit.Rpc, "error": digit.Error, }) pipe.Expire(key, time.Hour) // 缓存1小时 // 计算 每个应用下 所有service的数字视图数据 key = fmt.Sprintf("observe__service_digits_%s", digit.AppAlias) if _, ok := aliasDigits[key]; !ok { aliasDigits[key] = map[string]int{} } aliasDigits[key]["span"] += int(digit.Span) aliasDigits[key]["trace"] += int(digit.Trace) aliasDigits[key]["http"] += int(digit.Http) aliasDigits[key]["rpc"] += int(digit.Rpc) aliasDigits[key]["error"] += int(digit.Error) } for key, digit := range aliasDigits { pipe.HSet(key, map[string]interface{}{ "span": digit["span"], "trace": digit["trace"], "http": digit["http"], "rpc": digit["rpc"], "error": digit["error"], }) } pipe.Exec() return nil }