diff --git a/ROADMAP.md b/ROADMAP.md index 1f0685e..a6c1eac 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -8,4 +8,8 @@ - Nats/JetStream - Redis - NSQ -- Rabbitmq/AMQP \ No newline at end of file +- Rabbitmq/AMQP + +### Metrics +- Services/Edges +- Streams \ No newline at end of file diff --git a/go.mod b/go.mod index f1319d3..8ebf11b 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/IBM/sarama v1.43.0 github.com/go-kratos/kratos/v2 v2.7.2 github.com/jumboframes/armorigo v0.4.0-rc.1 + github.com/rabbitmq/amqp091-go v1.9.0 github.com/singchia/geminio v1.1.5-rc.2 github.com/singchia/go-timer/v2 v2.2.1 github.com/soheilhy/cmux v0.1.5 diff --git a/go.sum b/go.sum index 9b4c711..1e4a193 100644 --- a/go.sum +++ b/go.sum @@ -102,6 +102,8 @@ github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rabbitmq/amqp091-go v1.9.0 h1:qrQtyzB4H8BQgEuJwhmVQqVHB9O4+MNDJCCAcpc3Aoo= +github.com/rabbitmq/amqp091-go v1.9.0/go.mod h1:+jPrT9iY2eLjRaMSRHUhc3z14E/l85kv/f+6luSD3pc= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= @@ -130,6 +132,8 @@ github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXl github.com/swaggo/swag v1.16.3 h1:PnCYjPCah8FK4I26l2F/KQ4yz3sILcVUN3cTlBFA9Pg= github.com/swaggo/swag v1.16.3/go.mod h1:DImHIuOFXKpMFAQjcC7FG4m3Dg4+QuUgUzJmKjI/gRk= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A= +go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= diff --git a/pkg/config/config.go b/pkg/config/config.go index 80ff646..783bf9e 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -5,7 +5,6 @@ import ( "fmt" "io" "os" - "time" "github.com/IBM/sarama" armio "github.com/jumboframes/armorigo/io" @@ -82,10 +81,13 @@ type MQ struct { } type Kafka struct { - Addrs []string `yaml:"addrs"` + Enable bool `yaml:"enable"` + Addrs []string `yaml:"addrs"` // Producer is the namespace for configuration related to producing messages, // used by the Producer. Producer struct { + // topics to notify frontier which topics to allow to publish + Topics []string // The maximum permitted size of a message (defaults to 1000000). Should be // set equal to or smaller than the broker's `message.max.bytes`. MaxMessageBytes int @@ -98,7 +100,7 @@ type Kafka struct { // RequiredAcks is set to WaitForAll or a number > 1. Only supports // millisecond resolution, nanoseconds will be truncated. Equivalent to // the JVM producer's `request.timeout.ms` setting. - Timeout time.Duration + Timeout int // The type of compression to use on messages (defaults to no compression). // Similar to `compression.codec` setting of the JVM producer. Compression sarama.CompressionCodec @@ -123,7 +125,7 @@ type Kafka struct { Messages int // The best-effort frequency of flushes. Equivalent to // `queue.buffering.max.ms` setting of JVM producer. - Frequency time.Duration + Frequency int // The maximum number of messages the producer will send in a single // broker request. Defaults to 0 for unlimited. Similar to // `queue.buffering.max.messages` in the JVM producer. @@ -136,13 +138,80 @@ type Kafka struct { // How long to wait for the cluster to settle between retries // (default 100ms). Similar to the `retry.backoff.ms` setting of the // JVM producer. - Backoff time.Duration + Backoff int } } } +type AMQP struct { + Enable bool `yaml:"enable"` + // TODO we don't support multiple addresses for now + Addrs []string `yaml:"addrs"` + // Vhost specifies the namespace of permissions, exchanges, queues and + // bindings on the server. Dial sets this to the path parsed from the URL. + Vhost string + // 0 max channels means 2^16 - 1 + ChannelMax int + // 0 max bytes means unlimited + FrameSize int + // less than 1s uses the server's interval + Heartbeat int + // Connection locale that we expect to always be en_US + // Even though servers must return it as per the AMQP 0-9-1 spec, + // we are not aware of it being used other than to satisfy the spec requirements + Locale string + // exchange to declare + Exchanges []struct { + // exchange name to declare + Name string + // direct topic fanout headers, default direct + Kind string + Durable bool + AutoDelete bool + Internal bool + NoWait bool + } + // queues to declare, default nil + Queues []struct { + Name string + Durable bool + AutoDelete bool + Exclustive bool + NoWait bool + } + // queue bindings to exchange, default nil + QueueBindings []struct { + QueueName string + ExchangeName string + BindingKey string + NoWait bool + } + Producer struct { + // there are actually topics + RoutingKeys []string + Exchange string + Mandatory bool + Immediate bool + + // message related + Headers map[string]interface{} + // properties + ContentType string // MIME content type + ContentEncoding string // MIME content encoding + DeliveryMode uint8 // Transient (0 or 1) or Persistent (2) + Priority uint8 // 0 to 9 + ReplyTo string // address to to reply to (ex: RPC) + Expiration string // message expiration spec + Type string // message type name + UserId string // creating user id - ex: "guest" + AppId string // creating application id + + } +} + type MQM struct { Kafka Kafka `yaml:"kafka"` + AMQP AMQP `yaml:"amqp"` } // exchange diff --git a/pkg/mq/mq_amqp.go b/pkg/mq/mq_amqp.go new file mode 100644 index 0000000..dc67db2 --- /dev/null +++ b/pkg/mq/mq_amqp.go @@ -0,0 +1,126 @@ +package mq + +import ( + "context" + "time" + + "github.com/google/uuid" + amqp "github.com/rabbitmq/amqp091-go" + "github.com/singchia/frontier/pkg/apis" + "github.com/singchia/frontier/pkg/config" + "github.com/singchia/geminio" + "k8s.io/klog/v2" +) + +type mqAMQP struct { + // TODO reconnect + conn *amqp.Connection + channel *amqp.Channel + + // conf + conf *config.AMQP +} + +func newAMQP(config *config.Configuration) (*mqAMQP, error) { + conf := config.MQM.AMQP + aconf := initAMQPConfig(&conf) + + url := "amqp://" + conf.Addrs[0] + conn, err := amqp.DialConfig(url, *aconf) + if err != nil { + return nil, err + } + channel, err := conn.Channel() + if err != nil { + return nil, err + } + // exchanges declare + if conf.Exchanges != nil { + for _, elem := range conf.Exchanges { + err = channel.ExchangeDeclare(elem.Name, elem.Kind, elem.Durable, elem.AutoDelete, elem.Internal, elem.NoWait, nil) + if err != nil { + klog.Errorf("exchange declare err: %s, name: %s, kind: %s", err, elem.Name, elem.Kind) + } + } + } + // queues declare + if conf.Queues != nil { + for _, elem := range conf.Queues { + _, err = channel.QueueDeclare(elem.Name, elem.Durable, elem.AutoDelete, elem.Exclustive, elem.NoWait, nil) + if err != nil { + klog.Errorf("queue declare err: %s, name: %s", err, elem.Name) + } + } + } + // queue bindings + if conf.QueueBindings != nil { + for _, elem := range conf.QueueBindings { + err = channel.QueueBind(elem.QueueName, elem.BindingKey, elem.ExchangeName, elem.NoWait, nil) + if err != nil { + klog.Errorf("queue bind err: %s, queue name: %s, binding key: %s, exchange name: %s", err, elem.QueueName, elem.BindingKey, elem.ExchangeName) + } + } + } + return &mqAMQP{ + conn: conn, + channel: channel, + }, nil +} + +func initAMQPConfig(conf *config.AMQP) *amqp.Config { + aconf := &amqp.Config{} + if conf.Vhost != "" { + aconf.Vhost = conf.Vhost + } + if conf.ChannelMax != 0 { + aconf.ChannelMax = conf.ChannelMax + } + if conf.FrameSize != 0 { + aconf.FrameSize = conf.FrameSize + } + if conf.Heartbeat != 0 { + aconf.Heartbeat = time.Duration(conf.Heartbeat) * time.Second + } + if conf.Locale != "" { + aconf.Locale = conf.Locale + } + return aconf +} + +func (mq *mqAMQP) Produce(topic string, data []byte, opts ...apis.OptionProduce) error { + opt := &apis.ProduceOption{} + for _, fun := range opts { + fun(opt) + } + message := opt.Origin.(geminio.Message) + + publishing := amqp.Publishing{ + ContentType: mq.conf.Producer.ContentType, + ContentEncoding: mq.conf.Producer.ContentEncoding, + DeliveryMode: mq.conf.Producer.DeliveryMode, + Priority: mq.conf.Producer.Priority, + ReplyTo: mq.conf.Producer.ReplyTo, + Expiration: mq.conf.Producer.Expiration, + MessageId: uuid.New().String(), + Timestamp: time.Now(), + Type: mq.conf.Producer.Type, + UserId: mq.conf.Producer.UserId, + AppId: mq.conf.Producer.AppId, + Body: data, + } + // TODO async or confirmation handle + err := mq.channel.PublishWithContext(context.TODO(), + mq.conf.Producer.Exchange, topic, mq.conf.Producer.Mandatory, mq.conf.Producer.Immediate, publishing) + if err != nil { + klog.Errorf("mq amqp producer, publish err: %s", err) + message.Error(err) + return err + } + message.Done() + return nil +} + +func (mq *mqAMQP) Close() error { + mq.channel.Close() + return mq.conn.Close() +} diff --git a/pkg/mq/mq_kafka.go b/pkg/mq/mq_kafka.go index d29ee94..fdf68c7 100644 --- a/pkg/mq/mq_kafka.go +++ b/pkg/mq/mq_kafka.go @@ -1,6 +1,8 @@ package mq import ( + "time" + "github.com/IBM/sarama" "github.com/singchia/frontier/pkg/apis" "github.com/singchia/frontier/pkg/config" @@ -12,9 +14,9 @@ type mqKafka struct { producer sarama.AsyncProducer } -func newMQKafka(conf *config.Configuration) (*mqKafka, error) { - kafka := conf.MQM.Kafka - sconf := initConfig(&kafka) +func newKafka(config *config.Configuration) (*mqKafka, error) { + kafka := config.MQM.Kafka + sconf := initKafkaConfig(&kafka) producer, err := sarama.NewAsyncProducer(kafka.Addrs, sconf) if err != nil { @@ -53,48 +55,48 @@ func newMQKafka(conf *config.Configuration) (*mqKafka, error) { }, nil } -func initConfig(kafka *config.Kafka) *sarama.Config { +func initKafkaConfig(conf *config.Kafka) *sarama.Config { sconf := sarama.NewConfig() sconf.Producer.Return.Successes = true sconf.Producer.Return.Errors = true - if kafka.Producer.MaxMessageBytes != 0 { - sconf.Producer.MaxMessageBytes = kafka.Producer.MaxMessageBytes + if conf.Producer.MaxMessageBytes != 0 { + sconf.Producer.MaxMessageBytes = conf.Producer.MaxMessageBytes } - if kafka.Producer.RequiredAcks != 0 { - sconf.Producer.RequiredAcks = kafka.Producer.RequiredAcks + if conf.Producer.RequiredAcks != 0 { + sconf.Producer.RequiredAcks = conf.Producer.RequiredAcks } - if kafka.Producer.Timeout != 0 { - sconf.Producer.Timeout = kafka.Producer.Timeout + if conf.Producer.Timeout != 0 { + sconf.Producer.Timeout = time.Duration(conf.Producer.Timeout) * time.Second } - if kafka.Producer.Idempotent { - sconf.Producer.Idempotent = kafka.Producer.Idempotent + if conf.Producer.Idempotent { + sconf.Producer.Idempotent = conf.Producer.Idempotent } // compression - if kafka.Producer.Compression != 0 { - sconf.Producer.Compression = kafka.Producer.Compression + if conf.Producer.Compression != 0 { + sconf.Producer.Compression = conf.Producer.Compression } - if kafka.Producer.CompressionLevel != 0 { - sconf.Producer.CompressionLevel = kafka.Producer.CompressionLevel + if conf.Producer.CompressionLevel != 0 { + sconf.Producer.CompressionLevel = conf.Producer.CompressionLevel } // retry - if kafka.Producer.Retry.Backoff != 0 { - sconf.Producer.Retry.Backoff = kafka.Producer.Retry.Backoff + if conf.Producer.Retry.Backoff != 0 { + sconf.Producer.Retry.Backoff = time.Duration(conf.Producer.Retry.Backoff) * time.Second } - if kafka.Producer.Retry.Max != 0 { - sconf.Producer.Retry.Max = kafka.Producer.Retry.Max + if conf.Producer.Retry.Max != 0 { + sconf.Producer.Retry.Max = conf.Producer.Retry.Max } // flush - if kafka.Producer.Flush.Bytes != 0 { - sconf.Producer.Flush.Bytes = kafka.Producer.Flush.Bytes + if conf.Producer.Flush.Bytes != 0 { + sconf.Producer.Flush.Bytes = conf.Producer.Flush.Bytes } - if kafka.Producer.Flush.Frequency != 0 { - sconf.Producer.Flush.Frequency = kafka.Producer.Flush.Frequency + if conf.Producer.Flush.Frequency != 0 { + sconf.Producer.Flush.Frequency = time.Duration(conf.Producer.Flush.Frequency) * time.Second } - if kafka.Producer.Flush.MaxMessages != 0 { - sconf.Producer.Flush.MaxMessages = kafka.Producer.Flush.MaxMessages + if conf.Producer.Flush.MaxMessages != 0 { + sconf.Producer.Flush.MaxMessages = conf.Producer.Flush.MaxMessages } - if kafka.Producer.Flush.Messages != 0 { - sconf.Producer.Flush.Messages = kafka.Producer.Flush.Messages + if conf.Producer.Flush.Messages != 0 { + sconf.Producer.Flush.Messages = conf.Producer.Flush.Messages } return sconf }