diff --git a/common/frame.go b/common/frame.go index 2348a2e..8b0e991 100644 --- a/common/frame.go +++ b/common/frame.go @@ -1,6 +1,7 @@ package common import ( + "bytes" "io" "net" "time" @@ -11,7 +12,13 @@ import ( "m7s.live/engine/v4/util" ) -type AnnexBFrame []byte // 一帧AnnexB格式数据 +func SplitAnnexB[T ~[]byte](frame T, process func(T), delimiter []byte) { + for found, after := true, frame; len(frame) > 0 && found; frame = after { + frame, after, found = bytes.Cut(frame, delimiter) + process(frame) + } +} + type RTPFrame struct { rtp.Packet } diff --git a/common/index.go b/common/index.go index ade5ac8..60b1d3e 100644 --- a/common/index.go +++ b/common/index.go @@ -106,12 +106,12 @@ type AVTrack interface { type VideoTrack interface { AVTrack WriteSliceBytes(slice []byte) - WriteAnnexB(uint32, uint32, AnnexBFrame) + WriteAnnexB(uint32, uint32, []byte) SetLostFlag() } type AudioTrack interface { AVTrack - WriteADTS([]byte) + WriteADTS(uint32, []byte) WriteRaw(uint32, []byte) } diff --git a/common/stream.go b/common/stream.go index af71c37..fffc1e1 100644 --- a/common/stream.go +++ b/common/stream.go @@ -3,10 +3,11 @@ package common import ( "m7s.live/engine/v4/config" "m7s.live/engine/v4/log" + "m7s.live/engine/v4/util" ) type IStream interface { - AddTrack(Track) + AddTrack(*util.Promise[Track]) RemoveTrack(Track) Close() IsClosed() bool diff --git a/io.go b/io.go index 31ff39d..3594bcf 100644 --- a/io.go +++ b/io.go @@ -127,11 +127,14 @@ func (io *IO) Stop() { } } -var ErrBadName = errors.New("Stream Already Exist") -var ErrStreamIsClosed = errors.New("Stream Is Closed") -var ErrPublisherLost = errors.New("Publisher Lost") -var OnAuthSub func(p *util.Promise[ISubscriber]) error -var OnAuthPub func(p *util.Promise[IPublisher]) error +var ( + ErrBadStreamName = errors.New("Stream Already Exist") + ErrBadTrackName = errors.New("Track Already Exist") + ErrStreamIsClosed = errors.New("Stream Is Closed") + ErrPublisherLost = errors.New("Publisher Lost") + OnAuthSub func(p *util.Promise[ISubscriber]) error + OnAuthPub func(p *util.Promise[IPublisher]) error +) // receive 用于接收发布或者订阅 func (io *IO) receive(streamPath string, specific IIO) error { @@ -153,7 +156,7 @@ func (io *IO) receive(streamPath string, specific IIO) error { s, create := findOrCreateStream(u.Path, wt) Streams.Unlock() if s == nil { - return ErrBadName + return ErrBadStreamName } io.Stream = s io.Spesific = specific @@ -177,7 +180,7 @@ func (io *IO) receive(streamPath string, specific IIO) error { } else if oldPublisher == specific { //断线重连 } else { - return ErrBadName + return ErrBadStreamName } } s.PublishTimeout = conf.PublishTimeout diff --git a/memory-ts.go b/memory-ts.go index fa95a4b..806d959 100644 --- a/memory-ts.go +++ b/memory-ts.go @@ -29,23 +29,22 @@ func (ts *MemoryTs) WritePESPacket(frame *mpegts.MpegtsPESFrame, packet mpegts.M err = errors.New("packetStartCodePrefix != 0x000001") return } - - var pesBuffers net.Buffers = packet.Buffers + pesHeadItem := ts.Get(32) + pesHeadItem.Value.Reset() + _, err = mpegts.WritePESHeader(&pesHeadItem.Value, packet.Header) + if err != nil { + return + } + pesBuffers := append(net.Buffers{pesHeadItem.Value}, packet.Buffers...) + defer pesHeadItem.Recycle() var pesPktLength int var tsHeaderLength int for i := 0; len(pesBuffers) > 0; i++ { headerItem := ts.Get(mpegts.TS_PACKET_SIZE) ts.BLL.Push(headerItem) - bwTsHeader := headerItem.Value - tsHeaderBuffer := bwTsHeader.SubBuf(0, 0) - copy(bwTsHeader, mpegts.Stuffing) - if i == 0 { - _, err = mpegts.WritePESHeader(&tsHeaderBuffer, packet.Header) - if err != nil { - return - } - pesPktLength = len(tsHeaderBuffer) + util.SizeOfBuffers(pesBuffers) - } + bwTsHeader := &headerItem.Value + bwTsHeader.Reset() + pesPktLength = util.SizeOfBuffers(pesBuffers) tsHeader := mpegts.MpegTsHeader{ SyncByte: 0x47, TransportErrorIndicator: 0, @@ -89,14 +88,19 @@ func (ts *MemoryTs) WritePESPacket(frame *mpegts.MpegtsPESFrame, packet mpegts.M tsStuffingLength = 0 } // error - tsHeaderLength, err = mpegts.WriteTsHeader(&tsHeaderBuffer, tsHeader) + tsHeaderLength, err = mpegts.WriteTsHeader(bwTsHeader, tsHeader) if err != nil { return } + if tsStuffingLength > 0 { + if _, err = bwTsHeader.Write(mpegts.Stuffing[:tsStuffingLength]); err != nil { + return + } + } tsHeaderLength += int(tsStuffingLength) } else { - tsHeaderLength, err = mpegts.WriteTsHeader(&tsHeaderBuffer, tsHeader) + tsHeaderLength, err = mpegts.WriteTsHeader(bwTsHeader, tsHeader) if err != nil { return } @@ -107,13 +111,13 @@ func (ts *MemoryTs) WritePESPacket(frame *mpegts.MpegtsPESFrame, packet mpegts.M //fmt.Println("tsPayloadLength :", tsPayloadLength) // 这里不断的减少PES包 - io.CopyN(&tsHeaderBuffer, &pesBuffers, int64(tsPayloadLength)) + io.CopyN(bwTsHeader, &pesBuffers, int64(tsPayloadLength)) // tmp := tsHeaderByte[3] << 2 // tmp = tmp >> 6 // if tmp == 2 { // fmt.Println("fuck you mother.") // } - tsPktByteLen := len(tsHeaderBuffer) + tsPktByteLen := bwTsHeader.Len() if tsPktByteLen != mpegts.TS_PACKET_SIZE { err = errors.New(fmt.Sprintf("%s, packet size=%d", "TS_PACKET_SIZE != 188,", tsPktByteLen)) diff --git a/publisher-ts.go b/publisher-ts.go index e73bd77..2223fa3 100644 --- a/publisher-ts.go +++ b/publisher-ts.go @@ -3,7 +3,6 @@ package engine import ( "go.uber.org/zap" "m7s.live/engine/v4/codec/mpegts" - "m7s.live/engine/v4/common" "m7s.live/engine/v4/track" ) @@ -69,22 +68,7 @@ func (t *TSPublisher) ReadPES() { if t.AudioTrack != nil { switch t.AudioTrack.(type) { case *track.AAC: - if t.adts == nil { - t.adts = append(t.adts, pes.Payload[:7]...) - t.AudioTrack.WriteADTS(t.adts) - } - remainLen := len(pes.Payload) - for remainLen > 0 { - // AACFrameLength(13) - // xx xxxxxxxx xxx - frameLen := (int(pes.Payload[3]&3) << 11) | (int(pes.Payload[4]) << 3) | (int(pes.Payload[5]) >> 5) - if frameLen > remainLen { - break - } - t.AudioTrack.WriteRaw(uint32(pes.Header.Pts), pes.Payload[:frameLen]) - pes.Payload = pes.Payload[frameLen:remainLen] - remainLen -= frameLen - } + t.AudioTrack.WriteADTS(uint32(pes.Header.Pts), pes.Payload) case *track.G711: t.AudioTrack.WriteRaw(uint32(pes.Header.Pts), pes.Payload) } @@ -96,7 +80,7 @@ func (t *TSPublisher) ReadPES() { } } if t.VideoTrack != nil { - t.VideoTrack.WriteAnnexB(uint32(pes.Header.Pts), uint32(pes.Header.Dts), common.AnnexBFrame(pes.Payload)) + t.WriteAnnexB(uint32(pes.Header.Pts), uint32(pes.Header.Dts), pes.Payload) } } } diff --git a/stream.go b/stream.go index 864b9e0..ac54324 100644 --- a/stream.go +++ b/stream.go @@ -432,7 +432,7 @@ func (s *Stream) run() { if s.action(ACTION_PUBLISH) || republish { v.Resolve() } else { - v.Reject(ErrBadName) + v.Reject(ErrBadStreamName) } case *util.Promise[ISubscriber]: if s.IsClosed() { @@ -494,14 +494,16 @@ func (s *Stream) run() { dt.Dispose() } } - case Track: + case *util.Promise[Track]: if s.State == STATE_WAITPUBLISH { s.action(ACTION_PUBLISH) } - name := v.GetBase().Name - if s.Tracks.Add(name, v) { - s.Info("track +1", zap.String("name", name)) - s.Subscribers.OnTrack(v) + name := v.Value.GetBase().Name + if s.Tracks.Add(name, v.Value) { + v.Resolve() + s.Subscribers.OnTrack(v.Value) + } else { + v.Reject(ErrBadTrackName) } case StreamAction: s.action(v) @@ -521,7 +523,7 @@ func (s *Stream) run() { } } -func (s *Stream) AddTrack(t Track) { +func (s *Stream) AddTrack(t *util.Promise[Track]) { s.Receive(t) } @@ -533,11 +535,11 @@ func (s *Stream) RemoveTrack(t Track) { s.Receive(TrackRemoved{t}) } -func (r *Stream) NewDataTrack(locker sync.Locker) (dt *track.Data) { +func (r *Stream) NewDataTrack(name string, locker sync.Locker) (dt *track.Data) { dt = &track.Data{ Locker: locker, } dt.Init(10) - dt.Stream = r + dt.SetStuff(name, r) return } diff --git a/subscriber.go b/subscriber.go index aa76b88..305ec5b 100644 --- a/subscriber.go +++ b/subscriber.go @@ -63,15 +63,11 @@ func (f FLVFrame) WriteTo(w io.Writer) (int64, error) { return t.WriteTo(w) } -// func copyBuffers(b net.Buffers) (r net.Buffers) { -// return append(r, b...) -// } func (v VideoFrame) GetAnnexB() (r net.Buffers) { v.AUList.Range(func(au *util.BLL) bool { - r = append(append(r, codec.NALU_Delimiter1), au.ToBuffers()...) + r = append(append(r, codec.NALU_Delimiter2), au.ToBuffers()...) return true }) - r[0] = codec.NALU_Delimiter2 return } @@ -178,12 +174,12 @@ func (s *Subscriber) PlayBlock(subType byte) { return } sendVideoDecConf := func() { - s.Debug("sendVideoDecConf") + // s.Debug("sendVideoDecConf") spesic.OnEvent(s.Video.ParamaterSets) spesic.OnEvent(VideoDeConf(s.VideoReader.Track.SequenceHead)) } sendAudioDecConf := func() { - s.Debug("sendAudioDecConf") + // s.Debug("sendAudioDecConf") spesic.OnEvent(AudioDeConf(s.AudioReader.Track.SequenceHead)) } var sendAudioFrame, sendVideoFrame func(*AVFrame) @@ -248,7 +244,6 @@ func (s *Subscriber) PlayBlock(subType byte) { sendVideoFrame = func(frame *AVFrame) { // println(frame.Sequence, s.VideoReader.AbsTime, frame.DeltaTime, frame.IFrame) // b := util.Buffer(frame.AVCC.ToBytes()[5:]) - // println(frame.Sequence) // for b.CanRead() { // nalulen := int(b.ReadUint32()) // if b.CanReadN(nalulen) { @@ -278,6 +273,7 @@ func (s *Subscriber) PlayBlock(subType byte) { for ctx.Err() == nil { s.VideoReader.Read(ctx, subMode) frame := s.VideoReader.Frame + // println("video", s.VideoReader.Track.PreFrame().Sequence-frame.Sequence) if frame == nil || ctx.Err() != nil { return } @@ -322,7 +318,7 @@ func (s *Subscriber) PlayBlock(subType byte) { } s.AudioReader.Read(ctx, subMode) frame := s.AudioReader.Frame - // println("audio", frame.Sequence, frame.AbsTime) + // println("audio", s.AudioReader.Track.PreFrame().Sequence-frame.Sequence) if frame == nil || ctx.Err() != nil { return } diff --git a/track/aac.go b/track/aac.go index 81ef64a..8e8ac0a 100644 --- a/track/aac.go +++ b/track/aac.go @@ -34,17 +34,31 @@ type AAC struct { lack int // 用于处理不完整的AU,缺少的字节数 } -func (aac *AAC) WriteADTS(adts []byte) { - profile := ((adts[2] & 0xc0) >> 6) + 1 - sampleRate := (adts[2] & 0x3c) >> 2 - channel := ((adts[2] & 0x1) << 2) | ((adts[3] & 0xc0) >> 6) - config1 := (profile << 3) | ((sampleRate & 0xe) >> 1) - config2 := ((sampleRate & 0x1) << 7) | (channel << 3) - aac.Media.WriteSequenceHead([]byte{0xAF, 0x00, config1, config2}) - aac.SampleRate = uint32(codec.SamplingFrequencies[sampleRate]) - aac.Channels = channel - aac.Parse(aac.SequenceHead[2:]) - aac.Attach() +func (aac *AAC) WriteADTS(ts uint32,adts []byte) { + if aac.SequenceHead == nil { + profile := ((adts[2] & 0xc0) >> 6) + 1 + sampleRate := (adts[2] & 0x3c) >> 2 + channel := ((adts[2] & 0x1) << 2) | ((adts[3] & 0xc0) >> 6) + config1 := (profile << 3) | ((sampleRate & 0xe) >> 1) + config2 := ((sampleRate & 0x1) << 7) | (channel << 3) + aac.Media.WriteSequenceHead([]byte{0xAF, 0x00, config1, config2}) + aac.SampleRate = uint32(codec.SamplingFrequencies[sampleRate]) + // aac.ClockRate = aac.SampleRate + aac.Channels = channel + aac.Parse(aac.SequenceHead[2:]) + aac.Attach() + } + + frameLen := (int(adts[3]&3) << 11) | (int(adts[4]) << 3) | (int(adts[5]) >> 5) + for len(adts) >= frameLen { + aac.Value.AUList.Push(aac.BytesPool.GetShell(adts[7:frameLen])) + adts = adts[frameLen:] + if len(adts) < 7 { + break + } + frameLen = (int(adts[3]&3) << 11) | (int(adts[4]) << 3) | (int(adts[5]) >> 5) + } + aac.Flush() } // https://datatracker.ietf.org/doc/html/rfc3640#section-3.2.1 diff --git a/track/audio.go b/track/audio.go index 32573ed..d426f68 100644 --- a/track/audio.go +++ b/track/audio.go @@ -1,7 +1,9 @@ package track import ( + "go.uber.org/zap" "m7s.live/engine/v4/codec" + "m7s.live/engine/v4/common" . "m7s.live/engine/v4/common" "m7s.live/engine/v4/util" ) @@ -18,7 +20,13 @@ type Audio struct { func (a *Audio) Attach() { if a.Attached.CompareAndSwap(false, true) { - a.Stream.AddTrack(a) + promise := util.NewPromise(common.Track(a)) + a.Stream.AddTrack(promise) + if err := promise.Await(); err != nil { + a.Error("attach audio track failed", zap.Error(err)) + } else { + a.Info("audio track attached", zap.Uint32("sample rate", a.SampleRate)) + } } } @@ -35,15 +43,12 @@ func (a *Audio) GetName() string { return a.Name } -func (av *Audio) WriteADTS(adts []byte) { +func (av *Audio) WriteADTS(pts uint32, adts []byte) { } func (av *Audio) WriteRaw(pts uint32, raw []byte) { curValue := &av.Value curValue.BytesIn += len(raw) - if len(av.AVCCHead) == 2 { - raw = raw[7:] //AAC 去掉7个字节的ADTS头 - } curValue.AUList.Push(av.BytesPool.GetShell(raw)) av.generateTimestamp(pts) av.Flush() diff --git a/track/base.go b/track/base.go index 746edf5..f27069d 100644 --- a/track/base.go +++ b/track/base.go @@ -32,14 +32,14 @@ func (p *流速控制) 控制流速(绝对时间戳 uint32) { // } // 如果收到的帧的时间戳超过实际消耗的时间100ms就休息一下,100ms作为一个弹性区间防止频繁调用sleep if 过快 := (数据时间差 - 实际时间差); 过快 > 100*time.Millisecond { - // println("过快毫秒", 过快) + // println("过快毫秒", p.name, 过快.Milliseconds()) if 过快 > p.等待上限 { time.Sleep(p.等待上限) } else { time.Sleep(过快) } } else if 过快 < -100*time.Millisecond { - // println("过慢毫秒", 过快) + // println("过慢毫秒", p.name, 过快.Milliseconds()) } } diff --git a/track/data.go b/track/data.go index 0de2cf0..59de88e 100644 --- a/track/data.go +++ b/track/data.go @@ -5,6 +5,8 @@ import ( "sync" "time" + "go.uber.org/zap" + "m7s.live/engine/v4/common" . "m7s.live/engine/v4/common" "m7s.live/engine/v4/util" ) @@ -43,3 +45,13 @@ func (d *Data) Play(ctx context.Context, onData func(any) error) error { } return ctx.Err() } + +func (d *Data) Attach() { + promise := util.NewPromise(common.Track(d)) + d.Stream.AddTrack(promise) + if err := promise.Await(); err != nil { + d.Error("attach data track failed", zap.Error(err)) + } else { + d.Info("data track attached") + } +} diff --git a/track/h264.go b/track/h264.go index 43153de..efa5fdb 100644 --- a/track/h264.go +++ b/track/h264.go @@ -22,6 +22,7 @@ func NewH264(stream IStream, stuff ...any) (vt *H264) { vt.SetStuff("h264", int(256), byte(96), uint32(90000), stream, vt, time.Millisecond*10) vt.SetStuff(stuff...) vt.ParamaterSets = make(ParamaterSets, 2) + vt.nalulenSize = 4 vt.dtsEst = NewDTSEstimator() return } diff --git a/track/h265.go b/track/h265.go index c6f1180..fc280ae 100644 --- a/track/h265.go +++ b/track/h265.go @@ -23,6 +23,7 @@ func NewH265(stream IStream, stuff ...any) (vt *H265) { vt.SetStuff("h265", int(256), byte(96), uint32(90000), stream, vt, time.Millisecond*10) vt.SetStuff(stuff...) vt.ParamaterSets = make(ParamaterSets, 3) + vt.nalulenSize = 4 vt.dtsEst = NewDTSEstimator() return } diff --git a/track/video.go b/track/video.go index e7e3579..313e514 100644 --- a/track/video.go +++ b/track/video.go @@ -1,11 +1,12 @@ package track import ( - "bytes" // . "github.com/logrusorgru/aurora" + "go.uber.org/zap" "m7s.live/engine/v4/codec" + "m7s.live/engine/v4/common" . "m7s.live/engine/v4/common" "m7s.live/engine/v4/util" ) @@ -26,7 +27,13 @@ type Video struct { func (v *Video) Attach() { if v.Attached.CompareAndSwap(false, true) { - v.Stream.AddTrack(v) + promise := util.NewPromise(common.Track(v)) + v.Stream.AddTrack(promise) + if err := promise.Await(); err != nil { + v.Error("attach video track failed", zap.Error(err)) + } else { + v.Info("video track attached", zap.Uint("width", v.Width), zap.Uint("height", v.Height)) + } } } @@ -67,9 +74,11 @@ func (vt *Video) GetName() string { // return ctx.Err() // } func (vt *Video) computeGOP() { - if vt.HistoryRing == nil && vt.IDRing != nil { + if vt.IDRing != nil { vt.GOP = int(vt.Value.Sequence - vt.IDRing.Value.Sequence) - vt.narrow(vt.GOP) + if vt.HistoryRing == nil { + vt.narrow(vt.GOP) + } } vt.AddIDR() // var n int @@ -79,14 +88,12 @@ func (vt *Video) computeGOP() { // println(n) } -func (vt *Video) writeAnnexBSlice(annexb AnnexBFrame) { - for found, after := true, annexb; len(annexb) > 0 && found; annexb = after { - annexb, after, found = bytes.Cut(annexb, codec.NALU_Delimiter1) - vt.WriteSliceBytes(annexb) - } +func (vt *Video) writeAnnexBSlice(nalu []byte) { + common.SplitAnnexB(nalu, vt.WriteSliceBytes, codec.NALU_Delimiter1) } -func (vt *Video) WriteAnnexB(pts uint32, dts uint32, frame AnnexBFrame) { +func (vt *Video) WriteAnnexB(pts uint32, dts uint32, frame []byte) { + // println("write annexb", len(frame), pts, dts) if dts == 0 { vt.generateTimestamp(pts) } else { @@ -94,10 +101,7 @@ func (vt *Video) WriteAnnexB(pts uint32, dts uint32, frame AnnexBFrame) { vt.Value.DTS = dts } vt.Value.BytesIn += len(frame) - for found, after := true, frame; len(frame) > 0 && found; frame = after { - frame, after, found = bytes.Cut(frame, codec.NALU_Delimiter2) - vt.writeAnnexBSlice(frame) - } + common.SplitAnnexB(frame, vt.writeAnnexBSlice, codec.NALU_Delimiter2) vt.Flush() } @@ -191,14 +195,23 @@ func (vt *Video) CompleteAVCC(rv *AVFrame) { // 写入CTS util.PutBE(b[2:5], vt.RTPTs2Ms(rv.PTS-rv.DTS)) rv.AVCC.Push(mem) + // if rv.AVCC.ByteLength != 5 { + // panic("error") + // } + // var tmp = 0 rv.AUList.Range(func(au *util.BLL) bool { mem = vt.BytesPool.Get(4) + // println(au.ByteLength) util.PutBE(mem.Value, uint32(au.ByteLength)) rv.AVCC.Push(mem) au.Range(func(slice util.Buffer) bool { rv.AVCC.Push(vt.BytesPool.GetShell(slice)) return true }) + // tmp += 4 + au.ByteLength + // if rv.AVCC.ByteLength != 5+tmp { + // panic("error") + // } return true }) }