factory.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package skywalkingreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/skywalkingreceiver"
// This file implements the factory for the Skywalking receiver.
  5. import (
  6. "context"
  7. "fmt"
  8. "net"
  9. "strconv"
  10. "go.opentelemetry.io/collector/component"
  11. "go.opentelemetry.io/collector/config/configgrpc"
  12. "go.opentelemetry.io/collector/config/confighttp"
  13. "go.opentelemetry.io/collector/config/confignet"
  14. "go.opentelemetry.io/collector/consumer"
  15. "go.opentelemetry.io/collector/receiver"
  16. "github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent"
  17. "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/skywalkingreceiver/internal/metadata"
  18. )
  19. const (
  20. // Protocol values.
  21. protoGRPC = "grpc"
  22. protoHTTP = "http"
  23. // Default endpoints to bind to.
  24. defaultGRPCBindEndpoint = "0.0.0.0:11800"
  25. defaultHTTPBindEndpoint = "0.0.0.0:12800"
  26. )
  27. // NewFactory creates a new Skywalking receiver factory.
  28. func NewFactory() receiver.Factory {
  29. return receiver.NewFactory(
  30. metadata.Type,
  31. createDefaultConfig,
  32. receiver.WithTraces(createTracesReceiver, metadata.TracesStability),
  33. receiver.WithMetrics(createMetricsReceiver, metadata.MetricsStability))
  34. }
  35. // CreateDefaultConfig creates the default configuration for Skywalking receiver.
  36. func createDefaultConfig() component.Config {
  37. return &Config{
  38. Protocols: Protocols{
  39. GRPC: &configgrpc.GRPCServerSettings{
  40. NetAddr: confignet.NetAddr{
  41. Endpoint: defaultGRPCBindEndpoint,
  42. Transport: "tcp",
  43. },
  44. },
  45. HTTP: &confighttp.HTTPServerSettings{
  46. Endpoint: defaultHTTPBindEndpoint,
  47. },
  48. },
  49. }
  50. }
  51. // createTracesReceiver creates a trace receiver based on provided config.
  52. func createTracesReceiver(
  53. _ context.Context,
  54. set receiver.CreateSettings,
  55. cfg component.Config,
  56. nextConsumer consumer.Traces,
  57. ) (receiver.Traces, error) {
  58. // Convert settings in the source c to configuration struct
  59. // that Skywalking receiver understands.
  60. rCfg := cfg.(*Config)
  61. c, err := createConfiguration(rCfg)
  62. if err != nil {
  63. return nil, err
  64. }
  65. r := receivers.GetOrAdd(cfg, func() component.Component {
  66. return newSkywalkingReceiver(c, set)
  67. })
  68. if err = r.Unwrap().(*swReceiver).registerTraceConsumer(nextConsumer); err != nil {
  69. return nil, err
  70. }
  71. return r, nil
  72. }
  73. // createMetricsReceiver creates a metrics receiver based on provided config.
  74. func createMetricsReceiver(
  75. _ context.Context,
  76. set receiver.CreateSettings,
  77. cfg component.Config,
  78. nextConsumer consumer.Metrics,
  79. ) (receiver.Metrics, error) {
  80. // Convert settings in the source c to configuration struct
  81. // that Skywalking receiver understands.
  82. rCfg := cfg.(*Config)
  83. c, err := createConfiguration(rCfg)
  84. if err != nil {
  85. return nil, err
  86. }
  87. r := receivers.GetOrAdd(cfg, func() component.Component {
  88. return newSkywalkingReceiver(c, set)
  89. })
  90. if err = r.Unwrap().(*swReceiver).registerMetricsConsumer(nextConsumer); err != nil {
  91. return nil, err
  92. }
  93. return r, nil
  94. }
  95. // create the config that Skywalking receiver will use.
  96. func createConfiguration(rCfg *Config) (*configuration, error) {
  97. var err error
  98. var c configuration
  99. // Set ports
  100. if rCfg.Protocols.GRPC != nil {
  101. c.CollectorGRPCServerSettings = *rCfg.Protocols.GRPC
  102. if c.CollectorGRPCPort, err = extractPortFromEndpoint(rCfg.Protocols.GRPC.NetAddr.Endpoint); err != nil {
  103. return nil, fmt.Errorf("unable to extract port for the gRPC endpoint: %w", err)
  104. }
  105. }
  106. if rCfg.Protocols.HTTP != nil {
  107. c.CollectorHTTPSettings = *rCfg.Protocols.HTTP
  108. if c.CollectorHTTPPort, err = extractPortFromEndpoint(rCfg.Protocols.HTTP.Endpoint); err != nil {
  109. return nil, fmt.Errorf("unable to extract port for the HTTP endpoint: %w", err)
  110. }
  111. }
  112. return &c, nil
  113. }
  114. // extract the port number from string in "address:port" format. If the
  115. // port number cannot be extracted returns an error.
  116. func extractPortFromEndpoint(endpoint string) (int, error) {
  117. _, portStr, err := net.SplitHostPort(endpoint)
  118. if err != nil {
  119. return 0, fmt.Errorf("endpoint is not formatted correctly: %w", err)
  120. }
  121. port, err := strconv.ParseInt(portStr, 10, 0)
  122. if err != nil {
  123. return 0, fmt.Errorf("endpoint port is not a number: %w", err)
  124. }
  125. if port < 1 || port > 65535 {
  126. return 0, fmt.Errorf("port number must be between 1 and 65535")
  127. }
  128. return int(port), nil
  129. }
  130. var receivers = sharedcomponent.NewSharedComponents()