Files
plugin-rtsp/session.go
langhuihui 2e39eabcba 适配3.0
2021-02-17 21:54:37 +08:00

624 lines
17 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package rtsp
import (
"bytes"
"crypto/md5"
"encoding/binary"
"fmt"
"io"
"net/url"
"regexp"
"strconv"
"strings"
"time"
. "github.com/Monibuca/engine/v3"
. "github.com/Monibuca/utils/v3"
"github.com/Monibuca/utils/v3/codec"
"github.com/pion/rtp"
"github.com/teris-io/shortid"
)
// RTPPack is an RTP packet tagged with the kind of channel it travels on.
type RTPPack struct {
// Type marks the packet as audio, video, audio-control or video-control.
Type RTPType
// Packet is the embedded pion/rtp packet; its fields and methods are
// promoted onto RTPPack.
rtp.Packet
}
// SessionType identifies the role a session plays for a stream.
type SessionType int

// Session roles. The spelling of SESSEION_TYPE_PLAYER is part of the public
// name and is kept as is.
const (
	SESSION_TYPE_PUSHER SessionType = iota
	SESSEION_TYPE_PLAYER
)

// String returns "pusher" or "player" for the known roles and "unknow" for
// every other value.
func (st SessionType) String() string {
	if st == SESSION_TYPE_PUSHER {
		return "pusher"
	}
	if st == SESSEION_TYPE_PLAYER {
		return "player"
	}
	return "unknow"
}
// RTPType classifies a packet by the channel it belongs to.
type RTPType int

// Channel kinds, numbered 0 to 3 in declaration order.
const (
	RTP_TYPE_AUDIO        RTPType = 0
	RTP_TYPE_VIDEO        RTPType = 1
	RTP_TYPE_AUDIOCONTROL RTPType = 2
	RTP_TYPE_VIDEOCONTROL RTPType = 3
)
// TransType is the transport a session uses for its media packets.
type TransType int

// Supported transports.
const (
	TRANS_TYPE_TCP TransType = iota
	TRANS_TYPE_UDP
)

