example: add simple chatroom

This commit is contained in:
singchia
2024-05-29 14:47:39 +08:00
parent e447d7e733
commit 7c52832269
10 changed files with 352 additions and 32 deletions

View File

@@ -26,7 +26,8 @@ frontlas-linux:
.PHONY: examples
examples:
make -C examples
mv examples/iclm/bin/* ./bin/
mv examples/iclm/bin/* ./bin/ && rm -rf examples/iclm/bin
mv examples/chatroom/bin/* ./bin/ && rm -rf examples/chatroom/bin
# clean
.PHONY: clean

119
README.md
View File

@@ -14,6 +14,7 @@ Frontier是一个go开发的开源长连接网关能让微服务直接连通
- **部署简单**支持多种部署方式如docker、docker-compose、k8s-helm以及operator来部署和管理你的Frontier实例或集群。
- **水平扩展**提供了Frontiter和Frontlas集群在单实例性能达到瓶颈下可以水平扩展Frontier实例或集群。
- **高可用**Frontlas具有集群视角你可以使用微服务和边缘节点永久重连的sdk在当前Frontier宕机情况下新选择一个可用Frontier实例继续服务。
- **支持控制面**;允许管理员查看微服务和边缘节点,允许踢出某个边缘节点下线
## 架构
@@ -32,15 +33,26 @@ Frontier是一个go开发的开源长连接网关能让微服务直接连通
Frontier需要微服务和边缘节点两方都主动连接到Frontier这种设计的优势在不需要Frontier主动连接任何一个地址Service和Edge的元信息可以在连接的时候携带过来。连接的默认端口是
- 30011提供给微服务连接获取Service
- 30012提供给边缘节点连接获取Edge
- 30010提供给运维人员或者程序使用的控制面
- ```30011```提供给微服务连接获取Service
- ```30012```提供给边缘节点连接获取Edge
- ```30010```:提供给运维人员或者程序使用的控制面
详情见部署章节
## 使用
### 示例
本仓库携带了一个Chatroom示例可以通过
```
make examples
```
在bin目录下得到```chatroom_service```和```chatroom_edge```的可执行程序,以下是运行示例:
### Service
@@ -49,7 +61,11 @@ Frontier需要微服务和边缘节点两方都主动连接到Frontier这种
```golang
package main
import "github.com/singchia/frontier/api/dataplane/v1/service"
import (
"net"
"github.com/singchia/frontier/api/dataplane/v1/service"
)
func main() {
dialer := func() (net.Conn, error) {
@@ -65,7 +81,12 @@ func main() {
```golang
package main
import "github.com/singchia/frontier/api/dataplane/v1/service"
import (
"context"
"net"
"github.com/singchia/frontier/api/dataplane/v1/service"
)
func main() {
dialer := func() (net.Conn, error) {
@@ -96,7 +117,12 @@ func offline(edgeID uint64, meta []byte, addr net.Addr) error {
```golang
package main
import "github.com/singchia/frontier/api/dataplane/v1/service"
import (
"context"
"net"
"github.com/singchia/frontier/api/dataplane/v1/service"
)
func main() {
dialer := func() (net.Conn, error) {
@@ -114,7 +140,12 @@ func main() {
```golang
package main
import "github.com/singchia/frontier/api/dataplane/v1/service"
import (
"context"
"net"
"github.com/singchia/frontier/api/dataplane/v1/service"
)
func main() {
dialer := func() (net.Conn, error) {
@@ -132,7 +163,12 @@ func main() {
```golang
package main
import "github.com/singchia/frontier/api/dataplane/v1/service"
import (
"context"
"net"
"github.com/singchia/frontier/api/dataplane/v1/service"
)
func main() {
dialer := func() (net.Conn, error) {
@@ -143,13 +179,19 @@ func main() {
st, err := srv.OpenStream(context.TODO(), 1001)
}
```
基于这个新打开流,你可以用来传递文件、代理流量等。
**Service注册方法以供Edge调用**
```golang
package main
import "github.com/singchia/frontier/api/dataplane/v1/service"
import (
"context"
"net"
"github.com/singchia/frontier/api/dataplane/v1/service"
)
func main() {
dialer := func() (net.Conn, error) {
@@ -168,6 +210,39 @@ func echo(ctx context.Context, req geminio.Request, rsp geminio.Response) {
**Service声明接收Topic**
```golang
package main
import (
"context"
"fmt"
"io"
"net"
"github.com/singchia/frontier/api/dataplane/v1/service"
)
func main() {
dialer := func() (net.Conn, error) {
return net.Dial("tcp", "127.0.0.1:30011")
}
// 在获取svc时声明需要接收的topic
svc, _ := service.NewService(dialer, service.OptionServiceReceiveTopics([]string{"foo"}))
for {
msg, err := srv.Receive(context.TODO())
if err == io.EOF {
return
}
if err != nil {
fmt.Println("\n> receive err:", err)
continue
}
msg.Done()
fmt.Printf("> receive msg, edgeID: %d streamID: %d data: %s\n", msg.ClientID(), msg.StreamID(), string(value))
}
}
```
### Edge
**边缘节点侧获取Edge**
@@ -175,7 +250,11 @@ func echo(ctx context.Context, req geminio.Request, rsp geminio.Response) {
```golang
package main
import "github.com/singchia/frontier/api/dataplane/v1/edge"
import (
"net"
"github.com/singchia/frontier/api/dataplane/v1/edge"
)
func main() {
dialer := func() (net.Conn, error) {
@@ -231,15 +310,6 @@ Swagger文档请见[swagger](./docs/swagger/swagger.yaml)
当你配置dao backend使用sqlite3时count才会返回总数默认使用buntdb为性能考虑这个值返回-1
### 示例
本仓库携带了一个ICLM(Iteractive command-line messaging)示例,可以通过
```
make examples
```
在bin目录下得到```iclm_service```和```iclm_edge```的可执行程序,以下是运行示例:
## 配置
@@ -277,17 +347,24 @@ docker-compose up -d frontier
<img src="./docs/diagram/frontlas.png" width="100%" height="100%">
引入了一个Frontlas组件用于构建集群Frontlas同样也是无状态组件并不在内存里留存其他信息
- Frontier最小的Frontier部署实例
- Frontlas同步到
### 高可用
## k8s
### Operator
## 开发
### 路线图
祥见 [ROADMAP](./ROADMAP.md)
### Bug和Feature
如果你发现任何Bug请提出Issue项目Maintainers会及时响应相关问题。
@@ -298,9 +375,7 @@ docker-compose up -d frontier
* 每次提交一个Feature
* 提交的代码都携带单元测试
### 路线图
祥见 [ROADMAP](./ROADMAP.md)
## 许可证

View File

@@ -62,5 +62,9 @@ type Edge interface {
type Dialer func() (net.Conn, error)
func NewEdge(dialer Dialer, opts ...EdgeOption) (Edge, error) {
return newRetryEdgeEnd(client.Dialer(dialer), opts...)
}
func NewNoRetryEdge(dialer Dialer, opts ...EdgeOption) (Edge, error) {
return newEdgeEnd(client.Dialer(dialer), opts...)
}

View File

@@ -40,6 +40,42 @@ func newEdgeEnd(dialer client.Dialer, opts ...EdgeOption) (*edgeEnd, error) {
eopts.SetBufferSize(eopt.readBufferSize, eopt.writeBufferSize)
}
// new geminio end
end, err := client.NewEndWithDialer(dialer, eopts)
if err != nil {
return nil, err
}
return &edgeEnd{end}, nil
}
func newRetryEdgeEnd(dialer client.Dialer, opts ...EdgeOption) (*edgeEnd, error) {
// options
eopt := &edgeOption{
readBufferSize: -1,
writeBufferSize: -1,
}
for _, opt := range opts {
opt(eopt)
}
// turn to end options
eopts := client.NewEndOptions()
if eopt.tmr != nil {
eopts.SetTimer(eopt.tmr)
}
if eopt.logger != nil {
eopts.SetLog(eopt.logger)
}
if eopt.edgeID != nil {
eopts.SetClientID(*eopt.edgeID)
}
if eopt.meta != nil {
eopts.SetMeta(eopt.meta)
}
if eopt.readBufferSize != -1 || eopt.writeBufferSize != -1 {
eopts.SetBufferSize(eopt.readBufferSize, eopt.writeBufferSize)
}
// new geminio end
end, err := client.NewRetryEndWithDialer(dialer, eopts)
if err != nil {

BIN
docs/video/chatroom.gif Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 26 MiB

View File

@@ -2,12 +2,17 @@ GOHOSTOS?=$(shell go env GOHOSTOS)
GOARCH?=$(shell go env GOARCH)
.PHONY: all
all: iclm
all: iclm chatroom
.PHONY: iclm
iclm:
make -C iclm
.PHONY: chatroom
chatroom:
make -C chatroom
.PHONY: clean
clean:
make clean -C iclm
make clean -C iclm
make clean -C chatroom

View File

@@ -0,0 +1,22 @@
PREFIX?=/usr
BINDIR?=$(PREFIX)/bin
GOHOSTOS?=$(shell go env GOHOSTOS)
GOARCH?=$(shell go env GOARCH)
.PHONY: all
all: chatroom_service chatroom_client
.PHONY: clean
clean:
rm chatroom_service chatroom_client
.PHONY: chatroom_service
chatroom_service: service/*.go
CGO_ENABLED=0 GOOS=$(GOHOSTOS) GOARCH=$(GOARCH) \
go build -trimpath -ldflags "-s -w" -o ./bin/chatroom_service service/*.go
.PHONY: chatroom_client
chatroom_client: client/*.go
CGO_ENABLED=0 GOOS=$(GOHOSTOS) GOARCH=$(GOARCH) \
go build -trimpath -ldflags "-s -w" -o ./bin/chatroom_client client/*.go

View File

@@ -0,0 +1,65 @@
package main
import (
"bufio"
"context"
"encoding/json"
"fmt"
"io"
"net"
"os"
"github.com/singchia/frontier/api/dataplane/v1/edge"
"github.com/spf13/pflag"
)
func main() {
network := pflag.String("network", "tcp", "network to dial")
address := pflag.String("address", "127.0.0.1:30012", "address to dial")
name := pflag.String("name", "alice", "user name to join chatroom")
pflag.Parse()
dialer := func() (net.Conn, error) {
return net.Dial(*network, *address)
}
cli, err := edge.NewNoRetryEdge(dialer, edge.OptionEdgeMeta([]byte(*name)))
if err != nil {
fmt.Println("new edge err:", err)
return
}
go func() {
for {
msg, err := cli.Receive(context.TODO())
if err == io.EOF {
return
}
if err != nil {
fmt.Println("\n> receive err:", err)
fmt.Println(">>> ")
continue
}
msg.Done()
chat := &Chat{}
json.Unmarshal(msg.Data(), chat)
fmt.Printf("\033[2K\r[%10s]: %s\n", chat.User, chat.Msg)
fmt.Print(">>> ")
}
}()
fmt.Print(">>> ")
scanner := bufio.NewScanner(os.Stdin)
for scanner.Scan() {
text := scanner.Text()
msg := cli.NewMessage([]byte(text))
err = cli.Publish(context.TODO(), "chatroom", msg)
if err != nil {
fmt.Printf("publish err: %s", err)
}
fmt.Print(">>> ")
}
}
type Chat struct {
User string
Msg string
}

View File

@@ -0,0 +1,111 @@
package main
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net"
"sync"
"github.com/singchia/frontier/api/dataplane/v1/service"
"github.com/singchia/geminio/pkg/id"
"github.com/spf13/pflag"
)
var (
clients sync.Map
)
func main() {
network := pflag.String("network", "tcp", "network to dial")
address := pflag.String("address", "127.0.0.1:30011", "address to dial")
pflag.Parse()
dialer := func() (net.Conn, error) {
return net.Dial(*network, *address)
}
svc, err := service.NewService(dialer, service.OptionServiceReceiveTopics([]string{"chatroom"}))
if err != nil {
fmt.Println("new service err:", err)
return
}
err = svc.RegisterGetEdgeID(context.TODO(), getID)
if err != nil {
fmt.Println("svc register getID err:", err)
return
}
err = svc.RegisterEdgeOnline(context.TODO(), online)
if err != nil {
fmt.Println("svc register online err:", err)
return
}
err = svc.RegisterEdgeOffline(context.TODO(), offline)
if err != nil {
fmt.Println("svc register offline err:", err)
return
}
for {
msg, err := svc.Receive(context.TODO())
if err == io.EOF {
return
}
msg.Done()
name := "unknown"
value, ok := clients.Load(msg.ClientID())
if ok {
name = value.(string)
}
fmt.Printf("[%10s]: %s\n", name, string(msg.Data()))
clients.Range(func(key, value any) bool {
if value.(string) == name {
return true
}
chat := &Chat{
User: name,
Msg: string(msg.Data()),
}
data, _ := json.Marshal(chat)
newmsg := svc.NewMessage(data)
svc.Publish(context.TODO(), key.(uint64), newmsg)
return true
})
}
}
type Chat struct {
User string
Msg string
}
func getID(meta []byte) (uint64, error) {
return id.DefaultIncIDCounter.GetID(), nil
}
func online(edgeID uint64, meta []byte, addr net.Addr) error {
err := error(nil)
clients.Range(func(key, value any) bool {
if value.(string) == string(meta) {
err = errors.New("user exists")
return false
}
return true
})
if err != nil {
fmt.Printf("> online failed: %s, name: %s, addr: %s\n", err, string(meta), addr.String())
return err
}
fmt.Printf("> online, name: %s, addr: %s\n", string(meta), addr.String())
clients.Store(edgeID, string(meta))
return err
}
func offline(edgeID uint64, meta []byte, addr net.Addr) error {
fmt.Printf("> offline, name: %s, addr: %s\n", string(meta), addr.String())
clients.Delete(edgeID)
return nil
}

View File

@@ -70,13 +70,7 @@ func (em *edgeManager) online(end geminio.End) error {
if em.informer != nil {
em.informer.EdgeOnline(end.ClientID(), end.Meta(), end.RemoteAddr())
}
// exchange to service
if em.exchange != nil {
err := em.exchange.EdgeOnline(end.ClientID(), end.Meta(), end.RemoteAddr())
if err == apis.ErrServiceNotOnline {
return nil
}
}
return nil
}
@@ -144,6 +138,13 @@ func (em *edgeManager) ConnOnline(d delegate.ConnDescriber) error {
meta := d.Meta()
addr := d.RemoteAddr()
// exchange to service
if em.exchange != nil {
err := em.exchange.EdgeOnline(edgeID, meta, addr)
if err != nil && err != apis.ErrServiceNotOnline {
return err
}
}
klog.V(2).Infof("edge online, edgeID: %d, meta: %s, addr: %s", edgeID, string(meta), addr)
return nil
}