Compare commits

...

1 Commits

Author SHA1 Message Date
langhuihui
6eb1bc2652 refactor: frame converter 2025-06-20 10:12:24 +08:00
39 changed files with 812 additions and 676 deletions

4
api.go
View File

@@ -97,8 +97,8 @@ func (s *Server) api_Stream_AnnexB_(rw http.ResponseWriter, r *http.Request) {
}
defer reader.StopRead()
var annexb *pkg.AnnexB
var converter = pkg.NewAVFrameConvert[*pkg.AnnexB](publisher.VideoTrack.AVTrack, nil)
annexb, err = converter.ConvertFromAVFrame(&reader.Value)
var converter = pkg.NewAVFrameConvert[*pkg.AnnexB](publisher.VideoTrack.ICodecCtx)
annexb, err = converter.Convert(reader.Value.Wraps[0])
if err != nil {
http.Error(rw, err.Error(), http.StatusInternalServerError)
return

View File

@@ -3,7 +3,6 @@ package pkg
import (
"bytes"
"fmt"
"io"
"time"
"github.com/deepch/vdk/codec/aacparser"
@@ -18,32 +17,36 @@ type ADTS struct {
util.RecyclableMemory
}
func (A *ADTS) Parse(track *AVTrack) (err error) {
if track.ICodecCtx == nil {
func (A *ADTS) Parse(old codec.ICodecCtx, f *AVFrame) (new codec.ICodecCtx, err error) {
if old == nil {
var ctx = &codec.AACCtx{}
var reader = A.NewReader()
var adts []byte
adts, err = reader.ReadBytes(7)
if err != nil {
return err
return
}
var hdrlen, framelen, samples int
ctx.Config, hdrlen, framelen, samples, err = aacparser.ParseADTSHeader(adts)
if err != nil {
return err
return
}
b := &bytes.Buffer{}
aacparser.WriteMPEG4AudioConfig(b, ctx.Config)
ctx.ConfigBytes = b.Bytes()
track.ICodecCtx = ctx
track.Info("ADTS", "hdrlen", hdrlen, "framelen", framelen, "samples", samples)
new = ctx
if false {
println("ADTS", "hdrlen", hdrlen, "framelen", framelen, "samples", samples, "config", ctx.Config)
}
// track.Info("ADTS", "hdrlen", hdrlen, "framelen", framelen, "samples", samples)
} else {
new = old
}
track.Value.Raw, err = A.Demux(track.ICodecCtx)
return
}
func (A *ADTS) ConvertCtx(ctx codec.ICodecCtx) (codec.ICodecCtx, IAVFrame, error) {
return ctx.GetBase(), nil, nil
func (ADTS) ConvertCtx(ctx codec.ICodecCtx) (codec.ICodecCtx, error) {
return ctx.GetBase(), nil
}
func (A *ADTS) Demux(ctx codec.ICodecCtx) (any, error) {
@@ -83,8 +86,3 @@ func (A *ADTS) GetSize() int {
// String formats the frame as "ADTS{size:N}", where N is the value of A.Size.
func (A *ADTS) String() string {
return fmt.Sprintf("ADTS{size:%d}", A.Size)
}
func (A *ADTS) Dump(b byte, writer io.Writer) {
//TODO implement me
panic("implement me")
}

View File

@@ -1,6 +1,7 @@
package pkg
import (
"bytes"
"encoding/binary"
"fmt"
"io"
@@ -30,8 +31,8 @@ func (a *AnnexB) Dump(t byte, w io.Writer) {
}
// DecodeConfig implements pkg.IAVFrame.
func (a *AnnexB) ConvertCtx(ctx codec.ICodecCtx) (codec.ICodecCtx, IAVFrame, error) {
return ctx.GetBase(), nil, nil
func (a *AnnexB) ConvertCtx(ctx codec.ICodecCtx) (codec.ICodecCtx, error) {
return ctx.GetBase(), nil
}
// GetSize implements pkg.IAVFrame.
@@ -43,27 +44,20 @@ func (a *AnnexB) GetTimestamp() time.Duration {
return a.DTS * time.Millisecond / 90
}
func (a *AnnexB) GetCTS() time.Duration {
return (a.PTS - a.DTS) * time.Millisecond / 90
}
// Parse implements pkg.IAVFrame.
func (a *AnnexB) Parse(t *AVTrack) (err error) {
func (a *AnnexB) Parse(old codec.ICodecCtx, f *AVFrame) (new codec.ICodecCtx, err error) {
f.CTS = (a.PTS - a.DTS) * time.Millisecond / 90
if a.Hevc {
if t.ICodecCtx == nil {
t.ICodecCtx = &codec.H265Ctx{}
}
new = &codec.H265Ctx{}
} else {
if t.ICodecCtx == nil {
t.ICodecCtx = &codec.H264Ctx{}
}
new = &codec.H264Ctx{}
}
if t.Value.Raw, err = a.Demux(t.ICodecCtx); err != nil {
if f.Raw, err = a.Demux(old); err != nil {
return
}
for _, nalu := range t.Value.Raw.(Nalus) {
for _, nalu := range f.Raw.(Nalus) {
if a.Hevc {
ctx := t.ICodecCtx.(*codec.H265Ctx)
ctx := new.(*codec.H265Ctx)
switch codec.ParseH265NALUType(nalu.Buffers[0][0]) {
case h265parser.NAL_UNIT_VPS:
ctx.RecordInfo.VPS = [][]byte{nalu.ToBytes()}
@@ -72,29 +66,38 @@ func (a *AnnexB) Parse(t *AVTrack) (err error) {
case h265parser.NAL_UNIT_PPS:
ctx.RecordInfo.PPS = [][]byte{nalu.ToBytes()}
ctx.CodecData, err = h265parser.NewCodecDataFromVPSAndSPSAndPPS(ctx.VPS(), ctx.SPS(), ctx.PPS())
if old != nil && bytes.Equal(ctx.CodecData.Record, old.(*codec.H265Ctx).Record) {
new = old
}
case h265parser.NAL_UNIT_CODED_SLICE_BLA_W_LP,
h265parser.NAL_UNIT_CODED_SLICE_BLA_W_RADL,
h265parser.NAL_UNIT_CODED_SLICE_BLA_N_LP,
h265parser.NAL_UNIT_CODED_SLICE_IDR_W_RADL,
h265parser.NAL_UNIT_CODED_SLICE_IDR_N_LP,
h265parser.NAL_UNIT_CODED_SLICE_CRA:
t.Value.IDR = true
f.IDR = true
}
} else {
ctx := t.ICodecCtx.(*codec.H264Ctx)
ctx := new.(*codec.H264Ctx)
switch codec.ParseH264NALUType(nalu.Buffers[0][0]) {
case codec.NALU_SPS:
ctx.RecordInfo.SPS = [][]byte{nalu.ToBytes()}
if len(ctx.RecordInfo.PPS) > 0 {
ctx.CodecData, err = h264parser.NewCodecDataFromSPSAndPPS(ctx.SPS(), ctx.PPS())
if old != nil && bytes.Equal(ctx.CodecData.Record, old.(*codec.H264Ctx).Record) {
new = old
}
}
case codec.NALU_PPS:
ctx.RecordInfo.PPS = [][]byte{nalu.ToBytes()}
if len(ctx.RecordInfo.SPS) > 0 {
ctx.CodecData, err = h264parser.NewCodecDataFromSPSAndPPS(ctx.SPS(), ctx.PPS())
if old != nil && bytes.Equal(ctx.CodecData.Record, old.(*codec.H264Ctx).Record) {
new = old
}
}
case codec.NALU_IDR_Picture:
t.Value.IDR = true
f.IDR = true
}
}
}

View File

@@ -1,7 +1,6 @@
package pkg
import (
"io"
"net"
"sync"
"time"
@@ -29,18 +28,18 @@ type (
IAVFrame interface {
GetAllocator() *util.ScalableMemoryAllocator
SetAllocator(*util.ScalableMemoryAllocator)
Parse(*AVTrack) error // get codec info, idr
ConvertCtx(codec.ICodecCtx) (codec.ICodecCtx, IAVFrame, error) // convert codec from source stream
Demux(codec.ICodecCtx) (any, error) // demux to raw format
Mux(codec.ICodecCtx, *AVFrame) // mux from raw format
Parse(old codec.ICodecCtx, frame *AVFrame) (new codec.ICodecCtx, err error) // get codec info, idr
ConvertCtx(source codec.ICodecCtx) (target codec.ICodecCtx, err error) // convert codec from source stream
Demux(codec.ICodecCtx) (any, error) // demux to raw format
Mux(codec.ICodecCtx, *AVFrame) // mux from raw format
GetTimestamp() time.Duration
GetCTS() time.Duration
GetSize() int
Recycle()
String() string
Dump(byte, io.Writer)
}
ISequenceCodecCtx[T any] interface {
GetSequenceFrame() T
}
Nalus []util.Memory
AudioData = util.Memory

View File

@@ -4,71 +4,40 @@ import (
"reflect"
"m7s.live/v5/pkg/codec"
"m7s.live/v5/pkg/util"
)
type AVFrameConvert[T IAVFrame] struct {
FromTrack, ToTrack *AVTrack
lastFromCodecCtx codec.ICodecCtx
AVFrame
sourceCodecCtx, targetCodecCtx codec.ICodecCtx
frameType reflect.Type
}
func NewAVFrameConvert[T IAVFrame](fromTrack *AVTrack, toTrack *AVTrack) *AVFrameConvert[T] {
ret := &AVFrameConvert[T]{}
ret.FromTrack = fromTrack
ret.ToTrack = toTrack
if ret.FromTrack == nil {
ret.FromTrack = &AVTrack{
RingWriter: &RingWriter{
Ring: util.NewRing[AVFrame](1),
},
}
}
if ret.ToTrack == nil {
ret.ToTrack = &AVTrack{
RingWriter: &RingWriter{
Ring: util.NewRing[AVFrame](1),
},
}
var to T
ret.ToTrack.FrameType = reflect.TypeOf(to).Elem()
func NewAVFrameConvert[T IAVFrame](sourceCodecCtx codec.ICodecCtx) *AVFrameConvert[T] {
var to T
ret := &AVFrameConvert[T]{
sourceCodecCtx: sourceCodecCtx,
frameType: reflect.TypeOf(to).Elem(),
}
return ret
}
func (c *AVFrameConvert[T]) ConvertFromAVFrame(avFrame *AVFrame) (to T, err error) {
to = reflect.New(c.ToTrack.FrameType).Interface().(T)
if c.ToTrack.ICodecCtx == nil {
if c.ToTrack.ICodecCtx, c.ToTrack.SequenceFrame, err = to.ConvertCtx(c.FromTrack.ICodecCtx); err != nil {
return
}
}
if err = avFrame.Demux(c.FromTrack.ICodecCtx); err != nil {
func (c *AVFrameConvert[T]) Convert(frame IAVFrame) (to T, err error) {
to = reflect.New(c.frameType).Interface().(T)
var newSourceCodecCtx codec.ICodecCtx
newSourceCodecCtx, err = frame.Parse(c.sourceCodecCtx, &c.AVFrame)
if err != nil {
return
}
to.SetAllocator(avFrame.Wraps[0].GetAllocator())
to.Mux(c.ToTrack.ICodecCtx, avFrame)
return
}
func (c *AVFrameConvert[T]) Convert(frame IAVFrame) (to T, err error) {
to = reflect.New(c.ToTrack.FrameType).Interface().(T)
// Not From Publisher
if c.FromTrack.LastValue == nil {
err = frame.Parse(c.FromTrack)
if err != nil {
if c.targetCodecCtx == nil || c.sourceCodecCtx != newSourceCodecCtx {
c.sourceCodecCtx = newSourceCodecCtx
if c.targetCodecCtx, err = to.ConvertCtx(newSourceCodecCtx); err != nil {
return
}
}
if c.ToTrack.ICodecCtx == nil || c.lastFromCodecCtx != c.FromTrack.ICodecCtx {
if c.ToTrack.ICodecCtx, c.ToTrack.SequenceFrame, err = to.ConvertCtx(c.FromTrack.ICodecCtx); err != nil {
return
}
}
c.lastFromCodecCtx = c.FromTrack.ICodecCtx
if c.FromTrack.Value.Raw, err = frame.Demux(c.FromTrack.ICodecCtx); err != nil {
if c.AVFrame.Raw, err = frame.Demux(c.sourceCodecCtx); err != nil {
return
}
to.SetAllocator(frame.GetAllocator())
to.Mux(c.ToTrack.ICodecCtx, &c.FromTrack.Value)
to.Mux(c.targetCodecCtx, &c.AVFrame)
return
}

View File

@@ -27,6 +27,12 @@ type (
}
)
// NewAACCtxFromRecord allocates an AACCtx and fills its CodecData by passing
// record to aacparser.NewCodecDataFromMPEG4AudioConfigBytes.
//
// The context is returned even when parsing fails, so callers must check err
// before relying on it.
func NewAACCtxFromRecord(record []byte) (ret *AACCtx, err error) {
	ctx := new(AACCtx)
	ctx.CodecData, err = aacparser.NewCodecDataFromMPEG4AudioConfigBytes(record)
	return ctx, err
}
// GetRecord returns an empty, non-nil byte slice; it does not read any field
// of ctx.
func (ctx *AudioCtx) GetRecord() []byte {
	return make([]byte, 0)
}

View File

@@ -112,6 +112,12 @@ type (
}
)
// NewH264CtxFromRecord allocates an H264Ctx and fills its CodecData by passing
// record to h264parser.NewCodecDataFromAVCDecoderConfRecord.
//
// The context is returned even when parsing fails, so callers must check err
// before relying on it.
func NewH264CtxFromRecord(record []byte) (ret *H264Ctx, err error) {
	ctx := new(H264Ctx)
	ctx.CodecData, err = h264parser.NewCodecDataFromAVCDecoderConfRecord(record)
	return ctx, err
}
// FourCC returns the FourCC_H264 constant; the receiver is not consulted.
func (*H264Ctx) FourCC() FourCC {
return FourCC_H264
}

View File

@@ -24,6 +24,15 @@ type (
}
)
// NewH265CtxFromRecord allocates an H265Ctx and fills its CodecData by passing
// record to h265parser.NewCodecDataFromAVCDecoderConfRecord. When parsing
// succeeds, RecordInfo.LengthSizeMinusOne is additionally set to 3.
//
// The context is returned even when parsing fails, so callers must check err
// before relying on it.
func NewH265CtxFromRecord(record []byte) (ret *H265Ctx, err error) {
	ctx := new(H265Ctx)
	ctx.CodecData, err = h265parser.NewCodecDataFromAVCDecoderConfRecord(record)
	if err != nil {
		return ctx, err
	}
	ctx.RecordInfo.LengthSizeMinusOne = 3
	return ctx, nil
}
// GetInfo returns a one-line summary of the context in the form
// "fps: <FPS>, resolution: <Resolution>".
func (ctx *H265Ctx) GetInfo() string {
	fps := ctx.FPS()
	resolution := ctx.Resolution()
	return fmt.Sprintf("fps: %d, resolution: %s", fps, resolution)
}

View File

@@ -20,15 +20,18 @@ type RawAudio struct {
util.RecyclableMemory
}
func (r *RawAudio) Parse(track *AVTrack) (err error) {
if track.ICodecCtx == nil {
func (r *RawAudio) Parse(old codec.ICodecCtx, f *AVFrame) (new codec.ICodecCtx, err error) {
if old == nil {
switch r.FourCC {
case codec.FourCC_MP4A:
ctx := &codec.AACCtx{}
ctx.CodecData, err = aacparser.NewCodecDataFromMPEG4AudioConfigBytes(r.ToBytes())
track.ICodecCtx = ctx
if err != nil {
return
}
new = ctx
case codec.FourCC_ALAW:
track.ICodecCtx = &codec.PCMACtx{
new = &codec.PCMACtx{
AudioCtx: codec.AudioCtx{
SampleRate: 8000,
Channels: 1,
@@ -36,7 +39,7 @@ func (r *RawAudio) Parse(track *AVTrack) (err error) {
},
}
case codec.FourCC_ULAW:
track.ICodecCtx = &codec.PCMUCtx{
new = &codec.PCMUCtx{
AudioCtx: codec.AudioCtx{
SampleRate: 8000,
Channels: 1,
@@ -44,22 +47,14 @@ func (r *RawAudio) Parse(track *AVTrack) (err error) {
},
}
}
} else {
new = old
}
return
}
func (r *RawAudio) ConvertCtx(ctx codec.ICodecCtx) (codec.ICodecCtx, IAVFrame, error) {
c := ctx.GetBase()
if c.FourCC().Is(codec.FourCC_MP4A) {
seq := &RawAudio{
FourCC: codec.FourCC_MP4A,
Timestamp: r.Timestamp,
}
seq.SetAllocator(r.GetAllocator())
seq.Memory.Append(c.GetRecord())
return c, seq, nil
}
return c, nil, nil
func (RawAudio) ConvertCtx(ctx codec.ICodecCtx) (codec.ICodecCtx, error) {
return ctx.GetBase(), nil
}
func (r *RawAudio) Demux(ctx codec.ICodecCtx) (any, error) {
@@ -77,10 +72,6 @@ func (r *RawAudio) GetTimestamp() time.Duration {
return r.Timestamp
}
func (r *RawAudio) GetCTS() time.Duration {
return 0
}
func (r *RawAudio) GetSize() int {
return r.Size
}
@@ -104,99 +95,134 @@ type H26xFrame struct {
util.RecyclableMemory
}
func (h *H26xFrame) Parse(track *AVTrack) (err error) {
func (h *H26xFrame) Parse(old codec.ICodecCtx, f *AVFrame) (new codec.ICodecCtx, err error) {
f.CTS = h.CTS
var hasVideoFrame bool
switch h.FourCC {
case codec.FourCC_H264:
var ctx *codec.H264Ctx
if track.ICodecCtx != nil {
ctx = track.ICodecCtx.GetBase().(*codec.H264Ctx)
}
for _, nalu := range h.Nalus {
switch codec.ParseH264NALUType(nalu.Buffers[0][0]) {
case h264parser.NALU_SPS:
ctx = &codec.H264Ctx{}
track.ICodecCtx = ctx
ctx.RecordInfo.SPS = [][]byte{nalu.ToBytes()}
if ctx.SPSInfo, err = h264parser.ParseSPS(ctx.SPS()); err != nil {
return
new = old
// First determine the codec type from existing context or FourCC
if old != nil {
switch base := old.GetBase().(type) {
case *codec.H264Ctx:
ctx := base
for _, nalu := range h.Nalus {
switch codec.ParseH264NALUType(nalu.Buffers[0][0]) {
case h264parser.NALU_SPS:
ctx = &codec.H264Ctx{}
new = ctx
ctx.RecordInfo.SPS = [][]byte{nalu.ToBytes()}
if ctx.SPSInfo, err = h264parser.ParseSPS(ctx.SPS()); err != nil {
return
}
case h264parser.NALU_PPS:
ctx.RecordInfo.PPS = [][]byte{nalu.ToBytes()}
ctx.CodecData, err = h264parser.NewCodecDataFromSPSAndPPS(ctx.SPS(), ctx.PPS())
if err != nil {
return
}
case codec.NALU_IDR_Picture:
f.IDR = true
hasVideoFrame = true
case codec.NALU_Non_IDR_Picture:
hasVideoFrame = true
}
case h264parser.NALU_PPS:
ctx.RecordInfo.PPS = [][]byte{nalu.ToBytes()}
ctx.CodecData, err = h264parser.NewCodecDataFromSPSAndPPS(ctx.SPS(), ctx.PPS())
if err != nil {
return
}
case *codec.H265Ctx:
ctx := base
for _, nalu := range h.Nalus {
switch codec.ParseH265NALUType(nalu.Buffers[0][0]) {
case h265parser.NAL_UNIT_VPS:
ctx = &codec.H265Ctx{}
ctx.RecordInfo.VPS = [][]byte{nalu.ToBytes()}
new = ctx
case h265parser.NAL_UNIT_SPS:
ctx.RecordInfo.SPS = [][]byte{nalu.ToBytes()}
if ctx.SPSInfo, err = h265parser.ParseSPS(ctx.SPS()); err != nil {
return
}
case h265parser.NAL_UNIT_PPS:
ctx.RecordInfo.PPS = [][]byte{nalu.ToBytes()}
ctx.CodecData, err = h265parser.NewCodecDataFromVPSAndSPSAndPPS(ctx.VPS(), ctx.SPS(), ctx.PPS())
case h265parser.NAL_UNIT_CODED_SLICE_BLA_W_LP,
h265parser.NAL_UNIT_CODED_SLICE_BLA_W_RADL,
h265parser.NAL_UNIT_CODED_SLICE_BLA_N_LP,
h265parser.NAL_UNIT_CODED_SLICE_IDR_W_RADL,
h265parser.NAL_UNIT_CODED_SLICE_IDR_N_LP,
h265parser.NAL_UNIT_CODED_SLICE_CRA:
f.IDR = true
hasVideoFrame = true
case 0, 1, 2, 3, 4, 5, 6, 7, 8, 9:
hasVideoFrame = true
}
case codec.NALU_IDR_Picture:
track.Value.IDR = true
hasVideoFrame = true
case codec.NALU_Non_IDR_Picture:
hasVideoFrame = true
}
}
case codec.FourCC_H265:
var ctx *codec.H265Ctx
if track.ICodecCtx != nil {
ctx = track.ICodecCtx.GetBase().(*codec.H265Ctx)
}
for _, nalu := range h.Nalus {
switch codec.ParseH265NALUType(nalu.Buffers[0][0]) {
case h265parser.NAL_UNIT_VPS:
ctx = &codec.H265Ctx{}
ctx.RecordInfo.VPS = [][]byte{nalu.ToBytes()}
track.ICodecCtx = ctx
case h265parser.NAL_UNIT_SPS:
ctx.RecordInfo.SPS = [][]byte{nalu.ToBytes()}
if ctx.SPSInfo, err = h265parser.ParseSPS(ctx.SPS()); err != nil {
return
} else {
// Fallback to FourCC when no old context is available
switch h.FourCC {
case codec.FourCC_H264:
var ctx *codec.H264Ctx
for _, nalu := range h.Nalus {
switch codec.ParseH264NALUType(nalu.Buffers[0][0]) {
case h264parser.NALU_SPS:
ctx = &codec.H264Ctx{}
new = ctx
ctx.RecordInfo.SPS = [][]byte{nalu.ToBytes()}
if ctx.SPSInfo, err = h264parser.ParseSPS(ctx.SPS()); err != nil {
return
}
case h264parser.NALU_PPS:
ctx.RecordInfo.PPS = [][]byte{nalu.ToBytes()}
ctx.CodecData, err = h264parser.NewCodecDataFromSPSAndPPS(ctx.SPS(), ctx.PPS())
if err != nil {
return
}
case codec.NALU_IDR_Picture:
f.IDR = true
hasVideoFrame = true
case codec.NALU_Non_IDR_Picture:
hasVideoFrame = true
}
}
case codec.FourCC_H265:
var ctx *codec.H265Ctx
for _, nalu := range h.Nalus {
switch codec.ParseH265NALUType(nalu.Buffers[0][0]) {
case h265parser.NAL_UNIT_VPS:
ctx = &codec.H265Ctx{}
ctx.RecordInfo.VPS = [][]byte{nalu.ToBytes()}
new = ctx
case h265parser.NAL_UNIT_SPS:
ctx.RecordInfo.SPS = [][]byte{nalu.ToBytes()}
if ctx.SPSInfo, err = h265parser.ParseSPS(ctx.SPS()); err != nil {
return
}
case h265parser.NAL_UNIT_PPS:
ctx.RecordInfo.PPS = [][]byte{nalu.ToBytes()}
ctx.CodecData, err = h265parser.NewCodecDataFromVPSAndSPSAndPPS(ctx.VPS(), ctx.SPS(), ctx.PPS())
case h265parser.NAL_UNIT_CODED_SLICE_BLA_W_LP,
h265parser.NAL_UNIT_CODED_SLICE_BLA_W_RADL,
h265parser.NAL_UNIT_CODED_SLICE_BLA_N_LP,
h265parser.NAL_UNIT_CODED_SLICE_IDR_W_RADL,
h265parser.NAL_UNIT_CODED_SLICE_IDR_N_LP,
h265parser.NAL_UNIT_CODED_SLICE_CRA:
f.IDR = true
hasVideoFrame = true
case 0, 1, 2, 3, 4, 5, 6, 7, 8, 9:
hasVideoFrame = true
}
case h265parser.NAL_UNIT_PPS:
ctx.RecordInfo.PPS = [][]byte{nalu.ToBytes()}
ctx.CodecData, err = h265parser.NewCodecDataFromVPSAndSPSAndPPS(ctx.VPS(), ctx.SPS(), ctx.PPS())
case h265parser.NAL_UNIT_CODED_SLICE_BLA_W_LP,
h265parser.NAL_UNIT_CODED_SLICE_BLA_W_RADL,
h265parser.NAL_UNIT_CODED_SLICE_BLA_N_LP,
h265parser.NAL_UNIT_CODED_SLICE_IDR_W_RADL,
h265parser.NAL_UNIT_CODED_SLICE_IDR_N_LP,
h265parser.NAL_UNIT_CODED_SLICE_CRA:
track.Value.IDR = true
hasVideoFrame = true
case 0, 1, 2, 3, 4, 5, 6, 7, 8, 9:
hasVideoFrame = true
}
}
}
// Return ErrSkip if no video frames are present (only metadata NALUs)
if !hasVideoFrame {
return ErrSkip
return nil, ErrSkip
}
return
}
func (h *H26xFrame) ConvertCtx(ctx codec.ICodecCtx) (codec.ICodecCtx, IAVFrame, error) {
switch c := ctx.GetBase().(type) {
case *codec.H264Ctx:
return c, &H26xFrame{
FourCC: codec.FourCC_H264,
Nalus: []util.Memory{
util.NewMemory(c.SPS()),
util.NewMemory(c.PPS()),
},
}, nil
case *codec.H265Ctx:
return c, &H26xFrame{
FourCC: codec.FourCC_H265,
Nalus: []util.Memory{
util.NewMemory(c.VPS()),
util.NewMemory(c.SPS()),
util.NewMemory(c.PPS()),
},
}, nil
}
return ctx.GetBase(), nil, nil
func (H26xFrame) ConvertCtx(ctx codec.ICodecCtx) (codec.ICodecCtx, error) {
return ctx.GetBase(), nil
}
func (h *H26xFrame) Demux(ctx codec.ICodecCtx) (any, error) {
@@ -229,8 +255,3 @@ func (h *H26xFrame) GetSize() int {
// String formats the frame as "H26xFrame{FourCC: ..., Timestamp: ..., CTS: ...}"
// using the frame's FourCC, Timestamp and CTS fields.
func (h *H26xFrame) String() string {
return fmt.Sprintf("H26xFrame{FourCC: %s, Timestamp: %s, CTS: %s}", h.FourCC, h.Timestamp, h.CTS)
}
func (h *H26xFrame) Dump(b byte, writer io.Writer) {
//TODO implement me
panic("implement me")
}

View File

@@ -16,12 +16,12 @@ func TestH26xFrame_Parse_VideoFrameDetection(t *testing.T) {
util.NewMemory([]byte{0x65}), // IDR Picture NALU type
},
}
track := &AVTrack{}
err := frame.Parse(track)
track := &AVFrame{}
_, err := frame.Parse(nil, track)
if err == ErrSkip {
t.Error("Expected H264 IDR frame to not be skipped, but got ErrSkip")
}
if !track.Value.IDR {
if !track.IDR {
t.Error("Expected IDR flag to be set for H264 IDR frame")
}
})
@@ -34,8 +34,8 @@ func TestH26xFrame_Parse_VideoFrameDetection(t *testing.T) {
util.NewMemory([]byte{0x21}), // Non-IDR Picture NALU type
},
}
track := &AVTrack{}
err := frame.Parse(track)
track := &AVFrame{}
_, err := frame.Parse(nil, track)
if err == ErrSkip {
t.Error("Expected H264 Non-IDR frame to not be skipped, but got ErrSkip")
}
@@ -49,8 +49,8 @@ func TestH26xFrame_Parse_VideoFrameDetection(t *testing.T) {
util.NewMemory([]byte{0x67}), // SPS NALU type
},
}
track := &AVTrack{}
err := frame.Parse(track)
track := &AVFrame{}
_, err := frame.Parse(nil, track)
if err != ErrSkip {
t.Errorf("Expected H264 SPS-only frame to be skipped, but got: %v", err)
}
@@ -64,8 +64,8 @@ func TestH26xFrame_Parse_VideoFrameDetection(t *testing.T) {
util.NewMemory([]byte{0x68}), // PPS NALU type
},
}
track := &AVTrack{}
err := frame.Parse(track)
track := &AVFrame{}
_, err := frame.Parse(nil, track)
if err != ErrSkip {
t.Errorf("Expected H264 PPS-only frame to be skipped, but got: %v", err)
}
@@ -80,7 +80,7 @@ func TestH26xFrame_Parse_VideoFrameDetection(t *testing.T) {
// Using NAL_UNIT_CODED_SLICE_IDR_W_RADL which should be type 19
},
}
track := &AVTrack{}
track := &AVFrame{}
// Let's use the correct byte pattern for H265 IDR slice
// NAL_UNIT_CODED_SLICE_IDR_W_RADL = 19
@@ -88,11 +88,11 @@ func TestH26xFrame_Parse_VideoFrameDetection(t *testing.T) {
idrSliceByte := byte(19 << 1) // 19 * 2 = 38 = 0x26
frame.Nalus[0] = util.NewMemory([]byte{idrSliceByte})
err := frame.Parse(track)
_, err := frame.Parse(nil, track)
if err == ErrSkip {
t.Error("Expected H265 IDR slice to not be skipped, but got ErrSkip")
}
if !track.Value.IDR {
if !track.IDR {
t.Error("Expected IDR flag to be set for H265 IDR slice")
}
})
@@ -105,8 +105,8 @@ func TestH26xFrame_Parse_VideoFrameDetection(t *testing.T) {
util.NewMemory([]byte{0x40, 0x01}), // VPS NALU type (32 << 1 = 64 = 0x40)
},
}
track := &AVTrack{}
err := frame.Parse(track)
track := &AVFrame{}
_, err := frame.Parse(nil, track)
if err != ErrSkip {
t.Errorf("Expected H265 VPS-only frame to be skipped, but got: %v", err)
}
@@ -121,12 +121,12 @@ func TestH26xFrame_Parse_VideoFrameDetection(t *testing.T) {
util.NewMemory([]byte{0x65}), // IDR Picture NALU type
},
}
track := &AVTrack{}
err := frame.Parse(track)
track := &AVFrame{}
_, err := frame.Parse(nil, track)
if err == ErrSkip {
t.Error("Expected H264 mixed SPS+IDR frame to not be skipped, but got ErrSkip")
}
if !track.Value.IDR {
if !track.IDR {
t.Error("Expected IDR flag to be set for H264 mixed frame with IDR")
}
})
@@ -140,17 +140,17 @@ func TestH26xFrame_Parse_VideoFrameDetection(t *testing.T) {
util.NewMemory([]byte{0x4C, 0x01}), // IDR_W_RADL slice type (19 << 1)
},
}
track := &AVTrack{}
track := &AVFrame{}
// Fix the IDR slice byte for H265
idrSliceByte := byte(19 << 1) // NAL_UNIT_CODED_SLICE_IDR_W_RADL = 19
frame.Nalus[1] = util.NewMemory([]byte{idrSliceByte, 0x01})
err := frame.Parse(track)
_, err := frame.Parse(nil, track)
if err == ErrSkip {
t.Error("Expected H265 mixed VPS+IDR frame to not be skipped, but got ErrSkip")
}
if !track.Value.IDR {
if !track.IDR {
t.Error("Expected IDR flag to be set for H265 mixed frame with IDR")
}
})

View File

@@ -56,9 +56,8 @@ type (
Track
*RingWriter
codec.ICodecCtx
Allocator *util.ScalableMemoryAllocator
SequenceFrame IAVFrame
WrapIndex int
Allocator *util.ScalableMemoryAllocator
WrapIndex int
TsTamer
SpeedController
DropController
@@ -244,7 +243,7 @@ func (s *SpeedController) speedControl(speed float64, ts time.Duration) {
}
should := time.Duration(float64(ts-s.beginTimestamp) / speed)
s.Delta = should - elapsed
// fmt.Println(speed, elapsed, should, s.Delta)
// fmt.Println("speed control", "speed", speed, "elapsed", elapsed, "should", should, "delta", s.Delta)
if s.Delta > threshold {
time.Sleep(min(s.Delta, time.Millisecond*500))
}

View File

@@ -47,6 +47,18 @@ func (m *Memory) CopyFrom(b *Memory) {
m.AppendOne(buf)
}
// Equal reports whether m and b have the same Size, the same number of
// buffers, and byte-for-byte identical buffers at every index.
//
// The comparison is per buffer: two memories holding the same bytes split
// across buffers differently are reported as not equal.
func (m *Memory) Equal(b *Memory) bool {
	if m.Size != b.Size {
		return false
	}
	if len(m.Buffers) != len(b.Buffers) {
		return false
	}
	for i := range m.Buffers {
		if !slices.Equal(m.Buffers[i], b.Buffers[i]) {
			return false
		}
	}
	return true
}
func (m *Memory) CopyTo(buf []byte) {
for _, b := range m.Buffers {
l := len(b)

View File

@@ -197,7 +197,7 @@ func (plugin *FLVPlugin) processMp4ToFlv(w http.ResponseWriter, r *http.Request,
}
// 创建DemuxerConverterRange进行MP4解复用和转换
demuxer := &mp4.DemuxerConverterRange[*rtmp.RTMPAudio, *rtmp.RTMPVideo]{
demuxer := &mp4.DemuxerConverterRange[*rtmp.Audio, *rtmp.Video]{
DemuxerRange: mp4.DemuxerRange{
StartTime: params.startTime,
EndTime: params.endTime,
@@ -213,9 +213,9 @@ func (plugin *FLVPlugin) processMp4ToFlv(w http.ResponseWriter, r *http.Request,
tsOffset := int64(0) // 偏移时间戳
// 执行解复用和转换
err := demuxer.Demux(r.Context(),
func(audio *rtmp.RTMPAudio) error {
func(audio *rtmp.Audio) error {
if !hasWritten {
if err := flvWriter.WriteHeader(demuxer.AudioTrack != nil, demuxer.VideoTrack != nil); err != nil {
if err := flvWriter.WriteHeader(demuxer.AudioCodec != nil, demuxer.VideoCodec != nil); err != nil {
return err
}
}
@@ -226,9 +226,9 @@ func (plugin *FLVPlugin) processMp4ToFlv(w http.ResponseWriter, r *http.Request,
// 写入音频数据帧
return flvWriter.WriteTag(flv.FLV_TAG_TYPE_AUDIO, timestamp, uint32(audio.Size), audio.Buffers...)
}, func(frame *rtmp.RTMPVideo) error {
}, func(frame *rtmp.Video) error {
if !hasWritten {
if err := flvWriter.WriteHeader(demuxer.AudioTrack != nil, demuxer.VideoTrack != nil); err != nil {
if err := flvWriter.WriteHeader(demuxer.AudioCodec != nil, demuxer.VideoCodec != nil); err != nil {
return err
}
}

View File

@@ -60,11 +60,11 @@ func (task *Live) rtmpData2FlvTag(t byte, data *rtmp.RTMPData, ts uint32) error
return task.WriteFlvTag(append(net.Buffers{task.b[:]}, data.Memory.Buffers...))
}
func (task *Live) WriteAudioTag(data *rtmp.RTMPAudio, ts uint32) error {
func (task *Live) WriteAudioTag(data *rtmp.Audio, ts uint32) error {
return task.rtmpData2FlvTag(FLV_TAG_TYPE_AUDIO, &data.RTMPData, ts)
}
func (task *Live) WriteVideoTag(data *rtmp.RTMPVideo, ts uint32) error {
func (task *Live) WriteVideoTag(data *rtmp.Video, ts uint32) error {
return task.rtmpData2FlvTag(FLV_TAG_TYPE_VIDEO, &data.RTMPData, ts)
}
@@ -73,9 +73,9 @@ func (task *Live) Run() (err error) {
if err != nil {
return
}
err = m7s.PlayBlock(task.Subscriber, func(audio *rtmp.RTMPAudio) error {
err = m7s.PlayBlock(task.Subscriber, func(audio *rtmp.Audio) error {
return task.WriteAudioTag(audio, task.Subscriber.AudioReader.AbsTime)
}, func(video *rtmp.RTMPVideo) error {
}, func(video *rtmp.Video) error {
return task.WriteVideoTag(video, task.Subscriber.VideoReader.AbsTime)
})
if err != nil {

View File

@@ -252,14 +252,14 @@ func (r *Recorder) Run() (err error) {
writer = NewFlvWriter(file)
if vr := suber.VideoReader; vr != nil {
vr.ResetAbsTime()
seq := vr.Track.SequenceFrame.(*rtmp.RTMPVideo)
seq := vr.Track.ICodecCtx.(pkg.ISequenceCodecCtx[*rtmp.Video]).GetSequenceFrame()
err = writer.WriteTag(FLV_TAG_TYPE_VIDEO, 0, uint32(seq.Size), seq.Buffers...)
offset = int64(seq.Size + 15)
}
if ar := suber.AudioReader; ar != nil {
ar.ResetAbsTime()
if ar.Track.SequenceFrame != nil {
seq := ar.Track.SequenceFrame.(*rtmp.RTMPAudio)
if seqCtx, ok := ar.Track.ICodecCtx.(pkg.ISequenceCodecCtx[*rtmp.Audio]); ok {
seq := seqCtx.GetSequenceFrame()
err = writer.WriteTag(FLV_TAG_TYPE_AUDIO, 0, uint32(seq.Size), seq.Buffers...)
offset += int64(seq.Size + 15)
}
@@ -267,14 +267,14 @@ func (r *Recorder) Run() (err error) {
}
}
return m7s.PlayBlock(ctx.Subscriber, func(audio *rtmp.RTMPAudio) (err error) {
return m7s.PlayBlock(ctx.Subscriber, func(audio *rtmp.Audio) (err error) {
if suber.VideoReader == nil && !noFragment {
checkFragment(suber.AudioReader.AbsTime)
}
err = writer.WriteTag(FLV_TAG_TYPE_AUDIO, suber.AudioReader.AbsTime, uint32(audio.Size), audio.Buffers...)
offset += int64(audio.Size + 15)
return
}, func(video *rtmp.RTMPVideo) (err error) {
}, func(video *rtmp.Video) (err error) {
if suber.VideoReader.Value.IDR {
filepositions = append(filepositions, uint64(offset))
times = append(times, float64(suber.VideoReader.AbsTime)/1000)

View File

@@ -215,11 +215,11 @@ func (plugin *HLSPlugin) processMp4ToTs(w http.ResponseWriter, r *http.Request,
writePMTHeader := func() {
if !hasWritten {
var audio, video codec.FourCC
if demuxer.AudioTrack != nil && demuxer.AudioTrack.ICodecCtx != nil {
audio = demuxer.AudioTrack.ICodecCtx.FourCC()
if demuxer.AudioCodec != nil {
audio = demuxer.AudioCodec.FourCC()
}
if demuxer.VideoTrack != nil && demuxer.VideoTrack.ICodecCtx != nil {
video = demuxer.VideoTrack.ICodecCtx.FourCC()
if demuxer.VideoCodec != nil {
video = demuxer.VideoCodec.FourCC()
}
tsWriter.WritePMTPacket(audio, video)
hasWritten = true
@@ -241,7 +241,7 @@ func (plugin *HLSPlugin) processMp4ToTs(w http.ResponseWriter, r *http.Request,
return tsWriter.WriteAudioFrame(audio, &audioFrame)
}, func(video *pkg.AnnexB) error {
writePMTHeader()
videoFrame.IsKeyFrame = demuxer.VideoTrack.Value.IDR
videoFrame.IsKeyFrame = demuxer.VideoConverter.IDR
// 写入视频帧
return tsWriter.WriteVideoFrame(video, &videoFrame)
})

146
plugin/hls/pkg/audio.go Normal file
View File

@@ -0,0 +1,146 @@
package hls
import (
"fmt"
"io"
"time"
"m7s.live/v5/pkg"
"m7s.live/v5/pkg/codec"
"m7s.live/v5/pkg/util"
mpegts "m7s.live/v5/plugin/hls/pkg/ts"
)
var _ pkg.IAVFrame = (*Audio)(nil)
// Audio is an audio frame backed by an MPEG-TS PES packet. Its methods read
// and write the embedded packet's Header and Payload.
type Audio struct {
mpegts.MpegTsPESPacket
// PES is optional; String and Recycle both check it for nil before use.
PES *mpegts.MpegtsPESFrame
// allocator is the value stored by SetAllocator and returned by GetAllocator.
allocator *util.ScalableMemoryAllocator
}
// GetAllocator implements pkg.IAVFrame. It returns the allocator previously
// stored with SetAllocator, or nil if none was set.
func (a *Audio) GetAllocator() *util.ScalableMemoryAllocator {
return a.allocator
}
// SetAllocator implements pkg.IAVFrame. It stores allocator on the frame so
// that GetAllocator can return it later.
func (a *Audio) SetAllocator(allocator *util.ScalableMemoryAllocator) {
a.allocator = allocator
}
// Parse implements pkg.IAVFrame. It does not inspect the frame: old is
// returned unchanged as the new codec context, f is left untouched, and the
// error is always nil.
func (Audio) Parse(old codec.ICodecCtx, f *pkg.AVFrame) (new codec.ICodecCtx, err error) {
return old, nil
}
// ConvertCtx implements pkg.IAVFrame. It performs no conversion: the result
// is ctx.GetBase() with a nil error.
func (Audio) ConvertCtx(ctx codec.ICodecCtx) (codec.ICodecCtx, error) {
return ctx.GetBase(), nil
}
// Demux implements pkg.IAVFrame. It returns a copy of the PES payload wrapped
// in a single-buffer util.Memory, or pkg.ErrNotFound when the payload is
// empty. codecCtx is not used.
func (a *Audio) Demux(codecCtx codec.ICodecCtx) (any, error) {
	size := a.Payload.Len()
	if size <= 0 {
		return nil, pkg.ErrNotFound
	}
	// Copy the bytes out so the result does not alias the payload.
	data := make([]byte, size)
	copy(data, a.Payload.Bytes())
	return util.Memory{Buffers: [][]byte{data}, Size: len(data)}, nil
}
// Mux implements pkg.IAVFrame. It fills this PES packet from frame.
//
// The payload is replaced with frame.Raw when that holds a util.Memory or a
// []byte; for nil or any other type the payload is left empty. codecCtx is
// not used.
//
// When frame.Timestamp is positive it is written to Header.Pts in 90 kHz
// ticks (90 ticks per millisecond, the same scale GetTimestamp and GetCTS
// decode). When frame.CTS is also positive, Header.Dts is set to
// Timestamp-CTS on the same scale and the flags mark both fields present.
// A non-positive Timestamp leaves the header timing fields untouched.
func (a *Audio) Mux(codecCtx codec.ICodecCtx, frame *pkg.AVFrame) {
	// Every path starts from an empty payload, so reset once up front.
	a.Payload.Reset()
	switch rawData := frame.Raw.(type) {
	case util.Memory:
		a.Payload.Write(rawData.ToBytes())
	case []byte:
		a.Payload.Write(rawData)
	}

	if frame.Timestamp <= 0 {
		return
	}
	// 90 kHz is 90 ticks per millisecond, so nanoseconds*90 is divided by
	// 1e6. Dividing by 1e9 would yield ticks at 90 Hz, and GetTimestamp
	// would then read the value back 1000 times too small.
	a.Header.Pts = uint64(frame.Timestamp.Nanoseconds() * 90 / 1000000)
	a.Header.PtsDtsFlags = 0x80 // PTS only
	if frame.CTS > 0 {
		a.Header.Dts = uint64((frame.Timestamp - frame.CTS).Nanoseconds() * 90 / 1000000)
		a.Header.PtsDtsFlags = 0xC0 // PTS and DTS both present
	}
}
// GetTimestamp implements pkg.IAVFrame. When the PTS flag (0x80) is set in
// Header.PtsDtsFlags it converts Header.Pts from 90 kHz ticks to a
// time.Duration; otherwise it returns 0.
func (a *Audio) GetTimestamp() time.Duration {
	if a.Header.PtsDtsFlags&0x80 == 0 {
		return 0
	}
	// 90 ticks per millisecond.
	return time.Duration(a.Header.Pts) * time.Millisecond / 90
}
// GetCTS implements pkg.IAVFrame. When both the PTS and DTS flags (0xC0) are
// set in Header.PtsDtsFlags it returns PTS minus DTS, each converted from
// 90 kHz ticks to a time.Duration; otherwise it returns 0.
func (a *Audio) GetCTS() time.Duration {
	const ptsAndDts = 0xC0
	if a.Header.PtsDtsFlags&ptsAndDts != ptsAndDts {
		return 0
	}
	// 90 ticks per millisecond.
	pts := time.Duration(a.Header.Pts) * time.Millisecond / 90
	dts := time.Duration(a.Header.Dts) * time.Millisecond / 90
	return pts - dts
}
// GetSize implements pkg.IAVFrame. It returns the current payload length as
// reported by a.Payload.Len().
func (a *Audio) GetSize() int {
return a.Payload.Len()
}
// Recycle implements pkg.IAVFrame. It clears the frame's state: the payload
// is reset, Buffers is dropped, Header is zeroed, and the PES frame is zeroed
// in place when one is attached. Nothing is handed back to the allocator here.
func (a *Audio) Recycle() {
	// Drop the packet data.
	a.Payload.Reset()
	a.Buffers = nil

	// Zero the PES header.
	a.Header = mpegts.MpegTsPESHeader{}

	// Zero the PES frame info, keeping the same pointer.
	if pes := a.PES; pes != nil {
		*pes = mpegts.MpegtsPESFrame{}
	}
}
// String implements pkg.IAVFrame. It formats the frame as
// "HLSAudio[pts:P, size:S, pid:D]", where pid is taken from PES when it is
// set and is 0 otherwise.
func (a *Audio) String() string {
	var pid uint16
	if a.PES != nil {
		pid = a.PES.Pid
	}
	return fmt.Sprintf("HLSAudio[pts:%d, size:%d, pid:%d]",
		a.Header.Pts, a.Payload.Len(), pid)
}
// Dump implements pkg.IAVFrame. It writes the payload bytes to w when the
// payload is non-empty; t is not used and the result of the write is not
// checked.
func (a *Audio) Dump(t byte, w io.Writer) {
	if a.Payload.Len() <= 0 {
		return
	}
	w.Write(a.Payload.Bytes())
}

View File

@@ -256,7 +256,7 @@ func (p *MP4Plugin) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if ctx.ws {
ctx.Flush()
}
m7s.PlayBlock(sub, func(frame *rtmp.RTMPAudio) (err error) {
m7s.PlayBlock(sub, func(frame *rtmp.Audio) (err error) {
bs := frame.Memory.ToBytes()
if offsetAudio == 2 && bs[1] == 0 {
return nil
@@ -277,7 +277,7 @@ func (p *MP4Plugin) ServeHTTP(w http.ResponseWriter, r *http.Request) {
audio.Samplelist[0].Data = bs[offsetAudio:]
audio.Samplelist[0].Size = len(audio.Samplelist[0].Data)
return
}, func(frame *rtmp.RTMPVideo) (err error) {
}, func(frame *rtmp.Video) (err error) {
bs := frame.Memory.ToBytes()
if ctx, ok := sub.VideoReader.Track.ICodecCtx.(*rtmp.H265Ctx); ok && ctx.Enhanced {
switch bs[0] & 0b1111 {

View File

@@ -29,14 +29,14 @@ func (a *Audio) SetAllocator(allocator *util.ScalableMemoryAllocator) {
}
// Parse implements pkg.IAVFrame.
func (a *Audio) Parse(t *pkg.AVTrack) error {
return nil
func (a *Audio) Parse(old codec.ICodecCtx, f *pkg.AVFrame) (new codec.ICodecCtx, err error) {
return old, nil
}
// ConvertCtx implements pkg.IAVFrame.
func (a *Audio) ConvertCtx(ctx codec.ICodecCtx) (codec.ICodecCtx, pkg.IAVFrame, error) {
func (a *Audio) ConvertCtx(ctx codec.ICodecCtx) (codec.ICodecCtx, error) {
// 返回基础编解码器上下文,不进行转换
return ctx.GetBase(), nil, nil
return ctx.GetBase(), nil
}
// Demux implements pkg.IAVFrame.
@@ -65,8 +65,6 @@ func (a *Audio) Demux(codecCtx codec.ICodecCtx) (any, error) {
// Mux implements pkg.IAVFrame.
func (a *Audio) Mux(codecCtx codec.ICodecCtx, frame *pkg.AVFrame) {
// 从 AVFrame 复制数据到 MP4 Sample
a.KeyFrame = false // 音频帧通常不是关键帧
a.Timestamp = uint32(frame.Timestamp.Milliseconds())
a.CTS = uint32(frame.CTS.Milliseconds())
@@ -98,11 +96,6 @@ func (a *Audio) GetTimestamp() time.Duration {
return time.Duration(a.Timestamp) * time.Millisecond
}
// GetCTS implements pkg.IAVFrame.
// It returns the CTS field converted to a time.Duration, treating the stored
// value as a number of milliseconds.
func (a *Audio) GetCTS() time.Duration {
return time.Duration(a.CTS) * time.Millisecond
}
// GetSize implements pkg.IAVFrame.
func (a *Audio) GetSize() int {
return a.Size

View File

@@ -20,7 +20,7 @@ type DemuxerRange struct {
*slog.Logger
StartTime, EndTime time.Time
Streams []m7s.RecordStream
AudioTrack, VideoTrack *pkg.AVTrack
AudioCodec, VideoCodec codec.ICodecCtx
}
func (d *DemuxerRange) Demux(ctx context.Context, onAudio func(*Audio) error, onVideo func(*Video) error) error {
@@ -52,79 +52,40 @@ func (d *DemuxerRange) Demux(ctx context.Context, onAudio func(*Audio) error, on
var h264Ctx codec.H264Ctx
h264Ctx.CodecData, err = h264parser.NewCodecDataFromAVCDecoderConfRecord(track.ExtraData)
if err == nil {
if d.VideoTrack == nil {
d.VideoTrack = &pkg.AVTrack{
ICodecCtx: &h264Ctx,
RingWriter: &pkg.RingWriter{
Ring: util.NewRing[pkg.AVFrame](1),
}}
d.VideoTrack.Logger = d.With("track", "video")
} else {
// 如果已经有视频轨道,使用现有的轨道
d.VideoTrack.ICodecCtx = &h264Ctx
}
// H.264 is a video codec: record its context in VideoCodec, as the H.265 case does.
// Storing it in AudioCodec would leave VideoCodec unset for H.264 tracks.
d.VideoCodec = &h264Ctx
}
case box.MP4_CODEC_H265:
var h265Ctx codec.H265Ctx
h265Ctx.CodecData, err = h265parser.NewCodecDataFromAVCDecoderConfRecord(track.ExtraData)
if err == nil {
if d.VideoTrack == nil {
d.VideoTrack = &pkg.AVTrack{
ICodecCtx: &h265Ctx,
RingWriter: &pkg.RingWriter{
Ring: util.NewRing[pkg.AVFrame](1),
}}
d.VideoTrack.Logger = d.With("track", "video")
} else {
// 如果已经有视频轨道,使用现有的轨道
d.VideoTrack.ICodecCtx = &h265Ctx
}
if err != nil {
return err
}
d.VideoCodec = &h265Ctx
case box.MP4_CODEC_AAC:
var aacCtx codec.AACCtx
aacCtx.CodecData, err = aacparser.NewCodecDataFromMPEG4AudioConfigBytes(track.ExtraData)
if err == nil {
if d.AudioTrack == nil {
d.AudioTrack = &pkg.AVTrack{
ICodecCtx: &aacCtx,
RingWriter: &pkg.RingWriter{
Ring: util.NewRing[pkg.AVFrame](1),
}}
d.AudioTrack.Logger = d.With("track", "audio")
} else {
// 如果已经有音频轨道,使用现有的轨道
d.AudioTrack.ICodecCtx = &aacCtx
}
d.AudioCodec = &aacCtx
}
case box.MP4_CODEC_G711A:
if d.AudioTrack == nil {
d.AudioTrack = &pkg.AVTrack{
ICodecCtx: &codec.PCMACtx{
AudioCtx: codec.AudioCtx{
SampleRate: 8000,
Channels: 1,
SampleSize: 16,
},
if d.AudioCodec == nil {
d.AudioCodec = &codec.PCMACtx{
AudioCtx: codec.AudioCtx{
SampleRate: 8000,
Channels: 1,
SampleSize: 16,
},
RingWriter: &pkg.RingWriter{
Ring: util.NewRing[pkg.AVFrame](1),
}}
d.AudioTrack.Logger = d.With("track", "audio")
}
}
case box.MP4_CODEC_G711U:
if d.AudioTrack == nil {
d.AudioTrack = &pkg.AVTrack{
ICodecCtx: &codec.PCMUCtx{
AudioCtx: codec.AudioCtx{
SampleRate: 8000,
Channels: 1,
SampleSize: 16,
},
if d.AudioCodec == nil {
d.AudioCodec = &codec.PCMUCtx{
AudioCtx: codec.AudioCtx{
SampleRate: 8000,
Channels: 1,
SampleSize: 16,
},
RingWriter: &pkg.RingWriter{
Ring: util.NewRing[pkg.AVFrame](1),
}}
d.AudioTrack.Logger = d.With("track", "audio")
}
}
}
}
@@ -192,25 +153,25 @@ func (d *DemuxerRange) Demux(ctx context.Context, onAudio func(*Audio) error, on
type DemuxerConverterRange[TA pkg.IAVFrame, TV pkg.IAVFrame] struct {
DemuxerRange
audioConverter *pkg.AVFrameConvert[TA]
videoConverter *pkg.AVFrameConvert[TV]
AudioConverter *pkg.AVFrameConvert[TA]
VideoConverter *pkg.AVFrameConvert[TV]
}
func (d *DemuxerConverterRange[TA, TV]) Demux(ctx context.Context, onAudio func(TA) error, onVideo func(TV) error) error {
d.DemuxerRange.Demux(ctx, func(audio *Audio) error {
if d.audioConverter == nil {
d.audioConverter = pkg.NewAVFrameConvert[TA](d.AudioTrack, nil)
if d.AudioConverter == nil {
d.AudioConverter = pkg.NewAVFrameConvert[TA](d.AudioCodec)
}
target, err := d.audioConverter.Convert(audio)
target, err := d.AudioConverter.Convert(audio)
if err == nil {
err = onAudio(target)
}
return err
}, func(video *Video) error {
if d.videoConverter == nil {
d.videoConverter = pkg.NewAVFrameConvert[TV](d.VideoTrack, nil)
if d.VideoConverter == nil {
d.VideoConverter = pkg.NewAVFrameConvert[TV](d.VideoCodec)
}
target, err := d.videoConverter.Convert(video)
target, err := d.VideoConverter.Convert(video)
if err == nil {
err = onVideo(target)
}

View File

@@ -6,9 +6,6 @@ import (
"strings"
"time"
"github.com/deepch/vdk/codec/aacparser"
"github.com/deepch/vdk/codec/h264parser"
"github.com/deepch/vdk/codec/h265parser"
m7s "m7s.live/v5"
"m7s.live/v5/pkg/codec"
"m7s.live/v5/pkg/util"
@@ -53,24 +50,24 @@ func (p *HTTPReader) Run() (err error) {
for _, track := range demuxer.Tracks {
switch track.Cid {
case box.MP4_CODEC_H264:
var h264Ctx codec.H264Ctx
h264Ctx.CodecData, err = h264parser.NewCodecDataFromAVCDecoderConfRecord(track.ExtraData)
var h264Ctx *codec.H264Ctx
h264Ctx, err = codec.NewH264CtxFromRecord(track.ExtraData)
if err == nil {
publisher.SetCodecCtx(&h264Ctx, &Video{})
publisher.SetCodecCtx(h264Ctx, &Video{})
}
case box.MP4_CODEC_H265:
var h265Ctx codec.H265Ctx
h265Ctx.CodecData, err = h265parser.NewCodecDataFromAVCDecoderConfRecord(track.ExtraData)
var h265Ctx *codec.H265Ctx
h265Ctx, err = codec.NewH265CtxFromRecord(track.ExtraData)
if err == nil {
publisher.SetCodecCtx(&h265Ctx, &Video{
publisher.SetCodecCtx(h265Ctx, &Video{
allocator: allocator,
})
}
case box.MP4_CODEC_AAC:
var aacCtx codec.AACCtx
aacCtx.CodecData, err = aacparser.NewCodecDataFromMPEG4AudioConfigBytes(track.ExtraData)
var aacCtx *codec.AACCtx
aacCtx, err = codec.NewAACCtxFromRecord(track.ExtraData)
if err == nil {
publisher.SetCodecCtx(&aacCtx, &Audio{
publisher.SetCodecCtx(aacCtx, &Audio{
allocator: allocator,
})
}

View File

@@ -67,7 +67,7 @@ func (p *RecordReader) Run() (err error) {
}
if err = demuxerRange.Demux(p.Context, func(a *Audio) error {
if !publisher.HasAudioTrack() {
publisher.SetCodecCtx(demuxerRange.AudioTrack.ICodecCtx, a)
publisher.SetCodecCtx(demuxerRange.AudioCodec, a)
}
if publisher.Paused != nil {
publisher.Paused.Await()
@@ -90,7 +90,7 @@ func (p *RecordReader) Run() (err error) {
return publisher.WriteAudio(a)
}, func(v *Video) error {
if !publisher.HasVideoTrack() {
publisher.SetCodecCtx(demuxerRange.VideoTrack.ICodecCtx, v)
publisher.SetCodecCtx(demuxerRange.VideoCodec, v)
}
if publisher.Paused != nil {
publisher.Paused.Await()

View File

@@ -2,7 +2,6 @@ package mp4
import (
"fmt"
"io"
"slices"
"time"
@@ -30,15 +29,16 @@ func (v *Video) SetAllocator(allocator *util.ScalableMemoryAllocator) {
}
// Parse implements pkg.IAVFrame.
func (v *Video) Parse(t *pkg.AVTrack) error {
t.Value.IDR = v.KeyFrame
return nil
func (v *Video) Parse(old codec.ICodecCtx, f *pkg.AVFrame) (new codec.ICodecCtx, err error) {
f.IDR = v.KeyFrame
f.CTS = time.Duration(v.CTS) * time.Millisecond
return old, nil
}
// ConvertCtx implements pkg.IAVFrame.
func (v *Video) ConvertCtx(ctx codec.ICodecCtx) (codec.ICodecCtx, pkg.IAVFrame, error) {
func (v *Video) ConvertCtx(ctx codec.ICodecCtx) (codec.ICodecCtx, error) {
// 返回基础编解码器上下文,不进行转换
return ctx.GetBase(), nil, nil
return ctx.GetBase(), nil
}
// Demux implements pkg.IAVFrame.
@@ -160,11 +160,3 @@ func (v *Video) String() string {
return fmt.Sprintf("MP4Video[ts:%d, cts:%d, size:%d, keyframe:%t]",
v.Timestamp, v.CTS, v.Size, v.KeyFrame)
}
// Dump implements pkg.IAVFrame.
// It writes v.Data to w when Data is non-nil. The t argument is not used
// here, and the result of w.Write is discarded.
func (v *Video) Dump(t byte, w io.Writer) {
// Write the sample data to the writer.
if v.Data != nil {
w.Write(v.Data)
}
}

View File

@@ -6,8 +6,6 @@ import (
"log"
"os/exec"
"github.com/deepch/vdk/codec/h264parser"
"github.com/deepch/vdk/codec/h265parser"
"m7s.live/v5/pkg"
"m7s.live/v5/pkg/codec"
mp4 "m7s.live/v5/plugin/mp4/pkg"
@@ -50,24 +48,24 @@ func ProcessWithFFmpeg(samples []box.Sample, index int, videoTrack *mp4.Track, o
go func() {
defer stdin.Close()
convert := pkg.NewAVFrameConvert[*pkg.AnnexB](nil, nil)
var convert *pkg.AVFrameConvert[*pkg.AnnexB]
switch videoTrack.Cid {
case box.MP4_CODEC_H264:
var h264Ctx codec.H264Ctx
h264Ctx.CodecData, err = h264parser.NewCodecDataFromAVCDecoderConfRecord(videoTrack.ExtraData)
var h264Ctx *codec.H264Ctx
h264Ctx, err = codec.NewH264CtxFromRecord(videoTrack.ExtraData)
if err != nil {
log.Printf("解析H264失败: %v", err)
return
}
convert.FromTrack.ICodecCtx = &h264Ctx
convert = pkg.NewAVFrameConvert[*pkg.AnnexB](h264Ctx)
case box.MP4_CODEC_H265:
var h265Ctx codec.H265Ctx
h265Ctx.CodecData, err = h265parser.NewCodecDataFromAVCDecoderConfRecord(videoTrack.ExtraData)
var h265Ctx *codec.H265Ctx
h265Ctx, err = codec.NewH265CtxFromRecord(videoTrack.ExtraData)
if err != nil {
log.Printf("解析H265失败: %v", err)
return
}
convert.FromTrack.ICodecCtx = &h265Ctx
convert = pkg.NewAVFrameConvert[*pkg.AnnexB](h265Ctx)
default:
log.Printf("不支持的编解码器: %v", videoTrack.Cid)
return

View File

@@ -9,11 +9,13 @@ import (
"m7s.live/v5/pkg/util"
)
type RTMPAudio struct {
var _ IAVFrame = (*Audio)(nil)
type Audio struct {
RTMPData
}
func (avcc *RTMPAudio) Parse(t *AVTrack) (err error) {
func (avcc *Audio) Parse(old codec.ICodecCtx, f *AVFrame) (ctx codec.ICodecCtx, err error) {
reader := avcc.NewReader()
var b byte
b, err = reader.ReadByte()
@@ -22,20 +24,24 @@ func (avcc *RTMPAudio) Parse(t *AVTrack) (err error) {
}
switch b & 0b1111_0000 >> 4 {
case 7:
if t.ICodecCtx == nil {
var ctx codec.PCMACtx
ctx.SampleRate = 8000
ctx.Channels = 1
ctx.SampleSize = 8
t.ICodecCtx = &ctx
if old == nil {
var pcma codec.PCMACtx
pcma.SampleRate = 8000
pcma.Channels = 1
pcma.SampleSize = 8
ctx = &pcma
} else {
ctx = old
}
case 8:
if t.ICodecCtx == nil {
if old == nil {
var ctx codec.PCMUCtx
ctx.SampleRate = 8000
ctx.Channels = 1
ctx.SampleSize = 8
t.ICodecCtx = &ctx
old = &ctx
} else {
ctx = old
}
case 10:
b, err = reader.ReadByte()
@@ -43,29 +49,38 @@ func (avcc *RTMPAudio) Parse(t *AVTrack) (err error) {
return
}
if b == 0 {
var ctx codec.AACCtx
var cloneFrame RTMPAudio
cloneFrame.CopyFrom(&avcc.Memory)
ctx.CodecData, err = aacparser.NewCodecDataFromMPEG4AudioConfigBytes(cloneFrame.Buffers[0][2:])
t.SequenceFrame = &cloneFrame
t.ICodecCtx = &ctx
// Build a fresh context for the first sequence header, or when the header
// differs from the stored one; an identical header falls through to ErrSkip.
if old == nil || !avcc.Memory.Equal(&old.(*AACCtx).SequenceFrame.Memory) {
var c AACCtx
c.AACCtx = &codec.AACCtx{}
c.SequenceFrame.CopyFrom(&avcc.Memory)
c.CodecData, err = aacparser.NewCodecDataFromMPEG4AudioConfigBytes(c.SequenceFrame.Buffers[0][2:])
ctx = &c
} else {
ctx = old
err = ErrSkip
}
} else {
ctx = old
}
}
return
}
func (avcc *RTMPAudio) ConvertCtx(from codec.ICodecCtx) (to codec.ICodecCtx, seq IAVFrame, err error) {
to = from.GetBase()
switch v := to.(type) {
func (avcc *Audio) ConvertCtx(from codec.ICodecCtx) (to codec.ICodecCtx, err error) {
switch v := from.GetBase().(type) {
case *codec.AACCtx:
var seqFrame RTMPAudio
seqFrame.AppendOne(append([]byte{0xAF, 0x00}, v.ConfigBytes...))
seq = &seqFrame
ctx := &AACCtx{
AACCtx: v,
}
ctx.SequenceFrame.AppendOne(append([]byte{0xAF, 0x00}, v.ConfigBytes...))
to = ctx
default:
to = v
}
return
}
func (avcc *RTMPAudio) Demux(codecCtx codec.ICodecCtx) (raw any, err error) {
func (avcc *Audio) Demux(codecCtx codec.ICodecCtx) (raw any, err error) {
reader := avcc.NewReader()
var result util.Memory
if _, ok := codecCtx.(*codec.AACCtx); ok {
@@ -79,7 +94,7 @@ func (avcc *RTMPAudio) Demux(codecCtx codec.ICodecCtx) (raw any, err error) {
}
}
func (avcc *RTMPAudio) Mux(codecCtx codec.ICodecCtx, from *AVFrame) {
func (avcc *Audio) Mux(codecCtx codec.ICodecCtx, from *AVFrame) {
avcc.Timestamp = uint32(from.Timestamp / time.Millisecond)
audioData := from.Raw.(AudioData)
avcc.InitRecycleIndexes(1)

View File

@@ -12,14 +12,25 @@ import (
type (
AudioCodecID byte
VideoCodecID byte
H265Ctx struct {
codec.H265Ctx
Enhanced bool
H264Ctx struct {
*codec.H264Ctx
SequenceFrame Video
}
H265Ctx struct {
*codec.H265Ctx
SequenceFrame Video
Enhanced bool
}
AACCtx struct {
*codec.AACCtx
SequenceFrame Audio
}
OPUSCtx struct {
codec.OPUSCtx
}
AV1Ctx struct {
codec.AV1Ctx
*codec.AV1Ctx
SequenceFrame Video
Version byte
SeqProfile byte
SeqLevelIdx0 byte
@@ -111,6 +122,18 @@ var (
ErrNonZeroReservedBits = errors.New("non-zero reserved bits found in AV1CodecConfigurationRecord")
)
// GetSequenceFrame returns a pointer to the Audio sequence frame stored in
// this context (the SequenceFrame field itself, not a copy).
func (ctx *AACCtx) GetSequenceFrame() *Audio {
return &ctx.SequenceFrame
}
// GetSequenceFrame returns a pointer to the Video sequence frame stored in
// this context (the SequenceFrame field itself, not a copy).
func (ctx *H264Ctx) GetSequenceFrame() *Video {
return &ctx.SequenceFrame
}
// GetSequenceFrame returns a pointer to the Video sequence frame stored in
// this context (the SequenceFrame field itself, not a copy).
func (ctx *H265Ctx) GetSequenceFrame() *Video {
return &ctx.SequenceFrame
}
// GetInfo returns ConfigOBUs rendered with the "% 02X" format verb, i.e. as
// upper-case hexadecimal.
// NOTE(review): presumably ConfigOBUs is a byte slice, in which case the
// space flag separates the bytes with spaces — confirm against its declaration.
func (p *AV1Ctx) GetInfo() string {
return fmt.Sprintf("% 02X", p.ConfigOBUs)
}

View File

@@ -1,9 +1,7 @@
package rtmp
import (
"encoding/binary"
"fmt"
"io"
"time"
"m7s.live/v5/pkg/util"
@@ -23,15 +21,6 @@ type RTMPData struct {
util.RecyclableMemory
}
// Dump serializes the frame as one record and writes it to w.
//
// Record layout (9-byte header followed by the payload):
//
//	byte  0     t, passed through unchanged
//	bytes 1..4  big-endian uint32 of 4 + avcc.Size
//	bytes 5..8  big-endian uint32 of avcc.Timestamp
//	bytes 9..   the frame contents, copied with CopyTo
//
// The buffer is obtained from the frame's allocator via Borrow and the
// result of w.Write is discarded.
func (avcc *RTMPData) Dump(t byte, w io.Writer) {
m := avcc.GetAllocator().Borrow(9 + avcc.Size)
m[0] = t
binary.BigEndian.PutUint32(m[1:], uint32(4+avcc.Size))
binary.BigEndian.PutUint32(m[5:], avcc.Timestamp)
avcc.CopyTo(m[9:])
w.Write(m)
}
// GetSize returns the Size field of the frame.
func (avcc *RTMPData) GetSize() int {
return avcc.Size
}
@@ -51,14 +40,10 @@ func (avcc *RTMPData) GetTimestamp() time.Duration {
return time.Duration(avcc.Timestamp) * time.Millisecond
}
func (avcc *RTMPData) GetCTS() time.Duration {
return 0
func (avcc *RTMPData) WrapAudio() *Audio {
return &Audio{RTMPData: *avcc}
}
func (avcc *RTMPData) WrapAudio() *RTMPAudio {
return &RTMPAudio{RTMPData: *avcc}
}
func (avcc *RTMPData) WrapVideo() *RTMPVideo {
return &RTMPVideo{RTMPData: *avcc}
func (avcc *RTMPData) WrapVideo() *Video {
return &Video{RTMPData: *avcc}
}

View File

@@ -2,9 +2,10 @@ package rtmp
import (
"errors"
"m7s.live/v5/pkg"
"runtime"
"m7s.live/v5/pkg"
"m7s.live/v5"
)
@@ -15,11 +16,11 @@ type Sender struct {
lastAbs uint32
}
func (av *Sender) HandleAudio(frame *RTMPAudio) (err error) {
func (av *Sender) HandleAudio(frame *Audio) (err error) {
return av.SendFrame(&frame.RTMPData)
}
func (av *Sender) HandleVideo(frame *RTMPVideo) (err error) {
func (av *Sender) HandleVideo(frame *Video) (err error) {
return av.SendFrame(&frame.RTMPData)
}

View File

@@ -8,26 +8,21 @@ import (
"time"
"github.com/deepch/vdk/codec/h264parser"
"github.com/deepch/vdk/codec/h265parser"
. "m7s.live/v5/pkg"
"m7s.live/v5/pkg/codec"
"m7s.live/v5/pkg/util"
)
var _ IAVFrame = (*RTMPVideo)(nil)
var _ IAVFrame = (*Video)(nil)
type RTMPVideo struct {
type Video struct {
RTMPData
CTS uint32
}
func (avcc *RTMPVideo) GetCTS() time.Duration {
return time.Duration(avcc.CTS) * time.Millisecond
}
// Filter out abnormal NALUs.
func (avcc *RTMPVideo) filterH264(naluSizeLen int) {
func (avcc *Video) filterH264(naluSizeLen int) {
reader := avcc.NewReader()
lenReader := reader.NewReader()
reader.Skip(5)
@@ -88,11 +83,12 @@ func (avcc *RTMPVideo) filterH264(naluSizeLen int) {
}
}
func (avcc *RTMPVideo) filterH265(naluSizeLen int) {
func (avcc *Video) filterH265(naluSizeLen int) {
//TODO
}
func (avcc *RTMPVideo) Parse(t *AVTrack) (err error) {
func (avcc *Video) Parse(old codec.ICodecCtx, f *AVFrame) (ctx codec.ICodecCtx, err error) {
f.CTS = time.Duration(avcc.CTS) * time.Millisecond
if avcc.Size <= 10 {
err = io.ErrShortBuffer
return
@@ -104,45 +100,46 @@ func (avcc *RTMPVideo) Parse(t *AVTrack) (err error) {
return
}
enhanced := b0&0b1000_0000 != 0 // https://veovera.github.io/enhanced-rtmp/docs/enhanced/enhanced-rtmp-v1.pdf
t.Value.IDR = b0&0b0111_0000>>4 == 1
f.IDR = b0&0b0111_0000>>4 == 1
packetType := b0 & 0b1111
codecId := VideoCodecID(b0 & 0x0F)
var fourCC codec.FourCC
parseSequence := func() (err error) {
t.Value.IDR = false
var cloneFrame RTMPVideo
f.IDR = false
var cloneFrame Video
cloneFrame.CopyFrom(&avcc.Memory)
switch fourCC {
case codec.FourCC_H264:
var ctx codec.H264Ctx
ctx.Record = cloneFrame.Buffers[0][reader.Offset():]
if t.ICodecCtx != nil && bytes.Equal(t.ICodecCtx.(*codec.H264Ctx).Record, ctx.Record) {
return ErrSkip
newCtx := &H264Ctx{
SequenceFrame: cloneFrame,
}
// fmt.Printf("record: %s", hex.Dump(ctx.Record))
if _, err = ctx.RecordInfo.Unmarshal(ctx.Record); err == nil {
t.SequenceFrame = &cloneFrame
t.ICodecCtx = &ctx
ctx.SPSInfo, err = h264parser.ParseSPS(ctx.SPS())
newCtx.H264Ctx, err = codec.NewH264CtxFromRecord(cloneFrame.Buffers[0][reader.Offset():])
if err == nil {
if old != nil && bytes.Equal(old.(*H264Ctx).Record, newCtx.Record) {
ctx = old
err = ErrSkip
return
}
ctx = newCtx
}
case codec.FourCC_H265:
var ctx H265Ctx
ctx.Enhanced = enhanced
ctx.Record = cloneFrame.Buffers[0][reader.Offset():]
if t.ICodecCtx != nil && bytes.Equal(t.ICodecCtx.(*H265Ctx).Record, ctx.Record) {
return ErrSkip
// Allocate as a pointer, like the H.264 branch: every consumer type-asserts
// *H265Ctx, so a plain H265Ctx value stored in the interface would not match.
newCtx := &H265Ctx{
Enhanced: enhanced,
SequenceFrame: cloneFrame,
}
if _, err = ctx.RecordInfo.Unmarshal(ctx.Record); err == nil {
ctx.RecordInfo.LengthSizeMinusOne = 3 // Unmarshal wrong LengthSizeMinusOne
t.SequenceFrame = &cloneFrame
t.ICodecCtx = &ctx
ctx.SPSInfo, err = h265parser.ParseSPS(ctx.SPS())
newCtx.H265Ctx, err = codec.NewH265CtxFromRecord(cloneFrame.Buffers[0][reader.Offset():])
if err == nil {
if old != nil && bytes.Equal(old.(*H265Ctx).Record, newCtx.Record) {
ctx = old
err = ErrSkip
return
}
ctx = newCtx
}
case codec.FourCC_AV1:
var ctx AV1Ctx
if err = ctx.Unmarshal(reader); err == nil {
t.SequenceFrame = &cloneFrame
t.ICodecCtx = &ctx
var newCtx AV1Ctx
if err = newCtx.Unmarshal(reader); err == nil {
ctx = &newCtx
}
}
return
@@ -154,17 +151,17 @@ func (avcc *RTMPVideo) Parse(t *AVTrack) (err error) {
err = parseSequence()
return
case PacketTypeCodedFrames:
switch t.ICodecCtx.(type) {
switch old.(type) {
case *H265Ctx:
if avcc.CTS, err = reader.ReadBE(3); err != nil {
return err
return old, err
}
// avcc.filterH265(int(ctx.RecordInfo.LengthSizeMinusOne) + 1)
case *AV1Ctx:
// return avcc.parseAV1(reader)
}
case PacketTypeCodedFramesX:
// avcc.filterH265(int(t.ICodecCtx.(*H265Ctx).RecordInfo.LengthSizeMinusOne) + 1)
// avcc.filterH265(int(old.(*H265Ctx).RecordInfo.LengthSizeMinusOne) + 1)
}
} else {
b0, err = reader.ReadByte() //sequence frame flag
@@ -185,56 +182,53 @@ func (avcc *RTMPVideo) Parse(t *AVTrack) (err error) {
return
}
} else {
// switch ctx := t.ICodecCtx.(type) {
// switch ctx := old.(type) {
// case *codec.H264Ctx:
// avcc.filterH264(int(ctx.RecordInfo.LengthSizeMinusOne) + 1)
// case *H265Ctx:
// avcc.filterH265(int(ctx.RecordInfo.LengthSizeMinusOne) + 1)
// }
// if avcc.Size <= 5 {
// return ErrSkip
// return old, ErrSkip
// }
}
}
if ctx == nil {
ctx = old
}
return
}
func (avcc *RTMPVideo) ConvertCtx(from codec.ICodecCtx) (to codec.ICodecCtx, seq IAVFrame, err error) {
func (avcc *Video) ConvertCtx(from codec.ICodecCtx) (to codec.ICodecCtx, err error) {
var enhanced = true //TODO
switch fourCC := from.FourCC(); fourCC {
case codec.FourCC_H264:
h264ctx := from.GetBase().(*codec.H264Ctx)
var seqFrame RTMPData
seqFrame.AppendOne(append([]byte{0x17, 0, 0, 0, 0}, h264ctx.Record...))
switch fromCtx := from.GetBase().(type) {
case *codec.H264Ctx:
ctx := &H264Ctx{H264Ctx: fromCtx}
ctx.SequenceFrame.AppendOne(append([]byte{0x17, 0, 0, 0, 0}, fromCtx.Record...))
//if t.Enabled(context.TODO(), TraceLevel) {
// c := t.FourCC().String()
// size := seqFrame.GetSize()
// data := seqFrame.String()
// t.Trace("decConfig", "codec", c, "size", size, "data", data)
//}
return h264ctx, seqFrame.WrapVideo(), err
case codec.FourCC_H265:
h265ctx := from.GetBase().(*codec.H265Ctx)
b := make(util.Buffer, len(h265ctx.Record)+5)
return ctx, err
case *codec.H265Ctx:
ctx := &H265Ctx{H265Ctx: fromCtx, Enhanced: enhanced}
b := make(util.Buffer, len(ctx.Record)+5)
if enhanced {
b[0] = 0b1001_0000 | byte(PacketTypeSequenceStart)
copy(b[1:], codec.FourCC_H265[:])
} else {
b[0], b[1], b[2], b[3], b[4] = 0x1C, 0, 0, 0, 0
}
copy(b[5:], h265ctx.Record)
var ctx H265Ctx
ctx.Enhanced = enhanced
ctx.H265Ctx = *h265ctx
var seqFrame RTMPData
seqFrame.AppendOne(b)
return &ctx, seqFrame.WrapVideo(), err
case codec.FourCC_AV1:
copy(b[5:], ctx.Record)
ctx.SequenceFrame.AppendOne(b)
return ctx, err
}
return
}
func (avcc *RTMPVideo) parseH264(ctx *codec.H264Ctx, reader *util.MemoryReader) (any, error) {
func (avcc *Video) parseH264(ctx *H264Ctx, reader *util.MemoryReader) (any, error) {
var nalus Nalus
if err := nalus.ParseAVCC(reader, int(ctx.RecordInfo.LengthSizeMinusOne)+1); err != nil {
return nalus, err
@@ -242,7 +236,7 @@ func (avcc *RTMPVideo) parseH264(ctx *codec.H264Ctx, reader *util.MemoryReader)
return nalus, nil
}
func (avcc *RTMPVideo) parseH265(ctx *H265Ctx, reader *util.MemoryReader) (any, error) {
func (avcc *Video) parseH265(ctx *H265Ctx, reader *util.MemoryReader) (any, error) {
var nalus Nalus
if err := nalus.ParseAVCC(reader, int(ctx.RecordInfo.LengthSizeMinusOne)+1); err != nil {
return nalus, err
@@ -250,7 +244,7 @@ func (avcc *RTMPVideo) parseH265(ctx *H265Ctx, reader *util.MemoryReader) (any,
return nalus, nil
}
func (avcc *RTMPVideo) parseAV1(reader *util.MemoryReader) (any, error) {
func (avcc *Video) parseAV1(reader *util.MemoryReader) (any, error) {
var obus OBUs
if err := obus.ParseAVCC(reader); err != nil {
return obus, err
@@ -258,7 +252,7 @@ func (avcc *RTMPVideo) parseAV1(reader *util.MemoryReader) (any, error) {
return obus, nil
}
func (avcc *RTMPVideo) Demux(codecCtx codec.ICodecCtx) (any, error) {
func (avcc *Video) Demux(codecCtx codec.ICodecCtx) (any, error) {
reader := avcc.NewReader()
b0, err := reader.ReadByte()
if err != nil {
@@ -310,7 +304,7 @@ func (avcc *RTMPVideo) Demux(codecCtx codec.ICodecCtx) (any, error) {
return avcc.parseH265(ctx, reader)
}
case *codec.H264Ctx:
case *H264Ctx:
if b0 == 0 {
nalus.Append(ctx.SPS())
nalus.Append(ctx.PPS())
@@ -323,7 +317,7 @@ func (avcc *RTMPVideo) Demux(codecCtx codec.ICodecCtx) (any, error) {
return nil, nil
}
func (avcc *RTMPVideo) muxOld26x(codecID VideoCodecID, from *AVFrame) {
func (avcc *Video) muxOld26x(codecID VideoCodecID, from *AVFrame) {
nalus := from.Raw.(Nalus)
avcc.InitRecycleIndexes(len(nalus)) // Recycle partial data
head := avcc.NextN(5)
@@ -338,12 +332,12 @@ func (avcc *RTMPVideo) muxOld26x(codecID VideoCodecID, from *AVFrame) {
}
}
func (avcc *RTMPVideo) Mux(codecCtx codec.ICodecCtx, from *AVFrame) {
func (avcc *Video) Mux(codecCtx codec.ICodecCtx, from *AVFrame) {
avcc.Timestamp = uint32(from.Timestamp / time.Millisecond)
switch ctx := codecCtx.(type) {
case *AV1Ctx:
panic(ctx)
case *codec.H264Ctx:
case *H264Ctx:
avcc.muxOld26x(CodecID_H264, from)
case *H265Ctx:
if ctx.Enhanced {

View File

@@ -72,19 +72,19 @@ type (
}
PCMACtx struct {
RTPCtx
codec.PCMACtx
*codec.PCMACtx
}
PCMUCtx struct {
RTPCtx
codec.PCMUCtx
*codec.PCMUCtx
}
OPUSCtx struct {
RTPCtx
codec.OPUSCtx
*codec.OPUSCtx
}
AACCtx struct {
RTPCtx
codec.AACCtx
*codec.AACCtx
SizeLength int // 通常为13
IndexLength int
IndexDeltaLength int
@@ -137,11 +137,11 @@ func (r *RTPData) Append(ctx *RTPCtx, ts uint32, payload []byte) (lastPacket *rt
return
}
func (r *RTPData) ConvertCtx(from codec.ICodecCtx) (to codec.ICodecCtx, seq IAVFrame, err error) {
switch from.FourCC() {
case codec.FourCC_H264:
func (RTPData) ConvertCtx(from codec.ICodecCtx) (to codec.ICodecCtx, err error) {
switch base := from.GetBase().(type) {
case *codec.H264Ctx:
var ctx H264Ctx
ctx.H264Ctx = *from.GetBase().(*codec.H264Ctx)
ctx.H264Ctx = base
ctx.PayloadType = 96
ctx.MimeType = webrtc.MimeTypeH264
ctx.ClockRate = 90000
@@ -149,19 +149,19 @@ func (r *RTPData) ConvertCtx(from codec.ICodecCtx) (to codec.ICodecCtx, seq IAVF
ctx.SDPFmtpLine = fmt.Sprintf("sprop-parameter-sets=%s,%s;profile-level-id=%02x%02x%02x;level-asymmetry-allowed=1;packetization-mode=1", base64.StdEncoding.EncodeToString(ctx.SPS()), base64.StdEncoding.EncodeToString(ctx.PPS()), spsInfo.ProfileIdc, spsInfo.ConstraintSetFlag, spsInfo.LevelIdc)
ctx.SSRC = uint32(uintptr(unsafe.Pointer(&ctx)))
to = &ctx
case codec.FourCC_H265:
case *codec.H265Ctx:
var ctx H265Ctx
ctx.H265Ctx = *from.GetBase().(*codec.H265Ctx)
ctx.H265Ctx = base
ctx.PayloadType = 98
ctx.MimeType = webrtc.MimeTypeH265
ctx.SDPFmtpLine = fmt.Sprintf("profile-id=1;sprop-sps=%s;sprop-pps=%s;sprop-vps=%s", base64.StdEncoding.EncodeToString(ctx.SPS()), base64.StdEncoding.EncodeToString(ctx.PPS()), base64.StdEncoding.EncodeToString(ctx.VPS()))
ctx.ClockRate = 90000
ctx.SSRC = uint32(uintptr(unsafe.Pointer(&ctx)))
to = &ctx
case codec.FourCC_MP4A:
case *codec.AACCtx:
var ctx AACCtx
ctx.SSRC = uint32(uintptr(unsafe.Pointer(&ctx)))
ctx.AACCtx = *from.GetBase().(*codec.AACCtx)
ctx.AACCtx = base
ctx.MimeType = "audio/MPEG4-GENERIC"
ctx.SDPFmtpLine = fmt.Sprintf("profile-level-id=1;mode=AAC-hbr;sizelength=13;indexlength=3;indexdeltalength=3;config=%s", hex.EncodeToString(ctx.AACCtx.ConfigBytes))
ctx.IndexLength = 3
@@ -171,26 +171,26 @@ func (r *RTPData) ConvertCtx(from codec.ICodecCtx) (to codec.ICodecCtx, seq IAVF
ctx.PayloadType = 97
ctx.ClockRate = uint32(ctx.CodecData.SampleRate())
to = &ctx
case codec.FourCC_ALAW:
case *codec.PCMACtx:
var ctx PCMACtx
ctx.SSRC = uint32(uintptr(unsafe.Pointer(&ctx)))
ctx.PCMACtx = *from.GetBase().(*codec.PCMACtx)
ctx.PCMACtx = base
ctx.MimeType = webrtc.MimeTypePCMA
ctx.PayloadType = 8
ctx.ClockRate = uint32(ctx.SampleRate)
to = &ctx
case codec.FourCC_ULAW:
case *codec.PCMUCtx:
var ctx PCMUCtx
ctx.SSRC = uint32(uintptr(unsafe.Pointer(&ctx)))
ctx.PCMUCtx = *from.GetBase().(*codec.PCMUCtx)
ctx.PCMUCtx = base
ctx.MimeType = webrtc.MimeTypePCMU
ctx.PayloadType = 0
ctx.ClockRate = uint32(ctx.SampleRate)
to = &ctx
case codec.FourCC_OPUS:
case *codec.OPUSCtx:
var ctx OPUSCtx
ctx.SSRC = uint32(uintptr(unsafe.Pointer(&ctx)))
ctx.OPUSCtx = *from.GetBase().(*codec.OPUSCtx)
ctx.OPUSCtx = base
ctx.MimeType = webrtc.MimeTypeOpus
ctx.PayloadType = 111
ctx.ClockRate = uint32(ctx.CodecData.SampleRate())
@@ -199,67 +199,67 @@ func (r *RTPData) ConvertCtx(from codec.ICodecCtx) (to codec.ICodecCtx, seq IAVF
return
}
var _ IAVFrame = (*Audio)(nil)
// Audio is the audio frame type of this package. It adds no fields of its
// own and embeds RTPData, from which it gets its data and methods.
type Audio struct {
RTPData
}
func (r *Audio) Parse(t *AVTrack) (err error) {
func (r *Audio) Parse(old codec.ICodecCtx, f *AVFrame) (new codec.ICodecCtx, err error) {
if old != nil {
return old, nil
}
switch r.MimeType {
case webrtc.MimeTypeOpus:
var ctx OPUSCtx
ctx.OPUSCtx = &codec.OPUSCtx{}
ctx.parseFmtpLine(r.RTPCodecParameters)
ctx.OPUSCtx.Channels = int(ctx.RTPCodecParameters.Channels)
t.ICodecCtx = &ctx
new = &ctx
case webrtc.MimeTypePCMA:
var ctx PCMACtx
ctx.PCMACtx = &codec.PCMACtx{}
ctx.parseFmtpLine(r.RTPCodecParameters)
ctx.AudioCtx.SampleRate = int(r.ClockRate)
ctx.AudioCtx.Channels = int(ctx.RTPCodecParameters.Channels)
t.ICodecCtx = &ctx
new = &ctx
case webrtc.MimeTypePCMU:
var ctx PCMUCtx
ctx.PCMUCtx = &codec.PCMUCtx{}
ctx.parseFmtpLine(r.RTPCodecParameters)
ctx.AudioCtx.SampleRate = int(r.ClockRate)
ctx.AudioCtx.Channels = int(ctx.RTPCodecParameters.Channels)
t.ICodecCtx = &ctx
new = &ctx
case "audio/MP4A-LATM":
var ctx *AACCtx
if t.ICodecCtx != nil {
// ctx = t.ICodecCtx.(*AACCtx)
} else {
ctx = &AACCtx{}
ctx.parseFmtpLine(r.RTPCodecParameters)
if conf, ok := ctx.Fmtp["config"]; ok {
if ctx.AACCtx.ConfigBytes, err = hex.DecodeString(conf); err == nil {
if ctx.CodecData, err = aacparser.NewCodecDataFromMPEG4AudioConfigBytes(ctx.AACCtx.ConfigBytes); err != nil {
return
}
ctx := &AACCtx{
AACCtx: &codec.AACCtx{},
}
ctx.parseFmtpLine(r.RTPCodecParameters)
if conf, ok := ctx.Fmtp["config"]; ok {
if ctx.AACCtx.ConfigBytes, err = hex.DecodeString(conf); err == nil {
if ctx.CodecData, err = aacparser.NewCodecDataFromMPEG4AudioConfigBytes(ctx.AACCtx.ConfigBytes); err != nil {
return
}
}
t.ICodecCtx = ctx
}
new = ctx
case "audio/MPEG4-GENERIC":
var ctx *AACCtx
if t.ICodecCtx != nil {
// ctx = t.ICodecCtx.(*AACCtx)
} else {
ctx = &AACCtx{}
ctx.parseFmtpLine(r.RTPCodecParameters)
ctx.IndexLength = 3
ctx.IndexDeltaLength = 3
ctx.SizeLength = 13
if conf, ok := ctx.Fmtp["config"]; ok {
if ctx.AACCtx.ConfigBytes, err = hex.DecodeString(conf); err == nil {
if ctx.CodecData, err = aacparser.NewCodecDataFromMPEG4AudioConfigBytes(ctx.AACCtx.ConfigBytes); err != nil {
return
}
ctx := &AACCtx{AACCtx: &codec.AACCtx{}}
ctx.parseFmtpLine(r.RTPCodecParameters)
ctx.IndexLength = 3
ctx.IndexDeltaLength = 3
ctx.SizeLength = 13
if conf, ok := ctx.Fmtp["config"]; ok {
if ctx.AACCtx.ConfigBytes, err = hex.DecodeString(conf); err == nil {
if ctx.CodecData, err = aacparser.NewCodecDataFromMPEG4AudioConfigBytes(ctx.AACCtx.ConfigBytes); err != nil {
return
}
}
t.ICodecCtx = ctx
}
new = ctx
}
if len(r.Packets) == 0 {
return ErrSkip
err = ErrSkip
}
return
}

View File

@@ -1,6 +1,7 @@
package rtp
import (
"bytes"
"encoding/base64"
"fmt"
"io"
@@ -26,16 +27,16 @@ type (
}
H264Ctx struct {
H26xCtx
codec.H264Ctx
*codec.H264Ctx
}
H265Ctx struct {
H26xCtx
codec.H265Ctx
*codec.H265Ctx
DONL bool
}
AV1Ctx struct {
RTPCtx
codec.AV1Ctx
*codec.AV1Ctx
}
VP9Ctx struct {
RTPCtx
@@ -62,14 +63,17 @@ const (
MTUSize = 1460
)
func (r *Video) Parse(t *AVTrack) (err error) {
func (r *Video) Parse(old codec.ICodecCtx, f *AVFrame) (new codec.ICodecCtx, err error) {
f.CTS = r.CTS
switch r.MimeType {
case webrtc.MimeTypeH264:
var ctx *H264Ctx
if t.ICodecCtx != nil {
ctx = t.ICodecCtx.(*H264Ctx)
if old != nil {
ctx = old.(*H264Ctx)
} else {
ctx = &H264Ctx{}
ctx = &H264Ctx{
H264Ctx: &codec.H264Ctx{},
}
ctx.parseFmtpLine(r.RTPCodecParameters)
var sps, pps []byte
//packetization-mode=1; sprop-parameter-sets=J2QAKaxWgHgCJ+WagICAgQ==,KO48sA==; profile-level-id=640029
@@ -86,9 +90,8 @@ func (r *Video) Parse(t *AVTrack) (err error) {
return
}
}
t.ICodecCtx = ctx
}
if t.Value.Raw, err = r.Demux(ctx); err != nil {
if f.Raw, err = r.Demux(ctx); err != nil {
return
}
pts := r.Packets[0].Timestamp
@@ -96,45 +99,58 @@ func (r *Video) Parse(t *AVTrack) (err error) {
dts := ctx.dtsEst.Feed(pts)
r.DTS = time.Duration(dts) * time.Millisecond / 90
r.CTS = time.Duration(pts-dts) * time.Millisecond / 90
for _, nalu := range t.Value.Raw.(Nalus) {
var sps, pps []byte
for _, nalu := range f.Raw.(Nalus) {
switch codec.ParseH264NALUType(nalu.Buffers[0][0]) {
case h264parser.NALU_SPS:
ctx.RecordInfo.SPS = [][]byte{nalu.ToBytes()}
if ctx.SPSInfo, err = h264parser.ParseSPS(ctx.SPS()); err != nil {
return
}
sps = nalu.ToBytes()
case h264parser.NALU_PPS:
hasSPSPPS = true
ctx.RecordInfo.PPS = [][]byte{nalu.ToBytes()}
if ctx.CodecData, err = h264parser.NewCodecDataFromSPSAndPPS(ctx.RecordInfo.SPS[0], ctx.RecordInfo.PPS[0]); err != nil {
return
}
pps = nalu.ToBytes()
case codec.NALU_IDR_Picture:
t.Value.IDR = true
f.IDR = true
}
}
if t.Value.IDR && !hasSPSPPS {
spsRTP := &rtp.Packet{
Header: rtp.Header{
Version: 2,
SequenceNumber: ctx.SequenceNumber,
Timestamp: pts,
SSRC: ctx.SSRC,
PayloadType: uint8(ctx.PayloadType),
},
Payload: ctx.SPS(),
if hasSPSPPS = sps != nil && pps != nil; hasSPSPPS {
var newCodecData h264parser.CodecData
if newCodecData, err = h264parser.NewCodecDataFromSPSAndPPS(sps, pps); err != nil {
return
}
ppsRTP := &rtp.Packet{
Header: rtp.Header{
Version: 2,
SequenceNumber: ctx.SequenceNumber,
Timestamp: pts,
SSRC: ctx.SSRC,
PayloadType: uint8(ctx.PayloadType),
},
Payload: ctx.PPS(),
if old != nil && bytes.Equal(newCodecData.Record, old.(*H264Ctx).Record) {
new = old
} else {
ctx = &H264Ctx{
H264Ctx: &codec.H264Ctx{
CodecData: newCodecData,
},
}
ctx.parseFmtpLine(r.RTPCodecParameters)
new = ctx
}
} else {
new = ctx
if f.IDR {
spsRTP := &rtp.Packet{
Header: rtp.Header{
Version: 2,
SequenceNumber: ctx.SequenceNumber,
Timestamp: pts,
SSRC: ctx.SSRC,
PayloadType: uint8(ctx.PayloadType),
},
Payload: ctx.SPS(),
}
ppsRTP := &rtp.Packet{
Header: rtp.Header{
Version: 2,
SequenceNumber: ctx.SequenceNumber,
Timestamp: pts,
SSRC: ctx.SSRC,
PayloadType: uint8(ctx.PayloadType),
},
Payload: ctx.PPS(),
}
r.Packets = slices.Insert(r.Packets, 0, spsRTP, ppsRTP)
}
r.Packets = slices.Insert(r.Packets, 0, spsRTP, ppsRTP)
}
for _, p := range r.Packets {
p.SequenceNumber = ctx.seq
@@ -142,10 +158,12 @@ func (r *Video) Parse(t *AVTrack) (err error) {
}
case webrtc.MimeTypeH265:
var ctx *H265Ctx
if t.ICodecCtx != nil {
ctx = t.ICodecCtx.(*H265Ctx)
if old != nil {
ctx = old.(*H265Ctx)
} else {
ctx = &H265Ctx{}
ctx = &H265Ctx{
H265Ctx: &codec.H265Ctx{},
}
ctx.parseFmtpLine(r.RTPCodecParameters)
var vps, sps, pps []byte
if sprop_sps, ok := ctx.Fmtp["sprop-sps"]; ok {
@@ -173,76 +191,86 @@ func (r *Video) Parse(t *AVTrack) (err error) {
ctx.DONL = true
}
}
t.ICodecCtx = ctx
}
if t.Value.Raw, err = r.Demux(ctx); err != nil {
if f.Raw, err = r.Demux(ctx); err != nil {
return
}
pts := r.Packets[0].Timestamp
dts := ctx.dtsEst.Feed(pts)
r.DTS = time.Duration(dts) * time.Millisecond / 90
r.CTS = time.Duration(pts-dts) * time.Millisecond / 90
var vps, sps, pps []byte
var hasVPSSPSPPS bool
for _, nalu := range t.Value.Raw.(Nalus) {
for _, nalu := range f.Raw.(Nalus) {
switch codec.ParseH265NALUType(nalu.Buffers[0][0]) {
case h265parser.NAL_UNIT_VPS:
ctx = &H265Ctx{}
ctx.RecordInfo.VPS = [][]byte{nalu.ToBytes()}
ctx.RTPCodecParameters = *r.RTPCodecParameters
t.ICodecCtx = ctx
vps = nalu.ToBytes()
case h265parser.NAL_UNIT_SPS:
ctx.RecordInfo.SPS = [][]byte{nalu.ToBytes()}
if ctx.SPSInfo, err = h265parser.ParseSPS(ctx.SPS()); err != nil {
return
}
sps = nalu.ToBytes()
case h265parser.NAL_UNIT_PPS:
hasVPSSPSPPS = true
ctx.RecordInfo.PPS = [][]byte{nalu.ToBytes()}
if ctx.CodecData, err = h265parser.NewCodecDataFromVPSAndSPSAndPPS(ctx.RecordInfo.VPS[0], ctx.RecordInfo.SPS[0], ctx.RecordInfo.PPS[0]); err != nil {
return
}
pps = nalu.ToBytes()
case h265parser.NAL_UNIT_CODED_SLICE_BLA_W_LP,
h265parser.NAL_UNIT_CODED_SLICE_BLA_W_RADL,
h265parser.NAL_UNIT_CODED_SLICE_BLA_N_LP,
h265parser.NAL_UNIT_CODED_SLICE_IDR_W_RADL,
h265parser.NAL_UNIT_CODED_SLICE_IDR_N_LP,
h265parser.NAL_UNIT_CODED_SLICE_CRA:
t.Value.IDR = true
f.IDR = true
}
}
if t.Value.IDR && !hasVPSSPSPPS {
vpsRTP := &rtp.Packet{
Header: rtp.Header{
Version: 2,
SequenceNumber: ctx.SequenceNumber,
Timestamp: pts,
SSRC: ctx.SSRC,
PayloadType: uint8(ctx.PayloadType),
},
Payload: ctx.VPS(),
if hasVPSSPSPPS = vps != nil && sps != nil && pps != nil; hasVPSSPSPPS {
var newCodecData h265parser.CodecData
if newCodecData, err = h265parser.NewCodecDataFromVPSAndSPSAndPPS(vps, sps, pps); err != nil {
return
}
spsRTP := &rtp.Packet{
Header: rtp.Header{
Version: 2,
SequenceNumber: ctx.SequenceNumber,
Timestamp: pts,
SSRC: ctx.SSRC,
PayloadType: uint8(ctx.PayloadType),
},
Payload: ctx.SPS(),
if old != nil && bytes.Equal(newCodecData.Record, old.(*H265Ctx).Record) {
new = old
} else {
ctx = &H265Ctx{
H265Ctx: &codec.H265Ctx{
CodecData: newCodecData,
},
}
ctx.parseFmtpLine(r.RTPCodecParameters)
new = ctx
}
ppsRTP := &rtp.Packet{
Header: rtp.Header{
Version: 2,
SequenceNumber: ctx.SequenceNumber,
Timestamp: pts,
SSRC: ctx.SSRC,
PayloadType: uint8(ctx.PayloadType),
},
Payload: ctx.PPS(),
} else {
new = ctx
if f.IDR {
vpsRTP := &rtp.Packet{
Header: rtp.Header{
Version: 2,
SequenceNumber: ctx.SequenceNumber,
Timestamp: pts,
SSRC: ctx.SSRC,
PayloadType: uint8(ctx.PayloadType),
},
Payload: ctx.VPS(),
}
spsRTP := &rtp.Packet{
Header: rtp.Header{
Version: 2,
SequenceNumber: ctx.SequenceNumber,
Timestamp: pts,
SSRC: ctx.SSRC,
PayloadType: uint8(ctx.PayloadType),
},
Payload: ctx.SPS(),
}
ppsRTP := &rtp.Packet{
Header: rtp.Header{
Version: 2,
SequenceNumber: ctx.SequenceNumber,
Timestamp: pts,
SSRC: ctx.SSRC,
PayloadType: uint8(ctx.PayloadType),
},
Payload: ctx.PPS(),
}
r.Packets = slices.Insert(r.Packets, 0, vpsRTP, spsRTP, ppsRTP)
}
r.Packets = slices.Insert(r.Packets, 0, vpsRTP, spsRTP, ppsRTP)
}
for _, p := range r.Packets {
p.SequenceNumber = ctx.seq
ctx.seq++
@@ -253,8 +281,9 @@ func (r *Video) Parse(t *AVTrack) (err error) {
// codecCtx = &ctx
case webrtc.MimeTypeAV1:
var ctx AV1Ctx
ctx.AV1Ctx = &codec.AV1Ctx{}
ctx.RTPCodecParameters = *r.RTPCodecParameters
t.ICodecCtx = &ctx
new = &ctx
default:
err = ErrUnsupportCodec
}
@@ -277,10 +306,6 @@ func (r *Video) GetTimestamp() time.Duration {
return r.DTS
}
// GetCTS returns the CTS duration stored on this video frame.
// Within this file the field is assigned in Parse as the difference between
// the first packet's RTP timestamp (pts) and the estimated dts, scaled by
// time.Millisecond / 90.
// NOTE(review): presumably "composition time offset" — confirm against callers.
func (r *Video) GetCTS() time.Duration {
return r.CTS
}
func (r *Video) Mux(codecCtx codec.ICodecCtx, from *AVFrame) {
pts := uint32((from.Timestamp + from.CTS) * 90 / time.Millisecond)
switch c := codecCtx.(type) {

View File

@@ -22,16 +22,16 @@ func TestRTPH264Ctx_CreateFrame(t *testing.T) {
var avFrame = &pkg.AVFrame{}
var mem util.Memory
mem.Append([]byte(randStr))
avFrame.Raw = []util.Memory{mem}
avFrame.Raw = pkg.Nalus{mem}
frame := new(Video)
frame.Mux(ctx, avFrame)
var track = &pkg.AVTrack{}
err := frame.Parse(track)
var track = &pkg.AVFrame{}
_, err := frame.Parse(nil, track)
if err != nil {
t.Error(err)
return
}
if s := string(track.Value.Raw.(pkg.Nalus)[0].ToBytes()); s != randStr {
if s := string(track.Raw.(pkg.Nalus)[0].ToBytes()); s != randStr {
t.Error("not equal", len(s), len(randStr))
}
}

View File

@@ -359,6 +359,7 @@ func (c *NetConnection) Receive(sendMode bool, onReceive func(byte, []byte) erro
return
} else if onReceive != nil {
if err := onReceive(channelID, buf); err != nil {
c.Error("onReceive", "error", err)
c.MemoryAllocator.Free(buf)
}
} else {

View File

@@ -218,7 +218,7 @@ func TestNetConnection_Receive(t *testing.T) {
videoFrame.Packets = append(videoFrame.Packets, packet)
return nil
} else {
videoFrame.Parse(videoTrack)
videoFrame.Parse(videoTrack.ICodecCtx, &videoTrack.Value)
// t := time.Now()
// if err = r.WriteVideo(videoFrame); err != nil {
// return err

View File

@@ -495,7 +495,7 @@ func (r *Receiver) Receive() (err error) {
r.audioTSCheckStart = now
r.Stream.Debug("check audio timestamp start firsttime", "timestamp", packet.Timestamp)
} else if !r.useVideoTS {
r.Stream.Debug("debug audio timestamp", "current", packet.Timestamp, "last", r.lastAudioPacketTS, "duration", now.Sub(r.audioTSCheckStart))
// r.Stream.Debug("debug audio timestamp", "current", packet.Timestamp, "last", r.lastAudioPacketTS, "duration", now.Sub(r.audioTSCheckStart))
// 如果3秒内时间戳没有变化切换到使用视频时间戳
if packet.Timestamp == r.lastAudioPacketTS && now.Sub(r.audioTSCheckStart) > 3*time.Second {
r.useVideoTS = true
@@ -511,7 +511,7 @@ func (r *Receiver) Receive() (err error) {
// 时间戳有变化,重置检查
r.lastAudioPacketTS = packet.Timestamp
r.audioTSCheckStart = now
r.Stream.Debug("reset audioTSCheckStart", "lastAudioPacketTS", r.lastAudioPacketTS)
// r.Stream.Debug("reset audioTSCheckStart", "lastAudioPacketTS", r.lastAudioPacketTS)
}
}
@@ -553,10 +553,11 @@ func (r *Receiver) Receive() (err error) {
return pkg.ErrDiscard
} else {
// t := time.Now()
// fmt.Println("write video1", t)
if err = r.WriteVideo(videoFrame); err != nil {
return err
}
// fmt.Println("write video", time.Since(t))
// fmt.Println("write video2", time.Since(t))
videoFrame = &mrtp.Video{}
videoFrame.AddRecycleBytes(buf)
videoFrame.Packets = []*rtp.Packet{packet}

View File

@@ -26,12 +26,12 @@ func GetVideoFrame(publisher *m7s.Publisher, server *m7s.Server) ([]*pkg.AnnexB,
return nil, err
}
defer reader.StopRead()
var converter = pkg.NewAVFrameConvert[*pkg.AnnexB](publisher.VideoTrack.AVTrack, nil)
var converter = pkg.NewAVFrameConvert[*pkg.AnnexB](publisher.VideoTrack.ICodecCtx)
var annexbList []*pkg.AnnexB
for lastFrameSequence := publisher.VideoTrack.AVTrack.LastValue.Sequence; reader.Value.Sequence <= lastFrameSequence; reader.ReadNext() {
annexb, err := converter.ConvertFromAVFrame(&reader.Value)
annexb, err := converter.Convert(reader.Value.Wraps[0])
if err != nil {
return nil, err
}

View File

@@ -234,7 +234,7 @@ func (IO *MultipleConnection) SendSubscriber(subscriber *m7s.Subscriber) (audioS
} else {
var rtpCtx mrtp.RTPData
var tmpAVTrack AVTrack
tmpAVTrack.ICodecCtx, _, err = rtpCtx.ConvertCtx(vctx)
tmpAVTrack.ICodecCtx, err = rtpCtx.ConvertCtx(vctx)
if err == nil {
rcc = tmpAVTrack.ICodecCtx.(mrtp.IRTPCtx).GetRTPCodecParameter()
} else {
@@ -277,7 +277,7 @@ func (IO *MultipleConnection) SendSubscriber(subscriber *m7s.Subscriber) (audioS
} else {
var rtpCtx mrtp.RTPData
var tmpAVTrack AVTrack
tmpAVTrack.ICodecCtx, _, err = rtpCtx.ConvertCtx(actx)
tmpAVTrack.ICodecCtx, err = rtpCtx.ConvertCtx(actx)
if err == nil {
rcc = tmpAVTrack.ICodecCtx.(mrtp.IRTPCtx).GetRTPCodecParameter()
} else {
@@ -399,13 +399,10 @@ func (r *RemoteStream) Start() (err error) {
if ctx, ok := vctx.(mrtp.IRTPCtx); ok {
rcc = ctx.GetRTPCodecParameter()
} else {
var rtpCtx mrtp.RTPData
var tmpAVTrack AVTrack
tmpAVTrack.ICodecCtx, _, err = rtpCtx.ConvertCtx(vctx)
if err == nil {
rcc = tmpAVTrack.ICodecCtx.(mrtp.IRTPCtx).GetRTPCodecParameter()
if ctx, err := (mrtp.RTPData{}).ConvertCtx(vctx); err == nil {
rcc = ctx.(mrtp.IRTPCtx).GetRTPCodecParameter()
} else {
return
return err
}
}

View File

@@ -2,8 +2,6 @@ package m7s
import (
"fmt"
"os"
"path/filepath"
"reflect"
"slices"
"sync"
@@ -112,7 +110,6 @@ type Publisher struct {
OnSeek func(time.Time)
OnGetPosition func() time.Time
PullProxyConfig *PullProxyConfig
dumpFile *os.File
dropAfterTs time.Duration
}
@@ -159,12 +156,6 @@ func (p *Publisher) Start() (err error) {
if !p.PubVideo {
p.videoReady.Reject(ErrMuted)
}
if p.Dump {
f := filepath.Join("./dump", p.StreamPath)
os.MkdirAll(filepath.Dir(f), 0666)
p.dumpFile, _ = os.OpenFile(filepath.Join("./dump", p.StreamPath), os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0666)
}
s.Waiting.WakeUp(p.StreamPath, p)
p.processAliasOnStart()
p.Plugin.Server.OnPublish(p)
@@ -290,7 +281,6 @@ func (p *Publisher) AddSubscriber(subscriber *Subscriber) {
func (p *Publisher) fixTimestamp(t *AVTrack, data IAVFrame) {
frame := &t.Value
ts := data.GetTimestamp()
frame.CTS = data.GetCTS()
bytesIn := data.GetSize()
t.AddBytesIn(bytesIn)
frame.Timestamp = t.Tame(ts, t.FPS, p.Scale)
@@ -350,9 +340,6 @@ func (p *Publisher) WriteVideo(data IAVFrame) (err error) {
if err = p.Err(); err != nil {
return
}
if p.dumpFile != nil {
data.Dump(1, p.dumpFile)
}
if !p.PubVideo {
return ErrMuted
}
@@ -362,21 +349,23 @@ func (p *Publisher) WriteVideo(data IAVFrame) (err error) {
p.VideoTrack.Set(t)
p.Call(p.trackAdded)
}
err = data.Parse(t)
var newCodecCtx codec.ICodecCtx
oldCodecCtx := t.ICodecCtx
newCodecCtx, err = data.Parse(oldCodecCtx, &t.Value)
if err != nil {
return nil
return
}
codecCtxChanged := oldCodecCtx != newCodecCtx
if newCodecCtx == nil {
return ErrUnsupportCodec
}
t.ICodecCtx = newCodecCtx
p.fixTimestamp(t, data)
defer t.SpeedControl(p.Speed)
oldCodecCtx := t.ICodecCtx
codecCtxChanged := oldCodecCtx != t.ICodecCtx
if err != nil {
p.Error("parse", "err", err)
return err
}
if t.ICodecCtx == nil {
return ErrUnsupportCodec
}
if codecCtxChanged && oldCodecCtx != nil {
oldWidth, oldHeight := oldCodecCtx.(IVideoCodecCtx).Width(), oldCodecCtx.(IVideoCodecCtx).Height()
newWidth, newHeight := t.ICodecCtx.(IVideoCodecCtx).Width(), t.ICodecCtx.(IVideoCodecCtx).Height()
@@ -418,7 +407,7 @@ func (p *Publisher) WriteVideo(data IAVFrame) (err error) {
toType := track.FrameType.Elem()
toFrame := reflect.New(toType).Interface().(IAVFrame)
if track.ICodecCtx == nil {
if track.ICodecCtx, track.SequenceFrame, err = toFrame.ConvertCtx(t.ICodecCtx); err != nil {
if track.ICodecCtx, err = toFrame.ConvertCtx(t.ICodecCtx); err != nil {
track.Error("DecodeConfig", "err", err)
return
}
@@ -440,7 +429,7 @@ func (p *Publisher) WriteVideo(data IAVFrame) (err error) {
toFrame.SetAllocator(data.GetAllocator())
toFrame.Mux(track.ICodecCtx, &t.Value)
if codecCtxChanged {
track.ICodecCtx, track.SequenceFrame, err = toFrame.ConvertCtx(t.ICodecCtx)
track.ICodecCtx, err = toFrame.ConvertCtx(t.ICodecCtx)
}
t.Value.Wraps = append(t.Value.Wraps, toFrame)
if track.ICodecCtx != nil {
@@ -465,9 +454,7 @@ func (p *Publisher) WriteAudio(data IAVFrame) (err error) {
if err = p.Err(); err != nil {
return
}
if p.dumpFile != nil {
data.Dump(0, p.dumpFile)
}
if !p.PubAudio {
return ErrMuted
}
@@ -477,10 +464,16 @@ func (p *Publisher) WriteAudio(data IAVFrame) (err error) {
p.AudioTrack.Set(t)
p.Call(p.trackAdded)
}
err = data.Parse(t)
var newCodecCtx codec.ICodecCtx
newCodecCtx, err = data.Parse(t.ICodecCtx, &t.Value)
if err != nil {
return
}
codecCtxChanged := t.ICodecCtx != newCodecCtx
if newCodecCtx == nil {
return ErrUnsupportCodec
}
t.ICodecCtx = newCodecCtx
p.fixTimestamp(t, data)
defer t.SpeedControl(p.Speed)
// 根据丢帧率进行音频帧丢弃
@@ -492,11 +485,6 @@ func (p *Publisher) WriteAudio(data IAVFrame) (err error) {
}
}
}
oldCodecCtx := t.ICodecCtx
codecCtxChanged := oldCodecCtx != t.ICodecCtx
if t.ICodecCtx == nil {
return ErrUnsupportCodec
}
t.Ready(err)
p.writeAV(t, data)
if p.AudioTrack.Length > 1 && p.AudioTrack.IsReady() {
@@ -510,7 +498,7 @@ func (p *Publisher) WriteAudio(data IAVFrame) (err error) {
toType := track.FrameType.Elem()
toFrame := reflect.New(toType).Interface().(IAVFrame)
if track.ICodecCtx == nil {
if track.ICodecCtx, track.SequenceFrame, err = toFrame.ConvertCtx(t.ICodecCtx); err != nil {
if track.ICodecCtx, err = toFrame.ConvertCtx(t.ICodecCtx); err != nil {
track.Error("DecodeConfig", "err", err)
return
}
@@ -532,7 +520,7 @@ func (p *Publisher) WriteAudio(data IAVFrame) (err error) {
toFrame.SetAllocator(data.GetAllocator())
toFrame.Mux(track.ICodecCtx, &t.Value)
if codecCtxChanged {
track.ICodecCtx, track.SequenceFrame, err = toFrame.ConvertCtx(t.ICodecCtx)
track.ICodecCtx, err = toFrame.ConvertCtx(t.ICodecCtx)
}
t.Value.Wraps = append(t.Value.Wraps, toFrame)
if track.ICodecCtx != nil {
@@ -600,9 +588,6 @@ func (p *Publisher) Dispose() {
p.AudioTrack.Dispose()
p.VideoTrack.Dispose()
p.Info("unpublish", "remain", s.Streams.Length, "reason", p.StopReason())
if p.dumpFile != nil {
p.dumpFile.Close()
}
p.State = PublisherStateDisposed
p.processPullProxyOnDispose()
}

View File

@@ -438,9 +438,9 @@ func (handler *SubscribeHandler[A, V]) Run() (err error) {
// fmt.Println("video", s.VideoReader.Track.PreFrame().Sequence-frame.Sequence)
if handler.videoFrame.IDR && vr.DecConfChanged() {
vr.LastCodecCtx = vr.Track.ICodecCtx
if seqFrame := vr.Track.SequenceFrame; seqFrame != nil {
if sctx, ok := vr.LastCodecCtx.(ISequenceCodecCtx[V]); ok {
if handler.vwi > 0 {
err = handler.OnVideo(seqFrame.(V))
err = handler.OnVideo(sctx.GetSequenceFrame())
}
}
}
@@ -498,9 +498,9 @@ func (handler *SubscribeHandler[A, V]) Run() (err error) {
// fmt.Println("audio", s.AudioReader.Track.PreFrame().Sequence-frame.Sequence)
if ar.DecConfChanged() {
ar.LastCodecCtx = ar.Track.ICodecCtx
if seqFrame := ar.Track.SequenceFrame; seqFrame != nil {
if sctx, ok := ar.LastCodecCtx.(ISequenceCodecCtx[A]); ok {
if handler.awi > 0 {
err = handler.OnAudio(seqFrame.(A))
err = handler.OnAudio(sctx.GetSequenceFrame())
}
}
}