refactor(*) split for readability

Split event and instance handling code into their own modules.
This commit is contained in:
Javier Guerra
2019-11-18 10:31:25 -05:00
committed by Guilherme Salazar
parent 1af48ef618
commit 9c17f1c76f
4 changed files with 280 additions and 265 deletions

96
event.go Normal file
View File

@@ -0,0 +1,96 @@
package main
import (
"fmt"
"github.com/kong/go-pdk"
)
// Incoming data for a new event.
// TODO: add some relevant data to reduce number of callbacks.
type StartEventData struct {
InstanceId int // Instance ID to start the event
EventName string // event name (not handler method name)
// ....
}
type eventData struct {
id int // event id
instance *instanceData // plugin instance
ipc chan string // communication channel (TODO: use decoded structs)
pdk *pdk.PDK // go-pdk instance
}
// HandleEvent starts the call/{callback/response}*/finish cycle.
// More than one event can be run concurrenty for a single plugin instance,
// they all receive the same object instance, so should be careful if it's
// mutated or holds references to mutable data.
//
// RPC exported data
func (s PluginServer) HandleEvent(in StartEventData, out *StepData) error {
s.lock.RLock()
instance, ok := s.instances[in.InstanceId]
s.lock.RUnlock()
if !ok {
return fmt.Errorf("No plugin instance %d", in.InstanceId)
}
h, ok := instance.handlers[in.EventName]
if !ok {
return fmt.Errorf("undefined method %s on plugin %s",
in.EventName, instance.plugin.name)
}
ipc := make(chan string)
event := eventData{
instance: instance,
ipc: ipc,
pdk: pdk.Init(ipc),
}
s.lock.Lock()
event.id = s.nextEventId
s.nextEventId++
s.events[event.id] = &event
s.lock.Unlock()
//log.Printf("Will launch goroutine for key %d / operation %s\n", key, op)
go func() {
_ = <-ipc
h(event.pdk)
ipc <- "ret"
s.lock.Lock()
delete(s.events, event.id)
s.lock.Unlock()
}()
*out = StepData{EventId: event.id, Data: "ok"}
return nil
}
// A callback's response/request.
// TODO: use decoded structure instead of a JSON string.
type StepData struct {
EventId int // event cycle to which this belongs
Data string // carried data
}
// Step carries a callback's anser back from Kong to the plugin,
// the return value is either a new callback request or a finish signal.
//
// RPC exported method
func (s PluginServer) Step(in StepData, out *StepData) error {
s.lock.RLock()
event, ok := s.events[in.EventId]
s.lock.RUnlock()
if !ok {
return fmt.Errorf("No running event %d", in.EventId)
}
event.ipc <- in.Data
outStr := <-event.ipc
*out = StepData{Data: outStr} // TODO: decode outStr
return nil
}

141
instance.go Normal file
View File

@@ -0,0 +1,141 @@
package main
import (
"encoding/json"
"fmt"
"github.com/kong/go-pdk"
)
// --- instanceData --- //
type instanceData struct {
id int
plugin *pluginData
initialized bool
config interface{}
handlers map[string]func(kong *pdk.PDK)
}
type (
certificater interface{ Certificate(*pdk.PDK) }
rewriter interface{ Rewrite(*pdk.PDK) }
accesser interface{ Access(*pdk.PDK) }
headerFilter interface{ HeaderFilter(*pdk.PDK) }
bodyFilter interface{ BodyFilter(*pdk.PDK) }
prereader interface{ Preread(*pdk.PDK) }
logger interface{ Log(*pdk.PDK) }
)
func getHandlers(config interface{}) map[string]func(kong *pdk.PDK) {
handlers := map[string]func(kong *pdk.PDK){}
if h, ok := config.(certificater); ok { handlers["certificate"] = h.Certificate }
if h, ok := config.(rewriter) ; ok { handlers["rewrite"] = h.Rewrite }
if h, ok := config.(accesser) ; ok { handlers["access"] = h.Access }
if h, ok := config.(headerFilter); ok { handlers["header_filter"] = h.HeaderFilter }
if h, ok := config.(bodyFilter) ; ok { handlers["body_filter"] = h.BodyFilter }
if h, ok := config.(prereader) ; ok { handlers["preread"] = h.Preread }
if h, ok := config.(logger) ; ok { handlers["log"] = h.Log }
return handlers
}
// Configuration data for a new plugin instance.
type PluginConfig struct {
Name string // plugin name
Config []byte // configuration data, as a JSON string
}
// Current state of a plugin instance. TODO: add some statistics
type InstanceStatus struct {
Name string // plugin name
Id int // instance id
Config interface{} // configuration data, decoded
}
// StartInstance starts a plugin instance, as requred by configuration data. More than
// one instance can be started for a single plugin. If the configuration changes,
// a new instance should be started and the old one closed.
//
// RPC exported method
func (s *PluginServer) StartInstance(config PluginConfig, status *InstanceStatus) error {
plug, err := s.loadPlugin(config.Name)
if err != nil {
return err
}
instanceConfig := plug.constructor()
if err := json.Unmarshal(config.Config, instanceConfig); err != nil {
return fmt.Errorf("Decoding config: %w", err)
}
instance := instanceData{
plugin: plug,
config: instanceConfig,
handlers: getHandlers(instanceConfig),
}
s.lock.Lock()
instance.id = s.nextInstanceId
s.nextInstanceId++
s.instances[instance.id] = &instance
s.lock.Unlock()
*status = InstanceStatus{
Name: config.Name,
Id: instance.id,
Config: instance.config,
}
return nil
}
// InstanceStatus returns a given resource's status (the same given when started)
//
// RPC exported method
func (s PluginServer) InstanceStatus(id int, status *InstanceStatus) error {
s.lock.RLock()
instance, ok := s.instances[id]
s.lock.RUnlock()
if !ok {
return fmt.Errorf("No plugin instance %d", id)
}
*status = InstanceStatus{
Name: instance.plugin.name,
Id: instance.id,
Config: instance.config,
}
return nil
}
// CloseInstance is used when an instance shouldn't be used anymore.
// Doesn't kill any running event but the instance is no longer accesible,
// so it's not possible to start a new event with it and will be garbage
// collected after the last reference event finishes.
// Returns the status just before closing.
//
// RPC exported method
func (s PluginServer) CloseInstance(id int, status *InstanceStatus) error {
s.lock.RLock()
instance, ok := s.instances[id]
s.lock.RUnlock()
if !ok {
return fmt.Errorf("No plugin instance %d", id)
}
*status = InstanceStatus{
Name: instance.plugin.name,
Id: instance.id,
Config: instance.config,
}
// kill?
s.lock.Lock()
delete(s.instances, id)
s.lock.Unlock()
return nil
}

