Files

145 lines
2.8 KiB
Go

// Package queue provides a priority queue for scan jobs.
// Jobs are ordered by tier priority (paid first) with FIFO tiebreaker.
package queue
import (
"container/heap"
"sync"
scanner "atcr.io/scanner"
)
// Priority levels: lower number = higher priority
const (
PriorityOwner = 0
PriorityQuartermaster = 1
PriorityBosun = 2
PriorityDeckhand = 3
)
// TierToPriority converts a tier name to a priority level
func TierToPriority(tier string) int {
switch tier {
case "owner":
return PriorityOwner
case "quartermaster":
return PriorityQuartermaster
case "bosun":
return PriorityBosun
default:
return PriorityDeckhand
}
}
// item wraps a ScanJob with priority metadata for the heap
type item struct {
job *scanner.ScanJob
priority int
seq int64 // FIFO tiebreaker
}
// priorityHeap implements heap.Interface
type priorityHeap []*item
func (h priorityHeap) Len() int { return len(h) }
func (h priorityHeap) Less(i, j int) bool {
if h[i].priority != h[j].priority {
return h[i].priority < h[j].priority
}
return h[i].seq < h[j].seq
}
func (h priorityHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h *priorityHeap) Push(x any) {
*h = append(*h, x.(*item))
}
func (h *priorityHeap) Pop() any {
old := *h
n := len(old)
it := old[n-1]
old[n-1] = nil
*h = old[:n-1]
return it
}
// JobQueue is a thread-safe priority queue for scan jobs.
// Workers block on Dequeue until a job is available.
type JobQueue struct {
mu sync.Mutex
cond *sync.Cond
h priorityHeap
maxSize int
closed bool
}
// NewJobQueue creates a new priority queue with the given max size
func NewJobQueue(maxSize int) *JobQueue {
q := &JobQueue{
h: make(priorityHeap, 0),
maxSize: maxSize,
}
q.cond = sync.NewCond(&q.mu)
heap.Init(&q.h)
return q
}
// Enqueue adds a job to the priority queue.
// Returns false if the queue is full.
func (q *JobQueue) Enqueue(job *scanner.ScanJob) bool {
q.mu.Lock()
defer q.mu.Unlock()
if q.closed {
return false
}
if q.h.Len() >= q.maxSize {
return false
}
heap.Push(&q.h, &item{
job: job,
priority: TierToPriority(job.Tier),
seq: job.Seq,
})
q.cond.Signal()
return true
}
// Dequeue blocks until a job is available and returns it.
// Returns nil if the queue is closed.
func (q *JobQueue) Dequeue() *scanner.ScanJob {
q.mu.Lock()
defer q.mu.Unlock()
for q.h.Len() == 0 && !q.closed {
q.cond.Wait()
}
if q.closed && q.h.Len() == 0 {
return nil
}
it := heap.Pop(&q.h).(*item)
return it.job
}
// Len returns the number of jobs in the queue
func (q *JobQueue) Len() int {
q.mu.Lock()
defer q.mu.Unlock()
return q.h.Len()
}
// Close signals all waiting goroutines to stop
func (q *JobQueue) Close() {
q.mu.Lock()
defer q.mu.Unlock()
q.closed = true
q.cond.Broadcast()
}