diff --git a/proxy/vmess/anti_replay.go b/proxy/vmess/anti_replay.go index 63f3257..32f3267 100644 --- a/proxy/vmess/anti_replay.go +++ b/proxy/vmess/anti_replay.go @@ -5,11 +5,16 @@ import ( "time" ) -//authid 120 second anti replay. -const authid_antiReplyDuration = time.Second * 120 +const ( + //authid 120 seconds anti replay. + authid_antiReplyDuration = time.Second * 120 + + //在v2ray的代码中找到的。暂未找到对应的 “3分钟内不重复” 的文档。先照葫芦画瓢,到时候再追究。 + sessionAntiReplayDuration = time.Minute * 3 +) /* - 我们用自己的代码 实现 authid 防重放 机制. 不用v2ray的代码实现。 + 我们用map的方式 实现 authid 防重放 机制. 不用v2ray的代码实现。 v2ray中 “使用 两个filter,每隔120秒swap一次filter” 的方式,感觉不够严谨。第121~240秒的话, 实际上 第一个 filter 仍然会存储之前所有数据,而只是第二个filter被重置了,导致 第一个filter是防240秒重放。 @@ -17,26 +22,26 @@ const authid_antiReplyDuration = time.Second * 120 因为这样的话,实际上把时间按120秒分块了,如果一个id在 第 1秒被使用,然后在第122秒被重新使用,按v2ray的实现, 依然会被认定为重放攻击。 - 我们只要和 sessionHistory的方式一样,存储过期时间,然后定时清理 即可。 + 我们只要用 v2ray的 sessionHistory的方式,存储过期时间,然后定时清理 即可。 */ -type anitReplayMachine struct { +type authid_antiReplayMachine struct { sync.RWMutex - antiReplyMap map[[16]byte]time.Time //key: authid, value: expireTime + authidMap map[[16]byte]time.Time //key: authid, value: expireTime ticker *time.Ticker stopChan chan struct{} closed bool } -func newAntiReplyMachine() *anitReplayMachine { - arm := &anitReplayMachine{ - antiReplyMap: make(map[[16]byte]time.Time), - ticker: time.NewTicker(authid_antiReplyDuration * 2), - stopChan: make(chan struct{}), +func newAuthIDAntiReplyMachine() *authid_antiReplayMachine { + arm := &authid_antiReplayMachine{ + authidMap: make(map[[16]byte]time.Time), + ticker: time.NewTicker(authid_antiReplyDuration * 2), + stopChan: make(chan struct{}), } //定时清理过时数据,避免缓存无限增长 - go func(a *anitReplayMachine) { + go func(a *authid_antiReplayMachine) { for { select { @@ -44,9 +49,9 @@ func newAntiReplyMachine() *anitReplayMachine { return case now := <-a.ticker.C: a.Lock() - for authid, expireTime := range a.antiReplyMap { + for authid, expireTime := range a.authidMap { if expireTime.Before(now) { - delete(a.antiReplyMap, authid) + delete(a.authidMap, authid) } } a.Unlock() @@ -56,7 +61,7 @@ func newAntiReplyMachine() *anitReplayMachine { return arm } -func (arm *anitReplayMachine) stop() { +func (arm *authid_antiReplayMachine) stop() { arm.Lock() defer arm.Unlock() @@ -69,22 +74,22 @@ func (arm *anitReplayMachine) stop() { } -func (arm *anitReplayMachine) check(authid [16]byte) (ok bool) { +func (arm *authid_antiReplayMachine) check(authid [16]byte) (ok bool) { now := time.Now() arm.RLock() - expireTime, has := arm.antiReplyMap[authid] + expireTime, has := arm.authidMap[authid] arm.RUnlock() if !has { arm.Lock() - arm.antiReplyMap[authid] = now.Add(authid_antiReplyDuration) + arm.authidMap[authid] = now.Add(authid_antiReplyDuration) arm.Unlock() return true } if expireTime.Before(now) { arm.Lock() - arm.antiReplyMap[authid] = now.Add(authid_antiReplyDuration) + arm.authidMap[authid] = now.Add(authid_antiReplyDuration) arm.Unlock() return true @@ -97,53 +102,86 @@ type sessionID struct { key [16]byte nonce [16]byte } -type sessionHistory struct { +type session_antiReplayMachine struct { sync.RWMutex - cache map[sessionID]time.Time + sessionMap map[sessionID]time.Time + + ticker *time.Ticker + stopChan chan struct{} + closed bool } -func NewSessionHistory() *sessionHistory { - h := &sessionHistory{} +func newSessionAntiReplayMachine() *session_antiReplayMachine { + h := &session_antiReplayMachine{ + ticker: time.NewTicker(sessionAntiReplayDuration * 2), + stopChan: make(chan struct{}), + } h.initCache() + //定时清理过时数据,避免缓存无限增长 + go func(sh *session_antiReplayMachine) { + + for { + select { + case <-sh.stopChan: + return + case now := <-sh.ticker.C: + sh.Lock() + sh.removeExpiredEntries(now) + sh.Unlock() + } + } + }(h) + return h } -func (h *sessionHistory) initCache() { - h.cache = make(map[sessionID]time.Time, 128) +func (sh *session_antiReplayMachine) stop() { + sh.Lock() + defer sh.Unlock() + + if sh.closed { + return + } + sh.closed = true + close(sh.stopChan) + sh.ticker.Stop() + } -func (h *sessionHistory) addIfNotExits(session sessionID) bool { +func (h *session_antiReplayMachine) initCache() { + h.sessionMap = make(map[sessionID]time.Time, 128) +} + +func (h *session_antiReplayMachine) check(session sessionID) bool { h.Lock() now := time.Now() - h.removeExpiredEntries(now) - - if expire, found := h.cache[session]; found && expire.After(now) { + if expire, found := h.sessionMap[session]; found && expire.After(now) { h.Unlock() return false } - h.cache[session] = time.Now().Add(time.Minute * 3) //在v2ray的代码中找到的。暂未找到对应的 “3分钟内不重复” 的文档。 + h.sessionMap[session] = now.Add(sessionAntiReplayDuration) h.Unlock() return true } -func (h *sessionHistory) removeExpiredEntries(now time.Time) { +func (h *session_antiReplayMachine) removeExpiredEntries(now time.Time) { - if len(h.cache) == 0 { + if len(h.sessionMap) == 0 { return } - for session, expire := range h.cache { + for session, expire := range h.sessionMap { if expire.Before(now) { - delete(h.cache, session) + delete(h.sessionMap, session) } } - if len(h.cache) == 0 { + if len(h.sessionMap) == 0 { h.initCache() //这是为了回收内存。 } diff --git a/proxy/vmess/client.go b/proxy/vmess/client.go index ecdfe10..8df7130 100644 --- a/proxy/vmess/client.go +++ b/proxy/vmess/client.go @@ -379,16 +379,17 @@ func (c *ClientConn) Write(b []byte) (n int, err error) { } } - n, err = c.dataWriter.Write(b) - if len(b) != 0 { + if len(b) > 0 { + n, err = c.dataWriter.Write(b) close(switchChan) c.vmessout = nil + if err != nil { + return + } + _, err = c.Conn.Write(outBuf.Bytes()) } - if err != nil { - return - } - _, err = c.Conn.Write(outBuf.Bytes()) + return } diff --git a/proxy/vmess/header.go b/proxy/vmess/header.go index 6a918c4..cae2a39 100644 --- a/proxy/vmess/header.go +++ b/proxy/vmess/header.go @@ -90,7 +90,7 @@ func generateCipherByV2rayUser(u utils.V2rayUser) (cipher.Block, error) { } //为0表示匹配成功, 如果不为0,则匹配失败;若为1,则CRC 校验失败(正常地匹配失败,不意味着被攻击); 若为2,则表明校验成功 但是 时间差距超过 authID_timeMaxSecondGap 秒,如果为3,则表明遇到了重放攻击。 -func tryMatchAuthIDByBlock(now int64, block cipher.Block, encrypted_authID [16]byte, anitReplayMachine *anitReplayMachine) (failReason int) { +func tryMatchAuthIDByBlock(now int64, block cipher.Block, encrypted_authID [16]byte, anitReplayMachine *authid_antiReplayMachine) (failReason int) { var t int64 //var rand int32 diff --git a/proxy/vmess/server.go b/proxy/vmess/server.go index a2b6436..8a4d49f 100644 --- a/proxy/vmess/server.go +++ b/proxy/vmess/server.go @@ -34,7 +34,7 @@ type pair struct { cipher.Block } -func authUserByAuthPairList(bs []byte, authPairList []pair, anitReplayMachine *anitReplayMachine) (user utils.V2rayUser, err error) { +func authUserByAuthPairList(bs []byte, authPairList []pair, anitReplayMachine *authid_antiReplayMachine) (user utils.V2rayUser, err error) { now := time.Now().Unix() var encrypted_authid [authid_len]byte @@ -110,16 +110,15 @@ type Server struct { authPairList []pair - sessionHistory *sessionHistory - - anitReplayMachine *anitReplayMachine + authid_anitReplayMachine *authid_antiReplayMachine + session_antiReplayMachine *session_antiReplayMachine } func NewServer() *Server { s := &Server{ - MultiUserMap: utils.NewMultiUserMap(), - sessionHistory: NewSessionHistory(), - anitReplayMachine: newAntiReplyMachine(), + MultiUserMap: utils.NewMultiUserMap(), + authid_anitReplayMachine: newAuthIDAntiReplyMachine(), + session_antiReplayMachine: newSessionAntiReplayMachine(), } s.SetUseUUIDStr_asKey() return s @@ -127,7 +126,8 @@ func NewServer() *Server { func (s *Server) Name() string { return Name } func (s *Server) Stop() { - s.anitReplayMachine.stop() + s.authid_anitReplayMachine.stop() + s.session_antiReplayMachine.stop() } func (s *Server) addUser(u utils.V2rayUser) { @@ -160,7 +160,7 @@ func (s *Server) Handshake(underlay net.Conn) (tcpConn net.Conn, msgConn netLaye returnErr = utils.NumErr{E: utils.ErrInvalidData, N: 1} return } - user, err := authUserByAuthPairList(data[:authid_len], s.authPairList, s.anitReplayMachine) + user, err := authUserByAuthPairList(data[:authid_len], s.authPairList, s.authid_anitReplayMachine) if err != nil { returnErr = err @@ -176,6 +176,13 @@ func (s *Server) Handshake(underlay net.Conn) (tcpConn net.Conn, msgConn netLaye returnErr = errorReason if ce := utils.CanLogWarn("vmess openAEADHeader err"); ce != nil { + //看v2ray有一个 "drain"的用法, + //然而,我们这里是不需要drain的,区别在于,v2ray 不是一次性读取一大串数据, + // 而是用一个 reader 一点一点读,这就会产生一些可探测的问题,所以才要drain + // 而我们直接用 64K 的大buf 一下子读取整个客户端发来的整个数据, 没有读取长度的差别。 + + //不过 为了尊重v2ray的代码,也 以防 我的想法有错误,还是把这个情况陈列在这里,留作备用。 + ce.Write(zap.Any("things", []any{errorReason, shouldDrain, bytesRead})) } @@ -211,7 +218,7 @@ func (s *Server) Handshake(underlay net.Conn) (tcpConn net.Conn, msgConn netLaye sid.key = sc.reqBodyKey sid.nonce = sc.reqBodyIV - if !s.sessionHistory.addIfNotExits(sid) { + if !s.session_antiReplayMachine.check(sid) { returnErr = ErrReplaySessionAttack return }