main.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  1. package main
  2. import (
  3. "context"
  4. "flag"
  5. "fmt"
  6. "github.com/ClickHouse/clickhouse-go/v2"
  7. "github.com/prometheus/client_golang/prometheus"
  8. "github.com/prometheus/client_golang/prometheus/promhttp"
  9. log "github.com/sirupsen/logrus"
  10. "net/http"
  11. "os/signal"
  12. "syscall"
  13. "time"
  14. )
  15. var debug = flag.Bool("debug", false, "enable debug logging")
  16. var configPtr = flag.String("c", "./conf/config.yaml", "config path")
  17. func main() {
  18. flag.Parse()
  19. cfg, errParseCfg := parseConfig(*configPtr)
  20. if errParseCfg != nil {
  21. fmt.Printf("解析配置:%v\n", errParseCfg)
  22. return
  23. }
  24. logger := log.New()
  25. logger.SetLevel(log.InfoLevel)
  26. if *debug {
  27. logger.SetLevel(log.DebugLevel)
  28. }
  29. mainLogger := logger.WithField("name", "main")
  30. mainLogger.Infof("[配置]:%+v", cfg)
  31. mainCtx, mainCancel := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT)
  32. defer mainCancel()
  33. reg := prometheus.NewRegistry()
  34. rmqCfg := cfg.RocketMq
  35. chCli, errConnectCh := connectClickhouse(cfg.Clickhouse)
  36. if errConnectCh != nil {
  37. mainLogger.Errorf("failed to connect clickhouse:%v, quit", errConnectCh)
  38. return
  39. }
  40. topoChan := make(chan TopoData, cfg.Job.FlatTraceChannelSize)
  41. metricCh := make(chan ServiceMetricData, cfg.Job.FlatTraceChannelSize)
  42. traceKafkaSink := NewTraceKafkaSink(topoChan, metricCh, rmqCfg.Parallelism, logger, cfg.KafkaSink.Brokers,
  43. cfg.KafkaSink.Topic, rmqCfg.TopoTopic, rmqCfg.TopoGroupID, rmqCfg.MetricTopic, rmqCfg.MetricGroupID, reg)
  44. snRelation := cfg.GetSNRelation()
  45. aggregator := NewAggregator(chCli, mainLogger.WithField("name", "Aggregator"), cfg.Aggregate,
  46. snRelation, reg)
  47. pushTask := NewPushTask(cfg.PushTaskConfig, logger, topoChan, metricCh, cfg.IncludeApps, aggregator, reg)
  48. traceKafkaSink.start(mainCtx)
  49. go pushTask.Start(mainCtx)
  50. httpHandler := http.NewServeMux()
  51. httpHandler.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{Registry: reg}))
  52. server := &http.Server{
  53. Addr: ":8080",
  54. Handler: httpHandler,
  55. }
  56. // Create new metrics and register them using the custom registry.
  57. // Expose metrics and custom registry via an HTTP server
  58. // using the HandleFor function. "/metrics" is the usual endpoint for that.
  59. go func() {
  60. mainLogger.Fatal(server.ListenAndServe())
  61. }()
  62. <-mainCtx.Done()
  63. shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second)
  64. defer shutdownCancel()
  65. if errShutDown := server.Shutdown(shutdownCtx); errShutDown != nil {
  66. mainLogger.WithError(errShutDown).Error("shut down server error")
  67. }
  68. mainLogger.Infof("stopped")
  69. }
  70. func connectClickhouse(chCfg ClickhouseConfig) (clickhouse.Conn, error) {
  71. clickhouseConfig := clickhouse.Options{
  72. Addr: chCfg.Servers,
  73. Auth: clickhouse.Auth{
  74. Database: chCfg.Database,
  75. Username: chCfg.User,
  76. Password: chCfg.Pwd,
  77. },
  78. Protocol: clickhouse.Native,
  79. //ClientInfo: clickhouse.ClientInfo{},
  80. //TLS: nil,
  81. //DialContext: nil,
  82. //DialStrategy: nil,
  83. //Debug: false,
  84. //Debugf: nil,
  85. //Settings: nil,
  86. //Compression: nil,
  87. //DialTimeout: 0,
  88. //MaxOpenConns: 0,
  89. //MaxIdleConns: 0,
  90. //ConnMaxLifetime: 0,
  91. //ConnOpenStrategy: 0,
  92. //FreeBufOnConnRelease: false,
  93. //HttpHeaders: nil,
  94. //HttpUrlPath: "",
  95. //BlockBufferSize: 0,
  96. //MaxCompressionBuffer: 0,
  97. //ReadTimeout: 0,
  98. }
  99. conn, err := clickhouse.Open(&clickhouseConfig)
  100. return conn, err
  101. }