diff --git a/go.mod b/go.mod index 7df4fce..51fa8cd 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/Monibuca/plugin-rtsp/v3 go 1.16 require ( - github.com/Monibuca/engine/v3 v3.1.4 + github.com/Monibuca/engine/v3 v3.2.0 github.com/Monibuca/utils/v3 v3.0.0 github.com/pion/rtp v1.6.5 github.com/teris-io/shortid v0.0.0-20201117134242-e59966efd125 diff --git a/go.sum b/go.sum index 0716ccc..b881c10 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,7 @@ github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= -github.com/Monibuca/engine/v3 v3.1.4 h1:1IuBIzCegBdwqHNKb6jD0IKtUU5P/uAd3G6fnCKfNac= -github.com/Monibuca/engine/v3 v3.1.4/go.mod h1:yz6cssED2VlYu+g/LrxseBB9pcvsLM/o2QXa4gVY650= +github.com/Monibuca/engine/v3 v3.2.0 h1:gAaw/5NFKvC1w7e1xP4IddP5gdC7Puz75hwwoZmzEeE= +github.com/Monibuca/engine/v3 v3.2.0/go.mod h1:yz6cssED2VlYu+g/LrxseBB9pcvsLM/o2QXa4gVY650= github.com/Monibuca/utils/v3 v3.0.0 h1:i8qCXQPQpRPgjuXKu5C2PYiL5LYzB6GW4xE162mB2ug= github.com/Monibuca/utils/v3 v3.0.0/go.mod h1:RpNS95gapWs6gimwh8Xn2x72FN5tO7Powabj7dTFyvE= github.com/cnotch/apirouter v0.0.0-20200731232942-89e243a791f3/go.mod h1:5deJPLON/x/s2dLOQfuKS0lenhOIT4xX0pvtN/OEIuY= diff --git a/session.go b/session.go index 1096e77..d978bee 100644 --- a/session.go +++ b/session.go @@ -3,6 +3,7 @@ package rtsp import ( "bytes" "crypto/md5" + "encoding/base64" "encoding/binary" "fmt" "io" @@ -11,10 +12,12 @@ import ( "strconv" "strings" "time" + "unsafe" . "github.com/Monibuca/engine/v3" . "github.com/Monibuca/utils/v3" "github.com/pion/rtp" + "github.com/pion/rtp/codecs" "github.com/teris-io/shortid" ) @@ -72,6 +75,10 @@ func (session *RTSP) SessionString() string { } func (session *RTSP) Stop() { + if session.Stream != nil { + session.Close() + collection.Delete(session.StreamPath) + } if session.Conn != nil { session.connRW.Flush() session.Conn.Close() @@ -85,10 +92,6 @@ func (session *RTSP) Stop() { session.UDPServer.Stop() session.UDPServer = nil } - session.Close() - if session.Stream != nil { - collection.Delete(session.StreamPath) - } } // AcceptPush 接受推流 @@ -254,6 +257,7 @@ func (session *RTSP) handleRequest(req *Request) { //} Printf("<<<\n%s", req) res := NewResponse(200, "OK", req.Header["CSeq"], session.ID, "") + var streamPath string defer func() { if p := recover(); p != nil { Printf("handleRequest err ocurs:%v", p) @@ -271,6 +275,63 @@ func (session *RTSP) handleRequest(req *Request) { case "PLAY", "RECORD": switch session.Type { case SESSEION_TYPE_PLAYER: + sub := Subscriber{ + ID: session.ID, + Type: "RTSP", + } + if sub.Subscribe(streamPath) == nil { + at, vt := session.UDPClient.AT, session.UDPClient.VT + if vt != nil { + var st uint32 + onVideo := func(pack VideoPack) { + if session.UDPClient == nil { + return + } + for _, nalu := range pack.NALUs { + for _, pack := range session.UDPClient.VPacketizer.Packetize(nalu, (pack.Timestamp-st)*90) { + p := &RTPPack{ + Type: RTP_TYPE_VIDEO, + Packet: *pack, + } + p.Raw, _ = p.Marshal() + session.SendRTP(p) + } + } + st = pack.Timestamp + } + sub.OnVideo = func(pack VideoPack) { + if st = pack.Timestamp; st != 0 { + sub.OnVideo = onVideo + } + onVideo(pack) + } + } + if at != nil { + tb := uint32(at.SoundRate / 1000) + var st uint32 + onAudio := func(pack AudioPack) { + if session.UDPClient == nil { + return + } + for _, pack := range session.UDPClient.APacketizer.Packetize(pack.Payload, (pack.Timestamp-st)*tb) { + p := &RTPPack{ + Type: RTP_TYPE_VIDEO, + Packet: *pack, + } + p.Raw, _ = p.Marshal() + session.SendRTP(p) + } + st = pack.Timestamp + } + sub.OnAudio = func(pack AudioPack) { + if st = pack.Timestamp; st != 0 { + sub.OnAudio = onAudio + } + onAudio(pack) + } + } + go sub.Play(at, vt) + } // if session.Pusher.HasPlayer(session.Player) { // session.Player.Pause(false) // } else { @@ -288,6 +349,14 @@ func (session *RTSP) handleRequest(req *Request) { session.Stop() } }() + session.URL = req.URL + _url, err := url.Parse(req.URL) + if err != nil { + res.StatusCode = 500 + res.Status = "Invalid URL" + return + } + streamPath = strings.TrimPrefix(_url.Path, "/") if req.Method != "OPTIONS" { if session.Auth != nil { authLine := req.Header["Authorization"] @@ -305,7 +374,7 @@ func (session *RTSP) handleRequest(req *Request) { res.Status = "Unauthorized" nonce := fmt.Sprintf("%x", md5.Sum([]byte(shortid.MustGenerate()))) session.nonce = nonce - res.Header["WWW-Authenticate"] = fmt.Sprintf(`Digest realm="EasyDarwin", nonce="%s", algorithm="MD5"`, nonce) + res.Header["WWW-Authenticate"] = fmt.Sprintf(`Digest realm="Monibuca", nonce="%s", algorithm="MD5"`, nonce) return } } @@ -315,24 +384,9 @@ func (session *RTSP) handleRequest(req *Request) { res.Header["Public"] = "DESCRIBE, SETUP, TEARDOWN, PLAY, PAUSE, OPTIONS, ANNOUNCE, RECORD" case "ANNOUNCE": session.Type = SESSION_TYPE_PUSHER - session.URL = req.URL - - url, err := url.Parse(req.URL) - if err != nil { - res.StatusCode = 500 - res.Status = "Invalid URL" - return - } - streamPath := strings.TrimPrefix(url.Path, "/") - session.SDPRaw = req.Body session.SDPMap = ParseSDP(req.Body) - stream := &Stream{ - StreamPath: streamPath, - Type: "RTSP", - } - session.Stream = stream - if session.Publish() { + if session.Stream = Publish(streamPath, "RTSP"); session.Stream != nil { if session.ASdp, session.HasAudio = session.SDPMap["audio"]; session.HasAudio { session.setAudioTrack() Printf("audio codec[%s]\n", session.ASdp.Codec) @@ -346,20 +400,64 @@ func (session *RTSP) handleRequest(req *Request) { } case "DESCRIBE": session.Type = SESSEION_TYPE_PLAYER - session.URL = req.URL - url, err := url.Parse(req.URL) - if err != nil { - res.StatusCode = 500 - res.Status = "Invalid URL" - return - } - streamPath := url.Path stream := FindStream(streamPath) if stream == nil { + res.StatusCode = 404 + res.Status = "No Such Stream:" + streamPath return } - // - //res.SetBody(session.SDPRaw) + sdpInfo := []string{ + "v=0", + fmt.Sprintf("o=%s 0 0 IN IP4 %d", session.ID, 0), + "s=monibuca", + "t=0 0", + "a=recvonly", + } + ssrc := uintptr(unsafe.Pointer(stream)) + if session.UDPClient == nil { + session.UDPClient = &UDPClient{ + Conn: session.Conn.Conn, + } + } + vt, at := stream.WaitVideoTrack(), stream.WaitAudioTrack() + if vt != nil { + session.UDPClient.VT = vt + sdpInfo = append(sdpInfo, "m=video 0 RTP/AVP 96") + switch vt.CodecID { + case 7: + sps := base64.StdEncoding.EncodeToString(vt.ExtraData.NALUs[0]) + pps := base64.StdEncoding.EncodeToString(vt.ExtraData.NALUs[1]) + session.UDPClient.VPacketizer = rtp.NewPacketizer(1200, 96, uint32(ssrc), &codecs.H264Payloader{}, rtp.NewFixedSequencer(1), 90000) + sdpInfo = append(sdpInfo, "a=rtpmap:96 H264/90000", + fmt.Sprintf("a=fmtp:96 profile-level-id=%02X00%02X; packetization-mode=1; sprop-parameter-sets=%s,%s", vt.SPSInfo.ProfileIdc, vt.SPSInfo.LevelIdc*10, sps, pps)) + case 12: + vps := base64.StdEncoding.EncodeToString(vt.ExtraData.NALUs[0]) + sps := base64.StdEncoding.EncodeToString(vt.ExtraData.NALUs[1]) + pps := base64.StdEncoding.EncodeToString(vt.ExtraData.NALUs[2]) + // TODO: + // session.UDPClient.VPacketizer = rtp.NewPacketizer(1200, 96, uint32(ssrc), &codecs.H265Payloader{}, rtp.NewFixedSequencer(1), 90000) + sdpInfo = append(sdpInfo, "a=rtpmap:96 H265/90000", + fmt.Sprintf("a=fmtp:96 packetization-mode=1;sprop-vps=%s;sprop-sps=%s;sprop-pps=%s", vps, sps, pps)) + } + } + if at != nil { + sdpInfo = append(sdpInfo, "m=audio 0 RTP/AVP 97") + switch at.CodecID { + case 7: + sdpInfo = append(sdpInfo, "a=rtpmap:97 PCMA/8000") + session.UDPClient.APacketizer = rtp.NewPacketizer(1200, 97, uint32(ssrc), &codecs.G711Payloader{}, rtp.NewFixedSequencer(1), 8000) + session.UDPClient.AT = at + case 8: + sdpInfo = append(sdpInfo, "a=rtpmap:97 PCMU/8000") + session.UDPClient.APacketizer = rtp.NewPacketizer(1200, 97, uint32(ssrc), &codecs.G711Payloader{}, rtp.NewFixedSequencer(1), 8000) + session.UDPClient.AT = at + case 10: + // TODO: + sdpInfo = append(sdpInfo, fmt.Sprintf("a=rtpmap:97 MPEG4-GENERIC/%d/%d", at.SoundRate, at.Channels)) + } + } + session.SDPRaw = strings.Join(sdpInfo, "\r\n") + "\r\n" + res.SetBody(session.SDPRaw) case "SETUP": ts := req.Header["Transport"] // control字段可能是`stream=1`字样,也可能是rtsp://...字样。即control可能是url的path,也可能是整个url @@ -369,16 +467,10 @@ func (session *RTSP) handleRequest(req *Request) { // a=control:rtsp://192.168.1.64/trackID=1 // 例3: // a=control:?ctype=video - setupUrl, err := url.Parse(req.URL) - if err != nil { - res.StatusCode = 500 - res.Status = "Invalid URL" - return + if _url.Port() == "" { + _url.Host = fmt.Sprintf("%s:554", _url.Host) } - if setupUrl.Port() == "" { - setupUrl.Host = fmt.Sprintf("%s:554", setupUrl.Host) - } - setupPath := setupUrl.String() + setupPath := _url.String() // error status. SETUP without ANNOUNCE or DESCRIBE. //if session.Pusher == nil { @@ -508,6 +600,25 @@ func (session *RTSP) handleRequest(req *Request) { ts = strings.Join(tss, ";") } } else { + if session.Type == SESSEION_TYPE_PLAYER { + if session.UDPClient.VPort == 0 { + session.UDPClient.VPort, _ = strconv.Atoi(udpMatchs[1]) + session.UDPClient.VControlPort, _ = strconv.Atoi(udpMatchs[3]) + if err := session.UDPClient.SetupVideo(); err != nil { + res.StatusCode = 500 + res.Status = fmt.Sprintf("udp client setup video error, %v", err) + return + } + } else { + session.UDPClient.APort, _ = strconv.Atoi(udpMatchs[1]) + session.UDPClient.AControlPort, _ = strconv.Atoi(udpMatchs[3]) + if err := session.UDPClient.SetupAudio(); err != nil { + res.StatusCode = 500 + res.Status = fmt.Sprintf("udp client setup audio error, %v", err) + return + } + } + } Printf("SETUP [UDP] got UnKown control:%s", setupPath) } } diff --git a/udp-client.go b/udp-client.go index 589f6cf..e917429 100644 --- a/udp-client.go +++ b/udp-client.go @@ -5,10 +5,13 @@ import ( "net" "strings" + . "github.com/Monibuca/engine/v3" . "github.com/Monibuca/utils/v3" + "github.com/pion/rtp" ) type UDPClient struct { + Conn net.Conn APort int AConn *net.UDPConn AControlPort int @@ -17,8 +20,11 @@ type UDPClient struct { VConn *net.UDPConn VControlPort int VControlConn *net.UDPConn - - Stoped bool + AT *AudioTrack + APacketizer rtp.Packetizer + VT *VideoTrack + VPacketizer rtp.Packetizer + Stoped bool } func (s *UDPClient) Stop() { @@ -51,7 +57,7 @@ func (c *UDPClient) SetupAudio() (err error) { c.Stop() } }() - host := c.AConn.RemoteAddr().String() + host := c.Conn.RemoteAddr().String() host = host[:strings.LastIndex(host, ":")] addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", host, c.APort)) if err != nil { @@ -93,7 +99,7 @@ func (c *UDPClient) SetupVideo() (err error) { c.Stop() } }() - host := c.VConn.RemoteAddr().String() + host := c.Conn.RemoteAddr().String() host = host[:strings.LastIndex(host, ":")] addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", host, c.VPort)) if err != nil {