package handler

import (
	"encoding/json"
	"fmt"
	"log"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/go-admin-team/go-admin-core/logger"
	"github.com/go-admin-team/go-admin-core/sdk/config"
	"github.com/go-admin-team/go-admin-core/storage"
	"github.com/panjf2000/ants/v2"
	amodels "go-admin/app/admin/models"
	omodels "go-admin/app/observe/models"
	"go-admin/common/olap"
	extConfig "go-admin/config"
	"go-admin/utils"
	"gorm.io/driver/mysql"
	"gorm.io/gorm"
)

// Lazily initialised package-level singletons. None of them is guarded by a
// lock in this file.
var (
	dbInstance           *gorm.DB
	chInstance           *gorm.DB
	alertManagerInstance *AlertManager
)

// Alert state bit flags (1, 2, 4, 8) and the built-in SMS template.
const (
	INACTIVE = 1 << iota
	PENDING
	FIRING
	SENDSUCCESS
	// DEFAULTSMSTPL is the SMS template assigned by GetSmsConfigInstance.
	DEFAULTSMSTPL = ` 所属系统:{{app_name}} 事件名称:{{events_name}} 事件创建时间:{{date_time}} 事件描述:{{events}}`
)

// type EventsAction interface {
// 	Judge() *EventsAction
// 	PointCompare() *EventsAction
// 	JudgeTriggerHz() *EventsAction
// 	JudgeInterval(*gorm.DB) *EventsAction
// 	SetUID() *EventsAction
// 	CreateEventRecord(*gorm.DB) *EventsAction
// }

/*
## Rule structure
{
	"rule_name": "Business health score (last 5 minutes)"
	"rule_alias": "apdex::health:5m"
	"rule_group": "apdex"
	"project": ""
	"rule_body": models.OtMonitorRules
	"policy_body": xxxx // TODO:
	"create_time": time.Time
}

## alert_policy structure
@@ Float value type
{
	"condition": "> = < ",
	"point": 7.0,
	"tigger_hz": 3, // times, minimum is 1
	"interval": 5   // minutes; 0 means trigger immediately
}
@@ Keyword match type
{
	"condition": "=",
	"point": "an error keyword", // float, string or int
	"tigger_hz": 3, // times, minimum is 1
	"interval": 5   // minutes; 0 means trigger immediately
}
@@ Count-based check
{
	"condition": "=",
	"point": 2,     // float, string or int
	"tigger_hz": 3, // times, minimum is 1
	"interval": 5   // minutes; 0 means trigger immediately
}

## Alert structure
{
	"alert_name": "",      // name template defined in the policy
	"rule_alias": "",      // apdex::health:5m
	"project": "Project1", // owning app_alias
	"alert_msg": "",       // msg template defined in the policy
	"labels": {
		"env": "prod",
		"service": "service1"
	},
	"alert_body": "xxxxx",
	"is_done": false,
	"is_know": false
}
*/

// GetDBInstance returns the shared MySQL handle, opening it from
// config.DatabaseConfig.Source on first use. A failure to open is reported
// through logger.Fatal.
func GetDBInstance() *gorm.DB {
	if dbInstance == nil {
		db, err := gorm.Open(mysql.Open(config.DatabaseConfig.Source), &gorm.Config{})
		if err != nil {
			logger.Debug("datasource: ", config.DatabaseConfig.Source)
			logger.Fatal("create db instance err: ", err)
		}
		dbInstance = db
	}
	return dbInstance
}

// GetCHInstance returns the shared ClickHouse handle, fetching it from
// olap.GetClickhouseOrm on first use.
func GetCHInstance() *gorm.DB {
	if chInstance == nil {
		chInstance = olap.GetClickhouseOrm()
	}
	return chInstance
}

// GetPrometheusInstance returns the Prometheus address from the extended
// configuration.
func GetPrometheusInstance() string {
	// Use the project logger rather than printing straight to stdout.
	logger.Debug("prometheus address: ", extConfig.ExtConfig.Prometheus.Address)
	return extConfig.ExtConfig.Prometheus.Address
}

