pipes.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. package xfer
  2. import (
  3. "io"
  4. "sync"
  5. "github.com/gorilla/websocket"
  6. )
  7. // Pipe is a bi-directional channel from something in the probe
  8. // to the UI.
  9. type Pipe interface {
  10. Ends() (io.ReadWriter, io.ReadWriter)
  11. CopyToWebsocket(io.ReadWriter, Websocket) (bool, error)
  12. Close() error
  13. Closed() bool
  14. OnClose(func())
  15. }
  16. type pipe struct {
  17. mtx sync.Mutex
  18. wg sync.WaitGroup
  19. port, starboard io.ReadWriter
  20. closers []io.Closer
  21. quit chan struct{}
  22. closed bool
  23. onClose func()
  24. }
  25. // NewPipeFromEnds makes a new pipe specifying its ends
  26. func NewPipeFromEnds(local io.ReadWriter, remote io.ReadWriter) Pipe {
  27. return &pipe{
  28. port: local,
  29. starboard: remote,
  30. quit: make(chan struct{}),
  31. }
  32. }
  33. // NewPipe makes a new pipe
  34. func NewPipe() Pipe {
  35. r1, w1 := io.Pipe()
  36. r2, w2 := io.Pipe()
  37. return &pipe{
  38. port: struct {
  39. io.Reader
  40. io.Writer
  41. }{
  42. r1, w2,
  43. },
  44. starboard: struct {
  45. io.Reader
  46. io.Writer
  47. }{
  48. r2, w1,
  49. },
  50. closers: []io.Closer{
  51. r1, r2, w1, w2,
  52. },
  53. quit: make(chan struct{}),
  54. }
  55. }
  56. func (p *pipe) Ends() (io.ReadWriter, io.ReadWriter) {
  57. return p.port, p.starboard
  58. }
  59. func (p *pipe) Close() error {
  60. p.mtx.Lock()
  61. var onClose func()
  62. if !p.closed {
  63. p.closed = true
  64. close(p.quit)
  65. for _, c := range p.closers {
  66. c.Close()
  67. }
  68. onClose = p.onClose
  69. }
  70. p.mtx.Unlock()
  71. p.wg.Wait()
  72. // Don't run onClose under lock.
  73. if onClose != nil {
  74. onClose()
  75. }
  76. return nil
  77. }
  78. func (p *pipe) Closed() bool {
  79. p.mtx.Lock()
  80. defer p.mtx.Unlock()
  81. return p.closed
  82. }
  83. func (p *pipe) OnClose(f func()) {
  84. p.mtx.Lock()
  85. defer p.mtx.Unlock()
  86. p.onClose = f
  87. }
  88. // CopyToWebsocket copies pipe data to/from a websocket. It blocks.
  89. // Returns bool 'done' and an error, masked if websocket closed in an expected manner.
  90. func (p *pipe) CopyToWebsocket(end io.ReadWriter, conn Websocket) (bool, error) {
  91. p.mtx.Lock()
  92. if p.closed {
  93. p.mtx.Unlock()
  94. return true, nil
  95. }
  96. p.wg.Add(1)
  97. p.mtx.Unlock()
  98. defer p.wg.Done()
  99. endError := make(chan error, 1)
  100. connError := make(chan error, 1)
  101. // Read-from-UI loop
  102. go func() {
  103. for {
  104. _, buf, err := conn.ReadMessage() // TODO type should be binary message
  105. if err != nil {
  106. connError <- err
  107. return
  108. }
  109. if p.Closed() {
  110. return
  111. }
  112. if _, err := end.Write(buf); err != nil {
  113. endError <- err
  114. return
  115. }
  116. }
  117. }()
  118. // Write-to-UI loop
  119. go func() {
  120. buf := make([]byte, 1024)
  121. for {
  122. n, err := end.Read(buf)
  123. if err != nil {
  124. endError <- err
  125. return
  126. }
  127. if p.Closed() {
  128. return
  129. }
  130. if err := conn.WriteMessage(websocket.BinaryMessage, buf[:n]); err != nil {
  131. connError <- err
  132. return
  133. }
  134. }
  135. }()
  136. // block until one of the goroutines exits
  137. // this convoluted mechanism is to ensure we only close the websocket once.
  138. select {
  139. case err := <-endError:
  140. return false, err
  141. case err := <-connError:
  142. if IsExpectedWSCloseError(err) {
  143. return false, nil
  144. }
  145. return false, err
  146. case <-p.quit:
  147. return true, nil
  148. }
  149. }