run rtmp audio success

This commit is contained in:
langhuihui
2024-03-27 17:34:18 +08:00
parent 9ff60ac668
commit ad13914b44
11 changed files with 195 additions and 84 deletions

View File

@@ -2,4 +2,4 @@ global:
loglevel: debug
rtmp:
publish:
pubaudio: false
# pubvideo: false

13
example/rtmp2rtmp/main.go Normal file
View File

@@ -0,0 +1,13 @@
package main
import (
"context"
"m7s.live/m7s/v5"
_ "m7s.live/m7s/v5/plugin/rtmp"
)
func main() {
// ctx, _ := context.WithDeadline(context.Background(), time.Now().Add(time.Second*100))
m7s.Run(context.Background(), "config.yaml")
}

View File

@@ -16,7 +16,6 @@ type AVFrame struct {
Wrap IAVFrame `json:"-" yaml:"-"` // 封装格式
}
type DataFrame struct {
DeltaTime uint32 // 相对上一帧时间戳,毫秒
WriteTime time.Time // 写入时间,可用于比较两个帧的先后
Sequence uint32 // 在一个Track中的序号
BytesIn int // 输入字节数用于计算BPS
@@ -82,7 +81,6 @@ func (df *DataFrame) Init() {
func (df *DataFrame) Reset() {
df.BytesIn = 0
df.DeltaTime = 0
}
type ICodecCtx interface {
@@ -94,6 +92,7 @@ type IAVFrame interface {
DecodeConfig(*AVTrack) error
ToRaw(*AVTrack) (any, error)
FromRaw(*AVTrack, any) error
GetTimestamp() time.Duration
Recycle()
IsIDR() bool
}

View File

@@ -3,7 +3,6 @@ package pkg
import (
"log/slog"
"slices"
"time"
"m7s.live/m7s/v5/pkg/util"
)
@@ -37,28 +36,8 @@ type AVTrack struct {
Track
RingWriter
IDRingList `json:"-" yaml:"-"` //最近的关键帧位置,首屏渲染
BufferTime time.Duration //发布者配置中的缓冲时间(时光回溯)
ICodecCtx
SSRC uint32
SampleRate uint32
PayloadType byte
}
func (av *AVTrack) Narrow(gop int) {
if l := av.Size - gop; l > 12 {
av.Debug("resize", "before", av.Size, "after", av.Size-5)
//缩小缓冲环节省内存
av.Reduce(5)
}
}
func (av *AVTrack) AddIDR(r *util.Ring[AVFrame]) {
if av.BufferTime > 0 {
av.IDRingList.AddIDR(r)
if av.HistoryRing == nil {
av.HistoryRing = av.IDRing
}
} else {
av.IDRing = r
}
}

View File

@@ -175,13 +175,13 @@ func (p *Plugin) Start() {
if httpConf.ListenAddrTLS != "" && (httpConf.ListenAddrTLS != p.server.config.HTTP.ListenAddrTLS) {
go func() {
p.Info("https listen at ", "addr", aurora.Blink(httpConf.ListenAddrTLS))
p.CancelCauseFunc(httpConf.ListenTLS())
p.Stop(httpConf.ListenTLS())
}()
}
if httpConf.ListenAddr != "" && (httpConf.ListenAddr != p.server.config.HTTP.ListenAddr) {
go func() {
p.Info("http listen at ", "addr", aurora.Blink(httpConf.ListenAddr))
p.CancelCauseFunc(httpConf.Listen())
p.Stop(httpConf.Listen())
}()
}
tcpConf := p.config.TCP
@@ -197,7 +197,7 @@ func (p *Plugin) Start() {
l, err := net.Listen("tcp", tcpConf.ListenAddr)
if err != nil {
p.Error("listen tcp", "addr", tcpConf.ListenAddr, "error", err)
p.CancelCauseFunc(err)
p.Stop(err)
return
}
defer l.Close()
@@ -213,7 +213,7 @@ func (p *Plugin) Start() {
}
if err != nil {
p.Error("LoadX509KeyPair", "error", err)
p.CancelCauseFunc(err)
p.Stop(err)
return
}
l, err := tls.Listen("tcp", tcpConf.ListenAddrTLS, &tls.Config{
@@ -221,7 +221,7 @@ func (p *Plugin) Start() {
})
if err != nil {
p.Error("listen tcp tls", "addr", tcpConf.ListenAddrTLS, "error", err)
p.CancelCauseFunc(err)
p.Stop(err)
return
}
defer l.Close()
@@ -250,18 +250,32 @@ func (p *Plugin) OnTCPConnect(conn *net.TCPConn) {
p.handler.OnEvent(conn)
}
func (p *Plugin) Publish(streamPath string) (publisher *Publisher, err error) {
func (p *Plugin) Publish(streamPath string, options ...any) (publisher *Publisher, err error) {
publisher = &Publisher{Publish: p.config.Publish}
publisher.Init(p, streamPath)
ctx := p.Context
for _, option := range options {
switch v := option.(type) {
case context.Context:
ctx = v
}
}
publisher.Init(ctx, p, streamPath)
publisher.Subscribers = make(map[*Subscriber]struct{})
publisher.TransTrack = make(map[reflect.Type]*AVTrack)
err = sendPromiseToServer(p.server, publisher)
return
}
func (p *Plugin) Subscribe(streamPath string) (subscriber *Subscriber, err error) {
func (p *Plugin) Subscribe(streamPath string, options ...any) (subscriber *Subscriber, err error) {
subscriber = &Subscriber{Subscribe: p.config.Subscribe}
subscriber.Init(p, streamPath)
ctx := p.Context
for _, option := range options {
switch v := option.(type) {
case context.Context:
ctx = v
}
}
subscriber.Init(ctx, p, streamPath)
err = sendPromiseToServer(p.server, subscriber)
return
}

View File

@@ -33,14 +33,13 @@ func (p *RTMPPlugin) OnTCPConnect(conn *net.TCPConn) {
var err error
logger.Info("conn")
defer func() {
// ze := zap.Error(err)
// logger.Info("conn close", ze)
// for _, sender := range senders {
// sender.Stop(ze)
// }
// for _, receiver := range receivers {
// receiver.Stop(ze)
// }
p.Info("conn close")
for _, sender := range senders {
sender.Stop(err)
}
for _, receiver := range receivers {
receiver.Stop(err)
}
}()
nc := NewNetConnection(conn)
// ctx, cancel := context.WithCancel(p)

View File

@@ -22,11 +22,29 @@ func (avcc *RTMPAudio) DecodeConfig(track *AVTrack) error {
switch b0 & 0b1111_0000 >> 4 {
case 7:
track.Codec = "pcmu"
var ctx G711Ctx
track.ICodecCtx = &ctx
case 8:
track.Codec = "pcma"
var ctx G711Ctx
track.ICodecCtx = &ctx
case 10:
track.Codec = "aac"
var ctx AACCtx
b0, err = reader.ReadByte()
if err != nil {
return err
}
b1, err = reader.ReadByte()
if err != nil {
return err
}
ctx.AudioObjectType = b0 >> 3
ctx.SamplingFrequencyIndex = (b0 & 0x07 << 1) | (b1 >> 7)
ctx.ChannelConfiguration = (b1 >> 3) & 0x0F
ctx.FrameLengthFlag = (b1 >> 2) & 0x01
ctx.DependsOnCoreCoder = (b1 >> 1) & 0x01
ctx.ExtensionFlag = b1 & 0x01
ctx.SequenceFrame = avcc
track.ICodecCtx = &ctx
}

View File

@@ -324,11 +324,33 @@ type H265Ctx struct {
H264Ctx
VPS []byte
}
type G711Ctx struct {
}
func (ctx *G711Ctx) GetSequenceFrame() IAVFrame {
return nil
}
type AACCtx struct {
AudioSpecificConfig
SequenceFrame *RTMPAudio
}
func (ctx *AACCtx) GetSequenceFrame() IAVFrame {
return ctx.SequenceFrame
}
}
type GASpecificConfig struct {
FrameLengthFlag byte // 1 bit
DependsOnCoreCoder byte // 1 bit
ExtensionFlag byte // 1 bit
}
type AudioSpecificConfig struct {
AudioObjectType byte // 5 bits
SamplingFrequencyIndex byte // 4 bits
ChannelConfiguration byte // 4 bits
GASpecificConfig
}
var SamplingFrequencies = [...]int{96000, 88200, 64000, 48000, 44100, 32000, 24000, 22050, 16000, 12000, 11025, 8000, 7350, 0, 0, 0}

View File

@@ -1,6 +1,8 @@
package pkg
import (
"time"
"m7s.live/m7s/v5/pkg/util"
)
@@ -22,6 +24,9 @@ type RTMPData struct {
util.RecyclebleMemory
}
func (avcc *RTMPData) GetTimestamp() time.Duration {
return time.Duration(avcc.Timestamp) * time.Millisecond
}
func (avcc *RTMPData) IsIDR() bool {
return false
}

View File

@@ -28,30 +28,10 @@ func (p *Publisher) AddSubscriber(subscriber *Subscriber) (err error) {
return
}
func (p *Publisher) writeAV(t *AVTrack, data IAVFrame) (err error) {
if t.ICodecCtx == nil {
return data.DecodeConfig(t)
}
t.Ring.Value.Wrap = data
// if n := len(t.DataTypes); n > 1 {
// t.Ring.Value.Raw, err = data.ToRaw(t)
// if err != nil {
// return
// }
// if t.Ring.Value.Raw == nil {
// return
// }
// for i := 1; i < n; i++ {
// if len(t.Ring.Value.Wrap) <= i {
// t.Ring.Value.Wrap = append(t.Ring.Value.Wrap, nil)
// }
// t.Ring.Value.Wrap[i] = reflect.New(t.DataTypes[i]).Interface().(IAVFrame)
// t.Ring.Value.Wrap[i].FromRaw(t, t.Ring.Value.Raw)
// }
// }
func (p *Publisher) writeAV(t *AVTrack, data IAVFrame) {
t.Value.Wrap = data
t.Value.Timestamp = data.GetTimestamp()
t.Step()
return
}
func (p *Publisher) WriteVideo(data IAVFrame) (err error) {
@@ -68,17 +48,29 @@ func (p *Publisher) WriteVideo(data IAVFrame) (err error) {
p.TransTrack[reflect.TypeOf(data)] = t
p.Unlock()
}
// if t.IDRing != nil {
// p.GOP = int(t.Value.Sequence - t.IDRing.Value.Sequence)
// if t.HistoryRing == nil {
// t.Narrow(p.GOP)
// }
// }
cur := t.Ring
err = p.writeAV(t, data)
if err == nil && data.IsIDR() {
t.AddIDR(cur)
if t.ICodecCtx == nil {
return data.DecodeConfig(t)
}
if data.IsIDR() {
if t.IDRing != nil {
p.GOP = int(t.Value.Sequence - t.IDRing.Value.Sequence)
if t.HistoryRing == nil {
if l := t.Size - p.GOP; l > 12 {
t.Debug("resize", "before", t.Size, "after", t.Size-5)
t.Reduce(5) //缩小缓冲环节省内存
}
}
}
if p.BufferTime > 0 {
t.IDRingList.AddIDR(t.Ring)
if t.HistoryRing == nil {
t.HistoryRing = t.IDRing
}
} else {
t.IDRing = t.Ring
}
}
p.writeAV(t, data)
return
}
@@ -96,7 +88,11 @@ func (p *Publisher) WriteAudio(data IAVFrame) (err error) {
p.TransTrack[reflect.TypeOf(data)] = t
p.Unlock()
}
return p.writeAV(t, data)
if t.ICodecCtx == nil {
return data.DecodeConfig(t)
}
p.writeAV(t, data)
return
}
func (p *Publisher) WriteData(data IDataFrame) (err error) {

View File

@@ -14,6 +14,7 @@ import (
)
type PubSubBase struct {
ID string
*slog.Logger `json:"-" yaml:"-"`
context.Context `json:"-" yaml:"-"`
context.CancelCauseFunc `json:"-" yaml:"-"`
@@ -28,7 +29,7 @@ func (ps *PubSubBase) Stop(err error) {
ps.CancelCauseFunc(err)
}
func (ps *PubSubBase) Init(p *Plugin, streamPath string) {
func (ps *PubSubBase) Init(ctx context.Context, p *Plugin, streamPath string) {
ps.Plugin = p
ps.Context, ps.CancelCauseFunc = context.WithCancelCause(p.Context)
if u, err := url.Parse(streamPath); err == nil {
@@ -54,6 +55,7 @@ func (s *Subscriber) Handle(audioHandler, videoHandler any) {
at := s.Publisher.GetAudioTrack(a1)
if at != nil {
ar = NewAVRingReader(at)
ar.Logger = s.Logger.With("reader", a1.Name())
ah = reflect.ValueOf(audioHandler)
}
}
@@ -62,6 +64,7 @@ func (s *Subscriber) Handle(audioHandler, videoHandler any) {
vt := s.Publisher.GetVideoTrack(v1)
if vt != nil {
vr = NewAVRingReader(vt)
vr.Logger = s.Logger.With("reader", v1.Name())
vh = reflect.ValueOf(videoHandler)
}
}
@@ -71,20 +74,40 @@ func (s *Subscriber) Handle(audioHandler, videoHandler any) {
if s.Args.Has(s.SubModeArgName) {
subMode, _ = strconv.Atoi(s.Args.Get(s.SubModeArgName))
}
var audioFrame, videoFrame *AVFrame
var audioFrame, videoFrame, lastSentAF, lastSentVF *AVFrame
defer func() {
if lastSentVF != nil {
lastSentVF.ReaderLeave()
}
if lastSentAF != nil {
lastSentAF.ReaderLeave()
}
s.Info("subscriber stopped", "reason", context.Cause(s.Context))
}()
sendAudioFrame := func() {
lastSentAF = audioFrame
s.Debug("send audio frame", "frame", audioFrame.Sequence)
ah.Call([]reflect.Value{reflect.ValueOf(audioFrame.Wrap)})
}
sendVideoFrame := func() {
lastSentVF = videoFrame
s.Debug("send video frame", "frame", videoFrame.Sequence)
vh.Call([]reflect.Value{reflect.ValueOf(videoFrame.Wrap)})
}
for err := s.Err(); err == nil; err = s.Err() {
if vr != nil {
for err == nil {
err = vr.ReadFrame(subMode)
if err == nil {
videoFrame = &vr.Value
err = s.Err()
} else {
s.Stop(err)
}
if err != nil {
s.Stop(err)
// stopReason = zap.Error(err)
return
}
videoFrame = &vr.Value
// fmt.Println("video", s.VideoReader.Track.PreFrame().Sequence-frame.Sequence)
if videoFrame.Wrap.IsIDR() && vr.DecConfChanged() {
vr.LastCodecCtx = vr.Track.ICodecCtx
@@ -95,7 +118,7 @@ func (s *Subscriber) Handle(audioHandler, videoHandler any) {
if audioFrame != nil {
if util.Conditoinal(s.SyncMode == 0, videoFrame.Timestamp > audioFrame.Timestamp, videoFrame.WriteTime.After(audioFrame.WriteTime)) {
// fmt.Println("switch audio", audioFrame.CanRead)
ah.Call([]reflect.Value{reflect.ValueOf(audioFrame.Wrap)})
sendAudioFrame()
audioFrame = nil
break
}
@@ -105,9 +128,52 @@ func (s *Subscriber) Handle(audioHandler, videoHandler any) {
}
if !s.IFrameOnly || videoFrame.Wrap.IsIDR() {
vh.Call([]reflect.Value{reflect.ValueOf(videoFrame.Wrap)})
sendVideoFrame()
}
}
}
// 正常模式下或者纯音频模式下,音频开始播放
if ar != nil {
for err == nil {
switch ar.State {
case READSTATE_INIT:
if vr != nil {
ar.FirstTs = vr.FirstTs
}
case READSTATE_NORMAL:
if vr != nil {
ar.SkipTs = vr.SkipTs
}
}
err = ar.ReadFrame(subMode)
if err == nil {
audioFrame = &ar.Value
err = s.Err()
} else {
// fmt.Println("skip video", frame.Sequence)
s.Stop(err)
}
if err != nil {
return
}
// fmt.Println("audio", s.AudioReader.Track.PreFrame().Sequence-frame.Sequence)
if ar.DecConfChanged() {
ar.LastCodecCtx = ar.Track.ICodecCtx
if sf := ar.Track.ICodecCtx.GetSequenceFrame(); sf != nil {
ah.Call([]reflect.Value{reflect.ValueOf(sf)})
}
}
if vr != nil && videoFrame != nil {
if util.Conditoinal(s.SyncMode == 0, audioFrame.Timestamp > videoFrame.Timestamp, audioFrame.WriteTime.After(videoFrame.WriteTime)) {
sendVideoFrame()
videoFrame = nil
break
}
}
if audioFrame.Timestamp >= ar.SkipTs {
sendAudioFrame()
} else {
s.Debug("skip audio", "frame.AbsTime", audioFrame.Timestamp, "s.AudioReader.SkipTs", ar.SkipTs)
}
}
}