修订代码;添加MsgProducer和MsgConsumer接口

This commit is contained in:
e1732a364fed
2000-01-01 00:00:00 +00:00
parent 7a81617eed
commit d7aaeb481c
9 changed files with 509 additions and 309 deletions

17
main.go
View File

@@ -71,6 +71,9 @@ Use cases: refer to tcp_test.go, udp_test.go or cmd/verysimple.
non-blocking. closer used to stop listening. It means listening failed if closer == nil,
*/
func ListenSer(inServer proxy.Server, defaultOutClient proxy.Client, env *proxy.RoutingEnv) (closer io.Closer) {
var extraCloser io.Closer
//tproxy 和 shadowsocks 都用到了 SelfListen
if is, tcp, udp := inServer.SelfListen(); is {
var chantcp chan proxy.IncomeTCPInfo
var chanudp chan proxy.IncomeUDPInfo
@@ -106,7 +109,13 @@ func ListenSer(inServer proxy.Server, defaultOutClient proxy.Client, env *proxy.
}
closer = inServer.(proxy.ListenerServer).StartListen(chantcp, chanudp)
return
if tcp && udp {
return
} else {
extraCloser = closer
}
}
var handleHere bool
@@ -191,6 +200,9 @@ func ListenSer(inServer proxy.Server, defaultOutClient proxy.Client, env *proxy.
)
}
if extraCloser != nil {
closer = &utils.MultiCloser{Closers: []io.Closer{closer, extraCloser}}
}
} else {
if err != nil {
if ce := utils.CanLogErr("ListenSer failed"); ce != nil {
@@ -201,6 +213,9 @@ func ListenSer(inServer proxy.Server, defaultOutClient proxy.Client, env *proxy.
}
}
if extraCloser != nil {
closer = extraCloser
}
}
return
}

View File