43
main.go Normal file
View File

@@ -0,0 +1,43 @@
// go-pluginserver is a standalone RPC server that runs
// Go plugins for Kong.
package main
import (
"flag"
"github.com/ugorji/go/codec"
"log"
"net"
"net/rpc"
)
var socket = flag.String("socket", "", "Socket to listen into")
func runServer(listener net.Listener) {
var handle codec.MsgpackHandle
for {
conn, err := listener.Accept()
if err != nil {
log.Printf("accept(): %s", err)
return
}
rpcCodec := codec.MsgpackSpecRpc.ServerCodec(conn, &handle)
rpc.ServeCodec(rpcCodec)
}
}
func main() {
flag.Parse()
if *socket != "" {
listener, err := net.Listen("unix", *socket)
if err != nil {
log.Printf(`listen("%s"): %s`, socket, err)
return
}
rpc.RegisterName("plugin", newServer())
runServer(listener)
}
}

View File

@@ -1,16 +1,7 @@
// go-pluginserver is a standalone RPC server that runs
// Go plugins for Kong.
package main
import (
"encoding/json"
"flag"
"fmt"
"github.com/kong/go-pdk"
"github.com/ugorji/go/codec"
"log"
"net"
"net/rpc"
"path"
"plugin"
"reflect"
@@ -18,38 +9,6 @@ import (
"sync"
)
var socket = flag.String("socket", "", "Socket to listen into")
func runServer(listener net.Listener) {
var handle codec.MsgpackHandle
for {
conn, err := listener.Accept()
if err != nil {
log.Printf("accept(): %s", err)
return
}
rpcCodec := codec.MsgpackSpecRpc.ServerCodec(conn, &handle)
rpc.ServeCodec(rpcCodec)
}
}
func main() {
flag.Parse()
if *socket != "" {
listener, err := net.Listen("unix", *socket)
if err != nil {
log.Printf(`listen("%s"): %s`, socket, err)
return
}
rpc.RegisterName("plugin", newServer())
runServer(listener)
}
}
// --- PluginServer --- //
// Holds the execution status of the plugin server.
@@ -241,227 +200,3 @@ func (s PluginServer) GetPluginInfo(name string, info *PluginInfo) error {
return nil
}
// --- instanceData --- //
type instanceData struct {
id int
plugin *pluginData
initialized bool
config interface{}
handlers map[string]func(kong *pdk.PDK)
}
type (
certificater interface{ Certificate(*pdk.PDK) }
rewriter interface{ Rewrite(*pdk.PDK) }
accesser interface{ Access(*pdk.PDK) }
headerFilter interface{ HeaderFilter(*pdk.PDK) }
bodyFilter interface{ BodyFilter(*pdk.PDK) }
prereader interface{ Preread(*pdk.PDK) }
logger interface{ Log(*pdk.PDK) }
)
func getHandlers(config interface{}) map[string]func(kong *pdk.PDK) {
handlers := map[string]func(kong *pdk.PDK){}
if h, ok := config.(certificater); ok { handlers["certificate"] = h.Certificate }
if h, ok := config.(rewriter) ; ok { handlers["rewrite"] = h.Rewrite }
if h, ok := config.(accesser) ; ok { handlers["access"] = h.Access }
if h, ok := config.(headerFilter); ok { handlers["header_filter"] = h.HeaderFilter }
if h, ok := config.(bodyFilter) ; ok { handlers["body_filter"] = h.BodyFilter }
if h, ok := config.(prereader) ; ok { handlers["preread"] = h.Preread }
if h, ok := config.(logger) ; ok { handlers["log"] = h.Log }
return handlers
}
// Configuration data for a new plugin instance.
type PluginConfig struct {
Name string // plugin name
Config []byte // configuration data, as a JSON string
}
// Current state of a plugin instance. TODO: add some statistics
type InstanceStatus struct {
Name string // plugin name
Id int // instance id
Config interface{} // configuration data, decoded
}
// StartInstance starts a plugin instance, as requred by configuration data. More than
// one instance can be started for a single plugin. If the configuration changes,
// a new instance should be started and the old one closed.
//
// RPC exported method
func (s *PluginServer) StartInstance(config PluginConfig, status *InstanceStatus) error {
plug, err := s.loadPlugin(config.Name)
if err != nil {
return err
}
instanceConfig := plug.constructor()
if err := json.Unmarshal(config.Config, instanceConfig); err != nil {
return fmt.Errorf("Decoding config: %w", err)
}
instance := instanceData{
plugin: plug,
config: instanceConfig,
handlers: getHandlers(instanceConfig),
}
s.lock.Lock()
instance.id = s.nextInstanceId
s.nextInstanceId++
s.instances[instance.id] = &instance
s.lock.Unlock()
*status = InstanceStatus{
Name: config.Name,
Id: instance.id,
Config: instance.config,
}
return nil
}
// InstanceStatus returns a given resource's status (the same given when started)
//
// RPC exported method
func (s PluginServer) InstanceStatus(id int, status *InstanceStatus) error {
s.lock.RLock()
instance, ok := s.instances[id]
s.lock.RUnlock()
if !ok {
return fmt.Errorf("No plugin instance %d", id)
}
*status = InstanceStatus{
Name: instance.plugin.name,
Id: instance.id,
Config: instance.config,
}
return nil
}
// CloseInstance is used when an instance shouldn't be used anymore.
// Doesn't kill any running event but the instance is no longer accesible,
// so it's not possible to start a new event with it and will be garbage
// collected after the last reference event finishes.
// Returns the status just before closing.
//
// RPC exported method
func (s PluginServer) CloseInstance(id int, status *InstanceStatus) error {
s.lock.RLock()
instance, ok := s.instances[id]
s.lock.RUnlock()
if !ok {
return fmt.Errorf("No plugin instance %d", id)
}
*status = InstanceStatus{
Name: instance.plugin.name,
Id: instance.id,
Config: instance.config,
}
// kill?
s.lock.Lock()
delete(s.instances, id)
s.lock.Unlock()
return nil
}
// Incoming data for a new event.
// TODO: add some relevant data to reduce number of callbacks.
type StartEventData struct {
InstanceId int // Instance ID to start the event
EventName string // event name (not handler method name)
// ....
}
type eventData struct {
id int // event id
instance *instanceData // plugin instance
ipc chan string // communication channel (TODO: use decoded structs)
pdk *pdk.PDK // go-pdk instance
}
// HandleEvent starts the call/{callback/response}*/finish cycle.
// More than one event can be run concurrenty for a single plugin instance,
// they all receive the same object instance, so should be careful if it's
// mutated or holds references to mutable data.
//
// RPC exported data
func (s PluginServer) HandleEvent(in StartEventData, out *StepData) error {
s.lock.RLock()
instance, ok := s.instances[in.InstanceId]
s.lock.RUnlock()
if !ok {
return fmt.Errorf("No plugin instance %d", in.InstanceId)
}
h, ok := instance.handlers[in.EventName]
if !ok {
return fmt.Errorf("undefined method %s on plugin %s",
in.EventName, instance.plugin.name)
}
ipc := make(chan string)
event := eventData{
instance: instance,
ipc: ipc,
pdk: pdk.Init(ipc),
}
s.lock.Lock()
event.id = s.nextEventId
s.nextEventId++
s.events[event.id] = &event
s.lock.Unlock()
//log.Printf("Will launch goroutine for key %d / operation %s\n", key, op)
go func() {
_ = <-ipc
h(event.pdk)
ipc <- "ret"
s.lock.Lock()
delete(s.events, event.id)
s.lock.Unlock()
}()
*out = StepData{EventId: event.id, Data: "ok"}
return nil
}
// A callback's response/request.
// TODO: use decoded structure instead of a JSON string.
type StepData struct {
EventId int // event cycle to which this belongs
Data string // carried data
}
// Step carries a callback's anser back from Kong to the plugin,
// the return value is either a new callback request or a finish signal.
//
// RPC exported method
func (s PluginServer) Step(in StepData, out *StepData) error {
s.lock.RLock()
event, ok := s.events[in.EventId]
s.lock.RUnlock()
if !ok {
return fmt.Errorf("No running event %d", in.EventId)
}
event.ipc <- in.Data
outStr := <-event.ipc
*out = StepData{Data: outStr} // TODO: decode outStr
return nil
}