修复获取目标地址IP失败导致程序崩溃的问题

This commit is contained in:
Liujian
2023-11-07 17:50:15 +08:00
parent b47a447b83
commit 00643c3046
7 changed files with 45 additions and 24 deletions

View File

@@ -19,7 +19,7 @@ peer: # 集群间节点通信配置信息
listen_urls: # 节点监听地址 listen_urls: # 节点监听地址
- http://0.0.0.0:9401 - http://0.0.0.0:9401
#advertise_urls: # 节点通信广播地址 #advertise_urls: # 节点通信广播地址
# - http://127.0.0.1:9400 # - http://127.0.0.1:9401
#certificate: # 证书配置允许使用ip的自签证书 #certificate: # 证书配置允许使用ip的自签证书
# - cert: server.pem # - cert: server.pem
# key: server.key # key: server.key

View File

@@ -137,14 +137,14 @@ func (b *Executor) Get(variables map[string]string) (int64, error) {
// 解析JSON // 解析JSON
v := b.expr.Get(result) v := b.expr.Get(result)
if v == nil || len(v) < 1 { if v == nil || len(v) < 1 {
return 0, fmt.Errorf("json path %s not found,id is %s", b.expr.String(), b.Id()) return 0, fmt.Errorf("json path %s not found,body is %s", b.expr.String(), body)
} }
if len(v) != 1 { if len(v) != 1 {
return 0, fmt.Errorf("json path %s found more than one,id is %s", b.expr.String(), b.Id()) return 0, fmt.Errorf("json path %s found more than one,body is %s", b.expr.String(), body)
} }
intV, ok := v[0].(int64) intV, ok := v[0].(int64)
if !ok { if !ok {
return 0, fmt.Errorf("json path %s found not int64,id is %s", b.expr.String(), b.Id()) return 0, fmt.Errorf("json path %s found not int64,body is %s", b.expr.String(), body)
} }
return intV, nil return intV, nil
} }

View File

