handler.go 9.1 KB


  1. package handler
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "log"
  6. "strconv"
  7. "sync"
  8. "time"
  9. "github.com/go-admin-team/go-admin-core/logger"
  10. "github.com/go-admin-team/go-admin-core/sdk/config"
  11. "github.com/go-admin-team/go-admin-core/storage"
  12. "github.com/panjf2000/ants/v2"
  13. amodels "go-admin/app/admin/models"
  14. omodels "go-admin/app/observe/models"
  15. "go-admin/common/olap"
  16. extConfig "go-admin/config"
  17. "go-admin/utils"
  18. "gorm.io/driver/mysql"
  19. "gorm.io/gorm"
  20. )
  21. var (
  22. dbInstance *gorm.DB
  23. chInstance *gorm.DB
  24. alertManagerInstance *AlertManager
  25. )
  26. const (
  27. INACTIVE = 1 << iota
  28. PENDING
  29. FIRING
  30. )
  31. // type EventsAction interface {
  32. // Judge() *EventsAction
  33. // PointCompare() *EventsAction
  34. // JudgeTriggerHz() *EventsAction
  35. // JudgeInterval(*gorm.DB) *EventsAction
  36. // SetUID() *EventsAction
  37. // CreateEventRecord(*gorm.DB) *EventsAction
  38. // }
  39. /*
  40. ## rules结构
  41. {
  42. "rule_name": "业务健康评分(近5分钟)"
  43. "rule_alias": "apdex::health:5m"
  44. "rule_group": "apdex"
  45. "project": ""
  46. "rule_body": models.OtMonitorRules
  47. "policy_body": xxxx // TODO:
  48. "create_time": time.Time
  49. }
  50. ## alert_policy结构
  51. @@ float数值类型
  52. {
  53. "condition":"> = < ",
  54. "point":7.0,
  55. "tigger_hz": 3, //次,最小为1
  56. "interval": 5 //min,选0,表示立即触发
  57. }
  58. @@ 关键字判断类型
  59. {
  60. "condition":"=",
  61. "point":"我是错误关键字", //float、string、int类型
  62. "tigger_hz": 3, //次,最小为1
  63. "interval": 5 //min,选0,表示立即触发
  64. }
  65. @@ 次数判断
  66. {
  67. "condition":"=",
  68. "point":2, //float、string、int类型
  69. "tigger_hz": 3, //次,最小为1
  70. "interval": 5 //min,选0,表示立即触发
  71. }
  72. ## 告警结构
  73. {
  74. "alert_name": "", // Policy中定义这个name模版
  75. "rule_alias": "", // apdex::health:5m
  76. "project": "Project1", // 所属app_alias
  77. "alert_msg": "", // Policy中定义的 msg模版
  78. "labels": {
  79. "env": "prod",
  80. "service": "service1"
  81. },
  82. "alert_body": "xxxxx",
  83. "is_done": false,
  84. "is_know": false
  85. }
  86. */
  87. func GetDBInstance() *gorm.DB {
  88. if dbInstance == nil {
  89. db, err := gorm.Open(mysql.Open(config.DatabaseConfig.Source), &gorm.Config{})
  90. if err != nil {
  91. logger.Debug("datasource: ", config.DatabaseConfig.Source)
  92. logger.Fatal("create db instance err: ", err)
  93. }
  94. dbInstance = db
  95. }
  96. return dbInstance
  97. }
  98. func GetCHInstance() *gorm.DB {
  99. if chInstance == nil {
  100. chInstance = olap.GetClickhouseOrm()
  101. }
  102. return chInstance
  103. }
  104. func GetPrometheusInstance() string {
  105. fmt.Println(extConfig.ExtConfig.Prometheus.Address)
  106. return extConfig.ExtConfig.Prometheus.Address
  107. }
  108. func GetApps() ([]amodels.OtApps, error) {
  109. appList := make([]amodels.OtApps, 0)
  110. if err := GetDBInstance().Find(&appList).Error; err != nil {
  111. logger.Error("get app list err: ", err)
  112. return nil, err
  113. }
  114. logger.Debug("apps: ", appList)
  115. return appList, nil
  116. }
  117. func SetAlertManager() {
  118. ruleChanLength, _ := strconv.Atoi(utils.GetDefaultEnv("RULECHANLENGTH", 200))
  119. interval, _ := strconv.Atoi(utils.GetDefaultEnv("INTERVAL", 30))
  120. runtimes, _ := strconv.Atoi(utils.GetDefaultEnv("GORUNTIMES", 200))
  121. retry, _ := strconv.Atoi(utils.GetDefaultEnv("MAXRETRY", 0))
  122. am := new(AlertManager)
  123. am.RulesChan = make(chan omodels.OtRulesPolicy, ruleChanLength) //TODO: 配置
  124. am.Interval = interval
  125. am.Runtime = runtimes
  126. am.MaxRetry = retry
  127. // cacheAdapter, err := config.CacheConfig.Setup()
  128. // if err != nil {
  129. // log.Fatal("set redis Error")
  130. // }
  131. am.DB = GetDBInstance()
  132. am.CH = GetCHInstance()
  133. am.PromAddr = GetPrometheusInstance()
  134. am.PromMap = new(sync.Map)
  135. // am.Redis = cacheAdapter
  136. p, _ := ants.NewPoolWithFunc(am.Runtime, func(i interface{}) {
  137. am.consumerHandler(i)
  138. })
  139. am.P = p
  140. alertManagerInstance = am
  141. }
  142. func GetAlertManager() *AlertManager {
  143. if alertManagerInstance == nil {
  144. SetAlertManager()
  145. }
  146. return alertManagerInstance
  147. }
  148. func CloseAlertManager() {
  149. if alertManagerInstance != nil {
  150. close(alertManagerInstance.RulesChan)
  151. alertManagerInstance.P.ReleaseTimeout(3 * time.Second)
  152. }
  153. return
  154. }
  155. type AlertManager struct {
  156. RulesChan chan omodels.OtRulesPolicy
  157. P *ants.PoolWithFunc
  158. DB *gorm.DB
  159. CH *gorm.DB
  160. PromAddr string
  161. Redis storage.AdapterCache
  162. Interval int //TODO: 设置配置项
  163. Runtime int //Goroutine 数量
  164. MaxRetry int //重试次数
  165. PromMap *sync.Map
  166. }
  167. func (a *AlertManager) PolicyProducter() {
  168. rules, err := a.GetAppsAlertPolicy()
  169. if err != nil {
  170. logger.Errorf("Get alert policy err: ", err.Error())
  171. return
  172. }
  173. for _, v := range rules {
  174. logger.Debug("product policy: ", v)
  175. v.CreateTime = time.Now()
  176. a.RulesChan <- v
  177. }
  178. }
  179. func (a *AlertManager) GetAppsAlertPolicy() ([]omodels.OtRulesPolicy, error) {
  180. result := make([]omodels.OtRulesPolicy, 0)
  181. if err := a.DB.Raw(`SELECT ap.*, a.name AS app_name, a.alias AS app_alias,
  182. mr.name AS rule_name, mr.monitor_alias AS rule_monitor_alias,
  183. mr.kind AS rule_kind, mr.group AS rule_group, mr.data_source AS rule_data_source,
  184. mr.data_source AS rule_data_source, mr.table AS rule_table,
  185. mr.value_type AS rule_value_type, mr.expression AS rule_expression,
  186. mr.interval AS rule_interval, mr.verify AS rule_verify, mr.power AS rule_power
  187. FROM ot_alert_policy ap
  188. LEFT JOIN ot_apps a ON ap.app_id = a.id
  189. LEFT JOIN ot_monitor_rules mr ON ap.rule_id = mr.id
  190. where app_id != 999999 and mr.power = 1 and mr.verify = 1 and ap.power = 1
  191. and ap.deleted_at is null and a.deleted_at is null and mr.deleted_at is null`).
  192. Scan(&result).Error; err != nil {
  193. log.Println("get app alert policy err: ", err)
  194. return nil, err
  195. }
  196. // log.Println("policy: ", result)
  197. return result, nil
  198. }
  199. func (a *AlertManager) GetMonitorRules() ([]amodels.OtMonitorRules, error) {
  200. monitorRules := make([]amodels.OtMonitorRules, 0)
  201. if err := a.DB.Find(&monitorRules).Where("verify = ?", 1).Where("power = ?", 1).Error; err != nil {
  202. logger.Error("get app list err: ", err)
  203. return nil, err
  204. }
  205. return monitorRules, nil
  206. }
  207. func (a *AlertManager) PolicyConsumer() {
  208. for r := range a.RulesChan {
  209. _ = a.P.Invoke(r)
  210. }
  211. }
  212. func (a *AlertManager) consumerHandler(i interface{}) {
  213. var err error
  214. c := 0
  215. eHandler := InitEventHandler(a.PromMap)
  216. rp, ok := i.(omodels.OtRulesPolicy)
  217. if ok {
  218. eHandler.RP = &rp
  219. } else {
  220. logger.Errorf("assert rp error")
  221. goto DONE
  222. }
  223. // logger.Debug("consumer: ", eHandler.RP)
  224. LOOP:
  225. if err = json.Unmarshal([]byte(eHandler.RP.Policy), eHandler.AC); err != nil {
  226. c += 1
  227. if c < a.MaxRetry {
  228. goto LOOP
  229. }
  230. goto DONE
  231. }
  232. logger.Debug("alert condition: ", eHandler.RP.Policy, eHandler.AC)
  233. logger.Debug("ch expression: ", eHandler.RP.RuleExpression)
  234. eHandler.RP.RuleValueType = "prometheus"
  235. if err = a.chQueryHandler(eHandler); err != nil {
  236. c += 1
  237. if c < a.MaxRetry {
  238. goto LOOP
  239. }
  240. goto DONE
  241. }
  242. // if len(eHandler.
  243. // JudgeRow(). //判断指标还是事件
  244. // SetUID(). //
  245. // PointCompare(). //判断指标与阈值比较
  246. // JudgeTriggerHz(). //判断触发频率
  247. // JudgeInterval(a.CH). //判断触发周期
  248. // CreateEventRecord(a.CH). //记录事件
  249. // CreateAlert(a.DB). //判断本次检测是否发出告警
  250. // Errs) > 0 {
  251. // c += 1
  252. // if c < a.MaxRetry {
  253. // goto LOOP
  254. // }
  255. // goto DONE
  256. // }
  257. if len(eHandler.
  258. JudgeRow(). //判断指标还是事件
  259. SetUID(). //
  260. RegisterPrometheusGauge(). //注册指标metric guage collector
  261. SetPromKV(). //暴露指标 TODO:
  262. Errs) > 0 {
  263. c += 1
  264. if c < a.MaxRetry {
  265. goto LOOP
  266. }
  267. goto DONE
  268. }
  269. DONE:
  270. if len(eHandler.Errs) > 0 || err != nil {
  271. logger.Error("Event 分析异常: ")
  272. for k, v := range eHandler.Errs {
  273. logger.Errorf("%s err: %s", k, v)
  274. }
  275. // 本次分析异常,将任务发回或销毁//TODO: 记录
  276. // a.RulesChan <- ap
  277. return
  278. }
  279. if !eHandler.JR.IsException {
  280. logger.Info("分析结果: 无异常事件", eHandler.JR.AlertStatus)
  281. } else {
  282. logger.Warn("分析结果:发现异常事件", eHandler.JR.AlertStatus)
  283. }
  284. logger.Info("Event 分析结束")
  285. return
  286. }
  287. func (a *AlertManager) chQueryHandler(e *EventHandler) error {
  288. switch e.RP.RuleValueType {
  289. case "row":
  290. var result interface{}
  291. if err := a.CH.Table(e.RP.RuleTable).Raw(
  292. e.RP.RuleExpression,
  293. e.RP.RuleInterval,
  294. e.RP.AppAlias).Row().Scan(&result); err != nil {
  295. logger.Error("rows query err: ", err.Error())
  296. return err
  297. }
  298. e.JR.RowResult = result
  299. logger.Debug("row result: ", e.JR.RowResult)
  300. case "rows":
  301. rows, err := a.CH.Table(e.RP.RuleTable).Raw(
  302. e.RP.RuleExpression,
  303. // e.RP.RuleInterval,
  304. e.AC.Interval,
  305. e.RP.AppAlias).Rows()
  306. if err != nil {
  307. logger.Error("rows query err: ", err.Error())
  308. return err
  309. }
  310. rlist := make([]map[string]interface{}, 0)
  311. defer rows.Close()
  312. for rows.Next() {
  313. r := make(map[string]interface{})
  314. if err := a.CH.ScanRows(rows, &r); err != nil {
  315. logger.Errorf("rows scan err: %s", err)
  316. return err
  317. }
  318. rlist = append(rlist, r)
  319. }
  320. e.JR.RowsResult = rlist
  321. logger.Debug("rows compare result: ", len(e.JR.RowsResult))
  322. default:
  323. var result float64
  324. if err := a.CH.Table(e.RP.RuleTable).Raw(
  325. e.RP.RuleExpression,
  326. e.RP.RuleInterval,
  327. e.RP.AppAlias).Row().Scan(&result); err != nil {
  328. logger.Error("rows query err: ", err.Error())
  329. return err
  330. }
  331. e.JR.RowGaugeResult = result
  332. logger.Debug("row result: ", e.JR.RowGaugeResult)
  333. }
  334. return nil
  335. }