diff --git a/common/frame.go b/common/frame.go index 38a0b13..69a9adb 100644 --- a/common/frame.go +++ b/common/frame.go @@ -101,11 +101,11 @@ func (rtp *RTPFrame) Unmarshal(raw []byte) *RTPFrame { } type BaseFrame struct { - DeltaTime uint32 // 相对上一帧时间戳,毫秒 - AbsTime uint32 // 绝对时间戳,毫秒 - Timestamp time.Time // 写入时间,可用于比较两个帧的先后 - SeqInTrack uint32 // 在一个Track中的序号 - BytesIn int // 输入字节数用于计算BPS + DeltaTime uint32 // 相对上一帧时间戳,毫秒 + AbsTime uint32 // 绝对时间戳,毫秒 + Timestamp time.Time // 写入时间,可用于比较两个帧的先后 + Sequence uint32 // 在一个Track中的序号 + BytesIn int // 输入字节数用于计算BPS } type DataFrame[T any] struct { diff --git a/common/ring_av.go b/common/ring_av.go index 9d62af2..1165bd4 100644 --- a/common/ring_av.go +++ b/common/ring_av.go @@ -14,7 +14,7 @@ type AVRing[T RawSlice] struct { func (r *AVRing[T]) Step() *AVFrame[T] { last := &r.Value current := r.MoveNext() - current.SeqInTrack = r.MoveCount + current.Sequence = r.MoveCount current.canRead = false current.Reset() last.canRead = true diff --git a/common/stream.go b/common/stream.go index ae6e0e0..68b2d8f 100644 --- a/common/stream.go +++ b/common/stream.go @@ -10,5 +10,5 @@ type IStream interface { IsClosed() bool SSRC() uint32 log.Zap - Receive(any) + Receive(any) bool } diff --git a/go.mod b/go.mod index 7e32b55..358f1f9 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,6 @@ require ( github.com/cnotch/ipchub v1.1.0 github.com/google/uuid v1.3.0 github.com/logrusorgru/aurora v2.0.3+incompatible - github.com/mattn/go-colorable v0.1.8 github.com/pion/rtp v1.7.4 github.com/q191201771/naza v0.19.1 go.uber.org/zap v1.21.0 @@ -15,13 +14,14 @@ require ( ) require ( + github.com/kr/pretty v0.3.0 // indirect go.uber.org/atomic v1.7.0 // indirect go.uber.org/multierr v1.6.0 // indirect + gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect + gopkg.in/yaml.v2 v2.4.0 // indirect ) require ( github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef - github.com/mattn/go-isatty v0.0.12 // indirect github.com/pion/randutil v0.1.0 // indirect - golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359 // indirect ) diff --git a/go.sum b/go.sum index ec72e22..64c18a8 100644 --- a/go.sum +++ b/go.sum @@ -24,17 +24,13 @@ github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpO github.com/kelindar/process v0.0.0-20170730150328-69a29e249ec3/go.mod h1:+lTCLnZFXOkqwD8sLPl6u4erAc0cP8wFegQHfipz7KE= github.com/kelindar/rate v1.0.0/go.mod h1:AjT4G+hTItNwt30lucEGZIz8y7Uk5zPho6vurIZ+1Es= github.com/kelindar/tcp v1.0.0/go.mod h1:JB5hj1cshLU60XrLij2BBxW3JQ4hOye8vqbyvuKb52k= -github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= -github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/logrusorgru/aurora v2.0.3+incompatible h1:tOpm7WcpBTn4fjmVfgpQq0EfczGlG91VSDkswnjF5A8= github.com/logrusorgru/aurora v2.0.3+incompatible/go.mod h1:7rIyQOR62GCctdiQpZ/zOJlFyk6y+94wXzv6RNZgaR4= -github.com/mattn/go-colorable v0.1.8 h1:c1ghPdyEDarC70ftn0y+A/Ee++9zz8ljHG1b13eJ0s8= -github.com/mattn/go-colorable v0.1.8/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= -github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY= -github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= @@ -50,6 +46,7 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/q191201771/naza v0.19.1 h1:4KLcxT2CHztO+7miPRtBG3FFgadSQYQw1gPPPKN7rnY= github.com/q191201771/naza v0.19.1/go.mod h1:5LeGupZZFtYP1g/S203n9vXoUNVdlRnPIfM6rExjqt0= +github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k= github.com/sqs/goreturns v0.0.0-20181028201513-538ac6014518/go.mod h1:CKI4AZ4XmGV240rTHfO0hfE83S6/a3/Q1siZJ/vXf7A= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= @@ -84,13 +81,9 @@ golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359 h1:2B5p2L5IfGiD7+b9BOoRMC6DgObAVZV+Fsp050NqXik= -golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -103,15 +96,15 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/io.go b/io.go index 2ab07e4..debeca0 100644 --- a/io.go +++ b/io.go @@ -9,6 +9,7 @@ import ( "time" "github.com/Monibuca/engine/v4/config" + "github.com/Monibuca/engine/v4/util" "go.uber.org/zap" ) @@ -19,11 +20,11 @@ type ClientConfig interface { config.Pull | config.Push } -type IO[C IOConfig] struct { - ID string - Type string - context.Context - context.CancelFunc +type IO[C IOConfig, S IIO] struct { + ID string + Type string + context.Context //不要直接设置,应当通过OnEvent传入父级Context + context.CancelFunc //流关闭是关闭发布者或者订阅者 *zap.Logger StartTime time.Time //创建时间 Stream *Stream `json:"-"` @@ -34,14 +35,16 @@ type IO[C IOConfig] struct { Config *C } -func (io *IO[C]) IsClosed() bool { +func (io *IO[C, S]) IsClosed() bool { return io.Err() != nil } -func (io *IO[C]) OnEvent(event any) any { +func (io *IO[C, S]) OnEvent(event any) { switch v := event.(type) { case context.Context: + //传入父级Context,如果不传入将使用Engine的Context io.Context, io.CancelFunc = context.WithCancel(v) case *Stream: + io.Stream = v io.StartTime = time.Now() io.Logger = v.With(zap.String("type", io.Type)) if io.ID != "" { @@ -55,33 +58,29 @@ func (io *IO[C]) OnEvent(event any) any { io.CancelFunc() } } - return event } -func (io *IO[C]) getID() string { +func (io *IO[C, S]) getID() string { return io.ID } -func (io *IO[C]) getType() string { +func (io *IO[C, S]) getType() string { return io.Type } type IIO interface { IsClosed() bool - OnEvent(any) any + OnEvent(any) getID() string getType() string } -func (io *IO[C]) bye(specific any) { +func (io *IO[C, S]) Bye() { if io.CancelFunc != nil { io.CancelFunc() } - if io.Stream != nil { - io.Stream.Receive(specific) - } } // receive 用于接收发布或者订阅 -func (io *IO[C]) receive(streamPath string, specific any, conf *C) bool { +func (io *IO[C, S]) receive(streamPath string, specific S, conf *C) bool { Streams.Lock() defer Streams.Unlock() streamPath = strings.Trim(streamPath, "/") @@ -91,7 +90,7 @@ func (io *IO[C]) receive(streamPath string, specific any, conf *C) bool { return false } io.Args = u.Query() - wt := time.Second*5 + wt := time.Second * 5 var c any = conf if v, ok := c.(*config.Subscribe); ok { wt = v.WaitTimeout.Duration() @@ -99,35 +98,34 @@ func (io *IO[C]) receive(streamPath string, specific any, conf *C) bool { if io.Context == nil { io.Context, io.CancelFunc = context.WithCancel(Engine) } - s, created := findOrCreateStream(u.Path, wt) + s, _ := findOrCreateStream(u.Path, wt) if s.IsClosed() { return false } io.Config = conf - io.Stream = s + if io.Type == "" { + io.Type = reflect.TypeOf(specific).Elem().Name() + } if v, ok := c.(*config.Publish); ok { if s.Publisher != nil && !s.Publisher.IsClosed() { // 根据配置是否剔出原来的发布者 if v.KickExist { s.Warn("kick", zap.Any("publisher", s.Publisher)) - s.Publisher.OnEvent(SEKick{specific.(IPublisher)}) + s.Publisher.OnEvent(SEKick{}) } else { s.Warn("badName", zap.Any("publisher", s.Publisher)) return false } } - if created { - s.PublishTimeout = v.PublishTimeout.Duration() - s.WaitCloseTimeout = v.WaitCloseTimeout.Duration() - } + s.PublishTimeout = v.PublishTimeout.Duration() + s.WaitCloseTimeout = v.WaitCloseTimeout.Duration() } else { Bus.Publish(Event_REQUEST_PUBLISH, s) } - if io.Type == "" { - io.Type = reflect.TypeOf(specific).Elem().Name() + if promise := util.NewPromise[S, bool](specific); s.Receive(promise) { + return promise.Then() } - s.Receive(specific) - return true + return false } type Client[C ClientConfig] struct { diff --git a/plugin.go b/plugin.go index 7b43ac6..dd261b7 100644 --- a/plugin.go +++ b/plugin.go @@ -11,7 +11,6 @@ import ( "github.com/Monibuca/engine/v4/config" "github.com/Monibuca/engine/v4/log" - "github.com/Monibuca/engine/v4/track" "github.com/Monibuca/engine/v4/util" "go.uber.org/zap" "gopkg.in/yaml.v3" @@ -189,12 +188,8 @@ func (opt *Plugin) Publish(streamPath string, pub IPublisher) bool { } if ok = pub.receive(streamPath, pub, conf.GetPublishConfig()); ok { p := pub.GetPublisher() - unA := track.UnknowAudio{} - unA.Stream = p.Stream - p.AudioTrack = &unA - unV := track.UnknowVideo{} - unV.Stream = p.Stream - p.VideoTrack = &unV + p.AudioTrack = p.Stream.NewAudioTrack() + p.VideoTrack = p.Stream.NewVideoTrack() } return ok } @@ -204,9 +199,5 @@ func (opt *Plugin) Subscribe(streamPath string, sub ISubscriber) bool { if !ok { conf = EngineConfig } - if ok = sub.receive(streamPath, sub, conf.GetSubscribeConfig()); ok { - p := sub.GetSubscriber() - p.TrackPlayer.Context, p.TrackPlayer.CancelFunc = context.WithCancel(p.IO) - } - return ok + return sub.receive(streamPath, sub, conf.GetSubscribeConfig()) } diff --git a/publisher.go b/publisher.go index 3a5a0b7..5cf3d4e 100644 --- a/publisher.go +++ b/publisher.go @@ -8,12 +8,11 @@ import ( type IPublisher interface { IIO GetPublisher() *Publisher - receive(string, any, *config.Publish) bool - Unpublish() + receive(string, IPublisher, *config.Publish) bool } type Publisher struct { - IO[config.Publish] + IO[config.Publish, IPublisher] common.AudioTrack common.VideoTrack } @@ -22,10 +21,6 @@ func (p *Publisher) GetPublisher() *Publisher { return p } -func (p *Publisher) Unpublish() { - p.bye(p) -} - type PullEvent int // 用于远程拉流的发布者 diff --git a/stream.go b/stream.go index 2d09e8a..d6e9c56 100644 --- a/stream.go +++ b/stream.go @@ -41,15 +41,14 @@ type SEclose struct { } type SEKick struct { - Publisher IPublisher } +// 四状态机 const ( STATE_WAITPUBLISH StreamState = iota // 等待发布者状态 STATE_PUBLISHING // 正在发布流状态 STATE_WAITCLOSE // 等待关闭状态(自动关闭延时开启) STATE_CLOSED // 流已关闭,不可使用 - STATE_DESTROYED // 资源已释放 ) const ( @@ -59,12 +58,11 @@ const ( ACTION_CLOSE // 主动关闭流 ACTION_LASTLEAVE // 最后一个订阅者离开 ACTION_FIRSTENTER // 第一个订阅者进入 - ACTION_NOTRACKS // 轨道为空了 ) -var StateNames = [...]string{"⌛", "🟢", "🟡", "🔴", "❌"} +var StateNames = [...]string{"⌛", "🟢", "🟡", "🔴"} var ActionNames = [...]string{"publish", "timeout", "publish lost", "close", "last leave", "first enter", "no tracks"} -var StreamFSM = [STATE_DESTROYED + 1]map[StreamAction]StreamState{ +var StreamFSM = [len(StateNames)]map[StreamAction]StreamState{ { ACTION_PUBLISH: STATE_PUBLISHING, ACTION_TIMEOUT: STATE_CLOSED, @@ -73,7 +71,6 @@ var StreamFSM = [STATE_DESTROYED + 1]map[StreamAction]StreamState{ }, { ACTION_PUBLISHLOST: STATE_WAITPUBLISH, - ACTION_NOTRACKS: STATE_WAITPUBLISH, ACTION_LASTLEAVE: STATE_WAITCLOSE, ACTION_CLOSE: STATE_CLOSED, }, @@ -83,9 +80,6 @@ var StreamFSM = [STATE_DESTROYED + 1]map[StreamAction]StreamState{ ACTION_FIRSTENTER: STATE_PUBLISHING, ACTION_CLOSE: STATE_CLOSED, }, - { - ACTION_TIMEOUT: STATE_DESTROYED, - }, {}, } @@ -111,15 +105,15 @@ type StreamTimeoutConfig struct { // Stream 流定义 type Stream struct { + timeout *time.Timer //当前状态的超时定时器 + actionChan util.SafeChan[any] *zap.Logger StartTime time.Time //创建时间 StreamTimeoutConfig Path string Publisher IPublisher State StreamState - timeout *time.Timer //当前状态的超时定时器 - actionChan chan any - Subscribers util.Slice[ISubscriber] // 订阅者 + Subscribers []ISubscriber // 订阅者 Tracks map[string]Track AppName string StreamName string @@ -149,7 +143,7 @@ func findOrCreateStream(streamPath string, waitTimeout time.Duration) (s *Stream s.Info("created") s.WaitTimeout = waitTimeout Streams.Map[streamPath] = s - s.actionChan = make(chan any, 1) + s.actionChan.Init(1) s.timeout = time.NewTimer(waitTimeout) s.Tracks = make(map[string]Track) go s.run() @@ -162,12 +156,14 @@ func (r *Stream) broadcast(event any) { } } func (r *Stream) action(action StreamAction) (ok bool) { - event := StateEvent{From: r.State, Action: action} - if r.State, ok = event.Next(); ok { + event := StateEvent{action, r.State} + var next StreamState + if next, ok = event.Next(); ok { + r.State = next // 给Publisher状态变更的回调,方便进行远程拉流等操作 var stateEvent any - r.Debug(Sprintf("%s%s%s", StateNames[event.From], Yellow("->"), StateNames[r.State]), zap.String("action", ActionNames[action])) - switch r.State { + r.Debug(Sprintf("%s%s%s", StateNames[event.From], Yellow("->"), StateNames[next]), zap.String("action", ActionNames[action])) + switch next { case STATE_WAITPUBLISH: stateEvent = SEwaitPublish{event, r.Publisher} Bus.Publish(Event_REQUEST_PUBLISH, r) @@ -189,14 +185,15 @@ func (r *Stream) action(action StreamAction) (ok bool) { stateEvent = SEwaitClose{event} r.timeout.Reset(r.WaitCloseTimeout) case STATE_CLOSED: + for !r.actionChan.Close() { + // 等待channel发送完毕 + time.Sleep(time.Millisecond * 100) + } stateEvent = SEclose{event} r.broadcast(stateEvent) - r.Subscribers.Reset() + r.Subscribers = nil Bus.Publish(Event_STREAMCLOSE, r) Streams.Delete(r.Path) - r.timeout.Reset(time.Second) // 延迟1秒钟销毁,防止访问到已关闭的channel - case STATE_DESTROYED: - close(r.actionChan) fallthrough default: r.timeout.Stop() @@ -204,6 +201,8 @@ func (r *Stream) action(action StreamAction) (ok bool) { if r.Publisher != nil { r.Publisher.OnEvent(stateEvent) } + } else { + r.Debug("wrong action", zap.String("action", ActionNames[action])) } return } @@ -211,17 +210,15 @@ func (r *Stream) IsClosed() bool { if r == nil { return true } - return r.State >= STATE_CLOSED + return r.State == STATE_CLOSED } func (s *Stream) Close() { s.Receive(ACTION_CLOSE) } -func (s *Stream) Receive(event any) { - if !s.IsClosed() { - s.actionChan <- event - } +func (s *Stream) Receive(event any) bool { + return s.actionChan.Send(event) } // 流状态处理中枢,包括接收订阅发布指令等 @@ -235,11 +232,25 @@ func (s *Stream) run() { if lastWriteTime := t.LastWriteTime(); !lastWriteTime.IsZero() && time.Since(lastWriteTime) > s.PublishTimeout { s.Warn("track timeout", zap.String("name", name), zap.Time("lastWriteTime", lastWriteTime), zap.Duration("timeout", s.PublishTimeout)) delete(s.Tracks, name) - s.broadcast(TrackRemoved(t)) + s.broadcast(TrackRemoved{t}) + } + } + deletes := 0 + for i, sub := range s.Subscribers { + if sub.IsClosed() { + s.Subscribers = append(s.Subscribers[:(i-deletes)], s.Subscribers[i-deletes+1:]...) + Bus.Publish(Event_UNSUBSCRIBE, sub) + s.Info("suber -1", zap.String("id", sub.getID()), zap.String("type", sub.getType()), zap.Int("remains", len(s.Subscribers))) + if s.Publisher != nil { + s.Publisher.OnEvent(sub) // 通知Publisher有订阅者离开,在回调中可以去获取订阅者数量 + } + if len(s.Subscribers) == 0 && s.WaitCloseTimeout > 0 { + s.action(ACTION_LASTLEAVE) + } } } if len(s.Tracks) == 0 { - s.action(ACTION_NOTRACKS) + s.action(ACTION_PUBLISHLOST) } else { s.timeout.Reset(time.Second * 5) } @@ -247,63 +258,70 @@ func (s *Stream) run() { s.Debug("timeout", zap.String("state", StateNames[s.State])) s.action(ACTION_TIMEOUT) } - case action, ok := <-s.actionChan: + case action, ok := <-s.actionChan.C: if ok { switch v := action.(type) { - case IPublisher: - if v.IsClosed() { - s.action(ACTION_PUBLISHLOST) - } else if s.action(ACTION_PUBLISH) { - s.Publisher = v - v.OnEvent(s) // 通知Publisher已成功进入Stream + case *util.Promise[IPublisher, bool]: + s.Publisher = v.Value + if s.action(ACTION_PUBLISH) { + s.Publisher.OnEvent(s) // 通知Publisher已成功进入Stream + v.Resolve(true) + } else { + s.Publisher = nil + v.Resolve(false) + } + case *util.Promise[ISubscriber, bool]: + if s.IsClosed() { + v.Resolve(false) + } + suber := v.Value + s.Subscribers = append(s.Subscribers, suber) + sbConfig := suber.GetSubscribeConfig() + if wt := sbConfig.WaitTimeout.Duration(); wt > s.WaitTimeout { + s.WaitTimeout = wt + } + suber.OnEvent(s) // 通知Subscriber已成功进入Stream + Bus.Publish(Event_SUBSCRIBE, v) + s.Info("suber +1", zap.String("id", suber.getID()), zap.String("type", suber.getType()), zap.Int("remains", len(s.Subscribers))) + v.Resolve(true) + if s.Publisher != nil { + s.Publisher.OnEvent(v) // 通知Publisher有新的订阅者加入,在回调中可以去获取订阅者数量 + for _, t := range s.Tracks { + switch t.(type) { + case *track.Audio: + if !sbConfig.SubAudio { + continue + } + case *track.Video: + if !sbConfig.SubVideo { + continue + } + } + suber.OnEvent(t) // 把现有的Track发给订阅者 + } + } + if len(s.Subscribers) == 1 { + s.action(ACTION_FIRSTENTER) } case Track: name := v.GetName() if _, ok := s.Tracks[name]; !ok { s.Tracks[name] = v - s.Info("TrackAdd", zap.String("name", name)) - for _, sub := range s.Subscribers { - sub.OnEvent(v) // 通知Subscriber有新Track可用了 - } + s.Info("track +1", zap.String("name", name)) + s.broadcast(v) } case TrackRemoved: name := v.GetName() if _, ok := s.Tracks[name]; ok { + s.Info("track -1", zap.String("name", name)) delete(s.Tracks, name) - for _, sub := range s.Subscribers { - sub.OnEvent(v) // 通知Subscriber Track已被移除 - } + s.broadcast(v) if len(s.Tracks) == 0 { - s.action(ACTION_NOTRACKS) + s.action(ACTION_PUBLISHLOST) } } case StreamAction: s.action(v) - case ISubscriber: - if !v.IsClosed() { - s.Subscribers.Add(v) - if wt := v.GetSubscribeConfig().WaitTimeout.Duration(); wt > s.WaitTimeout { - s.WaitTimeout = wt - } - v.OnEvent(s) // 通知Subscriber已成功进入Stream - Bus.Publish(Event_SUBSCRIBE, v) - s.Info("suber added", zap.String("id", v.getID()), zap.String("type", v.getType()), zap.Int("remains", len(s.Subscribers))) - if s.Publisher != nil { - s.Publisher.OnEvent(v) // 通知Publisher有新的订阅者加入,在回调中可以去获取订阅者数量 - } - if s.Subscribers.Len() == 1 { - s.action(ACTION_FIRSTENTER) - } - } else if s.Subscribers.Delete(v) { - Bus.Publish(Event_UNSUBSCRIBE, v) - s.Info("suber removed", zap.String("id", v.getID()), zap.String("type", v.getType()), zap.Int("remains", len(s.Subscribers))) - if s.Publisher != nil { - s.Publisher.OnEvent(v) // 通知Publisher有订阅者离开,在回调中可以去获取订阅者数量 - } - if s.Subscribers.Len() == 0 && s.WaitCloseTimeout > 0 { - s.action(ACTION_LASTLEAVE) - } - } } } else { return @@ -318,21 +336,21 @@ func (s *Stream) AddTrack(t Track) { s.Receive(t) } -type TrackRemoved Track +type TrackRemoved struct { + Track +} func (s *Stream) RemoveTrack(t Track) { - s.Receive(TrackRemoved(t)) + s.Receive(TrackRemoved{t}) } // 如果暂时不知道编码格式可以用这个 func (r *Stream) NewVideoTrack() (vt *track.UnknowVideo) { - r.Debug("create unknow video track") vt = &track.UnknowVideo{} vt.Stream = r return } func (r *Stream) NewAudioTrack() (at *track.UnknowAudio) { - r.Debug("create unknow audio track") at = &track.UnknowAudio{} at.Stream = r return diff --git a/subscriber.go b/subscriber.go index cb69bf3..8e9e4d6 100644 --- a/subscriber.go +++ b/subscriber.go @@ -7,31 +7,34 @@ import ( . "github.com/Monibuca/engine/v4/common" "github.com/Monibuca/engine/v4/config" "github.com/Monibuca/engine/v4/track" + "go.uber.org/zap" ) -type AudioFrame AVFrame[AudioSlice] -type VideoFrame AVFrame[NALUSlice] +type AudioFrame *AVFrame[AudioSlice] +type VideoFrame *AVFrame[NALUSlice] +type AudioDeConf DecoderConfiguration[AudioSlice] +type VideoDeConf DecoderConfiguration[NALUSlice] type ISubscriber interface { IIO - receive(string, any, *config.Subscribe) bool + receive(string, ISubscriber, *config.Subscribe) bool config.SubscribeConfig GetSubscriber() *Subscriber - Unsubscribe() + IsPlaying() bool + Play(ISubscriber) + Stop() } type TrackPlayer struct { context.Context context.CancelFunc - AudioTrack *track.Audio - VideoTrack *track.Video - vr *AVRing[NALUSlice] - ar *AVRing[AudioSlice] - startTime time.Time //读到第一个关键帧的时间 - firstIFrame *VideoFrame //起始关键帧 + AudioTrack *track.Audio + VideoTrack *track.Video + vr *AVRing[NALUSlice] + ar *AVRing[AudioSlice] } // Subscriber 订阅者实体定义 type Subscriber struct { - IO[config.Subscribe] + IO[config.Subscribe, ISubscriber] TrackPlayer } @@ -39,25 +42,23 @@ func (p *Subscriber) GetSubscriber() *Subscriber { return p } -func (p *Subscriber) Unsubscribe() { - p.bye(p) -} - func (s *Subscriber) GetSubscribeConfig() *config.Subscribe { return s.Config } -func (s *Subscriber) OnEvent(event any) any { - s.IO.OnEvent(event) +func (s *Subscriber) OnEvent(event any) { switch v := event.(type) { case TrackRemoved: - if a, ok := v.(*track.Audio); ok && a == s.AudioTrack { + if a, ok := v.Track.(*track.Audio); ok && a == s.AudioTrack { s.ar = nil - } else if v, ok := v.(*track.Video); ok && v == s.VideoTrack { + } else if v, ok := v.Track.(*track.Video); ok && v == s.VideoTrack { s.vr = nil } + case Track: //默认接受所有track + s.AddTrack(v) + default: + s.IO.OnEvent(event) } - return event } func (s *Subscriber) AddTrack(t Track) bool { @@ -68,7 +69,7 @@ func (s *Subscriber) AddTrack(t Track) bool { } s.VideoTrack = v s.vr = v.ReadRing() - s.firstIFrame = (*VideoFrame)(s.vr.Read(s.TrackPlayer)) + s.Info("track+1", zap.String("name", v.Name)) return true } } else if a, ok := t.(*track.Audio); ok { @@ -78,6 +79,7 @@ func (s *Subscriber) AddTrack(t Track) bool { } s.AudioTrack = a s.ar = a.ReadRing() + s.Info("track+1", zap.String("name", a.Name)) return true } } @@ -86,47 +88,81 @@ func (s *Subscriber) AddTrack(t Track) bool { } func (s *Subscriber) IsPlaying() bool { - return s.TrackPlayer.Err() == nil && (s.AudioTrack != nil || s.VideoTrack != nil) + return s.TrackPlayer.Context != nil && s.TrackPlayer.Err() == nil +} + +func (s *Subscriber) Stop() { + if s.IsPlaying() { + s.TrackPlayer.CancelFunc() + } } //Play 开始播放 -func (s *Subscriber) Play() { +func (s *Subscriber) Play(spesic ISubscriber) { + s.Info("play") var t time.Time - for s.TrackPlayer.Err() == nil { + var startTime time.Time //读到第一个关键帧的时间 + var firstIFrame VideoFrame //起始关键帧 + var audioSent bool //音频是否发送过 + s.TrackPlayer.Context, s.TrackPlayer.CancelFunc = context.WithCancel(s.IO) + ctx := s.TrackPlayer.Context + defer s.Info("stop") + for ctx.Err() == nil { if s.vr != nil { + if startTime.IsZero() { + startTime = time.Now() + firstIFrame = (VideoFrame)(s.vr.Read(ctx)) + s.Debug("firstIFrame", zap.Uint32("seq", firstIFrame.Sequence)) + if ctx.Err() != nil { + return + } + spesic.OnEvent(VideoDeConf(s.VideoTrack.DecoderConfiguration)) + } for { + var vp VideoFrame // 如果进入正常模式 - if s.firstIFrame == nil { - vp := s.vr.Read(s.TrackPlayer) - s.OnEvent((*VideoFrame)(vp)) + if firstIFrame == nil { + vp = VideoFrame(s.vr.Read(ctx)) + if ctx.Err() != nil { + return + } + spesic.OnEvent(vp) s.vr.MoveNext() - if vp.Timestamp.After(t) { - t = vp.Timestamp - break - } } else { - if s.startTime.IsZero() { - s.startTime = time.Now() - } - if &s.VideoTrack.IDRing.Value != (*AVFrame[NALUSlice])(s.firstIFrame) { - s.firstIFrame = nil + if s.VideoTrack.IDRing.Value.Sequence != firstIFrame.Sequence { + firstIFrame = nil s.vr = s.VideoTrack.ReadRing() + s.Debug("skip to latest key frame") + continue } else { - vp := s.vr.Read(s.TrackPlayer) - s.OnEvent((*VideoFrame)(vp)) - fast := time.Duration(vp.AbsTime-s.firstIFrame.AbsTime)*time.Millisecond - time.Since(s.startTime) - if fast > 0 { + vp = VideoFrame(s.vr.Read(ctx)) + if ctx.Err() != nil { + return + } + spesic.OnEvent(vp) + if fast := time.Duration(vp.AbsTime-firstIFrame.AbsTime)*time.Millisecond - time.Since(startTime); fast > 0 { time.Sleep(fast) } s.vr.MoveNext() } } + if vp.Timestamp.After(t) { + t = vp.Timestamp + break + } } } - if s.ar != nil { + if s.ar != nil && firstIFrame == nil { + if !audioSent { + spesic.OnEvent(AudioDeConf(s.AudioTrack.DecoderConfiguration)) + audioSent = true + } for { - ap := s.ar.Read(s.TrackPlayer) - s.OnEvent((*AudioFrame)(ap)) + ap := AudioFrame(s.ar.Read(ctx)) + if ctx.Err() != nil { + return + } + spesic.OnEvent(ap) s.ar.MoveNext() if ap.Timestamp.After(t) { t = ap.Timestamp @@ -135,7 +171,6 @@ func (s *Subscriber) Play() { } } } - return } type PushEvent int diff --git a/track/audio.go b/track/audio.go index 9c9f8e9..98f32cc 100644 --- a/track/audio.go +++ b/track/audio.go @@ -19,6 +19,10 @@ type Audio struct { avccHead []byte } +func (a *Audio) IsAAC() bool { + return a.CodecID == codec.CodecID_AAC +} + func (a *Audio) Attach() { a.Stream.AddTrack(a) } diff --git a/track/base.go b/track/base.go index 210ab07..93b008e 100644 --- a/track/base.go +++ b/track/base.go @@ -160,7 +160,7 @@ func (av *Media[T]) Flush() { av.firstTimestamp = time.Now() } else { av.Value.DeltaTime = (av.Value.DTS - preValue.DTS) / 90 - av.Value.AbsTime += av.Value.DeltaTime + av.Value.AbsTime = preValue.AbsTime + av.Value.DeltaTime } av.Base.Flush(&av.Value.BaseFrame) // 如果收到的帧的时间戳超过实际消耗的时间100ms就休息一下,100ms作为一个弹性区间防止频繁调用sleep diff --git a/track/video.go b/track/video.go index 0f4891d..97c5d69 100644 --- a/track/video.go +++ b/track/video.go @@ -8,6 +8,7 @@ import ( . "github.com/Monibuca/engine/v4/common" "github.com/Monibuca/engine/v4/config" "github.com/Monibuca/engine/v4/util" + . "github.com/logrusorgru/aurora" "go.uber.org/zap" ) @@ -38,10 +39,10 @@ func (t *Video) GetName() string { func (t *Video) ComputeGOP() { t.idrCount++ if t.IDRing != nil { - t.GOP = int(t.Value.SeqInTrack - t.IDRing.Value.SeqInTrack) + t.GOP = int(t.Value.Sequence - t.IDRing.Value.Sequence) if l := t.Size - t.GOP - 5; l > 5 { t.Size -= l - t.Stream.Debug("resize", zap.String("name", t.Name), zap.Int("after", t.Size+l), zap.Int("before", t.Size)) + t.Stream.Debug(Sprintf("resize(%d%s%d)", t.Size+l, Blink("→"), t.Size), zap.String("name", t.Name)) //缩小缓冲环节省内存 t.Unlink(l).Do(func(v AVFrame[NALUSlice]) { if v.IFrame { diff --git a/util/big_endian.go b/util/big_endian.go index 77ca3eb..26d6da1 100644 --- a/util/big_endian.go +++ b/util/big_endian.go @@ -1,15 +1,17 @@ package util -import "constraints" +type Integer interface { + ~int | ~int16 | ~int32 | ~int64 | ~uint | ~uint16 | ~uint32 | ~uint64 +} -func PutBE[T constraints.Integer](b []byte, num T) []byte { +func PutBE[T Integer](b []byte, num T) []byte { for i, n := 0, len(b); i < n; i++ { b[i] = byte(num >> ((n - i - 1) << 3)) } return b } -func ReadBE[T constraints.Integer](b []byte) (num T) { +func ReadBE[T Integer](b []byte) (num T) { num = 0 for i, n := 0, len(b); i < n; i++ { num += T(b[i]) << ((n - i - 1) << 3) @@ -17,7 +19,7 @@ func ReadBE[T constraints.Integer](b []byte) (num T) { return } -func GetBE[T constraints.Integer](b []byte, num *T) T { +func GetBE[T Integer](b []byte, num *T) T { *num = 0 for i, n := 0, len(b); i < n; i++ { *num += T(b[i]) << ((n - i - 1) << 3) diff --git a/util/index.go b/util/index.go index d5c9570..e213b75 100644 --- a/util/index.go +++ b/util/index.go @@ -1,7 +1,6 @@ package util import ( - "constraints" "os" "path/filepath" "runtime" @@ -26,7 +25,7 @@ func Exist(filename string) bool { return err == nil || os.IsExist(err) } -func ConvertNum[F constraints.Integer, T constraints.Integer](from F, to T) T { +func ConvertNum[F Integer, T Integer](from F, to T) T { return T(from) } diff --git a/util/safe_chan.go b/util/safe_chan.go new file mode 100644 index 0000000..1ade54f --- /dev/null +++ b/util/safe_chan.go @@ -0,0 +1,68 @@ +package util + +import ( + "math" + "sync/atomic" +) + +// SafeChan安全的channel,可以防止close后被写入的问题 +type SafeChan[T any] struct { + C chan T + senders int32 //当前发送者数量 +} + + +func (sc *SafeChan[T]) Init(n int) { + sc.C = make(chan T, n) +} + +// Close senders为0的时候可以安全关闭,否则不能关闭 +func (sc *SafeChan[T]) Close() bool { + if atomic.CompareAndSwapInt32(&sc.senders, 0, math.MinInt32) { + close(sc.C) + return true + } + return false +} + +func (sc *SafeChan[T]) Send(v T) bool { + // senders增加后为正数说明没有被channel没有被关闭,可以发送数据 + if atomic.AddInt32(&sc.senders, 1) > 0 { + sc.C <- v + atomic.AddInt32(&sc.senders, -1) + return true + } + return false +} + +func (sc *SafeChan[T]) IsClosed() bool { + return atomic.LoadInt32(&sc.senders) < 0 +} + +func (sc *SafeChan[T]) IsEmpty() bool { + return atomic.LoadInt32(&sc.senders) == 0 +} + +func (sc *SafeChan[T]) IsFull() bool { + return atomic.LoadInt32(&sc.senders) > 0 +} + +type Promise[S any, R any] struct { + Value S + c chan R +} + +func (r *Promise[S, R]) Resolve(result R) { + r.c <- result +} + +func (r *Promise[S, R]) Then() R { + return <-r.c +} + +func NewPromise[S any, R any](value S) *Promise[S, R] { + return &Promise[S, R]{ + Value: value, + c: make(chan R, 1), + } +}