Compare commits

...

16 Commits
dev ... v3

Author SHA1 Message Date
dexter
a331359e97 增加流超时再次重连功能 2021-10-14 10:00:55 +08:00
dexter
bbd668796e 优化重连逻辑 2021-10-11 15:40:50 +08:00
dexter
107b4e8941 AACPayloader 2021-10-06 20:43:18 +08:00
dexter
ac8aa96350 format 2021-10-06 09:22:59 +08:00
dexter
f267b1ca52 rtp依赖1.6.5不能用1.7版本 2021-08-08 08:07:22 +08:00
dexter
229370c083 更改类型适配pion的rtp类型升级 2021-08-08 07:40:52 +08:00
dexter
bb1e8ba1d8 适配3.3 2021-08-07 22:00:28 +08:00
dexter
8cf3e0c0fc 增加对publisher的非空判断 2021-08-04 15:27:55 +08:00
dexter
1ecb45d904 修改readme 2021-08-03 15:33:55 +08:00
dexter
3ea5bb7f27 更新readme 2021-08-02 09:21:38 +08:00
langhuihui
9aec4ec4be 防止json循环引用 2021-07-24 11:38:24 +08:00
langhuihui
da2fc9d462 更新重连逻辑 2021-07-24 09:38:22 +08:00
李宇翔
f68a3ee14b 实现rtsp拉流播放 2021-07-19 20:07:01 +08:00
langhuihui
a2f5cb87b1 修复音频初始化问题 2021-07-12 23:24:04 +08:00
langhuihui
5cdbc220de 修改rtsp自动拉流配置结构 2021-07-11 21:43:15 +08:00
langhuihui
f0a00f3db9 更新readme 2021-07-10 17:52:50 +08:00
8 changed files with 351 additions and 157 deletions

View File

