diff --git a/README.md b/README.md index 66dd4b17..0d3b2586 100644 --- a/README.md +++ b/README.md @@ -75,6 +75,7 @@ Features: * [client-publish-pause](examples/client-publish-pause/main.go) * [server](examples/server/main.go) * [server-tls](examples/server-tls/main.go) +* [server-h264-save-to-disk](examples/server-h264-save-to-disk/main.go) ## API Documentation diff --git a/examples/client-read-h264-save-to-disk/mpegtsencoder.go b/examples/client-read-h264-save-to-disk/mpegtsencoder.go index 79937752..b67f9235 100644 --- a/examples/client-read-h264-save-to-disk/mpegtsencoder.go +++ b/examples/client-read-h264-save-to-disk/mpegtsencoder.go @@ -11,7 +11,7 @@ import ( "github.com/asticode/go-astits" ) -// mpegtsEncoder allows to encode H264 NALUs into MPEG-TS. +// mpegtsEncoder allows to save a H264 stream into a MPEG-TS file. type mpegtsEncoder struct { sps []byte pps []byte diff --git a/examples/server-h264-save-to-disk/main.go b/examples/server-h264-save-to-disk/main.go new file mode 100644 index 00000000..300dacf8 --- /dev/null +++ b/examples/server-h264-save-to-disk/main.go @@ -0,0 +1,153 @@ +package main + +import ( + "fmt" + "log" + "sync" + + "github.com/aler9/gortsplib" + "github.com/aler9/gortsplib/pkg/base" +) + +// This example shows how to +// 1. create a RTSP server which accepts plain connections +// 2. allow a single client to publish a stream, containing a H264 track, with TCP or UDP +// 3. save the content of the H264 track into a file in MPEG-TS format + +type serverHandler struct { + mutex sync.Mutex + stream *gortsplib.ServerStream + h264TrackID int + h264track *gortsplib.TrackH264 + mpegtsMuxer *mpegtsEncoder +} + +// called when a connection is opened. +func (sh *serverHandler) OnConnOpen(ctx *gortsplib.ServerHandlerOnConnOpenCtx) { + log.Printf("conn opened") +} + +// called when a connection is closed. +func (sh *serverHandler) OnConnClose(ctx *gortsplib.ServerHandlerOnConnCloseCtx) { + log.Printf("conn closed (%v)", ctx.Error) +} + +// called when a session is opened. +func (sh *serverHandler) OnSessionOpen(ctx *gortsplib.ServerHandlerOnSessionOpenCtx) { + log.Printf("session opened") +} + +// called when a session is closed. +func (sh *serverHandler) OnSessionClose(ctx *gortsplib.ServerHandlerOnSessionCloseCtx) { + log.Printf("session closed") + + sh.mutex.Lock() + defer sh.mutex.Unlock() + + // close the stream and disconnect any reader. + if sh.stream != nil { + sh.stream.Close() + sh.stream = nil + } +} + +// called after receiving an ANNOUNCE request. +func (sh *serverHandler) OnAnnounce(ctx *gortsplib.ServerHandlerOnAnnounceCtx) (*base.Response, error) { + log.Printf("announce request") + + sh.mutex.Lock() + defer sh.mutex.Unlock() + + if sh.stream != nil { + return &base.Response{ + StatusCode: base.StatusBadRequest, + }, fmt.Errorf("someone is already publishing") + } + + // find the H264 track + h264TrackID, h264track := func() (int, *gortsplib.TrackH264) { + for i, track := range ctx.Tracks { + if h264track, ok := track.(*gortsplib.TrackH264); ok { + return i, h264track + } + } + return -1, nil + }() + if h264TrackID < 0 { + return &base.Response{ + StatusCode: base.StatusBadRequest, + }, fmt.Errorf("H264 track not found") + } + + // setup H264->MPEGTS encoder + mpegtsMuxer, err := newMPEGTSEncoder(h264track.SafeSPS(), h264track.SafePPS()) + if err != nil { + return &base.Response{ + StatusCode: base.StatusBadRequest, + }, err + } + + // create a stream and save data + sh.stream = gortsplib.NewServerStream(ctx.Tracks) + sh.h264TrackID = h264TrackID + sh.mpegtsMuxer = mpegtsMuxer + + return &base.Response{ + StatusCode: base.StatusOK, + }, nil +} + +// called after receiving a SETUP request. +func (sh *serverHandler) OnSetup(ctx *gortsplib.ServerHandlerOnSetupCtx) (*base.Response, *gortsplib.ServerStream, error) { + log.Printf("setup request") + + return &base.Response{ + StatusCode: base.StatusOK, + }, sh.stream, nil +} + +// called after receiving a RECORD request. +func (sh *serverHandler) OnRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*base.Response, error) { + log.Printf("record request") + + return &base.Response{ + StatusCode: base.StatusOK, + }, nil +} + +// called after receiving a RTP packet. +func (sh *serverHandler) OnPacketRTP(ctx *gortsplib.ServerHandlerOnPacketRTPCtx) { + sh.mutex.Lock() + defer sh.mutex.Unlock() + + if ctx.TrackID != sh.h264TrackID { + return + } + + if ctx.H264NALUs == nil { + return + } + + // encode H264 NALUs into MPEG-TS + err := sh.mpegtsMuxer.encode(ctx.H264NALUs, ctx.H264PTS) + if err != nil { + return + } +} + +func main() { + // configure server + s := &gortsplib.Server{ + Handler: &serverHandler{}, + RTSPAddress: ":8554", + UDPRTPAddress: ":8000", + UDPRTCPAddress: ":8001", + MulticastIPRange: "224.1.0.0/16", + MulticastRTPPort: 8002, + MulticastRTCPPort: 8003, + } + + // start server and wait until a fatal error + log.Printf("server is ready") + panic(s.StartAndWait()) +} diff --git a/examples/server-h264-save-to-disk/mpegtsencoder.go b/examples/server-h264-save-to-disk/mpegtsencoder.go new file mode 100644 index 00000000..b67f9235 --- /dev/null +++ b/examples/server-h264-save-to-disk/mpegtsencoder.go @@ -0,0 +1,171 @@ +package main + +import ( + "bufio" + "context" + "log" + "os" + "time" + + "github.com/aler9/gortsplib/pkg/h264" + "github.com/asticode/go-astits" +) + +// mpegtsEncoder allows to save a H264 stream into a MPEG-TS file. +type mpegtsEncoder struct { + sps []byte + pps []byte + + f *os.File + b *bufio.Writer + mux *astits.Muxer + dtsExtractor *h264.DTSExtractor + firstIDRReceived bool + startDTS time.Duration +} + +// newMPEGTSEncoder allocates a mpegtsEncoder. +func newMPEGTSEncoder(sps []byte, pps []byte) (*mpegtsEncoder, error) { + f, err := os.Create("mystream.ts") + if err != nil { + return nil, err + } + b := bufio.NewWriter(f) + + mux := astits.NewMuxer(context.Background(), b) + mux.AddElementaryStream(astits.PMTElementaryStream{ + ElementaryPID: 256, + StreamType: astits.StreamTypeH264Video, + }) + mux.SetPCRPID(256) + + return &mpegtsEncoder{ + sps: sps, + pps: pps, + f: f, + b: b, + mux: mux, + }, nil +} + +// close closes all the mpegtsEncoder resources. +func (e *mpegtsEncoder) close() { + e.b.Flush() + e.f.Close() +} + +// encode encodes H264 NALUs into MPEG-TS. +func (e *mpegtsEncoder) encode(nalus [][]byte, pts time.Duration) error { + // prepend an AUD. This is required by some players + filteredNALUs := [][]byte{ + {byte(h264.NALUTypeAccessUnitDelimiter), 240}, + } + + nonIDRPresent := false + idrPresent := false + + for _, nalu := range nalus { + typ := h264.NALUType(nalu[0] & 0x1F) + switch typ { + case h264.NALUTypeSPS: + e.sps = append([]byte(nil), nalu...) + continue + + case h264.NALUTypePPS: + e.pps = append([]byte(nil), nalu...) + continue + + case h264.NALUTypeAccessUnitDelimiter: + continue + + case h264.NALUTypeIDR: + idrPresent = true + + // add SPS and PPS before every IDR + if e.sps != nil && e.pps != nil { + filteredNALUs = append(filteredNALUs, e.sps, e.pps) + } + + case h264.NALUTypeNonIDR: + nonIDRPresent = true + } + + filteredNALUs = append(filteredNALUs, nalu) + } + + if !nonIDRPresent && !idrPresent { + return nil + } + + var dts time.Duration + + if !e.firstIDRReceived { + // skip samples silently until we find one with a IDR + if !idrPresent { + return nil + } + + e.firstIDRReceived = true + e.dtsExtractor = h264.NewDTSExtractor() + + var err error + dts, err = e.dtsExtractor.Extract(filteredNALUs, pts) + if err != nil { + return err + } + + e.startDTS = dts + dts = 0 + pts -= e.startDTS + + } else { + var err error + dts, err = e.dtsExtractor.Extract(filteredNALUs, pts) + if err != nil { + return err + } + + dts -= e.startDTS + pts -= e.startDTS + } + + oh := &astits.PESOptionalHeader{ + MarkerBits: 2, + } + + if dts == pts { + oh.PTSDTSIndicator = astits.PTSDTSIndicatorOnlyPTS + oh.PTS = &astits.ClockReference{Base: int64(pts.Seconds() * 90000)} + } else { + oh.PTSDTSIndicator = astits.PTSDTSIndicatorBothPresent + oh.DTS = &astits.ClockReference{Base: int64(dts.Seconds() * 90000)} + oh.PTS = &astits.ClockReference{Base: int64(pts.Seconds() * 90000)} + } + + // encode into Annex-B + annexb, err := h264.AnnexBMarshal(filteredNALUs) + if err != nil { + return err + } + + // write TS packet + _, err = e.mux.WriteData(&astits.MuxerData{ + PID: 256, + AdaptationField: &astits.PacketAdaptationField{ + RandomAccessIndicator: idrPresent, + }, + PES: &astits.PESData{ + Header: &astits.PESHeader{ + OptionalHeader: oh, + StreamID: 224, // video + }, + Data: annexb, + }, + }) + if err != nil { + return err + } + + log.Println("wrote TS packet") + return nil +} diff --git a/examples/server-tls/main.go b/examples/server-tls/main.go index a0abc54c..674c8621 100644 --- a/examples/server-tls/main.go +++ b/examples/server-tls/main.go @@ -84,7 +84,7 @@ func (sh *serverHandler) OnAnnounce(ctx *gortsplib.ServerHandlerOnAnnounceCtx) ( }, fmt.Errorf("someone is already publishing") } - // save the track list and the publisher + // create the stream and save the publisher sh.stream = gortsplib.NewServerStream(ctx.Tracks) sh.publisher = ctx.Session diff --git a/examples/server/main.go b/examples/server/main.go index ebc21dd1..680eab66 100644 --- a/examples/server/main.go +++ b/examples/server/main.go @@ -83,7 +83,7 @@ func (sh *serverHandler) OnAnnounce(ctx *gortsplib.ServerHandlerOnAnnounceCtx) ( }, fmt.Errorf("someone is already publishing") } - // save the track list and the publisher + // create the stream and save the publisher sh.stream = gortsplib.NewServerStream(ctx.Tracks) sh.publisher = ctx.Session