Browse Source

[REV] 异常告警检测短信发送未完成版本

pujielan 2 months ago
parent
commit
8c17ca8945

+ 1 - 1
app/admin/service/ot_alert_policy.go

@@ -148,8 +148,8 @@ func (e *OtAlertPolicy) ImportPolicy(c *dto.OtAlertPolicyImportReq, p *actions.D
 		ap := models.OtAlertPolicy{
 			AppId:  c.AppId,
 			RuleId: int64(v.Id),
-			Policy: v.Policy,
 			Power:  0,
+			// Policy: v.Policy,
 		}
 		list = append(list, ap)
 	}

+ 1 - 1
app/observe/models/otel_events.go

@@ -50,12 +50,12 @@ type AlertCondition struct {
 
 type JudgeResult struct {
 	UID         string                   `json:"uid"`
-	CompareV    interface{}              `json:"compare_value"`
 	RowResult   interface{}              `json:"row_result"`
 	RowsResult  []map[string]interface{} `json:"rows_result"`
 	IsException bool                     `json:"is_exception"`
 	AlertStatus int                      `json:"alert_status"`
 	RowGaugeResult float64 		     `json:"row_gauge_result"`
+	CompareV    interface{}              `json:"compare_value"`
 }
 
 /*

+ 1 - 1
app/observe/service/event.go

@@ -51,7 +51,7 @@ func (e *Event) ExceptionNums(c *dto.EventExceptionNumsReq, result *[]dto.EventE
 	db := e.Orm.Table(fmt.Sprintf("%s t", tbl)).Scopes(
 		cDto.Paginate(c.GetPageSize(), c.GetPageIndex()),
 	)
-	db.Select([]string{
+	db.Debug().Select([]string{
 		"app_id",
 		fmt.Sprintf("(SELECT app_name FROM %s WHERE app_id=t.app_id ORDER BY created_at DESC LIMIT 1) AS app_name", tbl),
 		"COUNT(1) AS exception_num",

+ 34 - 20
handler/events.go

@@ -16,17 +16,18 @@ import (
 )
 
 type EventHandler struct {
-	RP      *omodels.OtRulesPolicy  `json:"result_policy"`
-	AC      *omodels.AlertCondition `json:"alert_condition"`
-	JR      *omodels.JudgeResult    `json:"judge_result"`
-	Emo     *omodels.Events
-	promMap *sync.Map
-	Errs    map[string]error `json:"_"`
+	RP             *omodels.OtRulesPolicy  `json:"result_policy"`
+	AC             *omodels.AlertCondition `json:"alert_condition"`
+	JR             *omodels.JudgeResult    `json:"judge_result"`
+	Emo            *omodels.Events
+	promMap        *sync.Map
+	CheckStartTime time.Time        `json:"check_starttime"`
+	Errs           map[string]error `json:"_"`
 }
 
 /*
-	row: 单个数值,出现不代表异常,需要与point比较,合则立即写入异常事件,单不一定告警,需要结合频率、周期进行判断
-	rows: list值,出现代表有异常,立即写入异常事件监控,但不一定告警,需要结合频率,周期进行判断
+	row: 单个数值,出现不代表异常,需要与point比较,合则立即写入异常事件,单不一定告警,需要结合频率、周期进行判断
+	--{取消该条检测} rows: list值,出现代表有异常,立即写入异常事件监控,但不一定告警,需要结合频率,周期进行判断
 */
 
 func InitEventHandler(pMap *sync.Map) *EventHandler {
@@ -38,6 +39,7 @@ func InitEventHandler(pMap *sync.Map) *EventHandler {
 	//eg:@@UNSET_apdex::health:5m_{"condition":"<","point":0.7,"point_type":"float","tigger_hz":3,"interval":5}
 	e.Errs = make(map[string]error)
 	e.promMap = pMap
+	e.CheckStartTime = time.Now()
 	return e
 }
 
@@ -57,8 +59,9 @@ func (e *EventHandler) JudgeRow() *EventHandler {
 	case "row":
 		e.JR.CompareV = e.JR.RowResult
 	default:
-		e.JR.CompareV = e.JR.RowResult
+		e.JR.CompareV = e.JR.RowGaugeResult
 	}
+	logger.Debug("compareV is: ", e.JR.CompareV)
 	return e
 }
 
@@ -114,7 +117,8 @@ func (e *EventHandler) JudgeInterval(chdb *gorm.DB) *EventHandler {
 		return e
 	}
 	switch e.RP.RuleValueType {
-	case "row":
+	// case "row":
+	default:
 		if !e.JR.IsException {
 			return e
 		}
@@ -127,11 +131,18 @@ func (e *EventHandler) JudgeInterval(chdb *gorm.DB) *EventHandler {
 		err = chdb.Table(emo.TableName()).
 			Raw(`SELECT COUNT(*)
 				FROM otel_events 
-				WHERE AlertStatus = ? AND UID = ?`, FIRING, e.JR.UID).
+				WHERE AlertStatus = ? AND UID = ? AND Timestamp >= ?`, FIRING, e.JR.UID, ft).
 			Row().Scan(&count)
-		if count == 0 {
-			ft = timer
-		} else {
+		if err != nil {
+			logger.Error("select fire count err: ", err)
+			e.Errs["select fire count err"] = err
+			return e
+		}
+		// if count == 0 {
+		// 	// ft = timer
+		// 	timer = ft
+		// } else
+		if count != 0 {
 			err = chdb.Table(emo.TableName()).
 				Raw(`SELECT AppendTime
 				FROM otel_events
@@ -144,11 +155,11 @@ func (e *EventHandler) JudgeInterval(chdb *gorm.DB) *EventHandler {
 				e.Errs["alert status 4 row"] = err
 				return e
 			}
+			if timer.After(ft) || timer.Equal(ft) {
+				ft = timer
+			}
 		}
 
-		if timer.After(ft) || timer.Equal(ft) {
-			ft = timer
-		}
 		rows, err := chdb.Table(emo.TableName()).
 			Raw(`SELECT *
 				FROM otel_events
@@ -218,7 +229,7 @@ func (e *EventHandler) SetPromKV() *EventHandler {
 	e.GetRegisteredGauge(e.RP.RuleMonitorAlias).WithLabelValues(
 		e.RP.AppAlias, e.RP.RuleKind, e.JR.UID,
 	).Set(float64(e.JR.RowGaugeResult))
-	logger.Debug("e.JR.RowGaugeResult", e.JR.RowGaugeResult)
+	logger.Debug("e.JR.RowGaugeResult: ", e.JR.RowGaugeResult)
 	return e
 }
 
@@ -252,9 +263,10 @@ func (e *EventHandler) CreateEventRecord(chdb *gorm.DB) *EventHandler {
 	if e.JR.IsException {
 		row := "UNSET"
 		rows := make([]map[string]string, 0)
-		if e.RP.RuleValueType == "row" {
+		switch e.RP.RuleValueType {
+		case "row":
 			row = fmt.Sprintf("%v", e.JR.RowResult)
-		} else {
+		case "rows":
 			for _, d := range e.JR.RowsResult {
 				tmp := make(map[string]string)
 				for k, v := range d {
@@ -262,6 +274,8 @@ func (e *EventHandler) CreateEventRecord(chdb *gorm.DB) *EventHandler {
 				}
 				rows = append(rows, tmp)
 			}
+		default:
+			row = fmt.Sprintf("%v", e.JR.RowGaugeResult)
 		}
 		var exceptionName string
 		if e.RP.RuleValueType == "rows" {

+ 24 - 14
handler/handler.go

@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"log"
 	"strconv"
+	"strings"
 	"sync"
 	"time"
 
@@ -288,6 +289,10 @@ LOOP:
 		SetUID().                  //
 		RegisterPrometheusGauge(). //注册指标metric guage collector
 		SetPromKV().               //暴露指标 TODO:
+		PointCompare().            //判断指标与阈值比较
+		JudgeInterval(a.CH).       //判断触发周期
+		CreateEventRecord(a.CH).   //记录事件
+		CreateAlert(a.DB).         //判断本次检测是否发出告警
 		Errs) > 0 {
 		c += 1
 		if c < a.MaxRetry {
@@ -306,23 +311,28 @@ DONE:
 		return
 	}
 	if !eHandler.JR.IsException {
-		logger.Info("分析结果: 无异常事件", eHandler.JR.AlertStatus)
+		logger.Info("分析结果: 无异常事件,状态为", eHandler.JR.AlertStatus)
 	} else {
-		logger.Warn("分析结果:发现异常事件", eHandler.JR.AlertStatus)
+		logger.Warn("分析结果:发现异常事件,状态为", eHandler.JR.AlertStatus)
 	}
 	logger.Info("Event 分析结束")
 	return
 }
 
-func (a *AlertManager) chQueryHandler(e *EventHandler) error {
+func (a *AlertManager) replaceExpressionPlaceholders(e *EventHandler) string {
+	e.RP.RuleExpression = strings.Replace(e.RP.RuleExpression, "{{app_alias}}", fmt.Sprintf("'%s'", e.RP.AppAlias), -1)
+	e.RP.RuleExpression = strings.Replace(e.RP.RuleExpression, "{{check_interval}}", fmt.Sprintf("%d", e.RP.RuleInterval), -1)
+	logger.Debug("rule expression: ", e.RP.RuleExpression)
+	return e.RP.RuleExpression
+}
 
+func (a *AlertManager) chQueryHandler(e *EventHandler) error {
 	switch e.RP.RuleValueType {
 	case "row":
 		var result interface{}
-		if err := a.CH.Table(e.RP.RuleTable).Raw(
-			e.RP.RuleExpression,
-			e.RP.RuleInterval,
-			e.RP.AppAlias).Row().Scan(&result); err != nil {
+		if err := a.CH.Table(e.RP.RuleTable).
+			Raw(a.replaceExpressionPlaceholders(e)).
+			Row().Scan(&result); err != nil {
 			logger.Error("rows query err: ", err.Error())
 			return err
 		}
@@ -351,15 +361,15 @@ func (a *AlertManager) chQueryHandler(e *EventHandler) error {
 		e.JR.RowsResult = rlist
 		logger.Debug("rows compare result: ", len(e.JR.RowsResult))
 	default:
-		var result float64
-		if err := a.CH.Table(e.RP.RuleTable).Raw(
-			e.RP.RuleExpression,
-			e.RP.RuleInterval,
-			e.RP.AppAlias).Row().Scan(&result); err != nil {
-			logger.Error("rows query err: ", err.Error())
+		var result *float64
+		// var result interface{}
+		if err := a.CH.Table(e.RP.RuleTable).
+			Raw(a.replaceExpressionPlaceholders(e)).
+			Row().Scan(&result); err != nil {
+			logger.Error("default rows query err: ", err.Error())
 			return err
 		}
-		e.JR.RowGaugeResult = result
+		e.JR.RowGaugeResult = *result
 		logger.Debug("row result: ", e.JR.RowGaugeResult)
 	}
 	return nil