Supplement nats related code.

This commit is contained in:
yqchilde
2021-10-13 19:38:11 +08:00
parent ca7ca5ed5f
commit dd08a4aac5
8 changed files with 310 additions and 12 deletions

1
go.mod
View File

@@ -41,6 +41,7 @@ require (
github.com/lib/pq v1.3.0 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mattn/go-sqlite3 v2.0.1+incompatible // indirect
github.com/nats-io/nats.go v1.9.1
github.com/opentracing/opentracing-go v1.2.0
github.com/openzipkin-contrib/zipkin-go-opentracing v0.4.5
github.com/openzipkin/zipkin-go v0.2.2

5
go.sum
View File

@@ -448,11 +448,16 @@ github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJ
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/nats-io/jwt v0.3.0/go.mod h1:fRYCDE99xlTsqUzISS1Bi75UBJ6ljOJQOAAu5VglpSg=
github.com/nats-io/jwt v0.3.2 h1:+RB5hMpXUUA2dfxuhBTEkMOrYmM+gKIZYS1KjSostMI=
github.com/nats-io/jwt v0.3.2/go.mod h1:/euKqTS1ZD+zzjYrY7pseZrTtWQSjujC7xjPc8wL6eU=
github.com/nats-io/nats-server/v2 v2.1.2 h1:i2Ly0B+1+rzNZHHWtD4ZwKi+OU5l+uQo1iDHZ2PmiIc=
github.com/nats-io/nats-server/v2 v2.1.2/go.mod h1:Afk+wRZqkMQs/p45uXdrVLuab3gwv3Z8C4HTBu8GD/k=
github.com/nats-io/nats.go v1.9.1 h1:ik3HbLhZ0YABLto7iX80pZLPw/6dx3T+++MZJwLnMrQ=
github.com/nats-io/nats.go v1.9.1/go.mod h1:ZjDU1L/7fJ09jvUSRVBR2e7+RnLiiIQyqyzEE/Zbp4w=
github.com/nats-io/nkeys v0.1.0/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w=
github.com/nats-io/nkeys v0.1.3 h1:6JrEfig+HzTH85yxzhSVbjHRJv9cn0p6n3IngIcM5/k=
github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78=

View File

@@ -3,6 +3,7 @@
- RocketMQ
- RabbitMQ
- Kafka
- Nats
## 作用
@@ -15,6 +16,7 @@
- RocketMQ Go客户端: https://github.com/apache/rocketmq-client-go
- kafka-go: https://github.com/segmentio/kafka-go
- Nats.go: github.com/nats-io/nats.go
> 如果是阿里云RocketMQ: 可以使用官方自己出的库

View File

@@ -1,18 +1,22 @@
package queue
import (
"encoding/base64"
"flag"
"fmt"
"log"
"os"
"strings"
"sync"
"testing"
"github.com/go-eagle/eagle/pkg/testing/lich"
"time"
"github.com/Shopify/sarama"
"github.com/go-eagle/eagle/pkg/queue/kafka"
"github.com/go-eagle/eagle/pkg/queue/nats"
"github.com/go-eagle/eagle/pkg/queue/rabbitmq"
"github.com/go-eagle/eagle/pkg/testing/lich"
)
func TestMain(m *testing.M) {
@@ -84,3 +88,54 @@ func TestKafka(t *testing.T) {
kafka.NewConsumer(config, logger, topic, groupID, brokers).Consume()
})
}
func TestNats(t *testing.T) {
var (
addr = "nats://localhost:4222"
topic = "hello"
)
producer := nats.NewProducer(addr)
consumer := nats.NewConsumer(addr)
published := make(chan struct{})
received := make(chan struct{})
wg := sync.WaitGroup{}
wg.Add(2)
go func() {
for {
select {
case <-published:
time.Sleep(3 * time.Second)
if err := producer.Publish(topic, []byte("hello nats")); err != nil {
t.Error(err)
}
t.Log("producer handler publish msg: ", "hello nats")
case <-received:
wg.Done()
break
}
}
}()
go func() {
for {
select {
default:
handler := func(message []byte) error {
decodeMessage, _ := base64.StdEncoding.DecodeString(strings.Trim(string(message), "\""))
t.Log("consumer handler receive msg: ", string(decodeMessage))
received <- struct{}{}
wg.Done()
return nil
}
if err := consumer.Consume(topic, handler); err != nil {
t.Error(err)
}
time.Sleep(5 * time.Second)
}
}
}()
published <- struct{}{}
wg.Wait()
}

View File

