receiver.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package googlecloudspannerreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudspannerreceiver"
  4. import (
  5. "context"
  6. _ "embed"
  7. "fmt"
  8. "go.opentelemetry.io/collector/component"
  9. "go.opentelemetry.io/collector/pdata/pmetric"
  10. "go.opentelemetry.io/collector/receiver"
  11. "go.uber.org/zap"
  12. "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudspannerreceiver/internal/datasource"
  13. "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudspannerreceiver/internal/filterfactory"
  14. "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudspannerreceiver/internal/metadata"
  15. "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudspannerreceiver/internal/metadataparser"
  16. "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudspannerreceiver/internal/statsreader"
  17. )
  18. //go:embed "internal/metadataconfig/metrics.yaml"
  19. var metadataYaml []byte
  20. var _ receiver.Metrics = (*googleCloudSpannerReceiver)(nil)
  21. type googleCloudSpannerReceiver struct {
  22. logger *zap.Logger
  23. config *Config
  24. cancel context.CancelFunc
  25. projectReaders []statsreader.CompositeReader
  26. metricsBuilder metadata.MetricsBuilder
  27. }
  28. func newGoogleCloudSpannerReceiver(logger *zap.Logger, config *Config) *googleCloudSpannerReceiver {
  29. return &googleCloudSpannerReceiver{
  30. logger: logger,
  31. config: config,
  32. }
  33. }
  34. func (r *googleCloudSpannerReceiver) Scrape(ctx context.Context) (pmetric.Metrics, error) {
  35. var allMetricsDataPoints []*metadata.MetricsDataPoint
  36. for _, projectReader := range r.projectReaders {
  37. dataPoints, err := projectReader.Read(ctx)
  38. if err != nil {
  39. return pmetric.Metrics{}, err
  40. }
  41. allMetricsDataPoints = append(allMetricsDataPoints, dataPoints...)
  42. }
  43. return r.metricsBuilder.Build(allMetricsDataPoints)
  44. }
  45. func (r *googleCloudSpannerReceiver) Start(ctx context.Context, _ component.Host) error {
  46. ctx, r.cancel = context.WithCancel(ctx)
  47. err := r.initialize(ctx)
  48. if err != nil {
  49. return err
  50. }
  51. return nil
  52. }
  53. func (r *googleCloudSpannerReceiver) Shutdown(context.Context) error {
  54. for _, projectReader := range r.projectReaders {
  55. projectReader.Shutdown()
  56. }
  57. if r.metricsBuilder == nil {
  58. return nil
  59. }
  60. err := r.metricsBuilder.Shutdown()
  61. if err != nil {
  62. return err
  63. }
  64. r.cancel()
  65. return nil
  66. }
  67. func (r *googleCloudSpannerReceiver) initialize(ctx context.Context) error {
  68. parsedMetadata, err := metadataparser.ParseMetadataConfig(metadataYaml)
  69. if err != nil {
  70. return fmt.Errorf("error occurred during parsing of metadata: %w", err)
  71. }
  72. err = r.initializeProjectReaders(ctx, parsedMetadata)
  73. if err != nil {
  74. return err
  75. }
  76. return r.initializeMetricsBuilder(parsedMetadata)
  77. }
  78. func (r *googleCloudSpannerReceiver) initializeProjectReaders(ctx context.Context,
  79. parsedMetadata []*metadata.MetricsMetadata) error {
  80. readerConfig := statsreader.ReaderConfig{
  81. BackfillEnabled: r.config.BackfillEnabled,
  82. TopMetricsQueryMaxRows: r.config.TopMetricsQueryMaxRows,
  83. HideTopnLockstatsRowrangestartkey: r.config.HideTopnLockstatsRowrangestartkey,
  84. TruncateText: r.config.TruncateText,
  85. }
  86. for _, project := range r.config.Projects {
  87. projectReader, err := newProjectReader(ctx, r.logger, project, parsedMetadata, readerConfig)
  88. if err != nil {
  89. return err
  90. }
  91. r.projectReaders = append(r.projectReaders, projectReader)
  92. }
  93. return nil
  94. }
  95. func (r *googleCloudSpannerReceiver) initializeMetricsBuilder(parsedMetadata []*metadata.MetricsMetadata) error {
  96. r.logger.Debug("Constructing metrics builder")
  97. projectAmount := len(r.config.Projects)
  98. instanceAmount := 0
  99. databaseAmount := 0
  100. for _, project := range r.config.Projects {
  101. instanceAmount += len(project.Instances)
  102. for _, instance := range project.Instances {
  103. databaseAmount += len(instance.Databases)
  104. }
  105. }
  106. factoryConfig := &filterfactory.ItemFilterFactoryConfig{
  107. MetadataItems: parsedMetadata,
  108. TotalLimit: r.config.CardinalityTotalLimit,
  109. ProjectAmount: projectAmount,
  110. InstanceAmount: instanceAmount,
  111. DatabaseAmount: databaseAmount,
  112. }
  113. itemFilterResolver, err := filterfactory.NewItemFilterResolver(r.logger, factoryConfig)
  114. if err != nil {
  115. return err
  116. }
  117. r.metricsBuilder = metadata.NewMetricsFromDataPointBuilder(itemFilterResolver)
  118. return nil
  119. }
  120. func newProjectReader(ctx context.Context, logger *zap.Logger, project Project, parsedMetadata []*metadata.MetricsMetadata,
  121. readerConfig statsreader.ReaderConfig) (*statsreader.ProjectReader, error) {
  122. logger.Debug("Constructing project reader for project", zap.String("project id", project.ID))
  123. databaseReadersCount := 0
  124. for _, instance := range project.Instances {
  125. databaseReadersCount += len(instance.Databases)
  126. }
  127. databaseReaders := make([]statsreader.CompositeReader, databaseReadersCount)
  128. databaseReaderIndex := 0
  129. for _, instance := range project.Instances {
  130. for _, database := range instance.Databases {
  131. logger.Debug("Constructing database reader for combination of project, instance, database",
  132. zap.String("project id", project.ID), zap.String("instance id", instance.ID), zap.String("database", database))
  133. databaseID := datasource.NewDatabaseID(project.ID, instance.ID, database)
  134. databaseReader, err := statsreader.NewDatabaseReader(ctx, parsedMetadata, databaseID,
  135. project.ServiceAccountKey, readerConfig, logger)
  136. if err != nil {
  137. return nil, err
  138. }
  139. databaseReaders[databaseReaderIndex] = databaseReader
  140. databaseReaderIndex++
  141. }
  142. }
  143. return statsreader.NewProjectReader(databaseReaders, logger), nil
  144. }