operator: 1 frontier 1 frontlas deployment finished

This commit is contained in:
singchia
2024-05-15 18:52:07 +08:00
parent b30ddb7e33
commit 65642e0ae7
36 changed files with 905 additions and 83 deletions

View File

@@ -1,6 +1,6 @@
include ./Makefile.defs
REGISTRY?=registry.hub.docker.com/singchia
REGISTRY?=singchia
CC?=cc
all: frontier frontlas
@@ -47,8 +47,8 @@ install-frontier:
install-frontlas:
install -m 0755 -d $(DESTDIR)$(BINDIR)
install -m 0755 -d $(DESTDIR)$(CONFDIR)
install -m 0755 ./bin/frontier $(DESTDIR)$(BINDIR)
install -m 0755 ./etc/frontier.yaml $(DESTDIR)$(CONFDIR)
install -m 0755 ./bin/frontlas $(DESTDIR)$(BINDIR)
install -m 0755 ./etc/frontlas.yaml $(DESTDIR)$(CONFDIR)
# image
.PHONY: image-frontier
@@ -91,16 +91,16 @@ container-frontier:
.PHONY: container-frontlas
container-frontlas:
docker rm -f frontlas
docker run -d --name frontlas -p 30021:30021 -p 30022:30022 frontlas:${VERSION} --config /usr/conf/frontlas.yaml -v 1
docker run -d --name frontlas -p 40011:40011 -p 40012:40012 ${REGISTRY}/frontlas:${VERSION} --config /usr/conf/frontlas.yaml -v 1
# api
.PHONY: api-frontier
api-frontier:
docker run --rm -v ${PWD}/api/controlplane/frontier/v1:/api/controlplane/frontier/v1 image-gen-api:${VERSION}
docker run --rm -v ${PWD}/api/controlplane/frontier/v1:/api/controlplane/v1 image-gen-api:${VERSION}
.PHONY: api-frontlas
api-frontlas:
docker run --rm -v ${PWD}/api/controlplane/frontlas/v1:/api/controlplane/frontlas/v1 image-gen-api:${VERSION}
docker run --rm -v ${PWD}/api/controlplane/frontlas/v1:/api/controlplane/v1 image-gen-api:${VERSION}
# bench
.PHONY: bench

View File

