file_test.go 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package otlpjsonfilereceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otlpjsonfilereceiver"
  4. import (
  5. "context"
  6. "os"
  7. "path/filepath"
  8. "testing"
  9. "time"
  10. "github.com/stretchr/testify/assert"
  11. "github.com/stretchr/testify/require"
  12. "go.opentelemetry.io/collector/component"
  13. "go.opentelemetry.io/collector/component/componenttest"
  14. "go.opentelemetry.io/collector/confmap/confmaptest"
  15. "go.opentelemetry.io/collector/consumer/consumertest"
  16. "go.opentelemetry.io/collector/pdata/plog"
  17. "go.opentelemetry.io/collector/pdata/pmetric"
  18. "go.opentelemetry.io/collector/pdata/ptrace"
  19. "go.opentelemetry.io/collector/receiver/receivertest"
  20. "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/testdata"
  21. "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer"
  22. "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/matcher"
  23. "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otlpjsonfilereceiver/internal/metadata"
  24. )
  25. func TestDefaultConfig(t *testing.T) {
  26. factory := NewFactory()
  27. cfg := factory.CreateDefaultConfig()
  28. require.NotNil(t, cfg, "failed to create default config")
  29. require.NoError(t, componenttest.CheckConfigStruct(cfg))
  30. }
  31. func TestFileTracesReceiver(t *testing.T) {
  32. tempFolder := t.TempDir()
  33. factory := NewFactory()
  34. cfg := createDefaultConfig().(*Config)
  35. cfg.Config.Include = []string{filepath.Join(tempFolder, "*")}
  36. cfg.Config.StartAt = "beginning"
  37. sink := new(consumertest.TracesSink)
  38. receiver, err := factory.CreateTracesReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, sink)
  39. assert.NoError(t, err)
  40. err = receiver.Start(context.Background(), nil)
  41. require.NoError(t, err)
  42. td := testdata.GenerateTracesTwoSpansSameResource()
  43. marshaler := &ptrace.JSONMarshaler{}
  44. b, err := marshaler.MarshalTraces(td)
  45. assert.NoError(t, err)
  46. b = append(b, '\n')
  47. err = os.WriteFile(filepath.Join(tempFolder, "traces.json"), b, 0600)
  48. assert.NoError(t, err)
  49. time.Sleep(1 * time.Second)
  50. require.Len(t, sink.AllTraces(), 1)
  51. assert.EqualValues(t, td, sink.AllTraces()[0])
  52. err = receiver.Shutdown(context.Background())
  53. assert.NoError(t, err)
  54. }
  55. func TestFileMetricsReceiver(t *testing.T) {
  56. tempFolder := t.TempDir()
  57. factory := NewFactory()
  58. cfg := createDefaultConfig().(*Config)
  59. cfg.Config.Include = []string{filepath.Join(tempFolder, "*")}
  60. cfg.Config.StartAt = "beginning"
  61. sink := new(consumertest.MetricsSink)
  62. receiver, err := factory.CreateMetricsReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, sink)
  63. assert.NoError(t, err)
  64. err = receiver.Start(context.Background(), nil)
  65. assert.NoError(t, err)
  66. md := testdata.GenerateMetricsManyMetricsSameResource(5)
  67. marshaler := &pmetric.JSONMarshaler{}
  68. b, err := marshaler.MarshalMetrics(md)
  69. assert.NoError(t, err)
  70. b = append(b, '\n')
  71. err = os.WriteFile(filepath.Join(tempFolder, "metrics.json"), b, 0600)
  72. assert.NoError(t, err)
  73. time.Sleep(1 * time.Second)
  74. require.Len(t, sink.AllMetrics(), 1)
  75. assert.EqualValues(t, md, sink.AllMetrics()[0])
  76. err = receiver.Shutdown(context.Background())
  77. assert.NoError(t, err)
  78. }
  79. func TestFileLogsReceiver(t *testing.T) {
  80. tempFolder := t.TempDir()
  81. factory := NewFactory()
  82. cfg := createDefaultConfig().(*Config)
  83. cfg.Config.Include = []string{filepath.Join(tempFolder, "*")}
  84. cfg.Config.StartAt = "beginning"
  85. sink := new(consumertest.LogsSink)
  86. receiver, err := factory.CreateLogsReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, sink)
  87. assert.NoError(t, err)
  88. err = receiver.Start(context.Background(), nil)
  89. assert.NoError(t, err)
  90. ld := testdata.GenerateLogsManyLogRecordsSameResource(5)
  91. marshaler := &plog.JSONMarshaler{}
  92. b, err := marshaler.MarshalLogs(ld)
  93. assert.NoError(t, err)
  94. b = append(b, '\n')
  95. err = os.WriteFile(filepath.Join(tempFolder, "logs.json"), b, 0600)
  96. assert.NoError(t, err)
  97. time.Sleep(1 * time.Second)
  98. require.Len(t, sink.AllLogs(), 1)
  99. assert.EqualValues(t, ld, sink.AllLogs()[0])
  100. err = receiver.Shutdown(context.Background())
  101. assert.NoError(t, err)
  102. }
  103. func testdataConfigYamlAsMap() *Config {
  104. return &Config{
  105. Config: fileconsumer.Config{
  106. IncludeFileName: true,
  107. IncludeFilePath: false,
  108. IncludeFileNameResolved: false,
  109. IncludeFilePathResolved: false,
  110. PollInterval: 200 * time.Millisecond,
  111. Encoding: "utf-8",
  112. StartAt: "end",
  113. FingerprintSize: 1000,
  114. MaxLogSize: 1024 * 1024,
  115. MaxConcurrentFiles: 1024,
  116. FlushPeriod: 500 * time.Millisecond,
  117. Criteria: matcher.Criteria{
  118. Include: []string{"/var/log/*.log"},
  119. Exclude: []string{"/var/log/example.log"},
  120. },
  121. },
  122. }
  123. }
  124. func TestLoadConfig(t *testing.T) {
  125. cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml"))
  126. require.NoError(t, err)
  127. factory := NewFactory()
  128. cfg := factory.CreateDefaultConfig()
  129. sub, err := cm.Sub(component.NewIDWithName(metadata.Type, "").String())
  130. require.NoError(t, err)
  131. require.NoError(t, component.UnmarshalConfig(sub, cfg))
  132. assert.Equal(t, testdataConfigYamlAsMap(), cfg)
  133. }
  134. func TestFileMixedSignals(t *testing.T) {
  135. tempFolder := t.TempDir()
  136. factory := NewFactory()
  137. cfg := createDefaultConfig().(*Config)
  138. cfg.Config.Include = []string{filepath.Join(tempFolder, "*")}
  139. cfg.Config.StartAt = "beginning"
  140. cs := receivertest.NewNopCreateSettings()
  141. ms := new(consumertest.MetricsSink)
  142. mr, err := factory.CreateMetricsReceiver(context.Background(), cs, cfg, ms)
  143. assert.NoError(t, err)
  144. err = mr.Start(context.Background(), nil)
  145. assert.NoError(t, err)
  146. ts := new(consumertest.TracesSink)
  147. tr, err := factory.CreateTracesReceiver(context.Background(), cs, cfg, ts)
  148. assert.NoError(t, err)
  149. err = tr.Start(context.Background(), nil)
  150. assert.NoError(t, err)
  151. ls := new(consumertest.LogsSink)
  152. lr, err := factory.CreateLogsReceiver(context.Background(), cs, cfg, ls)
  153. assert.NoError(t, err)
  154. err = lr.Start(context.Background(), nil)
  155. assert.NoError(t, err)
  156. md := testdata.GenerateMetricsManyMetricsSameResource(5)
  157. marshaler := &pmetric.JSONMarshaler{}
  158. b, err := marshaler.MarshalMetrics(md)
  159. assert.NoError(t, err)
  160. td := testdata.GenerateTracesTwoSpansSameResource()
  161. tmarshaler := &ptrace.JSONMarshaler{}
  162. b2, err := tmarshaler.MarshalTraces(td)
  163. assert.NoError(t, err)
  164. ld := testdata.GenerateLogsManyLogRecordsSameResource(5)
  165. lmarshaler := &plog.JSONMarshaler{}
  166. b3, err := lmarshaler.MarshalLogs(ld)
  167. assert.NoError(t, err)
  168. b = append(b, '\n')
  169. b = append(b, b2...)
  170. b = append(b, '\n')
  171. b = append(b, b3...)
  172. b = append(b, '\n')
  173. err = os.WriteFile(filepath.Join(tempFolder, "metrics.json"), b, 0600)
  174. assert.NoError(t, err)
  175. time.Sleep(1 * time.Second)
  176. require.Len(t, ms.AllMetrics(), 1)
  177. assert.EqualValues(t, md, ms.AllMetrics()[0])
  178. require.Len(t, ts.AllTraces(), 1)
  179. assert.EqualValues(t, td, ts.AllTraces()[0])
  180. require.Len(t, ls.AllLogs(), 1)
  181. assert.EqualValues(t, ld, ls.AllLogs()[0])
  182. err = mr.Shutdown(context.Background())
  183. assert.NoError(t, err)
  184. err = tr.Shutdown(context.Background())
  185. assert.NoError(t, err)
  186. err = lr.Shutdown(context.Background())
  187. assert.NoError(t, err)
  188. }