diff --git a/codec/codec.go b/codec/codec.go index 44dd084..9e25805 100644 --- a/codec/codec.go +++ b/codec/codec.go @@ -162,7 +162,7 @@ type ADTSVariableHeader struct { // 所以说number_of_raw_data_blocks_in_frame == 0 表示说ADTS帧中有一个AAC数据块并不是说没有。(一个AAC原始帧包含一段时间内1024个采样及相关数据) -func AudioSpecificConfigToADTS(asc AudioSpecificConfig, rawDataLength int) (adts ADTS, adtsByte []byte, err error) { +func AudioSpecificConfigToADTS(asc *AudioSpecificConfig, rawDataLength int) (adts ADTS, adtsByte []byte, err error) { if asc.ChannelConfiguration > 8 || asc.FrameLengthFlag > 13 { err = errors.New("Reserved field.") return diff --git a/codec/mpegts/mpegts.go b/codec/mpegts/mpegts.go index 58f92ec..2b1b4e4 100644 --- a/codec/mpegts/mpegts.go +++ b/codec/mpegts/mpegts.go @@ -63,11 +63,29 @@ const ( // 0x38 - 0x3F Defined in ISO/IEC 13818-6 // 0x40 - 0xFE User private // 0xFF Forbidden + STREAM_TYPE_VIDEO_MPEG1 = 0x01 + STREAM_TYPE_VIDEO_MPEG2 = 0x02 + STREAM_TYPE_AUDIO_MPEG1 = 0x03 + STREAM_TYPE_AUDIO_MPEG2 = 0x04 + STREAM_TYPE_PRIVATE_SECTIONS = 0x05 + STREAM_TYPE_PRIVATE_DATA = 0x06 + STREAM_TYPE_MHEG = 0x07 - STREAM_TYPE_H264 = 0x1B - STREAM_TYPE_H265 = 0x24 - STREAM_TYPE_AAC = 0x0F + STREAM_TYPE_H264 = 0x1B + STREAM_TYPE_H265 = 0x24 + STREAM_TYPE_AAC = 0x0F + STREAM_TYPE_G711A = 0x90 + STREAM_TYPE_G711U = 0x91 + STREAM_TYPE_G722_1 = 0x92 + STREAM_TYPE_G723_1 = 0x93 + STREAM_TYPE_G726 = 0x94 + STREAM_TYPE_G729 = 0x99 + STREAM_TYPE_ADPCM = 0x11 + STREAM_TYPE_PCM = 0x0A + STREAM_TYPE_AC3 = 0x81 + STREAM_TYPE_DTS = 0x8A + STREAM_TYPE_LPCM = 0x8B // 1110 xxxx // 110x xxxx STREAM_ID_VIDEO = 0xE0 // ITU-T Rec. H.262 | ISO/IEC 13818-2 or ISO/IEC 11172-2 or ISO/IEC14496-2 video stream number xxxx diff --git a/codec/mpegts/mpegts_crc32.go b/codec/mpegts/mpegts_crc32.go index 9204316..b32d747 100644 --- a/codec/mpegts/mpegts_crc32.go +++ b/codec/mpegts/mpegts_crc32.go @@ -1,5 +1,7 @@ package mpegts +import "net" + // http://www.stmc.edu.hk/~vincent/ffmpeg_0.4.9-pre1/libavformat/mpegtsenc.c var Crc32_Table = []uint32{ @@ -58,3 +60,13 @@ func GetCRC32(data []byte) (crc uint32) { return } + +func GetCRC32_2(data net.Buffers) (crc uint32) { + crc = 0xffffffff + for _, v := range data { + for _, v2 := range v { + crc = (crc << 8) ^ Crc32_Table[((crc>>24)^uint32(v2))&0xff] + } + } + return +} diff --git a/codec/mpegts/mpegts_pes.go b/codec/mpegts/mpegts_pes.go index 84ba074..81b7f3e 100644 --- a/codec/mpegts/mpegts_pes.go +++ b/codec/mpegts/mpegts_pes.go @@ -7,7 +7,6 @@ import ( "io" "io/ioutil" - "m7s.live/engine/v4/codec" "m7s.live/engine/v4/util" ) @@ -470,126 +469,126 @@ func WritePESPacket(w io.Writer, frame *MpegtsPESFrame, packet MpegTsPESPacket) return } -func IowWritePESPacket(w io.Writer, tsHeader MpegTsHeader, packet MpegTsPESPacket) (err error) { - if packet.Header.PacketStartCodePrefix != 0x000001 { - return errors.New("packetStartCodePrefix != 0x000001") - } +// func IowWritePESPacket(w io.Writer, tsHeader MpegTsHeader, packet MpegTsPESPacket) (err error) { +// if packet.Header.PacketStartCodePrefix != 0x000001 { +// return errors.New("packetStartCodePrefix != 0x000001") +// } - bw := &bytes.Buffer{} +// bw := &bytes.Buffer{} - // TODO:如果头长度大于65536,字段会为0,是否要改? - _, err = WritePESHeader(bw, packet.Header) - if err != nil { - return - } +// // TODO:如果头长度大于65536,字段会为0,是否要改? +// _, err = WritePESHeader(bw, packet.Header) +// if err != nil { +// return +// } - PESPacket := &util.IOVec{} - PESPacket.Append(bw.Bytes()) // header - PESPacket.Append(packet.Payload) // packet +// PESPacket := &util.IOVec{} +// PESPacket.Append(bw.Bytes()) // header +// PESPacket.Append(packet.Payload) // packet - // 用IOVecWriter来写PES包,IOVecWriter实现了Write方法. - // 因为通常在将一帧PES封装成TS包(188字节)的时候,一般情况下一帧PES字节数会大于188,并且分多次封装. - // 例如这一帧PES字节数为189,那么在封装第二个TS包的时候就只会封装1字节,会导致多次写操作,降低性能. - // 因此将所有的字节数,都写进缓冲中去,然后用系统调用syscall来写入. - iow := util.NewIOVecWriter(w) +// // 用IOVecWriter来写PES包,IOVecWriter实现了Write方法. +// // 因为通常在将一帧PES封装成TS包(188字节)的时候,一般情况下一帧PES字节数会大于188,并且分多次封装. +// // 例如这一帧PES字节数为189,那么在封装第二个TS包的时候就只会封装1字节,会导致多次写操作,降低性能. +// // 因此将所有的字节数,都写进缓冲中去,然后用系统调用syscall来写入. +// iow := util.NewIOVecWriter(w) - var isKeyFrame bool - var headerLength int +// var isKeyFrame bool +// var headerLength int - isKeyFrame = CheckPESPacketIsKeyFrame(packet) +// isKeyFrame = CheckPESPacketIsKeyFrame(packet) - // 写一帧PES - // 如果是I帧,会有pcr,所以会有调整字段AF. - // 如果当前包字节不满188字节,会需要填充0xff,所以会有调整字段AF. - for i := 0; PESPacket.Length > 0; i++ { +// // 写一帧PES +// // 如果是I帧,会有pcr,所以会有调整字段AF. +// // 如果当前包字节不满188字节,会需要填充0xff,所以会有调整字段AF. +// for i := 0; PESPacket.Length > 0; i++ { - header := MpegTsHeader{ - SyncByte: 0x47, - Pid: tsHeader.Pid, - AdaptionFieldControl: 1, - ContinuityCounter: byte(i % 15), - } +// header := MpegTsHeader{ +// SyncByte: 0x47, +// Pid: tsHeader.Pid, +// AdaptionFieldControl: 1, +// ContinuityCounter: byte(i % 15), +// } - // 每一帧开头 - if i == 0 { - header.PayloadUnitStartIndicator = 1 - } +// // 每一帧开头 +// if i == 0 { +// header.PayloadUnitStartIndicator = 1 +// } - // I帧 - if isKeyFrame { - header.AdaptionFieldControl = 0x03 - header.AdaptationFieldLength = 7 - header.PCRFlag = 1 - header.RandomAccessIndicator = tsHeader.RandomAccessIndicator - header.ProgramClockReferenceBase = tsHeader.ProgramClockReferenceBase - header.ProgramClockReferenceExtension = tsHeader.ProgramClockReferenceExtension +// // I帧 +// if isKeyFrame { +// header.AdaptionFieldControl = 0x03 +// header.AdaptationFieldLength = 7 +// header.PCRFlag = 1 +// header.RandomAccessIndicator = tsHeader.RandomAccessIndicator +// header.ProgramClockReferenceBase = tsHeader.ProgramClockReferenceBase +// header.ProgramClockReferenceExtension = tsHeader.ProgramClockReferenceExtension - isKeyFrame = false - } +// isKeyFrame = false +// } - // 这个包大小,会在每一次PESPacket.WriteTo中慢慢减少. - packetLength := PESPacket.Length +// // 这个包大小,会在每一次PESPacket.WriteTo中慢慢减少. +// packetLength := PESPacket.Length - // 包不满188字节 - if packetLength < TS_PACKET_SIZE-4 { +// // 包不满188字节 +// if packetLength < TS_PACKET_SIZE-4 { - if header.AdaptionFieldControl >= 2 { - header.AdaptationFieldLength = uint8(TS_PACKET_SIZE - 4 - 1 - packetLength - 7) - } else { - header.AdaptionFieldControl = 0x03 - header.AdaptationFieldLength = uint8(TS_PACKET_SIZE - 4 - 1 - packetLength) - } +// if header.AdaptionFieldControl >= 2 { +// header.AdaptationFieldLength = uint8(TS_PACKET_SIZE - 4 - 1 - packetLength - 7) +// } else { +// header.AdaptionFieldControl = 0x03 +// header.AdaptationFieldLength = uint8(TS_PACKET_SIZE - 4 - 1 - packetLength) +// } - headerLength, err = WriteTsHeader(iow, header) - if err != nil { - return - } +// headerLength, err = WriteTsHeader(iow, header) +// if err != nil { +// return +// } - stuffingLength := int(header.AdaptationFieldLength - 1) - if _, err = iow.Write(util.GetFillBytes(0xff, stuffingLength)); err != nil { - return - } +// stuffingLength := int(header.AdaptationFieldLength - 1) +// if _, err = iow.Write(util.GetFillBytes(0xff, stuffingLength)); err != nil { +// return +// } - headerLength += stuffingLength +// headerLength += stuffingLength - } else { - headerLength, err = WriteTsHeader(iow, header) - if err != nil { - return - } - } +// } else { +// headerLength, err = WriteTsHeader(iow, header) +// if err != nil { +// return +// } +// } - /* - if headerLength, err = writeTsHeader(iow, header, packetLength); err != nil { - return - } - */ +// /* +// if headerLength, err = writeTsHeader(iow, header, packetLength); err != nil { +// return +// } +// */ - payloadLength := 188 - headerLength +// payloadLength := 188 - headerLength - // 写PES负载 - if _, err = PESPacket.WriteTo(iow, payloadLength); err != nil { - return - } - } +// // 写PES负载 +// if _, err = PESPacket.WriteTo(iow, payloadLength); err != nil { +// return +// } +// } - iow.Flush() +// iow.Flush() - return -} +// return +// } -func CheckPESPacketIsKeyFrame(packet MpegTsPESPacket) bool { +// func CheckPESPacketIsKeyFrame(packet MpegTsPESPacket) bool { - nalus := bytes.SplitN(packet.Payload, codec.NALU_Delimiter1, -1) +// nalus := bytes.SplitN(packet.Payload, codec.NALU_Delimiter1, -1) - for _, v := range nalus { - if codec.H264NALUType.ParseBytes(codec.NALU_IDR_Picture, v) == codec.NALU_IDR_Picture { - return true - } - } +// for _, v := range nalus { +// if codec.H264NALUType.ParseBytes(codec.NALU_IDR_Picture, v) == codec.NALU_IDR_Picture { +// return true +// } +// } - return false -} +// return false +// } func TsToPES(tsPkts []MpegTsPacket) (pesPkt MpegTsPESPacket, err error) { var index int @@ -712,7 +711,7 @@ func PESToTs(frame *MpegtsPESFrame, packet MpegTsPESPacket) (tsPkts []byte, err } if tsStuffingLength > 0 { - if _, err = bwTsHeader.Write(util.GetFillBytes(0xff, int(tsStuffingLength))); err != nil { + if _, err = bwTsHeader.Write(stuffing[:tsStuffingLength]); err != nil { return } } diff --git a/codec/mpegts/mpegts_pmt.go b/codec/mpegts/mpegts_pmt.go index 75d59a0..db45bf8 100644 --- a/codec/mpegts/mpegts_pmt.go +++ b/codec/mpegts/mpegts_pmt.go @@ -2,74 +2,31 @@ package mpegts import ( "bytes" - "errors" - "fmt" "io" + "net" + "m7s.live/engine/v4/codec" "m7s.live/engine/v4/util" ) -// Stuffing 157 bytes -var stuffing = []byte{ - 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, - 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, - 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, - 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, - 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, - 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, - 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, - 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, - 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, - 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, - 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, - 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, - 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, - 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, - 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, - 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, -} - // ios13818-1-CN.pdf 46(60)-153(167)/page // // PMT -var H264PMTPacket = []byte{ - // TS Header - 0x47, 0x41, 0x00, 0x10, - // Pointer Field - 0x00, - // PSI - 0x02, 0xb0, 0x17, 0x00, 0x01, 0xc1, 0x00, 0x00, - // PMT - 0xe1, 0x00, - 0xf0, 0x00, - // H264 - STREAM_TYPE_H264, 0xe1, 0x01, 0xf0, 0x00, - STREAM_TYPE_AAC, 0xe1, 0x02, 0xf0, 0x00, -} - -var H265PMTPacket = []byte{ - /* TS */ - 0x47, 0x41, 0x00, 0x10, - 0x00, - /* PSI */ - 0x02, 0xb0, 0x17, 0x00, 0x01, 0xc1, 0x00, 0x00, - /* PMT */ - 0xe1, 0x00, - 0xf0, 0x00, - STREAM_TYPE_H265, 0xe1, 0x01, 0xf0, 0x00, - STREAM_TYPE_AAC, 0xe1, 0x02, 0xf0, 0x00, -} - +var ( + TSHeader = []byte{0x47, 0x40, 0x00, 0x10, 0x00} + PSI = []byte{0x02, 0xb0, 0x17, 0x00, 0x01, 0xc1, 0x00, 0x00} + PMT = []byte{0xe1, 0x01, 0xf0, 0x00} //PID:0x100 + h264 = []byte{STREAM_TYPE_H264, 0xe1, 0x01, 0xf0, 0x00} + h265 = []byte{STREAM_TYPE_H265, 0xe1, 0x01, 0xf0, 0x00} + aac = []byte{STREAM_TYPE_AAC, 0xe1, 0x02, 0xf0, 0x00} + pcma = []byte{STREAM_TYPE_G711A, 0xe1, 0x02, 0xf0, 0x00} + pcmu = []byte{STREAM_TYPE_G711U, 0xe1, 0x02, 0xf0, 0x00} + stuffing []byte +) func init() { - crc := make([]byte, 4) - util.PutBE(crc, GetCRC32(H264PMTPacket[5:])) - H264PMTPacket = append(H264PMTPacket, crc...) - H264PMTPacket = append(H264PMTPacket, stuffing...) - util.PutBE(crc, GetCRC32(H265PMTPacket[5:])) - H265PMTPacket = append(H265PMTPacket, crc...) - H265PMTPacket = append(H265PMTPacket, stuffing...) + stuffing = util.GetFillBytes(0xff, TS_PACKET_SIZE) } // TS Header : @@ -349,35 +306,64 @@ func WritePMT(w io.Writer, pmt MpegTsPMT) (err error) { return } -func WritePMTPacket(w io.Writer, tsHeader []byte, pmt MpegTsPMT) (err error) { - if pmt.TableID != TABLE_TSPMS { - err = errors.New("PMT table ID error") - return +// func WritePMTPacket(w io.Writer, tsHeader []byte, pmt MpegTsPMT) (err error) { +// if pmt.TableID != TABLE_TSPMS { +// err = errors.New("PMT table ID error") +// return +// } + +// // 将所有要写的数据(PMT),全部放入到buffer中去. +// // buffer 里面已经写好了整个PMT表(PointerField+PSI+PMT+CRC) +// bw := &bytes.Buffer{} +// if err = WritePMT(bw, pmt); err != nil { +// return +// } + +// // TODO:如果Pmt.Stream里面包含的信息很大,大于188? +// stuffingBytes := util.GetFillBytes(0xff, TS_PACKET_SIZE-4-bw.Len()) + +// var PMTPacket []byte +// PMTPacket = append(PMTPacket, tsHeader...) +// PMTPacket = append(PMTPacket, bw.Bytes()...) +// PMTPacket = append(PMTPacket, stuffingBytes...) + +// fmt.Println("-------------------------") +// fmt.Println("Write PMT :", PMTPacket) +// fmt.Println("-------------------------") + +// // 写PMT负载 +// if _, err = w.Write(PMTPacket); err != nil { +// return +// } + +// return +// } + +func WritePMTPacket(w io.Writer, videoCodec codec.VideoCodecID, audioCodec codec.AudioCodecID) { + w.Write(TSHeader) + pmt := net.Buffers{PSI, PMT} + pmtlen := len(PSI) + len(PMT) + switch videoCodec { + case codec.CodecID_H264: + pmt = append(pmt, h264) + pmtlen += 5 + case codec.CodecID_H265: + pmt = append(pmt, h265) + pmtlen += 5 } - - // 将所有要写的数据(PMT),全部放入到buffer中去. - // buffer 里面已经写好了整个PMT表(PointerField+PSI+PMT+CRC) - bw := &bytes.Buffer{} - if err = WritePMT(bw, pmt); err != nil { - return + switch audioCodec { + case codec.CodecID_AAC: + pmt = append(pmt, aac) + pmtlen += 5 + case codec.CodecID_PCMA: + pmt = append(pmt, pcma) + pmtlen += 5 + case codec.CodecID_PCMU: + pmt = append(pmt, pcmu) + pmtlen += 5 } - - // TODO:如果Pmt.Stream里面包含的信息很大,大于188? - stuffingBytes := util.GetFillBytes(0xff, TS_PACKET_SIZE-4-bw.Len()) - - var PMTPacket []byte - PMTPacket = append(PMTPacket, tsHeader...) - PMTPacket = append(PMTPacket, bw.Bytes()...) - PMTPacket = append(PMTPacket, stuffingBytes...) - - fmt.Println("-------------------------") - fmt.Println("Write PMT :", PMTPacket) - fmt.Println("-------------------------") - - // 写PMT负载 - if _, err = w.Write(PMTPacket); err != nil { - return - } - - return + crc := make([]byte, 4) + util.PutBE(crc, GetCRC32_2(pmt)) + pmt = append(pmt, crc, stuffing[:TS_PACKET_SIZE-pmtlen-5-4]) + pmt.WriteTo(w) } diff --git a/common/frame.go b/common/frame.go index 0fc1743..a00b095 100644 --- a/common/frame.go +++ b/common/frame.go @@ -114,6 +114,7 @@ type DataFrame[T any] struct { type AVFrame[T RawSlice] struct { BaseFrame IFrame bool + SEI T PTS uint32 DTS uint32 AVCC net.Buffers `json:"-"` // 打包好的AVCC格式 diff --git a/common/index.go b/common/index.go index ea93b28..76b2fc2 100644 --- a/common/index.go +++ b/common/index.go @@ -15,7 +15,7 @@ type TimelineData[T any] struct { type Base struct { Name string Stream IStream `json:"-"` - Attached byte // 0代表准备好后自动attach,1代表已经attach,2代表已经detach + Attached byte // 0代表准备好后自动attach,1代表已经attach,2代表已经detach ts time.Time bytes int frames int @@ -90,4 +90,5 @@ type AudioTrack interface { PreFrame() *AVFrame[AudioSlice] WriteSlice(AudioSlice) WriteADTS([]byte) + WriteRaw(uint32, AudioSlice) } diff --git a/publisher.go b/publisher.go index ee7de32..d380ea7 100644 --- a/publisher.go +++ b/publisher.go @@ -170,6 +170,14 @@ func (t *TSPublisher) OnPmtStream(s mpegts.MpegTsPmtStream) { if t.AudioTrack == nil { t.AudioTrack = track.NewAAC(t.Publisher.Stream) } + case mpegts.STREAM_TYPE_G711A: + if t.AudioTrack == nil { + t.AudioTrack = track.NewG711(t.Publisher.Stream, true) + } + case mpegts.STREAM_TYPE_G711U: + if t.AudioTrack == nil { + t.AudioTrack = track.NewG711(t.Publisher.Stream, false) + } default: t.Warn("unsupport stream type:", zap.Uint8("type", s.StreamType)) } @@ -182,28 +190,34 @@ func (t *TSPublisher) OnPES(pes mpegts.MpegTsPESPacket) { switch pes.Header.StreamID & 0xF0 { case mpegts.STREAM_ID_AUDIO: if t.AudioTrack != nil { - if t.adts == nil { - t.adts = append(t.adts, pes.Payload[:7]...) - t.AudioTrack.WriteADTS(t.adts) - } - current := t.AudioTrack.CurrentFrame() - current.PTS = uint32(pes.Header.Pts) - current.DTS = uint32(pes.Header.Dts) - remainLen := len(pes.Payload) - current.BytesIn += remainLen - for remainLen > 0 { - // AACFrameLength(13) - // xx xxxxxxxx xxx - frameLen := (int(pes.Payload[3]&3) << 11) | (int(pes.Payload[4]) << 3) | (int(pes.Payload[5]) >> 5) - if frameLen > remainLen { - break + switch t.AudioTrack.(type) { + case *track.AAC: + if t.adts == nil { + t.adts = append(t.adts, pes.Payload[:7]...) + t.AudioTrack.WriteADTS(t.adts) } + current := t.AudioTrack.CurrentFrame() + current.PTS = uint32(pes.Header.Pts) + current.DTS = uint32(pes.Header.Dts) + remainLen := len(pes.Payload) + current.BytesIn += remainLen + for remainLen > 0 { + // AACFrameLength(13) + // xx xxxxxxxx xxx + frameLen := (int(pes.Payload[3]&3) << 11) | (int(pes.Payload[4]) << 3) | (int(pes.Payload[5]) >> 5) + if frameLen > remainLen { + break + } - t.AudioTrack.WriteSlice(pes.Payload[7:frameLen]) - pes.Payload = pes.Payload[frameLen:remainLen] - remainLen -= frameLen - t.AudioTrack.Flush() + t.AudioTrack.WriteSlice(pes.Payload[7:frameLen]) + pes.Payload = pes.Payload[frameLen:remainLen] + remainLen -= frameLen + t.AudioTrack.Flush() + } + case *track.G711: + t.AudioTrack.WriteRaw(uint32(pes.Header.Pts), pes.Payload) } + } case mpegts.STREAM_ID_VIDEO: if t.VideoTrack != nil { diff --git a/stream.go b/stream.go index 301407c..3ed4bc4 100644 --- a/stream.go +++ b/stream.go @@ -482,7 +482,7 @@ func (s *Stream) run() { if dts := io.Args.Get("dts"); dts != "" { waits.data.Wait(strings.Split(dts, ",")...) } else { - waits.data.Wait() + // waits.data.Wait() } io.Stream = s io.StartTime = time.Now() diff --git a/subscriber.go b/subscriber.go index 0099277..1ffee56 100644 --- a/subscriber.go +++ b/subscriber.go @@ -46,8 +46,11 @@ func (f FLVFrame) WriteTo(w io.Writer) (int64, error) { func copyBuffers(b net.Buffers) (r net.Buffers) { return append(r, b...) } - func (v *VideoFrame) GetAnnexB() (r net.Buffers) { + if v.SEI != nil { + r = append(r, codec.NALU_Delimiter2) + r = append(r, v.SEI...) + } r = append(r, codec.NALU_Delimiter2) for i, nalu := range v.Raw { if i > 0 { @@ -57,7 +60,6 @@ func (v *VideoFrame) GetAnnexB() (r net.Buffers) { } return } - func (v VideoDeConf) GetAnnexB() (r net.Buffers) { for _, nalu := range v.Raw { r = append(r, codec.NALU_Delimiter2, nalu) diff --git a/track/audio.go b/track/audio.go index 8094bdb..cd03df5 100644 --- a/track/audio.go +++ b/track/audio.go @@ -85,6 +85,18 @@ func (a *Audio) WriteADTS(adts []byte) { a.Attach() } +func (av *Audio) WriteRaw(pts uint32, raw AudioSlice) { + curValue := &av.Value + curValue.BytesIn += len(raw) + if len(av.AVCCHead) == 2 { + raw = raw[7:] //AAC 去掉7个字节的ADTS头 + } + av.WriteSlice(raw) + curValue.DTS = pts + curValue.PTS = pts + av.Flush() +} + func (av *Audio) WriteAVCC(ts uint32, frame AVCCFrame) { curValue := &av.AVRing.RingBuffer.Value curValue.BytesIn += len(frame) diff --git a/track/base.go b/track/base.go index cde9c81..129b019 100644 --- a/track/base.go +++ b/track/base.go @@ -4,6 +4,7 @@ import ( "context" "net" "time" + "unsafe" . "m7s.live/engine/v4/common" "m7s.live/engine/v4/config" @@ -45,6 +46,7 @@ type Media[T RawSlice] struct { Base AVRing[T] SampleRate uint32 + SSRC uint32 DecoderConfiguration DecoderConfiguration[T] `json:"-"` //H264(SPS、PPS) H265(VPS、SPS、PPS) AAC(config) // util.BytesPool //无锁内存池,用于发布者(在同一个协程中)复用小块的内存,通常是解包时需要临时使用 RTPMuxer @@ -52,6 +54,11 @@ type Media[T RawSlice] struct { 流速控制 } +func (av *Media[T]) Init(n int) { + av.AVRing.Init(n) + av.SSRC = uint32(uintptr(unsafe.Pointer(av))) +} + func (av *Media[T]) LastWriteTime() time.Time { return av.AVRing.RingBuffer.LastValue.Timestamp } @@ -144,7 +151,7 @@ func (av *Media[T]) PacketizeRTP(payloads ...net.Buffers) { packet.Version = 2 packet.PayloadType = av.DecoderConfiguration.PayloadType packet.Payload = make([]byte, 0, 1200) - packet.SSRC = av.Stream.SSRC() + packet.SSRC = av.SSRC } packet.Payload = packet.Payload[:0] packet.SequenceNumber = av.rtpSequence diff --git a/track/h264.go b/track/h264.go index 700ad1c..f3fa03a 100644 --- a/track/h264.go +++ b/track/h264.go @@ -66,17 +66,16 @@ func (vt *H264) WriteSlice(slice NALUSlice) { vt.Video.DecoderConfiguration.FLV = codec.VideoAVCC2FLV(vt.Video.DecoderConfiguration.AVCC, 0) vt.Video.DecoderConfiguration.Seq++ case codec.NALU_IDR_Picture: - vt.Video.Media.RingBuffer.Value.IFrame = true - if vt.sei != nil { - vt.Video.WriteSlice(vt.sei) - vt.sei = nil - } + vt.Value.IFrame = true vt.Video.WriteSlice(slice) - case codec.NALU_Non_IDR_Picture: - vt.Video.Media.RingBuffer.Value.IFrame = false + case codec.NALU_Non_IDR_Picture, + codec.NALU_Data_Partition_A, + codec.NALU_Data_Partition_B, + codec.NALU_Data_Partition_C: + vt.Value.IFrame = false vt.Video.WriteSlice(slice) case codec.NALU_SEI: - vt.sei = slice + vt.Value.SEI = slice } } @@ -164,9 +163,8 @@ func (vt *H264) Flush() { if vt.ComplementRTP() { var out []net.Buffers if vt.Value.IFrame { - out = append(out, net.Buffers(vt.DecoderConfiguration.Raw)) + out = append(out, net.Buffers{vt.DecoderConfiguration.Raw[0]}, net.Buffers{vt.DecoderConfiguration.Raw[1]}) } - for _, nalu := range vt.Value.Raw { buffers := util.SplitBuffers(nalu, 1200) firstBuffer := NALUSlice(buffers[0]) diff --git a/track/h265.go b/track/h265.go index e0c8609..1d79840 100644 --- a/track/h265.go +++ b/track/h265.go @@ -66,16 +66,12 @@ func (vt *H265) WriteSlice(slice NALUSlice) { codec.NAL_UNIT_CODED_SLICE_IDR_N_LP, codec.NAL_UNIT_CODED_SLICE_CRA: vt.Value.IFrame = true - if vt.sei != nil { - vt.Video.WriteSlice(vt.sei) - vt.sei = nil - } vt.Video.WriteSlice(slice) case 0, 1, 2, 3, 4, 5, 6, 7, 8, 9: vt.Value.IFrame = false vt.Video.WriteSlice(slice) case codec.NAL_UNIT_SEI: - vt.sei = slice + vt.Value.SEI = slice default: vt.Video.Stream.Warn("h265 slice type not supported", zap.Uint("type", uint(slice.H265Type()))) } @@ -172,7 +168,7 @@ func (vt *H265) Flush() { if vt.ComplementRTP() { var out []net.Buffers if vt.Value.IFrame { - out = append(out, net.Buffers(vt.DecoderConfiguration.Raw)) + out = append(out, net.Buffers{vt.DecoderConfiguration.Raw[0]}, net.Buffers{vt.DecoderConfiguration.Raw[1]}, net.Buffers{vt.DecoderConfiguration.Raw[2]}) } for _, nalu := range vt.Video.Media.RingBuffer.Value.Raw { buffers := util.SplitBuffers(nalu, 1200) diff --git a/track/video.go b/track/video.go index d43b86a..e642135 100644 --- a/track/video.go +++ b/track/video.go @@ -22,7 +22,6 @@ type Video struct { idrCount int //缓存中包含的idr数量 dcChanged bool //解码器配置是否改变了,一般由于变码率导致 dtsEst *DTSEstimator - sei NALUSlice } func (vt *Video) SnapForJson() { @@ -72,6 +71,10 @@ func (vt *Video) PlayFullAnnexB(ctx context.Context, onMedia func(net.Buffers) e data = append(data, codec.NALU_Delimiter2, nalu) } } + if vp.SEI != nil { + data = append(data, codec.NALU_Delimiter2) + data = append(data, vp.SEI...) + } data = append(data, codec.NALU_Delimiter2) for i, nalu := range vp.Raw { if i > 0 {