diff --git a/connclient.go b/connclient.go index ec7b650b..a38c842f 100644 --- a/connclient.go +++ b/connclient.go @@ -454,26 +454,26 @@ func (c *ConnClient) Setup(u *base.URL, mode headers.TransportMode, proto base.S return rtpListener, rtcpListener, nil - } else { - for { - // choose two consecutive ports in range 65535-10000 - // rtp must be even and rtcp odd - rtpPort = (rand.Intn((65535-10000)/2) * 2) + 10000 - rtcpPort = rtpPort + 1 + } - rtpListener, err := newConnClientUDPListener(c, rtpPort) - if err != nil { - continue - } + // choose two consecutive ports in range 65535-10000 + // rtp must be even and rtcp odd + for { + rtpPort = (rand.Intn((65535-10000)/2) * 2) + 10000 + rtcpPort = rtpPort + 1 - rtcpListener, err := newConnClientUDPListener(c, rtcpPort) - if err != nil { - rtpListener.close() - continue - } - - return rtpListener, rtcpListener, nil + rtpListener, err := newConnClientUDPListener(c, rtpPort) + if err != nil { + continue } + + rtcpListener, err := newConnClientUDPListener(c, rtcpPort) + if err != nil { + rtpListener.close() + continue + } + + return rtpListener, rtcpListener, nil } }() if err != nil { diff --git a/dialer_test.go b/dialer_test.go index 1e1a9e36..a9f7df46 100644 --- a/dialer_test.go +++ b/dialer_test.go @@ -108,7 +108,7 @@ func TestDialRead(t *testing.T) { } } -func TestDialReadClose(t *testing.T) { +func TestDialReadParallel(t *testing.T) { for _, proto := range []string{ "udp", "tcp", @@ -252,6 +252,63 @@ func TestDialReadPause(t *testing.T) { } func TestDialPublish(t *testing.T) { + for _, proto := range []string{ + "udp", + "tcp", + } { + t.Run(proto, func(t *testing.T) { + cnt1, err := newContainer("rtsp-simple-server", "server", []string{"{}"}) + require.NoError(t, err) + defer cnt1.close() + + time.Sleep(1 * time.Second) + + pc, err := net.ListenPacket("udp4", "127.0.0.1:0") + require.NoError(t, err) + defer pc.Close() + + cnt2, err := newContainer("gstreamer", "source", []string{ + "filesrc location=emptyvideo.ts ! tsdemux ! video/x-h264" + + " ! h264parse config-interval=1 ! rtph264pay ! udpsink host=127.0.0.1 port=" + strconv.FormatInt(int64(pc.LocalAddr().(*net.UDPAddr).Port), 10), + }) + require.NoError(t, err) + defer cnt2.close() + + decoder := rtph264.NewDecoderFromPacketConn(pc) + sps, pps, err := decoder.ReadSPSPPS() + require.NoError(t, err) + + track, err := NewTrackH264(0, sps, pps) + require.NoError(t, err) + + dialer := func() Dialer { + if proto == "udp" { + return Dialer{} + } + return Dialer{StreamProtocol: StreamProtocolTCP} + }() + + conn, err := dialer.DialPublish("rtsp://localhost:8554/teststream", + Tracks{track}) + require.NoError(t, err) + + buf := make([]byte, 2048) + n, _, err := pc.ReadFrom(buf) + require.NoError(t, err) + err = conn.WriteFrame(track.Id, StreamTypeRtp, buf[:n]) + require.NoError(t, err) + + conn.Close() + + n, _, err = pc.ReadFrom(buf) + require.NoError(t, err) + err = conn.WriteFrame(track.Id, StreamTypeRtp, buf[:n]) + require.Error(t, err) + }) + } +} + +func TestDialPublishParallel(t *testing.T) { for _, ca := range []struct { proto string server string @@ -300,8 +357,8 @@ func TestDialPublish(t *testing.T) { track, err := NewTrackH264(0, sps, pps) require.NoError(t, err) - publishDone := make(chan struct{}) - defer func() { <-publishDone }() + writeDone := make(chan struct{}) + defer func() { <-writeDone }() var conn *ConnClient defer func() { conn.Close() }() @@ -314,7 +371,7 @@ func TestDialPublish(t *testing.T) { }() go func() { - defer close(publishDone) + defer close(writeDone) port := "8554" if ca.server == "ffmpeg" { @@ -360,71 +417,6 @@ func TestDialPublish(t *testing.T) { } } -func TestDialPublishClose(t *testing.T) { - for _, proto := range []string{ - "udp", - "tcp", - } { - t.Run(proto, func(t *testing.T) { - cnt1, err := newContainer("rtsp-simple-server", "server", []string{"{}"}) - require.NoError(t, err) - defer cnt1.close() - - time.Sleep(1 * time.Second) - - pc, err := net.ListenPacket("udp4", "127.0.0.1:0") - require.NoError(t, err) - defer pc.Close() - - cnt2, err := newContainer("gstreamer", "source", []string{ - "filesrc location=emptyvideo.ts ! tsdemux ! video/x-h264" + - " ! h264parse config-interval=1 ! rtph264pay ! udpsink host=127.0.0.1 port=" + strconv.FormatInt(int64(pc.LocalAddr().(*net.UDPAddr).Port), 10), - }) - require.NoError(t, err) - defer cnt2.close() - - decoder := rtph264.NewDecoderFromPacketConn(pc) - sps, pps, err := decoder.ReadSPSPPS() - require.NoError(t, err) - - track, err := NewTrackH264(0, sps, pps) - require.NoError(t, err) - - dialer := func() Dialer { - if proto == "udp" { - return Dialer{} - } - return Dialer{StreamProtocol: StreamProtocolTCP} - }() - - conn, err := dialer.DialPublish("rtsp://localhost:8554/teststream", - Tracks{track}) - require.NoError(t, err) - - writeDone := make(chan struct{}) - go func() { - defer close(writeDone) - - buf := make([]byte, 2048) - for { - n, _, err := pc.ReadFrom(buf) - require.NoError(t, err) - - err = conn.WriteFrame(track.Id, StreamTypeRtp, buf[:n]) - if err != nil { - break - } - } - }() - - time.Sleep(1 * time.Second) - - conn.Close() - <-writeDone - }) - } -} - func TestDialPublishPause(t *testing.T) { for _, proto := range []string{ "udp",