service.go 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336
  1. package apis
  2. import (
  3. "context"
  4. "fmt"
  5. "go-admin/app/observe/models"
  6. "go-admin/app/observe/service"
  7. "go-admin/app/observe/service/dto"
  8. "go-admin/common/websocket"
  9. "go-admin/config"
  10. "go-admin/utils"
  11. "net/http"
  12. "time"
  13. "github.com/go-admin-team/go-admin-core/logger"
  14. ws "github.com/gorilla/websocket"
  15. "github.com/gin-gonic/gin"
  16. )
  17. type Service struct {
  18. utils.OtApi
  19. }
  20. func (e Service) List(c *gin.Context) {
  21. req := new(dto.ServiceListReq)
  22. svc := new(service.Service)
  23. if err := e.Init(c, req, &svc.OtService); err != nil {
  24. return
  25. }
  26. resp := make([]dto.ServiceListResp, 0)
  27. total := int64(0)
  28. if err := svc.List(req, &resp, &total); err != nil {
  29. e.Error(http.StatusInternalServerError, err, err.Error())
  30. return
  31. }
  32. e.PageOK(resp, int(total), req.PageIndex, req.PageSize, "success")
  33. }
  34. func (e Service) ListNoAppAlias(c *gin.Context) {
  35. req := new(dto.ServiceListNoAppAliasReq)
  36. svc := new(service.Service)
  37. if err := e.Init(c, req, &svc.OtService); err != nil {
  38. return
  39. }
  40. resp := make([]dto.ServiceListNoAppAliasResp, 0)
  41. total := int64(0)
  42. if err := svc.ListNoAppAlias(req, &resp, &total); err != nil {
  43. e.Error(http.StatusInternalServerError, err, err.Error())
  44. return
  45. }
  46. e.PageOK(resp, int(total), req.GetPageIndex(), req.GetPageSize(), "success")
  47. }
  48. func (e Service) Update(c *gin.Context) {
  49. req := new(dto.ServiceUpdateReq)
  50. svc := new(service.Service)
  51. if err := e.Init(c, req, &svc.OtService); err != nil {
  52. return
  53. }
  54. if err := svc.Update(req); err != nil {
  55. e.Error(500, err, fmt.Sprintf("修改Service失败,\r\n失败信息 %s", err.Error()))
  56. return
  57. }
  58. e.OK(req.GetId(), "修改成功")
  59. }
  60. func (e Service) Stats(c *gin.Context) {
  61. req := new(dto.ServiceStatsReq)
  62. svc := new(service.Service)
  63. if err := e.Init(c, req, &svc.OtService); err != nil {
  64. return
  65. }
  66. resp := new(dto.ServiceStatsResp)
  67. if config.ExtConfig.ClickhouseMetrics {
  68. if err := svc.StatsFromClickhouse(req, resp); err != nil {
  69. e.Error(http.StatusInternalServerError, err, err.Error())
  70. return
  71. }
  72. } else {
  73. if err := svc.Stats(req, resp); err != nil {
  74. e.Error(http.StatusInternalServerError, err, err.Error())
  75. return
  76. }
  77. }
  78. e.OK(resp, "success")
  79. }
  80. // Show 服务拓扑
  81. func (e Service) Show(c *gin.Context) {
  82. req := new(dto.ServiceGetEdgesReq)
  83. svc := new(service.Service)
  84. if err := e.Init(c, req, &svc.OtService); err != nil {
  85. return
  86. }
  87. // err := e.MakeContext(c).MakeDB().Bind(req).MakeService(&svc.OtService).Errors
  88. // if err != nil {
  89. // e.Error(http.StatusInternalServerError, err, err.Error())
  90. // return
  91. // }
  92. if req.StartTime == 0 && req.EndTime == 0 { // 兼容前端
  93. req.EndTime = time.Now().Unix()
  94. req.StartTime = req.EndTime - 600 // 10分钟
  95. }
  96. resp := new(models.Graph)
  97. if err := svc.GetGraph(c, req, resp); err != nil {
  98. e.Error(http.StatusInternalServerError, err, err.Error())
  99. return
  100. }
  101. e.OK(resp, "success")
  102. }
  103. // SpanScatterChart 点击服务拓扑结点时,左侧展示的散点图
  104. func (e Service) SpanScatterChart(c *gin.Context) {
  105. req := new(dto.SpanScatterChartReq)
  106. svc := new(service.Service)
  107. if err := e.MakeContext(c).MakeDB().Bind(req).MakeService(&svc.OtService).Errors; err != nil {
  108. e.Error(http.StatusInternalServerError, err, err.Error())
  109. return
  110. }
  111. result := make(models.ScatterChart)
  112. if err := svc.GetSpanScatterChart(c, req, &result); err != nil {
  113. e.Error(http.StatusInternalServerError, err, err.Error())
  114. return
  115. }
  116. e.OK(result, "success")
  117. }
  118. // ServiceLiveness 服务活跃度
  119. func (e Service) ServiceLiveness(c *gin.Context) {
  120. req := new(dto.SpanScatterChartReq)
  121. svc := new(service.Service)
  122. if err := e.MakeContext(c).MakeDB().Bind(req).MakeService(&svc.OtService).Errors; err != nil {
  123. e.Error(http.StatusInternalServerError, err, err.Error())
  124. return
  125. }
  126. result := make([]models.CoordinatePoint, 0)
  127. if err := svc.GetServiceLiveness(c, req, &result); err != nil {
  128. e.Error(http.StatusInternalServerError, err, err.Error())
  129. return
  130. }
  131. e.OK(result, "success")
  132. }
  133. func (e Service) CompareServiceLiveness(c *gin.Context) {
  134. req := new(dto.SpanScatterChartReq)
  135. svc := new(service.Service)
  136. if err := e.MakeContext(c).MakeDB().Bind(req).MakeService(&svc.OtService).Errors; err != nil {
  137. e.Error(http.StatusInternalServerError, err, err.Error())
  138. return
  139. }
  140. tmpResult := make([]models.CoordinatePoint, 0)
  141. if err := svc.CompareServiceLiveness(c, req, &tmpResult); err != nil {
  142. e.Error(http.StatusInternalServerError, err, err.Error())
  143. return
  144. }
  145. length := len(tmpResult) / 2 * 2
  146. halfInedx := len(tmpResult) / 2
  147. tmpResult = tmpResult[:length]
  148. timeList := make([]any, halfInedx)
  149. ct := make([]any, halfInedx)
  150. fd := make([]any, halfInedx)
  151. for i := 0; i < halfInedx; i++ {
  152. fd[i] = tmpResult[i][1]
  153. ct[i] = tmpResult[i+halfInedx][1]
  154. timeList[i] = tmpResult[i+halfInedx][0]
  155. }
  156. result := make(map[string][]any)
  157. result["time"] = timeList
  158. result["current"] = ct
  159. result["forward"] = fd
  160. e.OK(result, "success")
  161. }
  162. func (e Service) CompareServiceErrors(c *gin.Context) {
  163. req := new(dto.SpanScatterChartReq)
  164. svc := new(service.Service)
  165. if err := e.MakeContext(c).MakeDB().Bind(req).MakeService(&svc.OtService).Errors; err != nil {
  166. e.Error(http.StatusInternalServerError, err, err.Error())
  167. return
  168. }
  169. tmpResult := make([]models.CoordinatePoint, 0)
  170. if err := svc.CompareServiceErrors(c, req, &tmpResult); err != nil {
  171. e.Error(http.StatusInternalServerError, err, err.Error())
  172. return
  173. }
  174. length := len(tmpResult) / 2 * 2
  175. halfInedx := len(tmpResult) / 2
  176. tmpResult = tmpResult[:length]
  177. timeList := make([]any, halfInedx)
  178. ct := make([]any, halfInedx)
  179. fd := make([]any, halfInedx)
  180. for i := 0; i < halfInedx; i++ {
  181. fd[i] = tmpResult[i][1]
  182. ct[i] = tmpResult[i+halfInedx][1]
  183. timeList[i] = tmpResult[i+halfInedx][0]
  184. }
  185. result := make(map[string][]any)
  186. result["time"] = timeList
  187. result["current"] = ct
  188. result["forward"] = fd
  189. e.OK(result, "success")
  190. }
  191. // Spans span列表
  192. func (e Service) Spans(c *gin.Context) {
  193. req := new(dto.ServiceSpansReq)
  194. svc := new(service.Service)
  195. if err := e.Init(c, req, &svc.OtService); err != nil {
  196. return
  197. }
  198. resp := make([]dto.ServiceSpansResp, 0)
  199. count := int64(0)
  200. if err := svc.Spans(req, &resp, &count); err != nil {
  201. e.Error(http.StatusInternalServerError, err, err.Error())
  202. return
  203. }
  204. e.PageOK(resp, int(count), req.GetPageIndex(), req.GetPageSize(), "success")
  205. }
  206. // 生成service graph点边数据
  207. func (sj Service) GenGraph(c *gin.Context) {
  208. svc := service.Service{}
  209. sj.MakeContext(c).MakeDB().MakeService(&svc.OtService)
  210. resp := new(dto.ServiceJobGenServiceResp)
  211. if err := svc.GenService(resp); err != nil {
  212. sj.Error(http.StatusInternalServerError, err, err.Error())
  213. return
  214. }
  215. sj.OK(resp, "success")
  216. }
  217. // 数字视图
  218. func (e Service) Digits(c *gin.Context) {
  219. req := new(dto.ServiceDigitsReq)
  220. svc := new(service.Service)
  221. if err := e.Init(c, req, &svc.OtService); err != nil {
  222. return
  223. }
  224. resp := new(dto.ServiceDigitsResp)
  225. if err := svc.Digits(req, resp); err != nil {
  226. e.Error(http.StatusInternalServerError, err, err.Error())
  227. return
  228. }
  229. e.OK(resp, "success")
  230. }
  231. // 数字视图生成
  232. func (e Service) DigitsGen(c *gin.Context) {
  233. svc := service.Service{}
  234. e.MakeContext(c).MakeDB().MakeService(&svc.OtService)
  235. if err := svc.DigitsGen(); err != nil {
  236. e.Error(http.StatusInternalServerError, err, err.Error())
  237. return
  238. }
  239. e.OK("", "success")
  240. }
  241. // 拓扑图
  242. // 待优化的点:
  243. //
  244. // 由于多个请求会产生多个连接,一个连接每次会产生多次数据库查询,连接越多数据库查询越多
  245. // 改成 一个业务对应一组数据库查询,多个连接过来时,如果是同一个业务,使用同一个业务的查询结果
  246. // 即使连接很多,也不会对数据库造成压力,数据库的查询只与业务数量相关
  247. func (e Service) Topology(c *gin.Context) {
  248. req := new(dto.ServiceGetEdgesReq)
  249. svc := new(service.Service)
  250. if err := e.Init(c, req, &svc.OtService); err != nil {
  251. return
  252. }
  253. timeout := 5 * time.Minute
  254. // timeout = 30 * time.Second
  255. ctx, cancel := context.WithTimeout(c.Request.Context(), timeout)
  256. defer cancel()
  257. conn, _ := websocket.Conn(c)
  258. go func() {
  259. for {
  260. // 从客户端读取数据
  261. msgType, msg, err := conn.ReadMessage()
  262. if err != nil {
  263. logger.Errorf("读取客户端消息失败: %s", err)
  264. cancel()
  265. return
  266. }
  267. if msgType == ws.CloseMessage {
  268. logger.Infof("收到客户端发送的断开连接消息: %d %s", msgType, msg)
  269. cancel()
  270. return
  271. }
  272. // 如果是其它类型的消息,暂时不处理,未与前端约定
  273. logger.Info("客户端正常消息:%s", msg)
  274. }
  275. }()
  276. ticker := time.NewTicker(5 * time.Second)
  277. defer ticker.Stop()
  278. first := true
  279. for {
  280. sgb := new(service.ScopeGraphBasic)
  281. resp := new(models.Graph)
  282. select {
  283. case <-ticker.C:
  284. // 返回正常从库中读取的数据
  285. sgb.UpdateGraph(svc, c, req, resp)
  286. // conn.WriteMessage(ws.TextMessage, []byte("normal message..."))
  287. conn.WriteJSON(*sgb)
  288. logger.Infof("ticker 5s...")
  289. case <-ctx.Done():
  290. logger.Warnf("客户端已关闭,或超过最大连接时间(%.2fs)", timeout.Seconds())
  291. return
  292. default:
  293. // 返回初使值
  294. if first {
  295. sgb.AddNoSoulGraph(svc, c, req, resp)
  296. conn.WriteJSON(*sgb)
  297. if req.Live {
  298. req.EndTime = 0
  299. req.StartTime = 0
  300. req.CheckFilling(3 * time.Minute)
  301. }
  302. resp := new(models.Graph)
  303. sgb = new(service.ScopeGraphBasic)
  304. sgb.UpdateGraph(svc, c, req, resp)
  305. conn.WriteJSON(*sgb)
  306. first = false
  307. }
  308. time.Sleep(100 * time.Microsecond)
  309. }
  310. }
  311. }