file.go 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package otlpjsonfilereceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otlpjsonfilereceiver"
  4. import (
  5. "context"
  6. "go.opentelemetry.io/collector/component"
  7. "go.opentelemetry.io/collector/consumer"
  8. "go.opentelemetry.io/collector/pdata/plog"
  9. "go.opentelemetry.io/collector/pdata/pmetric"
  10. "go.opentelemetry.io/collector/pdata/ptrace"
  11. "go.opentelemetry.io/collector/receiver"
  12. "go.opentelemetry.io/collector/receiver/receiverhelper"
  13. "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/adapter"
  14. "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer"
  15. "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otlpjsonfilereceiver/internal/metadata"
  16. )
  17. const (
  18. transport = "file"
  19. )
  20. // NewFactory creates a factory for file receiver
  21. func NewFactory() receiver.Factory {
  22. return receiver.NewFactory(
  23. metadata.Type,
  24. createDefaultConfig,
  25. receiver.WithMetrics(createMetricsReceiver, metadata.MetricsStability),
  26. receiver.WithLogs(createLogsReceiver, metadata.LogsStability),
  27. receiver.WithTraces(createTracesReceiver, metadata.TracesStability))
  28. }
  29. type Config struct {
  30. fileconsumer.Config `mapstructure:",squash"`
  31. StorageID *component.ID `mapstructure:"storage"`
  32. }
  33. func createDefaultConfig() component.Config {
  34. return &Config{
  35. Config: *fileconsumer.NewConfig(),
  36. }
  37. }
  38. type otlpjsonfilereceiver struct {
  39. input *fileconsumer.Manager
  40. id component.ID
  41. storageID *component.ID
  42. }
  43. func (f *otlpjsonfilereceiver) Start(ctx context.Context, host component.Host) error {
  44. storageClient, err := adapter.GetStorageClient(ctx, host, f.storageID, f.id)
  45. if err != nil {
  46. return err
  47. }
  48. return f.input.Start(storageClient)
  49. }
  50. func (f *otlpjsonfilereceiver) Shutdown(_ context.Context) error {
  51. return f.input.Stop()
  52. }
  53. func createLogsReceiver(_ context.Context, settings receiver.CreateSettings, configuration component.Config, logs consumer.Logs) (receiver.Logs, error) {
  54. logsUnmarshaler := &plog.JSONUnmarshaler{}
  55. obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{
  56. ReceiverID: settings.ID,
  57. Transport: transport,
  58. ReceiverCreateSettings: settings,
  59. })
  60. if err != nil {
  61. return nil, err
  62. }
  63. cfg := configuration.(*Config)
  64. input, err := cfg.Config.Build(settings.Logger.Sugar(), func(ctx context.Context, token []byte, _ map[string]any) error {
  65. ctx = obsrecv.StartLogsOp(ctx)
  66. var l plog.Logs
  67. l, err = logsUnmarshaler.UnmarshalLogs(token)
  68. if err != nil {
  69. obsrecv.EndLogsOp(ctx, metadata.Type, 0, err)
  70. } else {
  71. logRecordCount := l.LogRecordCount()
  72. if logRecordCount != 0 {
  73. err = logs.ConsumeLogs(ctx, l)
  74. }
  75. obsrecv.EndLogsOp(ctx, metadata.Type, logRecordCount, err)
  76. }
  77. return nil
  78. })
  79. if err != nil {
  80. return nil, err
  81. }
  82. return &otlpjsonfilereceiver{input: input, id: settings.ID, storageID: cfg.StorageID}, nil
  83. }
  84. func createMetricsReceiver(_ context.Context, settings receiver.CreateSettings, configuration component.Config, metrics consumer.Metrics) (receiver.Metrics, error) {
  85. metricsUnmarshaler := &pmetric.JSONUnmarshaler{}
  86. obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{
  87. ReceiverID: settings.ID,
  88. Transport: transport,
  89. ReceiverCreateSettings: settings,
  90. })
  91. if err != nil {
  92. return nil, err
  93. }
  94. cfg := configuration.(*Config)
  95. input, err := cfg.Config.Build(settings.Logger.Sugar(), func(ctx context.Context, token []byte, _ map[string]any) error {
  96. ctx = obsrecv.StartMetricsOp(ctx)
  97. var m pmetric.Metrics
  98. m, err = metricsUnmarshaler.UnmarshalMetrics(token)
  99. if err != nil {
  100. obsrecv.EndMetricsOp(ctx, metadata.Type, 0, err)
  101. } else {
  102. if m.ResourceMetrics().Len() != 0 {
  103. err = metrics.ConsumeMetrics(ctx, m)
  104. }
  105. obsrecv.EndMetricsOp(ctx, metadata.Type, m.MetricCount(), err)
  106. }
  107. return nil
  108. })
  109. if err != nil {
  110. return nil, err
  111. }
  112. return &otlpjsonfilereceiver{input: input, id: settings.ID, storageID: cfg.StorageID}, nil
  113. }
  114. func createTracesReceiver(_ context.Context, settings receiver.CreateSettings, configuration component.Config, traces consumer.Traces) (receiver.Traces, error) {
  115. tracesUnmarshaler := &ptrace.JSONUnmarshaler{}
  116. obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{
  117. ReceiverID: settings.ID,
  118. Transport: transport,
  119. ReceiverCreateSettings: settings,
  120. })
  121. if err != nil {
  122. return nil, err
  123. }
  124. cfg := configuration.(*Config)
  125. input, err := cfg.Config.Build(settings.Logger.Sugar(), func(ctx context.Context, token []byte, _ map[string]any) error {
  126. ctx = obsrecv.StartTracesOp(ctx)
  127. var t ptrace.Traces
  128. t, err = tracesUnmarshaler.UnmarshalTraces(token)
  129. if err != nil {
  130. obsrecv.EndTracesOp(ctx, metadata.Type, 0, err)
  131. } else {
  132. if t.ResourceSpans().Len() != 0 {
  133. err = traces.ConsumeTraces(ctx, t)
  134. }
  135. obsrecv.EndTracesOp(ctx, metadata.Type, t.SpanCount(), err)
  136. }
  137. return nil
  138. })
  139. if err != nil {
  140. return nil, err
  141. }
  142. return &otlpjsonfilereceiver{input: input, id: settings.ID, storageID: cfg.StorageID}, nil
  143. }