server: support setupping tracks with arbitrary interleaved IDs (#47)

This commit is contained in:
aler9
2021-06-26 12:51:45 +02:00
parent 8062cfdf42
commit a512762ba0
10 changed files with 146 additions and 150 deletions

View File

@@ -131,7 +131,7 @@ func TestClientPublishSerial(t *testing.T) {
f.Payload = make([]byte, 2048)
err = f.Read(bconn.Reader)
require.NoError(t, err)
require.Equal(t, StreamTypeRTP, f.StreamType)
require.Equal(t, 0, f.Channel)
require.Equal(t, []byte{0x01, 0x02, 0x03, 0x04}, f.Payload)
}
@@ -143,8 +143,7 @@ func TestClientPublishSerial(t *testing.T) {
})
} else {
err = base.InterleavedFrame{
TrackID: 0,
StreamType: StreamTypeRTCP,
Channel: 1,
Payload: []byte{0x05, 0x06, 0x07, 0x08},
}.Write(bconn.Writer)
require.NoError(t, err)
@@ -735,7 +734,7 @@ func TestClientPublishAutomaticProtocol(t *testing.T) {
f.Payload = make([]byte, 2048)
err = f.Read(bconn.Reader)
require.NoError(t, err)
require.Equal(t, StreamTypeRTP, f.StreamType)
require.Equal(t, 0, f.Channel)
require.Equal(t, []byte{0x01, 0x02, 0x03, 0x04}, f.Payload)
req, err = readRequest(bconn.Reader)
@@ -841,13 +840,13 @@ func TestClientPublishRTCPReport(t *testing.T) {
f.Payload = make([]byte, 2048)
err = f.Read(bconn.Reader)
require.NoError(t, err)
require.Equal(t, StreamTypeRTP, f.StreamType)
require.Equal(t, 0, f.Channel)
rr.ProcessFrame(time.Now(), StreamTypeRTP, f.Payload)
f.Payload = make([]byte, 2048)
err = f.Read(bconn.Reader)
require.NoError(t, err)
require.Equal(t, StreamTypeRTCP, f.StreamType)
require.Equal(t, 1, f.Channel)
pkt, err := rtcp.Unmarshal(f.Payload)
require.NoError(t, err)
sr, ok := pkt[0].(*rtcp.SenderReport)
@@ -862,8 +861,7 @@ func TestClientPublishRTCPReport(t *testing.T) {
rr.ProcessFrame(time.Now(), StreamTypeRTCP, f.Payload)
err = base.InterleavedFrame{
TrackID: 0,
StreamType: StreamTypeRTCP,
Channel: 1,
Payload: rr.Report(time.Now()),
}.Write(bconn.Writer)
require.NoError(t, err)
@@ -871,7 +869,7 @@ func TestClientPublishRTCPReport(t *testing.T) {
f.Payload = make([]byte, 2048)
err = f.Read(bconn.Reader)
require.NoError(t, err)
require.Equal(t, StreamTypeRTP, f.StreamType)
require.Equal(t, 0, f.Channel)
req, err = readRequest(bconn.Reader)
require.NoError(t, err)

View File

@@ -346,8 +346,7 @@ func TestClientRead(t *testing.T) {
case "tcp", "tls":
err = base.InterleavedFrame{
TrackID: 0,
StreamType: StreamTypeRTP,
Channel: 0,
Payload: []byte{0x01, 0x02, 0x03, 0x04},
}.Write(bconn.Writer)
require.NoError(t, err)
@@ -372,8 +371,7 @@ func TestClientRead(t *testing.T) {
f.Payload = make([]byte, 2048)
err := f.Read(bconn.Reader)
require.NoError(t, err)
require.Equal(t, 0, f.TrackID)
require.Equal(t, StreamTypeRTCP, f.StreamType)
require.Equal(t, 1, f.Channel)
require.Equal(t, []byte{0x05, 0x06, 0x07, 0x08}, f.Payload)
close(frameRecv)
}
@@ -520,8 +518,7 @@ func TestClientReadPartial(t *testing.T) {
require.NoError(t, err)
err = base.InterleavedFrame{
TrackID: 0,
StreamType: StreamTypeRTP,
Channel: 0,
Payload: []byte{0x01, 0x02, 0x03, 0x04},
}.Write(bconn.Writer)
require.NoError(t, err)
@@ -907,8 +904,7 @@ func TestClientReadAutomaticProtocol(t *testing.T) {
require.NoError(t, err)
err = base.InterleavedFrame{
TrackID: 0,
StreamType: StreamTypeRTP,
Channel: 0,
Payload: []byte("\x00\x00\x00\x00"),
}.Write(bconn.Writer)
require.NoError(t, err)
@@ -1101,8 +1097,7 @@ func TestClientReadAutomaticProtocol(t *testing.T) {
require.NoError(t, err)
base.InterleavedFrame{
TrackID: 0,
StreamType: StreamTypeRTP,
Channel: 0,
Payload: []byte("\x00\x00\x00\x00"),
}.Write(bconn.Writer)
@@ -1315,8 +1310,7 @@ func TestClientReadPause(t *testing.T) {
})
} else {
base.InterleavedFrame{
TrackID: 0,
StreamType: StreamTypeRTP,
Channel: 0,
Payload: []byte("\x00\x00\x00\x00"),
}.Write(bconn.Writer)
}
@@ -1617,16 +1611,14 @@ func TestClientReadRTCPReport(t *testing.T) {
Payload: []byte{0x01, 0x02, 0x03, 0x04},
}).Marshal()
err = base.InterleavedFrame{
TrackID: 0,
StreamType: StreamTypeRTP,
Channel: 0,
Payload: byts,
}.Write(bconn.Writer)
require.NoError(t, err)
rs.ProcessFrame(time.Now(), StreamTypeRTP, byts)
err = base.InterleavedFrame{
TrackID: 0,
StreamType: StreamTypeRTCP,
Channel: 1,
Payload: rs.Report(time.Now()),
}.Write(bconn.Writer)
require.NoError(t, err)
@@ -1635,7 +1627,7 @@ func TestClientReadRTCPReport(t *testing.T) {
f.Payload = make([]byte, 2048)
err = f.Read(bconn.Reader)
require.NoError(t, err)
require.Equal(t, StreamTypeRTCP, f.StreamType)
require.Equal(t, 1, f.Channel)
pkt, err := rtcp.Unmarshal(f.Payload)
require.NoError(t, err)
rr, ok := pkt[0].(*rtcp.ReceiverReport)
@@ -1654,8 +1646,7 @@ func TestClientReadRTCPReport(t *testing.T) {
}, rr)
err = base.InterleavedFrame{
TrackID: 0,
StreamType: StreamTypeRTP,
Channel: 0,
Payload: byts,
}.Write(bconn.Writer)
require.NoError(t, err)
@@ -1940,15 +1931,13 @@ func TestClientReadIgnoreTCPInvalidTrack(t *testing.T) {
require.NoError(t, err)
err = base.InterleavedFrame{
TrackID: 3,
StreamType: StreamTypeRTP,
Channel: 6,
Payload: []byte{0x01, 0x02, 0x03, 0x04},
}.Write(bconn.Writer)
require.NoError(t, err)
err = base.InterleavedFrame{
TrackID: 0,
StreamType: StreamTypeRTP,
Channel: 0,
Payload: []byte{0x05, 0x06, 0x07, 0x08},
}.Write(bconn.Writer)
require.NoError(t, err)

View File

@@ -31,6 +31,20 @@ const (
clientConnUDPKeepalivePeriod = 30 * time.Second
)
func clientChannelToTrackID(channel int) (int, StreamType) {
if (channel % 2) == 0 {
return channel / 2, StreamTypeRTP
}
return (channel - 1) / 2, StreamTypeRTCP
}
func clientTrackIDToChannel(trackID int, streamType StreamType) int {
if streamType == StreamTypeRTP {
return trackID * 2
}
return (trackID * 2) + 1
}
func isErrNOUDPPacketsReceivedRecently(err error) bool {
_, ok := err.(liberrors.ErrClientNoUDPPacketsRecently)
return ok
@@ -617,15 +631,17 @@ func (cc *ClientConn) runBackgroundPlayTCP() error {
return
}
track, ok := cc.tracks[frame.TrackID]
trackID, streamType := clientChannelToTrackID(frame.Channel)
track, ok := cc.tracks[trackID]
if !ok {
continue
}
now := time.Now()
atomic.StoreInt64(&lastFrameTime, now.Unix())
track.rtcpReceiver.ProcessFrame(now, frame.StreamType, frame.Payload)
cc.pullReadCB()(frame.TrackID, frame.StreamType, frame.Payload)
track.rtcpReceiver.ProcessFrame(now, streamType, frame.Payload)
cc.pullReadCB()(trackID, streamType, frame.Payload)
}
}()
@@ -736,7 +752,9 @@ func (cc *ClientConn) runBackgroundRecordTCP() error {
return
}
cc.pullReadCB()(frame.TrackID, frame.StreamType, frame.Payload)
trackID, streamType := clientChannelToTrackID(frame.Channel)
cc.pullReadCB()(trackID, streamType, frame.Payload)
}
}()
@@ -1347,8 +1365,7 @@ func (cc *ClientConn) doSetup(
return nil, liberrors.ErrClientTransportHeaderNoInterleavedIDs{}
}
if thRes.InterleavedIDs[0] != th.InterleavedIDs[0] ||
thRes.InterleavedIDs[1] != th.InterleavedIDs[1] {
if *thRes.InterleavedIDs != *th.InterleavedIDs {
return nil, liberrors.ErrClientTransportHeaderInvalidInterleavedIDs{
Expected: *th.InterleavedIDs, Value: *thRes.InterleavedIDs,
}
@@ -1665,10 +1682,11 @@ func (cc *ClientConn) WriteFrame(trackID int, streamType StreamType, payload []b
cc.tcpWriteMutex.Lock()
defer cc.tcpWriteMutex.Unlock()
channel := clientTrackIDToChannel(trackID, streamType)
cc.nconn.SetWriteDeadline(now.Add(cc.c.WriteTimeout))
return base.InterleavedFrame{
TrackID: trackID,
StreamType: streamType,
Channel: channel,
Payload: payload,
}.Write(cc.bw)
}

View File

@@ -60,11 +60,8 @@ func ReadInterleavedFrameOrResponse(frame *InterleavedFrame, res *Response, br *
// InterleavedFrame is an interleaved frame, and allows to transfer binary data
// within RTSP/TCP connections. It is used to send and receive RTP and RTCP packets with TCP.
type InterleavedFrame struct {
// track id
TrackID int
// stream type
StreamType StreamType
// channel id
Channel int
// frame payload
Payload []byte
@@ -88,15 +85,7 @@ func (f *InterleavedFrame) Read(br *bufio.Reader) error {
framelen, len(f.Payload))
}
// convert channel into TrackID and StreamType
channel := header[1]
f.TrackID, f.StreamType = func() (int, StreamType) {
if (channel % 2) == 0 {
return int(channel / 2), StreamTypeRTP
}
return int((channel - 1) / 2), StreamTypeRTCP
}()
f.Channel = int(header[1])
f.Payload = f.Payload[:framelen]
_, err = io.ReadFull(br, f.Payload)
@@ -108,15 +97,7 @@ func (f *InterleavedFrame) Read(br *bufio.Reader) error {
// Write writes an InterleavedFrame into a buffered writer.
func (f InterleavedFrame) Write(bw *bufio.Writer) error {
// convert TrackID and StreamType into channel
channel := func() uint8 {
if f.StreamType == StreamTypeRTP {
return uint8(f.TrackID * 2)
}
return uint8((f.TrackID * 2) + 1)
}()
buf := []byte{0x24, channel, 0x00, 0x00}
buf := []byte{0x24, byte(f.Channel), 0x00, 0x00}
binary.BigEndian.PutUint16(buf[2:], uint16(len(f.Payload)))
_, err := bw.Write(buf)

View File

@@ -17,8 +17,7 @@ var casesInterleavedFrame = []struct {
name: "rtp",
enc: []byte{0x24, 0x6, 0x0, 0x4, 0x1, 0x2, 0x3, 0x4},
dec: InterleavedFrame{
TrackID: 3,
StreamType: StreamTypeRTP,
Channel: 6,
Payload: []byte{0x01, 0x02, 0x03, 0x04},
},
},
@@ -26,8 +25,7 @@ var casesInterleavedFrame = []struct {
name: "rtcp",
enc: []byte{0x24, 0xd, 0x0, 0x4, 0x5, 0x6, 0x7, 0x8},
dec: InterleavedFrame{
TrackID: 6,
StreamType: StreamTypeRTCP,
Channel: 13,
Payload: []byte{0x05, 0x06, 0x07, 0x08},
},
},
@@ -113,8 +111,7 @@ func TestInterleavedFrameWriteErrors(t *testing.T) {
t.Run(ca.name, func(t *testing.T) {
bw := bufio.NewWriterSize(&limitedBuffer{cap: ca.cap}, 1)
err := InterleavedFrame{
TrackID: 3,
StreamType: StreamTypeRTP,
Channel: 3,
Payload: []byte{0x01, 0x02, 0x03, 0x04},
}.Write(bw)
require.Equal(t, "capacity reached", err.Error())

View File

@@ -168,14 +168,11 @@ func (e ErrServerTransportHeaderNoInterleavedIDs) Error() string {
}
// ErrServerTransportHeaderInvalidInterleavedIDs is an error that can be returned by a server.
type ErrServerTransportHeaderInvalidInterleavedIDs struct {
Expected [2]int
Value [2]int
}
type ErrServerTransportHeaderInvalidInterleavedIDs struct{}
// Error implements the error interface.
func (e ErrServerTransportHeaderInvalidInterleavedIDs) Error() string {
return fmt.Sprintf("invalid interleaved IDs, expected %v, got %v", e.Expected, e.Value)
return "invalid interleaved IDs"
}
// ErrServerTracksDifferentProtocols is an error that can be returned by a server.

View File

@@ -850,15 +850,13 @@ func TestServerPublish(t *testing.T) {
} else {
err = base.InterleavedFrame{
TrackID: 0,
StreamType: StreamTypeRTP,
Channel: 0,
Payload: []byte{0x01, 0x02, 0x03, 0x04},
}.Write(bconn.Writer)
require.NoError(t, err)
err = base.InterleavedFrame{
TrackID: 0,
StreamType: StreamTypeRTCP,
Channel: 1,
Payload: []byte{0x05, 0x06, 0x07, 0x08},
}.Write(bconn.Writer)
require.NoError(t, err)
@@ -881,7 +879,7 @@ func TestServerPublish(t *testing.T) {
f.Payload = make([]byte, 2048)
err := f.Read(bconn.Reader)
require.NoError(t, err)
require.Equal(t, StreamTypeRTCP, f.StreamType)
require.Equal(t, 1, f.Channel)
require.Equal(t, []byte{0x09, 0x0A, 0x0B, 0x0C}, f.Payload)
}
@@ -1003,8 +1001,7 @@ func TestServerPublishErrorInvalidProtocol(t *testing.T) {
require.Equal(t, base.StatusOK, res.StatusCode)
err = base.InterleavedFrame{
TrackID: 0,
StreamType: StreamTypeRTP,
Channel: 0,
Payload: []byte{0x01, 0x02, 0x03, 0x04},
}.Write(bconn.Writer)
require.NoError(t, err)
@@ -1116,8 +1113,7 @@ func TestServerPublishRTCPReport(t *testing.T) {
Payload: []byte{0x01, 0x02, 0x03, 0x04},
}).Marshal()
err = base.InterleavedFrame{
TrackID: 0,
StreamType: StreamTypeRTP,
Channel: 0,
Payload: byts,
}.Write(bconn.Writer)
require.NoError(t, err)
@@ -1126,7 +1122,7 @@ func TestServerPublishRTCPReport(t *testing.T) {
f.Payload = make([]byte, 2048)
f.Read(bconn.Reader)
require.NoError(t, err)
require.Equal(t, StreamTypeRTCP, f.StreamType)
require.Equal(t, 1, f.Channel)
pkt, err := rtcp.Unmarshal(f.Payload)
require.NoError(t, err)
rr, ok := pkt[0].(*rtcp.ReceiverReport)
@@ -1145,8 +1141,7 @@ func TestServerPublishRTCPReport(t *testing.T) {
}, rr)
err = base.InterleavedFrame{
TrackID: 0,
StreamType: StreamTypeRTP,
Channel: 0,
Payload: byts,
}.Write(bconn.Writer)
require.NoError(t, err)

View File

@@ -413,7 +413,7 @@ func TestServerRead(t *testing.T) {
v := base.StreamDeliveryUnicast
inTH.Delivery = &v
inTH.Protocol = base.StreamProtocolTCP
inTH.InterleavedIDs = &[2]int{0, 1}
inTH.InterleavedIDs = &[2]int{4, 5}
}
res, err := writeReqReadRes(bconn, base.Request{
@@ -509,15 +509,13 @@ func TestServerRead(t *testing.T) {
f.Payload = make([]byte, 2048)
err := f.Read(bconn.Reader)
require.NoError(t, err)
require.Equal(t, 0, f.TrackID)
require.Equal(t, StreamTypeRTP, f.StreamType)
require.Equal(t, 4, f.Channel)
require.Equal(t, []byte{0x01, 0x02, 0x03, 0x04}, f.Payload)
f.Payload = make([]byte, 2048)
err = f.Read(bconn.Reader)
require.NoError(t, err)
require.Equal(t, 0, f.TrackID)
require.Equal(t, StreamTypeRTCP, f.StreamType)
require.Equal(t, 5, f.Channel)
require.Equal(t, []byte{0x05, 0x06, 0x07, 0x08}, f.Payload)
}
@@ -539,8 +537,7 @@ func TestServerRead(t *testing.T) {
default:
err = base.InterleavedFrame{
TrackID: 0,
StreamType: StreamTypeRTCP,
Channel: 5,
Payload: []byte{0x01, 0x02, 0x03, 0x04},
}.Write(bconn.Writer)
require.NoError(t, err)
@@ -1372,8 +1369,7 @@ func TestServerReadNonSetuppedPath(t *testing.T) {
f.Payload = make([]byte, 2048)
err = f.Read(bconn.Reader)
require.NoError(t, err)
require.Equal(t, 0, f.TrackID)
require.Equal(t, StreamTypeRTP, f.StreamType)
require.Equal(t, 0, f.Channel)
require.Equal(t, []byte{0x05, 0x06, 0x07, 0x08}, f.Payload)
}

View File

@@ -152,18 +152,25 @@ func (sc *ServerConn) run() {
switch what.(type) {
case *base.InterleavedFrame:
channel := frame.Channel
streamType := base.StreamTypeRTP
if (channel % 2) != 0 {
channel--
streamType = base.StreamTypeRTCP
}
// forward frame only if it has been set up
if _, ok := sc.tcpSession.setuppedTracks[frame.TrackID]; ok {
if trackID, ok := sc.tcpSession.setuppedTracksByChannel[channel]; ok {
if sc.tcpFrameIsRecording {
sc.tcpSession.announcedTracks[frame.TrackID].rtcpReceiver.ProcessFrame(time.Now(),
frame.StreamType, frame.Payload)
sc.tcpSession.announcedTracks[trackID].rtcpReceiver.ProcessFrame(
time.Now(), streamType, frame.Payload)
}
if h, ok := sc.s.Handler.(ServerHandlerOnFrame); ok {
h.OnFrame(&ServerHandlerOnFrameCtx{
Session: sc.tcpSession,
TrackID: frame.TrackID,
StreamType: frame.StreamType,
TrackID: trackID,
StreamType: streamType,
Payload: frame.Payload,
})
}

View File

@@ -106,6 +106,7 @@ func (s ServerSessionState) String() string {
// ServerSessionSetuppedTrack is a setupped track of a ServerSession.
type ServerSessionSetuppedTrack struct {
tcpChannel int
udpRTPPort int
udpRTCPPort int
}
@@ -127,6 +128,7 @@ type ServerSession struct {
conns map[*ServerConn]struct{}
state ServerSessionState
setuppedTracks map[int]ServerSessionSetuppedTrack
setuppedTracksByChannel map[int]int // tcp
setuppedProtocol *base.StreamProtocol
setuppedDelivery *base.StreamDelivery
setuppedBaseURL *base.URL // publish
@@ -608,13 +610,17 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base
}, liberrors.ErrServerTransportHeaderNoInterleavedIDs{}
}
if inTH.InterleavedIDs[0] != (trackID*2) ||
inTH.InterleavedIDs[1] != (1+trackID*2) {
if (inTH.InterleavedIDs[0]+1) != inTH.InterleavedIDs[1] ||
(inTH.InterleavedIDs[0]%2) != 0 {
return &base.Response{
StatusCode: base.StatusBadRequest,
}, liberrors.ErrServerTransportHeaderInvalidInterleavedIDs{
Expected: [2]int{(trackID * 2), (1 + trackID*2)}, Value: *inTH.InterleavedIDs,
}, liberrors.ErrServerTransportHeaderInvalidInterleavedIDs{}
}
if _, ok := ss.setuppedTracksByChannel[inTH.InterleavedIDs[0]]; ok {
return &base.Response{
StatusCode: base.StatusBadRequest,
}, liberrors.ErrServerTransportHeaderInvalidInterleavedIDs{}
}
}
@@ -700,7 +706,15 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base
th.ServerPorts = &[2]int{sc.s.udpRTPListener.port(), sc.s.udpRTCPListener.port()}
default: // TCP
ss.setuppedTracks[trackID] = ServerSessionSetuppedTrack{}
ss.setuppedTracks[trackID] = ServerSessionSetuppedTrack{
tcpChannel: inTH.InterleavedIDs[0],
}
if ss.setuppedTracksByChannel == nil {
ss.setuppedTracksByChannel = make(map[int]int)
}
ss.setuppedTracksByChannel[inTH.InterleavedIDs[0]] = trackID
th.Protocol = base.StreamProtocolTCP
de := base.StreamDeliveryUnicast
@@ -1013,7 +1027,7 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base
// WriteFrame writes a frame to the session.
func (ss *ServerSession) WriteFrame(trackID int, streamType StreamType, payload []byte) {
if _, ok := ss.SetuppedTracks()[trackID]; !ok {
if _, ok := ss.setuppedTracks[trackID]; !ok {
return
}
@@ -1036,9 +1050,13 @@ func (ss *ServerSession) WriteFrame(trackID int, streamType StreamType, payload
}
}
} else {
channel := ss.setuppedTracks[trackID].tcpChannel
if streamType == base.StreamTypeRTCP {
channel++
}
ss.tcpConn.tcpFrameWriteBuffer.Push(&base.InterleavedFrame{
TrackID: trackID,
StreamType: streamType,
Channel: channel,
Payload: payload,
})
}