receivers_test.go 16 KB


  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. // Skip tests on Windows temporarily, see https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/11451
  4. //go:build !windows
  5. // +build !windows
  6. package main
  7. import (
  8. "context"
  9. "errors"
  10. "path/filepath"
  11. "runtime"
  12. "testing"
  13. promconfig "github.com/prometheus/prometheus/config"
  14. "github.com/stretchr/testify/assert"
  15. "github.com/stretchr/testify/require"
  16. "go.opentelemetry.io/collector/component"
  17. "go.opentelemetry.io/collector/consumer/consumertest"
  18. "go.opentelemetry.io/collector/receiver"
  19. "go.opentelemetry.io/collector/receiver/receivertest"
  20. tcpop "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/input/tcp"
  21. "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscloudwatchreceiver"
  22. "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/azureblobreceiver"
  23. "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/azureeventhubreceiver"
  24. "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/azuremonitorreceiver"
  25. "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver"
  26. "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/chronyreceiver"
  27. "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogreceiver"
  28. "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/filelogreceiver"
  29. "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/jmxreceiver"
  30. "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/mongodbatlasreceiver"
  31. "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otlpjsonfilereceiver"
  32. "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver"
  33. "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/redisreceiver"
  34. "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/snmpreceiver"
  35. "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/syslogreceiver"
  36. "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/tcplogreceiver"
  37. "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/udplogreceiver"
  38. "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/webhookeventreceiver"
  39. )
  40. func TestDefaultReceivers(t *testing.T) {
  41. allFactories, err := components()
  42. assert.NoError(t, err)
  43. rcvrFactories := allFactories.Receivers
  44. tests := []struct {
  45. getConfigFn getReceiverConfigFn
  46. receiver component.Type
  47. skipLifecyle bool
  48. }{
  49. {
  50. receiver: "active_directory_ds",
  51. skipLifecyle: true, // Requires a running windows service
  52. },
  53. {
  54. receiver: "aerospike",
  55. },
  56. {
  57. receiver: "apache",
  58. },
  59. {
  60. receiver: "apachespark",
  61. },
  62. {
  63. receiver: "awscloudwatch",
  64. getConfigFn: func() component.Config {
  65. cfg := rcvrFactories["awscloudwatch"].CreateDefaultConfig().(*awscloudwatchreceiver.Config)
  66. cfg.Region = "us-west-2"
  67. cfg.Logs.Groups = awscloudwatchreceiver.GroupConfig{AutodiscoverConfig: nil}
  68. return cfg
  69. },
  70. },
  71. {
  72. receiver: "awscontainerinsightreceiver",
  73. // TODO: skipped since it will only function in a container environment with procfs in expected location.
  74. skipLifecyle: true,
  75. },
  76. {
  77. receiver: "awsecscontainermetrics",
  78. skipLifecyle: true, // Requires container metaendpoint to be running
  79. },
  80. {
  81. receiver: "awsfirehose",
  82. },
  83. {
  84. receiver: "awsxray",
  85. skipLifecyle: true, // Requires AWS endpoint to check identity to run
  86. },
  87. {
  88. receiver: "azureblob",
  89. getConfigFn: func() component.Config {
  90. cfg := rcvrFactories["azureblob"].CreateDefaultConfig().(*azureblobreceiver.Config)
  91. cfg.ConnectionString = "DefaultEndpointsProtocol=http;AccountName=accountName;AccountKey=accountKey==;BlobEndpoint=test"
  92. cfg.EventHub.EndPoint = "DefaultEndpointsProtocol=http;SharedAccessKeyName=secret;SharedAccessKey=secret;Endpoint=test.test"
  93. return cfg
  94. },
  95. skipLifecyle: true, // Requires Azure event hub to run
  96. },
  97. {
  98. receiver: "azureeventhub",
  99. getConfigFn: func() component.Config {
  100. cfg := rcvrFactories["azureeventhub"].CreateDefaultConfig().(*azureeventhubreceiver.Config)
  101. cfg.Connection = "Endpoint=sb://example.com/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=superSecret1234=;EntityPath=hubName"
  102. return cfg
  103. },
  104. skipLifecyle: true, // Requires Azure event hub to run
  105. },
  106. {
  107. receiver: "azuremonitor",
  108. getConfigFn: func() component.Config {
  109. cfg := rcvrFactories["azuremonitor"].CreateDefaultConfig().(*azuremonitorreceiver.Config)
  110. cfg.TenantID = "tenant_id"
  111. cfg.SubscriptionID = "subscription_id"
  112. cfg.ClientID = "client_id"
  113. cfg.ClientSecret = "client_secret"
  114. return cfg
  115. },
  116. skipLifecyle: true, // Requires Azure event hub to run
  117. },
  118. {
  119. receiver: "bigip",
  120. },
  121. {
  122. receiver: "carbon",
  123. getConfigFn: func() component.Config {
  124. cfg := rcvrFactories["carbon"].CreateDefaultConfig().(*carbonreceiver.Config)
  125. cfg.Endpoint = "0.0.0.0:0"
  126. return cfg
  127. },
  128. skipLifecyle: true, // Panics after test have completed, requires a wait group
  129. },
  130. {
  131. receiver: "cloudflare",
  132. skipLifecyle: true,
  133. },
  134. {
  135. receiver: "cloudfoundry",
  136. skipLifecyle: true, // Requires UAA (auth) endpoint to run
  137. },
  138. {
  139. receiver: "chrony",
  140. getConfigFn: func() component.Config {
  141. cfg := rcvrFactories["chrony"].CreateDefaultConfig().(*chronyreceiver.Config)
  142. cfg.Endpoint = "udp://localhost:323"
  143. return cfg
  144. },
  145. },
  146. {
  147. receiver: "collectd",
  148. },
  149. {
  150. receiver: "couchdb",
  151. },
  152. {
  153. receiver: "datadog",
  154. getConfigFn: func() component.Config {
  155. cfg := rcvrFactories["datadog"].CreateDefaultConfig().(*datadogreceiver.Config)
  156. cfg.Endpoint = "localhost:0" // Using a randomly assigned address
  157. return cfg
  158. },
  159. },
  160. {
  161. receiver: "docker_stats",
  162. skipLifecyle: true,
  163. },
  164. {
  165. receiver: "elasticsearch",
  166. },
  167. {
  168. receiver: "expvar",
  169. },
  170. {
  171. receiver: "filelog",
  172. getConfigFn: func() component.Config {
  173. cfg := rcvrFactories["filelog"].CreateDefaultConfig().(*filelogreceiver.FileLogConfig)
  174. cfg.InputConfig.Include = []string{filepath.Join(t.TempDir(), "*")}
  175. return cfg
  176. },
  177. },
  178. {
  179. receiver: "file",
  180. skipLifecyle: true, // Requires an existing JSONL file
  181. },
  182. {
  183. receiver: "filestats",
  184. },
  185. {
  186. receiver: "flinkmetrics",
  187. },
  188. {
  189. receiver: "fluentforward",
  190. },
  191. {
  192. receiver: "googlecloudspanner",
  193. },
  194. {
  195. receiver: "googlecloudpubsub",
  196. skipLifecyle: true, // Requires a pubsub subscription
  197. },
  198. {
  199. receiver: "haproxy",
  200. },
  201. {
  202. receiver: "hostmetrics",
  203. },
  204. {
  205. receiver: "httpcheck",
  206. },
  207. {
  208. receiver: "influxdb",
  209. },
  210. {
  211. receiver: "iis",
  212. skipLifecyle: true, // Requires a running windows process
  213. },
  214. {
  215. receiver: "jaeger",
  216. },
  217. {
  218. receiver: "jmx",
  219. skipLifecyle: true, // Requires a running instance with JMX
  220. getConfigFn: func() component.Config {
  221. cfg := jmxreceiver.NewFactory().CreateDefaultConfig().(*jmxreceiver.Config)
  222. cfg.Endpoint = "localhost:1234"
  223. cfg.TargetSystem = "jvm"
  224. return cfg
  225. },
  226. },
  227. {
  228. receiver: "journald",
  229. skipLifecyle: runtime.GOOS != "linux",
  230. },
  231. {
  232. receiver: "k8s_events",
  233. skipLifecyle: true, // need a valid Kubernetes host and port
  234. },
  235. {
  236. receiver: "k8sobjects",
  237. skipLifecyle: true, // need a valid Kubernetes host and port
  238. },
  239. {
  240. receiver: "kafka",
  241. skipLifecyle: true, // TODO: It needs access to internals to successful start.
  242. },
  243. {
  244. receiver: "kafkametrics",
  245. },
  246. {
  247. receiver: "k8s_cluster",
  248. skipLifecyle: true, // Requires access to the k8s host and port in order to run
  249. },
  250. {
  251. receiver: "kubeletstats",
  252. skipLifecyle: true, // Requires access to certificates to auth against kubelet
  253. },
  254. {
  255. receiver: "loki",
  256. },
  257. {
  258. receiver: "memcached",
  259. },
  260. {
  261. receiver: "mongodb",
  262. skipLifecyle: true, // Causes tests to timeout
  263. },
  264. {
  265. receiver: "mongodbatlas",
  266. getConfigFn: func() component.Config {
  267. cfg := rcvrFactories["mongodbatlas"].CreateDefaultConfig().(*mongodbatlasreceiver.Config)
  268. cfg.Logs.Enabled = true
  269. return cfg
  270. },
  271. },
  272. {
  273. receiver: "mysql",
  274. },
  275. {
  276. receiver: "nginx",
  277. },
  278. {
  279. receiver: "nsxt",
  280. },
  281. {
  282. receiver: "opencensus",
  283. skipLifecyle: true, // TODO: Usage of CMux doesn't allow proper shutdown.
  284. },
  285. {
  286. receiver: "oracledb",
  287. },
  288. {
  289. receiver: "otlp",
  290. },
  291. {
  292. receiver: "otlpjsonfile",
  293. getConfigFn: func() component.Config {
  294. cfg := rcvrFactories["otlpjsonfile"].CreateDefaultConfig().(*otlpjsonfilereceiver.Config)
  295. cfg.Include = []string{"/tmp/*.log"}
  296. return cfg
  297. },
  298. },
  299. {
  300. receiver: "podman_stats",
  301. skipLifecyle: true, // Requires a running podman daemon
  302. },
  303. {
  304. receiver: "postgresql",
  305. },
  306. {
  307. receiver: "prometheus",
  308. getConfigFn: func() component.Config {
  309. cfg := rcvrFactories["prometheus"].CreateDefaultConfig().(*prometheusreceiver.Config)
  310. cfg.PrometheusConfig = &promconfig.Config{
  311. ScrapeConfigs: []*promconfig.ScrapeConfig{
  312. {JobName: "test"},
  313. },
  314. }
  315. return cfg
  316. },
  317. },
  318. {
  319. receiver: "pulsar",
  320. skipLifecyle: true, // TODO It requires a running pulsar instance to start successfully.
  321. },
  322. {
  323. receiver: "rabbitmq",
  324. },
  325. {
  326. receiver: "purefa",
  327. },
  328. {
  329. receiver: "purefb",
  330. },
  331. {
  332. receiver: "receiver_creator",
  333. },
  334. {
  335. receiver: "redis",
  336. getConfigFn: func() component.Config {
  337. cfg := rcvrFactories["redis"].CreateDefaultConfig().(*redisreceiver.Config)
  338. cfg.Endpoint = "localhost:6379"
  339. return cfg
  340. },
  341. },
  342. {
  343. receiver: "riak",
  344. },
  345. {
  346. receiver: "sapm",
  347. },
  348. {
  349. receiver: "signalfx",
  350. },
  351. {
  352. receiver: "prometheus_simple",
  353. },
  354. {
  355. receiver: "skywalking",
  356. },
  357. {
  358. receiver: "snmp",
  359. getConfigFn: func() component.Config {
  360. cfg := rcvrFactories["snmp"].CreateDefaultConfig().(*snmpreceiver.Config)
  361. cfg.Metrics = map[string]*snmpreceiver.MetricConfig{
  362. "m1": {
  363. Unit: "1",
  364. Gauge: &snmpreceiver.GaugeMetric{ValueType: "int"},
  365. ScalarOIDs: []snmpreceiver.ScalarOID{{
  366. OID: ".1",
  367. }},
  368. },
  369. }
  370. return cfg
  371. },
  372. },
  373. {
  374. receiver: "snowflake",
  375. },
  376. {
  377. receiver: "splunk_hec",
  378. },
  379. {
  380. receiver: "sqlquery",
  381. },
  382. {
  383. receiver: "sqlserver",
  384. skipLifecyle: true, // Requires a running windows process
  385. },
  386. {
  387. receiver: "sshcheck",
  388. },
  389. {
  390. receiver: "statsd",
  391. },
  392. {
  393. receiver: "wavefront",
  394. skipLifecyle: true, // Depends on carbon receiver to be running correctly
  395. },
  396. {
  397. receiver: "webhookevent",
  398. getConfigFn: func() component.Config {
  399. cfg := rcvrFactories["webhookevent"].CreateDefaultConfig().(*webhookeventreceiver.Config)
  400. cfg.Endpoint = "127.0.0.1:8088"
  401. return cfg
  402. },
  403. },
  404. {
  405. receiver: "windowseventlog",
  406. skipLifecyle: true, // Requires a running windows process
  407. },
  408. {
  409. receiver: "windowsperfcounters",
  410. skipLifecyle: true, // Requires a running windows process
  411. },
  412. {
  413. receiver: "zipkin",
  414. },
  415. {
  416. receiver: "zookeeper",
  417. },
  418. {
  419. receiver: "syslog",
  420. getConfigFn: func() component.Config {
  421. cfg := rcvrFactories["syslog"].CreateDefaultConfig().(*syslogreceiver.SysLogConfig)
  422. cfg.InputConfig.TCP = &tcpop.NewConfig().BaseConfig
  423. cfg.InputConfig.TCP.ListenAddress = "0.0.0.0:0"
  424. cfg.InputConfig.Protocol = "rfc5424"
  425. return cfg
  426. },
  427. },
  428. {
  429. receiver: "tcplog",
  430. getConfigFn: func() component.Config {
  431. cfg := rcvrFactories["tcplog"].CreateDefaultConfig().(*tcplogreceiver.TCPLogConfig)
  432. cfg.InputConfig.ListenAddress = "0.0.0.0:0"
  433. return cfg
  434. },
  435. },
  436. {
  437. receiver: "udplog",
  438. getConfigFn: func() component.Config {
  439. cfg := rcvrFactories["udplog"].CreateDefaultConfig().(*udplogreceiver.UDPLogConfig)
  440. cfg.InputConfig.ListenAddress = "0.0.0.0:0"
  441. return cfg
  442. },
  443. },
  444. {
  445. receiver: "vcenter",
  446. },
  447. {
  448. receiver: "solace",
  449. skipLifecyle: true, // Requires a solace broker to connect to
  450. },
  451. }
  452. assert.Equal(t, len(rcvrFactories), len(tests), "All receivers must be added to the lifecycle suite")
  453. for _, tt := range tests {
  454. t.Run(string(tt.receiver), func(t *testing.T) {
  455. factory := rcvrFactories[tt.receiver]
  456. assert.Equal(t, tt.receiver, factory.Type())
  457. t.Run("shutdown", func(t *testing.T) {
  458. verifyReceiverShutdown(t, factory, tt.getConfigFn)
  459. })
  460. t.Run("lifecycle", func(t *testing.T) {
  461. if tt.skipLifecyle {
  462. t.SkipNow()
  463. }
  464. verifyReceiverLifecycle(t, factory, tt.getConfigFn)
  465. })
  466. })
  467. }
  468. }
