diff --git a/go.mod b/go.mod index a39f31c..35f6cd3 100644 --- a/go.mod +++ b/go.mod @@ -2,11 +2,9 @@ module github.com/singchia/frontier go 1.20 -replace github.com/singchia/geminio => ../../moresec/singchia/geminio - require ( github.com/jumboframes/armorigo v0.3.0 - github.com/singchia/geminio v1.1.1 + github.com/singchia/geminio v1.1.2 github.com/singchia/go-timer/v2 v2.2.1 github.com/soheilhy/cmux v0.1.5 github.com/spf13/pflag v1.0.5 diff --git a/pkg/api/interface.go b/pkg/api/interface.go index e75cba4..057f997 100644 --- a/pkg/api/interface.go +++ b/pkg/api/interface.go @@ -35,7 +35,9 @@ type EdgeInformer interface { type Servicebound interface { ListService() []geminio.End // for management - GetService(service string) geminio.End + GetServiceByName(service string) geminio.End + GetServiceByRPC(rpc string) (geminio.End, error) + GetServiceByTopic(topic string) (geminio.End, error) DelSerivces(service string) error } @@ -44,3 +46,27 @@ type ServiceInformer interface { ServiceOffline(serviceID uint64, service string, addr net.Addr) ServiceHeartbeat(serviceID uint64, service string, addr net.Addr) } + +// mq related +type MQ interface { + Produce(topic string, data []byte, opts ...OptionProduce) error +} + +type ProduceOption struct { + Origin interface{} + EdgeID uint64 +} + +type OptionProduce func(*ProduceOption) + +func WithEdgeID(edgeID uint64) OptionProduce { + return func(po *ProduceOption) { + po.EdgeID = edgeID + } +} + +func WithOrigin(origin interface{}) OptionProduce { + return func(po *ProduceOption) { + po.Origin = origin + } +} diff --git a/pkg/edgebound/edge_manager.go b/pkg/edgebound/edge_manager.go index 178641a..5cf7cde 100644 --- a/pkg/edgebound/edge_manager.go +++ b/pkg/edgebound/edge_manager.go @@ -263,6 +263,13 @@ func (em *edgeManager) handleConn(conn net.Conn) error { return nil } +func (em *edgeManager) GetEdgeByID(edgeID uint64) geminio.End { + em.mtx.RLock() + defer em.mtx.RUnlock() + + return em.edges[edgeID] +} + func (em *edgeManager) ListEdges() []geminio.End { ends := []geminio.End{} em.mtx.RLock() diff --git a/pkg/exchange/exchange.go b/pkg/exchange/exchange.go index 5041928..6aeed0e 100644 --- a/pkg/exchange/exchange.go +++ b/pkg/exchange/exchange.go @@ -7,4 +7,5 @@ import ( type exchange struct { Edgebound api.Edgebound Servicebound api.Servicebound + MQ api.MQ } diff --git a/pkg/exchange/forward.go b/pkg/exchange/forward.go index 97f9c90..95dc872 100644 --- a/pkg/exchange/forward.go +++ b/pkg/exchange/forward.go @@ -20,15 +20,15 @@ func (ex *exchange) ForwardToEdge(meta *api.Meta, end geminio.End) { } func (ex *exchange) forwardRawToEdge(end geminio.End) { + //drop the io, actually we won't be here go func() { - klog.V(6).Infof("exchange forward raw, discard for now") - //drop the io, actually we won't be here + klog.V(6).Infof("exchange forward raw to edge, discard for now, serviceID: %d", end.ClientID()) io.Copy(io.Discard, end) }() } func (ex *exchange) forwardRPCToEdge(end geminio.End) { - // we hijack all rpc and forward them to edge + // we hijack all rpcs and forward them to edge end.Hijack(func(ctx context.Context, method string, r1 geminio.Request, r2 geminio.Response) { serviceID := end.ClientID() // get target edgeID @@ -39,14 +39,14 @@ func (ex *exchange) forwardRPCToEdge(end geminio.End) { // get edge edge := ex.Edgebound.GetEdgeByID(edgeID) if edge == nil { - klog.V(4).Infof("forward rpc, service: %d, call edge: %d is not online", serviceID, edgeID) + klog.V(4).Infof("forward rpc, serviceID: %d, call edgeID: %d, is not online", serviceID, edgeID) r2.SetError(api.ErrEdgeNotOnline) return } // call edge r3, err := edge.Call(ctx, method, r1) if err != nil { - klog.V(5).Infof("forward rpc, service: %d, call edge: %d err: %s", serviceID, edgeID, err) + klog.V(5).Infof("forward rpc, serviceID: %d, call edgeID: %d, err: %s", serviceID, edgeID, err) r2.SetError(err) return } @@ -59,9 +59,10 @@ func (ex *exchange) forwardRPCToEdge(end geminio.End) { } else { custom = append(custom, tail...) } + r2.SetCustom(custom) + // return r2.SetData(r3.Data()) r2.SetError(r3.Error()) - r2.SetCustom(custom) }) } @@ -72,10 +73,10 @@ func (ex *exchange) forwardMessageToEdge(end geminio.End) { msg, err := end.Receive(context.TODO()) if err != nil { if err == io.EOF { - klog.V(5).Infof("forward message, service: %d receive EOF", serviceID) + klog.V(5).Infof("forward message, serviceID: %d, receive EOF", serviceID) return } - klog.Errorf("forward message, service: %d receive err: %s", serviceID, err) + klog.Errorf("forward message, serviceID: %d, receive err: %s", serviceID, err) continue } // get target edgeID @@ -86,14 +87,14 @@ func (ex *exchange) forwardMessageToEdge(end geminio.End) { // get edge edge := ex.Edgebound.GetEdgeByID(edgeID) if edge == nil { - klog.V(4).Infof("forward message, service: %d, the edge: %d is not online", serviceID, edgeID) + klog.V(4).Infof("forward message, serviceID: %d, the edge: %d is not online", serviceID, edgeID) msg.Error(api.ErrEdgeNotOnline) return } // publish in sync, TODO publish in async err = edge.Publish(context.TODO(), msg) if err != nil { - klog.V(5).Infof("forward message, service: %d, publish edge: %d err: %s", serviceID, edgeID, err) + klog.V(5).Infof("forward message, serviceID: %d, publish edge: %d err: %s", serviceID, edgeID, err) msg.Error(err) return } @@ -101,3 +102,73 @@ func (ex *exchange) forwardMessageToEdge(end geminio.End) { } }() } + +// raw io from edge, and forward to service +func (ex *exchange) forwardRawToService(end geminio.End) { + //drop the io, actually we won't be here + go func() { + klog.V(6).Infof("exchange forward raw to service, discard for now, edgeID: %d", end.ClientID()) + io.Copy(io.Discard, end) + }() +} + +// rpc from edge, and forward to service +func (ex *exchange) forwardRPCToService(end geminio.End) { + edgeID := end.ClientID() + // we hijack all rpcs and forward them to service + end.Hijack(func(ctx context.Context, method string, r1 geminio.Request, r2 geminio.Response) { + // get service + edge, err := ex.Servicebound.GetServiceByRPC(method) + if err != nil { + klog.Errorf("exchange forward rpc to service, get service by rpc err: %s, edgeID: %d", err, edgeID) + r2.SetError(err) + return + } + // we record the edgeID to service + tail := make([]byte, 8) + binary.BigEndian.PutUint64(tail, edgeID) + custom := r1.Custom() + if custom == nil { + custom = tail + } else { + custom = append(custom, tail...) + } + r1.SetCustom(custom) + // call + r3, err := edge.Call(ctx, method, r1) + if err != nil { + klog.Errorf("exchange forward rpc to service, call service err: %s, edgeID: %d", err, edgeID) + r2.SetError(err) + return + } + klog.V(6).Infof("exchange forward rpc to service, call service rpc: %s success, edgeID: %d", method, edgeID) + r2.SetData(r3.Data()) + }) +} + +// message from edge, and forward to topic owner +func (ex *exchange) forwardMessageToService(end geminio.End) { + edgeID := end.ClientID() + go func() { + for { + msg, err := end.Receive(context.TODO()) + if err != nil { + if err == io.EOF { + klog.V(5).Infof("forward message, edgeID: %d, receive EOF", edgeID) + return + } + klog.Errorf("forward message, receive err: %s, edgeID: %d, ", err, edgeID) + continue + } + topic := msg.Topic() + // TODO seperate async and sync produce + err = ex.MQ.Produce(topic, msg.Data(), api.WithOrigin(msg), api.WithEdgeID(edgeID)) + if err != nil { + klog.Errorf("forward message, produce err: %s, edgeID: %d", err, edgeID) + msg.Error(err) + continue + } + msg.Done() + } + }() +} diff --git a/pkg/repo/dao/dao_service.go b/pkg/repo/dao/dao_service.go index 532c955..1743e70 100644 --- a/pkg/repo/dao/dao_service.go +++ b/pkg/repo/dao/dao_service.go @@ -16,6 +16,7 @@ type ServiceQuery struct { ServiceID uint64 } +// service func (dao *Dao) ListServices(query *ServiceQuery) ([]*model.Service, error) { tx := dao.dbService.Model(&model.Service{}) if dao.config.Log.Verbosity >= 4 { @@ -139,6 +140,18 @@ type ServiceRPCQuery struct { ServiceID uint64 } +func (dao *Dao) GetServiceRPC(rpc string) (*model.ServiceRPC, error) { + tx := dao.dbService.Model(&model.ServiceRPC{}) + if dao.config.Log.Verbosity >= 4 { + tx = tx.Debug() + } + tx = tx.Where("rpc = ?", rpc) + + var mrpc model.ServiceRPC + tx = tx.First(&mrpc) + return &mrpc, tx.Error +} + func (dao *Dao) ListServiceRPCs(query *ServiceRPCQuery) ([]string, error) { tx := dao.dbService.Model(&model.ServiceRPC{}) if dao.config.Log.Verbosity >= 4 { @@ -221,6 +234,18 @@ type ServiceTopicQuery struct { ServiceID uint64 } +func (dao *Dao) GetServiceTopic(topic string) (*model.ServiceTopic, error) { + tx := dao.dbService.Model(&model.ServiceTopic{}) + if dao.config.Log.Verbosity >= 4 { + tx = tx.Debug() + } + tx = tx.Where("topic = ?", topic) + + var mtopic model.ServiceTopic + tx = tx.First(&mtopic) + return &mtopic, tx.Error +} + func (dao *Dao) ListServiceTopics(query *ServiceTopicQuery) ([]string, error) { tx := dao.dbService.Model(&model.ServiceTopic{}) if dao.config.Log.Verbosity >= 4 { diff --git a/pkg/servicebound/service_manager.go b/pkg/servicebound/service_manager.go index 9de7f2b..5e81940 100644 --- a/pkg/servicebound/service_manager.go +++ b/pkg/servicebound/service_manager.go @@ -192,6 +192,39 @@ func (sm *serviceManager) remoteReceiveClaim(serviceID uint64, topics []string) return nil } +func (sm *serviceManager) GetServiceByID(serviceID uint64) geminio.End { + sm.mtx.RLock() + defer sm.mtx.RUnlock() + + return sm.services[serviceID] +} + +func (sm *serviceManager) GetServiceByRPC(rpc string) (geminio.End, error) { + mrpc, err := sm.dao.GetServiceRPC(rpc) + if err != nil { + klog.Errorf("get service by rpc: %s, err: %s", rpc, err) + return nil, err + } + + sm.mtx.RLock() + defer sm.mtx.RUnlock() + + return sm.services[mrpc.ServiceID], nil +} + +func (sm *serviceManager) GetServiceByTopic(topic string) (geminio.End, error) { + mtopic, err := sm.dao.GetServiceTopic(topic) + if err != nil { + klog.Errorf("get service by topic: %s, err: %s", topic, err) + return nil, err + } + + sm.mtx.RLock() + defer sm.mtx.RUnlock() + + return sm.services[mtopic.ServiceID], nil +} + func (sm *serviceManager) ListService() []geminio.End { ends := []geminio.End{} sm.mtx.RLock()