diff --git a/examples/client-read-save-to-disk/main.go b/examples/client-read-save-to-disk/main.go index 03432794..c1342bea 100644 --- a/examples/client-read-save-to-disk/main.go +++ b/examples/client-read-save-to-disk/main.go @@ -67,7 +67,6 @@ func main() { dec := rtph264.NewDecoder() dtsEst := h264.NewDTSEstimator() firstPacketWritten := false - var naluBuffer [][]byte var startPTS time.Duration // add an H264 track to the MPEG-TS muxer @@ -94,91 +93,83 @@ func main() { return } - // convert RTP packets into H264 NALUs - nalus, pts, err := dec.DecodeRTP(&pkt) + // decode H264 NALUs from RTP packets + nalus, pts, err := dec.DecodeRTPUntilMarker(&pkt) if err != nil { return } - // store NALUs in a buffer until a packet with the Marker flag is received - naluBuffer = append(naluBuffer, nalus...) - - // RTP marker means that all the NALUs with the same PTS have been received. - if pkt.Marker { - if !firstPacketWritten { - firstPacketWritten = true - startPTS = pts - } - - // check whether there's an IDR - idrPresent := func() bool { - for _, nalu := range naluBuffer { - typ := h264.NALUType(nalu[0] & 0x1F) - if typ == h264.NALUTypeIDR { - return true - } - } - return false - }() - - // prepend an AUD. This is required by some players - filteredNALUs := [][]byte{ - {byte(h264.NALUTypeAccessUnitDelimiter), 240}, - } - - for _, nalu := range naluBuffer { - // remove existing SPS, PPS, AUD - typ := h264.NALUType(nalu[0] & 0x1F) - switch typ { - case h264.NALUTypeSPS, h264.NALUTypePPS, h264.NALUTypeAccessUnitDelimiter: - continue - } - - // add SPS and PPS before every IDR - if typ == h264.NALUTypeIDR { - filteredNALUs = append(filteredNALUs, h264Conf.SPS) - filteredNALUs = append(filteredNALUs, h264Conf.PPS) - } - - filteredNALUs = append(filteredNALUs, nalu) - } - - naluBuffer = nil - - // encode into Annex-B - enc, err := h264.EncodeAnnexB(filteredNALUs) - if err != nil { - panic(err) - } - - dts := dtsEst.Feed(pts - startPTS) - pts = pts - startPTS - - // write TS packet - _, err = mux.WriteData(&astits.MuxerData{ - PID: 256, - AdaptationField: &astits.PacketAdaptationField{ - RandomAccessIndicator: idrPresent, - }, - PES: &astits.PESData{ - Header: &astits.PESHeader{ - OptionalHeader: &astits.PESOptionalHeader{ - MarkerBits: 2, - PTSDTSIndicator: astits.PTSDTSIndicatorBothPresent, - DTS: &astits.ClockReference{Base: int64(dts.Seconds() * 90000)}, - PTS: &astits.ClockReference{Base: int64(pts.Seconds() * 90000)}, - }, - StreamID: 224, // = video - }, - Data: enc, - }, - }) - if err != nil { - panic(err) - } - - fmt.Println("wrote ts packet") + if !firstPacketWritten { + firstPacketWritten = true + startPTS = pts } + + // check whether there's an IDR + idrPresent := func() bool { + for _, nalu := range nalus { + typ := h264.NALUType(nalu[0] & 0x1F) + if typ == h264.NALUTypeIDR { + return true + } + } + return false + }() + + // prepend an AUD. This is required by some players + filteredNALUs := [][]byte{ + {byte(h264.NALUTypeAccessUnitDelimiter), 240}, + } + + for _, nalu := range nalus { + // remove existing SPS, PPS, AUD + typ := h264.NALUType(nalu[0] & 0x1F) + switch typ { + case h264.NALUTypeSPS, h264.NALUTypePPS, h264.NALUTypeAccessUnitDelimiter: + continue + } + + // add SPS and PPS before every IDR + if typ == h264.NALUTypeIDR { + filteredNALUs = append(filteredNALUs, h264Conf.SPS) + filteredNALUs = append(filteredNALUs, h264Conf.PPS) + } + + filteredNALUs = append(filteredNALUs, nalu) + } + + // encode into Annex-B + enc, err := h264.EncodeAnnexB(filteredNALUs) + if err != nil { + panic(err) + } + + dts := dtsEst.Feed(pts - startPTS) + pts = pts - startPTS + + // write TS packet + _, err = mux.WriteData(&astits.MuxerData{ + PID: 256, + AdaptationField: &astits.PacketAdaptationField{ + RandomAccessIndicator: idrPresent, + }, + PES: &astits.PESData{ + Header: &astits.PESHeader{ + OptionalHeader: &astits.PESOptionalHeader{ + MarkerBits: 2, + PTSDTSIndicator: astits.PTSDTSIndicatorBothPresent, + DTS: &astits.ClockReference{Base: int64(dts.Seconds() * 90000)}, + PTS: &astits.ClockReference{Base: int64(pts.Seconds() * 90000)}, + }, + StreamID: 224, // = video + }, + Data: enc, + }, + }) + if err != nil { + panic(err) + } + + fmt.Println("wrote ts packet") }) panic(err) } diff --git a/pkg/rtph264/decoder.go b/pkg/rtph264/decoder.go index 44d572c8..1ca400f5 100644 --- a/pkg/rtph264/decoder.go +++ b/pkg/rtph264/decoder.go @@ -42,7 +42,10 @@ type Decoder struct { // for Decode() startingPacketReceived bool isDecodingFragmented bool - fragmentedBuf []byte + fragmentedBuffer []byte + + // for DecodeUntilMarker() + naluBuffer [][]byte } // NewDecoder allocates a Decoder. @@ -67,7 +70,7 @@ func (d *Decoder) Decode(byts []byte) ([][]byte, time.Duration, error) { return d.DecodeRTP(&pkt) } -// DecodeRTP decodes NALUs from a rtp.Packet. +// DecodeRTP decodes NALUs from a RTP/H264 packet. func (d *Decoder) DecodeRTP(pkt *rtp.Packet) ([][]byte, time.Duration, error) { if !d.isDecodingFragmented { if !d.initialTsSet { @@ -129,7 +132,7 @@ func (d *Decoder) DecodeRTP(pkt *rtp.Packet) ([][]byte, time.Duration, error) { nri := (pkt.Payload[0] >> 5) & 0x03 typ := pkt.Payload[1] & 0x1F - d.fragmentedBuf = append([]byte{(nri << 5) | typ}, pkt.Payload[2:]...) + d.fragmentedBuffer = append([]byte{(nri << 5) | typ}, pkt.Payload[2:]...) d.isDecodingFragmented = true d.startingPacketReceived = true @@ -165,7 +168,7 @@ func (d *Decoder) DecodeRTP(pkt *rtp.Packet) ([][]byte, time.Duration, error) { return nil, 0, fmt.Errorf("invalid FU-A packet (decoded two starting packets in a row)") } - d.fragmentedBuf = append(d.fragmentedBuf, pkt.Payload[2:]...) + d.fragmentedBuffer = append(d.fragmentedBuffer, pkt.Payload[2:]...) if end != 1 { return nil, 0, ErrMorePacketsNeeded @@ -173,7 +176,28 @@ func (d *Decoder) DecodeRTP(pkt *rtp.Packet) ([][]byte, time.Duration, error) { d.isDecodingFragmented = false d.startingPacketReceived = true - return [][]byte{d.fragmentedBuf}, d.decodeTimestamp(pkt.Timestamp), nil + return [][]byte{d.fragmentedBuffer}, d.decodeTimestamp(pkt.Timestamp), nil +} + +// DecodeRTPUntilMarker decodes NALUs from a RTP/H264 packet and puts them in a buffer. +// When a packet has the marker flag (meaning that all the NALUs with the same PTS have +// been received), the buffer is returned. +func (d *Decoder) DecodeRTPUntilMarker(pkt *rtp.Packet) ([][]byte, time.Duration, error) { + nalus, pts, err := d.DecodeRTP(pkt) + if err != nil { + return nil, 0, err + } + + d.naluBuffer = append(d.naluBuffer, nalus...) + + if !pkt.Marker { + return nil, 0, ErrMorePacketsNeeded + } + + ret := d.naluBuffer + d.naluBuffer = d.naluBuffer[:0] + + return ret, pts, nil } // ReadSPSPPS reads RTP/H264 packets from a reader until SPS and PPS are