123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143 |
- package app
- import (
- "bytes"
- "fmt"
- "net"
- "net/http"
- "net/http/httptest"
- "net/url"
- "strings"
- "testing"
- "time"
- "context"
- "github.com/gorilla/mux"
- "github.com/gorilla/websocket"
- "github.com/weaveworks/common/mtime"
- "github.com/weaveworks/scope/common/xfer"
- "github.com/weaveworks/scope/probe/appclient"
- "github.com/weaveworks/scope/probe/controls"
- "github.com/weaveworks/scope/test"
- )
- func TestPipeTimeout(t *testing.T) {
- router := mux.NewRouter()
- pr := NewLocalPipeRouter().(*localPipeRouter)
- RegisterPipeRoutes(router, pr)
- pr.Stop() // we don't want the loop running in the background
- mtime.NowForce(time.Now())
- defer mtime.NowReset()
- // create a new pipe.
- id := "foo"
- ctx := context.Background()
- pipe, _, err := pr.Get(ctx, id, UIEnd)
- if err != nil {
- t.Fatalf("not ok: %v", err)
- }
- // move time forward such that the new pipe should timeout
- mtime.NowForce(mtime.Now().Add(pipeTimeout))
- pr.timeout()
- if !pipe.Closed() {
- t.Fatalf("pipe didn't timeout")
- }
- // move time forward such that the pipe should be GCd
- mtime.NowForce(mtime.Now().Add(gcTimeout))
- pr.garbageCollect()
- if _, ok := pr.pipes[id]; ok {
- t.Fatalf("pipe not gc'd")
- }
- }
- type adapter struct {
- c appclient.AppClient
- }
- func (a adapter) PipeConnection(_, pipeID string, pipe xfer.Pipe) error {
- a.c.PipeConnection(pipeID, pipe)
- return nil
- }
- func (a adapter) PipeClose(_, pipeID string) error {
- return a.c.PipeClose(pipeID)
- }
- func TestPipeClose(t *testing.T) {
- router := mux.NewRouter()
- pr := NewLocalPipeRouter()
- RegisterPipeRoutes(router, pr)
- defer pr.Stop()
- server := httptest.NewServer(router)
- defer server.Close()
- ip, port, err := net.SplitHostPort(strings.TrimPrefix(server.URL, "http://"))
- if err != nil {
- t.Fatal(err)
- }
- probeConfig := appclient.ProbeConfig{
- ProbeID: "foo",
- }
- url := url.URL{Scheme: "http", Host: ip + ":" + port}
- client, err := appclient.NewAppClient(probeConfig, ip+":"+port, url, nil)
- if err != nil {
- t.Fatal(err)
- }
- defer client.Stop()
- // this is the probe end of the pipe
- pipeID, pipe, err := controls.NewPipe(adapter{client}, "appid")
- if err != nil {
- t.Fatal(err)
- }
- // this is a client to the app
- pipeURL := fmt.Sprintf("ws://%s:%s/api/pipe/%s", ip, port, pipeID)
- conn, _, err := websocket.DefaultDialer.Dial(pipeURL, http.Header{})
- if err != nil {
- t.Fatal(err)
- }
- // Send something from pipe -> app -> conn
- local, _ := pipe.Ends()
- msg := []byte("hello world")
- if _, err := local.Write(msg); err != nil {
- t.Fatal(err)
- }
- if _, buf, err := conn.ReadMessage(); err != nil {
- t.Fatal(err)
- } else if !bytes.Equal(buf, msg) {
- t.Fatalf("%v != %v", buf, msg)
- }
- // Send something from conn -> app -> probe
- msg = []byte("goodbye, cruel world")
- if err := conn.WriteMessage(websocket.BinaryMessage, msg); err != nil {
- t.Fatal(err)
- }
- buf := make([]byte, 1024)
- if n, err := local.Read(buf); err != nil {
- t.Fatal(err)
- } else if !bytes.Equal(msg, buf[:n]) {
- t.Fatalf("%v != %v", buf, msg)
- }
- // Now delete the pipe
- if err := pipe.Close(); err != nil {
- t.Fatal(err)
- }
- // the client backs off for 1 second before trying to reconnect the pipe,
- // so we need to wait for longer.
- test.Poll(t, 2*time.Second, true, func() interface{} {
- return pipe.Closed()
- })
- }
|