stream: finish basic streams exchange logics

This commit is contained in:
singchia
2024-03-01 12:00:11 +08:00
parent 998f3dfdfd
commit 44f2c8ae73
17 changed files with 245 additions and 108 deletions

View File

@@ -6,7 +6,7 @@ import (
"encoding/json" "encoding/json"
"strconv" "strconv"
"github.com/singchia/frontier/pkg/api" "github.com/singchia/frontier/pkg/apis"
"github.com/singchia/geminio" "github.com/singchia/geminio"
"github.com/singchia/geminio/client" "github.com/singchia/geminio/client"
"github.com/singchia/geminio/options" "github.com/singchia/geminio/options"
@@ -32,7 +32,7 @@ func newServiceEnd(dialer client.Dialer, opts ...ServiceOption) (*serviceEnd, er
sopts.SetLog(sopt.logger) sopts.SetLog(sopt.logger)
} }
// meta // meta
meta := &api.Meta{} meta := &apis.Meta{}
if sopt.topics != nil { if sopt.topics != nil {
// we deliver topics in meta // we deliver topics in meta
meta.Topics = sopt.topics meta.Topics = sopt.topics
@@ -59,10 +59,11 @@ func newServiceEnd(dialer client.Dialer, opts ...ServiceOption) (*serviceEnd, er
// Control Register // Control Register
func (service *serviceEnd) RegisterGetEdgeID(ctx context.Context, getEdgeID GetEdgeID) error { func (service *serviceEnd) RegisterGetEdgeID(ctx context.Context, getEdgeID GetEdgeID) error {
return service.End.Register(ctx, api.RPCGetEdgeID, func(ctx context.Context, req geminio.Request, rsp geminio.Response) { return service.End.Register(ctx, apis.RPCGetEdgeID, func(ctx context.Context, req geminio.Request, rsp geminio.Response) {
id, err := getEdgeID(req.Data()) id, err := getEdgeID(req.Data())
if err != nil { if err != nil {
// we just deliver the err back // we just deliver the err back
// get ID err will force close the edge unless EdgeIDAllocWhenNoIDServiceOn is configured
rsp.SetError(err) rsp.SetError(err)
return return
} }
@@ -73,8 +74,8 @@ func (service *serviceEnd) RegisterGetEdgeID(ctx context.Context, getEdgeID GetE
} }
func (service *serviceEnd) RegisterEdgeOnline(ctx context.Context, edgeOnline EdgeOnline) error { func (service *serviceEnd) RegisterEdgeOnline(ctx context.Context, edgeOnline EdgeOnline) error {
return service.End.Register(ctx, api.RPCEdgeOnline, func(ctx context.Context, req geminio.Request, rsp geminio.Response) { return service.End.Register(ctx, apis.RPCEdgeOnline, func(ctx context.Context, req geminio.Request, rsp geminio.Response) {
on := &api.OnEdgeOnline{} on := &apis.OnEdgeOnline{}
err := json.Unmarshal(req.Data(), on) err := json.Unmarshal(req.Data(), on)
if err != nil { if err != nil {
// shouldn't be here // shouldn't be here
@@ -83,6 +84,7 @@ func (service *serviceEnd) RegisterEdgeOnline(ctx context.Context, edgeOnline Ed
} }
err = edgeOnline(on.EdgeID, on.Meta, on) err = edgeOnline(on.EdgeID, on.Meta, on)
if err != nil { if err != nil {
// online err will force close the edge
rsp.SetError(err) rsp.SetError(err)
return return
} }
@@ -91,8 +93,8 @@ func (service *serviceEnd) RegisterEdgeOnline(ctx context.Context, edgeOnline Ed
} }
func (service *serviceEnd) RegisterEdgeOffline(ctx context.Context, edgeOffline EdgeOffline) error { func (service *serviceEnd) RegisterEdgeOffline(ctx context.Context, edgeOffline EdgeOffline) error {
return service.End.Register(ctx, api.RPCEdgeOffline, func(ctx context.Context, req geminio.Request, rsp geminio.Response) { return service.End.Register(ctx, apis.RPCEdgeOffline, func(ctx context.Context, req geminio.Request, rsp geminio.Response) {
off := &api.OnEdgeOffline{} off := &apis.OnEdgeOffline{}
err := json.Unmarshal(req.Data(), off) err := json.Unmarshal(req.Data(), off)
if err != nil { if err != nil {
// shouldn't be here // shouldn't be here

View File

@@ -1,4 +1,4 @@
package api package apis
import ( import (
"errors" "errors"

View File

@@ -1,4 +1,4 @@
package api package apis
import ( import (
"net" "net"
@@ -11,7 +11,7 @@ type Exchange interface {
// rpc, message and raw io to edge // rpc, message and raw io to edge
ForwardToEdge(*Meta, geminio.End) ForwardToEdge(*Meta, geminio.End)
// stream to edge // stream to edge
// TODO StreamToEdge(geminio.Stream) StreamToEdge(geminio.Stream)
// For Edge // For Edge
GetEdgeID(meta []byte) (uint64, error) // get EdgeID for edge GetEdgeID(meta []byte) (uint64, error) // get EdgeID for edge
@@ -20,7 +20,7 @@ type Exchange interface {
// rpc, message and raw io to service // rpc, message and raw io to service
ForwardToService(geminio.End) ForwardToService(geminio.End)
// stream to service // stream to service
// TODO StreamToService(geminio.Stream) StreamToService(geminio.Stream)
// for exchange // for exchange
AddEdgebound(Edgebound) AddEdgebound(Edgebound)
@@ -42,7 +42,7 @@ type Edgebound interface {
type Servicebound interface { type Servicebound interface {
ListService() []geminio.End ListService() []geminio.End
// for management // for management
// TODO GetServiceByName(service string) geminio.End GetServiceByName(service string) (geminio.End, error)
GetServiceByRPC(rpc string) (geminio.End, error) GetServiceByRPC(rpc string) (geminio.End, error)
GetServiceByTopic(topic string) (geminio.End, error) GetServiceByTopic(topic string) (geminio.End, error)
DelSerivces(service string) error DelSerivces(service string) error

View File

@@ -1,4 +1,4 @@
package api package apis
// frontier -> service // frontier -> service
// global rpcs // global rpcs

View File

@@ -15,7 +15,7 @@ func (em *edgeManager) acceptStream(stream geminio.Stream) {
em.streams.MSet(edgeID, streamID, stream) em.streams.MSet(edgeID, streamID, stream)
// exchange to service // exchange to service
if em.exchange != nil { if em.exchange != nil {
// TODO em.exchange.StreamToService(stream) em.exchange.StreamToService(stream)
} }
} }

View File

@@ -12,7 +12,7 @@ import (
"github.com/jumboframes/armorigo/log" "github.com/jumboframes/armorigo/log"
"github.com/jumboframes/armorigo/rproxy" "github.com/jumboframes/armorigo/rproxy"
"github.com/jumboframes/armorigo/synchub" "github.com/jumboframes/armorigo/synchub"
"github.com/singchia/frontier/pkg/api" "github.com/singchia/frontier/pkg/apis"
"github.com/singchia/frontier/pkg/config" "github.com/singchia/frontier/pkg/config"
"github.com/singchia/frontier/pkg/mapmap" "github.com/singchia/frontier/pkg/mapmap"
"github.com/singchia/frontier/pkg/repo/dao" "github.com/singchia/frontier/pkg/repo/dao"
@@ -27,8 +27,8 @@ import (
"k8s.io/klog/v2" "k8s.io/klog/v2"
) )
func NewEdgebound(conf *config.Configuration, dao *dao.Dao, informer api.EdgeInformer, func NewEdgebound(conf *config.Configuration, dao *dao.Dao, informer apis.EdgeInformer,
exchange api.Exchange, tmr timer.Timer) (api.Edgebound, error) { exchange apis.Exchange, tmr timer.Timer) (apis.Edgebound, error) {
return newEdgeManager(conf, dao, informer, exchange, tmr) return newEdgeManager(conf, dao, informer, exchange, tmr)
} }
@@ -36,8 +36,8 @@ type edgeManager struct {
*delegate.UnimplementedDelegate *delegate.UnimplementedDelegate
conf *config.Configuration conf *config.Configuration
informer api.EdgeInformer informer apis.EdgeInformer
exchange api.Exchange exchange apis.Exchange
// edgeID allocator // edgeID allocator
idFactory id.IDFactory idFactory id.IDFactory
@@ -63,8 +63,8 @@ type edgeManager struct {
} }
// support for tls, mtls and tcp listening // support for tls, mtls and tcp listening
func newEdgeManager(conf *config.Configuration, dao *dao.Dao, informer api.EdgeInformer, func newEdgeManager(conf *config.Configuration, dao *dao.Dao, informer apis.EdgeInformer,
exchange api.Exchange, tmr timer.Timer) (*edgeManager, error) { exchange apis.Exchange, tmr timer.Timer) (*edgeManager, error) {
listen := &conf.Edgebound.Listen listen := &conf.Edgebound.Listen
var ( var (
ln net.Listener ln net.Listener
@@ -244,7 +244,7 @@ func (em *edgeManager) Serve() {
for { for {
conn, err := em.geminioLn.Accept() conn, err := em.geminioLn.Accept()
if err != nil { if err != nil {
if !strings.Contains(err.Error(), api.ErrStrUseOfClosedConnection) { if !strings.Contains(err.Error(), apis.ErrStrUseOfClosedConnection) {
klog.V(1).Infof("edge manager listener accept err: %s", err) klog.V(1).Infof("edge manager listener accept err: %s", err)
} }
return return

View File

@@ -6,7 +6,7 @@ import (
"time" "time"
"github.com/jumboframes/armorigo/synchub" "github.com/jumboframes/armorigo/synchub"
"github.com/singchia/frontier/pkg/api" "github.com/singchia/frontier/pkg/apis"
"github.com/singchia/frontier/pkg/repo/dao" "github.com/singchia/frontier/pkg/repo/dao"
"github.com/singchia/frontier/pkg/repo/model" "github.com/singchia/frontier/pkg/repo/model"
"github.com/singchia/geminio" "github.com/singchia/geminio"
@@ -106,7 +106,7 @@ func (em *edgeManager) ConnOnline(d delegate.ConnDescriber) error {
// exchange to service // exchange to service
if em.exchange != nil { if em.exchange != nil {
err := em.exchange.EdgeOnline(edgeID, meta, addr) err := em.exchange.EdgeOnline(edgeID, meta, addr)
if err == api.ErrServiceNotOnline { if err == apis.ErrServiceNotOnline {
return nil return nil
} }
} }
@@ -176,7 +176,7 @@ func (em *edgeManager) GetClientID(meta []byte) (uint64, error) {
} }
} }
if err == api.ErrServiceNotOnline && em.conf.Edgebound.EdgeIDAllocWhenNoIDServiceOn { if err == apis.ErrServiceNotOnline && em.conf.Edgebound.EdgeIDAllocWhenNoIDServiceOn {
edgeID = em.idFactory.GetID() edgeID = em.idFactory.GetID()
klog.V(2).Infof("edge get edgeID: %d, meta: %s, after no ID acquired from exchange", edgeID, string(meta)) klog.V(2).Infof("edge get edgeID: %d, meta: %s, after no ID acquired from exchange", edgeID, string(meta))
return em.idFactory.GetID(), nil return em.idFactory.GetID(), nil

View File

@@ -1,23 +1,23 @@
package exchange package exchange
import ( import (
"github.com/singchia/frontier/pkg/api" "github.com/singchia/frontier/pkg/apis"
"github.com/singchia/frontier/pkg/config" "github.com/singchia/frontier/pkg/config"
) )
type exchange struct { type exchange struct {
conf *config.Configuration conf *config.Configuration
Edgebound api.Edgebound Edgebound apis.Edgebound
Servicebound api.Servicebound Servicebound apis.Servicebound
MQM api.MQM MQM apis.MQM
} }
func NewExchange(conf *config.Configuration, mqm api.MQM) (api.Exchange, error) { func NewExchange(conf *config.Configuration, mqm apis.MQM) (apis.Exchange, error) {
return newExchange(conf, mqm) return newExchange(conf, mqm)
} }
func newExchange(conf *config.Configuration, mqm api.MQM) (*exchange, error) { func newExchange(conf *config.Configuration, mqm apis.MQM) (*exchange, error) {
exchange := &exchange{ exchange := &exchange{
conf: conf, conf: conf,
MQM: mqm, MQM: mqm,
@@ -25,10 +25,10 @@ func newExchange(conf *config.Configuration, mqm api.MQM) (*exchange, error) {
return exchange, nil return exchange, nil
} }
func (ex *exchange) AddEdgebound(edgebound api.Edgebound) { func (ex *exchange) AddEdgebound(edgebound apis.Edgebound) {
ex.Edgebound = edgebound ex.Edgebound = edgebound
} }
func (ex *exchange) AddServicebound(servicebound api.Servicebound) { func (ex *exchange) AddServicebound(servicebound apis.Servicebound) {
ex.Servicebound = servicebound ex.Servicebound = servicebound
} }

View File

@@ -5,12 +5,13 @@ import (
"encoding/binary" "encoding/binary"
"io" "io"
"github.com/singchia/frontier/pkg/api" "github.com/singchia/frontier/pkg/apis"
"github.com/singchia/geminio" "github.com/singchia/geminio"
"github.com/singchia/geminio/options"
"k8s.io/klog/v2" "k8s.io/klog/v2"
) )
func (ex *exchange) ForwardToEdge(meta *api.Meta, end geminio.End) { func (ex *exchange) ForwardToEdge(meta *apis.Meta, end geminio.End) {
// raw // raw
ex.forwardRawToEdge(end) ex.forwardRawToEdge(end)
// message // message
@@ -40,12 +41,14 @@ func (ex *exchange) forwardRPCToEdge(end geminio.End) {
edge := ex.Edgebound.GetEdgeByID(edgeID) edge := ex.Edgebound.GetEdgeByID(edgeID)
if edge == nil { if edge == nil {
klog.V(1).Infof("service forward rpc, serviceID: %d, call edgeID: %d, is not online", serviceID, edgeID) klog.V(1).Infof("service forward rpc, serviceID: %d, call edgeID: %d, is not online", serviceID, edgeID)
r2.SetError(api.ErrEdgeNotOnline) r2.SetError(apis.ErrEdgeNotOnline)
return return
} }
// call edge // call edge
r1.SetClientID(edge.ClientID()) ropt := options.NewRequest()
r3, err := edge.Call(ctx, method, r1) ropt.SetCustom(r1.Custom())
r3 := edge.NewRequest(r1.Data(), ropt)
r4, err := edge.Call(ctx, method, r3)
if err != nil { if err != nil {
klog.V(2).Infof("service forward rpc, serviceID: %d, call edgeID: %d, err: %s", serviceID, edgeID, err) klog.V(2).Infof("service forward rpc, serviceID: %d, call edgeID: %d, err: %s", serviceID, edgeID, err)
r2.SetError(err) r2.SetError(err)
@@ -54,7 +57,7 @@ func (ex *exchange) forwardRPCToEdge(end geminio.End) {
// we record the edgeID back to r2, for service // we record the edgeID back to r2, for service
tail := make([]byte, 8) tail := make([]byte, 8)
binary.BigEndian.PutUint64(tail, edgeID) binary.BigEndian.PutUint64(tail, edgeID)
custom = r3.Custom() custom = r4.Custom()
if custom == nil { if custom == nil {
custom = tail custom = tail
} else { } else {
@@ -62,8 +65,8 @@ func (ex *exchange) forwardRPCToEdge(end geminio.End) {
} }
r2.SetCustom(custom) r2.SetCustom(custom)
// return // return
r2.SetData(r3.Data()) r2.SetData(r4.Data())
r2.SetError(r3.Error()) r2.SetError(r4.Error())
}) })
} }
@@ -90,12 +93,16 @@ func (ex *exchange) forwardMessageToEdge(end geminio.End) {
edge := ex.Edgebound.GetEdgeByID(edgeID) edge := ex.Edgebound.GetEdgeByID(edgeID)
if edge == nil { if edge == nil {
klog.V(1).Infof("service forward message, serviceID: %d, the edge: %d is not online", serviceID, edgeID) klog.V(1).Infof("service forward message, serviceID: %d, the edge: %d is not online", serviceID, edgeID)
msg.Error(api.ErrEdgeNotOnline) msg.Error(apis.ErrEdgeNotOnline)
return return
} }
// publish in sync, TODO publish in async // publish in sync, TODO publish in async
msg.SetClientID(edgeID) mopt := options.NewMessage()
err = edge.Publish(context.TODO(), msg) mopt.SetCustom(msg.Custom())
mopt.SetTopic(msg.Topic())
mopt.SetCnss(msg.Cnss())
newmsg := edge.NewMessage(msg.Data(), mopt)
err = edge.Publish(context.TODO(), newmsg)
if err != nil { if err != nil {
klog.V(2).Infof("service forward message, serviceID: %d, publish edge: %d err: %s", serviceID, edgeID, err) klog.V(2).Infof("service forward message, serviceID: %d, publish edge: %d err: %s", serviceID, edgeID, err)
msg.Error(err) msg.Error(err)
@@ -146,17 +153,21 @@ func (ex *exchange) forwardRPCToService(end geminio.End) {
} else { } else {
custom = append(custom, tail...) custom = append(custom, tail...)
} }
r1.SetCustom(custom)
r1.SetClientID(serviceID)
// call // call
r3, err := svc.Call(ctx, method, r1) ropt := options.NewRequest()
ropt.SetCustom(custom)
r3 := svc.NewRequest(r1.Data(), ropt)
r4, err := svc.Call(ctx, method, r3)
if err != nil { if err != nil {
klog.Errorf("edge forward rpc to service, call service: %d err: %s, edgeID: %d", serviceID, err, edgeID) klog.Errorf("edge forward rpc to service, call service: %d err: %s, edgeID: %d", serviceID, err, edgeID)
r2.SetError(err) r2.SetError(err)
return return
} }
klog.V(3).Infof("edge forward rpc to service, call service: %d rpc: %s success, edgeID: %d", serviceID, method, edgeID) klog.V(3).Infof("edge forward rpc to service, call service: %d rpc: %s to edgeID: %d success", serviceID, method, edgeID)
r2.SetData(r3.Data())
r2.SetData(r4.Data())
r2.SetCustom(r4.Custom())
}) })
} }
@@ -176,7 +187,7 @@ func (ex *exchange) forwardMessageToService(end geminio.End) {
} }
topic := msg.Topic() topic := msg.Topic()
// TODO seperate async and sync produce // TODO seperate async and sync produce
err = ex.MQM.Produce(topic, msg.Data(), api.WithOrigin(msg), api.WithEdgeID(edgeID)) err = ex.MQM.Produce(topic, msg.Data(), apis.WithOrigin(msg), apis.WithEdgeID(edgeID))
if err != nil { if err != nil {
klog.Errorf("edge forward message, produce err: %s, edgeID: %d", err, edgeID) klog.Errorf("edge forward message, produce err: %s, edgeID: %d", err, edgeID)
msg.Error(err) msg.Error(err)

View File

@@ -6,44 +6,45 @@ import (
"encoding/json" "encoding/json"
"net" "net"
"github.com/singchia/frontier/pkg/api" "github.com/singchia/frontier/pkg/apis"
"k8s.io/klog/v2" "k8s.io/klog/v2"
) )
func (ex *exchange) GetEdgeID(meta []byte) (uint64, error) { func (ex *exchange) GetEdgeID(meta []byte) (uint64, error) {
svc, err := ex.Servicebound.GetServiceByRPC(api.RPCGetEdgeID) svc, err := ex.Servicebound.GetServiceByRPC(apis.RPCGetEdgeID)
if err != nil { if err != nil {
klog.V(2).Infof("exchange get edgeID, get service err: %s, meta: %s", err, string(meta)) klog.V(2).Infof("exchange get edgeID, get service err: %s, meta: %s", err, string(meta))
if err == api.ErrRecordNotFound { if err == apis.ErrRecordNotFound {
return 0, api.ErrServiceNotOnline return 0, apis.ErrServiceNotOnline
} }
return 0, err return 0, err
} }
// call service // call service
req := svc.NewRequest(meta) req := svc.NewRequest(meta)
rsp, err := svc.Call(context.TODO(), api.RPCGetEdgeID, req) rsp, err := svc.Call(context.TODO(), apis.RPCGetEdgeID, req)
if err != nil { if err != nil {
klog.V(2).Infof("exchange call service: %d, get edgeID err: %s, meta: %s", svc.ClientID(), err, meta) klog.V(2).Infof("exchange call service: %d, get edgeID err: %s, meta: %s", svc.ClientID(), err, meta)
return 0, err return 0, err
} }
data := rsp.Data() data := rsp.Data()
if data == nil || len(data) != 8 { if data == nil || len(data) != 8 {
return 0, api.ErrIllegalEdgeID return 0, apis.ErrIllegalEdgeID
} }
return binary.BigEndian.Uint64(data), nil return binary.BigEndian.Uint64(data), nil
} }
func (ex *exchange) EdgeOnline(edgeID uint64, meta []byte, addr net.Addr) error { func (ex *exchange) EdgeOnline(edgeID uint64, meta []byte, addr net.Addr) error {
svc, err := ex.Servicebound.GetServiceByRPC(api.RPCEdgeOnline) svc, err := ex.Servicebound.GetServiceByRPC(apis.RPCEdgeOnline)
if err != nil { if err != nil {
klog.V(2).Infof("exchange edge online, get service err: %s, edgeID: %d, meta: %s, addr: %s", err, edgeID, string(meta), addr) klog.V(2).Infof("exchange edge online, get service err: %s, edgeID: %d, meta: %s, addr: %s", err, edgeID, string(meta), addr)
if err == api.ErrRecordNotFound { if err == apis.ErrRecordNotFound {
return api.ErrServiceNotOnline return apis.ErrServiceNotOnline
} }
return err return err
} }
// call service the edge online event // call service the edge online event
event := &api.OnEdgeOnline{ event := &apis.OnEdgeOnline{
EdgeID: edgeID, EdgeID: edgeID,
Meta: meta, Meta: meta,
Net: addr.Network(), Net: addr.Network(),
@@ -56,7 +57,7 @@ func (ex *exchange) EdgeOnline(edgeID uint64, meta []byte, addr net.Addr) error
} }
// call service // call service
req := svc.NewRequest(data) req := svc.NewRequest(data)
_, err = svc.Call(context.TODO(), api.RPCEdgeOnline, req) _, err = svc.Call(context.TODO(), apis.RPCEdgeOnline, req)
if err != nil { if err != nil {
klog.V(2).Infof("exchange call service: %d, edge online err: %s, meta: %s, addr: %s", svc.ClientID(), err, meta, addr) klog.V(2).Infof("exchange call service: %d, edge online err: %s, meta: %s, addr: %s", svc.ClientID(), err, meta, addr)
return err return err
@@ -65,16 +66,16 @@ func (ex *exchange) EdgeOnline(edgeID uint64, meta []byte, addr net.Addr) error
} }
func (ex *exchange) EdgeOffline(edgeID uint64, meta []byte, addr net.Addr) error { func (ex *exchange) EdgeOffline(edgeID uint64, meta []byte, addr net.Addr) error {
svc, err := ex.Servicebound.GetServiceByRPC(api.RPCEdgeOffline) svc, err := ex.Servicebound.GetServiceByRPC(apis.RPCEdgeOffline)
if err != nil { if err != nil {
klog.V(2).Infof("exchange edge offline, get service err: %s, edgeID: %d, meta: %s, addr: %s", err, edgeID, string(meta), addr) klog.V(2).Infof("exchange edge offline, get service err: %s, edgeID: %d, meta: %s, addr: %s", err, edgeID, string(meta), addr)
if err == api.ErrRecordNotFound { if err == apis.ErrRecordNotFound {
return api.ErrServiceNotOnline return apis.ErrServiceNotOnline
} }
return err return err
} }
// call service the edge offline event // call service the edge offline event
event := &api.OnEdgeOffline{ event := &apis.OnEdgeOffline{
EdgeID: edgeID, EdgeID: edgeID,
Meta: meta, Meta: meta,
Net: addr.Network(), Net: addr.Network(),
@@ -87,7 +88,7 @@ func (ex *exchange) EdgeOffline(edgeID uint64, meta []byte, addr net.Addr) error
} }
// call service // call service
req := svc.NewRequest(data) req := svc.NewRequest(data)
_, err = svc.Call(context.TODO(), api.RPCEdgeOffline, req) _, err = svc.Call(context.TODO(), apis.RPCEdgeOffline, req)
if err != nil { if err != nil {
klog.V(2).Infof("exchange call service: %d, edge offline err: %s, meta: %s, addr: %s", svc.ClientID(), err, meta, addr) klog.V(2).Infof("exchange call service: %d, edge offline err: %s, meta: %s, addr: %s", svc.ClientID(), err, meta, addr)
return err return err

View File

@@ -2,10 +2,11 @@ package exchange
import ( import (
"context" "context"
"encoding/binary" "io"
"strconv" "strconv"
"github.com/singchia/geminio" "github.com/singchia/geminio"
"github.com/singchia/geminio/options"
"k8s.io/klog/v2" "k8s.io/klog/v2"
) )
@@ -33,20 +34,74 @@ func (ex *exchange) StreamToEdge(serviceStream geminio.Stream) {
// open stream from edge // open stream from edge
edgeStream, err := edge.OpenStream() edgeStream, err := edge.OpenStream()
if err != nil { if err != nil {
klog.Errorf("stream to edge, open stream err: %s, serviceID: %d, edge:ID: %d", err, serviceID, streamID) klog.Errorf("stream to edge, open stream err: %s, serviceID: %d, edgeID: %d", err, serviceID, streamID)
serviceStream.Close() serviceStream.Close()
return return
} }
// do stream forward // do stream forward
ex.streamForward(serviceStream, edgeStream)
} }
func (ex *exchange) streamForward(left, right geminio.Stream) {} func (ex *exchange) StreamToService(edgeStream geminio.Stream) {
edgeID := edgeStream.ClientID()
streamID := edgeStream.StreamID()
func (ex *exchange) streamForwardMessage(left, right geminio.Stream) { // get service
RecvPub := func(from, to geminio.Stream) { peer := edgeStream.Peer()
svc, err := ex.Servicebound.GetServiceByName(peer)
if err != nil {
klog.V(1).Infof("stream to service, get service: %s err: %s, edgeID: %d, streamID: %d", peer, err, edgeID, streamID)
// TODO return err to the stream
edgeStream.ClientID()
return
}
serviceStream, err := svc.OpenStream()
if err != nil {
klog.Errorf("stream to service, open stream err: %s, serviceID: %d, edgeID: %d", err, svc.ClientID(), edgeID)
edgeStream.Close()
return
}
// do stream forward
ex.streamForward(edgeStream, serviceStream)
}
func (ex *exchange) streamForward(left, right geminio.Stream) {
// raw
ex.streamForwardRaw(left, right)
// message
ex.streamForwardMessage(left, right)
// rpc
ex.streamForwardRPC(left, right)
}
func (ex *exchange) streamForwardRaw(left, right geminio.Stream) {
copy := func(from, to geminio.Stream) {
fromID := from.ClientID() fromID := from.ClientID()
toID := to.ClientID() toID := to.ClientID()
n, err := io.Copy(to, from)
if err != nil {
klog.Errorf("stream forward raw, copy err: %s, fromID: %d, toID: %d, written: %d", err, fromID, toID, n)
} else {
klog.V(4).Infof("stream forward raw done, fromID: %d, toID: %d, written: %d", fromID, toID, n)
}
from.Close()
to.Close()
}
go copy(left, right)
go copy(right, left)
}
func (ex *exchange) streamForwardMessage(left, right geminio.Stream) {
recvPub := func(from, to geminio.Stream) {
fromID := from.ClientID()
toID := to.ClientID()
for { for {
msg, err := from.Receive(context.TODO()) msg, err := from.Receive(context.TODO())
if err != nil { if err != nil {
@@ -54,9 +109,48 @@ func (ex *exchange) streamForwardMessage(left, right geminio.Stream) {
return return
} }
// we record the ID to peer mopt := options.NewMessage()
tail := make([]byte, 8) mopt.SetCustom(msg.Custom())
binary.BigEndian.PutUint64(tail, msg.ClientID()) mopt.SetTopic(msg.Topic())
mopt.SetCnss(msg.Cnss())
newmsg := to.NewMessage(msg.Data(), mopt)
err = to.Publish(context.TODO(), newmsg)
if err != nil {
klog.Errorf("stream forward message, publish err: %s, fromID: %d, toID: %d", err, fromID, toID)
msg.Error(err)
return
}
msg.Done()
} }
} }
go recvPub(left, right)
go recvPub(right, left)
}
func (ex *exchange) streamForwardRPC(left, right geminio.Stream) {
forwardRPC := func(from, to geminio.Stream) {
fromID := from.ClientID()
toID := to.ClientID()
from.Hijack(func(ctx context.Context, method string, r1 geminio.Request, r2 geminio.Response) {
// TODO to carry the fromID to next Call
ropt := options.NewRequest()
ropt.SetCustom(r1.Custom())
r3 := to.NewRequest(r1.Data(), ropt)
r4, err := to.Call(ctx, method, r3)
if err != nil {
klog.Errorf("stream forward rpc, call err: %s, fromID: %d, toID: %d", err, fromID, toID)
r2.SetError(err)
return
}
klog.V(3).Infof("stream froward rpc, call fromID: %d rpc: %s to toID: %d success", fromID, method, toID)
r2.SetData(r4.Data())
r2.SetCustom(r4.Custom())
})
}
forwardRPC(left, right)
forwardRPC(right, left)
} }

View File

@@ -4,7 +4,7 @@ import (
"sync" "sync"
"sync/atomic" "sync/atomic"
"github.com/singchia/frontier/pkg/api" "github.com/singchia/frontier/pkg/apis"
"github.com/singchia/frontier/pkg/config" "github.com/singchia/frontier/pkg/config"
"github.com/singchia/geminio" "github.com/singchia/geminio"
"k8s.io/klog/v2" "k8s.io/klog/v2"
@@ -14,24 +14,24 @@ type mqManager struct {
conf *config.Configuration conf *config.Configuration
// mqs // mqs
mtx sync.RWMutex mtx sync.RWMutex
mqs map[string][]api.MQ // key: topic, value: mqs mqs map[string][]apis.MQ // key: topic, value: mqs
mqindex map[string]*uint64 // for round robin mqindex map[string]*uint64 // for round robin
} }
func NewMQM(conf *config.Configuration) (api.MQM, error) { func NewMQM(conf *config.Configuration) (apis.MQM, error) {
return newMQManager(conf) return newMQManager(conf)
} }
func newMQManager(conf *config.Configuration) (*mqManager, error) { func newMQManager(conf *config.Configuration) (*mqManager, error) {
mqm := &mqManager{ mqm := &mqManager{
mqs: make(map[string][]api.MQ), mqs: make(map[string][]apis.MQ),
mqindex: make(map[string]*uint64), mqindex: make(map[string]*uint64),
conf: conf, conf: conf,
} }
return mqm, nil return mqm, nil
} }
func (mqm *mqManager) AddMQ(topics []string, mq api.MQ) { func (mqm *mqManager) AddMQ(topics []string, mq apis.MQ) {
mqm.mtx.Lock() mqm.mtx.Lock()
defer mqm.mtx.Unlock() defer mqm.mtx.Unlock()
@@ -39,7 +39,7 @@ func (mqm *mqManager) AddMQ(topics []string, mq api.MQ) {
mqs, ok := mqm.mqs[topic] mqs, ok := mqm.mqs[topic]
if !ok { if !ok {
klog.V(2).Infof("mq manager, add topic: %s mq succeed", topic) klog.V(2).Infof("mq manager, add topic: %s mq succeed", topic)
mqm.mqs[topic] = []api.MQ{mq} mqm.mqs[topic] = []apis.MQ{mq}
mqm.mqindex[topic] = new(uint64) mqm.mqindex[topic] = new(uint64)
continue continue
} }
@@ -69,12 +69,12 @@ func (mqm *mqManager) AddMQByEnd(topics []string, end geminio.End) {
mqm.AddMQ(topics, mq) mqm.AddMQ(topics, mq)
} }
func (mqm *mqManager) DelMQ(mq api.MQ) { func (mqm *mqManager) DelMQ(mq apis.MQ) {
mqm.mtx.Lock() mqm.mtx.Lock()
defer mqm.mtx.Unlock() defer mqm.mtx.Unlock()
for topic, mqs := range mqm.mqs { for topic, mqs := range mqm.mqs {
news := []api.MQ{} news := []apis.MQ{}
for _, exist := range mqs { for _, exist := range mqs {
if exist == mq { if exist == mq {
klog.V(3).Infof("mq manager, del topic: %s mq succeed", topic) klog.V(3).Infof("mq manager, del topic: %s mq succeed", topic)
@@ -98,7 +98,7 @@ func (mqm *mqManager) DelMQByEnd(end geminio.End) {
defer mqm.mtx.Unlock() defer mqm.mtx.Unlock()
for topic, mqs := range mqm.mqs { for topic, mqs := range mqm.mqs {
news := []api.MQ{} news := []apis.MQ{}
for _, exist := range mqs { for _, exist := range mqs {
left, ok := exist.(*mqService) left, ok := exist.(*mqService)
if ok { if ok {
@@ -118,7 +118,7 @@ func (mqm *mqManager) DelMQByEnd(end geminio.End) {
} }
} }
func (mqm *mqManager) GetMQ(topic string) api.MQ { func (mqm *mqManager) GetMQ(topic string) apis.MQ {
mqm.mtx.RLock() mqm.mtx.RLock()
defer mqm.mtx.RUnlock() defer mqm.mtx.RUnlock()
@@ -133,18 +133,18 @@ func (mqm *mqManager) GetMQ(topic string) api.MQ {
return mqs[i] return mqs[i]
} }
func (mqm *mqManager) GetMQs(topic string) []api.MQ { func (mqm *mqManager) GetMQs(topic string) []apis.MQ {
mqm.mtx.RLock() mqm.mtx.RLock()
defer mqm.mtx.RUnlock() defer mqm.mtx.RUnlock()
return mqm.mqs[topic] return mqm.mqs[topic]
} }
func (mqm *mqManager) Produce(topic string, data []byte, opts ...api.OptionProduce) error { func (mqm *mqManager) Produce(topic string, data []byte, opts ...apis.OptionProduce) error {
mq := mqm.GetMQ(topic) mq := mqm.GetMQ(topic)
if mq == nil { if mq == nil {
mq = mqm.GetMQ("*") mq = mqm.GetMQ("*")
if mq == nil { if mq == nil {
err := api.ErrTopicNotOnline err := apis.ErrTopicNotOnline
klog.Errorf("mq manager, get mq nil, topic: %s err: %s", topic, err) klog.Errorf("mq manager, get mq nil, topic: %s err: %s", topic, err)
return err return err
} }

View File

@@ -4,7 +4,7 @@ import (
"context" "context"
"encoding/binary" "encoding/binary"
"github.com/singchia/frontier/pkg/api" "github.com/singchia/frontier/pkg/apis"
"github.com/singchia/geminio" "github.com/singchia/geminio"
"github.com/singchia/geminio/options" "github.com/singchia/geminio/options"
"k8s.io/klog/v2" "k8s.io/klog/v2"
@@ -14,12 +14,12 @@ type mqService struct {
end geminio.End end geminio.End
} }
func NewMQServiceFromEnd(end geminio.End) api.MQ { func NewMQServiceFromEnd(end geminio.End) apis.MQ {
return &mqService{end} return &mqService{end}
} }
func (mq *mqService) Produce(topic string, data []byte, opts ...api.OptionProduce) error { func (mq *mqService) Produce(topic string, data []byte, opts ...apis.OptionProduce) error {
opt := &api.ProduceOption{} opt := &apis.ProduceOption{}
for _, fun := range opts { for _, fun := range opts {
fun(opt) fun(opt)
} }
@@ -39,6 +39,7 @@ func (mq *mqService) Produce(topic string, data []byte, opts ...api.OptionProduc
mopt := options.NewMessage() mopt := options.NewMessage()
mopt.SetCustom(custom) mopt.SetCustom(custom)
mopt.SetTopic(topic) mopt.SetTopic(topic)
mopt.SetCnss(msg.Cnss())
newmsg := mq.end.NewMessage(data, mopt) newmsg := mq.end.NewMessage(data, mopt)
err := mq.end.Publish(context.TODO(), newmsg) err := mq.end.Publish(context.TODO(), newmsg)
if err != nil { if err != nil {

View File

@@ -75,6 +75,21 @@ func (dao *Dao) GetService(serviceID uint64) (*model.Service, error) {
return &service, tx.Error return &service, tx.Error
} }
func (dao *Dao) GetServiceByName(name string) (*model.Service, error) {
tx := dao.dbService.Model(&model.Service{})
if dao.config.Dao.Debug {
tx = tx.Debug()
}
tx = tx.Where("service = ?", name).Limit(1)
var service model.Service
tx = tx.Find(&service)
if tx.RowsAffected == 0 {
return nil, gorm.ErrRecordNotFound
}
return &service, tx.Error
}
type ServiceDelete struct { type ServiceDelete struct {
ServiceID uint64 ServiceID uint64
Addr string Addr string

View File

@@ -1,7 +1,7 @@
package servicebound package servicebound
import ( import (
"github.com/singchia/frontier/pkg/api" "github.com/singchia/frontier/pkg/apis"
"github.com/singchia/geminio" "github.com/singchia/geminio"
"k8s.io/klog/v2" "k8s.io/klog/v2"
) )
@@ -16,7 +16,7 @@ func (sm *serviceManager) acceptStream(stream geminio.Stream) {
sm.streams.MSet(serviceID, streamID, stream) sm.streams.MSet(serviceID, streamID, stream)
// exchange to edge // exchange to edge
if sm.exchange != nil { if sm.exchange != nil {
// TODO sm.exchange.StreamToEdge(stream) sm.exchange.StreamToEdge(stream)
} }
} }
@@ -31,7 +31,7 @@ func (sm *serviceManager) closedStream(stream geminio.Stream) {
} }
// forward to exchange // forward to exchange
func (sm *serviceManager) forward(meta *api.Meta, end geminio.End) { func (sm *serviceManager) forward(meta *apis.Meta, end geminio.End) {
serviceID := end.ClientID() serviceID := end.ClientID()
service := meta.Service service := meta.Service
klog.V(2).Infof("service forward raw message and rpc, serviceID: %d, service: %s", serviceID, service) klog.V(2).Infof("service forward raw message and rpc, serviceID: %d, service: %s", serviceID, service)

View File

@@ -12,7 +12,7 @@ import (
"github.com/jumboframes/armorigo/log" "github.com/jumboframes/armorigo/log"
"github.com/jumboframes/armorigo/synchub" "github.com/jumboframes/armorigo/synchub"
"github.com/singchia/frontier/pkg/api" "github.com/singchia/frontier/pkg/apis"
"github.com/singchia/frontier/pkg/config" "github.com/singchia/frontier/pkg/config"
"github.com/singchia/frontier/pkg/mapmap" "github.com/singchia/frontier/pkg/mapmap"
"github.com/singchia/frontier/pkg/repo/dao" "github.com/singchia/frontier/pkg/repo/dao"
@@ -27,23 +27,23 @@ import (
"k8s.io/klog/v2" "k8s.io/klog/v2"
) )
func NewServicebound(conf *config.Configuration, dao *dao.Dao, informer api.ServiceInformer, func NewServicebound(conf *config.Configuration, dao *dao.Dao, informer apis.ServiceInformer,
exchange api.Exchange, mqm api.MQM, tmr timer.Timer) (api.Servicebound, error) { exchange apis.Exchange, mqm apis.MQM, tmr timer.Timer) (apis.Servicebound, error) {
return newServiceManager(conf, dao, informer, exchange, mqm, tmr) return newServiceManager(conf, dao, informer, exchange, mqm, tmr)
} }
type end struct { type end struct {
End geminio.End End geminio.End
Meta *api.Meta Meta *apis.Meta
} }
type serviceManager struct { type serviceManager struct {
*delegate.UnimplementedDelegate *delegate.UnimplementedDelegate
conf *config.Configuration conf *config.Configuration
informer api.ServiceInformer informer apis.ServiceInformer
exchange api.Exchange exchange apis.Exchange
mqm api.MQM mqm apis.MQM
// serviceID allocator // serviceID allocator
idFactory id.IDFactory idFactory id.IDFactory
@@ -64,8 +64,8 @@ type serviceManager struct {
tmr timer.Timer tmr timer.Timer
} }
func newServiceManager(conf *config.Configuration, dao *dao.Dao, informer api.ServiceInformer, func newServiceManager(conf *config.Configuration, dao *dao.Dao, informer apis.ServiceInformer,
exchange api.Exchange, mqm api.MQM, tmr timer.Timer) (*serviceManager, error) { exchange apis.Exchange, mqm apis.MQM, tmr timer.Timer) (*serviceManager, error) {
listen := &conf.Servicebound.Listen listen := &conf.Servicebound.Listen
var ( var (
ln net.Listener ln net.Listener
@@ -153,7 +153,7 @@ func (sm *serviceManager) Serve() {
for { for {
conn, err := sm.ln.Accept() conn, err := sm.ln.Accept()
if err != nil { if err != nil {
if !strings.Contains(err.Error(), api.ErrStrUseOfClosedConnection) { if !strings.Contains(err.Error(), apis.ErrStrUseOfClosedConnection) {
klog.V(1).Infof("service manager listener accept err: %s", err) klog.V(1).Infof("service manager listener accept err: %s", err)
} }
return return
@@ -176,7 +176,7 @@ func (sm *serviceManager) handleConn(conn net.Conn) error {
klog.Errorf("service manager geminio server new end err: %s", err) klog.Errorf("service manager geminio server new end err: %s", err)
return err return err
} }
meta := &api.Meta{} meta := &apis.Meta{}
err = json.Unmarshal(end.Meta(), meta) err = json.Unmarshal(end.Meta(), meta)
if err != nil { if err != nil {
klog.Errorf("handle conn, json unmarshal err: %s", err) klog.Errorf("handle conn, json unmarshal err: %s", err)
@@ -242,6 +242,19 @@ func (sm *serviceManager) GetServiceByID(serviceID uint64) geminio.End {
return sm.services[serviceID] return sm.services[serviceID]
} }
func (sm *serviceManager) GetServiceByName(name string) (geminio.End, error) {
sm.mtx.RLock()
defer sm.mtx.RUnlock()
mservice, err := sm.dao.GetServiceByName(name)
if err != nil {
klog.V(2).Infof("get service by name: %s, err: %s", name, err)
return nil, err
}
return sm.services[mservice.ServiceID], nil
}
func (sm *serviceManager) GetServiceByRPC(rpc string) (geminio.End, error) { func (sm *serviceManager) GetServiceByRPC(rpc string) (geminio.End, error) {
sm.mtx.RLock() sm.mtx.RLock()
defer sm.mtx.RUnlock() defer sm.mtx.RUnlock()

View File

@@ -6,7 +6,7 @@ import (
"time" "time"
"github.com/jumboframes/armorigo/synchub" "github.com/jumboframes/armorigo/synchub"
"github.com/singchia/frontier/pkg/api" "github.com/singchia/frontier/pkg/apis"
"github.com/singchia/frontier/pkg/repo/dao" "github.com/singchia/frontier/pkg/repo/dao"
"github.com/singchia/frontier/pkg/repo/model" "github.com/singchia/frontier/pkg/repo/model"
"github.com/singchia/geminio" "github.com/singchia/geminio"
@@ -14,7 +14,7 @@ import (
"k8s.io/klog/v2" "k8s.io/klog/v2"
) )
func (sm *serviceManager) online(end geminio.End, meta *api.Meta) error { func (sm *serviceManager) online(end geminio.End, meta *apis.Meta) error {
// cache // cache
var sync synchub.Sync var sync synchub.Sync
sm.mtx.Lock() sm.mtx.Lock()