package main

import (
	"context"
	"flag"
	"fmt"
	"net/http"
	"os/signal"
	"syscall"
	"time"

	"github.com/ClickHouse/clickhouse-go/v2"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
	log "github.com/sirupsen/logrus"
)

// debug switches the logger from info level to debug level when set.
var debug = flag.Bool("debug", false, "enable debug logging")

// configPtr holds the path of the configuration file passed to parseConfig.
var configPtr = flag.String("c", "./conf/config.yaml", "config path")

// main wires the service together and runs it until SIGTERM or SIGINT.
//
// It parses the flags and the configuration file, builds the logger, opens the
// ClickHouse handle, constructs the Kafka sink, the aggregator and the push
// task, starts them, and serves the Prometheus registry on :8080/metrics.
// When the signal context is cancelled the HTTP server is shut down with a
// five second grace period.
func main() {
	flag.Parse()

	cfg, errParseCfg := parseConfig(*configPtr)
	if errParseCfg != nil {
		// The logger is not built yet, so report on stdout.
		fmt.Printf("解析配置:%v\n", errParseCfg)
		return
	}

	logger := log.New()
	logger.SetLevel(log.InfoLevel)
	if *debug {
		logger.SetLevel(log.DebugLevel)
	}
	mainLogger := logger.WithField("name", "main")
	mainLogger.Infof("[配置]:%+v", cfg)

	// mainCtx is cancelled on SIGTERM/SIGINT and drives the shutdown below.
	mainCtx, mainCancel := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT)
	defer mainCancel()

	// A dedicated registry is used instead of the Prometheus default one.
	reg := prometheus.NewRegistry()
	rmqCfg := cfg.RocketMq

	chCli, errConnectCh := connectClickhouse(cfg.Clickhouse)
	if errConnectCh != nil {
		mainLogger.Errorf("failed to connect clickhouse:%v, quit", errConnectCh)
		return
	}

	// Both channels are sized by the same FlatTraceChannelSize setting.
	topoChan := make(chan TopoData, cfg.Job.FlatTraceChannelSize)
	metricCh := make(chan ServiceMetricData, cfg.Job.FlatTraceChannelSize)

	traceKafkaSink := NewTraceKafkaSink(
		topoChan,
		metricCh,
		rmqCfg.Parallelism,
		logger,
		cfg.KafkaSink.Brokers,
		cfg.KafkaSink.Topic,
		rmqCfg.TopoTopic,
		rmqCfg.TopoGroupID,
		rmqCfg.MetricTopic,
		rmqCfg.MetricGroupID,
		reg,
	)

	snRelation := cfg.GetSNRelation()
	aggregator := NewAggregator(chCli, mainLogger.WithField("name", "Aggregator"), cfg.Aggregate, snRelation, reg)
	pushTask := NewPushTask(cfg.PushTaskConfig, logger, topoChan, metricCh, cfg.IncludeApps, aggregator, reg)

	traceKafkaSink.start(mainCtx)
	go pushTask.Start(mainCtx)

	// Expose the custom registry over HTTP using promhttp.HandlerFor;
	// "/metrics" is the usual endpoint for that.
	httpHandler := http.NewServeMux()
	httpHandler.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{Registry: reg}))
	server := &http.Server{
		Addr:    ":8080",
		Handler: httpHandler,
	}

	go func() {
		// ListenAndServe returns http.ErrServerClosed once Shutdown has been
		// called. That is the normal exit path and must not reach Fatal, which
		// would terminate the process with a non-zero status in the middle of
		// the graceful shutdown below. Any other error is still fatal.
		if errServe := server.ListenAndServe(); errServe != http.ErrServerClosed {
			mainLogger.Fatal(errServe)
		}
	}()

	<-mainCtx.Done()

	// Give in-flight requests up to five seconds to finish.
	shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer shutdownCancel()
	if errShutDown := server.Shutdown(shutdownCtx); errShutDown != nil {
		mainLogger.WithError(errShutDown).Error("shut down server error")
	}
	mainLogger.Infof("stopped")
}

// connectClickhouse builds clickhouse.Options from chCfg (server addresses,
// database, user and password), selects the native protocol, and returns
// whatever clickhouse.Open yields for those options.
//
// NOTE(review): this function does not ping the server itself; whether Open
// already verifies connectivity is not visible here — confirm against the
// driver documentation if fail-fast startup is required.
func connectClickhouse(chCfg ClickhouseConfig) (clickhouse.Conn, error) {
	return clickhouse.Open(&clickhouse.Options{
		Addr: chCfg.Servers,
		Auth: clickhouse.Auth{
			Database: chCfg.Database,
			Username: chCfg.User,
			Password: chCfg.Pwd,
		},
		Protocol: clickhouse.Native,
		// The remaining option fields are intentionally left unset:
		//ClientInfo: clickhouse.ClientInfo{},
		//TLS: nil,
		//DialContext: nil,
		//DialStrategy: nil,
		//Debug: false,
		//Debugf: nil,
		//Settings: nil,
		//Compression: nil,
		//DialTimeout: 0,
		//MaxOpenConns: 0,
		//MaxIdleConns: 0,
		//ConnMaxLifetime: 0,
		//ConnOpenStrategy: 0,
		//FreeBufOnConnRelease: false,
		//HttpHeaders: nil,
		//HttpUrlPath: "",
		//BlockBufferSize: 0,
		//MaxCompressionBuffer: 0,
		//ReadTimeout: 0,
	})
}