pipes_internal_test.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143
  1. package app
  2. import (
  3. "bytes"
  4. "fmt"
  5. "net"
  6. "net/http"
  7. "net/http/httptest"
  8. "net/url"
  9. "strings"
  10. "testing"
  11. "time"
  12. "context"
  13. "github.com/gorilla/mux"
  14. "github.com/gorilla/websocket"
  15. "github.com/weaveworks/common/mtime"
  16. "github.com/weaveworks/scope/common/xfer"
  17. "github.com/weaveworks/scope/probe/appclient"
  18. "github.com/weaveworks/scope/probe/controls"
  19. "github.com/weaveworks/scope/test"
  20. )
  21. func TestPipeTimeout(t *testing.T) {
  22. router := mux.NewRouter()
  23. pr := NewLocalPipeRouter().(*localPipeRouter)
  24. RegisterPipeRoutes(router, pr)
  25. pr.Stop() // we don't want the loop running in the background
  26. mtime.NowForce(time.Now())
  27. defer mtime.NowReset()
  28. // create a new pipe.
  29. id := "foo"
  30. ctx := context.Background()
  31. pipe, _, err := pr.Get(ctx, id, UIEnd)
  32. if err != nil {
  33. t.Fatalf("not ok: %v", err)
  34. }
  35. // move time forward such that the new pipe should timeout
  36. mtime.NowForce(mtime.Now().Add(pipeTimeout))
  37. pr.timeout()
  38. if !pipe.Closed() {
  39. t.Fatalf("pipe didn't timeout")
  40. }
  41. // move time forward such that the pipe should be GCd
  42. mtime.NowForce(mtime.Now().Add(gcTimeout))
  43. pr.garbageCollect()
  44. if _, ok := pr.pipes[id]; ok {
  45. t.Fatalf("pipe not gc'd")
  46. }
  47. }
  48. type adapter struct {
  49. c appclient.AppClient
  50. }
  51. func (a adapter) PipeConnection(_, pipeID string, pipe xfer.Pipe) error {
  52. a.c.PipeConnection(pipeID, pipe)
  53. return nil
  54. }
  55. func (a adapter) PipeClose(_, pipeID string) error {
  56. return a.c.PipeClose(pipeID)
  57. }
  58. func TestPipeClose(t *testing.T) {
  59. router := mux.NewRouter()
  60. pr := NewLocalPipeRouter()
  61. RegisterPipeRoutes(router, pr)
  62. defer pr.Stop()
  63. server := httptest.NewServer(router)
  64. defer server.Close()
  65. ip, port, err := net.SplitHostPort(strings.TrimPrefix(server.URL, "http://"))
  66. if err != nil {
  67. t.Fatal(err)
  68. }
  69. probeConfig := appclient.ProbeConfig{
  70. ProbeID: "foo",
  71. }
  72. url := url.URL{Scheme: "http", Host: ip + ":" + port}
  73. client, err := appclient.NewAppClient(probeConfig, ip+":"+port, url, nil)
  74. if err != nil {
  75. t.Fatal(err)
  76. }
  77. defer client.Stop()
  78. // this is the probe end of the pipe
  79. pipeID, pipe, err := controls.NewPipe(adapter{client}, "appid")
  80. if err != nil {
  81. t.Fatal(err)
  82. }
  83. // this is a client to the app
  84. pipeURL := fmt.Sprintf("ws://%s:%s/api/pipe/%s", ip, port, pipeID)
  85. conn, _, err := websocket.DefaultDialer.Dial(pipeURL, http.Header{})
  86. if err != nil {
  87. t.Fatal(err)
  88. }
  89. // Send something from pipe -> app -> conn
  90. local, _ := pipe.Ends()
  91. msg := []byte("hello world")
  92. if _, err := local.Write(msg); err != nil {
  93. t.Fatal(err)
  94. }
  95. if _, buf, err := conn.ReadMessage(); err != nil {
  96. t.Fatal(err)
  97. } else if !bytes.Equal(buf, msg) {
  98. t.Fatalf("%v != %v", buf, msg)
  99. }
  100. // Send something from conn -> app -> probe
  101. msg = []byte("goodbye, cruel world")
  102. if err := conn.WriteMessage(websocket.BinaryMessage, msg); err != nil {
  103. t.Fatal(err)
  104. }
  105. buf := make([]byte, 1024)
  106. if n, err := local.Read(buf); err != nil {
  107. t.Fatal(err)
  108. } else if !bytes.Equal(msg, buf[:n]) {
  109. t.Fatalf("%v != %v", buf, msg)
  110. }
  111. // Now delete the pipe
  112. if err := pipe.Close(); err != nil {
  113. t.Fatal(err)
  114. }
  115. // the client backs off for 1 second before trying to reconnect the pipe,
  116. // so we need to wait for longer.
  117. test.Poll(t, 2*time.Second, true, func() interface{} {
  118. return pipe.Closed()
  119. })
  120. }