servicebound: add service apis

This commit is contained in:
singchia
2024-02-07 15:05:20 +08:00
parent 60a522ad2e
commit fbc53c6bcd
6 changed files with 158 additions and 18 deletions

View File

@@ -35,22 +35,22 @@ type RPCMessager interface {
// Stream multiplexer
type Multiplexer interface {
// Open a stream to specific service
OpenStream(service string) (geminio.Stream, error)
// Open a stream to a specific service
OpenStream(serviceName string) (geminio.Stream, error)
AcceptStream() (geminio.Stream, error)
ListStreams() []geminio.Stream
}
type Edge interface {
// Edge can direct Message or RPC
// Edge can directly Publish Message or Call RPC
RPCMessager
// Edge can manage streams from or to a Service
// Edge can manage(create, list...) streams from or to a Service
Multiplexer
// Edge is a net.Listener, actually it's wrapper of Multiplexer
// The Accept is a wrapper for AccetpStream
// The Addr is a wrapper for LocalAddr
// The Accept function is a wrapper from AccetpStream
// The Addr is a wrapper from LocalAddr
net.Listener
// Meta

View File

@@ -79,9 +79,9 @@ func (end *edgeEnd) Receive(ctx context.Context) (geminio.Message, error) {
}
// Multiplexer
func (end *edgeEnd) OpenStream(service string) (geminio.Stream, error) {
func (end *edgeEnd) OpenStream(serviceName string) (geminio.Stream, error) {
opt := options.OpenStream()
opt.SetPeer(service)
opt.SetPeer(serviceName)
return end.End.OpenStream(opt)
}

View File

@@ -1,31 +0,0 @@
package service
type OnEdgeOnline struct {
EdgeID uint64
Meta []byte
Net string
Str string
}
func (online *OnEdgeOnline) Network() string {
return online.Net
}
func (online *OnEdgeOnline) String() string {
return online.Str
}
type OnEdgeOffline struct {
EdgeID uint64
Meta []byte
Net string
Str string
}
func (offline *OnEdgeOffline) Network() string {
return offline.Net
}
func (offline *OnEdgeOffline) String() string {
return offline.Str
}

View File

@@ -5,6 +5,7 @@ import (
"net"
"github.com/singchia/geminio"
"github.com/singchia/geminio/client"
)
// RPCer is edge and it's method oriented
@@ -20,8 +21,8 @@ type RPCer interface {
type Messager interface {
NewMessage(data []byte) geminio.Message
Publish(ctx context.Context, topic string, msg geminio.Message) error
PublishAsync(ctx context.Context, topic string, msg geminio.Message, ch chan *geminio.Publish) (*geminio.Publish, error)
Publish(ctx context.Context, edgeID uint64, msg geminio.Message) error
PublishAsync(ctx context.Context, edgeID uint64, msg geminio.Message, ch chan *geminio.Publish) (*geminio.Publish, error)
Receive(ctx context.Context) (geminio.Message, error)
}
@@ -33,9 +34,9 @@ type RPCMessager interface {
// Stream multiplexer
type Multiplexer interface {
// Open a stream to specific edgeID
OpenStream(edgeID uint64) (geminio.Stream, error)
OpenStream(ctx context.Context, edgeID uint64) (geminio.Stream, error)
AcceptStream() (geminio.Stream, error)
ListStream() []geminio.Stream
ListStreams() []geminio.Stream
}
// controller functions
@@ -46,7 +47,7 @@ type EdgeOffline func(edgeID uint64, meta []byte, addr net.Addr) error
type ControlRegister interface {
RegisterGetEdgeID(ctx context.Context, getEdgeID GetEdgeID) error
RegisterEdgeOnline(ctx context.Context, edgeOnline EdgeOnline) error
RegisterEdgeOnffline(ctx context.Context, edgeOffline EdgeOffline) error
RegisterEdgeOffline(ctx context.Context, edgeOffline EdgeOffline) error
}
type Service interface {
@@ -70,6 +71,6 @@ type Service interface {
type Dialer func() (net.Conn, error)
// the service field specific the role for this Service, and then Edge can OpenStream to this service
func NewService(dialer Dialer, service string, opts ...ServiceOption) (Service, error) {
return nil, nil
func NewService(dialer Dialer, opts ...ServiceOption) (Service, error) {
return newServiceEnd(client.Dialer(dialer), opts...)
}

View File

@@ -4,9 +4,12 @@ import (
"context"
"encoding/binary"
"encoding/json"
"strconv"
"github.com/singchia/frontier/pkg/proto"
"github.com/singchia/geminio"
"github.com/singchia/geminio/client"
"github.com/singchia/geminio/options"
)
type serviceEnd struct {
@@ -28,6 +31,24 @@ func newServiceEnd(dialer client.Dialer, opts ...ServiceOption) (*serviceEnd, er
if sopt.logger != nil {
sopts.SetLog(sopt.logger)
}
// meta
meta := &proto.Meta{}
if sopt.topics != nil {
// we deliver topics in meta
meta.Topics = sopt.topics
}
if sopt.service != "" {
meta.Service = sopt.service
}
data, err := json.Marshal(meta)
if err != nil {
return nil, err
}
sopts.SetMeta(data)
// delegate
if sopt.delegate != nil {
sopts.SetDelegate(sopt.delegate)
}
// new geminio end
end, err := client.NewRetryEndWithDialer(dialer, sopts)
if err != nil {
@@ -53,7 +74,7 @@ func (service *serviceEnd) RegisterGetEdgeID(ctx context.Context, getEdgeID GetE
func (service *serviceEnd) RegisterEdgeOnline(ctx context.Context, edgeOnline EdgeOnline) error {
return service.End.Register(ctx, "edge_online", func(ctx context.Context, req geminio.Request, rsp geminio.Response) {
on := &OnEdgeOnline{}
on := &proto.OnEdgeOnline{}
err := json.Unmarshal(req.Data(), on)
if err != nil {
// shouldn't be here
@@ -71,7 +92,7 @@ func (service *serviceEnd) RegisterEdgeOnline(ctx context.Context, edgeOnline Ed
func (service *serviceEnd) RegisterEdgeOffline(ctx context.Context, edgeOffline EdgeOffline) error {
return service.End.Register(ctx, "edge_offline", func(ctx context.Context, req geminio.Request, rsp geminio.Response) {
off := &OnEdgeOffline{}
off := &proto.OnEdgeOffline{}
err := json.Unmarshal(req.Data(), off)
if err != nil {
// shouldn't be here
@@ -149,3 +170,80 @@ func (service *serviceEnd) Register(ctx context.Context, method string, rpc gemi
}
return service.End.Register(ctx, method, wrap)
}
// Messager
func (service *serviceEnd) NewMessage(data []byte) geminio.Message {
return service.End.NewMessage(data)
}
func (service *serviceEnd) Publish(ctx context.Context, edgeID uint64, msg geminio.Message) error {
tail := make([]byte, 8)
binary.BigEndian.PutUint64(tail, edgeID)
custom := msg.Custom()
if custom == nil {
custom = tail
} else {
custom = append(custom, tail...)
}
msg.SetCustom(custom)
// publish real end
err := service.End.Publish(ctx, msg)
msg.SetClientID(edgeID)
// TODO we need to set EdgeID to let user know
return err
}
func (service *serviceEnd) PublishAsync(ctx context.Context, edgeID uint64, msg geminio.Message, ch chan *geminio.Publish) (*geminio.Publish, error) {
tail := make([]byte, 8)
binary.BigEndian.PutUint64(tail, edgeID)
custom := msg.Custom()
if custom == nil {
custom = tail
} else {
custom = append(custom, tail...)
}
msg.SetCustom(custom)
// publish async
pub, err := service.End.PublishAsync(ctx, msg, ch)
// TODO we need to set EdgeID to let user know
return pub, err
}
func (service *serviceEnd) Receive(ctx context.Context) (geminio.Message, error) {
msg, err := service.End.Receive(ctx)
if err != nil {
return nil, err
}
custom := msg.Custom()
if custom == nil || len(custom) < 8 {
// shoudn't be here
return msg, nil
}
edgeID := binary.BigEndian.Uint64(custom[len(custom)-8:])
custom = custom[:custom[len(custom)-8]]
msg.SetClientID(edgeID)
msg.SetCustom(custom)
return msg, nil
}
// Multiplexer
func (service *serviceEnd) OpenStream(ctx context.Context, edgeID uint64) (geminio.Stream, error) {
id := strconv.FormatUint(edgeID, 10)
opt := options.OpenStream()
opt.SetPeer(id)
return service.End.OpenStream(opt)
}
func (service *serviceEnd) AcceptStream() (geminio.Stream, error) {
return service.End.AcceptStream()
}
func (service *serviceEnd) ListStreams() []geminio.Stream {
return service.End.ListStreams()
}
func (service *serviceEnd) Close() error {
return service.End.Close()
}

View File

@@ -3,15 +3,24 @@ package service
import (
"github.com/jumboframes/armorigo/log"
"github.com/singchia/geminio/delegate"
"github.com/singchia/go-timer/v2"
)
type Logger log.Logger
type Timer timer.Timer
type Delegate delegate.ClientDelegate
type serviceOption struct {
logger Logger
tmr Timer
// to tell frontier which topics to receive, default no receiving
topics []string
// to tell frontier what service we are
service string
// delegate to know online offline stuff
delegate Delegate
}
type ServiceOption func(*serviceOption)
@@ -28,3 +37,22 @@ func OptionServiceTimer(tmr Timer) ServiceOption {
opt.tmr = tmr
}
}
func OptionServiceReceiveTopics(topics []string) ServiceOption {
return func(opt *serviceOption) {
opt.topics = topics
}
}
func OptionServiceName(service string) ServiceOption {
return func(opt *serviceOption) {
opt.service = service
}
}
// delegations for the service own connection, streams and more
func OptionServiceDelegate(delegate Delegate) ServiceOption {
return func(opt *serviceOption) {
opt.delegate = delegate
}
}