Compare commits

..

10 Commits

Author SHA1 Message Date
dexter
e411d30e91 Merge pull request #17 from jianglieshan/aler9
fix:修改rtsp插件作为服务端出流时,ssrc为0的bug
2022-01-16 13:12:11 +08:00
jianglieshan
709a4cee7b fix:修改rtsp插件作为服务端出流时,ssrc为0的bug 2022-01-15 16:50:40 +08:00
dexter
a90f52769d Merge pull request #16 from ziminghua/aler9
增加RTSPClient关闭的事件订阅,同步关闭客户端连接
2022-01-11 17:00:40 +08:00
訾明华
3764a26bbd 增加RTSPClient关闭的事件订阅,同步关闭客户端连接
增加RTSPClient关闭的事件订阅,同步关闭客户端连接
2022-01-11 16:52:14 +08:00
dexter
2533ab2604 Merge pull request #15 from ziminghua/aler9
多slice的情况下,同步同一帧的时间戳
2022-01-11 11:40:38 +08:00
訾明华
db07f0d588 多slice的情况下,同步同一帧的时间戳
`vpacketer.Packetize`再打包的过程中会把当前的timestamp+samples作为下一次打包的时间戳,如果多slice会连续传递samples导致同一帧的时间戳不一致
2022-01-11 11:34:29 +08:00
dexter
f110513d70 增加配置项ReadBufferSize 2021-12-29 22:59:45 +08:00
dexter
8901f4c117 修复bug 2021-12-29 22:16:11 +08:00
dexter
2f7c2de352 增加读取缓存大小,设置Mark标志位 2021-12-29 20:18:12 +08:00
dexter
af053bb5e6 对处理回调判空 2021-12-27 20:42:06 +08:00
6 changed files with 57 additions and 53 deletions

View File

@@ -145,10 +145,14 @@ func (client *RTSPClient) pullStream() {
}
client.Client = &gortsplib.Client{
OnPacketRTP: func(trackID int, payload []byte) {
var clone []byte
client.processFunc[trackID](append(clone, payload...))
// Println("OnPacketRTP", trackID, len(payload))
if f := client.processFunc[trackID]; f != nil {
var clone []byte
f(append(clone, payload...))
}
},
Transport: &client.Transport,
ReadBufferSize: config.ReadBufferSize,
Transport: &client.Transport,
}
// parse URL
u, err := base.ParseURL(client.URL)
@@ -161,6 +165,9 @@ func (client *RTSPClient) pullStream() {
Printf("connect:%s error:%v", client.URL, err)
return
}
client.OnClose = func() {
client.Client.Close()
}
//client.close should be after connected!
defer client.Client.Close()
var res *base.Response

3
go.mod
View File

@@ -3,10 +3,9 @@ module github.com/Monibuca/plugin-rtsp/v3
go 1.16
require (
github.com/Monibuca/engine/v3 v3.3.11
github.com/Monibuca/engine/v3 v3.4.1
github.com/Monibuca/utils/v3 v3.0.5
github.com/aler9/gortsplib v0.0.0-20211212220644-6f374e396529
github.com/pion/rtp v1.7.4
github.com/pion/sdp/v3 v3.0.4
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a
)

11
go.sum
View File

@@ -1,14 +1,10 @@
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/toml v0.4.1 h1:GaI7EiDXDRfa8VshkTj7Fym7ha+y8/XxIgD2okUIjLw=
github.com/BurntSushi/toml v0.4.1/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
github.com/Monibuca/engine/v3 v3.3.11 h1:4zuuW34UgBxNT66wg+SbYm0C/kp2wghnoqWjM06eIIs=
github.com/Monibuca/engine/v3 v3.3.11/go.mod h1:LowMZ/iw4t6tfTZkSYZHIA0Z1HE8b7xfTDLO4WhX3Hg=
github.com/Monibuca/utils/v3 v3.0.4 h1:PssGhww+qePzw4qpB3g2DCG5Buru0Cu64UiqtAPuHjc=
github.com/Monibuca/utils/v3 v3.0.4/go.mod h1:RpNS95gapWs6gimwh8Xn2x72FN5tO7Powabj7dTFyvE=
github.com/Monibuca/engine/v3 v3.4.1 h1:Ap2VbwTkMUkv80NPeUX2sNdV5Vz5nPVoU/6RU51PSAc=
github.com/Monibuca/engine/v3 v3.4.1/go.mod h1:rgAUey5ziRhlh6WugWyA5fYKyGOvcwhtTMDk4sukE7E=
github.com/Monibuca/utils/v3 v3.0.5 h1:w14x0HkWTbF4MmHbINLlOwe4VJNoSOeaQChMk5E/4es=
github.com/Monibuca/utils/v3 v3.0.5/go.mod h1:RpNS95gapWs6gimwh8Xn2x72FN5tO7Powabj7dTFyvE=
github.com/aler9/gortsplib v0.0.0-20211118111704-d5bdc197beb5 h1:qcrLOrHxW0x9fIguacwD/2CIlswI6kVCUThtx/uuhSs=
github.com/aler9/gortsplib v0.0.0-20211118111704-d5bdc197beb5/go.mod h1:fyQrQyHo8QvdR/h357tkv1g36VesZlzEPsdAu2VrHHc=
github.com/aler9/gortsplib v0.0.0-20211212220644-6f374e396529 h1:j2tfs+eUubyZnuwmYWzK+IS681IixfUyD8bivz4sqAw=
github.com/aler9/gortsplib v0.0.0-20211212220644-6f374e396529/go.mod h1:fyQrQyHo8QvdR/h357tkv1g36VesZlzEPsdAu2VrHHc=
github.com/asticode/go-astikit v0.20.0/go.mod h1:h4ly7idim1tNhaVkdVBeXQZEE3L0xblP7fCWbgwipF0=
@@ -86,13 +82,10 @@ golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5h
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae h1:/WDfKMnPU+m5M4xB+6x4kaepxRw6jWvR5iDRdvjHgy8=
golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da h1:b3NXsE2LusjYGGjL5bxEVZZORm/YEFFrWFjR8eFrw/c=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20211117180635-dee7805ff2e1 h1:kwrAHlwJ0DUBZwQ238v+Uod/3eZ8B2K5rYsUHBQvzmI=
golang.org/x/sys v0.0.0-20211117180635-dee7805ff2e1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=

