mirror of
https://github.com/eolinker/apinto
synced 2025-09-26 12:51:12 +08:00
支持自动构建arm镜像
This commit is contained in:
22
.github/workflows/actions.yaml
vendored
22
.github/workflows/actions.yaml
vendored
@@ -16,7 +16,7 @@ jobs:
|
||||
- name: SetUpGo
|
||||
uses: actions/setup-go@v3
|
||||
with:
|
||||
go-version: '1.21.1'
|
||||
go-version: '1.23.6'
|
||||
- name: Checkout #Checkout代码
|
||||
uses: actions/checkout@v3
|
||||
- name: GoTidy
|
||||
@@ -32,14 +32,17 @@ jobs:
|
||||
env:
|
||||
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
|
||||
dockerImage:
|
||||
needs: [releasePackage]
|
||||
name: buildDockerImage
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: SetOutput #处理Tag字符串并存进outputs
|
||||
id: vars
|
||||
run: |
|
||||
echo "tag=${GITHUB_REF#refs/*/v}" >> $GITHUB_OUTPUT
|
||||
- name: SetupGo
|
||||
uses: actions/setup-go@v3
|
||||
with:
|
||||
go-version: '1.21.1'
|
||||
go-version: '1.23.6'
|
||||
- uses: actions/checkout@v3
|
||||
- name: SetOutput
|
||||
id: vars
|
||||
@@ -47,20 +50,11 @@ jobs:
|
||||
- name: GoTidy
|
||||
run: |
|
||||
go mod tidy
|
||||
- name: Pack #运行打包脚本并且将生成的tar包复制至Dockerfile所在目录
|
||||
run: |
|
||||
./build/cmd/package.sh ${{ steps.vars.outputs.tag }}
|
||||
cp ./out/apinto_${{ steps.vars.outputs.tag }}_linux_amd64.tar.gz ./build/resources/apinto.linux.x64.tar.gz
|
||||
- name: Login Docker #登录docker
|
||||
uses: docker/login-action@v1
|
||||
with:
|
||||
username: ${{ secrets.DOCKER_USERNAME }}
|
||||
password: ${{ secrets.DOCKER_TOKEN }}
|
||||
- name: buildImage
|
||||
- name: Push #运行打包脚本并且将生成的tar包复制至Dockerfile所在目录
|
||||
run: |
|
||||
docker build -t ${{ secrets.DOCKER_USERNAME }}/apinto-gateway:${{ steps.vars.outputs.tag }} -f ./build/resources/Dockerfile ./build/resources
|
||||
- name: pushImage
|
||||
run: |
|
||||
docker push ${{ secrets.DOCKER_USERNAME }}/apinto-gateway:${{ steps.vars.outputs.tag }}
|
||||
docker tag ${{ secrets.DOCKER_USERNAME }}/apinto-gateway:${{ steps.vars.outputs.tag }} ${{ secrets.DOCKER_USERNAME }}/apinto-gateway:latest
|
||||
docker push ${{ secrets.DOCKER_USERNAME }}/apinto-gateway:latest
|
||||
./build/cmd/docker_publish.sh ${{ steps.vars.outputs.tag }} ${{ secrets.DOCKER_USERNAME }} "" latest
|
||||
|
1
.gitignore
vendored
1
.gitignore
vendored
@@ -22,3 +22,4 @@
|
||||
.gitlab-ci.yml
|
||||
/outer/
|
||||
.env
|
||||
/.secrets
|
||||
|
@@ -1,120 +0,0 @@
|
||||
package logs
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
type LokiRequest struct {
|
||||
Streams []*Stream `json:"streams"`
|
||||
}
|
||||
|
||||
type Stream struct {
|
||||
Stream map[string]string `json:"stream"`
|
||||
Values [][]interface{} `json:"values"`
|
||||
}
|
||||
|
||||
func TestSendLogToLoki(t *testing.T) {
|
||||
// 1. Create a new log
|
||||
// 2. Send the log to Loki
|
||||
// 3. Check if the log is in Loki
|
||||
|
||||
items, err := parseLog()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
client := http.Client{}
|
||||
for _, item := range items {
|
||||
body, err := json.Marshal(item)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
req, err := http.NewRequest(http.MethodPost, "http://localhost:3100/loki/api/v1/push", bytes.NewBuffer(body))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
req.Header.Set("X-Scope-OrgID", "tenant1")
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
respBody, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if resp.StatusCode != 204 {
|
||||
t.Fatal(resp.StatusCode, string(respBody))
|
||||
}
|
||||
if err := resp.Body.Close(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
t.Log("Send log to Loki success")
|
||||
|
||||
}
|
||||
|
||||
func parseLog() ([]*LokiRequest, error) {
|
||||
data, err := os.ReadFile("access.log")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// 换行分割
|
||||
lines := strings.Split(string(data), "\n")
|
||||
reqMap := map[string]*Stream{}
|
||||
// 解析日志
|
||||
for _, l := range lines {
|
||||
if l == "" {
|
||||
continue
|
||||
}
|
||||
tmp := make(map[string]interface{})
|
||||
err = json.Unmarshal([]byte(l), &tmp)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
org := map[string]string{
|
||||
"cluster": tmp["cluster"].(string),
|
||||
"node": tmp["node"].(string),
|
||||
"service": tmp["service"].(string),
|
||||
"api": tmp["api"].(string),
|
||||
"src_ip": tmp["src_ip"].(string),
|
||||
"block_name": tmp["block_name"].(string),
|
||||
}
|
||||
key := genKey(org)
|
||||
if _, ok := reqMap[key]; !ok {
|
||||
reqMap[key] = &Stream{
|
||||
Stream: org,
|
||||
Values: make([][]interface{}, 0),
|
||||
}
|
||||
}
|
||||
requestId := uuid.NewString()
|
||||
reqMap[key].Values = append(reqMap[key].Values, []interface{}{strconv.FormatInt(time.UnixMilli(int64(tmp["msec"].(float64))).Add(5*time.Hour).UnixNano(), 10), l, map[string]interface{}{"request_id": requestId}})
|
||||
}
|
||||
reqs := make([]*LokiRequest, len(reqMap)/10+1)
|
||||
num := 0
|
||||
for _, v := range reqMap {
|
||||
index := num / 10
|
||||
if reqs[index] == nil {
|
||||
reqs[index] = &LokiRequest{
|
||||
Streams: make([]*Stream, 0, 10),
|
||||
}
|
||||
}
|
||||
reqs[index].Streams = append(reqs[index].Streams, v)
|
||||
num++
|
||||
}
|
||||
return reqs, nil
|
||||
}
|
||||
|
||||
func genKey(org map[string]string) string {
|
||||
return fmt.Sprintf("cluster_%s-node_%s-service_%s-api_%s-src_ip_%s-block_name_%s", org["cluster"], org["node"], org["service"], org["api"], org["src_ip"], org["block_name"])
|
||||
}
|
@@ -18,15 +18,19 @@ if [[ "$ARCH" == "" ]]
|
||||
then
|
||||
ARCH="amd64"
|
||||
fi
|
||||
if [[ "$ARCH" == "amd64" ]]
|
||||
then
|
||||
PLATFORM="--platform=linux/amd64"
|
||||
|
||||
SYS_ARCH=$(arch)
|
||||
if [[ (${SYS_ARCH} == "aarch64" || $(arch) == "arm64") && $ARCH == "amd64" ]];then
|
||||
OPTIONS="--platform=linux/amd64"
|
||||
elif [[ $(arch) == "amd64" && $ARCH == "arm64" ]];then
|
||||
OPTIONS="--platform=linux/arm64"
|
||||
fi
|
||||
|
||||
./build/cmd/package.sh ${VERSION} ${ARCH}
|
||||
PackageName=apinto_${VERSION}_linux_${ARCH}.tar.gz
|
||||
cp out/${PackageName} ./build/resources/apinto.linux.x64.tar.gz
|
||||
|
||||
docker build $PLATFORM -t ${Username}/apinto-gateway:${VERSION}-${ARCH} -f ./build/cmd/Dockerfile ./build/resources
|
||||
docker build $OPTIONS -t ${Username}/apinto-gateway:${VERSION}-${ARCH} -f ./build/cmd/Dockerfile ./build/resources
|
||||
|
||||
rm -rf ./build/resources/apinto.linux.x64.tar.gz
|
||||
|
||||
|
@@ -5,40 +5,53 @@ echo $0
|
||||
. $(dirname $0)/common.sh
|
||||
|
||||
|
||||
Version=$(genVersion)
|
||||
Version=$(genVersion $1)
|
||||
echo ${Version}
|
||||
ImageName="docker.eolinker.com/apinto/apinto-gateway"
|
||||
echo "docker manifest rm \"${ImageName}:${Version}\""
|
||||
docker manifest rm "${ImageName}:${Version}"
|
||||
|
||||
set -e
|
||||
./build/cmd/docker_build.sh ${Version} "docker.eolinker.com/apinto" amd64
|
||||
Username="eolinker"
|
||||
if [[ "$1" != "" ]]
|
||||
then
|
||||
Username=$2
|
||||
fi
|
||||
ImageName="${Username}/apinto-gateway"
|
||||
|
||||
./build/cmd/docker_build.sh ${Version} "docker.eolinker.com/apinto" arm64
|
||||
./build/cmd/docker_build.sh ${Version} ${Username} amd64
|
||||
|
||||
./build/cmd/docker_build.sh ${Version} ${Username} arm64
|
||||
|
||||
publish_hub() {
|
||||
Version=$1
|
||||
echo "docker manifest rm \"${ImageName}:${Version}\""
|
||||
docker manifest rm "${ImageName}:${Version}"
|
||||
echo "docker push \"${ImageName}:${Version}-amd64\""
|
||||
docker push "${ImageName}:${Version}-amd64"
|
||||
echo "docker push \"${ImageName}:${Version}-arm64\""
|
||||
docker push "${ImageName}:${Version}-arm64"
|
||||
|
||||
echo "docker push \"${ImageName}:${Version}-amd64\""
|
||||
docker push "${ImageName}:${Version}-amd64"
|
||||
echo "docker push \"${ImageName}:${Version}-arm64\""
|
||||
docker push "${ImageName}:${Version}-arm64"
|
||||
echo "Create manifest ${ImageName}:${Version}"
|
||||
docker manifest create "${ImageName}:${Version}" "${ImageName}:${Version}-amd64" "${ImageName}:${Version}-arm64"
|
||||
|
||||
echo "Create manifest ${ImageName}:${Version}"
|
||||
docker manifest create "${ImageName}:${Version}" "${ImageName}:${Version}-amd64" "${ImageName}:${Version}-arm64"
|
||||
echo "Annotate manifest ${ImageName}:${Version} ${ImageName}:${Version}-amd64 --os linux --arch amd64"
|
||||
docker manifest annotate "${ImageName}:${Version}" "${ImageName}:${Version}-amd64" --os linux --arch amd64
|
||||
|
||||
echo "Annotate manifest ${ImageName}:${Version} ${ImageName}:${Version}-amd64 --os linux --arch amd64"
|
||||
docker manifest annotate "${ImageName}:${Version}" "${ImageName}:${Version}-amd64" --os linux --arch amd64
|
||||
echo "Annotate manifest ${ImageName}:${Version} ${ImageName}:${Version}-arm64 --os linux --arch arm64"
|
||||
docker manifest annotate "${ImageName}:${Version}" "${ImageName}:${Version}-arm64" --os linux --arch arm64
|
||||
|
||||
echo "Annotate manifest ${ImageName}:${Version} ${ImageName}:${Version}-arm64 --os linux --arch arm64"
|
||||
docker manifest annotate "${ImageName}:${Version}" "${ImageName}:${Version}-arm64" --os linux --arch arm64
|
||||
echo "Push manifest ${ImageName}:${Version}"
|
||||
docker manifest push "${ImageName}:${Version}"
|
||||
}
|
||||
|
||||
echo "Push manifest ${ImageName}:${Version}"
|
||||
docker manifest push "${ImageName}:${Version}"
|
||||
publish_hub ${Version}
|
||||
|
||||
|
||||
PUBLISH=$1
|
||||
PUBLISH=$3
|
||||
if [[ "${PUBLISH}" == "upload" ]];then
|
||||
./build/cmd/qiniu_publish.sh amd64
|
||||
./build/cmd/qiniu_publish.sh arm64
|
||||
fi
|
||||
|
||||
if [[ $4 == "latest" ]];then
|
||||
echo "Publish latest version"
|
||||
docker tag "${ImageName}:${Version}-amd64" "${ImageName}:latest-amd64"
|
||||
docker tag "${ImageName}:${Version}-arm64" "${ImageName}:latest-arm64"
|
||||
publish_hub "latest"
|
||||
fi
|
||||
|
@@ -2,59 +2,120 @@
|
||||
|
||||
ERR_LOG=/var/log/apinto/error.log
|
||||
echo_info() {
|
||||
echo "[$(date "+%Y-%m-%d %H:%M:%S")] [INFO] $1" >> $ERR_LOG
|
||||
echo "[$(date "+%Y-%m-%d %H:%M:%S")] [INFO] $1" >> $ERR_LOG
|
||||
}
|
||||
|
||||
echo_error() {
|
||||
echo "[$(date "+%Y-%m-%d %H:%M:%S")] [ERROR] $1" >> $ERR_LOG
|
||||
echo "[$(date "+%Y-%m-%d %H:%M:%S")] [ERROR] $1" >> $ERR_LOG
|
||||
}
|
||||
|
||||
# 检查环境变量
|
||||
if [ -z "$SERVICE" ] || [ -z "$NAMESPACE" ]; then
|
||||
echo_error "Environment variables SERVICE and NAMESPACE must be set."
|
||||
exit 1
|
||||
fi
|
||||
|
||||
# 解析当前 Pod 的序号
|
||||
CURRENT_INDEX=${HOSTNAME##*-}
|
||||
BASE_NAME=${HOSTNAME%-*}
|
||||
MAX_ATTEMPTS=60 # 最大尝试次数(节点等待),约1分钟
|
||||
RETRY_INTERVAL=5 # 重试间隔,单位秒
|
||||
MAX_JOIN_RETRIES=12 # 加入集群失败的最大重试次数
|
||||
MAX_POD_INDEX=${MAX_POD_INDEX:-10} # 默认检查最多10个Pod,可通过环境变量配置
|
||||
|
||||
# 检查当前程序是否启动,若无,则等待
|
||||
until curl --max-time 5 --silent --fail http://127.0.0.1:9400; do
|
||||
echo_info "Waiting for localhost to be ready..."
|
||||
sleep 1
|
||||
done
|
||||
|
||||
# 如果当前是 apinto-0,需要特殊处理
|
||||
if [ "$CURRENT_INDEX" -eq 0 ]; then
|
||||
echo_info "This is ${HOSTNAME}. Checking if other nodes exist..."
|
||||
|
||||
# 尝试连接 apinto-1
|
||||
NEXT_POD="${BASE_NAME}-$((CURRENT_INDEX + 1)).${SERVICE}.${NAMESPACE}.svc.cluster.local"
|
||||
if curl --max-time 5 --silent --fail http://${NEXT_POD}:9401; then
|
||||
echo_info "Found a running node: ${NEXT_POD}. Joining the cluster..."
|
||||
./apinto join -addr ${NEXT_POD}:9401 >> $ERR_LOG 2>&1
|
||||
if [ $? -ne 0 ]; then
|
||||
echo_error "Error: Failed to join the cluster."
|
||||
fi
|
||||
else
|
||||
echo_info "No other nodes are available. Starting as the first node."
|
||||
fi
|
||||
else
|
||||
# 对于非 apinto-0 的 Pod,连接到前一个 Pod
|
||||
PREVIOUS_POD="${BASE_NAME}-$((CURRENT_INDEX - 1)).${SERVICE}.${NAMESPACE}.svc.cluster.local"
|
||||
echo_info "This is $HOSTNAME. Waiting for $PREVIOUS_POD to be ready..."
|
||||
|
||||
until curl --max-time 5 --silent --fail http://$PREVIOUS_POD:9401; do
|
||||
echo_info "Waiting for $PREVIOUS_POD to be ready..."
|
||||
# 等待本地服务启动
|
||||
attempt=0
|
||||
until curl --max-time 5 --silent --fail http://127.0.0.1:9400 || [ $attempt -ge $MAX_ATTEMPTS ]; do
|
||||
echo_info "Waiting for localhost to be ready... Attempt $attempt"
|
||||
sleep 1
|
||||
done
|
||||
|
||||
echo_info "$PREVIOUS_POD is ready. Joining the cluster..."
|
||||
./apinto join -addr $PREVIOUS_POD:9401 >> $ERR_LOG 2>&1
|
||||
if [ $? -ne 0 ]; then
|
||||
echo_error "Error: Failed to join the cluster."
|
||||
fi
|
||||
attempt=$((attempt + 1))
|
||||
done
|
||||
if [ $attempt -ge $MAX_ATTEMPTS ]; then
|
||||
echo_error "Timeout waiting for localhost to be ready after $MAX_ATTEMPTS attempts."
|
||||
exit 1
|
||||
fi
|
||||
|
||||
if [ $? -ne 0 ]; then
|
||||
echo_error "Error: Failed to join the cluster."
|
||||
exit 1
|
||||
fi
|
||||
# 检查是否成功加入集群
|
||||
check_cluster_join() {
|
||||
info_output=$(./apinto info 2>&1)
|
||||
peer_count=$(echo "$info_output" | grep -c -- "--Peer")
|
||||
if [ "$peer_count" -ge 2 ]; then
|
||||
echo_info "Successfully joined the cluster with $peer_count peers. Cluster info: $info_output"
|
||||
return 0
|
||||
else
|
||||
echo_info "Failed to join the cluster. Only $peer_count peer(s) found. Info: $info_output"
|
||||
return 1
|
||||
fi
|
||||
}
|
||||
|
||||
echo_info "Successfully joined the cluster."
|
||||
# 尝试加入集群,带重试
|
||||
try_join_cluster() {
|
||||
local target_addr=$1
|
||||
local join_retries=0
|
||||
while [ $join_retries -lt $MAX_JOIN_RETRIES ]; do
|
||||
echo_info "Attempting to join cluster via $target_addr (Retry $join_retries/$MAX_JOIN_RETRIES)..."
|
||||
join_output=$(./apinto join -addr "$target_addr" 2>&1)
|
||||
if [ $? -eq 0 ]; then
|
||||
if check_cluster_join; then
|
||||
return 0
|
||||
else
|
||||
echo_info "Join via $target_addr executed but cluster validation failed.Details: $join_output"
|
||||
fi
|
||||
else
|
||||
echo_info "Join via $target_addr failed. Details: $join_output"
|
||||
fi
|
||||
join_retries=$((join_retries + 1))
|
||||
if [ $join_retries -lt $MAX_JOIN_RETRIES ]; then
|
||||
echo_info "Retrying join in $RETRY_INTERVAL seconds..."
|
||||
sleep $RETRY_INTERVAL
|
||||
fi
|
||||
done
|
||||
echo_error "Failed to join cluster via $target_addr after $MAX_JOIN_RETRIES retries."
|
||||
return 1
|
||||
}
|
||||
|
||||
if [ "$CURRENT_INDEX" -eq 0 ]; then
|
||||
# apinto-0: 检查其他 Pod 是否在运行
|
||||
echo_info "This is $HOSTNAME. Checking if other nodes are running..."
|
||||
for i in $(seq 1 "$MAX_POD_INDEX"); do
|
||||
OTHER_POD="${BASE_NAME}-${i}.${SERVICE}.${NAMESPACE}.svc.cluster.local"
|
||||
attempt=0
|
||||
while [ $attempt -lt $MAX_ATTEMPTS ]; do
|
||||
if curl --max-time 5 --silent --fail http://${OTHER_POD}:9401; then
|
||||
echo_info "Found a running node: ${OTHER_POD}."
|
||||
if try_join_cluster "${OTHER_POD}:9401"; then
|
||||
exit 0
|
||||
fi
|
||||
echo_info "Failed to join via ${OTHER_POD}. Trying next node..."
|
||||
break
|
||||
else
|
||||
echo_info "${OTHER_POD} is not ready yet. Retrying in $RETRY_INTERVAL seconds..."
|
||||
fi
|
||||
sleep $RETRY_INTERVAL
|
||||
attempt=$((attempt + 1))
|
||||
done
|
||||
echo_info "Timeout waiting for ${OTHER_POD} after $MAX_ATTEMPTS attempts."
|
||||
done
|
||||
echo_info "No other nodes are available or joinable. Starting as the first node."
|
||||
else
|
||||
# 非 apinto-0 的 Pod,加入 apinto-0
|
||||
LEADER_POD="${BASE_NAME}-0.${SERVICE}.${NAMESPACE}.svc.cluster.local"
|
||||
echo_info "This is $HOSTNAME. Waiting for $LEADER_POD to be ready..."
|
||||
attempt=0
|
||||
until curl --max-time 5 --silent --fail http://$LEADER_POD:9401 || [ $attempt -ge $MAX_ATTEMPTS ]; do
|
||||
echo_info "Waiting for $LEADER_POD to be ready... Attempt $attempt"
|
||||
sleep 1
|
||||
attempt=$((attempt + 1))
|
||||
done
|
||||
if [ $attempt -ge $MAX_ATTEMPTS ]; then
|
||||
echo_error "Timeout waiting for $LEADER_POD to be ready after $MAX_ATTEMPTS attempts."
|
||||
exit 1
|
||||
fi
|
||||
echo_info "$LEADER_POD is ready."
|
||||
if try_join_cluster "$LEADER_POD:9401"; then
|
||||
exit 0
|
||||
else
|
||||
echo_error "Failed to join cluster via $LEADER_POD after $MAX_JOIN_RETRIES retries."
|
||||
exit 1
|
||||
fi
|
||||
fi
|
@@ -2,16 +2,16 @@ version: 2
|
||||
#certificate: # 证书存放根目录
|
||||
# dir: /etc/apinto/cert
|
||||
client:
|
||||
#advertise_urls: # open api 服务的广播地址
|
||||
#- http://127.0.0.1:9400
|
||||
advertise_urls: # open api 服务的广播地址
|
||||
- http://{IP}:9400
|
||||
listen_urls: # open api 服务的监听地址
|
||||
- http://0.0.0.0:9400
|
||||
#certificate: # 证书配置,允许使用ip的自签证书
|
||||
# - cert: server.pem
|
||||
# key: server.key
|
||||
gateway:
|
||||
#advertise_urls: # 转发服务的广播地址
|
||||
#- http://127.0.0.1:9400
|
||||
advertise_urls: # 转发服务的广播地址
|
||||
- http://{IP}:8099
|
||||
listen_urls: # 转发服务的监听地址
|
||||
- https://0.0.0.0:8099
|
||||
- http://0.0.0.0:8099
|
||||
|
@@ -1,4 +1,21 @@
|
||||
#!/bin/bash
|
||||
set -e
|
||||
|
||||
ERR_LOG=/var/log/apinto/error.log
|
||||
echo_info() {
|
||||
echo "[$(date "+%Y-%m-%d %H:%M:%S")] [INFO] $1" >> $ERR_LOG
|
||||
}
|
||||
|
||||
echo_error() {
|
||||
echo "[$(date "+%Y-%m-%d %H:%M:%S")] [ERROR] $1" >> $ERR_LOG
|
||||
}
|
||||
#This script is used to leave the K8S cluster
|
||||
./apinto leave
|
||||
leaveOutput=$(./apinto leave)
|
||||
if [[ $? -ne 0 ]]; then
|
||||
echo_error "Failed to leave the cluster: $leaveOutput"
|
||||
exit 1
|
||||
else
|
||||
echo_info "Successfully left the cluster."
|
||||
./apinto stop
|
||||
echo_info "Apinto stopped successfully."
|
||||
fi
|
||||
|
||||
|
@@ -29,4 +29,5 @@ const (
|
||||
// CheckMultiple 复合匹配
|
||||
CheckMultiple
|
||||
CheckTypeIP
|
||||
CheckTypeTimeRange
|
||||
)
|
||||
|
@@ -75,6 +75,7 @@ func (h *httpHandler) Serve(ctx eocontext.EoContext) {
|
||||
ctx.SetLabel("method", httpContext.Request().Method())
|
||||
ctx.SetLabel("path", httpContext.Request().URI().RequestURI())
|
||||
ctx.SetLabel("ip", httpContext.Request().RealIp())
|
||||
ctx.SetLabel("time", httpContext.AcceptTime().Format("2006-01-02 15:04:05"))
|
||||
|
||||
ctx.SetCompleteHandler(h.completeHandler)
|
||||
ctx.SetBalance(h.service)
|
||||
|
@@ -9,9 +9,9 @@ import (
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
|
||||
"github.com/eolinker/eosc/log"
|
||||
|
||||
|
||||
"github.com/eolinker/apinto/certs"
|
||||
"github.com/eolinker/eosc/common/bean"
|
||||
"github.com/eolinker/eosc/config"
|
||||
@@ -26,7 +26,7 @@ const (
|
||||
GRPC RouterType = iota
|
||||
Http
|
||||
Dubbo2
|
||||
TslTCP
|
||||
TlsTCP
|
||||
AnyTCP
|
||||
depth
|
||||
)
|
||||
@@ -50,28 +50,29 @@ type RouterServerHandler func(port int, listener net.Listener)
|
||||
|
||||
func init() {
|
||||
matchWriters[AnyTCP] = matchersToMatchWriters(cmux.Any())
|
||||
|
||||
matchWriters[TslTCP] = matchersToMatchWriters(cmux.TLS())
|
||||
|
||||
matchWriters[TlsTCP] = matchersToMatchWriters(cmux.TLS())
|
||||
|
||||
matchWriters[Http] = matchersToMatchWriters(cmux.HTTP1Fast(http.MethodPatch))
|
||||
matchWriters[Dubbo2] = matchersToMatchWriters(cmux.PrefixMatcher(string([]byte{0xda, 0xbb})))
|
||||
matchWriters[GRPC] = []cmux.MatchWriter{cmux.HTTP2MatchHeaderFieldPrefixSendSettings("content-type", "application/grpc")}
|
||||
var tf traffic.ITraffic
|
||||
var listenCfg *config.ListenUrl
|
||||
bean.Autowired(&tf, &listenCfg)
|
||||
|
||||
|
||||
bean.AddInitializingBeanFunc(func() {
|
||||
initListener(tf, listenCfg)
|
||||
})
|
||||
}
|
||||
func initListener(tf traffic.ITraffic, listenCfg *config.ListenUrl) {
|
||||
|
||||
|
||||
if tf.IsStop() {
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
wg := sync.WaitGroup{}
|
||||
tcp, ssl := tf.Listen(listenCfg.ListenUrls...)
|
||||
|
||||
|
||||
listenerByPort := make(map[int][]net.Listener)
|
||||
for _, l := range tcp {
|
||||
port := readPort(l.Addr())
|
||||
@@ -79,7 +80,7 @@ func initListener(tf traffic.ITraffic, listenCfg *config.ListenUrl) {
|
||||
}
|
||||
if len(ssl) > 0 {
|
||||
tlsConfig := &tls.Config{GetCertificate: certs.GetCertificateFunc()}
|
||||
|
||||
|
||||
for _, l := range ssl {
|
||||
log.Debug("ssl listen: ", l.Addr().String())
|
||||
port := readPort(l.Addr())
|
||||
@@ -87,9 +88,9 @@ func initListener(tf traffic.ITraffic, listenCfg *config.ListenUrl) {
|
||||
}
|
||||
}
|
||||
for port, lns := range listenerByPort {
|
||||
|
||||
|
||||
var ln net.Listener = mixl.NewMixListener(port, lns...)
|
||||
|
||||
|
||||
wg.Add(1)
|
||||
go func(ln net.Listener, p int) {
|
||||
wg.Done()
|
||||
@@ -100,10 +101,10 @@ func initListener(tf traffic.ITraffic, listenCfg *config.ListenUrl) {
|
||||
go handler(p, cMux.MatchWithWriters(matchWriters[i]...))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
cMux.Serve()
|
||||
}(ln, port)
|
||||
|
||||
|
||||
}
|
||||
wg.Wait()
|
||||
return
|
||||
|
2
go.mod
2
go.mod
@@ -12,7 +12,7 @@ require (
|
||||
github.com/coocood/freecache v1.2.2
|
||||
//github.com/dgrr/http2 v0.3.5
|
||||
github.com/dubbogo/gost v1.13.1
|
||||
github.com/eolinker/eosc v0.20.5
|
||||
github.com/eolinker/eosc v0.20.6
|
||||
github.com/fasthttp/websocket v1.5.0
|
||||
github.com/fullstorydev/grpcurl v1.8.7
|
||||
github.com/go-redis/redis/v8 v8.11.5
|
||||
|
@@ -3,8 +3,10 @@ package strategy
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/eolinker/apinto/checker"
|
||||
"github.com/eolinker/eosc/log"
|
||||
@@ -81,7 +83,7 @@ func (i *ipCidrChecker) Key() string {
|
||||
}
|
||||
|
||||
func (i *ipCidrChecker) Value() string {
|
||||
return i.org
|
||||
return i.cidr.String()
|
||||
}
|
||||
|
||||
func newIpCidrChecker(ip string) (*ipCidrChecker, error) {
|
||||
@@ -89,7 +91,7 @@ func newIpCidrChecker(ip string) (*ipCidrChecker, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &ipCidrChecker{cidr: cidr}, nil
|
||||
return &ipCidrChecker{cidr: cidr, org: ip}, nil
|
||||
}
|
||||
|
||||
func (i *ipCidrChecker) Check(ip string) bool {
|
||||
@@ -148,6 +150,73 @@ func (i *ipV4RangeChecker) Check(ip string) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
type timestampChecker struct {
|
||||
org string
|
||||
startTime time.Time
|
||||
endTime time.Time
|
||||
}
|
||||
|
||||
func newTimestampChecker(timeRange string) (*timestampChecker, error) {
|
||||
// 正则表达式:匹配 HH:mm:ss - HH:mm:ss
|
||||
regex := `^((?:[01]\d|2[0-3]):[0-5]\d:[0-5]\d|24:00:00) - ((?:[01]\d|2[0-3]):[0-5]\d:[0-5]\d|24:00:00)$`
|
||||
re := regexp.MustCompile(regex)
|
||||
if !re.MatchString(timeRange) {
|
||||
return nil, fmt.Errorf("invalid time format, expected HH:mm:ss - HH:mm:ss (00:00:00 - 24:00:00)")
|
||||
}
|
||||
|
||||
// 提取开始时间和结束时间
|
||||
times := strings.Split(timeRange, " - ")
|
||||
startTimeStr, endTimeStr := times[0], times[1]
|
||||
// 解析开始时间和结束时间(假设在当前日期)
|
||||
startTime, err := time.Parse("15:04:05", startTimeStr)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to parse start time: %v", err)
|
||||
}
|
||||
if endTimeStr == "24:00:00" {
|
||||
// 特殊处理 24:00:00,表示第二天的 00:00:00
|
||||
endTimeStr = "23:59:59"
|
||||
}
|
||||
|
||||
endTime, err := time.Parse("15:04:05", endTimeStr)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to parse end time: %v", err)
|
||||
}
|
||||
if startTime.After(endTime) {
|
||||
return nil, fmt.Errorf("start time %s cannot be after end time %s", startTimeStr, endTimeStr)
|
||||
}
|
||||
return ×tampChecker{startTime: startTime, endTime: endTime}, nil
|
||||
|
||||
}
|
||||
|
||||
func (t *timestampChecker) Check(v string, has bool) bool {
|
||||
if !has {
|
||||
return false
|
||||
}
|
||||
now, err := time.ParseInLocation("2006-01-02 15:04:05", v, time.Local)
|
||||
if err != nil {
|
||||
log.Error("invalid timestamp format: %s, error: %v", v, err)
|
||||
return false
|
||||
}
|
||||
startTime := time.Date(now.Year(), now.Month(), now.Day(), t.startTime.Hour(), t.startTime.Minute(), t.startTime.Second(), 0, time.Local)
|
||||
endTime := time.Date(now.Year(), now.Month(), now.Day(), t.endTime.Hour(), t.endTime.Minute(), t.endTime.Second(), 0, time.Local)
|
||||
if startTime.Before(now) && endTime.After(now) {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (t *timestampChecker) Key() string {
|
||||
return t.org
|
||||
}
|
||||
|
||||
func (t *timestampChecker) CheckType() checker.CheckType {
|
||||
return checker.CheckTypeTimeRange
|
||||
}
|
||||
|
||||
func (t *timestampChecker) Value() string {
|
||||
return t.org
|
||||
}
|
||||
|
||||
type IChecker interface {
|
||||
Check(v string) bool
|
||||
Key() string
|
||||
|
@@ -15,25 +15,33 @@ func ParseFilter(config FilterConfig) (IFilter, error) {
|
||||
continue
|
||||
}
|
||||
cks := make([]checker.Checker, 0, len(patterns))
|
||||
|
||||
OuterLoop:
|
||||
for _, p := range patterns {
|
||||
if name == "ip" {
|
||||
switch name {
|
||||
case "ip":
|
||||
c, err := newIPChecker(p)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
cks = append(cks, c)
|
||||
continue
|
||||
case "time":
|
||||
c, err := newTimestampChecker(p)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
cks = append(cks, c)
|
||||
default:
|
||||
c, err := checker.Parse(p)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if c.CheckType() == checker.CheckTypeAll {
|
||||
cks = nil
|
||||
break OuterLoop
|
||||
}
|
||||
cks = append(cks, c)
|
||||
}
|
||||
c, err := checker.Parse(p)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if c.CheckType() == checker.CheckTypeAll {
|
||||
cks = nil
|
||||
break
|
||||
}
|
||||
cks = append(cks, c)
|
||||
|
||||
}
|
||||
|
||||
if len(cks) != 0 {
|
||||
|
Reference in New Issue
Block a user