From 6fc45333d8367ef02a2e633e12150ce52de0abf6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=BA=90=E6=96=87=E9=9B=A8?= <41315874+fumiama@users.noreply.github.com> Date: Thu, 13 Mar 2025 01:52:35 +0900 Subject: [PATCH] feat: impl. trans & ttl --- config/cfg.go | 1 + gold/head/box.go | 21 +++++- gold/head/flags.go | 2 +- gold/head/packet_test.go | 12 +-- gold/head/raw.go | 20 +++++ gold/head/unbox.go | 7 +- gold/link/event.go | 22 +++--- gold/link/listen.go | 128 ++++++++++++-------------------- gold/link/me.go | 14 +++- gold/link/nat.go | 4 +- gold/link/peer.go | 33 +++++++- gold/link/recv.go | 32 +++++++- gold/link/send.go | 4 +- gold/proto/{ => data}/data.go | 4 +- gold/proto/hello.go | 30 -------- gold/proto/hello/hello.go | 26 +++++++ gold/proto/nat.go | 121 ------------------------------ gold/proto/nat/nat.go | 112 ++++++++++++++++++++++++++++ upper/services/tunnel/tunnel.go | 8 +- upper/services/wg/wg.go | 5 +- 20 files changed, 335 insertions(+), 271 deletions(-) create mode 100644 gold/head/raw.go rename gold/proto/{ => data}/data.go (60%) delete mode 100644 gold/proto/hello.go create mode 100644 gold/proto/hello/hello.go delete mode 100644 gold/proto/nat.go create mode 100644 gold/proto/nat/nat.go diff --git a/config/cfg.go b/config/cfg.go index 201f617..622d673 100644 --- a/config/cfg.go +++ b/config/cfg.go @@ -19,6 +19,7 @@ type Config struct { SpeedLoop uint16 `yaml:"SpeedLoop"` Mask uint64 `yaml:"Mask"` // Mask 是异或报文所用掩码, 必须保证各端统一 Base14 bool `yaml:"Base14"` // Base14 是否将最终报文进行 base16384 编码后再发送 + MaxTTL uint8 `yaml:"MaxTTL"` // MaxTTL 默认 64 Peers []Peer `yaml:"Peers"` } diff --git a/gold/head/box.go b/gold/head/box.go index bb98610..f8e7702 100644 --- a/gold/head/box.go +++ b/gold/head/box.go @@ -17,14 +17,18 @@ import ( // PreCRC64 calculate crc64 checksum without idxdatsz. func (p *Packet) PreCRC64() (crc uint64) { w := bin.SelectWriter() + // 固定 TTL 为 0 计算 if bin.IsLittleEndian { + ttl := p.TTL + p.TTL = 0 w.Write((*[PacketHeadNoCRCLen]byte)( (unsafe.Pointer)(p), )[:]) + p.TTL = ttl } else { w.WriteUInt32(p.idxdatsz) w.WriteUInt32(uint32(p.randn)) - w.WriteUInt16((uint16(p.TTL) << 8) | uint16(p.Proto)) + w.WriteUInt16(uint16(p.Proto)) // TTL is set to 0 w.WriteUInt16(p.SrcPort) w.WriteUInt16(p.DstPort) w.WriteUInt16(p.Offset) @@ -46,11 +50,16 @@ func (p *Packet) PreCRC64() (crc uint64) { // WriteHeaderTo write header bytes to buf // with crc64 checksum. func (p *Packet) WriteHeaderTo(buf *bytes.Buffer) { + // 固定 TTL 为 0 计算 if bin.IsLittleEndian { buf.Write((*[PacketHeadNoCRCLen]byte)( (unsafe.Pointer)(p), )[:]) - p.md5h8rem = int64(algo.MD5Hash8(buf.Bytes())) + pbuf.NewBytes(buf.Len()).V(func(b []byte) { + copy(b, buf.Bytes()) + ClearTTL(b) + p.md5h8rem = int64(algo.MD5Hash8(b)) + }) _ = binary.Write(buf, binary.LittleEndian, p.md5h8rem) return } @@ -63,8 +72,12 @@ func (p *Packet) WriteHeaderTo(buf *bytes.Buffer) { w.WriteUInt16(p.Offset) w.Write(p.src[:]) w.Write(p.dst[:]) - w.P(func(b *pbuf.Buffer) { - p.md5h8rem = int64(algo.MD5Hash8(b.Bytes())) + w.P(func(buf *pbuf.Buffer) { + pbuf.NewBytes(buf.Len()).V(func(b []byte) { + copy(b, buf.Bytes()) + ClearTTL(b) + p.md5h8rem = int64(algo.MD5Hash8(b)) + }) }) w.WriteUInt64(uint64(p.md5h8rem)) w.P(func(b *pbuf.Buffer) { diff --git a/gold/head/flags.go b/gold/head/flags.go index bf913a6..4af6e5c 100644 --- a/gold/head/flags.go +++ b/gold/head/flags.go @@ -7,7 +7,7 @@ import ( const ( hasmorebit FlagsProto = 0x20 << iota nofragbit - topbit //TODO: 改为 trans 标记 + topbit ) const ( diff --git a/gold/head/packet_test.go b/gold/head/packet_test.go index 37ff826..b9ea499 100644 --- a/gold/head/packet_test.go +++ b/gold/head/packet_test.go @@ -27,13 +27,13 @@ func TestBuilderNative(t *testing.T) { Split(16384, false)[0]).Trans() s := hex.EncodeToString(dat) if s[:8] != "12004593" { - panic("1") + panic(s[:8]) } if s[16:48] != "03ff05000a0000000102030406070809" { - panic("2") + panic(s[16:48]) } if s[80:] != "30313233343536373839" { - panic("3") + panic(s[80:]) } p, err := ParsePacketHeader(dat) if err != nil { @@ -88,13 +88,13 @@ func TestBuilderBE(t *testing.T) { Split(16384, false)[0]).Trans() s := hex.EncodeToString(dat) if s[:8] != "12004593" { - panic("1") + panic(s[:8]) } if s[16:48] != "03ff05000a0000000102030406070809" { - panic("2") + panic(s[16:48]) } if s[80:] != "30313233343536373839" { - panic("3") + panic(s[80:]) } p, err := ParsePacketHeader(dat) if err != nil { diff --git a/gold/head/raw.go b/gold/head/raw.go new file mode 100644 index 0000000..1ef12b3 --- /dev/null +++ b/gold/head/raw.go @@ -0,0 +1,20 @@ +package head + +import ( + "unsafe" +) + +const ( + ttloffset = unsafe.Offsetof(Packet{}.TTL) +) + +// ClearTTL for hash use +func ClearTTL(data []byte) { + data[ttloffset] = 0 +} + +// DecTTL on transferring +func DecTTL(data []byte) (drop bool) { + data[ttloffset]-- + return data[ttloffset] == 0 +} diff --git a/gold/head/unbox.go b/gold/head/unbox.go index 39abdc1..2d84550 100644 --- a/gold/head/unbox.go +++ b/gold/head/unbox.go @@ -54,7 +54,12 @@ func ParsePacketHeader(data []byte) (pbytes PacketBytes, err error) { } return } - crc := algo.MD5Hash8(data[:PacketHeadNoCRCLen]) + var crc uint64 + pbuf.NewBytes(int(PacketHeadNoCRCLen)).V(func(b []byte) { + copy(b, data[:PacketHeadNoCRCLen]) + ClearTTL(b) + crc = algo.MD5Hash8(b) + }) if crc != uint64(pb.DAT.md5h8rem) { err = ErrBadCRCChecksum if config.ShowDebugLog { diff --git a/gold/link/event.go b/gold/link/event.go index eea7451..deb4f07 100644 --- a/gold/link/event.go +++ b/gold/link/event.go @@ -1,22 +1,22 @@ package link import ( - "strconv" - + "github.com/RomiChan/syncx" "github.com/fumiama/WireGold/gold/head" "github.com/fumiama/orbyte/pbuf" ) // 事件分发器 -var dispachers map[uint8]EventDispacher = make(map[uint8]EventDispacher) +var dispachers syncx.Map[uint8, Dispacher] -type EventDispacher func(header *head.Packet, peer *Link, data pbuf.Bytes) +type Dispacher func(header *head.Packet, peer *Link, data pbuf.Bytes) -// AddProto is thread unsafe. Use in init() only. -func AddProto(p uint8, d EventDispacher) { - _, ok := dispachers[p] - if ok { - panic("proto " + strconv.Itoa(int(p)) + " has been registered") - } - dispachers[p] = d +// RegisterDispacher of proto +func RegisterDispacher(p uint8, d Dispacher) (actual Dispacher, hasexist bool) { + return dispachers.LoadOrStore(p, d) +} + +// GetDispacher fn, ok +func GetDispacher(p uint8) (Dispacher, bool) { + return dispachers.Load(p) } diff --git a/gold/link/listen.go b/gold/link/listen.go index e524a0d..3d0e50a 100644 --- a/gold/link/listen.go +++ b/gold/link/listen.go @@ -6,7 +6,6 @@ import ( "strconv" "sync/atomic" "time" - "unsafe" "github.com/sirupsen/logrus" @@ -14,7 +13,6 @@ import ( "github.com/fumiama/WireGold/gold/head" "github.com/fumiama/WireGold/gold/p2p" "github.com/fumiama/WireGold/internal/algo" - "github.com/fumiama/WireGold/internal/bin" "github.com/fumiama/WireGold/internal/file" "github.com/fumiama/orbyte/pbuf" ) @@ -76,7 +74,7 @@ func (m *Me) waitordispatch(addr p2p.EndPoint, buf pbuf.Bytes, n int) { atomic.StoreInt64(&m.recvlooptime, now) } buf.V(func(b []byte) { - h := m.wait(b[:n]) + h := m.wait(b[:n], addr) if !h.HasInit() { if config.ShowDebugLog { logrus.Debugln("[listen] queue waiting") @@ -100,94 +98,62 @@ func (m *Me) dispatch(header *head.Packet, body []byte, addr p2p.EndPoint) { } srcip := header.Src() dstip := header.Dst() - p, ok := m.IsInPeer(srcip.String()) - if config.ShowDebugLog { - logrus.Debugln("[listen] recv from endpoint", addr, "src", srcip, "dst", dstip) - } - if !ok { - logrus.Warnln("[listen] packet from", srcip, "to", dstip, "is refused") + p := m.extractPeer(srcip, dstip, addr) + if p == nil { return } - if bin.IsNilInterface(p.endpoint) || !p.endpoint.Euqal(addr) { - if m.ep.Network() == "tcp" && !addr.Euqal(p.endpoint) { - logrus.Infoln("[listen] set endpoint of peer", p.peerip, "to", addr.String()) - p.endpoint = addr - } else { // others are all no status link - logrus.Infoln("[listen] set endpoint of peer", p.peerip, "to", addr.String()) - p.endpoint = addr - } + if !p.Accept(srcip) { + logrus.Warnln("[listen] refused packet from", srcip.String()+":"+strconv.Itoa(int(header.SrcPort))) + return } - now := time.Now() - atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&p.lastalive)), unsafe.Pointer(&now)) - switch { - case p.IsToMe(dstip): - if !p.Accept(srcip) { - logrus.Warnln("[listen] refused packet from", srcip.String()+":"+strconv.Itoa(int(header.SrcPort))) - return + if !p.IsToMe(dstip) { + logrus.Warnln("[listen] unhandled trans packet from", srcip.String()+":"+strconv.Itoa(int(header.SrcPort))) + return + } + addt := header.AdditionalData() + var err error + data, err := p.decode(header.CipherIndex(), addt, body) + if err != nil { + if config.ShowDebugLog { + logrus.Debugln("[listen] drop invalid packet key idx:", header.CipherIndex(), "addt:", addt, "err:", err) } - addt := header.AdditionalData() - var err error - data, err := p.decode(header.CipherIndex(), addt, body) + return + } + if data.Len() < 8 { + if config.ShowDebugLog { + logrus.Debugln("[listen] drop invalid data len packet key idx:", header.CipherIndex(), "addt:", addt, "len", data.Len()) + } + return + } + ok := false + data.V(func(b []byte) { + ok = algo.IsVaildBlake2bHash8(header.PreCRC64(), b) + }) + if !ok { + if config.ShowDebugLog { + logrus.Debugln("[listen] drop invalid hash packet") + } + return + } + data = data.SliceFrom(8) + if p.usezstd { + data.V(func(b []byte) { + data, err = algo.DecodeZstd(b) // skip hash + }) if err != nil { if config.ShowDebugLog { - logrus.Debugln("[listen] drop invalid packet key idx:", header.CipherIndex(), "addt:", addt, "err:", err) + logrus.Debugln("[listen] drop invalid zstd packet:", err) } return } - if data.Len() < 8 { - if config.ShowDebugLog { - logrus.Debugln("[listen] drop invalid data len packet key idx:", header.CipherIndex(), "addt:", addt, "len", data.Len()) - } - return - } - ok := false - data.V(func(b []byte) { - ok = algo.IsVaildBlake2bHash8(header.PreCRC64(), b) - }) - if !ok { - if config.ShowDebugLog { - logrus.Debugln("[listen] drop invalid hash packet") - } - return - } - data = data.SliceFrom(8) - if p.usezstd { - data.V(func(b []byte) { - data, err = algo.DecodeZstd(b) // skip hash - }) - if err != nil { - if config.ShowDebugLog { - logrus.Debugln("[listen] drop invalid zstd packet:", err) - } - return - } - if config.ShowDebugLog { - logrus.Debugln("[listen] zstd decoded len:", data.Len()) - } - } - fn, ok := dispachers[header.Proto.Proto()] - if !ok { - logrus.Warnln(file.Header(), "unsupported proto", header.Proto.Proto()) - return - } - fn(header, p, data) - return - case p.Accept(dstip): //TODO: 移除此处转发, 将转发放到 wait - if !p.allowtrans { - logrus.Warnln("[listen] refused to trans packet to", dstip.String()+":"+strconv.Itoa(int(header.DstPort))) - return - } - // 转发 - lnk := m.router.NextHop(dstip.String()) - if lnk == nil { - logrus.Warnln("[listen] transfer drop packet: nil nexthop") - return - } - lnk.WritePacket(head.ProtoTrans, body) if config.ShowDebugLog { - logrus.Debugln("[listen] trans", len(body), "bytes body to", dstip.String()+":"+strconv.Itoa(int(header.DstPort))) + logrus.Debugln("[listen] zstd decoded len:", data.Len()) } - default: - logrus.Warnln("[listen] packet dst", dstip.String()+":"+strconv.Itoa(int(header.DstPort)), "is not in peers") } + fn, ok := GetDispacher(header.Proto.Proto()) + if !ok { + logrus.Warnln(file.Header(), "unsupported proto", header.Proto.Proto()) + return + } + fn(header, p, data) } diff --git a/gold/link/me.go b/gold/link/me.go index e141077..5d87ac9 100644 --- a/gold/link/me.go +++ b/gold/link/me.go @@ -62,6 +62,8 @@ type Me struct { recvloopcnt uintptr // 是否进行 base16384 编码 base14 bool + // 本机初始 ttl + ttl uint8 // 本机网络端点初始化配置 networkconfigs []any } @@ -76,6 +78,7 @@ type MyConfig struct { SrcPort, DstPort, MTU, SpeedLoop uint16 Mask uint64 Base14 bool + MaxTTL uint8 } type NICConfig struct { @@ -131,6 +134,11 @@ func NewMe(cfg *MyConfig) (m Me) { m.mask = cfg.Mask m.recvlooptime = time.Now().UnixMilli() m.base14 = cfg.Base14 + if cfg.MaxTTL == 0 { + m.ttl = 64 + } else { + m.ttl = cfg.MaxTTL + } var buf [8]byte binary.BigEndian.PutUint64(buf[:], m.mask) logrus.Infoln("[me] xor mask", hex.EncodeToString(buf[:])) @@ -179,6 +187,10 @@ func (m *Me) MTU() uint16 { return m.mtu } +func (m *Me) TTL() uint8 { + return m.ttl +} + func (m *Me) EndPoint() p2p.EndPoint { return m.ep } @@ -298,7 +310,7 @@ func (m *Me) sendAllSameDst(packet []byte) (n int) { copy(b, packet) }) go pcp.V(func(b []byte) { - lnk.WritePacket(head.ProtoData, b) + lnk.WritePacket(head.ProtoData, b, lnk.me.ttl) }) return } diff --git a/gold/link/nat.go b/gold/link/nat.go index 95ba55b..40f1e90 100644 --- a/gold/link/nat.go +++ b/gold/link/nat.go @@ -33,7 +33,7 @@ func (l *Link) keepAlive(dur int64) { logrus.Infoln(file.Header(), "re-connect me succeeded") } } - l.WritePacket(head.ProtoHello, []byte{byte(head.HelloPing)}) + l.WritePacket(head.ProtoHello, []byte{byte(head.HelloPing)}, 64) logrus.Infoln(file.Header(), "send keep alive to", l.peerip) } } @@ -50,7 +50,7 @@ func (l *Link) sendQuery(tick time.Duration, peers ...string) { } t := time.NewTicker(tick) for range t.C { - l.WritePacket(head.ProtoQuery, data) + l.WritePacket(head.ProtoQuery, data, l.me.ttl) logrus.Infoln(file.Header(), "send query to", l.peerip) } } diff --git a/gold/link/peer.go b/gold/link/peer.go index d662714..d4a3e97 100644 --- a/gold/link/peer.go +++ b/gold/link/peer.go @@ -2,13 +2,19 @@ package link import ( "net" + "sync/atomic" "time" + "unsafe" - "github.com/fumiama/WireGold/gold/p2p" - "github.com/fumiama/WireGold/internal/algo" curve "github.com/fumiama/go-x25519" "github.com/sirupsen/logrus" "golang.org/x/crypto/chacha20poly1305" + + "github.com/fumiama/WireGold/config" + "github.com/fumiama/WireGold/gold/p2p" + "github.com/fumiama/WireGold/internal/algo" + "github.com/fumiama/WireGold/internal/bin" + "github.com/fumiama/WireGold/internal/file" ) type PeerConfig struct { @@ -123,3 +129,26 @@ func (m *Me) IsInPeer(peer string) (p *Link, ok bool) { m.connmapmu.RUnlock() return } + +func (m *Me) extractPeer(srcip, dstip net.IP, addr p2p.EndPoint) *Link { + p, ok := m.IsInPeer(srcip.String()) + if config.ShowDebugLog { + logrus.Debugln(file.Header(), "recv from endpoint", addr, "src", srcip, "dst", dstip) + } + if !ok { + logrus.Warnln(file.Header(), "packet from", srcip, "to", dstip, "is refused") + return nil + } + if bin.IsNilInterface(p.endpoint) || !p.endpoint.Euqal(addr) { + if m.ep.Network() == "tcp" && !addr.Euqal(p.endpoint) { + logrus.Infoln(file.Header(), "set endpoint of peer", p.peerip, "to", addr.String()) + p.endpoint = addr + } else { // others are all no status link + logrus.Infoln(file.Header(), "set endpoint of peer", p.peerip, "to", addr.String()) + p.endpoint = addr + } + } + now := time.Now() + atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&p.lastalive)), unsafe.Pointer(&now)) + return p +} diff --git a/gold/link/recv.go b/gold/link/recv.go index b405705..3ba17fb 100644 --- a/gold/link/recv.go +++ b/gold/link/recv.go @@ -8,8 +8,10 @@ import ( "github.com/fumiama/WireGold/config" "github.com/fumiama/WireGold/gold/head" + "github.com/fumiama/WireGold/gold/p2p" "github.com/fumiama/WireGold/internal/bin" base14 "github.com/fumiama/go-base16384" + "github.com/fumiama/orbyte/pbuf" "github.com/sirupsen/logrus" ) @@ -18,8 +20,7 @@ func (l *Link) Read() LinkData { return <-l.pipe } -// wait TODO: 判断是否为 trans 并提前 call dispatch -func (m *Me) wait(data []byte) (h head.PacketBytes) { +func (m *Me) wait(data []byte, addr p2p.EndPoint) (h head.PacketBytes) { if len(data) < int(head.PacketHeadLen)+8 { // not a valid packet if config.ShowDebugLog { logrus.Debugln("[recv] invalid data len", len(data)) @@ -44,7 +45,7 @@ func (m *Me) wait(data []byte) (h head.PacketBytes) { } return } - data = w.ToBytes().Trans() + data = w.ToBytes().Copy().Trans() if len(data) < bound { bound = len(data) endl = "." @@ -94,6 +95,31 @@ func (m *Me) wait(data []byte) (h head.PacketBytes) { } header.B(func(buf []byte, p *head.Packet) { + peer := m.extractPeer(p.Src(), p.Dst(), addr) + if peer == nil { + return + } + if !peer.IsToMe(p.Dst()) { // 提前处理转发 + if !peer.allowtrans { + logrus.Warnln("[recv] refused to trans packet to", p.Dst().String()+":"+strconv.Itoa(int(p.DstPort))) + return + } + // 转发 + lnk := m.router.NextHop(p.Dst().String()) + if lnk == nil { + logrus.Warnln("[recv] transfer drop packet: nil nexthop") + return + } + if head.DecTTL(data) { // need drop + logrus.Warnln("[recv] transfer drop packet: zero ttl") + return + } + go lnk.write2peer(pbuf.ParseBytes(data...).Copy(), seq) + if config.ShowDebugLog { + logrus.Debugln("[listen] trans", len(data), "bytes packet to", p.Dst().String()+":"+strconv.Itoa(int(p.DstPort))) + } + return + } if !p.Proto.HasMore() { ok := p.WriteDataSegment(data, buf) if !ok { diff --git a/gold/link/send.go b/gold/link/send.go index c241bf8..2a3fe97 100644 --- a/gold/link/send.go +++ b/gold/link/send.go @@ -32,7 +32,7 @@ func randseq(i uint16) uint32 { // WritePacket 基于 data 向 peer 发包 // // data 可为空, 因为不保证可达所以不返回错误。 -func (l *Link) WritePacket(proto uint8, data []byte) { +func (l *Link) WritePacket(proto uint8, data []byte, ttl uint8) { teatype := l.randkeyidx() sndcnt := uint16(l.incgetsndcnt()) mtu := l.mtu @@ -44,7 +44,7 @@ func (l *Link) WritePacket(proto uint8, data []byte) { } pb := head.NewPacketBuilder(). Src(l.me.me, l.me.srcport).Dst(l.peerip, l.me.dstport). - Proto(proto).TTL(64).With(data) + Proto(proto).TTL(ttl).With(data) if l.usezstd { pb.Zstd() } diff --git a/gold/proto/data.go b/gold/proto/data/data.go similarity index 60% rename from gold/proto/data.go rename to gold/proto/data/data.go index 59b7cac..e7f17e9 100644 --- a/gold/proto/data.go +++ b/gold/proto/data/data.go @@ -1,4 +1,4 @@ -package proto +package data import ( "github.com/fumiama/orbyte/pbuf" @@ -8,7 +8,7 @@ import ( ) func init() { - link.AddProto(head.ProtoData, func(header *head.Packet, peer *link.Link, data pbuf.Bytes) { + link.RegisterDispacher(head.ProtoData, func(header *head.Packet, peer *link.Link, data pbuf.Bytes) { peer.ToLower(header, data) }) } diff --git a/gold/proto/hello.go b/gold/proto/hello.go deleted file mode 100644 index b5e9922..0000000 --- a/gold/proto/hello.go +++ /dev/null @@ -1,30 +0,0 @@ -package proto - -import ( - "github.com/fumiama/orbyte/pbuf" - "github.com/sirupsen/logrus" - - "github.com/fumiama/WireGold/gold/head" - "github.com/fumiama/WireGold/gold/link" - "github.com/fumiama/WireGold/internal/file" -) - -func init() { - link.AddProto(head.ProtoHello, func(_ *head.Packet, peer *link.Link, data pbuf.Bytes) { - onHello(data, peer) - }) -} - -func onHello(data pbuf.Bytes, p *link.Link) { - data.V(func(b []byte) { - switch { - case len(b) == 0: - logrus.Warnln(file.Header(), "recv old packet, do nothing") - case b[0] == byte(head.HelloPing): - go p.WritePacket(head.ProtoHello, []byte{byte(head.HelloPong)}) - logrus.Infoln(file.Header(), "recv, send ack") - default: - logrus.Infoln(file.Header(), "recv ack, do nothing") - } - }) -} diff --git a/gold/proto/hello/hello.go b/gold/proto/hello/hello.go new file mode 100644 index 0000000..1e1cd10 --- /dev/null +++ b/gold/proto/hello/hello.go @@ -0,0 +1,26 @@ +package hello + +import ( + "github.com/fumiama/orbyte/pbuf" + "github.com/sirupsen/logrus" + + "github.com/fumiama/WireGold/gold/head" + "github.com/fumiama/WireGold/gold/link" + "github.com/fumiama/WireGold/internal/file" +) + +func init() { + link.RegisterDispacher(head.ProtoHello, func(_ *head.Packet, peer *link.Link, data pbuf.Bytes) { + data.V(func(b []byte) { + switch { + case len(b) == 0: + logrus.Warnln(file.Header(), "recv old packet, do nothing") + case b[0] == byte(head.HelloPing): + go peer.WritePacket(head.ProtoHello, []byte{byte(head.HelloPong)}, peer.Me().TTL()) + logrus.Infoln(file.Header(), "recv, send ack") + default: + logrus.Infoln(file.Header(), "recv ack, do nothing") + } + }) + }) +} diff --git a/gold/proto/nat.go b/gold/proto/nat.go deleted file mode 100644 index 00ea949..0000000 --- a/gold/proto/nat.go +++ /dev/null @@ -1,121 +0,0 @@ -package proto - -import ( - "encoding/json" - - "github.com/fumiama/orbyte/pbuf" - "github.com/sirupsen/logrus" - - "github.com/fumiama/WireGold/config" - "github.com/fumiama/WireGold/gold/head" - "github.com/fumiama/WireGold/gold/link" - "github.com/fumiama/WireGold/gold/p2p" - - "github.com/fumiama/WireGold/internal/bin" - "github.com/fumiama/WireGold/internal/file" -) - -func init() { - link.AddProto(head.ProtoNotify, func(_ *head.Packet, peer *link.Link, data pbuf.Bytes) { - data.V(func(b []byte) { - onNotify(peer, b) - }) - }) - link.AddProto(head.ProtoQuery, func(_ *head.Packet, peer *link.Link, data pbuf.Bytes) { - data.V(func(b []byte) { - onQuery(peer, b) - }) - }) -} - -// 收到通告包的处理函数 -func onNotify(l *link.Link, packet []byte) { - // TODO: 完成data解包与endpoint注册 - // 1. Data 解包 - // ---- 使用 head.Notify 解释 packet - notify := make(head.Notify, 32) - err := json.Unmarshal(packet, ¬ify) - if err != nil { - logrus.Errorln(file.Header(), "notify json unmarshal err:", err) - return - } - // 2. endpoint注册 - // ---- 遍历 Notify,注册对方的 endpoint 到 - // ---- connections,注意使用读写锁connmapmu - for peer, ep := range notify { - nw, epstr := ep[0], ep[1] - if nw != l.Me().EndPoint().Network() { - logrus.Warnln(file.Header(), "ignore different network notify", nw, "addr", epstr) - continue - } - addr, err := p2p.NewEndPoint(nw, epstr, l.Me().NetworkConfigs()...) - if err == nil { - p, ok := l.Me().IsInPeer(peer) - if ok { - if bin.IsNilInterface(p.EndPoint()) || !p.EndPoint().Euqal(addr) { - p.SetEndPoint(addr) - logrus.Infoln(file.Header(), "notify set ep of peer", peer, "to", ep) - } - continue - } - } - if config.ShowDebugLog { - logrus.Debugln(file.Header(), "notify drop invalid peer:", peer, "ep:", ep) - } - } -} - -// 收到询问包的处理函数 -func onQuery(l *link.Link, packet []byte) { - // 完成data解包与notify分发 - - // 1. Data 解包 - // ---- 使用 head.Query 解释 packet - // ---- 根据 Query 确定需要封装的 Notify - var peers head.Query - err := json.Unmarshal(packet, &peers) - if err != nil { - logrus.Errorln(file.Header(), "query json unmarshal err:", err) - return - } - - if l == nil || l.Me() == nil { - logrus.Errorln(file.Header(), "nil link/me") - return - } - - // 2. notify分发 - // ---- 封装 Notify 到 新的 packet - // ---- 调用 l.Send 发送到对方 - notify := make(head.Notify, len(peers)) - for _, p := range peers { - lnk, ok := l.Me().IsInPeer(p) - eps := "" - if l.Me().EndPoint().Network() == "udp" { // udp has real p2p - if bin.IsNilInterface(lnk.EndPoint()) { - continue - } - eps = lnk.EndPoint().String() - } - if eps == "" { - eps = l.RawEndPoint() // use registered ep only - } - if eps == "" { - continue - } - if ok && bin.IsNonNilInterface(lnk.EndPoint()) { - notify[p] = [2]string{ - lnk.EndPoint().Network(), - eps, - } - } - } - if len(notify) > 0 { - logrus.Infoln(file.Header(), "query wrap", len(notify), "notify") - w := bin.SelectWriter() - _ = json.NewEncoder(w).Encode(¬ify) - w.P(func(b *pbuf.Buffer) { - l.WritePacket(head.ProtoNotify, b.Bytes()) - }) - } -} diff --git a/gold/proto/nat/nat.go b/gold/proto/nat/nat.go new file mode 100644 index 0000000..eeb5b47 --- /dev/null +++ b/gold/proto/nat/nat.go @@ -0,0 +1,112 @@ +package nat + +import ( + "encoding/json" + + "github.com/fumiama/orbyte/pbuf" + "github.com/sirupsen/logrus" + + "github.com/fumiama/WireGold/config" + "github.com/fumiama/WireGold/gold/head" + "github.com/fumiama/WireGold/gold/link" + "github.com/fumiama/WireGold/gold/p2p" + + "github.com/fumiama/WireGold/internal/bin" + "github.com/fumiama/WireGold/internal/file" +) + +func init() { + // 收到通告包的处理 + link.RegisterDispacher(head.ProtoNotify, func(_ *head.Packet, peer *link.Link, data pbuf.Bytes) { + data.V(func(b []byte) { + // 1. Data 解包 + // ---- 使用 head.Notify 解释 packet + notify := make(head.Notify, 32) + err := json.Unmarshal(b, ¬ify) + if err != nil { + logrus.Errorln(file.Header(), "notify json unmarshal err:", err) + return + } + // 2. endpoint注册 + // ---- 遍历 Notify,注册对方的 endpoint 到 + // ---- connections,注意使用读写锁connmapmu + for ps, ep := range notify { + nw, epstr := ep[0], ep[1] + if nw != peer.Me().EndPoint().Network() { + logrus.Warnln(file.Header(), "ignore different network notify", nw, "addr", epstr) + continue + } + addr, err := p2p.NewEndPoint(nw, epstr, peer.Me().NetworkConfigs()...) + if err == nil { + p, ok := peer.Me().IsInPeer(ps) + if ok { + if bin.IsNilInterface(p.EndPoint()) || !p.EndPoint().Euqal(addr) { + p.SetEndPoint(addr) + logrus.Infoln(file.Header(), "notify set ep of peer", ps, "to", ep) + } + continue + } + } + if config.ShowDebugLog { + logrus.Debugln(file.Header(), "notify drop invalid peer:", ps, "ep:", ep) + } + } + }) + }) + // 收到询问包的处理 + link.RegisterDispacher(head.ProtoQuery, func(_ *head.Packet, peer *link.Link, data pbuf.Bytes) { + data.V(func(b []byte) { + // 完成data解包与notify分发 + + // 1. Data 解包 + // ---- 使用 head.Query 解释 packet + // ---- 根据 Query 确定需要封装的 Notify + var peers head.Query + err := json.Unmarshal(b, &peers) + if err != nil { + logrus.Errorln(file.Header(), "query json unmarshal err:", err) + return + } + + if peer == nil || peer.Me() == nil { + logrus.Errorln(file.Header(), "nil link/me") + return + } + + // 2. notify分发 + // ---- 封装 Notify 到 新的 packet + // ---- 发送到对方 + notify := make(head.Notify, len(peers)) + for _, p := range peers { + lnk, ok := peer.Me().IsInPeer(p) + eps := "" + if peer.Me().EndPoint().Network() == "udp" { // udp has real p2p + if bin.IsNilInterface(lnk.EndPoint()) { + continue + } + eps = lnk.EndPoint().String() + } + if eps == "" { + eps = peer.RawEndPoint() // use registered ep only + } + if eps == "" { + continue + } + if ok && bin.IsNonNilInterface(lnk.EndPoint()) { + notify[p] = [2]string{ + lnk.EndPoint().Network(), + eps, + } + } + } + if len(notify) > 0 { + logrus.Infoln(file.Header(), "query wrap", len(notify), "notify") + w := bin.SelectWriter() + _ = json.NewEncoder(w).Encode(¬ify) + w.P(func(b *pbuf.Buffer) { + peer.WritePacket(head.ProtoNotify, b.Bytes(), peer.Me().TTL()) + }) + } + }) + }) +} diff --git a/upper/services/tunnel/tunnel.go b/upper/services/tunnel/tunnel.go index 677f20b..839ea98 100644 --- a/upper/services/tunnel/tunnel.go +++ b/upper/services/tunnel/tunnel.go @@ -12,7 +12,9 @@ import ( _ "github.com/fumiama/WireGold/gold/p2p/tcp" // support tcp connection _ "github.com/fumiama/WireGold/gold/p2p/udp" // support udp connection _ "github.com/fumiama/WireGold/gold/p2p/udplite" // support udplite connection - _ "github.com/fumiama/WireGold/gold/proto" // support basic protos + _ "github.com/fumiama/WireGold/gold/proto/data" // support data proto + _ "github.com/fumiama/WireGold/gold/proto/hello" // support hello proto + _ "github.com/fumiama/WireGold/gold/proto/nat" // support nat proto "github.com/fumiama/WireGold/config" "github.com/fumiama/WireGold/gold/head" @@ -125,7 +127,7 @@ func (s *Tunnel) handleWrite() { binary.LittleEndian.PutUint32(buf[:4], seq) seq++ copy(buf[4:], b[:s.mtu-4]) - s.l.WritePacket(head.ProtoData, buf) + s.l.WritePacket(head.ProtoData, buf, s.l.Me().TTL()) if config.ShowDebugLog { logrus.Debugln("[tunnel] seq", seq-1, "written") } @@ -134,7 +136,7 @@ func (s *Tunnel) handleWrite() { binary.LittleEndian.PutUint32(buf[:4], seq) seq++ copy(buf[4:], b) - s.l.WritePacket(head.ProtoData, buf[:len(b)+4]) + s.l.WritePacket(head.ProtoData, buf[:len(b)+4], s.l.Me().TTL()) if config.ShowDebugLog { logrus.Debugln("[tunnel] seq", seq-1, "written") } diff --git a/upper/services/wg/wg.go b/upper/services/wg/wg.go index 9fab70b..48d14c0 100644 --- a/upper/services/wg/wg.go +++ b/upper/services/wg/wg.go @@ -13,7 +13,9 @@ import ( _ "github.com/fumiama/WireGold/gold/p2p/tcp" // support tcp connection _ "github.com/fumiama/WireGold/gold/p2p/udp" // support udp connection _ "github.com/fumiama/WireGold/gold/p2p/udplite" // support udplite connection - _ "github.com/fumiama/WireGold/gold/proto" // support basic protos + _ "github.com/fumiama/WireGold/gold/proto/data" // support data proto + _ "github.com/fumiama/WireGold/gold/proto/hello" // support hello proto + _ "github.com/fumiama/WireGold/gold/proto/nat" // support nat proto "github.com/fumiama/WireGold/config" "github.com/fumiama/WireGold/gold/link" @@ -115,6 +117,7 @@ func (wg *WG) init(srcport, dstport uint16) { SpeedLoop: wg.c.SpeedLoop, Mask: wg.c.Mask, Base14: wg.c.Base14, + MaxTTL: wg.c.MaxTTL, }) for _, peer := range wg.c.Peers {