52
main.go
View File

@@ -13,37 +13,32 @@ import (
)
var config = struct {
ListenAddr string
UDPAddr string
RTCPAddr string
Timeout int
Reconnect bool
AutoPullList map[string]string
AutoPushList map[string]string
}{":554", ":8000", ":8001", 0, false, nil, nil}
ListenAddr string
UDPAddr string
RTCPAddr string
Timeout int
Reconnect bool
AutoPullList map[string]string
AutoPushList map[string]string
ReadBufferSize int
}{":554", ":8000", ":8001", 0, false, nil, nil, 2048}
type RTSPStreamInfo struct {
StreamPath string
Type string //流类型,来自发布者
StartTime time.Time
URL string
SubscriberCount int
var pconfig = PluginConfig{
Name: "RTSP",
Config: &config,
}
func init() {
InstallPlugin(&PluginConfig{
Name: "RTSP",
Config: &config,
Run: runPlugin,
})
pconfig.Install(runPlugin)
}
func getRtspList() (info []*RTSPStreamInfo) {
func getRtspList() (info []*Stream) {
for _, s := range Streams.ToList() {
switch rtsp := s.ExtraProp.(type) {
switch s.ExtraProp.(type) {
case *RTSPublisher:
info = append(info, rtsp.GetInfo())
info = append(info, s)
case *RTSPClient:
info = append(info, rtsp.GetInfo())
info = append(info, s)
}
}
return
@@ -69,7 +64,17 @@ func runPlugin() {
CORS(w, r)
targetURL := r.URL.Query().Get("target")
streamPath := r.URL.Query().Get("streamPath")
save := r.URL.Query().Get("save")
if err := (&RTSPClient{Transport: gortsplib.TransportTCP}).PullStream(streamPath, targetURL); err == nil {
if save == "1" {
if config.AutoPullList == nil {
config.AutoPullList = make(map[string]string)
}
config.AutoPullList[streamPath] = targetURL
if err := pconfig.Save(); err != nil {
Println(err)
}
}
w.Write([]byte(`{"code":0}`))
} else {
w.Write([]byte(fmt.Sprintf(`{"code":1,"msg":"%s"}`, err.Error())))
@@ -80,6 +85,7 @@ func runPlugin() {
Println(err)
}
}
go AddHook(HOOK_PUBLISH, func(s *Stream) {
for streamPath, url := range config.AutoPushList {
if s.StreamPath == streamPath {

View File

@@ -13,21 +13,10 @@ import (
type RTSPublisher struct {
*Stream `json:"-"`
URL string
stream *gortsplib.ServerStream
processFunc []func([]byte)
}
func (p *RTSPublisher) GetInfo() (info *RTSPStreamInfo) {
info = &RTSPStreamInfo{
URL: p.URL,
StreamPath: p.StreamPath,
Type: p.Type,
StartTime: p.StartTime,
SubscriberCount: len(p.Subscribers),
}
return
}
func (p *RTSPublisher) setTracks(tracks gortsplib.Tracks) {
if p.processFunc != nil {
p.processFunc = p.processFunc[:len(tracks)]

View File

@@ -75,7 +75,7 @@ func (sh *RTSPServer) OnDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx) (*ba
sub.Type = "RTSP pull"
sub.vt = s.WaitVideoTrack("h264", "h265")
sub.at = s.WaitAudioTrack("aac", "pcma", "pcmu")
ssrc := uintptr(unsafe.Pointer(stream))
ssrc := uintptr(unsafe.Pointer(&stream))
var trackIds = 0
if sub.vt != nil {
trackId := trackIds
@@ -97,9 +97,17 @@ func (sh *RTSPServer) OnDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx) (*ba
}
var st uint32
onVideo := func(ts uint32, pack *engine.VideoPack) {
for _, nalu := range pack.NALUs {
for _, pack := range vpacketer.Packetize(nalu, (ts-st)*90) {
rtp, _ := pack.Marshal()
for i, nalu := range pack.NALUs {
var samples uint32
if i == len(pack.NALUs)-1 {
samples = (ts - st) * 90
} else {
samples = 0
}
packs := vpacketer.Packetize(nalu, samples)
for j, rtpack := range packs {
rtpack.Marker = i == len(pack.NALUs)-1 && j == len(packs)-1
rtp, _ := rtpack.Marshal()
stream.WritePacketRTP(trackId, rtp)
}
}
@@ -249,6 +257,8 @@ func (sh *RTSPServer) OnPacketRTP(ctx *gortsplib.ServerHandlerOnPacketRTPCtx) {
ctx.Session.Close()
return
}
rtsp.processFunc[ctx.TrackID](ctx.Payload)
if f := rtsp.processFunc[ctx.TrackID]; f != nil {
f(ctx.Payload)
}
}
}