metrics_receiver.go

// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package prometheusreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver"

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"os"
	"regexp"
	"sync"
	"time"

	"github.com/go-kit/log"
	"github.com/mitchellh/hashstructure/v2"
	commonconfig "github.com/prometheus/common/config"
	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/config"
	"github.com/prometheus/prometheus/discovery"
	promHTTP "github.com/prometheus/prometheus/discovery/http"
	"github.com/prometheus/prometheus/scrape"
	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/consumer"
	"go.opentelemetry.io/collector/receiver"
	"go.uber.org/zap"
	"gopkg.in/yaml.v2"

	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver/internal"
)

const (
	defaultGCInterval = 2 * time.Minute
	gcIntervalDelta   = 1 * time.Minute
)

// pReceiver is the type that provides Prometheus scraper/receiver functionality.
type pReceiver struct {
	cfg                 *Config
	consumer            consumer.Metrics
	cancelFunc          context.CancelFunc
	targetAllocatorStop chan struct{}
	configLoaded        chan struct{}
	loadConfigOnce      sync.Once

	settings         receiver.CreateSettings
	scrapeManager    *scrape.Manager
	discoveryManager *discovery.Manager
}

// newPrometheusReceiver creates a new prometheus.Receiver reference.
func newPrometheusReceiver(set receiver.CreateSettings, cfg *Config, next consumer.Metrics) *pReceiver {
	pr := &pReceiver{
		cfg:                 cfg,
		consumer:            next,
		settings:            set,
		configLoaded:        make(chan struct{}),
		targetAllocatorStop: make(chan struct{}),
	}
	return pr
}

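// A minimal lifecycle sketch (illustrative only; in the collector the
// receiver factory wires this up, and set/cfg/next/ctx/host come from the
// runtime rather than being constructed by hand):
//
//	recv := newPrometheusReceiver(set, cfg, next)
//	if err := recv.Start(ctx, host); err != nil {
//		// handle startup failure
//	}
//	defer func() { _ = recv.Shutdown(ctx) }()
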
// Start is the method that starts Prometheus scraping. It relies on the
// receiver configuration previously supplied to newPrometheusReceiver.
func (r *pReceiver) Start(_ context.Context, host component.Host) error {
	discoveryCtx, cancel := context.WithCancel(context.Background())
	r.cancelFunc = cancel

	logger := internal.NewZapToGokitLogAdapter(r.settings.Logger)

	// add scrape configs defined by the collector configs
	baseCfg := r.cfg.PrometheusConfig

	err := r.initPrometheusComponents(discoveryCtx, host, logger)
	if err != nil {
		r.settings.Logger.Error("Failed to initialize Prometheus components", zap.Error(err))
		return err
	}

	err = r.applyCfg(baseCfg)
	if err != nil {
		r.settings.Logger.Error("Failed to apply new scrape configuration", zap.Error(err))
		return err
	}

	allocConf := r.cfg.TargetAllocator
	if allocConf != nil {
		err = r.startTargetAllocator(allocConf, baseCfg)
		if err != nil {
			return err
		}
	}

	r.loadConfigOnce.Do(func() {
		close(r.configLoaded)
	})

	return nil
}

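// startTargetAllocator performs one immediate sync against the target
// allocator, then launches a goroutine that re-syncs on every tick of
// allocConf.Interval until targetAllocatorStop is closed.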
func (r *pReceiver) startTargetAllocator(allocConf *targetAllocator, baseCfg *config.Config) error {
	r.settings.Logger.Info("Starting target allocator discovery")
	// immediately sync jobs, not waiting for the first tick
	savedHash, err := r.syncTargetAllocator(uint64(0), allocConf, baseCfg)
	if err != nil {
		return err
	}
	go func() {
		targetAllocatorIntervalTicker := time.NewTicker(allocConf.Interval)
		for {
			select {
			case <-targetAllocatorIntervalTicker.C:
				hash, newErr := r.syncTargetAllocator(savedHash, allocConf, baseCfg)
				if newErr != nil {
					r.settings.Logger.Error(newErr.Error())
					continue
				}
				savedHash = hash
			case <-r.targetAllocatorStop:
				targetAllocatorIntervalTicker.Stop()
				r.settings.Logger.Info("Stopping target allocator")
				return
			}
		}
	}()
	return nil
}

// syncTargetAllocator requests jobs from the target allocator and updates the
// underlying receiver if the response does not match the provided compareHash.
// baseCfg can be used to provide additional ScrapeConfigs, which will be added
// to the retrieved jobs.
func (r *pReceiver) syncTargetAllocator(compareHash uint64, allocConf *targetAllocator, baseCfg *config.Config) (uint64, error) {
	r.settings.Logger.Debug("Syncing target allocator jobs")
	scrapeConfigsResponse, err := r.getScrapeConfigsResponse(allocConf.Endpoint)
	if err != nil {
		r.settings.Logger.Error("Failed to retrieve job list", zap.Error(err))
		return 0, err
	}

	hash, err := hashstructure.Hash(scrapeConfigsResponse, hashstructure.FormatV2, nil)
	if err != nil {
		r.settings.Logger.Error("Failed to hash job list", zap.Error(err))
		return 0, err
	}
	if hash == compareHash {
		// no update needed
		return hash, nil
	}

	// Clear out the current configurations
	baseCfg.ScrapeConfigs = []*config.ScrapeConfig{}

	for jobName, scrapeConfig := range scrapeConfigsResponse {
		var httpSD promHTTP.SDConfig
		if allocConf.HTTPSDConfig == nil {
			httpSD = promHTTP.SDConfig{
				RefreshInterval: model.Duration(30 * time.Second),
			}
		} else {
			httpSD = *allocConf.HTTPSDConfig
		}
		escapedJob := url.QueryEscape(jobName)
		httpSD.URL = fmt.Sprintf("%s/jobs/%s/targets?collector_id=%s", allocConf.Endpoint, escapedJob, allocConf.CollectorID)
		httpSD.HTTPClientConfig.FollowRedirects = false
		scrapeConfig.ServiceDiscoveryConfigs = discovery.Configs{
			&httpSD,
		}

		baseCfg.ScrapeConfigs = append(baseCfg.ScrapeConfigs, scrapeConfig)
	}

	err = r.applyCfg(baseCfg)
	if err != nil {
		r.settings.Logger.Error("Failed to apply new scrape configuration", zap.Error(err))
		return 0, err
	}

	return hash, nil
}

// instantiateShard replaces the $(SHARD) placeholder in the returned
// configuration with the SHARD environment variable, defaulting to "0"
// when the variable is unset.
func (r *pReceiver) instantiateShard(body []byte) []byte {
	shard, ok := os.LookupEnv("SHARD")
	if !ok {
		shard = "0"
	}
	return bytes.ReplaceAll(body, []byte("$(SHARD)"), []byte(shard))
}

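// getScrapeConfigsResponse fetches <baseURL>/scrape_configs from the target
// allocator and decodes it as a YAML map of job name to scrape config, e.g.
// (keys and values here are illustrative, not a fixed schema):
//
//	job-1:
//	  job_name: job-1
//	  scrape_interval: 30s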
func (r *pReceiver) getScrapeConfigsResponse(baseURL string) (map[string]*config.ScrapeConfig, error) {
	scrapeConfigsURL := fmt.Sprintf("%s/scrape_configs", baseURL)
	_, err := url.Parse(scrapeConfigsURL) // check if valid
	if err != nil {
		return nil, err
	}

	resp, err := http.Get(scrapeConfigsURL) //nolint
	if err != nil {
		return nil, err
	}
	// Deferring the close ensures the body is released on every return path,
	// including the error returns below.
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, err
	}

	jobToScrapeConfig := map[string]*config.ScrapeConfig{}
	envReplacedBody := r.instantiateShard(body)
	err = yaml.Unmarshal(envReplacedBody, &jobToScrapeConfig)
	if err != nil {
		return nil, err
	}
	return jobToScrapeConfig, nil
}

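// applyCfg pushes the scrape configuration to the scrape manager and derives
// the per-job service discovery configuration for the discovery manager.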
func (r *pReceiver) applyCfg(cfg *config.Config) error {
	if err := r.scrapeManager.ApplyConfig(cfg); err != nil {
		return err
	}

	discoveryCfg := make(map[string]discovery.Configs)
	for _, scrapeConfig := range cfg.ScrapeConfigs {
		discoveryCfg[scrapeConfig.JobName] = scrapeConfig.ServiceDiscoveryConfigs
		r.settings.Logger.Info("Scrape job added", zap.String("jobName", scrapeConfig.JobName))
	}
	return r.discoveryManager.ApplyConfig(discoveryCfg)
}

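// initPrometheusComponents builds the discovery manager, the appendable
// storage that feeds the metrics consumer, and the scrape manager, starting
// the manager run loops in background goroutines. The scrape manager blocks
// on configLoaded so it only runs once Start has applied the initial config.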
func (r *pReceiver) initPrometheusComponents(ctx context.Context, host component.Host, logger log.Logger) error {
	r.discoveryManager = discovery.NewManager(ctx, logger)

	go func() {
		r.settings.Logger.Info("Starting discovery manager")
		if err := r.discoveryManager.Run(); err != nil && !errors.Is(err, context.Canceled) {
			r.settings.Logger.Error("Discovery manager failed", zap.Error(err))
			host.ReportFatalError(err)
		}
	}()

	var startTimeMetricRegex *regexp.Regexp
	if r.cfg.StartTimeMetricRegex != "" {
		var err error
		startTimeMetricRegex, err = regexp.Compile(r.cfg.StartTimeMetricRegex)
		if err != nil {
			return err
		}
	}

	store, err := internal.NewAppendable(
		r.consumer,
		r.settings,
		gcInterval(r.cfg.PrometheusConfig),
		r.cfg.UseStartTimeMetric,
		startTimeMetricRegex,
		useCreatedMetricGate.IsEnabled(),
		r.cfg.PrometheusConfig.GlobalConfig.ExternalLabels,
		r.cfg.TrimMetricSuffixes,
	)
	if err != nil {
		return err
	}
	r.scrapeManager = scrape.NewManager(&scrape.Options{
		PassMetadataInContext:     true,
		EnableProtobufNegotiation: r.cfg.EnableProtobufNegotiation,
		ExtraMetrics:              r.cfg.ReportExtraScrapeMetrics,
		HTTPClientOptions: []commonconfig.HTTPClientOption{
			commonconfig.WithUserAgent(r.settings.BuildInfo.Command + "/" + r.settings.BuildInfo.Version),
		},
	}, logger, store)

	go func() {
		// The scrape manager needs to wait for the configuration to be loaded before beginning
		<-r.configLoaded
		r.settings.Logger.Info("Starting scrape manager")
		if err := r.scrapeManager.Run(r.discoveryManager.SyncCh()); err != nil {
			r.settings.Logger.Error("Scrape manager failed", zap.Error(err))
			host.ReportFatalError(err)
		}
	}()
	return nil
}

// gcInterval returns the longest scrape interval used by a scrape config,
// plus a delta to prevent race conditions.
// This ensures jobs are not garbage collected between scrapes.
func gcInterval(cfg *config.Config) time.Duration {
	gcInterval := defaultGCInterval
	if time.Duration(cfg.GlobalConfig.ScrapeInterval)+gcIntervalDelta > gcInterval {
		gcInterval = time.Duration(cfg.GlobalConfig.ScrapeInterval) + gcIntervalDelta
	}
	for _, scrapeConfig := range cfg.ScrapeConfigs {
		if time.Duration(scrapeConfig.ScrapeInterval)+gcIntervalDelta > gcInterval {
			gcInterval = time.Duration(scrapeConfig.ScrapeInterval) + gcIntervalDelta
		}
	}
	return gcInterval
}

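// Worked example: with Prometheus's default global scrape_interval of 1m,
// 1m + gcIntervalDelta (1m) does not exceed defaultGCInterval (2m), so the
// result stays 2m; a job scraping every 5m raises the result to 6m.
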
// Shutdown stops and cancels the underlying Prometheus scrapers.
func (r *pReceiver) Shutdown(context.Context) error {
	if r.cancelFunc != nil {
		r.cancelFunc()
	}
	if r.scrapeManager != nil {
		r.scrapeManager.Stop()
	}
	close(r.targetAllocatorStop)
	return nil
}