feat: aac

This commit is contained in:
langhuihui
2024-07-04 19:12:13 +08:00
parent 87dc204fc0
commit 938f23955b
31 changed files with 723 additions and 505 deletions

13
api.go
View File

@@ -72,8 +72,7 @@ func (s *Server) api_Stream_AnnexB_(rw http.ResponseWriter, r *http.Request) {
} }
defer reader.StopRead() defer reader.StopRead()
if reader.Value.Raw == nil { if reader.Value.Raw == nil {
reader.Value.Raw, err = reader.Value.Wraps[0].ToRaw(publisher.VideoTrack.ICodecCtx) if err = reader.Value.Demux(publisher.VideoTrack.ICodecCtx); err != nil {
if err != nil {
http.Error(rw, err.Error(), http.StatusInternalServerError) http.Error(rw, err.Error(), http.StatusInternalServerError)
return return
} }
@@ -81,17 +80,13 @@ func (s *Server) api_Stream_AnnexB_(rw http.ResponseWriter, r *http.Request) {
var annexb pkg.AnnexB var annexb pkg.AnnexB
var t pkg.AVTrack var t pkg.AVTrack
err = annexb.DecodeConfig(&t, publisher.VideoTrack.ICodecCtx) err = annexb.ConvertCtx(publisher.VideoTrack.ICodecCtx, &t)
if t.ICodecCtx == nil { if t.ICodecCtx == nil {
http.Error(rw, "unsupported codec", http.StatusInternalServerError) http.Error(rw, "unsupported codec", http.StatusInternalServerError)
return return
} }
frame, err := t.CreateFrame(&reader.Value) annexb.Mux(t.ICodecCtx, &reader.Value)
if err != nil { _, err = annexb.WriteTo(rw)
http.Error(rw, err.Error(), http.StatusInternalServerError)
return
}
_, err = frame.(*pkg.AnnexB).WriteTo(rw)
} }
func (s *Server) getStreamInfo(pub *Publisher) (res *pb.StreamInfoResponse, err error) { func (s *Server) getStreamInfo(pub *Publisher) (res *pb.StreamInfoResponse, err error) {

View File

@@ -3,8 +3,6 @@ global:
enableauth: true enableauth: true
tcp: tcp:
listenaddr: :50051 listenaddr: :50051
publish:
pubaudio: false
# ringsize: 20-250 # ringsize: 20-250
# buffertime: 10s # buffertime: 10s
# speed: 1 # speed: 1
@@ -13,8 +11,6 @@ console:
logrotate: logrotate:
level: debug level: debug
rtsp: rtsp:
subscribe:
subaudio: false
rtmp: rtmp:
# tcp: # tcp:
# listenaddr: :11935 # listenaddr: :11935

View File

@@ -12,7 +12,6 @@ rtsp:
pushlist: pushlist:
live/test: rtsp://localhost/live/test live/test: rtsp://localhost/live/test
hdl: hdl:
publish: publish:
pubaudio: false pubaudio: false
pull: pull:

View File

@@ -24,13 +24,8 @@ func (a *AnnexB) Dump(t byte, w io.Writer) {
} }
// DecodeConfig implements pkg.IAVFrame. // DecodeConfig implements pkg.IAVFrame.
func (a *AnnexB) DecodeConfig(t *AVTrack, ctx ICodecCtx) error { func (a *AnnexB) ConvertCtx(ctx codec.ICodecCtx, t *AVTrack) error {
switch c := ctx.(type) { t.ICodecCtx = ctx.GetBase()
case codec.IH264Ctx:
var annexb264 Annexb264Ctx
annexb264.H264Ctx = *c.GetH264Ctx()
t.ICodecCtx = &annexb264
}
return nil return nil
} }
@@ -42,9 +37,12 @@ func (a *AnnexB) GetSize() int {
func (a *AnnexB) GetTimestamp() time.Duration { func (a *AnnexB) GetTimestamp() time.Duration {
return a.DTS * time.Millisecond / 90 return a.DTS * time.Millisecond / 90
} }
func (a *AnnexB) GetCTS() time.Duration {
return (a.PTS - a.DTS) * time.Millisecond / 90
}
// Parse implements pkg.IAVFrame. // Parse implements pkg.IAVFrame.
func (a *AnnexB) Parse(t *AVTrack) (isIDR bool, isSeq bool, raw any, err error) { func (a *AnnexB) Parse(t *AVTrack) (err error) {
panic("unimplemented") panic("unimplemented")
} }
@@ -53,35 +51,25 @@ func (a *AnnexB) String() string {
return fmt.Sprintf("%d %d", a.DTS, a.Memory.Size) return fmt.Sprintf("%d %d", a.DTS, a.Memory.Size)
} }
// ToRaw implements pkg.IAVFrame. // Demux implements pkg.IAVFrame.
func (a *AnnexB) ToRaw(ctx ICodecCtx) (any, error) { func (a *AnnexB) Demux(ctx codec.ICodecCtx) (any, error) {
// var nalus Nalus
// nalus.PTS = a.PTS
// nalus.DTS = a.DTS
panic("unimplemented") panic("unimplemented")
} }
type Annexb264Ctx struct { func (a *AnnexB) Mux(codecCtx codec.ICodecCtx, frame *AVFrame) {
codec.H264Ctx a.AppendOne(codec.NALU_Delimiter2)
}
type Annexb265Ctx struct {
codec.H265Ctx
}
func (a *Annexb264Ctx) CreateFrame(frame *AVFrame) (IAVFrame, error) {
var annexb AnnexB
// annexb.RecyclableBuffers.ScalableMemoryAllocator = frame.Wraps[0].GetScalableMemoryAllocator()
annexb.Append(codec.NALU_Delimiter2)
if frame.IDR { if frame.IDR {
annexb.Append(a.SPS[0], codec.NALU_Delimiter2, a.PPS[0], codec.NALU_Delimiter2) switch ctx := codecCtx.(type) {
} case *codec.H264Ctx:
var nalus = frame.Raw.(Nalus) a.Append(ctx.SPS[0], codec.NALU_Delimiter2, ctx.PPS[0], codec.NALU_Delimiter2)
for i, nalu := range nalus.Nalus { case *codec.H265Ctx:
if i > 0 { a.Append(ctx.SPS[0], codec.NALU_Delimiter2, ctx.PPS[0], codec.NALU_Delimiter2, ctx.VPS[0], codec.NALU_Delimiter2)
annexb.Append(codec.NALU_Delimiter1)
} }
annexb.Append(nalu.Buffers...)
} }
return &annexb, nil for i, nalu := range frame.Raw.(Nalus) {
if i > 0 {
a.AppendOne(codec.NALU_Delimiter1)
}
a.Append(nalu.Buffers...)
}
} }

View File

@@ -3,6 +3,7 @@ package pkg
import ( import (
"context" "context"
"log/slog" "log/slog"
"m7s.live/m7s/v5/pkg/codec"
"m7s.live/m7s/v5/pkg/config" "m7s.live/m7s/v5/pkg/config"
"time" "time"
) )
@@ -27,7 +28,7 @@ type AVRingReader struct {
FirstTs time.Duration FirstTs time.Duration
SkipTs time.Duration //ms SkipTs time.Duration //ms
beforeJump time.Duration beforeJump time.Duration
LastCodecCtx ICodecCtx LastCodecCtx codec.ICodecCtx
startTime time.Time startTime time.Time
AbsTime uint32 AbsTime uint32
Delay uint32 Delay uint32
@@ -143,11 +144,11 @@ func (r *AVRingReader) ReadFrame(conf *config.Subscribe) (err error) {
} }
// func (r *AVRingReader) GetPTS32() uint32 { // func (r *AVRingReader) GetPTS32() uint32 {
// return uint32((r.Value.Raw.PTS - r.SkipTs*90/time.Millisecond)) // return uint32((r.Value.Raw.Timestamp - r.SkipTs*90/time.Millisecond))
// } // }
// func (r *AVRingReader) GetDTS32() uint32 { // func (r *AVRingReader) GetDTS32() uint32 {
// return uint32((r.Value.DTS - r.SkipTs*90/time.Millisecond)) // return uint32((r.Value.CTS - r.SkipTs*90/time.Millisecond))
// } // }
func (r *AVRingReader) ResetAbsTime() { func (r *AVRingReader) ResetAbsTime() {

View File

@@ -12,47 +12,49 @@ import (
) )
type ( type (
ICodecCtx interface {
CreateFrame(*AVFrame) (IAVFrame, error)
FourCC() codec.FourCC
GetInfo() string
}
IAudioCodecCtx interface { IAudioCodecCtx interface {
ICodecCtx codec.ICodecCtx
GetSampleRate() int GetSampleRate() int
GetChannels() int GetChannels() int
GetSampleSize() int GetSampleSize() int
} }
IVideoCodecCtx interface { IVideoCodecCtx interface {
ICodecCtx codec.ICodecCtx
GetWidth() int GetWidth() int
GetHeight() int GetHeight() int
} }
IDataFrame interface { IDataFrame interface {
} }
// Source -> Parse -> Demux -> (ConvertCtx) -> Mux(GetAllocator) -> Recycle
IAVFrame interface { IAVFrame interface {
GetScalableMemoryAllocator() *util.ScalableMemoryAllocator GetAllocator() *util.ScalableMemoryAllocator
Parse(*AVTrack) (bool, bool, any, error) SetAllocator(*util.ScalableMemoryAllocator)
DecodeConfig(*AVTrack, ICodecCtx) error Parse(*AVTrack) error // get codec info, idr
ToRaw(ICodecCtx) (any, error) ConvertCtx(codec.ICodecCtx, *AVTrack) error // convert codec from source stream
Demux(codec.ICodecCtx) (any, error) // demux to raw format
Mux(codec.ICodecCtx, *AVFrame) // mux from raw format
GetTimestamp() time.Duration GetTimestamp() time.Duration
GetCTS() time.Duration
GetSize() int GetSize() int
Recycle() Recycle()
String() string String() string
Dump(byte, io.Writer) Dump(byte, io.Writer)
} }
Nalus struct { Nalus []util.Memory
PTS time.Duration
DTS time.Duration AudioData = util.Memory
Nalus []util.Memory
} OBUs AudioData
AVFrame struct { AVFrame struct {
DataFrame DataFrame
IDR bool IDR bool
Timestamp time.Duration // 绝对时间戳 Timestamp time.Duration // 绝对时间戳
CTS time.Duration // composition time stamp
Wraps []IAVFrame // 封装格式 Wraps []IAVFrame // 封装格式
} }
AVRing = util.Ring[AVFrame] AVRing = util.Ring[AVFrame]
DataFrame struct { DataFrame struct {
sync.RWMutex sync.RWMutex
@@ -67,6 +69,7 @@ var _ IAVFrame = (*AnnexB)(nil)
func (frame *AVFrame) Reset() { func (frame *AVFrame) Reset() {
frame.Timestamp = 0 frame.Timestamp = 0
frame.Raw = nil
if len(frame.Wraps) > 0 { if len(frame.Wraps) > 0 {
for _, wrap := range frame.Wraps { for _, wrap := range frame.Wraps {
wrap.Recycle() wrap.Recycle()
@@ -80,6 +83,11 @@ func (frame *AVFrame) Discard() {
frame.Reset() frame.Reset()
} }
func (frame *AVFrame) Demux(codecCtx codec.ICodecCtx) (err error) {
frame.Raw, err = frame.Wraps[0].Demux(codecCtx)
return
}
func (df *DataFrame) StartWrite() bool { func (df *DataFrame) StartWrite() bool {
if df.TryLock() { if df.TryLock() {
return true return true
@@ -95,15 +103,15 @@ func (df *DataFrame) Ready() {
} }
func (nalus *Nalus) H264Type() codec.H264NALUType { func (nalus *Nalus) H264Type() codec.H264NALUType {
return codec.ParseH264NALUType(nalus.Nalus[0].Buffers[0][0]) return codec.ParseH264NALUType((*nalus)[0].Buffers[0][0])
} }
func (nalus *Nalus) H265Type() codec.H265NALUType { func (nalus *Nalus) H265Type() codec.H265NALUType {
return codec.ParseH265NALUType(nalus.Nalus[0].Buffers[0][0]) return codec.ParseH265NALUType((*nalus)[0].Buffers[0][0])
} }
func (nalus *Nalus) Append(bytes []byte) { func (nalus *Nalus) Append(bytes []byte) {
nalus.Nalus = append(nalus.Nalus, util.Memory{Buffers: net.Buffers{bytes}, Size: len(bytes)}) *nalus = append(*nalus, util.Memory{Buffers: net.Buffers{bytes}, Size: len(bytes)})
} }
func (nalus *Nalus) ParseAVCC(reader *util.MemoryReader, naluSizeLen int) error { func (nalus *Nalus) ParseAVCC(reader *util.MemoryReader, naluSizeLen int) error {
@@ -112,27 +120,26 @@ func (nalus *Nalus) ParseAVCC(reader *util.MemoryReader, naluSizeLen int) error
if err != nil { if err != nil {
return err return err
} }
reader.RangeN(l, nalus.Append) var mem util.Memory
reader.RangeN(int(l), mem.AppendOne)
*nalus = append(*nalus, mem)
} }
return nil return nil
} }
type OBUs struct {
PTS time.Duration
OBUs []net.Buffers
}
func (obus *OBUs) Append(bytes ...[]byte) {
obus.OBUs = append(obus.OBUs, bytes)
}
func (obus *OBUs) ParseAVCC(reader *util.MemoryReader) error { func (obus *OBUs) ParseAVCC(reader *util.MemoryReader) error {
var obuHeader av1.OBUHeader var obuHeader av1.OBUHeader
startLen := reader.Length startLen := reader.Length
for reader.Length > 0 { for reader.Length > 0 {
offset := reader.Size - reader.Length offset := reader.Size - reader.Length
b, _ := reader.ReadByte() b, err := reader.ReadByte()
obuHeader.Unmarshal([]byte{b}) if err != nil {
return err
}
err = obuHeader.Unmarshal([]byte{b})
if err != nil {
return err
}
// if log.Trace { // if log.Trace {
// vt.Trace("obu", zap.Any("type", obuHeader.Type), zap.Bool("iframe", vt.Value.IFrame)) // vt.Trace("obu", zap.Any("type", obuHeader.Type), zap.Bool("iframe", vt.Value.IFrame))
// } // }
@@ -144,7 +151,7 @@ func (obus *OBUs) ParseAVCC(reader *util.MemoryReader) error {
if err != nil { if err != nil {
return err return err
} }
obus.Append(obu) (*AudioData)(obus).AppendOne(obu)
} }
return nil return nil
} }

View File

@@ -1,21 +1,28 @@
package codec package codec
import (
"fmt"
)
type ( type (
AudioCtx struct { AudioCtx struct {
SampleRate int SampleRate int
Channels int Channels int
SampleSize int SampleSize int
} }
PCMACtx AudioCtx PCMACtx struct {
PCMUCtx AudioCtx AudioCtx
OPUSCtx AudioCtx }
AACCtx struct { PCMUCtx struct {
AudioCtx
}
OPUSCtx struct {
AudioCtx
}
AACCtx struct {
AudioCtx AudioCtx
Asc []byte Asc []byte
} }
IAACCtx interface {
GetAACCtx() *AACCtx
}
) )
func (ctx *AudioCtx) GetSampleRate() int { func (ctx *AudioCtx) GetSampleRate() int {
@@ -30,7 +37,11 @@ func (ctx *AudioCtx) GetSampleSize() int {
return ctx.SampleSize return ctx.SampleSize
} }
func (ctx *AACCtx) GetAACCtx() *AACCtx { func (ctx *AudioCtx) GetInfo() string {
return fmt.Sprintf("sample rate: %d, channels: %d, sample size: %d", ctx.SampleRate, ctx.Channels, ctx.SampleSize)
}
func (ctx *AACCtx) GetBase() ICodecCtx {
return ctx return ctx
} }
@@ -42,6 +53,14 @@ func (*PCMACtx) FourCC() FourCC {
return FourCC_ALAW return FourCC_ALAW
} }
func (ctx *PCMACtx) GetBase() ICodecCtx {
return ctx
}
func (ctx *PCMUCtx) GetBase() ICodecCtx {
return ctx
}
func (*AACCtx) FourCC() FourCC { func (*AACCtx) FourCC() FourCC {
return FourCC_MP4A return FourCC_MP4A
} }
@@ -49,3 +68,7 @@ func (*AACCtx) FourCC() FourCC {
func (*OPUSCtx) FourCC() FourCC { func (*OPUSCtx) FourCC() FourCC {
return FourCC_OPUS return FourCC_OPUS
} }
func (ctx *OPUSCtx) GetBase() ICodecCtx {
return ctx
}

View File

@@ -13,15 +13,16 @@ const (
) )
type ( type (
IAV1Ctx interface {
GetAV1Ctx() *AV1Ctx
}
AV1Ctx struct { AV1Ctx struct {
ConfigOBUs []byte ConfigOBUs []byte
} }
) )
func (ctx *AV1Ctx) GetAV1Ctx() *AV1Ctx { func (ctx *AV1Ctx) GetInfo() string {
return "AV1"
}
func (ctx *AV1Ctx) GetBase() ICodecCtx {
return ctx return ctx
} }

View File

@@ -106,9 +106,6 @@ func SplitH264(payload []byte) (nalus [][]byte) {
} }
type ( type (
IH264Ctx interface {
GetH264Ctx() *H264Ctx
}
H264Ctx struct { H264Ctx struct {
SPSInfo SPSInfo
SPS [][]byte SPS [][]byte
@@ -132,6 +129,6 @@ func (h264 *H264Ctx) GetHeight() int {
return int(h264.Height) return int(h264.Height)
} }
func (h264 *H264Ctx) GetH264Ctx() *H264Ctx { func (h264 *H264Ctx) GetBase() ICodecCtx {
return h264 return h264
} }

View File

@@ -90,9 +90,6 @@ const (
var AudNalu = []byte{0x00, 0x00, 0x00, 0x01, 0x46, 0x01, 0x10} var AudNalu = []byte{0x00, 0x00, 0x00, 0x01, 0x46, 0x01, 0x10}
type ( type (
IH265Ctx interface {
GetH265Ctx() *H265Ctx
}
H265Ctx struct { H265Ctx struct {
H264Ctx H264Ctx
VPS [][]byte VPS [][]byte
@@ -115,6 +112,6 @@ func (*H265Ctx) FourCC() FourCC {
return FourCC_H265 return FourCC_H265
} }
func (h265 *H265Ctx) GetH265Ctx() *H265Ctx { func (h265 *H265Ctx) GetBase() ICodecCtx {
return h265 return h265
} }

7
pkg/codec/index.go Normal file
View File

@@ -0,0 +1,7 @@
package codec
type ICodecCtx interface {
FourCC() FourCC
GetInfo() string
GetBase() ICodecCtx
}

View File

@@ -30,9 +30,10 @@ type Config struct {
tag reflect.StructTag tag reflect.StructTag
} }
var durationType = reflect.TypeOf(time.Duration(0)) var (
var regexpType = reflect.TypeOf(Regexp{}) durationType = reflect.TypeOf(time.Duration(0))
var regexpYaml = regexp.MustCompile(`^(.+: )"(.+)"$`) regexpType = reflect.TypeOf(Regexp{})
)
func (config *Config) Range(f func(key string, value Config)) { func (config *Config) Range(f func(key string, value Config)) {
if m, ok := config.GetValue().(map[string]Config); ok { if m, ok := config.GetValue().(map[string]Config); ok {
@@ -261,8 +262,7 @@ func (config *Config) valueWithoutModify() any {
} }
func equal(vwm, v any) bool { func equal(vwm, v any) bool {
ft := reflect.TypeOf(vwm) switch ft := reflect.TypeOf(vwm); ft {
switch ft {
case regexpType: case regexpType:
return vwm.(Regexp).String() == v.(Regexp).String() return vwm.(Regexp).String() == v.(Regexp).String()
default: default:

View File

@@ -156,23 +156,6 @@ func (p *Push) CheckPush(streamPath string) string {
return url return url
} }
type Console struct {
Server string `default:"console.monibuca.com:44944" desc:"远程控制台地址"` //远程控制台地址
Secret string `desc:"远程控制台密钥"` //远程控制台密钥
PublicAddr string `desc:"远程控制台公网地址"` //公网地址,提供远程控制台访问的地址,不配置的话使用自动识别的地址
PublicAddrTLS string `desc:"远程控制台公网TLS地址"`
}
type Engine struct {
EnableSubEvent bool `default:"true" desc:"启用订阅事件,禁用可以提高性能"` //启用订阅事件,禁用可以提高性能
LogLang string `default:"zh" desc:"日志语言" enum:"zh:中文,en:英文"` //日志语言
SettingDir string `default:".m7s" desc:""`
EventBusSize int `default:"10" desc:"事件总线大小"` //事件总线大小
PulseInterval time.Duration `default:"5s" desc:"心跳事件间隔"` //心跳事件间隔
DisableAll bool `default:"false" desc:"禁用所有插件"` //禁用所有插件
RTPReorderBufferLen int `default:"50" desc:"RTP重排序缓冲区长度"` //RTP重排序缓冲区长度
}
type Common struct { type Common struct {
PublicIP string PublicIP string
LogLevel string `default:"info" enum:"trace:跟踪,debug:调试,info:信息,warn:警告,error:错误"` //日志级别 LogLevel string `default:"info" enum:"trace:跟踪,debug:调试,info:信息,warn:警告,error:错误"` //日志级别

View File

@@ -3,6 +3,7 @@ package pkg
import ( import (
"context" "context"
"log/slog" "log/slog"
"m7s.live/m7s/v5/pkg/codec"
"m7s.live/m7s/v5/pkg/config" "m7s.live/m7s/v5/pkg/config"
"reflect" "reflect"
"time" "time"
@@ -29,7 +30,7 @@ type (
AVTrack struct { AVTrack struct {
Track Track
*RingWriter *RingWriter
ICodecCtx codec.ICodecCtx
Allocator *util.ScalableMemoryAllocator Allocator *util.ScalableMemoryAllocator
SequenceFrame IAVFrame SequenceFrame IAVFrame
WrapIndex int WrapIndex int
@@ -42,7 +43,7 @@ func NewAVTrack(args ...any) (t *AVTrack) {
switch v := arg.(type) { switch v := arg.(type) {
case IAVFrame: case IAVFrame:
t.FrameType = reflect.TypeOf(v) t.FrameType = reflect.TypeOf(v)
t.Allocator = v.GetScalableMemoryAllocator() t.Allocator = v.GetAllocator()
case reflect.Type: case reflect.Type:
t.FrameType = v t.FrameType = v
case *slog.Logger: case *slog.Logger:

View File

@@ -146,10 +146,7 @@ func (r *BufReader) ReadString(n int) (s string, err error) {
} }
func (r *BufReader) ReadBytes(n int) (mem Memory, err error) { func (r *BufReader) ReadBytes(n int) (mem Memory, err error) {
err = r.ReadRange(n, func(buf []byte) { err = r.ReadRange(n, mem.AppendOne)
mem.Buffers = append(mem.Buffers, buf)
})
mem.Size = n
return return
} }

View File

@@ -24,7 +24,7 @@ func TestReadBytesTo(t *testing.T) {
s := RandomString(100) s := RandomString(100)
t.Logf("s:%s", s) t.Logf("s:%s", s)
var m Memory var m Memory
m.Append([]byte(s)) m.AppendOne([]byte(s))
r := m.NewReader() r := m.NewReader()
seededRand := rand.New(rand.NewSource(time.Now().UnixNano())) seededRand := rand.New(rand.NewSource(time.Now().UnixNano()))
var total []byte var total []byte

View File

@@ -19,16 +19,18 @@ type MemoryReader struct {
} }
func NewReadableBuffersFromBytes(b ...[]byte) *MemoryReader { func NewReadableBuffersFromBytes(b ...[]byte) *MemoryReader {
buf := NewMemory(b) buf := &Memory{Buffers: b}
for _, level0 := range b {
buf.Size += len(level0)
}
return &MemoryReader{Memory: buf, Length: buf.Size} return &MemoryReader{Memory: buf, Length: buf.Size}
} }
func NewMemory(buffers net.Buffers) *Memory { func NewMemory(buf []byte) Memory {
ret := &Memory{Buffers: buffers} return Memory{
for _, level0 := range buffers { Buffers: net.Buffers{buf},
ret.Size += len(level0) Size: len(buf),
} }
return ret
} }
func (m *Memory) UpdateBuffer(index int, buf []byte) { func (m *Memory) UpdateBuffer(index int, buf []byte) {
@@ -42,7 +44,7 @@ func (m *Memory) UpdateBuffer(index int, buf []byte) {
func (m *Memory) CopyFrom(b *Memory) { func (m *Memory) CopyFrom(b *Memory) {
buf := make([]byte, b.Size) buf := make([]byte, b.Size)
b.CopyTo(buf) b.CopyTo(buf)
m.Append(buf) m.AppendOne(buf)
} }
func (m *Memory) CopyTo(buf []byte) { func (m *Memory) CopyTo(buf []byte) {
@@ -59,6 +61,11 @@ func (m *Memory) ToBytes() []byte {
return buf return buf
} }
func (m *Memory) AppendOne(b []byte) {
m.Buffers = append(m.Buffers, b)
m.Size += len(b)
}
func (m *Memory) Append(b ...[]byte) { func (m *Memory) Append(b ...[]byte) {
m.Buffers = append(m.Buffers, b...) m.Buffers = append(m.Buffers, b...)
for _, level0 := range b { for _, level0 := range b {
@@ -230,13 +237,13 @@ func (r *MemoryReader) ReadBytes(n int) ([]byte, error) {
return b[:actual], nil return b[:actual], nil
} }
func (r *MemoryReader) ReadBE(n int) (num int, err error) { func (r *MemoryReader) ReadBE(n int) (num uint32, err error) {
for i := range n { for i := range n {
b, err := r.ReadByte() b, err := r.ReadByte()
if err != nil { if err != nil {
return -1, err return 0, err
} }
num += int(b) << ((n - i - 1) << 3) num += uint32(b) << ((n - i - 1) << 3)
} }
return return
} }

View File

@@ -199,7 +199,7 @@ func (sma *ScalableMemoryAllocator) Malloc(size int) (memory []byte) {
return return
} }
func (sma *ScalableMemoryAllocator) GetScalableMemoryAllocator() *ScalableMemoryAllocator { func (sma *ScalableMemoryAllocator) GetAllocator() *ScalableMemoryAllocator {
return sma return sma
} }
@@ -253,6 +253,10 @@ type RecyclableMemory struct {
RecycleIndexes []int RecycleIndexes []int
} }
func (r *RecyclableMemory) SetAllocator(allocator *ScalableMemoryAllocator) {
r.ScalableMemoryAllocator = allocator
}
func (r *RecyclableMemory) NextN(size int) (memory []byte) { func (r *RecyclableMemory) NextN(size int) (memory []byte) {
memory = r.ScalableMemoryAllocator.Malloc(size) memory = r.ScalableMemoryAllocator.Malloc(size)
if memory == nil { if memory == nil {
@@ -260,7 +264,7 @@ func (r *RecyclableMemory) NextN(size int) (memory []byte) {
} else if r.RecycleIndexes != nil { } else if r.RecycleIndexes != nil {
r.RecycleIndexes = append(r.RecycleIndexes, r.Count()) r.RecycleIndexes = append(r.RecycleIndexes, r.Count())
} }
r.Append(memory) r.AppendOne(memory)
return return
} }
@@ -268,7 +272,7 @@ func (r *RecyclableMemory) AddRecycleBytes(b []byte) {
if r.RecycleIndexes != nil { if r.RecycleIndexes != nil {
r.RecycleIndexes = append(r.RecycleIndexes, r.Count()) r.RecycleIndexes = append(r.RecycleIndexes, r.Count())
} }
r.Append(b) r.AppendOne(b)
} }
func (r *RecyclableMemory) RemoveRecycleBytes(index int) (buf []byte) { func (r *RecyclableMemory) RemoveRecycleBytes(index int) (buf []byte) {

View File

@@ -1,6 +1,7 @@
package util package util
import ( import (
"fmt"
"gopkg.in/yaml.v3" "gopkg.in/yaml.v3"
"strconv" "strconv"
"strings" "strings"
@@ -21,8 +22,20 @@ func (r *Range[T]) Valid() bool {
return r.Size() >= 0 return r.Size() >= 0
} }
func (r *Range[T]) UnmarshalYAML(value *yaml.Node) error { func (r *Range[T]) Resolve(s string) error {
ss := strings.Split(value.Value, "-") ss := strings.Split(s, "-")
if len(ss) == 0 {
return fmt.Errorf("invalid range: %s", s)
}
if len(ss) == 1 {
i64, err := strconv.ParseInt(s, 10, 0)
r[0] = T(i64)
if err != nil {
return err
}
r[1] = r[0]
return nil
}
i64, err := strconv.ParseInt(ss[0], 10, 0) i64, err := strconv.ParseInt(ss[0], 10, 0)
if err != nil { if err != nil {
return err return err
@@ -35,3 +48,7 @@ func (r *Range[T]) UnmarshalYAML(value *yaml.Node) error {
r[1] = T(i64) r[1] = T(i64)
return nil return nil
} }
func (r *Range[T]) UnmarshalYAML(value *yaml.Node) error {
return r.Resolve(value.Value)
}

View File

@@ -304,18 +304,18 @@ func (p *Plugin) Pull(streamPath string, url string, options ...any) (puller *Pu
puller.Publish = p.config.Publish puller.Publish = p.config.Publish
puller.PublishTimeout = 0 puller.PublishTimeout = 0
puller.StreamPath = streamPath puller.StreamPath = streamPath
var pullHandler PullHandler
for _, option := range options { for _, option := range options {
switch v := option.(type) { switch v := option.(type) {
case PullHandler: case PullHandler:
defer func() { pullHandler = v
if err == nil {
puller.Start(v)
}
}()
} }
} }
puller.Init(p, streamPath, &puller.Publish, options...) puller.Init(p, streamPath, &puller.Publish, options...)
_, err = p.server.Call(puller) _, err = p.server.Call(puller)
if err == nil && pullHandler != nil {
err = puller.Start(pullHandler)
}
return return
} }

View File

@@ -3,13 +3,15 @@ package rtmp
import ( import (
. "m7s.live/m7s/v5/pkg" . "m7s.live/m7s/v5/pkg"
"m7s.live/m7s/v5/pkg/codec" "m7s.live/m7s/v5/pkg/codec"
"m7s.live/m7s/v5/pkg/util"
"time"
) )
type RTMPAudio struct { type RTMPAudio struct {
RTMPData RTMPData
} }
func (avcc *RTMPAudio) Parse(t *AVTrack) (isIDR, isSeq bool, raw any, err error) { func (avcc *RTMPAudio) Parse(t *AVTrack) (err error) {
reader := avcc.NewReader() reader := avcc.NewReader()
var b, b0, b1 byte var b, b0, b1 byte
b, err = reader.ReadByte() b, err = reader.ReadByte()
@@ -19,7 +21,7 @@ func (avcc *RTMPAudio) Parse(t *AVTrack) (isIDR, isSeq bool, raw any, err error)
switch b & 0b1111_0000 >> 4 { switch b & 0b1111_0000 >> 4 {
case 7: case 7:
if t.ICodecCtx == nil { if t.ICodecCtx == nil {
var ctx PCMACtx var ctx codec.PCMACtx
ctx.SampleRate = 8000 ctx.SampleRate = 8000
ctx.Channels = 1 ctx.Channels = 1
ctx.SampleSize = 8 ctx.SampleSize = 8
@@ -27,7 +29,7 @@ func (avcc *RTMPAudio) Parse(t *AVTrack) (isIDR, isSeq bool, raw any, err error)
} }
case 8: case 8:
if t.ICodecCtx == nil { if t.ICodecCtx == nil {
var ctx PCMUCtx var ctx codec.PCMUCtx
ctx.SampleRate = 8000 ctx.SampleRate = 8000
ctx.Channels = 1 ctx.Channels = 1
ctx.SampleSize = 8 ctx.SampleSize = 8
@@ -38,8 +40,7 @@ func (avcc *RTMPAudio) Parse(t *AVTrack) (isIDR, isSeq bool, raw any, err error)
if err != nil { if err != nil {
return return
} }
isSeq = b == 0 if b == 0 {
if isSeq {
var ctx AACCtx var ctx AACCtx
b0, err = reader.ReadByte() b0, err = reader.ReadByte()
if err != nil { if err != nil {
@@ -51,7 +52,7 @@ func (avcc *RTMPAudio) Parse(t *AVTrack) (isIDR, isSeq bool, raw any, err error)
} }
var cloneFrame RTMPAudio var cloneFrame RTMPAudio
cloneFrame.CopyFrom(&avcc.Memory) cloneFrame.CopyFrom(&avcc.Memory)
ctx.Asc = cloneFrame.Buffers[0] ctx.Asc = []byte{b0, b1}
ctx.AudioObjectType = b0 >> 3 ctx.AudioObjectType = b0 >> 3
ctx.SamplingFrequencyIndex = (b0 & 0x07 << 1) | (b1 >> 7) ctx.SamplingFrequencyIndex = (b0 & 0x07 << 1) | (b1 >> 7)
ctx.ChannelConfiguration = (b1 >> 3) & 0x0F ctx.ChannelConfiguration = (b1 >> 3) & 0x0F
@@ -68,52 +69,48 @@ func (avcc *RTMPAudio) Parse(t *AVTrack) (isIDR, isSeq bool, raw any, err error)
return return
} }
func (avcc *RTMPAudio) DecodeConfig(t *AVTrack, from ICodecCtx) (err error) { func (avcc *RTMPAudio) ConvertCtx(from codec.ICodecCtx, t *AVTrack) (err error) {
switch fourCC := from.FourCC(); fourCC { switch fourCC := from.FourCC(); fourCC {
case codec.FourCC_ALAW:
var ctx PCMACtx
t.ICodecCtx = &ctx
case codec.FourCC_ULAW:
var ctx PCMUCtx
ctx.SampleRate = 8000
ctx.Channels = 1
ctx.SampleSize = 8
t.ICodecCtx = &ctx
case codec.FourCC_MP4A: case codec.FourCC_MP4A:
var ctx AACCtx var ctx AACCtx
ctx.SampleRate = 44100 ctx.AACCtx = *from.GetBase().(*codec.AACCtx)
ctx.Channels = 2 b0, b1 := ctx.Asc[0], ctx.Asc[1]
ctx.SampleSize = 16 ctx.AudioObjectType = b0 >> 3
ctx.SamplingFrequencyIndex = (b0 & 0x07 << 1) | (b1 >> 7)
ctx.ChannelConfiguration = (b1 >> 3) & 0x0F
ctx.FrameLengthFlag = (b1 >> 2) & 0x01
ctx.DependsOnCoreCoder = (b1 >> 1) & 0x01
ctx.ExtensionFlag = b1 & 0x01
t.ICodecCtx = &ctx t.ICodecCtx = &ctx
default:
t.ICodecCtx = from.GetBase()
} }
return return
} }
func (avcc *RTMPAudio) ToRaw(codecCtx ICodecCtx) (any, error) { func (avcc *RTMPAudio) Demux(codecCtx codec.ICodecCtx) (raw any, err error) {
reader := avcc.NewReader() reader := avcc.NewReader()
var result util.Memory
if _, ok := codecCtx.(*AACCtx); ok { if _, ok := codecCtx.(*AACCtx); ok {
err := reader.Skip(2) err = reader.Skip(2)
return reader.Memory, err reader.Range(result.AppendOne)
return result, err
} else { } else {
err := reader.Skip(1) err = reader.Skip(1)
return reader.Memory, err reader.Range(result.AppendOne)
return result, err
} }
} }
func (aac *AACCtx) CreateFrame(*AVFrame) (frame IAVFrame, err error) { func (avcc *RTMPAudio) Mux(codecCtx codec.ICodecCtx, from *AVFrame) {
var rtmpAudio RTMPAudio avcc.Timestamp = uint32(from.Timestamp / time.Millisecond)
frame = &rtmpAudio audioData := from.Raw.(AudioData)
return switch c := codecCtx.FourCC(); c {
} case codec.FourCC_MP4A:
avcc.AppendOne([]byte{0xAF, 0x01})
func (g711 *PCMACtx) CreateFrame(*AVFrame) (frame IAVFrame, err error) { avcc.Append(audioData.Buffers...)
var rtmpAudio RTMPAudio case codec.FourCC_ALAW, codec.FourCC_ULAW:
frame = &rtmpAudio avcc.AppendOne([]byte{byte(ParseAudioCodec(c))<<4 | (1 << 1)})
return avcc.Append(audioData.Buffers...)
} }
func (g711 *PCMUCtx) CreateFrame(*AVFrame) (frame IAVFrame, err error) {
var rtmpAudio RTMPAudio
frame = &rtmpAudio
return
} }

View File

@@ -47,12 +47,7 @@ type (
InitialPresentationDelayPresent byte InitialPresentationDelayPresent byte
InitialPresentationDelayMinusOne byte InitialPresentationDelayMinusOne byte
} }
PCMACtx struct {
codec.PCMACtx
}
PCMUCtx struct {
codec.PCMUCtx
}
AACCtx struct { AACCtx struct {
codec.AACCtx codec.AACCtx
AudioSpecificConfig AudioSpecificConfig
@@ -217,7 +212,7 @@ func (ctx *H264Ctx) Unmarshal(b *util.MemoryReader) (err error) {
if err1 != nil { if err1 != nil {
return err1 return err1
} }
spsbytes, err2 := b.ReadBytes(spslen) spsbytes, err2 := b.ReadBytes(int(spslen))
if err2 != nil { if err2 != nil {
return err2 return err2
} }
@@ -239,7 +234,7 @@ func (ctx *H264Ctx) Unmarshal(b *util.MemoryReader) (err error) {
if err1 != nil { if err1 != nil {
return err1 return err1
} }
ppsbytes, err2 := b.ReadBytes(ppslen) ppsbytes, err2 := b.ReadBytes(int(ppslen))
if err2 != nil { if err2 != nil {
return err2 return err2
} }
@@ -292,7 +287,7 @@ func (ctx *H265Ctx) Unmarshal(b *util.MemoryReader) (err error) {
if err != nil { if err != nil {
return ErrHevc return ErrHevc
} }
vps, err := b.ReadBytes(vpslen) vps, err := b.ReadBytes(int(vpslen))
if err != nil { if err != nil {
return ErrHevc return ErrHevc
} }
@@ -311,7 +306,7 @@ func (ctx *H265Ctx) Unmarshal(b *util.MemoryReader) (err error) {
if err != nil { if err != nil {
return ErrHevc return ErrHevc
} }
sps, err := b.ReadBytes(spslen) sps, err := b.ReadBytes(int(spslen))
if err != nil { if err != nil {
return ErrHevc return ErrHevc
} }
@@ -334,7 +329,7 @@ func (ctx *H265Ctx) Unmarshal(b *util.MemoryReader) (err error) {
if err != nil { if err != nil {
return ErrHevc return ErrHevc
} }
pps, err := b.ReadBytes(ppslen) pps, err := b.ReadBytes(int(ppslen))
if err != nil { if err != nil {
return ErrHevc return ErrHevc
} }
@@ -765,14 +760,6 @@ func (p *AV1Ctx) Unmarshal(data *util.MemoryReader) (err error) {
return nil return nil
} }
func (PCMACtx) GetInfo() string {
return "pcma"
}
func (PCMUCtx) GetInfo() string {
return "pcmu"
}
func (ctx *AACCtx) GetInfo() string { func (ctx *AACCtx) GetInfo() string {
return fmt.Sprintf("AudioObjectType: %d, SamplingFrequencyIndex: %d, ChannelConfiguration: %d, FrameLengthFlag: %d, DependsOnCoreCoder: %d, ExtensionFlag: %d", ctx.AudioObjectType, ctx.SamplingFrequencyIndex, ctx.ChannelConfiguration, ctx.FrameLengthFlag, ctx.DependsOnCoreCoder, ctx.ExtensionFlag) return fmt.Sprintf("AudioObjectType: %d, SamplingFrequencyIndex: %d, ChannelConfiguration: %d, FrameLengthFlag: %d, DependsOnCoreCoder: %d, ExtensionFlag: %d", ctx.AudioObjectType, ctx.SamplingFrequencyIndex, ctx.ChannelConfiguration, ctx.FrameLengthFlag, ctx.DependsOnCoreCoder, ctx.ExtensionFlag)
} }

View File

@@ -51,6 +51,10 @@ func (avcc *RTMPData) GetTimestamp() time.Duration {
return time.Duration(avcc.Timestamp) * time.Millisecond return time.Duration(avcc.Timestamp) * time.Millisecond
} }
func (avcc *RTMPData) GetCTS() time.Duration {
return 0
}
func (avcc *RTMPData) WrapAudio() *RTMPAudio { func (avcc *RTMPData) WrapAudio() *RTMPAudio {
return &RTMPAudio{RTMPData: *avcc} return &RTMPAudio{RTMPData: *avcc}
} }

View File

@@ -3,6 +3,7 @@ package rtmp
import ( import (
"context" "context"
"encoding/binary" "encoding/binary"
"io"
"time" "time"
. "m7s.live/m7s/v5/pkg" . "m7s.live/m7s/v5/pkg"
@@ -14,9 +15,18 @@ var _ IAVFrame = (*RTMPVideo)(nil)
type RTMPVideo struct { type RTMPVideo struct {
RTMPData RTMPData
CTS uint32
} }
func (avcc *RTMPVideo) Parse(t *AVTrack) (isIDR, isSeq bool, raw any, err error) { func (avcc *RTMPVideo) GetCTS() time.Duration {
return time.Duration(avcc.CTS) * time.Millisecond
}
func (avcc *RTMPVideo) Parse(t *AVTrack) (err error) {
if avcc.Size <= 10 {
err = io.ErrShortBuffer
return
}
reader := avcc.NewReader() reader := avcc.NewReader()
var b0 byte var b0 byte
b0, err = reader.ReadByte() b0, err = reader.ReadByte()
@@ -24,12 +34,11 @@ func (avcc *RTMPVideo) Parse(t *AVTrack) (isIDR, isSeq bool, raw any, err error)
return return
} }
enhanced := b0&0b1000_0000 != 0 // https://veovera.github.io/enhanced-rtmp/docs/enhanced/enhanced-rtmp-v1.pdf enhanced := b0&0b1000_0000 != 0 // https://veovera.github.io/enhanced-rtmp/docs/enhanced/enhanced-rtmp-v1.pdf
isIDR = b0&0b0111_0000>>4 == 1 t.Value.IDR = b0&0b0111_0000>>4 == 1
packetType := b0 & 0b1111 packetType := b0 & 0b1111
var fourCC codec.FourCC var fourCC codec.FourCC
parseSequence := func() (err error) { parseSequence := func() (err error) {
isSeq = true t.Value.IDR = false
isIDR = false
var cloneFrame RTMPVideo var cloneFrame RTMPVideo
cloneFrame.CopyFrom(&avcc.Memory) cloneFrame.CopyFrom(&avcc.Memory)
switch fourCC { switch fourCC {
@@ -101,10 +110,10 @@ func (avcc *RTMPVideo) Parse(t *AVTrack) (isIDR, isSeq bool, raw any, err error)
return return
} }
func (avcc *RTMPVideo) DecodeConfig(t *AVTrack, from ICodecCtx) (err error) { func (avcc *RTMPVideo) ConvertCtx(from codec.ICodecCtx, t *AVTrack) (err error) {
switch fourCC := from.FourCC(); fourCC { switch fourCC := from.FourCC(); fourCC {
case codec.FourCC_H264: case codec.FourCC_H264:
h264ctx := from.(codec.IH264Ctx).GetH264Ctx() h264ctx := from.GetBase().(*codec.H264Ctx)
var ctx H264Ctx var ctx H264Ctx
ctx.H264Ctx = *h264ctx ctx.H264Ctx = *h264ctx
lenSPS := len(h264ctx.SPS[0]) lenSPS := len(h264ctx.SPS[0])
@@ -125,7 +134,7 @@ func (avcc *RTMPVideo) DecodeConfig(t *AVTrack, from ICodecCtx) (err error) {
b.Write(h264ctx.PPS[0]) b.Write(h264ctx.PPS[0])
t.ICodecCtx = &ctx t.ICodecCtx = &ctx
var seqFrame RTMPData var seqFrame RTMPData
seqFrame.Append(b) seqFrame.AppendOne(b)
t.SequenceFrame = seqFrame.WrapVideo() t.SequenceFrame = seqFrame.WrapVideo()
if t.Enabled(context.TODO(), TraceLevel) { if t.Enabled(context.TODO(), TraceLevel) {
c := t.FourCC().String() c := t.FourCC().String()
@@ -133,8 +142,9 @@ func (avcc *RTMPVideo) DecodeConfig(t *AVTrack, from ICodecCtx) (err error) {
data := seqFrame.String() data := seqFrame.String()
t.Trace("decConfig", "codec", c, "size", size, "data", data) t.Trace("decConfig", "codec", c, "size", size, "data", data)
} }
case codec.FourCC_H265:
// TODO: H265
} }
return return
} }
@@ -143,9 +153,8 @@ func (avcc *RTMPVideo) parseH264(ctx *H264Ctx, reader *util.MemoryReader) (any,
if err != nil { if err != nil {
return nil, err return nil, err
} }
avcc.CTS = cts
var nalus Nalus var nalus Nalus
nalus.PTS = time.Duration(avcc.Timestamp+uint32(cts)) * 90
nalus.DTS = time.Duration(avcc.Timestamp) * 90
if err := nalus.ParseAVCC(reader, ctx.NalulenSize); err != nil { if err := nalus.ParseAVCC(reader, ctx.NalulenSize); err != nil {
return nalus, err return nalus, err
} }
@@ -157,9 +166,8 @@ func (avcc *RTMPVideo) parseH265(ctx *H265Ctx, reader *util.MemoryReader) (any,
if err != nil { if err != nil {
return nil, err return nil, err
} }
avcc.CTS = cts
var nalus Nalus var nalus Nalus
nalus.PTS = time.Duration(avcc.Timestamp+uint32(cts)) * 90
nalus.DTS = time.Duration(avcc.Timestamp) * 90
if err := nalus.ParseAVCC(reader, ctx.NalulenSize); err != nil { if err := nalus.ParseAVCC(reader, ctx.NalulenSize); err != nil {
return nalus, err return nalus, err
} }
@@ -168,14 +176,13 @@ func (avcc *RTMPVideo) parseH265(ctx *H265Ctx, reader *util.MemoryReader) (any,
func (avcc *RTMPVideo) parseAV1(reader *util.MemoryReader) (any, error) { func (avcc *RTMPVideo) parseAV1(reader *util.MemoryReader) (any, error) {
var obus OBUs var obus OBUs
obus.PTS = time.Duration(avcc.Timestamp) * 90
if err := obus.ParseAVCC(reader); err != nil { if err := obus.ParseAVCC(reader); err != nil {
return obus, err return obus, err
} }
return obus, nil return obus, nil
} }
func (avcc *RTMPVideo) ToRaw(codecCtx ICodecCtx) (any, error) { func (avcc *RTMPVideo) Demux(codecCtx codec.ICodecCtx) (any, error) {
reader := avcc.NewReader() reader := avcc.NewReader()
b0, err := reader.ReadByte() b0, err := reader.ReadByte()
if err != nil { if err != nil {
@@ -217,24 +224,13 @@ func (avcc *RTMPVideo) ToRaw(codecCtx ICodecCtx) (any, error) {
var nalus Nalus var nalus Nalus
if codecCtx.FourCC() == codec.FourCC_H265 { if codecCtx.FourCC() == codec.FourCC_H265 {
var ctx = codecCtx.(*H265Ctx) var ctx = codecCtx.(*H265Ctx)
var spsM util.Memory nalus.Append(ctx.SPS[0])
spsM.Append(ctx.SPS[0]) nalus.Append(ctx.PPS[0])
var ppsM util.Memory nalus.Append(ctx.VPS[0])
ppsM.Append(ctx.PPS[0])
var vpsM util.Memory
vpsM.Append(ctx.VPS[0])
nalus.PTS = time.Duration(avcc.Timestamp) * 90
nalus.DTS = time.Duration(avcc.Timestamp) * 90
nalus.Nalus = append(nalus.Nalus, spsM, ppsM, vpsM)
} else { } else {
var ctx = codecCtx.(*H264Ctx) var ctx = codecCtx.(*H264Ctx)
var spsM util.Memory nalus.Append(ctx.SPS[0])
spsM.Append(ctx.SPS[0]) nalus.Append(ctx.PPS[0])
var ppsM util.Memory
ppsM.Append(ctx.PPS[0])
nalus.PTS = time.Duration(avcc.Timestamp) * 90
nalus.DTS = time.Duration(avcc.Timestamp) * 90
nalus.Nalus = append(nalus.Nalus, spsM, ppsM)
} }
return nalus, nil return nalus, nil
} else { } else {
@@ -247,33 +243,24 @@ func (avcc *RTMPVideo) ToRaw(codecCtx ICodecCtx) (any, error) {
} }
return nil, nil return nil, nil
} }
func createH26xFrame(from *AVFrame, codecID VideoCodecID) (frame IAVFrame, err error) {
var rtmpVideo RTMPVideo func (avcc *RTMPVideo) Mux(codecCtx codec.ICodecCtx, from *AVFrame) {
rtmpVideo.Timestamp = uint32(from.Timestamp / time.Millisecond) avcc.Timestamp = uint32(from.Timestamp / time.Millisecond)
rtmpVideo.ScalableMemoryAllocator = from.Wraps[0].GetScalableMemoryAllocator() switch ctx := codecCtx.(type) {
nalus := from.Raw.(Nalus) case *AV1Ctx:
rtmpVideo.RecycleIndexes = make([]int, 0, len(nalus.Nalus)) // Recycle partial data panic(ctx)
head := rtmpVideo.NextN(5) default:
head[0] = util.Conditoinal[byte](from.IDR, 0x10, 0x20) | byte(codecID) nalus := from.Raw.(Nalus)
head[1] = 1 avcc.RecycleIndexes = make([]int, 0, len(nalus)) // Recycle partial data
util.PutBE(head[2:5], (nalus.PTS-nalus.DTS)/90) // cts head := avcc.NextN(5)
for _, nalu := range nalus.Nalus { head[0] = util.Conditoinal[byte](from.IDR, 0x10, 0x20) | byte(ParseVideoCodec(codecCtx.FourCC()))
naluLenM := rtmpVideo.NextN(4) head[1] = 1
naluLen := uint32(nalu.Size) util.PutBE(head[2:5], from.CTS/time.Millisecond) // cts
binary.BigEndian.PutUint32(naluLenM, naluLen) for _, nalu := range nalus {
rtmpVideo.Append(nalu.Buffers...) naluLenM := avcc.NextN(4)
naluLen := uint32(nalu.Size)
binary.BigEndian.PutUint32(naluLenM, naluLen)
avcc.Append(nalu.Buffers...)
}
} }
frame = &rtmpVideo
return
}
func (h264 *H264Ctx) CreateFrame(from *AVFrame) (frame IAVFrame, err error) {
return createH26xFrame(from, ParseVideoCodec(h264.FourCC()))
}
func (h265 *H265Ctx) CreateFrame(from *AVFrame) (frame IAVFrame, err error) {
return createH26xFrame(from, ParseVideoCodec(h265.FourCC()))
}
func (av1 *AV1Ctx) CreateFrame(*AVFrame) (frame IAVFrame, err error) {
return
} }

View File

@@ -5,7 +5,10 @@ import (
"encoding/binary" "encoding/binary"
"encoding/hex" "encoding/hex"
"fmt" "fmt"
"github.com/bluenviron/mediacommon/pkg/bits"
"io" "io"
"regexp"
"strings"
"time" "time"
"unsafe" "unsafe"
@@ -16,6 +19,8 @@ import (
"m7s.live/m7s/v5/pkg/util" "m7s.live/m7s/v5/pkg/util"
) )
var configRegexp = regexp.MustCompile(`config=(.+),([^;]+)(;|$)`)
type RTPData struct { type RTPData struct {
*webrtc.RTPCodecParameters *webrtc.RTPCodecParameters
Packets []*rtp.Packet Packets []*rtp.Packet
@@ -48,6 +53,10 @@ func (r *RTPData) GetTimestamp() time.Duration {
return time.Duration(r.Packets[0].Timestamp) * time.Second / time.Duration(r.ClockRate) return time.Duration(r.Packets[0].Timestamp) * time.Second / time.Duration(r.ClockRate)
} }
func (r *RTPData) GetCTS() time.Duration {
return 0
}
func (r *RTPData) GetSize() (s int) { func (r *RTPData) GetSize() (s int) {
for _, p := range r.Packets { for _, p := range r.Packets {
s += p.MarshalSize() s += p.MarshalSize()
@@ -58,6 +67,7 @@ func (r *RTPData) GetSize() (s int) {
type ( type (
RTPCtx struct { RTPCtx struct {
webrtc.RTPCodecParameters webrtc.RTPCodecParameters
Fmtp map[string]string
SequenceNumber uint16 SequenceNumber uint16
SSRC uint32 SSRC uint32
} }
@@ -76,12 +86,29 @@ type (
RTPAACCtx struct { RTPAACCtx struct {
RTPCtx RTPCtx
codec.AACCtx codec.AACCtx
SizeLength int // 通常为13
IndexLength int
IndexDeltaLength int
} }
IRTPCtx interface { IRTPCtx interface {
GetRTPCodecParameter() webrtc.RTPCodecParameters GetRTPCodecParameter() webrtc.RTPCodecParameters
} }
) )
func (r *RTPCtx) parseFmtpLine(cp *webrtc.RTPCodecParameters) {
r.RTPCodecParameters = *cp
r.Fmtp = make(map[string]string)
kvs := strings.Split(r.SDPFmtpLine, ";")
for _, kv := range kvs {
if kv = strings.TrimSpace(kv); kv == "" {
continue
}
if key, value, found := strings.Cut(kv, "="); found {
r.Fmtp[strings.TrimSpace(key)] = strings.TrimSpace(value)
}
}
}
func (r *RTPCtx) GetInfo() string { func (r *RTPCtx) GetInfo() string {
return r.GetRTPCodecParameter().SDPFmtpLine return r.GetRTPCodecParameter().SDPFmtpLine
} }
@@ -94,11 +121,27 @@ func (r *RTPCtx) GetSequenceFrame() IAVFrame {
return nil return nil
} }
func (r *RTPData) DecodeConfig(t *AVTrack, from ICodecCtx) (err error) { func (r *RTPData) Append(ctx *RTPCtx, ts uint32, payload []byte) (lastPacket *rtp.Packet) {
switch c := from.(type) { ctx.SequenceNumber++
case codec.IH264Ctx: lastPacket = &rtp.Packet{
Header: rtp.Header{
Version: 2,
SequenceNumber: ctx.SequenceNumber,
Timestamp: ts,
SSRC: ctx.SSRC,
PayloadType: uint8(ctx.PayloadType),
},
Payload: payload,
}
r.Packets = append(r.Packets, lastPacket)
return
}
func (r *RTPData) ConvertCtx(from codec.ICodecCtx, t *AVTrack) (err error) {
switch from.FourCC() {
case codec.FourCC_H264:
var ctx RTPH264Ctx var ctx RTPH264Ctx
ctx.H264Ctx = *c.GetH264Ctx() ctx.H264Ctx = *from.GetBase().(*codec.H264Ctx)
ctx.PayloadType = 96 ctx.PayloadType = 96
ctx.MimeType = webrtc.MimeTypeH264 ctx.MimeType = webrtc.MimeTypeH264
ctx.ClockRate = 90000 ctx.ClockRate = 90000
@@ -106,24 +149,52 @@ func (r *RTPData) DecodeConfig(t *AVTrack, from ICodecCtx) (err error) {
ctx.SDPFmtpLine = fmt.Sprintf("sprop-parameter-sets=%s,%s;profile-level-id=%02x%02x%02x;level-asymmetry-allowed=1;packetization-mode=1", base64.StdEncoding.EncodeToString(ctx.SPS[0]), base64.StdEncoding.EncodeToString(ctx.PPS[0]), spsInfo.ProfileIdc, spsInfo.ConstraintSetFlag, spsInfo.LevelIdc) ctx.SDPFmtpLine = fmt.Sprintf("sprop-parameter-sets=%s,%s;profile-level-id=%02x%02x%02x;level-asymmetry-allowed=1;packetization-mode=1", base64.StdEncoding.EncodeToString(ctx.SPS[0]), base64.StdEncoding.EncodeToString(ctx.PPS[0]), spsInfo.ProfileIdc, spsInfo.ConstraintSetFlag, spsInfo.LevelIdc)
ctx.SSRC = uint32(uintptr(unsafe.Pointer(&ctx))) ctx.SSRC = uint32(uintptr(unsafe.Pointer(&ctx)))
t.ICodecCtx = &ctx t.ICodecCtx = &ctx
case codec.IH265Ctx: case codec.FourCC_H265:
var ctx RTPH265Ctx var ctx RTPH265Ctx
ctx.H265Ctx = *c.GetH265Ctx() ctx.H265Ctx = *from.GetBase().(*codec.H265Ctx)
ctx.PayloadType = 98 ctx.PayloadType = 98
ctx.MimeType = webrtc.MimeTypeH265 ctx.MimeType = webrtc.MimeTypeH265
ctx.SDPFmtpLine = fmt.Sprintf("profile-id=1;sprop-sps=%s;sprop-pps=%s;sprop-vps=%s", base64.StdEncoding.EncodeToString(ctx.SPS[0]), base64.StdEncoding.EncodeToString(ctx.PPS[0]), base64.StdEncoding.EncodeToString(ctx.VPS[0])) ctx.SDPFmtpLine = fmt.Sprintf("profile-id=1;sprop-sps=%s;sprop-pps=%s;sprop-vps=%s", base64.StdEncoding.EncodeToString(ctx.SPS[0]), base64.StdEncoding.EncodeToString(ctx.PPS[0]), base64.StdEncoding.EncodeToString(ctx.VPS[0]))
ctx.ClockRate = 90000 ctx.ClockRate = 90000
ctx.SSRC = uint32(uintptr(unsafe.Pointer(&ctx))) ctx.SSRC = uint32(uintptr(unsafe.Pointer(&ctx)))
t.ICodecCtx = &ctx t.ICodecCtx = &ctx
case codec.IAACCtx: case codec.FourCC_MP4A:
var ctx RTPAACCtx var ctx RTPAACCtx
ctx.SSRC = uint32(uintptr(unsafe.Pointer(&ctx))) ctx.SSRC = uint32(uintptr(unsafe.Pointer(&ctx)))
ctx.AACCtx = *c.GetAACCtx() ctx.AACCtx = *from.GetBase().(*codec.AACCtx)
ctx.MimeType = "audio/MPEG4-GENERIC" ctx.MimeType = "audio/MPEG4-GENERIC"
ctx.SDPFmtpLine = fmt.Sprintf("profile-level-id=1;mode=AAC-hbr;sizelength=13;indexlength=3;indexdeltalength=3; config=%s", hex.EncodeToString(ctx.AACCtx.Asc)) ctx.SDPFmtpLine = fmt.Sprintf("profile-level-id=1;mode=AAC-hbr;sizelength=13;indexlength=3;indexdeltalength=3;config=%s", hex.EncodeToString(ctx.AACCtx.Asc))
ctx.IndexLength = 3
ctx.IndexDeltaLength = 3
ctx.SizeLength = 13
ctx.RTPCtx.Channels = uint16(ctx.AACCtx.Channels)
ctx.PayloadType = 97 ctx.PayloadType = 97
ctx.ClockRate = uint32(ctx.SampleRate) ctx.ClockRate = uint32(ctx.SampleRate)
t.ICodecCtx = &ctx t.ICodecCtx = &ctx
case codec.FourCC_ALAW:
var ctx RTPPCMACtx
ctx.SSRC = uint32(uintptr(unsafe.Pointer(&ctx)))
ctx.PCMACtx = *from.GetBase().(*codec.PCMACtx)
ctx.MimeType = webrtc.MimeTypePCMA
ctx.PayloadType = 8
ctx.ClockRate = uint32(ctx.SampleRate)
t.ICodecCtx = &ctx
case codec.FourCC_ULAW:
var ctx RTPPCMUCtx
ctx.SSRC = uint32(uintptr(unsafe.Pointer(&ctx)))
ctx.PCMUCtx = *from.GetBase().(*codec.PCMUCtx)
ctx.MimeType = webrtc.MimeTypePCMU
ctx.PayloadType = 0
ctx.ClockRate = uint32(ctx.SampleRate)
t.ICodecCtx = &ctx
case codec.FourCC_OPUS:
var ctx RTPOPUSCtx
ctx.SSRC = uint32(uintptr(unsafe.Pointer(&ctx)))
ctx.OPUSCtx = *from.GetBase().(*codec.OPUSCtx)
ctx.MimeType = webrtc.MimeTypeOpus
ctx.PayloadType = 111
ctx.ClockRate = uint32(ctx.SampleRate)
t.ICodecCtx = &ctx
} }
return return
} }
@@ -132,32 +203,207 @@ type RTPAudio struct {
RTPData RTPData
} }
func (r *RTPAudio) Parse(t *AVTrack) (isIDR, isSeq bool, raw any, err error) { func (r *RTPAudio) Parse(t *AVTrack) (err error) {
switch r.MimeType { switch r.MimeType {
case webrtc.MimeTypeOpus: case webrtc.MimeTypeOpus:
var ctx RTPOPUSCtx var ctx RTPOPUSCtx
ctx.RTPCodecParameters = *r.RTPCodecParameters ctx.parseFmtpLine(r.RTPCodecParameters)
t.ICodecCtx = &ctx t.ICodecCtx = &ctx
case webrtc.MimeTypePCMA: case webrtc.MimeTypePCMA:
var ctx RTPPCMACtx var ctx RTPPCMACtx
ctx.RTPCodecParameters = *r.RTPCodecParameters ctx.parseFmtpLine(r.RTPCodecParameters)
t.ICodecCtx = &ctx t.ICodecCtx = &ctx
case webrtc.MimeTypePCMU: case webrtc.MimeTypePCMU:
var ctx RTPPCMUCtx var ctx RTPPCMUCtx
ctx.RTPCodecParameters = *r.RTPCodecParameters ctx.parseFmtpLine(r.RTPCodecParameters)
t.ICodecCtx = &ctx t.ICodecCtx = &ctx
case "audio/MPEG4-GENERIC": case "audio/MPEG4-GENERIC":
var ctx RTPAACCtx var ctx *RTPAACCtx
ctx.RTPCodecParameters = *r.RTPCodecParameters if t.ICodecCtx != nil {
t.ICodecCtx = &ctx ctx = t.ICodecCtx.(*RTPAACCtx)
} else {
ctx = &RTPAACCtx{}
ctx.parseFmtpLine(r.RTPCodecParameters)
ctx.IndexLength = 3
ctx.IndexDeltaLength = 3
ctx.SizeLength = 13
if conf, ok := ctx.Fmtp["config"]; ok {
if ctx.AACCtx.Asc, err = hex.DecodeString(conf); err == nil {
ctx.SampleRate = int(r.ClockRate)
ctx.Channels = int(r.Channels)
ctx.SampleSize = 16
}
}
t.ICodecCtx = ctx
}
} }
return return
} }
func (ctx *RTPCtx) CreateFrame(*AVFrame) (IAVFrame, error) { func (r *RTPAudio) Demux(codexCtx codec.ICodecCtx) (any, error) {
panic("unimplemented") var data AudioData
switch codexCtx.(type) {
case *RTPAACCtx:
var fragments util.Memory
for _, packet := range r.Packets {
if len(packet.Payload) < 2 {
continue
}
auHeaderLen := util.ReadBE[int](packet.Payload[:2])
if auHeaderLen == 0 {
data.AppendOne(packet.Payload)
} else {
dataLens, err := r.readAUHeaders(codexCtx.(*RTPAACCtx), packet.Payload[2:], auHeaderLen)
if err != nil {
return nil, err
}
payload := packet.Payload[2:]
pos := auHeaderLen >> 3
if (auHeaderLen % 8) != 0 {
pos++
}
payload = payload[pos:]
if fragments.Size == 0 {
if packet.Marker {
for _, dataLen := range dataLens {
if len(payload) < int(dataLen) {
return nil, fmt.Errorf("invalid data len %d", dataLen)
}
data.AppendOne(payload[:dataLen])
payload = payload[dataLen:]
}
} else {
if len(dataLens) != 1 {
return nil, fmt.Errorf("a fragmented packet can only contain one AU")
}
fragments.AppendOne(payload)
}
} else {
if len(dataLens) != 1 {
return nil, fmt.Errorf("a fragmented packet can only contain one AU")
}
fragments.AppendOne(payload)
if !packet.Header.Marker {
continue
}
if uint64(fragments.Size) != dataLens[0] {
return nil, fmt.Errorf("fragmented AU size is not correct %d != %d", dataLens[0], fragments.Size)
}
data.Append(fragments.Buffers...)
fragments = util.Memory{}
}
}
break
}
default:
for _, packet := range r.Packets {
data.AppendOne(packet.Payload)
}
}
return data, nil
} }
func (r *RTPAudio) ToRaw(ICodecCtx) (any, error) { func (r *RTPAudio) Mux(codexCtx codec.ICodecCtx, from *AVFrame) {
return nil, nil data := from.Raw.(AudioData)
var ctx *RTPCtx
var lastPacket *rtp.Packet
switch c := codexCtx.(type) {
case *RTPAACCtx:
ctx = &c.RTPCtx
pts := uint32(from.Timestamp * time.Duration(ctx.ClockRate) / time.Second)
//AU_HEADER_LENGTH,因为单位是bit, 除以8就是auHeader的字节长度又因为单个auheader字节长度2字节所以再除以2就是auheader的个数。
auHeaderLen := []byte{0x00, 0x10, (byte)((r.Size & 0x1fe0) >> 5), (byte)((r.Size & 0x1f) << 3)} // 3 = 16-13, 5 = 8-3
for reader := data.NewReader(); reader.Length > 0; {
payloadLen := MTUSize
if reader.Length+4 < MTUSize {
payloadLen = reader.Length + 4
}
mem := r.NextN(payloadLen)
copy(mem, auHeaderLen)
reader.ReadBytesTo(mem[4:])
lastPacket = r.Append(ctx, pts, mem)
}
lastPacket.Header.Marker = true
return
case *RTPPCMACtx:
ctx = &c.RTPCtx
case *RTPPCMUCtx:
ctx = &c.RTPCtx
}
pts := uint32(from.Timestamp * time.Duration(ctx.ClockRate) / time.Second)
if reader := data.NewReader(); reader.Length > MTUSize {
for reader.Length > 0 {
payloadLen := MTUSize
if reader.Length < MTUSize {
payloadLen = reader.Length
}
mem := r.NextN(payloadLen)
reader.ReadBytesTo(mem)
lastPacket = r.Append(ctx, pts, mem)
}
} else {
mem := r.NextN(reader.Length)
reader.ReadBytesTo(mem)
lastPacket = r.Append(ctx, pts, mem)
}
lastPacket.Header.Marker = true
}
func (r *RTPAudio) readAUHeaders(ctx *RTPAACCtx, buf []byte, headersLen int) ([]uint64, error) {
firstRead := false
count := 0
for i := 0; i < headersLen; {
if i == 0 {
i += ctx.SizeLength
i += ctx.IndexLength
} else {
i += ctx.SizeLength
i += ctx.IndexDeltaLength
}
count++
}
dataLens := make([]uint64, count)
pos := 0
i := 0
for headersLen > 0 {
dataLen, err := bits.ReadBits(buf, &pos, ctx.SizeLength)
if err != nil {
return nil, err
}
headersLen -= ctx.SizeLength
if !firstRead {
firstRead = true
if ctx.IndexLength > 0 {
auIndex, err := bits.ReadBits(buf, &pos, ctx.IndexLength)
if err != nil {
return nil, err
}
headersLen -= ctx.IndexLength
if auIndex != 0 {
return nil, fmt.Errorf("AU-index different than zero is not supported")
}
}
} else if ctx.IndexDeltaLength > 0 {
auIndexDelta, err := bits.ReadBits(buf, &pos, ctx.IndexDeltaLength)
if err != nil {
return nil, err
}
headersLen -= ctx.IndexDeltaLength
if auIndexDelta != 0 {
return nil, fmt.Errorf("AU-index-delta different than zero is not supported")
}
}
dataLens[i] = dataLen
i++
}
return dataLens, nil
} }

View File

@@ -4,7 +4,7 @@ import (
"encoding/base64" "encoding/base64"
"errors" "errors"
"fmt" "fmt"
"regexp" "strings"
"time" "time"
"github.com/pion/rtp" "github.com/pion/rtp"
@@ -36,11 +36,10 @@ type (
) )
var ( var (
_ IAVFrame = (*RTPVideo)(nil) _ IAVFrame = (*RTPVideo)(nil)
_ IVideoCodecCtx = (*RTPH264Ctx)(nil) _ IVideoCodecCtx = (*RTPH264Ctx)(nil)
_ IVideoCodecCtx = (*RTPH265Ctx)(nil) _ IVideoCodecCtx = (*RTPH265Ctx)(nil)
_ IVideoCodecCtx = (*RTPAV1Ctx)(nil) _ IVideoCodecCtx = (*RTPAV1Ctx)(nil)
spropReg = regexp.MustCompile(`sprop-parameter-sets=(.+),([^;]+)(;|$)`)
) )
const ( const (
@@ -49,7 +48,7 @@ const (
MTUSize = 1460 MTUSize = 1460
) )
func (r *RTPVideo) Parse(t *AVTrack) (isIDR, isSeq bool, raw any, err error) { func (r *RTPVideo) Parse(t *AVTrack) (err error) {
switch r.MimeType { switch r.MimeType {
case webrtc.MimeTypeH264: case webrtc.MimeTypeH264:
var ctx *RTPH264Ctx var ctx *RTPH264Ctx
@@ -57,47 +56,42 @@ func (r *RTPVideo) Parse(t *AVTrack) (isIDR, isSeq bool, raw any, err error) {
ctx = t.ICodecCtx.(*RTPH264Ctx) ctx = t.ICodecCtx.(*RTPH264Ctx)
} else { } else {
ctx = &RTPH264Ctx{} ctx = &RTPH264Ctx{}
ctx.parseFmtpLine(r.RTPCodecParameters)
//packetization-mode=1; sprop-parameter-sets=J2QAKaxWgHgCJ+WagICAgQ==,KO48sA==; profile-level-id=640029 //packetization-mode=1; sprop-parameter-sets=J2QAKaxWgHgCJ+WagICAgQ==,KO48sA==; profile-level-id=640029
ctx.RTPCodecParameters = *r.RTPCodecParameters if sprop, ok := ctx.Fmtp["sprop-parameter-sets"]; ok {
if match := spropReg.FindStringSubmatch(ctx.SDPFmtpLine); len(match) > 2 { if sprops := strings.Split(sprop, ","); len(sprops) == 2 {
if sps, err := base64.StdEncoding.DecodeString(match[1]); err == nil { if sps, err := base64.StdEncoding.DecodeString(sprops[0]); err == nil {
ctx.SPS = [][]byte{sps} ctx.SPS = [][]byte{sps}
} }
if pps, err := base64.StdEncoding.DecodeString(match[2]); err == nil { if pps, err := base64.StdEncoding.DecodeString(sprops[1]); err == nil {
ctx.PPS = [][]byte{pps} ctx.PPS = [][]byte{pps}
}
} }
} }
t.ICodecCtx = ctx t.ICodecCtx = ctx
} }
raw, err = r.ToRaw(ctx) if t.Value.Raw, err = r.Demux(ctx); err != nil {
if err != nil {
return return
} }
nalus := raw.(Nalus) for _, nalu := range t.Value.Raw.(Nalus) {
for _, nalu := range nalus.Nalus {
switch codec.ParseH264NALUType(nalu.Buffers[0][0]) { switch codec.ParseH264NALUType(nalu.Buffers[0][0]) {
case codec.NALU_SPS: case codec.NALU_SPS:
ctx = &RTPH264Ctx{}
ctx.SPS = [][]byte{nalu.ToBytes()} ctx.SPS = [][]byte{nalu.ToBytes()}
if err = ctx.SPSInfo.Unmarshal(ctx.SPS[0]); err != nil { if err = ctx.SPSInfo.Unmarshal(ctx.SPS[0]); err != nil {
return return
} }
ctx.RTPCodecParameters = *r.RTPCodecParameters
t.ICodecCtx = ctx
case codec.NALU_PPS: case codec.NALU_PPS:
ctx.PPS = [][]byte{nalu.ToBytes()} ctx.PPS = [][]byte{nalu.ToBytes()}
case codec.NALU_IDR_Picture: case codec.NALU_IDR_Picture:
isIDR = true t.Value.IDR = true
} }
} }
case webrtc.MimeTypeVP9: case webrtc.MimeTypeVP9:
// var ctx RTPVP9Ctx // var ctx RTPVP9Ctx
// ctx.FourCC = codec.FourCC_VP9
// ctx.RTPCodecParameters = *r.RTPCodecParameters // ctx.RTPCodecParameters = *r.RTPCodecParameters
// codecCtx = &ctx // codecCtx = &ctx
case webrtc.MimeTypeAV1: case webrtc.MimeTypeAV1:
// var ctx RTPAV1Ctx // var ctx RTPAV1Ctx
// ctx.FourCC = codec.FourCC_AV1
// ctx.RTPCodecParameters = *r.RTPCodecParameters // ctx.RTPCodecParameters = *r.RTPCodecParameters
// codecCtx = &ctx // codecCtx = &ctx
case webrtc.MimeTypeH265: case webrtc.MimeTypeH265:
@@ -106,15 +100,13 @@ func (r *RTPVideo) Parse(t *AVTrack) (isIDR, isSeq bool, raw any, err error) {
ctx = t.ICodecCtx.(*RTPH265Ctx) ctx = t.ICodecCtx.(*RTPH265Ctx)
} else { } else {
ctx = &RTPH265Ctx{} ctx = &RTPH265Ctx{}
ctx.RTPCodecParameters = *r.RTPCodecParameters ctx.parseFmtpLine(r.RTPCodecParameters)
t.ICodecCtx = ctx t.ICodecCtx = ctx
} }
raw, err = r.ToRaw(ctx) if t.Value.Raw, err = r.Demux(ctx); err != nil {
if err != nil {
return return
} }
nalus := raw.(Nalus) for _, nalu := range t.Value.Raw.(Nalus) {
for _, nalu := range nalus.Nalus {
switch codec.ParseH265NALUType(nalu.Buffers[0][0]) { switch codec.ParseH265NALUType(nalu.Buffers[0][0]) {
case codec.NAL_UNIT_SPS: case codec.NAL_UNIT_SPS:
ctx = &RTPH265Ctx{} ctx = &RTPH265Ctx{}
@@ -132,13 +124,9 @@ func (r *RTPVideo) Parse(t *AVTrack) (isIDR, isSeq bool, raw any, err error) {
codec.NAL_UNIT_CODED_SLICE_IDR, codec.NAL_UNIT_CODED_SLICE_IDR,
codec.NAL_UNIT_CODED_SLICE_IDR_N_LP, codec.NAL_UNIT_CODED_SLICE_IDR_N_LP,
codec.NAL_UNIT_CODED_SLICE_CRA: codec.NAL_UNIT_CODED_SLICE_CRA:
isIDR = true t.Value.IDR = true
} }
} }
case "audio/MPEG4-GENERIC", "audio/AAC":
var ctx RTPAACCtx
ctx.RTPCodecParameters = *r.RTPCodecParameters
t.ICodecCtx = &ctx
default: default:
err = ErrUnsupportCodec err = ErrUnsupportCodec
} }
@@ -153,65 +141,57 @@ func (h265 *RTPH265Ctx) GetInfo() string {
return h265.SDPFmtpLine return h265.SDPFmtpLine
} }
func (h264 *RTPH264Ctx) CreateFrame(from *AVFrame) (frame IAVFrame, err error) { func (av1 *RTPAV1Ctx) GetInfo() string {
var r RTPVideo return av1.SDPFmtpLine
r.RTPCodecParameters = &h264.RTPCodecParameters
if len(from.Wraps) > 0 {
r.ScalableMemoryAllocator = from.Wraps[0].GetScalableMemoryAllocator()
}
nalus := from.Raw.(Nalus)
var lastPacket *rtp.Packet
createPacket := func(payload []byte) *rtp.Packet {
h264.SequenceNumber++
lastPacket = &rtp.Packet{
Header: rtp.Header{
Version: 2,
SequenceNumber: h264.SequenceNumber,
Timestamp: uint32(nalus.PTS),
SSRC: h264.SSRC,
PayloadType: uint8(h264.PayloadType),
},
Payload: payload,
}
return lastPacket
}
if nalus.H264Type() == codec.NALU_IDR_Picture && len(h264.SPS) > 0 && len(h264.PPS) > 0 {
r.Packets = append(r.Packets, createPacket(h264.SPS[0]), createPacket(h264.PPS[0]))
}
for _, nalu := range nalus.Nalus {
if reader := nalu.NewReader(); reader.Length > MTUSize {
//fu-a
mem := r.Malloc(MTUSize)
n := reader.ReadBytesTo(mem[1:])
fuaHead := codec.NALU_FUA.Or(mem[1] & 0x60)
mem[0] = fuaHead
naluType := mem[1] & 0x1f
mem[1] = naluType | startBit
r.FreeRest(&mem, n+1)
r.AddRecycleBytes(mem)
r.Packets = append(r.Packets, createPacket(mem))
for reader.Length > 0 {
mem = r.Malloc(MTUSize)
n = reader.ReadBytesTo(mem[2:])
mem[0] = fuaHead
mem[1] = naluType
r.FreeRest(&mem, n+2)
r.AddRecycleBytes(mem)
r.Packets = append(r.Packets, createPacket(mem))
}
lastPacket.Payload[1] |= endBit
} else {
mem := r.NextN(reader.Length)
reader.ReadBytesTo(mem)
r.Packets = append(r.Packets, createPacket(mem))
}
}
frame = &r
lastPacket.Header.Marker = true
return
} }
func (r *RTPVideo) ToRaw(ictx ICodecCtx) (any, error) { func (r *RTPVideo) GetCTS() time.Duration {
return 0
}
func (r *RTPVideo) Mux(codecCtx codec.ICodecCtx, from *AVFrame) {
pts := uint32((from.Timestamp + from.CTS) * 90 / time.Millisecond)
switch c := codecCtx.(type) {
case *RTPH264Ctx:
ctx := &c.RTPCtx
r.RTPCodecParameters = &ctx.RTPCodecParameters
var lastPacket *rtp.Packet
if from.IDR && len(c.SPS) > 0 && len(c.PPS) > 0 {
r.Append(ctx, pts, c.SPS[0])
r.Append(ctx, pts, c.PPS[0])
}
for _, nalu := range from.Raw.(Nalus) {
if reader := nalu.NewReader(); reader.Length > MTUSize {
payloadLen := MTUSize
if reader.Length+1 < payloadLen {
payloadLen = reader.Length + 1
}
//fu-a
mem := r.NextN(payloadLen)
reader.ReadBytesTo(mem[1:])
fuaHead, naluType := codec.NALU_FUA.Or(mem[1]&0x60), mem[1]&0x1f
mem[0], mem[1] = fuaHead, naluType|startBit
lastPacket = r.Append(ctx, pts, mem)
for payloadLen = MTUSize; reader.Length > 0; lastPacket = r.Append(ctx, pts, mem) {
if reader.Length+2 < payloadLen {
payloadLen = reader.Length + 2
}
mem = r.NextN(payloadLen)
reader.ReadBytesTo(mem[2:])
mem[0], mem[1] = fuaHead, naluType
}
lastPacket.Payload[1] |= endBit
} else {
mem := r.NextN(reader.Length)
reader.ReadBytesTo(mem)
lastPacket = r.Append(ctx, pts, mem)
}
}
lastPacket.Header.Marker = true
}
}
func (r *RTPVideo) Demux(ictx codec.ICodecCtx) (any, error) {
switch ictx.(type) { switch ictx.(type) {
case *RTPH264Ctx: case *RTPH264Ctx:
var nalus Nalus var nalus Nalus
@@ -219,17 +199,14 @@ func (r *RTPVideo) ToRaw(ictx ICodecCtx) (any, error) {
var naluType codec.H264NALUType var naluType codec.H264NALUType
gotNalu := func() { gotNalu := func() {
if nalu.Size > 0 { if nalu.Size > 0 {
nalus.Nalus = append(nalus.Nalus, nalu) nalus = append(nalus, nalu)
nalu = util.Memory{} nalu = util.Memory{}
} }
} }
for _, packet := range r.Packets { for _, packet := range r.Packets {
nalus.PTS = time.Duration(packet.Timestamp)
// TODO: B-frame
nalus.DTS = nalus.PTS
b0 := packet.Payload[0] b0 := packet.Payload[0]
if t := codec.ParseH264NALUType(b0); t < 24 { if t := codec.ParseH264NALUType(b0); t < 24 {
nalu.Append(packet.Payload) nalu.AppendOne(packet.Payload)
gotNalu() gotNalu()
} else { } else {
offset := t.Offset() offset := t.Offset()
@@ -240,7 +217,7 @@ func (r *RTPVideo) ToRaw(ictx ICodecCtx) (any, error) {
} }
for buffer := util.Buffer(packet.Payload[offset:]); buffer.CanRead(); { for buffer := util.Buffer(packet.Payload[offset:]); buffer.CanRead(); {
if nextSize := int(buffer.ReadUint16()); buffer.Len() >= nextSize { if nextSize := int(buffer.ReadUint16()); buffer.Len() >= nextSize {
nalu.Append(buffer.ReadN(nextSize)) nalu.AppendOne(buffer.ReadN(nextSize))
gotNalu() gotNalu()
} else { } else {
return nil, fmt.Errorf("invalid nalu size %d", nextSize) return nil, fmt.Errorf("invalid nalu size %d", nextSize)
@@ -250,10 +227,10 @@ func (r *RTPVideo) ToRaw(ictx ICodecCtx) (any, error) {
b1 := packet.Payload[1] b1 := packet.Payload[1]
if util.Bit1(b1, 0) { if util.Bit1(b1, 0) {
naluType.Parse(b1) naluType.Parse(b1)
nalu.Append([]byte{naluType.Or(b0 & 0x60)}) nalu.AppendOne([]byte{naluType.Or(b0 & 0x60)})
} }
if nalu.Size > 0 { if nalu.Size > 0 {
nalu.Append(packet.Payload[offset:]) nalu.AppendOne(packet.Payload[offset:])
} else { } else {
return nil, errors.New("fu have no start") return nil, errors.New("fu have no start")
} }

View File

@@ -24,22 +24,16 @@ func TestRTPH264Ctx_CreateFrame(t *testing.T) {
var avFrame = &pkg.AVFrame{} var avFrame = &pkg.AVFrame{}
var mem util.Memory var mem util.Memory
mem.Append([]byte(randStr)) mem.Append([]byte(randStr))
avFrame.Raw = pkg.Nalus{ avFrame.Raw = []util.Memory{mem}
Nalus: []util.Memory{mem}, frame := new(RTPVideo)
} frame.Mux(ctx, avFrame)
f, err := ctx.CreateFrame(avFrame)
if err != nil {
t.Error(err)
return
}
frame := f.(*RTPVideo)
var track = &pkg.AVTrack{} var track = &pkg.AVTrack{}
_, _, raw, err := frame.Parse(track) err := frame.Parse(track)
if err != nil { if err != nil {
t.Error(err) t.Error(err)
return return
} }
if s := string(raw.(pkg.Nalus).Nalus[0].ToBytes()); s != randStr { if s := string(track.Value.Raw.(pkg.Nalus)[0].ToBytes()); s != randStr {
t.Error("not equal", len(s), len(randStr)) t.Error("not equal", len(s), len(randStr))
} }
} }

View File

@@ -349,7 +349,7 @@ func (conf *WebRTCPlugin) Play_(w http.ResponseWriter, r *http.Request) {
} else { } else {
var rtpCtx mrtp.RTPData var rtpCtx mrtp.RTPData
var tmpAVTrack AVTrack var tmpAVTrack AVTrack
err = rtpCtx.DecodeConfig(&tmpAVTrack, vt.ICodecCtx) err = rtpCtx.ConvertCtx(vt.ICodecCtx, &tmpAVTrack)
if err == nil { if err == nil {
rcc = tmpAVTrack.ICodecCtx.(mrtp.IRTPCtx).GetRTPCodecParameter() rcc = tmpAVTrack.ICodecCtx.(mrtp.IRTPCtx).GetRTPCodecParameter()
} else { } else {

View File

@@ -192,6 +192,7 @@ func (p *Publisher) writeAV(t *AVTrack, data IAVFrame) {
frame := &t.Value frame := &t.Value
frame.Wraps = append(frame.Wraps, data) frame.Wraps = append(frame.Wraps, data)
ts := data.GetTimestamp() ts := data.GetTimestamp()
frame.CTS = data.GetCTS()
if p.lastTs == 0 { if p.lastTs == 0 {
p.baseTs -= ts p.baseTs -= ts
} }
@@ -232,7 +233,7 @@ func (p *Publisher) WriteVideo(data IAVFrame) (err error) {
p.Unlock() p.Unlock()
} }
oldCodecCtx := t.ICodecCtx oldCodecCtx := t.ICodecCtx
t.Value.IDR, _, t.Value.Raw, err = data.Parse(t) err = data.Parse(t)
codecCtxChanged := oldCodecCtx != t.ICodecCtx codecCtxChanged := oldCodecCtx != t.ICodecCtx
if err != nil { if err != nil {
p.Error("parse", "err", err) p.Error("parse", "err", err)
@@ -261,16 +262,16 @@ func (p *Publisher) WriteVideo(data IAVFrame) (err error) {
p.writeAV(t, data) p.writeAV(t, data)
if p.VideoTrack.Length > 1 && p.VideoTrack.IsReady() { if p.VideoTrack.Length > 1 && p.VideoTrack.IsReady() {
if t.Value.Raw == nil { if t.Value.Raw == nil {
t.Value.Raw, err = t.Value.Wraps[0].ToRaw(t.ICodecCtx) if err = t.Value.Demux(t.ICodecCtx); err != nil {
if err != nil {
t.Error("to raw", "err", err) t.Error("to raw", "err", err)
return err return err
} }
} }
var toFrame IAVFrame
for i, track := range p.VideoTrack.Items[1:] { for i, track := range p.VideoTrack.Items[1:] {
toType := track.FrameType.Elem()
toFrame := reflect.New(toType).Interface().(IAVFrame)
if track.ICodecCtx == nil { if track.ICodecCtx == nil {
err = (reflect.New(track.FrameType.Elem()).Interface().(IAVFrame)).DecodeConfig(track, t.ICodecCtx) err = toFrame.ConvertCtx(t.ICodecCtx, track)
if err != nil { if err != nil {
track.Error("DecodeConfig", "err", err) track.Error("DecodeConfig", "err", err)
return return
@@ -278,26 +279,22 @@ func (p *Publisher) WriteVideo(data IAVFrame) (err error) {
if t.IDRingList.Len() > 0 { if t.IDRingList.Len() > 0 {
for rf := t.IDRingList.Front().Value; rf != t.Ring; rf = rf.Next() { for rf := t.IDRingList.Front().Value; rf != t.Ring; rf = rf.Next() {
if i == 0 && rf.Value.Raw == nil { if i == 0 && rf.Value.Raw == nil {
rf.Value.Raw, err = rf.Value.Wraps[0].ToRaw(t.ICodecCtx) if err = rf.Value.Demux(t.ICodecCtx); err != nil {
if err != nil {
t.Error("to raw", "err", err) t.Error("to raw", "err", err)
return err return err
} }
} }
if toFrame, err = track.CreateFrame(&rf.Value); err != nil { toFrame := reflect.New(toType).Interface().(IAVFrame)
track.Error("from raw", "err", err) toFrame.SetAllocator(data.GetAllocator())
return toFrame.Mux(track.ICodecCtx, &rf.Value)
}
rf.Value.Wraps = append(rf.Value.Wraps, toFrame) rf.Value.Wraps = append(rf.Value.Wraps, toFrame)
} }
} }
} }
if toFrame, err = track.CreateFrame(&t.Value); err != nil { toFrame.SetAllocator(data.GetAllocator())
track.Error("from raw", "err", err) toFrame.Mux(track.ICodecCtx, &t.Value)
return
}
if codecCtxChanged { if codecCtxChanged {
toFrame.DecodeConfig(track, t.ICodecCtx) err = toFrame.ConvertCtx(t.ICodecCtx, track)
} }
t.Value.Wraps = append(t.Value.Wraps, toFrame) t.Value.Wraps = append(t.Value.Wraps, toFrame)
if track.ICodecCtx != nil { if track.ICodecCtx != nil {
@@ -336,7 +333,7 @@ func (p *Publisher) WriteAudio(data IAVFrame) (err error) {
p.Unlock() p.Unlock()
} }
oldCodecCtx := t.ICodecCtx oldCodecCtx := t.ICodecCtx
_, _, t.Value.Raw, err = data.Parse(t) err = data.Parse(t)
codecCtxChanged := oldCodecCtx != t.ICodecCtx codecCtxChanged := oldCodecCtx != t.ICodecCtx
if t.ICodecCtx == nil { if t.ICodecCtx == nil {
return ErrUnsupportCodec return ErrUnsupportCodec
@@ -345,16 +342,16 @@ func (p *Publisher) WriteAudio(data IAVFrame) (err error) {
p.writeAV(t, data) p.writeAV(t, data)
if p.AudioTrack.Length > 1 && p.AudioTrack.IsReady() { if p.AudioTrack.Length > 1 && p.AudioTrack.IsReady() {
if t.Value.Raw == nil { if t.Value.Raw == nil {
t.Value.Raw, err = t.Value.Wraps[0].ToRaw(t.ICodecCtx) if err = t.Value.Demux(t.ICodecCtx); err != nil {
if err != nil {
t.Error("to raw", "err", err) t.Error("to raw", "err", err)
return err return err
} }
} }
var toFrame IAVFrame
for i, track := range p.AudioTrack.Items[1:] { for i, track := range p.AudioTrack.Items[1:] {
toType := track.FrameType.Elem()
toFrame := reflect.New(toType).Interface().(IAVFrame)
if track.ICodecCtx == nil { if track.ICodecCtx == nil {
err = (reflect.New(track.FrameType.Elem()).Interface().(IAVFrame)).DecodeConfig(track, t.ICodecCtx) err = toFrame.ConvertCtx(t.ICodecCtx, track)
if err != nil { if err != nil {
track.Error("DecodeConfig", "err", err) track.Error("DecodeConfig", "err", err)
return return
@@ -362,26 +359,22 @@ func (p *Publisher) WriteAudio(data IAVFrame) (err error) {
if idr := p.AudioTrack.GetOldestIDR(); idr != nil { if idr := p.AudioTrack.GetOldestIDR(); idr != nil {
for rf := idr; rf != t.Ring; rf = rf.Next() { for rf := idr; rf != t.Ring; rf = rf.Next() {
if i == 0 && rf.Value.Raw == nil { if i == 0 && rf.Value.Raw == nil {
rf.Value.Raw, err = rf.Value.Wraps[0].ToRaw(t.ICodecCtx) if err = rf.Value.Demux(t.ICodecCtx); err != nil {
if err != nil {
t.Error("to raw", "err", err) t.Error("to raw", "err", err)
return err return err
} }
} }
if toFrame, err = track.CreateFrame(&rf.Value); err != nil { toFrame := reflect.New(toType).Interface().(IAVFrame)
track.Error("from raw", "err", err) toFrame.SetAllocator(data.GetAllocator())
return toFrame.Mux(track.ICodecCtx, &rf.Value)
}
rf.Value.Wraps = append(rf.Value.Wraps, toFrame) rf.Value.Wraps = append(rf.Value.Wraps, toFrame)
} }
} }
} }
if toFrame, err = track.CreateFrame(&t.Value); err != nil { toFrame.SetAllocator(data.GetAllocator())
track.Error("from raw", "err", err) toFrame.Mux(track.ICodecCtx, &t.Value)
return
}
if codecCtxChanged { if codecCtxChanged {
toFrame.DecodeConfig(track, t.ICodecCtx) err = toFrame.ConvertCtx(t.ICodecCtx, track)
} }
t.Value.Wraps = append(t.Value.Wraps, toFrame) t.Value.Wraps = append(t.Value.Wraps, toFrame)
if track.ICodecCtx != nil { if track.ICodecCtx != nil {

View File

@@ -21,7 +21,6 @@ import (
"gopkg.in/yaml.v3" "gopkg.in/yaml.v3"
"m7s.live/m7s/v5/pb" "m7s.live/m7s/v5/pb"
. "m7s.live/m7s/v5/pkg" . "m7s.live/m7s/v5/pkg"
"m7s.live/m7s/v5/pkg/config"
"m7s.live/m7s/v5/pkg/util" "m7s.live/m7s/v5/pkg/util"
) )
@@ -40,10 +39,18 @@ var (
defaultLogHandler = console.NewHandler(os.Stdout, &console.HandlerOptions{TimeFormat: "15:04:05.000000"}) defaultLogHandler = console.NewHandler(os.Stdout, &console.HandlerOptions{TimeFormat: "15:04:05.000000"})
) )
type ServerConfig struct {
EnableSubEvent bool `default:"true" desc:"启用订阅事件,禁用可以提高性能"` //启用订阅事件,禁用可以提高性能
SettingDir string `default:".m7s" desc:""`
EventBusSize int `default:"10" desc:"事件总线大小"` //事件总线大小
PulseInterval time.Duration `default:"5s" desc:"心跳事件间隔"` //心跳事件间隔
DisableAll bool `default:"false" desc:"禁用所有插件"` //禁用所有插件
}
type Server struct { type Server struct {
pb.UnimplementedGlobalServer pb.UnimplementedGlobalServer
Plugin Plugin
config.Engine ServerConfig
ID int ID int
eventChan chan any eventChan chan any
Plugins util.Collection[string, *Plugin] Plugins util.Collection[string, *Plugin]
@@ -146,7 +153,7 @@ func (s *Server) run(ctx context.Context, conf any) (err error) {
} }
} }
s.Config.Parse(&s.config, "GLOBAL") s.Config.Parse(&s.config, "GLOBAL")
s.Config.Parse(&s.Engine, "GLOBAL") s.Config.Parse(&s.ServerConfig, "GLOBAL")
if cg != nil { if cg != nil {
s.Config.ParseUserFile(cg["global"]) s.Config.ParseUserFile(cg["global"])
} }

View File

@@ -17,6 +17,8 @@ import (
"m7s.live/m7s/v5/pkg/util" "m7s.live/m7s/v5/pkg/util"
) )
var AVFrameType = reflect.TypeOf((*AVFrame)(nil))
type Owner struct { type Owner struct {
Conn net.Conn Conn net.Conn
File *os.File File *os.File
@@ -89,11 +91,62 @@ type Subscriber struct {
VideoReader *AVRingReader VideoReader *AVRingReader
} }
func (s *Subscriber) createAudioReader(dataType reflect.Type, startAudioTs time.Duration) (awi int) {
if s.Publisher == nil || dataType == nil {
return
}
var at *AVTrack
if dataType == AVFrameType {
at = s.Publisher.AudioTrack.AVTrack
awi = -1
} else {
at = s.Publisher.GetAudioTrack(dataType)
if at != nil {
awi = at.WrapIndex
}
}
if at != nil {
if err := at.WaitReady(); err != nil {
return
}
ar := NewAVRingReader(at)
s.AudioReader = ar
ar.StartTs = startAudioTs
ar.Logger = s.Logger.With("reader", dataType.String())
ar.Info("start read")
}
return
}
func (s *Subscriber) createVideoReader(dataType reflect.Type, startVideoTs time.Duration) (vwi int) {
if s.Publisher == nil || dataType == nil {
return
}
var vt *AVTrack
if dataType == AVFrameType {
vt = s.Publisher.VideoTrack.AVTrack
vwi = -1
} else {
vt = s.Publisher.GetVideoTrack(dataType)
if vt != nil {
vwi = vt.WrapIndex
}
}
if vt != nil {
if err := vt.WaitReady(); err != nil {
return
}
vr := NewAVRingReader(vt)
vr.StartTs = startVideoTs
s.VideoReader = vr
vr.Logger = s.Logger.With("reader", dataType.String())
vr.Info("start read")
}
return
}
func PlayBlock[A any, V any](s *Subscriber, onAudio func(A) error, onVideo func(V) error) (err error) { func PlayBlock[A any, V any](s *Subscriber, onAudio func(A) error, onVideo func(V) error) (err error) {
var ar, vr *AVRingReader
var a1, v1 reflect.Type var a1, v1 reflect.Type
var at, vt *AVTrack
var awi, vwi int
var startAudioTs, startVideoTs time.Duration var startAudioTs, startVideoTs time.Duration
var initState = 0 var initState = 0
prePublisher := s.Publisher prePublisher := s.Publisher
@@ -104,62 +157,14 @@ func PlayBlock[A any, V any](s *Subscriber, onAudio func(A) error, onVideo func(
if s.SubVideo { if s.SubVideo {
v1 = reflect.TypeOf(onVideo).In(0) v1 = reflect.TypeOf(onVideo).In(0)
} }
createAudioReader := func() { awi := s.createAudioReader(a1, startAudioTs)
if s.Publisher == nil || a1 == nil { vwi := s.createVideoReader(v1, startVideoTs)
return
}
if a1 == reflect.TypeOf(audioFrame) {
at = s.Publisher.AudioTrack.AVTrack
awi = -1
} else {
at = s.Publisher.GetAudioTrack(a1)
if at != nil {
awi = at.WrapIndex
}
}
if at != nil {
if err := at.WaitReady(); err != nil {
return
}
ar = NewAVRingReader(at)
s.AudioReader = ar
ar.StartTs = startAudioTs
ar.Logger = s.Logger.With("reader", a1.String())
ar.Info("start read")
}
}
createVideoReader := func() {
if s.Publisher == nil || v1 == nil {
return
}
if v1 == reflect.TypeOf(videoFrame) {
vt = s.Publisher.VideoTrack.AVTrack
vwi = -1
} else {
vt = s.Publisher.GetVideoTrack(v1)
if vt != nil {
vwi = vt.WrapIndex
}
}
if vt != nil {
if err := vt.WaitReady(); err != nil {
return
}
vr = NewAVRingReader(vt)
vr.StartTs = startVideoTs
s.VideoReader = vr
vr.Logger = s.Logger.With("reader", v1.String())
vr.Info("start read")
}
}
createAudioReader()
createVideoReader()
defer func() { defer func() {
if ar != nil { if s.AudioReader != nil {
ar.StopRead() s.AudioReader.StopRead()
} }
if vr != nil { if s.VideoReader != nil {
vr.StopRead() s.VideoReader.StopRead()
} }
}() }()
sendAudioFrame := func() (err error) { sendAudioFrame := func() (err error) {
@@ -170,7 +175,7 @@ func PlayBlock[A any, V any](s *Subscriber, onAudio func(A) error, onVideo func(
} }
err = onAudio(audioFrame.Wraps[awi].(A)) err = onAudio(audioFrame.Wraps[awi].(A))
} else { } else {
ar.StopRead() s.AudioReader.StopRead()
} }
} else { } else {
err = onAudio(any(audioFrame).(A)) err = onAudio(any(audioFrame).(A))
@@ -189,7 +194,7 @@ func PlayBlock[A any, V any](s *Subscriber, onAudio func(A) error, onVideo func(
} }
err = onVideo(videoFrame.Wraps[vwi].(V)) err = onVideo(videoFrame.Wraps[vwi].(V))
} else { } else {
vr.StopRead() s.VideoReader.StopRead()
} }
} else { } else {
err = onVideo(any(videoFrame).(V)) err = onVideo(any(videoFrame).(V))
@@ -203,23 +208,24 @@ func PlayBlock[A any, V any](s *Subscriber, onAudio func(A) error, onVideo func(
checkPublisherChange := func() { checkPublisherChange := func() {
if prePublisher != s.Publisher { if prePublisher != s.Publisher {
s.Info("publisher changed", "prePublisher", prePublisher.ID, "publisher", s.Publisher.ID) s.Info("publisher changed", "prePublisher", prePublisher.ID, "publisher", s.Publisher.ID)
if ar != nil { if s.AudioReader != nil {
startAudioTs = time.Duration(ar.AbsTime) * time.Millisecond startAudioTs = time.Duration(s.AudioReader.AbsTime) * time.Millisecond
ar.StopRead() s.AudioReader.StopRead()
ar = nil s.AudioReader = nil
} }
if vr != nil { if s.VideoReader != nil {
startVideoTs = time.Duration(vr.AbsTime) * time.Millisecond startVideoTs = time.Duration(s.VideoReader.AbsTime) * time.Millisecond
vr.StopRead() s.VideoReader.StopRead()
vr = nil s.VideoReader = nil
} }
createAudioReader() awi = s.createAudioReader(a1, startAudioTs)
createVideoReader() vwi = s.createVideoReader(v1, startVideoTs)
prePublisher = s.Publisher prePublisher = s.Publisher
} }
} }
for err == nil { for err == nil {
err = s.Err() err = s.Err()
ar, vr := s.AudioReader, s.VideoReader
if vr != nil { if vr != nil {
for err == nil { for err == nil {
err = vr.ReadFrame(&s.Subscribe) err = vr.ReadFrame(&s.Subscribe)
@@ -260,7 +266,7 @@ func PlayBlock[A any, V any](s *Subscriber, onAudio func(A) error, onVideo func(
} }
} }
} else { } else {
createVideoReader() vwi = s.createVideoReader(v1, startVideoTs)
} }
// 正常模式下或者纯音频模式下,音频开始播放 // 正常模式下或者纯音频模式下,音频开始播放
if ar != nil { if ar != nil {
@@ -305,7 +311,7 @@ func PlayBlock[A any, V any](s *Subscriber, onAudio func(A) error, onVideo func(
} }
} }
} else { } else {
createAudioReader() awi = s.createAudioReader(a1, startAudioTs)
} }
checkPublisherChange() checkPublisherChange()
runtime.Gosched() runtime.Gosched()