فهرست منبع

[ADD] 事件异常指标提取至prometheus,提供metrics接口供以prometheus拉取指标。

pujielan 3 ماه پیش
والد
کامیت
c9a46b854e
7 فایلهای تغییر یافته به همراه 298 افزوده شده و 17 حذف شده
  1. 41 0
      app/alert/router/init_router.go
  2. 23 0
      app/alert/router/monitor.go
  3. 40 0
      app/alert/router/router.go
  4. 1 0
      app/observe/models/otel_events.go
  5. 80 2
      cmd/alert/server.go
  6. 70 6
      handler/events.go
  7. 43 9
      handler/handler.go

+ 41 - 0
app/alert/router/init_router.go

@@ -0,0 +1,41 @@
+package router
+
+import (
+	"os"
+
+	common "go-admin/common/middleware"
+
+	"github.com/gin-gonic/gin"
+	log "github.com/go-admin-team/go-admin-core/logger"
+	"github.com/go-admin-team/go-admin-core/sdk"
+)
+
+// InitRouter 路由初始化,不要怀疑,这里用到了
+func InitRouter() {
+	var r *gin.Engine
+	h := sdk.Runtime.GetEngine()
+	if h == nil {
+		log.Fatal("not found engine...")
+		os.Exit(-1)
+	}
+	switch t := h.(type) {
+	case *gin.Engine:
+		r = t
+	default:
+		log.Fatal("not support other engine")
+		os.Exit(-1)
+	}
+
+	// the jwt middleware
+	authMiddleware, err := common.AuthInit()
+	if err != nil {
+		log.Fatalf("JWT Init Error, %s", err.Error())
+	}
+
+	// 注册系统路由
+	// InitSysRouter(r, authMiddleware)
+
+	// 注册业务路由
+	// TODO: 这里可存放业务路由,里边并无实际路由只有演示代码
+	InitExamplesRouter(r, authMiddleware)
+}

+ 23 - 0
app/alert/router/monitor.go

@@ -0,0 +1,23 @@
+package router
+
+import (
+	"net/http"
+
+	"github.com/gin-gonic/gin"
+	"github.com/go-admin-team/go-admin-core/tools/transfer"
+	"github.com/prometheus/client_golang/prometheus/promhttp"
+)
+
+func init() { // queue the monitoring routes on the list that is mounted without the auth middleware
+	routerNoCheckRole = append(routerNoCheckRole, registerMonitorRouter)
+}
+
+// registerMonitorRouter mounts GET /metrics (the promhttp handler) and GET /health on v1; init queues it on routerNoCheckRole, the list applied without the auth middleware.
+func registerMonitorRouter(v1 *gin.RouterGroup) {
+	v1.GET("/metrics", transfer.Handler(promhttp.Handler()))
+	// Health check: answers with status 200.
+	v1.GET("/health", func(c *gin.Context) {
+		c.Status(http.StatusOK)
+	})
+
+}

+ 40 - 0
app/alert/router/router.go

@@ -0,0 +1,40 @@
+package router
+
+import (
+	"github.com/gin-gonic/gin"
+	jwt "github.com/go-admin-team/go-admin-core/sdk/pkg/jwtauth"
+)
+
+// routerNoCheckRole holds the registrars for routes that are mounted without
+// the auth middleware.
+var routerNoCheckRole = make([]func(*gin.RouterGroup), 0)
+
+// routerCheckRole holds the registrars for routes that are handed the JWT
+// auth middleware when they are mounted.
+var routerCheckRole = make([]func(v1 *gin.RouterGroup, authMiddleware *jwt.GinJWTMiddleware), 0)
+
+// InitExamplesRouter applies every queued route registrar to r: first the ones
+// that need no authentication, then the ones that receive authMiddleware. It
+// returns r so that calls can be chained.
+func InitExamplesRouter(r *gin.Engine, authMiddleware *jwt.GinJWTMiddleware) *gin.Engine {
+	examplesNoCheckRoleRouter(r)               // routes without authentication
+	examplesCheckRoleRouter(r, authMiddleware) // routes that take the auth middleware
+	return r
+}
+
+// examplesNoCheckRoleRouter runs every registrar in routerNoCheckRole against a
+// route group rooted at "/". Change the group prefix here to version the API.
+func examplesNoCheckRoleRouter(r *gin.Engine) {
+	root := r.Group("/")
+	for _, register := range routerNoCheckRole {
+		register(root)
+	}
+}
+
+// examplesCheckRoleRouter runs every registrar in routerCheckRole against a
+// route group rooted at "/", handing each one the JWT middleware. Change the
+// group prefix here to version the API.
+func examplesCheckRoleRouter(r *gin.Engine, authMiddleware *jwt.GinJWTMiddleware) {
+	root := r.Group("/")
+	for _, register := range routerCheckRole {
+		register(root, authMiddleware)
+	}
+}

