refac(remove handler): refactored remove handler logic and fixed concurrent map access error

Refactored removeHandler to avoid memory leaks when removing pointers from slice.
Also fixed concurrent map access when the flagOnce is set on a handler and it gets removed inside async Publish. Moved the removal into the synchronous method call.
This commit is contained in:
Michal Klimuk
2017-03-15 10:23:39 +01:00
parent 57e0dbb612
commit 4958b1ef4b

View File

@@ -45,7 +45,6 @@ type eventHandler struct {
flagOnce bool
async bool
transactional bool
called bool
sync.Mutex // lock for an event handler - useful for running async callbacks serially
}
@@ -74,7 +73,7 @@ func (bus *EventBus) doSubscribe(topic string, fn interface{}, handler *eventHan
// Returns error if `fn` is not a function.
func (bus *EventBus) Subscribe(topic string, fn interface{}) error {
return bus.doSubscribe(topic, fn, &eventHandler{
reflect.ValueOf(fn), false, false, false, false, sync.Mutex{},
reflect.ValueOf(fn), false, false, false, sync.Mutex{},
})
}
@@ -84,7 +83,7 @@ func (bus *EventBus) Subscribe(topic string, fn interface{}) error {
// Returns error if `fn` is not a function.
func (bus *EventBus) SubscribeAsync(topic string, fn interface{}, transactional bool) error {
return bus.doSubscribe(topic, fn, &eventHandler{
reflect.ValueOf(fn), false, true, transactional, false, sync.Mutex{},
reflect.ValueOf(fn), false, true, transactional, sync.Mutex{},
})
}
@@ -92,7 +91,7 @@ func (bus *EventBus) SubscribeAsync(topic string, fn interface{}, transactional
// Returns error if `fn` is not a function.
func (bus *EventBus) SubscribeOnce(topic string, fn interface{}) error {
return bus.doSubscribe(topic, fn, &eventHandler{
reflect.ValueOf(fn), true, false, false, false, sync.Mutex{},
reflect.ValueOf(fn), true, false, false, sync.Mutex{},
})
}
@@ -101,7 +100,7 @@ func (bus *EventBus) SubscribeOnce(topic string, fn interface{}) error {
// Returns error if `fn` is not a function.
func (bus *EventBus) SubscribeOnceAsync(topic string, fn interface{}) error {
return bus.doSubscribe(topic, fn, &eventHandler{
reflect.ValueOf(fn), true, true, false, false, sync.Mutex{},
reflect.ValueOf(fn), true, true, false, sync.Mutex{},
})
}
@@ -122,7 +121,7 @@ func (bus *EventBus) Unsubscribe(topic string, handler interface{}) error {
bus.lock.Lock()
defer bus.lock.Unlock()
if _, ok := bus.handlers[topic]; ok && len(bus.handlers[topic]) > 0 {
bus.removeHandler(topic, reflect.ValueOf(handler))
bus.removeHandler(topic, bus.findHandlerIdx(topic, reflect.ValueOf(handler)))
return nil
}
return fmt.Errorf("topic %s doesn't exist", topic)
@@ -133,7 +132,10 @@ func (bus *EventBus) Publish(topic string, args ...interface{}) {
bus.lock.Lock() // will unlock if handler is not found or always after setUpPublish
defer bus.lock.Unlock()
if handlers, ok := bus.handlers[topic]; ok {
for _, handler := range handlers {
for i, handler := range handlers {
if handler.flagOnce {
bus.removeHandler(topic, i)
}
if !handler.async {
bus.doPublish(handler, topic, args...)
} else {
@@ -149,13 +151,6 @@ func (bus *EventBus) Publish(topic string, args ...interface{}) {
func (bus *EventBus) doPublish(handler *eventHandler, topic string, args ...interface{}) {
passedArguments := bus.setUpPublish(topic, args...)
if handler.flagOnce {
bus.removeHandler(topic, handler.callBack)
if handler.called {
return
}
}
handler.called = true
handler.callBack.Call(passedArguments)
}
@@ -167,6 +162,17 @@ func (bus *EventBus) doPublishAsync(handler *eventHandler, topic string, args ..
bus.doPublish(handler, topic, args...)
}
func (bus *EventBus) removeHandler(topic string, idx int) {
if _, ok := bus.handlers[topic]; !ok {
return
}
l := len(bus.handlers[topic])
copy(bus.handlers[topic][idx:], bus.handlers[topic][idx+1:])
bus.handlers[topic][l-1] = nil // or the zero value of T
bus.handlers[topic] = bus.handlers[topic][:l-1]
}
func (bus *EventBus) findHandlerIdx(topic string, callback reflect.Value) int {
if _, ok := bus.handlers[topic]; ok {
for idx, handler := range bus.handlers[topic] {
@@ -178,13 +184,6 @@ func (bus *EventBus) findHandlerIdx(topic string, callback reflect.Value) int {
return -1
}
func (bus *EventBus) removeHandler(topic string, callback reflect.Value) {
i := bus.findHandlerIdx(topic, callback)
if i >= 0 {
bus.handlers[topic] = append(bus.handlers[topic][:i], bus.handlers[topic][i+1:]...)
}
}
func (bus *EventBus) setUpPublish(topic string, args ...interface{}) []reflect.Value {
passedArguments := make([]reflect.Value, 0)