add example of interactive command-line messaging

This commit is contained in:
singchia
2024-02-20 20:17:24 +08:00
parent d37c5548d4
commit f027597960
21 changed files with 1057 additions and 63 deletions

View File

@@ -59,7 +59,7 @@ func newServiceEnd(dialer client.Dialer, opts ...ServiceOption) (*serviceEnd, er
// Control Register
func (service *serviceEnd) RegisterGetEdgeID(ctx context.Context, getEdgeID GetEdgeID) error {
return service.End.Register(ctx, "get_edge_id", func(ctx context.Context, req geminio.Request, rsp geminio.Response) {
return service.End.Register(ctx, api.RPCGetEdgeID, func(ctx context.Context, req geminio.Request, rsp geminio.Response) {
id, err := getEdgeID(req.Data())
if err != nil {
// we just deliver the err back
@@ -73,7 +73,7 @@ func (service *serviceEnd) RegisterGetEdgeID(ctx context.Context, getEdgeID GetE
}
func (service *serviceEnd) RegisterEdgeOnline(ctx context.Context, edgeOnline EdgeOnline) error {
return service.End.Register(ctx, "edge_online", func(ctx context.Context, req geminio.Request, rsp geminio.Response) {
return service.End.Register(ctx, api.RPCEdgeOnline, func(ctx context.Context, req geminio.Request, rsp geminio.Response) {
on := &api.OnEdgeOnline{}
err := json.Unmarshal(req.Data(), on)
if err != nil {
@@ -91,7 +91,7 @@ func (service *serviceEnd) RegisterEdgeOnline(ctx context.Context, edgeOnline Ed
}
func (service *serviceEnd) RegisterEdgeOffline(ctx context.Context, edgeOffline EdgeOffline) error {
return service.End.Register(ctx, "edge_offline", func(ctx context.Context, req geminio.Request, rsp geminio.Response) {
return service.End.Register(ctx, api.RPCEdgeOffline, func(ctx context.Context, req geminio.Request, rsp geminio.Response) {
off := &api.OnEdgeOffline{}
err := json.Unmarshal(req.Data(), off)
if err != nil {

View File

@@ -20,24 +20,33 @@ func main() {
klog.Errorf("parse flags err: %s", err)
return
}
klog.Infof("frontier starts")
defer func() {
klog.Infof("frontier ends")
klog.Flush()
}()
// dao
dao, err := dao.NewDao(conf)
if err != nil {
klog.Errorf("new dao err: %s", err)
return
}
klog.V(5).Infof("new dao succeed")
// mqm
mqm, err := mq.NewMQM(conf)
if err != nil {
klog.Errorf("new mq manager err: %s", err)
return
}
klog.V(5).Infof("new mq manager succeed")
// exchange
exchange, err := exchange.NewExchange(conf)
if err != nil {
klog.Errorf("new exchange err: %s", err)
return
}
klog.V(5).Infof("new exchange succeed")
tmr := timer.NewTimer()
// servicebound
@@ -46,7 +55,8 @@ func main() {
klog.Errorf("new servicebound err: %s", err)
return
}
servicebound.Serve()
go servicebound.Serve()
klog.V(5).Infof("new servicebound succeed")
// edgebound
edgebound, err := edgebound.NewEdgebound(conf, dao, nil, exchange, tmr)
@@ -54,10 +64,12 @@ func main() {
klog.Errorf("new edgebound err: %s", err)
return
}
edgebound.Serve()
go edgebound.Serve()
klog.V(5).Infof("new edgebound succeed")
sig := sigaction.NewSignal()
sig.Wait(context.TODO())
edgebound.Close()
servicebound.Close()
}

9
examples/Makefile Normal file
View File

@@ -0,0 +1,9 @@
GOHOSTOS?=$(shell go env GOHOSTOS)
GOARCH?=$(shell go env GOARCH)
.PHONY: all
all: iclm
.PHONY: iclm
iclm:
make -C iclm

17
examples/iclm/Makefile Normal file
View File

@@ -0,0 +1,17 @@
GOHOSTOS?=$(shell go env GOHOSTOS)
GOARCH?=$(shell go env GOARCH)
.PHONY: all
all: iclm_service iclm_edge
.PHONY: clean
clean:
rm iclm_service iclm_edge
iclm_service: service/*.go
CGO_ENABLED=0 GOOS=$(GOHOSTOS) GOARCH=$(GOARCH) \
go build -trimpath -ldflags "-s -w" -o iclm_service service/*.go
iclm_edge: edge/*.go
CGO_ENABLED=0 GOOS=$(GOHOSTOS) GOARCH=$(GOARCH) \
go build -trimpath -ldflags "-s -w" -o iclm_edge edge/*.go

2
examples/iclm/README.md Normal file
View File

@@ -0,0 +1,2 @@
## Iteractive command-line messaging

333
examples/iclm/edge/edge.go Normal file
View File

@@ -0,0 +1,333 @@
package main
import (
"bufio"
"context"
"encoding/json"
"fmt"
"io"
"net"
"os"
"strconv"
"strings"
"sync"
"time"
"github.com/singchia/frontier/api/v1/edge"
"github.com/spf13/pflag"
armlog "github.com/jumboframes/armorigo/log"
"github.com/singchia/geminio"
)
var (
sns sync.Map
methodSlice []string
)
type LabelData struct {
Label string `json:"label"`
Data []byte `json:"data"`
}
func main() {
methodSlice = []string{}
network := pflag.String("network", "tcp", "network to dial")
address := pflag.String("address", "127.0.0.1:4004", "address to dial")
loglevel := pflag.String("loglevel", "info", "log level, trace debug info warn error")
meta := pflag.String("meta", "test", "meta to set on connection")
methods := pflag.String("methods", "", "method name, support echo, calculate")
label := pflag.String("label", "label-01", "label to message or rpc")
pflag.Parse()
dialer := func() (net.Conn, error) {
return net.Dial(*network, *address)
}
// log
level, err := armlog.ParseLevel(*loglevel)
if err != nil {
fmt.Println("parse log level err:", err)
return
}
armlog.SetLevel(level)
armlog.SetOutput(os.Stdout)
// get edge
cli, err := edge.NewEdge(dialer,
edge.OptionEdgeLog(armlog.DefaultLog), edge.OptionEdgeMeta([]byte(*meta)))
if err != nil {
armlog.Info("new edge err:", err)
return
}
//sms := cli.ListStreams()
//sns.Store("1", sms[0])
if *methods != "" {
methodSlice = strings.Split(*methods, ",")
}
// receive on edge
go func() {
for {
msg, err := cli.Receive(context.TODO())
if err == io.EOF {
return
}
if err != nil {
fmt.Println("> receive err:", err)
fmt.Println(">>> ")
continue
}
msg.Done()
fmt.Println("> receive msg:", msg.ClientID(), msg.StreamID(), string(msg.Data()))
fmt.Print(">>> ")
}
}()
go func() {
for {
st, err := cli.AcceptStream()
if err == io.EOF {
return
} else if err != nil {
fmt.Println("> accept stream err:", err)
fmt.Print(">>> ")
continue
}
fmt.Println("> accept stream", st.StreamID())
fmt.Print(">>> ")
sns.Store(strconv.FormatUint(st.StreamID(), 10), st)
go handleStream(st)
}
}()
go func() {
time.Sleep(200 * time.Millisecond)
// register
for _, method := range methodSlice {
switch method {
case "echo":
err = cli.Register(context.TODO(), "echo", echo)
if err != nil {
armlog.Info("> register echo err:", err)
return
}
}
}
}()
cursor := "1"
fmt.Print(">>> ")
// the command-line protocol
// 1. close
// 2. quit
// 3. switch {streamID}
// 4. open {service}
// 5. close {streamID}
// 6. publish {msg} #note to switch to stream first
// 7. publish {topic} {msg}
// 8. call {method} {req}
scanner := bufio.NewScanner(os.Stdin)
for scanner.Scan() {
text := scanner.Text()
parts := strings.Split(text, " ")
switch len(parts) {
case 1:
// 1. close
// 2. quit
if parts[0] == "help" {
fmt.Println(`the cli protocol
1. close
2. quit
3. open {service}
4. close {streamID}
5. switch {streamID}
6. publish {msg} #note to switch to stream first
7. publish {topic} {msg}
8. call {method} {req}`)
goto NEXT
}
if parts[0] == "quit" || parts[0] == "close" {
cli.Close()
goto END
}
case 2:
// 1. open {service}
// 2. close {streamID}
// 3. switch {streamID}
// 4. publish {msg}
if parts[0] == "open" {
service := parts[1]
st, err := cli.OpenStream(service)
if err != nil {
fmt.Println("> open stream err:", err)
goto NEXT
}
fmt.Println("> open stream success:", st.StreamID())
sns.Store(strconv.FormatUint(st.StreamID(), 10), st)
go handleStream(st)
goto NEXT
}
if parts[0] == "close" {
// close sessionID
session := parts[1]
sn, ok := sns.LoadAndDelete(session)
if !ok {
fmt.Printf("> stream id: %s not found\n", session)
goto NEXT
}
sn.(geminio.Stream).Close()
fmt.Println("> close stream success:", session)
goto NEXT
}
if parts[0] == "switch" {
session := parts[1]
if session == "1" {
cursor = session
fmt.Println("> swith stream success:", session)
goto NEXT
}
_, ok := sns.Load(session)
if !ok {
fmt.Println("> swith stream failed, not found:", session)
goto NEXT
}
cursor = session
fmt.Println("> swith stream success:", session)
goto NEXT
}
if cursor != "1" && (parts[0] == "publish") {
sn, ok := sns.Load(cursor)
if !ok {
fmt.Printf("> stream: %s not found\n", cursor)
goto NEXT
}
if parts[0] == "publish" {
ld := &LabelData{
Label: *label,
Data: []byte(parts[1]),
}
data, _ := json.Marshal(ld)
msg := cli.NewMessage(data)
err := sn.(geminio.Stream).Publish(context.TODO(), msg)
if err != nil {
fmt.Println("> publish err:", err)
goto NEXT
}
fmt.Println("> publish success")
goto NEXT
}
}
case 3:
// 1. publish {topic} {msg}
// 2. call {method} {req}
if cursor != "1" {
// in stream
sn, ok := sns.Load(cursor)
if !ok {
fmt.Printf("> stream: %s not found\n", cursor)
goto NEXT
}
if parts[0] == "call" {
req := cli.NewRequest([]byte(parts[2]))
rsp, err := sn.(geminio.Stream).Call(context.TODO(), string(parts[1]), req)
if err != nil {
fmt.Println("> call err:", err)
goto NEXT
}
fmt.Println("> call success, ret:", string(rsp.Data()))
goto NEXT
}
}
if parts[0] == "publish" {
ld := &LabelData{
Label: *label,
Data: []byte(parts[2]),
}
data, _ := json.Marshal(ld)
msg := cli.NewMessage(data)
err := cli.Publish(context.TODO(), string(parts[1]), msg)
if err != nil {
fmt.Println("> publish err:", err)
goto NEXT
}
fmt.Println("> publish success")
goto NEXT
}
if parts[0] == "call" {
ld := &LabelData{
Label: *label,
Data: []byte(parts[2]),
}
data, _ := json.Marshal(ld)
req := cli.NewRequest(data)
rsp, err := cli.Call(context.TODO(), string(parts[1]), req)
if err != nil {
fmt.Println("> call err:", err)
goto NEXT
}
fmt.Println("> call success, ret:", string(rsp.Data()))
goto NEXT
}
}
fmt.Println("> illegal operation")
NEXT:
if cursor != "1" {
fmt.Printf("[%20s] >>> ", cursor)
} else {
fmt.Print(">>> ")
}
}
END:
time.Sleep(time.Second)
}
func handleStream(stream geminio.Stream) {
go func() {
for {
msg, err := stream.Receive(context.TODO())
if err != nil {
fmt.Println("> receive err:", err)
fmt.Print(">>> ")
return
}
msg.Done()
fmt.Println("> receive msg:", msg.ClientID(), msg.StreamID(), string(msg.Data()))
fmt.Print(">>> ")
}
}()
go func() {
for {
data := make([]byte, 1024)
_, err := stream.Read(data)
if err != nil {
fmt.Println("> read err:", err)
fmt.Print(">>> ")
return
}
fmt.Println("> read data:", stream.ClientID(),
string(data))
fmt.Print(">>> ")
}
}()
go func() {
time.Sleep(200 * time.Millisecond)
for _, method := range methodSlice {
switch method {
case "echo":
err := stream.Register(context.TODO(), "echo", echo)
if err != nil {
armlog.Info("> register echo err:", err)
return
}
}
}
}()
}
func echo(ctx context.Context, req geminio.Request, rsp geminio.Response) {
fmt.Println("\ncall > ", req.ClientID(), req.StreamID(), string(req.Data()))
fmt.Print(">>> ")
rsp.SetData(req.Data())
}

View File

@@ -0,0 +1,469 @@
package main
import (
"bufio"
"context"
"encoding/binary"
"encoding/json"
"fmt"
"io"
"log"
"net"
"net/http"
_ "net/http/pprof"
"os"
"runtime"
"strconv"
"strings"
"sync"
"time"
armlog "github.com/jumboframes/armorigo/log"
"github.com/singchia/frontier/api/v1/service"
"github.com/singchia/geminio"
"github.com/spf13/pflag"
)
var (
edgeID uint64
edgeIDs sync.Map
edges sync.Map
sns sync.Map
methodSlice []string
topicSlice []string
printmessage *bool
srv service.Service
labels map[string]int64 = map[string]int64{}
labelsMtx sync.RWMutex
)
func addLabel(label string, delta int64) {
labelsMtx.Lock()
counter, ok := labels[label]
if ok {
counter += delta
labels[label] = counter
} else {
labels[label] = delta
}
labelsMtx.Unlock()
}
func printLabel() {
labelsMtx.RLock()
defer labelsMtx.RUnlock()
for label, counter := range labels {
fmt.Printf("label: %s, counter: %d\n", label, counter)
}
}
type LabelData struct {
Label string `json:"label"`
Data []byte `json:"data"`
}
func main() {
methodSlice = []string{}
runtime.SetCPUProfileRate(10000)
go func() {
http.ListenAndServe("0.0.0.0:6062", nil)
}()
network := pflag.String("network", "tcp", "network to dial")
address := pflag.String("address", "127.0.0.1:4003", "address to dial")
loglevel := pflag.String("loglevel", "info", "log level, trace debug info warn error")
serviceName := pflag.String("service", "foo", "service name")
topics := pflag.String("topics", "", "topics to receive message, empty means without consuming")
methods := pflag.String("methods", "", "method name, support echo, calculate")
printmessage = pflag.Bool("printmessage", true, "whether print message out")
stats := pflag.Bool("stats", false, "print statistics or not")
pflag.Parse()
dialer := func() (net.Conn, error) {
return net.Dial(*network, *address)
}
// log
level, err := armlog.ParseLevel(*loglevel)
if err != nil {
fmt.Println("parse log level err:", err)
return
}
armlog.SetLevel(level)
armlog.SetOutput(os.Stdout)
// get service
opt := []service.ServiceOption{service.OptionServiceLog(armlog.DefaultLog), service.OptionServiceName(*serviceName)}
if *topics != "" {
topicSlice = strings.Split(*topics, ",")
opt = append(opt, service.OptionServiceReceiveTopics(topicSlice))
}
srv, err = service.NewService(dialer, opt...)
if err != nil {
log.Println("new end err:", err)
return
}
// pre register methods
if *methods != "" {
methodSlice = strings.Split(*methods, ",")
for _, method := range methodSlice {
switch method {
case "echo":
err = srv.Register(context.TODO(), "echo", echo)
if err != nil {
log.Println("> register echo err:", err)
return
}
}
}
}
// pre register functions for edges events
err = srv.RegisterGetEdgeID(context.TODO(), getID)
if err != nil {
log.Println("> end register getID err:", err)
return
}
err = srv.RegisterEdgeOnline(context.TODO(), online)
if err != nil {
log.Println("> end register online err:", err)
return
}
err = srv.RegisterEdgeOffline(context.TODO(), offline)
if err != nil {
log.Println("> end register offline err:", err)
return
}
// label counter
if *stats {
go func() {
ticker := time.NewTicker(time.Second)
for {
<-ticker.C
printLabel()
}
}()
}
// service receive
go func() {
for {
msg, err := srv.Receive(context.TODO())
if err == io.EOF {
return
}
if err != nil {
fmt.Println("> receive err:", err)
fmt.Print(">>> ")
continue
}
msg.Done()
value := msg.Data()
ld := &LabelData{}
err = json.Unmarshal(value, ld)
if err == nil {
addLabel(string(ld.Label), 1)
value = ld.Data
}
if *printmessage {
fmt.Printf("> receive msg, edgeID: %d streamID: %d data: %s\n", msg.ClientID(), msg.StreamID(), string(value))
fmt.Print(">>> ")
}
}
}()
// service accept stream
go func() {
for {
st, err := srv.AcceptStream()
if err == io.EOF {
return
} else if err != nil {
fmt.Println("> accept stream err:", err)
continue
}
fmt.Println("> accept stream", st.ClientID(), st.StreamID())
sns.Store(strconv.FormatUint(st.StreamID(), 10), st)
go handleStream(st)
}
}()
cursor := "1"
fmt.Print(">>> ")
// the command-line protocol
// 1. close
// 2. quit
// 3. open {edgeID}
// 4. close {streamID}
// 5. switch {streamID}
// 6. publish {msg} #note to switch to stream first
// 7. publish {edgeID} {msg}
// 8. call {method} {req} #note to switch to stream first
// 9. call {edgeID} {method} {req}
scanner := bufio.NewScanner(os.Stdin)
for scanner.Scan() {
text := scanner.Text()
parts := strings.Split(text, " ")
switch len(parts) {
case 1:
if parts[0] == "help" {
fmt.Println(`the command-line protocol
1. close
2. quit
3. open {clientID}
4. close {streamID}
5. switch {streamID}
6. publish {msg} #note to switch to stream first
7. publish {clientId} {msg}
8. call {method} {req} #note to switch to stream first
9. call {clientId} {method} {req}`)
goto NEXT
}
// 1. close
if parts[0] == "close" || parts[0] == "quit" {
srv.Close()
goto END
}
if parts[0] == "count" {
count := 0
edges.Range(func(key, value interface{}) bool {
count++
return true
})
fmt.Println("> count:", count)
goto NEXT
}
case 2:
// 1. open {edgeID}
// 2. close {streamID}
// 3. switch {streamID}
// 4. publish {msg}
if parts[0] == "open" {
edgeID, err := strconv.ParseUint(parts[1], 10, 64)
if err != nil {
fmt.Println("> illegal edgeID", err, parts[1])
goto NEXT
}
// 1. open edgeID
st, err := srv.OpenStream(context.TODO(), edgeID)
if err != nil {
fmt.Println("> open stream err", err)
goto NEXT
}
fmt.Println("> open stream success:", edgeID, st.StreamID())
sns.Store(strconv.FormatUint(st.StreamID(), 10), st)
go handleStream(st)
goto NEXT
}
if parts[0] == "close" {
stream := parts[1]
sn, ok := sns.LoadAndDelete(stream)
if !ok {
fmt.Printf("> stream id: %s not found\n", stream)
goto NEXT
}
sn.(geminio.Stream).Close()
fmt.Println("> close stream success:", stream)
goto NEXT
}
if parts[0] == "switch" {
session := parts[1]
if session == "1" {
cursor = session
fmt.Println("> swith stream success:", session)
goto NEXT
}
_, ok := sns.Load(session)
if !ok {
fmt.Println("> swith stream failed, not found:", session)
goto NEXT
}
cursor = session
fmt.Println("> swith stream success:", session)
goto NEXT
}
if cursor != "1" && (parts[0] == "publish") {
sn, ok := sns.Load(cursor)
if !ok {
fmt.Printf("> stream: %s not found\n", cursor)
goto NEXT
}
if parts[0] == "publish" {
msg := srv.NewMessage([]byte(parts[1]))
err := sn.(geminio.Stream).Publish(context.TODO(), msg)
if err != nil {
fmt.Println("> publish err:", err)
goto NEXT
}
fmt.Println("> publish success")
goto NEXT
}
}
case 3:
// 1. publish {edgeID} {msg}
// 2. call {method} {req} if switch to stream
if cursor != "1" {
// in stream
sn, ok := sns.Load(cursor)
if !ok {
fmt.Printf("> stream: %s not found\n", cursor)
goto NEXT
}
if parts[0] == "call" {
req := srv.NewRequest([]byte(parts[2]))
rsp, err := sn.(geminio.Stream).Call(context.TODO(), string(parts[1]), req)
if err != nil {
fmt.Println("> call err:", err)
goto NEXT
}
fmt.Println("> call success, ret:", string(rsp.Data()))
goto NEXT
}
}
if parts[0] == "publish" {
edgeID, err := strconv.ParseUint(parts[1], 10, 64)
if err != nil {
log.Println("> illegal edge id", err, parts[1])
goto NEXT
}
msg := srv.NewMessage([]byte(parts[2]))
err = srv.Publish(context.TODO(), edgeID, msg)
if err != nil {
log.Println("> publish err:", err)
goto NEXT
}
fmt.Println("> publish success")
goto NEXT
}
case 4:
// call {edgeID} {method} {req}
if parts[0] == "call" {
edgeID, err := strconv.ParseUint(parts[1], 10, 64)
if err != nil {
log.Println("> illegal edge id", err, parts[1])
goto NEXT
}
req := srv.NewRequest([]byte(parts[3]))
rsp, err := srv.Call(context.TODO(), edgeID, parts[2], req)
if err != nil {
log.Println("> call err:", err)
goto NEXT
}
log.Println("> call success, ret:", string(rsp.Data()))
goto NEXT
}
}
log.Println("illegal operation")
NEXT:
if cursor != "1" {
fmt.Printf("[%20s] >>> ", cursor)
} else {
fmt.Print(">>> ")
}
}
END:
time.Sleep(10 * time.Second)
}
func handleStream(stream geminio.Stream) {
go func() {
for {
msg, err := stream.Receive(context.TODO())
if err != nil {
log.Println("> receive err:", err)
return
}
msg.Done()
value := msg.Data()
ld := &LabelData{}
err = json.Unmarshal(value, ld)
if err == nil {
addLabel(string(ld.Label), 1)
value = ld.Data
}
if *printmessage {
edgeID := binary.BigEndian.Uint64(msg.Custom())
fmt.Println("> receive msg:", edgeID, msg.StreamID(), string(value))
fmt.Print(">>> ")
}
}
}()
go func() {
for {
data := make([]byte, 1024)
_, err := stream.Read(data)
if err != nil {
log.Println("> read err:", err)
return
}
fmt.Println("> read data:", stream.ClientID(),
string(data))
fmt.Print(">>> ")
}
}()
go func() {
time.Sleep(200 * time.Millisecond)
for _, method := range methodSlice {
switch method {
case "echo":
err := stream.Register(context.TODO(), "echo", echo)
if err != nil {
log.Println("> register echo err:", err)
return
}
}
}
}()
}
func snID(edgeID uint64, streamID uint64) string {
return strconv.FormatUint(edgeID, 10) + "-" + strconv.FormatUint(streamID, 10)
}
func pickedge() uint64 {
var edgeID uint64
edges.Range(func(key, value interface{}) bool {
// TODO 先返回第一个
edgeID = key.(uint64)
return false
})
return edgeID
}
func getID(meta []byte) (uint64, error) {
id := uint64(time.Now().UnixMicro())
//id := atomic.AddUint64(&edgeID, 1)
edgeIDs.Store(string(meta), id)
return id, nil
}
func online(edgeID uint64, meta []byte, addr net.Addr) error {
fmt.Printf("\n> online, edgeID: %d, addr: %s\n", edgeID, addr.String())
fmt.Print(">>> ")
edges.Store(edgeID, struct{}{})
return nil
}
func offline(edgeID uint64, meta []byte, addr net.Addr) error {
fmt.Printf("\n> offline, edgeID: %d, addr: %s\n", edgeID, addr.String())
fmt.Print(">>> ")
edges.Delete(edgeID)
return nil
}
func echo(ctx context.Context, req geminio.Request, rsp geminio.Response) {
value := req.Data()
ld := &LabelData{}
err := json.Unmarshal(value, ld)
if err == nil {
addLabel(string(ld.Label), 1)
value = ld.Data
}
fmt.Println("\ncall > ", req.ClientID(), req.StreamID(), string(value))
fmt.Print(">>> ")
rsp.SetData(value)
}

4
go.mod
View File

@@ -2,6 +2,10 @@ module github.com/singchia/frontier
go 1.20
replace (
github.com/singchia/geminio => ../../moresec/singchia/geminio
)
require (
github.com/jumboframes/armorigo v0.3.0
github.com/singchia/geminio v1.1.2

View File

@@ -1,8 +1,20 @@
package api
import "errors"
import (
"errors"
"gorm.io/gorm"
)
var (
ErrEdgeNotOnline = errors.New("edge not online")
ErrServiceNotOnline = errors.New("service not online")
ErrRPCNotOnline = errors.New("rpc not online")
ErrTopicNotOnline = errors.New("topic not online")
ErrIllegalEdgeID = errors.New("illegal edgeID")
ErrRecordNotFound = gorm.ErrRecordNotFound
)
var (
ErrStrUseOfClosedConnection = "use of closed network connection"
)

View File

@@ -7,10 +7,16 @@ import (
)
type Exchange interface {
// For Service
// rpc, message and raw io to edge
ForwardToEdge(*Meta, geminio.End)
// stream to edge
// TODO StreamToEdge(geminio.Stream)
// For Edge
GetEdgeID(meta []byte) (uint64, error) // get EdgeID for edge
EdgeOnline(edgeID uint64, meta []byte, addr net.Addr) error
EdgeOffline(edgeID uint64, meta []byte, addr net.Addr) error
// rpc, message and raw io to service
ForwardToService(geminio.End)
// stream to service
@@ -33,12 +39,6 @@ type Edgebound interface {
Close() error
}
type EdgeInformer interface {
EdgeOnline(edgeID uint64, meta []byte, addr net.Addr)
EdgeOffline(edgeID uint64, meta []byte, addr net.Addr)
EdgeHeartbeat(edgeID uint64, meta []byte, addr net.Addr)
}
// service related
type Servicebound interface {
ListService() []geminio.End
@@ -52,6 +52,12 @@ type Servicebound interface {
Close() error
}
// informer
type EdgeInformer interface {
EdgeOnline(edgeID uint64, meta []byte, addr net.Addr)
EdgeOffline(edgeID uint64, meta []byte, addr net.Addr)
EdgeHeartbeat(edgeID uint64, meta []byte, addr net.Addr)
}
type ServiceInformer interface {
ServiceOnline(serviceID uint64, service string, addr net.Addr)
ServiceOffline(serviceID uint64, service string, addr net.Addr)

View File

@@ -1,6 +1,13 @@
package api
// frontier -> service
// global rpcs
var (
RPCGetEdgeID = "get_edge_id"
RPCEdgeOnline = "edge_online"
RPCEdgeOffline = "edge_offline"
)
type OnEdgeOnline struct {
EdgeID uint64
Meta []byte
@@ -33,11 +40,7 @@ func (offline *OnEdgeOffline) String() string {
}
// service -> frontier
type ReceiveClaim struct {
Topics []string
}
// service -> frontier
// meta carried when service inited
type Meta struct {
Service string
Topics []string

View File

@@ -2,11 +2,12 @@ package config
import (
"flag"
"fmt"
"io"
"os"
"strconv"
armio "github.com/jumboframes/armorigo/io"
"github.com/jumboframes/armorigo/log"
"github.com/spf13/pflag"
"gopkg.in/yaml.v2"
"k8s.io/klog/v2"
@@ -102,38 +103,16 @@ type Configuration struct {
func Parse() (*Configuration, error) {
var (
argConfigFile = pflag.String("config", "", "config file, default not configured")
argArmorigoLogLevel = pflag.String("loglevel", "info", "log level for armorigo log")
argDaemonRLimitNofile = pflag.Int("daemon-rlimit-nofile", -1, "SetRLimit for number of file of this daemon, default: -1 means ignore")
config *Configuration
)
pflag.Lookup("daemon-rlimit-nofile").NoOptDefVal = "1048576"
// config file
if *argConfigFile != "" {
data, err := os.ReadFile(*argConfigFile)
if err != nil {
return nil, err
}
config = &Configuration{}
if err = yaml.Unmarshal(data, config); err != nil {
return nil, err
}
}
// set klog
klogFlags := flag.NewFlagSet("klog", flag.ExitOnError)
klog.InitFlags(klogFlags)
klogFlags.Set("log_dir", config.Log.LogDir)
klogFlags.Set("log_file", config.Log.LogFile)
klogFlags.Set("log_file_max_file", strconv.FormatUint(config.Log.LogFileMaxSizeMB, 10))
klogFlags.Set("logtostderr", strconv.FormatBool(config.Log.ToStderr))
klogFlags.Set("alsologtostderr", strconv.FormatBool(config.Log.AlsoToStderr))
klogFlags.Set("verbosity", strconv.FormatInt(int64(config.Log.Verbosity), 10))
klogFlags.Set("add_dir_header", strconv.FormatBool(config.Log.AddDirHeader))
klogFlags.Set("skip_headers", strconv.FormatBool(config.Log.SkipHeaders))
klogFlags.Set("one_output", strconv.FormatBool(config.Log.OneOutput))
klogFlags.Set("skip_log_headers", strconv.FormatBool(config.Log.SkipLogHeaders))
klogFlags.Set("stderrthreshold", strconv.FormatInt(int64(config.Log.StderrThreshold), 10))
// sync the glog and klog flags.
pflag.CommandLine.VisitAll(func(f1 *pflag.Flag) {
@@ -151,11 +130,32 @@ func Parse() (*Configuration, error) {
pflag.CommandLine.AddGoFlagSet(flag.CommandLine)
pflag.Parse()
// armorigo log
level, err := log.ParseLevel(*argArmorigoLogLevel)
if err != nil {
fmt.Println("parse log level err:", err)
return nil, err
}
log.SetLevel(level)
log.SetOutput(os.Stdout)
// config file
if *argConfigFile != "" {
// TODO the command-line is prior to config file
data, err := os.ReadFile(*argConfigFile)
if err != nil {
return nil, err
}
config = &Configuration{}
if err = yaml.Unmarshal(data, config); err != nil {
return nil, err
}
}
if config == nil {
config = &Configuration{}
}
config.Daemon.RLimit.NumFile = *argDaemonRLimitNofile
return config, nil
}

View File

@@ -8,7 +8,7 @@ edgebound:
network: tcp
addr: 0.0.0.0:2432
tls:
enable: true
enable: false
mtls: true
ca_certs:
- ca1.cert
@@ -17,11 +17,11 @@ edgebound:
- cert: edgebound.cert
key: edgebound.key
bypass:
enable: true
enable: false
network: tcp
addr: 192.168.1.10:8443
tls:
enable: true
enable: false
mtls: true
ca_certs:
- ca1.cert
@@ -34,7 +34,7 @@ servicebound:
network: tcp
addr: 0.0.0.0:2431
tls:
enable: true
enable: false
mtls: true
ca_certs:
- ca1.cert

View File

@@ -33,7 +33,7 @@ func (em *edgeManager) closedStream(stream geminio.Stream) {
func (em *edgeManager) forward(end geminio.End) {
edgeID := end.ClientID()
meta := end.Meta()
klog.V(5).Infof("edge forward stream, edgeID: %d, meta: %s", edgeID, meta)
klog.V(5).Infof("edge forward raw message and rpc, edgeID: %d, meta: %s", edgeID, meta)
if em.exchange != nil {
em.exchange.ForwardToService(end)
}

View File

@@ -6,6 +6,7 @@ import (
"crypto/x509"
"net"
"os"
"strings"
"sync"
"github.com/jumboframes/armorigo/rproxy"
@@ -77,12 +78,14 @@ func newEdgeManager(conf *config.Configuration, dao *dao.Dao, informer api.EdgeI
streams: mapmap.NewMapMap(),
dao: dao,
shub: synchub.NewSyncHub(synchub.OptionTimer(tmr)),
edges: make(map[uint64]geminio.End),
UnimplementedDelegate: &delegate.UnimplementedDelegate{},
// a simple unix timestamp incemental id factory
idFactory: id.DefaultIncIDCounter,
informer: informer,
exchange: exchange,
}
exchange.AddEdgebound(em)
if !listen.TLS.Enable {
if ln, err = net.Listen(network, addr); err != nil {
@@ -240,7 +243,9 @@ func (em *edgeManager) Serve() {
for {
conn, err := em.geminioLn.Accept()
if err != nil {
if !strings.Contains(err.Error(), api.ErrStrUseOfClosedConnection) {
klog.V(4).Infof("edge manager listener accept err: %s", err)
}
return
}
go em.handleConn(conn)
@@ -270,6 +275,7 @@ func (em *edgeManager) handleConn(conn net.Conn) error {
return nil
}
// management apis
func (em *edgeManager) GetEdgeByID(edgeID uint64) geminio.End {
em.mtx.RLock()
defer em.mtx.RUnlock()

View File

@@ -1,12 +1,12 @@
package edgebound
import (
"errors"
"net"
"strconv"
"time"
"github.com/jumboframes/armorigo/synchub"
"github.com/singchia/frontier/pkg/api"
"github.com/singchia/frontier/pkg/repo/dao"
"github.com/singchia/frontier/pkg/repo/model"
"github.com/singchia/geminio"
@@ -95,20 +95,26 @@ func (em *edgeManager) offline(edgeID uint64, addr net.Addr) error {
// delegations for all ends from edgebound, called by geminio
func (em *edgeManager) ConnOnline(d delegate.ConnDescriber) error {
edgeID := d.ClientID()
meta := string(d.Meta())
meta := d.Meta()
addr := d.RemoteAddr()
klog.V(4).Infof("edge online, edgeID: %d, meta: %s, addr: %s", edgeID, meta, addr)
// notification for others
klog.V(4).Infof("edge online, edgeID: %d, meta: %s, addr: %s", edgeID, string(meta), addr)
// inform others
if em.informer != nil {
em.informer.EdgeOnline(edgeID, d.Meta(), addr)
}
// exchange to service
if em.exchange != nil {
return em.exchange.EdgeOnline(edgeID, meta, addr)
}
return nil
}
func (em *edgeManager) ConnOffline(d delegate.ConnDescriber) error {
edgeID := d.ClientID()
meta := string(d.Meta())
meta := d.Meta()
addr := d.RemoteAddr()
klog.V(4).Infof("edge offline, edgeID: %d, meta: %s, addr: %s", edgeID, string(meta), addr)
// offline the cache
err := em.offline(edgeID, addr)
@@ -117,10 +123,14 @@ func (em *edgeManager) ConnOffline(d delegate.ConnDescriber) error {
err, edgeID, string(meta), addr)
return err
}
// inform others
if em.informer != nil {
em.informer.EdgeOffline(edgeID, d.Meta(), addr)
}
// notification for others
// exchange to service
if em.exchange != nil {
return em.exchange.EdgeOffline(edgeID, meta, addr)
}
return nil
}
@@ -128,7 +138,7 @@ func (em *edgeManager) Heartbeat(d delegate.ConnDescriber) error {
edgeID := d.ClientID()
meta := string(d.Meta())
addr := d.RemoteAddr()
klog.V(5).Infof("edge heartbeat, edgeID: %d, meta: %s, addr: %s", edgeID, string(meta), addr)
klog.V(6).Infof("edge heartbeat, edgeID: %d, meta: %s, addr: %s", edgeID, string(meta), addr)
if em.informer != nil {
em.informer.EdgeHeartbeat(edgeID, d.Meta(), addr)
}
@@ -151,9 +161,18 @@ func (em *edgeManager) RemoteRegistration(rpc string, edgeID, streamID uint64) {
}
func (em *edgeManager) GetClientID(meta []byte) (uint64, error) {
// TODO
if em.conf.Edgebound.EdgeIDAllocWhenNoIDServiceOn {
var (
edgeID uint64
err error
)
if em.exchange != nil {
edgeID, err = em.exchange.GetEdgeID(meta)
if err == nil {
return edgeID, err
}
}
if err == api.ErrRecordNotFound && em.conf.Edgebound.EdgeIDAllocWhenNoIDServiceOn {
return em.idFactory.GetID(), nil
}
return 0, errors.New("unable to get an edgeID")
return 0, err
}

View File

@@ -79,6 +79,7 @@ func (ex *exchange) forwardMessageToEdge(end geminio.End) {
klog.Errorf("forward message, serviceID: %d, receive err: %s", serviceID, err)
continue
}
klog.V(7).Infof("forward message, receive msg: %s from: %d", string(msg.Data()), end.ClientID())
// get target edgeID
custom := msg.Custom()
edgeID := binary.BigEndian.Uint64(custom[len(custom)-8:])
@@ -92,6 +93,7 @@ func (ex *exchange) forwardMessageToEdge(end geminio.End) {
return
}
// publish in sync, TODO publish in async
msg.SetClientID(edgeID)
err = edge.Publish(context.TODO(), msg)
if err != nil {
klog.V(5).Infof("forward message, serviceID: %d, publish edge: %d err: %s", serviceID, edgeID, err)

96
pkg/exchange/oob.go Normal file
View File

@@ -0,0 +1,96 @@
package exchange
import (
"context"
"encoding/binary"
"encoding/json"
"net"
"github.com/singchia/frontier/pkg/api"
"k8s.io/klog/v2"
)
func (ex *exchange) GetEdgeID(meta []byte) (uint64, error) {
svc, err := ex.Servicebound.GetServiceByRPC(api.RPCGetEdgeID)
if err != nil {
klog.Errorf("exchange get edgeID, get service err: %s, meta: %s", err, string(meta))
if err == api.ErrRecordNotFound {
return 0, api.ErrServiceNotOnline
}
return 0, err
}
// call service
req := svc.NewRequest(meta)
rsp, err := svc.Call(context.TODO(), api.RPCGetEdgeID, req)
if err != nil {
klog.V(5).Infof("exchange call service: %d, get edgeID err: %s, meta: %s", svc.ClientID(), err, meta)
return 0, err
}
data := rsp.Data()
if data == nil || len(data) != 8 {
return 0, api.ErrIllegalEdgeID
}
return binary.BigEndian.Uint64(data), nil
}
func (ex *exchange) EdgeOnline(edgeID uint64, meta []byte, addr net.Addr) error {
svc, err := ex.Servicebound.GetServiceByRPC(api.RPCEdgeOnline)
if err != nil {
klog.Errorf("exchange edge online, get service err: %s, edgeID: %d, meta: %s, addr: %s", err, edgeID, string(meta), addr)
if err == api.ErrRecordNotFound {
return api.ErrServiceNotOnline
}
return err
}
// call service the edge online event
event := &api.OnEdgeOnline{
EdgeID: edgeID,
Meta: meta,
Net: addr.Network(),
Str: addr.String(),
}
data, err := json.Marshal(event)
if err != nil {
klog.Errorf("exchange edge online, json marshal err: %s, edgeID: %d, meta: %s, addr: %s", err, edgeID, string(meta), addr)
return err
}
// call service
req := svc.NewRequest(data)
_, err = svc.Call(context.TODO(), api.RPCEdgeOnline, req)
if err != nil {
klog.V(5).Infof("exchange call service: %d, edge online err: %s, meta: %s, addr: %s", svc.ClientID(), err, meta, addr)
return err
}
return nil
}
func (ex *exchange) EdgeOffline(edgeID uint64, meta []byte, addr net.Addr) error {
svc, err := ex.Servicebound.GetServiceByRPC(api.RPCEdgeOffline)
if err != nil {
klog.Errorf("exchange edge offline, get service err: %s, edgeID: %d, meta: %s, addr: %s", err, edgeID, string(meta), addr)
if err == api.ErrRecordNotFound {
return api.ErrServiceNotOnline
}
return err
}
// call service the edge offline event
event := &api.OnEdgeOffline{
EdgeID: edgeID,
Meta: meta,
Net: addr.Network(),
Str: addr.String(),
}
data, err := json.Marshal(event)
if err != nil {
klog.Errorf("exchange edge offline, json marshal err: %s, edgeID: %d, meta: %s, addr: %s", err, edgeID, string(meta), addr)
return err
}
// call service
req := svc.NewRequest(data)
_, err = svc.Call(context.TODO(), api.RPCEdgeOffline, req)
if err != nil {
klog.V(5).Infof("exchange call service: %d, edge offline err: %s, meta: %s, addr: %s", svc.ClientID(), err, meta, addr)
return err
}
return nil
}

View File

@@ -34,7 +34,7 @@ func (sm *serviceManager) closedStream(stream geminio.Stream) {
func (sm *serviceManager) forward(meta *api.Meta, end geminio.End) {
serviceID := end.ClientID()
service := meta.Service
klog.V(5).Infof("service forward stream, serviceID: %d, service: %s", serviceID, service)
klog.V(5).Infof("service forward raw message and rpc, serviceID: %d, service: %s", serviceID, service)
if sm.exchange != nil {
sm.exchange.ForwardToEdge(meta, end)
}

View File

@@ -6,6 +6,7 @@ import (
"encoding/json"
"net"
"os"
"strings"
"sync"
"github.com/jumboframes/armorigo/synchub"
@@ -77,6 +78,7 @@ func newServiceManager(conf *config.Configuration, dao *dao.Dao, informer api.Se
// a simple unix timestamp incremental id factory
idFactory: id.DefaultIncIDCounter,
informer: informer,
exchange: exchange,
}
exchange.AddServicebound(sm)
@@ -143,7 +145,9 @@ func (sm *serviceManager) Serve() {
for {
conn, err := sm.ln.Accept()
if err != nil {
if !strings.Contains(err.Error(), api.ErrStrUseOfClosedConnection) {
klog.V(4).Infof("service manager listener accept err: %s", err)
}
return
}
go sm.handleConn(conn)

View File

@@ -132,7 +132,7 @@ func (sm *serviceManager) Heartbeat(d delegate.ConnDescriber) error {
serviceID := d.ClientID()
meta := string(d.Meta())
addr := d.RemoteAddr()
klog.V(5).Infof("service heartbeat, serviceID: %d, meta: %s, addr: %s", serviceID, string(meta), addr)
klog.V(6).Infof("service heartbeat, serviceID: %d, meta: %s, addr: %s", serviceID, string(meta), addr)
if sm.informer != nil {
sm.informer.ServiceHeartbeat(serviceID, meta, addr)
}