package handler import ( "encoding/json" "fmt" amodels "go-admin/app/admin/models" omodels "go-admin/app/observe/models" "log" "sync" "time" "github.com/prometheus/client_golang/prometheus" "github.com/go-admin-team/go-admin-core/logger" "gorm.io/gorm" ) type EventHandler struct { RP *omodels.OtRulesPolicy `json:"result_policy"` AC *omodels.AlertCondition `json:"alert_condition"` JR *omodels.JudgeResult `json:"judge_result"` Emo *omodels.Events promMap *sync.Map Errs map[string]error `json:"_"` } /* row: 单个数值,出现不代表异常,需要与point比较,复合则立即写入异常事件,单不一定告警,需要结合频率、周期进行判断 rows: list值,出现代表有异常,立即写入异常事件监控,但不一定告警,需要结合频率,周期进行判断 */ func InitEventHandler(pMap *sync.Map) *EventHandler { e := new(EventHandler) e.RP = new(omodels.OtRulesPolicy) e.AC = new(omodels.AlertCondition) e.JR = new(omodels.JudgeResult) e.JR.AlertStatus = INACTIVE //eg:@@UNSET_apdex::health:5m_{"condition":"<","point":0.7,"point_type":"float","tigger_hz":3,"interval":5} e.Errs = make(map[string]error) e.promMap = pMap return e } func (e *EventHandler) JudgeRow() *EventHandler { switch e.RP.RuleValueType { case "rows": e.JR.CompareV = len(e.JR.RowsResult) //rows 类型出现则表明符合异常 if len(e.JR.RowsResult) > 0 { e.JR.IsException = true e.JR.AlertStatus = PENDING } /* 不处理rows类型 取消该类型的解析,在prometheus中进行数量判断及相关收敛规则 */ case "row": e.JR.CompareV = e.JR.RowResult default: e.JR.CompareV = e.JR.RowResult } return e } func (e *EventHandler) PointCompare() *EventHandler { if e.JR.IsException { return e } logger.Debugf("%v %s %v", e.JR.CompareV, e.AC.Condition, e.AC.Point) switch e.AC.Condition { case ">": e.JR.IsException = e.g(e.JR.CompareV, e.AC.Point, e.AC.PointType) case ">=": e.JR.IsException = e.ge(e.JR.CompareV, e.AC.Point, e.AC.PointType) case "<": e.JR.IsException = e.l(e.JR.CompareV, e.AC.Point, e.AC.PointType) case "<=": e.JR.IsException = e.le(e.JR.CompareV, e.AC.Point, e.AC.PointType) case "=", "==": e.JR.IsException = e.e(e.JR.CompareV, e.AC.Point, e.AC.PointType) case "!=", "<>", "><": e.JR.IsException = e.ne(e.JR.CompareV, e.AC.Point, e.AC.PointType) } return e } func (e *EventHandler) JudgeTriggerHz() *EventHandler { //仅做第一次标注,此处并非直接控制告警 if e.JR.IsException { if e.AC.TiggerHz == 0 { //@@对应事件:x分钟内出现1次,立即告警 e.JR.AlertStatus = FIRING //立即告警, 记录监控事件,event } else { e.JR.AlertStatus = PENDING //出现异常,记录监控事件 event,clickhouse TODO: } if e.RP.RuleValueType == "rows" && len(e.JR.RowsResult) >= e.AC.TiggerHz { e.JR.AlertStatus = FIRING } } return e } /* SELECT AppendTime FROM otel_events WHERE AlertStatus = 4 AND UID = 'UNSET_2' ORDER BY AppendTime DESC LIMIT 1 */ func (e *EventHandler) JudgeInterval(chdb *gorm.DB) *EventHandler { if e.JR.AlertStatus == FIRING { return e } switch e.RP.RuleValueType { case "row": if !e.JR.IsException { return e } //周期内数值异常检测结果为pending的总次数 (AlertStatus = 2) ft := e.RP.CreateTime.Add(-time.Duration(e.AC.Interval) * time.Minute) var err error emo := omodels.Events{} var timer time.Time var count int64 err = chdb.Table(emo.TableName()). Raw(`SELECT COUNT(*) FROM otel_events WHERE AlertStatus = ? AND UID = ?`, FIRING, e.JR.UID). Row().Scan(&count) if count == 0 { ft = timer } else { err = chdb.Table(emo.TableName()). Raw(`SELECT AppendTime FROM otel_events WHERE AlertStatus = ? AND UID = ? ORDER BY AppendTime DESC LIMIT 1`, FIRING, e.JR.UID). Row().Scan(&timer) if err != nil { logger.Error("alert status 4 row query err: ", err) e.Errs["alert status 4 row"] = err return e } } if timer.After(ft) || timer.Equal(ft) { ft = timer } rows, err := chdb.Table(emo.TableName()). Raw(`SELECT * FROM otel_events WHERE AlertStatus = ? AND UID = ? AND AppendTime > ? ORDER BY AppendTime DESC`, PENDING, e.JR.UID, ft). Rows() if err != nil { logger.Error("rows query err: ", err) e.Errs["Get events rows err"] = err } elist := make([]omodels.Events, 0) defer rows.Close() er := omodels.Events{} for rows.Next() { if err := chdb.ScanRows(rows, &er); err != nil { logger.Errorf("range alert rows scan err: %s", err) e.Errs["range alert rows scan err"] = err } elist = append(elist, er) } if len(elist) >= e.AC.TiggerHz { e.JR.IsException = true e.JR.AlertStatus = FIRING } else { e.JR.IsException = true e.JR.AlertStatus = PENDING return e } case "rows": //rows 立即告警 // if e.JR.IsException { // e.JR.AlertStatus = FIRING // } return e } return e } func (e *EventHandler) SetUID() *EventHandler { // logger.Debug("UUUUUUid: ", fmt.Sprintf("%s_%s_%s", e.RP.AppAlias, e.RP.RuleMonitorAlias, e.RP.Policy)) e.JR.UID = fmt.Sprintf("%s_%d", e.RP.AppAlias, e.RP.Id) return e } func (e *EventHandler) RegisterPrometheusGauge() *EventHandler { gauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{ Name: e.RP.RuleMonitorAlias, Help: e.RP.RuleName, }, []string{"app_alias", "kind", "uid"}) if err := prometheus.Register(gauge); err != nil { if err.Error() == "duplicate metrics collector registration attempted" { logger.Errorf("Failed to register gauge: %v", err) return e } else { log.Fatalf("Failed to register gauge: %v", err) } } e.promMap.Store(e.RP.RuleMonitorAlias, gauge) return e } func (e *EventHandler) SetPromKV() *EventHandler { e.GetRegisteredGauge(e.RP.RuleMonitorAlias).WithLabelValues( e.RP.AppAlias, e.RP.RuleKind, e.JR.UID, ).Set(float64(e.JR.RowGaugeResult)) logger.Debugf("e.JR.RowGaugeResult", e.JR.RowGaugeResult) return e } // checkCollectorExists 检查注册表中是否存在具有给定名称的collector func CheckCollectorExists(reg *prometheus.Registry, name string) error { merticF, err := reg.Gather() if err != nil { logger.Errorf("checkCollector, error: %s", err.Error()) } for _, m := range merticF { fmt.Println("metricFamily: ", m) } return nil } func (e *EventHandler) GetRegisteredGauge(gaugeName string) *prometheus.GaugeVec { logger.Debugf("guageName: %s", gaugeName) logger.Debug(e.promMap) promk, ok := e.promMap.Load(gaugeName) if ok { logger.Debug("has guage name") return promk.(*prometheus.GaugeVec) } else { e.RegisterPrometheusGauge() logger.Debug("recreate prometh guagename") return e.GetRegisteredGauge(gaugeName) } } func (e *EventHandler) CreateEventRecord(chdb *gorm.DB) *EventHandler { if e.JR.IsException { row := "UNSET" rows := make([]map[string]string, 0) if e.RP.RuleValueType == "row" { row = fmt.Sprintf("%v", e.JR.RowResult) } else { for _, d := range e.JR.RowsResult { tmp := make(map[string]string) for k, v := range d { tmp[k] = fmt.Sprintf("%v", v) } rows = append(rows, tmp) } } var exceptionName string if e.RP.RuleValueType == "rows" { if e.JR.AlertStatus == PENDING { //业务状态码异常数(近30分钟), x分钟内发生x次 exceptionName = fmt.Sprintf("%s, %d分钟内发生: %v次", e.RP.RuleName, e.AC.Interval, e.JR.CompareV) } else if e.JR.AlertStatus == FIRING { exceptionName = fmt.Sprintf("%s, %d分钟内发生%s%v次, 当前值为: %v", e.RP.RuleName, e.AC.Interval, e.AC.Condition, e.JR.CompareV, e.JR.CompareV) } } else { exceptionName = fmt.Sprintf("%s%s %v, 当前值为: %v", e.RP.RuleName, e.AC.Condition, e.AC.Point, e.JR.CompareV) } emo := omodels.Events{ Timestamp: time.Now(), AppendTime: e.RP.CreateTime, UID: e.JR.UID, AppID: int64(e.RP.AppId), RuleID: int64(e.RP.RuleId), AppName: e.RP.AppName, AppAlias: e.RP.AppAlias, ExceptionName: exceptionName, RuleInfo: map[string]string{ "policy": e.RP.Policy, "rule_kind": e.RP.RuleKind, "rule_group": e.RP.RuleGroup, "rule_data_source": e.RP.RuleDataSource, "rule_table": e.RP.RuleTable, "rule_value_type": e.RP.RuleValueType, }, CompareV: fmt.Sprintf("%v", e.JR.CompareV), RowResult: row, RowsResult: rows, AlertStatus: int64(e.JR.AlertStatus), } // logger.Debug("emo: ", emo) if err := chdb.Table(emo.TableName()).Create(emo).Error; err != nil { e.Errs["CreateEventRecord_func(Create)"] = err logger.Error("create emos err:", err.Error()) } if e.JR.AlertStatus == FIRING { e.Emo = &emo } } return e } func (e *EventHandler) CreateAlert(db *gorm.DB) *EventHandler { if e.JR.AlertStatus == FIRING { //TODO: 发送至告警队列 fe := amodels.OtFireEvents{ AppId: e.Emo.AppID, RuleId: e.Emo.RuleID, ExceptionName: e.Emo.ExceptionName, RecordTime: e.Emo.Timestamp, AppendTime: e.Emo.AppendTime, Uid: e.Emo.UID, AppName: e.Emo.AppName, AppAlias: e.Emo.AppAlias, RowResult: fmt.Sprintf("%v", e.Emo.RowResult), AlertStatus: e.Emo.AlertStatus, IsKnow: 0, IsResolve: 0, IsIgnore: 0, } var err error rif, err := json.Marshal(e.Emo.RuleInfo) if err != nil { logger.Error("Marshal rule_info err: ", err) e.Errs["Marshal rule_info err"] = err } fe.RuleInfo = string(rif) rowst, err := json.Marshal(e.Emo.RowsResult) if err != nil { logger.Error("Marshal rows_result err: ", err) e.Errs["Marshal rows_result err"] = err } fe.RowsResult = string(rowst) if err = db.Table(fe.TableName()).Create(&fe).Error; err != nil { logger.Error("Create ot_fire_event err: ", err) e.Errs["Create ot_fire_event err"] = err } } return e } func (e *EventHandler) assertType(v, point interface{}, t string) {} // > func (e *EventHandler) g(v, point interface{}, t string) bool { switch t { case "float": return v.(float64) > point.(float64) case "int": return v.(int) > int(point.(float64)) } return false } // >= func (e *EventHandler) ge(v, point interface{}, t string) bool { switch t { case "float": return v.(float64) >= point.(float64) case "int": return v.(int) >= int(point.(float64)) } return false } // < func (e *EventHandler) l(v, point interface{}, t string) bool { switch t { case "float": return v.(float64) < point.(float64) case "int": return v.(int) < int(point.(float64)) } return false } // <= func (e *EventHandler) le(v, point interface{}, t string) bool { switch t { case "float": return v.(float64) <= point.(float64) case "int": return v.(int) <= int(point.(float64)) } return false } // == func (e *EventHandler) e(v, point interface{}, t string) bool { switch t { case "float": return v.(float64) == point.(float64) case "int": return v.(int) == int(point.(float64)) case "string": return v.(string) == point.(string) } return false } // != func (e *EventHandler) ne(v, point interface{}, t string) bool { switch t { case "float": return v.(float64) != point.(float64) case "int": return v.(int) != int(point.(float64)) case "string": return v.(string) != point.(string) } return false }