// String returns "TCP" or "UDP" for the known transports and "unknow" for
// every other value.
func (tt TransType) String() string {
	name := "unknow"
	if tt == TRANS_TYPE_TCP {
		name = "TCP"
	} else if tt == TRANS_TYPE_UDP {
		name = "UDP"
	}
	return name
}
// UDP_BUF_SIZE is 1 MiB (1048576 bytes).
// NOTE(review): presumably the UDP socket buffer size; its users are not in this part of the file.
const UDP_BUF_SIZE = 1 << 20
// SessionString describes the session as
// "session[type][transport][streamPath][id][remoteAddr]".
//
// Stop sets Conn to nil, so the remote address is left empty for a stopped
// session instead of dereferencing a nil connection.
func (session *RTSP) SessionString() string {
	remoteAddr := ""
	if session.Conn != nil {
		remoteAddr = session.Conn.RemoteAddr().String()
	}
	return fmt.Sprintf("session[%v][%v][%s][%s][%s]", session.Type, session.TransType, session.StreamPath, session.ID, remoteAddr)
}
// Stop tears the session down: it flushes and closes the TCP connection,
// stops the UDP client and server if they exist, calls Close, and removes the
// session's stream path from collection when a stream is attached.
// Each released resource is set to nil so a later call skips it.
func (session *RTSP) Stop() {
	if conn := session.Conn; conn != nil {
		session.connRW.Flush()
		conn.Close()
		session.Conn = nil
	}
	if client := session.UDPClient; client != nil {
		client.Stop()
		session.UDPClient = nil
	}
	if server := session.UDPServer; server != nil {
		server.Stop()
		session.UDPServer = nil
	}
	session.Close()
	if session.Stream == nil {
		return
	}
	collection.Delete(session.StreamPath)
}
// AcceptPush accepts a pushed stream: it reads the session's connection until
// a read or parse error occurs and then stops the session.
//
// Two kinds of input share the connection:
//   - interleaved binary frames: a 0x24 ('$') byte, a one-byte channel id, a
//     two-byte big-endian length and that many payload bytes;
//   - textual RTSP requests, read up to the blank line that ends the header
//     (plus a body when a content length is given) and passed to
//     handleRequest.
func (session *RTSP) AcceptPush() {
	defer session.Stop()
	lenBuf := make([]byte, 2)
	// lastLog throttles the "Recv ... RTP package" messages to one per 30s;
	// audio and video share it.
	lastLog := time.Unix(0, 0)
	for {
		first, err := session.connRW.ReadByte()
		if err != nil {
			Println(err)
			return
		}

		if first != 0x24 { // rtsp cmd
			reqBuf := bytes.NewBuffer(nil)
			reqBuf.WriteByte(first)
			for {
				line, isPrefix, err := session.connRW.ReadLine()
				if err != nil {
					Println(err)
					return
				}
				reqBuf.Write(line)
				if !isPrefix {
					reqBuf.WriteString("\r\n")
				}
				if len(line) != 0 {
					continue
				}
				// An empty line ends the request header.
				req := NewRequest(reqBuf.String())
				if req == nil {
					break
				}
				session.InBytes += reqBuf.Len()
				contentLen := req.GetContentLength()
				session.InBytes += contentLen
				if contentLen > 0 {
					bodyBuf := make([]byte, contentLen)
					// io.ReadFull only returns a nil error after filling the
					// whole buffer, so no separate short-read check is needed.
					if _, err := io.ReadFull(session.connRW, bodyBuf); err != nil {
						Println(err)
						return
					}
					req.Body = string(bodyBuf)
				}
				session.handleRequest(req)
				break
			}
			continue
		}

		// rtp data
		channelByte, err := session.connRW.ReadByte()
		if err != nil {
			Println(err)
			return
		}
		if _, err := io.ReadFull(session.connRW, lenBuf); err != nil {
			Println(err)
			return
		}
		channel := int(channelByte)
		rtpLen := int(binary.BigEndian.Uint16(lenBuf))
		pack := new(RTPPack)
		// A fresh buffer per frame: the unmarshalled payload is handed to the
		// track below.
		rtpBytes := make([]byte, rtpLen)
		if _, err := io.ReadFull(session.connRW, rtpBytes); err != nil {
			Println(err)
			return
		}
		// NOTE(review): control-channel frames are parsed as RTP here too, and
		// a parse failure ends the session; confirm RTCP packets always parse.
		if err = pack.Unmarshal(rtpBytes); err != nil {
			Println(err)
			return
		}
		switch channel {
		case session.aRTPChannel:
			pack.Type = RTP_TYPE_AUDIO
			if time.Since(lastLog) >= 30*time.Second {
				Println("Recv an audio RTP package")
				lastLog = time.Now()
			}
			session.OriginAudioTrack.Push(pack.Timestamp, pack.Payload)
		case session.aRTPControlChannel:
			pack.Type = RTP_TYPE_AUDIOCONTROL
		case session.vRTPChannel:
			pack.Type = RTP_TYPE_VIDEO
			if time.Since(lastLog) >= 30*time.Second {
				Println("Recv an video RTP package")
				lastLog = time.Now()
			}
			session.OriginVideoTrack.Push(pack.Timestamp, pack.Payload)
		case session.vRTPControlChannel:
			pack.Type = RTP_TYPE_VIDEOCONTROL
		default:
			// pack.Type is still its zero value here, so report the channel
			// id that failed to match instead.
			Printf("unknown rtp channel[%d]", channel)
			continue
		}
		session.InBytes += rtpLen + 4
	}
}
// Patterns extracting the quoted parameters of a Digest Authorization header.
// They are compiled once instead of on every CheckAuth call.
var (
	authRealmRex = regexp.MustCompile(`realm="(.*?)"`)
	// \b stops the pattern from matching inside cnonce="...", which would
	// otherwise win whenever a client lists cnonce ahead of nonce.
	authNonceRex    = regexp.MustCompile(`\bnonce="(.*?)"`)
	authUsernameRex = regexp.MustCompile(`username="(.*?)"`)
	authResponseRex = regexp.MustCompile(`response="(.*?)"`)
	authURIRex      = regexp.MustCompile(`uri="(.*?)"`)
)

// digestParam returns the captured value of rex in authLine and whether the
// pattern matched.
func digestParam(rex *regexp.Regexp, authLine string) (string, bool) {
	m := rex.FindStringSubmatch(authLine)
	if len(m) != 2 {
		return "", false
	}
	return m[1], true
}

