implement PacketBroadcaster

This commit is contained in:
rkonfj
2024-01-20 21:53:07 +08:00
parent a85b4edfec
commit 7b54b06c0e
4 changed files with 39 additions and 117 deletions

View File

@@ -4,12 +4,10 @@ import (
"fmt"
"math/rand"
"net/http"
"strings"
_ "net/http/pprof"
"github.com/rkonfj/peerguard/p2p"
"github.com/rkonfj/peerguard/peer"
"github.com/spf13/cobra"
)
@@ -73,13 +71,13 @@ func runInteraction(network, peerID string, servers []string) error {
var read string
_, err := fmt.Scanln(&read)
if err != nil {
panic(err)
}
msg := strings.Split(read, ",")
if len(msg) != 2 {
fmt.Println("usage: peer,msg")
fmt.Println("ERR", err)
continue
}
packetConn.WriteTo([]byte(msg[1]), peer.PeerID(msg[0]))
count, err := packetConn.Broadcast([]byte(read))
if err != nil {
panic(err)
}
fmt.Println(count, "peers sent")
}
}

View File

@@ -1,109 +0,0 @@
package main
import (
"bytes"
"crypto/aes"
"crypto/cipher"
"crypto/rand"
"encoding/base64"
"fmt"
"io"
)
// TokenAuthSystem 包含加密密钥
type TokenAuthSystem struct {
key []byte
}
// NewTokenAuthSystem 创建一个新的 TokenAuthSystem 实例
func NewTokenAuthSystem(key []byte) *TokenAuthSystem {
return &TokenAuthSystem{key: key}
}
// PKCS7Padding 添加PKCS7填充
func PKCS7Padding(data []byte, blockSize int) []byte {
padding := blockSize - len(data)%blockSize
padText := bytes.Repeat([]byte{byte(padding)}, padding)
return append(data, padText...)
}
// PKCS7UnPadding 移除PKCS7填充
func PKCS7UnPadding(data []byte) []byte {
padding := data[len(data)-1]
return data[:len(data)-int(padding)]
}
// GenerateToken 生成加密后的 token
func (tas *TokenAuthSystem) GenerateToken(data string) (string, error) {
block, err := aes.NewCipher(tas.key)
if err != nil {
return "", err
}
// 添加PKCS7填充
dataBytes := PKCS7Padding([]byte(data), aes.BlockSize)
cipherText := make([]byte, aes.BlockSize+len(dataBytes))
iv := cipherText[:aes.BlockSize]
if _, err := io.ReadFull(rand.Reader, iv); err != nil {
return "", err
}
mode := cipher.NewCBCEncrypter(block, iv)
mode.CryptBlocks(cipherText[aes.BlockSize:], dataBytes)
return base64.StdEncoding.EncodeToString(cipherText), nil
}
// VerifyToken 验证并解密 token
func (tas *TokenAuthSystem) VerifyToken(token string) (string, error) {
block, err := aes.NewCipher(tas.key)
if err != nil {
return "", err
}
cipherText, err := base64.StdEncoding.DecodeString(token)
if err != nil {
return "", err
}
if len(cipherText) < aes.BlockSize {
return "", fmt.Errorf("invalid token")
}
iv := cipherText[:aes.BlockSize]
cipherText = cipherText[aes.BlockSize:]
mode := cipher.NewCBCDecrypter(block, iv)
mode.CryptBlocks(cipherText, cipherText)
// 移除PKCS7填充
decryptedData := PKCS7UnPadding(cipherText)
return string(decryptedData), nil
}
func main() {
// 替换为你自己的密钥
key := []byte("32-byte-secret-key-1234567890123")
// 创建 TokenAuthSystem 实例
tokenAuthSystem := NewTokenAuthSystem(key)
// 生成一个 token
dataToEncrypt := "user_id:12345"
token, err := tokenAuthSystem.GenerateToken(dataToEncrypt)
if err != nil {
fmt.Println("Token generation failed:", err)
return
}
fmt.Println("Generated Token:", token)
// 验证 token
decryptedData, err := tokenAuthSystem.VerifyToken(token)
if err != nil {
fmt.Println("Token verification failed:", err)
return
}
fmt.Println("Decrypted Data:", decryptedData)
}

View File

@@ -2,6 +2,7 @@ package disco
import (
"encoding/hex"
"errors"
"fmt"
"log/slog"
"net"
@@ -299,6 +300,29 @@ func (n *UDPConn) WriteToUDP(p []byte, peerID peer.PeerID) (int, error) {
return 0, ErrUseOfClosedConnection
}
func (c *UDPConn) Broadcast(b []byte) (peerCount int, err error) {
c.peersMapMutex.RLock()
peerCount = len(c.peersMap)
peers := make([]peer.PeerID, 0, peerCount)
for k := range c.peersMap {
peers = append(peers, k)
}
c.peersMapMutex.RUnlock()
var errs []error
for _, peer := range peers {
_, err := c.WriteToUDP(b, peer)
if err != nil {
errs = append(errs, err)
}
}
if len(errs) > 0 {
err = errors.Join(errs...)
}
return
}
func ListenUDP(port int, disableIPv4, disableIPv6 bool, id peer.PeerID) (*UDPConn, error) {
conn, err := net.ListenUDP("udp", &net.UDPAddr{Port: port})
if err != nil {

View File

@@ -14,6 +14,10 @@ import (
"storj.io/common/base58"
)
type PacketBroadcaster interface {
Broadcast([]byte) error
}
type PeerPacketConn struct {
closedSig chan struct{}
readTimeout chan struct{}
@@ -138,6 +142,11 @@ func (c *PeerPacketConn) SetWriteBuffer(bytes int) error {
return c.udpConn.SetWriteBuffer(bytes)
}
// Broadcast broadcast packet to all found peers using direct udpConn
func (c *PeerPacketConn) Broadcast(b []byte) (int, error) {
return c.udpConn.Broadcast(b)
}
// ListenPacket listen the p2p network for read/write packets
func ListenPacket(network string, cluster peer.PeermapCluster, opts ...Option) (*PeerPacketConn, error) {
id := make([]byte, 32)