开发中

This commit is contained in:
李宇翔
2020-03-06 18:28:33 +08:00
commit e23acd0de7
5 changed files with 732 additions and 0 deletions

12
README.md Normal file
View File

@@ -0,0 +1,12 @@
# Monibuca 的RTSP 插件
主要功能是对RTSP地址进行拉流转换
## 插件名称
RTSP
## 配置
## 使用方法

521
client.go Normal file
View File

@@ -0,0 +1,521 @@
package rtspplugin
import (
"crypto/md5"
b64 "encoding/base64"
"encoding/hex"
"fmt"
"io"
"log"
"net"
"net/url"
"strconv"
"strings"
"time"
)
var (
VideoWidth int
VideoHeight int
)
type RtspClient struct {
socket net.Conn
OutGoing chan []byte //out chanel
Signals chan bool //Signals quit
host string //host
port string //port
uri string //url
auth bool //aut
login string
password string //password
session string //rtsp session
responce string //responce string
bauth string //string b auth
track []string //rtsp track
cseq int //qury number
videow int
videoh int
}
//вернет пустой инициализированный обьект
func RtspClientNew() *RtspClient {
Obj := &RtspClient{
cseq: 1, //стартовый номер запроса
Signals: make(chan bool, 1), //буферизируемый канал на 1 сообщение
OutGoing: make(chan []byte, 100000), //буферизиуемый канал на 100000 байт
}
return Obj
}
//основная функция работы с rtsp
func (this *RtspClient) Client(rtsp_url string) (bool, string) {
//проверить и отпарсить url
if !this.ParseUrl(rtsp_url) {
return false, "Не верный url"
}
//установить подключение к камере
if !this.Connect() {
return false, "Не возможно подключиться"
}
//фаза 1 OPTIONS первый этап общения с камерой
//отправляем запрос OPTIONS
if !this.Write("OPTIONS " + this.uri + " RTSP/1.0\r\nCSeq: " + strconv.Itoa(this.cseq) + "\r\n\r\n") {
return false, "Не возможно отправить сообщение OPTIONS"
}
//читаем ответ на запрос OPTIONS
if status, message := this.Read(); !status {
return false, "Не возможно прочитать ответ OPTIONS соединение потеряно"
} else if status && strings.Contains(message, "Digest") {
if !this.AuthDigest("OPTIONS", message) {
return false, "Требуеться авторизация Digest"
}
} else if status && strings.Contains(message, "Basic") {
if !this.AuthBasic("OPTIONS", message) {
return false, "Требуеться авторизация Basic"
}
} else if !strings.Contains(message, "200") {
return false, "Ошибка OPTIONS not status code 200 OK " + message
}
////////////PHASE 2 DESCRIBE
log.Println("DESCRIBE " + this.uri + " RTSP/1.0\r\nCSeq: " + strconv.Itoa(this.cseq) + this.bauth + "\r\n\r\n")
if !this.Write("DESCRIBE " + this.uri + " RTSP/1.0\r\nCSeq: " + strconv.Itoa(this.cseq) + this.bauth + "\r\n\r\n") {
return false, "Не возможно отправть запрос DESCRIBE"
}
if status, message := this.Read(); !status {
return false, "Не возможно прочитать ответ DESCRIBE соединение потеряно ?"
} else if status && strings.Contains(message, "Digest") {
if !this.AuthDigest("DESCRIBE", message) {
return false, "Требуеться авторизация Digest"
}
} else if status && strings.Contains(message, "Basic") {
if !this.AuthBasic("DESCRIBE", message) {
return false, "Требуеться авторизация Basic"
}
} else if !strings.Contains(message, "200") {
return false, "Ошибка DESCRIBE not status code 200 OK " + message
} else {
log.Println(message)
this.track = this.ParseMedia(message)
}
if len(this.track) == 0 {
return false, "Ошибка track not found "
}
//PHASE 3 SETUP
log.Println("SETUP " + this.uri + "/" + this.track[0] + " RTSP/1.0\r\nCSeq: " + strconv.Itoa(this.cseq) + "\r\nTransport: RTP/AVP/TCP;unicast;interleaved=0-1" + this.bauth + "\r\n\r\n")
if !this.Write("SETUP " + this.uri + "/" + this.track[0] + " RTSP/1.0\r\nCSeq: " + strconv.Itoa(this.cseq) + "\r\nTransport: RTP/AVP/TCP;unicast;interleaved=0-1" + this.bauth + "\r\n\r\n") {
return false, ""
}
if status, message := this.Read(); !status {
return false, "Не возможно прочитать ответ SETUP соединение потеряно"
} else if !strings.Contains(message, "200") {
if strings.Contains(message, "401") {
str := this.AuthDigest_Only("SETUP", message)
if !this.Write("SETUP " + this.uri + "/" + this.track[0] + " RTSP/1.0\r\nCSeq: " + strconv.Itoa(this.cseq) + "\r\nTransport: RTP/AVP/TCP;unicast;interleaved=0-1" + this.bauth + str + "\r\n\r\n") {
return false, ""
}
if status, message := this.Read(); !status {
return false, "Не возможно прочитать ответ SETUP соединение потеряно"
} else if !strings.Contains(message, "200") {
return false, "Ошибка SETUP not status code 200 OK " + message
} else {
this.session = ParseSession(message)
}
} else {
return false, "Ошибка SETUP not status code 200 OK " + message
}
} else {
log.Println(message)
this.session = ParseSession(message)
log.Println(this.session)
}
if len(this.track) > 1 {
if !this.Write("SETUP " + this.uri + "/" + this.track[1] + " RTSP/1.0\r\nCSeq: " + strconv.Itoa(this.cseq) + "\r\nTransport: RTP/AVP/TCP;unicast;interleaved=2-3" + "\r\nSession: " + this.session + this.bauth + "\r\n\r\n") {
return false, ""
}
if status, message := this.Read(); !status {
return false, "Не возможно прочитать ответ SETUP Audio соединение потеряно"
} else if !strings.Contains(message, "200") {
if strings.Contains(message, "401") {
str := this.AuthDigest_Only("SETUP", message)
if !this.Write("SETUP " + this.uri + "/" + this.track[1] + " RTSP/1.0\r\nCSeq: " + strconv.Itoa(this.cseq) + "\r\nTransport: RTP/AVP/TCP;unicast;interleaved=2-3" + this.bauth + str + "\r\n\r\n") {
return false, ""
}
if status, message := this.Read(); !status {
return false, "Не возможно прочитать ответ SETUP Audio соединение потеряно"
} else if !strings.Contains(message, "200") {
return false, "Ошибка SETUP not status code 200 OK " + message
} else {
log.Println(message)
this.session = ParseSession(message)
}
} else {
return false, "Ошибка SETUP not status code 200 OK " + message
}
} else {
log.Println(message)
this.session = ParseSession(message)
}
}
//PHASE 4 SETUP
log.Println("PLAY " + this.uri + " RTSP/1.0\r\nCSeq: " + strconv.Itoa(this.cseq) + "\r\nSession: " + this.session + this.bauth + "\r\n\r\n")
if !this.Write("PLAY " + this.uri + " RTSP/1.0\r\nCSeq: " + strconv.Itoa(this.cseq) + "\r\nSession: " + this.session + this.bauth + "\r\n\r\n") {
return false, ""
}
if status, message := this.Read(); !status {
return false, "Не возможно прочитать ответ PLAY соединение потеряно"
} else if !strings.Contains(message, "200") {
//return false, "Ошибка PLAY not status code 200 OK " + message
if strings.Contains(message, "401") {
str := this.AuthDigest_Only("PLAY", message)
if !this.Write("PLAY " + this.uri + " RTSP/1.0\r\nCSeq: " + strconv.Itoa(this.cseq) + "\r\nSession: " + this.session + this.bauth + str + "\r\n\r\n") {
return false, ""
}
if status, message := this.Read(); !status {
return false, "Не возможно прочитать ответ PLAY соединение потеряно"
} else if !strings.Contains(message, "200") {
return false, "Ошибка PLAY not status code 200 OK " + message
} else {
//this.session = ParseSession(message)
log.Print(message)
go this.RtspRtpLoop()
return true, "ok"
}
} else {
return false, "Ошибка PLAY not status code 200 OK " + message
}
} else {
log.Print(message)
go this.RtspRtpLoop()
return true, "ok"
}
return false, "other error"
}
/*
The RTP header has the following format:
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|V=2|P|X| CC |M| PT | sequence number |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| timestamp |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| synchronization source (SSRC) identifier |
+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
| contributing source (CSRC) identifiers |
| .... |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
version (V): 2 bits
This field identifies the version of RTP. The version defined by
this specification is two (2). (The value 1 is used by the first
draft version of RTP and the value 0 is used by the protocol
initially implemented in the "vat" audio tool.)
padding (P): 1 bit
If the padding bit is set, the packet contains one or more
additional padding octets at the end which are not part of the
payload. The last octet of the padding contains a count of how
many padding octets should be ignored, including itself. Padding
may be needed by some encryption algorithms with fixed block sizes
or for carrying several RTP packets in a lower-layer protocol data
unit.
extension (X): 1 bit
If the extension bit is set, the fixed header MUST be followed by
exactly one header extension, with a format defined in Section
5.3.1.
*/
func (this *RtspClient) RtspRtpLoop() {
defer func() {
this.Signals <- true
}()
header := make([]byte, 4)
payload := make([]byte, 4096)
//sync := make([]byte, 256)
sync_b := make([]byte, 1)
timer := time.Now()
for {
if int(time.Now().Sub(timer).Seconds()) > 50 {
if !this.Write("OPTIONS " + this.uri + " RTSP/1.0\r\nCSeq: " + strconv.Itoa(this.cseq) + "\r\nSession: " + this.session + this.bauth + "\r\n\r\n") {
return
}
timer = time.Now()
}
this.socket.SetDeadline(time.Now().Add(50 * time.Second))
//read rtp hdr 4
if n, err := io.ReadFull(this.socket, header); err != nil || n != 4 {
//rtp hdr read error
return
}
//log.Println(header)
if header[0] != 36 {
//log.Println("desync?", this.host)
for {
///////////////////////////skeep/////////////////////////////////////
if n, err := io.ReadFull(this.socket, sync_b); err != nil && n != 1 {
return
} else if sync_b[0] == 36 {
header[0] = 36
if n, err := io.ReadFull(this.socket, header[1:]); err != nil && n == 3 {
return
}
break
}
}
/*
//вычитываем 256 в попытке отсять мусор обрезать RTSP
if string(header) == "RTSP" {
if n, err := io.ReadFull(this.socket, sync); err != nil && n == 256 {
return
} else {
rtsp_rtp := []byte(strings.Split(string(sync), "\r\n\r\n")[1])
//отправим все что есть в буфере
this.SendBufer(rtsp_rtp)
continue
}
} else {
log.Println("full desync")
return
}
*/
}
payloadLen := (int)(header[2])<<8 + (int)(header[3])
//log.Println("payloadLen", payloadLen)
if payloadLen > 4096 || payloadLen < 12 {
log.Println("desync", this.uri, payloadLen)
return
}
if n, err := io.ReadFull(this.socket, payload[:payloadLen]); err != nil || n != payloadLen {
return
} else {
this.OutGoing <- append(header, payload[:n]...)
}
}
}
//unsafe!
func (this *RtspClient) SendBufer(bufer []byte) {
//тут надо отправлять все пакеты из буфера send all?
payload := make([]byte, 4096)
for {
if len(bufer) < 4 {
log.Fatal("bufer small")
}
dataLength := (int)(bufer[2])<<8 + (int)(bufer[3])
if dataLength > len(bufer)+4 {
if n, err := io.ReadFull(this.socket, payload[:dataLength-len(bufer)+4]); err != nil {
return
} else {
this.OutGoing <- append(bufer, payload[:n]...)
return
}
} else {
this.OutGoing <- bufer[:dataLength+4]
bufer = bufer[dataLength+4:]
}
}
}
func (this *RtspClient) Connect() bool {
d := &net.Dialer{Timeout: 3 * time.Second}
conn, err := d.Dial("tcp", this.host+":"+this.port)
if err != nil {
return false
}
this.socket = conn
return true
}
func (this *RtspClient) Write(message string) bool {
this.cseq += 1
if _, e := this.socket.Write([]byte(message)); e != nil {
return false
}
return true
}
func (this *RtspClient) Read() (bool, string) {
buffer := make([]byte, 4096)
if nb, err := this.socket.Read(buffer); err != nil || nb <= 0 {
log.Println("socket read failed", err)
return false, ""
} else {
return true, string(buffer[:nb])
}
}
func (this *RtspClient) AuthBasic(phase string, message string) bool {
this.bauth = "\r\nAuthorization: Basic " + b64.StdEncoding.EncodeToString([]byte(this.login+":"+this.password))
if !this.Write(phase + " " + this.uri + " RTSP/1.0\r\nCSeq: " + strconv.Itoa(this.cseq) + this.bauth + "\r\n\r\n") {
return false
}
if status, message := this.Read(); status && strings.Contains(message, "200") {
this.track = ParseMedia(message)
return true
}
return false
}
func (this *RtspClient) AuthDigest(phase string, message string) bool {
nonce := ParseDirective(message, "nonce")
realm := ParseDirective(message, "realm")
hs1 := GetMD5Hash(this.login + ":" + realm + ":" + this.password)
hs2 := GetMD5Hash(phase + ":" + this.uri)
responce := GetMD5Hash(hs1 + ":" + nonce + ":" + hs2)
dauth := "\r\n" + `Authorization: Digest username="` + this.login + `", realm="` + realm + `", nonce="` + nonce + `", uri="` + this.uri + `", response="` + responce + `"`
if !this.Write(phase + " " + this.uri + " RTSP/1.0\r\nCSeq: " + strconv.Itoa(this.cseq) + dauth + "\r\n\r\n") {
return false
}
if status, message := this.Read(); status && strings.Contains(message, "200") {
this.track = ParseMedia(message)
return true
}
return false
}
func (this *RtspClient) AuthDigest_Only(phase string, message string) string {
nonce := ParseDirective(message, "nonce")
realm := ParseDirective(message, "realm")
hs1 := GetMD5Hash(this.login + ":" + realm + ":" + this.password)
hs2 := GetMD5Hash(phase + ":" + this.uri)
responce := GetMD5Hash(hs1 + ":" + nonce + ":" + hs2)
dauth := "\r\n" + `Authorization: Digest username="` + this.login + `", realm="` + realm + `", nonce="` + nonce + `", uri="` + this.uri + `", response="` + responce + `"`
return dauth
}
func (this *RtspClient) ParseUrl(rtsp_url string) bool {
u, err := url.Parse(rtsp_url)
if err != nil {
return false
}
phost := strings.Split(u.Host, ":")
this.host = phost[0]
if len(phost) == 2 {
this.port = phost[1]
} else {
this.port = "554"
}
this.login = u.User.Username()
this.password, this.auth = u.User.Password()
if u.RawQuery != "" {
this.uri = "rtsp://" + this.host + ":" + this.port + u.Path + "?" + string(u.RawQuery)
} else {
this.uri = "rtsp://" + this.host + ":" + this.port + u.Path
}
return true
}
func (this *RtspClient) Close() {
if this.socket != nil {
this.socket.Close()
}
}
func ParseDirective(header, name string) string {
index := strings.Index(header, name)
if index == -1 {
return ""
}
start := 1 + index + strings.Index(header[index:], `"`)
end := start + strings.Index(header[start:], `"`)
return strings.TrimSpace(header[start:end])
}
func ParseSession(header string) string {
mparsed := strings.Split(header, "\r\n")
for _, element := range mparsed {
if strings.Contains(element, "Session:") {
if strings.Contains(element, ";") {
fist := strings.Split(element, ";")[0]
return fist[9:]
} else {
return element[9:]
}
}
}
return ""
}
func ParseMedia(header string) []string {
letters := []string{}
mparsed := strings.Split(header, "\r\n")
paste := ""
if true {
log.Println("headers", header)
}
for _, element := range mparsed {
if strings.Contains(element, "a=control:") && !strings.Contains(element, "*") && strings.Contains(element, "tra") {
paste = element[10:]
if strings.Contains(element, "/") {
striped := strings.Split(element, "/")
paste = striped[len(striped)-1]
}
letters = append(letters, paste)
}
dimensionsPrefix := "a=x-dimensions:"
if strings.HasPrefix(element, dimensionsPrefix) {
dims := []int{}
for _, s := range strings.Split(element[len(dimensionsPrefix):], ",") {
v := 0
fmt.Sscanf(s, "%d", &v)
if v <= 0 {
break
}
dims = append(dims, v)
}
if len(dims) == 2 {
VideoWidth = dims[0]
VideoHeight = dims[1]
}
}
}
return letters
}
func GetMD5Hash(text string) string {
hash := md5.Sum([]byte(text))
return hex.EncodeToString(hash[:])
}
func (this *RtspClient) ParseMedia(header string) []string {
letters := []string{}
mparsed := strings.Split(header, "\r\n")
paste := ""
for _, element := range mparsed {
if strings.Contains(element, "a=control:") && !strings.Contains(element, "*") && strings.Contains(element, "tra") {
paste = element[10:]
if strings.Contains(element, "/") {
striped := strings.Split(element, "/")
paste = striped[len(striped)-1]
}
letters = append(letters, paste)
}
dimensionsPrefix := "a=x-dimensions:"
if strings.HasPrefix(element, dimensionsPrefix) {
dims := []int{}
for _, s := range strings.Split(element[len(dimensionsPrefix):], ",") {
v := 0
fmt.Sscanf(s, "%d", &v)
if v <= 0 {
break
}
dims = append(dims, v)
}
if len(dims) == 2 {
this.videow = dims[0]
this.videoh = dims[1]
}
}
}
return letters
}

