jobbase.go 5.1 KB


  1. package jobs
  2. import (
  3. "fmt"
  4. models2 "go-admin/app/jobs/models"
  5. "sync"
  6. "time"
  7. log "github.com/go-admin-team/go-admin-core/logger"
  8. "github.com/go-admin-team/go-admin-core/sdk"
  9. "gorm.io/gorm"
  10. "github.com/robfig/cron/v3"
  11. "github.com/go-admin-team/go-admin-core/sdk/pkg"
  12. "github.com/go-admin-team/go-admin-core/sdk/pkg/cronjob"
  13. )
  14. var timeFormat = "2006-01-02 15:04:05"
  15. var retryCount = 3
  16. var jobList map[string]JobsExec
  17. var lock sync.Mutex
  18. type JobCore struct {
  19. InvokeTarget string
  20. Name string
  21. JobId int
  22. EntryId int
  23. CronExpression string
  24. Args string
  25. }
  26. // 任务类型 http
  27. type HttpJob struct {
  28. JobCore
  29. db *gorm.DB
  30. }
  31. type ExecJob struct {
  32. JobCore
  33. }
  34. func (e *ExecJob) Run() {
  35. startTime := time.Now()
  36. var obj = jobList[e.InvokeTarget]
  37. if obj == nil {
  38. log.Warn("[Job] ExecJob Run job nil")
  39. return
  40. }
  41. err := CallExec(obj.(JobsExec), e.Args)
  42. if err != nil {
  43. // 如果失败暂停一段时间重试
  44. fmt.Println(time.Now().Format(timeFormat), " [ERROR] mission failed! ", err)
  45. }
  46. // 结束时间
  47. endTime := time.Now()
  48. // 执行时间
  49. latencyTime := endTime.Sub(startTime)
  50. //TODO: 待完善部分
  51. //str := time.Now().Format(timeFormat) + " [INFO] JobCore " + string(e.EntryId) + "exec success , spend :" + latencyTime.String()
  52. //ws.SendAll(str)
  53. log.Info("[Job] JobCore %s exec success , spend :%v", e.Name, latencyTime)
  54. }
  55. // http 任务接口
  56. func (h *HttpJob) Run() {
  57. startTime := time.Now()
  58. var count = 0
  59. var err error
  60. var str string
  61. jobLog := models2.SysJobLog{
  62. JobID: int64(h.JobId),
  63. JobName: h.Name,
  64. InvokeTarget: h.InvokeTarget,
  65. CreatedAt: time.Now(),
  66. RetryNum: 0,
  67. }
  68. /* 循环 */
  69. LOOP:
  70. if count < retryCount {
  71. /* 跳过迭代 */
  72. str, err = pkg.Get(h.InvokeTarget)
  73. if err != nil {
  74. // 如果失败暂停一段时间重试
  75. time.Sleep(time.Duration(count+1) * 5 * time.Second)
  76. jobLog.RetryNum = int32(count)
  77. jobLog.Status = 2
  78. jobLog.Duration = int32(time.Since(startTime).Milliseconds())
  79. jobLog.CreatedAt = time.Now()
  80. jobLog.Returns = err.Error()
  81. jobLog.RetryNum = int32(count)
  82. jobLog.ID = 0
  83. h.db.Create(&jobLog)
  84. count = count + 1
  85. goto LOOP
  86. }
  87. }
  88. jobLog.Status = 1
  89. jobLog.RetryNum = int32(count)
  90. jobLog.Duration = int32(time.Since(startTime).Milliseconds())
  91. jobLog.Returns = str
  92. jobLog.RetryNum = int32(count)
  93. jobLog.CreatedAt = time.Now()
  94. jobLog.ID = 0
  95. h.db.Create(&jobLog)
  96. }
  97. // 初始化
  98. func Setup(dbs map[string]*gorm.DB) {
  99. fmt.Println(time.Now().Format(timeFormat), " [INFO] JobCore Starting...")
  100. for k, db := range dbs {
  101. if k == "*" { // * 代表 默认的mysql库,所有sys_表都在改库
  102. sdk.Runtime.SetCrontab(k, cronjob.NewWithSeconds())
  103. setup(k, db)
  104. }
  105. }
  106. }
  107. func setup(key string, db *gorm.DB) {
  108. crontab := sdk.Runtime.GetCrontabKey(key)
  109. sysJob := models2.SysJob{}
  110. jobList := make([]models2.SysJob, 0)
  111. err := sysJob.GetList(db, &jobList)
  112. if err != nil {
  113. fmt.Println(time.Now().Format(timeFormat), " [ERROR] JobCore init error", err)
  114. }
  115. if len(jobList) == 0 {
  116. fmt.Println(time.Now().Format(timeFormat), " [INFO] JobCore total:0")
  117. }
  118. _, err = sysJob.RemoveAllEntryID(db)
  119. if err != nil {
  120. fmt.Println(time.Now().Format(timeFormat), " [ERROR] JobCore remove entry_id error", err)
  121. }
  122. for i := 0; i < len(jobList); i++ {
  123. if jobList[i].JobType == 1 {
  124. j := &HttpJob{}
  125. j.InvokeTarget = jobList[i].InvokeTarget
  126. j.CronExpression = jobList[i].CronExpression
  127. j.JobId = jobList[i].JobId
  128. j.Name = jobList[i].JobName
  129. j.db = db
  130. sysJob.EntryId, err = AddJob(crontab, j)
  131. } else if jobList[i].JobType == 2 {
  132. j := &ExecJob{}
  133. j.InvokeTarget = jobList[i].InvokeTarget
  134. j.CronExpression = jobList[i].CronExpression
  135. j.JobId = jobList[i].JobId
  136. j.Name = jobList[i].JobName
  137. j.Args = jobList[i].Args
  138. sysJob.EntryId, err = AddJob(crontab, j)
  139. }
  140. err = sysJob.Update(db, jobList[i].JobId)
  141. }
  142. // 其中任务
  143. crontab.Start()
  144. fmt.Println(time.Now().Format(timeFormat), " [INFO] JobCore start success.")
  145. // 关闭任务
  146. defer crontab.Stop()
  147. select {}
  148. }
  149. // 添加任务 AddJob(invokeTarget string, jobId int, jobName string, cronExpression string)
  150. func AddJob(c *cron.Cron, job Job) (int, error) {
  151. if job == nil {
  152. fmt.Println("unknown")
  153. return 0, nil
  154. }
  155. return job.addJob(c)
  156. }
  157. func (h *HttpJob) addJob(c *cron.Cron) (int, error) {
  158. id, err := c.AddJob(h.CronExpression, h)
  159. if err != nil {
  160. fmt.Println(time.Now().Format(timeFormat), " [ERROR] JobCore AddJob error", err)
  161. return 0, err
  162. }
  163. EntryId := int(id)
  164. return EntryId, nil
  165. }
  166. func (h *ExecJob) addJob(c *cron.Cron) (int, error) {
  167. id, err := c.AddJob(h.CronExpression, h)
  168. if err != nil {
  169. fmt.Println(time.Now().Format(timeFormat), " [ERROR] JobCore AddJob error", err)
  170. return 0, err
  171. }
  172. EntryId := int(id)
  173. return EntryId, nil
  174. }
  175. // 移除任务
  176. func Remove(c *cron.Cron, entryID int) chan bool {
  177. ch := make(chan bool)
  178. go func() {
  179. c.Remove(cron.EntryID(entryID))
  180. fmt.Println(time.Now().Format(timeFormat), " [INFO] JobCore Remove success ,info entryID :", entryID)
  181. ch <- true
  182. }()
  183. return ch
  184. }
  185. // 任务停止
  186. //func Stop() chan bool {
  187. // ch := make(chan bool)
  188. // go func() {
  189. // global.GADMCron.Stop()
  190. // ch <- true
  191. // }()
  192. // return ch
  193. //}