// Package queue provides a priority queue for scan jobs.
// Jobs are ordered by tier priority (paid first), with a FIFO tiebreaker.
package queue
|
|
|
|
import (
|
|
"container/heap"
|
|
"sync"
|
|
|
|
scanner "atcr.io/scanner"
|
|
)
|
|
|
|
// Priority levels used to order queued jobs. A lower number means a higher
// priority.
const (
	// PriorityOwner is the highest priority (tier "owner").
	PriorityOwner = iota
	// PriorityQuartermaster ranks second (tier "quartermaster").
	PriorityQuartermaster
	// PriorityBosun ranks third (tier "bosun").
	PriorityBosun
	// PriorityDeckhand is the lowest priority and the fallback for any
	// other tier.
	PriorityDeckhand
)
|
|
|
|
// TierToPriority converts a tier name to a priority level
|
|
func TierToPriority(tier string) int {
|
|
switch tier {
|
|
case "owner":
|
|
return PriorityOwner
|
|
case "quartermaster":
|
|
return PriorityQuartermaster
|
|
case "bosun":
|
|
return PriorityBosun
|
|
default:
|
|
return PriorityDeckhand
|
|
}
|
|
}
|
|
|
|
// item wraps a ScanJob with priority metadata for the heap
|
|
type item struct {
|
|
job *scanner.ScanJob
|
|
priority int
|
|
seq int64 // FIFO tiebreaker
|
|
}
|
|
|
|
// priorityHeap implements heap.Interface
|
|
type priorityHeap []*item
|
|
|
|
// Len reports how many items the heap currently holds.
func (h priorityHeap) Len() int {
	return len(h)
}
|
|
|
|
func (h priorityHeap) Less(i, j int) bool {
|
|
if h[i].priority != h[j].priority {
|
|
return h[i].priority < h[j].priority
|
|
}
|
|
return h[i].seq < h[j].seq
|
|
}
|
|
|
|
// Swap exchanges the items at indices i and j.
func (h priorityHeap) Swap(i, j int) {
	h[i], h[j] = h[j], h[i]
}
|
|
|
|
func (h *priorityHeap) Push(x any) {
|
|
*h = append(*h, x.(*item))
|
|
}
|
|
|
|
func (h *priorityHeap) Pop() any {
|
|
old := *h
|
|
n := len(old)
|
|
it := old[n-1]
|
|
old[n-1] = nil
|
|
*h = old[:n-1]
|
|
return it
|
|
}
|
|
|
|
// JobQueue is a thread-safe priority queue for scan jobs.
|
|
// Workers block on Dequeue until a job is available.
|
|
type JobQueue struct {
|
|
mu sync.Mutex
|
|
cond *sync.Cond
|
|
h priorityHeap
|
|
maxSize int
|
|
closed bool
|
|
}
|
|
|
|
// NewJobQueue creates a new priority queue with the given max size
|
|
func NewJobQueue(maxSize int) *JobQueue {
|
|
q := &JobQueue{
|
|
h: make(priorityHeap, 0),
|
|
maxSize: maxSize,
|
|
}
|
|
q.cond = sync.NewCond(&q.mu)
|
|
heap.Init(&q.h)
|
|
return q
|
|
}
|
|
|
|
// Enqueue adds a job to the priority queue.
|
|
// Returns false if the queue is full.
|
|
func (q *JobQueue) Enqueue(job *scanner.ScanJob) bool {
|
|
q.mu.Lock()
|
|
defer q.mu.Unlock()
|
|
|
|
if q.closed {
|
|
return false
|
|
}
|
|
|
|
if q.h.Len() >= q.maxSize {
|
|
return false
|
|
}
|
|
|
|
heap.Push(&q.h, &item{
|
|
job: job,
|
|
priority: TierToPriority(job.Tier),
|
|
seq: job.Seq,
|
|
})
|
|
|
|
q.cond.Signal()
|
|
return true
|
|
}
|
|
|
|
// Dequeue blocks until a job is available and returns it.
|
|
// Returns nil if the queue is closed.
|
|
func (q *JobQueue) Dequeue() *scanner.ScanJob {
|
|
q.mu.Lock()
|
|
defer q.mu.Unlock()
|
|
|
|
for q.h.Len() == 0 && !q.closed {
|
|
q.cond.Wait()
|
|
}
|
|
|
|
if q.closed && q.h.Len() == 0 {
|
|
return nil
|
|
}
|
|
|
|
it := heap.Pop(&q.h).(*item)
|
|
return it.job
|
|
}
|
|
|
|
// Len returns the number of jobs in the queue
|
|
func (q *JobQueue) Len() int {
|
|
q.mu.Lock()
|
|
defer q.mu.Unlock()
|
|
return q.h.Len()
|
|
}
|
|
|
|
// Close signals all waiting goroutines to stop
|
|
func (q *JobQueue) Close() {
|
|
q.mu.Lock()
|
|
defer q.mu.Unlock()
|
|
q.closed = true
|
|
q.cond.Broadcast()
|
|
}
|