// getReceiverConfigFn is used to customize the configuration passed to the
// verification helpers. It lets a test case change ports or provide values
// that are required but not supplied by the receiver's default configuration.
type getReceiverConfigFn func() component.Config
  473. // verifyReceiverLifecycle is used to test if a receiver type can handle the typical
  474. // lifecycle of a component. The getConfigFn parameter only need to be specified if
  475. // the test can't be done with the default configuration for the component.
  476. func verifyReceiverLifecycle(t *testing.T, factory receiver.Factory, getConfigFn getReceiverConfigFn) {
  477. ctx := context.Background()
  478. host := newAssertNoErrorHost(t)
  479. receiverCreateSet := receivertest.NewNopCreateSettings()
  480. if getConfigFn == nil {
  481. getConfigFn = factory.CreateDefaultConfig
  482. }
  483. createFns := []createReceiverFn{
  484. wrapCreateLogsRcvr(factory),
  485. wrapCreateTracesRcvr(factory),
  486. wrapCreateMetricsRcvr(factory),
  487. }
  488. for _, createFn := range createFns {
  489. firstRcvr, err := createFn(ctx, receiverCreateSet, getConfigFn())
  490. if errors.Is(err, component.ErrDataTypeIsNotSupported) {
  491. continue
  492. }
  493. require.NoError(t, err)
  494. require.NoError(t, firstRcvr.Start(ctx, host))
  495. require.NoError(t, firstRcvr.Shutdown(ctx))
  496. secondRcvr, err := createFn(ctx, receiverCreateSet, getConfigFn())
  497. require.NoError(t, err)
  498. require.NoError(t, secondRcvr.Start(ctx, host))
  499. require.NoError(t, secondRcvr.Shutdown(ctx))
  500. }
  501. }
  502. // verifyReceiverShutdown is used to test if a receiver type can be shutdown without being started first.
  503. func verifyReceiverShutdown(tb testing.TB, factory receiver.Factory, getConfigFn getReceiverConfigFn) {
  504. ctx := context.Background()
  505. receiverCreateSet := receivertest.NewNopCreateSettings()
  506. if getConfigFn == nil {
  507. getConfigFn = factory.CreateDefaultConfig
  508. }
  509. createFns := []createReceiverFn{
  510. wrapCreateLogsRcvr(factory),
  511. wrapCreateTracesRcvr(factory),
  512. wrapCreateMetricsRcvr(factory),
  513. }
  514. for _, createFn := range createFns {
  515. r, err := createFn(ctx, receiverCreateSet, getConfigFn())
  516. if errors.Is(err, component.ErrDataTypeIsNotSupported) {
  517. continue
  518. }
  519. if r == nil {
  520. continue
  521. }
  522. assert.NotPanics(tb, func() {
  523. assert.NoError(tb, r.Shutdown(ctx))
  524. })
  525. }
  526. }
  527. // assertNoErrorHost implements a component.Host that asserts that there were no errors.
  528. type createReceiverFn func(
  529. ctx context.Context,
  530. set receiver.CreateSettings,
  531. cfg component.Config,
  532. ) (component.Component, error)
  533. func wrapCreateLogsRcvr(factory receiver.Factory) createReceiverFn {
  534. return func(ctx context.Context, set receiver.CreateSettings, cfg component.Config) (component.Component, error) {
  535. return factory.CreateLogsReceiver(ctx, set, cfg, consumertest.NewNop())
  536. }
  537. }
  538. func wrapCreateMetricsRcvr(factory receiver.Factory) createReceiverFn {
  539. return func(ctx context.Context, set receiver.CreateSettings, cfg component.Config) (component.Component, error) {
  540. return factory.CreateMetricsReceiver(ctx, set, cfg, consumertest.NewNop())
  541. }
  542. }
  543. func wrapCreateTracesRcvr(factory receiver.Factory) createReceiverFn {
  544. return func(ctx context.Context, set receiver.CreateSettings, cfg component.Config) (component.Component, error) {
  545. return factory.CreateTracesReceiver(ctx, set, cfg, consumertest.NewNop())
  546. }
  547. }