Files
plugin-rtsp/session.go
2020-05-10 18:28:05 +08:00

639 lines
17 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package rtsp
import (
"bytes"
"crypto/md5"
"encoding/binary"
"fmt"
"io"
"net/url"
"regexp"
"strconv"
"strings"
"time"
. "github.com/Monibuca/engine/v2"
"github.com/teris-io/shortid"
)
// RTPPack couples one packet payload with the logical stream it belongs to.
// It is the unit handed to handleRTP on the receiving side and to SendRTP on
// the sending side.
type RTPPack struct {
// Type selects one of the four logical streams (audio, video, or the
// control stream of either).
Type RTPType
// Buffer holds the packet bytes exactly as read from, or to be written to,
// the transport.
Buffer []byte
}
// SessionType records the role a session plays on a stream.
type SessionType int

// Known session roles. The zero value is the pusher role.
const (
	SESSION_TYPE_PUSHER  SessionType = 0
	SESSEION_TYPE_PLAYER SessionType = 1
)

// String returns the lowercase role name used in log output, or "unknow" for
// any value that is not one of the declared roles.
func (st SessionType) String() string {
	if st == SESSION_TYPE_PUSHER {
		return "pusher"
	}
	if st == SESSEION_TYPE_PLAYER {
		return "player"
	}
	return "unknow"
}
// RTPType identifies which logical stream a packet belongs to: the audio or
// video data stream, or the control stream that accompanies either of them.
type RTPType int

// Known packet kinds, numbered from zero in declaration order.
const (
	RTP_TYPE_AUDIO RTPType = iota
	RTP_TYPE_VIDEO
	RTP_TYPE_AUDIOCONTROL
	RTP_TYPE_VIDEOCONTROL
)

// String returns a human-readable name for the packet kind, or "unknow" for
// any value outside the declared set.
func (rt RTPType) String() string {
	names := [...]string{
		RTP_TYPE_AUDIO:        "audio",
		RTP_TYPE_VIDEO:        "video",
		RTP_TYPE_AUDIOCONTROL: "audio control",
		RTP_TYPE_VIDEOCONTROL: "video control",
	}
	if rt < 0 || int(rt) >= len(names) {
		return "unknow"
	}
	return names[rt]
}
// TransType names the transport that carries a session's media packets.
type TransType int

// Known transports. The zero value is TCP.
const (
	TRANS_TYPE_TCP TransType = 0
	TRANS_TYPE_UDP TransType = 1
)

