skywalking_receiver.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package skywalkingreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/skywalkingreceiver"
  4. import (
  5. "context"
  6. "errors"
  7. "fmt"
  8. "net"
  9. "net/http"
  10. "sync"
  11. "github.com/gorilla/mux"
  12. "go.opentelemetry.io/collector/component"
  13. "go.opentelemetry.io/collector/config/configgrpc"
  14. "go.opentelemetry.io/collector/config/confighttp"
  15. "go.opentelemetry.io/collector/consumer"
  16. "go.opentelemetry.io/collector/receiver"
  17. "go.uber.org/multierr"
  18. "google.golang.org/grpc"
  19. cds "skywalking.apache.org/repo/goapi/collect/agent/configuration/v3"
  20. event "skywalking.apache.org/repo/goapi/collect/event/v3"
  21. v3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
  22. profile "skywalking.apache.org/repo/goapi/collect/language/profile/v3"
  23. management "skywalking.apache.org/repo/goapi/collect/management/v3"
  24. "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/skywalkingreceiver/internal/metrics"
  25. "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/skywalkingreceiver/internal/trace"
  26. )
  27. // configuration defines the behavior and the ports that
  28. // the Skywalking receiver will use.
  29. type configuration struct {
  30. CollectorHTTPPort int
  31. CollectorHTTPSettings confighttp.HTTPServerSettings
  32. CollectorGRPCPort int
  33. CollectorGRPCServerSettings configgrpc.GRPCServerSettings
  34. }
  35. // Receiver type is used to receive spans that were originally intended to be sent to Skywaking.
  36. // This receiver is basically a Skywalking collector.
  37. type swReceiver struct {
  38. config *configuration
  39. grpc *grpc.Server
  40. collectorServer *http.Server
  41. goroutines sync.WaitGroup
  42. settings receiver.CreateSettings
  43. traceReceiver *trace.Receiver
  44. metricsReceiver *metrics.Receiver
  45. dummyReportService *dummyReportService
  46. }
  47. // newSkywalkingReceiver creates a TracesReceiver that receives traffic as a Skywalking collector
  48. func newSkywalkingReceiver(
  49. config *configuration,
  50. set receiver.CreateSettings,
  51. ) *swReceiver {
  52. return &swReceiver{
  53. config: config,
  54. settings: set,
  55. }
  56. }
  57. // registerTraceConsumer register a TracesReceiver that receives trace
  58. func (sr *swReceiver) registerTraceConsumer(tc consumer.Traces) error {
  59. if tc == nil {
  60. return component.ErrNilNextConsumer
  61. }
  62. var err error
  63. sr.traceReceiver, err = trace.NewReceiver(tc, sr.settings)
  64. if err != nil {
  65. return err
  66. }
  67. return nil
  68. }
  69. // registerTraceConsumer register a TracesReceiver that receives trace
  70. func (sr *swReceiver) registerMetricsConsumer(mc consumer.Metrics) error {
  71. if mc == nil {
  72. return component.ErrNilNextConsumer
  73. }
  74. var err error
  75. sr.metricsReceiver, err = metrics.NewReceiver(mc, sr.settings)
  76. if err != nil {
  77. return err
  78. }
  79. return nil
  80. }
  81. func (sr *swReceiver) collectorGRPCAddr() string {
  82. var port int
  83. if sr.config != nil {
  84. port = sr.config.CollectorGRPCPort
  85. }
  86. return fmt.Sprintf(":%d", port)
  87. }
  88. func (sr *swReceiver) collectorGRPCEnabled() bool {
  89. return sr.config != nil && sr.config.CollectorGRPCPort > 0
  90. }
  91. func (sr *swReceiver) collectorHTTPEnabled() bool {
  92. return sr.config != nil && sr.config.CollectorHTTPPort > 0
  93. }
  94. func (sr *swReceiver) Start(_ context.Context, host component.Host) error {
  95. return sr.startCollector(host)
  96. }
  97. func (sr *swReceiver) Shutdown(ctx context.Context) error {
  98. var errs error
  99. if sr.collectorServer != nil {
  100. if cerr := sr.collectorServer.Shutdown(ctx); cerr != nil {
  101. errs = multierr.Append(errs, cerr)
  102. }
  103. }
  104. if sr.grpc != nil {
  105. sr.grpc.GracefulStop()
  106. }
  107. sr.goroutines.Wait()
  108. return errs
  109. }
  110. func (sr *swReceiver) startCollector(host component.Host) error {
  111. if !sr.collectorGRPCEnabled() && !sr.collectorHTTPEnabled() {
  112. return nil
  113. }
  114. if sr.collectorHTTPEnabled() {
  115. cln, cerr := sr.config.CollectorHTTPSettings.ToListener()
  116. if cerr != nil {
  117. return fmt.Errorf("failed to bind to Collector address %q: %w",
  118. sr.config.CollectorHTTPSettings.Endpoint, cerr)
  119. }
  120. nr := mux.NewRouter()
  121. nr.HandleFunc("/v3/segments", sr.traceReceiver.HTTPHandler).Methods(http.MethodPost)
  122. sr.collectorServer, cerr = sr.config.CollectorHTTPSettings.ToServer(host, sr.settings.TelemetrySettings, nr)
  123. if cerr != nil {
  124. return cerr
  125. }
  126. sr.goroutines.Add(1)
  127. go func() {
  128. defer sr.goroutines.Done()
  129. if errHTTP := sr.collectorServer.Serve(cln); !errors.Is(errHTTP, http.ErrServerClosed) && errHTTP != nil {
  130. host.ReportFatalError(errHTTP)
  131. }
  132. }()
  133. }
  134. if sr.collectorGRPCEnabled() {
  135. var err error
  136. sr.grpc, err = sr.config.CollectorGRPCServerSettings.ToServer(host, sr.settings.TelemetrySettings)
  137. if err != nil {
  138. return fmt.Errorf("failed to build the options for the Skywalking gRPC Collector: %w", err)
  139. }
  140. gaddr := sr.collectorGRPCAddr()
  141. gln, gerr := net.Listen("tcp", gaddr)
  142. if gerr != nil {
  143. return fmt.Errorf("failed to bind to gRPC address %q: %w", gaddr, gerr)
  144. }
  145. if sr.traceReceiver != nil {
  146. v3.RegisterTraceSegmentReportServiceServer(sr.grpc, sr.traceReceiver)
  147. }
  148. if sr.metricsReceiver != nil {
  149. v3.RegisterJVMMetricReportServiceServer(sr.grpc, sr.metricsReceiver)
  150. }
  151. sr.dummyReportService = &dummyReportService{}
  152. management.RegisterManagementServiceServer(sr.grpc, sr.dummyReportService)
  153. cds.RegisterConfigurationDiscoveryServiceServer(sr.grpc, sr.dummyReportService)
  154. event.RegisterEventServiceServer(sr.grpc, &eventService{})
  155. profile.RegisterProfileTaskServer(sr.grpc, sr.dummyReportService)
  156. v3.RegisterMeterReportServiceServer(sr.grpc, &meterService{})
  157. v3.RegisterCLRMetricReportServiceServer(sr.grpc, &clrService{})
  158. v3.RegisterBrowserPerfServiceServer(sr.grpc, sr.dummyReportService)
  159. sr.goroutines.Add(1)
  160. go func() {
  161. defer sr.goroutines.Done()
  162. if errGrpc := sr.grpc.Serve(gln); !errors.Is(errGrpc, grpc.ErrServerStopped) && errGrpc != nil {
  163. host.ReportFatalError(errGrpc)
  164. }
  165. }()
  166. }
  167. return nil
  168. }