diff --git a/README.md b/README.md index c6da654b..6e4f524c 100644 --- a/README.md +++ b/README.md @@ -93,6 +93,7 @@ Features: * [server](examples/server/main.go) * [server-tls](examples/server-tls/main.go) * [server-h264-save-to-disk](examples/server-h264-save-to-disk/main.go) +* [proxy](examples/proxy/main.go) ## API Documentation diff --git a/client_format.go b/client_format.go index c37ce43a..717f4b83 100644 --- a/client_format.go +++ b/client_format.go @@ -32,27 +32,27 @@ func newClientFormat(cm *clientMedia, forma format.Format) *clientFormat { } } -func (ct *clientFormat) start(cm *clientMedia) { - if cm.c.state == clientStatePlay { - if cm.udpRTPListener != nil { +func (ct *clientFormat) start() { + if ct.cm.c.state == clientStatePlay { + if ct.cm.udpRTPListener != nil { ct.udpReorderer = rtpreorderer.New() ct.udpRTCPReceiver = rtcpreceiver.New( - cm.c.udpReceiverReportPeriod, + ct.cm.c.udpReceiverReportPeriod, nil, ct.format.ClockRate(), func(pkt rtcp.Packet) { - cm.writePacketRTCP(pkt) + ct.cm.writePacketRTCP(pkt) }) } } else { ct.rtcpSender = rtcpsender.New( ct.format.ClockRate(), func(pkt rtcp.Packet) { - cm.writePacketRTCP(pkt) + ct.cm.writePacketRTCP(pkt) }) } } -// start RTCP senders after write() has been allocated in order to avoid a crash +// start writing after write*() has been allocated in order to avoid a crash func (ct *clientFormat) startWriting() { if ct.c.state != clientStatePlay && !ct.c.DisableRTCPSenderReports { ct.rtcpSender.Start(ct.c.senderReportPeriod) diff --git a/client_media.go b/client_media.go index fecef3f0..c99275b8 100644 --- a/client_media.go +++ b/client_media.go @@ -121,7 +121,7 @@ func (cm *clientMedia) start() { } for _, ct := range cm.formats { - ct.start(cm) + ct.start() } if cm.udpRTPListener != nil { diff --git a/examples/client-read/main.go b/examples/client-read/main.go index 4209d53d..82e53a86 100644 --- a/examples/client-read/main.go +++ b/examples/client-read/main.go @@ -11,8 +11,9 @@ import ( "github.com/pion/rtp" ) -// This example shows how to connect to a RTSP server -// and read all medias on a path. +// This example shows how to +// 1. connect to a RTSP server +// 2. read all media streams on a path. func main() { c := gortsplib.Client{} diff --git a/examples/proxy/client.go b/examples/proxy/client.go new file mode 100644 index 00000000..e6c00c95 --- /dev/null +++ b/examples/proxy/client.go @@ -0,0 +1,109 @@ +package main + +import ( + "log" + "sync" + "time" + + "github.com/aler9/gortsplib/v2" + "github.com/aler9/gortsplib/v2/pkg/format" + "github.com/aler9/gortsplib/v2/pkg/media" + "github.com/aler9/gortsplib/v2/pkg/url" + "github.com/pion/rtp" +) + +const ( + existingStream = "rtsp://x.x.x.x:8554/mystream" + reconnectPause = 2 * time.Second +) + +type client struct { + mutex sync.RWMutex + stream *gortsplib.ServerStream +} + +func newClient() *client { + c := &client{} + + // start a separated routine + go c.run() + + return c +} + +func (c *client) run() { + for { + err := c.read() + log.Printf("ERR: %s\n", err) + + time.Sleep(reconnectPause) + } +} + +func (c *client) getStream() *gortsplib.ServerStream { + c.mutex.RLock() + defer c.mutex.RUnlock() + return c.stream +} + +func (c *client) read() error { + rc := gortsplib.Client{} + + // parse URL + u, err := url.Parse(existingStream) + if err != nil { + return err + } + + // connect to the server + err = rc.Start(u.Scheme, u.Host) + if err != nil { + return err + } + defer rc.Close() + + // find published medias + medias, baseURL, _, err := rc.Describe(u) + if err != nil { + return err + } + + // setup all medias + err = rc.SetupAll(medias, baseURL) + if err != nil { + return err + } + + // create a server stream + stream := gortsplib.NewServerStream(medias) + defer stream.Close() + + log.Printf("stream is ready and can be read from the server at rtsp://localhost:8554/stream\n") + + // make stream available by using getStream() + c.mutex.Lock() + c.stream = stream + c.mutex.Unlock() + + defer func() { + // remove stream from getStream() + c.mutex.Lock() + c.stream = nil + c.mutex.Unlock() + }() + + // called when a RTP packet arrives + rc.OnPacketRTPAny(func(medi *media.Media, forma format.Format, pkt *rtp.Packet) { + // route incoming packets to the server stream + stream.WritePacketRTP(medi, pkt) + }) + + // start playing + _, err = rc.Play(nil) + if err != nil { + return err + } + + // wait until a fatal error + return rc.Wait() +} diff --git a/examples/proxy/main.go b/examples/proxy/main.go new file mode 100644 index 00000000..2efda710 --- /dev/null +++ b/examples/proxy/main.go @@ -0,0 +1,14 @@ +package main + +// This example shows how to +// 1. read an existing stream from an external server or camera, with a client +// 2. create a server that allow to proxy that stream + +func main() { + // allocate the client + c := newClient() + + // allocate the server. + // give server access to the method client.getStream(). + newServer(c.getStream) +} diff --git a/examples/proxy/server.go b/examples/proxy/server.go new file mode 100644 index 00000000..52e7491b --- /dev/null +++ b/examples/proxy/server.go @@ -0,0 +1,100 @@ +package main + +import ( + "log" + + "github.com/aler9/gortsplib/v2" + "github.com/aler9/gortsplib/v2/pkg/base" +) + +type server struct { + getStream func() *gortsplib.ServerStream +} + +func newServer( + getStream func() *gortsplib.ServerStream, +) *server { + s := &server{ + getStream: getStream, + } + + // configure the server + rs := &gortsplib.Server{ + Handler: s, + RTSPAddress: ":8554", + UDPRTPAddress: ":8000", + UDPRTCPAddress: ":8001", + MulticastIPRange: "224.1.0.0/16", + MulticastRTPPort: 8002, + MulticastRTCPPort: 8003, + } + + // start server and wait until a fatal error + log.Printf("server is ready") + panic(rs.StartAndWait()) +} + +// called when a connection is opened. +func (s *server) OnConnOpen(ctx *gortsplib.ServerHandlerOnConnOpenCtx) { + log.Printf("conn opened") +} + +// called when a connection is closed. +func (s *server) OnConnClose(ctx *gortsplib.ServerHandlerOnConnCloseCtx) { + log.Printf("conn closed (%v)", ctx.Error) +} + +// called when a session is opened. +func (s *server) OnSessionOpen(ctx *gortsplib.ServerHandlerOnSessionOpenCtx) { + log.Printf("session opened") +} + +// called when a session is closed. +func (s *server) OnSessionClose(ctx *gortsplib.ServerHandlerOnSessionCloseCtx) { + log.Printf("session closed") +} + +// called when receiving a DESCRIBE request. +func (s *server) OnDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx) (*base.Response, *gortsplib.ServerStream, error) { + log.Printf("describe request") + + stream := s.getStream() + + // stream is not available yet + if stream == nil { + return &base.Response{ + StatusCode: base.StatusNotFound, + }, nil, nil + } + + return &base.Response{ + StatusCode: base.StatusOK, + }, stream, nil +} + +// called when receiving a SETUP request. +func (s *server) OnSetup(ctx *gortsplib.ServerHandlerOnSetupCtx) (*base.Response, *gortsplib.ServerStream, error) { + log.Printf("setup request") + + stream := s.getStream() + + // stream is not available yet + if stream == nil { + return &base.Response{ + StatusCode: base.StatusNotFound, + }, nil, nil + } + + return &base.Response{ + StatusCode: base.StatusOK, + }, stream, nil +} + +// called when receiving a PLAY request. +func (s *server) OnPlay(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Response, error) { + log.Printf("play request") + + return &base.Response{ + StatusCode: base.StatusOK, + }, nil +} diff --git a/examples/server-h264-save-to-disk/main.go b/examples/server-h264-save-to-disk/main.go index 348aa10d..2d12c47c 100644 --- a/examples/server-h264-save-to-disk/main.go +++ b/examples/server-h264-save-to-disk/main.go @@ -127,7 +127,7 @@ func (sh *serverHandler) OnRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*bas } func main() { - // configure server + // configure the server s := &gortsplib.Server{ Handler: &serverHandler{}, RTSPAddress: ":8554", diff --git a/examples/server-tls/main.go b/examples/server-tls/main.go index 7f7aa034..3562477a 100644 --- a/examples/server-tls/main.go +++ b/examples/server-tls/main.go @@ -145,7 +145,7 @@ func main() { panic(err) } - // configure server + // configure the server s := &gortsplib.Server{ Handler: &serverHandler{}, TLSConfig: &tls.Config{Certificates: []tls.Certificate{cert}}, diff --git a/examples/server/main.go b/examples/server/main.go index ef66109a..e838ea0e 100644 --- a/examples/server/main.go +++ b/examples/server/main.go @@ -136,7 +136,7 @@ func (sh *serverHandler) OnRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*bas } func main() { - // configure server + // configure the server s := &gortsplib.Server{ Handler: &serverHandler{}, RTSPAddress: ":8554", diff --git a/server_multicast_handler.go b/server_multicast_writer.go similarity index 78% rename from server_multicast_handler.go rename to server_multicast_writer.go index 4f460158..876581be 100644 --- a/server_multicast_handler.go +++ b/server_multicast_writer.go @@ -12,7 +12,7 @@ type typeAndPayload struct { payload []byte } -type serverMulticastHandler struct { +type serverMulticastWriter struct { rtpl *serverUDPListener rtcpl *serverUDPListener writeBuffer *ringbuffer.RingBuffer @@ -20,7 +20,7 @@ type serverMulticastHandler struct { writerDone chan struct{} } -func newServerMulticastHandler(s *Server) (*serverMulticastHandler, error) { +func newServerMulticastWriter(s *Server) (*serverMulticastWriter, error) { res := make(chan net.IP) select { case s.streamMulticastIP <- streamMulticastIPReq{res: res}: @@ -42,7 +42,7 @@ func newServerMulticastHandler(s *Server) (*serverMulticastHandler, error) { wb, _ := ringbuffer.New(uint64(s.WriteBufferCount)) - h := &serverMulticastHandler{ + h := &serverMulticastWriter{ rtpl: rtpl, rtcpl: rtcpl, writeBuffer: wb, @@ -54,18 +54,18 @@ func newServerMulticastHandler(s *Server) (*serverMulticastHandler, error) { return h, nil } -func (h *serverMulticastHandler) close() { +func (h *serverMulticastWriter) close() { h.rtpl.close() h.rtcpl.close() h.writeBuffer.Close() <-h.writerDone } -func (h *serverMulticastHandler) ip() net.IP { +func (h *serverMulticastWriter) ip() net.IP { return h.rtpl.ip() } -func (h *serverMulticastHandler) runWriter() { +func (h *serverMulticastWriter) runWriter() { defer close(h.writerDone) rtpAddr := &net.UDPAddr{ @@ -93,14 +93,14 @@ func (h *serverMulticastHandler) runWriter() { } } -func (h *serverMulticastHandler) writePacketRTP(payload []byte) { +func (h *serverMulticastWriter) writePacketRTP(payload []byte) { h.writeBuffer.Push(typeAndPayload{ isRTP: true, payload: payload, }) } -func (h *serverMulticastHandler) writePacketRTCP(payload []byte) { +func (h *serverMulticastWriter) writePacketRTCP(payload []byte) { h.writeBuffer.Push(typeAndPayload{ isRTP: false, payload: payload, diff --git a/server_session.go b/server_session.go index 81cdf68c..0885f55b 100644 --- a/server_session.go +++ b/server_session.go @@ -755,13 +755,6 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base sm := newServerSessionMedia(ss, medi) - if ss.state == ServerSessionStatePreRecord { - sm.formats = make(map[uint8]*serverSessionFormat) - for _, forma := range sm.media.Formats { - sm.formats[forma.PayloadType()] = newServerSessionFormat(sm, forma) - } - } - switch transport { case TransportUDP: sm.udpRTPReadPort = inTH.ClientPorts[0] @@ -791,7 +784,7 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base th.Delivery = &de v := uint(127) th.TTL = &v - d := stream.streamMedias[medi].multicastHandler.ip() + d := stream.streamMedias[medi].multicastWriter.ip() th.Destination = &d th.Ports = &[2]int{ss.s.MulticastRTPPort, ss.s.MulticastRTCPPort} diff --git a/server_session_format.go b/server_session_format.go index 90ea4465..eb6790ae 100644 --- a/server_session_format.go +++ b/server_session_format.go @@ -4,6 +4,7 @@ import ( "fmt" "time" + "github.com/pion/rtcp" "github.com/pion/rtp" "github.com/aler9/gortsplib/v2/pkg/format" @@ -27,19 +28,40 @@ func newServerSessionFormat(sm *serverSessionMedia, forma format.Format) *server } } -func (st *serverSessionFormat) readRTPUDP(pkt *rtp.Packet, now time.Time) { - packets, missing := st.udpReorderer.Process(pkt) +func (sf *serverSessionFormat) start() { + if (*sf.sm.ss.setuppedTransport == TransportUDP || *sf.sm.ss.setuppedTransport == TransportUDPMulticast) && + sf.sm.ss.state != ServerSessionStatePlay { + sf.udpReorderer = rtpreorderer.New() + sf.udpRTCPReceiver = rtcpreceiver.New( + sf.sm.ss.s.udpReceiverReportPeriod, + nil, + sf.format.ClockRate(), + func(pkt rtcp.Packet) { + sf.sm.ss.WritePacketRTCP(sf.sm.media, pkt) + }) + } +} + +func (sf *serverSessionFormat) stop() { + if sf.udpRTCPReceiver != nil { + sf.udpRTCPReceiver.Close() + sf.udpRTCPReceiver = nil + } +} + +func (sf *serverSessionFormat) readRTPUDP(pkt *rtp.Packet, now time.Time) { + packets, missing := sf.udpReorderer.Process(pkt) if missing != 0 { - onDecodeError(st.sm.ss, fmt.Errorf("%d RTP packet(s) lost", missing)) + onDecodeError(sf.sm.ss, fmt.Errorf("%d RTP packet(s) lost", missing)) // do not return } for _, pkt := range packets { - st.udpRTCPReceiver.ProcessPacket(pkt, now, st.format.PTSEqualsDTS(pkt)) - st.onPacketRTP(pkt) + sf.udpRTCPReceiver.ProcessPacket(pkt, now, sf.format.PTSEqualsDTS(pkt)) + sf.onPacketRTP(pkt) } } -func (st *serverSessionFormat) readRTPTCP(pkt *rtp.Packet) { - st.onPacketRTP(pkt) +func (sf *serverSessionFormat) readRTPTCP(pkt *rtp.Packet) { + sf.onPacketRTP(pkt) } diff --git a/server_session_media.go b/server_session_media.go index e44ce85b..86ada2fc 100644 --- a/server_session_media.go +++ b/server_session_media.go @@ -11,8 +11,6 @@ import ( "github.com/aler9/gortsplib/v2/pkg/base" "github.com/aler9/gortsplib/v2/pkg/media" - "github.com/aler9/gortsplib/v2/pkg/rtcpreceiver" - "github.com/aler9/gortsplib/v2/pkg/rtpreorderer" ) type serverSessionMedia struct { @@ -26,7 +24,7 @@ type serverSessionMedia struct { tcpRTPFrame *base.InterleavedFrame tcpRTCPFrame *base.InterleavedFrame tcpBuffer []byte - formats map[uint8]*serverSessionFormat // record + formats map[uint8]*serverSessionFormat // record only writePacketRTPInQueue func([]byte) writePacketRTCPInQueue func([]byte) readRTP func([]byte) error @@ -35,11 +33,20 @@ type serverSessionMedia struct { } func newServerSessionMedia(ss *ServerSession, medi *media.Media) *serverSessionMedia { - return &serverSessionMedia{ + sm := &serverSessionMedia{ ss: ss, media: medi, onPacketRTCP: func(rtcp.Packet) {}, } + + if ss.state == ServerSessionStatePreRecord { + sm.formats = make(map[uint8]*serverSessionFormat) + for _, forma := range medi.Formats { + sm.formats[forma.PayloadType()] = newServerSessionFormat(sm, forma) + } + } + + return sm } func (sm *serverSessionMedia) start() { @@ -53,19 +60,6 @@ func (sm *serverSessionMedia) start() { } else { sm.readRTP = sm.readRTPUDPRecord sm.readRTCP = sm.readRTCPUDPRecord - - for _, tr := range sm.formats { - tr.udpReorderer = rtpreorderer.New() - - cmedia := sm.media - tr.udpRTCPReceiver = rtcpreceiver.New( - sm.ss.s.udpReceiverReportPeriod, - nil, - tr.format.ClockRate(), - func(pkt rtcp.Packet) { - sm.ss.WritePacketRTCP(cmedia, pkt) - }) - } } case TransportTCP: @@ -87,7 +81,7 @@ func (sm *serverSessionMedia) start() { if *sm.ss.setuppedTransport == TransportUDP { if sm.ss.state == ServerSessionStatePlay { - // firewall opening is performed by RTCP sender reports generated by ServerStream + // firewall opening is performed with RTCP sender reports generated by ServerStream // readers can send RTCP packets only sm.ss.s.udpRTCPListener.addClient(sm.ss.author.ip(), sm.udpRTCPReadPort, sm) @@ -100,6 +94,10 @@ func (sm *serverSessionMedia) start() { sm.ss.s.udpRTCPListener.addClient(sm.ss.author.ip(), sm.udpRTCPReadPort, sm) } } + + for _, sf := range sm.formats { + sf.start() + } } func (sm *serverSessionMedia) stop() { @@ -108,11 +106,8 @@ func (sm *serverSessionMedia) stop() { sm.ss.s.udpRTCPListener.removeClient(sm) } - for _, tr := range sm.formats { - if tr.udpRTCPReceiver != nil { - tr.udpRTCPReceiver.Close() - tr.udpRTCPReceiver = nil - } + for _, sf := range sm.formats { + sf.stop() } } diff --git a/server_stream.go b/server_stream.go index b6721f94..e5cc2a21 100644 --- a/server_stream.go +++ b/server_stream.go @@ -205,9 +205,9 @@ func (st *ServerStream) readerRemove(ss *ServerSession) { if len(st.readers) == 0 { for _, media := range st.streamMedias { - if media.multicastHandler != nil { - media.multicastHandler.close() - media.multicastHandler = nil + if media.multicastWriter != nil { + media.multicastWriter.close() + media.multicastWriter = nil } } } @@ -224,8 +224,8 @@ func (st *ServerStream) readerSetActive(ss *ServerSession) { if *ss.setuppedTransport == TransportUDPMulticast { for medi, sm := range ss.setuppedMedias { streamMedia := st.streamMedias[medi] - streamMedia.multicastHandler.rtcpl.addClient( - ss.author.ip(), streamMedia.multicastHandler.rtcpl.port(), sm) + streamMedia.multicastWriter.rtcpl.addClient( + ss.author.ip(), streamMedia.multicastWriter.rtcpl.port(), sm) } } else { st.activeUnicastReaders[ss] = struct{}{} @@ -243,7 +243,7 @@ func (st *ServerStream) readerSetInactive(ss *ServerSession) { if *ss.setuppedTransport == TransportUDPMulticast { for medi, sm := range ss.setuppedMedias { streamMedia := st.streamMedias[medi] - streamMedia.multicastHandler.rtcpl.removeClient(sm) + streamMedia.multicastWriter.rtcpl.removeClient(sm) } } else { delete(st.activeUnicastReaders, ss) diff --git a/server_stream_media.go b/server_stream_media.go index fb651609..2bbc935b 100644 --- a/server_stream_media.go +++ b/server_stream_media.go @@ -12,10 +12,10 @@ import ( ) type serverStreamMedia struct { - uuid uuid.UUID - media *media.Media - formats map[uint8]*serverStreamFormat - multicastHandler *serverMulticastHandler + uuid uuid.UUID + media *media.Media + formats map[uint8]*serverStreamFormat + multicastWriter *serverMulticastWriter } func newServerStreamMedia(st *ServerStream, medi *media.Media) *serverStreamMedia { @@ -51,19 +51,19 @@ func (sm *serverStreamMedia) close() { } } - if sm.multicastHandler != nil { - sm.multicastHandler.close() + if sm.multicastWriter != nil { + sm.multicastWriter.close() } } func (sm *serverStreamMedia) allocateMulticastHandler(s *Server) error { - if sm.multicastHandler == nil { - mh, err := newServerMulticastHandler(s) + if sm.multicastWriter == nil { + mh, err := newServerMulticastWriter(s) if err != nil { return err } - sm.multicastHandler = mh + sm.multicastWriter = mh } return nil } @@ -89,8 +89,8 @@ func (sm *serverStreamMedia) WritePacketRTPWithNTP(ss *ServerStream, pkt *rtp.Pa } // send multicast - if sm.multicastHandler != nil { - sm.multicastHandler.writePacketRTP(byts) + if sm.multicastWriter != nil { + sm.multicastWriter.writePacketRTP(byts) } } @@ -109,7 +109,7 @@ func (sm *serverStreamMedia) writePacketRTCP(ss *ServerStream, pkt rtcp.Packet) } // send multicast - if sm.multicastHandler != nil { - sm.multicastHandler.writePacketRTCP(byts) + if sm.multicastWriter != nil { + sm.multicastWriter.writePacketRTCP(byts) } }