实现rtsp拉流播放

This commit is contained in:
李宇翔
2021-07-19 20:07:01 +08:00
parent a2f5cb87b1
commit f68a3ee14b
4 changed files with 164 additions and 47 deletions

2
go.mod
View File

@@ -3,7 +3,7 @@ module github.com/Monibuca/plugin-rtsp/v3
go 1.16
require (
github.com/Monibuca/engine/v3 v3.1.4
github.com/Monibuca/engine/v3 v3.2.0
github.com/Monibuca/utils/v3 v3.0.0
github.com/pion/rtp v1.6.5
github.com/teris-io/shortid v0.0.0-20201117134242-e59966efd125

4
go.sum
View File

@@ -1,7 +1,7 @@
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/Monibuca/engine/v3 v3.1.4 h1:1IuBIzCegBdwqHNKb6jD0IKtUU5P/uAd3G6fnCKfNac=
github.com/Monibuca/engine/v3 v3.1.4/go.mod h1:yz6cssED2VlYu+g/LrxseBB9pcvsLM/o2QXa4gVY650=
github.com/Monibuca/engine/v3 v3.2.0 h1:gAaw/5NFKvC1w7e1xP4IddP5gdC7Puz75hwwoZmzEeE=
github.com/Monibuca/engine/v3 v3.2.0/go.mod h1:yz6cssED2VlYu+g/LrxseBB9pcvsLM/o2QXa4gVY650=
github.com/Monibuca/utils/v3 v3.0.0 h1:i8qCXQPQpRPgjuXKu5C2PYiL5LYzB6GW4xE162mB2ug=
github.com/Monibuca/utils/v3 v3.0.0/go.mod h1:RpNS95gapWs6gimwh8Xn2x72FN5tO7Powabj7dTFyvE=
github.com/cnotch/apirouter v0.0.0-20200731232942-89e243a791f3/go.mod h1:5deJPLON/x/s2dLOQfuKS0lenhOIT4xX0pvtN/OEIuY=

View File

@@ -3,6 +3,7 @@ package rtsp
import (
"bytes"
"crypto/md5"
"encoding/base64"
"encoding/binary"
"fmt"
"io"
@@ -11,10 +12,12 @@ import (
"strconv"
"strings"
"time"
"unsafe"
. "github.com/Monibuca/engine/v3"
. "github.com/Monibuca/utils/v3"
"github.com/pion/rtp"
"github.com/pion/rtp/codecs"
"github.com/teris-io/shortid"
)
@@ -72,6 +75,10 @@ func (session *RTSP) SessionString() string {
}
func (session *RTSP) Stop() {
if session.Stream != nil {
session.Close()
collection.Delete(session.StreamPath)
}
if session.Conn != nil {
session.connRW.Flush()
session.Conn.Close()
@@ -85,10 +92,6 @@ func (session *RTSP) Stop() {
session.UDPServer.Stop()
session.UDPServer = nil
}
session.Close()
if session.Stream != nil {
collection.Delete(session.StreamPath)
}
}
// AcceptPush 接受推流
@@ -254,6 +257,7 @@ func (session *RTSP) handleRequest(req *Request) {
//}
Printf("<<<\n%s", req)
res := NewResponse(200, "OK", req.Header["CSeq"], session.ID, "")
var streamPath string
defer func() {
if p := recover(); p != nil {
Printf("handleRequest err ocurs:%v", p)
@@ -271,6 +275,63 @@ func (session *RTSP) handleRequest(req *Request) {
case "PLAY", "RECORD":
switch session.Type {
case SESSEION_TYPE_PLAYER:
sub := Subscriber{
ID: session.ID,
Type: "RTSP",
}
if sub.Subscribe(streamPath) == nil {
at, vt := session.UDPClient.AT, session.UDPClient.VT
if vt != nil {
var st uint32
onVideo := func(pack VideoPack) {
if session.UDPClient == nil {
return
}
for _, nalu := range pack.NALUs {
for _, pack := range session.UDPClient.VPacketizer.Packetize(nalu, (pack.Timestamp-st)*90) {
p := &RTPPack{
Type: RTP_TYPE_VIDEO,
Packet: *pack,
}
p.Raw, _ = p.Marshal()
session.SendRTP(p)
}
}
st = pack.Timestamp
}
sub.OnVideo = func(pack VideoPack) {
if st = pack.Timestamp; st != 0 {
sub.OnVideo = onVideo
}
onVideo(pack)
}
}
if at != nil {
tb := uint32(at.SoundRate / 1000)
var st uint32
onAudio := func(pack AudioPack) {
if session.UDPClient == nil {
return
}
for _, pack := range session.UDPClient.APacketizer.Packetize(pack.Payload, (pack.Timestamp-st)*tb) {
p := &RTPPack{
Type: RTP_TYPE_VIDEO,
Packet: *pack,
}
p.Raw, _ = p.Marshal()
session.SendRTP(p)
}
st = pack.Timestamp
}
sub.OnAudio = func(pack AudioPack) {
if st = pack.Timestamp; st != 0 {
sub.OnAudio = onAudio
}
onAudio(pack)
}
}
go sub.Play(at, vt)
}
// if session.Pusher.HasPlayer(session.Player) {
// session.Player.Pause(false)
// } else {
@@ -288,6 +349,14 @@ func (session *RTSP) handleRequest(req *Request) {
session.Stop()
}
}()
session.URL = req.URL
_url, err := url.Parse(req.URL)
if err != nil {
res.StatusCode = 500
res.Status = "Invalid URL"
return
}
streamPath = strings.TrimPrefix(_url.Path, "/")
if req.Method != "OPTIONS" {
if session.Auth != nil {
authLine := req.Header["Authorization"]
@@ -305,7 +374,7 @@ func (session *RTSP) handleRequest(req *Request) {
res.Status = "Unauthorized"
nonce := fmt.Sprintf("%x", md5.Sum([]byte(shortid.MustGenerate())))
session.nonce = nonce
res.Header["WWW-Authenticate"] = fmt.Sprintf(`Digest realm="EasyDarwin", nonce="%s", algorithm="MD5"`, nonce)
res.Header["WWW-Authenticate"] = fmt.Sprintf(`Digest realm="Monibuca", nonce="%s", algorithm="MD5"`, nonce)
return
}
}
@@ -315,24 +384,9 @@ func (session *RTSP) handleRequest(req *Request) {
res.Header["Public"] = "DESCRIBE, SETUP, TEARDOWN, PLAY, PAUSE, OPTIONS, ANNOUNCE, RECORD"
case "ANNOUNCE":
session.Type = SESSION_TYPE_PUSHER
session.URL = req.URL
url, err := url.Parse(req.URL)
if err != nil {
res.StatusCode = 500
res.Status = "Invalid URL"
return
}
streamPath := strings.TrimPrefix(url.Path, "/")
session.SDPRaw = req.Body
session.SDPMap = ParseSDP(req.Body)
stream := &Stream{
StreamPath: streamPath,
Type: "RTSP",
}
session.Stream = stream
if session.Publish() {
if session.Stream = Publish(streamPath, "RTSP"); session.Stream != nil {
if session.ASdp, session.HasAudio = session.SDPMap["audio"]; session.HasAudio {
session.setAudioTrack()
Printf("audio codec[%s]\n", session.ASdp.Codec)
@@ -346,20 +400,64 @@ func (session *RTSP) handleRequest(req *Request) {
}
case "DESCRIBE":
session.Type = SESSEION_TYPE_PLAYER
session.URL = req.URL
url, err := url.Parse(req.URL)
if err != nil {
res.StatusCode = 500
res.Status = "Invalid URL"
return
}
streamPath := url.Path
stream := FindStream(streamPath)
if stream == nil {
res.StatusCode = 404
res.Status = "No Such Stream:" + streamPath
return
}
//
//res.SetBody(session.SDPRaw)
sdpInfo := []string{
"v=0",
fmt.Sprintf("o=%s 0 0 IN IP4 %d", session.ID, 0),
"s=monibuca",
"t=0 0",
"a=recvonly",
}
ssrc := uintptr(unsafe.Pointer(stream))
if session.UDPClient == nil {
session.UDPClient = &UDPClient{
Conn: session.Conn.Conn,
}
}
vt, at := stream.WaitVideoTrack(), stream.WaitAudioTrack()
if vt != nil {
session.UDPClient.VT = vt
sdpInfo = append(sdpInfo, "m=video 0 RTP/AVP 96")
switch vt.CodecID {
case 7:
sps := base64.StdEncoding.EncodeToString(vt.ExtraData.NALUs[0])
pps := base64.StdEncoding.EncodeToString(vt.ExtraData.NALUs[1])
session.UDPClient.VPacketizer = rtp.NewPacketizer(1200, 96, uint32(ssrc), &codecs.H264Payloader{}, rtp.NewFixedSequencer(1), 90000)
sdpInfo = append(sdpInfo, "a=rtpmap:96 H264/90000",
fmt.Sprintf("a=fmtp:96 profile-level-id=%02X00%02X; packetization-mode=1; sprop-parameter-sets=%s,%s", vt.SPSInfo.ProfileIdc, vt.SPSInfo.LevelIdc*10, sps, pps))
case 12:
vps := base64.StdEncoding.EncodeToString(vt.ExtraData.NALUs[0])
sps := base64.StdEncoding.EncodeToString(vt.ExtraData.NALUs[1])
pps := base64.StdEncoding.EncodeToString(vt.ExtraData.NALUs[2])
// TODO:
// session.UDPClient.VPacketizer = rtp.NewPacketizer(1200, 96, uint32(ssrc), &codecs.H265Payloader{}, rtp.NewFixedSequencer(1), 90000)
sdpInfo = append(sdpInfo, "a=rtpmap:96 H265/90000",
fmt.Sprintf("a=fmtp:96 packetization-mode=1;sprop-vps=%s;sprop-sps=%s;sprop-pps=%s", vps, sps, pps))
}
}
if at != nil {
sdpInfo = append(sdpInfo, "m=audio 0 RTP/AVP 97")
switch at.CodecID {
case 7:
sdpInfo = append(sdpInfo, "a=rtpmap:97 PCMA/8000")
session.UDPClient.APacketizer = rtp.NewPacketizer(1200, 97, uint32(ssrc), &codecs.G711Payloader{}, rtp.NewFixedSequencer(1), 8000)
session.UDPClient.AT = at
case 8:
sdpInfo = append(sdpInfo, "a=rtpmap:97 PCMU/8000")
session.UDPClient.APacketizer = rtp.NewPacketizer(1200, 97, uint32(ssrc), &codecs.G711Payloader{}, rtp.NewFixedSequencer(1), 8000)
session.UDPClient.AT = at
case 10:
// TODO:
sdpInfo = append(sdpInfo, fmt.Sprintf("a=rtpmap:97 MPEG4-GENERIC/%d/%d", at.SoundRate, at.Channels))
}
}
session.SDPRaw = strings.Join(sdpInfo, "\r\n") + "\r\n"
res.SetBody(session.SDPRaw)
case "SETUP":
ts := req.Header["Transport"]
// control字段可能是`stream=1`字样也可能是rtsp://...字样。即control可能是url的path也可能是整个url
@@ -369,16 +467,10 @@ func (session *RTSP) handleRequest(req *Request) {
// a=control:rtsp://192.168.1.64/trackID=1
// 例3
// a=control:?ctype=video
setupUrl, err := url.Parse(req.URL)
if err != nil {
res.StatusCode = 500
res.Status = "Invalid URL"
return
if _url.Port() == "" {
_url.Host = fmt.Sprintf("%s:554", _url.Host)
}
if setupUrl.Port() == "" {
setupUrl.Host = fmt.Sprintf("%s:554", setupUrl.Host)
}
setupPath := setupUrl.String()
setupPath := _url.String()
// error status. SETUP without ANNOUNCE or DESCRIBE.
//if session.Pusher == nil {
@@ -508,6 +600,25 @@ func (session *RTSP) handleRequest(req *Request) {
ts = strings.Join(tss, ";")
}
} else {
if session.Type == SESSEION_TYPE_PLAYER {
if session.UDPClient.VPort == 0 {
session.UDPClient.VPort, _ = strconv.Atoi(udpMatchs[1])
session.UDPClient.VControlPort, _ = strconv.Atoi(udpMatchs[3])
if err := session.UDPClient.SetupVideo(); err != nil {
res.StatusCode = 500
res.Status = fmt.Sprintf("udp client setup video error, %v", err)
return
}
} else {
session.UDPClient.APort, _ = strconv.Atoi(udpMatchs[1])
session.UDPClient.AControlPort, _ = strconv.Atoi(udpMatchs[3])
if err := session.UDPClient.SetupAudio(); err != nil {
res.StatusCode = 500
res.Status = fmt.Sprintf("udp client setup audio error, %v", err)
return
}
}
}
Printf("SETUP [UDP] got UnKown control:%s", setupPath)
}
}

View File

@@ -5,10 +5,13 @@ import (
"net"
"strings"
. "github.com/Monibuca/engine/v3"
. "github.com/Monibuca/utils/v3"
"github.com/pion/rtp"
)
type UDPClient struct {
Conn net.Conn
APort int
AConn *net.UDPConn
AControlPort int
@@ -17,8 +20,11 @@ type UDPClient struct {
VConn *net.UDPConn
VControlPort int
VControlConn *net.UDPConn
Stoped bool
AT *AudioTrack
APacketizer rtp.Packetizer
VT *VideoTrack
VPacketizer rtp.Packetizer
Stoped bool
}
func (s *UDPClient) Stop() {
@@ -51,7 +57,7 @@ func (c *UDPClient) SetupAudio() (err error) {
c.Stop()
}
}()
host := c.AConn.RemoteAddr().String()
host := c.Conn.RemoteAddr().String()
host = host[:strings.LastIndex(host, ":")]
addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", host, c.APort))
if err != nil {
@@ -93,7 +99,7 @@ func (c *UDPClient) SetupVideo() (err error) {
c.Stop()
}
}()
host := c.VConn.RemoteAddr().String()
host := c.Conn.RemoteAddr().String()
host = host[:strings.LastIndex(host, ":")]
addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", host, c.VPort))
if err != nil {