From e2bbb05a3e673cd5ee76c236025081af1b92bf6e Mon Sep 17 00:00:00 2001 From: sujit Date: Tue, 1 Oct 2024 11:32:39 +0545 Subject: [PATCH] feat: Add README.md --- broker.go | 41 ++++++-- examples/ca.crt | 22 +++++ examples/ca.key | 27 +++++ examples/consumer.crt | 21 ++++ examples/consumer.key | 27 +++++ examples/consumer_tls.go | 49 ++++++++++ examples/generate_cert.go | 200 ++++++++++++++++++++++++++++++++++++++ examples/publisher.crt | 21 ++++ examples/publisher.key | 27 +++++ examples/publisher_tls.go | 65 +++++++++++++ examples/server.crt | 20 ++++ examples/server.key | 27 +++++ examples/server_tls.go | 66 +++++++++++++ options.go | 14 +++ tls.go | 51 ++++++++++ 15 files changed, 671 insertions(+), 7 deletions(-) create mode 100644 examples/ca.crt create mode 100644 examples/ca.key create mode 100644 examples/consumer.crt create mode 100644 examples/consumer.key create mode 100644 examples/consumer_tls.go create mode 100644 examples/generate_cert.go create mode 100644 examples/publisher.crt create mode 100644 examples/publisher.key create mode 100644 examples/publisher_tls.go create mode 100644 examples/server.crt create mode 100644 examples/server.key create mode 100644 examples/server_tls.go create mode 100644 tls.go diff --git a/broker.go b/broker.go index c660f32..116f247 100644 --- a/broker.go +++ b/broker.go @@ -2,6 +2,7 @@ package mq import ( "context" + "crypto/tls" "encoding/json" "errors" "fmt" @@ -166,21 +167,47 @@ func (b *Broker) onError(_ context.Context, conn net.Conn, err error) { fmt.Println("Error reading from connection:", err, conn.RemoteAddr()) } +// Start the broker server with optional TLS support func (b *Broker) Start(ctx context.Context) error { - listener, err := net.Listen("tcp", b.opts.brokerAddr) - if err != nil { - return err + var listener net.Listener + var err error + + if b.opts.useTLS { + // Load the TLS certificate and key + cert, err := tls.LoadX509KeyPair(b.opts.tlsCertPath, b.opts.tlsKeyPath) + if err != nil { + return fmt.Errorf("failed to load TLS certificates: %v", err) + } + + // Configure TLS + tlsConfig := &tls.Config{ + Certificates: []tls.Certificate{cert}, + } + + // Start TLS listener + listener, err = tls.Listen("tcp", b.opts.brokerAddr, tlsConfig) + if err != nil { + return fmt.Errorf("failed to start TLS listener: %v", err) + } + log.Println("TLS server started on", b.opts.brokerAddr) + } else { + // Start plain TCP listener + listener, err = net.Listen("tcp", b.opts.brokerAddr) + if err != nil { + return fmt.Errorf("failed to start TCP listener: %v", err) + } + log.Println("TCP server started on", b.opts.brokerAddr) } - defer func() { - _ = listener.Close() - }() - log.Println("Server started on", b.opts.brokerAddr) + defer listener.Close() + for { conn, err := listener.Accept() if err != nil { fmt.Println("Error accepting connection:", err) continue } + + // Handle the connection (same logic as before) go ReadFromConn(ctx, conn, b.opts.messageHandler, b.opts.closeHandler, b.opts.errorHandler) } } diff --git a/examples/ca.crt b/examples/ca.crt new file mode 100644 index 0000000..c366a6e --- /dev/null +++ b/examples/ca.crt @@ -0,0 +1,22 @@ +-----BEGIN CERTIFICATE----- +MIIDqDCCApCgAwIBAgIQBuz6Swcf+c/9yR95gWHeMzANBgkqhkiG9w0BAQsFADBu +MQswCQYDVQQGEwJVUzEJMAcGA1UECBMAMRAwDgYDVQQHEwdNeSBDaXR5MRIwEAYD +VQQJEwlNeSBTdHJlZXQxDjAMBgNVBBETBTAwMDAwMQ4wDAYDVQQKEwVNeSBDQTEO +MAwGA1UEAxMFTXkgQ0EwHhcNMjQxMDAxMDU0MzE2WhcNMjUxMDAxMDU0MzE2WjBu +MQswCQYDVQQGEwJVUzEJMAcGA1UECBMAMRAwDgYDVQQHEwdNeSBDaXR5MRIwEAYD +VQQJEwlNeSBTdHJlZXQxDjAMBgNVBBETBTAwMDAwMQ4wDAYDVQQKEwVNeSBDQTEO +MAwGA1UEAxMFTXkgQ0EwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQDD +hr0IwUOXUWzfZOdxjMEGxbp7q5jGzeC0VmjCRmvvWQgV4aX+pCYSEzDBwe+2ryFx +Kpvp75/SDTTT93bC3/wYSS0XpcJoISoSb3qVhcoQXGB7d80tPfyZO+MCPOCtnJf2 +2mRT1uN79tA2gsMLYboatTQVQ1HJZQbs7h+HQQsm2PBdIRBwZGRlh90eFzF/BUIj +rJq9Dg0xmI1lmKXW/XuUR8P8rwMf0CZSpKYAvf4P6ORF8oJlhfuj5im+eH2pYkGZ +kpA7I2+1wk8UJSIXXR4XMkd9PsByHjeRWLBDViFaiKPqf/XdHda56lxhnOQGTidU +cGud/8MsWJj5c9hdBDPPAgMBAAGjQjBAMA4GA1UdDwEB/wQEAwIChDAPBgNVHRMB +Af8EBTADAQH/MB0GA1UdDgQWBBTiLIKo+Kb2S1OK6Ch3p3+ZghZfEzANBgkqhkiG +9w0BAQsFAAOCAQEAN49h/WETvhyCuLTWFgl+dGK+uM0k5yRwTFtThu7FLZdqOvis +l4WthkLt3oanNSyxs4RhDvd6oZ6lQvY7lNy4Z6U83QYR1O7dTVPb7wWrsnjUd4/l +be3qfoIo5SMPr3db0D019LI+vq5UUk4DC0YpI/DPFLL9kfDHvdtlRbIrmgbEjwwN +smu2wcbSNM22yk2P5vFE9jgRLZ4rQYpvMPrnTEACr6uLdVut6rpx2PpVq6W0vNK4 +4c2PGNxXolTlZBmz7knih3WJrgOuoaeVIN8WqR9GiZQjFPzr4AFM31q5BNUlK4TU +Xi1a0yAjs0xWu0NCq1zbwh9+SB8SuYtIvsjILQ== +-----END CERTIFICATE----- diff --git a/examples/ca.key b/examples/ca.key new file mode 100644 index 0000000..4a61a2f --- /dev/null +++ b/examples/ca.key @@ -0,0 +1,27 @@ +-----BEGIN RSA PRIVATE KEY----- +MIIEowIBAAKCAQEAw4a9CMFDl1Fs32TncYzBBsW6e6uYxs3gtFZowkZr71kIFeGl +/qQmEhMwwcHvtq8hcSqb6e+f0g000/d2wt/8GEktF6XCaCEqEm96lYXKEFxge3fN +LT38mTvjAjzgrZyX9tpkU9bje/bQNoLDC2G6GrU0FUNRyWUG7O4fh0ELJtjwXSEQ +cGRkZYfdHhcxfwVCI6yavQ4NMZiNZZil1v17lEfD/K8DH9AmUqSmAL3+D+jkRfKC +ZYX7o+Ypvnh9qWJBmZKQOyNvtcJPFCUiF10eFzJHfT7Ach43kViwQ1YhWoij6n/1 +3R3WuepcYZzkBk4nVHBrnf/DLFiY+XPYXQQzzwIDAQABAoIBAAkE2w1fVM3TDLGV +RvO+6Vx1nG9979Mjxfyri7OCahIlSjEwMmb3jWYCCpq1ZmhH1cQRkhWNXxLiVxB7 +9rdwe4FnRrQzii8hcH5fNAlXnYV5rV2kngs7M76hu4vr4PVBJuVVF5GidOXP8bTB +/Vs2C86Vkyxz6X7fsR0Wss+bWXdWL8RRTrFqPlHRjaaCcOK6dDzFRafvBN1CzaJ0 +Acv03BZITVSgjTfg2kNhwyrrq6EqExxLAxXrY/3cSI+bDx68W0LxVgNTfjceY5uP +uJhnYO6riuSaSD4uz3SKUUXH6zR5RDUGAPnICgM5MEDtAA3ZY3jGfbu9cJYkx9oL +rB6+1nECgYEA39PKyhHo++/wu46P94RpLZ1+RoKiE94DoZjdw9gRa4QGJUO7KGJ+ +Utnp+KzoeB3FSaXJyGj/1cLrmccsdRdSs6l4NAihQRDM8kB+7TEUCFoNfCRDQZ+J +Pf+afv/v+jR0WdlguDhFmMnAN3euJXAuHwOaZseeIRTXu8RDPkDE6qcCgYEA36GM +5Szc0TVTbcVhK3vPEBn/CgcLyjfrDbyAQp+Y0InUaQnrpzcW37sejsiK3rVyFTeQ +WQdobm6MGT+QkO3nE1/riJmyEUq3/WRYtRnGjurq6EwYeG6QxXV6SqkA9OXZSqKX +aFfH6lUs+6m4drPPzzmNEBSxt3esZGDXq29pGpkCgYByr2Z822hxjqPetlF2FdZ+ +lPAa2NyLKXra1iTrME7ctC0h8u525uCrOxTzYkVLJpXsApK9qW9M7C8kADX7WRP7 +Ep6QqstVN3KLvhhLGJaXIO0/6qS7fy8nIUzcPe+MWEw1rXgtbEfc3aMryJrme/Bl +28bFWwrfEHrprsp1n2JGiQKBgDgROdDveYFePEeGOAF97gEcc2vhLlyJvn3YJ9QM +TXTjSYT4PsPStQJs2JF1yBNkLHETWDZp/A3L24YtAKLFcqzR3KyH1DQvpod6FB97 +keOdFD4fbfcryVIoTPvQ+XNs+RiUQR+g+ndO2ZNTDvN7y3sp86r3dUMJVwhnm0rZ +COHpAoGBAJf7y/PM4T5xeBZ4LhHzGIIWaBJRRo9JkWCyLkQBVNpDs9chK6CPUcR4 +61ouAGAIRRUJt4KTJAuyt+rnEvvCTGkpq2MtrMx5+SLmj4Soh9Fo+FTqkVA15Kr/ ++05wVMIWkVqKXGF7Um9duPRUpRHhGSML3uG6rujSQWHJqibooyje +-----END RSA PRIVATE KEY----- diff --git a/examples/consumer.crt b/examples/consumer.crt new file mode 100644 index 0000000..5b637ed --- /dev/null +++ b/examples/consumer.crt @@ -0,0 +1,21 @@ +-----BEGIN CERTIFICATE----- +MIIDXjCCAkagAwIBAgIRAMFvjYXNdG0zTX15qxFE+V0wDQYJKoZIhvcNAQELBQAw +bjELMAkGA1UEBhMCVVMxCTAHBgNVBAgTADEQMA4GA1UEBxMHTXkgQ2l0eTESMBAG +A1UECRMJTXkgU3RyZWV0MQ4wDAYDVQQREwUwMDAwMDEOMAwGA1UEChMFTXkgQ0Ex +DjAMBgNVBAMTBU15IENBMB4XDTI0MTAwMTA1NDMxN1oXDTI1MTAwMTA1NDMxN1ow +EzERMA8GA1UEAxMIY29uc3VtZXIwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEK +AoIBAQD4YBRg07h+khv0tFZTkhHXPlR+rXPoCVKpnlHn074d+3DVmTCWQWt5gB9s +Ow62PtG2P5OGPuoTiVIDRqafZI4Mq12LvZYpIjaWEp1k0nWrlCYU6mMixsmxnHXK +fcZBzulIE1LUyGHWpgo6J/B0aHdvsQy3ogzxnjiAkyvFkL9LqgcL0sTcItkwSqR2 +ieQMNHiQ4aaKSEw+1FJ7Ls8D0qyjZLZ4F6dLba2sdWnhUbxTDccradOq7kpacQLG +/KneGbfHWT+s2/PnLPlafM6jvZYu1EtAYMZjDTYyIM4L4L4JcFBTYTu4CcRumTre +DhyDtVlwSZpmYHM+nZBwEEqBWYJFAgMBAAGjUjBQMA4GA1UdDwEB/wQEAwIHgDAd +BgNVHSUEFjAUBggrBgEFBQcDAgYIKwYBBQUHAwEwHwYDVR0jBBgwFoAU4iyCqPim +9ktTiugod6d/mYIWXxMwDQYJKoZIhvcNAQELBQADggEBALx6LEZBgDpedNMG7NJE +0YqfUPJc+6XBaX5Gcb0ylUtMv8LS7g3I4NT4tG8muJNV4QN4p8SkV7oufeaU1F85 +YSS11k/lT4OItxMiZZmZ1+5DnESOP0HlSnT1gljpqBHdjymJi83Ls/qQ2GEer7x6 +IOM/N5fwi+1tmVcWHL0v+J0JF+4szz0SEXPc5ML/9j5oiAeg/YycfWXxZHPWsvPj +TX3RcltYOwyX20MAOT5Zk6SqhDOMAr3K48/4Fy4FDGwLnDU53SQCH5Beu1YgG9oW +pSTOLk0D6FLgJUVNAd8e6YA7o4mESW/3sK6I52VNr6JwRRnGZKC41Qga7XBdkLpI +nk8= +-----END CERTIFICATE----- diff --git a/examples/consumer.key b/examples/consumer.key new file mode 100644 index 0000000..1810943 --- /dev/null +++ b/examples/consumer.key @@ -0,0 +1,27 @@ +-----BEGIN RSA PRIVATE KEY----- +MIIEpQIBAAKCAQEA+GAUYNO4fpIb9LRWU5IR1z5Ufq1z6AlSqZ5R59O+Hftw1Zkw +lkFreYAfbDsOtj7Rtj+Thj7qE4lSA0amn2SODKtdi72WKSI2lhKdZNJ1q5QmFOpj +IsbJsZx1yn3GQc7pSBNS1Mhh1qYKOifwdGh3b7EMt6IM8Z44gJMrxZC/S6oHC9LE +3CLZMEqkdonkDDR4kOGmikhMPtRSey7PA9Kso2S2eBenS22trHVp4VG8Uw3HK2nT +qu5KWnECxvyp3hm3x1k/rNvz5yz5WnzOo72WLtRLQGDGYw02MiDOC+C+CXBQU2E7 +uAnEbpk63g4cg7VZcEmaZmBzPp2QcBBKgVmCRQIDAQABAoIBAQCDL4Qz7D/bImL/ +qaya8WDY9kP8sLKykRIHIucR8mXNQjxDpWjjQ+R3RTPTn5HSsnVjtErAMkTCUtpt +foiNUbgxeuWq5FUGntqEA4r8cKjUGijPDUmVWbe4RvJ0JGBt69KNTh+G4dvDWum1 +89huM6s55CLabw41JCOlzZXmAOD5Hee+5w9HxqboLSFO93wBMT1olOLKsTI0lY3z +dDNibkvk/USPHzR5sesS+9wq9IbuxK7n6mJs0/2SYRAu7C+LYbDwhcZ5dwLXg83R +n2X4PDYtKRM5rTSwcDhY2fcfPKIPSJ/lBR7uRZgheoLyCAzBUslFN8DZeC+ieDEi +Gj2FalGhAoGBAP5qONtobitWHjqeMuIz2RkLPxjwIvMj2+tIKb1F1ZkXA/ePwjDZ +B6WMUF5KhtUsuzWHNS3ZZFB7utMirJoM5RxemiP3X7E2+vEiH0VMBGu75MKZuAP+ +7qXnodIoWxEhKr59iqzdPGxrgJGpj3cR0F2TPIvJykWXS0nDyFc4aq09AoGBAPns +OYIoh5U2FCyjWCGtqPxkZ3lZPerJR6acA7l4dQf8opXFJ4+ZpPDUfOHTxZngvWTX +dI0Ivox2rCsSSmDkMhzDwmRVEoAoK4st0U2taGcEOJtz2vr8MlGoJQnT+nUuikBr +WwlUuQSB5a4ANSSBsAMLa+Xy5LoEmLFgA7THjgmpAoGBAKbaOUHEIoSvbRHakNqD +UH05P/919iXpvaB36k/kjepiCssAcEYi//3VoRvF2tnSBVFcxEa2jTCvhN5VnkdF +77iEXqj54rtRqJAeZc3Hxhp11ti9gc61EgSifiFbMCZyzxqxMRKf4bHlayOcxac8 +ep/0IUA4bjzntYvCKBDzwhqtAoGAa9k1g+Zrg8c/c3fVm0ruZmQJhwMsxfmv8sTW +kp3ZmuW5X3ohtUUvOUHo1ibl2Z5y/GZBhG7mk4TOjROqhx9SRLlxVIylnCo5iCjw +bl3LdNEMgIDBWZelOzmdKh2QsiqwPFZXhbJK2RgY/jpWtHdGdjniiOuFKoS7Q6fU +UZnfwGkCgYEAuuQLQQD3Nsdd9hkXqu0rX6zBubMPYSIuCA18FjlDBbP8JGzD4tKC +eJpraeJMqaTAVpoEU0zwSePbaCsSR7iAdJuGoR3G1Y0KIpqJD0Qz4QP064fz9WNo +0U6JfMBRi6k35TRQE9iYIAjP+Iewooc90DV5sWbpEpvVKvjvJGh8ZN4= +-----END RSA PRIVATE KEY----- diff --git a/examples/consumer_tls.go b/examples/consumer_tls.go new file mode 100644 index 0000000..780c281 --- /dev/null +++ b/examples/consumer_tls.go @@ -0,0 +1,49 @@ +package main + +import ( + "context" + "crypto/tls" + "crypto/x509" + "io/ioutil" + "log" + + "github.com/oarkflow/mq" + "github.com/oarkflow/mq/examples/tasks" +) + +func main() { + // Load consumer's certificate and private key + cert, err := tls.LoadX509KeyPair("consumer.crt", "consumer.key") + if err != nil { + log.Fatalf("Failed to load consumer certificate and key: %v", err) + } + + // Load the CA certificate + caCert, err := ioutil.ReadFile("ca.crt") + if err != nil { + log.Fatalf("Failed to read CA certificate: %v", err) + } + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(caCert) + + // Configure TLS for the consumer + tlsConfig := &tls.Config{ + Certificates: []tls.Certificate{cert}, + RootCAs: caCertPool, + InsecureSkipVerify: false, // Ensure we verify the server certificate + } + + // Dial TLS connection to the broker + conn, err := tls.Dial("tcp", "localhost:8443", tlsConfig) + if err != nil { + log.Fatalf("Failed to connect to broker: %v", err) + } + defer conn.Close() + + consumer := mq.NewConsumer("consumer-1") + consumer.RegisterHandler("queue1", tasks.Node1) + consumer.RegisterHandler("queue2", tasks.Node2) + + // Start consuming tasks + consumer.Consume(context.Background()) +} diff --git a/examples/generate_cert.go b/examples/generate_cert.go new file mode 100644 index 0000000..2f334ec --- /dev/null +++ b/examples/generate_cert.go @@ -0,0 +1,200 @@ +package main + +import ( + "crypto/rand" + "crypto/rsa" + "crypto/x509" + "crypto/x509/pkix" + "encoding/pem" + "log" + "math/big" + "os" + "time" +) + +func main() { + // 1. Generate the CA private key + caPrivateKey, err := generatePrivateKey(2048) + if err != nil { + log.Fatalf("Failed to generate CA private key: %v", err) + } + err = savePrivateKey("ca.key", caPrivateKey) + if err != nil { + log.Fatalf("Failed to save CA private key: %v", err) + } + + // 2. Generate the CA certificate + caCertBytes, caCert, err := generateCACertificate(caPrivateKey) + if err != nil { + log.Fatalf("Failed to generate CA certificate: %v", err) + } + err = saveCertificate("ca.crt", caCertBytes) + if err != nil { + log.Fatalf("Failed to save CA certificate: %v", err) + } + + log.Println("CA Certificate and key generated") + + // 3. Generate the server certificate + serverCertBytes, serverPrivateKey, err := generateSignedCertificate(caCert, caPrivateKey, "server", true) + if err != nil { + log.Fatalf("Failed to generate server certificate: %v", err) + } + err = saveCertificate("server.crt", serverCertBytes) + if err != nil { + log.Fatalf("Failed to save server certificate: %v", err) + } + err = savePrivateKey("server.key", serverPrivateKey) + if err != nil { + log.Fatalf("Failed to save server private key: %v", err) + } + + log.Println("Server certificate and key generated") + + // 4. Generate the publisher certificate + publisherCertBytes, publisherPrivateKey, err := generateSignedCertificate(caCert, caPrivateKey, "publisher", false) + if err != nil { + log.Fatalf("Failed to generate publisher certificate: %v", err) + } + err = saveCertificate("publisher.crt", publisherCertBytes) + if err != nil { + log.Fatalf("Failed to save publisher certificate: %v", err) + } + err = savePrivateKey("publisher.key", publisherPrivateKey) + if err != nil { + log.Fatalf("Failed to save publisher private key: %v", err) + } + + log.Println("Publisher certificate and key generated") + + // 5. Generate the consumer certificate + consumerCertBytes, consumerPrivateKey, err := generateSignedCertificate(caCert, caPrivateKey, "consumer", false) + if err != nil { + log.Fatalf("Failed to generate consumer certificate: %v", err) + } + err = saveCertificate("consumer.crt", consumerCertBytes) + if err != nil { + log.Fatalf("Failed to save consumer certificate: %v", err) + } + err = savePrivateKey("consumer.key", consumerPrivateKey) + if err != nil { + log.Fatalf("Failed to save consumer private key: %v", err) + } + + log.Println("Consumer certificate and key generated") +} + +// Generate a private key +func generatePrivateKey(bits int) (*rsa.PrivateKey, error) { + privateKey, err := rsa.GenerateKey(rand.Reader, bits) + if err != nil { + return nil, err + } + return privateKey, nil +} + +// Save private key to file +func savePrivateKey(filename string, privateKey *rsa.PrivateKey) error { + keyOut, err := os.Create(filename) + if err != nil { + return err + } + defer keyOut.Close() + + privateKeyPEM := pem.Block{ + Type: "RSA PRIVATE KEY", + Bytes: x509.MarshalPKCS1PrivateKey(privateKey), + } + + return pem.Encode(keyOut, &privateKeyPEM) +} + +// Generate a self-signed certificate for the CA +func generateCACertificate(privateKey *rsa.PrivateKey) ([]byte, *x509.Certificate, error) { + serialNumber, err := rand.Int(rand.Reader, new(big.Int).Lsh(big.NewInt(1), 128)) + if err != nil { + return nil, nil, err + } + + caTemplate := x509.Certificate{ + SerialNumber: serialNumber, + Subject: pkix.Name{ + Organization: []string{"My CA"}, + Country: []string{"US"}, + Province: []string{""}, + Locality: []string{"My City"}, + StreetAddress: []string{"My Street"}, + PostalCode: []string{"00000"}, + CommonName: "My CA", + }, + NotBefore: time.Now(), + NotAfter: time.Now().Add(365 * 24 * time.Hour), // 1 year validity + KeyUsage: x509.KeyUsageCertSign | x509.KeyUsageDigitalSignature, + IsCA: true, + BasicConstraintsValid: true, + MaxPathLen: 0, + } + + caCertBytes, err := x509.CreateCertificate(rand.Reader, &caTemplate, &caTemplate, &privateKey.PublicKey, privateKey) + if err != nil { + return nil, nil, err + } + + caCert, err := x509.ParseCertificate(caCertBytes) + if err != nil { + return nil, nil, err + } + + return caCertBytes, caCert, nil +} + +// Save the certificate to a file +func saveCertificate(filename string, certBytes []byte) error { + certOut, err := os.Create(filename) + if err != nil { + return err + } + defer certOut.Close() + + certPEM := pem.Block{ + Type: "CERTIFICATE", + Bytes: certBytes, + } + return pem.Encode(certOut, &certPEM) +} + +// Generate a signed certificate using the CA +func generateSignedCertificate(caCert *x509.Certificate, caPrivateKey *rsa.PrivateKey, commonName string, isServer bool) ([]byte, *rsa.PrivateKey, error) { + privateKey, err := generatePrivateKey(2048) + if err != nil { + return nil, nil, err + } + + serialNumber, err := rand.Int(rand.Reader, new(big.Int).Lsh(big.NewInt(1), 128)) + if err != nil { + return nil, nil, err + } + + certTemplate := x509.Certificate{ + SerialNumber: serialNumber, + Subject: pkix.Name{ + CommonName: commonName, + }, + NotBefore: time.Now(), + NotAfter: time.Now().Add(365 * 24 * time.Hour), // 1 year + KeyUsage: x509.KeyUsageDigitalSignature, + ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth, x509.ExtKeyUsageServerAuth}, + } + + if isServer { + certTemplate.KeyUsage |= x509.KeyUsageKeyEncipherment + certTemplate.ExtKeyUsage = []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth} + } + + certBytes, err := x509.CreateCertificate(rand.Reader, &certTemplate, caCert, &privateKey.PublicKey, caPrivateKey) + if err != nil { + return nil, nil, err + } + + return certBytes, privateKey, nil +} diff --git a/examples/publisher.crt b/examples/publisher.crt new file mode 100644 index 0000000..6e205be --- /dev/null +++ b/examples/publisher.crt @@ -0,0 +1,21 @@ +-----BEGIN CERTIFICATE----- +MIIDXzCCAkegAwIBAgIRANiBUeaWr8u7wGyCMy1E+ckwDQYJKoZIhvcNAQELBQAw +bjELMAkGA1UEBhMCVVMxCTAHBgNVBAgTADEQMA4GA1UEBxMHTXkgQ2l0eTESMBAG +A1UECRMJTXkgU3RyZWV0MQ4wDAYDVQQREwUwMDAwMDEOMAwGA1UEChMFTXkgQ0Ex +DjAMBgNVBAMTBU15IENBMB4XDTI0MTAwMTA1NDMxNloXDTI1MTAwMTA1NDMxNlow +FDESMBAGA1UEAxMJcHVibGlzaGVyMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIB +CgKCAQEApu84WKcbivql7d/torz6LCHHW/6aq7peXyYea+cquOclqhlvouVgRm1G +Y2Ef8qVmjtPMqksvcRE9+xTZRi5L1AXFehI3CHJyg2RFH3RBqbZDoF886w/NyMIx +rjh6iLzFFkcWMndpgSdMMxmtD47av04qsQCTBfFeO8obtqPEY7O3ps9erPmvKoex ++xzlK5LLWm/1lFF4kZQVKynvKhiRLB/L4HprpuzlyVRykxJXyM8TbiZoK97jISDE +32vQoZrME+jSUs3EyxEft0E/ZVPT8ImLfZnJp9FDrr9CpekHjBUYG3Hwvca//tWh +GzwVTpdnd1oIfSWKxM7APAh60K4ZDwIDAQABo1IwUDAOBgNVHQ8BAf8EBAMCB4Aw +HQYDVR0lBBYwFAYIKwYBBQUHAwIGCCsGAQUFBwMBMB8GA1UdIwQYMBaAFOIsgqj4 +pvZLU4roKHenf5mCFl8TMA0GCSqGSIb3DQEBCwUAA4IBAQBI05MwvoC3v9K2hUIY +jFrFqnkJGF1cswOlmr5vPUvyH89JeM4IpPXiWCPZfVe2ABitfqDj+qzuHJvcZdLi +CcGnnuUtb5M6Fjgn0TLGU9W0/0rjL7/8mweI79Kk2uhViKNO4YVnKtTKekTZ0T/z +jeesF9V7TxhDt1dZ5VytNSYYuFVpSEkC1W5guhntpdqTrtAEInuvL3OTogjuOrzU +lNemWoRVicIYCo1gjgjAOvIZlpjDc8f6lKWowo38Shq3y/m7LsHb6yxBwB5HGTyu +Y0+5xpr+PTsB+KRTvGr6R9I3MupAv9brHSG79Y8J4EgUiWtrmm5DzzK0aH8BAMwh +IQU8 +-----END CERTIFICATE----- diff --git a/examples/publisher.key b/examples/publisher.key new file mode 100644 index 0000000..9a90409 --- /dev/null +++ b/examples/publisher.key @@ -0,0 +1,27 @@ +-----BEGIN RSA PRIVATE KEY----- +MIIEpAIBAAKCAQEApu84WKcbivql7d/torz6LCHHW/6aq7peXyYea+cquOclqhlv +ouVgRm1GY2Ef8qVmjtPMqksvcRE9+xTZRi5L1AXFehI3CHJyg2RFH3RBqbZDoF88 +6w/NyMIxrjh6iLzFFkcWMndpgSdMMxmtD47av04qsQCTBfFeO8obtqPEY7O3ps9e +rPmvKoex+xzlK5LLWm/1lFF4kZQVKynvKhiRLB/L4HprpuzlyVRykxJXyM8TbiZo +K97jISDE32vQoZrME+jSUs3EyxEft0E/ZVPT8ImLfZnJp9FDrr9CpekHjBUYG3Hw +vca//tWhGzwVTpdnd1oIfSWKxM7APAh60K4ZDwIDAQABAoIBADK1kGQ9vvwkz6rG +zyUUStqFGE41tT5dSrWUmsEkY7HvZV2ahHLzQp+iKjOeGVpYLCNO62j/ldW+6xkO +aEINZjNt3WMLjXQfb/HBz82SPzASkdBxncu57UUC2JyXw4Et9qChz3hdFcuJ6HfN +YOjM+F4MnKNQDVlMzNdUub5tWQb/3u+jWMbJLmdieyjHCFac9g6ACYMe09ODkYGU +ES//0v6+v1st3RKRKXbKAsCS7v3vbR2WFzJ+D/rZDwoQDQs1R9QGOdZoFcO4acFW +joNGhBIrrsIy35VlJoX7tGsJmssiiigEp+5ePzwv9vO8z2u0JqiOhM+Qlso9JEMs +UhIvTMECgYEA3aR2YccwSQ9Nn/0qpEISRgy9oRdv8QRDIeUYRyIy0jEndBy77Sn3 ++T/Yn0POA9ZrKV957Qa8HsDoZtAgiPi17lrp5XpMlz3z9WUnDgMD6DSNY82RAKES +jBJWcmN1kUB4PprBc3sD+po9qgGtmCLevwCclMM/tNB/c2l/O6yohJcCgYEAwM/B +jZzgOK44phZm5b7K/gg68gyJ/3jaK4fWDqViIfZ5Apy9Ts0Dyeu7/51gn6Z9fZCC +jNTuEHBboPBKG74OVh50LTx4/0/thBtEUkenoO/I6YcsM8D8zWE6iocXx0xrJS86 +phAELYVRTWoO1YhC46iD+tLGbdyZLq0vhnN5RkkCgYEAmbkNWjeap7FzdzlRr+cb +YXZIt0fYHeOE5CdOtVdkxB/Pa1rahF0NXdQIXD3czxCR9nn/yINZSkMlbNmBFz7Y +f96SRtSR9nvDjjl/4tn4hb6dKdFTdopAoOG/D0soKXv7agBPl9aEJfWAOz86tT/K +GdNExnC86J2LJ/LNigfwQTUCgYEAi0CA5an7NnR9bVaYki0tpjKDf/UWZW//AZll +O8auDSFZXE2fW54tslOSv5YGBsfTsjAAWX4fQbgkNSPWIhstd30fItyd9qbfP54B +xfsCUfVcAW0iu24SJrKl+Q6AWewTJhPqI61FlyrGAUcr14RF6u32A9zeetHrwA5M +WKplWCECgYAiPjT2mYE0eWGNgfSkH+7y/q5avGv8Hovq+Z2Z3NtXDd+S2xktXmmp +5/MtGohgz5602D0k+nc9H3LrihlK4RxFxI9XA51XxnFce11PdazS5Qh0NKXyJrcF +fOSRcExDqRSPUCkJK6dfzbzWTFwFRizVe0itvIlqMKp0qIVpxQxfSw== +-----END RSA PRIVATE KEY----- diff --git a/examples/publisher_tls.go b/examples/publisher_tls.go new file mode 100644 index 0000000..d0a5ed0 --- /dev/null +++ b/examples/publisher_tls.go @@ -0,0 +1,65 @@ +package main + +import ( + "context" + "crypto/tls" + "crypto/x509" + "fmt" + "io/ioutil" + "log" + + "github.com/oarkflow/mq" +) + +func main() { + // Load publisher's certificate and private key + cert, err := tls.LoadX509KeyPair("publisher.crt", "publisher.key") + if err != nil { + log.Fatalf("Failed to load publisher certificate and key: %v", err) + } + + // Load the CA certificate + caCert, err := ioutil.ReadFile("ca.crt") + if err != nil { + log.Fatalf("Failed to read CA certificate: %v", err) + } + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(caCert) + + // Configure TLS for the publisher + tlsConfig := &tls.Config{ + Certificates: []tls.Certificate{cert}, + RootCAs: caCertPool, + InsecureSkipVerify: false, // Ensure we verify the server certificate + } + + // Dial TLS connection to the broker + conn, err := tls.Dial("tcp", "localhost:8443", tlsConfig) + if err != nil { + log.Fatalf("Failed to connect to broker: %v", err) + } + defer conn.Close() + + payload := []byte(`{"message":"Message Publisher \n Task"}`) + task := mq.Task{ + Payload: payload, + } + + publisher := mq.NewPublisher("publish-1") + err = publisher.Publish(context.Background(), "queue1", task) + if err != nil { + log.Fatalf("Failed to publish task: %v", err) + } + fmt.Println("Async task published successfully") + + // Example for request (sync) + payload = []byte(`{"message":"Fire-and-Forget \n Task"}`) + task = mq.Task{ + Payload: payload, + } + result, err := publisher.Request(context.Background(), "queue1", task) + if err != nil { + log.Fatalf("Failed to send sync request: %v", err) + } + fmt.Printf("Sync task published. Result: %v\n", string(result.Payload)) +} diff --git a/examples/server.crt b/examples/server.crt new file mode 100644 index 0000000..cfd7865 --- /dev/null +++ b/examples/server.crt @@ -0,0 +1,20 @@ +-----BEGIN CERTIFICATE----- +MIIDUjCCAjqgAwIBAgIRAMdsW2PjhMRI5lI74hlnjacwDQYJKoZIhvcNAQELBQAw +bjELMAkGA1UEBhMCVVMxCTAHBgNVBAgTADEQMA4GA1UEBxMHTXkgQ2l0eTESMBAG +A1UECRMJTXkgU3RyZWV0MQ4wDAYDVQQREwUwMDAwMDEOMAwGA1UEChMFTXkgQ0Ex +DjAMBgNVBAMTBU15IENBMB4XDTI0MTAwMTA1NDMxNloXDTI1MTAwMTA1NDMxNlow +ETEPMA0GA1UEAxMGc2VydmVyMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKC +AQEAxMrkyIFnEaGoZ9Z5iVVdSzoq8FTrJE3iGCZYjLRLTO/1Hq6L5C6tDqzYq3fv +V64G3B6yuWYE1SpqJ5C8T9G89Gc1jp6ZklP92nL+S7hOPjWsm+y33vM4WQQzqmY7 +BucE4yMXVZSAkr4uCe9/iTIeUBYgDOPmoJRwOS+y9mlBi6gqoWre3NDHbt9h4zim +Hg2Nsd6HT0kKcSKhrr3Xz87o8pWHyi/O7hexB3WBLIjgX43Wh0jhxwZ84FVHyCH3 +VR1UuhrInUxrWBE2HF9hhRp/8RUMgPggYIXDTNycUBJy0PEjBHy1s1hIqX75tEfP +JHNQj0NCHJ7UPFf7x1GsKPF62QIDAQABo0gwRjAOBgNVHQ8BAf8EBAMCBaAwEwYD +VR0lBAwwCgYIKwYBBQUHAwEwHwYDVR0jBBgwFoAU4iyCqPim9ktTiugod6d/mYIW +XxMwDQYJKoZIhvcNAQELBQADggEBACdY5KqLAXNHoZDof02daC+veHMM09VhFBrZ +UugYnXh6xmP+cKiINKfylr3Vqdtt4JXXMR8teTdv/Dk5ho17XtWyHbQ22bZN2DwH +vCgs4pPyqZvwfuBuWJ85fiu7B0AsrInSmdxMJeBIenTWyWU4bg19NsTejfJKIhk9 +dkvTLWryCZpoaA8lQZ+l39p10/L2PPnOdNU+TOzsbrJKnZkwCdlkAvZhyaVzTQfk +YSjVr1cakmq5T9u/8kWeb3Bx1z0GVXy0Jbgr1XBUv88IuGYH/KnrEfCweN5B+KQ7 +zwGD9PPu7E7+TpFkRH5uzv+3y8C6bICetlhWnWhQ237IEaoRpfU= +-----END CERTIFICATE----- diff --git a/examples/server.key b/examples/server.key new file mode 100644 index 0000000..d1e01d4 --- /dev/null +++ b/examples/server.key @@ -0,0 +1,27 @@ +-----BEGIN RSA PRIVATE KEY----- +MIIEogIBAAKCAQEAxMrkyIFnEaGoZ9Z5iVVdSzoq8FTrJE3iGCZYjLRLTO/1Hq6L +5C6tDqzYq3fvV64G3B6yuWYE1SpqJ5C8T9G89Gc1jp6ZklP92nL+S7hOPjWsm+y3 +3vM4WQQzqmY7BucE4yMXVZSAkr4uCe9/iTIeUBYgDOPmoJRwOS+y9mlBi6gqoWre +3NDHbt9h4zimHg2Nsd6HT0kKcSKhrr3Xz87o8pWHyi/O7hexB3WBLIjgX43Wh0jh +xwZ84FVHyCH3VR1UuhrInUxrWBE2HF9hhRp/8RUMgPggYIXDTNycUBJy0PEjBHy1 +s1hIqX75tEfPJHNQj0NCHJ7UPFf7x1GsKPF62QIDAQABAoIBAESksSDvYlBYHzH5 +MfOhfyVaaNfkBxFmyVK7LXAHA60Wll3ZbJpvXZYc3IcTEr12ypXFb3oUB+ODI/wh +FE6TTmHCDoBs+gx8l7O3INSwuToh5s+MxqZSGHmUaaEqf7RsqNvBxcXoQuDszYpR +rB7jCIfO7+cPJ8cjf/Gyna4uENrxdQgy8ESDYiLif5RhXZtYcuJ1dlojAshrzgz+ +PhFN7TeXTYNAit9txVybHt6m7hCzkmGjyUAhIrNoeIxizIu4osu/IoRrwhCuBRSw +3zqvOErr7JG2gI8Wj5Bs9Mkwn9iJeB4tZzGAwmw2t2eqqfdVNFFUNHE7vVRZ03/T +t/DwjLECgYEA1c79kQjzhwbuccPEOpBwl6hBngbbNw5+PtqBtizo12T3b2LsOqlZ +eb2G+5yUFW2enrZIl+KS6iCrutW1wHbxIxYijy13hlp/ecmLJeDRRdseOEagh8Re +NticHiNjluTns/sBl56unpm2lw+dp75cm+R1IiRkyiOdPcdvsfaPUZsCgYEA66BN +Z7mS6HL/juMEO5IGVluVMpIwq8seVWG8s6vQF1COlYfC/tWwxM/+95bKI/SmCMTK +83CtDrvGQ6dax2QjsxeP9mJ1GKIwEGrFrPn+vsmh0KBChG9HxVq2miWicH32WA+8 +MDwVFQUQa0MI1LsSrZsqhBGq+3KkKuBPuejTVpsCgYASnzCejTUIsaXa6r4Qi7wC +uXjdlqNJLE36k3VwtICjIfwbC3aftVhBriwvhfev1hhWonG4KNe65JWQdEScOr/N +2oOwDLm4TfGEXfVsmyQe/XKoXB5nNMcv57XROivWXKGBn38IAZ4b2i95ALcugPn3 +6fH5w0m0AV4Un2YvDdZ1uQKBgH2kUuIWYDG28HK+tskVCnAOEbaPoYhZnOkmXrrn +yORFvmIZrG66f7HSv0BClbMqh0ZxuU6qLH2IvyXgHVXpHegnjkpxIcNq6Ho4lQOx +opcVaUWXzyBTPlAMGQaFPuMBJ9S5Pz3xK8SzmJe5fQICZulPrhISYbwG22dJiPm3 +Hso1AoGAUqBG8FEyw6qQ+M3wg1n79QAWKQO8cJft7MU5ruTTJEZzm8kSqMzJ8UPz +C/CQiCehFjKNflYUZQV6RNWJp6H2vhe2qprDV88j6fhKfFi4va3CsELAKE/tAWs/ +WqydAe/dOQ8HwJCBrC4vRds8KOQTxPJUhPm/eM0Jf2Zocs5Axdc= +-----END RSA PRIVATE KEY----- diff --git a/examples/server_tls.go b/examples/server_tls.go new file mode 100644 index 0000000..772f4c4 --- /dev/null +++ b/examples/server_tls.go @@ -0,0 +1,66 @@ +package main + +import ( + "context" + "crypto/tls" + "crypto/x509" + "fmt" + "io/ioutil" + "log" + "net" + + "github.com/oarkflow/mq" + "github.com/oarkflow/mq/examples/tasks" +) + +func main() { + // Load the server's certificate and key + cert, err := tls.LoadX509KeyPair("server.crt", "server.key") + if err != nil { + log.Fatalf("Failed to load server certificate and key: %v", err) + } + + // Load the CA certificate + caCert, err := ioutil.ReadFile("ca.crt") + if err != nil { + log.Fatalf("Failed to read CA certificate: %v", err) + } + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(caCert) + + // Configure TLS for the server + tlsConfig := &tls.Config{ + Certificates: []tls.Certificate{cert}, + ClientCAs: caCertPool, + ClientAuth: tls.RequireAndVerifyClientCert, // Mutual TLS + } + + // Start a TLS listener + listener, err := tls.Listen("tcp", ":8443", tlsConfig) + if err != nil { + log.Fatalf("Failed to start TLS listener: %v", err) + } + defer listener.Close() + + b := mq.NewBroker(mq.WithCallback(tasks.Callback)) + b.NewQueue("queue1") + b.NewQueue("queue2") + + log.Println("TLS-enabled broker started on :8443") + + // Handle incoming connections + for { + conn, err := listener.Accept() + if err != nil { + fmt.Println("Error accepting connection:", err) + continue + } + go handleConnection(b, conn) + } +} + +func handleConnection(b *mq.Broker, conn net.Conn) { + defer conn.Close() + ctx := context.Background() + b.Start(ctx) +} diff --git a/options.go b/options.go index c1e3d3a..ee055e1 100644 --- a/options.go +++ b/options.go @@ -16,6 +16,10 @@ type Options struct { initialDelay time.Duration maxBackoff time.Duration jitterPercent float64 + useTLS bool // Add a flag to enable/disable TLS + tlsCertPath string // Path to TLS certificate + tlsKeyPath string // Path to TLS private key + tlsCAPath string // Path to the Certificate Authority (optional) } func defaultOptions() Options { @@ -39,6 +43,16 @@ func WithBrokerURL(url string) Option { } } +// Option to enable/disable TLS +func WithTLS(enableTLS bool, certPath, keyPath, caPath string) Option { + return func(o *Options) { + o.useTLS = enableTLS + o.tlsCertPath = certPath + o.tlsKeyPath = keyPath + o.tlsCAPath = caPath + } +} + // WithSyncMode - func WithSyncMode(mode bool) Option { return func(opts *Options) { diff --git a/tls.go b/tls.go new file mode 100644 index 0000000..15a168e --- /dev/null +++ b/tls.go @@ -0,0 +1,51 @@ +package mq + +import ( + "crypto/tls" + "crypto/x509" + "fmt" + "net" + "os" +) + +// Connect to broker with optional TLS support +func connectToBroker(address string, useTLS bool, certFile string, caCertFile string) (net.Conn, error) { + if useTLS { + // Load the client certificate + cert, err := tls.LoadX509KeyPair(certFile, certFile) + if err != nil { + return nil, err + } + + // Load CA certificate if provided (optional) + var tlsConfig *tls.Config + if caCertFile != "" { + caCert, err := os.ReadFile(caCertFile) + if err != nil { + return nil, err + } + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(caCert) + + tlsConfig = &tls.Config{ + Certificates: []tls.Certificate{cert}, + RootCAs: caCertPool, + } + } else { + tlsConfig = &tls.Config{ + Certificates: []tls.Certificate{cert}, + InsecureSkipVerify: true, // For testing without CA verification + } + } + + // Dial TLS connection + conn, err := tls.Dial("tcp", address, tlsConfig) + if err != nil { + return nil, fmt.Errorf("failed to connect to broker via TLS: %v", err) + } + return conn, nil + } + + // If not using TLS, use plain TCP connection + return net.Dial("tcp", address) +}