From d7aaeb481cd8c786a4ed91f102d884eb65249350 Mon Sep 17 00:00:00 2001 From: e1732a364fed <75717694+e1732a364fed@users.noreply.github.com> Date: Sat, 1 Jan 2000 00:00:00 +0000 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E8=AE=A2=E4=BB=A3=E7=A0=81;=E6=B7=BB?= =?UTF-8?q?=E5=8A=A0MsgProducer=E5=92=8CMsgConsumer=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- main.go | 17 +- netLayer/addr.go | 19 +++ netLayer/msgconn.go | 316 ++++++++++++++++++++++++++++++++++++ netLayer/relay_udp.go | 281 +++----------------------------- proxy/shadowsocks/client.go | 1 - proxy/shadowsocks/server.go | 104 ++++++++---- proxy/shadowsocks/udp.go | 1 - utils/error.go | 40 ++++- utils/io.go | 39 ++++- 9 files changed, 509 insertions(+), 309 deletions(-) create mode 100644 netLayer/msgconn.go diff --git a/main.go b/main.go index 8519ee8..003a509 100644 --- a/main.go +++ b/main.go @@ -71,6 +71,9 @@ Use cases: refer to tcp_test.go, udp_test.go or cmd/verysimple. non-blocking. closer used to stop listening. It means listening failed if closer == nil, */ func ListenSer(inServer proxy.Server, defaultOutClient proxy.Client, env *proxy.RoutingEnv) (closer io.Closer) { + var extraCloser io.Closer + + //tproxy 和 shadowsocks 都用到了 SelfListen if is, tcp, udp := inServer.SelfListen(); is { var chantcp chan proxy.IncomeTCPInfo var chanudp chan proxy.IncomeUDPInfo @@ -106,7 +109,13 @@ func ListenSer(inServer proxy.Server, defaultOutClient proxy.Client, env *proxy. } closer = inServer.(proxy.ListenerServer).StartListen(chantcp, chanudp) - return + + if tcp && udp { + return + + } else { + extraCloser = closer + } } var handleHere bool @@ -191,6 +200,9 @@ func ListenSer(inServer proxy.Server, defaultOutClient proxy.Client, env *proxy. ) } + if extraCloser != nil { + closer = &utils.MultiCloser{Closers: []io.Closer{closer, extraCloser}} + } } else { if err != nil { if ce := utils.CanLogErr("ListenSer failed"); ce != nil { @@ -201,6 +213,9 @@ func ListenSer(inServer proxy.Server, defaultOutClient proxy.Client, env *proxy. } } + if extraCloser != nil { + closer = extraCloser + } } return } diff --git a/netLayer/addr.go b/netLayer/addr.go index 3fbb5ff..77834ca 100644 --- a/netLayer/addr.go +++ b/netLayer/addr.go @@ -488,6 +488,25 @@ func (a *Addr) IsUDP() bool { return IsStrUDP_network(a.Network) } +func (a *Addr) ToAddr() net.Addr { + n := a.Network + if n == "" { + n = "tcp" + } + switch StrToTransportProtocol(a.Network) { + case TCP: + return a.ToTCPAddr() + case UDP: + return a.ToUDPAddr() + case UNIX: + u, _ := net.ResolveUnixAddr("unix", a.Name) + return u + case IP: + return &net.IPAddr{IP: a.IP, Zone: a.Name} + } + return nil +} + // 如果a里只含有域名,则会自动解析域名为IP。注意,若出现错误,则会返回nil func (a *Addr) ToUDPAddr() *net.UDPAddr { ua, err := net.ResolveUDPAddr("udp", a.String()) diff --git a/netLayer/msgconn.go b/netLayer/msgconn.go new file mode 100644 index 0000000..37b89ec --- /dev/null +++ b/netLayer/msgconn.go @@ -0,0 +1,316 @@ +package netLayer + +import ( + "errors" + "net" + "sync" + "time" + + "github.com/e1732a364fed/v2ray_simple/utils" + "go.uber.org/zap" +) + +// MsgConn一般用于 udp. 是一种类似 net.PacketConn 的包装. +// 实现 MsgConn接口 的类型 可以被用于 RelayUDP 进行转发。 +// +// ReadMsgFrom直接返回数据, 这样可以尽量避免多次数据拷贝。 +// +// 使用Addr,是因为有可能请求地址是个域名,而不是ip; 而且通过Addr, MsgConn实际上有可能支持通用的情况, +// 即可以用于 客户端 一会 请求tcp,一会又请求udp,一会又请求什么其它网络层 这种奇葩情况. +type MsgConn interface { + NetDeadliner + + ReadMsgFrom() ([]byte, Addr, error) + WriteMsgTo([]byte, Addr) error + CloseConnWithRaddr(raddr Addr) error //关闭特定连接 + Close() error //关闭所有连接 + Fullcone() bool //若Fullcone, 则在转发因另一端关闭而结束后, RelayUDP函数不会Close它. +} + +// symmetric, proxy/dokodemo 有用到. 实现 MsgConn +type UniTargetMsgConn struct { + net.Conn + Target Addr +} + +func (u UniTargetMsgConn) Fullcone() bool { + return false +} + +func (u UniTargetMsgConn) ReadMsgFrom() ([]byte, Addr, error) { + bs := utils.GetPacket() + + n, err := u.Conn.Read(bs) + if err != nil { + return nil, Addr{}, err + } + return bs[:n], u.Target, err +} + +func (u UniTargetMsgConn) WriteMsgTo(bs []byte, _ Addr) error { + _, err := u.Conn.Write(bs) + return err +} + +func (u UniTargetMsgConn) CloseConnWithRaddr(raddr Addr) error { + return u.Conn.Close() +} + +func (u UniTargetMsgConn) Close() error { + return u.Conn.Close() +} + +// UDPMsgConn 实现 MsgConn 和 net.PacketConn。 可满足fullcone/symmetric. 在proxy/direct 被用到. +type UDPMsgConn struct { + *net.UDPConn + IsServer, fullcone, closed bool + + symmetricMap map[HashableAddr]*net.UDPConn + symmetricMapMutex sync.RWMutex + + symmetricMsgReadChan chan AddrData +} + +// NewUDPMsgConn 创建一个 UDPMsgConn 并使用传入的 laddr 监听udp; 若未给出laddr, 将使用一个随机可用的端口监听. +// 如果是普通的单目标的客户端,用 (nil,false,false) 即可. +// +// 满足fullcone/symmetric, 由 fullcone 的值决定. +func NewUDPMsgConn(laddr *net.UDPAddr, fullcone bool, isserver bool, sockopt *Sockopt) (*UDPMsgConn, error) { + uc := new(UDPMsgConn) + var udpConn *net.UDPConn + var err error + if sockopt != nil { + if laddr == nil { + laddr = &net.UDPAddr{} + } + a := NewAddrFromUDPAddr(laddr) + pConn, e := a.ListenUDP_withOpt(sockopt) + if e != nil { + err = e + } else { + udpConn = pConn.(*net.UDPConn) + } + } else { + udpConn, err = net.ListenUDP("udp", laddr) + + } + + if err != nil { + return nil, err + } + udpConn.SetReadBuffer(MaxUDP_packetLen) + udpConn.SetWriteBuffer(MaxUDP_packetLen) + + uc.UDPConn = udpConn + uc.fullcone = fullcone + uc.IsServer = isserver + if !fullcone { + uc.symmetricMap = make(map[HashableAddr]*net.UDPConn) + uc.symmetricMsgReadChan = make(chan AddrData, 50) //缓存大一点比较好. 假设有十个udp连接, 每一个都连续读了5个信息,这样就会装满50个缓存了。 + + //我们暂时不把udpConn放入 symmetricMap 中,而是等待第一次Write成功后再放入. + } + return uc, nil +} + +func (u *UDPMsgConn) Fullcone() bool { + return u.fullcone +} + +func (u *UDPMsgConn) readSymmetricMsgFromConn(conn *net.UDPConn, thishash HashableAddr) { + if ce := utils.CanLogDebug("readSymmetricMsgFromConn called"); ce != nil { + ce.Write(zap.String("addr", thishash.String())) + } + for { + bs := utils.GetPacket() + + conn.SetReadDeadline(time.Now().Add(UDP_timeout)) + + n, ad, err := conn.ReadFromUDP(bs) + + if err != nil || u.closed { + break + } + + u.symmetricMsgReadChan <- AddrData{Data: bs[:n], Addr: NewAddrFromUDPAddr(ad)} + } + + u.symmetricMapMutex.Lock() + delete(u.symmetricMap, thishash) + u.symmetricMapMutex.Unlock() + + conn.Close() + +} + +func (u *UDPMsgConn) ReadMsgFrom() ([]byte, Addr, error) { + if u.fullcone { + bs := utils.GetPacket() + + u.UDPConn.SetReadDeadline(time.Now().Add(UDP_fullcone_timeout)) + + n, ad, err := u.UDPConn.ReadFromUDP(bs) + + if err != nil { + return nil, Addr{}, err + } + + return bs[:n], NewAddrFromUDPAddr(ad), nil + } else { + ad, ok := <-u.symmetricMsgReadChan + if ok { + ad.Addr.Network = "udp" + return ad.Data, ad.Addr, nil + } else { + return nil, Addr{}, net.ErrClosed + } + } + +} + +func (u *UDPMsgConn) WriteMsgTo(bs []byte, raddr Addr) error { + + var theConn *net.UDPConn + + if !u.fullcone && !u.IsServer { + //非fullcone时, 强制 symmetric, 对每个远程地址 都使用一个 对应的新laddr + + //UDPMsgConn 一般用于 direct,此时 一定有 !u.IsServer 成立 + + thishash := raddr.GetHashable() + thishash.Network = "udp" //有可能调用者忘配置Network项. + + if len(u.symmetricMap) == 0 { + + _, err := u.UDPConn.WriteTo(bs, raddr.ToUDPAddr()) + if err == nil { + u.symmetricMapMutex.Lock() + u.symmetricMap[thishash] = u.UDPConn + u.symmetricMapMutex.Unlock() + } + go u.readSymmetricMsgFromConn(u.UDPConn, thishash) + return err + } + + u.symmetricMapMutex.RLock() + theConn = u.symmetricMap[thishash] + u.symmetricMapMutex.RUnlock() + + if theConn == nil { + var e error + theConn, e = net.ListenUDP("udp", nil) + if e != nil { + return e + } + + u.symmetricMapMutex.Lock() + u.symmetricMap[thishash] = theConn + u.symmetricMapMutex.Unlock() + + go u.readSymmetricMsgFromConn(theConn, thishash) + } + + } else { + theConn = u.UDPConn + } + + _, err := theConn.WriteTo(bs, raddr.ToUDPAddr()) + return err +} + +func (u *UDPMsgConn) CloseConnWithRaddr(raddr Addr) error { + if !u.IsServer { + if u.fullcone { + //u.UDPConn.SetReadDeadline(time.Now()) + + } else { + u.symmetricMapMutex.Lock() + + thehash := raddr.GetHashable() + theConn := u.symmetricMap[thehash] + + if theConn != nil { + delete(u.symmetricMap, thehash) + theConn.Close() + + } + + u.symmetricMapMutex.Unlock() + } + } + return nil +} + +func (u *UDPMsgConn) Close() error { + + if !u.closed { + u.closed = true + + if !u.fullcone { + close(u.symmetricMsgReadChan) + } + return u.UDPConn.Close() + } + + return nil +} + +// 实现 net.PacketConn +func (uc *UDPMsgConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) { + var bs []byte + var a Addr + bs, a, err = uc.ReadMsgFrom() + if err == nil { + n = copy(p, bs) + addr = a.ToUDPAddr() + } + return +} + +// 实现 net.PacketConn +func (uc *UDPMsgConn) WriteTo(p []byte, raddr net.Addr) (n int, err error) { + if ua, ok := raddr.(*net.UDPAddr); ok { + err = uc.WriteMsgTo(p, NewAddrFromUDPAddr(ua)) + if err == nil { + n = len(p) + } + + } else { + err = errors.New("UDPMsgConn.WriteTo, raddr can't cast to *net.UDPAddr") + } + return + +} + +// Wraps net.PacketConn and implements MsgConn +type MsgConnForPacketConn struct { + net.PacketConn +} + +func (mc *MsgConnForPacketConn) ReadMsgFrom() ([]byte, Addr, error) { + bs := utils.GetPacket() + n, addr, err := mc.ReadFrom(bs) + if err != nil { + return nil, Addr{}, err + } + a, err := NewAddrFromAny(addr) + if err != nil { + return nil, Addr{}, err + } + return bs[:n], a, nil +} + +func (mc *MsgConnForPacketConn) WriteMsgTo(p []byte, a Addr) error { + _, err := mc.WriteTo(p, a.ToAddr()) + return err +} +func (mc *MsgConnForPacketConn) CloseConnWithRaddr(raddr Addr) error { + return mc.PacketConn.Close() + +} +func (mc *MsgConnForPacketConn) Close() error { + return mc.PacketConn.Close() +} +func (mc *MsgConnForPacketConn) Fullcone() bool { + return true +} diff --git a/netLayer/relay_udp.go b/netLayer/relay_udp.go index c5b67f7..86d1c0c 100644 --- a/netLayer/relay_udp.go +++ b/netLayer/relay_udp.go @@ -1,8 +1,6 @@ package netLayer import ( - "errors" - "net" "sync" "sync/atomic" "time" @@ -11,6 +9,8 @@ import ( "go.uber.org/zap" ) +//本文件内含 转发 udp 数据的方法 + const ( MaxUDP_packetLen = 64 * 1024 // 关于 udp包数据长度,可参考 https://cloud.tencent.com/developer/article/1021196 @@ -32,25 +32,6 @@ var ( UDP_fullcone_timeout = time.Minute * 30 ) -//本文件内含 一些 转发 udp 数据的 接口与方法 - -// MsgConn一般用于 udp. 是一种类似 net.PacketConn 的包装. -// 实现 MsgConn接口 的类型 可以被用于 RelayUDP 进行转发。 -// -// ReadMsgFrom直接返回数据, 这样可以尽量避免多次数据拷贝。 -// -// 使用Addr,是因为有可能请求地址是个域名,而不是ip; 而且通过Addr, MsgConn实际上有可能支持通用的情况, -// 即可以用于 客户端 一会 请求tcp,一会又请求udp,一会又请求什么其它网络层 这种奇葩情况. -type MsgConn interface { - NetDeadliner - - ReadMsgFrom() ([]byte, Addr, error) - WriteMsgTo([]byte, Addr) error - CloseConnWithRaddr(raddr Addr) error //关闭特定连接 - Close() error //关闭所有连接 - Fullcone() bool //若Fullcone, 则在转发因另一端关闭而结束后, RelayUDP函数不会Close它. -} - /* udp是无连接的,所以需要考虑超时问题。 在转发udp时, 有可能有多种情况: @@ -316,257 +297,35 @@ func RelayUDP_separate(rc, lc MsgConn, firstAddr *Addr, downloadByteCount, uploa return count2 } -// symmetric, proxy/dokodemo 有用到. 实现 MsgConn -type UniTargetMsgConn struct { - net.Conn - Target Addr -} +/* +relay udp 有两种针对不同通道的技术 -func (u UniTargetMsgConn) Fullcone() bool { - return false -} +一种是针对单来源通道的技术,通常是udp in tcp的情况,此时,我们用MsgConn + RelayUDP 方法很方便 -func (u UniTargetMsgConn) ReadMsgFrom() ([]byte, Addr, error) { - bs := utils.GetPacket() +另一种是直接读udp的技术,目前我们用很多代码来适配它,也能包装MsgConn里,但是非常不美观,繁琐。 +因为udp是多来源的,我们为了确定单一来源,就要全局读,然后定义一个map, 为每一个来源的地址存储一个MsgConn - n, err := u.Conn.Read(bs) - if err != nil { - return nil, Addr{}, err - } - return bs[:n], u.Target, err -} +我们重新定义一个MsgProducer 和 MsgConsumer,就方便很多. 这是针对多来源的转发。 -func (u UniTargetMsgConn) WriteMsgTo(bs []byte, _ Addr) error { - _, err := u.Conn.Write(bs) - return err -} - -func (u UniTargetMsgConn) CloseConnWithRaddr(raddr Addr) error { - return u.Conn.Close() -} - -func (u UniTargetMsgConn) Close() error { - return u.Conn.Close() -} - -// UDPMsgConn 实现 MsgConn 和 net.PacketConn。 可满足fullcone/symmetric. 在proxy/direct 被用到. -type UDPMsgConn struct { - *net.UDPConn - IsServer, fullcone, closed bool - - symmetricMap map[HashableAddr]*net.UDPConn - symmetricMapMutex sync.RWMutex - - symmetricMsgReadChan chan AddrData -} - -// NewUDPMsgConn 创建一个 UDPMsgConn 并使用传入的 laddr 监听udp; 若未给出laddr, 将使用一个随机可用的端口监听. -// 如果是普通的单目标的客户端,用 (nil,false,false) 即可. -// -// 满足fullcone/symmetric, 由 fullcone 的值决定. -func NewUDPMsgConn(laddr *net.UDPAddr, fullcone bool, isserver bool, sockopt *Sockopt) (*UDPMsgConn, error) { - uc := new(UDPMsgConn) - var udpConn *net.UDPConn - var err error - if sockopt != nil { - if laddr == nil { - laddr = &net.UDPAddr{} - } - a := NewAddrFromUDPAddr(laddr) - pConn, e := a.ListenUDP_withOpt(sockopt) - if e != nil { - err = e - } else { - udpConn = pConn.(*net.UDPConn) - } - } else { - udpConn, err = net.ListenUDP("udp", laddr) - - } - - if err != nil { - return nil, err - } - udpConn.SetReadBuffer(MaxUDP_packetLen) - udpConn.SetWriteBuffer(MaxUDP_packetLen) - - uc.UDPConn = udpConn - uc.fullcone = fullcone - uc.IsServer = isserver - if !fullcone { - uc.symmetricMap = make(map[HashableAddr]*net.UDPConn) - uc.symmetricMsgReadChan = make(chan AddrData, 50) //缓存大一点比较好. 假设有十个udp连接, 每一个都连续读了5个信息,这样就会装满50个缓存了。 - - //我们暂时不把udpConn放入 symmetricMap 中,而是等待第一次Write成功后再放入. - } - return uc, nil -} - -func (u *UDPMsgConn) Fullcone() bool { - return u.fullcone -} - -func (u *UDPMsgConn) readSymmetricMsgFromConn(conn *net.UDPConn, thishash HashableAddr) { - if ce := utils.CanLogDebug("readSymmetricMsgFromConn called"); ce != nil { - ce.Write(zap.String("addr", thishash.String())) - } +如此,一个 UDPConn就相当于一个 MsgProducer, 它的to 可以由 tproxy 或者 msg内部的数据提取出来 +*/ +func RelayMsgP2C(p MsgProducer, c MsgConsumer) { for { - bs := utils.GetPacket() - - conn.SetReadDeadline(time.Now().Add(UDP_timeout)) - - n, ad, err := conn.ReadFromUDP(bs) - - if err != nil || u.closed { - break - } - - u.symmetricMsgReadChan <- AddrData{Data: bs[:n], Addr: NewAddrFromUDPAddr(ad)} - } - - u.symmetricMapMutex.Lock() - delete(u.symmetricMap, thishash) - u.symmetricMapMutex.Unlock() - - conn.Close() - -} - -func (u *UDPMsgConn) ReadMsgFrom() ([]byte, Addr, error) { - if u.fullcone { - bs := utils.GetPacket() - - u.UDPConn.SetReadDeadline(time.Now().Add(UDP_fullcone_timeout)) - - n, ad, err := u.UDPConn.ReadFromUDP(bs) - + msg, from, to, err := p.ProduceMsg() if err != nil { - return nil, Addr{}, err + return } - - return bs[:n], NewAddrFromUDPAddr(ad), nil - } else { - ad, ok := <-u.symmetricMsgReadChan - if ok { - ad.Addr.Network = "udp" - return ad.Data, ad.Addr, nil - } else { - return nil, Addr{}, net.ErrClosed + err = c.ConsumeMsg(msg, from, to) + if err != nil { + return } } - } -func (u *UDPMsgConn) WriteMsgTo(bs []byte, raddr Addr) error { - - var theConn *net.UDPConn - - if !u.fullcone && !u.IsServer { - //非fullcone时, 强制 symmetric, 对每个远程地址 都使用一个 对应的新laddr - - //UDPMsgConn 一般用于 direct,此时 一定有 !u.IsServer 成立 - - thishash := raddr.GetHashable() - thishash.Network = "udp" //有可能调用者忘配置Network项. - - if len(u.symmetricMap) == 0 { - - _, err := u.UDPConn.WriteTo(bs, raddr.ToUDPAddr()) - if err == nil { - u.symmetricMapMutex.Lock() - u.symmetricMap[thishash] = u.UDPConn - u.symmetricMapMutex.Unlock() - } - go u.readSymmetricMsgFromConn(u.UDPConn, thishash) - return err - } - - u.symmetricMapMutex.RLock() - theConn = u.symmetricMap[thishash] - u.symmetricMapMutex.RUnlock() - - if theConn == nil { - var e error - theConn, e = net.ListenUDP("udp", nil) - if e != nil { - return e - } - - u.symmetricMapMutex.Lock() - u.symmetricMap[thishash] = theConn - u.symmetricMapMutex.Unlock() - - go u.readSymmetricMsgFromConn(theConn, thishash) - } - - } else { - theConn = u.UDPConn - } - - _, err := theConn.WriteTo(bs, raddr.ToUDPAddr()) - return err +type MsgProducer interface { + ProduceMsg() (msg []byte, from, to Addr, err error) } -func (u *UDPMsgConn) CloseConnWithRaddr(raddr Addr) error { - if !u.IsServer { - if u.fullcone { - //u.UDPConn.SetReadDeadline(time.Now()) - - } else { - u.symmetricMapMutex.Lock() - - thehash := raddr.GetHashable() - theConn := u.symmetricMap[thehash] - - if theConn != nil { - delete(u.symmetricMap, thehash) - theConn.Close() - - } - - u.symmetricMapMutex.Unlock() - } - } - return nil -} - -func (u *UDPMsgConn) Close() error { - - if !u.closed { - u.closed = true - - if !u.fullcone { - close(u.symmetricMsgReadChan) - } - return u.UDPConn.Close() - } - - return nil -} - -// 实现 net.PacketConn -func (uc *UDPMsgConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) { - var bs []byte - var a Addr - bs, a, err = uc.ReadMsgFrom() - if err == nil { - n = copy(p, bs) - addr = a.ToUDPAddr() - } - return -} - -// 实现 net.PacketConn -func (uc *UDPMsgConn) WriteTo(p []byte, raddr net.Addr) (n int, err error) { - if ua, ok := raddr.(*net.UDPAddr); ok { - err = uc.WriteMsgTo(p, NewAddrFromUDPAddr(ua)) - if err == nil { - n = len(p) - } - - } else { - err = errors.New("UDPMsgConn.WriteTo, raddr can't cast to *net.UDPAddr") - } - return - +type MsgConsumer interface { + ConsumeMsg(msg []byte, from, to Addr) (err error) } diff --git a/proxy/shadowsocks/client.go b/proxy/shadowsocks/client.go index 47cf0f0..288d434 100644 --- a/proxy/shadowsocks/client.go +++ b/proxy/shadowsocks/client.go @@ -109,7 +109,6 @@ func (c *Client) EstablishUDPChannel(underlay net.Conn, firstPayload []byte, tar mc = &shadowUDPPacketConn{ PacketConn: pc, raddr: underlay.RemoteAddr(), - taddr: target.ToUDPAddr(), } if firstPayload != nil { diff --git a/proxy/shadowsocks/server.go b/proxy/shadowsocks/server.go index ede899e..2e0ddec 100644 --- a/proxy/shadowsocks/server.go +++ b/proxy/shadowsocks/server.go @@ -5,6 +5,7 @@ import ( "io" "net" "net/url" + "sync" "github.com/e1732a364fed/v2ray_simple/netLayer" "github.com/e1732a364fed/v2ray_simple/proxy" @@ -54,44 +55,15 @@ func (ServerCreator) URLToListenConf(u *url.URL, lc *proxy.ListenConf, format in } -// loop read udp -// func (ServerCreator) AfterCommonConfServer(s proxy.Server) { -// if s.Network() == "udp" || s.Network() == netLayer.DualNetworkName { -// ss := s.(*Server) -// uc, err := net.ListenUDP("udp", ss.LUA) -// if err != nil { -// log.Panicln("shadowsocks listen udp failed", err) -// } -// pc := ss.cipher.PacketConn(uc) -// for { -// buf := utils.GetPacket() -// defer utils.PutPacket(buf) -// n, saddr, err := pc.ReadFrom(buf) -// if err != nil { -// if ce := utils.CanLogErr("shadowsocks read udp failed"); ce != nil { -// ce.Write(zap.Error(err)) -// } -// return -// } -// r := bytes.NewBuffer(buf[:n]) -// taddr, err := GetAddrFrom(r) -// if err != nil { -// if ce := utils.CanLogErr("shadowsocks GetAddrFrom failed"); ce != nil { -// ce.Write(zap.Error(err)) -// } -// return -// } - -// } -// } -// } - type Server struct { proxy.Base *utils.MultiUserMap cipher core.Cipher + + m sync.RWMutex + udpMsgConnMap map[netLayer.HashableAddr]*shadowUDPPacketConn } func newServer(info MethodPass, lc *proxy.ListenConf) *Server { @@ -105,6 +77,12 @@ func (*Server) Name() string { return Name } +func (s *Server) SelfListen() (is, tcp, udp bool) { + udp = true + is = true + return +} + func (s *Server) Handshake(underlay net.Conn) (result net.Conn, msgConn netLayer.MsgConn, targetAddr netLayer.Addr, returnErr error) { result = s.cipher.StreamConn(underlay) readbs := utils.GetBytes(utils.MTU) @@ -146,3 +124,65 @@ realPart: return } + +func (ss *Server) StartListen(_ chan<- proxy.IncomeTCPInfo, udpInfoChan chan<- proxy.IncomeUDPInfo) io.Closer { + // uc, err := net.ListenUDP("udp", ss.LUA) + // if err != nil { + // log.Panicln("shadowsocks listen udp failed", err) + // } + // pc := ss.cipher.PacketConn(uc) + + // sp := &shadowUDPPacketConn{ + // PacketConn: pc, + // } + + go func() { + + // for { + // bs := utils.GetPacket() + + // n, addr, err := pc.ReadFrom(bs) + // if err != nil { + // return + // } + // ad, err := netLayer.NewAddrFromAny(addr) + // if err != nil { + // return + // } + // hash := ad.GetHashable() + + // ss.m.RLock() + // conn, found := ss.udpMsgConnMap[hash] + // ss.m.RUnlock() + + // if !found { + // conn = &shadowUDPPacketConn{ + // ourSrcAddr: src, + // readChan: make(chan netLayer.AddrData, 5), + // closeChan: make(chan struct{}), + // parentMachine: m, + // hash: hash, + // } + // conn.InitEasyDeadline() + + // m.Lock() + // m.udpMsgConnMap[hash] = conn + // m.Unlock() + + // } + + // destAddr := netLayer.NewAddrFromUDPAddr(dst) + + // conn.readChan <- netLayer.AddrData{Data: bs[:n], Addr: destAddr} + + // if !found { + // return conn, destAddr, nil + + // } + + // } + + }() + return nil + +} diff --git a/proxy/shadowsocks/udp.go b/proxy/shadowsocks/udp.go index 9e60baa..d5623e7 100644 --- a/proxy/shadowsocks/udp.go +++ b/proxy/shadowsocks/udp.go @@ -11,7 +11,6 @@ import ( type shadowUDPPacketConn struct { net.PacketConn raddr net.Addr - taddr net.Addr handshakeBuf *bytes.Buffer } diff --git a/utils/error.go b/utils/error.go index 952e768..4a78abe 100644 --- a/utils/error.go +++ b/utils/error.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "strconv" + "strings" ) var ( @@ -23,7 +24,7 @@ var ( type InvalidDataErr string -//return err == e || err == ErrInvalidData +// return err == e || err == ErrInvalidData func (e InvalidDataErr) Is(err error) bool { return err == e || err == ErrInvalidData } @@ -32,7 +33,7 @@ func (e InvalidDataErr) Error() string { return string(e) } -//nothing special. Normally, N==0 means no error +// nothing special. Normally, N==0 means no error type NumErr struct { N int E error @@ -52,7 +53,7 @@ func (ef NumErr) Unwarp() error { return ef.E } -//nothing special +// nothing special type NumStrErr struct { N int Prefix string @@ -63,7 +64,7 @@ func (ne NumStrErr) Error() string { return ne.Prefix + strconv.Itoa(ne.N) } -//an err with a buffer, nothing special +// an err with a buffer, nothing special type ErrBuffer struct { Err error Buf *bytes.Buffer @@ -141,3 +142,34 @@ func (e ErrInErr) String() string { return e.ErrDesc } + +type Errs struct { + List []ErrsItem +} + +type ErrsItem struct { + Index int + E error +} + +func (ee *Errs) Add(e ErrsItem) { + ee.List = append(ee.List, e) +} +func (e Errs) OK() bool { + return len(e.List) == 0 +} +func (e Errs) String() string { + var sb strings.Builder + for _, err := range e.List { + sb.WriteString(strconv.Itoa(err.Index)) + sb.WriteString(", ") + sb.WriteString(err.E.Error()) + sb.WriteString("\n") + + } + return sb.String() +} + +func (e Errs) Error() string { + return e.String() +} diff --git a/utils/io.go b/utils/io.go index 02589e5..1b2c5af 100644 --- a/utils/io.go +++ b/utils/io.go @@ -5,7 +5,7 @@ import ( "sync" ) -//一种简单的读写组合, 在ws包中被用到. +// 一种简单的读写组合, 在ws包中被用到. type RW struct { io.Reader io.Writer @@ -30,7 +30,7 @@ type ByteWriter interface { Write(p []byte) (n int, err error) } -//optionally read from OptionalReader +// optionally read from OptionalReader type ReadWrapper struct { io.Reader OptionalReader io.Reader @@ -62,7 +62,7 @@ type DummyReadCloser struct { } // ReadCount -= 1 at each call. -//if ReadCount<0, return 0, io.EOF +// if ReadCount<0, return 0, io.EOF func (d *DummyReadCloser) Read(p []byte) (int, error) { d.ReadCount -= 1 //log.Println("read called", d.ReadCount) @@ -74,7 +74,7 @@ func (d *DummyReadCloser) Read(p []byte) (int, error) { } } -//return nil +// return nil func (DummyReadCloser) Close() error { return nil } @@ -84,7 +84,7 @@ type DummyWriteCloser struct { } // WriteCount -= 1 at each call. -//if WriteCount<0, return 0, io.EOF +// if WriteCount<0, return 0, io.EOF func (d *DummyWriteCloser) Write(p []byte) (int, error) { d.WriteCount -= 1 //log.Println("write called", d.WriteCount) @@ -97,12 +97,12 @@ func (d *DummyWriteCloser) Write(p []byte) (int, error) { } } -//return nil +// return nil func (DummyWriteCloser) Close() error { return nil } -//先从Old读,若SwitchChan被关闭, 立刻改为从New读 +// 先从Old读,若SwitchChan被关闭, 立刻改为从New读 type ReadSwitcher struct { Old, New io.Reader //non-nil SwitchChan chan struct{} //non-nil @@ -154,7 +154,7 @@ func (d *ReadSwitcher) Close() error { return nil } -//先向Old写,若SwitchChan被关闭, 改向New写 +// 先向Old写,若SwitchChan被关闭, 改向New写 type WriteSwitcher struct { Old, New io.Writer //non-nil SwitchChan chan struct{} //non-nil @@ -187,7 +187,7 @@ func (d *WriteSwitcher) Close() error { return nil } -//simple structure that send a signal by chan when Close called. +// simple structure that send a signal by chan when Close called. type ChanCloser struct { closeChan chan struct{} once sync.Once @@ -206,3 +206,24 @@ func (cc *ChanCloser) Close() error { }) return nil } + +type MultiCloser struct { + Closers []io.Closer + sync.Once +} + +func (cc *MultiCloser) Close() (result error) { + cc.Once.Do(func() { + var es Errs + for i, c := range cc.Closers { + e := c.Close() + if e != nil { + es.Add(ErrsItem{Index: i, E: e}) + } + } + if !es.OK() { + result = es + } + }) + return +}