123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167 |
- /*
- Copyright 2015 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- // Package framer implements simple frame decoding techniques for an io.ReadCloser
- package framer
- import (
- "encoding/binary"
- "encoding/json"
- "io"
- )
- type lengthDelimitedFrameWriter struct {
- w io.Writer
- h [4]byte
- }
- func NewLengthDelimitedFrameWriter(w io.Writer) io.Writer {
- return &lengthDelimitedFrameWriter{w: w}
- }
- // Write writes a single frame to the nested writer, prepending it with the length in
- // in bytes of data (as a 4 byte, bigendian uint32).
- func (w *lengthDelimitedFrameWriter) Write(data []byte) (int, error) {
- binary.BigEndian.PutUint32(w.h[:], uint32(len(data)))
- n, err := w.w.Write(w.h[:])
- if err != nil {
- return 0, err
- }
- if n != len(w.h) {
- return 0, io.ErrShortWrite
- }
- return w.w.Write(data)
- }
- type lengthDelimitedFrameReader struct {
- r io.ReadCloser
- remaining int
- }
- // NewLengthDelimitedFrameReader returns an io.Reader that will decode length-prefixed
- // frames off of a stream.
- //
- // The protocol is:
- //
- // stream: message ...
- // message: prefix body
- // prefix: 4 byte uint32 in BigEndian order, denotes length of body
- // body: bytes (0..prefix)
- //
- // If the buffer passed to Read is not long enough to contain an entire frame, io.ErrShortRead
- // will be returned along with the number of bytes read.
- func NewLengthDelimitedFrameReader(r io.ReadCloser) io.ReadCloser {
- return &lengthDelimitedFrameReader{r: r}
- }
- // Read attempts to read an entire frame into data. If that is not possible, io.ErrShortBuffer
- // is returned and subsequent calls will attempt to read the last frame. A frame is complete when
- // err is nil.
- func (r *lengthDelimitedFrameReader) Read(data []byte) (int, error) {
- if r.remaining <= 0 {
- header := [4]byte{}
- n, err := io.ReadAtLeast(r.r, header[:4], 4)
- if err != nil {
- return 0, err
- }
- if n != 4 {
- return 0, io.ErrUnexpectedEOF
- }
- frameLength := int(binary.BigEndian.Uint32(header[:]))
- r.remaining = frameLength
- }
- expect := r.remaining
- max := expect
- if max > len(data) {
- max = len(data)
- }
- n, err := io.ReadAtLeast(r.r, data[:max], int(max))
- r.remaining -= n
- if err == io.ErrShortBuffer || r.remaining > 0 {
- return n, io.ErrShortBuffer
- }
- if err != nil {
- return n, err
- }
- if n != expect {
- return n, io.ErrUnexpectedEOF
- }
- return n, nil
- }
- func (r *lengthDelimitedFrameReader) Close() error {
- return r.r.Close()
- }
- type jsonFrameReader struct {
- r io.ReadCloser
- decoder *json.Decoder
- remaining []byte
- }
- // NewJSONFramedReader returns an io.Reader that will decode individual JSON objects off
- // of a wire.
- //
- // The boundaries between each frame are valid JSON objects. A JSON parsing error will terminate
- // the read.
- func NewJSONFramedReader(r io.ReadCloser) io.ReadCloser {
- return &jsonFrameReader{
- r: r,
- decoder: json.NewDecoder(r),
- }
- }
- // ReadFrame decodes the next JSON object in the stream, or returns an error. The returned
- // byte slice will be modified the next time ReadFrame is invoked and should not be altered.
- func (r *jsonFrameReader) Read(data []byte) (int, error) {
- // Return whatever remaining data exists from an in progress frame
- if n := len(r.remaining); n > 0 {
- if n <= len(data) {
- data = append(data[0:0], r.remaining...)
- r.remaining = nil
- return n, nil
- }
- n = len(data)
- data = append(data[0:0], r.remaining[:n]...)
- r.remaining = r.remaining[n:]
- return n, io.ErrShortBuffer
- }
- // RawMessage#Unmarshal appends to data - we reset the slice down to 0 and will either see
- // data written to data, or be larger than data and a different array.
- n := len(data)
- m := json.RawMessage(data[:0])
- if err := r.decoder.Decode(&m); err != nil {
- return 0, err
- }
- // If capacity of data is less than length of the message, decoder will allocate a new slice
- // and set m to it, which means we need to copy the partial result back into data and preserve
- // the remaining result for subsequent reads.
- if len(m) > n {
- data = append(data[0:0], m[:n]...)
- r.remaining = m[n:]
- return n, io.ErrShortBuffer
- }
- return len(m), nil
- }
- func (r *jsonFrameReader) Close() error {
- return r.r.Close()
- }
|