Files
seaweedfs/weed/admin/plugin/job_queue.go

299 lines
7.0 KiB
Go

package plugin
import (
"container/heap"
"fmt"
"sync"
"time"
)
// JobQueue manages job queueing with priority, deduplication, retry and history
type JobQueue struct {
mu sync.RWMutex
priorityQueue *PriorityQueue
seenJobs map[string]bool // For deduplication
jobHistory []*ExecutionRecord
maxHistorySize int
deduplicationTTL time.Duration
lastSeenJob map[string]time.Time
}
// PriorityQueue implements heap.Interface for job ordering
type PriorityQueue []*Job
func (pq PriorityQueue) Len() int { return len(pq) }
func (pq PriorityQueue) Less(i, j int) bool {
// Higher priority jobs come first
if pq[i].Type != pq[j].Type {
return pq[i].Type < pq[j].Type
}
// If same type, earlier creation time comes first
return pq[i].CreatedAt.Before(pq[j].CreatedAt)
}
func (pq PriorityQueue) Swap(i, j int) {
pq[i], pq[j] = pq[j], pq[i]
}
func (pq *PriorityQueue) Push(x interface{}) {
*pq = append(*pq, x.(*Job))
}
func (pq *PriorityQueue) Pop() interface{} {
old := *pq
n := len(old)
item := old[n-1]
*pq = old[0 : n-1]
return item
}
// NewJobQueue creates a new job queue
func NewJobQueue(maxHistorySize int, deduplicationTTL time.Duration) *JobQueue {
jq := &JobQueue{
priorityQueue: &PriorityQueue{},
seenJobs: make(map[string]bool),
jobHistory: make([]*ExecutionRecord, 0, maxHistorySize),
maxHistorySize: maxHistorySize,
deduplicationTTL: deduplicationTTL,
lastSeenJob: make(map[string]time.Time),
}
heap.Init(jq.priorityQueue)
return jq
}
// Enqueue adds a job to the queue with deduplication
func (jq *JobQueue) Enqueue(job *Job) error {
jq.mu.Lock()
defer jq.mu.Unlock()
// Check for duplicates within TTL window
if lastSeen, exists := jq.lastSeenJob[job.ID]; exists {
if time.Since(lastSeen) < jq.deduplicationTTL {
return fmt.Errorf("job %s already enqueued recently", job.ID)
}
}
job.SetState(JobStatePending)
heap.Push(jq.priorityQueue, job)
jq.seenJobs[job.ID] = true
jq.lastSeenJob[job.ID] = time.Now()
return nil
}
// Dequeue retrieves the next job from the queue
func (jq *JobQueue) Dequeue() *Job {
jq.mu.Lock()
defer jq.mu.Unlock()
if jq.priorityQueue.Len() == 0 {
return nil
}
job := heap.Pop(jq.priorityQueue).(*Job)
return job
}
// Peek returns the next job without removing it
func (jq *JobQueue) Peek() *Job {
jq.mu.RLock()
defer jq.mu.RUnlock()
if jq.priorityQueue.Len() == 0 {
return nil
}
return (*jq.priorityQueue)[0]
}
// Size returns the current queue size
func (jq *JobQueue) Size() int {
jq.mu.RLock()
defer jq.mu.RUnlock()
return jq.priorityQueue.Len()
}
// RecordExecution adds an execution record to history
func (jq *JobQueue) RecordExecution(record *ExecutionRecord) {
jq.mu.Lock()
defer jq.mu.Unlock()
jq.jobHistory = append(jq.jobHistory, record)
// Keep history size bounded
if len(jq.jobHistory) > jq.maxHistorySize {
// Remove oldest entries
removeCount := len(jq.jobHistory) - jq.maxHistorySize
jq.jobHistory = jq.jobHistory[removeCount:]
}
}
// GetHistory returns job execution history
func (jq *JobQueue) GetHistory(limit int) []*ExecutionRecord {
jq.mu.RLock()
defer jq.mu.RUnlock()
if limit <= 0 || limit > len(jq.jobHistory) {
limit = len(jq.jobHistory)
}
// Return the most recent entries
startIdx := len(jq.jobHistory) - limit
if startIdx < 0 {
startIdx = 0
}
result := make([]*ExecutionRecord, limit)
copy(result, jq.jobHistory[startIdx:])
return result
}
// GetHistoryForPlugin returns history for a specific plugin
func (jq *JobQueue) GetHistoryForPlugin(pluginID string, limit int) []*ExecutionRecord {
jq.mu.RLock()
defer jq.mu.RUnlock()
var result []*ExecutionRecord
for i := len(jq.jobHistory) - 1; i >= 0 && len(result) < limit; i-- {
if jq.jobHistory[i].PluginID == pluginID {
result = append(result, jq.jobHistory[i])
}
}
return result
}
// GetHistoryForJobType returns history for a specific job type
func (jq *JobQueue) GetHistoryForJobType(jobType string, limit int) []*ExecutionRecord {
jq.mu.RLock()
defer jq.mu.RUnlock()
var result []*ExecutionRecord
for i := len(jq.jobHistory) - 1; i >= 0 && len(result) < limit; i-- {
if jq.jobHistory[i].JobType == jobType {
result = append(result, jq.jobHistory[i])
}
}
return result
}
// ClearHistory removes all execution history
func (jq *JobQueue) ClearHistory() {
jq.mu.Lock()
defer jq.mu.Unlock()
jq.jobHistory = make([]*ExecutionRecord, 0, jq.maxHistorySize)
}
// PurgeOldHistory removes history entries older than the specified time
func (jq *JobQueue) PurgeOldHistory(beforeTime time.Time) int {
jq.mu.Lock()
defer jq.mu.Unlock()
removed := 0
newHistory := make([]*ExecutionRecord, 0)
for _, record := range jq.jobHistory {
if record.CreatedAt.After(beforeTime) {
newHistory = append(newHistory, record)
} else {
removed++
}
}
jq.jobHistory = newHistory
return removed
}
// HistorySize returns the number of records in history
func (jq *JobQueue) HistorySize() int {
jq.mu.RLock()
defer jq.mu.RUnlock()
return len(jq.jobHistory)
}
// RetryJob re-enqueues a failed job up to maxRetries times
func (jq *JobQueue) RetryJob(job *Job, maxRetries int) error {
jq.mu.Lock()
defer jq.mu.Unlock()
if job.RetryCount >= maxRetries {
return fmt.Errorf("job %s exceeded max retries (%d)", job.ID, maxRetries)
}
job.RetryCount++
job.SetState(JobStatePending)
heap.Push(jq.priorityQueue, job)
return nil
}
// GetExecutionStats returns statistics about job executions
func (jq *JobQueue) GetExecutionStats() map[string]interface{} {
jq.mu.RLock()
defer jq.mu.RUnlock()
completed := 0
failed := 0
totalExecutionTime := int64(0)
for _, record := range jq.jobHistory {
switch record.State {
case JobStateCompleted:
completed++
case JobStateFailed:
failed++
}
if record.CompletedAt != nil && record.StartedAt != nil {
totalExecutionTime += record.CompletedAt.Sub(*record.StartedAt).Milliseconds()
}
}
avgExecutionTime := int64(0)
if completed+failed > 0 {
avgExecutionTime = totalExecutionTime / int64(completed+failed)
}
return map[string]interface{}{
"total_history": len(jq.jobHistory),
"completed_jobs": completed,
"failed_jobs": failed,
"avg_execution_time_ms": avgExecutionTime,
"current_queue_size": jq.priorityQueue.Len(),
}
}
// GetQueuedJobs returns all jobs currently in the queue
func (jq *JobQueue) GetQueuedJobs() []*Job {
jq.mu.RLock()
defer jq.mu.RUnlock()
result := make([]*Job, len(*jq.priorityQueue))
copy(result, *jq.priorityQueue)
return result
}
// RemoveJob removes a specific job from the queue
func (jq *JobQueue) RemoveJob(jobID string) bool {
jq.mu.Lock()
defer jq.mu.Unlock()
for i, job := range *jq.priorityQueue {
if job.ID == jobID {
heap.Remove(jq.priorityQueue, i)
return true
}
}
return false
}
// PurgeQueuedJobs clears all pending jobs from the queue
func (jq *JobQueue) PurgeQueuedJobs() int {
jq.mu.Lock()
defer jq.mu.Unlock()
count := jq.priorityQueue.Len()
*jq.priorityQueue = PriorityQueue{}
heap.Init(jq.priorityQueue)
return count
}