修复订阅时如果无音频时首个视频帧没有被订阅的bug

引入v3版的ps解包逻辑
修复非rtmp系的首个绝对时间戳初始化
This commit is contained in:
dexter
2023-03-07 20:26:56 +08:00
parent ff53746419
commit e92fd1edfd
16 changed files with 854 additions and 43 deletions

112
codec/mpegps/buffer.go Normal file
View File

@@ -0,0 +1,112 @@
package mpegps
import (
"encoding/binary"
"errors"
"io"
)
type IOBuffer struct {
buf []byte // contents are the bytes buf[off : len(buf)]
off int // read at &buf[off], write at &buf[len(buf)]
}
func (b *IOBuffer) Next(n int) []byte {
m := b.Len()
if n > m {
n = m
}
data := b.buf[b.off : b.off+n]
b.off += n
return data
}
func (b *IOBuffer) Uint16() (uint16, error) {
if b.Len() > 1 {
return binary.BigEndian.Uint16(b.Next(2)), nil
}
return 0, io.EOF
}
func (b *IOBuffer) Skip(n int) (err error) {
_, err = b.ReadN(n)
return
}
func (b *IOBuffer) Uint32() (uint32, error) {
if b.Len() > 3 {
return binary.BigEndian.Uint32(b.Next(4)), nil
}
return 0, io.EOF
}
func (b *IOBuffer) ReadN(length int) ([]byte, error) {
if b.Len() >= length {
return b.Next(length), nil
}
return nil, io.EOF
}
//func (b *IOBuffer) Read(buf []byte) (n int, err error) {
// var ret []byte
// ret, err = b.ReadN(len(buf))
// copy(buf, ret)
// return len(ret), err
//}
// empty reports whether the unread portion of the buffer is empty.
func (b *IOBuffer) empty() bool { return b.Len() <= b.off }
func (b *IOBuffer) ReadByte() (byte, error) {
if b.empty() {
// Buffer is empty, reset to recover space.
b.Reset()
return 0, io.EOF
}
c := b.buf[b.off]
b.off++
return c, nil
}
func (b *IOBuffer) Reset() {
b.buf = b.buf[:0]
b.off = 0
}
func (b *IOBuffer) Len() int { return len(b.buf) - b.off }
// tryGrowByReslice is a inlineable version of grow for the fast-case where the
// internal buffer only needs to be resliced.
// It returns the index where bytes should be written and whether it succeeded.
func (b *IOBuffer) tryGrowByReslice(n int) (int, bool) {
if l := len(b.buf); n <= cap(b.buf)-l {
b.buf = b.buf[:l+n]
return l, true
}
return 0, false
}
var ErrTooLarge = errors.New("IOBuffer: too large")
func (b *IOBuffer) Write(p []byte) (n int, err error) {
l := copy(b.buf, b.buf[b.off:])
b.buf = append(b.buf[:l], p...)
b.off = 0
// println(b.buf, b.off, b.buf[b.off], b.buf[b.off+1], b.buf[b.off+2], b.buf[b.off+3])
return len(p), nil
// defer func() {
// if recover() != nil {
// panic(ErrTooLarge)
// }
// }()
// l := len(p)
// oldLen := len(b.buf)
// m, ok := b.tryGrowByReslice(l)
// if !ok {
// m = oldLen - b.off
// buf := append(append(([]byte)(nil), b.buf[b.off:]...), p...)
// b.off = 0
// b.buf = buf
// }
// return copy(b.buf[m:], p), nil
}

250
codec/mpegps/demuxer_v3.go Normal file
View File

