eventhandler struct and rewrite of how async functions

This commit is contained in:
Bennett AH
2014-12-24 18:03:05 +00:00
parent 24341169ef
commit 55c8fc0b86
2 changed files with 185 additions and 57 deletions

View File

@@ -8,17 +8,23 @@ import (
// EventBus - box for handlers and callbacks.
type EventBus struct {
handlers map[string]reflect.Value
flagOnce map[string]bool
lock sync.Mutex
handlers map[string]*eventHandler
lock sync.Mutex // a lock for the map
wg sync.WaitGroup
}
type eventHandler struct {
callBack reflect.Value
flagOnce bool
async bool
transactional bool
sync.Mutex // lock for an event handler - useful for running async callbacks serially
}
// New returns new EventBus with empty handlers.
func New() *EventBus {
return &EventBus{
make(map[string]reflect.Value),
make(map[string]bool),
make(map[string]*eventHandler),
sync.Mutex{},
sync.WaitGroup{},
}
@@ -33,8 +39,22 @@ func (bus *EventBus) Subscribe(topic string, fn interface{}) error {
return fmt.Errorf("%s is not of type reflect.Func", reflect.TypeOf(fn).Kind())
}
v := reflect.ValueOf(fn)
bus.handlers[topic] = v
bus.flagOnce[topic] = false
bus.handlers[topic] = &eventHandler{
v, false, false, false, sync.Mutex{},
}
return nil
}
func (bus *EventBus) SubscribeAsync(topic string, fn interface{}, transactional bool) error {
bus.lock.Lock()
defer bus.lock.Unlock()
if !(reflect.TypeOf(fn).Kind() == reflect.Func) {
return fmt.Errorf("%s is not of type reflect.Func", reflect.TypeOf(fn).Kind())
}
v := reflect.ValueOf(fn)
bus.handlers[topic] = &eventHandler{
v, false, true, transactional, sync.Mutex{},
}
return nil
}
@@ -47,8 +67,22 @@ func (bus *EventBus) SubscribeOnce(topic string, fn interface{}) error {
return fmt.Errorf("%s is not of type reflect.Func", reflect.TypeOf(fn).Kind())
}
v := reflect.ValueOf(fn)
bus.handlers[topic] = v
bus.flagOnce[topic] = true
bus.handlers[topic] = &eventHandler{
v, true, false, false, sync.Mutex{},
}
return nil
}
func (bus *EventBus) SubscribeOnceAsync(topic string, fn interface{}) error {
bus.lock.Lock()
defer bus.lock.Unlock()
if !(reflect.TypeOf(fn).Kind() == reflect.Func) {
return fmt.Errorf("%s is not of type reflect.Func", reflect.TypeOf(fn).Kind())
}
v := reflect.ValueOf(fn)
bus.handlers[topic] = &eventHandler{
v, true, true, false, sync.Mutex{},
}
return nil
}
@@ -56,7 +90,7 @@ func (bus *EventBus) SubscribeOnce(topic string, fn interface{}) error {
func (bus *EventBus) HasCallback(topic string) bool {
bus.lock.Lock()
defer bus.lock.Unlock()
_, ok := bus.handlers[topic];
_, ok := bus.handlers[topic]
return ok
}
@@ -72,32 +106,51 @@ func (bus *EventBus) Unsubscribe(topic string) error {
return fmt.Errorf("topic %s doesn't exist", topic)
}
// PublishAsync executes callback defined for a topic asynchronously. Useful for slow callbacks.
// Any addional argument will be tranfered to the callback.
func (bus *EventBus) PublishAsync(topic string, args ...interface{}) {
func (bus *EventBus) Publish(topic string, args ...interface{}) {
bus.lock.Lock()
if handler, ok := bus.handlers[topic]; ok {
if !handler.async {
bus.doPublish(handler, topic, args...)
} else {
bus.doPublishAsync(handler, topic, args...)
}
} else {
bus.lock.Unlock()
}
}
func (bus *EventBus) doPublish(handler *eventHandler, topic string, args ...interface{}) {
passedArguments := bus.setUpPublish(handler.flagOnce, topic, args...)
bus.lock.Unlock()
handler.callBack.Call(passedArguments)
}
func (bus *EventBus) doPublishAsync(handler *eventHandler, topic string, args ...interface{}) {
bus.wg.Add(1)
go func() {
defer bus.wg.Done()
bus.Publish(topic, args...)
if handler.transactional {
handler.Lock()
defer handler.Unlock()
bus.doPublish(handler, topic, args...)
} else {
passedArguments := bus.setUpPublish(handler.flagOnce, topic, args...)
bus.lock.Unlock()
handler.callBack.Call(passedArguments)
}
}()
}
// Publish executes callback defined for a topic. Any addional argument will be tranfered to the callback.
func (bus *EventBus) Publish(topic string, args ...interface{}) {
bus.lock.Lock()
defer bus.lock.Unlock()
if handler, ok := bus.handlers[topic]; ok {
removeAfterExec, _ := bus.flagOnce[topic]
passedArguments := make([]reflect.Value, 0)
for _, arg := range args {
passedArguments = append(passedArguments, reflect.ValueOf(arg))
}
handler.Call(passedArguments)
if removeAfterExec {
delete(bus.handlers, topic)
bus.flagOnce[topic] = false
}
func (bus *EventBus) setUpPublish(removeAfterExec bool, topic string, args ...interface{}) []reflect.Value {
passedArguments := make([]reflect.Value, 0)
for _, arg := range args {
passedArguments = append(passedArguments, reflect.ValueOf(arg))
}
if removeAfterExec {
delete(bus.handlers, topic)
}
return passedArguments
}
// WaitAsync waits for all async callbacks to complete

View File

@@ -1,63 +1,138 @@
package EventBus
import "testing"
import (
"testing"
"time"
)
func TestNew(t *testing.T) {
bus := New();
bus := New()
if bus == nil {
t.Log("New EventBus not created!");
t.Fail();
t.Log("New EventBus not created!")
t.Fail()
}
}
func TestHasCallback(t *testing.T) {
bus := New();
bus.Subscribe("topic", func() {} );
bus := New()
bus.Subscribe("topic", func() {})
if bus.HasCallback("topic_topic") {
t.Fail();
t.Fail()
}
if !bus.HasCallback("topic") {
t.Fail();
t.Fail()
}
}
func TestSubscribe(t *testing.T) {
bus := New();
if bus.Subscribe("topic", func() {} ) != nil {
t.Fail();
bus := New()
if bus.Subscribe("topic", func() {}) != nil {
t.Fail()
}
if bus.Subscribe("topic", "String" ) == nil {
t.Fail();
if bus.Subscribe("topic", "String") == nil {
t.Fail()
}
}
func TestSubscribeOnce(t *testing.T) {
bus := New();
if bus.SubscribeOnce("topic", func() {} ) != nil {
t.Fail();
bus := New()
if bus.SubscribeOnce("topic", func() {}) != nil {
t.Fail()
}
if bus.SubscribeOnce("topic", "String" ) == nil {
t.Fail();
if bus.SubscribeOnce("topic", "String") == nil {
t.Fail()
}
}
func TestUnsubscribe(t *testing.T) {
bus := New();
bus.Subscribe("topic", func() {} );
bus := New()
bus.Subscribe("topic", func() {})
if bus.Unsubscribe("topic") != nil {
t.Fail();
t.Fail()
}
if bus.Unsubscribe("topic") == nil {
t.Fail();
t.Fail()
}
}
func TestPublish(t *testing.T) {
bus := New();
bus := New()
bus.Subscribe("topic", func(a int, b int) {
if (a != b) {
t.Fail();
}
} );
bus.Publish("topic", 10, 10);
if a != b {
t.Fail()
}
})
bus.Publish("topic", 10, 10)
}
func TestSubcribeOnceAsync(t *testing.T) {
results := make([]int, 0)
bus := New()
bus.SubscribeOnceAsync("topic", func(a int, out *[]int) {
*out = append(*out, a)
})
bus.Publish("topic", 10, &results)
bus.Publish("topic", 10, &results)
bus.WaitAsync()
if len(results) != 1 {
t.Fail()
}
if bus.HasCallback("topic") {
t.Fail()
}
}
func TestSubscribeAsyncTransactional(t *testing.T) {
results := make([]int, 0)
bus := New()
bus.SubscribeAsync("topic", func(a int, out *[]int, dur string) {
sleep, _ := time.ParseDuration(dur)
time.Sleep(sleep)
*out = append(*out, a)
}, true)
bus.Publish("topic", 1, &results, "1s")
bus.Publish("topic", 2, &results, "0s")
bus.WaitAsync()
if len(results) != 2 {
t.Fail()
}
if results[0] != 1 || results[1] != 2 {
t.Fail()
}
}
func TestSubscribeAsync(t *testing.T) {
results := make(chan int)
bus := New()
bus.SubscribeAsync("topic", func(a int, out chan<- int) {
out <- a
}, false)
bus.Publish("topic", 1, results)
bus.Publish("topic", 2, results)
numResults := 0
go func() {
for range results {
numResults++
}
}()
bus.WaitAsync()
if numResults != 2 {
t.Fail()
}
}