commit e23acd0de7fe1201bcde6347bcdb99dfb81b6016 Author: 李宇翔 <178529795@qq.com> Date: Fri Mar 6 18:28:33 2020 +0800 开发中 diff --git a/README.md b/README.md new file mode 100644 index 0000000..d90972f --- /dev/null +++ b/README.md @@ -0,0 +1,12 @@ +# Monibuca 的RTSP 插件 + +主要功能是对RTSP地址进行拉流转换 + +## 插件名称 + +RTSP + +## 配置 + + +## 使用方法 \ No newline at end of file diff --git a/client.go b/client.go new file mode 100644 index 0000000..b6513bd --- /dev/null +++ b/client.go @@ -0,0 +1,521 @@ +package rtspplugin + +import ( + "crypto/md5" + b64 "encoding/base64" + "encoding/hex" + "fmt" + "io" + "log" + "net" + "net/url" + "strconv" + "strings" + "time" +) + +var ( + VideoWidth int + VideoHeight int +) + +type RtspClient struct { + socket net.Conn + OutGoing chan []byte //out chanel + Signals chan bool //Signals quit + host string //host + port string //port + uri string //url + auth bool //aut + login string + password string //password + session string //rtsp session + responce string //responce string + bauth string //string b auth + track []string //rtsp track + cseq int //qury number + videow int + videoh int +} + +//вернет пустой инициализированный обьект +func RtspClientNew() *RtspClient { + Obj := &RtspClient{ + cseq: 1, //стартовый номер запроса + Signals: make(chan bool, 1), //буферизируемый канал на 1 сообщение + OutGoing: make(chan []byte, 100000), //буферизиуемый канал на 100000 байт + } + return Obj +} + +//основная функция работы с rtsp +func (this *RtspClient) Client(rtsp_url string) (bool, string) { + //проверить и отпарсить url + if !this.ParseUrl(rtsp_url) { + return false, "Не верный url" + } + //установить подключение к камере + if !this.Connect() { + return false, "Не возможно подключиться" + } + //фаза 1 OPTIONS первый этап общения с камерой + //отправляем запрос OPTIONS + if !this.Write("OPTIONS " + this.uri + " RTSP/1.0\r\nCSeq: " + strconv.Itoa(this.cseq) + "\r\n\r\n") { + return false, "Не возможно отправить сообщение OPTIONS" + } + //читаем ответ на запрос OPTIONS + if status, message := this.Read(); !status { + return false, "Не возможно прочитать ответ OPTIONS соединение потеряно" + } else if status && strings.Contains(message, "Digest") { + if !this.AuthDigest("OPTIONS", message) { + return false, "Требуеться авторизация Digest" + } + } else if status && strings.Contains(message, "Basic") { + if !this.AuthBasic("OPTIONS", message) { + return false, "Требуеться авторизация Basic" + } + } else if !strings.Contains(message, "200") { + return false, "Ошибка OPTIONS not status code 200 OK " + message + } + + ////////////PHASE 2 DESCRIBE + log.Println("DESCRIBE " + this.uri + " RTSP/1.0\r\nCSeq: " + strconv.Itoa(this.cseq) + this.bauth + "\r\n\r\n") + if !this.Write("DESCRIBE " + this.uri + " RTSP/1.0\r\nCSeq: " + strconv.Itoa(this.cseq) + this.bauth + "\r\n\r\n") { + return false, "Не возможно отправть запрос DESCRIBE" + } + if status, message := this.Read(); !status { + return false, "Не возможно прочитать ответ DESCRIBE соединение потеряно ?" + } else if status && strings.Contains(message, "Digest") { + if !this.AuthDigest("DESCRIBE", message) { + return false, "Требуеться авторизация Digest" + } + } else if status && strings.Contains(message, "Basic") { + if !this.AuthBasic("DESCRIBE", message) { + return false, "Требуеться авторизация Basic" + } + } else if !strings.Contains(message, "200") { + return false, "Ошибка DESCRIBE not status code 200 OK " + message + } else { + log.Println(message) + this.track = this.ParseMedia(message) + + } + if len(this.track) == 0 { + return false, "Ошибка track not found " + } + //PHASE 3 SETUP + log.Println("SETUP " + this.uri + "/" + this.track[0] + " RTSP/1.0\r\nCSeq: " + strconv.Itoa(this.cseq) + "\r\nTransport: RTP/AVP/TCP;unicast;interleaved=0-1" + this.bauth + "\r\n\r\n") + if !this.Write("SETUP " + this.uri + "/" + this.track[0] + " RTSP/1.0\r\nCSeq: " + strconv.Itoa(this.cseq) + "\r\nTransport: RTP/AVP/TCP;unicast;interleaved=0-1" + this.bauth + "\r\n\r\n") { + return false, "" + } + if status, message := this.Read(); !status { + return false, "Не возможно прочитать ответ SETUP соединение потеряно" + + } else if !strings.Contains(message, "200") { + if strings.Contains(message, "401") { + str := this.AuthDigest_Only("SETUP", message) + if !this.Write("SETUP " + this.uri + "/" + this.track[0] + " RTSP/1.0\r\nCSeq: " + strconv.Itoa(this.cseq) + "\r\nTransport: RTP/AVP/TCP;unicast;interleaved=0-1" + this.bauth + str + "\r\n\r\n") { + return false, "" + } + if status, message := this.Read(); !status { + return false, "Не возможно прочитать ответ SETUP соединение потеряно" + + } else if !strings.Contains(message, "200") { + + return false, "Ошибка SETUP not status code 200 OK " + message + + } else { + this.session = ParseSession(message) + } + } else { + return false, "Ошибка SETUP not status code 200 OK " + message + } + } else { + log.Println(message) + this.session = ParseSession(message) + log.Println(this.session) + } + if len(this.track) > 1 { + + if !this.Write("SETUP " + this.uri + "/" + this.track[1] + " RTSP/1.0\r\nCSeq: " + strconv.Itoa(this.cseq) + "\r\nTransport: RTP/AVP/TCP;unicast;interleaved=2-3" + "\r\nSession: " + this.session + this.bauth + "\r\n\r\n") { + return false, "" + } + if status, message := this.Read(); !status { + return false, "Не возможно прочитать ответ SETUP Audio соединение потеряно" + + } else if !strings.Contains(message, "200") { + if strings.Contains(message, "401") { + str := this.AuthDigest_Only("SETUP", message) + if !this.Write("SETUP " + this.uri + "/" + this.track[1] + " RTSP/1.0\r\nCSeq: " + strconv.Itoa(this.cseq) + "\r\nTransport: RTP/AVP/TCP;unicast;interleaved=2-3" + this.bauth + str + "\r\n\r\n") { + return false, "" + } + if status, message := this.Read(); !status { + return false, "Не возможно прочитать ответ SETUP Audio соединение потеряно" + + } else if !strings.Contains(message, "200") { + + return false, "Ошибка SETUP not status code 200 OK " + message + + } else { + log.Println(message) + this.session = ParseSession(message) + } + } else { + return false, "Ошибка SETUP not status code 200 OK " + message + } + } else { + log.Println(message) + this.session = ParseSession(message) + } + } + + //PHASE 4 SETUP + log.Println("PLAY " + this.uri + " RTSP/1.0\r\nCSeq: " + strconv.Itoa(this.cseq) + "\r\nSession: " + this.session + this.bauth + "\r\n\r\n") + if !this.Write("PLAY " + this.uri + " RTSP/1.0\r\nCSeq: " + strconv.Itoa(this.cseq) + "\r\nSession: " + this.session + this.bauth + "\r\n\r\n") { + return false, "" + } + if status, message := this.Read(); !status { + return false, "Не возможно прочитать ответ PLAY соединение потеряно" + + } else if !strings.Contains(message, "200") { + //return false, "Ошибка PLAY not status code 200 OK " + message + if strings.Contains(message, "401") { + str := this.AuthDigest_Only("PLAY", message) + if !this.Write("PLAY " + this.uri + " RTSP/1.0\r\nCSeq: " + strconv.Itoa(this.cseq) + "\r\nSession: " + this.session + this.bauth + str + "\r\n\r\n") { + return false, "" + } + if status, message := this.Read(); !status { + return false, "Не возможно прочитать ответ PLAY соединение потеряно" + + } else if !strings.Contains(message, "200") { + + return false, "Ошибка PLAY not status code 200 OK " + message + + } else { + //this.session = ParseSession(message) + log.Print(message) + go this.RtspRtpLoop() + return true, "ok" + } + } else { + return false, "Ошибка PLAY not status code 200 OK " + message + } + } else { + log.Print(message) + go this.RtspRtpLoop() + return true, "ok" + } + return false, "other error" +} + +/* + The RTP header has the following format: + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + |V=2|P|X| CC |M| PT | sequence number | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | timestamp | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | synchronization source (SSRC) identifier | + +=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ + | contributing source (CSRC) identifiers | + | .... | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + version (V): 2 bits + This field identifies the version of RTP. The version defined by + this specification is two (2). (The value 1 is used by the first + draft version of RTP and the value 0 is used by the protocol + initially implemented in the "vat" audio tool.) + padding (P): 1 bit + If the padding bit is set, the packet contains one or more + additional padding octets at the end which are not part of the + payload. The last octet of the padding contains a count of how + many padding octets should be ignored, including itself. Padding + may be needed by some encryption algorithms with fixed block sizes + or for carrying several RTP packets in a lower-layer protocol data + unit. + extension (X): 1 bit + If the extension bit is set, the fixed header MUST be followed by + exactly one header extension, with a format defined in Section + 5.3.1. +*/ +func (this *RtspClient) RtspRtpLoop() { + defer func() { + this.Signals <- true + }() + header := make([]byte, 4) + payload := make([]byte, 4096) + //sync := make([]byte, 256) + sync_b := make([]byte, 1) + timer := time.Now() + for { + if int(time.Now().Sub(timer).Seconds()) > 50 { + if !this.Write("OPTIONS " + this.uri + " RTSP/1.0\r\nCSeq: " + strconv.Itoa(this.cseq) + "\r\nSession: " + this.session + this.bauth + "\r\n\r\n") { + return + } + timer = time.Now() + } + this.socket.SetDeadline(time.Now().Add(50 * time.Second)) + //read rtp hdr 4 + if n, err := io.ReadFull(this.socket, header); err != nil || n != 4 { + //rtp hdr read error + return + } + //log.Println(header) + if header[0] != 36 { + //log.Println("desync?", this.host) + for { + ///////////////////////////skeep///////////////////////////////////// + if n, err := io.ReadFull(this.socket, sync_b); err != nil && n != 1 { + return + } else if sync_b[0] == 36 { + header[0] = 36 + if n, err := io.ReadFull(this.socket, header[1:]); err != nil && n == 3 { + return + } + break + } + } + /* + //вычитываем 256 в попытке отсять мусор обрезать RTSP + if string(header) == "RTSP" { + if n, err := io.ReadFull(this.socket, sync); err != nil && n == 256 { + return + } else { + rtsp_rtp := []byte(strings.Split(string(sync), "\r\n\r\n")[1]) + //отправим все что есть в буфере + this.SendBufer(rtsp_rtp) + continue + } + } else { + log.Println("full desync") + return + } + */ + } + + payloadLen := (int)(header[2])<<8 + (int)(header[3]) + //log.Println("payloadLen", payloadLen) + if payloadLen > 4096 || payloadLen < 12 { + log.Println("desync", this.uri, payloadLen) + return + } + if n, err := io.ReadFull(this.socket, payload[:payloadLen]); err != nil || n != payloadLen { + return + } else { + this.OutGoing <- append(header, payload[:n]...) + } + } + +} + +//unsafe! +func (this *RtspClient) SendBufer(bufer []byte) { + //тут надо отправлять все пакеты из буфера send all? + payload := make([]byte, 4096) + for { + if len(bufer) < 4 { + log.Fatal("bufer small") + } + dataLength := (int)(bufer[2])<<8 + (int)(bufer[3]) + if dataLength > len(bufer)+4 { + if n, err := io.ReadFull(this.socket, payload[:dataLength-len(bufer)+4]); err != nil { + return + } else { + this.OutGoing <- append(bufer, payload[:n]...) + return + } + + } else { + this.OutGoing <- bufer[:dataLength+4] + bufer = bufer[dataLength+4:] + } + } +} +func (this *RtspClient) Connect() bool { + d := &net.Dialer{Timeout: 3 * time.Second} + conn, err := d.Dial("tcp", this.host+":"+this.port) + if err != nil { + return false + } + this.socket = conn + return true +} +func (this *RtspClient) Write(message string) bool { + this.cseq += 1 + if _, e := this.socket.Write([]byte(message)); e != nil { + return false + } + return true +} +func (this *RtspClient) Read() (bool, string) { + buffer := make([]byte, 4096) + if nb, err := this.socket.Read(buffer); err != nil || nb <= 0 { + log.Println("socket read failed", err) + return false, "" + } else { + return true, string(buffer[:nb]) + } +} +func (this *RtspClient) AuthBasic(phase string, message string) bool { + this.bauth = "\r\nAuthorization: Basic " + b64.StdEncoding.EncodeToString([]byte(this.login+":"+this.password)) + if !this.Write(phase + " " + this.uri + " RTSP/1.0\r\nCSeq: " + strconv.Itoa(this.cseq) + this.bauth + "\r\n\r\n") { + return false + } + if status, message := this.Read(); status && strings.Contains(message, "200") { + this.track = ParseMedia(message) + return true + } + return false +} +func (this *RtspClient) AuthDigest(phase string, message string) bool { + nonce := ParseDirective(message, "nonce") + realm := ParseDirective(message, "realm") + hs1 := GetMD5Hash(this.login + ":" + realm + ":" + this.password) + hs2 := GetMD5Hash(phase + ":" + this.uri) + responce := GetMD5Hash(hs1 + ":" + nonce + ":" + hs2) + dauth := "\r\n" + `Authorization: Digest username="` + this.login + `", realm="` + realm + `", nonce="` + nonce + `", uri="` + this.uri + `", response="` + responce + `"` + if !this.Write(phase + " " + this.uri + " RTSP/1.0\r\nCSeq: " + strconv.Itoa(this.cseq) + dauth + "\r\n\r\n") { + return false + } + if status, message := this.Read(); status && strings.Contains(message, "200") { + this.track = ParseMedia(message) + return true + } + return false +} +func (this *RtspClient) AuthDigest_Only(phase string, message string) string { + nonce := ParseDirective(message, "nonce") + realm := ParseDirective(message, "realm") + hs1 := GetMD5Hash(this.login + ":" + realm + ":" + this.password) + hs2 := GetMD5Hash(phase + ":" + this.uri) + responce := GetMD5Hash(hs1 + ":" + nonce + ":" + hs2) + dauth := "\r\n" + `Authorization: Digest username="` + this.login + `", realm="` + realm + `", nonce="` + nonce + `", uri="` + this.uri + `", response="` + responce + `"` + return dauth +} +func (this *RtspClient) ParseUrl(rtsp_url string) bool { + + u, err := url.Parse(rtsp_url) + if err != nil { + return false + } + phost := strings.Split(u.Host, ":") + this.host = phost[0] + if len(phost) == 2 { + this.port = phost[1] + } else { + this.port = "554" + } + this.login = u.User.Username() + this.password, this.auth = u.User.Password() + if u.RawQuery != "" { + this.uri = "rtsp://" + this.host + ":" + this.port + u.Path + "?" + string(u.RawQuery) + } else { + this.uri = "rtsp://" + this.host + ":" + this.port + u.Path + } + return true +} +func (this *RtspClient) Close() { + if this.socket != nil { + this.socket.Close() + } +} +func ParseDirective(header, name string) string { + index := strings.Index(header, name) + if index == -1 { + return "" + } + start := 1 + index + strings.Index(header[index:], `"`) + end := start + strings.Index(header[start:], `"`) + return strings.TrimSpace(header[start:end]) +} +func ParseSession(header string) string { + mparsed := strings.Split(header, "\r\n") + for _, element := range mparsed { + if strings.Contains(element, "Session:") { + if strings.Contains(element, ";") { + fist := strings.Split(element, ";")[0] + return fist[9:] + } else { + return element[9:] + } + } + } + return "" +} +func ParseMedia(header string) []string { + letters := []string{} + mparsed := strings.Split(header, "\r\n") + paste := "" + + if true { + log.Println("headers", header) + } + + for _, element := range mparsed { + if strings.Contains(element, "a=control:") && !strings.Contains(element, "*") && strings.Contains(element, "tra") { + paste = element[10:] + if strings.Contains(element, "/") { + striped := strings.Split(element, "/") + paste = striped[len(striped)-1] + } + letters = append(letters, paste) + } + + dimensionsPrefix := "a=x-dimensions:" + if strings.HasPrefix(element, dimensionsPrefix) { + dims := []int{} + for _, s := range strings.Split(element[len(dimensionsPrefix):], ",") { + v := 0 + fmt.Sscanf(s, "%d", &v) + if v <= 0 { + break + } + dims = append(dims, v) + } + if len(dims) == 2 { + VideoWidth = dims[0] + VideoHeight = dims[1] + } + } + } + return letters +} +func GetMD5Hash(text string) string { + hash := md5.Sum([]byte(text)) + return hex.EncodeToString(hash[:]) +} +func (this *RtspClient) ParseMedia(header string) []string { + letters := []string{} + mparsed := strings.Split(header, "\r\n") + paste := "" + for _, element := range mparsed { + if strings.Contains(element, "a=control:") && !strings.Contains(element, "*") && strings.Contains(element, "tra") { + paste = element[10:] + if strings.Contains(element, "/") { + striped := strings.Split(element, "/") + paste = striped[len(striped)-1] + } + letters = append(letters, paste) + } + + dimensionsPrefix := "a=x-dimensions:" + if strings.HasPrefix(element, dimensionsPrefix) { + dims := []int{} + for _, s := range strings.Split(element[len(dimensionsPrefix):], ",") { + v := 0 + fmt.Sscanf(s, "%d", &v) + if v <= 0 { + break + } + dims = append(dims, v) + } + if len(dims) == 2 { + this.videow = dims[0] + this.videoh = dims[1] + } + } + } + return letters +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..85be43e --- /dev/null +++ b/go.mod @@ -0,0 +1,10 @@ +module github.com/Monibuca/rtspplugin + +go 1.13 + +require ( + github.com/Monibuca/engine v1.1.0 + github.com/falconray0704/gortmp v0.0.0-20170613085150-e3f9bb02c7c8 + github.com/zhangpeihao/goamf v0.0.0-20140409082417-3ff2c19514a8 // indirect + github.com/zhangpeihao/log v0.0.0-20170117094621-62e921e41859 +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..1c6db6e --- /dev/null +++ b/go.sum @@ -0,0 +1,30 @@ +github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/Monibuca/engine v1.1.0 h1:X/dEUWpASCPESYx1cGXk5pp73Egiou5obGUEfrRZUdg= +github.com/Monibuca/engine v1.1.0/go.mod h1:NjqVgtXuRSOyk3+NWgCuDf2p7TsBisjYxoEwA9uCZ38= +github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d h1:G0m3OIz70MZUWq3EgK3CesDbo8upS2Vm9/P3FtgI+Jk= +github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/falconray0704/gortmp v0.0.0-20170613085150-e3f9bb02c7c8 h1:Bkx+0neYCcHW7BUeVCbR2GOn47NesdImh8nHHOKccD4= +github.com/falconray0704/gortmp v0.0.0-20170613085150-e3f9bb02c7c8/go.mod h1:/JBZajtCDe9Z4j84v5QWo4PLn1K6jcBHh6qXN/bm/vw= +github.com/funny/slab v0.0.0-20180511031532-b1fad5e5d478 h1:Db9StoJ6RZN3YttC0Pm0I4Y5izITRYch3RMbT59BYN0= +github.com/funny/slab v0.0.0-20180511031532-b1fad5e5d478/go.mod h1:0j1+svBH8ABEIPdUP0AIg4qedsybnXGJBakCEw8cfoo= +github.com/go-ole/go-ole v1.2.4 h1:nNBDSCOigTSiarFpYE9J/KtEA1IOW4CNeqT9TQDqCxI= +github.com/go-ole/go-ole v1.2.4/go.mod h1:XCwSNxSkXRo4vlyPy93sltvi/qJq0jqQhjqQNIwKuxM= +github.com/pixelbender/go-sdp v1.0.0 h1:hLP2ALBN4sLpgp2r3EDcFUSN3AyOkg1jonuWEJniotY= +github.com/pixelbender/go-sdp v1.0.0/go.mod h1:6IBlz9+BrUHoFTea7gcp4S54khtOhjCW/nVDLhmZBAs= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/shirou/gopsutil v2.20.1+incompatible h1:oIq9Cq4i84Hk8uQAUOG3eNdI/29hBawGrD5YRl6JRDY= +github.com/shirou/gopsutil v2.20.1+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= +github.com/teris-io/shortid v0.0.0-20171029131806-771a37caa5cf h1:Z2X3Os7oRzpdJ75iPqWZc0HeJWFYNCvKsfpQwFpRNTA= +github.com/teris-io/shortid v0.0.0-20171029131806-771a37caa5cf/go.mod h1:M8agBzgqHIhgj7wEn9/0hJUZcrvt9VY+Ln+S1I5Mha0= +github.com/zhangpeihao/goamf v0.0.0-20140409082417-3ff2c19514a8 h1:r1JUI0wuHlgRb8jNd3zPBBkjUdrjpVKr8SdJWc8ntg8= +github.com/zhangpeihao/goamf v0.0.0-20140409082417-3ff2c19514a8/go.mod h1:RZd/IqzNpFANwOB9rVmsnAYpo/6KesK4PqrN1a5cRgg= +github.com/zhangpeihao/log v0.0.0-20170117094621-62e921e41859 h1:vrlOUrBlpVmIvWsd8FhUwXWzdYqYcgFzbf8j1qPkGM8= +github.com/zhangpeihao/log v0.0.0-20170117094621-62e921e41859/go.mod h1:OAvmouyIV28taMw4SC4+hSnouObQqQkTQNOhU3Zowl0= +golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae h1:/WDfKMnPU+m5M4xB+6x4kaepxRw6jWvR5iDRdvjHgy8= +golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/main.go b/main.go new file mode 100644 index 0000000..26a470a --- /dev/null +++ b/main.go @@ -0,0 +1,159 @@ +package rtspplugin + +import ( + "bytes" + "log" + "strings" + + . "github.com/Monibuca/engine" + . "github.com/Monibuca/engine/avformat" + "github.com/Monibuca/engine/util" +) + +var config = struct { + BufferLength int + AutoPublish bool + RtspURL string +}{2048, true, "rtsp://localhost/${streamPath}"} + +func init() { + InstallPlugin(&PluginConfig{ + Name: "RTSP", + Version: "1.0.0", + Config: &config, + Run: func() { + if config.AutoPublish { + OnSubscribeHooks.AddHook(func(s *OutputStream) { + if s.Publisher == nil { + new(RTSP).Publish(s.StreamPath, strings.Replace(config.RtspURL, "${streamPath}", "s.StreamPath")) + } + }) + } + }, + }) +} + +type RTSP struct { + InputStream + *RtspClient + RTSPInfo +} +type RTSPInfo struct { + SyncCount int64 + RoomInfo *RoomInfo +} + +func (rtsp *RTSP) run() { + + fuBuffer := []byte{} + iframeHead := []byte{0x17, 0x01, 0, 0, 0} + pframeHead := []byte{0x27, 0x01, 0, 0, 0} + spsHead := []byte{0xE1, 0, 0} + ppsHead := []byte{0x01, 0, 0} + nalLength := []byte{0, 0, 0, 0} + r := bytes.NewBuffer([]byte{}) + av := NewAVPacket(FLV_TAG_TYPE_VIDEO) + handleNALU := func(nalType byte, payload []byte, ts int64) { + rtsp.SyncCount++ + vl := len(payload) + switch nalType { + case NALU_SPS: + r.Write(RTMP_AVC_HEAD) + util.BigEndian.PutUint16(spsHead[1:], uint16(vl)) + r.Write(spsHead) + r.Write(payload) + case NALU_PPS: + util.BigEndian.PutUint16(ppsHead[1:], uint16(vl)) + r.Write(ppsHead) + r.Write(payload) + av.VideoFrameType = 1 + av.Payload = r.Bytes() + rtsp.PushVideo(av) + case NALU_IDR_Picture: + av = NewAVPacket(FLV_TAG_TYPE_VIDEO) + r.Reset() + av.VideoFrameType = 1 + av.Timestamp = uint32(ts) + util.BigEndian.PutUint24(iframeHead[2:], 0) + r.Write(iframeHead) + util.BigEndian.PutUint32(nalLength, uint32(vl)) + r.Write(nalLength) + rtsp.PushVideo(av) + case NALU_Non_IDR_Picture: + av = NewAVPacket(FLV_TAG_TYPE_VIDEO) + r.Reset() + av.VideoFrameType = 2 + av.Timestamp = uint32(ts) + util.BigEndian.PutUint24(pframeHead[2:], 0) + r.Write(iframeHead) + util.BigEndian.PutUint32(nalLength, uint32(vl)) + r.Write(nalLength) + rtsp.PushVideo(av) + } + } + for { + select { + case <-rtsp.Done(): + return + case data, ok := <-rtsp.OutGoing: + if ok && data[0] == 36 { + if data[1] == 0 { + cc := data[4] & 0xF + //rtp header + rtphdr := 12 + cc*4 + + //packet time + ts := (int64(data[8]) << 24) + (int64(data[9]) << 16) + (int64(data[10]) << 8) + (int64(data[11])) + + //packet number + packno := (int64(data[6]) << 8) + int64(data[7]) + if false { + log.Println("packet num", packno) + } + + nalType := data[4+rtphdr] & 0x1F + + if nalType >= 1 && nalType <= 23 { + handleNALU(nalType, data[4+rtphdr:], ts) + } else if nalType == 28 { + isStart := data[4+rtphdr+1]&0x80 != 0 + isEnd := data[4+rtphdr+1]&0x40 != 0 + nalType := data[4+rtphdr+1] & 0x1F + //nri := (data[4+rtphdr+1]&0x60)>>5 + nal := data[4+rtphdr]&0xE0 | data[4+rtphdr+1]&0x1F + if isStart { + fuBuffer = []byte{0} + } + fuBuffer = append(fuBuffer, data[4+rtphdr+2:]...) + if isEnd { + fuBuffer[0] = nal + handleNALU(nalType, fuBuffer, ts) + } + } + + } else if data[1] == 2 { + // audio + cc := data[4] & 0xF + rtphdr := 12 + cc*4 + //or not payload := data[4+rtphdr:] + payload := data[4+rtphdr+4:] + av := NewAVPacket(FLV_TAG_TYPE_AUDIO) + av.Payload = payload + rtsp.PushAudio(av) + } + } + } + } +} +func (rtsp *RTSP) Publish(streamPath string, rtspUrl string) (result bool) { + if result = rtsp.InputStream.Publish(streamPath, rtsp); result { + rtsp.RTSPInfo.RoomInfo = &rtsp.Room.RoomInfo + rtsp.RtspClient = RtspClientNew() + if status, message := rtsp.RtspClient.Client(rtspUrl); !status { + log.Println(message) + return false + } + go rtsp.run() + } + return +}