修复日志打印序列化报错问题

This commit is contained in:
langhuihui
2023-04-15 08:37:22 +08:00
parent ec3dfa4c0e
commit 9dd9fe6cea
19 changed files with 74 additions and 48 deletions

View File

@@ -59,15 +59,15 @@ type DataFrame[T any] struct {
type AVFrame struct {
BaseFrame
IFrame bool
CanRead bool `json:"-"`
CanRead bool `json:"-" yaml:"-"`
PTS time.Duration
DTS time.Duration
Timestamp time.Duration // 绝对时间戳
ADTS *util.ListItem[util.Buffer] `json:"-"` // ADTS头
AVCC util.BLL `json:"-"` // 打包好的AVCC格式(MPEG-4格式、Byte-Stream Format)
RTP util.List[RTPFrame] `json:"-"`
AUList util.BLLs `json:"-"` // 裸数据
Extras any `json:"-"` // 任意扩展数据
ADTS *util.ListItem[util.Buffer] `json:"-" yaml:"-"` // ADTS头
AVCC util.BLL `json:"-" yaml:"-"` // 打包好的AVCC格式(MPEG-4格式、Byte-Stream Format)
RTP util.List[RTPFrame] `json:"-" yaml:"-"`
AUList util.BLLs `json:"-" yaml:"-"` // 裸数据
Extras any `json:"-" yaml:"-"` // 任意扩展数据
}
func (av *AVFrame) WriteAVCC(ts uint32, frame *util.BLL) {

View File

@@ -24,9 +24,9 @@ const (
// Base 基础Track类
type Base struct {
Name string
log.Zap `json:"-"`
Stream IStream `json:"-"`
Attached atomic.Bool `json:"-"`
log.Zap `json:"-" yaml:"-"`
Stream IStream `json:"-" yaml:"-"`
Attached atomic.Bool `json:"-" yaml:"-"`
State TrackState
ts time.Time
bytes int

View File

@@ -5,7 +5,7 @@ import (
)
type RingBuffer[T any] struct {
*util.Ring[T] `json:"-"`
*util.Ring[T] `json:"-" yaml:"-"`
Size int
MoveCount uint32
LastValue *T

View File

@@ -66,6 +66,7 @@ type Pull struct {
RePull int // 断开后自动重拉,0 表示不自动重拉,-1 表示无限重拉高于0 的数代表最大重拉次数
PullOnStart map[string]string // 启动时拉流的列表
PullOnSub map[string]string // 订阅时自动拉流的列表
Proxy string // 代理地址
}
func (p *Pull) GetPullConfig() *Pull {
@@ -89,6 +90,7 @@ func (p *Pull) AddPullOnSub(streamPath string, url string) {
type Push struct {
RePush int // 断开后自动重推,0 表示不自动重推,-1 表示无限重推高于0 的数代表最大重推次数
PushList map[string]string // 自动推流列表
Proxy string // 代理地址
}
func (p *Push) GetPushConfig() *Push {

View File

@@ -8,7 +8,7 @@ import (
type Event[T any] struct {
Time time.Time
Target T `json:"-"`
Target T `json:"-" yaml:"-"`
}
func CreateEvent[T any](target T) (event Event[T]) {

16
io.go
View File

@@ -34,16 +34,16 @@ type AuthPub interface {
type IO struct {
ID string
Type string
context.Context `json:"-"` //不要直接设置应当通过OnEvent传入父级Context
context.CancelFunc `json:"-"` //流关闭是关闭发布者或者订阅者
*log.Logger `json:"-"`
context.Context `json:"-" yaml:"-"` //不要直接设置应当通过OnEvent传入父级Context
context.CancelFunc `json:"-" yaml:"-"` //流关闭是关闭发布者或者订阅者
*log.Logger `json:"-" yaml:"-"`
StartTime time.Time //创建时间
Stream *Stream `json:"-"`
io.Reader `json:"-"`
io.Writer `json:"-"`
io.Closer `json:"-"`
Stream *Stream `json:"-" yaml:"-"`
io.Reader `json:"-" yaml:"-"`
io.Writer `json:"-" yaml:"-"`
io.Closer `json:"-" yaml:"-"`
Args url.Values
Spesific IIO `json:"-"`
Spesific IIO `json:"-" yaml:"-"`
}
func (io *IO) IsClosed() bool {

View File

@@ -18,6 +18,8 @@ name: 名称
state: 状态
initialize: 初始化
"start read": 开始读取
"start pull": 开始从远端拉流
"pull failed": 拉取失败
"wait publisher": 等待发布者发布
"wait timeout": 等待超时
created: 已创建
@@ -39,10 +41,10 @@ reamins: 剩余
"http handle added": http处理器已添加
"http handle added to engine": http处理器已添加到引擎
"plugin disabled": 插件已禁用
"pull failed": 拉取失败
"audio codec not support yet": 音频编码暂不支持
"video track attached": 视频轨道已附加
"audio track attached": 音频轨道已附加
"data track attached": 数据轨道已附加
"first frame read": 第一帧已读取
firstTs: 第一帧时间戳
firstSeq: 第一帧序列号

10
main.go
View File

@@ -22,9 +22,9 @@ import (
"go.uber.org/zap/zapcore"
"gopkg.in/yaml.v3"
"m7s.live/engine/v4/config"
"m7s.live/engine/v4/lang"
"m7s.live/engine/v4/log"
"m7s.live/engine/v4/util"
"m7s.live/engine/v4/lang"
)
var (
@@ -185,12 +185,20 @@ func Run(ctx context.Context, configFile string) (err error) {
for {
select {
case event := <-EventBus:
ts := time.Now()
for _, plugin := range Plugins {
if !plugin.Disabled {
ts := time.Now()
plugin.Config.OnEvent(event)
if cost := time.Since(ts); cost > time.Millisecond*100 {
plugin.Warn("event cost too much time", zap.String("event", fmt.Sprintf("%v", event)), zap.Duration("cost", cost))
}
}
}
EngineConfig.OnEvent(event)
if cost := time.Since(ts); cost > time.Millisecond*100 {
log.Warn("event cost too much time", zap.String("event", fmt.Sprintf("%v", event)), zap.Duration("cost", cost))
}
case <-ctx.Done():
return
case <-reportTimer.C:

View File

@@ -56,16 +56,16 @@ type DefaultYaml string
// Plugin 插件信息
type Plugin struct {
context.Context `json:"-"`
context.CancelFunc `json:"-"`
context.Context `json:"-" yaml:"-"`
context.CancelFunc `json:"-" yaml:"-"`
Name string //插件名称
Config config.Plugin `json:"-"` //类型化的插件配置
Config config.Plugin `json:"-" yaml:"-"` //类型化的插件配置
Version string //插件版本
Yaml string //配置文件中的配置项
modifiedYaml string //修改过的配置的yaml文件内容
RawConfig config.Config //最终合并后的配置的map形式方便查询
Modified config.Config //修改过的配置项
*log.Logger `json:"-"`
*log.Logger `json:"-" yaml:"-"`
saveTimer *time.Timer //用于保存的时候的延迟,防抖
Disabled bool
}

View File

@@ -10,7 +10,7 @@ import (
type MP4Publisher struct {
Publisher
*mp4.MovDemuxer `json:"-"`
*mp4.MovDemuxer `json:"-" yaml:"-"`
}
// Start reading the MP4 file

View File

@@ -19,9 +19,9 @@ type cacheItem struct {
type PSPublisher struct {
Publisher
DisableReorder bool //是否禁用rtp重排序,TCP模式下应当禁用
// mpegps.MpegPsStream `json:"-"`
// *mpegps.PSDemuxer `json:"-"`
mpegps.DecPSPackage `json:"-"`
// mpegps.MpegPsStream `json:"-" yaml:"-"`
// *mpegps.PSDemuxer `json:"-" yaml:"-"`
mpegps.DecPSPackage `json:"-" yaml:"-"`
reorder util.RTPReorder[*cacheItem]
pool util.BytesPool
lastSeq uint16

View File

@@ -8,7 +8,7 @@ import (
type TSPublisher struct {
Publisher
mpegts.MpegTsStream
mpegts.MpegTsStream `json:"-" yaml:"-"`
}
func (t *TSPublisher) OnEvent(event any) {
@@ -21,6 +21,9 @@ func (t *TSPublisher) OnEvent(event any) {
t.AudioTrack = v.getAudioTrack()
t.VideoTrack = v.getVideoTrack()
}
case SEKick, SEclose:
close(t.PESChan)
t.Publisher.OnEvent(event)
default:
t.Publisher.OnEvent(event)
}

View File

@@ -21,8 +21,8 @@ var _ IPublisher = (*Publisher)(nil)
type Publisher struct {
IO
Config *config.Publish
common.AudioTrack `json:"-"`
common.VideoTrack `json:"-"`
common.AudioTrack `json:"-" yaml:"-"`
common.VideoTrack `json:"-" yaml:"-"`
}
func (p *Publisher) GetPublisher() *Publisher {
@@ -64,7 +64,7 @@ func (p *Publisher) WriteAVCCVideo(ts uint32, frame *util.BLL, pool util.BytesPo
b0 := frame.GetByte(0)
// https://github.com/veovera/enhanced-rtmp/blob/main/enhanced-rtmp-v1.pdf
if isExtHeader := b0 & 0b1000_0000; isExtHeader != 0 {
fourCC := frame.GetUintN(1,4)
fourCC := frame.GetUintN(1, 4)
if fourCC == codec.FourCC_H265_32 {
p.VideoTrack = track.NewH265(p.Stream, pool)
p.VideoTrack.WriteAVCC(ts, frame)

View File

@@ -396,6 +396,9 @@ func (s *Stream) run() {
}
}
s.Tracks.ModifyRange(func(name string, t Track) {
if _, ok := t.(*track.Data); ok {
return
}
// track 超过一定时间没有更新数据了
if lastWriteTime := t.LastWriteTime(); !lastWriteTime.IsZero() && time.Since(lastWriteTime) > s.PublishTimeout {
s.Warn("track timeout", zap.String("name", name), zap.Time("last writetime", lastWriteTime), zap.Duration("timeout", s.PublishTimeout))
@@ -422,12 +425,13 @@ func (s *Stream) run() {
}
case action, ok := <-s.actionChan.C:
timeStart = time.Now()
timeOutInfo = zap.Any("action", action)
if ok {
switch v := action.(type) {
case SubPulse:
timeOutInfo = zap.String("action", "SubPulse")
pulseSuber[v] = struct{}{}
case *util.Promise[IPublisher]:
timeOutInfo = zap.String("action", "Publish")
if s.IsClosed() {
v.Reject(ErrStreamIsClosed)
}
@@ -441,6 +445,7 @@ func (s *Stream) run() {
v.Reject(ErrBadStreamName)
}
case *util.Promise[ISubscriber]:
timeOutInfo = zap.String("action", "Subscribe")
if s.IsClosed() {
v.Reject(ErrStreamIsClosed)
}
@@ -487,9 +492,11 @@ func (s *Stream) run() {
s.action(ACTION_FIRSTENTER)
}
case Unsubscribe:
timeOutInfo = zap.String("action", "Unsubscribe")
delete(pulseSuber, v)
s.onSuberClose(v)
case TrackRemoved:
timeOutInfo = zap.String("action", "TrackRemoved")
name := v.GetBase().Name
if t, ok := s.Tracks.Delete(name); ok {
s.Info("track -1", zap.String("name", name))
@@ -502,6 +509,7 @@ func (s *Stream) run() {
}
}
case *util.Promise[Track]:
timeOutInfo = zap.String("action", "Track")
if s.State == STATE_WAITPUBLISH {
s.action(ACTION_PUBLISH)
}
@@ -521,8 +529,10 @@ func (s *Stream) run() {
case NoMoreTrack:
s.Subscribers.AbortWait()
case StreamAction:
timeOutInfo = zap.String("action", "StreamAction")
s.action(v)
default:
timeOutInfo = zap.String("action", "unknown")
s.Error("unknown action", timeOutInfo)
}
} else {

View File

@@ -116,7 +116,7 @@ type Subscriber struct {
IO
IsInternal bool //是否内部订阅,不放入订阅列表
Config *config.Subscribe
TrackPlayer `json:"-"`
TrackPlayer `json:"-" yaml:"-"`
}
func (s *Subscriber) GetSubscriber() *Subscriber {

View File

@@ -85,15 +85,15 @@ type Media struct {
Base
RingBuffer[AVFrame]
PayloadType byte
IDRingList `json:"-"` //最近的关键帧位置,首屏渲染
IDRingList `json:"-" yaml:"-"` //最近的关键帧位置,首屏渲染
SSRC uint32
SampleRate uint32
BytesPool util.BytesPool `json:"-"`
RtpPool util.Pool[RTPFrame] `json:"-"`
SequenceHead []byte `json:"-"` //H264(SPS、PPS) H265(VPS、SPS、PPS) AAC(config)
BytesPool util.BytesPool `json:"-" yaml:"-"`
RtpPool util.Pool[RTPFrame] `json:"-" yaml:"-"`
SequenceHead []byte `json:"-" yaml:"-"` //H264(SPS、PPS) H265(VPS、SPS、PPS) AAC(config)
SequenceHeadSeq int
RTPDemuxer
SpesificTrack `json:"-"`
SpesificTrack `json:"-" yaml:"-"`
deltaTs time.Duration //用于接续发布后时间戳连续
流速控制
}

View File

@@ -33,6 +33,7 @@ func (dt *Data) Push(data any) {
dt.Lock()
defer dt.Unlock()
}
dt.Value.WriteTime = time.Now()
dt.Write(data)
}

View File

@@ -14,7 +14,7 @@ var _ SpesificTrack = (*H265)(nil)
type H265 struct {
Video
VPS []byte `json:"-"`
VPS []byte `json:"-" yaml:"-"`
}
func NewH265(stream IStream, stuff ...any) (vt *H265) {

View File

@@ -23,9 +23,9 @@ type Video struct {
dtsEst *DTSEstimator
lostFlag bool // 是否丢帧
codec.SPSInfo
ParamaterSets `json:"-"`
SPS []byte `json:"-"`
PPS []byte `json:"-"`
ParamaterSets `json:"-" yaml:"-"`
SPS []byte `json:"-" yaml:"-"`
PPS []byte `json:"-" yaml:"-"`
}
func (v *Video) Attach() {