servicebound: add service manager test

This commit is contained in:
singchia
2024-02-08 10:17:40 +08:00
parent fbc53c6bcd
commit b91ced2077
4 changed files with 44 additions and 15 deletions

View File

@@ -39,7 +39,7 @@ type Multiplexer interface {
ListStreams() []geminio.Stream
}
// controller functions
// Controller functions
type GetEdgeID func(meta []byte) (uint64, error)
type EdgeOnline func(edgeID uint64, meta []byte, addr net.Addr) error
type EdgeOffline func(edgeID uint64, meta []byte, addr net.Addr) error
@@ -50,6 +50,7 @@ type ControlRegister interface {
RegisterEdgeOffline(ctx context.Context, edgeOffline EdgeOffline) error
}
// Service
type Service interface {
// Service can direct Message or RPC
RPCMessager

View File

@@ -30,12 +30,12 @@ func TestEdgeManager(t *testing.T) {
return
}
h := &handler{
inf := &informer{
wg: new(sync.WaitGroup),
}
h.wg.Add(2)
inf.wg.Add(2)
// edge manager
em, err := newEdgeManager(conf, dao, h, nil, timer.NewTimer())
em, err := newEdgeManager(conf, dao, inf, nil, timer.NewTimer())
if err != nil {
t.Error(err)
return
@@ -53,20 +53,20 @@ func TestEdgeManager(t *testing.T) {
return
}
edge.Close()
h.wg.Wait()
inf.wg.Wait()
// if the test failed, it will timeout
}
type handler struct {
type informer struct {
wg *sync.WaitGroup
}
func (h *handler) EdgeOnline(edgeID uint64, meta []byte, addr net.Addr) {
h.wg.Done()
func (inf *informer) EdgeOnline(edgeID uint64, meta []byte, addr net.Addr) {
inf.wg.Done()
}
func (h *handler) EdgeOffline(edgeID uint64, meta []byte, addr net.Addr) {
h.wg.Done()
func (inf *informer) EdgeOffline(edgeID uint64, meta []byte, addr net.Addr) {
inf.wg.Done()
}
func (h *handler) EdgeHeartbeat(edgeID uint64, meta []byte, addr net.Addr) {}
func (inf *informer) EdgeHeartbeat(edgeID uint64, meta []byte, addr net.Addr) {}

View File

@@ -83,6 +83,7 @@ func newServiceManager(conf *config.Configuration, dao *dao.Dao, informer Servic
streams: mapmap.NewMapMap(),
dao: dao,
shub: synchub.NewSyncHub(synchub.OptionTimer(tmr)),
services: make(map[uint64]geminio.End),
UnimplementedDelegate: &delegate.UnimplementedDelegate{},
// a simple unix timestamp incremental id factory
idFactory: id.DefaultIncIDCounter,

View File

@@ -2,14 +2,16 @@ package servicebound
import (
"net"
"sync"
"testing"
"github.com/singchia/frontier/api/v1/service"
"github.com/singchia/frontier/pkg/config"
"github.com/singchia/frontier/pkg/repo/dao"
"github.com/singchia/go-timer/v2"
)
func TestServiceManagerStream(t *testing.T) {
func TestServiceManager(t *testing.T) {
network := "tcp"
addr := "0.0.0.0:1202"
@@ -26,8 +28,12 @@ func TestServiceManagerStream(t *testing.T) {
t.Error(err)
return
}
inf := &informer{
wg: new(sync.WaitGroup),
}
inf.wg.Add(2)
// service manager
sm, err := newServiceManager(conf, dao, nil, nil, timer.NewTimer())
sm, err := newServiceManager(conf, dao, inf, nil, timer.NewTimer())
if err != nil {
t.Error(err)
return
@@ -36,8 +42,29 @@ func TestServiceManagerStream(t *testing.T) {
go sm.Serve()
// service
_ = func() (net.Conn, error) {
dialer := func() (net.Conn, error) {
return net.Dial(network, addr)
}
service, err := service.NewService(dialer)
if err != nil {
t.Error(err)
return
}
service.Close()
inf.wg.Wait()
// if the test failed, it will timeout
}
type informer struct {
wg *sync.WaitGroup
}
func (inf *informer) ServiceOnline(serviceID uint64, service string, addr net.Addr) {
inf.wg.Done()
}
func (inf *informer) ServiceOffline(serviceID uint64, service string, addr net.Addr) {
inf.wg.Done()
}
func (inf *informer) ServiceHeartbeat(serviceID uint64, service string, addr net.Addr) {}