10
go.mod Normal file
View File

@@ -0,0 +1,10 @@
module github.com/Monibuca/rtspplugin
go 1.13
require (
github.com/Monibuca/engine v1.1.0
github.com/falconray0704/gortmp v0.0.0-20170613085150-e3f9bb02c7c8
github.com/zhangpeihao/goamf v0.0.0-20140409082417-3ff2c19514a8 // indirect
github.com/zhangpeihao/log v0.0.0-20170117094621-62e921e41859
)

30
go.sum Normal file
View File

@@ -0,0 +1,30 @@
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/Monibuca/engine v1.1.0 h1:X/dEUWpASCPESYx1cGXk5pp73Egiou5obGUEfrRZUdg=
github.com/Monibuca/engine v1.1.0/go.mod h1:NjqVgtXuRSOyk3+NWgCuDf2p7TsBisjYxoEwA9uCZ38=
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d h1:G0m3OIz70MZUWq3EgK3CesDbo8upS2Vm9/P3FtgI+Jk=
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/falconray0704/gortmp v0.0.0-20170613085150-e3f9bb02c7c8 h1:Bkx+0neYCcHW7BUeVCbR2GOn47NesdImh8nHHOKccD4=
github.com/falconray0704/gortmp v0.0.0-20170613085150-e3f9bb02c7c8/go.mod h1:/JBZajtCDe9Z4j84v5QWo4PLn1K6jcBHh6qXN/bm/vw=
github.com/funny/slab v0.0.0-20180511031532-b1fad5e5d478 h1:Db9StoJ6RZN3YttC0Pm0I4Y5izITRYch3RMbT59BYN0=
github.com/funny/slab v0.0.0-20180511031532-b1fad5e5d478/go.mod h1:0j1+svBH8ABEIPdUP0AIg4qedsybnXGJBakCEw8cfoo=
github.com/go-ole/go-ole v1.2.4 h1:nNBDSCOigTSiarFpYE9J/KtEA1IOW4CNeqT9TQDqCxI=
github.com/go-ole/go-ole v1.2.4/go.mod h1:XCwSNxSkXRo4vlyPy93sltvi/qJq0jqQhjqQNIwKuxM=
github.com/pixelbender/go-sdp v1.0.0 h1:hLP2ALBN4sLpgp2r3EDcFUSN3AyOkg1jonuWEJniotY=
github.com/pixelbender/go-sdp v1.0.0/go.mod h1:6IBlz9+BrUHoFTea7gcp4S54khtOhjCW/nVDLhmZBAs=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/shirou/gopsutil v2.20.1+incompatible h1:oIq9Cq4i84Hk8uQAUOG3eNdI/29hBawGrD5YRl6JRDY=
github.com/shirou/gopsutil v2.20.1+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/teris-io/shortid v0.0.0-20171029131806-771a37caa5cf h1:Z2X3Os7oRzpdJ75iPqWZc0HeJWFYNCvKsfpQwFpRNTA=
github.com/teris-io/shortid v0.0.0-20171029131806-771a37caa5cf/go.mod h1:M8agBzgqHIhgj7wEn9/0hJUZcrvt9VY+Ln+S1I5Mha0=
github.com/zhangpeihao/goamf v0.0.0-20140409082417-3ff2c19514a8 h1:r1JUI0wuHlgRb8jNd3zPBBkjUdrjpVKr8SdJWc8ntg8=
github.com/zhangpeihao/goamf v0.0.0-20140409082417-3ff2c19514a8/go.mod h1:RZd/IqzNpFANwOB9rVmsnAYpo/6KesK4PqrN1a5cRgg=
github.com/zhangpeihao/log v0.0.0-20170117094621-62e921e41859 h1:vrlOUrBlpVmIvWsd8FhUwXWzdYqYcgFzbf8j1qPkGM8=
github.com/zhangpeihao/log v0.0.0-20170117094621-62e921e41859/go.mod h1:OAvmouyIV28taMw4SC4+hSnouObQqQkTQNOhU3Zowl0=
golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae h1:/WDfKMnPU+m5M4xB+6x4kaepxRw6jWvR5iDRdvjHgy8=
golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=

