Files
engine/memory-ts.go

193 lines
5.8 KiB
Go

package engine
import (
"errors"
"fmt"
"io"
"net"
"m7s.live/engine/v4/codec"
"m7s.live/engine/v4/codec/mpegts"
"m7s.live/engine/v4/util"
)
// MemoryTs accumulates MPEG-TS data in memory. WritePESPacket pushes the
// generated TS packets onto the embedded BLL, and WriteTo emits the default
// PAT packet, the stored PMT packet and then the accumulated list.
type MemoryTs struct {
// Embedded pool type; presumably the source of the buffers obtained via
// ts.Get in WritePESPacket — TODO confirm against util.BytesPool.
util.BytesPool
// PMT holds the PMT packet bytes produced by WritePMTPacket.
PMT util.Buffer
// Embedded buffer list that receives the TS packet buffers (ts.BLL.Push) and
// is flushed by WriteTo (ts.BLL.WriteTo).
util.BLL
}
// WritePMTPacket resets ts.PMT and writes a fresh PMT packet for the given
// audio and video codecs into it.
func (ts *MemoryTs) WritePMTPacket(audio codec.AudioCodecID, video codec.VideoCodecID) {
	pmt := &ts.PMT
	pmt.Reset()
	// The callee is passed the video codec first, then the audio codec.
	mpegts.WritePMTPacket(pmt, video, audio)
}
// WriteTo writes the default PAT packet, the stored PMT packet and then the
// accumulated buffer list to w, in that order.
//
// It returns the total number of bytes written across all three parts, as the
// io.WriterTo contract requires, and stops at the first write error.
func (ts *MemoryTs) WriteTo(w io.Writer) (int64, error) {
	var total int64
	n, err := w.Write(mpegts.DefaultPATPacket)
	total += int64(n)
	if err != nil {
		return total, err
	}
	n, err = w.Write(ts.PMT)
	total += int64(n)
	if err != nil {
		return total, err
	}
	written, err := ts.BLL.WriteTo(w)
	return total + written, err
}
// WritePESPacket splits one PES packet into TS packets of
// mpegts.TS_PACKET_SIZE bytes and pushes the resulting buffers onto ts.BLL.
//
// frame supplies the PID, the continuity counter (advanced modulo 16 for every
// TS packet written) and, for key frames, the PCR base. packet supplies the
// PES header and the payload buffers.
//
// An error is returned when the start code prefix is not 0x000001, when
// writing a PES or TS header fails, when the payload cannot be copied in full,
// or when a generated packet does not end on a TS packet boundary.
func (ts *MemoryTs) WritePESPacket(frame *mpegts.MpegtsPESFrame, packet mpegts.MpegTsPESPacket) (err error) {
	if packet.Header.PacketStartCodePrefix != 0x000001 {
		return errors.New("packetStartCodePrefix != 0x000001")
	}
	pesHeadItem := ts.Get(32)
	// Registered before the first fallible call so the header item is recycled
	// on every exit path, including a WritePESHeader failure.
	defer pesHeadItem.Recycle()
	pesHeadItem.Value.Reset()
	if _, err = mpegts.WritePESHeader(&pesHeadItem.Value, packet.Header); err != nil {
		return err
	}
	// The PES header followed by the payload forms the byte stream to packetize.
	pesBuffers := append(net.Buffers{pesHeadItem.Value}, packet.Buffers...)
	pesPktLength := util.SizeOfBuffers(pesBuffers)
	// Request one buffer sized for the whole PES packet plus 6 bytes per
	// (estimated) TS packet.
	buffer := ts.Get((pesPktLength/mpegts.TS_PACKET_SIZE+1)*6 + pesPktLength)
	bwTsHeader := &buffer.Value
	bigLen := bwTsHeader.Len()
	bwTsHeader.Reset()
	ts.BLL.Push(buffer)
	var tsHeaderLength int
	for i := 0; len(pesBuffers) > 0; i++ {
		if bigLen < mpegts.TS_PACKET_SIZE {
			// Less than one TS packet of room is left in the big buffer:
			// write this packet into its own TS_PACKET_SIZE item instead.
			if i == 0 {
				// NOTE(review): this runs right after buffer was pushed onto
				// ts.BLL; confirm that Recycle only discards what is intended
				// and not data queued by earlier calls.
				ts.Recycle()
			}
			headerItem := ts.Get(mpegts.TS_PACKET_SIZE)
			ts.BLL.Push(headerItem)
			bwTsHeader = &headerItem.Value
			bwTsHeader.Reset()
		}
		bigLen -= mpegts.TS_PACKET_SIZE
		pesPktLength = util.SizeOfBuffers(pesBuffers)
		tsHeader := mpegts.MpegTsHeader{
			SyncByte:                   0x47,
			TransportErrorIndicator:    0,
			PayloadUnitStartIndicator:  0,
			TransportPriority:          0,
			Pid:                        frame.Pid,
			TransportScramblingControl: 0,
			AdaptionFieldControl:       1,
			ContinuityCounter:          frame.ContinuityCounter,
		}
		frame.ContinuityCounter++
		frame.ContinuityCounter = frame.ContinuityCounter % 16
		// At the start of each frame, an adaptation field is included when a
		// PCR is present.
		if i == 0 {
			tsHeader.PayloadUnitStartIndicator = 1
			// When PCRFlag is 1, the adaptation field is included.
			if frame.IsKeyFrame {
				tsHeader.AdaptionFieldControl = 0x03
				tsHeader.AdaptationFieldLength = 7
				tsHeader.PCRFlag = 1
				tsHeader.RandomAccessIndicator = 1
				tsHeader.ProgramClockReferenceBase = frame.ProgramClockReferenceBase
			}
		}
		// At the end of each frame, when the remaining data does not fill a
		// whole TS packet, an adaptation field with stuffing is included.
		var tsStuffingLength uint8
		if pesPktLength < mpegts.TS_PACKET_SIZE-4 {
			tsHeader.AdaptionFieldControl = 0x03
			tsHeader.AdaptationFieldLength = uint8(mpegts.TS_PACKET_SIZE - 4 - 1 - pesPktLength)
			// TODO: what if the first TS packet is also the last one — does
			// that case need special handling?
			// The TS header takes at least 6 bytes here: 4 fixed bytes +
			// AdaptationFieldLength (1 byte) + 3 indicators and 5 flags (1 byte).
			if tsHeader.AdaptationFieldLength >= 1 {
				tsStuffingLength = tsHeader.AdaptationFieldLength - 1
			}
		}
		if tsHeaderLength, err = mpegts.WriteTsHeader(bwTsHeader, tsHeader); err != nil {
			return err
		}
		if tsStuffingLength > 0 {
			if _, err = bwTsHeader.Write(mpegts.Stuffing[:tsStuffingLength]); err != nil {
				return err
			}
			tsHeaderLength += int(tsStuffingLength)
		}
		tsPayloadLength := mpegts.TS_PACKET_SIZE - tsHeaderLength
		// Drain the next chunk of the PES buffers into the current TS packet.
		if _, err = io.CopyN(bwTsHeader, &pesBuffers, int64(tsPayloadLength)); err != nil {
			return fmt.Errorf("copying %d payload bytes into TS packet: %w", tsPayloadLength, err)
		}
		// The write target must now hold either i+1 whole packets (shared big
		// buffer) or exactly one packet (separate item).
		tsPktByteLen := bwTsHeader.Len()
		if tsPktByteLen != (i+1)*mpegts.TS_PACKET_SIZE && tsPktByteLen != mpegts.TS_PACKET_SIZE {
			return fmt.Errorf("%s, packet size=%d", "TS_PACKET_SIZE != 188,", tsPktByteLen)
		}
	}
	return nil
}
// WriteAudioFrame wraps one audio frame in a PES packet carrying only a PTS
// and hands it to WritePESPacket. The frame's PTS is also stored in pes as the
// PCR base.
//
// The PES packet length is the raw audio length plus the 8-byte optional PES
// header (8 bytes because only a PTS is present); AAC frames additionally
// account for a 7-byte ADTS header and use the ADTS-framed buffers.
func (ts *MemoryTs) WriteAudioFrame(frame AudioFrame, pes *mpegts.MpegtsPESFrame) (err error) {
	const (
		adtsHeaderLen     = 7
		optionalHeaderLen = 8
	)
	var pkt mpegts.MpegTsPESPacket
	switch frame.CodecID {
	case codec.CodecID_AAC:
		pkt.Header.PesPacketLength = uint16(adtsHeaderLen + frame.AUList.ByteLength + optionalHeaderLen)
		pkt.Buffers = frame.GetADTS()
	default:
		pkt.Header.PesPacketLength = uint16(frame.AUList.ByteLength + optionalHeaderLen)
		pkt.Buffers = frame.AUList.ToBuffers()
	}
	pts := uint64(frame.PTS)
	pes.ProgramClockReferenceBase = pts
	pkt.Header.Pts = pts
	pkt.Header.PtsDtsFlags = 0x80
	pkt.Header.PesHeaderDataLength = 5
	pkt.Header.StreamID = mpegts.STREAM_ID_AUDIO
	pkt.Header.ConstTen = 0x80
	pkt.Header.PacketStartCodePrefix = 0x000001
	return ts.WritePESPacket(pes, pkt)
}
// WriteVideoFrame wraps one video frame in a PES packet carrying both PTS and
// DTS and hands it to WritePESPacket. The frame's PTS is also stored in pes as
// the PCR base.
//
// The raw elementary stream is preprocessed first: an access unit delimiter
// NALU is placed in front of the frame's Annex B data (parameter sets and
// NALUs).
func (ts *MemoryTs) WriteVideoFrame(frame VideoFrame, pes *mpegts.MpegtsPESFrame) (err error) {
	// Frames with exactly two parameter sets get codec.NALU_AUD_BYTE
	// (presumably the H.264 SPS+PPS case — confirm); all others get
	// codec.AudNalu.
	payload := net.Buffers{codec.AudNalu}
	if len(frame.ParamaterSets) == 2 {
		payload[0] = codec.NALU_AUD_BYTE
	}
	payload = append(payload, frame.GetAnnexB()...)

	// Payload plus 10 bytes of PTS/DTS header data plus 3 more header bytes.
	// A total that does not fit in 16 bits is written as 0.
	pesLen := util.SizeOfBuffers(payload) + 10 + 3
	if pesLen > 0xffff {
		pesLen = 0
	}

	pts := uint64(frame.PTS)
	pes.ProgramClockReferenceBase = pts

	var pkt mpegts.MpegTsPESPacket
	pkt.Buffers = payload
	pkt.Header.PesPacketLength = uint16(pesLen)
	pkt.Header.Pts = pts
	pkt.Header.Dts = uint64(frame.DTS)
	pkt.Header.PtsDtsFlags = 0xC0
	pkt.Header.PesHeaderDataLength = 10
	pkt.Header.StreamID = mpegts.STREAM_ID_VIDEO
	pkt.Header.ConstTen = 0x80
	pkt.Header.PacketStartCodePrefix = 0x000001
	return ts.WritePESPacket(pes, pkt)
}