unmarshal.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package internal // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/lokireceiver/internal"
  4. import (
  5. "io"
  6. "sort"
  7. "strconv"
  8. "strings"
  9. "time"
  10. "unsafe"
  11. "github.com/buger/jsonparser"
  12. "github.com/grafana/loki/pkg/push"
  13. jsoniter "github.com/json-iterator/go"
  14. )
  15. // PushRequest models a log stream push but is unmarshalled to proto push format.
  16. type PushRequest struct {
  17. Streams []Stream `json:"streams"`
  18. }
  19. // Stream helps with unmarshalling of each log stream for push request.
  20. type Stream push.Stream
  21. func (s *Stream) UnmarshalJSON(data []byte) error {
  22. err := jsonparser.ObjectEach(data, func(key, val []byte, ty jsonparser.ValueType, _ int) error {
  23. switch string(key) {
  24. case "stream":
  25. var labels LabelSet
  26. if err := labels.UnmarshalJSON(val); err != nil {
  27. return err
  28. }
  29. s.Labels = labels.String()
  30. case "values":
  31. if ty == jsonparser.Null {
  32. return nil
  33. }
  34. entries, err := unmarshalHTTPToLogProtoEntries(val)
  35. if err != nil {
  36. return err
  37. }
  38. s.Entries = entries
  39. }
  40. return nil
  41. })
  42. return err
  43. }
  44. func unmarshalHTTPToLogProtoEntries(data []byte) ([]push.Entry, error) {
  45. var (
  46. entries []push.Entry
  47. parseError error
  48. )
  49. if _, err := jsonparser.ArrayEach(data, func(value []byte, ty jsonparser.ValueType, _ int, err error) {
  50. if err != nil || parseError != nil {
  51. return
  52. }
  53. if ty == jsonparser.Null {
  54. return
  55. }
  56. e, err := unmarshalHTTPToLogProtoEntry(value)
  57. if err != nil {
  58. parseError = err
  59. return
  60. }
  61. entries = append(entries, e)
  62. }); err != nil {
  63. parseError = err
  64. }
  65. if parseError != nil {
  66. return nil, parseError
  67. }
  68. return entries, nil
  69. }
  70. func unmarshalHTTPToLogProtoEntry(data []byte) (push.Entry, error) {
  71. var (
  72. i int
  73. parseError error
  74. e push.Entry
  75. )
  76. _, err := jsonparser.ArrayEach(data, func(value []byte, t jsonparser.ValueType, _ int, _ error) {
  77. // assert that both items in array are of type string
  78. if (i == 0 || i == 1) && t != jsonparser.String {
  79. parseError = jsonparser.MalformedStringError
  80. return
  81. } else if i == 2 && t != jsonparser.Object {
  82. parseError = jsonparser.MalformedObjectError
  83. return
  84. }
  85. switch i {
  86. case 0: // timestamp
  87. ts, err := jsonparser.ParseInt(value)
  88. if err != nil {
  89. parseError = err
  90. return
  91. }
  92. e.Timestamp = time.Unix(0, ts)
  93. case 1: // value
  94. v, err := jsonparser.ParseString(value)
  95. if err != nil {
  96. parseError = err
  97. return
  98. }
  99. e.Line = v
  100. case 2: // structuredMetadata
  101. var structuredMetadata []push.LabelAdapter
  102. err := jsonparser.ObjectEach(value, func(key, val []byte, dataType jsonparser.ValueType, _ int) error {
  103. if dataType != jsonparser.String {
  104. return jsonparser.MalformedStringError
  105. }
  106. structuredMetadata = append(structuredMetadata, push.LabelAdapter{
  107. Name: string(key),
  108. Value: string(val),
  109. })
  110. return nil
  111. })
  112. if err != nil {
  113. parseError = err
  114. return
  115. }
  116. e.StructuredMetadata = structuredMetadata
  117. }
  118. i++
  119. })
  120. if parseError != nil {
  121. return e, parseError
  122. }
  123. return e, err
  124. }
  125. // LabelSet is a key/value pair mapping of labels
  126. type LabelSet map[string]string
  127. func (l *LabelSet) UnmarshalJSON(data []byte) error {
  128. if *l == nil {
  129. *l = make(LabelSet)
  130. }
  131. return jsonparser.ObjectEach(data, func(key, val []byte, _ jsonparser.ValueType, _ int) error {
  132. v, err := jsonparser.ParseString(val)
  133. if err != nil {
  134. return err
  135. }
  136. k, err := jsonparser.ParseString(key)
  137. if err != nil {
  138. return err
  139. }
  140. (*l)[k] = v
  141. return nil
  142. })
  143. }
  144. // String implements the Stringer interface. It returns a formatted/sorted set of label key/value pairs.
  145. func (l LabelSet) String() string {
  146. var b strings.Builder
  147. keys := make([]string, 0, len(l))
  148. for k := range l {
  149. keys = append(keys, k)
  150. }
  151. sort.Strings(keys)
  152. b.WriteByte('{')
  153. for i, k := range keys {
  154. if i > 0 {
  155. b.WriteByte(',')
  156. b.WriteByte(' ')
  157. }
  158. b.WriteString(k)
  159. b.WriteByte('=')
  160. b.WriteString(strconv.Quote(l[k]))
  161. }
  162. b.WriteByte('}')
  163. return b.String()
  164. }
  165. // decodePushRequest directly decodes json to a push.PushRequest
  166. func decodePushRequest(b io.Reader, r *push.PushRequest) error {
  167. var request PushRequest
  168. if err := jsoniter.NewDecoder(b).Decode(&request); err != nil {
  169. return err
  170. }
  171. *r = push.PushRequest{
  172. Streams: *(*[]push.Stream)(unsafe.Pointer(&request.Streams)),
  173. }
  174. return nil
  175. }