diff --git a/client.go b/client.go index 0ae76524..41eb1531 100644 --- a/client.go +++ b/client.go @@ -854,7 +854,8 @@ func (c *Client) trySwitchingProtocol2(medi *description.Media, baseURL *base.UR } func (c *Client) startTransportRoutines() { - c.timeDecoder = rtptime.NewGlobalDecoder2() + c.timeDecoder = &rtptime.GlobalDecoder2{} + c.timeDecoder.Initialize() for _, cm := range c.setuppedMedias { cm.start() @@ -1025,7 +1026,7 @@ func (c *Client) do(req *base.Request, skipResponse bool) (*base.Response, error // get session from response if v, ok := res.Header["Session"]; ok { var sx headers.Session - err := sx.Unmarshal(v) + err = sx.Unmarshal(v) if err != nil { return nil, liberrors.ErrClientSessionHeaderInvalid{Err: err} } @@ -1041,7 +1042,12 @@ func (c *Client) do(req *base.Request, skipResponse bool) (*base.Response, error pass, _ := req.URL.User.Password() user := req.URL.User.Username() - sender, err := auth.NewSender(res.Header["WWW-Authenticate"], user, pass) + sender := &auth.Sender{ + WWWAuth: res.Header["WWW-Authenticate"], + User: user, + Pass: pass, + } + err = sender.Initialize() if err != nil { return nil, liberrors.ErrClientAuthSetup{Err: err} } diff --git a/examples/client-play-format-h264-mpeg4audio-to-disk/main.go b/examples/client-play-format-h264-mpeg4audio-to-disk/main.go index 8b5f98ed..a23fd041 100644 --- a/examples/client-play-format-h264-mpeg4audio-to-disk/main.go +++ b/examples/client-play-format-h264-mpeg4audio-to-disk/main.go @@ -12,7 +12,7 @@ import ( // This example shows how to // 1. connect to a RTSP server -// 2. check if there's a H264 stream and an MPEG-4 audio format +// 2. check if there's a H264 stream and a MPEG-4 audio stream // 3. save the content of those formats in a file in MPEG-TS format func main() { diff --git a/examples/client-play-format-h264/main.go b/examples/client-play-format-h264/main.go index 7ded0ad3..43947301 100644 --- a/examples/client-play-format-h264/main.go +++ b/examples/client-play-format-h264/main.go @@ -15,7 +15,7 @@ import ( // This example shows how to // 1. connect to a RTSP server -// 2. check if there's an H264 format +// 2. check if there's an H264 stream // 3. decode the H264 stream into RGBA frames // This example requires the FFmpeg libraries, that can be installed with this command: diff --git a/examples/client-play-format-h265/main.go b/examples/client-play-format-h265/main.go index cf8fd7e1..053cf448 100644 --- a/examples/client-play-format-h265/main.go +++ b/examples/client-play-format-h265/main.go @@ -15,7 +15,7 @@ import ( // This example shows how to // 1. connect to a RTSP server -// 2. check if there's an H265 format +// 2. check if there's a H265 stream // 3. decode the H265 stream into RGBA frames // This example requires the FFmpeg libraries, that can be installed with this command: diff --git a/examples/client-play-format-lpcm/main.go b/examples/client-play-format-lpcm/main.go index cdbdc9c1..6f13a407 100644 --- a/examples/client-play-format-lpcm/main.go +++ b/examples/client-play-format-lpcm/main.go @@ -11,7 +11,7 @@ import ( // This example shows how to // 1. connect to a RTSP server -// 2. check if there's an LPCM format +// 2. check if there's a LPCM stream // 3. get LPCM samples of that format func main() { diff --git a/examples/client-play-format-mpeg4audio-to-disk/main.go b/examples/client-play-format-mpeg4audio-to-disk/main.go index bb56fbb9..cec96a4d 100644 --- a/examples/client-play-format-mpeg4audio-to-disk/main.go +++ b/examples/client-play-format-mpeg4audio-to-disk/main.go @@ -12,7 +12,7 @@ import ( // This example shows how to // 1. connect to a RTSP server -// 2. check if there's an MPEG-4 audio format +// 2. check if there's a MPEG-4 audio stream // 3. save the content of the format in a file in MPEG-TS format func main() { diff --git a/examples/client-play-format-mpeg4audio/main.go b/examples/client-play-format-mpeg4audio/main.go index 2805b2fe..e3c1c88f 100644 --- a/examples/client-play-format-mpeg4audio/main.go +++ b/examples/client-play-format-mpeg4audio/main.go @@ -11,7 +11,7 @@ import ( // This example shows how to // 1. connect to a RTSP server -// 2. check if there's an MPEG-4 audio format +// 2. check if there's a MPEG-4 audio stream // 3. get access units of that format func main() { diff --git a/examples/client-play-format-opus/main.go b/examples/client-play-format-opus/main.go index 58668e5b..316ff727 100644 --- a/examples/client-play-format-opus/main.go +++ b/examples/client-play-format-opus/main.go @@ -11,7 +11,7 @@ import ( // This example shows how to // 1. connect to a RTSP server -// 2. check if there's an Opus format +// 2. check if there's an Opus stream // 3. get Opus packets of that format func main() { diff --git a/examples/client-play-format-vp8/main.go b/examples/client-play-format-vp8/main.go index a2f71543..a9d04721 100644 --- a/examples/client-play-format-vp8/main.go +++ b/examples/client-play-format-vp8/main.go @@ -14,7 +14,7 @@ import ( // This example shows how to // 1. connect to a RTSP server -// 2. check if there's an VP8 format +// 2. check if there's a VP8 stream // 3. decode the VP8 stream into RGBA frames // This example requires the FFmpeg libraries, that can be installed with this command: diff --git a/examples/client-play-format-vp9/main.go b/examples/client-play-format-vp9/main.go index aaabcfdf..58f5134d 100644 --- a/examples/client-play-format-vp9/main.go +++ b/examples/client-play-format-vp9/main.go @@ -14,7 +14,7 @@ import ( // This example shows how to // 1. connect to a RTSP server -// 2. check if there's an VP9 format +// 2. check if there's a VP9 stream // 3. decode the VP9 stream into RGBA frames // This example requires the FFmpeg libraries, that can be installed with this command: diff --git a/examples/proxy/server.go b/examples/proxy/server.go index 680817d4..32334fb6 100644 --- a/examples/proxy/server.go +++ b/examples/proxy/server.go @@ -98,7 +98,11 @@ func (s *server) OnPlay(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Response, func (s *server) setStreamReady(desc *description.Session) *gortsplib.ServerStream { s.mutex.Lock() defer s.mutex.Unlock() - s.stream = gortsplib.NewServerStream(s.s, desc) + s.stream = &gortsplib.ServerStream{ + Server: s.s, + Desc: desc, + } + s.stream.Initialize() return s.stream } diff --git a/examples/server-auth/main.go b/examples/server-auth/main.go index 2fab626f..fb1b4b33 100644 --- a/examples/server-auth/main.go +++ b/examples/server-auth/main.go @@ -117,7 +117,11 @@ func (sh *serverHandler) OnAnnounce(ctx *gortsplib.ServerHandlerOnAnnounceCtx) ( } // create the stream and save the publisher - sh.stream = gortsplib.NewServerStream(sh.s, ctx.Description) + sh.stream = &gortsplib.ServerStream{ + Server: sh.s, + Desc: ctx.Description, + } + sh.stream.Initialize() sh.publisher = ctx.Session return &base.Response{ diff --git a/examples/server-tls/main.go b/examples/server-tls/main.go index 7af7102a..cb62d501 100644 --- a/examples/server-tls/main.go +++ b/examples/server-tls/main.go @@ -89,7 +89,11 @@ func (sh *serverHandler) OnAnnounce(ctx *gortsplib.ServerHandlerOnAnnounceCtx) ( } // create the stream and save the publisher - sh.stream = gortsplib.NewServerStream(sh.s, ctx.Description) + sh.stream = &gortsplib.ServerStream{ + Server: sh.s, + Desc: ctx.Description, + } + sh.stream.Initialize() sh.publisher = ctx.Session return &base.Response{ diff --git a/examples/server/main.go b/examples/server/main.go index 1714c60b..270edfdc 100644 --- a/examples/server/main.go +++ b/examples/server/main.go @@ -88,7 +88,11 @@ func (sh *serverHandler) OnAnnounce(ctx *gortsplib.ServerHandlerOnAnnounceCtx) ( } // create the stream and save the publisher - sh.stream = gortsplib.NewServerStream(sh.s, ctx.Description) + sh.stream = &gortsplib.ServerStream{ + Server: sh.s, + Desc: ctx.Description, + } + sh.stream.Initialize() sh.publisher = ctx.Session return &base.Response{ diff --git a/internal/highleveltests/server_test.go b/internal/highleveltests/server_test.go index c6344e43..9507e29a 100644 --- a/internal/highleveltests/server_test.go +++ b/internal/highleveltests/server_test.go @@ -328,7 +328,11 @@ func TestServerRecordRead(t *testing.T) { }, fmt.Errorf("someone is already publishing") } - stream = gortsplib.NewServerStream(s, ctx.Description) + stream = &gortsplib.ServerStream{ + Server: s, + Desc: ctx.Description, + } + stream.Initialize() publisher = ctx.Session return &base.Response{ diff --git a/pkg/auth/sender.go b/pkg/auth/sender.go index 6c337280..1efcf38f 100644 --- a/pkg/auth/sender.go +++ b/pkg/auth/sender.go @@ -7,42 +7,51 @@ import ( "github.com/bluenviron/gortsplib/v4/pkg/headers" ) +// NewSender allocates a Sender. +// +// Deprecated: replaced by Sender.Initialize(). +func NewSender(wwwAuth base.HeaderValue, user string, pass string) (*Sender, error) { + s := &Sender{ + WWWAuth: wwwAuth, + User: user, + Pass: pass, + } + err := s.Initialize() + return s, err +} + // Sender allows to send credentials. +// It requires a WWW-Authenticate header (provided by the server) +// and a set of credentials. type Sender struct { - user string - pass string + WWWAuth base.HeaderValue + User string + Pass string + authHeader *headers.Authenticate } -// NewSender allocates a Sender. -// It requires a WWW-Authenticate header (provided by the server) -// and a set of credentials. -func NewSender(wwwAuth base.HeaderValue, user string, pass string) (*Sender, error) { - var bestAuthHeader *headers.Authenticate - - for _, v := range wwwAuth { +// Initialize initializes a Sender. +func (se *Sender) Initialize() error { + for _, v := range se.WWWAuth { var auth headers.Authenticate err := auth.Unmarshal(base.HeaderValue{v}) if err != nil { continue // ignore unrecognized headers } - if bestAuthHeader == nil || + if se.authHeader == nil || (auth.Algorithm != nil && *auth.Algorithm == headers.AuthAlgorithmSHA256) || - (bestAuthHeader.Method == headers.AuthMethodBasic) { - bestAuthHeader = &auth + (se.authHeader.Method == headers.AuthMethodBasic) { + se.authHeader = &auth } } - if bestAuthHeader == nil { - return nil, fmt.Errorf("no authentication methods available") + if se.authHeader == nil { + return fmt.Errorf("no authentication methods available") } - return &Sender{ - user: user, - pass: pass, - authHeader: bestAuthHeader, - }, nil + return nil } // AddAuthorization adds the Authorization header to a Request. @@ -53,10 +62,10 @@ func (se *Sender) AddAuthorization(req *base.Request) { Method: se.authHeader.Method, } - h.Username = se.user + h.Username = se.User if se.authHeader.Method == headers.AuthMethodBasic { - h.BasicPass = se.pass + h.BasicPass = se.Pass } else { // digest h.Realm = se.authHeader.Realm h.Nonce = se.authHeader.Nonce @@ -64,10 +73,10 @@ func (se *Sender) AddAuthorization(req *base.Request) { h.Algorithm = se.authHeader.Algorithm if se.authHeader.Algorithm == nil || *se.authHeader.Algorithm == headers.AuthAlgorithmMD5 { - h.Response = md5Hex(md5Hex(se.user+":"+se.authHeader.Realm+":"+se.pass) + ":" + + h.Response = md5Hex(md5Hex(se.User+":"+se.authHeader.Realm+":"+se.Pass) + ":" + se.authHeader.Nonce + ":" + md5Hex(string(req.Method)+":"+urStr)) } else { // sha256 - h.Response = sha256Hex(sha256Hex(se.user+":"+se.authHeader.Realm+":"+se.pass) + ":" + + h.Response = sha256Hex(sha256Hex(se.User+":"+se.authHeader.Realm+":"+se.Pass) + ":" + se.authHeader.Nonce + ":" + sha256Hex(string(req.Method)+":"+urStr)) } } diff --git a/pkg/rtptime/global_decoder2.go b/pkg/rtptime/global_decoder2.go index 68644e7a..30f68988 100644 --- a/pkg/rtptime/global_decoder2.go +++ b/pkg/rtptime/global_decoder2.go @@ -32,6 +32,15 @@ type GlobalDecoder2Track interface { PTSEqualsDTS(*rtp.Packet) bool } +// NewGlobalDecoder2 allocates a GlobalDecoder. +// +// Deprecated: replaced by GlobalDecoder2.Initialize(). +func NewGlobalDecoder2() *GlobalDecoder2 { + d := &GlobalDecoder2{} + d.Initialize() + return d +} + // GlobalDecoder2 is a RTP timestamp decoder. type GlobalDecoder2 struct { mutex sync.Mutex @@ -42,11 +51,9 @@ type GlobalDecoder2 struct { tracks map[GlobalDecoder2Track]*globalDecoder2TrackData } -// NewGlobalDecoder2 allocates a GlobalDecoder. -func NewGlobalDecoder2() *GlobalDecoder2 { - return &GlobalDecoder2{ - tracks: make(map[GlobalDecoder2Track]*globalDecoder2TrackData), - } +// Initialize initializes a GlobalDecoder2. +func (d *GlobalDecoder2) Initialize() { + d.tracks = make(map[GlobalDecoder2Track]*globalDecoder2TrackData) } // Decode decodes a timestamp. diff --git a/server_conn.go b/server_conn.go index ba2bf7a6..021f02e9 100644 --- a/server_conn.go +++ b/server_conn.go @@ -348,7 +348,7 @@ func (sc *ServerConn) handleRequestInner(req *base.Request) (*base.Response, err } if stream != nil { - byts, _ := serverSideDescription(stream.desc).Marshal(multicast) + byts, _ := serverSideDescription(stream.Desc).Marshal(multicast) res.Body = byts } } diff --git a/server_play_test.go b/server_play_test.go index 679ae61e..108844c8 100644 --- a/server_play_test.go +++ b/server_play_test.go @@ -291,14 +291,18 @@ func TestServerPlayPath(t *testing.T) { require.NoError(t, err) defer s.Close() - stream = NewServerStream(s, &description.Session{ - Medias: []*description.Media{ - testH264Media, - testH264Media, - testH264Media, - testH264Media, + stream = &ServerStream{ + Server: s, + Desc: &description.Session{ + Medias: []*description.Media{ + testH264Media, + testH264Media, + testH264Media, + testH264Media, + }, }, - }) + } + stream.Initialize() defer stream.Close() nconn, err := net.Dial("tcp", "localhost:8554") @@ -380,7 +384,12 @@ func TestServerPlaySetupErrors(t *testing.T) { require.NoError(t, err) defer s.Close() - stream = NewServerStream(s, &description.Session{Medias: []*description.Media{testH264Media}}) + stream = &ServerStream{ + Server: s, + Desc: &description.Session{Medias: []*description.Media{testH264Media}}, + } + stream.Initialize() + if ca == "closed stream" { stream.Close() } else { @@ -547,7 +556,11 @@ func TestServerPlaySetupErrorSameUDPPortsAndIP(t *testing.T) { require.NoError(t, err) defer s.Close() - stream = NewServerStream(s, &description.Session{Medias: []*description.Media{testH264Media}}) + stream = &ServerStream{ + Server: s, + Desc: &description.Session{Medias: []*description.Media{testH264Media}}, + } + stream.Initialize() defer stream.Close() for i := 0; i < 2; i++ { @@ -723,7 +736,11 @@ func TestServerPlay(t *testing.T) { require.NoError(t, err) defer s.Close() - stream = NewServerStream(s, &description.Session{Medias: []*description.Media{testH264Media}}) + stream = &ServerStream{ + Server: s, + Desc: &description.Session{Medias: []*description.Media{testH264Media}}, + } + stream.Initialize() defer stream.Close() nconn, err := net.Dial("tcp", listenIP+":8554") @@ -1017,7 +1034,11 @@ func TestServerPlaySocketError(t *testing.T) { require.NoError(t, err) defer s.Close() - stream = NewServerStream(s, &description.Session{Medias: []*description.Media{testH264Media}}) + stream = &ServerStream{ + Server: s, + Desc: &description.Session{Medias: []*description.Media{testH264Media}}, + } + stream.Initialize() func() { nconn, err := net.Dial("tcp", listenIP+":8554") @@ -1183,7 +1204,11 @@ func TestServerPlayDecodeErrors(t *testing.T) { require.NoError(t, err) defer s.Close() - stream = NewServerStream(s, &description.Session{Medias: []*description.Media{testH264Media}}) + stream = &ServerStream{ + Server: s, + Desc: &description.Session{Medias: []*description.Media{testH264Media}}, + } + stream.Initialize() defer stream.Close() nconn, err := net.Dial("tcp", "localhost:8554") @@ -1301,7 +1326,11 @@ func TestServerPlayRTCPReport(t *testing.T) { require.NoError(t, err) defer s.Close() - stream = NewServerStream(s, &description.Session{Medias: []*description.Media{testH264Media}}) + stream = &ServerStream{ + Server: s, + Desc: &description.Session{Medias: []*description.Media{testH264Media}}, + } + stream.Initialize() defer stream.Close() nconn, err := net.Dial("tcp", "localhost:8554") @@ -1420,7 +1449,11 @@ func TestServerPlayVLCMulticast(t *testing.T) { require.NoError(t, err) defer s.Close() - stream = NewServerStream(s, &description.Session{Medias: []*description.Media{testH264Media}}) + stream = &ServerStream{ + Server: s, + Desc: &description.Session{Medias: []*description.Media{testH264Media}}, + } + stream.Initialize() defer stream.Close() nconn, err := net.Dial("tcp", listenIP+":8554") @@ -1501,7 +1534,11 @@ func TestServerPlayTCPResponseBeforeFrames(t *testing.T) { require.NoError(t, err) defer s.Close() - stream = NewServerStream(s, &description.Session{Medias: []*description.Media{testH264Media}}) + stream = &ServerStream{ + Server: s, + Desc: &description.Session{Medias: []*description.Media{testH264Media}}, + } + stream.Initialize() defer stream.Close() nconn, err := net.Dial("tcp", "localhost:8554") @@ -1588,7 +1625,11 @@ func TestServerPlayPause(t *testing.T) { require.NoError(t, err) defer s.Close() - stream = NewServerStream(s, &description.Session{Medias: []*description.Media{testH264Media}}) + stream = &ServerStream{ + Server: s, + Desc: &description.Session{Medias: []*description.Media{testH264Media}}, + } + stream.Initialize() defer stream.Close() nconn, err := net.Dial("tcp", "localhost:8554") @@ -1681,7 +1722,11 @@ func TestServerPlayPlayPausePausePlay(t *testing.T) { require.NoError(t, err) defer s.Close() - stream = NewServerStream(s, &description.Session{Medias: []*description.Media{testH264Media}}) + stream = &ServerStream{ + Server: s, + Desc: &description.Session{Medias: []*description.Media{testH264Media}}, + } + stream.Initialize() defer stream.Close() nconn, err := net.Dial("tcp", "localhost:8554") @@ -1764,7 +1809,11 @@ func TestServerPlayTimeout(t *testing.T) { require.NoError(t, err) defer s.Close() - stream = NewServerStream(s, &description.Session{Medias: []*description.Media{testH264Media}}) + stream = &ServerStream{ + Server: s, + Desc: &description.Session{Medias: []*description.Media{testH264Media}}, + } + stream.Initialize() defer stream.Close() nconn, err := net.Dial("tcp", "localhost:8554") @@ -1850,7 +1899,11 @@ func TestServerPlayWithoutTeardown(t *testing.T) { require.NoError(t, err) defer s.Close() - stream = NewServerStream(s, &description.Session{Medias: []*description.Media{testH264Media}}) + stream = &ServerStream{ + Server: s, + Desc: &description.Session{Medias: []*description.Media{testH264Media}}, + } + stream.Initialize() defer stream.Close() nconn, err := net.Dial("tcp", "localhost:8554") @@ -1922,7 +1975,11 @@ func TestServerPlayUDPChangeConn(t *testing.T) { require.NoError(t, err) defer s.Close() - stream = NewServerStream(s, &description.Session{Medias: []*description.Media{testH264Media}}) + stream = &ServerStream{ + Server: s, + Desc: &description.Session{Medias: []*description.Media{testH264Media}}, + } + stream.Initialize() defer stream.Close() sxID := "" @@ -2006,7 +2063,11 @@ func TestServerPlayPartialMedias(t *testing.T) { require.NoError(t, err) defer s.Close() - stream = NewServerStream(s, &description.Session{Medias: []*description.Media{testH264Media, testH264Media}}) + stream = &ServerStream{ + Server: s, + Desc: &description.Session{Medias: []*description.Media{testH264Media, testH264Media}}, + } + stream.Initialize() defer stream.Close() nconn, err := net.Dial("tcp", "localhost:8554") @@ -2112,18 +2173,22 @@ func TestServerPlayAdditionalInfos(t *testing.T) { require.NoError(t, err) defer s.Close() - stream = NewServerStream(s, &description.Session{ - Medias: []*description.Media{ - { - Type: "application", - Formats: []format.Format{forma}, - }, - { - Type: "application", - Formats: []format.Format{forma}, + stream = &ServerStream{ + Server: s, + Desc: &description.Session{ + Medias: []*description.Media{ + { + Type: "application", + Formats: []format.Format{forma}, + }, + { + Type: "application", + Formats: []format.Format{forma}, + }, }, }, - }) + } + stream.Initialize() defer stream.Close() err = stream.WritePacketRTP(stream.Description().Medias[0], &rtp.Packet{ @@ -2238,18 +2303,22 @@ func TestServerPlayNoInterleavedIDs(t *testing.T) { require.NoError(t, err) defer s.Close() - stream = NewServerStream(s, &description.Session{ - Medias: []*description.Media{ - { - Type: "application", - Formats: []format.Format{forma}, - }, - { - Type: "application", - Formats: []format.Format{forma}, + stream = &ServerStream{ + Server: s, + Desc: &description.Session{ + Medias: []*description.Media{ + { + Type: "application", + Formats: []format.Format{forma}, + }, + { + Type: "application", + Formats: []format.Format{forma}, + }, }, }, - }) + } + stream.Initialize() defer stream.Close() nconn, err := net.Dial("tcp", "localhost:8554") @@ -2318,7 +2387,12 @@ func TestServerPlayStreamStats(t *testing.T) { err := s.Start() require.NoError(t, err) defer s.Close() - stream = NewServerStream(s, &description.Session{Medias: []*description.Media{testH264Media}}) + + stream = &ServerStream{ + Server: s, + Desc: &description.Session{Medias: []*description.Media{testH264Media}}, + } + stream.Initialize() defer stream.Close() for _, transport := range []string{"tcp", "multicast"} { diff --git a/server_record_test.go b/server_record_test.go index a8042785..800ffb68 100644 --- a/server_record_test.go +++ b/server_record_test.go @@ -292,8 +292,12 @@ func TestServerRecordPath(t *testing.T) { s = &Server{ Handler: &testServerHandler{ onAnnounce: func(ctx *ServerHandlerOnAnnounceCtx) (*base.Response, error) { - // make sure that media URLs are not overridden by NewServerStream() - stream := NewServerStream(s, ctx.Description) + // make sure that media URLs are not overridden by ServerStream.Initialize() + stream := &ServerStream{ + Server: s, + Desc: ctx.Description, + } + stream.Initialize() defer stream.Close() return &base.Response{ diff --git a/server_session.go b/server_session.go index 5cd6c860..054864b1 100644 --- a/server_session.go +++ b/server_session.go @@ -1024,7 +1024,7 @@ func (ss *ServerSession) handleRequestInner(sc *ServerConn, req *base.Request) ( var medi *description.Media switch ss.state { case ServerSessionStateInitial, ServerSessionStatePrePlay: // play - medi = findMediaByTrackID(stream.desc.Medias, trackID) + medi = findMediaByTrackID(stream.Desc.Medias, trackID) default: // record medi = findMediaByURL(ss.announcedDesc.Medias, path, query, req.URL) } @@ -1183,7 +1183,8 @@ func (ss *ServerSession) handleRequestInner(sc *ServerConn, req *base.Request) ( v := ss.s.timeNow().Unix() ss.udpLastPacketTime = &v - ss.timeDecoder = rtptime.NewGlobalDecoder2() + ss.timeDecoder = &rtptime.GlobalDecoder2{} + ss.timeDecoder.Initialize() for _, sm := range ss.setuppedMedias { sm.start() @@ -1269,7 +1270,8 @@ func (ss *ServerSession) handleRequestInner(sc *ServerConn, req *base.Request) ( v := ss.s.timeNow().Unix() ss.udpLastPacketTime = &v - ss.timeDecoder = rtptime.NewGlobalDecoder2() + ss.timeDecoder = &rtptime.GlobalDecoder2{} + ss.timeDecoder.Initialize() for _, sm := range ss.setuppedMedias { sm.start() diff --git a/server_stream.go b/server_stream.go index 472a6bc3..b10c43f0 100644 --- a/server_stream.go +++ b/server_stream.go @@ -24,14 +24,26 @@ func firstFormat(formats map[uint8]*serverStreamFormat) *serverStreamFormat { return formats[firstKey] } +// NewServerStream allocates a ServerStream. +// +// Deprecated: replaced by ServerStream.Initialize(). +func NewServerStream(s *Server, desc *description.Session) *ServerStream { + st := &ServerStream{ + Server: s, + Desc: desc, + } + st.Initialize() + return st +} + // ServerStream represents a data stream. // This is in charge of // - distributing the stream to each reader // - allocating multicast listeners // - gathering infos about the stream in order to generate SSRC and RTP-Info type ServerStream struct { - s *Server - desc *description.Session + Server *Server + Desc *description.Session mutex sync.RWMutex readers map[*ServerSession]struct{} @@ -41,17 +53,13 @@ type ServerStream struct { closed bool } -// NewServerStream allocates a ServerStream. -func NewServerStream(s *Server, desc *description.Session) *ServerStream { - st := &ServerStream{ - s: s, - desc: desc, - readers: make(map[*ServerSession]struct{}), - activeUnicastReaders: make(map[*ServerSession]struct{}), - } +// Initialize initializes a ServerStream. +func (st *ServerStream) Initialize() { + st.readers = make(map[*ServerSession]struct{}) + st.activeUnicastReaders = make(map[*ServerSession]struct{}) - st.medias = make(map[*description.Media]*serverStreamMedia, len(desc.Medias)) - for i, medi := range desc.Medias { + st.medias = make(map[*description.Media]*serverStreamMedia, len(st.Desc.Medias)) + for i, medi := range st.Desc.Medias { sm := &serverStreamMedia{ st: st, media: medi, @@ -60,8 +68,6 @@ func NewServerStream(s *Server, desc *description.Session) *ServerStream { sm.initialize() st.medias[medi] = sm } - - return st } // Close closes a ServerStream. @@ -91,8 +97,10 @@ func (st *ServerStream) BytesSent() uint64 { } // Description returns the description of the stream. +// +// Deprecated: use ServerStream.Desc. func (st *ServerStream) Description() *description.Session { - return st.desc + return st.Desc } // Stats returns stream statistics. @@ -239,7 +247,7 @@ func (st *ServerStream) readerAdd( if st.multicastReaderCount == 0 { for _, media := range st.medias { mw := &serverMulticastWriter{ - s: st.s, + s: st.Server, } err := mw.initialize() if err != nil { @@ -316,13 +324,13 @@ func (st *ServerStream) readerSetInactive(ss *ServerSession) { // WritePacketRTP writes a RTP packet to all the readers of the stream. func (st *ServerStream) WritePacketRTP(medi *description.Media, pkt *rtp.Packet) error { - return st.WritePacketRTPWithNTP(medi, pkt, st.s.timeNow()) + return st.WritePacketRTPWithNTP(medi, pkt, st.Server.timeNow()) } // WritePacketRTPWithNTP writes a RTP packet to all the readers of the stream. // ntp is the absolute time of the packet, and is sent with periodic RTCP sender reports. func (st *ServerStream) WritePacketRTPWithNTP(medi *description.Media, pkt *rtp.Packet, ntp time.Time) error { - byts := make([]byte, st.s.MaxPacketSize) + byts := make([]byte, st.Server.MaxPacketSize) n, err := pkt.MarshalTo(byts) if err != nil { return err diff --git a/server_stream_format.go b/server_stream_format.go index d0dde024..26be0a5e 100644 --- a/server_stream_format.go +++ b/server_stream_format.go @@ -24,10 +24,10 @@ func (sf *serverStreamFormat) initialize() { sf.rtcpSender = &rtcpsender.RTCPSender{ ClockRate: sf.format.ClockRate(), - Period: sf.sm.st.s.senderReportPeriod, - TimeNow: sf.sm.st.s.timeNow, + Period: sf.sm.st.Server.senderReportPeriod, + TimeNow: sf.sm.st.Server.timeNow, WritePacketRTCP: func(pkt rtcp.Packet) { - if !sf.sm.st.s.DisableRTCPSenderReports { + if !sf.sm.st.Server.DisableRTCPSenderReports { sf.sm.st.WritePacketRTCP(sf.sm.media, pkt) //nolint:errcheck } }, diff --git a/server_test.go b/server_test.go index 44d87735..f3045194 100644 --- a/server_test.go +++ b/server_test.go @@ -388,7 +388,11 @@ func TestServerErrorMethodNotImplemented(t *testing.T) { require.NoError(t, err) defer s.Close() - stream := NewServerStream(s, &description.Session{Medias: []*description.Media{testH264Media}}) + stream := &ServerStream{ + Server: s, + Desc: &description.Session{Medias: []*description.Media{testH264Media}}, + } + stream.Initialize() defer stream.Close() h.stream = stream @@ -481,7 +485,11 @@ func TestServerErrorTCPTwoConnOneSession(t *testing.T) { require.NoError(t, err) defer s.Close() - stream = NewServerStream(s, &description.Session{Medias: []*description.Media{testH264Media}}) + stream = &ServerStream{ + Server: s, + Desc: &description.Session{Medias: []*description.Media{testH264Media}}, + } + stream.Initialize() defer stream.Close() nconn1, err := net.Dial("tcp", "localhost:8554") @@ -562,7 +570,11 @@ func TestServerErrorTCPOneConnTwoSessions(t *testing.T) { require.NoError(t, err) defer s.Close() - stream = NewServerStream(s, &description.Session{Medias: []*description.Media{testH264Media}}) + stream = &ServerStream{ + Server: s, + Desc: &description.Session{Medias: []*description.Media{testH264Media}}, + } + stream.Initialize() defer stream.Close() nconn, err := net.Dial("tcp", "localhost:8554") @@ -625,7 +637,11 @@ func TestServerSetupMultipleTransports(t *testing.T) { require.NoError(t, err) defer s.Close() - stream = NewServerStream(s, &description.Session{Medias: []*description.Media{testH264Media}}) + stream = &ServerStream{ + Server: s, + Desc: &description.Session{Medias: []*description.Media{testH264Media}}, + } + stream.Initialize() defer stream.Close() nconn, err := net.Dial("tcp", "localhost:8554") @@ -721,7 +737,11 @@ func TestServerGetSetParameter(t *testing.T) { require.NoError(t, err) defer s.Close() - stream = NewServerStream(s, &description.Session{Medias: []*description.Media{testH264Media}}) + stream = &ServerStream{ + Server: s, + Desc: &description.Session{Medias: []*description.Media{testH264Media}}, + } + stream.Initialize() defer stream.Close() nconn, err := net.Dial("tcp", "localhost:8554") @@ -865,7 +885,11 @@ func TestServerSessionClose(t *testing.T) { require.NoError(t, err) defer s.Close() - stream = NewServerStream(s, &description.Session{Medias: []*description.Media{testH264Media}}) + stream = &ServerStream{ + Server: s, + Desc: &description.Session{Medias: []*description.Media{testH264Media}}, + } + stream.Initialize() defer stream.Close() nconn, err := net.Dial("tcp", "localhost:8554") @@ -940,7 +964,11 @@ func TestServerSessionAutoClose(t *testing.T) { require.NoError(t, err) defer s.Close() - stream = NewServerStream(s, &description.Session{Medias: []*description.Media{testH264Media}}) + stream = &ServerStream{ + Server: s, + Desc: &description.Session{Medias: []*description.Media{testH264Media}}, + } + stream.Initialize() defer stream.Close() nconn, err := net.Dial("tcp", "localhost:8554") @@ -1002,7 +1030,11 @@ func TestServerSessionTeardown(t *testing.T) { require.NoError(t, err) defer s.Close() - stream = NewServerStream(s, &description.Session{Medias: []*description.Media{testH264Media}}) + stream = &ServerStream{ + Server: s, + Desc: &description.Session{Medias: []*description.Media{testH264Media}}, + } + stream.Initialize() defer stream.Close() nconn, err := net.Dial("tcp", "localhost:8554") @@ -1095,7 +1127,12 @@ func TestServerAuth(t *testing.T) { require.NoError(t, err) require.Equal(t, base.StatusUnauthorized, res.StatusCode) - sender, err := auth.NewSender(res.Header["WWW-Authenticate"], "myuser", "mypass") + sender := &auth.Sender{ + WWWAuth: res.Header["WWW-Authenticate"], + User: "myuser", + Pass: "mypass", + } + err = sender.Initialize() require.NoError(t, err) sender.AddAuthorization(&req) @@ -1153,7 +1190,12 @@ func TestServerAuthFail(t *testing.T) { require.NoError(t, err) require.Equal(t, base.StatusUnauthorized, res.StatusCode) - sender, err := auth.NewSender(res.Header["WWW-Authenticate"], "myuser", "mypass") + sender := &auth.Sender{ + WWWAuth: res.Header["WWW-Authenticate"], + User: "myuser", + Pass: "mypass", + } + err = sender.Initialize() require.NoError(t, err) sender.AddAuthorization(&req)