123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203 |
- package multitenant
- import (
- "bytes"
- "context"
- "fmt"
- "io"
- "log"
- "math/rand"
- "testing"
- "golang.org/x/sync/errgroup"
- "github.com/weaveworks/scope/app"
- "github.com/weaveworks/scope/common/xfer"
- "github.com/weaveworks/scope/probe/appclient"
- )
- type adapter struct {
- c appclient.AppClient
- }
- func (a adapter) PipeConnection(_, pipeID string, pipe xfer.Pipe) error {
- a.c.PipeConnection(pipeID, pipe)
- return nil
- }
- func (a adapter) PipeClose(_, pipeID string) error {
- return a.c.PipeClose(pipeID)
- }
- type pipeconn struct {
- id string
- uiPR, probePR app.PipeRouter
- uiPipe, probePipe xfer.Pipe
- uiIO, probeIO io.ReadWriter
- }
- func (p *pipeconn) test(t *testing.T) {
- msg := []byte("hello " + p.id)
- wait := errgroup.Group{}
- wait.Go(func() error {
- // write something to the probe end
- _, err := p.probeIO.Write(msg)
- return err
- })
- wait.Go(func() error {
- // read it back off the other end
- buf := make([]byte, len(msg))
- n, err := p.uiIO.Read(buf)
- if err != nil {
- return err
- }
- if n != len(buf) {
- return fmt.Errorf("only read %d", n)
- }
- if !bytes.Equal(buf, msg) {
- return fmt.Errorf("Got: %v, Expected: %v", buf, msg)
- }
- return nil
- })
- err := wait.Wait()
- if err != nil {
- t.Fatal(err)
- }
- }
- type pipeTest struct {
- prs []app.PipeRouter
- pipes []*pipeconn
- }
- func (pt *pipeTest) newPipe(t *testing.T) {
- // make a new pipe id
- id := fmt.Sprintf("pipe-%d", rand.Int63())
- log.Printf(">>>> newPipe %s", id)
- // pick a random PR to connect app to
- uiIndex := rand.Intn(len(pt.prs))
- uiPR := pt.prs[uiIndex]
- uiPipe, uiIO, err := uiPR.Get(context.Background(), id, app.UIEnd)
- if err != nil {
- t.Fatal(err)
- }
- // pick a random PR to connect probe to
- probeIndex := rand.Intn(len(pt.prs))
- for probeIndex == uiIndex {
- probeIndex = rand.Intn(len(pt.prs))
- }
- probePR := pt.prs[probeIndex]
- probePipe, probeIO, err := probePR.Get(context.Background(), id, app.ProbeEnd)
- if err != nil {
- t.Fatal(err)
- }
- pipe := &pipeconn{
- id: id,
- uiPR: uiPR,
- uiPipe: uiPipe,
- uiIO: uiIO,
- probePR: probePR,
- probePipe: probePipe,
- probeIO: probeIO,
- }
- pipe.test(t)
- pt.pipes = append(pt.pipes, pipe)
- }
- func (pt *pipeTest) deletePipe(t *testing.T) {
- // pick a random pipe
- i := rand.Intn(len(pt.pipes))
- pipe := pt.pipes[i]
- log.Printf(">>>> deletePipe %s", pipe.id)
- if err := pipe.uiPR.Release(context.Background(), pipe.id, app.UIEnd); err != nil {
- t.Fatal(err)
- }
- if err := pipe.probePR.Release(context.Background(), pipe.id, app.ProbeEnd); err != nil {
- t.Fatal(err)
- }
- // remove from list
- pt.pipes = pt.pipes[:i+copy(pt.pipes[i:], pt.pipes[i+1:])]
- }
- func (pt *pipeTest) reconnectPipe(t *testing.T) {
- // pick a random pipe
- pipe := pt.pipes[rand.Intn(len(pt.pipes))]
- log.Printf(">>>> reconnectPipe %s", pipe.id)
- // pick a random PR to connect to
- newPR := pt.prs[rand.Intn(len(pt.prs))]
- // pick a random end
- if rand.Float32() < 0.5 {
- if err := pipe.uiPR.Release(context.Background(), pipe.id, app.UIEnd); err != nil {
- t.Fatal(err)
- }
- uiPipe, uiIO, err := newPR.Get(context.Background(), pipe.id, app.UIEnd)
- if err != nil {
- t.Fatal(err)
- }
- pipe.uiPR, pipe.uiPipe, pipe.uiIO = newPR, uiPipe, uiIO
- } else {
- if err := pipe.probePR.Release(context.Background(), pipe.id, app.ProbeEnd); err != nil {
- t.Fatal(err)
- }
- probePipe, probeIO, err := newPR.Get(context.Background(), pipe.id, app.ProbeEnd)
- if err != nil {
- t.Fatal(err)
- }
- pipe.probePR, pipe.probePipe, pipe.probeIO = newPR, probePipe, probeIO
- }
- }
- func TestPipeRouter(t *testing.T) {
- var (
- consul = newMockConsulClient()
- replicas = 2
- iterations = 10
- pt = pipeTest{}
- )
- for i := 0; i < replicas; i++ {
- pr := NewConsulPipeRouter(consul, "", fmt.Sprintf("127.0.0.1:44%02d", i), NoopUserIDer)
- defer pr.Stop()
- pt.prs = append(pt.prs, pr)
- }
- for i := 0; i < iterations; i++ {
- log.Printf("Iteration %d", i)
- pt.newPipe(t)
- pt.deletePipe(t)
- }
- }
- //func TestPipeHard(t *testing.T) {
- // if len(pipes) <= 0 {
- // newPipe()
- // continue
- // } else if len(pipes) >= 2 {
- // deletePipe()
- // continue
- // }
- // r := rand.Float32()
- // switch {
- // case 0.0 < r && r <= 0.3:
- // newPipe()
- // case 0.3 < r && r <= 0.6:
- // deletePipe()
- // case 0.6 < r && r <= 1.0:
- // reconnectPipe()
- // }
- //}
|