From da2fc9d4625e16fe5b4a1a74f8dc0ca80cccb2ba Mon Sep 17 00:00:00 2001 From: langhuihui <178529795@qq.com> Date: Sat, 24 Jul 2021 09:38:22 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0=E9=87=8D=E8=BF=9E=E9=80=BB?= =?UTF-8?q?=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- client.go | 33 +++++++++------------------------ main.go | 27 ++++++--------------------- session.go | 2 -- 3 files changed, 15 insertions(+), 47 deletions(-) diff --git a/client.go b/client.go index c4d6639..6ae0977 100644 --- a/client.go +++ b/client.go @@ -25,6 +25,7 @@ func (rtsp *RTSP) PullStream(streamPath string, rtspUrl string) (err error) { rtsp.Stream = &Stream{ StreamPath: streamPath, Type: "RTSP Pull", + ExtraProp: rtsp, } if result := rtsp.Publish(); result { rtsp.TransType = TRANS_TYPE_TCP @@ -34,13 +35,7 @@ func (rtsp *RTSP) PullStream(streamPath string, rtspUrl string) (err error) { rtsp.aRTPControlChannel = 3 rtsp.URL = rtspUrl rtsp.UDPServer = &UDPServer{Session: rtsp} - if err = rtsp.requestStream(); err != nil { - Println(err) - rtsp.Close() - return - } go rtsp.startStream() - collection.Store(streamPath, rtsp) return } return errors.New("publish badname") @@ -266,34 +261,24 @@ func (client *RTSP) requestStream() (err error) { } func (client *RTSP) startStream() { + if client.Err() != nil { + return + } startTime := time.Now() //loggerTime := time.Now().Add(-10 * time.Second) defer func() { if client.Err() == nil && config.Reconnect { Printf("reconnecting:%s", client.URL) client.RTSPClientInfo = RTSPClientInfo{} - if err := client.requestStream(); err != nil { - t := time.NewTicker(time.Second * 5) - for { - Printf("reconnecting:%s in 5 seconds", client.URL) - select { - case <-client.Done(): - client.Stop() - return - case <-t.C: - if err = client.requestStream(); err == nil { - go client.startStream() - return - } - } - } - } else { - go client.startStream() - } + Printf("reconnecting:%s in 5 seconds", client.URL) + time.AfterFunc(time.Second*5, client.startStream) } else { client.Stop() } }() + if err := client.requestStream(); err != nil { + return + } for client.Err() == nil { if time.Since(startTime) > time.Minute { startTime = time.Now() diff --git a/main.go b/main.go index 3ba1ec5..ffc5566 100644 --- a/main.go +++ b/main.go @@ -14,40 +14,31 @@ import ( "github.com/teris-io/shortid" ) -var collection sync.Map var config = struct { ListenAddr string - AutoPull bool - RemoteAddr string Timeout int Reconnect bool AutoPullList map[string]string -}{":554", false, "rtsp://localhost/${streamPath}", 0, false, nil} +}{":554", 0, false, nil} func init() { InstallPlugin(&PluginConfig{ Name: "RTSP", Config: &config, Run: runPlugin, - HotConfig: map[string]func(interface{}){ - "AutoPull": func(value interface{}) { - config.AutoPull = value.(bool) - }, - }, }) } func runPlugin() { - http.HandleFunc("/api/rtsp/list", func(w http.ResponseWriter, r *http.Request) { sse := NewSSE(w, r.Context()) var err error for tick := time.NewTicker(time.Second); err == nil; <-tick.C { var info []*RTSP - collection.Range(func(key, value interface{}) bool { - rtsp := value.(*RTSP) - info = append(info, rtsp) - return true - }) + for _, s := range Streams.ToList() { + if rtsp, ok := s.ExtraProp.(*RTSP); ok { + info = append(info, rtsp) + } + } err = sse.WriteJSON(info) } }) @@ -71,12 +62,6 @@ func runPlugin() { if config.ListenAddr != "" { go log.Fatal(ListenRtsp(config.ListenAddr)) } - // AddHook(HOOK_SUBSCRIBE, func(value interface{}) { - // s := value.(*Subscriber) - // if config.AutoPull && s.Publisher == nil { - // new(RTSP).PullStream(s.StreamPath, strings.Replace(config.RemoteAddr, "${streamPath}", s.StreamPath, -1)) - // } - // }) } func ListenRtsp(addr string) error { diff --git a/session.go b/session.go index d978bee..3431011 100644 --- a/session.go +++ b/session.go @@ -77,7 +77,6 @@ func (session *RTSP) SessionString() string { func (session *RTSP) Stop() { if session.Stream != nil { session.Close() - collection.Delete(session.StreamPath) } if session.Conn != nil { session.connRW.Flush() @@ -396,7 +395,6 @@ func (session *RTSP) handleRequest(req *Request) { Printf("video codec[%s]\n", session.VSdp.Codec) } session.Stream.Type = "RTSP" - collection.Store(streamPath, session) } case "DESCRIBE": session.Type = SESSEION_TYPE_PLAYER