diff --git a/server_publish_test.go b/server_publish_test.go index cd888b1d..0b7181c0 100644 --- a/server_publish_test.go +++ b/server_publish_test.go @@ -638,6 +638,10 @@ func TestServerPublish(t *testing.T) { }, nil, nil }, onRecord: func(ctx *ServerHandlerOnRecordCtx) (*base.Response, error) { + // send RTCP packets directly to the session. + // these are sent after the response, only if onRecord returns StatusOK. + ctx.Session.WritePacketRTCP(0, &testRTCPPacket) + return &base.Response{ StatusCode: base.StatusOK, }, nil @@ -766,6 +770,28 @@ func TestServerPublish(t *testing.T) { require.NoError(t, err) require.Equal(t, base.StatusOK, res.StatusCode) + // server -> client (direct) + if transport == "udp" { + buf := make([]byte, 2048) + n, _, err := l2.ReadFrom(buf) + require.NoError(t, err) + require.Equal(t, testRTCPPacketMarshaled, buf[:n]) + } else { + var f base.InterleavedFrame + f.Payload = make([]byte, 2048) + err := f.Read(br) + require.NoError(t, err) + require.Equal(t, 1, f.Channel) + require.Equal(t, testRTCPPacketMarshaled, f.Payload) + } + + // skip firewall opening + if transport == "udp" { + buf := make([]byte, 2048) + _, _, err := l2.ReadFrom(buf) + require.NoError(t, err) + } + // client -> server if transport == "udp" { time.Sleep(1 * time.Second) @@ -799,12 +825,7 @@ func TestServerPublish(t *testing.T) { // server -> client (RTCP) if transport == "udp" { - // skip firewall opening buf := make([]byte, 2048) - _, _, err := l2.ReadFrom(buf) - require.NoError(t, err) - - buf = make([]byte, 2048) n, _, err := l2.ReadFrom(buf) require.NoError(t, err) require.Equal(t, testRTCPPacketMarshaled, buf[:n]) diff --git a/server_read_test.go b/server_read_test.go index e4f5b2a4..060f322d 100644 --- a/server_read_test.go +++ b/server_read_test.go @@ -287,10 +287,19 @@ func TestServerRead(t *testing.T) { }, stream, nil }, onPlay: func(ctx *ServerHandlerOnPlayCtx) (*base.Response, error) { + // send RTCP packets directly to the session. + // these are sent after the response, only if onPlay returns StatusOK. + if transport != "multicast" { + ctx.Session.WritePacketRTCP(0, &testRTCPPacket) + } + + // the session is added to the stream only after onPlay returns + // with StatusOK; therefore we must wait before calling + // ServerStream.WritePacket*() go func() { time.Sleep(1 * time.Second) - stream.WritePacketRTP(0, &testRTPPacket) stream.WritePacketRTCP(0, &testRTCPPacket) + stream.WritePacketRTP(0, &testRTPPacket) }() return &base.Response{ @@ -461,37 +470,70 @@ func TestServerRead(t *testing.T) { require.NoError(t, err) require.Equal(t, base.StatusOK, res.StatusCode) - // server -> client + // skip firewall opening + if transport == "udp" { + buf := make([]byte, 2048) + _, _, err := l2.ReadFrom(buf) + require.NoError(t, err) + } + + // server -> client (direct) + switch transport { + case "udp": + buf := make([]byte, 2048) + n, _, err := l2.ReadFrom(buf) + require.NoError(t, err) + require.Equal(t, testRTCPPacketMarshaled, buf[:n]) + + case "tcp", "tls": + var f base.InterleavedFrame + + f.Payload = make([]byte, 2048) + err := f.Read(br) + require.NoError(t, err) + + switch f.Channel { + case 4: + require.Equal(t, testRTPPacketMarshaled, f.Payload) + + case 5: + require.Equal(t, testRTCPPacketMarshaled, f.Payload) + + default: + t.Errorf("should not happen") + } + } + + // server -> client (through stream) if transport == "udp" || transport == "multicast" { buf := make([]byte, 2048) n, _, err := l1.ReadFrom(buf) require.NoError(t, err) require.Equal(t, testRTPPacketMarshaled, buf[:n]) - // skip firewall opening - if transport == "udp" { - buf := make([]byte, 2048) - _, _, err := l2.ReadFrom(buf) - require.NoError(t, err) - } - buf = make([]byte, 2048) n, _, err = l2.ReadFrom(buf) require.NoError(t, err) require.Equal(t, testRTCPPacketMarshaled, buf[:n]) } else { var f base.InterleavedFrame - f.Payload = make([]byte, 2048) - err := f.Read(br) - require.NoError(t, err) - require.Equal(t, 4, f.Channel) - require.Equal(t, testRTPPacketMarshaled, f.Payload) - f.Payload = make([]byte, 2048) - err = f.Read(br) - require.NoError(t, err) - require.Equal(t, 5, f.Channel) - require.Equal(t, testRTCPPacketMarshaled, f.Payload) + for i := 0; i < 2; i++ { + f.Payload = make([]byte, 2048) + err := f.Read(br) + require.NoError(t, err) + + switch f.Channel { + case 4: + require.Equal(t, testRTPPacketMarshaled, f.Payload) + + case 5: + require.Equal(t, testRTCPPacketMarshaled, f.Payload) + + default: + t.Errorf("should not happen") + } + } } // client -> server (RTCP) diff --git a/serversession.go b/serversession.go index 8d7c5f4c..ec0f7f5b 100644 --- a/serversession.go +++ b/serversession.go @@ -826,6 +826,14 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base }, liberrors.ErrServerPathHasChanged{Prev: *ss.setuppedPath, Cur: path} } + // allocate writeBuffer before calling OnPlay(). + // in this way it's possible to call ServerSession.WritePacket*() + // inside the callback. + if ss.state != ServerSessionStatePlay && + *ss.setuppedTransport != TransportUDPMulticast { + ss.writeBuffer = ringbuffer.New(uint64(ss.s.ReadBufferCount)) + } + res, err := sc.s.Handler.(ServerHandlerOnPlay).OnPlay(&ServerHandlerOnPlayCtx{ Session: ss, Conn: sc, @@ -835,7 +843,7 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base }) if res.StatusCode != base.StatusOK { - if ss.State() == ServerSessionStatePrePlay { + if ss.state != ServerSessionStatePlay { ss.writeBuffer = nil } return res, err @@ -851,13 +859,12 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base case TransportUDP: ss.udpCheckStreamTimer = time.NewTimer(ss.s.checkStreamPeriod) - ss.writeBuffer = ringbuffer.New(uint64(ss.s.ReadBufferCount)) ss.writerRunning = true ss.writerDone = make(chan struct{}) go ss.runWriter() for trackID, track := range ss.setuppedTracks { - // readers can send RTCP packets + // readers can send RTCP packets only sc.s.udpRTCPListener.addClient(ss.author.ip(), track.udpRTCPPort, ss, trackID, false) // open the firewall by sending packets to the counterpart @@ -876,10 +883,11 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base ss.tcpConn.readFunc = ss.tcpConn.readFuncTCP err = errSwitchReadFunc - ss.writeBuffer = ringbuffer.New(uint64(ss.s.ReadBufferCount)) - // runWriter() is called by conn after sending the response + // runWriter() is called by ServerConn after the response has been sent } + ss.setuppedStream.readerSetActive(ss) + // add RTP-Info var trackIDs []int for trackID := range ss.setuppedTracks { @@ -917,8 +925,6 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base res.Header["RTP-Info"] = ri.Write() } - ss.setuppedStream.readerSetActive(ss) - return res, err case base.Record: @@ -955,6 +961,14 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base }, liberrors.ErrServerPathHasChanged{Prev: *ss.setuppedPath, Cur: path} } + // allocate writeBuffer before calling OnRecord(). + // in this way it's possible to call ServerSession.WritePacket*() + // inside the callback. + // when recording, writeBuffer is only used to send RTCP receiver reports, + // that are much smaller than RTP packets and are sent at a fixed interval. + // decrease RAM consumption by allocating less buffers. + ss.writeBuffer = ringbuffer.New(uint64(8)) + res, err := ss.s.Handler.(ServerHandlerOnRecord).OnRecord(&ServerHandlerOnRecordCtx{ Session: ss, Conn: sc, @@ -964,6 +978,7 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base }) if res.StatusCode != base.StatusOK { + ss.writeBuffer = nil return res, err } @@ -974,10 +989,6 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base ss.udpCheckStreamTimer = time.NewTimer(ss.s.checkStreamPeriod) ss.udpReceiverReportTimer = time.NewTimer(ss.s.udpReceiverReportPeriod) - // when recording, writeBuffer is only used to send RTCP receiver reports, - // that are much smaller than RTP packets and are sent at a fixed interval. - // decrease RAM consumption by allocating less buffers. - ss.writeBuffer = ringbuffer.New(uint64(8)) ss.writerRunning = true ss.writerDone = make(chan struct{}) go ss.runWriter() @@ -1000,10 +1011,6 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base ss.tcpConn.readFunc = ss.tcpConn.readFuncTCP err = errSwitchReadFunc - // when recording, writeBuffer is only used to send RTCP receiver reports, - // that are much smaller than RTP packets and are sent at a fixed interval. - // decrease RAM consumption by allocating less buffers. - ss.writeBuffer = ringbuffer.New(uint64(8)) // runWriter() is called by conn after sending the response } diff --git a/serverstream.go b/serverstream.go index 44fd592d..9aae499e 100644 --- a/serverstream.go +++ b/serverstream.go @@ -224,10 +224,8 @@ func (st *ServerStream) WritePacketRTP(trackID int, pkt *rtp.Packet) { atomic.StoreUint32(&track.lastSequenceNumber, uint32(pkt.Header.SequenceNumber)) - atomic.StoreUint32(&track.lastTimeRTP, pkt.Header.Timestamp) atomic.StoreInt64(&track.lastTimeNTP, time.Now().Unix()) - atomic.StoreUint32(&track.lastSSRC, pkt.Header.SSRC) st.mutex.RLock()