新增环境变量覆盖配置功能
兼容偶尔时间戳不对的情况

This commit is contained in:
langhuihui
2023-04-23 10:18:40 +08:00
parent 2f71561bcd
commit e9177794cb
15 changed files with 246 additions and 187 deletions

View File

@@ -58,12 +58,12 @@ func (d *DTSEstimator) Feed(pts uint32) uint32 {
dts = d.cache[0]
}
// if d.prevDTS > dts {
// dts = d.prevDTS
// }
if d.prevDTS >= dts {
dts = d.prevDTS + 90
if d.prevDTS > dts {
dts = d.prevDTS
}
// if d.prevDTS >= dts {
// dts = d.prevDTS + 90
// }
d.prevPTS = pts
d.prevDTS = dts
return dts

View File

@@ -31,7 +31,7 @@ type Base struct {
ts time.Time
bytes int
frames int
drops int //丢帧数
DropCount int `json:"-" yaml:"-"` //丢帧数
BPS int
FPS int
Drops int // 丢帧率
@@ -45,10 +45,10 @@ func (bt *Base) ComputeBPS(bytes int) {
if elapse := time.Since(bt.ts).Seconds(); elapse > 1 {
bt.BPS = int(float64(bt.bytes) / elapse)
bt.FPS = int(float64(bt.frames) / elapse)
bt.Drops = int(float64(bt.drops) / elapse)
bt.Drops = int(float64(bt.DropCount) / elapse)
bt.bytes = 0
bt.frames = 0
bt.drops = 0
bt.DropCount = 0
bt.ts = time.Now()
}
}

View File

@@ -1,6 +1,7 @@
package config
import (
"fmt"
"net"
"net/http"
"os"
@@ -8,6 +9,7 @@ import (
"strings"
"time"
"gopkg.in/yaml.v3"
"m7s.live/engine/v4/log"
)
@@ -89,72 +91,15 @@ func (config Config) Unmarshal(s any) {
}
// 需要被写入的字段
fv := el.FieldByName(name)
fvKind := fv.Kind()
ft := fv.Type()
value := reflect.ValueOf(v)
if child, ok := v.(Config); ok { //处理值是递归情况map)
if fvKind == reflect.Map {
if fv.Kind() == reflect.Map {
if fv.IsNil() {
fv.Set(reflect.MakeMap(ft))
fv.Set(reflect.MakeMap(fv.Type()))
}
}
child.Unmarshal(fv)
} else {
if ft == durationType && fv.CanSet() {
if value.Type() == durationType {
fv.Set(value)
} else if value.IsZero() || !value.IsValid() {
fv.SetInt(0)
} else if d, err := time.ParseDuration(value.String()); err == nil {
fv.SetInt(int64(d))
} else {
if Global.LogLang == "zh" {
log.Errorf("%s 无效的时间值: %v 请添加单位s,m,h,d例如100ms, 10s, 4m, 1h", k, value)
} else {
log.Errorf("%s invalid duration value: %v please add unit (s,m,h,d)eg: 100ms, 10s, 4m, 1h", k, value)
}
os.Exit(1)
}
continue
}
switch fvKind {
case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
fv.SetUint(uint64(value.Int()))
case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
fv.SetInt(value.Int())
case reflect.Float32, reflect.Float64:
if value.CanFloat() {
fv.SetFloat(value.Float())
} else {
fv.SetFloat(float64(value.Int()))
}
case reflect.Slice:
var s reflect.Value
if value.Kind() == reflect.Slice {
l := value.Len()
s = reflect.MakeSlice(ft, l, value.Cap())
for i := 0; i < l; i++ {
fv := value.Index(i)
item := s.Index(i)
if child, ok := fv.Interface().(Config); ok {
item.Set(child.CreateElem(ft.Elem()))
} else if fv.Kind() == reflect.Interface {
item.Set(reflect.ValueOf(fv.Interface()).Convert(item.Type()))
} else {
item.Set(fv)
}
}
} else {
//值是单值,但类型是数组,默认解析为一个元素的数组
s = reflect.MakeSlice(ft, 1, 1)
s.Index(0).Set(value)
}
fv.Set(s)
default:
if value.IsValid() {
fv.Set(value)
}
}
assign(name, fv, reflect.ValueOf(v))
}
}
}
@@ -205,9 +150,9 @@ func (config *Config) Set(key string, value any) {
}
}
func (config Config) Get(key string) any {
v, _ := config[strings.ToLower(key)]
return v
func (config Config) Get(key string) (v any) {
v = config[strings.ToLower(key)]
return
}
func (config Config) Has(key string) (ok bool) {
@@ -227,35 +172,102 @@ func (config Config) GetChild(key string) Config {
return nil
}
func Struct2Config(s any) (config Config) {
func Struct2Config(s any, prefix ...string) (config Config) {
config = make(Config)
var t reflect.Type
var v reflect.Value
if vv, ok := s.(reflect.Value); ok {
v = vv
t = vv.Type()
t, v = vv.Type(), vv
} else {
t = reflect.TypeOf(s)
v = reflect.ValueOf(s)
t, v = reflect.TypeOf(s), reflect.ValueOf(s)
if t.Kind() == reflect.Pointer {
v = v.Elem()
t = t.Elem()
t, v = t.Elem(), v.Elem()
}
}
for i, j := 0, t.NumField(); i < j; i++ {
ft := t.Field(i)
ft, fv := t.Field(i), v.Field(i)
if !ft.IsExported() {
continue
}
name := strings.ToLower(ft.Name)
var envPath []string
if len(prefix) > 0 {
envPath = append(prefix, strings.ToUpper(ft.Name))
envKey := strings.Join(envPath, "_")
if envValue := os.Getenv(envKey); envValue != "" {
yaml.Unmarshal([]byte(fmt.Sprintf("%s: %s", name, envValue)), config)
assign(envKey, fv, reflect.ValueOf(config[name]))
config[name] = fv.Interface()
return
}
}
switch ft.Type.Kind() {
case reflect.Struct:
config[name] = Struct2Config(v.Field(i))
config[name] = Struct2Config(fv, envPath...)
case reflect.Slice:
fallthrough
default:
reflect.ValueOf(config).SetMapIndex(reflect.ValueOf(name), v.Field(i))
reflect.ValueOf(config).SetMapIndex(reflect.ValueOf(name), fv)
}
}
return
}
func assign(k string, target reflect.Value, source reflect.Value) {
ft := target.Type()
if ft == durationType && target.CanSet() {
if source.Type() == durationType {
target.Set(source)
} else if source.IsZero() || !source.IsValid() {
target.SetInt(0)
} else if d, err := time.ParseDuration(source.String()); err == nil {
target.SetInt(int64(d))
} else {
if Global.LogLang == "zh" {
log.Errorf("%s 无效的时间值: %v 请添加单位s,m,h,d例如100ms, 10s, 4m, 1h", k, source)
} else {
log.Errorf("%s invalid duration value: %v please add unit (s,m,h,d)eg: 100ms, 10s, 4m, 1h", k, source)
}
os.Exit(1)
}
return
}
switch target.Kind() {
case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
target.SetUint(uint64(source.Int()))
case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
target.SetInt(source.Int())
case reflect.Float32, reflect.Float64:
if source.CanFloat() {
target.SetFloat(source.Float())
} else {
target.SetFloat(float64(source.Int()))
}
case reflect.Slice:
var s reflect.Value
if source.Kind() == reflect.Slice {
l := source.Len()
s = reflect.MakeSlice(ft, l, source.Cap())
for i := 0; i < l; i++ {
fv := source.Index(i)
item := s.Index(i)
if child, ok := fv.Interface().(Config); ok {
item.Set(child.CreateElem(ft.Elem()))
} else if fv.Kind() == reflect.Interface {
item.Set(reflect.ValueOf(fv.Interface()).Convert(item.Type()))
} else {
item.Set(fv)
}
}
} else {
//值是单值,但类型是数组,默认解析为一个元素的数组
s = reflect.MakeSlice(ft, 1, 1)
s.Index(0).Set(source)
}
target.Set(s)
default:
if source.IsValid() {
target.Set(source.Convert(ft))
}
}
}

View File

@@ -13,7 +13,7 @@ import (
)
type PublishConfig interface {
GetPublishConfig() *Publish
GetPublishConfig() Publish
}
type SubscribeConfig interface {
@@ -37,7 +37,7 @@ type Publish struct {
BufferTime time.Duration `default:"0s"` // 缓冲长度(单位:秒)0代表取最近关键帧
}
func (c *Publish) GetPublishConfig() *Publish {
func (c Publish) GetPublishConfig() Publish {
return c
}

View File

@@ -58,7 +58,12 @@ func (conf *GlobalConfig) API_summary(rw http.ResponseWriter, r *http.Request) {
}
func (conf *GlobalConfig) API_plugins(rw http.ResponseWriter, r *http.Request) {
if err := json.NewEncoder(rw).Encode(Plugins); err != nil {
format := r.URL.Query().Get("format")
if format == "yaml" {
if err := yaml.NewEncoder(rw).Encode(Plugins); err != nil {
http.Error(rw, err.Error(), http.StatusInternalServerError)
}
} else if err := json.NewEncoder(rw).Encode(Plugins); err != nil {
http.Error(rw, err.Error(), http.StatusInternalServerError)
}
}

View File

@@ -47,6 +47,7 @@ reamins: 剩余
"data track attached": 数据轨道已附加
"first frame read": 第一帧已读取
"fu have no start": rtp的FU起始包丢了
"disabled by env": 被环境变量禁用
firstTs: 第一帧时间戳
firstSeq: 第一帧序列号
skipSeq: 跳过序列号

View File

@@ -96,13 +96,18 @@ func Run(ctx context.Context, configFile string) (err error) {
log.LogLevel.SetLevel(loglevel)
Engine.Logger = log.LocaleLogger.Named("engine")
// 使得RawConfig具备全量配置信息用于合并到插件配置中
Engine.RawConfig = config.Struct2Config(EngineConfig.Engine)
Engine.RawConfig = config.Struct2Config(&EngineConfig.Engine, "GLOBAL")
Engine.assign()
Engine.Logger.Debug("", zap.Any("config", EngineConfig))
EventBus = make(chan any, EngineConfig.EventBusSize)
go EngineConfig.Listen(Engine)
for _, plugin := range plugins {
plugin.Logger = log.LocaleLogger.Named(plugin.Name)
if os.Getenv(strings.ToUpper(plugin.Name)+"_ENABLE") == "false" {
plugin.Disabled = true
plugin.Warn("disabled by env")
continue
}
plugin.Info("initialize", zap.String("version", plugin.Version))
userConfig := cg.GetChild(plugin.Name)
if userConfig != nil {
@@ -136,7 +141,7 @@ func Run(ctx context.Context, configFile string) (err error) {
}
var enabledPlugins, disabledPlugins []string
for _, plugin := range plugins {
if plugin.RawConfig["enable"] == false || plugin.Disabled {
if plugin.Disabled || plugin.RawConfig["enable"] == false {
plugin.Disabled = true
disabledPlugins = append(disabledPlugins, plugin.Name)
} else {

View File

@@ -1,7 +1,6 @@
package engine
import (
"bytes"
"context"
"errors"
"io"
@@ -115,6 +114,7 @@ func (opt *Plugin) assign() {
opt.Warn("assign config failed", zap.Error(err))
}
}
if opt == Engine {
opt.registerHandler()
return
@@ -148,16 +148,17 @@ func (opt *Plugin) assign() {
func (opt *Plugin) run() {
opt.Context, opt.CancelFunc = context.WithCancel(Engine)
opt.RawConfig.Unmarshal(opt.Config)
opt.RawConfig = config.Struct2Config(opt.Config, strings.ToUpper(opt.Name))
// var buffer bytes.Buffer
// err := yaml.NewEncoder(&buffer).Encode(opt.Config)
// if err != nil {
// panic(err)
// }
// err = yaml.NewDecoder(&buffer).Decode(&opt.RawConfig)
// if err != nil {
// panic(err)
// }
opt.Config.OnEvent(FirstConfig(opt.RawConfig))
var buffer bytes.Buffer
err := yaml.NewEncoder(&buffer).Encode(opt.Config)
if err != nil {
panic(err)
}
err = yaml.NewDecoder(&buffer).Decode(&opt.RawConfig)
if err != nil {
panic(err)
}
delete(opt.RawConfig, "defaultyaml")
opt.Debug("config", zap.Any("config", opt.Config))
// opt.RawConfig = config.Struct2Config(opt.Config)
@@ -214,13 +215,23 @@ func (opt *Plugin) Save() error {
}
func (opt *Plugin) Publish(streamPath string, pub IPublisher) error {
puber := pub.GetPublisher()
if puber == nil {
if EngineConfig.LogLang == "zh" {
return errors.New("不是发布者")
} else {
return errors.New("not publisher")
}
}
if puber.Config == nil {
conf, ok := opt.Config.(config.PublishConfig)
if !ok {
conf = EngineConfig
}
var copyConfig = *conf.GetPublishConfig()
pub.GetPublisher().Config = &copyConfig
return pub.receive(streamPath, pub)
copyConfig := conf.GetPublishConfig()
puber.Config = &copyConfig
}
return pub.Publish(streamPath, pub)
}
var ErrStreamNotExist = errors.New("stream not exist")
@@ -233,12 +244,7 @@ func (opt *Plugin) SubscribeExist(streamPath string, sub ISubscriber) error {
opt.Warn("stream not exist", zap.String("path", streamPath))
return ErrStreamNotExist
}
conf, ok := opt.Config.(config.SubscribeConfig)
if !ok {
conf = EngineConfig
}
sub.GetSubscriber().Config = conf.GetSubscribeConfig()
return sub.receive(streamPath, sub)
return opt.Subscribe(streamPath, sub)
}
// Subscribe 订阅一个流,如果流不存在则创建一个等待流
@@ -251,12 +257,15 @@ func (opt *Plugin) Subscribe(streamPath string, sub ISubscriber) error {
return errors.New("not subscriber")
}
}
if suber.Config == nil {
conf, ok := opt.Config.(config.SubscribeConfig)
if !ok {
conf = EngineConfig
}
suber.Config = conf.GetSubscribeConfig()
return sub.receive(streamPath, sub)
copyConfig := *conf.GetSubscribeConfig()
suber.Config = &copyConfig
}
return sub.Subscribe(streamPath, sub)
}
// SubscribeBlock 阻塞订阅一个流,直到订阅结束

View File

@@ -69,33 +69,33 @@ func (p *PSPublisher) OnFrame(frame []byte, cid mpeg2.PS_STREAM_TYPE, pts uint64
if p.AudioTrack != nil {
p.AudioTrack.WriteADTS(uint32(pts), frame)
} else {
p.AudioTrack = NewAAC(p.Publisher.Stream)
p.AudioTrack = NewAAC(p.Publisher.Stream, p.pool)
}
case mpeg2.PS_STREAM_G711A:
if p.AudioTrack != nil {
p.AudioTrack.WriteRaw(uint32(pts), frame)
} else {
p.AudioTrack = NewG711(p.Publisher.Stream, true)
p.AudioTrack = NewG711(p.Publisher.Stream, true, p.pool)
}
case mpeg2.PS_STREAM_G711U:
if p.AudioTrack != nil {
p.AudioTrack.WriteRaw(uint32(pts), frame)
} else {
p.AudioTrack = NewG711(p.Publisher.Stream, false)
p.AudioTrack = NewG711(p.Publisher.Stream, false, p.pool)
}
case mpeg2.PS_STREAM_H264:
if p.VideoTrack != nil {
// p.WriteNalu(uint32(pts), uint32(dts), frame)
p.WriteAnnexB(uint32(pts), uint32(dts), frame)
} else {
p.VideoTrack = NewH264(p.Publisher.Stream)
p.VideoTrack = NewH264(p.Publisher.Stream, p.pool)
}
case mpeg2.PS_STREAM_H265:
if p.VideoTrack != nil {
// p.WriteNalu(uint32(pts), uint32(dts), frame)
p.WriteAnnexB(uint32(pts), uint32(dts), frame)
} else {
p.VideoTrack = NewH265(p.Publisher.Stream)
p.VideoTrack = NewH265(p.Publisher.Stream, p.pool)
}
}
}
@@ -137,9 +137,9 @@ func (p *PSPublisher) ReceiveVideo(es mpegps.MpegPsEsStream) {
if p.VideoTrack == nil {
switch es.Type {
case mpegts.STREAM_TYPE_H264:
p.VideoTrack = NewH264(p.Publisher.Stream)
p.VideoTrack = NewH264(p.Publisher.Stream, p.pool)
case mpegts.STREAM_TYPE_H265:
p.VideoTrack = NewH265(p.Publisher.Stream)
p.VideoTrack = NewH265(p.Publisher.Stream, p.pool)
default:
//推测编码类型
var maybe264 codec.H264NALUType
@@ -151,10 +151,10 @@ func (p *PSPublisher) ReceiveVideo(es mpegps.MpegPsEsStream) {
codec.NALU_SPS,
codec.NALU_PPS,
codec.NALU_Access_Unit_Delimiter:
p.VideoTrack = NewH264(p.Publisher.Stream)
p.VideoTrack = NewH264(p.Publisher.Stream, p.pool)
default:
p.Info("maybe h265", zap.Uint8("type", maybe264.Byte()))
p.VideoTrack = NewH265(p.Publisher.Stream)
p.VideoTrack = NewH265(p.Publisher.Stream, p.pool)
}
}
}
@@ -173,11 +173,11 @@ func (p *PSPublisher) ReceiveAudio(es mpegps.MpegPsEsStream) {
if p.AudioTrack == nil {
switch es.Type {
case mpegts.STREAM_TYPE_G711A:
p.AudioTrack = NewG711(p.Publisher.Stream, true)
p.AudioTrack = NewG711(p.Publisher.Stream, true, p.pool)
case mpegts.STREAM_TYPE_G711U:
p.AudioTrack = NewG711(p.Publisher.Stream, false)
p.AudioTrack = NewG711(p.Publisher.Stream, false, p.pool)
case mpegts.STREAM_TYPE_AAC:
p.AudioTrack = NewAAC(p.Publisher.Stream)
p.AudioTrack = NewAAC(p.Publisher.Stream, p.pool)
p.WriteADTS(ts, payload)
case 0: //推测编码类型
if payload[0] == 0xff && payload[1]>>4 == 0xf {

View File

@@ -4,10 +4,12 @@ import (
"go.uber.org/zap"
"m7s.live/engine/v4/codec/mpegts"
"m7s.live/engine/v4/track"
"m7s.live/engine/v4/util"
)
type TSPublisher struct {
Publisher
pool util.BytesPool
mpegts.MpegTsStream `json:"-" yaml:"-"`
}
@@ -16,6 +18,7 @@ func (t *TSPublisher) OnEvent(event any) {
case IPublisher:
t.PESChan = make(chan *mpegts.MpegTsPESPacket, 50)
t.PESBuffer = make(map[uint16]*mpegts.MpegTsPESPacket)
t.pool = make(util.BytesPool, 17)
go t.ReadPES()
if !t.Equal(v) {
t.AudioTrack = v.getAudioTrack()
@@ -33,23 +36,23 @@ func (t *TSPublisher) OnPmtStream(s mpegts.MpegTsPmtStream) {
switch s.StreamType {
case mpegts.STREAM_TYPE_H264:
if t.VideoTrack == nil {
t.VideoTrack = track.NewH264(t.Publisher.Stream)
t.VideoTrack = track.NewH264(t.Publisher.Stream, t.pool)
}
case mpegts.STREAM_TYPE_H265:
if t.VideoTrack == nil {
t.VideoTrack = track.NewH265(t.Publisher.Stream)
t.VideoTrack = track.NewH265(t.Publisher.Stream, t.pool)
}
case mpegts.STREAM_TYPE_AAC:
if t.AudioTrack == nil {
t.AudioTrack = track.NewAAC(t.Publisher.Stream)
t.AudioTrack = track.NewAAC(t.Publisher.Stream, t.pool)
}
case mpegts.STREAM_TYPE_G711A:
if t.AudioTrack == nil {
t.AudioTrack = track.NewG711(t.Publisher.Stream, true)
t.AudioTrack = track.NewG711(t.Publisher.Stream, true, t.pool)
}
case mpegts.STREAM_TYPE_G711U:
if t.AudioTrack == nil {
t.AudioTrack = track.NewG711(t.Publisher.Stream, false)
t.AudioTrack = track.NewG711(t.Publisher.Stream, false, t.pool)
}
default:
t.Warn("unsupport stream type:", zap.Uint8("type", s.StreamType))

View File

@@ -14,6 +14,7 @@ type IPublisher interface {
GetPublisher() *Publisher
getAudioTrack() common.AudioTrack
getVideoTrack() common.VideoTrack
Publish(streamPath string, pub IPublisher) error
}
var _ IPublisher = (*Publisher)(nil)
@@ -25,6 +26,10 @@ type Publisher struct {
common.VideoTrack `json:"-" yaml:"-"`
}
func (p *Publisher) Publish(streamPath string, pub IPublisher) error {
return p.receive(streamPath, pub)
}
func (p *Publisher) GetPublisher() *Publisher {
return p
}

View File

@@ -20,6 +20,13 @@ import (
type StreamState byte
type StreamAction byte
func (s StreamState) String() string {
return StateNames[s]
}
func (s StreamAction) String() string {
return ActionNames[s]
}
// 四状态机
const (
STATE_WAITPUBLISH StreamState = iota // 等待发布者状态
@@ -261,7 +268,7 @@ func (r *Stream) action(action StreamAction) (ok bool) {
r.SEHistory = append(r.SEHistory, event)
// 给Publisher状态变更的回调方便进行远程拉流等操作
var stateEvent any
r.Info(Sprintf("%s%s%s", StateNames[event.From], Yellow("->"), StateNames[next]), zap.String("action", ActionNames[action]))
r.Info(Sprintf("%s%s%s", event.From.String(), Yellow("->"), next.String()), zap.String("action", action.String()))
switch next {
case STATE_WAITPUBLISH:
stateEvent = SEwaitPublish{event, r.Publisher}
@@ -309,7 +316,7 @@ func (r *Stream) action(action StreamAction) (ok bool) {
r.Publisher.OnEvent(stateEvent)
}
} else {
r.Debug("wrong action", zap.String("action", ActionNames[action]))
r.Debug("wrong action", zap.String("action", action.String()))
}
return
}
@@ -382,7 +389,7 @@ func (s *Stream) run() {
}
case <-s.timeout.C:
timeStart = time.Now()
timeOutInfo = zap.String("state", StateNames[s.State])
timeOutInfo = zap.String("state", s.State.String())
if s.State == STATE_PUBLISHING {
for sub := range s.Subscribers.internal {
if sub.IsClosed() {
@@ -529,7 +536,7 @@ func (s *Stream) run() {
case NoMoreTrack:
s.Subscribers.AbortWait()
case StreamAction:
timeOutInfo = zap.String("action", "StreamAction")
timeOutInfo = zap.String("action", "StreamAction"+v.String())
s.action(v)
default:
timeOutInfo = zap.String("action", "unknown")

View File

@@ -3,6 +3,7 @@ package engine
import (
"bufio"
"context"
"fmt"
"io"
"net"
"strconv"
@@ -101,6 +102,7 @@ type ISubscriber interface {
PlayBlock(byte)
PlayFLV()
Stop()
Subscribe(streamPath string, sub ISubscriber) error
}
type TrackPlayer struct {
@@ -119,9 +121,14 @@ type Subscriber struct {
TrackPlayer `json:"-" yaml:"-"`
}
func (s *Subscriber) Subscribe(streamPath string, sub ISubscriber) error {
return s.receive(streamPath, sub)
}
func (s *Subscriber) GetSubscriber() *Subscriber {
return s
}
func (s *Subscriber) SetIO(i any) {
s.IO.SetIO(i)
if s.Writer != nil && s.Config != nil && s.Config.WriteBufferSize > 0 {
@@ -216,7 +223,7 @@ func (s *Subscriber) PlayBlock(subType byte) {
switch subType {
case SUBTYPE_RAW:
sendVideoFrame = func(frame *AVFrame) {
// fmt.Println("v", s.VideoReader.Delay)
// fmt.Println("v", frame.Sequence, s.VideoReader.AbsTime, s.VideoReader.Delay)
spesic.OnEvent(VideoFrame{frame, s.Video, s.VideoReader.AbsTime, s.VideoReader.GetPTS32(), s.VideoReader.GetDTS32()})
}
sendAudioFrame = func(frame *AVFrame) {
@@ -271,7 +278,7 @@ func (s *Subscriber) PlayBlock(subType byte) {
sendFlvFrame(codec.FLV_TAG_TYPE_AUDIO, s.AudioReader.AbsTime, s.AudioReader.Track.SequenceHead)
}
sendVideoFrame = func(frame *AVFrame) {
// fmt.Println(frame.Sequence, s.VideoReader.AbsTime, frame.DeltaTime, frame.IFrame)
// fmt.Println(frame.Sequence, s.VideoReader.AbsTime, s.VideoReader.Delay, frame.IFrame)
// b := util.Buffer(frame.AVCC.ToBytes()[5:])
// for b.CanRead() {
// nalulen := int(b.ReadUint32())
@@ -285,7 +292,7 @@ func (s *Subscriber) PlayBlock(subType byte) {
sendFlvFrame(codec.FLV_TAG_TYPE_VIDEO, s.VideoReader.AbsTime, frame.AVCC.ToBuffers()...)
}
sendAudioFrame = func(frame *AVFrame) {
// fmt.Println(frame.Sequence, s.AudioReader.AbsTime, frame.DeltaTime)
fmt.Println(frame.Sequence, s.AudioReader.AbsTime, s.AudioReader.Delay)
sendFlvFrame(codec.FLV_TAG_TYPE_AUDIO, s.AudioReader.AbsTime, frame.AVCC.ToBuffers()...)
}
}

View File

@@ -26,6 +26,9 @@ func (p *流速控制) 重置(绝对时间戳 time.Duration, dts time.Duration)
// println("重置", p.起始时间.Format("2006-01-02 15:04:05"), p.起始时间戳)
}
func (p *流速控制) 根据起始DTS计算绝对时间戳(dts time.Duration) time.Duration {
if dts < p.起始dts {
dts += 0xFFFFFFFFF
}
return ((dts-p.起始dts)*time.Millisecond + p.起始时间戳*90) / 90
}
@@ -194,10 +197,10 @@ func (av *Media) AppendAuBytes(b ...[]byte) {
}
func (av *Media) narrow(gop int) {
if l := av.Size - gop - 5; l > 5 {
// av.Stream.Debug("resize", zap.Int("before", av.Size), zap.Int("after", av.Size-l), zap.String("name", av.Name))
if l := av.Size - gop; l > 12 {
// av.Debug("resize", zap.Int("before", av.Size), zap.Int("after", av.Size-5))
//缩小缓冲环节省内存
av.Reduce(l).Do(func(v AVFrame) {
av.Reduce(5).Do(func(v AVFrame) {
v.Reset()
})
}
@@ -239,6 +242,33 @@ func (av *Media) Flush() {
curValue.Timestamp -= av.deltaTs
}
}
if av.起始时间.IsZero() {
curValue.DeltaTime = 0
if useDts {
curValue.Timestamp = time.Since(av.Stream.GetStartTime())
}
av.重置(curValue.Timestamp, curValue.DTS)
} else {
if useDts {
deltaDts := curValue.DTS - preValue.DTS
if deltaDts <= 0 {
// 生成一个无奈的deltaDts
deltaDts = 90
// 必须保证DTS递增
curValue.DTS = preValue.DTS + deltaDts
} else if deltaDts != 90 {
// 正常情况下生成容错范围
av.deltaDTSRange = deltaDts * 2
}
curValue.Timestamp = av.根据起始DTS计算绝对时间戳(curValue.DTS)
}
curValue.DeltaTime = uint32((curValue.Timestamp - preValue.Timestamp) / time.Millisecond)
}
if config.Global.PrintTs {
fmt.Println(av.Name, curValue.DTS, av.deltaDTSRange, curValue.DTS-preValue.DTS, curValue.Timestamp, curValue.DeltaTime)
}
bufferTime := av.Stream.GetPublisherConfig().BufferTime
if bufferTime > 0 && av.IDRingList.Length > 1 && curValue.Timestamp-av.IDRingList.Next.Next.Value.Value.Timestamp > bufferTime {
av.ShiftIDR()
@@ -254,35 +284,6 @@ func (av *Media) Flush() {
// }
}
if av.起始时间.IsZero() {
curValue.DeltaTime = 0
if useDts {
curValue.Timestamp = time.Since(av.Stream.GetStartTime())
}
av.重置(curValue.Timestamp, curValue.DTS)
} else {
if useDts {
dts := curValue.DTS
if dts < av.起始dts {
dts += 0xFFFFFFFFF
}
if av.deltaDTSRange > 0 {
if dts < preValue.DTS {
dts = preValue.DTS + av.deltaDTSRange/2
curValue.DTS = dts
} else if curValue.DTS-preValue.DTS > av.deltaDTSRange {
dts = preValue.DTS + av.deltaDTSRange/2
curValue.DTS = dts
}
}
av.deltaDTSRange = (curValue.DTS - preValue.DTS) * 2
curValue.Timestamp = av.根据起始DTS计算绝对时间戳(dts)
}
curValue.DeltaTime = uint32((curValue.Timestamp - preValue.Timestamp) / time.Millisecond)
}
if config.Global.PrintTs {
fmt.Println(av.Name, curValue.DTS, curValue.DTS-preValue.DTS, curValue.Timestamp, curValue.DeltaTime)
}
if curValue.AUList.Length > 0 {
// 补完RTP
if config.Global.EnableRTP && curValue.RTP.Length == 0 {

View File

@@ -19,6 +19,7 @@ func (av *Media) WriteRTPPack(p *rtp.Packet) {
av.Value.RTP.PushValue(frame)
av.lastSeq2 = av.lastSeq
av.lastSeq = frame.SequenceNumber
av.DropCount += int(av.lastSeq - av.lastSeq2 - 1)
if len(p.Payload) > 0 {
av.WriteRTPFrame(&frame)
}
@@ -28,6 +29,7 @@ func (av *Media) WriteRTPPack(p *rtp.Packet) {
func (av *Media) WriteRTP(raw *util.ListItem[RTPFrame]) {
for frame := av.recorderRTP(raw); frame != nil; frame = av.nextRTPFrame() {
av.Value.BytesIn += len(frame.Value.Payload) + 12
av.DropCount += int(av.lastSeq - av.lastSeq2 - 1)
if len(frame.Value.Payload) > 0 {
av.Value.RTP.Push(frame)
av.WriteRTPFrame(&frame.Value)
@@ -42,9 +44,9 @@ func (av *Media) WriteRTP(raw *util.ListItem[RTPFrame]) {
// https://www.cnblogs.com/moonwalk/p/15903760.html
// Packetize packetizes the payload of an RTP packet and returns one or more RTP packets
func (av *Media) PacketizeRTP(payloads ...[][]byte) {
packetCount := len(payloads)
for i, pp := range payloads {
rtpItem := av.GetRTPFromPool()
var rtpItem *util.ListItem[RTPFrame]
for _, pp := range payloads {
rtpItem = av.GetRTPFromPool()
packet := &rtpItem.Value
packet.Payload = packet.Payload[:0]
if av.SampleRate != 90000 {
@@ -52,12 +54,14 @@ func (av *Media) PacketizeRTP(payloads ...[][]byte) {
} else {
packet.Timestamp = uint32(av.Value.PTS)
}
packet.Marker = i == packetCount-1
packet.Marker = false
for _, p := range pp {
packet.Payload = append(packet.Payload, p...)
}
av.Value.RTP.Push(rtpItem)
}
// 最后一个rtp包标记为true
rtpItem.Value.Marker = true
}
type RTPDemuxer struct {