package eventbus import ( "sync" "testing" "time" "github.com/stretchr/testify/assert" ) func busHandlerOne(topic string, val int) { } func busHandlerTwo(topic string, val int) { } func Test_newChannel(t *testing.T) { ch := newChannel("test_topic", -1) assert.NotNil(t, ch) assert.NotNil(t, ch.channel) assert.Equal(t, "test_topic", ch.topic) assert.NotNil(t, ch.stopCh) assert.NotNil(t, ch.handlers) ch.close() bufferedCh := newChannel("test_topic", 100) assert.NotNil(t, bufferedCh) assert.NotNil(t, bufferedCh.channel) assert.Equal(t, 100, cap(bufferedCh.channel)) assert.Equal(t, "test_topic", bufferedCh.topic) assert.NotNil(t, bufferedCh.stopCh) assert.NotNil(t, bufferedCh.handlers) bufferedCh.close() bufferedZeroCh := newChannel("test_topic", 0) assert.NotNil(t, bufferedZeroCh) assert.NotNil(t, bufferedZeroCh.channel) assert.Equal(t, "test_topic", bufferedZeroCh.topic) assert.NotNil(t, bufferedZeroCh.stopCh) assert.NotNil(t, bufferedZeroCh.handlers) bufferedZeroCh.close() } func Test_channelSubscribe(t *testing.T) { ch := newChannel("test_topic", -1) assert.NotNil(t, ch) assert.NotNil(t, ch.channel) assert.Equal(t, "test_topic", ch.topic) err := ch.subscribe(busHandlerOne) assert.Nil(t, err) ch.close() err = ch.subscribe(busHandlerTwo) assert.Equal(t, ErrChannelClosed, err) } func Test_channelUnsubscribe(t *testing.T) { ch := newChannel("test_topic", -1) assert.NotNil(t, ch) assert.NotNil(t, ch.channel) assert.Equal(t, "test_topic", ch.topic) err := ch.subscribe(busHandlerOne) assert.Nil(t, err) err = ch.unsubscribe(busHandlerOne) assert.Nil(t, err) err = ch.subscribe(busHandlerOne) assert.Nil(t, err) ch.close() err = ch.unsubscribe(busHandlerTwo) assert.Equal(t, ErrChannelClosed, err) } func Test_channelClose(t *testing.T) { ch := newChannel("test_topic", -1) assert.NotNil(t, ch) assert.NotNil(t, ch.channel) assert.Equal(t, "test_topic", ch.topic) err := ch.subscribe(busHandlerOne) assert.Nil(t, err) ch.close() assert.Equal(t, uint32(0), ch.handlers.Len()) ch.close() } func Test_channelPublish(t *testing.T) { ch := newChannel("test_topic", -1) assert.NotNil(t, ch) assert.NotNil(t, ch.channel) assert.Equal(t, "test_topic", ch.topic) ch.subscribe(busHandlerOne) var wg sync.WaitGroup wg.Add(1) go func() { for i := 0; i < 100; i++ { err := ch.publish(i) assert.Nil(t, err) } wg.Done() }() wg.Wait() err := ch.publish(nil) assert.Nil(t, err) time.Sleep(time.Millisecond) ch.close() err = ch.publish(1) assert.Equal(t, ErrChannelClosed, err) } func Test_channelPublishSync(t *testing.T) { ch := newChannel("test_topic", -1) assert.NotNil(t, ch) assert.NotNil(t, ch.channel) assert.Equal(t, "test_topic", ch.topic) ch.subscribe(busHandlerOne) var wg sync.WaitGroup wg.Add(1) go func() { for i := 0; i < 100; i++ { err := ch.publish(i) assert.Nil(t, err) } wg.Done() }() wg.Wait() err := ch.publishSync(nil) assert.Nil(t, err) time.Sleep(time.Millisecond) ch.close() err = ch.publishSync(1) assert.Equal(t, ErrChannelClosed, err) } func Test_New(t *testing.T) { bus := New() assert.NotNil(t, bus) assert.Equal(t, -1, bus.bufferSize) assert.NotNil(t, bus.channels) bus.Close() } func Test_NewBuffered(t *testing.T) { bus := NewBuffered(100) assert.NotNil(t, bus) assert.Equal(t, 100, bus.bufferSize) assert.NotNil(t, bus.channels) bus.Close() busZero := NewBuffered(0) assert.NotNil(t, busZero) assert.Equal(t, 1, busZero.bufferSize) assert.NotNil(t, busZero.channels) busZero.Close() } func Test_EventBusSubscribe(t *testing.T) { bus := New() assert.NotNil(t, bus) assert.Equal(t, -1, bus.bufferSize) assert.NotNil(t, bus.channels) err := bus.Subscribe("testtopic", busHandlerOne) assert.Nil(t, err) err = bus.Subscribe("testtopic", 1) assert.Equal(t, ErrHandlerIsNotFunc, err) err = bus.Subscribe("testtopic", func(topic string) error { return nil }) assert.Equal(t, ErrHandlerParamNum, err) err = bus.Subscribe("testtopic", func(topic int, payload int) error { return nil }) assert.Equal(t, ErrHandlerFirstParam, err) bus.Close() err = bus.Unsubscribe("testtopic", busHandlerTwo) assert.Equal(t, ErrChannelClosed, err) } func Test_EventBusUnsubscribe(t *testing.T) { bus := New() assert.NotNil(t, bus) assert.Equal(t, -1, bus.bufferSize) assert.NotNil(t, bus.channels) err := bus.Unsubscribe("testtopic", busHandlerOne) assert.Equal(t, ErrNoSubscriber, err) err = bus.Subscribe("testtopic", busHandlerOne) assert.Nil(t, err) err = bus.Unsubscribe("testtopic", busHandlerOne) assert.Nil(t, err) bus.Close() err = bus.Unsubscribe("testtopic", busHandlerTwo) assert.Equal(t, ErrChannelClosed, err) } func Test_EventBusPublish(t *testing.T) { bus := New() assert.NotNil(t, bus) assert.Equal(t, -1, bus.bufferSize) assert.NotNil(t, bus.channels) err := bus.Publish("testtopic", 1) assert.Nil(t, err) err = bus.Subscribe("testtopic", busHandlerOne) assert.Nil(t, err) var wg sync.WaitGroup wg.Add(1) go func() { for i := 0; i < 100; i++ { err := bus.Publish("testtopic", i) assert.Nil(t, err) } wg.Done() }() wg.Wait() bus.Close() } func Test_EventBusPublishSync(t *testing.T) { bus := New() assert.NotNil(t, bus) assert.Equal(t, -1, bus.bufferSize) assert.NotNil(t, bus.channels) err := bus.PublishSync("testtopic", 1) assert.Nil(t, err) err = bus.Subscribe("testtopic", busHandlerOne) assert.Nil(t, err) var wg sync.WaitGroup wg.Add(1) go func() { for i := 0; i < 100; i++ { err := bus.PublishSync("testtopic", i) assert.Nil(t, err) } wg.Done() }() wg.Wait() bus.Close() } func BenchmarkEventBusPublish(b *testing.B) { bus := New() bus.Subscribe("testtopic", busHandlerOne) b.ResetTimer() var wg sync.WaitGroup wg.Add(1) go func() { for i := 0; i < b.N; i++ { bus.Publish("testtopic", i) } wg.Done() }() wg.Wait() bus.Close() } func BenchmarkEventBusPublishSync(b *testing.B) { bus := New() bus.Subscribe("testtopic", busHandlerOne) b.ResetTimer() var wg sync.WaitGroup wg.Add(1) go func() { for i := 0; i < b.N; i++ { bus.PublishSync("testtopic", i) } wg.Done() }() wg.Wait() bus.Close() }