// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package apachesparkreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/apachesparkreceiver"

import (
	"context"
	"errors"
	"fmt"
	"time"

	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/pdata/pcommon"
	"go.opentelemetry.io/collector/pdata/pmetric"
	"go.opentelemetry.io/collector/receiver"
	"go.opentelemetry.io/collector/receiver/scrapererror"
	"go.uber.org/zap"

	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/apachesparkreceiver/internal/metadata"
	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/apachesparkreceiver/internal/models"
)
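
// Sentinel errors for application discovery: the first is returned when the
// applications endpoint cannot be queried at all, the second when none of the
// discovered apps match the configured allow-list.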
var (
	errFailedAppIDCollection = errors.New("failed to retrieve app ids")
	errNoMatchingAllowedApps = errors.New("no apps matched allowed names")
)
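
// sparkScraper pulls statistics from the Spark monitoring REST API and
// translates them into metrics via the generated MetricsBuilder.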
type sparkScraper struct {
	client   client
	logger   *zap.Logger
	config   *Config
	settings component.TelemetrySettings
	mb       *metadata.MetricsBuilder
}
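
// newSparkScraper returns a scraper wired to the receiver's config and
// telemetry settings. The HTTP client itself is created later, in start, once
// the host is available.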
func newSparkScraper(logger *zap.Logger, cfg *Config, settings receiver.CreateSettings) *sparkScraper {
	return &sparkScraper{
		logger:   logger,
		config:   cfg,
		settings: settings.TelemetrySettings,
		mb:       metadata.NewMetricsBuilder(cfg.MetricsBuilderConfig, settings),
	}
}
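
// start creates the HTTP client used to reach the Spark REST API. It runs
// once when the component starts, before the first scrape.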
func (s *sparkScraper) start(_ context.Context, host component.Host) (err error) {
	httpClient, err := newApacheSparkClient(s.config, host, s.settings)
	if err != nil {
		return fmt.Errorf("failed to start: %w", err)
	}
	s.client = httpClient
	return nil
}
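
// scrape performs one collection cycle: it discovers applications, filters
// them against the configured allow-list, then gathers cluster, stage,
// executor, and job statistics for each allowed application.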
func (s *sparkScraper) scrape(_ context.Context) (pmetric.Metrics, error) {
	now := pcommon.NewTimestampFromTime(time.Now())
	var scrapeErrors scrapererror.ScrapeErrors

	// Call the applications endpoint to get the IDs and names of all apps in
	// the cluster.
	apps, err := s.client.Applications()
	if err != nil {
		return pmetric.NewMetrics(), errFailedAppIDCollection
	}

	// Check the apps against the allowed app names from the config. If no app
	// names are specified, allow all apps.
	var allowedApps []models.Application
	switch {
	case len(s.config.ApplicationNames) == 0:
		allowedApps = apps
	default:
		// Compare the configured names against the app names reported by the
		// applications endpoint.
		appMap := make(map[string][]models.Application)
		for _, app := range apps {
			appMap[app.Name] = append(appMap[app.Name], app)
		}
		for _, name := range s.config.ApplicationNames {
			if apps, ok := appMap[name]; ok {
				allowedApps = append(allowedApps, apps...)
			}
		}
		if len(allowedApps) == 0 {
			return pmetric.NewMetrics(), errNoMatchingAllowedApps
		}
	}
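
	// Each endpoint failure below is recorded as a partial scrape error so
	// that one bad endpoint does not discard the rest of the batch. The first
	// argument to AddPartial is the number of failed metric points; the
	// counts here appear to correspond to how many metrics each endpoint
	// feeds.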

	// Get stats from the 'metrics' endpoint.
	clusterStats, err := s.client.ClusterStats()
	if err != nil {
		scrapeErrors.AddPartial(32, err)
		s.logger.Warn("Failed to scrape cluster stats", zap.Error(err))
	} else {
		for _, app := range allowedApps {
			s.recordCluster(clusterStats, now, app.ApplicationID, app.Name)
		}
	}

	// For each application ID, get stats from the stages, executors, and jobs
	// endpoints.
	for _, app := range allowedApps {
		stageStats, err := s.client.StageStats(app.ApplicationID)
		if err != nil {
			scrapeErrors.AddPartial(24, err)
			s.logger.Warn("Failed to scrape stage stats", zap.Error(err))
		} else {
			s.recordStages(stageStats, now, app.ApplicationID, app.Name)
		}

		executorStats, err := s.client.ExecutorStats(app.ApplicationID)
		if err != nil {
			scrapeErrors.AddPartial(13, err)
			s.logger.Warn("Failed to scrape executor stats", zap.Error(err))
		} else {
			s.recordExecutors(executorStats, now, app.ApplicationID, app.Name)
		}

		jobStats, err := s.client.JobStats(app.ApplicationID)
		if err != nil {
			scrapeErrors.AddPartial(8, err)
			s.logger.Warn("Failed to scrape job stats", zap.Error(err))
		} else {
			s.recordJobs(jobStats, now, app.ApplicationID, app.Name)
		}
	}

	return s.mb.Emit(), scrapeErrors.Combine()
}
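
// recordCluster translates cluster-level statistics (gauges, counters,
// histograms, and timers keyed by "<appID>.driver.<component>.<metric>") into
// data points, emitted under a resource identifying the application.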
func (s *sparkScraper) recordCluster(clusterStats *models.ClusterProperties, now pcommon.Timestamp, appID string, appName string) {
	// Driver BlockManager disk and memory usage.
	if stat, ok := clusterStats.Gauges[fmt.Sprintf("%s.driver.BlockManager.disk.diskSpaceUsed_MB", appID)]; ok {
		s.mb.RecordSparkDriverBlockManagerDiskUsageDataPoint(now, int64(stat.Value))
	}
	if stat, ok := clusterStats.Gauges[fmt.Sprintf("%s.driver.BlockManager.memory.offHeapMemUsed_MB", appID)]; ok {
		s.mb.RecordSparkDriverBlockManagerMemoryUsageDataPoint(now, int64(stat.Value), metadata.AttributeLocationOffHeap, metadata.AttributeStateUsed)
	}
	if stat, ok := clusterStats.Gauges[fmt.Sprintf("%s.driver.BlockManager.memory.onHeapMemUsed_MB", appID)]; ok {
		s.mb.RecordSparkDriverBlockManagerMemoryUsageDataPoint(now, int64(stat.Value), metadata.AttributeLocationOnHeap, metadata.AttributeStateUsed)
	}
	if stat, ok := clusterStats.Gauges[fmt.Sprintf("%s.driver.BlockManager.memory.remainingOffHeapMem_MB", appID)]; ok {
		s.mb.RecordSparkDriverBlockManagerMemoryUsageDataPoint(now, int64(stat.Value), metadata.AttributeLocationOffHeap, metadata.AttributeStateFree)
	}
	if stat, ok := clusterStats.Gauges[fmt.Sprintf("%s.driver.BlockManager.memory.remainingOnHeapMem_MB", appID)]; ok {
		s.mb.RecordSparkDriverBlockManagerMemoryUsageDataPoint(now, int64(stat.Value), metadata.AttributeLocationOnHeap, metadata.AttributeStateFree)
	}

	// Driver HiveExternalCatalog counters.
	if stat, ok := clusterStats.Counters[fmt.Sprintf("%s.driver.HiveExternalCatalog.fileCacheHits", appID)]; ok {
		s.mb.RecordSparkDriverHiveExternalCatalogFileCacheHitsDataPoint(now, stat.Count)
	}
	if stat, ok := clusterStats.Counters[fmt.Sprintf("%s.driver.HiveExternalCatalog.filesDiscovered", appID)]; ok {
		s.mb.RecordSparkDriverHiveExternalCatalogFilesDiscoveredDataPoint(now, stat.Count)
	}
	if stat, ok := clusterStats.Counters[fmt.Sprintf("%s.driver.HiveExternalCatalog.hiveClientCalls", appID)]; ok {
		s.mb.RecordSparkDriverHiveExternalCatalogHiveClientCallsDataPoint(now, stat.Count)
	}
	if stat, ok := clusterStats.Counters[fmt.Sprintf("%s.driver.HiveExternalCatalog.parallelListingJobCount", appID)]; ok {
		s.mb.RecordSparkDriverHiveExternalCatalogParallelListingJobsDataPoint(now, stat.Count)
	}
	if stat, ok := clusterStats.Counters[fmt.Sprintf("%s.driver.HiveExternalCatalog.partitionsFetched", appID)]; ok {
		s.mb.RecordSparkDriverHiveExternalCatalogPartitionsFetchedDataPoint(now, stat.Count)
	}

	// Driver CodeGenerator histograms; each yields a count and an average.
	if stat, ok := clusterStats.Histograms[fmt.Sprintf("%s.driver.CodeGenerator.compilationTime", appID)]; ok {
		s.mb.RecordSparkDriverCodeGeneratorCompilationCountDataPoint(now, stat.Count)
		s.mb.RecordSparkDriverCodeGeneratorCompilationAverageTimeDataPoint(now, stat.Mean)
	}
	if stat, ok := clusterStats.Histograms[fmt.Sprintf("%s.driver.CodeGenerator.generatedClassSize", appID)]; ok {
		s.mb.RecordSparkDriverCodeGeneratorGeneratedClassCountDataPoint(now, stat.Count)
		s.mb.RecordSparkDriverCodeGeneratorGeneratedClassAverageSizeDataPoint(now, stat.Mean)
	}
	if stat, ok := clusterStats.Histograms[fmt.Sprintf("%s.driver.CodeGenerator.generatedMethodSize", appID)]; ok {
		s.mb.RecordSparkDriverCodeGeneratorGeneratedMethodCountDataPoint(now, stat.Count)
		s.mb.RecordSparkDriverCodeGeneratorGeneratedMethodAverageSizeDataPoint(now, stat.Mean)
	}
	if stat, ok := clusterStats.Histograms[fmt.Sprintf("%s.driver.CodeGenerator.sourceCodeSize", appID)]; ok {
		s.mb.RecordSparkDriverCodeGeneratorSourceCodeOperationsDataPoint(now, stat.Count)
		s.mb.RecordSparkDriverCodeGeneratorSourceCodeAverageSizeDataPoint(now, stat.Mean)
	}

	// Driver DAGScheduler job and stage gauges.
	if stat, ok := clusterStats.Gauges[fmt.Sprintf("%s.driver.DAGScheduler.job.activeJobs", appID)]; ok {
		s.mb.RecordSparkDriverDagSchedulerJobActiveDataPoint(now, int64(stat.Value))
	}
	if stat, ok := clusterStats.Gauges[fmt.Sprintf("%s.driver.DAGScheduler.job.allJobs", appID)]; ok {
		s.mb.RecordSparkDriverDagSchedulerJobCountDataPoint(now, int64(stat.Value))
	}
	if stat, ok := clusterStats.Gauges[fmt.Sprintf("%s.driver.DAGScheduler.stage.failedStages", appID)]; ok {
		s.mb.RecordSparkDriverDagSchedulerStageFailedDataPoint(now, int64(stat.Value))
	}
	if stat, ok := clusterStats.Gauges[fmt.Sprintf("%s.driver.DAGScheduler.stage.runningStages", appID)]; ok {
		s.mb.RecordSparkDriverDagSchedulerStageCountDataPoint(now, int64(stat.Value), metadata.AttributeSchedulerStatusRunning)
	}
	if stat, ok := clusterStats.Gauges[fmt.Sprintf("%s.driver.DAGScheduler.stage.waitingStages", appID)]; ok {
		s.mb.RecordSparkDriverDagSchedulerStageCountDataPoint(now, int64(stat.Value), metadata.AttributeSchedulerStatusWaiting)
	}

	// Driver LiveListenerBus event counters, processing time, and queue size.
	if stat, ok := clusterStats.Counters[fmt.Sprintf("%s.driver.LiveListenerBus.numEventsPosted", appID)]; ok {
		s.mb.RecordSparkDriverLiveListenerBusPostedDataPoint(now, stat.Count)
	}
	if stat, ok := clusterStats.Timers[fmt.Sprintf("%s.driver.LiveListenerBus.queue.appStatus.listenerProcessingTime", appID)]; ok {
		s.mb.RecordSparkDriverLiveListenerBusProcessingTimeAverageDataPoint(now, stat.Mean)
	}
	if stat, ok := clusterStats.Counters[fmt.Sprintf("%s.driver.LiveListenerBus.queue.appStatus.numDroppedEvents", appID)]; ok {
		s.mb.RecordSparkDriverLiveListenerBusDroppedDataPoint(now, stat.Count)
	}
	if stat, ok := clusterStats.Gauges[fmt.Sprintf("%s.driver.LiveListenerBus.queue.appStatus.size", appID)]; ok {
		s.mb.RecordSparkDriverLiveListenerBusQueueSizeDataPoint(now, int64(stat.Value))
	}

	// Driver JVM CPU time plus executor memory, pool, and GC gauges.
	if stat, ok := clusterStats.Gauges[fmt.Sprintf("%s.driver.JVMCPU.jvmCpuTime", appID)]; ok {
		s.mb.RecordSparkDriverJvmCPUTimeDataPoint(now, int64(stat.Value))
	}
	if stat, ok := clusterStats.Gauges[fmt.Sprintf("%s.driver.ExecutorMetrics.JVMOffHeapMemory", appID)]; ok {
		s.mb.RecordSparkDriverExecutorMemoryJvmDataPoint(now, int64(stat.Value), metadata.AttributeLocationOffHeap)
	}
	if stat, ok := clusterStats.Gauges[fmt.Sprintf("%s.driver.ExecutorMetrics.JVMHeapMemory", appID)]; ok {
		s.mb.RecordSparkDriverExecutorMemoryJvmDataPoint(now, int64(stat.Value), metadata.AttributeLocationOnHeap)
	}
	if stat, ok := clusterStats.Gauges[fmt.Sprintf("%s.driver.ExecutorMetrics.OffHeapExecutionMemory", appID)]; ok {
		s.mb.RecordSparkDriverExecutorMemoryExecutionDataPoint(now, int64(stat.Value), metadata.AttributeLocationOffHeap)
	}
	if stat, ok := clusterStats.Gauges[fmt.Sprintf("%s.driver.ExecutorMetrics.OnHeapExecutionMemory", appID)]; ok {
		s.mb.RecordSparkDriverExecutorMemoryExecutionDataPoint(now, int64(stat.Value), metadata.AttributeLocationOnHeap)
	}
	if stat, ok := clusterStats.Gauges[fmt.Sprintf("%s.driver.ExecutorMetrics.OffHeapStorageMemory", appID)]; ok {
		s.mb.RecordSparkDriverExecutorMemoryStorageDataPoint(now, int64(stat.Value), metadata.AttributeLocationOffHeap)
	}
	if stat, ok := clusterStats.Gauges[fmt.Sprintf("%s.driver.ExecutorMetrics.OnHeapStorageMemory", appID)]; ok {
		s.mb.RecordSparkDriverExecutorMemoryStorageDataPoint(now, int64(stat.Value), metadata.AttributeLocationOnHeap)
	}
	if stat, ok := clusterStats.Gauges[fmt.Sprintf("%s.driver.ExecutorMetrics.DirectPoolMemory", appID)]; ok {
		s.mb.RecordSparkDriverExecutorMemoryPoolDataPoint(now, int64(stat.Value), metadata.AttributePoolMemoryTypeDirect)
	}
	if stat, ok := clusterStats.Gauges[fmt.Sprintf("%s.driver.ExecutorMetrics.MappedPoolMemory", appID)]; ok {
		s.mb.RecordSparkDriverExecutorMemoryPoolDataPoint(now, int64(stat.Value), metadata.AttributePoolMemoryTypeMapped)
	}
	if stat, ok := clusterStats.Gauges[fmt.Sprintf("%s.driver.ExecutorMetrics.MinorGCCount", appID)]; ok {
		s.mb.RecordSparkDriverExecutorGcOperationsDataPoint(now, int64(stat.Value), metadata.AttributeGcTypeMinor)
	}
	if stat, ok := clusterStats.Gauges[fmt.Sprintf("%s.driver.ExecutorMetrics.MajorGCCount", appID)]; ok {
		s.mb.RecordSparkDriverExecutorGcOperationsDataPoint(now, int64(stat.Value), metadata.AttributeGcTypeMajor)
	}
	if stat, ok := clusterStats.Gauges[fmt.Sprintf("%s.driver.ExecutorMetrics.MinorGCTime", appID)]; ok {
		s.mb.RecordSparkDriverExecutorGcTimeDataPoint(now, int64(stat.Value), metadata.AttributeGcTypeMinor)
	}
	if stat, ok := clusterStats.Gauges[fmt.Sprintf("%s.driver.ExecutorMetrics.MajorGCTime", appID)]; ok {
		s.mb.RecordSparkDriverExecutorGcTimeDataPoint(now, int64(stat.Value), metadata.AttributeGcTypeMajor)
	}

	rb := s.mb.NewResourceBuilder()
	rb.SetSparkApplicationID(appID)
	rb.SetSparkApplicationName(appName)
	s.mb.EmitForResource(metadata.WithResource(rb.Emit()))
}
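
// recordStages emits per-stage metrics; each stage becomes its own resource
// identified by application, stage ID, and attempt ID. Stages with an
// unrecognized status are skipped.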
func (s *sparkScraper) recordStages(stageStats []models.Stage, now pcommon.Timestamp, appID string, appName string) {
	for _, stage := range stageStats {
		// spark.stage.status is recorded as one data point whose boolean
		// attributes flag the stage as active, complete, pending, or failed.
		switch stage.Status {
		case "ACTIVE":
			s.mb.RecordSparkStageStatusDataPoint(now, 0, true, false, false, false)
		case "COMPLETE":
			s.mb.RecordSparkStageStatusDataPoint(now, 0, false, true, false, false)
		case "PENDING":
			s.mb.RecordSparkStageStatusDataPoint(now, 0, false, false, true, false)
		case "FAILED":
			s.mb.RecordSparkStageStatusDataPoint(now, 0, false, false, false, true)
		default:
			s.logger.Warn("Unsupported Spark stage status; skipping this stage's metrics", zap.String("status", stage.Status))
			continue
		}

		s.mb.RecordSparkStageTaskActiveDataPoint(now, stage.NumActiveTasks)
		s.mb.RecordSparkStageTaskResultDataPoint(now, stage.NumCompleteTasks, metadata.AttributeStageTaskResultCompleted)
		s.mb.RecordSparkStageTaskResultDataPoint(now, stage.NumFailedTasks, metadata.AttributeStageTaskResultFailed)
		s.mb.RecordSparkStageTaskResultDataPoint(now, stage.NumKilledTasks, metadata.AttributeStageTaskResultKilled)
		s.mb.RecordSparkStageExecutorRunTimeDataPoint(now, stage.ExecutorRunTime)
		s.mb.RecordSparkStageExecutorCPUTimeDataPoint(now, stage.ExecutorCPUTime)
		s.mb.RecordSparkStageTaskResultSizeDataPoint(now, stage.ResultSize)
		s.mb.RecordSparkStageJvmGcTimeDataPoint(now, stage.JvmGcTime)
		s.mb.RecordSparkStageMemorySpilledDataPoint(now, stage.MemoryBytesSpilled)
		s.mb.RecordSparkStageDiskSpilledDataPoint(now, stage.DiskBytesSpilled)
		s.mb.RecordSparkStageMemoryPeakDataPoint(now, stage.PeakExecutionMemory)
		s.mb.RecordSparkStageIoSizeDataPoint(now, stage.InputBytes, metadata.AttributeDirectionIn)
		s.mb.RecordSparkStageIoSizeDataPoint(now, stage.OutputBytes, metadata.AttributeDirectionOut)
		s.mb.RecordSparkStageIoRecordsDataPoint(now, stage.InputRecords, metadata.AttributeDirectionIn)
		s.mb.RecordSparkStageIoRecordsDataPoint(now, stage.OutputRecords, metadata.AttributeDirectionOut)
		s.mb.RecordSparkStageShuffleBlocksFetchedDataPoint(now, stage.ShuffleRemoteBlocksFetched, metadata.AttributeSourceRemote)
		s.mb.RecordSparkStageShuffleBlocksFetchedDataPoint(now, stage.ShuffleLocalBlocksFetched, metadata.AttributeSourceLocal)
		s.mb.RecordSparkStageShuffleFetchWaitTimeDataPoint(now, stage.ShuffleFetchWaitTime)
		s.mb.RecordSparkStageShuffleIoDiskDataPoint(now, stage.ShuffleRemoteBytesReadToDisk)
		s.mb.RecordSparkStageShuffleIoReadSizeDataPoint(now, stage.ShuffleLocalBytesRead, metadata.AttributeSourceLocal)
		s.mb.RecordSparkStageShuffleIoReadSizeDataPoint(now, stage.ShuffleRemoteBytesRead, metadata.AttributeSourceRemote)
		s.mb.RecordSparkStageShuffleIoWriteSizeDataPoint(now, stage.ShuffleWriteBytes)
		s.mb.RecordSparkStageShuffleIoRecordsDataPoint(now, stage.ShuffleReadRecords, metadata.AttributeDirectionIn)
		s.mb.RecordSparkStageShuffleIoRecordsDataPoint(now, stage.ShuffleWriteRecords, metadata.AttributeDirectionOut)
		s.mb.RecordSparkStageShuffleWriteTimeDataPoint(now, stage.ShuffleWriteTime)

		rb := s.mb.NewResourceBuilder()
		rb.SetSparkApplicationID(appID)
		rb.SetSparkApplicationName(appName)
		rb.SetSparkStageID(stage.StageID)
		rb.SetSparkStageAttemptID(stage.AttemptID)
		s.mb.EmitForResource(metadata.WithResource(rb.Emit()))
	}
}
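
// recordExecutors emits per-executor metrics under a resource identified by
// application and executor ID.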
func (s *sparkScraper) recordExecutors(executorStats []models.Executor, now pcommon.Timestamp, appID string, appName string) {
	for _, executor := range executorStats {
		s.mb.RecordSparkExecutorMemoryUsageDataPoint(now, executor.MemoryUsed)
		s.mb.RecordSparkExecutorDiskUsageDataPoint(now, executor.DiskUsed)
		s.mb.RecordSparkExecutorTaskLimitDataPoint(now, executor.MaxTasks)
		s.mb.RecordSparkExecutorTaskActiveDataPoint(now, executor.ActiveTasks)
		s.mb.RecordSparkExecutorTaskResultDataPoint(now, executor.FailedTasks, metadata.AttributeExecutorTaskResultFailed)
		s.mb.RecordSparkExecutorTaskResultDataPoint(now, executor.CompletedTasks, metadata.AttributeExecutorTaskResultCompleted)
		s.mb.RecordSparkExecutorTimeDataPoint(now, executor.TotalDuration)
		s.mb.RecordSparkExecutorGcTimeDataPoint(now, executor.TotalGCTime)
		s.mb.RecordSparkExecutorInputSizeDataPoint(now, executor.TotalInputBytes)
		s.mb.RecordSparkExecutorShuffleIoSizeDataPoint(now, executor.TotalShuffleRead, metadata.AttributeDirectionIn)
		s.mb.RecordSparkExecutorShuffleIoSizeDataPoint(now, executor.TotalShuffleWrite, metadata.AttributeDirectionOut)

		// The API reports used and total storage memory, so free is derived
		// as total minus used.
		used := executor.UsedOnHeapStorageMemory
		s.mb.RecordSparkExecutorStorageMemoryUsageDataPoint(now, used, metadata.AttributeLocationOnHeap, metadata.AttributeStateUsed)
		s.mb.RecordSparkExecutorStorageMemoryUsageDataPoint(now, executor.TotalOnHeapStorageMemory-used, metadata.AttributeLocationOnHeap, metadata.AttributeStateFree)
		used = executor.UsedOffHeapStorageMemory
		s.mb.RecordSparkExecutorStorageMemoryUsageDataPoint(now, used, metadata.AttributeLocationOffHeap, metadata.AttributeStateUsed)
		s.mb.RecordSparkExecutorStorageMemoryUsageDataPoint(now, executor.TotalOffHeapStorageMemory-used, metadata.AttributeLocationOffHeap, metadata.AttributeStateFree)

		rb := s.mb.NewResourceBuilder()
		rb.SetSparkApplicationID(appID)
		rb.SetSparkApplicationName(appName)
		rb.SetSparkExecutorID(executor.ExecutorID)
		s.mb.EmitForResource(metadata.WithResource(rb.Emit()))
	}
}
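
// recordJobs emits per-job task and stage counts under a resource identified
// by application and job ID.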
func (s *sparkScraper) recordJobs(jobStats []models.Job, now pcommon.Timestamp, appID string, appName string) {
	for _, job := range jobStats {
		s.mb.RecordSparkJobTaskActiveDataPoint(now, job.NumActiveTasks)
		s.mb.RecordSparkJobTaskResultDataPoint(now, job.NumCompletedTasks, metadata.AttributeJobResultCompleted)
		s.mb.RecordSparkJobTaskResultDataPoint(now, job.NumSkippedTasks, metadata.AttributeJobResultSkipped)
		s.mb.RecordSparkJobTaskResultDataPoint(now, job.NumFailedTasks, metadata.AttributeJobResultFailed)
		s.mb.RecordSparkJobStageActiveDataPoint(now, job.NumActiveStages)
		s.mb.RecordSparkJobStageResultDataPoint(now, job.NumCompletedStages, metadata.AttributeJobResultCompleted)
		s.mb.RecordSparkJobStageResultDataPoint(now, job.NumSkippedStages, metadata.AttributeJobResultSkipped)
		s.mb.RecordSparkJobStageResultDataPoint(now, job.NumFailedStages, metadata.AttributeJobResultFailed)

		rb := s.mb.NewResourceBuilder()
		rb.SetSparkApplicationID(appID)
		rb.SetSparkApplicationName(appName)
		rb.SetSparkJobID(job.JobID)
		s.mb.EmitForResource(metadata.WithResource(rb.Emit()))
	}
}