123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155 |
- package ws
- import (
- "net/http"
- "time"
- "github.com/gin-gonic/gin"
- "github.com/go-admin-team/go-admin-core/logger"
- "github.com/gorilla/websocket"
- uuid "github.com/satori/go.uuid"
- )
- // WSClient gin 处理 websocket handler
- func (manager *Manager) WSClient(ctx *gin.Context) {
- upGrader := websocket.Upgrader{
- // cross origin domain
- CheckOrigin: func(r *http.Request) bool {
- return true
- },
- // 处理 Sec-WebSocket-Protocol Header
- Subprotocols: []string{ctx.GetHeader("Sec-WebSocket-Protocol")},
- }
- conn, err := upGrader.Upgrade(ctx.Writer, ctx.Request, nil)
- if err != nil {
- logger.Debugf("websocket connect error: %s", ctx.Param("channel"))
- return
- }
- client := &Client{
- ID: uuid.NewV4().String(),
- Group: ctx.Param("channel"),
- Socket: conn,
- Message: make(chan []byte, 1024),
- }
- logger.Debug("client ", client)
- manager.RegisterClient(client)
- logger.Debug("before read xxxx")
- go client.Read()
- go client.Write()
- // time.Sleep(time.Second * 15)
- // 测试单个 client 发送数据
- // manager.Send(client.ID, client.Group, []byte("Send message ----"+time.Now().Format("2006-01-02 15:04:05")+" --- "+ctx.Param("channel")))
- manager.SendString(client.ID, client.Group, ("Send message ----" + time.Now().Format("2006-01-02 15:04:05") + " --- " + ctx.Param("channel")))
- }
- // //WSLogClient websocket handler 后台日志实时处理测试
- // func (manager *Manager) WSLogClient(ctx *gin.Context) {
- // upGrader := websocket.Upgrader{
- // CheckOrigin: func(r *http.Request) bool {
- // return true
- // },
- // Subprotocols: []string{ctx.GetHeader("Sec-WebSocket-Protocol")},
- // }
- // conn, err := upGrader.Upgrade(ctx.Writer, ctx.Request, nil)
- // if err != nil {
- // logging.Error("websocket connect error: %s", ctx.Param("channel"))
- // return
- // }
- // client := &Client{
- // ID: uuid.NewV4().String(),
- // Group: ctx.Param("channel"),
- // Socket: conn,
- // Message: make(chan []byte, 1024),
- // }
- // manager.RegisterClient(client)
- // go client.Write()
- // logContent := make(chan []byte, 1024)
- // handlerError := make(chan error, 128)
- // var filePath string
- // switch ctx.Param("channel") {
- // case "dialog":
- // filePath = setting.WSSetting.DiaLogPath
- // case "nginxlog":
- // filePath = setting.WSSetting.NginxLogPath
- // case "startlog":
- // filePath = setting.WSSetting.StartLogPath
- // default:
- // filePath = setting.WSSetting.StartLogPath
- // }
- // go util.TailFile(filePath, logContent, handlerError)
- // go func() {
- // for {
- // select {
- // case content := <-logContent:
- // manager.Send(client.ID, client.Group, content)
- // case <-handlerError:
- // fmt.Println(err.Error())
- // }
- // }
- // }()
- // }
- // //WSAsyncMissionMsg WSAsyncMissionMsg handler 异步消息处理
- // func (manager *Manager) WSAsyncMissionMsg(ctx *gin.Context) {
- // ag := app.Gin{C: ctx}
- // if int(manager.LenClient()) > setting.WSSetting.LenClient {
- // ag.Response(http.StatusBadRequest, e.ERROR_COUNT_FAIL, "能够建立的双端长链接已经超过上限")
- // return
- // }
- // msgID := ctx.Param("channel")
- // log.Println("get mission id:", msgID)
- // upGrader := websocket.Upgrader{
- // CheckOrigin: func(r *http.Request) bool {
- // return true
- // },
- // Subprotocols: []string{ctx.GetHeader("Sec-WebSocket-Protocol")},
- // }
- // conn, err := upGrader.Upgrade(ctx.Writer, ctx.Request, nil)
- // if err != nil {
- // logging.Error("websocket connect error: %s", ctx.Param("channel"))
- // return
- // }
- // client := &Client{
- // ID: msgID,
- // Group: ctx.Param("channel"),
- // Socket: conn,
- // Message: make(chan []byte, 1024),
- // }
- // manager.RegisterClient(client)
- // go client.Write()
- // // go client.Read() TODO: 客户端发起心跳,5秒同步一次信号,超时3次断开连接
- // handlerError := make(chan error, 128)
- // handler := new(controllers.RecvHandler)
- // handler.MissionID = msgID
- // handler.Message = make(chan []byte, 1024)
- // go rabbitmq.Recv(rabbitmq.NewQueueExchangeSimple(msgID, msgID, setting.MissionSetting.MqURL), handler, 3)
- // go func() {
- // for {
- // select {
- // case content := <-handler.Message:
- // manager.Send(client.ID, client.Group, content)
- // case <-handlerError:
- // fmt.Println(err.Error())
- // }
- // }
- // }()
- // }
- // TestSendGroup 测试组广播
- func TestSendGroup() {
- for {
- time.Sleep(time.Second * 20)
- WebsocketManager.SendGroup("pujielan", []byte("SendGroup message ----"+time.Now().Format("2006-01-02 15:04:05")))
- }
- }
- // TestSendAll 测试广播
- func TestSendAll() {
- for {
- time.Sleep(time.Second * 25)
- WebsocketManager.SendAll([]byte("SendAll message ----" + time.Now().Format("2006-01-02 15:04:05")))
- // fmt.Println(WebsocketManager.Info())
- }
- }
|