ConnClient: new methods DialRead and DialPublish

This commit is contained in:
aler9
2020-10-04 16:15:11 +02:00
parent a36d16b015
commit 973464ed1d
7 changed files with 216 additions and 281 deletions

View File

@@ -737,7 +737,15 @@ func (c *ConnClient) Play(u *url.URL) (*Response, error) {
// LoopUDP must be called after SetupUDP() and Play(); it keeps // LoopUDP must be called after SetupUDP() and Play(); it keeps
// the TCP connection open with keepalives, and returns when the TCP // the TCP connection open with keepalives, and returns when the TCP
// connection closes. // connection closes.
func (c *ConnClient) LoopUDP(u *url.URL) error { func (c *ConnClient) LoopUDP() error {
if c.state != connClientStateReading {
return fmt.Errorf("can be called only after a successful Play()")
}
if *c.streamProtocol != StreamProtocolUDP {
return fmt.Errorf("stream protocol is not UDP")
}
readDone := make(chan error) readDone := make(chan error)
go func() { go func() {
for { for {
@@ -767,8 +775,8 @@ func (c *ConnClient) LoopUDP(u *url.URL) error {
Method: OPTIONS, Method: OPTIONS,
Url: &url.URL{ Url: &url.URL{
Scheme: "rtsp", Scheme: "rtsp",
Host: u.Host, Host: c.streamUrl.Host,
User: u.User, User: c.streamUrl.User,
Path: "/", Path: "/",
}, },
SkipResponse: true, SkipResponse: true,

108
connclientdial.go Normal file
View File

@@ -0,0 +1,108 @@
package gortsplib
import (
"net/url"
)
// DialRead connects to the address and starts reading all tracks.
func DialRead(address string, proto StreamProtocol) (*ConnClient, Tracks, error) {
u, err := url.Parse(address)
if err != nil {
return nil, nil, err
}
conn, err := NewConnClient(ConnClientConf{Host: u.Host})
if err != nil {
return nil, nil, err
}
_, err = conn.Options(u)
if err != nil {
conn.Close()
return nil, nil, err
}
tracks, _, err := conn.Describe(u)
if err != nil {
conn.Close()
return nil, nil, err
}
if proto == StreamProtocolUDP {
for _, track := range tracks {
_, err := conn.SetupUDP(u, SetupModePlay, track, 0, 0)
if err != nil {
return nil, nil, err
}
}
} else {
for _, track := range tracks {
_, err := conn.SetupTCP(u, SetupModePlay, track)
if err != nil {
conn.Close()
return nil, nil, err
}
}
}
_, err = conn.Play(u)
if err != nil {
conn.Close()
return nil, nil, err
}
return conn, tracks, nil
}
// DialPublish connects to the address and starts publishing the tracks.
func DialPublish(address string, proto StreamProtocol, tracks Tracks) (*ConnClient, error) {
u, err := url.Parse(address)
if err != nil {
return nil, err
}
conn, err := NewConnClient(ConnClientConf{Host: u.Host})
if err != nil {
return nil, err
}
_, err = conn.Options(u)
if err != nil {
conn.Close()
return nil, err
}
_, err = conn.Announce(u, tracks)
if err != nil {
conn.Close()
return nil, err
}
if proto == StreamProtocolUDP {
for _, track := range tracks {
_, err = conn.SetupUDP(u, SetupModeRecord, track, 0, 0)
if err != nil {
conn.Close()
return nil, err
}
}
} else {
for _, track := range tracks {
_, err = conn.SetupTCP(u, SetupModeRecord, track)
if err != nil {
conn.Close()
return nil, err
}
}
}
_, err = conn.Record(u)
if err != nil {
conn.Close()
return nil, err
}
return conn, nil
}

View File

@@ -3,7 +3,6 @@ package gortsplib
import ( import (
"fmt" "fmt"
"net" "net"
"net/url"
"os" "os"
"os/exec" "os/exec"
"strconv" "strconv"
@@ -59,108 +58,6 @@ func (c *container) wait() int {
return int(code) return int(code)
} }
func TestConnClientReadUDP(t *testing.T) {
cnt1, err := newContainer("rtsp-simple-server", "server", []string{})
require.NoError(t, err)
defer cnt1.close()
time.Sleep(1 * time.Second)
cnt2, err := newContainer("ffmpeg", "publish", []string{
"-re",
"-stream_loop", "-1",
"-i", "/emptyvideo.ts",
"-c", "copy",
"-f", "rtsp",
"-rtsp_transport", "udp",
"rtsp://localhost:8554/teststream",
})
require.NoError(t, err)
defer cnt2.close()
time.Sleep(1 * time.Second)
u, err := url.Parse("rtsp://localhost:8554/teststream")
require.NoError(t, err)
conn, err := NewConnClient(ConnClientConf{Host: u.Host})
require.NoError(t, err)
defer conn.Close()
_, err = conn.Options(u)
require.NoError(t, err)
tracks, _, err := conn.Describe(u)
require.NoError(t, err)
for _, track := range tracks {
_, err := conn.SetupUDP(u, SetupModePlay, track, 0, 0)
require.NoError(t, err)
}
_, err = conn.Play(u)
require.NoError(t, err)
loopDone := make(chan struct{})
defer func() { <-loopDone }()
go func() {
defer close(loopDone)
conn.LoopUDP(u)
}()
_, err = conn.ReadFrameUDP(tracks[0], StreamTypeRtp)
require.NoError(t, err)
conn.CloseUDPListeners()
}
func TestConnClientReadTCP(t *testing.T) {
cnt1, err := newContainer("rtsp-simple-server", "server", []string{})
require.NoError(t, err)
defer cnt1.close()
time.Sleep(1 * time.Second)
cnt2, err := newContainer("ffmpeg", "publish", []string{
"-re",
"-stream_loop", "-1",
"-i", "/emptyvideo.ts",
"-c", "copy",
"-f", "rtsp",
"-rtsp_transport", "udp",
"rtsp://localhost:8554/teststream",
})
require.NoError(t, err)
defer cnt2.close()
time.Sleep(1 * time.Second)
u, err := url.Parse("rtsp://localhost:8554/teststream")
require.NoError(t, err)
conn, err := NewConnClient(ConnClientConf{Host: u.Host})
require.NoError(t, err)
defer conn.Close()
_, err = conn.Options(u)
require.NoError(t, err)
tracks, _, err := conn.Describe(u)
require.NoError(t, err)
for _, track := range tracks {
_, err := conn.SetupTCP(u, SetupModePlay, track)
require.NoError(t, err)
}
_, err = conn.Play(u)
require.NoError(t, err)
_, err = conn.ReadFrameTCP()
require.NoError(t, err)
}
func getH264SPSandPPS(pc net.PacketConn) ([]byte, []byte, error) { func getH264SPSandPPS(pc net.PacketConn) ([]byte, []byte, error) {
var sps []byte var sps []byte
var pps []byte var pps []byte
@@ -201,22 +98,88 @@ func getH264SPSandPPS(pc net.PacketConn) ([]byte, []byte, error) {
} }
} }
func TestConnClientPublishUDP(t *testing.T) { func TestConnClientDialReadUDP(t *testing.T) {
cnt1, err := newContainer("rtsp-simple-server", "server", []string{}) cnt1, err := newContainer("rtsp-simple-server", "server", []string{})
require.NoError(t, err) require.NoError(t, err)
defer cnt1.close() defer cnt1.close()
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
u, err := url.Parse("rtsp://localhost:8554/teststream") cnt2, err := newContainer("ffmpeg", "publish", []string{
"-re",
"-stream_loop", "-1",
"-i", "/emptyvideo.ts",
"-c", "copy",
"-f", "rtsp",
"-rtsp_transport", "udp",
"rtsp://localhost:8554/teststream",
})
require.NoError(t, err)
defer cnt2.close()
time.Sleep(1 * time.Second)
conn, tracks, err := DialRead("rtsp://localhost:8554/teststream", StreamProtocolUDP)
require.NoError(t, err)
defer conn.Close()
loopDone := make(chan struct{})
defer func() { <-loopDone }()
go func() {
defer close(loopDone)
conn.LoopUDP()
}()
_, err = conn.ReadFrameUDP(tracks[0], StreamTypeRtp)
require.NoError(t, err) require.NoError(t, err)
conn, err := NewConnClient(ConnClientConf{Host: u.Host}) conn.CloseUDPListeners()
}
func TestConnClientDialReadTCP(t *testing.T) {
cnt1, err := newContainer("rtsp-simple-server", "server", []string{})
require.NoError(t, err) require.NoError(t, err)
defer cnt1.close()
time.Sleep(1 * time.Second)
cnt2, err := newContainer("ffmpeg", "publish", []string{
"-re",
"-stream_loop", "-1",
"-i", "/emptyvideo.ts",
"-c", "copy",
"-f", "rtsp",
"-rtsp_transport", "udp",
"rtsp://localhost:8554/teststream",
})
require.NoError(t, err)
defer cnt2.close()
time.Sleep(1 * time.Second)
conn, _, err := DialRead("rtsp://localhost:8554/teststream", StreamProtocolTCP)
require.NoError(t, err)
defer conn.Close()
_, err = conn.ReadFrameTCP()
require.NoError(t, err)
}
func TestConnClientDialPublishUDP(t *testing.T) {
cnt1, err := newContainer("rtsp-simple-server", "server", []string{})
require.NoError(t, err)
defer cnt1.close()
time.Sleep(1 * time.Second)
publishDone := make(chan struct{}) publishDone := make(chan struct{})
defer func() { <-publishDone }() defer func() { <-publishDone }()
defer conn.Close()
var conn *ConnClient
defer func() {
conn.Close()
}()
go func() { go func() {
defer close(publishDone) defer close(publishDone)
@@ -235,19 +198,11 @@ func TestConnClientPublishUDP(t *testing.T) {
sps, pps, err := getH264SPSandPPS(pc) sps, pps, err := getH264SPSandPPS(pc)
require.NoError(t, err) require.NoError(t, err)
_, err = conn.Options(u)
require.NoError(t, err)
track, err := NewTrackH264(0, sps, pps) track, err := NewTrackH264(0, sps, pps)
require.NoError(t, err) require.NoError(t, err)
_, err = conn.Announce(u, Tracks{track}) conn, err = DialPublish("rtsp://localhost:8554/teststream",
require.NoError(t, err) StreamProtocolUDP, Tracks{track})
_, err = conn.SetupUDP(u, SetupModeRecord, track, 0, 0)
require.NoError(t, err)
_, err = conn.Record(u)
require.NoError(t, err) require.NoError(t, err)
buf := make([]byte, 2048) buf := make([]byte, 2048)
@@ -280,22 +235,20 @@ func TestConnClientPublishUDP(t *testing.T) {
require.Equal(t, 0, code) require.Equal(t, 0, code)
} }
func TestConnClientPublishTCP(t *testing.T) { func TestConnClientDialPublishTCP(t *testing.T) {
cnt1, err := newContainer("rtsp-simple-server", "server", []string{}) cnt1, err := newContainer("rtsp-simple-server", "server", []string{})
require.NoError(t, err) require.NoError(t, err)
defer cnt1.close() defer cnt1.close()
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
u, err := url.Parse("rtsp://localhost:8554/teststream")
require.NoError(t, err)
conn, err := NewConnClient(ConnClientConf{Host: u.Host})
require.NoError(t, err)
publishDone := make(chan struct{}) publishDone := make(chan struct{})
defer func() { <-publishDone }() defer func() { <-publishDone }()
defer conn.Close()
var conn *ConnClient
defer func() {
conn.Close()
}()
go func() { go func() {
defer close(publishDone) defer close(publishDone)
@@ -314,19 +267,11 @@ func TestConnClientPublishTCP(t *testing.T) {
sps, pps, err := getH264SPSandPPS(pc) sps, pps, err := getH264SPSandPPS(pc)
require.NoError(t, err) require.NoError(t, err)
_, err = conn.Options(u)
require.NoError(t, err)
track, err := NewTrackH264(0, sps, pps) track, err := NewTrackH264(0, sps, pps)
require.NoError(t, err) require.NoError(t, err)
_, err = conn.Announce(u, Tracks{track}) conn, err = DialPublish("rtsp://localhost:8554/teststream",
require.NoError(t, err) StreamProtocolTCP, Tracks{track})
_, err = conn.SetupTCP(u, SetupModeRecord, track)
require.NoError(t, err)
_, err = conn.Record(u)
require.NoError(t, err) require.NoError(t, err)
buf := make([]byte, 2048) buf := make([]byte, 2048)

View File

@@ -5,7 +5,6 @@ package main
import ( import (
"fmt" "fmt"
"net" "net"
"net/url"
"github.com/aler9/gortsplib" "github.com/aler9/gortsplib"
"github.com/pion/rtp" "github.com/pion/rtp"
@@ -74,48 +73,19 @@ func main() {
} }
fmt.Println("stream connected") fmt.Println("stream connected")
// parse url
u, err := url.Parse("rtsp://localhost:8554/mystream")
if err != nil {
panic(err)
}
// connect to the server
conn, err := gortsplib.NewConnClient(gortsplib.ConnClientConf{Host: u.Host})
if err != nil {
panic(err)
}
defer conn.Close()
// get allowed commands
_, err = conn.Options(u)
if err != nil {
panic(err)
}
// create a H264 track // create a H264 track
track, err := gortsplib.NewTrackH264(0, sps, pps) track, err := gortsplib.NewTrackH264(0, sps, pps)
if err != nil { if err != nil {
panic(err) panic(err)
} }
// announce the track // connect to the server and start publishing the track
_, err = conn.Announce(u, gortsplib.Tracks{track}) conn, err := gortsplib.DialPublish("rtsp://localhost:8554/mystream",
if err != nil { gortsplib.StreamProtocolTCP, gortsplib.Tracks{track})
panic(err)
}
// setup the track with TCP
_, err = conn.SetupTCP(u, gortsplib.SetupModeRecord, track)
if err != nil {
panic(err)
}
// start publishing
_, err = conn.Record(u)
if err != nil { if err != nil {
panic(err) panic(err)
} }
defer conn.Close()
buf := make([]byte, 2048) buf := make([]byte, 2048)
for { for {

View File

@@ -5,7 +5,6 @@ package main
import ( import (
"fmt" "fmt"
"net" "net"
"net/url"
"github.com/aler9/gortsplib" "github.com/aler9/gortsplib"
"github.com/pion/rtp" "github.com/pion/rtp"
@@ -74,48 +73,19 @@ func main() {
} }
fmt.Println("stream connected") fmt.Println("stream connected")
// parse url
u, err := url.Parse("rtsp://localhost:8554/mystream")
if err != nil {
panic(err)
}
// connect to the server
conn, err := gortsplib.NewConnClient(gortsplib.ConnClientConf{Host: u.Host})
if err != nil {
panic(err)
}
defer conn.Close()
// get allowed commands
_, err = conn.Options(u)
if err != nil {
panic(err)
}
// create a H264 track // create a H264 track
track, err := gortsplib.NewTrackH264(0, sps, pps) track, err := gortsplib.NewTrackH264(0, sps, pps)
if err != nil { if err != nil {
panic(err) panic(err)
} }
// announce the track // connect to the server and start publishing the track
_, err = conn.Announce(u, gortsplib.Tracks{track}) conn, err := gortsplib.DialPublish("rtsp://localhost:8554/mystream",
if err != nil { gortsplib.StreamProtocolUDP, gortsplib.Tracks{track})
panic(err)
}
// setup the track with UDP
_, err = conn.SetupUDP(u, gortsplib.SetupModeRecord, track, 0, 0)
if err != nil {
panic(err)
}
// start publishing
_, err = conn.Record(u)
if err != nil { if err != nil {
panic(err) panic(err)
} }
defer conn.Close()
buf := make([]byte, 2048) buf := make([]byte, 2048)
for { for {

View File

@@ -4,54 +4,21 @@ package main
import ( import (
"fmt" "fmt"
"net/url"
"github.com/aler9/gortsplib" "github.com/aler9/gortsplib"
) )
// This example shows how to create a RTSP client, connect to a server, list // This example shows how to create a RTSP client, connect to a server and
// and read tracks with the TCP protocol. // read all tracks with the TCP protocol.
func main() { func main() {
// parse url // connect to the server and start reading all tracks
u, err := url.Parse("rtsp://localhost:8554/mystream") conn, _, err := gortsplib.DialRead("rtsp://localhost:8554/mystream", gortsplib.StreamProtocolTCP)
if err != nil {
panic(err)
}
// connect to the server
conn, err := gortsplib.NewConnClient(gortsplib.ConnClientConf{Host: u.Host})
if err != nil { if err != nil {
panic(err) panic(err)
} }
defer conn.Close() defer conn.Close()
// get allowed commands
_, err = conn.Options(u)
if err != nil {
panic(err)
}
// list tracks published on the path
tracks, _, err := conn.Describe(u)
if err != nil {
panic(err)
}
// setup tracks with TCP
for _, track := range tracks {
_, err := conn.SetupTCP(u, gortsplib.SetupModePlay, track)
if err != nil {
panic(err)
}
}
// start reading
_, err = conn.Play(u)
if err != nil {
panic(err)
}
for { for {
// read frames // read frames
frame, err := conn.ReadFrameTCP() frame, err := conn.ReadFrameTCP()

View File

@@ -4,55 +4,22 @@ package main
import ( import (
"fmt" "fmt"
"net/url"
"sync" "sync"
"github.com/aler9/gortsplib" "github.com/aler9/gortsplib"
) )
// This example shows how to create a RTSP client, connect to a server, list // This example shows how to create a RTSP client, connect to a server and
// and read tracks with the UDP protocol. // read all tracks with the UDP protocol.
func main() { func main() {
// parse url // connect to the server and start reading all tracks
u, err := url.Parse("rtsp://localhost:8554/mystream") conn, tracks, err := gortsplib.DialRead("rtsp://localhost:8554/mystream", gortsplib.StreamProtocolUDP)
if err != nil {
panic(err)
}
// connect to the server
conn, err := gortsplib.NewConnClient(gortsplib.ConnClientConf{Host: u.Host})
if err != nil { if err != nil {
panic(err) panic(err)
} }
defer conn.Close() defer conn.Close()
// get allowed commands
_, err = conn.Options(u)
if err != nil {
panic(err)
}
// list tracks published on the path
tracks, _, err := conn.Describe(u)
if err != nil {
panic(err)
}
// setup tracks with UDP
for _, track := range tracks {
_, err := conn.SetupUDP(u, gortsplib.SetupModePlay, track, 0, 0)
if err != nil {
panic(err)
}
}
// start reading
_, err = conn.Play(u)
if err != nil {
panic(err)
}
var wg sync.WaitGroup var wg sync.WaitGroup
defer wg.Wait() defer wg.Wait()
defer conn.CloseUDPListeners() defer conn.CloseUDPListeners()
@@ -89,6 +56,6 @@ func main() {
}(track) }(track)
} }
err = conn.LoopUDP(u) err = conn.LoopUDP()
fmt.Println("connection is closed (%s)", err) fmt.Println("connection is closed (%s)", err)
} }