@@ -1,17 +1,19 @@
package kafka package kafka
import ( import (
"github.com/Shopify/sarama"
"github.com/eolinker/eosc/log"
"testing" "testing"
"time" "time"
"github.com/Shopify/sarama"
"github.com/eolinker/eosc/log"
) )
var ( var (
addr = []string{ addr = []string{
"alikafka-post-cn-7mz2jfjap00k-1-vpc.alikafka.aliyuncs.com:9092", "alikafka-post-cn-7mz2jfjap00k-1-vpc.alikafka.aliyuncs.com:9092",
"alikafka-post-cn-7mz2jfjap00k-2-vpc.alikafka.aliyuncs.com:9092", "alikafka-post-cn-7mz2jfjap00k-2-vpc.alikafka.aliyuncs.com:9092",
"alikafka-post-cn-7mz2jfjap00k-3-vpc.alikafka.aliyuncs.com:9092"} "alikafka-post-cn-7mz2jfjap00k-3-vpc.alikafka.aliyuncs.com:9092",
}
) )
func beginConsumer(topic string, addr []string, partition int32) { func beginConsumer(topic string, addr []string, partition int32) {
@@ -66,7 +68,7 @@ func TestSendMessageSync(t *testing.T) {
config.Producer.Timeout = 3 * time.Second config.Producer.Timeout = 3 * time.Second
config.Producer.RequiredAcks = sarama.WaitForLocal config.Producer.RequiredAcks = sarama.WaitForLocal
config.Producer.Partitioner = sarama.NewManualPartitioner config.Producer.Partitioner = sarama.NewManualPartitioner
config.Version = sarama.V0_11_0_2 config.Version = sarama.V2_5_0_0
p, err := sarama.NewSyncProducer(msg.address, config) p, err := sarama.NewSyncProducer(msg.address, config)
if err != nil { if err != nil {
t.Errorf("sarama.NewSyncProducer err, message=%s \n", err) t.Errorf("sarama.NewSyncProducer err, message=%s \n", err)
@@ -84,6 +86,7 @@ func TestSendMessageSync(t *testing.T) {
t.Logf("send success, partition=%d, offset=%d \n", part, offset) t.Logf("send success, partition=%d, offset=%d \n", part, offset)
} }
} }
func TestSendMessageAsync(t *testing.T) { func TestSendMessageAsync(t *testing.T) {
msg := TestProducerConfig{ msg := TestProducerConfig{
address: addr, address: addr,
@@ -141,3 +144,19 @@ func TestMain(m *testing.M) {
m.Run() m.Run()
<-time.After(60 * time.Second) <-time.After(60 * time.Second)
} }
func TestConsumer(t *testing.T) {
config := sarama.NewConfig()
config.Version = sarama.V2_5_0_0
consumer, err := sarama.NewConsumer([]string{"172.18.166.219:9092"}, config)
if err != nil {
t.Fatal(err)
}
pc, err := consumer.ConsumePartition("apinto", 0, sarama.OffsetNewest)
if err != nil {
t.Fatal(err)
}
for msg := range pc.Messages() {
t.Log("key:", string(msg.Key), "value:", string(msg.Value), "offset:", msg.Offset, "partition:", msg.Partition, "timestamp:", msg.Timestamp)
}
}

View File

@@ -104,7 +104,9 @@ func (e *executor) DoHttpFilter(ctx http_service.IHttpContext, next eocontext.IC
ctx.SetLabel("file_name", h.Filename) ctx.SetLabel("file_name", h.Filename)
ctx.SetLabel("file_suffix", suffix) ctx.SetLabel("file_suffix", suffix)
ctx.WithValue("file_size", h.Size) ctx.WithValue("file_size", h.Size)
if h.Size > e.largeWarn {
ctx.SetLabel("file_size_warn", e.largeWarnStr)
}
break break
} }
} }

View File

@@ -19,21 +19,21 @@ type Addr struct {
} }
func resolveAddr(scheme string, addr string) (*Addr, error) { func resolveAddr(scheme string, addr string) (*Addr, error) {
as := strings.Split(addr, ":")
if len(as) < 2 {
if scheme == "http" {
addr = fmt.Sprintf("%s:80", addr)
} else if scheme == "https" {
addr = fmt.Sprintf("%s:443", addr)
}
}
tcpAddr, err := net.ResolveTCPAddr("tcp", addr) tcpAddr, err := net.ResolveTCPAddr("tcp", addr)
if err != nil { if err != nil {
return nil, err return nil, err
} }
port := tcpAddr.Port
if port == 0 {
if scheme == "http" {
port = 80
} else if scheme == "https" {
port = 443
}
}
return &Addr{ return &Addr{
IP: tcpAddr.IP, IP: tcpAddr.IP,
Port: port, Port: tcpAddr.Port,
}, nil }, nil
} }

View File

@@ -147,10 +147,11 @@ func (ctx *cloneContext) SendTo(scheme string, node eoscContext.INode, timeout t
agent.setStatusCode(504) agent.setStatusCode(504)
} else { } else {
agent.setStatusCode(ctx.response.Response.StatusCode()) agent.setStatusCode(ctx.response.Response.StatusCode())
}
agent.responseBody = string(ctx.response.Response.Body())
agent.setRemoteIP(tcpAddr.IP.String()) agent.setRemoteIP(tcpAddr.IP.String())
agent.setRemotePort(tcpAddr.Port) agent.setRemotePort(tcpAddr.Port)
}
agent.responseBody = string(ctx.response.Response.Body())
agent.setResponseLength(ctx.response.Response.Header.ContentLength()) agent.setResponseLength(ctx.response.Response.Header.ContentLength())
ctx.proxyRequests = append(ctx.proxyRequests, agent) ctx.proxyRequests = append(ctx.proxyRequests, agent)

View File

@@ -151,15 +151,14 @@ func (ctx *HttpContext) SendTo(scheme string, node eoscContext.INode, timeout ti
agent.setStatusCode(504) agent.setStatusCode(504)
} else { } else {
ctx.response.ResponseHeader.refresh() ctx.response.ResponseHeader.refresh()
agent.setRemoteIP(tcpAddr.IP.String())
agent.setRemotePort(tcpAddr.Port)
agent.setStatusCode(ctx.fastHttpRequestCtx.Response.StatusCode()) agent.setStatusCode(ctx.fastHttpRequestCtx.Response.StatusCode())
} }
agent.responseBody = string(ctx.response.Response.Body()) agent.responseBody = string(ctx.response.Response.Body())
agent.setRemoteIP(tcpAddr.IP.String())
agent.setRemotePort(tcpAddr.Port)
agent.setResponseLength(ctx.fastHttpRequestCtx.Response.Header.ContentLength()) agent.setResponseLength(ctx.fastHttpRequestCtx.Response.Header.ContentLength())
ctx.response.remoteIP = tcpAddr.IP.String()
ctx.response.remotePort = tcpAddr.Port
ctx.proxyRequests = append(ctx.proxyRequests, agent) ctx.proxyRequests = append(ctx.proxyRequests, agent)
return ctx.response.responseError return ctx.response.responseError