scraper.go

// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package apachesparkreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/apachesparkreceiver"

import (
	"context"
	"errors"
	"fmt"
	"time"

	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/pdata/pcommon"
	"go.opentelemetry.io/collector/pdata/pmetric"
	"go.opentelemetry.io/collector/receiver"
	"go.opentelemetry.io/collector/receiver/scrapererror"
	"go.uber.org/zap"

	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/apachesparkreceiver/internal/metadata"
	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/apachesparkreceiver/internal/models"
)
var (
	errFailedAppIDCollection = errors.New("failed to retrieve app ids")
	errNoMatchingAllowedApps = errors.New("no apps matched allowed names")
)
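// sparkScraper pulls stats from the Spark REST API and converts them into
// OpenTelemetry metrics via the generated metadata.MetricsBuilder.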
type sparkScraper struct {
	client   client
	logger   *zap.Logger
	config   *Config
	settings component.TelemetrySettings
	mb       *metadata.MetricsBuilder
}
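// newSparkScraper builds a scraper from the receiver config; the HTTP client
// is created later, in start, once the host is available.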
func newSparkScraper(logger *zap.Logger, cfg *Config, settings receiver.CreateSettings) *sparkScraper {
	return &sparkScraper{
		logger:   logger,
		config:   cfg,
		settings: settings.TelemetrySettings,
		mb:       metadata.NewMetricsBuilder(cfg.MetricsBuilderConfig, settings),
	}
}
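// start creates the HTTP client used to talk to the Spark REST API.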
func (s *sparkScraper) start(_ context.Context, host component.Host) (err error) {
	httpClient, err := newApacheSparkClient(s.config, host, s.settings)
	if err != nil {
		return fmt.Errorf("failed to start: %w", err)
	}
	s.client = httpClient
	return nil
}
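// scrape runs once per collection interval. Failing to list applications
// aborts the whole scrape; failures on the per-application endpoints are
// collected as partial scrape errors so the remaining metrics still flow.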
func (s *sparkScraper) scrape(_ context.Context) (pmetric.Metrics, error) {
	now := pcommon.NewTimestampFromTime(time.Now())
	var scrapeErrors scrapererror.ScrapeErrors

	// Call the applications endpoint to get the ids and names of all apps in the cluster.
	apps, err := s.client.Applications()
	if err != nil {
		return pmetric.NewMetrics(), errFailedAppIDCollection
	}

	// Check apps against the allowed app names from the config.
	var allowedApps []models.Application
	switch {
	case len(s.config.ApplicationNames) == 0:
		// No app names specified: allow all apps.
		allowedApps = apps
	default:
		// Allowed app names were specified: compare them to the app names
		// returned by the applications endpoint.
		appMap := make(map[string][]models.Application)
		for _, app := range apps {
			appMap[app.Name] = append(appMap[app.Name], app)
		}
		for _, name := range s.config.ApplicationNames {
			if apps, ok := appMap[name]; ok {
				allowedApps = append(allowedApps, apps...)
			}
		}
		if len(allowedApps) == 0 {
			return pmetric.NewMetrics(), errNoMatchingAllowedApps
		}
	}

	// Get stats from the 'metrics' endpoint.
	clusterStats, err := s.client.ClusterStats()
	if err != nil {
		scrapeErrors.AddPartial(32, err)
		s.logger.Warn("Failed to scrape cluster stats", zap.Error(err))
	} else {
		for _, app := range allowedApps {
			s.recordCluster(clusterStats, now, app.ApplicationID, app.Name)
		}
	}

	// For each application id, get stats from the stages, executors, and jobs endpoints.
	for _, app := range allowedApps {
		stageStats, err := s.client.StageStats(app.ApplicationID)
		if err != nil {
			scrapeErrors.AddPartial(24, err)
			s.logger.Warn("Failed to scrape stage stats", zap.Error(err))
		} else {
			s.recordStages(stageStats, now, app.ApplicationID, app.Name)
		}

		executorStats, err := s.client.ExecutorStats(app.ApplicationID)
		if err != nil {
			scrapeErrors.AddPartial(13, err)
			s.logger.Warn("Failed to scrape executor stats", zap.Error(err))
		} else {
			s.recordExecutors(executorStats, now, app.ApplicationID, app.Name)
		}

		jobStats, err := s.client.JobStats(app.ApplicationID)
		if err != nil {
			scrapeErrors.AddPartial(8, err)
			s.logger.Warn("Failed to scrape job stats", zap.Error(err))
		} else {
			s.recordJobs(jobStats, now, app.ApplicationID, app.Name)
		}
	}
	return s.mb.Emit(), scrapeErrors.Combine()
}
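// recordCluster records the driver metrics found in the cluster 'metrics'
// payload. Entries are keyed by application id, e.g. a gauge named
// "app-20230101000000-0000.driver.BlockManager.disk.diskSpaceUsed_MB"
// (the id shown is illustrative).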
func (s *sparkScraper) recordCluster(clusterStats *models.ClusterProperties, now pcommon.Timestamp, appID string, appName string) {
	// BlockManager disk and memory gauges.
	if stat, ok := clusterStats.Gauges[fmt.Sprintf("%s.driver.BlockManager.disk.diskSpaceUsed_MB", appID)]; ok {
		s.mb.RecordSparkDriverBlockManagerDiskUsageDataPoint(now, int64(stat.Value))
	}
	if stat, ok := clusterStats.Gauges[fmt.Sprintf("%s.driver.BlockManager.memory.offHeapMemUsed_MB", appID)]; ok {
		s.mb.RecordSparkDriverBlockManagerMemoryUsageDataPoint(now, int64(stat.Value), metadata.AttributeLocationOffHeap, metadata.AttributeStateUsed)
	}
	if stat, ok := clusterStats.Gauges[fmt.Sprintf("%s.driver.BlockManager.memory.onHeapMemUsed_MB", appID)]; ok {
		s.mb.RecordSparkDriverBlockManagerMemoryUsageDataPoint(now, int64(stat.Value), metadata.AttributeLocationOnHeap, metadata.AttributeStateUsed)
	}
	if stat, ok := clusterStats.Gauges[fmt.Sprintf("%s.driver.BlockManager.memory.remainingOffHeapMem_MB", appID)]; ok {
		s.mb.RecordSparkDriverBlockManagerMemoryUsageDataPoint(now, int64(stat.Value), metadata.AttributeLocationOffHeap, metadata.AttributeStateFree)
	}
	if stat, ok := clusterStats.Gauges[fmt.Sprintf("%s.driver.BlockManager.memory.remainingOnHeapMem_MB", appID)]; ok {
		s.mb.RecordSparkDriverBlockManagerMemoryUsageDataPoint(now, int64(stat.Value), metadata.AttributeLocationOnHeap, metadata.AttributeStateFree)
	}
	// HiveExternalCatalog counters.
	if stat, ok := clusterStats.Counters[fmt.Sprintf("%s.driver.HiveExternalCatalog.fileCacheHits", appID)]; ok {
		s.mb.RecordSparkDriverHiveExternalCatalogFileCacheHitsDataPoint(now, stat.Count)
	}
	if stat, ok := clusterStats.Counters[fmt.Sprintf("%s.driver.HiveExternalCatalog.filesDiscovered", appID)]; ok {
		s.mb.RecordSparkDriverHiveExternalCatalogFilesDiscoveredDataPoint(now, stat.Count)
	}
	if stat, ok := clusterStats.Counters[fmt.Sprintf("%s.driver.HiveExternalCatalog.hiveClientCalls", appID)]; ok {
		s.mb.RecordSparkDriverHiveExternalCatalogHiveClientCallsDataPoint(now, stat.Count)
	}
	if stat, ok := clusterStats.Counters[fmt.Sprintf("%s.driver.HiveExternalCatalog.parallelListingJobCount", appID)]; ok {
		s.mb.RecordSparkDriverHiveExternalCatalogParallelListingJobsDataPoint(now, stat.Count)
	}
	if stat, ok := clusterStats.Counters[fmt.Sprintf("%s.driver.HiveExternalCatalog.partitionsFetched", appID)]; ok {
		s.mb.RecordSparkDriverHiveExternalCatalogPartitionsFetchedDataPoint(now, stat.Count)
	}
	// CodeGenerator histograms: each yields a count and a mean.
	if stat, ok := clusterStats.Histograms[fmt.Sprintf("%s.driver.CodeGenerator.compilationTime", appID)]; ok {
		s.mb.RecordSparkDriverCodeGeneratorCompilationCountDataPoint(now, stat.Count)
		s.mb.RecordSparkDriverCodeGeneratorCompilationAverageTimeDataPoint(now, stat.Mean)
	}
	if stat, ok := clusterStats.Histograms[fmt.Sprintf("%s.driver.CodeGenerator.generatedClassSize", appID)]; ok {
		s.mb.RecordSparkDriverCodeGeneratorGeneratedClassCountDataPoint(now, stat.Count)
		s.mb.RecordSparkDriverCodeGeneratorGeneratedClassAverageSizeDataPoint(now, stat.Mean)
	}
	if stat, ok := clusterStats.Histograms[fmt.Sprintf("%s.driver.CodeGenerator.generatedMethodSize", appID)]; ok {
		s.mb.RecordSparkDriverCodeGeneratorGeneratedMethodCountDataPoint(now, stat.Count)
		s.mb.RecordSparkDriverCodeGeneratorGeneratedMethodAverageSizeDataPoint(now, stat.Mean)
	}
	if stat, ok := clusterStats.Histograms[fmt.Sprintf("%s.driver.CodeGenerator.sourceCodeSize", appID)]; ok {
		s.mb.RecordSparkDriverCodeGeneratorSourceCodeOperationsDataPoint(now, stat.Count)
		s.mb.RecordSparkDriverCodeGeneratorSourceCodeAverageSizeDataPoint(now, stat.Mean)
	}
	// DAGScheduler job and stage gauges.
	if stat, ok := clusterStats.Gauges[fmt.Sprintf("%s.driver.DAGScheduler.job.activeJobs", appID)]; ok {
		s.mb.RecordSparkDriverDagSchedulerJobActiveDataPoint(now, int64(stat.Value))
	}
	if stat, ok := clusterStats.Gauges[fmt.Sprintf("%s.driver.DAGScheduler.job.allJobs", appID)]; ok {
		s.mb.RecordSparkDriverDagSchedulerJobCountDataPoint(now, int64(stat.Value))
	}
	if stat, ok := clusterStats.Gauges[fmt.Sprintf("%s.driver.DAGScheduler.stage.failedStages", appID)]; ok {
		s.mb.RecordSparkDriverDagSchedulerStageFailedDataPoint(now, int64(stat.Value))
	}
	if stat, ok := clusterStats.Gauges[fmt.Sprintf("%s.driver.DAGScheduler.stage.runningStages", appID)]; ok {
		s.mb.RecordSparkDriverDagSchedulerStageCountDataPoint(now, int64(stat.Value), metadata.AttributeSchedulerStatusRunning)
	}
	if stat, ok := clusterStats.Gauges[fmt.Sprintf("%s.driver.DAGScheduler.stage.waitingStages", appID)]; ok {
		s.mb.RecordSparkDriverDagSchedulerStageCountDataPoint(now, int64(stat.Value), metadata.AttributeSchedulerStatusWaiting)
	}
	// LiveListenerBus event metrics.
	if stat, ok := clusterStats.Counters[fmt.Sprintf("%s.driver.LiveListenerBus.numEventsPosted", appID)]; ok {
		s.mb.RecordSparkDriverLiveListenerBusPostedDataPoint(now, stat.Count)
	}
	if stat, ok := clusterStats.Timers[fmt.Sprintf("%s.driver.LiveListenerBus.queue.appStatus.listenerProcessingTime", appID)]; ok {
		s.mb.RecordSparkDriverLiveListenerBusProcessingTimeAverageDataPoint(now, stat.Mean)
	}
	if stat, ok := clusterStats.Counters[fmt.Sprintf("%s.driver.LiveListenerBus.queue.appStatus.numDroppedEvents", appID)]; ok {
		s.mb.RecordSparkDriverLiveListenerBusDroppedDataPoint(now, stat.Count)
	}
	if stat, ok := clusterStats.Gauges[fmt.Sprintf("%s.driver.LiveListenerBus.queue.appStatus.size", appID)]; ok {
		s.mb.RecordSparkDriverLiveListenerBusQueueSizeDataPoint(now, int64(stat.Value))
	}
	// JVM CPU time.
	if stat, ok := clusterStats.Gauges[fmt.Sprintf("%s.driver.JVMCPU.jvmCpuTime", appID)]; ok {
		s.mb.RecordSparkDriverJvmCPUTimeDataPoint(now, int64(stat.Value))
	}
	// ExecutorMetrics memory and GC gauges.
	if stat, ok := clusterStats.Gauges[fmt.Sprintf("%s.driver.ExecutorMetrics.JVMOffHeapMemory", appID)]; ok {
		s.mb.RecordSparkDriverExecutorMemoryJvmDataPoint(now, int64(stat.Value), metadata.AttributeLocationOffHeap)
	}
	if stat, ok := clusterStats.Gauges[fmt.Sprintf("%s.driver.ExecutorMetrics.JVMHeapMemory", appID)]; ok {
		s.mb.RecordSparkDriverExecutorMemoryJvmDataPoint(now, int64(stat.Value), metadata.AttributeLocationOnHeap)
	}
	if stat, ok := clusterStats.Gauges[fmt.Sprintf("%s.driver.ExecutorMetrics.OffHeapExecutionMemory", appID)]; ok {
		s.mb.RecordSparkDriverExecutorMemoryExecutionDataPoint(now, int64(stat.Value), metadata.AttributeLocationOffHeap)
	}
	if stat, ok := clusterStats.Gauges[fmt.Sprintf("%s.driver.ExecutorMetrics.OnHeapExecutionMemory", appID)]; ok {
		s.mb.RecordSparkDriverExecutorMemoryExecutionDataPoint(now, int64(stat.Value), metadata.AttributeLocationOnHeap)
	}
	if stat, ok := clusterStats.Gauges[fmt.Sprintf("%s.driver.ExecutorMetrics.OffHeapStorageMemory", appID)]; ok {
		s.mb.RecordSparkDriverExecutorMemoryStorageDataPoint(now, int64(stat.Value), metadata.AttributeLocationOffHeap)
	}
	if stat, ok := clusterStats.Gauges[fmt.Sprintf("%s.driver.ExecutorMetrics.OnHeapStorageMemory", appID)]; ok {
		s.mb.RecordSparkDriverExecutorMemoryStorageDataPoint(now, int64(stat.Value), metadata.AttributeLocationOnHeap)
	}
	if stat, ok := clusterStats.Gauges[fmt.Sprintf("%s.driver.ExecutorMetrics.DirectPoolMemory", appID)]; ok {
		s.mb.RecordSparkDriverExecutorMemoryPoolDataPoint(now, int64(stat.Value), metadata.AttributePoolMemoryTypeDirect)
	}
	if stat, ok := clusterStats.Gauges[fmt.Sprintf("%s.driver.ExecutorMetrics.MappedPoolMemory", appID)]; ok {
		s.mb.RecordSparkDriverExecutorMemoryPoolDataPoint(now, int64(stat.Value), metadata.AttributePoolMemoryTypeMapped)
	}
	if stat, ok := clusterStats.Gauges[fmt.Sprintf("%s.driver.ExecutorMetrics.MinorGCCount", appID)]; ok {
		s.mb.RecordSparkDriverExecutorGcOperationsDataPoint(now, int64(stat.Value), metadata.AttributeGcTypeMinor)
	}
	if stat, ok := clusterStats.Gauges[fmt.Sprintf("%s.driver.ExecutorMetrics.MajorGCCount", appID)]; ok {
		s.mb.RecordSparkDriverExecutorGcOperationsDataPoint(now, int64(stat.Value), metadata.AttributeGcTypeMajor)
	}
	if stat, ok := clusterStats.Gauges[fmt.Sprintf("%s.driver.ExecutorMetrics.MinorGCTime", appID)]; ok {
		s.mb.RecordSparkDriverExecutorGcTimeDataPoint(now, int64(stat.Value), metadata.AttributeGcTypeMinor)
	}
	if stat, ok := clusterStats.Gauges[fmt.Sprintf("%s.driver.ExecutorMetrics.MajorGCTime", appID)]; ok {
		s.mb.RecordSparkDriverExecutorGcTimeDataPoint(now, int64(stat.Value), metadata.AttributeGcTypeMajor)
	}
	rb := s.mb.NewResourceBuilder()
	rb.SetSparkApplicationID(appID)
	rb.SetSparkApplicationName(appName)
	s.mb.EmitForResource(metadata.WithResource(rb.Emit()))
}
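// recordStages emits per-stage metrics, one resource per stage attempt,
// skipping stages whose status is not ACTIVE, COMPLETE, PENDING, or FAILED.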
func (s *sparkScraper) recordStages(stageStats []models.Stage, now pcommon.Timestamp, appID string, appName string) {
	for _, stage := range stageStats {
		switch stage.Status {
		case "ACTIVE":
			s.mb.RecordSparkStageStatusDataPoint(now, 0, true, false, false, false)
		case "COMPLETE":
			s.mb.RecordSparkStageStatusDataPoint(now, 0, false, true, false, false)
		case "PENDING":
			s.mb.RecordSparkStageStatusDataPoint(now, 0, false, false, true, false)
		case "FAILED":
			s.mb.RecordSparkStageStatusDataPoint(now, 0, false, false, false, true)
		default:
			s.logger.Warn("Unsupported Spark stage status supplied: skipping this stage's metrics and continuing with the next stage", zap.String("status", stage.Status))
			continue
		}
		s.mb.RecordSparkStageTaskActiveDataPoint(now, stage.NumActiveTasks)
		s.mb.RecordSparkStageTaskResultDataPoint(now, stage.NumCompleteTasks, metadata.AttributeStageTaskResultCompleted)
		s.mb.RecordSparkStageTaskResultDataPoint(now, stage.NumFailedTasks, metadata.AttributeStageTaskResultFailed)
		s.mb.RecordSparkStageTaskResultDataPoint(now, stage.NumKilledTasks, metadata.AttributeStageTaskResultKilled)
		s.mb.RecordSparkStageExecutorRunTimeDataPoint(now, stage.ExecutorRunTime)
		s.mb.RecordSparkStageExecutorCPUTimeDataPoint(now, stage.ExecutorCPUTime)
		s.mb.RecordSparkStageTaskResultSizeDataPoint(now, stage.ResultSize)
		s.mb.RecordSparkStageJvmGcTimeDataPoint(now, stage.JvmGcTime)
		s.mb.RecordSparkStageMemorySpilledDataPoint(now, stage.MemoryBytesSpilled)
		s.mb.RecordSparkStageDiskSpilledDataPoint(now, stage.DiskBytesSpilled)
		s.mb.RecordSparkStageMemoryPeakDataPoint(now, stage.PeakExecutionMemory)
		s.mb.RecordSparkStageIoSizeDataPoint(now, stage.InputBytes, metadata.AttributeDirectionIn)
		s.mb.RecordSparkStageIoSizeDataPoint(now, stage.OutputBytes, metadata.AttributeDirectionOut)
		s.mb.RecordSparkStageIoRecordsDataPoint(now, stage.InputRecords, metadata.AttributeDirectionIn)
		s.mb.RecordSparkStageIoRecordsDataPoint(now, stage.OutputRecords, metadata.AttributeDirectionOut)
		s.mb.RecordSparkStageShuffleBlocksFetchedDataPoint(now, stage.ShuffleRemoteBlocksFetched, metadata.AttributeSourceRemote)
		s.mb.RecordSparkStageShuffleBlocksFetchedDataPoint(now, stage.ShuffleLocalBlocksFetched, metadata.AttributeSourceLocal)
		s.mb.RecordSparkStageShuffleFetchWaitTimeDataPoint(now, stage.ShuffleFetchWaitTime)
		s.mb.RecordSparkStageShuffleIoDiskDataPoint(now, stage.ShuffleRemoteBytesReadToDisk)
		s.mb.RecordSparkStageShuffleIoReadSizeDataPoint(now, stage.ShuffleLocalBytesRead, metadata.AttributeSourceLocal)
		s.mb.RecordSparkStageShuffleIoReadSizeDataPoint(now, stage.ShuffleRemoteBytesRead, metadata.AttributeSourceRemote)
		s.mb.RecordSparkStageShuffleIoWriteSizeDataPoint(now, stage.ShuffleWriteBytes)
		s.mb.RecordSparkStageShuffleIoRecordsDataPoint(now, stage.ShuffleReadRecords, metadata.AttributeDirectionIn)
		s.mb.RecordSparkStageShuffleIoRecordsDataPoint(now, stage.ShuffleWriteRecords, metadata.AttributeDirectionOut)
		s.mb.RecordSparkStageShuffleWriteTimeDataPoint(now, stage.ShuffleWriteTime)
		rb := s.mb.NewResourceBuilder()
		rb.SetSparkApplicationID(appID)
		rb.SetSparkApplicationName(appName)
		rb.SetSparkStageID(stage.StageID)
		rb.SetSparkStageAttemptID(stage.AttemptID)
		s.mb.EmitForResource(metadata.WithResource(rb.Emit()))
	}
}
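// recordExecutors emits per-executor metrics, one resource per executor id,
// reporting storage memory as used/free pairs for on- and off-heap locations.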
func (s *sparkScraper) recordExecutors(executorStats []models.Executor, now pcommon.Timestamp, appID string, appName string) {
	for _, executor := range executorStats {
		s.mb.RecordSparkExecutorMemoryUsageDataPoint(now, executor.MemoryUsed)
		s.mb.RecordSparkExecutorDiskUsageDataPoint(now, executor.DiskUsed)
		s.mb.RecordSparkExecutorTaskLimitDataPoint(now, executor.MaxTasks)
		s.mb.RecordSparkExecutorTaskActiveDataPoint(now, executor.ActiveTasks)
		s.mb.RecordSparkExecutorTaskResultDataPoint(now, executor.FailedTasks, metadata.AttributeExecutorTaskResultFailed)
		s.mb.RecordSparkExecutorTaskResultDataPoint(now, executor.CompletedTasks, metadata.AttributeExecutorTaskResultCompleted)
		s.mb.RecordSparkExecutorTimeDataPoint(now, executor.TotalDuration)
		s.mb.RecordSparkExecutorGcTimeDataPoint(now, executor.TotalGCTime)
		s.mb.RecordSparkExecutorInputSizeDataPoint(now, executor.TotalInputBytes)
		s.mb.RecordSparkExecutorShuffleIoSizeDataPoint(now, executor.TotalShuffleRead, metadata.AttributeDirectionIn)
		s.mb.RecordSparkExecutorShuffleIoSizeDataPoint(now, executor.TotalShuffleWrite, metadata.AttributeDirectionOut)
		used := executor.UsedOnHeapStorageMemory
		s.mb.RecordSparkExecutorStorageMemoryUsageDataPoint(now, used, metadata.AttributeLocationOnHeap, metadata.AttributeStateUsed)
		s.mb.RecordSparkExecutorStorageMemoryUsageDataPoint(now, executor.TotalOnHeapStorageMemory-used, metadata.AttributeLocationOnHeap, metadata.AttributeStateFree)
		used = executor.UsedOffHeapStorageMemory
		s.mb.RecordSparkExecutorStorageMemoryUsageDataPoint(now, used, metadata.AttributeLocationOffHeap, metadata.AttributeStateUsed)
		s.mb.RecordSparkExecutorStorageMemoryUsageDataPoint(now, executor.TotalOffHeapStorageMemory-used, metadata.AttributeLocationOffHeap, metadata.AttributeStateFree)
		rb := s.mb.NewResourceBuilder()
		rb.SetSparkApplicationID(appID)
		rb.SetSparkApplicationName(appName)
		rb.SetSparkExecutorID(executor.ExecutorID)
		s.mb.EmitForResource(metadata.WithResource(rb.Emit()))
	}
}
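// recordJobs emits per-job task and stage counts, one resource per job id.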
func (s *sparkScraper) recordJobs(jobStats []models.Job, now pcommon.Timestamp, appID string, appName string) {
	for _, job := range jobStats {
		s.mb.RecordSparkJobTaskActiveDataPoint(now, job.NumActiveTasks)
		s.mb.RecordSparkJobTaskResultDataPoint(now, job.NumCompletedTasks, metadata.AttributeJobResultCompleted)
		s.mb.RecordSparkJobTaskResultDataPoint(now, job.NumSkippedTasks, metadata.AttributeJobResultSkipped)
		s.mb.RecordSparkJobTaskResultDataPoint(now, job.NumFailedTasks, metadata.AttributeJobResultFailed)
		s.mb.RecordSparkJobStageActiveDataPoint(now, job.NumActiveStages)
		s.mb.RecordSparkJobStageResultDataPoint(now, job.NumCompletedStages, metadata.AttributeJobResultCompleted)
		s.mb.RecordSparkJobStageResultDataPoint(now, job.NumSkippedStages, metadata.AttributeJobResultSkipped)
		s.mb.RecordSparkJobStageResultDataPoint(now, job.NumFailedStages, metadata.AttributeJobResultFailed)
		rb := s.mb.NewResourceBuilder()
		rb.SetSparkApplicationID(appID)
		rb.SetSparkApplicationName(appName)
		rb.SetSparkJobID(job.JobID)
		s.mb.EmitForResource(metadata.WithResource(rb.Emit()))
	}
}