pipe_router.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201
  1. package app
  2. import (
  3. "fmt"
  4. "io"
  5. "sync"
  6. "time"
  7. "context"
  8. log "github.com/sirupsen/logrus"
  9. "github.com/weaveworks/common/mtime"
  10. "github.com/weaveworks/scope/common/xfer"
  11. )
  12. const (
  13. gcInterval = 30 * time.Second // we check all the pipes every 30s
  14. pipeTimeout = 1 * time.Minute // pipes are closed when a client hasn't been connected for 1 minute
  15. gcTimeout = 10 * time.Minute // after another 10 minutes, tombstoned pipes are forgotten
  16. )
  17. // End is an enum for either end of the pipe.
  18. type End int
  19. // Valid values of type End
  20. const (
  21. UIEnd = iota
  22. ProbeEnd
  23. )
  24. func (e End) String() string {
  25. if e == UIEnd {
  26. return "ui"
  27. }
  28. return "probe"
  29. }
  30. // PipeRouter stores pipes and allows you to connect to either end of them.
  31. type PipeRouter interface {
  32. Exists(context.Context, string) (bool, error)
  33. Get(context.Context, string, End) (xfer.Pipe, io.ReadWriter, error)
  34. Release(context.Context, string, End) error
  35. Delete(context.Context, string) error
  36. Stop()
  37. }
  38. // PipeRouter connects incoming and outgoing pipes.
  39. type localPipeRouter struct {
  40. sync.Mutex
  41. wait sync.WaitGroup
  42. quit chan struct{}
  43. pipes map[string]*pipe
  44. }
  45. // for each end of the pipe, we keep a reference count & lastUsedTIme,
  46. // such that we can timeout pipes when either end is inactive.
  47. type pipe struct {
  48. xfer.Pipe
  49. tombstoneTime time.Time
  50. ui, probe end
  51. }
  52. type end struct {
  53. refCount int
  54. lastUsedTime time.Time
  55. }
  56. func (p *pipe) end(end End) (*end, io.ReadWriter) {
  57. ui, probe := p.Ends()
  58. if end == UIEnd {
  59. return &p.ui, ui
  60. }
  61. return &p.probe, probe
  62. }
  63. // NewLocalPipeRouter returns a new local (in-memory) pipe router.
  64. func NewLocalPipeRouter() PipeRouter {
  65. pipeRouter := &localPipeRouter{
  66. quit: make(chan struct{}),
  67. pipes: map[string]*pipe{},
  68. }
  69. pipeRouter.wait.Add(1)
  70. go pipeRouter.gcLoop()
  71. return pipeRouter
  72. }
  73. func (pr *localPipeRouter) Exists(_ context.Context, id string) (bool, error) {
  74. pr.Lock()
  75. defer pr.Unlock()
  76. p, ok := pr.pipes[id]
  77. if !ok {
  78. return true, nil
  79. }
  80. return !p.Closed(), nil
  81. }
  82. func (pr *localPipeRouter) Get(_ context.Context, id string, e End) (xfer.Pipe, io.ReadWriter, error) {
  83. pr.Lock()
  84. defer pr.Unlock()
  85. p, ok := pr.pipes[id]
  86. if !ok {
  87. log.Debugf("Creating pipe id %s", id)
  88. p = &pipe{
  89. ui: end{lastUsedTime: mtime.Now()},
  90. probe: end{lastUsedTime: mtime.Now()},
  91. Pipe: xfer.NewPipe(),
  92. }
  93. pr.pipes[id] = p
  94. }
  95. if p.Closed() {
  96. return nil, nil, fmt.Errorf("Pipe %s closed", id)
  97. }
  98. end, endIO := p.end(e)
  99. end.refCount++
  100. return p, endIO, nil
  101. }
  102. func (pr *localPipeRouter) Release(_ context.Context, id string, e End) error {
  103. pr.Lock()
  104. defer pr.Unlock()
  105. p, ok := pr.pipes[id]
  106. if !ok {
  107. return fmt.Errorf("Pipe %s not found", id)
  108. }
  109. end, _ := p.end(e)
  110. end.refCount--
  111. if end.refCount > 0 {
  112. return nil
  113. }
  114. if !p.Closed() {
  115. end.lastUsedTime = mtime.Now()
  116. }
  117. return nil
  118. }
  119. func (pr *localPipeRouter) Delete(_ context.Context, id string) error {
  120. pr.Lock()
  121. defer pr.Unlock()
  122. p, ok := pr.pipes[id]
  123. if !ok {
  124. return nil
  125. }
  126. p.Close()
  127. p.tombstoneTime = mtime.Now()
  128. return nil
  129. }
  130. func (pr *localPipeRouter) Stop() {
  131. close(pr.quit)
  132. pr.wait.Wait()
  133. }
  134. func (pr *localPipeRouter) gcLoop() {
  135. defer pr.wait.Done()
  136. ticker := time.Tick(gcInterval)
  137. for {
  138. select {
  139. case <-pr.quit:
  140. return
  141. case <-ticker:
  142. }
  143. pr.timeout()
  144. pr.garbageCollect()
  145. }
  146. }
  147. func (pr *localPipeRouter) timeout() {
  148. pr.Lock()
  149. defer pr.Unlock()
  150. now := mtime.Now()
  151. for id, pipe := range pr.pipes {
  152. if pipe.Closed() || (pipe.ui.refCount > 0 && pipe.probe.refCount > 0) {
  153. continue
  154. }
  155. if (pipe.ui.refCount == 0 && now.Sub(pipe.ui.lastUsedTime) >= pipeTimeout) ||
  156. (pipe.probe.refCount == 0 && now.Sub(pipe.probe.lastUsedTime) >= pipeTimeout) {
  157. log.Infof("Timing out pipe %s", id)
  158. pipe.Close()
  159. pipe.tombstoneTime = now
  160. }
  161. }
  162. }
  163. func (pr *localPipeRouter) garbageCollect() {
  164. pr.Lock()
  165. defer pr.Unlock()
  166. now := mtime.Now()
  167. for pipeID, pipe := range pr.pipes {
  168. if pipe.Closed() && now.Sub(pipe.tombstoneTime) >= gcTimeout {
  169. delete(pr.pipes, pipeID)
  170. }
  171. }
  172. }