diff --git a/go.mod b/go.mod index a1f76ab..656d3c9 100644 --- a/go.mod +++ b/go.mod @@ -22,6 +22,7 @@ require ( github.com/nsqio/go-nsq v1.0.8 github.com/olivere/elastic v6.2.31+incompatible github.com/pkg/errors v0.9.1 + github.com/rabbitmq/amqp091-go v1.8.1 github.com/spf13/cobra v1.0.0 github.com/syndtr/goleveldb v1.0.0 github.com/usthooz/gutil v0.0.0-20220415021411-9948fc1f4e88 @@ -59,7 +60,6 @@ require ( github.com/patrickmn/go-cache v2.1.0+incompatible // indirect github.com/sirupsen/logrus v1.4.0 // indirect github.com/spf13/pflag v1.0.3 // indirect - github.com/stretchr/testify v1.6.1 // indirect github.com/tidwall/gjson v1.13.0 // indirect github.com/tidwall/match v1.1.1 // indirect github.com/tidwall/pretty v1.2.0 // indirect diff --git a/go.sum b/go.sum index 9a27558..e5a5ed0 100644 --- a/go.sum +++ b/go.sum @@ -220,6 +220,8 @@ github.com/prometheus/common v0.4.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y8 github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.0-20190507164030-5867b95ac084/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= +github.com/rabbitmq/amqp091-go v1.8.1 h1:RejT1SBUim5doqcL6s7iN6SBmsQqyTgXb1xMlH0h1hA= +github.com/rabbitmq/amqp091-go v1.8.1/go.mod h1:+jPrT9iY2eLjRaMSRHUhc3z14E/l85kv/f+6luSD3pc= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= @@ -242,11 +244,13 @@ github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnIn github.com/spf13/viper v1.4.0/go.mod h1:PTJ7Z/lr49W6bUbkmS1V3by4uWynFiR9p7+dSq/yZzE= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= -github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= -github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/syndtr/goleveldb v1.0.0 h1:fBdIW9lB4Iz0n9khmH8w27SJ3QEJ7+IgjPEwGSZiFdE= github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ= github.com/tidwall/gjson v1.13.0 h1:3TFY9yxOQShrvmjdM76K+jc66zJeT6D3/VFFYCGQf7M= @@ -273,6 +277,8 @@ go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.5.1/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A= +go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4= go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM= @@ -420,8 +426,9 @@ gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gorm.io/driver/mysql v1.0.1 h1:omJoilUzyrAp0xNoio88lGJCroGdIOen9hq2A/+3ifw= gorm.io/driver/mysql v1.0.1/go.mod h1:KtqSthtg55lFp3S5kUXqlGaelnWpKitn4k1xZTnoiPw= gorm.io/gorm v1.9.19/go.mod h1:0HFTzE/SqkGTzK6TlDPPQbAYCluiVvhzoA1+aVyzenw= diff --git a/myrabbitmq/myrocketmq.go b/myrabbitmq/myrocketmq.go new file mode 100644 index 0000000..0adf76b --- /dev/null +++ b/myrabbitmq/myrocketmq.go @@ -0,0 +1,180 @@ +package myrabbitmq + +import ( + "fmt" + + amqp "github.com/rabbitmq/amqp091-go" + "github.com/xxjwxc/public/message" +) + +type HandlerRocketMqRead func(msg []byte) error //ID , MESSAGEID,内容 + +// MQURL 格式 amqp://账号:密码@rabbitmq服务器地址:端口号/vhost (默认是5672端口) +// 端口可在 /etc/rabbitmq/rabbitmq-env.conf 配置文件设置,也可以启动后通过netstat -tlnp查看 +const MQURL = "amqp://admin:huan91uncc@172.21.138.131:5672/" + +type MyRabbitMQ struct { + Conn *amqp.Connection + Channel *amqp.Channel + // 队列名称 + QueueName string + // 交换机 + Exchange string + // routing Key + RoutingKey string + //MQ链接字符串 + Mqurl string +} + +// NewRabbitMQ 创建结构体实例 +func NewRabbitMQ(mqurl string) (*MyRabbitMQ, error) { + rabbitMQ := MyRabbitMQ{ + Mqurl: mqurl, + } + var err error + //创建rabbitmq连接 + rabbitMQ.Conn, err = amqp.Dial(rabbitMQ.Mqurl) + if err != nil { + return nil, err + } + + //创建Channel + rabbitMQ.Channel, err = rabbitMQ.Conn.Channel() + if err != nil { + return nil, err + } + + return &rabbitMQ, nil +} + +// 释放资源,建议NewRabbitMQ获取实例后 配合defer使用 +func (mq *MyRabbitMQ) Close() error { + if mq.Conn == nil || mq.Channel == nil { + return message.GetError(message.InValidOp) + } + + err := mq.Conn.Close() + if err != nil { + return err + } + + return mq.Channel.Close() +} + +// NewProducer 创建生产者 +func (mq *MyRabbitMQ) NewProducer(queueName, exchange, routingKey string) (*amqp.Channel, error) { + mq.QueueName = queueName + mq.Exchange = exchange + mq.RoutingKey = routingKey + // 1.声明队列 + /* + 如果只有一方声明队列,可能会导致下面的情况: + a)消费者是无法订阅或者获取不存在的MessageQueue中信息 + b)消息被Exchange接受以后,如果没有匹配的Queue,则会被丢弃 + + 为了避免上面的问题,所以最好选择两方一起声明 + ps:如果客户端尝试建立一个已经存在的消息队列,Rabbit MQ不会做任何事情,并返回客户端建立成功的 + */ + _, err := mq.Channel.QueueDeclare( // 返回的队列对象内部记录了队列的一些信息,这里没什么用 + mq.QueueName, // 队列名 + true, // 是否持久化 + false, // 是否自动删除(前提是至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除。注意:生产者客户端创建这个队列,或者没有消费者客户端与这个队列连接时,都不会自动删除这个队列) + false, // 是否为排他队列(排他的队列仅对“首次”声明的conn可见[一个conn中的其他channel也能访问该队列],conn结束后队列删除) + false, // 是否阻塞 + nil, // 额外属性(我还不会用) + ) + if err != nil { + return nil, err + } + + // 2.声明交换器 + err = mq.Channel.ExchangeDeclare( + mq.Exchange, //交换器名 + "topic", //exchange type:一般用fanout、direct、topic + true, // 是否持久化 + false, // 是否自动删除(自动删除的前提是至少有一个队列或者交换器与这和交换器绑定,之后所有与这个交换器绑定的队列或者交换器都与此解绑) + false, // 设置是否内置的。true表示是内置的交换器,客户端程序无法直接发送消息到这个交换器中,只能通过交换器路由到交换器这种方式 + false, // 是否阻塞 + nil, // 额外属性 + ) + if err != nil { + return nil, err + } + + // 3.建立Binding(可随心所欲建立多个绑定关系) + err = mq.Channel.QueueBind( + mq.QueueName, // 绑定的队列名称 + mq.RoutingKey, // bindkey 用于消息路由分发的key + mq.Exchange, // 绑定的exchange名 + false, // 是否阻塞 + nil, // 额外属性 + ) + if err != nil { + return nil, err + } + + return mq.Channel, nil +} + +// SendMessage 发送消息(level 代表延迟级别) +func (mq *MyRabbitMQ) SendMessage(msg []byte) error { + // 4.发送消息 + return mq.Channel.Publish( + mq.Exchange, // 交换器名 + mq.RoutingKey, // routing key + false, // 是否返回消息(匹配队列),如果为true, 会根据binding规则匹配queue,如未匹配queue,则把发送的消息返回给发送者 + false, // 是否返回消息(匹配消费者),如果为true, 消息发送到queue后发现没有绑定消费者,则把发送的消息返回给发送者 + amqp.Publishing{ // 发送的消息,固定有消息体和一些额外的消息头,包中提供了封装对象 + ContentType: "text/plain", // 消息内容的类型 + Body: msg, // 消息内容 + }, + ) +} + +// 消费者 +func (mq *MyRabbitMQ) NewConsumer(queueName string) (<-chan amqp.Delivery, error) { + mq.QueueName = queueName + // 1.声明队列(两端都要声明,原因在生产者处已经说明) + _, err := mq.Channel.QueueDeclare( // 返回的队列对象内部记录了队列的一些信息,这里没什么用 + mq.QueueName, // 队列名 + true, // 是否持久化 + false, // 是否自动删除(前提是至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除。注意:生产者客户端创建这个队列,或者没有消费者客户端与这个队列连接时,都不会自动删除这个队列) + false, // 是否为排他队列(排他的队列仅对“首次”声明的conn可见[一个conn中的其他channel也能访问该队列],conn结束后队列删除) + false, // 是否阻塞 + nil, // 额外属性(我还不会用) + ) + if err != nil { + fmt.Println("声明队列失败", err) + return nil, err + } + + // 2.从队列获取消息(消费者只关注队列)consume方式会不断的从队列中获取消息 + msgChanl, err := mq.Channel.Consume( + mq.QueueName, // 队列名 + "", // 消费者名,用来区分多个消费者,以实现公平分发或均等分发策略 + false, // 是否自动应答 + false, // 是否排他 + false, // 是否接收只同一个连接中的消息,若为true,则只能接收别的conn中发送的消息 + false, // 队列消费是否阻塞 + nil, // 额外属性 + ) + if err != nil { + fmt.Println("获取消息失败", err) + return nil, err + } + + return msgChanl, nil +} + +func (m *MyRabbitMQ) Start(msgChanl <-chan amqp.Delivery, hand HandlerRocketMqRead) error { // 阻塞模式 + for msg := range msgChanl { + // 这里写你的处理逻辑 + // 获取到的消息是amqp.Delivery对象,从中可以获取消息信息 + if hand(msg.Body) == nil { + msg.Ack(true) // 主动应答 + } + + } + + return nil +} diff --git a/myrabbitmq/myrocketmq_test.go b/myrabbitmq/myrocketmq_test.go new file mode 100644 index 0000000..7d3c906 --- /dev/null +++ b/myrabbitmq/myrocketmq_test.go @@ -0,0 +1,57 @@ +package myrabbitmq + +import ( + "fmt" + "testing" + "time" + + "github.com/xxjwxc/public/message" +) + +func Test_NewAdmin(t *testing.T) { + topic := "xxjtest" + tag := "tagtest" + host := "amqp://admin:admin@192.155.1.151:5672/" + group := "nlp_cmd_train" + // 初始化mq + mq, err := NewRabbitMQ(host) + if err != nil { + fmt.Println(err) + } + defer mq.Close() // 完成任务释放资源 + _, err = mq.NewProducer(topic, tag, group) + fmt.Println(err) + // go func() { + // for i := 0; i < 1000; i++ { + // mq.SendMessage([]byte(fmt.Sprintf("this is xxj test %v", i))) + // time.Sleep(1 * time.Microsecond) + // } + // }() + // time.Sleep(10 * time.Minute) + + go func() { + ch, err := mq.NewConsumer(topic) // 消費者 + fmt.Println(err) + mq.Start(ch, func(msg []byte) error { + fmt.Println("消费者1", string(msg)) + time.Sleep(1 * time.Second) + return nil + }) + time.Sleep(10 * time.Minute) + }() + + go func() { + ch, err := mq.NewConsumer(topic) // 消費者 + fmt.Println(err) + mq.Start(ch, func(msg []byte) error { + fmt.Println("消费者2", string(msg)) + time.Sleep(1 * time.Second) + return message.GetError(message.ActvFailure) + }) + time.Sleep(10 * time.Minute) + + }() + + time.Sleep(10 * time.Minute) + mq.Close() +} diff --git a/myrocketmq/myrocketmq.go b/myrocketmq/myrocketmq.go index 208a183..8c50eba 100644 --- a/myrocketmq/myrocketmq.go +++ b/myrocketmq/myrocketmq.go @@ -44,10 +44,10 @@ func (m *MyRocketAdmin) CreateTopic(topic string, readQueueNums int, writeQueueN func (m *MyRocketAdmin) FetchPublishMessageQueues(topic string) ([]*primitive.MessageQueue, error) { if m.admin == nil { - return message.GetError(message.StateError) + return nil, message.GetError(message.StateError) } - - return m.admin.FetchPublishMessageQueues(context.Background(), admin.WithTopicCreate(topic)) + + return m.admin.FetchPublishMessageQueues(context.Background(), topic) } func (m *MyRocketAdmin) Close() error { diff --git a/myrocketmq/myrocketmq_test.go b/myrocketmq/myrocketmq_test.go index 2ea1c2a..4a9b899 100644 --- a/myrocketmq/myrocketmq_test.go +++ b/myrocketmq/myrocketmq_test.go @@ -15,8 +15,6 @@ func Test_NewAdmin(t *testing.T) { fmt.Println(err) adm.CreateTopic(topic, 0, 0) - - pwd, err := NewProducer(host, group, 2) // 生产者 fmt.Println(err) go func() {