修复WritePESPacket功能,AddTrack增加Promise

This commit is contained in:
dexter
2023-02-14 20:53:09 +08:00
parent 80fbe901b4
commit d6797d34a7
15 changed files with 138 additions and 95 deletions

View File

@@ -1,6 +1,7 @@
package common
import (
"bytes"
"io"
"net"
"time"
@@ -11,7 +12,13 @@ import (
"m7s.live/engine/v4/util"
)
type AnnexBFrame []byte // 一帧AnnexB格式数据
func SplitAnnexB[T ~[]byte](frame T, process func(T), delimiter []byte) {
for found, after := true, frame; len(frame) > 0 && found; frame = after {
frame, after, found = bytes.Cut(frame, delimiter)
process(frame)
}
}
type RTPFrame struct {
rtp.Packet
}

View File

@@ -106,12 +106,12 @@ type AVTrack interface {
type VideoTrack interface {
AVTrack
WriteSliceBytes(slice []byte)
WriteAnnexB(uint32, uint32, AnnexBFrame)
WriteAnnexB(uint32, uint32, []byte)
SetLostFlag()
}
type AudioTrack interface {
AVTrack
WriteADTS([]byte)
WriteADTS(uint32, []byte)
WriteRaw(uint32, []byte)
}

View File

@@ -3,10 +3,11 @@ package common
import (
"m7s.live/engine/v4/config"
"m7s.live/engine/v4/log"
"m7s.live/engine/v4/util"
)
type IStream interface {
AddTrack(Track)
AddTrack(*util.Promise[Track])
RemoveTrack(Track)
Close()
IsClosed() bool

17
io.go
View File

@@ -127,11 +127,14 @@ func (io *IO) Stop() {
}
}
var ErrBadName = errors.New("Stream Already Exist")
var ErrStreamIsClosed = errors.New("Stream Is Closed")
var ErrPublisherLost = errors.New("Publisher Lost")
var OnAuthSub func(p *util.Promise[ISubscriber]) error
var OnAuthPub func(p *util.Promise[IPublisher]) error
var (
ErrBadStreamName = errors.New("Stream Already Exist")
ErrBadTrackName = errors.New("Track Already Exist")
ErrStreamIsClosed = errors.New("Stream Is Closed")
ErrPublisherLost = errors.New("Publisher Lost")
OnAuthSub func(p *util.Promise[ISubscriber]) error
OnAuthPub func(p *util.Promise[IPublisher]) error
)
// receive 用于接收发布或者订阅
func (io *IO) receive(streamPath string, specific IIO) error {
@@ -153,7 +156,7 @@ func (io *IO) receive(streamPath string, specific IIO) error {
s, create := findOrCreateStream(u.Path, wt)
Streams.Unlock()
if s == nil {
return ErrBadName
return ErrBadStreamName
}
io.Stream = s
io.Spesific = specific
@@ -177,7 +180,7 @@ func (io *IO) receive(streamPath string, specific IIO) error {
} else if oldPublisher == specific {
//断线重连
} else {
return ErrBadName
return ErrBadStreamName
}
}
s.PublishTimeout = conf.PublishTimeout

View File

@@ -29,23 +29,22 @@ func (ts *MemoryTs) WritePESPacket(frame *mpegts.MpegtsPESFrame, packet mpegts.M
err = errors.New("packetStartCodePrefix != 0x000001")
return
}
var pesBuffers net.Buffers = packet.Buffers
pesHeadItem := ts.Get(32)
pesHeadItem.Value.Reset()
_, err = mpegts.WritePESHeader(&pesHeadItem.Value, packet.Header)
if err != nil {
return
}
pesBuffers := append(net.Buffers{pesHeadItem.Value}, packet.Buffers...)
defer pesHeadItem.Recycle()
var pesPktLength int
var tsHeaderLength int
for i := 0; len(pesBuffers) > 0; i++ {
headerItem := ts.Get(mpegts.TS_PACKET_SIZE)
ts.BLL.Push(headerItem)
bwTsHeader := headerItem.Value
tsHeaderBuffer := bwTsHeader.SubBuf(0, 0)
copy(bwTsHeader, mpegts.Stuffing)
if i == 0 {
_, err = mpegts.WritePESHeader(&tsHeaderBuffer, packet.Header)
if err != nil {
return
}
pesPktLength = len(tsHeaderBuffer) + util.SizeOfBuffers(pesBuffers)
}
bwTsHeader := &headerItem.Value
bwTsHeader.Reset()
pesPktLength = util.SizeOfBuffers(pesBuffers)
tsHeader := mpegts.MpegTsHeader{
SyncByte: 0x47,
TransportErrorIndicator: 0,
@@ -89,14 +88,19 @@ func (ts *MemoryTs) WritePESPacket(frame *mpegts.MpegtsPESFrame, packet mpegts.M
tsStuffingLength = 0
}
// error
tsHeaderLength, err = mpegts.WriteTsHeader(&tsHeaderBuffer, tsHeader)
tsHeaderLength, err = mpegts.WriteTsHeader(bwTsHeader, tsHeader)
if err != nil {
return
}
if tsStuffingLength > 0 {
if _, err = bwTsHeader.Write(mpegts.Stuffing[:tsStuffingLength]); err != nil {
return
}
}
tsHeaderLength += int(tsStuffingLength)
} else {
tsHeaderLength, err = mpegts.WriteTsHeader(&tsHeaderBuffer, tsHeader)
tsHeaderLength, err = mpegts.WriteTsHeader(bwTsHeader, tsHeader)
if err != nil {
return
}
@@ -107,13 +111,13 @@ func (ts *MemoryTs) WritePESPacket(frame *mpegts.MpegtsPESFrame, packet mpegts.M
//fmt.Println("tsPayloadLength :", tsPayloadLength)
// 这里不断的减少PES包
io.CopyN(&tsHeaderBuffer, &pesBuffers, int64(tsPayloadLength))
io.CopyN(bwTsHeader, &pesBuffers, int64(tsPayloadLength))
// tmp := tsHeaderByte[3] << 2
// tmp = tmp >> 6
// if tmp == 2 {
// fmt.Println("fuck you mother.")
// }
tsPktByteLen := len(tsHeaderBuffer)
tsPktByteLen := bwTsHeader.Len()
if tsPktByteLen != mpegts.TS_PACKET_SIZE {
err = errors.New(fmt.Sprintf("%s, packet size=%d", "TS_PACKET_SIZE != 188,", tsPktByteLen))

View File

@@ -3,7 +3,6 @@ package engine
import (
"go.uber.org/zap"
"m7s.live/engine/v4/codec/mpegts"
"m7s.live/engine/v4/common"
"m7s.live/engine/v4/track"
)
@@ -69,22 +68,7 @@ func (t *TSPublisher) ReadPES() {
if t.AudioTrack != nil {
switch t.AudioTrack.(type) {
case *track.AAC:
if t.adts == nil {
t.adts = append(t.adts, pes.Payload[:7]...)
t.AudioTrack.WriteADTS(t.adts)
}
remainLen := len(pes.Payload)
for remainLen > 0 {
// AACFrameLength(13)
// xx xxxxxxxx xxx
frameLen := (int(pes.Payload[3]&3) << 11) | (int(pes.Payload[4]) << 3) | (int(pes.Payload[5]) >> 5)
if frameLen > remainLen {
break
}
t.AudioTrack.WriteRaw(uint32(pes.Header.Pts), pes.Payload[:frameLen])
pes.Payload = pes.Payload[frameLen:remainLen]
remainLen -= frameLen
}
t.AudioTrack.WriteADTS(uint32(pes.Header.Pts), pes.Payload)
case *track.G711:
t.AudioTrack.WriteRaw(uint32(pes.Header.Pts), pes.Payload)
}
@@ -96,7 +80,7 @@ func (t *TSPublisher) ReadPES() {
}
}
if t.VideoTrack != nil {
t.VideoTrack.WriteAnnexB(uint32(pes.Header.Pts), uint32(pes.Header.Dts), common.AnnexBFrame(pes.Payload))
t.WriteAnnexB(uint32(pes.Header.Pts), uint32(pes.Header.Dts), pes.Payload)
}
}
}

View File

@@ -432,7 +432,7 @@ func (s *Stream) run() {
if s.action(ACTION_PUBLISH) || republish {
v.Resolve()
} else {
v.Reject(ErrBadName)
v.Reject(ErrBadStreamName)
}
case *util.Promise[ISubscriber]:
if s.IsClosed() {
@@ -494,14 +494,16 @@ func (s *Stream) run() {
dt.Dispose()
}
}
case Track:
case *util.Promise[Track]:
if s.State == STATE_WAITPUBLISH {
s.action(ACTION_PUBLISH)
}
name := v.GetBase().Name
if s.Tracks.Add(name, v) {
s.Info("track +1", zap.String("name", name))
s.Subscribers.OnTrack(v)
name := v.Value.GetBase().Name
if s.Tracks.Add(name, v.Value) {
v.Resolve()
s.Subscribers.OnTrack(v.Value)
} else {
v.Reject(ErrBadTrackName)
}
case StreamAction:
s.action(v)
@@ -521,7 +523,7 @@ func (s *Stream) run() {
}
}
func (s *Stream) AddTrack(t Track) {
func (s *Stream) AddTrack(t *util.Promise[Track]) {
s.Receive(t)
}
@@ -533,11 +535,11 @@ func (s *Stream) RemoveTrack(t Track) {
s.Receive(TrackRemoved{t})
}
func (r *Stream) NewDataTrack(locker sync.Locker) (dt *track.Data) {
func (r *Stream) NewDataTrack(name string, locker sync.Locker) (dt *track.Data) {
dt = &track.Data{
Locker: locker,
}
dt.Init(10)
dt.Stream = r
dt.SetStuff(name, r)
return
}

View File

@@ -63,15 +63,11 @@ func (f FLVFrame) WriteTo(w io.Writer) (int64, error) {
return t.WriteTo(w)
}
// func copyBuffers(b net.Buffers) (r net.Buffers) {
// return append(r, b...)
// }
func (v VideoFrame) GetAnnexB() (r net.Buffers) {
v.AUList.Range(func(au *util.BLL) bool {
r = append(append(r, codec.NALU_Delimiter1), au.ToBuffers()...)
r = append(append(r, codec.NALU_Delimiter2), au.ToBuffers()...)
return true
})
r[0] = codec.NALU_Delimiter2
return
}
@@ -178,12 +174,12 @@ func (s *Subscriber) PlayBlock(subType byte) {
return
}
sendVideoDecConf := func() {
s.Debug("sendVideoDecConf")
// s.Debug("sendVideoDecConf")
spesic.OnEvent(s.Video.ParamaterSets)
spesic.OnEvent(VideoDeConf(s.VideoReader.Track.SequenceHead))
}
sendAudioDecConf := func() {
s.Debug("sendAudioDecConf")
// s.Debug("sendAudioDecConf")
spesic.OnEvent(AudioDeConf(s.AudioReader.Track.SequenceHead))
}
var sendAudioFrame, sendVideoFrame func(*AVFrame)
@@ -248,7 +244,6 @@ func (s *Subscriber) PlayBlock(subType byte) {
sendVideoFrame = func(frame *AVFrame) {
// println(frame.Sequence, s.VideoReader.AbsTime, frame.DeltaTime, frame.IFrame)
// b := util.Buffer(frame.AVCC.ToBytes()[5:])
// println(frame.Sequence)
// for b.CanRead() {
// nalulen := int(b.ReadUint32())
// if b.CanReadN(nalulen) {
@@ -278,6 +273,7 @@ func (s *Subscriber) PlayBlock(subType byte) {
for ctx.Err() == nil {
s.VideoReader.Read(ctx, subMode)
frame := s.VideoReader.Frame
// println("video", s.VideoReader.Track.PreFrame().Sequence-frame.Sequence)
if frame == nil || ctx.Err() != nil {
return
}
@@ -322,7 +318,7 @@ func (s *Subscriber) PlayBlock(subType byte) {
}
s.AudioReader.Read(ctx, subMode)
frame := s.AudioReader.Frame
// println("audio", frame.Sequence, frame.AbsTime)
// println("audio", s.AudioReader.Track.PreFrame().Sequence-frame.Sequence)
if frame == nil || ctx.Err() != nil {
return
}

View File

@@ -34,7 +34,8 @@ type AAC struct {
lack int // 用于处理不完整的AU,缺少的字节数
}
func (aac *AAC) WriteADTS(adts []byte) {
func (aac *AAC) WriteADTS(ts uint32,adts []byte) {
if aac.SequenceHead == nil {
profile := ((adts[2] & 0xc0) >> 6) + 1
sampleRate := (adts[2] & 0x3c) >> 2
channel := ((adts[2] & 0x1) << 2) | ((adts[3] & 0xc0) >> 6)
@@ -42,11 +43,24 @@ func (aac *AAC) WriteADTS(adts []byte) {
config2 := ((sampleRate & 0x1) << 7) | (channel << 3)
aac.Media.WriteSequenceHead([]byte{0xAF, 0x00, config1, config2})
aac.SampleRate = uint32(codec.SamplingFrequencies[sampleRate])
// aac.ClockRate = aac.SampleRate
aac.Channels = channel
aac.Parse(aac.SequenceHead[2:])
aac.Attach()
}
frameLen := (int(adts[3]&3) << 11) | (int(adts[4]) << 3) | (int(adts[5]) >> 5)
for len(adts) >= frameLen {
aac.Value.AUList.Push(aac.BytesPool.GetShell(adts[7:frameLen]))
adts = adts[frameLen:]
if len(adts) < 7 {
break
}
frameLen = (int(adts[3]&3) << 11) | (int(adts[4]) << 3) | (int(adts[5]) >> 5)
}
aac.Flush()
}
// https://datatracker.ietf.org/doc/html/rfc3640#section-3.2.1
func (aac *AAC) WriteRTPFrame(frame *RTPFrame) {
auHeaderLen := util.ReadBE[int](frame.Payload[:aac.Mode]) >> 3 //通常为2即一个AU Header的长度

View File

@@ -1,7 +1,9 @@
package track
import (
"go.uber.org/zap"
"m7s.live/engine/v4/codec"
"m7s.live/engine/v4/common"
. "m7s.live/engine/v4/common"
"m7s.live/engine/v4/util"
)
@@ -18,7 +20,13 @@ type Audio struct {
func (a *Audio) Attach() {
if a.Attached.CompareAndSwap(false, true) {
a.Stream.AddTrack(a)
promise := util.NewPromise(common.Track(a))
a.Stream.AddTrack(promise)
if err := promise.Await(); err != nil {
a.Error("attach audio track failed", zap.Error(err))
} else {
a.Info("audio track attached", zap.Uint32("sample rate", a.SampleRate))
}
}
}
@@ -35,15 +43,12 @@ func (a *Audio) GetName() string {
return a.Name
}
func (av *Audio) WriteADTS(adts []byte) {
func (av *Audio) WriteADTS(pts uint32, adts []byte) {
}
func (av *Audio) WriteRaw(pts uint32, raw []byte) {
curValue := &av.Value
curValue.BytesIn += len(raw)
if len(av.AVCCHead) == 2 {
raw = raw[7:] //AAC 去掉7个字节的ADTS头
}
curValue.AUList.Push(av.BytesPool.GetShell(raw))
av.generateTimestamp(pts)
av.Flush()

View File

@@ -32,14 +32,14 @@ func (p *流速控制) 控制流速(绝对时间戳 uint32) {
// }
// 如果收到的帧的时间戳超过实际消耗的时间100ms就休息一下100ms作为一个弹性区间防止频繁调用sleep
if 过快 := (数据时间差 - 实际时间差); 过快 > 100*time.Millisecond {
// println("过快毫秒", 过快)
// println("过快毫秒", p.name, 过快.Milliseconds())
if 过快 > p.等待上限 {
time.Sleep(p.等待上限)
} else {
time.Sleep(过快)
}
} else if 过快 < -100*time.Millisecond {
// println("过慢毫秒", 过快)
// println("过慢毫秒", p.name, 过快.Milliseconds())
}
}

View File

@@ -5,6 +5,8 @@ import (
"sync"
"time"
"go.uber.org/zap"
"m7s.live/engine/v4/common"
. "m7s.live/engine/v4/common"
"m7s.live/engine/v4/util"
)
@@ -43,3 +45,13 @@ func (d *Data) Play(ctx context.Context, onData func(any) error) error {
}
return ctx.Err()
}
func (d *Data) Attach() {
promise := util.NewPromise(common.Track(d))
d.Stream.AddTrack(promise)
if err := promise.Await(); err != nil {
d.Error("attach data track failed", zap.Error(err))
} else {
d.Info("data track attached")
}
}

View File

@@ -22,6 +22,7 @@ func NewH264(stream IStream, stuff ...any) (vt *H264) {
vt.SetStuff("h264", int(256), byte(96), uint32(90000), stream, vt, time.Millisecond*10)
vt.SetStuff(stuff...)
vt.ParamaterSets = make(ParamaterSets, 2)
vt.nalulenSize = 4
vt.dtsEst = NewDTSEstimator()
return
}

View File

@@ -23,6 +23,7 @@ func NewH265(stream IStream, stuff ...any) (vt *H265) {
vt.SetStuff("h265", int(256), byte(96), uint32(90000), stream, vt, time.Millisecond*10)
vt.SetStuff(stuff...)
vt.ParamaterSets = make(ParamaterSets, 3)
vt.nalulenSize = 4
vt.dtsEst = NewDTSEstimator()
return
}

View File

@@ -1,11 +1,12 @@
package track
import (
"bytes"
// . "github.com/logrusorgru/aurora"
"go.uber.org/zap"
"m7s.live/engine/v4/codec"
"m7s.live/engine/v4/common"
. "m7s.live/engine/v4/common"
"m7s.live/engine/v4/util"
)
@@ -26,7 +27,13 @@ type Video struct {
func (v *Video) Attach() {
if v.Attached.CompareAndSwap(false, true) {
v.Stream.AddTrack(v)
promise := util.NewPromise(common.Track(v))
v.Stream.AddTrack(promise)
if err := promise.Await(); err != nil {
v.Error("attach video track failed", zap.Error(err))
} else {
v.Info("video track attached", zap.Uint("width", v.Width), zap.Uint("height", v.Height))
}
}
}
@@ -67,10 +74,12 @@ func (vt *Video) GetName() string {
// return ctx.Err()
// }
func (vt *Video) computeGOP() {
if vt.HistoryRing == nil && vt.IDRing != nil {
if vt.IDRing != nil {
vt.GOP = int(vt.Value.Sequence - vt.IDRing.Value.Sequence)
if vt.HistoryRing == nil {
vt.narrow(vt.GOP)
}
}
vt.AddIDR()
// var n int
// for i := 0; i < len(vt.BytesPool); i++ {
@@ -79,14 +88,12 @@ func (vt *Video) computeGOP() {
// println(n)
}
func (vt *Video) writeAnnexBSlice(annexb AnnexBFrame) {
for found, after := true, annexb; len(annexb) > 0 && found; annexb = after {
annexb, after, found = bytes.Cut(annexb, codec.NALU_Delimiter1)
vt.WriteSliceBytes(annexb)
}
func (vt *Video) writeAnnexBSlice(nalu []byte) {
common.SplitAnnexB(nalu, vt.WriteSliceBytes, codec.NALU_Delimiter1)
}
func (vt *Video) WriteAnnexB(pts uint32, dts uint32, frame AnnexBFrame) {
func (vt *Video) WriteAnnexB(pts uint32, dts uint32, frame []byte) {
// println("write annexb", len(frame), pts, dts)
if dts == 0 {
vt.generateTimestamp(pts)
} else {
@@ -94,10 +101,7 @@ func (vt *Video) WriteAnnexB(pts uint32, dts uint32, frame AnnexBFrame) {
vt.Value.DTS = dts
}
vt.Value.BytesIn += len(frame)
for found, after := true, frame; len(frame) > 0 && found; frame = after {
frame, after, found = bytes.Cut(frame, codec.NALU_Delimiter2)
vt.writeAnnexBSlice(frame)
}
common.SplitAnnexB(frame, vt.writeAnnexBSlice, codec.NALU_Delimiter2)
vt.Flush()
}
@@ -191,14 +195,23 @@ func (vt *Video) CompleteAVCC(rv *AVFrame) {
// 写入CTS
util.PutBE(b[2:5], vt.RTPTs2Ms(rv.PTS-rv.DTS))
rv.AVCC.Push(mem)
// if rv.AVCC.ByteLength != 5 {
// panic("error")
// }
// var tmp = 0
rv.AUList.Range(func(au *util.BLL) bool {
mem = vt.BytesPool.Get(4)
// println(au.ByteLength)
util.PutBE(mem.Value, uint32(au.ByteLength))
rv.AVCC.Push(mem)
au.Range(func(slice util.Buffer) bool {
rv.AVCC.Push(vt.BytesPool.GetShell(slice))
return true
})
// tmp += 4 + au.ByteLength
// if rv.AVCC.ByteLength != 5+tmp {
// panic("error")
// }
return true
})
}