Implemented rlocking/runlocking of rwmutex in Channel IsActive and NumSubs receiver functions. Implemented PUBSUB NUMPAT handler unit test

This commit is contained in:
Kelvin Mwinuka
2024-03-20 12:36:03 +08:00
parent 852f23b36e
commit a19bfa8f73
3 changed files with 108 additions and 2 deletions

View File

@@ -617,6 +617,108 @@ func Test_HandlePubSubChannels(t *testing.T) {
}
}
func Test_HandleNumPat(t *testing.T) {}
func Test_HandleNumPat(t *testing.T) {
done := make(chan struct{})
go func() {
// Create separate mock server for this test
var port uint16 = 7591
pubsub = NewPubSub()
mockServer := server.NewServer(server.Opts{
PubSub: pubsub,
Commands: Commands(),
Config: utils.Config{
BindAddr: bindAddr,
Port: port,
DataDir: "",
EvictionPolicy: utils.NoEviction,
},
})
ctx := context.WithValue(context.Background(), "test_name", "PUBSUB NUMPAT")
patterns := []string{"pattern_[123]", "pattern_[456]", "pattern_[789]"}
connections := make([]struct {
w *net.Conn
r *resp.Conn
}, 3)
for i := 0; i < len(connections); i++ {
w, r := net.Pipe()
connections[i] = struct {
w *net.Conn
r *resp.Conn
}{w: &w, r: resp.NewConn(r)}
go func() {
if _, err := handleSubscribe(ctx, append([]string{"PSUBSCRIBE"}, patterns...), mockServer, &w); err != nil {
t.Error(err)
}
}()
for j := 0; j < len(patterns); j++ {
v, _, err := connections[i].r.ReadValue()
if err != nil {
t.Error(err)
}
arr := v.Array()
if !slices.ContainsFunc(patterns, func(s string) bool {
return s == arr[1].String()
}) {
t.Errorf("found unexpected pattern in response \"%s\"", arr[1].String())
}
}
}
verifyNumPatResponse := func(res []byte, expected int) {
rd := resp.NewReader(bytes.NewReader(res))
rv, _, err := rd.ReadValue()
if err != nil {
t.Error(err)
}
if rv.Integer() != expected {
t.Errorf("expected first NUMPAT response to be %d, got %d", expected, rv.Integer())
}
}
// Check that we receive all the patterns with NUMPAT commands
res, err := handlePubSubNumPat(ctx, []string{"PUBSUB", "NUMPAT"}, mockServer, nil)
if err != nil {
t.Error(err)
}
verifyNumPatResponse(res, len(patterns))
// Unsubscribe from a channel and check if the number of active channels is updated
for _, conn := range connections {
_, err = handleUnsubscribe(ctx, []string{"PUNSUBSCRIBE", patterns[0]}, mockServer, conn.w)
if err != nil {
t.Error(err)
}
}
res, err = handlePubSubNumPat(ctx, []string{"PUBSUB", "NUMPAT"}, mockServer, nil)
if err != nil {
t.Error(err)
}
verifyNumPatResponse(res, len(patterns)-1)
// Unsubscribe from all the channels and check if we get a 0 response
for _, conn := range connections {
_, err = handleUnsubscribe(ctx, []string{"PUNSUBSCRIBE"}, mockServer, conn.w)
if err != nil {
t.Error(err)
}
}
res, err = handlePubSubNumPat(ctx, []string{"PUBSUB", "NUMPAT"}, mockServer, nil)
if err != nil {
t.Error(err)
}
verifyNumPatResponse(res, 0)
done <- struct{}{}
}()
select {
case <-time.After(300 * time.Millisecond):
t.Error("timeout")
case <-done:
}
}
func Test_HandleNumSub(t *testing.T) {}