+ 1 - 0
app/observe/models/otel_events.go

@@ -55,6 +55,7 @@ type JudgeResult struct {
 	RowsResult  []map[string]interface{} `json:"rows_result"`
 	IsException bool                     `json:"is_exception"`
 	AlertStatus int                      `json:"alert_status"`
+	RowGaugeResult float64 		     `json:"row_gauge_result"`
 }
 
 /*

+ 80 - 2
cmd/alert/server.go

@@ -1,18 +1,28 @@
 package alert
 
 import (
+	"context"
 	"fmt"
 	"log"
+	"net/http"
 	"os"
 	"os/signal"
 	"time"
 
+	"github.com/gin-contrib/pprof"
+	"github.com/gin-gonic/gin"
 	"github.com/go-admin-team/go-admin-core/config/source/file"
+	"github.com/go-admin-team/go-admin-core/sdk"
+	"github.com/go-admin-team/go-admin-core/sdk/api"
 	"github.com/go-admin-team/go-admin-core/sdk/config"
 	"github.com/go-admin-team/go-admin-core/sdk/pkg"
 	"github.com/spf13/cobra"
+	"go.opentelemetry.io/contrib/instrumentation/github.com/gin-gonic/gin/otelgin"
 
+	"go-admin/app/alert/router"
 	"go-admin/common/database"
+	common "go-admin/common/middleware"
+	mhandler "go-admin/common/middleware/handler"
 	extConfig "go-admin/config"
 	"go-admin/handler"
 )
@@ -34,8 +44,11 @@ var (
 	}
 )
 
+var AppRouters = make([]func(), 0) // AppRouters lists the route-setup callbacks that run() invokes, in order, after initRouter
+
 func init() {
 	StartCmd.PersistentFlags().StringVarP(&configYml, "config", "c", "config/settings.yml", "Start server with provided configuration file")
+	AppRouters = append(AppRouters, router.InitRouter) // queue the alert routes so run() mounts them
 }
 
 func setup() {
@@ -57,6 +70,31 @@ func run() error {
 		// gin.SetMode(gin.ReleaseMode)
 
 	}
+	initRouter()
+	//暴露metric数据接口
+	for _, f := range AppRouters {
+		f()
+	}
+
+	srv := &http.Server{
+		Addr:    fmt.Sprintf("%s:%d", config.ApplicationConfig.Host, config.ApplicationConfig.Port),
+		Handler: sdk.Runtime.GetEngine(),
+	}
+	go func() {
+		// 服务连接
+		if config.SslConfig.Enable {
+			if err := srv.ListenAndServeTLS(config.SslConfig.Pem, config.SslConfig.KeyStr); err != nil && err != http.ErrServerClosed {
+				log.Fatal("listen: ", err)
+			}
+		} else {
+			if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
+				log.Fatal("listen: ", err)
+			}
+		}
+	}()
+	fmt.Println(pkg.Green("Server run at:"))
+	fmt.Printf("-  Local:   http://localhost:%d/ \r\n", config.ApplicationConfig.Port)
+	fmt.Printf("-  Network: http://%s:%d/ \r\n", pkg.GetLocaHonst(), config.ApplicationConfig.Port)
 
 	//TODO:10s一次的检测,设定可配置
 	// productor
@@ -82,9 +120,49 @@ func run() error {
 	signal.Notify(quit, os.Interrupt)
 	<-quit
 
-	// ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
-	// defer cancel()
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	defer cancel()
+	fmt.Printf("%s Shutdown Server ... \r\n", pkg.GetCurrentTimeStr())
+
 	fmt.Printf("%s Shutdown alert server ... \r\n", pkg.GetCurrentTimeStr())
 	log.Println("Alert server exiting")
+	if err := srv.Shutdown(ctx); err != nil {
+		log.Fatal("Server Shutdown:", err)
+	}
+	log.Println("Server exiting")
 	return nil
 }
+
+func initRouter() {
+	var r *gin.Engine
+	h := sdk.Runtime.GetEngine()
+	if h == nil {
+		h = gin.New()
+		sdk.Runtime.SetEngine(h)
+	}
+	switch h := h.(type) {
+	case *gin.Engine:
+		r = h
+	default:
+		log.Fatal("not support other engine")
+		os.Exit(-1)
+	}
+
+	if config.ApplicationConfig.Mode == "dev" {
+		pprof.Register(r)
+	}
+
+	if config.SslConfig.Enable {
+		r.Use(mhandler.TlsHandler())
+	}
+
+	r.Use(otelgin.Middleware("observe-server"))
+
+	// r.Use(middleware.Metrics())
+	r.Use(common.Sentinel()).
+		Use(common.RequestId(pkg.TrafficKey)).
+		Use(api.SetRequestLogger)
+
+	common.InitMiddleware(r)
+
+}

+ 70 - 6
handler/events.go

@@ -5,18 +5,23 @@ import (
 	"fmt"
 	amodels "go-admin/app/admin/models"
 	omodels "go-admin/app/observe/models"
+	"log"
+	"sync"
 	"time"
 
+	"github.com/prometheus/client_golang/prometheus"
+
 	"github.com/go-admin-team/go-admin-core/logger"
 	"gorm.io/gorm"
 )
 
 type EventHandler struct {
-	RP   *omodels.OtRulesPolicy  `json:"result_policy"`
-	AC   *omodels.AlertCondition `json:"alert_condition"`
-	JR   *omodels.JudgeResult    `json:"judge_result"`
-	Emo  *omodels.Events
-	Errs map[string]error `json:"_"`
+	RP      *omodels.OtRulesPolicy  `json:"result_policy"` // rule policy being evaluated
+	AC      *omodels.AlertCondition `json:"alert_condition"`
+	JR      *omodels.JudgeResult    `json:"judge_result"` // outcome of the current evaluation
+	Emo     *omodels.Events
+	promMap *sync.Map // gauge vectors keyed by RuleMonitorAlias; handed in through InitEventHandler
+	Errs    map[string]error `json:"_"` // errors collected along the handler chain. NOTE(review): the tag "_" names the JSON key "_"; "-" is what omits a field — confirm intent
 }
 
 /*
@@ -24,7 +29,7 @@ type EventHandler struct {
 	rows: list值,出现代表有异常,立即写入异常事件监控,但不一定告警,需要结合频率,周期进行判断
 */
 
