From 1873861e8b530b7e9a6ff21b8f2c86d8b40a446a Mon Sep 17 00:00:00 2001 From: langhuihui <178529795@qq.com> Date: Sun, 10 May 2020 18:28:05 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9ERTSP=20server=E5=8A=9F?= =?UTF-8?q?=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- LICENSE | 21 ++ README.md | 8 +- client.go | 988 +++++++++++++++++++++++++------------------------- go.mod | 8 +- go.sum | 35 ++ main.go | 363 ++++++++++++------- request.go | 100 +++++ response.go | 51 +++ rtp-parser.go | 68 ++++ sdp-parser.go | 105 ++++++ session.go | 638 ++++++++++++++++++++++++++++++++ udp-client.go | 160 ++++++++ udp-server.go | 242 +++++++++++++ 13 files changed, 2158 insertions(+), 629 deletions(-) create mode 100644 LICENSE create mode 100644 request.go create mode 100644 response.go create mode 100644 rtp-parser.go create mode 100644 sdp-parser.go create mode 100644 session.go create mode 100644 udp-client.go create mode 100644 udp-server.go diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..7f73d3d --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +The MIT License (MIT) + +Copyright (c) 2019-present, dexter + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. \ No newline at end of file diff --git a/README.md b/README.md index e1e1bdb..b3ef534 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # Monibuca 的RTSP 插件 -主要功能是对RTSP地址进行拉流转换 +主要功能是提供RTSP的端口监听接受RTSP推流,以及对RTSP地址进行拉流转发 ## 插件名称 @@ -9,16 +9,18 @@ RTSP ## 配置 ```toml [RTSP] +ListenAddr = ":554" BufferLength = 2048 AutoPull = false RemoteAddr = "rtsp://localhost/${streamPath}" ``` +- ListenAddr 是监听端口,可以将rtsp流推到Monibuca中 - BufferLength是指解析拉取的rtp包的缓冲大小 -- AutoPull是指当有用户订阅一个新房间的时候自动向远程拉流转发 +- AutoPull是指当有用户订阅一个新流的时候自动向远程拉流转发 - RemoteAddr 指远程拉流地址,其中${streamPath}是占位符,实际使用流路径替换。 ## 使用方法(拉流转发) ```go -new(RTSP).Publish("live/user1","rtsp://xxx.xxx.xxx.xxx/live/user1") +new(RTSP).PullStream("live/user1","rtsp://xxx.xxx.xxx.xxx/live/user1") ``` \ No newline at end of file diff --git a/client.go b/client.go index d9d6512..1d62439 100644 --- a/client.go +++ b/client.go @@ -1,12 +1,14 @@ -package rtspplugin +package rtsp import ( + "bufio" + "bytes" "crypto/md5" - b64 "encoding/base64" - "encoding/hex" + "encoding/binary" "fmt" + . "github.com/Monibuca/engine/v2" + "github.com/pixelbender/go-sdp/sdp" "io" - "log" "net" "net/url" "regexp" @@ -15,532 +17,540 @@ import ( "time" ) -var ( - VideoWidth int - VideoHeight int - spropReg *regexp.Regexp - configReg *regexp.Regexp -) - -func init() { - spropReg, _ = regexp.Compile("sprop-parameter-sets=([^;]+)") - configReg, _ = regexp.Compile("config=([^;]+)") +// PullStream 从外部拉流 +func (rtsp *RTSP) PullStream(streamPath string, rtspUrl string) (result bool) { + if result = rtsp.Publisher.Publish(streamPath); result { + rtsp.Stream.Type = "RTSP" + rtsp.RTSPInfo.StreamInfo = &rtsp.Stream.StreamInfo + rtsp.TransType = TRANS_TYPE_TCP + rtsp.vRTPChannel = 0 + rtsp.vRTPControlChannel = 1 + rtsp.aRTPChannel = 2 + rtsp.aRTPControlChannel = 3 + rtsp.URL = rtspUrl + if err := rtsp.requestStream();err != nil { + rtsp.Close() + return false + } + go rtsp.startStream() + collection.Store(streamPath, rtsp) + // go rtsp.run() + } + return } +func DigestAuth(authLine string, method string, URL string) (string, error) { + l, err := url.Parse(URL) + if err != nil { + return "", fmt.Errorf("Url parse error:%v,%v", URL, err) + } + realm := "" + nonce := "" + realmRex := regexp.MustCompile(`realm="(.*?)"`) + result1 := realmRex.FindStringSubmatch(authLine) -type RtspClient struct { - socket net.Conn - OutGoing chan []byte //out chanel - Signals chan bool //Signals quit - host string //host - port string //port - uri string //url - auth bool //aut - login string - password string //password - session string //rtsp session - responce string //responce string - bauth string //string b auth - track []string //rtsp track - cseq int //qury number - videow int - videoh int - SPS []byte - PPS []byte - Header string - AudioSpecificConfig []byte -} + nonceRex := regexp.MustCompile(`nonce="(.*?)"`) + result2 := nonceRex.FindStringSubmatch(authLine) -//RtspClientNew 返回空的初始化对象 -func RtspClientNew(bufferLength int) *RtspClient { - Obj := &RtspClient{ - cseq: 1, //查询起始号码 - Signals: make(chan bool, 1), //一个消息缓冲通道 - OutGoing: make(chan []byte, bufferLength), //输出通道 - } - return Obj -} - -func (this *RtspClient) Client(rtsp_url string) (bool, string) { - //Check back url - if !this.ParseUrl(rtsp_url) { - return false, "Incorrect url" - } - //Install connect to camera - if !this.Connect() { - return false, "cannot connect" - } - //Phase 1 options camera phase 1 - //Send options request - if !this.Write("OPTIONS " + this.uri + " RTSP/1.0\r\nCSeq: " + strconv.Itoa(this.cseq) + "\r\n\r\n") { - return false, "Unable to send options message" - } - //Read the response to the options request - if status, message := this.Read(); !status { - return false, "Unable to read options response connection lost" - } else if status && strings.Contains(message, "Digest") { - if !this.AuthDigest("OPTIONS", message) { - return false, "Summary of authorization required" - } - } else if status && strings.Contains(message, "Basic") { - if !this.AuthBasic("OPTIONS", message) { - return false, "Need certification Basic" - } - } else if !strings.Contains(message, "200") { - return false, "error OPTIONS not status code 200 OK " + message - } - - ////////////PHASE 2 DESCRIBE - log.Println("DESCRIBE " + this.uri + " RTSP/1.0\r\nCSeq: " + strconv.Itoa(this.cseq) + this.bauth + "\r\n\r\n") - if !this.Write("DESCRIBE " + this.uri + " RTSP/1.0\r\nCSeq: " + strconv.Itoa(this.cseq) + this.bauth + "\r\n\r\n") { - return false, "Unable to send query DESCRIBE" - } - if status, message := this.Read(); !status { - return false, "Can't read response for decscribe connection loss?" - } else if status && strings.Contains(message, "Digest") { - if !this.AuthDigest("DESCRIBE", message) { - return false, "Summary of authorization required" - } - } else if status && strings.Contains(message, "Basic") { - if !this.AuthBasic("DESCRIBE", message) { - return false, "Basis of authorization required" - } - } else if !strings.Contains(message, "200") { - return false, "error DESCRIBE not status code 200 OK " + message + if len(result1) == 2 { + realm = result1[1] } else { - this.Header = message - this.track = this.ParseMedia(message) - + return "", fmt.Errorf("auth error : no realm found") } - if len(this.track) == 0 { - return false, "error track not found " - } - //PHASE 3 SETUP - log.Println("SETUP " + this.uri + "/" + this.track[0] + " RTSP/1.0\r\nCSeq: " + strconv.Itoa(this.cseq) + "\r\nTransport: RTP/AVP/TCP;unicast;interleaved=0-1" + this.bauth + "\r\n\r\n") - if !this.Write("SETUP " + this.uri + "/" + this.track[0] + " RTSP/1.0\r\nCSeq: " + strconv.Itoa(this.cseq) + "\r\nTransport: RTP/AVP/TCP;unicast;interleaved=0-1" + this.bauth + "\r\n\r\n") { - return false, "" - } - if status, message := this.Read(); !status { - return false, "Unable to read response for missing setup connection." - - } else if !strings.Contains(message, "200") { - if strings.Contains(message, "401") { - str := this.AuthDigest_Only("SETUP", message) - if !this.Write("SETUP " + this.uri + "/" + this.track[0] + " RTSP/1.0\r\nCSeq: " + strconv.Itoa(this.cseq) + "\r\nTransport: RTP/AVP/TCP;unicast;interleaved=0-1" + this.bauth + str + "\r\n\r\n") { - return false, "" - } - if status, message := this.Read(); !status { - return false, "Unable to read response for missing setup connection." - - } else if !strings.Contains(message, "200") { - - return false, "error SETUP not status code 200 OK " + message - - } else { - this.session = ParseSession(message) - } - } else { - return false, "error SETUP not status code 200 OK " + message - } + if len(result2) == 2 { + nonce = result2[1] } else { - log.Println(message) - this.session = ParseSession(message) - log.Println(this.session) + return "", fmt.Errorf("auth error : no nonce found") } - if len(this.track) > 1 { + // response= md5(md5(username:realm:password):nonce:md5(public_method:url)); + username := l.User.Username() + password, _ := l.User.Password() + l.User = nil + if l.Port() == "" { + l.Host = fmt.Sprintf("%s:%s", l.Host, "554") + } + md5UserRealmPwd := fmt.Sprintf("%x", md5.Sum([]byte(fmt.Sprintf("%s:%s:%s", username, realm, password)))) + md5MethodURL := fmt.Sprintf("%x", md5.Sum([]byte(fmt.Sprintf("%s:%s", method, l.String())))) - if !this.Write("SETUP " + this.uri + "/" + this.track[1] + " RTSP/1.0\r\nCSeq: " + strconv.Itoa(this.cseq) + "\r\nTransport: RTP/AVP/TCP;unicast;interleaved=2-3" + "\r\nSession: " + this.session + this.bauth + "\r\n\r\n") { - return false, "" - } - if status, message := this.Read(); !status { - return false, "Unable to read response for missing setup audio connection." - - } else if !strings.Contains(message, "200") { - if strings.Contains(message, "401") { - str := this.AuthDigest_Only("SETUP", message) - if !this.Write("SETUP " + this.uri + "/" + this.track[1] + " RTSP/1.0\r\nCSeq: " + strconv.Itoa(this.cseq) + "\r\nTransport: RTP/AVP/TCP;unicast;interleaved=2-3" + this.bauth + str + "\r\n\r\n") { - return false, "" + response := fmt.Sprintf("%x", md5.Sum([]byte(fmt.Sprintf("%s:%s:%s", md5UserRealmPwd, nonce, md5MethodURL)))) + Authorization := fmt.Sprintf("Digest username=\"%s\", realm=\"%s\", nonce=\"%s\", uri=\"%s\", response=\"%s\"", username, realm, nonce, l.String(), response) + return Authorization, nil +} +func (client *RTSP) checkAuth(method string, resp *Response) (string, error) { + if resp.StatusCode == 401 { + // need auth. + AuthHeaders := resp.Header["WWW-Authenticate"] + auths, ok := AuthHeaders.([]string) + if ok { + for _, authLine := range auths { + if strings.IndexAny(authLine, "Digest") == 0 { + // realm="HipcamRealServer", + // nonce="3b27a446bfa49b0c48c3edb83139543d" + client.authLine = authLine + return DigestAuth(authLine, method, client.URL) + } else if strings.IndexAny(authLine, "Basic") == 0 { + // not support yet + // TODO.. } - if status, message := this.Read(); !status { - return false, "Unable to read response for missing setup audio connection." + } + return "", fmt.Errorf("auth error") + } else { + authLine, _ := AuthHeaders.(string) + if strings.IndexAny(authLine, "Digest") == 0 { + client.authLine = authLine + return DigestAuth(authLine, method, client.URL) + } else if strings.IndexAny(authLine, "Basic") == 0 { + // not support yet + // TODO.. + return "", fmt.Errorf("not support Basic auth yet") + } + } + } + return "", nil +} +func (client *RTSP) requestStream() (err error) { + timeout := time.Duration(5) * time.Second + l, err := url.Parse(client.URL) + if err != nil { + return err + } + if strings.ToLower(l.Scheme) != "rtsp" { + err = fmt.Errorf("RTSP url is invalid") + return err + } + if strings.ToLower(l.Hostname()) == "" { + err = fmt.Errorf("RTSP url is invalid") + return err + } + port := l.Port() + if len(port) == 0 { + port = "554" + } + conn, err := net.DialTimeout("tcp", l.Hostname()+":"+port, timeout) + if err != nil { + // handle error + return err + } - } else if !strings.Contains(message, "200") { + networkBuffer := 204800 - return false, "error SETUP not status code 200 OK " + message + timeoutConn := RichConn{ + conn, + timeout, + } + client.Conn = &timeoutConn + client.connRW = bufio.NewReadWriter(bufio.NewReaderSize(&timeoutConn, networkBuffer), bufio.NewWriterSize(&timeoutConn, networkBuffer)) - } else { - log.Println(message) - this.session = ParseSession(message) + headers := make(map[string]string) + headers["Require"] = "implicit-play" + // An OPTIONS request returns the request types the server will accept. + resp, err := client.Request("OPTIONS", headers) + if err != nil { + if resp != nil { + Authorization, _ := client.checkAuth("OPTIONS", resp) + if len(Authorization) > 0 { + headers := make(map[string]string) + headers["Require"] = "implicit-play" + headers["Authorization"] = Authorization + // An OPTIONS request returns the request types the server will accept. + resp, err = client.Request("OPTIONS", headers) + } + if err != nil { + return err + } + } else { + return err + } + } + + // A DESCRIBE request includes an RTSP URL (rtsp://...), and the type of reply data that can be handled. This reply includes the presentation description, + // typically in Session Description Protocol (SDP) format. Among other things, the presentation description lists the media streams controlled with the aggregate URL. + // In the typical case, there is one media stream each for audio and video. + headers = make(map[string]string) + headers["Accept"] = "application/sdp" + resp, err = client.Request("DESCRIBE", headers) + if err != nil { + if resp != nil { + authorization, _ := client.checkAuth("DESCRIBE", resp) + if len(authorization) > 0 { + headers := make(map[string]string) + headers["Authorization"] = authorization + headers["Accept"] = "application/sdp" + resp, err = client.Request("DESCRIBE", headers) + } + if err != nil { + return err + } + } else { + return err + } + } + _sdp, err := sdp.ParseString(resp.Body) + if err != nil { + return err + } + client.Sdp = _sdp + client.SDPRaw = resp.Body + client.SDPMap = ParseSDP(client.SDPRaw) + session := "" + for _, media := range _sdp.Media { + switch media.Type { + case "video": + client.VControl = media.Attributes.Get("control") + client.VCodec = media.Format[0].Name + client.SPS = client.SDPMap["video"].SpropParameterSets[0] + client.PPS = client.SDPMap["video"].SpropParameterSets[1] + var _url = "" + if strings.Index(strings.ToLower(client.VControl), "rtsp://") == 0 { + _url = client.VControl + } else { + _url = strings.TrimRight(client.URL, "/") + "/" + strings.TrimLeft(client.VControl, "/") + } + headers = make(map[string]string) + if client.TransType == TRANS_TYPE_TCP { + headers["Transport"] = fmt.Sprintf("RTP/AVP/TCP;unicast;interleaved=%d-%d", client.vRTPChannel, client.vRTPControlChannel) + } else { + if client.UDPServer == nil { + client.UDPServer = &UDPServer{Session: client} } - } else { - return false, "error SETUP not status code 200 OK " + message + //RTP/AVP;unicast;client_port=64864-64865 + err = client.UDPServer.SetupVideo() + if err != nil { + Printf("Setup video err.%v", err) + return err + } + headers["Transport"] = fmt.Sprintf("RTP/AVP/UDP;unicast;client_port=%d-%d", client.UDPServer.VPort, client.UDPServer.VControlPort) + client.Conn.timeout = 0 // UDP ignore timeout } - } else { - log.Println(message) - this.session = ParseSession(message) + if session != "" { + headers["Session"] = session + } + Printf("Parse DESCRIBE response, VIDEO VControl:%s, VCode:%s, url:%s,Session:%s,vRTPChannel:%d,vRTPControlChannel:%d", client.VControl, client.VCodec, _url, session, client.vRTPChannel, client.vRTPControlChannel) + resp, err = client.RequestWithPath("SETUP", _url, headers, true) + if err != nil { + return err + } + session, _ = resp.Header["Session"].(string) + case "audio": + client.AControl = media.Attributes.Get("control") + client.ACodec = media.Format[0].Name + client.AudioSpecificConfig = client.SDPMap["audio"].Config + var _url = "" + if strings.Index(strings.ToLower(client.AControl), "rtsp://") == 0 { + _url = client.AControl + } else { + _url = strings.TrimRight(client.URL, "/") + "/" + strings.TrimLeft(client.AControl, "/") + } + headers = make(map[string]string) + if client.TransType == TRANS_TYPE_TCP { + headers["Transport"] = fmt.Sprintf("RTP/AVP/TCP;unicast;interleaved=%d-%d", client.aRTPChannel, client.aRTPControlChannel) + } else { + if client.UDPServer == nil { + client.UDPServer = &UDPServer{Session: client} + } + err = client.UDPServer.SetupAudio() + if err != nil { + Printf("Setup audio err.%v", err) + return err + } + headers["Transport"] = fmt.Sprintf("RTP/AVP/UDP;unicast;client_port=%d-%d", client.UDPServer.APort, client.UDPServer.AControlPort) + client.Conn.timeout = 0 // UDP ignore timeout + } + if session != "" { + headers["Session"] = session + } + Printf("Parse DESCRIBE response, AUDIO AControl:%s, ACodec:%s, url:%s,Session:%s, aRTPChannel:%d,aRTPControlChannel:%d", client.AControl, client.ACodec, _url, session, client.aRTPChannel, client.aRTPControlChannel) + resp, err = client.RequestWithPath("SETUP", _url, headers, true) + if err != nil { + return err + } + session, _ = resp.Header["Session"].(string) } } - - //PHASE 4 SETUP - log.Println("PLAY " + this.uri + " RTSP/1.0\r\nCSeq: " + strconv.Itoa(this.cseq) + "\r\nSession: " + this.session + this.bauth + "\r\n\r\n") - if !this.Write("PLAY " + this.uri + " RTSP/1.0\r\nCSeq: " + strconv.Itoa(this.cseq) + "\r\nSession: " + this.session + this.bauth + "\r\n\r\n") { - return false, "" + headers = make(map[string]string) + if session != "" { + headers["Session"] = session } - if status, message := this.Read(); !status { - return false, "Unable to read play response lost connection" - - } else if !strings.Contains(message, "200") { - //return false, "Ошибка PLAY not status code 200 OK " + message - if strings.Contains(message, "401") { - str := this.AuthDigest_Only("PLAY", message) - if !this.Write("PLAY " + this.uri + " RTSP/1.0\r\nCSeq: " + strconv.Itoa(this.cseq) + "\r\nSession: " + this.session + this.bauth + str + "\r\n\r\n") { - return false, "" - } - if status, message := this.Read(); !status { - return false, "Unable to read play response lost connection" - - } else if !strings.Contains(message, "200") { - - return false, "error PLAY not status code 200 OK " + message - - } else { - //this.session = ParseSession(message) - log.Print(message) - go this.RtspRtpLoop() - return true, "ok" - } - } else { - return false, "error PLAY not status code 200 OK " + message - } - } else { - log.Print(message) - go this.RtspRtpLoop() - return true, "ok" + resp, err = client.Request("PLAY", headers) + if err != nil { + return err } - return false, "other error" + return nil } -/* - The RTP header has the following format: - 0 1 2 3 - 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - |V=2|P|X| CC |M| PT | sequence number | - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - | timestamp | - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - | synchronization source (SSRC) identifier | - +=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ - | contributing source (CSRC) identifiers | - | .... | - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - version (V): 2 bits - This field identifies the version of RTP. The version defined by - this specification is two (2). (The value 1 is used by the first - draft version of RTP and the value 0 is used by the protocol - initially implemented in the "vat" audio tool.) - padding (P): 1 bit - If the padding bit is set, the packet contains one or more - additional padding octets at the end which are not part of the - payload. The last octet of the padding contains a count of how - many padding octets should be ignored, including itself. Padding - may be needed by some encryption algorithms with fixed block sizes - or for carrying several RTP packets in a lower-layer protocol data - unit. - extension (X): 1 bit - If the extension bit is set, the fixed header MUST be followed by - exactly one header extension, with a format defined in Section - 5.3.1. -*/ -func (this *RtspClient) RtspRtpLoop() { - defer func() { - this.Signals <- true - }() - header := make([]byte, 4) - payload := make([]byte, 4096) - //sync := make([]byte, 256) - sync_b := make([]byte, 1) - timer := time.Now() +func (client *RTSP) startStream() { + //startTime := time.Now() + //loggerTime := time.Now().Add(-10 * time.Second) + defer client.Stop() for { - if int(time.Now().Sub(timer).Seconds()) > 50 { - if !this.Write("OPTIONS " + this.uri + " RTSP/1.0\r\nCSeq: " + strconv.Itoa(this.cseq) + "\r\nSession: " + this.session + this.bauth + "\r\n\r\n") { - return - } - timer = time.Now() - } - this.socket.SetDeadline(time.Now().Add(50 * time.Second)) - //read rtp hdr 4 - if n, err := io.ReadFull(this.socket, header); err != nil || n != 4 { - //rtp hdr read error + //if client.OptionIntervalMillis > 0 { + // if time.Since(startTime) > time.Duration(client.OptionIntervalMillis)*time.Millisecond { + // startTime = time.Now() + // headers := make(map[string]string) + // headers["Require"] = "implicit-play" + // // An OPTIONS request returns the request types the server will accept. + // if err := client.RequestNoResp("OPTIONS", headers); err != nil { + // // ignore... + // } + // } + //} + b, err := client.connRW.ReadByte() + if err != nil { + Printf("client.connRW.ReadByte err:%v", err) return } - //log.Println(header) - if header[0] != 36 { - //log.Println("desync?", this.host) + switch b { + case 0x24: // rtp + header := make([]byte, 4) + header[0] = b + _, err := io.ReadFull(client.connRW, header[1:]) + if err != nil { + Printf("io.ReadFull err:%v", err) + return + } + channel := int(header[1]) + length := binary.BigEndian.Uint16(header[2:]) + content := make([]byte, length) + _, err = io.ReadFull(client.connRW, content) + if err != nil { + Printf("io.ReadFull err:%v", err) + return + } + rtpBuf := content + var pack *RTPPack + switch channel { + case client.aRTPChannel: + pack = &RTPPack{ + Type: RTP_TYPE_AUDIO, + Buffer: rtpBuf, + } + case client.aRTPControlChannel: + pack = &RTPPack{ + Type: RTP_TYPE_AUDIOCONTROL, + Buffer: rtpBuf, + } + case client.vRTPChannel: + pack = &RTPPack{ + Type: RTP_TYPE_VIDEO, + Buffer: rtpBuf, + } + case client.vRTPControlChannel: + pack = &RTPPack{ + Type: RTP_TYPE_VIDEOCONTROL, + Buffer: rtpBuf, + } + default: + Printf("unknow rtp pack type, channel:%v", channel) + continue + } + if pack == nil { + Printf("session tcp got nil rtp pack") + continue + } + + //if client.debugLogEnable { + // rtp := ParseRTP(pack.Buffer) + // if rtp != nil { + // rtpSN := uint16(rtp.SequenceNumber) + // if client.lastRtpSN != 0 && client.lastRtpSN+1 != rtpSN { + // Printf("%s, %d packets lost, current SN=%d, last SN=%d\n", client.String(), rtpSN-client.lastRtpSN, rtpSN, client.lastRtpSN) + // } + // client.lastRtpSN = rtpSN + // } + // + // elapsed := time.Now().Sub(loggerTime) + // if elapsed >= 30*time.Second { + // Printf("%v read rtp frame.", client) + // loggerTime = time.Now() + // } + //} + + client.InBytes += int(length + 4) + client.handleRTP(pack) + + default: // rtsp + builder := bytes.Buffer{} + builder.WriteByte(b) + contentLen := 0 for { - ///////////////////////////skeep///////////////////////////////////// - if n, err := io.ReadFull(this.socket, sync_b); err != nil && n != 1 { + line, prefix, err := client.connRW.ReadLine() + if err != nil { + Printf("client.connRW.ReadLine err:%v", err) return - } else if sync_b[0] == 36 { - header[0] = 36 - if n, err := io.ReadFull(this.socket, header[1:]); err != nil && n == 3 { - return + } + if len(line) == 0 { + if contentLen != 0 { + content := make([]byte, contentLen) + _, err = io.ReadFull(client.connRW, content) + if err != nil { + err = fmt.Errorf("Read content err.ContentLength:%d", contentLen) + return + } + builder.Write(content) } + Printf("<<<[IN]\n%s", builder.String()) break } - } - /* - //вычитываем 256 в попытке отсять мусор обрезать RTSP - if string(header) == "RTSP" { - if n, err := io.ReadFull(this.socket, sync); err != nil && n == 256 { - return - } else { - rtsp_rtp := []byte(strings.Split(string(sync), "\r\n\r\n")[1]) - //отправим все что есть в буфере - this.SendBufer(rtsp_rtp) - continue - } - } else { - log.Println("full desync") - return + s := string(line) + builder.Write(line) + if !prefix { + builder.WriteString("\r\n") } - */ - } - payloadLen := (int)(header[2])<<8 + (int)(header[3]) - //log.Println("payloadLen", payloadLen) - if payloadLen > 4096 || payloadLen < 12 { - log.Println("desync", this.uri, payloadLen) - return - } - if n, err := io.ReadFull(this.socket, payload[:payloadLen]); err != nil || n != payloadLen { - return - } else { - this.OutGoing <- append(header, payload[:n]...) + if strings.Index(s, "Content-Length:") == 0 { + splits := strings.Split(s, ":") + contentLen, err = strconv.Atoi(strings.TrimSpace(splits[1])) + if err != nil { + Printf("strconv.Atoi err:%v, str:%v", err, splits[1]) + return + } + } + } } } - } -//unsafe! -func (this *RtspClient) SendBufer(bufer []byte) { - //Here you need to send all the packages from the send all buffer? - payload := make([]byte, 4096) +func (client *RTSP) Request(method string, headers map[string]string) (*Response, error) { + l, err := url.Parse(client.URL) + if err != nil { + return nil, fmt.Errorf("Url parse error:%v", err) + } + l.User = nil + return client.RequestWithPath(method, l.String(), headers, true) +} + +func (client *RTSP) RequestNoResp(method string, headers map[string]string) (err error) { + l, err := url.Parse(client.URL) + if err != nil { + return fmt.Errorf("Url parse error:%v", err) + } + l.User = nil + if _, err = client.RequestWithPath(method, l.String(), headers, false); err != nil { + return err + } + return nil +} + +func (client *RTSP) RequestWithPath(method string, path string, headers map[string]string, needResp bool) (resp *Response, err error) { + headers["User-Agent"] = client.Agent + if len(headers["Authorization"]) == 0 { + if len(client.authLine) != 0 { + Authorization, _ := DigestAuth(client.authLine, method, client.URL) + if len(Authorization) > 0 { + headers["Authorization"] = Authorization + } + } + } + if len(client.Session) > 0 { + headers["Session"] = client.Session + } + client.Seq++ + cseq := client.Seq + builder := bytes.Buffer{} + builder.WriteString(fmt.Sprintf("%s %s RTSP/1.0\r\n", method, path)) + builder.WriteString(fmt.Sprintf("CSeq: %d\r\n", cseq)) + for k, v := range headers { + builder.WriteString(fmt.Sprintf("%s: %s\r\n", k, v)) + } + builder.WriteString(fmt.Sprintf("\r\n")) + s := builder.String() + Printf("[OUT]>>>\n%s", s) + _, err = client.connRW.WriteString(s) + if err != nil { + return + } + client.connRW.Flush() + + if !needResp { + return nil, nil + } + lineCount := 0 + statusCode := 200 + status := "" + sid := "" + contentLen := 0 + respHeader := make(map[string]interface{}) + var line []byte + builder.Reset() for { - if len(bufer) < 4 { - log.Fatal("bufer small") + isPrefix := false + if line, isPrefix, err = client.connRW.ReadLine(); err != nil { + return } - dataLength := (int)(bufer[2])<<8 + (int)(bufer[3]) - if dataLength > len(bufer)+4 { - if n, err := io.ReadFull(this.socket, payload[:dataLength-len(bufer)+4]); err != nil { - return - } else { - this.OutGoing <- append(bufer, payload[:n]...) - return - } - - } else { - this.OutGoing <- bufer[:dataLength+4] - bufer = bufer[dataLength+4:] + s := string(line) + builder.Write(line) + if !isPrefix { + builder.WriteString("\r\n") } - } -} -func (this *RtspClient) Connect() bool { - d := &net.Dialer{Timeout: 3 * time.Second} - conn, err := d.Dial("tcp", this.host+":"+this.port) - if err != nil { - return false - } - this.socket = conn - return true -} -func (this *RtspClient) Write(message string) bool { - this.cseq += 1 - if _, e := this.socket.Write([]byte(message)); e != nil { - return false - } - return true -} -func (this *RtspClient) Read() (bool, string) { - buffer := make([]byte, 4096) - if nb, err := this.socket.Read(buffer); err != nil || nb <= 0 { - log.Println("socket read failed", err) - return false, "" - } else { - return true, string(buffer[:nb]) - } -} -func (this *RtspClient) AuthBasic(phase string, message string) bool { - this.bauth = "\r\nAuthorization: Basic " + b64.StdEncoding.EncodeToString([]byte(this.login+":"+this.password)) - if !this.Write(phase + " " + this.uri + " RTSP/1.0\r\nCSeq: " + strconv.Itoa(this.cseq) + this.bauth + "\r\n\r\n") { - return false - } - if status, message := this.Read(); status && strings.Contains(message, "200") { - this.track = this.ParseMedia(message) - return true - } - return false -} -func (this *RtspClient) AuthDigest(phase string, message string) bool { - nonce := ParseDirective(message, "nonce") - realm := ParseDirective(message, "realm") - hs1 := GetMD5Hash(this.login + ":" + realm + ":" + this.password) - hs2 := GetMD5Hash(phase + ":" + this.uri) - responce := GetMD5Hash(hs1 + ":" + nonce + ":" + hs2) - dauth := "\r\n" + `Authorization: Digest username="` + this.login + `", realm="` + realm + `", nonce="` + nonce + `", uri="` + this.uri + `", response="` + responce + `"` - if !this.Write(phase + " " + this.uri + " RTSP/1.0\r\nCSeq: " + strconv.Itoa(this.cseq) + dauth + "\r\n\r\n") { - return false - } - if status, message := this.Read(); status && strings.Contains(message, "200") { - this.track = this.ParseMedia(message) - return true - } - return false -} -func (this *RtspClient) AuthDigest_Only(phase string, message string) string { - nonce := ParseDirective(message, "nonce") - realm := ParseDirective(message, "realm") - hs1 := GetMD5Hash(this.login + ":" + realm + ":" + this.password) - hs2 := GetMD5Hash(phase + ":" + this.uri) - responce := GetMD5Hash(hs1 + ":" + nonce + ":" + hs2) - dauth := "\r\n" + `Authorization: Digest username="` + this.login + `", realm="` + realm + `", nonce="` + nonce + `", uri="` + this.uri + `", response="` + responce + `"` - return dauth -} -func (this *RtspClient) ParseUrl(rtsp_url string) bool { - - u, err := url.Parse(rtsp_url) - if err != nil { - return false - } - phost := strings.Split(u.Host, ":") - this.host = phost[0] - if len(phost) == 2 { - this.port = phost[1] - } else { - this.port = "554" - } - this.login = u.User.Username() - this.password, this.auth = u.User.Password() - if u.RawQuery != "" { - this.uri = "rtsp://" + this.host + ":" + this.port + u.Path + "?" + string(u.RawQuery) - } else { - this.uri = "rtsp://" + this.host + ":" + this.port + u.Path - } - return true -} -func (this *RtspClient) Close() { - if this.socket != nil { - this.socket.Close() - } -} -func ParseDirective(header, name string) string { - index := strings.Index(header, name) - if index == -1 { - return "" - } - start := 1 + index + strings.Index(header[index:], `"`) - end := start + strings.Index(header[start:], `"`) - return strings.TrimSpace(header[start:end]) -} -func ParseSession(header string) string { - mparsed := strings.Split(header, "\r\n") - for _, element := range mparsed { - if strings.Contains(element, "Session:") { - if strings.Contains(element, ";") { - fist := strings.Split(element, ";")[0] - return fist[9:] - } else { - return element[9:] - } - } - } - return "" -} - -// func ParseMedia(header string) []string { -// letters := []string{} -// mparsed := strings.Split(header, "\r\n") -// paste := "" - -// // if true { -// // log.Println("headers", header) -// // } - -// for _, element := range mparsed { -// if strings.Contains(element, "a=control:") && !strings.Contains(element, "*") { -// paste = element[10:] -// if strings.Contains(element, "/") { -// striped := strings.Split(element, "/") -// paste = striped[len(striped)-1] -// } -// letters = append(letters, paste) -// } - -// dimensionsPrefix := "a=x-dimensions:" -// if strings.HasPrefix(element, dimensionsPrefix) { -// dims := []int{} -// for _, s := range strings.Split(element[len(dimensionsPrefix):], ",") { -// v := 0 -// fmt.Sscanf(s, "%d", &v) -// if v <= 0 { -// break -// } -// dims = append(dims, v) -// } -// if len(dims) == 2 { -// VideoWidth = dims[0] -// VideoHeight = dims[1] -// } -// } -// if strings.Contains(element, "sprop-parameter-sets") { -// group := spropReg.FindAllStringSubmatch(element, -1) -// log.Println(group[1]) -// } -// } -// return letters -// } -func GetMD5Hash(text string) string { - hash := md5.Sum([]byte(text)) - return hex.EncodeToString(hash[:]) -} -func (this *RtspClient) ParseMedia(header string) []string { - letters := []string{} - log.Println(header) - mparsed := strings.Split(header, "\r\n") - paste := "" - for _, element := range mparsed { - if strings.Contains(element, "a=control:") && !strings.Contains(element, "*") { - paste = element[10:] - if strings.Contains(element, "/") { - striped := strings.Split(element, "/") - paste = striped[len(striped)-1] - } - letters = append(letters, paste) - } - - dimensionsPrefix := "a=x-dimensions:" - if strings.HasPrefix(element, dimensionsPrefix) { - dims := []int{} - for _, s := range strings.Split(element[len(dimensionsPrefix):], ",") { - v := 0 - fmt.Sscanf(s, "%d", &v) - if v <= 0 { - break + if len(line) == 0 { + body := "" + if contentLen > 0 { + content := make([]byte, contentLen) + _, err = io.ReadFull(client.connRW, content) + if err != nil { + err = fmt.Errorf("Read content err.ContentLength:%d", contentLen) + return } - dims = append(dims, v) + body = string(content) + builder.Write(content) } - if len(dims) == 2 { - this.videow = dims[0] - this.videoh = dims[1] + resp = NewResponse(statusCode, status, strconv.Itoa(cseq), sid, body) + resp.Header = respHeader + Printf("<<<[IN]\n%s", builder.String()) + + if !(statusCode >= 200 && statusCode <= 300) { + err = fmt.Errorf("Response StatusCode is :%d", statusCode) + return + } + return + } + if lineCount == 0 { + splits := strings.Split(s, " ") + if len(splits) < 3 { + err = fmt.Errorf("StatusCode Line error:%s", s) + return + } + statusCode, err = strconv.Atoi(splits[1]) + if err != nil { + return + } + status = splits[2] + } + lineCount++ + splits := strings.Split(s, ":") + if len(splits) == 2 { + if val, ok := respHeader[splits[0]]; ok { + if slice, ok2 := val.([]string); ok2 { + slice = append(slice, strings.TrimSpace(splits[1])) + respHeader[splits[0]] = slice + } else { + str, _ := val.(string) + slice := []string{str, strings.TrimSpace(splits[1])} + respHeader[splits[0]] = slice + } + } else { + respHeader[splits[0]] = strings.TrimSpace(splits[1]) } } - group := spropReg.FindAllStringSubmatch(element, -1) - if len(group) > 0 { - group := strings.Split(group[0][1], ",") - this.SPS, _ = b64.StdEncoding.DecodeString(group[0]) - this.PPS, _ = b64.StdEncoding.DecodeString(group[1]) - } else if group = configReg.FindAllStringSubmatch(element, -1); len(group) > 0 { - this.AudioSpecificConfig, _ = hex.DecodeString(group[0][1]) + if strings.Index(s, "Session:") == 0 { + splits := strings.Split(s, ":") + sid = strings.TrimSpace(splits[1]) } + //if strings.Index(s, "CSeq:") == 0 { + // splits := strings.Split(s, ":") + // cseq, err = strconv.Atoi(strings.TrimSpace(splits[1])) + // if err != nil { + // err = fmt.Errorf("Atoi CSeq err. line:%s", s) + // return + // } + //} + if strings.Index(s, "Content-Length:") == 0 { + splits := strings.Split(s, ":") + contentLen, err = strconv.Atoi(strings.TrimSpace(splits[1])) + if err != nil { + return + } + } + } - return letters + return } diff --git a/go.mod b/go.mod index ac9af09..831d5ee 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,10 @@ module github.com/Monibuca/plugin-rtsp go 1.13 require ( - github.com/Monibuca/engine v1.2.1 - github.com/Monibuca/engine/v2 v2.0.0 // indirect + github.com/EasyDarwin/EasyDarwin v8.1.0+incompatible // indirect + github.com/Monibuca/engine/v2 v2.0.0 + github.com/jinzhu/gorm v1.9.12 // indirect + github.com/pixelbender/go-sdp v1.0.0 + github.com/reactivex/rxgo v1.0.0 // indirect + github.com/teris-io/shortid v0.0.0-20171029131806-771a37caa5cf ) diff --git a/go.sum b/go.sum index 6178bf2..66d00ab 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,7 @@ github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/EasyDarwin/EasyDarwin v8.1.0+incompatible h1:Rr8dRbZtcJhiJvGx5Vs7IENM6RUUwGkZiIj5+WrNhm8= +github.com/EasyDarwin/EasyDarwin v8.1.0+incompatible/go.mod h1:xnmC+Q2+wugEDpQGxivSFNYPOhmNlIQHBfl0hMeriSU= github.com/Monibuca/engine v1.2.1 h1:TJmC6eZA1lR1MScWgempZLiEZD4T6aY/nn/rlQ9UdK8= github.com/Monibuca/engine v1.2.1/go.mod h1:WbDkXENLjcPjyjCR1Mix1GA+uAlwORkv/+8aMVrDX2g= github.com/Monibuca/engine v1.2.2 h1:hNjsrZpOmui8lYhgCJ5ltJU8g/k0Rrdysx2tHNGGnbI= @@ -9,6 +11,10 @@ github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d h1:G0m3OIz70MZUW github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/denisenkom/go-mssqldb v0.0.0-20191124224453-732737034ffd h1:83Wprp6ROGeiHFAP8WJdI2RoxALQYgdllERc3N5N2DM= +github.com/denisenkom/go-mssqldb v0.0.0-20191124224453-732737034ffd/go.mod h1:xbL0rPBG9cCiLr28tMa8zpbdarY27NDyej4t/EjAShU= +github.com/erikstmartin/go-testdb v0.0.0-20160219214506-8d10e4a1bae5 h1:Yzb9+7DPaBjB8zlTR87/ElzFsnQfuHnVUVqpZZIcV5Y= +github.com/erikstmartin/go-testdb v0.0.0-20160219214506-8d10e4a1bae5/go.mod h1:a2zkGnVExMxdzMo3M0Hi/3sEU+cWnZpSni0O6/Yb/P0= github.com/falconray0704/gortmp v0.0.0-20170613085150-e3f9bb02c7c8 h1:Bkx+0neYCcHW7BUeVCbR2GOn47NesdImh8nHHOKccD4= github.com/falconray0704/gortmp v0.0.0-20170613085150-e3f9bb02c7c8/go.mod h1:/JBZajtCDe9Z4j84v5QWo4PLn1K6jcBHh6qXN/bm/vw= github.com/funny/slab v0.0.0-20180511031532-b1fad5e5d478 h1:Db9StoJ6RZN3YttC0Pm0I4Y5izITRYch3RMbT59BYN0= @@ -17,20 +23,38 @@ github.com/funny/utest v0.0.0-20161029064919-43870a374500 h1:Z0r1CZnoIWFB/Uiwh1B github.com/funny/utest v0.0.0-20161029064919-43870a374500/go.mod h1:mUn39tBov9jKnTWV1RlOYoNzxdBFHiSzXWdY1FoNGGg= github.com/go-ole/go-ole v1.2.4 h1:nNBDSCOigTSiarFpYE9J/KtEA1IOW4CNeqT9TQDqCxI= github.com/go-ole/go-ole v1.2.4/go.mod h1:XCwSNxSkXRo4vlyPy93sltvi/qJq0jqQhjqQNIwKuxM= +github.com/go-sql-driver/mysql v1.4.1 h1:g24URVg0OFbNUTx9qqY1IRZ9D9z3iPyi5zKhQZpNwpA= +github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= +github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe h1:lXe2qZdvpiX5WZkZR4hgp4KJVfY3nMkvmwbVkpv1rVY= +github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe/go.mod h1:8vg3r2VgvsThLBIFL93Qb5yWzgyZWhEmBwUJWevAkK0= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/jinzhu/gorm v1.9.12 h1:Drgk1clyWT9t9ERbzHza6Mj/8FY/CqMyVzOiHviMo6Q= +github.com/jinzhu/gorm v1.9.12/go.mod h1:vhTjlKSJUTWNtcbQtrMBFCxy7eXTzeCAzfL5fBZT/Qs= +github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= +github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= +github.com/jinzhu/now v1.0.1 h1:HjfetcXq097iXP0uoPCdnM4Efp5/9MsM0/M+XOTeR3M= +github.com/jinzhu/now v1.0.1/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= +github.com/lib/pq v1.1.1 h1:sJZmqHoEaY7f+NPP8pgLB/WxulyR3fewgCM2qaSlBb4= +github.com/lib/pq v1.1.1/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/logrusorgru/aurora v0.0.0-20200102142835-e9ef32dff381 h1:bqDmpDG49ZRnB5PcgP0RXtQvnMSgIF14M7CBd2shtXs= github.com/logrusorgru/aurora v0.0.0-20200102142835-e9ef32dff381/go.mod h1:7rIyQOR62GCctdiQpZ/zOJlFyk6y+94wXzv6RNZgaR4= github.com/mattn/go-colorable v0.1.6 h1:6Su7aK7lXmJ/U79bYtBjLNaha4Fs1Rg9plHpcH+vvnE= github.com/mattn/go-colorable v0.1.6/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY= github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= +github.com/mattn/go-sqlite3 v2.0.1+incompatible h1:xQ15muvnzGBHpIpdrNi1DA5x0+TcBZzsIDwmw9uTHzw= +github.com/mattn/go-sqlite3 v2.0.1+incompatible/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc= github.com/pixelbender/go-sdp v1.0.0 h1:hLP2ALBN4sLpgp2r3EDcFUSN3AyOkg1jonuWEJniotY= github.com/pixelbender/go-sdp v1.0.0/go.mod h1:6IBlz9+BrUHoFTea7gcp4S54khtOhjCW/nVDLhmZBAs= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/reactivex/rxgo v1.0.0 h1:qpT8/kVwAJDSeGsqx4oUXxgk3UCtAq/EreBGWYRxEcA= +github.com/reactivex/rxgo v1.0.0/go.mod h1:/S1ygE20oE1BvZGIwd3fXx/m6s6pOX5G6zmXg9ninlQ= github.com/shirou/gopsutil v2.20.1+incompatible h1:oIq9Cq4i84Hk8uQAUOG3eNdI/29hBawGrD5YRl6JRDY= github.com/shirou/gopsutil v2.20.1+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= +github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= @@ -38,9 +62,20 @@ github.com/teris-io/shortid v0.0.0-20171029131806-771a37caa5cf h1:Z2X3Os7oRzpdJ7 github.com/teris-io/shortid v0.0.0-20171029131806-771a37caa5cf/go.mod h1:M8agBzgqHIhgj7wEn9/0hJUZcrvt9VY+Ln+S1I5Mha0= github.com/zhangpeihao/goamf v0.0.0-20140409082417-3ff2c19514a8/go.mod h1:RZd/IqzNpFANwOB9rVmsnAYpo/6KesK4PqrN1a5cRgg= github.com/zhangpeihao/log v0.0.0-20170117094621-62e921e41859/go.mod h1:OAvmouyIV28taMw4SC4+hSnouObQqQkTQNOhU3Zowl0= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20190325154230-a5d413f7728c/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191205180655-e7c4368fe9dd h1:GGJVjV8waZKRHrgwvtH66z9ZGVurTD1MT0n1Bb+q4aM= +golang.org/x/crypto v0.0.0-20191205180655-e7c4368fe9dd/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae h1:/WDfKMnPU+m5M4xB+6x4kaepxRw6jWvR5iDRdvjHgy8= golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +google.golang.org/appengine v1.4.0 h1:/wp5JvzpHIxhs/dumFmF7BXTf3Z+dd4uXta4kVyO508= +google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= diff --git a/main.go b/main.go index 121c472..f53cb14 100644 --- a/main.go +++ b/main.go @@ -1,9 +1,11 @@ -package rtspplugin +package rtsp import ( + "bufio" "bytes" "fmt" "log" + "net" "net/http" "strings" "sync" @@ -12,14 +14,18 @@ import ( . "github.com/Monibuca/engine/v2" . "github.com/Monibuca/engine/v2/avformat" "github.com/Monibuca/engine/v2/util" + "github.com/pixelbender/go-sdp/sdp" + "github.com/teris-io/shortid" ) var collection = sync.Map{} var config = struct { + ListenAddr string BufferLength int AutoPull bool RemoteAddr string -}{2048, true, "rtsp://localhost/${streamPath}"} + Timeout int +}{":554", 2048, true, "rtsp://localhost/${streamPath}", 0} func init() { InstallPlugin(&PluginConfig{ @@ -37,7 +43,7 @@ func init() { func runPlugin() { OnSubscribeHooks.AddHook(func(s *Subscriber) { if config.AutoPull && s.Publisher == nil { - new(RTSP).Publish(s.StreamPath, strings.Replace(config.RemoteAddr, "${streamPath}", s.StreamPath, -1)) + new(RTSP).PullStream(s.StreamPath, strings.Replace(config.RemoteAddr, "${streamPath}", s.StreamPath, -1)) } }) http.HandleFunc("/rtsp/list", func(w http.ResponseWriter, r *http.Request) { @@ -48,7 +54,7 @@ func runPlugin() { collection.Range(func(key, value interface{}) bool { rtsp := value.(*RTSP) pinfo := &rtsp.RTSPInfo - pinfo.BufferRate = len(rtsp.OutGoing) * 100 / config.BufferLength + //pinfo.BufferRate = len(rtsp.OutGoing) * 100 / config.BufferLength info = append(info, pinfo) return true }) @@ -60,161 +66,248 @@ func runPlugin() { streamPath := r.URL.Query().Get("streamPath") var err error if err == nil { - new(RTSP).Publish(streamPath, targetURL) + new(RTSP).PullStream(streamPath, targetURL) w.Write([]byte(`{"code":0}`)) } else { w.Write([]byte(fmt.Sprintf(`{"code":1,"msg":"%s"}`, err.Error()))) } }) + if config.ListenAddr != "" { + log.Fatal(ListenRtsp(config.ListenAddr)) + } +} + +func ListenRtsp(addr string) error { + defer log.Println("rtsp server start!") + listener, err := net.Listen("tcp", addr) + if err != nil { + return err + } + var tempDelay time.Duration + networkBuffer := 204800 + timeoutMillis := config.Timeout + for { + conn, err := listener.Accept() + conn.(*net.TCPConn).SetNoDelay(false) + if err != nil { + if ne, ok := err.(net.Error); ok && ne.Temporary() { + if tempDelay == 0 { + tempDelay = 5 * time.Millisecond + } else { + tempDelay *= 2 + } + if max := 1 * time.Second; tempDelay > max { + tempDelay = max + } + fmt.Printf("rtsp: Accept error: %v; retrying in %v", err, tempDelay) + time.Sleep(tempDelay) + continue + } + return err + } + + tempDelay = 0 + timeoutTCPConn := &RichConn{conn, time.Duration(timeoutMillis) * time.Millisecond} + go (&RTSP{ + ID: shortid.MustGenerate(), + Conn: timeoutTCPConn, + connRW: bufio.NewReadWriter(bufio.NewReaderSize(timeoutTCPConn, networkBuffer), bufio.NewWriterSize(timeoutTCPConn, networkBuffer)), + Timeout: config.Timeout, + vRTPChannel: -1, + vRTPControlChannel: -1, + aRTPChannel: -1, + aRTPControlChannel: -1, + }).AcceptPush() + } + return nil } type RTSP struct { Publisher - *RtspClient RTSPInfo + RTSPClientInfo + ID string + Conn *RichConn + connRW *bufio.ReadWriter + connWLock sync.RWMutex + Type SessionType + TransType TransType + SDPRaw string + SDPMap map[string]*SDPInfo + nonce string + closeOld bool + AControl string + VControl string + ACodec string + VCodec string + avcsent bool + aacsent bool + Timeout int + // stats info + fuBuffer []byte + //tcp channels + aRTPChannel int + aRTPControlChannel int + vRTPChannel int + vRTPControlChannel int + UDPServer *UDPServer + UDPClient *UDPClient + SPS []byte + PPS []byte + AudioSpecificConfig []byte + Auth func(string) string +} +type RTSPClientInfo struct { + Agent string + Session string + Sdp *sdp.Session + authLine string + Seq int } type RTSPInfo struct { + URL string SyncCount int64 Header *string BufferRate int + InBytes int + OutBytes int + StreamInfo *StreamInfo } -func (rtsp *RTSP) run() { - fuBuffer := []byte{} - iframeHead := []byte{0x17, 0x01, 0, 0, 0} - pframeHead := []byte{0x27, 0x01, 0, 0, 0} - spsHead := []byte{0xE1, 0, 0} - ppsHead := []byte{0x01, 0, 0} - nalLength := []byte{0, 0, 0, 0} - avcsent := false - aacsent := false - handleNALU := func(nalType byte, payload []byte, ts int64) { - rtsp.SyncCount++ - vl := len(payload) - switch nalType { - // case NALU_SPS: - // r := bytes.NewBuffer([]byte{}) - // r.Write(RTMP_AVC_HEAD) - // util.BigEndian.PutUint16(spsHead[1:], uint16(vl)) - // r.Write(spsHead) - // r.Write(payload) - // case NALU_PPS: - // r := bytes.NewBuffer([]byte{}) - // util.BigEndian.PutUint16(ppsHead[1:], uint16(vl)) - // r.Write(ppsHead) - // r.Write(payload) - // rtsp.PushVideo(0, r.Bytes()) - // avcsent = true - case NALU_IDR_Picture: - if !avcsent { - r := bytes.NewBuffer([]byte{}) - r.Write(RTMP_AVC_HEAD) - util.BigEndian.PutUint16(spsHead[1:], uint16(len(rtsp.SPS))) - r.Write(spsHead) - r.Write(rtsp.SPS) - util.BigEndian.PutUint16(ppsHead[1:], uint16(len(rtsp.PPS))) - r.Write(ppsHead) - r.Write(rtsp.PPS) - rtsp.PushVideo(0, r.Bytes()) - avcsent = true - } - r := bytes.NewBuffer([]byte{}) - util.BigEndian.PutUint24(iframeHead[2:], 0) - r.Write(iframeHead) - util.BigEndian.PutUint32(nalLength, uint32(vl)) - r.Write(nalLength) - r.Write(payload) - rtsp.PushVideo(uint32(ts), r.Bytes()) - case NALU_Non_IDR_Picture: - r := bytes.NewBuffer([]byte{}) - util.BigEndian.PutUint24(pframeHead[2:], 0) - r.Write(pframeHead) - util.BigEndian.PutUint32(nalLength, uint32(vl)) - r.Write(nalLength) - r.Write(payload) - rtsp.PushVideo(uint32(ts), r.Bytes()) - } +type RichConn struct { + net.Conn + timeout time.Duration +} + +func (conn *RichConn) Read(b []byte) (n int, err error) { + if conn.timeout > 0 { + conn.Conn.SetReadDeadline(time.Now().Add(conn.timeout)) + } else { + var t time.Time + conn.Conn.SetReadDeadline(t) } - for { - select { - case <-rtsp.Done(): - return - case data, ok := <-rtsp.OutGoing: - if ok && data[0] == 36 { - if data[1] == 0 { - cc := data[4] & 0xF - //rtp header - rtphdr := 12 + cc*4 + return conn.Conn.Read(b) +} - //packet time - ts := (int64(data[8]) << 24) + (int64(data[9]) << 16) + (int64(data[10]) << 8) + (int64(data[11])) +func (conn *RichConn) Write(b []byte) (n int, err error) { + if conn.timeout > 0 { + conn.Conn.SetWriteDeadline(time.Now().Add(conn.timeout)) + } else { + var t time.Time + conn.Conn.SetWriteDeadline(t) + } + return conn.Conn.Write(b) +} +func (rtsp *RTSP) handleNALU(nalType byte, payload []byte, ts int64) { + rtsp.SyncCount++ + vl := len(payload) + switch nalType { + // case NALU_SPS: + // r := bytes.NewBuffer([]byte{}) + // r.Write(RTMP_AVC_HEAD) + // util.BigEndian.PutUint16(spsHead[1:], uint16(vl)) + // r.Write(spsHead) + // r.Write(payload) + // case NALU_PPS: + // r := bytes.NewBuffer([]byte{}) + // util.BigEndian.PutUint16(ppsHead[1:], uint16(vl)) + // r.Write(ppsHead) + // r.Write(payload) + // rtsp.PushVideo(0, r.Bytes()) + // avcsent = true + case NALU_IDR_Picture: + if !rtsp.avcsent { + r := bytes.NewBuffer([]byte{}) + r.Write(RTMP_AVC_HEAD) + spsHead := []byte{0xE1, 0, 0} + util.BigEndian.PutUint16(spsHead[1:], uint16(len(rtsp.SPS))) + r.Write(spsHead) + r.Write(rtsp.SPS) + ppsHead := []byte{0x01, 0, 0} + util.BigEndian.PutUint16(ppsHead[1:], uint16(len(rtsp.PPS))) + r.Write(ppsHead) + r.Write(rtsp.PPS) + rtsp.PushVideo(0, r.Bytes()) + rtsp.avcsent = true + } + r := bytes.NewBuffer([]byte{}) + iframeHead := []byte{0x17, 0x01, 0, 0, 0} + util.BigEndian.PutUint24(iframeHead[2:], 0) + r.Write(iframeHead) + nalLength := []byte{0, 0, 0, 0} + util.BigEndian.PutUint32(nalLength, uint32(vl)) + r.Write(nalLength) + r.Write(payload) + rtsp.PushVideo(uint32(ts), r.Bytes()) + case NALU_Non_IDR_Picture: + r := bytes.NewBuffer([]byte{}) + pframeHead := []byte{0x27, 0x01, 0, 0, 0} + util.BigEndian.PutUint24(pframeHead[2:], 0) + r.Write(pframeHead) + nalLength := []byte{0, 0, 0, 0} + util.BigEndian.PutUint32(nalLength, uint32(vl)) + r.Write(nalLength) + r.Write(payload) + rtsp.PushVideo(uint32(ts), r.Bytes()) + } +} +func (rtsp *RTSP) handleRTP(pack *RTPPack) { + data := pack.Buffer + switch pack.Type { + case RTP_TYPE_AUDIO: + if !rtsp.aacsent { + rtsp.PushAudio(0, append([]byte{0xAF, 0x00}, rtsp.AudioSpecificConfig...)) + rtsp.aacsent = true + } + cc := data[0] & 0xF + rtphdr := 12 + cc*4 + payload := data[rtphdr:] + auHeaderLen := (int16(payload[0]) << 8) + int16(payload[1]) + auHeaderLen = auHeaderLen >> 3 + auHeaderCount := int(auHeaderLen / 2) + var auLenArray []int + for iIndex := 0; iIndex < int(auHeaderCount); iIndex++ { + auHeaderInfo := (int16(payload[2+2*iIndex]) << 8) + int16(payload[2+2*iIndex+1]) + auLen := auHeaderInfo >> 3 + auLenArray = append(auLenArray, int(auLen)) + } + startOffset := 2 + 2*auHeaderCount + for _, auLen := range auLenArray { + endOffset := startOffset + auLen + addHead := []byte{0xAF, 0x01} + rtsp.PushAudio(0, append(addHead, payload[startOffset:endOffset]...)) + startOffset = startOffset + auLen + } + case RTP_TYPE_VIDEO: + cc := data[0] & 0xF + //rtp header + rtphdr := 12 + cc*4 - //packet number - //packno := (int64(data[6]) << 8) + int64(data[7]) - data = data[4+rtphdr:] - nalType := data[0] & 0x1F + //packet time + ts := (int64(data[4]) << 24) + (int64(data[5]) << 16) + (int64(data[6]) << 8) + (int64(data[7])) - if nalType >= 1 && nalType <= 23 { - handleNALU(nalType, data, ts) - } else if nalType == 28 { - isStart := data[1]&0x80 != 0 - isEnd := data[1]&0x40 != 0 - nalType := data[1] & 0x1F - //nri := (data[1]&0x60)>>5 - nal := data[0]&0xE0 | data[1]&0x1F - if isStart { - fuBuffer = []byte{0} - } - fuBuffer = append(fuBuffer, data[2:]...) - if isEnd { - fuBuffer[0] = nal - handleNALU(nalType, fuBuffer, ts) - } - } + //packet number + //packno := (int64(data[6]) << 8) + int64(data[7]) + data = data[rtphdr:] + nalType := data[0] & 0x1F - } else if data[1] == 2 { - // audio - if !aacsent { - rtsp.PushAudio(0, append([]byte{0xAF, 0x00}, rtsp.AudioSpecificConfig...)) - aacsent = true - } - cc := data[4] & 0xF - rtphdr := 12 + cc*4 - payload := data[4+rtphdr:] - auHeaderLen := (int16(payload[0]) << 8) + int16(payload[1]) - auHeaderLen = auHeaderLen >> 3 - auHeaderCount := int(auHeaderLen / 2) - var auLenArray []int - for iIndex := 0; iIndex < int(auHeaderCount); iIndex++ { - auHeaderInfo := (int16(payload[2+2*iIndex]) << 8) + int16(payload[2+2*iIndex+1]) - auLen := auHeaderInfo >> 3 - auLenArray = append(auLenArray, int(auLen)) - } - startOffset := 2 + 2*auHeaderCount - for _, auLen := range auLenArray { - endOffset := startOffset + auLen - addHead := []byte{0xAF, 0x01} - rtsp.PushAudio(0, append(addHead, payload[startOffset:endOffset]...)) - startOffset = startOffset + auLen - } - } + if nalType >= 1 && nalType <= 23 { + rtsp.handleNALU(nalType, data, ts) + } else if nalType == 28 { + isStart := data[1]&0x80 != 0 + isEnd := data[1]&0x40 != 0 + nalType := data[1] & 0x1F + //nri := (data[1]&0x60)>>5 + nal := data[0]&0xE0 | data[1]&0x1F + if isStart { + rtsp.fuBuffer = []byte{0} + } + rtsp.fuBuffer = append(rtsp.fuBuffer, data[2:]...) + if isEnd { + rtsp.fuBuffer[0] = nal + rtsp.handleNALU(nalType, rtsp.fuBuffer, ts) } } } } -func (rtsp *RTSP) Publish(streamPath string, rtspUrl string) (result bool) { - if result = rtsp.Publisher.Publish(streamPath); result { - rtsp.Type = "RTSP" - rtsp.RTSPInfo.StreamInfo = &rtsp.Stream.StreamInfo - rtsp.RtspClient = RtspClientNew(config.BufferLength) - rtsp.RTSPInfo.Header = &rtsp.RtspClient.Header - if status, message := rtsp.RtspClient.Client(rtspUrl); !status { - log.Println(message) - return false - } - collection.Store(streamPath, rtsp) - go rtsp.run() - } - return -} diff --git a/request.go b/request.go new file mode 100644 index 0000000..759adad --- /dev/null +++ b/request.go @@ -0,0 +1,100 @@ +package rtsp + +import ( + "fmt" + "log" + "regexp" + "strconv" + "strings" +) + +const ( + RTSP_VERSION = "RTSP/1.0" +) + +const ( + // Client to server for presentation and stream objects; recommended + DESCRIBE = "DESCRIBE" + // Bidirectional for client and stream objects; optional + ANNOUNCE = "ANNOUNCE" + // Bidirectional for client and stream objects; optional + GET_PARAMETER = "GET_PARAMETER" + // Bidirectional for client and stream objects; required for Client to server, optional for server to client + OPTIONS = "OPTIONS" + // Client to server for presentation and stream objects; recommended + PAUSE = "PAUSE" + // Client to server for presentation and stream objects; required + PLAY = "PLAY" + // Client to server for presentation and stream objects; optional + RECORD = "RECORD" + // Server to client for presentation and stream objects; optional + REDIRECT = "REDIRECT" + // Client to server for stream objects; required + SETUP = "SETUP" + // Bidirectional for presentation and stream objects; optional + SET_PARAMETER = "SET_PARAMETER" + // Client to server for presentation and stream objects; required + TEARDOWN = "TEARDOWN" + DATA = "DATA" +) + +type Request struct { + Method string + URL string + Version string + Header map[string]string + Content string + Body string +} + +func NewRequest(content string) *Request { + lines := strings.Split(strings.TrimSpace(content), "\r\n") + if len(lines) == 0 { + return nil + } + items := regexp.MustCompile("\\s+").Split(strings.TrimSpace(lines[0]), -1) + if len(items) < 3 { + return nil + } + if !strings.HasPrefix(items[2], "RTSP") { + log.Printf("invalid rtsp request, line[0] %s", lines[0]) + return nil + } + header := make(map[string]string) + for i := 1; i < len(lines); i++ { + line := strings.TrimSpace(lines[i]) + headerItems := regexp.MustCompile(":\\s+").Split(line, 2) + if len(headerItems) < 2 { + continue + } + header[headerItems[0]] = headerItems[1] + } + return &Request{ + Method: items[0], + URL: items[1], + Version: items[2], + Header: header, + Content: content, + Body: "", + } +} + +func (r *Request) String() string { + str := fmt.Sprintf("%s %s %s\r\n", r.Method, r.URL, r.Version) + for key, value := range r.Header { + str += fmt.Sprintf("%s: %s\r\n", key, value) + } + str += "\r\n" + str += r.Body + return str +} + +func (r *Request) GetContentLength() int { + v, err := strconv.ParseInt(r.Header["Content-Length"], 10, 64) + if err != nil { + return 0 + } else { + return int(v) + } +} + diff --git a/response.go b/response.go new file mode 100644 index 0000000..6b4c195 --- /dev/null +++ b/response.go @@ -0,0 +1,51 @@ +package rtsp + +import ( + "fmt" + "strconv" +) + +type Response struct { + Version string + StatusCode int + Status string + Header map[string]interface{} + Body string +} + +func NewResponse(statusCode int, status, cSeq, sid, body string) *Response { + res := &Response{ + Version: RTSP_VERSION, + StatusCode: statusCode, + Status: status, + Header: map[string]interface{}{"CSeq": cSeq, "Session": sid}, + Body: body, + } + len := len(body) + if len > 0 { + res.Header["Content-Length"] = strconv.Itoa(len) + } else { + delete(res.Header, "Content-Length") + } + return res +} + +func (r *Response) String() string { + str := fmt.Sprintf("%s %d %s\r\n", r.Version, r.StatusCode, r.Status) + for key, value := range r.Header { + str += fmt.Sprintf("%s: %s\r\n", key, value) + } + str += "\r\n" + str += r.Body + return str +} + +func (r *Response) SetBody(body string) { + len := len(body) + r.Body = body + if len > 0 { + r.Header["Content-Length"] = strconv.Itoa(len) + } else { + delete(r.Header, "Content-Length") + } +} diff --git a/rtp-parser.go b/rtp-parser.go new file mode 100644 index 0000000..a512491 --- /dev/null +++ b/rtp-parser.go @@ -0,0 +1,68 @@ +package rtsp + +import ( + "encoding/binary" +) + +const ( + RTP_FIXED_HEADER_LENGTH = 12 +) + +type RTPInfo struct { + Version int + Padding bool + Extension bool + CSRCCnt int + Marker bool + PayloadType int + SequenceNumber int + Timestamp int + SSRC int + Payload []byte + PayloadOffset int +} + +func ParseRTP(rtpBytes []byte) *RTPInfo { + if len(rtpBytes) < RTP_FIXED_HEADER_LENGTH { + return nil + } + firstByte := rtpBytes[0] + secondByte := rtpBytes[1] + info := &RTPInfo{ + Version: int(firstByte >> 6), + Padding: (firstByte>>5)&1 == 1, + Extension: (firstByte>>4)&1 == 1, + CSRCCnt: int(firstByte & 0x0f), + + Marker: secondByte>>7 == 1, + PayloadType: int(secondByte & 0x7f), + SequenceNumber: int(binary.BigEndian.Uint16(rtpBytes[2:])), + Timestamp: int(binary.BigEndian.Uint32(rtpBytes[4:])), + SSRC: int(binary.BigEndian.Uint32(rtpBytes[8:])), + } + offset := RTP_FIXED_HEADER_LENGTH + end := len(rtpBytes) + if end-offset >= 4*info.CSRCCnt { + offset += 4 * info.CSRCCnt + } + if info.Extension && end-offset >= 4 { + extLen := 4 * int(binary.BigEndian.Uint16(rtpBytes[offset+2:])) + offset += 4 + if end-offset >= extLen { + offset += extLen + } + } + if info.Padding && end-offset > 0 { + paddingLen := int(rtpBytes[end-1]) + if end-offset >= paddingLen { + end -= paddingLen + } + } + info.Payload = rtpBytes[offset:end] + info.PayloadOffset = offset + if end-offset < 1 { + return nil + } + + return info +} diff --git a/sdp-parser.go b/sdp-parser.go new file mode 100644 index 0000000..d3cc1bd --- /dev/null +++ b/sdp-parser.go @@ -0,0 +1,105 @@ +package rtsp + +import ( + "encoding/base64" + "encoding/hex" + "strconv" + "strings" +) + +type SDPInfo struct { + AVType string + Codec string + TimeScale int + Control string + Rtpmap int + Config []byte + SpropParameterSets [][]byte + PayloadType int + SizeLength int + IndexLength int +} + +func ParseSDP(sdpRaw string) map[string]*SDPInfo { + sdpMap := make(map[string]*SDPInfo) + var info *SDPInfo + for _, line := range strings.Split(sdpRaw, "\n") { + line = strings.TrimSpace(line) + typeval := strings.SplitN(line, "=", 2) + if len(typeval) == 2 { + fields := strings.SplitN(typeval[1], " ", 2) + switch typeval[0] { + case "m": + if len(fields) > 0 { + switch fields[0] { + case "audio", "video": + sdpMap[fields[0]] = &SDPInfo{AVType: fields[0]} + info = sdpMap[fields[0]] + mfields := strings.Split(fields[1], " ") + if len(mfields) >= 3 { + info.PayloadType, _ = strconv.Atoi(mfields[2]) + } + } + } + + case "a": + if info != nil { + for _, field := range fields { + keyval := strings.SplitN(field, ":", 2) + if len(keyval) >= 2 { + key := keyval[0] + val := keyval[1] + switch key { + case "control": + info.Control = val + case "rtpmap": + info.Rtpmap, _ = strconv.Atoi(val) + } + } + keyval = strings.Split(field, "/") + if len(keyval) >= 2 { + key := keyval[0] + switch key { + case "MPEG4-GENERIC": + info.Codec = "aac" + case "H264": + info.Codec = "h264" + case "H265": + info.Codec = "h265" + } + if i, err := strconv.Atoi(keyval[1]); err == nil { + info.TimeScale = i + } + } + keyval = strings.Split(field, ";") + if len(keyval) > 1 { + for _, field := range keyval { + keyval := strings.SplitN(field, "=", 2) + if len(keyval) == 2 { + key := strings.TrimSpace(keyval[0]) + val := keyval[1] + switch key { + case "config": + info.Config, _ = hex.DecodeString(val) + case "sizelength": + info.SizeLength, _ = strconv.Atoi(val) + case "indexlength": + info.IndexLength, _ = strconv.Atoi(val) + case "sprop-parameter-sets": + fields := strings.Split(val, ",") + for _, field := range fields { + val, _ := base64.StdEncoding.DecodeString(field) + info.SpropParameterSets = append(info.SpropParameterSets, val) + } + } + } + } + } + } + } + + } + } + } + return sdpMap +} diff --git a/session.go b/session.go new file mode 100644 index 0000000..91beba3 --- /dev/null +++ b/session.go @@ -0,0 +1,638 @@ +package rtsp + +import ( + "bytes" + "crypto/md5" + "encoding/binary" + "fmt" + "io" + "net/url" + "regexp" + "strconv" + "strings" + "time" + + . "github.com/Monibuca/engine/v2" + "github.com/teris-io/shortid" +) + +type RTPPack struct { + Type RTPType + Buffer []byte +} + +type SessionType int + +const ( + SESSION_TYPE_PUSHER SessionType = iota + SESSEION_TYPE_PLAYER +) + +func (st SessionType) String() string { + switch st { + case SESSION_TYPE_PUSHER: + return "pusher" + case SESSEION_TYPE_PLAYER: + return "player" + } + return "unknow" +} + +type RTPType int + +const ( + RTP_TYPE_AUDIO RTPType = iota + RTP_TYPE_VIDEO + RTP_TYPE_AUDIOCONTROL + RTP_TYPE_VIDEOCONTROL +) + +func (rt RTPType) String() string { + switch rt { + case RTP_TYPE_AUDIO: + return "audio" + case RTP_TYPE_VIDEO: + return "video" + case RTP_TYPE_AUDIOCONTROL: + return "audio control" + case RTP_TYPE_VIDEOCONTROL: + return "video control" + } + return "unknow" +} + +type TransType int + +const ( + TRANS_TYPE_TCP TransType = iota + TRANS_TYPE_UDP +) + +func (tt TransType) String() string { + switch tt { + case TRANS_TYPE_TCP: + return "TCP" + case TRANS_TYPE_UDP: + return "UDP" + } + return "unknow" +} + +const UDP_BUF_SIZE = 1048576 + +func (session *RTSP) SessionString() string { + return fmt.Sprintf("session[%v][%v][%s][%s][%s]", session.Type, session.TransType, session.StreamPath, session.ID, session.Conn.RemoteAddr().String()) +} + +func (session *RTSP) Stop() { + if session.Conn != nil { + session.connRW.Flush() + session.Conn.Close() + session.Conn = nil + } + if session.UDPClient != nil { + session.UDPClient.Stop() + session.UDPClient = nil + } + if session.UDPServer != nil { + session.UDPServer.Stop() + session.UDPServer = nil + } + if session.Running() { + collection.Delete(session.StreamPath) + session.Cancel() + } +} + +// AcceptPush 接受推流 +func (session *RTSP) AcceptPush() { + defer session.Stop() + buf2 := make([]byte, 2) + timer := time.Unix(0, 0) + for { + buf1, err := session.connRW.ReadByte() + if err != nil { + Println(err) + return + } + if buf1 == 0x24 { //rtp data + if buf1, err = session.connRW.ReadByte(); err != nil { + Println(err) + return + } + if _, err := io.ReadFull(session.connRW, buf2); err != nil { + Println(err) + return + } + channel := int(buf1) + rtpLen := int(binary.BigEndian.Uint16(buf2)) + rtpBytes := make([]byte, rtpLen) + if _, err := io.ReadFull(session.connRW, rtpBytes); err != nil { + Println(err) + return + } + var pack *RTPPack + switch channel { + case session.aRTPChannel: + pack = &RTPPack{ + Type: RTP_TYPE_AUDIO, + Buffer: rtpBytes, + } + elapsed := time.Now().Sub(timer) + if elapsed >= 30*time.Second { + Println("Recv an audio RTP package") + timer = time.Now() + } + case session.aRTPControlChannel: + pack = &RTPPack{ + Type: RTP_TYPE_AUDIOCONTROL, + Buffer: rtpBytes, + } + case session.vRTPChannel: + pack = &RTPPack{ + Type: RTP_TYPE_VIDEO, + Buffer: rtpBytes, + } + elapsed := time.Now().Sub(timer) + if elapsed >= 30*time.Second { + Println("Recv an video RTP package") + timer = time.Now() + } + case session.vRTPControlChannel: + pack = &RTPPack{ + Type: RTP_TYPE_VIDEOCONTROL, + Buffer: rtpBytes, + } + default: + Printf("unknow rtp pack type, %v", pack.Type) + continue + } + if pack == nil { + Printf("session tcp got nil rtp pack") + continue + } + session.InBytes += rtpLen + 4 + session.handleRTP(pack) + } else { // rtsp cmd + reqBuf := bytes.NewBuffer(nil) + reqBuf.WriteByte(buf1) + for { + if line, isPrefix, err := session.connRW.ReadLine(); err != nil { + Println(err) + return + } else { + reqBuf.Write(line) + if !isPrefix { + reqBuf.WriteString("\r\n") + } + if len(line) == 0 { + req := NewRequest(reqBuf.String()) + if req == nil { + break + } + session.InBytes += reqBuf.Len() + contentLen := req.GetContentLength() + session.InBytes += contentLen + if contentLen > 0 { + bodyBuf := make([]byte, contentLen) + if n, err := io.ReadFull(session.connRW, bodyBuf); err != nil { + Println(err) + return + } else if n != contentLen { + Printf("read rtsp request body failed, expect size[%d], got size[%d]", contentLen, n) + return + } + req.Body = string(bodyBuf) + } + session.handleRequest(req) + break + } + } + } + } + } +} + +func (session *RTSP) CheckAuth(authLine string, method string) error { + realmRex := regexp.MustCompile(`realm="(.*?)"`) + nonceRex := regexp.MustCompile(`nonce="(.*?)"`) + usernameRex := regexp.MustCompile(`username="(.*?)"`) + responseRex := regexp.MustCompile(`response="(.*?)"`) + uriRex := regexp.MustCompile(`uri="(.*?)"`) + + realm := "" + nonce := "" + username := "" + response := "" + uri := "" + result1 := realmRex.FindStringSubmatch(authLine) + if len(result1) == 2 { + realm = result1[1] + } else { + return fmt.Errorf("CheckAuth error : no realm found") + } + result1 = nonceRex.FindStringSubmatch(authLine) + if len(result1) == 2 { + nonce = result1[1] + } else { + return fmt.Errorf("CheckAuth error : no nonce found") + } + if session.nonce != nonce { + return fmt.Errorf("CheckAuth error : sessionNonce not same as nonce") + } + + result1 = usernameRex.FindStringSubmatch(authLine) + if len(result1) == 2 { + username = result1[1] + } else { + return fmt.Errorf("CheckAuth error : username not found") + } + + result1 = responseRex.FindStringSubmatch(authLine) + if len(result1) == 2 { + response = result1[1] + } else { + return fmt.Errorf("CheckAuth error : response not found") + } + + result1 = uriRex.FindStringSubmatch(authLine) + if len(result1) == 2 { + uri = result1[1] + } else { + return fmt.Errorf("CheckAuth error : uri not found") + } + // var user models.User + // err := db.SQLite.Where("Username = ?", username).First(&user).Error + // if err != nil { + // return fmt.Errorf("CheckAuth error : user not exists") + // } + md5UserRealmPwd := fmt.Sprintf("%x", md5.Sum([]byte(fmt.Sprintf("%s:%s:%s", username, realm, session.Auth(username))))) + md5MethodURL := fmt.Sprintf("%x", md5.Sum([]byte(fmt.Sprintf("%s:%s", method, uri)))) + myResponse := fmt.Sprintf("%x", md5.Sum([]byte(fmt.Sprintf("%s:%s:%s", md5UserRealmPwd, nonce, md5MethodURL)))) + if myResponse != response { + return fmt.Errorf("CheckAuth error : response not equal") + } + return nil +} + +func (session *RTSP) handleRequest(req *Request) { + //if session.Timeout > 0 { + // session.Conn.SetDeadline(time.Now().Add(time.Duration(session.Timeout) * time.Second)) + //} + Printf("<<<\n%s", req) + res := NewResponse(200, "OK", req.Header["CSeq"], session.ID, "") + defer func() { + if p := recover(); p != nil { + Printf("handleRequest err ocurs:%v", p) + res.StatusCode = 500 + res.Status = fmt.Sprintf("Inner Server Error, %v", p) + } + Printf(">>>\n%s", res) + outBytes := []byte(res.String()) + session.connWLock.Lock() + session.connRW.Write(outBytes) + session.connRW.Flush() + session.connWLock.Unlock() + session.OutBytes += len(outBytes) + switch req.Method { + case "PLAY", "RECORD": + switch session.Type { + case SESSEION_TYPE_PLAYER: + // if session.Pusher.HasPlayer(session.Player) { + // session.Player.Pause(false) + // } else { + // session.Pusher.AddPlayer(session.Player) + // } + } + case "TEARDOWN": + { + session.Stop() + return + } + } + if res.StatusCode != 200 && res.StatusCode != 401 { + Printf("Response request error[%d]. stop session.", res.StatusCode) + session.Stop() + } + }() + if req.Method != "OPTIONS" { + if session.Auth != nil { + authLine := req.Header["Authorization"] + authFailed := true + if authLine != "" { + err := session.CheckAuth(authLine, req.Method) + if err == nil { + authFailed = false + } else { + Printf("%v", err) + } + } + if authFailed { + res.StatusCode = 401 + res.Status = "Unauthorized" + nonce := fmt.Sprintf("%x", md5.Sum([]byte(shortid.MustGenerate()))) + session.nonce = nonce + res.Header["WWW-Authenticate"] = fmt.Sprintf(`Digest realm="EasyDarwin", nonce="%s", algorithm="MD5"`, nonce) + return + } + } + } + switch req.Method { + case "OPTIONS": + res.Header["Public"] = "DESCRIBE, SETUP, TEARDOWN, PLAY, PAUSE, OPTIONS, ANNOUNCE, RECORD" + case "ANNOUNCE": + session.Type = SESSION_TYPE_PUSHER + session.URL = req.URL + + url, err := url.Parse(req.URL) + if err != nil { + res.StatusCode = 500 + res.Status = "Invalid URL" + return + } + streamPath := strings.TrimPrefix(url.Path,"/") + + session.SDPRaw = req.Body + session.SDPMap = ParseSDP(req.Body) + sdp, ok := session.SDPMap["audio"] + if ok { + session.AControl = sdp.Control + session.ACodec = sdp.Codec + session.AudioSpecificConfig = sdp.Config + Printf("audio codec[%s]\n", session.ACodec) + } + if sdp, ok = session.SDPMap["video"];ok { + session.VControl = sdp.Control + session.VCodec = sdp.Codec + session.SPS = sdp.SpropParameterSets[0] + session.PPS = sdp.SpropParameterSets[1] + Printf("video codec[%s]\n", session.VCodec) + } + if session.Publisher.Publish(streamPath) { + session.Stream.Type = "RTSP" + session.RTSPInfo.StreamInfo = &session.Stream.StreamInfo + collection.Store(streamPath, session) + } + case "DESCRIBE": + session.Type = SESSEION_TYPE_PLAYER + session.URL = req.URL + url, err := url.Parse(req.URL) + if err != nil { + res.StatusCode = 500 + res.Status = "Invalid URL" + return + } + streamPath := url.Path + stream := FindStream(streamPath) + if stream == nil { + return + } + // + //res.SetBody(session.SDPRaw) + case "SETUP": + ts := req.Header["Transport"] + // control字段可能是`stream=1`字样,也可能是rtsp://...字样。即control可能是url的path,也可能是整个url + // 例1: + // a=control:streamid=1 + // 例2: + // a=control:rtsp://192.168.1.64/trackID=1 + // 例3: + // a=control:?ctype=video + setupUrl, err := url.Parse(req.URL) + if err != nil { + res.StatusCode = 500 + res.Status = "Invalid URL" + return + } + if setupUrl.Port() == "" { + setupUrl.Host = fmt.Sprintf("%s:554", setupUrl.Host) + } + setupPath := setupUrl.String() + + // error status. SETUP without ANNOUNCE or DESCRIBE. + //if session.Pusher == nil { + // res.StatusCode = 500 + // res.Status = "Error Status" + // return + //} + vPath := "" + if strings.Index(strings.ToLower(session.VControl), "rtsp://") == 0 { + vControlUrl, err := url.Parse(session.VControl) + if err != nil { + res.StatusCode = 500 + res.Status = "Invalid VControl" + return + } + if vControlUrl.Port() == "" { + vControlUrl.Host = fmt.Sprintf("%s:554", vControlUrl.Host) + } + vPath = vControlUrl.String() + } else { + vPath = session.VControl + } + + aPath := "" + if strings.Index(strings.ToLower(session.AControl), "rtsp://") == 0 { + aControlUrl, err := url.Parse(session.AControl) + if err != nil { + res.StatusCode = 500 + res.Status = "Invalid AControl" + return + } + if aControlUrl.Port() == "" { + aControlUrl.Host = fmt.Sprintf("%s:554", aControlUrl.Host) + } + aPath = aControlUrl.String() + } else { + aPath = session.AControl + } + + mtcp := regexp.MustCompile("interleaved=(\\d+)(-(\\d+))?") + mudp := regexp.MustCompile("client_port=(\\d+)(-(\\d+))?") + + if tcpMatchs := mtcp.FindStringSubmatch(ts); tcpMatchs != nil { + session.TransType = TRANS_TYPE_TCP + if setupPath == aPath || aPath != "" && strings.LastIndex(setupPath, aPath) == len(setupPath)-len(aPath) { + session.aRTPChannel, _ = strconv.Atoi(tcpMatchs[1]) + session.aRTPControlChannel, _ = strconv.Atoi(tcpMatchs[3]) + } else if setupPath == vPath || vPath != "" && strings.LastIndex(setupPath, vPath) == len(setupPath)-len(vPath) { + session.vRTPChannel, _ = strconv.Atoi(tcpMatchs[1]) + session.vRTPControlChannel, _ = strconv.Atoi(tcpMatchs[3]) + } else { + res.StatusCode = 500 + res.Status = fmt.Sprintf("SETUP [TCP] got UnKown control:%s", setupPath) + Printf("SETUP [TCP] got UnKown control:%s", setupPath) + } + Printf("Parse SETUP req.TRANSPORT:TCP.Session.Type:%d,control:%s, AControl:%s,VControl:%s", session.Type, setupPath, aPath, vPath) + } else if udpMatchs := mudp.FindStringSubmatch(ts); udpMatchs != nil { + session.TransType = TRANS_TYPE_UDP + // no need for tcp timeout. + session.Conn.timeout = 0 + if session.Type == SESSEION_TYPE_PLAYER && session.UDPClient == nil { + session.UDPClient = &UDPClient{} + } + if session.Type == SESSION_TYPE_PUSHER && session.UDPServer == nil { + session.UDPServer = &UDPServer{ + Session: session, + } + } + Printf("Parse SETUP req.TRANSPORT:UDP.Session.Type:%d,control:%s, AControl:%s,VControl:%s", session.Type, setupPath, aPath, vPath) + if setupPath == aPath || aPath != "" && strings.LastIndex(setupPath, aPath) == len(setupPath)-len(aPath) { + if session.Type == SESSEION_TYPE_PLAYER { + session.UDPClient.APort, _ = strconv.Atoi(udpMatchs[1]) + session.UDPClient.AControlPort, _ = strconv.Atoi(udpMatchs[3]) + if err := session.UDPClient.SetupAudio(); err != nil { + res.StatusCode = 500 + res.Status = fmt.Sprintf("udp client setup audio error, %v", err) + return + } + } + if session.Type == SESSION_TYPE_PUSHER { + if err := session.UDPServer.SetupAudio(); err != nil { + res.StatusCode = 500 + res.Status = fmt.Sprintf("udp server setup audio error, %v", err) + return + } + tss := strings.Split(ts, ";") + idx := -1 + for i, val := range tss { + if val == udpMatchs[0] { + idx = i + } + } + tail := append([]string{}, tss[idx+1:]...) + tss = append(tss[:idx+1], fmt.Sprintf("server_port=%d-%d", session.UDPServer.APort, session.UDPServer.AControlPort)) + tss = append(tss, tail...) + ts = strings.Join(tss, ";") + } + } else if setupPath == vPath || vPath != "" && strings.LastIndex(setupPath, vPath) == len(setupPath)-len(vPath) { + if session.Type == SESSEION_TYPE_PLAYER { + session.UDPClient.VPort, _ = strconv.Atoi(udpMatchs[1]) + session.UDPClient.VControlPort, _ = strconv.Atoi(udpMatchs[3]) + if err := session.UDPClient.SetupVideo(); err != nil { + res.StatusCode = 500 + res.Status = fmt.Sprintf("udp client setup video error, %v", err) + return + } + } + + if session.Type == SESSION_TYPE_PUSHER { + if err := session.UDPServer.SetupVideo(); err != nil { + res.StatusCode = 500 + res.Status = fmt.Sprintf("udp server setup video error, %v", err) + return + } + tss := strings.Split(ts, ";") + idx := -1 + for i, val := range tss { + if val == udpMatchs[0] { + idx = i + } + } + tail := append([]string{}, tss[idx+1:]...) + tss = append(tss[:idx+1], fmt.Sprintf("server_port=%d-%d", session.UDPServer.VPort, session.UDPServer.VControlPort)) + tss = append(tss, tail...) + ts = strings.Join(tss, ";") + } + } else { + Printf("SETUP [UDP] got UnKown control:%s", setupPath) + } + } + res.Header["Transport"] = ts + case "PLAY": + // error status. PLAY without ANNOUNCE or DESCRIBE. + // if session.Pusher == nil { + // res.StatusCode = 500 + // res.Status = "Error Status" + // return + // } + res.Header["Range"] = req.Header["Range"] + case "RECORD": + // error status. RECORD without ANNOUNCE or DESCRIBE. + // if session.Pusher == nil { + // res.StatusCode = 500 + // res.Status = "Error Status" + // return + // } + case "PAUSE": + // if session.Player == nil { + // res.StatusCode = 500 + // res.Status = "Error Status" + // return + // } + // session.Player.Pause(true) + } +} + +func (session *RTSP) SendRTP(pack *RTPPack) (err error) { + if pack == nil { + err = fmt.Errorf("player send rtp got nil pack") + return + } + if session.TransType == TRANS_TYPE_UDP { + if session.UDPClient == nil { + err = fmt.Errorf("player use udp transport but udp client not found") + return + } + err = session.UDPClient.SendRTP(pack) + session.OutBytes += len(pack.Buffer) + return + } + switch pack.Type { + case RTP_TYPE_AUDIO: + bufChannel := make([]byte, 2) + bufChannel[0] = 0x24 + bufChannel[1] = byte(session.aRTPChannel) + session.connWLock.Lock() + session.connRW.Write(bufChannel) + bufLen := make([]byte, 2) + binary.BigEndian.PutUint16(bufLen, uint16(len(pack.Buffer))) + session.connRW.Write(bufLen) + session.connRW.Write(pack.Buffer) + session.connRW.Flush() + session.connWLock.Unlock() + session.OutBytes += len(pack.Buffer) + 4 + case RTP_TYPE_AUDIOCONTROL: + bufChannel := make([]byte, 2) + bufChannel[0] = 0x24 + bufChannel[1] = byte(session.aRTPControlChannel) + session.connWLock.Lock() + session.connRW.Write(bufChannel) + bufLen := make([]byte, 2) + binary.BigEndian.PutUint16(bufLen, uint16(len(pack.Buffer))) + session.connRW.Write(bufLen) + session.connRW.Write(pack.Buffer) + session.connRW.Flush() + session.connWLock.Unlock() + session.OutBytes += len(pack.Buffer) + 4 + case RTP_TYPE_VIDEO: + bufChannel := make([]byte, 2) + bufChannel[0] = 0x24 + bufChannel[1] = byte(session.vRTPChannel) + session.connWLock.Lock() + session.connRW.Write(bufChannel) + bufLen := make([]byte, 2) + binary.BigEndian.PutUint16(bufLen, uint16(len(pack.Buffer))) + session.connRW.Write(bufLen) + session.connRW.Write(pack.Buffer) + session.connRW.Flush() + session.connWLock.Unlock() + session.OutBytes += len(pack.Buffer) + 4 + case RTP_TYPE_VIDEOCONTROL: + bufChannel := make([]byte, 2) + bufChannel[0] = 0x24 + bufChannel[1] = byte(session.vRTPControlChannel) + session.connWLock.Lock() + session.connRW.Write(bufChannel) + bufLen := make([]byte, 2) + binary.BigEndian.PutUint16(bufLen, uint16(len(pack.Buffer))) + session.connRW.Write(bufLen) + session.connRW.Write(pack.Buffer) + session.connRW.Flush() + session.connWLock.Unlock() + session.OutBytes += len(pack.Buffer) + 4 + default: + err = fmt.Errorf("session tcp send rtp got unkown pack type[%v]", pack.Type) + } + return +} diff --git a/udp-client.go b/udp-client.go new file mode 100644 index 0000000..3a96bd3 --- /dev/null +++ b/udp-client.go @@ -0,0 +1,160 @@ +package rtsp + +import ( + "fmt" + "net" + "strings" + . "github.com/Monibuca/engine/v2" +) + +type UDPClient struct { + APort int + AConn *net.UDPConn + AControlPort int + AControlConn *net.UDPConn + VPort int + VConn *net.UDPConn + VControlPort int + VControlConn *net.UDPConn + + Stoped bool +} + +func (s *UDPClient) Stop() { + if s.Stoped { + return + } + s.Stoped = true + if s.AConn != nil { + s.AConn.Close() + s.AConn = nil + } + if s.AControlConn != nil { + s.AControlConn.Close() + s.AControlConn = nil + } + if s.VConn != nil { + s.VConn.Close() + s.VConn = nil + } + if s.VControlConn != nil { + s.VControlConn.Close() + s.VControlConn = nil + } +} + +func (c *UDPClient) SetupAudio() (err error) { + defer func() { + if err != nil { + Println(err) + c.Stop() + } + }() + host := c.AConn.RemoteAddr().String() + host = host[:strings.LastIndex(host, ":")] + addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", host, c.APort)) + if err != nil { + return + } + c.AConn, err = net.DialUDP("udp", nil, addr) + if err != nil { + return + } + networkBuffer := 1048576 + if err := c.AConn.SetReadBuffer(networkBuffer); err != nil { + Printf("udp client audio conn set read buffer error, %v", err) + } + if err := c.AConn.SetWriteBuffer(networkBuffer); err != nil { + Printf("udp client audio conn set write buffer error, %v", err) + } + + addr, err = net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", host, c.AControlPort)) + if err != nil { + return + } + c.AControlConn, err = net.DialUDP("udp", nil, addr) + if err != nil { + return + } + if err := c.AControlConn.SetReadBuffer(networkBuffer); err != nil { + Printf("udp client audio control conn set read buffer error, %v", err) + } + if err := c.AControlConn.SetWriteBuffer(networkBuffer); err != nil { + Printf("udp client audio control conn set write buffer error, %v", err) + } + return +} + +func (c *UDPClient) SetupVideo() (err error) { + defer func() { + if err != nil { + Println(err) + c.Stop() + } + }() + host := c.VConn.RemoteAddr().String() + host = host[:strings.LastIndex(host, ":")] + addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", host, c.VPort)) + if err != nil { + return + } + c.VConn, err = net.DialUDP("udp", nil, addr) + if err != nil { + return + } + networkBuffer := 1048576 + if err := c.VConn.SetReadBuffer(networkBuffer); err != nil { + Printf("udp client video conn set read buffer error, %v", err) + } + if err := c.VConn.SetWriteBuffer(networkBuffer); err != nil { + Printf("udp client video conn set write buffer error, %v", err) + } + + addr, err = net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", host, c.VControlPort)) + if err != nil { + return + } + c.VControlConn, err = net.DialUDP("udp", nil, addr) + if err != nil { + return + } + if err := c.VControlConn.SetReadBuffer(networkBuffer); err != nil { + Printf("udp client video control conn set read buffer error, %v", err) + } + if err := c.VControlConn.SetWriteBuffer(networkBuffer); err != nil { + Printf("udp client video control conn set write buffer error, %v", err) + } + return +} + +func (c *UDPClient) SendRTP(pack *RTPPack) (err error) { + if pack == nil { + err = fmt.Errorf("udp client send rtp got nil pack") + return + } + var conn *net.UDPConn + switch pack.Type { + case RTP_TYPE_AUDIO: + conn = c.AConn + case RTP_TYPE_AUDIOCONTROL: + conn = c.AControlConn + case RTP_TYPE_VIDEO: + conn = c.VConn + case RTP_TYPE_VIDEOCONTROL: + conn = c.VControlConn + default: + err = fmt.Errorf("udp client send rtp got unkown pack type[%v]", pack.Type) + return + } + if conn == nil { + err = fmt.Errorf("udp client send rtp pack type[%v] failed, conn not found", pack.Type) + return + } + + if _, err = conn.Write(pack.Buffer);err != nil { + err = fmt.Errorf("udp client write bytes error, %v", err) + return + } + // Printf("udp client write [%d/%d]", n, pack.Buffer.Len()) + return +} diff --git a/udp-server.go b/udp-server.go new file mode 100644 index 0000000..2b08f01 --- /dev/null +++ b/udp-server.go @@ -0,0 +1,242 @@ +package rtsp + +import ( + "fmt" + "net" + "strconv" + "strings" + "sync" + "time" + + . "github.com/Monibuca/engine/v2" +) + +type UDPServer struct { + Session *RTSP + UDPClient + sync.Mutex +} + +func (s *UDPServer) AddInputBytes(bytes int) { + if s.Session != nil { + s.Session.InBytes += bytes + return + } + panic(fmt.Errorf("session and RTSPClient both nil")) +} + +func (s *UDPServer) HandleRTP(pack *RTPPack) { + s.Lock() + defer s.Unlock() + if s.Session != nil { + s.Session.handleRTP(pack) + } +} + +func (s *UDPServer) Stop() { + if s.Stoped { + return + } + s.Stoped = true + if s.AConn != nil { + s.AConn.Close() + s.AConn = nil + } + if s.AControlConn != nil { + s.AControlConn.Close() + s.AControlConn = nil + } + if s.VConn != nil { + s.VConn.Close() + s.VConn = nil + } + if s.VControlConn != nil { + s.VControlConn.Close() + s.VControlConn = nil + } +} + +func (s *UDPServer) SetupAudio() (err error) { + addr, err := net.ResolveUDPAddr("udp", ":0") + if err != nil { + return + } + s.AConn, err = net.ListenUDP("udp", addr) + if err != nil { + return + } + networkBuffer := 1048576 + if err := s.AConn.SetReadBuffer(networkBuffer); err != nil { + Printf("udp server audio conn set read buffer error, %v", err) + } + if err := s.AConn.SetWriteBuffer(networkBuffer); err != nil { + Printf("udp server audio conn set write buffer error, %v", err) + } + la := s.AConn.LocalAddr().String() + strPort := la[strings.LastIndex(la, ":")+1:] + s.APort, err = strconv.Atoi(strPort) + if err != nil { + return + } + go func() { + bufUDP := make([]byte, UDP_BUF_SIZE) + Printf("udp server start listen audio port[%d]", s.APort) + defer Printf("udp server stop listen audio port[%d]", s.APort) + timer := time.Unix(0, 0) + for !s.Stoped { + if n, _, err := s.AConn.ReadFromUDP(bufUDP); err == nil { + elapsed := time.Now().Sub(timer) + if elapsed >= 30*time.Second { + Printf("Package recv from AConn.len:%d\n", n) + timer = time.Now() + } + rtpBytes := make([]byte, n) + s.AddInputBytes(n) + copy(rtpBytes, bufUDP) + pack := &RTPPack{ + Type: RTP_TYPE_AUDIO, + Buffer: rtpBytes, + } + s.HandleRTP(pack) + } else { + Println("udp server read audio pack error", err) + continue + } + } + }() + addr, err = net.ResolveUDPAddr("udp", ":0") + if err != nil { + return + } + s.AControlConn, err = net.ListenUDP("udp", addr) + if err != nil { + return + } + if err := s.AControlConn.SetReadBuffer(networkBuffer); err != nil { + Printf("udp server audio control conn set read buffer error, %v", err) + } + if err := s.AControlConn.SetWriteBuffer(networkBuffer); err != nil { + Printf("udp server audio control conn set write buffer error, %v", err) + } + la = s.AControlConn.LocalAddr().String() + strPort = la[strings.LastIndex(la, ":")+1:] + s.AControlPort, err = strconv.Atoi(strPort) + if err != nil { + return + } + go func() { + bufUDP := make([]byte, UDP_BUF_SIZE) + Printf("udp server start listen audio control port[%d]", s.AControlPort) + defer Printf("udp server stop listen audio control port[%d]", s.AControlPort) + for !s.Stoped { + if n, _, err := s.AControlConn.ReadFromUDP(bufUDP); err == nil { + //Printf("Package recv from AControlConn.len:%d\n", n) + rtpBytes := make([]byte, n) + s.AddInputBytes(n) + copy(rtpBytes, bufUDP) + pack := &RTPPack{ + Type: RTP_TYPE_AUDIOCONTROL, + Buffer: rtpBytes, + } + s.HandleRTP(pack) + } else { + Println("udp server read audio control pack error", err) + continue + } + } + }() + return +} + +func (s *UDPServer) SetupVideo() (err error) { + addr, err := net.ResolveUDPAddr("udp", ":0") + if err != nil { + return + } + s.VConn, err = net.ListenUDP("udp", addr) + if err != nil { + return + } + networkBuffer := 1048576 + if err := s.VConn.SetReadBuffer(networkBuffer); err != nil { + Printf("udp server video conn set read buffer error, %v", err) + } + if err := s.VConn.SetWriteBuffer(networkBuffer); err != nil { + Printf("udp server video conn set write buffer error, %v", err) + } + la := s.VConn.LocalAddr().String() + strPort := la[strings.LastIndex(la, ":")+1:] + s.VPort, err = strconv.Atoi(strPort) + if err != nil { + return + } + go func() { + bufUDP := make([]byte, UDP_BUF_SIZE) + Printf("udp server start listen video port[%d]", s.VPort) + defer Printf("udp server stop listen video port[%d]", s.VPort) + timer := time.Unix(0, 0) + for !s.Stoped { + if n, _, err := s.VConn.ReadFromUDP(bufUDP); err == nil { + elapsed := time.Now().Sub(timer) + if elapsed >= 30*time.Second { + Printf("Package recv from VConn.len:%d\n", n) + timer = time.Now() + } + rtpBytes := make([]byte, n) + s.AddInputBytes(n) + copy(rtpBytes, bufUDP) + pack := &RTPPack{ + Type: RTP_TYPE_VIDEO, + Buffer: rtpBytes, + } + s.HandleRTP(pack) + } else { + Println("udp server read video pack error", err) + continue + } + } + }() + + addr, err = net.ResolveUDPAddr("udp", ":0") + if err != nil { + return + } + s.VControlConn, err = net.ListenUDP("udp", addr) + if err != nil { + return + } + if err := s.VControlConn.SetReadBuffer(networkBuffer); err != nil { + Printf("udp server video control conn set read buffer error, %v", err) + } + if err := s.VControlConn.SetWriteBuffer(networkBuffer); err != nil { + Printf("udp server video control conn set write buffer error, %v", err) + } + la = s.VControlConn.LocalAddr().String() + strPort = la[strings.LastIndex(la, ":")+1:] + s.VControlPort, err = strconv.Atoi(strPort) + if err != nil { + return + } + go func() { + bufUDP := make([]byte, UDP_BUF_SIZE) + Printf("udp server start listen video control port[%d]", s.VControlPort) + defer Printf("udp server stop listen video control port[%d]", s.VControlPort) + for !s.Stoped { + if n, _, err := s.VControlConn.ReadFromUDP(bufUDP); err == nil { + //Printf("Package recv from VControlConn.len:%d\n", n) + rtpBytes := make([]byte, n) + s.AddInputBytes(n) + copy(rtpBytes, bufUDP) + pack := &RTPPack{ + Type: RTP_TYPE_VIDEOCONTROL, + Buffer: rtpBytes, + } + s.HandleRTP(pack) + } else { + Println("udp server read video control pack error", err) + continue + } + } + }() + return +}