code optimization

code optimization
This commit is contained in:
werben
2023-04-23 14:40:14 +08:00
parent 58f091fb98
commit a16344e4ce
5 changed files with 27 additions and 22 deletions

View File

@@ -35,6 +35,8 @@ func main() {
bus.Subscribe("testtopic", handler)
bus.Publish("testtopic", 100)
// Subscribers receive messages asynchronously,
// So delay for a while before unsubscribe
time.Sleep(time.Millisecond)
bus.Unsubscribe("testtopic", handler)
bus.Close()
@@ -66,6 +68,8 @@ func main() {
}(pipe)
wg.Wait()
// Subscribers receive messages asynchronously,
// So delay for a while before unsubscribe
time.Sleep(time.Millisecond)
pipe.Unsubscribe(handler1)
pipe.Unsubscribe(handler2)

View File

@@ -9,10 +9,12 @@ type err struct {
Code int
}
// String return the error's message
func (e err) String() string {
return e.Msg
}
// Error return the error's message
func (e err) Error() string {
return e.Msg
}

View File

@@ -146,6 +146,8 @@ func (e *EventBus) Unsubscribe(topic string, handler any) error {
}
// Subscribe subscribes to a topic, return error if handler is not a function.
// handler must be a function, and must have two parameters, the first parameter must be a string,
// the type of the handler's second parameter must be consistent with the type of the payload in Publish
func (e *EventBus) Subscribe(topic string, handler any) error {
typ := reflect.TypeOf(handler)
if typ.Kind() != reflect.Func {
@@ -168,6 +170,7 @@ func (e *EventBus) Subscribe(topic string, handler any) error {
}
// Publish trigger handlers defined for a topic. payload argument will be transferred to the handler.
// The type of the payload must correspond to the second parameter of the handler in Subscribe
func (e *EventBus) Publish(topic string, payload any) error {
ch, ok := e.channels.Load(topic)

View File

@@ -7,12 +7,10 @@ import (
"github.com/stretchr/testify/assert"
)
func sub1(topic string, val int) {
// fmt.Printf("sub1 topic:%s, val:%d\n", topic, val)
func busHandlerOne(topic string, val int) {
}
func sub2(topic string, val int) {
// fmt.Printf("sub2 topic:%s, val:%d\n", topic, val)
func busHandlerTwo(topic string, val int) {
}
func Test_newChannel(t *testing.T) {
@@ -31,10 +29,10 @@ func Test_channelSubscribe(t *testing.T) {
assert.NotNil(t, ch.channel)
assert.Equal(t, "test_topic", ch.topic)
err := ch.subscribe(sub1)
err := ch.subscribe(busHandlerOne)
assert.Nil(t, err)
ch.close()
err = ch.subscribe(sub2)
err = ch.subscribe(busHandlerTwo)
assert.Equal(t, ErrChannelClosed, err)
}
@@ -44,15 +42,15 @@ func Test_channelUnsubscribe(t *testing.T) {
assert.NotNil(t, ch.channel)
assert.Equal(t, "test_topic", ch.topic)
err := ch.subscribe(sub1)
err := ch.subscribe(busHandlerOne)
assert.Nil(t, err)
err = ch.unsubscribe(sub1)
err = ch.unsubscribe(busHandlerOne)
assert.Nil(t, err)
err = ch.subscribe(sub1)
err = ch.subscribe(busHandlerOne)
assert.Nil(t, err)
ch.close()
err = ch.subscribe(sub2)
err = ch.subscribe(busHandlerTwo)
assert.Equal(t, ErrChannelClosed, err)
}
@@ -62,7 +60,7 @@ func Test_channelPublish(t *testing.T) {
assert.NotNil(t, ch.channel)
assert.Equal(t, "test_topic", ch.topic)
ch.subscribe(sub1)
ch.subscribe(busHandlerOne)
time.Sleep(time.Millisecond)
go func() {

View File

@@ -8,12 +8,10 @@ import (
"github.com/stretchr/testify/assert"
)
func pipeSubOne(val int) {
// fmt.Printf("pipeSubOne:%d\n", val)
func pipeHandlerOne(val int) {
}
func pipeSubTwo(val int) {
// fmt.Printf("pipeSubTwo:%d\n", val)
func pipeHandlerTwo(val int) {
}
func Test_NewPipe(t *testing.T) {
@@ -30,10 +28,10 @@ func Test_PipeSubscribe(t *testing.T) {
assert.NotNil(t, p)
assert.NotNil(t, p.channel)
err := p.Subscribe(pipeSubOne)
err := p.Subscribe(pipeHandlerOne)
assert.Nil(t, err)
p.Close()
err = p.Subscribe(pipeSubTwo)
err = p.Subscribe(pipeHandlerTwo)
assert.Equal(t, ErrChannelClosed, err)
}
@@ -42,15 +40,15 @@ func Test_PipeUnsubscribe(t *testing.T) {
assert.NotNil(t, p)
assert.NotNil(t, p.channel)
err := p.Subscribe(pipeSubOne)
err := p.Subscribe(pipeHandlerOne)
assert.Nil(t, err)
err = p.Unsubscribe(pipeSubOne)
err = p.Unsubscribe(pipeHandlerOne)
assert.Nil(t, err)
err = p.Subscribe(pipeSubOne)
err = p.Subscribe(pipeHandlerOne)
assert.Nil(t, err)
p.Close()
err = p.Unsubscribe(pipeSubOne)
err = p.Unsubscribe(pipeHandlerOne)
assert.Equal(t, ErrChannelClosed, err)
}
@@ -59,7 +57,7 @@ func Test_PipePublish(t *testing.T) {
assert.NotNil(t, p)
assert.NotNil(t, p.channel)
err := p.Subscribe(pipeSubOne)
err := p.Subscribe(pipeHandlerOne)
time.Sleep(time.Millisecond)
var wg sync.WaitGroup