update protobuf api for http and grpc

This commit is contained in:
singchia
2024-03-07 23:12:44 +08:00
parent 883b2b295b
commit 9aadd2c9e4
12 changed files with 2261 additions and 232 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -6,6 +6,14 @@ option go_package = "github.com/singchia/frontier/api/controlplane/v1;v1";
import "google/api/annotations.proto";
message Edge {
uint64 edge_id = 1;
string meta = 2;
string addr = 3;
int64 create_time = 4;
}
// list edges
message ListEdgesRequest {
optional string meta = 1;
optional string addr = 2;
@@ -23,14 +31,126 @@ message ListEdgesResponse {
uint32 count = 2;
}
message Edge {
// get edge
message GetEdgeRequest {
uint64 edge_id = 1;
string meta = 2;
}
// kick edge
message KickEdgeRequest {
uint64 edge_id = 1;
}
message KickEdgeResponse {}
// list edge rpcs
message ListEdgeRPCsRequest {
optional string meta = 1;
optional uint64 edge_id = 2;
int64 page = 3;
int64 page_size = 4;
optional int64 start_time = 5;
optional int64 end_time = 6;
optional string order = 7;
}
message ListEdgeRPCsResponse {
repeated string rpcs = 1;
uint32 count = 2;
}
message Service {
uint64 service_id = 1;
string service = 2;
string addr = 3;
int64 create_time = 4;
}
// list services
message ListServicesRequest {
optional string service = 1;
optional string addr = 2;
optional string rpc = 3;
optional string topic = 4;
optional uint64 service_id = 5;
int64 page = 6;
int64 page_size = 7;
optional int64 start_time = 8;
optional int64 end_time = 9;
optional string order = 10;
}
message ListServicesResponse {
repeated Service services = 1;
uint32 count = 2;
}
// get service
message GetServiceRequest {
uint64 service_id = 1;
}
// kick service
message KickServiceRequest {
uint64 service_id = 1;
}
message KickServiceResponse {}
// list service rpcs
message ListServiceRPCsRequest {
optional string service = 1;
optional uint64 service_id = 2;
int64 page = 3;
int64 page_size = 4;
optional int64 start_time = 5;
optional int64 end_time = 6;
optional string order = 7;
}
message ListServiceRPCsResponse {
repeated string rpcs = 1;
uint32 count = 2;
}
// list service topics
message ListServiceTopicsRequest {
optional string service = 1;
optional uint64 service_id = 2;
int64 page = 3;
int64 page_size = 4;
optional int64 start_time = 5;
optional int64 end_time = 6;
optional string order = 7;
}
message ListServiceTopicsResponse {
repeated string topics = 1;
uint32 count = 2;
}
service ControlPlane {
rpc ListEdges(ListEdgesRequest) returns(ListEdgesResponse)
{ option(google.api.http) = { get: "/v1"}; };
// edge related
rpc ListEdges(ListEdgesRequest) returns (ListEdgesResponse)
{ option(google.api.http) = { get: "/v1/edges"}; };
rpc GetEdge(GetEdgeRequest) returns (Edge)
{ option(google.api.http) = { get: "/v1/edges/{edge_id}"}; };
rpc KickEdge(KickEdgeRequest) returns (KickEdgeResponse)
{ option(google.api.http) = { delete: "/v1/edges/{edge_id}"}; };
rpc ListEdgeRPCs(ListEdgeRPCsRequest) returns (ListEdgeRPCsResponse)
{ option(google.api.http) = { get: "/v1/edges/rpcs"}; };
// service related
rpc ListServices(ListServicesRequest) returns (ListServicesResponse)
{ option(google.api.http) = { get: "/v1/services"}; };
rpc GetService(GetServiceRequest) returns (Service)
{ option(google.api.http) = { get: "/v1/services/{service_id}"}; };
rpc KickService(KickServiceRequest) returns (KickServiceResponse)
{ option(google.api.http) = { delete: "/v1/services/{service_id}"}; };
rpc ListServiceRPCs(ListServiceRPCsRequest) returns (ListServiceRPCsResponse)
{ option(google.api.http) = { get: "/v1/services/rpcs"}; };
rpc ListServiceTopics(ListServiceTopicsRequest) returns (ListServiceTopicsResponse)
{ option(google.api.http) = { get: "/v1/services/topics"}; };
}

View File

@@ -19,14 +19,32 @@ import (
const _ = grpc.SupportPackageIsVersion7
const (
ControlPlane_ListEdges_FullMethodName = "/controlplane.ControlPlane/ListEdges"
ControlPlane_ListEdges_FullMethodName = "/controlplane.ControlPlane/ListEdges"
ControlPlane_GetEdge_FullMethodName = "/controlplane.ControlPlane/GetEdge"
ControlPlane_KickEdge_FullMethodName = "/controlplane.ControlPlane/KickEdge"
ControlPlane_ListEdgeRPCs_FullMethodName = "/controlplane.ControlPlane/ListEdgeRPCs"
ControlPlane_ListServices_FullMethodName = "/controlplane.ControlPlane/ListServices"
ControlPlane_GetService_FullMethodName = "/controlplane.ControlPlane/GetService"
ControlPlane_KickService_FullMethodName = "/controlplane.ControlPlane/KickService"
ControlPlane_ListServiceRPCs_FullMethodName = "/controlplane.ControlPlane/ListServiceRPCs"
ControlPlane_ListServiceTopics_FullMethodName = "/controlplane.ControlPlane/ListServiceTopics"
)
// ControlPlaneClient is the client API for ControlPlane service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type ControlPlaneClient interface {
// edge related
ListEdges(ctx context.Context, in *ListEdgesRequest, opts ...grpc.CallOption) (*ListEdgesResponse, error)
GetEdge(ctx context.Context, in *GetEdgeRequest, opts ...grpc.CallOption) (*Edge, error)
KickEdge(ctx context.Context, in *KickEdgeRequest, opts ...grpc.CallOption) (*KickEdgeResponse, error)
ListEdgeRPCs(ctx context.Context, in *ListEdgeRPCsRequest, opts ...grpc.CallOption) (*ListEdgeRPCsResponse, error)
// service related
ListServices(ctx context.Context, in *ListServicesRequest, opts ...grpc.CallOption) (*ListServicesResponse, error)
GetService(ctx context.Context, in *GetServiceRequest, opts ...grpc.CallOption) (*Service, error)
KickService(ctx context.Context, in *KickServiceRequest, opts ...grpc.CallOption) (*KickServiceResponse, error)
ListServiceRPCs(ctx context.Context, in *ListServiceRPCsRequest, opts ...grpc.CallOption) (*ListServiceRPCsResponse, error)
ListServiceTopics(ctx context.Context, in *ListServiceTopicsRequest, opts ...grpc.CallOption) (*ListServiceTopicsResponse, error)
}
type controlPlaneClient struct {
@@ -46,11 +64,93 @@ func (c *controlPlaneClient) ListEdges(ctx context.Context, in *ListEdgesRequest
return out, nil
}
func (c *controlPlaneClient) GetEdge(ctx context.Context, in *GetEdgeRequest, opts ...grpc.CallOption) (*Edge, error) {
out := new(Edge)
err := c.cc.Invoke(ctx, ControlPlane_GetEdge_FullMethodName, in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *controlPlaneClient) KickEdge(ctx context.Context, in *KickEdgeRequest, opts ...grpc.CallOption) (*KickEdgeResponse, error) {
out := new(KickEdgeResponse)
err := c.cc.Invoke(ctx, ControlPlane_KickEdge_FullMethodName, in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *controlPlaneClient) ListEdgeRPCs(ctx context.Context, in *ListEdgeRPCsRequest, opts ...grpc.CallOption) (*ListEdgeRPCsResponse, error) {
out := new(ListEdgeRPCsResponse)
err := c.cc.Invoke(ctx, ControlPlane_ListEdgeRPCs_FullMethodName, in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *controlPlaneClient) ListServices(ctx context.Context, in *ListServicesRequest, opts ...grpc.CallOption) (*ListServicesResponse, error) {
out := new(ListServicesResponse)
err := c.cc.Invoke(ctx, ControlPlane_ListServices_FullMethodName, in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *controlPlaneClient) GetService(ctx context.Context, in *GetServiceRequest, opts ...grpc.CallOption) (*Service, error) {
out := new(Service)
err := c.cc.Invoke(ctx, ControlPlane_GetService_FullMethodName, in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *controlPlaneClient) KickService(ctx context.Context, in *KickServiceRequest, opts ...grpc.CallOption) (*KickServiceResponse, error) {
out := new(KickServiceResponse)
err := c.cc.Invoke(ctx, ControlPlane_KickService_FullMethodName, in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *controlPlaneClient) ListServiceRPCs(ctx context.Context, in *ListServiceRPCsRequest, opts ...grpc.CallOption) (*ListServiceRPCsResponse, error) {
out := new(ListServiceRPCsResponse)
err := c.cc.Invoke(ctx, ControlPlane_ListServiceRPCs_FullMethodName, in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *controlPlaneClient) ListServiceTopics(ctx context.Context, in *ListServiceTopicsRequest, opts ...grpc.CallOption) (*ListServiceTopicsResponse, error) {
out := new(ListServiceTopicsResponse)
err := c.cc.Invoke(ctx, ControlPlane_ListServiceTopics_FullMethodName, in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// ControlPlaneServer is the server API for ControlPlane service.
// All implementations must embed UnimplementedControlPlaneServer
// for forward compatibility
type ControlPlaneServer interface {
// edge related
ListEdges(context.Context, *ListEdgesRequest) (*ListEdgesResponse, error)
GetEdge(context.Context, *GetEdgeRequest) (*Edge, error)
KickEdge(context.Context, *KickEdgeRequest) (*KickEdgeResponse, error)
ListEdgeRPCs(context.Context, *ListEdgeRPCsRequest) (*ListEdgeRPCsResponse, error)
// service related
ListServices(context.Context, *ListServicesRequest) (*ListServicesResponse, error)
GetService(context.Context, *GetServiceRequest) (*Service, error)
KickService(context.Context, *KickServiceRequest) (*KickServiceResponse, error)
ListServiceRPCs(context.Context, *ListServiceRPCsRequest) (*ListServiceRPCsResponse, error)
ListServiceTopics(context.Context, *ListServiceTopicsRequest) (*ListServiceTopicsResponse, error)
mustEmbedUnimplementedControlPlaneServer()
}
@@ -61,6 +161,30 @@ type UnimplementedControlPlaneServer struct {
func (UnimplementedControlPlaneServer) ListEdges(context.Context, *ListEdgesRequest) (*ListEdgesResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method ListEdges not implemented")
}
func (UnimplementedControlPlaneServer) GetEdge(context.Context, *GetEdgeRequest) (*Edge, error) {
return nil, status.Errorf(codes.Unimplemented, "method GetEdge not implemented")
}
func (UnimplementedControlPlaneServer) KickEdge(context.Context, *KickEdgeRequest) (*KickEdgeResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method KickEdge not implemented")
}
func (UnimplementedControlPlaneServer) ListEdgeRPCs(context.Context, *ListEdgeRPCsRequest) (*ListEdgeRPCsResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method ListEdgeRPCs not implemented")
}
func (UnimplementedControlPlaneServer) ListServices(context.Context, *ListServicesRequest) (*ListServicesResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method ListServices not implemented")
}
func (UnimplementedControlPlaneServer) GetService(context.Context, *GetServiceRequest) (*Service, error) {
return nil, status.Errorf(codes.Unimplemented, "method GetService not implemented")
}
func (UnimplementedControlPlaneServer) KickService(context.Context, *KickServiceRequest) (*KickServiceResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method KickService not implemented")
}
func (UnimplementedControlPlaneServer) ListServiceRPCs(context.Context, *ListServiceRPCsRequest) (*ListServiceRPCsResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method ListServiceRPCs not implemented")
}
func (UnimplementedControlPlaneServer) ListServiceTopics(context.Context, *ListServiceTopicsRequest) (*ListServiceTopicsResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method ListServiceTopics not implemented")
}
func (UnimplementedControlPlaneServer) mustEmbedUnimplementedControlPlaneServer() {}
// UnsafeControlPlaneServer may be embedded to opt out of forward compatibility for this service.
@@ -92,6 +216,150 @@ func _ControlPlane_ListEdges_Handler(srv interface{}, ctx context.Context, dec f
return interceptor(ctx, in, info, handler)
}
func _ControlPlane_GetEdge_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(GetEdgeRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ControlPlaneServer).GetEdge(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: ControlPlane_GetEdge_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ControlPlaneServer).GetEdge(ctx, req.(*GetEdgeRequest))
}
return interceptor(ctx, in, info, handler)
}
func _ControlPlane_KickEdge_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(KickEdgeRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ControlPlaneServer).KickEdge(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: ControlPlane_KickEdge_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ControlPlaneServer).KickEdge(ctx, req.(*KickEdgeRequest))
}
return interceptor(ctx, in, info, handler)
}
func _ControlPlane_ListEdgeRPCs_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(ListEdgeRPCsRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ControlPlaneServer).ListEdgeRPCs(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: ControlPlane_ListEdgeRPCs_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ControlPlaneServer).ListEdgeRPCs(ctx, req.(*ListEdgeRPCsRequest))
}
return interceptor(ctx, in, info, handler)
}
func _ControlPlane_ListServices_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(ListServicesRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ControlPlaneServer).ListServices(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: ControlPlane_ListServices_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ControlPlaneServer).ListServices(ctx, req.(*ListServicesRequest))
}
return interceptor(ctx, in, info, handler)
}
func _ControlPlane_GetService_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(GetServiceRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ControlPlaneServer).GetService(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: ControlPlane_GetService_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ControlPlaneServer).GetService(ctx, req.(*GetServiceRequest))
}
return interceptor(ctx, in, info, handler)
}
func _ControlPlane_KickService_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(KickServiceRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ControlPlaneServer).KickService(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: ControlPlane_KickService_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ControlPlaneServer).KickService(ctx, req.(*KickServiceRequest))
}
return interceptor(ctx, in, info, handler)
}
func _ControlPlane_ListServiceRPCs_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(ListServiceRPCsRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ControlPlaneServer).ListServiceRPCs(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: ControlPlane_ListServiceRPCs_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ControlPlaneServer).ListServiceRPCs(ctx, req.(*ListServiceRPCsRequest))
}
return interceptor(ctx, in, info, handler)
}
func _ControlPlane_ListServiceTopics_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(ListServiceTopicsRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ControlPlaneServer).ListServiceTopics(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: ControlPlane_ListServiceTopics_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ControlPlaneServer).ListServiceTopics(ctx, req.(*ListServiceTopicsRequest))
}
return interceptor(ctx, in, info, handler)
}
// ControlPlane_ServiceDesc is the grpc.ServiceDesc for ControlPlane service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
@@ -103,6 +371,38 @@ var ControlPlane_ServiceDesc = grpc.ServiceDesc{
MethodName: "ListEdges",
Handler: _ControlPlane_ListEdges_Handler,
},
{
MethodName: "GetEdge",
Handler: _ControlPlane_GetEdge_Handler,
},
{
MethodName: "KickEdge",
Handler: _ControlPlane_KickEdge_Handler,
},
{
MethodName: "ListEdgeRPCs",
Handler: _ControlPlane_ListEdgeRPCs_Handler,
},
{
MethodName: "ListServices",
Handler: _ControlPlane_ListServices_Handler,
},
{
MethodName: "GetService",
Handler: _ControlPlane_GetService_Handler,
},
{
MethodName: "KickService",
Handler: _ControlPlane_KickService_Handler,
},
{
MethodName: "ListServiceRPCs",
Handler: _ControlPlane_ListServiceRPCs_Handler,
},
{
MethodName: "ListServiceTopics",
Handler: _ControlPlane_ListServiceTopics_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "controlplane.proto",

View File

@@ -19,15 +19,41 @@ var _ = binding.EncodeURL
const _ = http.SupportPackageIsVersion1
const OperationControlPlaneGetEdge = "/controlplane.ControlPlane/GetEdge"
const OperationControlPlaneGetService = "/controlplane.ControlPlane/GetService"
const OperationControlPlaneKickEdge = "/controlplane.ControlPlane/KickEdge"
const OperationControlPlaneKickService = "/controlplane.ControlPlane/KickService"
const OperationControlPlaneListEdgeRPCs = "/controlplane.ControlPlane/ListEdgeRPCs"
const OperationControlPlaneListEdges = "/controlplane.ControlPlane/ListEdges"
const OperationControlPlaneListServiceRPCs = "/controlplane.ControlPlane/ListServiceRPCs"
const OperationControlPlaneListServiceTopics = "/controlplane.ControlPlane/ListServiceTopics"
const OperationControlPlaneListServices = "/controlplane.ControlPlane/ListServices"
type ControlPlaneHTTPServer interface {
GetEdge(context.Context, *GetEdgeRequest) (*Edge, error)
GetService(context.Context, *GetServiceRequest) (*Service, error)
KickEdge(context.Context, *KickEdgeRequest) (*KickEdgeResponse, error)
KickService(context.Context, *KickServiceRequest) (*KickServiceResponse, error)
ListEdgeRPCs(context.Context, *ListEdgeRPCsRequest) (*ListEdgeRPCsResponse, error)
// ListEdges edge related
ListEdges(context.Context, *ListEdgesRequest) (*ListEdgesResponse, error)
ListServiceRPCs(context.Context, *ListServiceRPCsRequest) (*ListServiceRPCsResponse, error)
ListServiceTopics(context.Context, *ListServiceTopicsRequest) (*ListServiceTopicsResponse, error)
// ListServices service related
ListServices(context.Context, *ListServicesRequest) (*ListServicesResponse, error)
}
func RegisterControlPlaneHTTPServer(s *http.Server, srv ControlPlaneHTTPServer) {
r := s.Route("/")
r.GET("/v1", _ControlPlane_ListEdges0_HTTP_Handler(srv))
r.GET("/v1/edges", _ControlPlane_ListEdges0_HTTP_Handler(srv))
r.GET("/v1/edges/{edge_id}", _ControlPlane_GetEdge0_HTTP_Handler(srv))
r.DELETE("/v1/edges/{edge_id}", _ControlPlane_KickEdge0_HTTP_Handler(srv))
r.GET("/v1/edges/rpcs", _ControlPlane_ListEdgeRPCs0_HTTP_Handler(srv))
r.GET("/v1/services", _ControlPlane_ListServices0_HTTP_Handler(srv))
r.GET("/v1/services/{service_id}", _ControlPlane_GetService0_HTTP_Handler(srv))
r.DELETE("/v1/services/{service_id}", _ControlPlane_KickService0_HTTP_Handler(srv))
r.GET("/v1/services/rpcs", _ControlPlane_ListServiceRPCs0_HTTP_Handler(srv))
r.GET("/v1/services/topics", _ControlPlane_ListServiceTopics0_HTTP_Handler(srv))
}
func _ControlPlane_ListEdges0_HTTP_Handler(srv ControlPlaneHTTPServer) func(ctx http.Context) error {
@@ -49,8 +75,180 @@ func _ControlPlane_ListEdges0_HTTP_Handler(srv ControlPlaneHTTPServer) func(ctx
}
}
func _ControlPlane_GetEdge0_HTTP_Handler(srv ControlPlaneHTTPServer) func(ctx http.Context) error {
return func(ctx http.Context) error {
var in GetEdgeRequest
if err := ctx.BindQuery(&in); err != nil {
return err
}
if err := ctx.BindVars(&in); err != nil {
return err
}
http.SetOperation(ctx, OperationControlPlaneGetEdge)
h := ctx.Middleware(func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.GetEdge(ctx, req.(*GetEdgeRequest))
})
out, err := h(ctx, &in)
if err != nil {
return err
}
reply := out.(*Edge)
return ctx.Result(200, reply)
}
}
func _ControlPlane_KickEdge0_HTTP_Handler(srv ControlPlaneHTTPServer) func(ctx http.Context) error {
return func(ctx http.Context) error {
var in KickEdgeRequest
if err := ctx.BindQuery(&in); err != nil {
return err
}
if err := ctx.BindVars(&in); err != nil {
return err
}
http.SetOperation(ctx, OperationControlPlaneKickEdge)
h := ctx.Middleware(func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.KickEdge(ctx, req.(*KickEdgeRequest))
})
out, err := h(ctx, &in)
if err != nil {
return err
}
reply := out.(*KickEdgeResponse)
return ctx.Result(200, reply)
}
}
func _ControlPlane_ListEdgeRPCs0_HTTP_Handler(srv ControlPlaneHTTPServer) func(ctx http.Context) error {
return func(ctx http.Context) error {
var in ListEdgeRPCsRequest
if err := ctx.BindQuery(&in); err != nil {
return err
}
http.SetOperation(ctx, OperationControlPlaneListEdgeRPCs)
h := ctx.Middleware(func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.ListEdgeRPCs(ctx, req.(*ListEdgeRPCsRequest))
})
out, err := h(ctx, &in)
if err != nil {
return err
}
reply := out.(*ListEdgeRPCsResponse)
return ctx.Result(200, reply)
}
}
func _ControlPlane_ListServices0_HTTP_Handler(srv ControlPlaneHTTPServer) func(ctx http.Context) error {
return func(ctx http.Context) error {
var in ListServicesRequest
if err := ctx.BindQuery(&in); err != nil {
return err
}
http.SetOperation(ctx, OperationControlPlaneListServices)
h := ctx.Middleware(func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.ListServices(ctx, req.(*ListServicesRequest))
})
out, err := h(ctx, &in)
if err != nil {
return err
}
reply := out.(*ListServicesResponse)
return ctx.Result(200, reply)
}
}
func _ControlPlane_GetService0_HTTP_Handler(srv ControlPlaneHTTPServer) func(ctx http.Context) error {
return func(ctx http.Context) error {
var in GetServiceRequest
if err := ctx.BindQuery(&in); err != nil {
return err
}
if err := ctx.BindVars(&in); err != nil {
return err
}
http.SetOperation(ctx, OperationControlPlaneGetService)
h := ctx.Middleware(func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.GetService(ctx, req.(*GetServiceRequest))
})
out, err := h(ctx, &in)
if err != nil {
return err
}
reply := out.(*Service)
return ctx.Result(200, reply)
}
}
func _ControlPlane_KickService0_HTTP_Handler(srv ControlPlaneHTTPServer) func(ctx http.Context) error {
return func(ctx http.Context) error {
var in KickServiceRequest
if err := ctx.BindQuery(&in); err != nil {
return err
}
if err := ctx.BindVars(&in); err != nil {
return err
}
http.SetOperation(ctx, OperationControlPlaneKickService)
h := ctx.Middleware(func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.KickService(ctx, req.(*KickServiceRequest))
})
out, err := h(ctx, &in)
if err != nil {
return err
}
reply := out.(*KickServiceResponse)
return ctx.Result(200, reply)
}
}
func _ControlPlane_ListServiceRPCs0_HTTP_Handler(srv ControlPlaneHTTPServer) func(ctx http.Context) error {
return func(ctx http.Context) error {
var in ListServiceRPCsRequest
if err := ctx.BindQuery(&in); err != nil {
return err
}
http.SetOperation(ctx, OperationControlPlaneListServiceRPCs)
h := ctx.Middleware(func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.ListServiceRPCs(ctx, req.(*ListServiceRPCsRequest))
})
out, err := h(ctx, &in)
if err != nil {
return err
}
reply := out.(*ListServiceRPCsResponse)
return ctx.Result(200, reply)
}
}
func _ControlPlane_ListServiceTopics0_HTTP_Handler(srv ControlPlaneHTTPServer) func(ctx http.Context) error {
return func(ctx http.Context) error {
var in ListServiceTopicsRequest
if err := ctx.BindQuery(&in); err != nil {
return err
}
http.SetOperation(ctx, OperationControlPlaneListServiceTopics)
h := ctx.Middleware(func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.ListServiceTopics(ctx, req.(*ListServiceTopicsRequest))
})
out, err := h(ctx, &in)
if err != nil {
return err
}
reply := out.(*ListServiceTopicsResponse)
return ctx.Result(200, reply)
}
}
type ControlPlaneHTTPClient interface {
GetEdge(ctx context.Context, req *GetEdgeRequest, opts ...http.CallOption) (rsp *Edge, err error)
GetService(ctx context.Context, req *GetServiceRequest, opts ...http.CallOption) (rsp *Service, err error)
KickEdge(ctx context.Context, req *KickEdgeRequest, opts ...http.CallOption) (rsp *KickEdgeResponse, err error)
KickService(ctx context.Context, req *KickServiceRequest, opts ...http.CallOption) (rsp *KickServiceResponse, err error)
ListEdgeRPCs(ctx context.Context, req *ListEdgeRPCsRequest, opts ...http.CallOption) (rsp *ListEdgeRPCsResponse, err error)
ListEdges(ctx context.Context, req *ListEdgesRequest, opts ...http.CallOption) (rsp *ListEdgesResponse, err error)
ListServiceRPCs(ctx context.Context, req *ListServiceRPCsRequest, opts ...http.CallOption) (rsp *ListServiceRPCsResponse, err error)
ListServiceTopics(ctx context.Context, req *ListServiceTopicsRequest, opts ...http.CallOption) (rsp *ListServiceTopicsResponse, err error)
ListServices(ctx context.Context, req *ListServicesRequest, opts ...http.CallOption) (rsp *ListServicesResponse, err error)
}
type ControlPlaneHTTPClientImpl struct {
@@ -61,9 +259,74 @@ func NewControlPlaneHTTPClient(client *http.Client) ControlPlaneHTTPClient {
return &ControlPlaneHTTPClientImpl{client}
}
func (c *ControlPlaneHTTPClientImpl) GetEdge(ctx context.Context, in *GetEdgeRequest, opts ...http.CallOption) (*Edge, error) {
var out Edge
pattern := "/v1/edges/{edge_id}"
path := binding.EncodeURL(pattern, in, true)
opts = append(opts, http.Operation(OperationControlPlaneGetEdge))
opts = append(opts, http.PathTemplate(pattern))
err := c.cc.Invoke(ctx, "GET", path, nil, &out, opts...)
if err != nil {
return nil, err
}
return &out, nil
}
func (c *ControlPlaneHTTPClientImpl) GetService(ctx context.Context, in *GetServiceRequest, opts ...http.CallOption) (*Service, error) {
var out Service
pattern := "/v1/services/{service_id}"
path := binding.EncodeURL(pattern, in, true)
opts = append(opts, http.Operation(OperationControlPlaneGetService))
opts = append(opts, http.PathTemplate(pattern))
err := c.cc.Invoke(ctx, "GET", path, nil, &out, opts...)
if err != nil {
return nil, err
}
return &out, nil
}
func (c *ControlPlaneHTTPClientImpl) KickEdge(ctx context.Context, in *KickEdgeRequest, opts ...http.CallOption) (*KickEdgeResponse, error) {
var out KickEdgeResponse
pattern := "/v1/edges/{edge_id}"
path := binding.EncodeURL(pattern, in, true)
opts = append(opts, http.Operation(OperationControlPlaneKickEdge))
opts = append(opts, http.PathTemplate(pattern))
err := c.cc.Invoke(ctx, "DELETE", path, nil, &out, opts...)
if err != nil {
return nil, err
}
return &out, nil
}
func (c *ControlPlaneHTTPClientImpl) KickService(ctx context.Context, in *KickServiceRequest, opts ...http.CallOption) (*KickServiceResponse, error) {
var out KickServiceResponse
pattern := "/v1/services/{service_id}"
path := binding.EncodeURL(pattern, in, true)
opts = append(opts, http.Operation(OperationControlPlaneKickService))
opts = append(opts, http.PathTemplate(pattern))
err := c.cc.Invoke(ctx, "DELETE", path, nil, &out, opts...)
if err != nil {
return nil, err
}
return &out, nil
}
func (c *ControlPlaneHTTPClientImpl) ListEdgeRPCs(ctx context.Context, in *ListEdgeRPCsRequest, opts ...http.CallOption) (*ListEdgeRPCsResponse, error) {
var out ListEdgeRPCsResponse
pattern := "/v1/edges/rpcs"
path := binding.EncodeURL(pattern, in, true)
opts = append(opts, http.Operation(OperationControlPlaneListEdgeRPCs))
opts = append(opts, http.PathTemplate(pattern))
err := c.cc.Invoke(ctx, "GET", path, nil, &out, opts...)
if err != nil {
return nil, err
}
return &out, nil
}
func (c *ControlPlaneHTTPClientImpl) ListEdges(ctx context.Context, in *ListEdgesRequest, opts ...http.CallOption) (*ListEdgesResponse, error) {
var out ListEdgesResponse
pattern := "/v1"
pattern := "/v1/edges"
path := binding.EncodeURL(pattern, in, true)
opts = append(opts, http.Operation(OperationControlPlaneListEdges))
opts = append(opts, http.PathTemplate(pattern))
@@ -73,3 +336,42 @@ func (c *ControlPlaneHTTPClientImpl) ListEdges(ctx context.Context, in *ListEdge
}
return &out, nil
}
func (c *ControlPlaneHTTPClientImpl) ListServiceRPCs(ctx context.Context, in *ListServiceRPCsRequest, opts ...http.CallOption) (*ListServiceRPCsResponse, error) {
var out ListServiceRPCsResponse
pattern := "/v1/services/rpcs"
path := binding.EncodeURL(pattern, in, true)
opts = append(opts, http.Operation(OperationControlPlaneListServiceRPCs))
opts = append(opts, http.PathTemplate(pattern))
err := c.cc.Invoke(ctx, "GET", path, nil, &out, opts...)
if err != nil {
return nil, err
}
return &out, nil
}
func (c *ControlPlaneHTTPClientImpl) ListServiceTopics(ctx context.Context, in *ListServiceTopicsRequest, opts ...http.CallOption) (*ListServiceTopicsResponse, error) {
var out ListServiceTopicsResponse
pattern := "/v1/services/topics"
path := binding.EncodeURL(pattern, in, true)
opts = append(opts, http.Operation(OperationControlPlaneListServiceTopics))
opts = append(opts, http.PathTemplate(pattern))
err := c.cc.Invoke(ctx, "GET", path, nil, &out, opts...)
if err != nil {
return nil, err
}
return &out, nil
}
func (c *ControlPlaneHTTPClientImpl) ListServices(ctx context.Context, in *ListServicesRequest, opts ...http.CallOption) (*ListServicesResponse, error) {
var out ListServicesResponse
pattern := "/v1/services"
path := binding.EncodeURL(pattern, in, true)
opts = append(opts, http.Operation(OperationControlPlaneListServices))
opts = append(opts, http.PathTemplate(pattern))
err := c.cc.Invoke(ctx, "GET", path, nil, &out, opts...)
if err != nil {
return nil, err
}
return &out, nil
}

3
go.mod
View File

@@ -6,7 +6,6 @@ replace github.com/singchia/geminio => ../../moresec/singchia/geminio
require (
github.com/go-kratos/kratos/v2 v2.7.2
github.com/gorilla/mux v1.8.1
github.com/jumboframes/armorigo v0.4.0-rc.1
github.com/singchia/geminio v1.1.5-rc.1
github.com/singchia/go-timer/v2 v2.2.1
@@ -27,12 +26,14 @@ require (
github.com/go-playground/form/v4 v4.2.0 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/gorilla/mux v1.8.1 // indirect
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.5 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/mattn/go-sqlite3 v1.14.17 // indirect
github.com/singchia/yafsm v1.0.1 // indirect
golang.org/x/net v0.20.0 // indirect
golang.org/x/sync v0.6.0 // indirect
golang.org/x/sys v0.16.0 // indirect
golang.org/x/text v0.14.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240228224816-df926f6c8641 // indirect

6
go.sum
View File

@@ -1,7 +1,11 @@
github.com/census-instrumentation/opencensus-proto v0.4.1 h1:iKLQ0xPNFxR/2hzXZMrBo8f1j86j5WHzznCCQxV/b8g=
github.com/cncf/xds/go v0.0.0-20231128003011-0fa0005c9caa h1:jQCWAUqqlij9Pgj2i/PB79y4KOPYVyFYdROxgaCwdTQ=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/envoyproxy/go-control-plane v0.12.0 h1:4X+VP1GHd1Mhj6IB5mMeGbLCleqxjletLK6K0rbxyZI=
github.com/envoyproxy/protoc-gen-validate v1.0.4 h1:gVPz/FMfvh57HdSJQyvBtF00j8JU4zdyUgIUNhlgg0A=
github.com/go-kratos/aegis v0.2.0 h1:dObzCDWn3XVjUkgxyBp6ZeWtx/do0DPZ7LY3yNSJLUQ=
github.com/go-kratos/aegis v0.2.0/go.mod h1:v0R2m73WgEEYB3XYu6aE2WcMwsZkJ/Rzuf5eVccm7bI=
github.com/go-kratos/kratos/v2 v2.7.2 h1:WVPGFNLKpv+0odMnCPxM4ZHa2hy9I5FOnwpG3Vv4w5c=
@@ -59,6 +63,8 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn
golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.20.0 h1:aCL9BSgETF1k+blQaYUBx9hJ9LOGP3gAVemcZlf1Kpo=
golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY=
golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ=
golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=

View File

@@ -34,7 +34,7 @@ type Edgebound interface {
GetEdgeByID(edgeID uint64) geminio.End
DelEdgeByID(edgeID uint64) error
Serve()
Serve() error
Close() error
}
@@ -47,7 +47,7 @@ type Servicebound interface {
GetServiceByTopic(topic string) (geminio.End, error)
DelSerivces(service string) error
Serve()
Serve() error
Close() error
}

View File

@@ -0,0 +1,21 @@
package server
import (
"net"
"github.com/go-kratos/kratos/v2/middleware/recovery"
"github.com/go-kratos/kratos/v2/transport/http"
v1 "github.com/singchia/frontier/api/controlplane/v1"
)
func NewHTTPServer(ln net.Listener, svc v1.ControlPlaneServer) *http.Server {
// new server
opts := []http.ServerOption{
http.Middleware(recovery.Recovery()),
http.Listener(ln),
}
srv := http.NewServer(opts...)
v1.RegisterControlPlaneHTTPServer(srv, svc)
return srv
}

View File

@@ -234,7 +234,7 @@ func (em *edgeManager) bypassDial(_ net.Addr, _ interface{}) (net.Conn, error) {
}
// Serve blocks until the Accept error
func (em *edgeManager) Serve() {
func (em *edgeManager) Serve() error {
bypass := &em.conf.Edgebound.Bypass
if bypass.Enable {
go em.cm.Serve()
@@ -246,11 +246,13 @@ func (em *edgeManager) Serve() {
if err != nil {
if !strings.Contains(err.Error(), apis.ErrStrUseOfClosedConnection) {
klog.V(1).Infof("edge manager listener accept err: %s", err)
return err
}
return
break
}
go em.handleConn(conn)
}
return nil
}
func (em *edgeManager) handleConn(conn net.Conn) error {
@@ -308,7 +310,15 @@ func (em *edgeManager) ListStreams(edgeID uint64) []geminio.Stream {
}
func (em *edgeManager) DelEdgeByID(edgeID uint64) error {
panic("TODO")
// TODO test it
em.mtx.RLock()
defer em.mtx.RUnlock()
edge, ok := em.edges[edgeID]
if !ok {
return apis.ErrEdgeNotOnline
}
return edge.Close()
}
// Close all edges and manager

View File

@@ -1,107 +0,0 @@
package http
import (
"crypto/tls"
"crypto/x509"
"net"
"net/http"
"os"
"strings"
"github.com/gorilla/mux"
"github.com/singchia/frontier/pkg/apis"
"github.com/singchia/frontier/pkg/config"
"github.com/singchia/frontier/pkg/security"
"k8s.io/klog/v2"
)
type Rest struct {
conf *config.Configuration
router *mux.Router
// listener for http
ln net.Listener
}
func NewRest(conf *config.Configuration) (*Rest, error) {
listen := &conf.Http.Listen
var (
ln net.Listener
network string = listen.Network
addr string = listen.Addr
err error
)
rest := &Rest{
conf: conf,
}
if !listen.TLS.Enable {
if ln, err = net.Listen(network, addr); err != nil {
klog.Errorf("rest net listen err: %s, network: %s, addr: %s", err, network, addr)
return nil, err
}
} else {
certs := []tls.Certificate{}
for _, certFile := range listen.TLS.Certs {
cert, err := tls.LoadX509KeyPair(certFile.Cert, certFile.Key)
if err != nil {
klog.Errorf("service manager tls load x509 cert err: %s, cert: %s, key: %s", err, certFile.Cert, certFile.Key)
continue
}
certs = append(certs, cert)
}
if !listen.TLS.MTLS {
// tls
if ln, err = tls.Listen(network, addr, &tls.Config{
MinVersion: tls.VersionTLS12,
CipherSuites: security.CiperSuites,
Certificates: certs,
}); err != nil {
klog.Errorf("service manager tls listen err: %s, network: %s, addr: %s", err, network, addr)
return nil, err
}
} else {
// mtls, require for edge cert
// load all ca certs to pool
caPool := x509.NewCertPool()
for _, caFile := range listen.TLS.CACerts {
ca, err := os.ReadFile(caFile)
if err != nil {
klog.Errorf("service manager read ca cert err: %s, file: %s", err, caFile)
return nil, err
}
if !caPool.AppendCertsFromPEM(ca) {
klog.Warningf("service manager append ca cert to ca pool err: %s, file: %s", err, caFile)
continue
}
}
if ln, err = tls.Listen(network, addr, &tls.Config{
MinVersion: tls.VersionTLS12,
CipherSuites: security.CiperSuites,
ClientCAs: caPool,
ClientAuth: tls.RequireAndVerifyClientCert,
Certificates: certs,
}); err != nil {
klog.Errorf("service manager tls listen err: %s, network: %s, addr: %s", err, network, addr)
return nil, err
}
}
}
rest.ln = ln
// router
return rest, nil
}
func (rest *Rest) Serve() {
err := http.Serve(rest.ln, rest.router)
if err != nil {
if !strings.Contains(err.Error(), apis.ErrStrUseOfClosedConnection) {
klog.V(1).Infof("rest listener serve err: %s", err)
}
}
}

View File

@@ -253,7 +253,7 @@ func buildServiceRPCQuery(tx *gorm.DB, query *ServiceRPCQuery) *gorm.DB {
type ServiceTopicQuery struct {
Query
// Condition fields
Topic string
Service string
ServiceID uint64
}
@@ -333,8 +333,8 @@ func (dao *Dao) CreateServiceTopic(topic *model.ServiceTopic) error {
func buildServiceTopicQuery(tx *gorm.DB, query *ServiceTopicQuery) *gorm.DB {
// join and search
if query.Topic != "" {
tx = tx.InnerJoins("INNER JOIN services ON services.service_id = service_topics.service_id AND service LIKE ?", "%"+query.Topic+"%")
if query.Service != "" {
tx = tx.InnerJoins("INNER JOIN services ON services.service_id = service_topics.service_id AND service LIKE ?", "%"+query.Service+"%")
}
// time range
if query.StartTime != 0 && query.EndTime != 0 && query.EndTime > query.StartTime {

View File

@@ -150,17 +150,19 @@ func newServiceManager(conf *config.Configuration, dao *dao.Dao, informer apis.S
return sm, nil
}
func (sm *serviceManager) Serve() {
func (sm *serviceManager) Serve() error {
for {
conn, err := sm.ln.Accept()
if err != nil {
if !strings.Contains(err.Error(), apis.ErrStrUseOfClosedConnection) {
klog.V(1).Infof("service manager listener accept err: %s", err)
return err
}
return
break
}
go sm.handleConn(conn)
}
return nil
}
func (sm *serviceManager) handleConn(conn net.Conn) error {
@@ -303,6 +305,18 @@ func (sm *serviceManager) DelSerivces(service string) error {
panic("TODO")
}
func (sm *serviceManager) DelServiceByID(serviceID uint64) error {
// TODO test it
sm.mtx.RLock()
defer sm.mtx.RUnlock()
service, ok := sm.services[serviceID]
if !ok {
return apis.ErrEdgeNotOnline
}
return service.Close()
}
func (sm *serviceManager) ListStreams(serviceID uint64) []geminio.Stream {
all := sm.streams.MGetAll(serviceID)
return utils.Slice2streams(all)