123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201 |
- package app
- import (
- "fmt"
- "io"
- "sync"
- "time"
- "context"
- log "github.com/sirupsen/logrus"
- "github.com/weaveworks/common/mtime"
- "github.com/weaveworks/scope/common/xfer"
- )
// Timing parameters for pipe expiry and garbage collection.
const (
	// gcInterval is how often all pipes are checked.
	gcInterval = time.Second * 30

	// pipeTimeout is how long a pipe may go without a connected client
	// before it is closed.
	pipeTimeout = time.Minute

	// gcTimeout is how much longer a closed (tombstoned) pipe is remembered
	// before it is forgotten.
	gcTimeout = time.Minute * 10
)
// End identifies one of the two ends of a pipe.
type End int

// Values used for End. They are declared as untyped integer constants
// (UIEnd is 0, ProbeEnd is 1) and convert implicitly to End where an End is
// expected.
const (
	UIEnd = iota
	ProbeEnd
)

// String returns "ui" for UIEnd and "probe" for every other value.
func (e End) String() string {
	switch e {
	case UIEnd:
		return "ui"
	default:
		return "probe"
	}
}
- // PipeRouter stores pipes and allows you to connect to either end of them.
- type PipeRouter interface {
- Exists(context.Context, string) (bool, error)
- Get(context.Context, string, End) (xfer.Pipe, io.ReadWriter, error)
- Release(context.Context, string, End) error
- Delete(context.Context, string) error
- Stop()
- }
- // PipeRouter connects incoming and outgoing pipes.
- type localPipeRouter struct {
- sync.Mutex
- wait sync.WaitGroup
- quit chan struct{}
- pipes map[string]*pipe
- }
- // for each end of the pipe, we keep a reference count & lastUsedTIme,
- // such that we can timeout pipes when either end is inactive.
- type pipe struct {
- xfer.Pipe
- tombstoneTime time.Time
- ui, probe end
- }
- type end struct {
- refCount int
- lastUsedTime time.Time
- }
- func (p *pipe) end(end End) (*end, io.ReadWriter) {
- ui, probe := p.Ends()
- if end == UIEnd {
- return &p.ui, ui
- }
- return &p.probe, probe
- }
- // NewLocalPipeRouter returns a new local (in-memory) pipe router.
- func NewLocalPipeRouter() PipeRouter {
- pipeRouter := &localPipeRouter{
- quit: make(chan struct{}),
- pipes: map[string]*pipe{},
- }
- pipeRouter.wait.Add(1)
- go pipeRouter.gcLoop()
- return pipeRouter
- }
- func (pr *localPipeRouter) Exists(_ context.Context, id string) (bool, error) {
- pr.Lock()
- defer pr.Unlock()
- p, ok := pr.pipes[id]
- if !ok {
- return true, nil
- }
- return !p.Closed(), nil
- }
- func (pr *localPipeRouter) Get(_ context.Context, id string, e End) (xfer.Pipe, io.ReadWriter, error) {
- pr.Lock()
- defer pr.Unlock()
- p, ok := pr.pipes[id]
- if !ok {
- log.Debugf("Creating pipe id %s", id)
- p = &pipe{
- ui: end{lastUsedTime: mtime.Now()},
- probe: end{lastUsedTime: mtime.Now()},
- Pipe: xfer.NewPipe(),
- }
- pr.pipes[id] = p
- }
- if p.Closed() {
- return nil, nil, fmt.Errorf("Pipe %s closed", id)
- }
- end, endIO := p.end(e)
- end.refCount++
- return p, endIO, nil
- }
- func (pr *localPipeRouter) Release(_ context.Context, id string, e End) error {
- pr.Lock()
- defer pr.Unlock()
- p, ok := pr.pipes[id]
- if !ok {
- return fmt.Errorf("Pipe %s not found", id)
- }
- end, _ := p.end(e)
- end.refCount--
- if end.refCount > 0 {
- return nil
- }
- if !p.Closed() {
- end.lastUsedTime = mtime.Now()
- }
- return nil
- }
- func (pr *localPipeRouter) Delete(_ context.Context, id string) error {
- pr.Lock()
- defer pr.Unlock()
- p, ok := pr.pipes[id]
- if !ok {
- return nil
- }
- p.Close()
- p.tombstoneTime = mtime.Now()
- return nil
- }
- func (pr *localPipeRouter) Stop() {
- close(pr.quit)
- pr.wait.Wait()
- }
- func (pr *localPipeRouter) gcLoop() {
- defer pr.wait.Done()
- ticker := time.Tick(gcInterval)
- for {
- select {
- case <-pr.quit:
- return
- case <-ticker:
- }
- pr.timeout()
- pr.garbageCollect()
- }
- }
- func (pr *localPipeRouter) timeout() {
- pr.Lock()
- defer pr.Unlock()
- now := mtime.Now()
- for id, pipe := range pr.pipes {
- if pipe.Closed() || (pipe.ui.refCount > 0 && pipe.probe.refCount > 0) {
- continue
- }
- if (pipe.ui.refCount == 0 && now.Sub(pipe.ui.lastUsedTime) >= pipeTimeout) ||
- (pipe.probe.refCount == 0 && now.Sub(pipe.probe.lastUsedTime) >= pipeTimeout) {
- log.Infof("Timing out pipe %s", id)
- pipe.Close()
- pipe.tombstoneTime = now
- }
- }
- }
- func (pr *localPipeRouter) garbageCollect() {
- pr.Lock()
- defer pr.Unlock()
- now := mtime.Now()
- for pipeID, pipe := range pr.pipes {
- if pipe.Closed() && now.Sub(pipe.tombstoneTime) >= gcTimeout {
- delete(pr.pipes, pipeID)
- }
- }
- }
|