mirror of
https://github.com/zhufuyi/sponge.git
synced 2025-10-04 08:26:24 +08:00
114 lines
3.8 KiB
Go
114 lines
3.8 KiB
Go
package sasynq
|
|
|
|
import (
|
|
"fmt"
|
|
"log"
|
|
"math/rand"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/hibiken/asynq"
|
|
)
|
|
|
|
func runProducer(client *Client) error {
|
|
userPayload1 := &EmailPayload{UserID: 101, Message: "Critical Update"}
|
|
_, info1, err := client.EnqueueNow(TypeEmailSend, userPayload1, WithQueue("critical"), WithMaxRetry(5), asynq.Retention(60*time.Second))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
log.Printf("enqueued task: type=%s, id=%s queue=%s", TypeEmailSend, info1.ID, info1.Queue)
|
|
|
|
userPayload2 := &SMSPayload{UserID: 202, Message: "Weekly Newsletter"}
|
|
_, info2, err := client.EnqueueIn(time.Second*5, TypeSMSSend, userPayload2, WithQueue("default"), WithMaxRetry(3), asynq.Retention(60*time.Second))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
log.Printf("enqueued task: type=%s, id=%s queue=%s", TypeSMSSend, info2.ID, info2.Queue)
|
|
cancelTask("default", info2.ID, true) // cancel task will succeed
|
|
|
|
userPayload3 := &MsgNotificationPayload{UserID: 303, Message: "Promotional Offer"}
|
|
_, info3, err := client.EnqueueAt(time.Now().Add(time.Second*10), TypeMsgNotification, userPayload3, WithQueue("low"), WithMaxRetry(1), asynq.Retention(60*time.Second))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
log.Printf("enqueued task: type=%s, id=%s queue=%s", TypeMsgNotification, info3.ID, info3.Queue)
|
|
cancelTask("low", info3.ID, true) // cancel task will succeed
|
|
|
|
userPayload4 := &UniqueTaskPayload{UserID: 404, Message: "unique task"}
|
|
_, info4, err := client.EnqueueUnique(time.Minute, TypeUniqueTask, userPayload4, WithQueue("default"), WithMaxRetry(2))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
log.Printf("enqueued task: type=%s, id=%s queue=%s", TypeUniqueTask, info4.ID, info4.Queue)
|
|
_, _, err = client.EnqueueUnique(time.Minute, TypeUniqueTask, userPayload4, WithQueue("default"), WithMaxRetry(2))
|
|
if err != nil {
|
|
log.Printf("triggered duplicate task error:%v", err)
|
|
}
|
|
|
|
// Equivalent EnqueueNow function
|
|
userPayload5 := &EmailPayload{UserID: 505, Message: "Important Notification"}
|
|
task, err := NewTask(TypeEmailSend, userPayload5)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
info5, err := client.Enqueue(task, WithQueue("low"), WithMaxRetry(3), WithDeadline(time.Now().Add(time.Second*15)), WithTaskID(fmt.Sprintf("unique-%d", rand.Int63n(1e10))), asynq.Retention(60*time.Second))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
log.Printf("enqueued task: type=%s, id=%s queue=%s", TypeEmailSend, info5.ID, info5.Queue)
|
|
|
|
return nil
|
|
}
|
|
|
|
func cancelTask(queue string, taskID string, isScheduled bool) {
|
|
fmt.Println()
|
|
defer fmt.Println()
|
|
time.Sleep(time.Second)
|
|
|
|
inspector := NewInspector(getRedisConfig())
|
|
|
|
info, err := inspector.GetTaskInfo(queue, taskID)
|
|
if err != nil {
|
|
log.Printf("get task info failed: %s, queue=%s, taskID=%s", err, queue, taskID)
|
|
return
|
|
}
|
|
log.Printf("task status: type=%s, id=%s queue=%s, status=%s", info.Type, info.ID, info.Queue, info.State.String())
|
|
if info.State == asynq.TaskStateCompleted {
|
|
return
|
|
}
|
|
|
|
time.Sleep(time.Millisecond * 100)
|
|
if isScheduled {
|
|
err = inspector.CancelTask(queue, info.ID)
|
|
} else {
|
|
err = inspector.CancelTask("", info.ID) // queue is empty string for non-scheduled tasks
|
|
}
|
|
|
|
if err != nil {
|
|
log.Printf("cancel task failed: %s, queue=%s, taskID=%s", err, queue, info.ID)
|
|
return
|
|
}
|
|
log.Printf("cancel task succeeded: type=%s, id=%s queue=%s", info.Type, info.ID, info.Queue)
|
|
|
|
time.Sleep(time.Millisecond * 100)
|
|
info2, err := inspector.GetTaskInfo(queue, info.ID)
|
|
if err != nil {
|
|
log.Printf("get task info after cancel failed: %s, queue=%s, taskID=%s", err, queue, info.ID)
|
|
return
|
|
}
|
|
log.Printf("get task status after cancel: type=%s, id=%s queue=%s, status=%s", info2.Type, info2.ID, info2.Queue, info2.State.String())
|
|
}
|
|
|
|
func TestProducer(t *testing.T) {
|
|
client := NewClient(getRedisConfig())
|
|
|
|
err := runProducer(client)
|
|
if err != nil {
|
|
t.Log("run producer failed:", err)
|
|
return
|
|
}
|
|
defer client.Close()
|
|
|
|
log.Println("all tasks enqueued")
|
|
}
|