server: fix race condition

This commit is contained in:
aler9
2021-05-10 23:13:00 +02:00
parent c878b68b70
commit 19a18393e5
3 changed files with 8 additions and 18 deletions

View File

@@ -245,13 +245,11 @@ func (s *Server) run() {
s.sessionRequest = make(chan request)
s.sessionClose = make(chan *ServerSession)
var wg sync.WaitGroup
wg.Add(1)
s.wg.Add(1)
connNew := make(chan net.Conn)
acceptErr := make(chan error)
go func() {
defer wg.Done()
defer s.wg.Done()
err := func() error {
for {
nconn, err := s.tcpListener.Accept()
@@ -281,7 +279,7 @@ outer:
break outer
case nconn := <-connNew:
sc := newServerConn(s, &wg, nconn)
sc := newServerConn(s, nconn)
s.conns[sc] = struct{}{}
case sc := <-s.connClose:
@@ -313,7 +311,7 @@ outer:
continue
}
ss := newServerSession(s, id, &wg, req.sc)
ss := newServerSession(s, id, req.sc)
s.sessions[id] = ss
select {

View File

@@ -6,7 +6,6 @@ import (
"crypto/tls"
"net"
"strings"
"sync"
"time"
"github.com/aler9/gortsplib/pkg/base"
@@ -44,7 +43,6 @@ type readReq struct {
// ServerConn is a server-side RTSP connection.
type ServerConn struct {
s *Server
wg *sync.WaitGroup
nconn net.Conn
ctx context.Context
ctxCancel func()
@@ -67,21 +65,19 @@ type ServerConn struct {
func newServerConn(
s *Server,
wg *sync.WaitGroup,
nconn net.Conn) *ServerConn {
ctx, ctxCancel := context.WithCancel(s.ctx)
sc := &ServerConn{
s: s,
wg: wg,
nconn: nconn,
ctx: ctx,
ctxCancel: ctxCancel,
sessionRemove: make(chan *ServerSession),
}
wg.Add(1)
s.wg.Add(1)
go sc.run()
return sc
@@ -107,7 +103,7 @@ func (sc *ServerConn) zone() string {
}
func (sc *ServerConn) run() {
defer sc.wg.Done()
defer sc.s.wg.Done()
if h, ok := sc.s.Handler.(ServerHandlerOnConnOpen); ok {
h.OnConnOpen(&ServerHandlerOnConnOpenCtx{

View File

@@ -6,7 +6,6 @@ import (
"net"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
@@ -117,7 +116,6 @@ type ServerSessionAnnouncedTrack struct {
type ServerSession struct {
s *Server
id string
wg *sync.WaitGroup
author *ServerConn
ctx context.Context
@@ -143,7 +141,6 @@ type ServerSession struct {
func newServerSession(
s *Server,
id string,
wg *sync.WaitGroup,
author *ServerConn,
) *ServerSession {
@@ -152,7 +149,6 @@ func newServerSession(
ss := &ServerSession{
s: s,
id: id,
wg: wg,
author: author,
ctx: ctx,
ctxCancel: ctxCancel,
@@ -162,7 +158,7 @@ func newServerSession(
connRemove: make(chan *ServerConn),
}
wg.Add(1)
s.wg.Add(1)
go ss.run()
return ss
@@ -214,7 +210,7 @@ func (ss *ServerSession) checkState(allowed map[ServerSessionState]struct{}) err
}
func (ss *ServerSession) run() {
defer ss.wg.Done()
defer ss.s.wg.Done()
if h, ok := ss.s.Handler.(ServerHandlerOnSessionOpen); ok {
h.OnSessionOpen(&ServerHandlerOnSessionOpenCtx{