@@ -488,6 +488,25 @@ func (a *Addr) IsUDP() bool {
return IsStrUDP_network(a.Network)
}
func (a *Addr) ToAddr() net.Addr {
n := a.Network
if n == "" {
n = "tcp"
}
switch StrToTransportProtocol(a.Network) {
case TCP:
return a.ToTCPAddr()
case UDP:
return a.ToUDPAddr()
case UNIX:
u, _ := net.ResolveUnixAddr("unix", a.Name)
return u
case IP:
return &net.IPAddr{IP: a.IP, Zone: a.Name}
}
return nil
}
// 如果a里只含有域名则会自动解析域名为IP。注意若出现错误则会返回nil
func (a *Addr) ToUDPAddr() *net.UDPAddr {
ua, err := net.ResolveUDPAddr("udp", a.String())

316
netLayer/msgconn.go Normal file
View File

@@ -0,0 +1,316 @@
package netLayer
import (
"errors"
"net"
"sync"
"time"
"github.com/e1732a364fed/v2ray_simple/utils"
"go.uber.org/zap"
)
// MsgConn一般用于 udp. 是一种类似 net.PacketConn 的包装.
// 实现 MsgConn接口 的类型 可以被用于 RelayUDP 进行转发。
//
// ReadMsgFrom直接返回数据, 这样可以尽量避免多次数据拷贝。
//
// 使用Addr是因为有可能请求地址是个域名而不是ip; 而且通过Addr, MsgConn实际上有可能支持通用的情况,
// 即可以用于 客户端 一会 请求tcp一会又请求udp一会又请求什么其它网络层 这种奇葩情况.
type MsgConn interface {
NetDeadliner
ReadMsgFrom() ([]byte, Addr, error)
WriteMsgTo([]byte, Addr) error
CloseConnWithRaddr(raddr Addr) error //关闭特定连接
Close() error //关闭所有连接
Fullcone() bool //若Fullcone, 则在转发因另一端关闭而结束后, RelayUDP函数不会Close它.
}
// symmetric, proxy/dokodemo 有用到. 实现 MsgConn
type UniTargetMsgConn struct {
net.Conn
Target Addr
}
func (u UniTargetMsgConn) Fullcone() bool {
return false
}
func (u UniTargetMsgConn) ReadMsgFrom() ([]byte, Addr, error) {
bs := utils.GetPacket()
n, err := u.Conn.Read(bs)
if err != nil {
return nil, Addr{}, err
}
return bs[:n], u.Target, err
}
func (u UniTargetMsgConn) WriteMsgTo(bs []byte, _ Addr) error {
_, err := u.Conn.Write(bs)
return err
}
func (u UniTargetMsgConn) CloseConnWithRaddr(raddr Addr) error {
return u.Conn.Close()
}
func (u UniTargetMsgConn) Close() error {
return u.Conn.Close()
}
// UDPMsgConn 实现 MsgConn 和 net.PacketConn。 可满足fullcone/symmetric. 在proxy/direct 被用到.
type UDPMsgConn struct {
*net.UDPConn
IsServer, fullcone, closed bool
symmetricMap map[HashableAddr]*net.UDPConn
symmetricMapMutex sync.RWMutex
symmetricMsgReadChan chan AddrData
}
// NewUDPMsgConn 创建一个 UDPMsgConn 并使用传入的 laddr 监听udp; 若未给出laddr, 将使用一个随机可用的端口监听.
// 如果是普通的单目标的客户端,用 (nil,false,false) 即可.
//
// 满足fullcone/symmetric, 由 fullcone 的值决定.
func NewUDPMsgConn(laddr *net.UDPAddr, fullcone bool, isserver bool, sockopt *Sockopt) (*UDPMsgConn, error) {
uc := new(UDPMsgConn)
var udpConn *net.UDPConn
var err error
if sockopt != nil {
if laddr == nil {
laddr = &net.UDPAddr{}
}
a := NewAddrFromUDPAddr(laddr)
pConn, e := a.ListenUDP_withOpt(sockopt)
if e != nil {
err = e
} else {
udpConn = pConn.(*net.UDPConn)
}
} else {
udpConn, err = net.ListenUDP("udp", laddr)
}
if err != nil {
return nil, err
}
udpConn.SetReadBuffer(MaxUDP_packetLen)
udpConn.SetWriteBuffer(MaxUDP_packetLen)
uc.UDPConn = udpConn
uc.fullcone = fullcone
uc.IsServer = isserver
if !fullcone {
uc.symmetricMap = make(map[HashableAddr]*net.UDPConn)
uc.symmetricMsgReadChan = make(chan AddrData, 50) //缓存大一点比较好. 假设有十个udp连接, 每一个都连续读了5个信息这样就会装满50个缓存了。
//我们暂时不把udpConn放入 symmetricMap 中而是等待第一次Write成功后再放入.
}
return uc, nil
}
func (u *UDPMsgConn) Fullcone() bool {
return u.fullcone
}
func (u *UDPMsgConn) readSymmetricMsgFromConn(conn *net.UDPConn, thishash HashableAddr) {
if ce := utils.CanLogDebug("readSymmetricMsgFromConn called"); ce != nil {
ce.Write(zap.String("addr", thishash.String()))
}
for {
bs := utils.GetPacket()
conn.SetReadDeadline(time.Now().Add(UDP_timeout))
n, ad, err := conn.ReadFromUDP(bs)
if err != nil || u.closed {
break
}
u.symmetricMsgReadChan <- AddrData{Data: bs[:n], Addr: NewAddrFromUDPAddr(ad)}
}
u.symmetricMapMutex.Lock()
delete(u.symmetricMap, thishash)
u.symmetricMapMutex.Unlock()
conn.Close()
}
func (u *UDPMsgConn) ReadMsgFrom() ([]byte, Addr, error) {
if u.fullcone {
bs := utils.GetPacket()
u.UDPConn.SetReadDeadline(time.Now().Add(UDP_fullcone_timeout))
n, ad, err := u.UDPConn.ReadFromUDP(bs)
if err != nil {
return nil, Addr{}, err
}
return bs[:n], NewAddrFromUDPAddr(ad), nil
} else {
ad, ok := <-u.symmetricMsgReadChan
if ok {
ad.Addr.Network = "udp"
return ad.Data, ad.Addr, nil
} else {
return nil, Addr{}, net.ErrClosed
}
}
}
func (u *UDPMsgConn) WriteMsgTo(bs []byte, raddr Addr) error {
var theConn *net.UDPConn
if !u.fullcone && !u.IsServer {
//非fullcone时, 强制 symmetric, 对每个远程地址 都使用一个 对应的新laddr
//UDPMsgConn 一般用于 direct此时 一定有 !u.IsServer 成立
thishash := raddr.GetHashable()
thishash.Network = "udp" //有可能调用者忘配置Network项.
if len(u.symmetricMap) == 0 {
_, err := u.UDPConn.WriteTo(bs, raddr.ToUDPAddr())
if err == nil {
u.symmetricMapMutex.Lock()
u.symmetricMap[thishash] = u.UDPConn
u.symmetricMapMutex.Unlock()
}
go u.readSymmetricMsgFromConn(u.UDPConn, thishash)
return err
}
u.symmetricMapMutex.RLock()
theConn = u.symmetricMap[thishash]
u.symmetricMapMutex.RUnlock()
if theConn == nil {
var e error
theConn, e = net.ListenUDP("udp", nil)
if e != nil {
return e
}
u.symmetricMapMutex.Lock()
u.symmetricMap[thishash] = theConn
u.symmetricMapMutex.Unlock()
go u.readSymmetricMsgFromConn(theConn, thishash)
}
} else {
theConn = u.UDPConn
}
_, err := theConn.WriteTo(bs, raddr.ToUDPAddr())
return err
}
func (u *UDPMsgConn) CloseConnWithRaddr(raddr Addr) error {
if !u.IsServer {
if u.fullcone {
//u.UDPConn.SetReadDeadline(time.Now())
} else {
u.symmetricMapMutex.Lock()
thehash := raddr.GetHashable()
theConn := u.symmetricMap[thehash]
if theConn != nil {
delete(u.symmetricMap, thehash)
theConn.Close()
}
u.symmetricMapMutex.Unlock()
}
}
return nil
}
func (u *UDPMsgConn) Close() error {
if !u.closed {
u.closed = true
if !u.fullcone {
close(u.symmetricMsgReadChan)
}
return u.UDPConn.Close()
}
return nil
}
// 实现 net.PacketConn
func (uc *UDPMsgConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) {
var bs []byte
var a Addr
bs, a, err = uc.ReadMsgFrom()
if err == nil {
n = copy(p, bs)
addr = a.ToUDPAddr()
}
return
}
// 实现 net.PacketConn
func (uc *UDPMsgConn) WriteTo(p []byte, raddr net.Addr) (n int, err error) {
if ua, ok := raddr.(*net.UDPAddr); ok {
err = uc.WriteMsgTo(p, NewAddrFromUDPAddr(ua))
if err == nil {
n = len(p)
}
} else {
err = errors.New("UDPMsgConn.WriteTo, raddr can't cast to *net.UDPAddr")
}
return
}
// Wraps net.PacketConn and implements MsgConn
type MsgConnForPacketConn struct {
net.PacketConn
}
func (mc *MsgConnForPacketConn) ReadMsgFrom() ([]byte, Addr, error) {
bs := utils.GetPacket()
n, addr, err := mc.ReadFrom(bs)
if err != nil {
return nil, Addr{}, err
}
a, err := NewAddrFromAny(addr)
if err != nil {
return nil, Addr{}, err
}
return bs[:n], a, nil
}
func (mc *MsgConnForPacketConn) WriteMsgTo(p []byte, a Addr) error {
_, err := mc.WriteTo(p, a.ToAddr())
return err
}
func (mc *MsgConnForPacketConn) CloseConnWithRaddr(raddr Addr) error {
return mc.PacketConn.Close()
}
func (mc *MsgConnForPacketConn) Close() error {
return mc.PacketConn.Close()
}
func (mc *MsgConnForPacketConn) Fullcone() bool {
return true
}

View File

@@ -1,8 +1,6 @@
package netLayer
import (
"errors"
"net"
"sync"
"sync/atomic"
"time"
@@ -11,6 +9,8 @@ import (
"go.uber.org/zap"
)
//本文件内含 转发 udp 数据的方法
const (
MaxUDP_packetLen = 64 * 1024 // 关于 udp包数据长度可参考 https://cloud.tencent.com/developer/article/1021196
@@ -32,25 +32,6 @@ var (
UDP_fullcone_timeout = time.Minute * 30
)
//本文件内含 一些 转发 udp 数据的 接口与方法
// MsgConn一般用于 udp. 是一种类似 net.PacketConn 的包装.
// 实现 MsgConn接口 的类型 可以被用于 RelayUDP 进行转发。
//
// ReadMsgFrom直接返回数据, 这样可以尽量避免多次数据拷贝。
//
// 使用Addr是因为有可能请求地址是个域名而不是ip; 而且通过Addr, MsgConn实际上有可能支持通用的情况,
// 即可以用于 客户端 一会 请求tcp一会又请求udp一会又请求什么其它网络层 这种奇葩情况.
type MsgConn interface {
NetDeadliner
ReadMsgFrom() ([]byte, Addr, error)
WriteMsgTo([]byte, Addr) error
CloseConnWithRaddr(raddr Addr) error //关闭特定连接
Close() error //关闭所有连接
Fullcone() bool //若Fullcone, 则在转发因另一端关闭而结束后, RelayUDP函数不会Close它.
}
/*
udp是无连接的所以需要考虑超时问题。
在转发udp时, 有可能有多种情况:
@@ -316,257 +297,35 @@ func RelayUDP_separate(rc, lc MsgConn, firstAddr *Addr, downloadByteCount, uploa
return count2
}
// symmetric, proxy/dokodemo 有用到. 实现 MsgConn
type UniTargetMsgConn struct {
net.Conn
Target Addr
}
/*
relay udp 有两种针对不同通道的技术
func (u UniTargetMsgConn) Fullcone() bool {
return false
}
一种是针对单来源通道的技术通常是udp in tcp的情况此时我们用MsgConn + RelayUDP 方法很方便
func (u UniTargetMsgConn) ReadMsgFrom() ([]byte, Addr, error) {
bs := utils.GetPacket()
另一种是直接读udp的技术目前我们用很多代码来适配它也能包装MsgConn里但是非常不美观繁琐。
因为udp是多来源的我们为了确定单一来源就要全局读然后定义一个map, 为每一个来源的地址存储一个MsgConn
n, err := u.Conn.Read(bs)
if err != nil {
return nil, Addr{}, err
}
return bs[:n], u.Target, err
}
我们重新定义一个MsgProducer 和 MsgConsumer就方便很多. 这是针对多来源的转发。
func (u UniTargetMsgConn) WriteMsgTo(bs []byte, _ Addr) error {
_, err := u.Conn.Write(bs)
return err
}
func (u UniTargetMsgConn) CloseConnWithRaddr(raddr Addr) error {
return u.Conn.Close()
}
func (u UniTargetMsgConn) Close() error {
return u.Conn.Close()
}
// UDPMsgConn 实现 MsgConn 和 net.PacketConn。 可满足fullcone/symmetric. 在proxy/direct 被用到.
type UDPMsgConn struct {
*net.UDPConn
IsServer, fullcone, closed bool
symmetricMap map[HashableAddr]*net.UDPConn
symmetricMapMutex sync.RWMutex
symmetricMsgReadChan chan AddrData
}
// NewUDPMsgConn 创建一个 UDPMsgConn 并使用传入的 laddr 监听udp; 若未给出laddr, 将使用一个随机可用的端口监听.
// 如果是普通的单目标的客户端,用 (nil,false,false) 即可.
//
// 满足fullcone/symmetric, 由 fullcone 的值决定.
func NewUDPMsgConn(laddr *net.UDPAddr, fullcone bool, isserver bool, sockopt *Sockopt) (*UDPMsgConn, error) {
uc := new(UDPMsgConn)
var udpConn *net.UDPConn
var err error
if sockopt != nil {
if laddr == nil {
laddr = &net.UDPAddr{}
}
a := NewAddrFromUDPAddr(laddr)
pConn, e := a.ListenUDP_withOpt(sockopt)
if e != nil {
err = e
} else {
udpConn = pConn.(*net.UDPConn)
}
} else {
udpConn, err = net.ListenUDP("udp", laddr)
}
if err != nil {
return nil, err
}
udpConn.SetReadBuffer(MaxUDP_packetLen)
udpConn.SetWriteBuffer(MaxUDP_packetLen)
uc.UDPConn = udpConn
uc.fullcone = fullcone
uc.IsServer = isserver
if !fullcone {
uc.symmetricMap = make(map[HashableAddr]*net.UDPConn)
uc.symmetricMsgReadChan = make(chan AddrData, 50) //缓存大一点比较好. 假设有十个udp连接, 每一个都连续读了5个信息这样就会装满50个缓存了。
//我们暂时不把udpConn放入 symmetricMap 中而是等待第一次Write成功后再放入.
}
return uc, nil
}
func (u *UDPMsgConn) Fullcone() bool {
return u.fullcone
}
func (u *UDPMsgConn) readSymmetricMsgFromConn(conn *net.UDPConn, thishash HashableAddr) {
if ce := utils.CanLogDebug("readSymmetricMsgFromConn called"); ce != nil {
ce.Write(zap.String("addr", thishash.String()))
}
如此,一个 UDPConn就相当于一个 MsgProducer, 它的to 可以由 tproxy 或者 msg内部的数据提取出来
*/
func RelayMsgP2C(p MsgProducer, c MsgConsumer) {
for {
bs := utils.GetPacket()
conn.SetReadDeadline(time.Now().Add(UDP_timeout))
n, ad, err := conn.ReadFromUDP(bs)
if err != nil || u.closed {
break
}
u.symmetricMsgReadChan <- AddrData{Data: bs[:n], Addr: NewAddrFromUDPAddr(ad)}
}
u.symmetricMapMutex.Lock()
delete(u.symmetricMap, thishash)
u.symmetricMapMutex.Unlock()
conn.Close()
}
func (u *UDPMsgConn) ReadMsgFrom() ([]byte, Addr, error) {
if u.fullcone {
bs := utils.GetPacket()
u.UDPConn.SetReadDeadline(time.Now().Add(UDP_fullcone_timeout))
n, ad, err := u.UDPConn.ReadFromUDP(bs)
msg, from, to, err := p.ProduceMsg()
if err != nil {
return nil, Addr{}, err
return
}
return bs[:n], NewAddrFromUDPAddr(ad), nil
} else {
ad, ok := <-u.symmetricMsgReadChan
if ok {
ad.Addr.Network = "udp"
return ad.Data, ad.Addr, nil
} else {
return nil, Addr{}, net.ErrClosed
err = c.ConsumeMsg(msg, from, to)
if err != nil {
return
}
}
}
func (u *UDPMsgConn) WriteMsgTo(bs []byte, raddr Addr) error {
var theConn *net.UDPConn
if !u.fullcone && !u.IsServer {
//非fullcone时, 强制 symmetric, 对每个远程地址 都使用一个 对应的新laddr
//UDPMsgConn 一般用于 direct此时 一定有 !u.IsServer 成立
thishash := raddr.GetHashable()
thishash.Network = "udp" //有可能调用者忘配置Network项.
if len(u.symmetricMap) == 0 {
_, err := u.UDPConn.WriteTo(bs, raddr.ToUDPAddr())
if err == nil {
u.symmetricMapMutex.Lock()
u.symmetricMap[thishash] = u.UDPConn
u.symmetricMapMutex.Unlock()
}
go u.readSymmetricMsgFromConn(u.UDPConn, thishash)
return err
}
u.symmetricMapMutex.RLock()
theConn = u.symmetricMap[thishash]
u.symmetricMapMutex.RUnlock()
if theConn == nil {
var e error
theConn, e = net.ListenUDP("udp", nil)
if e != nil {
return e
}
u.symmetricMapMutex.Lock()
u.symmetricMap[thishash] = theConn
u.symmetricMapMutex.Unlock()
go u.readSymmetricMsgFromConn(theConn, thishash)
}
} else {
theConn = u.UDPConn
}
_, err := theConn.WriteTo(bs, raddr.ToUDPAddr())
return err
type MsgProducer interface {
ProduceMsg() (msg []byte, from, to Addr, err error)
}
func (u *UDPMsgConn) CloseConnWithRaddr(raddr Addr) error {
if !u.IsServer {
if u.fullcone {
//u.UDPConn.SetReadDeadline(time.Now())
} else {
u.symmetricMapMutex.Lock()
thehash := raddr.GetHashable()
theConn := u.symmetricMap[thehash]
if theConn != nil {
delete(u.symmetricMap, thehash)
theConn.Close()
}
u.symmetricMapMutex.Unlock()
}
}
return nil
}
func (u *UDPMsgConn) Close() error {
if !u.closed {
u.closed = true
if !u.fullcone {
close(u.symmetricMsgReadChan)
}
return u.UDPConn.Close()
}
return nil
}
// 实现 net.PacketConn
func (uc *UDPMsgConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) {
var bs []byte
var a Addr
bs, a, err = uc.ReadMsgFrom()
if err == nil {
n = copy(p, bs)
addr = a.ToUDPAddr()
}
return
}
// 实现 net.PacketConn
func (uc *UDPMsgConn) WriteTo(p []byte, raddr net.Addr) (n int, err error) {
if ua, ok := raddr.(*net.UDPAddr); ok {
err = uc.WriteMsgTo(p, NewAddrFromUDPAddr(ua))
if err == nil {
n = len(p)
}
} else {
err = errors.New("UDPMsgConn.WriteTo, raddr can't cast to *net.UDPAddr")
}
return
type MsgConsumer interface {
ConsumeMsg(msg []byte, from, to Addr) (err error)
}

View File

@@ -109,7 +109,6 @@ func (c *Client) EstablishUDPChannel(underlay net.Conn, firstPayload []byte, tar
mc = &shadowUDPPacketConn{
PacketConn: pc,
raddr: underlay.RemoteAddr(),
taddr: target.ToUDPAddr(),
}
if firstPayload != nil {

View File

@@ -5,6 +5,7 @@ import (
"io"
"net"
"net/url"
"sync"
"github.com/e1732a364fed/v2ray_simple/netLayer"
"github.com/e1732a364fed/v2ray_simple/proxy"
@@ -54,44 +55,15 @@ func (ServerCreator) URLToListenConf(u *url.URL, lc *proxy.ListenConf, format in
}
// loop read udp
// func (ServerCreator) AfterCommonConfServer(s proxy.Server) {
// if s.Network() == "udp" || s.Network() == netLayer.DualNetworkName {
// ss := s.(*Server)
// uc, err := net.ListenUDP("udp", ss.LUA)
// if err != nil {
// log.Panicln("shadowsocks listen udp failed", err)
// }
// pc := ss.cipher.PacketConn(uc)
// for {
// buf := utils.GetPacket()
// defer utils.PutPacket(buf)
// n, saddr, err := pc.ReadFrom(buf)
// if err != nil {
// if ce := utils.CanLogErr("shadowsocks read udp failed"); ce != nil {
// ce.Write(zap.Error(err))
// }
// return
// }
// r := bytes.NewBuffer(buf[:n])
// taddr, err := GetAddrFrom(r)
// if err != nil {
// if ce := utils.CanLogErr("shadowsocks GetAddrFrom failed"); ce != nil {
// ce.Write(zap.Error(err))
// }
// return
// }
// }
// }
// }
type Server struct {
proxy.Base
*utils.MultiUserMap
cipher core.Cipher
m sync.RWMutex
udpMsgConnMap map[netLayer.HashableAddr]*shadowUDPPacketConn
}
func newServer(info MethodPass, lc *proxy.ListenConf) *Server {
@@ -105,6 +77,12 @@ func (*Server) Name() string {
return Name
}
func (s *Server) SelfListen() (is, tcp, udp bool) {
udp = true
is = true
return
}
func (s *Server) Handshake(underlay net.Conn) (result net.Conn, msgConn netLayer.MsgConn, targetAddr netLayer.Addr, returnErr error) {
result = s.cipher.StreamConn(underlay)
readbs := utils.GetBytes(utils.MTU)
@@ -146,3 +124,65 @@ realPart:
return
}
func (ss *Server) StartListen(_ chan<- proxy.IncomeTCPInfo, udpInfoChan chan<- proxy.IncomeUDPInfo) io.Closer {
// uc, err := net.ListenUDP("udp", ss.LUA)
// if err != nil {
// log.Panicln("shadowsocks listen udp failed", err)
// }
// pc := ss.cipher.PacketConn(uc)
// sp := &shadowUDPPacketConn{
// PacketConn: pc,
// }
go func() {
// for {
// bs := utils.GetPacket()
// n, addr, err := pc.ReadFrom(bs)
// if err != nil {
// return
// }
// ad, err := netLayer.NewAddrFromAny(addr)
// if err != nil {
// return
// }
// hash := ad.GetHashable()
// ss.m.RLock()
// conn, found := ss.udpMsgConnMap[hash]
// ss.m.RUnlock()
// if !found {
// conn = &shadowUDPPacketConn{
// ourSrcAddr: src,
// readChan: make(chan netLayer.AddrData, 5),
// closeChan: make(chan struct{}),
// parentMachine: m,
// hash: hash,
// }
// conn.InitEasyDeadline()
// m.Lock()
// m.udpMsgConnMap[hash] = conn
// m.Unlock()
// }
// destAddr := netLayer.NewAddrFromUDPAddr(dst)
// conn.readChan <- netLayer.AddrData{Data: bs[:n], Addr: destAddr}
// if !found {
// return conn, destAddr, nil
// }
// }
}()
return nil
}

View File

@@ -11,7 +11,6 @@ import (
type shadowUDPPacketConn struct {
net.PacketConn
raddr net.Addr
taddr net.Addr
handshakeBuf *bytes.Buffer
}

View File

@@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"strconv"
"strings"
)
var (
@@ -23,7 +24,7 @@ var (
type InvalidDataErr string
//return err == e || err == ErrInvalidData
// return err == e || err == ErrInvalidData
func (e InvalidDataErr) Is(err error) bool {
return err == e || err == ErrInvalidData
}
@@ -32,7 +33,7 @@ func (e InvalidDataErr) Error() string {
return string(e)
}
//nothing special. Normally, N==0 means no error
// nothing special. Normally, N==0 means no error
type NumErr struct {
N int
E error
@@ -52,7 +53,7 @@ func (ef NumErr) Unwarp() error {
return ef.E
}
//nothing special
// nothing special
type NumStrErr struct {
N int
Prefix string
@@ -63,7 +64,7 @@ func (ne NumStrErr) Error() string {
return ne.Prefix + strconv.Itoa(ne.N)
}
//an err with a buffer, nothing special
// an err with a buffer, nothing special
type ErrBuffer struct {
Err error
Buf *bytes.Buffer
@@ -141,3 +142,34 @@ func (e ErrInErr) String() string {
return e.ErrDesc
}
type Errs struct {
List []ErrsItem
}
type ErrsItem struct {
Index int
E error
}
func (ee *Errs) Add(e ErrsItem) {
ee.List = append(ee.List, e)
}
func (e Errs) OK() bool {
return len(e.List) == 0
}
func (e Errs) String() string {
var sb strings.Builder
for _, err := range e.List {
sb.WriteString(strconv.Itoa(err.Index))
sb.WriteString(", ")
sb.WriteString(err.E.Error())
sb.WriteString("\n")
}
return sb.String()
}
func (e Errs) Error() string {
return e.String()
}

View File

@@ -5,7 +5,7 @@ import (
"sync"
)
//一种简单的读写组合, 在ws包中被用到.
// 一种简单的读写组合, 在ws包中被用到.
type RW struct {
io.Reader
io.Writer
@@ -30,7 +30,7 @@ type ByteWriter interface {
Write(p []byte) (n int, err error)
}
//optionally read from OptionalReader
// optionally read from OptionalReader
type ReadWrapper struct {
io.Reader
OptionalReader io.Reader
@@ -62,7 +62,7 @@ type DummyReadCloser struct {
}
// ReadCount -= 1 at each call.
//if ReadCount<0, return 0, io.EOF
// if ReadCount<0, return 0, io.EOF
func (d *DummyReadCloser) Read(p []byte) (int, error) {
d.ReadCount -= 1
//log.Println("read called", d.ReadCount)
@@ -74,7 +74,7 @@ func (d *DummyReadCloser) Read(p []byte) (int, error) {
}
}
//return nil
// return nil
func (DummyReadCloser) Close() error {
return nil
}
@@ -84,7 +84,7 @@ type DummyWriteCloser struct {
}
// WriteCount -= 1 at each call.
//if WriteCount<0, return 0, io.EOF
// if WriteCount<0, return 0, io.EOF
func (d *DummyWriteCloser) Write(p []byte) (int, error) {
d.WriteCount -= 1
//log.Println("write called", d.WriteCount)
@@ -97,12 +97,12 @@ func (d *DummyWriteCloser) Write(p []byte) (int, error) {
}
}
//return nil
// return nil
func (DummyWriteCloser) Close() error {
return nil
}
//先从Old读若SwitchChan被关闭, 立刻改为从New读
// 先从Old读若SwitchChan被关闭, 立刻改为从New读
type ReadSwitcher struct {
Old, New io.Reader //non-nil
SwitchChan chan struct{} //non-nil
@@ -154,7 +154,7 @@ func (d *ReadSwitcher) Close() error {
return nil
}
//先向Old写若SwitchChan被关闭, 改向New写
// 先向Old写若SwitchChan被关闭, 改向New写
type WriteSwitcher struct {
Old, New io.Writer //non-nil
SwitchChan chan struct{} //non-nil
@@ -187,7 +187,7 @@ func (d *WriteSwitcher) Close() error {
return nil
}
//simple structure that send a signal by chan when Close called.
// simple structure that send a signal by chan when Close called.
type ChanCloser struct {
closeChan chan struct{}
once sync.Once
@@ -206,3 +206,24 @@ func (cc *ChanCloser) Close() error {
})
return nil
}
type MultiCloser struct {
Closers []io.Closer
sync.Once
}
func (cc *MultiCloser) Close() (result error) {
cc.Once.Do(func() {
var es Errs
for i, c := range cc.Closers {
e := c.Close()
if e != nil {
es.Add(ErrsItem{Index: i, E: e})
}
}
if !es.OK() {
result = es
}
})
return
}