diff --git a/client_publish_test.go b/client_publish_test.go index d51efc0a..859b2306 100644 --- a/client_publish_test.go +++ b/client_publish_test.go @@ -131,7 +131,7 @@ func TestClientPublishSerial(t *testing.T) { f.Payload = make([]byte, 2048) err = f.Read(bconn.Reader) require.NoError(t, err) - require.Equal(t, StreamTypeRTP, f.StreamType) + require.Equal(t, 0, f.Channel) require.Equal(t, []byte{0x01, 0x02, 0x03, 0x04}, f.Payload) } @@ -143,9 +143,8 @@ func TestClientPublishSerial(t *testing.T) { }) } else { err = base.InterleavedFrame{ - TrackID: 0, - StreamType: StreamTypeRTCP, - Payload: []byte{0x05, 0x06, 0x07, 0x08}, + Channel: 1, + Payload: []byte{0x05, 0x06, 0x07, 0x08}, }.Write(bconn.Writer) require.NoError(t, err) } @@ -735,7 +734,7 @@ func TestClientPublishAutomaticProtocol(t *testing.T) { f.Payload = make([]byte, 2048) err = f.Read(bconn.Reader) require.NoError(t, err) - require.Equal(t, StreamTypeRTP, f.StreamType) + require.Equal(t, 0, f.Channel) require.Equal(t, []byte{0x01, 0x02, 0x03, 0x04}, f.Payload) req, err = readRequest(bconn.Reader) @@ -841,13 +840,13 @@ func TestClientPublishRTCPReport(t *testing.T) { f.Payload = make([]byte, 2048) err = f.Read(bconn.Reader) require.NoError(t, err) - require.Equal(t, StreamTypeRTP, f.StreamType) + require.Equal(t, 0, f.Channel) rr.ProcessFrame(time.Now(), StreamTypeRTP, f.Payload) f.Payload = make([]byte, 2048) err = f.Read(bconn.Reader) require.NoError(t, err) - require.Equal(t, StreamTypeRTCP, f.StreamType) + require.Equal(t, 1, f.Channel) pkt, err := rtcp.Unmarshal(f.Payload) require.NoError(t, err) sr, ok := pkt[0].(*rtcp.SenderReport) @@ -862,16 +861,15 @@ func TestClientPublishRTCPReport(t *testing.T) { rr.ProcessFrame(time.Now(), StreamTypeRTCP, f.Payload) err = base.InterleavedFrame{ - TrackID: 0, - StreamType: StreamTypeRTCP, - Payload: rr.Report(time.Now()), + Channel: 1, + Payload: rr.Report(time.Now()), }.Write(bconn.Writer) require.NoError(t, err) f.Payload = make([]byte, 2048) err = f.Read(bconn.Reader) require.NoError(t, err) - require.Equal(t, StreamTypeRTP, f.StreamType) + require.Equal(t, 0, f.Channel) req, err = readRequest(bconn.Reader) require.NoError(t, err) diff --git a/client_read_test.go b/client_read_test.go index d3e63d17..28c54a00 100644 --- a/client_read_test.go +++ b/client_read_test.go @@ -346,9 +346,8 @@ func TestClientRead(t *testing.T) { case "tcp", "tls": err = base.InterleavedFrame{ - TrackID: 0, - StreamType: StreamTypeRTP, - Payload: []byte{0x01, 0x02, 0x03, 0x04}, + Channel: 0, + Payload: []byte{0x01, 0x02, 0x03, 0x04}, }.Write(bconn.Writer) require.NoError(t, err) } @@ -372,8 +371,7 @@ func TestClientRead(t *testing.T) { f.Payload = make([]byte, 2048) err := f.Read(bconn.Reader) require.NoError(t, err) - require.Equal(t, 0, f.TrackID) - require.Equal(t, StreamTypeRTCP, f.StreamType) + require.Equal(t, 1, f.Channel) require.Equal(t, []byte{0x05, 0x06, 0x07, 0x08}, f.Payload) close(frameRecv) } @@ -520,9 +518,8 @@ func TestClientReadPartial(t *testing.T) { require.NoError(t, err) err = base.InterleavedFrame{ - TrackID: 0, - StreamType: StreamTypeRTP, - Payload: []byte{0x01, 0x02, 0x03, 0x04}, + Channel: 0, + Payload: []byte{0x01, 0x02, 0x03, 0x04}, }.Write(bconn.Writer) require.NoError(t, err) @@ -907,9 +904,8 @@ func TestClientReadAutomaticProtocol(t *testing.T) { require.NoError(t, err) err = base.InterleavedFrame{ - TrackID: 0, - StreamType: StreamTypeRTP, - Payload: []byte("\x00\x00\x00\x00"), + Channel: 0, + Payload: []byte("\x00\x00\x00\x00"), }.Write(bconn.Writer) require.NoError(t, err) }() @@ -1101,9 +1097,8 @@ func TestClientReadAutomaticProtocol(t *testing.T) { require.NoError(t, err) base.InterleavedFrame{ - TrackID: 0, - StreamType: StreamTypeRTP, - Payload: []byte("\x00\x00\x00\x00"), + Channel: 0, + Payload: []byte("\x00\x00\x00\x00"), }.Write(bconn.Writer) req, err = readRequest(bconn.Reader) @@ -1315,9 +1310,8 @@ func TestClientReadPause(t *testing.T) { }) } else { base.InterleavedFrame{ - TrackID: 0, - StreamType: StreamTypeRTP, - Payload: []byte("\x00\x00\x00\x00"), + Channel: 0, + Payload: []byte("\x00\x00\x00\x00"), }.Write(bconn.Writer) } @@ -1617,17 +1611,15 @@ func TestClientReadRTCPReport(t *testing.T) { Payload: []byte{0x01, 0x02, 0x03, 0x04}, }).Marshal() err = base.InterleavedFrame{ - TrackID: 0, - StreamType: StreamTypeRTP, - Payload: byts, + Channel: 0, + Payload: byts, }.Write(bconn.Writer) require.NoError(t, err) rs.ProcessFrame(time.Now(), StreamTypeRTP, byts) err = base.InterleavedFrame{ - TrackID: 0, - StreamType: StreamTypeRTCP, - Payload: rs.Report(time.Now()), + Channel: 1, + Payload: rs.Report(time.Now()), }.Write(bconn.Writer) require.NoError(t, err) @@ -1635,7 +1627,7 @@ func TestClientReadRTCPReport(t *testing.T) { f.Payload = make([]byte, 2048) err = f.Read(bconn.Reader) require.NoError(t, err) - require.Equal(t, StreamTypeRTCP, f.StreamType) + require.Equal(t, 1, f.Channel) pkt, err := rtcp.Unmarshal(f.Payload) require.NoError(t, err) rr, ok := pkt[0].(*rtcp.ReceiverReport) @@ -1654,9 +1646,8 @@ func TestClientReadRTCPReport(t *testing.T) { }, rr) err = base.InterleavedFrame{ - TrackID: 0, - StreamType: StreamTypeRTP, - Payload: byts, + Channel: 0, + Payload: byts, }.Write(bconn.Writer) require.NoError(t, err) }() @@ -1940,16 +1931,14 @@ func TestClientReadIgnoreTCPInvalidTrack(t *testing.T) { require.NoError(t, err) err = base.InterleavedFrame{ - TrackID: 3, - StreamType: StreamTypeRTP, - Payload: []byte{0x01, 0x02, 0x03, 0x04}, + Channel: 6, + Payload: []byte{0x01, 0x02, 0x03, 0x04}, }.Write(bconn.Writer) require.NoError(t, err) err = base.InterleavedFrame{ - TrackID: 0, - StreamType: StreamTypeRTP, - Payload: []byte{0x05, 0x06, 0x07, 0x08}, + Channel: 0, + Payload: []byte{0x05, 0x06, 0x07, 0x08}, }.Write(bconn.Writer) require.NoError(t, err) diff --git a/clientconn.go b/clientconn.go index 50801b79..48198868 100644 --- a/clientconn.go +++ b/clientconn.go @@ -31,6 +31,20 @@ const ( clientConnUDPKeepalivePeriod = 30 * time.Second ) +func clientChannelToTrackID(channel int) (int, StreamType) { + if (channel % 2) == 0 { + return channel / 2, StreamTypeRTP + } + return (channel - 1) / 2, StreamTypeRTCP +} + +func clientTrackIDToChannel(trackID int, streamType StreamType) int { + if streamType == StreamTypeRTP { + return trackID * 2 + } + return (trackID * 2) + 1 +} + func isErrNOUDPPacketsReceivedRecently(err error) bool { _, ok := err.(liberrors.ErrClientNoUDPPacketsRecently) return ok @@ -617,15 +631,17 @@ func (cc *ClientConn) runBackgroundPlayTCP() error { return } - track, ok := cc.tracks[frame.TrackID] + trackID, streamType := clientChannelToTrackID(frame.Channel) + + track, ok := cc.tracks[trackID] if !ok { continue } now := time.Now() atomic.StoreInt64(&lastFrameTime, now.Unix()) - track.rtcpReceiver.ProcessFrame(now, frame.StreamType, frame.Payload) - cc.pullReadCB()(frame.TrackID, frame.StreamType, frame.Payload) + track.rtcpReceiver.ProcessFrame(now, streamType, frame.Payload) + cc.pullReadCB()(trackID, streamType, frame.Payload) } }() @@ -736,7 +752,9 @@ func (cc *ClientConn) runBackgroundRecordTCP() error { return } - cc.pullReadCB()(frame.TrackID, frame.StreamType, frame.Payload) + trackID, streamType := clientChannelToTrackID(frame.Channel) + + cc.pullReadCB()(trackID, streamType, frame.Payload) } }() @@ -1347,8 +1365,7 @@ func (cc *ClientConn) doSetup( return nil, liberrors.ErrClientTransportHeaderNoInterleavedIDs{} } - if thRes.InterleavedIDs[0] != th.InterleavedIDs[0] || - thRes.InterleavedIDs[1] != th.InterleavedIDs[1] { + if *thRes.InterleavedIDs != *th.InterleavedIDs { return nil, liberrors.ErrClientTransportHeaderInvalidInterleavedIDs{ Expected: *th.InterleavedIDs, Value: *thRes.InterleavedIDs, } @@ -1665,11 +1682,12 @@ func (cc *ClientConn) WriteFrame(trackID int, streamType StreamType, payload []b cc.tcpWriteMutex.Lock() defer cc.tcpWriteMutex.Unlock() + channel := clientTrackIDToChannel(trackID, streamType) + cc.nconn.SetWriteDeadline(now.Add(cc.c.WriteTimeout)) return base.InterleavedFrame{ - TrackID: trackID, - StreamType: streamType, - Payload: payload, + Channel: channel, + Payload: payload, }.Write(cc.bw) } } diff --git a/pkg/base/interleavedframe.go b/pkg/base/interleavedframe.go index 8002eae9..613d4312 100644 --- a/pkg/base/interleavedframe.go +++ b/pkg/base/interleavedframe.go @@ -60,11 +60,8 @@ func ReadInterleavedFrameOrResponse(frame *InterleavedFrame, res *Response, br * // InterleavedFrame is an interleaved frame, and allows to transfer binary data // within RTSP/TCP connections. It is used to send and receive RTP and RTCP packets with TCP. type InterleavedFrame struct { - // track id - TrackID int - - // stream type - StreamType StreamType + // channel id + Channel int // frame payload Payload []byte @@ -88,15 +85,7 @@ func (f *InterleavedFrame) Read(br *bufio.Reader) error { framelen, len(f.Payload)) } - // convert channel into TrackID and StreamType - channel := header[1] - f.TrackID, f.StreamType = func() (int, StreamType) { - if (channel % 2) == 0 { - return int(channel / 2), StreamTypeRTP - } - return int((channel - 1) / 2), StreamTypeRTCP - }() - + f.Channel = int(header[1]) f.Payload = f.Payload[:framelen] _, err = io.ReadFull(br, f.Payload) @@ -108,15 +97,7 @@ func (f *InterleavedFrame) Read(br *bufio.Reader) error { // Write writes an InterleavedFrame into a buffered writer. func (f InterleavedFrame) Write(bw *bufio.Writer) error { - // convert TrackID and StreamType into channel - channel := func() uint8 { - if f.StreamType == StreamTypeRTP { - return uint8(f.TrackID * 2) - } - return uint8((f.TrackID * 2) + 1) - }() - - buf := []byte{0x24, channel, 0x00, 0x00} + buf := []byte{0x24, byte(f.Channel), 0x00, 0x00} binary.BigEndian.PutUint16(buf[2:], uint16(len(f.Payload))) _, err := bw.Write(buf) diff --git a/pkg/base/interleavedframe_test.go b/pkg/base/interleavedframe_test.go index 4c82501e..ffe0d3a4 100644 --- a/pkg/base/interleavedframe_test.go +++ b/pkg/base/interleavedframe_test.go @@ -17,18 +17,16 @@ var casesInterleavedFrame = []struct { name: "rtp", enc: []byte{0x24, 0x6, 0x0, 0x4, 0x1, 0x2, 0x3, 0x4}, dec: InterleavedFrame{ - TrackID: 3, - StreamType: StreamTypeRTP, - Payload: []byte{0x01, 0x02, 0x03, 0x04}, + Channel: 6, + Payload: []byte{0x01, 0x02, 0x03, 0x04}, }, }, { name: "rtcp", enc: []byte{0x24, 0xd, 0x0, 0x4, 0x5, 0x6, 0x7, 0x8}, dec: InterleavedFrame{ - TrackID: 6, - StreamType: StreamTypeRTCP, - Payload: []byte{0x05, 0x06, 0x07, 0x08}, + Channel: 13, + Payload: []byte{0x05, 0x06, 0x07, 0x08}, }, }, } @@ -113,9 +111,8 @@ func TestInterleavedFrameWriteErrors(t *testing.T) { t.Run(ca.name, func(t *testing.T) { bw := bufio.NewWriterSize(&limitedBuffer{cap: ca.cap}, 1) err := InterleavedFrame{ - TrackID: 3, - StreamType: StreamTypeRTP, - Payload: []byte{0x01, 0x02, 0x03, 0x04}, + Channel: 3, + Payload: []byte{0x01, 0x02, 0x03, 0x04}, }.Write(bw) require.Equal(t, "capacity reached", err.Error()) }) diff --git a/pkg/liberrors/server.go b/pkg/liberrors/server.go index f0724d08..b870209a 100644 --- a/pkg/liberrors/server.go +++ b/pkg/liberrors/server.go @@ -168,14 +168,11 @@ func (e ErrServerTransportHeaderNoInterleavedIDs) Error() string { } // ErrServerTransportHeaderInvalidInterleavedIDs is an error that can be returned by a server. -type ErrServerTransportHeaderInvalidInterleavedIDs struct { - Expected [2]int - Value [2]int -} +type ErrServerTransportHeaderInvalidInterleavedIDs struct{} // Error implements the error interface. func (e ErrServerTransportHeaderInvalidInterleavedIDs) Error() string { - return fmt.Sprintf("invalid interleaved IDs, expected %v, got %v", e.Expected, e.Value) + return "invalid interleaved IDs" } // ErrServerTracksDifferentProtocols is an error that can be returned by a server. diff --git a/server_publish_test.go b/server_publish_test.go index d5fa9541..b7cae9e4 100644 --- a/server_publish_test.go +++ b/server_publish_test.go @@ -850,16 +850,14 @@ func TestServerPublish(t *testing.T) { } else { err = base.InterleavedFrame{ - TrackID: 0, - StreamType: StreamTypeRTP, - Payload: []byte{0x01, 0x02, 0x03, 0x04}, + Channel: 0, + Payload: []byte{0x01, 0x02, 0x03, 0x04}, }.Write(bconn.Writer) require.NoError(t, err) err = base.InterleavedFrame{ - TrackID: 0, - StreamType: StreamTypeRTCP, - Payload: []byte{0x05, 0x06, 0x07, 0x08}, + Channel: 1, + Payload: []byte{0x05, 0x06, 0x07, 0x08}, }.Write(bconn.Writer) require.NoError(t, err) } @@ -881,7 +879,7 @@ func TestServerPublish(t *testing.T) { f.Payload = make([]byte, 2048) err := f.Read(bconn.Reader) require.NoError(t, err) - require.Equal(t, StreamTypeRTCP, f.StreamType) + require.Equal(t, 1, f.Channel) require.Equal(t, []byte{0x09, 0x0A, 0x0B, 0x0C}, f.Payload) } @@ -1003,9 +1001,8 @@ func TestServerPublishErrorInvalidProtocol(t *testing.T) { require.Equal(t, base.StatusOK, res.StatusCode) err = base.InterleavedFrame{ - TrackID: 0, - StreamType: StreamTypeRTP, - Payload: []byte{0x01, 0x02, 0x03, 0x04}, + Channel: 0, + Payload: []byte{0x01, 0x02, 0x03, 0x04}, }.Write(bconn.Writer) require.NoError(t, err) } @@ -1116,9 +1113,8 @@ func TestServerPublishRTCPReport(t *testing.T) { Payload: []byte{0x01, 0x02, 0x03, 0x04}, }).Marshal() err = base.InterleavedFrame{ - TrackID: 0, - StreamType: StreamTypeRTP, - Payload: byts, + Channel: 0, + Payload: byts, }.Write(bconn.Writer) require.NoError(t, err) @@ -1126,7 +1122,7 @@ func TestServerPublishRTCPReport(t *testing.T) { f.Payload = make([]byte, 2048) f.Read(bconn.Reader) require.NoError(t, err) - require.Equal(t, StreamTypeRTCP, f.StreamType) + require.Equal(t, 1, f.Channel) pkt, err := rtcp.Unmarshal(f.Payload) require.NoError(t, err) rr, ok := pkt[0].(*rtcp.ReceiverReport) @@ -1145,9 +1141,8 @@ func TestServerPublishRTCPReport(t *testing.T) { }, rr) err = base.InterleavedFrame{ - TrackID: 0, - StreamType: StreamTypeRTP, - Payload: byts, + Channel: 0, + Payload: byts, }.Write(bconn.Writer) require.NoError(t, err) } diff --git a/server_read_test.go b/server_read_test.go index ca84d326..644d4198 100644 --- a/server_read_test.go +++ b/server_read_test.go @@ -413,7 +413,7 @@ func TestServerRead(t *testing.T) { v := base.StreamDeliveryUnicast inTH.Delivery = &v inTH.Protocol = base.StreamProtocolTCP - inTH.InterleavedIDs = &[2]int{0, 1} + inTH.InterleavedIDs = &[2]int{4, 5} } res, err := writeReqReadRes(bconn, base.Request{ @@ -509,15 +509,13 @@ func TestServerRead(t *testing.T) { f.Payload = make([]byte, 2048) err := f.Read(bconn.Reader) require.NoError(t, err) - require.Equal(t, 0, f.TrackID) - require.Equal(t, StreamTypeRTP, f.StreamType) + require.Equal(t, 4, f.Channel) require.Equal(t, []byte{0x01, 0x02, 0x03, 0x04}, f.Payload) f.Payload = make([]byte, 2048) err = f.Read(bconn.Reader) require.NoError(t, err) - require.Equal(t, 0, f.TrackID) - require.Equal(t, StreamTypeRTCP, f.StreamType) + require.Equal(t, 5, f.Channel) require.Equal(t, []byte{0x05, 0x06, 0x07, 0x08}, f.Payload) } @@ -539,9 +537,8 @@ func TestServerRead(t *testing.T) { default: err = base.InterleavedFrame{ - TrackID: 0, - StreamType: StreamTypeRTCP, - Payload: []byte{0x01, 0x02, 0x03, 0x04}, + Channel: 5, + Payload: []byte{0x01, 0x02, 0x03, 0x04}, }.Write(bconn.Writer) require.NoError(t, err) <-framesReceived @@ -1372,8 +1369,7 @@ func TestServerReadNonSetuppedPath(t *testing.T) { f.Payload = make([]byte, 2048) err = f.Read(bconn.Reader) require.NoError(t, err) - require.Equal(t, 0, f.TrackID) - require.Equal(t, StreamTypeRTP, f.StreamType) + require.Equal(t, 0, f.Channel) require.Equal(t, []byte{0x05, 0x06, 0x07, 0x08}, f.Payload) } diff --git a/serverconn.go b/serverconn.go index f484372e..ae14ad8a 100644 --- a/serverconn.go +++ b/serverconn.go @@ -152,18 +152,25 @@ func (sc *ServerConn) run() { switch what.(type) { case *base.InterleavedFrame: + channel := frame.Channel + streamType := base.StreamTypeRTP + if (channel % 2) != 0 { + channel-- + streamType = base.StreamTypeRTCP + } + // forward frame only if it has been set up - if _, ok := sc.tcpSession.setuppedTracks[frame.TrackID]; ok { + if trackID, ok := sc.tcpSession.setuppedTracksByChannel[channel]; ok { if sc.tcpFrameIsRecording { - sc.tcpSession.announcedTracks[frame.TrackID].rtcpReceiver.ProcessFrame(time.Now(), - frame.StreamType, frame.Payload) + sc.tcpSession.announcedTracks[trackID].rtcpReceiver.ProcessFrame( + time.Now(), streamType, frame.Payload) } if h, ok := sc.s.Handler.(ServerHandlerOnFrame); ok { h.OnFrame(&ServerHandlerOnFrameCtx{ Session: sc.tcpSession, - TrackID: frame.TrackID, - StreamType: frame.StreamType, + TrackID: trackID, + StreamType: streamType, Payload: frame.Payload, }) } diff --git a/serversession.go b/serversession.go index 4ac81356..e8eae2ec 100644 --- a/serversession.go +++ b/serversession.go @@ -106,6 +106,7 @@ func (s ServerSessionState) String() string { // ServerSessionSetuppedTrack is a setupped track of a ServerSession. type ServerSessionSetuppedTrack struct { + tcpChannel int udpRTPPort int udpRTCPPort int } @@ -122,23 +123,24 @@ type ServerSession struct { id string author *ServerConn - ctx context.Context - ctxCancel func() - conns map[*ServerConn]struct{} - state ServerSessionState - setuppedTracks map[int]ServerSessionSetuppedTrack - setuppedProtocol *base.StreamProtocol - setuppedDelivery *base.StreamDelivery - setuppedBaseURL *base.URL // publish - setuppedStream *ServerStream // read - setuppedPath *string - setuppedQuery *string - lastRequestTime time.Time - tcpConn *ServerConn // tcp - udpIP net.IP // udp - udpZone string // udp - announcedTracks []ServerSessionAnnouncedTrack // publish - udpLastFrameTime *int64 // publish, udp + ctx context.Context + ctxCancel func() + conns map[*ServerConn]struct{} + state ServerSessionState + setuppedTracks map[int]ServerSessionSetuppedTrack + setuppedTracksByChannel map[int]int // tcp + setuppedProtocol *base.StreamProtocol + setuppedDelivery *base.StreamDelivery + setuppedBaseURL *base.URL // publish + setuppedStream *ServerStream // read + setuppedPath *string + setuppedQuery *string + lastRequestTime time.Time + tcpConn *ServerConn // tcp + udpIP net.IP // udp + udpZone string // udp + announcedTracks []ServerSessionAnnouncedTrack // publish + udpLastFrameTime *int64 // publish, udp // in request chan sessionRequestReq @@ -608,13 +610,17 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base }, liberrors.ErrServerTransportHeaderNoInterleavedIDs{} } - if inTH.InterleavedIDs[0] != (trackID*2) || - inTH.InterleavedIDs[1] != (1+trackID*2) { + if (inTH.InterleavedIDs[0]+1) != inTH.InterleavedIDs[1] || + (inTH.InterleavedIDs[0]%2) != 0 { return &base.Response{ - StatusCode: base.StatusBadRequest, - }, liberrors.ErrServerTransportHeaderInvalidInterleavedIDs{ - Expected: [2]int{(trackID * 2), (1 + trackID*2)}, Value: *inTH.InterleavedIDs, - } + StatusCode: base.StatusBadRequest, + }, liberrors.ErrServerTransportHeaderInvalidInterleavedIDs{} + } + + if _, ok := ss.setuppedTracksByChannel[inTH.InterleavedIDs[0]]; ok { + return &base.Response{ + StatusCode: base.StatusBadRequest, + }, liberrors.ErrServerTransportHeaderInvalidInterleavedIDs{} } } @@ -700,7 +706,15 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base th.ServerPorts = &[2]int{sc.s.udpRTPListener.port(), sc.s.udpRTCPListener.port()} default: // TCP - ss.setuppedTracks[trackID] = ServerSessionSetuppedTrack{} + ss.setuppedTracks[trackID] = ServerSessionSetuppedTrack{ + tcpChannel: inTH.InterleavedIDs[0], + } + + if ss.setuppedTracksByChannel == nil { + ss.setuppedTracksByChannel = make(map[int]int) + } + + ss.setuppedTracksByChannel[inTH.InterleavedIDs[0]] = trackID th.Protocol = base.StreamProtocolTCP de := base.StreamDeliveryUnicast @@ -1013,7 +1027,7 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base // WriteFrame writes a frame to the session. func (ss *ServerSession) WriteFrame(trackID int, streamType StreamType, payload []byte) { - if _, ok := ss.SetuppedTracks()[trackID]; !ok { + if _, ok := ss.setuppedTracks[trackID]; !ok { return } @@ -1036,10 +1050,14 @@ func (ss *ServerSession) WriteFrame(trackID int, streamType StreamType, payload } } } else { + channel := ss.setuppedTracks[trackID].tcpChannel + if streamType == base.StreamTypeRTCP { + channel++ + } + ss.tcpConn.tcpFrameWriteBuffer.Push(&base.InterleavedFrame{ - TrackID: trackID, - StreamType: streamType, - Payload: payload, + Channel: channel, + Payload: payload, }) } }