@@ -0,0 +1,250 @@
package mpegps
import (
"errors"
"m7s.live/engine/v4/util"
)
var (
ErrNotFoundStartCode = errors.New("not found the need start code flag")
ErrMarkerBit = errors.New("marker bit value error")
ErrFormatPack = errors.New("not package standard")
ErrParsePakcet = errors.New("parse ps packet error")
)
/*
This implement from VLC source code
notes: https://github.com/videolan/vlc/blob/master/modules/mux/mpeg/bits.h
*/
/*
https://github.com/videolan/vlc/blob/master/modules/demux/mpeg
*/
type DecPSPackage struct {
systemClockReferenceBase uint64
systemClockReferenceExtension uint64
programMuxRate uint32
IOBuffer
Payload []byte
PTS uint32
DTS uint32
EsHandler
audio MpegPsEsStream
video MpegPsEsStream
}
func (dec *DecPSPackage) clean() {
dec.systemClockReferenceBase = 0
dec.systemClockReferenceExtension = 0
dec.programMuxRate = 0
dec.Payload = nil
dec.PTS = 0
dec.DTS = 0
}
func (dec *DecPSPackage) ReadPayload() (payload []byte, err error) {
payloadlen, err := dec.Uint16()
if err != nil {
return
}
return dec.ReadN(int(payloadlen))
}
func (dec *DecPSPackage) Feed(ps []byte) {
if len(ps) >= 4 && util.BigEndian.Uint32(ps) == StartCodePS {
if dec.Len() > 0 {
dec.Skip(4)
dec.Read(0)
dec.Reset()
}
dec.Write(ps)
} else if dec.Len() > 0 {
dec.Write(ps)
}
}
// read the buffer and push video or audio
func (dec *DecPSPackage) Read(ts uint32) error {
again:
dec.clean()
if err := dec.Skip(9); err != nil {
return err
}
psl, err := dec.ReadByte()
if err != nil {
return err
}
psl &= 0x07
if err = dec.Skip(int(psl)); err != nil {
return err
}
var video []byte
var nextStartCode, videoTs, videoCts uint32
loop:
for err == nil {
if nextStartCode, err = dec.Uint32(); err != nil {
break
}
switch nextStartCode {
case StartCodeSYS:
dec.ReadPayload()
//err = dec.decSystemHeader()
case StartCodeMAP:
err = dec.decProgramStreamMap()
case StartCodeVideo:
// var cts uint32
if err = dec.decPESPacket(); err == nil {
if len(video) == 0 {
dec.video.PTS = dec.PTS
dec.video.DTS = dec.DTS
// if dec.PTS == 0 {
// dec.PTS = ts
// }
// if dec.DTS != 0 {
// cts = dec.PTS - dec.DTS
// } else {
// dec.DTS = dec.PTS
// }
// videoTs = dec.DTS / 90
// videoCts = cts / 90
}
video = append(video, dec.Payload...)
} else {
// utils.Println("video", err)
}
case StartCodeAudio:
if err = dec.decPESPacket(); err == nil {
// ts := ts / 90
// if dec.PTS != 0 {
// ts = dec.PTS / 90
// }
dec.audio.PTS = dec.PTS
dec.audio.Buffer = dec.Payload
dec.ReceiveAudio(dec.audio)
// pusher.PushAudio(ts, dec.Payload)
} else {
// utils.Println("audio", err)
}
case StartCodePS:
break loop
default:
dec.ReadPayload()
}
}
if len(video) > 0 {
dec.video.Buffer = video
dec.ReceiveVideo(dec.video)
if false {
println("video", videoTs, videoCts, len(video))
}
// pusher.PushVideo(videoTs, videoCts, video)
}
if nextStartCode == StartCodePS {
// utils.Println(aurora.Red("StartCodePS recursion..."), err)
goto again
}
return err
}
/*
func (dec *DecPSPackage) decSystemHeader() error {
syslens, err := dec.Uint16()
if err != nil {
return err
}
// drop rate video audio bound and lock flag
syslens -= 6
if err = dec.Skip(6); err != nil {
return err
}
// ONE WAY: do not to parse the stream and skip the buffer
//br.Skip(syslen * 8)
// TWO WAY: parse every stream info
for syslens > 0 {
if nextbits, err := dec.Uint8(); err != nil {
return err
} else if (nextbits&0x80)>>7 != 1 {
break
}
if err = dec.Skip(2); err != nil {
return err
}
syslens -= 3
}
return nil
}
*/
func (dec *DecPSPackage) decProgramStreamMap() error {
psm, err := dec.ReadPayload()
if err != nil {
return err
}
l := len(psm)
index := 2
programStreamInfoLen := util.BigEndian.Uint16(psm[index:])
index += 2
index += int(programStreamInfoLen)
programStreamMapLen := util.BigEndian.Uint16(psm[index:])
index += 2
for programStreamMapLen > 0 {
if l <= index+1 {
break
}
streamType := psm[index]
index++
elementaryStreamID := psm[index]
index++
if elementaryStreamID >= 0xe0 && elementaryStreamID <= 0xef {
dec.video.Type = streamType
} else if elementaryStreamID >= 0xc0 && elementaryStreamID <= 0xdf {
dec.audio.Type = streamType
}
if l <= index+1 {
break
}
elementaryStreamInfoLength := util.BigEndian.Uint16(psm[index:])
index += 2
index += int(elementaryStreamInfoLength)
programStreamMapLen -= 4 + elementaryStreamInfoLength
}
return nil
}
func (dec *DecPSPackage) decPESPacket() error {
payload, err := dec.ReadPayload()
if err != nil {
return err
}
if len(payload) < 4 {
return errors.New("not enough data")
}
//data_alignment_indicator := (payload[0]&0b0001_0000)>>4 == 1
flag := payload[1]
ptsFlag := flag>>7 == 1
dtsFlag := (flag&0b0100_0000)>>6 == 1
var pts, dts uint32
pesHeaderDataLen := payload[2]
payload = payload[3:]
extraData := payload[:pesHeaderDataLen]
if ptsFlag && len(extraData) > 4 {
pts = uint32(extraData[0]&0b0000_1110) << 29
pts += uint32(extraData[1]) << 22
pts += uint32(extraData[2]&0b1111_1110) << 14
pts += uint32(extraData[3]) << 7
pts += uint32(extraData[4]) >> 1
if dtsFlag && len(extraData) > 9 {
dts = uint32(extraData[5]&0b0000_1110) << 29
dts += uint32(extraData[6]) << 22
dts += uint32(extraData[7]&0b1111_1110) << 14
dts += uint32(extraData[8]) << 7
dts += uint32(extraData[9]) >> 1
}
}
dec.PTS = pts
dec.DTS = dts
dec.Payload = payload[pesHeaderDataLen:]
return err
}

283
codec/mpegps/ps-demuxer.go Normal file
View File

