123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455 |
- package multitenant
- import (
- "fmt"
- "io"
- "net/http"
- "net/url"
- "sync"
- "time"
- "context"
- "github.com/gorilla/mux"
- "github.com/gorilla/websocket"
- opentracing "github.com/opentracing/opentracing-go"
- log "github.com/sirupsen/logrus"
- "github.com/weaveworks/common/middleware"
- "github.com/weaveworks/common/mtime"
- "github.com/weaveworks/scope/app"
- "github.com/weaveworks/scope/common/xfer"
- )
- const (
- gcInterval = 30 * time.Second // we check all the pipes every 30s
- pipeTimeout = 1 * time.Minute // pipes are closed when a client hasn't been connected for 1 minute
- gcTimeout = 10 * time.Minute // after another 10 minutes, tombstoned pipes are forgotten
- )
- var (
- wsDialer = &websocket.Dialer{}
- )
- // TODO deal with garbage collection
- type consulPipe struct {
- CreatedAt, DeletedAt time.Time
- UIAddr, ProbeAddr string // Addrs where each end is connected
- UIRef, ProbeRef int // Ref counts
- }
- func (c *consulPipe) setAddrFor(e app.End, addr string) {
- if e == app.UIEnd {
- c.UIAddr = addr
- } else {
- c.ProbeAddr = addr
- }
- }
- func (c *consulPipe) addrFor(e app.End) string {
- if e == app.UIEnd {
- return c.UIAddr
- }
- return c.ProbeAddr
- }
- func (c *consulPipe) eitherEndFor(addr string) bool {
- return c.addrFor(app.UIEnd) == addr || c.addrFor(app.ProbeEnd) == addr
- }
- func (c *consulPipe) acquire(e app.End) int {
- if e == app.UIEnd {
- c.UIRef++
- return c.UIRef
- }
- c.ProbeRef++
- return c.ProbeRef
- }
- func (c *consulPipe) release(e app.End) int {
- if e == app.UIEnd {
- c.UIRef--
- return c.UIRef
- }
- c.ProbeRef--
- return c.ProbeRef
- }
- type consulPipeRouter struct {
- prefix string
- advertise string // Address of this pipe router to advertise in consul
- client ConsulClient
- userIDer UserIDer
- activePipes map[string]xfer.Pipe
- bridges map[string]*bridgeConnection
- actorChan chan func()
- pipeWaiters map[string][]chan xfer.Pipe
- // Used by Stop()
- quit chan struct{}
- wait sync.WaitGroup
- }
- // NewConsulPipeRouter returns a new consul based router
- func NewConsulPipeRouter(client ConsulClient, prefix, advertise string, userIDer UserIDer) app.PipeRouter {
- pipeRouter := &consulPipeRouter{
- prefix: prefix,
- advertise: advertise,
- client: client,
- userIDer: userIDer,
- activePipes: map[string]xfer.Pipe{},
- bridges: map[string]*bridgeConnection{},
- actorChan: make(chan func()),
- pipeWaiters: map[string][]chan xfer.Pipe{},
- quit: make(chan struct{}),
- }
- pipeRouter.wait.Add(2)
- go pipeRouter.watchAll()
- go pipeRouter.actor()
- go pipeRouter.privateAPI()
- return pipeRouter
- }
- func (pr *consulPipeRouter) Stop() {
- close(pr.quit)
- pr.wait.Wait()
- }
- func (pr *consulPipeRouter) actor() {
- defer pr.wait.Done()
- for {
- select {
- case f := <-pr.actorChan:
- f()
- case <-pr.quit:
- return
- }
- }
- }
- // watchAll listens to all pipe updates from consul.
- // This is effectively a distributed, consistent actor routine.
- // All state changes for this pipe router happen in this loop,
- // and all the methods are implemented as CAS's on consul, to
- // trigger an event in this loop.
- func (pr *consulPipeRouter) watchAll() {
- defer pr.wait.Done()
- pr.client.WatchPrefix(pr.prefix, &consulPipe{}, pr.quit, func(key string, value interface{}) bool {
- cp := *value.(*consulPipe)
- select {
- case pr.actorChan <- func() { pr.handlePipeUpdate(key, cp) }:
- return true
- case <-pr.quit:
- return false
- }
- })
- }
- func (pr *consulPipeRouter) handlePipeUpdate(key string, cp consulPipe) {
- // 1. If this pipe is closed, or we're not one of the ends, we
- // should ensure our local pipe (and bridge) is closed.
- if !cp.DeletedAt.IsZero() || !cp.eitherEndFor(pr.advertise) {
- pipe, ok := pr.activePipes[key]
- delete(pr.activePipes, key)
- if ok {
- log.Infof("Deleting pipe %s", key)
- pipe.Close()
- }
- bridge, ok := pr.bridges[key]
- delete(pr.bridges, key)
- if ok {
- bridge.stop()
- }
- return
- }
- if !cp.eitherEndFor(pr.advertise) {
- return
- }
- // 2. If this pipe if for us, we should have a pipe for it.
- pipe, ok := pr.activePipes[key]
- if !ok {
- log.Infof("Creating pipe %s", key)
- pipe = xfer.NewPipe()
- pr.activePipes[key] = pipe
- for _, pw := range pr.pipeWaiters[key] {
- pw <- pipe
- }
- delete(pr.pipeWaiters, key)
- }
- // 3. Ensure there is a bridging connection for this pipe.
- // Semantics are the owner of the UIEnd connects to the owner of the ProbeEnd
- shouldBridge := cp.DeletedAt.IsZero() &&
- cp.addrFor(app.UIEnd) != cp.addrFor(app.ProbeEnd) &&
- cp.addrFor(app.UIEnd) == pr.advertise &&
- cp.addrFor(app.ProbeEnd) != ""
- bridge, ok := pr.bridges[key]
- // If we shouldn't be bridging but are, or we should be bridging but are pointing
- // at the wrong place, stop the current bridge.
- if (!shouldBridge && ok) || (shouldBridge && ok && bridge.addr != cp.addrFor(app.ProbeEnd)) {
- delete(pr.bridges, key)
- bridge.stop()
- ok = false
- }
- // If we should be bridging and are not, start a new bridge
- if shouldBridge && !ok {
- bridge = newBridgeConnection(key, cp.addrFor(app.ProbeEnd), pipe)
- pr.bridges[key] = bridge
- }
- }
- func (pr *consulPipeRouter) getPipe(key string) xfer.Pipe {
- pc := make(chan xfer.Pipe)
- select {
- case pr.actorChan <- func() { pc <- pr.activePipes[key] }:
- return <-pc
- case <-pr.quit:
- return nil
- }
- }
- func (pr *consulPipeRouter) waitForPipe(key string) xfer.Pipe {
- pc := make(chan xfer.Pipe)
- select {
- case pr.actorChan <- func() {
- pipe, ok := pr.activePipes[key]
- if ok {
- pc <- pipe
- } else {
- pr.pipeWaiters[key] = append(pr.pipeWaiters[key], pc)
- }
- }:
- return <-pc
- case <-pr.quit:
- return nil
- }
- }
- func (pr *consulPipeRouter) privateAPI() {
- router := mux.NewRouter()
- router.Methods("GET").
- MatcherFunc(app.URLMatcher("/private/api/pipe/{key}")).
- HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
- key := mux.Vars(r)["key"]
- log.Infof("%s: Server bridge connection started", key)
- defer log.Infof("%s: Server bridge connection stopped", key)
- pipe := pr.getPipe(key)
- if pipe == nil {
- log.Errorf("%s: Server bridge connection; Unknown pipe!", key)
- w.WriteHeader(http.StatusNotFound)
- return
- }
- conn, err := xfer.Upgrade(w, r, nil)
- if err != nil {
- log.Errorf("%s: Server bridge connection; Error upgrading to websocket: %v", key, err)
- return
- }
- defer conn.Close()
- end, _ := pipe.Ends()
- if _, err := pipe.CopyToWebsocket(end, conn); err != nil {
- log.Errorf("%s: Server bridge connection; Error copying pipe to websocket: %v", key, err)
- }
- })
- handler := middleware.Tracer{RouteMatcher: router}.Wrap(router)
- log.Infof("Serving private API on endpoint %s.", pr.advertise)
- log.Infof("Private API terminated: %v", http.ListenAndServe(pr.advertise, handler))
- }
- func (pr *consulPipeRouter) Exists(ctx context.Context, id string) (bool, error) {
- userID, err := pr.userIDer(ctx)
- if err != nil {
- return false, err
- }
- key := fmt.Sprintf("%s%s-%s", pr.prefix, userID, id)
- consulPipe := consulPipe{}
- err = pr.client.Get(ctx, key, &consulPipe)
- if err == ErrNotFound {
- return false, nil
- } else if err != nil {
- return false, err
- }
- return consulPipe.DeletedAt.IsZero(), nil
- }
- func (pr *consulPipeRouter) Get(ctx context.Context, id string, e app.End) (xfer.Pipe, io.ReadWriter, error) {
- userID, err := pr.userIDer(ctx)
- if err != nil {
- return nil, nil, err
- }
- key := fmt.Sprintf("%s%s-%s", pr.prefix, userID, id)
- log.Infof("Get %s:%s", key, e)
- span, ctx := opentracing.StartSpanFromContext(ctx, "PipeRouter Get", opentracing.Tag{Key: "key", Value: key})
- defer span.Finish()
- // Try to ensure the given end of the given pipe
- // is 'owned' by this pipe service replica in consul.
- err = pr.client.CAS(ctx, key, &consulPipe{}, func(in interface{}) (interface{}, bool, error) {
- var pipe *consulPipe
- if in == nil {
- pipe = &consulPipe{
- CreatedAt: mtime.Now(),
- }
- } else {
- pipe = in.(*consulPipe)
- }
- if !pipe.DeletedAt.IsZero() {
- return nil, false, fmt.Errorf("Pipe %s has been deleted", key)
- }
- end := pipe.addrFor(e)
- if end != "" && end != pr.advertise {
- return nil, true, fmt.Errorf("Error: Pipe %s has existing connection to %s", key, end)
- }
- pipe.setAddrFor(e, pr.advertise)
- pipe.acquire(e)
- return pipe, false, nil
- })
- if err != nil {
- return nil, nil, err
- }
- pipe := pr.waitForPipe(key)
- myEnd, _ := pipe.Ends()
- if e == app.ProbeEnd {
- _, myEnd = pipe.Ends()
- }
- return pipe, myEnd, nil
- }
- func (pr *consulPipeRouter) Release(ctx context.Context, id string, e app.End) error {
- userID, err := pr.userIDer(ctx)
- if err != nil {
- return err
- }
- key := fmt.Sprintf("%s%s-%s", pr.prefix, userID, id)
- log.Infof("Release %s:%s", key, e)
- span, ctx := opentracing.StartSpanFromContext(ctx, "PipeRouter Release", opentracing.Tag{Key: "key", Value: key})
- defer span.Finish()
- // atomically clear my end of the pipe in consul
- return pr.client.CAS(ctx, key, &consulPipe{}, func(in interface{}) (interface{}, bool, error) {
- if in == nil {
- return nil, false, fmt.Errorf("pipe %s not found", id)
- }
- p := in.(*consulPipe)
- if p.addrFor(e) != pr.advertise {
- return nil, false, fmt.Errorf("pipe %s not owned by us", id)
- }
- refs := p.release(e)
- if refs == 0 {
- p.setAddrFor(e, "")
- }
- return p, true, nil
- })
- }
- func (pr *consulPipeRouter) Delete(ctx context.Context, id string) error {
- userID, err := pr.userIDer(ctx)
- if err != nil {
- return err
- }
- key := fmt.Sprintf("%s%s-%s", pr.prefix, userID, id)
- log.Infof("Delete %s", key)
- span, ctx := opentracing.StartSpanFromContext(ctx, "PipeRouter Delete", opentracing.Tag{Key: "key", Value: key})
- defer span.Finish()
- return pr.client.CAS(ctx, key, &consulPipe{}, func(in interface{}) (interface{}, bool, error) {
- if in == nil {
- return nil, false, fmt.Errorf("Pipe %s not found", id)
- }
- p := in.(*consulPipe)
- p.DeletedAt = mtime.Now()
- return p, false, nil
- })
- }
- // A bridgeConnection represents a connection between two pipe router replicas.
- // They are created & destroyed in response to events from consul, which in turn
- // are triggered when UIs or Probes connect to various pipe routers.
- type bridgeConnection struct {
- key string
- addr string // address to connect to
- pipe xfer.Pipe
- mtx sync.Mutex
- conn xfer.Websocket
- stopped bool
- wait sync.WaitGroup
- }
- func newBridgeConnection(key, addr string, pipe xfer.Pipe) *bridgeConnection {
- log.Infof("%s: Starting client bridge connection", key)
- result := &bridgeConnection{
- key: key,
- addr: addr,
- pipe: pipe,
- }
- result.wait.Add(1)
- go result.loop()
- return result
- }
- func (bc *bridgeConnection) stop() {
- log.Infof("%s: Stopping client bridge connection", bc.key)
- bc.mtx.Lock()
- bc.stopped = true
- if bc.conn != nil {
- bc.conn.Close()
- end, _ := bc.pipe.Ends()
- end.Write(nil) // this will cause the other end of wake up and exit
- }
- bc.mtx.Unlock()
- bc.wait.Wait()
- }
- func (bc *bridgeConnection) loop() {
- log.Infof("%s: Client bridge connection started", bc.key)
- defer bc.wait.Done()
- defer log.Infof("%s: Client bridge connection stopped", bc.key)
- _, end := bc.pipe.Ends()
- url := fmt.Sprintf("ws://%s/private/api/pipe/%s", bc.addr, url.QueryEscape(bc.key))
- for {
- bc.mtx.Lock()
- bc.conn = nil
- if bc.stopped {
- bc.mtx.Unlock()
- return
- }
- bc.mtx.Unlock()
- // connect to other pipes instance
- conn, _, err := xfer.DialWS(wsDialer, url, http.Header{})
- if err != nil {
- log.Errorf("%s: Client bridge connection; Error connecting to %s: %v", bc.key, url, err)
- time.Sleep(time.Second) // TODO backoff
- continue
- }
- bc.mtx.Lock()
- if bc.stopped {
- bc.mtx.Unlock()
- conn.Close()
- return
- }
- bc.conn = conn
- bc.mtx.Unlock()
- if _, err := bc.pipe.CopyToWebsocket(end, conn); err != nil {
- log.Errorf("%s: Client bridge connection; Error copying pipe to websocket: %v", bc.key, err)
- }
- conn.Close()
- }
- }
|