From d6e93dbb657ad842297caab49bb904749795fc78 Mon Sep 17 00:00:00 2001 From: Liujian <824010343@qq.com> Date: Thu, 26 Jun 2025 14:52:42 +0800 Subject: [PATCH] =?UTF-8?q?=E6=94=AF=E6=8C=81=E8=87=AA=E5=8A=A8=E6=9E=84?= =?UTF-8?q?=E5=BB=BAarm=E9=95=9C=E5=83=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .github/workflows/actions.yaml | 22 ++-- .gitignore | 1 + app/apinto/work/logs/read_log_test.go | 120 ----------------- build/cmd/docker_build.sh | 12 +- build/cmd/docker_publish.sh | 55 +++++--- build/resources/auto-join.sh | 145 +++++++++++++++------ build/resources/config.yml.k8s.tpl | 8 +- build/resources/leave.sh | 21 ++- checker/checkType.go | 1 + drivers/router/http-router/http-handler.go | 1 + drivers/router/listener.go | 29 +++-- go.mod | 2 +- strategy/checker.go | 73 ++++++++++- strategy/filter.go | 32 +++-- 14 files changed, 286 insertions(+), 236 deletions(-) delete mode 100644 app/apinto/work/logs/read_log_test.go diff --git a/.github/workflows/actions.yaml b/.github/workflows/actions.yaml index 6f96e451..d33a8abd 100644 --- a/.github/workflows/actions.yaml +++ b/.github/workflows/actions.yaml @@ -16,7 +16,7 @@ jobs: - name: SetUpGo uses: actions/setup-go@v3 with: - go-version: '1.21.1' + go-version: '1.23.6' - name: Checkout #Checkout代码 uses: actions/checkout@v3 - name: GoTidy @@ -32,14 +32,17 @@ jobs: env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} dockerImage: - needs: [releasePackage] name: buildDockerImage runs-on: ubuntu-latest steps: + - name: SetOutput #处理Tag字符串并存进outputs + id: vars + run: | + echo "tag=${GITHUB_REF#refs/*/v}" >> $GITHUB_OUTPUT - name: SetupGo uses: actions/setup-go@v3 with: - go-version: '1.21.1' + go-version: '1.23.6' - uses: actions/checkout@v3 - name: SetOutput id: vars @@ -47,20 +50,11 @@ jobs: - name: GoTidy run: | go mod tidy - - name: Pack #运行打包脚本并且将生成的tar包复制至Dockerfile所在目录 - run: | - ./build/cmd/package.sh ${{ steps.vars.outputs.tag }} - cp ./out/apinto_${{ steps.vars.outputs.tag }}_linux_amd64.tar.gz ./build/resources/apinto.linux.x64.tar.gz - name: Login Docker #登录docker uses: docker/login-action@v1 with: username: ${{ secrets.DOCKER_USERNAME }} password: ${{ secrets.DOCKER_TOKEN }} - - name: buildImage + - name: Push #运行打包脚本并且将生成的tar包复制至Dockerfile所在目录 run: | - docker build -t ${{ secrets.DOCKER_USERNAME }}/apinto-gateway:${{ steps.vars.outputs.tag }} -f ./build/resources/Dockerfile ./build/resources - - name: pushImage - run: | - docker push ${{ secrets.DOCKER_USERNAME }}/apinto-gateway:${{ steps.vars.outputs.tag }} - docker tag ${{ secrets.DOCKER_USERNAME }}/apinto-gateway:${{ steps.vars.outputs.tag }} ${{ secrets.DOCKER_USERNAME }}/apinto-gateway:latest - docker push ${{ secrets.DOCKER_USERNAME }}/apinto-gateway:latest + ./build/cmd/docker_publish.sh ${{ steps.vars.outputs.tag }} ${{ secrets.DOCKER_USERNAME }} "" latest diff --git a/.gitignore b/.gitignore index fffb3962..7f2bd5ae 100644 --- a/.gitignore +++ b/.gitignore @@ -22,3 +22,4 @@ .gitlab-ci.yml /outer/ .env +/.secrets diff --git a/app/apinto/work/logs/read_log_test.go b/app/apinto/work/logs/read_log_test.go deleted file mode 100644 index 5157f4e0..00000000 --- a/app/apinto/work/logs/read_log_test.go +++ /dev/null @@ -1,120 +0,0 @@ -package logs - -import ( - "bytes" - "encoding/json" - "fmt" - "io" - "net/http" - "os" - "strconv" - "strings" - "testing" - "time" - - "github.com/google/uuid" -) - -type LokiRequest struct { - Streams []*Stream `json:"streams"` -} - -type Stream struct { - Stream map[string]string `json:"stream"` - Values [][]interface{} `json:"values"` -} - -func TestSendLogToLoki(t *testing.T) { - // 1. Create a new log - // 2. Send the log to Loki - // 3. Check if the log is in Loki - - items, err := parseLog() - if err != nil { - t.Fatal(err) - } - client := http.Client{} - for _, item := range items { - body, err := json.Marshal(item) - if err != nil { - t.Fatal(err) - } - req, err := http.NewRequest(http.MethodPost, "http://localhost:3100/loki/api/v1/push", bytes.NewBuffer(body)) - if err != nil { - t.Fatal(err) - } - req.Header.Set("Content-Type", "application/json") - req.Header.Set("X-Scope-OrgID", "tenant1") - resp, err := client.Do(req) - if err != nil { - t.Fatal(err) - } - respBody, err := io.ReadAll(resp.Body) - if err != nil { - t.Fatal(err) - } - if resp.StatusCode != 204 { - t.Fatal(resp.StatusCode, string(respBody)) - } - if err := resp.Body.Close(); err != nil { - t.Fatal(err) - } - } - t.Log("Send log to Loki success") - -} - -func parseLog() ([]*LokiRequest, error) { - data, err := os.ReadFile("access.log") - if err != nil { - return nil, err - } - // 换行分割 - lines := strings.Split(string(data), "\n") - reqMap := map[string]*Stream{} - // 解析日志 - for _, l := range lines { - if l == "" { - continue - } - tmp := make(map[string]interface{}) - err = json.Unmarshal([]byte(l), &tmp) - if err != nil { - return nil, err - } - org := map[string]string{ - "cluster": tmp["cluster"].(string), - "node": tmp["node"].(string), - "service": tmp["service"].(string), - "api": tmp["api"].(string), - "src_ip": tmp["src_ip"].(string), - "block_name": tmp["block_name"].(string), - } - key := genKey(org) - if _, ok := reqMap[key]; !ok { - reqMap[key] = &Stream{ - Stream: org, - Values: make([][]interface{}, 0), - } - } - requestId := uuid.NewString() - reqMap[key].Values = append(reqMap[key].Values, []interface{}{strconv.FormatInt(time.UnixMilli(int64(tmp["msec"].(float64))).Add(5*time.Hour).UnixNano(), 10), l, map[string]interface{}{"request_id": requestId}}) - } - reqs := make([]*LokiRequest, len(reqMap)/10+1) - num := 0 - for _, v := range reqMap { - index := num / 10 - if reqs[index] == nil { - reqs[index] = &LokiRequest{ - Streams: make([]*Stream, 0, 10), - } - } - reqs[index].Streams = append(reqs[index].Streams, v) - num++ - } - return reqs, nil -} - -func genKey(org map[string]string) string { - return fmt.Sprintf("cluster_%s-node_%s-service_%s-api_%s-src_ip_%s-block_name_%s", org["cluster"], org["node"], org["service"], org["api"], org["src_ip"], org["block_name"]) -} diff --git a/build/cmd/docker_build.sh b/build/cmd/docker_build.sh index 1310a4ef..14ca1e11 100755 --- a/build/cmd/docker_build.sh +++ b/build/cmd/docker_build.sh @@ -18,15 +18,19 @@ if [[ "$ARCH" == "" ]] then ARCH="amd64" fi -if [[ "$ARCH" == "amd64" ]] -then - PLATFORM="--platform=linux/amd64" + +SYS_ARCH=$(arch) +if [[ (${SYS_ARCH} == "aarch64" || $(arch) == "arm64") && $ARCH == "amd64" ]];then + OPTIONS="--platform=linux/amd64" +elif [[ $(arch) == "amd64" && $ARCH == "arm64" ]];then + OPTIONS="--platform=linux/arm64" fi + ./build/cmd/package.sh ${VERSION} ${ARCH} PackageName=apinto_${VERSION}_linux_${ARCH}.tar.gz cp out/${PackageName} ./build/resources/apinto.linux.x64.tar.gz -docker build $PLATFORM -t ${Username}/apinto-gateway:${VERSION}-${ARCH} -f ./build/cmd/Dockerfile ./build/resources +docker build $OPTIONS -t ${Username}/apinto-gateway:${VERSION}-${ARCH} -f ./build/cmd/Dockerfile ./build/resources rm -rf ./build/resources/apinto.linux.x64.tar.gz diff --git a/build/cmd/docker_publish.sh b/build/cmd/docker_publish.sh index ae5b168b..a12c0f0b 100755 --- a/build/cmd/docker_publish.sh +++ b/build/cmd/docker_publish.sh @@ -5,40 +5,53 @@ echo $0 . $(dirname $0)/common.sh -Version=$(genVersion) +Version=$(genVersion $1) echo ${Version} -ImageName="docker.eolinker.com/apinto/apinto-gateway" -echo "docker manifest rm \"${ImageName}:${Version}\"" -docker manifest rm "${ImageName}:${Version}" -set -e -./build/cmd/docker_build.sh ${Version} "docker.eolinker.com/apinto" amd64 +Username="eolinker" +if [[ "$1" != "" ]] +then + Username=$2 +fi +ImageName="${Username}/apinto-gateway" -./build/cmd/docker_build.sh ${Version} "docker.eolinker.com/apinto" arm64 +./build/cmd/docker_build.sh ${Version} ${Username} amd64 +./build/cmd/docker_build.sh ${Version} ${Username} arm64 +publish_hub() { + Version=$1 + echo "docker manifest rm \"${ImageName}:${Version}\"" + docker manifest rm "${ImageName}:${Version}" + echo "docker push \"${ImageName}:${Version}-amd64\"" + docker push "${ImageName}:${Version}-amd64" + echo "docker push \"${ImageName}:${Version}-arm64\"" + docker push "${ImageName}:${Version}-arm64" -echo "docker push \"${ImageName}:${Version}-amd64\"" -docker push "${ImageName}:${Version}-amd64" -echo "docker push \"${ImageName}:${Version}-arm64\"" -docker push "${ImageName}:${Version}-arm64" + echo "Create manifest ${ImageName}:${Version}" + docker manifest create "${ImageName}:${Version}" "${ImageName}:${Version}-amd64" "${ImageName}:${Version}-arm64" -echo "Create manifest ${ImageName}:${Version}" -docker manifest create "${ImageName}:${Version}" "${ImageName}:${Version}-amd64" "${ImageName}:${Version}-arm64" + echo "Annotate manifest ${ImageName}:${Version} ${ImageName}:${Version}-amd64 --os linux --arch amd64" + docker manifest annotate "${ImageName}:${Version}" "${ImageName}:${Version}-amd64" --os linux --arch amd64 -echo "Annotate manifest ${ImageName}:${Version} ${ImageName}:${Version}-amd64 --os linux --arch amd64" -docker manifest annotate "${ImageName}:${Version}" "${ImageName}:${Version}-amd64" --os linux --arch amd64 + echo "Annotate manifest ${ImageName}:${Version} ${ImageName}:${Version}-arm64 --os linux --arch arm64" + docker manifest annotate "${ImageName}:${Version}" "${ImageName}:${Version}-arm64" --os linux --arch arm64 -echo "Annotate manifest ${ImageName}:${Version} ${ImageName}:${Version}-arm64 --os linux --arch arm64" -docker manifest annotate "${ImageName}:${Version}" "${ImageName}:${Version}-arm64" --os linux --arch arm64 + echo "Push manifest ${ImageName}:${Version}" + docker manifest push "${ImageName}:${Version}" +} -echo "Push manifest ${ImageName}:${Version}" -docker manifest push "${ImageName}:${Version}" +publish_hub ${Version} - -PUBLISH=$1 +PUBLISH=$3 if [[ "${PUBLISH}" == "upload" ]];then ./build/cmd/qiniu_publish.sh amd64 ./build/cmd/qiniu_publish.sh arm64 fi +if [[ $4 == "latest" ]];then + echo "Publish latest version" + docker tag "${ImageName}:${Version}-amd64" "${ImageName}:latest-amd64" + docker tag "${ImageName}:${Version}-arm64" "${ImageName}:latest-arm64" + publish_hub "latest" +fi diff --git a/build/resources/auto-join.sh b/build/resources/auto-join.sh index 69d091e2..27bf7c70 100755 --- a/build/resources/auto-join.sh +++ b/build/resources/auto-join.sh @@ -2,59 +2,120 @@ ERR_LOG=/var/log/apinto/error.log echo_info() { - echo "[$(date "+%Y-%m-%d %H:%M:%S")] [INFO] $1" >> $ERR_LOG + echo "[$(date "+%Y-%m-%d %H:%M:%S")] [INFO] $1" >> $ERR_LOG } echo_error() { - echo "[$(date "+%Y-%m-%d %H:%M:%S")] [ERROR] $1" >> $ERR_LOG + echo "[$(date "+%Y-%m-%d %H:%M:%S")] [ERROR] $1" >> $ERR_LOG } +# 检查环境变量 +if [ -z "$SERVICE" ] || [ -z "$NAMESPACE" ]; then + echo_error "Environment variables SERVICE and NAMESPACE must be set." + exit 1 +fi + # 解析当前 Pod 的序号 CURRENT_INDEX=${HOSTNAME##*-} BASE_NAME=${HOSTNAME%-*} +MAX_ATTEMPTS=60 # 最大尝试次数(节点等待),约1分钟 +RETRY_INTERVAL=5 # 重试间隔,单位秒 +MAX_JOIN_RETRIES=12 # 加入集群失败的最大重试次数 +MAX_POD_INDEX=${MAX_POD_INDEX:-10} # 默认检查最多10个Pod,可通过环境变量配置 -# 检查当前程序是否启动,若无,则等待 -until curl --max-time 5 --silent --fail http://127.0.0.1:9400; do - echo_info "Waiting for localhost to be ready..." - sleep 1 -done - -# 如果当前是 apinto-0,需要特殊处理 -if [ "$CURRENT_INDEX" -eq 0 ]; then - echo_info "This is ${HOSTNAME}. Checking if other nodes exist..." - - # 尝试连接 apinto-1 - NEXT_POD="${BASE_NAME}-$((CURRENT_INDEX + 1)).${SERVICE}.${NAMESPACE}.svc.cluster.local" - if curl --max-time 5 --silent --fail http://${NEXT_POD}:9401; then - echo_info "Found a running node: ${NEXT_POD}. Joining the cluster..." - ./apinto join -addr ${NEXT_POD}:9401 >> $ERR_LOG 2>&1 - if [ $? -ne 0 ]; then - echo_error "Error: Failed to join the cluster." - fi - else - echo_info "No other nodes are available. Starting as the first node." - fi -else - # 对于非 apinto-0 的 Pod,连接到前一个 Pod - PREVIOUS_POD="${BASE_NAME}-$((CURRENT_INDEX - 1)).${SERVICE}.${NAMESPACE}.svc.cluster.local" - echo_info "This is $HOSTNAME. Waiting for $PREVIOUS_POD to be ready..." - - until curl --max-time 5 --silent --fail http://$PREVIOUS_POD:9401; do - echo_info "Waiting for $PREVIOUS_POD to be ready..." +# 等待本地服务启动 +attempt=0 +until curl --max-time 5 --silent --fail http://127.0.0.1:9400 || [ $attempt -ge $MAX_ATTEMPTS ]; do + echo_info "Waiting for localhost to be ready... Attempt $attempt" sleep 1 - done - - echo_info "$PREVIOUS_POD is ready. Joining the cluster..." - ./apinto join -addr $PREVIOUS_POD:9401 >> $ERR_LOG 2>&1 - if [ $? -ne 0 ]; then - echo_error "Error: Failed to join the cluster." - fi + attempt=$((attempt + 1)) +done +if [ $attempt -ge $MAX_ATTEMPTS ]; then + echo_error "Timeout waiting for localhost to be ready after $MAX_ATTEMPTS attempts." + exit 1 fi -if [ $? -ne 0 ]; then - echo_error "Error: Failed to join the cluster." - exit 1 -fi +# 检查是否成功加入集群 +check_cluster_join() { + info_output=$(./apinto info 2>&1) + peer_count=$(echo "$info_output" | grep -c -- "--Peer") + if [ "$peer_count" -ge 2 ]; then + echo_info "Successfully joined the cluster with $peer_count peers. Cluster info: $info_output" + return 0 + else + echo_info "Failed to join the cluster. Only $peer_count peer(s) found. Info: $info_output" + return 1 + fi +} -echo_info "Successfully joined the cluster." +# 尝试加入集群,带重试 +try_join_cluster() { + local target_addr=$1 + local join_retries=0 + while [ $join_retries -lt $MAX_JOIN_RETRIES ]; do + echo_info "Attempting to join cluster via $target_addr (Retry $join_retries/$MAX_JOIN_RETRIES)..." + join_output=$(./apinto join -addr "$target_addr" 2>&1) + if [ $? -eq 0 ]; then + if check_cluster_join; then + return 0 + else + echo_info "Join via $target_addr executed but cluster validation failed.Details: $join_output" + fi + else + echo_info "Join via $target_addr failed. Details: $join_output" + fi + join_retries=$((join_retries + 1)) + if [ $join_retries -lt $MAX_JOIN_RETRIES ]; then + echo_info "Retrying join in $RETRY_INTERVAL seconds..." + sleep $RETRY_INTERVAL + fi + done + echo_error "Failed to join cluster via $target_addr after $MAX_JOIN_RETRIES retries." + return 1 +} +if [ "$CURRENT_INDEX" -eq 0 ]; then + # apinto-0: 检查其他 Pod 是否在运行 + echo_info "This is $HOSTNAME. Checking if other nodes are running..." + for i in $(seq 1 "$MAX_POD_INDEX"); do + OTHER_POD="${BASE_NAME}-${i}.${SERVICE}.${NAMESPACE}.svc.cluster.local" + attempt=0 + while [ $attempt -lt $MAX_ATTEMPTS ]; do + if curl --max-time 5 --silent --fail http://${OTHER_POD}:9401; then + echo_info "Found a running node: ${OTHER_POD}." + if try_join_cluster "${OTHER_POD}:9401"; then + exit 0 + fi + echo_info "Failed to join via ${OTHER_POD}. Trying next node..." + break + else + echo_info "${OTHER_POD} is not ready yet. Retrying in $RETRY_INTERVAL seconds..." + fi + sleep $RETRY_INTERVAL + attempt=$((attempt + 1)) + done + echo_info "Timeout waiting for ${OTHER_POD} after $MAX_ATTEMPTS attempts." + done + echo_info "No other nodes are available or joinable. Starting as the first node." +else + # 非 apinto-0 的 Pod,加入 apinto-0 + LEADER_POD="${BASE_NAME}-0.${SERVICE}.${NAMESPACE}.svc.cluster.local" + echo_info "This is $HOSTNAME. Waiting for $LEADER_POD to be ready..." + attempt=0 + until curl --max-time 5 --silent --fail http://$LEADER_POD:9401 || [ $attempt -ge $MAX_ATTEMPTS ]; do + echo_info "Waiting for $LEADER_POD to be ready... Attempt $attempt" + sleep 1 + attempt=$((attempt + 1)) + done + if [ $attempt -ge $MAX_ATTEMPTS ]; then + echo_error "Timeout waiting for $LEADER_POD to be ready after $MAX_ATTEMPTS attempts." + exit 1 + fi + echo_info "$LEADER_POD is ready." + if try_join_cluster "$LEADER_POD:9401"; then + exit 0 + else + echo_error "Failed to join cluster via $LEADER_POD after $MAX_JOIN_RETRIES retries." + exit 1 + fi +fi \ No newline at end of file diff --git a/build/resources/config.yml.k8s.tpl b/build/resources/config.yml.k8s.tpl index 06ce59a7..618f5ae3 100644 --- a/build/resources/config.yml.k8s.tpl +++ b/build/resources/config.yml.k8s.tpl @@ -2,16 +2,16 @@ version: 2 #certificate: # 证书存放根目录 # dir: /etc/apinto/cert client: - #advertise_urls: # open api 服务的广播地址 - #- http://127.0.0.1:9400 + advertise_urls: # open api 服务的广播地址 + - http://{IP}:9400 listen_urls: # open api 服务的监听地址 - http://0.0.0.0:9400 #certificate: # 证书配置,允许使用ip的自签证书 # - cert: server.pem # key: server.key gateway: - #advertise_urls: # 转发服务的广播地址 - #- http://127.0.0.1:9400 + advertise_urls: # 转发服务的广播地址 + - http://{IP}:8099 listen_urls: # 转发服务的监听地址 - https://0.0.0.0:8099 - http://0.0.0.0:8099 diff --git a/build/resources/leave.sh b/build/resources/leave.sh index 83c3cabf..6e8138d7 100755 --- a/build/resources/leave.sh +++ b/build/resources/leave.sh @@ -1,4 +1,21 @@ #!/bin/bash -set -e + +ERR_LOG=/var/log/apinto/error.log +echo_info() { + echo "[$(date "+%Y-%m-%d %H:%M:%S")] [INFO] $1" >> $ERR_LOG +} + +echo_error() { + echo "[$(date "+%Y-%m-%d %H:%M:%S")] [ERROR] $1" >> $ERR_LOG +} #This script is used to leave the K8S cluster -./apinto leave \ No newline at end of file +leaveOutput=$(./apinto leave) +if [[ $? -ne 0 ]]; then + echo_error "Failed to leave the cluster: $leaveOutput" + exit 1 +else + echo_info "Successfully left the cluster." + ./apinto stop + echo_info "Apinto stopped successfully." +fi + diff --git a/checker/checkType.go b/checker/checkType.go index 6a5ea6f3..39c08025 100644 --- a/checker/checkType.go +++ b/checker/checkType.go @@ -29,4 +29,5 @@ const ( // CheckMultiple 复合匹配 CheckMultiple CheckTypeIP + CheckTypeTimeRange ) diff --git a/drivers/router/http-router/http-handler.go b/drivers/router/http-router/http-handler.go index 7ab369ba..4058a039 100644 --- a/drivers/router/http-router/http-handler.go +++ b/drivers/router/http-router/http-handler.go @@ -75,6 +75,7 @@ func (h *httpHandler) Serve(ctx eocontext.EoContext) { ctx.SetLabel("method", httpContext.Request().Method()) ctx.SetLabel("path", httpContext.Request().URI().RequestURI()) ctx.SetLabel("ip", httpContext.Request().RealIp()) + ctx.SetLabel("time", httpContext.AcceptTime().Format("2006-01-02 15:04:05")) ctx.SetCompleteHandler(h.completeHandler) ctx.SetBalance(h.service) diff --git a/drivers/router/listener.go b/drivers/router/listener.go index 84ad09d4..93280697 100644 --- a/drivers/router/listener.go +++ b/drivers/router/listener.go @@ -9,9 +9,9 @@ import ( "strconv" "strings" "sync" - + "github.com/eolinker/eosc/log" - + "github.com/eolinker/apinto/certs" "github.com/eolinker/eosc/common/bean" "github.com/eolinker/eosc/config" @@ -26,7 +26,7 @@ const ( GRPC RouterType = iota Http Dubbo2 - TslTCP + TlsTCP AnyTCP depth ) @@ -50,28 +50,29 @@ type RouterServerHandler func(port int, listener net.Listener) func init() { matchWriters[AnyTCP] = matchersToMatchWriters(cmux.Any()) - - matchWriters[TslTCP] = matchersToMatchWriters(cmux.TLS()) + + matchWriters[TlsTCP] = matchersToMatchWriters(cmux.TLS()) + matchWriters[Http] = matchersToMatchWriters(cmux.HTTP1Fast(http.MethodPatch)) matchWriters[Dubbo2] = matchersToMatchWriters(cmux.PrefixMatcher(string([]byte{0xda, 0xbb}))) matchWriters[GRPC] = []cmux.MatchWriter{cmux.HTTP2MatchHeaderFieldPrefixSendSettings("content-type", "application/grpc")} var tf traffic.ITraffic var listenCfg *config.ListenUrl bean.Autowired(&tf, &listenCfg) - + bean.AddInitializingBeanFunc(func() { initListener(tf, listenCfg) }) } func initListener(tf traffic.ITraffic, listenCfg *config.ListenUrl) { - + if tf.IsStop() { return } - + wg := sync.WaitGroup{} tcp, ssl := tf.Listen(listenCfg.ListenUrls...) - + listenerByPort := make(map[int][]net.Listener) for _, l := range tcp { port := readPort(l.Addr()) @@ -79,7 +80,7 @@ func initListener(tf traffic.ITraffic, listenCfg *config.ListenUrl) { } if len(ssl) > 0 { tlsConfig := &tls.Config{GetCertificate: certs.GetCertificateFunc()} - + for _, l := range ssl { log.Debug("ssl listen: ", l.Addr().String()) port := readPort(l.Addr()) @@ -87,9 +88,9 @@ func initListener(tf traffic.ITraffic, listenCfg *config.ListenUrl) { } } for port, lns := range listenerByPort { - + var ln net.Listener = mixl.NewMixListener(port, lns...) - + wg.Add(1) go func(ln net.Listener, p int) { wg.Done() @@ -100,10 +101,10 @@ func initListener(tf traffic.ITraffic, listenCfg *config.ListenUrl) { go handler(p, cMux.MatchWithWriters(matchWriters[i]...)) } } - + cMux.Serve() }(ln, port) - + } wg.Wait() return diff --git a/go.mod b/go.mod index d724827e..8bfea2b7 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/coocood/freecache v1.2.2 //github.com/dgrr/http2 v0.3.5 github.com/dubbogo/gost v1.13.1 - github.com/eolinker/eosc v0.20.5 + github.com/eolinker/eosc v0.20.6 github.com/fasthttp/websocket v1.5.0 github.com/fullstorydev/grpcurl v1.8.7 github.com/go-redis/redis/v8 v8.11.5 diff --git a/strategy/checker.go b/strategy/checker.go index c4a71a1f..c7980ac8 100644 --- a/strategy/checker.go +++ b/strategy/checker.go @@ -3,8 +3,10 @@ package strategy import ( "fmt" "net" + "regexp" "strconv" "strings" + "time" "github.com/eolinker/apinto/checker" "github.com/eolinker/eosc/log" @@ -81,7 +83,7 @@ func (i *ipCidrChecker) Key() string { } func (i *ipCidrChecker) Value() string { - return i.org + return i.cidr.String() } func newIpCidrChecker(ip string) (*ipCidrChecker, error) { @@ -89,7 +91,7 @@ func newIpCidrChecker(ip string) (*ipCidrChecker, error) { if err != nil { return nil, err } - return &ipCidrChecker{cidr: cidr}, nil + return &ipCidrChecker{cidr: cidr, org: ip}, nil } func (i *ipCidrChecker) Check(ip string) bool { @@ -148,6 +150,73 @@ func (i *ipV4RangeChecker) Check(ip string) bool { return true } +type timestampChecker struct { + org string + startTime time.Time + endTime time.Time +} + +func newTimestampChecker(timeRange string) (*timestampChecker, error) { + // 正则表达式:匹配 HH:mm:ss - HH:mm:ss + regex := `^((?:[01]\d|2[0-3]):[0-5]\d:[0-5]\d|24:00:00) - ((?:[01]\d|2[0-3]):[0-5]\d:[0-5]\d|24:00:00)$` + re := regexp.MustCompile(regex) + if !re.MatchString(timeRange) { + return nil, fmt.Errorf("invalid time format, expected HH:mm:ss - HH:mm:ss (00:00:00 - 24:00:00)") + } + + // 提取开始时间和结束时间 + times := strings.Split(timeRange, " - ") + startTimeStr, endTimeStr := times[0], times[1] + // 解析开始时间和结束时间(假设在当前日期) + startTime, err := time.Parse("15:04:05", startTimeStr) + if err != nil { + return nil, fmt.Errorf("failed to parse start time: %v", err) + } + if endTimeStr == "24:00:00" { + // 特殊处理 24:00:00,表示第二天的 00:00:00 + endTimeStr = "23:59:59" + } + + endTime, err := time.Parse("15:04:05", endTimeStr) + if err != nil { + return nil, fmt.Errorf("failed to parse end time: %v", err) + } + if startTime.After(endTime) { + return nil, fmt.Errorf("start time %s cannot be after end time %s", startTimeStr, endTimeStr) + } + return ×tampChecker{startTime: startTime, endTime: endTime}, nil + +} + +func (t *timestampChecker) Check(v string, has bool) bool { + if !has { + return false + } + now, err := time.ParseInLocation("2006-01-02 15:04:05", v, time.Local) + if err != nil { + log.Error("invalid timestamp format: %s, error: %v", v, err) + return false + } + startTime := time.Date(now.Year(), now.Month(), now.Day(), t.startTime.Hour(), t.startTime.Minute(), t.startTime.Second(), 0, time.Local) + endTime := time.Date(now.Year(), now.Month(), now.Day(), t.endTime.Hour(), t.endTime.Minute(), t.endTime.Second(), 0, time.Local) + if startTime.Before(now) && endTime.After(now) { + return true + } + return false +} + +func (t *timestampChecker) Key() string { + return t.org +} + +func (t *timestampChecker) CheckType() checker.CheckType { + return checker.CheckTypeTimeRange +} + +func (t *timestampChecker) Value() string { + return t.org +} + type IChecker interface { Check(v string) bool Key() string diff --git a/strategy/filter.go b/strategy/filter.go index 7f4de587..87faeecc 100644 --- a/strategy/filter.go +++ b/strategy/filter.go @@ -15,25 +15,33 @@ func ParseFilter(config FilterConfig) (IFilter, error) { continue } cks := make([]checker.Checker, 0, len(patterns)) - + OuterLoop: for _, p := range patterns { - if name == "ip" { + switch name { + case "ip": c, err := newIPChecker(p) if err != nil { return nil, err } cks = append(cks, c) - continue + case "time": + c, err := newTimestampChecker(p) + if err != nil { + return nil, err + } + cks = append(cks, c) + default: + c, err := checker.Parse(p) + if err != nil { + return nil, err + } + if c.CheckType() == checker.CheckTypeAll { + cks = nil + break OuterLoop + } + cks = append(cks, c) } - c, err := checker.Parse(p) - if err != nil { - return nil, err - } - if c.CheckType() == checker.CheckTypeAll { - cks = nil - break - } - cks = append(cks, c) + } if len(cks) != 0 {