123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336 |
- package apis
- import (
- "context"
- "fmt"
- "go-admin/app/observe/models"
- "go-admin/app/observe/service"
- "go-admin/app/observe/service/dto"
- "go-admin/common/websocket"
- "go-admin/config"
- "go-admin/utils"
- "net/http"
- "time"
- "github.com/go-admin-team/go-admin-core/logger"
- ws "github.com/gorilla/websocket"
- "github.com/gin-gonic/gin"
- )
- type Service struct {
- utils.OtApi
- }
- func (e Service) List(c *gin.Context) {
- req := new(dto.ServiceListReq)
- svc := new(service.Service)
- if err := e.Init(c, req, &svc.OtService); err != nil {
- return
- }
- resp := make([]dto.ServiceListResp, 0)
- total := int64(0)
- if err := svc.List(req, &resp, &total); err != nil {
- e.Error(http.StatusInternalServerError, err, err.Error())
- return
- }
- e.PageOK(resp, int(total), req.PageIndex, req.PageSize, "success")
- }
- func (e Service) ListNoAppAlias(c *gin.Context) {
- req := new(dto.ServiceListNoAppAliasReq)
- svc := new(service.Service)
- if err := e.Init(c, req, &svc.OtService); err != nil {
- return
- }
- resp := make([]dto.ServiceListNoAppAliasResp, 0)
- total := int64(0)
- if err := svc.ListNoAppAlias(req, &resp, &total); err != nil {
- e.Error(http.StatusInternalServerError, err, err.Error())
- return
- }
- e.PageOK(resp, int(total), req.GetPageIndex(), req.GetPageSize(), "success")
- }
- func (e Service) Update(c *gin.Context) {
- req := new(dto.ServiceUpdateReq)
- svc := new(service.Service)
- if err := e.Init(c, req, &svc.OtService); err != nil {
- return
- }
- if err := svc.Update(req); err != nil {
- e.Error(500, err, fmt.Sprintf("修改Service失败,\r\n失败信息 %s", err.Error()))
- return
- }
- e.OK(req.GetId(), "修改成功")
- }
- func (e Service) Stats(c *gin.Context) {
- req := new(dto.ServiceStatsReq)
- svc := new(service.Service)
- if err := e.Init(c, req, &svc.OtService); err != nil {
- return
- }
- resp := new(dto.ServiceStatsResp)
- if config.ExtConfig.ClickhouseMetrics {
- if err := svc.StatsFromClickhouse(req, resp); err != nil {
- e.Error(http.StatusInternalServerError, err, err.Error())
- return
- }
- } else {
- if err := svc.Stats(req, resp); err != nil {
- e.Error(http.StatusInternalServerError, err, err.Error())
- return
- }
- }
- e.OK(resp, "success")
- }
- // Show 服务拓扑
- func (e Service) Show(c *gin.Context) {
- req := new(dto.ServiceGetEdgesReq)
- svc := new(service.Service)
- if err := e.Init(c, req, &svc.OtService); err != nil {
- return
- }
- // err := e.MakeContext(c).MakeDB().Bind(req).MakeService(&svc.OtService).Errors
- // if err != nil {
- // e.Error(http.StatusInternalServerError, err, err.Error())
- // return
- // }
- if req.StartTime == 0 && req.EndTime == 0 { // 兼容前端
- req.EndTime = time.Now().Unix()
- req.StartTime = req.EndTime - 600 // 10分钟
- }
- resp := new(models.Graph)
- if err := svc.GetGraph(c, req, resp); err != nil {
- e.Error(http.StatusInternalServerError, err, err.Error())
- return
- }
- e.OK(resp, "success")
- }
- // SpanScatterChart 点击服务拓扑结点时,左侧展示的散点图
- func (e Service) SpanScatterChart(c *gin.Context) {
- req := new(dto.SpanScatterChartReq)
- svc := new(service.Service)
- if err := e.MakeContext(c).MakeDB().Bind(req).MakeService(&svc.OtService).Errors; err != nil {
- e.Error(http.StatusInternalServerError, err, err.Error())
- return
- }
- result := make(models.ScatterChart)
- if err := svc.GetSpanScatterChart(c, req, &result); err != nil {
- e.Error(http.StatusInternalServerError, err, err.Error())
- return
- }
- e.OK(result, "success")
- }
- // ServiceLiveness 服务活跃度
- func (e Service) ServiceLiveness(c *gin.Context) {
- req := new(dto.SpanScatterChartReq)
- svc := new(service.Service)
- if err := e.MakeContext(c).MakeDB().Bind(req).MakeService(&svc.OtService).Errors; err != nil {
- e.Error(http.StatusInternalServerError, err, err.Error())
- return
- }
- result := make([]models.CoordinatePoint, 0)
- if err := svc.GetServiceLiveness(c, req, &result); err != nil {
- e.Error(http.StatusInternalServerError, err, err.Error())
- return
- }
- e.OK(result, "success")
- }
- func (e Service) CompareServiceLiveness(c *gin.Context) {
- req := new(dto.SpanScatterChartReq)
- svc := new(service.Service)
- if err := e.MakeContext(c).MakeDB().Bind(req).MakeService(&svc.OtService).Errors; err != nil {
- e.Error(http.StatusInternalServerError, err, err.Error())
- return
- }
- tmpResult := make([]models.CoordinatePoint, 0)
- if err := svc.CompareServiceLiveness(c, req, &tmpResult); err != nil {
- e.Error(http.StatusInternalServerError, err, err.Error())
- return
- }
- length := len(tmpResult) / 2 * 2
- halfInedx := len(tmpResult) / 2
- tmpResult = tmpResult[:length]
- timeList := make([]any, halfInedx)
- ct := make([]any, halfInedx)
- fd := make([]any, halfInedx)
- for i := 0; i < halfInedx; i++ {
- fd[i] = tmpResult[i][1]
- ct[i] = tmpResult[i+halfInedx][1]
- timeList[i] = tmpResult[i+halfInedx][0]
- }
- result := make(map[string][]any)
- result["time"] = timeList
- result["current"] = ct
- result["forward"] = fd
- e.OK(result, "success")
- }
- func (e Service) CompareServiceErrors(c *gin.Context) {
- req := new(dto.SpanScatterChartReq)
- svc := new(service.Service)
- if err := e.MakeContext(c).MakeDB().Bind(req).MakeService(&svc.OtService).Errors; err != nil {
- e.Error(http.StatusInternalServerError, err, err.Error())
- return
- }
- tmpResult := make([]models.CoordinatePoint, 0)
- if err := svc.CompareServiceErrors(c, req, &tmpResult); err != nil {
- e.Error(http.StatusInternalServerError, err, err.Error())
- return
- }
- length := len(tmpResult) / 2 * 2
- halfInedx := len(tmpResult) / 2
- tmpResult = tmpResult[:length]
- timeList := make([]any, halfInedx)
- ct := make([]any, halfInedx)
- fd := make([]any, halfInedx)
- for i := 0; i < halfInedx; i++ {
- fd[i] = tmpResult[i][1]
- ct[i] = tmpResult[i+halfInedx][1]
- timeList[i] = tmpResult[i+halfInedx][0]
- }
- result := make(map[string][]any)
- result["time"] = timeList
- result["current"] = ct
- result["forward"] = fd
- e.OK(result, "success")
- }
- // Spans span列表
- func (e Service) Spans(c *gin.Context) {
- req := new(dto.ServiceSpansReq)
- svc := new(service.Service)
- if err := e.Init(c, req, &svc.OtService); err != nil {
- return
- }
- resp := make([]dto.ServiceSpansResp, 0)
- count := int64(0)
- if err := svc.Spans(req, &resp, &count); err != nil {
- e.Error(http.StatusInternalServerError, err, err.Error())
- return
- }
- e.PageOK(resp, int(count), req.GetPageIndex(), req.GetPageSize(), "success")
- }
- // 生成service graph点边数据
- func (sj Service) GenGraph(c *gin.Context) {
- svc := service.Service{}
- sj.MakeContext(c).MakeDB().MakeService(&svc.OtService)
- resp := new(dto.ServiceJobGenServiceResp)
- if err := svc.GenService(resp); err != nil {
- sj.Error(http.StatusInternalServerError, err, err.Error())
- return
- }
- sj.OK(resp, "success")
- }
- // 数字视图
- func (e Service) Digits(c *gin.Context) {
- req := new(dto.ServiceDigitsReq)
- svc := new(service.Service)
- if err := e.Init(c, req, &svc.OtService); err != nil {
- return
- }
- resp := new(dto.ServiceDigitsResp)
- if err := svc.Digits(req, resp); err != nil {
- e.Error(http.StatusInternalServerError, err, err.Error())
- return
- }
- e.OK(resp, "success")
- }
- // 数字视图生成
- func (e Service) DigitsGen(c *gin.Context) {
- svc := service.Service{}
- e.MakeContext(c).MakeDB().MakeService(&svc.OtService)
- if err := svc.DigitsGen(); err != nil {
- e.Error(http.StatusInternalServerError, err, err.Error())
- return
- }
- e.OK("", "success")
- }
- // 拓扑图
- // 待优化的点:
- //
- // 由于多个请求会产生多个连接,一个连接每次会产生多次数据库查询,连接越多数据库查询越多
- // 改成 一个业务对应一组数据库查询,多个连接过来时,如果是同一个业务,使用同一个业务的查询结果
- // 即使连接很多,也不会对数据库造成压力,数据库的查询只与业务数量相关
- func (e Service) Topology(c *gin.Context) {
- req := new(dto.ServiceGetEdgesReq)
- svc := new(service.Service)
- if err := e.Init(c, req, &svc.OtService); err != nil {
- return
- }
- timeout := 5 * time.Minute
- // timeout = 30 * time.Second
- ctx, cancel := context.WithTimeout(c.Request.Context(), timeout)
- defer cancel()
- conn, _ := websocket.Conn(c)
- go func() {
- for {
- // 从客户端读取数据
- msgType, msg, err := conn.ReadMessage()
- if err != nil {
- logger.Errorf("读取客户端消息失败: %s", err)
- cancel()
- return
- }
- if msgType == ws.CloseMessage {
- logger.Infof("收到客户端发送的断开连接消息: %d %s", msgType, msg)
- cancel()
- return
- }
- // 如果是其它类型的消息,暂时不处理,未与前端约定
- logger.Info("客户端正常消息:%s", msg)
- }
- }()
- ticker := time.NewTicker(5 * time.Second)
- defer ticker.Stop()
- first := true
- for {
- sgb := new(service.ScopeGraphBasic)
- resp := new(models.Graph)
- select {
- case <-ticker.C:
- // 返回正常从库中读取的数据
- sgb.UpdateGraph(svc, c, req, resp)
- // conn.WriteMessage(ws.TextMessage, []byte("normal message..."))
- conn.WriteJSON(*sgb)
- logger.Infof("ticker 5s...")
- case <-ctx.Done():
- logger.Warnf("客户端已关闭,或超过最大连接时间(%.2fs)", timeout.Seconds())
- return
- default:
- // 返回初使值
- if first {
- sgb.AddNoSoulGraph(svc, c, req, resp)
- conn.WriteJSON(*sgb)
- if req.Live {
- req.EndTime = 0
- req.StartTime = 0
- req.CheckFilling(3 * time.Minute)
- }
- resp := new(models.Graph)
- sgb = new(service.ScopeGraphBasic)
- sgb.UpdateGraph(svc, c, req, resp)
- conn.WriteJSON(*sgb)
- first = false
- }
- time.Sleep(100 * time.Microsecond)
- }
- }
- }
|