فهرست منبع

Merge branch 'dev' of http://git.cestong.com.cn/cecf/observe-server into dev

路佳明 1 هفته پیش
والد
کامیت
fa80ae2b41
2فایلهای تغییر یافته به همراه88 افزوده شده و 15 حذف شده
  1. 10 4
      handler/events.go
  2. 78 11
      handler/handler.go

+ 10 - 4
handler/events.go

@@ -137,16 +137,22 @@ func (e *EventHandler) JudgeInterval(chdb *gorm.DB) *EventHandler {
 		if !e.JR.IsException {
 			return e
 		}
-		//周期内数值异常检测结果为pending的总次数 (AlertStatus = 2)
-		ft := e.RP.CreateTime.Add(-time.Duration(e.AC.Interval) * time.Minute)
 		var err error
+		//周期内数值异常检测结果为pending的总次数 (AlertStatus = 2)
+		location, err := time.LoadLocation("Asia/Shanghai")
+		if err != nil {
+			logger.Error("Error loading location: ", err)
+			e.Errs["Error loading location"] = err
+			return e
+		}
+		ft := e.RP.CreateTime.In(location).Add(-time.Duration(e.AC.Interval) * time.Minute)
 		emo := omodels.Events{}
 		var timer time.Time
 		var count int64
 		err = chdb.Table(emo.TableName()).
 			Raw(`SELECT COUNT(*)
 				FROM otel_events 
-				WHERE AlertStatus = ? AND UID = ? AND Timestamp >= ?`, FIRING, e.JR.UID, ft).
+				WHERE AlertStatus >= ? AND UID = ? AND Timestamp >= ?`, FIRING, e.JR.UID, ft).
 			Row().Scan(&count)
 		if err != nil {
 			logger.Error("select fire count err: ", err)
@@ -161,7 +167,7 @@ func (e *EventHandler) JudgeInterval(chdb *gorm.DB) *EventHandler {
 			err = chdb.Table(emo.TableName()).
 				Raw(`SELECT AppendTime
 				FROM otel_events
-				WHERE AlertStatus = ? AND UID = ?
+				WHERE AlertStatus >= ? AND UID = ?
 				ORDER BY AppendTime DESC
 				LIMIT 1`, FIRING, e.JR.UID).
 				Row().Scan(&timer)

+ 78 - 11
handler/handler.go

@@ -310,13 +310,35 @@ LOOP:
 	}
 	logger.Debug("alert condition: ", eHandler.RP.Policy, eHandler.AC)
 	logger.Debug("ch expression: ", eHandler.RP.RuleExpression)
-	eHandler.RP.RuleValueType = "prometheus"
-	if err = a.chQueryHandler(eHandler); err != nil {
-		c += 1
-		if c < a.MaxRetry {
-			goto LOOP
+	if eHandler.RP.RuleValueType == "row" {
+		// row类型转换为prometheus格式,存入prometheus
+		eHandler.RP.RuleValueType = "prometheus"
+	}
+	switch eHandler.RP.RuleDataSource {
+	case "clickhouse":
+		if err = a.chQueryHandler(eHandler); err != nil {
+			c += 1
+			if c < a.MaxRetry {
+				goto LOOP
+			}
+			goto DONE
+		}
+	case "mysql":
+		if err = a.mysqlQueryHandler(eHandler); err != nil {
+			c += 1
+			if c < a.MaxRetry {
+				goto LOOP
+			}
+			goto DONE
+		}
+	default:
+		if err = a.chQueryHandler(eHandler); err != nil {
+			c += 1
+			if c < a.MaxRetry {
+				goto LOOP
+			}
+			goto DONE
 		}
-		goto DONE
 	}
 	if len(eHandler.
 		JudgeRow().                //判断指标还是事件
@@ -374,11 +396,9 @@ func (a *AlertManager) chQueryHandler(e *EventHandler) error {
 		e.JR.RowResult = result
 		logger.Debug("row result: ", e.JR.RowResult)
 	case "rows":
-		rows, err := a.CH.Table(e.RP.RuleTable).Raw(
-			e.RP.RuleExpression,
-			// e.RP.RuleInterval,
-			e.AC.Interval,
-			e.RP.AppAlias).Rows()
+		rows, err := a.CH.Table(e.RP.RuleTable).
+			Raw(a.replaceExpressionPlaceholders(e)).
+			Rows()
 		if err != nil {
 			logger.Error("rows query err: ", err.Error())
 			return err
@@ -409,3 +429,50 @@ func (a *AlertManager) chQueryHandler(e *EventHandler) error {
 	}
 	return nil
 }
+
+func (a *AlertManager) mysqlQueryHandler(e *EventHandler) error {
+	switch e.RP.RuleValueType {
+	case "row":
+		var result interface{}
+		if err := a.DB.Table(e.RP.RuleTable).
+			Raw(a.replaceExpressionPlaceholders(e)).
+			Row().Scan(&result); err != nil {
+			logger.Error("mysql rows query err: ", err.Error())
+			return err
+		}
+		e.JR.RowResult = result
+		logger.Debug("mysql row result: ", e.JR.RowResult)
+	case "rows":
+		rows, err := a.DB.Table(e.RP.RuleTable).
+			Raw(a.replaceExpressionPlaceholders(e)).
+			Rows()
+		if err != nil {
+			logger.Error("mysql rows query err: ", err.Error())
+			return err
+		}
+		rlist := make([]map[string]interface{}, 0)
+		defer rows.Close()
+		for rows.Next() {
+			r := make(map[string]interface{})
+			if err := a.DB.ScanRows(rows, &r); err != nil {
+				logger.Errorf("mysql rows scan err: %s", err)
+				return err
+			}
+			rlist = append(rlist, r)
+		}
+		e.JR.RowsResult = rlist
+		logger.Debug("mysql rows compare result: ", len(e.JR.RowsResult))
+	default:
+		var result *float64
+		// var result interface{}
+		if err := a.DB.Table(e.RP.RuleTable).
+			Raw(a.replaceExpressionPlaceholders(e)).
+			Row().Scan(&result); err != nil {
+			logger.Error("default rows query err: ", err.Error())
+			return err
+		}
+		e.JR.RowGaugeResult = *result
+		logger.Debug("msyql row result: ", e.JR.RowGaugeResult)
+	}
+	return nil
+}