159
main.go Normal file
View File

@@ -0,0 +1,159 @@
package rtspplugin
import (
"bytes"
"log"
"strings"
. "github.com/Monibuca/engine"
. "github.com/Monibuca/engine/avformat"
"github.com/Monibuca/engine/util"
)
var config = struct {
BufferLength int
AutoPublish bool
RtspURL string
}{2048, true, "rtsp://localhost/${streamPath}"}
func init() {
InstallPlugin(&PluginConfig{
Name: "RTSP",
Version: "1.0.0",
Config: &config,
Run: func() {
if config.AutoPublish {
OnSubscribeHooks.AddHook(func(s *OutputStream) {
if s.Publisher == nil {
new(RTSP).Publish(s.StreamPath, strings.Replace(config.RtspURL, "${streamPath}", "s.StreamPath"))
}
})
}
},
})
}
type RTSP struct {
InputStream
*RtspClient
RTSPInfo
}
type RTSPInfo struct {
SyncCount int64
RoomInfo *RoomInfo
}
func (rtsp *RTSP) run() {
fuBuffer := []byte{}
iframeHead := []byte{0x17, 0x01, 0, 0, 0}
pframeHead := []byte{0x27, 0x01, 0, 0, 0}
spsHead := []byte{0xE1, 0, 0}
ppsHead := []byte{0x01, 0, 0}
nalLength := []byte{0, 0, 0, 0}
r := bytes.NewBuffer([]byte{})
av := NewAVPacket(FLV_TAG_TYPE_VIDEO)
handleNALU := func(nalType byte, payload []byte, ts int64) {
rtsp.SyncCount++
vl := len(payload)
switch nalType {
case NALU_SPS:
r.Write(RTMP_AVC_HEAD)
util.BigEndian.PutUint16(spsHead[1:], uint16(vl))
r.Write(spsHead)
r.Write(payload)
case NALU_PPS:
util.BigEndian.PutUint16(ppsHead[1:], uint16(vl))
r.Write(ppsHead)
r.Write(payload)
av.VideoFrameType = 1
av.Payload = r.Bytes()
rtsp.PushVideo(av)
case NALU_IDR_Picture:
av = NewAVPacket(FLV_TAG_TYPE_VIDEO)
r.Reset()
av.VideoFrameType = 1
av.Timestamp = uint32(ts)
util.BigEndian.PutUint24(iframeHead[2:], 0)
r.Write(iframeHead)
util.BigEndian.PutUint32(nalLength, uint32(vl))
r.Write(nalLength)
rtsp.PushVideo(av)
case NALU_Non_IDR_Picture:
av = NewAVPacket(FLV_TAG_TYPE_VIDEO)
r.Reset()
av.VideoFrameType = 2
av.Timestamp = uint32(ts)
util.BigEndian.PutUint24(pframeHead[2:], 0)
r.Write(iframeHead)
util.BigEndian.PutUint32(nalLength, uint32(vl))
r.Write(nalLength)
rtsp.PushVideo(av)
}
}
for {
select {
case <-rtsp.Done():
return
case data, ok := <-rtsp.OutGoing:
if ok && data[0] == 36 {
if data[1] == 0 {
cc := data[4] & 0xF
//rtp header
rtphdr := 12 + cc*4
//packet time
ts := (int64(data[8]) << 24) + (int64(data[9]) << 16) + (int64(data[10]) << 8) + (int64(data[11]))
//packet number
packno := (int64(data[6]) << 8) + int64(data[7])
if false {
log.Println("packet num", packno)
}
nalType := data[4+rtphdr] & 0x1F
if nalType >= 1 && nalType <= 23 {
handleNALU(nalType, data[4+rtphdr:], ts)
} else if nalType == 28 {
isStart := data[4+rtphdr+1]&0x80 != 0
isEnd := data[4+rtphdr+1]&0x40 != 0
nalType := data[4+rtphdr+1] & 0x1F
//nri := (data[4+rtphdr+1]&0x60)>>5
nal := data[4+rtphdr]&0xE0 | data[4+rtphdr+1]&0x1F
if isStart {
fuBuffer = []byte{0}
}
fuBuffer = append(fuBuffer, data[4+rtphdr+2:]...)
if isEnd {
fuBuffer[0] = nal
handleNALU(nalType, fuBuffer, ts)
}
}
} else if data[1] == 2 {
// audio
cc := data[4] & 0xF
rtphdr := 12 + cc*4
//or not payload := data[4+rtphdr:]
payload := data[4+rtphdr+4:]
av := NewAVPacket(FLV_TAG_TYPE_AUDIO)
av.Payload = payload
rtsp.PushAudio(av)
}
}
}
}
}
func (rtsp *RTSP) Publish(streamPath string, rtspUrl string) (result bool) {
if result = rtsp.InputStream.Publish(streamPath, rtsp); result {
rtsp.RTSPInfo.RoomInfo = &rtsp.Room.RoomInfo
rtsp.RtspClient = RtspClientNew()
if status, message := rtsp.RtspClient.Client(rtspUrl); !status {
log.Println(message)
return false
}
go rtsp.run()
}
return
}