Files
frontier/README.md
2024-05-29 19:13:27 +08:00

10 KiB
Raw Blame History

Frontier是一个go开发的开源长连接网关能让微服务直接连通边缘节点或客户端反之边缘节点或客户端也能直接连通到微服务。对于微服务或者边缘节点提供了单双向RPC调用消息发布和接收以及直接点对点拨通通信的特性。Frontier采用云原生架构可以使用Operator快速部署一个集群轻松实现百万连接。

特性

  • RPC 微服务和边缘节点可以直接Call到对方的注册函数并且在微服务侧支持负载均衡
  • 消息 微服务和边缘节点可以直接Publish到对方的topic边缘节点可以Publish到外部消息队列的topic微服务侧topic支持负载均衡
  • 多路复用 微服务可以直接在边缘节点上打开一个新流(连接),你可以封装例如流读写、拷贝文件、代理等,天堑变通途
  • 上线离线控制 微服务可以注册边缘节点获取ID、上线和下线回调当边缘节点请求ID、上线或者离线时Frontier会调用这个方法
  • API简单 在项目api目录下分别对边缘和微服务提供了封装好的sdk你可以非常简单和轻量的基于这个sdk做开发
  • 部署简单 支持多种部署方式(docker docker-compose helm以及operator)来部署和管理你的Frontier实例或集群
  • 水平扩展 提供了Frontiter和Frontlas集群在单实例性能达到瓶颈下可以水平扩展Frontier实例或集群
  • 高可用 Frontlas支持集群部署同时使用微服务和边缘节点永久重连sdk在当前实例宕机情况时切换新可用实例继续服务
  • 支持控制面 同时提供了gRPC和rest接口允许运维人员对微服务和边缘节点查询或者删除删除即踢出微服务或边缘节点下线

架构

Frontier架构

  • Service End微服务侧的功能入口默认连接
  • Edge End边缘节点或客户端侧的功能入口
  • Publish/Receive发布和接收消息
    • Topic发布和接收消息的主题
  • Call/Register调用和注册函数
    • Method调用和注册的函数名
  • OpenStream/AcceptStream打开和接收点到点流连接

Frontier需要微服务和边缘节点两方都主动连接到Frontier这种设计的优势在不需要Frontier主动连接任何一个地址Service和Edge的元信息可以在连接的时候携带过来。连接的默认端口是

  • 30011提供给微服务连接获取Service
  • 30012提供给边缘节点连接获取Edge
  • 30010:提供给运维人员或者程序使用的控制面

详情见部署章节

使用

示例

本仓库携带了一个Chatroom示例

examples/chatroom

仅100行代码可以通过

make examples

在bin目录下得到chatroom_servicechatroom_client可执行程序,运行示例:

https://github.com/singchia/frontier/assets/15531166/ae408745-1997-491a-a55d-c2f8d0e72f9a

可以看到上线离线通知消息Publish等功能

Service

微服务侧获取Service

package main

import (
	"net"

	"github.com/singchia/frontier/api/dataplane/v1/service"
)

func main() {
	dialer := func() (net.Conn, error) {
		return net.Dial("tcp", "127.0.0.1:30011")
	}
	svc, err := service.NewService(dialer)
	// 开始使用service
}

Service接收获取ID、上线/离线通知

package main

import (
	"context"
	"net"

	"github.com/singchia/frontier/api/dataplane/v1/service"
)

func main() {
	dialer := func() (net.Conn, error) {
		return net.Dial("tcp", "127.0.0.1:30011")
	}
	svc, _ := service.NewService(dialer)
	srv.RegisterGetEdgeID(context.TODO(), getID)
	srv.RegisterEdgeOnline(context.TODO(), online)
	srv.RegisterEdgeOffline(context.TODO(), offline)
}

// service可以根据meta分配id给edge
func getID(meta []byte) (uint64, error) {
	return 0, nil
}

func online(edgeID uint64, meta []byte, addr net.Addr) error {
	return nil
}

func offline(edgeID uint64, meta []byte, addr net.Addr) error {
	return nil
}

Service发布消息到Edge

package main

import (
	"context"
	"net"

	"github.com/singchia/frontier/api/dataplane/v1/service"
)

func main() {
	dialer := func() (net.Conn, error) {
		return net.Dial("tcp", "127.0.0.1:30011")
	}
	svc, _ := service.NewService(dialer)
	msg := srv.NewMessage([]byte("test"))
	// 发布一条消息到ID为1001的边缘节点
	err = srv.Publish(context.TODO(), 1001, msg)
}

Service调用Edge的RPC

package main

import (
	"context"
	"net"

	"github.com/singchia/frontier/api/dataplane/v1/service"
)

func main() {
	dialer := func() (net.Conn, error) {
		return net.Dial("tcp", "127.0.0.1:30011")
	}
	svc, _ := service.NewService(dialer)
	req := srv.NewRequest([]byte("test"))
	// 调用ID为1001边缘节点的foo方法前提是边缘节点需要预注册该方法
	rsp, err := srv.Call(context.TODO(), edgeID, "foo", req)
}

Service打开Edge的点到点流

package main

import (
	"context"
	"net"

	"github.com/singchia/frontier/api/dataplane/v1/service"
)

func main() {
	dialer := func() (net.Conn, error) {
		return net.Dial("tcp", "127.0.0.1:30011")
	}
	svc, _ := service.NewService(dialer)
	// 打开ID为1001边缘节点的新流同时st也是一个net.Conn前提是edge需要AcceptStream接收该流
	st, err := srv.OpenStream(context.TODO(), 1001)
}

