util.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package internal // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/lokireceiver/internal"
  4. import (
  5. "bytes"
  6. "fmt"
  7. "io"
  8. "github.com/gogo/protobuf/proto"
  9. "github.com/golang/snappy"
  10. )
  11. const messageSizeLargerErrFmt = "received message larger than max (%d vs %d)"
  12. // parseProtoReader parses a compressed proto from an io.Reader.
  13. func parseProtoReader(reader io.Reader, expectedSize, maxSize int, req proto.Message) error {
  14. body, err := decompressRequest(reader, expectedSize, maxSize)
  15. if err != nil {
  16. return err
  17. }
  18. // We re-implement proto.Unmarshal here as it calls XXX_Unmarshal first,
  19. // which we can't override without upsetting golint.
  20. req.Reset()
  21. if u, ok := req.(proto.Unmarshaler); ok {
  22. err = u.Unmarshal(body)
  23. } else {
  24. err = proto.NewBuffer(body).Unmarshal(req)
  25. }
  26. if err != nil {
  27. return err
  28. }
  29. return nil
  30. }
  31. func decompressRequest(reader io.Reader, expectedSize, maxSize int) (body []byte, err error) {
  32. defer func() {
  33. if err != nil && len(body) > maxSize {
  34. err = fmt.Errorf(messageSizeLargerErrFmt, len(body), maxSize)
  35. }
  36. }()
  37. if expectedSize > maxSize {
  38. return nil, fmt.Errorf(messageSizeLargerErrFmt, expectedSize, maxSize)
  39. }
  40. buffer, ok := tryBufferFromReader(reader)
  41. if ok {
  42. body, err = decompressFromBuffer(buffer, maxSize)
  43. return
  44. }
  45. body, err = decompressFromReader(reader, expectedSize, maxSize)
  46. return
  47. }
  48. func decompressFromReader(reader io.Reader, expectedSize, maxSize int) ([]byte, error) {
  49. var (
  50. buf bytes.Buffer
  51. body []byte
  52. err error
  53. )
  54. if expectedSize > 0 {
  55. buf.Grow(expectedSize + bytes.MinRead) // extra space guarantees no reallocation
  56. }
  57. // Read from LimitReader with limit max+1. So if the underlying
  58. // reader is over limit, the result will be bigger than max.
  59. reader = io.LimitReader(reader, int64(maxSize)+1)
  60. _, err = buf.ReadFrom(reader)
  61. if err != nil {
  62. return nil, err
  63. }
  64. body, err = decompressFromBuffer(&buf, maxSize)
  65. return body, err
  66. }
  67. func decompressFromBuffer(buffer *bytes.Buffer, maxSize int) ([]byte, error) {
  68. if len(buffer.Bytes()) > maxSize {
  69. return nil, fmt.Errorf(messageSizeLargerErrFmt, len(buffer.Bytes()), maxSize)
  70. }
  71. size, err := snappy.DecodedLen(buffer.Bytes())
  72. if err != nil {
  73. return nil, err
  74. }
  75. if size > maxSize {
  76. return nil, fmt.Errorf(messageSizeLargerErrFmt, size, maxSize)
  77. }
  78. body, err := snappy.Decode(nil, buffer.Bytes())
  79. if err != nil {
  80. return nil, err
  81. }
  82. return body, nil
  83. }
  84. // tryBufferFromReader attempts to cast the reader to a `*bytes.Buffer` this is possible when using httpgrpc.
  85. // If it fails it will return nil and false.
  86. func tryBufferFromReader(reader io.Reader) (*bytes.Buffer, bool) {
  87. if bufReader, ok := reader.(interface {
  88. BytesBuffer() *bytes.Buffer
  89. }); ok && bufReader != nil {
  90. return bufReader.BytesBuffer(), true
  91. }
  92. return nil, false
  93. }