From e711b2925ff217332a78bf42d69df7447f4a67fe Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Sat, 7 Nov 2020 16:35:05 +0100 Subject: [PATCH] fix client-publish-udp example --- connclient.go | 9 ++++--- dialer_test.go | 46 +++++++++++++++++++++------------- examples/client-publish-udp.go | 37 ++++++++++++++++++--------- 3 files changed, 59 insertions(+), 33 deletions(-) diff --git a/connclient.go b/connclient.go index 1565d39c..e652546f 100644 --- a/connclient.go +++ b/connclient.go @@ -622,18 +622,18 @@ func (c *ConnClient) Setup(u *base.URL, mode headers.TransportMode, proto base.S } // Play writes a PLAY request and reads a Response. -// This function can be called only after SetupUDP() or SetupTCP(). +// This can be called only after Setup(). func (c *ConnClient) Play(u *base.URL) (*base.Response, error) { if c.state != connClientStateInitial { return nil, fmt.Errorf("can't be called when reading or publishing") } if c.streamUrl == nil { - return nil, fmt.Errorf("can be called only after a successful SetupUDP() or SetupTCP()") + return nil, fmt.Errorf("can be called only after a successful Setup()") } if *u != *c.streamUrl { - return nil, fmt.Errorf("must be called with the same url used for SetupUDP() or SetupTCP()") + return nil, fmt.Errorf("must be called with the same url used for Setup())") } res, err := c.Do(&base.Request{ @@ -768,7 +768,7 @@ func (c *ConnClient) LoopUDP() error { // Announce writes an ANNOUNCE request and reads a Response. func (c *ConnClient) Announce(u *base.URL, tracks Tracks) (*base.Response, error) { if c.streamUrl != nil { - return nil, fmt.Errorf("announce has already been sent with another url url") + return nil, fmt.Errorf("announce has already been sent with another url") } res, err := c.Do(&base.Request{ @@ -793,6 +793,7 @@ func (c *ConnClient) Announce(u *base.URL, tracks Tracks) (*base.Response, error } // Record writes a RECORD request and reads a Response. +// This can be called only after Announce() and Setup(). func (c *ConnClient) Record(u *base.URL) (*base.Response, error) { if c.state != connClientStateInitial { return nil, fmt.Errorf("can't be called when reading or publishing") diff --git a/dialer_test.go b/dialer_test.go index 4fae3275..ca062dc3 100644 --- a/dialer_test.go +++ b/dialer_test.go @@ -204,14 +204,17 @@ func TestDialPublishUDP(t *testing.T) { defer func() { <-publishDone }() var conn *ConnClient - defer func() { - conn.Close() - conn.LoopUDP() - }() go func() { defer close(publishDone) + var writerDone chan struct{} + defer func() { + if writerDone != nil { + <-writerDone + } + }() + pc, err := net.ListenPacket("udp4", "127.0.0.1:0") require.NoError(t, err) defer pc.Close() @@ -238,18 +241,26 @@ func TestDialPublishUDP(t *testing.T) { StreamProtocolUDP, Tracks{track}) require.NoError(t, err) - buf := make([]byte, 2048) - for { - n, _, err := pc.ReadFrom(buf) - if err != nil { - break - } + writerDone = make(chan struct{}) - err = conn.WriteFrameUDP(track.Id, StreamTypeRtp, buf[:n]) - if err != nil { - break + go func() { + defer close(writerDone) + + buf := make([]byte, 2048) + for { + n, _, err := pc.ReadFrom(buf) + if err != nil { + break + } + + err = conn.WriteFrameUDP(track.Id, StreamTypeRtp, buf[:n]) + if err != nil { + break + } } - } + }() + + conn.LoopUDP() }() if server == "ffmpeg" { @@ -269,6 +280,8 @@ func TestDialPublishUDP(t *testing.T) { code := cnt3.wait() require.Equal(t, 0, code) + + conn.Close() }) } } @@ -303,9 +316,6 @@ func TestDialPublishTCP(t *testing.T) { defer func() { <-publishDone }() var conn *ConnClient - defer func() { - conn.Close() - }() go func() { defer close(publishDone) @@ -367,6 +377,8 @@ func TestDialPublishTCP(t *testing.T) { code := cnt3.wait() require.Equal(t, 0, code) + + conn.Close() }) } } diff --git a/examples/client-publish-udp.go b/examples/client-publish-udp.go index 781f4a6f..335aaebb 100644 --- a/examples/client-publish-udp.go +++ b/examples/client-publish-udp.go @@ -15,6 +15,13 @@ import ( // the frames with the UDP protocol. func main() { + var writerDone chan struct{} + defer func() { + if writerDone != nil { + <-writerDone + } + }() + // open a listener to receive RTP/H264 frames pc, err := net.ListenPacket("udp4", "127.0.0.1:9000") if err != nil { @@ -48,20 +55,26 @@ func main() { } defer conn.Close() - buf := make([]byte, 2048) - for { - // read frames from the source - n, _, err := pc.ReadFrom(buf) - if err != nil { - break - } + writerDone = make(chan struct{}) - // write frames to the server - err = conn.WriteFrameUDP(track.Id, gortsplib.StreamTypeRtp, buf[:n]) - if err != nil { - break + go func() { + defer close(writerDone) + + buf := make([]byte, 2048) + for { + // read frames from the source + n, _, err := pc.ReadFrom(buf) + if err != nil { + break + } + + // write frames to the server + err = conn.WriteFrameUDP(track.Id, gortsplib.StreamTypeRtp, buf[:n]) + if err != nil { + break + } } - } + }() // wait until the connection is closed err = conn.LoopUDP()