client: support servers that change interleaved IDs (#72)

This commit is contained in:
aler9
2021-08-25 18:04:22 +02:00
parent 21617a343a
commit accfc7cd5d
3 changed files with 195 additions and 39 deletions

View File

@@ -1136,6 +1136,135 @@ func TestClientReadAutomaticProtocol(t *testing.T) {
})
}
func TestClientReadDifferentInterleavedIDs(t *testing.T) {
l, err := net.Listen("tcp", "localhost:8554")
require.NoError(t, err)
defer l.Close()
serverDone := make(chan struct{})
defer func() { <-serverDone }()
go func() {
defer close(serverDone)
conn, err := l.Accept()
require.NoError(t, err)
defer conn.Close()
bconn := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn))
req, err := readRequest(bconn.Reader)
require.NoError(t, err)
require.Equal(t, base.Options, req.Method)
err = base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
"Public": base.HeaderValue{strings.Join([]string{
string(base.Describe),
string(base.Setup),
string(base.Play),
}, ", ")},
},
}.Write(bconn.Writer)
require.NoError(t, err)
req, err = readRequest(bconn.Reader)
require.NoError(t, err)
require.Equal(t, base.Describe, req.Method)
require.Equal(t, mustParseURL("rtsp://localhost:8554/teststream"), req.URL)
track1, err := NewTrackH264(96, []byte("123456"), []byte("123456"))
require.NoError(t, err)
tracks := cloneAndClearTracks(Tracks{track1})
err = base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
"Content-Type": base.HeaderValue{"application/sdp"},
"Content-Base": base.HeaderValue{"rtsp://localhost:8554/teststream/"},
},
Body: tracks.Write(),
}.Write(bconn.Writer)
require.NoError(t, err)
req, err = readRequest(bconn.Reader)
require.NoError(t, err)
require.Equal(t, base.Setup, req.Method)
require.Equal(t, mustParseURL("rtsp://localhost:8554/teststream/trackID=0"), req.URL)
var inTH headers.Transport
err = inTH.Read(req.Header["Transport"])
require.NoError(t, err)
th := headers.Transport{
Delivery: func() *base.StreamDelivery {
v := base.StreamDeliveryUnicast
return &v
}(),
Protocol: base.StreamProtocolTCP,
InterleavedIDs: &[2]int{2, 3},
}
err = base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
"Transport": th.Write(),
},
}.Write(bconn.Writer)
require.NoError(t, err)
req, err = readRequest(bconn.Reader)
require.NoError(t, err)
require.Equal(t, base.Play, req.Method)
require.Equal(t, mustParseURL("rtsp://localhost:8554/teststream/"), req.URL)
err = base.Response{
StatusCode: base.StatusOK,
}.Write(bconn.Writer)
require.NoError(t, err)
err = base.InterleavedFrame{
Channel: 2,
Payload: []byte{0x01, 0x02, 0x03, 0x04},
}.Write(bconn.Writer)
require.NoError(t, err)
req, err = readRequest(bconn.Reader)
require.NoError(t, err)
require.Equal(t, base.Teardown, req.Method)
require.Equal(t, mustParseURL("rtsp://localhost:8554/teststream/"), req.URL)
err = base.Response{
StatusCode: base.StatusOK,
}.Write(bconn.Writer)
require.NoError(t, err)
}()
c := &Client{
Protocol: func() *ClientProtocol {
v := ClientProtocolTCP
return &v
}(),
}
conn, err := c.DialRead("rtsp://localhost:8554/teststream")
require.NoError(t, err)
frameRecv := make(chan struct{})
done := make(chan struct{})
go func() {
defer close(done)
conn.ReadFrames(func(id int, typ StreamType, payload []byte) {
require.Equal(t, 0, id)
close(frameRecv)
})
}()
<-frameRecv
conn.Close()
<-done
}
func TestClientReadRedirect(t *testing.T) {
l, err := net.Listen("tcp", "localhost:8554")
require.NoError(t, err)

View File

@@ -31,20 +31,6 @@ const (
clientConnUDPKeepalivePeriod = 30 * time.Second
)
func clientChannelToTrackID(channel int) (int, StreamType) {
if (channel % 2) == 0 {
return channel / 2, StreamTypeRTP
}
return (channel - 1) / 2, StreamTypeRTCP
}
func clientTrackIDToChannel(trackID int, streamType StreamType) int {
if streamType == StreamTypeRTP {
return trackID * 2
}
return (trackID * 2) + 1
}
func isErrNOUDPPacketsReceivedRecently(err error) bool {
_, ok := err.(liberrors.ErrClientNoUDPPacketsRecently)
return ok
@@ -68,6 +54,7 @@ type clientConnTrack struct {
track *Track
udpRTPListener *clientConnUDPListener
udpRTCPListener *clientConnUDPListener
tcpChannel int
rtcpReceiver *rtcpreceiver.RTCPReceiver
rtcpSender *rtcpsender.RTCPSender
}
@@ -151,6 +138,7 @@ type ClientConn struct {
streamBaseURL *base.URL
protocol *ClientProtocol
tracks map[int]clientConnTrack
tracksByChannel map[int]int
lastRange *headers.Range
backgroundRunning bool
backgroundErr error
@@ -226,7 +214,6 @@ func newClientConn(c *Client, scheme string, host string) (*ClientConn, error) {
host: host,
ctx: ctx,
ctxCancel: ctxCancel,
tracks: make(map[int]clientConnTrack),
options: make(chan optionsReq),
describe: make(chan describeReq),
announce: make(chan announceReq),
@@ -633,16 +620,21 @@ func (cc *ClientConn) runBackgroundPlayTCP() error {
return
}
trackID, streamType := clientChannelToTrackID(frame.Channel)
channel := frame.Channel
streamType := base.StreamTypeRTP
if (channel % 2) != 0 {
channel--
streamType = base.StreamTypeRTCP
}
track, ok := cc.tracks[trackID]
trackID, ok := cc.tracksByChannel[channel]
if !ok {
continue
}
now := time.Now()
atomic.StoreInt64(&lastFrameTime, now.Unix())
track.rtcpReceiver.ProcessFrame(now, streamType, frame.Payload)
cc.tracks[trackID].rtcpReceiver.ProcessFrame(now, streamType, frame.Payload)
cc.pullReadCB()(trackID, streamType, frame.Payload)
}
}()
@@ -754,7 +746,17 @@ func (cc *ClientConn) runBackgroundRecordTCP() error {
return
}
trackID, streamType := clientChannelToTrackID(frame.Channel)
channel := frame.Channel
streamType := base.StreamTypeRTP
if (channel % 2) != 0 {
channel--
streamType = base.StreamTypeRTCP
}
trackID, ok := cc.tracksByChannel[channel]
if !ok {
continue
}
cc.pullReadCB()(trackID, streamType, frame.Payload)
}
@@ -1367,10 +1369,15 @@ func (cc *ClientConn) doSetup(
return nil, liberrors.ErrClientTransportHeaderNoInterleavedIDs{}
}
if *thRes.InterleavedIDs != *th.InterleavedIDs {
return nil, liberrors.ErrClientTransportHeaderInvalidInterleavedIDs{
Expected: *th.InterleavedIDs, Value: *thRes.InterleavedIDs,
}
if (thRes.InterleavedIDs[0]%2) != 0 ||
(thRes.InterleavedIDs[0]+1) != thRes.InterleavedIDs[1] {
return nil, liberrors.ErrClientTransportHeaderInvalidInterleavedIDs{}
}
if _, ok := cc.tracksByChannel[thRes.InterleavedIDs[0]]; ok {
return &base.Response{
StatusCode: base.StatusBadRequest,
}, liberrors.ErrClientTransportHeaderInterleavedIDsAlreadyUsed{}
}
}
@@ -1379,6 +1386,17 @@ func (cc *ClientConn) doSetup(
track: track,
}
if mode == headers.TransportModePlay {
cc.state = clientConnStatePrePlay
cct.rtcpReceiver = rtcpreceiver.New(nil, clockRate)
} else {
cc.state = clientConnStatePreRecord
cct.rtcpSender = rtcpsender.New(clockRate)
}
cc.streamBaseURL = baseURL
cc.protocol = &proto
switch proto {
case ClientProtocolUDP:
rtpListener.remoteReadIP = cc.nconn.RemoteAddr().(*net.TCPAddr).IP
@@ -1422,19 +1440,20 @@ func (cc *ClientConn) doSetup(
if cc.tcpFrameBuffer == nil {
cc.tcpFrameBuffer = multibuffer.New(uint64(cc.c.ReadBufferCount), uint64(cc.c.ReadBufferSize))
}
if cc.tracksByChannel == nil {
cc.tracksByChannel = make(map[int]int)
}
cc.tracksByChannel[thRes.InterleavedIDs[0]] = trackID
cct.tcpChannel = thRes.InterleavedIDs[0]
}
if mode == headers.TransportModePlay {
cc.state = clientConnStatePrePlay
cct.rtcpReceiver = rtcpreceiver.New(nil, clockRate)
} else {
cc.state = clientConnStatePreRecord
cct.rtcpSender = rtcpsender.New(clockRate)
if cc.tracks == nil {
cc.tracks = make(map[int]clientConnTrack)
}
cc.streamBaseURL = baseURL
cc.protocol = &proto
cc.tracks[trackID] = cct
return res, nil
@@ -1685,11 +1704,14 @@ func (cc *ClientConn) WriteFrame(trackID int, streamType StreamType, payload []b
return cc.tracks[trackID].udpRTCPListener.write(payload)
default: // TCP
channel := cc.tracks[trackID].tcpChannel
if streamType == base.StreamTypeRTCP {
channel++
}
cc.tcpWriteMutex.Lock()
defer cc.tcpWriteMutex.Unlock()
channel := clientTrackIDToChannel(trackID, streamType)
cc.nconn.SetWriteDeadline(now.Add(cc.c.WriteTimeout))
return base.InterleavedFrame{
Channel: channel,

View File

@@ -148,14 +148,19 @@ func (e ErrClientTransportHeaderNoInterleavedIDs) Error() string {
}
// ErrClientTransportHeaderInvalidInterleavedIDs is an error that can be returned by a client.
type ErrClientTransportHeaderInvalidInterleavedIDs struct {
Expected [2]int
Value [2]int
}
type ErrClientTransportHeaderInvalidInterleavedIDs struct{}
// Error implements the error interface.
func (e ErrClientTransportHeaderInvalidInterleavedIDs) Error() string {
return fmt.Sprintf("invalid interleaved IDs, expected %v, got %v", e.Expected, e.Value)
return "invalid interleaved IDs"
}
// ErrClientTransportHeaderInterleavedIDsAlreadyUsed is an error that can be returned by a client.
type ErrClientTransportHeaderInterleavedIDsAlreadyUsed struct{}
// Error implements the error interface.
func (e ErrClientTransportHeaderInterleavedIDsAlreadyUsed) Error() string {
return "interleaved IDs already used"
}
// ErrClientNoUDPPacketsRecently is an error that can be returned by a client.