// GetSmsConfigInstance builds an SmsConfig from the extended configuration.
//
// When SmsConfig.SqlRecord is true the alias -> phone-number map is loaded
// from the application table through db; otherwise it is parsed from
// SmsConfig.AppsGroup, formatted as "alias:phone,phone;alias:phone". The
// config-file form is typically used for quick testing.
//
// It returns nil when the application list cannot be loaded.
func GetSmsConfigInstance(db *gorm.DB) *SmsConfig {
	sc := new(SmsConfig)
	sc.Appkey = extConfig.ExtConfig.SmsConfig.Appkey
	sc.Appsecret = extConfig.ExtConfig.SmsConfig.Appsecret
	// sc.SmsTpl = extConfig.ExtConfig.SmsConfig.SmsTpl
	// TODO: set default sms template; should be replaced by an external template
	sc.SmsTpl = DEFAULTSMSTPL
	sc.Url = extConfig.ExtConfig.SmsConfig.Url
	sc.SQLRecord = extConfig.ExtConfig.SmsConfig.SqlRecord

	// Decide between the contacts in the config file and the contacts kept in
	// application management.
	recordMap := make(map[string][]string)
	if sc.SQLRecord {
		apps := make([]amodels.OtApps, 0)
		if err := db.Find(&apps).Error; err != nil {
			logger.Error("sms, get app list err: ", err)
			return nil
		}
		for _, app := range apps {
			recordMap[app.Alias] = strings.Split(app.ContractPhone, ",")
		}
	} else {
		parts := strings.Split(extConfig.ExtConfig.SmsConfig.AppsGroup, ";")
		for _, part := range parts {
			if part == "" {
				continue
			}
			subParts := strings.SplitN(part, ":", 2)
			if len(subParts) != 2 {
				// Entries without an "alias:phones" shape are skipped.
				continue
			}
			id := subParts[0]
			phoneNumbers := subParts[1]
			recordMap[id] = strings.Split(phoneNumbers, ",")
		}
	}
	sc.AppsGroup = recordMap
	logger.Info("sms phone map: ", sc.AppsGroup)
	return sc
}

// GetApps loads every application record through the shared MySQL handle.
func GetApps() ([]amodels.OtApps, error) {
	appList := make([]amodels.OtApps, 0)
	if err := GetDBInstance().Find(&appList).Error; err != nil {
		logger.Error("get app list err: ", err)
		return nil, err
	}
	logger.Debug("apps: ", appList)
	return appList, nil
}

// parseIntSetting converts raw to an int. When raw is not a valid integer it
// logs a warning and returns fallback, so a malformed environment value does
// not silently become 0.
func parseIntSetting(name, raw string, fallback int) int {
	n, err := strconv.Atoi(raw)
	if err != nil {
		logger.Warn(fmt.Sprintf("invalid %s value %q, using default %d: %v", name, raw, fallback, err))
		return fallback
	}
	return n
}

// SetAlertManager builds the package-level AlertManager from the
// RULECHANLENGTH, INTERVAL, GORUNTIMES and MAXRETRY settings and creates its
// goroutine pool. A pool creation failure is reported through logger.Fatal.
func SetAlertManager() {
	ruleChanLength := parseIntSetting("RULECHANLENGTH", utils.GetDefaultEnv("RULECHANLENGTH", 200), 200)
	interval := parseIntSetting("INTERVAL", utils.GetDefaultEnv("INTERVAL", 30), 30)
	runtimes := parseIntSetting("GORUNTIMES", utils.GetDefaultEnv("GORUNTIMES", 200), 200)
	retry := parseIntSetting("MAXRETRY", utils.GetDefaultEnv("MAXRETRY", 0), 0)

	// make panics on a negative channel capacity.
	if ruleChanLength < 0 {
		logger.Warn("negative RULECHANLENGTH, using default 200")
		ruleChanLength = 200
	}

	am := new(AlertManager)
	am.RulesChan = make(chan omodels.OtRulesPolicy, ruleChanLength)
	// TODO: configuration
	am.Interval = interval
	am.Runtime = runtimes
	am.MaxRetry = retry
	// cacheAdapter, err := config.CacheConfig.Setup()
	// if err != nil {
	// 	log.Fatal("set redis Error")
	// }
	am.DB = GetDBInstance()
	am.CH = GetCHInstance()
	am.PromAddr = GetPrometheusInstance()
	am.PromMap = new(sync.Map)
	// am.Redis = cacheAdapter

	p, err := ants.NewPoolWithFunc(am.Runtime, func(i interface{}) {
		am.consumerHandler(i)
	})
	if err != nil {
		// Without a pool PolicyConsumer would dereference a nil pointer.
		logger.Fatal("create goroutine pool err: ", err)
	}
	am.P = p
	alertManagerInstance = am
}

// GetAlertManager returns the package-level AlertManager, creating it with
// SetAlertManager on first use.
func GetAlertManager() *AlertManager {
	if alertManagerInstance == nil {
		SetAlertManager()
	}
	return alertManagerInstance
}

// CloseAlertManager closes the rules channel and releases the goroutine pool,
// waiting up to three seconds. It does nothing when no manager was created.
func CloseAlertManager() {
	if alertManagerInstance == nil {
		return
	}
	close(alertManagerInstance.RulesChan)
	if err := alertManagerInstance.P.ReleaseTimeout(3 * time.Second); err != nil {
		logger.Error("release goroutine pool err: ", err)
	}
}

