优化代码适配RC1

This commit is contained in:
langhuihui
2022-02-19 21:15:10 +08:00
parent c0bd09e249
commit 24ed1b337c
16 changed files with 298 additions and 194 deletions

View File

@@ -104,7 +104,7 @@ type BaseFrame struct {
DeltaTime uint32 // 相对上一帧时间戳,毫秒 DeltaTime uint32 // 相对上一帧时间戳,毫秒
AbsTime uint32 // 绝对时间戳,毫秒 AbsTime uint32 // 绝对时间戳,毫秒
Timestamp time.Time // 写入时间,可用于比较两个帧的先后 Timestamp time.Time // 写入时间,可用于比较两个帧的先后
SeqInTrack uint32 // 在一个Track中的序号 Sequence uint32 // 在一个Track中的序号
BytesIn int // 输入字节数用于计算BPS BytesIn int // 输入字节数用于计算BPS
} }

View File

@@ -14,7 +14,7 @@ type AVRing[T RawSlice] struct {
func (r *AVRing[T]) Step() *AVFrame[T] { func (r *AVRing[T]) Step() *AVFrame[T] {
last := &r.Value last := &r.Value
current := r.MoveNext() current := r.MoveNext()
current.SeqInTrack = r.MoveCount current.Sequence = r.MoveCount
current.canRead = false current.canRead = false
current.Reset() current.Reset()
last.canRead = true last.canRead = true

View File

@@ -10,5 +10,5 @@ type IStream interface {
IsClosed() bool IsClosed() bool
SSRC() uint32 SSRC() uint32
log.Zap log.Zap
Receive(any) Receive(any) bool
} }

6
go.mod
View File

@@ -6,7 +6,6 @@ require (
github.com/cnotch/ipchub v1.1.0 github.com/cnotch/ipchub v1.1.0
github.com/google/uuid v1.3.0 github.com/google/uuid v1.3.0
github.com/logrusorgru/aurora v2.0.3+incompatible github.com/logrusorgru/aurora v2.0.3+incompatible
github.com/mattn/go-colorable v0.1.8
github.com/pion/rtp v1.7.4 github.com/pion/rtp v1.7.4
github.com/q191201771/naza v0.19.1 github.com/q191201771/naza v0.19.1
go.uber.org/zap v1.21.0 go.uber.org/zap v1.21.0
@@ -15,13 +14,14 @@ require (
) )
require ( require (
github.com/kr/pretty v0.3.0 // indirect
go.uber.org/atomic v1.7.0 // indirect go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.6.0 // indirect go.uber.org/multierr v1.6.0 // indirect
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
) )
require ( require (
github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef
github.com/mattn/go-isatty v0.0.12 // indirect
github.com/pion/randutil v0.1.0 // indirect github.com/pion/randutil v0.1.0 // indirect
golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359 // indirect
) )

17
go.sum
View File

@@ -24,17 +24,13 @@ github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpO
github.com/kelindar/process v0.0.0-20170730150328-69a29e249ec3/go.mod h1:+lTCLnZFXOkqwD8sLPl6u4erAc0cP8wFegQHfipz7KE= github.com/kelindar/process v0.0.0-20170730150328-69a29e249ec3/go.mod h1:+lTCLnZFXOkqwD8sLPl6u4erAc0cP8wFegQHfipz7KE=
github.com/kelindar/rate v1.0.0/go.mod h1:AjT4G+hTItNwt30lucEGZIz8y7Uk5zPho6vurIZ+1Es= github.com/kelindar/rate v1.0.0/go.mod h1:AjT4G+hTItNwt30lucEGZIz8y7Uk5zPho6vurIZ+1Es=
github.com/kelindar/tcp v1.0.0/go.mod h1:JB5hj1cshLU60XrLij2BBxW3JQ4hOye8vqbyvuKb52k= github.com/kelindar/tcp v1.0.0/go.mod h1:JB5hj1cshLU60XrLij2BBxW3JQ4hOye8vqbyvuKb52k=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/logrusorgru/aurora v2.0.3+incompatible h1:tOpm7WcpBTn4fjmVfgpQq0EfczGlG91VSDkswnjF5A8= github.com/logrusorgru/aurora v2.0.3+incompatible h1:tOpm7WcpBTn4fjmVfgpQq0EfczGlG91VSDkswnjF5A8=
github.com/logrusorgru/aurora v2.0.3+incompatible/go.mod h1:7rIyQOR62GCctdiQpZ/zOJlFyk6y+94wXzv6RNZgaR4= github.com/logrusorgru/aurora v2.0.3+incompatible/go.mod h1:7rIyQOR62GCctdiQpZ/zOJlFyk6y+94wXzv6RNZgaR4=
github.com/mattn/go-colorable v0.1.8 h1:c1ghPdyEDarC70ftn0y+A/Ee++9zz8ljHG1b13eJ0s8=
github.com/mattn/go-colorable v0.1.8/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY=
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
@@ -50,6 +46,7 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/q191201771/naza v0.19.1 h1:4KLcxT2CHztO+7miPRtBG3FFgadSQYQw1gPPPKN7rnY= github.com/q191201771/naza v0.19.1 h1:4KLcxT2CHztO+7miPRtBG3FFgadSQYQw1gPPPKN7rnY=
github.com/q191201771/naza v0.19.1/go.mod h1:5LeGupZZFtYP1g/S203n9vXoUNVdlRnPIfM6rExjqt0= github.com/q191201771/naza v0.19.1/go.mod h1:5LeGupZZFtYP1g/S203n9vXoUNVdlRnPIfM6rExjqt0=
github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k=
github.com/sqs/goreturns v0.0.0-20181028201513-538ac6014518/go.mod h1:CKI4AZ4XmGV240rTHfO0hfE83S6/a3/Q1siZJ/vXf7A= github.com/sqs/goreturns v0.0.0-20181028201513-538ac6014518/go.mod h1:CKI4AZ4XmGV240rTHfO0hfE83S6/a3/Q1siZJ/vXf7A=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
@@ -84,13 +81,9 @@ golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5h
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359 h1:2B5p2L5IfGiD7+b9BOoRMC6DgObAVZV+Fsp050NqXik=
golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
@@ -103,15 +96,15 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k= gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

46
io.go
View File

@@ -9,6 +9,7 @@ import (
"time" "time"
"github.com/Monibuca/engine/v4/config" "github.com/Monibuca/engine/v4/config"
"github.com/Monibuca/engine/v4/util"
"go.uber.org/zap" "go.uber.org/zap"
) )
@@ -19,11 +20,11 @@ type ClientConfig interface {
config.Pull | config.Push config.Pull | config.Push
} }
type IO[C IOConfig] struct { type IO[C IOConfig, S IIO] struct {
ID string ID string
Type string Type string
context.Context context.Context //不要直接设置应当通过OnEvent传入父级Context
context.CancelFunc context.CancelFunc //流关闭是关闭发布者或者订阅者
*zap.Logger *zap.Logger
StartTime time.Time //创建时间 StartTime time.Time //创建时间
Stream *Stream `json:"-"` Stream *Stream `json:"-"`
@@ -34,14 +35,16 @@ type IO[C IOConfig] struct {
Config *C Config *C
} }
func (io *IO[C]) IsClosed() bool { func (io *IO[C, S]) IsClosed() bool {
return io.Err() != nil return io.Err() != nil
} }
func (io *IO[C]) OnEvent(event any) any { func (io *IO[C, S]) OnEvent(event any) {
switch v := event.(type) { switch v := event.(type) {
case context.Context: case context.Context:
//传入父级Context如果不传入将使用Engine的Context
io.Context, io.CancelFunc = context.WithCancel(v) io.Context, io.CancelFunc = context.WithCancel(v)
case *Stream: case *Stream:
io.Stream = v
io.StartTime = time.Now() io.StartTime = time.Now()
io.Logger = v.With(zap.String("type", io.Type)) io.Logger = v.With(zap.String("type", io.Type))
if io.ID != "" { if io.ID != "" {
@@ -55,33 +58,29 @@ func (io *IO[C]) OnEvent(event any) any {
io.CancelFunc() io.CancelFunc()
} }
} }
return event
} }
func (io *IO[C]) getID() string { func (io *IO[C, S]) getID() string {
return io.ID return io.ID
} }
func (io *IO[C]) getType() string { func (io *IO[C, S]) getType() string {
return io.Type return io.Type
} }
type IIO interface { type IIO interface {
IsClosed() bool IsClosed() bool
OnEvent(any) any OnEvent(any)
getID() string getID() string
getType() string getType() string
} }
func (io *IO[C]) bye(specific any) { func (io *IO[C, S]) Bye() {
if io.CancelFunc != nil { if io.CancelFunc != nil {
io.CancelFunc() io.CancelFunc()
} }
if io.Stream != nil {
io.Stream.Receive(specific)
}
} }
// receive 用于接收发布或者订阅 // receive 用于接收发布或者订阅
func (io *IO[C]) receive(streamPath string, specific any, conf *C) bool { func (io *IO[C, S]) receive(streamPath string, specific S, conf *C) bool {
Streams.Lock() Streams.Lock()
defer Streams.Unlock() defer Streams.Unlock()
streamPath = strings.Trim(streamPath, "/") streamPath = strings.Trim(streamPath, "/")
@@ -91,7 +90,7 @@ func (io *IO[C]) receive(streamPath string, specific any, conf *C) bool {
return false return false
} }
io.Args = u.Query() io.Args = u.Query()
wt := time.Second*5 wt := time.Second * 5
var c any = conf var c any = conf
if v, ok := c.(*config.Subscribe); ok { if v, ok := c.(*config.Subscribe); ok {
wt = v.WaitTimeout.Duration() wt = v.WaitTimeout.Duration()
@@ -99,35 +98,34 @@ func (io *IO[C]) receive(streamPath string, specific any, conf *C) bool {
if io.Context == nil { if io.Context == nil {
io.Context, io.CancelFunc = context.WithCancel(Engine) io.Context, io.CancelFunc = context.WithCancel(Engine)
} }
s, created := findOrCreateStream(u.Path, wt) s, _ := findOrCreateStream(u.Path, wt)
if s.IsClosed() { if s.IsClosed() {
return false return false
} }
io.Config = conf io.Config = conf
io.Stream = s if io.Type == "" {
io.Type = reflect.TypeOf(specific).Elem().Name()
}
if v, ok := c.(*config.Publish); ok { if v, ok := c.(*config.Publish); ok {
if s.Publisher != nil && !s.Publisher.IsClosed() { if s.Publisher != nil && !s.Publisher.IsClosed() {
// 根据配置是否剔出原来的发布者 // 根据配置是否剔出原来的发布者
if v.KickExist { if v.KickExist {
s.Warn("kick", zap.Any("publisher", s.Publisher)) s.Warn("kick", zap.Any("publisher", s.Publisher))
s.Publisher.OnEvent(SEKick{specific.(IPublisher)}) s.Publisher.OnEvent(SEKick{})
} else { } else {
s.Warn("badName", zap.Any("publisher", s.Publisher)) s.Warn("badName", zap.Any("publisher", s.Publisher))
return false return false
} }
} }
if created {
s.PublishTimeout = v.PublishTimeout.Duration() s.PublishTimeout = v.PublishTimeout.Duration()
s.WaitCloseTimeout = v.WaitCloseTimeout.Duration() s.WaitCloseTimeout = v.WaitCloseTimeout.Duration()
}
} else { } else {
Bus.Publish(Event_REQUEST_PUBLISH, s) Bus.Publish(Event_REQUEST_PUBLISH, s)
} }
if io.Type == "" { if promise := util.NewPromise[S, bool](specific); s.Receive(promise) {
io.Type = reflect.TypeOf(specific).Elem().Name() return promise.Then()
} }
s.Receive(specific) return false
return true
} }
type Client[C ClientConfig] struct { type Client[C ClientConfig] struct {

View File

@@ -11,7 +11,6 @@ import (
"github.com/Monibuca/engine/v4/config" "github.com/Monibuca/engine/v4/config"
"github.com/Monibuca/engine/v4/log" "github.com/Monibuca/engine/v4/log"
"github.com/Monibuca/engine/v4/track"
"github.com/Monibuca/engine/v4/util" "github.com/Monibuca/engine/v4/util"
"go.uber.org/zap" "go.uber.org/zap"
"gopkg.in/yaml.v3" "gopkg.in/yaml.v3"
@@ -189,12 +188,8 @@ func (opt *Plugin) Publish(streamPath string, pub IPublisher) bool {
} }
if ok = pub.receive(streamPath, pub, conf.GetPublishConfig()); ok { if ok = pub.receive(streamPath, pub, conf.GetPublishConfig()); ok {
p := pub.GetPublisher() p := pub.GetPublisher()
unA := track.UnknowAudio{} p.AudioTrack = p.Stream.NewAudioTrack()
unA.Stream = p.Stream p.VideoTrack = p.Stream.NewVideoTrack()
p.AudioTrack = &unA
unV := track.UnknowVideo{}
unV.Stream = p.Stream
p.VideoTrack = &unV
} }
return ok return ok
} }
@@ -204,9 +199,5 @@ func (opt *Plugin) Subscribe(streamPath string, sub ISubscriber) bool {
if !ok { if !ok {
conf = EngineConfig conf = EngineConfig
} }
if ok = sub.receive(streamPath, sub, conf.GetSubscribeConfig()); ok { return sub.receive(streamPath, sub, conf.GetSubscribeConfig())
p := sub.GetSubscriber()
p.TrackPlayer.Context, p.TrackPlayer.CancelFunc = context.WithCancel(p.IO)
}
return ok
} }

View File

@@ -8,12 +8,11 @@ import (
type IPublisher interface { type IPublisher interface {
IIO IIO
GetPublisher() *Publisher GetPublisher() *Publisher
receive(string, any, *config.Publish) bool receive(string, IPublisher, *config.Publish) bool
Unpublish()
} }
type Publisher struct { type Publisher struct {
IO[config.Publish] IO[config.Publish, IPublisher]
common.AudioTrack common.AudioTrack
common.VideoTrack common.VideoTrack
} }
@@ -22,10 +21,6 @@ func (p *Publisher) GetPublisher() *Publisher {
return p return p
} }
func (p *Publisher) Unpublish() {
p.bye(p)
}
type PullEvent int type PullEvent int
// 用于远程拉流的发布者 // 用于远程拉流的发布者

162
stream.go
View File

@@ -41,15 +41,14 @@ type SEclose struct {
} }
type SEKick struct { type SEKick struct {
Publisher IPublisher
} }
// 四状态机
const ( const (
STATE_WAITPUBLISH StreamState = iota // 等待发布者状态 STATE_WAITPUBLISH StreamState = iota // 等待发布者状态
STATE_PUBLISHING // 正在发布流状态 STATE_PUBLISHING // 正在发布流状态
STATE_WAITCLOSE // 等待关闭状态(自动关闭延时开启) STATE_WAITCLOSE // 等待关闭状态(自动关闭延时开启)
STATE_CLOSED // 流已关闭,不可使用 STATE_CLOSED // 流已关闭,不可使用
STATE_DESTROYED // 资源已释放
) )
const ( const (
@@ -59,12 +58,11 @@ const (
ACTION_CLOSE // 主动关闭流 ACTION_CLOSE // 主动关闭流
ACTION_LASTLEAVE // 最后一个订阅者离开 ACTION_LASTLEAVE // 最后一个订阅者离开
ACTION_FIRSTENTER // 第一个订阅者进入 ACTION_FIRSTENTER // 第一个订阅者进入
ACTION_NOTRACKS // 轨道为空了
) )
var StateNames = [...]string{"⌛", "🟢", "🟡", "🔴", "❌"} var StateNames = [...]string{"⌛", "🟢", "🟡", "🔴"}
var ActionNames = [...]string{"publish", "timeout", "publish lost", "close", "last leave", "first enter", "no tracks"} var ActionNames = [...]string{"publish", "timeout", "publish lost", "close", "last leave", "first enter", "no tracks"}
var StreamFSM = [STATE_DESTROYED + 1]map[StreamAction]StreamState{ var StreamFSM = [len(StateNames)]map[StreamAction]StreamState{
{ {
ACTION_PUBLISH: STATE_PUBLISHING, ACTION_PUBLISH: STATE_PUBLISHING,
ACTION_TIMEOUT: STATE_CLOSED, ACTION_TIMEOUT: STATE_CLOSED,
@@ -73,7 +71,6 @@ var StreamFSM = [STATE_DESTROYED + 1]map[StreamAction]StreamState{
}, },
{ {
ACTION_PUBLISHLOST: STATE_WAITPUBLISH, ACTION_PUBLISHLOST: STATE_WAITPUBLISH,
ACTION_NOTRACKS: STATE_WAITPUBLISH,
ACTION_LASTLEAVE: STATE_WAITCLOSE, ACTION_LASTLEAVE: STATE_WAITCLOSE,
ACTION_CLOSE: STATE_CLOSED, ACTION_CLOSE: STATE_CLOSED,
}, },
@@ -83,9 +80,6 @@ var StreamFSM = [STATE_DESTROYED + 1]map[StreamAction]StreamState{
ACTION_FIRSTENTER: STATE_PUBLISHING, ACTION_FIRSTENTER: STATE_PUBLISHING,
ACTION_CLOSE: STATE_CLOSED, ACTION_CLOSE: STATE_CLOSED,
}, },
{
ACTION_TIMEOUT: STATE_DESTROYED,
},
{}, {},
} }
@@ -111,15 +105,15 @@ type StreamTimeoutConfig struct {
// Stream 流定义 // Stream 流定义
type Stream struct { type Stream struct {
timeout *time.Timer //当前状态的超时定时器
actionChan util.SafeChan[any]
*zap.Logger *zap.Logger
StartTime time.Time //创建时间 StartTime time.Time //创建时间
StreamTimeoutConfig StreamTimeoutConfig
Path string Path string
Publisher IPublisher Publisher IPublisher
State StreamState State StreamState
timeout *time.Timer //当前状态的超时定时器 Subscribers []ISubscriber // 订阅者
actionChan chan any
Subscribers util.Slice[ISubscriber] // 订阅者
Tracks map[string]Track Tracks map[string]Track
AppName string AppName string
StreamName string StreamName string
@@ -149,7 +143,7 @@ func findOrCreateStream(streamPath string, waitTimeout time.Duration) (s *Stream
s.Info("created") s.Info("created")
s.WaitTimeout = waitTimeout s.WaitTimeout = waitTimeout
Streams.Map[streamPath] = s Streams.Map[streamPath] = s
s.actionChan = make(chan any, 1) s.actionChan.Init(1)
s.timeout = time.NewTimer(waitTimeout) s.timeout = time.NewTimer(waitTimeout)
s.Tracks = make(map[string]Track) s.Tracks = make(map[string]Track)
go s.run() go s.run()
@@ -162,12 +156,14 @@ func (r *Stream) broadcast(event any) {
} }
} }
func (r *Stream) action(action StreamAction) (ok bool) { func (r *Stream) action(action StreamAction) (ok bool) {
event := StateEvent{From: r.State, Action: action} event := StateEvent{action, r.State}
if r.State, ok = event.Next(); ok { var next StreamState
if next, ok = event.Next(); ok {
r.State = next
// 给Publisher状态变更的回调方便进行远程拉流等操作 // 给Publisher状态变更的回调方便进行远程拉流等操作
var stateEvent any var stateEvent any
r.Debug(Sprintf("%s%s%s", StateNames[event.From], Yellow("->"), StateNames[r.State]), zap.String("action", ActionNames[action])) r.Debug(Sprintf("%s%s%s", StateNames[event.From], Yellow("->"), StateNames[next]), zap.String("action", ActionNames[action]))
switch r.State { switch next {
case STATE_WAITPUBLISH: case STATE_WAITPUBLISH:
stateEvent = SEwaitPublish{event, r.Publisher} stateEvent = SEwaitPublish{event, r.Publisher}
Bus.Publish(Event_REQUEST_PUBLISH, r) Bus.Publish(Event_REQUEST_PUBLISH, r)
@@ -189,14 +185,15 @@ func (r *Stream) action(action StreamAction) (ok bool) {
stateEvent = SEwaitClose{event} stateEvent = SEwaitClose{event}
r.timeout.Reset(r.WaitCloseTimeout) r.timeout.Reset(r.WaitCloseTimeout)
case STATE_CLOSED: case STATE_CLOSED:
for !r.actionChan.Close() {
// 等待channel发送完毕
time.Sleep(time.Millisecond * 100)
}
stateEvent = SEclose{event} stateEvent = SEclose{event}
r.broadcast(stateEvent) r.broadcast(stateEvent)
r.Subscribers.Reset() r.Subscribers = nil
Bus.Publish(Event_STREAMCLOSE, r) Bus.Publish(Event_STREAMCLOSE, r)
Streams.Delete(r.Path) Streams.Delete(r.Path)
r.timeout.Reset(time.Second) // 延迟1秒钟销毁防止访问到已关闭的channel
case STATE_DESTROYED:
close(r.actionChan)
fallthrough fallthrough
default: default:
r.timeout.Stop() r.timeout.Stop()
@@ -204,6 +201,8 @@ func (r *Stream) action(action StreamAction) (ok bool) {
if r.Publisher != nil { if r.Publisher != nil {
r.Publisher.OnEvent(stateEvent) r.Publisher.OnEvent(stateEvent)
} }
} else {
r.Debug("wrong action", zap.String("action", ActionNames[action]))
} }
return return
} }
@@ -211,17 +210,15 @@ func (r *Stream) IsClosed() bool {
if r == nil { if r == nil {
return true return true
} }
return r.State >= STATE_CLOSED return r.State == STATE_CLOSED
} }
func (s *Stream) Close() { func (s *Stream) Close() {
s.Receive(ACTION_CLOSE) s.Receive(ACTION_CLOSE)
} }
func (s *Stream) Receive(event any) { func (s *Stream) Receive(event any) bool {
if !s.IsClosed() { return s.actionChan.Send(event)
s.actionChan <- event
}
} }
// 流状态处理中枢,包括接收订阅发布指令等 // 流状态处理中枢,包括接收订阅发布指令等
@@ -235,11 +232,25 @@ func (s *Stream) run() {
if lastWriteTime := t.LastWriteTime(); !lastWriteTime.IsZero() && time.Since(lastWriteTime) > s.PublishTimeout { if lastWriteTime := t.LastWriteTime(); !lastWriteTime.IsZero() && time.Since(lastWriteTime) > s.PublishTimeout {
s.Warn("track timeout", zap.String("name", name), zap.Time("lastWriteTime", lastWriteTime), zap.Duration("timeout", s.PublishTimeout)) s.Warn("track timeout", zap.String("name", name), zap.Time("lastWriteTime", lastWriteTime), zap.Duration("timeout", s.PublishTimeout))
delete(s.Tracks, name) delete(s.Tracks, name)
s.broadcast(TrackRemoved(t)) s.broadcast(TrackRemoved{t})
}
}
deletes := 0
for i, sub := range s.Subscribers {
if sub.IsClosed() {
s.Subscribers = append(s.Subscribers[:(i-deletes)], s.Subscribers[i-deletes+1:]...)
Bus.Publish(Event_UNSUBSCRIBE, sub)
s.Info("suber -1", zap.String("id", sub.getID()), zap.String("type", sub.getType()), zap.Int("remains", len(s.Subscribers)))
if s.Publisher != nil {
s.Publisher.OnEvent(sub) // 通知Publisher有订阅者离开在回调中可以去获取订阅者数量
}
if len(s.Subscribers) == 0 && s.WaitCloseTimeout > 0 {
s.action(ACTION_LASTLEAVE)
}
} }
} }
if len(s.Tracks) == 0 { if len(s.Tracks) == 0 {
s.action(ACTION_NOTRACKS) s.action(ACTION_PUBLISHLOST)
} else { } else {
s.timeout.Reset(time.Second * 5) s.timeout.Reset(time.Second * 5)
} }
@@ -247,63 +258,70 @@ func (s *Stream) run() {
s.Debug("timeout", zap.String("state", StateNames[s.State])) s.Debug("timeout", zap.String("state", StateNames[s.State]))
s.action(ACTION_TIMEOUT) s.action(ACTION_TIMEOUT)
} }
case action, ok := <-s.actionChan: case action, ok := <-s.actionChan.C:
if ok { if ok {
switch v := action.(type) { switch v := action.(type) {
case IPublisher: case *util.Promise[IPublisher, bool]:
if v.IsClosed() { s.Publisher = v.Value
s.action(ACTION_PUBLISHLOST) if s.action(ACTION_PUBLISH) {
} else if s.action(ACTION_PUBLISH) { s.Publisher.OnEvent(s) // 通知Publisher已成功进入Stream
s.Publisher = v v.Resolve(true)
v.OnEvent(s) // 通知Publisher已成功进入Stream } else {
s.Publisher = nil
v.Resolve(false)
}
case *util.Promise[ISubscriber, bool]:
if s.IsClosed() {
v.Resolve(false)
}
suber := v.Value
s.Subscribers = append(s.Subscribers, suber)
sbConfig := suber.GetSubscribeConfig()
if wt := sbConfig.WaitTimeout.Duration(); wt > s.WaitTimeout {
s.WaitTimeout = wt
}
suber.OnEvent(s) // 通知Subscriber已成功进入Stream
Bus.Publish(Event_SUBSCRIBE, v)
s.Info("suber +1", zap.String("id", suber.getID()), zap.String("type", suber.getType()), zap.Int("remains", len(s.Subscribers)))
v.Resolve(true)
if s.Publisher != nil {
s.Publisher.OnEvent(v) // 通知Publisher有新的订阅者加入在回调中可以去获取订阅者数量
for _, t := range s.Tracks {
switch t.(type) {
case *track.Audio:
if !sbConfig.SubAudio {
continue
}
case *track.Video:
if !sbConfig.SubVideo {
continue
}
}
suber.OnEvent(t) // 把现有的Track发给订阅者
}
}
if len(s.Subscribers) == 1 {
s.action(ACTION_FIRSTENTER)
} }
case Track: case Track:
name := v.GetName() name := v.GetName()
if _, ok := s.Tracks[name]; !ok { if _, ok := s.Tracks[name]; !ok {
s.Tracks[name] = v s.Tracks[name] = v
s.Info("TrackAdd", zap.String("name", name)) s.Info("track +1", zap.String("name", name))
for _, sub := range s.Subscribers { s.broadcast(v)
sub.OnEvent(v) // 通知Subscriber有新Track可用了
}
} }
case TrackRemoved: case TrackRemoved:
name := v.GetName() name := v.GetName()
if _, ok := s.Tracks[name]; ok { if _, ok := s.Tracks[name]; ok {
s.Info("track -1", zap.String("name", name))
delete(s.Tracks, name) delete(s.Tracks, name)
for _, sub := range s.Subscribers { s.broadcast(v)
sub.OnEvent(v) // 通知Subscriber Track已被移除
}
if len(s.Tracks) == 0 { if len(s.Tracks) == 0 {
s.action(ACTION_NOTRACKS) s.action(ACTION_PUBLISHLOST)
} }
} }
case StreamAction: case StreamAction:
s.action(v) s.action(v)
case ISubscriber:
if !v.IsClosed() {
s.Subscribers.Add(v)
if wt := v.GetSubscribeConfig().WaitTimeout.Duration(); wt > s.WaitTimeout {
s.WaitTimeout = wt
}
v.OnEvent(s) // 通知Subscriber已成功进入Stream
Bus.Publish(Event_SUBSCRIBE, v)
s.Info("suber added", zap.String("id", v.getID()), zap.String("type", v.getType()), zap.Int("remains", len(s.Subscribers)))
if s.Publisher != nil {
s.Publisher.OnEvent(v) // 通知Publisher有新的订阅者加入在回调中可以去获取订阅者数量
}
if s.Subscribers.Len() == 1 {
s.action(ACTION_FIRSTENTER)
}
} else if s.Subscribers.Delete(v) {
Bus.Publish(Event_UNSUBSCRIBE, v)
s.Info("suber removed", zap.String("id", v.getID()), zap.String("type", v.getType()), zap.Int("remains", len(s.Subscribers)))
if s.Publisher != nil {
s.Publisher.OnEvent(v) // 通知Publisher有订阅者离开在回调中可以去获取订阅者数量
}
if s.Subscribers.Len() == 0 && s.WaitCloseTimeout > 0 {
s.action(ACTION_LASTLEAVE)
}
}
} }
} else { } else {
return return
@@ -318,21 +336,21 @@ func (s *Stream) AddTrack(t Track) {
s.Receive(t) s.Receive(t)
} }
type TrackRemoved Track type TrackRemoved struct {
Track
}
func (s *Stream) RemoveTrack(t Track) { func (s *Stream) RemoveTrack(t Track) {
s.Receive(TrackRemoved(t)) s.Receive(TrackRemoved{t})
} }
// 如果暂时不知道编码格式可以用这个 // 如果暂时不知道编码格式可以用这个
func (r *Stream) NewVideoTrack() (vt *track.UnknowVideo) { func (r *Stream) NewVideoTrack() (vt *track.UnknowVideo) {
r.Debug("create unknow video track")
vt = &track.UnknowVideo{} vt = &track.UnknowVideo{}
vt.Stream = r vt.Stream = r
return return
} }
func (r *Stream) NewAudioTrack() (at *track.UnknowAudio) { func (r *Stream) NewAudioTrack() (at *track.UnknowAudio) {
r.Debug("create unknow audio track")
at = &track.UnknowAudio{} at = &track.UnknowAudio{}
at.Stream = r at.Stream = r
return return

View File

@@ -7,16 +7,21 @@ import (
. "github.com/Monibuca/engine/v4/common" . "github.com/Monibuca/engine/v4/common"
"github.com/Monibuca/engine/v4/config" "github.com/Monibuca/engine/v4/config"
"github.com/Monibuca/engine/v4/track" "github.com/Monibuca/engine/v4/track"
"go.uber.org/zap"
) )
type AudioFrame AVFrame[AudioSlice] type AudioFrame *AVFrame[AudioSlice]
type VideoFrame AVFrame[NALUSlice] type VideoFrame *AVFrame[NALUSlice]
type AudioDeConf DecoderConfiguration[AudioSlice]
type VideoDeConf DecoderConfiguration[NALUSlice]
type ISubscriber interface { type ISubscriber interface {
IIO IIO
receive(string, any, *config.Subscribe) bool receive(string, ISubscriber, *config.Subscribe) bool
config.SubscribeConfig config.SubscribeConfig
GetSubscriber() *Subscriber GetSubscriber() *Subscriber
Unsubscribe() IsPlaying() bool
Play(ISubscriber)
Stop()
} }
type TrackPlayer struct { type TrackPlayer struct {
context.Context context.Context
@@ -25,13 +30,11 @@ type TrackPlayer struct {
VideoTrack *track.Video VideoTrack *track.Video
vr *AVRing[NALUSlice] vr *AVRing[NALUSlice]
ar *AVRing[AudioSlice] ar *AVRing[AudioSlice]
startTime time.Time //读到第一个关键帧的时间
firstIFrame *VideoFrame //起始关键帧
} }
// Subscriber 订阅者实体定义 // Subscriber 订阅者实体定义
type Subscriber struct { type Subscriber struct {
IO[config.Subscribe] IO[config.Subscribe, ISubscriber]
TrackPlayer TrackPlayer
} }
@@ -39,25 +42,23 @@ func (p *Subscriber) GetSubscriber() *Subscriber {
return p return p
} }
func (p *Subscriber) Unsubscribe() {
p.bye(p)
}
func (s *Subscriber) GetSubscribeConfig() *config.Subscribe { func (s *Subscriber) GetSubscribeConfig() *config.Subscribe {
return s.Config return s.Config
} }
func (s *Subscriber) OnEvent(event any) any { func (s *Subscriber) OnEvent(event any) {
s.IO.OnEvent(event)
switch v := event.(type) { switch v := event.(type) {
case TrackRemoved: case TrackRemoved:
if a, ok := v.(*track.Audio); ok && a == s.AudioTrack { if a, ok := v.Track.(*track.Audio); ok && a == s.AudioTrack {
s.ar = nil s.ar = nil
} else if v, ok := v.(*track.Video); ok && v == s.VideoTrack { } else if v, ok := v.Track.(*track.Video); ok && v == s.VideoTrack {
s.vr = nil s.vr = nil
} }
case Track: //默认接受所有track
s.AddTrack(v)
default:
s.IO.OnEvent(event)
} }
return event
} }
func (s *Subscriber) AddTrack(t Track) bool { func (s *Subscriber) AddTrack(t Track) bool {
@@ -68,7 +69,7 @@ func (s *Subscriber) AddTrack(t Track) bool {
} }
s.VideoTrack = v s.VideoTrack = v
s.vr = v.ReadRing() s.vr = v.ReadRing()
s.firstIFrame = (*VideoFrame)(s.vr.Read(s.TrackPlayer)) s.Info("track+1", zap.String("name", v.Name))
return true return true
} }
} else if a, ok := t.(*track.Audio); ok { } else if a, ok := t.(*track.Audio); ok {
@@ -78,6 +79,7 @@ func (s *Subscriber) AddTrack(t Track) bool {
} }
s.AudioTrack = a s.AudioTrack = a
s.ar = a.ReadRing() s.ar = a.ReadRing()
s.Info("track+1", zap.String("name", a.Name))
return true return true
} }
} }
@@ -86,47 +88,81 @@ func (s *Subscriber) AddTrack(t Track) bool {
} }
func (s *Subscriber) IsPlaying() bool { func (s *Subscriber) IsPlaying() bool {
return s.TrackPlayer.Err() == nil && (s.AudioTrack != nil || s.VideoTrack != nil) return s.TrackPlayer.Context != nil && s.TrackPlayer.Err() == nil
}
func (s *Subscriber) Stop() {
if s.IsPlaying() {
s.TrackPlayer.CancelFunc()
}
} }
//Play 开始播放 //Play 开始播放
func (s *Subscriber) Play() { func (s *Subscriber) Play(spesic ISubscriber) {
s.Info("play")
var t time.Time var t time.Time
for s.TrackPlayer.Err() == nil { var startTime time.Time //读到第一个关键帧的时间
var firstIFrame VideoFrame //起始关键帧
var audioSent bool //音频是否发送过
s.TrackPlayer.Context, s.TrackPlayer.CancelFunc = context.WithCancel(s.IO)
ctx := s.TrackPlayer.Context
defer s.Info("stop")
for ctx.Err() == nil {
if s.vr != nil { if s.vr != nil {
if startTime.IsZero() {
startTime = time.Now()
firstIFrame = (VideoFrame)(s.vr.Read(ctx))
s.Debug("firstIFrame", zap.Uint32("seq", firstIFrame.Sequence))
if ctx.Err() != nil {
return
}
spesic.OnEvent(VideoDeConf(s.VideoTrack.DecoderConfiguration))
}
for { for {
var vp VideoFrame
// 如果进入正常模式 // 如果进入正常模式
if s.firstIFrame == nil { if firstIFrame == nil {
vp := s.vr.Read(s.TrackPlayer) vp = VideoFrame(s.vr.Read(ctx))
s.OnEvent((*VideoFrame)(vp)) if ctx.Err() != nil {
return
}
spesic.OnEvent(vp)
s.vr.MoveNext() s.vr.MoveNext()
if vp.Timestamp.After(t) {
t = vp.Timestamp
break
}
} else { } else {
if s.startTime.IsZero() { if s.VideoTrack.IDRing.Value.Sequence != firstIFrame.Sequence {
s.startTime = time.Now() firstIFrame = nil
}
if &s.VideoTrack.IDRing.Value != (*AVFrame[NALUSlice])(s.firstIFrame) {
s.firstIFrame = nil
s.vr = s.VideoTrack.ReadRing() s.vr = s.VideoTrack.ReadRing()
s.Debug("skip to latest key frame")
continue
} else { } else {
vp := s.vr.Read(s.TrackPlayer) vp = VideoFrame(s.vr.Read(ctx))
s.OnEvent((*VideoFrame)(vp)) if ctx.Err() != nil {
fast := time.Duration(vp.AbsTime-s.firstIFrame.AbsTime)*time.Millisecond - time.Since(s.startTime) return
if fast > 0 { }
spesic.OnEvent(vp)
if fast := time.Duration(vp.AbsTime-firstIFrame.AbsTime)*time.Millisecond - time.Since(startTime); fast > 0 {
time.Sleep(fast) time.Sleep(fast)
} }
s.vr.MoveNext() s.vr.MoveNext()
} }
} }
if vp.Timestamp.After(t) {
t = vp.Timestamp
break
} }
} }
if s.ar != nil { }
if s.ar != nil && firstIFrame == nil {
if !audioSent {
spesic.OnEvent(AudioDeConf(s.AudioTrack.DecoderConfiguration))
audioSent = true
}
for { for {
ap := s.ar.Read(s.TrackPlayer) ap := AudioFrame(s.ar.Read(ctx))
s.OnEvent((*AudioFrame)(ap)) if ctx.Err() != nil {
return
}
spesic.OnEvent(ap)
s.ar.MoveNext() s.ar.MoveNext()
if ap.Timestamp.After(t) { if ap.Timestamp.After(t) {
t = ap.Timestamp t = ap.Timestamp
@@ -135,7 +171,6 @@ func (s *Subscriber) Play() {
} }
} }
} }
return
} }
type PushEvent int type PushEvent int

View File

@@ -19,6 +19,10 @@ type Audio struct {
avccHead []byte avccHead []byte
} }
func (a *Audio) IsAAC() bool {
return a.CodecID == codec.CodecID_AAC
}
func (a *Audio) Attach() { func (a *Audio) Attach() {
a.Stream.AddTrack(a) a.Stream.AddTrack(a)
} }

View File

@@ -160,7 +160,7 @@ func (av *Media[T]) Flush() {
av.firstTimestamp = time.Now() av.firstTimestamp = time.Now()
} else { } else {
av.Value.DeltaTime = (av.Value.DTS - preValue.DTS) / 90 av.Value.DeltaTime = (av.Value.DTS - preValue.DTS) / 90
av.Value.AbsTime += av.Value.DeltaTime av.Value.AbsTime = preValue.AbsTime + av.Value.DeltaTime
} }
av.Base.Flush(&av.Value.BaseFrame) av.Base.Flush(&av.Value.BaseFrame)
// 如果收到的帧的时间戳超过实际消耗的时间100ms就休息一下100ms作为一个弹性区间防止频繁调用sleep // 如果收到的帧的时间戳超过实际消耗的时间100ms就休息一下100ms作为一个弹性区间防止频繁调用sleep

View File

@@ -8,6 +8,7 @@ import (
. "github.com/Monibuca/engine/v4/common" . "github.com/Monibuca/engine/v4/common"
"github.com/Monibuca/engine/v4/config" "github.com/Monibuca/engine/v4/config"
"github.com/Monibuca/engine/v4/util" "github.com/Monibuca/engine/v4/util"
. "github.com/logrusorgru/aurora"
"go.uber.org/zap" "go.uber.org/zap"
) )
@@ -38,10 +39,10 @@ func (t *Video) GetName() string {
func (t *Video) ComputeGOP() { func (t *Video) ComputeGOP() {
t.idrCount++ t.idrCount++
if t.IDRing != nil { if t.IDRing != nil {
t.GOP = int(t.Value.SeqInTrack - t.IDRing.Value.SeqInTrack) t.GOP = int(t.Value.Sequence - t.IDRing.Value.Sequence)
if l := t.Size - t.GOP - 5; l > 5 { if l := t.Size - t.GOP - 5; l > 5 {
t.Size -= l t.Size -= l
t.Stream.Debug("resize", zap.String("name", t.Name), zap.Int("after", t.Size+l), zap.Int("before", t.Size)) t.Stream.Debug(Sprintf("resize(%d%s%d)", t.Size+l, Blink("→"), t.Size), zap.String("name", t.Name))
//缩小缓冲环节省内存 //缩小缓冲环节省内存
t.Unlink(l).Do(func(v AVFrame[NALUSlice]) { t.Unlink(l).Do(func(v AVFrame[NALUSlice]) {
if v.IFrame { if v.IFrame {

View File

@@ -1,15 +1,17 @@
package util package util
import "constraints" type Integer interface {
~int | ~int16 | ~int32 | ~int64 | ~uint | ~uint16 | ~uint32 | ~uint64
}
func PutBE[T constraints.Integer](b []byte, num T) []byte { func PutBE[T Integer](b []byte, num T) []byte {
for i, n := 0, len(b); i < n; i++ { for i, n := 0, len(b); i < n; i++ {
b[i] = byte(num >> ((n - i - 1) << 3)) b[i] = byte(num >> ((n - i - 1) << 3))
} }
return b return b
} }
func ReadBE[T constraints.Integer](b []byte) (num T) { func ReadBE[T Integer](b []byte) (num T) {
num = 0 num = 0
for i, n := 0, len(b); i < n; i++ { for i, n := 0, len(b); i < n; i++ {
num += T(b[i]) << ((n - i - 1) << 3) num += T(b[i]) << ((n - i - 1) << 3)
@@ -17,7 +19,7 @@ func ReadBE[T constraints.Integer](b []byte) (num T) {
return return
} }
func GetBE[T constraints.Integer](b []byte, num *T) T { func GetBE[T Integer](b []byte, num *T) T {
*num = 0 *num = 0
for i, n := 0, len(b); i < n; i++ { for i, n := 0, len(b); i < n; i++ {
*num += T(b[i]) << ((n - i - 1) << 3) *num += T(b[i]) << ((n - i - 1) << 3)

View File

@@ -1,7 +1,6 @@
package util package util
import ( import (
"constraints"
"os" "os"
"path/filepath" "path/filepath"
"runtime" "runtime"
@@ -26,7 +25,7 @@ func Exist(filename string) bool {
return err == nil || os.IsExist(err) return err == nil || os.IsExist(err)
} }
func ConvertNum[F constraints.Integer, T constraints.Integer](from F, to T) T { func ConvertNum[F Integer, T Integer](from F, to T) T {
return T(from) return T(from)
} }

68
util/safe_chan.go Normal file
View File

@@ -0,0 +1,68 @@
package util
import (
"math"
"sync/atomic"
)
// SafeChan安全的channel可以防止close后被写入的问题
type SafeChan[T any] struct {
C chan T
senders int32 //当前发送者数量
}
func (sc *SafeChan[T]) Init(n int) {
sc.C = make(chan T, n)
}
// Close senders为0的时候可以安全关闭否则不能关闭
func (sc *SafeChan[T]) Close() bool {
if atomic.CompareAndSwapInt32(&sc.senders, 0, math.MinInt32) {
close(sc.C)
return true
}
return false
}
func (sc *SafeChan[T]) Send(v T) bool {
// senders增加后为正数说明没有被channel没有被关闭可以发送数据
if atomic.AddInt32(&sc.senders, 1) > 0 {
sc.C <- v
atomic.AddInt32(&sc.senders, -1)
return true
}
return false
}
func (sc *SafeChan[T]) IsClosed() bool {
return atomic.LoadInt32(&sc.senders) < 0
}
func (sc *SafeChan[T]) IsEmpty() bool {
return atomic.LoadInt32(&sc.senders) == 0
}
func (sc *SafeChan[T]) IsFull() bool {
return atomic.LoadInt32(&sc.senders) > 0
}
type Promise[S any, R any] struct {
Value S
c chan R
}
func (r *Promise[S, R]) Resolve(result R) {
r.c <- result
}
func (r *Promise[S, R]) Then() R {
return <-r.c
}
func NewPromise[S any, R any](value S) *Promise[S, R] {
return &Promise[S, R]{
Value: value,
c: make(chan R, 1),
}
}