handler.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. package ws
  2. import (
  3. "net/http"
  4. "time"
  5. "github.com/gin-gonic/gin"
  6. "github.com/go-admin-team/go-admin-core/logger"
  7. "github.com/gorilla/websocket"
  8. uuid "github.com/satori/go.uuid"
  9. )
  10. // WSClient gin 处理 websocket handler
  11. func (manager *Manager) WSClient(ctx *gin.Context) {
  12. upGrader := websocket.Upgrader{
  13. // cross origin domain
  14. CheckOrigin: func(r *http.Request) bool {
  15. return true
  16. },
  17. // 处理 Sec-WebSocket-Protocol Header
  18. Subprotocols: []string{ctx.GetHeader("Sec-WebSocket-Protocol")},
  19. }
  20. conn, err := upGrader.Upgrade(ctx.Writer, ctx.Request, nil)
  21. if err != nil {
  22. logger.Debugf("websocket connect error: %s", ctx.Param("channel"))
  23. return
  24. }
  25. client := &Client{
  26. ID: uuid.NewV4().String(),
  27. Group: ctx.Param("channel"),
  28. Socket: conn,
  29. Message: make(chan []byte, 1024),
  30. }
  31. logger.Debug("client ", client)
  32. manager.RegisterClient(client)
  33. logger.Debug("before read xxxx")
  34. go client.Read()
  35. go client.Write()
  36. // time.Sleep(time.Second * 15)
  37. // 测试单个 client 发送数据
  38. // manager.Send(client.ID, client.Group, []byte("Send message ----"+time.Now().Format("2006-01-02 15:04:05")+" --- "+ctx.Param("channel")))
  39. manager.SendString(client.ID, client.Group, ("Send message ----" + time.Now().Format("2006-01-02 15:04:05") + " --- " + ctx.Param("channel")))
  40. }
  41. // //WSLogClient websocket handler 后台日志实时处理测试
  42. // func (manager *Manager) WSLogClient(ctx *gin.Context) {
  43. // upGrader := websocket.Upgrader{
  44. // CheckOrigin: func(r *http.Request) bool {
  45. // return true
  46. // },
  47. // Subprotocols: []string{ctx.GetHeader("Sec-WebSocket-Protocol")},
  48. // }
  49. // conn, err := upGrader.Upgrade(ctx.Writer, ctx.Request, nil)
  50. // if err != nil {
  51. // logging.Error("websocket connect error: %s", ctx.Param("channel"))
  52. // return
  53. // }
  54. // client := &Client{
  55. // ID: uuid.NewV4().String(),
  56. // Group: ctx.Param("channel"),
  57. // Socket: conn,
  58. // Message: make(chan []byte, 1024),
  59. // }
  60. // manager.RegisterClient(client)
  61. // go client.Write()
  62. // logContent := make(chan []byte, 1024)
  63. // handlerError := make(chan error, 128)
  64. // var filePath string
  65. // switch ctx.Param("channel") {
  66. // case "dialog":
  67. // filePath = setting.WSSetting.DiaLogPath
  68. // case "nginxlog":
  69. // filePath = setting.WSSetting.NginxLogPath
  70. // case "startlog":
  71. // filePath = setting.WSSetting.StartLogPath
  72. // default:
  73. // filePath = setting.WSSetting.StartLogPath
  74. // }
  75. // go util.TailFile(filePath, logContent, handlerError)
  76. // go func() {
  77. // for {
  78. // select {
  79. // case content := <-logContent:
  80. // manager.Send(client.ID, client.Group, content)
  81. // case <-handlerError:
  82. // fmt.Println(err.Error())
  83. // }
  84. // }
  85. // }()
  86. // }
  87. // //WSAsyncMissionMsg WSAsyncMissionMsg handler 异步消息处理
  88. // func (manager *Manager) WSAsyncMissionMsg(ctx *gin.Context) {
  89. // ag := app.Gin{C: ctx}
  90. // if int(manager.LenClient()) > setting.WSSetting.LenClient {
  91. // ag.Response(http.StatusBadRequest, e.ERROR_COUNT_FAIL, "能够建立的双端长链接已经超过上限")
  92. // return
  93. // }
  94. // msgID := ctx.Param("channel")
  95. // log.Println("get mission id:", msgID)
  96. // upGrader := websocket.Upgrader{
  97. // CheckOrigin: func(r *http.Request) bool {
  98. // return true
  99. // },
  100. // Subprotocols: []string{ctx.GetHeader("Sec-WebSocket-Protocol")},
  101. // }
  102. // conn, err := upGrader.Upgrade(ctx.Writer, ctx.Request, nil)
  103. // if err != nil {
  104. // logging.Error("websocket connect error: %s", ctx.Param("channel"))
  105. // return
  106. // }
  107. // client := &Client{
  108. // ID: msgID,
  109. // Group: ctx.Param("channel"),
  110. // Socket: conn,
  111. // Message: make(chan []byte, 1024),
  112. // }
  113. // manager.RegisterClient(client)
  114. // go client.Write()
  115. // // go client.Read() TODO: 客户端发起心跳,5秒同步一次信号,超时3次断开连接
  116. // handlerError := make(chan error, 128)
  117. // handler := new(controllers.RecvHandler)
  118. // handler.MissionID = msgID
  119. // handler.Message = make(chan []byte, 1024)
  120. // go rabbitmq.Recv(rabbitmq.NewQueueExchangeSimple(msgID, msgID, setting.MissionSetting.MqURL), handler, 3)
  121. // go func() {
  122. // for {
  123. // select {
  124. // case content := <-handler.Message:
  125. // manager.Send(client.ID, client.Group, content)
  126. // case <-handlerError:
  127. // fmt.Println(err.Error())
  128. // }
  129. // }
  130. // }()
  131. // }
  132. // TestSendGroup 测试组广播
  133. func TestSendGroup() {
  134. for {
  135. time.Sleep(time.Second * 20)
  136. WebsocketManager.SendGroup("pujielan", []byte("SendGroup message ----"+time.Now().Format("2006-01-02 15:04:05")))
  137. }
  138. }
  139. // TestSendAll 测试广播
  140. func TestSendAll() {
  141. for {
  142. time.Sleep(time.Second * 25)
  143. WebsocketManager.SendAll([]byte("SendAll message ----" + time.Now().Format("2006-01-02 15:04:05")))
  144. // fmt.Println(WebsocketManager.Info())
  145. }
  146. }