mq: add service type mq

This commit is contained in:
singchia
2024-02-18 13:51:45 +08:00
parent 94483c254f
commit 16cbb7780b
8 changed files with 236 additions and 14 deletions

View File

@@ -4,4 +4,5 @@ import "errors"
var ( var (
ErrEdgeNotOnline = errors.New("edge not online") ErrEdgeNotOnline = errors.New("edge not online")
ErrTopicNotOnline = errors.New("topic not online")
) )

View File

@@ -47,9 +47,21 @@ type ServiceInformer interface {
ServiceHeartbeat(serviceID uint64, service string, addr net.Addr) ServiceHeartbeat(serviceID uint64, service string, addr net.Addr)
} }
// mq related // mq manager and mq related
type MQM interface {
// MQM is a MQ wrapper
MQ
AddMQ(topics []string, mq MQ)
AddMQByEnd(topics []string, end geminio.End)
DelMQ(mq MQ)
DelMQByEnd(end geminio.End)
GetMQ(topic string) MQ
GetMQs(topic string) []MQ
}
type MQ interface { type MQ interface {
Produce(topic string, data []byte, opts ...OptionProduce) error Produce(topic string, data []byte, opts ...OptionProduce) error
Close() error
} }
type ProduceOption struct { type ProduceOption struct {

View File

@@ -66,6 +66,14 @@ type Servicebound struct {
Listen Listen `yaml:"listen"` Listen Listen `yaml:"listen"`
} }
// message queue
type MQ struct {
BroadCast bool `yaml:"broadcast"`
}
// exchange
type Exchange struct{}
type Log struct { type Log struct {
LogDir string `yaml:"log_dir"` LogDir string `yaml:"log_dir"`
LogFile string `yaml:"log_file"` LogFile string `yaml:"log_file"`

View File

@@ -7,5 +7,5 @@ import (
type exchange struct { type exchange struct {
Edgebound api.Edgebound Edgebound api.Edgebound
Servicebound api.Servicebound Servicebound api.Servicebound
MQ api.MQ MQM api.MQM
} }

View File

@@ -162,7 +162,7 @@ func (ex *exchange) forwardMessageToService(end geminio.End) {
} }
topic := msg.Topic() topic := msg.Topic()
// TODO seperate async and sync produce // TODO seperate async and sync produce
err = ex.MQ.Produce(topic, msg.Data(), api.WithOrigin(msg), api.WithEdgeID(edgeID)) err = ex.MQM.Produce(topic, msg.Data(), api.WithOrigin(msg), api.WithEdgeID(edgeID))
if err != nil { if err != nil {
klog.Errorf("forward message, produce err: %s, edgeID: %d", err, edgeID) klog.Errorf("forward message, produce err: %s, edgeID: %d", err, edgeID)
msg.Error(err) msg.Error(err)

View File

@@ -2,27 +2,171 @@ package mq
import ( import (
"sync" "sync"
"sync/atomic"
"github.com/singchia/frontier/pkg/api" "github.com/singchia/frontier/pkg/api"
"github.com/singchia/frontier/pkg/config" "github.com/singchia/frontier/pkg/config"
"github.com/singchia/geminio"
"k8s.io/klog/v2"
) )
type mq interface {
api.MQ
Close() error
}
type mqManager struct { type mqManager struct {
mtx sync.RWMutex
conf *config.Configuration conf *config.Configuration
// mqs // mqs
mqs map[string][]mq mtx sync.RWMutex
mqs map[string][]api.MQ // key: topic, value: mqs
mqindex map[string]*uint64 // for round robin
} }
func newMQManager(conf *config.Configuration) (*mqManager, error) { func newMQManager(conf *config.Configuration) (api.MQM, error) {
mqm := &mqManager{ mqm := &mqManager{
mqs: make(map[string][]mq), mqs: make(map[string][]api.MQ),
conf: conf, conf: conf,
} }
return mqm, nil return mqm, nil
} }
func (mqm *mqManager) AddMQ(topics []string, mq api.MQ) {
mqm.mtx.Lock()
defer mqm.mtx.Unlock()
for _, topic := range topics {
mqs, ok := mqm.mqs[topic]
if !ok {
klog.V(6).Infof("mq manager, add topic: %s mq succeed", topic)
mqm.mqs[topic] = []api.MQ{mq}
mqm.mqindex[topic] = new(uint64)
continue
}
for _, exist := range mqs {
if exist == mq {
klog.V(5).Infof("mq manager, add topic: %s mq existed", topic)
continue
}
// special handle for service, a deep comparison
left, ok := exist.(*mqService)
if ok {
right, ok := mq.(*mqService)
if ok && left.end == right.end {
klog.V(5).Infof("mq manager, add topic: %s service mq existed", topic)
continue
}
}
}
mqs = append(mqs, mq)
mqm.mqs[topic] = mqs
klog.V(6).Infof("mq mqnager, add topic: %s mq succeed", topic)
}
}
func (mqm *mqManager) AddMQByEnd(topics []string, end geminio.End) {
mq := NewMQServiceFromEnd(end)
mqm.AddMQ(topics, mq)
}
func (mqm *mqManager) DelMQ(mq api.MQ) {
mqm.mtx.Lock()
defer mqm.mtx.Unlock()
for topic, mqs := range mqm.mqs {
news := []api.MQ{}
for _, exist := range mqs {
if exist == mq {
klog.V(6).Infof("mq manager, del topic: %s mq succeed", topic)
continue
}
news = append(news, exist)
}
if len(news) == 0 {
// delete array of this topic
delete(mqm.mqs, topic)
delete(mqm.mqindex, topic)
continue
}
mqm.mqs[topic] = news
}
}
// special handle for service, a deep comparison
func (mqm *mqManager) DelMQByEnd(end geminio.End) {
mqm.mtx.Lock()
defer mqm.mtx.Unlock()
for topic, mqs := range mqm.mqs {
news := []api.MQ{}
for _, exist := range mqs {
left, ok := exist.(*mqService)
if ok {
if ok && left.end == end {
klog.V(6).Infof("mq manager, del topic: %s service mq succeed", topic)
continue
}
}
news = append(news, exist)
}
if len(news) == 0 {
delete(mqm.mqs, topic)
delete(mqm.mqindex, topic)
continue
}
mqm.mqs[topic] = news
}
}
func (mqm *mqManager) GetMQ(topic string) api.MQ {
mqm.mtx.RLock()
defer mqm.mtx.RUnlock()
mqs, ok := mqm.mqs[topic]
if !ok {
return nil
}
index := mqm.mqindex[topic]
newindex := atomic.AddUint64(index, 1)
i := newindex % uint64(len(mqs))
return mqs[i]
}
func (mqm *mqManager) GetMQs(topic string) []api.MQ {
mqm.mtx.RLock()
defer mqm.mtx.RUnlock()
return mqm.mqs[topic]
}
func (mqm *mqManager) Produce(topic string, data []byte, opts ...api.OptionProduce) error {
mq := mqm.GetMQ(topic)
if mq == nil {
mq = mqm.GetMQ("*")
if mq == nil {
err := api.ErrTopicNotOnline
klog.Errorf("mq manager, get mq nil, err: %s", err)
return err
}
}
err := mq.Produce(topic, data, opts...)
if err != nil {
klog.Errorf("mq manager, produce topic: %s message err: %s", topic, err)
return err
}
klog.V(6).Infof("mq manager, produce topic: %s message succeed", topic)
return nil
}
func (mqm *mqManager) Close() error {
mqm.mtx.RLock()
defer mqm.mtx.RUnlock()
var reterr error
for topic, mqs := range mqm.mqs {
for _, mq := range mqs {
err := mq.Close()
if err != nil {
klog.Errorf("mq manager, close mq err: %s, topic: %s", err, topic)
reterr = err
}
}
}
return reterr
}

53
pkg/mq/mq_service.go Normal file
View File

@@ -0,0 +1,53 @@
package mq
import (
"context"
"encoding/binary"
"github.com/singchia/frontier/pkg/api"
"github.com/singchia/geminio"
"github.com/singchia/geminio/options"
"k8s.io/klog/v2"
)
type mqService struct {
end geminio.End
}
func NewMQServiceFromEnd(end geminio.End) api.MQ {
return &mqService{end}
}
func (mq *mqService) Produce(topic string, data []byte, opts ...api.OptionProduce) error {
opt := &api.ProduceOption{}
for _, fun := range opts {
fun(opt)
}
msg := opt.Origin.(geminio.Message)
edgeID := opt.EdgeID
tail := make([]byte, 8)
binary.BigEndian.PutUint64(tail, edgeID)
// we record the edgeID to service
custom := msg.Custom()
if custom == nil {
custom = tail
} else {
custom = append(custom, tail...)
}
// new message
mopt := options.NewMessage()
mopt.SetCustom(custom)
mopt.SetTopic(topic)
newmsg := mq.end.NewMessage(data, mopt)
err := mq.end.Publish(context.TODO(), newmsg)
if err != nil {
klog.Errorf("mq service, publish err: %s, edgeID: %d, serviceID: %d", err, edgeID, mq.end.ClientID())
return err
}
return nil
}
func (mq *mqService) Close() error {
return nil
}

View File

@@ -26,10 +26,12 @@ import (
type serviceManager struct { type serviceManager struct {
*delegate.UnimplementedDelegate *delegate.UnimplementedDelegate
conf *config.Configuration
informer api.ServiceInformer informer api.ServiceInformer
exchange api.Exchange exchange api.Exchange
conf *config.Configuration mqm api.MQM
// serviceID allocator // serviceID allocator
idFactory id.IDFactory idFactory id.IDFactory
shub *synchub.SyncHub shub *synchub.SyncHub
@@ -163,6 +165,8 @@ func (sm *serviceManager) handleConn(conn net.Conn) error {
} }
// register topics claim of end // register topics claim of end
sm.remoteReceiveClaim(end.ClientID(), meta.Topics) sm.remoteReceiveClaim(end.ClientID(), meta.Topics)
// add the end to MQM
sm.mqm.AddMQByEnd(meta.Topics, end)
// handle online event for end // handle online event for end
if err = sm.online(end, meta); err != nil { if err = sm.online(end, meta); err != nil {