Files
yatm/executor/job.go
2023-10-12 22:54:07 +08:00

154 lines
3.4 KiB
Go

package executor
import (
"context"
"fmt"
"time"
"github.com/samuelncui/yatm/entity"
"gorm.io/gorm"
)
// ModelJob is a shared zero-value Job.
// NOTE(review): presumably handed to gorm as the model/schema reference
// (e.g. for migration) — confirm against callers outside this file.
var ModelJob = new(Job)

// ErrJobNotFound is returned by GetJob when no job exists for the requested id.
var ErrJobNotFound = fmt.Errorf("get job: job not found")
// Job is the gorm model under which a job is stored by the Executor.
type Job struct {
// ID is the primary key; the gorm tag marks it as auto-incrementing.
ID int64 `gorm:"primaryKey;autoIncrement"`
// Status is the job's current status. DeleteJobs sets it to
// entity.JobStatus_DELETED instead of removing the row.
Status entity.JobStatus
// Priority is the job's priority value.
// NOTE(review): ordering semantics are not visible in this file — confirm.
Priority int64
// State holds the job's entity.JobState payload; it may be nil.
State *entity.JobState
// CreateTime is filled in by BeforeUpdate when it is still zero.
CreateTime time.Time
// UpdateTime is refreshed by BeforeUpdate and indexed (idx_update_time);
// ListRecentlyUpdateJob filters and orders on this column.
UpdateTime time.Time `gorm:"index:idx_update_time"`
}
// BeforeUpdate is a gorm hook that stamps the job before it is updated.
//
// UpdateTime is always set to the current time. CreateTime is set to that
// same instant only when it has not been set yet (zero value). The hook
// never fails and does not use tx.
func (j *Job) BeforeUpdate(tx *gorm.DB) error {
	now := time.Now()
	j.UpdateTime = now
	if j.CreateTime.IsZero() {
		// First time this record gets a timestamp: creation == this update.
		j.CreateTime = now
	}
	return nil
}
// DeleteJobs soft-deletes the jobs with the given ids.
//
// The jobs are loaded through MGetJob, their Status is set to
// entity.JobStatus_DELETED, and each one is saved back individually. IDs that
// MGetJob does not return are skipped without error.
//
// It returns the first error encountered, wrapped with context. Saves are
// issued one at a time, so jobs written before a failure stay updated unless
// e.db is itself transactional.
func (e *Executor) DeleteJobs(ctx context.Context, ids ...int64) error {
	jobs, err := e.MGetJob(ctx, ids...)
	if err != nil {
		// Wrap the cause so callers can inspect it with errors.Is / errors.As.
		return fmt.Errorf("mget jobs fail, err= %w", err)
	}

	for _, job := range jobs {
		job.Status = entity.JobStatus_DELETED
		if r := e.db.WithContext(ctx).Save(job); r.Error != nil {
			return fmt.Errorf("delete job write db fail, id= %d err= %w", job.ID, r.Error)
		}
	}

	return nil
}
// SaveJob writes job to the database with gorm's Save and returns the same
// pointer on success. On failure it returns nil and the wrapped database
// error.
func (e *Executor) SaveJob(ctx context.Context, job *Job) (*Job, error) {
	err := e.db.WithContext(ctx).Save(job).Error
	if err != nil {
		return nil, fmt.Errorf("save job fail, err= %w", err)
	}
	return job, nil
}
// MGetJob loads the jobs whose id is in ids and returns them keyed by id.
//
// With no ids it returns an empty, non-nil map without touching the
// database. IDs that match no row are simply absent from the result. A query
// failure is returned wrapped, with a nil map.
func (e *Executor) MGetJob(ctx context.Context, ids ...int64) (map[int64]*Job, error) {
	if len(ids) == 0 {
		return map[int64]*Job{}, nil
	}

	rows := make([]*Job, 0, len(ids))
	err := e.db.WithContext(ctx).Where("id IN (?)", ids).Find(&rows).Error
	if err != nil {
		return nil, fmt.Errorf("list jobs fail, err= %w", err)
	}

	byID := make(map[int64]*Job, len(rows))
	for i := range rows {
		byID[rows[i].ID] = rows[i]
	}
	return byID, nil
}
// GetJob fetches a single job by id via MGetJob.
//
// It returns ErrJobNotFound when no job with that id is found, and passes
// through any error reported by MGetJob unchanged.
func (e *Executor) GetJob(ctx context.Context, id int64) (*Job, error) {
	found, err := e.MGetJob(ctx, id)
	if err != nil {
		return nil, err
	}

	// A missing key yields a nil *Job, so one nil check covers both the
	// "absent" and the "present but nil" cases.
	if job := found[id]; job != nil {
		return job, nil
	}
	return nil, ErrJobNotFound
}
// func (e *Executor) getNextJob(ctx context.Context) (*Job, error) {
// job := new(Job)
// if r := e.db.WithContext(ctx).
// Where("status = ?", entity.JobStatus_Pending).
// Order("priority DESC, create_time ASC").
// Limit(1).First(job); r.Error != nil {
// if errors.Is(r.Error, gorm.ErrRecordNotFound) {
// return nil, nil
// }
// return nil, r.Error
// }
// return job, nil
// }
// ListJob returns jobs matching filter, newest first (create_time DESC).
//
// Filter semantics:
//   - Status set: only jobs with exactly that status.
//   - Status unset: only jobs whose status is below entity.JobStatusVisible.
//   - Limit unset: at most 20 jobs are returned.
//   - Offset unset: no offset is applied.
//
// A nil filter is treated as an empty filter, so all defaults apply. A query
// failure is returned wrapped, with a nil slice.
func (e *Executor) ListJob(ctx context.Context, filter *entity.JobFilter) ([]*Job, error) {
	if filter == nil {
		// Guard against a nil-pointer panic; fall back to the defaults.
		filter = new(entity.JobFilter)
	}

	db := e.db.WithContext(ctx)

	if filter.Status != nil {
		db = db.Where("status = ?", *filter.Status)
	} else {
		db = db.Where("status < ?", entity.JobStatusVisible)
	}

	if filter.Limit != nil {
		db = db.Limit(int(*filter.Limit))
	} else {
		db = db.Limit(20)
	}

	if filter.Offset != nil {
		db = db.Offset(int(*filter.Offset))
	}

	db = db.Order("create_time DESC")

	jobs := make([]*Job, 0, 20)
	if r := db.Find(&jobs); r.Error != nil {
		return nil, fmt.Errorf("list jobs fail, err= %w", r.Error)
	}
	return jobs, nil
}
// ListRecentlyUpdateJob returns jobs ordered by update_time ascending.
//
// Filter semantics:
//   - UpdateSinceNs set: only jobs whose update_time is strictly after that
//     instant, interpreted as nanoseconds since the Unix epoch.
//   - Limit unset: at most 20 jobs are returned.
//
// No status condition is applied by this method. A nil filter is treated as
// an empty filter, so all defaults apply. A query failure is returned
// wrapped, with a nil slice.
func (e *Executor) ListRecentlyUpdateJob(ctx context.Context, filter *entity.JobRecentlyUpdateFilter) ([]*Job, error) {
	if filter == nil {
		// Guard against a nil-pointer panic; fall back to the defaults.
		filter = new(entity.JobRecentlyUpdateFilter)
	}

	db := e.db.WithContext(ctx)

	if filter.UpdateSinceNs != nil {
		db = db.Where("update_time > ?", time.Unix(0, *filter.UpdateSinceNs))
	}

	if filter.Limit != nil {
		db = db.Limit(int(*filter.Limit))
	} else {
		db = db.Limit(20)
	}

	db = db.Order("update_time ASC")

	jobs := make([]*Job, 0, 20)
	if r := db.Find(&jobs); r.Error != nil {
		return nil, fmt.Errorf("list jobs fail, err= %w", r.Error)
	}
	return jobs, nil
}