// Package apis contains the gin handlers that expose the observe service
// endpoints (service lists, statistics, topology graphs and span charts).
package apis

import (
	"context"
	"fmt"
	"net/http"
	"time"

	"go-admin/app/observe/models"
	"go-admin/app/observe/service"
	"go-admin/app/observe/service/dto"
	"go-admin/common/websocket"
	"go-admin/config"
	"go-admin/utils"

	"github.com/gin-gonic/gin"
	"github.com/go-admin-team/go-admin-core/logger"
	ws "github.com/gorilla/websocket"
)

// Service groups the HTTP and WebSocket handlers for observed services.
// It embeds utils.OtApi, which supplies the request-binding and response
// helpers (Init, MakeContext, OK, PageOK, Error) used by every handler.
type Service struct {
	utils.OtApi
}

// List responds with a page of services matching dto.ServiceListReq.
func (e Service) List(c *gin.Context) {
	req := new(dto.ServiceListReq)
	svc := new(service.Service)
	if err := e.Init(c, req, &svc.OtService); err != nil {
		// NOTE(review): Init is presumed to have written the error response — confirm.
		return
	}

	resp := make([]dto.ServiceListResp, 0)
	total := int64(0)
	if err := svc.List(req, &resp, &total); err != nil {
		e.Error(http.StatusInternalServerError, err, err.Error())
		return
	}
	e.PageOK(resp, int(total), req.PageIndex, req.PageSize, "success")
}

// ListNoAppAlias responds with a page of results produced by
// service.Service.ListNoAppAlias for dto.ServiceListNoAppAliasReq.
func (e Service) ListNoAppAlias(c *gin.Context) {
	req := new(dto.ServiceListNoAppAliasReq)
	svc := new(service.Service)
	if err := e.Init(c, req, &svc.OtService); err != nil {
		return
	}

	resp := make([]dto.ServiceListNoAppAliasResp, 0)
	total := int64(0)
	if err := svc.ListNoAppAlias(req, &resp, &total); err != nil {
		e.Error(http.StatusInternalServerError, err, err.Error())
		return
	}
	e.PageOK(resp, int(total), req.GetPageIndex(), req.GetPageSize(), "success")
}

// Update applies dto.ServiceUpdateReq and responds with the updated id.
func (e Service) Update(c *gin.Context) {
	req := new(dto.ServiceUpdateReq)
	svc := new(service.Service)
	if err := e.Init(c, req, &svc.OtService); err != nil {
		return
	}

	if err := svc.Update(req); err != nil {
		e.Error(http.StatusInternalServerError, err, fmt.Sprintf("修改Service失败,\r\n失败信息 %s", err.Error()))
		return
	}
	e.OK(req.GetId(), "修改成功")
}

// Stats responds with service statistics. The data is read through
// StatsFromClickhouse when config.ExtConfig.ClickhouseMetrics is set, and
// through Stats otherwise.
func (e Service) Stats(c *gin.Context) {
	req := new(dto.ServiceStatsReq)
	svc := new(service.Service)
	if err := e.Init(c, req, &svc.OtService); err != nil {
		return
	}

	resp := new(dto.ServiceStatsResp)
	var err error
	if config.ExtConfig.ClickhouseMetrics {
		err = svc.StatsFromClickhouse(req, resp)
	} else {
		err = svc.Stats(req, resp)
	}
	if err != nil {
		e.Error(http.StatusInternalServerError, err, err.Error())
		return
	}
	e.OK(resp, "success")
}

// Show responds with the service topology graph.
func (e Service) Show(c *gin.Context) {
	req := new(dto.ServiceGetEdgesReq)
	svc := new(service.Service)
	if err := e.Init(c, req, &svc.OtService); err != nil {
		return
	}

	// Front-end compatibility: when no time range is supplied, default to
	// the last 10 minutes.
	if req.StartTime == 0 && req.EndTime == 0 {
		req.EndTime = time.Now().Unix()
		req.StartTime = req.EndTime - 600
	}

	resp := new(models.Graph)
	if err := svc.GetGraph(c, req, resp); err != nil {
		e.Error(http.StatusInternalServerError, err, err.Error())
		return
	}
	e.OK(resp, "success")
}

