grouped_metric.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package awsemfexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awsemfexporter"
  4. import (
  5. "encoding/json"
  6. "strings"
  7. "go.opentelemetry.io/collector/pdata/pmetric"
  8. "go.uber.org/zap"
  9. aws "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/metrics"
  10. )
  11. // groupedMetric defines set of metrics with same namespace, timestamp and labels
  12. type groupedMetric struct {
  13. labels map[string]string
  14. metrics map[string]*metricInfo
  15. metadata cWMetricMetadata
  16. }
  17. // metricInfo defines value and unit for OT Metrics
  18. type metricInfo struct {
  19. value any
  20. unit string
  21. }
  22. // addToGroupedMetric processes OT metrics and adds them into GroupedMetric buckets
  23. func addToGroupedMetric(pmd pmetric.Metric, groupedMetrics map[any]*groupedMetric, metadata cWMetricMetadata, patternReplaceSucceeded bool, logger *zap.Logger, descriptor map[string]MetricDescriptor, config *Config, calculators *emfCalculators) error {
  24. dps := getDataPoints(pmd, metadata, logger)
  25. if dps == nil || dps.Len() == 0 {
  26. return nil
  27. }
  28. for i := 0; i < dps.Len(); i++ {
  29. // Drop stale or NaN metric values
  30. if staleOrNan, attrs := dps.IsStaleOrNaN(i); staleOrNan {
  31. if config != nil && config.logger != nil {
  32. config.logger.Debug("dropped metric with nan value",
  33. zap.String("metric.name", pmd.Name()),
  34. zap.Any("metric.attributes", attrs))
  35. }
  36. continue
  37. }
  38. dps, retained := dps.CalculateDeltaDatapoints(i, metadata.instrumentationScopeName, config.DetailedMetrics, calculators)
  39. if !retained {
  40. continue
  41. }
  42. for _, dp := range dps {
  43. labels := dp.labels
  44. if metricType, ok := labels["Type"]; ok {
  45. if (metricType == "Pod" || metricType == "Container") && config.EKSFargateContainerInsightsEnabled {
  46. addKubernetesWrapper(labels)
  47. }
  48. }
  49. // if patterns were found in config file and weren't replaced by resource attributes, replace those patterns with metric labels.
  50. // if patterns are provided for a valid key and that key doesn't exist in the resource attributes, it is replaced with `undefined`.
  51. if !patternReplaceSucceeded {
  52. if strings.Contains(metadata.logGroup, "undefined") {
  53. metadata.logGroup, _ = replacePatterns(config.LogGroupName, labels, config.logger)
  54. }
  55. if strings.Contains(metadata.logStream, "undefined") {
  56. metadata.logStream, _ = replacePatterns(config.LogStreamName, labels, config.logger)
  57. }
  58. }
  59. metric := &metricInfo{
  60. value: dp.value,
  61. unit: translateUnit(pmd, descriptor),
  62. }
  63. if dp.timestampMs > 0 {
  64. metadata.timestampMs = dp.timestampMs
  65. }
  66. // Extra params to use when grouping metrics
  67. groupKey := aws.NewKey(metadata.groupedMetricMetadata, labels)
  68. if _, ok := groupedMetrics[groupKey]; ok {
  69. // if MetricName already exists in metrics map, print warning log
  70. if _, ok := groupedMetrics[groupKey].metrics[dp.name]; ok {
  71. logger.Warn(
  72. "Duplicate metric found",
  73. zap.String("Name", dp.name),
  74. zap.Any("Labels", labels),
  75. )
  76. } else {
  77. groupedMetrics[groupKey].metrics[dp.name] = metric
  78. }
  79. } else {
  80. groupedMetrics[groupKey] = &groupedMetric{
  81. labels: labels,
  82. metrics: map[string]*metricInfo{(dp.name): metric},
  83. metadata: metadata,
  84. }
  85. }
  86. }
  87. }
  88. return nil
  89. }
  90. type kubernetesObj struct {
  91. ContainerName string `json:"container_name,omitempty"`
  92. Docker *internalDockerObj `json:"docker,omitempty"`
  93. Host string `json:"host,omitempty"`
  94. Labels *internalLabelsObj `json:"labels,omitempty"`
  95. NamespaceName string `json:"namespace_name,omitempty"`
  96. PodID string `json:"pod_id,omitempty"`
  97. PodName string `json:"pod_name,omitempty"`
  98. PodOwners *internalPodOwnersObj `json:"pod_owners,omitempty"`
  99. ServiceName string `json:"service_name,omitempty"`
  100. }
  101. type internalDockerObj struct {
  102. ContainerID string `json:"container_id,omitempty"`
  103. }
  104. type internalLabelsObj struct {
  105. App string `json:"app,omitempty"`
  106. PodTemplateHash string `json:"pod-template-hash,omitempty"`
  107. }
  108. type internalPodOwnersObj struct {
  109. OwnerKind string `json:"owner_kind,omitempty"`
  110. OwnerName string `json:"owner_name,omitempty"`
  111. }
  112. func addKubernetesWrapper(labels map[string]string) {
  113. // fill in obj
  114. filledInObj := kubernetesObj{
  115. ContainerName: mapGetHelper(labels, "container"),
  116. Docker: &internalDockerObj{
  117. ContainerID: mapGetHelper(labels, "container_id"),
  118. },
  119. Host: mapGetHelper(labels, "NodeName"),
  120. Labels: &internalLabelsObj{
  121. App: mapGetHelper(labels, "app"),
  122. PodTemplateHash: mapGetHelper(labels, "pod-template-hash"),
  123. },
  124. NamespaceName: mapGetHelper(labels, "Namespace"),
  125. PodID: mapGetHelper(labels, "PodId"),
  126. PodName: mapGetHelper(labels, "PodName"),
  127. PodOwners: &internalPodOwnersObj{
  128. OwnerKind: mapGetHelper(labels, "owner_kind"),
  129. OwnerName: mapGetHelper(labels, "owner_name"),
  130. },
  131. ServiceName: mapGetHelper(labels, "Service"),
  132. }
  133. // handle nested empty object
  134. if filledInObj.Docker.ContainerID == "" {
  135. filledInObj.Docker = nil
  136. }
  137. if filledInObj.Labels.App == "" && filledInObj.Labels.PodTemplateHash == "" {
  138. filledInObj.Labels = nil
  139. }
  140. if filledInObj.PodOwners.OwnerKind == "" && filledInObj.PodOwners.OwnerName == "" {
  141. filledInObj.PodOwners = nil
  142. }
  143. jsonBytes, _ := json.Marshal(filledInObj)
  144. labels["kubernetes"] = string(jsonBytes)
  145. }
  146. func mapGetHelper(labels map[string]string, key string) string {
  147. val, ok := labels[key]
  148. if ok {
  149. return val
  150. }
  151. return ""
  152. }
  153. func translateUnit(metric pmetric.Metric, descriptor map[string]MetricDescriptor) string {
  154. unit := metric.Unit()
  155. if descriptor, exists := descriptor[metric.Name()]; exists {
  156. if unit == "" || descriptor.Overwrite {
  157. return descriptor.Unit
  158. }
  159. }
  160. switch unit {
  161. case "ms":
  162. unit = "Milliseconds"
  163. case "s":
  164. unit = "Seconds"
  165. case "us":
  166. unit = "Microseconds"
  167. case "By":
  168. unit = "Bytes"
  169. case "Bi":
  170. unit = "Bits"
  171. }
  172. return unit
  173. }