diff --git a/examples/client-record-format-h264-from-disk/main.go b/examples/client-record-format-h264-from-disk/main.go index 38a45452..477ff2a2 100644 --- a/examples/client-record-format-h264-from-disk/main.go +++ b/examples/client-record-format-h264-from-disk/main.go @@ -2,11 +2,14 @@ package main import ( "crypto/rand" + "errors" "fmt" + "io" "log" "os" "time" + "github.com/asticode/go-astits" "github.com/bluenviron/gortsplib/v4" "github.com/bluenviron/gortsplib/v4/pkg/description" "github.com/bluenviron/gortsplib/v4/pkg/format" @@ -38,26 +41,6 @@ func randUint32() (uint32, error) { } func main() { - // open a file in MPEG-TS format - f, err := os.Open("myvideo.ts") - if err != nil { - panic(err) - } - defer f.Close() - - // setup MPEG-TS parser - r := &mpegts.Reader{R: f} - err = r.Initialize() - if err != nil { - panic(err) - } - - // find the H264 track inside the file - track, err := findTrack(r) - if err != nil { - panic(err) - } - // create a RTSP description that contains a H264 format forma := &format.H264{ PayloadTyp: 96, @@ -72,12 +55,19 @@ func main() { // connect to the server, announce the format and start recording c := gortsplib.Client{} - err = c.StartRecording("rtsp://myuser:mypass@localhost:8554/mystream", desc) + err := c.StartRecording("rtsp://myuser:mypass@localhost:8554/mystream", desc) if err != nil { panic(err) } defer c.Close() + // open a file in MPEG-TS format + f, err := os.Open("myvideo.ts") + if err != nil { + panic(err) + } + defer f.Close() + // setup H264 -> RTP encoder rtpEnc, err := forma.CreateEncoder() if err != nil { @@ -89,59 +79,92 @@ func main() { panic(err) } - timeDecoder := mpegts.TimeDecoder{} - timeDecoder.Initialize() - - var firstDTS *int64 - var startTime time.Time - - // setup a callback that is called whenever a H264 access unit is read from the file - r.OnDataH264(track, func(pts, dts int64, au [][]byte) error { - dts = timeDecoder.Decode(dts) - pts = timeDecoder.Decode(pts) - - // sleep between access units - if firstDTS != nil { - timeDrift := time.Duration(dts-*firstDTS)*time.Second/90000 - time.Since(startTime) - if timeDrift > 0 { - time.Sleep(timeDrift) - } - } else { - startTime = time.Now() - firstDTS = &dts - } - - log.Printf("writing access unit with pts=%d dts=%d", pts, dts) - - // wrap the access unit into RTP packets - packets, err := rtpEnc.Encode(au) - if err != nil { - return err - } - - // set packet timestamp - // we don't have to perform any conversion - // since H264 clock rate is the same in both MPEG-TS and RTSP - for _, packet := range packets { - packet.Timestamp = uint32(int64(randomStart) + pts) - } - - // write RTP packets to the server - for _, packet := range packets { - err := c.WritePacketRTP(desc.Medias[0], packet) - if err != nil { - return err - } - } - - return nil - }) - - // read the MPEG-TS file for { - err := r.Read() + // setup MPEG-TS parser + r := &mpegts.Reader{R: f} + err = r.Initialize() if err != nil { panic(err) } + + // find the H264 track inside the file + track, err := findTrack(r) + if err != nil { + panic(err) + } + + timeDecoder := mpegts.TimeDecoder{} + timeDecoder.Initialize() + + var firstDTS *int64 + var startTime time.Time + var lastRTPTime uint32 + + // setup a callback that is called when a H264 access unit is read from the file + r.OnDataH264(track, func(pts, dts int64, au [][]byte) error { + dts = timeDecoder.Decode(dts) + pts = timeDecoder.Decode(pts) + + // sleep between access units + if firstDTS != nil { + timeDrift := time.Duration(dts-*firstDTS)*time.Second/90000 - time.Since(startTime) + if timeDrift > 0 { + time.Sleep(timeDrift) + } + } else { + startTime = time.Now() + firstDTS = &dts + } + + log.Printf("writing access unit with pts=%d dts=%d", pts, dts) + + // wrap the access unit into RTP packets + packets, err := rtpEnc.Encode(au) + if err != nil { + return err + } + + // set packet timestamp + // we don't have to perform any conversion + // since H264 clock rate is the same in both MPEG-TS and RTSP + lastRTPTime = uint32(int64(randomStart) + pts) + for _, packet := range packets { + packet.Timestamp = lastRTPTime + } + + // write RTP packets to the server + for _, packet := range packets { + err := c.WritePacketRTP(desc.Medias[0], packet) + if err != nil { + return err + } + } + + return nil + }) + + // read the file + for { + err := r.Read() + if err != nil { + // file has ended + if errors.Is(err, astits.ErrNoMorePackets) { + log.Printf("file has ended, rewinding") + + // rewind to start position + _, err = f.Seek(0, io.SeekStart) + if err != nil { + panic(err) + } + + // keep current timestamp + randomStart = lastRTPTime + 1 + + break + } + + panic(err) + } + } } } diff --git a/examples/proxy/server.go b/examples/proxy/server.go index 432d0c96..52980065 100644 --- a/examples/proxy/server.go +++ b/examples/proxy/server.go @@ -50,7 +50,7 @@ func (s *server) OnSessionClose(ctx *gortsplib.ServerHandlerOnSessionCloseCtx) { // called when receiving a DESCRIBE request. func (s *server) OnDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx) (*base.Response, *gortsplib.ServerStream, error) { - log.Printf("describe request") + log.Printf("DESCRIBE request") s.mutex.RLock() defer s.mutex.RUnlock() @@ -69,7 +69,7 @@ func (s *server) OnDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx) (*base.Re // called when receiving a SETUP request. func (s *server) OnSetup(ctx *gortsplib.ServerHandlerOnSetupCtx) (*base.Response, *gortsplib.ServerStream, error) { - log.Printf("setup request") + log.Printf("SETUP request") s.mutex.RLock() defer s.mutex.RUnlock() @@ -88,7 +88,7 @@ func (s *server) OnSetup(ctx *gortsplib.ServerHandlerOnSetupCtx) (*base.Response // called when receiving a PLAY request. func (s *server) OnPlay(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Response, error) { - log.Printf("play request") + log.Printf("PLAY request") return &base.Response{ StatusCode: base.StatusOK, diff --git a/examples/server-auth/main.go b/examples/server-auth/main.go index 22c18e77..828c8990 100644 --- a/examples/server-auth/main.go +++ b/examples/server-auth/main.go @@ -67,7 +67,7 @@ func (sh *serverHandler) OnSessionClose(ctx *gortsplib.ServerHandlerOnSessionClo // called when receiving a DESCRIBE request. func (sh *serverHandler) OnDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx) (*base.Response, *gortsplib.ServerStream, error) { - log.Printf("describe request") + log.Printf("DESCRIBE request") // Verify reader credentials. // In case of readers, credentials have to be verified during DESCRIBE and SETUP. @@ -96,7 +96,7 @@ func (sh *serverHandler) OnDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx) ( // called when receiving an ANNOUNCE request. func (sh *serverHandler) OnAnnounce(ctx *gortsplib.ServerHandlerOnAnnounceCtx) (*base.Response, error) { - log.Printf("announce request") + log.Printf("ANNOUNCE request") // Verify publisher credentials. // In case of publishers, credentials have to be verified during ANNOUNCE. @@ -134,7 +134,7 @@ func (sh *serverHandler) OnAnnounce(ctx *gortsplib.ServerHandlerOnAnnounceCtx) ( // called when receiving a SETUP request. func (sh *serverHandler) OnSetup(ctx *gortsplib.ServerHandlerOnSetupCtx) (*base.Response, *gortsplib.ServerStream, error) { - log.Printf("setup request") + log.Printf("SETUP request") // SETUP is used by both readers and publishers. In case of publishers, just return StatusOK. if ctx.Session.State() == gortsplib.ServerSessionStatePreRecord { @@ -169,7 +169,7 @@ func (sh *serverHandler) OnSetup(ctx *gortsplib.ServerHandlerOnSetupCtx) (*base. // called when receiving a PLAY request. func (sh *serverHandler) OnPlay(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Response, error) { - log.Printf("play request") + log.Printf("PLAY request") return &base.Response{ StatusCode: base.StatusOK, @@ -178,7 +178,7 @@ func (sh *serverHandler) OnPlay(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Re // called when receiving a RECORD request. func (sh *serverHandler) OnRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*base.Response, error) { - log.Printf("record request") + log.Printf("RECORD request") // called when receiving a RTP packet ctx.Session.OnPacketRTPAny(func(medi *description.Media, forma format.Format, pkt *rtp.Packet) { diff --git a/examples/server-h264-from-disk/main.go b/examples/server-h264-from-disk/main.go index 8be126a1..b7096424 100644 --- a/examples/server-h264-from-disk/main.go +++ b/examples/server-h264-from-disk/main.go @@ -2,12 +2,15 @@ package main import ( "crypto/rand" + "errors" "fmt" + "io" "log" "os" "sync" "time" + "github.com/asticode/go-astits" "github.com/bluenviron/gortsplib/v4" "github.com/bluenviron/gortsplib/v4/pkg/base" "github.com/bluenviron/gortsplib/v4/pkg/description" @@ -38,6 +41,107 @@ func randUint32() (uint32, error) { return uint32(b[0])<<24 | uint32(b[1])<<16 | uint32(b[2])<<8 | uint32(b[3]), nil } +func routeFrames(f *os.File, stream *gortsplib.ServerStream) { + // setup H264 -> RTP encoder + rtpEnc, err := stream.Desc.Medias[0].Formats[0].(*format.H264).CreateEncoder() + if err != nil { + panic(err) + } + + randomStart, err := randUint32() + if err != nil { + panic(err) + } + + for { + // setup MPEG-TS parser + r := &mpegts.Reader{R: f} + err = r.Initialize() + if err != nil { + panic(err) + } + + // find the H264 track inside the file + track, err := findTrack(r) + if err != nil { + panic(err) + } + + timeDecoder := mpegts.TimeDecoder{} + timeDecoder.Initialize() + + var firstDTS *int64 + var firstTime time.Time + var lastRTPTime uint32 + + // setup a callback that is called when a H264 access unit is read from the file + r.OnDataH264(track, func(pts, dts int64, au [][]byte) error { + dts = timeDecoder.Decode(dts) + pts = timeDecoder.Decode(pts) + + // sleep between access units + if firstDTS != nil { + timeDrift := time.Duration(dts-*firstDTS)*time.Second/90000 - time.Since(firstTime) + if timeDrift > 0 { + time.Sleep(timeDrift) + } + } else { + firstTime = time.Now() + firstDTS = &dts + } + + log.Printf("writing access unit with pts=%d dts=%d", pts, dts) + + // wrap the access unit into RTP packets + packets, err := rtpEnc.Encode(au) + if err != nil { + return err + } + + // set packet timestamp + // we don't have to perform any conversion + // since H264 clock rate is the same in both MPEG-TS and RTSP + lastRTPTime = uint32(int64(randomStart) + pts) + for _, packet := range packets { + packet.Timestamp = lastRTPTime + } + + // write RTP packets to the server + for _, packet := range packets { + err := stream.WritePacketRTP(stream.Desc.Medias[0], packet) + if err != nil { + return err + } + } + + return nil + }) + + // read the file + for { + err := r.Read() + if err != nil { + // file has ended + if errors.Is(err, astits.ErrNoMorePackets) { + log.Printf("file has ended, rewinding") + + // rewind to start position + _, err = f.Seek(0, io.SeekStart) + if err != nil { + panic(err) + } + + // keep current timestamp + randomStart = lastRTPTime + 1 + + break + } + panic(err) + } + } + } +} + type serverHandler struct { server *gortsplib.Server stream *gortsplib.ServerStream @@ -66,7 +170,7 @@ func (sh *serverHandler) OnSessionClose(ctx *gortsplib.ServerHandlerOnSessionClo // called when receiving a DESCRIBE request. func (sh *serverHandler) OnDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx) (*base.Response, *gortsplib.ServerStream, error) { - log.Printf("describe request") + log.Printf("DESCRIBE request") sh.mutex.RLock() defer sh.mutex.RUnlock() @@ -78,7 +182,7 @@ func (sh *serverHandler) OnDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx) ( // called when receiving a SETUP request. func (sh *serverHandler) OnSetup(ctx *gortsplib.ServerHandlerOnSetupCtx) (*base.Response, *gortsplib.ServerStream, error) { - log.Printf("setup request") + log.Printf("SETUP request") sh.mutex.RLock() defer sh.mutex.RUnlock() @@ -90,101 +194,13 @@ func (sh *serverHandler) OnSetup(ctx *gortsplib.ServerHandlerOnSetupCtx) (*base. // called when receiving a PLAY request. func (sh *serverHandler) OnPlay(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Response, error) { - log.Printf("play request") + log.Printf("PLAY request") return &base.Response{ StatusCode: base.StatusOK, }, nil } -func readVideoFile(stream *gortsplib.ServerStream, media *description.Media, forma *format.H264) { - // open a file in MPEG-TS format - f, err := os.Open("myvideo.ts") - if err != nil { - panic(err) - } - defer f.Close() - - // setup MPEG-TS parser - r := &mpegts.Reader{R: f} - err = r.Initialize() - if err != nil { - panic(err) - } - - // find the H264 track inside the file - track, err := findTrack(r) - if err != nil { - panic(err) - } - - // setup H264 -> RTP encoder - rtpEnc, err := forma.CreateEncoder() - if err != nil { - panic(err) - } - - randomStart, err := randUint32() - if err != nil { - panic(err) - } - - timeDecoder := mpegts.TimeDecoder{} - timeDecoder.Initialize() - - var firstDTS *int64 - var startTime time.Time - - // setup a callback that is called whenever a H264 access unit is read from the file - r.OnDataH264(track, func(pts, dts int64, au [][]byte) error { - dts = timeDecoder.Decode(dts) - pts = timeDecoder.Decode(pts) - - // sleep between access units - if firstDTS != nil { - timeDrift := time.Duration(dts-*firstDTS)*time.Second/90000 - time.Since(startTime) - if timeDrift > 0 { - time.Sleep(timeDrift) - } - } else { - startTime = time.Now() - firstDTS = &dts - } - - log.Printf("writing access unit with pts=%d dts=%d", pts, dts) - - // wrap the access unit into RTP packets - packets, err := rtpEnc.Encode(au) - if err != nil { - return err - } - - // set packet timestamp - // we don't have to perform any conversion - // since H264 clock rate is the same in both MPEG-TS and RTSP - for _, packet := range packets { - packet.Timestamp = uint32(int64(randomStart) + pts) - } - - // write RTP packets to the server - for _, packet := range packets { - err := stream.WritePacketRTP(media, packet) - if err != nil { - return err - } - } - - return nil - }) - - for { - err := r.Read() - if err != nil { - panic(err) - } - } -} - func main() { h := &serverHandler{} @@ -207,16 +223,16 @@ func main() { if err != nil { panic(err) } + defer h.server.Close() // create a RTSP description that contains a H264 format - forma := &format.H264{ - PayloadTyp: 96, - PacketizationMode: 1, - } desc := &description.Session{ Medias: []*description.Media{{ - Type: description.MediaTypeVideo, - Formats: []format.Format{forma}, + Type: description.MediaTypeVideo, + Formats: []format.Format{&format.H264{ + PayloadTyp: 96, + PacketizationMode: 1, + }}, }}, } @@ -229,9 +245,17 @@ func main() { if err != nil { panic(err) } + defer h.stream.Close() - // in a separate routine, read the MPEG-TS file - go readVideoFile(h.stream, desc.Medias[0], forma) + // open a file in MPEG-TS format + f, err := os.Open("myvideo.ts") + if err != nil { + panic(err) + } + defer f.Close() + + // in a separate routine, route frames from file to ServerStream + go routeFrames(f, h.stream) // allow clients to connect h.mutex.Unlock() diff --git a/examples/server-h264-to-disk/main.go b/examples/server-h264-to-disk/main.go index 6d0b6f38..057b416c 100644 --- a/examples/server-h264-to-disk/main.go +++ b/examples/server-h264-to-disk/main.go @@ -57,7 +57,7 @@ func (sh *serverHandler) OnSessionClose(ctx *gortsplib.ServerHandlerOnSessionClo // called when receiving an ANNOUNCE request. func (sh *serverHandler) OnAnnounce(ctx *gortsplib.ServerHandlerOnAnnounceCtx) (*base.Response, error) { - log.Printf("announce request") + log.Printf("ANNOUNCE request") sh.mutex.Lock() defer sh.mutex.Unlock() @@ -115,7 +115,7 @@ func (sh *serverHandler) OnSetup(ctx *gortsplib.ServerHandlerOnSetupCtx) (*base. }, nil, nil } - log.Printf("setup request") + log.Printf("SETUP request") return &base.Response{ StatusCode: base.StatusOK, @@ -124,7 +124,7 @@ func (sh *serverHandler) OnSetup(ctx *gortsplib.ServerHandlerOnSetupCtx) (*base. // called when receiving a RECORD request. func (sh *serverHandler) OnRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*base.Response, error) { - log.Printf("record request") + log.Printf("RECORD request") // called when receiving a RTP packet ctx.Session.OnPacketRTP(sh.media, sh.format, func(pkt *rtp.Packet) { diff --git a/examples/server-tls/main.go b/examples/server-tls/main.go index 5a6bfc8e..dfafb214 100644 --- a/examples/server-tls/main.go +++ b/examples/server-tls/main.go @@ -57,7 +57,7 @@ func (sh *serverHandler) OnSessionClose(ctx *gortsplib.ServerHandlerOnSessionClo // called when receiving a DESCRIBE request. func (sh *serverHandler) OnDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx) (*base.Response, *gortsplib.ServerStream, error) { - log.Printf("describe request") + log.Printf("DESCRIBE request") sh.mutex.RLock() defer sh.mutex.RUnlock() @@ -77,7 +77,7 @@ func (sh *serverHandler) OnDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx) ( // called when receiving an ANNOUNCE request. func (sh *serverHandler) OnAnnounce(ctx *gortsplib.ServerHandlerOnAnnounceCtx) (*base.Response, error) { - log.Printf("announce request") + log.Printf("ANNOUNCE request") sh.mutex.RLock() defer sh.mutex.RUnlock() @@ -106,7 +106,7 @@ func (sh *serverHandler) OnAnnounce(ctx *gortsplib.ServerHandlerOnAnnounceCtx) ( // called when receiving a SETUP request. func (sh *serverHandler) OnSetup(ctx *gortsplib.ServerHandlerOnSetupCtx) (*base.Response, *gortsplib.ServerStream, error) { - log.Printf("setup request") + log.Printf("SETUP request") // SETUP is used by both readers and publishers. In case of publishers, just return StatusOK. if ctx.Session.State() == gortsplib.ServerSessionStatePreRecord { @@ -132,7 +132,7 @@ func (sh *serverHandler) OnSetup(ctx *gortsplib.ServerHandlerOnSetupCtx) (*base. // called when receiving a PLAY request. func (sh *serverHandler) OnPlay(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Response, error) { - log.Printf("play request") + log.Printf("PLAY request") return &base.Response{ StatusCode: base.StatusOK, @@ -141,7 +141,7 @@ func (sh *serverHandler) OnPlay(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Re // called when receiving a RECORD request. func (sh *serverHandler) OnRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*base.Response, error) { - log.Printf("record request") + log.Printf("RECORD request") // called when receiving a RTP packet ctx.Session.OnPacketRTPAny(func(medi *description.Media, forma format.Format, pkt *rtp.Packet) { diff --git a/examples/server/main.go b/examples/server/main.go index 03b55dc1..f9b0a14a 100644 --- a/examples/server/main.go +++ b/examples/server/main.go @@ -56,7 +56,7 @@ func (sh *serverHandler) OnSessionClose(ctx *gortsplib.ServerHandlerOnSessionClo // called when receiving a DESCRIBE request. func (sh *serverHandler) OnDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx) (*base.Response, *gortsplib.ServerStream, error) { - log.Printf("describe request") + log.Printf("DESCRIBE request") sh.mutex.RLock() defer sh.mutex.RUnlock() @@ -76,7 +76,7 @@ func (sh *serverHandler) OnDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx) ( // called when receiving an ANNOUNCE request. func (sh *serverHandler) OnAnnounce(ctx *gortsplib.ServerHandlerOnAnnounceCtx) (*base.Response, error) { - log.Printf("announce request") + log.Printf("ANNOUNCE request") sh.mutex.Lock() defer sh.mutex.Unlock() @@ -105,7 +105,7 @@ func (sh *serverHandler) OnAnnounce(ctx *gortsplib.ServerHandlerOnAnnounceCtx) ( // called when receiving a SETUP request. func (sh *serverHandler) OnSetup(ctx *gortsplib.ServerHandlerOnSetupCtx) (*base.Response, *gortsplib.ServerStream, error) { - log.Printf("setup request") + log.Printf("SETUP request") // SETUP is used by both readers and publishers. In case of publishers, just return StatusOK. if ctx.Session.State() == gortsplib.ServerSessionStatePreRecord { @@ -131,7 +131,7 @@ func (sh *serverHandler) OnSetup(ctx *gortsplib.ServerHandlerOnSetupCtx) (*base. // called when receiving a PLAY request. func (sh *serverHandler) OnPlay(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Response, error) { - log.Printf("play request") + log.Printf("PLAY request") return &base.Response{ StatusCode: base.StatusOK, @@ -140,7 +140,7 @@ func (sh *serverHandler) OnPlay(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Re // called when receiving a RECORD request. func (sh *serverHandler) OnRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*base.Response, error) { - log.Printf("record request") + log.Printf("RECORD request") // called when receiving a RTP packet ctx.Session.OnPacketRTPAny(func(medi *description.Media, forma format.Format, pkt *rtp.Packet) { diff --git a/go.mod b/go.mod index a6a3180a..453abea9 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/bluenviron/gortsplib/v4 go 1.21.0 require ( + github.com/asticode/go-astits v1.13.0 github.com/bluenviron/mediacommon/v2 v2.0.1-0.20250222132106-205c4f7f3850 github.com/google/uuid v1.6.0 github.com/pion/rtcp v1.2.15 @@ -14,7 +15,6 @@ require ( require ( github.com/asticode/go-astikit v0.30.0 // indirect - github.com/asticode/go-astits v1.13.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/pion/randutil v0.1.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect