From 2baf2ee5db6428d3fed1a28454655a1b087b4367 Mon Sep 17 00:00:00 2001 From: langhuihui <178529795@qq.com> Date: Thu, 11 May 2023 21:13:01 +0800 Subject: [PATCH] first commit --- README.en.md | 35 +++++ README.md | 36 ++++- go.mod | 10 +- go.sum | 18 ++- main.go | 215 +++++++++++++++++++++++++++++- mpegps/buffer.go | 112 ++++++++++++++++ mpegps/demuxer_v3.go | 250 +++++++++++++++++++++++++++++++++++ mpegps/mpegps.go | 159 ++++++++++++++++++++++ mpegps/mpegps_pes.go | 35 +++++ mpegps/ps-demuxer.go | 283 +++++++++++++++++++++++++++++++++++++++ port.go | 6 + publisher.go | 308 +++++++++++++++++++++++++++++++++++++++++++ 12 files changed, 1455 insertions(+), 12 deletions(-) create mode 100644 README.en.md create mode 100644 mpegps/buffer.go create mode 100644 mpegps/demuxer_v3.go create mode 100644 mpegps/mpegps.go create mode 100644 mpegps/mpegps_pes.go create mode 100644 mpegps/ps-demuxer.go create mode 100644 port.go create mode 100644 publisher.go diff --git a/README.en.md b/README.en.md new file mode 100644 index 0000000..90dc254 --- /dev/null +++ b/README.en.md @@ -0,0 +1,35 @@ +_[简体中文](https://github.com/Monibuca/plugin-ps) | English_ + +# PS Plugin + +enable receive MpegPS stream + +## Plugin Address + +https://github.com/Monibuca/plugin-ps + +## Plugin Import +```go + import ( _ "m7s.live/plugin/ps/v4" ) +``` + +## Default Config + +```yaml +ps: + publish: # format refer to global config + relaymode: 1 # 0:relay only 1:protocol transfar only 2:relay and protocol transfar +``` + +## API + +### receive PS stream +`/ps/api/receive?streamPath=xxx&ssrc=xxx&port=xxx&reuse=1&dump=xxx` + +- reuse means whether to reuse port, if reuse port, please make sure the ssrc from device is same as ssrc parameter, otherwise it will cause mixed stream +- dump means whether to dump to file, if dump to file, it will generate a folder named dump in current directory, the folder contains a file named by streamPath parameter, the file content is the data received from port [4byte content length][2byte relative time][content] +### replay PS dump file + +`/ps/api/replay?streamPath=xxx&dump=xxx` +- dump means the file to replay, default is dump/ps +- streamPath means the streamPath of replayed video stream, default is replay/dump/ps (if dump is abc, then streamPath is replay/abc) \ No newline at end of file diff --git a/README.md b/README.md index a12fb5f..057068a 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,34 @@ -# plugin-ps -handle mpeg-ps stream for monibuca +_[English](https://github.com/Monibuca/plugin-ps/blob/v4/README.en.md) | 简体中文_ +# PS 插件 + +支持接收MpegPS流 + +## 插件地址 + +https://github.com/Monibuca/plugin-ps + +## 插件引入 +```go + import ( _ "m7s.live/plugin/ps/v4" ) +``` + +## 默认配置 + +```yaml +ps: + publish: # 格式参考全局配置 + relaymode: 1 # 0:纯转发 1:转协议,不转发 2:转发并且转协议 +``` + +## API + +### 接收PS流 +`/ps/api/receive?streamPath=xxx&ssrc=xxx&port=xxx&reuse=1&dump=xxx` +其中: +- reuse代表是否端口复用,如果使用端口复用,请务必确定设备发送的ssrc和ssrc参数一致,否则会出现混流的情况 +- dump代表是否dump到文件,如果dump到文件,会在当前目录下生成一个以dump为名的文件夹,文件夹下面是以streamPath参数值为名的文件,文件内容从端口收到的数据[4byte 内容长度][2byte 相对时间][内容] +### 回放PS的dump文件 + +`/ps/api/replay?streamPath=xxx&dump=xxx` +- dump 代表需要回放的文件,默认是dump/ps +- streamPath 代表回放时生成的视频流的streamPath, 默认是replay/dump/ps (如果dump传了abc, 那么streamPath默认是replay/abc) \ No newline at end of file diff --git a/go.mod b/go.mod index 3be55ef..84c09d0 100644 --- a/go.mod +++ b/go.mod @@ -2,7 +2,12 @@ module m7s.live/plugin/ps/v4 go 1.19 -require m7s.live/engine/v4 v4.12.7 +require ( + github.com/pion/rtp v1.7.13 + github.com/yapingcat/gomedia v0.0.0-20230426092936-387031404274 + go.uber.org/zap v1.23.0 + m7s.live/engine/v4 v4.12.8 +) require ( github.com/aler9/gortsplib/v2 v2.2.2 // indirect @@ -18,7 +23,6 @@ require ( github.com/mcuadros/go-defaults v1.2.0 // indirect github.com/onsi/ginkgo/v2 v2.2.0 // indirect github.com/pion/randutil v0.1.0 // indirect - github.com/pion/rtp v1.7.13 // indirect github.com/pion/webrtc/v3 v3.1.49 // indirect github.com/power-devops/perfstat v0.0.0-20220216144756-c35f1ee13d7c // indirect github.com/q191201771/naza v0.30.8 // indirect @@ -29,11 +33,9 @@ require ( github.com/shirou/gopsutil/v3 v3.22.10 // indirect github.com/tklauser/go-sysconf v0.3.11 // indirect github.com/tklauser/numcpus v0.6.0 // indirect - github.com/yapingcat/gomedia v0.0.0-20230426092936-387031404274 // indirect github.com/yusufpapurcu/wmi v1.2.2 // indirect go.uber.org/atomic v1.10.0 // indirect go.uber.org/multierr v1.8.0 // indirect - go.uber.org/zap v1.23.0 // indirect golang.org/x/crypto v0.4.0 // indirect golang.org/x/exp v0.0.0-20221205204356-47842c84f3db // indirect golang.org/x/mod v0.7.0 // indirect diff --git a/go.sum b/go.sum index 5b3c437..fa77677 100644 --- a/go.sum +++ b/go.sum @@ -1,6 +1,7 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/aler9/gortsplib/v2 v2.2.2 h1:tTw8pdKSOEjlZjjE1S4ftXPHJkYOqjNNv3hjQ0Nto9M= github.com/aler9/gortsplib/v2 v2.2.2/go.mod h1:k6uBVHGwsIc/0L5SLLqWwi6bSJUb4VR0HfvncyHlKQI= +github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= @@ -13,6 +14,7 @@ github.com/cnotch/queue v0.0.0-20201224060551-4191569ce8f6/go.mod h1:zOssjAlNusO github.com/cnotch/scheduler v0.0.0-20200522024700-1d2da93eefc5/go.mod h1:F4GE3SZkJZ8an1Y0ZCqvSM3jeozNuKzoC67erG1PhIo= github.com/cnotch/xlog v0.0.0-20201208005456-cfda439cd3a0/go.mod h1:RW9oHsR79ffl3sR3yMGgxYupMn2btzdtJUwoxFPUE5E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/denisbrodbeck/machineid v1.0.1 h1:geKr9qtkB876mXguW2X6TU4ZynleN6ezuMSRhl4D7AQ= github.com/denisbrodbeck/machineid v1.0.1/go.mod h1:dJUwb7PTidGDeYyUBmXZ2GphQBbjJCrnectwCyxcUSI= @@ -33,12 +35,14 @@ github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:W github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38 h1:yAJXTCF9TqKcTiHJAE8dj7HMvPfh66eeA2JYW7eFpSE= github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= @@ -53,6 +57,7 @@ github.com/kelindar/tcp v1.0.0/go.mod h1:JB5hj1cshLU60XrLij2BBxW3JQ4hOye8vqbyvuK github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/logrusorgru/aurora v2.0.3+incompatible h1:tOpm7WcpBTn4fjmVfgpQq0EfczGlG91VSDkswnjF5A8= github.com/logrusorgru/aurora v2.0.3+incompatible/go.mod h1:7rIyQOR62GCctdiQpZ/zOJlFyk6y+94wXzv6RNZgaR4= github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I= @@ -60,6 +65,7 @@ github.com/lufia/plan9stats v0.0.0-20220913051719-115f729f3c8c h1:VtwQ41oftZwlMn github.com/lufia/plan9stats v0.0.0-20220913051719-115f729f3c8c/go.mod h1:JKx41uQRwqlTZabZc+kILPrO/3jlKnQ2Z8b7YiVw5cE= github.com/mcuadros/go-defaults v1.2.0 h1:FODb8WSf0uGaY8elWJAkoLL0Ri6AlZ1bFlenk56oZtc= github.com/mcuadros/go-defaults v1.2.0/go.mod h1:WEZtHEVIGYVDqkKSWBdWKUVdRyKlMfulPaGDWIVeCWY= +github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= @@ -74,6 +80,7 @@ github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1Cpa github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= github.com/onsi/gomega v1.17.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY= +github.com/onsi/gomega v1.20.1 h1:PA/3qinGoukvymdIDV8pii6tiZgC8kbmJO6Z5+b002Q= github.com/pion/datachannel v1.5.2/go.mod h1:FTGQWaHrdCwIJ1rw6xBIfZVkslikjShim5yr05XFuCQ= github.com/pion/dtls/v2 v2.1.5/go.mod h1:BqCE7xPZbPSubGasRoDFJeTsyJtdD1FanJYL0JGheqY= github.com/pion/ice/v2 v2.2.12/go.mod h1:z2KXVFyRkmjetRlaVRgjO9U3ShKwzhlUylvxKfHfd5A= @@ -101,6 +108,8 @@ github.com/pion/udp v0.1.1/go.mod h1:6AFo+CMdKQm7UiA0eUPA8/eVCTx8jBIITLZHc9DWX5M github.com/pion/webrtc/v3 v3.1.49 h1:rbsNGxK9jMYts+xE6zYAJMUQHnGwmk/JYze8yttW+to= github.com/pion/webrtc/v3 v3.1.49/go.mod h1:kHf/o47QW4No1rgpsFux/h7lUhtUnwFnSFDZOXeLapw= github.com/pixelbender/go-sdp v1.1.0/go.mod h1:6IBlz9+BrUHoFTea7gcp4S54khtOhjCW/nVDLhmZBAs= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE= github.com/power-devops/perfstat v0.0.0-20220216144756-c35f1ee13d7c h1:NRoLoZvkBTKvR5gQLgA3e0hqjkY9u1wm+iOL45VN/qI= @@ -129,6 +138,7 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8= github.com/tklauser/go-sysconf v0.3.10/go.mod h1:C8XykCvCb+Gn0oNCWPIlcb0RuglQTYaQ2hGm7jmxEFk= github.com/tklauser/go-sysconf v0.3.11 h1:89WgdJhk5SNwJfu+GKyYveZ4IaJ7xAkecBo+KdJV0CM= github.com/tklauser/go-sysconf v0.3.11/go.mod h1:GqXfhXY3kiPa0nAXPDIQIWzJbMCB7AmcWpGR8lSZfqI= @@ -145,6 +155,7 @@ github.com/yusufpapurcu/wmi v1.2.2/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQ go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ= go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= +go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI= go.uber.org/multierr v1.8.0 h1:dg6GjLku4EH+249NNmoIciG9N/jURbDG+pFlTkhzIC8= go.uber.org/multierr v1.8.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak= go.uber.org/zap v1.23.0 h1:OjGQ5KQDEUawVHxNwQgPpiypGHOxo2mNZsOqTak4fFY= @@ -227,6 +238,7 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.8.0 h1:57P1ETyNKtuIjB4SRd15iJxuhj8Gc416Y78H3qgMh68= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= @@ -246,8 +258,10 @@ google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzi google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.28.0 h1:w43yiav+6bVFTBQFZX0r7ipe9JQ1QsbMgHwbBziscLw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU= gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k= @@ -261,5 +275,5 @@ gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -m7s.live/engine/v4 v4.12.7 h1:3qe4c4NNFtNRCuolHE9NOlo2WkhswhMOrVZj8bk6Goc= -m7s.live/engine/v4 v4.12.7/go.mod h1:LoALBfV5rmsz5TJQr6cmLxM33mfUE5BKBq/sMtXOVlc= +m7s.live/engine/v4 v4.12.8 h1:cNGyajzEkbUzIcPtcedGbxvMlIuScWxDb/raYFgAHKE= +m7s.live/engine/v4 v4.12.8/go.mod h1:LoALBfV5rmsz5TJQr6cmLxM33mfUE5BKBq/sMtXOVlc= diff --git a/main.go b/main.go index cf2c250..e6cefe6 100644 --- a/main.go +++ b/main.go @@ -1,15 +1,222 @@ -package pluginps +package ps import ( + "encoding/binary" + "fmt" + "io" + "net" + "net/http" + "os" + "strconv" + "strings" + "sync" + "time" + + "github.com/pion/rtp" + "go.uber.org/zap" . "m7s.live/engine/v4" + "m7s.live/engine/v4/config" + "m7s.live/engine/v4/lang" + "m7s.live/engine/v4/util" ) type PSConfig struct { - Publisher + config.Publish + RelayMode int // 转发模式,0:转协议+不转发,1:不转协议+转发,2:转协议+转发 + streams sync.Map + shareTCP sync.Map + shareUDP sync.Map } + var conf = &PSConfig{} var PSPlugin = InstallPlugin(conf) func (c *PSConfig) OnEvent(event any) { - -} \ No newline at end of file + switch event.(type) { + case FirstConfig: + lang.Merge("zh", map[string]string{ + "start receive ps stream from": "开始接收PS流来自", + "stop receive ps stream from": "停止接收PS流来自", + "ssrc not found": "未找到ssrc", + }) + } +} + +func (c *PSConfig) ServeTCP(conn *net.TCPConn) { + var err error + ps := make(util.Buffer, 1024) + tcpAddr := zap.String("tcp", conn.LocalAddr().String()) + + rtpLen := make([]byte, 2) + var puber *PSPublisher + if _, err = io.ReadFull(conn, rtpLen); err != nil { + return + } + ps.Relloc(int(binary.BigEndian.Uint16(rtpLen))) + if _, err = io.ReadFull(conn, ps); err != nil { + return + } + var rtpPacket rtp.Packet + if err := rtpPacket.Unmarshal(ps); err != nil { + PSPlugin.Error("gb28181 decode rtp error:", zap.Error(err)) + } + ssrc := rtpPacket.SSRC + if v, ok := conf.streams.Load(ssrc); ok { + puber = v.(*PSPublisher) + puber.Info("start receive ps stream from", tcpAddr) + defer puber.Info("stop receive ps stream from", tcpAddr) + defer puber.Stop() + for err == nil { + puber.PushPS(ps) + if _, err = io.ReadFull(conn, rtpLen); err != nil { + return + } + ps.Relloc(int(binary.BigEndian.Uint16(rtpLen))) + if _, err = io.ReadFull(conn, ps); err != nil { + return + } + } + } else { + PSPlugin.Error("ssrc not found", zap.Uint32("ssrc", ssrc)) + } +} + +func (c *PSConfig) ServeUDP(conn *net.UDPConn) { + bufUDP := make([]byte, 1024*1024) + udpAddr := zap.String("udp", conn.LocalAddr().String()) + var rtpPacket rtp.Packet + PSPlugin.Info("start receive ps stream from", udpAddr) + defer PSPlugin.Info("stop receive ps stream from", udpAddr) + var lastSSRC uint32 + var lastPubber *PSPublisher + for { + conn.SetReadDeadline(time.Now().Add(time.Second * 10)) + n, _, err := conn.ReadFromUDP(bufUDP) + if err != nil { + return + } + if err := rtpPacket.Unmarshal(bufUDP[:n]); err != nil { + PSPlugin.Error("gb28181 decode rtp error:", zap.Error(err)) + } + ssrc := rtpPacket.SSRC + if lastSSRC != ssrc { + if v, ok := conf.streams.Load(ssrc); ok { + lastSSRC = ssrc + lastPubber = v.(*PSPublisher) + } else { + PSPlugin.Error("ssrc not found", zap.Uint32("ssrc", ssrc)) + continue + } + } + lastPubber.Packet = rtpPacket + lastPubber.pushPS() + } +} + +func Receive(streamPath, dump, port string, ssrc uint32, reuse bool) (err error) { + var pubber PSPublisher + if _, loaded := conf.streams.LoadOrStore(ssrc, &pubber); loaded { + return fmt.Errorf("ssrc %d already exists", ssrc) + } else { + if dump != "" { + pubber.dump, err = os.OpenFile(dump, os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + return + } + } + if err = PSPlugin.Publish(streamPath, &pubber); err == nil { + protocol, listenaddr, _ := strings.Cut(port, ":") + if !strings.Contains(listenaddr, ":") { + listenaddr = ":" + listenaddr + } + switch protocol { + case "tcp": + var tcpConf config.TCP + tcpConf.ListenAddr = listenaddr + if reuse { + if _, ok := conf.shareTCP.LoadOrStore(listenaddr, &tcpConf); ok { + } else { + conf.streams.Store(ssrc, &pubber) + go tcpConf.Listen(PSPlugin, conf) + } + } else { + tcpConf.ListenNum = 1 + go tcpConf.Listen(pubber, &pubber) + } + case "udp": + if reuse { + var udpConf struct { + *net.UDPConn + } + if _, ok := conf.shareUDP.LoadOrStore(listenaddr, &udpConf); ok { + } else { + udpConn, err := util.ListenUDP(listenaddr, 1024*1024) + if err != nil { + PSPlugin.Error("udp listen error", zap.Error(err)) + return err + } + udpConf.UDPConn = udpConn + conf.streams.Store(ssrc, &pubber) + go conf.ServeUDP(udpConn) + } + } else { + udpConn, err := util.ListenUDP(listenaddr, 1024*1024) + if err != nil { + pubber.Stop() + return err + } else { + go pubber.ServeUDP(udpConn) + } + } + } + } + } + return +} + +// 收流 +func (c *PSConfig) API_receive(w http.ResponseWriter, r *http.Request) { + query := r.URL.Query() + dump := query.Get("dump") + streamPath := query.Get("streamPath") + ssrc := query.Get("ssrc") + port := query.Get("port") + reuse := query.Get("reuse") // 是否复用端口 + if _ssrc, err := strconv.ParseInt(ssrc, 10, 0); err == nil { + if err := Receive(streamPath, dump, port, uint32(_ssrc), reuse != ""); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + } else { + w.Write([]byte("ok")) + } + } else { + http.Error(w, err.Error(), http.StatusInternalServerError) + } +} + +func (c *PSConfig) API_replay(w http.ResponseWriter, r *http.Request) { + dump := r.URL.Query().Get("dump") + streamPath := r.URL.Query().Get("streamPath") + if dump == "" { + dump = "dump/ps" + } + f, err := os.OpenFile(dump, os.O_RDONLY, 0644) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + } else { + if streamPath == "" { + if strings.HasPrefix(dump, "/") { + streamPath = "replay" + dump + } else { + streamPath = "replay/" + dump + } + } + var pub PSPublisher + pub.SetIO(f) + if err = PSPlugin.Publish(streamPath, &pub); err == nil { + go pub.Replay(f) + w.Write([]byte("ok")) + } else { + http.Error(w, err.Error(), http.StatusInternalServerError) + } + } +} diff --git a/mpegps/buffer.go b/mpegps/buffer.go new file mode 100644 index 0000000..04d6831 --- /dev/null +++ b/mpegps/buffer.go @@ -0,0 +1,112 @@ +package mpegps + +import ( + "encoding/binary" + "errors" + "io" +) + +type IOBuffer struct { + buf []byte // contents are the bytes buf[off : len(buf)] + off int // read at &buf[off], write at &buf[len(buf)] +} + +func (b *IOBuffer) Next(n int) []byte { + m := b.Len() + if n > m { + n = m + } + data := b.buf[b.off : b.off+n] + b.off += n + return data +} +func (b *IOBuffer) Uint16() (uint16, error) { + if b.Len() > 1 { + + return binary.BigEndian.Uint16(b.Next(2)), nil + } + return 0, io.EOF +} + +func (b *IOBuffer) Skip(n int) (err error) { + _, err = b.ReadN(n) + return +} + +func (b *IOBuffer) Uint32() (uint32, error) { + if b.Len() > 3 { + return binary.BigEndian.Uint32(b.Next(4)), nil + } + return 0, io.EOF +} + +func (b *IOBuffer) ReadN(length int) ([]byte, error) { + if b.Len() >= length { + return b.Next(length), nil + } + return nil, io.EOF +} + +//func (b *IOBuffer) Read(buf []byte) (n int, err error) { +// var ret []byte +// ret, err = b.ReadN(len(buf)) +// copy(buf, ret) +// return len(ret), err +//} + +// empty reports whether the unread portion of the buffer is empty. +func (b *IOBuffer) empty() bool { return b.Len() <= b.off } + +func (b *IOBuffer) ReadByte() (byte, error) { + if b.empty() { + // Buffer is empty, reset to recover space. + b.Reset() + return 0, io.EOF + } + c := b.buf[b.off] + b.off++ + return c, nil +} + +func (b *IOBuffer) Reset() { + b.buf = b.buf[:0] + b.off = 0 +} + +func (b *IOBuffer) Len() int { return len(b.buf) - b.off } + +// tryGrowByReslice is a inlineable version of grow for the fast-case where the +// internal buffer only needs to be resliced. +// It returns the index where bytes should be written and whether it succeeded. +func (b *IOBuffer) tryGrowByReslice(n int) (int, bool) { + if l := len(b.buf); n <= cap(b.buf)-l { + b.buf = b.buf[:l+n] + return l, true + } + return 0, false +} + +var ErrTooLarge = errors.New("IOBuffer: too large") + +func (b *IOBuffer) Write(p []byte) (n int, err error) { + l := copy(b.buf, b.buf[b.off:]) + b.buf = append(b.buf[:l], p...) + b.off = 0 + // println(b.buf, b.off, b.buf[b.off], b.buf[b.off+1], b.buf[b.off+2], b.buf[b.off+3]) + return len(p), nil + // defer func() { + // if recover() != nil { + // panic(ErrTooLarge) + // } + // }() + // l := len(p) + // oldLen := len(b.buf) + // m, ok := b.tryGrowByReslice(l) + // if !ok { + // m = oldLen - b.off + // buf := append(append(([]byte)(nil), b.buf[b.off:]...), p...) + // b.off = 0 + // b.buf = buf + // } + // return copy(b.buf[m:], p), nil +} diff --git a/mpegps/demuxer_v3.go b/mpegps/demuxer_v3.go new file mode 100644 index 0000000..7a2eceb --- /dev/null +++ b/mpegps/demuxer_v3.go @@ -0,0 +1,250 @@ +package mpegps + +import ( + "errors" + + "m7s.live/engine/v4/util" +) + +var ( + ErrNotFoundStartCode = errors.New("not found the need start code flag") + ErrMarkerBit = errors.New("marker bit value error") + ErrFormatPack = errors.New("not package standard") + ErrParsePakcet = errors.New("parse ps packet error") +) + +/* + This implement from VLC source code + notes: https://github.com/videolan/vlc/blob/master/modules/mux/mpeg/bits.h +*/ + +/* +https://github.com/videolan/vlc/blob/master/modules/demux/mpeg +*/ +type DecPSPackage struct { + systemClockReferenceBase uint64 + systemClockReferenceExtension uint64 + programMuxRate uint32 + IOBuffer + Payload []byte + PTS uint32 + DTS uint32 + EsHandler + audio MpegPsEsStream + video MpegPsEsStream +} + +func (dec *DecPSPackage) clean() { + dec.systemClockReferenceBase = 0 + dec.systemClockReferenceExtension = 0 + dec.programMuxRate = 0 + dec.Payload = nil + dec.PTS = 0 + dec.DTS = 0 +} + +func (dec *DecPSPackage) ReadPayload() (payload []byte, err error) { + payloadlen, err := dec.Uint16() + if err != nil { + return + } + return dec.ReadN(int(payloadlen)) +} +func (dec *DecPSPackage) Feed(ps []byte) { + if len(ps) >= 4 && util.BigEndian.Uint32(ps) == StartCodePS { + if dec.Len() > 0 { + dec.Skip(4) + dec.Read(0) + dec.Reset() + } + dec.Write(ps) + } else if dec.Len() > 0 { + dec.Write(ps) + } +} + +// read the buffer and push video or audio +func (dec *DecPSPackage) Read(ts uint32) error { +again: + dec.clean() + if err := dec.Skip(9); err != nil { + return err + } + + psl, err := dec.ReadByte() + if err != nil { + return err + } + psl &= 0x07 + if err = dec.Skip(int(psl)); err != nil { + return err + } + var video []byte + var nextStartCode, videoTs, videoCts uint32 +loop: + for err == nil { + if nextStartCode, err = dec.Uint32(); err != nil { + break + } + switch nextStartCode { + case StartCodeSYS: + dec.ReadPayload() + //err = dec.decSystemHeader() + case StartCodeMAP: + err = dec.decProgramStreamMap() + case StartCodeVideo: + // var cts uint32 + if err = dec.decPESPacket(); err == nil { + if len(video) == 0 { + dec.video.PTS = dec.PTS + dec.video.DTS = dec.DTS + // if dec.PTS == 0 { + // dec.PTS = ts + // } + // if dec.DTS != 0 { + // cts = dec.PTS - dec.DTS + // } else { + // dec.DTS = dec.PTS + // } + // videoTs = dec.DTS / 90 + // videoCts = cts / 90 + } + video = append(video, dec.Payload...) + } else { + // utils.Println("video", err) + } + case StartCodeAudio: + if err = dec.decPESPacket(); err == nil { + // ts := ts / 90 + // if dec.PTS != 0 { + // ts = dec.PTS / 90 + // } + dec.audio.PTS = dec.PTS + dec.audio.Buffer = dec.Payload + dec.ReceiveAudio(dec.audio) + // pusher.PushAudio(ts, dec.Payload) + } else { + // utils.Println("audio", err) + } + case StartCodePS: + break loop + default: + dec.ReadPayload() + } + } + if len(video) > 0 { + dec.video.Buffer = video + dec.ReceiveVideo(dec.video) + if false { + println("video", videoTs, videoCts, len(video)) + } + // pusher.PushVideo(videoTs, videoCts, video) + } + if nextStartCode == StartCodePS { + // utils.Println(aurora.Red("StartCodePS recursion..."), err) + goto again + } + return err +} + +/* + func (dec *DecPSPackage) decSystemHeader() error { + syslens, err := dec.Uint16() + if err != nil { + return err + } + // drop rate video audio bound and lock flag + syslens -= 6 + if err = dec.Skip(6); err != nil { + return err + } + + // ONE WAY: do not to parse the stream and skip the buffer + //br.Skip(syslen * 8) + + // TWO WAY: parse every stream info + for syslens > 0 { + if nextbits, err := dec.Uint8(); err != nil { + return err + } else if (nextbits&0x80)>>7 != 1 { + break + } + if err = dec.Skip(2); err != nil { + return err + } + syslens -= 3 + } + return nil + } +*/ +func (dec *DecPSPackage) decProgramStreamMap() error { + psm, err := dec.ReadPayload() + if err != nil { + return err + } + l := len(psm) + index := 2 + programStreamInfoLen := util.BigEndian.Uint16(psm[index:]) + index += 2 + index += int(programStreamInfoLen) + programStreamMapLen := util.BigEndian.Uint16(psm[index:]) + index += 2 + for programStreamMapLen > 0 { + if l <= index+1 { + break + } + streamType := psm[index] + index++ + elementaryStreamID := psm[index] + index++ + if elementaryStreamID >= 0xe0 && elementaryStreamID <= 0xef { + dec.video.Type = streamType + } else if elementaryStreamID >= 0xc0 && elementaryStreamID <= 0xdf { + dec.audio.Type = streamType + } + if l <= index+1 { + break + } + elementaryStreamInfoLength := util.BigEndian.Uint16(psm[index:]) + index += 2 + index += int(elementaryStreamInfoLength) + programStreamMapLen -= 4 + elementaryStreamInfoLength + } + return nil +} + +func (dec *DecPSPackage) decPESPacket() error { + payload, err := dec.ReadPayload() + if err != nil { + return err + } + if len(payload) < 4 { + return errors.New("not enough data") + } + //data_alignment_indicator := (payload[0]&0b0001_0000)>>4 == 1 + flag := payload[1] + ptsFlag := flag>>7 == 1 + dtsFlag := (flag&0b0100_0000)>>6 == 1 + var pts, dts uint32 + pesHeaderDataLen := payload[2] + payload = payload[3:] + extraData := payload[:pesHeaderDataLen] + if ptsFlag && len(extraData) > 4 { + pts = uint32(extraData[0]&0b0000_1110) << 29 + pts += uint32(extraData[1]) << 22 + pts += uint32(extraData[2]&0b1111_1110) << 14 + pts += uint32(extraData[3]) << 7 + pts += uint32(extraData[4]) >> 1 + if dtsFlag && len(extraData) > 9 { + dts = uint32(extraData[5]&0b0000_1110) << 29 + dts += uint32(extraData[6]) << 22 + dts += uint32(extraData[7]&0b1111_1110) << 14 + dts += uint32(extraData[8]) << 7 + dts += uint32(extraData[9]) >> 1 + } + } + dec.PTS = pts + dec.DTS = dts + dec.Payload = payload[pesHeaderDataLen:] + return err +} diff --git a/mpegps/mpegps.go b/mpegps/mpegps.go new file mode 100644 index 0000000..cb6217c --- /dev/null +++ b/mpegps/mpegps.go @@ -0,0 +1,159 @@ +package mpegps + +import ( + "encoding/binary" + "errors" + "io" + + "m7s.live/engine/v4/util" +) + +const ( + StartCodePS = 0x000001ba + StartCodeSYS = 0x000001bb + StartCodeMAP = 0x000001bc + StartCodeVideo = 0x000001e0 + StartCodeAudio = 0x000001c0 + PrivateStreamCode = 0x000001bd + MEPGProgramEndCode = 0x000001b9 +) + +type EsHandler interface { + ReceiveAudio(MpegPsEsStream) + ReceiveVideo(MpegPsEsStream) +} + +type MpegPsEsStream struct { + Type byte + util.Buffer + PTS uint32 + DTS uint32 +} + +type MpegPsStream struct { + buffer util.Buffer + EsHandler + audio MpegPsEsStream + video MpegPsEsStream +} + +func (ps *MpegPsStream) Reset() { + ps.buffer.Reset() + ps.audio.Reset() + if ps.video.Buffer.CanRead() { + ps.ReceiveVideo(ps.video) + ps.video.Buffer = make(util.Buffer, 0) + } else { + ps.video.Reset() + } +} + +func (ps *MpegPsStream) Feed(data util.Buffer) (err error) { + reader := &data + if ps.buffer.CanRead() { + ps.buffer.Write(data) + reader = &ps.buffer + } + var begin util.Buffer + var payload []byte + defer func() { + if err != nil && begin.CanRead() { + ps.buffer.Reset() + ps.buffer.Write(begin) + } + }() + for err == nil && reader.CanReadN(4) { + begin = *reader + code := reader.ReadUint32() + switch code { + case StartCodePS: + if ps.audio.Buffer.CanRead() { + ps.ReceiveAudio(ps.audio) + ps.audio.Buffer = make(util.Buffer, 0) + } + if ps.video.Buffer.CanRead() { + ps.ReceiveVideo(ps.video) + ps.video.Buffer = make(util.Buffer, 0) + } + if reader.CanReadN(9) { + reader.ReadN(9) + if reader.CanRead() { + psl := reader.ReadByte() & 0x07 + if reader.CanReadN(int(psl)) { + reader.ReadN(int(psl)) + continue + } + } + } + err = io.ErrShortBuffer + case StartCodeSYS, PrivateStreamCode: + _, err = ps.ReadPayload(reader) + case StartCodeMAP: + err = ps.decProgramStreamMap(reader) + case StartCodeVideo: + payload, err = ps.ReadPayload(reader) + if err == nil { + err = ps.video.parsePESPacket(payload) + } + case StartCodeAudio: + payload, err = ps.ReadPayload(reader) + if err == nil { + err = ps.audio.parsePESPacket(payload) + } + case MEPGProgramEndCode: + return + default: + err = errors.New("start code error") + } + } + return +} + +func (ps *MpegPsStream) ReadPayload(data *util.Buffer) (payload []byte, err error) { + if !data.CanReadN(2) { + return nil, io.ErrShortBuffer + } + payloadlen := data.ReadUint16() + if data.CanReadN(int(payloadlen)) { + payload = data.ReadN(int(payloadlen)) + } else { + err = io.ErrShortBuffer + } + return +} + +func (ps *MpegPsStream) decProgramStreamMap(data *util.Buffer) error { + psm, err := ps.ReadPayload(data) + if err != nil { + return err + } + l := len(psm) + index := 2 + programStreamInfoLen := binary.BigEndian.Uint16(psm[index:]) + index += 2 + index += int(programStreamInfoLen) + programStreamMapLen := binary.BigEndian.Uint16(psm[index:]) + index += 2 + for programStreamMapLen > 0 { + if l <= index+1 { + break + } + streamType := psm[index] + index++ + elementaryStreamID := psm[index] + index++ + if elementaryStreamID >= 0xe0 && elementaryStreamID <= 0xef { + ps.video.Type = streamType + } else if elementaryStreamID >= 0xc0 && elementaryStreamID <= 0xdf { + ps.audio.Type = streamType + } + if l <= index+1 { + break + } + elementaryStreamInfoLength := binary.BigEndian.Uint16(psm[index:]) + index += 2 + index += int(elementaryStreamInfoLength) + programStreamMapLen -= 4 + elementaryStreamInfoLength + } + return nil +} diff --git a/mpegps/mpegps_pes.go b/mpegps/mpegps_pes.go new file mode 100644 index 0000000..31f3505 --- /dev/null +++ b/mpegps/mpegps_pes.go @@ -0,0 +1,35 @@ +package mpegps + +import "io" + +func (es *MpegPsEsStream) parsePESPacket(payload []byte) (err error) { + if len(payload) < 4 { + return io.ErrShortBuffer + } + //data_alignment_indicator := (payload[0]&0b0001_0000)>>4 == 1 + flag := payload[1] + ptsFlag := flag>>7 == 1 + dtsFlag := (flag&0b0100_0000)>>6 == 1 + pesHeaderDataLen := payload[2] + if len(payload) < int(pesHeaderDataLen) { + return io.ErrShortBuffer + } + payload = payload[3:] + extraData := payload[:pesHeaderDataLen] + if ptsFlag && len(extraData) > 4 { + es.PTS = uint32(extraData[0]&0b0000_1110) << 29 + es.PTS += uint32(extraData[1]) << 22 + es.PTS += uint32(extraData[2]&0b1111_1110) << 14 + es.PTS += uint32(extraData[3]) << 7 + es.PTS += uint32(extraData[4]) >> 1 + if dtsFlag && len(extraData) > 9 { + es.DTS = uint32(extraData[5]&0b0000_1110) << 29 + es.DTS += uint32(extraData[6]) << 22 + es.DTS += uint32(extraData[7]&0b1111_1110) << 14 + es.DTS += uint32(extraData[8]) << 7 + es.DTS += uint32(extraData[9]) >> 1 + } + } + es.Write(payload[pesHeaderDataLen:]) + return +} diff --git a/mpegps/ps-demuxer.go b/mpegps/ps-demuxer.go new file mode 100644 index 0000000..79028a7 --- /dev/null +++ b/mpegps/ps-demuxer.go @@ -0,0 +1,283 @@ +package mpegps + +import ( + "io" + + "github.com/yapingcat/gomedia/go-codec" + "github.com/yapingcat/gomedia/go-mpeg2" +) + +type psstream struct { + sid uint8 + cid mpeg2.PS_STREAM_TYPE + pts uint64 + dts uint64 + streamBuf []byte +} + +func newpsstream(sid uint8, cid mpeg2.PS_STREAM_TYPE) *psstream { + return &psstream{ + sid: sid, + cid: cid, + streamBuf: make([]byte, 0, 4096), + } +} + +type PSDemuxer struct { + streamMap map[uint8]*psstream + pkg *mpeg2.PSPacket + mpeg1 bool + cache []byte + OnFrame func(frame []byte, cid mpeg2.PS_STREAM_TYPE, pts uint64, dts uint64) + //解ps包过程中,解码回调psm,system header,pes包等 + //decodeResult 解码ps包时的产生的错误 + //这个回调主要用于debug,查看是否ps包存在问题 + OnPacket func(pkg mpeg2.Display, decodeResult error) +} + +func NewPSDemuxer() *PSDemuxer { + return &PSDemuxer{ + streamMap: make(map[uint8]*psstream), + pkg: new(mpeg2.PSPacket), + cache: make([]byte, 0, 256), + OnFrame: nil, + OnPacket: nil, + } +} + +func (psdemuxer *PSDemuxer) Feed(data []byte) error { + var bs *codec.BitStream + if len(psdemuxer.cache) > 0 { + psdemuxer.cache = append(psdemuxer.cache, data...) + bs = codec.NewBitStream(psdemuxer.cache) + } else { + bs = codec.NewBitStream(data) + } + + saveReseved := func() { + tmpcache := make([]byte, bs.RemainBytes()) + copy(tmpcache, bs.RemainData()) + psdemuxer.cache = tmpcache + } + + var ret error = nil + for !bs.EOS() { + if mpegerr, ok := ret.(mpeg2.Error); ok { + if mpegerr.NeedMore() { + saveReseved() + } + break + } + if bs.RemainBits() < 32 { + ret = io.ErrShortBuffer + saveReseved() + break + } + prefix_code := bs.NextBits(32) + switch prefix_code { + case 0x000001BA: //pack header + if psdemuxer.pkg.Header == nil { + psdemuxer.pkg.Header = new(mpeg2.PSPackHeader) + } + ret = psdemuxer.pkg.Header.Decode(bs) + psdemuxer.mpeg1 = psdemuxer.pkg.Header.IsMpeg1 + if psdemuxer.OnPacket != nil { + psdemuxer.OnPacket(psdemuxer.pkg.Header, ret) + } + case 0x000001BB: //system header + if psdemuxer.pkg.Header == nil { + panic("psdemuxer.pkg.Header must not be nil") + } + if psdemuxer.pkg.System == nil { + psdemuxer.pkg.System = new(mpeg2.System_header) + } + ret = psdemuxer.pkg.System.Decode(bs) + if psdemuxer.OnPacket != nil { + psdemuxer.OnPacket(psdemuxer.pkg.System, ret) + } + case 0x000001BC: //program stream map + if psdemuxer.pkg.Psm == nil { + psdemuxer.pkg.Psm = new(mpeg2.Program_stream_map) + } + if ret = psdemuxer.pkg.Psm.Decode(bs); ret == nil { + for _, streaminfo := range psdemuxer.pkg.Psm.Stream_map { + if _, found := psdemuxer.streamMap[streaminfo.Elementary_stream_id]; !found { + stream := newpsstream(streaminfo.Elementary_stream_id, mpeg2.PS_STREAM_TYPE(streaminfo.Stream_type)) + psdemuxer.streamMap[stream.sid] = stream + } + } + } + if psdemuxer.OnPacket != nil { + psdemuxer.OnPacket(psdemuxer.pkg.Psm, ret) + } + case 0x000001BD, 0x000001BE, 0x000001BF, 0x000001F0, 0x000001F1, + 0x000001F2, 0x000001F3, 0x000001F4, 0x000001F5, 0x000001F6, + 0x000001F7, 0x000001F8, 0x000001F9, 0x000001FA, 0x000001FB: + if psdemuxer.pkg.CommPes == nil { + psdemuxer.pkg.CommPes = new(mpeg2.CommonPesPacket) + } + ret = psdemuxer.pkg.CommPes.Decode(bs) + case 0x000001FF: //program stream directory + if psdemuxer.pkg.Psd == nil { + psdemuxer.pkg.Psd = new(mpeg2.Program_stream_directory) + } + ret = psdemuxer.pkg.Psd.Decode(bs) + case 0x000001B9: //MPEG_program_end_code + continue + default: + if prefix_code&0xFFFFFFE0 == 0x000001C0 || prefix_code&0xFFFFFFE0 == 0x000001E0 { + if psdemuxer.pkg.Pes == nil { + psdemuxer.pkg.Pes = mpeg2.NewPesPacket() + } + if psdemuxer.mpeg1 { + ret = psdemuxer.pkg.Pes.DecodeMpeg1(bs) + } else { + ret = psdemuxer.pkg.Pes.Decode(bs) + } + if psdemuxer.OnPacket != nil { + psdemuxer.OnPacket(psdemuxer.pkg.Pes, ret) + } + if ret == nil { + if stream, found := psdemuxer.streamMap[psdemuxer.pkg.Pes.Stream_id]; found { + if psdemuxer.mpeg1 && stream.cid == mpeg2.PS_STREAM_UNKNOW { + psdemuxer.guessCodecid(stream) + } + psdemuxer.demuxPespacket(stream, psdemuxer.pkg.Pes) + } else { + if psdemuxer.mpeg1 { + stream := newpsstream(psdemuxer.pkg.Pes.Stream_id, mpeg2.PS_STREAM_UNKNOW) + psdemuxer.streamMap[stream.sid] = stream + stream.streamBuf = append(stream.streamBuf, psdemuxer.pkg.Pes.Pes_payload...) + stream.pts = psdemuxer.pkg.Pes.Pts + stream.dts = psdemuxer.pkg.Pes.Dts + } + } + } + } else { + bs.SkipBits(8) + } + } + } + + if ret == nil && len(psdemuxer.cache) > 0 { + psdemuxer.cache = nil + } + + return ret +} + +func (psdemuxer *PSDemuxer) Reset() { + psdemuxer.cache = psdemuxer.cache[:0] + for _, stream := range psdemuxer.streamMap { + if len(stream.streamBuf) == 0 { + continue + } + + } +} + +func (psdemuxer *PSDemuxer) guessCodecid(stream *psstream) { + if stream.sid&0xE0 == uint8(mpeg2.PES_STREAM_AUDIO) { + stream.cid = mpeg2.PS_STREAM_AAC + } else if stream.sid&0xE0 == uint8(mpeg2.PES_STREAM_VIDEO) { + h264score := 0 + h265score := 0 + codec.SplitFrame(stream.streamBuf, func(nalu []byte) bool { + h264nalutype := codec.H264NaluTypeWithoutStartCode(nalu) + h265nalutype := codec.H265NaluTypeWithoutStartCode(nalu) + if h264nalutype == codec.H264_NAL_PPS || + h264nalutype == codec.H264_NAL_SPS || + h264nalutype == codec.H264_NAL_I_SLICE { + h264score += 2 + } else if h264nalutype < 5 { + h264score += 1 + } else if h264nalutype > 20 { + h264score -= 1 + } + + if h265nalutype == codec.H265_NAL_PPS || + h265nalutype == codec.H265_NAL_SPS || + h265nalutype == codec.H265_NAL_VPS || + (h265nalutype >= codec.H265_NAL_SLICE_BLA_W_LP && h265nalutype <= codec.H265_NAL_SLICE_CRA) { + h265score += 2 + } else if h265nalutype >= codec.H265_NAL_Slice_TRAIL_N && h265nalutype <= codec.H265_NAL_SLICE_RASL_R { + h265score += 1 + } else if h265nalutype > 40 { + h265score -= 1 + } + if h264score > h265score && h264score >= 4 { + stream.cid = mpeg2.PS_STREAM_H264 + } else if h264score < h265score && h265score >= 4 { + stream.cid = mpeg2.PS_STREAM_H265 + } + return true + }) + } +} + +func (psdemuxer *PSDemuxer) demuxPespacket(stream *psstream, pes *mpeg2.PesPacket) error { + switch stream.cid { + case mpeg2.PS_STREAM_AAC, mpeg2.PS_STREAM_G711A, mpeg2.PS_STREAM_G711U: + return psdemuxer.demuxAudio(stream, pes) + case mpeg2.PS_STREAM_H264, mpeg2.PS_STREAM_H265: + return psdemuxer.demuxAudio(stream, pes) + case mpeg2.PS_STREAM_UNKNOW: + if stream.pts != pes.Pts { + stream.streamBuf = nil + } + stream.streamBuf = append(stream.streamBuf, pes.Pes_payload...) + stream.pts = pes.Pts + stream.dts = pes.Dts + } + return nil +} + +func (psdemuxer *PSDemuxer) demuxAudio(stream *psstream, pes *mpeg2.PesPacket) error { + if stream.pts != pes.Pts && len(stream.streamBuf) > 0 { + if psdemuxer.OnFrame != nil { + psdemuxer.OnFrame(stream.streamBuf, stream.cid, stream.pts, stream.dts) + } + stream.streamBuf = nil + // stream.streamBuf = stream.streamBuf[:0] + } + stream.streamBuf = append(stream.streamBuf, pes.Pes_payload...) + stream.pts = pes.Pts + stream.dts = pes.Dts + return nil +} + +// func (psdemuxer *PSDemuxer) demuxH26x(stream *psstream, pes *mpeg2.PesPacket) error { +// if len(stream.streamBuf) == 0 { +// stream.pts = pes.Pts +// stream.dts = pes.Dts +// } +// stream.streamBuf = append(stream.streamBuf, pes.Pes_payload...) +// start, sc := codec.FindStartCode(stream.streamBuf, 0) +// for start >= 0 { +// end, sc2 := codec.FindStartCode(stream.streamBuf, start+int(sc)) +// if end < 0 { +// break +// } +// if stream.cid == mpeg2.PS_STREAM_H264 { +// naluType := codec.H264NaluType(stream.streamBuf[start:]) +// if naluType != codec.H264_NAL_AUD { +// if psdemuxer.OnFrame != nil { +// psdemuxer.OnFrame(stream.streamBuf[start:end], stream.cid, stream.pts/90, stream.dts/90) +// } +// } +// } else if stream.cid == mpeg2.PS_STREAM_H265 { +// naluType := codec.H265NaluType(stream.streamBuf[start:]) +// if naluType != codec.H265_NAL_AUD { +// if psdemuxer.OnFrame != nil { +// psdemuxer.OnFrame(stream.streamBuf[start:end], stream.cid, stream.pts/90, stream.dts/90) +// } +// } +// } +// start = end +// sc = sc2 +// } +// stream.streamBuf = stream.streamBuf[start:] +// stream.pts = pes.Pts +// stream.dts = pes.Dts +// return nil +// } diff --git a/port.go b/port.go new file mode 100644 index 0000000..c37f7f6 --- /dev/null +++ b/port.go @@ -0,0 +1,6 @@ +package ps + +type MediaPort struct { + Protocol string + Port uint16 +} diff --git a/publisher.go b/publisher.go new file mode 100644 index 0000000..2c60509 --- /dev/null +++ b/publisher.go @@ -0,0 +1,308 @@ +package ps + +import ( + "encoding/binary" + "io" + "net" + "os" + "time" + + "github.com/pion/rtp" + "github.com/yapingcat/gomedia/go-mpeg2" + "go.uber.org/zap" + . "m7s.live/engine/v4" + "m7s.live/engine/v4/codec" + "m7s.live/engine/v4/codec/mpegts" + "m7s.live/engine/v4/track" + . "m7s.live/engine/v4/track" + "m7s.live/engine/v4/util" + "m7s.live/plugin/ps/v4/mpegps" +) + +type cacheItem struct { + Seq uint16 + *util.ListItem[util.Buffer] +} + +type PSPublisher struct { + Publisher + relayTrack *track.Data + rtp.Packet + DisableReorder bool //是否禁用rtp重排序,TCP模式下应当禁用 + // mpegps.MpegPsStream `json:"-" yaml:"-"` + // *mpegps.PSDemuxer `json:"-" yaml:"-"` + mpegps.DecPSPackage `json:"-" yaml:"-"` + reorder util.RTPReorder[*cacheItem] + pool util.BytesPool + lastSeq uint16 + lastReceive time.Time + dump *os.File + dumpLen []byte +} + +func (p *PSPublisher) OnEvent(event any) { + switch event.(type) { + case IPublisher: + p.dumpLen = make([]byte, 6) + if conf.RelayMode != 0 { + p.relayTrack = p.Stream.NewDataTrack("ps", nil) + p.relayTrack.Attach() + } + case SEclose, SEKick: + conf.streams.Delete(p.Stream.Path) + } + p.Publisher.OnEvent(event) +} + +func (p *PSPublisher) ServeTCP(conn *net.TCPConn) { + var err error + ps := make(util.Buffer, 1024) + p.SetIO(conn) + defer p.Stop() + tcpAddr := zap.String("tcp", conn.LocalAddr().String()) + p.Info("start receive ps stream from", tcpAddr) + defer p.Info("stop receive ps stream from", tcpAddr) + for err == nil { + if _, err = io.ReadFull(conn, p.dumpLen[:2]); err != nil { + return + } + ps.Relloc(int(binary.BigEndian.Uint16(p.dumpLen[:2]))) + if _, err = io.ReadFull(conn, ps); err != nil { + return + } + p.PushPS(ps) + } +} + +func (p *PSPublisher) ServeUDP(conn *net.UDPConn) { + p.SetIO(conn) + defer p.Stop() + bufUDP := make([]byte, 1024*1024) + udpAddr := zap.String("udp", conn.LocalAddr().String()) + p.Info("start receive ps stream from", udpAddr) + defer p.Info("stop receive ps stream from", udpAddr) + for { + conn.SetReadDeadline(time.Now().Add(time.Second * 10)) + n, _, err := conn.ReadFromUDP(bufUDP) + if err != nil { + return + } + p.PushPS(bufUDP[:n]) + } +} +func (p *PSPublisher) PushPS(ps util.Buffer) { + if conf.RelayMode != 0 { + item := p.pool.Get(len(ps)) + copy(item.Value, ps) + p.relayTrack.Push(item) + } + if conf.RelayMode != 1 { + if err := p.Unmarshal(ps); err != nil { + p.Error("gb28181 decode rtp error:", zap.Error(err)) + } else if !p.IsClosed() { + p.writeDump(ps) + } + p.pushPS() + } +} + +// 解析rtp封装 https://www.ietf.org/rfc/rfc2250.txt +func (p *PSPublisher) pushPS() { + if p.Stream == nil { + return + } + if p.pool == nil { + // p.PSDemuxer = mpegps.NewPSDemuxer() + // p.PSDemuxer.OnPacket = p.OnPacket + // p.PSDemuxer.OnFrame = p.OnFrame + p.EsHandler = p + p.lastSeq = p.SequenceNumber - 1 + p.pool = make(util.BytesPool, 17) + } + if p.DisableReorder { + p.Feed(p.Packet.Payload) + p.lastSeq = p.SequenceNumber + } else { + item := p.pool.Get(len(p.Packet.Payload)) + copy(item.Value, p.Packet.Payload) + for rtpPacket := p.reorder.Push(p.SequenceNumber, &cacheItem{p.SequenceNumber, item}); rtpPacket != nil; rtpPacket = p.reorder.Pop() { + if rtpPacket.Seq != p.lastSeq+1 { + p.Debug("drop", zap.Uint16("seq", rtpPacket.Seq), zap.Uint16("lastSeq", p.lastSeq)) + p.Reset() + if p.VideoTrack != nil { + p.SetLostFlag() + } + } + p.Feed(rtpPacket.Value) + p.lastSeq = rtpPacket.Seq + rtpPacket.Recycle() + } + } +} +func (p *PSPublisher) OnFrame(frame []byte, cid mpeg2.PS_STREAM_TYPE, pts uint64, dts uint64) { + switch cid { + case mpeg2.PS_STREAM_AAC: + if p.AudioTrack != nil { + p.AudioTrack.WriteADTS(uint32(pts), frame) + } else { + p.AudioTrack = NewAAC(p.Publisher.Stream, p.pool) + } + case mpeg2.PS_STREAM_G711A: + if p.AudioTrack != nil { + p.AudioTrack.WriteRaw(uint32(pts), frame) + } else { + p.AudioTrack = NewG711(p.Publisher.Stream, true, p.pool) + } + case mpeg2.PS_STREAM_G711U: + if p.AudioTrack != nil { + p.AudioTrack.WriteRaw(uint32(pts), frame) + } else { + p.AudioTrack = NewG711(p.Publisher.Stream, false, p.pool) + } + case mpeg2.PS_STREAM_H264: + if p.VideoTrack != nil { + // p.WriteNalu(uint32(pts), uint32(dts), frame) + p.WriteAnnexB(uint32(pts), uint32(dts), frame) + } else { + p.VideoTrack = NewH264(p.Publisher.Stream, p.pool) + } + case mpeg2.PS_STREAM_H265: + if p.VideoTrack != nil { + // p.WriteNalu(uint32(pts), uint32(dts), frame) + p.WriteAnnexB(uint32(pts), uint32(dts), frame) + } else { + p.VideoTrack = NewH265(p.Publisher.Stream, p.pool) + } + } +} + +func (p *PSPublisher) OnPacket(pkg mpeg2.Display, decodeResult error) { + // switch value := pkg.(type) { + // case *mpeg2.PSPackHeader: + // // fd3.WriteString("--------------PS Pack Header--------------\n") + // if decodeResult == nil { + // // value.PrettyPrint(fd3) + // } else { + // // fd3.WriteString(fmt.Sprintf("Decode Ps Packet Failed %s\n", decodeResult.Error())) + // } + // case *mpeg2.System_header: + // // fd3.WriteString("--------------System Header--------------\n") + // if decodeResult == nil { + // // value.PrettyPrint(fd3) + // } else { + // // fd3.WriteString(fmt.Sprintf("Decode Ps Packet Failed %s\n", decodeResult.Error())) + // } + // case *mpeg2.Program_stream_map: + // // fd3.WriteString("--------------------PSM-------------------\n") + // if decodeResult == nil { + // // value.PrettyPrint(fd3) + // } else { + // // fd3.WriteString(fmt.Sprintf("Decode Ps Packet Failed %s\n", decodeResult.Error())) + // } + // case *mpeg2.PesPacket: + // // fd3.WriteString("-------------------PES--------------------\n") + // if decodeResult == nil { + // // value.PrettyPrint(fd3) + // } else { + // // fd3.WriteString(fmt.Sprintf("Decode Ps Packet Failed %s\n", decodeResult.Error())) + // } + // } +} + +func (p *PSPublisher) ReceiveVideo(es mpegps.MpegPsEsStream) { + if !conf.PubVideo { + return + } + if p.VideoTrack == nil { + switch es.Type { + case mpegts.STREAM_TYPE_H264: + p.VideoTrack = NewH264(p.Publisher.Stream, p.pool) + case mpegts.STREAM_TYPE_H265: + p.VideoTrack = NewH265(p.Publisher.Stream, p.pool) + default: + //推测编码类型 + var maybe264 codec.H264NALUType + maybe264 = maybe264.Parse(es.Buffer[4]) + switch maybe264 { + case codec.NALU_Non_IDR_Picture, + codec.NALU_IDR_Picture, + codec.NALU_SEI, + codec.NALU_SPS, + codec.NALU_PPS, + codec.NALU_Access_Unit_Delimiter: + p.VideoTrack = NewH264(p.Publisher.Stream, p.pool) + default: + p.Info("maybe h265", zap.Uint8("type", maybe264.Byte())) + p.VideoTrack = NewH265(p.Publisher.Stream, p.pool) + } + } + } + payload, pts, dts := es.Buffer, es.PTS, es.DTS + if dts == 0 { + dts = pts + } + // if binary.BigEndian.Uint32(payload) != 1 { + // panic("not annexb") + // } + p.WriteAnnexB(pts, dts, payload) +} + +func (p *PSPublisher) ReceiveAudio(es mpegps.MpegPsEsStream) { + if !conf.PubAudio { + return + } + ts, payload := es.PTS, es.Buffer + if p.AudioTrack == nil { + switch es.Type { + case mpegts.STREAM_TYPE_G711A: + p.AudioTrack = NewG711(p.Publisher.Stream, true, p.pool) + case mpegts.STREAM_TYPE_G711U: + p.AudioTrack = NewG711(p.Publisher.Stream, false, p.pool) + case mpegts.STREAM_TYPE_AAC: + p.AudioTrack = NewAAC(p.Publisher.Stream, p.pool) + p.WriteADTS(ts, payload) + case 0: //推测编码类型 + if payload[0] == 0xff && payload[1]>>4 == 0xf { + p.AudioTrack = NewAAC(p.Publisher.Stream) + p.WriteADTS(ts, payload) + } + default: + p.Error("audio type not supported yet", zap.Uint8("type", es.Type)) + } + } else if es.Type == mpegts.STREAM_TYPE_AAC { + p.WriteADTS(ts, payload) + } else { + p.WriteRaw(ts, payload) + } +} +func (p *PSPublisher) writeDump(ps util.Buffer) { + if p.dump != nil { + util.PutBE(p.dumpLen[:4], ps.Len()) + if p.lastReceive.IsZero() { + util.PutBE(p.dumpLen[4:], 0) + } else { + util.PutBE(p.dumpLen[4:], uint16(time.Since(p.lastReceive).Milliseconds())) + } + p.lastReceive = time.Now() + p.dump.Write(p.dumpLen) + p.dump.Write(ps) + } +} +func (p *PSPublisher) Replay(f *os.File) (err error) { + defer f.Close() + var t uint16 + for l := make([]byte, 6); !p.IsClosed(); time.Sleep(time.Millisecond * time.Duration(t)) { + _, err = f.Read(l) + if err != nil { + return + } + payload := make([]byte, util.ReadBE[int](l[:4])) + t = util.ReadBE[uint16](l[4:]) + _, err = f.Read(payload) + if err != nil { + return + } + p.PushPS(payload) + } + return +}