accumulator.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package awsecscontainermetrics // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsecscontainermetricsreceiver/internal/awsecscontainermetrics"
  4. import (
  5. "time"
  6. "go.opentelemetry.io/collector/pdata/pcommon"
  7. "go.opentelemetry.io/collector/pdata/pmetric"
  8. "go.uber.org/zap"
  9. "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/ecsutil"
  10. )
  11. // metricDataAccumulator defines the accumulator
  12. type metricDataAccumulator struct {
  13. mds []pmetric.Metrics
  14. }
  15. // getMetricsData generates OT Metrics data from task metadata and docker stats
  16. func (acc *metricDataAccumulator) getMetricsData(containerStatsMap map[string]*ContainerStats, metadata ecsutil.TaskMetadata, logger *zap.Logger) {
  17. taskMetrics := ECSMetrics{}
  18. timestamp := pcommon.NewTimestampFromTime(time.Now())
  19. taskResource := taskResource(metadata)
  20. for _, containerMetadata := range metadata.Containers {
  21. containerResource := containerResource(containerMetadata, logger)
  22. taskResource.Attributes().Range(func(k string, av pcommon.Value) bool {
  23. av.CopyTo(containerResource.Attributes().PutEmpty(k))
  24. return true
  25. })
  26. stats, ok := containerStatsMap[containerMetadata.DockerID]
  27. if ok && !isEmptyStats(stats) {
  28. containerMetrics := convertContainerMetrics(stats, logger, containerMetadata)
  29. acc.accumulate(convertToOTLPMetrics(containerPrefix, containerMetrics, containerResource, timestamp))
  30. aggregateTaskMetrics(&taskMetrics, containerMetrics)
  31. } else if containerMetadata.FinishedAt != "" && containerMetadata.StartedAt != "" {
  32. duration, err := calculateDuration(containerMetadata.StartedAt, containerMetadata.FinishedAt)
  33. if err != nil {
  34. logger.Warn("Error time format error found for this container:" + containerMetadata.ContainerName)
  35. }
  36. acc.accumulate(convertStoppedContainerDataToOTMetrics(containerPrefix, containerResource, timestamp, duration))
  37. }
  38. }
  39. overrideWithTaskLevelLimit(&taskMetrics, metadata)
  40. acc.accumulate(convertToOTLPMetrics(taskPrefix, taskMetrics, taskResource, timestamp))
  41. }
  42. func (acc *metricDataAccumulator) accumulate(md pmetric.Metrics) {
  43. acc.mds = append(acc.mds, md)
  44. }
  45. func isEmptyStats(stats *ContainerStats) bool {
  46. return stats == nil || stats.ID == ""
  47. }
  48. func convertContainerMetrics(stats *ContainerStats, logger *zap.Logger, containerMetadata ecsutil.ContainerMetadata) ECSMetrics {
  49. containerMetrics := getContainerMetrics(stats, logger)
  50. if containerMetadata.Limits.Memory != nil {
  51. containerMetrics.MemoryReserved = *containerMetadata.Limits.Memory
  52. }
  53. if containerMetadata.Limits.CPU != nil {
  54. containerMetrics.CPUReserved = *containerMetadata.Limits.CPU
  55. }
  56. if containerMetrics.CPUReserved > 0 {
  57. containerMetrics.CPUUtilized = (containerMetrics.CPUUtilized / containerMetrics.CPUReserved)
  58. }
  59. return containerMetrics
  60. }
  61. func overrideWithTaskLevelLimit(taskMetrics *ECSMetrics, metadata ecsutil.TaskMetadata) {
  62. // Overwrite Memory limit with task level limit
  63. if metadata.Limits.Memory != nil {
  64. taskMetrics.MemoryReserved = *metadata.Limits.Memory
  65. }
  66. // Overwrite CPU limit with task level limit
  67. if metadata.Limits.CPU != nil {
  68. taskMetrics.CPUReserved = *metadata.Limits.CPU * cpusInVCpu
  69. }
  70. // taskMetrics.CPUReserved cannot be zero. In ECS, user needs to set CPU limit
  71. // at least in one place (either in task level or in container level). If the
  72. // task level CPULimit is not present, we calculate it from the summation of
  73. // all container CPU limits.
  74. if taskMetrics.CPUReserved > 0 {
  75. taskMetrics.CPUUtilized = taskMetrics.CPUUsageInVCPU * cpusInVCpu
  76. }
  77. }
  78. func calculateDuration(startTime, endTime string) (float64, error) {
  79. start, err := time.Parse(time.RFC3339Nano, startTime)
  80. if err != nil {
  81. return 0, err
  82. }
  83. end, err := time.Parse(time.RFC3339Nano, endTime)
  84. if err != nil {
  85. return 0, err
  86. }
  87. duration := end.Sub(start)
  88. return duration.Seconds(), nil
  89. }