prototype for frontier

This commit is contained in:
singchia
2024-02-19 07:52:39 +08:00
parent 16cbb7780b
commit d37c5548d4
12 changed files with 143 additions and 14 deletions

View File

@@ -1,7 +1,63 @@
package main
import "fmt"
import (
"context"
"github.com/jumboframes/armorigo/sigaction"
"github.com/singchia/frontier/pkg/config"
"github.com/singchia/frontier/pkg/edgebound"
"github.com/singchia/frontier/pkg/exchange"
"github.com/singchia/frontier/pkg/mq"
"github.com/singchia/frontier/pkg/repo/dao"
"github.com/singchia/frontier/pkg/servicebound"
"github.com/singchia/go-timer/v2"
"k8s.io/klog/v2"
)
func main() {
fmt.Println("vim-go")
conf, err := config.Parse()
if err != nil {
klog.Errorf("parse flags err: %s", err)
return
}
// dao
dao, err := dao.NewDao(conf)
if err != nil {
klog.Errorf("new dao err: %s", err)
return
}
// mqm
mqm, err := mq.NewMQM(conf)
if err != nil {
klog.Errorf("new mq manager err: %s", err)
return
}
// exchange
exchange, err := exchange.NewExchange(conf)
if err != nil {
klog.Errorf("new exchange err: %s", err)
return
}
tmr := timer.NewTimer()
// servicebound
servicebound, err := servicebound.NewServicebound(conf, dao, nil, exchange, mqm, tmr)
if err != nil {
klog.Errorf("new servicebound err: %s", err)
return
}
servicebound.Serve()
// edgebound
edgebound, err := edgebound.NewEdgebound(conf, dao, nil, exchange, tmr)
if err != nil {
klog.Errorf("new edgebound err: %s", err)
return
}
edgebound.Serve()
sig := sigaction.NewSignal()
sig.Wait(context.TODO())
edgebound.Close()
servicebound.Close()
}

View File