// String returns "TCP" or "UDP", or "unknow" for any other value.
func (tt TransType) String() string {
	switch {
	case tt == TRANS_TYPE_TCP:
		return "TCP"
	case tt == TRANS_TYPE_UDP:
		return "UDP"
	default:
		return "unknow"
	}
}
// UDP_BUF_SIZE is 1 MiB (1048576). It is not referenced in this part of the
// file; presumably it sizes the UDP socket buffers - confirm against the UDP
// client/server code.
const UDP_BUF_SIZE = 1 << 20
// SessionString renders a one-line identifier for the session, intended for
// log output, in the form
//
//	session[<type>][<transport>][<stream path>][<id>][<remote address>]
//
// Stop sets session.Conn to nil, so the remote address is rendered as "<nil>"
// once the connection has been released instead of dereferencing a nil
// connection.
func (session *RTSP) SessionString() string {
	remote := "<nil>"
	if session.Conn != nil {
		remote = session.Conn.RemoteAddr().String()
	}
	return fmt.Sprintf("session[%v][%v][%s][%s][%s]", session.Type, session.TransType, session.StreamPath, session.ID, remote)
}
// Stop releases everything the session holds. Each resource is checked for nil
// and cleared after release, so calling Stop again skips resources that are
// already gone.
//
// In order: the buffered writer is flushed and the connection closed, the UDP
// client and UDP server are stopped, and - only while session.Running()
// reports true - the session is removed from the collection under its stream
// path and Cancel is called.
func (session *RTSP) Stop() {
	if conn := session.Conn; conn != nil {
		// Push out anything still buffered before the socket goes away.
		session.connRW.Flush()
		conn.Close()
		session.Conn = nil
	}
	if client := session.UDPClient; client != nil {
		client.Stop()
		session.UDPClient = nil
	}
	if server := session.UDPServer; server != nil {
		server.Stop()
		session.UDPServer = nil
	}
	if !session.Running() {
		return
	}
	collection.Delete(session.StreamPath)
	session.Cancel()
}
// AcceptPush accepts a pushed stream: it serves the session's connection until
// a read fails, then returns. Stop is deferred, so the session's resources are
// released on every exit path.
//
// Each loop iteration looks at one leading byte read from session.connRW:
//
//   - 0x24 ('$') introduces an interleaved binary frame: one channel byte, a
//     two-byte big-endian payload length, then the payload. The channel is
//     mapped to an RTPType through the channel numbers stored on the session
//     and the packet is passed to handleRTP. Frames on a channel the session
//     does not know are logged and dropped.
//   - any other byte is the first byte of a textual request. Header lines are
//     collected up to the first empty line, parsed with NewRequest, the body
//     (if GetContentLength is positive) is read, and the request is passed to
//     handleRequest.
func (session *RTSP) AcceptPush() {
	defer session.Stop()
	lenBuf := make([]byte, 2)
	// lastLog throttles the "Recv ... RTP package" messages to one per 30
	// seconds. Audio and video share the same timestamp.
	lastLog := time.Unix(0, 0)
	for {
		first, err := session.connRW.ReadByte()
		if err != nil {
			Println(err)
			return
		}

		if first == 0x24 { // interleaved rtp data
			chanByte, err := session.connRW.ReadByte()
			if err != nil {
				Println(err)
				return
			}
			if _, err := io.ReadFull(session.connRW, lenBuf); err != nil {
				Println(err)
				return
			}
			channel := int(chanByte)
			rtpLen := int(binary.BigEndian.Uint16(lenBuf))
			rtpBytes := make([]byte, rtpLen)
			if _, err := io.ReadFull(session.connRW, rtpBytes); err != nil {
				Println(err)
				return
			}

			// Case order matters when several channel fields hold the same
			// number: the first matching case wins.
			var packType RTPType
			switch channel {
			case session.aRTPChannel:
				packType = RTP_TYPE_AUDIO
				if time.Since(lastLog) >= 30*time.Second {
					Println("Recv an audio RTP package")
					lastLog = time.Now()
				}
			case session.aRTPControlChannel:
				packType = RTP_TYPE_AUDIOCONTROL
			case session.vRTPChannel:
				packType = RTP_TYPE_VIDEO
				if time.Since(lastLog) >= 30*time.Second {
					Println("Recv an video RTP package")
					lastLog = time.Now()
				}
			case session.vRTPControlChannel:
				packType = RTP_TYPE_VIDEOCONTROL
			default:
				// No packet exists for an unknown channel, so report the
				// channel number; the payload has already been consumed and
				// is simply dropped.
				Printf("unknow rtp pack type, channel[%d]", channel)
				continue
			}

			// +4 accounts for the '$', channel and length bytes of the frame.
			session.InBytes += rtpLen + 4
			session.handleRTP(&RTPPack{Type: packType, Buffer: rtpBytes})
			continue
		}

		// rtsp cmd: rebuild the header text, starting with the byte that was
		// already consumed above.
		reqBuf := bytes.NewBuffer(nil)
		reqBuf.WriteByte(first)
		for {
			line, isPrefix, err := session.connRW.ReadLine()
			if err != nil {
				Println(err)
				return
			}
			reqBuf.Write(line)
			// ReadLine strips the line terminator; restore it unless this is
			// only the first part of an over-long line.
			if !isPrefix {
				reqBuf.WriteString("\r\n")
			}
			if len(line) != 0 {
				continue
			}

			// An empty line ends the header section.
			req := NewRequest(reqBuf.String())
			if req == nil {
				break
			}
			session.InBytes += reqBuf.Len()
			contentLen := req.GetContentLength()
			session.InBytes += contentLen
			if contentLen > 0 {
				bodyBuf := make([]byte, contentLen)
				// io.ReadFull returns a nil error only when the buffer was
				// filled completely, so no separate length check is needed.
				if _, err := io.ReadFull(session.connRW, bodyBuf); err != nil {
					Println(err)
					return
				}
				req.Body = string(bodyBuf)
			}
			session.handleRequest(req)
			break
		}
	}
}
// Patterns that extract the quoted parameters of a Digest Authorization
// header. They are compiled once instead of on every CheckAuth call. The
// leading \b stops a pattern from matching inside a longer parameter name, in
// particular "nonce" inside "cnonce".
var (
	authRealmRex    = regexp.MustCompile(`\brealm="(.*?)"`)
	authNonceRex    = regexp.MustCompile(`\bnonce="(.*?)"`)
	authUsernameRex = regexp.MustCompile(`\busername="(.*?)"`)
	authResponseRex = regexp.MustCompile(`\bresponse="(.*?)"`)
	authURIRex      = regexp.MustCompile(`\buri="(.*?)"`)
)

