From ce735dbb99a7fc7dad7f470c20c43e7ecebf9504 Mon Sep 17 00:00:00 2001 From: hahahrfool <75717694+hahahrfool@users.noreply.github.com> Date: Fri, 8 Apr 2022 20:31:59 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E8=AE=A2udp=E4=BB=A3=E7=A0=81;=20dial?= =?UTF-8?q?=E9=85=8D=E7=BD=AE=20=E6=B7=BB=E5=8A=A0=20fullcone=20=E9=80=89?= =?UTF-8?q?=E9=A1=B9;=E9=BB=98=E8=AE=A4=E4=B8=BA=E9=9D=9Efullcone?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 现在整个程序均通过了go test, main 也可以正常运行了。 Relay_UDP 函数添加流量计数; 发现之前 Relay函数的流量计数 在main.go里参数传反了,导致实际上计数的是上传而不是下载,已修复 对fullcone的情况做了特别考量。MsgConn的 Close函数在fullcone时不能随便被调用。 因此我添加了一个 CloseConnWithRaddr(raddr Addr) error 方法,以及 Fullcone() bool 方法 在utils包的init部分使用 rand 随机种子 --- backup | 438 +++++++++++++++++++++++++++ examples/vlesss.client.toml | 5 + main.go | 258 +++++++--------- netLayer/netlayer.go | 40 +-- netLayer/udp_relay.go | 576 ++++++++++++++---------------------- netLayer/udp_test.go | 24 ++ proxy/config.go | 3 +- proxy/direct/client.go | 19 +- proxy/dokodemo/server.go | 8 +- proxy/socks5/server.go | 8 + proxy/socks5/udp_test.go | 2 +- proxy/trojan/udpConn.go | 6 + proxy/vless/udpConn.go | 7 + proxy/vless/vless_test.go | 18 +- quic/quic.go | 14 +- utils/error.go | 3 + utils/utils.go | 8 + 17 files changed, 871 insertions(+), 566 deletions(-) create mode 100644 backup create mode 100644 netLayer/udp_test.go diff --git a/backup b/backup new file mode 100644 index 0000000..c52edc4 --- /dev/null +++ b/backup @@ -0,0 +1,438 @@ +// 下面一段代码 单独处理 udp承载数据的特殊转发。 + // + // 这里只处理 vless v1 的CRUMFURS 转发到direct的情况 以及 socks5 的udp associate 转发 的情况; + // 如果条件不符合则会跳过这段代码 并进入下一阶段 + if targetAddr.IsUDP() { + + } + +switch inServer.Name() { + case "vless": + + if client.Name() == "direct" { + + uc := wlc.(*vless.UserTCPConn) + + if uc.GetProtocolVersion() < 1 { + break + } + + // 根据 vless_v1的讨论,vless_v1 的udp转发的 通信方式 也是与tcp连接类似的分离信道方式 + // 上面已经把 CRUMFURS 的情况过滤掉了,所以现在这里就是普通的udp请求 + // + // 因为direct使用 proxy.RelayUDP_to_Direct 函数 直接实现了fullcone + // 那么我们只需要传入一个 UDP_Extractor 即可 + // 我们通过 netLayer.UniUDP_Extractor 达到此目的 + + //unknownRemoteAddrMsgWriter 在 vless v1中的实现就是 theCRUMFURS (vless v0就是mux) + + id := uc.GetIdentityStr() + + vlessServer := inServer.(*vless.Server) + + theCRUMFURS := vlessServer.Get_CRUMFURS(id) + var unknownRemoteAddrMsgWriter netLayer.UDPResponseWriter + + unknownRemoteAddrMsgWriter = theCRUMFURS + + uniExtractor := netLayer.NewUniUDP_Extractor(*targetAddr.ToUDPAddr(), wlc, unknownRemoteAddrMsgWriter) + + netLayer.RelayUDP_to_Direct(uniExtractor) //阻塞 + + return + } + + case "socks5": + // 此时socks5包已经帮我们dial好了一个udp连接,即wlc,但是还未读取到客户端想要访问的东西 + udpConn := wlc.(*socks5.UDPConn) + + dialFunc := func(targetAddr netLayer.Addr) (io.ReadWriter, error) { + rw, ne := dialClient(incomingInserverConnState{}, targetAddr, client, false, nil, true) + if ne != (utils.NumErr{}) { + return rw, ne + } + return rw, nil + } + + if putter, ok := client.(netLayer.UDP_Putter); ok { + + // 将 outClient 视为 UDP_Putter ,就可以转发udp信息了 + // vless.Client 实现了 UDP_Putter, 新连接的Handshake过程会在 dialFunc 被调用 时发生 + + //UDP_Putter 不使用传统的Handshake过程,因为Handshake是用于第一次数据,然后后面接着的双向传输都不再需要额外信息;而 UDP_Putter 每一次数据传输都是需要传输 目标地址的,所以每一次都需要一些额外数据,这就是我们 UDP_Putter 接口去解决的事情。 + + //因为UDP Associate后,就会保证以后的向 wlc 的 所有请求数据都是udp请求,所以可以在这里直接循环转发了。 + + go udpConn.StartPushResponse(putter) + + udpConn.StartReadRequest(putter, dialFunc) + + } else if pc, ok := client.(netLayer.UDP_Putter_Generator); ok { + + // direct 实现了 UDP_Putter_Generator + + putter := pc.GetNewUDP_Putter() + if putter != nil { + go udpConn.StartPushResponse(putter) + + udpConn.StartReadRequest(putter, dialFunc) + } + } else { + if ce := utils.CanLogErr("socks5 udp err"); ce != nil { + ce.Write( + zap.String("detail", "server -> client for udp, but client didn't implement netLayer.UDP_Putter or UDP_Putter_Generator"), + zap.String("client", client.Name()), + ) + } + } + return + + } + + + +//////////////////// 接口 //////////////////// + +type UDPRequestReader interface { + GetNewUDPRequest() (net.UDPAddr, []byte, error) +} + +type UDPResponseWriter interface { + WriteUDPResponse(net.UDPAddr, []byte) error +} + +// UDP_Extractor, 用于从一个虚拟的协议中提取出 udp请求 +// +// 从一个未知协议中读取UDP请求,然后试图得到该请求的回应(大概率是直接通过direct发出) 并写回 +type UDP_Extractor interface { + UDPRequestReader + UDPResponseWriter +} + + + +// 写入一个UDP请求; 可以包裹成任意协议。 +// 因为有时该地址从来没申请过,所以此时就要用dialFunc创建一个新连接 +type UDPRequestWriter interface { + WriteUDPRequest(target net.UDPAddr, request []byte, dialFunc func(targetAddr Addr) (io.ReadWriter, error)) error + CloseUDPRequestWriter() //如果read端失败,则一定需要close Write端. CloseUDPRequestWriter就是这个用途. +} + +//拉取一个新的 UDP 响应 +type UDPResponseReader interface { + GetNewUDPResponse() (net.UDPAddr, []byte, error) +} + +// UDP_Putter, 用于把 udp请求转换成 虚拟的协议 +// +// 向一个特定的协议 写入 UDP请求,然后试图读取 该请求的回应. 比如vless.Client就实现了它 +type UDP_Putter interface { + UDPRequestWriter + UDPResponseReader +} + +type UDP_Putter_Generator interface { + GetNewUDP_Putter() UDP_Putter +} + +//////////////////// 具体实现 //////////////////// + +// 最简单的 UDP_Putter,用于客户端; 不处理内部数据,直接认为要 发送给服务端的信息 要发送到一个特定的地址 +// 如果指定的地址不是 默认的地址,则发送到 unknownRemoteAddrMsgWriter +// +// 对于 vless v1来说, unknownRemoteAddrMsgWriter 要做的 就是 新建一个与服务端的 请求udp的连接, +// 然后这个新连接就变成了新的 UniUDP_Putter +type UniUDP_Putter struct { + targetAddr net.UDPAddr + io.ReadWriter + + unknownRemoteAddrMsgWriter UDPRequestWriter +} + +// +func (e *UniUDP_Putter) GetNewUDPResponse() (net.UDPAddr, []byte, error) { + bs := make([]byte, MaxUDP_packetLen) + n, err := e.ReadWriter.Read(bs) + if err != nil { + return e.targetAddr, nil, err + } + return e.targetAddr, bs[:n], nil +} + +func (e *UniUDP_Putter) WriteUDPRequest(addr net.UDPAddr, bs []byte, dialFunc func(targetAddr Addr) (io.ReadWriter, error)) (err error) { + + if addr.String() == e.targetAddr.String() { + _, err = e.ReadWriter.Write(bs) + + return + } else { + if e.unknownRemoteAddrMsgWriter == nil { + return + } + // 普通的 WriteUDPRequest需要调用 dialFunc来拨号新链接,而我们这里 直接就传递给 unknownRemoteAddrMsgWriter 了 + + return e.unknownRemoteAddrMsgWriter.WriteUDPRequest(addr, bs, dialFunc) + } + +} + +// 最简单的 UDP_Extractor,用于服务端; 不处理内部数据,直接认为客户端传来的内部数据的目标为一个特定值。 +// 收到的响应数据的来源 如果和 targetAddr 相同的话,直接写入传入的 ReadWriter +// 收到的外界数据的来源 如果和 targetAddr 不同的话,那肯定就是使用了fullcone,那么要传入 unknownRemoteAddrMsgWriter; 如果New时传入unknownRemoteAddrMsgWriter的 是nil的话,那么意思就是不支持fullcone,将直接舍弃这一部分数据。 +type UniUDP_Extractor struct { + targetAddr net.UDPAddr + io.ReadWriter + + unknownRemoteAddrMsgWriter UDPResponseWriter +} + +// 新建,unknownRemoteAddrMsgWriter 用于写入 未知来源响应,rw 用于普通的客户请求的目标的响应 +func NewUniUDP_Extractor(addr net.UDPAddr, rw io.ReadWriter, unknownRemoteAddrMsgWriter UDPResponseWriter) *UniUDP_Extractor { + return &UniUDP_Extractor{ + targetAddr: addr, + ReadWriter: rw, + unknownRemoteAddrMsgWriter: unknownRemoteAddrMsgWriter, + } +} + +// 从客户端连接中 提取出 它的 UDP请求,就是直接读取数据。然后搭配上之前设置好的地址 +func (e *UniUDP_Extractor) GetNewUDPRequest() (net.UDPAddr, []byte, error) { + bs := make([]byte, MaxUDP_packetLen) + n, err := e.ReadWriter.Read(bs) + if err != nil { + return e.targetAddr, nil, err + } + return e.targetAddr, bs[:n], nil +} + +// WriteUDPResponse 写入远程服务器的响应;要分情况讨论。 +// 因为是单一目标extractor,所以正常情况下 传入的response 的源地址 也 应和 e.targetAddr 相同, +// 如果地址不同的话,那肯定就是使用了fullcone,那么要传入 unknownRemoteAddrMsgWriter +func (e *UniUDP_Extractor) WriteUDPResponse(addr net.UDPAddr, bs []byte) (err error) { + + if addr.String() == e.targetAddr.String() { + _, err = e.ReadWriter.Write(bs) + + return + } else { + //如果未配置 unknownRemoteAddrMsgWriter, 则说明不支持fullcone。这并不是错误,而是可选的。看你想不想要fullcone + if e.unknownRemoteAddrMsgWriter == nil { + return + } + + return e.unknownRemoteAddrMsgWriter.WriteUDPResponse(addr, bs) + } + +} + + +//一种简单的本地 UDP_Extractor + UDP_Putter +type UDP_Pipe struct { + requestChan, responseChan chan UDPAddrData + requestChanClosed, responseChanClosed bool +} + +func (u *UDP_Pipe) IsInvalid() bool { + return u.requestChanClosed || u.responseChanClosed +} + +func (u *UDP_Pipe) closeRequestChan() { + if !u.requestChanClosed { + close(u.requestChan) + u.requestChanClosed = true + } +} +func (u *UDP_Pipe) closeResponseChan() { + if !u.responseChanClosed { + close(u.responseChan) + u.responseChanClosed = true + } +} + +func (u *UDP_Pipe) Close() { + u.closeRequestChan() + u.closeResponseChan() + +} + +func NewUDP_Pipe() *UDP_Pipe { + return &UDP_Pipe{ + requestChan: make(chan UDPAddrData, 10), + responseChan: make(chan UDPAddrData, 10), + } +} + + +func (u *UDP_Pipe) CloseUDPRequestWriter() { + u.closeRequestChan() +} + +func (u *UDP_Pipe) GetNewUDPRequest() (net.UDPAddr, []byte, error) { + + d, ok := <-u.requestChan + if ok { + return d.Addr, d.Data, nil + + } else { + //如果requestChan被关闭了,就要同时关闭 responseChan + u.closeResponseChan() + return net.UDPAddr{}, nil, io.EOF + } +} + +func (u *UDP_Pipe) GetNewUDPResponse() (net.UDPAddr, []byte, error) { + d, ok := <-u.responseChan + if ok { + return d.Addr, d.Data, nil + + } else { + //如果 responseChan 被关闭了,就要同时关闭 requestChan + u.closeRequestChan() + return net.UDPAddr{}, nil, io.EOF + } + +} + +// 会保存bs的副本,不必担心数据被改变的问题。 +func (u *UDP_Pipe) WriteUDPResponse(addr net.UDPAddr, bs []byte) error { + bsCopy := make([]byte, len(bs)) + copy(bsCopy, bs) + + u.responseChan <- UDPAddrData{ + Addr: addr, + Data: bsCopy, + } + return nil +} + +// 会保存bs的副本,不必担心数据被改变的问题。 +func (u *UDP_Pipe) WriteUDPRequest(addr net.UDPAddr, bs []byte, dialFunc func(targetAddr Addr) (io.ReadWriter, error)) error { + bsCopy := make([]byte, len(bs)) + copy(bsCopy, bs) + + u.requestChan <- UDPAddrData{ + Addr: addr, + Data: bsCopy, + } + return nil +} + + + +// RelayUDP_to_Direct 用于 从一个未知协议读取 udp请求,然后通过 直接的udp连接 发送到 远程udp 地址。 +// 该函数是阻塞的。而且实现了fullcone; 本函数会直接处理 对外新udp 的dial +// +// RelayUDP_to_Direct 与 RelayTCP 函数 的区别是,已经建立的udpConn是可以向其它目的地址发送信息的 +// 服务端可以向 客户端发送 非客户端发送过数据 的地址 发来的信息 +// 原理是,客户端请求第一次后,就会在服务端开放一个端口,然后其它远程主机就会发现这个端口并试图向客户端发送数据 +// 而由于fullcone,所以如果客户端要请求一个 不同的udp地址的话,如果这个udp地址是之前发送来过信息,那么就要用之前建立过的udp连接,这样才能保证端口一致; +// +func RelayUDP_to_Direct(extractor UDP_Extractor) { + + type connState struct { + conn *net.UDPConn + raddrMap map[string]bool //所有与thisconn关联的 raddr + } + + //具体实现: 每当有对新远程udp地址的请求发生时,就会同时 监听 “用于发送该请求到远程udp主机的本地udp端口”,接受一切发往 该端口的数据 + + var dialedUDPConnMap = make(map[string]*connState) + + var mutex sync.RWMutex + + for { + + raddr, requestData, err := extractor.GetNewUDPRequest() + if err != nil { + break + } + + first_raddrStr := raddr.String() + + mutex.RLock() + oldConn := dialedUDPConnMap[first_raddrStr] + mutex.RUnlock() + + if oldConn != nil { + + oldConn.conn.Write(requestData) + + } else { + + newConn, err := net.DialUDP("udp", nil, &raddr) + if err != nil { + + break + } + + _, err = newConn.Write(requestData) + if err != nil { + break + } + + first_cs := &connState{ + conn: newConn, + raddrMap: make(map[string]bool), + } + first_cs.raddrMap[first_raddrStr] = true + + mutex.Lock() + dialedUDPConnMap[first_raddrStr] = first_cs + mutex.Unlock() + + //监听所有发往 newConn的 远程任意主机 发来的消息。 + go func(thisconn *net.UDPConn, supposedRemoteAddr net.UDPAddr) { + bs := make([]byte, MaxUDP_packetLen) + for { + thisconn.SetDeadline(time.Now().Add(UDP_timeout)) + + //log.Println("redirect udp, start read", supposedRemoteAddr) + n, raddr, err := thisconn.ReadFromUDP(bs) + if err != nil { + + //timeout后,就会删掉第一个拨号的raddr,以及因为fullcone而产生的其它raddr + //然后关闭此udp端口 + + mutex.Lock() + + delete(dialedUDPConnMap, first_raddrStr) + + for anotherRaddr := range first_cs.raddrMap { + delete(dialedUDPConnMap, anotherRaddr) + } + mutex.Unlock() + + thisconn.Close() + break + } + + // 这个远程 地址 无论是新的还是旧的, 都是要 和 newConn关联的,下一次向 这个远程地址发消息时,也要用 newConn来发,而不是新dial一个。 + + hasThisRaddr := false + this_raddr_str := raddr.String() + mutex.RLock() + _, hasThisRaddr = dialedUDPConnMap[this_raddr_str] + mutex.RUnlock() + + if !hasThisRaddr { + + mutex.Lock() + dialedUDPConnMap[this_raddr_str] = first_cs + first_cs.raddrMap[this_raddr_str] = true + mutex.Unlock() + } + + //log.Println("redirect udp, will write to extractor", string(bs[:n])) + + err = extractor.WriteUDPResponse(*raddr, bs[:n]) + if err != nil { + break + } + + } + }(newConn, raddr) + } + + } + +} diff --git a/examples/vlesss.client.toml b/examples/vlesss.client.toml index d044531..b50da99 100644 --- a/examples/vlesss.client.toml +++ b/examples/vlesss.client.toml @@ -65,6 +65,11 @@ utls = true #是否使用 utls 来应用 chrome指纹进行伪装 # advancedLayer = "ws" # 也可为 grpc 或 quic # path = "/ohmygod_verysimple_is_very_simple" # ws的path和 grpc的serviceName 都在这个path里填写, 为了防探测这里越长越随机越好 +[[dial]] +tag = "mydirect" +protocol = "direct" # direct 是不需要特地写出的, 程序会自动创建一个 tag 为 direct 的 dial. 不过也许需要控制一下fullcone, 此时就要写出来, 而且tag需要定义为 不为 "direct" 的其它值。 +# fullcone = true # 默认的fullcone是关闭状态, 可以取消注释以打开. + # route 是在我们代理界是分流的意思。 # route 是可选的,如果没给出的话,就不分流; diff --git a/main.go b/main.go index 8340f3f..ab9bb09 100644 --- a/main.go +++ b/main.go @@ -25,7 +25,6 @@ import ( "go.uber.org/zap" "github.com/hahahrfool/v2ray_simple/proxy" - "github.com/hahahrfool/v2ray_simple/proxy/socks5" "github.com/hahahrfool/v2ray_simple/proxy/vless" _ "github.com/hahahrfool/v2ray_simple/proxy/direct" @@ -598,6 +597,8 @@ func handshakeInserver_and_passToOutClient(iics incomingInserverConnState) { inServer := iics.inServer var wlc io.ReadWriteCloser + var udp_wlc netLayer.MsgConn + var targetAddr netLayer.Addr var err error @@ -605,7 +606,8 @@ func handshakeInserver_and_passToOutClient(iics incomingInserverConnState) { goto checkFallback } - wlc, targetAddr, err = inServer.Handshake(wrappedConn) + wlc, udp_wlc, targetAddr, err = inServer.Handshake(wrappedConn) + if err == nil { //log.Println("inServer handshake passed") //无错误时直接跳过回落, 直接执行下一个步骤 @@ -617,6 +619,7 @@ func handshakeInserver_and_passToOutClient(iics incomingInserverConnState) { //下面代码查看是否支持fallback; wlc先设为nil, 当所有fallback都不满足时,可以判断nil然后关闭连接 wlc = nil + udp_wlc = nil if ce := utils.CanLogWarn("failed in inServer proxy handshake"); ce != nil { ce.Write( @@ -722,7 +725,7 @@ checkFallback: afterLocalServerHandshake: - if wlc == nil { + if wlc == nil && udp_wlc == nil { //无wlc证明 inServer 握手失败,且 没有任何回落可用, 直接return utils.Debug("invalid request and no matched fallback, hung up") wrappedConn.Close() @@ -873,8 +876,8 @@ afterLocalServerHandshake: case "socks5": // UDP Associate: // 因为socks5的 UDP Associate 办法是较为特殊的,不使用现有tcp而是新建立udp,所以此时该tcp连接已经没用了 + // 但是根据socks5标准,这个tcp链接同样是 keep alive的,否则客户端就会认为服务端挂掉了. // 另外,此时 targetAddr.IsUDP 只是用于告知此链接是udp Associate,并不包含实际地址信息 - //但是根据socks5标准,这个tcp链接同样是 keep alive的,否则客户端就会认为服务端挂掉了. default: iics.shouldCloseInSerBaseConnWhenFinish = true @@ -896,101 +899,10 @@ afterLocalServerHandshake: } - // 下面一段代码 单独处理 udp承载数据的特殊转发。 - // - // 这里只处理 vless v1 的CRUMFURS 转发到direct的情况 以及 socks5 的udp associate 转发 的情况; - // 如果条件不符合则会跳过这段代码 并进入下一阶段 - if targetAddr.IsUDP() { - - switch inServer.Name() { - case "vless": - - if client.Name() == "direct" { - - uc := wlc.(*vless.UserTCPConn) - - if uc.GetProtocolVersion() < 1 { - break - } - - // 根据 vless_v1的讨论,vless_v1 的udp转发的 通信方式 也是与tcp连接类似的分离信道方式 - // 上面已经把 CRUMFURS 的情况过滤掉了,所以现在这里就是普通的udp请求 - // - // 因为direct使用 proxy.RelayUDP_to_Direct 函数 直接实现了fullcone - // 那么我们只需要传入一个 UDP_Extractor 即可 - // 我们通过 netLayer.UniUDP_Extractor 达到此目的 - - //unknownRemoteAddrMsgWriter 在 vless v1中的实现就是 theCRUMFURS (vless v0就是mux) - - id := uc.GetIdentityStr() - - vlessServer := inServer.(*vless.Server) - - theCRUMFURS := vlessServer.Get_CRUMFURS(id) - var unknownRemoteAddrMsgWriter netLayer.UDPResponseWriter - - unknownRemoteAddrMsgWriter = theCRUMFURS - - uniExtractor := netLayer.NewUniUDP_Extractor(*targetAddr.ToUDPAddr(), wlc, unknownRemoteAddrMsgWriter) - - netLayer.RelayUDP_to_Direct(uniExtractor) //阻塞 - - return - } - - case "socks5": - // 此时socks5包已经帮我们dial好了一个udp连接,即wlc,但是还未读取到客户端想要访问的东西 - udpConn := wlc.(*socks5.UDPConn) - - dialFunc := func(targetAddr netLayer.Addr) (io.ReadWriter, error) { - rw, ne := dialClient(incomingInserverConnState{}, targetAddr, client, false, nil, true) - if ne != (utils.NumErr{}) { - return rw, ne - } - return rw, nil - } - - if putter, ok := client.(netLayer.UDP_Putter); ok { - - // 将 outClient 视为 UDP_Putter ,就可以转发udp信息了 - // vless.Client 实现了 UDP_Putter, 新连接的Handshake过程会在 dialFunc 被调用 时发生 - - //UDP_Putter 不使用传统的Handshake过程,因为Handshake是用于第一次数据,然后后面接着的双向传输都不再需要额外信息;而 UDP_Putter 每一次数据传输都是需要传输 目标地址的,所以每一次都需要一些额外数据,这就是我们 UDP_Putter 接口去解决的事情。 - - //因为UDP Associate后,就会保证以后的向 wlc 的 所有请求数据都是udp请求,所以可以在这里直接循环转发了。 - - go udpConn.StartPushResponse(putter) - - udpConn.StartReadRequest(putter, dialFunc) - - } else if pc, ok := client.(netLayer.UDP_Putter_Generator); ok { - - // direct 实现了 UDP_Putter_Generator - - putter := pc.GetNewUDP_Putter() - if putter != nil { - go udpConn.StartPushResponse(putter) - - udpConn.StartReadRequest(putter, dialFunc) - } - } else { - if ce := utils.CanLogErr("socks5 udp err"); ce != nil { - ce.Write( - zap.String("detail", "server -> client for udp, but client didn't implement netLayer.UDP_Putter or UDP_Putter_Generator"), - zap.String("client", client.Name()), - ) - } - } - return - - } - - } - ////////////////////////////// 拨号阶段 ///////////////////////////////////// //log.Println("will dial", client) - dialClient(iics, targetAddr, client, isTlsLazy_clientEnd, wlc, false) + dialClient(iics, targetAddr, client, isTlsLazy_clientEnd, wlc, udp_wlc, false) } // dialClient 对实际client进行拨号,处理传输层, tls层, 高级层等所有层级后,进行代理层握手, @@ -999,7 +911,9 @@ afterLocalServerHandshake: //client为真实要拨号的client,可能会与iics里的defaultClient不同。以client为准。 // wlc为调用者所提供的 此请求的 来源 链接。wlc主要用于 Copy阶段. // noCopy是为了让其它调用者自行处理 转发 时使用。 -func dialClient(iics incomingInserverConnState, targetAddr netLayer.Addr, client proxy.Client, isTlsLazy_clientEnd bool, wlc io.ReadWriteCloser, noCopy bool) (io.ReadWriter, utils.NumErr) { +func dialClient(iics incomingInserverConnState, targetAddr netLayer.Addr, client proxy.Client, isTlsLazy_clientEnd bool, wlc io.ReadWriteCloser, udp_wlc netLayer.MsgConn, noCopy bool) { + + isudp := targetAddr.IsUDP() if iics.shouldCloseInSerBaseConnWhenFinish && !noCopy { if iics.baseLocalConn != nil { @@ -1027,7 +941,7 @@ func dialClient(iics incomingInserverConnState, targetAddr netLayer.Addr, client zap.String("request", targetAddr.String()), zap.String("uniqueTestDomain", uniqueTestDomain), ) - return nil, utils.NumErr{N: 1, Prefix: "dialClient err, "} + return } } @@ -1049,7 +963,7 @@ func dialClient(iics incomingInserverConnState, targetAddr netLayer.Addr, client if ce := utils.CanLogErr("dial client convert addr err"); ce != nil { ce.Write(zap.Error(err)) } - return nil, utils.NumErr{N: 15, Prefix: "dial client convert addr err "} + return } realTargetAddr.Network = client.Network() } @@ -1081,7 +995,7 @@ func dialClient(iics incomingInserverConnState, targetAddr netLayer.Addr, client } else { //dail失败, 直接return掉 - return nil, utils.NumErr{N: 13, Prefix: "dial quic Client err"} + return } } @@ -1106,7 +1020,7 @@ func dialClient(iics incomingInserverConnState, targetAddr netLayer.Addr, client } } - return nil, utils.NumErr{N: 2, Prefix: "dialClient err, "} + return } defer clientConn.Close() @@ -1130,7 +1044,7 @@ func dialClient(iics incomingInserverConnState, targetAddr netLayer.Addr, client } - return nil, utils.NumErr{N: 3, Prefix: "dialClient err, "} + return } else { clientEndRemoteClientTlsRawReadRecorder = tlsLayer.NewRecorder() @@ -1146,7 +1060,7 @@ func dialClient(iics incomingInserverConnState, targetAddr netLayer.Addr, client ce.Write(zap.String("target", targetAddr.String()), zap.Error(err)) } - return nil, utils.NumErr{N: 4, Prefix: "dialClient err, "} + return } clientConn = tlsConn @@ -1174,6 +1088,10 @@ advLayerStep: //第一条连接已满,再开一条session dailedCommonConn = qclient.DialCommonConn(true, dailedCommonConn) + if dailedCommonConn == nil { + //再dial还是nil,也许是暂时性的网络错误, 先关闭 + return + } clientConn, err = qclient.DialSubConn(dailedCommonConn) if err != nil { @@ -1182,7 +1100,7 @@ advLayerStep: zap.Error(err), ) } - return nil, utils.NumErr{N: 14, Prefix: "DialSubConnFunc err, "} + return } } else { if ce := utils.CanLogErr("DialSubConnFunc failed"); ce != nil { @@ -1190,7 +1108,7 @@ advLayerStep: zap.Error(err), ) } - return nil, utils.NumErr{N: 14, Prefix: "DialSubConnFunc err, "} + return } } @@ -1206,7 +1124,7 @@ advLayerStep: iics.baseLocalConn.Close() } - return nil, utils.NumErr{N: 5, Prefix: "dialClient err, "} + return } } @@ -1224,7 +1142,7 @@ advLayerStep: if iics.baseLocalConn != nil { iics.baseLocalConn.Close() } - return nil, utils.NumErr{N: 6, Prefix: "dialClient err, "} + return } case "ws": @@ -1241,7 +1159,7 @@ advLayerStep: if ce := utils.CanLogErr("failed to read ws early data"); ce != nil { ce.Write(zap.Error(e)) } - return nil, utils.NumErr{N: 7, Prefix: "dialClient err, "} + return } ed = edBuf[:n] //log.Println("will send early data", n, ed) @@ -1269,7 +1187,7 @@ advLayerStep: zap.Error(err), ) } - return nil, utils.NumErr{N: 8, Prefix: "dialClient err, "} + return } clientConn = wc @@ -1278,63 +1196,99 @@ advLayerStep: ////////////////////////////// 代理层 握手阶段 ///////////////////////////////////// - wrc, err := client.Handshake(clientConn, targetAddr) - if err != nil { - if ce := utils.CanLogErr("failed in handshake"); ce != nil { - ce.Write( - zap.String("target", targetAddr.String()), - zap.Error(err), - ) + if !isudp { + + wrc, err := client.Handshake(clientConn, targetAddr) + if err != nil { + if ce := utils.CanLogErr("Handshake client failed"); ce != nil { + ce.Write( + zap.String("target", targetAddr.String()), + zap.Error(err), + ) + } + return } - return nil, utils.NumErr{N: 9, Prefix: "dialClient err, "} - } - //log.Println("all handshake finished") + //log.Println("all handshake finished") - ////////////////////////////// 实际转发阶段 ///////////////////////////////////// + ////////////////////////////// 实际转发阶段 ///////////////////////////////////// - if noCopy { - return wrc, utils.NumErr{} - } + if noCopy { + return + } - if !iics.routedToDirect && tls_lazy_encrypt { + if tls_lazy_encrypt && !iics.routedToDirect { - // 我们加了回落之后,就无法确定 “未使用tls的outClient 一定是在服务端” 了 - if isTlsLazy_clientEnd { + // 我们加了回落之后,就无法确定 “未使用tls的outClient 一定是在服务端” 了 + if isTlsLazy_clientEnd { - if client.IsUseTLS() { - //必须是 UserClient - if userClient := client.(proxy.UserClient); userClient != nil { - tryTlsLazyRawCopy(false, userClient, nil, netLayer.Addr{}, wrc, wlc, iics.baseLocalConn, true, clientEndRemoteClientTlsRawReadRecorder) - return nil, utils.NumErr{N: 11, Prefix: "dialClient err, "} + if client.IsUseTLS() { + //必须是 UserClient + if userClient := client.(proxy.UserClient); userClient != nil { + tryTlsLazyRawCopy(false, userClient, nil, netLayer.Addr{}, wrc, wlc, iics.baseLocalConn, true, clientEndRemoteClientTlsRawReadRecorder) + return + } } - } - } else if iics.isTlsLazyServerEnd { + } else if iics.isTlsLazyServerEnd { - // 最新代码已经确认,使用uuid 作为 “特殊指令”,所以要求Server必须是一个 proxy.UserServer - // 否则将无法开启splice功能。这是为了防止0-rtt 探测; + // 最新代码已经确认,使用uuid 作为 “特殊指令”,所以要求Server必须是一个 proxy.UserServer + // 否则将无法开启splice功能。这是为了防止0-rtt 探测; + + if userServer, ok := iics.inServer.(proxy.UserServer); ok { + tryTlsLazyRawCopy(false, nil, userServer, netLayer.Addr{}, wrc, wlc, iics.baseLocalConn, false, iics.inServerTlsRawReadRecorder) + return + } - if userServer, ok := iics.inServer.(proxy.UserServer); ok { - tryTlsLazyRawCopy(false, nil, userServer, netLayer.Addr{}, wrc, wlc, iics.baseLocalConn, false, iics.inServerTlsRawReadRecorder) - return nil, utils.NumErr{N: 12, Prefix: "dialClient err, "} } } + if iics.theFallbackFirstBuffer != nil { + //这里注意,因为是把 tls解密了之后的数据发送到目标地址,所以这种方式只支持转发到本机纯http服务器 + wrc.Write(iics.theFallbackFirstBuffer.Bytes()) + utils.PutBytes(iics.theFallbackFirstBuffer.Bytes()) //这个Buf不是从utils.GetBuf创建的,而是从一个 GetBytes的[]byte 包装 的,所以我们要PutBytes,而不是PutBuf + } + + atomic.AddInt32(&activeConnectionCount, 1) + + downloadBytes := netLayer.Relay(&realTargetAddr, wrc, wlc) + + atomic.AddInt32(&activeConnectionCount, -1) + atomic.AddUint64(&allDownloadBytesSinceStart, uint64(downloadBytes)) + + return + + } else { + udp_wrc, err := client.EstablishUDPChannel(clientConn, targetAddr) + if err != nil { + if ce := utils.CanLogErr("EstablishUDPChannel failed"); ce != nil { + ce.Write( + zap.String("target", targetAddr.String()), + zap.Error(err), + ) + } + return + } + + if noCopy { + return + } + + if iics.theFallbackFirstBuffer != nil { + + udp_wrc.WriteTo(iics.theFallbackFirstBuffer.Bytes(), targetAddr) + utils.PutBytes(iics.theFallbackFirstBuffer.Bytes()) + + } + + atomic.AddInt32(&activeConnectionCount, 1) + + downloadBytes := netLayer.RelayUDP(udp_wrc, udp_wlc) + + atomic.AddInt32(&activeConnectionCount, -1) + atomic.AddUint64(&allDownloadBytesSinceStart, uint64(downloadBytes)) + + return } - if iics.theFallbackFirstBuffer != nil { - //这里注意,因为是把 tls解密了之后的数据发送到目标地址,所以这种方式只支持转发到本机纯http服务器 - wrc.Write(iics.theFallbackFirstBuffer.Bytes()) - utils.PutBytes(iics.theFallbackFirstBuffer.Bytes()) //这个Buf不是从utils.GetBuf创建的,而是从一个 GetBytes的[]byte 包装 的,所以我们要PutBytes,而不是PutBuf - } - - atomic.AddInt32(&activeConnectionCount, 1) - - downloadBytes := netLayer.Relay(&realTargetAddr, wlc, wrc) - - atomic.AddInt32(&activeConnectionCount, -1) - atomic.AddUint64(&allDownloadBytesSinceStart, uint64(downloadBytes)) - - return wrc, utils.NumErr{} } diff --git a/netLayer/netlayer.go b/netLayer/netlayer.go index d169ae3..62129db 100644 --- a/netLayer/netlayer.go +++ b/netLayer/netlayer.go @@ -97,41 +97,7 @@ func IsStrUDP_network(s string) bool { return false } -//使用Addr,是因为有可能申请的是域名,而不是ip -type MsgConn interface { - ReadFrom() ([]byte, Addr, error) - WriteTo([]byte, Addr) error -} - -type UDPMsgConnWrapper struct { - *net.UDPConn - IsClient bool - FirstAddr Addr -} - -func (u *UDPMsgConnWrapper) ReadFrom() ([]byte, Addr, error) { - bs := utils.GetPacket() - n, ad, err := u.UDPConn.ReadFromUDP(bs) - if err != nil { - return nil, Addr{}, err - } - return bs[:n], NewAddrFromUDPAddr(ad), err -} - -func (u *UDPMsgConnWrapper) WriteTo(bs []byte, ad Addr) error { - - if u.IsClient { - if ad.GetHashable() == u.FirstAddr.GetHashable() { - _, err := u.UDPConn.Write(bs) - return err - } else { - - return utils.ErrNotImplemented - } - } else { - _, err := u.UDPConn.WriteTo(bs, ad.ToUDPAddr()) - return err - - } - +type UDPAddrData struct { + Addr net.UDPAddr + Data []byte } diff --git a/netLayer/udp_relay.go b/netLayer/udp_relay.go index 713ff81..5ff40ae 100644 --- a/netLayer/udp_relay.go +++ b/netLayer/udp_relay.go @@ -1,10 +1,11 @@ package netLayer import ( - "io" "net" "sync" "time" + + "github.com/hahahrfool/v2ray_simple/utils" ) const ( @@ -21,382 +22,259 @@ var ( //本文件内含 一些 转发 udp 数据的 接口与方法 -// 阻塞. -func RelayUDP(conn1, conn2 MsgConn) { +//MsgConn一般用于 udp. 是一种类似 net.PacketConn 的包装 +// +//使用Addr,是因为有可能申请的是域名,而不是ip +type MsgConn interface { + ReadFrom() ([]byte, Addr, error) + WriteTo([]byte, Addr) error + CloseConnWithRaddr(raddr Addr) error //关闭特定连接 + Close() error //关闭所有连接 + Fullcone() bool //若Fullcone, 则在转发因另一端关闭而结束后, RelayUDP函数不会Close它. +} + +// 阻塞. 返回从 rc 下载的总字节数. 拷贝完成后自动关闭双端连接. +func RelayUDP(rc, lc MsgConn) int { + + //在转发时, 有可能有多种情况 + /* + 1. dokodemo 监听udp 定向 导向到 direct 的远程udp实际地址 + 此时因为是定向的, 所以肯定不是fullcone + + dokodemo 用的是 UniTargetMsgConn, underlay 是 netLayer.UDPConn, 其已经设置了UDP_timeout + + 在 netLayer.UDPConn 超时后, ReadFrom 就会解放, 并触发双向Close, 来关闭我们的 direct的udp连接。 + + 1.5. 比较少见的情况, dokodemo监听tcp, 然后发送到 direct 的udp. 此时客户应用程序可以手动关闭tcp连接来帮我们触发 udp连接 的 close + + 2. socks5监听 udp, 导向到 direct 的远程udp实际地址 + + socks5端只用一个udp连接来监听所有信息, 所以不能关闭, 所以没有设置超时 + + 此时我们需要对 每一个 direct的udp连接 设置超时, 否则就会一直占用端口 + + 3. socks5 监听udp, 导向到 trojan, 然后 服务端的 trojan 再导向 direct + + trojan 也是用一个信道来接收udp的所有请求的, 所以trojan的连接也不能关. + + 所以依然需要在服务端 的 direct上面 加Read 时限 + + 否则 rc.ReadFrom() 会卡住而不返回. + + 因为direct 使用 UDPMsgConnWrapper,而我们已经在 UDPMsgConnWrapper里加了这个逻辑, 所以可以放心了. + + 4. fullcone, 此时不能对整个监听端口进行close,会影响其它外部链接发来的连接。 + + */ go func() { for { - bs, raddr, err := conn1.ReadFrom() + bs, raddr, err := lc.ReadFrom() if err != nil { - //log.Println("RelayUDP e1", err) break } - err = conn2.WriteTo(bs, raddr) + err = rc.WriteTo(bs, raddr) if err != nil { - //log.Println("RelayUDP e2", err) break } } + if !rc.Fullcone() { + rc.Close() + } + + if !lc.Fullcone() { + lc.Close() + } + }() - for { - bs, raddr, err := conn2.ReadFrom() - if err != nil { - //log.Println("RelayUDP e3", err) - - break - } - err = conn1.WriteTo(bs, raddr) - if err != nil { - //log.Println("RelayUDP e4", err) - - break - } - } -} - -//////////////////// 接口 //////////////////// - -type UDPRequestReader interface { - GetNewUDPRequest() (net.UDPAddr, []byte, error) -} - -type UDPResponseWriter interface { - WriteUDPResponse(net.UDPAddr, []byte) error -} - -// UDP_Extractor, 用于从一个虚拟的协议中提取出 udp请求 -// -// 从一个未知协议中读取UDP请求,然后试图得到该请求的回应(大概率是直接通过direct发出) 并写回 -type UDP_Extractor interface { - UDPRequestReader - UDPResponseWriter -} - -// 写入一个UDP请求; 可以包裹成任意协议。 -// 因为有时该地址从来没申请过,所以此时就要用dialFunc创建一个新连接 -type UDPRequestWriter interface { - WriteUDPRequest(target net.UDPAddr, request []byte, dialFunc func(targetAddr Addr) (io.ReadWriter, error)) error - CloseUDPRequestWriter() //如果read端失败,则一定需要close Write端. CloseUDPRequestWriter就是这个用途. -} - -//拉取一个新的 UDP 响应 -type UDPResponseReader interface { - GetNewUDPResponse() (net.UDPAddr, []byte, error) -} - -// UDP_Putter, 用于把 udp请求转换成 虚拟的协议 -// -// 向一个特定的协议 写入 UDP请求,然后试图读取 该请求的回应. 比如vless.Client就实现了它 -type UDP_Putter interface { - UDPRequestWriter - UDPResponseReader -} - -type UDP_Putter_Generator interface { - GetNewUDP_Putter() UDP_Putter -} - -//////////////////// 具体实现 //////////////////// - -// 最简单的 UDP_Putter,用于客户端; 不处理内部数据,直接认为要 发送给服务端的信息 要发送到一个特定的地址 -// 如果指定的地址不是 默认的地址,则发送到 unknownRemoteAddrMsgWriter -// -// 对于 vless v1来说, unknownRemoteAddrMsgWriter 要做的 就是 新建一个与服务端的 请求udp的连接, -// 然后这个新连接就变成了新的 UniUDP_Putter -type UniUDP_Putter struct { - targetAddr net.UDPAddr - io.ReadWriter - - unknownRemoteAddrMsgWriter UDPRequestWriter -} - -// -func (e *UniUDP_Putter) GetNewUDPResponse() (net.UDPAddr, []byte, error) { - bs := make([]byte, MaxUDP_packetLen) - n, err := e.ReadWriter.Read(bs) - if err != nil { - return e.targetAddr, nil, err - } - return e.targetAddr, bs[:n], nil -} - -func (e *UniUDP_Putter) WriteUDPRequest(addr net.UDPAddr, bs []byte, dialFunc func(targetAddr Addr) (io.ReadWriter, error)) (err error) { - - if addr.String() == e.targetAddr.String() { - _, err = e.ReadWriter.Write(bs) - - return - } else { - if e.unknownRemoteAddrMsgWriter == nil { - return - } - // 普通的 WriteUDPRequest需要调用 dialFunc来拨号新链接,而我们这里 直接就传递给 unknownRemoteAddrMsgWriter 了 - - return e.unknownRemoteAddrMsgWriter.WriteUDPRequest(addr, bs, dialFunc) - } - -} - -// 最简单的 UDP_Extractor,用于服务端; 不处理内部数据,直接认为客户端传来的内部数据的目标为一个特定值。 -// 收到的响应数据的来源 如果和 targetAddr 相同的话,直接写入传入的 ReadWriter -// 收到的外界数据的来源 如果和 targetAddr 不同的话,那肯定就是使用了fullcone,那么要传入 unknownRemoteAddrMsgWriter; 如果New时传入unknownRemoteAddrMsgWriter的 是nil的话,那么意思就是不支持fullcone,将直接舍弃这一部分数据。 -type UniUDP_Extractor struct { - targetAddr net.UDPAddr - io.ReadWriter - - unknownRemoteAddrMsgWriter UDPResponseWriter -} - -// 新建,unknownRemoteAddrMsgWriter 用于写入 未知来源响应,rw 用于普通的客户请求的目标的响应 -func NewUniUDP_Extractor(addr net.UDPAddr, rw io.ReadWriter, unknownRemoteAddrMsgWriter UDPResponseWriter) *UniUDP_Extractor { - return &UniUDP_Extractor{ - targetAddr: addr, - ReadWriter: rw, - unknownRemoteAddrMsgWriter: unknownRemoteAddrMsgWriter, - } -} - -// 从客户端连接中 提取出 它的 UDP请求,就是直接读取数据。然后搭配上之前设置好的地址 -func (e *UniUDP_Extractor) GetNewUDPRequest() (net.UDPAddr, []byte, error) { - bs := make([]byte, MaxUDP_packetLen) - n, err := e.ReadWriter.Read(bs) - if err != nil { - return e.targetAddr, nil, err - } - return e.targetAddr, bs[:n], nil -} - -// WriteUDPResponse 写入远程服务器的响应;要分情况讨论。 -// 因为是单一目标extractor,所以正常情况下 传入的response 的源地址 也 应和 e.targetAddr 相同, -// 如果地址不同的话,那肯定就是使用了fullcone,那么要传入 unknownRemoteAddrMsgWriter -func (e *UniUDP_Extractor) WriteUDPResponse(addr net.UDPAddr, bs []byte) (err error) { - - if addr.String() == e.targetAddr.String() { - _, err = e.ReadWriter.Write(bs) - - return - } else { - //如果未配置 unknownRemoteAddrMsgWriter, 则说明不支持fullcone。这并不是错误,而是可选的。看你想不想要fullcone - if e.unknownRemoteAddrMsgWriter == nil { - return - } - - return e.unknownRemoteAddrMsgWriter.WriteUDPResponse(addr, bs) - } - -} - -type UDPAddrData struct { - Addr net.UDPAddr - Data []byte -} - -//一种简单的本地 UDP_Extractor + UDP_Putter -type UDP_Pipe struct { - requestChan, responseChan chan UDPAddrData - requestChanClosed, responseChanClosed bool -} - -func (u *UDP_Pipe) IsInvalid() bool { - return u.requestChanClosed || u.responseChanClosed -} - -func (u *UDP_Pipe) closeRequestChan() { - if !u.requestChanClosed { - close(u.requestChan) - u.requestChanClosed = true - } -} -func (u *UDP_Pipe) closeResponseChan() { - if !u.responseChanClosed { - close(u.responseChan) - u.responseChanClosed = true - } -} - -func (u *UDP_Pipe) Close() { - u.closeRequestChan() - u.closeResponseChan() - -} - -func NewUDP_Pipe() *UDP_Pipe { - return &UDP_Pipe{ - requestChan: make(chan UDPAddrData, 10), - responseChan: make(chan UDPAddrData, 10), - } -} - -func (u *UDP_Pipe) CloseUDPRequestWriter() { - u.closeRequestChan() -} - -func (u *UDP_Pipe) GetNewUDPRequest() (net.UDPAddr, []byte, error) { - - d, ok := <-u.requestChan - if ok { - return d.Addr, d.Data, nil - - } else { - //如果requestChan被关闭了,就要同时关闭 responseChan - u.closeResponseChan() - return net.UDPAddr{}, nil, io.EOF - } -} - -func (u *UDP_Pipe) GetNewUDPResponse() (net.UDPAddr, []byte, error) { - d, ok := <-u.responseChan - if ok { - return d.Addr, d.Data, nil - - } else { - //如果 responseChan 被关闭了,就要同时关闭 requestChan - u.closeRequestChan() - return net.UDPAddr{}, nil, io.EOF - } - -} - -// 会保存bs的副本,不必担心数据被改变的问题。 -func (u *UDP_Pipe) WriteUDPResponse(addr net.UDPAddr, bs []byte) error { - bsCopy := make([]byte, len(bs)) - copy(bsCopy, bs) - - u.responseChan <- UDPAddrData{ - Addr: addr, - Data: bsCopy, - } - return nil -} - -// 会保存bs的副本,不必担心数据被改变的问题。 -func (u *UDP_Pipe) WriteUDPRequest(addr net.UDPAddr, bs []byte, dialFunc func(targetAddr Addr) (io.ReadWriter, error)) error { - bsCopy := make([]byte, len(bs)) - copy(bsCopy, bs) - - u.requestChan <- UDPAddrData{ - Addr: addr, - Data: bsCopy, - } - return nil -} - -// RelayUDP_to_Direct 用于 从一个未知协议读取 udp请求,然后通过 直接的udp连接 发送到 远程udp 地址。 -// 该函数是阻塞的。而且实现了fullcone; 本函数会直接处理 对外新udp 的dial -// -// RelayUDP_to_Direct 与 RelayTCP 函数 的区别是,已经建立的udpConn是可以向其它目的地址发送信息的 -// 服务端可以向 客户端发送 非客户端发送过数据 的地址 发来的信息 -// 原理是,客户端请求第一次后,就会在服务端开放一个端口,然后其它远程主机就会发现这个端口并试图向客户端发送数据 -// 而由于fullcone,所以如果客户端要请求一个 不同的udp地址的话,如果这个udp地址是之前发送来过信息,那么就要用之前建立过的udp连接,这样才能保证端口一致; -// -func RelayUDP_to_Direct(extractor UDP_Extractor) { - - type connState struct { - conn *net.UDPConn - raddrMap map[string]bool //所有与thisconn关联的 raddr - } - - //具体实现: 每当有对新远程udp地址的请求发生时,就会同时 监听 “用于发送该请求到远程udp主机的本地udp端口”,接受一切发往 该端口的数据 - - var dialedUDPConnMap = make(map[string]*connState) - - var mutex sync.RWMutex + count := 0 for { - - raddr, requestData, err := extractor.GetNewUDPRequest() + bs, raddr, err := rc.ReadFrom() if err != nil { + break } + err = lc.WriteTo(bs, raddr) + if err != nil { - first_raddrStr := raddr.String() + break + } + count += len(bs) + } + if !rc.Fullcone() { + rc.Close() + } - mutex.RLock() - oldConn := dialedUDPConnMap[first_raddrStr] - mutex.RUnlock() + if !lc.Fullcone() { + lc.Close() + } + return count +} - if oldConn != nil { +// symmetric, proxy/dokodemo 有用到. +type UniTargetMsgConn struct { + net.Conn + target Addr +} - oldConn.conn.Write(requestData) +func (u UniTargetMsgConn) Fullcone() bool { + return false +} + +func (u UniTargetMsgConn) ReadFrom() ([]byte, Addr, error) { + bs := utils.GetPacket() + + n, err := u.Conn.Read(bs) + if err != nil { + return nil, Addr{}, err + } + return bs[:n], u.target, err +} + +func (u UniTargetMsgConn) WriteTo(bs []byte, _ Addr) error { + _, err := u.Conn.Write(bs) + return err +} + +func (u UniTargetMsgConn) CloseConnWithRaddr(raddr Addr) error { + return u.Conn.Close() +} + +func (u UniTargetMsgConn) Close() error { + return u.Conn.Close() +} + +//可满足fullcone, 由 Fullcone 的值决定. 在proxy/direct 被用到. +// +type UDPMsgConnWrapper struct { + conn *net.UDPConn + IsServer bool + fullcone bool + + symmetricMap map[HashableAddr]*net.UDPConn + symmetricMapMutex sync.RWMutex +} + +//使用传入的laddr监听udp; 若未给出laddr, 使用一个随机端口监听 +func NewUDPMsgConnClientWrapper(laddr *net.UDPAddr, fullcone bool, isserver bool) *UDPMsgConnWrapper { + uc := new(UDPMsgConnWrapper) + + //if laddr == nil { + // laddr, _ = net.ResolveUDPAddr("udp", ":"+RandPortStr()) + //} + + udpConn, _ := net.ListenUDP("udp", laddr) + + uc.conn = udpConn + uc.fullcone = fullcone + uc.IsServer = isserver + if !fullcone { + uc.symmetricMap = make(map[HashableAddr]*net.UDPConn) + } + return uc +} + +func (u *UDPMsgConnWrapper) Fullcone() bool { + return u.fullcone +} + +func (u *UDPMsgConnWrapper) ReadFrom() ([]byte, Addr, error) { + bs := utils.GetPacket() + + if !u.fullcone { + //如果不是fullcone, 则我们需要限时关闭 + + u.conn.SetReadDeadline(time.Now().Add(UDP_timeout)) + } + + n, ad, err := u.conn.ReadFromUDP(bs) + if err != nil { + return nil, Addr{}, err + } + if !u.fullcone { + //既然读到了, 那么就取消限时 + u.conn.SetReadDeadline(time.Time{}) + } + + return bs[:n], NewAddrFromUDPAddr(ad), nil +} + +func (u *UDPMsgConnWrapper) WriteTo(bs []byte, raddr Addr) error { + + if !u.fullcone && !u.IsServer { + //非fullcone时, 强制 symmetryc, 对每个远程地址 都使用一个 对应的新laddr + + thishash := raddr.GetHashable() + + if len(u.symmetricMap) == 0 { + + _, err := u.conn.WriteTo(bs, raddr.ToUDPAddr()) + if err == nil { + u.symmetricMapMutex.Lock() + u.symmetricMap[thishash] = u.conn + u.symmetricMapMutex.Unlock() + } + return err + } + + u.symmetricMapMutex.RLock() + theConn := u.symmetricMap[thishash] + u.symmetricMapMutex.RUnlock() + + if theConn == nil { + var e error + theConn, e = net.ListenUDP("udp", nil) + if e != nil { + return e + } + + u.symmetricMapMutex.Lock() + u.symmetricMap[thishash] = theConn + u.symmetricMapMutex.Unlock() + } + + _, err := theConn.WriteTo(bs, raddr.ToUDPAddr()) + return err + + } else { + _, err := u.conn.WriteTo(bs, raddr.ToUDPAddr()) + return err + + } +} + +func (u *UDPMsgConnWrapper) CloseConnWithRaddr(raddr Addr) error { + if !u.IsServer { + if u.fullcone { + u.conn.SetReadDeadline(time.Now()) } else { + u.symmetricMapMutex.Lock() - newConn, err := net.DialUDP("udp", nil, &raddr) - if err != nil { + thehash := raddr.GetHashable() + theConn := u.symmetricMap[thehash] + + if theConn != nil { + delete(u.symmetricMap, thehash) + theConn.Close() - break } - _, err = newConn.Write(requestData) - if err != nil { - break - } - - first_cs := &connState{ - conn: newConn, - raddrMap: make(map[string]bool), - } - first_cs.raddrMap[first_raddrStr] = true - - mutex.Lock() - dialedUDPConnMap[first_raddrStr] = first_cs - mutex.Unlock() - - //监听所有发往 newConn的 远程任意主机 发来的消息。 - go func(thisconn *net.UDPConn, supposedRemoteAddr net.UDPAddr) { - bs := make([]byte, MaxUDP_packetLen) - for { - thisconn.SetDeadline(time.Now().Add(UDP_timeout)) - - //log.Println("redirect udp, start read", supposedRemoteAddr) - n, raddr, err := thisconn.ReadFromUDP(bs) - if err != nil { - - //timeout后,就会删掉第一个拨号的raddr,以及因为fullcone而产生的其它raddr - //然后关闭此udp端口 - - mutex.Lock() - - delete(dialedUDPConnMap, first_raddrStr) - - for anotherRaddr := range first_cs.raddrMap { - delete(dialedUDPConnMap, anotherRaddr) - } - mutex.Unlock() - - thisconn.Close() - break - } - - // 这个远程 地址 无论是新的还是旧的, 都是要 和 newConn关联的,下一次向 这个远程地址发消息时,也要用 newConn来发,而不是新dial一个。 - - hasThisRaddr := false - this_raddr_str := raddr.String() - mutex.RLock() - _, hasThisRaddr = dialedUDPConnMap[this_raddr_str] - mutex.RUnlock() - - if !hasThisRaddr { - - mutex.Lock() - dialedUDPConnMap[this_raddr_str] = first_cs - first_cs.raddrMap[this_raddr_str] = true - mutex.Unlock() - } - - //log.Println("redirect udp, will write to extractor", string(bs[:n])) - - err = extractor.WriteUDPResponse(*raddr, bs[:n]) - if err != nil { - break - } - - } - }(newConn, raddr) + u.symmetricMapMutex.Unlock() } - } - + return nil +} + +func (u *UDPMsgConnWrapper) Close() error { + if !u.IsServer && u.fullcone { + //Close一般只用于关闭客户端、非fullcone的情况, 因为只有这种情况下,才会有 一个 u仅与一个 raddr对话 的清醒. + + return u.conn.Close() + } else { + return nil + } } diff --git a/netLayer/udp_test.go b/netLayer/udp_test.go new file mode 100644 index 0000000..31f9ef1 --- /dev/null +++ b/netLayer/udp_test.go @@ -0,0 +1,24 @@ +package netLayer + +import ( + "net" + "testing" + "time" + + "github.com/hahahrfool/v2ray_simple/utils" +) + +func TestUDP(t *testing.T) { + //测试setdeadline的情况. 实测证明 SetReadDeadline 在Read过程中也可以使用, 这样就可以防止阻塞 + + laddr, _ := net.ResolveUDPAddr("udp", ":"+RandPortStr()) + + udpConn, _ := net.ListenUDP("udp", laddr) + + go func() { + time.Sleep(time.Second) + udpConn.SetReadDeadline(time.Now()) + }() + udpConn.ReadFrom(utils.GetPacket()) + t.Log("ok") +} diff --git a/proxy/config.go b/proxy/config.go index 2394b61..addbf0c 100644 --- a/proxy/config.go +++ b/proxy/config.go @@ -84,5 +84,6 @@ type ListenConf struct { // CommonConf.Host , CommonConf.IP, CommonConf.Port 为拨号地址与端口 type DialConf struct { CommonConf - Utls bool `toml:"utls"` + Utls bool `toml:"utls"` + Fullcone bool `toml:"fullcone"` //在direct会用到, fullcone的话因为不能关闭udp连接, 所以可能会导致too many open files. fullcone 的话一般人是用不到的, 所以 有需要的人自行手动打开 即可 } diff --git a/proxy/direct/client.go b/proxy/direct/client.go index 0440a1b..cd6217f 100644 --- a/proxy/direct/client.go +++ b/proxy/direct/client.go @@ -19,21 +19,20 @@ func init() { //实现了 proxy.Client, netLayer.UDP_Putter_Generator type Client struct { proxy.ProxyCommonStruct + isfullcone bool } type ClientCreator struct{} -func NewClient() (proxy.Client, error) { +func (_ ClientCreator) NewClientFromURL(*url.URL) (proxy.Client, error) { d := &Client{} return d, nil } -func (_ ClientCreator) NewClientFromURL(*url.URL) (proxy.Client, error) { - return NewClient() -} - -func (_ ClientCreator) NewClient(*proxy.DialConf) (proxy.Client, error) { - return NewClient() +func (_ ClientCreator) NewClient(dc *proxy.DialConf) (proxy.Client, error) { + d := &Client{} + d.isfullcone = dc.Fullcone + return d, nil } func (d *Client) Name() string { return name } @@ -49,8 +48,8 @@ func (d *Client) Handshake(underlay net.Conn, target netLayer.Addr) (io.ReadWrit } -//direct的Client的 EstablishUDPChannel 实际上就是直接拨号udp +//direct的Client的 EstablishUDPChannel 实际上就是直接 监听一个udp端口。 func (d *Client) EstablishUDPChannel(_ net.Conn, target netLayer.Addr) (netLayer.MsgConn, error) { - conn, err := net.DialUDP("udp", nil, target.ToUDPAddr()) - return &netLayer.UDPMsgConnWrapper{UDPConn: conn, IsClient: true, FirstAddr: target}, err + + return netLayer.NewUDPMsgConnClientWrapper(nil, d.isfullcone, false), nil } diff --git a/proxy/dokodemo/server.go b/proxy/dokodemo/server.go index 497d643..31cced1 100644 --- a/proxy/dokodemo/server.go +++ b/proxy/dokodemo/server.go @@ -82,7 +82,11 @@ func NewServer() (proxy.Server, error) { } func (d *Server) Name() string { return name } -//因为dokodemo的单目标性质, 不会建立任何udp通道. func (s *Server) Handshake(underlay net.Conn) (io.ReadWriteCloser, netLayer.MsgConn, netLayer.Addr, error) { - return underlay, nil, s.targetAddr, nil + if s.targetAddr.IsUDP() { + return nil, netLayer.UniTargetMsgConn{Conn: underlay}, s.targetAddr, nil + } else { + return underlay, nil, s.targetAddr, nil + + } } diff --git a/proxy/socks5/server.go b/proxy/socks5/server.go index 9c01bb0..04572a8 100644 --- a/proxy/socks5/server.go +++ b/proxy/socks5/server.go @@ -217,6 +217,14 @@ type UDPConn struct { clientSupposedAddr *net.UDPAddr //客户端指定的客户端自己未来将使用的公网UDP的Addr } +func (u *UDPConn) CloseConnWithRaddr(raddr netLayer.Addr) error { + return u.Close() +} + +func (u *UDPConn) Fullcone() bool { + return true +} + //将远程地址发来的响应 传给客户端 func (u *UDPConn) WriteTo(bs []byte, raddr netLayer.Addr) error { diff --git a/proxy/socks5/udp_test.go b/proxy/socks5/udp_test.go index 939b433..752f5f4 100644 --- a/proxy/socks5/udp_test.go +++ b/proxy/socks5/udp_test.go @@ -27,7 +27,7 @@ func TestUDP(t *testing.T) { t.FailNow() } - direct, _ := direct.NewClient() + direct := &direct.Client{} go func() { for { diff --git a/proxy/trojan/udpConn.go b/proxy/trojan/udpConn.go index 787c703..9d69a9d 100644 --- a/proxy/trojan/udpConn.go +++ b/proxy/trojan/udpConn.go @@ -10,6 +10,12 @@ type UDPConn struct { net.Conn } +func (u UDPConn) Fullcone() bool { + return true +} +func (u UDPConn) CloseConnWithRaddr(raddr netLayer.Addr) error { + return u.Close() +} func (u UDPConn) ReadFrom() ([]byte, netLayer.Addr, error) { return nil, netLayer.Addr{}, nil diff --git a/proxy/vless/udpConn.go b/proxy/vless/udpConn.go index 6647de7..76cb8f1 100644 --- a/proxy/vless/udpConn.go +++ b/proxy/vless/udpConn.go @@ -24,6 +24,13 @@ type UDPConn struct { raddr netLayer.Addr } +func (u *UDPConn) CloseConnWithRaddr(raddr netLayer.Addr) error { + return u.Close() +} +func (u *UDPConn) Fullcone() bool { + return u.version != 0 +} + func (u *UDPConn) WriteTo(p []byte, raddr netLayer.Addr) error { //v0很垃圾,不支持fullcone,无视raddr,始终向最开始的raddr发送。 diff --git a/proxy/vless/vless_test.go b/proxy/vless/vless_test.go index 259e168..36e41ad 100644 --- a/proxy/vless/vless_test.go +++ b/proxy/vless/vless_test.go @@ -257,27 +257,22 @@ func testVLessUDP(version int, port string, t *testing.T) { t.Log("vless got wlc with right hello data") - rc, err := net.Dial("udp", remoteAddrStr) - if err != nil { - t.Logf("failed to connect FakeUDPServer : %v", err) - t.Fail() - return - } - t.Log("vless server dialed remote udp server", remoteAddrStr) na, _ := netLayer.NewAddr(remoteAddrStr) na.Network = "udp" - wrc := &netLayer.UDPMsgConnWrapper{UDPConn: rc.(*net.UDPConn), IsClient: true, FirstAddr: na} + wrc := netLayer.NewUDPMsgConnClientWrapper(nil, false, false) - _, err = rc.Write(bs) + err = wrc.WriteTo(bs, na) if err != nil { t.Logf("failed to write to FakeUDPServer : %v", err) t.Fail() return } - _, err = io.ReadFull(rc, bs) + + bs, _, err = wrc.ReadFrom() + if err != nil { t.Logf("failed io.ReadFull(rc, hello[:]) : %v", err) t.Fail() @@ -293,8 +288,7 @@ func testVLessUDP(version int, port string, t *testing.T) { // 之后转发所有流量,不再特定限制数据 netLayer.RelayUDP(wlc, wrc) - - t.Log("Copy End?!", err) + //t.Log("Copy End?!", ) }() } }() diff --git a/quic/quic.go b/quic/quic.go index 01e1f87..22eaff9 100644 --- a/quic/quic.go +++ b/quic/quic.go @@ -5,6 +5,7 @@ import ( "context" "crypto/tls" "net" + "reflect" "sync" "sync/atomic" "time" @@ -267,7 +268,13 @@ func (c *Client) DialCommonConn(openBecausePreviousFull bool, previous any) any } } else if previous != nil && c.knownServerMaxStreamCount == 0 { - ps := previous.(*sessionState) + ps, ok := previous.(*sessionState) + if !ok { + if ce := utils.CanLogDebug("QUIC: 'previous' parameter was given but with wrong type "); ce != nil { + ce.Write(zap.String("type", reflect.TypeOf(previous).String())) + } + return nil + } c.knownServerMaxStreamCount = ps.openedStreamCount @@ -312,7 +319,10 @@ func (c *Client) DialCommonConn(openBecausePreviousFull bool, previous any) any } func (c *Client) DialSubConn(thing any) (net.Conn, error) { - theState := thing.(*sessionState) + theState, ok := thing.(*sessionState) + if !ok { + return nil, utils.ErrNilOrWrongParameter + } stream, err := theState.OpenStream() if err != nil { diff --git a/utils/error.go b/utils/error.go index c4c888a..2d7f900 100644 --- a/utils/error.go +++ b/utils/error.go @@ -8,6 +8,9 @@ import ( ) var ErrNotImplemented = errors.New("not implemented") +var ErrNilParameter = errors.New("nil parameter") +var ErrNilOrWrongParameter = errors.New("nil or wrong parameter") +var ErrWrongParameter = errors.New("wrong parameter") //没啥特殊的 type NumErr struct { diff --git a/utils/utils.go b/utils/utils.go index 43b2bf2..0465d44 100644 --- a/utils/utils.go +++ b/utils/utils.go @@ -4,11 +4,19 @@ package utils import ( "errors" "flag" + "math/rand" "strings" + "time" "github.com/BurntSushi/toml" ) +func init() { + //保证我们随机种子每次都不一样, 这样可以保证go test中使用随机端口时, 不同的test会使用不同的端口, 防止端口冲突 + // 因为我们所有的包应该都引用了 utils包, 所以可以保证这一点. + rand.Seed(time.Now().Unix()) +} + func IsFlagPassed(name string) bool { found := false flag.Visit(func(f *flag.Flag) {