@@ -0,0 +1,283 @@
package mpegps
import (
"io"
"github.com/yapingcat/gomedia/go-codec"
"github.com/yapingcat/gomedia/go-mpeg2"
)
type psstream struct {
sid uint8
cid mpeg2.PS_STREAM_TYPE
pts uint64
dts uint64
streamBuf []byte
}
func newpsstream(sid uint8, cid mpeg2.PS_STREAM_TYPE) *psstream {
return &psstream{
sid: sid,
cid: cid,
streamBuf: make([]byte, 0, 4096),
}
}
type PSDemuxer struct {
streamMap map[uint8]*psstream
pkg *mpeg2.PSPacket
mpeg1 bool
cache []byte
OnFrame func(frame []byte, cid mpeg2.PS_STREAM_TYPE, pts uint64, dts uint64)
//解ps包过程中解码回调psmsystem headerpes包等
//decodeResult 解码ps包时的产生的错误
//这个回调主要用于debug查看是否ps包存在问题
OnPacket func(pkg mpeg2.Display, decodeResult error)
}
func NewPSDemuxer() *PSDemuxer {
return &PSDemuxer{
streamMap: make(map[uint8]*psstream),
pkg: new(mpeg2.PSPacket),
cache: make([]byte, 0, 256),
OnFrame: nil,
OnPacket: nil,
}
}
func (psdemuxer *PSDemuxer) Feed(data []byte) error {
var bs *codec.BitStream
if len(psdemuxer.cache) > 0 {
psdemuxer.cache = append(psdemuxer.cache, data...)
bs = codec.NewBitStream(psdemuxer.cache)
} else {
bs = codec.NewBitStream(data)
}
saveReseved := func() {
tmpcache := make([]byte, bs.RemainBytes())
copy(tmpcache, bs.RemainData())
psdemuxer.cache = tmpcache
}
var ret error = nil
for !bs.EOS() {
if mpegerr, ok := ret.(mpeg2.Error); ok {
if mpegerr.NeedMore() {
saveReseved()
}
break
}
if bs.RemainBits() < 32 {
ret = io.ErrShortBuffer
saveReseved()
break
}
prefix_code := bs.NextBits(32)
switch prefix_code {
case 0x000001BA: //pack header
if psdemuxer.pkg.Header == nil {
psdemuxer.pkg.Header = new(mpeg2.PSPackHeader)
}
ret = psdemuxer.pkg.Header.Decode(bs)
psdemuxer.mpeg1 = psdemuxer.pkg.Header.IsMpeg1
if psdemuxer.OnPacket != nil {
psdemuxer.OnPacket(psdemuxer.pkg.Header, ret)
}
case 0x000001BB: //system header
if psdemuxer.pkg.Header == nil {
panic("psdemuxer.pkg.Header must not be nil")
}
if psdemuxer.pkg.System == nil {
psdemuxer.pkg.System = new(mpeg2.System_header)
}
ret = psdemuxer.pkg.System.Decode(bs)
if psdemuxer.OnPacket != nil {
psdemuxer.OnPacket(psdemuxer.pkg.System, ret)
}
case 0x000001BC: //program stream map
if psdemuxer.pkg.Psm == nil {
psdemuxer.pkg.Psm = new(mpeg2.Program_stream_map)
}
if ret = psdemuxer.pkg.Psm.Decode(bs); ret == nil {
for _, streaminfo := range psdemuxer.pkg.Psm.Stream_map {
if _, found := psdemuxer.streamMap[streaminfo.Elementary_stream_id]; !found {
stream := newpsstream(streaminfo.Elementary_stream_id, mpeg2.PS_STREAM_TYPE(streaminfo.Stream_type))
psdemuxer.streamMap[stream.sid] = stream
}
}
}
if psdemuxer.OnPacket != nil {
psdemuxer.OnPacket(psdemuxer.pkg.Psm, ret)
}
case 0x000001BD, 0x000001BE, 0x000001BF, 0x000001F0, 0x000001F1,
0x000001F2, 0x000001F3, 0x000001F4, 0x000001F5, 0x000001F6,
0x000001F7, 0x000001F8, 0x000001F9, 0x000001FA, 0x000001FB:
if psdemuxer.pkg.CommPes == nil {
psdemuxer.pkg.CommPes = new(mpeg2.CommonPesPacket)
}
ret = psdemuxer.pkg.CommPes.Decode(bs)
case 0x000001FF: //program stream directory
if psdemuxer.pkg.Psd == nil {
psdemuxer.pkg.Psd = new(mpeg2.Program_stream_directory)
}
ret = psdemuxer.pkg.Psd.Decode(bs)
case 0x000001B9: //MPEG_program_end_code
continue
default:
if prefix_code&0xFFFFFFE0 == 0x000001C0 || prefix_code&0xFFFFFFE0 == 0x000001E0 {
if psdemuxer.pkg.Pes == nil {
psdemuxer.pkg.Pes = mpeg2.NewPesPacket()
}
if psdemuxer.mpeg1 {
ret = psdemuxer.pkg.Pes.DecodeMpeg1(bs)
} else {
ret = psdemuxer.pkg.Pes.Decode(bs)
}
if psdemuxer.OnPacket != nil {
psdemuxer.OnPacket(psdemuxer.pkg.Pes, ret)
}
if ret == nil {
if stream, found := psdemuxer.streamMap[psdemuxer.pkg.Pes.Stream_id]; found {
if psdemuxer.mpeg1 && stream.cid == mpeg2.PS_STREAM_UNKNOW {
psdemuxer.guessCodecid(stream)
}
psdemuxer.demuxPespacket(stream, psdemuxer.pkg.Pes)
} else {
if psdemuxer.mpeg1 {
stream := newpsstream(psdemuxer.pkg.Pes.Stream_id, mpeg2.PS_STREAM_UNKNOW)
psdemuxer.streamMap[stream.sid] = stream
stream.streamBuf = append(stream.streamBuf, psdemuxer.pkg.Pes.Pes_payload...)
stream.pts = psdemuxer.pkg.Pes.Pts
stream.dts = psdemuxer.pkg.Pes.Dts
}
}
}
} else {
bs.SkipBits(8)
}
}
}
if ret == nil && len(psdemuxer.cache) > 0 {
psdemuxer.cache = nil
}
return ret
}
func (psdemuxer *PSDemuxer) Drop() {
psdemuxer.cache = psdemuxer.cache[:0]
for _, stream := range psdemuxer.streamMap {
if len(stream.streamBuf) == 0 {
continue
}
}
}
func (psdemuxer *PSDemuxer) guessCodecid(stream *psstream) {
if stream.sid&0xE0 == uint8(mpeg2.PES_STREAM_AUDIO) {
stream.cid = mpeg2.PS_STREAM_AAC
} else if stream.sid&0xE0 == uint8(mpeg2.PES_STREAM_VIDEO) {
h264score := 0
h265score := 0
codec.SplitFrame(stream.streamBuf, func(nalu []byte) bool {
h264nalutype := codec.H264NaluTypeWithoutStartCode(nalu)
h265nalutype := codec.H265NaluTypeWithoutStartCode(nalu)
if h264nalutype == codec.H264_NAL_PPS ||
h264nalutype == codec.H264_NAL_SPS ||
h264nalutype == codec.H264_NAL_I_SLICE {
h264score += 2
} else if h264nalutype < 5 {
h264score += 1
} else if h264nalutype > 20 {
h264score -= 1
}
if h265nalutype == codec.H265_NAL_PPS ||
h265nalutype == codec.H265_NAL_SPS ||
h265nalutype == codec.H265_NAL_VPS ||
(h265nalutype >= codec.H265_NAL_SLICE_BLA_W_LP && h265nalutype <= codec.H265_NAL_SLICE_CRA) {
h265score += 2
} else if h265nalutype >= codec.H265_NAL_Slice_TRAIL_N && h265nalutype <= codec.H265_NAL_SLICE_RASL_R {
h265score += 1
} else if h265nalutype > 40 {
h265score -= 1
}
if h264score > h265score && h264score >= 4 {
stream.cid = mpeg2.PS_STREAM_H264
} else if h264score < h265score && h265score >= 4 {
stream.cid = mpeg2.PS_STREAM_H265
}
return true
})
}
}
func (psdemuxer *PSDemuxer) demuxPespacket(stream *psstream, pes *mpeg2.PesPacket) error {
switch stream.cid {
case mpeg2.PS_STREAM_AAC, mpeg2.PS_STREAM_G711A, mpeg2.PS_STREAM_G711U:
return psdemuxer.demuxAudio(stream, pes)
case mpeg2.PS_STREAM_H264, mpeg2.PS_STREAM_H265:
return psdemuxer.demuxAudio(stream, pes)
case mpeg2.PS_STREAM_UNKNOW:
if stream.pts != pes.Pts {
stream.streamBuf = nil
}
stream.streamBuf = append(stream.streamBuf, pes.Pes_payload...)
stream.pts = pes.Pts
stream.dts = pes.Dts
}
return nil
}
func (psdemuxer *PSDemuxer) demuxAudio(stream *psstream, pes *mpeg2.PesPacket) error {
if stream.pts != pes.Pts && len(stream.streamBuf) > 0 {
if psdemuxer.OnFrame != nil {
psdemuxer.OnFrame(stream.streamBuf, stream.cid, stream.pts, stream.dts)
}
stream.streamBuf = nil
// stream.streamBuf = stream.streamBuf[:0]
}
stream.streamBuf = append(stream.streamBuf, pes.Pes_payload...)
stream.pts = pes.Pts
stream.dts = pes.Dts
return nil
}
// func (psdemuxer *PSDemuxer) demuxH26x(stream *psstream, pes *mpeg2.PesPacket) error {
// if len(stream.streamBuf) == 0 {
// stream.pts = pes.Pts
// stream.dts = pes.Dts
// }
// stream.streamBuf = append(stream.streamBuf, pes.Pes_payload...)
// start, sc := codec.FindStartCode(stream.streamBuf, 0)
// for start >= 0 {
// end, sc2 := codec.FindStartCode(stream.streamBuf, start+int(sc))
// if end < 0 {
// break
// }
// if stream.cid == mpeg2.PS_STREAM_H264 {
// naluType := codec.H264NaluType(stream.streamBuf[start:])
// if naluType != codec.H264_NAL_AUD {
// if psdemuxer.OnFrame != nil {
// psdemuxer.OnFrame(stream.streamBuf[start:end], stream.cid, stream.pts/90, stream.dts/90)
// }
// }
// } else if stream.cid == mpeg2.PS_STREAM_H265 {
// naluType := codec.H265NaluType(stream.streamBuf[start:])
// if naluType != codec.H265_NAL_AUD {
// if psdemuxer.OnFrame != nil {
// psdemuxer.OnFrame(stream.streamBuf[start:end], stream.cid, stream.pts/90, stream.dts/90)
// }
// }
// }
// start = end
// sc = sc2
// }
// stream.streamBuf = stream.streamBuf[start:]
// stream.pts = pes.Pts
// stream.dts = pes.Dts
// return nil
// }

