consul_pipe_router_internal_test.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203
  1. package multitenant
  2. import (
  3. "bytes"
  4. "context"
  5. "fmt"
  6. "io"
  7. "log"
  8. "math/rand"
  9. "testing"
  10. "golang.org/x/sync/errgroup"
  11. "github.com/weaveworks/scope/app"
  12. "github.com/weaveworks/scope/common/xfer"
  13. "github.com/weaveworks/scope/probe/appclient"
  14. )
  15. type adapter struct {
  16. c appclient.AppClient
  17. }
  18. func (a adapter) PipeConnection(_, pipeID string, pipe xfer.Pipe) error {
  19. a.c.PipeConnection(pipeID, pipe)
  20. return nil
  21. }
  22. func (a adapter) PipeClose(_, pipeID string) error {
  23. return a.c.PipeClose(pipeID)
  24. }
  25. type pipeconn struct {
  26. id string
  27. uiPR, probePR app.PipeRouter
  28. uiPipe, probePipe xfer.Pipe
  29. uiIO, probeIO io.ReadWriter
  30. }
  31. func (p *pipeconn) test(t *testing.T) {
  32. msg := []byte("hello " + p.id)
  33. wait := errgroup.Group{}
  34. wait.Go(func() error {
  35. // write something to the probe end
  36. _, err := p.probeIO.Write(msg)
  37. return err
  38. })
  39. wait.Go(func() error {
  40. // read it back off the other end
  41. buf := make([]byte, len(msg))
  42. n, err := p.uiIO.Read(buf)
  43. if err != nil {
  44. return err
  45. }
  46. if n != len(buf) {
  47. return fmt.Errorf("only read %d", n)
  48. }
  49. if !bytes.Equal(buf, msg) {
  50. return fmt.Errorf("Got: %v, Expected: %v", buf, msg)
  51. }
  52. return nil
  53. })
  54. err := wait.Wait()
  55. if err != nil {
  56. t.Fatal(err)
  57. }
  58. }
  59. type pipeTest struct {
  60. prs []app.PipeRouter
  61. pipes []*pipeconn
  62. }
  63. func (pt *pipeTest) newPipe(t *testing.T) {
  64. // make a new pipe id
  65. id := fmt.Sprintf("pipe-%d", rand.Int63())
  66. log.Printf(">>>> newPipe %s", id)
  67. // pick a random PR to connect app to
  68. uiIndex := rand.Intn(len(pt.prs))
  69. uiPR := pt.prs[uiIndex]
  70. uiPipe, uiIO, err := uiPR.Get(context.Background(), id, app.UIEnd)
  71. if err != nil {
  72. t.Fatal(err)
  73. }
  74. // pick a random PR to connect probe to
  75. probeIndex := rand.Intn(len(pt.prs))
  76. for probeIndex == uiIndex {
  77. probeIndex = rand.Intn(len(pt.prs))
  78. }
  79. probePR := pt.prs[probeIndex]
  80. probePipe, probeIO, err := probePR.Get(context.Background(), id, app.ProbeEnd)
  81. if err != nil {
  82. t.Fatal(err)
  83. }
  84. pipe := &pipeconn{
  85. id: id,
  86. uiPR: uiPR,
  87. uiPipe: uiPipe,
  88. uiIO: uiIO,
  89. probePR: probePR,
  90. probePipe: probePipe,
  91. probeIO: probeIO,
  92. }
  93. pipe.test(t)
  94. pt.pipes = append(pt.pipes, pipe)
  95. }
  96. func (pt *pipeTest) deletePipe(t *testing.T) {
  97. // pick a random pipe
  98. i := rand.Intn(len(pt.pipes))
  99. pipe := pt.pipes[i]
  100. log.Printf(">>>> deletePipe %s", pipe.id)
  101. if err := pipe.uiPR.Release(context.Background(), pipe.id, app.UIEnd); err != nil {
  102. t.Fatal(err)
  103. }
  104. if err := pipe.probePR.Release(context.Background(), pipe.id, app.ProbeEnd); err != nil {
  105. t.Fatal(err)
  106. }
  107. // remove from list
  108. pt.pipes = pt.pipes[:i+copy(pt.pipes[i:], pt.pipes[i+1:])]
  109. }
  110. func (pt *pipeTest) reconnectPipe(t *testing.T) {
  111. // pick a random pipe
  112. pipe := pt.pipes[rand.Intn(len(pt.pipes))]
  113. log.Printf(">>>> reconnectPipe %s", pipe.id)
  114. // pick a random PR to connect to
  115. newPR := pt.prs[rand.Intn(len(pt.prs))]
  116. // pick a random end
  117. if rand.Float32() < 0.5 {
  118. if err := pipe.uiPR.Release(context.Background(), pipe.id, app.UIEnd); err != nil {
  119. t.Fatal(err)
  120. }
  121. uiPipe, uiIO, err := newPR.Get(context.Background(), pipe.id, app.UIEnd)
  122. if err != nil {
  123. t.Fatal(err)
  124. }
  125. pipe.uiPR, pipe.uiPipe, pipe.uiIO = newPR, uiPipe, uiIO
  126. } else {
  127. if err := pipe.probePR.Release(context.Background(), pipe.id, app.ProbeEnd); err != nil {
  128. t.Fatal(err)
  129. }
  130. probePipe, probeIO, err := newPR.Get(context.Background(), pipe.id, app.ProbeEnd)
  131. if err != nil {
  132. t.Fatal(err)
  133. }
  134. pipe.probePR, pipe.probePipe, pipe.probeIO = newPR, probePipe, probeIO
  135. }
  136. }
  137. func TestPipeRouter(t *testing.T) {
  138. var (
  139. consul = newMockConsulClient()
  140. replicas = 2
  141. iterations = 10
  142. pt = pipeTest{}
  143. )
  144. for i := 0; i < replicas; i++ {
  145. pr := NewConsulPipeRouter(consul, "", fmt.Sprintf("127.0.0.1:44%02d", i), NoopUserIDer)
  146. defer pr.Stop()
  147. pt.prs = append(pt.prs, pr)
  148. }
  149. for i := 0; i < iterations; i++ {
  150. log.Printf("Iteration %d", i)
  151. pt.newPipe(t)
  152. pt.deletePipe(t)
  153. }
  154. }
  155. //func TestPipeHard(t *testing.T) {
  156. // if len(pipes) <= 0 {
  157. // newPipe()
  158. // continue
  159. // } else if len(pipes) >= 2 {
  160. // deletePipe()
  161. // continue
  162. // }
  163. // r := rand.Float32()
  164. // switch {
  165. // case 0.0 < r && r <= 0.3:
  166. // newPipe()
  167. // case 0.3 < r && r <= 0.6:
  168. // deletePipe()
  169. // case 0.6 < r && r <= 1.0:
  170. // reconnectPipe()
  171. // }
  172. //}