first commit

This commit is contained in:
langhuihui
2023-05-11 21:13:01 +08:00
parent b7f55c4976
commit 2baf2ee5db
12 changed files with 1455 additions and 12 deletions

35
README.en.md Normal file
View File

@@ -0,0 +1,35 @@
_[简体中文](https://github.com/Monibuca/plugin-ps) | English_
# PS Plugin
enable receive MpegPS stream
## Plugin Address
https://github.com/Monibuca/plugin-ps
## Plugin Import
```go
import ( _ "m7s.live/plugin/ps/v4" )
```
## Default Config
```yaml
ps:
publish: # format refer to global config
relaymode: 1 # 0:relay only 1:protocol transfar only 2:relay and protocol transfar
```
## API
### receive PS stream
`/ps/api/receive?streamPath=xxx&ssrc=xxx&port=xxx&reuse=1&dump=xxx`
- reuse means whether to reuse port, if reuse port, please make sure the ssrc from device is same as ssrc parameter, otherwise it will cause mixed stream
- dump means whether to dump to file, if dump to file, it will generate a folder named dump in current directory, the folder contains a file named by streamPath parameter, the file content is the data received from port [4byte content length][2byte relative time][content]
### replay PS dump file
`/ps/api/replay?streamPath=xxx&dump=xxx`
- dump means the file to replay, default is dump/ps
- streamPath means the streamPath of replayed video stream, default is replay/dump/ps (if dump is abc, then streamPath is replay/abc)

View File

@@ -1,2 +1,34 @@
# plugin-ps
handle mpeg-ps stream for monibuca
_[English](https://github.com/Monibuca/plugin-ps/blob/v4/README.en.md) | 简体中文_
# PS 插件
支持接收MpegPS流
## 插件地址
https://github.com/Monibuca/plugin-ps
## 插件引入
```go
import ( _ "m7s.live/plugin/ps/v4" )
```
## 默认配置
```yaml
ps:
publish: # 格式参考全局配置
relaymode: 1 # 0:纯转发 1:转协议,不转发 2:转发并且转协议
```
## API
### 接收PS流
`/ps/api/receive?streamPath=xxx&ssrc=xxx&port=xxx&reuse=1&dump=xxx`
其中:
- reuse代表是否端口复用如果使用端口复用请务必确定设备发送的ssrc和ssrc参数一致否则会出现混流的情况
- dump代表是否dump到文件如果dump到文件会在当前目录下生成一个以dump为名的文件夹文件夹下面是以streamPath参数值为名的文件文件内容从端口收到的数据[4byte 内容长度][2byte 相对时间][内容]
### 回放PS的dump文件
`/ps/api/replay?streamPath=xxx&dump=xxx`
- dump 代表需要回放的文件默认是dump/ps
- streamPath 代表回放时生成的视频流的streamPath, 默认是replay/dump/ps (如果dump传了abc, 那么streamPath默认是replay/abc)

10
go.mod
View File

@@ -2,7 +2,12 @@ module m7s.live/plugin/ps/v4
go 1.19
require m7s.live/engine/v4 v4.12.7
require (
github.com/pion/rtp v1.7.13
github.com/yapingcat/gomedia v0.0.0-20230426092936-387031404274
go.uber.org/zap v1.23.0
m7s.live/engine/v4 v4.12.8
)
require (
github.com/aler9/gortsplib/v2 v2.2.2 // indirect
@@ -18,7 +23,6 @@ require (
github.com/mcuadros/go-defaults v1.2.0 // indirect
github.com/onsi/ginkgo/v2 v2.2.0 // indirect
github.com/pion/randutil v0.1.0 // indirect
github.com/pion/rtp v1.7.13 // indirect
github.com/pion/webrtc/v3 v3.1.49 // indirect
github.com/power-devops/perfstat v0.0.0-20220216144756-c35f1ee13d7c // indirect
github.com/q191201771/naza v0.30.8 // indirect
@@ -29,11 +33,9 @@ require (
github.com/shirou/gopsutil/v3 v3.22.10 // indirect
github.com/tklauser/go-sysconf v0.3.11 // indirect
github.com/tklauser/numcpus v0.6.0 // indirect
github.com/yapingcat/gomedia v0.0.0-20230426092936-387031404274 // indirect
github.com/yusufpapurcu/wmi v1.2.2 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
go.uber.org/zap v1.23.0 // indirect
golang.org/x/crypto v0.4.0 // indirect
golang.org/x/exp v0.0.0-20221205204356-47842c84f3db // indirect
golang.org/x/mod v0.7.0 // indirect

18
go.sum
View File

@@ -1,6 +1,7 @@
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/aler9/gortsplib/v2 v2.2.2 h1:tTw8pdKSOEjlZjjE1S4ftXPHJkYOqjNNv3hjQ0Nto9M=
github.com/aler9/gortsplib/v2 v2.2.2/go.mod h1:k6uBVHGwsIc/0L5SLLqWwi6bSJUb4VR0HfvncyHlKQI=
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
@@ -13,6 +14,7 @@ github.com/cnotch/queue v0.0.0-20201224060551-4191569ce8f6/go.mod h1:zOssjAlNusO
github.com/cnotch/scheduler v0.0.0-20200522024700-1d2da93eefc5/go.mod h1:F4GE3SZkJZ8an1Y0ZCqvSM3jeozNuKzoC67erG1PhIo=
github.com/cnotch/xlog v0.0.0-20201208005456-cfda439cd3a0/go.mod h1:RW9oHsR79ffl3sR3yMGgxYupMn2btzdtJUwoxFPUE5E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/denisbrodbeck/machineid v1.0.1 h1:geKr9qtkB876mXguW2X6TU4ZynleN6ezuMSRhl4D7AQ=
github.com/denisbrodbeck/machineid v1.0.1/go.mod h1:dJUwb7PTidGDeYyUBmXZ2GphQBbjJCrnectwCyxcUSI=
@@ -33,12 +35,14 @@ github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:W
github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38 h1:yAJXTCF9TqKcTiHJAE8dj7HMvPfh66eeA2JYW7eFpSE=
github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
@@ -53,6 +57,7 @@ github.com/kelindar/tcp v1.0.0/go.mod h1:JB5hj1cshLU60XrLij2BBxW3JQ4hOye8vqbyvuK
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/logrusorgru/aurora v2.0.3+incompatible h1:tOpm7WcpBTn4fjmVfgpQq0EfczGlG91VSDkswnjF5A8=
github.com/logrusorgru/aurora v2.0.3+incompatible/go.mod h1:7rIyQOR62GCctdiQpZ/zOJlFyk6y+94wXzv6RNZgaR4=
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I=
@@ -60,6 +65,7 @@ github.com/lufia/plan9stats v0.0.0-20220913051719-115f729f3c8c h1:VtwQ41oftZwlMn
github.com/lufia/plan9stats v0.0.0-20220913051719-115f729f3c8c/go.mod h1:JKx41uQRwqlTZabZc+kILPrO/3jlKnQ2Z8b7YiVw5cE=
github.com/mcuadros/go-defaults v1.2.0 h1:FODb8WSf0uGaY8elWJAkoLL0Ri6AlZ1bFlenk56oZtc=
github.com/mcuadros/go-defaults v1.2.0/go.mod h1:WEZtHEVIGYVDqkKSWBdWKUVdRyKlMfulPaGDWIVeCWY=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
@@ -74,6 +80,7 @@ github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1Cpa
github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
github.com/onsi/gomega v1.17.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY=
github.com/onsi/gomega v1.20.1 h1:PA/3qinGoukvymdIDV8pii6tiZgC8kbmJO6Z5+b002Q=
github.com/pion/datachannel v1.5.2/go.mod h1:FTGQWaHrdCwIJ1rw6xBIfZVkslikjShim5yr05XFuCQ=
github.com/pion/dtls/v2 v2.1.5/go.mod h1:BqCE7xPZbPSubGasRoDFJeTsyJtdD1FanJYL0JGheqY=
github.com/pion/ice/v2 v2.2.12/go.mod h1:z2KXVFyRkmjetRlaVRgjO9U3ShKwzhlUylvxKfHfd5A=
@@ -101,6 +108,8 @@ github.com/pion/udp v0.1.1/go.mod h1:6AFo+CMdKQm7UiA0eUPA8/eVCTx8jBIITLZHc9DWX5M
github.com/pion/webrtc/v3 v3.1.49 h1:rbsNGxK9jMYts+xE6zYAJMUQHnGwmk/JYze8yttW+to=
github.com/pion/webrtc/v3 v3.1.49/go.mod h1:kHf/o47QW4No1rgpsFux/h7lUhtUnwFnSFDZOXeLapw=
github.com/pixelbender/go-sdp v1.1.0/go.mod h1:6IBlz9+BrUHoFTea7gcp4S54khtOhjCW/nVDLhmZBAs=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE=
github.com/power-devops/perfstat v0.0.0-20220216144756-c35f1ee13d7c h1:NRoLoZvkBTKvR5gQLgA3e0hqjkY9u1wm+iOL45VN/qI=
@@ -129,6 +138,7 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8=
github.com/tklauser/go-sysconf v0.3.10/go.mod h1:C8XykCvCb+Gn0oNCWPIlcb0RuglQTYaQ2hGm7jmxEFk=
github.com/tklauser/go-sysconf v0.3.11 h1:89WgdJhk5SNwJfu+GKyYveZ4IaJ7xAkecBo+KdJV0CM=
github.com/tklauser/go-sysconf v0.3.11/go.mod h1:GqXfhXY3kiPa0nAXPDIQIWzJbMCB7AmcWpGR8lSZfqI=
@@ -145,6 +155,7 @@ github.com/yusufpapurcu/wmi v1.2.2/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQ
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ=
go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI=
go.uber.org/multierr v1.8.0 h1:dg6GjLku4EH+249NNmoIciG9N/jURbDG+pFlTkhzIC8=
go.uber.org/multierr v1.8.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak=
go.uber.org/zap v1.23.0 h1:OjGQ5KQDEUawVHxNwQgPpiypGHOxo2mNZsOqTak4fFY=
@@ -227,6 +238,7 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.8.0 h1:57P1ETyNKtuIjB4SRd15iJxuhj8Gc416Y78H3qgMh68=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
@@ -246,8 +258,10 @@ google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzi
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.28.0 h1:w43yiav+6bVFTBQFZX0r7ipe9JQ1QsbMgHwbBziscLw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU=
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k=
@@ -261,5 +275,5 @@ gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
m7s.live/engine/v4 v4.12.7 h1:3qe4c4NNFtNRCuolHE9NOlo2WkhswhMOrVZj8bk6Goc=
m7s.live/engine/v4 v4.12.7/go.mod h1:LoALBfV5rmsz5TJQr6cmLxM33mfUE5BKBq/sMtXOVlc=
m7s.live/engine/v4 v4.12.8 h1:cNGyajzEkbUzIcPtcedGbxvMlIuScWxDb/raYFgAHKE=
m7s.live/engine/v4 v4.12.8/go.mod h1:LoALBfV5rmsz5TJQr6cmLxM33mfUE5BKBq/sMtXOVlc=

215
main.go
View File

@@ -1,15 +1,222 @@
package pluginps
package ps
import (
"encoding/binary"
"fmt"
"io"
"net"
"net/http"
"os"
"strconv"
"strings"
"sync"
"time"
"github.com/pion/rtp"
"go.uber.org/zap"
. "m7s.live/engine/v4"
"m7s.live/engine/v4/config"
"m7s.live/engine/v4/lang"
"m7s.live/engine/v4/util"
)
type PSConfig struct {
Publisher
config.Publish
RelayMode int // 转发模式,0:转协议+不转发,1:不转协议+转发2:转协议+转发
streams sync.Map
shareTCP sync.Map
shareUDP sync.Map
}
var conf = &PSConfig{}
var PSPlugin = InstallPlugin(conf)
func (c *PSConfig) OnEvent(event any) {
}
switch event.(type) {
case FirstConfig:
lang.Merge("zh", map[string]string{
"start receive ps stream from": "开始接收PS流来自",
"stop receive ps stream from": "停止接收PS流来自",
"ssrc not found": "未找到ssrc",
})
}
}
func (c *PSConfig) ServeTCP(conn *net.TCPConn) {
var err error
ps := make(util.Buffer, 1024)
tcpAddr := zap.String("tcp", conn.LocalAddr().String())
rtpLen := make([]byte, 2)
var puber *PSPublisher
if _, err = io.ReadFull(conn, rtpLen); err != nil {
return
}
ps.Relloc(int(binary.BigEndian.Uint16(rtpLen)))
if _, err = io.ReadFull(conn, ps); err != nil {
return
}
var rtpPacket rtp.Packet
if err := rtpPacket.Unmarshal(ps); err != nil {
PSPlugin.Error("gb28181 decode rtp error:", zap.Error(err))
}
ssrc := rtpPacket.SSRC
if v, ok := conf.streams.Load(ssrc); ok {
puber = v.(*PSPublisher)
puber.Info("start receive ps stream from", tcpAddr)
defer puber.Info("stop receive ps stream from", tcpAddr)
defer puber.Stop()
for err == nil {
puber.PushPS(ps)
if _, err = io.ReadFull(conn, rtpLen); err != nil {
return
}
ps.Relloc(int(binary.BigEndian.Uint16(rtpLen)))
if _, err = io.ReadFull(conn, ps); err != nil {
return
}
}
} else {
PSPlugin.Error("ssrc not found", zap.Uint32("ssrc", ssrc))
}
}
func (c *PSConfig) ServeUDP(conn *net.UDPConn) {
bufUDP := make([]byte, 1024*1024)
udpAddr := zap.String("udp", conn.LocalAddr().String())
var rtpPacket rtp.Packet
PSPlugin.Info("start receive ps stream from", udpAddr)
defer PSPlugin.Info("stop receive ps stream from", udpAddr)
var lastSSRC uint32
var lastPubber *PSPublisher
for {
conn.SetReadDeadline(time.Now().Add(time.Second * 10))
n, _, err := conn.ReadFromUDP(bufUDP)
if err != nil {
return
}
if err := rtpPacket.Unmarshal(bufUDP[:n]); err != nil {
PSPlugin.Error("gb28181 decode rtp error:", zap.Error(err))
}
ssrc := rtpPacket.SSRC
if lastSSRC != ssrc {
if v, ok := conf.streams.Load(ssrc); ok {
lastSSRC = ssrc
lastPubber = v.(*PSPublisher)
} else {
PSPlugin.Error("ssrc not found", zap.Uint32("ssrc", ssrc))
continue
}
}
lastPubber.Packet = rtpPacket
lastPubber.pushPS()
}
}
func Receive(streamPath, dump, port string, ssrc uint32, reuse bool) (err error) {
var pubber PSPublisher
if _, loaded := conf.streams.LoadOrStore(ssrc, &pubber); loaded {
return fmt.Errorf("ssrc %d already exists", ssrc)
} else {
if dump != "" {
pubber.dump, err = os.OpenFile(dump, os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return
}
}
if err = PSPlugin.Publish(streamPath, &pubber); err == nil {
protocol, listenaddr, _ := strings.Cut(port, ":")
if !strings.Contains(listenaddr, ":") {
listenaddr = ":" + listenaddr
}
switch protocol {
case "tcp":
var tcpConf config.TCP
tcpConf.ListenAddr = listenaddr
if reuse {
if _, ok := conf.shareTCP.LoadOrStore(listenaddr, &tcpConf); ok {
} else {
conf.streams.Store(ssrc, &pubber)
go tcpConf.Listen(PSPlugin, conf)
}
} else {
tcpConf.ListenNum = 1
go tcpConf.Listen(pubber, &pubber)
}
case "udp":
if reuse {
var udpConf struct {
*net.UDPConn
}
if _, ok := conf.shareUDP.LoadOrStore(listenaddr, &udpConf); ok {
} else {
udpConn, err := util.ListenUDP(listenaddr, 1024*1024)
if err != nil {
PSPlugin.Error("udp listen error", zap.Error(err))
return err
}
udpConf.UDPConn = udpConn
conf.streams.Store(ssrc, &pubber)
go conf.ServeUDP(udpConn)
}
} else {
udpConn, err := util.ListenUDP(listenaddr, 1024*1024)
if err != nil {
pubber.Stop()
return err
} else {
go pubber.ServeUDP(udpConn)
}
}
}
}
}
return
}
// 收流
func (c *PSConfig) API_receive(w http.ResponseWriter, r *http.Request) {
query := r.URL.Query()
dump := query.Get("dump")
streamPath := query.Get("streamPath")
ssrc := query.Get("ssrc")
port := query.Get("port")
reuse := query.Get("reuse") // 是否复用端口
if _ssrc, err := strconv.ParseInt(ssrc, 10, 0); err == nil {
if err := Receive(streamPath, dump, port, uint32(_ssrc), reuse != ""); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
} else {
w.Write([]byte("ok"))
}
} else {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
}
func (c *PSConfig) API_replay(w http.ResponseWriter, r *http.Request) {
dump := r.URL.Query().Get("dump")
streamPath := r.URL.Query().Get("streamPath")
if dump == "" {
dump = "dump/ps"
}
f, err := os.OpenFile(dump, os.O_RDONLY, 0644)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
} else {
if streamPath == "" {
if strings.HasPrefix(dump, "/") {
streamPath = "replay" + dump
} else {
streamPath = "replay/" + dump
}
}
var pub PSPublisher
pub.SetIO(f)
if err = PSPlugin.Publish(streamPath, &pub); err == nil {
go pub.Replay(f)
w.Write([]byte("ok"))
} else {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
}
}

112
mpegps/buffer.go Normal file
View File

@@ -0,0 +1,112 @@
package mpegps
import (
"encoding/binary"
"errors"
"io"
)
type IOBuffer struct {
buf []byte // contents are the bytes buf[off : len(buf)]
off int // read at &buf[off], write at &buf[len(buf)]
}
func (b *IOBuffer) Next(n int) []byte {
m := b.Len()
if n > m {
n = m
}
data := b.buf[b.off : b.off+n]
b.off += n
return data
}
func (b *IOBuffer) Uint16() (uint16, error) {
if b.Len() > 1 {
return binary.BigEndian.Uint16(b.Next(2)), nil
}
return 0, io.EOF
}
func (b *IOBuffer) Skip(n int) (err error) {
_, err = b.ReadN(n)
return
}
func (b *IOBuffer) Uint32() (uint32, error) {
if b.Len() > 3 {
return binary.BigEndian.Uint32(b.Next(4)), nil
}
return 0, io.EOF
}
func (b *IOBuffer) ReadN(length int) ([]byte, error) {
if b.Len() >= length {
return b.Next(length), nil
}
return nil, io.EOF
}
//func (b *IOBuffer) Read(buf []byte) (n int, err error) {
// var ret []byte
// ret, err = b.ReadN(len(buf))
// copy(buf, ret)
// return len(ret), err
//}
// empty reports whether the unread portion of the buffer is empty.
func (b *IOBuffer) empty() bool { return b.Len() <= b.off }
func (b *IOBuffer) ReadByte() (byte, error) {
if b.empty() {
// Buffer is empty, reset to recover space.
b.Reset()
return 0, io.EOF
}
c := b.buf[b.off]
b.off++
return c, nil
}
func (b *IOBuffer) Reset() {
b.buf = b.buf[:0]
b.off = 0
}
func (b *IOBuffer) Len() int { return len(b.buf) - b.off }
// tryGrowByReslice is a inlineable version of grow for the fast-case where the
// internal buffer only needs to be resliced.
// It returns the index where bytes should be written and whether it succeeded.
func (b *IOBuffer) tryGrowByReslice(n int) (int, bool) {
if l := len(b.buf); n <= cap(b.buf)-l {
b.buf = b.buf[:l+n]
return l, true
}
return 0, false
}
var ErrTooLarge = errors.New("IOBuffer: too large")
func (b *IOBuffer) Write(p []byte) (n int, err error) {
l := copy(b.buf, b.buf[b.off:])
b.buf = append(b.buf[:l], p...)
b.off = 0
// println(b.buf, b.off, b.buf[b.off], b.buf[b.off+1], b.buf[b.off+2], b.buf[b.off+3])
return len(p), nil
// defer func() {
// if recover() != nil {
// panic(ErrTooLarge)
// }
// }()
// l := len(p)
// oldLen := len(b.buf)
// m, ok := b.tryGrowByReslice(l)
// if !ok {
// m = oldLen - b.off
// buf := append(append(([]byte)(nil), b.buf[b.off:]...), p...)
// b.off = 0
// b.buf = buf
// }
// return copy(b.buf[m:], p), nil
}

250
mpegps/demuxer_v3.go Normal file
View File

@@ -0,0 +1,250 @@
package mpegps
import (
"errors"
"m7s.live/engine/v4/util"
)
var (
ErrNotFoundStartCode = errors.New("not found the need start code flag")
ErrMarkerBit = errors.New("marker bit value error")
ErrFormatPack = errors.New("not package standard")
ErrParsePakcet = errors.New("parse ps packet error")
)
/*
This implement from VLC source code
notes: https://github.com/videolan/vlc/blob/master/modules/mux/mpeg/bits.h
*/
/*
https://github.com/videolan/vlc/blob/master/modules/demux/mpeg
*/
type DecPSPackage struct {
systemClockReferenceBase uint64
systemClockReferenceExtension uint64
programMuxRate uint32
IOBuffer
Payload []byte
PTS uint32
DTS uint32
EsHandler
audio MpegPsEsStream
video MpegPsEsStream
}
func (dec *DecPSPackage) clean() {
dec.systemClockReferenceBase = 0
dec.systemClockReferenceExtension = 0
dec.programMuxRate = 0
dec.Payload = nil
dec.PTS = 0
dec.DTS = 0
}
func (dec *DecPSPackage) ReadPayload() (payload []byte, err error) {
payloadlen, err := dec.Uint16()
if err != nil {
return
}
return dec.ReadN(int(payloadlen))
}
func (dec *DecPSPackage) Feed(ps []byte) {
if len(ps) >= 4 && util.BigEndian.Uint32(ps) == StartCodePS {
if dec.Len() > 0 {
dec.Skip(4)
dec.Read(0)
dec.Reset()
}
dec.Write(ps)
} else if dec.Len() > 0 {
dec.Write(ps)
}
}
// read the buffer and push video or audio
func (dec *DecPSPackage) Read(ts uint32) error {
again:
dec.clean()
if err := dec.Skip(9); err != nil {
return err
}
psl, err := dec.ReadByte()
if err != nil {
return err
}
psl &= 0x07
if err = dec.Skip(int(psl)); err != nil {
return err
}
var video []byte
var nextStartCode, videoTs, videoCts uint32
loop:
for err == nil {
if nextStartCode, err = dec.Uint32(); err != nil {
break
}
switch nextStartCode {
case StartCodeSYS:
dec.ReadPayload()
//err = dec.decSystemHeader()
case StartCodeMAP:
err = dec.decProgramStreamMap()
case StartCodeVideo:
// var cts uint32
if err = dec.decPESPacket(); err == nil {
if len(video) == 0 {
dec.video.PTS = dec.PTS
dec.video.DTS = dec.DTS
// if dec.PTS == 0 {
// dec.PTS = ts
// }
// if dec.DTS != 0 {
// cts = dec.PTS - dec.DTS
// } else {
// dec.DTS = dec.PTS
// }
// videoTs = dec.DTS / 90
// videoCts = cts / 90
}
video = append(video, dec.Payload...)
} else {
// utils.Println("video", err)
}
case StartCodeAudio:
if err = dec.decPESPacket(); err == nil {
// ts := ts / 90
// if dec.PTS != 0 {
// ts = dec.PTS / 90
// }
dec.audio.PTS = dec.PTS
dec.audio.Buffer = dec.Payload
dec.ReceiveAudio(dec.audio)
// pusher.PushAudio(ts, dec.Payload)
} else {
// utils.Println("audio", err)
}
case StartCodePS:
break loop
default:
dec.ReadPayload()
}
}
if len(video) > 0 {
dec.video.Buffer = video
dec.ReceiveVideo(dec.video)
if false {
println("video", videoTs, videoCts, len(video))
}
// pusher.PushVideo(videoTs, videoCts, video)
}
if nextStartCode == StartCodePS {
// utils.Println(aurora.Red("StartCodePS recursion..."), err)
goto again
}
return err
}
/*
func (dec *DecPSPackage) decSystemHeader() error {
syslens, err := dec.Uint16()
if err != nil {
return err
}
// drop rate video audio bound and lock flag
syslens -= 6
if err = dec.Skip(6); err != nil {
return err
}
// ONE WAY: do not to parse the stream and skip the buffer
//br.Skip(syslen * 8)
// TWO WAY: parse every stream info
for syslens > 0 {
if nextbits, err := dec.Uint8(); err != nil {
return err
} else if (nextbits&0x80)>>7 != 1 {
break
}
if err = dec.Skip(2); err != nil {
return err
}
syslens -= 3
}
return nil
}
*/
func (dec *DecPSPackage) decProgramStreamMap() error {
psm, err := dec.ReadPayload()
if err != nil {
return err
}
l := len(psm)
index := 2
programStreamInfoLen := util.BigEndian.Uint16(psm[index:])
index += 2
index += int(programStreamInfoLen)
programStreamMapLen := util.BigEndian.Uint16(psm[index:])
index += 2
for programStreamMapLen > 0 {
if l <= index+1 {
break
}
streamType := psm[index]
index++
elementaryStreamID := psm[index]
index++
if elementaryStreamID >= 0xe0 && elementaryStreamID <= 0xef {
dec.video.Type = streamType
} else if elementaryStreamID >= 0xc0 && elementaryStreamID <= 0xdf {
dec.audio.Type = streamType
}
if l <= index+1 {
break
}
elementaryStreamInfoLength := util.BigEndian.Uint16(psm[index:])
index += 2
index += int(elementaryStreamInfoLength)
programStreamMapLen -= 4 + elementaryStreamInfoLength
}
return nil
}
func (dec *DecPSPackage) decPESPacket() error {
payload, err := dec.ReadPayload()
if err != nil {
return err
}
if len(payload) < 4 {
return errors.New("not enough data")
}
//data_alignment_indicator := (payload[0]&0b0001_0000)>>4 == 1
flag := payload[1]
ptsFlag := flag>>7 == 1
dtsFlag := (flag&0b0100_0000)>>6 == 1
var pts, dts uint32
pesHeaderDataLen := payload[2]
payload = payload[3:]
extraData := payload[:pesHeaderDataLen]
if ptsFlag && len(extraData) > 4 {
pts = uint32(extraData[0]&0b0000_1110) << 29
pts += uint32(extraData[1]) << 22
pts += uint32(extraData[2]&0b1111_1110) << 14
pts += uint32(extraData[3]) << 7
pts += uint32(extraData[4]) >> 1
if dtsFlag && len(extraData) > 9 {
dts = uint32(extraData[5]&0b0000_1110) << 29
dts += uint32(extraData[6]) << 22
dts += uint32(extraData[7]&0b1111_1110) << 14
dts += uint32(extraData[8]) << 7
dts += uint32(extraData[9]) >> 1
}
}
dec.PTS = pts
dec.DTS = dts
dec.Payload = payload[pesHeaderDataLen:]
return err
}

159
mpegps/mpegps.go Normal file
View File

@@ -0,0 +1,159 @@
package mpegps
import (
"encoding/binary"
"errors"
"io"
"m7s.live/engine/v4/util"
)
const (
StartCodePS = 0x000001ba
StartCodeSYS = 0x000001bb
StartCodeMAP = 0x000001bc
StartCodeVideo = 0x000001e0
StartCodeAudio = 0x000001c0
PrivateStreamCode = 0x000001bd
MEPGProgramEndCode = 0x000001b9
)
type EsHandler interface {
ReceiveAudio(MpegPsEsStream)
ReceiveVideo(MpegPsEsStream)
}
type MpegPsEsStream struct {
Type byte
util.Buffer
PTS uint32
DTS uint32
}
type MpegPsStream struct {
buffer util.Buffer
EsHandler
audio MpegPsEsStream
video MpegPsEsStream
}
func (ps *MpegPsStream) Reset() {
ps.buffer.Reset()
ps.audio.Reset()
if ps.video.Buffer.CanRead() {
ps.ReceiveVideo(ps.video)
ps.video.Buffer = make(util.Buffer, 0)
} else {
ps.video.Reset()
}
}
func (ps *MpegPsStream) Feed(data util.Buffer) (err error) {
reader := &data
if ps.buffer.CanRead() {
ps.buffer.Write(data)
reader = &ps.buffer
}
var begin util.Buffer
var payload []byte
defer func() {
if err != nil && begin.CanRead() {
ps.buffer.Reset()
ps.buffer.Write(begin)
}
}()
for err == nil && reader.CanReadN(4) {
begin = *reader
code := reader.ReadUint32()
switch code {
case StartCodePS:
if ps.audio.Buffer.CanRead() {
ps.ReceiveAudio(ps.audio)
ps.audio.Buffer = make(util.Buffer, 0)
}
if ps.video.Buffer.CanRead() {
ps.ReceiveVideo(ps.video)
ps.video.Buffer = make(util.Buffer, 0)
}
if reader.CanReadN(9) {
reader.ReadN(9)
if reader.CanRead() {
psl := reader.ReadByte() & 0x07
if reader.CanReadN(int(psl)) {
reader.ReadN(int(psl))
continue
}
}
}
err = io.ErrShortBuffer
case StartCodeSYS, PrivateStreamCode:
_, err = ps.ReadPayload(reader)
case StartCodeMAP:
err = ps.decProgramStreamMap(reader)
case StartCodeVideo:
payload, err = ps.ReadPayload(reader)
if err == nil {
err = ps.video.parsePESPacket(payload)
}
case StartCodeAudio:
payload, err = ps.ReadPayload(reader)
if err == nil {
err = ps.audio.parsePESPacket(payload)
}
case MEPGProgramEndCode:
return
default:
err = errors.New("start code error")
}
}
return
}
func (ps *MpegPsStream) ReadPayload(data *util.Buffer) (payload []byte, err error) {
if !data.CanReadN(2) {
return nil, io.ErrShortBuffer
}
payloadlen := data.ReadUint16()
if data.CanReadN(int(payloadlen)) {
payload = data.ReadN(int(payloadlen))
} else {
err = io.ErrShortBuffer
}
return
}
func (ps *MpegPsStream) decProgramStreamMap(data *util.Buffer) error {
psm, err := ps.ReadPayload(data)
if err != nil {
return err
}
l := len(psm)
index := 2
programStreamInfoLen := binary.BigEndian.Uint16(psm[index:])
index += 2
index += int(programStreamInfoLen)
programStreamMapLen := binary.BigEndian.Uint16(psm[index:])
index += 2
for programStreamMapLen > 0 {
if l <= index+1 {
break
}
streamType := psm[index]
index++
elementaryStreamID := psm[index]
index++
if elementaryStreamID >= 0xe0 && elementaryStreamID <= 0xef {
ps.video.Type = streamType
} else if elementaryStreamID >= 0xc0 && elementaryStreamID <= 0xdf {
ps.audio.Type = streamType
}
if l <= index+1 {
break
}
elementaryStreamInfoLength := binary.BigEndian.Uint16(psm[index:])
index += 2
index += int(elementaryStreamInfoLength)
programStreamMapLen -= 4 + elementaryStreamInfoLength
}
return nil
}

35
mpegps/mpegps_pes.go Normal file
View File

@@ -0,0 +1,35 @@
package mpegps
import "io"
func (es *MpegPsEsStream) parsePESPacket(payload []byte) (err error) {
if len(payload) < 4 {
return io.ErrShortBuffer
}
//data_alignment_indicator := (payload[0]&0b0001_0000)>>4 == 1
flag := payload[1]
ptsFlag := flag>>7 == 1
dtsFlag := (flag&0b0100_0000)>>6 == 1
pesHeaderDataLen := payload[2]
if len(payload) < int(pesHeaderDataLen) {
return io.ErrShortBuffer
}
payload = payload[3:]
extraData := payload[:pesHeaderDataLen]
if ptsFlag && len(extraData) > 4 {
es.PTS = uint32(extraData[0]&0b0000_1110) << 29
es.PTS += uint32(extraData[1]) << 22
es.PTS += uint32(extraData[2]&0b1111_1110) << 14
es.PTS += uint32(extraData[3]) << 7
es.PTS += uint32(extraData[4]) >> 1
if dtsFlag && len(extraData) > 9 {
es.DTS = uint32(extraData[5]&0b0000_1110) << 29
es.DTS += uint32(extraData[6]) << 22
es.DTS += uint32(extraData[7]&0b1111_1110) << 14
es.DTS += uint32(extraData[8]) << 7
es.DTS += uint32(extraData[9]) >> 1
}
}
es.Write(payload[pesHeaderDataLen:])
return
}

283
mpegps/ps-demuxer.go Normal file
View File

@@ -0,0 +1,283 @@
package mpegps
import (
"io"
"github.com/yapingcat/gomedia/go-codec"
"github.com/yapingcat/gomedia/go-mpeg2"
)
type psstream struct {
sid uint8
cid mpeg2.PS_STREAM_TYPE
pts uint64
dts uint64
streamBuf []byte
}
func newpsstream(sid uint8, cid mpeg2.PS_STREAM_TYPE) *psstream {
return &psstream{
sid: sid,
cid: cid,
streamBuf: make([]byte, 0, 4096),
}
}
type PSDemuxer struct {
streamMap map[uint8]*psstream
pkg *mpeg2.PSPacket
mpeg1 bool
cache []byte
OnFrame func(frame []byte, cid mpeg2.PS_STREAM_TYPE, pts uint64, dts uint64)
//解ps包过程中解码回调psmsystem headerpes包等
//decodeResult 解码ps包时的产生的错误
//这个回调主要用于debug查看是否ps包存在问题
OnPacket func(pkg mpeg2.Display, decodeResult error)
}
func NewPSDemuxer() *PSDemuxer {
return &PSDemuxer{
streamMap: make(map[uint8]*psstream),
pkg: new(mpeg2.PSPacket),
cache: make([]byte, 0, 256),
OnFrame: nil,
OnPacket: nil,
}
}
func (psdemuxer *PSDemuxer) Feed(data []byte) error {
var bs *codec.BitStream
if len(psdemuxer.cache) > 0 {
psdemuxer.cache = append(psdemuxer.cache, data...)
bs = codec.NewBitStream(psdemuxer.cache)
} else {
bs = codec.NewBitStream(data)
}
saveReseved := func() {
tmpcache := make([]byte, bs.RemainBytes())
copy(tmpcache, bs.RemainData())
psdemuxer.cache = tmpcache
}
var ret error = nil
for !bs.EOS() {
if mpegerr, ok := ret.(mpeg2.Error); ok {
if mpegerr.NeedMore() {
saveReseved()
}
break
}
if bs.RemainBits() < 32 {
ret = io.ErrShortBuffer
saveReseved()
break
}
prefix_code := bs.NextBits(32)
switch prefix_code {
case 0x000001BA: //pack header
if psdemuxer.pkg.Header == nil {
psdemuxer.pkg.Header = new(mpeg2.PSPackHeader)
}
ret = psdemuxer.pkg.Header.Decode(bs)
psdemuxer.mpeg1 = psdemuxer.pkg.Header.IsMpeg1
if psdemuxer.OnPacket != nil {
psdemuxer.OnPacket(psdemuxer.pkg.Header, ret)
}
case 0x000001BB: //system header
if psdemuxer.pkg.Header == nil {
panic("psdemuxer.pkg.Header must not be nil")
}
if psdemuxer.pkg.System == nil {
psdemuxer.pkg.System = new(mpeg2.System_header)
}
ret = psdemuxer.pkg.System.Decode(bs)
if psdemuxer.OnPacket != nil {
psdemuxer.OnPacket(psdemuxer.pkg.System, ret)
}
case 0x000001BC: //program stream map
if psdemuxer.pkg.Psm == nil {
psdemuxer.pkg.Psm = new(mpeg2.Program_stream_map)
}
if ret = psdemuxer.pkg.Psm.Decode(bs); ret == nil {
for _, streaminfo := range psdemuxer.pkg.Psm.Stream_map {
if _, found := psdemuxer.streamMap[streaminfo.Elementary_stream_id]; !found {
stream := newpsstream(streaminfo.Elementary_stream_id, mpeg2.PS_STREAM_TYPE(streaminfo.Stream_type))
psdemuxer.streamMap[stream.sid] = stream
}
}
}
if psdemuxer.OnPacket != nil {
psdemuxer.OnPacket(psdemuxer.pkg.Psm, ret)
}
case 0x000001BD, 0x000001BE, 0x000001BF, 0x000001F0, 0x000001F1,
0x000001F2, 0x000001F3, 0x000001F4, 0x000001F5, 0x000001F6,
0x000001F7, 0x000001F8, 0x000001F9, 0x000001FA, 0x000001FB:
if psdemuxer.pkg.CommPes == nil {
psdemuxer.pkg.CommPes = new(mpeg2.CommonPesPacket)
}
ret = psdemuxer.pkg.CommPes.Decode(bs)
case 0x000001FF: //program stream directory
if psdemuxer.pkg.Psd == nil {
psdemuxer.pkg.Psd = new(mpeg2.Program_stream_directory)
}
ret = psdemuxer.pkg.Psd.Decode(bs)
case 0x000001B9: //MPEG_program_end_code
continue
default:
if prefix_code&0xFFFFFFE0 == 0x000001C0 || prefix_code&0xFFFFFFE0 == 0x000001E0 {
if psdemuxer.pkg.Pes == nil {
psdemuxer.pkg.Pes = mpeg2.NewPesPacket()
}
if psdemuxer.mpeg1 {
ret = psdemuxer.pkg.Pes.DecodeMpeg1(bs)
} else {
ret = psdemuxer.pkg.Pes.Decode(bs)
}
if psdemuxer.OnPacket != nil {
psdemuxer.OnPacket(psdemuxer.pkg.Pes, ret)
}
if ret == nil {
if stream, found := psdemuxer.streamMap[psdemuxer.pkg.Pes.Stream_id]; found {
if psdemuxer.mpeg1 && stream.cid == mpeg2.PS_STREAM_UNKNOW {
psdemuxer.guessCodecid(stream)
}
psdemuxer.demuxPespacket(stream, psdemuxer.pkg.Pes)
} else {
if psdemuxer.mpeg1 {
stream := newpsstream(psdemuxer.pkg.Pes.Stream_id, mpeg2.PS_STREAM_UNKNOW)
psdemuxer.streamMap[stream.sid] = stream
stream.streamBuf = append(stream.streamBuf, psdemuxer.pkg.Pes.Pes_payload...)
stream.pts = psdemuxer.pkg.Pes.Pts
stream.dts = psdemuxer.pkg.Pes.Dts
}
}
}
} else {
bs.SkipBits(8)
}
}
}
if ret == nil && len(psdemuxer.cache) > 0 {
psdemuxer.cache = nil
}
return ret
}
func (psdemuxer *PSDemuxer) Reset() {
psdemuxer.cache = psdemuxer.cache[:0]
for _, stream := range psdemuxer.streamMap {
if len(stream.streamBuf) == 0 {
continue
}
}
}
func (psdemuxer *PSDemuxer) guessCodecid(stream *psstream) {
if stream.sid&0xE0 == uint8(mpeg2.PES_STREAM_AUDIO) {
stream.cid = mpeg2.PS_STREAM_AAC
} else if stream.sid&0xE0 == uint8(mpeg2.PES_STREAM_VIDEO) {
h264score := 0
h265score := 0
codec.SplitFrame(stream.streamBuf, func(nalu []byte) bool {
h264nalutype := codec.H264NaluTypeWithoutStartCode(nalu)
h265nalutype := codec.H265NaluTypeWithoutStartCode(nalu)
if h264nalutype == codec.H264_NAL_PPS ||
h264nalutype == codec.H264_NAL_SPS ||
h264nalutype == codec.H264_NAL_I_SLICE {
h264score += 2
} else if h264nalutype < 5 {
h264score += 1
} else if h264nalutype > 20 {
h264score -= 1
}
if h265nalutype == codec.H265_NAL_PPS ||
h265nalutype == codec.H265_NAL_SPS ||
h265nalutype == codec.H265_NAL_VPS ||
(h265nalutype >= codec.H265_NAL_SLICE_BLA_W_LP && h265nalutype <= codec.H265_NAL_SLICE_CRA) {
h265score += 2
} else if h265nalutype >= codec.H265_NAL_Slice_TRAIL_N && h265nalutype <= codec.H265_NAL_SLICE_RASL_R {
h265score += 1
} else if h265nalutype > 40 {
h265score -= 1
}
if h264score > h265score && h264score >= 4 {
stream.cid = mpeg2.PS_STREAM_H264
} else if h264score < h265score && h265score >= 4 {
stream.cid = mpeg2.PS_STREAM_H265
}
return true
})
}
}
func (psdemuxer *PSDemuxer) demuxPespacket(stream *psstream, pes *mpeg2.PesPacket) error {
switch stream.cid {
case mpeg2.PS_STREAM_AAC, mpeg2.PS_STREAM_G711A, mpeg2.PS_STREAM_G711U:
return psdemuxer.demuxAudio(stream, pes)
case mpeg2.PS_STREAM_H264, mpeg2.PS_STREAM_H265:
return psdemuxer.demuxAudio(stream, pes)
case mpeg2.PS_STREAM_UNKNOW:
if stream.pts != pes.Pts {
stream.streamBuf = nil
}
stream.streamBuf = append(stream.streamBuf, pes.Pes_payload...)
stream.pts = pes.Pts
stream.dts = pes.Dts
}
return nil
}
func (psdemuxer *PSDemuxer) demuxAudio(stream *psstream, pes *mpeg2.PesPacket) error {
if stream.pts != pes.Pts && len(stream.streamBuf) > 0 {
if psdemuxer.OnFrame != nil {
psdemuxer.OnFrame(stream.streamBuf, stream.cid, stream.pts, stream.dts)
}
stream.streamBuf = nil
// stream.streamBuf = stream.streamBuf[:0]
}
stream.streamBuf = append(stream.streamBuf, pes.Pes_payload...)
stream.pts = pes.Pts
stream.dts = pes.Dts
return nil
}
// func (psdemuxer *PSDemuxer) demuxH26x(stream *psstream, pes *mpeg2.PesPacket) error {
// if len(stream.streamBuf) == 0 {
// stream.pts = pes.Pts
// stream.dts = pes.Dts
// }
// stream.streamBuf = append(stream.streamBuf, pes.Pes_payload...)
// start, sc := codec.FindStartCode(stream.streamBuf, 0)
// for start >= 0 {
// end, sc2 := codec.FindStartCode(stream.streamBuf, start+int(sc))
// if end < 0 {
// break
// }
// if stream.cid == mpeg2.PS_STREAM_H264 {
// naluType := codec.H264NaluType(stream.streamBuf[start:])
// if naluType != codec.H264_NAL_AUD {
// if psdemuxer.OnFrame != nil {
// psdemuxer.OnFrame(stream.streamBuf[start:end], stream.cid, stream.pts/90, stream.dts/90)
// }
// }
// } else if stream.cid == mpeg2.PS_STREAM_H265 {
// naluType := codec.H265NaluType(stream.streamBuf[start:])
// if naluType != codec.H265_NAL_AUD {
// if psdemuxer.OnFrame != nil {
// psdemuxer.OnFrame(stream.streamBuf[start:end], stream.cid, stream.pts/90, stream.dts/90)
// }
// }
// }
// start = end
// sc = sc2
// }
// stream.streamBuf = stream.streamBuf[start:]
// stream.pts = pes.Pts
// stream.dts = pes.Dts
// return nil
// }

6
port.go Normal file
View File

@@ -0,0 +1,6 @@
package ps
type MediaPort struct {
Protocol string
Port uint16
}

308
publisher.go Normal file
View File

@@ -0,0 +1,308 @@
package ps
import (
"encoding/binary"
"io"
"net"
"os"
"time"
"github.com/pion/rtp"
"github.com/yapingcat/gomedia/go-mpeg2"
"go.uber.org/zap"
. "m7s.live/engine/v4"
"m7s.live/engine/v4/codec"
"m7s.live/engine/v4/codec/mpegts"
"m7s.live/engine/v4/track"
. "m7s.live/engine/v4/track"
"m7s.live/engine/v4/util"
"m7s.live/plugin/ps/v4/mpegps"
)
type cacheItem struct {
Seq uint16
*util.ListItem[util.Buffer]
}
type PSPublisher struct {
Publisher
relayTrack *track.Data
rtp.Packet
DisableReorder bool //是否禁用rtp重排序,TCP模式下应当禁用
// mpegps.MpegPsStream `json:"-" yaml:"-"`
// *mpegps.PSDemuxer `json:"-" yaml:"-"`
mpegps.DecPSPackage `json:"-" yaml:"-"`
reorder util.RTPReorder[*cacheItem]
pool util.BytesPool
lastSeq uint16
lastReceive time.Time
dump *os.File
dumpLen []byte
}
func (p *PSPublisher) OnEvent(event any) {
switch event.(type) {
case IPublisher:
p.dumpLen = make([]byte, 6)
if conf.RelayMode != 0 {
p.relayTrack = p.Stream.NewDataTrack("ps", nil)
p.relayTrack.Attach()
}
case SEclose, SEKick:
conf.streams.Delete(p.Stream.Path)
}
p.Publisher.OnEvent(event)
}
func (p *PSPublisher) ServeTCP(conn *net.TCPConn) {
var err error
ps := make(util.Buffer, 1024)
p.SetIO(conn)
defer p.Stop()
tcpAddr := zap.String("tcp", conn.LocalAddr().String())
p.Info("start receive ps stream from", tcpAddr)
defer p.Info("stop receive ps stream from", tcpAddr)
for err == nil {
if _, err = io.ReadFull(conn, p.dumpLen[:2]); err != nil {
return
}
ps.Relloc(int(binary.BigEndian.Uint16(p.dumpLen[:2])))
if _, err = io.ReadFull(conn, ps); err != nil {
return
}
p.PushPS(ps)
}
}
func (p *PSPublisher) ServeUDP(conn *net.UDPConn) {
p.SetIO(conn)
defer p.Stop()
bufUDP := make([]byte, 1024*1024)
udpAddr := zap.String("udp", conn.LocalAddr().String())
p.Info("start receive ps stream from", udpAddr)
defer p.Info("stop receive ps stream from", udpAddr)
for {
conn.SetReadDeadline(time.Now().Add(time.Second * 10))
n, _, err := conn.ReadFromUDP(bufUDP)
if err != nil {
return
}
p.PushPS(bufUDP[:n])
}
}
func (p *PSPublisher) PushPS(ps util.Buffer) {
if conf.RelayMode != 0 {
item := p.pool.Get(len(ps))
copy(item.Value, ps)
p.relayTrack.Push(item)
}
if conf.RelayMode != 1 {
if err := p.Unmarshal(ps); err != nil {
p.Error("gb28181 decode rtp error:", zap.Error(err))
} else if !p.IsClosed() {
p.writeDump(ps)
}
p.pushPS()
}
}
// 解析rtp封装 https://www.ietf.org/rfc/rfc2250.txt
func (p *PSPublisher) pushPS() {
if p.Stream == nil {
return
}
if p.pool == nil {
// p.PSDemuxer = mpegps.NewPSDemuxer()
// p.PSDemuxer.OnPacket = p.OnPacket
// p.PSDemuxer.OnFrame = p.OnFrame
p.EsHandler = p
p.lastSeq = p.SequenceNumber - 1
p.pool = make(util.BytesPool, 17)
}
if p.DisableReorder {
p.Feed(p.Packet.Payload)
p.lastSeq = p.SequenceNumber
} else {
item := p.pool.Get(len(p.Packet.Payload))
copy(item.Value, p.Packet.Payload)
for rtpPacket := p.reorder.Push(p.SequenceNumber, &cacheItem{p.SequenceNumber, item}); rtpPacket != nil; rtpPacket = p.reorder.Pop() {
if rtpPacket.Seq != p.lastSeq+1 {
p.Debug("drop", zap.Uint16("seq", rtpPacket.Seq), zap.Uint16("lastSeq", p.lastSeq))
p.Reset()
if p.VideoTrack != nil {
p.SetLostFlag()
}
}
p.Feed(rtpPacket.Value)
p.lastSeq = rtpPacket.Seq
rtpPacket.Recycle()
}
}
}
func (p *PSPublisher) OnFrame(frame []byte, cid mpeg2.PS_STREAM_TYPE, pts uint64, dts uint64) {
switch cid {
case mpeg2.PS_STREAM_AAC:
if p.AudioTrack != nil {
p.AudioTrack.WriteADTS(uint32(pts), frame)
} else {
p.AudioTrack = NewAAC(p.Publisher.Stream, p.pool)
}
case mpeg2.PS_STREAM_G711A:
if p.AudioTrack != nil {
p.AudioTrack.WriteRaw(uint32(pts), frame)
} else {
p.AudioTrack = NewG711(p.Publisher.Stream, true, p.pool)
}
case mpeg2.PS_STREAM_G711U:
if p.AudioTrack != nil {
p.AudioTrack.WriteRaw(uint32(pts), frame)
} else {
p.AudioTrack = NewG711(p.Publisher.Stream, false, p.pool)
}
case mpeg2.PS_STREAM_H264:
if p.VideoTrack != nil {
// p.WriteNalu(uint32(pts), uint32(dts), frame)
p.WriteAnnexB(uint32(pts), uint32(dts), frame)
} else {
p.VideoTrack = NewH264(p.Publisher.Stream, p.pool)
}
case mpeg2.PS_STREAM_H265:
if p.VideoTrack != nil {
// p.WriteNalu(uint32(pts), uint32(dts), frame)
p.WriteAnnexB(uint32(pts), uint32(dts), frame)
} else {
p.VideoTrack = NewH265(p.Publisher.Stream, p.pool)
}
}
}
func (p *PSPublisher) OnPacket(pkg mpeg2.Display, decodeResult error) {
// switch value := pkg.(type) {
// case *mpeg2.PSPackHeader:
// // fd3.WriteString("--------------PS Pack Header--------------\n")
// if decodeResult == nil {
// // value.PrettyPrint(fd3)
// } else {
// // fd3.WriteString(fmt.Sprintf("Decode Ps Packet Failed %s\n", decodeResult.Error()))
// }
// case *mpeg2.System_header:
// // fd3.WriteString("--------------System Header--------------\n")
// if decodeResult == nil {
// // value.PrettyPrint(fd3)
// } else {
// // fd3.WriteString(fmt.Sprintf("Decode Ps Packet Failed %s\n", decodeResult.Error()))
// }
// case *mpeg2.Program_stream_map:
// // fd3.WriteString("--------------------PSM-------------------\n")
// if decodeResult == nil {
// // value.PrettyPrint(fd3)
// } else {
// // fd3.WriteString(fmt.Sprintf("Decode Ps Packet Failed %s\n", decodeResult.Error()))
// }
// case *mpeg2.PesPacket:
// // fd3.WriteString("-------------------PES--------------------\n")
// if decodeResult == nil {
// // value.PrettyPrint(fd3)
// } else {
// // fd3.WriteString(fmt.Sprintf("Decode Ps Packet Failed %s\n", decodeResult.Error()))
// }
// }
}
func (p *PSPublisher) ReceiveVideo(es mpegps.MpegPsEsStream) {
if !conf.PubVideo {
return
}
if p.VideoTrack == nil {
switch es.Type {
case mpegts.STREAM_TYPE_H264:
p.VideoTrack = NewH264(p.Publisher.Stream, p.pool)
case mpegts.STREAM_TYPE_H265:
p.VideoTrack = NewH265(p.Publisher.Stream, p.pool)
default:
//推测编码类型
var maybe264 codec.H264NALUType
maybe264 = maybe264.Parse(es.Buffer[4])
switch maybe264 {
case codec.NALU_Non_IDR_Picture,
codec.NALU_IDR_Picture,
codec.NALU_SEI,
codec.NALU_SPS,
codec.NALU_PPS,
codec.NALU_Access_Unit_Delimiter:
p.VideoTrack = NewH264(p.Publisher.Stream, p.pool)
default:
p.Info("maybe h265", zap.Uint8("type", maybe264.Byte()))
p.VideoTrack = NewH265(p.Publisher.Stream, p.pool)
}
}
}
payload, pts, dts := es.Buffer, es.PTS, es.DTS
if dts == 0 {
dts = pts
}
// if binary.BigEndian.Uint32(payload) != 1 {
// panic("not annexb")
// }
p.WriteAnnexB(pts, dts, payload)
}
func (p *PSPublisher) ReceiveAudio(es mpegps.MpegPsEsStream) {
if !conf.PubAudio {
return
}
ts, payload := es.PTS, es.Buffer
if p.AudioTrack == nil {
switch es.Type {
case mpegts.STREAM_TYPE_G711A:
p.AudioTrack = NewG711(p.Publisher.Stream, true, p.pool)
case mpegts.STREAM_TYPE_G711U:
p.AudioTrack = NewG711(p.Publisher.Stream, false, p.pool)
case mpegts.STREAM_TYPE_AAC:
p.AudioTrack = NewAAC(p.Publisher.Stream, p.pool)
p.WriteADTS(ts, payload)
case 0: //推测编码类型
if payload[0] == 0xff && payload[1]>>4 == 0xf {
p.AudioTrack = NewAAC(p.Publisher.Stream)
p.WriteADTS(ts, payload)
}
default:
p.Error("audio type not supported yet", zap.Uint8("type", es.Type))
}
} else if es.Type == mpegts.STREAM_TYPE_AAC {
p.WriteADTS(ts, payload)
} else {
p.WriteRaw(ts, payload)
}
}
func (p *PSPublisher) writeDump(ps util.Buffer) {
if p.dump != nil {
util.PutBE(p.dumpLen[:4], ps.Len())
if p.lastReceive.IsZero() {
util.PutBE(p.dumpLen[4:], 0)
} else {
util.PutBE(p.dumpLen[4:], uint16(time.Since(p.lastReceive).Milliseconds()))
}
p.lastReceive = time.Now()
p.dump.Write(p.dumpLen)
p.dump.Write(ps)
}
}
func (p *PSPublisher) Replay(f *os.File) (err error) {
defer f.Close()
var t uint16
for l := make([]byte, 6); !p.IsClosed(); time.Sleep(time.Millisecond * time.Duration(t)) {
_, err = f.Read(l)
if err != nil {
return
}
payload := make([]byte, util.ReadBE[int](l[:4]))
t = util.ReadBE[uint16](l[4:])
_, err = f.Read(payload)
if err != nil {
return
}
p.PushPS(payload)
}
return
}