123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197 |
- // Copyright The OpenTelemetry Authors
- // SPDX-License-Identifier: Apache-2.0
- package skywalkingreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/skywalkingreceiver"
- import (
- "context"
- "errors"
- "fmt"
- "net"
- "net/http"
- "sync"
- "github.com/gorilla/mux"
- "go.opentelemetry.io/collector/component"
- "go.opentelemetry.io/collector/config/configgrpc"
- "go.opentelemetry.io/collector/config/confighttp"
- "go.opentelemetry.io/collector/consumer"
- "go.opentelemetry.io/collector/receiver"
- "go.uber.org/multierr"
- "google.golang.org/grpc"
- cds "skywalking.apache.org/repo/goapi/collect/agent/configuration/v3"
- event "skywalking.apache.org/repo/goapi/collect/event/v3"
- v3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
- profile "skywalking.apache.org/repo/goapi/collect/language/profile/v3"
- management "skywalking.apache.org/repo/goapi/collect/management/v3"
- "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/skywalkingreceiver/internal/metrics"
- "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/skywalkingreceiver/internal/trace"
- )
- // configuration defines the behavior and the ports that
- // the Skywalking receiver will use.
- type configuration struct {
- CollectorHTTPPort int
- CollectorHTTPSettings confighttp.HTTPServerSettings
- CollectorGRPCPort int
- CollectorGRPCServerSettings configgrpc.GRPCServerSettings
- }
- // Receiver type is used to receive spans that were originally intended to be sent to Skywaking.
- // This receiver is basically a Skywalking collector.
- type swReceiver struct {
- config *configuration
- grpc *grpc.Server
- collectorServer *http.Server
- goroutines sync.WaitGroup
- settings receiver.CreateSettings
- traceReceiver *trace.Receiver
- metricsReceiver *metrics.Receiver
- dummyReportService *dummyReportService
- }
- // newSkywalkingReceiver creates a TracesReceiver that receives traffic as a Skywalking collector
- func newSkywalkingReceiver(
- config *configuration,
- set receiver.CreateSettings,
- ) *swReceiver {
- return &swReceiver{
- config: config,
- settings: set,
- }
- }
- // registerTraceConsumer register a TracesReceiver that receives trace
- func (sr *swReceiver) registerTraceConsumer(tc consumer.Traces) error {
- if tc == nil {
- return component.ErrNilNextConsumer
- }
- var err error
- sr.traceReceiver, err = trace.NewReceiver(tc, sr.settings)
- if err != nil {
- return err
- }
- return nil
- }
- // registerTraceConsumer register a TracesReceiver that receives trace
- func (sr *swReceiver) registerMetricsConsumer(mc consumer.Metrics) error {
- if mc == nil {
- return component.ErrNilNextConsumer
- }
- var err error
- sr.metricsReceiver, err = metrics.NewReceiver(mc, sr.settings)
- if err != nil {
- return err
- }
- return nil
- }
- func (sr *swReceiver) collectorGRPCAddr() string {
- var port int
- if sr.config != nil {
- port = sr.config.CollectorGRPCPort
- }
- return fmt.Sprintf(":%d", port)
- }
- func (sr *swReceiver) collectorGRPCEnabled() bool {
- return sr.config != nil && sr.config.CollectorGRPCPort > 0
- }
- func (sr *swReceiver) collectorHTTPEnabled() bool {
- return sr.config != nil && sr.config.CollectorHTTPPort > 0
- }
- func (sr *swReceiver) Start(_ context.Context, host component.Host) error {
- return sr.startCollector(host)
- }
- func (sr *swReceiver) Shutdown(ctx context.Context) error {
- var errs error
- if sr.collectorServer != nil {
- if cerr := sr.collectorServer.Shutdown(ctx); cerr != nil {
- errs = multierr.Append(errs, cerr)
- }
- }
- if sr.grpc != nil {
- sr.grpc.GracefulStop()
- }
- sr.goroutines.Wait()
- return errs
- }
- func (sr *swReceiver) startCollector(host component.Host) error {
- if !sr.collectorGRPCEnabled() && !sr.collectorHTTPEnabled() {
- return nil
- }
- if sr.collectorHTTPEnabled() {
- cln, cerr := sr.config.CollectorHTTPSettings.ToListener()
- if cerr != nil {
- return fmt.Errorf("failed to bind to Collector address %q: %w",
- sr.config.CollectorHTTPSettings.Endpoint, cerr)
- }
- nr := mux.NewRouter()
- nr.HandleFunc("/v3/segments", sr.traceReceiver.HTTPHandler).Methods(http.MethodPost)
- sr.collectorServer, cerr = sr.config.CollectorHTTPSettings.ToServer(host, sr.settings.TelemetrySettings, nr)
- if cerr != nil {
- return cerr
- }
- sr.goroutines.Add(1)
- go func() {
- defer sr.goroutines.Done()
- if errHTTP := sr.collectorServer.Serve(cln); !errors.Is(errHTTP, http.ErrServerClosed) && errHTTP != nil {
- host.ReportFatalError(errHTTP)
- }
- }()
- }
- if sr.collectorGRPCEnabled() {
- var err error
- sr.grpc, err = sr.config.CollectorGRPCServerSettings.ToServer(host, sr.settings.TelemetrySettings)
- if err != nil {
- return fmt.Errorf("failed to build the options for the Skywalking gRPC Collector: %w", err)
- }
- gaddr := sr.collectorGRPCAddr()
- gln, gerr := net.Listen("tcp", gaddr)
- if gerr != nil {
- return fmt.Errorf("failed to bind to gRPC address %q: %w", gaddr, gerr)
- }
- if sr.traceReceiver != nil {
- v3.RegisterTraceSegmentReportServiceServer(sr.grpc, sr.traceReceiver)
- }
- if sr.metricsReceiver != nil {
- v3.RegisterJVMMetricReportServiceServer(sr.grpc, sr.metricsReceiver)
- }
- sr.dummyReportService = &dummyReportService{}
- management.RegisterManagementServiceServer(sr.grpc, sr.dummyReportService)
- cds.RegisterConfigurationDiscoveryServiceServer(sr.grpc, sr.dummyReportService)
- event.RegisterEventServiceServer(sr.grpc, &eventService{})
- profile.RegisterProfileTaskServer(sr.grpc, sr.dummyReportService)
- v3.RegisterMeterReportServiceServer(sr.grpc, &meterService{})
- v3.RegisterCLRMetricReportServiceServer(sr.grpc, &clrService{})
- v3.RegisterBrowserPerfServiceServer(sr.grpc, sr.dummyReportService)
- sr.goroutines.Add(1)
- go func() {
- defer sr.goroutines.Done()
- if errGrpc := sr.grpc.Serve(gln); !errors.Is(errGrpc, grpc.ErrServerStopped) && errGrpc != nil {
- host.ReportFatalError(errGrpc)
- }
- }()
- }
- return nil
- }
|