mirror of
https://github.com/libp2p/go-libp2p.git
synced 2025-09-26 20:21:26 +08:00
webrtc: fix memory leak with udpmux.muxedConnection context (#3243)
This context wasn't being cancelled on all code paths. In particular, contexts for connections that didn't complete negotiation were not being cancelled. The change arranges for either `udpmux.muxedConnection.Close` or `RemoveConnByUfrag` to call the other. Fixes: #3223
This commit is contained in:
@@ -271,12 +271,13 @@ func (mux *UDPMux) RemoveConnByUfrag(ufrag string) {
|
||||
|
||||
for _, isIPv6 := range [...]bool{true, false} {
|
||||
key := ufragConnKey{ufrag: ufrag, isIPv6: isIPv6}
|
||||
if _, ok := mux.ufragMap[key]; ok {
|
||||
if conn, ok := mux.ufragMap[key]; ok {
|
||||
delete(mux.ufragMap, key)
|
||||
for _, addr := range mux.ufragAddrMap[key] {
|
||||
delete(mux.addrMap, addr.String())
|
||||
}
|
||||
delete(mux.ufragAddrMap, key)
|
||||
conn.close()
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -293,7 +294,7 @@ func (mux *UDPMux) getOrCreateConn(ufrag string, isIPv6 bool, _ *UDPMux, addr ne
|
||||
return false, conn
|
||||
}
|
||||
|
||||
conn := newMuxedConnection(mux, func() { mux.RemoveConnByUfrag(ufrag) })
|
||||
conn := newMuxedConnection(mux, ufrag)
|
||||
mux.ufragMap[key] = conn
|
||||
mux.addrMap[addr.String()] = conn
|
||||
mux.ufragAddrMap[key] = append(mux.ufragAddrMap[key], addr)
|
||||
|
@@ -8,6 +8,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/pion/stun"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
@@ -246,3 +247,28 @@ func TestMuxedConnection(t *testing.T) {
|
||||
}
|
||||
require.Empty(t, addrUfragMap)
|
||||
}
|
||||
|
||||
func TestRemovingUfragClosesConn(t *testing.T) {
|
||||
c := newPacketConn(t)
|
||||
m := NewUDPMux(c)
|
||||
m.Start()
|
||||
defer m.Close()
|
||||
remoteAddr := &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 1234}
|
||||
conn, err := m.GetConn("a", remoteAddr)
|
||||
require.NoError(t, err)
|
||||
defer conn.Close()
|
||||
|
||||
connClosed := make(chan bool)
|
||||
go func() {
|
||||
_, _, err := conn.ReadFrom(make([]byte, 100))
|
||||
assert.ErrorIs(t, err, context.Canceled)
|
||||
close(connClosed)
|
||||
}()
|
||||
require.NoError(t, err)
|
||||
m.RemoveConnByUfrag("a")
|
||||
select {
|
||||
case <-connClosed:
|
||||
case <-time.After(1 * time.Second):
|
||||
t.Fatalf("expected the connection to be closed")
|
||||
}
|
||||
}
|
||||
|
@@ -23,31 +23,29 @@ const queueLen = 128
|
||||
// from which this connection (indexed by ufrag) received
|
||||
// data.
|
||||
type muxedConnection struct {
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
onClose func()
|
||||
queue chan packet
|
||||
mux *UDPMux
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
queue chan packet
|
||||
mux *UDPMux
|
||||
ufrag string
|
||||
}
|
||||
|
||||
var _ net.PacketConn = &muxedConnection{}
|
||||
|
||||
func newMuxedConnection(mux *UDPMux, onClose func()) *muxedConnection {
|
||||
func newMuxedConnection(mux *UDPMux, ufrag string) *muxedConnection {
|
||||
ctx, cancel := context.WithCancel(mux.ctx)
|
||||
return &muxedConnection{
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
queue: make(chan packet, queueLen),
|
||||
onClose: onClose,
|
||||
mux: mux,
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
queue: make(chan packet, queueLen),
|
||||
mux: mux,
|
||||
ufrag: ufrag,
|
||||
}
|
||||
}
|
||||
|
||||
func (c *muxedConnection) Push(buf []byte, addr net.Addr) error {
|
||||
select {
|
||||
case <-c.ctx.Done():
|
||||
if c.ctx.Err() != nil {
|
||||
return errors.New("closed")
|
||||
default:
|
||||
}
|
||||
select {
|
||||
case c.queue <- packet{buf: buf, addr: addr}:
|
||||
@@ -76,12 +74,21 @@ func (c *muxedConnection) WriteTo(p []byte, addr net.Addr) (n int, err error) {
|
||||
}
|
||||
|
||||
func (c *muxedConnection) Close() error {
|
||||
select {
|
||||
case <-c.ctx.Done():
|
||||
if c.ctx.Err() != nil {
|
||||
return nil
|
||||
default:
|
||||
}
|
||||
c.onClose()
|
||||
// mux calls close to actually close the connection
|
||||
//
|
||||
// Removing the connection from the mux or closing the connection
|
||||
// must trigger the other.
|
||||
// Doing this here ensures we don't need to call both RemoveConnByUfrag
|
||||
// and close on all code paths.
|
||||
c.mux.RemoveConnByUfrag(c.ufrag)
|
||||
return nil
|
||||
}
|
||||
|
||||
// closes the connection. Must only be called by the mux.
|
||||
func (c *muxedConnection) close() {
|
||||
c.cancel()
|
||||
// drain the packet queue
|
||||
for {
|
||||
@@ -89,7 +96,7 @@ func (c *muxedConnection) Close() error {
|
||||
case p := <-c.queue:
|
||||
pool.Put(p.buf)
|
||||
default:
|
||||
return nil
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user