From 7355b33d9b46792f5cd85c6d0dc44663b08cc4a1 Mon Sep 17 00:00:00 2001 From: e1732a364fed <75717694+e1732a364fed@users.noreply.github.com> Date: Sat, 1 Jan 2000 00:00:00 +0000 Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E6=88=90ss=E7=9A=84server=E7=9A=84udp?= =?UTF-8?q?=E9=83=A8=E5=88=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- main.go | 4 +- netLayer/relay_udp.go | 4 +- proxy/shadowsocks/client.go | 2 +- proxy/shadowsocks/server.go | 125 ++++++++++++++++++++++-------------- proxy/shadowsocks/udp.go | 96 ++++++++++++++++++++++----- 5 files changed, 162 insertions(+), 69 deletions(-) diff --git a/main.go b/main.go index 003a509..c1b048a 100644 --- a/main.go +++ b/main.go @@ -79,7 +79,7 @@ func ListenSer(inServer proxy.Server, defaultOutClient proxy.Client, env *proxy. var chanudp chan proxy.IncomeUDPInfo if tcp { - chantcp = make(chan proxy.IncomeTCPInfo) + chantcp = make(chan proxy.IncomeTCPInfo, 2) go func() { for tcpInfo := range chantcp { go passToOutClient(incomingInserverConnState{ @@ -94,7 +94,7 @@ func ListenSer(inServer proxy.Server, defaultOutClient proxy.Client, env *proxy. } if udp { - chanudp = make(chan proxy.IncomeUDPInfo) + chanudp = make(chan proxy.IncomeUDPInfo, 2) go func() { for udpInfo := range chanudp { diff --git a/netLayer/relay_udp.go b/netLayer/relay_udp.go index b238545..7ba8b29 100644 --- a/netLayer/relay_udp.go +++ b/netLayer/relay_udp.go @@ -309,7 +309,9 @@ relay udp 有两种针对不同通道的技术 如此,一个 UDPConn就相当于一个 MsgProducer, 它的to 可以由 tproxy 或者 msg内部的数据提取出来 -而且 这个模型也可以实现 单来源,所以更实用 +不过,这种抽象一样需要map进行记忆,才能区分不同来源的from。针对不同的from,要使用以前对它发信号所使用的端口。 + +所以两种技术可能是等价的。 */ func RelayMsg(rc, lc MsgHub, downloadByteCount, uploadByteCount *uint64) uint64 { go CopyMsgFromP2C(lc, rc, uploadByteCount) diff --git a/proxy/shadowsocks/client.go b/proxy/shadowsocks/client.go index 288d434..fe5969b 100644 --- a/proxy/shadowsocks/client.go +++ b/proxy/shadowsocks/client.go @@ -106,7 +106,7 @@ func (c *Client) EstablishUDPChannel(underlay net.Conn, firstPayload []byte, tar pc = c.cipher.PacketConn(pc) } - mc = &shadowUDPPacketConn{ + mc = &clientUDPMsgConn{ PacketConn: pc, raddr: underlay.RemoteAddr(), } diff --git a/proxy/shadowsocks/server.go b/proxy/shadowsocks/server.go index 2e0ddec..0a8cf42 100644 --- a/proxy/shadowsocks/server.go +++ b/proxy/shadowsocks/server.go @@ -3,6 +3,7 @@ package shadowsocks import ( "bytes" "io" + "log" "net" "net/url" "sync" @@ -11,6 +12,7 @@ import ( "github.com/e1732a364fed/v2ray_simple/proxy" "github.com/e1732a364fed/v2ray_simple/utils" "github.com/shadowsocks/go-shadowsocks2/core" + "go.uber.org/zap" ) func init() { @@ -63,12 +65,13 @@ type Server struct { cipher core.Cipher m sync.RWMutex - udpMsgConnMap map[netLayer.HashableAddr]*shadowUDPPacketConn + udpMsgConnMap map[netLayer.HashableAddr]*serverMsgConn } func newServer(info MethodPass, lc *proxy.ListenConf) *Server { s := &Server{ - cipher: initShadowCipher(info), + cipher: initShadowCipher(info), + udpMsgConnMap: make(map[netLayer.HashableAddr]*serverMsgConn), } return s @@ -77,9 +80,19 @@ func (*Server) Name() string { return Name } -func (s *Server) SelfListen() (is, tcp, udp bool) { - udp = true - is = true +func (s *Server) SelfListen() (is, _, udp bool) { + switch n := s.Network(); n { + case "", netLayer.DualNetworkName: + udp = true + + case "tcp": + + case "udp": + udp = true + } + + is = udp + return } @@ -125,64 +138,80 @@ realPart: return } -func (ss *Server) StartListen(_ chan<- proxy.IncomeTCPInfo, udpInfoChan chan<- proxy.IncomeUDPInfo) io.Closer { - // uc, err := net.ListenUDP("udp", ss.LUA) - // if err != nil { - // log.Panicln("shadowsocks listen udp failed", err) - // } - // pc := ss.cipher.PacketConn(uc) +func (m *Server) removeUDPByHash(hash netLayer.HashableAddr) { + m.Lock() + delete(m.udpMsgConnMap, hash) + m.Unlock() +} - // sp := &shadowUDPPacketConn{ - // PacketConn: pc, - // } +func (s *Server) StartListen(_ chan<- proxy.IncomeTCPInfo, udpInfoChan chan<- proxy.IncomeUDPInfo) io.Closer { + uc, err := net.ListenUDP("udp", s.LUA) + if err != nil { + log.Panicln("shadowsocks listen udp failed", err) + } + pc := s.cipher.PacketConn(uc) + //逻辑完全类似tproxy,使用一个map存储不同终端的链接 go func() { - // for { - // bs := utils.GetPacket() + for { + bs := utils.GetPacket() - // n, addr, err := pc.ReadFrom(bs) - // if err != nil { - // return - // } - // ad, err := netLayer.NewAddrFromAny(addr) - // if err != nil { - // return - // } - // hash := ad.GetHashable() + n, addr, err := pc.ReadFrom(bs) + if err != nil { + return + } + ad, err := netLayer.NewAddrFromAny(addr) + if err != nil { + if ce := utils.CanLogWarn("shadowsocks GetAddrFrom err"); ce != nil { + ce.Write(zap.Error(err)) + } + return + } + hash := ad.GetHashable() - // ss.m.RLock() - // conn, found := ss.udpMsgConnMap[hash] - // ss.m.RUnlock() + s.m.RLock() + conn, found := s.udpMsgConnMap[hash] + s.m.RUnlock() - // if !found { - // conn = &shadowUDPPacketConn{ - // ourSrcAddr: src, - // readChan: make(chan netLayer.AddrData, 5), - // closeChan: make(chan struct{}), - // parentMachine: m, - // hash: hash, - // } - // conn.InitEasyDeadline() + if !found { + conn = &serverMsgConn{ + raddr: addr, + ourPacketConn: pc, + readChan: make(chan netLayer.AddrData, 5), + closeChan: make(chan struct{}), + server: s, + hash: hash, + } + conn.InitEasyDeadline() - // m.Lock() - // m.udpMsgConnMap[hash] = conn - // m.Unlock() + s.m.Lock() + s.udpMsgConnMap[hash] = conn + s.m.Unlock() - // } + } - // destAddr := netLayer.NewAddrFromUDPAddr(dst) + readbuf := bytes.NewBuffer(bs[:n]) - // conn.readChan <- netLayer.AddrData{Data: bs[:n], Addr: destAddr} + destAddr, err := GetAddrFrom(readbuf) + if err != nil { + if ce := utils.CanLogWarn("shadowsocks GetAddrFrom err"); ce != nil { + ce.Write(zap.Error(err)) + } + continue + } - // if !found { - // return conn, destAddr, nil + conn.readChan <- netLayer.AddrData{Data: readbuf.Bytes(), Addr: destAddr} - // } + if !found { + udpInfoChan <- proxy.IncomeUDPInfo{ + MsgConn: conn, Target: destAddr, + } + } - // } + } }() - return nil + return uc } diff --git a/proxy/shadowsocks/udp.go b/proxy/shadowsocks/udp.go index d5623e7..71026f3 100644 --- a/proxy/shadowsocks/udp.go +++ b/proxy/shadowsocks/udp.go @@ -2,28 +2,29 @@ package shadowsocks import ( "bytes" + "io" "net" + "os" + "time" "github.com/e1732a364fed/v2ray_simple/netLayer" "github.com/e1732a364fed/v2ray_simple/utils" ) -type shadowUDPPacketConn struct { +type clientUDPMsgConn struct { net.PacketConn raddr net.Addr - - handshakeBuf *bytes.Buffer } -func (c *shadowUDPPacketConn) CloseConnWithRaddr(raddr netLayer.Addr) error { +func (c *clientUDPMsgConn) CloseConnWithRaddr(raddr netLayer.Addr) error { return c.PacketConn.Close() } -func (c *shadowUDPPacketConn) Fullcone() bool { +func (c *clientUDPMsgConn) Fullcone() bool { return true } -func (c *shadowUDPPacketConn) ReadMsgFrom() (bs []byte, targetAddr netLayer.Addr, err error) { +func (c *clientUDPMsgConn) ReadMsgFrom() (bs []byte, targetAddr netLayer.Addr, err error) { buf := utils.GetPacket() var n int @@ -44,17 +45,8 @@ func (c *shadowUDPPacketConn) ReadMsgFrom() (bs []byte, targetAddr netLayer.Addr } -func (c *shadowUDPPacketConn) WriteMsgTo(bs []byte, addr netLayer.Addr) (err error) { - var buf *bytes.Buffer - - if c.handshakeBuf != nil { - buf = c.handshakeBuf - c.handshakeBuf = nil - - } else { - buf = utils.GetBuf() - - } +func makeWriteBuf(bs []byte, addr netLayer.Addr) *bytes.Buffer { + buf := utils.GetBuf() abs, atype := addr.AddressBytes() @@ -70,8 +62,78 @@ func (c *shadowUDPPacketConn) WriteMsgTo(bs []byte, addr netLayer.Addr) (err err buf.WriteByte(byte(len(bs) << 8 >> 8)) buf.Write(bs) + return buf +} + +func (c *clientUDPMsgConn) WriteMsgTo(bs []byte, addr netLayer.Addr) (err error) { + + buf := makeWriteBuf(bs, addr) + _, err = c.PacketConn.WriteTo(buf.Bytes(), c.raddr) utils.PutBuf(buf) return err } + +// implements netLayer.serverMsgConn, 完全类似tproxy +type serverMsgConn struct { + netLayer.EasyDeadline + + hash netLayer.HashableAddr + + ourPacketConn net.PacketConn + raddr net.Addr + + readChan chan netLayer.AddrData + + closeChan chan struct{} + + fullcone bool + + server *Server +} + +func (mc *serverMsgConn) Close() error { + select { + case <-mc.closeChan: + default: + close(mc.closeChan) + mc.server.removeUDPByHash(mc.hash) + + } + return nil +} + +func (mc *serverMsgConn) CloseConnWithRaddr(raddr netLayer.Addr) error { + return mc.Close() +} + +func (mc *serverMsgConn) Fullcone() bool { + return mc.fullcone +} + +func (mc *serverMsgConn) SetFullcone(f bool) { + mc.fullcone = f +} + +func (mc *serverMsgConn) ReadMsgFrom() ([]byte, netLayer.Addr, error) { + + must_timeoutChan := time.After(netLayer.UDP_timeout) + select { + case <-mc.closeChan: + return nil, netLayer.Addr{}, io.EOF + case <-must_timeoutChan: + return nil, netLayer.Addr{}, os.ErrDeadlineExceeded + case newmsg := <-mc.readChan: + return newmsg.Data, newmsg.Addr, nil + + } + +} + +func (mc *serverMsgConn) WriteMsgTo(p []byte, addr netLayer.Addr) error { + buf := makeWriteBuf(p, addr) + _, err := mc.ourPacketConn.WriteTo(buf.Bytes(), mc.raddr) + + return err +}