123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160 |
- // Copyright The OpenTelemetry Authors
- // SPDX-License-Identifier: Apache-2.0
- package otlpjsonfilereceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otlpjsonfilereceiver"
- import (
- "context"
- "go.opentelemetry.io/collector/component"
- "go.opentelemetry.io/collector/consumer"
- "go.opentelemetry.io/collector/pdata/plog"
- "go.opentelemetry.io/collector/pdata/pmetric"
- "go.opentelemetry.io/collector/pdata/ptrace"
- "go.opentelemetry.io/collector/receiver"
- "go.opentelemetry.io/collector/receiver/receiverhelper"
- "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/adapter"
- "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer"
- "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otlpjsonfilereceiver/internal/metadata"
- )
- const (
- transport = "file"
- )
- // NewFactory creates a factory for file receiver
- func NewFactory() receiver.Factory {
- return receiver.NewFactory(
- metadata.Type,
- createDefaultConfig,
- receiver.WithMetrics(createMetricsReceiver, metadata.MetricsStability),
- receiver.WithLogs(createLogsReceiver, metadata.LogsStability),
- receiver.WithTraces(createTracesReceiver, metadata.TracesStability))
- }
- type Config struct {
- fileconsumer.Config `mapstructure:",squash"`
- StorageID *component.ID `mapstructure:"storage"`
- }
- func createDefaultConfig() component.Config {
- return &Config{
- Config: *fileconsumer.NewConfig(),
- }
- }
- type otlpjsonfilereceiver struct {
- input *fileconsumer.Manager
- id component.ID
- storageID *component.ID
- }
- func (f *otlpjsonfilereceiver) Start(ctx context.Context, host component.Host) error {
- storageClient, err := adapter.GetStorageClient(ctx, host, f.storageID, f.id)
- if err != nil {
- return err
- }
- return f.input.Start(storageClient)
- }
- func (f *otlpjsonfilereceiver) Shutdown(_ context.Context) error {
- return f.input.Stop()
- }
- func createLogsReceiver(_ context.Context, settings receiver.CreateSettings, configuration component.Config, logs consumer.Logs) (receiver.Logs, error) {
- logsUnmarshaler := &plog.JSONUnmarshaler{}
- obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{
- ReceiverID: settings.ID,
- Transport: transport,
- ReceiverCreateSettings: settings,
- })
- if err != nil {
- return nil, err
- }
- cfg := configuration.(*Config)
- input, err := cfg.Config.Build(settings.Logger.Sugar(), func(ctx context.Context, token []byte, _ map[string]any) error {
- ctx = obsrecv.StartLogsOp(ctx)
- var l plog.Logs
- l, err = logsUnmarshaler.UnmarshalLogs(token)
- if err != nil {
- obsrecv.EndLogsOp(ctx, metadata.Type, 0, err)
- } else {
- logRecordCount := l.LogRecordCount()
- if logRecordCount != 0 {
- err = logs.ConsumeLogs(ctx, l)
- }
- obsrecv.EndLogsOp(ctx, metadata.Type, logRecordCount, err)
- }
- return nil
- })
- if err != nil {
- return nil, err
- }
- return &otlpjsonfilereceiver{input: input, id: settings.ID, storageID: cfg.StorageID}, nil
- }
- func createMetricsReceiver(_ context.Context, settings receiver.CreateSettings, configuration component.Config, metrics consumer.Metrics) (receiver.Metrics, error) {
- metricsUnmarshaler := &pmetric.JSONUnmarshaler{}
- obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{
- ReceiverID: settings.ID,
- Transport: transport,
- ReceiverCreateSettings: settings,
- })
- if err != nil {
- return nil, err
- }
- cfg := configuration.(*Config)
- input, err := cfg.Config.Build(settings.Logger.Sugar(), func(ctx context.Context, token []byte, _ map[string]any) error {
- ctx = obsrecv.StartMetricsOp(ctx)
- var m pmetric.Metrics
- m, err = metricsUnmarshaler.UnmarshalMetrics(token)
- if err != nil {
- obsrecv.EndMetricsOp(ctx, metadata.Type, 0, err)
- } else {
- if m.ResourceMetrics().Len() != 0 {
- err = metrics.ConsumeMetrics(ctx, m)
- }
- obsrecv.EndMetricsOp(ctx, metadata.Type, m.MetricCount(), err)
- }
- return nil
- })
- if err != nil {
- return nil, err
- }
- return &otlpjsonfilereceiver{input: input, id: settings.ID, storageID: cfg.StorageID}, nil
- }
- func createTracesReceiver(_ context.Context, settings receiver.CreateSettings, configuration component.Config, traces consumer.Traces) (receiver.Traces, error) {
- tracesUnmarshaler := &ptrace.JSONUnmarshaler{}
- obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{
- ReceiverID: settings.ID,
- Transport: transport,
- ReceiverCreateSettings: settings,
- })
- if err != nil {
- return nil, err
- }
- cfg := configuration.(*Config)
- input, err := cfg.Config.Build(settings.Logger.Sugar(), func(ctx context.Context, token []byte, _ map[string]any) error {
- ctx = obsrecv.StartTracesOp(ctx)
- var t ptrace.Traces
- t, err = tracesUnmarshaler.UnmarshalTraces(token)
- if err != nil {
- obsrecv.EndTracesOp(ctx, metadata.Type, 0, err)
- } else {
- if t.ResourceSpans().Len() != 0 {
- err = traces.ConsumeTraces(ctx, t)
- }
- obsrecv.EndTracesOp(ctx, metadata.Type, t.SpanCount(), err)
- }
- return nil
- })
- if err != nil {
- return nil, err
- }
- return &otlpjsonfilereceiver{input: input, id: settings.ID, storageID: cfg.StorageID}, nil
- }
|