// Package queue provides a priority queue for scan jobs. // Jobs are ordered by tier priority (paid first) with FIFO tiebreaker. package queue import ( "container/heap" "sync" scanner "atcr.io/scanner" ) // Priority levels: lower number = higher priority const ( PriorityOwner = 0 PriorityQuartermaster = 1 PriorityBosun = 2 PriorityDeckhand = 3 ) // TierToPriority converts a tier name to a priority level func TierToPriority(tier string) int { switch tier { case "owner": return PriorityOwner case "quartermaster": return PriorityQuartermaster case "bosun": return PriorityBosun default: return PriorityDeckhand } } // item wraps a ScanJob with priority metadata for the heap type item struct { job *scanner.ScanJob priority int seq int64 // FIFO tiebreaker } // priorityHeap implements heap.Interface type priorityHeap []*item func (h priorityHeap) Len() int { return len(h) } func (h priorityHeap) Less(i, j int) bool { if h[i].priority != h[j].priority { return h[i].priority < h[j].priority } return h[i].seq < h[j].seq } func (h priorityHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } func (h *priorityHeap) Push(x any) { *h = append(*h, x.(*item)) } func (h *priorityHeap) Pop() any { old := *h n := len(old) it := old[n-1] old[n-1] = nil *h = old[:n-1] return it } // JobQueue is a thread-safe priority queue for scan jobs. // Workers block on Dequeue until a job is available. type JobQueue struct { mu sync.Mutex cond *sync.Cond h priorityHeap maxSize int closed bool } // NewJobQueue creates a new priority queue with the given max size func NewJobQueue(maxSize int) *JobQueue { q := &JobQueue{ h: make(priorityHeap, 0), maxSize: maxSize, } q.cond = sync.NewCond(&q.mu) heap.Init(&q.h) return q } // Enqueue adds a job to the priority queue. // Returns false if the queue is full. func (q *JobQueue) Enqueue(job *scanner.ScanJob) bool { q.mu.Lock() defer q.mu.Unlock() if q.closed { return false } if q.h.Len() >= q.maxSize { return false } heap.Push(&q.h, &item{ job: job, priority: TierToPriority(job.Tier), seq: job.Seq, }) q.cond.Signal() return true } // Dequeue blocks until a job is available and returns it. // Returns nil if the queue is closed. func (q *JobQueue) Dequeue() *scanner.ScanJob { q.mu.Lock() defer q.mu.Unlock() for q.h.Len() == 0 && !q.closed { q.cond.Wait() } if q.closed && q.h.Len() == 0 { return nil } it := heap.Pop(&q.h).(*item) return it.job } // Len returns the number of jobs in the queue func (q *JobQueue) Len() int { q.mu.Lock() defer q.mu.Unlock() return q.h.Len() } // Close signals all waiting goroutines to stop func (q *JobQueue) Close() { q.mu.Lock() defer q.mu.Unlock() q.closed = true q.cond.Broadcast() }