// CheckAuth validates a Digest Authorization header value for the given RTSP
// method.
//
// authLine must carry realm, nonce, username, response and uri parameters,
// and the nonce must equal the one stored on the session. The expected
// response is MD5(MD5(username:realm:password):nonce:MD5(method:uri)), with
// the password obtained from session.Auth(username).
//
// It returns nil when the response matches and a descriptive error otherwise.
func (session *RTSP) CheckAuth(authLine string, method string) error {
	realm, ok := digestParam(authRealmRex, authLine)
	if !ok {
		return fmt.Errorf("CheckAuth error : no realm found")
	}
	nonce, ok := digestParam(authNonceRex, authLine)
	if !ok {
		return fmt.Errorf("CheckAuth error : no nonce found")
	}
	if session.nonce != nonce {
		return fmt.Errorf("CheckAuth error : sessionNonce not same as nonce")
	}
	username, ok := digestParam(authUsernameRex, authLine)
	if !ok {
		return fmt.Errorf("CheckAuth error : username not found")
	}
	response, ok := digestParam(authResponseRex, authLine)
	if !ok {
		return fmt.Errorf("CheckAuth error : response not found")
	}
	uri, ok := digestParam(authURIRex, authLine)
	if !ok {
		return fmt.Errorf("CheckAuth error : uri not found")
	}
	// Without a password lookup nothing can be verified; fail instead of
	// calling a nil function.
	if session.Auth == nil {
		return fmt.Errorf("CheckAuth error : no auth handler configured")
	}

	hexMD5 := func(s string) string {
		return fmt.Sprintf("%x", md5.Sum([]byte(s)))
	}
	md5UserRealmPwd := hexMD5(fmt.Sprintf("%s:%s:%s", username, realm, session.Auth(username)))
	md5MethodURL := hexMD5(fmt.Sprintf("%s:%s", method, uri))
	myResponse := hexMD5(fmt.Sprintf("%s:%s:%s", md5UserRealmPwd, nonce, md5MethodURL))
	if myResponse != response {
		return fmt.Errorf("CheckAuth error : response not equal")
	}
	return nil
}
// Transport header patterns used by SETUP, compiled once instead of on every
// request.
var (
	setupInterleavedRex = regexp.MustCompile("interleaved=(\\d+)(-(\\d+))?")
	setupClientPortRex  = regexp.MustCompile("client_port=(\\d+)(-(\\d+))?")
)

// setupControlPath returns an SDP control attribute in the form SETUP URLs
// are compared against: a full rtsp:// URL gets the default port 554 added
// when it has none, anything else is returned unchanged.
func setupControlPath(control string) (string, error) {
	if !strings.HasPrefix(strings.ToLower(control), "rtsp://") {
		return control, nil
	}
	u, err := url.Parse(control)
	if err != nil {
		return "", err
	}
	if u.Port() == "" {
		u.Host = fmt.Sprintf("%s:554", u.Host)
	}
	return u.String(), nil
}

// setupTargets reports whether the SETUP URL addresses the track with the
// given control path, i.e. equals it or ends with it.
func setupTargets(setupPath, control string) bool {
	if control == "" {
		return setupPath == ""
	}
	// HasSuffix instead of comparing LastIndex with the length difference:
	// that comparison is also true (-1 == -1) when control is absent from
	// setupPath and exactly one byte longer than it.
	return strings.HasSuffix(setupPath, control)
}

// setupAddServerPort inserts serverPort into a ';'-separated Transport value
// right after the last parameter equal to clientPort (ignoring surrounding
// spaces). If no parameter matches, serverPort is appended at the end so the
// transport specifier stays first.
func setupAddServerPort(transport, clientPort, serverPort string) string {
	params := strings.Split(transport, ";")
	at := -1
	for i, p := range params {
		if strings.TrimSpace(p) == clientPort {
			at = i
		}
	}
	if at < 0 {
		return strings.Join(append(params, serverPort), ";")
	}
	out := make([]string, 0, len(params)+1)
	out = append(out, params[:at+1]...)
	out = append(out, serverPort)
	out = append(out, params[at+1:]...)
	return strings.Join(out, ";")
}

