diff --git a/README.md b/README.md index fdcc873..f88445a 100644 --- a/README.md +++ b/README.md @@ -45,6 +45,7 @@ rtmp: kickexist: false publishtimeout: 10 waitclosetimeout: 0 + delayclosetimeout: 0 subscribe: subaudio: true subvideo: true @@ -55,9 +56,8 @@ rtmp: listennum: 0 pull: repull: 0 # 当断开后是否自动重新拉流,0代表不进行重新拉流,-1代表无限次重新拉流 - pullonstart: false # 是否在m7s启动的时候自动拉流 - pullonsubscribe: false # 是否在有人订阅的时候自动拉流(按需拉流) - pulllist: {} # 拉流列表,以 streamPath为key,远程地址为value + pullonstart: {} # 是否在m7s启动的时候自动拉流 + pullonsub: {} # 是否在有人订阅的时候自动拉流(按需拉流) push: repush: 0 # 当断开后是否自动重新推流,0代表不进行重新推流,-1代表无限次重新推流 pushlist: {} # 推流列表,以 streamPath为key,远程地址为value @@ -71,4 +71,11 @@ subscribe ::: ## API -无 +### `rtmp/api/list` +获取所有rtmp流 + +### `rtmp/api/pull?target=[RTMP地址]&streamPath=[流标识]` +从远程拉取rtmp到m7s中 + +### `rtmp/api/push?target=[RTMP地址]&streamPath=[流标识]` +将本地的流推送到远端 \ No newline at end of file diff --git a/go.mod b/go.mod index 482466b..75d8355 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.18 require ( go.uber.org/zap v1.21.0 - m7s.live/engine/v4 v4.9.3 + m7s.live/engine/v4 v4.9.5 ) require ( diff --git a/go.sum b/go.sum index 60b1a94..60effe7 100644 --- a/go.sum +++ b/go.sum @@ -250,5 +250,5 @@ gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -m7s.live/engine/v4 v4.9.3 h1:FP5duB4mdv0vw60dB1cQ5whkQqZZ3Ekw1J98Srb66As= -m7s.live/engine/v4 v4.9.3/go.mod h1:OgI9lOQ1bE64s9rApdGGop1MBAJIpc/V2MJ190d9ig4= +m7s.live/engine/v4 v4.9.5 h1:xTZYokxH/kNOqrGzQlkOQIsElbkb8VsfwlktjjOXZ08= +m7s.live/engine/v4 v4.9.5/go.mod h1:OgI9lOQ1bE64s9rApdGGop1MBAJIpc/V2MJ190d9ig4= diff --git a/handshake.go b/handshake.go index 8042501..8e6d5fb 100644 --- a/handshake.go +++ b/handshake.go @@ -91,17 +91,21 @@ func (nc *NetConnection) Handshake() error { return nc.complex_handshake(C1) } -func (client *NetConnection) ClientHandshake() error { +func (client *NetConnection) ClientHandshake() (err error) { C0C1 := make([]byte, C1S1_SIZE+1) C0C1[0] = RTMP_HANDSHAKE_VERSION - client.Write(C0C1) - S1C1 := ReadBuf(client.Reader, C1S1_SIZE+C1S1_SIZE+1) - if S1C1[0] != RTMP_HANDSHAKE_VERSION { - return errors.New("S1 C1 Error") + if _, err = client.Write(C0C1); err == nil { + // read S0 S1 + if _, err = io.ReadFull(client.Reader, C0C1); err == nil { + if C0C1[0] != RTMP_HANDSHAKE_VERSION { + err = errors.New("S1 C1 Error") + // C2 + } else if _, err = client.Write(C0C1[1:]); err == nil { + _, err = io.ReadFull(client.Reader, C0C1[1:]) // S2 + } + } } - C2 := S1C1[1 : 1536+1] - client.Write(C2) - return nil + return } func (nc *NetConnection) simple_handshake(C1 []byte) error { diff --git a/main.go b/main.go index 9c92360..0f234ca 100644 --- a/main.go +++ b/main.go @@ -3,6 +3,7 @@ package rtmp import ( "context" "net/http" + "strconv" "time" "go.uber.org/zap" @@ -28,11 +29,9 @@ func (c *RTMPConfig) OnEvent(event any) { RTMPPlugin.Info("server rtmp start at", zap.String("listen addr", c.ListenAddr)) go c.Listen(RTMPPlugin, c) } - if c.PullOnStart { - for streamPath, url := range c.PullList { - if err := RTMPPlugin.Pull(streamPath, url, new(RTMPPuller), false); err != nil { - RTMPPlugin.Error("pull", zap.String("streamPath", streamPath), zap.String("url", url), zap.Error(err)) - } + for streamPath, url := range c.PullOnStart { + if err := RTMPPlugin.Pull(streamPath, url, new(RTMPPuller), 0); err != nil { + RTMPPlugin.Error("pull", zap.String("streamPath", streamPath), zap.String("url", url), zap.Error(err)) } } case config.Config: @@ -51,14 +50,12 @@ func (c *RTMPConfig) OnEvent(event any) { } } case *Stream: //按需拉流 - if c.PullOnSubscribe { - for streamPath, url := range c.PullList { - if streamPath == v.Path { - if err := RTMPPlugin.Pull(streamPath, url, new(RTMPPuller), false); err != nil { - RTMPPlugin.Error("pull", zap.String("streamPath", streamPath), zap.String("url", url), zap.Error(err)) - } - break + for streamPath, url := range c.PullOnSub { + if streamPath == v.Path { + if err := RTMPPlugin.Pull(streamPath, url, new(RTMPPuller), 0); err != nil { + RTMPPlugin.Error("pull", zap.String("streamPath", streamPath), zap.String("url", url), zap.Error(err)) } + break } } } @@ -87,7 +84,8 @@ func (*RTMPConfig) API_list(w http.ResponseWriter, r *http.Request) { } func (*RTMPConfig) API_Pull(rw http.ResponseWriter, r *http.Request) { - err := RTMPPlugin.Pull(r.URL.Query().Get("streamPath"), r.URL.Query().Get("target"), new(RTMPPuller), r.URL.Query().Has("save")) + save, _ := strconv.Atoi(r.URL.Query().Get("save")) + err := RTMPPlugin.Pull(r.URL.Query().Get("streamPath"), r.URL.Query().Get("target"), new(RTMPPuller), save) if err != nil { http.Error(rw, err.Error(), http.StatusBadRequest) } else {