增加Stream NeverTimeout属性,用于纯数据轨道的流保持不关闭

消除一处魔法数字
将TCP监听增加TLS支持
This commit is contained in:
langhuihui
2023-06-04 11:02:45 +08:00
parent fc7ac81c4e
commit f4fb7881f7
14 changed files with 210 additions and 42 deletions

View File

@@ -25,7 +25,7 @@
- 提供配置热更新机制
## 引擎自带HTTP接口
- 获取某一个流的详情 `/api/stream?streamPath=xxx`
- 终止某一个流 `/api/closeStream?streamPath=xxx`
- 终止某一个流 `/api/closestream?streamPath=xxx`
- 获取engine信息 `/api/sysInfo` 返回值{Version:xxx,StartTime:xxx,IP:[xxx.xxx.xxx.xxx]}
- 获取系统基本情况 `/api/summary` 返回值Summary数据
- 获取所有插件信息 `/api/plugins` 返回值Plugin数据

View File

@@ -36,7 +36,6 @@ func (r *RTPFrame) Unmarshal(raw []byte) *RTPFrame {
if r.Packet == nil {
r.Packet = &rtp.Packet{}
}
r.Raw = raw
if err := r.Packet.Unmarshal(raw); err != nil {
log.Error(err)
return nil
@@ -53,7 +52,7 @@ type BaseFrame struct {
type DataFrame[T any] struct {
BaseFrame
Value T
Value T `json:"-" yaml:"-"`
}
type AVFrame struct {

View File

@@ -12,7 +12,7 @@ type LockFrame[T any] struct {
type LockRing[T any] struct {
RingBuffer[LockFrame[T]]
Reset func(*DataFrame[T])
Reset func(*DataFrame[T]) `json:"-" yaml:"-"`
Flag *int32
}

View File

@@ -24,7 +24,7 @@ type Plugin interface {
type TCPPlugin interface {
Plugin
ServeTCP(*net.TCPConn)
ServeTCP(net.Conn)
}
type HTTPPlugin interface {

View File

@@ -0,0 +1,27 @@
-----BEGIN RSA PRIVATE KEY-----
MIIEogIBAAKCAQEAn2zPOFL8i4EqVwoa7fzlGXXPaTxVoUthlGEuWP5jPZCRctAe
JNPxZT7vhj6PZsRi0jGJKD3KNosprSxN335KzZmm8FPxghRXEt/y7DvKQuuTXyst
eaQoTbZtdX3a2cXNC4aPbDeCCYhrthHrCo8GL07a3lV9FlKpcrmn+zep3+BXHDD4
QiG/6k4/AyZImKIi6de25riP7yIjJgENgXzQvEDRYj/Qkth+2uoMzvjb5dZvDzSF
syZ4haPdNv66UDMFVzsIQ7ZPSILVw91vdQn7yI7sCVdlQT1CgXkbXOHvyZGhVBQh
ikPEDBCQpvR8m45kXaterYtXDrW4AXb/VfbILQIDAQABAoIBAAm3jZfOwxLTsBt3
A7YuvF4FZrtw0H1QxWVQWQ1WkADwH1VipvuyqVt07w99H6txW96Y41W/EmSprcQe
165AGdoXO+wZCbbWe4oseTd523Whuy1JSe/ImCZIcLqBDcZJPpqtvG8poPToyjvi
MrPFjOh0Q3XauxGRvz89XqY6udFp1+jzt7pmUSk9AWi/CYNGWzU32iEgZZNAxhgg
T3f6HkitiyoyqQURuxMxUVdohjdavVQwDrRqwgLM4MkfIhT+1B52xNYhCVzxT5Eg
5VE533fumUULxCK0t8/aneSn4rJ/5+CU0WBDex9cVtDsXCncK9oCkw/PQZO+lzzt
LHzBytcCgYEA1tZISKL2gFb24aDaS/OcHfDkHrFRVQDHP+iWL8TMs5HHUGsM675z
JxYNTgKH4tgg19V7G1N6SfF9wluNYd+4m681DO4kHMdzQ5+RcuLM/p2T8o1hLjf6
CDTjMoSntyecouwbuqmibBzQw834+LR/0h7N12eLgV5MEjdKMLaktqcCgYEAvfiU
FAIePAwSlroJcmL1AVCOYIyoVK9Vg5hEukclDtrzy0KkSKoMsyGrM/ggPvMNL/vz
W6rJSFahoRb8XyDautZVj1oxjYrrVBOKSKLTKr8+ckfztDO6+aljxtxHdv9sRQg1
zQ+NSozgXJSW08Y464I6CPaS5GW1DGTLFAjCeQsCgYAh06WYAkjL1mWTCy+0C8yG
Dlrs1kCXIMM+tdGH/fW5RHfcmq5zJA6fleJMaSuaNSuesFds6wzzPZnuk1nEkmRP
5xt0SL7Y5TKp8CMHstxSLt+PrmEh1OCCkElBuA9sUEligciv8GvJmBPq8LCGAG2r
2PvSMdSObxmNOLVuzCNNOwKBgAo+c02454R5ai8yjPvcFjYh7+uI6jLW2ZelCF+7
ImZwrCDT0SQR92lZcW/1+1cpqBZkUbUpunzqHwEeyjEfBmx4zlhSlsV5LkN0YkqU
bSqq8WUcOCoJeBWqarT4f+oMz/vQ+4W5Rvc0LY0QfimhUMRyW0rMcRNb4K1wafsE
legNAoGADESfqKfuuL2C4+lmwaPb/7K3yirOEaZECKgc5F9YFRFEJ6Wic+8U4Yvn
89cvY8ye1RM4ZDe1GtuzlgRw11kXc65QtjEnj1j8EqESu+3EcZnq/wOzFFbeELFn
kTwlRfbRHELksH04OAphQwa/BFTXPni+zv0tFYBkj0RyXAVumBQ=
-----END RSA PRIVATE KEY-----

View File

@@ -0,0 +1,66 @@
-----BEGIN CERTIFICATE-----
MIIGbzCCBNegAwIBAgIQUnjZ1U6EvGqTYJGEoD5X7zANBgkqhkiG9w0BAQwFADBZ
MQswCQYDVQQGEwJDTjElMCMGA1UEChMcVHJ1c3RBc2lhIFRlY2hub2xvZ2llcywg
SW5jLjEjMCEGA1UEAxMaVHJ1c3RBc2lhIFJTQSBEViBUTFMgQ0EgRzIwHhcNMjMw
MjIyMDAwMDAwWhcNMjQwMjIyMjM1OTU5WjAdMRswGQYDVQQDExJsb2NhbC5tb25p
YnVjYS5jb20wggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQCfbM84UvyL
gSpXChrt/OUZdc9pPFWhS2GUYS5Y/mM9kJFy0B4k0/FlPu+GPo9mxGLSMYkoPco2
iymtLE3ffkrNmabwU/GCFFcS3/LsO8pC65NfKy15pChNtm11fdrZxc0Lho9sN4IJ
iGu2EesKjwYvTtreVX0WUqlyuaf7N6nf4FccMPhCIb/qTj8DJkiYoiLp17bmuI/v
IiMmAQ2BfNC8QNFiP9CS2H7a6gzO+Nvl1m8PNIWzJniFo902/rpQMwVXOwhDtk9I
gtXD3W91CfvIjuwJV2VBPUKBeRtc4e/JkaFUFCGKQ8QMEJCm9HybjmRdq16ti1cO
tbgBdv9V9sgtAgMBAAGjggLtMIIC6TAfBgNVHSMEGDAWgBRfOnwREH4MZ3Fh3Iuj
tQADZ/VXHDAdBgNVHQ4EFgQUjqZHMBwnZrrlYVLZnNFBaCgfdU0wDgYDVR0PAQH/
BAQDAgWgMAwGA1UdEwEB/wQCMAAwHQYDVR0lBBYwFAYIKwYBBQUHAwEGCCsGAQUF
BwMCMEkGA1UdIARCMEAwNAYLKwYBBAGyMQECAjEwJTAjBggrBgEFBQcCARYXaHR0
cHM6Ly9zZWN0aWdvLmNvbS9DUFMwCAYGZ4EMAQIBMH0GCCsGAQUFBwEBBHEwbzBC
BggrBgEFBQcwAoY2aHR0cDovL2NydC50cnVzdC1wcm92aWRlci5jbi9UcnVzdEFz
aWFSU0FEVlRMU0NBRzIuY3J0MCkGCCsGAQUFBzABhh1odHRwOi8vb2NzcC50cnVz
dC1wcm92aWRlci5jbjAdBgNVHREEFjAUghJsb2NhbC5tb25pYnVjYS5jb20wggF/
BgorBgEEAdZ5AgQCBIIBbwSCAWsBaQB3AHb/iD8KtvuVUcJhzPWHujS0pM27Kdxo
Qgqf5mdMWjp0AAABhnkjvQkAAAQDAEgwRgIhAMBG7q1j0b/vZwGD27eUP0+nBh73
P7nC+CSokfpts8SoAiEA+HCrNNZ4jrgiLyi9035m1Hvf414Bn8ksSjWzxNUEiCgA
dgDatr9rP7W2Ip+bwrtca+hwkXFsu1GEhTS9pD0wSNf7qwAAAYZ5I7zdAAAEAwBH
MEUCIQCdIMs1nf5cheetmz4/9om8/6KDsoej8VFXHzaDt4BkAAIgLePqBifr6zUA
lTpl17CRDG7q09kUPzElK8uTLjnugtkAdgDuzdBk1dsazsVct520zROiModGfLzs
3sNRSFlGcR+1mwAAAYZ5I7yiAAAEAwBHMEUCIAbWcfP71joz+2wBVRU78RD0bV6V
ugIiATVYUh+k9duJAiEA7x0JIdFuLBj3ggGH9QpAWdG03z48kZ9Cy1DjibRn4fYw
DQYJKoZIhvcNAQEMBQADggGBAItE6rr+1vFoVA3R71+23W0ctYrBTWyxCOse8i0x
/BEo13FjERXJkKWGSj1mwmTikO94JPcPqm83PVyZ0eIbEPu4IO/E6xFbOkTSQu7c
o+5i7TdqtPfv6AOApt+yBb3t3NHRXk2WYLV4onvuSnorbFaj9wRS7GNr+rXCIbqJ
HaFKpneHoV9XhKYUwdgDr2w6JkGattyFdee5o60+8EtL068Mf6Qg3OmyMosEZuBw
R/Gs4DPVKwxj/qt7cJPZoUDV7L6LzLCkgxre8nvvLbOBkC34i4KQGF4CkTjRPWw+
OpRKOzWIw9fQ2+m+z7QwWi+fZQ31EAT6KGnHqPLePJNj09qqUSff3e/y9szzKGHc
TpKVSCgEuuCiBNze7PG9D8QgBMYHkOuGkMeP1EO0pZ3mxd3obUn+bPz0tsqvMR2t
gBx56pMeFnVNQR26VqT2YE+Xw7j6AQUwLa6SCMEsfPeotnhl5tiIKxjqWjWf1lLV
/YWY7m7yb5ctZnq9FJU926ZLLA==
-----END CERTIFICATE-----
-----BEGIN CERTIFICATE-----
MIIFBzCCA++gAwIBAgIRALIM7VUuMaC/NDp1KHQ76aswDQYJKoZIhvcNAQELBQAw
ezELMAkGA1UEBhMCR0IxGzAZBgNVBAgMEkdyZWF0ZXIgTWFuY2hlc3RlcjEQMA4G
A1UEBwwHU2FsZm9yZDEaMBgGA1UECgwRQ29tb2RvIENBIExpbWl0ZWQxITAfBgNV
BAMMGEFBQSBDZXJ0aWZpY2F0ZSBTZXJ2aWNlczAeFw0yMjAxMTAwMDAwMDBaFw0y
ODEyMzEyMzU5NTlaMFkxCzAJBgNVBAYTAkNOMSUwIwYDVQQKExxUcnVzdEFzaWEg
VGVjaG5vbG9naWVzLCBJbmMuMSMwIQYDVQQDExpUcnVzdEFzaWEgUlNBIERWIFRM
UyBDQSBHMjCCAaIwDQYJKoZIhvcNAQEBBQADggGPADCCAYoCggGBAKjGDe0GSaBs
Yl/VhMaTM6GhfR1TAt4mrhN8zfAMwEfLZth+N2ie5ULbW8YvSGzhqkDhGgSBlafm
qq05oeESrIJQyz24j7icGeGyIZ/jIChOOvjt4M8EVi3O0Se7E6RAgVYcX+QWVp5c
Sy+l7XrrtL/pDDL9Bngnq/DVfjCzm5ZYUb1PpyvYTP7trsV+yYOCNmmwQvB4yVjf
IIpHC1OcsPBntMUGeH1Eja4D+qJYhGOxX9kpa+2wTCW06L8T6OhkpJWYn5JYiht5
8exjAR7b8Zi3DeG9oZO5o6Qvhl3f8uGU8lK1j9jCUN/18mI/5vZJ76i+hsgdlfZB
Rh5lmAQjD80M9TY+oD4MYUqB5XrigPfFAUwXFGehhlwCVw7y6+5kpbq/NpvM5Ba8
SeQYUUuMA8RXpTtGlrrTPqJryfa55hTuX/ThhX4gcCVkbyujo0CYr+Uuc14IOyNY
1fD0/qORbllbgV41wiy/2ZUWZQUodqHWkjT1CwIMbQOY5jmrSYGBwwIDAQABo4IB
JjCCASIwHwYDVR0jBBgwFoAUoBEKIz6W8Qfs4q8p74Klf9AwpLQwHQYDVR0OBBYE
FF86fBEQfgxncWHci6O1AANn9VccMA4GA1UdDwEB/wQEAwIBhjASBgNVHRMBAf8E
CDAGAQH/AgEAMB0GA1UdJQQWMBQGCCsGAQUFBwMBBggrBgEFBQcDAjAiBgNVHSAE
GzAZMA0GCysGAQQBsjEBAgIxMAgGBmeBDAECATBDBgNVHR8EPDA6MDigNqA0hjJo
dHRwOi8vY3JsLmNvbW9kb2NhLmNvbS9BQUFDZXJ0aWZpY2F0ZVNlcnZpY2VzLmNy
bDA0BggrBgEFBQcBAQQoMCYwJAYIKwYBBQUHMAGGGGh0dHA6Ly9vY3NwLmNvbW9k
b2NhLmNvbTANBgkqhkiG9w0BAQsFAAOCAQEAHMUom5cxIje2IiFU7mOCsBr2F6CY
eU5cyfQ/Aep9kAXYUDuWsaT85721JxeXFYkf4D/cgNd9+hxT8ZeDOJrn+ysqR7NO
2K9AdqTdIY2uZPKmvgHOkvH2gQD6jc05eSPOwdY/10IPvmpgUKaGOa/tyygL8Og4
3tYyoHipMMnS4OiYKakDJny0XVuchIP7ZMKiP07Q3FIuSS4omzR77kmc75/6Q9dP
v4wa90UCOn1j6r7WhMmX3eT3Gsdj3WMe9bYD0AFuqa6MDyjIeXq08mVGraXiw73s
Zale8OMckn/BU3O/3aFNLHLfET2H2hT6Wb3nwxjpLIfXmSVcVd8A58XH0g==
-----END CERTIFICATE-----

View File

@@ -2,6 +2,8 @@ package config
import (
"context"
"crypto/tls"
_ "embed"
"net"
"runtime"
"time"
@@ -9,12 +11,28 @@ import (
"m7s.live/engine/v4/log"
)
type TCP struct {
ListenAddr string
ListenNum int //同时并行监听数量0为CPU核心数量
//go:embed local.monibuca.com_bundle.pem
var LocalCert []byte
//go:embed local.monibuca.com.key
var LocalKey []byte
var _ TCPConfig = (*TCP)(nil)
type TCPConfig interface {
ListenTCP(context.Context, TCPPlugin) error
}
func (tcp *TCP) listen(l net.Listener, handler func(*net.TCPConn)) {
type TCP struct {
ListenAddr string
ListenAddrTLS string
CertFile string
KeyFile string
ListenNum int //同时并行监听数量0为CPU核心数量
NoDelay bool //是否禁用Nagle算法
}
func (tcp *TCP) listen(l net.Listener, handler func(net.Conn)) {
var tempDelay time.Duration
for {
conn, err := l.Accept()
@@ -34,12 +52,21 @@ func (tcp *TCP) listen(l net.Listener, handler func(*net.TCPConn)) {
}
return
}
conn.(*net.TCPConn).SetNoDelay(false)
var tcpConn *net.TCPConn
switch v := conn.(type) {
case *net.TCPConn:
tcpConn = v
case *tls.Conn:
tcpConn = v.NetConn().(*net.TCPConn)
}
if !tcp.NoDelay {
tcpConn.SetNoDelay(false)
}
tempDelay = 0
go handler(conn.(*net.TCPConn))
go handler(conn)
}
}
func (tcp *TCP) Listen(ctx context.Context, plugin TCPPlugin) error {
func (tcp *TCP) ListenTCP(ctx context.Context, plugin TCPPlugin) error {
l, err := net.Listen("tcp", tcp.ListenAddr)
if err != nil {
if Global.LogLang == "zh" {
@@ -53,9 +80,39 @@ func (tcp *TCP) Listen(ctx context.Context, plugin TCPPlugin) error {
if count == 0 {
count = runtime.NumCPU()
}
log.Infof("tcp listen %d at %s", count, tcp.ListenAddr)
for i := 0; i < count; i++ {
go tcp.listen(l, plugin.ServeTCP)
}
if tcp.ListenAddrTLS != "" {
keyPair, _ := tls.X509KeyPair(LocalCert, LocalKey)
if tcp.CertFile != "" || tcp.KeyFile != "" {
keyPair, err = tls.LoadX509KeyPair(tcp.CertFile, tcp.KeyFile)
}
if err != nil {
if Global.LogLang == "zh" {
log.Fatalf("加载证书失败: %v", err)
} else {
log.Fatalf("LoadX509KeyPair error: %v", err)
}
return err
}
l, err = tls.Listen("tcp", tcp.ListenAddrTLS, &tls.Config{
Certificates: []tls.Certificate{keyPair},
})
if err != nil {
if Global.LogLang == "zh" {
log.Fatalf("%s: 监听失败: %v", tcp.ListenAddrTLS, err)
} else {
log.Fatalf("%s: Listen error: %v", tcp.ListenAddrTLS, err)
}
return err
}
log.Infof("tls tcp listen %d at %s", count, tcp.ListenAddrTLS)
for i := 0; i < count; i++ {
go tcp.listen(l, plugin.ServeTCP)
}
}
<-ctx.Done()
return l.Close()
}

View File

@@ -172,6 +172,9 @@ func (opt *Plugin) run() {
if conf, ok := opt.Config.(config.HTTPConfig); ok {
go conf.Listen(opt)
}
if conf, ok := opt.Config.(config.TCPConfig); ok {
go conf.ListenTCP(opt, opt.Config.(config.TCPPlugin))
}
}
// Update 热更新配置

View File

@@ -124,6 +124,7 @@ type StreamTimeoutConfig struct {
PublishTimeout time.Duration //发布者无数据后超时
DelayCloseTimeout time.Duration //无订阅者后超时,必须先有一次订阅才会激活
IdleTimeout time.Duration //无订阅者后超时,不需要订阅即可激活
NeverTimeout bool // 永不超时
}
type Tracks struct {
sync.Map
@@ -418,6 +419,7 @@ func (s *Stream) run() {
s.onSuberClose(sub)
}
}
if !s.NeverTimeout {
hasTrackTimeout := false
trackCount := 0
s.Tracks.Range(func(name string, t Track) {
@@ -433,11 +435,12 @@ func (s *Stream) run() {
})
if trackCount == 0 || hasTrackTimeout || (s.Publisher != nil && s.Publisher.IsClosed()) {
s.action(ACTION_PUBLISHLOST)
} else {
continue
}
}
s.timeout.Reset(time.Second * 5)
//订阅者等待音视频轨道超时了,放弃等待,订阅成功
s.Subscribers.AbortWait()
}
} else {
s.Debug("timeout", timeOutInfo)
s.action(ACTION_TIMEOUT)

View File

@@ -121,8 +121,9 @@ func (av *Media) GetRTPFromPool() (result *util.ListItem[RTPFrame]) {
result.Value.SSRC = av.SSRC
result.Value.Version = 2
result.Value.Raw = make([]byte, 1460)
result.Value.Payload = result.Value.Raw[:0]
}
result.Value.Raw = result.Value.Raw[:1460]
result.Value.Payload = result.Value.Raw[:0]
return
}
@@ -273,11 +274,12 @@ func (av *Media) Flush() {
}
curValue.Timestamp = av.根据起始DTS计算绝对时间戳(curValue.DTS)
}
curValue.DeltaTime = uint32((curValue.Timestamp - preValue.Timestamp) / time.Millisecond)
curValue.DeltaTime = uint32(deltaTS(curValue.Timestamp, preValue.Timestamp) / time.Millisecond)
}
av.Trace("write", zap.Uint32("seq", curValue.Sequence), zap.Duration("dts", curValue.DTS), zap.Duration("dts delta", curValue.DTS-preValue.DTS), zap.Uint32("delta", curValue.DeltaTime), zap.Duration("timestamp", curValue.Timestamp))
bufferTime := av.Stream.GetPublisherConfig().BufferTime
if bufferTime > 0 && av.IDRingList.Length > 1 && curValue.Timestamp-av.IDRingList.Next.Next.Value.Value.Timestamp > bufferTime {
if bufferTime > 0 && av.IDRingList.Length > 1 && deltaTS(curValue.Timestamp, av.IDRingList.Next.Next.Value.Value.Timestamp) > bufferTime {
av.ShiftIDR()
av.narrow(int(curValue.Sequence - av.HistoryRing.Value.Sequence))
}
@@ -312,3 +314,10 @@ func (av *Media) Flush() {
curValue.Sequence = av.MoveCount
preValue.CanRead = true
}
func deltaTS(curTs time.Duration, preTs time.Duration) time.Duration {
if curTs < preTs {
return curTs + (1<<32)*time.Millisecond - preTs
}
return curTs - preTs
}

View File

@@ -56,6 +56,7 @@ func (d *Data[T]) Play(ctx context.Context, onData func(*DataFrame[T]) error) er
}
func (d *Data[T]) Attach(s IStream) {
d.SetStuff(s)
if err := s.AddTrack(d).Await(); err != nil {
d.Error("attach data track failed", zap.Error(err))
} else {

View File

@@ -16,7 +16,11 @@ const (
READSTATE_FIRST
READSTATE_NORMAL
)
const (
SUBMODE_REAL = iota
SUBMODE_NOJUMP
SUBMODE_BUFFER
)
type AVRingReader struct {
ctx context.Context
Track *Media
@@ -88,15 +92,15 @@ func (r *AVRingReader) Read(ctx context.Context, mode int) (err error) {
r.Warn("no IDRring")
}
switch mode {
case 0:
case SUBMODE_REAL:
if r.Track.IDRing != nil {
r.State = READSTATE_FIRST
} else {
r.State = READSTATE_NORMAL
}
case 1:
case SUBMODE_NOJUMP:
r.State = READSTATE_NORMAL
case 2:
case SUBMODE_BUFFER:
if r.Track.HistoryRing != nil {
startRing = r.Track.HistoryRing
}

View File

@@ -53,7 +53,6 @@ func (av *Media) PacketizeRTP(payloads ...[][]byte) {
rtpItem = av.GetRTPFromPool()
packet := &rtpItem.Value
br := util.LimitBuffer{Buffer: packet.Payload}
br.Reset()
if av.SampleRate != 90000 {
packet.Timestamp = uint32(time.Duration(av.SampleRate) * av.Value.PTS / 90000)
} else {

View File

@@ -10,8 +10,8 @@ type IList[T any] interface {
type ListItem[T any] struct {
Value T
Next, Pre *ListItem[T]
Pool *List[T] // 回收池
Next, Pre *ListItem[T] `json:"-" yaml:"-"`
Pool *List[T] `json:"-" yaml:"-"` // 回收池
list *List[T]
}