@@ -10,11 +10,16 @@ type Exchange interface {
// rpc, message and raw io to edge
ForwardToEdge(*Meta, geminio.End)
// stream to edge
StreamToEdge(geminio.Stream)
// TODO StreamToEdge(geminio.Stream)
// rpc, message and raw io to service
ForwardToService(geminio.End)
// stream to service
StreamToService(geminio.Stream)
// TODO StreamToService(geminio.Stream)
// for exchange
AddEdgebound(Edgebound)
AddServicebound(Servicebound)
AddMQM(MQM)
}
// edge related
@@ -23,6 +28,9 @@ type Edgebound interface {
// for management
GetEdgeByID(edgeID uint64) geminio.End
DelEdgeByID(edgeID uint64) error
Serve()
Close() error
}
type EdgeInformer interface {
@@ -35,10 +43,13 @@ type EdgeInformer interface {
type Servicebound interface {
ListService() []geminio.End
// for management
GetServiceByName(service string) geminio.End
// TODO GetServiceByName(service string) geminio.End
GetServiceByRPC(rpc string) (geminio.End, error)
GetServiceByTopic(topic string) (geminio.End, error)
DelSerivces(service string) error
Serve()
Close() error
}
type ServiceInformer interface {

View File

@@ -99,7 +99,7 @@ type Configuration struct {
}
// Configuration accepts config file and command-line, and command-line is more privileged.
func ParseFlags() (*Configuration, error) {
func Parse() (*Configuration, error) {
var (
argConfigFile = pflag.String("config", "", "config file, default not configured")
argDaemonRLimitNofile = pflag.Int("daemon-rlimit-nofile", -1, "SetRLimit for number of file of this daemon, default: -1 means ignore")

View File

@@ -28,7 +28,7 @@ func TestParseFlags(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := ParseFlags()
got, err := Parse()
if (err != nil) != tt.wantErr {
t.Errorf("ParseFlags() error = %v, wantErr %v", err, tt.wantErr)
return

View File

@@ -15,7 +15,7 @@ func (em *edgeManager) acceptStream(stream geminio.Stream) {
em.streams.MSet(edgeID, streamID, stream)
// exchange to service
if em.exchange != nil {
em.exchange.StreamToService(stream)
// TODO em.exchange.StreamToService(stream)
}
}

View File

@@ -25,12 +25,18 @@ import (
"k8s.io/klog/v2"
)
func NewEdgebound(conf *config.Configuration, dao *dao.Dao, informer api.EdgeInformer,
exchange api.Exchange, tmr timer.Timer) (api.Edgebound, error) {
return newEdgeManager(conf, dao, informer, exchange, tmr)
}
type edgeManager struct {
*delegate.UnimplementedDelegate
conf *config.Configuration
informer api.EdgeInformer
exchange api.Exchange
conf *config.Configuration
// edgeID allocator
idFactory id.IDFactory
shub *synchub.SyncHub
@@ -75,6 +81,7 @@ func newEdgeManager(conf *config.Configuration, dao *dao.Dao, informer api.EdgeI
// a simple unix timestamp incemental id factory
idFactory: id.DefaultIncIDCounter,
informer: informer,
exchange: exchange,
}
if !listen.TLS.Enable {
@@ -292,6 +299,10 @@ func (em *edgeManager) ListStreams(edgeID uint64) []geminio.Stream {
return utils.Slice2streams(all)
}
func (em *edgeManager) DelEdgeByID(edgeID uint64) error {
panic("TODO")
}
// Close all edges and manager
func (em *edgeManager) Close() error {
bypass := &em.conf.Edgebound.Bypass

View File

@@ -2,10 +2,36 @@ package exchange
import (
"github.com/singchia/frontier/pkg/api"
"github.com/singchia/frontier/pkg/config"
)
type exchange struct {
conf *config.Configuration
Edgebound api.Edgebound
Servicebound api.Servicebound
MQM api.MQM
}
func NewExchange(conf *config.Configuration) (api.Exchange, error) {
return newExchange(conf)
}
func newExchange(conf *config.Configuration) (*exchange, error) {
exchange := &exchange{
conf: conf,
}
return exchange, nil
}
func (ex *exchange) AddEdgebound(edgebound api.Edgebound) {
ex.Edgebound = edgebound
}
func (ex *exchange) AddServicebound(servicebound api.Servicebound) {
ex.Servicebound = servicebound
}
func (ex *exchange) AddMQM(mqm api.MQM) {
ex.MQM = mqm
}

View File

@@ -103,6 +103,15 @@ func (ex *exchange) forwardMessageToEdge(end geminio.End) {
}()
}
func (ex *exchange) ForwardToService(end geminio.End) {
// raw
ex.forwardRawToService(end)
// message
ex.forwardMessageToService(end)
// rpc
ex.forwardRPCToService(end)
}
// raw io from edge, and forward to service
func (ex *exchange) forwardRawToService(end geminio.End) {
//drop the io, actually we won't be here

View File

@@ -18,7 +18,11 @@ type mqManager struct {
mqindex map[string]*uint64 // for round robin
}
func newMQManager(conf *config.Configuration) (api.MQM, error) {
func NewMQM(conf *config.Configuration) (api.MQM, error) {
return newMQManager(conf)
}
func newMQManager(conf *config.Configuration) (*mqManager, error) {
mqm := &mqManager{
mqs: make(map[string][]api.MQ),
conf: conf,

View File

@@ -16,7 +16,7 @@ func (sm *serviceManager) acceptStream(stream geminio.Stream) {
sm.streams.MSet(serviceID, streamID, stream)
// exchange to edge
if sm.exchange != nil {
sm.exchange.StreamToEdge(stream)
// TODO sm.exchange.StreamToEdge(stream)
}
}

View File

@@ -24,6 +24,11 @@ import (
"k8s.io/klog/v2"
)
func NewServicebound(conf *config.Configuration, dao *dao.Dao, informer api.ServiceInformer,
exchange api.Exchange, mqm api.MQM, tmr timer.Timer) (api.Servicebound, error) {
return newServiceManager(conf, dao, informer, exchange, mqm, tmr)
}
type serviceManager struct {
*delegate.UnimplementedDelegate
conf *config.Configuration
@@ -52,7 +57,7 @@ type serviceManager struct {
}
func newServiceManager(conf *config.Configuration, dao *dao.Dao, informer api.ServiceInformer,
exchange api.Exchange, tmr timer.Timer) (*serviceManager, error) {
exchange api.Exchange, mqm api.MQM, tmr timer.Timer) (*serviceManager, error) {
listen := &conf.Servicebound.Listen
var (
ln net.Listener
@@ -73,6 +78,7 @@ func newServiceManager(conf *config.Configuration, dao *dao.Dao, informer api.Se
idFactory: id.DefaultIncIDCounter,
informer: informer,
}
exchange.AddServicebound(sm)
if !listen.TLS.Enable {
if ln, err = net.Listen(network, addr); err != nil {
@@ -166,7 +172,9 @@ func (sm *serviceManager) handleConn(conn net.Conn) error {
// register topics claim of end
sm.remoteReceiveClaim(end.ClientID(), meta.Topics)
// add the end to MQM
if sm.mqm != nil {
sm.mqm.AddMQByEnd(meta.Topics, end)
}
// handle online event for end
if err = sm.online(end, meta); err != nil {
@@ -246,6 +254,10 @@ func (sm *serviceManager) CountServices() int {
return len(sm.services)
}
func (sm *serviceManager) DelSerivces(service string) error {
panic("TODO")
}
func (sm *serviceManager) ListStreams(serviceID uint64) []geminio.Stream {
all := sm.streams.MGetAll(serviceID)
return utils.Slice2streams(all)

View File

@@ -33,7 +33,7 @@ func TestServiceManager(t *testing.T) {
}
inf.wg.Add(2)
// service manager
sm, err := newServiceManager(conf, dao, inf, nil, timer.NewTimer())
sm, err := newServiceManager(conf, dao, inf, nil, nil, timer.NewTimer())
if err != nil {
t.Error(err)
return