scraper.go 12 KB


  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package snowflakereceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/snowflakereceiver"
  4. import (
  5. "context"
  6. "time"
  7. "go.opentelemetry.io/collector/component"
  8. "go.opentelemetry.io/collector/pdata/pcommon"
  9. "go.opentelemetry.io/collector/pdata/pmetric"
  10. "go.opentelemetry.io/collector/receiver"
  11. "go.opentelemetry.io/collector/receiver/scrapererror"
  12. "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/snowflakereceiver/internal/metadata"
  13. )
  14. type snowflakeMetricsScraper struct {
  15. client *snowflakeClient
  16. settings component.TelemetrySettings
  17. conf *Config
  18. mb *metadata.MetricsBuilder
  19. }
  20. func newSnowflakeMetricsScraper(settings receiver.CreateSettings, conf *Config) *snowflakeMetricsScraper {
  21. return &snowflakeMetricsScraper{
  22. settings: settings.TelemetrySettings,
  23. conf: conf,
  24. mb: metadata.NewMetricsBuilder(conf.MetricsBuilderConfig, settings),
  25. }
  26. }
  27. // for use with receiver.scraperhelper
  28. func (s *snowflakeMetricsScraper) start(_ context.Context, _ component.Host) (err error) {
  29. s.client, err = newDefaultClient(s.settings, *s.conf)
  30. if err != nil {
  31. return err
  32. }
  33. return nil
  34. }
  35. func (s *snowflakeMetricsScraper) shutdown(_ context.Context) (err error) {
  36. if s.client == nil {
  37. return nil
  38. }
  39. err = s.client.client.Close()
  40. return err
  41. }
  42. // wrapper for all of the sub-scraping tasks, implements the scraper interface for
  43. // snowflakeMetricsScraper
  44. func (s *snowflakeMetricsScraper) scrape(ctx context.Context) (pmetric.Metrics, error) {
  45. errs := &scrapererror.ScrapeErrors{}
  46. now := pcommon.NewTimestampFromTime(time.Now())
  47. // each client call has its own scrape function
  48. s.scrapeBillingMetrics(ctx, now, *errs)
  49. s.scrapeWarehouseBillingMetrics(ctx, now, *errs)
  50. s.scrapeLoginMetrics(ctx, now, *errs)
  51. s.scrapeHighLevelQueryMetrics(ctx, now, *errs)
  52. s.scrapeDBMetrics(ctx, now, *errs)
  53. s.scrapeSessionMetrics(ctx, now, *errs)
  54. s.scrapeSnowpipeMetrics(ctx, now, *errs)
  55. s.scrapeStorageMetrics(ctx, now, *errs)
  56. rb := s.mb.NewResourceBuilder()
  57. rb.SetSnowflakeAccountName(s.conf.Account)
  58. return s.mb.Emit(metadata.WithResource(rb.Emit())), errs.Combine()
  59. }
  60. func (s *snowflakeMetricsScraper) scrapeBillingMetrics(ctx context.Context, t pcommon.Timestamp, errs scrapererror.ScrapeErrors) {
  61. billingMetrics, err := s.client.FetchBillingMetrics(ctx)
  62. if err != nil {
  63. errs.Add(err)
  64. return
  65. }
  66. for _, row := range *billingMetrics {
  67. s.mb.RecordSnowflakeBillingCloudServiceTotalDataPoint(t, row.totalCloudService, row.serviceType.String)
  68. s.mb.RecordSnowflakeBillingTotalCreditTotalDataPoint(t, row.totalCredits, row.serviceType.String)
  69. s.mb.RecordSnowflakeBillingVirtualWarehouseTotalDataPoint(t, row.totalVirtualWarehouseCredits, row.serviceType.String)
  70. }
  71. }
  72. func (s *snowflakeMetricsScraper) scrapeWarehouseBillingMetrics(ctx context.Context, t pcommon.Timestamp, errs scrapererror.ScrapeErrors) {
  73. warehouseBillingMetrics, err := s.client.FetchWarehouseBillingMetrics(ctx)
  74. if err != nil {
  75. errs.Add(err)
  76. return
  77. }
  78. for _, row := range *warehouseBillingMetrics {
  79. s.mb.RecordSnowflakeBillingWarehouseTotalCreditTotalDataPoint(t, row.totalCredit, row.warehouseName.String)
  80. s.mb.RecordSnowflakeBillingWarehouseCloudServiceTotalDataPoint(t, row.totalCloudService, row.warehouseName.String)
  81. s.mb.RecordSnowflakeBillingWarehouseVirtualWarehouseTotalDataPoint(t, row.totalVirtualWarehouse, row.warehouseName.String)
  82. }
  83. }
  84. func (s *snowflakeMetricsScraper) scrapeLoginMetrics(ctx context.Context, t pcommon.Timestamp, errs scrapererror.ScrapeErrors) {
  85. loginMetrics, err := s.client.FetchLoginMetrics(ctx)
  86. if err != nil {
  87. errs.Add(err)
  88. return
  89. }
  90. for _, row := range *loginMetrics {
  91. s.mb.RecordSnowflakeLoginsTotalDataPoint(t, row.loginsTotal, row.errorMessage.String, row.reportedClientType.String, row.isSuccess.String)
  92. }
  93. }
  94. func (s *snowflakeMetricsScraper) scrapeHighLevelQueryMetrics(ctx context.Context, t pcommon.Timestamp, errs scrapererror.ScrapeErrors) {
  95. highLevelQueryMetrics, err := s.client.FetchHighLevelQueryMetrics(ctx)
  96. if err != nil {
  97. errs.Add(err)
  98. return
  99. }
  100. for _, row := range *highLevelQueryMetrics {
  101. s.mb.RecordSnowflakeQueryExecutedDataPoint(t, row.avgQueryExecuted, row.warehouseName.String)
  102. s.mb.RecordSnowflakeQueryBlockedDataPoint(t, row.avgQueryBlocked, row.warehouseName.String)
  103. s.mb.RecordSnowflakeQueryQueuedOverloadDataPoint(t, row.avgQueryQueuedOverload, row.warehouseName.String)
  104. s.mb.RecordSnowflakeQueryQueuedProvisionDataPoint(t, row.avgQueryQueuedProvision, row.warehouseName.String)
  105. }
  106. }
  107. func (s *snowflakeMetricsScraper) scrapeDBMetrics(ctx context.Context, t pcommon.Timestamp, errs scrapererror.ScrapeErrors) {
  108. DBMetrics, err := s.client.FetchDbMetrics(ctx)
  109. if err != nil {
  110. errs.Add(err)
  111. return
  112. }
  113. for _, row := range *DBMetrics {
  114. s.mb.RecordSnowflakeDatabaseQueryCountDataPoint(t, row.databaseQueryCount, row.attributes.schemaName.String, row.attributes.executionStatus.String, row.attributes.errorMessage.String, row.attributes.queryType.String, row.attributes.warehouseName.String, row.attributes.databaseName.String, row.attributes.warehouseSize.String)
  115. s.mb.RecordSnowflakeDatabaseBytesScannedAvgDataPoint(t, row.avgBytesScanned, row.attributes.schemaName.String, row.attributes.executionStatus.String, row.attributes.errorMessage.String, row.attributes.queryType.String, row.attributes.warehouseName.String, row.attributes.databaseName.String, row.attributes.warehouseSize.String)
  116. s.mb.RecordSnowflakeQueryBytesDeletedAvgDataPoint(t, row.avgBytesDeleted, row.attributes.schemaName.String, row.attributes.executionStatus.String, row.attributes.errorMessage.String, row.attributes.queryType.String, row.attributes.warehouseName.String, row.attributes.databaseName.String, row.attributes.warehouseSize.String)
  117. s.mb.RecordSnowflakeQueryBytesSpilledRemoteAvgDataPoint(t, row.avgBytesSpilledRemote, row.attributes.schemaName.String, row.attributes.executionStatus.String, row.attributes.errorMessage.String, row.attributes.queryType.String, row.attributes.warehouseName.String, row.attributes.databaseName.String, row.attributes.warehouseSize.String)
  118. s.mb.RecordSnowflakeQueryBytesSpilledLocalAvgDataPoint(t, row.avgBytesSpilledLocal, row.attributes.schemaName.String, row.attributes.executionStatus.String, row.attributes.errorMessage.String, row.attributes.queryType.String, row.attributes.warehouseName.String, row.attributes.databaseName.String, row.attributes.warehouseSize.String)
  119. s.mb.RecordSnowflakeQueryBytesWrittenAvgDataPoint(t, row.avgBytesWritten, row.attributes.schemaName.String, row.attributes.executionStatus.String, row.attributes.errorMessage.String, row.attributes.queryType.String, row.attributes.warehouseName.String, row.attributes.databaseName.String, row.attributes.warehouseSize.String)
  120. s.mb.RecordSnowflakeQueryCompilationTimeAvgDataPoint(t, row.avgCompilationTime, row.attributes.schemaName.String, row.attributes.executionStatus.String, row.attributes.errorMessage.String, row.attributes.queryType.String, row.attributes.warehouseName.String, row.attributes.databaseName.String, row.attributes.warehouseSize.String)
  121. s.mb.RecordSnowflakeQueryDataScannedCacheAvgDataPoint(t, row.avgDataScannedCache, row.attributes.schemaName.String, row.attributes.executionStatus.String, row.attributes.errorMessage.String, row.attributes.queryType.String, row.attributes.warehouseName.String, row.attributes.databaseName.String, row.attributes.warehouseSize.String)
  122. s.mb.RecordSnowflakeQueryExecutionTimeAvgDataPoint(t, row.avgExecutionTime, row.attributes.schemaName.String, row.attributes.executionStatus.String, row.attributes.errorMessage.String, row.attributes.queryType.String, row.attributes.warehouseName.String, row.attributes.databaseName.String, row.attributes.warehouseSize.String)
  123. s.mb.RecordSnowflakeQueryPartitionsScannedAvgDataPoint(t, row.avgPartitionsScanned, row.attributes.schemaName.String, row.attributes.executionStatus.String, row.attributes.errorMessage.String, row.attributes.queryType.String, row.attributes.warehouseName.String, row.attributes.databaseName.String, row.attributes.warehouseSize.String)
  124. s.mb.RecordSnowflakeQueuedOverloadTimeAvgDataPoint(t, row.avgQueuedOverloadTime, row.attributes.schemaName.String, row.attributes.executionStatus.String, row.attributes.errorMessage.String, row.attributes.queryType.String, row.attributes.warehouseName.String, row.attributes.databaseName.String, row.attributes.warehouseSize.String)
  125. s.mb.RecordSnowflakeQueuedProvisioningTimeAvgDataPoint(t, row.avgQueuedProvisioningTime, row.attributes.schemaName.String, row.attributes.executionStatus.String, row.attributes.errorMessage.String, row.attributes.queryType.String, row.attributes.warehouseName.String, row.attributes.databaseName.String, row.attributes.warehouseSize.String)
  126. s.mb.RecordSnowflakeQueuedRepairTimeAvgDataPoint(t, row.avgQueuedRepairTime, row.attributes.schemaName.String, row.attributes.executionStatus.String, row.attributes.errorMessage.String, row.attributes.queryType.String, row.attributes.warehouseName.String, row.attributes.databaseName.String, row.attributes.warehouseSize.String)
  127. s.mb.RecordSnowflakeRowsInsertedAvgDataPoint(t, row.avgRowsInserted, row.attributes.schemaName.String, row.attributes.executionStatus.String, row.attributes.errorMessage.String, row.attributes.queryType.String, row.attributes.warehouseName.String, row.attributes.databaseName.String, row.attributes.warehouseSize.String)
  128. s.mb.RecordSnowflakeRowsDeletedAvgDataPoint(t, row.avgRowsDeleted, row.attributes.schemaName.String, row.attributes.executionStatus.String, row.attributes.errorMessage.String, row.attributes.queryType.String, row.attributes.warehouseName.String, row.attributes.databaseName.String, row.attributes.warehouseSize.String)
  129. s.mb.RecordSnowflakeRowsProducedAvgDataPoint(t, row.avgRowsProduced, row.attributes.schemaName.String, row.attributes.executionStatus.String, row.attributes.errorMessage.String, row.attributes.queryType.String, row.attributes.warehouseName.String, row.attributes.databaseName.String, row.attributes.warehouseSize.String)
  130. s.mb.RecordSnowflakeRowsUnloadedAvgDataPoint(t, row.avgRowsUnloaded, row.attributes.schemaName.String, row.attributes.executionStatus.String, row.attributes.errorMessage.String, row.attributes.queryType.String, row.attributes.warehouseName.String, row.attributes.databaseName.String, row.attributes.warehouseSize.String)
  131. s.mb.RecordSnowflakeRowsUpdatedAvgDataPoint(t, row.avgRowsUpdated, row.attributes.schemaName.String, row.attributes.executionStatus.String, row.attributes.errorMessage.String, row.attributes.queryType.String, row.attributes.warehouseName.String, row.attributes.databaseName.String, row.attributes.warehouseSize.String)
  132. s.mb.RecordSnowflakeTotalElapsedTimeAvgDataPoint(t, row.avgTotalElapsedTime, row.attributes.schemaName.String, row.attributes.executionStatus.String, row.attributes.errorMessage.String, row.attributes.queryType.String, row.attributes.warehouseName.String, row.attributes.databaseName.String, row.attributes.warehouseSize.String)
  133. }
  134. }
  135. func (s *snowflakeMetricsScraper) scrapeSessionMetrics(ctx context.Context, t pcommon.Timestamp, errs scrapererror.ScrapeErrors) {
  136. sessionMetrics, err := s.client.FetchSessionMetrics(ctx)
  137. if err != nil {
  138. errs.Add(err)
  139. return
  140. }
  141. for _, row := range *sessionMetrics {
  142. s.mb.RecordSnowflakeSessionIDCountDataPoint(t, row.distinctSessionID, row.userName.String)
  143. }
  144. }
  145. func (s *snowflakeMetricsScraper) scrapeSnowpipeMetrics(ctx context.Context, t pcommon.Timestamp, errs scrapererror.ScrapeErrors) {
  146. snowpipeMetrics, err := s.client.FetchSnowpipeMetrics(ctx)
  147. if err != nil {
  148. errs.Add(err)
  149. return
  150. }
  151. for _, row := range *snowpipeMetrics {
  152. s.mb.RecordSnowflakePipeCreditsUsedTotalDataPoint(t, row.creditsUsed, row.pipeName.String)
  153. }
  154. }
  155. func (s *snowflakeMetricsScraper) scrapeStorageMetrics(ctx context.Context, t pcommon.Timestamp, errs scrapererror.ScrapeErrors) {
  156. storageMetrics, err := s.client.FetchStorageMetrics(ctx)
  157. if err != nil {
  158. errs.Add(err)
  159. return
  160. }
  161. for _, row := range *storageMetrics {
  162. s.mb.RecordSnowflakeStorageStorageBytesTotalDataPoint(t, row.storageBytes)
  163. s.mb.RecordSnowflakeStorageStageBytesTotalDataPoint(t, row.stageBytes)
  164. s.mb.RecordSnowflakeStorageFailsafeBytesTotalDataPoint(t, row.failsafeBytes)
  165. }
  166. }