From f4fb7881f79e150f6d96efeb43863f573dcbc99b Mon Sep 17 00:00:00 2001 From: langhuihui <178529795@qq.com> Date: Sun, 4 Jun 2023 11:02:45 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0Stream=20NeverTimeout?= =?UTF-8?q?=E5=B1=9E=E6=80=A7=EF=BC=8C=E7=94=A8=E4=BA=8E=E7=BA=AF=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=E8=BD=A8=E9=81=93=E7=9A=84=E6=B5=81=E4=BF=9D=E6=8C=81?= =?UTF-8?q?=E4=B8=8D=E5=85=B3=E9=97=AD=20=E6=B6=88=E9=99=A4=E4=B8=80?= =?UTF-8?q?=E5=A4=84=E9=AD=94=E6=B3=95=E6=95=B0=E5=AD=97=20=E5=B0=86TCP?= =?UTF-8?q?=E7=9B=91=E5=90=AC=E5=A2=9E=E5=8A=A0TLS=E6=94=AF=E6=8C=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 2 +- common/frame.go | 3 +- common/ring_lock.go | 2 +- config/config.go | 2 +- config/local.monibuca.com.key | 27 +++++++++++ config/local.monibuca.com_bundle.pem | 66 ++++++++++++++++++++++++++ config/tcp.go | 71 +++++++++++++++++++++++++--- plugin.go | 5 +- stream.go | 41 ++++++++-------- track/base.go | 15 ++++-- track/data.go | 1 + track/reader-av.go | 12 +++-- track/rtp.go | 1 - util/list.go | 4 +- 14 files changed, 210 insertions(+), 42 deletions(-) create mode 100644 config/local.monibuca.com.key create mode 100644 config/local.monibuca.com_bundle.pem diff --git a/README.md b/README.md index 27232ff..85b2438 100644 --- a/README.md +++ b/README.md @@ -25,7 +25,7 @@ - 提供配置热更新机制 ## 引擎自带HTTP接口 - 获取某一个流的详情 `/api/stream?streamPath=xxx` -- 终止某一个流 `/api/closeStream?streamPath=xxx` +- 终止某一个流 `/api/closestream?streamPath=xxx` - 获取engine信息 `/api/sysInfo` 返回值{Version:xxx,StartTime:xxx,IP:[xxx.xxx.xxx.xxx]} - 获取系统基本情况 `/api/summary` 返回值Summary数据 - 获取所有插件信息 `/api/plugins` 返回值Plugin数据 diff --git a/common/frame.go b/common/frame.go index fcb1a5d..4eefdfa 100644 --- a/common/frame.go +++ b/common/frame.go @@ -36,7 +36,6 @@ func (r *RTPFrame) Unmarshal(raw []byte) *RTPFrame { if r.Packet == nil { r.Packet = &rtp.Packet{} } - r.Raw = raw if err := r.Packet.Unmarshal(raw); err != nil { log.Error(err) return nil @@ -53,7 +52,7 @@ type BaseFrame struct { type DataFrame[T any] struct { BaseFrame - Value T + Value T `json:"-" yaml:"-"` } type AVFrame struct { diff --git a/common/ring_lock.go b/common/ring_lock.go index bd3f884..33f76e7 100644 --- a/common/ring_lock.go +++ b/common/ring_lock.go @@ -12,7 +12,7 @@ type LockFrame[T any] struct { type LockRing[T any] struct { RingBuffer[LockFrame[T]] - Reset func(*DataFrame[T]) + Reset func(*DataFrame[T]) `json:"-" yaml:"-"` Flag *int32 } diff --git a/config/config.go b/config/config.go index f0ba6dc..ab58c85 100644 --- a/config/config.go +++ b/config/config.go @@ -24,7 +24,7 @@ type Plugin interface { type TCPPlugin interface { Plugin - ServeTCP(*net.TCPConn) + ServeTCP(net.Conn) } type HTTPPlugin interface { diff --git a/config/local.monibuca.com.key b/config/local.monibuca.com.key new file mode 100644 index 0000000..06cc303 --- /dev/null +++ b/config/local.monibuca.com.key @@ -0,0 +1,27 @@ +-----BEGIN RSA PRIVATE KEY----- +MIIEogIBAAKCAQEAn2zPOFL8i4EqVwoa7fzlGXXPaTxVoUthlGEuWP5jPZCRctAe +JNPxZT7vhj6PZsRi0jGJKD3KNosprSxN335KzZmm8FPxghRXEt/y7DvKQuuTXyst +eaQoTbZtdX3a2cXNC4aPbDeCCYhrthHrCo8GL07a3lV9FlKpcrmn+zep3+BXHDD4 +QiG/6k4/AyZImKIi6de25riP7yIjJgENgXzQvEDRYj/Qkth+2uoMzvjb5dZvDzSF +syZ4haPdNv66UDMFVzsIQ7ZPSILVw91vdQn7yI7sCVdlQT1CgXkbXOHvyZGhVBQh +ikPEDBCQpvR8m45kXaterYtXDrW4AXb/VfbILQIDAQABAoIBAAm3jZfOwxLTsBt3 +A7YuvF4FZrtw0H1QxWVQWQ1WkADwH1VipvuyqVt07w99H6txW96Y41W/EmSprcQe +165AGdoXO+wZCbbWe4oseTd523Whuy1JSe/ImCZIcLqBDcZJPpqtvG8poPToyjvi +MrPFjOh0Q3XauxGRvz89XqY6udFp1+jzt7pmUSk9AWi/CYNGWzU32iEgZZNAxhgg +T3f6HkitiyoyqQURuxMxUVdohjdavVQwDrRqwgLM4MkfIhT+1B52xNYhCVzxT5Eg +5VE533fumUULxCK0t8/aneSn4rJ/5+CU0WBDex9cVtDsXCncK9oCkw/PQZO+lzzt +LHzBytcCgYEA1tZISKL2gFb24aDaS/OcHfDkHrFRVQDHP+iWL8TMs5HHUGsM675z +JxYNTgKH4tgg19V7G1N6SfF9wluNYd+4m681DO4kHMdzQ5+RcuLM/p2T8o1hLjf6 +CDTjMoSntyecouwbuqmibBzQw834+LR/0h7N12eLgV5MEjdKMLaktqcCgYEAvfiU +FAIePAwSlroJcmL1AVCOYIyoVK9Vg5hEukclDtrzy0KkSKoMsyGrM/ggPvMNL/vz +W6rJSFahoRb8XyDautZVj1oxjYrrVBOKSKLTKr8+ckfztDO6+aljxtxHdv9sRQg1 +zQ+NSozgXJSW08Y464I6CPaS5GW1DGTLFAjCeQsCgYAh06WYAkjL1mWTCy+0C8yG +Dlrs1kCXIMM+tdGH/fW5RHfcmq5zJA6fleJMaSuaNSuesFds6wzzPZnuk1nEkmRP +5xt0SL7Y5TKp8CMHstxSLt+PrmEh1OCCkElBuA9sUEligciv8GvJmBPq8LCGAG2r +2PvSMdSObxmNOLVuzCNNOwKBgAo+c02454R5ai8yjPvcFjYh7+uI6jLW2ZelCF+7 +ImZwrCDT0SQR92lZcW/1+1cpqBZkUbUpunzqHwEeyjEfBmx4zlhSlsV5LkN0YkqU +bSqq8WUcOCoJeBWqarT4f+oMz/vQ+4W5Rvc0LY0QfimhUMRyW0rMcRNb4K1wafsE +legNAoGADESfqKfuuL2C4+lmwaPb/7K3yirOEaZECKgc5F9YFRFEJ6Wic+8U4Yvn +89cvY8ye1RM4ZDe1GtuzlgRw11kXc65QtjEnj1j8EqESu+3EcZnq/wOzFFbeELFn +kTwlRfbRHELksH04OAphQwa/BFTXPni+zv0tFYBkj0RyXAVumBQ= +-----END RSA PRIVATE KEY----- \ No newline at end of file diff --git a/config/local.monibuca.com_bundle.pem b/config/local.monibuca.com_bundle.pem new file mode 100644 index 0000000..95685de --- /dev/null +++ b/config/local.monibuca.com_bundle.pem @@ -0,0 +1,66 @@ +-----BEGIN CERTIFICATE----- +MIIGbzCCBNegAwIBAgIQUnjZ1U6EvGqTYJGEoD5X7zANBgkqhkiG9w0BAQwFADBZ +MQswCQYDVQQGEwJDTjElMCMGA1UEChMcVHJ1c3RBc2lhIFRlY2hub2xvZ2llcywg +SW5jLjEjMCEGA1UEAxMaVHJ1c3RBc2lhIFJTQSBEViBUTFMgQ0EgRzIwHhcNMjMw +MjIyMDAwMDAwWhcNMjQwMjIyMjM1OTU5WjAdMRswGQYDVQQDExJsb2NhbC5tb25p +YnVjYS5jb20wggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQCfbM84UvyL +gSpXChrt/OUZdc9pPFWhS2GUYS5Y/mM9kJFy0B4k0/FlPu+GPo9mxGLSMYkoPco2 +iymtLE3ffkrNmabwU/GCFFcS3/LsO8pC65NfKy15pChNtm11fdrZxc0Lho9sN4IJ +iGu2EesKjwYvTtreVX0WUqlyuaf7N6nf4FccMPhCIb/qTj8DJkiYoiLp17bmuI/v +IiMmAQ2BfNC8QNFiP9CS2H7a6gzO+Nvl1m8PNIWzJniFo902/rpQMwVXOwhDtk9I +gtXD3W91CfvIjuwJV2VBPUKBeRtc4e/JkaFUFCGKQ8QMEJCm9HybjmRdq16ti1cO +tbgBdv9V9sgtAgMBAAGjggLtMIIC6TAfBgNVHSMEGDAWgBRfOnwREH4MZ3Fh3Iuj +tQADZ/VXHDAdBgNVHQ4EFgQUjqZHMBwnZrrlYVLZnNFBaCgfdU0wDgYDVR0PAQH/ +BAQDAgWgMAwGA1UdEwEB/wQCMAAwHQYDVR0lBBYwFAYIKwYBBQUHAwEGCCsGAQUF +BwMCMEkGA1UdIARCMEAwNAYLKwYBBAGyMQECAjEwJTAjBggrBgEFBQcCARYXaHR0 +cHM6Ly9zZWN0aWdvLmNvbS9DUFMwCAYGZ4EMAQIBMH0GCCsGAQUFBwEBBHEwbzBC +BggrBgEFBQcwAoY2aHR0cDovL2NydC50cnVzdC1wcm92aWRlci5jbi9UcnVzdEFz +aWFSU0FEVlRMU0NBRzIuY3J0MCkGCCsGAQUFBzABhh1odHRwOi8vb2NzcC50cnVz +dC1wcm92aWRlci5jbjAdBgNVHREEFjAUghJsb2NhbC5tb25pYnVjYS5jb20wggF/ +BgorBgEEAdZ5AgQCBIIBbwSCAWsBaQB3AHb/iD8KtvuVUcJhzPWHujS0pM27Kdxo +Qgqf5mdMWjp0AAABhnkjvQkAAAQDAEgwRgIhAMBG7q1j0b/vZwGD27eUP0+nBh73 +P7nC+CSokfpts8SoAiEA+HCrNNZ4jrgiLyi9035m1Hvf414Bn8ksSjWzxNUEiCgA +dgDatr9rP7W2Ip+bwrtca+hwkXFsu1GEhTS9pD0wSNf7qwAAAYZ5I7zdAAAEAwBH +MEUCIQCdIMs1nf5cheetmz4/9om8/6KDsoej8VFXHzaDt4BkAAIgLePqBifr6zUA +lTpl17CRDG7q09kUPzElK8uTLjnugtkAdgDuzdBk1dsazsVct520zROiModGfLzs +3sNRSFlGcR+1mwAAAYZ5I7yiAAAEAwBHMEUCIAbWcfP71joz+2wBVRU78RD0bV6V +ugIiATVYUh+k9duJAiEA7x0JIdFuLBj3ggGH9QpAWdG03z48kZ9Cy1DjibRn4fYw +DQYJKoZIhvcNAQEMBQADggGBAItE6rr+1vFoVA3R71+23W0ctYrBTWyxCOse8i0x +/BEo13FjERXJkKWGSj1mwmTikO94JPcPqm83PVyZ0eIbEPu4IO/E6xFbOkTSQu7c +o+5i7TdqtPfv6AOApt+yBb3t3NHRXk2WYLV4onvuSnorbFaj9wRS7GNr+rXCIbqJ +HaFKpneHoV9XhKYUwdgDr2w6JkGattyFdee5o60+8EtL068Mf6Qg3OmyMosEZuBw +R/Gs4DPVKwxj/qt7cJPZoUDV7L6LzLCkgxre8nvvLbOBkC34i4KQGF4CkTjRPWw+ +OpRKOzWIw9fQ2+m+z7QwWi+fZQ31EAT6KGnHqPLePJNj09qqUSff3e/y9szzKGHc +TpKVSCgEuuCiBNze7PG9D8QgBMYHkOuGkMeP1EO0pZ3mxd3obUn+bPz0tsqvMR2t +gBx56pMeFnVNQR26VqT2YE+Xw7j6AQUwLa6SCMEsfPeotnhl5tiIKxjqWjWf1lLV +/YWY7m7yb5ctZnq9FJU926ZLLA== +-----END CERTIFICATE----- +-----BEGIN CERTIFICATE----- +MIIFBzCCA++gAwIBAgIRALIM7VUuMaC/NDp1KHQ76aswDQYJKoZIhvcNAQELBQAw +ezELMAkGA1UEBhMCR0IxGzAZBgNVBAgMEkdyZWF0ZXIgTWFuY2hlc3RlcjEQMA4G +A1UEBwwHU2FsZm9yZDEaMBgGA1UECgwRQ29tb2RvIENBIExpbWl0ZWQxITAfBgNV +BAMMGEFBQSBDZXJ0aWZpY2F0ZSBTZXJ2aWNlczAeFw0yMjAxMTAwMDAwMDBaFw0y +ODEyMzEyMzU5NTlaMFkxCzAJBgNVBAYTAkNOMSUwIwYDVQQKExxUcnVzdEFzaWEg +VGVjaG5vbG9naWVzLCBJbmMuMSMwIQYDVQQDExpUcnVzdEFzaWEgUlNBIERWIFRM +UyBDQSBHMjCCAaIwDQYJKoZIhvcNAQEBBQADggGPADCCAYoCggGBAKjGDe0GSaBs +Yl/VhMaTM6GhfR1TAt4mrhN8zfAMwEfLZth+N2ie5ULbW8YvSGzhqkDhGgSBlafm +qq05oeESrIJQyz24j7icGeGyIZ/jIChOOvjt4M8EVi3O0Se7E6RAgVYcX+QWVp5c +Sy+l7XrrtL/pDDL9Bngnq/DVfjCzm5ZYUb1PpyvYTP7trsV+yYOCNmmwQvB4yVjf +IIpHC1OcsPBntMUGeH1Eja4D+qJYhGOxX9kpa+2wTCW06L8T6OhkpJWYn5JYiht5 +8exjAR7b8Zi3DeG9oZO5o6Qvhl3f8uGU8lK1j9jCUN/18mI/5vZJ76i+hsgdlfZB +Rh5lmAQjD80M9TY+oD4MYUqB5XrigPfFAUwXFGehhlwCVw7y6+5kpbq/NpvM5Ba8 +SeQYUUuMA8RXpTtGlrrTPqJryfa55hTuX/ThhX4gcCVkbyujo0CYr+Uuc14IOyNY +1fD0/qORbllbgV41wiy/2ZUWZQUodqHWkjT1CwIMbQOY5jmrSYGBwwIDAQABo4IB +JjCCASIwHwYDVR0jBBgwFoAUoBEKIz6W8Qfs4q8p74Klf9AwpLQwHQYDVR0OBBYE +FF86fBEQfgxncWHci6O1AANn9VccMA4GA1UdDwEB/wQEAwIBhjASBgNVHRMBAf8E +CDAGAQH/AgEAMB0GA1UdJQQWMBQGCCsGAQUFBwMBBggrBgEFBQcDAjAiBgNVHSAE +GzAZMA0GCysGAQQBsjEBAgIxMAgGBmeBDAECATBDBgNVHR8EPDA6MDigNqA0hjJo +dHRwOi8vY3JsLmNvbW9kb2NhLmNvbS9BQUFDZXJ0aWZpY2F0ZVNlcnZpY2VzLmNy +bDA0BggrBgEFBQcBAQQoMCYwJAYIKwYBBQUHMAGGGGh0dHA6Ly9vY3NwLmNvbW9k +b2NhLmNvbTANBgkqhkiG9w0BAQsFAAOCAQEAHMUom5cxIje2IiFU7mOCsBr2F6CY +eU5cyfQ/Aep9kAXYUDuWsaT85721JxeXFYkf4D/cgNd9+hxT8ZeDOJrn+ysqR7NO +2K9AdqTdIY2uZPKmvgHOkvH2gQD6jc05eSPOwdY/10IPvmpgUKaGOa/tyygL8Og4 +3tYyoHipMMnS4OiYKakDJny0XVuchIP7ZMKiP07Q3FIuSS4omzR77kmc75/6Q9dP +v4wa90UCOn1j6r7WhMmX3eT3Gsdj3WMe9bYD0AFuqa6MDyjIeXq08mVGraXiw73s +Zale8OMckn/BU3O/3aFNLHLfET2H2hT6Wb3nwxjpLIfXmSVcVd8A58XH0g== +-----END CERTIFICATE----- \ No newline at end of file diff --git a/config/tcp.go b/config/tcp.go index c50a6ba..2be3359 100644 --- a/config/tcp.go +++ b/config/tcp.go @@ -2,6 +2,8 @@ package config import ( "context" + "crypto/tls" + _ "embed" "net" "runtime" "time" @@ -9,12 +11,28 @@ import ( "m7s.live/engine/v4/log" ) -type TCP struct { - ListenAddr string - ListenNum int //同时并行监听数量,0为CPU核心数量 +//go:embed local.monibuca.com_bundle.pem +var LocalCert []byte + +//go:embed local.monibuca.com.key +var LocalKey []byte + +var _ TCPConfig = (*TCP)(nil) + +type TCPConfig interface { + ListenTCP(context.Context, TCPPlugin) error } -func (tcp *TCP) listen(l net.Listener, handler func(*net.TCPConn)) { +type TCP struct { + ListenAddr string + ListenAddrTLS string + CertFile string + KeyFile string + ListenNum int //同时并行监听数量,0为CPU核心数量 + NoDelay bool //是否禁用Nagle算法 +} + +func (tcp *TCP) listen(l net.Listener, handler func(net.Conn)) { var tempDelay time.Duration for { conn, err := l.Accept() @@ -34,12 +52,21 @@ func (tcp *TCP) listen(l net.Listener, handler func(*net.TCPConn)) { } return } - conn.(*net.TCPConn).SetNoDelay(false) + var tcpConn *net.TCPConn + switch v := conn.(type) { + case *net.TCPConn: + tcpConn = v + case *tls.Conn: + tcpConn = v.NetConn().(*net.TCPConn) + } + if !tcp.NoDelay { + tcpConn.SetNoDelay(false) + } tempDelay = 0 - go handler(conn.(*net.TCPConn)) + go handler(conn) } } -func (tcp *TCP) Listen(ctx context.Context, plugin TCPPlugin) error { +func (tcp *TCP) ListenTCP(ctx context.Context, plugin TCPPlugin) error { l, err := net.Listen("tcp", tcp.ListenAddr) if err != nil { if Global.LogLang == "zh" { @@ -53,9 +80,39 @@ func (tcp *TCP) Listen(ctx context.Context, plugin TCPPlugin) error { if count == 0 { count = runtime.NumCPU() } + log.Infof("tcp listen %d at %s", count, tcp.ListenAddr) for i := 0; i < count; i++ { go tcp.listen(l, plugin.ServeTCP) } + if tcp.ListenAddrTLS != "" { + keyPair, _ := tls.X509KeyPair(LocalCert, LocalKey) + if tcp.CertFile != "" || tcp.KeyFile != "" { + keyPair, err = tls.LoadX509KeyPair(tcp.CertFile, tcp.KeyFile) + } + if err != nil { + if Global.LogLang == "zh" { + log.Fatalf("加载证书失败: %v", err) + } else { + log.Fatalf("LoadX509KeyPair error: %v", err) + } + return err + } + l, err = tls.Listen("tcp", tcp.ListenAddrTLS, &tls.Config{ + Certificates: []tls.Certificate{keyPair}, + }) + if err != nil { + if Global.LogLang == "zh" { + log.Fatalf("%s: 监听失败: %v", tcp.ListenAddrTLS, err) + } else { + log.Fatalf("%s: Listen error: %v", tcp.ListenAddrTLS, err) + } + return err + } + log.Infof("tls tcp listen %d at %s", count, tcp.ListenAddrTLS) + for i := 0; i < count; i++ { + go tcp.listen(l, plugin.ServeTCP) + } + } <-ctx.Done() return l.Close() } diff --git a/plugin.go b/plugin.go index b78dadf..2d1d093 100644 --- a/plugin.go +++ b/plugin.go @@ -130,7 +130,7 @@ func (opt *Plugin) assign() { opt.Disabled = false //移除这个属性防止反序列化报错 delete(opt.RawConfig, "enable") - } + } if opt.Disabled { opt.Warn("plugin disabled") return @@ -172,6 +172,9 @@ func (opt *Plugin) run() { if conf, ok := opt.Config.(config.HTTPConfig); ok { go conf.Listen(opt) } + if conf, ok := opt.Config.(config.TCPConfig); ok { + go conf.ListenTCP(opt, opt.Config.(config.TCPPlugin)) + } } // Update 热更新配置 diff --git a/stream.go b/stream.go index 405e765..e9b6541 100644 --- a/stream.go +++ b/stream.go @@ -124,6 +124,7 @@ type StreamTimeoutConfig struct { PublishTimeout time.Duration //发布者无数据后超时 DelayCloseTimeout time.Duration //无订阅者后超时,必须先有一次订阅才会激活 IdleTimeout time.Duration //无订阅者后超时,不需要订阅即可激活 + NeverTimeout bool // 永不超时 } type Tracks struct { sync.Map @@ -418,26 +419,28 @@ func (s *Stream) run() { s.onSuberClose(sub) } } - hasTrackTimeout := false - trackCount := 0 - s.Tracks.Range(func(name string, t Track) { - trackCount++ - if _, ok := t.(track.Custom); ok { - return - } - // track 超过一定时间没有更新数据了 - if lastWriteTime := t.LastWriteTime(); !lastWriteTime.IsZero() && time.Since(lastWriteTime) > s.PublishTimeout { - s.Warn("track timeout", zap.String("name", name), zap.Time("last writetime", lastWriteTime), zap.Duration("timeout", s.PublishTimeout)) - hasTrackTimeout = true - } - }) - if trackCount == 0 || hasTrackTimeout || (s.Publisher != nil && s.Publisher.IsClosed()) { - s.action(ACTION_PUBLISHLOST) - } else { - s.timeout.Reset(time.Second * 5) - //订阅者等待音视频轨道超时了,放弃等待,订阅成功 - s.Subscribers.AbortWait() + if !s.NeverTimeout { + hasTrackTimeout := false + trackCount := 0 + s.Tracks.Range(func(name string, t Track) { + trackCount++ + if _, ok := t.(track.Custom); ok { + return + } + // track 超过一定时间没有更新数据了 + if lastWriteTime := t.LastWriteTime(); !lastWriteTime.IsZero() && time.Since(lastWriteTime) > s.PublishTimeout { + s.Warn("track timeout", zap.String("name", name), zap.Time("last writetime", lastWriteTime), zap.Duration("timeout", s.PublishTimeout)) + hasTrackTimeout = true + } + }) + if trackCount == 0 || hasTrackTimeout || (s.Publisher != nil && s.Publisher.IsClosed()) { + s.action(ACTION_PUBLISHLOST) + continue + } } + s.timeout.Reset(time.Second * 5) + //订阅者等待音视频轨道超时了,放弃等待,订阅成功 + s.Subscribers.AbortWait() } else { s.Debug("timeout", timeOutInfo) s.action(ACTION_TIMEOUT) diff --git a/track/base.go b/track/base.go index c8da7c6..687a241 100644 --- a/track/base.go +++ b/track/base.go @@ -121,8 +121,9 @@ func (av *Media) GetRTPFromPool() (result *util.ListItem[RTPFrame]) { result.Value.SSRC = av.SSRC result.Value.Version = 2 result.Value.Raw = make([]byte, 1460) - result.Value.Payload = result.Value.Raw[:0] } + result.Value.Raw = result.Value.Raw[:1460] + result.Value.Payload = result.Value.Raw[:0] return } @@ -273,11 +274,12 @@ func (av *Media) Flush() { } curValue.Timestamp = av.根据起始DTS计算绝对时间戳(curValue.DTS) } - curValue.DeltaTime = uint32((curValue.Timestamp - preValue.Timestamp) / time.Millisecond) + + curValue.DeltaTime = uint32(deltaTS(curValue.Timestamp, preValue.Timestamp) / time.Millisecond) } av.Trace("write", zap.Uint32("seq", curValue.Sequence), zap.Duration("dts", curValue.DTS), zap.Duration("dts delta", curValue.DTS-preValue.DTS), zap.Uint32("delta", curValue.DeltaTime), zap.Duration("timestamp", curValue.Timestamp)) bufferTime := av.Stream.GetPublisherConfig().BufferTime - if bufferTime > 0 && av.IDRingList.Length > 1 && curValue.Timestamp-av.IDRingList.Next.Next.Value.Value.Timestamp > bufferTime { + if bufferTime > 0 && av.IDRingList.Length > 1 && deltaTS(curValue.Timestamp, av.IDRingList.Next.Next.Value.Value.Timestamp) > bufferTime { av.ShiftIDR() av.narrow(int(curValue.Sequence - av.HistoryRing.Value.Sequence)) } @@ -312,3 +314,10 @@ func (av *Media) Flush() { curValue.Sequence = av.MoveCount preValue.CanRead = true } + +func deltaTS(curTs time.Duration, preTs time.Duration) time.Duration { + if curTs < preTs { + return curTs + (1<<32)*time.Millisecond - preTs + } + return curTs - preTs +} diff --git a/track/data.go b/track/data.go index 40e4379..c051e99 100644 --- a/track/data.go +++ b/track/data.go @@ -56,6 +56,7 @@ func (d *Data[T]) Play(ctx context.Context, onData func(*DataFrame[T]) error) er } func (d *Data[T]) Attach(s IStream) { + d.SetStuff(s) if err := s.AddTrack(d).Await(); err != nil { d.Error("attach data track failed", zap.Error(err)) } else { diff --git a/track/reader-av.go b/track/reader-av.go index f2e819c..aaafdc8 100644 --- a/track/reader-av.go +++ b/track/reader-av.go @@ -16,7 +16,11 @@ const ( READSTATE_FIRST READSTATE_NORMAL ) - +const ( + SUBMODE_REAL = iota + SUBMODE_NOJUMP + SUBMODE_BUFFER +) type AVRingReader struct { ctx context.Context Track *Media @@ -88,15 +92,15 @@ func (r *AVRingReader) Read(ctx context.Context, mode int) (err error) { r.Warn("no IDRring") } switch mode { - case 0: + case SUBMODE_REAL: if r.Track.IDRing != nil { r.State = READSTATE_FIRST } else { r.State = READSTATE_NORMAL } - case 1: + case SUBMODE_NOJUMP: r.State = READSTATE_NORMAL - case 2: + case SUBMODE_BUFFER: if r.Track.HistoryRing != nil { startRing = r.Track.HistoryRing } diff --git a/track/rtp.go b/track/rtp.go index bf9bd9b..b956a6b 100644 --- a/track/rtp.go +++ b/track/rtp.go @@ -53,7 +53,6 @@ func (av *Media) PacketizeRTP(payloads ...[][]byte) { rtpItem = av.GetRTPFromPool() packet := &rtpItem.Value br := util.LimitBuffer{Buffer: packet.Payload} - br.Reset() if av.SampleRate != 90000 { packet.Timestamp = uint32(time.Duration(av.SampleRate) * av.Value.PTS / 90000) } else { diff --git a/util/list.go b/util/list.go index a697bd5..d0a852a 100644 --- a/util/list.go +++ b/util/list.go @@ -10,8 +10,8 @@ type IList[T any] interface { type ListItem[T any] struct { Value T - Next, Pre *ListItem[T] - Pool *List[T] // 回收池 + Next, Pre *ListItem[T] `json:"-" yaml:"-"` + Pool *List[T] `json:"-" yaml:"-"` // 回收池 list *List[T] }