@@ -1,12 +1,126 @@
package nats
import (
"log"
"time"
"github.com/nats-io/nats.go"
)
type Consumer struct {
addr string
conn *nats.Conn
subscribe *nats.Subscription
connClose chan bool
quit chan struct{}
}
func NewConsumer() *Consumer {
return &Consumer{}
func NewConsumer(addr string) *Consumer {
c := &Consumer{
addr: addr,
connClose: make(chan bool),
quit: make(chan struct{}),
}
if err := c.Start(); err != nil {
log.Println("nats start consumer err: ", err)
}
return c
}
func (c *Consumer) Consume() {
func (c *Consumer) Start() error {
if err := c.Run(); err != nil {
return err
}
log.Println("nats consumer connected and running!")
go c.ReConnect()
return nil
}
func (c *Consumer) Stop() {
close(c.quit)
if !c.conn.IsClosed() {
_ = c.subscribe.Unsubscribe()
c.conn.Close()
}
}
func (c *Consumer) Run() error {
var err error
opts := nats.Options{
MaxReconnect: -1,
ClosedCB: func(conn *nats.Conn) {
c.connClose <- true
log.Println("nats consumer - connection closed cb")
},
DisconnectedErrCB: func(conn *nats.Conn, err error) {
log.Println("nats consumer - connection disconnected err cb")
},
ReconnectedCB: func(conn *nats.Conn) {
log.Println("nats consumer - connection reconnected cb")
},
AsyncErrorCB: func(conn *nats.Conn, sub *nats.Subscription, err error) {
log.Println("nats consumer - connection async err cb")
},
}
c.conn, err = opts.Connect()
return err
}
func (c *Consumer) ReConnect() {
for {
select {
case closed := <-c.connClose:
if closed {
log.Println("nats consumer - connection closed")
}
case <-c.quit:
return
}
if !c.conn.IsClosed() {
c.conn.Close()
}
quit:
for {
select {
case <-c.quit:
return
default:
log.Println("nats consumer - reconnect")
if err := c.Run(); err != nil {
log.Println("nats consumer - failCheck: ", err)
// sleep 5s reconnect
time.Sleep(time.Second * 5)
continue
}
log.Println("nats consumer connected and running!")
break quit
}
}
}
}
func (c *Consumer) Consume(topic string, handler interface{}) error {
encodeConn, err := nats.NewEncodedConn(c.conn, nats.JSON_ENCODER)
if err != nil {
return err
}
if c.subscribe != nil {
_ = c.subscribe.Unsubscribe()
c.subscribe = nil
}
c.subscribe, err = encodeConn.Subscribe(topic, handler)
if err != nil {
return err
}
_ = encodeConn.Flush()
return nil
}

13
pkg/queue/nats/init.go Normal file
View File

@@ -0,0 +1,13 @@
package nats
var (
Queue *Producer
)
type Config struct {
Addr string `mapstructure:"name"`
}
func Init(cfg *Config) {
Queue = NewProducer(cfg.Addr)
}

View File

@@ -1,17 +1,112 @@
package nats
import "log"
import (
"log"
"time"
"github.com/nats-io/nats.go"
)
type Producer struct {
topic string
addr string
conn *nats.Conn
connClose chan bool
quit chan struct{}
}
func NewProducer(logger *log.Logger, topic string) *Producer {
return &Producer{
topic: topic,
func NewProducer(addr string) *Producer {
p := &Producer{
addr: addr,
connClose: make(chan bool),
quit: make(chan struct{}),
}
if err := p.Start(); err != nil {
log.Println("nats start producer err: ", err)
}
return p
}
func (p *Producer) Start() error {
if err := p.Run(); err != nil {
return err
}
log.Println("nats producer connected and running!")
go p.ReConnect()
return nil
}
func (p *Producer) Stop() {
close(p.quit)
if !p.conn.IsClosed() {
p.conn.Close()
}
}
func (p *Producer) Publish(message string) {
func (p *Producer) Run() error {
var err error
opts := nats.Options{
MaxReconnect: -1,
ClosedCB: func(conn *nats.Conn) {
p.connClose <- true
log.Println("nats producer - connection closed cb")
},
DisconnectedErrCB: func(conn *nats.Conn, err error) {
log.Println("nats producer - connection disconnected err cb")
},
ReconnectedCB: func(conn *nats.Conn) {
log.Println("nats producer - connection reconnected cb")
},
AsyncErrorCB: func(conn *nats.Conn, sub *nats.Subscription, err error) {
log.Println("nats producer - connection async err cb")
},
}
p.conn, err = opts.Connect()
return err
}
func (p *Producer) ReConnect() {
for {
select {
case closed := <-p.connClose:
if closed {
log.Println("nats producer - connection closed")
}
case <-p.quit:
return
}
if !p.conn.IsClosed() {
p.conn.Close()
}
quit:
for {
select {
case <-p.quit:
return
default:
log.Println("nats producer - reconnect")
if err := p.Run(); err != nil {
log.Println("nats producer - failCheck: ", err)
// sleep 5s reconnect
time.Sleep(time.Second * 5)
continue
}
log.Println("nats producer connected and running!")
break quit
}
}
}
}
func (p *Producer) Publish(topic string, data interface{}) error {
encodeConn, err := nats.NewEncodedConn(p.conn, nats.JSON_ENCODER)
if err != nil {
return err
}
return encodeConn.Publish(topic, data)
}

View File

@@ -0,0 +1,13 @@
version: "3.3"
services:
nats:
container_name: "nats"
image: "nats"
hostname: "nats"
ports:
- "4222:4222"
- "6222:6222"
- "8222:8222"
labels:
NAME: "nats"