123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176 |
- package xfer
- import (
- "io"
- "net/http"
- "sync"
- "time"
- "github.com/gorilla/websocket"
- log "github.com/sirupsen/logrus"
- "github.com/ugorji/go/codec"
- "github.com/weaveworks/common/mtime"
- )
- const (
- // Time allowed to write a message to the peer.
- writeWait = 10 * time.Second
- // Time allowed to read the next pong message from the peer. Needs to be less
- // than the idle timeout on whatever frontend server is proxying the
- // websocket connections (e.g. nginx).
- pongWait = 60 * time.Second
- // Send pings to peer with this period. Must be less than pongWait. The peer
- // must respond with a pong in < pongWait. But it may take writeWait for the
- // pong to be sent. Therefore we want to allow time for that, and a bit of
- // delay/round-trip in case the peer is busy. 1/3 of pongWait seems like a
- // reasonable amount of time to respond to a ping.
- pingPeriod = ((pongWait - writeWait) * 2 / 3)
- )
- // Websocket exposes the bits of *websocket.Conn we actually use.
- type Websocket interface {
- ReadMessage() (messageType int, p []byte, err error)
- WriteMessage(messageType int, data []byte) error
- ReadJSON(v interface{}) error
- WriteJSON(v interface{}) error
- Close() error
- }
- type pingingWebsocket struct {
- pinger *time.Timer
- readLock sync.Mutex
- writeLock sync.Mutex
- conn *websocket.Conn
- }
- var upgrader = websocket.Upgrader{
- CheckOrigin: func(r *http.Request) bool { return true },
- }
- // Upgrade upgrades the HTTP server connection to the WebSocket protocol.
- func Upgrade(w http.ResponseWriter, r *http.Request, responseHeader http.Header) (Websocket, error) {
- wsConn, err := upgrader.Upgrade(w, r, responseHeader)
- if err != nil {
- return nil, err
- }
- return Ping(wsConn), nil
- }
- // WSDialer can dial a new websocket
- type WSDialer interface {
- Dial(urlStr string, requestHeader http.Header) (*websocket.Conn, *http.Response, error)
- }
- // DialWS creates a new client connection. Use requestHeader to specify the
- // origin (Origin), subprotocols (Sec-WebSocket-Protocol) and cookies (Cookie).
- // Use the response.Header to get the selected subprotocol
- // (Sec-WebSocket-Protocol) and cookies (Set-Cookie).
- func DialWS(d WSDialer, urlStr string, requestHeader http.Header) (Websocket, *http.Response, error) {
- wsConn, resp, err := d.Dial(urlStr, requestHeader)
- if err != nil {
- return nil, resp, err
- }
- return Ping(wsConn), resp, nil
- }
- // Ping adds a periodic ping to a websocket connection.
- func Ping(c *websocket.Conn) Websocket {
- p := &pingingWebsocket{conn: c}
- p.conn.SetPongHandler(p.pong)
- p.conn.SetReadDeadline(mtime.Now().Add(pongWait))
- p.pinger = time.AfterFunc(pingPeriod, p.ping)
- return p
- }
- func (p *pingingWebsocket) ping() {
- p.writeLock.Lock()
- defer p.writeLock.Unlock()
- if err := p.conn.WriteControl(websocket.PingMessage, nil, mtime.Now().Add(writeWait)); err != nil {
- log.Errorf("websocket ping error: %v", err)
- p.conn.Close()
- return
- }
- p.pinger.Reset(pingPeriod)
- }
- func (p *pingingWebsocket) pong(string) error {
- p.conn.SetReadDeadline(mtime.Now().Add(pongWait))
- return nil
- }
- // ReadMessage is a helper method for getting a reader using NextReader and
- // reading from that reader to a buffer.
- func (p *pingingWebsocket) ReadMessage() (int, []byte, error) {
- p.readLock.Lock()
- defer p.readLock.Unlock()
- return p.conn.ReadMessage()
- }
- // WriteMessage is a helper method for getting a writer using NextWriter,
- // writing the message and closing the writer.
- func (p *pingingWebsocket) WriteMessage(messageType int, data []byte) error {
- p.writeLock.Lock()
- defer p.writeLock.Unlock()
- if err := p.conn.SetWriteDeadline(mtime.Now().Add(writeWait)); err != nil {
- return err
- }
- return p.conn.WriteMessage(messageType, data)
- }
- // WriteJSON writes the JSON encoding of v to the connection.
- func (p *pingingWebsocket) WriteJSON(v interface{}) error {
- p.writeLock.Lock()
- defer p.writeLock.Unlock()
- w, err := p.conn.NextWriter(websocket.TextMessage)
- if err != nil {
- return err
- }
- if err := p.conn.SetWriteDeadline(mtime.Now().Add(writeWait)); err != nil {
- return err
- }
- err1 := codec.NewEncoder(w, &codec.JsonHandle{}).Encode(v)
- err2 := w.Close()
- if err1 != nil {
- return err1
- }
- return err2
- }
- // ReadJSON reads the next JSON-encoded message from the connection and stores
- // it in the value pointed to by v.
- func (p *pingingWebsocket) ReadJSON(v interface{}) error {
- p.readLock.Lock()
- defer p.readLock.Unlock()
- _, r, err := p.conn.NextReader()
- if err != nil {
- return err
- }
- err = codec.NewDecoder(r, &codec.JsonHandle{}).Decode(v)
- if err == io.EOF {
- // One value is expected in the message.
- err = io.ErrUnexpectedEOF
- }
- return err
- }
- // Close closes the connection
- func (p *pingingWebsocket) Close() error {
- p.writeLock.Lock()
- defer p.writeLock.Unlock()
- p.pinger.Stop()
- return p.conn.Close()
- }
- // IsExpectedWSCloseError returns boolean indicating whether the error is a
- // clean disconnection.
- func IsExpectedWSCloseError(err error) bool {
- return err == io.EOF || err == io.ErrClosedPipe || websocket.IsCloseError(err,
- websocket.CloseNormalClosure,
- websocket.CloseGoingAway,
- websocket.CloseNoStatusReceived,
- websocket.CloseAbnormalClosure,
- )
- }
|