// AlertManager carries the policy queue, the goroutine pool and the data
// source handles used to evaluate alert policies.
type AlertManager struct {
	RulesChan chan omodels.OtRulesPolicy
	P         *ants.PoolWithFunc
	DB        *gorm.DB
	CH        *gorm.DB
	PromAddr  string
	Redis     storage.AdapterCache
	Interval  int // TODO: make this a configuration item
	Runtime   int // number of goroutines
	MaxRetry  int // retry count; consumerHandler retries while attempts made < MaxRetry
	PromMap   *sync.Map
}

// PolicyProducter loads the active alert policies, stamps each with the
// current time and sends it on RulesChan.
func (a *AlertManager) PolicyProducter() {
	rules, err := a.GetAppsAlertPolicy()
	if err != nil {
		logger.Errorf("Get alert policy err: %s", err)
		return
	}
	for _, v := range rules {
		logger.Debug("product policy: ", v)
		v.CreateTime = time.Now()
		a.RulesChan <- v
	}
}

// GetAppsAlertPolicy returns the alert policies joined with their application
// and monitor rule, restricted to enabled, verified and non-deleted rows.
func (a *AlertManager) GetAppsAlertPolicy() ([]omodels.OtRulesPolicy, error) {
	result := make([]omodels.OtRulesPolicy, 0)
	if err := a.DB.Raw(`SELECT ap.*, a.name AS app_name, a.alias AS app_alias, mr.name AS rule_name, mr.monitor_alias AS rule_monitor_alias, mr.kind AS rule_kind, mr.group AS rule_group, mr.data_source AS rule_data_source, mr.data_source AS rule_data_source, mr.table AS rule_table, mr.value_type AS rule_value_type, mr.expression AS rule_expression, mr.interval AS rule_interval, mr.verify AS rule_verify, mr.power AS rule_power FROM ot_alert_policy ap LEFT JOIN ot_apps a ON ap.app_id = a.id 
LEFT JOIN ot_monitor_rules mr ON ap.rule_id = mr.id where app_id != 999999 and mr.power = 1 and mr.verify = 1 and ap.power = 1 and ap.deleted_at is null and a.deleted_at is null and mr.deleted_at is null`).
		Scan(&result).Error; err != nil {
		log.Println("get app alert policy err: ", err)
		return nil, err
	}
	// log.Println("policy: ", result)
	return result, nil
}

// GetMonitorRules returns the monitor rules whose verify and power columns
// are both 1.
func (a *AlertManager) GetMonitorRules() ([]amodels.OtMonitorRules, error) {
	monitorRules := make([]amodels.OtMonitorRules, 0)
	// The conditions must be added before Find: Find executes the query, so
	// Where clauses chained after it never reach the database.
	if err := a.DB.
		Where("verify = ?", 1).
		Where("power = ?", 1).
		Find(&monitorRules).Error; err != nil {
		logger.Error("get monitor rules err: ", err)
		return nil, err
	}
	return monitorRules, nil
}

// PolicyConsumer hands every policy received on RulesChan to the goroutine
// pool and returns once the channel is closed and drained.
func (a *AlertManager) PolicyConsumer() {
	for r := range a.RulesChan {
		if err := a.P.Invoke(r); err != nil {
			logger.Error("invoke consumer handler err: ", err)
		}
	}
}

// consumerHandler is the pool worker. It evaluates one omodels.OtRulesPolicy,
// retrying a failed evaluation while the number of attempts made is below
// MaxRetry, and logs the outcome.
func (a *AlertManager) consumerHandler(i interface{}) {
	// NOTE(review): GetSmsConfigInstance can return nil; confirm that
	// InitEventHandler tolerates that.
	eHandler := InitEventHandler(a.PromMap, GetSmsConfigInstance(a.DB))

	rp, ok := i.(omodels.OtRulesPolicy)
	if !ok {
		// Nothing can be analysed without a policy, so stop here instead of
		// falling through to the result logging.
		logger.Errorf("assert rp error: unexpected payload type %T", i)
		return
	}
	eHandler.RP = &rp
	// logger.Debug("consumer: ", eHandler.RP)

	var err error
	for attempt := 1; ; attempt++ {
		err = a.runAnalysis(eHandler)
		if err == nil || attempt >= a.MaxRetry {
			break
		}
	}

	if err != nil || len(eHandler.Errs) > 0 {
		logger.Error("Event 分析异常: ", err)
		for k, v := range eHandler.Errs {
			logger.Errorf("%s err: %s", k, v)
		}
		// The analysis failed; the task should be requeued or dropped.
		// TODO: record it
		// a.RulesChan <- ap
		return
	}

	if !eHandler.JR.IsException {
		logger.Info("分析结果: 无异常事件,状态为", eHandler.JR.AlertStatus)
	} else {
		logger.Warn("分析结果:发现异常事件,状态为", eHandler.JR.AlertStatus)
	}
	logger.Info("Event 分析结束")
}

