From 9c17f1c76f22b1029d15d3c1c581bd75c47175bc Mon Sep 17 00:00:00 2001 From: Javier Guerra Date: Mon, 18 Nov 2019 10:31:25 -0500 Subject: [PATCH] refactor(*) split for readability Split event and instance handling code into their own modules. --- event.go | 96 ++++++++++++++++++ instance.go | 141 ++++++++++++++++++++++++++ main.go | 43 ++++++++ pluginserver.go | 265 ------------------------------------------------ 4 files changed, 280 insertions(+), 265 deletions(-) create mode 100644 event.go create mode 100644 instance.go create mode 100644 main.go diff --git a/event.go b/event.go new file mode 100644 index 0000000..c5cfe67 --- /dev/null +++ b/event.go @@ -0,0 +1,96 @@ +package main + +import ( + "fmt" + "github.com/kong/go-pdk" +) + +// Incoming data for a new event. +// TODO: add some relevant data to reduce number of callbacks. +type StartEventData struct { + InstanceId int // Instance ID to start the event + EventName string // event name (not handler method name) + // .... +} + +type eventData struct { + id int // event id + instance *instanceData // plugin instance + ipc chan string // communication channel (TODO: use decoded structs) + pdk *pdk.PDK // go-pdk instance +} + +// HandleEvent starts the call/{callback/response}*/finish cycle. +// More than one event can be run concurrenty for a single plugin instance, +// they all receive the same object instance, so should be careful if it's +// mutated or holds references to mutable data. +// +// RPC exported data +func (s PluginServer) HandleEvent(in StartEventData, out *StepData) error { + s.lock.RLock() + instance, ok := s.instances[in.InstanceId] + s.lock.RUnlock() + if !ok { + return fmt.Errorf("No plugin instance %d", in.InstanceId) + } + + h, ok := instance.handlers[in.EventName] + if !ok { + return fmt.Errorf("undefined method %s on plugin %s", + in.EventName, instance.plugin.name) + } + + ipc := make(chan string) + + event := eventData{ + instance: instance, + ipc: ipc, + pdk: pdk.Init(ipc), + } + + s.lock.Lock() + event.id = s.nextEventId + s.nextEventId++ + s.events[event.id] = &event + s.lock.Unlock() + + //log.Printf("Will launch goroutine for key %d / operation %s\n", key, op) + go func() { + _ = <-ipc + h(event.pdk) + ipc <- "ret" + + s.lock.Lock() + delete(s.events, event.id) + s.lock.Unlock() + }() + + *out = StepData{EventId: event.id, Data: "ok"} + return nil +} + +// A callback's response/request. +// TODO: use decoded structure instead of a JSON string. +type StepData struct { + EventId int // event cycle to which this belongs + Data string // carried data +} + +// Step carries a callback's anser back from Kong to the plugin, +// the return value is either a new callback request or a finish signal. +// +// RPC exported method +func (s PluginServer) Step(in StepData, out *StepData) error { + s.lock.RLock() + event, ok := s.events[in.EventId] + s.lock.RUnlock() + if !ok { + return fmt.Errorf("No running event %d", in.EventId) + } + + event.ipc <- in.Data + outStr := <-event.ipc + *out = StepData{Data: outStr} // TODO: decode outStr + + return nil +} diff --git a/instance.go b/instance.go new file mode 100644 index 0000000..e283f0a --- /dev/null +++ b/instance.go @@ -0,0 +1,141 @@ +package main + +import ( + "encoding/json" + "fmt" + "github.com/kong/go-pdk" +) + +// --- instanceData --- // +type instanceData struct { + id int + plugin *pluginData + initialized bool + config interface{} + handlers map[string]func(kong *pdk.PDK) +} + +type ( + certificater interface{ Certificate(*pdk.PDK) } + rewriter interface{ Rewrite(*pdk.PDK) } + accesser interface{ Access(*pdk.PDK) } + headerFilter interface{ HeaderFilter(*pdk.PDK) } + bodyFilter interface{ BodyFilter(*pdk.PDK) } + prereader interface{ Preread(*pdk.PDK) } + logger interface{ Log(*pdk.PDK) } +) + +func getHandlers(config interface{}) map[string]func(kong *pdk.PDK) { + handlers := map[string]func(kong *pdk.PDK){} + + if h, ok := config.(certificater); ok { handlers["certificate"] = h.Certificate } + if h, ok := config.(rewriter) ; ok { handlers["rewrite"] = h.Rewrite } + if h, ok := config.(accesser) ; ok { handlers["access"] = h.Access } + if h, ok := config.(headerFilter); ok { handlers["header_filter"] = h.HeaderFilter } + if h, ok := config.(bodyFilter) ; ok { handlers["body_filter"] = h.BodyFilter } + if h, ok := config.(prereader) ; ok { handlers["preread"] = h.Preread } + if h, ok := config.(logger) ; ok { handlers["log"] = h.Log } + + return handlers +} + +// Configuration data for a new plugin instance. +type PluginConfig struct { + Name string // plugin name + Config []byte // configuration data, as a JSON string +} + +// Current state of a plugin instance. TODO: add some statistics +type InstanceStatus struct { + Name string // plugin name + Id int // instance id + Config interface{} // configuration data, decoded +} + +// StartInstance starts a plugin instance, as requred by configuration data. More than +// one instance can be started for a single plugin. If the configuration changes, +// a new instance should be started and the old one closed. +// +// RPC exported method +func (s *PluginServer) StartInstance(config PluginConfig, status *InstanceStatus) error { + plug, err := s.loadPlugin(config.Name) + if err != nil { + return err + } + + instanceConfig := plug.constructor() + + if err := json.Unmarshal(config.Config, instanceConfig); err != nil { + return fmt.Errorf("Decoding config: %w", err) + } + + instance := instanceData{ + plugin: plug, + config: instanceConfig, + handlers: getHandlers(instanceConfig), + } + + s.lock.Lock() + instance.id = s.nextInstanceId + s.nextInstanceId++ + s.instances[instance.id] = &instance + s.lock.Unlock() + + *status = InstanceStatus{ + Name: config.Name, + Id: instance.id, + Config: instance.config, + } + + return nil +} + +// InstanceStatus returns a given resource's status (the same given when started) +// +// RPC exported method +func (s PluginServer) InstanceStatus(id int, status *InstanceStatus) error { + s.lock.RLock() + instance, ok := s.instances[id] + s.lock.RUnlock() + if !ok { + return fmt.Errorf("No plugin instance %d", id) + } + + *status = InstanceStatus{ + Name: instance.plugin.name, + Id: instance.id, + Config: instance.config, + } + + return nil +} + +// CloseInstance is used when an instance shouldn't be used anymore. +// Doesn't kill any running event but the instance is no longer accesible, +// so it's not possible to start a new event with it and will be garbage +// collected after the last reference event finishes. +// Returns the status just before closing. +// +// RPC exported method +func (s PluginServer) CloseInstance(id int, status *InstanceStatus) error { + s.lock.RLock() + instance, ok := s.instances[id] + s.lock.RUnlock() + if !ok { + return fmt.Errorf("No plugin instance %d", id) + } + + *status = InstanceStatus{ + Name: instance.plugin.name, + Id: instance.id, + Config: instance.config, + } + + // kill? + + s.lock.Lock() + delete(s.instances, id) + s.lock.Unlock() + + return nil +} diff --git a/main.go b/main.go new file mode 100644 index 0000000..fbfffa0 --- /dev/null +++ b/main.go @@ -0,0 +1,43 @@ +// go-pluginserver is a standalone RPC server that runs +// Go plugins for Kong. +package main + +import ( + "flag" + "github.com/ugorji/go/codec" + "log" + "net" + "net/rpc" +) + +var socket = flag.String("socket", "", "Socket to listen into") + +func runServer(listener net.Listener) { + var handle codec.MsgpackHandle + + for { + conn, err := listener.Accept() + if err != nil { + log.Printf("accept(): %s", err) + return + } + rpcCodec := codec.MsgpackSpecRpc.ServerCodec(conn, &handle) + rpc.ServeCodec(rpcCodec) + } +} + +func main() { + flag.Parse() + + if *socket != "" { + listener, err := net.Listen("unix", *socket) + if err != nil { + log.Printf(`listen("%s"): %s`, socket, err) + return + } + + rpc.RegisterName("plugin", newServer()) + + runServer(listener) + } +} diff --git a/pluginserver.go b/pluginserver.go index 76450ce..067370d 100644 --- a/pluginserver.go +++ b/pluginserver.go @@ -1,16 +1,7 @@ -// go-pluginserver is a standalone RPC server that runs -// Go plugins for Kong. package main import ( - "encoding/json" - "flag" "fmt" - "github.com/kong/go-pdk" - "github.com/ugorji/go/codec" - "log" - "net" - "net/rpc" "path" "plugin" "reflect" @@ -18,38 +9,6 @@ import ( "sync" ) -var socket = flag.String("socket", "", "Socket to listen into") - -func runServer(listener net.Listener) { - var handle codec.MsgpackHandle - - for { - conn, err := listener.Accept() - if err != nil { - log.Printf("accept(): %s", err) - return - } - rpcCodec := codec.MsgpackSpecRpc.ServerCodec(conn, &handle) - rpc.ServeCodec(rpcCodec) - } -} - -func main() { - flag.Parse() - - if *socket != "" { - listener, err := net.Listen("unix", *socket) - if err != nil { - log.Printf(`listen("%s"): %s`, socket, err) - return - } - - rpc.RegisterName("plugin", newServer()) - - runServer(listener) - } -} - // --- PluginServer --- // // Holds the execution status of the plugin server. @@ -241,227 +200,3 @@ func (s PluginServer) GetPluginInfo(name string, info *PluginInfo) error { return nil } - -// --- instanceData --- // -type instanceData struct { - id int - plugin *pluginData - initialized bool - config interface{} - handlers map[string]func(kong *pdk.PDK) -} - -type ( - certificater interface{ Certificate(*pdk.PDK) } - rewriter interface{ Rewrite(*pdk.PDK) } - accesser interface{ Access(*pdk.PDK) } - headerFilter interface{ HeaderFilter(*pdk.PDK) } - bodyFilter interface{ BodyFilter(*pdk.PDK) } - prereader interface{ Preread(*pdk.PDK) } - logger interface{ Log(*pdk.PDK) } -) - -func getHandlers(config interface{}) map[string]func(kong *pdk.PDK) { - handlers := map[string]func(kong *pdk.PDK){} - - if h, ok := config.(certificater); ok { handlers["certificate"] = h.Certificate } - if h, ok := config.(rewriter) ; ok { handlers["rewrite"] = h.Rewrite } - if h, ok := config.(accesser) ; ok { handlers["access"] = h.Access } - if h, ok := config.(headerFilter); ok { handlers["header_filter"] = h.HeaderFilter } - if h, ok := config.(bodyFilter) ; ok { handlers["body_filter"] = h.BodyFilter } - if h, ok := config.(prereader) ; ok { handlers["preread"] = h.Preread } - if h, ok := config.(logger) ; ok { handlers["log"] = h.Log } - - return handlers -} - -// Configuration data for a new plugin instance. -type PluginConfig struct { - Name string // plugin name - Config []byte // configuration data, as a JSON string -} - -// Current state of a plugin instance. TODO: add some statistics -type InstanceStatus struct { - Name string // plugin name - Id int // instance id - Config interface{} // configuration data, decoded -} - -// StartInstance starts a plugin instance, as requred by configuration data. More than -// one instance can be started for a single plugin. If the configuration changes, -// a new instance should be started and the old one closed. -// -// RPC exported method -func (s *PluginServer) StartInstance(config PluginConfig, status *InstanceStatus) error { - plug, err := s.loadPlugin(config.Name) - if err != nil { - return err - } - - instanceConfig := plug.constructor() - - if err := json.Unmarshal(config.Config, instanceConfig); err != nil { - return fmt.Errorf("Decoding config: %w", err) - } - - instance := instanceData{ - plugin: plug, - config: instanceConfig, - handlers: getHandlers(instanceConfig), - } - - s.lock.Lock() - instance.id = s.nextInstanceId - s.nextInstanceId++ - s.instances[instance.id] = &instance - s.lock.Unlock() - - *status = InstanceStatus{ - Name: config.Name, - Id: instance.id, - Config: instance.config, - } - - return nil -} - -// InstanceStatus returns a given resource's status (the same given when started) -// -// RPC exported method -func (s PluginServer) InstanceStatus(id int, status *InstanceStatus) error { - s.lock.RLock() - instance, ok := s.instances[id] - s.lock.RUnlock() - if !ok { - return fmt.Errorf("No plugin instance %d", id) - } - - *status = InstanceStatus{ - Name: instance.plugin.name, - Id: instance.id, - Config: instance.config, - } - - return nil -} - -// CloseInstance is used when an instance shouldn't be used anymore. -// Doesn't kill any running event but the instance is no longer accesible, -// so it's not possible to start a new event with it and will be garbage -// collected after the last reference event finishes. -// Returns the status just before closing. -// -// RPC exported method -func (s PluginServer) CloseInstance(id int, status *InstanceStatus) error { - s.lock.RLock() - instance, ok := s.instances[id] - s.lock.RUnlock() - if !ok { - return fmt.Errorf("No plugin instance %d", id) - } - - *status = InstanceStatus{ - Name: instance.plugin.name, - Id: instance.id, - Config: instance.config, - } - - // kill? - - s.lock.Lock() - delete(s.instances, id) - s.lock.Unlock() - - return nil -} - -// Incoming data for a new event. -// TODO: add some relevant data to reduce number of callbacks. -type StartEventData struct { - InstanceId int // Instance ID to start the event - EventName string // event name (not handler method name) - // .... -} - -type eventData struct { - id int // event id - instance *instanceData // plugin instance - ipc chan string // communication channel (TODO: use decoded structs) - pdk *pdk.PDK // go-pdk instance -} - -// HandleEvent starts the call/{callback/response}*/finish cycle. -// More than one event can be run concurrenty for a single plugin instance, -// they all receive the same object instance, so should be careful if it's -// mutated or holds references to mutable data. -// -// RPC exported data -func (s PluginServer) HandleEvent(in StartEventData, out *StepData) error { - s.lock.RLock() - instance, ok := s.instances[in.InstanceId] - s.lock.RUnlock() - if !ok { - return fmt.Errorf("No plugin instance %d", in.InstanceId) - } - - h, ok := instance.handlers[in.EventName] - if !ok { - return fmt.Errorf("undefined method %s on plugin %s", - in.EventName, instance.plugin.name) - } - - ipc := make(chan string) - - event := eventData{ - instance: instance, - ipc: ipc, - pdk: pdk.Init(ipc), - } - - s.lock.Lock() - event.id = s.nextEventId - s.nextEventId++ - s.events[event.id] = &event - s.lock.Unlock() - - //log.Printf("Will launch goroutine for key %d / operation %s\n", key, op) - go func() { - _ = <-ipc - h(event.pdk) - ipc <- "ret" - - s.lock.Lock() - delete(s.events, event.id) - s.lock.Unlock() - }() - - *out = StepData{EventId: event.id, Data: "ok"} - return nil -} - -// A callback's response/request. -// TODO: use decoded structure instead of a JSON string. -type StepData struct { - EventId int // event cycle to which this belongs - Data string // carried data -} - -// Step carries a callback's anser back from Kong to the plugin, -// the return value is either a new callback request or a finish signal. -// -// RPC exported method -func (s PluginServer) Step(in StepData, out *StepData) error { - s.lock.RLock() - event, ok := s.events[in.EventId] - s.lock.RUnlock() - if !ok { - return fmt.Errorf("No running event %d", in.EventId) - } - - event.ipc <- in.Data - outStr := <-event.ipc - *out = StepData{Data: outStr} // TODO: decode outStr - - return nil -}