add server-h264-from-disk example (#719) (#727)

This commit is contained in:
Alessandro Ros
2025-03-23 16:53:54 +01:00
committed by GitHub
parent 8c6495c33b
commit e46ae59920
10 changed files with 278 additions and 35 deletions

View File

@@ -96,6 +96,7 @@ Features:
* [server-tls](examples/server-tls/main.go)
* [server-auth](examples/server-auth/main.go)
* [server-h264-to-disk](examples/server-h264-to-disk/main.go)
* [server-h264-from-disk](examples/server-h264-from-disk/main.go)
* [proxy](examples/proxy/main.go)
## API Documentation

View File

@@ -137,7 +137,7 @@ func main() {
return nil
})
// start reading the MPEG-TS file
// read the MPEG-TS file
for {
err := r.Read()
if err != nil {

View File

@@ -17,7 +17,7 @@ const (
)
type client struct {
s *server
server *server
}
func (c *client) initialize() {
@@ -62,8 +62,8 @@ func (c *client) read() error {
return err
}
stream := c.s.setStreamReady(desc)
defer c.s.setStreamUnready()
stream := c.server.setStreamReady(desc)
defer c.server.setStreamUnready()
log.Printf("stream is ready and can be read from the server at rtsp://localhost:8554/stream\n")

View File

@@ -14,10 +14,10 @@ func main() {
// allocate the client.
// give client access to the server.
c := &client{s: s}
c := &client{server: s}
c.initialize()
// start server and wait until a fatal error
log.Printf("server is ready")
s.s.StartAndWait()
log.Printf("server is ready on %s", s.server.RTSPAddress)
s.server.StartAndWait()
}

View File

@@ -10,14 +10,14 @@ import (
)
type server struct {
s *gortsplib.Server
server *gortsplib.Server
mutex sync.RWMutex
stream *gortsplib.ServerStream
}
func (s *server) initialize() {
// configure the server
s.s = &gortsplib.Server{
s.server = &gortsplib.Server{
Handler: s,
RTSPAddress: ":8554",
UDPRTPAddress: ":8000",
@@ -99,7 +99,7 @@ func (s *server) setStreamReady(desc *description.Session) *gortsplib.ServerStre
s.mutex.Lock()
defer s.mutex.Unlock()
s.stream = &gortsplib.ServerStream{
Server: s.s,
Server: s.server,
Desc: desc,
}
err := s.stream.Initialize()

View File

@@ -29,7 +29,7 @@ const (
)
type serverHandler struct {
s *gortsplib.Server
server *gortsplib.Server
mutex sync.RWMutex
stream *gortsplib.ServerStream
publisher *gortsplib.ServerSession
@@ -54,8 +54,8 @@ func (sh *serverHandler) OnSessionOpen(ctx *gortsplib.ServerHandlerOnSessionOpen
func (sh *serverHandler) OnSessionClose(ctx *gortsplib.ServerHandlerOnSessionCloseCtx) {
log.Printf("session closed")
sh.mutex.Lock()
defer sh.mutex.Unlock()
sh.mutex.RLock()
defer sh.mutex.RUnlock()
// if the session is the publisher,
// close the stream and disconnect any reader.
@@ -118,7 +118,7 @@ func (sh *serverHandler) OnAnnounce(ctx *gortsplib.ServerHandlerOnAnnounceCtx) (
// create the stream and save the publisher
sh.stream = &gortsplib.ServerStream{
Server: sh.s,
Server: sh.server,
Desc: ctx.Description,
}
err := sh.stream.Initialize()
@@ -194,7 +194,7 @@ func (sh *serverHandler) OnRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*bas
func main() {
// configure the server
h := &serverHandler{}
h.s = &gortsplib.Server{
h.server = &gortsplib.Server{
Handler: h,
RTSPAddress: ":8554",
UDPRTPAddress: ":8000",
@@ -205,6 +205,6 @@ func main() {
}
// start server and wait until a fatal error
log.Printf("server is ready")
panic(h.s.StartAndWait())
log.Printf("server is ready on %s", h.server.RTSPAddress)
panic(h.server.StartAndWait())
}

View File

@@ -0,0 +1,242 @@
package main
import (
"crypto/rand"
"fmt"
"log"
"os"
"sync"
"time"
"github.com/bluenviron/gortsplib/v4"
"github.com/bluenviron/gortsplib/v4/pkg/base"
"github.com/bluenviron/gortsplib/v4/pkg/description"
"github.com/bluenviron/gortsplib/v4/pkg/format"
"github.com/bluenviron/mediacommon/v2/pkg/formats/mpegts"
)
// This example shows how to
// 1. create a RTSP server which accepts plain connections
// 2. read from disk a MPEG-TS file which contains a H264 track
// 3. serve the content of the file to connected readers
func findTrack(r *mpegts.Reader) (*mpegts.Track, error) {
for _, track := range r.Tracks() {
if _, ok := track.Codec.(*mpegts.CodecH264); ok {
return track, nil
}
}
return nil, fmt.Errorf("H264 track not found")
}
func randUint32() (uint32, error) {
var b [4]byte
_, err := rand.Read(b[:])
if err != nil {
return 0, err
}
return uint32(b[0])<<24 | uint32(b[1])<<16 | uint32(b[2])<<8 | uint32(b[3]), nil
}
type serverHandler struct {
server *gortsplib.Server
stream *gortsplib.ServerStream
mutex sync.RWMutex
}
// called when a connection is opened.
func (sh *serverHandler) OnConnOpen(ctx *gortsplib.ServerHandlerOnConnOpenCtx) {
log.Printf("conn opened")
}
// called when a connection is closed.
func (sh *serverHandler) OnConnClose(ctx *gortsplib.ServerHandlerOnConnCloseCtx) {
log.Printf("conn closed (%v)", ctx.Error)
}
// called when a session is opened.
func (sh *serverHandler) OnSessionOpen(ctx *gortsplib.ServerHandlerOnSessionOpenCtx) {
log.Printf("session opened")
}
// called when a session is closed.
func (sh *serverHandler) OnSessionClose(ctx *gortsplib.ServerHandlerOnSessionCloseCtx) {
log.Printf("session closed")
}
// called when receiving a DESCRIBE request.
func (sh *serverHandler) OnDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx) (*base.Response, *gortsplib.ServerStream, error) {
log.Printf("describe request")
sh.mutex.RLock()
defer sh.mutex.RUnlock()
return &base.Response{
StatusCode: base.StatusOK,
}, sh.stream, nil
}
// called when receiving a SETUP request.
func (sh *serverHandler) OnSetup(ctx *gortsplib.ServerHandlerOnSetupCtx) (*base.Response, *gortsplib.ServerStream, error) {
log.Printf("setup request")
sh.mutex.RLock()
defer sh.mutex.RUnlock()
return &base.Response{
StatusCode: base.StatusOK,
}, sh.stream, nil
}
// called when receiving a PLAY request.
func (sh *serverHandler) OnPlay(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Response, error) {
log.Printf("play request")
return &base.Response{
StatusCode: base.StatusOK,
}, nil
}
func readVideoFile(stream *gortsplib.ServerStream, media *description.Media, forma *format.H264) {
// open a file in MPEG-TS format
f, err := os.Open("myvideo.ts")
if err != nil {
panic(err)
}
defer f.Close()
// setup MPEG-TS parser
r := &mpegts.Reader{R: f}
err = r.Initialize()
if err != nil {
panic(err)
}
// find the H264 track inside the file
track, err := findTrack(r)
if err != nil {
panic(err)
}
// setup H264 -> RTP encoder
rtpEnc, err := forma.CreateEncoder()
if err != nil {
panic(err)
}
randomStart, err := randUint32()
if err != nil {
panic(err)
}
timeDecoder := mpegts.TimeDecoder{}
timeDecoder.Initialize()
var firstDTS *int64
var startTime time.Time
// setup a callback that is called whenever a H264 access unit is read from the file
r.OnDataH264(track, func(pts, dts int64, au [][]byte) error {
dts = timeDecoder.Decode(dts)
pts = timeDecoder.Decode(pts)
// sleep between access units
if firstDTS != nil {
timeDrift := time.Duration(dts-*firstDTS)*time.Second/90000 - time.Since(startTime)
if timeDrift > 0 {
time.Sleep(timeDrift)
}
} else {
startTime = time.Now()
firstDTS = &dts
}
log.Printf("writing access unit with pts=%d dts=%d", pts, dts)
// wrap the access unit into RTP packets
packets, err := rtpEnc.Encode(au)
if err != nil {
return err
}
// set packet timestamp
// we don't have to perform any conversion
// since H264 clock rate is the same in both MPEG-TS and RTSP
for _, packet := range packets {
packet.Timestamp = uint32(int64(randomStart) + pts)
}
// write RTP packets to the server
for _, packet := range packets {
err := stream.WritePacketRTP(media, packet)
if err != nil {
return err
}
}
return nil
})
for {
err := r.Read()
if err != nil {
panic(err)
}
}
}
func main() {
h := &serverHandler{}
// prevent clients from connecting to the server until the stream is properly set up
h.mutex.Lock()
// create the server
h.server = &gortsplib.Server{
Handler: h,
RTSPAddress: ":8554",
UDPRTPAddress: ":8000",
UDPRTCPAddress: ":8001",
MulticastIPRange: "224.1.0.0/16",
MulticastRTPPort: 8002,
MulticastRTCPPort: 8003,
}
// start the server
err := h.server.Start()
if err != nil {
panic(err)
}
// create a RTSP description that contains a H264 format
forma := &format.H264{
PayloadTyp: 96,
PacketizationMode: 1,
}
desc := &description.Session{
Medias: []*description.Media{{
Type: description.MediaTypeVideo,
Formats: []format.Format{forma},
}},
}
// create a server stream
h.stream = &gortsplib.ServerStream{
Server: h.server,
Desc: desc,
}
err = h.stream.Initialize()
if err != nil {
panic(err)
}
// in a separate routine, read the MPEG-TS file
go readVideoFile(h.stream, desc.Medias[0], forma)
// allow clients to connect
h.mutex.Unlock()
// wait until a fatal error
log.Printf("server is ready on %s", h.server.RTSPAddress)
panic(h.server.Wait())
}

View File

@@ -20,7 +20,7 @@ import (
// 3. save the content of the H264 media in a file in MPEG-TS format
type serverHandler struct {
s *gortsplib.Server
server *gortsplib.Server
mutex sync.Mutex
publisher *gortsplib.ServerSession
media *description.Media
@@ -151,7 +151,7 @@ func (sh *serverHandler) OnRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*bas
func main() {
// configure the server
h := &serverHandler{}
h.s = &gortsplib.Server{
h.server = &gortsplib.Server{
Handler: h,
RTSPAddress: ":8554",
UDPRTPAddress: ":8000",
@@ -162,6 +162,6 @@ func main() {
}
// start server and wait until a fatal error
log.Printf("server is ready")
panic(h.s.StartAndWait())
log.Printf("server is ready on %s", h.server.RTSPAddress)
panic(h.server.StartAndWait())
}

View File

@@ -19,7 +19,7 @@ import (
// 3. allow multiple clients to read that stream with TCP
type serverHandler struct {
s *gortsplib.Server
server *gortsplib.Server
mutex sync.RWMutex
stream *gortsplib.ServerStream
publisher *gortsplib.ServerSession
@@ -44,8 +44,8 @@ func (sh *serverHandler) OnSessionOpen(ctx *gortsplib.ServerHandlerOnSessionOpen
func (sh *serverHandler) OnSessionClose(ctx *gortsplib.ServerHandlerOnSessionCloseCtx) {
log.Printf("session closed")
sh.mutex.Lock()
defer sh.mutex.Unlock()
sh.mutex.RLock()
defer sh.mutex.RUnlock()
// if the session is the publisher,
// close the stream and disconnect any reader.
@@ -79,8 +79,8 @@ func (sh *serverHandler) OnDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx) (
func (sh *serverHandler) OnAnnounce(ctx *gortsplib.ServerHandlerOnAnnounceCtx) (*base.Response, error) {
log.Printf("announce request")
sh.mutex.Lock()
defer sh.mutex.Unlock()
sh.mutex.RLock()
defer sh.mutex.RUnlock()
// disconnect existing publisher
if sh.stream != nil {
@@ -90,7 +90,7 @@ func (sh *serverHandler) OnAnnounce(ctx *gortsplib.ServerHandlerOnAnnounceCtx) (
// create the stream and save the publisher
sh.stream = &gortsplib.ServerStream{
Server: sh.s,
Server: sh.server,
Desc: ctx.Description,
}
err := sh.stream.Initialize()
@@ -165,13 +165,13 @@ func main() {
// configure the server
h := &serverHandler{}
h.s = &gortsplib.Server{
h.server = &gortsplib.Server{
Handler: h,
TLSConfig: &tls.Config{Certificates: []tls.Certificate{cert}},
RTSPAddress: ":8322",
}
// start server and wait until a fatal error
log.Printf("server is ready")
panic(h.s.StartAndWait())
log.Printf("server is ready on %s", h.server.RTSPAddress)
panic(h.server.StartAndWait())
}

View File

@@ -18,7 +18,7 @@ import (
// 3. allow multiple clients to read that stream with TCP, UDP or UDP-multicast
type serverHandler struct {
s *gortsplib.Server
server *gortsplib.Server
mutex sync.RWMutex
stream *gortsplib.ServerStream
publisher *gortsplib.ServerSession
@@ -89,7 +89,7 @@ func (sh *serverHandler) OnAnnounce(ctx *gortsplib.ServerHandlerOnAnnounceCtx) (
// create the stream and save the publisher
sh.stream = &gortsplib.ServerStream{
Server: sh.s,
Server: sh.server,
Desc: ctx.Description,
}
err := sh.stream.Initialize()
@@ -156,7 +156,7 @@ func (sh *serverHandler) OnRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*bas
func main() {
// configure the server
h := &serverHandler{}
h.s = &gortsplib.Server{
h.server = &gortsplib.Server{
Handler: h,
RTSPAddress: ":8554",
UDPRTPAddress: ":8000",
@@ -167,6 +167,6 @@ func main() {
}
// start server and wait until a fatal error
log.Printf("server is ready")
panic(h.s.StartAndWait())
log.Printf("server is ready on %s", h.server.RTSPAddress)
panic(h.server.StartAndWait())
}