consul_pipe_router.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455
  1. package multitenant
  2. import (
  3. "fmt"
  4. "io"
  5. "net/http"
  6. "net/url"
  7. "sync"
  8. "time"
  9. "context"
  10. "github.com/gorilla/mux"
  11. "github.com/gorilla/websocket"
  12. opentracing "github.com/opentracing/opentracing-go"
  13. log "github.com/sirupsen/logrus"
  14. "github.com/weaveworks/common/middleware"
  15. "github.com/weaveworks/common/mtime"
  16. "github.com/weaveworks/scope/app"
  17. "github.com/weaveworks/scope/common/xfer"
  18. )
  19. const (
  20. gcInterval = 30 * time.Second // we check all the pipes every 30s
  21. pipeTimeout = 1 * time.Minute // pipes are closed when a client hasn't been connected for 1 minute
  22. gcTimeout = 10 * time.Minute // after another 10 minutes, tombstoned pipes are forgotten
  23. )
  24. var (
  25. wsDialer = &websocket.Dialer{}
  26. )
  27. // TODO deal with garbage collection
  28. type consulPipe struct {
  29. CreatedAt, DeletedAt time.Time
  30. UIAddr, ProbeAddr string // Addrs where each end is connected
  31. UIRef, ProbeRef int // Ref counts
  32. }
  33. func (c *consulPipe) setAddrFor(e app.End, addr string) {
  34. if e == app.UIEnd {
  35. c.UIAddr = addr
  36. } else {
  37. c.ProbeAddr = addr
  38. }
  39. }
  40. func (c *consulPipe) addrFor(e app.End) string {
  41. if e == app.UIEnd {
  42. return c.UIAddr
  43. }
  44. return c.ProbeAddr
  45. }
  46. func (c *consulPipe) eitherEndFor(addr string) bool {
  47. return c.addrFor(app.UIEnd) == addr || c.addrFor(app.ProbeEnd) == addr
  48. }
  49. func (c *consulPipe) acquire(e app.End) int {
  50. if e == app.UIEnd {
  51. c.UIRef++
  52. return c.UIRef
  53. }
  54. c.ProbeRef++
  55. return c.ProbeRef
  56. }
  57. func (c *consulPipe) release(e app.End) int {
  58. if e == app.UIEnd {
  59. c.UIRef--
  60. return c.UIRef
  61. }
  62. c.ProbeRef--
  63. return c.ProbeRef
  64. }
  65. type consulPipeRouter struct {
  66. prefix string
  67. advertise string // Address of this pipe router to advertise in consul
  68. client ConsulClient
  69. userIDer UserIDer
  70. activePipes map[string]xfer.Pipe
  71. bridges map[string]*bridgeConnection
  72. actorChan chan func()
  73. pipeWaiters map[string][]chan xfer.Pipe
  74. // Used by Stop()
  75. quit chan struct{}
  76. wait sync.WaitGroup
  77. }
  78. // NewConsulPipeRouter returns a new consul based router
  79. func NewConsulPipeRouter(client ConsulClient, prefix, advertise string, userIDer UserIDer) app.PipeRouter {
  80. pipeRouter := &consulPipeRouter{
  81. prefix: prefix,
  82. advertise: advertise,
  83. client: client,
  84. userIDer: userIDer,
  85. activePipes: map[string]xfer.Pipe{},
  86. bridges: map[string]*bridgeConnection{},
  87. actorChan: make(chan func()),
  88. pipeWaiters: map[string][]chan xfer.Pipe{},
  89. quit: make(chan struct{}),
  90. }
  91. pipeRouter.wait.Add(2)
  92. go pipeRouter.watchAll()
  93. go pipeRouter.actor()
  94. go pipeRouter.privateAPI()
  95. return pipeRouter
  96. }
  97. func (pr *consulPipeRouter) Stop() {
  98. close(pr.quit)
  99. pr.wait.Wait()
  100. }
  101. func (pr *consulPipeRouter) actor() {
  102. defer pr.wait.Done()
  103. for {
  104. select {
  105. case f := <-pr.actorChan:
  106. f()
  107. case <-pr.quit:
  108. return
  109. }
  110. }
  111. }
  112. // watchAll listens to all pipe updates from consul.
  113. // This is effectively a distributed, consistent actor routine.
  114. // All state changes for this pipe router happen in this loop,
  115. // and all the methods are implemented as CAS's on consul, to
  116. // trigger an event in this loop.
  117. func (pr *consulPipeRouter) watchAll() {
  118. defer pr.wait.Done()
  119. pr.client.WatchPrefix(pr.prefix, &consulPipe{}, pr.quit, func(key string, value interface{}) bool {
  120. cp := *value.(*consulPipe)
  121. select {
  122. case pr.actorChan <- func() { pr.handlePipeUpdate(key, cp) }:
  123. return true
  124. case <-pr.quit:
  125. return false
  126. }
  127. })
  128. }
  129. func (pr *consulPipeRouter) handlePipeUpdate(key string, cp consulPipe) {
  130. // 1. If this pipe is closed, or we're not one of the ends, we
  131. // should ensure our local pipe (and bridge) is closed.
  132. if !cp.DeletedAt.IsZero() || !cp.eitherEndFor(pr.advertise) {
  133. pipe, ok := pr.activePipes[key]
  134. delete(pr.activePipes, key)
  135. if ok {
  136. log.Infof("Deleting pipe %s", key)
  137. pipe.Close()
  138. }
  139. bridge, ok := pr.bridges[key]
  140. delete(pr.bridges, key)
  141. if ok {
  142. bridge.stop()
  143. }
  144. return
  145. }
  146. if !cp.eitherEndFor(pr.advertise) {
  147. return
  148. }
  149. // 2. If this pipe if for us, we should have a pipe for it.
  150. pipe, ok := pr.activePipes[key]
  151. if !ok {
  152. log.Infof("Creating pipe %s", key)
  153. pipe = xfer.NewPipe()
  154. pr.activePipes[key] = pipe
  155. for _, pw := range pr.pipeWaiters[key] {
  156. pw <- pipe
  157. }
  158. delete(pr.pipeWaiters, key)
  159. }
  160. // 3. Ensure there is a bridging connection for this pipe.
  161. // Semantics are the owner of the UIEnd connects to the owner of the ProbeEnd
  162. shouldBridge := cp.DeletedAt.IsZero() &&
  163. cp.addrFor(app.UIEnd) != cp.addrFor(app.ProbeEnd) &&
  164. cp.addrFor(app.UIEnd) == pr.advertise &&
  165. cp.addrFor(app.ProbeEnd) != ""
  166. bridge, ok := pr.bridges[key]
  167. // If we shouldn't be bridging but are, or we should be bridging but are pointing
  168. // at the wrong place, stop the current bridge.
  169. if (!shouldBridge && ok) || (shouldBridge && ok && bridge.addr != cp.addrFor(app.ProbeEnd)) {
  170. delete(pr.bridges, key)
  171. bridge.stop()
  172. ok = false
  173. }
  174. // If we should be bridging and are not, start a new bridge
  175. if shouldBridge && !ok {
  176. bridge = newBridgeConnection(key, cp.addrFor(app.ProbeEnd), pipe)
  177. pr.bridges[key] = bridge
  178. }
  179. }
  180. func (pr *consulPipeRouter) getPipe(key string) xfer.Pipe {
  181. pc := make(chan xfer.Pipe)
  182. select {
  183. case pr.actorChan <- func() { pc <- pr.activePipes[key] }:
  184. return <-pc
  185. case <-pr.quit:
  186. return nil
  187. }
  188. }
  189. func (pr *consulPipeRouter) waitForPipe(key string) xfer.Pipe {
  190. pc := make(chan xfer.Pipe)
  191. select {
  192. case pr.actorChan <- func() {
  193. pipe, ok := pr.activePipes[key]
  194. if ok {
  195. pc <- pipe
  196. } else {
  197. pr.pipeWaiters[key] = append(pr.pipeWaiters[key], pc)
  198. }
  199. }:
  200. return <-pc
  201. case <-pr.quit:
  202. return nil
  203. }
  204. }
  205. func (pr *consulPipeRouter) privateAPI() {
  206. router := mux.NewRouter()
  207. router.Methods("GET").
  208. MatcherFunc(app.URLMatcher("/private/api/pipe/{key}")).
  209. HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  210. key := mux.Vars(r)["key"]
  211. log.Infof("%s: Server bridge connection started", key)
  212. defer log.Infof("%s: Server bridge connection stopped", key)
  213. pipe := pr.getPipe(key)
  214. if pipe == nil {
  215. log.Errorf("%s: Server bridge connection; Unknown pipe!", key)
  216. w.WriteHeader(http.StatusNotFound)
  217. return
  218. }
  219. conn, err := xfer.Upgrade(w, r, nil)
  220. if err != nil {
  221. log.Errorf("%s: Server bridge connection; Error upgrading to websocket: %v", key, err)
  222. return
  223. }
  224. defer conn.Close()
  225. end, _ := pipe.Ends()
  226. if _, err := pipe.CopyToWebsocket(end, conn); err != nil {
  227. log.Errorf("%s: Server bridge connection; Error copying pipe to websocket: %v", key, err)
  228. }
  229. })
  230. handler := middleware.Tracer{RouteMatcher: router}.Wrap(router)
  231. log.Infof("Serving private API on endpoint %s.", pr.advertise)
  232. log.Infof("Private API terminated: %v", http.ListenAndServe(pr.advertise, handler))
  233. }
  234. func (pr *consulPipeRouter) Exists(ctx context.Context, id string) (bool, error) {
  235. userID, err := pr.userIDer(ctx)
  236. if err != nil {
  237. return false, err
  238. }
  239. key := fmt.Sprintf("%s%s-%s", pr.prefix, userID, id)
  240. consulPipe := consulPipe{}
  241. err = pr.client.Get(ctx, key, &consulPipe)
  242. if err == ErrNotFound {
  243. return false, nil
  244. } else if err != nil {
  245. return false, err
  246. }
  247. return consulPipe.DeletedAt.IsZero(), nil
  248. }
  249. func (pr *consulPipeRouter) Get(ctx context.Context, id string, e app.End) (xfer.Pipe, io.ReadWriter, error) {
  250. userID, err := pr.userIDer(ctx)
  251. if err != nil {
  252. return nil, nil, err
  253. }
  254. key := fmt.Sprintf("%s%s-%s", pr.prefix, userID, id)
  255. log.Infof("Get %s:%s", key, e)
  256. span, ctx := opentracing.StartSpanFromContext(ctx, "PipeRouter Get", opentracing.Tag{Key: "key", Value: key})
  257. defer span.Finish()
  258. // Try to ensure the given end of the given pipe
  259. // is 'owned' by this pipe service replica in consul.
  260. err = pr.client.CAS(ctx, key, &consulPipe{}, func(in interface{}) (interface{}, bool, error) {
  261. var pipe *consulPipe
  262. if in == nil {
  263. pipe = &consulPipe{
  264. CreatedAt: mtime.Now(),
  265. }
  266. } else {
  267. pipe = in.(*consulPipe)
  268. }
  269. if !pipe.DeletedAt.IsZero() {
  270. return nil, false, fmt.Errorf("Pipe %s has been deleted", key)
  271. }
  272. end := pipe.addrFor(e)
  273. if end != "" && end != pr.advertise {
  274. return nil, true, fmt.Errorf("Error: Pipe %s has existing connection to %s", key, end)
  275. }
  276. pipe.setAddrFor(e, pr.advertise)
  277. pipe.acquire(e)
  278. return pipe, false, nil
  279. })
  280. if err != nil {
  281. return nil, nil, err
  282. }
  283. pipe := pr.waitForPipe(key)
  284. myEnd, _ := pipe.Ends()
  285. if e == app.ProbeEnd {
  286. _, myEnd = pipe.Ends()
  287. }
  288. return pipe, myEnd, nil
  289. }
  290. func (pr *consulPipeRouter) Release(ctx context.Context, id string, e app.End) error {
  291. userID, err := pr.userIDer(ctx)
  292. if err != nil {
  293. return err
  294. }
  295. key := fmt.Sprintf("%s%s-%s", pr.prefix, userID, id)
  296. log.Infof("Release %s:%s", key, e)
  297. span, ctx := opentracing.StartSpanFromContext(ctx, "PipeRouter Release", opentracing.Tag{Key: "key", Value: key})
  298. defer span.Finish()
  299. // atomically clear my end of the pipe in consul
  300. return pr.client.CAS(ctx, key, &consulPipe{}, func(in interface{}) (interface{}, bool, error) {
  301. if in == nil {
  302. return nil, false, fmt.Errorf("pipe %s not found", id)
  303. }
  304. p := in.(*consulPipe)
  305. if p.addrFor(e) != pr.advertise {
  306. return nil, false, fmt.Errorf("pipe %s not owned by us", id)
  307. }
  308. refs := p.release(e)
  309. if refs == 0 {
  310. p.setAddrFor(e, "")
  311. }
  312. return p, true, nil
  313. })
  314. }
  315. func (pr *consulPipeRouter) Delete(ctx context.Context, id string) error {
  316. userID, err := pr.userIDer(ctx)
  317. if err != nil {
  318. return err
  319. }
  320. key := fmt.Sprintf("%s%s-%s", pr.prefix, userID, id)
  321. log.Infof("Delete %s", key)
  322. span, ctx := opentracing.StartSpanFromContext(ctx, "PipeRouter Delete", opentracing.Tag{Key: "key", Value: key})
  323. defer span.Finish()
  324. return pr.client.CAS(ctx, key, &consulPipe{}, func(in interface{}) (interface{}, bool, error) {
  325. if in == nil {
  326. return nil, false, fmt.Errorf("Pipe %s not found", id)
  327. }
  328. p := in.(*consulPipe)
  329. p.DeletedAt = mtime.Now()
  330. return p, false, nil
  331. })
  332. }
  333. // A bridgeConnection represents a connection between two pipe router replicas.
  334. // They are created & destroyed in response to events from consul, which in turn
  335. // are triggered when UIs or Probes connect to various pipe routers.
  336. type bridgeConnection struct {
  337. key string
  338. addr string // address to connect to
  339. pipe xfer.Pipe
  340. mtx sync.Mutex
  341. conn xfer.Websocket
  342. stopped bool
  343. wait sync.WaitGroup
  344. }
  345. func newBridgeConnection(key, addr string, pipe xfer.Pipe) *bridgeConnection {
  346. log.Infof("%s: Starting client bridge connection", key)
  347. result := &bridgeConnection{
  348. key: key,
  349. addr: addr,
  350. pipe: pipe,
  351. }
  352. result.wait.Add(1)
  353. go result.loop()
  354. return result
  355. }
  356. func (bc *bridgeConnection) stop() {
  357. log.Infof("%s: Stopping client bridge connection", bc.key)
  358. bc.mtx.Lock()
  359. bc.stopped = true
  360. if bc.conn != nil {
  361. bc.conn.Close()
  362. end, _ := bc.pipe.Ends()
  363. end.Write(nil) // this will cause the other end of wake up and exit
  364. }
  365. bc.mtx.Unlock()
  366. bc.wait.Wait()
  367. }
  368. func (bc *bridgeConnection) loop() {
  369. log.Infof("%s: Client bridge connection started", bc.key)
  370. defer bc.wait.Done()
  371. defer log.Infof("%s: Client bridge connection stopped", bc.key)
  372. _, end := bc.pipe.Ends()
  373. url := fmt.Sprintf("ws://%s/private/api/pipe/%s", bc.addr, url.QueryEscape(bc.key))
  374. for {
  375. bc.mtx.Lock()
  376. bc.conn = nil
  377. if bc.stopped {
  378. bc.mtx.Unlock()
  379. return
  380. }
  381. bc.mtx.Unlock()
  382. // connect to other pipes instance
  383. conn, _, err := xfer.DialWS(wsDialer, url, http.Header{})
  384. if err != nil {
  385. log.Errorf("%s: Client bridge connection; Error connecting to %s: %v", bc.key, url, err)
  386. time.Sleep(time.Second) // TODO backoff
  387. continue
  388. }
  389. bc.mtx.Lock()
  390. if bc.stopped {
  391. bc.mtx.Unlock()
  392. conn.Close()
  393. return
  394. }
  395. bc.conn = conn
  396. bc.mtx.Unlock()
  397. if _, err := bc.pipe.CopyToWebsocket(end, conn); err != nil {
  398. log.Errorf("%s: Client bridge connection; Error copying pipe to websocket: %v", bc.key, err)
  399. }
  400. conn.Close()
  401. }
  402. }