wavefront_parser.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package wavefrontreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/wavefrontreceiver"
  4. import (
  5. "fmt"
  6. "strconv"
  7. "strings"
  8. "time"
  9. "go.opentelemetry.io/collector/pdata/pcommon"
  10. "go.opentelemetry.io/collector/pdata/pmetric"
  11. "github.com/open-telemetry/opentelemetry-collector-contrib/internal/collectd"
  12. "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver/protocol"
  13. )
  // WavefrontParser turns lines written in the Wavefront metrics data format
  // (https://docs.wavefront.com/wavefront_data_format.html#metrics-data-format-syntax)
  // into Collector pdata metrics. The same value doubles as the parser's
  // configuration.
  type WavefrontParser struct {
  	// ExtractCollectdTags, when true, makes Parse pull CollectD-style labels
  	// out of the metric name and store them as data point attributes.
  	ExtractCollectdTags bool `mapstructure:"extract_collectd_tags"`
  }
  20. var _ (protocol.Parser) = (*WavefrontParser)(nil)
  21. var _ (protocol.ParserConfig) = (*WavefrontParser)(nil)
  // escapedCharReplacer undoes the only two escape sequences the Wavefront SDK
  // writes inside double-quoted values, see
  // https://github.com/wavefrontHQ/wavefront-sdk-go/blob/2c5891318fcd83c35c93bba2b411640495473333/senders/formatter.go#L20
  // Any other backslash sequence is left untouched.
  var escapedCharReplacer = strings.NewReplacer(
  	`\"`, `"`, // escaped double quote -> literal double quote
  	`\n`, "\n", // escaped newline -> newline character
  )
  28. // BuildParser creates a new Parser instance that receives Wavefront metric data.
  29. func (wp *WavefrontParser) BuildParser() (protocol.Parser, error) {
  30. return wp, nil
  31. }
  32. // Parse receives the string with Wavefront metric data, and transforms it to
  33. // the collector metric format. See
  34. // https://docs.wavefront.com/wavefront_data_format.html#metrics-data-format-syntax.
  35. //
  36. // Each line received represents a Wavefront metric in the following format:
  37. //
  38. // "<metricName> <metricValue> [<timestamp>] source=<source> [pointTags]"
  39. //
  40. // Detailed description of each element is available on the link above.
  41. func (wp *WavefrontParser) Parse(line string) (pmetric.Metric, error) {
  42. parts := strings.SplitN(line, " ", 3)
  43. if len(parts) < 3 {
  44. return pmetric.Metric{}, fmt.Errorf("invalid wavefront metric [%s]", line)
  45. }
  46. metricName := unDoubleQuote(parts[0])
  47. if metricName == "" {
  48. return pmetric.Metric{}, fmt.Errorf("empty name for wavefront metric [%s]", line)
  49. }
  50. valueStr := parts[1]
  51. rest := parts[2]
  52. parts = strings.SplitN(rest, " ", 2)
  53. timestampStr := parts[0]
  54. var tags string
  55. if len(parts) == 2 {
  56. tags = parts[1]
  57. }
  58. var ts time.Time
  59. if unixTime, err := strconv.ParseInt(timestampStr, 10, 64); err == nil {
  60. ts = time.Unix(unixTime, 0)
  61. } else {
  62. // Timestamp can be omitted so it is only correct if the string was a tag.
  63. if strings.IndexByte(timestampStr, '=') == -1 {
  64. return pmetric.Metric{}, fmt.Errorf(
  65. "invalid timestamp for wavefront metric [%s]", line)
  66. }
  67. // Assume timestamp was omitted, get current time and adjust index.
  68. ts = time.Now()
  69. tags = rest
  70. }
  71. attributes := pcommon.NewMap()
  72. if tags != "" {
  73. // to need for special treatment for source, treat it as a normal tag since
  74. // tags are separated by space and are optionally double-quoted.
  75. if err := buildLabels(attributes, tags); err != nil {
  76. return pmetric.Metric{}, fmt.Errorf("invalid wavefront metric [%s]: %w", line, err)
  77. }
  78. }
  79. if wp.ExtractCollectdTags {
  80. metricName = wp.injectCollectDLabels(metricName, attributes)
  81. }
  82. metric := pmetric.NewMetric()
  83. metric.SetName(metricName)
  84. dp := metric.SetEmptyGauge().DataPoints().AppendEmpty()
  85. dp.SetTimestamp(pcommon.NewTimestampFromTime(ts))
  86. attributes.CopyTo(dp.Attributes())
  87. if intVal, err := strconv.ParseInt(valueStr, 10, 64); err == nil {
  88. dp.SetIntValue(intVal)
  89. } else {
  90. dblVal, err := strconv.ParseFloat(valueStr, 64)
  91. if err != nil {
  92. return pmetric.Metric{}, fmt.Errorf("invalid wavefront metric value [%s]: %w", line, err)
  93. }
  94. dp.SetDoubleValue(dblVal)
  95. }
  96. return metric, nil
  97. }
  98. func (wp *WavefrontParser) injectCollectDLabels(
  99. metricName string,
  100. attributes pcommon.Map,
  101. ) string {
  102. // This comes from SignalFx Gateway code that has the capability to
  103. // remove CollectD tags from the name of the metric.
  104. var toAddDims map[string]string
  105. index := strings.Index(metricName, "..")
  106. for {
  107. metricName, toAddDims = collectd.LabelsFromName(&metricName)
  108. if len(toAddDims) == 0 {
  109. if index == -1 {
  110. metricName = strings.ReplaceAll(metricName, "..", ".")
  111. }
  112. break
  113. }
  114. for k, v := range toAddDims {
  115. attributes.PutStr(k, v)
  116. }
  117. }
  118. return metricName
  119. }
  120. func buildLabels(attributes pcommon.Map, tags string) (err error) {
  121. if tags == "" {
  122. return
  123. }
  124. for {
  125. parts := strings.SplitN(tags, "=", 2)
  126. if len(parts) != 2 {
  127. return fmt.Errorf("failed to break key for [%s]", tags)
  128. }
  129. key := parts[0]
  130. rest := parts[1]
  131. tagLen := len(key) + 1 // Length of key plus separator and yet to be determined length of the value.
  132. var value string
  133. if len(rest) > 1 && rest[0] == '"' {
  134. // Skip until non-escaped double quote.
  135. foundEscape := false
  136. i := 1
  137. for ; i < len(rest); i++ {
  138. if rest[i] != '"' && rest[i] != 'n' {
  139. continue
  140. }
  141. isPrevCharEscape := rest[i-1] == '\\'
  142. if rest[i] == '"' && !isPrevCharEscape {
  143. // Non-escaped double-quote, it is the end of the value.
  144. break
  145. }
  146. foundEscape = foundEscape || isPrevCharEscape
  147. }
  148. value = rest[1:i]
  149. tagLen += len(value) + 2 // plus 2 to account for the double-quotes.
  150. if foundEscape {
  151. // Per implementation of Wavefront SDK only double-quotes and
  152. // newline characters are escaped. See the link below:
  153. // https://github.com/wavefrontHQ/wavefront-sdk-go/blob/2c5891318fcd83c35c93bba2b411640495473333/senders/formatter.go#L20
  154. value = escapedCharReplacer.Replace(value)
  155. }
  156. } else {
  157. // Skip until space.
  158. i := 0
  159. for ; i < len(rest) && rest[i] != ' '; i++ { // nolint
  160. }
  161. value = rest[:i]
  162. tagLen += i
  163. }
  164. attributes.PutStr(key, value)
  165. tags = strings.TrimLeft(tags[tagLen:], " ")
  166. if tags == "" {
  167. break
  168. }
  169. }
  170. return
  171. }
  // unDoubleQuote strips one pair of surrounding double quotes from s. Strings
  // shorter than two bytes, or not both starting and ending with '"', are
  // returned unchanged.
  func unDoubleQuote(s string) string {
  	quoted := len(s) >= 2 && strings.HasPrefix(s, `"`) && strings.HasSuffix(s, `"`)
  	if !quoted {
  		return s
  	}
  	return s[1 : len(s)-1]
  }