123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194 |
- // Copyright The OpenTelemetry Authors
- // SPDX-License-Identifier: Apache-2.0
- package flinkmetricsreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/flinkmetricsreceiver"
- import (
- "strings"
- "go.opentelemetry.io/collector/pdata/pcommon"
- "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/flinkmetricsreceiver/internal/metadata"
- "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/flinkmetricsreceiver/internal/models"
- )
- func (s *flinkmetricsScraper) processJobmanagerMetrics(now pcommon.Timestamp, jobmanagerMetrics *models.JobmanagerMetrics) {
- if jobmanagerMetrics == nil {
- return
- }
- for _, metric := range jobmanagerMetrics.Metrics {
- switch metric.ID {
- case "Status.JVM.CPU.Load":
- _ = s.mb.RecordFlinkJvmCPULoadDataPoint(now, metric.Value)
- case "Status.JVM.GarbageCollector.PS_MarkSweep.Time":
- _ = s.mb.RecordFlinkJvmGcCollectionsTimeDataPoint(now, metric.Value, metadata.AttributeGarbageCollectorNamePSMarkSweep)
- case "Status.JVM.GarbageCollector.PS_Scavenge.Time":
- _ = s.mb.RecordFlinkJvmGcCollectionsTimeDataPoint(now, metric.Value, metadata.AttributeGarbageCollectorNamePSScavenge)
- case "Status.JVM.GarbageCollector.PS_MarkSweep.Count":
- _ = s.mb.RecordFlinkJvmGcCollectionsCountDataPoint(now, metric.Value, metadata.AttributeGarbageCollectorNamePSMarkSweep)
- case "Status.JVM.GarbageCollector.PS_Scavenge.Count":
- _ = s.mb.RecordFlinkJvmGcCollectionsCountDataPoint(now, metric.Value, metadata.AttributeGarbageCollectorNamePSScavenge)
- case "Status.Flink.Memory.Managed.Used":
- _ = s.mb.RecordFlinkMemoryManagedUsedDataPoint(now, metric.Value)
- case "Status.Flink.Memory.Managed.Total":
- _ = s.mb.RecordFlinkMemoryManagedTotalDataPoint(now, metric.Value)
- case "Status.JVM.Memory.Mapped.TotalCapacity":
- _ = s.mb.RecordFlinkJvmMemoryMappedTotalCapacityDataPoint(now, metric.Value)
- case "Status.JVM.Memory.Mapped.MemoryUsed":
- _ = s.mb.RecordFlinkJvmMemoryMappedUsedDataPoint(now, metric.Value)
- case "Status.JVM.CPU.Time":
- _ = s.mb.RecordFlinkJvmCPUTimeDataPoint(now, metric.Value)
- case "Status.JVM.Threads.Count":
- _ = s.mb.RecordFlinkJvmThreadsCountDataPoint(now, metric.Value)
- case "Status.JVM.Memory.Heap.Committed":
- _ = s.mb.RecordFlinkJvmMemoryHeapCommittedDataPoint(now, metric.Value)
- case "Status.JVM.Memory.Metaspace.Committed":
- _ = s.mb.RecordFlinkJvmMemoryMetaspaceCommittedDataPoint(now, metric.Value)
- case "Status.JVM.Memory.NonHeap.Max":
- _ = s.mb.RecordFlinkJvmMemoryNonheapMaxDataPoint(now, metric.Value)
- case "Status.JVM.Memory.NonHeap.Committed":
- _ = s.mb.RecordFlinkJvmMemoryNonheapCommittedDataPoint(now, metric.Value)
- case "Status.JVM.Memory.NonHeap.Used":
- _ = s.mb.RecordFlinkJvmMemoryNonheapUsedDataPoint(now, metric.Value)
- case "Status.JVM.Memory.Metaspace.Max":
- _ = s.mb.RecordFlinkJvmMemoryMetaspaceMaxDataPoint(now, metric.Value)
- case "Status.JVM.Memory.Direct.MemoryUsed":
- _ = s.mb.RecordFlinkJvmMemoryDirectUsedDataPoint(now, metric.Value)
- case "Status.JVM.Memory.Direct.TotalCapacity":
- _ = s.mb.RecordFlinkJvmMemoryDirectTotalCapacityDataPoint(now, metric.Value)
- case "Status.JVM.ClassLoader.ClassesLoaded":
- _ = s.mb.RecordFlinkJvmClassLoaderClassesLoadedDataPoint(now, metric.Value)
- case "Status.JVM.Memory.Metaspace.Used":
- _ = s.mb.RecordFlinkJvmMemoryMetaspaceUsedDataPoint(now, metric.Value)
- case "Status.JVM.Memory.Heap.Max":
- _ = s.mb.RecordFlinkJvmMemoryHeapMaxDataPoint(now, metric.Value)
- case "Status.JVM.Memory.Heap.Used":
- _ = s.mb.RecordFlinkJvmMemoryHeapUsedDataPoint(now, metric.Value)
- }
- }
- rb := s.mb.NewResourceBuilder()
- rb.SetHostName(jobmanagerMetrics.Host)
- rb.SetFlinkResourceTypeJobmanager()
- s.mb.EmitForResource(metadata.WithResource(rb.Emit()))
- }
- func (s *flinkmetricsScraper) processTaskmanagerMetrics(now pcommon.Timestamp, taskmanagerMetricInstances []*models.TaskmanagerMetrics) {
- for _, taskmanagerMetrics := range taskmanagerMetricInstances {
- for _, metric := range taskmanagerMetrics.Metrics {
- switch metric.ID {
- case "Status.JVM.GarbageCollector.G1_Young_Generation.Count":
- _ = s.mb.RecordFlinkJvmGcCollectionsCountDataPoint(now, metric.Value, metadata.AttributeGarbageCollectorNameG1YoungGeneration)
- case "Status.JVM.GarbageCollector.G1_Old_Generation.Count":
- _ = s.mb.RecordFlinkJvmGcCollectionsCountDataPoint(now, metric.Value, metadata.AttributeGarbageCollectorNameG1OldGeneration)
- case "Status.JVM.GarbageCollector.G1_Old_Generation.Time":
- _ = s.mb.RecordFlinkJvmGcCollectionsTimeDataPoint(now, metric.Value, metadata.AttributeGarbageCollectorNameG1OldGeneration)
- case "Status.JVM.GarbageCollector.G1_Young_Generation.Time":
- _ = s.mb.RecordFlinkJvmGcCollectionsTimeDataPoint(now, metric.Value, metadata.AttributeGarbageCollectorNameG1YoungGeneration)
- case "Status.JVM.CPU.Load":
- _ = s.mb.RecordFlinkJvmCPULoadDataPoint(now, metric.Value)
- case "Status.Flink.Memory.Managed.Used":
- _ = s.mb.RecordFlinkMemoryManagedUsedDataPoint(now, metric.Value)
- case "Status.Flink.Memory.Managed.Total":
- _ = s.mb.RecordFlinkMemoryManagedTotalDataPoint(now, metric.Value)
- case "Status.JVM.Memory.Mapped.TotalCapacity":
- _ = s.mb.RecordFlinkJvmMemoryMappedTotalCapacityDataPoint(now, metric.Value)
- case "Status.JVM.Memory.Mapped.MemoryUsed":
- _ = s.mb.RecordFlinkJvmMemoryMappedUsedDataPoint(now, metric.Value)
- case "Status.JVM.CPU.Time":
- _ = s.mb.RecordFlinkJvmCPUTimeDataPoint(now, metric.Value)
- case "Status.JVM.Threads.Count":
- _ = s.mb.RecordFlinkJvmThreadsCountDataPoint(now, metric.Value)
- case "Status.JVM.Memory.Heap.Committed":
- _ = s.mb.RecordFlinkJvmMemoryHeapCommittedDataPoint(now, metric.Value)
- case "Status.JVM.Memory.Metaspace.Committed":
- _ = s.mb.RecordFlinkJvmMemoryMetaspaceCommittedDataPoint(now, metric.Value)
- case "Status.JVM.Memory.NonHeap.Max":
- _ = s.mb.RecordFlinkJvmMemoryNonheapMaxDataPoint(now, metric.Value)
- case "Status.JVM.Memory.NonHeap.Committed":
- _ = s.mb.RecordFlinkJvmMemoryNonheapCommittedDataPoint(now, metric.Value)
- case "Status.JVM.Memory.NonHeap.Used":
- _ = s.mb.RecordFlinkJvmMemoryNonheapUsedDataPoint(now, metric.Value)
- case "Status.JVM.Memory.Metaspace.Max":
- _ = s.mb.RecordFlinkJvmMemoryMetaspaceMaxDataPoint(now, metric.Value)
- case "Status.JVM.Memory.Direct.MemoryUsed":
- _ = s.mb.RecordFlinkJvmMemoryDirectUsedDataPoint(now, metric.Value)
- case "Status.JVM.Memory.Direct.TotalCapacity":
- _ = s.mb.RecordFlinkJvmMemoryDirectTotalCapacityDataPoint(now, metric.Value)
- case "Status.JVM.ClassLoader.ClassesLoaded":
- _ = s.mb.RecordFlinkJvmClassLoaderClassesLoadedDataPoint(now, metric.Value)
- case "Status.JVM.Memory.Metaspace.Used":
- _ = s.mb.RecordFlinkJvmMemoryMetaspaceUsedDataPoint(now, metric.Value)
- case "Status.JVM.Memory.Heap.Max":
- _ = s.mb.RecordFlinkJvmMemoryHeapMaxDataPoint(now, metric.Value)
- case "Status.JVM.Memory.Heap.Used":
- _ = s.mb.RecordFlinkJvmMemoryHeapUsedDataPoint(now, metric.Value)
- }
- }
- rb := s.mb.NewResourceBuilder()
- rb.SetHostName(taskmanagerMetrics.Host)
- rb.SetFlinkTaskmanagerID(taskmanagerMetrics.TaskmanagerID)
- rb.SetFlinkResourceTypeTaskmanager()
- s.mb.EmitForResource(metadata.WithResource(rb.Emit()))
- }
- }
- func (s *flinkmetricsScraper) processJobsMetrics(now pcommon.Timestamp, jobsMetricsInstances []*models.JobMetrics) {
- for _, jobsMetrics := range jobsMetricsInstances {
- for _, metric := range jobsMetrics.Metrics {
- switch metric.ID {
- case "numRestarts":
- _ = s.mb.RecordFlinkJobRestartCountDataPoint(now, metric.Value)
- case "lastCheckpointSize":
- _ = s.mb.RecordFlinkJobLastCheckpointSizeDataPoint(now, metric.Value)
- case "lastCheckpointDuration":
- _ = s.mb.RecordFlinkJobLastCheckpointTimeDataPoint(now, metric.Value)
- case "numberOfInProgressCheckpoints":
- _ = s.mb.RecordFlinkJobCheckpointInProgressDataPoint(now, metric.Value)
- case "numberOfCompletedCheckpoints":
- _ = s.mb.RecordFlinkJobCheckpointCountDataPoint(now, metric.Value, metadata.AttributeCheckpointCompleted)
- case "numberOfFailedCheckpoints":
- _ = s.mb.RecordFlinkJobCheckpointCountDataPoint(now, metric.Value, metadata.AttributeCheckpointFailed)
- }
- }
- rb := s.mb.NewResourceBuilder()
- rb.SetHostName(jobsMetrics.Host)
- rb.SetFlinkJobName(jobsMetrics.JobName)
- s.mb.EmitForResource(metadata.WithResource(rb.Emit()))
- }
- }
- func (s *flinkmetricsScraper) processSubtaskMetrics(now pcommon.Timestamp, subtaskMetricsInstances []*models.SubtaskMetrics) {
- for _, subtaskMetrics := range subtaskMetricsInstances {
- for _, metric := range subtaskMetrics.Metrics {
- switch {
- // record task metrics
- case metric.ID == "numRecordsIn":
- _ = s.mb.RecordFlinkTaskRecordCountDataPoint(now, metric.Value, metadata.AttributeRecordIn)
- case metric.ID == "numRecordsOut":
- _ = s.mb.RecordFlinkTaskRecordCountDataPoint(now, metric.Value, metadata.AttributeRecordOut)
- case metric.ID == "numLateRecordsDropped":
- _ = s.mb.RecordFlinkTaskRecordCountDataPoint(now, metric.Value, metadata.AttributeRecordDropped)
- // record operator metrics
- case strings.Contains(metric.ID, ".numRecordsIn"):
- operatorName := strings.Split(metric.ID, ".numRecordsIn")
- _ = s.mb.RecordFlinkOperatorRecordCountDataPoint(now, metric.Value, operatorName[0], metadata.AttributeRecordIn)
- case strings.Contains(metric.ID, ".numRecordsOut"):
- operatorName := strings.Split(metric.ID, ".numRecordsOut")
- _ = s.mb.RecordFlinkOperatorRecordCountDataPoint(now, metric.Value, operatorName[0], metadata.AttributeRecordOut)
- case strings.Contains(metric.ID, ".numLateRecordsDropped"):
- operatorName := strings.Split(metric.ID, ".numLateRecordsDropped")
- _ = s.mb.RecordFlinkOperatorRecordCountDataPoint(now, metric.Value, operatorName[0], metadata.AttributeRecordDropped)
- case strings.Contains(metric.ID, ".currentOutputWatermark"):
- operatorName := strings.Split(metric.ID, ".currentOutputWatermark")
- _ = s.mb.RecordFlinkOperatorWatermarkOutputDataPoint(now, metric.Value, operatorName[0])
- }
- }
- rb := s.mb.NewResourceBuilder()
- rb.SetHostName(subtaskMetrics.Host)
- rb.SetFlinkTaskmanagerID(subtaskMetrics.TaskmanagerID)
- rb.SetFlinkJobName(subtaskMetrics.JobName)
- rb.SetFlinkTaskName(subtaskMetrics.TaskName)
- rb.SetFlinkSubtaskIndex(subtaskMetrics.SubtaskIndex)
- s.mb.EmitForResource(metadata.WithResource(rb.Emit()))
- }
- }
|