handler.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411
  1. package handler
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "log"
  6. "strconv"
  7. "strings"
  8. "sync"
  9. "time"
  10. "github.com/go-admin-team/go-admin-core/logger"
  11. "github.com/go-admin-team/go-admin-core/sdk/config"
  12. "github.com/go-admin-team/go-admin-core/storage"
  13. "github.com/panjf2000/ants/v2"
  14. amodels "go-admin/app/admin/models"
  15. omodels "go-admin/app/observe/models"
  16. "go-admin/common/olap"
  17. extConfig "go-admin/config"
  18. "go-admin/utils"
  19. "gorm.io/driver/mysql"
  20. "gorm.io/gorm"
  21. )
  22. var (
  23. dbInstance *gorm.DB
  24. chInstance *gorm.DB
  25. alertManagerInstance *AlertManager
  26. )
  27. const (
  28. INACTIVE = 1 << iota
  29. PENDING
  30. FIRING
  31. SENDSUCCESS
  32. DEFAULTSMSTPL = `
  33. 所属系统:{{app_name}}
  34. 事件名称:{{events_name}}
  35. 事件创建时间:{{date_time}}
  36. 事件描述:{{events}}`
  37. )
  38. // type EventsAction interface {
  39. // Judge() *EventsAction
  40. // PointCompare() *EventsAction
  41. // JudgeTriggerHz() *EventsAction
  42. // JudgeInterval(*gorm.DB) *EventsAction
  43. // SetUID() *EventsAction
  44. // CreateEventRecord(*gorm.DB) *EventsAction
  45. // }
  46. /*
  47. ## rules结构
  48. {
  49. "rule_name": "业务健康评分(近5分钟)"
  50. "rule_alias": "apdex::health:5m"
  51. "rule_group": "apdex"
  52. "project": ""
  53. "rule_body": models.OtMonitorRules
  54. "policy_body": xxxx // TODO:
  55. "create_time": time.Time
  56. }
  57. ## alert_policy结构
  58. @@ float数值类型
  59. {
  60. "condition":"> = < ",
  61. "point":7.0,
  62. "tigger_hz": 3, //次,最小为1
  63. "interval": 5 //min,选0,表示立即触发
  64. }
  65. @@ 关键字判断类型
  66. {
  67. "condition":"=",
  68. "point":"我是错误关键字", //float、string、int类型
  69. "tigger_hz": 3, //次,最小为1
  70. "interval": 5 //min,选0,表示立即触发
  71. }
  72. @@ 次数判断
  73. {
  74. "condition":"=",
  75. "point":2, //float、string、int类型
  76. "tigger_hz": 3, //次,最小为1
  77. "interval": 5 //min,选0,表示立即触发
  78. }
  79. ## 告警结构
  80. {
  81. "alert_name": "", // Policy中定义这个name模版
  82. "rule_alias": "", // apdex::health:5m
  83. "project": "Project1", // 所属app_alias
  84. "alert_msg": "", // Policy中定义的 msg模版
  85. "labels": {
  86. "env": "prod",
  87. "service": "service1"
  88. },
  89. "alert_body": "xxxxx",
  90. "is_done": false,
  91. "is_know": false
  92. }
  93. */
  94. func GetDBInstance() *gorm.DB {
  95. if dbInstance == nil {
  96. db, err := gorm.Open(mysql.Open(config.DatabaseConfig.Source), &gorm.Config{})
  97. if err != nil {
  98. logger.Debug("datasource: ", config.DatabaseConfig.Source)
  99. logger.Fatal("create db instance err: ", err)
  100. }
  101. dbInstance = db
  102. }
  103. return dbInstance
  104. }
  105. func GetCHInstance() *gorm.DB {
  106. if chInstance == nil {
  107. chInstance = olap.GetClickhouseOrm()
  108. }
  109. return chInstance
  110. }
  111. func GetPrometheusInstance() string {
  112. fmt.Println(extConfig.ExtConfig.Prometheus.Address)
  113. return extConfig.ExtConfig.Prometheus.Address
  114. }
  115. func GetSmsConfigInstance(db *gorm.DB) *SmsConfig {
  116. sc := new(SmsConfig)
  117. sc.Appkey = extConfig.ExtConfig.SmsConfig.Appkey
  118. sc.Appsecret = extConfig.ExtConfig.SmsConfig.Appsecret
  119. // sc.SmsTpl = extConfig.ExtConfig.SmsConfig.SmsTpl
  120. // TODO: set default sms template,需要修改为外部模版
  121. sc.SmsTpl = DEFAULTSMSTPL
  122. sc.Url = extConfig.ExtConfig.SmsConfig.Url
  123. sc.SQLRecord = extConfig.ExtConfig.SmsConfig.SqlRecord
  124. //判断使用配置文件默认联系人还是应用管理的联系人,通常快速测试时使用配置文件,此时sqlrecord为false
  125. recordMap := make(map[string][]string)
  126. if sc.SQLRecord {
  127. apps := make([]amodels.OtApps, 0)
  128. if err := db.Find(&apps).Error; err != nil {
  129. logger.Error("sms, get app list err: ", err)
  130. return nil
  131. }
  132. for _, app := range apps {
  133. recordMap[app.Alias] = strings.Split(app.ContractPhone, ",")
  134. }
  135. } else {
  136. parts := strings.Split(extConfig.ExtConfig.SmsConfig.AppsGroup, ";")
  137. for _, part := range parts {
  138. if part == "" {
  139. continue
  140. }
  141. subParts := strings.SplitN(part, ":", 2)
  142. if len(subParts) != 2 {
  143. continue
  144. }
  145. id := subParts[0]
  146. phoneNumbers := subParts[1]
  147. recordMap[id] = strings.Split(phoneNumbers, ",")
  148. }
  149. }
  150. sc.AppsGroup = recordMap
  151. logger.Info("sms phone map: ", sc.AppsGroup)
  152. return sc
  153. }
  154. func GetApps() ([]amodels.OtApps, error) {
  155. appList := make([]amodels.OtApps, 0)
  156. if err := GetDBInstance().Find(&appList).Error; err != nil {
  157. logger.Error("get app list err: ", err)
  158. return nil, err
  159. }
  160. logger.Debug("apps: ", appList)
  161. return appList, nil
  162. }
  163. func SetAlertManager() {
  164. ruleChanLength, _ := strconv.Atoi(utils.GetDefaultEnv("RULECHANLENGTH", 200))
  165. interval, _ := strconv.Atoi(utils.GetDefaultEnv("INTERVAL", 30))
  166. runtimes, _ := strconv.Atoi(utils.GetDefaultEnv("GORUNTIMES", 200))
  167. retry, _ := strconv.Atoi(utils.GetDefaultEnv("MAXRETRY", 0))
  168. am := new(AlertManager)
  169. am.RulesChan = make(chan omodels.OtRulesPolicy, ruleChanLength) //TODO: 配置
  170. am.Interval = interval
  171. am.Runtime = runtimes
  172. am.MaxRetry = retry
  173. // cacheAdapter, err := config.CacheConfig.Setup()
  174. // if err != nil {
  175. // log.Fatal("set redis Error")
  176. // }
  177. am.DB = GetDBInstance()
  178. am.CH = GetCHInstance()
  179. am.PromAddr = GetPrometheusInstance()
  180. am.PromMap = new(sync.Map)
  181. // am.Redis = cacheAdapter
  182. p, _ := ants.NewPoolWithFunc(am.Runtime, func(i interface{}) {
  183. am.consumerHandler(i)
  184. })
  185. am.P = p
  186. alertManagerInstance = am
  187. }
  188. func GetAlertManager() *AlertManager {
  189. if alertManagerInstance == nil {
  190. SetAlertManager()
  191. }
  192. return alertManagerInstance
  193. }
  194. func CloseAlertManager() {
  195. if alertManagerInstance != nil {
  196. close(alertManagerInstance.RulesChan)
  197. alertManagerInstance.P.ReleaseTimeout(3 * time.Second)
  198. }
  199. return
  200. }
  201. type AlertManager struct {
  202. RulesChan chan omodels.OtRulesPolicy
  203. P *ants.PoolWithFunc
  204. DB *gorm.DB
  205. CH *gorm.DB
  206. PromAddr string
  207. Redis storage.AdapterCache
  208. Interval int //TODO: 设置配置项
  209. Runtime int //Goroutine 数量
  210. MaxRetry int //重试次数
  211. PromMap *sync.Map
  212. }
  213. func (a *AlertManager) PolicyProducter() {
  214. rules, err := a.GetAppsAlertPolicy()
  215. if err != nil {
  216. logger.Errorf("Get alert policy err: ", err.Error())
  217. return
  218. }
  219. for _, v := range rules {
  220. logger.Debug("product policy: ", v)
  221. v.CreateTime = time.Now()
  222. a.RulesChan <- v
  223. }
  224. }
  225. func (a *AlertManager) GetAppsAlertPolicy() ([]omodels.OtRulesPolicy, error) {
  226. result := make([]omodels.OtRulesPolicy, 0)
  227. if err := a.DB.Raw(`SELECT ap.*, a.name AS app_name, a.alias AS app_alias,
  228. mr.name AS rule_name, mr.monitor_alias AS rule_monitor_alias,
  229. mr.kind AS rule_kind, mr.group AS rule_group, mr.data_source AS rule_data_source,
  230. mr.data_source AS rule_data_source, mr.table AS rule_table,
  231. mr.value_type AS rule_value_type, mr.expression AS rule_expression,
  232. mr.interval AS rule_interval, mr.verify AS rule_verify, mr.power AS rule_power
  233. FROM ot_alert_policy ap
  234. LEFT JOIN ot_apps a ON ap.app_id = a.id
  235. LEFT JOIN ot_monitor_rules mr ON ap.rule_id = mr.id
  236. where app_id != 999999 and mr.power = 1 and mr.verify = 1 and ap.power = 1
  237. and ap.deleted_at is null and a.deleted_at is null and mr.deleted_at is null`).
  238. Scan(&result).Error; err != nil {
  239. log.Println("get app alert policy err: ", err)
  240. return nil, err
  241. }
  242. // log.Println("policy: ", result)
  243. return result, nil
  244. }
  245. func (a *AlertManager) GetMonitorRules() ([]amodels.OtMonitorRules, error) {
  246. monitorRules := make([]amodels.OtMonitorRules, 0)
  247. if err := a.DB.Find(&monitorRules).Where("verify = ?", 1).Where("power = ?", 1).Error; err != nil {
  248. logger.Error("get app list err: ", err)
  249. return nil, err
  250. }
  251. return monitorRules, nil
  252. }
  253. func (a *AlertManager) PolicyConsumer() {
  254. for r := range a.RulesChan {
  255. _ = a.P.Invoke(r)
  256. }
  257. }
  258. func (a *AlertManager) consumerHandler(i interface{}) {
  259. var err error
  260. c := 0
  261. eHandler := InitEventHandler(a.PromMap, GetSmsConfigInstance(a.DB))
  262. rp, ok := i.(omodels.OtRulesPolicy)
  263. if ok {
  264. eHandler.RP = &rp
  265. } else {
  266. logger.Errorf("assert rp error")
  267. goto DONE
  268. }
  269. // logger.Debug("consumer: ", eHandler.RP)
  270. LOOP:
  271. if err = json.Unmarshal([]byte(eHandler.RP.Policy), eHandler.AC); err != nil {
  272. c += 1
  273. if c < a.MaxRetry {
  274. goto LOOP
  275. }
  276. goto DONE
  277. }
  278. logger.Debug("alert condition: ", eHandler.RP.Policy, eHandler.AC)
  279. logger.Debug("ch expression: ", eHandler.RP.RuleExpression)
  280. eHandler.RP.RuleValueType = "prometheus"
  281. if err = a.chQueryHandler(eHandler); err != nil {
  282. c += 1
  283. if c < a.MaxRetry {
  284. goto LOOP
  285. }
  286. goto DONE
  287. }
  288. if len(eHandler.
  289. JudgeRow(). //判断指标还是事件
  290. SetUID(). //
  291. RegisterPrometheusGauge(). //注册指标metric guage collector
  292. SetPromKV(). //暴露指标 TODO:
  293. PointCompare(). //判断指标与阈值比较
  294. JudgeInterval(a.CH). //判断触发周期
  295. SendMsg(). //发送短信
  296. CreateEventRecord(a.CH). //记录事件
  297. CreateAlert(a.DB). //判断本次检测是否发出告警
  298. Errs) > 0 {
  299. c += 1
  300. if c < a.MaxRetry {
  301. goto LOOP
  302. }
  303. goto DONE
  304. }
  305. DONE:
  306. if len(eHandler.Errs) > 0 || err != nil {
  307. logger.Error("Event 分析异常: ")
  308. for k, v := range eHandler.Errs {
  309. logger.Errorf("%s err: %s", k, v)
  310. }
  311. // 本次分析异常,将任务发回或销毁//TODO: 记录
  312. // a.RulesChan <- ap
  313. return
  314. }
  315. if !eHandler.JR.IsException {
  316. logger.Info("分析结果: 无异常事件,状态为", eHandler.JR.AlertStatus)
  317. } else {
  318. logger.Warn("分析结果:发现异常事件,状态为", eHandler.JR.AlertStatus)
  319. }
  320. logger.Info("Event 分析结束")
  321. return
  322. }
  323. func (a *AlertManager) replaceExpressionPlaceholders(e *EventHandler) string {
  324. e.RP.RuleExpression = strings.Replace(e.RP.RuleExpression, "{{app_alias}}", fmt.Sprintf("'%s'", e.RP.AppAlias), -1)
  325. e.RP.RuleExpression = strings.Replace(e.RP.RuleExpression, "{{check_interval}}", fmt.Sprintf("%d", e.RP.RuleInterval), -1)
  326. logger.Debug("rule expression: ", e.RP.RuleExpression)
  327. return e.RP.RuleExpression
  328. }
  329. func (a *AlertManager) chQueryHandler(e *EventHandler) error {
  330. switch e.RP.RuleValueType {
  331. case "row":
  332. var result interface{}
  333. if err := a.CH.Table(e.RP.RuleTable).
  334. Raw(a.replaceExpressionPlaceholders(e)).
  335. Row().Scan(&result); err != nil {
  336. logger.Error("rows query err: ", err.Error())
  337. return err
  338. }
  339. e.JR.RowResult = result
  340. logger.Debug("row result: ", e.JR.RowResult)
  341. case "rows":
  342. rows, err := a.CH.Table(e.RP.RuleTable).Raw(
  343. e.RP.RuleExpression,
  344. // e.RP.RuleInterval,
  345. e.AC.Interval,
  346. e.RP.AppAlias).Rows()
  347. if err != nil {
  348. logger.Error("rows query err: ", err.Error())
  349. return err
  350. }
  351. rlist := make([]map[string]interface{}, 0)
  352. defer rows.Close()
  353. for rows.Next() {
  354. r := make(map[string]interface{})
  355. if err := a.CH.ScanRows(rows, &r); err != nil {
  356. logger.Errorf("rows scan err: %s", err)
  357. return err
  358. }
  359. rlist = append(rlist, r)
  360. }
  361. e.JR.RowsResult = rlist
  362. logger.Debug("rows compare result: ", len(e.JR.RowsResult))
  363. default:
  364. var result *float64
  365. // var result interface{}
  366. if err := a.CH.Table(e.RP.RuleTable).
  367. Raw(a.replaceExpressionPlaceholders(e)).
  368. Row().Scan(&result); err != nil {
  369. logger.Error("default rows query err: ", err.Error())
  370. return err
  371. }
  372. e.JR.RowGaugeResult = *result
  373. logger.Debug("row result: ", e.JR.RowGaugeResult)
  374. }
  375. return nil
  376. }