From f79234d080fb05f3af4e2cce7a34982324a479b7 Mon Sep 17 00:00:00 2001 From: Liujian <824010343@qq.com> Date: Fri, 15 Dec 2023 11:10:58 +0800 Subject: [PATCH] =?UTF-8?q?nacos=E8=BE=93=E5=87=BA=E6=8A=A5=E9=94=99?= =?UTF-8?q?=E4=BF=A1=E6=81=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- drivers/discovery/nacos/nacos.go | 2 +- drivers/output/nsq/write.go | 8 +++++--- scope-manager/proxy.go | 4 +++- 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/drivers/discovery/nacos/nacos.go b/drivers/discovery/nacos/nacos.go index 04ea9cd0..1bb2a10a 100644 --- a/drivers/discovery/nacos/nacos.go +++ b/drivers/discovery/nacos/nacos.go @@ -103,7 +103,7 @@ func (n *executor) Start() error { for _, serviceName := range keys { res, err := n.client.GetNodeList(serviceName) if err != nil { - log.Warnf("executor %s:%w for service %s", n.Name(), discovery.ErrDiscoveryDown, serviceName) + log.Warnf("executor %s:%v for service %s,err:%v", n.Name(), discovery.ErrDiscoveryDown, serviceName, err) } //更新目标服务的节点列表 n.services.Set(serviceName, res) diff --git a/drivers/output/nsq/write.go b/drivers/output/nsq/write.go index 977eb9b2..4906a1bd 100644 --- a/drivers/output/nsq/write.go +++ b/drivers/output/nsq/write.go @@ -12,7 +12,7 @@ import ( ) const ( - maxBufSize = 32 + maxBufSize = 4 * 1024 * 1024 // 4M ) type Writer struct { @@ -26,6 +26,7 @@ type Writer struct { //multiBodySize int64 //multiBodies []multiBody bodyChan chan []byte + bodySize int poolChan chan *producerPool } @@ -141,14 +142,15 @@ func (n *Writer) doLoop() { } buf = append(buf, body) + n.bodySize += len(body) if pool == nil { timer.Reset(500 * time.Millisecond) continue } - if len(buf) >= maxBufSize { + if n.bodySize >= maxBufSize { tmp := buf buf = buf[:0] - + n.bodySize = 0 err := pool.Publish(n.topic, tmp) if err != nil { log.Error("nsq publish error: ", err.Error()) diff --git a/scope-manager/proxy.go b/scope-manager/proxy.go index f302cd57..b341e745 100644 --- a/scope-manager/proxy.go +++ b/scope-manager/proxy.go @@ -13,7 +13,9 @@ func newProxy() *_Proxy { } func (p *_Proxy) Set(values []interface{}) { - p.pointer.Store(&values) + tmp := make([]interface{}, 0, len(values)) + tmp = append(tmp, values...) + p.pointer.Store(&tmp) } func (p *_Proxy) List() []interface{} {