implement publishing (#4)

This commit is contained in:
aler9
2020-09-27 15:35:47 +02:00
parent 8f2ecffa3c
commit 0a70915c8c
16 changed files with 807 additions and 182 deletions

View File

@@ -9,12 +9,15 @@ RTSP 1.0 library for the Go programming language, written for [rtsp-simple-serve
Features:
* Read streams via TCP or UDP
* Publish streams via TCP or UDP
* Provides primitives, a class for building clients (`ConnClient`) and a class for building servers (`ConnServer`)
## Examples
* [read-tcp](examples/read-tcp.go)
* [read-udp](examples/read-udp.go)
* [publish-tcp](examples/publish-tcp.go)
* [publish-udp](examples/publish-udp.go)
## Documentation

View File

@@ -29,6 +29,14 @@ const (
clientUDPFrameReadBufferSize = 2048
)
type connClientState int
const (
connClientStateInitial connClientState = iota
connClientStateReading
connClientStatePublishing
)
// ConnClientConf allows to configure a ConnClient.
type ConnClientConf struct {
// target address in format hostname:port
@@ -66,6 +74,7 @@ type ConnClient struct {
session string
cseq int
auth *authClient
state connClientState
streamUrl *url.URL
streamProtocol *StreamProtocol
rtcpReceivers map[int]*RtcpReceiver
@@ -73,7 +82,6 @@ type ConnClient struct {
udpRtpListeners map[int]*connClientUDPListener
udpRtcpListeners map[int]*connClientUDPListener
tcpFrames *multiFrame
playing bool
receiverReportTerminate chan struct{}
receiverReportDone chan struct{}
@@ -116,7 +124,7 @@ func NewConnClient(conf ConnClientConf) (*ConnClient, error) {
// Close closes all the ConnClient resources.
func (c *ConnClient) Close() error {
if c.playing {
if c.state == connClientStateReading {
c.Do(&Request{
Method: TEARDOWN,
Url: c.streamUrl,
@@ -142,50 +150,22 @@ func (c *ConnClient) Close() error {
return err
}
// CloseUDPListeners closes any open UDP listener.
func (c *ConnClient) CloseUDPListeners() {
for _, l := range c.udpRtpListeners {
l.close()
}
for _, l := range c.udpRtcpListeners {
l.close()
}
}
// NetConn returns the underlying net.Conn.
func (c *ConnClient) NetConn() net.Conn {
return c.nconn
}
// ReadFrame reads an InterleavedFrame.
func (c *ConnClient) ReadFrame() (*InterleavedFrame, error) {
c.nconn.SetReadDeadline(time.Now().Add(c.conf.ReadTimeout))
frame := c.tcpFrames.next()
err := frame.Read(c.br)
if err != nil {
return nil, err
}
c.rtcpReceivers[frame.TrackId].OnFrame(frame.StreamType, frame.Content)
return frame, nil
}
// ReadFrameUDP reads an UDP frame.
func (c *ConnClient) ReadFrameUDP(track *Track, streamType StreamType) ([]byte, error) {
if streamType == StreamTypeRtp {
buf, err := c.udpRtpListeners[track.Id].read()
if err != nil {
return nil, err
}
atomic.StoreInt64(c.udpLastFrameTimes[track.Id], time.Now().Unix())
c.rtcpReceivers[track.Id].OnFrame(streamType, buf)
return buf, nil
}
buf, err := c.udpRtcpListeners[track.Id].read()
if err != nil {
return nil, err
}
atomic.StoreInt64(c.udpLastFrameTimes[track.Id], time.Now().Unix())
c.rtcpReceivers[track.Id].OnFrame(streamType, buf)
return buf, nil
}
func (c *ConnClient) readFrameOrResponse() (interface{}, error) {
c.nconn.SetReadDeadline(time.Now().Add(c.conf.ReadTimeout))
b, err := c.br.ReadByte()
@@ -206,6 +186,60 @@ func (c *ConnClient) readFrameOrResponse() (interface{}, error) {
return ReadResponse(c.br)
}
// ReadFrameTCP reads an InterleavedFrame.
// This can't be used when recording.
func (c *ConnClient) ReadFrameTCP() (*InterleavedFrame, error) {
c.nconn.SetReadDeadline(time.Now().Add(c.conf.ReadTimeout))
frame := c.tcpFrames.next()
err := frame.Read(c.br)
if err != nil {
return nil, err
}
c.rtcpReceivers[frame.TrackId].OnFrame(frame.StreamType, frame.Content)
return frame, nil
}
// ReadFrameUDP reads an UDP frame.
func (c *ConnClient) ReadFrameUDP(track *Track, streamType StreamType) ([]byte, error) {
var buf []byte
var err error
if streamType == StreamTypeRtp {
buf, err = c.udpRtpListeners[track.Id].read()
if err != nil {
return nil, err
}
} else {
buf, err = c.udpRtcpListeners[track.Id].read()
if err != nil {
return nil, err
}
}
atomic.StoreInt64(c.udpLastFrameTimes[track.Id], time.Now().Unix())
c.rtcpReceivers[track.Id].OnFrame(streamType, buf)
return buf, nil
}
// WriteFrameTCP writes an interleaved frame.
// this can't be used when playing.
func (c *ConnClient) WriteFrameTCP(frame *InterleavedFrame) error {
c.nconn.SetWriteDeadline(time.Now().Add(c.conf.WriteTimeout))
return frame.Write(c.bw)
}
// WriteFrameUDP writes an UDP frame.
func (c *ConnClient) WriteFrameUDP(track *Track, streamType StreamType, content []byte) error {
if streamType == StreamTypeRtp {
return c.udpRtpListeners[track.Id].write(content)
}
return c.udpRtcpListeners[track.Id].write(content)
}
// Do writes a Request and reads a Response.
func (c *ConnClient) Do(req *Request) (*Response, error) {
if req.Header == nil {
@@ -274,19 +308,12 @@ func (c *ConnClient) Do(req *Request) (*Response, error) {
return res, nil
}
// this can't be exported
// otherwise there's a race condition with the rtcp receiver report routine
func (c *ConnClient) writeFrame(frame *InterleavedFrame) error {
c.nconn.SetWriteDeadline(time.Now().Add(c.conf.WriteTimeout))
return frame.Write(c.bw)
}
// Options writes an OPTIONS request and reads a response, that contains
// the methods allowed by the server. Since this method is not implemented by
// every RTSP server, the function does not fail if the returned code is StatusNotFound.
func (c *ConnClient) Options(u *url.URL) (*Response, error) {
if c.playing {
return nil, fmt.Errorf("can't be called when playing")
if c.state != connClientStateInitial {
return nil, fmt.Errorf("can't be called when reading or publishing")
}
res, err := c.Do(&Request{
@@ -304,18 +331,16 @@ func (c *ConnClient) Options(u *url.URL) (*Response, error) {
}
if res.StatusCode != StatusOK && res.StatusCode != StatusNotFound {
return nil, fmt.Errorf("OPTIONS: bad status code: %d (%s)", res.StatusCode, res.StatusMessage)
return nil, fmt.Errorf("bad status code: %d (%s)", res.StatusCode, res.StatusMessage)
}
return res, nil
}
// Describe writes a DESCRIBE request, that means that we want to obtain the SDP
// document that describes the tracks available in the given URL. It then
// reads a Response.
// Describe writes a DESCRIBE request and reads a Response.
func (c *ConnClient) Describe(u *url.URL) (Tracks, *Response, error) {
if c.playing {
return nil, nil, fmt.Errorf("can't be called when playing")
if c.state != connClientStateInitial {
return nil, nil, fmt.Errorf("can't be called when reading or publishing")
}
res, err := c.Do(&Request{
@@ -330,16 +355,16 @@ func (c *ConnClient) Describe(u *url.URL) (Tracks, *Response, error) {
}
if res.StatusCode != StatusOK {
return nil, nil, fmt.Errorf("DESCRIBE: bad status code: %d (%s)", res.StatusCode, res.StatusMessage)
return nil, nil, fmt.Errorf("bad status code: %d (%s)", res.StatusCode, res.StatusMessage)
}
contentType, ok := res.Header["Content-Type"]
if !ok || len(contentType) != 1 {
return nil, nil, fmt.Errorf("DESCRIBE: Content-Type not provided")
return nil, nil, fmt.Errorf("Content-Type not provided")
}
if contentType[0] != "application/sdp" {
return nil, nil, fmt.Errorf("DESCRIBE: wrong Content-Type, expected application/sdp")
return nil, nil, fmt.Errorf("wrong Content-Type, expected application/sdp")
}
tracks, err := ReadTracks(res.Content)
@@ -419,19 +444,18 @@ func (c *ConnClient) setup(u *url.URL, track *Track, ht *HeaderTransport) (*Resp
}
if res.StatusCode != StatusOK {
return nil, fmt.Errorf("SETUP: bad status code: %d (%s)", res.StatusCode, res.StatusMessage)
return nil, fmt.Errorf("bad status code: %d (%s)", res.StatusCode, res.StatusMessage)
}
return res, nil
}
// SetupUDP writes a SETUP request, that means that we want to read
// a given track with the UDP transport. It then reads a Response.
// SetupUDP writes a SETUP request and reads a Response.
// If rtpPort and rtcpPort are zero, they will be chosen automatically.
func (c *ConnClient) SetupUDP(u *url.URL, track *Track, rtpPort int,
func (c *ConnClient) SetupUDP(u *url.URL, mode SetupMode, track *Track, rtpPort int,
rtcpPort int) (*Response, error) {
if c.playing {
return nil, fmt.Errorf("can't be called when playing")
if c.state != connClientStateInitial {
return nil, fmt.Errorf("can't be called when reading or publishing")
}
if c.streamUrl != nil && *u != *c.streamUrl {
@@ -442,10 +466,6 @@ func (c *ConnClient) SetupUDP(u *url.URL, track *Track, rtpPort int,
return nil, fmt.Errorf("cannot setup tracks with different protocols")
}
if _, ok := c.rtcpReceivers[track.Id]; ok {
return nil, fmt.Errorf("track has already been setup")
}
if (rtpPort == 0 && rtcpPort != 0) ||
(rtpPort != 0 && rtcpPort == 0) {
return nil, fmt.Errorf("rtpPort and rtcpPort must be both zero or non-zero")
@@ -457,12 +477,12 @@ func (c *ConnClient) SetupUDP(u *url.URL, track *Track, rtpPort int,
rtpListener, rtcpListener, err := func() (*connClientUDPListener, *connClientUDPListener, error) {
if rtpPort != 0 {
rtpListener, err := newConnClientUDPListener(c, rtpPort)
rtpListener, err := newConnClientUDPListener(c.conf, rtpPort)
if err != nil {
return nil, nil, err
}
rtcpListener, err := newConnClientUDPListener(c, rtcpPort)
rtcpListener, err := newConnClientUDPListener(c.conf, rtcpPort)
if err != nil {
rtpListener.close()
return nil, nil, err
@@ -477,12 +497,12 @@ func (c *ConnClient) SetupUDP(u *url.URL, track *Track, rtpPort int,
rtpPort = (rand.Intn((65535-10000)/2) * 2) + 10000
rtcpPort = rtpPort + 1
rtpListener, err := newConnClientUDPListener(c, rtpPort)
rtpListener, err := newConnClientUDPListener(c.conf, rtpPort)
if err != nil {
continue
}
rtcpListener, err := newConnClientUDPListener(c, rtcpPort)
rtcpListener, err := newConnClientUDPListener(c.conf, rtcpPort)
if err != nil {
rtpListener.close()
continue
@@ -503,6 +523,15 @@ func (c *ConnClient) SetupUDP(u *url.URL, track *Track, rtpPort int,
return &ret
}(),
ClientPorts: &[2]int{rtpPort, rtcpPort},
Mode: func() *string {
var v string
if mode == SetupModeRecord {
v = "record"
} else {
v = "play"
}
return &v
}(),
})
if err != nil {
rtpListener.close()
@@ -514,39 +543,43 @@ func (c *ConnClient) SetupUDP(u *url.URL, track *Track, rtpPort int,
if err != nil {
rtpListener.close()
rtcpListener.close()
return nil, fmt.Errorf("SETUP: transport header: %s", err)
return nil, fmt.Errorf("transport header: %s", err)
}
if th.ServerPorts == nil {
rtpListener.close()
rtcpListener.close()
return nil, fmt.Errorf("SETUP: server ports not provided")
return nil, fmt.Errorf("server ports not provided")
}
c.streamUrl = u
streamProtocol := StreamProtocolUDP
c.streamProtocol = &streamProtocol
if mode == SetupModePlay {
c.rtcpReceivers[track.Id] = NewRtcpReceiver()
v := time.Now().Unix()
c.udpLastFrameTimes[track.Id] = &v
}
rtpListener.publisherIp = c.nconn.RemoteAddr().(*net.TCPAddr).IP
rtpListener.publisherPort = (*th.ServerPorts)[0]
rtpListener.remoteIp = c.nconn.RemoteAddr().(*net.TCPAddr).IP
rtpListener.remoteZone = c.nconn.RemoteAddr().(*net.TCPAddr).Zone
rtpListener.remotePort = (*th.ServerPorts)[0]
c.udpRtpListeners[track.Id] = rtpListener
rtcpListener.publisherIp = c.nconn.RemoteAddr().(*net.TCPAddr).IP
rtcpListener.publisherPort = (*th.ServerPorts)[1]
rtcpListener.remoteIp = c.nconn.RemoteAddr().(*net.TCPAddr).IP
rtcpListener.remoteZone = c.nconn.RemoteAddr().(*net.TCPAddr).Zone
rtcpListener.remotePort = (*th.ServerPorts)[1]
c.udpRtcpListeners[track.Id] = rtcpListener
return res, nil
}
// SetupTCP writes a SETUP request, that means that we want to read
// a given track with the TCP transport. It then reads a Response.
func (c *ConnClient) SetupTCP(u *url.URL, track *Track) (*Response, error) {
if c.playing {
return nil, fmt.Errorf("can't be called when playing")
// SetupTCP writes a SETUP request and reads a Response.
func (c *ConnClient) SetupTCP(u *url.URL, mode SetupMode, track *Track) (*Response, error) {
if c.state != connClientStateInitial {
return nil, fmt.Errorf("can't be called when reading or publishing")
}
if c.streamUrl != nil && *u != *c.streamUrl {
@@ -557,10 +590,6 @@ func (c *ConnClient) SetupTCP(u *url.URL, track *Track) (*Response, error) {
return nil, fmt.Errorf("cannot setup tracks with different protocols")
}
if _, ok := c.rtcpReceivers[track.Id]; ok {
return nil, fmt.Errorf("track has already been setup")
}
interleavedIds := [2]int{(track.Id * 2), (track.Id * 2) + 1}
res, err := c.setup(u, track, &HeaderTransport{
Protocol: StreamProtocolTCP,
@@ -569,6 +598,15 @@ func (c *ConnClient) SetupTCP(u *url.URL, track *Track) (*Response, error) {
return &ret
}(),
InterleavedIds: &interleavedIds,
Mode: func() *string {
var v string
if mode == SetupModeRecord {
v = "record"
} else {
v = "play"
}
return &v
}(),
})
if err != nil {
return nil, err
@@ -576,29 +614,32 @@ func (c *ConnClient) SetupTCP(u *url.URL, track *Track) (*Response, error) {
th, err := ReadHeaderTransport(res.Header["Transport"])
if err != nil {
return nil, fmt.Errorf("SETUP: transport header: %s", err)
return nil, fmt.Errorf("transport header: %s", err)
}
if th.InterleavedIds == nil || (*th.InterleavedIds)[0] != interleavedIds[0] ||
if th.InterleavedIds == nil ||
(*th.InterleavedIds)[0] != interleavedIds[0] ||
(*th.InterleavedIds)[1] != interleavedIds[1] {
return nil, fmt.Errorf("SETUP: transport header does not have interleaved ids %v (%s)",
return nil, fmt.Errorf("transport header does not have interleaved ids %v (%s)",
interleavedIds, res.Header["Transport"])
}
c.streamUrl = u
streamProtocol := StreamProtocolTCP
c.streamProtocol = &streamProtocol
if mode == SetupModePlay {
c.rtcpReceivers[track.Id] = NewRtcpReceiver()
}
return res, nil
}
// Play writes a PLAY request, that means that we want to start the stream.
// It then reads a Response. This function can be called only after SetupUDP()
// or SetupTCP().
// Play writes a PLAY request and reads a Response
// This function can be called only after SetupUDP() or SetupTCP().
func (c *ConnClient) Play(u *url.URL) (*Response, error) {
if c.playing {
return nil, fmt.Errorf("can't be called when playing")
if c.state != connClientStateInitial {
return nil, fmt.Errorf("can't be called when reading or publishing")
}
if c.streamUrl == nil {
@@ -655,26 +696,16 @@ func (c *ConnClient) Play(u *url.URL) (*Response, error) {
return nil, fmt.Errorf("bad status code: %d (%s)", res.StatusCode, res.StatusMessage)
}
c.playing = true
c.state = connClientStateReading
// open the firewall by sending packets to every channel
if *c.streamProtocol == StreamProtocolUDP {
for trackId := range c.udpRtpListeners {
c.udpRtpListeners[trackId].pc.WriteTo(
[]byte{0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
&net.UDPAddr{
IP: c.nconn.RemoteAddr().(*net.TCPAddr).IP,
Zone: c.nconn.RemoteAddr().(*net.TCPAddr).Zone,
Port: c.udpRtpListeners[trackId].publisherPort,
})
c.udpRtpListeners[trackId].write(
[]byte{0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00})
c.udpRtcpListeners[trackId].pc.WriteTo(
[]byte{0x80, 0xc9, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00},
&net.UDPAddr{
IP: c.nconn.RemoteAddr().(*net.TCPAddr).IP,
Zone: c.nconn.RemoteAddr().(*net.TCPAddr).Zone,
Port: c.udpRtcpListeners[trackId].publisherPort,
})
c.udpRtcpListeners[trackId].write(
[]byte{0x80, 0xc9, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00})
}
}
@@ -696,14 +727,10 @@ func (c *ConnClient) Play(u *url.URL) (*Response, error) {
frame := c.rtcpReceivers[trackId].Report()
if *c.streamProtocol == StreamProtocolUDP {
c.udpRtcpListeners[trackId].pc.WriteTo(frame, &net.UDPAddr{
IP: c.nconn.RemoteAddr().(*net.TCPAddr).IP,
Zone: c.nconn.RemoteAddr().(*net.TCPAddr).Zone,
Port: c.udpRtcpListeners[trackId].publisherPort,
})
c.udpRtcpListeners[trackId].write(frame)
} else {
c.writeFrame(&InterleavedFrame{
c.WriteFrameTCP(&InterleavedFrame{
TrackId: trackId,
StreamType: StreamTypeRtcp,
Content: frame,
@@ -777,3 +804,57 @@ func (c *ConnClient) LoopUDP(u *url.URL) error {
}
}
}
// Announce writes an ANNOUNCE request and reads a Response.
func (c *ConnClient) Announce(u *url.URL, tracks Tracks) (*Response, error) {
if c.streamUrl != nil {
fmt.Errorf("announce has already been sent with another url url")
}
res, err := c.Do(&Request{
Method: ANNOUNCE,
Url: u,
Header: Header{
"Content-Type": HeaderValue{"application/sdp"},
},
Content: tracks.Write(),
})
if err != nil {
return nil, err
}
if res.StatusCode != StatusOK {
return nil, fmt.Errorf("bad status code: %d (%s)", res.StatusCode, res.StatusMessage)
}
c.streamUrl = u
return res, nil
}
// Record writes a RECORD request and reads a Response.
func (c *ConnClient) Record(u *url.URL) (*Response, error) {
if c.state != connClientStateInitial {
return nil, fmt.Errorf("can't be called when reading or publishing")
}
if *u != *c.streamUrl {
return nil, fmt.Errorf("must be called with the same url used for Announce()")
}
res, err := c.Do(&Request{
Method: RECORD,
Url: u,
})
if err != nil {
return nil, err
}
if res.StatusCode != StatusOK {
return nil, fmt.Errorf("bad status code: %d (%s)", res.StatusCode, res.StatusMessage)
}
c.state = connClientStatePublishing
return nil, nil
}

View File

@@ -1,6 +1,8 @@
package gortsplib
import (
"fmt"
"net"
"net/url"
"os"
"os/exec"
@@ -8,6 +10,7 @@ import (
"testing"
"time"
"github.com/pion/rtp"
"github.com/stretchr/testify/require"
)
@@ -56,52 +59,6 @@ func (c *container) wait() int {
return int(code)
}
func TestConnClientReadTCP(t *testing.T) {
cnt1, err := newContainer("rtsp-simple-server", "server", []string{})
require.NoError(t, err)
defer cnt1.close()
time.Sleep(1 * time.Second)
cnt2, err := newContainer("ffmpeg", "publish", []string{
"-re",
"-stream_loop", "-1",
"-i", "/emptyvideo.ts",
"-c", "copy",
"-f", "rtsp",
"-rtsp_transport", "udp",
"rtsp://localhost:8554/teststream",
})
require.NoError(t, err)
defer cnt2.close()
time.Sleep(1 * time.Second)
u, err := url.Parse("rtsp://localhost:8554/teststream")
require.NoError(t, err)
conn, err := NewConnClient(ConnClientConf{Host: u.Host})
require.NoError(t, err)
defer conn.Close()
_, err = conn.Options(u)
require.NoError(t, err)
tracks, _, err := conn.Describe(u)
require.NoError(t, err)
for _, track := range tracks {
_, err := conn.SetupTCP(u, track)
require.NoError(t, err)
}
_, err = conn.Play(u)
require.NoError(t, err)
_, err = conn.ReadFrame()
require.NoError(t, err)
}
func TestConnClientReadUDP(t *testing.T) {
cnt1, err := newContainer("rtsp-simple-server", "server", []string{})
require.NoError(t, err)
@@ -137,15 +94,269 @@ func TestConnClientReadUDP(t *testing.T) {
require.NoError(t, err)
for _, track := range tracks {
_, err := conn.SetupUDP(u, track, 0, 0)
_, err := conn.SetupUDP(u, SetupModePlay, track, 0, 0)
require.NoError(t, err)
}
_, err = conn.Play(u)
require.NoError(t, err)
go conn.LoopUDP(u)
loopDone := make(chan struct{})
defer func() { <-loopDone }()
go func() {
defer close(loopDone)
conn.LoopUDP(u)
}()
_, err = conn.ReadFrameUDP(tracks[0], StreamTypeRtp)
require.NoError(t, err)
conn.CloseUDPListeners()
}
func TestConnClientReadTCP(t *testing.T) {
cnt1, err := newContainer("rtsp-simple-server", "server", []string{})
require.NoError(t, err)
defer cnt1.close()
time.Sleep(1 * time.Second)
cnt2, err := newContainer("ffmpeg", "publish", []string{
"-re",
"-stream_loop", "-1",
"-i", "/emptyvideo.ts",
"-c", "copy",
"-f", "rtsp",
"-rtsp_transport", "udp",
"rtsp://localhost:8554/teststream",
})
require.NoError(t, err)
defer cnt2.close()
time.Sleep(1 * time.Second)
u, err := url.Parse("rtsp://localhost:8554/teststream")
require.NoError(t, err)
conn, err := NewConnClient(ConnClientConf{Host: u.Host})
require.NoError(t, err)
defer conn.Close()
_, err = conn.Options(u)
require.NoError(t, err)
tracks, _, err := conn.Describe(u)
require.NoError(t, err)
for _, track := range tracks {
_, err := conn.SetupTCP(u, SetupModePlay, track)
require.NoError(t, err)
}
_, err = conn.Play(u)
require.NoError(t, err)
_, err = conn.ReadFrameTCP()
require.NoError(t, err)
}
func getH264SPSandPPS(pc net.PacketConn) ([]byte, []byte, error) {
var sps []byte
var pps []byte
buf := make([]byte, 2048)
for {
n, _, err := pc.ReadFrom(buf)
if err != nil {
return nil, nil, err
}
packet := &rtp.Packet{}
err = packet.Unmarshal(buf[:n])
if err != nil {
return nil, nil, err
}
// require h264
if packet.PayloadType != 96 {
return nil, nil, fmt.Errorf("wrong payload type '%d', expected 96",
packet.PayloadType)
}
// switch by NALU type
switch packet.Payload[0] & 0x1F {
case 0x07: // sps
sps = append([]byte(nil), packet.Payload...)
if sps != nil && pps != nil {
return sps, pps, nil
}
case 0x08: // pps
pps = append([]byte(nil), packet.Payload...)
if sps != nil && pps != nil {
return sps, pps, nil
}
}
}
}
func TestConnClientPublishUDP(t *testing.T) {
cnt1, err := newContainer("rtsp-simple-server", "server", []string{})
require.NoError(t, err)
defer cnt1.close()
time.Sleep(1 * time.Second)
u, err := url.Parse("rtsp://localhost:8554/teststream")
require.NoError(t, err)
conn, err := NewConnClient(ConnClientConf{Host: u.Host})
require.NoError(t, err)
publishDone := make(chan struct{})
defer func() { <-publishDone }()
defer conn.Close()
go func() {
defer close(publishDone)
pc, err := net.ListenPacket("udp4", "127.0.0.1:0")
require.NoError(t, err)
defer pc.Close()
cnt2, err := newContainer("gstreamer", "source", []string{
"filesrc location=emptyvideo.ts ! tsdemux ! video/x-h264" +
" ! h264parse config-interval=1 ! rtph264pay ! udpsink host=127.0.0.1 port=" + strconv.FormatInt(int64(pc.LocalAddr().(*net.UDPAddr).Port), 10),
})
require.NoError(t, err)
defer cnt2.close()
sps, pps, err := getH264SPSandPPS(pc)
require.NoError(t, err)
_, err = conn.Options(u)
require.NoError(t, err)
track := NewTrackH264(0, sps, pps)
_, err = conn.Announce(u, Tracks{track})
require.NoError(t, err)
_, err = conn.SetupUDP(u, SetupModeRecord, track, 0, 0)
require.NoError(t, err)
_, err = conn.Record(u)
require.NoError(t, err)
buf := make([]byte, 2048)
for {
n, _, err := pc.ReadFrom(buf)
if err != nil {
break
}
err = conn.WriteFrameUDP(track, StreamTypeRtp, buf[:n])
if err != nil {
break
}
}
}()
time.Sleep(1 * time.Second)
cnt3, err := newContainer("ffmpeg", "read", []string{
"-rtsp_transport", "udp",
"-i", "rtsp://localhost:8554/teststream",
"-vframes", "1",
"-f", "image2",
"-y", "/dev/null",
})
require.NoError(t, err)
defer cnt3.close()
code := cnt3.wait()
require.Equal(t, 0, code)
}
func TestConnClientPublishTCP(t *testing.T) {
cnt1, err := newContainer("rtsp-simple-server", "server", []string{})
require.NoError(t, err)
defer cnt1.close()
time.Sleep(1 * time.Second)
u, err := url.Parse("rtsp://localhost:8554/teststream")
require.NoError(t, err)
conn, err := NewConnClient(ConnClientConf{Host: u.Host})
require.NoError(t, err)
publishDone := make(chan struct{})
defer func() { <-publishDone }()
defer conn.Close()
go func() {
defer close(publishDone)
pc, err := net.ListenPacket("udp4", "127.0.0.1:0")
require.NoError(t, err)
defer pc.Close()
cnt2, err := newContainer("gstreamer", "source", []string{
"filesrc location=emptyvideo.ts ! tsdemux ! video/x-h264" +
" ! h264parse config-interval=1 ! rtph264pay ! udpsink host=127.0.0.1 port=" + strconv.FormatInt(int64(pc.LocalAddr().(*net.UDPAddr).Port), 10),
})
require.NoError(t, err)
defer cnt2.close()
sps, pps, err := getH264SPSandPPS(pc)
require.NoError(t, err)
_, err = conn.Options(u)
require.NoError(t, err)
track := NewTrackH264(0, sps, pps)
_, err = conn.Announce(u, Tracks{track})
require.NoError(t, err)
_, err = conn.SetupTCP(u, SetupModeRecord, track)
require.NoError(t, err)
_, err = conn.Record(u)
require.NoError(t, err)
buf := make([]byte, 2048)
for {
n, _, err := pc.ReadFrom(buf)
if err != nil {
break
}
err = conn.WriteFrameTCP(&InterleavedFrame{
TrackId: track.Id,
StreamType: StreamTypeRtp,
Content: buf[:n],
})
if err != nil {
break
}
}
}()
time.Sleep(1 * time.Second)
cnt3, err := newContainer("ffmpeg", "read", []string{
"-rtsp_transport", "udp",
"-i", "rtsp://localhost:8554/teststream",
"-vframes", "1",
"-f", "image2",
"-y", "/dev/null",
})
require.NoError(t, err)
defer cnt3.close()
code := cnt3.wait()
require.Equal(t, 0, code)
}

View File

@@ -7,20 +7,21 @@ import (
type connClientUDPListener struct {
pc net.PacketConn
publisherIp net.IP
publisherPort int
remoteIp net.IP
remoteZone string
remotePort int
udpFrameReadBuf *MultiBuffer
}
func newConnClientUDPListener(c *ConnClient, port int) (*connClientUDPListener, error) {
pc, err := c.conf.ListenPacket("udp", ":"+strconv.FormatInt(int64(port), 10))
func newConnClientUDPListener(conf ConnClientConf, port int) (*connClientUDPListener, error) {
pc, err := conf.ListenPacket("udp", ":"+strconv.FormatInt(int64(port), 10))
if err != nil {
return nil, err
}
return &connClientUDPListener{
pc: pc,
udpFrameReadBuf: NewMultiBuffer(c.conf.ReadBufferCount, 2048),
udpFrameReadBuf: NewMultiBuffer(conf.ReadBufferCount, 2048),
}, nil
}
@@ -38,10 +39,19 @@ func (l *connClientUDPListener) read() ([]byte, error) {
uaddr := addr.(*net.UDPAddr)
if !l.publisherIp.Equal(uaddr.IP) || l.publisherPort != uaddr.Port {
if !l.remoteIp.Equal(uaddr.IP) || l.remotePort != uaddr.Port {
continue
}
return buf[:n], nil
}
}
func (l *connClientUDPListener) write(buf []byte) error {
_, err := l.pc.WriteTo(buf, &net.UDPAddr{
IP: l.remoteIp,
Zone: l.remoteZone,
Port: l.remotePort,
})
return err
}

View File

@@ -106,7 +106,7 @@ func (s *ConnServer) WriteResponse(res *Response) error {
}
// WriteFrame writes an InterleavedFrame.
func (s *ConnServer) WriteFrame(frame *InterleavedFrame) error {
func (s *ConnServer) WriteFrameTCP(frame *InterleavedFrame) error {
s.conf.Conn.SetWriteDeadline(time.Now().Add(s.conf.WriteTimeout))
return frame.Write(s.bw)
}

121
examples/publish-tcp.go Normal file
View File

@@ -0,0 +1,121 @@
// +build ignore
package main
import (
"fmt"
"net"
"net/url"
"github.com/aler9/gortsplib"
"github.com/pion/rtp"
)
func getH264SPSandPPS(pc net.PacketConn) ([]byte, []byte, error) {
var sps []byte
var pps []byte
buf := make([]byte, 2048)
for {
n, _, err := pc.ReadFrom(buf)
if err != nil {
return nil, nil, err
}
packet := &rtp.Packet{}
err = packet.Unmarshal(buf[:n])
if err != nil {
return nil, nil, err
}
// require h264
if packet.PayloadType != 96 {
return nil, nil, fmt.Errorf("wrong payload type '%d', expected 96",
packet.PayloadType)
}
// switch by NALU type
switch packet.Payload[0] & 0x1F {
case 0x07: // sps
sps = append([]byte(nil), packet.Payload...)
if sps != nil && pps != nil {
return sps, pps, nil
}
case 0x08: // pps
pps = append([]byte(nil), packet.Payload...)
if sps != nil && pps != nil {
return sps, pps, nil
}
}
}
}
func main() {
pc, err := net.ListenPacket("udp4", "127.0.0.1:9000")
if err != nil {
panic(err)
}
defer pc.Close()
fmt.Println("Waiting for a rtp/h264 stream on port 9000 - you can send one with gstreamer:\n" +
"gst-launch-1.0 filesrc location=video.mp4 ! qtdemux ! video/x-h264" +
" ! h264parse config-interval=1 ! rtph264pay ! udpsink host=127.0.0.1 port=9000")
sps, pps, err := getH264SPSandPPS(pc)
if err != nil {
panic(err)
}
fmt.Println("stream is ok")
u, err := url.Parse("rtsp://localhost:8554/mystream")
if err != nil {
panic(err)
}
conn, err := gortsplib.NewConnClient(gortsplib.ConnClientConf{Host: u.Host})
if err != nil {
panic(err)
}
defer conn.Close()
_, err = conn.Options(u)
if err != nil {
panic(err)
}
track := gortsplib.NewTrackH264(0, sps, pps)
_, err = conn.Announce(u, gortsplib.Tracks{track})
if err != nil {
panic(err)
}
_, err = conn.SetupTCP(u, gortsplib.SetupModeRecord, track)
if err != nil {
panic(err)
}
_, err = conn.Record(u)
if err != nil {
panic(err)
}
buf := make([]byte, 2048)
for {
n, _, err := pc.ReadFrom(buf)
if err != nil {
break
}
err = conn.WriteFrameTCP(&gortsplib.InterleavedFrame{
TrackId: track.Id,
StreamType: gortsplib.StreamTypeRtp,
Content: buf[:n],
})
if err != nil {
break
}
}
}

117
examples/publish-udp.go Normal file
View File

@@ -0,0 +1,117 @@
// +build ignore
package main
import (
"fmt"
"net"
"net/url"
"github.com/aler9/gortsplib"
"github.com/pion/rtp"
)
func getH264SPSandPPS(pc net.PacketConn) ([]byte, []byte, error) {
var sps []byte
var pps []byte
buf := make([]byte, 2048)
for {
n, _, err := pc.ReadFrom(buf)
if err != nil {
return nil, nil, err
}
packet := &rtp.Packet{}
err = packet.Unmarshal(buf[:n])
if err != nil {
return nil, nil, err
}
// require h264
if packet.PayloadType != 96 {
return nil, nil, fmt.Errorf("wrong payload type '%d', expected 96",
packet.PayloadType)
}
// switch by NALU type
switch packet.Payload[0] & 0x1F {
case 0x07: // sps
sps = append([]byte(nil), packet.Payload...)
if sps != nil && pps != nil {
return sps, pps, nil
}
case 0x08: // pps
pps = append([]byte(nil), packet.Payload...)
if sps != nil && pps != nil {
return sps, pps, nil
}
}
}
}
func main() {
pc, err := net.ListenPacket("udp4", "127.0.0.1:9000")
if err != nil {
panic(err)
}
defer pc.Close()
fmt.Println("Waiting for a rtp/h264 stream on port 9000 - you can send one with gstreamer:\n" +
"gst-launch-1.0 filesrc location=video.mp4 ! qtdemux ! video/x-h264" +
" ! h264parse config-interval=1 ! rtph264pay ! udpsink host=127.0.0.1 port=9000")
sps, pps, err := getH264SPSandPPS(pc)
if err != nil {
panic(err)
}
fmt.Println("stream is ok")
u, err := url.Parse("rtsp://localhost:8554/mystream")
if err != nil {
panic(err)
}
conn, err := gortsplib.NewConnClient(gortsplib.ConnClientConf{Host: u.Host})
if err != nil {
panic(err)
}
defer conn.Close()
_, err = conn.Options(u)
if err != nil {
panic(err)
}
track := gortsplib.NewTrackH264(0, sps, pps)
_, err = conn.Announce(u, gortsplib.Tracks{track})
if err != nil {
panic(err)
}
_, err = conn.SetupUDP(u, gortsplib.SetupModeRecord, track, 0, 0)
if err != nil {
panic(err)
}
_, err = conn.Record(u)
if err != nil {
panic(err)
}
buf := make([]byte, 2048)
for {
n, _, err := pc.ReadFrom(buf)
if err != nil {
break
}
err = conn.WriteFrameUDP(track, gortsplib.StreamTypeRtp, buf[:n])
if err != nil {
break
}
}
}

View File

@@ -32,7 +32,7 @@ func main() {
}
for _, track := range tracks {
_, err := conn.SetupTCP(u, track)
_, err := conn.SetupTCP(u, gortsplib.SetupModePlay, track)
if err != nil {
panic(err)
}
@@ -44,9 +44,8 @@ func main() {
}
for {
frame, err := conn.ReadFrame()
frame, err := conn.ReadFrameTCP()
if err != nil {
conn.Close()
fmt.Println("connection is closed (%s)", err)
break
}

View File

@@ -33,7 +33,7 @@ func main() {
}
for _, track := range tracks {
_, err := conn.SetupUDP(u, track, 0, 0)
_, err := conn.SetupUDP(u, gortsplib.SetupModePlay, track, 0, 0)
if err != nil {
panic(err)
}
@@ -45,6 +45,8 @@ func main() {
}
var wg sync.WaitGroup
defer wg.Wait()
defer conn.CloseUDPListeners()
// read RTP frames
for _, track := range tracks {
@@ -83,7 +85,5 @@ func main() {
}
err = conn.LoopUDP(u)
conn.Close()
wg.Wait()
fmt.Println("connection is closed (%s)", err)
}

1
go.mod
View File

@@ -5,5 +5,6 @@ go 1.12
require (
github.com/aler9/sdp-dirty/v3 v3.0.0-20200919115950-f1abc664f625
github.com/pion/rtcp v1.2.3
github.com/pion/rtp v1.6.0
github.com/stretchr/testify v1.6.1
)

3
go.sum
View File

@@ -2,10 +2,13 @@ github.com/aler9/sdp-dirty/v3 v3.0.0-20200919115950-f1abc664f625 h1:A3upkpYzceQT
github.com/aler9/sdp-dirty/v3 v3.0.0-20200919115950-f1abc664f625/go.mod h1:5bO/aUQr9m3OasDatNNcVqKAgs7r5hgGXmszWHaC6mI=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/pion/randutil v0.0.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8=
github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA=
github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8=
github.com/pion/rtcp v1.2.3 h1:2wrhKnqgSz91Q5nzYTO07mQXztYPtxL8a0XOss4rJqA=
github.com/pion/rtcp v1.2.3/go.mod h1:zGhIv0RPRF0Z1Wiij22pUt5W/c9fevqSzT4jje/oK7I=
github.com/pion/rtp v1.6.0 h1:4Ssnl/T5W2LzxHj9ssYpGVEQh3YYhQFNVmSWO88MMwk=
github.com/pion/rtp v1.6.0/go.mod h1:QgfogHsMBVE/RFNno467U/KBqfUywEH+HK+0rtnwsdI=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=

View File

@@ -0,0 +1,16 @@
FROM amd64/alpine:3.12
RUN apk add --no-cache \
gstreamer-tools \
gst-plugins-good \
gst-plugins-bad \
&& apk add --no-cache \
-X http://dl-cdn.alpinelinux.org/alpine/edge/testing \
gst-rtsp-server
COPY emptyvideo.ts /
COPY start.sh /
RUN chmod +x /start.sh
ENTRYPOINT [ "/start.sh" ]

Binary file not shown.

View File

@@ -0,0 +1,3 @@
#!/bin/sh -e
exec gst-launch-1.0 $@

View File

@@ -1,7 +1,10 @@
package gortsplib
import (
"encoding/base64"
"encoding/hex"
"strconv"
"strings"
"github.com/aler9/sdp-dirty/v3"
)
@@ -15,6 +18,40 @@ type Track struct {
Media *sdp.MediaDescription
}
// NewTrackH264 initializes an H264 track.
func NewTrackH264(id int, sps []byte, pps []byte) *Track {
spropParameterSets := base64.StdEncoding.EncodeToString(sps) +
"," + base64.StdEncoding.EncodeToString(pps)
profileLevelId := strings.ToUpper(hex.EncodeToString(sps[1:4]))
return &Track{
Id: id,
Media: &sdp.MediaDescription{
MediaName: sdp.MediaName{
Media: "video",
Protos: []string{"RTP", "AVP"},
Formats: []string{"96"},
},
Attributes: []sdp.Attribute{
{
Key: "rtpmap",
Value: "96 H264/90000",
},
{
Key: "fmtp",
Value: "96 packetization-mode=1; " +
"sprop-parameter-sets=" + spropParameterSets + "; " +
"profile-level-id=" + profileLevelId,
},
{
Key: "control",
Value: "trackID=0",
},
},
},
}
}
// Tracks is a list of tracks.
type Tracks []*Track

View File

@@ -80,6 +80,29 @@ func (st StreamType) String() string {
return "unknown"
}
// SetupMode is a setup mode.
type SetupMode int
const (
// SetupModePlay is the "play" setup mode
SetupModePlay SetupMode = iota
// SetupModeRecord is the "record" setup mode
SetupModeRecord
)
// String implements fmt.Stringer
func (sm SetupMode) String() string {
switch sm {
case SetupModePlay:
return "play"
case SetupModeRecord:
return "record"
}
return "unknown"
}
func readBytesLimited(rb *bufio.Reader, delim byte, n int) ([]byte, error) {
for i := 1; i <= n; i++ {
byts, err := rb.Peek(i)