jobbase.go 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204
  1. package jobs
  2. import (
  3. "fmt"
  4. "sync"
  5. "time"
  6. models2 "git.cestong.com.cn/cecf/config-center-server/app/jobs/models"
  7. log "github.com/go-admin-team/go-admin-core/logger"
  8. "github.com/go-admin-team/go-admin-core/sdk"
  9. "gorm.io/gorm"
  10. "github.com/robfig/cron/v3"
  11. "github.com/go-admin-team/go-admin-core/sdk/pkg"
  12. "github.com/go-admin-team/go-admin-core/sdk/pkg/cronjob"
  13. )
  14. var timeFormat = "2006-01-02 15:04:05"
  15. var retryCount = 3
  16. var jobList map[string]JobsExec
  17. var lock sync.Mutex
  type (
  	// JobCore carries the data shared by every kind of scheduled job.
  	JobCore struct {
  		// InvokeTarget is what the job calls: HttpJob passes it to pkg.Get,
  		// ExecJob uses it as the key into jobList.
  		InvokeTarget string
  		// Name is the job name shown in log output.
  		Name string
  		// JobId is copied from the stored job record.
  		JobId int
  		// EntryId is not assigned in this part of the file.
  		// NOTE(review): presumably the cron entry id — confirm.
  		EntryId int
  		// CronExpression is the schedule handed to cron.AddJob.
  		CronExpression string
  		// Args is the argument string passed to CallExec by ExecJob.
  		Args string
  	}

  	// HttpJob is the HTTP job type: running it requests InvokeTarget.
  	HttpJob struct {
  		JobCore
  	}

  	// ExecJob is the job type that runs an executor registered in jobList.
  	ExecJob struct {
  		JobCore
  	}
  )
  33. func (e *ExecJob) Run() {
  34. startTime := time.Now()
  35. var obj = jobList[e.InvokeTarget]
  36. if obj == nil {
  37. log.Warn("[Job] ExecJob Run job nil")
  38. return
  39. }
  40. err := CallExec(obj.(JobsExec), e.Args)
  41. if err != nil {
  42. // 如果失败暂停一段时间重试
  43. fmt.Println(time.Now().Format(timeFormat), " [ERROR] mission failed! ", err)
  44. }
  45. // 结束时间
  46. endTime := time.Now()
  47. // 执行时间
  48. latencyTime := endTime.Sub(startTime)
  49. //TODO: 待完善部分
  50. //str := time.Now().Format(timeFormat) + " [INFO] JobCore " + string(e.EntryId) + "exec success , spend :" + latencyTime.String()
  51. //ws.SendAll(str)
  52. log.Info("[Job] JobCore %s exec success , spend :%v", e.Name, latencyTime)
  53. return
  54. }
  55. //http 任务接口
  56. func (h *HttpJob) Run() {
  57. startTime := time.Now()
  58. var count = 0
  59. var err error
  60. var str string
  61. /* 循环 */
  62. LOOP:
  63. if count < retryCount {
  64. /* 跳过迭代 */
  65. str, err = pkg.Get(h.InvokeTarget)
  66. if err != nil {
  67. // 如果失败暂停一段时间重试
  68. fmt.Println(time.Now().Format(timeFormat), " [ERROR] mission failed! ", err)
  69. fmt.Printf(time.Now().Format(timeFormat)+" [INFO] Retry after the task fails %d seconds! %s \n", (count+1)*5, str)
  70. time.Sleep(time.Duration(count+1) * 5 * time.Second)
  71. count = count + 1
  72. goto LOOP
  73. }
  74. }
  75. // 结束时间
  76. endTime := time.Now()
  77. // 执行时间
  78. latencyTime := endTime.Sub(startTime)
  79. //TODO: 待完善部分
  80. log.Infof("[Job] JobCore %s exec success , spend :%v", h.Name, latencyTime)
  81. return
  82. }
  83. // 初始化
  84. func Setup(dbs map[string]*gorm.DB) {
  85. fmt.Println(time.Now().Format(timeFormat), " [INFO] JobCore Starting...")
  86. for k, db := range dbs {
  87. sdk.Runtime.SetCrontab(k, cronjob.NewWithSeconds())
  88. setup(k, db)
  89. }
  90. }
  91. func setup(key string, db *gorm.DB) {
  92. crontab := sdk.Runtime.GetCrontabKey(key)
  93. sysJob := models2.SysJob{}
  94. jobList := make([]models2.SysJob, 0)
  95. err := sysJob.GetList(db, &jobList)
  96. if err != nil {
  97. fmt.Println(time.Now().Format(timeFormat), " [ERROR] JobCore init error", err)
  98. }
  99. if len(jobList) == 0 {
  100. fmt.Println(time.Now().Format(timeFormat), " [INFO] JobCore total:0")
  101. }
  102. _, err = sysJob.RemoveAllEntryID(db)
  103. if err != nil {
  104. fmt.Println(time.Now().Format(timeFormat), " [ERROR] JobCore remove entry_id error", err)
  105. }
  106. for i := 0; i < len(jobList); i++ {
  107. if jobList[i].JobType == 1 {
  108. j := &HttpJob{}
  109. j.InvokeTarget = jobList[i].InvokeTarget
  110. j.CronExpression = jobList[i].CronExpression
  111. j.JobId = jobList[i].JobId
  112. j.Name = jobList[i].JobName
  113. sysJob.EntryId, err = AddJob(crontab, j)
  114. } else if jobList[i].JobType == 2 {
  115. j := &ExecJob{}
  116. j.InvokeTarget = jobList[i].InvokeTarget
  117. j.CronExpression = jobList[i].CronExpression
  118. j.JobId = jobList[i].JobId
  119. j.Name = jobList[i].JobName
  120. j.Args = jobList[i].Args
  121. sysJob.EntryId, err = AddJob(crontab, j)
  122. }
  123. err = sysJob.Update(db, jobList[i].JobId)
  124. }
  125. // 其中任务
  126. crontab.Start()
  127. fmt.Println(time.Now().Format(timeFormat), " [INFO] JobCore start success.")
  128. // 关闭任务
  129. defer crontab.Stop()
  130. select {}
  131. }
  132. // 添加任务 AddJob(invokeTarget string, jobId int, jobName string, cronExpression string)
  133. func AddJob(c *cron.Cron, job Job) (int, error) {
  134. if job == nil {
  135. fmt.Println("unknown")
  136. return 0, nil
  137. }
  138. return job.addJob(c)
  139. }
  140. func (h *HttpJob) addJob(c *cron.Cron) (int, error) {
  141. id, err := c.AddJob(h.CronExpression, h)
  142. if err != nil {
  143. fmt.Println(time.Now().Format(timeFormat), " [ERROR] JobCore AddJob error", err)
  144. return 0, err
  145. }
  146. EntryId := int(id)
  147. return EntryId, nil
  148. }
  149. func (h *ExecJob) addJob(c *cron.Cron) (int, error) {
  150. id, err := c.AddJob(h.CronExpression, h)
  151. if err != nil {
  152. fmt.Println(time.Now().Format(timeFormat), " [ERROR] JobCore AddJob error", err)
  153. return 0, err
  154. }
  155. EntryId := int(id)
  156. return EntryId, nil
  157. }
  158. // 移除任务
  159. func Remove(c *cron.Cron, entryID int) chan bool {
  160. ch := make(chan bool)
  161. go func() {
  162. c.Remove(cron.EntryID(entryID))
  163. fmt.Println(time.Now().Format(timeFormat), " [INFO] JobCore Remove success ,info entryID :", entryID)
  164. ch <- true
  165. }()
  166. return ch
  167. }
  168. // 任务停止
  169. //func Stop() chan bool {
  170. // ch := make(chan bool)
  171. // go func() {
  172. // global.GADMCron.Stop()
  173. // ch <- true
  174. // }()
  175. // return ch
  176. //}