add server-h264-save-to-disk example

This commit is contained in:
aler9
2022-10-29 16:56:03 +02:00
parent 6e6be34636
commit b3cde905f7
6 changed files with 328 additions and 3 deletions

View File

@@ -75,6 +75,7 @@ Features:
* [client-publish-pause](examples/client-publish-pause/main.go) * [client-publish-pause](examples/client-publish-pause/main.go)
* [server](examples/server/main.go) * [server](examples/server/main.go)
* [server-tls](examples/server-tls/main.go) * [server-tls](examples/server-tls/main.go)
* [server-h264-save-to-disk](examples/server-h264-save-to-disk/main.go)
## API Documentation ## API Documentation

View File

@@ -11,7 +11,7 @@ import (
"github.com/asticode/go-astits" "github.com/asticode/go-astits"
) )
// mpegtsEncoder allows to encode H264 NALUs into MPEG-TS. // mpegtsEncoder allows to save a H264 stream into a MPEG-TS file.
type mpegtsEncoder struct { type mpegtsEncoder struct {
sps []byte sps []byte
pps []byte pps []byte

View File

@@ -0,0 +1,153 @@
package main
import (
"fmt"
"log"
"sync"
"github.com/aler9/gortsplib"
"github.com/aler9/gortsplib/pkg/base"
)
// This example shows how to
// 1. create a RTSP server which accepts plain connections
// 2. allow a single client to publish a stream, containing a H264 track, with TCP or UDP
// 3. save the content of the H264 track into a file in MPEG-TS format
type serverHandler struct {
mutex sync.Mutex
stream *gortsplib.ServerStream
h264TrackID int
h264track *gortsplib.TrackH264
mpegtsMuxer *mpegtsEncoder
}
// called when a connection is opened.
func (sh *serverHandler) OnConnOpen(ctx *gortsplib.ServerHandlerOnConnOpenCtx) {
log.Printf("conn opened")
}
// called when a connection is closed.
func (sh *serverHandler) OnConnClose(ctx *gortsplib.ServerHandlerOnConnCloseCtx) {
log.Printf("conn closed (%v)", ctx.Error)
}
// called when a session is opened.
func (sh *serverHandler) OnSessionOpen(ctx *gortsplib.ServerHandlerOnSessionOpenCtx) {
log.Printf("session opened")
}
// called when a session is closed.
func (sh *serverHandler) OnSessionClose(ctx *gortsplib.ServerHandlerOnSessionCloseCtx) {
log.Printf("session closed")
sh.mutex.Lock()
defer sh.mutex.Unlock()
// close the stream and disconnect any reader.
if sh.stream != nil {
sh.stream.Close()
sh.stream = nil
}
}
// called after receiving an ANNOUNCE request.
func (sh *serverHandler) OnAnnounce(ctx *gortsplib.ServerHandlerOnAnnounceCtx) (*base.Response, error) {
log.Printf("announce request")
sh.mutex.Lock()
defer sh.mutex.Unlock()
if sh.stream != nil {
return &base.Response{
StatusCode: base.StatusBadRequest,
}, fmt.Errorf("someone is already publishing")
}
// find the H264 track
h264TrackID, h264track := func() (int, *gortsplib.TrackH264) {
for i, track := range ctx.Tracks {
if h264track, ok := track.(*gortsplib.TrackH264); ok {
return i, h264track
}
}
return -1, nil
}()
if h264TrackID < 0 {
return &base.Response{
StatusCode: base.StatusBadRequest,
}, fmt.Errorf("H264 track not found")
}
// setup H264->MPEGTS encoder
mpegtsMuxer, err := newMPEGTSEncoder(h264track.SafeSPS(), h264track.SafePPS())
if err != nil {
return &base.Response{
StatusCode: base.StatusBadRequest,
}, err
}
// create a stream and save data
sh.stream = gortsplib.NewServerStream(ctx.Tracks)
sh.h264TrackID = h264TrackID
sh.mpegtsMuxer = mpegtsMuxer
return &base.Response{
StatusCode: base.StatusOK,
}, nil
}
// called after receiving a SETUP request.
func (sh *serverHandler) OnSetup(ctx *gortsplib.ServerHandlerOnSetupCtx) (*base.Response, *gortsplib.ServerStream, error) {
log.Printf("setup request")
return &base.Response{
StatusCode: base.StatusOK,
}, sh.stream, nil
}
// called after receiving a RECORD request.
func (sh *serverHandler) OnRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*base.Response, error) {
log.Printf("record request")
return &base.Response{
StatusCode: base.StatusOK,
}, nil
}
// called after receiving a RTP packet.
func (sh *serverHandler) OnPacketRTP(ctx *gortsplib.ServerHandlerOnPacketRTPCtx) {
sh.mutex.Lock()
defer sh.mutex.Unlock()
if ctx.TrackID != sh.h264TrackID {
return
}
if ctx.H264NALUs == nil {
return
}
// encode H264 NALUs into MPEG-TS
err := sh.mpegtsMuxer.encode(ctx.H264NALUs, ctx.H264PTS)
if err != nil {
return
}
}
func main() {
// configure server
s := &gortsplib.Server{
Handler: &serverHandler{},
RTSPAddress: ":8554",
UDPRTPAddress: ":8000",
UDPRTCPAddress: ":8001",
MulticastIPRange: "224.1.0.0/16",
MulticastRTPPort: 8002,
MulticastRTCPPort: 8003,
}
// start server and wait until a fatal error
log.Printf("server is ready")
panic(s.StartAndWait())
}

View File

@@ -0,0 +1,171 @@
package main
import (
"bufio"
"context"
"log"
"os"
"time"
"github.com/aler9/gortsplib/pkg/h264"
"github.com/asticode/go-astits"
)
// mpegtsEncoder allows to save a H264 stream into a MPEG-TS file.
type mpegtsEncoder struct {
sps []byte
pps []byte
f *os.File
b *bufio.Writer
mux *astits.Muxer
dtsExtractor *h264.DTSExtractor
firstIDRReceived bool
startDTS time.Duration
}
// newMPEGTSEncoder allocates a mpegtsEncoder.
func newMPEGTSEncoder(sps []byte, pps []byte) (*mpegtsEncoder, error) {
f, err := os.Create("mystream.ts")
if err != nil {
return nil, err
}
b := bufio.NewWriter(f)
mux := astits.NewMuxer(context.Background(), b)
mux.AddElementaryStream(astits.PMTElementaryStream{
ElementaryPID: 256,
StreamType: astits.StreamTypeH264Video,
})
mux.SetPCRPID(256)
return &mpegtsEncoder{
sps: sps,
pps: pps,
f: f,
b: b,
mux: mux,
}, nil
}
// close closes all the mpegtsEncoder resources.
func (e *mpegtsEncoder) close() {
e.b.Flush()
e.f.Close()
}
// encode encodes H264 NALUs into MPEG-TS.
func (e *mpegtsEncoder) encode(nalus [][]byte, pts time.Duration) error {
// prepend an AUD. This is required by some players
filteredNALUs := [][]byte{
{byte(h264.NALUTypeAccessUnitDelimiter), 240},
}
nonIDRPresent := false
idrPresent := false
for _, nalu := range nalus {
typ := h264.NALUType(nalu[0] & 0x1F)
switch typ {
case h264.NALUTypeSPS:
e.sps = append([]byte(nil), nalu...)
continue
case h264.NALUTypePPS:
e.pps = append([]byte(nil), nalu...)
continue
case h264.NALUTypeAccessUnitDelimiter:
continue
case h264.NALUTypeIDR:
idrPresent = true
// add SPS and PPS before every IDR
if e.sps != nil && e.pps != nil {
filteredNALUs = append(filteredNALUs, e.sps, e.pps)
}
case h264.NALUTypeNonIDR:
nonIDRPresent = true
}
filteredNALUs = append(filteredNALUs, nalu)
}
if !nonIDRPresent && !idrPresent {
return nil
}
var dts time.Duration
if !e.firstIDRReceived {
// skip samples silently until we find one with a IDR
if !idrPresent {
return nil
}
e.firstIDRReceived = true
e.dtsExtractor = h264.NewDTSExtractor()
var err error
dts, err = e.dtsExtractor.Extract(filteredNALUs, pts)
if err != nil {
return err
}
e.startDTS = dts
dts = 0
pts -= e.startDTS
} else {
var err error
dts, err = e.dtsExtractor.Extract(filteredNALUs, pts)
if err != nil {
return err
}
dts -= e.startDTS
pts -= e.startDTS
}
oh := &astits.PESOptionalHeader{
MarkerBits: 2,
}
if dts == pts {
oh.PTSDTSIndicator = astits.PTSDTSIndicatorOnlyPTS
oh.PTS = &astits.ClockReference{Base: int64(pts.Seconds() * 90000)}
} else {
oh.PTSDTSIndicator = astits.PTSDTSIndicatorBothPresent
oh.DTS = &astits.ClockReference{Base: int64(dts.Seconds() * 90000)}
oh.PTS = &astits.ClockReference{Base: int64(pts.Seconds() * 90000)}
}
// encode into Annex-B
annexb, err := h264.AnnexBMarshal(filteredNALUs)
if err != nil {
return err
}
// write TS packet
_, err = e.mux.WriteData(&astits.MuxerData{
PID: 256,
AdaptationField: &astits.PacketAdaptationField{
RandomAccessIndicator: idrPresent,
},
PES: &astits.PESData{
Header: &astits.PESHeader{
OptionalHeader: oh,
StreamID: 224, // video
},
Data: annexb,
},
})
if err != nil {
return err
}
log.Println("wrote TS packet")
return nil
}

View File

@@ -84,7 +84,7 @@ func (sh *serverHandler) OnAnnounce(ctx *gortsplib.ServerHandlerOnAnnounceCtx) (
}, fmt.Errorf("someone is already publishing") }, fmt.Errorf("someone is already publishing")
} }
// save the track list and the publisher // create the stream and save the publisher
sh.stream = gortsplib.NewServerStream(ctx.Tracks) sh.stream = gortsplib.NewServerStream(ctx.Tracks)
sh.publisher = ctx.Session sh.publisher = ctx.Session

View File

@@ -83,7 +83,7 @@ func (sh *serverHandler) OnAnnounce(ctx *gortsplib.ServerHandlerOnAnnounceCtx) (
}, fmt.Errorf("someone is already publishing") }, fmt.Errorf("someone is already publishing")
} }
// save the track list and the publisher // create the stream and save the publisher
sh.stream = gortsplib.NewServerStream(ctx.Tracks) sh.stream = gortsplib.NewServerStream(ctx.Tracks)
sh.publisher = ctx.Session sh.publisher = ctx.Session