// runAnalysis performs one evaluation attempt: decode the policy condition,
// query the data source, then run the event pipeline. It returns a non-nil
// error when any of the three stages fails.
func (a *AlertManager) runAnalysis(e *EventHandler) error {
	if err := json.Unmarshal([]byte(e.RP.Policy), e.AC); err != nil {
		return fmt.Errorf("unmarshal alert policy: %w", err)
	}
	logger.Debug("alert condition: ", e.RP.Policy, e.AC)
	logger.Debug("ch expression: ", e.RP.RuleExpression)

	// NOTE(review): this overrides whatever value type the rule carries, so
	// chQueryHandler always takes its default branch from here. Confirm this
	// is intended.
	e.RP.RuleValueType = "prometheus"
	if err := a.chQueryHandler(e); err != nil {
		return err
	}

	res := e.
		JudgeRow(). // metric or event?
		SetUID().
		// RegisterPrometheusGauge(). // register the metric gauge collector
		SetPromKV().             // expose the metric TODO:
		PointCompare().          // compare the metric with the threshold
		JudgeInterval(a.CH).     // check the trigger period
		SendMsg().               // send the SMS
		CreateEventRecord(a.CH). // record the event
		CreateAlert(a.DB)        // decide whether this check raises an alert
	if n := len(res.Errs); n > 0 {
		return fmt.Errorf("event pipeline reported %d error(s)", n)
	}
	return nil
}

// replaceExpressionPlaceholders substitutes {{app_alias}} (as a single-quoted
// string) and {{check_interval}} in the rule expression, stores the result
// back on e.RP and returns it.
//
// NOTE(review): the alias is inserted without escaping; verify that aliases
// can never contain a single quote.
func (a *AlertManager) replaceExpressionPlaceholders(e *EventHandler) string {
	e.RP.RuleExpression = strings.ReplaceAll(e.RP.RuleExpression, "{{app_alias}}", fmt.Sprintf("'%s'", e.RP.AppAlias))
	e.RP.RuleExpression = strings.ReplaceAll(e.RP.RuleExpression, "{{check_interval}}", fmt.Sprintf("%d", e.RP.RuleInterval))
	logger.Debug("rule expression: ", e.RP.RuleExpression)
	return e.RP.RuleExpression
}

// chQueryHandler runs the rule expression against a.CH and stores the outcome
// on e.JR according to e.RP.RuleValueType:
//
//	"row"   - a single value of any type, stored in RowResult
//	"rows"  - every row as a map, stored in RowsResult
//	default - a single float64, stored in RowGaugeResult
//
// It returns the query or scan error, or an error when the default query
// yields NULL.
func (a *AlertManager) chQueryHandler(e *EventHandler) error {
	switch e.RP.RuleValueType {
	case "row":
		var result interface{}
		if err := a.CH.Table(e.RP.RuleTable).
			Raw(a.replaceExpressionPlaceholders(e)).
			Row().Scan(&result); err != nil {
			logger.Error("row query err: ", err.Error())
			return err
		}
		e.JR.RowResult = result
		logger.Debug("row result: ", e.JR.RowResult)
	case "rows":
		rows, err := a.CH.Table(e.RP.RuleTable).Raw(
			e.RP.RuleExpression,
			// e.RP.RuleInterval,
			e.AC.Interval,
			e.RP.AppAlias).Rows()
		if err != nil {
			logger.Error("rows query err: ", err.Error())
			return err
		}
		defer rows.Close()

		rlist := make([]map[string]interface{}, 0)
		for rows.Next() {
			r := make(map[string]interface{})
			if err := a.CH.ScanRows(rows, &r); err != nil {
				logger.Errorf("rows scan err: %s", err)
				return err
			}
			rlist = append(rlist, r)
		}
		// Next returns false on both exhaustion and failure; Err tells them
		// apart.
		if err := rows.Err(); err != nil {
			logger.Errorf("rows iteration err: %s", err)
			return err
		}
		e.JR.RowsResult = rlist
		logger.Debug("rows compare result: ", len(e.JR.RowsResult))
	default:
		var result *float64
		// var result interface{}
		if err := a.CH.Table(e.RP.RuleTable).
			Raw(a.replaceExpressionPlaceholders(e)).
			Row().Scan(&result); err != nil {
			logger.Error("default rows query err: ", err.Error())
			return err
		}
		// A NULL column leaves result nil; dereferencing it would panic.
		if result == nil {
			err := fmt.Errorf("query returned NULL: %s", e.RP.RuleExpression)
			logger.Error("default rows query err: ", err.Error())
			return err
		}
		e.JR.RowGaugeResult = *result
		logger.Debug("row result: ", e.JR.RowGaugeResult)
	}
	return nil
}