View File

@@ -107,6 +107,7 @@ type AVTrack interface {
type VideoTrack interface {
AVTrack
WriteSliceBytes(slice []byte)
WriteNalu(uint32, uint32, []byte)
WriteAnnexB(uint32, uint32, []byte)
SetLostFlag()
}

View File

@@ -1,6 +1,8 @@
package common
import (
"time"
"m7s.live/engine/v4/config"
"m7s.live/engine/v4/log"
"m7s.live/engine/v4/util"
@@ -16,4 +18,5 @@ type IStream interface {
Receive(any) bool
SetIDR(Track)
GetPublisherConfig() *config.Publish
GetStartTime() time.Time
}

1
go.mod
View File

@@ -39,6 +39,7 @@ require (
github.com/quic-go/qtls-go1-20 v0.1.0 // indirect
github.com/tklauser/go-sysconf v0.3.11 // indirect
github.com/tklauser/numcpus v0.6.0 // indirect
github.com/yapingcat/gomedia v0.0.0-20230222121919-c67df405bf33
github.com/yusufpapurcu/wmi v1.2.2 // indirect
golang.org/x/crypto v0.4.0 // indirect
golang.org/x/exp v0.0.0-20221205204356-47842c84f3db // indirect

2
go.sum
View File

@@ -144,6 +144,8 @@ github.com/tklauser/go-sysconf v0.3.11/go.mod h1:GqXfhXY3kiPa0nAXPDIQIWzJbMCB7Am
github.com/tklauser/numcpus v0.4.0/go.mod h1:1+UI3pD8NW14VMwdgJNJ1ESk2UnwhAnz5hMwiKKqXCQ=
github.com/tklauser/numcpus v0.6.0 h1:kebhY2Qt+3U6RNK7UqpYNA+tJ23IBEGKkB7JQBfDYms=
github.com/tklauser/numcpus v0.6.0/go.mod h1:FEZLMke0lhOUG6w2JadTzp0a+Nl8PF/GFkQ5UVIcaL4=
github.com/yapingcat/gomedia v0.0.0-20230222121919-c67df405bf33 h1:uyZY++dluUg7iTSsNzuOVln/mC2U2KXwgKLfKLCJ74Y=
github.com/yapingcat/gomedia v0.0.0-20230222121919-c67df405bf33/go.mod h1:WSZ59bidJOO40JSJmLqlkBJrjZCtjbKKkygEMfzY/kc=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=

View File

@@ -290,12 +290,18 @@ func (opt *Plugin) Pull(streamPath string, url string, puller IPuller, save int)
time.Sleep(time.Second * 5)
} else {
if err = opt.Publish(streamPath, puller); err != nil {
if stream := Streams.Get(streamPath); stream != nil && stream.Publisher != puller && stream.Publisher != nil {
io := stream.Publisher.GetPublisher()
opt.Error("puller is not publisher", zap.String("ID", io.ID), zap.String("Type", io.Type), zap.Error(err))
if stream := Streams.Get(streamPath); stream != nil {
if stream.Publisher != puller && stream.Publisher != nil {
io := stream.Publisher.GetPublisher()
opt.Error("puller is not publisher", zap.String("ID", io.ID), zap.String("Type", io.Type), zap.Error(err))
return
} else {
opt.Warn("pull publish", zurl, zap.Error(err))
}
} else {
opt.Error("pull publish", zurl, zap.Error(err))
return
}
opt.Error("pull publish", zurl, zap.Error(err))
}
if err = puller.Pull(); err != nil && !puller.IsShutdown() {
opt.Error("pull", zurl, zap.Error(err))

View File

@@ -2,8 +2,9 @@ package engine
import (
"github.com/pion/rtp/v2"
"github.com/yapingcat/gomedia/go-mpeg2"
"go.uber.org/zap"
. "m7s.live/engine/v4/codec"
"m7s.live/engine/v4/codec"
"m7s.live/engine/v4/codec/mpegps"
"m7s.live/engine/v4/codec/mpegts"
. "m7s.live/engine/v4/track"
@@ -17,8 +18,10 @@ type cacheItem struct {
type PSPublisher struct {
Publisher
DisableReorder bool //是否禁用rtp重排序,TCP模式下应当禁用
mpegps.MpegPsStream `json:"-"`
DisableReorder bool //是否禁用rtp重排序,TCP模式下应当禁用
// mpegps.MpegPsStream `json:"-"`
// *mpegps.PSDemuxer `json:"-"`
mpegps.DecPSPackage `json:"-"`
reorder util.RTPReorder[*cacheItem]
pool util.BytesPool
lastSeq uint16
@@ -30,6 +33,9 @@ func (p *PSPublisher) PushPS(rtp *rtp.Packet) {
return
}
if p.EsHandler == nil {
// p.PSDemuxer = mpegps.NewPSDemuxer()
// p.PSDemuxer.OnPacket = p.OnPacket
// p.PSDemuxer.OnFrame = p.OnFrame
p.EsHandler = p
p.lastSeq = rtp.SequenceNumber - 1
if p.pool == nil {
@@ -45,7 +51,7 @@ func (p *PSPublisher) PushPS(rtp *rtp.Packet) {
for cacheItem := p.reorder.Push(rtp.SequenceNumber, &cacheItem{rtp.SequenceNumber, item}); cacheItem != nil; cacheItem = p.reorder.Pop() {
if cacheItem.Seq != p.lastSeq+1 {
p.Debug("drop", zap.Uint16("seq", cacheItem.Seq), zap.Uint16("lastSeq", p.lastSeq))
p.Drop()
p.Reset()
if p.VideoTrack != nil {
p.SetLostFlag()
}
@@ -56,6 +62,75 @@ func (p *PSPublisher) PushPS(rtp *rtp.Packet) {
}
}
}
func (p *PSPublisher) OnFrame(frame []byte, cid mpeg2.PS_STREAM_TYPE, pts uint64, dts uint64) {
switch cid {
case mpeg2.PS_STREAM_AAC:
if p.AudioTrack != nil {
p.AudioTrack.WriteADTS(uint32(pts), frame)
} else {
p.AudioTrack = NewAAC(p.Publisher.Stream)
}
case mpeg2.PS_STREAM_G711A:
if p.AudioTrack != nil {
p.AudioTrack.WriteRaw(uint32(pts), frame)
} else {
p.AudioTrack = NewG711(p.Publisher.Stream, true)
}
case mpeg2.PS_STREAM_G711U:
if p.AudioTrack != nil {
p.AudioTrack.WriteRaw(uint32(pts), frame)
} else {
p.AudioTrack = NewG711(p.Publisher.Stream, false)
}
case mpeg2.PS_STREAM_H264:
if p.VideoTrack != nil {
// p.WriteNalu(uint32(pts), uint32(dts), frame)
p.WriteAnnexB(uint32(pts), uint32(dts), frame)
} else {
p.VideoTrack = NewH264(p.Publisher.Stream)
}
case mpeg2.PS_STREAM_H265:
if p.VideoTrack != nil {
// p.WriteNalu(uint32(pts), uint32(dts), frame)
p.WriteAnnexB(uint32(pts), uint32(dts), frame)
} else {
p.VideoTrack = NewH265(p.Publisher.Stream)
}
}
}
func (p *PSPublisher) OnPacket(pkg mpeg2.Display, decodeResult error) {
// switch value := pkg.(type) {
// case *mpeg2.PSPackHeader:
// // fd3.WriteString("--------------PS Pack Header--------------\n")
// if decodeResult == nil {
// // value.PrettyPrint(fd3)
// } else {
// // fd3.WriteString(fmt.Sprintf("Decode Ps Packet Failed %s\n", decodeResult.Error()))
// }
// case *mpeg2.System_header:
// // fd3.WriteString("--------------System Header--------------\n")
// if decodeResult == nil {
// // value.PrettyPrint(fd3)
// } else {
// // fd3.WriteString(fmt.Sprintf("Decode Ps Packet Failed %s\n", decodeResult.Error()))
// }
// case *mpeg2.Program_stream_map:
// // fd3.WriteString("--------------------PSM-------------------\n")
// if decodeResult == nil {
// // value.PrettyPrint(fd3)
// } else {
// // fd3.WriteString(fmt.Sprintf("Decode Ps Packet Failed %s\n", decodeResult.Error()))
// }
// case *mpeg2.PesPacket:
// // fd3.WriteString("-------------------PES--------------------\n")
// if decodeResult == nil {
// // value.PrettyPrint(fd3)
// } else {
// // fd3.WriteString(fmt.Sprintf("Decode Ps Packet Failed %s\n", decodeResult.Error()))
// }
// }
}
func (p *PSPublisher) ReceiveVideo(es mpegps.MpegPsEsStream) {
if p.VideoTrack == nil {
@@ -66,15 +141,15 @@ func (p *PSPublisher) ReceiveVideo(es mpegps.MpegPsEsStream) {
p.VideoTrack = NewH265(p.Publisher.Stream)
default:
//推测编码类型
var maybe264 H264NALUType
var maybe264 codec.H264NALUType
maybe264 = maybe264.Parse(es.Buffer[4])
switch maybe264 {
case NALU_Non_IDR_Picture,
NALU_IDR_Picture,
NALU_SEI,
NALU_SPS,
NALU_PPS,
NALU_Access_Unit_Delimiter:
case codec.NALU_Non_IDR_Picture,
codec.NALU_IDR_Picture,
codec.NALU_SEI,
codec.NALU_SPS,
codec.NALU_PPS,
codec.NALU_Access_Unit_Delimiter:
p.VideoTrack = NewH264(p.Publisher.Stream)
default:
p.Info("maybe h265", zap.Uint8("type", maybe264.Byte()))

View File

@@ -218,6 +218,10 @@ type StreamSummay struct {
BPS int
}
func (s *Stream) GetStartTime() time.Time {
return s.StartTime
}
func (s *Stream) GetPublisherConfig() *config.Publish {
return s.Publisher.GetPublisher().Config
}

View File

@@ -215,6 +215,7 @@ func (s *Subscriber) PlayBlock(subType byte) {
case SUBTYPE_RTP:
var videoSeq, audioSeq uint16
sendVideoFrame = func(frame *AVFrame) {
// fmt.Println("v", frame.Sequence, frame.AbsTime, s.VideoReader.AbsTime, frame.IFrame)
frame.RTP.Range(func(vp RTPFrame) bool {
videoSeq++
vp.Header.Timestamp = vp.Header.Timestamp - s.VideoReader.SkipRTPTs
@@ -225,6 +226,7 @@ func (s *Subscriber) PlayBlock(subType byte) {
}
sendAudioFrame = func(frame *AVFrame) {
// fmt.Println("a", frame.Sequence, frame.AbsTime, s.AudioReader.AbsTime)
frame.RTP.Range(func(ap RTPFrame) bool {
audioSeq++
ap.Header.SequenceNumber = audioSeq
@@ -282,31 +284,36 @@ func (s *Subscriber) PlayBlock(subType byte) {
for ctx.Err() == nil {
s.VideoReader.Read(ctx, subMode)
frame := s.VideoReader.Frame
// println("video", s.VideoReader.Track.PreFrame().Sequence-frame.Sequence)
if frame == nil || ctx.Err() != nil {
return
}
// fmt.Println("video", s.VideoReader.Track.PreFrame().Sequence-frame.Sequence)
if frame.IFrame && s.VideoReader.DecConfChanged() {
s.VideoReader.ConfSeq = s.VideoReader.Track.SequenceHeadSeq
sendVideoDecConf()
}
if audioFrame != nil {
if frame.AbsTime > lastAbsTime {
if audioFrame.CanRead {
sendAudioFrame(audioFrame)
if hasAudio {
if audioFrame != nil {
if frame.AbsTime > lastAbsTime {
// fmt.Println("switch audio", audioFrame.CanRead)
if audioFrame.CanRead {
sendAudioFrame(audioFrame)
}
videoFrame = frame
lastAbsTime = frame.AbsTime
break
}
} else if lastAbsTime == 0 {
if lastAbsTime = frame.AbsTime; lastAbsTime != 0 {
videoFrame = frame
break
}
videoFrame = frame
lastAbsTime = frame.AbsTime
break
}
} else if lastAbsTime == 0 {
if lastAbsTime = frame.AbsTime; lastAbsTime != 0 {
videoFrame = frame
break
}
}
if !conf.IFrameOnly || frame.IFrame {
sendVideoFrame(frame)
} else {
// fmt.Println("skip video", frame.Sequence)
}
}
}
@@ -327,16 +334,17 @@ func (s *Subscriber) PlayBlock(subType byte) {
}
s.AudioReader.Read(ctx, subMode)
frame := s.AudioReader.Frame
// println("audio", s.AudioReader.Track.PreFrame().Sequence-frame.Sequence)
if frame == nil || ctx.Err() != nil {
return
}
// fmt.Println("audio", s.AudioReader.Track.PreFrame().Sequence-frame.Sequence)
if s.AudioReader.DecConfChanged() {
s.AudioReader.ConfSeq = s.AudioReader.Track.SequenceHeadSeq
sendAudioDecConf()
}
if videoFrame != nil {
if hasVideo && videoFrame != nil {
if frame.AbsTime > lastAbsTime {
// fmt.Println("switch video", videoFrame.CanRead)
if videoFrame.CanRead {
sendVideoFrame(videoFrame)
}
@@ -347,6 +355,8 @@ func (s *Subscriber) PlayBlock(subType byte) {
}
if frame.AbsTime >= s.AudioReader.SkipTs {
sendAudioFrame(frame)
} else {
// fmt.Println("skip audio", frame.AbsTime, s.AudioReader.SkipTs)
}
}
}

View File

@@ -247,6 +247,9 @@ func (av *Media) Flush() {
if av.起始时间.IsZero() {
curValue.DeltaTime = 0
if curValue.AbsTime == 0 {
curValue.AbsTime = uint32(time.Since(av.Stream.GetStartTime()).Milliseconds())
}
av.重置(curValue.AbsTime)
} else if curValue.AbsTime == 0 {
curValue.DeltaTime = (curValue.DTS - preValue.DTS) / 90
@@ -254,7 +257,7 @@ func (av *Media) Flush() {
} else {
curValue.DeltaTime = curValue.AbsTime - preValue.AbsTime
}
// fmt.Println(av.Name,curValue.DTS, curValue.AbsTime, curValue.DeltaTime)
// fmt.Println(av.Name, curValue.DTS, curValue.AbsTime, curValue.DeltaTime)
if curValue.AUList.Length > 0 {
// 补完RTP
if config.Global.EnableRTP && curValue.RTP.Length == 0 {

View File

@@ -29,10 +29,11 @@ func NewH264(stream IStream, stuff ...any) (vt *H264) {
func (vt *H264) WriteSliceBytes(slice []byte) {
naluType := codec.ParseH264NALUType(slice[0])
// println("naluType", naluType)
// vt.Info("naluType", zap.Uint8("naluType", naluType.Byte()))
switch naluType {
case codec.NALU_SPS:
vt.SPSInfo, _ = codec.ParseSPS(slice)
vt.Debug("SPS", zap.Any("SPSInfo", vt.SPSInfo))
vt.Video.SPS = slice
vt.ParamaterSets[0] = slice
case codec.NALU_PPS:
@@ -100,6 +101,10 @@ func (vt *H264) WriteAVCC(ts uint32, frame *util.BLL) (err error) {
}
func (vt *H264) WriteRTPFrame(frame *RTPFrame) {
if vt.lastSeq != vt.lastSeq2+1 && vt.lastSeq2 != 0 {
vt.lostFlag = true
vt.Warn("lost rtp packet", zap.Uint16("lastSeq", vt.lastSeq), zap.Uint16("lastSeq2", vt.lastSeq2))
}
rv := &vt.Value
if naluType := frame.H264Type(); naluType < 24 {
vt.WriteSliceBytes(frame.Payload)
@@ -119,7 +124,12 @@ func (vt *H264) WriteRTPFrame(frame *RTPFrame) {
if util.Bit1(frame.Payload[1], 0) {
vt.WriteSliceByte(naluType.Parse(frame.Payload[1]).Or(frame.Payload[0] & 0x60))
}
rv.AUList.Pre.Value.Push(vt.BytesPool.GetShell(frame.Payload[naluType.Offset():]))
if rv.AUList.Pre != nil && rv.AUList.Pre.Value != nil {
rv.AUList.Pre.Value.Push(vt.BytesPool.GetShell(frame.Payload[naluType.Offset():]))
} else {
vt.Error("fu have no start")
return
}
}
}
frame.SequenceNumber += vt.rtpSequence //增加偏移需要增加rtp包后需要顺延

View File

@@ -2,6 +2,7 @@ package track
import (
"github.com/pion/rtp"
"go.uber.org/zap"
. "m7s.live/engine/v4/common"
"m7s.live/engine/v4/util"
)
@@ -23,9 +24,13 @@ func (av *Media) WriteRTPPack(p *rtp.Packet) {
func (av *Media) WriteRTP(raw *util.ListItem[RTPFrame]) {
for frame := av.recorderRTP(raw); frame != nil; frame = av.nextRTPFrame() {
av.Value.BytesIn += len(frame.Value.Payload) + 12
av.Value.RTP.Push(frame)
if len(frame.Value.Payload) > 0 {
av.Value.RTP.Push(frame)
av.WriteRTPFrame(&frame.Value)
// av.Info("rtp", zap.Uint32("ts", (frame.Value.Timestamp)), zap.Int("len", len(frame.Value.Payload)), zap.Bool("marker", frame.Value.Marker), zap.Uint16("seq", frame.Value.SequenceNumber))
} else {
av.Warn("rtp payload is empty", zap.Uint32("ts", (frame.Value.Timestamp)), zap.Any("ext", frame.Value.GetExtensionIDs()), zap.Uint16("seq", frame.Value.SequenceNumber))
frame.Recycle()
}
}
}
@@ -54,19 +59,31 @@ func (av *Media) PacketizeRTP(payloads ...[][]byte) {
}
type RTPDemuxer struct {
lastSeq uint16 //上一个收到的序号,用于乱序重排
lastSeq2 uint16 //记录上上一个收到的序
lastSeq uint16 //上一个rtp包的序号
lastSeq2 uint16 //上上一个rtp包的序号
乱序重排 util.RTPReorder[*util.ListItem[RTPFrame]]
}
// 获取缓存中下一个rtpFrame
func (av *RTPDemuxer) nextRTPFrame() (frame *util.ListItem[RTPFrame]) {
return av.乱序重排.Pop()
frame = av.乱序重排.Pop()
if frame == nil {
return
}
av.lastSeq2 = av.lastSeq
av.lastSeq = frame.Value.SequenceNumber
return
}
// 对RTP包乱序重排
func (av *RTPDemuxer) recorderRTP(item *util.ListItem[RTPFrame]) *util.ListItem[RTPFrame] {
return av.乱序重排.Push(item.Value.SequenceNumber, item)
func (av *RTPDemuxer) recorderRTP(item *util.ListItem[RTPFrame]) (frame *util.ListItem[RTPFrame]) {
frame = av.乱序重排.Push(item.Value.SequenceNumber, item)
if frame == nil {
return
}
av.lastSeq2 = av.lastSeq
av.lastSeq = frame.Value.SequenceNumber
return
}
type RTPMuxer struct {

View File

@@ -91,7 +91,17 @@ func (vt *Video) computeGOP() {
func (vt *Video) writeAnnexBSlice(nalu []byte) {
common.SplitAnnexB(nalu, vt.WriteSliceBytes, codec.NALU_Delimiter1)
}
func (vt *Video) WriteNalu(pts uint32, dts uint32, nalu []byte) {
if dts == 0 {
vt.generateTimestamp(pts)
} else {
vt.Value.PTS = pts
vt.Value.DTS = dts
}
vt.Value.BytesIn += len(nalu)
vt.WriteSliceBytes(nalu)
vt.Flush()
}
func (vt *Video) WriteAnnexB(pts uint32, dts uint32, frame []byte) {
if dts == 0 {
vt.generateTimestamp(pts)
@@ -101,10 +111,13 @@ func (vt *Video) WriteAnnexB(pts uint32, dts uint32, frame []byte) {
}
vt.Value.BytesIn += len(frame)
common.SplitAnnexB(frame, vt.writeAnnexBSlice, codec.NALU_Delimiter2)
vt.Flush()
if vt.Value.AUList.ByteLength > 0 {
vt.Flush()
}
}
func (vt *Video) WriteAVCC(ts uint32, frame *util.BLL) error {
func (vt *Video) WriteAVCC(ts uint32, frame *util.BLL) (e error) {
// bbb := util.Buffer(frame.ToBytes()[5:])
r := frame.NewReader()
b, err := r.ReadByte()
if err != nil {
@@ -120,7 +133,23 @@ func (vt *Video) WriteAVCC(ts uint32, frame *util.BLL) error {
vt.Value.PTS = vt.Ms2MpegTs(ts + cts)
vt.Value.DTS = vt.Ms2MpegTs(ts)
// println(":", vt.Value.Sequence)
for nalulen, err := r.ReadBE(vt.nalulenSize); err == nil; nalulen, err = r.ReadBE(vt.nalulenSize) {
var nalulen uint32
for nalulen, e = r.ReadBE(vt.nalulenSize); e == nil; nalulen, e = r.ReadBE(vt.nalulenSize) {
if remain := frame.ByteLength - r.GetOffset(); remain < int(nalulen) {
vt.Error("read nalu length error", zap.Int("nalulen", int(nalulen)), zap.Int("remain", remain))
frame.Recycle()
vt.Value.Reset()
return
// for bbb.CanRead() {
// nalulen = bbb.ReadUint32()
// if bbb.CanReadN(int(nalulen)) {
// bbb.ReadN(int(nalulen))
// } else {
// panic("read nalu error1")
// }
// }
// panic("read nalu error2")
}
// var au util.BLL
// for _, bb := range r.ReadN(int(nalulen)) {
// au.Push(vt.BytesPool.GetShell(bb))
@@ -148,6 +177,7 @@ func (vt *Video) WriteAVCC(ts uint32, frame *util.BLL) error {
}
func (vt *Video) WriteSliceByte(b ...byte) {
// fmt.Println("write slice byte", b)
vt.WriteSliceBytes(b)
}

View File

@@ -72,6 +72,10 @@ func (r *BLLReader) ReadN(n int) (result net.Buffers) {
return
}
func (r *BLLReader) GetOffset() int {
return r.pos
}
type BLLsReader struct {
*ListItem[*BLL]
BLLReader