staleness_end_to_end_test.go

// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package internal_test

import (
	"context"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"net/url"
	"os"
	"strings"
	"sync/atomic"
	"testing"
	"time"

	"github.com/gogo/protobuf/proto"
	"github.com/golang/snappy"
	"github.com/prometheus/prometheus/model/value"
	"github.com/prometheus/prometheus/prompb"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/confmap"
	"go.opentelemetry.io/collector/confmap/provider/fileprovider"
	"go.opentelemetry.io/collector/exporter"
	"go.opentelemetry.io/collector/otelcol"
	"go.opentelemetry.io/collector/processor"
	"go.opentelemetry.io/collector/processor/batchprocessor"
	"go.opentelemetry.io/collector/receiver"
	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"

	"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/prometheusremotewriteexporter"
	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver"
)
// Test that staleness markers are emitted for timeseries that intermittently disappear.
// This test runs the entire collector end-to-end, scrapes a target whose series come and
// go between scrapes, and then checks via the Prometheus remote write exporter that
// staleness markers are emitted per timeseries.
// See https://github.com/open-telemetry/opentelemetry-collector/issues/3413
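//
// Background: a Prometheus staleness marker is a sample carrying a special NaN bit pattern
// (value.StaleNaN from prometheus/model/value). The scrape loop appends one for a series that
// was present in a previous scrape but is missing from the current one; value.IsStaleNaN checks
// for that exact bit pattern, since an ordinary NaN comparison cannot distinguish it.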
func TestStalenessMarkersEndToEnd(t *testing.T) {
	if testing.Short() {
		t.Skip("This test can take a long time")
	}

	ctx, cancel := context.WithCancel(context.Background())

	// 1. Set up the server that sends series that intermittently appear and disappear.
	n := &atomic.Uint64{}
	scrapeServer := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
		// Increment the scrape count atomically per scrape.
		i := n.Add(1)

		select {
		case <-ctx.Done():
			return
		default:
		}

		// Alternate metrics per scrape so that every one of
		// them will be reported as stale.
		if i%2 == 0 {
			fmt.Fprintf(rw, `
# HELP jvm_memory_bytes_used Used bytes of a given JVM memory area.
# TYPE jvm_memory_bytes_used gauge
jvm_memory_bytes_used{area="heap"} %.1f`, float64(i))
		} else {
			fmt.Fprintf(rw, `
# HELP jvm_memory_pool_bytes_used Used bytes of a given JVM memory pool.
# TYPE jvm_memory_pool_bytes_used gauge
jvm_memory_pool_bytes_used{pool="CodeHeap 'non-nmethods'"} %.1f`, float64(i))
		}
	}))
	defer scrapeServer.Close()
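
	// Because each scrape exposes only one of the two jvm_* series, the series from the previous
	// scrape goes missing on the next one, which is exactly the condition that triggers a
	// staleness marker for it.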
	serverURL, err := url.Parse(scrapeServer.URL)
	require.NoError(t, err)

	// 2. Set up the Prometheus RemoteWrite endpoint.
	prweUploads := make(chan *prompb.WriteRequest)
	prweServer := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
		// Snappy decode the uploads.
		payload, rerr := io.ReadAll(req.Body)
		require.NoError(t, rerr)

		recv := make([]byte, len(payload))
		decoded, derr := snappy.Decode(recv, payload)
		require.NoError(t, derr)

		writeReq := new(prompb.WriteRequest)
		require.NoError(t, proto.Unmarshal(decoded, writeReq))

		select {
		case <-ctx.Done():
			return
		case prweUploads <- writeReq:
		}
	}))
	defer prweServer.Close()
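
	// Prometheus remote write bodies are snappy-compressed, protobuf-encoded prompb.WriteRequest
	// messages, hence the snappy.Decode followed by proto.Unmarshal above. Each decoded request
	// is handed to the test over the prweUploads channel for later inspection.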
	// 3. Set up the OpenTelemetry Prometheus receiver.
	cfg := fmt.Sprintf(`
receivers:
  prometheus:
    config:
      scrape_configs:
        - job_name: 'test'
          scrape_interval: 100ms
          static_configs:
            - targets: [%q]

processors:
  batch:

exporters:
  prometheusremotewrite:
    endpoint: %q
    tls:
      insecure: true

service:
  pipelines:
    metrics:
      receivers: [prometheus]
      processors: [batch]
      exporters: [prometheusremotewrite]`, serverURL.Host, prweServer.URL)
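
	// The two %q verbs are filled with the flapping scrape target's host and the test remote write
	// endpoint. A 100ms scrape_interval keeps the end-to-end run short while still producing enough
	// scrape cycles for stale markers to appear.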
	confFile, err := os.CreateTemp(os.TempDir(), "conf-")
	require.Nil(t, err)
	defer os.Remove(confFile.Name())
	_, err = confFile.Write([]byte(cfg))
	require.Nil(t, err)

	// 4. Run the OpenTelemetry Collector.
	receivers, err := receiver.MakeFactoryMap(prometheusreceiver.NewFactory())
	require.Nil(t, err)
	exporters, err := exporter.MakeFactoryMap(prometheusremotewriteexporter.NewFactory())
	require.Nil(t, err)
	processors, err := processor.MakeFactoryMap(batchprocessor.NewFactory())
	require.Nil(t, err)

	factories := otelcol.Factories{
		Receivers:  receivers,
		Exporters:  exporters,
		Processors: processors,
	}

	fmp := fileprovider.New()
	configProvider, err := otelcol.NewConfigProvider(
		otelcol.ConfigProviderSettings{
			ResolverSettings: confmap.ResolverSettings{
				URIs:      []string{confFile.Name()},
				Providers: map[string]confmap.Provider{fmp.Scheme(): fmp},
			},
		})
	require.NoError(t, err)

	appSettings := otelcol.CollectorSettings{
		Factories:      func() (otelcol.Factories, error) { return factories, nil },
		ConfigProvider: configProvider,
		BuildInfo: component.BuildInfo{
			Command:     "otelcol",
			Description: "OpenTelemetry Collector",
			Version:     "tests",
		},
		LoggingOptions: []zap.Option{
			// Turn off the verbose logging from the collector.
			zap.WrapCore(func(zapcore.Core) zapcore.Core {
				return zapcore.NewNopCore()
			}),
		},
	}

	app, err := otelcol.NewCollector(appSettings)
	require.Nil(t, err)
	go func() {
		assert.NoError(t, app.Run(context.Background()))
	}()
	defer app.Shutdown()
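
	// app.Run blocks until the collector is shut down, so it runs in its own goroutine and the
	// test polls app.GetState below to know when the pipeline is actually up.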
	// Wait until the collector has actually started.
	for notYetStarted := true; notYetStarted; {
		state := app.GetState()
		switch state {
		case otelcol.StateRunning, otelcol.StateClosed, otelcol.StateClosing:
			notYetStarted = false
		case otelcol.StateStarting:
		}
		time.Sleep(10 * time.Millisecond)
	}

	// 5. Let's wait on 10 fetches.
	var wReqL []*prompb.WriteRequest
	for i := 0; i < 10; i++ {
		wReqL = append(wReqL, <-prweUploads)
	}
	defer cancel()
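
	// With a 100ms scrape interval plus the batch processor's flush delay, collecting ten remote
	// write requests typically takes on the order of a few seconds.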
	// 6. Assert that we encounter the stale markers, i.e. the special NaNs, for the various time series.
	staleMarkerCount := 0
	totalSamples := 0
	require.True(t, len(wReqL) > 0, "Expecting at least one WriteRequest")
	for i, wReq := range wReqL {
		name := fmt.Sprintf("WriteRequest#%d", i)
		require.True(t, len(wReq.Timeseries) > 0, "Expecting at least 1 timeseries for: "+name)
		for j, ts := range wReq.Timeseries {
			fullName := fmt.Sprintf("%s/TimeSeries#%d", name, j)
			assert.True(t, len(ts.Samples) > 0, "Expected at least 1 Sample in: "+fullName)

			// We strictly count the series directly included in the scrapes, not internal
			// timeseries such as "up" or "scrape_duration_seconds".
			metricName := ""
			for _, label := range ts.Labels {
				if label.Name == "__name__" {
					metricName = label.Value
				}
			}
			if !strings.HasPrefix(metricName, "jvm") {
				continue
			}

			for _, sample := range ts.Samples {
				totalSamples++
				if value.IsStaleNaN(sample.Value) {
					staleMarkerCount++
				}
			}
		}
	}
	require.True(t, totalSamples > 0, "Expected at least 1 sample")

	// On every alternate scrape, the series from the prior scrape is reported as stale.
	// Expect at least:
	// * the first scrape NOT to return stale markers, and
	// * ((N-1) / alternations) = ((10-1) / 2) = ~40% of samples being stale markers.
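	// Rough arithmetic behind that threshold: with the two series alternating, every scrape after
	// the first contributes one fresh jvm sample plus one stale marker for the series that just
	// vanished, while the first scrape contributes only a fresh sample. Over roughly 10 scrapes
	// that is about 9 stale markers out of about 19 jvm samples, i.e. just under 50%, so requiring
	// a ratio of at least 0.4 leaves some slack for batching and timing effects.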
	chance := float64(staleMarkerCount) / float64(totalSamples)
	require.True(t, chance >= 0.4, fmt.Sprintf("Expected at least 40%% of samples to be stale markers, got %.3f", chance))
}