// SpanScatterChart responds with the scatter chart shown on the left side
// when a node of the service topology is clicked.
func (e Service) SpanScatterChart(c *gin.Context) {
	req := new(dto.SpanScatterChartReq)
	svc := new(service.Service)
	if err := e.MakeContext(c).MakeDB().Bind(req).MakeService(&svc.OtService).Errors; err != nil {
		e.Error(http.StatusInternalServerError, err, err.Error())
		return
	}

	result := make(models.ScatterChart)
	if err := svc.GetSpanScatterChart(c, req, &result); err != nil {
		e.Error(http.StatusInternalServerError, err, err.Error())
		return
	}
	e.OK(result, "success")
}

// ServiceLiveness responds with the service liveness (activity) points.
func (e Service) ServiceLiveness(c *gin.Context) {
	req := new(dto.SpanScatterChartReq)
	svc := new(service.Service)
	if err := e.MakeContext(c).MakeDB().Bind(req).MakeService(&svc.OtService).Errors; err != nil {
		e.Error(http.StatusInternalServerError, err, err.Error())
		return
	}

	result := make([]models.CoordinatePoint, 0)
	if err := svc.GetServiceLiveness(c, req, &result); err != nil {
		e.Error(http.StatusInternalServerError, err, err.Error())
		return
	}
	e.OK(result, "success")
}

// splitCompareSeries reshapes a point list whose first half and second half
// are two series of equal length into the map returned by the compare
// handlers:
//
//	"forward": element [1] of each point in the first half
//	"current": element [1] of each point in the second half
//	"time":    element [0] of each point in the second half
//
// When len(points) is odd the trailing point is ignored.
func splitCompareSeries(points []models.CoordinatePoint) map[string][]any {
	half := len(points) / 2
	times := make([]any, half)
	current := make([]any, half)
	forward := make([]any, half)
	for i := 0; i < half; i++ {
		forward[i] = points[i][1]
		current[i] = points[i+half][1]
		times[i] = points[i+half][0]
	}
	return map[string][]any{
		"time":    times,
		"current": current,
		"forward": forward,
	}
}

// CompareServiceLiveness responds with the liveness comparison produced by
// service.Service.CompareServiceLiveness, reshaped into "time", "current"
// and "forward" series.
func (e Service) CompareServiceLiveness(c *gin.Context) {
	req := new(dto.SpanScatterChartReq)
	svc := new(service.Service)
	if err := e.MakeContext(c).MakeDB().Bind(req).MakeService(&svc.OtService).Errors; err != nil {
		e.Error(http.StatusInternalServerError, err, err.Error())
		return
	}

	points := make([]models.CoordinatePoint, 0)
	if err := svc.CompareServiceLiveness(c, req, &points); err != nil {
		e.Error(http.StatusInternalServerError, err, err.Error())
		return
	}
	e.OK(splitCompareSeries(points), "success")
}

// CompareServiceErrors responds with the error comparison produced by
// service.Service.CompareServiceErrors, reshaped into "time", "current"
// and "forward" series.
func (e Service) CompareServiceErrors(c *gin.Context) {
	req := new(dto.SpanScatterChartReq)
	svc := new(service.Service)
	if err := e.MakeContext(c).MakeDB().Bind(req).MakeService(&svc.OtService).Errors; err != nil {
		e.Error(http.StatusInternalServerError, err, err.Error())
		return
	}

	points := make([]models.CoordinatePoint, 0)
	if err := svc.CompareServiceErrors(c, req, &points); err != nil {
		e.Error(http.StatusInternalServerError, err, err.Error())
		return
	}
	e.OK(splitCompareSeries(points), "success")
}

// Spans responds with a page of spans for dto.ServiceSpansReq.
func (e Service) Spans(c *gin.Context) {
	req := new(dto.ServiceSpansReq)
	svc := new(service.Service)
	if err := e.Init(c, req, &svc.OtService); err != nil {
		return
	}

	resp := make([]dto.ServiceSpansResp, 0)
	count := int64(0)
	if err := svc.Spans(req, &resp, &count); err != nil {
		e.Error(http.StatusInternalServerError, err, err.Error())
		return
	}
	e.PageOK(resp, int(count), req.GetPageIndex(), req.GetPageSize(), "success")
}

// GenGraph generates the node and edge data of the service graph.
func (e Service) GenGraph(c *gin.Context) {
	svc := service.Service{}
	// Surface setup failures instead of running the job on a half-initialised
	// service, matching the other handlers in this file.
	if err := e.MakeContext(c).MakeDB().MakeService(&svc.OtService).Errors; err != nil {
		e.Error(http.StatusInternalServerError, err, err.Error())
		return
	}

	resp := new(dto.ServiceJobGenServiceResp)
	if err := svc.GenService(resp); err != nil {
		e.Error(http.StatusInternalServerError, err, err.Error())
		return
	}
	e.OK(resp, "success")
}

