123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204 |
- package jobs
- import (
- "fmt"
- "sync"
- "time"
- models2 "git.cestong.com.cn/cecf/config-center-server/app/jobs/models"
- log "github.com/go-admin-team/go-admin-core/logger"
- "github.com/go-admin-team/go-admin-core/sdk"
- "gorm.io/gorm"
- "github.com/robfig/cron/v3"
- "github.com/go-admin-team/go-admin-core/sdk/pkg"
- "github.com/go-admin-team/go-admin-core/sdk/pkg/cronjob"
- )
- var timeFormat = "2006-01-02 15:04:05"
- var retryCount = 3
- var jobList map[string]JobsExec
- var lock sync.Mutex
- type JobCore struct {
- InvokeTarget string
- Name string
- JobId int
- EntryId int
- CronExpression string
- Args string
- }
- // 任务类型 http
- type HttpJob struct {
- JobCore
- }
- type ExecJob struct {
- JobCore
- }
- func (e *ExecJob) Run() {
- startTime := time.Now()
- var obj = jobList[e.InvokeTarget]
- if obj == nil {
- log.Warn("[Job] ExecJob Run job nil")
- return
- }
- err := CallExec(obj.(JobsExec), e.Args)
- if err != nil {
- // 如果失败暂停一段时间重试
- fmt.Println(time.Now().Format(timeFormat), " [ERROR] mission failed! ", err)
- }
- // 结束时间
- endTime := time.Now()
- // 执行时间
- latencyTime := endTime.Sub(startTime)
- //TODO: 待完善部分
- //str := time.Now().Format(timeFormat) + " [INFO] JobCore " + string(e.EntryId) + "exec success , spend :" + latencyTime.String()
- //ws.SendAll(str)
- log.Info("[Job] JobCore %s exec success , spend :%v", e.Name, latencyTime)
- return
- }
- //http 任务接口
- func (h *HttpJob) Run() {
- startTime := time.Now()
- var count = 0
- var err error
- var str string
- /* 循环 */
- LOOP:
- if count < retryCount {
- /* 跳过迭代 */
- str, err = pkg.Get(h.InvokeTarget)
- if err != nil {
- // 如果失败暂停一段时间重试
- fmt.Println(time.Now().Format(timeFormat), " [ERROR] mission failed! ", err)
- fmt.Printf(time.Now().Format(timeFormat)+" [INFO] Retry after the task fails %d seconds! %s \n", (count+1)*5, str)
- time.Sleep(time.Duration(count+1) * 5 * time.Second)
- count = count + 1
- goto LOOP
- }
- }
- // 结束时间
- endTime := time.Now()
- // 执行时间
- latencyTime := endTime.Sub(startTime)
- //TODO: 待完善部分
- log.Infof("[Job] JobCore %s exec success , spend :%v", h.Name, latencyTime)
- return
- }
- // 初始化
- func Setup(dbs map[string]*gorm.DB) {
- fmt.Println(time.Now().Format(timeFormat), " [INFO] JobCore Starting...")
- for k, db := range dbs {
- sdk.Runtime.SetCrontab(k, cronjob.NewWithSeconds())
- setup(k, db)
- }
- }
- func setup(key string, db *gorm.DB) {
- crontab := sdk.Runtime.GetCrontabKey(key)
- sysJob := models2.SysJob{}
- jobList := make([]models2.SysJob, 0)
- err := sysJob.GetList(db, &jobList)
- if err != nil {
- fmt.Println(time.Now().Format(timeFormat), " [ERROR] JobCore init error", err)
- }
- if len(jobList) == 0 {
- fmt.Println(time.Now().Format(timeFormat), " [INFO] JobCore total:0")
- }
- _, err = sysJob.RemoveAllEntryID(db)
- if err != nil {
- fmt.Println(time.Now().Format(timeFormat), " [ERROR] JobCore remove entry_id error", err)
- }
- for i := 0; i < len(jobList); i++ {
- if jobList[i].JobType == 1 {
- j := &HttpJob{}
- j.InvokeTarget = jobList[i].InvokeTarget
- j.CronExpression = jobList[i].CronExpression
- j.JobId = jobList[i].JobId
- j.Name = jobList[i].JobName
- sysJob.EntryId, err = AddJob(crontab, j)
- } else if jobList[i].JobType == 2 {
- j := &ExecJob{}
- j.InvokeTarget = jobList[i].InvokeTarget
- j.CronExpression = jobList[i].CronExpression
- j.JobId = jobList[i].JobId
- j.Name = jobList[i].JobName
- j.Args = jobList[i].Args
- sysJob.EntryId, err = AddJob(crontab, j)
- }
- err = sysJob.Update(db, jobList[i].JobId)
- }
- // 其中任务
- crontab.Start()
- fmt.Println(time.Now().Format(timeFormat), " [INFO] JobCore start success.")
- // 关闭任务
- defer crontab.Stop()
- select {}
- }
- // 添加任务 AddJob(invokeTarget string, jobId int, jobName string, cronExpression string)
- func AddJob(c *cron.Cron, job Job) (int, error) {
- if job == nil {
- fmt.Println("unknown")
- return 0, nil
- }
- return job.addJob(c)
- }
- func (h *HttpJob) addJob(c *cron.Cron) (int, error) {
- id, err := c.AddJob(h.CronExpression, h)
- if err != nil {
- fmt.Println(time.Now().Format(timeFormat), " [ERROR] JobCore AddJob error", err)
- return 0, err
- }
- EntryId := int(id)
- return EntryId, nil
- }
- func (h *ExecJob) addJob(c *cron.Cron) (int, error) {
- id, err := c.AddJob(h.CronExpression, h)
- if err != nil {
- fmt.Println(time.Now().Format(timeFormat), " [ERROR] JobCore AddJob error", err)
- return 0, err
- }
- EntryId := int(id)
- return EntryId, nil
- }
- // 移除任务
- func Remove(c *cron.Cron, entryID int) chan bool {
- ch := make(chan bool)
- go func() {
- c.Remove(cron.EntryID(entryID))
- fmt.Println(time.Now().Format(timeFormat), " [INFO] JobCore Remove success ,info entryID :", entryID)
- ch <- true
- }()
- return ch
- }
- // 任务停止
- //func Stop() chan bool {
- // ch := make(chan bool)
- // go func() {
- // global.GADMCron.Stop()
- // ch <- true
- // }()
- // return ch
- //}
|