diff --git a/client.go b/client.go index c855aa9..e836558 100644 --- a/client.go +++ b/client.go @@ -1,537 +1,67 @@ package rtsp import ( - "bufio" - "bytes" - "crypto/md5" - "encoding/base64" - "encoding/binary" "errors" - "fmt" - "io" - "net" - "net/url" - "regexp" - "strconv" - "strings" "time" . "github.com/Monibuca/engine/v3" . "github.com/Monibuca/utils/v3" + "github.com/aler9/gortsplib" ) +type RTSPClient struct { + RTSPublisher + Conn *gortsplib.ClientConn +} + // PullStream 从外部拉流 -func (rtsp *RTSP) PullStream(streamPath string, rtspUrl string) (err error) { +func (rtsp *RTSPClient) PullStream(streamPath string, rtspUrl string) (err error) { rtsp.Stream = &Stream{ StreamPath: streamPath, Type: "RTSP Pull", ExtraProp: rtsp, } if result := rtsp.Publish(); result { - rtsp.TransType = TRANS_TYPE_TCP - rtsp.vRTPChannel = 0 - rtsp.vRTPControlChannel = 1 - rtsp.aRTPChannel = 2 - rtsp.aRTPControlChannel = 3 rtsp.URL = rtspUrl - rtsp.UDPServer = &UDPServer{Session: rtsp} - go rtsp.startStream() + if config.Reconnect { + go func() { + for rtsp.startStream(); rtsp.Err() == nil; rtsp.startStream() { + Printf("reconnecting:%s in 5 seconds", rtspUrl) + time.Sleep(time.Second * 5) + } + rtsp.Conn.Close() + if rtsp.IsTimeout { + go rtsp.PullStream(streamPath, rtspUrl) + } + }() + } else { + go func() { + rtsp.startStream() + rtsp.Conn.Close() + }() + } return } return errors.New("publish badname") } -func DigestAuth(authLine string, method string, URL string) (string, error) { - l, err := url.Parse(URL) - if err != nil { - return "", fmt.Errorf("Url parse error:%v,%v", URL, err) - } - realm := "" - nonce := "" - realmRex := regexp.MustCompile(`realm="(.*?)"`) - result1 := realmRex.FindStringSubmatch(authLine) - nonceRex := regexp.MustCompile(`nonce="(.*?)"`) - result2 := nonceRex.FindStringSubmatch(authLine) - - if len(result1) == 2 { - realm = result1[1] - } else { - return "", fmt.Errorf("auth error : no realm found") - } - if len(result2) == 2 { - nonce = result2[1] - } else { - return "", fmt.Errorf("auth error : no nonce found") - } - // response= md5(md5(username:realm:password):nonce:md5(public_method:url)); - username := l.User.Username() - password, _ := l.User.Password() - l.User = nil - if l.Port() == "" { - l.Host = fmt.Sprintf("%s:%s", l.Host, "554") - } - md5UserRealmPwd := fmt.Sprintf("%x", md5.Sum([]byte(fmt.Sprintf("%s:%s:%s", username, realm, password)))) - md5MethodURL := fmt.Sprintf("%x", md5.Sum([]byte(fmt.Sprintf("%s:%s", method, l.String())))) - - response := fmt.Sprintf("%x", md5.Sum([]byte(fmt.Sprintf("%s:%s:%s", md5UserRealmPwd, nonce, md5MethodURL)))) - Authorization := fmt.Sprintf("Digest username=\"%s\", realm=\"%s\", nonce=\"%s\", uri=\"%s\", response=\"%s\"", username, realm, nonce, l.String(), response) - return Authorization, nil -} - -// auth Basic验证 -func BasicAuth(authLine string, method string, URL string) (string, error) { - l, err := url.Parse(URL) - if err != nil { - return "", fmt.Errorf("Url parse error:%v,%v", URL, err) - } - username := l.User.Username() - password, _ := l.User.Password() - userAndpass := []byte(username + ":" + password) - Authorization := fmt.Sprintf("Basic %s", base64.StdEncoding.EncodeToString(userAndpass)) - return Authorization, nil -} -func (client *RTSP) checkAuth(method string, resp *Response) (string, error) { - if resp.StatusCode == 401 { - // need auth. - AuthHeaders := resp.Header["WWW-Authenticate"] - auths, ok := AuthHeaders.([]string) - if ok { - for _, authLine := range auths { - if strings.IndexAny(authLine, "Digest") == 0 { - // realm="HipcamRealServer", - // nonce="3b27a446bfa49b0c48c3edb83139543d" - client.authLine = authLine - return DigestAuth(authLine, method, client.URL) - } else if strings.IndexAny(authLine, "Basic") == 0 { - return BasicAuth(authLine, method, client.URL) - } - } - return "", fmt.Errorf("auth error") - } else { - authLine, _ := AuthHeaders.(string) - if strings.IndexAny(authLine, "Digest") == 0 { - client.authLine = authLine - return DigestAuth(authLine, method, client.URL) - } else if strings.IndexAny(authLine, "Basic") == 0 { - return BasicAuth(authLine, method, client.URL) - } - } - } - return "", nil -} -func (client *RTSP) requestStream() (err error) { - timeout := time.Duration(5) * time.Second - l, err := url.Parse(client.URL) - if err != nil { - return err - } - if strings.ToLower(l.Scheme) != "rtsp" { - err = fmt.Errorf("RTSP url is invalid") - return err - } - if strings.ToLower(l.Hostname()) == "" { - err = fmt.Errorf("RTSP url is invalid") - return err - } - port := l.Port() - if len(port) == 0 { - port = "554" - } - conn, err := net.DialTimeout("tcp", l.Hostname()+":"+port, timeout) - if err != nil { - // handle error - return err - } - - networkBuffer := 204800 - - timeoutConn := RichConn{ - conn, - timeout, - } - client.Conn = &timeoutConn - client.connRW = bufio.NewReadWriter(bufio.NewReaderSize(&timeoutConn, networkBuffer), bufio.NewWriterSize(&timeoutConn, networkBuffer)) - - headers := map[string]string{} - //headers["Require"] = "implicit-play" - // An OPTIONS request returns the request types the server will accept. - resp, err := client.Request("OPTIONS", headers) - if err != nil { - if resp != nil { - Authorization, _ := client.checkAuth("OPTIONS", resp) - if len(Authorization) > 0 { - headers := make(map[string]string) - headers["Require"] = "implicit-play" - headers["Authorization"] = Authorization - // An OPTIONS request returns the request types the server will accept. - resp, err = client.Request("OPTIONS", headers) - } - if err != nil { - return err - } - } else { - return err - } - } - - // A DESCRIBE request includes an RTSP URL (rtsp://...), and the type of reply data that can be handled. This reply includes the presentation description, - // typically in Session Description Protocol (SDP) format. Among other things, the presentation description lists the media streams controlled with the aggregate URL. - // In the typical case, there is one media stream each for audio and video. - headers = make(map[string]string) - headers["Accept"] = "application/sdp" - resp, err = client.Request("DESCRIBE", headers) - if err != nil { - if resp != nil { - authorization, _ := client.checkAuth("DESCRIBE", resp) - if len(authorization) > 0 { - headers := make(map[string]string) - headers["Authorization"] = authorization - headers["Accept"] = "application/sdp" - resp, err = client.Request("DESCRIBE", headers) - } - if err != nil { - return err - } - } else { - return err - } - } - client.SDPRaw = resp.Body - client.SDPMap = ParseSDP(client.SDPRaw) - client.VSdp, client.HasVideo = client.SDPMap["video"] - client.ASdp, client.HasAudio = client.SDPMap["audio"] - session := "" - otherChannel := 4 - for t, sdpInfo := range client.SDPMap { - headers = make(map[string]string) - if session != "" { - headers["Session"] = session - } - var _url = sdpInfo.Control - if !strings.HasPrefix(strings.ToLower(sdpInfo.Control), "rtsp://") { - _url = strings.TrimRight(client.URL, "/") + "/" + strings.TrimLeft(sdpInfo.Control, "/") - } - switch t { - case "video": - client.setVideoTrack() - if client.TransType == TRANS_TYPE_TCP { - headers["Transport"] = fmt.Sprintf("RTP/AVP/TCP;unicast;interleaved=%d-%d", client.vRTPChannel, client.vRTPControlChannel) - } else { - //RTP/AVP;unicast;client_port=64864-64865 - if err = client.UDPServer.SetupVideo(); err != nil { - Printf("Setup video err.%v", err) - return err - } - headers["Transport"] = fmt.Sprintf("RTP/AVP/UDP;unicast;client_port=%d-%d", client.UDPServer.VPort, client.UDPServer.VControlPort) - client.Conn.timeout = 0 // UDP ignore timeout - } - case "audio": - client.setAudioTrack() - if client.TransType == TRANS_TYPE_TCP { - headers["Transport"] = fmt.Sprintf("RTP/AVP/TCP;unicast;interleaved=%d-%d", client.aRTPChannel, client.aRTPControlChannel) - } else { - if err = client.UDPServer.SetupAudio(); err != nil { - Printf("Setup audio err.%v", err) - return err - } - headers["Transport"] = fmt.Sprintf("RTP/AVP/UDP;unicast;client_port=%d-%d", client.UDPServer.APort, client.UDPServer.AControlPort) - client.Conn.timeout = 0 // UDP ignore timeout - } - default: - if client.TransType == TRANS_TYPE_TCP { - headers["Transport"] = fmt.Sprintf("RTP/AVP/TCP;unicast;interleaved=%d-%d", otherChannel, otherChannel+1) - otherChannel += 2 - } else { - //TODO: UDP support - } - } - if resp, err = client.RequestWithPath("SETUP", _url, headers, true); err != nil { - return err - } - session, _ = resp.Header["Session"].(string) - session = strings.Split(session, ";")[0] - } - headers = make(map[string]string) - if session != "" { - headers["Session"] = session - client.Session = session - } - resp, err = client.Request("PLAY", headers) - return err -} - -func (client *RTSP) startStream() { +func (client *RTSPClient) startStream() { if client.Err() != nil { return } - startTime := time.Now() + // startTime := time.Now() //loggerTime := time.Now().Add(-10 * time.Second) - defer func() { - if client.Err() == nil && config.Reconnect { - client.RTSPClientInfo = RTSPClientInfo{} - Printf("reconnecting:%s in 5 seconds", client.URL) - time.AfterFunc(time.Second*5, client.startStream) - } else { - client.Stop() - } - }() - if err := client.requestStream(); err != nil { - Printf("rtsp requestStream err:%v", err) + conn, err := gortsplib.DialRead(client.URL) + if err != nil { + Printf("connect:%s error:%v", client.URL, err) return } - for client.Err() == nil { - if time.Since(startTime) > time.Minute { - startTime = time.Now() - headers := make(map[string]string) - headers["Require"] = "implicit-play" - // An OPTIONS request returns the request types the server will accept. - if err := client.RequestNoResp("GET_PARAMETER", headers); err != nil { - // ignore... - } + client.Conn = conn + tracks := conn.Tracks() + client.setTracks(tracks) + err = conn.ReadFrames(func(trackID int, streamType gortsplib.StreamType, payload []byte) { + if streamType == gortsplib.StreamTypeRTP { + client.processFunc[trackID](payload) } - b, err := client.connRW.ReadByte() - if err != nil { - Printf("client.connRW.ReadByte err:%v", err) - return - } - switch b { - case '$': // rtp - header := make([]byte, 3) - _, err := io.ReadFull(client.connRW, header) - if err != nil { - Printf("io.ReadFull err:%v", err) - return - } - channel := int(header[0]) - length := binary.BigEndian.Uint16(header[1:]) - content := make([]byte, length) - _, err = io.ReadFull(client.connRW, content) - if err != nil { - Printf("io.ReadFull err:%v", err) - return - } - - switch channel { - case client.aRTPChannel: - if client.RtpAudio != nil { - client.RtpAudio.Push(content) - } - case client.aRTPControlChannel: - - case client.vRTPChannel: - if client.RtpVideo != nil { - client.RtpVideo.Push(content) - } - case client.vRTPControlChannel: - - default: - Printf("unknow rtp pack type, channel:%v", channel) - continue - } - - //if client.debugLogEnable { - // rtp := ParseRTP(pack.Buffer) - // if rtp != nil { - // rtpSN := uint16(rtp.SequenceNumber) - // if client.lastRtpSN != 0 && client.lastRtpSN+1 != rtpSN { - // Printf("%s, %d packets lost, current SN=%d, last SN=%d\n", client.String(), rtpSN-client.lastRtpSN, rtpSN, client.lastRtpSN) - // } - // client.lastRtpSN = rtpSN - // } - // - // elapsed := time.Now().Sub(loggerTime) - // if elapsed >= 30*time.Second { - // Printf("%v read rtp frame.", client) - // loggerTime = time.Now() - // } - //} - - client.InBytes += int(length + 4) - - default: // rtsp - builder := bytes.Buffer{} - builder.WriteByte(b) - contentLen := 0 - for client.Err() == nil { - line, prefix, err := client.connRW.ReadLine() - if err != nil { - Printf("client.connRW.ReadLine err:%v", err) - return - } - if len(line) == 0 { - if contentLen != 0 { - content := make([]byte, contentLen) - _, err = io.ReadFull(client.connRW, content) - if err != nil { - err = fmt.Errorf("Read content err.ContentLength:%d", contentLen) - return - } - builder.Write(content) - } - Printf("<<<[IN]\n%s", builder.String()) - break - } - s := string(line) - builder.Write(line) - if !prefix { - builder.WriteString("\r\n") - } - - if strings.Index(s, "Content-Length:") == 0 { - splits := strings.Split(s, ":") - contentLen, err = strconv.Atoi(strings.TrimSpace(splits[1])) - if err != nil { - Printf("strconv.Atoi err:%v, str:%v", err, splits[1]) - return - } - } - } - } - } -} - -func (client *RTSP) Request(method string, headers map[string]string) (*Response, error) { - l, err := url.Parse(client.URL) - if err != nil { - return nil, fmt.Errorf("Url parse error:%v", err) - } - l.User = nil - return client.RequestWithPath(method, l.String(), headers, true) -} - -func (client *RTSP) RequestNoResp(method string, headers map[string]string) (err error) { - l, err := url.Parse(client.URL) - if err != nil { - return fmt.Errorf("Url parse error:%v", err) - } - l.User = nil - if _, err = client.RequestWithPath(method, l.String(), headers, false); err != nil { - return err - } - return nil -} - -func (client *RTSP) RequestWithPath(method string, path string, headers map[string]string, needResp bool) (resp *Response, err error) { - headers["User-Agent"] = client.Agent - if len(headers["Authorization"]) == 0 { - if len(client.authLine) != 0 { - Authorization, _ := DigestAuth(client.authLine, method, client.URL) - if len(Authorization) > 0 { - headers["Authorization"] = Authorization - } - } - } - if len(client.Session) > 0 { - headers["Session"] = client.Session - } - client.Seq++ - cseq := client.Seq - builder := bytes.Buffer{} - builder.WriteString(fmt.Sprintf("%s %s RTSP/1.0\r\n", method, path)) - builder.WriteString(fmt.Sprintf("CSeq: %d\r\n", cseq)) - for k, v := range headers { - builder.WriteString(fmt.Sprintf("%s: %s\r\n", k, v)) - } - builder.WriteString(fmt.Sprintf("\r\n")) - s := builder.String() - Printf("[OUT]>>>\n%s", s) - _, err = client.connRW.WriteString(s) - if err != nil { - return - } - client.connRW.Flush() - - if !needResp { - return nil, nil - } - lineCount := 0 - statusCode := 200 - status := "" - sid := "" - contentLen := 0 - respHeader := make(map[string]interface{}) - var line []byte - builder.Reset() - for { - isPrefix := false - if line, isPrefix, err = client.connRW.ReadLine(); err != nil { - return - } - s := string(line) - builder.Write(line) - if !isPrefix { - builder.WriteString("\r\n") - } - if len(line) == 0 { - body := "" - if contentLen > 0 { - content := make([]byte, contentLen) - _, err = io.ReadFull(client.connRW, content) - if err != nil { - err = fmt.Errorf("Read content err.ContentLength:%d", contentLen) - return - } - body = string(content) - builder.Write(content) - } - resp = NewResponse(statusCode, status, strconv.Itoa(cseq), sid, body) - resp.Header = respHeader - Printf("<<<[IN]\n%s", builder.String()) - - if !(statusCode >= 200 && statusCode <= 300) { - err = fmt.Errorf("Response StatusCode is :%d", statusCode) - return - } - return - } - if lineCount == 0 { - splits := strings.Split(s, " ") - if len(splits) < 3 { - err = fmt.Errorf("StatusCode Line error:%s", s) - return - } - statusCode, err = strconv.Atoi(splits[1]) - if err != nil { - return - } - status = splits[2] - } - lineCount++ - splits := strings.Split(s, ":") - if len(splits) == 2 { - if val, ok := respHeader[splits[0]]; ok { - if slice, ok2 := val.([]string); ok2 { - slice = append(slice, strings.TrimSpace(splits[1])) - respHeader[splits[0]] = slice - } else { - str, _ := val.(string) - slice := []string{str, strings.TrimSpace(splits[1])} - respHeader[splits[0]] = slice - } - } else { - respHeader[splits[0]] = strings.TrimSpace(splits[1]) - } - } - if strings.Index(s, "Session:") == 0 { - splits := strings.Split(s, ":") - sid = strings.TrimSpace(splits[1]) - } - //if strings.Index(s, "CSeq:") == 0 { - // splits := strings.Split(s, ":") - // cseq, err = strconv.Atoi(strings.TrimSpace(splits[1])) - // if err != nil { - // err = fmt.Errorf("Atoi CSeq err. line:%s", s) - // return - // } - //} - if strings.Index(s, "Content-Length:") == 0 { - splits := strings.Split(s, ":") - contentLen, err = strconv.Atoi(strings.TrimSpace(splits[1])) - if err != nil { - return - } - } - - } - return + }) } diff --git a/go.mod b/go.mod index 872e0ff..f65891c 100644 --- a/go.mod +++ b/go.mod @@ -3,9 +3,10 @@ module github.com/Monibuca/plugin-rtsp/v3 go 1.16 require ( - github.com/Monibuca/engine/v3 v3.3.0 - github.com/Monibuca/utils/v3 v3.0.2 - github.com/pion/rtp v1.6.5 - github.com/teris-io/shortid v0.0.0-20201117134242-e59966efd125 - golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069 // indirect + github.com/Monibuca/engine/v3 v3.3.10 + github.com/Monibuca/utils/v3 v3.0.4 + github.com/aler9/gortsplib v0.0.0-20211106122816-6e38851a096e + github.com/pion/rtp v1.7.4 + github.com/pion/sdp/v3 v3.0.2 + golang.org/x/sys v0.0.0-20211113001501-0c823b97ae02 // indirect ) diff --git a/go.sum b/go.sum index 9e744e1..c8cc17f 100644 --- a/go.sum +++ b/go.sum @@ -1,10 +1,14 @@ -github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= -github.com/Monibuca/engine/v3 v3.3.0 h1:7zwYsLEHdeVZy6+JjVlaDhl/asr0HG6jirBL4uynj0s= -github.com/Monibuca/engine/v3 v3.3.0/go.mod h1:odyqD/VTQDN4qgzajsgn7kW7MWDIzTHt+j+BcI8i+4g= -github.com/Monibuca/utils/v3 v3.0.1/go.mod h1:RpNS95gapWs6gimwh8Xn2x72FN5tO7Powabj7dTFyvE= -github.com/Monibuca/utils/v3 v3.0.2 h1:n2vr67DHanav8wBC9IENk8xrKzeGJnBsxYUu69s8TrQ= -github.com/Monibuca/utils/v3 v3.0.2/go.mod h1:RpNS95gapWs6gimwh8Xn2x72FN5tO7Powabj7dTFyvE= +github.com/BurntSushi/toml v0.4.1 h1:GaI7EiDXDRfa8VshkTj7Fym7ha+y8/XxIgD2okUIjLw= +github.com/BurntSushi/toml v0.4.1/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= +github.com/Monibuca/engine/v3 v3.3.10 h1:zRw9aGEmB6K6ee0figdRh2HZFGQSn2nvpptMT3Xm0HY= +github.com/Monibuca/engine/v3 v3.3.10/go.mod h1:LowMZ/iw4t6tfTZkSYZHIA0Z1HE8b7xfTDLO4WhX3Hg= +github.com/Monibuca/utils/v3 v3.0.4 h1:PssGhww+qePzw4qpB3g2DCG5Buru0Cu64UiqtAPuHjc= +github.com/Monibuca/utils/v3 v3.0.4/go.mod h1:RpNS95gapWs6gimwh8Xn2x72FN5tO7Powabj7dTFyvE= +github.com/aler9/gortsplib v0.0.0-20211106122816-6e38851a096e h1:qSjVAaIvJukmEuLxV0agmQ5KmBabBK+jzb+eNqG3Z+w= +github.com/aler9/gortsplib v0.0.0-20211106122816-6e38851a096e/go.mod h1:fyQrQyHo8QvdR/h357tkv1g36VesZlzEPsdAu2VrHHc= +github.com/asticode/go-astikit v0.20.0/go.mod h1:h4ly7idim1tNhaVkdVBeXQZEE3L0xblP7fCWbgwipF0= +github.com/asticode/go-astits v1.10.0/go.mod h1:DkOWmBNQpnr9mv24KfZjq4JawCFX1FCqjLVGvO0DygQ= github.com/cnotch/apirouter v0.0.0-20200731232942-89e243a791f3/go.mod h1:5deJPLON/x/s2dLOQfuKS0lenhOIT4xX0pvtN/OEIuY= github.com/cnotch/ipchub v1.1.0 h1:hH0lh2mU3AZXPiqMwA0pdtqrwo7PFIMRGush9OobMUs= github.com/cnotch/ipchub v1.1.0/go.mod h1:2PbeBs2q2VxxTVCn1eYCDwpAWuVXbq1+N0FU7GimOH4= @@ -24,6 +28,10 @@ github.com/funny/utest v0.0.0-20161029064919-43870a374500/go.mod h1:mUn39tBov9jK github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= +github.com/icza/bitio v1.0.0 h1:squ/m1SHyFeCA6+6Gyol1AxV9nmPPlJFT8c2vKdj3U8= +github.com/icza/bitio v1.0.0/go.mod h1:0jGnlLAx8MKMr9VGnn/4YrvZiprkvBelsVIbA9Jjr9A= +github.com/icza/mighty v0.0.0-20180919140131-cfd07d671de6 h1:8UsGZ2rr2ksmEru6lToqnXgA8Mz1DP11X4zSJ159C3k= +github.com/icza/mighty v0.0.0-20180919140131-cfd07d671de6/go.mod h1:xQig96I1VNBDIWGCdTt54nHt6EeI639SmHycLYL7FkA= github.com/kelindar/process v0.0.0-20170730150328-69a29e249ec3/go.mod h1:+lTCLnZFXOkqwD8sLPl6u4erAc0cP8wFegQHfipz7KE= github.com/kelindar/rate v1.0.0/go.mod h1:AjT4G+hTItNwt30lucEGZIz8y7Uk5zPho6vurIZ+1Es= github.com/kelindar/tcp v1.0.0/go.mod h1:JB5hj1cshLU60XrLij2BBxW3JQ4hOye8vqbyvuKb52k= @@ -38,27 +46,34 @@ github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+W github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA= github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8= +github.com/pion/rtcp v1.2.4 h1:NT3H5LkUGgaEapvp0HGik+a+CpflRF7KTD7H+o7OWIM= +github.com/pion/rtcp v1.2.4/go.mod h1:52rMNPWFsjr39z9B9MhnkqhPLoeHTv1aN63o/42bWE0= +github.com/pion/rtp v1.6.1/go.mod h1:bDb5n+BFZxXx0Ea7E5qe+klMuqiBrP+w8XSjiWtCUko= github.com/pion/rtp v1.6.2/go.mod h1:bDb5n+BFZxXx0Ea7E5qe+klMuqiBrP+w8XSjiWtCUko= -github.com/pion/rtp v1.6.5 h1:o2cZf8OascA5HF/b0PAbTxRKvOWxTQxWYt7SlToxFGI= -github.com/pion/rtp v1.6.5/go.mod h1:bDb5n+BFZxXx0Ea7E5qe+klMuqiBrP+w8XSjiWtCUko= +github.com/pion/rtp v1.7.4 h1:4dMbjb1SuynU5OpA3kz1zHK+u+eOCQjW3MAeVHf1ODA= +github.com/pion/rtp v1.7.4/go.mod h1:bDb5n+BFZxXx0Ea7E5qe+klMuqiBrP+w8XSjiWtCUko= +github.com/pion/sdp/v3 v3.0.2 h1:UNnSPVaMM+Pdu/mR9UvAyyo6zkdYbKeuOooCwZvTl/g= +github.com/pion/sdp/v3 v3.0.2/go.mod h1:bNiSknmJE0HYBprTHXKPQ3+JjacTv5uap92ueJZKsRk= github.com/pixelbender/go-sdp v1.1.0/go.mod h1:6IBlz9+BrUHoFTea7gcp4S54khtOhjCW/nVDLhmZBAs= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pkg/profile v1.4.0/go.mod h1:NWz/XGvpEW1FyYQ7fCx4dqYBLlfTcE+A9FLAkNKqjFE= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/q191201771/naza v0.19.1 h1:4KLcxT2CHztO+7miPRtBG3FFgadSQYQw1gPPPKN7rnY= github.com/q191201771/naza v0.19.1/go.mod h1:5LeGupZZFtYP1g/S203n9vXoUNVdlRnPIfM6rExjqt0= github.com/sqs/goreturns v0.0.0-20181028201513-538ac6014518/go.mod h1:CKI4AZ4XmGV240rTHfO0hfE83S6/a3/Q1siZJ/vXf7A= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/teris-io/shortid v0.0.0-20201117134242-e59966efd125 h1:3SNcvBmEPE1YlB1JpVZouslJpI3GBNoiqW7+wb0Rz7w= -github.com/teris-io/shortid v0.0.0-20201117134242-e59966efd125/go.mod h1:M8agBzgqHIhgj7wEn9/0hJUZcrvt9VY+Ln+S1I5Mha0= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20210610132358-84b48f89b13b h1:k+E048sYJHyVnsr1GDrRZWQ32D2C7lWs9JRc0bel53A= +golang.org/x/net v0.0.0-20210610132358-84b48f89b13b/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a h1:DcqTD9SDLc+1P/r1EmRBwnVsrOwW+kk2vWf9n+1sGhs= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -67,10 +82,15 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069 h1:siQdpVirKtzPhKl3lZWozZraCFObP8S1v6PRp0bLrtU= -golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20211113001501-0c823b97ae02 h1:7NCfEGl0sfUojmX78nK9pBJuUlSZWEJA/TwASvfiPLo= +golang.org/x/sys v0.0.0-20211113001501-0c823b97ae02/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k= diff --git a/main.go b/main.go index d67b369..932e74a 100644 --- a/main.go +++ b/main.go @@ -1,25 +1,25 @@ package rtsp import ( - "bufio" + "encoding/json" "fmt" "log" - "net" "net/http" - "sync" "time" . "github.com/Monibuca/engine/v3" . "github.com/Monibuca/utils/v3" - "github.com/teris-io/shortid" + "github.com/aler9/gortsplib" ) var config = struct { ListenAddr string + UDPAddr string + RTCPAddr string Timeout int Reconnect bool AutoPullList map[string]string -}{":554", 0, false, nil} +}{":554", ":8000", ":8001", 0, false, nil} func init() { InstallPlugin(&PluginConfig{ @@ -28,25 +28,36 @@ func init() { Run: runPlugin, }) } +func getRtspList() (info []*RTSPublisher) { + for _, s := range Streams.ToList() { + if rtsp, ok := s.ExtraProp.(*RTSPublisher); ok { + info = append(info, rtsp) + } + } + return +} func runPlugin() { http.HandleFunc("/api/rtsp/list", func(w http.ResponseWriter, r *http.Request) { + CORS(w, r) + if r.URL.Query().Get("json") != "" { + if jsonData, err := json.Marshal(getRtspList()); err == nil { + w.Write(jsonData) + } else { + w.WriteHeader(500) + } + return + } sse := NewSSE(w, r.Context()) var err error for tick := time.NewTicker(time.Second); err == nil; <-tick.C { - var info []*RTSP - for _, s := range Streams.ToList() { - if rtsp, ok := s.ExtraProp.(*RTSP); ok { - info = append(info, rtsp) - } - } - err = sse.WriteJSON(info) + err = sse.WriteJSON(getRtspList()) } }) http.HandleFunc("/api/rtsp/pull", func(w http.ResponseWriter, r *http.Request) { CORS(w, r) targetURL := r.URL.Query().Get("target") streamPath := r.URL.Query().Get("streamPath") - if err := (&RTSP{RTSPClientInfo: RTSPClientInfo{Agent: "Monibuca"}}).PullStream(streamPath, targetURL); err == nil { + if err := (&RTSPClient{}).PullStream(streamPath, targetURL); err == nil { w.Write([]byte(`{"code":0}`)) } else { w.Write([]byte(fmt.Sprintf(`{"code":1,"msg":"%s"}`, err.Error()))) @@ -54,7 +65,7 @@ func runPlugin() { }) if len(config.AutoPullList) > 0 { for streamPath, url := range config.AutoPullList { - if err := (&RTSP{RTSPClientInfo: RTSPClientInfo{Agent: "Monibuca"}}).PullStream(streamPath, url); err != nil { + if err := (&RTSPClient{}).PullStream(streamPath, url); err != nil { Println(err) } } @@ -66,151 +77,13 @@ func runPlugin() { func ListenRtsp(addr string) error { defer log.Println("rtsp server start!") - listener, err := net.Listen("tcp", addr) - if err != nil { - return err + s := &gortsplib.Server{ + Handler: &RTSPServer{}, + UDPRTPAddress: config.UDPAddr, + UDPRTCPAddress: config.RTCPAddr, + MulticastIPRange: "224.1.0.0/16", + MulticastRTPPort: 8002, + MulticastRTCPPort: 8003, } - var tempDelay time.Duration - networkBuffer := 204800 - timeoutMillis := config.Timeout - for { - conn, err := listener.Accept() - conn.(*net.TCPConn).SetNoDelay(false) - if err != nil { - if ne, ok := err.(net.Error); ok && ne.Temporary() { - if tempDelay == 0 { - tempDelay = 5 * time.Millisecond - } else { - tempDelay *= 2 - } - if max := 1 * time.Second; tempDelay > max { - tempDelay = max - } - fmt.Printf("rtsp: Accept error: %v; retrying in %v", err, tempDelay) - time.Sleep(tempDelay) - continue - } - return err - } - - tempDelay = 0 - timeoutTCPConn := &RichConn{conn, time.Duration(timeoutMillis) * time.Millisecond} - go (&RTSP{ - ID: shortid.MustGenerate(), - Conn: timeoutTCPConn, - connRW: bufio.NewReadWriter(bufio.NewReaderSize(timeoutTCPConn, networkBuffer), bufio.NewWriterSize(timeoutTCPConn, networkBuffer)), - Timeout: config.Timeout, - vRTPChannel: -1, - vRTPControlChannel: -1, - aRTPChannel: -1, - aRTPControlChannel: -1, - }).AcceptPush() - } - return nil -} - -type RTSP struct { - *Stream `json:"-"` - URL string - SDPRaw string - InBytes int - OutBytes int - RTSPClientInfo - ID string - Conn *RichConn `json:"-"` - connRW *bufio.ReadWriter - connWLock sync.RWMutex - Type SessionType - TransType TransType - - SDPMap map[string]*SDPInfo - nonce string - ASdp *SDPInfo - VSdp *SDPInfo - Timeout int - //tcp channels - aRTPChannel int - aRTPControlChannel int - vRTPChannel int - vRTPControlChannel int - UDPServer *UDPServer `json:"-"` - UDPClient *UDPClient `json:"-"` - Auth func(string) string `json:"-"` - HasVideo bool - HasAudio bool - RtpAudio *RTPAudio - RtpVideo *RTPVideo -} - -func (rtsp *RTSP) setVideoTrack() { - if rtsp.VSdp.Codec == "H264" { - rtsp.RtpVideo = rtsp.NewRTPVideo(7) - if len(rtsp.VSdp.SpropParameterSets) > 1 { - rtsp.RtpVideo.PushNalu(0, 0, rtsp.VSdp.SpropParameterSets...) - } - } else if rtsp.VSdp.Codec == "H265" { - rtsp.RtpVideo = rtsp.NewRTPVideo(12) - if len(rtsp.VSdp.VPS) > 0 { - rtsp.RtpVideo.PushNalu(0, 0, rtsp.VSdp.VPS, rtsp.VSdp.SPS, rtsp.VSdp.PPS) - } - } -} -func (rtsp *RTSP) setAudioTrack() { - var at *RTPAudio - if len(rtsp.ASdp.Config) > 0 { - at = rtsp.NewRTPAudio(0) - at.SetASC(rtsp.ASdp.Config) - } else { - switch rtsp.ASdp.Codec { - case "AAC": - at = rtsp.NewRTPAudio(10) - case "PCMA": - at = rtsp.NewRTPAudio(7) - at.SoundRate = rtsp.ASdp.TimeScale - at.SoundSize = 16 - at.Channels = 1 - at.ExtraData = []byte{(at.CodecID << 4) | (1 << 1)} - case "PCMU": - at = rtsp.NewRTPAudio(8) - at.SoundRate = rtsp.ASdp.TimeScale - at.SoundSize = 16 - at.Channels = 1 - at.ExtraData = []byte{(at.CodecID << 4) | (1 << 1)} - default: - Printf("rtsp audio codec not support:%s", rtsp.ASdp.Codec) - return - } - } - rtsp.RtpAudio = at -} - -type RTSPClientInfo struct { - Agent string - Session string - authLine string - Seq int -} -type RichConn struct { - net.Conn - timeout time.Duration -} - -func (conn *RichConn) Read(b []byte) (n int, err error) { - if conn.timeout > 0 { - conn.Conn.SetReadDeadline(time.Now().Add(conn.timeout)) - } else { - var t time.Time - conn.Conn.SetReadDeadline(t) - } - return conn.Conn.Read(b) -} - -func (conn *RichConn) Write(b []byte) (n int, err error) { - if conn.timeout > 0 { - conn.Conn.SetWriteDeadline(time.Now().Add(conn.timeout)) - } else { - var t time.Time - conn.Conn.SetWriteDeadline(t) - } - return conn.Conn.Write(b) + return s.StartAndWait(addr) } diff --git a/payloader.go b/payloader.go new file mode 100644 index 0000000..d408dcb --- /dev/null +++ b/payloader.go @@ -0,0 +1,24 @@ +package rtsp + +// AACayloader payloads AAC packets +type AACPayloader struct{} + +// Payload fragments an AAC packet across one or more byte arrays +func (p *AACPayloader) Payload(mtu uint16, payload []byte) [][]byte { + var out [][]byte + o := make([]byte, len(payload)+4) + //AU_HEADER_LENGTH,因为单位是bit, 除以8就是auHeader的字节长度;又因为单个auheader字节长度2字节,所以再除以2就是auheader的个数。 + o[0] = 0x00 //高位 + o[1] = 0x10 //低位 + //AU_HEADER + o[2] = (byte)((len(payload) & 0x1fe0) >> 5) //高位 + o[3] = (byte)((len(payload) & 0x1f) << 3) //低位 + copy(o[4:], payload) + return append(out, o) +} + +type H265Payloader struct{} + +func (p *H265Payloader) Payload(mtu uint16, payload []byte) [][]byte { + return nil +} diff --git a/publisher.go b/publisher.go new file mode 100644 index 0000000..c9f70ca --- /dev/null +++ b/publisher.go @@ -0,0 +1,115 @@ +package rtsp + +import ( + "encoding/base64" + "encoding/hex" + "strconv" + "strings" + + . "github.com/Monibuca/engine/v3" + . "github.com/Monibuca/utils/v3" + "github.com/aler9/gortsplib" +) + +type RTSPublisher struct { + *Stream `json:"-"` + URL string + stream *gortsplib.ServerStream + processFunc []func([]byte) +} + +func (p *RTSPublisher) setTracks(tracks gortsplib.Tracks) { + for i, track := range tracks { + v, ok := track.Media.Attribute("rtpmap") + if !ok { + continue + } + fmtp := make(map[string]string) + if v, ok := track.Media.Attribute("fmtp"); ok { + if tmp := strings.SplitN(v, " ", 2); len(tmp) == 2 { + for _, kv := range strings.Split(tmp[1], ";") { + kv = strings.Trim(kv, " ") + + if len(kv) == 0 { + continue + } + tmp := strings.SplitN(kv, "=", 2) + if len(tmp) == 2 { + fmtp[strings.TrimSpace(tmp[0])] = tmp[1] + } + } + } + } + + v = strings.TrimSpace(v) + vals := strings.Split(v, " ") + if len(vals) != 2 { + continue + } + timeScale := 0 + keyval := strings.Split(vals[1], "/") + if i, err := strconv.Atoi(keyval[1]); err == nil { + timeScale = i + } + if len(keyval) >= 2 { + Printf("track %d is %s",i,keyval[0]) + switch strings.ToLower(keyval[0]) { + case "h264": + vt := p.NewRTPVideo(7) + if conf, err := track.ExtractConfigH264(); err == nil { + vt.PushNalu(0, 0, conf.SPS, conf.PPS) + } + p.processFunc = append(p.processFunc, vt.Push) + case "h265", "hevc": + vt := p.NewRTPVideo(12) + if v, ok := fmtp["sprop-vps"]; ok { + vps, _ := base64.StdEncoding.DecodeString(v) + vt.PushNalu(0, 0, vps) + } + if v, ok := fmtp["sprop-sps"]; ok { + sps, _ := base64.StdEncoding.DecodeString(v) + vt.PushNalu(0, 0, sps) + } + if v, ok := fmtp["sprop-pps"]; ok { + pps, _ := base64.StdEncoding.DecodeString(v) + vt.PushNalu(0, 0, pps) + } + p.processFunc = append(p.processFunc, vt.Push) + case "pcma": + at := p.NewRTPAudio(7) + at.SoundRate = timeScale + at.SoundSize = 16 + if len(keyval) >= 3 { + x, _ := strconv.Atoi(keyval[2]) + at.Channels = byte(x) + } else { + at.Channels = 1 + } + at.ExtraData = []byte{(at.CodecID << 4) | (1 << 1)} + p.processFunc = append(p.processFunc, at.Push) + case "pcmu": + at := p.NewRTPAudio(8) + at.SoundRate = timeScale + at.SoundSize = 16 + if len(keyval) >= 3 { + x, _ := strconv.Atoi(keyval[2]) + at.Channels = byte(x) + } else { + at.Channels = 1 + } + at.ExtraData = []byte{(at.CodecID << 4) | (1 << 1)} + p.processFunc = append(p.processFunc, at.Push) + case "mpeg4-generic": + at := p.NewRTPAudio(0) + if config, ok := fmtp["config"]; ok { + asc, _ := hex.DecodeString(config) + at.SetASC(asc) + } else { + Println("aac no config") + } + at.SoundSize = 16 + p.processFunc = append(p.processFunc, at.Push) + } + } + } +} diff --git a/request.go b/request.go deleted file mode 100644 index 759adad..0000000 --- a/request.go +++ /dev/null @@ -1,100 +0,0 @@ -package rtsp - -import ( - "fmt" - "log" - "regexp" - "strconv" - "strings" -) - -const ( - RTSP_VERSION = "RTSP/1.0" -) - -const ( - // Client to server for presentation and stream objects; recommended - DESCRIBE = "DESCRIBE" - // Bidirectional for client and stream objects; optional - ANNOUNCE = "ANNOUNCE" - // Bidirectional for client and stream objects; optional - GET_PARAMETER = "GET_PARAMETER" - // Bidirectional for client and stream objects; required for Client to server, optional for server to client - OPTIONS = "OPTIONS" - // Client to server for presentation and stream objects; recommended - PAUSE = "PAUSE" - // Client to server for presentation and stream objects; required - PLAY = "PLAY" - // Client to server for presentation and stream objects; optional - RECORD = "RECORD" - // Server to client for presentation and stream objects; optional - REDIRECT = "REDIRECT" - // Client to server for stream objects; required - SETUP = "SETUP" - // Bidirectional for presentation and stream objects; optional - SET_PARAMETER = "SET_PARAMETER" - // Client to server for presentation and stream objects; required - TEARDOWN = "TEARDOWN" - DATA = "DATA" -) - -type Request struct { - Method string - URL string - Version string - Header map[string]string - Content string - Body string -} - -func NewRequest(content string) *Request { - lines := strings.Split(strings.TrimSpace(content), "\r\n") - if len(lines) == 0 { - return nil - } - items := regexp.MustCompile("\\s+").Split(strings.TrimSpace(lines[0]), -1) - if len(items) < 3 { - return nil - } - if !strings.HasPrefix(items[2], "RTSP") { - log.Printf("invalid rtsp request, line[0] %s", lines[0]) - return nil - } - header := make(map[string]string) - for i := 1; i < len(lines); i++ { - line := strings.TrimSpace(lines[i]) - headerItems := regexp.MustCompile(":\\s+").Split(line, 2) - if len(headerItems) < 2 { - continue - } - header[headerItems[0]] = headerItems[1] - } - return &Request{ - Method: items[0], - URL: items[1], - Version: items[2], - Header: header, - Content: content, - Body: "", - } -} - -func (r *Request) String() string { - str := fmt.Sprintf("%s %s %s\r\n", r.Method, r.URL, r.Version) - for key, value := range r.Header { - str += fmt.Sprintf("%s: %s\r\n", key, value) - } - str += "\r\n" - str += r.Body - return str -} - -func (r *Request) GetContentLength() int { - v, err := strconv.ParseInt(r.Header["Content-Length"], 10, 64) - if err != nil { - return 0 - } else { - return int(v) - } -} - diff --git a/response.go b/response.go deleted file mode 100644 index 6b4c195..0000000 --- a/response.go +++ /dev/null @@ -1,51 +0,0 @@ -package rtsp - -import ( - "fmt" - "strconv" -) - -type Response struct { - Version string - StatusCode int - Status string - Header map[string]interface{} - Body string -} - -func NewResponse(statusCode int, status, cSeq, sid, body string) *Response { - res := &Response{ - Version: RTSP_VERSION, - StatusCode: statusCode, - Status: status, - Header: map[string]interface{}{"CSeq": cSeq, "Session": sid}, - Body: body, - } - len := len(body) - if len > 0 { - res.Header["Content-Length"] = strconv.Itoa(len) - } else { - delete(res.Header, "Content-Length") - } - return res -} - -func (r *Response) String() string { - str := fmt.Sprintf("%s %d %s\r\n", r.Version, r.StatusCode, r.Status) - for key, value := range r.Header { - str += fmt.Sprintf("%s: %s\r\n", key, value) - } - str += "\r\n" - str += r.Body - return str -} - -func (r *Response) SetBody(body string) { - len := len(body) - r.Body = body - if len > 0 { - r.Header["Content-Length"] = strconv.Itoa(len) - } else { - delete(r.Header, "Content-Length") - } -} diff --git a/rtp-parser.go b/rtp-parser.go deleted file mode 100644 index a512491..0000000 --- a/rtp-parser.go +++ /dev/null @@ -1,68 +0,0 @@ -package rtsp - -import ( - "encoding/binary" -) - -const ( - RTP_FIXED_HEADER_LENGTH = 12 -) - -type RTPInfo struct { - Version int - Padding bool - Extension bool - CSRCCnt int - Marker bool - PayloadType int - SequenceNumber int - Timestamp int - SSRC int - Payload []byte - PayloadOffset int -} - -func ParseRTP(rtpBytes []byte) *RTPInfo { - if len(rtpBytes) < RTP_FIXED_HEADER_LENGTH { - return nil - } - firstByte := rtpBytes[0] - secondByte := rtpBytes[1] - info := &RTPInfo{ - Version: int(firstByte >> 6), - Padding: (firstByte>>5)&1 == 1, - Extension: (firstByte>>4)&1 == 1, - CSRCCnt: int(firstByte & 0x0f), - - Marker: secondByte>>7 == 1, - PayloadType: int(secondByte & 0x7f), - SequenceNumber: int(binary.BigEndian.Uint16(rtpBytes[2:])), - Timestamp: int(binary.BigEndian.Uint32(rtpBytes[4:])), - SSRC: int(binary.BigEndian.Uint32(rtpBytes[8:])), - } - offset := RTP_FIXED_HEADER_LENGTH - end := len(rtpBytes) - if end-offset >= 4*info.CSRCCnt { - offset += 4 * info.CSRCCnt - } - if info.Extension && end-offset >= 4 { - extLen := 4 * int(binary.BigEndian.Uint16(rtpBytes[offset+2:])) - offset += 4 - if end-offset >= extLen { - offset += extLen - } - } - if info.Padding && end-offset > 0 { - paddingLen := int(rtpBytes[end-1]) - if end-offset >= paddingLen { - end -= paddingLen - } - } - info.Payload = rtpBytes[offset:end] - info.PayloadOffset = offset - if end-offset < 1 { - return nil - } - - return info -} diff --git a/sdp-parser.go b/sdp-parser.go deleted file mode 100644 index 944a790..0000000 --- a/sdp-parser.go +++ /dev/null @@ -1,111 +0,0 @@ -package rtsp - -import ( - "encoding/base64" - "encoding/hex" - "strconv" - "strings" -) - -type SDPInfo struct { - AVType string - Codec string - TimeScale int - Control string - Rtpmap int - Config []byte - SpropParameterSets [][]byte - VPS []byte - PPS []byte - SPS []byte - PayloadType int - SizeLength int - IndexLength int -} - -func ParseSDP(sdpRaw string) map[string]*SDPInfo { - sdpMap := make(map[string]*SDPInfo) - var info *SDPInfo - for _, line := range strings.Split(sdpRaw, "\n") { - line = strings.TrimSpace(line) - typeval := strings.SplitN(line, "=", 2) - if len(typeval) == 2 { - fields := strings.SplitN(typeval[1], " ", 2) - switch typeval[0] { - case "m": - if len(fields) > 0 { - info = &SDPInfo{AVType: fields[0]} - sdpMap[info.AVType] = info - mfields := strings.Split(fields[1], " ") - if len(mfields) >= 3 { - info.PayloadType, _ = strconv.Atoi(mfields[2]) - } - } - case "a": - if info != nil { - for _, field := range fields { - keyval := strings.SplitN(field, ":", 2) - if len(keyval) >= 2 { - key := keyval[0] - val := keyval[1] - switch key { - case "control": - info.Control = val - case "rtpmap": - info.Rtpmap, _ = strconv.Atoi(val) - } - } - keyval = strings.Split(field, "/") - if len(keyval) >= 2 { - switch keyval[0] { - case "h264", "h265", "pcma", "pcmu": - info.Codec = strings.ToUpper(keyval[0]) - case "H264", "H265", "PCMA", "PCMU": - info.Codec = keyval[0] - case "HEVC": - info.Codec = "H265" - case "MPEG4-GENERIC": - info.Codec = "AAC" - } - if i, err := strconv.Atoi(keyval[1]); err == nil { - info.TimeScale = i - } - } - keyval = strings.Split(field, ";") - if len(keyval) > 1 { - for _, field := range keyval { - keyval := strings.SplitN(field, "=", 2) - if len(keyval) == 2 { - key := strings.TrimSpace(keyval[0]) - val := keyval[1] - switch key { - case "config": - info.Config, _ = hex.DecodeString(val) - case "sizelength": - info.SizeLength, _ = strconv.Atoi(val) - case "indexlength": - info.IndexLength, _ = strconv.Atoi(val) - case "sprop-vps": - info.VPS, _ = base64.StdEncoding.DecodeString(val) - case "sprop-sps": - info.SPS, _ = base64.StdEncoding.DecodeString(val) - case "sprop-pps": - info.PPS, _ = base64.StdEncoding.DecodeString(val) - case "sprop-parameter-sets": - fields := strings.Split(val, ",") - for _, field := range fields { - val, _ := base64.StdEncoding.DecodeString(field) - info.SpropParameterSets = append(info.SpropParameterSets, val) - } - } - } - } - } - } - } - - } - } - } - return sdpMap -} diff --git a/server.go b/server.go new file mode 100644 index 0000000..6021b00 --- /dev/null +++ b/server.go @@ -0,0 +1,250 @@ +package rtsp + +import ( + "fmt" + "sync" + "unsafe" + + "github.com/Monibuca/engine/v3" + . "github.com/Monibuca/utils/v3" + "github.com/Monibuca/utils/v3/codec" + "github.com/aler9/gortsplib" + "github.com/aler9/gortsplib/pkg/aac" + "github.com/aler9/gortsplib/pkg/base" + "github.com/pion/rtp" + "github.com/pion/rtp/codecs" +) + +// 接收RTSP推流:OnConnOpen->OnAnnounce->OnSetup->OnSessionOpen +// 接收RTSP拉流:OnConnOpen->OnDescribe->OnSetup->OnSessionOpen +type RTSPServer struct { + sync.Map +} +type RTSPSubscriber struct { + stream *gortsplib.ServerStream + engine.Subscriber + vt *engine.VideoTrack + at *engine.AudioTrack +} + +// called after a connection is opened. +func (sh *RTSPServer) OnConnOpen(ctx *gortsplib.ServerHandlerOnConnOpenCtx) { + Printf("rtsp conn opened") +} + +// called after a connection is closed. +func (sh *RTSPServer) OnConnClose(ctx *gortsplib.ServerHandlerOnConnCloseCtx) { + Printf("rtsp conn closed (%v)", ctx.Error) + if p, ok := sh.Load(ctx.Conn); ok { + switch v := p.(type) { + case *RTSPublisher: + v.Close() + case *RTSPSubscriber: + v.Close() + } + sh.Delete(ctx.Conn) + } +} + +// called after a session is opened. +func (sh *RTSPServer) OnSessionOpen(ctx *gortsplib.ServerHandlerOnSessionOpenCtx) { + Printf("rtsp session opened") +} + +// called after a session is closed. +func (sh *RTSPServer) OnSessionClose(ctx *gortsplib.ServerHandlerOnSessionCloseCtx) { + Printf("rtsp session closed") + if v, ok := sh.LoadAndDelete(ctx.Session); ok { + switch v := v.(type) { + case *RTSPublisher: + v.Close() + case *RTSPSubscriber: + v.Close() + } + } +} + +// called after receiving a DESCRIBE request. +func (sh *RTSPServer) OnDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx) (*base.Response, *gortsplib.ServerStream, error) { + Printf("describe request") + var err error + if s := engine.FindStream(ctx.Path); s != nil { + var tracks gortsplib.Tracks + var stream *gortsplib.ServerStream + var sub RTSPSubscriber + sub.Type = "RTSP pull" + sub.vt = s.WaitVideoTrack("h264", "h265") + sub.at = s.WaitAudioTrack("aac", "pcma", "pcmu") + ssrc := uintptr(unsafe.Pointer(stream)) + var trackIds = 0 + if sub.vt != nil { + trackId := trackIds + var vtrack *gortsplib.Track + var vpacketer rtp.Packetizer + switch sub.vt.CodecID { + case codec.CodecID_H264: + if vtrack, err = gortsplib.NewTrackH264(96, &gortsplib.TrackConfigH264{ + SPS: sub.vt.ExtraData.NALUs[0], + PPS: sub.vt.ExtraData.NALUs[1], + }); err == nil { + vpacketer = rtp.NewPacketizer(1200, 96, uint32(ssrc), &codecs.H264Payloader{}, rtp.NewFixedSequencer(1), 90000) + } else { + return nil, nil, err + } + case codec.CodecID_H265: + vtrack = NewH265Track(96, sub.vt.ExtraData.NALUs) + vpacketer = rtp.NewPacketizer(1200, 96, uint32(ssrc), &H265Payloader{}, rtp.NewFixedSequencer(1), 90000) + } + var st uint32 + onVideo := func(ts uint32, pack *engine.VideoPack) { + for _, nalu := range pack.NALUs { + for _, pack := range vpacketer.Packetize(nalu, (ts-st)*90) { + rtp, _ := pack.Marshal() + stream.WriteFrame(trackId, gortsplib.StreamTypeRTP, rtp) + } + } + st = ts + } + sub.OnVideo = func(ts uint32, pack *engine.VideoPack) { + if st = ts; st != 0 { + sub.OnVideo = onVideo + } + onVideo(ts, pack) + } + tracks = append(tracks, vtrack) + trackIds++ + } + if sub.at != nil { + var st uint32 + trackId := trackIds + switch sub.at.CodecID { + case codec.CodecID_PCMA, codec.CodecID_PCMU: + atrack := NewG711Track(97, map[byte]string{7: "pcma", 8: "pcmu"}[sub.vt.CodecID]) + apacketizer := rtp.NewPacketizer(1200, 97, uint32(ssrc), &codecs.G711Payloader{}, rtp.NewFixedSequencer(1), 8000) + sub.OnAudio = func(ts uint32, pack *engine.AudioPack) { + for _, pack := range apacketizer.Packetize(pack.Raw, (ts-st)*8) { + buf, _ := pack.Marshal() + stream.WriteFrame(trackId, gortsplib.StreamTypeRTP, buf) + } + st = ts + } + tracks = append(tracks, atrack) + case codec.CodecID_AAC: + var mpegConf aac.MPEG4AudioConfig + mpegConf.Decode(sub.at.ExtraData[2:]) + conf := &gortsplib.TrackConfigAAC{ + Type: int(mpegConf.Type), + SampleRate: mpegConf.SampleRate, + ChannelCount: mpegConf.ChannelCount, + AOTSpecificConfig: mpegConf.AOTSpecificConfig, + } + if atrack, err := gortsplib.NewTrackAAC(97, conf); err == nil { + apacketizer := rtp.NewPacketizer(1200, 97, uint32(ssrc), &AACPayloader{}, rtp.NewFixedSequencer(1), uint32(mpegConf.SampleRate)) + sub.OnAudio = func(ts uint32, pack *engine.AudioPack) { + for _, pack := range apacketizer.Packetize(pack.Raw, (ts-st)*uint32(mpegConf.SampleRate)/1000) { + buf, _ := pack.Marshal() + stream.WriteFrame(trackId, gortsplib.StreamTypeRTP, buf) + } + st = ts + } + tracks = append(tracks, atrack) + } + } + } + stream = gortsplib.NewServerStream(tracks) + sub.stream = stream + sh.Store(ctx.Conn, &sub) + return &base.Response{ + StatusCode: base.StatusOK, + }, stream, nil + // if stream, ok := s.ExtraProp.(*gortsplib.ServerStream); ok { + // return &base.Response{ + // StatusCode: base.StatusOK, + // }, stream, nil + // } + } + return &base.Response{ + StatusCode: base.StatusNotFound, + }, nil, nil +} + +// called after receiving an ANNOUNCE request. +func (sh *RTSPServer) OnAnnounce(ctx *gortsplib.ServerHandlerOnAnnounceCtx) (*base.Response, error) { + Printf("announce request") + p := &RTSPublisher{ + Stream: &engine.Stream{ + StreamPath: ctx.Path, + Type: "RTSP push", + }, + } + p.URL = ctx.Req.URL.String() + if p.Publish() { + p.setTracks(ctx.Tracks) + p.stream = gortsplib.NewServerStream(ctx.Tracks) + sh.Store(ctx.Conn, p) + sh.Store(ctx.Session, p) + } else { + return &base.Response{ + StatusCode: base.StatusBadRequest, + }, fmt.Errorf("streamPath is already exist") + } + return &base.Response{ + StatusCode: base.StatusOK, + }, nil +} + +// called after receiving a SETUP request. +func (sh *RTSPServer) OnSetup(ctx *gortsplib.ServerHandlerOnSetupCtx) (*base.Response, *gortsplib.ServerStream, error) { + Printf("setup request") + if p, ok := sh.Load(ctx.Conn); ok { + switch v := p.(type) { + case *RTSPublisher: + return &base.Response{ + StatusCode: base.StatusOK, + }, v.stream, nil + case *RTSPSubscriber: + return &base.Response{ + StatusCode: base.StatusOK, + }, v.stream, nil + } + } + return &base.Response{ + StatusCode: base.StatusNotFound, + }, nil, nil +} + +// called after receiving a PLAY request. +func (sh *RTSPServer) OnPlay(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Response, error) { + Printf("play request") + if p, ok := sh.Load(ctx.Conn); ok { + if sub := p.(*RTSPSubscriber); sub.Subscribe(ctx.Path) == nil { + go func() { + sub.Play(sub.at, sub.vt) + ctx.Conn.Close() + }() + return &base.Response{ + StatusCode: base.StatusOK, + }, nil + } + } + return &base.Response{ + StatusCode: base.StatusNotFound, + }, nil +} + +// called after receiving a RECORD request. +func (sh *RTSPServer) OnRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*base.Response, error) { + Printf("record request") + return &base.Response{ + StatusCode: base.StatusOK, + }, nil +} + +// called after receiving a frame. +func (sh *RTSPServer) OnFrame(ctx *gortsplib.ServerHandlerOnFrameCtx) { + if p, ok := sh.Load(ctx.Session); ok { + if ctx.StreamType == gortsplib.StreamTypeRTP { + p.(*RTSPublisher).processFunc[ctx.TrackID](ctx.Payload) + } + } +} diff --git a/session.go b/session.go deleted file mode 100644 index 9ad2897..0000000 --- a/session.go +++ /dev/null @@ -1,724 +0,0 @@ -package rtsp - -import ( - "bytes" - "crypto/md5" - "encoding/base64" - "encoding/binary" - "fmt" - "io" - "net/url" - "regexp" - "strconv" - "strings" - "time" - "unsafe" - - . "github.com/Monibuca/engine/v3" - . "github.com/Monibuca/utils/v3" - "github.com/pion/rtp" - "github.com/pion/rtp/codecs" - "github.com/teris-io/shortid" -) - -type RTPPack struct { - Type RTPType - Raw []byte -} -type SessionType int - -const ( - SESSION_TYPE_PUSHER SessionType = iota - SESSEION_TYPE_PLAYER -) - -func (st SessionType) String() string { - switch st { - case SESSION_TYPE_PUSHER: - return "pusher" - case SESSEION_TYPE_PLAYER: - return "player" - } - return "unknow" -} - -type RTPType int - -const ( - RTP_TYPE_AUDIO RTPType = iota - RTP_TYPE_VIDEO - RTP_TYPE_AUDIOCONTROL - RTP_TYPE_VIDEOCONTROL -) - -type TransType int - -const ( - TRANS_TYPE_TCP TransType = iota - TRANS_TYPE_UDP -) - -func (tt TransType) String() string { - switch tt { - case TRANS_TYPE_TCP: - return "TCP" - case TRANS_TYPE_UDP: - return "UDP" - } - return "unknow" -} - -const UDP_BUF_SIZE = 1048576 - -func (session *RTSP) SessionString() string { - return fmt.Sprintf("session[%v][%v][%s][%s][%s]", session.Type, session.TransType, session.StreamPath, session.ID, session.Conn.RemoteAddr().String()) -} - -func (session *RTSP) Stop() { - if session.Stream != nil { - session.Close() - } - if session.Conn != nil { - session.connRW.Flush() - session.Conn.Close() - session.Conn = nil - } - if session.UDPClient != nil { - session.UDPClient.Stop() - session.UDPClient = nil - } - if session.UDPServer != nil { - session.UDPServer.Stop() - session.UDPServer = nil - } -} - -// AcceptPush 接受推流 -func (session *RTSP) AcceptPush() { - defer session.Stop() - buf2 := make([]byte, 2) - timer := time.Unix(0, 0) - for { - buf1, err := session.connRW.ReadByte() - if err != nil { - Println(err) - return - } - if buf1 == 0x24 { //rtp data - if buf1, err = session.connRW.ReadByte(); err != nil { - Println(err) - return - } - if _, err := io.ReadFull(session.connRW, buf2); err != nil { - Println(err) - return - } - channel := int(buf1) - rtpLen := int(binary.BigEndian.Uint16(buf2)) - rtpBytes := make([]byte, rtpLen) - if _, err := io.ReadFull(session.connRW, rtpBytes); err != nil { - Println(err) - return - } - - // t := pack.Timestamp / 90 - switch channel { - case session.aRTPChannel: - // pack.Type = RTP_TYPE_AUDIO - if session.RtpAudio != nil { - elapsed := time.Since(timer) - if elapsed >= 30*time.Second { - Println("Recv an audio RTP package") - timer = time.Now() - } - session.RtpAudio.Push(rtpBytes) - } - case session.aRTPControlChannel: - // pack.Type = RTP_TYPE_AUDIOCONTROL - case session.vRTPChannel: - // pack.Type = RTP_TYPE_VIDEO - if session.RtpVideo != nil { - elapsed := time.Since(timer) - if elapsed >= 30*time.Second { - Println("Recv an video RTP package") - timer = time.Now() - } - session.RtpVideo.Push(rtpBytes) - } - case session.vRTPControlChannel: - // pack.Type = RTP_TYPE_VIDEOCONTROL - default: - // Printf("unknow rtp pack type, %v", pack.Type) - continue - } - session.InBytes += rtpLen + 4 - } else { // rtsp cmd - reqBuf := bytes.NewBuffer(nil) - reqBuf.WriteByte(buf1) - for { - if line, isPrefix, err := session.connRW.ReadLine(); err != nil { - Println(err) - return - } else { - reqBuf.Write(line) - if !isPrefix { - reqBuf.WriteString("\r\n") - } - if len(line) == 0 { - req := NewRequest(reqBuf.String()) - if req == nil { - break - } - session.InBytes += reqBuf.Len() - contentLen := req.GetContentLength() - session.InBytes += contentLen - if contentLen > 0 { - bodyBuf := make([]byte, contentLen) - if n, err := io.ReadFull(session.connRW, bodyBuf); err != nil { - Println(err) - return - } else if n != contentLen { - Printf("read rtsp request body failed, expect size[%d], got size[%d]", contentLen, n) - return - } - req.Body = string(bodyBuf) - } - session.handleRequest(req) - break - } - } - } - } - } -} - -func (session *RTSP) CheckAuth(authLine string, method string) error { - realmRex := regexp.MustCompile(`realm="(.*?)"`) - nonceRex := regexp.MustCompile(`nonce="(.*?)"`) - usernameRex := regexp.MustCompile(`username="(.*?)"`) - responseRex := regexp.MustCompile(`response="(.*?)"`) - uriRex := regexp.MustCompile(`uri="(.*?)"`) - - realm := "" - nonce := "" - username := "" - response := "" - uri := "" - result1 := realmRex.FindStringSubmatch(authLine) - if len(result1) == 2 { - realm = result1[1] - } else { - return fmt.Errorf("CheckAuth error : no realm found") - } - result1 = nonceRex.FindStringSubmatch(authLine) - if len(result1) == 2 { - nonce = result1[1] - } else { - return fmt.Errorf("CheckAuth error : no nonce found") - } - if session.nonce != nonce { - return fmt.Errorf("CheckAuth error : sessionNonce not same as nonce") - } - - result1 = usernameRex.FindStringSubmatch(authLine) - if len(result1) == 2 { - username = result1[1] - } else { - return fmt.Errorf("CheckAuth error : username not found") - } - - result1 = responseRex.FindStringSubmatch(authLine) - if len(result1) == 2 { - response = result1[1] - } else { - return fmt.Errorf("CheckAuth error : response not found") - } - - result1 = uriRex.FindStringSubmatch(authLine) - if len(result1) == 2 { - uri = result1[1] - } else { - return fmt.Errorf("CheckAuth error : uri not found") - } - // var user models.User - // err := db.SQLite.Where("Username = ?", username).First(&user).Error - // if err != nil { - // return fmt.Errorf("CheckAuth error : user not exists") - // } - md5UserRealmPwd := fmt.Sprintf("%x", md5.Sum([]byte(fmt.Sprintf("%s:%s:%s", username, realm, session.Auth(username))))) - md5MethodURL := fmt.Sprintf("%x", md5.Sum([]byte(fmt.Sprintf("%s:%s", method, uri)))) - myResponse := fmt.Sprintf("%x", md5.Sum([]byte(fmt.Sprintf("%s:%s:%s", md5UserRealmPwd, nonce, md5MethodURL)))) - if myResponse != response { - return fmt.Errorf("CheckAuth error : response not equal") - } - return nil -} - -func (session *RTSP) handleRequest(req *Request) { - //if session.Timeout > 0 { - // session.Conn.SetDeadline(time.Now().Add(time.Duration(session.Timeout) * time.Second)) - //} - Printf("<<<\n%s", req) - res := NewResponse(200, "OK", req.Header["CSeq"], session.ID, "") - var streamPath string - defer func() { - if p := recover(); p != nil { - Printf("handleRequest err ocurs:%v", p) - res.StatusCode = 500 - res.Status = fmt.Sprintf("Inner Server Error, %v", p) - } - Printf(">>>\n%s", res) - outBytes := []byte(res.String()) - session.connWLock.Lock() - session.connRW.Write(outBytes) - session.connRW.Flush() - session.connWLock.Unlock() - session.OutBytes += len(outBytes) - switch req.Method { - case "PLAY", "RECORD": - switch session.Type { - case SESSEION_TYPE_PLAYER: - sub := Subscriber{ - ID: session.ID, - Type: "RTSP", - } - if sub.Subscribe(streamPath) == nil { - at, vt := session.UDPClient.AT, session.UDPClient.VT - if vt != nil { - var st uint32 - onVideo := func(ts uint32, pack *VideoPack) { - if session.UDPClient == nil { - return - } - for _, nalu := range pack.NALUs { - for _, pack := range session.UDPClient.VPacketizer.Packetize(nalu, (ts-st)*90) { - p := &RTPPack{ - Type: RTP_TYPE_VIDEO, - } - p.Raw, _ = pack.Marshal() - session.SendRTP(p) - } - } - st = ts - } - sub.OnVideo = func(ts uint32, pack *VideoPack) { - if st = ts; st != 0 { - sub.OnVideo = onVideo - } - onVideo(ts, pack) - } - } - if at != nil { - tb := uint32(at.SoundRate / 1000) - var st uint32 - onAudio := func(ts uint32, pack *AudioPack) { - if session.UDPClient == nil { - return - } - for _, pack := range session.UDPClient.APacketizer.Packetize(pack.Payload, (ts-st)*tb) { - p := &RTPPack{ - Type: RTP_TYPE_VIDEO, - } - p.Raw, _ = pack.Marshal() - session.SendRTP(p) - } - st = ts - } - sub.OnAudio = func(ts uint32, pack *AudioPack) { - if st = ts; st != 0 { - sub.OnAudio = onAudio - } - onAudio(ts, pack) - } - } - go sub.Play(at, vt) - } - // if session.Pusher.HasPlayer(session.Player) { - // session.Player.Pause(false) - // } else { - // session.Pusher.AddPlayer(session.Player) - // } - } - case "TEARDOWN": - { - session.Stop() - return - } - } - if res.StatusCode != 200 && res.StatusCode != 401 { - Printf("Response request error[%d]. stop session.", res.StatusCode) - session.Stop() - } - }() - session.URL = req.URL - _url, err := url.Parse(req.URL) - if err != nil { - res.StatusCode = 500 - res.Status = "Invalid URL" - return - } - streamPath = strings.TrimPrefix(_url.Path, "/") - if req.Method != "OPTIONS" { - if session.Auth != nil { - authLine := req.Header["Authorization"] - authFailed := true - if authLine != "" { - err := session.CheckAuth(authLine, req.Method) - if err == nil { - authFailed = false - } else { - Printf("%v", err) - } - } - if authFailed { - res.StatusCode = 401 - res.Status = "Unauthorized" - nonce := fmt.Sprintf("%x", md5.Sum([]byte(shortid.MustGenerate()))) - session.nonce = nonce - res.Header["WWW-Authenticate"] = fmt.Sprintf(`Digest realm="Monibuca", nonce="%s", algorithm="MD5"`, nonce) - return - } - } - } - switch req.Method { - case "OPTIONS": - res.Header["Public"] = "DESCRIBE, SETUP, TEARDOWN, PLAY, PAUSE, OPTIONS, ANNOUNCE, RECORD" - case "ANNOUNCE": - session.Type = SESSION_TYPE_PUSHER - session.SDPRaw = req.Body - session.SDPMap = ParseSDP(req.Body) - if session.Stream = Publish(streamPath, "RTSP"); session.Stream != nil { - if session.ASdp, session.HasAudio = session.SDPMap["audio"]; session.HasAudio { - session.setAudioTrack() - Printf("audio codec[%s]\n", session.ASdp.Codec) - } - if session.VSdp, session.HasVideo = session.SDPMap["video"]; session.HasVideo { - session.setVideoTrack() - Printf("video codec[%s]\n", session.VSdp.Codec) - } - session.Stream.Type = "RTSP" - } - case "DESCRIBE": - session.Type = SESSEION_TYPE_PLAYER - stream := FindStream(streamPath) - if stream == nil { - res.StatusCode = 404 - res.Status = "No Such Stream:" + streamPath - return - } - sdpInfo := []string{ - "v=0", - fmt.Sprintf("o=%s 0 0 IN IP4 %d", session.ID, 0), - "s=monibuca", - "t=0 0", - "a=recvonly", - } - ssrc := uintptr(unsafe.Pointer(stream)) - if session.UDPClient == nil { - session.UDPClient = &UDPClient{ - Conn: session.Conn.Conn, - } - } - vt, at := stream.WaitVideoTrack(), stream.WaitAudioTrack() - if vt != nil { - session.UDPClient.VT = vt - sdpInfo = append(sdpInfo, "m=video 0 RTP/AVP 96") - switch vt.CodecID { - case 7: - sps := base64.StdEncoding.EncodeToString(vt.ExtraData.NALUs[0]) - pps := base64.StdEncoding.EncodeToString(vt.ExtraData.NALUs[1]) - session.UDPClient.VPacketizer = rtp.NewPacketizer(1200, 96, uint32(ssrc), &codecs.H264Payloader{}, rtp.NewFixedSequencer(1), 90000) - sdpInfo = append(sdpInfo, "a=rtpmap:96 H264/90000", - fmt.Sprintf("a=fmtp:96 profile-level-id=%02X00%02X; packetization-mode=1; sprop-parameter-sets=%s,%s", vt.SPSInfo.ProfileIdc, vt.SPSInfo.LevelIdc*10, sps, pps)) - case 12: - vps := base64.StdEncoding.EncodeToString(vt.ExtraData.NALUs[0]) - sps := base64.StdEncoding.EncodeToString(vt.ExtraData.NALUs[1]) - pps := base64.StdEncoding.EncodeToString(vt.ExtraData.NALUs[2]) - // TODO: - // session.UDPClient.VPacketizer = rtp.NewPacketizer(1200, 96, uint32(ssrc), &codecs.H265Payloader{}, rtp.NewFixedSequencer(1), 90000) - sdpInfo = append(sdpInfo, "a=rtpmap:96 H265/90000", - fmt.Sprintf("a=fmtp:96 packetization-mode=1;sprop-vps=%s;sprop-sps=%s;sprop-pps=%s", vps, sps, pps)) - } - } - if at != nil { - sdpInfo = append(sdpInfo, "m=audio 0 RTP/AVP 97") - switch at.CodecID { - case 7: - sdpInfo = append(sdpInfo, "a=rtpmap:97 PCMA/8000") - session.UDPClient.APacketizer = rtp.NewPacketizer(1200, 97, uint32(ssrc), &codecs.G711Payloader{}, rtp.NewFixedSequencer(1), 8000) - session.UDPClient.AT = at - case 8: - sdpInfo = append(sdpInfo, "a=rtpmap:97 PCMU/8000") - session.UDPClient.APacketizer = rtp.NewPacketizer(1200, 97, uint32(ssrc), &codecs.G711Payloader{}, rtp.NewFixedSequencer(1), 8000) - session.UDPClient.AT = at - case 10: - // TODO: - sdpInfo = append(sdpInfo, fmt.Sprintf("a=rtpmap:97 MPEG4-GENERIC/%d/%d", at.SoundRate, at.Channels)) - session.UDPClient.APacketizer = rtp.NewPacketizer(1200, 97, uint32(ssrc), &AACPayloader{}, rtp.NewFixedSequencer(1), uint32(at.SoundRate)) - session.UDPClient.AT = at - } - } - session.SDPRaw = strings.Join(sdpInfo, "\r\n") + "\r\n" - res.SetBody(session.SDPRaw) - case "SETUP": - ts := req.Header["Transport"] - // control字段可能是`stream=1`字样,也可能是rtsp://...字样。即control可能是url的path,也可能是整个url - // 例1: - // a=control:streamid=1 - // 例2: - // a=control:rtsp://192.168.1.64/trackID=1 - // 例3: - // a=control:?ctype=video - if _url.Port() == "" { - _url.Host = fmt.Sprintf("%s:554", _url.Host) - } - setupPath := _url.String() - - // error status. SETUP without ANNOUNCE or DESCRIBE. - //if session.Pusher == nil { - // res.StatusCode = 500 - // res.Status = "Error Status" - // return - //} - var vPath, aPath string - if session.HasVideo { - if strings.Index(strings.ToLower(session.VSdp.Control), "rtsp://") == 0 { - vControlUrl, err := url.Parse(session.VSdp.Control) - if err != nil { - res.StatusCode = 500 - res.Status = "Invalid VControl" - return - } - if vControlUrl.Port() == "" { - vControlUrl.Host = fmt.Sprintf("%s:554", vControlUrl.Host) - } - vPath = vControlUrl.String() - } else { - vPath = session.VSdp.Control - } - } - if session.HasAudio { - if strings.Index(strings.ToLower(session.ASdp.Control), "rtsp://") == 0 { - aControlUrl, err := url.Parse(session.ASdp.Control) - if err != nil { - res.StatusCode = 500 - res.Status = "Invalid AControl" - return - } - if aControlUrl.Port() == "" { - aControlUrl.Host = fmt.Sprintf("%s:554", aControlUrl.Host) - } - aPath = aControlUrl.String() - } else { - aPath = session.ASdp.Control - } - } - - mtcp := regexp.MustCompile("interleaved=(\\d+)(-(\\d+))?") - mudp := regexp.MustCompile("client_port=(\\d+)(-(\\d+))?") - - if tcpMatchs := mtcp.FindStringSubmatch(ts); tcpMatchs != nil { - session.TransType = TRANS_TYPE_TCP - if setupPath == aPath || aPath != "" && strings.LastIndex(setupPath, aPath) == len(setupPath)-len(aPath) { - session.aRTPChannel, _ = strconv.Atoi(tcpMatchs[1]) - session.aRTPControlChannel, _ = strconv.Atoi(tcpMatchs[3]) - } else if setupPath == vPath || vPath != "" && strings.LastIndex(setupPath, vPath) == len(setupPath)-len(vPath) { - session.vRTPChannel, _ = strconv.Atoi(tcpMatchs[1]) - session.vRTPControlChannel, _ = strconv.Atoi(tcpMatchs[3]) - } else { - res.StatusCode = 500 - res.Status = fmt.Sprintf("SETUP [TCP] got UnKown control:%s", setupPath) - Printf("SETUP [TCP] got UnKown control:%s", setupPath) - } - Printf("Parse SETUP req.TRANSPORT:TCP.Session.Type:%d,control:%s, AControl:%s,VControl:%s", session.Type, setupPath, aPath, vPath) - } else if udpMatchs := mudp.FindStringSubmatch(ts); udpMatchs != nil { - session.TransType = TRANS_TYPE_UDP - // no need for tcp timeout. - session.Conn.timeout = 0 - if session.Type == SESSEION_TYPE_PLAYER && session.UDPClient == nil { - session.UDPClient = &UDPClient{} - } - if session.Type == SESSION_TYPE_PUSHER && session.UDPServer == nil { - session.UDPServer = &UDPServer{ - Session: session, - } - } - Printf("Parse SETUP req.TRANSPORT:UDP.Session.Type:%d,control:%s, AControl:%s,VControl:%s", session.Type, setupPath, aPath, vPath) - if setupPath == aPath || aPath != "" && strings.LastIndex(setupPath, aPath) == len(setupPath)-len(aPath) { - if session.Type == SESSEION_TYPE_PLAYER { - session.UDPClient.APort, _ = strconv.Atoi(udpMatchs[1]) - session.UDPClient.AControlPort, _ = strconv.Atoi(udpMatchs[3]) - if err := session.UDPClient.SetupAudio(); err != nil { - res.StatusCode = 500 - res.Status = fmt.Sprintf("udp client setup audio error, %v", err) - return - } - } - if session.Type == SESSION_TYPE_PUSHER { - if err := session.UDPServer.SetupAudio(); err != nil { - res.StatusCode = 500 - res.Status = fmt.Sprintf("udp server setup audio error, %v", err) - return - } - tss := strings.Split(ts, ";") - idx := -1 - for i, val := range tss { - if val == udpMatchs[0] { - idx = i - } - } - tail := append([]string{}, tss[idx+1:]...) - tss = append(tss[:idx+1], fmt.Sprintf("server_port=%d-%d", session.UDPServer.APort, session.UDPServer.AControlPort)) - tss = append(tss, tail...) - ts = strings.Join(tss, ";") - } - } else if setupPath == vPath || vPath != "" && strings.LastIndex(setupPath, vPath) == len(setupPath)-len(vPath) { - if session.Type == SESSEION_TYPE_PLAYER { - session.UDPClient.VPort, _ = strconv.Atoi(udpMatchs[1]) - session.UDPClient.VControlPort, _ = strconv.Atoi(udpMatchs[3]) - if err := session.UDPClient.SetupVideo(); err != nil { - res.StatusCode = 500 - res.Status = fmt.Sprintf("udp client setup video error, %v", err) - return - } - } - - if session.Type == SESSION_TYPE_PUSHER { - if err := session.UDPServer.SetupVideo(); err != nil { - res.StatusCode = 500 - res.Status = fmt.Sprintf("udp server setup video error, %v", err) - return - } - tss := strings.Split(ts, ";") - idx := -1 - for i, val := range tss { - if val == udpMatchs[0] { - idx = i - } - } - tail := append([]string{}, tss[idx+1:]...) - tss = append(tss[:idx+1], fmt.Sprintf("server_port=%d-%d", session.UDPServer.VPort, session.UDPServer.VControlPort)) - tss = append(tss, tail...) - ts = strings.Join(tss, ";") - } - } else { - if session.Type == SESSEION_TYPE_PLAYER { - if session.UDPClient.VPort == 0 { - session.UDPClient.VPort, _ = strconv.Atoi(udpMatchs[1]) - session.UDPClient.VControlPort, _ = strconv.Atoi(udpMatchs[3]) - if err := session.UDPClient.SetupVideo(); err != nil { - res.StatusCode = 500 - res.Status = fmt.Sprintf("udp client setup video error, %v", err) - return - } - } else { - session.UDPClient.APort, _ = strconv.Atoi(udpMatchs[1]) - session.UDPClient.AControlPort, _ = strconv.Atoi(udpMatchs[3]) - if err := session.UDPClient.SetupAudio(); err != nil { - res.StatusCode = 500 - res.Status = fmt.Sprintf("udp client setup audio error, %v", err) - return - } - } - } - Printf("SETUP [UDP] got UnKown control:%s", setupPath) - } - } - res.Header["Transport"] = ts - case "PLAY": - // error status. PLAY without ANNOUNCE or DESCRIBE. - // if session.Pusher == nil { - // res.StatusCode = 500 - // res.Status = "Error Status" - // return - // } - res.Header["Range"] = req.Header["Range"] - case "RECORD": - // error status. RECORD without ANNOUNCE or DESCRIBE. - // if session.Pusher == nil { - // res.StatusCode = 500 - // res.Status = "Error Status" - // return - // } - case "PAUSE": - // if session.Player == nil { - // res.StatusCode = 500 - // res.Status = "Error Status" - // return - // } - // session.Player.Pause(true) - } -} - -func (session *RTSP) SendRTP(pack *RTPPack) (err error) { - if pack == nil { - err = fmt.Errorf("player send rtp got nil pack") - return - } - if session.TransType == TRANS_TYPE_UDP { - if session.UDPClient == nil { - err = fmt.Errorf("player use udp transport but udp client not found") - return - } - err = session.UDPClient.SendRTP(pack) - session.OutBytes += len(pack.Raw) - return - } - switch pack.Type { - case RTP_TYPE_AUDIO: - bufChannel := make([]byte, 2) - bufChannel[0] = 0x24 - bufChannel[1] = byte(session.aRTPChannel) - session.connWLock.Lock() - session.connRW.Write(bufChannel) - bufLen := make([]byte, 2) - binary.BigEndian.PutUint16(bufLen, uint16(len(pack.Raw))) - session.connRW.Write(bufLen) - session.connRW.Write(pack.Raw) - session.connRW.Flush() - session.connWLock.Unlock() - session.OutBytes += len(pack.Raw) + 4 - case RTP_TYPE_AUDIOCONTROL: - bufChannel := make([]byte, 2) - bufChannel[0] = 0x24 - bufChannel[1] = byte(session.aRTPControlChannel) - session.connWLock.Lock() - session.connRW.Write(bufChannel) - bufLen := make([]byte, 2) - binary.BigEndian.PutUint16(bufLen, uint16(len(pack.Raw))) - session.connRW.Write(bufLen) - session.connRW.Write(pack.Raw) - session.connRW.Flush() - session.connWLock.Unlock() - session.OutBytes += len(pack.Raw) + 4 - case RTP_TYPE_VIDEO: - bufChannel := make([]byte, 2) - bufChannel[0] = 0x24 - bufChannel[1] = byte(session.vRTPChannel) - session.connWLock.Lock() - session.connRW.Write(bufChannel) - bufLen := make([]byte, 2) - binary.BigEndian.PutUint16(bufLen, uint16(len(pack.Raw))) - session.connRW.Write(bufLen) - session.connRW.Write(pack.Raw) - session.connRW.Flush() - session.connWLock.Unlock() - session.OutBytes += len(pack.Raw) + 4 - case RTP_TYPE_VIDEOCONTROL: - bufChannel := make([]byte, 2) - bufChannel[0] = 0x24 - bufChannel[1] = byte(session.vRTPControlChannel) - session.connWLock.Lock() - session.connRW.Write(bufChannel) - bufLen := make([]byte, 2) - binary.BigEndian.PutUint16(bufLen, uint16(len(pack.Raw))) - session.connRW.Write(bufLen) - session.connRW.Write(pack.Raw) - session.connRW.Flush() - session.connWLock.Unlock() - session.OutBytes += len(pack.Raw) + 4 - default: - err = fmt.Errorf("session tcp send rtp got unkown pack type[%v]", pack.Type) - } - return -} diff --git a/track.go b/track.go new file mode 100644 index 0000000..e41aee8 --- /dev/null +++ b/track.go @@ -0,0 +1,87 @@ +package rtsp + +import ( + "encoding/base64" + "fmt" + "strconv" + + "github.com/aler9/gortsplib" + psdp "github.com/pion/sdp/v3" +) + +// func NewTrackAAC(payloadType uint8, conf *gortsplib.TrackConfigAAC) (*gortsplib.Track, error) { +// mpegConf, err := aac.MPEG4AudioConfig{ +// Type: aac.MPEG4AudioType(conf.Type), +// SampleRate: conf.SampleRate, +// ChannelCount: conf.ChannelCount, +// AOTSpecificConfig: conf.AOTSpecificConfig, +// }.Encode() +// if err != nil { +// return nil, err +// } + +// typ := strconv.FormatInt(int64(payloadType), 10) + +// return &gortsplib.Track{ +// Media: &psdp.MediaDescription{ +// MediaName: psdp.MediaName{ +// Media: "audio", +// Protos: []string{"RTP", "AVP"}, +// Formats: []string{typ}, +// }, +// Attributes: []psdp.Attribute{ +// { +// Key: "rtpmap", +// Value: typ + " mpeg4-generic/" + strconv.FormatInt(int64(conf.SampleRate), 10) + +// "/" + strconv.FormatInt(int64(conf.ChannelCount), 10), +// }, +// { +// Key: "fmtp", +// Value: typ + " profile-level-id=1; " + +// "mode=AAC-hbr; " + +// "sizelength=6; " + +// "indexlength=2; " + +// "indexdeltalength=2; " + +// "config=" + hex.EncodeToString(mpegConf), +// }, +// }, +// }, +// }, nil +// } +func NewG711Track(payloadType uint8, law string) *gortsplib.Track { + return &gortsplib.Track{ + Media: &psdp.MediaDescription{ + MediaName: psdp.MediaName{ + Media: "audio", + Protos: []string{"RTP", "AVP"}, + Formats: []string{strconv.FormatInt(int64(payloadType), 10)}}, + Attributes: []psdp.Attribute{ + { + Key: "rtpmap", + Value: fmt.Sprintf("%d %s/8000/1", payloadType, law), + }, + }, + }, + } +} +func NewH265Track(payloadType uint8, sprop [][]byte) *gortsplib.Track { + return &gortsplib.Track{ + Media: &psdp.MediaDescription{ + MediaName: psdp.MediaName{ + Media: "video", + Protos: []string{"RTP", "AVP"}, + Formats: []string{fmt.Sprintf("%d", payloadType)}, + }, + Attributes: []psdp.Attribute{ + { + Key: "rtpmap", + Value: fmt.Sprintf("%d H265/90000", payloadType), + }, + { + Key: "fmtp", + Value: fmt.Sprintf("%d packetization-mode=1;sprop-vps=%s;sprop-sps=%s;sprop-pps=%s;", payloadType, base64.StdEncoding.EncodeToString(sprop[0]), base64.StdEncoding.EncodeToString(sprop[1]), base64.StdEncoding.EncodeToString(sprop[2])), + }, + }, + }, + } +} diff --git a/udp-client.go b/udp-client.go deleted file mode 100644 index e917429..0000000 --- a/udp-client.go +++ /dev/null @@ -1,167 +0,0 @@ -package rtsp - -import ( - "fmt" - "net" - "strings" - - . "github.com/Monibuca/engine/v3" - . "github.com/Monibuca/utils/v3" - "github.com/pion/rtp" -) - -type UDPClient struct { - Conn net.Conn - APort int - AConn *net.UDPConn - AControlPort int - AControlConn *net.UDPConn - VPort int - VConn *net.UDPConn - VControlPort int - VControlConn *net.UDPConn - AT *AudioTrack - APacketizer rtp.Packetizer - VT *VideoTrack - VPacketizer rtp.Packetizer - Stoped bool -} - -func (s *UDPClient) Stop() { - if s.Stoped { - return - } - s.Stoped = true - if s.AConn != nil { - s.AConn.Close() - s.AConn = nil - } - if s.AControlConn != nil { - s.AControlConn.Close() - s.AControlConn = nil - } - if s.VConn != nil { - s.VConn.Close() - s.VConn = nil - } - if s.VControlConn != nil { - s.VControlConn.Close() - s.VControlConn = nil - } -} - -func (c *UDPClient) SetupAudio() (err error) { - defer func() { - if err != nil { - Println(err) - c.Stop() - } - }() - host := c.Conn.RemoteAddr().String() - host = host[:strings.LastIndex(host, ":")] - addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", host, c.APort)) - if err != nil { - return - } - c.AConn, err = net.DialUDP("udp", nil, addr) - if err != nil { - return - } - networkBuffer := 1048576 - if err := c.AConn.SetReadBuffer(networkBuffer); err != nil { - Printf("udp client audio conn set read buffer error, %v", err) - } - if err := c.AConn.SetWriteBuffer(networkBuffer); err != nil { - Printf("udp client audio conn set write buffer error, %v", err) - } - - addr, err = net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", host, c.AControlPort)) - if err != nil { - return - } - c.AControlConn, err = net.DialUDP("udp", nil, addr) - if err != nil { - return - } - if err := c.AControlConn.SetReadBuffer(networkBuffer); err != nil { - Printf("udp client audio control conn set read buffer error, %v", err) - } - if err := c.AControlConn.SetWriteBuffer(networkBuffer); err != nil { - Printf("udp client audio control conn set write buffer error, %v", err) - } - return -} - -func (c *UDPClient) SetupVideo() (err error) { - defer func() { - if err != nil { - Println(err) - c.Stop() - } - }() - host := c.Conn.RemoteAddr().String() - host = host[:strings.LastIndex(host, ":")] - addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", host, c.VPort)) - if err != nil { - return - } - c.VConn, err = net.DialUDP("udp", nil, addr) - if err != nil { - return - } - networkBuffer := 1048576 - if err := c.VConn.SetReadBuffer(networkBuffer); err != nil { - Printf("udp client video conn set read buffer error, %v", err) - } - if err := c.VConn.SetWriteBuffer(networkBuffer); err != nil { - Printf("udp client video conn set write buffer error, %v", err) - } - - addr, err = net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", host, c.VControlPort)) - if err != nil { - return - } - c.VControlConn, err = net.DialUDP("udp", nil, addr) - if err != nil { - return - } - if err := c.VControlConn.SetReadBuffer(networkBuffer); err != nil { - Printf("udp client video control conn set read buffer error, %v", err) - } - if err := c.VControlConn.SetWriteBuffer(networkBuffer); err != nil { - Printf("udp client video control conn set write buffer error, %v", err) - } - return -} - -func (c *UDPClient) SendRTP(pack *RTPPack) (err error) { - if pack == nil { - err = fmt.Errorf("udp client send rtp got nil pack") - return - } - var conn *net.UDPConn - switch pack.Type { - case RTP_TYPE_AUDIO: - conn = c.AConn - case RTP_TYPE_AUDIOCONTROL: - conn = c.AControlConn - case RTP_TYPE_VIDEO: - conn = c.VConn - case RTP_TYPE_VIDEOCONTROL: - conn = c.VControlConn - default: - err = fmt.Errorf("udp client send rtp got unkown pack type[%v]", pack.Type) - return - } - if conn == nil { - err = fmt.Errorf("udp client send rtp pack type[%v] failed, conn not found", pack.Type) - return - } - - if _, err = conn.Write(pack.Raw); err != nil { - err = fmt.Errorf("udp client write bytes error, %v", err) - return - } - // Printf("udp client write [%d/%d]", n, pack.Buffer.Len()) - return -} diff --git a/udp-server.go b/udp-server.go deleted file mode 100644 index bdc8561..0000000 --- a/udp-server.go +++ /dev/null @@ -1,219 +0,0 @@ -package rtsp - -import ( - "fmt" - "net" - "strconv" - "strings" - "sync" - - . "github.com/Monibuca/utils/v3" -) - -type UDPServer struct { - Session *RTSP - UDPClient - sync.Mutex -} - -func (s *UDPServer) AddInputBytes(bytes int) { - if s.Session != nil { - s.Session.InBytes += bytes - return - } - panic(fmt.Errorf("session and RTSPClient both nil")) -} - -func (s *UDPServer) Stop() { - if s.Stoped { - return - } - s.Stoped = true - if s.AConn != nil { - s.AConn.Close() - s.AConn = nil - } - if s.AControlConn != nil { - s.AControlConn.Close() - s.AControlConn = nil - } - if s.VConn != nil { - s.VConn.Close() - s.VConn = nil - } - if s.VControlConn != nil { - s.VControlConn.Close() - s.VControlConn = nil - } -} - -func (s *UDPServer) SetupAudio() (err error) { - addr, err := net.ResolveUDPAddr("udp", ":0") - if err != nil { - return - } - s.AConn, err = net.ListenUDP("udp", addr) - if err != nil { - return - } - networkBuffer := 1048576 - if err := s.AConn.SetReadBuffer(networkBuffer); err != nil { - Printf("udp server audio conn set read buffer error, %v", err) - } - if err := s.AConn.SetWriteBuffer(networkBuffer); err != nil { - Printf("udp server audio conn set write buffer error, %v", err) - } - la := s.AConn.LocalAddr().String() - strPort := la[strings.LastIndex(la, ":")+1:] - s.APort, err = strconv.Atoi(strPort) - if err != nil { - return - } - go func() { - bufUDP := make([]byte, UDP_BUF_SIZE) - Printf("udp server start listen audio port[%d]", s.APort) - defer Printf("udp server stop listen audio port[%d]", s.APort) - // timer := time.Unix(0, 0) - for !s.Stoped { - if n, _, err := s.AConn.ReadFromUDP(bufUDP); err == nil { - // elapsed := time.Now().Sub(timer) - // if elapsed >= 30*time.Second { - // Printf("Package recv from AConn.len:%d\n", n) - // timer = time.Now() - // } - s.AddInputBytes(n) - var bytes []byte - s.Session.RtpAudio.Push(append(bytes, bufUDP[:n]...)) - } else { - Println("udp server read audio pack error", err) - continue - } - } - }() - addr, err = net.ResolveUDPAddr("udp", ":0") - if err != nil { - return - } - s.AControlConn, err = net.ListenUDP("udp", addr) - if err != nil { - return - } - if err := s.AControlConn.SetReadBuffer(networkBuffer); err != nil { - Printf("udp server audio control conn set read buffer error, %v", err) - } - if err := s.AControlConn.SetWriteBuffer(networkBuffer); err != nil { - Printf("udp server audio control conn set write buffer error, %v", err) - } - la = s.AControlConn.LocalAddr().String() - strPort = la[strings.LastIndex(la, ":")+1:] - s.AControlPort, err = strconv.Atoi(strPort) - if err != nil { - return - } - go func() { - bufUDP := make([]byte, UDP_BUF_SIZE) - Printf("udp server start listen audio control port[%d]", s.AControlPort) - defer Printf("udp server stop listen audio control port[%d]", s.AControlPort) - for !s.Stoped { - if n, _, err := s.AControlConn.ReadFromUDP(bufUDP); err == nil { - //Printf("Package recv from AControlConn.len:%d\n", n) - s.AddInputBytes(n) - // pack := RTPPack{ - // Type: RTP_TYPE_AUDIOCONTROL, - // } - // pack.Unmarshal(bufUDP[:n]) - // s.HandleRTP(pack) - } else { - Println("udp server read audio control pack error", err) - continue - } - } - }() - return -} - -func (s *UDPServer) SetupVideo() (err error) { - addr, err := net.ResolveUDPAddr("udp", ":0") - if err != nil { - return - } - s.VConn, err = net.ListenUDP("udp", addr) - if err != nil { - return - } - networkBuffer := 1048576 - if err := s.VConn.SetReadBuffer(networkBuffer); err != nil { - Printf("udp server video conn set read buffer error, %v", err) - } - if err := s.VConn.SetWriteBuffer(networkBuffer); err != nil { - Printf("udp server video conn set write buffer error, %v", err) - } - la := s.VConn.LocalAddr().String() - strPort := la[strings.LastIndex(la, ":")+1:] - s.VPort, err = strconv.Atoi(strPort) - if err != nil { - return - } - go func() { - bufUDP := make([]byte, UDP_BUF_SIZE) - Printf("udp server start listen video port[%d]", s.VPort) - defer Printf("udp server stop listen video port[%d]", s.VPort) - // timer := time.Unix(0, 0) - for !s.Stoped { - if n, _, err := s.VConn.ReadFromUDP(bufUDP); err == nil { - // elapsed := time.Now().Sub(timer) - // if elapsed >= 30*time.Second { - // Printf("Package recv from VConn.len:%d\n", n) - // timer = time.Now() - // } - s.AddInputBytes(n) - var bytes []byte - s.Session.RtpVideo.Push(append(bytes, bufUDP[:n]...)) - } else { - Println("udp server read video pack error", err) - continue - } - } - }() - - addr, err = net.ResolveUDPAddr("udp", ":0") - if err != nil { - return - } - s.VControlConn, err = net.ListenUDP("udp", addr) - if err != nil { - return - } - if err := s.VControlConn.SetReadBuffer(networkBuffer); err != nil { - Printf("udp server video control conn set read buffer error, %v", err) - } - if err := s.VControlConn.SetWriteBuffer(networkBuffer); err != nil { - Printf("udp server video control conn set write buffer error, %v", err) - } - la = s.VControlConn.LocalAddr().String() - strPort = la[strings.LastIndex(la, ":")+1:] - s.VControlPort, err = strconv.Atoi(strPort) - if err != nil { - return - } - go func() { - bufUDP := make([]byte, UDP_BUF_SIZE) - Printf("udp server start listen video control port[%d]", s.VControlPort) - defer Printf("udp server stop listen video control port[%d]", s.VControlPort) - for !s.Stoped { - if n, _, err := s.VControlConn.ReadFromUDP(bufUDP); err == nil { - //Printf("Package recv from VControlConn.len:%d\n", n) - s.AddInputBytes(n) - // pack := RTPPack{ - // Type: RTP_TYPE_VIDEOCONTROL, - // } - // pack.Unmarshal(bufUDP[:n]) - // s.HandleRTP(pack) - } else { - Println("udp server read video control pack error", err) - continue - } - } - }() - return -}