mirror of
https://github.com/aler9/gortsplib
synced 2025-10-04 14:52:46 +08:00
148
server.go
148
server.go
@@ -3,6 +3,7 @@ package gortsplib
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
"strconv"
|
||||
@@ -19,6 +20,8 @@ const (
|
||||
serverAuthRealm = "ipcam"
|
||||
)
|
||||
|
||||
var errHTTPUpgraded = errors.New("upgraded to HTTP conn")
|
||||
|
||||
func extractPort(address string) (int, error) {
|
||||
_, tmp, err := net.SplitHostPort(address)
|
||||
if err != nil {
|
||||
@@ -47,6 +50,13 @@ type sessionRequestReq struct {
|
||||
res chan sessionRequestRes
|
||||
}
|
||||
|
||||
type sessionHandleHTTPChannelReq struct {
|
||||
sc *ServerConn
|
||||
write bool
|
||||
tunnelID string
|
||||
res chan error
|
||||
}
|
||||
|
||||
type chGetMulticastIPReq struct {
|
||||
res chan net.IP
|
||||
}
|
||||
@@ -129,25 +139,27 @@ type Server struct {
|
||||
sessionTimeout time.Duration
|
||||
checkStreamPeriod time.Duration
|
||||
|
||||
ctx context.Context
|
||||
ctxCancel func()
|
||||
wg sync.WaitGroup
|
||||
multicastNet *net.IPNet
|
||||
multicastNextIP net.IP
|
||||
tcpListener *serverTCPListener
|
||||
udpRTPListener *serverUDPListener
|
||||
udpRTCPListener *serverUDPListener
|
||||
sessions map[string]*ServerSession
|
||||
conns map[*ServerConn]struct{}
|
||||
closeError error
|
||||
ctx context.Context
|
||||
ctxCancel func()
|
||||
wg sync.WaitGroup
|
||||
multicastNet *net.IPNet
|
||||
multicastNextIP net.IP
|
||||
tcpListener *serverTCPListener
|
||||
udpRTPListener *serverUDPListener
|
||||
udpRTCPListener *serverUDPListener
|
||||
conns map[*ServerConn]struct{}
|
||||
httpReadChannels map[*ServerConn]chan error
|
||||
sessions map[string]*ServerSession
|
||||
closeError error
|
||||
|
||||
// in
|
||||
chNewConn chan net.Conn
|
||||
chAcceptErr chan error
|
||||
chCloseConn chan *ServerConn
|
||||
chHandleRequest chan sessionRequestReq
|
||||
chCloseSession chan *ServerSession
|
||||
chGetMulticastIP chan chGetMulticastIPReq
|
||||
chNewConn chan net.Conn
|
||||
chAcceptErr chan error
|
||||
chCloseConn chan *ServerConn
|
||||
chHandleHTTPChannel chan sessionHandleHTTPChannelReq
|
||||
chHandleRequest chan sessionRequestReq
|
||||
chCloseSession chan *ServerSession
|
||||
chGetMulticastIP chan chGetMulticastIPReq
|
||||
}
|
||||
|
||||
// Start starts the server.
|
||||
@@ -305,18 +317,18 @@ func (s *Server) Start() error {
|
||||
|
||||
s.ctx, s.ctxCancel = context.WithCancel(context.Background())
|
||||
|
||||
s.sessions = make(map[string]*ServerSession)
|
||||
s.conns = make(map[*ServerConn]struct{})
|
||||
s.httpReadChannels = make(map[*ServerConn]chan error)
|
||||
s.sessions = make(map[string]*ServerSession)
|
||||
s.chNewConn = make(chan net.Conn)
|
||||
s.chAcceptErr = make(chan error)
|
||||
s.chCloseConn = make(chan *ServerConn)
|
||||
s.chHandleHTTPChannel = make(chan sessionHandleHTTPChannelReq)
|
||||
s.chHandleRequest = make(chan sessionRequestReq)
|
||||
s.chCloseSession = make(chan *ServerSession)
|
||||
s.chGetMulticastIP = make(chan chGetMulticastIPReq)
|
||||
|
||||
s.tcpListener = &serverTCPListener{
|
||||
s: s,
|
||||
}
|
||||
s.tcpListener = &serverTCPListener{s: s}
|
||||
err := s.tcpListener.initialize()
|
||||
if err != nil {
|
||||
if s.udpRTPListener != nil {
|
||||
@@ -355,6 +367,8 @@ func (s *Server) run() {
|
||||
|
||||
s.ctxCancel()
|
||||
|
||||
s.tcpListener.close()
|
||||
|
||||
if s.udpRTCPListener != nil {
|
||||
s.udpRTCPListener.close()
|
||||
}
|
||||
@@ -362,8 +376,6 @@ func (s *Server) run() {
|
||||
if s.udpRTPListener != nil {
|
||||
s.udpRTPListener.close()
|
||||
}
|
||||
|
||||
s.tcpListener.close()
|
||||
}
|
||||
|
||||
func (s *Server) runInner() error {
|
||||
@@ -385,10 +397,36 @@ func (s *Server) runInner() error {
|
||||
continue
|
||||
}
|
||||
delete(s.conns, sc)
|
||||
delete(s.httpReadChannels, sc)
|
||||
sc.Close()
|
||||
|
||||
case req := <-s.chHandleHTTPChannel:
|
||||
if !req.write {
|
||||
req.sc.httpReadTunnelID = req.tunnelID
|
||||
s.httpReadChannels[req.sc] = req.res
|
||||
} else {
|
||||
readChan, readChanRes := s.findHTTPReadChannel(req.sc, req.tunnelID)
|
||||
if readChan == nil {
|
||||
req.res <- fmt.Errorf("did not found a corresponding HTTP GET request")
|
||||
} else {
|
||||
delete(s.httpReadChannels, readChan)
|
||||
close(readChanRes)
|
||||
req.res <- errHTTPUpgraded
|
||||
|
||||
nconn := newServerHTTPTunnel(req.sc.nconn, req.sc.httpReadBuf, readChan.nconn)
|
||||
sc := &ServerConn{
|
||||
s: s,
|
||||
nconn: nconn,
|
||||
isHTTP: true,
|
||||
}
|
||||
sc.initialize()
|
||||
s.conns[sc] = struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
case req := <-s.chHandleRequest:
|
||||
if ss, ok := s.sessions[req.id]; ok {
|
||||
ss, ok := s.sessions[req.id]
|
||||
if ok {
|
||||
if !req.sc.ip().Equal(ss.author.ip()) ||
|
||||
req.sc.zone() != ss.author.zone() {
|
||||
req.res <- sessionRequestRes{
|
||||
@@ -399,17 +437,6 @@ func (s *Server) runInner() error {
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
select {
|
||||
case ss.chHandleRequest <- req:
|
||||
case <-ss.ctx.Done():
|
||||
req.res <- sessionRequestRes{
|
||||
res: &base.Response{
|
||||
StatusCode: base.StatusBadRequest,
|
||||
},
|
||||
err: liberrors.ErrServerTerminated{},
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if !req.create {
|
||||
req.res <- sessionRequestRes{
|
||||
@@ -427,19 +454,10 @@ func (s *Server) runInner() error {
|
||||
}
|
||||
ss.initialize()
|
||||
s.sessions[ss.secretID] = ss
|
||||
|
||||
select {
|
||||
case ss.chHandleRequest <- req:
|
||||
case <-ss.ctx.Done():
|
||||
req.res <- sessionRequestRes{
|
||||
res: &base.Response{
|
||||
StatusCode: base.StatusBadRequest,
|
||||
},
|
||||
err: liberrors.ErrServerTerminated{},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ss.handleRequestNoWait(req)
|
||||
|
||||
case ss := <-s.chCloseSession:
|
||||
if sss, ok := s.sessions[ss.secretID]; !ok || sss != ss {
|
||||
continue
|
||||
@@ -478,6 +496,16 @@ func (s *Server) StartAndWait() error {
|
||||
return s.Wait()
|
||||
}
|
||||
|
||||
func (s *Server) findHTTPReadChannel(writeChan *ServerConn, tunnelID string) (*ServerConn, chan error) {
|
||||
for readChan, readChanRes := range s.httpReadChannels {
|
||||
if readChan.remoteAddr.IP.Equal(writeChan.remoteAddr.IP) &&
|
||||
readChan.httpReadTunnelID == tunnelID {
|
||||
return readChan, readChanRes
|
||||
}
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (s *Server) getMulticastIP() (net.IP, error) {
|
||||
res := make(chan net.IP)
|
||||
select {
|
||||
@@ -518,6 +546,34 @@ func (s *Server) closeSession(ss *ServerSession) {
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) handleHTTPChannel(req sessionHandleHTTPChannelReq) error {
|
||||
req.res = make(chan error)
|
||||
|
||||
select {
|
||||
case s.chHandleHTTPChannel <- req:
|
||||
case <-req.sc.ctx.Done():
|
||||
return fmt.Errorf("terminated")
|
||||
case <-s.ctx.Done():
|
||||
return fmt.Errorf("terminated")
|
||||
}
|
||||
|
||||
if !req.write {
|
||||
t := time.NewTimer(5 * time.Second)
|
||||
defer t.Stop()
|
||||
|
||||
select {
|
||||
case <-req.res:
|
||||
case <-req.sc.ctx.Done():
|
||||
return fmt.Errorf("terminated")
|
||||
case <-t.C:
|
||||
return fmt.Errorf("did not found a corresponding HTTP POST request")
|
||||
}
|
||||
return errHTTPUpgraded
|
||||
}
|
||||
|
||||
return <-req.res
|
||||
}
|
||||
|
||||
func (s *Server) handleRequest(req sessionRequestReq) (*base.Response, *ServerSession, error) {
|
||||
select {
|
||||
case s.chHandleRequest <- req:
|
||||
|
Reference in New Issue
Block a user