Add proxy example (#141) (#175)

* cleanup

* add proxy example
This commit is contained in:
Alessandro Ros
2023-01-23 11:23:16 +01:00
committed by GitHub
parent 82dac32f6e
commit 7b6a5f8cb3
16 changed files with 313 additions and 78 deletions

View File

@@ -93,6 +93,7 @@ Features:
* [server](examples/server/main.go) * [server](examples/server/main.go)
* [server-tls](examples/server-tls/main.go) * [server-tls](examples/server-tls/main.go)
* [server-h264-save-to-disk](examples/server-h264-save-to-disk/main.go) * [server-h264-save-to-disk](examples/server-h264-save-to-disk/main.go)
* [proxy](examples/proxy/main.go)
## API Documentation ## API Documentation

View File

@@ -32,27 +32,27 @@ func newClientFormat(cm *clientMedia, forma format.Format) *clientFormat {
} }
} }
func (ct *clientFormat) start(cm *clientMedia) { func (ct *clientFormat) start() {
if cm.c.state == clientStatePlay { if ct.cm.c.state == clientStatePlay {
if cm.udpRTPListener != nil { if ct.cm.udpRTPListener != nil {
ct.udpReorderer = rtpreorderer.New() ct.udpReorderer = rtpreorderer.New()
ct.udpRTCPReceiver = rtcpreceiver.New( ct.udpRTCPReceiver = rtcpreceiver.New(
cm.c.udpReceiverReportPeriod, ct.cm.c.udpReceiverReportPeriod,
nil, nil,
ct.format.ClockRate(), func(pkt rtcp.Packet) { ct.format.ClockRate(), func(pkt rtcp.Packet) {
cm.writePacketRTCP(pkt) ct.cm.writePacketRTCP(pkt)
}) })
} }
} else { } else {
ct.rtcpSender = rtcpsender.New( ct.rtcpSender = rtcpsender.New(
ct.format.ClockRate(), ct.format.ClockRate(),
func(pkt rtcp.Packet) { func(pkt rtcp.Packet) {
cm.writePacketRTCP(pkt) ct.cm.writePacketRTCP(pkt)
}) })
} }
} }
// start RTCP senders after write() has been allocated in order to avoid a crash // start writing after write*() has been allocated in order to avoid a crash
func (ct *clientFormat) startWriting() { func (ct *clientFormat) startWriting() {
if ct.c.state != clientStatePlay && !ct.c.DisableRTCPSenderReports { if ct.c.state != clientStatePlay && !ct.c.DisableRTCPSenderReports {
ct.rtcpSender.Start(ct.c.senderReportPeriod) ct.rtcpSender.Start(ct.c.senderReportPeriod)

View File

@@ -121,7 +121,7 @@ func (cm *clientMedia) start() {
} }
for _, ct := range cm.formats { for _, ct := range cm.formats {
ct.start(cm) ct.start()
} }
if cm.udpRTPListener != nil { if cm.udpRTPListener != nil {

View File

@@ -11,8 +11,9 @@ import (
"github.com/pion/rtp" "github.com/pion/rtp"
) )
// This example shows how to connect to a RTSP server // This example shows how to
// and read all medias on a path. // 1. connect to a RTSP server
// 2. read all media streams on a path.
func main() { func main() {
c := gortsplib.Client{} c := gortsplib.Client{}

109
examples/proxy/client.go Normal file
View File

@@ -0,0 +1,109 @@
package main
import (
"log"
"sync"
"time"
"github.com/aler9/gortsplib/v2"
"github.com/aler9/gortsplib/v2/pkg/format"
"github.com/aler9/gortsplib/v2/pkg/media"
"github.com/aler9/gortsplib/v2/pkg/url"
"github.com/pion/rtp"
)
const (
existingStream = "rtsp://x.x.x.x:8554/mystream"
reconnectPause = 2 * time.Second
)
type client struct {
mutex sync.RWMutex
stream *gortsplib.ServerStream
}
func newClient() *client {
c := &client{}
// start a separated routine
go c.run()
return c
}
func (c *client) run() {
for {
err := c.read()
log.Printf("ERR: %s\n", err)
time.Sleep(reconnectPause)
}
}
func (c *client) getStream() *gortsplib.ServerStream {
c.mutex.RLock()
defer c.mutex.RUnlock()
return c.stream
}
func (c *client) read() error {
rc := gortsplib.Client{}
// parse URL
u, err := url.Parse(existingStream)
if err != nil {
return err
}
// connect to the server
err = rc.Start(u.Scheme, u.Host)
if err != nil {
return err
}
defer rc.Close()
// find published medias
medias, baseURL, _, err := rc.Describe(u)
if err != nil {
return err
}
// setup all medias
err = rc.SetupAll(medias, baseURL)
if err != nil {
return err
}
// create a server stream
stream := gortsplib.NewServerStream(medias)
defer stream.Close()
log.Printf("stream is ready and can be read from the server at rtsp://localhost:8554/stream\n")
// make stream available by using getStream()
c.mutex.Lock()
c.stream = stream
c.mutex.Unlock()
defer func() {
// remove stream from getStream()
c.mutex.Lock()
c.stream = nil
c.mutex.Unlock()
}()
// called when a RTP packet arrives
rc.OnPacketRTPAny(func(medi *media.Media, forma format.Format, pkt *rtp.Packet) {
// route incoming packets to the server stream
stream.WritePacketRTP(medi, pkt)
})
// start playing
_, err = rc.Play(nil)
if err != nil {
return err
}
// wait until a fatal error
return rc.Wait()
}

14
examples/proxy/main.go Normal file
View File

@@ -0,0 +1,14 @@
package main
// This example shows how to
// 1. read an existing stream from an external server or camera, with a client
// 2. create a server that allow to proxy that stream
func main() {
// allocate the client
c := newClient()
// allocate the server.
// give server access to the method client.getStream().
newServer(c.getStream)
}

100
examples/proxy/server.go Normal file
View File

@@ -0,0 +1,100 @@
package main
import (
"log"
"github.com/aler9/gortsplib/v2"
"github.com/aler9/gortsplib/v2/pkg/base"
)
type server struct {
getStream func() *gortsplib.ServerStream
}
func newServer(
getStream func() *gortsplib.ServerStream,
) *server {
s := &server{
getStream: getStream,
}
// configure the server
rs := &gortsplib.Server{
Handler: s,
RTSPAddress: ":8554",
UDPRTPAddress: ":8000",
UDPRTCPAddress: ":8001",
MulticastIPRange: "224.1.0.0/16",
MulticastRTPPort: 8002,
MulticastRTCPPort: 8003,
}
// start server and wait until a fatal error
log.Printf("server is ready")
panic(rs.StartAndWait())
}
// called when a connection is opened.
func (s *server) OnConnOpen(ctx *gortsplib.ServerHandlerOnConnOpenCtx) {
log.Printf("conn opened")
}
// called when a connection is closed.
func (s *server) OnConnClose(ctx *gortsplib.ServerHandlerOnConnCloseCtx) {
log.Printf("conn closed (%v)", ctx.Error)
}
// called when a session is opened.
func (s *server) OnSessionOpen(ctx *gortsplib.ServerHandlerOnSessionOpenCtx) {
log.Printf("session opened")
}
// called when a session is closed.
func (s *server) OnSessionClose(ctx *gortsplib.ServerHandlerOnSessionCloseCtx) {
log.Printf("session closed")
}
// called when receiving a DESCRIBE request.
func (s *server) OnDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx) (*base.Response, *gortsplib.ServerStream, error) {
log.Printf("describe request")
stream := s.getStream()
// stream is not available yet
if stream == nil {
return &base.Response{
StatusCode: base.StatusNotFound,
}, nil, nil
}
return &base.Response{
StatusCode: base.StatusOK,
}, stream, nil
}
// called when receiving a SETUP request.
func (s *server) OnSetup(ctx *gortsplib.ServerHandlerOnSetupCtx) (*base.Response, *gortsplib.ServerStream, error) {
log.Printf("setup request")
stream := s.getStream()
// stream is not available yet
if stream == nil {
return &base.Response{
StatusCode: base.StatusNotFound,
}, nil, nil
}
return &base.Response{
StatusCode: base.StatusOK,
}, stream, nil
}
// called when receiving a PLAY request.
func (s *server) OnPlay(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Response, error) {
log.Printf("play request")
return &base.Response{
StatusCode: base.StatusOK,
}, nil
}

View File

@@ -127,7 +127,7 @@ func (sh *serverHandler) OnRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*bas
} }
func main() { func main() {
// configure server // configure the server
s := &gortsplib.Server{ s := &gortsplib.Server{
Handler: &serverHandler{}, Handler: &serverHandler{},
RTSPAddress: ":8554", RTSPAddress: ":8554",

View File

@@ -145,7 +145,7 @@ func main() {
panic(err) panic(err)
} }
// configure server // configure the server
s := &gortsplib.Server{ s := &gortsplib.Server{
Handler: &serverHandler{}, Handler: &serverHandler{},
TLSConfig: &tls.Config{Certificates: []tls.Certificate{cert}}, TLSConfig: &tls.Config{Certificates: []tls.Certificate{cert}},

View File

@@ -136,7 +136,7 @@ func (sh *serverHandler) OnRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*bas
} }
func main() { func main() {
// configure server // configure the server
s := &gortsplib.Server{ s := &gortsplib.Server{
Handler: &serverHandler{}, Handler: &serverHandler{},
RTSPAddress: ":8554", RTSPAddress: ":8554",

View File

@@ -12,7 +12,7 @@ type typeAndPayload struct {
payload []byte payload []byte
} }
type serverMulticastHandler struct { type serverMulticastWriter struct {
rtpl *serverUDPListener rtpl *serverUDPListener
rtcpl *serverUDPListener rtcpl *serverUDPListener
writeBuffer *ringbuffer.RingBuffer writeBuffer *ringbuffer.RingBuffer
@@ -20,7 +20,7 @@ type serverMulticastHandler struct {
writerDone chan struct{} writerDone chan struct{}
} }
func newServerMulticastHandler(s *Server) (*serverMulticastHandler, error) { func newServerMulticastWriter(s *Server) (*serverMulticastWriter, error) {
res := make(chan net.IP) res := make(chan net.IP)
select { select {
case s.streamMulticastIP <- streamMulticastIPReq{res: res}: case s.streamMulticastIP <- streamMulticastIPReq{res: res}:
@@ -42,7 +42,7 @@ func newServerMulticastHandler(s *Server) (*serverMulticastHandler, error) {
wb, _ := ringbuffer.New(uint64(s.WriteBufferCount)) wb, _ := ringbuffer.New(uint64(s.WriteBufferCount))
h := &serverMulticastHandler{ h := &serverMulticastWriter{
rtpl: rtpl, rtpl: rtpl,
rtcpl: rtcpl, rtcpl: rtcpl,
writeBuffer: wb, writeBuffer: wb,
@@ -54,18 +54,18 @@ func newServerMulticastHandler(s *Server) (*serverMulticastHandler, error) {
return h, nil return h, nil
} }
func (h *serverMulticastHandler) close() { func (h *serverMulticastWriter) close() {
h.rtpl.close() h.rtpl.close()
h.rtcpl.close() h.rtcpl.close()
h.writeBuffer.Close() h.writeBuffer.Close()
<-h.writerDone <-h.writerDone
} }
func (h *serverMulticastHandler) ip() net.IP { func (h *serverMulticastWriter) ip() net.IP {
return h.rtpl.ip() return h.rtpl.ip()
} }
func (h *serverMulticastHandler) runWriter() { func (h *serverMulticastWriter) runWriter() {
defer close(h.writerDone) defer close(h.writerDone)
rtpAddr := &net.UDPAddr{ rtpAddr := &net.UDPAddr{
@@ -93,14 +93,14 @@ func (h *serverMulticastHandler) runWriter() {
} }
} }
func (h *serverMulticastHandler) writePacketRTP(payload []byte) { func (h *serverMulticastWriter) writePacketRTP(payload []byte) {
h.writeBuffer.Push(typeAndPayload{ h.writeBuffer.Push(typeAndPayload{
isRTP: true, isRTP: true,
payload: payload, payload: payload,
}) })
} }
func (h *serverMulticastHandler) writePacketRTCP(payload []byte) { func (h *serverMulticastWriter) writePacketRTCP(payload []byte) {
h.writeBuffer.Push(typeAndPayload{ h.writeBuffer.Push(typeAndPayload{
isRTP: false, isRTP: false,
payload: payload, payload: payload,

View File

@@ -755,13 +755,6 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base
sm := newServerSessionMedia(ss, medi) sm := newServerSessionMedia(ss, medi)
if ss.state == ServerSessionStatePreRecord {
sm.formats = make(map[uint8]*serverSessionFormat)
for _, forma := range sm.media.Formats {
sm.formats[forma.PayloadType()] = newServerSessionFormat(sm, forma)
}
}
switch transport { switch transport {
case TransportUDP: case TransportUDP:
sm.udpRTPReadPort = inTH.ClientPorts[0] sm.udpRTPReadPort = inTH.ClientPorts[0]
@@ -791,7 +784,7 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base
th.Delivery = &de th.Delivery = &de
v := uint(127) v := uint(127)
th.TTL = &v th.TTL = &v
d := stream.streamMedias[medi].multicastHandler.ip() d := stream.streamMedias[medi].multicastWriter.ip()
th.Destination = &d th.Destination = &d
th.Ports = &[2]int{ss.s.MulticastRTPPort, ss.s.MulticastRTCPPort} th.Ports = &[2]int{ss.s.MulticastRTPPort, ss.s.MulticastRTCPPort}

View File

@@ -4,6 +4,7 @@ import (
"fmt" "fmt"
"time" "time"
"github.com/pion/rtcp"
"github.com/pion/rtp" "github.com/pion/rtp"
"github.com/aler9/gortsplib/v2/pkg/format" "github.com/aler9/gortsplib/v2/pkg/format"
@@ -27,19 +28,40 @@ func newServerSessionFormat(sm *serverSessionMedia, forma format.Format) *server
} }
} }
func (st *serverSessionFormat) readRTPUDP(pkt *rtp.Packet, now time.Time) { func (sf *serverSessionFormat) start() {
packets, missing := st.udpReorderer.Process(pkt) if (*sf.sm.ss.setuppedTransport == TransportUDP || *sf.sm.ss.setuppedTransport == TransportUDPMulticast) &&
sf.sm.ss.state != ServerSessionStatePlay {
sf.udpReorderer = rtpreorderer.New()
sf.udpRTCPReceiver = rtcpreceiver.New(
sf.sm.ss.s.udpReceiverReportPeriod,
nil,
sf.format.ClockRate(),
func(pkt rtcp.Packet) {
sf.sm.ss.WritePacketRTCP(sf.sm.media, pkt)
})
}
}
func (sf *serverSessionFormat) stop() {
if sf.udpRTCPReceiver != nil {
sf.udpRTCPReceiver.Close()
sf.udpRTCPReceiver = nil
}
}
func (sf *serverSessionFormat) readRTPUDP(pkt *rtp.Packet, now time.Time) {
packets, missing := sf.udpReorderer.Process(pkt)
if missing != 0 { if missing != 0 {
onDecodeError(st.sm.ss, fmt.Errorf("%d RTP packet(s) lost", missing)) onDecodeError(sf.sm.ss, fmt.Errorf("%d RTP packet(s) lost", missing))
// do not return // do not return
} }
for _, pkt := range packets { for _, pkt := range packets {
st.udpRTCPReceiver.ProcessPacket(pkt, now, st.format.PTSEqualsDTS(pkt)) sf.udpRTCPReceiver.ProcessPacket(pkt, now, sf.format.PTSEqualsDTS(pkt))
st.onPacketRTP(pkt) sf.onPacketRTP(pkt)
} }
} }
func (st *serverSessionFormat) readRTPTCP(pkt *rtp.Packet) { func (sf *serverSessionFormat) readRTPTCP(pkt *rtp.Packet) {
st.onPacketRTP(pkt) sf.onPacketRTP(pkt)
} }

View File

@@ -11,8 +11,6 @@ import (
"github.com/aler9/gortsplib/v2/pkg/base" "github.com/aler9/gortsplib/v2/pkg/base"
"github.com/aler9/gortsplib/v2/pkg/media" "github.com/aler9/gortsplib/v2/pkg/media"
"github.com/aler9/gortsplib/v2/pkg/rtcpreceiver"
"github.com/aler9/gortsplib/v2/pkg/rtpreorderer"
) )
type serverSessionMedia struct { type serverSessionMedia struct {
@@ -26,7 +24,7 @@ type serverSessionMedia struct {
tcpRTPFrame *base.InterleavedFrame tcpRTPFrame *base.InterleavedFrame
tcpRTCPFrame *base.InterleavedFrame tcpRTCPFrame *base.InterleavedFrame
tcpBuffer []byte tcpBuffer []byte
formats map[uint8]*serverSessionFormat // record formats map[uint8]*serverSessionFormat // record only
writePacketRTPInQueue func([]byte) writePacketRTPInQueue func([]byte)
writePacketRTCPInQueue func([]byte) writePacketRTCPInQueue func([]byte)
readRTP func([]byte) error readRTP func([]byte) error
@@ -35,11 +33,20 @@ type serverSessionMedia struct {
} }
func newServerSessionMedia(ss *ServerSession, medi *media.Media) *serverSessionMedia { func newServerSessionMedia(ss *ServerSession, medi *media.Media) *serverSessionMedia {
return &serverSessionMedia{ sm := &serverSessionMedia{
ss: ss, ss: ss,
media: medi, media: medi,
onPacketRTCP: func(rtcp.Packet) {}, onPacketRTCP: func(rtcp.Packet) {},
} }
if ss.state == ServerSessionStatePreRecord {
sm.formats = make(map[uint8]*serverSessionFormat)
for _, forma := range medi.Formats {
sm.formats[forma.PayloadType()] = newServerSessionFormat(sm, forma)
}
}
return sm
} }
func (sm *serverSessionMedia) start() { func (sm *serverSessionMedia) start() {
@@ -53,19 +60,6 @@ func (sm *serverSessionMedia) start() {
} else { } else {
sm.readRTP = sm.readRTPUDPRecord sm.readRTP = sm.readRTPUDPRecord
sm.readRTCP = sm.readRTCPUDPRecord sm.readRTCP = sm.readRTCPUDPRecord
for _, tr := range sm.formats {
tr.udpReorderer = rtpreorderer.New()
cmedia := sm.media
tr.udpRTCPReceiver = rtcpreceiver.New(
sm.ss.s.udpReceiverReportPeriod,
nil,
tr.format.ClockRate(),
func(pkt rtcp.Packet) {
sm.ss.WritePacketRTCP(cmedia, pkt)
})
}
} }
case TransportTCP: case TransportTCP:
@@ -87,7 +81,7 @@ func (sm *serverSessionMedia) start() {
if *sm.ss.setuppedTransport == TransportUDP { if *sm.ss.setuppedTransport == TransportUDP {
if sm.ss.state == ServerSessionStatePlay { if sm.ss.state == ServerSessionStatePlay {
// firewall opening is performed by RTCP sender reports generated by ServerStream // firewall opening is performed with RTCP sender reports generated by ServerStream
// readers can send RTCP packets only // readers can send RTCP packets only
sm.ss.s.udpRTCPListener.addClient(sm.ss.author.ip(), sm.udpRTCPReadPort, sm) sm.ss.s.udpRTCPListener.addClient(sm.ss.author.ip(), sm.udpRTCPReadPort, sm)
@@ -100,6 +94,10 @@ func (sm *serverSessionMedia) start() {
sm.ss.s.udpRTCPListener.addClient(sm.ss.author.ip(), sm.udpRTCPReadPort, sm) sm.ss.s.udpRTCPListener.addClient(sm.ss.author.ip(), sm.udpRTCPReadPort, sm)
} }
} }
for _, sf := range sm.formats {
sf.start()
}
} }
func (sm *serverSessionMedia) stop() { func (sm *serverSessionMedia) stop() {
@@ -108,11 +106,8 @@ func (sm *serverSessionMedia) stop() {
sm.ss.s.udpRTCPListener.removeClient(sm) sm.ss.s.udpRTCPListener.removeClient(sm)
} }
for _, tr := range sm.formats { for _, sf := range sm.formats {
if tr.udpRTCPReceiver != nil { sf.stop()
tr.udpRTCPReceiver.Close()
tr.udpRTCPReceiver = nil
}
} }
} }

View File

@@ -205,9 +205,9 @@ func (st *ServerStream) readerRemove(ss *ServerSession) {
if len(st.readers) == 0 { if len(st.readers) == 0 {
for _, media := range st.streamMedias { for _, media := range st.streamMedias {
if media.multicastHandler != nil { if media.multicastWriter != nil {
media.multicastHandler.close() media.multicastWriter.close()
media.multicastHandler = nil media.multicastWriter = nil
} }
} }
} }
@@ -224,8 +224,8 @@ func (st *ServerStream) readerSetActive(ss *ServerSession) {
if *ss.setuppedTransport == TransportUDPMulticast { if *ss.setuppedTransport == TransportUDPMulticast {
for medi, sm := range ss.setuppedMedias { for medi, sm := range ss.setuppedMedias {
streamMedia := st.streamMedias[medi] streamMedia := st.streamMedias[medi]
streamMedia.multicastHandler.rtcpl.addClient( streamMedia.multicastWriter.rtcpl.addClient(
ss.author.ip(), streamMedia.multicastHandler.rtcpl.port(), sm) ss.author.ip(), streamMedia.multicastWriter.rtcpl.port(), sm)
} }
} else { } else {
st.activeUnicastReaders[ss] = struct{}{} st.activeUnicastReaders[ss] = struct{}{}
@@ -243,7 +243,7 @@ func (st *ServerStream) readerSetInactive(ss *ServerSession) {
if *ss.setuppedTransport == TransportUDPMulticast { if *ss.setuppedTransport == TransportUDPMulticast {
for medi, sm := range ss.setuppedMedias { for medi, sm := range ss.setuppedMedias {
streamMedia := st.streamMedias[medi] streamMedia := st.streamMedias[medi]
streamMedia.multicastHandler.rtcpl.removeClient(sm) streamMedia.multicastWriter.rtcpl.removeClient(sm)
} }
} else { } else {
delete(st.activeUnicastReaders, ss) delete(st.activeUnicastReaders, ss)

View File

@@ -12,10 +12,10 @@ import (
) )
type serverStreamMedia struct { type serverStreamMedia struct {
uuid uuid.UUID uuid uuid.UUID
media *media.Media media *media.Media
formats map[uint8]*serverStreamFormat formats map[uint8]*serverStreamFormat
multicastHandler *serverMulticastHandler multicastWriter *serverMulticastWriter
} }
func newServerStreamMedia(st *ServerStream, medi *media.Media) *serverStreamMedia { func newServerStreamMedia(st *ServerStream, medi *media.Media) *serverStreamMedia {
@@ -51,19 +51,19 @@ func (sm *serverStreamMedia) close() {
} }
} }
if sm.multicastHandler != nil { if sm.multicastWriter != nil {
sm.multicastHandler.close() sm.multicastWriter.close()
} }
} }
func (sm *serverStreamMedia) allocateMulticastHandler(s *Server) error { func (sm *serverStreamMedia) allocateMulticastHandler(s *Server) error {
if sm.multicastHandler == nil { if sm.multicastWriter == nil {
mh, err := newServerMulticastHandler(s) mh, err := newServerMulticastWriter(s)
if err != nil { if err != nil {
return err return err
} }
sm.multicastHandler = mh sm.multicastWriter = mh
} }
return nil return nil
} }
@@ -89,8 +89,8 @@ func (sm *serverStreamMedia) WritePacketRTPWithNTP(ss *ServerStream, pkt *rtp.Pa
} }
// send multicast // send multicast
if sm.multicastHandler != nil { if sm.multicastWriter != nil {
sm.multicastHandler.writePacketRTP(byts) sm.multicastWriter.writePacketRTP(byts)
} }
} }
@@ -109,7 +109,7 @@ func (sm *serverStreamMedia) writePacketRTCP(ss *ServerStream, pkt rtcp.Packet)
} }
// send multicast // send multicast
if sm.multicastHandler != nil { if sm.multicastWriter != nil {
sm.multicastHandler.writePacketRTCP(byts) sm.multicastWriter.writePacketRTCP(byts)
} }
} }