mirror of
https://github.com/samuelncui/yatm.git
synced 2025-12-23 06:15:22 +00:00
154 lines
3.4 KiB
Go
154 lines
3.4 KiB
Go
package executor
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/samuelncui/yatm/entity"
|
|
"gorm.io/gorm"
|
|
)
|
|
|
|
var (
|
|
ModelJob = &Job{}
|
|
|
|
ErrJobNotFound = fmt.Errorf("get job: job not found")
|
|
)
|
|
|
|
type Job struct {
|
|
ID int64 `gorm:"primaryKey;autoIncrement"`
|
|
Status entity.JobStatus
|
|
Priority int64
|
|
State *entity.JobState
|
|
|
|
CreateTime time.Time
|
|
UpdateTime time.Time `gorm:"index:idx_update_time"`
|
|
}
|
|
|
|
func (j *Job) BeforeUpdate(tx *gorm.DB) error {
|
|
j.UpdateTime = time.Now()
|
|
if j.CreateTime.IsZero() {
|
|
j.CreateTime = j.UpdateTime
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (e *Executor) DeleteJobs(ctx context.Context, ids ...int64) error {
|
|
jobs, err := e.MGetJob(ctx, ids...)
|
|
if err != nil {
|
|
return fmt.Errorf("mget jobs fail")
|
|
}
|
|
|
|
for _, job := range jobs {
|
|
job.Status = entity.JobStatus_DELETED
|
|
if r := e.db.WithContext(ctx).Save(job); r.Error != nil {
|
|
return fmt.Errorf("delete job write db fail, id= %d err= %w", job.ID, r.Error)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (e *Executor) SaveJob(ctx context.Context, job *Job) (*Job, error) {
|
|
if r := e.db.WithContext(ctx).Save(job); r.Error != nil {
|
|
return nil, fmt.Errorf("save job fail, err= %w", r.Error)
|
|
}
|
|
return job, nil
|
|
}
|
|
|
|
func (e *Executor) MGetJob(ctx context.Context, ids ...int64) (map[int64]*Job, error) {
|
|
if len(ids) == 0 {
|
|
return map[int64]*Job{}, nil
|
|
}
|
|
|
|
jobs := make([]*Job, 0, len(ids))
|
|
if r := e.db.WithContext(ctx).Where("id IN (?)", ids).Find(&jobs); r.Error != nil {
|
|
return nil, fmt.Errorf("list jobs fail, err= %w", r.Error)
|
|
}
|
|
|
|
result := make(map[int64]*Job, len(jobs))
|
|
for _, job := range jobs {
|
|
result[job.ID] = job
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
func (e *Executor) GetJob(ctx context.Context, id int64) (*Job, error) {
|
|
jobs, err := e.MGetJob(ctx, id)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
job, ok := jobs[id]
|
|
if !ok || job == nil {
|
|
return nil, ErrJobNotFound
|
|
}
|
|
|
|
return job, nil
|
|
}
|
|
|
|
// func (e *Executor) getNextJob(ctx context.Context) (*Job, error) {
|
|
// job := new(Job)
|
|
// if r := e.db.WithContext(ctx).
|
|
// Where("status = ?", entity.JobStatus_Pending).
|
|
// Order("priority DESC, create_time ASC").
|
|
// Limit(1).First(job); r.Error != nil {
|
|
// if errors.Is(r.Error, gorm.ErrRecordNotFound) {
|
|
// return nil, nil
|
|
// }
|
|
// return nil, r.Error
|
|
// }
|
|
|
|
// return job, nil
|
|
// }
|
|
|
|
func (e *Executor) ListJob(ctx context.Context, filter *entity.JobFilter) ([]*Job, error) {
|
|
db := e.db.WithContext(ctx)
|
|
if filter.Status != nil {
|
|
db = db.Where("status = ?", *filter.Status)
|
|
} else {
|
|
db = db.Where("status < ?", entity.JobStatusVisible)
|
|
}
|
|
|
|
if filter.Limit != nil {
|
|
db = db.Limit(int(*filter.Limit))
|
|
} else {
|
|
db = db.Limit(20)
|
|
}
|
|
if filter.Offset != nil {
|
|
db = db.Offset(int(*filter.Offset))
|
|
}
|
|
|
|
db = db.Order("create_time DESC")
|
|
|
|
jobs := make([]*Job, 0, 20)
|
|
if r := db.Find(&jobs); r.Error != nil {
|
|
return nil, fmt.Errorf("list jobs fail, err= %w", r.Error)
|
|
}
|
|
|
|
return jobs, nil
|
|
}
|
|
|
|
func (e *Executor) ListRecentlyUpdateJob(ctx context.Context, filter *entity.JobRecentlyUpdateFilter) ([]*Job, error) {
|
|
db := e.db.WithContext(ctx)
|
|
if filter.UpdateSinceNs != nil {
|
|
db = db.Where("update_time > ?", time.Unix(0, *filter.UpdateSinceNs))
|
|
}
|
|
|
|
if filter.Limit != nil {
|
|
db = db.Limit(int(*filter.Limit))
|
|
} else {
|
|
db = db.Limit(20)
|
|
}
|
|
|
|
db = db.Order("update_time ASC")
|
|
|
|
jobs := make([]*Job, 0, 20)
|
|
if r := db.Find(&jobs); r.Error != nil {
|
|
return nil, fmt.Errorf("list jobs fail, err= %w", r.Error)
|
|
}
|
|
|
|
return jobs, nil
|
|
}
|