diff --git a/api/v1/service/service_end.go b/api/v1/service/service_end.go index 23abbc7..2a1b777 100644 --- a/api/v1/service/service_end.go +++ b/api/v1/service/service_end.go @@ -6,7 +6,7 @@ import ( "encoding/json" "strconv" - "github.com/singchia/frontier/pkg/proto" + "github.com/singchia/frontier/pkg/api" "github.com/singchia/geminio" "github.com/singchia/geminio/client" "github.com/singchia/geminio/options" @@ -32,7 +32,7 @@ func newServiceEnd(dialer client.Dialer, opts ...ServiceOption) (*serviceEnd, er sopts.SetLog(sopt.logger) } // meta - meta := &proto.Meta{} + meta := &api.Meta{} if sopt.topics != nil { // we deliver topics in meta meta.Topics = sopt.topics @@ -74,7 +74,7 @@ func (service *serviceEnd) RegisterGetEdgeID(ctx context.Context, getEdgeID GetE func (service *serviceEnd) RegisterEdgeOnline(ctx context.Context, edgeOnline EdgeOnline) error { return service.End.Register(ctx, "edge_online", func(ctx context.Context, req geminio.Request, rsp geminio.Response) { - on := &proto.OnEdgeOnline{} + on := &api.OnEdgeOnline{} err := json.Unmarshal(req.Data(), on) if err != nil { // shouldn't be here @@ -92,7 +92,7 @@ func (service *serviceEnd) RegisterEdgeOnline(ctx context.Context, edgeOnline Ed func (service *serviceEnd) RegisterEdgeOffline(ctx context.Context, edgeOffline EdgeOffline) error { return service.End.Register(ctx, "edge_offline", func(ctx context.Context, req geminio.Request, rsp geminio.Response) { - off := &proto.OnEdgeOffline{} + off := &api.OnEdgeOffline{} err := json.Unmarshal(req.Data(), off) if err != nil { // shouldn't be here diff --git a/pkg/api/error.go b/pkg/api/error.go new file mode 100644 index 0000000..453e780 --- /dev/null +++ b/pkg/api/error.go @@ -0,0 +1,7 @@ +package api + +import "errors" + +var ( + ErrEdgeNotOnline = errors.New("edge not online") +) diff --git a/pkg/api/interface.go b/pkg/api/interface.go new file mode 100644 index 0000000..e75cba4 --- /dev/null +++ b/pkg/api/interface.go @@ -0,0 +1,46 @@ +package api + +import ( + "net" + + "github.com/singchia/geminio" +) + +type Exchange interface { + // rpc, message and raw io to edge + ForwardToEdge(*Meta, geminio.End) + // stream to edge + StreamToEdge(geminio.Stream) + // rpc, message and raw io to service + ForwardToService(geminio.End) + // stream to service + StreamToService(geminio.Stream) +} + +// edge related +type Edgebound interface { + ListEdges() []geminio.End + // for management + GetEdgeByID(edgeID uint64) geminio.End + DelEdgeByID(edgeID uint64) error +} + +type EdgeInformer interface { + EdgeOnline(edgeID uint64, meta []byte, addr net.Addr) + EdgeOffline(edgeID uint64, meta []byte, addr net.Addr) + EdgeHeartbeat(edgeID uint64, meta []byte, addr net.Addr) +} + +// service related +type Servicebound interface { + ListService() []geminio.End + // for management + GetService(service string) geminio.End + DelSerivces(service string) error +} + +type ServiceInformer interface { + ServiceOnline(serviceID uint64, service string, addr net.Addr) + ServiceOffline(serviceID uint64, service string, addr net.Addr) + ServiceHeartbeat(serviceID uint64, service string, addr net.Addr) +} diff --git a/pkg/proto/proto.go b/pkg/api/proto.go similarity index 97% rename from pkg/proto/proto.go rename to pkg/api/proto.go index bc29909..e1f696f 100644 --- a/pkg/proto/proto.go +++ b/pkg/api/proto.go @@ -1,4 +1,4 @@ -package proto +package api // frontier -> service type OnEdgeOnline struct { diff --git a/pkg/edgebound/edge_manager.go b/pkg/edgebound/edge_manager.go index ae378ba..178641a 100644 --- a/pkg/edgebound/edge_manager.go +++ b/pkg/edgebound/edge_manager.go @@ -10,6 +10,7 @@ import ( "github.com/jumboframes/armorigo/rproxy" "github.com/jumboframes/armorigo/synchub" + "github.com/singchia/frontier/pkg/api" "github.com/singchia/frontier/pkg/config" "github.com/singchia/frontier/pkg/mapmap" "github.com/singchia/frontier/pkg/repo/dao" @@ -24,31 +25,11 @@ import ( "k8s.io/klog/v2" ) -type Edgebound interface { - ListEdges() []geminio.End - // for management - GetEdgeByID(edgeID uint64) geminio.End - DelEdgeByID(edgeID uint64) error -} - -type EdgeInformer interface { - EdgeOnline(edgeID uint64, meta []byte, addr net.Addr) - EdgeOffline(edgeID uint64, meta []byte, addr net.Addr) - EdgeHeartbeat(edgeID uint64, meta []byte, addr net.Addr) -} - -type Exchange interface { - // rpc, message and raw io to service - ForwardToService(geminio.End) - // stream to service - StreamToService(geminio.Stream) -} - type edgeManager struct { *delegate.UnimplementedDelegate - informer EdgeInformer - exchange Exchange + informer api.EdgeInformer + exchange api.Exchange conf *config.Configuration // edgeID allocator idFactory id.IDFactory @@ -74,8 +55,8 @@ type edgeManager struct { } // support for tls, mtls and tcp listening -func newEdgeManager(conf *config.Configuration, dao *dao.Dao, informer EdgeInformer, - exchange Exchange, tmr timer.Timer) (*edgeManager, error) { +func newEdgeManager(conf *config.Configuration, dao *dao.Dao, informer api.EdgeInformer, + exchange api.Exchange, tmr timer.Timer) (*edgeManager, error) { listen := &conf.Edgebound.Listen var ( ln net.Listener diff --git a/pkg/servicebound/service_dataplane.go b/pkg/servicebound/service_dataplane.go index 8b476f8..f21e052 100644 --- a/pkg/servicebound/service_dataplane.go +++ b/pkg/servicebound/service_dataplane.go @@ -1,6 +1,7 @@ package servicebound import ( + "github.com/singchia/frontier/pkg/api" "github.com/singchia/geminio" "k8s.io/klog/v2" ) @@ -30,11 +31,11 @@ func (sm *serviceManager) closedStream(stream geminio.Stream) { } // forward to exchange -func (sm *serviceManager) forward(end geminio.End) { +func (sm *serviceManager) forward(meta *api.Meta, end geminio.End) { serviceID := end.ClientID() - service := end.Meta() + service := meta.Service klog.V(5).Infof("service forward stream, serviceID: %d, service: %s", serviceID, service) if sm.exchange != nil { - sm.exchange.ForwardToService(end) + sm.exchange.ForwardToEdge(meta, end) } } diff --git a/pkg/servicebound/service_manager.go b/pkg/servicebound/service_manager.go index 6780f3e..9de7f2b 100644 --- a/pkg/servicebound/service_manager.go +++ b/pkg/servicebound/service_manager.go @@ -1,17 +1,19 @@ package servicebound import ( - "context" "crypto/tls" "crypto/x509" + "encoding/json" "net" "os" "sync" "github.com/jumboframes/armorigo/synchub" + "github.com/singchia/frontier/pkg/api" "github.com/singchia/frontier/pkg/config" "github.com/singchia/frontier/pkg/mapmap" "github.com/singchia/frontier/pkg/repo/dao" + "github.com/singchia/frontier/pkg/repo/model" "github.com/singchia/frontier/pkg/security" "github.com/singchia/frontier/pkg/utils" "github.com/singchia/geminio" @@ -22,31 +24,11 @@ import ( "k8s.io/klog/v2" ) -type Servicebound interface { - ListService() []geminio.End - // for management - GetService(service string) geminio.End - DelSerivces(service string) error -} - -type ServiceInformer interface { - ServiceOnline(serviceID uint64, service string, addr net.Addr) - ServiceOffline(serviceID uint64, service string, addr net.Addr) - ServiceHeartbeat(serviceID uint64, service string, addr net.Addr) -} - -type Exchange interface { - // rpc, message and raw io to edge - ForwardToService(geminio.End) - // stream to edge - StreamToEdge(geminio.Stream) -} - type serviceManager struct { *delegate.UnimplementedDelegate - informer ServiceInformer - exchange Exchange + informer api.ServiceInformer + exchange api.Exchange conf *config.Configuration // serviceID allocator idFactory id.IDFactory @@ -67,8 +49,8 @@ type serviceManager struct { tmr timer.Timer } -func newServiceManager(conf *config.Configuration, dao *dao.Dao, informer ServiceInformer, - exchange Exchange, tmr timer.Timer) (*serviceManager, error) { +func newServiceManager(conf *config.Configuration, dao *dao.Dao, informer api.ServiceInformer, + exchange api.Exchange, tmr timer.Timer) (*serviceManager, error) { listen := &conf.Servicebound.Listen var ( ln net.Listener @@ -173,19 +155,40 @@ func (sm *serviceManager) handleConn(conn net.Conn) error { klog.Errorf("service manager geminio server new end err: %s", err) return err } - - // handle online event for end - if err = sm.online(end); err != nil { + meta := &api.Meta{} + err = json.Unmarshal(end.Meta(), meta) + if err != nil { + klog.Errorf("handle conn, json unmarshal err: %s", err) return err } + // register topics claim of end + sm.remoteReceiveClaim(end.ClientID(), meta.Topics) - // register methods for service - if err = end.Register(context.TODO(), "topic_claim", sm.RemoteReceiveClaim); err != nil { + // handle online event for end + if err = sm.online(end, meta); err != nil { return err } // forward and stream up to edge - sm.forward(end) + sm.forward(meta, end) + return nil +} + +func (sm *serviceManager) remoteReceiveClaim(serviceID uint64, topics []string) error { + klog.V(5).Infof("service remote receive claim, topics: %v, serviceID: %d", topics, serviceID) + var err error + // memdb + for _, topic := range topics { + st := &model.ServiceTopic{ + Topic: topic, + ServiceID: serviceID, + } + err = sm.dao.CreateServiceTopic(st) + if err != nil { + klog.Errorf("service remote receive claim, create service topic: %s, err: %s", topic, err) + return err + } + } return nil } diff --git a/pkg/servicebound/service_onoff.go b/pkg/servicebound/service_onoff.go index e0188d2..d0f588e 100644 --- a/pkg/servicebound/service_onoff.go +++ b/pkg/servicebound/service_onoff.go @@ -1,13 +1,12 @@ package servicebound import ( - "context" - "encoding/json" "net" "strconv" "time" "github.com/jumboframes/armorigo/synchub" + "github.com/singchia/frontier/pkg/api" "github.com/singchia/frontier/pkg/repo/dao" "github.com/singchia/frontier/pkg/repo/model" "github.com/singchia/geminio" @@ -15,7 +14,7 @@ import ( "k8s.io/klog/v2" ) -func (sm *serviceManager) online(end geminio.End) error { +func (sm *serviceManager) online(end geminio.End, meta *api.Meta) error { // cache var sync synchub.Sync sm.mtx.Lock() @@ -42,7 +41,7 @@ func (sm *serviceManager) online(end geminio.End) error { // memdb service := &model.Service{ ServiceID: end.ClientID(), - Service: string(end.Meta()), + Service: meta.Service, Addr: end.RemoteAddr().String(), CreateTime: time.Now().Unix(), } @@ -157,33 +156,6 @@ func (sm *serviceManager) RemoteRegistration(rpc string, serviceID, streamID uin } } -// RemoteReceiveClaim is called by wrappered RPC -func (sm *serviceManager) RemoteReceiveClaim(ctx context.Context, req geminio.Request, rsp geminio.Response) { - // TODO return err - serviceID := req.ClientID() - - claim := &TopicClaim{} - err := json.Unmarshal(req.Data(), claim) - if err != nil { - klog.Errorf("service remote receive claim, err: %s", err) - return - } - klog.V(5).Infof("service remote receive claim, topics: %v, serviceID: %d", claim.Topics, serviceID) - - // memdb - for _, topic := range claim.Topics { - st := &model.ServiceTopic{ - Topic: topic, - ServiceID: serviceID, - } - err = sm.dao.CreateServiceTopic(st) - if err != nil { - klog.Errorf("service remote receive claim, create service topic: %s, err: %s", topic, err) - return - } - } -} - // actually the meta is service func (sm *serviceManager) GetClientID(meta []byte) (uint64, error) { // TODO