package mq import ( "context" "github.com/oarkflow/mq/storage" "github.com/oarkflow/mq/storage/memory" ) type Queue struct { consumers storage.IMap[string, *consumer] tasks chan *QueuedTask // channel to hold tasks name string } func newQueue(name string, queueSize int) *Queue { return &Queue{ name: name, consumers: memory.New[string, *consumer](), tasks: make(chan *QueuedTask, queueSize), // buffer size for tasks } } func (b *Broker) NewQueue(qName string) *Queue { q, ok := b.queues.Get(qName) if ok { return q } q = newQueue(qName, b.opts.queueSize) b.queues.Set(qName, q) go b.dispatchWorker(q) return q } type QueueTask struct { ctx context.Context payload *Task priority int } type PriorityQueue []*QueueTask func (pq PriorityQueue) Len() int { return len(pq) } func (pq PriorityQueue) Less(i, j int) bool { return pq[i].priority > pq[j].priority } func (pq PriorityQueue) Swap(i, j int) { pq[i], pq[j] = pq[j], pq[i] } func (pq *PriorityQueue) Push(x interface{}) { item := x.(*QueueTask) *pq = append(*pq, item) } func (pq *PriorityQueue) Pop() interface{} { old := *pq n := len(old) item := old[n-1] *pq = old[0 : n-1] return item }