// handleRequest processes one RTSP request and writes the response.
//
// The response starts as 200 OK and is adjusted by the method handlers. A
// deferred function always sends it, converts a panic into a 500 response,
// stops the session on TEARDOWN, and stops it on any status other than 200
// and 401.
func (session *RTSP) handleRequest(req *Request) {
	//if session.Timeout > 0 {
	//	session.Conn.SetDeadline(time.Now().Add(time.Duration(session.Timeout) * time.Second))
	//}
	Printf("<<<\n%s", req)
	res := NewResponse(200, "OK", req.Header["CSeq"], session.ID, "")
	defer func() {
		if p := recover(); p != nil {
			Printf("handleRequest err ocurs:%v", p)
			res.StatusCode = 500
			res.Status = fmt.Sprintf("Inner Server Error, %v", p)
		}
		Printf(">>>\n%s", res)
		outBytes := []byte(res.String())
		session.connWLock.Lock()
		session.connRW.Write(outBytes)
		session.connRW.Flush()
		session.connWLock.Unlock()
		session.OutBytes += len(outBytes)
		switch req.Method {
		case "PLAY", "RECORD":
			// Player attachment is currently disabled:
			// if session.Pusher.HasPlayer(session.Player) {
			// 	session.Player.Pause(false)
			// } else {
			// 	session.Pusher.AddPlayer(session.Player)
			// }
		case "TEARDOWN":
			session.Stop()
			return
		}
		if res.StatusCode != 200 && res.StatusCode != 401 {
			Printf("Response request error[%d]. stop session.", res.StatusCode)
			session.Stop()
		}
	}()

	// Every method except OPTIONS must pass digest authentication when an
	// Auth callback is configured. A failure issues a fresh nonce challenge.
	if req.Method != "OPTIONS" && session.Auth != nil {
		authLine := req.Header["Authorization"]
		authFailed := true
		if authLine != "" {
			if err := session.CheckAuth(authLine, req.Method); err == nil {
				authFailed = false
			} else {
				Printf("%v", err)
			}
		}
		if authFailed {
			res.StatusCode = 401
			res.Status = "Unauthorized"
			nonce := fmt.Sprintf("%x", md5.Sum([]byte(shortid.MustGenerate())))
			session.nonce = nonce
			res.Header["WWW-Authenticate"] = fmt.Sprintf(`Digest realm="EasyDarwin", nonce="%s", algorithm="MD5"`, nonce)
			return
		}
	}

	switch req.Method {
	case "OPTIONS":
		res.Header["Public"] = "DESCRIBE, SETUP, TEARDOWN, PLAY, PAUSE, OPTIONS, ANNOUNCE, RECORD"
	case "ANNOUNCE":
		session.Type = SESSION_TYPE_PUSHER
		session.URL = req.URL
		u, err := url.Parse(req.URL)
		if err != nil {
			res.StatusCode = 500
			res.Status = "Invalid URL"
			return
		}
		streamPath := strings.TrimPrefix(u.Path, "/")
		session.SDPRaw = req.Body
		session.SDPMap = ParseSDP(req.Body)
		if session.Publish(streamPath) {
			if session.ASdp, session.HasAudio = session.SDPMap["audio"]; session.HasAudio {
				at := NewAudioTrack()
				// NOTE(review): the test is on Control but the value used is
				// Config; confirm this is intended.
				if len(session.ASdp.Control) > 0 {
					at.SetASC(session.ASdp.Config)
				} else {
					session.setAudioFormat(at)
				}
				session.SetOriginAT(at)
				Printf("audio codec[%s]\n", session.ASdp.Codec)
			}
			if session.VSdp, session.HasVideo = session.SDPMap["video"]; session.HasVideo {
				if len(session.VSdp.SpropParameterSets) > 1 {
					vt := NewVideoTrack()
					vt.CodecID = 7
					vt.Push(0, session.VSdp.SpropParameterSets[0])
					vt.Push(0, session.VSdp.SpropParameterSets[1])
					// The track is still registered on a parse failure, but
					// the failure is no longer silent.
					if vt.SPSInfo, err = codec.ParseSPS(vt.SPS); err != nil {
						Printf("parse sps error: %v", err)
					}
					session.SetOriginVT(vt)
				}
				Printf("video codec[%s]\n", session.VSdp.Codec)
			}
			session.Stream.Type = "RTSP"
			collection.Store(streamPath, session)
		}
	case "DESCRIBE":
		session.Type = SESSEION_TYPE_PLAYER
		session.URL = req.URL
		u, err := url.Parse(req.URL)
		if err != nil {
			res.StatusCode = 500
			res.Status = "Invalid URL"
			return
		}
		// Streams are published under the path without its leading slash
		// (see ANNOUNCE), so look them up the same way.
		streamPath := strings.TrimPrefix(u.Path, "/")
		if FindStream(streamPath) == nil {
			res.StatusCode = 404
			res.Status = "Not Found"
			return
		}
		//
		//res.SetBody(session.SDPRaw)
	case "SETUP":
		ts := req.Header["Transport"]
		// The control attribute may be a relative value such as `stream=1`
		// or a complete rtsp:// URL, i.e. either a URL path or a whole URL.
		// Example 1
		// a=control:streamid=1
		// Example 2
		// a=control:rtsp://192.168.1.64/trackID=1
		// Example 3
		// a=control:?ctype=video
		setupUrl, err := url.Parse(req.URL)
		if err != nil {
			res.StatusCode = 500
			res.Status = "Invalid URL"
			return
		}
		if setupUrl.Port() == "" {
			setupUrl.Host = fmt.Sprintf("%s:554", setupUrl.Host)
		}
		setupPath := setupUrl.String()
		// error status. SETUP without ANNOUNCE or DESCRIBE.
		//if session.Pusher == nil {
		//	res.StatusCode = 500
		//	res.Status = "Error Status"
		//	return
		//}
		var vPath, aPath string
		if session.HasVideo {
			if vPath, err = setupControlPath(session.VSdp.Control); err != nil {
				res.StatusCode = 500
				res.Status = "Invalid VControl"
				return
			}
		}
		if session.HasAudio {
			if aPath, err = setupControlPath(session.ASdp.Control); err != nil {
				res.StatusCode = 500
				res.Status = "Invalid AControl"
				return
			}
		}
		if tcpMatchs := setupInterleavedRex.FindStringSubmatch(ts); tcpMatchs != nil {
			session.TransType = TRANS_TYPE_TCP
			// Atoi errors are ignored: the pattern only captures digits, and
			// an omitted second channel leaves the control channel at 0.
			switch {
			case setupTargets(setupPath, aPath):
				session.aRTPChannel, _ = strconv.Atoi(tcpMatchs[1])
				session.aRTPControlChannel, _ = strconv.Atoi(tcpMatchs[3])
			case setupTargets(setupPath, vPath):
				session.vRTPChannel, _ = strconv.Atoi(tcpMatchs[1])
				session.vRTPControlChannel, _ = strconv.Atoi(tcpMatchs[3])
			default:
				res.StatusCode = 500
				res.Status = fmt.Sprintf("SETUP [TCP] got UnKown control:%s", setupPath)
				Printf("SETUP [TCP] got UnKown control:%s", setupPath)
			}
			Printf("Parse SETUP req.TRANSPORT:TCP.Session.Type:%d,control:%s, AControl:%s,VControl:%s", session.Type, setupPath, aPath, vPath)
		} else if udpMatchs := setupClientPortRex.FindStringSubmatch(ts); udpMatchs != nil {
			session.TransType = TRANS_TYPE_UDP
			// no need for tcp timeout.
			session.Conn.timeout = 0
			if session.Type == SESSEION_TYPE_PLAYER && session.UDPClient == nil {
				session.UDPClient = &UDPClient{}
			}
			if session.Type == SESSION_TYPE_PUSHER && session.UDPServer == nil {
				session.UDPServer = &UDPServer{
					Session: session,
				}
			}
			Printf("Parse SETUP req.TRANSPORT:UDP.Session.Type:%d,control:%s, AControl:%s,VControl:%s", session.Type, setupPath, aPath, vPath)
			switch {
			case setupTargets(setupPath, aPath):
				if session.Type == SESSEION_TYPE_PLAYER {
					session.UDPClient.APort, _ = strconv.Atoi(udpMatchs[1])
					session.UDPClient.AControlPort, _ = strconv.Atoi(udpMatchs[3])
					if err := session.UDPClient.SetupAudio(); err != nil {
						res.StatusCode = 500
						res.Status = fmt.Sprintf("udp client setup audio error, %v", err)
						return
					}
				}
				if session.Type == SESSION_TYPE_PUSHER {
					if err := session.UDPServer.SetupAudio(); err != nil {
						res.StatusCode = 500
						res.Status = fmt.Sprintf("udp server setup audio error, %v", err)
						return
					}
					ts = setupAddServerPort(ts, udpMatchs[0], fmt.Sprintf("server_port=%d-%d", session.UDPServer.APort, session.UDPServer.AControlPort))
				}
			case setupTargets(setupPath, vPath):
				if session.Type == SESSEION_TYPE_PLAYER {
					session.UDPClient.VPort, _ = strconv.Atoi(udpMatchs[1])
					session.UDPClient.VControlPort, _ = strconv.Atoi(udpMatchs[3])
					if err := session.UDPClient.SetupVideo(); err != nil {
						res.StatusCode = 500
						res.Status = fmt.Sprintf("udp client setup video error, %v", err)
						return
					}
				}
				if session.Type == SESSION_TYPE_PUSHER {
					if err := session.UDPServer.SetupVideo(); err != nil {
						res.StatusCode = 500
						res.Status = fmt.Sprintf("udp server setup video error, %v", err)
						return
					}
					ts = setupAddServerPort(ts, udpMatchs[0], fmt.Sprintf("server_port=%d-%d", session.UDPServer.VPort, session.UDPServer.VControlPort))
				}
			default:
				Printf("SETUP [UDP] got UnKown control:%s", setupPath)
			}
		}
		res.Header["Transport"] = ts
	case "PLAY":
		// error status. PLAY without ANNOUNCE or DESCRIBE.
		// if session.Pusher == nil {
		// 	res.StatusCode = 500
		// 	res.Status = "Error Status"
		// 	return
		// }
		res.Header["Range"] = req.Header["Range"]
	case "RECORD":
		// error status. RECORD without ANNOUNCE or DESCRIBE.
		// if session.Pusher == nil {
		// 	res.StatusCode = 500
		// 	res.Status = "Error Status"
		// 	return
		// }
	case "PAUSE":
		// if session.Player == nil {
		// 	res.StatusCode = 500
		// 	res.Status = "Error Status"
		// 	return
		// }
		// session.Player.Pause(true)
	}
}
// SendRTP sends one packet to the peer.
//
// With UDP transport the packet is handed to the session's UDP client. With
// TCP transport it is written to the connection as an interleaved frame:
// 0x24, the channel id chosen by pack.Type, the payload length as a
// big-endian uint16, then pack.Raw.
//
// It returns an error when pack is nil, the UDP client is missing, the packet
// type is unknown, the payload does not fit the 16-bit length field, or a
// write or flush fails.
func (session *RTSP) SendRTP(pack *RTPPack) (err error) {
	if pack == nil {
		return fmt.Errorf("player send rtp got nil pack")
	}
	if session.TransType == TRANS_TYPE_UDP {
		if session.UDPClient == nil {
			return fmt.Errorf("player use udp transport but udp client not found")
		}
		err = session.UDPClient.SendRTP(pack)
		session.OutBytes += len(pack.Raw)
		return err
	}

	var channel int
	switch pack.Type {
	case RTP_TYPE_AUDIO:
		channel = session.aRTPChannel
	case RTP_TYPE_AUDIOCONTROL:
		channel = session.aRTPControlChannel
	case RTP_TYPE_VIDEO:
		channel = session.vRTPChannel
	case RTP_TYPE_VIDEOCONTROL:
		channel = session.vRTPControlChannel
	default:
		return fmt.Errorf("session tcp send rtp got unkown pack type[%v]", pack.Type)
	}

	// A larger payload would wrap the uint16 length and desynchronise the
	// interleaved stream for the receiver.
	if len(pack.Raw) > 0xFFFF {
		return fmt.Errorf("session tcp send rtp payload too large[%d]", len(pack.Raw))
	}

	header := [4]byte{0x24, byte(channel)}
	binary.BigEndian.PutUint16(header[2:], uint16(len(pack.Raw)))

	// The lock keeps header and payload together when several goroutines
	// write to the connection.
	session.connWLock.Lock()
	defer session.connWLock.Unlock()
	if _, err = session.connRW.Write(header[:]); err != nil {
		return err
	}
	if _, err = session.connRW.Write(pack.Raw); err != nil {
		return err
	}
	if err = session.connRW.Flush(); err != nil {
		return err
	}
	session.OutBytes += len(pack.Raw) + 4
	return nil
}