// CheckAuth validates a Digest Authorization header value for the given
// request method.
//
// authLine must carry quoted realm, nonce, username, response and uri
// parameters, and its nonce must equal the nonce stored on the session. The
// expected response is
//
//	MD5( MD5(username:realm:secret) : nonce : MD5(method:uri) )
//
// with every digest rendered as lowercase hex, where secret is whatever
// session.Auth(username) returns (presumably the user's password - confirm
// against the code that installs Auth). session.Auth must be non-nil.
//
// It returns nil when the header's response equals the expected one, and
// otherwise an error naming the first check that failed.
func (session *RTSP) CheckAuth(authLine string, method string) error {
	// param returns the first capture of re in authLine.
	param := func(re *regexp.Regexp) (string, bool) {
		m := re.FindStringSubmatch(authLine)
		if len(m) != 2 {
			return "", false
		}
		return m[1], true
	}

	realm, ok := param(authRealmRex)
	if !ok {
		return fmt.Errorf("CheckAuth error : no realm found")
	}
	nonce, ok := param(authNonceRex)
	if !ok {
		return fmt.Errorf("CheckAuth error : no nonce found")
	}
	if session.nonce != nonce {
		return fmt.Errorf("CheckAuth error : sessionNonce not same as nonce")
	}
	username, ok := param(authUsernameRex)
	if !ok {
		return fmt.Errorf("CheckAuth error : username not found")
	}
	response, ok := param(authResponseRex)
	if !ok {
		return fmt.Errorf("CheckAuth error : response not found")
	}
	uri, ok := param(authURIRex)
	if !ok {
		return fmt.Errorf("CheckAuth error : uri not found")
	}

	md5UserRealmPwd := fmt.Sprintf("%x", md5.Sum([]byte(fmt.Sprintf("%s:%s:%s", username, realm, session.Auth(username)))))
	md5MethodURL := fmt.Sprintf("%x", md5.Sum([]byte(fmt.Sprintf("%s:%s", method, uri))))
	myResponse := fmt.Sprintf("%x", md5.Sum([]byte(fmt.Sprintf("%s:%s:%s", md5UserRealmPwd, nonce, md5MethodURL))))
	if myResponse != response {
		return fmt.Errorf("CheckAuth error : response not equal")
	}
	return nil
}
// Patterns that pull the channel pair (TCP) or the client port pair (UDP) out
// of a SETUP Transport header. Capture 1 is the first number and capture 3 the
// optional second one. They are compiled once instead of on every SETUP.
var (
	setupInterleavedRex = regexp.MustCompile(`interleaved=(\d+)(-(\d+))?`)
	setupClientPortRex  = regexp.MustCompile(`client_port=(\d+)(-(\d+))?`)
)

