123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117 |
- // Copyright The OpenTelemetry Authors
- // SPDX-License-Identifier: Apache-2.0
- package carbonreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver"
- import (
- "context"
- "errors"
- "fmt"
- "strings"
- "go.opentelemetry.io/collector/component"
- "go.opentelemetry.io/collector/consumer"
- "go.opentelemetry.io/collector/receiver"
- "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver/protocol"
- "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver/transport"
- )
- var (
- errEmptyEndpoint = errors.New("empty endpoint")
- )
- // carbonreceiver implements a receiver.Metrics for Carbon plaintext, aka "line", protocol.
- // see https://graphite.readthedocs.io/en/latest/feeding-carbon.html#the-plaintext-protocol.
- type carbonReceiver struct {
- settings receiver.CreateSettings
- config *Config
- server transport.Server
- reporter transport.Reporter
- parser protocol.Parser
- nextConsumer consumer.Metrics
- }
- var _ receiver.Metrics = (*carbonReceiver)(nil)
- // newMetricsReceiver creates the Carbon receiver with the given configuration.
- func newMetricsReceiver(
- set receiver.CreateSettings,
- config Config,
- nextConsumer consumer.Metrics,
- ) (receiver.Metrics, error) {
- if nextConsumer == nil {
- return nil, component.ErrNilNextConsumer
- }
- if config.Endpoint == "" {
- return nil, errEmptyEndpoint
- }
- if config.Parser == nil {
- // Set the defaults
- config.Parser = &protocol.Config{
- Type: "plaintext",
- Config: &protocol.PlaintextConfig{},
- }
- }
- parser, err := config.Parser.Config.BuildParser()
- if err != nil {
- return nil, err
- }
- rep, err := newReporter(set)
- if err != nil {
- return nil, err
- }
- r := carbonReceiver{
- settings: set,
- config: &config,
- nextConsumer: nextConsumer,
- reporter: rep,
- parser: parser,
- }
- return &r, nil
- }
- func buildTransportServer(config Config) (transport.Server, error) {
- switch strings.ToLower(config.Transport) {
- case "", "tcp":
- return transport.NewTCPServer(config.Endpoint, config.TCPIdleTimeout)
- case "udp":
- return transport.NewUDPServer(config.Endpoint)
- }
- return nil, fmt.Errorf("unsupported transport %q", config.Transport)
- }
- // Start tells the receiver to start its processing.
- // By convention the consumer of the received data is set when the receiver
- // instance is created.
- func (r *carbonReceiver) Start(_ context.Context, host component.Host) error {
- server, err := buildTransportServer(*r.config)
- if err != nil {
- return err
- }
- r.server = server
- go func() {
- if err := r.server.ListenAndServe(r.parser, r.nextConsumer, r.reporter); err != nil {
- host.ReportFatalError(err)
- }
- }()
- return nil
- }
- // Shutdown tells the receiver that should stop reception,
- // giving it a chance to perform any necessary clean-up.
- func (r *carbonReceiver) Shutdown(context.Context) error {
- if r.server == nil {
- return nil
- }
- return r.server.Close()
- }
|