exchange: add api and defines

This commit is contained in:
singchia
2024-02-08 12:09:01 +08:00
parent b91ced2077
commit e295431482
8 changed files with 104 additions and 94 deletions

View File

@@ -6,7 +6,7 @@ import (
"encoding/json"
"strconv"
"github.com/singchia/frontier/pkg/proto"
"github.com/singchia/frontier/pkg/api"
"github.com/singchia/geminio"
"github.com/singchia/geminio/client"
"github.com/singchia/geminio/options"
@@ -32,7 +32,7 @@ func newServiceEnd(dialer client.Dialer, opts ...ServiceOption) (*serviceEnd, er
sopts.SetLog(sopt.logger)
}
// meta
meta := &proto.Meta{}
meta := &api.Meta{}
if sopt.topics != nil {
// we deliver topics in meta
meta.Topics = sopt.topics
@@ -74,7 +74,7 @@ func (service *serviceEnd) RegisterGetEdgeID(ctx context.Context, getEdgeID GetE
func (service *serviceEnd) RegisterEdgeOnline(ctx context.Context, edgeOnline EdgeOnline) error {
return service.End.Register(ctx, "edge_online", func(ctx context.Context, req geminio.Request, rsp geminio.Response) {
on := &proto.OnEdgeOnline{}
on := &api.OnEdgeOnline{}
err := json.Unmarshal(req.Data(), on)
if err != nil {
// shouldn't be here
@@ -92,7 +92,7 @@ func (service *serviceEnd) RegisterEdgeOnline(ctx context.Context, edgeOnline Ed
func (service *serviceEnd) RegisterEdgeOffline(ctx context.Context, edgeOffline EdgeOffline) error {
return service.End.Register(ctx, "edge_offline", func(ctx context.Context, req geminio.Request, rsp geminio.Response) {
off := &proto.OnEdgeOffline{}
off := &api.OnEdgeOffline{}
err := json.Unmarshal(req.Data(), off)
if err != nil {
// shouldn't be here

7
pkg/api/error.go Normal file
View File

@@ -0,0 +1,7 @@
package api
import "errors"
var (
ErrEdgeNotOnline = errors.New("edge not online")
)

46
pkg/api/interface.go Normal file
View File

@@ -0,0 +1,46 @@
package api
import (
"net"
"github.com/singchia/geminio"
)
type Exchange interface {
// rpc, message and raw io to edge
ForwardToEdge(*Meta, geminio.End)
// stream to edge
StreamToEdge(geminio.Stream)
// rpc, message and raw io to service
ForwardToService(geminio.End)
// stream to service
StreamToService(geminio.Stream)
}
// edge related
type Edgebound interface {
ListEdges() []geminio.End
// for management
GetEdgeByID(edgeID uint64) geminio.End
DelEdgeByID(edgeID uint64) error
}
type EdgeInformer interface {
EdgeOnline(edgeID uint64, meta []byte, addr net.Addr)
EdgeOffline(edgeID uint64, meta []byte, addr net.Addr)
EdgeHeartbeat(edgeID uint64, meta []byte, addr net.Addr)
}
// service related
type Servicebound interface {
ListService() []geminio.End
// for management
GetService(service string) geminio.End
DelSerivces(service string) error
}
type ServiceInformer interface {
ServiceOnline(serviceID uint64, service string, addr net.Addr)
ServiceOffline(serviceID uint64, service string, addr net.Addr)
ServiceHeartbeat(serviceID uint64, service string, addr net.Addr)
}

View File

@@ -1,4 +1,4 @@
package proto
package api
// frontier -> service
type OnEdgeOnline struct {

View File

@@ -10,6 +10,7 @@ import (
"github.com/jumboframes/armorigo/rproxy"
"github.com/jumboframes/armorigo/synchub"
"github.com/singchia/frontier/pkg/api"
"github.com/singchia/frontier/pkg/config"
"github.com/singchia/frontier/pkg/mapmap"
"github.com/singchia/frontier/pkg/repo/dao"
@@ -24,31 +25,11 @@ import (
"k8s.io/klog/v2"
)
type Edgebound interface {
ListEdges() []geminio.End
// for management
GetEdgeByID(edgeID uint64) geminio.End
DelEdgeByID(edgeID uint64) error
}
type EdgeInformer interface {
EdgeOnline(edgeID uint64, meta []byte, addr net.Addr)
EdgeOffline(edgeID uint64, meta []byte, addr net.Addr)
EdgeHeartbeat(edgeID uint64, meta []byte, addr net.Addr)
}
type Exchange interface {
// rpc, message and raw io to service
ForwardToService(geminio.End)
// stream to service
StreamToService(geminio.Stream)
}
type edgeManager struct {
*delegate.UnimplementedDelegate
informer EdgeInformer
exchange Exchange
informer api.EdgeInformer
exchange api.Exchange
conf *config.Configuration
// edgeID allocator
idFactory id.IDFactory
@@ -74,8 +55,8 @@ type edgeManager struct {
}
// support for tls, mtls and tcp listening
func newEdgeManager(conf *config.Configuration, dao *dao.Dao, informer EdgeInformer,
exchange Exchange, tmr timer.Timer) (*edgeManager, error) {
func newEdgeManager(conf *config.Configuration, dao *dao.Dao, informer api.EdgeInformer,
exchange api.Exchange, tmr timer.Timer) (*edgeManager, error) {
listen := &conf.Edgebound.Listen
var (
ln net.Listener

View File

@@ -1,6 +1,7 @@
package servicebound
import (
"github.com/singchia/frontier/pkg/api"
"github.com/singchia/geminio"
"k8s.io/klog/v2"
)
@@ -30,11 +31,11 @@ func (sm *serviceManager) closedStream(stream geminio.Stream) {
}
// forward to exchange
func (sm *serviceManager) forward(end geminio.End) {
func (sm *serviceManager) forward(meta *api.Meta, end geminio.End) {
serviceID := end.ClientID()
service := end.Meta()
service := meta.Service
klog.V(5).Infof("service forward stream, serviceID: %d, service: %s", serviceID, service)
if sm.exchange != nil {
sm.exchange.ForwardToService(end)
sm.exchange.ForwardToEdge(meta, end)
}
}

View File

@@ -1,17 +1,19 @@
package servicebound
import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/json"
"net"
"os"
"sync"
"github.com/jumboframes/armorigo/synchub"
"github.com/singchia/frontier/pkg/api"
"github.com/singchia/frontier/pkg/config"
"github.com/singchia/frontier/pkg/mapmap"
"github.com/singchia/frontier/pkg/repo/dao"
"github.com/singchia/frontier/pkg/repo/model"
"github.com/singchia/frontier/pkg/security"
"github.com/singchia/frontier/pkg/utils"
"github.com/singchia/geminio"
@@ -22,31 +24,11 @@ import (
"k8s.io/klog/v2"
)
type Servicebound interface {
ListService() []geminio.End
// for management
GetService(service string) geminio.End
DelSerivces(service string) error
}
type ServiceInformer interface {
ServiceOnline(serviceID uint64, service string, addr net.Addr)
ServiceOffline(serviceID uint64, service string, addr net.Addr)
ServiceHeartbeat(serviceID uint64, service string, addr net.Addr)
}
type Exchange interface {
// rpc, message and raw io to edge
ForwardToService(geminio.End)
// stream to edge
StreamToEdge(geminio.Stream)
}
type serviceManager struct {
*delegate.UnimplementedDelegate
informer ServiceInformer
exchange Exchange
informer api.ServiceInformer
exchange api.Exchange
conf *config.Configuration
// serviceID allocator
idFactory id.IDFactory
@@ -67,8 +49,8 @@ type serviceManager struct {
tmr timer.Timer
}
func newServiceManager(conf *config.Configuration, dao *dao.Dao, informer ServiceInformer,
exchange Exchange, tmr timer.Timer) (*serviceManager, error) {
func newServiceManager(conf *config.Configuration, dao *dao.Dao, informer api.ServiceInformer,
exchange api.Exchange, tmr timer.Timer) (*serviceManager, error) {
listen := &conf.Servicebound.Listen
var (
ln net.Listener
@@ -173,19 +155,40 @@ func (sm *serviceManager) handleConn(conn net.Conn) error {
klog.Errorf("service manager geminio server new end err: %s", err)
return err
}
// handle online event for end
if err = sm.online(end); err != nil {
meta := &api.Meta{}
err = json.Unmarshal(end.Meta(), meta)
if err != nil {
klog.Errorf("handle conn, json unmarshal err: %s", err)
return err
}
// register topics claim of end
sm.remoteReceiveClaim(end.ClientID(), meta.Topics)
// register methods for service
if err = end.Register(context.TODO(), "topic_claim", sm.RemoteReceiveClaim); err != nil {
// handle online event for end
if err = sm.online(end, meta); err != nil {
return err
}
// forward and stream up to edge
sm.forward(end)
sm.forward(meta, end)
return nil
}
func (sm *serviceManager) remoteReceiveClaim(serviceID uint64, topics []string) error {
klog.V(5).Infof("service remote receive claim, topics: %v, serviceID: %d", topics, serviceID)
var err error
// memdb
for _, topic := range topics {
st := &model.ServiceTopic{
Topic: topic,
ServiceID: serviceID,
}
err = sm.dao.CreateServiceTopic(st)
if err != nil {
klog.Errorf("service remote receive claim, create service topic: %s, err: %s", topic, err)
return err
}
}
return nil
}

View File

@@ -1,13 +1,12 @@
package servicebound
import (
"context"
"encoding/json"
"net"
"strconv"
"time"
"github.com/jumboframes/armorigo/synchub"
"github.com/singchia/frontier/pkg/api"
"github.com/singchia/frontier/pkg/repo/dao"
"github.com/singchia/frontier/pkg/repo/model"
"github.com/singchia/geminio"
@@ -15,7 +14,7 @@ import (
"k8s.io/klog/v2"
)
func (sm *serviceManager) online(end geminio.End) error {
func (sm *serviceManager) online(end geminio.End, meta *api.Meta) error {
// cache
var sync synchub.Sync
sm.mtx.Lock()
@@ -42,7 +41,7 @@ func (sm *serviceManager) online(end geminio.End) error {
// memdb
service := &model.Service{
ServiceID: end.ClientID(),
Service: string(end.Meta()),
Service: meta.Service,
Addr: end.RemoteAddr().String(),
CreateTime: time.Now().Unix(),
}
@@ -157,33 +156,6 @@ func (sm *serviceManager) RemoteRegistration(rpc string, serviceID, streamID uin
}
}
// RemoteReceiveClaim is called by wrappered RPC
func (sm *serviceManager) RemoteReceiveClaim(ctx context.Context, req geminio.Request, rsp geminio.Response) {
// TODO return err
serviceID := req.ClientID()
claim := &TopicClaim{}
err := json.Unmarshal(req.Data(), claim)
if err != nil {
klog.Errorf("service remote receive claim, err: %s", err)
return
}
klog.V(5).Infof("service remote receive claim, topics: %v, serviceID: %d", claim.Topics, serviceID)
// memdb
for _, topic := range claim.Topics {
st := &model.ServiceTopic{
Topic: topic,
ServiceID: serviceID,
}
err = sm.dao.CreateServiceTopic(st)
if err != nil {
klog.Errorf("service remote receive claim, create service topic: %s, err: %s", topic, err)
return
}
}
}
// actually the meta is service
func (sm *serviceManager) GetClientID(meta []byte) (uint64, error) {
// TODO