diff --git a/broker.go b/broker.go index 7846dcf..5eb3a91 100644 --- a/broker.go +++ b/broker.go @@ -191,7 +191,11 @@ func (b *Broker) Start(ctx context.Context) error { fmt.Println("Error accepting connection:", err) continue } - go ReadFromConn(ctx, conn, b.opts.messageHandler, b.opts.closeHandler, b.opts.errorHandler) + go ReadFromConn(ctx, conn, Handlers{ + MessageHandler: b.opts.messageHandler, + CloseHandler: b.opts.closeHandler, + ErrorHandler: b.opts.errorHandler, + }) } } diff --git a/consumer.go b/consumer.go index 70090a7..40d457e 100644 --- a/consumer.go +++ b/consumer.go @@ -162,7 +162,11 @@ func (c *Consumer) Consume(ctx context.Context) error { wg.Add(1) go func() { defer wg.Done() - ReadFromConn(ctx, c.conn, c.opts.messageHandler, c.opts.closeHandler, c.opts.errorHandler) + ReadFromConn(ctx, c.conn, Handlers{ + MessageHandler: c.opts.messageHandler, + CloseHandler: c.opts.closeHandler, + ErrorHandler: c.opts.errorHandler, + }) fmt.Println("Stopping consumer") }() for _, q := range c.queues { diff --git a/ctx.go b/ctx.go index 194238e..a6312ad 100644 --- a/ctx.go +++ b/ctx.go @@ -21,6 +21,18 @@ type Message struct { Data json.RawMessage `json:"data"` } +type MessageHandler func(context.Context, net.Conn, []byte) error + +type CloseHandler func(context.Context, net.Conn) error + +type ErrorHandler func(context.Context, net.Conn, error) + +type Handlers struct { + MessageHandler MessageHandler + CloseHandler CloseHandler + ErrorHandler ErrorHandler +} + func IsClosed(conn net.Conn) bool { _, err := conn.Read(make([]byte, 1)) if err != nil { @@ -101,16 +113,10 @@ func Write(ctx context.Context, conn net.Conn, data any) error { return err } -type MessageHandler func(context.Context, net.Conn, []byte) error - -type CloseHandler func(context.Context, net.Conn) error - -type ErrorHandler func(context.Context, net.Conn, error) - -func ReadFromConn(ctx context.Context, conn net.Conn, handler MessageHandler, closeHandler CloseHandler, errorHandler ErrorHandler) { +func ReadFromConn(ctx context.Context, conn net.Conn, handlers Handlers) { defer func() { - if closeHandler != nil { - if err := closeHandler(ctx, conn); err != nil { + if handlers.CloseHandler != nil { + if err := handlers.CloseHandler(ctx, conn); err != nil { fmt.Println("Error in close handler:", err) } } @@ -123,8 +129,8 @@ func ReadFromConn(ctx context.Context, conn net.Conn, handler MessageHandler, cl if err == io.EOF || IsClosed(conn) || strings.Contains(err.Error(), "closed network connection") { break } - if errorHandler != nil { - errorHandler(ctx, conn, err) + if handlers.ErrorHandler != nil { + handlers.ErrorHandler(ctx, conn, err) } continue } @@ -135,17 +141,17 @@ func ReadFromConn(ctx context.Context, conn net.Conn, handler MessageHandler, cl var msg Message err = json.Unmarshal(messageBytes, &msg) if err != nil { - if errorHandler != nil { - errorHandler(ctx, conn, err) + if handlers.ErrorHandler != nil { + handlers.ErrorHandler(ctx, conn, err) } continue } ctx = SetHeaders(ctx, msg.Headers) - if handler != nil { - err = handler(ctx, conn, msg.Data) + if handlers.MessageHandler != nil { + err = handlers.MessageHandler(ctx, conn, msg.Data) if err != nil { - if errorHandler != nil { - errorHandler(ctx, conn, err) + if handlers.ErrorHandler != nil { + handlers.ErrorHandler(ctx, conn, err) } continue } diff --git a/examples/ca.crt b/examples/certs/ca.crt similarity index 100% rename from examples/ca.crt rename to examples/certs/ca.crt diff --git a/examples/ca.key b/examples/certs/ca.key similarity index 100% rename from examples/ca.key rename to examples/certs/ca.key diff --git a/examples/server.crt b/examples/certs/server.crt similarity index 100% rename from examples/server.crt rename to examples/certs/server.crt diff --git a/examples/server.key b/examples/certs/server.key similarity index 100% rename from examples/server.key rename to examples/certs/server.key diff --git a/examples/consumer.crt b/examples/consumer.crt deleted file mode 100644 index 5b637ed..0000000 --- a/examples/consumer.crt +++ /dev/null @@ -1,21 +0,0 @@ ------BEGIN CERTIFICATE----- -MIIDXjCCAkagAwIBAgIRAMFvjYXNdG0zTX15qxFE+V0wDQYJKoZIhvcNAQELBQAw -bjELMAkGA1UEBhMCVVMxCTAHBgNVBAgTADEQMA4GA1UEBxMHTXkgQ2l0eTESMBAG -A1UECRMJTXkgU3RyZWV0MQ4wDAYDVQQREwUwMDAwMDEOMAwGA1UEChMFTXkgQ0Ex -DjAMBgNVBAMTBU15IENBMB4XDTI0MTAwMTA1NDMxN1oXDTI1MTAwMTA1NDMxN1ow -EzERMA8GA1UEAxMIY29uc3VtZXIwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEK -AoIBAQD4YBRg07h+khv0tFZTkhHXPlR+rXPoCVKpnlHn074d+3DVmTCWQWt5gB9s -Ow62PtG2P5OGPuoTiVIDRqafZI4Mq12LvZYpIjaWEp1k0nWrlCYU6mMixsmxnHXK -fcZBzulIE1LUyGHWpgo6J/B0aHdvsQy3ogzxnjiAkyvFkL9LqgcL0sTcItkwSqR2 -ieQMNHiQ4aaKSEw+1FJ7Ls8D0qyjZLZ4F6dLba2sdWnhUbxTDccradOq7kpacQLG -/KneGbfHWT+s2/PnLPlafM6jvZYu1EtAYMZjDTYyIM4L4L4JcFBTYTu4CcRumTre -DhyDtVlwSZpmYHM+nZBwEEqBWYJFAgMBAAGjUjBQMA4GA1UdDwEB/wQEAwIHgDAd -BgNVHSUEFjAUBggrBgEFBQcDAgYIKwYBBQUHAwEwHwYDVR0jBBgwFoAU4iyCqPim -9ktTiugod6d/mYIWXxMwDQYJKoZIhvcNAQELBQADggEBALx6LEZBgDpedNMG7NJE -0YqfUPJc+6XBaX5Gcb0ylUtMv8LS7g3I4NT4tG8muJNV4QN4p8SkV7oufeaU1F85 -YSS11k/lT4OItxMiZZmZ1+5DnESOP0HlSnT1gljpqBHdjymJi83Ls/qQ2GEer7x6 -IOM/N5fwi+1tmVcWHL0v+J0JF+4szz0SEXPc5ML/9j5oiAeg/YycfWXxZHPWsvPj -TX3RcltYOwyX20MAOT5Zk6SqhDOMAr3K48/4Fy4FDGwLnDU53SQCH5Beu1YgG9oW -pSTOLk0D6FLgJUVNAd8e6YA7o4mESW/3sK6I52VNr6JwRRnGZKC41Qga7XBdkLpI -nk8= ------END CERTIFICATE----- diff --git a/examples/consumer.go b/examples/consumer.go index 708a0cd..8da8815 100644 --- a/examples/consumer.go +++ b/examples/consumer.go @@ -9,6 +9,7 @@ import ( func main() { consumer := mq.NewConsumer("consumer-1") + // consumer := mq.NewConsumer("consumer-1", mq.WithTLS(true, "./certs/server.crt", "./certs/server.key")) consumer.RegisterHandler("queue1", tasks.Node1) consumer.RegisterHandler("queue2", tasks.Node2) consumer.Consume(context.Background()) diff --git a/examples/consumer.key b/examples/consumer.key deleted file mode 100644 index 1810943..0000000 --- a/examples/consumer.key +++ /dev/null @@ -1,27 +0,0 @@ ------BEGIN RSA PRIVATE KEY----- -MIIEpQIBAAKCAQEA+GAUYNO4fpIb9LRWU5IR1z5Ufq1z6AlSqZ5R59O+Hftw1Zkw -lkFreYAfbDsOtj7Rtj+Thj7qE4lSA0amn2SODKtdi72WKSI2lhKdZNJ1q5QmFOpj -IsbJsZx1yn3GQc7pSBNS1Mhh1qYKOifwdGh3b7EMt6IM8Z44gJMrxZC/S6oHC9LE -3CLZMEqkdonkDDR4kOGmikhMPtRSey7PA9Kso2S2eBenS22trHVp4VG8Uw3HK2nT -qu5KWnECxvyp3hm3x1k/rNvz5yz5WnzOo72WLtRLQGDGYw02MiDOC+C+CXBQU2E7 -uAnEbpk63g4cg7VZcEmaZmBzPp2QcBBKgVmCRQIDAQABAoIBAQCDL4Qz7D/bImL/ -qaya8WDY9kP8sLKykRIHIucR8mXNQjxDpWjjQ+R3RTPTn5HSsnVjtErAMkTCUtpt -foiNUbgxeuWq5FUGntqEA4r8cKjUGijPDUmVWbe4RvJ0JGBt69KNTh+G4dvDWum1 -89huM6s55CLabw41JCOlzZXmAOD5Hee+5w9HxqboLSFO93wBMT1olOLKsTI0lY3z -dDNibkvk/USPHzR5sesS+9wq9IbuxK7n6mJs0/2SYRAu7C+LYbDwhcZ5dwLXg83R -n2X4PDYtKRM5rTSwcDhY2fcfPKIPSJ/lBR7uRZgheoLyCAzBUslFN8DZeC+ieDEi -Gj2FalGhAoGBAP5qONtobitWHjqeMuIz2RkLPxjwIvMj2+tIKb1F1ZkXA/ePwjDZ -B6WMUF5KhtUsuzWHNS3ZZFB7utMirJoM5RxemiP3X7E2+vEiH0VMBGu75MKZuAP+ -7qXnodIoWxEhKr59iqzdPGxrgJGpj3cR0F2TPIvJykWXS0nDyFc4aq09AoGBAPns -OYIoh5U2FCyjWCGtqPxkZ3lZPerJR6acA7l4dQf8opXFJ4+ZpPDUfOHTxZngvWTX -dI0Ivox2rCsSSmDkMhzDwmRVEoAoK4st0U2taGcEOJtz2vr8MlGoJQnT+nUuikBr -WwlUuQSB5a4ANSSBsAMLa+Xy5LoEmLFgA7THjgmpAoGBAKbaOUHEIoSvbRHakNqD -UH05P/919iXpvaB36k/kjepiCssAcEYi//3VoRvF2tnSBVFcxEa2jTCvhN5VnkdF -77iEXqj54rtRqJAeZc3Hxhp11ti9gc61EgSifiFbMCZyzxqxMRKf4bHlayOcxac8 -ep/0IUA4bjzntYvCKBDzwhqtAoGAa9k1g+Zrg8c/c3fVm0ruZmQJhwMsxfmv8sTW -kp3ZmuW5X3ohtUUvOUHo1ibl2Z5y/GZBhG7mk4TOjROqhx9SRLlxVIylnCo5iCjw -bl3LdNEMgIDBWZelOzmdKh2QsiqwPFZXhbJK2RgY/jpWtHdGdjniiOuFKoS7Q6fU -UZnfwGkCgYEAuuQLQQD3Nsdd9hkXqu0rX6zBubMPYSIuCA18FjlDBbP8JGzD4tKC -eJpraeJMqaTAVpoEU0zwSePbaCsSR7iAdJuGoR3G1Y0KIpqJD0Qz4QP064fz9WNo -0U6JfMBRi6k35TRQE9iYIAjP+Iewooc90DV5sWbpEpvVKvjvJGh8ZN4= ------END RSA PRIVATE KEY----- diff --git a/examples/consumer_tls.go b/examples/consumer_tls.go deleted file mode 100644 index fc1ef35..0000000 --- a/examples/consumer_tls.go +++ /dev/null @@ -1,15 +0,0 @@ -package main - -import ( - "context" - - "github.com/oarkflow/mq" - "github.com/oarkflow/mq/examples/tasks" -) - -func main() { - consumer := mq.NewConsumer("consumer-1", mq.WithTLS(true, "consumer.crt", "consumer.key")) - consumer.RegisterHandler("queue1", tasks.Node1) - consumer.RegisterHandler("queue2", tasks.Node2) - consumer.Consume(context.Background()) -} diff --git a/examples/publisher.crt b/examples/publisher.crt deleted file mode 100644 index 6e205be..0000000 --- a/examples/publisher.crt +++ /dev/null @@ -1,21 +0,0 @@ ------BEGIN CERTIFICATE----- -MIIDXzCCAkegAwIBAgIRANiBUeaWr8u7wGyCMy1E+ckwDQYJKoZIhvcNAQELBQAw -bjELMAkGA1UEBhMCVVMxCTAHBgNVBAgTADEQMA4GA1UEBxMHTXkgQ2l0eTESMBAG -A1UECRMJTXkgU3RyZWV0MQ4wDAYDVQQREwUwMDAwMDEOMAwGA1UEChMFTXkgQ0Ex -DjAMBgNVBAMTBU15IENBMB4XDTI0MTAwMTA1NDMxNloXDTI1MTAwMTA1NDMxNlow -FDESMBAGA1UEAxMJcHVibGlzaGVyMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIB -CgKCAQEApu84WKcbivql7d/torz6LCHHW/6aq7peXyYea+cquOclqhlvouVgRm1G -Y2Ef8qVmjtPMqksvcRE9+xTZRi5L1AXFehI3CHJyg2RFH3RBqbZDoF886w/NyMIx -rjh6iLzFFkcWMndpgSdMMxmtD47av04qsQCTBfFeO8obtqPEY7O3ps9erPmvKoex -+xzlK5LLWm/1lFF4kZQVKynvKhiRLB/L4HprpuzlyVRykxJXyM8TbiZoK97jISDE -32vQoZrME+jSUs3EyxEft0E/ZVPT8ImLfZnJp9FDrr9CpekHjBUYG3Hwvca//tWh -GzwVTpdnd1oIfSWKxM7APAh60K4ZDwIDAQABo1IwUDAOBgNVHQ8BAf8EBAMCB4Aw -HQYDVR0lBBYwFAYIKwYBBQUHAwIGCCsGAQUFBwMBMB8GA1UdIwQYMBaAFOIsgqj4 -pvZLU4roKHenf5mCFl8TMA0GCSqGSIb3DQEBCwUAA4IBAQBI05MwvoC3v9K2hUIY -jFrFqnkJGF1cswOlmr5vPUvyH89JeM4IpPXiWCPZfVe2ABitfqDj+qzuHJvcZdLi -CcGnnuUtb5M6Fjgn0TLGU9W0/0rjL7/8mweI79Kk2uhViKNO4YVnKtTKekTZ0T/z -jeesF9V7TxhDt1dZ5VytNSYYuFVpSEkC1W5guhntpdqTrtAEInuvL3OTogjuOrzU -lNemWoRVicIYCo1gjgjAOvIZlpjDc8f6lKWowo38Shq3y/m7LsHb6yxBwB5HGTyu -Y0+5xpr+PTsB+KRTvGr6R9I3MupAv9brHSG79Y8J4EgUiWtrmm5DzzK0aH8BAMwh -IQU8 ------END CERTIFICATE----- diff --git a/examples/publisher.go b/examples/publisher.go index a74281b..3686e84 100644 --- a/examples/publisher.go +++ b/examples/publisher.go @@ -13,6 +13,7 @@ func main() { Payload: payload, } publisher := mq.NewPublisher("publish-1") + // publisher := mq.NewPublisher("publish-1", mq.WithTLS(true, "./certs/server.crt", "./certs/server.key")) err := publisher.Publish(context.Background(), "queue1", task) if err != nil { panic(err) diff --git a/examples/publisher.key b/examples/publisher.key deleted file mode 100644 index 9a90409..0000000 --- a/examples/publisher.key +++ /dev/null @@ -1,27 +0,0 @@ ------BEGIN RSA PRIVATE KEY----- -MIIEpAIBAAKCAQEApu84WKcbivql7d/torz6LCHHW/6aq7peXyYea+cquOclqhlv -ouVgRm1GY2Ef8qVmjtPMqksvcRE9+xTZRi5L1AXFehI3CHJyg2RFH3RBqbZDoF88 -6w/NyMIxrjh6iLzFFkcWMndpgSdMMxmtD47av04qsQCTBfFeO8obtqPEY7O3ps9e -rPmvKoex+xzlK5LLWm/1lFF4kZQVKynvKhiRLB/L4HprpuzlyVRykxJXyM8TbiZo -K97jISDE32vQoZrME+jSUs3EyxEft0E/ZVPT8ImLfZnJp9FDrr9CpekHjBUYG3Hw -vca//tWhGzwVTpdnd1oIfSWKxM7APAh60K4ZDwIDAQABAoIBADK1kGQ9vvwkz6rG -zyUUStqFGE41tT5dSrWUmsEkY7HvZV2ahHLzQp+iKjOeGVpYLCNO62j/ldW+6xkO -aEINZjNt3WMLjXQfb/HBz82SPzASkdBxncu57UUC2JyXw4Et9qChz3hdFcuJ6HfN -YOjM+F4MnKNQDVlMzNdUub5tWQb/3u+jWMbJLmdieyjHCFac9g6ACYMe09ODkYGU -ES//0v6+v1st3RKRKXbKAsCS7v3vbR2WFzJ+D/rZDwoQDQs1R9QGOdZoFcO4acFW -joNGhBIrrsIy35VlJoX7tGsJmssiiigEp+5ePzwv9vO8z2u0JqiOhM+Qlso9JEMs -UhIvTMECgYEA3aR2YccwSQ9Nn/0qpEISRgy9oRdv8QRDIeUYRyIy0jEndBy77Sn3 -+T/Yn0POA9ZrKV957Qa8HsDoZtAgiPi17lrp5XpMlz3z9WUnDgMD6DSNY82RAKES -jBJWcmN1kUB4PprBc3sD+po9qgGtmCLevwCclMM/tNB/c2l/O6yohJcCgYEAwM/B -jZzgOK44phZm5b7K/gg68gyJ/3jaK4fWDqViIfZ5Apy9Ts0Dyeu7/51gn6Z9fZCC -jNTuEHBboPBKG74OVh50LTx4/0/thBtEUkenoO/I6YcsM8D8zWE6iocXx0xrJS86 -phAELYVRTWoO1YhC46iD+tLGbdyZLq0vhnN5RkkCgYEAmbkNWjeap7FzdzlRr+cb -YXZIt0fYHeOE5CdOtVdkxB/Pa1rahF0NXdQIXD3czxCR9nn/yINZSkMlbNmBFz7Y -f96SRtSR9nvDjjl/4tn4hb6dKdFTdopAoOG/D0soKXv7agBPl9aEJfWAOz86tT/K -GdNExnC86J2LJ/LNigfwQTUCgYEAi0CA5an7NnR9bVaYki0tpjKDf/UWZW//AZll -O8auDSFZXE2fW54tslOSv5YGBsfTsjAAWX4fQbgkNSPWIhstd30fItyd9qbfP54B -xfsCUfVcAW0iu24SJrKl+Q6AWewTJhPqI61FlyrGAUcr14RF6u32A9zeetHrwA5M -WKplWCECgYAiPjT2mYE0eWGNgfSkH+7y/q5avGv8Hovq+Z2Z3NtXDd+S2xktXmmp -5/MtGohgz5602D0k+nc9H3LrihlK4RxFxI9XA51XxnFce11PdazS5Qh0NKXyJrcF -fOSRcExDqRSPUCkJK6dfzbzWTFwFRizVe0itvIlqMKp0qIVpxQxfSw== ------END RSA PRIVATE KEY----- diff --git a/examples/publisher_tls.go b/examples/publisher_tls.go deleted file mode 100644 index 32a6806..0000000 --- a/examples/publisher_tls.go +++ /dev/null @@ -1,30 +0,0 @@ -package main - -import ( - "context" - "fmt" - - "github.com/oarkflow/mq" -) - -func main() { - payload := []byte(`{"message":"Message Publisher \n Task"}`) - task := mq.Task{ - Payload: payload, - } - publisher := mq.NewPublisher("publish-1", mq.WithTLS(true, "publisher.crt", "publisher.key")) - err := publisher.Publish(context.Background(), "queue1", task) - if err != nil { - panic(err) - } - fmt.Println("Async task published successfully") - payload = []byte(`{"message":"Fire-and-Forget \n Task"}`) - task = mq.Task{ - Payload: payload, - } - result, err := publisher.Request(context.Background(), "queue1", task) - if err != nil { - panic(err) - } - fmt.Printf("Sync task published. Result: %v\n", string(result.Payload)) -} diff --git a/examples/server.go b/examples/server.go index 91aa3e5..268d00b 100644 --- a/examples/server.go +++ b/examples/server.go @@ -9,6 +9,7 @@ import ( func main() { b := mq.NewBroker(mq.WithCallback(tasks.Callback)) + // b := mq.NewBroker(mq.WithCallback(tasks.Callback), mq.WithTLS(true, "./certs/server.crt", "./certs/server.key"), mq.WithCAPath("./certs/ca.cert")) b.NewQueue("queue1") b.NewQueue("queue2") b.Start(context.Background()) diff --git a/examples/server_tls.go b/examples/server_tls.go deleted file mode 100644 index 1a58723..0000000 --- a/examples/server_tls.go +++ /dev/null @@ -1,15 +0,0 @@ -package main - -import ( - "context" - - "github.com/oarkflow/mq" - "github.com/oarkflow/mq/examples/tasks" -) - -func main() { - b := mq.NewBroker(mq.WithCallback(tasks.Callback), mq.WithTLS(true, "server.crt", "server.key"), mq.WithCAPath("ca.cert")) - b.NewQueue("queue1") - b.NewQueue("queue2") - b.Start(context.Background()) -} diff --git a/go.mod b/go.mod index 37780c3..48f8543 100644 --- a/go.mod +++ b/go.mod @@ -1,9 +1,15 @@ module github.com/oarkflow/mq -go 1.23 +go 1.23.0 require ( - github.com/oarkflow/xsync v0.0.5 + github.com/oarkflow/crypto v0.0.1 github.com/oarkflow/xid v1.2.5 + github.com/oarkflow/xsync v0.0.5 ) +require ( + github.com/oarkflow/paseto v0.0.0-20231006103046-f6852c552c83 // indirect + golang.org/x/crypto v0.14.0 // indirect + golang.org/x/sys v0.13.0 // indirect +) diff --git a/go.sum b/go.sum index e0e25be..57f7357 100644 --- a/go.sum +++ b/go.sum @@ -1,4 +1,12 @@ +github.com/oarkflow/crypto v0.0.1 h1:3J3Vd98fIqBaPLv3yg/CFTnsdGXftQq3MViI9R5e1h4= +github.com/oarkflow/crypto v0.0.1/go.mod h1:Wc5OqLO5ARB0GbPCuXSoE+yHnfIiBx5Kn8jX4NdMwnQ= +github.com/oarkflow/paseto v0.0.0-20231006103046-f6852c552c83 h1:3VAVJCmtUoMemvgshkQl2HP80+6M4o8kLq6NtAXJcTw= +github.com/oarkflow/paseto v0.0.0-20231006103046-f6852c552c83/go.mod h1:OSQERpM5KAg55QSZGGcehgMVqeJQ23GvV4oVRtN6aCA= github.com/oarkflow/xid v1.2.5 h1:6RcNJm9+oZ/B647gkME9trCzhpxGQaSdNoD56Vmkeho= github.com/oarkflow/xid v1.2.5/go.mod h1:jG4YBh+swbjlWApGWDBYnsJEa7hi3CCpmuqhB3RAxVo= github.com/oarkflow/xsync v0.0.5 h1:7HBQjmDus4YFLQFC5D197TB4c2YJTVwsTFuqk5zWKBM= github.com/oarkflow/xsync v0.0.5/go.mod h1:KAaEc506OEd3ISxfhgUBKxk8eQzkz+mb0JkpGGd/QwU= +golang.org/x/crypto v0.14.0 h1:wBqGXzWJW6m1XrIKlAH0Hs1JJ7+9KBwnIO8v66Q9cHc= +golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= +golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= +golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/publisher.go b/publisher.go index c343d83..9970b6e 100644 --- a/publisher.go +++ b/publisher.go @@ -75,6 +75,10 @@ func (p *Publisher) Request(ctx context.Context, queue string, task Task) (Resul return conn.Close() } } - ReadFromConn(ctx, conn, p.opts.messageHandler, p.opts.closeHandler, p.opts.errorHandler) + ReadFromConn(ctx, conn, Handlers{ + MessageHandler: p.opts.messageHandler, + CloseHandler: p.opts.closeHandler, + ErrorHandler: p.opts.errorHandler, + }) return result, nil }