123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206 |
- // Copyright The OpenTelemetry Authors
- // SPDX-License-Identifier: Apache-2.0
- package wavefrontreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/wavefrontreceiver"
- import (
- "fmt"
- "strconv"
- "strings"
- "time"
- "go.opentelemetry.io/collector/pdata/pcommon"
- "go.opentelemetry.io/collector/pdata/pmetric"
- "github.com/open-telemetry/opentelemetry-collector-contrib/internal/collectd"
- "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver/protocol"
- )
// WavefrontParser turns text lines in the Wavefront metric data format into
// Collector metrics. The wire format is described at
// https://docs.wavefront.com/wavefront_data_format.html#metrics-data-format-syntax.
type WavefrontParser struct {
	// ExtractCollectdTags, when true, makes Parse run the metric name through
	// injectCollectDLabels so that labels embedded in the name are added to
	// the data point attributes.
	ExtractCollectdTags bool `mapstructure:"extract_collectd_tags"`
}
- var _ (protocol.Parser) = (*WavefrontParser)(nil)
- var _ (protocol.ParserConfig) = (*WavefrontParser)(nil)
// escapedCharReplacer undoes the escaping applied to quoted tag values.
// Only two characters can be escaped per the Wavefront SDK, see
// https://github.com/wavefrontHQ/wavefront-sdk-go/blob/2c5891318fcd83c35c93bba2b411640495473333/senders/formatter.go#L20
var escapedCharReplacer = strings.NewReplacer(
	// An escaped double-quote becomes a plain double-quote.
	`\"`, `"`,
	// An escaped new-line (backslash followed by 'n') becomes a real new-line.
	`\n`, "\n",
)
- // BuildParser creates a new Parser instance that receives Wavefront metric data.
- func (wp *WavefrontParser) BuildParser() (protocol.Parser, error) {
- return wp, nil
- }
- // Parse receives the string with Wavefront metric data, and transforms it to
- // the collector metric format. See
- // https://docs.wavefront.com/wavefront_data_format.html#metrics-data-format-syntax.
- //
- // Each line received represents a Wavefront metric in the following format:
- //
- // "<metricName> <metricValue> [<timestamp>] source=<source> [pointTags]"
- //
- // Detailed description of each element is available on the link above.
- func (wp *WavefrontParser) Parse(line string) (pmetric.Metric, error) {
- parts := strings.SplitN(line, " ", 3)
- if len(parts) < 3 {
- return pmetric.Metric{}, fmt.Errorf("invalid wavefront metric [%s]", line)
- }
- metricName := unDoubleQuote(parts[0])
- if metricName == "" {
- return pmetric.Metric{}, fmt.Errorf("empty name for wavefront metric [%s]", line)
- }
- valueStr := parts[1]
- rest := parts[2]
- parts = strings.SplitN(rest, " ", 2)
- timestampStr := parts[0]
- var tags string
- if len(parts) == 2 {
- tags = parts[1]
- }
- var ts time.Time
- if unixTime, err := strconv.ParseInt(timestampStr, 10, 64); err == nil {
- ts = time.Unix(unixTime, 0)
- } else {
- // Timestamp can be omitted so it is only correct if the string was a tag.
- if strings.IndexByte(timestampStr, '=') == -1 {
- return pmetric.Metric{}, fmt.Errorf(
- "invalid timestamp for wavefront metric [%s]", line)
- }
- // Assume timestamp was omitted, get current time and adjust index.
- ts = time.Now()
- tags = rest
- }
- attributes := pcommon.NewMap()
- if tags != "" {
- // to need for special treatment for source, treat it as a normal tag since
- // tags are separated by space and are optionally double-quoted.
- if err := buildLabels(attributes, tags); err != nil {
- return pmetric.Metric{}, fmt.Errorf("invalid wavefront metric [%s]: %w", line, err)
- }
- }
- if wp.ExtractCollectdTags {
- metricName = wp.injectCollectDLabels(metricName, attributes)
- }
- metric := pmetric.NewMetric()
- metric.SetName(metricName)
- dp := metric.SetEmptyGauge().DataPoints().AppendEmpty()
- dp.SetTimestamp(pcommon.NewTimestampFromTime(ts))
- attributes.CopyTo(dp.Attributes())
- if intVal, err := strconv.ParseInt(valueStr, 10, 64); err == nil {
- dp.SetIntValue(intVal)
- } else {
- dblVal, err := strconv.ParseFloat(valueStr, 64)
- if err != nil {
- return pmetric.Metric{}, fmt.Errorf("invalid wavefront metric value [%s]: %w", line, err)
- }
- dp.SetDoubleValue(dblVal)
- }
- return metric, nil
- }
- func (wp *WavefrontParser) injectCollectDLabels(
- metricName string,
- attributes pcommon.Map,
- ) string {
- // This comes from SignalFx Gateway code that has the capability to
- // remove CollectD tags from the name of the metric.
- var toAddDims map[string]string
- index := strings.Index(metricName, "..")
- for {
- metricName, toAddDims = collectd.LabelsFromName(&metricName)
- if len(toAddDims) == 0 {
- if index == -1 {
- metricName = strings.ReplaceAll(metricName, "..", ".")
- }
- break
- }
- for k, v := range toAddDims {
- attributes.PutStr(k, v)
- }
- }
- return metricName
- }
- func buildLabels(attributes pcommon.Map, tags string) (err error) {
- if tags == "" {
- return
- }
- for {
- parts := strings.SplitN(tags, "=", 2)
- if len(parts) != 2 {
- return fmt.Errorf("failed to break key for [%s]", tags)
- }
- key := parts[0]
- rest := parts[1]
- tagLen := len(key) + 1 // Length of key plus separator and yet to be determined length of the value.
- var value string
- if len(rest) > 1 && rest[0] == '"' {
- // Skip until non-escaped double quote.
- foundEscape := false
- i := 1
- for ; i < len(rest); i++ {
- if rest[i] != '"' && rest[i] != 'n' {
- continue
- }
- isPrevCharEscape := rest[i-1] == '\\'
- if rest[i] == '"' && !isPrevCharEscape {
- // Non-escaped double-quote, it is the end of the value.
- break
- }
- foundEscape = foundEscape || isPrevCharEscape
- }
- value = rest[1:i]
- tagLen += len(value) + 2 // plus 2 to account for the double-quotes.
- if foundEscape {
- // Per implementation of Wavefront SDK only double-quotes and
- // newline characters are escaped. See the link below:
- // https://github.com/wavefrontHQ/wavefront-sdk-go/blob/2c5891318fcd83c35c93bba2b411640495473333/senders/formatter.go#L20
- value = escapedCharReplacer.Replace(value)
- }
- } else {
- // Skip until space.
- i := 0
- for ; i < len(rest) && rest[i] != ' '; i++ { // nolint
- }
- value = rest[:i]
- tagLen += i
- }
- attributes.PutStr(key, value)
- tags = strings.TrimLeft(tags[tagLen:], " ")
- if tags == "" {
- break
- }
- }
- return
- }
// unDoubleQuote strips one pair of enclosing double-quotes from s. Strings
// shorter than two bytes, or not both starting and ending with a
// double-quote, are returned unchanged.
func unDoubleQuote(s string) string {
	last := len(s) - 1
	if last >= 1 && s[0] == '"' && s[last] == '"' {
		return s[1:last]
	}
	return s
}
|