mirror of
https://github.com/Monibuca/plugin-rtsp.git
synced 2025-09-27 03:56:08 +08:00
加入转推功能
This commit is contained in:
93
client.go
93
client.go
@@ -3,11 +3,16 @@ package rtsp
|
||||
import (
|
||||
"errors"
|
||||
"time"
|
||||
"unsafe"
|
||||
|
||||
. "github.com/Monibuca/engine/v3"
|
||||
. "github.com/Monibuca/utils/v3"
|
||||
"github.com/Monibuca/utils/v3/codec"
|
||||
"github.com/aler9/gortsplib"
|
||||
"github.com/aler9/gortsplib/pkg/aac"
|
||||
"github.com/aler9/gortsplib/pkg/base"
|
||||
"github.com/pion/rtp"
|
||||
"github.com/pion/rtp/codecs"
|
||||
)
|
||||
|
||||
type RTSPClient struct {
|
||||
@@ -48,7 +53,93 @@ func (rtsp *RTSPClient) PullStream(streamPath string, rtspUrl string) (err error
|
||||
}
|
||||
return errors.New("publish badname")
|
||||
}
|
||||
|
||||
func (rtsp *RTSPClient) PushStream(streamPath string, rtspUrl string) (err error) {
|
||||
if s := FindStream(streamPath); s != nil {
|
||||
var tracks gortsplib.Tracks
|
||||
var sub RTSPSubscriber
|
||||
sub.Type = "RTSP push out"
|
||||
sub.vt = s.WaitVideoTrack("h264", "h265")
|
||||
sub.at = s.WaitAudioTrack("aac", "pcma", "pcmu")
|
||||
ssrc := uintptr(unsafe.Pointer(&sub))
|
||||
var trackIds = 0
|
||||
if sub.vt != nil {
|
||||
trackId := trackIds
|
||||
var vtrack *gortsplib.Track
|
||||
var vpacketer rtp.Packetizer
|
||||
switch sub.vt.CodecID {
|
||||
case codec.CodecID_H264:
|
||||
if vtrack, err = gortsplib.NewTrackH264(96, &gortsplib.TrackConfigH264{
|
||||
SPS: sub.vt.ExtraData.NALUs[0],
|
||||
PPS: sub.vt.ExtraData.NALUs[1],
|
||||
}); err == nil {
|
||||
vpacketer = rtp.NewPacketizer(1200, 96, uint32(ssrc), &codecs.H264Payloader{}, rtp.NewFixedSequencer(1), 90000)
|
||||
} else {
|
||||
return err
|
||||
}
|
||||
case codec.CodecID_H265:
|
||||
vtrack = NewH265Track(96, sub.vt.ExtraData.NALUs)
|
||||
vpacketer = rtp.NewPacketizer(1200, 96, uint32(ssrc), &H265Payloader{}, rtp.NewFixedSequencer(1), 90000)
|
||||
}
|
||||
var st uint32
|
||||
onVideo := func(ts uint32, pack *VideoPack) {
|
||||
for _, nalu := range pack.NALUs {
|
||||
for _, pack := range vpacketer.Packetize(nalu, (ts-st)*90) {
|
||||
rtp, _ := pack.Marshal()
|
||||
rtsp.WritePacketRTP(trackId, rtp)
|
||||
}
|
||||
}
|
||||
st = ts
|
||||
}
|
||||
sub.OnVideo = func(ts uint32, pack *VideoPack) {
|
||||
if st = ts; st != 0 {
|
||||
sub.OnVideo = onVideo
|
||||
}
|
||||
onVideo(ts, pack)
|
||||
}
|
||||
tracks = append(tracks, vtrack)
|
||||
trackIds++
|
||||
}
|
||||
if sub.at != nil {
|
||||
var st uint32
|
||||
trackId := trackIds
|
||||
switch sub.at.CodecID {
|
||||
case codec.CodecID_PCMA, codec.CodecID_PCMU:
|
||||
atrack := NewG711Track(97, map[byte]string{7: "pcma", 8: "pcmu"}[sub.vt.CodecID])
|
||||
apacketizer := rtp.NewPacketizer(1200, 97, uint32(ssrc), &codecs.G711Payloader{}, rtp.NewFixedSequencer(1), 8000)
|
||||
sub.OnAudio = func(ts uint32, pack *AudioPack) {
|
||||
for _, pack := range apacketizer.Packetize(pack.Raw, (ts-st)*8) {
|
||||
buf, _ := pack.Marshal()
|
||||
rtsp.WritePacketRTP(trackId, buf)
|
||||
}
|
||||
st = ts
|
||||
}
|
||||
tracks = append(tracks, atrack)
|
||||
case codec.CodecID_AAC:
|
||||
var mpegConf aac.MPEG4AudioConfig
|
||||
mpegConf.Decode(sub.at.ExtraData[2:])
|
||||
conf := &gortsplib.TrackConfigAAC{
|
||||
Type: int(mpegConf.Type),
|
||||
SampleRate: mpegConf.SampleRate,
|
||||
ChannelCount: mpegConf.ChannelCount,
|
||||
AOTSpecificConfig: mpegConf.AOTSpecificConfig,
|
||||
}
|
||||
if atrack, err := gortsplib.NewTrackAAC(97, conf); err == nil {
|
||||
apacketizer := rtp.NewPacketizer(1200, 97, uint32(ssrc), &AACPayloader{}, rtp.NewFixedSequencer(1), uint32(mpegConf.SampleRate))
|
||||
sub.OnAudio = func(ts uint32, pack *AudioPack) {
|
||||
for _, pack := range apacketizer.Packetize(pack.Raw, (ts-st)*uint32(mpegConf.SampleRate)/1000) {
|
||||
buf, _ := pack.Marshal()
|
||||
rtsp.WritePacketRTP(trackId, buf)
|
||||
}
|
||||
st = ts
|
||||
}
|
||||
tracks = append(tracks, atrack)
|
||||
}
|
||||
}
|
||||
}
|
||||
return rtsp.StartPublishing(rtspUrl, tracks)
|
||||
}
|
||||
return errors.New("stream not exist")
|
||||
}
|
||||
func (client *RTSPClient) startStream() {
|
||||
if client.Err() != nil {
|
||||
return
|
||||
|
10
main.go
10
main.go
@@ -19,7 +19,8 @@ var config = struct {
|
||||
Timeout int
|
||||
Reconnect bool
|
||||
AutoPullList map[string]string
|
||||
}{":554", ":8000", ":8001", 0, false, nil}
|
||||
AutoPushList map[string]string
|
||||
}{":554", ":8000", ":8001", 0, false, nil, nil}
|
||||
|
||||
func init() {
|
||||
InstallPlugin(&PluginConfig{
|
||||
@@ -63,13 +64,18 @@ func runPlugin() {
|
||||
w.Write([]byte(fmt.Sprintf(`{"code":1,"msg":"%s"}`, err.Error())))
|
||||
}
|
||||
})
|
||||
if len(config.AutoPullList) > 0 {
|
||||
for streamPath, url := range config.AutoPullList {
|
||||
if err := (&RTSPClient{}).PullStream(streamPath, url); err != nil {
|
||||
Println(err)
|
||||
}
|
||||
}
|
||||
go AddHook(HOOK_PUBLISH, func(s *Stream) {
|
||||
for streamPath, url := range config.AutoPushList {
|
||||
if s.StreamPath == streamPath {
|
||||
(&RTSPClient{}).PushStream(streamPath, url)
|
||||
}
|
||||
}
|
||||
})
|
||||
if config.ListenAddr != "" {
|
||||
go log.Fatal(ListenRtsp(config.ListenAddr))
|
||||
}
|
||||
|
Reference in New Issue
Block a user