process.go

// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package flinkmetricsreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/flinkmetricsreceiver"

import (
	"strings"

	"go.opentelemetry.io/collector/pdata/pcommon"

	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/flinkmetricsreceiver/internal/metadata"
	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/flinkmetricsreceiver/internal/models"
)
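
// processJobmanagerMetrics records the jobmanager's JVM, garbage-collector, and
// managed-memory data points and emits them under a single jobmanager-typed
// resource keyed by host name. A nil input is ignored.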
func (s *flinkmetricsScraper) processJobmanagerMetrics(now pcommon.Timestamp, jobmanagerMetrics *models.JobmanagerMetrics) {
	if jobmanagerMetrics == nil {
		return
	}
	for _, metric := range jobmanagerMetrics.Metrics {
		switch metric.ID {
		case "Status.JVM.CPU.Load":
			_ = s.mb.RecordFlinkJvmCPULoadDataPoint(now, metric.Value)
		case "Status.JVM.GarbageCollector.PS_MarkSweep.Time":
			_ = s.mb.RecordFlinkJvmGcCollectionsTimeDataPoint(now, metric.Value, metadata.AttributeGarbageCollectorNamePSMarkSweep)
		case "Status.JVM.GarbageCollector.PS_Scavenge.Time":
			_ = s.mb.RecordFlinkJvmGcCollectionsTimeDataPoint(now, metric.Value, metadata.AttributeGarbageCollectorNamePSScavenge)
		case "Status.JVM.GarbageCollector.PS_MarkSweep.Count":
			_ = s.mb.RecordFlinkJvmGcCollectionsCountDataPoint(now, metric.Value, metadata.AttributeGarbageCollectorNamePSMarkSweep)
		case "Status.JVM.GarbageCollector.PS_Scavenge.Count":
			_ = s.mb.RecordFlinkJvmGcCollectionsCountDataPoint(now, metric.Value, metadata.AttributeGarbageCollectorNamePSScavenge)
		case "Status.Flink.Memory.Managed.Used":
			_ = s.mb.RecordFlinkMemoryManagedUsedDataPoint(now, metric.Value)
		case "Status.Flink.Memory.Managed.Total":
			_ = s.mb.RecordFlinkMemoryManagedTotalDataPoint(now, metric.Value)
		case "Status.JVM.Memory.Mapped.TotalCapacity":
			_ = s.mb.RecordFlinkJvmMemoryMappedTotalCapacityDataPoint(now, metric.Value)
		case "Status.JVM.Memory.Mapped.MemoryUsed":
			_ = s.mb.RecordFlinkJvmMemoryMappedUsedDataPoint(now, metric.Value)
		case "Status.JVM.CPU.Time":
			_ = s.mb.RecordFlinkJvmCPUTimeDataPoint(now, metric.Value)
		case "Status.JVM.Threads.Count":
			_ = s.mb.RecordFlinkJvmThreadsCountDataPoint(now, metric.Value)
		case "Status.JVM.Memory.Heap.Committed":
			_ = s.mb.RecordFlinkJvmMemoryHeapCommittedDataPoint(now, metric.Value)
		case "Status.JVM.Memory.Metaspace.Committed":
			_ = s.mb.RecordFlinkJvmMemoryMetaspaceCommittedDataPoint(now, metric.Value)
		case "Status.JVM.Memory.NonHeap.Max":
			_ = s.mb.RecordFlinkJvmMemoryNonheapMaxDataPoint(now, metric.Value)
		case "Status.JVM.Memory.NonHeap.Committed":
			_ = s.mb.RecordFlinkJvmMemoryNonheapCommittedDataPoint(now, metric.Value)
		case "Status.JVM.Memory.NonHeap.Used":
			_ = s.mb.RecordFlinkJvmMemoryNonheapUsedDataPoint(now, metric.Value)
		case "Status.JVM.Memory.Metaspace.Max":
			_ = s.mb.RecordFlinkJvmMemoryMetaspaceMaxDataPoint(now, metric.Value)
		case "Status.JVM.Memory.Direct.MemoryUsed":
			_ = s.mb.RecordFlinkJvmMemoryDirectUsedDataPoint(now, metric.Value)
		case "Status.JVM.Memory.Direct.TotalCapacity":
			_ = s.mb.RecordFlinkJvmMemoryDirectTotalCapacityDataPoint(now, metric.Value)
		case "Status.JVM.ClassLoader.ClassesLoaded":
			_ = s.mb.RecordFlinkJvmClassLoaderClassesLoadedDataPoint(now, metric.Value)
		case "Status.JVM.Memory.Metaspace.Used":
			_ = s.mb.RecordFlinkJvmMemoryMetaspaceUsedDataPoint(now, metric.Value)
		case "Status.JVM.Memory.Heap.Max":
			_ = s.mb.RecordFlinkJvmMemoryHeapMaxDataPoint(now, metric.Value)
		case "Status.JVM.Memory.Heap.Used":
			_ = s.mb.RecordFlinkJvmMemoryHeapUsedDataPoint(now, metric.Value)
		}
	}
	rb := s.mb.NewResourceBuilder()
	rb.SetHostName(jobmanagerMetrics.Host)
	rb.SetFlinkResourceTypeJobmanager()
	s.mb.EmitForResource(metadata.WithResource(rb.Emit()))
}
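
// processTaskmanagerMetrics records per-taskmanager JVM, garbage-collector, and
// managed-memory data points, emitting each taskmanager's metrics under its own
// taskmanager-typed resource keyed by host name and taskmanager ID.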
func (s *flinkmetricsScraper) processTaskmanagerMetrics(now pcommon.Timestamp, taskmanagerMetricInstances []*models.TaskmanagerMetrics) {
	for _, taskmanagerMetrics := range taskmanagerMetricInstances {
		for _, metric := range taskmanagerMetrics.Metrics {
			switch metric.ID {
			case "Status.JVM.GarbageCollector.G1_Young_Generation.Count":
				_ = s.mb.RecordFlinkJvmGcCollectionsCountDataPoint(now, metric.Value, metadata.AttributeGarbageCollectorNameG1YoungGeneration)
			case "Status.JVM.GarbageCollector.G1_Old_Generation.Count":
				_ = s.mb.RecordFlinkJvmGcCollectionsCountDataPoint(now, metric.Value, metadata.AttributeGarbageCollectorNameG1OldGeneration)
			case "Status.JVM.GarbageCollector.G1_Old_Generation.Time":
				_ = s.mb.RecordFlinkJvmGcCollectionsTimeDataPoint(now, metric.Value, metadata.AttributeGarbageCollectorNameG1OldGeneration)
			case "Status.JVM.GarbageCollector.G1_Young_Generation.Time":
				_ = s.mb.RecordFlinkJvmGcCollectionsTimeDataPoint(now, metric.Value, metadata.AttributeGarbageCollectorNameG1YoungGeneration)
			case "Status.JVM.CPU.Load":
				_ = s.mb.RecordFlinkJvmCPULoadDataPoint(now, metric.Value)
			case "Status.Flink.Memory.Managed.Used":
				_ = s.mb.RecordFlinkMemoryManagedUsedDataPoint(now, metric.Value)
			case "Status.Flink.Memory.Managed.Total":
				_ = s.mb.RecordFlinkMemoryManagedTotalDataPoint(now, metric.Value)
			case "Status.JVM.Memory.Mapped.TotalCapacity":
				_ = s.mb.RecordFlinkJvmMemoryMappedTotalCapacityDataPoint(now, metric.Value)
			case "Status.JVM.Memory.Mapped.MemoryUsed":
				_ = s.mb.RecordFlinkJvmMemoryMappedUsedDataPoint(now, metric.Value)
			case "Status.JVM.CPU.Time":
				_ = s.mb.RecordFlinkJvmCPUTimeDataPoint(now, metric.Value)
			case "Status.JVM.Threads.Count":
				_ = s.mb.RecordFlinkJvmThreadsCountDataPoint(now, metric.Value)
			case "Status.JVM.Memory.Heap.Committed":
				_ = s.mb.RecordFlinkJvmMemoryHeapCommittedDataPoint(now, metric.Value)
			case "Status.JVM.Memory.Metaspace.Committed":
				_ = s.mb.RecordFlinkJvmMemoryMetaspaceCommittedDataPoint(now, metric.Value)
			case "Status.JVM.Memory.NonHeap.Max":
				_ = s.mb.RecordFlinkJvmMemoryNonheapMaxDataPoint(now, metric.Value)
			case "Status.JVM.Memory.NonHeap.Committed":
				_ = s.mb.RecordFlinkJvmMemoryNonheapCommittedDataPoint(now, metric.Value)
			case "Status.JVM.Memory.NonHeap.Used":
				_ = s.mb.RecordFlinkJvmMemoryNonheapUsedDataPoint(now, metric.Value)
			case "Status.JVM.Memory.Metaspace.Max":
				_ = s.mb.RecordFlinkJvmMemoryMetaspaceMaxDataPoint(now, metric.Value)
			case "Status.JVM.Memory.Direct.MemoryUsed":
				_ = s.mb.RecordFlinkJvmMemoryDirectUsedDataPoint(now, metric.Value)
			case "Status.JVM.Memory.Direct.TotalCapacity":
				_ = s.mb.RecordFlinkJvmMemoryDirectTotalCapacityDataPoint(now, metric.Value)
			case "Status.JVM.ClassLoader.ClassesLoaded":
				_ = s.mb.RecordFlinkJvmClassLoaderClassesLoadedDataPoint(now, metric.Value)
			case "Status.JVM.Memory.Metaspace.Used":
				_ = s.mb.RecordFlinkJvmMemoryMetaspaceUsedDataPoint(now, metric.Value)
			case "Status.JVM.Memory.Heap.Max":
				_ = s.mb.RecordFlinkJvmMemoryHeapMaxDataPoint(now, metric.Value)
			case "Status.JVM.Memory.Heap.Used":
				_ = s.mb.RecordFlinkJvmMemoryHeapUsedDataPoint(now, metric.Value)
			}
		}
		rb := s.mb.NewResourceBuilder()
		rb.SetHostName(taskmanagerMetrics.Host)
		rb.SetFlinkTaskmanagerID(taskmanagerMetrics.TaskmanagerID)
		rb.SetFlinkResourceTypeTaskmanager()
		s.mb.EmitForResource(metadata.WithResource(rb.Emit()))
	}
}
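
// processJobsMetrics records per-job restart and checkpoint data points, emitting
// each job's metrics under its own resource keyed by host name and job name.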
func (s *flinkmetricsScraper) processJobsMetrics(now pcommon.Timestamp, jobsMetricsInstances []*models.JobMetrics) {
	for _, jobsMetrics := range jobsMetricsInstances {
		for _, metric := range jobsMetrics.Metrics {
			switch metric.ID {
			case "numRestarts":
				_ = s.mb.RecordFlinkJobRestartCountDataPoint(now, metric.Value)
			case "lastCheckpointSize":
				_ = s.mb.RecordFlinkJobLastCheckpointSizeDataPoint(now, metric.Value)
			case "lastCheckpointDuration":
				_ = s.mb.RecordFlinkJobLastCheckpointTimeDataPoint(now, metric.Value)
			case "numberOfInProgressCheckpoints":
				_ = s.mb.RecordFlinkJobCheckpointInProgressDataPoint(now, metric.Value)
			case "numberOfCompletedCheckpoints":
				_ = s.mb.RecordFlinkJobCheckpointCountDataPoint(now, metric.Value, metadata.AttributeCheckpointCompleted)
			case "numberOfFailedCheckpoints":
				_ = s.mb.RecordFlinkJobCheckpointCountDataPoint(now, metric.Value, metadata.AttributeCheckpointFailed)
			}
		}
		rb := s.mb.NewResourceBuilder()
		rb.SetHostName(jobsMetrics.Host)
		rb.SetFlinkJobName(jobsMetrics.JobName)
		s.mb.EmitForResource(metadata.WithResource(rb.Emit()))
	}
}
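
// processSubtaskMetrics records per-subtask task and operator data points, emitting
// each subtask's metrics under its own resource keyed by host name, taskmanager ID,
// job name, task name, and subtask index. Operator metric IDs carry the operator
// name as a prefix, which is split off and recorded as an attribute.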
func (s *flinkmetricsScraper) processSubtaskMetrics(now pcommon.Timestamp, subtaskMetricsInstances []*models.SubtaskMetrics) {
	for _, subtaskMetrics := range subtaskMetricsInstances {
		for _, metric := range subtaskMetrics.Metrics {
			switch {
			// record task metrics
			case metric.ID == "numRecordsIn":
				_ = s.mb.RecordFlinkTaskRecordCountDataPoint(now, metric.Value, metadata.AttributeRecordIn)
			case metric.ID == "numRecordsOut":
				_ = s.mb.RecordFlinkTaskRecordCountDataPoint(now, metric.Value, metadata.AttributeRecordOut)
			case metric.ID == "numLateRecordsDropped":
				_ = s.mb.RecordFlinkTaskRecordCountDataPoint(now, metric.Value, metadata.AttributeRecordDropped)
			// record operator metrics
			case strings.Contains(metric.ID, ".numRecordsIn"):
				operatorName := strings.Split(metric.ID, ".numRecordsIn")
				_ = s.mb.RecordFlinkOperatorRecordCountDataPoint(now, metric.Value, operatorName[0], metadata.AttributeRecordIn)
			case strings.Contains(metric.ID, ".numRecordsOut"):
				operatorName := strings.Split(metric.ID, ".numRecordsOut")
				_ = s.mb.RecordFlinkOperatorRecordCountDataPoint(now, metric.Value, operatorName[0], metadata.AttributeRecordOut)
			case strings.Contains(metric.ID, ".numLateRecordsDropped"):
				operatorName := strings.Split(metric.ID, ".numLateRecordsDropped")
				_ = s.mb.RecordFlinkOperatorRecordCountDataPoint(now, metric.Value, operatorName[0], metadata.AttributeRecordDropped)
			case strings.Contains(metric.ID, ".currentOutputWatermark"):
				operatorName := strings.Split(metric.ID, ".currentOutputWatermark")
				_ = s.mb.RecordFlinkOperatorWatermarkOutputDataPoint(now, metric.Value, operatorName[0])
			}
		}
		rb := s.mb.NewResourceBuilder()
		rb.SetHostName(subtaskMetrics.Host)
		rb.SetFlinkTaskmanagerID(subtaskMetrics.TaskmanagerID)
		rb.SetFlinkJobName(subtaskMetrics.JobName)
		rb.SetFlinkTaskName(subtaskMetrics.TaskName)
		rb.SetFlinkSubtaskIndex(subtaskMetrics.SubtaskIndex)
		s.mb.EmitForResource(metadata.WithResource(rb.Emit()))
	}
}