Files
sponge/pkg/rabbitmq/subscriber_test.go
2024-12-19 13:31:29 +08:00

77 lines
1.6 KiB
Go

package rabbitmq
import (
"context"
"testing"
"time"
amqp "github.com/rabbitmq/amqp091-go"
"github.com/go-dev-frame/sponge/pkg/utils"
)
func TestSubscriber(t *testing.T) {
ctx, _ := context.WithTimeout(context.Background(), time.Second*3)
err := runPublisher(ctx, testChannelName)
if err != nil {
t.Log(err)
return
}
err = runSubscriber(ctx, testChannelName, "fanout-queue-1")
if err != nil {
t.Log(err)
return
}
err = runSubscriber(ctx, testChannelName, "fanout-queue-2")
if err != nil {
t.Log(err)
return
}
<-ctx.Done()
time.Sleep(time.Millisecond * 100)
}
func runSubscriber(ctx context.Context, channelName string, identifier string) error {
var subscriberErr error
utils.SafeRunWithTimeout(time.Second*3, func(cancel context.CancelFunc) {
defer cancel()
connection, err := NewConnection(url)
if err != nil {
subscriberErr = err
return
}
s, err := NewSubscriber(channelName, identifier, connection, WithConsumerAutoAck(false))
if err != nil {
subscriberErr = err
return
}
s.Subscribe(ctx, handler)
})
return subscriberErr
}
func TestSubscriberErr(t *testing.T) {
utils.SafeRunWithTimeout(time.Second, func(cancel context.CancelFunc) {
defer cancel()
_, err := NewSubscriber(testChannelName, "fanout-queue-1", &Connection{conn: &amqp.Connection{}})
if err != nil {
t.Log(err)
return
}
})
s := &Subscriber{&Consumer{connection: &Connection{conn: &amqp.Connection{}}, ch: &amqp.Channel{}}}
utils.SafeRun(context.Background(), func(ctx context.Context) {
s.Subscribe(context.Background(), handler)
})
utils.SafeRun(context.Background(), func(ctx context.Context) {
s.Close()
})
}