This commit is contained in:
langhuihui
2024-03-22 15:43:09 +08:00
parent b2bce81e55
commit 0efbe886c8
29 changed files with 2179 additions and 172 deletions

1
go.mod
View File

@@ -5,6 +5,7 @@ go 1.22
require github.com/quic-go/quic-go v0.42.0
require (
github.com/bluenviron/mediacommon v1.9.2
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect
github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38 // indirect
github.com/logrusorgru/aurora/v4 v4.0.0

2
go.sum
View File

@@ -1,3 +1,5 @@
github.com/bluenviron/mediacommon v1.9.2 h1:EHcvoC5YMXRcFE010bTNf07ZiSlB/e/AdZyG7GsEYN0=
github.com/bluenviron/mediacommon v1.9.2/go.mod h1:lt8V+wMyPw8C69HAqDWV5tsAwzN9u2Z+ca8B6C//+n0=
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=

View File

@@ -1,32 +1,175 @@
package pkg
import (
"net"
"sync"
"sync/atomic"
"time"
"github.com/bluenviron/mediacommon/pkg/codecs/av1"
"m7s.live/m7s/v5/pkg/codec"
"m7s.live/m7s/v5/pkg/util"
)
type AVFrame struct {
DataFrame
}
type DataFrame struct {
DeltaTime uint32 // 相对上一帧时间戳,毫秒
WriteTime time.Time // 写入时间,可用于比较两个帧的先后
Timestamp time.Duration // 绝对时间戳
Sequence uint32 // 在一个Track中的序号
BytesIn int // 输入字节数用于计算BPS
CanRead bool `json:"-" yaml:"-"` // 是否可读取
readerCount atomic.Int32 `json:"-" yaml:"-"` // 读取者数量
Raw any `json:"-" yaml:"-"` // 裸格式
Wrap []IAVFrame `json:"-" yaml:"-"` // 封装格式
sync.Cond `json:"-" yaml:"-"`
}
func NewDataFrame[T any]() *DataFrame {
return &DataFrame{}
}
func (df *DataFrame) IsWriting() bool {
return !df.CanRead
}
func (df *DataFrame) IsDiscarded() bool {
return df.L == nil
}
func (df *DataFrame) Discard() int32 {
df.L = nil //标记为废弃
return df.readerCount.Load()
}
func (df *DataFrame) SetSequence(sequence uint32) {
df.Sequence = sequence
}
func (df *DataFrame) GetSequence() uint32 {
return df.Sequence
}
func (df *DataFrame) ReaderEnter() int32 {
return df.readerCount.Add(1)
}
func (df *DataFrame) ReaderCount() int32 {
return df.readerCount.Load()
}
func (df *DataFrame) ReaderLeave() int32 {
return df.readerCount.Add(-1)
}
func (df *DataFrame) StartWrite() bool {
if df.readerCount.Load() > 0 {
df.Discard() //标记为废弃
return false
} else {
df.CanRead = false //标记为正在写入
return true
}
}
func (df *DataFrame) Ready() {
df.WriteTime = time.Now()
df.CanRead = true //标记为可读取
df.Broadcast()
}
func (df *DataFrame) Init() {
df.L = EmptyLocker
}
func (df *DataFrame) Reset() {
df.BytesIn = 0
df.DeltaTime = 0
}
type CodecCtx struct {
}
type IRaw interface {
}
type IVideoData interface {
ToRaw(*CodecCtx) IRaw
FromRaw(*CodecCtx, IRaw)
func Update(ICodecCtx) {
}
type IAudioData interface {
ToRaw(*CodecCtx) IRaw
FromRaw(*CodecCtx, IRaw)
type VideoCodecCtx struct {
CodecCtx
NalulenSize int
SPSInfo codec.SPSInfo
SequenceData net.Buffers
}
type IData interface {
type AudioCodecCtx struct {
CodecCtx
}
type H264Nalu struct {
type ICodecCtx interface {
}
type IDataFrame interface {
}
type IAVFrame interface {
DecodeConfig(*AVTrack) error
ToRaw(*AVTrack) (any, error)
FromRaw(*AVTrack, any) error
}
func (nalu *H264Nalu) ToRaw(ctx *CodecCtx) IRaw {
return nalu
type Nalu [][]byte
type Nalus struct {
PTS time.Duration
DTS time.Duration
Nalus []Nalu
}
func (nalu *H264Nalu) FromRaw(ctx *CodecCtx, raw IRaw) {
*nalu = *raw.(*H264Nalu)
func (nalus *Nalus) Append(bytes ...[]byte) {
nalus.Nalus = append(nalus.Nalus, Nalu(bytes))
}
type H265Nalu struct {
func (nalus *Nalus) ParseAVCC(reader *util.Buffers, naluSizeLen int) error {
for reader.Length > 0 {
l, err := reader.ReadBE(naluSizeLen)
if err != nil {
return err
}
nalu, err := reader.ReadBytes(int(l))
if err != nil {
return err
}
nalus.Append(nalu)
}
return nil
}
type OBUs struct {
PTS time.Duration
OBUs []net.Buffers
}
func (obus *OBUs) Append(bytes ...[]byte) {
obus.OBUs = append(obus.OBUs, bytes)
}
func (obus *OBUs) ParseAVCC(reader *util.Buffers) error {
var obuHeader av1.OBUHeader
for reader.Length > 0 {
offset := reader.Offset
b, _ := reader.ReadByte()
obuHeader.Unmarshal([]byte{b})
// if log.Trace {
// vt.Trace("obu", zap.Any("type", obuHeader.Type), zap.Bool("iframe", vt.Value.IFrame))
// }
obuSize, _, _ := reader.LEB128Unmarshal()
end := reader.Offset
size := end - offset + int(obuSize)
reader = &util.Buffers{Buffers: reader.Buffers}
reader.Skip(offset)
obu, err := reader.ReadBytes(size)
if err != nil {
return err
}
obus.Append(obu)
}
return nil
}

59
pkg/codec/video.go Normal file
View File

@@ -0,0 +1,59 @@
package codec
type SPSInfo struct {
ProfileIdc uint
LevelIdc uint
MbWidth uint
MbHeight uint
CropLeft uint
CropRight uint
CropTop uint
CropBottom uint
Width uint
Height uint
}
type AudioCodecID byte
type VideoCodecID byte
const (
ADTS_HEADER_SIZE = 7
CodecID_AAC AudioCodecID = 0xA
CodecID_PCMA AudioCodecID = 7
CodecID_PCMU AudioCodecID = 8
CodecID_OPUS AudioCodecID = 0xC
CodecID_H264 VideoCodecID = 7
CodecID_H265 VideoCodecID = 0xC
CodecID_AV1 VideoCodecID = 0xD
)
func (codecId AudioCodecID) String() string {
switch codecId {
case CodecID_AAC:
return "aac"
case CodecID_PCMA:
return "pcma"
case CodecID_PCMU:
return "pcmu"
case CodecID_OPUS:
return "opus"
}
return "unknow"
}
func (codecId VideoCodecID) String() string {
switch codecId {
case CodecID_H264:
return "h264"
case CodecID_H265:
return "h265"
case CodecID_AV1:
return "av1"
}
return "unknow"
}

View File

@@ -1,14 +1,9 @@
package config
import (
"context"
"crypto/tls"
"log/slog"
"net/http"
"time"
"github.com/logrusorgru/aurora/v4"
"golang.org/x/sync/errgroup"
)
var _ HTTPConfig = (*HTTP)(nil)
@@ -26,11 +21,12 @@ type HTTP struct {
WriteTimeout time.Duration `desc:"写入超时"`
IdleTimeout time.Duration `desc:"空闲超时"`
mux *http.ServeMux
server *http.Server
serverTLS *http.Server
middlewares []Middleware
}
type HTTPConfig interface {
GetHTTPConfig() *HTTP
Listen(ctx context.Context) error
Handle(string, http.Handler)
Handler(*http.Request) (http.Handler, string)
AddMiddleware(Middleware)
@@ -64,66 +60,57 @@ func (config *HTTP) Handler(r *http.Request) (h http.Handler, pattern string) {
return config.mux.Handler(r)
}
// ListenAddrs Listen http and https
func (config *HTTP) Listen(ctx context.Context) error {
if config.mux == nil {
return nil
func (config *HTTP) StopListen() {
if config.server != nil {
config.server.Close()
}
var g errgroup.Group
if config.ListenAddrTLS != "" && (config == &Global.HTTP || config.ListenAddrTLS != Global.ListenAddrTLS) {
g.Go(func() error {
slog.Info("🌐 https listen at ", "addr", aurora.Blink(config.ListenAddrTLS))
cer, _ := tls.X509KeyPair(LocalCert, LocalKey)
var server = http.Server{
Addr: config.ListenAddrTLS,
ReadTimeout: config.ReadTimeout,
WriteTimeout: config.WriteTimeout,
IdleTimeout: config.IdleTimeout,
Handler: config.mux,
TLSConfig: &tls.Config{
Certificates: []tls.Certificate{cer},
CipherSuites: []uint16{
tls.TLS_AES_128_GCM_SHA256,
tls.TLS_CHACHA20_POLY1305_SHA256,
tls.TLS_AES_256_GCM_SHA384,
//tls.TLS_RSA_WITH_AES_128_CBC_SHA,
//tls.TLS_RSA_WITH_AES_256_CBC_SHA,
//tls.TLS_RSA_WITH_AES_128_GCM_SHA256,
//tls.TLS_RSA_WITH_AES_256_GCM_SHA384,
tls.TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305,
tls.TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305,
tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,
tls.TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256,
tls.TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA,
tls.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA256,
tls.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA,
tls.TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA,
tls.TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA,
},
},
}
return server.ListenAndServeTLS(config.CertFile, config.KeyFile)
})
if config.serverTLS != nil {
config.serverTLS.Close()
}
if config.ListenAddr != "" && (config == &Global.HTTP || config.ListenAddr != Global.ListenAddr) {
g.Go(func() error {
slog.Info("🌐 http listen at ", "addr", aurora.Blink(config.ListenAddr))
var server = http.Server{
Addr: config.ListenAddr,
ReadTimeout: config.ReadTimeout,
WriteTimeout: config.WriteTimeout,
IdleTimeout: config.IdleTimeout,
Handler: config.mux,
}
return server.ListenAndServe()
})
}
g.Go(func() error {
<-ctx.Done()
return ctx.Err()
})
return g.Wait()
}
func (config *HTTP) ListenTLS() error {
cer, _ := tls.X509KeyPair(LocalCert, LocalKey)
config.serverTLS = &http.Server{
Addr: config.ListenAddrTLS,
ReadTimeout: config.ReadTimeout,
WriteTimeout: config.WriteTimeout,
IdleTimeout: config.IdleTimeout,
Handler: config.mux,
TLSConfig: &tls.Config{
Certificates: []tls.Certificate{cer},
CipherSuites: []uint16{
tls.TLS_AES_128_GCM_SHA256,
tls.TLS_CHACHA20_POLY1305_SHA256,
tls.TLS_AES_256_GCM_SHA384,
//tls.TLS_RSA_WITH_AES_128_CBC_SHA,
//tls.TLS_RSA_WITH_AES_256_CBC_SHA,
//tls.TLS_RSA_WITH_AES_128_GCM_SHA256,
//tls.TLS_RSA_WITH_AES_256_GCM_SHA384,
tls.TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305,
tls.TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305,
tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,
tls.TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256,
tls.TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA,
tls.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA256,
tls.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA,
tls.TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA,
tls.TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA,
},
},
}
return config.serverTLS.ListenAndServeTLS(config.CertFile, config.KeyFile)
}
func (config *HTTP) Listen() error {
config.server = &http.Server{
Addr: config.ListenAddr,
ReadTimeout: config.ReadTimeout,
WriteTimeout: config.WriteTimeout,
IdleTimeout: config.IdleTimeout,
Handler: config.mux,
}
return config.server.ListenAndServe()
}

View File

@@ -1,11 +1,9 @@
package config
import (
"context"
"crypto/tls"
_ "embed"
"net"
"runtime"
"time"
)
@@ -15,12 +13,6 @@ var LocalCert []byte
//go:embed local.monibuca.com.key
var LocalKey []byte
var _ TCPConfig = (*TCP)(nil)
type TCPConfig interface {
ListenTCP(context.Context, TCPPlugin) error
}
type TCP struct {
ListenAddr string `desc:"监听地址格式为ip:portip 可省略默认监听所有网卡"`
ListenAddrTLS string `desc:"监听地址格式为ip:portip 可省略默认监听所有网卡"`
@@ -28,14 +20,15 @@ type TCP struct {
KeyFile string `desc:"私钥文件"`
ListenNum int `desc:"同时并行监听数量0为CPU核心数量"` //同时并行监听数量0为CPU核心数量
NoDelay bool `desc:"是否禁用Nagle算法"` //是否禁用Nagle算法
KeepAlive bool `desc:"是否启用KeepAlive"` //是否启用KeepAlive
}
func (tcp *TCP) listen(l net.Listener, handler func(net.Conn)) {
func (tcp *TCP) Listen(l net.Listener, handler func(*net.TCPConn)) {
var tempDelay time.Duration
for {
conn, err := l.Accept()
if err != nil {
if ne, ok := err.(net.Error); ok && ne.Temporary() {
if ne, ok := err.(net.Error); ok && !ne.Timeout() {
if tempDelay == 0 {
tempDelay = 5 * time.Millisecond
} else {
@@ -61,52 +54,6 @@ func (tcp *TCP) listen(l net.Listener, handler func(net.Conn)) {
tcpConn.SetNoDelay(false)
}
tempDelay = 0
go handler(conn)
go handler(tcpConn)
}
}
func (tcp *TCP) ListenTCP(ctx context.Context, plugin TCPPlugin) error {
l, err := net.Listen("tcp", tcp.ListenAddr)
if err != nil {
// if Global.LogLang == "zh" {
// slog.Fatalf("%s: 监听失败: %v", tcp.ListenAddr, err)
// } else {
// slog.Fatalf("%s: Listen error: %v", tcp.ListenAddr, err)
// }
return err
}
count := tcp.ListenNum
if count == 0 {
count = runtime.NumCPU()
}
// slog.Infof("tcp listen %d at %s", count, tcp.ListenAddr)
for i := 0; i < count; i++ {
go tcp.listen(l, plugin.ServeTCP)
}
if tcp.ListenAddrTLS != "" {
keyPair, _ := tls.X509KeyPair(LocalCert, LocalKey)
if tcp.CertFile != "" || tcp.KeyFile != "" {
keyPair, err = tls.LoadX509KeyPair(tcp.CertFile, tcp.KeyFile)
}
if err != nil {
// slog.Error("LoadX509KeyPair", "error", err)
return err
}
l, err = tls.Listen("tcp", tcp.ListenAddrTLS, &tls.Config{
Certificates: []tls.Certificate{keyPair},
})
if err != nil {
// if Global.LogLang == "zh" {
// slog.Fatalf("%s: 监听失败: %v", tcp.ListenAddrTLS, err)
// } else {
// slog.Fatalf("%s: Listen error: %v", tcp.ListenAddrTLS, err)
// }
return err
}
// slog.Infof("tls tcp listen %d at %s", count, tcp.ListenAddrTLS)
for i := 0; i < count; i++ {
go tcp.listen(l, plugin.ServeTCP)
}
}
<-ctx.Done()
return l.Close()
}

View File

@@ -2,7 +2,6 @@ package config
import (
"fmt"
"net/http"
"regexp"
"strings"
"sync"
@@ -177,7 +176,6 @@ type Engine struct {
Publish
Subscribe
HTTP
Console
EnableAVCC bool `default:"true" desc:"启用AVCC格式rtmp、http-flv协议使用"` //启用AVCC格式rtmp、http-flv协议使用
EnableRTP bool `default:"true" desc:"启用RTP格式rtsp、webrtc等协议使用"` //启用RTP格式rtsp、webrtc等协议使用
EnableSubEvent bool `default:"true" desc:"启用订阅事件,禁用可以提高性能"` //启用订阅事件,禁用可以提高性能
@@ -190,12 +188,3 @@ type Engine struct {
RTPReorderBufferLen int `default:"50" desc:"RTP重排序缓冲区长度"` //RTP重排序缓冲区长度
PoolSize int `desc:"内存池大小"` //内存池大小
}
var Global *Engine
func (cfg *Engine) InitDefaultHttp() {
Global = cfg
cfg.HTTP.mux = http.NewServeMux()
cfg.HTTP.ListenAddrTLS = ":8443"
cfg.HTTP.ListenAddr = ":8080"
}

9
pkg/error.go Normal file
View File

@@ -0,0 +1,9 @@
package pkg
import "errors"
var (
ErrStreamExist = errors.New("stream exist")
ErrKick = errors.New("kick")
ErrDiscard = errors.New("discard")
)

View File

@@ -2,9 +2,10 @@ package pkg
// EventBus is a simple event bus
type EventBus chan any
// NewEventBus creates a new EventBus
func NewEventBus() EventBus {
return make(chan any)
func NewEventBus(size int) EventBus {
return make(chan any, size)
}
// // Publish publishes an event

141
pkg/reader.go Normal file
View File

@@ -0,0 +1,141 @@
package pkg
import (
"log/slog"
"time"
)
const (
READSTATE_INIT = iota
READSTATE_FIRST
READSTATE_NORMAL
)
const (
SUBMODE_REAL = iota
SUBMODE_NOJUMP
SUBMODE_BUFFER
)
type AVRingReader struct {
RingReader
mode int
Track *AVTrack
State byte
FirstSeq uint32
StartTs time.Duration
FirstTs time.Duration
SkipTs time.Duration //ms
beforeJump time.Duration
LastCodecCtx ICodecCtx
startTime time.Time
AbsTime uint32
Delay uint32
*slog.Logger
}
func (r *AVRingReader) DecConfChanged() bool {
return r.LastCodecCtx != r.Track.ICodecCtx
}
func NewAVRingReader(t *AVTrack) *AVRingReader {
// t.Debug("reader +1", zap.Int32("count", t.ReaderCount.Add(1)))
return &AVRingReader{
Track: t,
}
}
func (r *AVRingReader) readFrame() (err error) {
err = r.ReadNext()
if err != nil {
return err
}
// 超过一半的缓冲区大小说明Reader太慢需要丢帧
if r.mode != SUBMODE_BUFFER && r.State == READSTATE_NORMAL && r.Track.LastValue.Sequence-r.Value.Sequence > uint32(r.Track.Size/2) && r.Track.IDRing != nil && r.Track.IDRing.Value.Sequence > r.Value.Sequence {
// r.Warn("reader too slow", zap.Uint32("lastSeq", r.Track.LastValue.Sequence), zap.Uint32("seq", r.Value.Sequence))
return r.Read(r.Track.IDRing)
}
return
}
func (r *AVRingReader) ReadFrame(mode int) (err error) {
r.mode = mode
switch r.State {
case READSTATE_INIT:
// r.Info("start read", zap.Int("mode", mode))
startRing := r.Track.Ring
if r.Track.IDRing != nil {
startRing = r.Track.IDRing
} else {
// r.Warn("no IDRring")
}
switch mode {
case SUBMODE_REAL:
if r.Track.IDRing != nil {
r.State = READSTATE_FIRST
} else {
r.State = READSTATE_NORMAL
}
case SUBMODE_NOJUMP:
r.State = READSTATE_NORMAL
case SUBMODE_BUFFER:
if r.Track.HistoryRing != nil {
startRing = r.Track.HistoryRing
}
r.State = READSTATE_NORMAL
}
if err = r.StartRead(startRing); err != nil {
return
}
r.startTime = time.Now()
if r.FirstTs == 0 {
r.FirstTs = r.Value.Timestamp
}
r.SkipTs = r.FirstTs - r.StartTs
r.FirstSeq = r.Value.Sequence
// r.Info("first frame read", zap.Duration("firstTs", r.FirstTs), zap.Uint32("firstSeq", r.FirstSeq))
case READSTATE_FIRST:
if r.Track.IDRing.Value.Sequence != r.FirstSeq {
if err = r.Read(r.Track.IDRing); err != nil {
return
}
r.SkipTs = r.Value.Timestamp - r.beforeJump - r.StartTs - 10*time.Millisecond
// r.Info("jump", zap.Uint32("skipSeq", r.Track.IDRing.Value.Sequence-r.FirstSeq), zap.Duration("skipTs", r.SkipTs))
r.State = READSTATE_NORMAL
} else {
if err = r.readFrame(); err != nil {
return
}
r.beforeJump = r.Value.Timestamp - r.FirstTs
// 防止过快消费
if fast := r.beforeJump - time.Since(r.startTime); fast > 0 && fast < time.Second {
time.Sleep(fast)
}
}
case READSTATE_NORMAL:
if err = r.readFrame(); err != nil {
return
}
}
r.AbsTime = uint32((r.Value.Timestamp - r.SkipTs).Milliseconds())
if r.AbsTime == 0 {
r.AbsTime = 1
}
// r.Delay = uint32((r.Track.LastValue.Timestamp - r.Value.Timestamp).Milliseconds())
r.Delay = uint32(r.Track.LastValue.Sequence - r.Value.Sequence)
// fmt.Println(r.Track.Name, r.Delay)
// fmt.Println(r.Track.Name, r.Value.Sequence, r.Delay, r.AbsTime)
return
}
// func (r *AVRingReader) GetPTS32() uint32 {
// return uint32((r.Value.Raw.PTS - r.SkipTs*90/time.Millisecond))
// }
// func (r *AVRingReader) GetDTS32() uint32 {
// return uint32((r.Value.DTS - r.SkipTs*90/time.Millisecond))
// }
func (r *AVRingReader) ResetAbsTime() {
r.SkipTs = r.Value.Timestamp
r.AbsTime = 1
}

64
pkg/ring-reader.go Normal file
View File

@@ -0,0 +1,64 @@
package pkg
import "m7s.live/m7s/v5/pkg/util"
type RingReader struct {
*util.Ring[AVFrame]
Count int // 读取的帧数
}
func (r *RingReader) StartRead(ring *util.Ring[AVFrame]) (err error) {
r.Ring = ring
if r.Value.IsDiscarded() {
return ErrDiscard
}
if r.Value.IsWriting() {
// t := time.Now()
r.Value.Wait()
// log.Info("wait", time.Since(t))
}
r.Count++
r.Value.ReaderEnter()
return
}
func (r *RingReader) TryRead() (f *AVFrame, err error) {
if r.Count > 0 {
preValue := &r.Value
if preValue.IsDiscarded() {
preValue.ReaderLeave()
err = ErrDiscard
return
}
if r.Next().Value.IsWriting() {
return
}
defer preValue.ReaderLeave()
r.Ring = r.Next()
} else {
if r.Value.IsWriting() {
return
}
}
if r.Value.IsDiscarded() {
err = ErrDiscard
return
}
r.Count++
f = &r.Value
r.Value.ReaderEnter()
return
}
func (r *RingReader) ReadNext() (err error) {
return r.Read(r.Next())
}
func (r *RingReader) Read(ring *util.Ring[AVFrame]) (err error) {
preValue := &r.Value
defer preValue.ReaderLeave()
if preValue.IsDiscarded() {
return ErrDiscard
}
return r.StartRead(ring)
}

116
pkg/ring-writer.go Normal file
View File

@@ -0,0 +1,116 @@
package pkg
import (
"sync/atomic"
"m7s.live/m7s/v5/pkg/util"
)
type emptyLocker struct{}
func (emptyLocker) Lock() {}
func (emptyLocker) Unlock() {}
var EmptyLocker emptyLocker
// type IDataFrame interface {
// Reset() // 重置数据,复用内存
// Ready() // 标记为可读取
// ReaderEnter() int32 // 读取者数量+1
// ReaderLeave() int32 // 读取者数量-1
// StartWrite() bool // 开始写入
// SetSequence(uint32) // 设置序号
// GetSequence() uint32 // 获取序号
// ReaderCount() int32 // 读取者数量
// Discard() int32 // 如果写入时还有读取者没有离开则废弃该帧剥离RingBuffer防止并发读写
// IsDiscarded() bool // 是否已废弃
// IsWriting() bool // 是否正在写入
// Wait() // 阻塞等待可读取
// Broadcast() // 广播可读取
// }
type RingWriter struct {
*util.Ring[AVFrame] `json:"-" yaml:"-"`
ReaderCount atomic.Int32 `json:"-" yaml:"-"`
pool *util.Ring[AVFrame]
poolSize int
Size int
LastValue *AVFrame
}
func (rb *RingWriter) Init(n int) *RingWriter {
rb.Ring = util.NewRing[AVFrame](n)
rb.Size = n
rb.LastValue = &rb.Value
return rb
}
func (rb *RingWriter) Glow(size int) (newItem *util.Ring[AVFrame]) {
if size < rb.poolSize {
newItem = rb.pool.Unlink(size)
rb.poolSize -= size
} else if size == rb.poolSize {
newItem = rb.pool
rb.poolSize = 0
rb.pool = nil
} else {
newItem = util.NewRing[AVFrame](size - rb.poolSize).Link(rb.pool)
rb.poolSize = 0
rb.pool = nil
}
rb.Link(newItem)
rb.Size += size
return
}
func (rb *RingWriter) Recycle(r *util.Ring[AVFrame]) {
rb.poolSize++
r.Value.Reset()
if rb.pool == nil {
rb.pool = r
} else {
rb.pool.Link(r)
}
}
func (rb *RingWriter) Reduce(size int) {
r := rb.Unlink(size)
if size > 1 {
for p := r.Next(); p != r; {
next := p.Next() //先保存下一个节点
if p.Value.Discard() == 0 {
rb.Recycle(p.Prev().Unlink(1))
} else {
// fmt.Println("Reduce", p.Value.ReaderCount())
}
p = next
}
}
if r.Value.Discard() == 0 {
rb.Recycle(r)
}
rb.Size -= size
return
}
func (rb *RingWriter) Step() (normal bool) {
rb.LastValue.Broadcast() // 防止订阅者还在等待
rb.LastValue = &rb.Value
nextSeq := rb.LastValue.GetSequence() + 1
next := rb.Next()
if normal = next.Value.StartWrite(); normal {
next.Value.Reset()
rb.Ring = next
} else {
rb.Reduce(1) //抛弃还有订阅者的节点
rb.Ring = rb.Glow(1) //补充一个新节点
rb.Value.StartWrite()
}
rb.Value.SetSequence(nextSeq)
rb.LastValue.Ready()
return
}
func (rb *RingWriter) GetReaderCount() int32 {
return rb.ReaderCount.Load()
}

45
pkg/track.go Normal file
View File

@@ -0,0 +1,45 @@
package pkg
import (
"log/slog"
"reflect"
"slices"
"m7s.live/m7s/v5/pkg/util"
)
type Track struct {
*slog.Logger `json:"-" yaml:"-"`
}
type DataTrack struct {
Track
}
type IDRingList struct {
IDRList []*util.Ring[AVFrame]
IDRing *util.Ring[AVFrame]
HistoryRing *util.Ring[AVFrame]
}
func (p *IDRingList) AddIDR(IDRing *util.Ring[AVFrame]) {
p.IDRList = append(p.IDRList, IDRing)
p.IDRing = IDRing
}
func (p *IDRingList) ShiftIDR() {
p.IDRList = slices.Delete(p.IDRList, 0, 1)
p.HistoryRing = p.IDRList[0]
}
type AVTrack struct {
Codec string
Track
RingWriter
IDRingList `json:"-" yaml:"-"` //最近的关键帧位置,首屏渲染
ICodecCtx
SSRC uint32
SampleRate uint32
DataTypes []reflect.Type `json:"-" yaml:"-"`
PayloadType byte
}

302
pkg/util/buffer.go Normal file
View File

@@ -0,0 +1,302 @@
package util
import (
"encoding/binary"
"fmt"
"io"
"math"
"net"
)
type Integer interface {
~int | ~int16 | ~int32 | ~int64 | ~uint | ~uint16 | ~uint32 | ~uint64
}
func PutBE[T Integer](b []byte, num T) []byte {
for i, n := 0, len(b); i < n; i++ {
b[i] = byte(num >> ((n - i - 1) << 3))
}
return b
}
func ReadBE[T Integer](b []byte) (num T) {
num = 0
for i, n := 0, len(b); i < n; i++ {
num += T(b[i]) << ((n - i - 1) << 3)
}
return
}
func GetBE[T Integer](b []byte, num *T) T {
*num = 0
for i, n := 0, len(b); i < n; i++ {
*num += T(b[i]) << ((n - i - 1) << 3)
}
return *num
}
// Buffer 用于方便自动扩容的内存写入,已经读取
type Buffer []byte
// ReuseBuffer 重用buffer内容可能会被覆盖要尽早复制
type ReuseBuffer struct {
Buffer
}
func (ReuseBuffer) Reuse() bool {
return true
}
// LimitBuffer 限制buffer的长度不会改变原来的buffer防止内存泄漏
type LimitBuffer struct {
Buffer
}
func (b *LimitBuffer) ReadN(n int) (result LimitBuffer) {
result.Buffer = b.Buffer.ReadN(n)
return
}
func (b LimitBuffer) Clone() (result LimitBuffer) {
result.Buffer = b.Buffer.Clone()
return
}
func (b LimitBuffer) SubBuf(start int, length int) (result LimitBuffer) {
result.Buffer = b.Buffer.SubBuf(start, length)
return
}
func (b *LimitBuffer) Malloc(count int) (result LimitBuffer) {
l := b.Len()
newL := l + count
if c := b.Cap(); newL > c {
panic(fmt.Sprintf("LimitBuffer Malloc %d > %d", newL, c))
} else {
*b = b.SubBuf(0, newL)
}
return b.SubBuf(l, count)
}
func (b *LimitBuffer) Write(a []byte) (n int, err error) {
l := b.Len()
newL := l + len(a)
if c := b.Cap(); newL > c {
return 0, fmt.Errorf("LimitBuffer Write %d > %d", newL, c)
// panic(fmt.Sprintf("LimitBuffer Write %d > %d", newL, c))
} else {
b.Buffer = b.Buffer.SubBuf(0, newL)
copy(b.Buffer[l:], a)
}
return len(a), nil
}
// IBytes 用于区分传入的内存是否是复用内存,例如从网络中读取的数据,如果是复用内存,需要尽早复制
type IBytes interface {
Len() int
Bytes() []byte
Reuse() bool
}
type IBuffer interface {
Len() int
Bytes() []byte
Reuse() bool
SubBuf(start int, length int) Buffer
Malloc(count int) Buffer
Reset()
WriteUint32(v uint32)
WriteUint24(v uint32)
WriteUint16(v uint16)
WriteFloat64(v float64)
WriteByte(v byte)
WriteString(a string)
Write(a []byte) (n int, err error)
ReadN(n int) Buffer
ReadFloat64() float64
ReadUint64() uint64
ReadUint32() uint32
ReadUint24() uint32
ReadUint16() uint16
ReadByte() byte
Read(buf []byte) (n int, err error)
Clone() Buffer
CanRead() bool
CanReadN(n int) bool
Cap() int
}
func (Buffer) Reuse() bool {
return false
}
func (b *Buffer) Read(buf []byte) (n int, err error) {
if !b.CanReadN(len(buf)) {
copy(buf, *b)
return b.Len(), io.EOF
}
ret := b.ReadN(len(buf))
copy(buf, ret)
return len(ret), err
}
func (b *Buffer) ReadN(n int) Buffer {
l := b.Len()
r := (*b)[:n]
*b = (*b)[n:l]
return r
}
func (b *Buffer) ReadFloat64() float64 {
return math.Float64frombits(b.ReadUint64())
}
func (b *Buffer) ReadUint64() uint64 {
return binary.BigEndian.Uint64(b.ReadN(8))
}
func (b *Buffer) ReadUint32() uint32 {
return binary.BigEndian.Uint32(b.ReadN(4))
}
func (b *Buffer) ReadUint24() uint32 {
return ReadBE[uint32](b.ReadN(3))
}
func (b *Buffer) ReadUint16() uint16 {
return binary.BigEndian.Uint16(b.ReadN(2))
}
func (b *Buffer) ReadByte() byte {
return b.ReadN(1)[0]
}
func (b *Buffer) WriteFloat64(v float64) {
PutBE(b.Malloc(8), math.Float64bits(v))
}
func (b *Buffer) WriteUint32(v uint32) {
binary.BigEndian.PutUint32(b.Malloc(4), v)
}
func (b *Buffer) WriteUint24(v uint32) {
PutBE(b.Malloc(3), v)
}
func (b *Buffer) WriteUint16(v uint16) {
binary.BigEndian.PutUint16(b.Malloc(2), v)
}
func (b *Buffer) WriteByte(v byte) {
b.Malloc(1)[0] = v
}
func (b *Buffer) WriteString(a string) {
*b = append(*b, a...)
}
func (b *Buffer) Write(a []byte) (n int, err error) {
l := b.Len()
newL := l + len(a)
if newL > b.Cap() {
*b = append(*b, a...)
} else {
*b = b.SubBuf(0, newL)
copy((*b)[l:], a)
}
return len(a), nil
}
func (b Buffer) Clone() (result Buffer) {
return append(result, b...)
}
func (b Buffer) Bytes() []byte {
return b
}
func (b Buffer) Len() int {
return len(b)
}
func (b Buffer) CanRead() bool {
return b.CanReadN(1)
}
func (b Buffer) CanReadN(n int) bool {
return b.Len() >= n
}
func (b Buffer) Cap() int {
return cap(b)
}
func (b Buffer) SubBuf(start int, length int) Buffer {
return b[start : start+length]
}
// Malloc 扩大原来的buffer的长度返回新增的buffer
func (b *Buffer) Malloc(count int) Buffer {
l := b.Len()
newL := l + count
if newL > b.Cap() {
n := make(Buffer, newL)
copy(n, *b)
*b = n
} else {
*b = b.SubBuf(0, newL)
}
return b.SubBuf(l, count)
}
// Relloc 改变 buffer 到指定大小
func (b *Buffer) Relloc(count int) {
b.Reset()
b.Malloc(count)
}
func (b *Buffer) Reset() {
*b = b.SubBuf(0, 0)
}
func (b *Buffer) Split(n int) (result net.Buffers) {
origin := *b
for {
if b.CanReadN(n) {
result = append(result, b.ReadN(n))
} else {
result = append(result, *b)
*b = origin
return
}
}
}
// ConcatBuffers 合并碎片内存为一个完整内存
func ConcatBuffers[T ~[]byte](input []T) (out []byte) {
for _, v := range input {
out = append(out, v...)
}
return
}
// SizeOfBuffers 计算Buffers的内容长度
func SizeOfBuffers[T ~[]byte](buf []T) (size int) {
for _, b := range buf {
size += len(b)
}
return
}
// SplitBuffers 按照一定大小分割 Buffers
func SplitBuffers[T ~[]byte](buf []T, size int) (result [][]T) {
buf = append([]T(nil), buf...)
for total := SizeOfBuffers(buf); total > 0; {
if total <= size {
return append(result, buf)
} else {
var before []T
sizeOfBefore := 0
for _, b := range buf {
need := size - sizeOfBefore
if lenOfB := len(b); lenOfB > need {
before = append(before, b[:need])
result = append(result, before)
total -= need
buf[0] = b[need:]
break
} else {
sizeOfBefore += lenOfB
before = append(before, b)
total -= lenOfB
buf = buf[1:]
}
}
}
}
return
}

18
pkg/util/buffer_test.go Normal file
View File

@@ -0,0 +1,18 @@
package util
import (
"testing"
)
func TestBuffer(t *testing.T) {
t.Run(t.Name(), func(t *testing.T) {
var b Buffer
t.Log(b == nil)
b.Write([]byte{1, 2, 3})
if b == nil {
t.Fail()
} else {
t.Logf("b:% x", b)
}
})
}

131
pkg/util/buffers.go Normal file
View File

@@ -0,0 +1,131 @@
package util
import (
"io"
"net"
)
type Buffers struct {
Offset int
offset0 int
offset1 int
Length int
net.Buffers
}
func NewBuffersFromBytes(b ...[]byte) *Buffers {
return NewBuffers(net.Buffers(b))
}
func NewBuffers(buffers net.Buffers) *Buffers {
ret := &Buffers{Buffers: buffers}
for _, level0 := range buffers {
ret.Length += len(level0)
}
return ret
}
func (buffers *Buffers) ReadFromBytes(b ...[]byte) {
buffers.Buffers = append(buffers.Buffers, b...)
for _, level0 := range b {
buffers.Length += len(level0)
}
}
func (buffers *Buffers) ReadByteTo(b ...*byte) (err error) {
for i := range b {
if buffers.Length == 0 {
return io.EOF
}
*b[i], err = buffers.ReadByte()
if err != nil {
return
}
}
return
}
func (buffers *Buffers) ReadByteMask(mask byte) (byte, error) {
b, err := buffers.ReadByte()
if err != nil {
return 0, err
}
return b & mask, nil
}
func (buffers *Buffers) ReadByte() (byte, error) {
if buffers.Length == 0 {
return 0, io.EOF
}
level0 := buffers.Buffers[buffers.offset0]
b := level0[buffers.offset1]
buffers.offset1++
buffers.Length--
buffers.Offset++
if buffers.offset1 >= len(level0) {
buffers.offset0++
buffers.offset1 = 0
}
return b, nil
}
func (buffers *Buffers) LEB128Unmarshal() (uint, int, error) {
v := uint(0)
n := 0
for i := 0; i < 8; i++ {
b, err := buffers.ReadByte()
if err != nil {
return 0, 0, err
}
v |= (uint(b&0b01111111) << (i * 7))
n++
if (b & 0b10000000) == 0 {
break
}
}
return v, n, nil
}
func (buffers *Buffers) Skip(n int) error {
if n > buffers.Length {
return io.EOF
}
buffers.Length -= n
buffers.Offset += n
for n > 0 {
level0 := buffers.Buffers[buffers.offset0]
level1 := level0[buffers.offset1:]
if n < len(level1) {
buffers.offset1 += n
break
}
n -= len(level1)
buffers.offset0++
buffers.offset1 = 0
}
return nil
}
func (buffers *Buffers) ReadBytes(n int) ([]byte, error) {
if n > buffers.Length {
return nil, io.EOF
}
b := make([]byte, n)
buffers.Read(b)
buffers.Length -= n
return b, nil
}
func (buffers *Buffers) ReadBE(n int) (num int, err error) {
for i := range n {
b, err := buffers.ReadByte()
if err != nil {
return -1, err
}
num += int(b) << ((n - i - 1) << 3)
}
return
}

65
pkg/util/golomb_reader.go Normal file
View File

@@ -0,0 +1,65 @@
package util
import (
"io"
)
type GolombBitReader struct {
R io.Reader
buf [1]byte
left byte
}
func (r *GolombBitReader) ReadBit() (res uint, err error) {
if r.left == 0 {
if _, err = r.R.Read(r.buf[:]); err != nil {
return
}
r.left = 8
}
r.left--
res = uint(r.buf[0]>>r.left) & 1
return
}
func (r *GolombBitReader) ReadBits(n int) (res uint, err error) {
for i := 0; i < n; i++ {
var bit uint
if bit, err = r.ReadBit(); err != nil {
return
}
res |= bit << uint(n-i-1)
}
return
}
func (r *GolombBitReader) ReadExponentialGolombCode() (res uint, err error) {
i := 0
for {
var bit uint
if bit, err = r.ReadBit(); err != nil {
return
}
if !(bit == 0 && i < 32) {
break
}
i++
}
if res, err = r.ReadBits(i); err != nil {
return
}
res += (1 << uint(i)) - 1
return
}
func (r *GolombBitReader) ReadSE() (res uint, err error) {
if res, err = r.ReadExponentialGolombCode(); err != nil {
return
}
if res&0x01 != 0 {
res = (res + 1) / 2
} else {
res = -res / 2
}
return
}

19
pkg/util/pool.go Normal file
View File

@@ -0,0 +1,19 @@
package util
type Pool[T any] struct {
pool []*T
}
func (p *Pool[T]) Get() *T {
l := len(p.pool)
if l == 0 {
return new(T)
}
t := p.pool[l-1]
p.pool = p.pool[:l-1]
return t
}
func (p *Pool[T]) Put(t *T) {
p.pool = append(p.pool, t)
}

15
pkg/util/promise.go Normal file
View File

@@ -0,0 +1,15 @@
package util
import "context"
type Promise[T any] struct {
context.Context
context.CancelCauseFunc
Value T
}
func NewPromise[T any](v T) *Promise[T] {
p := &Promise[T]{Value: v}
p.Context, p.CancelCauseFunc = context.WithCancelCause(context.Background())
return p
}

136
pkg/util/ring.go Normal file
View File

@@ -0,0 +1,136 @@
package util
// A Ring is an element of a circular list, or ring.
// Rings do not have a beginning or end; a pointer to any ring element
// serves as reference to the entire ring. Empty rings are represented
// as nil Ring pointers. The zero value for a Ring is a one-element
// ring with a nil Value.
//
type Ring[T any] struct {
next, prev *Ring[T]
Value T // for use by client; untouched by this library
}
func (r *Ring[T]) init() *Ring[T] {
r.next = r
r.prev = r
return r
}
// Next returns the next ring element. r must not be empty.
func (r *Ring[T]) Next() *Ring[T] {
if r.next == nil {
return r.init()
}
return r.next
}
// Prev returns the previous ring element. r must not be empty.
func (r *Ring[T]) Prev() *Ring[T] {
if r.next == nil {
return r.init()
}
return r.prev
}
// Move moves n % r.Len() elements backward (n < 0) or forward (n >= 0)
// in the ring and returns that ring element. r must not be empty.
//
func (r *Ring[T]) Move(n int) *Ring[T] {
if r.next == nil {
return r.init()
}
switch {
case n < 0:
for ; n < 0; n++ {
r = r.prev
}
case n > 0:
for ; n > 0; n-- {
r = r.next
}
}
return r
}
// New creates a ring of n elements.
func NewRing[T any](n int) *Ring[T] {
if n <= 0 {
return nil
}
r := new(Ring[T])
p := r
for i := 1; i < n; i++ {
p.next = &Ring[T]{prev: p}
p = p.next
}
p.next = r
r.prev = p
return r
}
// Link connects ring r with ring s such that r.Next()
// becomes s and returns the original value for r.Next().
// r must not be empty.
//
// If r and s point to the same ring, linking
// them removes the elements between r and s from the ring.
// The removed elements form a subring and the result is a
// reference to that subring (if no elements were removed,
// the result is still the original value for r.Next(),
// and not nil).
//
// If r and s point to different rings, linking
// them creates a single ring with the elements of s inserted
// after r. The result points to the element following the
// last element of s after insertion.
//
func (r *Ring[T]) Link(s *Ring[T]) *Ring[T] {
n := r.Next()
if s != nil {
p := s.Prev()
// Note: Cannot use multiple assignment because
// evaluation order of LHS is not specified.
r.next = s
s.prev = r
n.prev = p
p.next = n
}
return n
}
// Unlink removes n % r.Len() elements from the ring r, starting
// at r.Next(). If n % r.Len() == 0, r remains unchanged.
// The result is the removed subring. r must not be empty.
//
func (r *Ring[T]) Unlink(n int) *Ring[T] {
if n <= 0 {
return nil
}
return r.Link(r.Move(n + 1))
}
// Len computes the number of elements in ring r.
// It executes in time proportional to the number of elements.
//
func (r *Ring[T]) Len() int {
n := 0
if r != nil {
n = 1
for p := r.Next(); p != r; p = p.next {
n++
}
}
return n
}
// Do calls function f on each element of the ring, in forward order.
// The behavior of Do is undefined if f changes *r.
func (r *Ring[T]) Do(f func(T)) {
if r != nil {
f(r.Value)
for p := r.Next(); p != r; p = p.next {
f(p.Value)
}
}
}

109
plugin.go
View File

@@ -2,13 +2,18 @@ package m7s
import (
"context"
"crypto/tls"
"log/slog"
"net"
"path/filepath"
"reflect"
"runtime"
"strings"
"sync"
"github.com/logrusorgru/aurora/v4"
"m7s.live/m7s/v5/pkg/config"
"m7s.live/m7s/v5/pkg/util"
)
type PluginMeta struct {
@@ -22,6 +27,14 @@ type IPlugin interface {
OnEvent(any)
}
type IPublishPlugin interface {
OnStopPublish(*Publisher, error)
}
type ITCPPlugin interface {
OnTCPConnect(*net.TCPConn)
}
var plugins []PluginMeta
func InstallPlugin[C IPlugin](options ...any) error {
@@ -44,6 +57,13 @@ func InstallPlugin[C IPlugin](options ...any) error {
return nil
}
func sendPromiseToServer[T any](server *Server, value T) error {
promise := util.NewPromise(value)
server.EventBus <- promise
<-promise.Done()
return context.Cause(promise.Context)
}
type Plugin struct {
Disabled bool
Meta *PluginMeta
@@ -62,13 +82,96 @@ type Plugin struct {
*slog.Logger
handler IPlugin
server *Server
sync.RWMutex
}
func (p *Plugin) PostMessage(message any) {
p.server.EventBus <- message
func (p *Plugin) OnInit() {
var err error
httpConf := p.Config.HTTP
defer httpConf.StopListen()
if httpConf.ListenAddrTLS != "" && (httpConf.ListenAddrTLS != p.server.Config.ListenAddrTLS) {
go func() {
p.Info("https listen at ", "addr", aurora.Blink(httpConf.ListenAddrTLS))
p.CancelCauseFunc(httpConf.ListenTLS())
}()
}
if httpConf.ListenAddr != "" && (httpConf.ListenAddr != p.server.Config.ListenAddr) {
go func() {
p.Info("http listen at ", "addr", aurora.Blink(httpConf.ListenAddr))
p.CancelCauseFunc(httpConf.Listen())
}()
}
tcpConf := p.Config.TCP
tcphandler, ok := p.handler.(ITCPPlugin)
if !ok {
tcphandler = p
}
count := p.Config.TCP.ListenNum
if count == 0 {
count = runtime.NumCPU()
}
if p.Config.TCP.ListenAddr != "" {
l, err := net.Listen("tcp", tcpConf.ListenAddr)
if err != nil {
p.Error("tcp listen error", "addr", tcpConf.ListenAddr, "error", err)
p.CancelCauseFunc(err)
return
}
defer l.Close()
p.Info("tcp listen at ", "addr", aurora.Blink(tcpConf.ListenAddr))
for i := 0; i < count; i++ {
go tcpConf.Listen(l, tcphandler.OnTCPConnect)
}
}
if tcpConf.ListenAddrTLS != "" {
keyPair, _ := tls.X509KeyPair(config.LocalCert, config.LocalKey)
if tcpConf.CertFile != "" || tcpConf.KeyFile != "" {
keyPair, err = tls.LoadX509KeyPair(tcpConf.CertFile, tcpConf.KeyFile)
}
if err != nil {
p.Error("LoadX509KeyPair", "error", err)
p.CancelCauseFunc(err)
return
}
l, err := tls.Listen("tcp", tcpConf.ListenAddrTLS, &tls.Config{
Certificates: []tls.Certificate{keyPair},
})
if err != nil {
p.Error("tls tcp listen error", "addr", tcpConf.ListenAddrTLS, "error", err)
p.CancelCauseFunc(err)
return
}
defer l.Close()
p.Info("tls tcp listen at ", "addr", aurora.Blink(tcpConf.ListenAddrTLS))
for i := 0; i < count; i++ {
go tcpConf.Listen(l, tcphandler.OnTCPConnect)
}
}
select {
case <-p.Done():
return
}
}
func (p *Plugin) OnEvent(event any) {
}
func (p *Plugin) OnTCPConnect(conn *net.TCPConn) {
p.handler.OnEvent(conn)
}
func (p *Plugin) Publish(streamPath string) (publisher *Publisher, err error) {
publisher = &Publisher{Plugin: p, Config: p.Config.Publish, Logger: p.With("streamPath", streamPath)}
publisher = &Publisher{Publish: p.Config.Publish}
publisher.Init(p, streamPath)
publisher.Subscribers = make(map[*Subscriber]struct{})
err = sendPromiseToServer(p.server, publisher)
return
}
func (p *Plugin) Subscribe(streamPath string) (subscriber *Subscriber, err error) {
subscriber = &Subscriber{Subscribe: p.Config.Subscribe}
subscriber.Init(p, streamPath)
err = sendPromiseToServer(p.server, subscriber)
return
}

View File

@@ -1,8 +1,10 @@
package demo
import (
"net"
"m7s.live/m7s/v5"
. "m7s.live/m7s/v5/pkg"
rtmp "m7s.live/m7s/v5/plugin/rtmp/pkg"
)
type DemoPlugin struct {
@@ -12,13 +14,16 @@ type DemoPlugin struct {
func (p *DemoPlugin) OnInit() {
puber, err := p.Publish("live/demo")
if err != nil {
panic(err)
return
}
puber.WriteVideo(&H264Nalu{})
puber.WriteVideo(&rtmp.RTMPVideo{
Timestamp: 0,
Buffers: net.Buffers{[]byte{0x17, 0x00, 0x67, 0x42, 0x00, 0x0a, 0x8f, 0x14, 0x01, 0x00, 0x00, 0x03, 0x00, 0x80, 0x00, 0x00, 0x00, 0x01, 0x68, 0xce, 0x3c, 0x80}},
})
}
func (p *DemoPlugin) OnEvent(event any) {
// ...
func (p *DemoPlugin) OnStopPublish(puber *m7s.Publisher, err error) {
}
var _ = m7s.InstallPlugin[*DemoPlugin]()

View File

@@ -1 +1,21 @@
package rtmp
package rtmp
import "m7s.live/m7s/v5"
type RTMPPlugin struct {
m7s.Plugin
}
func (p *RTMPPlugin) OnInit() {
}
func (p *RTMPPlugin) OnStopPublish(puber *m7s.Publisher, err error) {
}
func (p *RTMPPlugin) OnEvent(event any) {
// ...
}
var _ = m7s.InstallPlugin[*RTMPPlugin]()

324
plugin/rtmp/pkg/codec.go Normal file
View File

@@ -0,0 +1,324 @@
package pkg
import (
"bytes"
"encoding/binary"
"errors"
"m7s.live/m7s/v5/pkg/codec"
"m7s.live/m7s/v5/pkg/util"
)
var FourCC_H265 = [4]byte{'H', '2', '6', '5'}
var FourCC_AV1 = [4]byte{'a', 'v', '0', '1'}
type AVCDecoderConfigurationRecord struct {
ConfigurationVersion byte // 8 bits Version
AVCProfileIndication byte // 8 bits
ProfileCompatibility byte // 8 bits
AVCLevelIndication byte // 8 bits
Reserved1 byte // 6 bits
LengthSizeMinusOne byte // 2 bits 非常重要,每个NALU包前面都(lengthSizeMinusOne & 3)+1个字节的NAL包长度描述
Reserved2 byte // 3 bits
NumOfSequenceParameterSets byte // 5 bits SPS 的个数,计算方法是 numOfSequenceParameterSets & 0x1F
NumOfPictureParameterSets byte // 8 bits PPS 的个数
SequenceParameterSetLength uint16 // 16 byte SPS Length
SequenceParameterSetNALUnit []byte // n byte SPS
PictureParameterSetLength uint16 // 16 byte PPS Length
PictureParameterSetNALUnit []byte // n byte PPS
}
func (p *AVCDecoderConfigurationRecord) Marshal(b []byte) (n int) {
b[0] = 1
b[1] = p.AVCProfileIndication
b[2] = p.ProfileCompatibility
b[3] = p.AVCLevelIndication
b[4] = p.LengthSizeMinusOne | 0xfc
b[5] = uint8(1) | 0xe0
n += 6
binary.BigEndian.PutUint16(b[n:], p.SequenceParameterSetLength)
n += 2
copy(b[n:], p.SequenceParameterSetNALUnit)
n += len(p.SequenceParameterSetNALUnit)
b[n] = uint8(1)
n++
binary.BigEndian.PutUint16(b[n:], p.PictureParameterSetLength)
n += 2
copy(b[n:], p.PictureParameterSetNALUnit)
n += len(p.PictureParameterSetNALUnit)
return
}
var ErrDecconfInvalid = errors.New("decode error")
func (p *AVCDecoderConfigurationRecord) Unmarshal(b *util.Buffers) (err error) {
if b.Length < 7 {
err = errors.New("not enough len")
return
}
b.ReadByteTo(&p.ConfigurationVersion, &p.AVCProfileIndication, &p.ProfileCompatibility, &p.AVCLevelIndication, &p.LengthSizeMinusOne)
p.LengthSizeMinusOne = p.LengthSizeMinusOne & 0x03
p.NumOfSequenceParameterSets, err = b.ReadByteMask(0x1f)
if err != nil {
return
}
b.Skip(6)
var sps, pps [][]byte
for range p.NumOfSequenceParameterSets {
spslen, err1 := b.ReadBE(2)
if err1 != nil {
return err1
}
spsbytes, err2 := b.ReadBytes(spslen)
if err2 != nil {
return err2
}
sps = append(sps, spsbytes)
}
p.SequenceParameterSetLength = uint16(len(sps[0]))
p.SequenceParameterSetNALUnit = sps[0]
if b.Length < 1 {
err = ErrDecconfInvalid
return
}
ppscount, err1 := b.ReadByte()
if err1 != nil {
return err1
}
for range ppscount {
ppslen, err1 := b.ReadBE(2)
if err1 != nil {
return err1
}
ppsbytes, err2 := b.ReadBytes(ppslen)
if err2 != nil {
return err2
}
pps = append(pps, ppsbytes)
}
if ppscount >= 1 {
p.PictureParameterSetLength = uint16(len(pps[0]))
p.PictureParameterSetNALUnit = pps[0]
} else {
err = ErrDecconfInvalid
}
return
}
func ParseSPS(data []byte) (self codec.SPSInfo, err error) {
r := &util.GolombBitReader{R: bytes.NewReader(data)}
if _, err = r.ReadBits(8); err != nil {
return
}
if self.ProfileIdc, err = r.ReadBits(8); err != nil {
return
}
// constraint_set0_flag-constraint_set6_flag,reserved_zero_2bits
if _, err = r.ReadBits(8); err != nil {
return
}
// level_idc
if self.LevelIdc, err = r.ReadBits(8); err != nil {
return
}
// seq_parameter_set_id
if _, err = r.ReadExponentialGolombCode(); err != nil {
return
}
if self.ProfileIdc == 100 || self.ProfileIdc == 110 ||
self.ProfileIdc == 122 || self.ProfileIdc == 244 ||
self.ProfileIdc == 44 || self.ProfileIdc == 83 ||
self.ProfileIdc == 86 || self.ProfileIdc == 118 {
var chroma_format_idc uint
if chroma_format_idc, err = r.ReadExponentialGolombCode(); err != nil {
return
}
if chroma_format_idc == 3 {
// residual_colour_transform_flag
if _, err = r.ReadBit(); err != nil {
return
}
}
// bit_depth_luma_minus8
if _, err = r.ReadExponentialGolombCode(); err != nil {
return
}
// bit_depth_chroma_minus8
if _, err = r.ReadExponentialGolombCode(); err != nil {
return
}
// qpprime_y_zero_transform_bypass_flag
if _, err = r.ReadBit(); err != nil {
return
}
var seq_scaling_matrix_present_flag uint
if seq_scaling_matrix_present_flag, err = r.ReadBit(); err != nil {
return
}
if seq_scaling_matrix_present_flag != 0 {
for i := 0; i < 8; i++ {
var seq_scaling_list_present_flag uint
if seq_scaling_list_present_flag, err = r.ReadBit(); err != nil {
return
}
if seq_scaling_list_present_flag != 0 {
var sizeOfScalingList uint
if i < 6 {
sizeOfScalingList = 16
} else {
sizeOfScalingList = 64
}
lastScale := uint(8)
nextScale := uint(8)
for j := uint(0); j < sizeOfScalingList; j++ {
if nextScale != 0 {
var delta_scale uint
if delta_scale, err = r.ReadSE(); err != nil {
return
}
nextScale = (lastScale + delta_scale + 256) % 256
}
if nextScale != 0 {
lastScale = nextScale
}
}
}
}
}
}
// log2_max_frame_num_minus4
if _, err = r.ReadExponentialGolombCode(); err != nil {
return
}
var pic_order_cnt_type uint
if pic_order_cnt_type, err = r.ReadExponentialGolombCode(); err != nil {
return
}
if pic_order_cnt_type == 0 {
// log2_max_pic_order_cnt_lsb_minus4
if _, err = r.ReadExponentialGolombCode(); err != nil {
return
}
} else if pic_order_cnt_type == 1 {
// delta_pic_order_always_zero_flag
if _, err = r.ReadBit(); err != nil {
return
}
// offset_for_non_ref_pic
if _, err = r.ReadSE(); err != nil {
return
}
// offset_for_top_to_bottom_field
if _, err = r.ReadSE(); err != nil {
return
}
var num_ref_frames_in_pic_order_cnt_cycle uint
if num_ref_frames_in_pic_order_cnt_cycle, err = r.ReadExponentialGolombCode(); err != nil {
return
}
for i := uint(0); i < num_ref_frames_in_pic_order_cnt_cycle; i++ {
if _, err = r.ReadSE(); err != nil {
return
}
}
}
// max_num_ref_frames
if _, err = r.ReadExponentialGolombCode(); err != nil {
return
}
// gaps_in_frame_num_value_allowed_flag
if _, err = r.ReadBit(); err != nil {
return
}
if self.MbWidth, err = r.ReadExponentialGolombCode(); err != nil {
return
}
self.MbWidth++
if self.MbHeight, err = r.ReadExponentialGolombCode(); err != nil {
return
}
self.MbHeight++
var frame_mbs_only_flag uint
if frame_mbs_only_flag, err = r.ReadBit(); err != nil {
return
}
if frame_mbs_only_flag == 0 {
// mb_adaptive_frame_field_flag
if _, err = r.ReadBit(); err != nil {
return
}
}
// direct_8x8_inference_flag
if _, err = r.ReadBit(); err != nil {
return
}
var frame_cropping_flag uint
if frame_cropping_flag, err = r.ReadBit(); err != nil {
return
}
if frame_cropping_flag != 0 {
if self.CropLeft, err = r.ReadExponentialGolombCode(); err != nil {
return
}
if self.CropRight, err = r.ReadExponentialGolombCode(); err != nil {
return
}
if self.CropTop, err = r.ReadExponentialGolombCode(); err != nil {
return
}
if self.CropBottom, err = r.ReadExponentialGolombCode(); err != nil {
return
}
}
self.Width = (self.MbWidth * 16) - self.CropLeft*2 - self.CropRight*2
self.Height = ((2 - frame_mbs_only_flag) * self.MbHeight * 16) - self.CropTop*2 - self.CropBottom*2
return
}
// func ParseHevcSPS(data []byte) (self codec.SPSInfo, err error) {
// var rawsps hevc.H265RawSPS
// if err = rawsps.Decode(data); err == nil {
// self.CropLeft, self.CropRight, self.CropTop, self.CropBottom = uint(rawsps.Conf_win_left_offset), uint(rawsps.Conf_win_right_offset), uint(rawsps.Conf_win_top_offset), uint(rawsps.Conf_win_bottom_offset)
// self.Width = uint(rawsps.Pic_width_in_luma_samples)
// self.Height = uint(rawsps.Pic_height_in_luma_samples)
// }
// return
// }
type H264Ctx struct {
codec.SPSInfo
NalulenSize int
SPS []byte
PPS []byte
}
type H265Ctx struct {
H264Ctx
VPS []byte
}

10
plugin/rtmp/pkg/const.go Normal file
View File

@@ -0,0 +1,10 @@
package pkg
const (
PacketTypeSequenceStart = iota
PacketTypeCodedFrames
PacketTypeSequenceEnd
PacketTypeCodedFramesX
PacketTypeMetadata
PacketTypeMPEG2TSSequenceStart
)

184
plugin/rtmp/pkg/video.go Normal file
View File

@@ -0,0 +1,184 @@
package pkg
import (
"net"
"time"
. "m7s.live/m7s/v5/pkg"
"m7s.live/m7s/v5/pkg/codec"
"m7s.live/m7s/v5/pkg/util"
)
type RTMPVideo struct {
Timestamp uint32
net.Buffers
}
func (avcc *RTMPVideo) DecodeConfig(track *AVTrack) error {
var reader util.Buffers
reader.Buffers = avcc.Buffers
b0, err := reader.ReadByte()
if err != nil {
return err
}
enhanced := b0&0b1000_0000 != 0 // https://veovera.github.io/enhanced-rtmp/docs/enhanced/enhanced-rtmp-v1.pdf
// frameType := b0 & 0b0111_0000 >> 4
packetType := b0 & 0b1111
parseSequence := func() (err error) {
switch track.Codec {
case "h264":
var ctx H264Ctx
var info AVCDecoderConfigurationRecord
if err = info.Unmarshal(&reader); err == nil {
ctx.SPSInfo, _ = ParseSPS(info.SequenceParameterSetNALUnit)
ctx.NalulenSize = int(info.LengthSizeMinusOne&3 + 1)
ctx.SPS = info.SequenceParameterSetNALUnit
ctx.PPS = info.PictureParameterSetNALUnit
track.ICodecCtx = &ctx
}
case "h265":
// var ctx H265Ctx
case "av1":
}
return
}
if enhanced {
var fourCC [4]byte
_, err = reader.Read(fourCC[:])
if err != nil {
return err
}
switch fourCC {
case FourCC_H265:
track.Codec = "h265"
case FourCC_AV1:
track.Codec = "av1"
}
switch packetType {
case PacketTypeSequenceStart:
if err = parseSequence(); err != nil {
return err
}
return nil
case PacketTypeCodedFrames:
case PacketTypeCodedFramesX:
}
} else {
b0, err = reader.ReadByte() //sequence frame flag
if err != nil {
return err
}
if codec.VideoCodecID(b0&0x0F) == codec.CodecID_H265 {
track.Codec = "h265"
} else {
track.Codec = "h264"
}
reader.ReadBE(3) // cts == 0
if err != nil {
return err
}
if b0 == 0 {
if err = parseSequence(); err != nil {
return err
}
} else {
}
}
return nil
}
func (avcc *RTMPVideo) parseH264(track *AVTrack, reader *util.Buffers, cts uint32) (any, error) {
var nalus Nalus
ctx := track.ICodecCtx.(*H264Ctx)
nalus.PTS = time.Duration(avcc.Timestamp+uint32(cts)) * 90
nalus.DTS = time.Duration(avcc.Timestamp) * 90
if err := nalus.ParseAVCC(reader, ctx.NalulenSize); err != nil {
return nalus, err
}
return nalus, nil
}
func (avcc *RTMPVideo) parseH265(track *AVTrack, reader *util.Buffers, cts uint32) (any, error) {
var nalus Nalus
ctx := track.ICodecCtx.(*H265Ctx)
nalus.PTS = time.Duration(avcc.Timestamp+uint32(cts)) * 90
nalus.DTS = time.Duration(avcc.Timestamp) * 90
if err := nalus.ParseAVCC(reader, ctx.NalulenSize); err != nil {
return nalus, err
}
return nalus, nil
}
func (avcc *RTMPVideo) parseAV1(track *AVTrack, reader *util.Buffers) (any, error) {
var obus OBUs
obus.PTS = time.Duration(avcc.Timestamp) * 90
if err := obus.ParseAVCC(reader); err != nil {
return obus, err
}
return obus, nil
}
func (avcc *RTMPVideo) ToRaw(track *AVTrack) (any, error) {
var reader util.Buffers
reader.Buffers = avcc.Buffers
b0, err := reader.ReadByte()
if err != nil {
return nil, err
}
enhanced := b0&0b1000_0000 != 0 // https://veovera.github.io/enhanced-rtmp/docs/enhanced/enhanced-rtmp-v1.pdf
// frameType := b0 & 0b0111_0000 >> 4
packetType := b0 & 0b1111
if enhanced {
err = reader.Skip(4) // fourcc
if err != nil {
return nil, err
}
switch packetType {
case PacketTypeSequenceStart:
if err = avcc.DecodeConfig(track); err != nil {
return nil, err
}
return nil, nil
case PacketTypeCodedFrames:
if track.Codec == "h265" {
cts, err := reader.ReadBE(3) //cts, only h265
if err != nil {
return nil, err
}
return avcc.parseH265(track, &reader, uint32(cts))
} else {
return avcc.parseAV1(track, &reader)
}
case PacketTypeCodedFramesX:
}
} else {
b0, err = reader.ReadByte() //sequence frame flag
if err != nil {
return nil, err
}
cts, err := reader.ReadBE(3)
if err != nil {
return nil, err
}
if b0 == 0 {
if err = avcc.DecodeConfig(track); err != nil {
return nil, err
}
} else {
if track.Codec == "h265" {
return avcc.parseH265(track, &reader, uint32(cts))
} else {
return avcc.parseH264(track, &reader, uint32(cts))
}
}
}
return nil, nil
}
func (avcc *RTMPVideo) FromRaw(track *AVTrack, raw any) error {
return nil
}

View File

@@ -1,23 +1,100 @@
package m7s
import (
"log/slog"
"reflect"
"sync"
. "m7s.live/m7s/v5/pkg"
"m7s.live/m7s/v5/pkg/config"
)
type Publisher struct {
Config config.Publish
Plugin *Plugin
Logger *slog.Logger
PubSubBase
config.Publish
VideoTrack *AVTrack
AudioTrack *AVTrack
DataTrack *DataTrack
Subscribers map[*Subscriber]struct{}
sync.RWMutex
}
func (p *Publisher) WriteVideo(data IVideoData) {
func (p *Publisher) AddSubscriber(subscriber *Subscriber) (err error) {
p.Lock()
defer p.Unlock()
p.Subscribers[subscriber] = struct{}{}
if p.VideoTrack != nil {
subscriber.VideoTrackReader = NewAVRingReader(p.VideoTrack)
}
if p.AudioTrack != nil {
subscriber.AudioTrackReader = NewAVRingReader(p.AudioTrack)
}
return
}
func (p *Publisher) WriteAudio(data IAudioData) {
func (p *Publisher) writeAV(t *AVTrack, data IAVFrame) (err error) {
if t.ICodecCtx == nil {
err = data.DecodeConfig(t)
}
t.Ring.Value.Wrap[0] = data
if n := len(t.DataTypes); n > 1 {
t.Ring.Value.Raw, err = data.ToRaw(t)
if err != nil {
return
}
if t.Ring.Value.Raw == nil {
return
}
for i := 1; i < n; i++ {
t.Ring.Value.Wrap[i] = reflect.New(t.DataTypes[i]).Interface().(IAVFrame)
t.Ring.Value.Wrap[i].FromRaw(t, t.Ring.Value.Raw)
}
}
t.Step()
return
}
func (p *Publisher) WriteData(data IData) {
func (p *Publisher) WriteVideo(data IAVFrame) (err error) {
if !p.PubVideo {
return
}
t := p.VideoTrack
if t == nil {
t = &AVTrack{
DataTypes: []reflect.Type{reflect.TypeOf(data)},
}
t.Logger = p.Logger.With("track", "video")
t.Init(256)
p.VideoTrack = t
p.Lock()
for sub := range p.Subscribers {
sub.VideoTrackReader = NewAVRingReader(t)
}
p.Unlock()
}
return p.writeAV(t, data)
}
func (p *Publisher) WriteAudio(data IAVFrame) (err error) {
if !p.PubAudio {
return
}
t := p.AudioTrack
if t == nil {
t = &AVTrack{
DataTypes: []reflect.Type{reflect.TypeOf(data)},
}
t.Logger = p.Logger.With("track", "audio")
t.Init(256)
p.AudioTrack = t
p.Lock()
for sub := range p.Subscribers {
sub.AudioTrackReader = NewAVRingReader(t)
}
p.Unlock()
}
return p.writeAV(t, data)
}
func (p *Publisher) WriteData(data IDataFrame) (err error) {
return
}

View File

@@ -5,17 +5,25 @@ import (
"errors"
"log/slog"
"reflect"
"slices"
"time"
"unsafe"
. "m7s.live/m7s/v5/pkg"
"m7s.live/m7s/v5/pkg/config"
"m7s.live/m7s/v5/pkg/util"
)
type Server struct {
context.Context `json:"-" yaml:"-"`
context.CancelCauseFunc `json:"-" yaml:"-"`
Plugins []*Plugin
EventBus `json:"-" yaml:"-"`
StartTime time.Time
context.Context
context.CancelCauseFunc
EventBus
*slog.Logger
Config config.Engine
Plugins []*Plugin
Publishers map[string]*Publisher
Waiting map[string]*Subscriber
}
var DefaultServer = &Server{}
@@ -31,13 +39,25 @@ func Run(ctx context.Context) {
func (s *Server) Run(ctx context.Context) {
s.Logger = slog.With("server", uintptr(unsafe.Pointer(s)))
s.Context, s.CancelCauseFunc = context.WithCancelCause(ctx)
s.EventBus = NewEventBus()
s.EventBus = NewEventBus(10)
s.Config.HTTP.ListenAddrTLS = ":8443"
s.Config.HTTP.ListenAddr = ":8080"
s.Info("start")
s.initPlugins()
pulse := time.NewTicker(s.Config.PulseInterval)
select {
case <-s.Done():
s.Warn("Server is done", "reason", context.Cause(s))
pulse.Stop()
return
case <-pulse.C:
case event := <-s.EventBus:
switch v := event.(type) {
case util.Promise[*Publisher]:
v.CancelCauseFunc(s.OnPublish(v.Value))
case util.Promise[*Subscriber]:
v.CancelCauseFunc(s.OnSubscribe(v.Value))
}
for _, plugin := range s.Plugins {
if plugin.Disabled {
continue
@@ -59,7 +79,47 @@ func (s *Server) initPlugins() {
p.Meta = &plugin
p.server = s
p.Logger = s.Logger.With("plugin", plugin.Name)
p.Context, p.CancelCauseFunc = context.WithCancelCause(s.Context)
s.Plugins = append(s.Plugins, p)
p.OnInit()
instance.OnInit()
}
}
func (s *Server) OnPublish(publisher *Publisher) error {
if oldPublisher, ok := s.Publishers[publisher.StreamPath]; ok {
if publisher.KickExist {
oldPlugin := oldPublisher.Plugin
publisher.Warn("kick")
oldPlugin.handler.(IPublishPlugin).OnStopPublish(oldPublisher, ErrKick)
if index := slices.Index(oldPlugin.Publishers, oldPublisher); index != -1 {
oldPlugin.Publishers = slices.Delete(oldPlugin.Publishers, index, index+1)
}
publisher.VideoTrack = oldPublisher.VideoTrack
publisher.AudioTrack = oldPublisher.AudioTrack
publisher.DataTrack = oldPublisher.DataTrack
publisher.Subscribers = oldPublisher.Subscribers
oldPublisher.Subscribers = nil
} else {
return ErrStreamExist
}
} else {
s.Publishers[publisher.StreamPath] = publisher
publisher.Plugin.Info("publish", "streamPath", publisher.StreamPath)
publisher.Plugin.Publishers = append(publisher.Plugin.Publishers, publisher)
}
if subscriber, ok := s.Waiting[publisher.StreamPath]; ok {
delete(s.Waiting, publisher.StreamPath)
publisher.AddSubscriber(subscriber)
}
return nil
}
func (s *Server) OnSubscribe(subscriber *Subscriber) error {
if publisher, ok := s.Publishers[subscriber.StreamPath]; ok {
return publisher.AddSubscriber(subscriber)
} else {
s.Waiting[subscriber.StreamPath] = subscriber
}
return nil
}

34
subscriber.go Normal file
View File

@@ -0,0 +1,34 @@
package m7s
import (
"log/slog"
"net/url"
"time"
. "m7s.live/m7s/v5/pkg"
"m7s.live/m7s/v5/pkg/config"
)
type PubSubBase struct {
*slog.Logger `json:"-" yaml:"-"`
Plugin *Plugin
StartTime time.Time
StreamPath string
Args url.Values
}
func (ps *PubSubBase) Init(p *Plugin, streamPath string) {
ps.Plugin = p
if u, err := url.Parse(streamPath); err == nil {
ps.StreamPath, ps.Args = u.Path, u.Query()
}
ps.Logger = p.With("streamPath", ps.StreamPath)
ps.StartTime = time.Now()
}
type Subscriber struct {
PubSubBase
config.Subscribe
VideoTrackReader *AVRingReader
AudioTrackReader *AVRingReader
}