From c466c342ba630ff5e7a6a2b38d7528ae1ad5847c Mon Sep 17 00:00:00 2001 From: Alessandro Ros Date: Sat, 6 Sep 2025 16:28:40 +0200 Subject: [PATCH] expose transport details (#850) add Client.Transport, ServerConn.Transport ServerSession.Transport --- client.go | 87 ++++++++------ client_play_test.go | 23 ++-- client_record_test.go | 20 ++-- client_stats.go | 4 +- client_transport.go | 8 ++ stats_conn.go => conn_stats.go | 7 +- conn_transport.go | 4 + server_conn.go | 9 +- server_handler.go | 16 ++- server_play_test.go | 1 + server_session.go | 109 +++++++++++------- server_session_format.go | 9 +- server_session_media.go | 14 +-- server_stream.go | 8 +- server_stream_format.go | 2 +- server_stream_media.go | 2 +- stats_session.go => session_stats.go | 29 +++-- session_transport.go | 9 ++ transport.go => transport_protocol.go | 13 ++- ...port_test.go => transport_protocol_test.go | 4 +- 20 files changed, 241 insertions(+), 137 deletions(-) create mode 100644 client_transport.go rename stats_conn.go => conn_stats.go (50%) create mode 100644 conn_transport.go rename stats_session.go => session_stats.go (76%) create mode 100644 session_transport.go rename transport.go => transport_protocol.go (53%) rename transport_test.go => transport_protocol_test.go (72%) diff --git a/client.go b/client.go index c6020762..66fb8c3e 100644 --- a/client.go +++ b/client.go @@ -468,7 +468,7 @@ type Client struct { // transport protocol (UDP, Multicast or TCP). // If nil, it is chosen automatically (first UDP, then, if it fails, TCP). // It defaults to nil. - Transport *Transport + Transport *TransportProtocol // If the client is reading with UDP, it must receive // at least a packet within this timeout, otherwise it switches to TCP. // It defaults to 3 seconds. @@ -556,8 +556,7 @@ type Client struct { lastDescribeDesc *description.Session baseURL *base.URL announceData map[*description.Media]*clientAnnounceDataMedia // record - setuppedTransport *Transport - setuppedProfile headers.TransportProfile + setuppedTransport *SessionTransport backChannelSetupped bool stdChannelSetupped bool setuppedMedias map[*description.Media]*clientMedia @@ -1047,8 +1046,9 @@ func (c *Client) trySwitchingProtocol() error { c.reset() - v := TransportTCP - c.setuppedTransport = &v + c.setuppedTransport = &SessionTransport{ + Protocol: TransportTCP, + } // some Hikvision cameras require a describe before a setup _, _, err := c.doDescribe(c.lastDescribeURL) @@ -1084,18 +1084,18 @@ func (c *Client) startTransportRoutines() { cm.start() } - if *c.setuppedTransport == TransportTCP { + if c.setuppedTransport.Protocol == TransportTCP { c.tcpFrame = &base.InterleavedFrame{} c.tcpBuffer = make([]byte, c.MaxPacketSize+4) } // always enable keepalives unless we are recording with TCP - if c.state == clientStatePlay || *c.setuppedTransport != TransportTCP { + if c.state == clientStatePlay || c.setuppedTransport.Protocol != TransportTCP { c.keepAliveTimer = time.NewTimer(c.keepAlivePeriod) } if c.state == clientStatePlay && c.stdChannelSetupped { - switch *c.setuppedTransport { + switch c.setuppedTransport.Protocol { case TransportUDP: c.checkTimeoutTimer = time.NewTimer(c.InitialUDPReadTimeout) c.checkTimeoutInitial = true @@ -1110,7 +1110,7 @@ func (c *Client) startTransportRoutines() { } } - if *c.setuppedTransport == TransportTCP { + if c.setuppedTransport.Protocol == TransportTCP { c.reader.setAllowInterleavedFrames(true) } } @@ -1325,8 +1325,8 @@ func (c *Client) isInTCPTimeout() bool { } func (c *Client) doCheckTimeout() error { - if *c.setuppedTransport == TransportUDP || - *c.setuppedTransport == TransportUDPMulticast { + if c.setuppedTransport.Protocol == TransportUDP || + c.setuppedTransport.Protocol == TransportUDPMulticast { if c.checkTimeoutInitial && !c.backChannelSetupped && c.Transport == nil { c.checkTimeoutInitial = false @@ -1626,17 +1626,17 @@ func (c *Client) doSetup( th.Mode = &v } - var transport Transport + var protocol TransportProtocol switch { // use transport from previous SETUP calls case c.setuppedTransport != nil: - transport = *c.setuppedTransport - th.Profile = c.setuppedProfile + protocol = c.setuppedTransport.Protocol + th.Profile = c.setuppedTransport.Profile // use transport from config, secure flag from server case c.Transport != nil: - transport = *c.Transport + protocol = *c.Transport if isSecure(medi.Profile) && c.Scheme == "rtsps" { th.Profile = headers.TransportProfileSAVP } else { @@ -1654,9 +1654,9 @@ func (c *Client) doSetup( } if th.Profile == headers.TransportProfileSAVP || c.Scheme == "rtsp" { - transport = TransportUDP + protocol = TransportUDP } else { - transport = TransportTCP + protocol = TransportTCP } } @@ -1692,7 +1692,7 @@ func (c *Client) doSetup( } }() - switch transport { + switch protocol { case TransportUDP, TransportUDPMulticast: if c.Scheme == "rtsps" && !isSecure(th.Profile) { return nil, fmt.Errorf("unable to setup secure UDP") @@ -1700,7 +1700,7 @@ func (c *Client) doSetup( th.Protocol = headers.TransportProtocolUDP - if transport == TransportUDP { + if protocol == TransportUDP { if (rtpPort == 0 && rtcpPort != 0) || (rtpPort != 0 && rtcpPort == 0) { return nil, liberrors.ErrClientUDPPortsZero{} @@ -1808,9 +1808,10 @@ func (c *Client) doSetup( if res.StatusCode == base.StatusUnsupportedTransport && c.setuppedTransport == nil && c.Transport == nil { c.OnTransportSwitch(liberrors.ErrClientSwitchToTCP2{}) - v := TransportTCP - c.setuppedTransport = &v - c.setuppedProfile = th.Profile + c.setuppedTransport = &SessionTransport{ + Protocol: TransportTCP, + Profile: th.Profile, + } return c.doSetup(baseURL, medi, 0, 0) } @@ -1824,7 +1825,7 @@ func (c *Client) doSetup( return nil, liberrors.ErrClientTransportHeaderInvalid{Err: err} } - switch transport { + switch protocol { case TransportUDP, TransportUDPMulticast: if thRes.Protocol == headers.TransportProtocolTCP { // switch transport automatically @@ -1835,9 +1836,10 @@ func (c *Client) doSetup( c.reset() - v := TransportTCP - c.setuppedTransport = &v - c.setuppedProfile = th.Profile + c.setuppedTransport = &SessionTransport{ + Protocol: TransportTCP, + Profile: th.Profile, + } // some Hikvision cameras require a describe before a setup _, _, err = c.doDescribe(c.lastDescribeURL) @@ -1852,7 +1854,7 @@ func (c *Client) doSetup( } } - switch transport { + switch protocol { case TransportUDP: if thRes.Delivery != nil && *thRes.Delivery != headers.TransportDeliveryUnicast { return nil, liberrors.ErrClientTransportHeaderInvalidDelivery{} @@ -2032,8 +2034,10 @@ func (c *Client) doSetup( c.setuppedMedias[medi] = cm c.baseURL = baseURL - c.setuppedTransport = &transport - c.setuppedProfile = th.Profile + c.setuppedTransport = &SessionTransport{ + Protocol: protocol, + Profile: th.Profile, + } c.propsMutex.Unlock() @@ -2136,7 +2140,7 @@ func (c *Client) doPlay(ra *headers.Range) (*base.Response, error) { // when protocol is UDP, // open the firewall by sending empty packets to the remote part. // do this before sending the PLAY request. - if *c.setuppedTransport == TransportUDP { + if c.setuppedTransport.Protocol == TransportUDP { for _, cm := range c.setuppedMedias { if !cm.media.IsBackChannel && cm.udpRTPListener.writeAddr != nil { buf, _ := (&rtp.Packet{Header: rtp.Header{Version: 2}}).Marshal() @@ -2428,6 +2432,17 @@ func (c *Client) PacketNTP(medi *description.Media, pkt *rtp.Packet) (time.Time, return ct.rtpReceiver.PacketNTP(pkt.Timestamp) } +// Transport2 returns transport details. +func (c *Client) Transport2() *ClientTransport { + c.propsMutex.RLock() + defer c.propsMutex.RUnlock() + + return &ClientTransport{ + Conn: ConnTransport{}, + Session: c.setuppedTransport, + } +} + // Stats returns client statistics. func (c *Client) Stats() *ClientStats { c.propsMutex.RLock() @@ -2437,15 +2452,15 @@ func (c *Client) Stats() *ClientStats { ret := make(map[*description.Media]StatsSessionMedia, len(c.setuppedMedias)) for med, sm := range c.setuppedMedias { - ret[med] = StatsSessionMedia{ + ret[med] = SessionStatsMedia{ BytesReceived: atomic.LoadUint64(sm.bytesReceived), BytesSent: atomic.LoadUint64(sm.bytesSent), RTPPacketsInError: atomic.LoadUint64(sm.rtpPacketsInError), RTCPPacketsReceived: atomic.LoadUint64(sm.rtcpPacketsReceived), RTCPPacketsSent: atomic.LoadUint64(sm.rtcpPacketsSent), RTCPPacketsInError: atomic.LoadUint64(sm.rtcpPacketsInError), - Formats: func() map[format.Format]StatsSessionFormat { - ret := make(map[format.Format]StatsSessionFormat, len(sm.formats)) + Formats: func() map[format.Format]SessionStatsFormat { + ret := make(map[format.Format]SessionStatsFormat, len(sm.formats)) for _, fo := range sm.formats { recvStats := func() *rtpreceiver.Stats { @@ -2461,7 +2476,7 @@ func (c *Client) Stats() *ClientStats { return nil }() - ret[fo.format] = StatsSessionFormat{ //nolint:dupl + ret[fo.format] = SessionStatsFormat{ //nolint:dupl RTPPacketsReceived: atomic.LoadUint64(fo.rtpPacketsReceived), RTPPacketsSent: atomic.LoadUint64(fo.rtpPacketsSent), RTPPacketsLost: atomic.LoadUint64(fo.rtpPacketsLost), @@ -2517,11 +2532,11 @@ func (c *Client) Stats() *ClientStats { }() return &ClientStats{ - Conn: StatsConn{ + Conn: ConnStats{ BytesReceived: atomic.LoadUint64(c.bytesReceived), BytesSent: atomic.LoadUint64(c.bytesSent), }, - Session: StatsSession{ //nolint:dupl + Session: SessionStats{ //nolint:dupl BytesReceived: func() uint64 { v := uint64(0) for _, ms := range mediaStats { diff --git a/client_play_test.go b/client_play_test.go index 77465d0c..63d5b596 100644 --- a/client_play_test.go +++ b/client_play_test.go @@ -36,7 +36,7 @@ func deliveryPtr(v headers.TransportDelivery) *headers.TransportDelivery { return &v } -func transportPtr(v Transport) *Transport { +func transportPtr(v TransportProtocol) *TransportProtocol { return &v } @@ -616,7 +616,7 @@ func TestClientPlay(t *testing.T) { Scheme: u.Scheme, Host: u.Host, TLSConfig: &tls.Config{InsecureSkipVerify: true}, - Transport: func() *Transport { + Transport: func() *TransportProtocol { switch ca.transport { case "udp": v := TransportUDP @@ -643,6 +643,7 @@ func TestClientPlay(t *testing.T) { // test that properties can be accessed in parallel go func() { c.Stats() + c.Transport2() }() err = c.SetupAll(sd.BaseURL, sd.Medias) @@ -661,23 +662,23 @@ func TestClientPlay(t *testing.T) { s := c.Stats() require.Equal(t, &ClientStats{ - Conn: StatsConn{ + Conn: ConnStats{ BytesReceived: s.Conn.BytesReceived, BytesSent: s.Conn.BytesSent, }, - Session: StatsSession{ + Session: SessionStats{ BytesReceived: s.Session.BytesReceived, BytesSent: s.Session.BytesSent, RTPPacketsReceived: s.Session.RTPPacketsReceived, RTCPPacketsReceived: s.Session.RTCPPacketsReceived, RTCPPacketsSent: s.Session.RTCPPacketsSent, - Medias: map[*description.Media]StatsSessionMedia{ + Medias: map[*description.Media]SessionStatsMedia{ sd.Medias[0]: { //nolint:dupl BytesReceived: s.Session.Medias[sd.Medias[0]].BytesReceived, BytesSent: s.Session.Medias[sd.Medias[0]].BytesSent, RTCPPacketsReceived: s.Session.Medias[sd.Medias[0]].RTCPPacketsReceived, RTCPPacketsSent: s.Session.Medias[sd.Medias[0]].RTCPPacketsSent, - Formats: map[format.Format]StatsSessionFormat{ + Formats: map[format.Format]SessionStatsFormat{ sd.Medias[0].Formats[0]: { RTPPacketsReceived: s.Session.Medias[sd.Medias[0]].Formats[sd.Medias[0].Formats[0]].RTPPacketsReceived, LocalSSRC: s.Session.Medias[sd.Medias[0]].Formats[sd.Medias[0].Formats[0]].LocalSSRC, @@ -690,7 +691,7 @@ func TestClientPlay(t *testing.T) { BytesSent: s.Session.Medias[sd.Medias[1]].BytesSent, RTCPPacketsReceived: s.Session.Medias[sd.Medias[1]].RTCPPacketsReceived, RTCPPacketsSent: s.Session.Medias[sd.Medias[1]].RTCPPacketsSent, - Formats: map[format.Format]StatsSessionFormat{ + Formats: map[format.Format]SessionStatsFormat{ sd.Medias[1].Formats[0]: { RTPPacketsReceived: s.Session.Medias[sd.Medias[1]].Formats[sd.Medias[1].Formats[0]].RTPPacketsReceived, LocalSSRC: s.Session.Medias[sd.Medias[1]].Formats[sd.Medias[1].Formats[0]].LocalSSRC, @@ -2427,7 +2428,7 @@ func TestClientPlayPausePlay(t *testing.T) { packetRecv := make(chan struct{}) c := Client{ - Transport: func() *Transport { + Transport: func() *TransportProtocol { if transport == "udp" { v := TransportUDP return &v @@ -2746,7 +2747,7 @@ func TestClientPlayErrorTimeout(t *testing.T) { }() c := Client{ - Transport: func() *Transport { + Transport: func() *TransportProtocol { switch transport { case "udp": v := TransportUDP @@ -3424,7 +3425,7 @@ func TestClientPlayDecodeErrors(t *testing.T) { }() c := Client{ - Transport: func() *Transport { + Transport: func() *TransportProtocol { if ca.proto == "udp" { v := TransportUDP return &v @@ -3908,7 +3909,7 @@ func TestClientPlayBackChannel(t *testing.T) { Scheme: u.Scheme, Host: u.Host, RequestBackChannels: true, - Transport: func() *Transport { + Transport: func() *TransportProtocol { if transport == "tcp" { return transportPtr(TransportTCP) } diff --git a/client_record_test.go b/client_record_test.go index 5f6166a1..5f34bb8b 100644 --- a/client_record_test.go +++ b/client_record_test.go @@ -416,7 +416,7 @@ func TestClientRecord(t *testing.T) { TLSConfig: &tls.Config{ InsecureSkipVerify: true, }, - Transport: func() *Transport { + Transport: func() *TransportProtocol { if ca.transport == "udp" { v := TransportUDP return &v @@ -449,24 +449,24 @@ func TestClientRecord(t *testing.T) { s := c.Stats() require.Equal(t, &ClientStats{ - Conn: StatsConn{ + Conn: ConnStats{ BytesReceived: s.Conn.BytesReceived, BytesSent: s.Conn.BytesSent, }, - Session: StatsSession{ + Session: SessionStats{ BytesReceived: s.Session.BytesReceived, BytesSent: s.Session.BytesSent, RTPPacketsSent: s.Session.RTPPacketsSent, RTPPacketsReceived: s.Session.RTPPacketsReceived, RTCPPacketsReceived: s.Session.RTCPPacketsReceived, RTCPPacketsSent: s.Session.RTCPPacketsSent, - Medias: map[*description.Media]StatsSessionMedia{ + Medias: map[*description.Media]SessionStatsMedia{ medias[0]: { BytesReceived: s.Session.Medias[medias[0]].BytesReceived, BytesSent: s.Session.Medias[medias[0]].BytesSent, RTCPPacketsReceived: s.Session.Medias[medias[0]].RTCPPacketsReceived, RTCPPacketsSent: s.Session.Medias[medias[0]].RTCPPacketsSent, - Formats: map[format.Format]StatsSessionFormat{ + Formats: map[format.Format]SessionStatsFormat{ medias[0].Formats[0]: { RTPPacketsSent: s.Session.Medias[medias[0]].Formats[medias[0].Formats[0]].RTPPacketsSent, RTPPacketsReceived: s.Session.Medias[medias[0]].Formats[medias[0].Formats[0]].RTPPacketsReceived, @@ -586,7 +586,7 @@ func TestClientRecordSocketError(t *testing.T) { TLSConfig: &tls.Config{ InsecureSkipVerify: true, }, - Transport: func() *Transport { + Transport: func() *TransportProtocol { if transport == "udp" { v := TransportUDP return &v @@ -730,7 +730,7 @@ func TestClientRecordPauseRecordSerial(t *testing.T) { }() c := Client{ - Transport: func() *Transport { + Transport: func() *TransportProtocol { if transport == "udp" { v := TransportUDP return &v @@ -889,7 +889,7 @@ func TestClientRecordPauseRecordParallel(t *testing.T) { }() c := Client{ - Transport: func() *Transport { + Transport: func() *TransportProtocol { if transport == "udp" { v := TransportUDP return &v @@ -1214,7 +1214,7 @@ func TestClientRecordDecodeErrors(t *testing.T) { }() c := Client{ - Transport: func() *Transport { + Transport: func() *TransportProtocol { if ca.proto == "udp" { v := TransportUDP return &v @@ -1388,7 +1388,7 @@ func TestClientRecordRTCPReport(t *testing.T) { var curTimeMutex sync.Mutex c := Client{ - Transport: func() *Transport { + Transport: func() *TransportProtocol { if ca == "udp" { v := TransportUDP return &v diff --git a/client_stats.go b/client_stats.go index 22451ea3..7ca3786b 100644 --- a/client_stats.go +++ b/client_stats.go @@ -2,6 +2,6 @@ package gortsplib // ClientStats are client statistics type ClientStats struct { - Conn StatsConn - Session StatsSession + Conn ConnStats + Session SessionStats } diff --git a/client_transport.go b/client_transport.go new file mode 100644 index 00000000..c08d1629 --- /dev/null +++ b/client_transport.go @@ -0,0 +1,8 @@ +package gortsplib + +// ClientTransport contains details about the client transport. +type ClientTransport struct { + Conn ConnTransport + // present only when SETUP has been called at least once. + Session *SessionTransport +} diff --git a/stats_conn.go b/conn_stats.go similarity index 50% rename from stats_conn.go rename to conn_stats.go index 635519b9..073f6b3d 100644 --- a/stats_conn.go +++ b/conn_stats.go @@ -1,7 +1,12 @@ package gortsplib // StatsConn are connection statistics. -type StatsConn struct { +// +// Deprecated: renamed into ConnStats. +type StatsConn = ConnStats + +// ConnStats are connection statistics. +type ConnStats struct { // received bytes BytesReceived uint64 // sent bytes diff --git a/conn_transport.go b/conn_transport.go new file mode 100644 index 00000000..bc8a14a6 --- /dev/null +++ b/conn_transport.go @@ -0,0 +1,4 @@ +package gortsplib + +// ConnTransport contains details about the transport of a connection. +type ConnTransport struct{} diff --git a/server_conn.go b/server_conn.go index 5be5087f..7ab95a80 100644 --- a/server_conn.go +++ b/server_conn.go @@ -276,9 +276,14 @@ func (sc *ServerConn) Session() *ServerSession { return sc.session } +// Transport returns transport details. +func (sc *ServerConn) Transport() *ConnTransport { + return &ConnTransport{} +} + // Stats returns connection statistics. -func (sc *ServerConn) Stats() *StatsConn { - return &StatsConn{ +func (sc *ServerConn) Stats() *ConnStats { + return &ConnStats{ BytesReceived: sc.bc.BytesReceived(), BytesSent: sc.bc.BytesSent(), } diff --git a/server_handler.go b/server_handler.go index 389dcc16..bd816ac5 100644 --- a/server_handler.go +++ b/server_handler.go @@ -99,12 +99,16 @@ type ServerHandlerOnAnnounce interface { // ServerHandlerOnSetupCtx is the context of OnSetup. type ServerHandlerOnSetupCtx struct { - Session *ServerSession - Conn *ServerConn - Request *base.Request - Path string - Query string - Transport Transport + Session *ServerSession + Conn *ServerConn + Request *base.Request + Path string + Query string + + // Deprecated: replaced by Transport2. + Transport TransportProtocol + + Transport2 *SessionTransport } // ServerHandlerOnSetup can be implemented by a ServerHandler. diff --git a/server_play_test.go b/server_play_test.go index c9c5e74c..78651aca 100644 --- a/server_play_test.go +++ b/server_play_test.go @@ -678,6 +678,7 @@ func TestServerPlay(t *testing.T) { ctx.Session.Stream() ctx.Session.SetuppedTransport() ctx.Session.SetuppedSecure() + ctx.Session.Transport() }() return &base.Response{ diff --git a/server_session.go b/server_session.go index 833bc701..1d74f590 100644 --- a/server_session.go +++ b/server_session.go @@ -435,8 +435,7 @@ type ServerSession struct { setuppedMedias map[*description.Media]*serverSessionMedia setuppedMediasOrdered []*serverSessionMedia tcpCallbackByChannel map[int]readFunc - setuppedTransport *Transport - setuppedProfile headers.TransportProfile + setuppedTransport *SessionTransport setuppedStream *ServerStream // play setuppedPath string setuppedQuery string @@ -514,21 +513,31 @@ func (ss *ServerSession) State() ServerSessionState { } // SetuppedTransport returns the transport negotiated during SETUP. -func (ss *ServerSession) SetuppedTransport() *Transport { +// +// Deprecated: replaced by Transport. +func (ss *ServerSession) SetuppedTransport() *TransportProtocol { ss.propsMutex.RLock() defer ss.propsMutex.RUnlock() - return ss.setuppedTransport + if ss.setuppedTransport == nil { + return nil + } + return &ss.setuppedTransport.Protocol } // SetuppedSecure returns whether a secure profile is in use. // If this is false, it does not mean that the stream is not secure, since // there are some combinations that are secure nonetheless, like RTSPS+TCP+unsecure. +// +// Deprecated: replaced by Transport. func (ss *ServerSession) SetuppedSecure() bool { ss.propsMutex.RLock() defer ss.propsMutex.RUnlock() - return isSecure(ss.setuppedProfile) + if ss.setuppedTransport == nil { + return false + } + return isSecure(ss.setuppedTransport.Profile) } // SetuppedStream returns the stream associated with the session. @@ -613,6 +622,15 @@ func (ss *ServerSession) UserData() interface{} { return ss.userData } +// Transport returns transport details. +// This is non-nil only if SETUP has been called at least once. +func (ss *ServerSession) Transport() *SessionTransport { + ss.propsMutex.RLock() + defer ss.propsMutex.RUnlock() + + return ss.setuppedTransport +} + // Stats returns server session statistics. func (ss *ServerSession) Stats() *StatsSession { ss.propsMutex.RLock() @@ -622,15 +640,15 @@ func (ss *ServerSession) Stats() *StatsSession { ret := make(map[*description.Media]StatsSessionMedia, len(ss.setuppedMedias)) for med, sm := range ss.setuppedMedias { - ret[med] = StatsSessionMedia{ + ret[med] = SessionStatsMedia{ BytesReceived: atomic.LoadUint64(sm.bytesReceived), BytesSent: atomic.LoadUint64(sm.bytesSent), RTPPacketsInError: atomic.LoadUint64(sm.rtpPacketsInError), RTCPPacketsReceived: atomic.LoadUint64(sm.rtcpPacketsReceived), RTCPPacketsSent: atomic.LoadUint64(sm.rtcpPacketsSent), RTCPPacketsInError: atomic.LoadUint64(sm.rtcpPacketsInError), - Formats: func() map[format.Format]StatsSessionFormat { - ret := make(map[format.Format]StatsSessionFormat, len(sm.formats)) + Formats: func() map[format.Format]SessionStatsFormat { + ret := make(map[format.Format]SessionStatsFormat, len(sm.formats)) for _, fo := range sm.formats { recvStats := func() *rtpreceiver.Stats { @@ -652,7 +670,7 @@ func (ss *ServerSession) Stats() *StatsSession { return nil }() - ret[fo.format] = StatsSessionFormat{ //nolint:dupl + ret[fo.format] = SessionStatsFormat{ //nolint:dupl RTPPacketsReceived: atomic.LoadUint64(fo.rtpPacketsReceived), RTPPacketsSent: atomic.LoadUint64(fo.rtpPacketsSent), RTPPacketsLost: atomic.LoadUint64(fo.rtpPacketsLost), @@ -707,7 +725,7 @@ func (ss *ServerSession) Stats() *StatsSession { return ret }() - return &StatsSession{ //nolint:dupl + return &SessionStats{ //nolint:dupl BytesReceived: func() uint64 { v := uint64(0) for _, ms := range mediaStats { @@ -943,8 +961,8 @@ func (ss *ServerSession) runInner() error { // and transport is UDP or UDP-multicast. if (ss.state == ServerSessionStatePrePlay || ss.state == ServerSessionStatePlay) && - (*ss.setuppedTransport == TransportUDP || - *ss.setuppedTransport == TransportUDPMulticast) { + (ss.setuppedTransport.Protocol == TransportUDP || + ss.setuppedTransport.Protocol == TransportUDPMulticast) { v := uint(ss.s.sessionTimeout / time.Second) return &v } @@ -980,7 +998,7 @@ func (ss *ServerSession) runInner() error { // close the session. if ((ss.state != ServerSessionStateRecord && ss.state != ServerSessionStatePlay) || - *ss.setuppedTransport == TransportTCP) && + ss.setuppedTransport.Protocol == TransportTCP) && len(ss.conns) == 0 { return liberrors.ErrServerSessionNotInUse{} } @@ -988,7 +1006,7 @@ func (ss *ServerSession) runInner() error { case <-ss.chAsyncStartWriter: if (ss.state == ServerSessionStateRecord || ss.state == ServerSessionStatePlay) && - *ss.setuppedTransport == TransportTCP { + ss.setuppedTransport.Protocol == TransportTCP { ss.startWriter() } @@ -1187,18 +1205,18 @@ func (ss *ServerSession) handleRequestInner(sc *ServerConn, req *base.Request) ( query = ss.setuppedQuery } - var transport Transport + var protocol TransportProtocol switch inTH.Protocol { case headers.TransportProtocolUDP: if inTH.Delivery != nil && *inTH.Delivery == headers.TransportDeliveryMulticast { - transport = TransportUDPMulticast + protocol = TransportUDPMulticast } else { - transport = TransportUDP + protocol = TransportUDP } case headers.TransportProtocolTCP: - transport = TransportTCP + protocol = TransportTCP } var srtpInCtx *wrappedSRTPContext @@ -1220,13 +1238,20 @@ func (ss *ServerSession) handleRequestInner(sc *ServerConn, req *base.Request) ( } } - if ss.setuppedTransport != nil && (*ss.setuppedTransport != transport || ss.setuppedProfile != inTH.Profile) { - return &base.Response{ - StatusCode: base.StatusBadRequest, - }, liberrors.ErrServerMediasDifferentTransports{} + if ss.setuppedTransport != nil { + cmp := SessionTransport{ + Protocol: protocol, + Profile: inTH.Profile, + } + + if *ss.setuppedTransport != cmp { + return &base.Response{ + StatusCode: base.StatusBadRequest, + }, liberrors.ErrServerMediasDifferentTransports{} + } } - switch transport { + switch protocol { case TransportUDP: if inTH.ClientPorts == nil { return &base.Response{ @@ -1259,7 +1284,7 @@ func (ss *ServerSession) handleRequestInner(sc *ServerConn, req *base.Request) ( } default: // record - if transport == TransportUDPMulticast { + if protocol == TransportUDPMulticast { return &base.Response{ StatusCode: base.StatusUnsupportedTransport, }, nil @@ -1278,7 +1303,11 @@ func (ss *ServerSession) handleRequestInner(sc *ServerConn, req *base.Request) ( Request: req, Path: path, Query: query, - Transport: transport, + Transport: protocol, + Transport2: &SessionTransport{ + Protocol: protocol, + Profile: inTH.Profile, + }, }) // workaround to prevent a bug in rtspclientsink @@ -1332,7 +1361,7 @@ func (ss *ServerSession) handleRequestInner(sc *ServerConn, req *base.Request) ( if ss.state == ServerSessionStateInitial { err = stream.readerAdd(ss, inTH.ClientPorts, - transport, + protocol, ) if err != nil { return &base.Response{ @@ -1410,11 +1439,11 @@ func (ss *ServerSession) handleRequestInner(sc *ServerConn, req *base.Request) ( var udpRTCPWriteAddr *net.UDPAddr var tcpChannel int - switch transport { + switch protocol { case TransportUDP, TransportUDPMulticast: th.Protocol = headers.TransportProtocolUDP - if transport == TransportUDP { + if protocol == TransportUDP { udpRTPReadPort = inTH.ClientPorts[0] udpRTCPReadPort = inTH.ClientPorts[1] @@ -1460,8 +1489,10 @@ func (ss *ServerSession) handleRequestInner(sc *ServerConn, req *base.Request) ( ss.propsMutex.Lock() - ss.setuppedTransport = &transport - ss.setuppedProfile = inTH.Profile + ss.setuppedTransport = &SessionTransport{ + Protocol: protocol, + Profile: inTH.Profile, + } sm := &serverSessionMedia{ ss: ss, @@ -1542,7 +1573,7 @@ func (ss *ServerSession) handleRequestInner(sc *ServerConn, req *base.Request) ( } if ss.state != ServerSessionStatePlay && - *ss.setuppedTransport != TransportUDPMulticast { + ss.setuppedTransport.Protocol != TransportUDPMulticast { ss.createWriter() } @@ -1575,12 +1606,12 @@ func (ss *ServerSession) handleRequestInner(sc *ServerConn, req *base.Request) ( } } - if *ss.setuppedTransport == TransportTCP { + if ss.setuppedTransport.Protocol == TransportTCP { ss.tcpFrame = &base.InterleavedFrame{} ss.tcpBuffer = make([]byte, ss.s.MaxPacketSize+4) } - switch *ss.setuppedTransport { + switch ss.setuppedTransport.Protocol { case TransportUDP: ss.udpCheckStreamTimer = time.NewTimer(ss.s.checkStreamPeriod) ss.startWriter() @@ -1613,7 +1644,7 @@ func (ss *ServerSession) handleRequestInner(sc *ServerConn, req *base.Request) ( } } else { if ss.state != ServerSessionStatePlay && - *ss.setuppedTransport != TransportUDPMulticast { + ss.setuppedTransport.Protocol != TransportUDPMulticast { ss.destroyWriter() } } @@ -1670,12 +1701,12 @@ func (ss *ServerSession) handleRequestInner(sc *ServerConn, req *base.Request) ( } } - if *ss.setuppedTransport == TransportTCP { + if ss.setuppedTransport.Protocol == TransportTCP { ss.tcpFrame = &base.InterleavedFrame{} ss.tcpBuffer = make([]byte, ss.s.MaxPacketSize+4) } - switch *ss.setuppedTransport { + switch ss.setuppedTransport.Protocol { case TransportUDP: ss.udpCheckStreamTimer = time.NewTimer(ss.s.checkStreamPeriod) ss.startWriter() @@ -1733,7 +1764,7 @@ func (ss *ServerSession) handleRequestInner(sc *ServerConn, req *base.Request) ( ss.state = ServerSessionStatePrePlay ss.propsMutex.Unlock() - switch *ss.setuppedTransport { + switch ss.setuppedTransport.Protocol { case TransportUDP: ss.udpCheckStreamTimer = emptyTimer() @@ -1746,7 +1777,7 @@ func (ss *ServerSession) handleRequestInner(sc *ServerConn, req *base.Request) ( } case ServerSessionStateRecord: - switch *ss.setuppedTransport { + switch ss.setuppedTransport.Protocol { case TransportUDP: ss.udpCheckStreamTimer = emptyTimer() @@ -1767,7 +1798,7 @@ func (ss *ServerSession) handleRequestInner(sc *ServerConn, req *base.Request) ( case base.Teardown: var err error if (ss.state == ServerSessionStatePlay || ss.state == ServerSessionStateRecord) && - *ss.setuppedTransport == TransportTCP { + ss.setuppedTransport.Protocol == TransportTCP { err = switchReadFuncError{false} } diff --git a/server_session_format.go b/server_session_format.go index e581bba3..93a067ac 100644 --- a/server_session_format.go +++ b/server_session_format.go @@ -31,7 +31,8 @@ func (sf *serverSessionFormat) initialize() { sf.rtpPacketsSent = new(uint64) sf.rtpPacketsLost = new(uint64) - udp := *sf.sm.ss.setuppedTransport == TransportUDP || *sf.sm.ss.setuppedTransport == TransportUDPMulticast + udp := sf.sm.ss.setuppedTransport.Protocol == TransportUDP || + sf.sm.ss.setuppedTransport.Protocol == TransportUDPMulticast if udp { sf.writePacketRTPInQueue = sf.writePacketRTPInQueueUDP @@ -119,7 +120,7 @@ func (sf *serverSessionFormat) writePacketRTP(pkt *rtp.Packet) error { pkt.SSRC = sf.localSSRC maxPlainPacketSize := sf.sm.ss.s.MaxPacketSize - if isSecure(sf.sm.ss.setuppedProfile) { + if isSecure(sf.sm.ss.setuppedTransport.Profile) { maxPlainPacketSize -= srtpOverhead } @@ -131,7 +132,7 @@ func (sf *serverSessionFormat) writePacketRTP(pkt *rtp.Packet) error { plain = plain[:n] var encr []byte - if isSecure(sf.sm.ss.setuppedProfile) { + if isSecure(sf.sm.ss.setuppedTransport.Profile) { encr = make([]byte, sf.sm.ss.s.MaxPacketSize) encr, err = sf.sm.srtpOutCtx.encryptRTP(encr, plain, &pkt.Header) if err != nil { @@ -139,7 +140,7 @@ func (sf *serverSessionFormat) writePacketRTP(pkt *rtp.Packet) error { } } - if isSecure(sf.sm.ss.setuppedProfile) { + if isSecure(sf.sm.ss.setuppedTransport.Profile) { return sf.writePacketRTPEncoded(encr) } return sf.writePacketRTPEncoded(plain) diff --git a/server_session_media.go b/server_session_media.go index 07077f35..6708bb06 100644 --- a/server_session_media.go +++ b/server_session_media.go @@ -58,7 +58,7 @@ func (sm *serverSessionMedia) initialize() { sm.formats[forma.PayloadType()] = f } - switch *sm.ss.setuppedTransport { + switch sm.ss.setuppedTransport.Protocol { case TransportUDP, TransportUDPMulticast: sm.writePacketRTCPInQueue = sm.writePacketRTCPInQueueUDP @@ -88,9 +88,9 @@ func (sm *serverSessionMedia) close() { } func (sm *serverSessionMedia) start() error { - switch *sm.ss.setuppedTransport { + switch sm.ss.setuppedTransport.Protocol { case TransportUDP, TransportUDPMulticast: - if *sm.ss.setuppedTransport == TransportUDP { + if sm.ss.setuppedTransport.Protocol == TransportUDP { if sm.ss.state == ServerSessionStatePlay { if sm.media.IsBackChannel { sm.ss.s.udpRTPListener.addClient(sm.ss.author.ip(), sm.udpRTPReadPort, sm.readPacketRTPUDPPlay) @@ -136,7 +136,7 @@ func (sm *serverSessionMedia) start() error { } func (sm *serverSessionMedia) stop() { - if *sm.ss.setuppedTransport == TransportUDP { + if sm.ss.setuppedTransport.Protocol == TransportUDP { sm.ss.s.udpRTPListener.removeClient(sm.ss.author.ip(), sm.udpRTPReadPort) sm.ss.s.udpRTCPListener.removeClient(sm.ss.author.ip(), sm.udpRTCPReadPort) } @@ -427,7 +427,7 @@ func (sm *serverSessionMedia) writePacketRTCP(pkt rtcp.Packet) error { } maxPlainPacketSize := sm.ss.s.MaxPacketSize - if isSecure(sm.ss.setuppedProfile) { + if isSecure(sm.ss.setuppedTransport.Profile) { maxPlainPacketSize -= srtcpOverhead } @@ -436,7 +436,7 @@ func (sm *serverSessionMedia) writePacketRTCP(pkt rtcp.Packet) error { } var encr []byte - if isSecure(sm.ss.setuppedProfile) { + if isSecure(sm.ss.setuppedTransport.Profile) { encr = make([]byte, sm.ss.s.MaxPacketSize) encr, err = sm.srtpOutCtx.encryptRTCP(encr, plain, nil) if err != nil { @@ -444,7 +444,7 @@ func (sm *serverSessionMedia) writePacketRTCP(pkt rtcp.Packet) error { } } - if isSecure(sm.ss.setuppedProfile) { + if isSecure(sm.ss.setuppedTransport.Profile) { return sm.writePacketRTCPEncoded(encr) } return sm.writePacketRTCPEncoded(plain) diff --git a/server_stream.go b/server_stream.go index 6283181a..a12029a2 100644 --- a/server_stream.go +++ b/server_stream.go @@ -211,7 +211,7 @@ func (st *ServerStream) Stats() *ServerStreamStats { func (st *ServerStream) readerAdd( ss *ServerSession, clientPorts *[2]int, - protocol Transport, + protocol TransportProtocol, ) error { st.mutex.Lock() defer st.mutex.Unlock() @@ -266,7 +266,7 @@ func (st *ServerStream) readerRemove(ss *ServerSession) { delete(st.readers, ss) - if *ss.setuppedTransport == TransportUDPMulticast { + if ss.setuppedTransport.Protocol == TransportUDPMulticast { st.multicastReaderCount-- if st.multicastReaderCount == 0 { for _, media := range st.medias { @@ -285,7 +285,7 @@ func (st *ServerStream) readerSetActive(ss *ServerSession) { return } - if *ss.setuppedTransport == TransportUDPMulticast { + if ss.setuppedTransport.Protocol == TransportUDPMulticast { for medi, sm := range ss.setuppedMedias { streamMedia := st.medias[medi] streamMedia.multicastWriter.rtcpl.addClient( @@ -304,7 +304,7 @@ func (st *ServerStream) readerSetInactive(ss *ServerSession) { return } - if *ss.setuppedTransport == TransportUDPMulticast { + if ss.setuppedTransport.Protocol == TransportUDPMulticast { for medi := range ss.setuppedMedias { streamMedia := st.medias[medi] streamMedia.multicastWriter.rtcpl.removeClient(ss.author.ip(), streamMedia.multicastWriter.rtcpl.port()) diff --git a/server_stream_format.go b/server_stream_format.go index eb14cc62..1cabc276 100644 --- a/server_stream_format.go +++ b/server_stream_format.go @@ -86,7 +86,7 @@ func (sf *serverStreamFormat) writePacketRTP(pkt *rtp.Packet, ntp time.Time) err if rsm, ok := r.setuppedMedias[sf.sm.media]; ok { rsf := rsm.formats[pkt.PayloadType] - if isSecure(r.setuppedProfile) { + if isSecure(r.setuppedTransport.Profile) { err = rsf.writePacketRTPEncoded(encr) if err != nil { r.onStreamWriteError(err) diff --git a/server_stream_media.go b/server_stream_media.go index 80058985..4c41fd12 100644 --- a/server_stream_media.go +++ b/server_stream_media.go @@ -78,7 +78,7 @@ func (sm *serverStreamMedia) writePacketRTCP(pkt rtcp.Packet) error { // send unicast for r := range sm.st.activeUnicastReaders { if sm, ok := r.setuppedMedias[sm.media]; ok { - if isSecure(r.setuppedProfile) { + if isSecure(r.setuppedTransport.Profile) { err = sm.writePacketRTCPEncoded(encr) if err != nil { r.onStreamWriteError(err) diff --git a/stats_session.go b/session_stats.go similarity index 76% rename from stats_session.go rename to session_stats.go index 5540ef62..3b7cca20 100644 --- a/stats_session.go +++ b/session_stats.go @@ -8,7 +8,22 @@ import ( ) // StatsSessionFormat are session format statistics. -type StatsSessionFormat struct { +// +// Deprecated: replaced by SessionStatsFormat +type StatsSessionFormat = SessionStatsFormat + +// StatsSessionMedia are session media statistics. +// +// Deprecated: replaced by SessionStatsMedia +type StatsSessionMedia = SessionStatsMedia + +// StatsSession are session statistics. +// +// Deprecated: replaced by SessionStats. +type StatsSession = SessionStats + +// SessionStatsFormat are session format statistics. +type SessionStatsFormat struct { // number of RTP packets correctly received and processed RTPPacketsReceived uint64 // number of sent RTP packets @@ -29,8 +44,8 @@ type StatsSessionFormat struct { RTPPacketsLastNTP time.Time } -// StatsSessionMedia are session media statistics. -type StatsSessionMedia struct { +// SessionStatsMedia are session media statistics. +type SessionStatsMedia struct { // received bytes BytesReceived uint64 // sent bytes @@ -45,11 +60,11 @@ type StatsSessionMedia struct { RTCPPacketsInError uint64 // format statistics - Formats map[format.Format]StatsSessionFormat + Formats map[format.Format]SessionStatsFormat } -// StatsSession are session statistics. -type StatsSession struct { +// SessionStats are session statistics. +type SessionStats struct { // received bytes BytesReceived uint64 // sent bytes @@ -72,5 +87,5 @@ type StatsSession struct { RTCPPacketsInError uint64 // media statistics - Medias map[*description.Media]StatsSessionMedia + Medias map[*description.Media]SessionStatsMedia } diff --git a/session_transport.go b/session_transport.go new file mode 100644 index 00000000..a3acc3ab --- /dev/null +++ b/session_transport.go @@ -0,0 +1,9 @@ +package gortsplib + +import "github.com/bluenviron/gortsplib/v4/pkg/headers" + +// SessionTransport contains details about the transport of a session. +type SessionTransport struct { + Protocol TransportProtocol + Profile headers.TransportProfile +} diff --git a/transport.go b/transport_protocol.go similarity index 53% rename from transport.go rename to transport_protocol.go index 470f7d7c..66398a0f 100644 --- a/transport.go +++ b/transport_protocol.go @@ -1,23 +1,28 @@ package gortsplib // Transport is a RTSP transport protocol. -type Transport int +// +// Deprecated: renamed into TransportProtocol. +type Transport = TransportProtocol + +// TransportProtocol is a RTSP transport protocol. +type TransportProtocol int // transport protocols. const ( - TransportUDP Transport = iota + TransportUDP TransportProtocol = iota TransportUDPMulticast TransportTCP ) -var transportLabels = map[Transport]string{ +var transportLabels = map[TransportProtocol]string{ TransportUDP: "UDP", TransportUDPMulticast: "UDP-multicast", TransportTCP: "TCP", } // String implements fmt.Stringer. -func (t Transport) String() string { +func (t TransportProtocol) String() string { if l, ok := transportLabels[t]; ok { return l } diff --git a/transport_test.go b/transport_protocol_test.go similarity index 72% rename from transport_test.go rename to transport_protocol_test.go index 23263a64..8082a51d 100644 --- a/transport_test.go +++ b/transport_protocol_test.go @@ -6,10 +6,10 @@ import ( "github.com/stretchr/testify/require" ) -func TestTransportString(t *testing.T) { +func TestTransportProtocolString(t *testing.T) { tr := TransportUDPMulticast require.NotEqual(t, "unknown", tr.String()) - tr = Transport(15) + tr = TransportProtocol(15) require.Equal(t, "unknown", tr.String()) }