handler.go 10 KB


  1. package handler
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "log"
  6. "strconv"
  7. "strings"
  8. "sync"
  9. "time"
  10. "github.com/go-admin-team/go-admin-core/logger"
  11. "github.com/go-admin-team/go-admin-core/sdk/config"
  12. "github.com/go-admin-team/go-admin-core/storage"
  13. "github.com/panjf2000/ants/v2"
  14. amodels "go-admin/app/admin/models"
  15. omodels "go-admin/app/observe/models"
  16. "go-admin/common/olap"
  17. extConfig "go-admin/config"
  18. "go-admin/utils"
  19. "gorm.io/driver/mysql"
  20. "gorm.io/gorm"
  21. )
  22. var (
  23. dbInstance *gorm.DB
  24. chInstance *gorm.DB
  25. alertManagerInstance *AlertManager
  26. )
  27. const (
  28. INACTIVE = 1 << iota
  29. PENDING
  30. FIRING
  31. SENDSUCCESS
  32. DEFAULTSMSTPL = `
  33. 【江苏省电力公司】
  34. 所属系统:{{app_name}}
  35. 事件名称:{{events_name}}
  36. 事件创建时间:{{date_time}}
  37. 事件描述:{{events}}`
  38. )
  39. // type EventsAction interface {
  40. // Judge() *EventsAction
  41. // PointCompare() *EventsAction
  42. // JudgeTriggerHz() *EventsAction
  43. // JudgeInterval(*gorm.DB) *EventsAction
  44. // SetUID() *EventsAction
  45. // CreateEventRecord(*gorm.DB) *EventsAction
  46. // }
  47. /*
  48. ## rules结构
  49. {
  50. "rule_name": "业务健康评分(近5分钟)"
  51. "rule_alias": "apdex::health:5m"
  52. "rule_group": "apdex"
  53. "project": ""
  54. "rule_body": models.OtMonitorRules
  55. "policy_body": xxxx // TODO:
  56. "create_time": time.Time
  57. }
  58. ## alert_policy结构
  59. @@ float数值类型
  60. {
  61. "condition":"> = < ",
  62. "point":7.0,
  63. "tigger_hz": 3, //次,最小为1
  64. "interval": 5 //min,选0,表示立即触发
  65. }
  66. @@ 关键字判断类型
  67. {
  68. "condition":"=",
  69. "point":"我是错误关键字", //float、string、int类型
  70. "tigger_hz": 3, //次,最小为1
  71. "interval": 5 //min,选0,表示立即触发
  72. }
  73. @@ 次数判断
  74. {
  75. "condition":"=",
  76. "point":2, //float、string、int类型
  77. "tigger_hz": 3, //次,最小为1
  78. "interval": 5 //min,选0,表示立即触发
  79. }
  80. ## 告警结构
  81. {
  82. "alert_name": "", // Policy中定义这个name模版
  83. "rule_alias": "", // apdex::health:5m
  84. "project": "Project1", // 所属app_alias
  85. "alert_msg": "", // Policy中定义的 msg模版
  86. "labels": {
  87. "env": "prod",
  88. "service": "service1"
  89. },
  90. "alert_body": "xxxxx",
  91. "is_done": false,
  92. "is_know": false
  93. }
  94. */
  95. func GetDBInstance() *gorm.DB {
  96. if dbInstance == nil {
  97. db, err := gorm.Open(mysql.Open(config.DatabaseConfig.Source), &gorm.Config{})
  98. if err != nil {
  99. logger.Debug("datasource: ", config.DatabaseConfig.Source)
  100. logger.Fatal("create db instance err: ", err)
  101. }
  102. dbInstance = db
  103. }
  104. return dbInstance
  105. }
  106. func GetCHInstance() *gorm.DB {
  107. if chInstance == nil {
  108. chInstance = olap.GetClickhouseOrm()
  109. }
  110. return chInstance
  111. }
  112. func GetPrometheusInstance() string {
  113. fmt.Println(extConfig.ExtConfig.Prometheus.Address)
  114. return extConfig.ExtConfig.Prometheus.Address
  115. }
  116. func GetSmsConfigInstance() *SmsConfig {
  117. sc := new(SmsConfig)
  118. sc.Appkey = extConfig.ExtConfig.SmsConfig.Appkey
  119. sc.Appsecret = extConfig.ExtConfig.SmsConfig.Appsecret
  120. // sc.SmsTpl = extConfig.ExtConfig.SmsConfig.SmsTpl
  121. // TODO: set default sms template,需要修改为外部模版
  122. sc.SmsTpl = DEFAULTSMSTPL
  123. sc.Url = extConfig.ExtConfig.SmsConfig.Url
  124. // sc.SQLRecord = extConfig.ExtConfig.SmsConfig.SQLRecord
  125. recordMap := make(map[string][]string)
  126. parts := strings.Split(extConfig.ExtConfig.SmsConfig.AppsGroup, ";")
  127. for _, part := range parts {
  128. if part == "" {
  129. continue
  130. }
  131. subParts := strings.SplitN(part, ":", 2)
  132. if len(subParts) != 2 {
  133. continue
  134. }
  135. id := subParts[0]
  136. phoneNumbers := subParts[1]
  137. recordMap[id] = strings.Split(phoneNumbers, ",")
  138. }
  139. sc.AppsGroup = recordMap
  140. logger.Info("sms phone map: ", sc.AppsGroup)
  141. return sc
  142. }
  143. func GetApps() ([]amodels.OtApps, error) {
  144. appList := make([]amodels.OtApps, 0)
  145. if err := GetDBInstance().Find(&appList).Error; err != nil {
  146. logger.Error("get app list err: ", err)
  147. return nil, err
  148. }
  149. logger.Debug("apps: ", appList)
  150. return appList, nil
  151. }
  152. func SetAlertManager() {
  153. ruleChanLength, _ := strconv.Atoi(utils.GetDefaultEnv("RULECHANLENGTH", 200))
  154. interval, _ := strconv.Atoi(utils.GetDefaultEnv("INTERVAL", 30))
  155. runtimes, _ := strconv.Atoi(utils.GetDefaultEnv("GORUNTIMES", 200))
  156. retry, _ := strconv.Atoi(utils.GetDefaultEnv("MAXRETRY", 0))
  157. am := new(AlertManager)
  158. am.RulesChan = make(chan omodels.OtRulesPolicy, ruleChanLength) //TODO: 配置
  159. am.Interval = interval
  160. am.Runtime = runtimes
  161. am.MaxRetry = retry
  162. // cacheAdapter, err := config.CacheConfig.Setup()
  163. // if err != nil {
  164. // log.Fatal("set redis Error")
  165. // }
  166. am.DB = GetDBInstance()
  167. am.CH = GetCHInstance()
  168. am.PromAddr = GetPrometheusInstance()
  169. am.PromMap = new(sync.Map)
  170. // am.Redis = cacheAdapter
  171. p, _ := ants.NewPoolWithFunc(am.Runtime, func(i interface{}) {
  172. am.consumerHandler(i)
  173. })
  174. am.P = p
  175. alertManagerInstance = am
  176. }
  177. func GetAlertManager() *AlertManager {
  178. if alertManagerInstance == nil {
  179. SetAlertManager()
  180. }
  181. return alertManagerInstance
  182. }
  183. func CloseAlertManager() {
  184. if alertManagerInstance != nil {
  185. close(alertManagerInstance.RulesChan)
  186. alertManagerInstance.P.ReleaseTimeout(3 * time.Second)
  187. }
  188. return
  189. }
  190. type AlertManager struct {
  191. RulesChan chan omodels.OtRulesPolicy
  192. P *ants.PoolWithFunc
  193. DB *gorm.DB
  194. CH *gorm.DB
  195. PromAddr string
  196. Redis storage.AdapterCache
  197. Interval int //TODO: 设置配置项
  198. Runtime int //Goroutine 数量
  199. MaxRetry int //重试次数
  200. PromMap *sync.Map
  201. }
  202. func (a *AlertManager) PolicyProducter() {
  203. rules, err := a.GetAppsAlertPolicy()
  204. if err != nil {
  205. logger.Errorf("Get alert policy err: ", err.Error())
  206. return
  207. }
  208. for _, v := range rules {
  209. logger.Debug("product policy: ", v)
  210. v.CreateTime = time.Now()
  211. a.RulesChan <- v
  212. }
  213. }
  214. func (a *AlertManager) GetAppsAlertPolicy() ([]omodels.OtRulesPolicy, error) {
  215. result := make([]omodels.OtRulesPolicy, 0)
  216. if err := a.DB.Raw(`SELECT ap.*, a.name AS app_name, a.alias AS app_alias,
  217. mr.name AS rule_name, mr.monitor_alias AS rule_monitor_alias,
  218. mr.kind AS rule_kind, mr.group AS rule_group, mr.data_source AS rule_data_source,
  219. mr.data_source AS rule_data_source, mr.table AS rule_table,
  220. mr.value_type AS rule_value_type, mr.expression AS rule_expression,
  221. mr.interval AS rule_interval, mr.verify AS rule_verify, mr.power AS rule_power
  222. FROM ot_alert_policy ap
  223. LEFT JOIN ot_apps a ON ap.app_id = a.id
  224. LEFT JOIN ot_monitor_rules mr ON ap.rule_id = mr.id
  225. where app_id != 999999 and mr.power = 1 and mr.verify = 1 and ap.power = 1
  226. and ap.deleted_at is null and a.deleted_at is null and mr.deleted_at is null`).
  227. Scan(&result).Error; err != nil {
  228. log.Println("get app alert policy err: ", err)
  229. return nil, err
  230. }
  231. // log.Println("policy: ", result)
  232. return result, nil
  233. }
  234. func (a *AlertManager) GetMonitorRules() ([]amodels.OtMonitorRules, error) {
  235. monitorRules := make([]amodels.OtMonitorRules, 0)
  236. if err := a.DB.Find(&monitorRules).Where("verify = ?", 1).Where("power = ?", 1).Error; err != nil {
  237. logger.Error("get app list err: ", err)
  238. return nil, err
  239. }
  240. return monitorRules, nil
  241. }
  242. func (a *AlertManager) PolicyConsumer() {
  243. for r := range a.RulesChan {
  244. _ = a.P.Invoke(r)
  245. }
  246. }
  247. func (a *AlertManager) consumerHandler(i interface{}) {
  248. var err error
  249. c := 0
  250. eHandler := InitEventHandler(a.PromMap, GetSmsConfigInstance())
  251. rp, ok := i.(omodels.OtRulesPolicy)
  252. if ok {
  253. eHandler.RP = &rp
  254. } else {
  255. logger.Errorf("assert rp error")
  256. goto DONE
  257. }
  258. // logger.Debug("consumer: ", eHandler.RP)
  259. LOOP:
  260. if err = json.Unmarshal([]byte(eHandler.RP.Policy), eHandler.AC); err != nil {
  261. c += 1
  262. if c < a.MaxRetry {
  263. goto LOOP
  264. }
  265. goto DONE
  266. }
  267. logger.Debug("alert condition: ", eHandler.RP.Policy, eHandler.AC)
  268. logger.Debug("ch expression: ", eHandler.RP.RuleExpression)
  269. eHandler.RP.RuleValueType = "prometheus"
  270. if err = a.chQueryHandler(eHandler); err != nil {
  271. c += 1
  272. if c < a.MaxRetry {
  273. goto LOOP
  274. }
  275. goto DONE
  276. }
  277. if len(eHandler.
  278. JudgeRow(). //判断指标还是事件
  279. SetUID(). //
  280. RegisterPrometheusGauge(). //注册指标metric guage collector
  281. SetPromKV(). //暴露指标 TODO:
  282. PointCompare(). //判断指标与阈值比较
  283. JudgeInterval(a.CH). //判断触发周期
  284. SendMsg(). //发送短信
  285. CreateEventRecord(a.CH). //记录事件
  286. CreateAlert(a.DB). //判断本次检测是否发出告警
  287. Errs) > 0 {
  288. c += 1
  289. if c < a.MaxRetry {
  290. goto LOOP
  291. }
  292. goto DONE
  293. }
  294. DONE:
  295. if len(eHandler.Errs) > 0 || err != nil {
  296. logger.Error("Event 分析异常: ")
  297. for k, v := range eHandler.Errs {
  298. logger.Errorf("%s err: %s", k, v)
  299. }
  300. // 本次分析异常,将任务发回或销毁//TODO: 记录
  301. // a.RulesChan <- ap
  302. return
  303. }
  304. if !eHandler.JR.IsException {
  305. logger.Info("分析结果: 无异常事件,状态为", eHandler.JR.AlertStatus)
  306. } else {
  307. logger.Warn("分析结果:发现异常事件,状态为", eHandler.JR.AlertStatus)
  308. }
  309. logger.Info("Event 分析结束")
  310. return
  311. }
  312. func (a *AlertManager) replaceExpressionPlaceholders(e *EventHandler) string {
  313. e.RP.RuleExpression = strings.Replace(e.RP.RuleExpression, "{{app_alias}}", fmt.Sprintf("'%s'", e.RP.AppAlias), -1)
  314. e.RP.RuleExpression = strings.Replace(e.RP.RuleExpression, "{{check_interval}}", fmt.Sprintf("%d", e.RP.RuleInterval), -1)
  315. logger.Debug("rule expression: ", e.RP.RuleExpression)
  316. return e.RP.RuleExpression
  317. }
  318. func (a *AlertManager) chQueryHandler(e *EventHandler) error {
  319. switch e.RP.RuleValueType {
  320. case "row":
  321. var result interface{}
  322. if err := a.CH.Table(e.RP.RuleTable).
  323. Raw(a.replaceExpressionPlaceholders(e)).
  324. Row().Scan(&result); err != nil {
  325. logger.Error("rows query err: ", err.Error())
  326. return err
  327. }
  328. e.JR.RowResult = result
  329. logger.Debug("row result: ", e.JR.RowResult)
  330. case "rows":
  331. rows, err := a.CH.Table(e.RP.RuleTable).Raw(
  332. e.RP.RuleExpression,
  333. // e.RP.RuleInterval,
  334. e.AC.Interval,
  335. e.RP.AppAlias).Rows()
  336. if err != nil {
  337. logger.Error("rows query err: ", err.Error())
  338. return err
  339. }
  340. rlist := make([]map[string]interface{}, 0)
  341. defer rows.Close()
  342. for rows.Next() {
  343. r := make(map[string]interface{})
  344. if err := a.CH.ScanRows(rows, &r); err != nil {
  345. logger.Errorf("rows scan err: %s", err)
  346. return err
  347. }
  348. rlist = append(rlist, r)
  349. }
  350. e.JR.RowsResult = rlist
  351. logger.Debug("rows compare result: ", len(e.JR.RowsResult))
  352. default:
  353. var result *float64
  354. // var result interface{}
  355. if err := a.CH.Table(e.RP.RuleTable).
  356. Raw(a.replaceExpressionPlaceholders(e)).
  357. Row().Scan(&result); err != nil {
  358. logger.Error("default rows query err: ", err.Error())
  359. return err
  360. }
  361. e.JR.RowGaugeResult = *result
  362. logger.Debug("row result: ", e.JR.RowGaugeResult)
  363. }
  364. return nil
  365. }