基于这个新打开流,你可以用来传递文件、代理流量等。

Service注册方法以供Edge调用

package main

import (
	"context"
	"net"

	"github.com/singchia/frontier/api/dataplane/v1/service"
)

func main() {
	dialer := func() (net.Conn, error) {
		return net.Dial("tcp", "127.0.0.1:30011")
	}
	svc, _ := service.NewService(dialer)
	// 注册一个"echo"方法
	srv.Register(context.TODO(), "echo", echo)
}

func echo(ctx context.Context, req geminio.Request, rsp geminio.Response) {
	value := req.Data()
	rsp.SetData(value)
}

Service声明接收Topic

package main

import (
	"context"
	"fmt"
	"io"
	"net"

	"github.com/singchia/frontier/api/dataplane/v1/service"
)

func main() {
	dialer := func() (net.Conn, error) {
		return net.Dial("tcp", "127.0.0.1:30011")
	}
	// 在获取svc时声明需要接收的topic
	svc, _ := service.NewService(dialer, service.OptionServiceReceiveTopics([]string{"foo"}))
	for {
		msg, err := srv.Receive(context.TODO())
		if err == io.EOF {
			return
		}
		if err != nil {
			fmt.Println("\n> receive err:", err)
			continue
		}
		msg.Done()
		fmt.Printf("> receive msg, edgeID: %d streamID: %d data: %s\n", msg.ClientID(), msg.StreamID(), string(value))
	}
}

Edge

边缘节点侧获取Edge

package main

import (
	"net"

	"github.com/singchia/frontier/api/dataplane/v1/edge"
)

func main() {
	dialer := func() (net.Conn, error) {
		return net.Dial("tcp", "127.0.0.1:30012")
	}
	eg, err := edge.NewEdge(dialer)
	// 开始使用eg
}

边缘节点发布消息到Topic

Service需要提前声明接收该Topic或者在配置文件中配置外部MQ。

package main

import (
	"net"

	"github.com/singchia/frontier/api/dataplane/v1/edge"
)

func main() {
	dialer := func() (net.Conn, error) {
		return net.Dial("tcp", "127.0.0.1:30012")
	}
	eg, err := edge.NewEdge(dialer)
	// 开始使用eg
	msg := cli.NewMessage([]byte("test"))
	err = cli.Publish(context.TODO(), "foo", msg)
}

控制面

Frontier控制面提供gRPC和rest接口运维人员可以使用这些api来确定本实例的连接情况定义在

api/controlplane/frontier/v1/controlplane.proto

service ControlPlane {
    // 列举所有边缘节点
    rpc ListEdges(ListEdgesRequest) returns (ListEdgesResponse)
        { option(google.api.http) = { get: "/v1/edges"}; };
    // 获取边缘节点详情
    rpc GetEdge(GetEdgeRequest) returns (Edge)
        { option(google.api.http) = { get: "/v1/edges/{edge_id}"}; };
    // 踢除某个边缘节点下线
    rpc KickEdge(KickEdgeRequest) returns (KickEdgeResponse)
        { option(google.api.http) = { delete: "/v1/edges/{edge_id}"}; };
    // 列举边缘节点注册的RPC
    rpc ListEdgeRPCs(ListEdgeRPCsRequest) returns (ListEdgeRPCsResponse)
        { option(google.api.http) = { get: "/v1/edges/rpcs"}; };

    // 列举所有微服务
    rpc ListServices(ListServicesRequest) returns (ListServicesResponse)
        { option(google.api.http) = { get: "/v1/services"}; };
    // 获取微服务详情
    rpc GetService(GetServiceRequest) returns (Service)
        { option(google.api.http) = { get: "/v1/services/{service_id}"}; };
    // 提出某个微服务下线
    rpc KickService(KickServiceRequest) returns (KickServiceResponse)
        { option(google.api.http) = { delete: "/v1/services/{service_id}"}; };
    // 列举微服务注册的RPC
    rpc ListServiceRPCs(ListServiceRPCsRequest) returns (ListServiceRPCsResponse)
        { option(google.api.http) = { get: "/v1/services/rpcs"}; };
    // 列举微服务接收的Topic
    rpc ListServiceTopics(ListServiceTopicsRequest) returns (ListServiceTopicsResponse)
        { option(google.api.http) = { get: "/v1/services/topics"}; };
}

Swagger文档请见swagger

注意

当你配置dao backend使用sqlite3时count才会返回总数默认使用buntdb为性能考虑这个值返回-1

配置

TLS

部署

docker

docker run -d --name frontier -p 30011:30011 -p 30012:30012 singchia/frontier:1.0.0

然后你可以使用上面所说的iclm示例来测试功能性

docker-compose

git clone https://github.com/singchia/frontier.git
cd dist/compose
docker-compose up -d frontier

helm


集群

Frontier + Frontlas架构

引入了一个Frontlas组件用于构建集群Frontlas同样也是无状态组件并不在内存里留存其他信息

  • Frontier最小的Frontier部署实例
  • Frontlas同步到

高可用

k8s

Operator

开发

路线图

祥见 ROADMAP

Bug和Feature

如果你发现任何Bug请提出Issue项目Maintainers会及时响应相关问题。

如果你希望能够提交Feature更快速解决项目问题满足以下简单条件下欢迎提交PR

  • 代码风格保持一致
  • 每次提交一个Feature
  • 提交的代码都携带单元测试

许可证

Released under the Apache License 2.0