collectd.go 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package collectdreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/collectdreceiver"
  4. import (
  5. "encoding/json"
  6. "fmt"
  7. "time"
  8. "go.opentelemetry.io/collector/pdata/pcommon"
  9. "go.opentelemetry.io/collector/pdata/pmetric"
  10. "go.uber.org/zap"
  11. "github.com/open-telemetry/opentelemetry-collector-contrib/internal/collectd"
  12. "github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/sanitize"
  13. )
  14. type collectDRecord struct {
  15. Dsnames []*string `json:"dsnames"`
  16. Dstypes []*string `json:"dstypes"`
  17. Host *string `json:"host"`
  18. Interval *float64 `json:"interval"`
  19. Plugin *string `json:"plugin"`
  20. PluginInstance *string `json:"plugin_instance"`
  21. Time *float64 `json:"time"`
  22. TypeS *string `json:"type"`
  23. TypeInstance *string `json:"type_instance"`
  24. Values []*json.Number `json:"values"`
  25. Message *string `json:"message"`
  26. Meta map[string]any `json:"meta"`
  27. Severity *string `json:"severity"`
  28. }
  29. type createMetricInfo struct {
  30. Name string
  31. DsType *string
  32. Val *json.Number
  33. }
  34. func (cdr *collectDRecord) isEvent() bool {
  35. return cdr.Time != nil && cdr.Severity != nil && cdr.Message != nil
  36. }
  37. func (cdr *collectDRecord) protoTime() pcommon.Timestamp {
  38. // Return 1970-01-01 00:00:00 +0000 UTC.
  39. if cdr.Time == nil {
  40. return pcommon.NewTimestampFromTime(time.Unix(0, 0))
  41. }
  42. ts := time.Unix(0, int64(float64(time.Second)**cdr.Time))
  43. return pcommon.NewTimestampFromTime(ts)
  44. }
  45. func (cdr *collectDRecord) startTimestamp(metricType string) pcommon.Timestamp {
  46. if metricType == "cumulative" {
  47. return pcommon.NewTimestampFromTime(time.Unix(0, int64((*cdr.Time-*cdr.Interval)*float64(time.Second))))
  48. }
  49. return pcommon.NewTimestampFromTime(time.Unix(0, 0))
  50. }
  51. func (cdr *collectDRecord) appendToMetrics(logger *zap.Logger, scopeMetrics pmetric.ScopeMetrics, defaultLabels map[string]string) error {
  52. // Ignore if record is an event instead of data point
  53. if cdr.isEvent() {
  54. logger.Debug("ignoring log event", zap.String("message", *cdr.Message))
  55. return nil
  56. }
  57. labels := make(map[string]string, len(defaultLabels))
  58. for k, v := range defaultLabels {
  59. labels[k] = v
  60. }
  61. for i := range cdr.Dsnames {
  62. if i < len(cdr.Dstypes) && i < len(cdr.Values) && cdr.Values[i] != nil {
  63. dsType, dsName, val := cdr.Dstypes[i], cdr.Dsnames[i], cdr.Values[i]
  64. metricName, usedDsName := cdr.getReasonableMetricName(i, labels)
  65. createMetric := createMetricInfo{
  66. Name: metricName,
  67. DsType: dsType,
  68. Val: val,
  69. }
  70. addIfNotNullOrEmpty(labels, "plugin", cdr.Plugin)
  71. parseAndAddLabels(labels, cdr.PluginInstance, cdr.Host)
  72. if !usedDsName {
  73. addIfNotNullOrEmpty(labels, "dsname", dsName)
  74. }
  75. metric, err := cdr.newMetric(createMetric, labels)
  76. if err != nil {
  77. return fmt.Errorf("error processing metric %s: %w", sanitize.String(metricName), err)
  78. }
  79. newMetric := scopeMetrics.Metrics().AppendEmpty()
  80. metric.MoveTo(newMetric)
  81. }
  82. }
  83. return nil
  84. }
  85. // Create new metric, get labels, then setting attribute and metric info
  86. func (cdr *collectDRecord) newMetric(createMetric createMetricInfo, labels map[string]string) (pmetric.Metric, error) {
  87. attributes := setAttributes(labels)
  88. metric, err := cdr.setMetric(createMetric, attributes)
  89. if err != nil {
  90. return pmetric.Metric{}, fmt.Errorf("error processing metric %s: %w", createMetric.Name, err)
  91. }
  92. return metric, nil
  93. }
  94. func setAttributes(labels map[string]string) pcommon.Map {
  95. attributes := pcommon.NewMap()
  96. for k, v := range labels {
  97. attributes.PutStr(k, v)
  98. }
  99. return attributes
  100. }
  101. // Set new metric info with name, datapoint, time, attributes
  102. func (cdr *collectDRecord) setMetric(createMetric createMetricInfo, atr pcommon.Map) (pmetric.Metric, error) {
  103. typ := ""
  104. metric := pmetric.NewMetric()
  105. if createMetric.DsType != nil {
  106. typ = *createMetric.DsType
  107. }
  108. metric.SetName(createMetric.Name)
  109. dataPoint := setDataPoint(typ, metric)
  110. dataPoint.SetTimestamp(cdr.protoTime())
  111. atr.CopyTo(dataPoint.Attributes())
  112. if val, err := createMetric.Val.Int64(); err == nil {
  113. dataPoint.SetIntValue(val)
  114. } else if val, err := createMetric.Val.Float64(); err == nil {
  115. dataPoint.SetDoubleValue(val)
  116. } else {
  117. return pmetric.Metric{}, fmt.Errorf("value could not be decoded: %w", err)
  118. }
  119. return metric, nil
  120. }
  121. // check type to decide metric type and return data point
  122. func setDataPoint(typ string, metric pmetric.Metric) pmetric.NumberDataPoint {
  123. var dataPoint pmetric.NumberDataPoint
  124. switch typ {
  125. case "derive", "counter":
  126. sum := metric.SetEmptySum()
  127. sum.SetIsMonotonic(true)
  128. dataPoint = sum.DataPoints().AppendEmpty()
  129. default:
  130. dataPoint = metric.SetEmptyGauge().DataPoints().AppendEmpty()
  131. }
  132. return dataPoint
  133. }
  134. // getReasonableMetricName creates metrics names by joining them (if non empty) type.typeinstance
  135. // if there are more than one dsname append .dsname for the particular uint. if there's only one it
  136. // becomes a dimension.
  137. func (cdr *collectDRecord) getReasonableMetricName(index int, attrs map[string]string) (string, bool) {
  138. usedDsName := false
  139. capacity := 0
  140. if cdr.TypeS != nil {
  141. capacity += len(*cdr.TypeS)
  142. }
  143. if cdr.TypeInstance != nil {
  144. capacity += len(*cdr.TypeInstance)
  145. }
  146. parts := make([]byte, 0, capacity)
  147. if !isNilOrEmpty(cdr.TypeS) {
  148. parts = append(parts, *cdr.TypeS...)
  149. }
  150. parts = cdr.pointTypeInstance(attrs, parts)
  151. if cdr.Dsnames != nil && !isNilOrEmpty(cdr.Dsnames[index]) && len(cdr.Dsnames) > 1 {
  152. if len(parts) > 0 {
  153. parts = append(parts, '.')
  154. }
  155. parts = append(parts, *cdr.Dsnames[index]...)
  156. usedDsName = true
  157. }
  158. return string(parts), usedDsName
  159. }
  160. // pointTypeInstance extracts information from the TypeInstance field and appends to the metric name when possible.
  161. func (cdr *collectDRecord) pointTypeInstance(attrs map[string]string, parts []byte) []byte {
  162. if isNilOrEmpty(cdr.TypeInstance) {
  163. return parts
  164. }
  165. instanceName, extractedAttrs := collectd.LabelsFromName(cdr.TypeInstance)
  166. if instanceName != "" {
  167. if len(parts) > 0 {
  168. parts = append(parts, '.')
  169. }
  170. parts = append(parts, instanceName...)
  171. }
  172. for k, v := range extractedAttrs {
  173. if _, exists := attrs[k]; !exists {
  174. val := v
  175. addIfNotNullOrEmpty(attrs, k, &val)
  176. }
  177. }
  178. return parts
  179. }
  180. func isNilOrEmpty(str *string) bool {
  181. return str == nil || *str == ""
  182. }
  183. func addIfNotNullOrEmpty(m map[string]string, key string, val *string) {
  184. if val != nil && *val != "" {
  185. m[key] = *val
  186. }
  187. }
  188. func parseAndAddLabels(labels map[string]string, pluginInstance *string, host *string) {
  189. parseNameForLabels(labels, "plugin_instance", pluginInstance)
  190. parseNameForLabels(labels, "host", host)
  191. }
  192. func parseNameForLabels(labels map[string]string, key string, val *string) {
  193. instanceName, toAddDims := collectd.LabelsFromName(val)
  194. for k, v := range toAddDims {
  195. if _, exists := labels[k]; !exists {
  196. val := v
  197. addIfNotNullOrEmpty(labels, k, &val)
  198. }
  199. }
  200. addIfNotNullOrEmpty(labels, key, &instanceName)
  201. }