feat(mqtt): 修改MQTT配置参数

This commit is contained in:
ylw
2023-02-22 17:37:04 +08:00
parent 769236237c
commit 394f01ab6b
5 changed files with 15 additions and 11 deletions

View File

@@ -3,10 +3,11 @@ package engine
import (
"context"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/panjf2000/ants"
"github.com/panjf2000/ants/v2"
"github.com/pkg/errors"
"github.com/yuelwish/mqtt-bridge/pkg/xmqtt"
"log"
"runtime"
)
const (
@@ -45,7 +46,7 @@ func (e *Engine) Dial() error {
}
func (e *Engine) handlerMessage(ctx context.Context) {
gPool, _ := ants.NewPool(ants.DEFAULT_ANTS_POOL_SIZE)
gPool, _ := ants.NewPool(runtime.NumCPU() * 10)
defer gPool.Release()
for msg := range e.MessageChan {
@@ -80,7 +81,7 @@ func (e *Engine) handlerMessage(ctx context.Context) {
}
if err = gPool.Submit(func() {
err := xmqtt.Send(client, msg.Topic, msg.Payload)
err := xmqtt.Send(client, msg.Topic, 0, false, msg.Payload)
if err != nil {
log.Printf("[send message] %s ==> %v t:%s failed: %v", msg.FromTag, tTag, msg.Topic, err)
} else {

View File

@@ -142,6 +142,6 @@ func (e *EngineHelper) BuildEngine() (*Engine, error) {
cliSubMap: cliSubMap,
filterTree: filterTree,
toTopicMap: toTopicMap,
MessageChan: make(chan *Message, 4096),
MessageChan: make(chan *Message, 1024),
}, nil
}

3
go.mod
View File

@@ -4,7 +4,7 @@ go 1.19
require (
github.com/eclipse/paho.mqtt.golang v1.4.2
github.com/panjf2000/ants v1.3.0
github.com/panjf2000/ants/v2 v2.7.1
github.com/pkg/errors v0.9.1
github.com/spf13/viper v1.15.0
)
@@ -16,6 +16,7 @@ require (
github.com/magiconair/properties v1.8.7 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/pelletier/go-toml/v2 v2.0.6 // indirect
github.com/rogpeppe/go-internal v1.9.0 // indirect
github.com/spf13/afero v1.9.3 // indirect
github.com/spf13/cast v1.5.0 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect

7
go.sum
View File

@@ -140,8 +140,8 @@ github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0V
github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0=
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/panjf2000/ants v1.3.0 h1:8pQ+8leaLc9lys2viEEr8md0U4RN6uOSUCE9bOYjQ9M=
github.com/panjf2000/ants v1.3.0/go.mod h1:AaACblRPzq35m1g3enqYcxspbbiOJJYaxU2wMpm1cXY=
github.com/panjf2000/ants/v2 v2.7.1 h1:qBy5lfSdbxvrR0yUnZfaEDjf0FlCw4ufsbcsxmE7r+M=
github.com/panjf2000/ants/v2 v2.7.1/go.mod h1:KIBmYG9QQX5U2qzFP/yQJaq/nSb6rahS9iEHkrCMgM8=
github.com/pelletier/go-toml/v2 v2.0.6 h1:nrzqCb7j9cDFj2coyLNLaZuJTLjWjlaz6nvTvIwycIU=
github.com/pelletier/go-toml/v2 v2.0.6/go.mod h1:eumQOmlWiOPt5WriQQqoM5y18pDHwha2N+QD+EUNTek=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
@@ -151,7 +151,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k=
github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
github.com/spf13/afero v1.9.3 h1:41FoI0fD7OR7mGcKE/aOiLkGreyf8ifIOQmJANWogMk=
github.com/spf13/afero v1.9.3/go.mod h1:iUV7ddyEEZPO5gA3zD4fJt6iStLlL+Lg4m2cihcDf8Y=
github.com/spf13/cast v1.5.0 h1:rj3WzYc11XZaIZMPKmwP96zkFEnnAmV8s6XbB2aY32w=

View File

@@ -17,7 +17,8 @@ func Init(clientIdPrefix, addr string, optFns ...func(opt *mqtt.ClientOptions))
opts := mqtt.NewClientOptions()
opts.AddBroker(addr)
opts.SetClientID(clientIdPrefix + "-" + strconv.FormatInt(time.Now().UnixNano(), 36))
opts.SetKeepAlive(10 * time.Second)
opts.SetKeepAlive(60 * time.Second)
opts.SetPingTimeout(5 * time.Second)
opts.SetMaxReconnectInterval(10 * time.Second)
opts.SetOnConnectHandler(func(client mqtt.Client) {
@@ -69,8 +70,8 @@ func UnSubscribe(client mqtt.Client, topic ...string) error {
return nil
}
func Send(client mqtt.Client, topic string, payload []byte) error {
token := client.Publish(topic, 1, false, payload)
func Send(client mqtt.Client, topic string, qos byte, retained bool, payload []byte) error {
token := client.Publish(topic, qos, retained, payload)
if token.Wait() && token.Error() != nil {
return token.Error()
}