@@ -1,29 +1,63 @@
# Monibuca 的RTSP 插件
# RTSP插件
主要功能是提供RTSP的端口监听接受RTSP推流以及对RTSP地址进行拉流转发
## 插件地址
## 插件名称
github.com/Monibuca/plugin-rtsp
RTSP
## 插件引入
```go
import (
_ "github.com/Monibuca/plugin-rtsp"
)
```
## 默认插件配置
## 配置
```toml
[RTSP]
ListenAddr = ":554"
BufferLength = 2048
AutoPull = false
RemoteAddr = "rtsp://localhost/${streamPath}"
[[RTSP.AutoPullList]]
URL = "rtsp://admin:admin@192.168.1.212:554/cam/realmonitor?channel=1&subtype=1"
StreamPath = "live/rtsp"
# 端口接收推流
ListenAddr = ":554"
Reconnect = true
[RTSP.AutoPullList]
"live/rtsp1" = "rtsp://admin:admin@192.168.1.212:554/cam/realmonitor?channel=1&subtype=1"
"live/rtsp2" = "rtsp://admin:admin@192.168.1.212:554/cam/realmonitor?channel=2&subtype=1"
```
- ListenAddr 是监听端口可以将rtsp流推到Monibuca中
- BufferLength是指解析拉取的rtp包的缓冲大小
- AutoPull是指当有用户订阅一个新流的时候自动向远程拉流转发
- RemoteAddr 指远程拉流地址,其中${streamPath}是占位符,实际使用流路径替换。
- AutoPullList 是一个数组如果配置了该数组则会在程序启动时自动启动拉流StreamPath一定要是唯一的不能重复
## 使用方法(拉流转发)
- `ListenAddr`是监听的地址
- `Reconnect` 是否自动重连
- `RTSP.AutoPullList` 可以配置多项用于自动拉流key是streamPathvalue是远程rtsp地址
### 特殊功能
当自动拉流列表中当的streamPath为sub/xxx 这种形式的话在gb28181的分屏显示时会优先采用rtsp流已实现分屏观看子码流效果
## 插件功能
### 接收RTSP协议的推流
例如通过ffmpeg向m7s进行推流
```bash
ffmpeg -i **** rtsp://localhost/live/test
```
会在m7s内部形成一个名为live/test的流
### 从远程拉取rtsp到m7s中
可调用接口
`/api/rtsp/pull?target=[RTSP地址]&streamPath=[流标识]`
## 使用编程方式拉流
```go
new(RTSP).PullStream("live/user1","rtsp://xxx.xxx.xxx.xxx/live/user1")
```
```
### 罗列所有的rtsp协议的流
可调用接口
`/api/rtsp/list`
### 从m7s中拉取rtsp协议流
直接通过协议rtsp://xxx.xxx.xxx.xxx/live/user1 即可播放
> h265和aac 编码拉流尚未实现,敬请期待

View File

@@ -25,6 +25,7 @@ func (rtsp *RTSP) PullStream(streamPath string, rtspUrl string) (err error) {
rtsp.Stream = &Stream{
StreamPath: streamPath,
Type: "RTSP Pull",
ExtraProp: rtsp,
}
if result := rtsp.Publish(); result {
rtsp.TransType = TRANS_TYPE_TCP
@@ -34,13 +35,26 @@ func (rtsp *RTSP) PullStream(streamPath string, rtspUrl string) (err error) {
rtsp.aRTPControlChannel = 3
rtsp.URL = rtspUrl
rtsp.UDPServer = &UDPServer{Session: rtsp}
if err = rtsp.requestStream(); err != nil {
Println(err)
rtsp.Close()
return
if config.Reconnect {
go func() {
for rtsp.Err() == nil {
rtsp.RTSPClientInfo = RTSPClientInfo{}
Printf("reconnecting:%s in 5 seconds", rtspUrl)
time.Sleep(time.Second * 5)
rtsp.startStream()
}
rtsp.Stop()
if rtsp.IsTimeout {
go rtsp.PullStream(streamPath, rtspUrl)
}
}()
} else {
rtsp.RTSPClientInfo = RTSPClientInfo{}
go func() {
rtsp.startStream()
rtsp.Stop()
}()
}
go rtsp.startStream()
collection.Store(streamPath, rtsp)
return
}
return errors.New("publish badname")
@@ -157,7 +171,7 @@ func (client *RTSP) requestStream() (err error) {
client.Conn = &timeoutConn
client.connRW = bufio.NewReadWriter(bufio.NewReaderSize(&timeoutConn, networkBuffer), bufio.NewWriterSize(&timeoutConn, networkBuffer))
headers := make(map[string]string)
headers := map[string]string{}
//headers["Require"] = "implicit-play"
// An OPTIONS request returns the request types the server will accept.
resp, err := client.Request("OPTIONS", headers)
@@ -266,34 +280,15 @@ func (client *RTSP) requestStream() (err error) {
}
func (client *RTSP) startStream() {
if client.Err() != nil {
return
}
startTime := time.Now()
//loggerTime := time.Now().Add(-10 * time.Second)
defer func() {
if client.Err() == nil && config.Reconnect {
Printf("reconnecting:%s", client.URL)
client.RTSPClientInfo = RTSPClientInfo{}
if err := client.requestStream(); err != nil {
t := time.NewTicker(time.Second * 5)
for {
Printf("reconnecting:%s in 5 seconds", client.URL)
select {
case <-client.Done():
client.Stop()
return
case <-t.C:
if err = client.requestStream(); err == nil {
go client.startStream()
return
}
}
}
} else {
go client.startStream()
}
} else {
client.Stop()
}
}()
if err := client.requestStream(); err != nil {
Printf("rtsp requestStream err:%v", err)
return
}
for client.Err() == nil {
if time.Since(startTime) > time.Minute {
startTime = time.Now()
@@ -310,30 +305,33 @@ func (client *RTSP) startStream() {
return
}
switch b {
case 0x24: // rtp
header := make([]byte, 4)
header[0] = b
_, err := io.ReadFull(client.connRW, header[1:])
case '$': // rtp
header := make([]byte, 3)
_, err := io.ReadFull(client.connRW, header)
if err != nil {
Printf("io.ReadFull err:%v", err)
return
}
channel := int(header[1])
length := binary.BigEndian.Uint16(header[2:])
channel := int(header[0])
length := binary.BigEndian.Uint16(header[1:])
content := make([]byte, length)
_, err = io.ReadFull(client.connRW, content)
if err != nil {
Printf("io.ReadFull err:%v", err)
return
}
switch channel {
case client.aRTPChannel:
client.RtpAudio.Push(content)
if client.RtpAudio != nil {
client.RtpAudio.Push(content)
}
case client.aRTPControlChannel:
case client.vRTPChannel:
client.RtpVideo.Push(content)
if client.RtpVideo != nil {
client.RtpVideo.Push(content)
}
case client.vRTPControlChannel:
default:

8
go.mod
View File

@@ -3,9 +3,9 @@ module github.com/Monibuca/plugin-rtsp/v3
go 1.16
require (
github.com/Monibuca/engine/v3 v3.0.0-beta5
github.com/Monibuca/utils/v3 v3.0.0-beta
github.com/pion/rtp v1.6.5
github.com/Monibuca/engine/v3 v3.3.7
github.com/Monibuca/utils/v3 v3.0.2
github.com/pion/rtp v1.7.2
github.com/teris-io/shortid v0.0.0-20201117134242-e59966efd125
golang.org/x/sys v0.0.0-20210611083646-a4fc73990273 // indirect
golang.org/x/sys v0.0.0-20211013075003-97ac67df715c // indirect
)

67
go.sum
View File

@@ -1,32 +1,87 @@
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/Monibuca/engine/v3 v3.0.0-beta5 h1:b27ZQDfvf5dBMZbCSIUXItUwVIFs95fpkAV4xjN7BNE=
github.com/Monibuca/engine/v3 v3.0.0-beta5/go.mod h1:SMgnlwih4pBA/HkTLjKXZFYkv3ukRzFjv65CARRLVIk=
github.com/Monibuca/utils/v3 v3.0.0-beta h1:z4p/BSH5J9Ja/gwoDmj1RyN+b0q28Nmn/fqXiwq2hGY=
github.com/Monibuca/utils/v3 v3.0.0-beta/go.mod h1:mQYP/OMox1tkWP6Qut7pBfARr1TXSRkK662dexQl6kI=
github.com/Monibuca/engine/v3 v3.3.0 h1:7zwYsLEHdeVZy6+JjVlaDhl/asr0HG6jirBL4uynj0s=
github.com/Monibuca/engine/v3 v3.3.0/go.mod h1:odyqD/VTQDN4qgzajsgn7kW7MWDIzTHt+j+BcI8i+4g=
github.com/Monibuca/engine/v3 v3.3.7 h1:EB77gSzvu4ThRWcWBWPmqWcCmMqC4B21/sUQmf/i2XU=
github.com/Monibuca/engine/v3 v3.3.7/go.mod h1:odyqD/VTQDN4qgzajsgn7kW7MWDIzTHt+j+BcI8i+4g=
github.com/Monibuca/utils/v3 v3.0.1/go.mod h1:RpNS95gapWs6gimwh8Xn2x72FN5tO7Powabj7dTFyvE=
github.com/Monibuca/utils/v3 v3.0.2 h1:n2vr67DHanav8wBC9IENk8xrKzeGJnBsxYUu69s8TrQ=
github.com/Monibuca/utils/v3 v3.0.2/go.mod h1:RpNS95gapWs6gimwh8Xn2x72FN5tO7Powabj7dTFyvE=
github.com/cnotch/apirouter v0.0.0-20200731232942-89e243a791f3/go.mod h1:5deJPLON/x/s2dLOQfuKS0lenhOIT4xX0pvtN/OEIuY=
github.com/cnotch/ipchub v1.1.0 h1:hH0lh2mU3AZXPiqMwA0pdtqrwo7PFIMRGush9OobMUs=
github.com/cnotch/ipchub v1.1.0/go.mod h1:2PbeBs2q2VxxTVCn1eYCDwpAWuVXbq1+N0FU7GimOH4=
github.com/cnotch/loader v0.0.0-20200405015128-d9d964d09439/go.mod h1:oWpDagHB6p+Kqqq7RoRZKyC4XAXft50hR8pbTxdbYYs=
github.com/cnotch/queue v0.0.0-20200326024423-6e88bdbf2ad4/go.mod h1:zOssjAlNusOxvtaqT+EMA+Iyi8rrtKr4/XfzN1Fgoeg=
github.com/cnotch/queue v0.0.0-20201224060551-4191569ce8f6/go.mod h1:zOssjAlNusOxvtaqT+EMA+Iyi8rrtKr4/XfzN1Fgoeg=
github.com/cnotch/scheduler v0.0.0-20200522024700-1d2da93eefc5/go.mod h1:F4GE3SZkJZ8an1Y0ZCqvSM3jeozNuKzoC67erG1PhIo=
github.com/cnotch/xlog v0.0.0-20201208005456-cfda439cd3a0/go.mod h1:RW9oHsR79ffl3sR3yMGgxYupMn2btzdtJUwoxFPUE5E=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/emitter-io/address v1.0.0/go.mod h1:GfZb5+S/o8694B1GMGK2imUYQyn2skszMvGNA5D84Ug=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/funny/slab v0.0.0-20180511031532-b1fad5e5d478 h1:Db9StoJ6RZN3YttC0Pm0I4Y5izITRYch3RMbT59BYN0=
github.com/funny/slab v0.0.0-20180511031532-b1fad5e5d478/go.mod h1:0j1+svBH8ABEIPdUP0AIg4qedsybnXGJBakCEw8cfoo=
github.com/funny/utest v0.0.0-20161029064919-43870a374500 h1:Z0r1CZnoIWFB/Uiwh1BU5FYmuFe6L5NPi6XWQEmsTRg=
github.com/funny/utest v0.0.0-20161029064919-43870a374500/go.mod h1:mUn39tBov9jKnTWV1RlOYoNzxdBFHiSzXWdY1FoNGGg=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/kelindar/process v0.0.0-20170730150328-69a29e249ec3/go.mod h1:+lTCLnZFXOkqwD8sLPl6u4erAc0cP8wFegQHfipz7KE=
github.com/kelindar/rate v1.0.0/go.mod h1:AjT4G+hTItNwt30lucEGZIz8y7Uk5zPho6vurIZ+1Es=
github.com/kelindar/tcp v1.0.0/go.mod h1:JB5hj1cshLU60XrLij2BBxW3JQ4hOye8vqbyvuKb52k=
github.com/logrusorgru/aurora v2.0.3+incompatible h1:tOpm7WcpBTn4fjmVfgpQq0EfczGlG91VSDkswnjF5A8=
github.com/logrusorgru/aurora v2.0.3+incompatible/go.mod h1:7rIyQOR62GCctdiQpZ/zOJlFyk6y+94wXzv6RNZgaR4=
github.com/mattn/go-colorable v0.1.8 h1:c1ghPdyEDarC70ftn0y+A/Ee++9zz8ljHG1b13eJ0s8=
github.com/mattn/go-colorable v0.1.8/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY=
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA=
github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8=
github.com/pion/rtp v1.6.2/go.mod h1:bDb5n+BFZxXx0Ea7E5qe+klMuqiBrP+w8XSjiWtCUko=
github.com/pion/rtp v1.6.5 h1:o2cZf8OascA5HF/b0PAbTxRKvOWxTQxWYt7SlToxFGI=
github.com/pion/rtp v1.6.5/go.mod h1:bDb5n+BFZxXx0Ea7E5qe+klMuqiBrP+w8XSjiWtCUko=
github.com/pion/rtp v1.7.2 h1:HCDKDCixh7PVjkQTsqHAbk1lg+bx059EHxcnyl42dYs=
github.com/pion/rtp v1.7.2/go.mod h1:bDb5n+BFZxXx0Ea7E5qe+klMuqiBrP+w8XSjiWtCUko=
github.com/pixelbender/go-sdp v1.1.0/go.mod h1:6IBlz9+BrUHoFTea7gcp4S54khtOhjCW/nVDLhmZBAs=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/q191201771/naza v0.19.1 h1:4KLcxT2CHztO+7miPRtBG3FFgadSQYQw1gPPPKN7rnY=
github.com/q191201771/naza v0.19.1/go.mod h1:5LeGupZZFtYP1g/S203n9vXoUNVdlRnPIfM6rExjqt0=
github.com/sqs/goreturns v0.0.0-20181028201513-538ac6014518/go.mod h1:CKI4AZ4XmGV240rTHfO0hfE83S6/a3/Q1siZJ/vXf7A=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/teris-io/shortid v0.0.0-20201117134242-e59966efd125 h1:3SNcvBmEPE1YlB1JpVZouslJpI3GBNoiqW7+wb0Rz7w=
github.com/teris-io/shortid v0.0.0-20201117134242-e59966efd125/go.mod h1:M8agBzgqHIhgj7wEn9/0hJUZcrvt9VY+Ln+S1I5Mha0=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a h1:DcqTD9SDLc+1P/r1EmRBwnVsrOwW+kk2vWf9n+1sGhs=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210611083646-a4fc73990273 h1:faDu4veV+8pcThn4fewv6TVlNCezafGoC1gM/mxQLbQ=
golang.org/x/sys v0.0.0-20210611083646-a4fc73990273/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069 h1:siQdpVirKtzPhKl3lZWozZraCFObP8S1v6PRp0bLrtU=
golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211013075003-97ac67df715c h1:taxlMj0D/1sOAuv/CbSD+MMDof2vbyPTqz5FNYKpXt8=
golang.org/x/sys v0.0.0-20211013075003-97ac67df715c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

46
main.go
View File

@@ -14,43 +14,31 @@ import (
"github.com/teris-io/shortid"
)
var collection sync.Map
var config = struct {
ListenAddr string
AutoPull bool
RemoteAddr string
Timeout int
Reconnect bool
AutoPullList []*struct {
URL string
StreamPath string
}
}{":554", false, "rtsp://localhost/${streamPath}", 0, false, nil}
AutoPullList map[string]string
}{":554", 0, false, nil}
func init() {
InstallPlugin(&PluginConfig{
Name: "RTSP",
Config: &config,
Run: runPlugin,
HotConfig: map[string]func(interface{}){
"AutoPull": func(value interface{}) {
config.AutoPull = value.(bool)
},
},
})
}
func runPlugin() {
http.HandleFunc("/api/rtsp/list", func(w http.ResponseWriter, r *http.Request) {
sse := NewSSE(w, r.Context())
var err error
for tick := time.NewTicker(time.Second); err == nil; <-tick.C {
var info []*RTSP
collection.Range(func(key, value interface{}) bool {
rtsp := value.(*RTSP)
info = append(info, rtsp)
return true
})
for _, s := range Streams.ToList() {
if rtsp, ok := s.ExtraProp.(*RTSP); ok {
info = append(info, rtsp)
}
}
err = sse.WriteJSON(info)
}
})
@@ -58,15 +46,15 @@ func runPlugin() {
CORS(w, r)
targetURL := r.URL.Query().Get("target")
streamPath := r.URL.Query().Get("streamPath")
if err := new(RTSP).PullStream(streamPath, targetURL); err == nil {
if err := (&RTSP{RTSPClientInfo: RTSPClientInfo{Agent: "Monibuca"}}).PullStream(streamPath, targetURL); err == nil {
w.Write([]byte(`{"code":0}`))
} else {
w.Write([]byte(fmt.Sprintf(`{"code":1,"msg":"%s"}`, err.Error())))
}
})
if len(config.AutoPullList) > 0 {
for _, info := range config.AutoPullList {
if err := new(RTSP).PullStream(info.StreamPath, info.URL); err != nil {
for streamPath, url := range config.AutoPullList {
if err := (&RTSP{RTSPClientInfo: RTSPClientInfo{Agent: "Monibuca"}}).PullStream(streamPath, url); err != nil {
Println(err)
}
}
@@ -74,12 +62,6 @@ func runPlugin() {
if config.ListenAddr != "" {
go log.Fatal(ListenRtsp(config.ListenAddr))
}
// AddHook(HOOK_SUBSCRIBE, func(value interface{}) {
// s := value.(*Subscriber)
// if config.AutoPull && s.Publisher == nil {
// new(RTSP).PullStream(s.StreamPath, strings.Replace(config.RemoteAddr, "${streamPath}", s.StreamPath, -1))
// }
// })
}
func ListenRtsp(addr string) error {
@@ -128,7 +110,7 @@ func ListenRtsp(addr string) error {
}
type RTSP struct {
*Stream
*Stream `json:"-"`
URL string
SDPRaw string
InBytes int
@@ -175,7 +157,7 @@ func (rtsp *RTSP) setVideoTrack() {
}
func (rtsp *RTSP) setAudioTrack() {
var at *RTPAudio
if len(rtsp.ASdp.Control) > 0 {
if len(rtsp.ASdp.Config) > 0 {
at = rtsp.NewRTPAudio(0)
at.SetASC(rtsp.ASdp.Config)
} else {
@@ -186,10 +168,14 @@ func (rtsp *RTSP) setAudioTrack() {
at = rtsp.NewRTPAudio(7)
at.SoundRate = rtsp.ASdp.TimeScale
at.SoundSize = 16
at.Channels = 1
at.ExtraData = []byte{(at.CodecID << 4) | (1 << 1)}
case "PCMU":
at = rtsp.NewRTPAudio(8)
at.SoundRate = rtsp.ASdp.TimeScale
at.SoundSize = 16
at.Channels = 1
at.ExtraData = []byte{(at.CodecID << 4) | (1 << 1)}
default:
Printf("rtsp audio codec not support:%s", rtsp.ASdp.Codec)
return

View File

@@ -58,6 +58,8 @@ func ParseSDP(sdpRaw string) map[string]*SDPInfo {
keyval = strings.Split(field, "/")
if len(keyval) >= 2 {
switch keyval[0] {
case "h264", "h265", "pcma", "pcmu":
info.Codec = strings.ToUpper(keyval[0])
case "H264", "H265", "PCMA", "PCMU":
info.Codec = keyval[0]
case "HEVC":

View File

@@ -3,6 +3,7 @@ package rtsp
import (
"bytes"
"crypto/md5"
"encoding/base64"
"encoding/binary"
"fmt"
"io"
@@ -11,16 +12,18 @@ import (
"strconv"
"strings"
"time"
"unsafe"
. "github.com/Monibuca/engine/v3"
. "github.com/Monibuca/utils/v3"
"github.com/pion/rtp"
"github.com/pion/rtp/codecs"
"github.com/teris-io/shortid"
)
type RTPPack struct {
Type RTPType
rtp.Packet
Raw []byte
}
type SessionType int
@@ -72,6 +75,9 @@ func (session *RTSP) SessionString() string {
}
func (session *RTSP) Stop() {
if session.Stream != nil {
session.Close()
}
if session.Conn != nil {
session.connRW.Flush()
session.Conn.Close()
@@ -85,10 +91,6 @@ func (session *RTSP) Stop() {
session.UDPServer.Stop()
session.UDPServer = nil
}
session.Close()
if session.Stream != nil {
collection.Delete(session.StreamPath)
}
}
// AcceptPush 接受推流
@@ -123,22 +125,26 @@ func (session *RTSP) AcceptPush() {
switch channel {
case session.aRTPChannel:
// pack.Type = RTP_TYPE_AUDIO
elapsed := time.Since(timer)
if elapsed >= 30*time.Second {
Println("Recv an audio RTP package")
timer = time.Now()
if session.RtpAudio != nil {
elapsed := time.Since(timer)
if elapsed >= 30*time.Second {
Println("Recv an audio RTP package")
timer = time.Now()
}
session.RtpAudio.Push(rtpBytes)
}
session.RtpAudio.Push(rtpBytes)
case session.aRTPControlChannel:
// pack.Type = RTP_TYPE_AUDIOCONTROL
case session.vRTPChannel:
// pack.Type = RTP_TYPE_VIDEO
elapsed := time.Since(timer)
if elapsed >= 30*time.Second {
Println("Recv an video RTP package")
timer = time.Now()
if session.RtpVideo != nil {
elapsed := time.Since(timer)
if elapsed >= 30*time.Second {
Println("Recv an video RTP package")
timer = time.Now()
}
session.RtpVideo.Push(rtpBytes)
}
session.RtpVideo.Push(rtpBytes)
case session.vRTPControlChannel:
// pack.Type = RTP_TYPE_VIDEOCONTROL
default:
@@ -254,6 +260,7 @@ func (session *RTSP) handleRequest(req *Request) {
//}
Printf("<<<\n%s", req)
res := NewResponse(200, "OK", req.Header["CSeq"], session.ID, "")
var streamPath string
defer func() {
if p := recover(); p != nil {
Printf("handleRequest err ocurs:%v", p)
@@ -271,6 +278,61 @@ func (session *RTSP) handleRequest(req *Request) {
case "PLAY", "RECORD":
switch session.Type {
case SESSEION_TYPE_PLAYER:
sub := Subscriber{
ID: session.ID,
Type: "RTSP",
}
if sub.Subscribe(streamPath) == nil {
at, vt := session.UDPClient.AT, session.UDPClient.VT
if vt != nil {
var st uint32
onVideo := func(ts uint32, pack *VideoPack) {
if session.UDPClient == nil {
return
}
for _, nalu := range pack.NALUs {
for _, pack := range session.UDPClient.VPacketizer.Packetize(nalu, (ts-st)*90) {
p := &RTPPack{
Type: RTP_TYPE_VIDEO,
}
p.Raw, _ = pack.Marshal()
session.SendRTP(p)
}
}
st = ts
}
sub.OnVideo = func(ts uint32, pack *VideoPack) {
if st = ts; st != 0 {
sub.OnVideo = onVideo
}
onVideo(ts, pack)
}
}
if at != nil {
tb := uint32(at.SoundRate / 1000)
var st uint32
onAudio := func(ts uint32, pack *AudioPack) {
if session.UDPClient == nil {
return
}
for _, pack := range session.UDPClient.APacketizer.Packetize(pack.Payload, (ts-st)*tb) {
p := &RTPPack{
Type: RTP_TYPE_VIDEO,
}
p.Raw, _ = pack.Marshal()
session.SendRTP(p)
}
st = ts
}
sub.OnAudio = func(ts uint32, pack *AudioPack) {
if st = ts; st != 0 {
sub.OnAudio = onAudio
}
onAudio(ts, pack)
}
}
go sub.Play(at, vt)
}
// if session.Pusher.HasPlayer(session.Player) {
// session.Player.Pause(false)
// } else {
@@ -288,6 +350,14 @@ func (session *RTSP) handleRequest(req *Request) {
session.Stop()
}
}()
session.URL = req.URL
_url, err := url.Parse(req.URL)
if err != nil {
res.StatusCode = 500
res.Status = "Invalid URL"
return
}
streamPath = strings.TrimPrefix(_url.Path, "/")
if req.Method != "OPTIONS" {
if session.Auth != nil {
authLine := req.Header["Authorization"]
@@ -305,7 +375,7 @@ func (session *RTSP) handleRequest(req *Request) {
res.Status = "Unauthorized"
nonce := fmt.Sprintf("%x", md5.Sum([]byte(shortid.MustGenerate())))
session.nonce = nonce
res.Header["WWW-Authenticate"] = fmt.Sprintf(`Digest realm="EasyDarwin", nonce="%s", algorithm="MD5"`, nonce)
res.Header["WWW-Authenticate"] = fmt.Sprintf(`Digest realm="Monibuca", nonce="%s", algorithm="MD5"`, nonce)
return
}
}
@@ -315,24 +385,9 @@ func (session *RTSP) handleRequest(req *Request) {
res.Header["Public"] = "DESCRIBE, SETUP, TEARDOWN, PLAY, PAUSE, OPTIONS, ANNOUNCE, RECORD"
case "ANNOUNCE":
session.Type = SESSION_TYPE_PUSHER
session.URL = req.URL
url, err := url.Parse(req.URL)
if err != nil {
res.StatusCode = 500
res.Status = "Invalid URL"
return
}
streamPath := strings.TrimPrefix(url.Path, "/")
session.SDPRaw = req.Body
session.SDPMap = ParseSDP(req.Body)
stream := &Stream{
StreamPath: streamPath,
Type: "RTSP",
}
session.Stream = stream
if session.Publish() {
if session.Stream = Publish(streamPath, "RTSP"); session.Stream != nil {
if session.ASdp, session.HasAudio = session.SDPMap["audio"]; session.HasAudio {
session.setAudioTrack()
Printf("audio codec[%s]\n", session.ASdp.Codec)
@@ -342,24 +397,69 @@ func (session *RTSP) handleRequest(req *Request) {
Printf("video codec[%s]\n", session.VSdp.Codec)
}
session.Stream.Type = "RTSP"
collection.Store(streamPath, session)
}
case "DESCRIBE":
session.Type = SESSEION_TYPE_PLAYER
session.URL = req.URL
url, err := url.Parse(req.URL)
if err != nil {
res.StatusCode = 500
res.Status = "Invalid URL"
return
}
streamPath := url.Path
stream := FindStream(streamPath)
if stream == nil {
res.StatusCode = 404
res.Status = "No Such Stream:" + streamPath
return
}
//
//res.SetBody(session.SDPRaw)
sdpInfo := []string{
"v=0",
fmt.Sprintf("o=%s 0 0 IN IP4 %d", session.ID, 0),
"s=monibuca",
"t=0 0",
"a=recvonly",
}
ssrc := uintptr(unsafe.Pointer(stream))
if session.UDPClient == nil {
session.UDPClient = &UDPClient{
Conn: session.Conn.Conn,
}
}
vt, at := stream.WaitVideoTrack(), stream.WaitAudioTrack()
if vt != nil {
session.UDPClient.VT = vt
sdpInfo = append(sdpInfo, "m=video 0 RTP/AVP 96")
switch vt.CodecID {
case 7:
sps := base64.StdEncoding.EncodeToString(vt.ExtraData.NALUs[0])
pps := base64.StdEncoding.EncodeToString(vt.ExtraData.NALUs[1])
session.UDPClient.VPacketizer = rtp.NewPacketizer(1200, 96, uint32(ssrc), &codecs.H264Payloader{}, rtp.NewFixedSequencer(1), 90000)
sdpInfo = append(sdpInfo, "a=rtpmap:96 H264/90000",
fmt.Sprintf("a=fmtp:96 profile-level-id=%02X00%02X; packetization-mode=1; sprop-parameter-sets=%s,%s", vt.SPSInfo.ProfileIdc, vt.SPSInfo.LevelIdc*10, sps, pps))
case 12:
vps := base64.StdEncoding.EncodeToString(vt.ExtraData.NALUs[0])
sps := base64.StdEncoding.EncodeToString(vt.ExtraData.NALUs[1])
pps := base64.StdEncoding.EncodeToString(vt.ExtraData.NALUs[2])
// TODO:
// session.UDPClient.VPacketizer = rtp.NewPacketizer(1200, 96, uint32(ssrc), &codecs.H265Payloader{}, rtp.NewFixedSequencer(1), 90000)
sdpInfo = append(sdpInfo, "a=rtpmap:96 H265/90000",
fmt.Sprintf("a=fmtp:96 packetization-mode=1;sprop-vps=%s;sprop-sps=%s;sprop-pps=%s", vps, sps, pps))
}
}
if at != nil {
sdpInfo = append(sdpInfo, "m=audio 0 RTP/AVP 97")
switch at.CodecID {
case 7:
sdpInfo = append(sdpInfo, "a=rtpmap:97 PCMA/8000")
session.UDPClient.APacketizer = rtp.NewPacketizer(1200, 97, uint32(ssrc), &codecs.G711Payloader{}, rtp.NewFixedSequencer(1), 8000)
session.UDPClient.AT = at
case 8:
sdpInfo = append(sdpInfo, "a=rtpmap:97 PCMU/8000")
session.UDPClient.APacketizer = rtp.NewPacketizer(1200, 97, uint32(ssrc), &codecs.G711Payloader{}, rtp.NewFixedSequencer(1), 8000)
session.UDPClient.AT = at
case 10:
// TODO:
sdpInfo = append(sdpInfo, fmt.Sprintf("a=rtpmap:97 MPEG4-GENERIC/%d/%d", at.SoundRate, at.Channels))
// session.UDPClient.APacketizer = rtp.NewPacketizer(1200, 97, uint32(ssrc), &AACPayloader{}, rtp.NewFixedSequencer(1), uint32(at.SoundRate))
//session.UDPClient.AT = at
}
}
session.SDPRaw = strings.Join(sdpInfo, "\r\n") + "\r\n"
res.SetBody(session.SDPRaw)
case "SETUP":
ts := req.Header["Transport"]
// control字段可能是`stream=1`字样也可能是rtsp://...字样。即control可能是url的path也可能是整个url
@@ -369,16 +469,10 @@ func (session *RTSP) handleRequest(req *Request) {
// a=control:rtsp://192.168.1.64/trackID=1
// 例3
// a=control:?ctype=video
setupUrl, err := url.Parse(req.URL)
if err != nil {
res.StatusCode = 500
res.Status = "Invalid URL"
return
if _url.Port() == "" {
_url.Host = fmt.Sprintf("%s:554", _url.Host)
}
if setupUrl.Port() == "" {
setupUrl.Host = fmt.Sprintf("%s:554", setupUrl.Host)
}
setupPath := setupUrl.String()
setupPath := _url.String()
// error status. SETUP without ANNOUNCE or DESCRIBE.
//if session.Pusher == nil {
@@ -508,6 +602,25 @@ func (session *RTSP) handleRequest(req *Request) {
ts = strings.Join(tss, ";")
}
} else {
if session.Type == SESSEION_TYPE_PLAYER {
if session.UDPClient.VPort == 0 {
session.UDPClient.VPort, _ = strconv.Atoi(udpMatchs[1])
session.UDPClient.VControlPort, _ = strconv.Atoi(udpMatchs[3])
if err := session.UDPClient.SetupVideo(); err != nil {
res.StatusCode = 500
res.Status = fmt.Sprintf("udp client setup video error, %v", err)
return
}
} else {
session.UDPClient.APort, _ = strconv.Atoi(udpMatchs[1])
session.UDPClient.AControlPort, _ = strconv.Atoi(udpMatchs[3])
if err := session.UDPClient.SetupAudio(); err != nil {
res.StatusCode = 500
res.Status = fmt.Sprintf("udp client setup audio error, %v", err)
return
}
}
}
Printf("SETUP [UDP] got UnKown control:%s", setupPath)
}
}

View File

@@ -5,10 +5,13 @@ import (
"net"
"strings"
. "github.com/Monibuca/engine/v3"
. "github.com/Monibuca/utils/v3"
"github.com/pion/rtp"
)
type UDPClient struct {
Conn net.Conn
APort int
AConn *net.UDPConn
AControlPort int
@@ -17,8 +20,11 @@ type UDPClient struct {
VConn *net.UDPConn
VControlPort int
VControlConn *net.UDPConn
Stoped bool
AT *AudioTrack
APacketizer rtp.Packetizer
VT *VideoTrack
VPacketizer rtp.Packetizer
Stoped bool
}
func (s *UDPClient) Stop() {
@@ -51,7 +57,7 @@ func (c *UDPClient) SetupAudio() (err error) {
c.Stop()
}
}()
host := c.AConn.RemoteAddr().String()
host := c.Conn.RemoteAddr().String()
host = host[:strings.LastIndex(host, ":")]
addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", host, c.APort))
if err != nil {
@@ -93,7 +99,7 @@ func (c *UDPClient) SetupVideo() (err error) {
c.Stop()
}
}()
host := c.VConn.RemoteAddr().String()
host := c.Conn.RemoteAddr().String()
host = host[:strings.LastIndex(host, ":")]
addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", host, c.VPort))
if err != nil {