-func InitEventHandler() *EventHandler {
+func InitEventHandler(pMap *sync.Map) *EventHandler {
 	e := new(EventHandler)
 	e.RP = new(omodels.OtRulesPolicy)
 	e.AC = new(omodels.AlertCondition)
@@ -32,6 +37,7 @@ func InitEventHandler() *EventHandler {
 	e.JR.AlertStatus = INACTIVE
 	//eg:@@UNSET_apdex::health:5m_{"condition":"<","point":0.7,"point_type":"float","tigger_hz":3,"interval":5}
 	e.Errs = make(map[string]error)
+	e.promMap = pMap
 	return e
 }
 
@@ -44,8 +50,14 @@ func (e *EventHandler) JudgeRow() *EventHandler {
 			e.JR.IsException = true
 			e.JR.AlertStatus = PENDING
 		}
+		/*
+		  不处理rows类型
+		  取消该类型的解析,在prometheus中进行数量判断及相关收敛规则
+		*/
 	case "row":
 		e.JR.CompareV = e.JR.RowResult
+	default:
+		e.JR.CompareV = e.JR.RowResult
 	}
 	return e
 }
@@ -182,6 +194,58 @@ func (e *EventHandler) SetUID() *EventHandler {
 	return e
 }
 
+// RegisterPrometheusGauge makes sure a gauge vector named after
+// e.RP.RuleMonitorAlias (labels app_alias, kind, uid) is registered with the
+// default Prometheus registry and cached in e.promMap.
+//
+// The method is called on every evaluation cycle, so an already cached gauge
+// is the common case and returns immediately. If the registry reports that an
+// identical collector already exists, that existing collector is cached and
+// reused instead of being treated as a failure. Any other registration error
+// is fatal, as before.
+func (e *EventHandler) RegisterPrometheusGauge() *EventHandler {
+	name := e.RP.RuleMonitorAlias
+	if _, ok := e.promMap.Load(name); ok {
+		return e
+	}
+
+	gauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{
+		Name: name,
+		Help: e.RP.RuleName,
+	}, []string{"app_alias", "kind", "uid"})
+	if err := prometheus.Register(gauge); err != nil {
+		// Match on the error type rather than on its message text.
+		are, ok := err.(prometheus.AlreadyRegisteredError)
+		if !ok {
+			log.Fatalf("Failed to register gauge: %v", err)
+		}
+		existing, ok := are.ExistingCollector.(*prometheus.GaugeVec)
+		if !ok {
+			log.Fatalf("Failed to register gauge: %v", err)
+		}
+		logger.Debugf("gauge %s already registered, reusing existing collector", name)
+		gauge = existing
+	}
+	e.promMap.Store(name, gauge)
+	return e
+}
+
+// SetPromKV publishes the latest query result: it sets the gauge named after
+// e.RP.RuleMonitorAlias, for the label values (AppAlias, RuleKind, UID), to
+// e.JR.RowGaugeResult.
+func (e *EventHandler) SetPromKV() *EventHandler {
+	e.GetRegisteredGauge(e.RP.RuleMonitorAlias).WithLabelValues(
+		e.RP.AppAlias, e.RP.RuleKind, e.JR.UID,
+	).Set(e.JR.RowGaugeResult)
+	// The format string needs a verb, otherwise the value is rendered as a
+	// "%!(EXTRA ...)" suffix.
+	logger.Debugf("e.JR.RowGaugeResult: %v", e.JR.RowGaugeResult)
+	return e
+}
+
+// CheckCollectorExists gathers the metric families of reg and prints each one
+// to stdout. It returns the error reported by Gather, if any; families that
+// were gathered despite the error are still printed.
+//
+// NOTE(review): name is not used yet, so no lookup by name happens here.
+func CheckCollectorExists(reg *prometheus.Registry, name string) error {
+	families, err := reg.Gather()
+	if err != nil {
+		logger.Errorf("checkCollector, error: %s", err.Error())
+	}
+	for _, mf := range families {
+		fmt.Println("metricFamily: ", mf)
+	}
+	// Hand the Gather failure to the caller instead of swallowing it.
+	return err
+}
+
+// GetRegisteredGauge returns the gauge vector cached under gaugeName, creating
+// and registering it on first use.
+//
+// A cache miss first goes through RegisterPrometheusGauge. If the cache still
+// has no entry for gaugeName afterwards (the collector is registered but not
+// cached, or gaugeName differs from e.RP.RuleMonitorAlias), the gauge is
+// resolved directly against the default registry. The lookup never recurses,
+// so a miss cannot grow the stack without bound. A registration error other
+// than "already registered" is fatal.
+func (e *EventHandler) GetRegisteredGauge(gaugeName string) *prometheus.GaugeVec {
+	logger.Debugf("guageName: %s", gaugeName)
+	if cached, ok := e.promMap.Load(gaugeName); ok {
+		logger.Debug("has guage name")
+		return cached.(*prometheus.GaugeVec)
+	}
+
+	e.RegisterPrometheusGauge()
+	logger.Debug("recreate prometh guagename")
+	if cached, ok := e.promMap.Load(gaugeName); ok {
+		return cached.(*prometheus.GaugeVec)
+	}
+
+	// Still not cached: register under gaugeName, or pick up the collector
+	// that the registry already holds for it.
+	gauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{
+		Name: gaugeName,
+		Help: e.RP.RuleName,
+	}, []string{"app_alias", "kind", "uid"})
+	if err := prometheus.Register(gauge); err != nil {
+		are, ok := err.(prometheus.AlreadyRegisteredError)
+		if !ok {
+			log.Fatalf("Failed to register gauge: %v", err)
+		}
+		existing, ok := are.ExistingCollector.(*prometheus.GaugeVec)
+		if !ok {
+			log.Fatalf("Failed to register gauge: %v", err)
+		}
+		gauge = existing
+	}
+	// Another goroutine may have cached the gauge in the meantime; keep
+	// whichever entry landed first.
+	actual, _ := e.promMap.LoadOrStore(gaugeName, gauge)
+	return actual.(*prometheus.GaugeVec)
+}
+
 func (e *EventHandler) CreateEventRecord(chdb *gorm.DB) *EventHandler {
 	if e.JR.IsException {
 		row := "UNSET"

+ 43 - 9
handler/handler.go

@@ -2,8 +2,10 @@ package handler
 
 import (
 	"encoding/json"
+	"fmt"
 	"log"
 	"strconv"
+	"sync"
 	"time"
 
 	"github.com/go-admin-team/go-admin-core/logger"
@@ -14,6 +16,7 @@ import (
 	amodels "go-admin/app/admin/models"
 	omodels "go-admin/app/observe/models"
 	"go-admin/common/olap"
+	extConfig "go-admin/config"
 	"go-admin/utils"
 
 	"gorm.io/driver/mysql"
@@ -115,6 +118,11 @@ func GetCHInstance() *gorm.DB {
 	return chInstance
 }
 
+// GetPrometheusInstance returns the Prometheus address from the extended
+// configuration and echoes it to stdout.
+func GetPrometheusInstance() string {
+	addr := extConfig.ExtConfig.Prometheus.Address
+	fmt.Println(addr)
+	return addr
+}
+
 func GetApps() ([]amodels.OtApps, error) {
 	appList := make([]amodels.OtApps, 0)
 	if err := GetDBInstance().Find(&appList).Error; err != nil {
@@ -142,6 +150,8 @@ func SetAlertManager() {
 	// }
 	am.DB = GetDBInstance()
 	am.CH = GetCHInstance()
+	am.PromAddr = GetPrometheusInstance()
+	am.PromMap = new(sync.Map)
 	// am.Redis = cacheAdapter
 	p, _ := ants.NewPoolWithFunc(am.Runtime, func(i interface{}) {
 		am.consumerHandler(i)
@@ -170,10 +180,12 @@ type AlertManager struct {
 	P         *ants.PoolWithFunc
 	DB        *gorm.DB
 	CH        *gorm.DB
+	PromAddr  string
 	Redis     storage.AdapterCache
 	Interval  int //TODO: 设置配置项
 	Runtime   int //Goroutine 数量
 	MaxRetry  int //重试次数
+	PromMap   *sync.Map
 }
 
 func (a *AlertManager) PolicyProducter() {
@@ -229,7 +241,7 @@ func (a *AlertManager) PolicyConsumer() {
 func (a *AlertManager) consumerHandler(i interface{}) {
 	var err error
 	c := 0
-	eHandler := InitEventHandler()
+	eHandler := InitEventHandler(a.PromMap)
 	rp, ok := i.(omodels.OtRulesPolicy)
 	if ok {
 		eHandler.RP = &rp
@@ -248,6 +260,7 @@ LOOP:
 	}
 	logger.Debug("alert condition: ", eHandler.RP.Policy, eHandler.AC)
 	logger.Debug("ch expression: ", eHandler.RP.RuleExpression)
+	eHandler.RP.RuleValueType = "prometheus"
 	if err = a.chQueryHandler(eHandler); err != nil {
 		c += 1
 		if c < a.MaxRetry {
@@ -255,14 +268,26 @@ LOOP:
 		}
 		goto DONE
 	}
+	// if len(eHandler.
+	// 	JudgeRow().          //判断指标还是事件
+	// 	SetUID().            //
+	// 	PointCompare().      //判断指标与阈值比较
+	// 	JudgeTriggerHz().    //判断触发频率
+	// 	JudgeInterval(a.CH). //判断触发周期
+	// 	CreateEventRecord(a.CH). //记录事件
+	// 	CreateAlert(a.DB).       //判断本次检测是否发出告警
+	// 	Errs) > 0 {
+	// 	c += 1
+	// 	if c < a.MaxRetry {
+	// 		goto LOOP
+	// 	}
+	// 	goto DONE
+	// }
 	if len(eHandler.
-		JudgeRow().              //判断指标还是事件
-		SetUID().                //
-		PointCompare().          //判断指标与阈值比较
-		JudgeTriggerHz().        //判断触发频率
-		JudgeInterval(a.CH).     //判断出发周期
-		CreateEventRecord(a.CH). //记录事件
-		CreateAlert(a.DB).       //判断本次检测是否发出告警
+		JudgeRow().                //判断指标还是事件
+		SetUID().                  //
+		RegisterPrometheusGauge(). //注册指标metric guage collector
+		SetPromKV().               //暴露指标 TODO:
 		Errs) > 0 {
 		c += 1
 		if c < a.MaxRetry {
@@ -326,7 +351,16 @@ func (a *AlertManager) chQueryHandler(e *EventHandler) error {
 		e.JR.RowsResult = rlist
 		logger.Debug("rows compare result: ", len(e.JR.RowsResult))
 	default:
-		return nil
+		var result float64
+		if err := a.CH.Table(e.RP.RuleTable).Raw(
+			e.RP.RuleExpression,
+			e.RP.RuleInterval,
+			e.RP.AppAlias).Row().Scan(&result); err != nil {
+			logger.Error("rows query err: ", err.Error())
+			return err
+		}
+		e.JR.RowGaugeResult = result
+		logger.Debug("row result: ", e.JR.RowGaugeResult)
 	}
 	return nil
 }