// handleRequest processes one parsed RTSP request and writes the response.
//
// A 200 response is prepared up front and adjusted by the method handlers. The
// deferred function always runs last: it converts a panic into a 500, writes
// the response under connWLock, and then stops the session on TEARDOWN or on
// any status other than 200 and 401.
//
// When session.Auth is set, every method except OPTIONS must carry a valid
// Digest Authorization header; otherwise a 401 with a fresh nonce is returned.
//
// Methods handled: OPTIONS, ANNOUNCE, DESCRIBE, SETUP, PLAY. RECORD and PAUSE
// are accepted without any action (their upstream handling is disabled), and
// so is any other method, which receives the default 200.
func (session *RTSP) handleRequest(req *Request) {
	// Refreshing the connection deadline per request is disabled upstream.
	Printf("<<<\n%s", req)
	res := NewResponse(200, "OK", req.Header["CSeq"], session.ID, "")
	defer func() {
		if p := recover(); p != nil {
			Printf("handleRequest err ocurs:%v", p)
			res.StatusCode = 500
			res.Status = fmt.Sprintf("Inner Server Error, %v", p)
		}
		Printf(">>>\n%s", res)
		outBytes := []byte(res.String())
		session.connWLock.Lock()
		session.connRW.Write(outBytes)
		session.connRW.Flush()
		session.connWLock.Unlock()
		session.OutBytes += len(outBytes)
		// PLAY/RECORD need nothing here: attaching a player to its pusher is
		// disabled upstream.
		if req.Method == "TEARDOWN" {
			session.Stop()
			return
		}
		if res.StatusCode != 200 && res.StatusCode != 401 {
			Printf("Response request error[%d]. stop session.", res.StatusCode)
			session.Stop()
		}
	}()

	if req.Method != "OPTIONS" && session.Auth != nil {
		authorized := false
		if authLine := req.Header["Authorization"]; authLine != "" {
			if err := session.CheckAuth(authLine, req.Method); err != nil {
				Printf("%v", err)
			} else {
				authorized = true
			}
		}
		if !authorized {
			res.StatusCode = 401
			res.Status = "Unauthorized"
			// Every challenge carries a new nonce, which CheckAuth compares
			// against on the next attempt.
			nonce := fmt.Sprintf("%x", md5.Sum([]byte(shortid.MustGenerate())))
			session.nonce = nonce
			res.Header["WWW-Authenticate"] = fmt.Sprintf(`Digest realm="EasyDarwin", nonce="%s", algorithm="MD5"`, nonce)
			return
		}
	}

	switch req.Method {
	case "OPTIONS":
		res.Header["Public"] = "DESCRIBE, SETUP, TEARDOWN, PLAY, PAUSE, OPTIONS, ANNOUNCE, RECORD"

	case "ANNOUNCE":
		session.Type = SESSION_TYPE_PUSHER
		session.URL = req.URL
		announceURL, err := url.Parse(req.URL)
		if err != nil {
			res.StatusCode = 500
			res.Status = "Invalid URL"
			return
		}
		streamPath := strings.TrimPrefix(announceURL.Path, "/")
		session.SDPRaw = req.Body
		session.SDPMap = ParseSDP(req.Body)
		if sdp, ok := session.SDPMap["audio"]; ok {
			session.AControl = sdp.Control
			session.ACodec = sdp.Codec
			session.AudioSpecificConfig = sdp.Config
			Printf("audio codec[%s]\n", session.ACodec)
		}
		if sdp, ok := session.SDPMap["video"]; ok {
			session.VControl = sdp.Control
			session.VCodec = sdp.Codec
			// Both parameter sets are required; reject a short list
			// explicitly instead of relying on an index panic being
			// recovered by the deferred function.
			if len(sdp.SpropParameterSets) < 2 {
				res.StatusCode = 500
				res.Status = "Invalid SDP, sprop-parameter-sets must carry SPS and PPS"
				return
			}
			session.SPS = sdp.SpropParameterSets[0]
			session.PPS = sdp.SpropParameterSets[1]
			Printf("video codec[%s]\n", session.VCodec)
		}
		if session.Publisher.Publish(streamPath) {
			session.Stream.Type = "RTSP"
			session.RTSPInfo.StreamInfo = &session.Stream.StreamInfo
			collection.Store(streamPath, session)
		}

	case "DESCRIBE":
		session.Type = SESSEION_TYPE_PLAYER
		session.URL = req.URL
		describeURL, err := url.Parse(req.URL)
		if err != nil {
			res.StatusCode = 500
			res.Status = "Invalid URL"
			return
		}
		// NOTE(review): unlike ANNOUNCE the leading "/" is not trimmed here,
		// and the reply stays 200 with no SDP body whether or not the stream
		// exists (setting the body is disabled upstream) - confirm intent.
		if stream := FindStream(describeURL.Path); stream == nil {
			return
		}

	case "SETUP":
		ts := req.Header["Transport"]
		// The SDP control attribute may be a bare token or a complete
		// rtsp:// URL, i.e. either the tail of the request URL or all of it:
		//   a=control:streamid=1
		//   a=control:rtsp://192.168.1.64/trackID=1
		//   a=control:?ctype=video
		setupURL, err := url.Parse(req.URL)
		if err != nil {
			res.StatusCode = 500
			res.Status = "Invalid URL"
			return
		}
		if setupURL.Port() == "" {
			setupURL.Host = fmt.Sprintf("%s:554", setupURL.Host)
		}
		setupPath := setupURL.String()
		// Rejecting a SETUP that was not preceded by ANNOUNCE or DESCRIBE is
		// disabled upstream.

		// normalizeControl gives an rtsp:// control URL the same explicit
		// default port as setupPath so the two compare equal; any other
		// control value is returned unchanged.
		normalizeControl := func(control string) (string, error) {
			if !strings.HasPrefix(strings.ToLower(control), "rtsp://") {
				return control, nil
			}
			controlURL, err := url.Parse(control)
			if err != nil {
				return "", err
			}
			if controlURL.Port() == "" {
				controlURL.Host = fmt.Sprintf("%s:554", controlURL.Host)
			}
			return controlURL.String(), nil
		}
		// isSetupFor reports whether this SETUP addresses the given control:
		// the URLs are equal, or a non-empty control is a suffix of the URL.
		// HasSuffix replaces a LastIndex comparison that was also true when
		// the control was absent and exactly one byte longer than setupPath.
		isSetupFor := func(control string) bool {
			return setupPath == control || control != "" && strings.HasSuffix(setupPath, control)
		}
		// withServerPort inserts param right after the transport element
		// equal to anchor (the last such element if repeated). If no element
		// matches, param is appended at the end rather than placed in front
		// of the protocol specifier.
		withServerPort := func(transport, anchor, param string) string {
			parts := strings.Split(transport, ";")
			idx := len(parts) - 1
			for i, part := range parts {
				if strings.TrimSpace(part) == anchor {
					idx = i
				}
			}
			out := make([]string, 0, len(parts)+1)
			out = append(out, parts[:idx+1]...)
			out = append(out, param)
			out = append(out, parts[idx+1:]...)
			return strings.Join(out, ";")
		}

		vPath, err := normalizeControl(session.VControl)
		if err != nil {
			res.StatusCode = 500
			res.Status = "Invalid VControl"
			return
		}
		aPath, err := normalizeControl(session.AControl)
		if err != nil {
			res.StatusCode = 500
			res.Status = "Invalid AControl"
			return
		}

		if tcpMatchs := setupInterleavedRex.FindStringSubmatch(ts); tcpMatchs != nil {
			session.TransType = TRANS_TYPE_TCP
			// Atoi errors are ignored on purpose: capture 1 is all digits,
			// and capture 3 is empty when only one channel was given, which
			// leaves the control channel at 0.
			switch {
			case isSetupFor(aPath):
				session.aRTPChannel, _ = strconv.Atoi(tcpMatchs[1])
				session.aRTPControlChannel, _ = strconv.Atoi(tcpMatchs[3])
			case isSetupFor(vPath):
				session.vRTPChannel, _ = strconv.Atoi(tcpMatchs[1])
				session.vRTPControlChannel, _ = strconv.Atoi(tcpMatchs[3])
			default:
				res.StatusCode = 500
				res.Status = fmt.Sprintf("SETUP [TCP] got UnKown control:%s", setupPath)
				Printf("SETUP [TCP] got UnKown control:%s", setupPath)
			}
			Printf("Parse SETUP req.TRANSPORT:TCP.Session.Type:%d,control:%s, AControl:%s,VControl:%s", session.Type, setupPath, aPath, vPath)
		} else if udpMatchs := setupClientPortRex.FindStringSubmatch(ts); udpMatchs != nil {
			session.TransType = TRANS_TYPE_UDP
			// no need for tcp timeout.
			session.Conn.timeout = 0
			if session.Type == SESSEION_TYPE_PLAYER && session.UDPClient == nil {
				session.UDPClient = &UDPClient{}
			}
			if session.Type == SESSION_TYPE_PUSHER && session.UDPServer == nil {
				session.UDPServer = &UDPServer{
					Session: session,
				}
			}
			Printf("Parse SETUP req.TRANSPORT:UDP.Session.Type:%d,control:%s, AControl:%s,VControl:%s", session.Type, setupPath, aPath, vPath)
			switch {
			case isSetupFor(aPath):
				if session.Type == SESSEION_TYPE_PLAYER {
					// Atoi errors ignored as in the TCP branch.
					session.UDPClient.APort, _ = strconv.Atoi(udpMatchs[1])
					session.UDPClient.AControlPort, _ = strconv.Atoi(udpMatchs[3])
					if err := session.UDPClient.SetupAudio(); err != nil {
						res.StatusCode = 500
						res.Status = fmt.Sprintf("udp client setup audio error, %v", err)
						return
					}
				}
				if session.Type == SESSION_TYPE_PUSHER {
					if err := session.UDPServer.SetupAudio(); err != nil {
						res.StatusCode = 500
						res.Status = fmt.Sprintf("udp server setup audio error, %v", err)
						return
					}
					// Report the server's ports next to the client_port
					// parameter in the echoed Transport header.
					ts = withServerPort(ts, udpMatchs[0], fmt.Sprintf("server_port=%d-%d", session.UDPServer.APort, session.UDPServer.AControlPort))
				}
			case isSetupFor(vPath):
				if session.Type == SESSEION_TYPE_PLAYER {
					// Atoi errors ignored as in the TCP branch.
					session.UDPClient.VPort, _ = strconv.Atoi(udpMatchs[1])
					session.UDPClient.VControlPort, _ = strconv.Atoi(udpMatchs[3])
					if err := session.UDPClient.SetupVideo(); err != nil {
						res.StatusCode = 500
						res.Status = fmt.Sprintf("udp client setup video error, %v", err)
						return
					}
				}
				if session.Type == SESSION_TYPE_PUSHER {
					if err := session.UDPServer.SetupVideo(); err != nil {
						res.StatusCode = 500
						res.Status = fmt.Sprintf("udp server setup video error, %v", err)
						return
					}
					ts = withServerPort(ts, udpMatchs[0], fmt.Sprintf("server_port=%d-%d", session.UDPServer.VPort, session.UDPServer.VControlPort))
				}
			default:
				Printf("SETUP [UDP] got UnKown control:%s", setupPath)
			}
		}
		res.Header["Transport"] = ts

	case "PLAY":
		// Rejecting a PLAY that was not preceded by ANNOUNCE or DESCRIBE is
		// disabled upstream.
		res.Header["Range"] = req.Header["Range"]

	case "RECORD":
		// Rejecting a RECORD that was not preceded by ANNOUNCE or DESCRIBE is
		// disabled upstream; nothing else to do.

	case "PAUSE":
		// Pausing the player is disabled upstream; nothing to do.
	}
}
// SendRTP sends one packet to the peer over the session's transport.
//
// With UDP transport the packet is handed to session.UDPClient and that call's
// error is returned. Otherwise the packet is written to the session's
// connection as an interleaved frame: the byte 0x24 ('$'), the channel number
// selected by pack.Type, the payload length as a two-byte big-endian integer,
// then the payload. The frame is written and flushed while holding connWLock,
// so frames from different callers are not interleaved with each other.
//
// It returns an error when pack is nil, when UDP is selected but no UDP client
// exists, when pack.Type is not a known type, when the payload does not fit
// the 16-bit length field, or when writing or flushing the frame fails.
// OutBytes is increased only after a TCP frame was flushed successfully.
func (session *RTSP) SendRTP(pack *RTPPack) (err error) {
	if pack == nil {
		return fmt.Errorf("player send rtp got nil pack")
	}
	if session.TransType == TRANS_TYPE_UDP {
		if session.UDPClient == nil {
			return fmt.Errorf("player use udp transport but udp client not found")
		}
		err = session.UDPClient.SendRTP(pack)
		session.OutBytes += len(pack.Buffer)
		return err
	}

	var channel int
	switch pack.Type {
	case RTP_TYPE_AUDIO:
		channel = session.aRTPChannel
	case RTP_TYPE_AUDIOCONTROL:
		channel = session.aRTPControlChannel
	case RTP_TYPE_VIDEO:
		channel = session.vRTPChannel
	case RTP_TYPE_VIDEOCONTROL:
		channel = session.vRTPControlChannel
	default:
		return fmt.Errorf("session tcp send rtp got unkown pack type[%v]", pack.Type)
	}

	// The length field is 16 bits wide; a longer payload would be framed with
	// a truncated length and corrupt the stream for the receiver.
	if len(pack.Buffer) > 0xFFFF {
		return fmt.Errorf("session tcp send rtp pack too large, size[%d]", len(pack.Buffer))
	}

	header := [4]byte{0x24, byte(channel)}
	binary.BigEndian.PutUint16(header[2:], uint16(len(pack.Buffer)))

	session.connWLock.Lock()
	defer session.connWLock.Unlock()
	if _, err = session.connRW.Write(header[:]); err != nil {
		return err
	}
	if _, err = session.connRW.Write(pack.Buffer); err != nil {
		return err
	}
	if err = session.connRW.Flush(); err != nil {
		return err
	}
	session.OutBytes += len(pack.Buffer) + 4
	return nil
}