@@ -1,6 +1,6 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.33.0
// protoc-gen-go v1.34.1
// protoc v3.21.9
// source: controlplane.proto
@@ -1451,11 +1451,12 @@ var file_controlplane_proto_rawDesc = []byte{
0x6c, 0x70, 0x6c, 0x61, 0x6e, 0x65, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x53, 0x65, 0x72, 0x76, 0x69,
0x63, 0x65, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65,
0x22, 0x1b, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x15, 0x12, 0x13, 0x2f, 0x76, 0x31, 0x2f, 0x73, 0x65,
0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x2f, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x73, 0x42, 0x35, 0x5a,
0x33, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x73, 0x69, 0x6e, 0x67,
0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x2f, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x73, 0x42, 0x3e, 0x5a,
0x3c, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x73, 0x69, 0x6e, 0x67,
0x63, 0x68, 0x69, 0x61, 0x2f, 0x66, 0x72, 0x6f, 0x6e, 0x74, 0x69, 0x65, 0x72, 0x2f, 0x61, 0x70,
0x69, 0x2f, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x70, 0x6c, 0x61, 0x6e, 0x65, 0x2f, 0x76,
0x31, 0x3b, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
0x69, 0x2f, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x70, 0x6c, 0x61, 0x6e, 0x65, 0x2f, 0x66,
0x72, 0x6f, 0x6e, 0x74, 0x69, 0x65, 0x72, 0x2f, 0x76, 0x31, 0x3b, 0x76, 0x31, 0x62, 0x06, 0x70,
0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (

View File

@@ -1,6 +1,6 @@
// Code generated by protoc-gen-go-http. DO NOT EDIT.
// versions:
// - protoc-gen-go-http v2.7.2
// - protoc-gen-go-http v2.7.3
// - protoc v3.21.9
// source: controlplane.proto

View File

@@ -1,7 +1,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.30.0
// protoc v3.14.0
// protoc-gen-go v1.34.1
// protoc v3.21.9
// source: cluster.proto
package v1

View File

@@ -1,7 +1,7 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.3.0
// - protoc v3.14.0
// - protoc v3.21.9
// source: cluster.proto
package v1

View File

@@ -1,7 +1,7 @@
// Code generated by protoc-gen-go-http. DO NOT EDIT.
// versions:
// - protoc-gen-go-http v2.6.1
// - protoc v3.14.0
// - protoc-gen-go-http v2.7.3
// - protoc v3.21.9
// source: cluster.proto
package v1
@@ -232,7 +232,7 @@ func (c *ClusterServiceHTTPClientImpl) GetEdgeByID(ctx context.Context, in *GetE
if err != nil {
return nil, err
}
return &out, err
return &out, nil
}
func (c *ClusterServiceHTTPClientImpl) GetEdgesCount(ctx context.Context, in *GetEdgesCountRequest, opts ...http.CallOption) (*GetEdgesCountResponse, error) {
@@ -245,7 +245,7 @@ func (c *ClusterServiceHTTPClientImpl) GetEdgesCount(ctx context.Context, in *Ge
if err != nil {
return nil, err
}
return &out, err
return &out, nil
}
func (c *ClusterServiceHTTPClientImpl) GetFrontierByEdge(ctx context.Context, in *GetFrontierByEdgeIDRequest, opts ...http.CallOption) (*GetFrontierByEdgeIDResponse, error) {
@@ -258,7 +258,7 @@ func (c *ClusterServiceHTTPClientImpl) GetFrontierByEdge(ctx context.Context, in
if err != nil {
return nil, err
}
return &out, err
return &out, nil
}
func (c *ClusterServiceHTTPClientImpl) GetServiceByID(ctx context.Context, in *GetServiceByIDRequest, opts ...http.CallOption) (*GetServiceByIDResponse, error) {
@@ -271,7 +271,7 @@ func (c *ClusterServiceHTTPClientImpl) GetServiceByID(ctx context.Context, in *G
if err != nil {
return nil, err
}
return &out, err
return &out, nil
}
func (c *ClusterServiceHTTPClientImpl) GetServicesCount(ctx context.Context, in *GetServicesCountRequest, opts ...http.CallOption) (*GetServicesCountResponse, error) {
@@ -284,7 +284,7 @@ func (c *ClusterServiceHTTPClientImpl) GetServicesCount(ctx context.Context, in
if err != nil {
return nil, err
}
return &out, err
return &out, nil
}
func (c *ClusterServiceHTTPClientImpl) ListEdges(ctx context.Context, in *ListEdgesRequest, opts ...http.CallOption) (*ListEdgesResponse, error) {
@@ -297,7 +297,7 @@ func (c *ClusterServiceHTTPClientImpl) ListEdges(ctx context.Context, in *ListEd
if err != nil {
return nil, err
}
return &out, err
return &out, nil
}
func (c *ClusterServiceHTTPClientImpl) ListFrontiers(ctx context.Context, in *ListFrontiersRequest, opts ...http.CallOption) (*ListFrontiersResponse, error) {
@@ -310,7 +310,7 @@ func (c *ClusterServiceHTTPClientImpl) ListFrontiers(ctx context.Context, in *Li
if err != nil {
return nil, err
}
return &out, err
return &out, nil
}
func (c *ClusterServiceHTTPClientImpl) ListServices(ctx context.Context, in *ListServicesRequest, opts ...http.CallOption) (*ListServicesResponse, error) {
@@ -323,5 +323,5 @@ func (c *ClusterServiceHTTPClientImpl) ListServices(ctx context.Context, in *Lis
if err != nil {
return nil, err
}
return &out, err
return &out, nil
}

View File

@@ -0,0 +1,292 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.34.1
// protoc v3.21.9
// source: health.proto
package v1
import (
_ "google.golang.org/genproto/googleapis/api/annotations"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
)
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
type HealthCheckResponse_ServingStatus int32
const (
HealthCheckResponse_UNKNOWN HealthCheckResponse_ServingStatus = 0
HealthCheckResponse_SERVING HealthCheckResponse_ServingStatus = 1
HealthCheckResponse_NOT_SERVING HealthCheckResponse_ServingStatus = 2
HealthCheckResponse_SERVICE_UNKNOWN HealthCheckResponse_ServingStatus = 3 // Used only by the Watch method.
)
// Enum value maps for HealthCheckResponse_ServingStatus.
var (
HealthCheckResponse_ServingStatus_name = map[int32]string{
0: "UNKNOWN",
1: "SERVING",
2: "NOT_SERVING",
3: "SERVICE_UNKNOWN",
}
HealthCheckResponse_ServingStatus_value = map[string]int32{
"UNKNOWN": 0,
"SERVING": 1,
"NOT_SERVING": 2,
"SERVICE_UNKNOWN": 3,
}
)
func (x HealthCheckResponse_ServingStatus) Enum() *HealthCheckResponse_ServingStatus {
p := new(HealthCheckResponse_ServingStatus)
*p = x
return p
}
func (x HealthCheckResponse_ServingStatus) String() string {
return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x))
}
func (HealthCheckResponse_ServingStatus) Descriptor() protoreflect.EnumDescriptor {
return file_health_proto_enumTypes[0].Descriptor()
}
func (HealthCheckResponse_ServingStatus) Type() protoreflect.EnumType {
return &file_health_proto_enumTypes[0]
}
func (x HealthCheckResponse_ServingStatus) Number() protoreflect.EnumNumber {
return protoreflect.EnumNumber(x)
}
// Deprecated: Use HealthCheckResponse_ServingStatus.Descriptor instead.
func (HealthCheckResponse_ServingStatus) EnumDescriptor() ([]byte, []int) {
return file_health_proto_rawDescGZIP(), []int{1, 0}
}
type HealthCheckRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Service string `protobuf:"bytes,1,opt,name=service,proto3" json:"service,omitempty"`
}
func (x *HealthCheckRequest) Reset() {
*x = HealthCheckRequest{}
if protoimpl.UnsafeEnabled {
mi := &file_health_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *HealthCheckRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*HealthCheckRequest) ProtoMessage() {}
func (x *HealthCheckRequest) ProtoReflect() protoreflect.Message {
mi := &file_health_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use HealthCheckRequest.ProtoReflect.Descriptor instead.
func (*HealthCheckRequest) Descriptor() ([]byte, []int) {
return file_health_proto_rawDescGZIP(), []int{0}
}
func (x *HealthCheckRequest) GetService() string {
if x != nil {
return x.Service
}
return ""
}
type HealthCheckResponse struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Status HealthCheckResponse_ServingStatus `protobuf:"varint,1,opt,name=status,proto3,enum=controlplane.HealthCheckResponse_ServingStatus" json:"status,omitempty"`
}
func (x *HealthCheckResponse) Reset() {
*x = HealthCheckResponse{}
if protoimpl.UnsafeEnabled {
mi := &file_health_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *HealthCheckResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*HealthCheckResponse) ProtoMessage() {}
func (x *HealthCheckResponse) ProtoReflect() protoreflect.Message {
mi := &file_health_proto_msgTypes[1]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use HealthCheckResponse.ProtoReflect.Descriptor instead.
func (*HealthCheckResponse) Descriptor() ([]byte, []int) {
return file_health_proto_rawDescGZIP(), []int{1}
}
func (x *HealthCheckResponse) GetStatus() HealthCheckResponse_ServingStatus {
if x != nil {
return x.Status
}
return HealthCheckResponse_UNKNOWN
}
var File_health_proto protoreflect.FileDescriptor
var file_health_proto_rawDesc = []byte{
0x0a, 0x0c, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0c,
0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x70, 0x6c, 0x61, 0x6e, 0x65, 0x1a, 0x1c, 0x67, 0x6f,
0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x61, 0x6e, 0x6e, 0x6f, 0x74, 0x61, 0x74,
0x69, 0x6f, 0x6e, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x2e, 0x0a, 0x12, 0x48, 0x65,
0x61, 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74,
0x12, 0x18, 0x0a, 0x07, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28,
0x09, 0x52, 0x07, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x22, 0xaf, 0x01, 0x0a, 0x13, 0x48,
0x65, 0x61, 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e,
0x73, 0x65, 0x12, 0x47, 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x01, 0x20, 0x01,
0x28, 0x0e, 0x32, 0x2f, 0x2e, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x70, 0x6c, 0x61, 0x6e,
0x65, 0x2e, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x52, 0x65, 0x73,
0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, 0x53, 0x65, 0x72, 0x76, 0x69, 0x6e, 0x67, 0x53, 0x74, 0x61,
0x74, 0x75, 0x73, 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x22, 0x4f, 0x0a, 0x0d, 0x53,
0x65, 0x72, 0x76, 0x69, 0x6e, 0x67, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x0b, 0x0a, 0x07,
0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x0b, 0x0a, 0x07, 0x53, 0x45, 0x52,
0x56, 0x49, 0x4e, 0x47, 0x10, 0x01, 0x12, 0x0f, 0x0a, 0x0b, 0x4e, 0x4f, 0x54, 0x5f, 0x53, 0x45,
0x52, 0x56, 0x49, 0x4e, 0x47, 0x10, 0x02, 0x12, 0x13, 0x0a, 0x0f, 0x53, 0x45, 0x52, 0x56, 0x49,
0x43, 0x45, 0x5f, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x03, 0x32, 0xc2, 0x01, 0x0a,
0x06, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x12, 0x68, 0x0a, 0x05, 0x43, 0x68, 0x65, 0x63, 0x6b,
0x12, 0x20, 0x2e, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x70, 0x6c, 0x61, 0x6e, 0x65, 0x2e,
0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65,
0x73, 0x74, 0x1a, 0x21, 0x2e, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x70, 0x6c, 0x61, 0x6e,
0x65, 0x2e, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x52, 0x65, 0x73,
0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x1a, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x14, 0x12, 0x12, 0x2f,
0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x2f, 0x76, 0x31, 0x2f, 0x68, 0x65, 0x61, 0x6c, 0x74,
0x68, 0x12, 0x4e, 0x0a, 0x05, 0x57, 0x61, 0x74, 0x63, 0x68, 0x12, 0x20, 0x2e, 0x63, 0x6f, 0x6e,
0x74, 0x72, 0x6f, 0x6c, 0x70, 0x6c, 0x61, 0x6e, 0x65, 0x2e, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68,
0x43, 0x68, 0x65, 0x63, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x21, 0x2e, 0x63,
0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x70, 0x6c, 0x61, 0x6e, 0x65, 0x2e, 0x48, 0x65, 0x61, 0x6c,
0x74, 0x68, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x30,
0x01, 0x42, 0x3e, 0x5a, 0x3c, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f,
0x73, 0x69, 0x6e, 0x67, 0x63, 0x68, 0x69, 0x61, 0x2f, 0x66, 0x72, 0x6f, 0x6e, 0x74, 0x69, 0x65,
0x72, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x70, 0x6c, 0x61,
0x6e, 0x65, 0x2f, 0x66, 0x72, 0x6f, 0x6e, 0x74, 0x6c, 0x61, 0x73, 0x2f, 0x76, 0x31, 0x3b, 0x76,
0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
file_health_proto_rawDescOnce sync.Once
file_health_proto_rawDescData = file_health_proto_rawDesc
)
func file_health_proto_rawDescGZIP() []byte {
file_health_proto_rawDescOnce.Do(func() {
file_health_proto_rawDescData = protoimpl.X.CompressGZIP(file_health_proto_rawDescData)
})
return file_health_proto_rawDescData
}
var file_health_proto_enumTypes = make([]protoimpl.EnumInfo, 1)
var file_health_proto_msgTypes = make([]protoimpl.MessageInfo, 2)
var file_health_proto_goTypes = []interface{}{
(HealthCheckResponse_ServingStatus)(0), // 0: controlplane.HealthCheckResponse.ServingStatus
(*HealthCheckRequest)(nil), // 1: controlplane.HealthCheckRequest
(*HealthCheckResponse)(nil), // 2: controlplane.HealthCheckResponse
}
var file_health_proto_depIdxs = []int32{
0, // 0: controlplane.HealthCheckResponse.status:type_name -> controlplane.HealthCheckResponse.ServingStatus
1, // 1: controlplane.Health.Check:input_type -> controlplane.HealthCheckRequest
1, // 2: controlplane.Health.Watch:input_type -> controlplane.HealthCheckRequest
2, // 3: controlplane.Health.Check:output_type -> controlplane.HealthCheckResponse
2, // 4: controlplane.Health.Watch:output_type -> controlplane.HealthCheckResponse
3, // [3:5] is the sub-list for method output_type
1, // [1:3] is the sub-list for method input_type
1, // [1:1] is the sub-list for extension type_name
1, // [1:1] is the sub-list for extension extendee
0, // [0:1] is the sub-list for field type_name
}
func init() { file_health_proto_init() }
func file_health_proto_init() {
if File_health_proto != nil {
return
}
if !protoimpl.UnsafeEnabled {
file_health_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*HealthCheckRequest); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_health_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*HealthCheckResponse); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_health_proto_rawDesc,
NumEnums: 1,
NumMessages: 2,
NumExtensions: 0,
NumServices: 1,
},
GoTypes: file_health_proto_goTypes,
DependencyIndexes: file_health_proto_depIdxs,
EnumInfos: file_health_proto_enumTypes,
MessageInfos: file_health_proto_msgTypes,
}.Build()
File_health_proto = out.File
file_health_proto_rawDesc = nil
file_health_proto_goTypes = nil
file_health_proto_depIdxs = nil
}

View File

@@ -0,0 +1,31 @@
syntax = "proto3";
package controlplane;
option go_package = "github.com/singchia/frontier/api/controlplane/frontlas/v1;v1";
import "google/api/annotations.proto";
message HealthCheckRequest {
string service = 1;
}
message HealthCheckResponse {
enum ServingStatus {
UNKNOWN = 0;
SERVING = 1;
NOT_SERVING = 2;
SERVICE_UNKNOWN = 3; // Used only by the Watch method.
}
ServingStatus status = 1;
}
service Health {
rpc Check(HealthCheckRequest) returns (HealthCheckResponse) {
option(google.api.http) = {
get: "/cluster/v1/health"
};
};
rpc Watch(HealthCheckRequest) returns (stream HealthCheckResponse);
}

View File

@@ -0,0 +1,174 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.3.0
// - protoc v3.21.9
// source: health.proto
package v1
import (
context "context"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
)
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
// Requires gRPC-Go v1.32.0 or later.
const _ = grpc.SupportPackageIsVersion7
const (
Health_Check_FullMethodName = "/controlplane.Health/Check"
Health_Watch_FullMethodName = "/controlplane.Health/Watch"
)
// HealthClient is the client API for Health service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type HealthClient interface {
Check(ctx context.Context, in *HealthCheckRequest, opts ...grpc.CallOption) (*HealthCheckResponse, error)
Watch(ctx context.Context, in *HealthCheckRequest, opts ...grpc.CallOption) (Health_WatchClient, error)
}
type healthClient struct {
cc grpc.ClientConnInterface
}
func NewHealthClient(cc grpc.ClientConnInterface) HealthClient {
return &healthClient{cc}
}
func (c *healthClient) Check(ctx context.Context, in *HealthCheckRequest, opts ...grpc.CallOption) (*HealthCheckResponse, error) {
out := new(HealthCheckResponse)
err := c.cc.Invoke(ctx, Health_Check_FullMethodName, in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *healthClient) Watch(ctx context.Context, in *HealthCheckRequest, opts ...grpc.CallOption) (Health_WatchClient, error) {
stream, err := c.cc.NewStream(ctx, &Health_ServiceDesc.Streams[0], Health_Watch_FullMethodName, opts...)
if err != nil {
return nil, err
}
x := &healthWatchClient{stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}
type Health_WatchClient interface {
Recv() (*HealthCheckResponse, error)
grpc.ClientStream
}
type healthWatchClient struct {
grpc.ClientStream
}
func (x *healthWatchClient) Recv() (*HealthCheckResponse, error) {
m := new(HealthCheckResponse)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// HealthServer is the server API for Health service.
// All implementations must embed UnimplementedHealthServer
// for forward compatibility
type HealthServer interface {
Check(context.Context, *HealthCheckRequest) (*HealthCheckResponse, error)
Watch(*HealthCheckRequest, Health_WatchServer) error
mustEmbedUnimplementedHealthServer()
}
// UnimplementedHealthServer must be embedded to have forward compatible implementations.
type UnimplementedHealthServer struct {
}
func (UnimplementedHealthServer) Check(context.Context, *HealthCheckRequest) (*HealthCheckResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method Check not implemented")
}
func (UnimplementedHealthServer) Watch(*HealthCheckRequest, Health_WatchServer) error {
return status.Errorf(codes.Unimplemented, "method Watch not implemented")
}
func (UnimplementedHealthServer) mustEmbedUnimplementedHealthServer() {}
// UnsafeHealthServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to HealthServer will
// result in compilation errors.
type UnsafeHealthServer interface {
mustEmbedUnimplementedHealthServer()
}
func RegisterHealthServer(s grpc.ServiceRegistrar, srv HealthServer) {
s.RegisterService(&Health_ServiceDesc, srv)
}
func _Health_Check_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(HealthCheckRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(HealthServer).Check(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: Health_Check_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(HealthServer).Check(ctx, req.(*HealthCheckRequest))
}
return interceptor(ctx, in, info, handler)
}
func _Health_Watch_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(HealthCheckRequest)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(HealthServer).Watch(m, &healthWatchServer{stream})
}
type Health_WatchServer interface {
Send(*HealthCheckResponse) error
grpc.ServerStream
}
type healthWatchServer struct {
grpc.ServerStream
}
func (x *healthWatchServer) Send(m *HealthCheckResponse) error {
return x.ServerStream.SendMsg(m)
}
// Health_ServiceDesc is the grpc.ServiceDesc for Health service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
var Health_ServiceDesc = grpc.ServiceDesc{
ServiceName: "controlplane.Health",
HandlerType: (*HealthServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "Check",
Handler: _Health_Check_Handler,
},
},
Streams: []grpc.StreamDesc{
{
StreamName: "Watch",
Handler: _Health_Watch_Handler,
ServerStreams: true,
},
},
Metadata: "health.proto",
}

View File

@@ -0,0 +1,75 @@
// Code generated by protoc-gen-go-http. DO NOT EDIT.
// versions:
// - protoc-gen-go-http v2.7.3
// - protoc v3.21.9
// source: health.proto
package v1
import (
context "context"
http "github.com/go-kratos/kratos/v2/transport/http"
binding "github.com/go-kratos/kratos/v2/transport/http/binding"
)
// This is a compile-time assertion to ensure that this generated file
// is compatible with the kratos package it is being compiled against.
var _ = new(context.Context)
var _ = binding.EncodeURL
const _ = http.SupportPackageIsVersion1
const OperationHealthCheck = "/controlplane.Health/Check"
type HealthHTTPServer interface {
Check(context.Context, *HealthCheckRequest) (*HealthCheckResponse, error)
}
func RegisterHealthHTTPServer(s *http.Server, srv HealthHTTPServer) {
r := s.Route("/")
r.GET("/cluster/v1/health", _Health_Check0_HTTP_Handler(srv))
}
func _Health_Check0_HTTP_Handler(srv HealthHTTPServer) func(ctx http.Context) error {
return func(ctx http.Context) error {
var in HealthCheckRequest
if err := ctx.BindQuery(&in); err != nil {
return err
}
http.SetOperation(ctx, OperationHealthCheck)
h := ctx.Middleware(func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.Check(ctx, req.(*HealthCheckRequest))
})
out, err := h(ctx, &in)
if err != nil {
return err
}
reply := out.(*HealthCheckResponse)
return ctx.Result(200, reply)
}
}
type HealthHTTPClient interface {
Check(ctx context.Context, req *HealthCheckRequest, opts ...http.CallOption) (rsp *HealthCheckResponse, err error)
}
type HealthHTTPClientImpl struct {
cc *http.Client
}
func NewHealthHTTPClient(client *http.Client) HealthHTTPClient {
return &HealthHTTPClientImpl{client}
}
func (c *HealthHTTPClientImpl) Check(ctx context.Context, in *HealthCheckRequest, opts ...http.CallOption) (*HealthCheckResponse, error) {
var out HealthCheckResponse
pattern := "/cluster/v1/health"
path := binding.EncodeURL(pattern, in, true)
opts = append(opts, http.Operation(OperationHealthCheck))
opts = append(opts, http.PathTemplate(pattern))
err := c.cc.Invoke(ctx, "GET", path, nil, &out, opts...)
if err != nil {
return nil, err
}
return &out, nil
}

View File

@@ -60,7 +60,7 @@ frontlas:
enable: false
dial:
network: tcp
addr: 127.0.0.1:30022
addr: 127.0.0.1:40012
metrics:
enable: false
interval: 0

View File

@@ -9,11 +9,11 @@ daemon:
control_plane:
listen:
network: tcp
addr: 0.0.0.0:30021
addr: 0.0.0.0:40011
frontier_plane:
listen:
network: tcp
addr: 0.0.0.0:30022
addr: 0.0.0.0:40012
expiration:
service_meta: 30
edge_meta: 30
@@ -21,5 +21,5 @@ redis:
mode: standalone
standalone:
network: tcp
addr: 127.0.0.1:6379
addr: redis:6379
db: 0

View File

@@ -72,7 +72,7 @@ func main() {
}()
network := pflag.String("network", "tcp", "network to dial")
address := pflag.String("address", "127.0.0.1:30011", "address to dial")
frontlasAddress := pflag.String("frontlas_address", "127.0.0.1:30021", "frontlas address to dial, mutex with address")
frontlasAddress := pflag.String("frontlas_address", "127.0.0.1:40011", "frontlas address to dial, mutually exclusive with address")
frontlas := pflag.Bool("frontlas", false, "frontlas or frontier")
loglevel := pflag.String("loglevel", "info", "log level, trace debug info warn error")
serviceName := pflag.String("service", "foo", "service name")

View File

@@ -3,6 +3,9 @@ FROM golang:1.18-alpine
# Install curl and unzip, which are required to add protoc.
RUN apk add --no-cache curl unzip protoc protobuf-dev
ENV GO111MODULE=on \
GOPROXY=https://goproxy.io,direct
RUN go install google.golang.org/protobuf/cmd/protoc-gen-go@latest \
&& go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest \
&& go install github.com/go-kratos/kratos/cmd/kratos/v2@latest \
@@ -16,5 +19,5 @@ RUN curl --location https://github.com/googleapis/api-common-protos/archive/refs
&& tar zxvf 1.50.0.tar.gz \
&& cp -R /usr/src/protoc/api-common-protos-1.50.0/ /protos/
WORKDIR /api/controlplane/frontier/v1
WORKDIR /api/controlplane/v1
CMD protoc --proto_path=./ --proto_path=/protos/ --go_out=paths=source_relative:./ --go-http_out=paths=source_relative:./ --go-grpc_out=paths=source_relative:./ --go-errors_out=paths=source_relative:./ --experimental_allow_proto3_optional=true ./*.proto

View File

@@ -8,8 +8,8 @@ ARG TARGETARCH
ENV GO111MODULE=on \
GOPROXY=https://goproxy.io,direct
WORKDIR /go/src/github.com/singchia/frontlas
RUN --mount=type=bind,readwrite,target=/go/src/github.com/singchia/frontlas \
WORKDIR /go/src/github.com/singchia/frontier
RUN --mount=type=bind,readwrite,target=/go/src/github.com/singchia/frontier \
make DESTDIR=/tmp/install all install-frontlas
FROM alpine:3.14
@@ -21,8 +21,8 @@ RUN wget -q -O /etc/apk/keys/sgerrand.rsa.pub https://alpine-pkgs.sgerrand.com/s
RUN wget https://github.com/sgerrand/alpine-pkg-glibc/releases/download/2.34-r0/glibc-2.34-r0.apk
RUN apk add glibc-2.34-r0.apk
EXPOSE 30021
EXPOSE 30022
EXPOSE 40011
EXPOSE 40012
ENTRYPOINT ["/usr/bin/frontlas"]
CMD ["--config", "/usr/conf/frontlas.yaml"]

View File

@@ -351,6 +351,11 @@ func Parse() (*Configuration, error) {
if nodeName != "" {
config.Daemon.FrontierID = "frontier-" + nodeName
}
frontlasAddr := os.Getenv("FRONTLAS_ADDR")
if frontlasAddr != "" {
config.Frontlas.Enable = true
config.Frontlas.Dial.Addr = frontlasAddr
}
return config, nil
}
@@ -440,7 +445,7 @@ func genDefaultConfig(writer io.Writer) error {
Enable: false,
Dial: config.Dial{
Network: "tcp",
Addr: "127.0.0.1:30022",
Addr: "127.0.0.1:40012",
TLS: config.TLS{
Enable: false,
MTLS: false,

View File

@@ -31,7 +31,9 @@ func (em *edgeManager) online(end geminio.End) error {
}
}
em.edges[end.ClientID()] = end
em.informer.SetEdgeCount(len(em.edges))
if em.informer != nil {
em.informer.SetEdgeCount(len(em.edges))
}
em.mtx.Unlock()
if sync != nil {
@@ -69,7 +71,9 @@ func (em *edgeManager) offline(edgeID uint64, addr net.Addr) error {
} else {
klog.Warningf("edge offline, edgeID: %d not found in cache", edgeID)
}
em.informer.SetEdgeCount(len(em.edges))
if em.informer != nil {
em.informer.SetEdgeCount(len(em.edges))
}
em.mtx.Unlock()
defer func() {

View File

@@ -23,10 +23,16 @@ func NewServer(conf *config.Configuration, repo apis.Repo, mqm apis.MQM) (*Serve
tmr := timer.NewTimer()
// informer
inf, err := frontlas.NewInformer(conf, tmr)
if err != nil {
klog.Errorf("new informer err: %s", err)
return nil, err
var (
inf *frontlas.Informer
err error
)
if conf.Frontlas.Enable {
inf, err = frontlas.NewInformer(conf, tmr)
if err != nil {
klog.Errorf("new informer err: %s", err)
return nil, err
}
}
// exchange

View File

@@ -33,7 +33,9 @@ func (sm *serviceManager) online(end geminio.End, meta *apis.Meta) error {
}
}
sm.services[end.ClientID()] = end
sm.informer.SetServiceCount(len(sm.services))
if sm.informer != nil {
sm.informer.SetServiceCount(len(sm.services))
}
if sync != nil {
// unlikely here
@@ -71,7 +73,9 @@ func (sm *serviceManager) offline(serviceID uint64, addr net.Addr) error {
} else {
klog.Warningf("service offline, serviceID: %d not found in cache", serviceID)
}
sm.informer.SetServiceCount(len(sm.services))
if sm.informer != nil {
sm.informer.SetServiceCount(len(sm.services))
}
defer func() {
if legacy {

View File

@@ -1,6 +1,8 @@
package cluster
import (
"sync/atomic"
"github.com/go-kratos/kratos/v2"
"github.com/singchia/frontier/pkg/frontlas/cluster/server"
"github.com/singchia/frontier/pkg/frontlas/cluster/service"
@@ -12,8 +14,9 @@ import (
)
type Cluster struct {
cm cmux.CMux
app *kratos.App
cm cmux.CMux
app *kratos.App
ready int32
}
func NewCluster(conf *config.Configuration, dao *repo.Dao) (*Cluster, error) {
@@ -23,23 +26,32 @@ func NewCluster(conf *config.Configuration, dao *repo.Dao) (*Cluster, error) {
klog.Errorf("control plane listen err: %s", err)
return nil, err
}
cluster := &Cluster{}
// service
svc := service.NewClusterService(dao)
clustersvc := service.NewClusterService(dao, cluster)
// http and grpc server
cm := cmux.New(ln)
grpcLn := cm.MatchWithWriters(cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc"))
httpLn := cm.Match(cmux.Any())
gs := server.NewGRPCServer(grpcLn, svc)
hs := server.NewHTTPServer(httpLn, svc)
gs := server.NewGRPCServer(grpcLn, clustersvc, clustersvc)
hs := server.NewHTTPServer(httpLn, clustersvc, clustersvc)
app := kratos.New(kratos.Server(gs, hs))
return &Cluster{
cm: cm,
app: app,
}, nil
cluster.cm = cm
cluster.app = app
return cluster, nil
}
func (cluster *Cluster) Ready() bool {
value := atomic.LoadInt32(&cluster.ready)
return value == 1
}
func (cluster *Cluster) SetReady() {
atomic.StoreInt32(&cluster.ready, 1)
}
func (cluster *Cluster) Serve() error {

View File

@@ -8,13 +8,14 @@ import (
v1 "github.com/singchia/frontier/api/controlplane/frontlas/v1"
)
func NewGRPCServer(ln net.Listener, svc v1.ClusterServiceServer) *grpc.Server {
func NewGRPCServer(ln net.Listener, clustersvc v1.ClusterServiceServer, healthsvc v1.HealthServer) *grpc.Server {
// new server
opts := []grpc.ServerOption{
grpc.Middleware(recovery.Recovery()),
grpc.Listener(ln),
}
srv := grpc.NewServer(opts...)
v1.RegisterClusterServiceServer(srv, svc)
v1.RegisterClusterServiceServer(srv, clustersvc)
v1.RegisterHealthServer(srv, healthsvc)
return srv
}

View File

@@ -3,19 +3,38 @@ package server
import (
"net"
nethttp "net/http"
"github.com/go-kratos/kratos/v2/middleware/recovery"
"github.com/go-kratos/kratos/v2/transport/http"
v1 "github.com/singchia/frontier/api/controlplane/frontlas/v1"
)
func NewHTTPServer(ln net.Listener, svc v1.ClusterServiceHTTPServer) *http.Server {
func NewHTTPServer(ln net.Listener, clustersvc v1.ClusterServiceHTTPServer, healthsvc v1.HealthServer) *http.Server {
// new server
opts := []http.ServerOption{
http.Middleware(recovery.Recovery()),
http.Listener(ln),
}
opts = append(opts, http.ResponseEncoder(responseEncoder))
srv := http.NewServer(opts...)
v1.RegisterClusterServiceHTTPServer(srv, svc)
v1.RegisterClusterServiceHTTPServer(srv, clustersvc)
v1.RegisterHealthHTTPServer(srv, healthsvc)
return srv
}
func responseEncoder(w http.ResponseWriter, r *http.Request, v interface{}) error {
if v == nil {
return nil
}
healthCheckResponse, ok := v.(*v1.HealthCheckResponse)
if ok {
if healthCheckResponse.Status == v1.HealthCheckResponse_SERVING {
w.WriteHeader(nethttp.StatusOK)
} else {
w.WriteHeader(nethttp.StatusExpectationFailed)
}
}
return nil
}

View File

@@ -9,14 +9,17 @@ import (
type ClusterService struct {
v1.UnimplementedClusterServiceServer
v1.UnimplementedHealthServer
// repo
repo *repo.Dao
repo *repo.Dao
readiness Readiness
}
func NewClusterService(repo *repo.Dao) *ClusterService {
func NewClusterService(repo *repo.Dao, readiness Readiness) *ClusterService {
cs := &ClusterService{
repo: repo,
repo: repo,
readiness: readiness,
}
return cs
}

View File

@@ -0,0 +1,29 @@
package service
import (
"context"
v1 "github.com/singchia/frontier/api/controlplane/frontlas/v1"
)
type Readiness interface {
Ready() bool
}
func (cs *ClusterService) Check(context.Context, *v1.HealthCheckRequest) (*v1.HealthCheckResponse, error) {
ready := cs.readiness.Ready()
status := v1.HealthCheckResponse_NOT_SERVING
if ready {
status = v1.HealthCheckResponse_SERVING
}
return &v1.HealthCheckResponse{Status: status}, nil
}
func (cs *ClusterService) Watch(_ *v1.HealthCheckRequest, stream v1.Health_WatchServer) error {
ready := cs.readiness.Ready()
status := v1.HealthCheckResponse_NOT_SERVING
if ready {
status = v1.HealthCheckResponse_SERVING
}
return stream.Send(&v1.HealthCheckResponse{Status: status})
}

View File

@@ -5,6 +5,8 @@ import (
"io"
"net"
"os"
"strconv"
"strings"
armio "github.com/jumboframes/armorigo/io"
"github.com/singchia/frontier/pkg/config"
@@ -190,7 +192,7 @@ func Parse() (*Configuration, error) {
if config.Daemon.PProf.CPUProfileRate == 0 {
config.Daemon.PProf.CPUProfileRate = 10000
}
// env
// env, set only exists
cpPort := os.Getenv("FRONTLAS_CONTROLPLANE_PORT")
if cpPort != "" {
host, _, err := net.SplitHostPort(config.ControlPlane.Listen.Addr)
@@ -199,6 +201,38 @@ func Parse() (*Configuration, error) {
}
config.ControlPlane.Listen.Addr = net.JoinHostPort(host, cpPort)
}
redisType := os.Getenv("REDIS_TYPE")
redisAddrs := os.Getenv("REDIS_ADDRS")
redisUser := os.Getenv("REDIS_USER")
redisPassword := os.Getenv("REDIS_PASSWORD")
redisDB := os.Getenv("REDIS_DB")
redisMasterName := os.Getenv("MASTER_NAME")
switch redisType {
case "standalone":
addrs := strings.Split(redisAddrs, ",")
db, err := strconv.Atoi(redisDB)
if err != nil {
return nil, err
}
config.Redis.Standalone.DB = db
config.Redis.Standalone.Addr = addrs[0]
config.Redis.Username = redisUser
config.Redis.Password = redisPassword
config.Redis.Mode = redisType
case "sentinel":
addrs := strings.Split(redisAddrs, ",")
config.Redis.Sentinel.Addrs = addrs
config.Redis.Sentinel.MasterName = redisMasterName
config.Redis.Username = redisUser
config.Redis.Password = redisPassword
config.Redis.Mode = redisType
case "cluster":
addrs := strings.Split(redisAddrs, ",")
config.Redis.Cluster.Addrs = addrs
config.Redis.Username = redisUser
config.Redis.Password = redisPassword
config.Redis.Mode = redisType
}
return config, nil
}
@@ -216,13 +250,13 @@ func genDefaultConfig(writer io.Writer) error {
ControlPlane: ControlPlane{
Listen: config.Listen{
Network: "tcp",
Addr: "0.0.0.0:30021",
Addr: "0.0.0.0:40011",
},
},
FrontierManager: FrontierManager{
Listen: config.Listen{
Network: "tcp",
Addr: "0.0.0.0:30022",
Addr: "0.0.0.0:40012",
},
},
Redis: Redis{

View File

@@ -31,6 +31,7 @@ func NewServer(conf *config.Configuration, repo *repo.Dao) (*Server, error) {
klog.Errorf("new cluster err: %s", err)
return nil, err
}
cluster.SetReady()
return &Server{
tmr: tmr,

View File

@@ -1,5 +1,5 @@
# Image URL to use all building/pushing image targets
IMG ?= singchia/frontiercluster-controller:1.0.0
IMG ?= singchia/frontiercluster-controller:1.0.0-dev
# ENVTEST_K8S_VERSION refers to the version of kubebuilder assets to be downloaded by envtest binary.
ENVTEST_K8S_VERSION = 1.29.0

View File

@@ -3,6 +3,7 @@ package v1alpha1
import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
)
func (fc *FrontierCluster) FrontierServiceboundServicePort() (string, corev1.ServiceType, corev1.ServicePort) {
@@ -14,10 +15,14 @@ func (fc *FrontierCluster) FrontierServiceboundServicePort() (string, corev1.Ser
if fc.Spec.Frontier.Servicebound.Port != 0 {
port.Port = int32(fc.Spec.Frontier.Servicebound.Port)
}
port.TargetPort = intstr.FromInt32(port.Port)
// service type
serviceType := corev1.ServiceTypeClusterIP
if fc.Spec.Frontier.Servicebound.ServiceType != "" {
serviceType = fc.Spec.Frontier.Servicebound.ServiceType
if serviceType == corev1.ServiceTypeNodePort {
port.NodePort = port.Port
}
}
// service name
@@ -37,11 +42,15 @@ func (fc *FrontierCluster) FrontierEdgeboundServicePort() (string, corev1.Servic
if fc.Spec.Frontier.Edgebound.Port != 0 {
port.Port = int32(fc.Spec.Frontier.Edgebound.Port)
}
port.TargetPort = intstr.FromInt32(port.Port)
// service type
serviceType := corev1.ServiceTypeNodePort
if fc.Spec.Frontier.Edgebound.ServiceType != "" {
serviceType = fc.Spec.Frontier.Edgebound.ServiceType
}
if serviceType == corev1.ServiceTypeNodePort {
port.NodePort = port.Port
}
// service name
serviceName := fc.Spec.Frontier.Edgebound.ServiceName
if serviceName != "" {
@@ -50,26 +59,37 @@ func (fc *FrontierCluster) FrontierEdgeboundServicePort() (string, corev1.Servic
return fc.Name + "-edgebound-svc", serviceType, port
}
func (fc *FrontierCluster) FrontlasControlPlaneServicePort() (string, corev1.ServiceType, corev1.ServicePort) {
func (fc *FrontierCluster) FrontlasServicePort() (string, corev1.ServiceType, corev1.ServicePort, corev1.ServicePort) {
// port
port := corev1.ServicePort{
Port: 30012,
cpport := corev1.ServicePort{
Port: 40011,
Name: fc.Name + "-controlplane",
}
if fc.Spec.Frontlas.ControlPlane.Port != 0 {
port.Port = int32(fc.Spec.Frontlas.ControlPlane.Port)
cpport.Port = int32(fc.Spec.Frontlas.ControlPlane.Port)
}
cpport.TargetPort = intstr.FromInt32(cpport.Port)
fpport := corev1.ServicePort{
Port: 40012,
TargetPort: intstr.FromInt32(40012),
Name: fc.Name + "-frontierplane",
}
// service type
serviceType := corev1.ServiceTypeNodePort
serviceType := corev1.ServiceTypeClusterIP
if fc.Spec.Frontlas.ControlPlane.ServiceType != "" {
serviceType = fc.Spec.Frontlas.ControlPlane.ServiceType
if serviceType == corev1.ServiceTypeNodePort {
cpport.NodePort = cpport.Port
fpport.NodePort = fpport.Port
}
}
// service name
serviceName := fc.Spec.Frontlas.ControlPlane.ServiceName
if serviceName != "" {
return serviceName, serviceType, port
return serviceName, serviceType, cpport, fpport
}
return fc.Name + "-controlplane-svc", serviceType, port
return fc.Name + "-frontlas-svc", serviceType, cpport, fpport
}
// EBTLSCASecretNamespacedName will get the namespaced name of the Secret containing the CA certificate

View File

@@ -83,16 +83,19 @@ const (
)
type Redis struct {
Addrs []string `json:"addrs"`
User string `json:"user,omitempty"`
Password string `json:"password,omitempty"`
RedisType RedisType `json:"redisType"`
Addrs []string `json:"addrs"`
DB int `json:"db,omitempty"`
User string `json:"user,omitempty"`
Password string `json:"password,omitempty"`
RedisType RedisType `json:"redisType"`
MasterName string `json:"masterName,omitempty"`
}
type Frontlas struct {
Replicas int `json:"replicas,omitempty"` // frontlas replicas, default 1
ControlPlane ControlPlane `json:"controlplane,omitempty"`
NodeAffinity corev1.NodeAffinity `json:"nodeAffinity,omitempty"`
Redis Redis `json:"redis"`
}
// FrontierClusterSpec defines the desired state of FrontierCluster

View File

@@ -170,6 +170,7 @@ func (in *Frontlas) DeepCopyInto(out *Frontlas) {
*out = *in
out.ControlPlane = in.ControlPlane
in.NodeAffinity.DeepCopyInto(&out.NodeAffinity)
in.Redis.DeepCopyInto(&out.Redis)
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Frontlas.

View File

@@ -505,8 +505,30 @@ spec:
type: object
x-kubernetes-map-type: atomic
type: object
redis:
properties:
addrs:
items:
type: string
type: array
db:
type: integer
masterName:
type: string
password:
type: string
redisType:
type: string
user:
type: string
required:
- addrs
- redisType
type: object
replicas:
type: integer
required:
- redis
type: object
required:
- frontier

View File

@@ -5,4 +5,4 @@ kind: Kustomization
images:
- name: controller
newName: singchia/frontiercluster-controller
newTag: 1.0.0
newTag: 1.0.0-dev

View File

@@ -517,8 +517,28 @@ spec:
type: object
x-kubernetes-map-type: atomic
type: object
redis:
properties:
addrs:
items:
type: string
type: array
db:
type: integer
password:
type: string
redisType:
type: string
user:
type: string
required:
- addrs
- redisType
type: object
replicas:
type: integer
required:
- redis
type: object
required:
- frontier

View File

@@ -3,7 +3,9 @@ package controller
import (
"context"
"fmt"
"net"
"strconv"
"strings"
"github.com/singchia/frontier/operator/api/v1alpha1"
"github.com/singchia/frontier/operator/pkg/kube/container"
@@ -15,20 +17,32 @@ import (
)
const (
// image
FrontierImageEnv = "FRONTIER_IMAGE"
FrontlasImageEnv = "FRONTLAS_IMAGE"
// node
// node for frontlas
NodeNameEnv = "NODE_NAME"
// port
// port for frontier and frontlas
FrontierServiceboundPortEnv = "FRONTIER_SERVICEBOUND_PORT"
FrontierEdgeboundPortEnv = "FRONTIER_EDGEBOUND_PORT"
FrontlasControlPlanePortEnv = "FRONTLAS_CONTROLPLANE_PORT"
// tls
// tls for frontier
FrontierEdgeboundTLSCAMountPath = "/app/conf/edgebound/tls/ca"
FrontierEdgebountTLSCertKeyMountPath = "/app/conf/edgebound/tls/secret"
// redis for frontlas
FrontlasRedisAddrsEnv = "REDIS_ADDRS"
FrontlasRedisDBEnv = "REDIS_DB"
FrontlasRedisUserEnv = "REDIS_USER"
FrontlasRedisPasswordEnv = "REDIS_PASSWORD"
FrontlasRedisTypeEnv = "REDIS_TYPE"
FrontlasRedisMasterName = "MASTER_NAME"
// inner addr
FrontlasAddrEnv = "FRONTLAS_ADDR" // service + frontierport
)
func (r *FrontierClusterReconciler) ensureDeployment(ctx context.Context, fc v1alpha1.FrontierCluster) (bool, error) {
@@ -109,11 +123,13 @@ func (r *FrontierClusterReconciler) ensureFrontierDeployment(ctx context.Context
sbservice, _, sbport := fc.FrontierServiceboundServicePort()
_, _, ebport := fc.FrontierEdgeboundServicePort()
frontierservice, _, _, fpport := fc.FrontlasServicePort()
// container
container := container.Builder().
SetName("frontier").
SetImage("singchia/frontier:1.0.0-dev").
SetImagePullPolicy(corev1.PullAlways).
SetEnvs([]corev1.EnvVar{{
Name: FrontierServiceboundPortEnv,
Value: strconv.Itoa(int(sbport.Port)),
@@ -127,6 +143,9 @@ func (r *FrontierClusterReconciler) ensureFrontierDeployment(ctx context.Context
FieldPath: "spec.nodeName",
},
},
}, {
Name: FrontlasAddrEnv,
Value: net.JoinHostPort(frontierservice, strconv.Itoa(int(fpport.Port))),
}}).
SetCommand(nil).
SetArgs(nil).
@@ -185,16 +204,50 @@ func (r *FrontierClusterReconciler) ensureFrontlasDeployment(ctx context.Context
"app": app,
}
service, _, port := fc.FrontlasControlPlaneServicePort()
service, _, cpport, _ := fc.FrontlasServicePort()
// container
container := container.Builder().
SetName("frontlas").
SetImage("singchia/frontlas:1.0.0-dev").
SetImagePullPolicy(corev1.PullAlways).
SetEnvs([]corev1.EnvVar{{
Name: FrontlasControlPlanePortEnv,
Value: strconv.Itoa(int(port.Port)),
Value: strconv.Itoa(int(cpport.Port)),
}, {
Name: FrontlasRedisAddrsEnv,
Value: strings.Join(fc.Spec.Frontlas.Redis.Addrs, ","),
}, {
Name: FrontlasRedisUserEnv,
Value: fc.Spec.Frontlas.Redis.User,
}, {
Name: FrontlasRedisPasswordEnv,
Value: fc.Spec.Frontlas.Redis.Password,
}, {
Name: FrontlasRedisTypeEnv,
Value: string(fc.Spec.Frontlas.Redis.RedisType),
}, {
Name: FrontlasRedisDBEnv,
Value: strconv.Itoa(fc.Spec.Frontlas.Redis.DB),
}, {
Name: FrontlasRedisMasterName,
Value: fc.Spec.Frontlas.Redis.MasterName,
}}).
SetReadinessProbe(&corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
/* 1.24+
GRPC: &corev1.GRPCAction{
Port: cpport.TargetPort.IntVal,
Service: &service,
},
*/
HTTPGet: &corev1.HTTPGetAction{
Port: cpport.TargetPort,
Path: "/cluster/v1/health",
},
},
PeriodSeconds: 5,
}).
SetCommand(nil).
SetArgs(nil).
Build()

View File

@@ -50,7 +50,7 @@ func (r *FrontierClusterReconciler) ensureService(ctx context.Context, fc v1alph
}
// controlplane
cpServiceName, cpServiceType, port := fc.FrontlasControlPlaneServicePort()
cpServiceName, cpServiceType, cpport, fpport := fc.FrontlasServicePort()
label = map[string]string{
"app": fc.Name + "-frontlas",
}
@@ -62,7 +62,9 @@ func (r *FrontierClusterReconciler) ensureService(ctx context.Context, fc v1alph
SetServiceType(cpServiceType).
SetPublishNotReadyAddresses(true).
SetOwnerReferences(fc.GetOwnerReferences()).
AddPort(&port).Build()
AddPort(&cpport).
AddPort(&fpport).
Build()
if err := service.CreateOrUpdate(ctx, r.client, cpService); err != nil {
return fmt.Errorf("Could not ensure controlplane service: %s", err)

View File

@@ -13,6 +13,7 @@ type builder struct {
command []string
args []string
envs []corev1.EnvVar
readinessProbe *corev1.Probe
volumeMounts []corev1.VolumeMount
ports []corev1.ContainerPort
@@ -63,6 +64,11 @@ func (b *builder) SetPorts(ports []corev1.ContainerPort) *builder {
return b
}
func (b *builder) SetReadinessProbe(readinessProbe *corev1.Probe) *builder {
b.readinessProbe = readinessProbe
return b
}
func (b *builder) Build() corev1.Container {
return corev1.Container{
Name: b.name,
@@ -74,6 +80,7 @@ func (b *builder) Build() corev1.Container {
Env: b.envs,
VolumeMounts: b.volumeMounts,
Ports: b.ports,
ReadinessProbe: b.readinessProbe,
}
}