Files
sponge/pkg/sasynq/client_test.go
2025-07-31 20:15:52 +08:00

114 lines
3.8 KiB
Go

package sasynq
import (
"fmt"
"log"
"math/rand"
"testing"
"time"
"github.com/hibiken/asynq"
)
// runProducer enqueues a fixed set of sample tasks through client: one
// immediate, one delayed, one scheduled at an absolute time, one unique
// (submitted twice), and one built by hand with NewTask. The delayed and
// scheduled tasks are handed to cancelTask right after they are enqueued.
//
// It returns the first error from an enqueue or from NewTask. An error from
// the second, duplicate unique submission is only logged.
func runProducer(client *Client) error {
	const enqueuedFormat = "enqueued task: type=%s, id=%s queue=%s"

	// The same retention option is attached to every task except the unique one.
	retention := asynq.Retention(60 * time.Second)

	// 1. Immediate task on the "critical" queue.
	emailNow := &EmailPayload{UserID: 101, Message: "Critical Update"}
	_, emailInfo, err := client.EnqueueNow(TypeEmailSend, emailNow, WithQueue("critical"), WithMaxRetry(5), retention)
	if err != nil {
		return err
	}
	log.Printf(enqueuedFormat, TypeEmailSend, emailInfo.ID, emailInfo.Queue)

	// 2. Task delayed by five seconds on the "default" queue, then cancelled.
	smsLater := &SMSPayload{UserID: 202, Message: "Weekly Newsletter"}
	_, smsInfo, err := client.EnqueueIn(5*time.Second, TypeSMSSend, smsLater, WithQueue("default"), WithMaxRetry(3), retention)
	if err != nil {
		return err
	}
	log.Printf(enqueuedFormat, TypeSMSSend, smsInfo.ID, smsInfo.Queue)
	cancelTask("default", smsInfo.ID, true) // the cancel is expected to succeed

	// 3. Task scheduled ten seconds from now on the "low" queue, then cancelled.
	notifyAt := &MsgNotificationPayload{UserID: 303, Message: "Promotional Offer"}
	runAt := time.Now().Add(10 * time.Second)
	_, notifyInfo, err := client.EnqueueAt(runAt, TypeMsgNotification, notifyAt, WithQueue("low"), WithMaxRetry(1), retention)
	if err != nil {
		return err
	}
	log.Printf(enqueuedFormat, TypeMsgNotification, notifyInfo.ID, notifyInfo.Queue)
	cancelTask("low", notifyInfo.ID, true) // the cancel is expected to succeed

	// 4. Unique task with a one-minute window.
	uniquePayload := &UniqueTaskPayload{UserID: 404, Message: "unique task"}
	_, uniqueInfo, err := client.EnqueueUnique(time.Minute, TypeUniqueTask, uniquePayload, WithQueue("default"), WithMaxRetry(2))
	if err != nil {
		return err
	}
	log.Printf(enqueuedFormat, TypeUniqueTask, uniqueInfo.ID, uniqueInfo.Queue)

	// Submit the identical unique task again; an error here is logged, not returned.
	if _, _, dupErr := client.EnqueueUnique(time.Minute, TypeUniqueTask, uniquePayload, WithQueue("default"), WithMaxRetry(2)); dupErr != nil {
		log.Printf("triggered duplicate task error:%v", dupErr)
	}

	// 5. Build the task explicitly and pass it to Enqueue, with a deadline
	// and a randomly generated task ID.
	emailManual := &EmailPayload{UserID: 505, Message: "Important Notification"}
	task, err := NewTask(TypeEmailSend, emailManual)
	if err != nil {
		return err
	}
	deadline := time.Now().Add(15 * time.Second)
	taskID := fmt.Sprintf("unique-%d", rand.Int63n(1e10))
	manualInfo, err := client.Enqueue(task, WithQueue("low"), WithMaxRetry(3), WithDeadline(deadline), WithTaskID(taskID), retention)
	if err != nil {
		return err
	}
	log.Printf(enqueuedFormat, TypeEmailSend, manualInfo.ID, manualInfo.Queue)

	return nil
}
// cancelTask looks up task taskID in queue, logs its state, and requests a
// cancel unless the task has already completed. After a successful cancel it
// fetches the task once more and logs the resulting state. Any failure is
// logged and ends the function; nothing is reported back to the caller.
//
// isScheduled selects the queue argument given to CancelTask: true passes
// queue, false passes an empty queue name.
func cancelTask(queue string, taskID string, isScheduled bool) {
	// Print a blank line on stdout now and another when the function returns.
	fmt.Println()
	defer fmt.Println()

	// Short pause used before the cancel request and before the re-check.
	const pause = 100 * time.Millisecond

	time.Sleep(time.Second)

	insp := NewInspector(getRedisConfig())
	before, err := insp.GetTaskInfo(queue, taskID)
	if err != nil {
		log.Printf("get task info failed: %s, queue=%s, taskID=%s", err, queue, taskID)
		return
	}
	log.Printf("task status: type=%s, id=%s queue=%s, status=%s", before.Type, before.ID, before.Queue, before.State.String())

	// A completed task is left alone.
	if before.State == asynq.TaskStateCompleted {
		return
	}

	time.Sleep(pause)

	// Non-scheduled tasks are cancelled with an empty queue name.
	cancelQueue := ""
	if isScheduled {
		cancelQueue = queue
	}
	if err = insp.CancelTask(cancelQueue, before.ID); err != nil {
		log.Printf("cancel task failed: %s, queue=%s, taskID=%s", err, queue, before.ID)
		return
	}
	log.Printf("cancel task succeeded: type=%s, id=%s queue=%s", before.Type, before.ID, before.Queue)

	time.Sleep(pause)

	after, err := insp.GetTaskInfo(queue, before.ID)
	if err != nil {
		log.Printf("get task info after cancel failed: %s, queue=%s, taskID=%s", err, queue, before.ID)
		return
	}
	log.Printf("get task status after cancel: type=%s, id=%s queue=%s, status=%s", after.Type, after.ID, after.Queue, after.State.String())
}
// TestProducer builds a client from getRedisConfig and runs runProducer
// against it. A producer error is reported with t.Log and ends the test
// early without marking it failed.
func TestProducer(t *testing.T) {
	client := NewClient(getRedisConfig())
	// Register Close right after construction so the client is released on
	// the early-return error path too, not only on success.
	defer client.Close()

	if err := runProducer(client); err != nil {
		t.Log("run producer failed:", err)
		return
	}
	log.Println("all tasks enqueued")
}