replace New* with Initialize (#723)

This commit is contained in:
Alessandro Ros
2025-03-16 13:58:35 +01:00
committed by GitHub
parent db959f854e
commit 376fb9e821
24 changed files with 296 additions and 124 deletions

View File

@@ -854,7 +854,8 @@ func (c *Client) trySwitchingProtocol2(medi *description.Media, baseURL *base.UR
} }
func (c *Client) startTransportRoutines() { func (c *Client) startTransportRoutines() {
c.timeDecoder = rtptime.NewGlobalDecoder2() c.timeDecoder = &rtptime.GlobalDecoder2{}
c.timeDecoder.Initialize()
for _, cm := range c.setuppedMedias { for _, cm := range c.setuppedMedias {
cm.start() cm.start()
@@ -1025,7 +1026,7 @@ func (c *Client) do(req *base.Request, skipResponse bool) (*base.Response, error
// get session from response // get session from response
if v, ok := res.Header["Session"]; ok { if v, ok := res.Header["Session"]; ok {
var sx headers.Session var sx headers.Session
err := sx.Unmarshal(v) err = sx.Unmarshal(v)
if err != nil { if err != nil {
return nil, liberrors.ErrClientSessionHeaderInvalid{Err: err} return nil, liberrors.ErrClientSessionHeaderInvalid{Err: err}
} }
@@ -1041,7 +1042,12 @@ func (c *Client) do(req *base.Request, skipResponse bool) (*base.Response, error
pass, _ := req.URL.User.Password() pass, _ := req.URL.User.Password()
user := req.URL.User.Username() user := req.URL.User.Username()
sender, err := auth.NewSender(res.Header["WWW-Authenticate"], user, pass) sender := &auth.Sender{
WWWAuth: res.Header["WWW-Authenticate"],
User: user,
Pass: pass,
}
err = sender.Initialize()
if err != nil { if err != nil {
return nil, liberrors.ErrClientAuthSetup{Err: err} return nil, liberrors.ErrClientAuthSetup{Err: err}
} }

View File

@@ -12,7 +12,7 @@ import (
// This example shows how to // This example shows how to
// 1. connect to a RTSP server // 1. connect to a RTSP server
// 2. check if there's a H264 stream and an MPEG-4 audio format // 2. check if there's a H264 stream and a MPEG-4 audio stream
// 3. save the content of those formats in a file in MPEG-TS format // 3. save the content of those formats in a file in MPEG-TS format
func main() { func main() {

View File

@@ -15,7 +15,7 @@ import (
// This example shows how to // This example shows how to
// 1. connect to a RTSP server // 1. connect to a RTSP server
// 2. check if there's an H264 format // 2. check if there's an H264 stream
// 3. decode the H264 stream into RGBA frames // 3. decode the H264 stream into RGBA frames
// This example requires the FFmpeg libraries, that can be installed with this command: // This example requires the FFmpeg libraries, that can be installed with this command:

View File

@@ -15,7 +15,7 @@ import (
// This example shows how to // This example shows how to
// 1. connect to a RTSP server // 1. connect to a RTSP server
// 2. check if there's an H265 format // 2. check if there's a H265 stream
// 3. decode the H265 stream into RGBA frames // 3. decode the H265 stream into RGBA frames
// This example requires the FFmpeg libraries, that can be installed with this command: // This example requires the FFmpeg libraries, that can be installed with this command:

View File

@@ -11,7 +11,7 @@ import (
// This example shows how to // This example shows how to
// 1. connect to a RTSP server // 1. connect to a RTSP server
// 2. check if there's an LPCM format // 2. check if there's a LPCM stream
// 3. get LPCM samples of that format // 3. get LPCM samples of that format
func main() { func main() {

View File

@@ -12,7 +12,7 @@ import (
// This example shows how to // This example shows how to
// 1. connect to a RTSP server // 1. connect to a RTSP server
// 2. check if there's an MPEG-4 audio format // 2. check if there's a MPEG-4 audio stream
// 3. save the content of the format in a file in MPEG-TS format // 3. save the content of the format in a file in MPEG-TS format
func main() { func main() {

View File

@@ -11,7 +11,7 @@ import (
// This example shows how to // This example shows how to
// 1. connect to a RTSP server // 1. connect to a RTSP server
// 2. check if there's an MPEG-4 audio format // 2. check if there's a MPEG-4 audio stream
// 3. get access units of that format // 3. get access units of that format
func main() { func main() {

View File

@@ -11,7 +11,7 @@ import (
// This example shows how to // This example shows how to
// 1. connect to a RTSP server // 1. connect to a RTSP server
// 2. check if there's an Opus format // 2. check if there's an Opus stream
// 3. get Opus packets of that format // 3. get Opus packets of that format
func main() { func main() {

View File

@@ -14,7 +14,7 @@ import (
// This example shows how to // This example shows how to
// 1. connect to a RTSP server // 1. connect to a RTSP server
// 2. check if there's an VP8 format // 2. check if there's a VP8 stream
// 3. decode the VP8 stream into RGBA frames // 3. decode the VP8 stream into RGBA frames
// This example requires the FFmpeg libraries, that can be installed with this command: // This example requires the FFmpeg libraries, that can be installed with this command:

View File

@@ -14,7 +14,7 @@ import (
// This example shows how to // This example shows how to
// 1. connect to a RTSP server // 1. connect to a RTSP server
// 2. check if there's an VP9 format // 2. check if there's a VP9 stream
// 3. decode the VP9 stream into RGBA frames // 3. decode the VP9 stream into RGBA frames
// This example requires the FFmpeg libraries, that can be installed with this command: // This example requires the FFmpeg libraries, that can be installed with this command:

View File

@@ -98,7 +98,11 @@ func (s *server) OnPlay(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Response,
func (s *server) setStreamReady(desc *description.Session) *gortsplib.ServerStream { func (s *server) setStreamReady(desc *description.Session) *gortsplib.ServerStream {
s.mutex.Lock() s.mutex.Lock()
defer s.mutex.Unlock() defer s.mutex.Unlock()
s.stream = gortsplib.NewServerStream(s.s, desc) s.stream = &gortsplib.ServerStream{
Server: s.s,
Desc: desc,
}
s.stream.Initialize()
return s.stream return s.stream
} }

View File

@@ -117,7 +117,11 @@ func (sh *serverHandler) OnAnnounce(ctx *gortsplib.ServerHandlerOnAnnounceCtx) (
} }
// create the stream and save the publisher // create the stream and save the publisher
sh.stream = gortsplib.NewServerStream(sh.s, ctx.Description) sh.stream = &gortsplib.ServerStream{
Server: sh.s,
Desc: ctx.Description,
}
sh.stream.Initialize()
sh.publisher = ctx.Session sh.publisher = ctx.Session
return &base.Response{ return &base.Response{

View File

@@ -89,7 +89,11 @@ func (sh *serverHandler) OnAnnounce(ctx *gortsplib.ServerHandlerOnAnnounceCtx) (
} }
// create the stream and save the publisher // create the stream and save the publisher
sh.stream = gortsplib.NewServerStream(sh.s, ctx.Description) sh.stream = &gortsplib.ServerStream{
Server: sh.s,
Desc: ctx.Description,
}
sh.stream.Initialize()
sh.publisher = ctx.Session sh.publisher = ctx.Session
return &base.Response{ return &base.Response{

View File

@@ -88,7 +88,11 @@ func (sh *serverHandler) OnAnnounce(ctx *gortsplib.ServerHandlerOnAnnounceCtx) (
} }
// create the stream and save the publisher // create the stream and save the publisher
sh.stream = gortsplib.NewServerStream(sh.s, ctx.Description) sh.stream = &gortsplib.ServerStream{
Server: sh.s,
Desc: ctx.Description,
}
sh.stream.Initialize()
sh.publisher = ctx.Session sh.publisher = ctx.Session
return &base.Response{ return &base.Response{

View File

@@ -328,7 +328,11 @@ func TestServerRecordRead(t *testing.T) {
}, fmt.Errorf("someone is already publishing") }, fmt.Errorf("someone is already publishing")
} }
stream = gortsplib.NewServerStream(s, ctx.Description) stream = &gortsplib.ServerStream{
Server: s,
Desc: ctx.Description,
}
stream.Initialize()
publisher = ctx.Session publisher = ctx.Session
return &base.Response{ return &base.Response{

View File

@@ -7,42 +7,51 @@ import (
"github.com/bluenviron/gortsplib/v4/pkg/headers" "github.com/bluenviron/gortsplib/v4/pkg/headers"
) )
// NewSender allocates a Sender.
//
// Deprecated: replaced by Sender.Initialize().
func NewSender(wwwAuth base.HeaderValue, user string, pass string) (*Sender, error) {
s := &Sender{
WWWAuth: wwwAuth,
User: user,
Pass: pass,
}
err := s.Initialize()
return s, err
}
// Sender allows to send credentials. // Sender allows to send credentials.
// It requires a WWW-Authenticate header (provided by the server)
// and a set of credentials.
type Sender struct { type Sender struct {
user string WWWAuth base.HeaderValue
pass string User string
Pass string
authHeader *headers.Authenticate authHeader *headers.Authenticate
} }
// NewSender allocates a Sender. // Initialize initializes a Sender.
// It requires a WWW-Authenticate header (provided by the server) func (se *Sender) Initialize() error {
// and a set of credentials. for _, v := range se.WWWAuth {
func NewSender(wwwAuth base.HeaderValue, user string, pass string) (*Sender, error) {
var bestAuthHeader *headers.Authenticate
for _, v := range wwwAuth {
var auth headers.Authenticate var auth headers.Authenticate
err := auth.Unmarshal(base.HeaderValue{v}) err := auth.Unmarshal(base.HeaderValue{v})
if err != nil { if err != nil {
continue // ignore unrecognized headers continue // ignore unrecognized headers
} }
if bestAuthHeader == nil || if se.authHeader == nil ||
(auth.Algorithm != nil && *auth.Algorithm == headers.AuthAlgorithmSHA256) || (auth.Algorithm != nil && *auth.Algorithm == headers.AuthAlgorithmSHA256) ||
(bestAuthHeader.Method == headers.AuthMethodBasic) { (se.authHeader.Method == headers.AuthMethodBasic) {
bestAuthHeader = &auth se.authHeader = &auth
} }
} }
if bestAuthHeader == nil { if se.authHeader == nil {
return nil, fmt.Errorf("no authentication methods available") return fmt.Errorf("no authentication methods available")
} }
return &Sender{ return nil
user: user,
pass: pass,
authHeader: bestAuthHeader,
}, nil
} }
// AddAuthorization adds the Authorization header to a Request. // AddAuthorization adds the Authorization header to a Request.
@@ -53,10 +62,10 @@ func (se *Sender) AddAuthorization(req *base.Request) {
Method: se.authHeader.Method, Method: se.authHeader.Method,
} }
h.Username = se.user h.Username = se.User
if se.authHeader.Method == headers.AuthMethodBasic { if se.authHeader.Method == headers.AuthMethodBasic {
h.BasicPass = se.pass h.BasicPass = se.Pass
} else { // digest } else { // digest
h.Realm = se.authHeader.Realm h.Realm = se.authHeader.Realm
h.Nonce = se.authHeader.Nonce h.Nonce = se.authHeader.Nonce
@@ -64,10 +73,10 @@ func (se *Sender) AddAuthorization(req *base.Request) {
h.Algorithm = se.authHeader.Algorithm h.Algorithm = se.authHeader.Algorithm
if se.authHeader.Algorithm == nil || *se.authHeader.Algorithm == headers.AuthAlgorithmMD5 { if se.authHeader.Algorithm == nil || *se.authHeader.Algorithm == headers.AuthAlgorithmMD5 {
h.Response = md5Hex(md5Hex(se.user+":"+se.authHeader.Realm+":"+se.pass) + ":" + h.Response = md5Hex(md5Hex(se.User+":"+se.authHeader.Realm+":"+se.Pass) + ":" +
se.authHeader.Nonce + ":" + md5Hex(string(req.Method)+":"+urStr)) se.authHeader.Nonce + ":" + md5Hex(string(req.Method)+":"+urStr))
} else { // sha256 } else { // sha256
h.Response = sha256Hex(sha256Hex(se.user+":"+se.authHeader.Realm+":"+se.pass) + ":" + h.Response = sha256Hex(sha256Hex(se.User+":"+se.authHeader.Realm+":"+se.Pass) + ":" +
se.authHeader.Nonce + ":" + sha256Hex(string(req.Method)+":"+urStr)) se.authHeader.Nonce + ":" + sha256Hex(string(req.Method)+":"+urStr))
} }
} }

View File

@@ -32,6 +32,15 @@ type GlobalDecoder2Track interface {
PTSEqualsDTS(*rtp.Packet) bool PTSEqualsDTS(*rtp.Packet) bool
} }
// NewGlobalDecoder2 allocates a GlobalDecoder.
//
// Deprecated: replaced by GlobalDecoder2.Initialize().
func NewGlobalDecoder2() *GlobalDecoder2 {
d := &GlobalDecoder2{}
d.Initialize()
return d
}
// GlobalDecoder2 is a RTP timestamp decoder. // GlobalDecoder2 is a RTP timestamp decoder.
type GlobalDecoder2 struct { type GlobalDecoder2 struct {
mutex sync.Mutex mutex sync.Mutex
@@ -42,11 +51,9 @@ type GlobalDecoder2 struct {
tracks map[GlobalDecoder2Track]*globalDecoder2TrackData tracks map[GlobalDecoder2Track]*globalDecoder2TrackData
} }
// NewGlobalDecoder2 allocates a GlobalDecoder. // Initialize initializes a GlobalDecoder2.
func NewGlobalDecoder2() *GlobalDecoder2 { func (d *GlobalDecoder2) Initialize() {
return &GlobalDecoder2{ d.tracks = make(map[GlobalDecoder2Track]*globalDecoder2TrackData)
tracks: make(map[GlobalDecoder2Track]*globalDecoder2TrackData),
}
} }
// Decode decodes a timestamp. // Decode decodes a timestamp.

View File

@@ -348,7 +348,7 @@ func (sc *ServerConn) handleRequestInner(req *base.Request) (*base.Response, err
} }
if stream != nil { if stream != nil {
byts, _ := serverSideDescription(stream.desc).Marshal(multicast) byts, _ := serverSideDescription(stream.Desc).Marshal(multicast)
res.Body = byts res.Body = byts
} }
} }

View File

@@ -291,14 +291,18 @@ func TestServerPlayPath(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
defer s.Close() defer s.Close()
stream = NewServerStream(s, &description.Session{ stream = &ServerStream{
Server: s,
Desc: &description.Session{
Medias: []*description.Media{ Medias: []*description.Media{
testH264Media, testH264Media,
testH264Media, testH264Media,
testH264Media, testH264Media,
testH264Media, testH264Media,
}, },
}) },
}
stream.Initialize()
defer stream.Close() defer stream.Close()
nconn, err := net.Dial("tcp", "localhost:8554") nconn, err := net.Dial("tcp", "localhost:8554")
@@ -380,7 +384,12 @@ func TestServerPlaySetupErrors(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
defer s.Close() defer s.Close()
stream = NewServerStream(s, &description.Session{Medias: []*description.Media{testH264Media}}) stream = &ServerStream{
Server: s,
Desc: &description.Session{Medias: []*description.Media{testH264Media}},
}
stream.Initialize()
if ca == "closed stream" { if ca == "closed stream" {
stream.Close() stream.Close()
} else { } else {
@@ -547,7 +556,11 @@ func TestServerPlaySetupErrorSameUDPPortsAndIP(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
defer s.Close() defer s.Close()
stream = NewServerStream(s, &description.Session{Medias: []*description.Media{testH264Media}}) stream = &ServerStream{
Server: s,
Desc: &description.Session{Medias: []*description.Media{testH264Media}},
}
stream.Initialize()
defer stream.Close() defer stream.Close()
for i := 0; i < 2; i++ { for i := 0; i < 2; i++ {
@@ -723,7 +736,11 @@ func TestServerPlay(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
defer s.Close() defer s.Close()
stream = NewServerStream(s, &description.Session{Medias: []*description.Media{testH264Media}}) stream = &ServerStream{
Server: s,
Desc: &description.Session{Medias: []*description.Media{testH264Media}},
}
stream.Initialize()
defer stream.Close() defer stream.Close()
nconn, err := net.Dial("tcp", listenIP+":8554") nconn, err := net.Dial("tcp", listenIP+":8554")
@@ -1017,7 +1034,11 @@ func TestServerPlaySocketError(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
defer s.Close() defer s.Close()
stream = NewServerStream(s, &description.Session{Medias: []*description.Media{testH264Media}}) stream = &ServerStream{
Server: s,
Desc: &description.Session{Medias: []*description.Media{testH264Media}},
}
stream.Initialize()
func() { func() {
nconn, err := net.Dial("tcp", listenIP+":8554") nconn, err := net.Dial("tcp", listenIP+":8554")
@@ -1183,7 +1204,11 @@ func TestServerPlayDecodeErrors(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
defer s.Close() defer s.Close()
stream = NewServerStream(s, &description.Session{Medias: []*description.Media{testH264Media}}) stream = &ServerStream{
Server: s,
Desc: &description.Session{Medias: []*description.Media{testH264Media}},
}
stream.Initialize()
defer stream.Close() defer stream.Close()
nconn, err := net.Dial("tcp", "localhost:8554") nconn, err := net.Dial("tcp", "localhost:8554")
@@ -1301,7 +1326,11 @@ func TestServerPlayRTCPReport(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
defer s.Close() defer s.Close()
stream = NewServerStream(s, &description.Session{Medias: []*description.Media{testH264Media}}) stream = &ServerStream{
Server: s,
Desc: &description.Session{Medias: []*description.Media{testH264Media}},
}
stream.Initialize()
defer stream.Close() defer stream.Close()
nconn, err := net.Dial("tcp", "localhost:8554") nconn, err := net.Dial("tcp", "localhost:8554")
@@ -1420,7 +1449,11 @@ func TestServerPlayVLCMulticast(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
defer s.Close() defer s.Close()
stream = NewServerStream(s, &description.Session{Medias: []*description.Media{testH264Media}}) stream = &ServerStream{
Server: s,
Desc: &description.Session{Medias: []*description.Media{testH264Media}},
}
stream.Initialize()
defer stream.Close() defer stream.Close()
nconn, err := net.Dial("tcp", listenIP+":8554") nconn, err := net.Dial("tcp", listenIP+":8554")
@@ -1501,7 +1534,11 @@ func TestServerPlayTCPResponseBeforeFrames(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
defer s.Close() defer s.Close()
stream = NewServerStream(s, &description.Session{Medias: []*description.Media{testH264Media}}) stream = &ServerStream{
Server: s,
Desc: &description.Session{Medias: []*description.Media{testH264Media}},
}
stream.Initialize()
defer stream.Close() defer stream.Close()
nconn, err := net.Dial("tcp", "localhost:8554") nconn, err := net.Dial("tcp", "localhost:8554")
@@ -1588,7 +1625,11 @@ func TestServerPlayPause(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
defer s.Close() defer s.Close()
stream = NewServerStream(s, &description.Session{Medias: []*description.Media{testH264Media}}) stream = &ServerStream{
Server: s,
Desc: &description.Session{Medias: []*description.Media{testH264Media}},
}
stream.Initialize()
defer stream.Close() defer stream.Close()
nconn, err := net.Dial("tcp", "localhost:8554") nconn, err := net.Dial("tcp", "localhost:8554")
@@ -1681,7 +1722,11 @@ func TestServerPlayPlayPausePausePlay(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
defer s.Close() defer s.Close()
stream = NewServerStream(s, &description.Session{Medias: []*description.Media{testH264Media}}) stream = &ServerStream{
Server: s,
Desc: &description.Session{Medias: []*description.Media{testH264Media}},
}
stream.Initialize()
defer stream.Close() defer stream.Close()
nconn, err := net.Dial("tcp", "localhost:8554") nconn, err := net.Dial("tcp", "localhost:8554")
@@ -1764,7 +1809,11 @@ func TestServerPlayTimeout(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
defer s.Close() defer s.Close()
stream = NewServerStream(s, &description.Session{Medias: []*description.Media{testH264Media}}) stream = &ServerStream{
Server: s,
Desc: &description.Session{Medias: []*description.Media{testH264Media}},
}
stream.Initialize()
defer stream.Close() defer stream.Close()
nconn, err := net.Dial("tcp", "localhost:8554") nconn, err := net.Dial("tcp", "localhost:8554")
@@ -1850,7 +1899,11 @@ func TestServerPlayWithoutTeardown(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
defer s.Close() defer s.Close()
stream = NewServerStream(s, &description.Session{Medias: []*description.Media{testH264Media}}) stream = &ServerStream{
Server: s,
Desc: &description.Session{Medias: []*description.Media{testH264Media}},
}
stream.Initialize()
defer stream.Close() defer stream.Close()
nconn, err := net.Dial("tcp", "localhost:8554") nconn, err := net.Dial("tcp", "localhost:8554")
@@ -1922,7 +1975,11 @@ func TestServerPlayUDPChangeConn(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
defer s.Close() defer s.Close()
stream = NewServerStream(s, &description.Session{Medias: []*description.Media{testH264Media}}) stream = &ServerStream{
Server: s,
Desc: &description.Session{Medias: []*description.Media{testH264Media}},
}
stream.Initialize()
defer stream.Close() defer stream.Close()
sxID := "" sxID := ""
@@ -2006,7 +2063,11 @@ func TestServerPlayPartialMedias(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
defer s.Close() defer s.Close()
stream = NewServerStream(s, &description.Session{Medias: []*description.Media{testH264Media, testH264Media}}) stream = &ServerStream{
Server: s,
Desc: &description.Session{Medias: []*description.Media{testH264Media, testH264Media}},
}
stream.Initialize()
defer stream.Close() defer stream.Close()
nconn, err := net.Dial("tcp", "localhost:8554") nconn, err := net.Dial("tcp", "localhost:8554")
@@ -2112,7 +2173,9 @@ func TestServerPlayAdditionalInfos(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
defer s.Close() defer s.Close()
stream = NewServerStream(s, &description.Session{ stream = &ServerStream{
Server: s,
Desc: &description.Session{
Medias: []*description.Media{ Medias: []*description.Media{
{ {
Type: "application", Type: "application",
@@ -2123,7 +2186,9 @@ func TestServerPlayAdditionalInfos(t *testing.T) {
Formats: []format.Format{forma}, Formats: []format.Format{forma},
}, },
}, },
}) },
}
stream.Initialize()
defer stream.Close() defer stream.Close()
err = stream.WritePacketRTP(stream.Description().Medias[0], &rtp.Packet{ err = stream.WritePacketRTP(stream.Description().Medias[0], &rtp.Packet{
@@ -2238,7 +2303,9 @@ func TestServerPlayNoInterleavedIDs(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
defer s.Close() defer s.Close()
stream = NewServerStream(s, &description.Session{ stream = &ServerStream{
Server: s,
Desc: &description.Session{
Medias: []*description.Media{ Medias: []*description.Media{
{ {
Type: "application", Type: "application",
@@ -2249,7 +2316,9 @@ func TestServerPlayNoInterleavedIDs(t *testing.T) {
Formats: []format.Format{forma}, Formats: []format.Format{forma},
}, },
}, },
}) },
}
stream.Initialize()
defer stream.Close() defer stream.Close()
nconn, err := net.Dial("tcp", "localhost:8554") nconn, err := net.Dial("tcp", "localhost:8554")
@@ -2318,7 +2387,12 @@ func TestServerPlayStreamStats(t *testing.T) {
err := s.Start() err := s.Start()
require.NoError(t, err) require.NoError(t, err)
defer s.Close() defer s.Close()
stream = NewServerStream(s, &description.Session{Medias: []*description.Media{testH264Media}})
stream = &ServerStream{
Server: s,
Desc: &description.Session{Medias: []*description.Media{testH264Media}},
}
stream.Initialize()
defer stream.Close() defer stream.Close()
for _, transport := range []string{"tcp", "multicast"} { for _, transport := range []string{"tcp", "multicast"} {

View File

@@ -292,8 +292,12 @@ func TestServerRecordPath(t *testing.T) {
s = &Server{ s = &Server{
Handler: &testServerHandler{ Handler: &testServerHandler{
onAnnounce: func(ctx *ServerHandlerOnAnnounceCtx) (*base.Response, error) { onAnnounce: func(ctx *ServerHandlerOnAnnounceCtx) (*base.Response, error) {
// make sure that media URLs are not overridden by NewServerStream() // make sure that media URLs are not overridden by ServerStream.Initialize()
stream := NewServerStream(s, ctx.Description) stream := &ServerStream{
Server: s,
Desc: ctx.Description,
}
stream.Initialize()
defer stream.Close() defer stream.Close()
return &base.Response{ return &base.Response{

View File

@@ -1024,7 +1024,7 @@ func (ss *ServerSession) handleRequestInner(sc *ServerConn, req *base.Request) (
var medi *description.Media var medi *description.Media
switch ss.state { switch ss.state {
case ServerSessionStateInitial, ServerSessionStatePrePlay: // play case ServerSessionStateInitial, ServerSessionStatePrePlay: // play
medi = findMediaByTrackID(stream.desc.Medias, trackID) medi = findMediaByTrackID(stream.Desc.Medias, trackID)
default: // record default: // record
medi = findMediaByURL(ss.announcedDesc.Medias, path, query, req.URL) medi = findMediaByURL(ss.announcedDesc.Medias, path, query, req.URL)
} }
@@ -1183,7 +1183,8 @@ func (ss *ServerSession) handleRequestInner(sc *ServerConn, req *base.Request) (
v := ss.s.timeNow().Unix() v := ss.s.timeNow().Unix()
ss.udpLastPacketTime = &v ss.udpLastPacketTime = &v
ss.timeDecoder = rtptime.NewGlobalDecoder2() ss.timeDecoder = &rtptime.GlobalDecoder2{}
ss.timeDecoder.Initialize()
for _, sm := range ss.setuppedMedias { for _, sm := range ss.setuppedMedias {
sm.start() sm.start()
@@ -1269,7 +1270,8 @@ func (ss *ServerSession) handleRequestInner(sc *ServerConn, req *base.Request) (
v := ss.s.timeNow().Unix() v := ss.s.timeNow().Unix()
ss.udpLastPacketTime = &v ss.udpLastPacketTime = &v
ss.timeDecoder = rtptime.NewGlobalDecoder2() ss.timeDecoder = &rtptime.GlobalDecoder2{}
ss.timeDecoder.Initialize()
for _, sm := range ss.setuppedMedias { for _, sm := range ss.setuppedMedias {
sm.start() sm.start()

View File

@@ -24,14 +24,26 @@ func firstFormat(formats map[uint8]*serverStreamFormat) *serverStreamFormat {
return formats[firstKey] return formats[firstKey]
} }
// NewServerStream allocates a ServerStream.
//
// Deprecated: replaced by ServerStream.Initialize().
func NewServerStream(s *Server, desc *description.Session) *ServerStream {
st := &ServerStream{
Server: s,
Desc: desc,
}
st.Initialize()
return st
}
// ServerStream represents a data stream. // ServerStream represents a data stream.
// This is in charge of // This is in charge of
// - distributing the stream to each reader // - distributing the stream to each reader
// - allocating multicast listeners // - allocating multicast listeners
// - gathering infos about the stream in order to generate SSRC and RTP-Info // - gathering infos about the stream in order to generate SSRC and RTP-Info
type ServerStream struct { type ServerStream struct {
s *Server Server *Server
desc *description.Session Desc *description.Session
mutex sync.RWMutex mutex sync.RWMutex
readers map[*ServerSession]struct{} readers map[*ServerSession]struct{}
@@ -41,17 +53,13 @@ type ServerStream struct {
closed bool closed bool
} }
// NewServerStream allocates a ServerStream. // Initialize initializes a ServerStream.
func NewServerStream(s *Server, desc *description.Session) *ServerStream { func (st *ServerStream) Initialize() {
st := &ServerStream{ st.readers = make(map[*ServerSession]struct{})
s: s, st.activeUnicastReaders = make(map[*ServerSession]struct{})
desc: desc,
readers: make(map[*ServerSession]struct{}),
activeUnicastReaders: make(map[*ServerSession]struct{}),
}
st.medias = make(map[*description.Media]*serverStreamMedia, len(desc.Medias)) st.medias = make(map[*description.Media]*serverStreamMedia, len(st.Desc.Medias))
for i, medi := range desc.Medias { for i, medi := range st.Desc.Medias {
sm := &serverStreamMedia{ sm := &serverStreamMedia{
st: st, st: st,
media: medi, media: medi,
@@ -60,8 +68,6 @@ func NewServerStream(s *Server, desc *description.Session) *ServerStream {
sm.initialize() sm.initialize()
st.medias[medi] = sm st.medias[medi] = sm
} }
return st
} }
// Close closes a ServerStream. // Close closes a ServerStream.
@@ -91,8 +97,10 @@ func (st *ServerStream) BytesSent() uint64 {
} }
// Description returns the description of the stream. // Description returns the description of the stream.
//
// Deprecated: use ServerStream.Desc.
func (st *ServerStream) Description() *description.Session { func (st *ServerStream) Description() *description.Session {
return st.desc return st.Desc
} }
// Stats returns stream statistics. // Stats returns stream statistics.
@@ -239,7 +247,7 @@ func (st *ServerStream) readerAdd(
if st.multicastReaderCount == 0 { if st.multicastReaderCount == 0 {
for _, media := range st.medias { for _, media := range st.medias {
mw := &serverMulticastWriter{ mw := &serverMulticastWriter{
s: st.s, s: st.Server,
} }
err := mw.initialize() err := mw.initialize()
if err != nil { if err != nil {
@@ -316,13 +324,13 @@ func (st *ServerStream) readerSetInactive(ss *ServerSession) {
// WritePacketRTP writes a RTP packet to all the readers of the stream. // WritePacketRTP writes a RTP packet to all the readers of the stream.
func (st *ServerStream) WritePacketRTP(medi *description.Media, pkt *rtp.Packet) error { func (st *ServerStream) WritePacketRTP(medi *description.Media, pkt *rtp.Packet) error {
return st.WritePacketRTPWithNTP(medi, pkt, st.s.timeNow()) return st.WritePacketRTPWithNTP(medi, pkt, st.Server.timeNow())
} }
// WritePacketRTPWithNTP writes a RTP packet to all the readers of the stream. // WritePacketRTPWithNTP writes a RTP packet to all the readers of the stream.
// ntp is the absolute time of the packet, and is sent with periodic RTCP sender reports. // ntp is the absolute time of the packet, and is sent with periodic RTCP sender reports.
func (st *ServerStream) WritePacketRTPWithNTP(medi *description.Media, pkt *rtp.Packet, ntp time.Time) error { func (st *ServerStream) WritePacketRTPWithNTP(medi *description.Media, pkt *rtp.Packet, ntp time.Time) error {
byts := make([]byte, st.s.MaxPacketSize) byts := make([]byte, st.Server.MaxPacketSize)
n, err := pkt.MarshalTo(byts) n, err := pkt.MarshalTo(byts)
if err != nil { if err != nil {
return err return err

View File

@@ -24,10 +24,10 @@ func (sf *serverStreamFormat) initialize() {
sf.rtcpSender = &rtcpsender.RTCPSender{ sf.rtcpSender = &rtcpsender.RTCPSender{
ClockRate: sf.format.ClockRate(), ClockRate: sf.format.ClockRate(),
Period: sf.sm.st.s.senderReportPeriod, Period: sf.sm.st.Server.senderReportPeriod,
TimeNow: sf.sm.st.s.timeNow, TimeNow: sf.sm.st.Server.timeNow,
WritePacketRTCP: func(pkt rtcp.Packet) { WritePacketRTCP: func(pkt rtcp.Packet) {
if !sf.sm.st.s.DisableRTCPSenderReports { if !sf.sm.st.Server.DisableRTCPSenderReports {
sf.sm.st.WritePacketRTCP(sf.sm.media, pkt) //nolint:errcheck sf.sm.st.WritePacketRTCP(sf.sm.media, pkt) //nolint:errcheck
} }
}, },

View File

@@ -388,7 +388,11 @@ func TestServerErrorMethodNotImplemented(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
defer s.Close() defer s.Close()
stream := NewServerStream(s, &description.Session{Medias: []*description.Media{testH264Media}}) stream := &ServerStream{
Server: s,
Desc: &description.Session{Medias: []*description.Media{testH264Media}},
}
stream.Initialize()
defer stream.Close() defer stream.Close()
h.stream = stream h.stream = stream
@@ -481,7 +485,11 @@ func TestServerErrorTCPTwoConnOneSession(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
defer s.Close() defer s.Close()
stream = NewServerStream(s, &description.Session{Medias: []*description.Media{testH264Media}}) stream = &ServerStream{
Server: s,
Desc: &description.Session{Medias: []*description.Media{testH264Media}},
}
stream.Initialize()
defer stream.Close() defer stream.Close()
nconn1, err := net.Dial("tcp", "localhost:8554") nconn1, err := net.Dial("tcp", "localhost:8554")
@@ -562,7 +570,11 @@ func TestServerErrorTCPOneConnTwoSessions(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
defer s.Close() defer s.Close()
stream = NewServerStream(s, &description.Session{Medias: []*description.Media{testH264Media}}) stream = &ServerStream{
Server: s,
Desc: &description.Session{Medias: []*description.Media{testH264Media}},
}
stream.Initialize()
defer stream.Close() defer stream.Close()
nconn, err := net.Dial("tcp", "localhost:8554") nconn, err := net.Dial("tcp", "localhost:8554")
@@ -625,7 +637,11 @@ func TestServerSetupMultipleTransports(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
defer s.Close() defer s.Close()
stream = NewServerStream(s, &description.Session{Medias: []*description.Media{testH264Media}}) stream = &ServerStream{
Server: s,
Desc: &description.Session{Medias: []*description.Media{testH264Media}},
}
stream.Initialize()
defer stream.Close() defer stream.Close()
nconn, err := net.Dial("tcp", "localhost:8554") nconn, err := net.Dial("tcp", "localhost:8554")
@@ -721,7 +737,11 @@ func TestServerGetSetParameter(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
defer s.Close() defer s.Close()
stream = NewServerStream(s, &description.Session{Medias: []*description.Media{testH264Media}}) stream = &ServerStream{
Server: s,
Desc: &description.Session{Medias: []*description.Media{testH264Media}},
}
stream.Initialize()
defer stream.Close() defer stream.Close()
nconn, err := net.Dial("tcp", "localhost:8554") nconn, err := net.Dial("tcp", "localhost:8554")
@@ -865,7 +885,11 @@ func TestServerSessionClose(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
defer s.Close() defer s.Close()
stream = NewServerStream(s, &description.Session{Medias: []*description.Media{testH264Media}}) stream = &ServerStream{
Server: s,
Desc: &description.Session{Medias: []*description.Media{testH264Media}},
}
stream.Initialize()
defer stream.Close() defer stream.Close()
nconn, err := net.Dial("tcp", "localhost:8554") nconn, err := net.Dial("tcp", "localhost:8554")
@@ -940,7 +964,11 @@ func TestServerSessionAutoClose(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
defer s.Close() defer s.Close()
stream = NewServerStream(s, &description.Session{Medias: []*description.Media{testH264Media}}) stream = &ServerStream{
Server: s,
Desc: &description.Session{Medias: []*description.Media{testH264Media}},
}
stream.Initialize()
defer stream.Close() defer stream.Close()
nconn, err := net.Dial("tcp", "localhost:8554") nconn, err := net.Dial("tcp", "localhost:8554")
@@ -1002,7 +1030,11 @@ func TestServerSessionTeardown(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
defer s.Close() defer s.Close()
stream = NewServerStream(s, &description.Session{Medias: []*description.Media{testH264Media}}) stream = &ServerStream{
Server: s,
Desc: &description.Session{Medias: []*description.Media{testH264Media}},
}
stream.Initialize()
defer stream.Close() defer stream.Close()
nconn, err := net.Dial("tcp", "localhost:8554") nconn, err := net.Dial("tcp", "localhost:8554")
@@ -1095,7 +1127,12 @@ func TestServerAuth(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, base.StatusUnauthorized, res.StatusCode) require.Equal(t, base.StatusUnauthorized, res.StatusCode)
sender, err := auth.NewSender(res.Header["WWW-Authenticate"], "myuser", "mypass") sender := &auth.Sender{
WWWAuth: res.Header["WWW-Authenticate"],
User: "myuser",
Pass: "mypass",
}
err = sender.Initialize()
require.NoError(t, err) require.NoError(t, err)
sender.AddAuthorization(&req) sender.AddAuthorization(&req)
@@ -1153,7 +1190,12 @@ func TestServerAuthFail(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, base.StatusUnauthorized, res.StatusCode) require.Equal(t, base.StatusUnauthorized, res.StatusCode)
sender, err := auth.NewSender(res.Header["WWW-Authenticate"], "myuser", "mypass") sender := &auth.Sender{
WWWAuth: res.Header["WWW-Authenticate"],
User: "myuser",
Pass: "mypass",
}
err = sender.Initialize()
require.NoError(t, err) require.NoError(t, err)
sender.AddAuthorization(&req) sender.AddAuthorization(&req)