diff --git a/event_bus.go b/event_bus.go index b449c38..39a9462 100644 --- a/event_bus.go +++ b/event_bus.go @@ -45,7 +45,6 @@ type eventHandler struct { flagOnce bool async bool transactional bool - called bool sync.Mutex // lock for an event handler - useful for running async callbacks serially } @@ -74,7 +73,7 @@ func (bus *EventBus) doSubscribe(topic string, fn interface{}, handler *eventHan // Returns error if `fn` is not a function. func (bus *EventBus) Subscribe(topic string, fn interface{}) error { return bus.doSubscribe(topic, fn, &eventHandler{ - reflect.ValueOf(fn), false, false, false, false, sync.Mutex{}, + reflect.ValueOf(fn), false, false, false, sync.Mutex{}, }) } @@ -84,7 +83,7 @@ func (bus *EventBus) Subscribe(topic string, fn interface{}) error { // Returns error if `fn` is not a function. func (bus *EventBus) SubscribeAsync(topic string, fn interface{}, transactional bool) error { return bus.doSubscribe(topic, fn, &eventHandler{ - reflect.ValueOf(fn), false, true, transactional, false, sync.Mutex{}, + reflect.ValueOf(fn), false, true, transactional, sync.Mutex{}, }) } @@ -92,7 +91,7 @@ func (bus *EventBus) SubscribeAsync(topic string, fn interface{}, transactional // Returns error if `fn` is not a function. func (bus *EventBus) SubscribeOnce(topic string, fn interface{}) error { return bus.doSubscribe(topic, fn, &eventHandler{ - reflect.ValueOf(fn), true, false, false, false, sync.Mutex{}, + reflect.ValueOf(fn), true, false, false, sync.Mutex{}, }) } @@ -101,7 +100,7 @@ func (bus *EventBus) SubscribeOnce(topic string, fn interface{}) error { // Returns error if `fn` is not a function. func (bus *EventBus) SubscribeOnceAsync(topic string, fn interface{}) error { return bus.doSubscribe(topic, fn, &eventHandler{ - reflect.ValueOf(fn), true, true, false, false, sync.Mutex{}, + reflect.ValueOf(fn), true, true, false, sync.Mutex{}, }) } @@ -122,7 +121,7 @@ func (bus *EventBus) Unsubscribe(topic string, handler interface{}) error { bus.lock.Lock() defer bus.lock.Unlock() if _, ok := bus.handlers[topic]; ok && len(bus.handlers[topic]) > 0 { - bus.removeHandler(topic, reflect.ValueOf(handler)) + bus.removeHandler(topic, bus.findHandlerIdx(topic, reflect.ValueOf(handler))) return nil } return fmt.Errorf("topic %s doesn't exist", topic) @@ -133,7 +132,10 @@ func (bus *EventBus) Publish(topic string, args ...interface{}) { bus.lock.Lock() // will unlock if handler is not found or always after setUpPublish defer bus.lock.Unlock() if handlers, ok := bus.handlers[topic]; ok { - for _, handler := range handlers { + for i, handler := range handlers { + if handler.flagOnce { + bus.removeHandler(topic, i) + } if !handler.async { bus.doPublish(handler, topic, args...) } else { @@ -149,13 +151,6 @@ func (bus *EventBus) Publish(topic string, args ...interface{}) { func (bus *EventBus) doPublish(handler *eventHandler, topic string, args ...interface{}) { passedArguments := bus.setUpPublish(topic, args...) - if handler.flagOnce { - bus.removeHandler(topic, handler.callBack) - if handler.called { - return - } - } - handler.called = true handler.callBack.Call(passedArguments) } @@ -167,6 +162,17 @@ func (bus *EventBus) doPublishAsync(handler *eventHandler, topic string, args .. bus.doPublish(handler, topic, args...) } +func (bus *EventBus) removeHandler(topic string, idx int) { + if _, ok := bus.handlers[topic]; !ok { + return + } + l := len(bus.handlers[topic]) + + copy(bus.handlers[topic][idx:], bus.handlers[topic][idx+1:]) + bus.handlers[topic][l-1] = nil // or the zero value of T + bus.handlers[topic] = bus.handlers[topic][:l-1] +} + func (bus *EventBus) findHandlerIdx(topic string, callback reflect.Value) int { if _, ok := bus.handlers[topic]; ok { for idx, handler := range bus.handlers[topic] { @@ -178,13 +184,6 @@ func (bus *EventBus) findHandlerIdx(topic string, callback reflect.Value) int { return -1 } -func (bus *EventBus) removeHandler(topic string, callback reflect.Value) { - i := bus.findHandlerIdx(topic, callback) - if i >= 0 { - bus.handlers[topic] = append(bus.handlers[topic][:i], bus.handlers[topic][i+1:]...) - } -} - func (bus *EventBus) setUpPublish(topic string, args ...interface{}) []reflect.Value { passedArguments := make([]reflect.Value, 0)