|
- package handler
- import (
- "encoding/json"
- "fmt"
- "log"
- "strconv"
- "strings"
- "sync"
- "time"
- "github.com/go-admin-team/go-admin-core/logger"
- "github.com/go-admin-team/go-admin-core/sdk/config"
- "github.com/go-admin-team/go-admin-core/storage"
- "github.com/panjf2000/ants/v2"
- amodels "go-admin/app/admin/models"
- omodels "go-admin/app/observe/models"
- "go-admin/common/olap"
- extConfig "go-admin/config"
- "go-admin/utils"
- "gorm.io/driver/mysql"
- "gorm.io/gorm"
- )
- var (
- dbInstance *gorm.DB
- chInstance *gorm.DB
- alertManagerInstance *AlertManager
- )
// Status flags; each constant occupies its own bit (1, 2, 4, 8).
// NOTE(review): presumably the alert life-cycle states — confirm against the
// code that reads them.
const (
	INACTIVE = 1 << iota
	PENDING
	FIRING
	SENDSUCCESS
)

// DEFAULTSMSTPL is the built-in SMS template assigned by GetSmsConfigInstance.
// The {{...}} tokens are placeholders; their substitution happens outside
// this block.
const DEFAULTSMSTPL = `
【江苏省电力公司】
所属系统:{{app_name}}
事件名称:{{events_name}}
事件创建时间:{{date_time}}
事件描述:{{events}}`
- // type EventsAction interface {
- // Judge() *EventsAction
- // PointCompare() *EventsAction
- // JudgeTriggerHz() *EventsAction
- // JudgeInterval(*gorm.DB) *EventsAction
- // SetUID() *EventsAction
- // CreateEventRecord(*gorm.DB) *EventsAction
- // }
- /*
- ## rules结构
- {
- "rule_name": "业务健康评分(近5分钟)"
- "rule_alias": "apdex::health:5m"
- "rule_group": "apdex"
- "project": ""
- "rule_body": models.OtMonitorRules
- "policy_body": xxxx // TODO:
- "create_time": time.Time
- }
- ## alert_policy结构
- @@ float数值类型
- {
- "condition":"> = < ",
- "point":7.0,
- "tigger_hz": 3, //次,最小为1
- "interval": 5 //min,选0,表示立即触发
- }
- @@ 关键字判断类型
- {
- "condition":"=",
- "point":"我是错误关键字", //float、string、int类型
- "tigger_hz": 3, //次,最小为1
- "interval": 5 //min,选0,表示立即触发
- }
- @@ 次数判断
- {
- "condition":"=",
- "point":2, //float、string、int类型
- "tigger_hz": 3, //次,最小为1
- "interval": 5 //min,选0,表示立即触发
- }
- ## 告警结构
- {
- "alert_name": "", // Policy中定义这个name模版
- "rule_alias": "", // apdex::health:5m
- "project": "Project1", // 所属app_alias
- "alert_msg": "", // Policy中定义的 msg模版
- "labels": {
- "env": "prod",
- "service": "service1"
- },
- "alert_body": "xxxxx",
- "is_done": false,
- "is_know": false
- }
- */
- func GetDBInstance() *gorm.DB {
- if dbInstance == nil {
- db, err := gorm.Open(mysql.Open(config.DatabaseConfig.Source), &gorm.Config{})
- if err != nil {
- logger.Debug("datasource: ", config.DatabaseConfig.Source)
- logger.Fatal("create db instance err: ", err)
- }
- dbInstance = db
- }
- return dbInstance
- }
- func GetCHInstance() *gorm.DB {
- if chInstance == nil {
- chInstance = olap.GetClickhouseOrm()
- }
- return chInstance
- }
- func GetPrometheusInstance() string {
- fmt.Println(extConfig.ExtConfig.Prometheus.Address)
- return extConfig.ExtConfig.Prometheus.Address
- }
- func GetSmsConfigInstance() *SmsConfig {
- sc := new(SmsConfig)
- sc.Appkey = extConfig.ExtConfig.SmsConfig.Appkey
- sc.Appsecret = extConfig.ExtConfig.SmsConfig.Appsecret
- // sc.SmsTpl = extConfig.ExtConfig.SmsConfig.SmsTpl
- // TODO: set default sms template,需要修改为外部模版
- sc.SmsTpl = DEFAULTSMSTPL
- sc.Url = extConfig.ExtConfig.SmsConfig.Url
- // sc.SQLRecord = extConfig.ExtConfig.SmsConfig.SQLRecord
- recordMap := make(map[string][]string)
- parts := strings.Split(extConfig.ExtConfig.SmsConfig.AppsGroup, ";")
- for _, part := range parts {
- if part == "" {
- continue
- }
- subParts := strings.SplitN(part, ":", 2)
- if len(subParts) != 2 {
- continue
- }
- id := subParts[0]
- phoneNumbers := subParts[1]
- recordMap[id] = strings.Split(phoneNumbers, ",")
- }
- sc.AppsGroup = recordMap
- logger.Info("sms phone map: ", sc.AppsGroup)
- return sc
- }
- func GetApps() ([]amodels.OtApps, error) {
- appList := make([]amodels.OtApps, 0)
- if err := GetDBInstance().Find(&appList).Error; err != nil {
- logger.Error("get app list err: ", err)
- return nil, err
- }
- logger.Debug("apps: ", appList)
- return appList, nil
- }
- func SetAlertManager() {
- ruleChanLength, _ := strconv.Atoi(utils.GetDefaultEnv("RULECHANLENGTH", 200))
- interval, _ := strconv.Atoi(utils.GetDefaultEnv("INTERVAL", 30))
- runtimes, _ := strconv.Atoi(utils.GetDefaultEnv("GORUNTIMES", 200))
- retry, _ := strconv.Atoi(utils.GetDefaultEnv("MAXRETRY", 0))
- am := new(AlertManager)
- am.RulesChan = make(chan omodels.OtRulesPolicy, ruleChanLength) //TODO: 配置
- am.Interval = interval
- am.Runtime = runtimes
- am.MaxRetry = retry
- // cacheAdapter, err := config.CacheConfig.Setup()
- // if err != nil {
- // log.Fatal("set redis Error")
- // }
- am.DB = GetDBInstance()
- am.CH = GetCHInstance()
- am.PromAddr = GetPrometheusInstance()
- am.PromMap = new(sync.Map)
- // am.Redis = cacheAdapter
- p, _ := ants.NewPoolWithFunc(am.Runtime, func(i interface{}) {
- am.consumerHandler(i)
- })
- am.P = p
- alertManagerInstance = am
- }
- func GetAlertManager() *AlertManager {
- if alertManagerInstance == nil {
- SetAlertManager()
- }
- return alertManagerInstance
- }
- func CloseAlertManager() {
- if alertManagerInstance != nil {
- close(alertManagerInstance.RulesChan)
- alertManagerInstance.P.ReleaseTimeout(3 * time.Second)
- }
- return
- }
- type AlertManager struct {
- RulesChan chan omodels.OtRulesPolicy
- P *ants.PoolWithFunc
- DB *gorm.DB
- CH *gorm.DB
- PromAddr string
- Redis storage.AdapterCache
- Interval int //TODO: 设置配置项
- Runtime int //Goroutine 数量
- MaxRetry int //重试次数
- PromMap *sync.Map
- }
- func (a *AlertManager) PolicyProducter() {
- rules, err := a.GetAppsAlertPolicy()
- if err != nil {
- logger.Errorf("Get alert policy err: ", err.Error())
- return
- }
- for _, v := range rules {
- logger.Debug("product policy: ", v)
- v.CreateTime = time.Now()
- a.RulesChan <- v
- }
- }
- func (a *AlertManager) GetAppsAlertPolicy() ([]omodels.OtRulesPolicy, error) {
- result := make([]omodels.OtRulesPolicy, 0)
- if err := a.DB.Raw(`SELECT ap.*, a.name AS app_name, a.alias AS app_alias,
- mr.name AS rule_name, mr.monitor_alias AS rule_monitor_alias,
- mr.kind AS rule_kind, mr.group AS rule_group, mr.data_source AS rule_data_source,
- mr.data_source AS rule_data_source, mr.table AS rule_table,
- mr.value_type AS rule_value_type, mr.expression AS rule_expression,
- mr.interval AS rule_interval, mr.verify AS rule_verify, mr.power AS rule_power
- FROM ot_alert_policy ap
- LEFT JOIN ot_apps a ON ap.app_id = a.id
- LEFT JOIN ot_monitor_rules mr ON ap.rule_id = mr.id
- where app_id != 999999 and mr.power = 1 and mr.verify = 1 and ap.power = 1
- and ap.deleted_at is null and a.deleted_at is null and mr.deleted_at is null`).
- Scan(&result).Error; err != nil {
- log.Println("get app alert policy err: ", err)
- return nil, err
- }
- // log.Println("policy: ", result)
- return result, nil
- }
- func (a *AlertManager) GetMonitorRules() ([]amodels.OtMonitorRules, error) {
- monitorRules := make([]amodels.OtMonitorRules, 0)
- if err := a.DB.Find(&monitorRules).Where("verify = ?", 1).Where("power = ?", 1).Error; err != nil {
- logger.Error("get app list err: ", err)
- return nil, err
- }
- return monitorRules, nil
- }
- func (a *AlertManager) PolicyConsumer() {
- for r := range a.RulesChan {
- _ = a.P.Invoke(r)
- }
- }
- func (a *AlertManager) consumerHandler(i interface{}) {
- var err error
- c := 0
- eHandler := InitEventHandler(a.PromMap, GetSmsConfigInstance())
- rp, ok := i.(omodels.OtRulesPolicy)
- if ok {
- eHandler.RP = &rp
- } else {
- logger.Errorf("assert rp error")
- goto DONE
- }
- // logger.Debug("consumer: ", eHandler.RP)
- LOOP:
- if err = json.Unmarshal([]byte(eHandler.RP.Policy), eHandler.AC); err != nil {
- c += 1
- if c < a.MaxRetry {
- goto LOOP
- }
- goto DONE
- }
- logger.Debug("alert condition: ", eHandler.RP.Policy, eHandler.AC)
- logger.Debug("ch expression: ", eHandler.RP.RuleExpression)
- eHandler.RP.RuleValueType = "prometheus"
- if err = a.chQueryHandler(eHandler); err != nil {
- c += 1
- if c < a.MaxRetry {
- goto LOOP
- }
- goto DONE
- }
- if len(eHandler.
- JudgeRow(). //判断指标还是事件
- SetUID(). //
- RegisterPrometheusGauge(). //注册指标metric guage collector
- SetPromKV(). //暴露指标 TODO:
- PointCompare(). //判断指标与阈值比较
- JudgeInterval(a.CH). //判断触发周期
- SendMsg(). //发送短信
- CreateEventRecord(a.CH). //记录事件
- CreateAlert(a.DB). //判断本次检测是否发出告警
- Errs) > 0 {
- c += 1
- if c < a.MaxRetry {
- goto LOOP
- }
- goto DONE
- }
- DONE:
- if len(eHandler.Errs) > 0 || err != nil {
- logger.Error("Event 分析异常: ")
- for k, v := range eHandler.Errs {
- logger.Errorf("%s err: %s", k, v)
- }
- // 本次分析异常,将任务发回或销毁//TODO: 记录
- // a.RulesChan <- ap
- return
- }
- if !eHandler.JR.IsException {
- logger.Info("分析结果: 无异常事件,状态为", eHandler.JR.AlertStatus)
- } else {
- logger.Warn("分析结果:发现异常事件,状态为", eHandler.JR.AlertStatus)
- }
- logger.Info("Event 分析结束")
- return
- }
- func (a *AlertManager) replaceExpressionPlaceholders(e *EventHandler) string {
- e.RP.RuleExpression = strings.Replace(e.RP.RuleExpression, "{{app_alias}}", fmt.Sprintf("'%s'", e.RP.AppAlias), -1)
- e.RP.RuleExpression = strings.Replace(e.RP.RuleExpression, "{{check_interval}}", fmt.Sprintf("%d", e.RP.RuleInterval), -1)
- logger.Debug("rule expression: ", e.RP.RuleExpression)
- return e.RP.RuleExpression
- }
- func (a *AlertManager) chQueryHandler(e *EventHandler) error {
- switch e.RP.RuleValueType {
- case "row":
- var result interface{}
- if err := a.CH.Table(e.RP.RuleTable).
- Raw(a.replaceExpressionPlaceholders(e)).
- Row().Scan(&result); err != nil {
- logger.Error("rows query err: ", err.Error())
- return err
- }
- e.JR.RowResult = result
- logger.Debug("row result: ", e.JR.RowResult)
- case "rows":
- rows, err := a.CH.Table(e.RP.RuleTable).Raw(
- e.RP.RuleExpression,
- // e.RP.RuleInterval,
- e.AC.Interval,
- e.RP.AppAlias).Rows()
- if err != nil {
- logger.Error("rows query err: ", err.Error())
- return err
- }
- rlist := make([]map[string]interface{}, 0)
- defer rows.Close()
- for rows.Next() {
- r := make(map[string]interface{})
- if err := a.CH.ScanRows(rows, &r); err != nil {
- logger.Errorf("rows scan err: %s", err)
- return err
- }
- rlist = append(rlist, r)
- }
- e.JR.RowsResult = rlist
- logger.Debug("rows compare result: ", len(e.JR.RowsResult))
- default:
- var result *float64
- // var result interface{}
- if err := a.CH.Table(e.RP.RuleTable).
- Raw(a.replaceExpressionPlaceholders(e)).
- Row().Scan(&result); err != nil {
- logger.Error("default rows query err: ", err.Error())
- return err
- }
- e.JR.RowGaugeResult = *result
- logger.Debug("row result: ", e.JR.RowGaugeResult)
- }
- return nil
- }
|