mirror of
https://github.com/Monibuca/plugin-rtsp.git
synced 2025-09-27 12:02:20 +08:00
Compare commits
16 Commits
Author | SHA1 | Date | |
---|---|---|---|
![]() |
e6d0489d9c | ||
![]() |
68d0d9aa08 | ||
![]() |
e411d30e91 | ||
![]() |
709a4cee7b | ||
![]() |
a90f52769d | ||
![]() |
3764a26bbd | ||
![]() |
2533ab2604 | ||
![]() |
db07f0d588 | ||
![]() |
f110513d70 | ||
![]() |
8901f4c117 | ||
![]() |
2f7c2de352 | ||
![]() |
af053bb5e6 | ||
![]() |
bed7ba8a87 | ||
![]() |
0cbc4beb0f | ||
![]() |
edbfc07275 | ||
![]() |
329f93022e |
33
client.go
33
client.go
@@ -42,6 +42,7 @@ func (rtsp *RTSPClient) PullStream(streamPath string, rtspUrl string) (err error
|
||||
time.Sleep(time.Second * 5)
|
||||
}
|
||||
if rtsp.IsTimeout {
|
||||
rtsp.processFunc = nil
|
||||
go rtsp.PullStream(streamPath, rtspUrl)
|
||||
}
|
||||
}()
|
||||
@@ -103,7 +104,7 @@ func (rtsp *RTSPClient) PushStream(streamPath string, rtspUrl string) (err error
|
||||
trackId := trackIds
|
||||
switch sub.at.CodecID {
|
||||
case codec.CodecID_PCMA, codec.CodecID_PCMU:
|
||||
atrack := NewG711Track(97, map[byte]string{7: "pcma", 8: "pcmu"}[sub.vt.CodecID])
|
||||
atrack := NewG711Track(97, map[byte]string{7: "pcma", 8: "pcmu"}[sub.at.CodecID])
|
||||
apacketizer := rtp.NewPacketizer(1200, 97, uint32(ssrc), &codecs.G711Payloader{}, rtp.NewFixedSequencer(1), 8000)
|
||||
sub.OnAudio = func(ts uint32, pack *AudioPack) {
|
||||
for _, pack := range apacketizer.Packetize(pack.Raw, (ts-st)*8) {
|
||||
@@ -145,12 +146,15 @@ func (client *RTSPClient) pullStream() {
|
||||
}
|
||||
client.Client = &gortsplib.Client{
|
||||
OnPacketRTP: func(trackID int, payload []byte) {
|
||||
var clone []byte
|
||||
client.processFunc[trackID](append(clone, payload...))
|
||||
// Println("OnPacketRTP", trackID, len(payload))
|
||||
if f := client.processFunc[trackID]; f != nil {
|
||||
var clone []byte
|
||||
f(append(clone, payload...))
|
||||
}
|
||||
},
|
||||
Transport: &client.Transport,
|
||||
ReadBufferSize: config.ReadBufferSize,
|
||||
Transport: &client.Transport,
|
||||
}
|
||||
defer client.Client.Close()
|
||||
// parse URL
|
||||
u, err := base.ParseURL(client.URL)
|
||||
if err != nil {
|
||||
@@ -162,6 +166,11 @@ func (client *RTSPClient) pullStream() {
|
||||
Printf("connect:%s error:%v", client.URL, err)
|
||||
return
|
||||
}
|
||||
client.OnClose = func() {
|
||||
client.Client.Close()
|
||||
}
|
||||
//client.close should be after connected!
|
||||
defer client.Client.Close()
|
||||
var res *base.Response
|
||||
if res, err = client.Options(u); err != nil {
|
||||
Printf("option:%s error:%v", client.URL, err)
|
||||
@@ -179,7 +188,7 @@ func (client *RTSPClient) pullStream() {
|
||||
client.setTracks(tracks)
|
||||
}
|
||||
for _, track := range tracks {
|
||||
if res, err = client.Setup(true, baseURL, track, 0, 0); err != nil {
|
||||
if res, err = client.Setup(true, track, baseURL, 0, 0); err != nil {
|
||||
Printf("Setup:%s error:%v", baseURL.String(), err)
|
||||
return
|
||||
}
|
||||
@@ -192,6 +201,14 @@ func (client *RTSPClient) pullStream() {
|
||||
}
|
||||
Println(res)
|
||||
// wait until a fatal error
|
||||
err = client.Wait()
|
||||
Printf("Wait:%s error:%v", baseURL.String(), err)
|
||||
var fatalChan = make(chan error)
|
||||
go func() {
|
||||
fatalChan <- client.Wait()
|
||||
}()
|
||||
select {
|
||||
case err := <-fatalChan:
|
||||
Printf("Wait:%s error:%v", baseURL.String(), err)
|
||||
case <-client.Done():
|
||||
Printf("client:%s done", client.URL)
|
||||
}
|
||||
}
|
||||
|
7
go.mod
7
go.mod
@@ -3,10 +3,9 @@ module github.com/Monibuca/plugin-rtsp/v3
|
||||
go 1.16
|
||||
|
||||
require (
|
||||
github.com/Monibuca/engine/v3 v3.3.11
|
||||
github.com/Monibuca/utils/v3 v3.0.4
|
||||
github.com/aler9/gortsplib v0.0.0-20211118111704-d5bdc197beb5
|
||||
github.com/Monibuca/engine/v3 v3.4.1
|
||||
github.com/Monibuca/utils/v3 v3.0.5
|
||||
github.com/aler9/gortsplib v0.0.0-20211212220644-6f374e396529
|
||||
github.com/pion/rtp v1.7.4
|
||||
github.com/pion/sdp/v3 v3.0.4
|
||||
golang.org/x/sys v0.0.0-20211117180635-dee7805ff2e1 // indirect
|
||||
)
|
||||
|
26
go.sum
26
go.sum
@@ -1,18 +1,12 @@
|
||||
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
|
||||
github.com/BurntSushi/toml v0.4.1 h1:GaI7EiDXDRfa8VshkTj7Fym7ha+y8/XxIgD2okUIjLw=
|
||||
github.com/BurntSushi/toml v0.4.1/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
|
||||
github.com/Monibuca/engine/v3 v3.3.10 h1:zRw9aGEmB6K6ee0figdRh2HZFGQSn2nvpptMT3Xm0HY=
|
||||
github.com/Monibuca/engine/v3 v3.3.10/go.mod h1:LowMZ/iw4t6tfTZkSYZHIA0Z1HE8b7xfTDLO4WhX3Hg=
|
||||
github.com/Monibuca/engine/v3 v3.3.11 h1:4zuuW34UgBxNT66wg+SbYm0C/kp2wghnoqWjM06eIIs=
|
||||
github.com/Monibuca/engine/v3 v3.3.11/go.mod h1:LowMZ/iw4t6tfTZkSYZHIA0Z1HE8b7xfTDLO4WhX3Hg=
|
||||
github.com/Monibuca/utils/v3 v3.0.4 h1:PssGhww+qePzw4qpB3g2DCG5Buru0Cu64UiqtAPuHjc=
|
||||
github.com/Monibuca/utils/v3 v3.0.4/go.mod h1:RpNS95gapWs6gimwh8Xn2x72FN5tO7Powabj7dTFyvE=
|
||||
github.com/aler9/gortsplib v0.0.0-20211106122816-6e38851a096e h1:qSjVAaIvJukmEuLxV0agmQ5KmBabBK+jzb+eNqG3Z+w=
|
||||
github.com/aler9/gortsplib v0.0.0-20211106122816-6e38851a096e/go.mod h1:fyQrQyHo8QvdR/h357tkv1g36VesZlzEPsdAu2VrHHc=
|
||||
github.com/aler9/gortsplib v0.0.0-20211115164017-1411cb33f558 h1:uyW1alIzoJCAmNZ2xuo5EOTbSbf9W7tVYOzb0UrzZ0U=
|
||||
github.com/aler9/gortsplib v0.0.0-20211115164017-1411cb33f558/go.mod h1:fyQrQyHo8QvdR/h357tkv1g36VesZlzEPsdAu2VrHHc=
|
||||
github.com/aler9/gortsplib v0.0.0-20211118111704-d5bdc197beb5 h1:qcrLOrHxW0x9fIguacwD/2CIlswI6kVCUThtx/uuhSs=
|
||||
github.com/aler9/gortsplib v0.0.0-20211118111704-d5bdc197beb5/go.mod h1:fyQrQyHo8QvdR/h357tkv1g36VesZlzEPsdAu2VrHHc=
|
||||
github.com/Monibuca/engine/v3 v3.4.1 h1:Ap2VbwTkMUkv80NPeUX2sNdV5Vz5nPVoU/6RU51PSAc=
|
||||
github.com/Monibuca/engine/v3 v3.4.1/go.mod h1:rgAUey5ziRhlh6WugWyA5fYKyGOvcwhtTMDk4sukE7E=
|
||||
github.com/Monibuca/utils/v3 v3.0.5 h1:w14x0HkWTbF4MmHbINLlOwe4VJNoSOeaQChMk5E/4es=
|
||||
github.com/Monibuca/utils/v3 v3.0.5/go.mod h1:RpNS95gapWs6gimwh8Xn2x72FN5tO7Powabj7dTFyvE=
|
||||
github.com/aler9/gortsplib v0.0.0-20211212220644-6f374e396529 h1:j2tfs+eUubyZnuwmYWzK+IS681IixfUyD8bivz4sqAw=
|
||||
github.com/aler9/gortsplib v0.0.0-20211212220644-6f374e396529/go.mod h1:fyQrQyHo8QvdR/h357tkv1g36VesZlzEPsdAu2VrHHc=
|
||||
github.com/asticode/go-astikit v0.20.0/go.mod h1:h4ly7idim1tNhaVkdVBeXQZEE3L0xblP7fCWbgwipF0=
|
||||
github.com/asticode/go-astits v1.10.0/go.mod h1:DkOWmBNQpnr9mv24KfZjq4JawCFX1FCqjLVGvO0DygQ=
|
||||
github.com/cnotch/apirouter v0.0.0-20200731232942-89e243a791f3/go.mod h1:5deJPLON/x/s2dLOQfuKS0lenhOIT4xX0pvtN/OEIuY=
|
||||
@@ -58,7 +52,6 @@ github.com/pion/rtp v1.6.1/go.mod h1:bDb5n+BFZxXx0Ea7E5qe+klMuqiBrP+w8XSjiWtCUko
|
||||
github.com/pion/rtp v1.6.2/go.mod h1:bDb5n+BFZxXx0Ea7E5qe+klMuqiBrP+w8XSjiWtCUko=
|
||||
github.com/pion/rtp v1.7.4 h1:4dMbjb1SuynU5OpA3kz1zHK+u+eOCQjW3MAeVHf1ODA=
|
||||
github.com/pion/rtp v1.7.4/go.mod h1:bDb5n+BFZxXx0Ea7E5qe+klMuqiBrP+w8XSjiWtCUko=
|
||||
github.com/pion/sdp/v3 v3.0.2 h1:UNnSPVaMM+Pdu/mR9UvAyyo6zkdYbKeuOooCwZvTl/g=
|
||||
github.com/pion/sdp/v3 v3.0.2/go.mod h1:bNiSknmJE0HYBprTHXKPQ3+JjacTv5uap92ueJZKsRk=
|
||||
github.com/pion/sdp/v3 v3.0.4 h1:2Kf+dgrzJflNCSw3TV5v2VLeI0s/qkzy2r5jlR0wzf8=
|
||||
github.com/pion/sdp/v3 v3.0.4/go.mod h1:bNiSknmJE0HYBprTHXKPQ3+JjacTv5uap92ueJZKsRk=
|
||||
@@ -91,13 +84,8 @@ golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7w
|
||||
golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20210423082822-04245dca01da h1:b3NXsE2LusjYGGjL5bxEVZZORm/YEFFrWFjR8eFrw/c=
|
||||
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20211113001501-0c823b97ae02 h1:7NCfEGl0sfUojmX78nK9pBJuUlSZWEJA/TwASvfiPLo=
|
||||
golang.org/x/sys v0.0.0-20211113001501-0c823b97ae02/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20211116061358-0a5406a5449c h1:DHcbWVXeY+0Y8HHKR+rbLwnoh2F4tNCY7rTiHJ30RmA=
|
||||
golang.org/x/sys v0.0.0-20211116061358-0a5406a5449c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20211117180635-dee7805ff2e1 h1:kwrAHlwJ0DUBZwQ238v+Uod/3eZ8B2K5rYsUHBQvzmI=
|
||||
golang.org/x/sys v0.0.0-20211117180635-dee7805ff2e1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
|
||||
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
|
||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
|
52
main.go
52
main.go
@@ -13,37 +13,32 @@ import (
|
||||
)
|
||||
|
||||
var config = struct {
|
||||
ListenAddr string
|
||||
UDPAddr string
|
||||
RTCPAddr string
|
||||
Timeout int
|
||||
Reconnect bool
|
||||
AutoPullList map[string]string
|
||||
AutoPushList map[string]string
|
||||
}{":554", ":8000", ":8001", 0, false, nil, nil}
|
||||
ListenAddr string
|
||||
UDPAddr string
|
||||
RTCPAddr string
|
||||
Timeout int
|
||||
Reconnect bool
|
||||
AutoPullList map[string]string
|
||||
AutoPushList map[string]string
|
||||
ReadBufferSize int
|
||||
}{":554", ":8000", ":8001", 0, false, nil, nil, 2048}
|
||||
|
||||
type RTSPStreamInfo struct {
|
||||
StreamPath string
|
||||
Type string //流类型,来自发布者
|
||||
StartTime time.Time
|
||||
URL string
|
||||
SubscriberCount int
|
||||
var pconfig = PluginConfig{
|
||||
Name: "RTSP",
|
||||
Config: &config,
|
||||
}
|
||||
|
||||
func init() {
|
||||
InstallPlugin(&PluginConfig{
|
||||
Name: "RTSP",
|
||||
Config: &config,
|
||||
Run: runPlugin,
|
||||
})
|
||||
pconfig.Install(runPlugin)
|
||||
}
|
||||
func getRtspList() (info []*RTSPStreamInfo) {
|
||||
|
||||
func getRtspList() (info []*Stream) {
|
||||
for _, s := range Streams.ToList() {
|
||||
switch rtsp := s.ExtraProp.(type) {
|
||||
switch s.ExtraProp.(type) {
|
||||
case *RTSPublisher:
|
||||
info = append(info, rtsp.GetInfo())
|
||||
info = append(info, s)
|
||||
case *RTSPClient:
|
||||
info = append(info, rtsp.GetInfo())
|
||||
info = append(info, s)
|
||||
}
|
||||
}
|
||||
return
|
||||
@@ -69,7 +64,17 @@ func runPlugin() {
|
||||
CORS(w, r)
|
||||
targetURL := r.URL.Query().Get("target")
|
||||
streamPath := r.URL.Query().Get("streamPath")
|
||||
save := r.URL.Query().Get("save")
|
||||
if err := (&RTSPClient{Transport: gortsplib.TransportTCP}).PullStream(streamPath, targetURL); err == nil {
|
||||
if save == "1" {
|
||||
if config.AutoPullList == nil {
|
||||
config.AutoPullList = make(map[string]string)
|
||||
}
|
||||
config.AutoPullList[streamPath] = targetURL
|
||||
if err := pconfig.Save(); err != nil {
|
||||
Println(err)
|
||||
}
|
||||
}
|
||||
w.Write([]byte(`{"code":0}`))
|
||||
} else {
|
||||
w.Write([]byte(fmt.Sprintf(`{"code":1,"msg":"%s"}`, err.Error())))
|
||||
@@ -80,6 +85,7 @@ func runPlugin() {
|
||||
Println(err)
|
||||
}
|
||||
}
|
||||
|
||||
go AddHook(HOOK_PUBLISH, func(s *Stream) {
|
||||
for streamPath, url := range config.AutoPushList {
|
||||
if s.StreamPath == streamPath {
|
||||
|
11
publisher.go
11
publisher.go
@@ -13,21 +13,10 @@ import (
|
||||
|
||||
type RTSPublisher struct {
|
||||
*Stream `json:"-"`
|
||||
URL string
|
||||
stream *gortsplib.ServerStream
|
||||
processFunc []func([]byte)
|
||||
}
|
||||
|
||||
func (p *RTSPublisher) GetInfo() (info *RTSPStreamInfo) {
|
||||
info = &RTSPStreamInfo{
|
||||
URL: p.URL,
|
||||
StreamPath: p.StreamPath,
|
||||
Type: p.Type,
|
||||
StartTime: p.StartTime,
|
||||
SubscriberCount: len(p.Subscribers),
|
||||
}
|
||||
return
|
||||
}
|
||||
func (p *RTSPublisher) setTracks(tracks gortsplib.Tracks) {
|
||||
if p.processFunc != nil {
|
||||
p.processFunc = p.processFunc[:len(tracks)]
|
||||
|
27
server.go
27
server.go
@@ -75,7 +75,7 @@ func (sh *RTSPServer) OnDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx) (*ba
|
||||
sub.Type = "RTSP pull"
|
||||
sub.vt = s.WaitVideoTrack("h264", "h265")
|
||||
sub.at = s.WaitAudioTrack("aac", "pcma", "pcmu")
|
||||
ssrc := uintptr(unsafe.Pointer(stream))
|
||||
ssrc := uintptr(unsafe.Pointer(&stream))
|
||||
var trackIds = 0
|
||||
if sub.vt != nil {
|
||||
trackId := trackIds
|
||||
@@ -97,9 +97,17 @@ func (sh *RTSPServer) OnDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx) (*ba
|
||||
}
|
||||
var st uint32
|
||||
onVideo := func(ts uint32, pack *engine.VideoPack) {
|
||||
for _, nalu := range pack.NALUs {
|
||||
for _, pack := range vpacketer.Packetize(nalu, (ts-st)*90) {
|
||||
rtp, _ := pack.Marshal()
|
||||
for i, nalu := range pack.NALUs {
|
||||
var samples uint32
|
||||
if i == len(pack.NALUs)-1 {
|
||||
samples = (ts - st) * 90
|
||||
} else {
|
||||
samples = 0
|
||||
}
|
||||
packs := vpacketer.Packetize(nalu, samples)
|
||||
for j, rtpack := range packs {
|
||||
rtpack.Marker = i == len(pack.NALUs)-1 && j == len(packs)-1
|
||||
rtp, _ := rtpack.Marshal()
|
||||
stream.WritePacketRTP(trackId, rtp)
|
||||
}
|
||||
}
|
||||
@@ -119,7 +127,7 @@ func (sh *RTSPServer) OnDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx) (*ba
|
||||
trackId := trackIds
|
||||
switch sub.at.CodecID {
|
||||
case codec.CodecID_PCMA, codec.CodecID_PCMU:
|
||||
atrack := NewG711Track(97, map[byte]string{7: "pcma", 8: "pcmu"}[sub.vt.CodecID])
|
||||
atrack := NewG711Track(97, map[byte]string{7: "pcma", 8: "pcmu"}[sub.at.CodecID])
|
||||
apacketizer := rtp.NewPacketizer(1200, 97, uint32(ssrc), &codecs.G711Payloader{}, rtp.NewFixedSequencer(1), 8000)
|
||||
sub.OnAudio = func(ts uint32, pack *engine.AudioPack) {
|
||||
for _, pack := range apacketizer.Packetize(pack.Raw, (ts-st)*8) {
|
||||
@@ -244,6 +252,13 @@ func (sh *RTSPServer) OnRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*base.R
|
||||
// called after receiving a frame.
|
||||
func (sh *RTSPServer) OnPacketRTP(ctx *gortsplib.ServerHandlerOnPacketRTPCtx) {
|
||||
if p, ok := sh.Load(ctx.Session); ok {
|
||||
p.(*RTSPublisher).processFunc[ctx.TrackID](ctx.Payload)
|
||||
rtsp := p.(*RTSPublisher)
|
||||
if rtsp.Err() != nil {
|
||||
ctx.Session.Close()
|
||||
return
|
||||
}
|
||||
if f := rtsp.processFunc[ctx.TrackID]; f != nil {
|
||||
f(ctx.Payload)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user