run rtmp success

This commit is contained in:
langhuihui
2024-03-26 20:27:03 +08:00
parent aca9a8d9cb
commit 9ff60ac668
23 changed files with 603 additions and 477 deletions

3
.gitignore vendored
View File

@@ -1 +1,2 @@
.history
.history
.vscode

View File

@@ -0,0 +1,5 @@
global:
loglevel: debug
rtmp:
publish:
pubaudio: false

1
go.mod
View File

@@ -1,6 +1,7 @@
module m7s.live/m7s/v5
go 1.22
toolchain go1.22.1
require github.com/quic-go/quic-go v0.42.0

View File

@@ -7,14 +7,13 @@ import (
"time"
"github.com/bluenviron/mediacommon/pkg/codecs/av1"
"m7s.live/m7s/v5/pkg/codec"
"m7s.live/m7s/v5/pkg/util"
)
type AVFrame struct {
DataFrame
Timestamp time.Duration // 绝对时间戳
Wrap []IAVFrame `json:"-" yaml:"-"` // 封装格式
Wrap IAVFrame `json:"-" yaml:"-"` // 封装格式
}
type DataFrame struct {
DeltaTime uint32 // 相对上一帧时间戳,毫秒
@@ -27,9 +26,6 @@ type DataFrame struct {
sync.Cond `json:"-" yaml:"-"`
}
func NewDataFrame[T any]() *DataFrame {
return &DataFrame{}
}
func (df *DataFrame) IsWriting() bool {
return !df.CanRead
}
@@ -68,6 +64,7 @@ func (df *DataFrame) StartWrite() bool {
df.Discard() //标记为废弃
return false
} else {
df.Init()
df.CanRead = false //标记为正在写入
return true
}
@@ -88,24 +85,8 @@ func (df *DataFrame) Reset() {
df.DeltaTime = 0
}
type CodecCtx struct {
}
func Update(ICodecCtx) {
}
type VideoCodecCtx struct {
CodecCtx
NalulenSize int
SPSInfo codec.SPSInfo
SequenceData net.Buffers
}
type AudioCodecCtx struct {
CodecCtx
}
type ICodecCtx interface {
GetSequenceFrame() IAVFrame
}
type IDataFrame interface {
}
@@ -114,6 +95,7 @@ type IAVFrame interface {
ToRaw(*AVTrack) (any, error)
FromRaw(*AVTrack, any) error
Recycle()
IsIDR() bool
}
type Nalu [][]byte

View File

@@ -1,6 +1,8 @@
package pkg
import "m7s.live/m7s/v5/pkg/util"
import (
"m7s.live/m7s/v5/pkg/util"
)
type RingReader struct {
*util.Ring[AVFrame]

View File

@@ -42,6 +42,7 @@ func (rb *RingWriter) Init(n int) *RingWriter {
rb.Ring = util.NewRing[AVFrame](n)
rb.Size = n
rb.LastValue = &rb.Value
rb.LastValue.Init()
return rb
}
@@ -90,7 +91,6 @@ func (rb *RingWriter) Reduce(size int) {
rb.Recycle(r)
}
rb.Size -= size
return
}
func (rb *RingWriter) Step() (normal bool) {

View File

@@ -2,8 +2,8 @@ package pkg
import (
"log/slog"
"reflect"
"slices"
"time"
"m7s.live/m7s/v5/pkg/util"
)
@@ -37,9 +37,28 @@ type AVTrack struct {
Track
RingWriter
IDRingList `json:"-" yaml:"-"` //最近的关键帧位置,首屏渲染
BufferTime time.Duration //发布者配置中的缓冲时间(时光回溯)
ICodecCtx
SSRC uint32
SampleRate uint32
DataTypes []reflect.Type `json:"-" yaml:"-"`
PayloadType byte
}
func (av *AVTrack) Narrow(gop int) {
if l := av.Size - gop; l > 12 {
av.Debug("resize", "before", av.Size, "after", av.Size-5)
//缩小缓冲环节省内存
av.Reduce(5)
}
}
func (av *AVTrack) AddIDR(r *util.Ring[AVFrame]) {
if av.BufferTime > 0 {
av.IDRingList.AddIDR(r)
if av.HistoryRing == nil {
av.HistoryRing = av.IDRing
}
} else {
av.IDRing = r
}
}

View File

@@ -57,16 +57,13 @@ func (buffers *Buffers) ReadByte() (byte, error) {
if buffers.Length == 0 {
return 0, io.EOF
}
level0 := buffers.Buffers[buffers.offset0]
b := level0[buffers.offset1]
buffers.offset1++
buffers.Length--
buffers.Offset++
if buffers.offset1 >= len(level0) {
buffers.offset0++
buffers.offset1 = 0
level1 := buffers.GetLevel1()
if len(level1) == 1 {
defer buffers.move0()
} else {
defer buffers.move1(1)
}
return b, nil
return level1[0], nil
}
func (buffers *Buffers) LEB128Unmarshal() (uint, int, error) {
@@ -89,36 +86,88 @@ func (buffers *Buffers) LEB128Unmarshal() (uint, int, error) {
return v, n, nil
}
func (buffers *Buffers) GetLevel0() []byte {
return buffers.Buffers[buffers.offset0]
}
func (buffers *Buffers) GetLevel1() []byte {
return buffers.GetLevel0()[buffers.offset1:]
}
func (buffers *Buffers) Skip(n int) error {
if n > buffers.Length {
return io.EOF
}
buffers.Length -= n
buffers.Offset += n
for n > 0 {
level0 := buffers.Buffers[buffers.offset0]
level1 := level0[buffers.offset1:]
if n < len(level1) {
buffers.offset1 += n
level1 := buffers.GetLevel1()
level1Len := len(level1)
if n < level1Len {
buffers.move1(n)
break
}
n -= len(level1)
buffers.offset0++
buffers.offset1 = 0
n -= level1Len
buffers.move0()
if buffers.Length == 0 && n > 0 {
return io.EOF
}
}
return nil
}
func (buffers *Buffers) move1(n int) {
buffers.offset1 += n
buffers.Length -= n
buffers.Offset += n
}
func (buffers *Buffers) move0() {
len0 := len(buffers.GetLevel0())
buffers.Offset += len0
buffers.Length -= len0
buffers.offset0++
buffers.offset1 = 0
}
func (buffers *Buffers) ReadBytes(n int) ([]byte, error) {
if n > buffers.Length {
return nil, io.EOF
}
l := n
b := make([]byte, n)
buffers.Read(b)
buffers.Length -= n
for n > 0 {
level1 := buffers.GetLevel1()
level1Len := len(level1)
if n < level1Len {
copy(b[l-n:], level1[:n])
buffers.move1(n)
break
}
copy(b[l-n:], level1)
n -= level1Len
buffers.move0()
if buffers.Length == 0 && n > 0 {
return nil, io.EOF
}
}
return b, nil
}
func (buffers *Buffers) WriteNTo(n int, result *net.Buffers) (actual int) {
for actual = n; buffers.Length > 0; buffers.move0() {
level0 := buffers.GetLevel0()
level1 := buffers.GetLevel1()
remain1 := len(level1)
if remain1 > n {
*result = append(*result, level0[buffers.offset1:buffers.offset1+n])
buffers.move1(n)
return actual
}
*result = append(*result, level1)
n -= remain1
}
return actual - n
}
func (buffers *Buffers) ReadBE(n int) (num int, err error) {
for i := range n {
b, err := buffers.ReadByte()

View File

@@ -17,6 +17,10 @@ func (p *Pool[T]) Get() T {
return t
}
func (p *Pool[T]) Clear() {
p.pool = p.pool[:0]
}
func (p *Pool[T]) Put(t T) {
p.pool = append(p.pool, t)
}
@@ -24,6 +28,7 @@ func (p *Pool[T]) Put(t T) {
type IPool[T any] interface {
Get() T
Put(T)
Clear()
}
type RecyclebleMemory struct {

View File

@@ -1,6 +1,9 @@
package util
import "context"
import (
"context"
"errors"
)
type Promise[T any] struct {
context.Context
@@ -13,3 +16,9 @@ func NewPromise[T any](v T) *Promise[T] {
p.Context, p.CancelCauseFunc = context.WithCancelCause(context.Background())
return p
}
var ErrResolve = errors.New("promise resolved")
func (p *Promise[T]) Fulfill(err error) {
p.CancelCauseFunc(Conditoinal(err == nil, ErrResolve, err))
}

View File

@@ -15,6 +15,7 @@ import (
"github.com/logrusorgru/aurora/v4"
"github.com/mcuadros/go-defaults"
"gopkg.in/yaml.v3"
. "m7s.live/m7s/v5/pkg"
"m7s.live/m7s/v5/pkg/config"
"m7s.live/m7s/v5/pkg/util"
)
@@ -74,6 +75,7 @@ func (plugin *PluginMeta) Init(s *Server, userConfig map[string]any) {
} else {
p.assign()
}
p.Info("init", "version", plugin.Version)
instance.OnInit()
go p.Start()
}
@@ -119,11 +121,14 @@ func InstallPlugin[C IPlugin](options ...any) error {
return nil
}
func sendPromiseToServer[T any](server *Server, value T) error {
func sendPromiseToServer[T any](server *Server, value T) (err error) {
promise := util.NewPromise(value)
server.eventChan <- promise
<-promise.Done()
return context.Cause(promise.Context)
if err = context.Cause(promise.Context); err == util.ErrResolve {
err = nil
}
return
}
type Plugin struct {
@@ -191,12 +196,12 @@ func (p *Plugin) Start() {
if p.config.TCP.ListenAddr != "" {
l, err := net.Listen("tcp", tcpConf.ListenAddr)
if err != nil {
p.Error("tcp listen error", "addr", tcpConf.ListenAddr, "error", err)
p.Error("listen tcp", "addr", tcpConf.ListenAddr, "error", err)
p.CancelCauseFunc(err)
return
}
defer l.Close()
p.Info("tcp listen at ", "addr", aurora.Blink(tcpConf.ListenAddr))
p.Info("listen tcp", "addr", tcpConf.ListenAddr)
for i := 0; i < count; i++ {
go tcpConf.Listen(l, tcphandler.OnTCPConnect)
}
@@ -215,12 +220,12 @@ func (p *Plugin) Start() {
Certificates: []tls.Certificate{keyPair},
})
if err != nil {
p.Error("tls tcp listen error", "addr", tcpConf.ListenAddrTLS, "error", err)
p.Error("listen tcp tls", "addr", tcpConf.ListenAddrTLS, "error", err)
p.CancelCauseFunc(err)
return
}
defer l.Close()
p.Info("tls tcp listen at ", "addr", aurora.Blink(tcpConf.ListenAddrTLS))
p.Info("listen tcp tls", "addr", tcpConf.ListenAddrTLS)
for i := 0; i < count; i++ {
go tcpConf.Listen(l, tcphandler.OnTCPConnect)
}
@@ -249,6 +254,7 @@ func (p *Plugin) Publish(streamPath string) (publisher *Publisher, err error) {
publisher = &Publisher{Publish: p.config.Publish}
publisher.Init(p, streamPath)
publisher.Subscribers = make(map[*Subscriber]struct{})
publisher.TransTrack = make(map[reflect.Type]*AVTrack)
err = sendPromiseToServer(p.server, publisher)
return
}

View File

@@ -5,12 +5,12 @@ import (
"net"
"m7s.live/m7s/v5"
"m7s.live/m7s/v5/plugin/rtmp/pkg"
. "m7s.live/m7s/v5/plugin/rtmp/pkg"
)
type RTMPPlugin struct {
m7s.Plugin
ChunkSize int
ChunkSize int `default:"1024"`
KeepAlive bool
}
@@ -27,11 +27,11 @@ var _ = m7s.InstallPlugin[*RTMPPlugin](m7s.DefaultYaml(`tcp:
func (p *RTMPPlugin) OnTCPConnect(conn *net.TCPConn) {
defer conn.Close()
// logger := RTMPPlugin.Logger.With(zap.String("remote", conn.RemoteAddr().String()))
// senders := make(map[uint32]*RTMPSubscriber)
receivers := make(map[uint32]*pkg.RTMPReceiver)
logger := p.Logger.With("remote", conn.RemoteAddr().String())
senders := make(map[uint32]*RTMPSender)
receivers := make(map[uint32]*RTMPReceiver)
var err error
// logger.Info("conn")
logger.Info("conn")
defer func() {
// ze := zap.Error(err)
// logger.Info("conn close", ze)
@@ -42,15 +42,15 @@ func (p *RTMPPlugin) OnTCPConnect(conn *net.TCPConn) {
// receiver.Stop(ze)
// }
}()
nc := pkg.NewNetConnection(conn)
nc := NewNetConnection(conn)
// ctx, cancel := context.WithCancel(p)
// defer cancel()
/* Handshake */
if err = nc.Handshake(); err != nil {
// logger.Error("handshake", zap.Error(err))
logger.Error("handshake", "error", err)
return
}
var msg *pkg.Chunk
var msg *Chunk
var gstreamid uint32
for {
if msg, err = nc.RecvMessage(); err == nil {
@@ -58,14 +58,14 @@ func (p *RTMPPlugin) OnTCPConnect(conn *net.TCPConn) {
continue
}
switch msg.MessageTypeID {
case pkg.RTMP_MSG_AMF0_COMMAND:
case RTMP_MSG_AMF0_COMMAND:
if msg.MsgData == nil {
break
}
// cmd := msg.MsgData.(pkg.Commander).GetCommand()
// logger.Debug("recv cmd", zap.String("commandName", cmd.CommandName), zap.Uint32("streamID", msg.MessageStreamID))
cmd := msg.MsgData.(Commander).GetCommand()
logger.Debug("recv cmd", "commandName", cmd.CommandName, "streamID", msg.MessageStreamID)
switch cmd := msg.MsgData.(type) {
case *pkg.CallMessage: //connect
case *CallMessage: //connect
app := cmd.Object["app"] // 客户端要连接到的服务应用名
objectEncoding := cmd.Object["objectEncoding"] // AMF编码方法
switch v := objectEncoding.(type) {
@@ -75,17 +75,33 @@ func (p *RTMPPlugin) OnTCPConnect(conn *net.TCPConn) {
nc.ObjectEncoding = 0
}
nc.AppName = app.(string)
// logger.Info("connect", zap.String("appName", nc.appName), zap.Float64("objectEncoding", nc.objectEncoding))
err = nc.SendMessage(pkg.RTMP_MSG_ACK_SIZE, pkg.Uint32Message(512<<10))
logger.Info("connect", "appName", nc.AppName, "objectEncoding", nc.ObjectEncoding)
err = nc.SendMessage(RTMP_MSG_ACK_SIZE, Uint32Message(512<<10))
if err != nil {
logger.Error("sendMessage ack size", "error", err)
return
}
nc.WriteChunkSize = p.ChunkSize
err = nc.SendMessage(pkg.RTMP_MSG_CHUNK_SIZE, pkg.Uint32Message(p.ChunkSize))
err = nc.SendMessage(pkg.RTMP_MSG_BANDWIDTH, &pkg.SetPeerBandwidthMessage{
err = nc.SendMessage(RTMP_MSG_CHUNK_SIZE, Uint32Message(p.ChunkSize))
if err != nil {
logger.Error("sendMessage chunk size", "error", err)
return
}
err = nc.SendMessage(RTMP_MSG_BANDWIDTH, &SetPeerBandwidthMessage{
AcknowledgementWindowsize: uint32(512 << 10),
LimitType: byte(2),
})
err = nc.SendStreamID(pkg.RTMP_USER_STREAM_BEGIN, 0)
m := new(pkg.ResponseConnectMessage)
m.CommandName = pkg.Response_Result
if err != nil {
logger.Error("sendMessage bandwidth", "error", err)
return
}
err = nc.SendStreamID(RTMP_USER_STREAM_BEGIN, 0)
if err != nil {
logger.Error("sendMessage stream begin", "error", err)
return
}
m := new(ResponseConnectMessage)
m.CommandName = Response_Result
m.TransactionId = 1
m.Properties = map[string]any{
"fmsVer": "monibuca/" + m7s.Version,
@@ -94,21 +110,25 @@ func (p *RTMPPlugin) OnTCPConnect(conn *net.TCPConn) {
"Author": "dexter",
}
m.Infomation = map[string]any{
"level": pkg.Level_Status,
"code": pkg.NetConnection_Connect_Success,
"level": Level_Status,
"code": NetConnection_Connect_Success,
"objectEncoding": nc.ObjectEncoding,
}
err = nc.SendMessage(pkg.RTMP_MSG_AMF0_COMMAND, m)
case *pkg.CommandMessage: // "createStream"
err = nc.SendMessage(RTMP_MSG_AMF0_COMMAND, m)
if err != nil {
logger.Error("sendMessage connect", "error", err)
return
}
case *CommandMessage: // "createStream"
gstreamid++
// logger.Info("createStream:", zap.Uint32("streamId", gstreamid))
logger.Info("createStream:", "streamId", gstreamid)
nc.ResponseCreateStream(cmd.TransactionId, gstreamid)
case *pkg.CURDStreamMessage:
case *CURDStreamMessage:
// if stream, ok := receivers[cmd.StreamId]; ok {
// stream.Stop()
// delete(senders, cmd.StreamId)
// }
case *pkg.ReleaseStreamMessage:
case *ReleaseStreamMessage:
// m := &CommandMessage{
// CommandName: "releaseStream_error",
// TransactionId: cmd.TransactionId,
@@ -122,9 +142,9 @@ func (p *RTMPPlugin) OnTCPConnect(conn *net.TCPConn) {
// }
// }
// err = nc.SendMessage(RTMP_MSG_AMF0_COMMAND, m)
case *pkg.PublishMessage:
receiver := &pkg.RTMPReceiver{
NetStream: pkg.NetStream{
case *PublishMessage:
receiver := &RTMPReceiver{
NetStream: NetStream{
NetConnection: nc,
StreamID: cmd.StreamId,
},
@@ -136,24 +156,33 @@ func (p *RTMPPlugin) OnTCPConnect(conn *net.TCPConn) {
receiver.Publisher, err = p.Publish(nc.AppName + "/" + cmd.PublishingName)
if err != nil {
delete(receivers, cmd.StreamId)
err = receiver.Response(cmd.TransactionId, pkg.NetStream_Publish_BadName, pkg.Level_Error)
err = receiver.Response(cmd.TransactionId, NetStream_Publish_BadName, Level_Error)
} else {
receivers[cmd.StreamId] = receiver
receiver.Begin()
err = receiver.Response(cmd.TransactionId, pkg.NetStream_Publish_Start, pkg.Level_Status)
err = receiver.Response(cmd.TransactionId, NetStream_Publish_Start, Level_Status)
}
case *pkg.PlayMessage:
// streamPath := nc.appName + "/" + cmd.StreamName
// sender := &RTMPSubscriber{}
// sender.NetStream = NetStream{
// nc,
// cmd.StreamId,
// }
case *PlayMessage:
streamPath := nc.AppName + "/" + cmd.StreamName
sender := &RTMPSender{}
sender.NetConnection = nc
sender.StreamID = cmd.StreamId
// sender.SetParentCtx(ctx)
// if !config.KeepAlive {
// sender.SetIO(conn)
// }
if !p.KeepAlive {
// sender.SetIO(conn)
}
// sender.ID = fmt.Sprintf("%s|%d", conn.RemoteAddr().String(), sender.StreamID)
sender.Subscriber, err = p.Subscribe(streamPath)
if err != nil {
err = sender.Response(cmd.TransactionId, NetStream_Play_Failed, Level_Error)
} else {
senders[sender.StreamID] = sender
sender.Begin()
err = sender.Response(cmd.TransactionId, NetStream_Play_Reset, Level_Status)
err = sender.Response(cmd.TransactionId, NetStream_Play_Start, Level_Status)
sender.Init()
go sender.Handle(sender.SendAudio, sender.SendVideo)
}
// if RTMPPlugin.Subscribe(streamPath, sender) != nil {
// sender.Response(cmd.TransactionId, NetStream_Play_Failed, Level_Error)
// } else {
@@ -164,24 +193,24 @@ func (p *RTMPPlugin) OnTCPConnect(conn *net.TCPConn) {
// go sender.PlayRaw()
// }
}
case pkg.RTMP_MSG_AUDIO:
case RTMP_MSG_AUDIO:
if r, ok := receivers[msg.MessageStreamID]; ok {
r.ReceiveAudio(msg)
r.WriteAudio(&RTMPAudio{msg.AVData})
} else {
// logger.Warn("ReceiveAudio", zap.Uint32("MessageStreamID", msg.MessageStreamID))
logger.Warn("ReceiveAudio", "MessageStreamID", msg.MessageStreamID)
}
case pkg.RTMP_MSG_VIDEO:
case RTMP_MSG_VIDEO:
if r, ok := receivers[msg.MessageStreamID]; ok {
r.ReceiveVideo(msg)
r.WriteVideo(&RTMPVideo{msg.AVData})
} else {
// logger.Warn("ReceiveVideo", zap.Uint32("MessageStreamID", msg.MessageStreamID))
logger.Warn("ReceiveVideo", "MessageStreamID", msg.MessageStreamID)
}
}
} else if err == io.EOF || err == io.ErrUnexpectedEOF {
// logger.Info("rtmp client closed")
logger.Info("rtmp client closed")
return
} else {
// logger.Warn("ReadMessage", zap.Error(err))
logger.Warn("ReadMessage", "error", err)
return
}
}

50
plugin/rtmp/pkg/audio.go Normal file
View File

@@ -0,0 +1,50 @@
package pkg
import (
. "m7s.live/m7s/v5/pkg"
)
type RTMPAudio struct {
RTMPData
}
func (avcc *RTMPAudio) DecodeConfig(track *AVTrack) error {
reader := avcc.Buffers
b0, err := reader.ReadByte()
if err != nil {
return err
}
b1, err := reader.ReadByte()
if err != nil {
return err
}
if b1 == 0 {
switch b0 & 0b1111_0000 >> 4 {
case 7:
track.Codec = "pcmu"
case 8:
track.Codec = "pcma"
case 10:
track.Codec = "aac"
var ctx AACCtx
ctx.SequenceFrame = avcc
track.ICodecCtx = &ctx
}
}
return nil
}
func (avcc *RTMPAudio) ToRaw(track *AVTrack) (any, error) {
reader := avcc.Buffers
if track.Codec == "aac" {
err := reader.Skip(2)
return reader.Buffers, err
} else {
err := reader.Skip(1)
return reader.Buffers, err
}
}
func (avcc *RTMPAudio) FromRaw(track *AVTrack, raw any) error {
return nil
}

View File

@@ -5,12 +5,11 @@ import (
"encoding/binary"
"errors"
. "m7s.live/m7s/v5/pkg"
"m7s.live/m7s/v5/pkg/codec"
"m7s.live/m7s/v5/pkg/util"
)
type AVCDecoderConfigurationRecord struct {
ConfigurationVersion byte // 8 bits Version
AVCProfileIndication byte // 8 bits
@@ -64,7 +63,6 @@ func (p *AVCDecoderConfigurationRecord) Unmarshal(b *util.Buffers) (err error) {
if err != nil {
return
}
b.Skip(6)
var sps, pps [][]byte
for range p.NumOfSequenceParameterSets {
spslen, err1 := b.ReadBE(2)
@@ -311,13 +309,26 @@ func ParseSPS(data []byte) (self codec.SPSInfo, err error) {
// }
type H264Ctx struct {
SequenceFrame *RTMPVideo
codec.SPSInfo
NalulenSize int
SPS []byte
PPS []byte
}
func (ctx *H264Ctx) GetSequenceFrame() IAVFrame {
return ctx.SequenceFrame
}
type H265Ctx struct {
H264Ctx
VPS []byte
}
type AACCtx struct {
SequenceFrame *RTMPAudio
}
func (ctx *AACCtx) GetSequenceFrame() IAVFrame {
return ctx.SequenceFrame
}

View File

@@ -21,3 +21,7 @@ type RTMPData struct {
util.Buffers
util.RecyclebleMemory
}
func (avcc *RTMPData) IsIDR() bool {
return false
}

View File

@@ -1,127 +1,131 @@
package pkg
import "m7s.live/m7s/v5"
import (
"errors"
"net"
"runtime"
// type AVSender struct {
// *RTMPSender
// ChunkHeader
// firstSent bool
// }
"m7s.live/m7s/v5"
"m7s.live/m7s/v5/pkg/util"
)
// func (av *AVSender) sendSequenceHead(seqHead []byte) {
// av.SetTimestamp(0)
// av.MessageLength = uint32(len(seqHead))
// for !av.writing.CompareAndSwap(false, true) {
// runtime.Gosched()
// }
// defer av.writing.Store(false)
// if av.firstSent {
// av.WriteTo(RTMP_CHUNK_HEAD_8, &av.chunkHeader)
// } else {
// av.WriteTo(RTMP_CHUNK_HEAD_12, &av.chunkHeader)
// }
// av.sendChunk(seqHead)
// }
type AVSender struct {
*RTMPSender
ChunkHeader
lastAbs uint32
}
// func (av *AVSender) sendFrame(frame *common.AVFrame, absTime uint32) (err error) {
// seq := frame.Sequence
// payloadLen := frame.AVCC.ByteLength
// if payloadLen == 0 {
// err := errors.New("payload is empty")
// av.Error("payload is empty", zap.Error(err))
// return err
// }
// if av.writeSeqNum > av.bandwidth {
// av.totalWrite += av.writeSeqNum
// av.writeSeqNum = 0
// av.SendMessage(RTMP_MSG_ACK, Uint32Message(av.totalWrite))
// av.SendStreamID(RTMP_USER_PING_REQUEST, 0)
// }
// av.MessageLength = uint32(payloadLen)
// for !av.writing.CompareAndSwap(false, true) {
// runtime.Gosched()
// }
// defer av.writing.Store(false)
// // 第一次是发送关键帧,需要完整的消息头(Chunk Basic Header(1) + Chunk Message Header(11) + Extended Timestamp(4)(可能会要包括))
// // 后面开始,就是直接发送音视频数据,那么直接发送,不需要完整的块(Chunk Basic Header(1) + Chunk Message Header(7))
// // 当Chunk Type为0时(即Chunk12),
// if !av.firstSent {
// av.firstSent = true
// av.SetTimestamp(absTime)
// av.WriteTo(RTMP_CHUNK_HEAD_12, &av.chunkHeader)
// } else {
// av.SetTimestamp(frame.DeltaTime)
// av.WriteTo(RTMP_CHUNK_HEAD_8, &av.chunkHeader)
// }
// //数据被覆盖导致序号变了
// if seq != frame.Sequence {
// return errors.New("sequence is not equal")
// }
// r := frame.AVCC.NewReader()
// chunk := net.Buffers{av.chunkHeader}
// av.writeSeqNum += uint32(av.chunkHeader.Len() + r.WriteNTo(av.writeChunkSize, &chunk))
// for r.CanRead() {
// item := av.bytePool.Get(16)
// defer item.Recycle()
// av.WriteTo(RTMP_CHUNK_HEAD_1, &item.Value)
// // 如果在音视频数据太大,一次发送不完,那么这里进行分割(data + Chunk Basic Header(1))
// chunk = append(chunk, item.Value)
// av.writeSeqNum += uint32(item.Value.Len() + r.WriteNTo(av.writeChunkSize, &chunk))
// }
// _, err = chunk.WriteTo(av.Conn)
// return nil
// }
func (av *AVSender) sendFrame(frame *RTMPData) (err error) {
// seq := frame.Sequence
payloadLen := frame.Length
if payloadLen == 0 {
err = errors.New("payload is empty")
// av.Error("payload is empty", zap.Error(err))
return err
}
if av.writeSeqNum > av.bandwidth {
av.totalWrite += av.writeSeqNum
av.writeSeqNum = 0
av.SendMessage(RTMP_MSG_ACK, Uint32Message(av.totalWrite))
av.SendStreamID(RTMP_USER_PING_REQUEST, 0)
}
av.MessageLength = uint32(payloadLen)
for !av.writing.CompareAndSwap(false, true) {
runtime.Gosched()
}
defer av.writing.Store(false)
// 第一次是发送关键帧,需要完整的消息头(Chunk Basic Header(1) + Chunk Message Header(11) + Extended Timestamp(4)(可能会要包括))
// 后面开始,就是直接发送音视频数据,那么直接发送,不需要完整的块(Chunk Basic Header(1) + Chunk Message Header(7))
// 当Chunk Type为0时(即Chunk12),
if av.lastAbs > 0 {
av.SetTimestamp(frame.Timestamp)
av.WriteTo(RTMP_CHUNK_HEAD_12, &av.chunkHeader)
} else {
av.SetTimestamp(frame.Timestamp - av.lastAbs)
av.WriteTo(RTMP_CHUNK_HEAD_8, &av.chunkHeader)
}
av.lastAbs = frame.Timestamp
// //数据被覆盖导致序号变了
// if seq != frame.Sequence {
// return errors.New("sequence is not equal")
// }
r := frame.Buffers
chunk := net.Buffers{av.chunkHeader}
av.writeSeqNum += uint32(av.chunkHeader.Len() + r.WriteNTo(av.WriteChunkSize, &chunk))
for r.Length > 0 {
item := util.Buffer(av.byte16Pool.Get(16))
defer av.byte16Pool.Put(item)
av.WriteTo(RTMP_CHUNK_HEAD_1, &item)
// 如果在音视频数据太大,一次发送不完,那么这里进行分割(data + Chunk Basic Header(1))
chunk = append(chunk, item)
av.writeSeqNum += uint32(item.Len() + r.WriteNTo(av.WriteChunkSize, &chunk))
}
_, err = chunk.WriteTo(av.Conn)
return err
}
// type RTMPSender struct {
// Subscriber
// NetStream
// audio, video AVSender
// }
type RTMPSender struct {
*m7s.Subscriber
NetStream
audio, video AVSender
}
// func (rtmp *RTMPSender) OnEvent(event any) {
// switch v := event.(type) {
// case SEwaitPublish:
// rtmp.Response(1, NetStream_Play_UnpublishNotify, Response_OnStatus)
// case SEpublish:
// rtmp.Response(1, NetStream_Play_PublishNotify, Response_OnStatus)
// case ISubscriber:
// rtmp.audio.RTMPSender = rtmp
// rtmp.video.RTMPSender = rtmp
// rtmp.audio.ChunkStreamID = RTMP_CSID_AUDIO
// rtmp.video.ChunkStreamID = RTMP_CSID_VIDEO
// rtmp.audio.MessageTypeID = RTMP_MSG_AUDIO
// rtmp.video.MessageTypeID = RTMP_MSG_VIDEO
// rtmp.audio.MessageStreamID = rtmp.StreamID
// rtmp.video.MessageStreamID = rtmp.StreamID
// case AudioDeConf:
// rtmp.audio.sendSequenceHead(v)
// case VideoDeConf:
// rtmp.video.sendSequenceHead(v)
// case AudioFrame:
// if err := rtmp.audio.sendFrame(v.AVFrame, v.AbsTime); err != nil {
// rtmp.Stop(zap.Error(err))
// }
// case VideoFrame:
// if err := rtmp.video.sendFrame(v.AVFrame, v.AbsTime); err != nil {
// rtmp.Stop(zap.Error(err))
// }
// default:
// rtmp.Subscriber.OnEvent(event)
// }
// }
func (r *RTMPSender) Init() {
r.audio.RTMPSender = r
r.video.RTMPSender = r
r.audio.ChunkStreamID = RTMP_CSID_AUDIO
r.video.ChunkStreamID = RTMP_CSID_VIDEO
r.audio.MessageTypeID = RTMP_MSG_AUDIO
r.video.MessageTypeID = RTMP_MSG_VIDEO
r.audio.MessageStreamID = r.StreamID
r.video.MessageStreamID = r.StreamID
}
// func (r *RTMPSender) Response(tid uint64, code, level string) error {
// m := new(ResponsePlayMessage)
// m.CommandName = Response_OnStatus
// m.TransactionId = tid
// m.Infomation = map[string]any{
// "code": code,
// "level": level,
// "description": "",
// }
// m.StreamID = r.StreamID
// return r.SendMessage(RTMP_MSG_AMF0_COMMAND, m)
// }
// func (rtmp *RTMPSender) OnEvent(event any) {
// switch v := event.(type) {
// case SEwaitPublish:
// rtmp.Response(1, NetStream_Play_UnpublishNotify, Response_OnStatus)
// case SEpublish:
// rtmp.Response(1, NetStream_Play_PublishNotify, Response_OnStatus)
// case ISubscriber:
//
// case AudioDeConf:
// rtmp.audio.sendSequenceHead(v)
// case VideoDeConf:
// rtmp.video.sendSequenceHead(v)
// case AudioFrame:
// if err := rtmp.audio.sendFrame(v.AVFrame, v.AbsTime); err != nil {
// rtmp.Stop(zap.Error(err))
// }
// case VideoFrame:
// if err := rtmp.video.sendFrame(v.AVFrame, v.AbsTime); err != nil {
// rtmp.Stop(zap.Error(err))
// }
// default:
// rtmp.Subscriber.OnEvent(event)
// }
// }
func (r *RTMPSender) SendAudio(audio *RTMPAudio) error {
return r.audio.sendFrame(&audio.RTMPData)
}
func (r *RTMPSender) SendVideo(video *RTMPVideo) error {
return r.video.sendFrame(&video.RTMPData)
}
func (r *RTMPSender) Response(tid uint64, code, level string) error {
m := new(ResponsePlayMessage)
m.CommandName = Response_OnStatus
m.TransactionId = tid
m.Infomation = map[string]any{
"code": code,
"level": level,
"description": "",
}
m.StreamID = r.StreamID
return r.SendMessage(RTMP_MSG_AMF0_COMMAND, m)
}
type RTMPReceiver struct {
*m7s.Publisher
@@ -140,11 +144,3 @@ func (r *RTMPReceiver) Response(tid uint64, code, level string) error {
m.StreamID = r.StreamID
return r.SendMessage(RTMP_MSG_AMF0_COMMAND, m)
}
func (r *RTMPReceiver) ReceiveAudio(msg *Chunk) {
// r.WriteAudio(nil)
}
func (r *RTMPReceiver) ReceiveVideo(msg *Chunk) {
r.WriteVideo(&RTMPVideo{msg.AVData})
}

View File

@@ -5,6 +5,7 @@ import (
"encoding/binary"
"errors"
"io"
"log/slog"
"net"
"runtime"
"sync/atomic"
@@ -45,13 +46,12 @@ const (
)
type BytesPool struct {
*util.Pool[[]byte]
util.Pool[[]byte]
ItemSize int
}
func (bp *BytesPool) Get(size int) []byte {
if size != bp.ItemSize {
bp.Pool = nil
return make([]byte, size)
}
ret := bp.Pool.Get()
@@ -61,7 +61,16 @@ func (bp *BytesPool) Get(size int) []byte {
return ret
}
func (bp *BytesPool) Put(b []byte) {
if cap(b) != bp.ItemSize {
bp.ItemSize = cap(b)
bp.Clear()
}
bp.Pool.Put(b)
}
type NetConnection struct {
*slog.Logger `json:"-" yaml:"-"`
*bufio.Reader `json:"-" yaml:"-"`
net.Conn `json:"-" yaml:"-"`
bandwidth uint32
@@ -76,7 +85,8 @@ type NetConnection struct {
AppName string
tmpBuf util.Buffer //用来接收/发送小数据,复用内存
chunkHeader util.Buffer
bytePool BytesPool
byteChunkPool BytesPool
byte16Pool BytesPool
writing atomic.Bool // false 可写true 不可写
}
@@ -90,7 +100,6 @@ func NewNetConnection(conn net.Conn) *NetConnection {
bandwidth: RTMP_MAX_CHUNK_SIZE << 3,
tmpBuf: make(util.Buffer, 4),
chunkHeader: make(util.Buffer, 0, 16),
// bytePool: make(util.BytesPool, 17),
}
}
func (conn *NetConnection) ReadFull(buf []byte) (n int, err error) {
@@ -164,9 +173,9 @@ func (conn *NetConnection) readChunk() (msg *Chunk, err error) {
if unRead := msgLen - chunk.AVData.Length; unRead < needRead {
needRead = unRead
}
mem := conn.bytePool.Get(needRead)
mem := conn.byteChunkPool.Get(needRead)
if n, err := conn.ReadFull(mem); err != nil {
conn.bytePool.Put(mem)
conn.byteChunkPool.Put(mem)
return nil, err
} else {
conn.readSeqNum += uint32(n)
@@ -326,8 +335,13 @@ func (conn *NetConnection) SendMessage(t byte, msg RtmpMessage) (err error) {
head.MessageStreamID = sid.GetStreamID()
}
head.WriteTo(RTMP_CHUNK_HEAD_12, &conn.chunkHeader)
for _, chunk := range conn.tmpBuf.Split(conn.WriteChunkSize) {
conn.sendChunk(chunk)
for b := conn.tmpBuf; b.Len() > 0; {
if b.CanReadN(conn.WriteChunkSize) {
conn.sendChunk(b.ReadN(conn.WriteChunkSize))
} else {
conn.sendChunk(b)
break
}
}
return nil
}

View File

@@ -0,0 +1,10 @@
package pkg
type NetStream struct {
*NetConnection
StreamID uint32
}
func (ns *NetStream) Begin() {
ns.SendStreamID(RTMP_USER_STREAM_BEGIN, ns.StreamID)
}

View File

@@ -1,182 +0,0 @@
package pkg
type NetStream struct {
*NetConnection
StreamID uint32
}
func (ns *NetStream) Begin() {
ns.SendStreamID(RTMP_USER_STREAM_BEGIN, ns.StreamID)
}
// type RTMPSubscriber struct {
// RTMPSender
// }
// func (s *RTMPSubscriber) OnEvent(event any) {
// switch event.(type) {
// case engine.SEclose:
// s.Response(0, NetStream_Play_Stop, Level_Status)
// }
// s.RTMPSender.OnEvent(event)
// }
// func (config *RTMPConfig) ServeTCP(conn net.Conn) {
// defer conn.Close()
// logger := RTMPPlugin.Logger.With(zap.String("remote", conn.RemoteAddr().String()))
// senders := make(map[uint32]*RTMPSubscriber)
// receivers := make(map[uint32]*RTMPReceiver)
// var err error
// logger.Info("conn")
// defer func() {
// ze := zap.Error(err)
// logger.Info("conn close", ze)
// for _, sender := range senders {
// sender.Stop(ze)
// }
// for _, receiver := range receivers {
// receiver.Stop(ze)
// }
// }()
// nc := NewNetConnection(conn)
// ctx, cancel := context.WithCancel(engine.Engine)
// defer cancel()
// /* Handshake */
// if err = nc.Handshake(); err != nil {
// logger.Error("handshake", zap.Error(err))
// return
// }
// var msg *Chunk
// var gstreamid uint32
// for {
// if msg, err = nc.RecvMessage(); err == nil {
// if msg.MessageLength <= 0 {
// continue
// }
// switch msg.MessageTypeID {
// case RTMP_MSG_AMF0_COMMAND:
// if msg.MsgData == nil {
// break
// }
// cmd := msg.MsgData.(Commander).GetCommand()
// logger.Debug("recv cmd", zap.String("commandName", cmd.CommandName), zap.Uint32("streamID", msg.MessageStreamID))
// switch cmd := msg.MsgData.(type) {
// case *CallMessage: //connect
// app := cmd.Object["app"] // 客户端要连接到的服务应用名
// objectEncoding := cmd.Object["objectEncoding"] // AMF编码方法
// switch v := objectEncoding.(type) {
// case float64:
// nc.objectEncoding = v
// default:
// nc.objectEncoding = 0
// }
// nc.appName = app.(string)
// logger.Info("connect", zap.String("appName", nc.appName), zap.Float64("objectEncoding", nc.objectEncoding))
// err = nc.SendMessage(RTMP_MSG_ACK_SIZE, Uint32Message(512<<10))
// nc.writeChunkSize = config.ChunkSize
// err = nc.SendMessage(RTMP_MSG_CHUNK_SIZE, Uint32Message(config.ChunkSize))
// err = nc.SendMessage(RTMP_MSG_BANDWIDTH, &SetPeerBandwidthMessage{
// AcknowledgementWindowsize: uint32(512 << 10),
// LimitType: byte(2),
// })
// err = nc.SendStreamID(RTMP_USER_STREAM_BEGIN, 0)
// m := new(ResponseConnectMessage)
// m.CommandName = Response_Result
// m.TransactionId = 1
// m.Properties = map[string]any{
// "fmsVer": "monibuca/" + engine.Engine.Version,
// "capabilities": 31,
// "mode": 1,
// "Author": "dexter",
// }
// m.Infomation = map[string]any{
// "level": Level_Status,
// "code": NetConnection_Connect_Success,
// "objectEncoding": nc.objectEncoding,
// }
// err = nc.SendMessage(RTMP_MSG_AMF0_COMMAND, m)
// case *CommandMessage: // "createStream"
// gstreamid++
// logger.Info("createStream:", zap.Uint32("streamId", gstreamid))
// nc.ResponseCreateStream(cmd.TransactionId, gstreamid)
// case *CURDStreamMessage:
// if stream, ok := receivers[cmd.StreamId]; ok {
// stream.Stop()
// delete(senders, cmd.StreamId)
// }
// case *ReleaseStreamMessage:
// // m := &CommandMessage{
// // CommandName: "releaseStream_error",
// // TransactionId: cmd.TransactionId,
// // }
// // s := engine.Streams.Get(nc.appName + "/" + cmd.StreamName)
// // if s != nil && s.Publisher != nil {
// // if p, ok := s.Publisher.(*RTMPReceiver); ok {
// // // m.CommandName = "releaseStream_result"
// // p.Stop()
// // delete(receivers, p.StreamID)
// // }
// // }
// // err = nc.SendMessage(RTMP_MSG_AMF0_COMMAND, m)
// case *PublishMessage:
// receiver := &RTMPReceiver{
// NetStream: NetStream{
// NetConnection: nc,
// StreamID: cmd.StreamId,
// },
// }
// receiver.SetParentCtx(ctx)
// if !config.KeepAlive {
// receiver.SetIO(conn)
// }
// if RTMPPlugin.Publish(nc.appName+"/"+cmd.PublishingName, receiver) == nil {
// receivers[cmd.StreamId] = receiver
// receiver.Begin()
// err = receiver.Response(cmd.TransactionId, NetStream_Publish_Start, Level_Status)
// } else {
// delete(receivers, cmd.StreamId)
// err = receiver.Response(cmd.TransactionId, NetStream_Publish_BadName, Level_Error)
// }
// case *PlayMessage:
// streamPath := nc.appName + "/" + cmd.StreamName
// sender := &RTMPSubscriber{}
// sender.NetStream = NetStream{
// nc,
// cmd.StreamId,
// }
// sender.SetParentCtx(ctx)
// if !config.KeepAlive {
// sender.SetIO(conn)
// }
// sender.ID = fmt.Sprintf("%s|%d", conn.RemoteAddr().String(), sender.StreamID)
// if RTMPPlugin.Subscribe(streamPath, sender) != nil {
// sender.Response(cmd.TransactionId, NetStream_Play_Failed, Level_Error)
// } else {
// senders[sender.StreamID] = sender
// sender.Begin()
// sender.Response(cmd.TransactionId, NetStream_Play_Reset, Level_Status)
// sender.Response(cmd.TransactionId, NetStream_Play_Start, Level_Status)
// go sender.PlayRaw()
// }
// }
// case RTMP_MSG_AUDIO:
// if r, ok := receivers[msg.MessageStreamID]; ok {
// r.ReceiveAudio(msg)
// } else {
// logger.Warn("ReceiveAudio", zap.Uint32("MessageStreamID", msg.MessageStreamID))
// }
// case RTMP_MSG_VIDEO:
// if r, ok := receivers[msg.MessageStreamID]; ok {
// r.ReceiveVideo(msg)
// } else {
// logger.Warn("ReceiveVideo", zap.Uint32("MessageStreamID", msg.MessageStreamID))
// }
// }
// } else if err == io.EOF || err == io.ErrUnexpectedEOF {
// logger.Info("rtmp client closed")
// return
// } else {
// logger.Warn("ReadMessage", zap.Error(err))
// return
// }
// }
// }

View File

@@ -12,6 +12,10 @@ type RTMPVideo struct {
RTMPData
}
func (avcc *RTMPVideo) IsIDR() bool {
return avcc.Buffers.Buffers[0][0]&0b1111_0000>>4 == 1
}
func (avcc *RTMPVideo) DecodeConfig(track *AVTrack) error {
reader := avcc.Buffers
b0, err := reader.ReadByte()
@@ -32,6 +36,8 @@ func (avcc *RTMPVideo) DecodeConfig(track *AVTrack) error {
ctx.NalulenSize = int(info.LengthSizeMinusOne&3 + 1)
ctx.SPS = info.SequenceParameterSetNALUnit
ctx.PPS = info.PictureParameterSetNALUnit
avcc.IPool = nil
ctx.SequenceFrame = avcc
track.ICodecCtx = &ctx
}
case "h265":
@@ -72,7 +78,7 @@ func (avcc *RTMPVideo) DecodeConfig(track *AVTrack) error {
} else {
track.Codec = "h264"
}
reader.ReadBE(3) // cts == 0
_, err = reader.ReadBE(3) // cts == 0
if err != nil {
return err
}
@@ -117,7 +123,6 @@ func (avcc *RTMPVideo) parseAV1(track *AVTrack, reader *util.Buffers) (any, erro
return obus, nil
}
func (avcc *RTMPVideo) ToRaw(track *AVTrack) (any, error) {
reader := avcc.Buffers
b0, err := reader.ReadByte()

View File

@@ -14,7 +14,9 @@ type Publisher struct {
VideoTrack *AVTrack
AudioTrack *AVTrack
DataTrack *DataTrack
TransTrack map[reflect.Type]*AVTrack
Subscribers map[*Subscriber]struct{}
GOP int
sync.RWMutex
}
@@ -22,33 +24,32 @@ func (p *Publisher) AddSubscriber(subscriber *Subscriber) (err error) {
p.Lock()
defer p.Unlock()
p.Subscribers[subscriber] = struct{}{}
if p.VideoTrack != nil {
subscriber.VideoTrackReader = NewAVRingReader(p.VideoTrack)
}
if p.AudioTrack != nil {
subscriber.AudioTrackReader = NewAVRingReader(p.AudioTrack)
}
subscriber.Publisher = p
return
}
func (p *Publisher) writeAV(t *AVTrack, data IAVFrame) (err error) {
if t.ICodecCtx == nil {
err = data.DecodeConfig(t)
}
t.Ring.Value.Wrap[0] = data
if n := len(t.DataTypes); n > 1 {
t.Ring.Value.Raw, err = data.ToRaw(t)
if err != nil {
return
}
if t.Ring.Value.Raw == nil {
return
}
for i := 1; i < n; i++ {
t.Ring.Value.Wrap[i] = reflect.New(t.DataTypes[i]).Interface().(IAVFrame)
t.Ring.Value.Wrap[i].FromRaw(t, t.Ring.Value.Raw)
}
return data.DecodeConfig(t)
}
t.Ring.Value.Wrap = data
// if n := len(t.DataTypes); n > 1 {
// t.Ring.Value.Raw, err = data.ToRaw(t)
// if err != nil {
// return
// }
// if t.Ring.Value.Raw == nil {
// return
// }
// for i := 1; i < n; i++ {
// if len(t.Ring.Value.Wrap) <= i {
// t.Ring.Value.Wrap = append(t.Ring.Value.Wrap, nil)
// }
// t.Ring.Value.Wrap[i] = reflect.New(t.DataTypes[i]).Interface().(IAVFrame)
// t.Ring.Value.Wrap[i].FromRaw(t, t.Ring.Value.Raw)
// }
// }
t.Step()
return
}
@@ -59,19 +60,26 @@ func (p *Publisher) WriteVideo(data IAVFrame) (err error) {
}
t := p.VideoTrack
if t == nil {
t = &AVTrack{
DataTypes: []reflect.Type{reflect.TypeOf(data)},
}
t = &AVTrack{}
t.Logger = p.Logger.With("track", "video")
t.Init(256)
p.VideoTrack = t
p.Lock()
for sub := range p.Subscribers {
sub.VideoTrackReader = NewAVRingReader(t)
}
p.VideoTrack = t
p.TransTrack[reflect.TypeOf(data)] = t
p.Unlock()
}
return p.writeAV(t, data)
// if t.IDRing != nil {
// p.GOP = int(t.Value.Sequence - t.IDRing.Value.Sequence)
// if t.HistoryRing == nil {
// t.Narrow(p.GOP)
// }
// }
cur := t.Ring
err = p.writeAV(t, data)
if err == nil && data.IsIDR() {
t.AddIDR(cur)
}
return
}
func (p *Publisher) WriteAudio(data IAVFrame) (err error) {
@@ -80,16 +88,12 @@ func (p *Publisher) WriteAudio(data IAVFrame) (err error) {
}
t := p.AudioTrack
if t == nil {
t = &AVTrack{
DataTypes: []reflect.Type{reflect.TypeOf(data)},
}
t = &AVTrack{}
t.Logger = p.Logger.With("track", "audio")
t.Init(256)
p.AudioTrack = t
p.Lock()
for sub := range p.Subscribers {
sub.AudioTrackReader = NewAVRingReader(t)
}
p.AudioTrack = t
p.TransTrack[reflect.TypeOf(data)] = t
p.Unlock()
}
return p.writeAV(t, data)
@@ -98,3 +102,21 @@ func (p *Publisher) WriteAudio(data IAVFrame) (err error) {
func (p *Publisher) WriteData(data IDataFrame) (err error) {
return
}
func (p *Publisher) GetAudioTrack(dataType reflect.Type) (t *AVTrack) {
p.RLock()
defer p.RUnlock()
if t, ok := p.TransTrack[dataType]; ok {
return t
}
return
}
func (p *Publisher) GetVideoTrack(dataType reflect.Type) (t *AVTrack) {
p.RLock()
defer p.RUnlock()
if t, ok := p.TransTrack[dataType]; ok {
return t
}
return
}

View File

@@ -7,8 +7,8 @@ import (
"path/filepath"
"slices"
"strings"
"sync/atomic"
"time"
"unsafe"
"github.com/mcuadros/go-defaults"
"gopkg.in/yaml.v3"
@@ -18,10 +18,11 @@ import (
)
var Version = "v5.0.0"
var MergeConfigs = []string{"Publish", "Subscribe", "HTTP"}
var (
ExecPath = os.Args[0]
ExecDir = filepath.Dir(ExecPath)
MergeConfigs = []string{"Publish", "Subscribe", "HTTP"}
ExecPath = os.Args[0]
ExecDir = filepath.Dir(ExecPath)
serverIndexG atomic.Uint32
)
type Server struct {
@@ -36,7 +37,10 @@ type Server struct {
var DefaultServer = NewServer()
func NewServer() *Server {
return &Server{}
return &Server{
Publishers: make(map[string]*Publisher),
Waiting: make(map[string]*Subscriber),
}
}
func Run(ctx context.Context, conf any) error {
@@ -44,7 +48,7 @@ func Run(ctx context.Context, conf any) error {
}
func (s *Server) Run(ctx context.Context, conf any) (err error) {
s.Logger = slog.With("server", uintptr(unsafe.Pointer(s)))
s.Logger = slog.With("server", serverIndexG.Add(1))
s.Context, s.CancelCauseFunc = context.WithCancelCause(ctx)
s.config.HTTP.ListenAddrTLS = ":8443"
s.config.HTTP.ListenAddr = ":8080"
@@ -72,11 +76,15 @@ func (s *Server) Run(ctx context.Context, conf any) (err error) {
}
}
defaults.SetDefaults(&s.Engine)
defaults.SetDefaults(&s.config)
s.Config.Parse(&s.config)
s.Config.Parse(&s.Engine, "GLOBAL")
if cg != nil {
s.Config.ParseUserFile(cg["global"])
}
var lv slog.LevelVar
lv.UnmarshalText([]byte(s.LogLevel))
slog.SetLogLoggerLevel(lv.Level())
s.initPlugins(cg)
pulse := time.NewTicker(s.PulseInterval)
for {
@@ -88,10 +96,10 @@ func (s *Server) Run(ctx context.Context, conf any) (err error) {
case <-pulse.C:
case event := <-s.eventChan:
switch v := event.(type) {
case util.Promise[*Publisher]:
v.CancelCauseFunc(s.OnPublish(v.Value))
case util.Promise[*Subscriber]:
v.CancelCauseFunc(s.OnSubscribe(v.Value))
case *util.Promise[*Publisher]:
v.Fulfill(s.OnPublish(v.Value))
case *util.Promise[*Subscriber]:
v.Fulfill(s.OnSubscribe(v.Value))
}
for _, plugin := range s.Plugins {
if plugin.Disabled {
@@ -101,7 +109,6 @@ func (s *Server) Run(ctx context.Context, conf any) (err error) {
}
}
}
return
}
func (s *Server) initPlugins(cg map[string]map[string]any) {

View File

@@ -1,24 +1,36 @@
package m7s
import (
"context"
"log/slog"
"net/url"
"reflect"
"strconv"
"time"
. "m7s.live/m7s/v5/pkg"
"m7s.live/m7s/v5/pkg/config"
"m7s.live/m7s/v5/pkg/util"
)
type PubSubBase struct {
*slog.Logger `json:"-" yaml:"-"`
Plugin *Plugin
StartTime time.Time
StreamPath string
Args url.Values
*slog.Logger `json:"-" yaml:"-"`
context.Context `json:"-" yaml:"-"`
context.CancelCauseFunc `json:"-" yaml:"-"`
Plugin *Plugin
StartTime time.Time
StreamPath string
Args url.Values
}
func (ps *PubSubBase) Stop(err error) {
ps.Error(err.Error())
ps.CancelCauseFunc(err)
}
func (ps *PubSubBase) Init(p *Plugin, streamPath string) {
ps.Plugin = p
ps.Context, ps.CancelCauseFunc = context.WithCancelCause(p.Context)
if u, err := url.Parse(streamPath); err == nil {
ps.StreamPath, ps.Args = u.Path, u.Query()
}
@@ -29,6 +41,75 @@ func (ps *PubSubBase) Init(p *Plugin, streamPath string) {
type Subscriber struct {
PubSubBase
config.Subscribe
VideoTrackReader *AVRingReader
AudioTrackReader *AVRingReader
Publisher *Publisher
}
type ISubscriberHandler[T IAVFrame] func(data T)
func (s *Subscriber) Handle(audioHandler, videoHandler any) {
var ar, vr *AVRingReader
var ah, vh reflect.Value
if audioHandler != nil {
a1 := reflect.TypeOf(audioHandler).In(0)
at := s.Publisher.GetAudioTrack(a1)
if at != nil {
ar = NewAVRingReader(at)
ah = reflect.ValueOf(audioHandler)
}
}
if videoHandler != nil {
v1 := reflect.TypeOf(videoHandler).In(0)
vt := s.Publisher.GetVideoTrack(v1)
if vt != nil {
vr = NewAVRingReader(vt)
vh = reflect.ValueOf(videoHandler)
}
}
var initState = 0
var subMode = s.SubMode //订阅模式
if s.Args.Has(s.SubModeArgName) {
subMode, _ = strconv.Atoi(s.Args.Get(s.SubModeArgName))
}
var audioFrame, videoFrame *AVFrame
for err := s.Err(); err == nil; err = s.Err() {
if vr != nil {
for err == nil {
err = vr.ReadFrame(subMode)
if err == nil {
err = s.Err()
}
if err != nil {
s.Stop(err)
// stopReason = zap.Error(err)
return
}
videoFrame = &vr.Value
// fmt.Println("video", s.VideoReader.Track.PreFrame().Sequence-frame.Sequence)
if videoFrame.Wrap.IsIDR() && vr.DecConfChanged() {
vr.LastCodecCtx = vr.Track.ICodecCtx
s.Debug("video codec changed")
vh.Call([]reflect.Value{reflect.ValueOf(vr.Track.ICodecCtx.GetSequenceFrame())})
}
if ar != nil {
if audioFrame != nil {
if util.Conditoinal(s.SyncMode == 0, videoFrame.Timestamp > audioFrame.Timestamp, videoFrame.WriteTime.After(audioFrame.WriteTime)) {
// fmt.Println("switch audio", audioFrame.CanRead)
ah.Call([]reflect.Value{reflect.ValueOf(audioFrame.Wrap)})
audioFrame = nil
break
}
} else if initState++; initState >= 2 {
break
}
}
if !s.IFrameOnly || videoFrame.Wrap.IsIDR() {
vh.Call([]reflect.Value{reflect.ValueOf(videoFrame.Wrap)})
} else {
// fmt.Println("skip video", frame.Sequence)
}
}
}
}
}