manager.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179
  1. package ws
  2. import (
  3. "sync"
  4. "github.com/go-admin-team/go-admin-core/logger"
  5. )
  6. // Manager 所有 websocket 信息
  7. type Manager struct {
  8. Group map[string]map[string]*Client
  9. groupCount, clientCount uint
  10. Lock sync.Mutex
  11. Register, UnRegister chan *Client
  12. Message chan *MessageData
  13. MessageString chan *MessageDataString
  14. GroupMessage chan *GroupMessageData
  15. BroadCastMessage chan *BroadCastMessageData
  16. }
  17. // Start 启动 websocket 管理器
  18. func (manager *Manager) Start() {
  19. for {
  20. select {
  21. // 注册
  22. case client := <-manager.Register:
  23. logger.Debugf("client [%s] connect", client.ID)
  24. logger.Debugf("register client [%s] to group [%s]", client.ID, client.Group)
  25. manager.Lock.Lock()
  26. if manager.Group[client.Group] == nil {
  27. manager.Group[client.Group] = make(map[string]*Client)
  28. manager.groupCount += 1
  29. }
  30. manager.Group[client.Group][client.ID] = client
  31. manager.clientCount += 1
  32. manager.Lock.Unlock()
  33. // 注销
  34. case client := <-manager.UnRegister:
  35. logger.Infof("unregister client [%s] from group [%s]", client.ID, client.Group)
  36. manager.Lock.Lock()
  37. if _, ok := manager.Group[client.Group]; ok {
  38. if _, ok := manager.Group[client.Group][client.ID]; ok {
  39. close(client.Message)
  40. delete(manager.Group[client.Group], client.ID)
  41. manager.clientCount -= 1
  42. if len(manager.Group[client.Group]) == 0 {
  43. //log.Printf("delete empty group [%s]", client.Group)
  44. delete(manager.Group, client.Group)
  45. manager.groupCount -= 1
  46. }
  47. }
  48. }
  49. manager.Lock.Unlock()
  50. // 发送广播数据到某个组的 channel 变量 Send 中
  51. //case data := <-manager.boardCast:
  52. // if groupMap, ok := manager.wsGroup[data.GroupId]; ok {
  53. // for _, conn := range groupMap {
  54. // conn.Send <- data.Data
  55. // }
  56. // }
  57. }
  58. }
  59. }
  60. // SendService 处理单个 client 发送数据
  61. func (manager *Manager) SendService() {
  62. for {
  63. select {
  64. case data := <-manager.Message:
  65. if groupMap, ok := manager.Group[data.Group]; ok {
  66. if conn, ok := groupMap[data.ID]; ok {
  67. conn.Message <- data.Message
  68. }
  69. }
  70. }
  71. }
  72. }
  73. // SendGroupService 处理 group 广播数据
  74. func (manager *Manager) SendGroupService() {
  75. for {
  76. select {
  77. // 发送广播数据到某个组的 channel 变量 Send 中
  78. case data := <-manager.GroupMessage:
  79. if groupMap, ok := manager.Group[data.Group]; ok {
  80. for _, conn := range groupMap {
  81. conn.Message <- data.Message
  82. }
  83. }
  84. }
  85. }
  86. }
  87. // SendAllService 处理广播数据
  88. func (manager *Manager) SendAllService() {
  89. for {
  90. select {
  91. case data := <-manager.BroadCastMessage:
  92. for _, v := range manager.Group {
  93. for _, conn := range v {
  94. conn.Message <- data.Message
  95. }
  96. }
  97. }
  98. }
  99. }
  100. // Send 向指定的 client 发送数据
  101. func (manager *Manager) Send(id string, group string, message []byte) {
  102. data := &MessageData{
  103. ID: id,
  104. Group: group,
  105. Message: message,
  106. }
  107. manager.Message <- data
  108. }
  109. // SendString 向指定的 client 发送数据
  110. func (manager *Manager) SendString(id string, group string, message string) {
  111. data := &MessageDataString{
  112. ID: id,
  113. Group: group,
  114. Message: message,
  115. }
  116. manager.MessageString <- data
  117. }
  118. // SendGroup 向指定的 Group 广播
  119. func (manager *Manager) SendGroup(group string, message []byte) {
  120. data := &GroupMessageData{
  121. Group: group,
  122. Message: message,
  123. }
  124. manager.GroupMessage <- data
  125. }
  126. // SendAll 广播
  127. func (manager *Manager) SendAll(message []byte) {
  128. data := &BroadCastMessageData{
  129. Message: message,
  130. }
  131. manager.BroadCastMessage <- data
  132. }
  133. // RegisterClient 注册
  134. func (manager *Manager) RegisterClient(client *Client) {
  135. manager.Register <- client
  136. }
  137. // UnRegisterClient 注销
  138. func (manager *Manager) UnRegisterClient(client *Client) {
  139. manager.UnRegister <- client
  140. }
  141. // LenGroup 当前组个数
  142. func (manager *Manager) LenGroup() uint {
  143. return manager.groupCount
  144. }
  145. // LenClient 当前连接个数
  146. func (manager *Manager) LenClient() uint {
  147. return manager.clientCount
  148. }
  149. // Info 获取 wsManager 管理器信息
  150. func (manager *Manager) Info() map[string]interface{} {
  151. managerInfo := make(map[string]interface{})
  152. managerInfo["groupLen"] = manager.LenGroup()
  153. managerInfo["clientLen"] = manager.LenClient()
  154. managerInfo["chanRegisterLen"] = len(manager.Register)
  155. managerInfo["chanUnregisterLen"] = len(manager.UnRegister)
  156. managerInfo["chanMessageLen"] = len(manager.Message)
  157. managerInfo["chanGroupMessageLen"] = len(manager.GroupMessage)
  158. managerInfo["chanBroadCastMessageLen"] = len(manager.BroadCastMessage)
  159. return managerInfo
  160. }