fix client-publish-udp example

This commit is contained in:
aler9
2020-11-07 16:35:05 +01:00
parent c7c3e00388
commit e711b2925f
3 changed files with 59 additions and 33 deletions

View File

@@ -622,18 +622,18 @@ func (c *ConnClient) Setup(u *base.URL, mode headers.TransportMode, proto base.S
} }
// Play writes a PLAY request and reads a Response. // Play writes a PLAY request and reads a Response.
// This function can be called only after SetupUDP() or SetupTCP(). // This can be called only after Setup().
func (c *ConnClient) Play(u *base.URL) (*base.Response, error) { func (c *ConnClient) Play(u *base.URL) (*base.Response, error) {
if c.state != connClientStateInitial { if c.state != connClientStateInitial {
return nil, fmt.Errorf("can't be called when reading or publishing") return nil, fmt.Errorf("can't be called when reading or publishing")
} }
if c.streamUrl == nil { if c.streamUrl == nil {
return nil, fmt.Errorf("can be called only after a successful SetupUDP() or SetupTCP()") return nil, fmt.Errorf("can be called only after a successful Setup()")
} }
if *u != *c.streamUrl { if *u != *c.streamUrl {
return nil, fmt.Errorf("must be called with the same url used for SetupUDP() or SetupTCP()") return nil, fmt.Errorf("must be called with the same url used for Setup())")
} }
res, err := c.Do(&base.Request{ res, err := c.Do(&base.Request{
@@ -768,7 +768,7 @@ func (c *ConnClient) LoopUDP() error {
// Announce writes an ANNOUNCE request and reads a Response. // Announce writes an ANNOUNCE request and reads a Response.
func (c *ConnClient) Announce(u *base.URL, tracks Tracks) (*base.Response, error) { func (c *ConnClient) Announce(u *base.URL, tracks Tracks) (*base.Response, error) {
if c.streamUrl != nil { if c.streamUrl != nil {
return nil, fmt.Errorf("announce has already been sent with another url url") return nil, fmt.Errorf("announce has already been sent with another url")
} }
res, err := c.Do(&base.Request{ res, err := c.Do(&base.Request{
@@ -793,6 +793,7 @@ func (c *ConnClient) Announce(u *base.URL, tracks Tracks) (*base.Response, error
} }
// Record writes a RECORD request and reads a Response. // Record writes a RECORD request and reads a Response.
// This can be called only after Announce() and Setup().
func (c *ConnClient) Record(u *base.URL) (*base.Response, error) { func (c *ConnClient) Record(u *base.URL) (*base.Response, error) {
if c.state != connClientStateInitial { if c.state != connClientStateInitial {
return nil, fmt.Errorf("can't be called when reading or publishing") return nil, fmt.Errorf("can't be called when reading or publishing")

View File

@@ -204,14 +204,17 @@ func TestDialPublishUDP(t *testing.T) {
defer func() { <-publishDone }() defer func() { <-publishDone }()
var conn *ConnClient var conn *ConnClient
defer func() {
conn.Close()
conn.LoopUDP()
}()
go func() { go func() {
defer close(publishDone) defer close(publishDone)
var writerDone chan struct{}
defer func() {
if writerDone != nil {
<-writerDone
}
}()
pc, err := net.ListenPacket("udp4", "127.0.0.1:0") pc, err := net.ListenPacket("udp4", "127.0.0.1:0")
require.NoError(t, err) require.NoError(t, err)
defer pc.Close() defer pc.Close()
@@ -238,18 +241,26 @@ func TestDialPublishUDP(t *testing.T) {
StreamProtocolUDP, Tracks{track}) StreamProtocolUDP, Tracks{track})
require.NoError(t, err) require.NoError(t, err)
buf := make([]byte, 2048) writerDone = make(chan struct{})
for {
n, _, err := pc.ReadFrom(buf)
if err != nil {
break
}
err = conn.WriteFrameUDP(track.Id, StreamTypeRtp, buf[:n]) go func() {
if err != nil { defer close(writerDone)
break
buf := make([]byte, 2048)
for {
n, _, err := pc.ReadFrom(buf)
if err != nil {
break
}
err = conn.WriteFrameUDP(track.Id, StreamTypeRtp, buf[:n])
if err != nil {
break
}
} }
} }()
conn.LoopUDP()
}() }()
if server == "ffmpeg" { if server == "ffmpeg" {
@@ -269,6 +280,8 @@ func TestDialPublishUDP(t *testing.T) {
code := cnt3.wait() code := cnt3.wait()
require.Equal(t, 0, code) require.Equal(t, 0, code)
conn.Close()
}) })
} }
} }
@@ -303,9 +316,6 @@ func TestDialPublishTCP(t *testing.T) {
defer func() { <-publishDone }() defer func() { <-publishDone }()
var conn *ConnClient var conn *ConnClient
defer func() {
conn.Close()
}()
go func() { go func() {
defer close(publishDone) defer close(publishDone)
@@ -367,6 +377,8 @@ func TestDialPublishTCP(t *testing.T) {
code := cnt3.wait() code := cnt3.wait()
require.Equal(t, 0, code) require.Equal(t, 0, code)
conn.Close()
}) })
} }
} }

View File

@@ -15,6 +15,13 @@ import (
// the frames with the UDP protocol. // the frames with the UDP protocol.
func main() { func main() {
var writerDone chan struct{}
defer func() {
if writerDone != nil {
<-writerDone
}
}()
// open a listener to receive RTP/H264 frames // open a listener to receive RTP/H264 frames
pc, err := net.ListenPacket("udp4", "127.0.0.1:9000") pc, err := net.ListenPacket("udp4", "127.0.0.1:9000")
if err != nil { if err != nil {
@@ -48,20 +55,26 @@ func main() {
} }
defer conn.Close() defer conn.Close()
buf := make([]byte, 2048) writerDone = make(chan struct{})
for {
// read frames from the source
n, _, err := pc.ReadFrom(buf)
if err != nil {
break
}
// write frames to the server go func() {
err = conn.WriteFrameUDP(track.Id, gortsplib.StreamTypeRtp, buf[:n]) defer close(writerDone)
if err != nil {
break buf := make([]byte, 2048)
for {
// read frames from the source
n, _, err := pc.ReadFrom(buf)
if err != nil {
break
}
// write frames to the server
err = conn.WriteFrameUDP(track.Id, gortsplib.StreamTypeRtp, buf[:n])
if err != nil {
break
}
} }
} }()
// wait until the connection is closed // wait until the connection is closed
err = conn.LoopUDP() err = conn.LoopUDP()