mirror of
https://github.com/Monibuca/plugin-rtsp.git
synced 2025-09-26 19:51:14 +08:00
198 lines
5.6 KiB
Go
198 lines
5.6 KiB
Go
package rtsp
|
|
|
|
import (
|
|
"errors"
|
|
"time"
|
|
"unsafe"
|
|
|
|
. "github.com/Monibuca/engine/v3"
|
|
. "github.com/Monibuca/utils/v3"
|
|
"github.com/Monibuca/utils/v3/codec"
|
|
"github.com/aler9/gortsplib"
|
|
"github.com/aler9/gortsplib/pkg/aac"
|
|
"github.com/aler9/gortsplib/pkg/base"
|
|
"github.com/pion/rtp"
|
|
"github.com/pion/rtp/codecs"
|
|
)
|
|
|
|
type RTSPClient struct {
|
|
RTSPublisher
|
|
Transport gortsplib.Transport
|
|
*gortsplib.Client `json:"-"`
|
|
}
|
|
|
|
// PullStream 从外部拉流
|
|
func (rtsp *RTSPClient) PullStream(streamPath string, rtspUrl string) (err error) {
|
|
rtsp.Stream = &Stream{
|
|
StreamPath: streamPath,
|
|
Type: "RTSP Pull",
|
|
ExtraProp: rtsp,
|
|
}
|
|
if result := rtsp.Publish(); result {
|
|
rtsp.URL = rtspUrl
|
|
if config.Reconnect {
|
|
go func() {
|
|
for rtsp.startStream(); rtsp.Err() == nil; rtsp.startStream() {
|
|
Printf("reconnecting:%s in 5 seconds", rtspUrl)
|
|
if rtsp.Transport == gortsplib.TransportTCP {
|
|
rtsp.Transport = gortsplib.TransportUDP
|
|
} else {
|
|
rtsp.Transport = gortsplib.TransportTCP
|
|
}
|
|
time.Sleep(time.Second * 5)
|
|
}
|
|
if rtsp.IsTimeout {
|
|
go rtsp.PullStream(streamPath, rtspUrl)
|
|
}
|
|
}()
|
|
} else {
|
|
go rtsp.startStream()
|
|
}
|
|
return
|
|
}
|
|
return errors.New("publish badname")
|
|
}
|
|
func (rtsp *RTSPClient) PushStream(streamPath string, rtspUrl string) (err error) {
|
|
if s := FindStream(streamPath); s != nil {
|
|
var tracks gortsplib.Tracks
|
|
var sub RTSPSubscriber
|
|
sub.Type = "RTSP push out"
|
|
sub.vt = s.WaitVideoTrack("h264", "h265")
|
|
sub.at = s.WaitAudioTrack("aac", "pcma", "pcmu")
|
|
ssrc := uintptr(unsafe.Pointer(&sub))
|
|
var trackIds = 0
|
|
if sub.vt != nil {
|
|
trackId := trackIds
|
|
var vtrack *gortsplib.Track
|
|
var vpacketer rtp.Packetizer
|
|
switch sub.vt.CodecID {
|
|
case codec.CodecID_H264:
|
|
if vtrack, err = gortsplib.NewTrackH264(96, &gortsplib.TrackConfigH264{
|
|
SPS: sub.vt.ExtraData.NALUs[0],
|
|
PPS: sub.vt.ExtraData.NALUs[1],
|
|
}); err == nil {
|
|
vpacketer = rtp.NewPacketizer(1200, 96, uint32(ssrc), &codecs.H264Payloader{}, rtp.NewFixedSequencer(1), 90000)
|
|
} else {
|
|
return err
|
|
}
|
|
case codec.CodecID_H265:
|
|
vtrack = NewH265Track(96, sub.vt.ExtraData.NALUs)
|
|
vpacketer = rtp.NewPacketizer(1200, 96, uint32(ssrc), &H265Payloader{}, rtp.NewFixedSequencer(1), 90000)
|
|
}
|
|
var st uint32
|
|
onVideo := func(ts uint32, pack *VideoPack) {
|
|
for _, nalu := range pack.NALUs {
|
|
for _, pack := range vpacketer.Packetize(nalu, (ts-st)*90) {
|
|
rtp, _ := pack.Marshal()
|
|
rtsp.WritePacketRTP(trackId, rtp)
|
|
}
|
|
}
|
|
st = ts
|
|
}
|
|
sub.OnVideo = func(ts uint32, pack *VideoPack) {
|
|
if st = ts; st != 0 {
|
|
sub.OnVideo = onVideo
|
|
}
|
|
onVideo(ts, pack)
|
|
}
|
|
tracks = append(tracks, vtrack)
|
|
trackIds++
|
|
}
|
|
if sub.at != nil {
|
|
var st uint32
|
|
trackId := trackIds
|
|
switch sub.at.CodecID {
|
|
case codec.CodecID_PCMA, codec.CodecID_PCMU:
|
|
atrack := NewG711Track(97, map[byte]string{7: "pcma", 8: "pcmu"}[sub.vt.CodecID])
|
|
apacketizer := rtp.NewPacketizer(1200, 97, uint32(ssrc), &codecs.G711Payloader{}, rtp.NewFixedSequencer(1), 8000)
|
|
sub.OnAudio = func(ts uint32, pack *AudioPack) {
|
|
for _, pack := range apacketizer.Packetize(pack.Raw, (ts-st)*8) {
|
|
buf, _ := pack.Marshal()
|
|
rtsp.WritePacketRTP(trackId, buf)
|
|
}
|
|
st = ts
|
|
}
|
|
tracks = append(tracks, atrack)
|
|
case codec.CodecID_AAC:
|
|
var mpegConf aac.MPEG4AudioConfig
|
|
mpegConf.Decode(sub.at.ExtraData[2:])
|
|
conf := &gortsplib.TrackConfigAAC{
|
|
Type: int(mpegConf.Type),
|
|
SampleRate: mpegConf.SampleRate,
|
|
ChannelCount: mpegConf.ChannelCount,
|
|
AOTSpecificConfig: mpegConf.AOTSpecificConfig,
|
|
}
|
|
if atrack, err := gortsplib.NewTrackAAC(97, conf); err == nil {
|
|
apacketizer := rtp.NewPacketizer(1200, 97, uint32(ssrc), &AACPayloader{}, rtp.NewFixedSequencer(1), uint32(mpegConf.SampleRate))
|
|
sub.OnAudio = func(ts uint32, pack *AudioPack) {
|
|
for _, pack := range apacketizer.Packetize(pack.Raw, (ts-st)*uint32(mpegConf.SampleRate)/1000) {
|
|
buf, _ := pack.Marshal()
|
|
rtsp.WritePacketRTP(trackId, buf)
|
|
}
|
|
st = ts
|
|
}
|
|
tracks = append(tracks, atrack)
|
|
}
|
|
}
|
|
}
|
|
return rtsp.StartPublishing(rtspUrl, tracks)
|
|
}
|
|
return errors.New("stream not exist")
|
|
}
|
|
func (client *RTSPClient) startStream() {
|
|
if client.Err() != nil {
|
|
return
|
|
}
|
|
client.Client = &gortsplib.Client{
|
|
OnPacketRTP: func(trackID int, payload []byte) {
|
|
var clone []byte
|
|
client.processFunc[trackID](append(clone, payload...))
|
|
},
|
|
Transport: &client.Transport,
|
|
}
|
|
defer client.Client.Close()
|
|
// parse URL
|
|
u, err := base.ParseURL(client.URL)
|
|
if err != nil {
|
|
Printf("ParseURL:%s error:%v", client.URL, err)
|
|
return
|
|
}
|
|
// connect to the server
|
|
if err = client.Start(u.Scheme, u.Host); err != nil {
|
|
Printf("connect:%s error:%v", client.URL, err)
|
|
return
|
|
}
|
|
var res *base.Response
|
|
if res, err = client.Options(u); err != nil {
|
|
Printf("option:%s error:%v", client.URL, err)
|
|
return
|
|
}
|
|
Println(res)
|
|
// find published tracks
|
|
tracks, baseURL, res, err := client.Describe(u)
|
|
if err != nil {
|
|
Printf("Describe:%s error:%v", client.URL, err)
|
|
return
|
|
}
|
|
Println(res)
|
|
if client.processFunc == nil {
|
|
client.setTracks(tracks)
|
|
}
|
|
for _, track := range tracks {
|
|
if res, err = client.Setup(true, baseURL, track, 0, 0); err != nil {
|
|
Printf("Setup:%s error:%v", baseURL.String(), err)
|
|
return
|
|
}
|
|
Println(res)
|
|
}
|
|
// start reading tracks
|
|
if res, err = client.Play(nil); err != nil {
|
|
Printf("Play:%s error:%v", baseURL.String(), err)
|
|
return
|
|
}
|
|
Println(res)
|
|
// wait until a fatal error
|
|
err = client.Wait()
|
|
Printf("Wait:%s error:%v", baseURL.String(), err)
|
|
}
|