// Digits responds with the digits (numeric) view.
func (e Service) Digits(c *gin.Context) {
	req := new(dto.ServiceDigitsReq)
	svc := new(service.Service)
	if err := e.Init(c, req, &svc.OtService); err != nil {
		return
	}

	resp := new(dto.ServiceDigitsResp)
	if err := svc.Digits(req, resp); err != nil {
		e.Error(http.StatusInternalServerError, err, err.Error())
		return
	}
	e.OK(resp, "success")
}

// DigitsGen generates the data of the digits (numeric) view.
func (e Service) DigitsGen(c *gin.Context) {
	svc := service.Service{}
	// Surface setup failures instead of running the job on a half-initialised
	// service, matching the other handlers in this file.
	if err := e.MakeContext(c).MakeDB().MakeService(&svc.OtService).Errors; err != nil {
		e.Error(http.StatusInternalServerError, err, err.Error())
		return
	}

	if err := svc.DigitsGen(); err != nil {
		e.Error(http.StatusInternalServerError, err, err.Error())
		return
	}
	e.OK("", "success")
}

// Topology streams the topology graph over a WebSocket connection.
//
// After the upgrade it pushes an initial graph (AddNoSoulGraph) followed by
// one UpdateGraph result, then pushes a fresh UpdateGraph result every
// 5 seconds. The stream ends when the client disconnects, a write fails, or
// the 5 minute maximum connection time elapses.
//
// Known point to optimise: every request opens its own connection and every
// connection issues its own database queries on each tick, so the query load
// grows with the number of connections. The intent is to run one set of
// queries per business and share the result between all connections of that
// business, so that the load depends only on the number of businesses.
func (e Service) Topology(c *gin.Context) {
	req := new(dto.ServiceGetEdgesReq)
	svc := new(service.Service)
	if err := e.Init(c, req, &svc.OtService); err != nil {
		return
	}

	timeout := 5 * time.Minute
	ctx, cancel := context.WithTimeout(c.Request.Context(), timeout)
	defer cancel()

	conn, err := websocket.Conn(c)
	if err != nil {
		logger.Errorf("websocket upgrade failed: %s", err)
		return
	}
	// Closing the connection on exit also unblocks the reader goroutine
	// below, which would otherwise stay parked in ReadMessage after a timeout.
	defer conn.Close()

	// Reader goroutine: its job is to notice that the client went away and
	// cancel ctx. It exits on the first read error.
	go func() {
		for {
			msgType, msg, err := conn.ReadMessage()
			if err != nil {
				logger.Errorf("读取客户端消息失败: %s", err)
				cancel()
				return
			}
			// NOTE(review): gorilla/websocket normally reports a peer close
			// frame as a read error, so this branch may be unreachable —
			// confirm before removing.
			if msgType == ws.CloseMessage {
				logger.Infof("收到客户端发送的断开连接消息: %d %s", msgType, msg)
				cancel()
				return
			}
			// Other message types are not handled yet; no protocol has been
			// agreed with the front end.
			logger.Infof("客户端正常消息:%s", msg)
		}
	}()

	// pushUpdate builds a fresh graph from the current req and sends it.
	pushUpdate := func() error {
		sgb := new(service.ScopeGraphBasic)
		sgb.UpdateGraph(svc, c, req, new(models.Graph))
		return conn.WriteJSON(*sgb)
	}

	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()

	// Initial push: first the placeholder graph, then the first real update.
	initial := new(service.ScopeGraphBasic)
	initial.AddNoSoulGraph(svc, c, req, new(models.Graph))
	if err := conn.WriteJSON(*initial); err != nil {
		logger.Errorf("failed to push initial topology: %s", err)
		return
	}
	if req.Live {
		// Live mode: clear the requested range and refill it with a
		// 3 minute window before the periodic updates start.
		req.EndTime = 0
		req.StartTime = 0
		req.CheckFilling(3 * time.Minute)
	}
	if err := pushUpdate(); err != nil {
		logger.Errorf("failed to push topology update: %s", err)
		return
	}

	// Block until the next tick or cancellation; no polling.
	for {
		select {
		case <-ticker.C:
			if err := pushUpdate(); err != nil {
				logger.Errorf("failed to push topology update: %s", err)
				return
			}
			logger.Infof("ticker 5s...")
		case <-ctx.Done():
			logger.Warnf("客户端已关闭,或超过最大连接时间(%.2fs)", timeout.Seconds())
			return
		}
	}
}