123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111 |
- package main
- import (
- "context"
- "flag"
- "fmt"
- "github.com/ClickHouse/clickhouse-go/v2"
- "github.com/prometheus/client_golang/prometheus"
- "github.com/prometheus/client_golang/prometheus/promhttp"
- log "github.com/sirupsen/logrus"
- "net/http"
- "os/signal"
- "syscall"
- "time"
- )
- var debug = flag.Bool("debug", false, "enable debug logging")
- var configPtr = flag.String("c", "./conf/config.yaml", "config path")
- func main() {
- flag.Parse()
- cfg, errParseCfg := parseConfig(*configPtr)
- if errParseCfg != nil {
- fmt.Printf("解析配置:%v\n", errParseCfg)
- return
- }
- logger := log.New()
- logger.SetLevel(log.InfoLevel)
- if *debug {
- logger.SetLevel(log.DebugLevel)
- }
- mainLogger := logger.WithField("name", "main")
- mainLogger.Infof("[配置]:%+v", cfg)
- mainCtx, mainCancel := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT)
- defer mainCancel()
- reg := prometheus.NewRegistry()
- rmqCfg := cfg.RocketMq
- chCli, errConnectCh := connectClickhouse(cfg.Clickhouse)
- if errConnectCh != nil {
- mainLogger.Errorf("failed to connect clickhouse:%v, quit", errConnectCh)
- return
- }
- topoChan := make(chan TopoData, cfg.Job.FlatTraceChannelSize)
- metricCh := make(chan ServiceMetricData, cfg.Job.FlatTraceChannelSize)
- traceKafkaSink := NewTraceKafkaSink(topoChan, metricCh, rmqCfg.Parallelism, logger, cfg.KafkaSink.Brokers,
- cfg.KafkaSink.Topic, rmqCfg.TopoTopic, rmqCfg.TopoGroupID, rmqCfg.MetricTopic, rmqCfg.MetricGroupID, reg)
- snRelation := cfg.GetSNRelation()
- aggregator := NewAggregator(chCli, mainLogger.WithField("name", "Aggregator"), cfg.Aggregate,
- snRelation, reg)
- pushTask := NewPushTask(cfg.PushTaskConfig, logger, topoChan, metricCh, cfg.IncludeApps, aggregator, reg)
- traceKafkaSink.start(mainCtx)
- go pushTask.Start(mainCtx)
- httpHandler := http.NewServeMux()
- httpHandler.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{Registry: reg}))
- server := &http.Server{
- Addr: ":8080",
- Handler: httpHandler,
- }
- // Create new metrics and register them using the custom registry.
- // Expose metrics and custom registry via an HTTP server
- // using the HandleFor function. "/metrics" is the usual endpoint for that.
- go func() {
- mainLogger.Fatal(server.ListenAndServe())
- }()
- <-mainCtx.Done()
- shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second)
- defer shutdownCancel()
- if errShutDown := server.Shutdown(shutdownCtx); errShutDown != nil {
- mainLogger.WithError(errShutDown).Error("shut down server error")
- }
- mainLogger.Infof("stopped")
- }
- func connectClickhouse(chCfg ClickhouseConfig) (clickhouse.Conn, error) {
- clickhouseConfig := clickhouse.Options{
- Addr: chCfg.Servers,
- Auth: clickhouse.Auth{
- Database: chCfg.Database,
- Username: chCfg.User,
- Password: chCfg.Pwd,
- },
- Protocol: clickhouse.Native,
- //ClientInfo: clickhouse.ClientInfo{},
- //TLS: nil,
- //DialContext: nil,
- //DialStrategy: nil,
- //Debug: false,
- //Debugf: nil,
- //Settings: nil,
- //Compression: nil,
- //DialTimeout: 0,
- //MaxOpenConns: 0,
- //MaxIdleConns: 0,
- //ConnMaxLifetime: 0,
- //ConnOpenStrategy: 0,
- //FreeBufOnConnRelease: false,
- //HttpHeaders: nil,
- //HttpUrlPath: "",
- //BlockBufferSize: 0,
- //MaxCompressionBuffer: 0,
- //ReadTimeout: 0,
- }
- conn, err := clickhouse.Open(&clickhouseConfig)
- return conn, err
- }
|