diff --git a/CHANGELOG.md b/CHANGELOG.md index 39a9d04..fec5969 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog +## v1.11.0 + +- chore: improve kakfa component + ## v1.10.0 - feat: support async flush log to disk diff --git a/config/local/kafka.yaml b/config/local/kafka.yaml new file mode 100644 index 0000000..7394b82 --- /dev/null +++ b/config/local/kafka.yaml @@ -0,0 +1,22 @@ +default: # 实例名称 + Version: "2.8.1" + RequiredAcks: 1 + Topic: "test-topic" + ConsumeTopic: + - "test-topic1" + - "test-topic2" + Brokers: + - "localhost:9092" + GroupID: "test-group" + Partitioner: "hash" + +order: # 另一个实例配置 + Version: "2.8.1" + RequiredAcks: 1 + Topic: "order-topic" + ConsumeTopic: + - "order-topic" + Brokers: + - "localhost:9092" + GroupID: "order-group" + Partitioner: "random" diff --git a/examples/queue/kafka/config/app.yaml b/examples/queue/kafka/config/app.yaml new file mode 100644 index 0000000..28e0df5 --- /dev/null +++ b/examples/queue/kafka/config/app.yaml @@ -0,0 +1,22 @@ +Name: eagle +Version: 1.0.0 +PprofPort: :5555 +Mode: debug # debug, release, test +JwtSecret: JWT_SECRET +JwtTimeout: 86400 +CookieName: jwt-token +SSL: true +CtxDefaultTimeout: 12 +CSRF: true +Debug: false +EnableTrace: false +EnablePprof: true + +HTTP: + Addr: :8080 + ReadTimeout: 3s + WriteTimeout: 3s +GRPC: + Addr: :9090 + ReadTimeout: 5s + WriteTimeout: 5s \ No newline at end of file diff --git a/examples/queue/kafka/config/kafka.yaml b/examples/queue/kafka/config/kafka.yaml new file mode 100644 index 0000000..7394b82 --- /dev/null +++ b/examples/queue/kafka/config/kafka.yaml @@ -0,0 +1,22 @@ +default: # 实例名称 + Version: "2.8.1" + RequiredAcks: 1 + Topic: "test-topic" + ConsumeTopic: + - "test-topic1" + - "test-topic2" + Brokers: + - "localhost:9092" + GroupID: "test-group" + Partitioner: "hash" + +order: # 另一个实例配置 + Version: "2.8.1" + RequiredAcks: 1 + Topic: "order-topic" + ConsumeTopic: + - "order-topic" + Brokers: + - "localhost:9092" + GroupID: "order-group" + Partitioner: "random" diff --git a/examples/queue/kafka/config/logger.yaml b/examples/queue/kafka/config/logger.yaml new file mode 100644 index 0000000..e7e0d05 --- /dev/null +++ b/examples/queue/kafka/config/logger.yaml @@ -0,0 +1,14 @@ +Development: false +DisableCaller: false +DisableStacktrace: false +Encoding: json # json or console +Level: info # 日志级别,INFO, WARN, ERROR +Name: eagle +Writers: console # 有2个可选项:file,console 选择file会将日志记录到logger_file指定的日志文件中,选择console会将日志输出到标准输出,当然也可以两者同时选择 +LoggerFile: /tmp/log/eagle.log +LoggerWarnFile: /tmp/log/eagle.wf.log +LoggerErrorFile: /tmp/log/eagle.err.log +LogRollingPolicy: daily +LogRotateDate: 1 +LogRotateSize: 1 +LogBackupCount: 7 \ No newline at end of file diff --git a/examples/queue/kafka/main.go b/examples/queue/kafka/main.go index 06ffcf3..2ab824f 100644 --- a/examples/queue/kafka/main.go +++ b/examples/queue/kafka/main.go @@ -1,27 +1,51 @@ package main import ( + "context" "log" - "os" - "github.com/Shopify/sarama" "github.com/go-eagle/eagle/pkg/queue/kafka" ) func main() { - var ( - config = sarama.NewConfig() - logger = log.New(os.Stderr, "[sarama_logger]", log.LstdFlags) - groupID = "sarama_consumer" - topic = "go-message-broker-topic" - brokers = []string{"localhost:9093"} - message = "Hello World Kafka!" - ) + // 1. 初始化配置 + kafka.Load() + defer kafka.Close() - // kafka publish message - kafka.NewProducer(config, logger, topic, brokers).Publish(message) + // 2. 获取配置信息(可选) + configs := kafka.GetConfig() + if len(configs) == 0 { + log.Fatal("No kafka config found") + } - // kafka consume message - kafka.NewConsumer(config, logger, topic, groupID, brokers).Consume() + // 3. 使用配置进行消息发布 + ctx := context.Background() + err := kafka.Publish(ctx, "default", "test-topic", "hello world") + if err != nil { + log.Printf("Failed to publish message: %v", err) + } + // 4. 使用配置进行消息消费 + handler := func(data []byte) error { + log.Printf("Received message: %s", string(data)) + return nil + } + + // 从默认实例消费 + go func() { + err := kafka.ConsumePartition(ctx, "default", "test-topic", handler) + if err != nil { + log.Printf("Failed to consume message: %v", err) + } + }() + + // 从order实例消费 + go func() { + err := kafka.ConsumePartition(ctx, "order", "order-topic", handler) + if err != nil { + log.Printf("Failed to consume message: %v", err) + } + }() + + select {} } diff --git a/go.mod b/go.mod index 150cc08..5f3ddfc 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/go-eagle/eagle go 1.22 require ( - github.com/Shopify/sarama v1.19.0 + github.com/IBM/sarama v1.45.1 github.com/alicebob/miniredis/v2 v2.15.1 github.com/cenkalti/backoff/v4 v4.2.1 github.com/dgraph-io/ristretto v0.1.0 @@ -44,7 +44,7 @@ require ( github.com/spf13/cast v1.4.1 github.com/spf13/pflag v1.0.5 github.com/spf13/viper v1.10.0 - github.com/stretchr/testify v1.9.0 + github.com/stretchr/testify v1.10.0 github.com/swaggo/gin-swagger v1.2.0 github.com/teris-io/shortid v0.0.0-20171029131806-771a37caa5cf github.com/toolkits/net v0.0.0-20160910085801-3f39ab6fe3ce @@ -66,8 +66,8 @@ require ( go.opentelemetry.io/otel/trace v1.26.0 go.uber.org/automaxprocs v1.5.1 go.uber.org/zap v1.27.0 - golang.org/x/crypto v0.31.0 - golang.org/x/sync v0.10.0 + golang.org/x/crypto v0.33.0 + golang.org/x/sync v0.11.0 google.golang.org/grpc v1.58.3 google.golang.org/protobuf v1.33.0 gorm.io/driver/clickhouse v0.6.1 @@ -82,7 +82,6 @@ require ( github.com/ClickHouse/clickhouse-go/v2 v2.23.2 // indirect github.com/HdrHistogram/hdrhistogram-go v1.0.1 // indirect github.com/KyleBanks/depth v1.2.1 // indirect - github.com/Shopify/toxiproxy v2.1.4+incompatible // indirect github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a // indirect github.com/aliyun/alibaba-cloud-sdk-go v1.61.18 // indirect github.com/andybalholm/brotli v1.1.0 // indirect @@ -98,8 +97,8 @@ require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/dustin/go-humanize v1.0.1 // indirect - github.com/eapache/go-resiliency v1.1.0 // indirect - github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect + github.com/eapache/go-resiliency v1.7.0 // indirect + github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect github.com/eapache/queue v1.1.0 // indirect github.com/fatih/color v1.13.0 // indirect github.com/felixge/httpsnoop v1.0.3 // indirect @@ -120,10 +119,13 @@ require ( github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/glog v1.2.4 // indirect github.com/gorilla/securecookie v1.1.1 // indirect + github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-cleanhttp v0.5.2 // indirect github.com/hashicorp/go-hclog v1.0.0 // indirect github.com/hashicorp/go-immutable-radix v1.3.1 // indirect + github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/hashicorp/go-rootcerts v1.0.2 // indirect + github.com/hashicorp/go-uuid v1.0.3 // indirect github.com/hashicorp/go-version v1.6.0 // indirect github.com/hashicorp/golang-lru v0.5.4 // indirect github.com/hashicorp/hcl v1.0.0 // indirect @@ -132,11 +134,16 @@ require ( github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect github.com/jackc/pgx/v5 v5.5.4 // indirect github.com/jackc/puddle/v2 v2.2.1 // indirect + github.com/jcmturner/aescts/v2 v2.0.0 // indirect + github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect + github.com/jcmturner/gofork v1.7.6 // indirect + github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect + github.com/jcmturner/rpc/v2 v2.0.3 // indirect github.com/jinzhu/inflection v1.0.0 // indirect github.com/jinzhu/now v1.1.5 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/josharian/intern v1.0.0 // indirect - github.com/klauspost/compress v1.17.8 // indirect + github.com/klauspost/compress v1.17.11 // indirect github.com/klauspost/cpuid/v2 v2.2.7 // indirect github.com/leodido/go-urn v1.4.0 // indirect github.com/lestrrat-go/strftime v1.0.5 // indirect @@ -157,14 +164,13 @@ require ( github.com/paulmach/orb v0.11.1 // indirect github.com/pelletier/go-toml v1.9.5 // indirect github.com/pelletier/go-toml/v2 v2.1.1 // indirect - github.com/pierrec/lz4 v2.5.1+incompatible // indirect - github.com/pierrec/lz4/v4 v4.1.21 // indirect + github.com/pierrec/lz4/v4 v4.1.22 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/power-devops/perfstat v0.0.0-20221212215047-62379fc7944b // indirect github.com/prometheus/client_model v0.4.0 // indirect github.com/prometheus/common v0.37.0 // indirect github.com/prometheus/procfs v0.8.0 // indirect - github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0 // indirect + github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect github.com/redis/go-redis/extra/rediscmd/v9 v9.0.5 // indirect github.com/segmentio/asm v1.2.0 // indirect github.com/shirou/gopsutil/v3 v3.23.12 // indirect @@ -190,9 +196,9 @@ require ( go.uber.org/atomic v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/arch v0.7.0 // indirect - golang.org/x/net v0.33.0 // indirect - golang.org/x/sys v0.28.0 // indirect - golang.org/x/text v0.21.0 // indirect + golang.org/x/net v0.35.0 // indirect + golang.org/x/sys v0.30.0 // indirect + golang.org/x/text v0.22.0 // indirect golang.org/x/time v0.3.0 // indirect golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect google.golang.org/appengine v1.6.7 // indirect diff --git a/go.sum b/go.sum index 88e27f0..97e6c1c 100644 --- a/go.sum +++ b/go.sum @@ -52,15 +52,13 @@ github.com/ClickHouse/clickhouse-go/v2 v2.23.2/go.mod h1:aNap51J1OM3yxQJRgM+AlP/ github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= github.com/HdrHistogram/hdrhistogram-go v1.0.1 h1:GX8GAYDuhlFQnI2fRDHQhTlkHMz8bEn0jTI6LJU0mpw= github.com/HdrHistogram/hdrhistogram-go v1.0.1/go.mod h1:BWJ+nMSHY3L41Zj7CA3uXnloDp7xxV0YvstAE7nKTaM= +github.com/IBM/sarama v1.45.1 h1:nY30XqYpqyXOXSNoe2XCgjj9jklGM1Ye94ierUb1jQ0= +github.com/IBM/sarama v1.45.1/go.mod h1:qifDhA3VWSrQ1TjSMyxDl3nYL3oX2C83u+G6L79sq4w= github.com/KyleBanks/depth v1.2.1 h1:5h8fQADFrWtarTdtDudMmGsC7GPbOAu6RVB3ffsVFHc= github.com/KyleBanks/depth v1.2.1/go.mod h1:jzSb9d0L43HxTQfT+oSA1EEp2q+ne2uh6XgeJcm8brE= github.com/PuerkitoBio/purell v1.1.0/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0= github.com/PuerkitoBio/purell v1.1.1/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0= github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE= -github.com/Shopify/sarama v1.19.0 h1:9oksLxC6uxVPHPVYUmq6xhr1BOF/hHobWH2UzO67z1s= -github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo= -github.com/Shopify/toxiproxy v2.1.4+incompatible h1:TKdv8HiTLgE5wdJuEML90aBgNWsokNbMijUGhmcoBJc= -github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= @@ -142,10 +140,10 @@ github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cu github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= -github.com/eapache/go-resiliency v1.1.0 h1:1NtRmCAqadE2FN4ZcN6g90TP3uk8cg9rn9eNK2197aU= -github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs= -github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw= -github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= +github.com/eapache/go-resiliency v1.7.0 h1:n3NRTnBn5N0Cbi/IeOHuQn9s2UwVUH7Ga0ZWcP+9JTA= +github.com/eapache/go-resiliency v1.7.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho= +github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 h1:Oy0F4ALJ04o5Qqpdz8XLIpNA3WM/iSIXqxtqo7UGVws= +github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3/go.mod h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0= github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= github.com/elastic/go-elasticsearch/v7 v7.17.10 h1:TCQ8i4PmIJuBunvBS6bwT2ybzVFxxUhhltAs3Gyu1yo= @@ -169,8 +167,8 @@ github.com/felixge/httpsnoop v1.0.3 h1:s/nj+GCswXYzN5v2DpNMuMQYe+0DDwt5WVCU6CWBd github.com/felixge/httpsnoop v1.0.3/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/foolin/gin-template v0.0.0-20190415034731-41efedfb393b h1:pAJ/RYH5lYGDg55jBX658waK6LlqdPCaB65TG6GCAfE= github.com/foolin/gin-template v0.0.0-20190415034731-41efedfb393b/go.mod h1:C2ca9FYDoq/nsiqURv2kJwzhqXAgSXTTES2q25whECc= -github.com/frankban/quicktest v1.14.0 h1:+cqqvzZV87b4adx/5ayVOaYZ2CrvM4ejQvUdBzPPUss= -github.com/frankban/quicktest v1.14.0/go.mod h1:NeW+ay9A/U67EYXNFA1nPE8e/tnQv/09mUdL/ijj8og= +github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= +github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= @@ -390,8 +388,10 @@ github.com/hashicorp/go-sockaddr v1.0.0 h1:GeH6tui99pF4NJgfnhp+L6+FfobzVW3Ah46sL github.com/hashicorp/go-sockaddr v1.0.0/go.mod h1:7Xibr9yA9JjQq1JpNB2Vw7kxv8xerXegt+ozgdvDeDU= github.com/hashicorp/go-syslog v1.0.0/go.mod h1:qPfqrKkXGihmCqbJM2mZgkZGvKG1dFdvsLplgctolz4= github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= -github.com/hashicorp/go-uuid v1.0.1 h1:fv1ep09latC32wFoVwnqcnKJGnMSdBanPczbHAYm1BE= github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/hashicorp/go-uuid v1.0.3 h1:2gKiV6YVmrJ1i2CKKa9obLvRieoRGviZFL26PcT/Co8= +github.com/hashicorp/go-uuid v1.0.3/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/hashicorp/go-version v1.6.0 h1:feTTfFNnjP967rlCxM/I9g701jU+RN74YKx2mOkIeek= github.com/hashicorp/go-version v1.6.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= @@ -422,6 +422,18 @@ github.com/jackc/pgx/v5 v5.5.4 h1:Xp2aQS8uXButQdnCMWNmvx6UysWQQC+u1EoizjguY+8= github.com/jackc/pgx/v5 v5.5.4/go.mod h1:ez9gk+OAat140fv9ErkZDYFWmXLfV+++K0uAOiwgm1A= github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk= github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= +github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8= +github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs= +github.com/jcmturner/dnsutils/v2 v2.0.0 h1:lltnkeZGL0wILNvrNiVCR6Ro5PGU/SeBvVO/8c/iPbo= +github.com/jcmturner/dnsutils/v2 v2.0.0/go.mod h1:b0TnjGOvI/n42bZa+hmXL+kFJZsFT7G4t3HTlQ184QM= +github.com/jcmturner/gofork v1.7.6 h1:QH0l3hzAU1tfT3rZCnW5zXl+orbkNMMRGJfdJjHVETg= +github.com/jcmturner/gofork v1.7.6/go.mod h1:1622LH6i/EZqLloHfE7IeZ0uEJwMSUyQ/nDd82IeqRo= +github.com/jcmturner/goidentity/v6 v6.0.1 h1:VKnZd2oEIMorCTsFBnJWbExfNN7yZr3EhJAxwOkZg6o= +github.com/jcmturner/goidentity/v6 v6.0.1/go.mod h1:X1YW3bgtvwAXju7V3LCIMpY0Gbxyjn/mY9zx4tFonSg= +github.com/jcmturner/gokrb5/v8 v8.4.4 h1:x1Sv4HaTpepFkXbt2IkL29DXRf8sOfZXo8eRKh687T8= +github.com/jcmturner/gokrb5/v8 v8.4.4/go.mod h1:1btQEpgT6k+unzCwX1KdWMEwPPkkgBtP+F6aCACiMrs= +github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY= +github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ= @@ -452,8 +464,8 @@ github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8 github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= -github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU= -github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= +github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc= +github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/klauspost/cpuid/v2 v2.2.7 h1:ZWSB3igEs+d0qvnxR/ZBzXVmxkgt8DdzP6m9pfuVLDM= github.com/klauspost/cpuid/v2 v2.2.7/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= @@ -579,10 +591,8 @@ github.com/pelletier/go-toml v1.9.5 h1:4yBQzkHv+7BHq2PQUZF3Mx0IYxG7LsP222s7Agd3v github.com/pelletier/go-toml v1.9.5/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c= github.com/pelletier/go-toml/v2 v2.1.1 h1:LWAJwfNvjQZCFIDKWYQaM62NcYeYViCmWIwmOStowAI= github.com/pelletier/go-toml/v2 v2.1.1/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc= -github.com/pierrec/lz4 v2.5.1+incompatible h1:Yq0up0149Hh5Ekhm/91lgkZuD1ZDnXNM26bycpTzYBM= -github.com/pierrec/lz4 v2.5.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= -github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= -github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU= +github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= @@ -631,8 +641,8 @@ github.com/qiniu/api.v7 v0.0.0-20190520053455-bea02cd22bf4 h1:GIr/KoLaPKm0vmzmc+ github.com/qiniu/api.v7 v0.0.0-20190520053455-bea02cd22bf4/go.mod h1:V8/EzlTgLN6q0s0CJmg/I81ytsvldSF22F7h6MI02+c= github.com/rabbitmq/amqp091-go v1.8.1 h1:RejT1SBUim5doqcL6s7iN6SBmsQqyTgXb1xMlH0h1hA= github.com/rabbitmq/amqp091-go v1.8.1/go.mod h1:+jPrT9iY2eLjRaMSRHUhc3z14E/l85kv/f+6luSD3pc= -github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0 h1:MkV+77GLUNo5oJ0jf870itWm3D0Sjh7+Za9gazKc5LQ= -github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= +github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/redis/go-redis/extra/rediscmd/v9 v9.0.5 h1:EaDatTxkdHG+U3Bk4EUr+DZ7fOGwTfezUiUJMaIcaho= github.com/redis/go-redis/extra/rediscmd/v9 v9.0.5/go.mod h1:fyalQWdtzDBECAQFBJuQe5bzQ02jGd5Qcbgb97Flm7U= github.com/redis/go-redis/extra/redisotel/v9 v9.0.5 h1:EfpWLLCyXw8PSM2/XNJLjI3Pb27yVE+gIAfeqp8LUCc= @@ -692,8 +702,8 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= -github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= -github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s= github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw= github.com/swaggo/gin-swagger v1.2.0 h1:YskZXEiv51fjOMTsXrOetAjrMDfFaXD79PEoQBOe2W0= @@ -747,6 +757,7 @@ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= +github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= github.com/yuin/gopher-lua v0.0.0-20200816102855-ee81675732da h1:NimzV1aGyq29m5ukMK0AMWEhFaL/lrEOaephfuoiARg= github.com/yuin/gopher-lua v0.0.0-20200816102855-ee81675732da/go.mod h1:E1AXubJBdNmFERAOucpDIxNzeGfLzg0mYh+UfMWdChA= github.com/yusufpapurcu/wmi v1.2.3 h1:E1ctvB7uKFMOJw3fdOW32DwGE9I7t++CRUEMKvFoFiw= @@ -826,10 +837,12 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20211108221036-ceb1ce70b4fa/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= -golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U= -golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk= +golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= +golang.org/x/crypto v0.33.0 h1:IOBPskki6Lysi0lo9qQvbxiQ+FvsCC/YWOecCHAixus= +golang.org/x/crypto v0.33.0/go.mod h1:bVdXmD7IV/4GdElGPozy6U7lWdRXA4qyRVGJV57uQ5M= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= @@ -865,6 +878,7 @@ golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.1/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/mod v0.17.0 h1:zY54UmvipHiNd+pm+m0x9KhZ9hl1/7QNMyxXbc6ICqA= golang.org/x/mod v0.17.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -917,8 +931,11 @@ golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qx golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= -golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I= -golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.35.0 h1:T5GQRQb2y08kTAByq9L4/bz8cipCdA8FbRTXewonqY8= +golang.org/x/net v0.35.0/go.mod h1:EglIi67kWsHKlRzzVMUD93VMSWGFOMSZgxFjparz1Qk= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -943,8 +960,9 @@ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ= -golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.11.0 h1:GGz8+XQP4FvTTrjZPzNKTMFtSXH80RAzG+5ghFPgK9w= +golang.org/x/sync v0.11.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -1016,16 +1034,19 @@ golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= -golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc= +golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -1035,8 +1056,9 @@ golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= -golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= -golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= +golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.22.0 h1:bofq7m3/HAFvbF51jz3Q9wLg3jkvSPuiZu/pD1XwgtM= +golang.org/x/text v0.22.0/go.mod h1:YRoo4H8PVmsu+E3Ou7cqLVH8oXWIHVoX0jqUWALQhfY= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= @@ -1100,6 +1122,7 @@ golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4f golang.org/x/tools v0.0.0-20210108195828-e2f9c7f1fc8e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= golang.org/x/tools v0.1.2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d h1:vU5i/LfpvrRCpgM/VPfJLg5KjxD3E+hfT1SH+d9zLwg= golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/pkg/queue/kafka/README.md b/pkg/queue/kafka/README.md index 179cbdd..eec9033 100644 --- a/pkg/queue/kafka/README.md +++ b/pkg/queue/kafka/README.md @@ -1,4 +1,633 @@ +# This document explains how to use the Kafka package for message queue functionality +## 初始化配置 + +### 1. 配置文件示例 + +首先需要在项目的 `config/{ENV}` 目录下创建 `kafka.yaml` 配置文件: + +```yaml +default: # 实例名称 + Version: "2.8.1" + RequiredAcks: 1 + Topic: "test-topic" + ConsumeTopic: + - "test-topic1" + - "test-topic2" + Brokers: + - "localhost:9092" + GroupID: "test-group" + Partitioner: "hash" + +order: # 另一个实例配置 + Version: "2.8.1" + RequiredAcks: 1 + Topic: "order-topic" + ConsumeTopic: + - "order-topic" + Brokers: + - "localhost:9092" + GroupID: "order-group" + Partitioner: "random" +``` + +### 2. 代码使用示例 + +#### 基本使用 + +```go +package main + +import ( + "fmt" + + "github.com/go-eagle/eagle/pkg/queue/kafka" +) + +func main() { + // 加载配置 + kafka.Load() + defer kafka.Close() + + // 获取配置 + configs := kafka.GetConfig() + + // 获取指定实例的配置 + defaultCfg := configs["default"] + fmt.Printf("Default kafka config: %+v\n", defaultCfg) + + orderCfg := configs["order"] + fmt.Printf("Order kafka config: %+v\n", orderCfg) +} +``` + +#### 完整应用示例 + +```go +package main + +import ( + "context" + "log" + + "github.com/go-eagle/eagle/pkg/queue/kafka" +) + +func main() { + // 1. 初始化配置 + kafka.Load() + defer kafka.Close() + + // 2. 获取配置信息(可选) + configs := kafka.GetConfig() + if len(configs) == 0 { + log.Fatal("No kafka config found") + } + + // 3. 使用配置进行消息发布 + ctx := context.Background() + err := kafka.Publish(ctx, "default", "test-topic", "hello world") + if err != nil { + log.Printf("Failed to publish message: %v", err) + } + + // 4. 使用配置进行消息消费 + handler := func(data []byte) error { + log.Printf("Received message: %s", string(data)) + return nil + } + + // 从默认实例消费 + go func() { + err := kafka.ConsumePartition(ctx, "default", "test-topic", handler) + if err != nil { + log.Printf("Failed to consume message: %v", err) + } + }() + + // 从order实例消费 + go func() { + err := kafka.ConsumePartition(ctx, "order", "order-topic", handler) + if err != nil { + log.Printf("Failed to consume message: %v", err) + } + }() + + select {} +} +``` + +### 3. 配置说明 + +#### Conf 结构体字段说明 + +```go +type Conf struct { + Version string // Kafka 版本号 + RequiredAcks int // 消息确认机制:0=不确认,1=leader确认,-1=所有副本确认 + Topic string // 默认主题 + ConsumeTopic []string // 消费主题列表 + Brokers []string // Kafka broker地址列表 + GroupID string // 消费者组ID + Partitioner string // 分区策略:random/roundrobin/hash +} +``` + +#### 重要方法说明 + +1. `Load()`: 加载配置并初始化 Kafka 管理器 +2. `Close()`: 关闭 Kafka 连接 +3. `GetConfig()`: 获取所有配置信息 + +### 4. 最佳实践 + +1. 在应用启动时调用 `Load()` +2. 在应用退出时调用 `Close()` +3. 使用多个实例时,通过不同的配置名区分 +4. 根据业务需求选择合适的分区策略 +5. 合理设置 `RequiredAcks` 确保消息可靠性 + +### 5. 注意事项 + +1. 配置文件必须位于项目的 `config` 目录下 +2. 配置文件名必须为 `kafka.yaml` +3. 确保配置的 broker 地址可访问 +4. Version 字段要与实际 Kafka 集群版本匹配 +5. 记得在程序退出时调用 `Close()` 释放资源 + +## client 组件的使用 + +使用 `client` 可以快速方便的使用 `kafka` 组件,以下是 Kafka 配置初始化的使用说明和示例。 + +### Method Documentation + +#### 1. Publish + +用于发送消息到指定的 topic。 + +```go +func Publish(ctx context.Context, name string, topic, msg string) error +``` + +**Parameters:** + +- `ctx`: 上下文 +- `name`: kafka 实例名称 +- `topic`: 主题名称 +- `msg`: 消息内容 + +**Example:** + +````go +package main + +import ( + "context" + "github.com/go-eagle/eagle/pkg/queue/kafka" +) + +func main() { + ctx := context.Background() + err := kafka.Publish(ctx, "default", "test-topic", "hello world") + if err != nil { + panic(err) + } +} +```` + +### 2. Consume + +使用消费组模式消费消息。 + +```go +func Consume(ctx context.Context, name string, topics []string, handler sarama.ConsumerGroupHandler) error +``` + +**Parameters:** + +- `ctx`: 上下文 +- `name`: kafka 实例名称 +- `topics`: 主题列表 +- `handler`: 消费组处理器 + +**Example:** + +```go +package main + +import ( + "context" + "github.com/Shopify/sarama" + "github.com/go-eagle/eagle/pkg/queue/kafka" +) + +type ConsumerGroupHandler struct { + sarama.ConsumerGroupHandler +} + +func (h *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { + for msg := range claim.Messages() { + // 处理消息 + println(string(msg.Value)) + session.MarkMessage(msg, "") + } + return nil +} + +func main() { + ctx := context.Background() + handler := &ConsumerGroupHandler{} + err := kafka.Consume(ctx, "default", []string{"test-topic"}, handler) + if err != nil { + panic(err) + } +} +``` + +### 3. ConsumePartition + +消费指定 topic 的所有分区。 + +```go +func ConsumePartition(ctx context.Context, name, topic string, handler func([]byte) error) error +``` + +**Parameters:** + +- `ctx`: 上下文 +- `name`: kafka 实例名称 +- `topic`: 主题名称 +- `handler`: 消息处理函数 + +**Example:** + +```go +package main + +import ( + "context" + "github.com/go-eagle/eagle/pkg/queue/kafka" +) + +func main() { + ctx := context.Background() + handler := func(data []byte) error { + println(string(data)) + return nil + } + + err := kafka.ConsumePartition(ctx, "default", "test-topic", handler) + if err != nil { + panic(err) + } +} +``` + +### 4. ConsumerByPartitionId + +消费指定 topic 的指定分区。 + +```go +func ConsumerByPartitionId(ctx context.Context, name, topic string, partition int32, handler func([]byte) error) error +``` + +**Parameters:** + +- `ctx`: 上下文 +- `name`: kafka 实例名称 +- `topic`: 主题名称 +- `partition`: 分区 ID +- `handler`: 消息处理函数 + +**Example:** + +```go +package main + +import ( + "context" + "github.com/go-eagle/eagle/pkg/queue/kafka" +) + +func main() { + ctx := context.Background() + handler := func(data []byte) error { + println(string(data)) + return nil + } + + err := kafka.ConsumerByPartitionId(ctx, "default", "test-topic", 0, handler) + if err != nil { + panic(err) + } +} +``` + +### 5. GetPartitionList + +获取指定 topic 的分区列表。 + +```go +func GetPartitionList(ctx context.Context, name, topic string) ([]int32, error) +``` + +**Parameters:** + +- `ctx`: 上下文 +- `name`: kafka 实例名称 +- `topic`: 主题名称 + +**Example:** + +```go +package main + +import ( + "context" + "fmt" + "github.com/go-eagle/eagle/pkg/queue/kafka" +) + +func main() { + ctx := context.Background() + partitions, err := kafka.GetPartitionList(ctx, "default", "test-topic") + if err != nil { + panic(err) + } + + fmt.Printf("Topic partitions: %v\n", partitions) +} +``` + +### 使用场景示例 + +#### 1. 基本生产和消费 + +````go +// 生产者 +err := kafka.Publish(ctx, "default", "test-topic", "message") + +// 消费者(消费组模式) +handler := &ConsumerGroupHandler{} +err = kafka.Consume(ctx, "default", []string{"test-topic"}, handler) +```` + +#### 2. 分区消费 + +````go +// 消费所有分区 +err := kafka.ConsumePartition(ctx, "default", "test-topic", handler) + +// 消费指定分区 +err = kafka.ConsumerByPartitionId(ctx, "default", "test-topic", 0, handler) + +// 获取分区信息 +partitions, err := kafka.GetPartitionList(ctx, "default", "test-topic") +```` + +#### 3. 完整流程示例 + +````go +package main + +import ( + "context" + "log" + + "github.com/go-eagle/eagle/pkg/queue/kafka" +) + +func main() { + ctx := context.Background() + + // 1. 获取分区信息 + partitions, err := kafka.GetPartitionList(ctx, "default", "test-topic") + if err != nil { + panic(err) + } + + // 2. 发送消息 + err = kafka.Publish(ctx, "default", "test-topic", "test message") + if err != nil { + panic(err) + } + + // 3. 消费消息 + handler := func(data []byte) error { + log.Printf("Received message: %s", string(data)) + return nil + } + + // 可以选择以下任一方式消费: + + // 3.1 消费组模式 + groupHandler := &ConsumerGroupHandler{} + go kafka.Consume(ctx, "default", []string{"test-topic"}, groupHandler) + + // 3.2 消费所有分区 + go kafka.ConsumePartition(ctx, "default", "test-topic", handler) + + // 3.3 消费指定分区 + for _, partition := range partitions { + go kafka.ConsumerByPartitionId(ctx, "default", "test-topic", partition, handler) + } + + // 防止程序退出 + select {} +} +```` + +这些方法提供了灵活的消息生产和消费方式,可以根据实际需求选择合适的使用方式。 + +## 独立组件的使用 + +### 生产者(Producer) Example + +如何创建并使用 Kafka producer: + +```go +package main + +import ( + "context" + + "github.com/go-eagle/eagle/pkg/queue/kafka" + "github.com/go-eagle/eagle/pkg/log" +) + +func main() { + // Create producer config + conf := &kafka.Conf{ + Brokers: []string{"localhost:9092"}, + Version: "2.8.1", + RequiredAcks: 1, // 0: no response, 1: wait for leader, -1: wait for all + Partitioner: "hash", // options: random, roundrobin, hash + } + + // Create producer + producer, err := kafka.NewProducer(conf, log.GetLogger()) + if err != nil { + panic(err) + } + defer producer.Close() + + // Publish message + ctx := context.Background() + err = producer.Publish(ctx, "test-topic", "Hello World") + if err != nil { + panic(err) + } +} +``` + +### 消费组(Consumer Group) Example + +Here's how to use consumer groups for parallel processing: + +```go +package main + +import ( + "context" + + "github.com/go-eagle/eagle/pkg/queue/kafka" + "github.com/go-eagle/eagle/pkg/log" +) + +func main() { + // Create consumer config + conf := &kafka.Conf{ + Brokers: []string{"localhost:9092"}, + Version: "2.8.1", + GroupID: "my-group-id", + } + + // Create consumer + consumer, err := kafka.NewConsumer(conf, log.GetLogger()) + if err != nil { + panic(err) + } + defer consumer.Stop() + + // Create handler + handler := &kafka.ConsumerGroupHandler{} + + // Start consuming + ctx := context.Background() + topics := []string{"test-topic"} + err = consumer.Consume(ctx, topics, handler) + if err != nil { + panic(err) + } +} +``` + +### 基于分区的消费者(Partition-Based Consumer) Example + +Here's how to consume messages from specific partitions: + +```go +package main + +import ( + "context" + + "github.com/go-eagle/eagle/pkg/queue/kafka" + "github.com/go-eagle/eagle/pkg/log" +) + +func main() { + conf := &kafka.Conf{ + Brokers: []string{"localhost:9092"}, + Version: "2.8.1", + } + + consumer, err := kafka.NewConsumer(conf, log.GetLogger()) + if err != nil { + panic(err) + } + defer consumer.Stop() + + ctx := context.Background() + + // Consume from all partitions + err = consumer.ConsumePartition(ctx, "test-topic", func(data []byte) error { + // Handle message + log.Printf("Received message: %s", string(data)) + return nil + }) + + // Consume from specific partition + err = consumer.ConsumerByPartitionId(ctx, "test-topic", 0, func(data []byte) error { + // Handle message + log.Printf("Received message from partition 0: %s", string(data)) + return nil + }) +} +``` + +### Features + +#### 消费模式 + +1. **消费组模式** + - 多个消费者可以在同一个组内并行工作 + - 消息会自动分配给各个消费者 + - 通过 `consumer.Consume()` 使用 + +2. **分区模式** + - 消费所有分区:使用 `consumer.ConsumePartition()` + - 消费指定分区:使用 `consumer.ConsumerByPartitionId()` + +#### 生产者特性 + +1. **异步生产** + - 消息以异步方式生产以提高性能 + - 通过内部 goroutine 处理成功/错误 + +2. **分区策略** + - Random(随机):消息随机分配到各个分区 + - RoundRobin(轮询):消息均匀分配到各个分区 + - Hash(哈希):基于消息 key 的哈希值分配到分区 + +### Configuration Options + +#### Producer Config + +```go +type Conf struct { + Brokers []string // Kafka broker addresses + Version string // Kafka version + RequiredAcks int16 // Required acknowledgments + Partitioner string // Partitioning strategy +} +``` + +#### Consumer Config + +```go +type Conf struct { + Brokers []string // Kafka broker addresses + Version string // Kafka version + GroupID string // Consumer group ID +} +``` + +### 最佳实践 + +1. 始终正确关闭生产者和消费者 +2. 使用消费组实现并行处理 +3. 处理成功/错误通道中的错误 +4. 配置合适的消息确认级别 +5. 根据使用场景选择适当的分区策略 +6. 设置适当的错误处理和日志记录 + +### 注意事项 + +- 生产者实现为异步模式以提高性能 +- 消费组的重新平衡是自动处理的 +- 默认从最新偏移量开始消费消息 +- 重连和错误处理已内置在实现中 ## Reference diff --git a/pkg/queue/kafka/client.go b/pkg/queue/kafka/client.go new file mode 100644 index 0000000..b9dbdd4 --- /dev/null +++ b/pkg/queue/kafka/client.go @@ -0,0 +1,56 @@ +package kafka + +import ( + "context" + + "github.com/IBM/sarama" +) + +// Publish add data to queue +func Publish(ctx context.Context, name string, topic, msg string) error { + p, err := DefaultManager.GetProducer(name) + if err != nil { + return err + } + return p.Publish(ctx, topic, msg) +} + +// Consume data from queue +func Consume(ctx context.Context, name string, topics []string, handler sarama.ConsumerGroupHandler) error { + c, err := DefaultManager.GetConsumer(name) + if err != nil { + return err + } + + return c.Consume(ctx, topics, handler) +} + +// ConsumeByPartition consume data by partition +func ConsumePartition(ctx context.Context, name, topic string, handler func([]byte) error) error { + c, err := DefaultManager.GetConsumer(name) + if err != nil { + return err + } + + return c.ConsumePartition(ctx, topic, handler) +} + +// ConsumerByPartitionId consume data by partition id +func ConsumerByPartitionId(ctx context.Context, name, topic string, partition int32, handler func([]byte) error) error { + c, err := DefaultManager.GetConsumer(name) + if err != nil { + return err + } + + return c.ConsumerByPartitionId(ctx, topic, partition, handler) +} + +// GetPartitionList get partition list +func GetPartitionList(ctx context.Context, name, topic string) ([]int32, error) { + c, err := DefaultManager.GetConsumer(name) + if err != nil { + return nil, err + } + + return c.client.Partitions(topic) +} diff --git a/pkg/queue/kafka/config.go b/pkg/queue/kafka/config.go new file mode 100644 index 0000000..be735da --- /dev/null +++ b/pkg/queue/kafka/config.go @@ -0,0 +1,63 @@ +package kafka + +import ( + "sync" + + cfg "github.com/go-eagle/eagle/pkg/config" +) + +var ( + loadOnce sync.Once + closeOnce sync.Once + conf map[string]*Conf +) + +type Conf struct { + Version string `yaml:"Version"` + RequiredAcks int `yaml:"RequiredAcks"` + Topic string `yaml:"Topic"` + ConsumeTopic []string `yaml:"VonsumeTopic"` + Brokers []string `yaml:"Brokers"` + GroupID string `yaml:"GroupID"` + // partitioner type,optional: "random", "roundrobin", "hash" + Partitioner string `yaml:"Partitioner"` +} + +// loadConf load config +func loadConf() (ret map[string]*Conf, err error) { + v, err := cfg.LoadWithType("kafka", "yaml") + if err != nil { + return nil, err + } + + c := make(map[string]*Conf, 0) + err = v.Unmarshal(&c) + if err != nil { + return nil, err + } + + conf = c + + return c, nil +} + +func GetConfig() map[string]*Conf { + return conf +} + +func Load() { + loadOnce.Do(func() { + conf, err := loadConf() + if err != nil { + panic(err) + } + + DefaultManager = NewManager(conf) + }) +} + +func Close() { + closeOnce.Do(func() { + _ = DefaultManager.Close() + }) +} diff --git a/pkg/queue/kafka/consumer.go b/pkg/queue/kafka/consumer.go index fb60856..e54bb6a 100644 --- a/pkg/queue/kafka/consumer.go +++ b/pkg/queue/kafka/consumer.go @@ -3,82 +3,144 @@ package kafka import ( "context" "fmt" - "log" - "os" - "github.com/Shopify/sarama" - - logger "github.com/go-eagle/eagle/pkg/log" + "github.com/IBM/sarama" + "github.com/go-eagle/eagle/pkg/log" ) // Consumer kafka consumer type Consumer struct { - group sarama.ConsumerGroup - topics []string - groupID string - handler sarama.ConsumerGroupHandler - - ctx context.Context - cancel context.CancelFunc + client sarama.Client + logger log.Logger + group sarama.ConsumerGroup + consumer sarama.Consumer } // NewConsumer create a consumer // nolint -func NewConsumer(config *sarama.Config, logger *log.Logger, topic string, groupID string, brokers []string, handler *ConsumerGroupHandler) *Consumer { - // Init config, specify appropriate versio - sarama.Logger = log.New(os.Stderr, "[sarama_logger]", log.LstdFlags) - sarama.Logger = logger - config.Version = sarama.V2_0_0_0 // V2_4_0_0 +func NewConsumer(conf *Conf, logger log.Logger) (*Consumer, error) { + config := sarama.NewConfig() + config.Consumer.Return.Errors = true + config.Consumer.Offsets.Initial = sarama.OffsetNewest + config.Consumer.Retry.Backoff = 100 + config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin + version, err := sarama.ParseKafkaVersion(conf.Version) + if err != nil { + return nil, fmt.Errorf("kafka: parse version %s error: %v", conf.Version, err) + } + config.Version = version + + if err := config.Validate(); err != nil { + return nil, fmt.Errorf("kafka: consumer config validate error: %v", err) + } // Start with a client - client, err := sarama.NewClient(brokers, config) + client, err := sarama.NewClient(conf.Brokers, config) if err != nil { - panic(err) + return nil, fmt.Errorf("kafka: create client error: %v", err) } // Start a new consumer group - group, err := sarama.NewConsumerGroupFromClient(groupID, client) + group, err := sarama.NewConsumerGroupFromClient(conf.GroupID, client) if err != nil { - panic(err) + return nil, fmt.Errorf("kafka: create consumer group error: %v", err) } - ctx, cancel := context.WithCancel(context.Background()) - return &Consumer{ - group: group, - topics: []string{topic}, - groupID: groupID, - handler: handler, - ctx: ctx, - cancel: cancel, + consumer, err := sarama.NewConsumerFromClient(client) + if err != nil { + return nil, fmt.Errorf("kafka: create consumer from client error: %v", err) } + + return &Consumer{ + client: client, + logger: logger, + group: group, + consumer: consumer, + }, nil } // Consume consume data -func (c *Consumer) Consume() { +// This method consumes messages from the specified topics using a consumer group. +// It tracks errors in a separate goroutine and logs them. +// The handler function is called for each message received. +func (c *Consumer) Consume(ctx context.Context, topics []string, handler sarama.ConsumerGroupHandler) error { // Track errors go func() { for err := range c.group.Errors() { - fmt.Println("ERROR", err) + c.logger.Errorf("kafka: Consume group errors: %v", err) } }() // Iterate over consumer sessions. - ctx := context.Background() for { select { - case <-c.ctx.Done(): - _ = c.group.Close() - logger.Info("[Kafka] Consume ctx done") - return + case <-ctx.Done(): + c.logger.Info("kafka: Consume ctx done") + return nil default: - if err := c.group.Consume(ctx, c.topics, c.handler); err != nil { - logger.Errorf("[Kafka] Consume err: %s", err.Error()) + if err := c.group.Consume(ctx, topics, handler); err != nil { + c.logger.Errorf("kafka: Group Consume err: %v", err) } } } } -// Stop close conn -func (c Consumer) Stop() { - c.cancel() +// ConsumePartition consume data by partition +// This method consumes messages from all partitions of the specified topic. +// It creates a separate goroutine for each partition to handle messages concurrently. +// The handler function is called for each message received. +func (c *Consumer) ConsumePartition(ctx context.Context, topic string, handler func([]byte) error) error { + partitionList, err := c.consumer.Partitions(topic) + if err != nil { + return err + } + + for _, partition := range partitionList { + pc, err := c.consumer.ConsumePartition(topic, partition, sarama.OffsetNewest) + if err != nil { + return err + } + + go func(pc sarama.PartitionConsumer) { + for msg := range pc.Messages() { + if err := handler(msg.Value); err != nil { + // handle error + c.logger.Errorf("kafka: ConsumePartition message error, topic %s, partition %d, err: %v", topic, msg.Partition, err) + continue + } + } + }(pc) + } + + return nil +} + +// Consumer by partition id +func (c *Consumer) ConsumerByPartitionId(ctx context.Context, topic string, partition int32, handler func([]byte) error) error { + pc, err := c.consumer.ConsumePartition(topic, partition, sarama.OffsetNewest) + if err != nil { + return nil + } + + for msg := range pc.Messages() { + if err := handler(msg.Value); err != nil { + // handle error + c.logger.Errorf("kafka: ConsumerByPartitionId message error, topic %s partition %d err: %v", topic, partition, err) + continue + } + } + return nil +} + +// Stop close conn +func (c *Consumer) Stop() { + if c.consumer != nil { + c.consumer.Close() + } + if c.group != nil { + c.group.Close() + } + if c.client != nil { + c.client.Close() + } } diff --git a/pkg/queue/kafka/consumer_handler.go b/pkg/queue/kafka/consumer_handler.go index 1bf77de..fecea9e 100644 --- a/pkg/queue/kafka/consumer_handler.go +++ b/pkg/queue/kafka/consumer_handler.go @@ -4,19 +4,19 @@ import ( "fmt" "log" - "github.com/Shopify/sarama" + "github.com/IBM/sarama" ) // ConsumerGroupHandler represents the sarama consumer group type ConsumerGroupHandler struct{} // Setup is run before consumer start consuming, is normally used to setup things such as database connections -func (ConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error { +func (h ConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil } // Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited -func (ConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { +func (h ConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil } diff --git a/pkg/queue/kafka/manager.go b/pkg/queue/kafka/manager.go new file mode 100644 index 0000000..f8e84e1 --- /dev/null +++ b/pkg/queue/kafka/manager.go @@ -0,0 +1,81 @@ +package kafka + +import ( + "fmt" + "sync" + + "github.com/go-eagle/eagle/pkg/log" +) + +var ( + DefaultManager *Manager +) + +type Manager struct { + opts map[string]*Conf + + cmu *sync.RWMutex + pmu *sync.RWMutex + + consumers map[string]*Consumer + publishers map[string]*Producer +} + +func NewManager(opts map[string]*Conf) *Manager { + return &Manager{ + opts: opts, + cmu: &sync.RWMutex{}, + pmu: &sync.RWMutex{}, + consumers: make(map[string]*Consumer, 0), + publishers: make(map[string]*Producer, 0), + } +} + +func (c *Manager) GetProducer(name string) (*Producer, error) { + c.pmu.Lock() + defer c.pmu.Unlock() + + if _, ok := c.publishers[name]; ok { + return c.publishers[name], nil + } + if conf, ok := c.opts[name]; ok { + publisher, err := NewProducer(conf, log.GetLogger()) + if err != nil { + return nil, err + } + c.publishers[name] = publisher + return publisher, nil + } + return nil, fmt.Errorf("kafka: GetPublisher error, config %s not found", name) +} + +func (c *Manager) GetConsumer(name string) (*Consumer, error) { + c.cmu.Lock() + defer c.cmu.Unlock() + + if _, ok := c.consumers[name]; ok { + return c.consumers[name], nil + } + if conf, isOk := c.opts[name]; isOk { + consumer, err := NewConsumer(conf, log.GetLogger()) + if err != nil { + return nil, err + } + c.consumers[name] = consumer + return consumer, nil + } + return nil, fmt.Errorf("kafka: GetConsumer error, config %s not found", name) +} + +func (c *Manager) Close() error { + for _, consumer := range c.consumers { + _ = consumer.consumer.Close() + if consumer.group != nil { + _ = consumer.group.Close() + } + } + for _, publisher := range c.publishers { + _ = publisher.Close() + } + return nil +} diff --git a/pkg/queue/kafka/producer.go b/pkg/queue/kafka/producer.go index eb30c7e..ca59038 100644 --- a/pkg/queue/kafka/producer.go +++ b/pkg/queue/kafka/producer.go @@ -1,74 +1,103 @@ package kafka import ( + "context" + "fmt" "log" "os" "os/signal" - "time" - - "github.com/Shopify/sarama" + "github.com/IBM/sarama" logger "github.com/go-eagle/eagle/pkg/log" ) // Producer kafka producer type Producer struct { asyncProducer sarama.AsyncProducer - topic string enqueued int + logger logger.Logger } // NewProducer create producer // nolint -func NewProducer(config *sarama.Config, logger *log.Logger, topic string, brokers []string) *Producer { - sarama.Logger = logger +func NewProducer(conf *Conf, logger logger.Logger) (*Producer, error) { + config := sarama.NewConfig() + config.Producer.Return.Errors = true + config.Producer.RequiredAcks = sarama.RequiredAcks(conf.RequiredAcks) + config.Producer.Retry.Max = 3 + config.Producer.Partitioner = getPartitoner(conf) - // Start a new async producer - producer, err := sarama.NewAsyncProducer(brokers, config) - if err != nil { - panic(err) + if err := config.Validate(); err != nil { + return nil, fmt.Errorf("kafka: producer config validate error: %v", err) } - log.Println("Kafka AsyncProducer up and running!") + // Start a new async producer + producer, err := sarama.NewAsyncProducer(conf.Brokers, config) + if err != nil { + return nil, err + } + + log.Println("kafka: AsyncProducer up and running!") p := &Producer{ asyncProducer: producer, - topic: topic, + logger: logger, } go p.asyncDealMessage() - return p + return p, nil } func (p *Producer) asyncDealMessage() { for { select { case res := <-p.asyncProducer.Successes(): - logger.Info("push msg success", "topic is", res.Topic, "partition is ", res.Partition, "offset is ", res.Offset) + p.logger.Info("kafka: push msg success", "topic is", res.Topic, "partition is ", res.Partition, "offset is ", res.Offset) case err := <-p.asyncProducer.Errors(): - logger.Info("push msg failed", "err is ", err.Error()) + p.logger.Error("kafka: push msg failed", "err is ", err.Error()) } } } // Publish push data to queue -func (p *Producer) Publish(message string) { +func (p *Producer) Publish(ctx context.Context, topic string, message string) error { signals := make(chan os.Signal, 1) signal.Notify(signals, os.Interrupt) for { - time.Sleep(5 * time.Second) - message := &sarama.ProducerMessage{Topic: p.topic, Value: sarama.StringEncoder(message)} + message := &sarama.ProducerMessage{ + Topic: topic, + Value: sarama.StringEncoder(message), + } select { case p.asyncProducer.Input() <- message: p.enqueued++ - logger.Infof("New message publish: %s", message.Value) + p.logger.Infof("kafka: New message publish: %s", message.Value) case <-signals: p.asyncProducer.AsyncClose() // Trigger a shutdown of the producer. - logger.Infof("Kafka AsyncProducer finished with %d messages produced.", p.enqueued) - return + p.logger.Infof("kafka: AsyncProducer finished with %d messages produced.", p.enqueued) + return nil } } } + +// Close closes the producer +func (p *Producer) Close() error { + return p.asyncProducer.Close() +} + +// getPartitoner returns the partitioner constructor based on the configuration +func getPartitoner(conf *Conf) sarama.PartitionerConstructor { + switch conf.Partitioner { + case "random": + return sarama.NewRandomPartitioner + case "roundrobin": + return sarama.NewRoundRobinPartitioner + case "hash": + return sarama.NewHashPartitioner + default: + return sarama.NewRandomPartitioner + } +}