// Mirror of https://github.com/seaweedfs/seaweedfs.git
// Synced 2026-05-17 07:11:30 +00:00
// 299 lines, 7.0 KiB, Go

package plugin
|
|
|
|
import (
|
|
"container/heap"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// JobQueue manages job queueing with priority, deduplication, retry and history
|
|
type JobQueue struct {
|
|
mu sync.RWMutex
|
|
priorityQueue *PriorityQueue
|
|
seenJobs map[string]bool // For deduplication
|
|
jobHistory []*ExecutionRecord
|
|
maxHistorySize int
|
|
deduplicationTTL time.Duration
|
|
lastSeenJob map[string]time.Time
|
|
}
|
|
|
|
// PriorityQueue implements heap.Interface for job ordering
|
|
type PriorityQueue []*Job
|
|
|
|
// Len reports how many jobs are currently held in the queue.
func (pq PriorityQueue) Len() int {
	return len(pq)
}
|
|
|
|
func (pq PriorityQueue) Less(i, j int) bool {
|
|
// Higher priority jobs come first
|
|
if pq[i].Type != pq[j].Type {
|
|
return pq[i].Type < pq[j].Type
|
|
}
|
|
// If same type, earlier creation time comes first
|
|
return pq[i].CreatedAt.Before(pq[j].CreatedAt)
|
|
}
|
|
|
|
func (pq PriorityQueue) Swap(i, j int) {
|
|
pq[i], pq[j] = pq[j], pq[i]
|
|
}
|
|
|
|
func (pq *PriorityQueue) Push(x interface{}) {
|
|
*pq = append(*pq, x.(*Job))
|
|
}
|
|
|
|
func (pq *PriorityQueue) Pop() interface{} {
|
|
old := *pq
|
|
n := len(old)
|
|
item := old[n-1]
|
|
*pq = old[0 : n-1]
|
|
return item
|
|
}
|
|
|
|
// NewJobQueue creates a new job queue
|
|
func NewJobQueue(maxHistorySize int, deduplicationTTL time.Duration) *JobQueue {
|
|
jq := &JobQueue{
|
|
priorityQueue: &PriorityQueue{},
|
|
seenJobs: make(map[string]bool),
|
|
jobHistory: make([]*ExecutionRecord, 0, maxHistorySize),
|
|
maxHistorySize: maxHistorySize,
|
|
deduplicationTTL: deduplicationTTL,
|
|
lastSeenJob: make(map[string]time.Time),
|
|
}
|
|
heap.Init(jq.priorityQueue)
|
|
return jq
|
|
}
|
|
|
|
// Enqueue adds a job to the queue with deduplication
|
|
func (jq *JobQueue) Enqueue(job *Job) error {
|
|
jq.mu.Lock()
|
|
defer jq.mu.Unlock()
|
|
|
|
// Check for duplicates within TTL window
|
|
if lastSeen, exists := jq.lastSeenJob[job.ID]; exists {
|
|
if time.Since(lastSeen) < jq.deduplicationTTL {
|
|
return fmt.Errorf("job %s already enqueued recently", job.ID)
|
|
}
|
|
}
|
|
|
|
job.SetState(JobStatePending)
|
|
heap.Push(jq.priorityQueue, job)
|
|
jq.seenJobs[job.ID] = true
|
|
jq.lastSeenJob[job.ID] = time.Now()
|
|
|
|
return nil
|
|
}
|
|
|
|
// Dequeue retrieves the next job from the queue
|
|
func (jq *JobQueue) Dequeue() *Job {
|
|
jq.mu.Lock()
|
|
defer jq.mu.Unlock()
|
|
|
|
if jq.priorityQueue.Len() == 0 {
|
|
return nil
|
|
}
|
|
|
|
job := heap.Pop(jq.priorityQueue).(*Job)
|
|
return job
|
|
}
|
|
|
|
// Peek returns the next job without removing it
|
|
func (jq *JobQueue) Peek() *Job {
|
|
jq.mu.RLock()
|
|
defer jq.mu.RUnlock()
|
|
|
|
if jq.priorityQueue.Len() == 0 {
|
|
return nil
|
|
}
|
|
|
|
return (*jq.priorityQueue)[0]
|
|
}
|
|
|
|
// Size returns the current queue size
|
|
func (jq *JobQueue) Size() int {
|
|
jq.mu.RLock()
|
|
defer jq.mu.RUnlock()
|
|
return jq.priorityQueue.Len()
|
|
}
|
|
|
|
// RecordExecution adds an execution record to history
|
|
func (jq *JobQueue) RecordExecution(record *ExecutionRecord) {
|
|
jq.mu.Lock()
|
|
defer jq.mu.Unlock()
|
|
|
|
jq.jobHistory = append(jq.jobHistory, record)
|
|
|
|
// Keep history size bounded
|
|
if len(jq.jobHistory) > jq.maxHistorySize {
|
|
// Remove oldest entries
|
|
removeCount := len(jq.jobHistory) - jq.maxHistorySize
|
|
jq.jobHistory = jq.jobHistory[removeCount:]
|
|
}
|
|
}
|
|
|
|
// GetHistory returns job execution history
|
|
func (jq *JobQueue) GetHistory(limit int) []*ExecutionRecord {
|
|
jq.mu.RLock()
|
|
defer jq.mu.RUnlock()
|
|
|
|
if limit <= 0 || limit > len(jq.jobHistory) {
|
|
limit = len(jq.jobHistory)
|
|
}
|
|
|
|
// Return the most recent entries
|
|
startIdx := len(jq.jobHistory) - limit
|
|
if startIdx < 0 {
|
|
startIdx = 0
|
|
}
|
|
|
|
result := make([]*ExecutionRecord, limit)
|
|
copy(result, jq.jobHistory[startIdx:])
|
|
return result
|
|
}
|
|
|
|
// GetHistoryForPlugin returns history for a specific plugin
|
|
func (jq *JobQueue) GetHistoryForPlugin(pluginID string, limit int) []*ExecutionRecord {
|
|
jq.mu.RLock()
|
|
defer jq.mu.RUnlock()
|
|
|
|
var result []*ExecutionRecord
|
|
for i := len(jq.jobHistory) - 1; i >= 0 && len(result) < limit; i-- {
|
|
if jq.jobHistory[i].PluginID == pluginID {
|
|
result = append(result, jq.jobHistory[i])
|
|
}
|
|
}
|
|
return result
|
|
}
|
|
|
|
// GetHistoryForJobType returns history for a specific job type
|
|
func (jq *JobQueue) GetHistoryForJobType(jobType string, limit int) []*ExecutionRecord {
|
|
jq.mu.RLock()
|
|
defer jq.mu.RUnlock()
|
|
|
|
var result []*ExecutionRecord
|
|
for i := len(jq.jobHistory) - 1; i >= 0 && len(result) < limit; i-- {
|
|
if jq.jobHistory[i].JobType == jobType {
|
|
result = append(result, jq.jobHistory[i])
|
|
}
|
|
}
|
|
return result
|
|
}
|
|
|
|
// ClearHistory removes all execution history
|
|
func (jq *JobQueue) ClearHistory() {
|
|
jq.mu.Lock()
|
|
defer jq.mu.Unlock()
|
|
jq.jobHistory = make([]*ExecutionRecord, 0, jq.maxHistorySize)
|
|
}
|
|
|
|
// PurgeOldHistory removes history entries older than the specified time
|
|
func (jq *JobQueue) PurgeOldHistory(beforeTime time.Time) int {
|
|
jq.mu.Lock()
|
|
defer jq.mu.Unlock()
|
|
|
|
removed := 0
|
|
newHistory := make([]*ExecutionRecord, 0)
|
|
|
|
for _, record := range jq.jobHistory {
|
|
if record.CreatedAt.After(beforeTime) {
|
|
newHistory = append(newHistory, record)
|
|
} else {
|
|
removed++
|
|
}
|
|
}
|
|
|
|
jq.jobHistory = newHistory
|
|
return removed
|
|
}
|
|
|
|
// HistorySize returns the number of records in history
|
|
func (jq *JobQueue) HistorySize() int {
|
|
jq.mu.RLock()
|
|
defer jq.mu.RUnlock()
|
|
return len(jq.jobHistory)
|
|
}
|
|
|
|
// RetryJob re-enqueues a failed job up to maxRetries times
|
|
func (jq *JobQueue) RetryJob(job *Job, maxRetries int) error {
|
|
jq.mu.Lock()
|
|
defer jq.mu.Unlock()
|
|
|
|
if job.RetryCount >= maxRetries {
|
|
return fmt.Errorf("job %s exceeded max retries (%d)", job.ID, maxRetries)
|
|
}
|
|
|
|
job.RetryCount++
|
|
job.SetState(JobStatePending)
|
|
heap.Push(jq.priorityQueue, job)
|
|
|
|
return nil
|
|
}
|
|
|
|
// GetExecutionStats returns statistics about job executions
|
|
func (jq *JobQueue) GetExecutionStats() map[string]interface{} {
|
|
jq.mu.RLock()
|
|
defer jq.mu.RUnlock()
|
|
|
|
completed := 0
|
|
failed := 0
|
|
totalExecutionTime := int64(0)
|
|
|
|
for _, record := range jq.jobHistory {
|
|
switch record.State {
|
|
case JobStateCompleted:
|
|
completed++
|
|
case JobStateFailed:
|
|
failed++
|
|
}
|
|
if record.CompletedAt != nil && record.StartedAt != nil {
|
|
totalExecutionTime += record.CompletedAt.Sub(*record.StartedAt).Milliseconds()
|
|
}
|
|
}
|
|
|
|
avgExecutionTime := int64(0)
|
|
if completed+failed > 0 {
|
|
avgExecutionTime = totalExecutionTime / int64(completed+failed)
|
|
}
|
|
|
|
return map[string]interface{}{
|
|
"total_history": len(jq.jobHistory),
|
|
"completed_jobs": completed,
|
|
"failed_jobs": failed,
|
|
"avg_execution_time_ms": avgExecutionTime,
|
|
"current_queue_size": jq.priorityQueue.Len(),
|
|
}
|
|
}
|
|
|
|
// GetQueuedJobs returns all jobs currently in the queue
|
|
func (jq *JobQueue) GetQueuedJobs() []*Job {
|
|
jq.mu.RLock()
|
|
defer jq.mu.RUnlock()
|
|
|
|
result := make([]*Job, len(*jq.priorityQueue))
|
|
copy(result, *jq.priorityQueue)
|
|
return result
|
|
}
|
|
|
|
// RemoveJob removes a specific job from the queue
|
|
func (jq *JobQueue) RemoveJob(jobID string) bool {
|
|
jq.mu.Lock()
|
|
defer jq.mu.Unlock()
|
|
|
|
for i, job := range *jq.priorityQueue {
|
|
if job.ID == jobID {
|
|
heap.Remove(jq.priorityQueue, i)
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
// PurgeQueuedJobs clears all pending jobs from the queue
|
|
func (jq *JobQueue) PurgeQueuedJobs() int {
|
|
jq.mu.Lock()
|
|
defer jq.mu.Unlock()
|
|
|
|
count := jq.priorityQueue.Len()
|
|
*jq.priorityQueue = PriorityQueue{}
|
|
heap.Init(jq.priorityQueue)
|
|
return count
|
|
}
|