diff --git a/connclientdial_test.go b/connclientdial_test.go index 1c52dede..7e5ae567 100644 --- a/connclientdial_test.go +++ b/connclientdial_test.go @@ -130,141 +130,197 @@ func TestConnClientDialReadTCP(t *testing.T) { } func TestConnClientDialPublishUDP(t *testing.T) { - cnt1, err := newContainer("rtsp-simple-server", "server", []string{}) - require.NoError(t, err) - defer cnt1.close() + for _, server := range []string{ + "rtsp-simple-server", + "ffmpeg", + } { + t.Run(server, func(t *testing.T) { + switch server { + case "rtsp-simple-server": + cnt1, err := newContainer("rtsp-simple-server", "server", []string{}) + require.NoError(t, err) + defer cnt1.close() - time.Sleep(1 * time.Second) + default: + cnt0, err := newContainer("rtsp-simple-server", "server0", []string{}) + require.NoError(t, err) + defer cnt0.close() - publishDone := make(chan struct{}) - defer func() { <-publishDone }() + cnt1, err := newContainer("ffmpeg", "server", []string{ + "-fflags nobuffer -re -rtsp_flags listen -i rtsp://localhost:8555/teststream -c copy -f rtsp rtsp://localhost:8554/teststream", + }) + require.NoError(t, err) + defer cnt1.close() + } - var conn *ConnClient - defer func() { - conn.Close() - }() + time.Sleep(1 * time.Second) - go func() { - defer close(publishDone) + publishDone := make(chan struct{}) + defer func() { <-publishDone }() - pc, err := net.ListenPacket("udp4", "127.0.0.1:0") - require.NoError(t, err) - defer pc.Close() + var conn *ConnClient + defer func() { + conn.Close() + }() - cnt2, err := newContainer("gstreamer", "source", []string{ - "filesrc location=emptyvideo.ts ! tsdemux ! video/x-h264" + - " ! h264parse config-interval=1 ! rtph264pay ! udpsink host=127.0.0.1 port=" + strconv.FormatInt(int64(pc.LocalAddr().(*net.UDPAddr).Port), 10), + go func() { + defer close(publishDone) + + pc, err := net.ListenPacket("udp4", "127.0.0.1:0") + require.NoError(t, err) + defer pc.Close() + + cnt2, err := newContainer("gstreamer", "source", []string{ + "filesrc location=emptyvideo.ts ! tsdemux ! video/x-h264" + + " ! h264parse config-interval=1 ! rtph264pay ! udpsink host=127.0.0.1 port=" + strconv.FormatInt(int64(pc.LocalAddr().(*net.UDPAddr).Port), 10), + }) + require.NoError(t, err) + defer cnt2.close() + + decoder := rtph264.NewDecoderFromPacketConn(pc) + sps, pps, err := decoder.ReadSPSPPS() + require.NoError(t, err) + + track, err := NewTrackH264(0, sps, pps) + require.NoError(t, err) + + port := "8554" + if server == "ffmpeg" { + port = "8555" + } + conn, err = DialPublish("rtsp://localhost:"+port+"/teststream", + StreamProtocolUDP, Tracks{track}) + require.NoError(t, err) + + buf := make([]byte, 2048) + for { + n, _, err := pc.ReadFrom(buf) + if err != nil { + break + } + + err = conn.WriteFrameUDP(track.Id, StreamTypeRtp, buf[:n]) + if err != nil { + break + } + } + }() + + if server == "ffmpeg" { + time.Sleep(5 * time.Second) + } + time.Sleep(1 * time.Second) + + cnt3, err := newContainer("ffmpeg", "read", []string{ + "-rtsp_transport", "udp", + "-i", "rtsp://localhost:8554/teststream", + "-vframes", "1", + "-f", "image2", + "-y", "/dev/null", + }) + require.NoError(t, err) + defer cnt3.close() + + code := cnt3.wait() + require.Equal(t, 0, code) }) - require.NoError(t, err) - defer cnt2.close() - - decoder := rtph264.NewDecoderFromPacketConn(pc) - sps, pps, err := decoder.ReadSPSPPS() - require.NoError(t, err) - - track, err := NewTrackH264(0, sps, pps) - require.NoError(t, err) - - conn, err = DialPublish("rtsp://localhost:8554/teststream", - StreamProtocolUDP, Tracks{track}) - require.NoError(t, err) - - buf := make([]byte, 2048) - for { - n, _, err := pc.ReadFrom(buf) - if err != nil { - break - } - - err = conn.WriteFrameUDP(track.Id, StreamTypeRtp, buf[:n]) - if err != nil { - break - } - } - }() - - time.Sleep(1 * time.Second) - - cnt3, err := newContainer("ffmpeg", "read", []string{ - "-rtsp_transport", "udp", - "-i", "rtsp://localhost:8554/teststream", - "-vframes", "1", - "-f", "image2", - "-y", "/dev/null", - }) - require.NoError(t, err) - defer cnt3.close() - - code := cnt3.wait() - require.Equal(t, 0, code) + } } func TestConnClientDialPublishTCP(t *testing.T) { - cnt1, err := newContainer("rtsp-simple-server", "server", []string{}) - require.NoError(t, err) - defer cnt1.close() + for _, server := range []string{ + "rtsp-simple-server", + "ffmpeg", + } { + t.Run(server, func(t *testing.T) { + switch server { + case "rtsp-simple-server": + cnt1, err := newContainer("rtsp-simple-server", "server", []string{}) + require.NoError(t, err) + defer cnt1.close() - time.Sleep(1 * time.Second) + default: + cnt0, err := newContainer("rtsp-simple-server", "server0", []string{}) + require.NoError(t, err) + defer cnt0.close() - publishDone := make(chan struct{}) - defer func() { <-publishDone }() + cnt1, err := newContainer("ffmpeg", "server", []string{ + "-fflags nobuffer -re -rtsp_flags listen -i rtsp://localhost:8555/teststream -c copy -f rtsp rtsp://localhost:8554/teststream", + }) + require.NoError(t, err) + defer cnt1.close() + } - var conn *ConnClient - defer func() { - conn.Close() - }() + time.Sleep(1 * time.Second) - go func() { - defer close(publishDone) + publishDone := make(chan struct{}) + defer func() { <-publishDone }() - pc, err := net.ListenPacket("udp4", "127.0.0.1:0") - require.NoError(t, err) - defer pc.Close() + var conn *ConnClient + defer func() { + conn.Close() + }() - cnt2, err := newContainer("gstreamer", "source", []string{ - "filesrc location=emptyvideo.ts ! tsdemux ! video/x-h264" + - " ! h264parse config-interval=1 ! rtph264pay ! udpsink host=127.0.0.1 port=" + strconv.FormatInt(int64(pc.LocalAddr().(*net.UDPAddr).Port), 10), + go func() { + defer close(publishDone) + + pc, err := net.ListenPacket("udp4", "127.0.0.1:0") + require.NoError(t, err) + defer pc.Close() + + cnt2, err := newContainer("gstreamer", "source", []string{ + "filesrc location=emptyvideo.ts ! tsdemux ! video/x-h264" + + " ! h264parse config-interval=1 ! rtph264pay ! udpsink host=127.0.0.1 port=" + strconv.FormatInt(int64(pc.LocalAddr().(*net.UDPAddr).Port), 10), + }) + require.NoError(t, err) + defer cnt2.close() + + decoder := rtph264.NewDecoderFromPacketConn(pc) + sps, pps, err := decoder.ReadSPSPPS() + require.NoError(t, err) + + track, err := NewTrackH264(0, sps, pps) + require.NoError(t, err) + + port := "8554" + if server == "ffmpeg" { + port = "8555" + } + conn, err = DialPublish("rtsp://localhost:"+port+"/teststream", + StreamProtocolTCP, Tracks{track}) + require.NoError(t, err) + + buf := make([]byte, 2048) + for { + n, _, err := pc.ReadFrom(buf) + if err != nil { + break + } + + err = conn.WriteFrameTCP(track.Id, StreamTypeRtp, buf[:n]) + if err != nil { + break + } + } + }() + + if server == "ffmpeg" { + time.Sleep(5 * time.Second) + } + time.Sleep(1 * time.Second) + + cnt3, err := newContainer("ffmpeg", "read", []string{ + "-rtsp_transport", "udp", + "-i", "rtsp://localhost:8554/teststream", + "-vframes", "1", + "-f", "image2", + "-y", "/dev/null", + }) + require.NoError(t, err) + defer cnt3.close() + + code := cnt3.wait() + require.Equal(t, 0, code) }) - require.NoError(t, err) - defer cnt2.close() - - decoder := rtph264.NewDecoderFromPacketConn(pc) - sps, pps, err := decoder.ReadSPSPPS() - require.NoError(t, err) - - track, err := NewTrackH264(0, sps, pps) - require.NoError(t, err) - - conn, err = DialPublish("rtsp://localhost:8554/teststream", - StreamProtocolTCP, Tracks{track}) - require.NoError(t, err) - - buf := make([]byte, 2048) - for { - n, _, err := pc.ReadFrom(buf) - if err != nil { - break - } - - err = conn.WriteFrameTCP(track.Id, StreamTypeRtp, buf[:n]) - if err != nil { - break - } - } - }() - - time.Sleep(1 * time.Second) - - cnt3, err := newContainer("ffmpeg", "read", []string{ - "-rtsp_transport", "udp", - "-i", "rtsp://localhost:8554/teststream", - "-vframes", "1", - "-f", "image2", - "-y", "/dev/null", - }) - require.NoError(t, err) - defer cnt3.close() - - code := cnt3.wait() - require.Equal(t, 0, code) + } }