优化作为client的握手流程

This commit is contained in:
dexter
2022-12-31 22:16:27 +08:00
parent b1cf9471c9
commit 7f00f32cb7
5 changed files with 37 additions and 28 deletions

View File

@@ -45,6 +45,7 @@ rtmp:
kickexist: false
publishtimeout: 10
waitclosetimeout: 0
delayclosetimeout: 0
subscribe:
subaudio: true
subvideo: true
@@ -55,9 +56,8 @@ rtmp:
listennum: 0
pull:
repull: 0 # 当断开后是否自动重新拉流0代表不进行重新拉流-1代表无限次重新拉流
pullonstart: false # 是否在m7s启动的时候自动拉流
pullonsubscribe: false # 是否在有人订阅的时候自动拉流(按需拉流)
pulllist: {} # 拉流列表,以 streamPath为key远程地址为value
pullonstart: {} # 是否在m7s启动的时候自动拉流
pullonsub: {} # 是否在有人订阅的时候自动拉流(按需拉流)
push:
repush: 0 # 当断开后是否自动重新推流0代表不进行重新推流-1代表无限次重新推流
pushlist: {} # 推流列表,以 streamPath为key远程地址为value
@@ -71,4 +71,11 @@ subscribe
:::
## API
### `rtmp/api/list`
获取所有rtmp流
### `rtmp/api/pull?target=[RTMP地址]&streamPath=[流标识]`
从远程拉取rtmp到m7s中
### `rtmp/api/push?target=[RTMP地址]&streamPath=[流标识]`
将本地的流推送到远端

2
go.mod
View File

@@ -4,7 +4,7 @@ go 1.18
require (
go.uber.org/zap v1.21.0
m7s.live/engine/v4 v4.9.3
m7s.live/engine/v4 v4.9.5
)
require (

4
go.sum
View File

@@ -250,5 +250,5 @@ gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
m7s.live/engine/v4 v4.9.3 h1:FP5duB4mdv0vw60dB1cQ5whkQqZZ3Ekw1J98Srb66As=
m7s.live/engine/v4 v4.9.3/go.mod h1:OgI9lOQ1bE64s9rApdGGop1MBAJIpc/V2MJ190d9ig4=
m7s.live/engine/v4 v4.9.5 h1:xTZYokxH/kNOqrGzQlkOQIsElbkb8VsfwlktjjOXZ08=
m7s.live/engine/v4 v4.9.5/go.mod h1:OgI9lOQ1bE64s9rApdGGop1MBAJIpc/V2MJ190d9ig4=

View File

@@ -91,17 +91,21 @@ func (nc *NetConnection) Handshake() error {
return nc.complex_handshake(C1)
}
func (client *NetConnection) ClientHandshake() error {
func (client *NetConnection) ClientHandshake() (err error) {
C0C1 := make([]byte, C1S1_SIZE+1)
C0C1[0] = RTMP_HANDSHAKE_VERSION
client.Write(C0C1)
S1C1 := ReadBuf(client.Reader, C1S1_SIZE+C1S1_SIZE+1)
if S1C1[0] != RTMP_HANDSHAKE_VERSION {
return errors.New("S1 C1 Error")
if _, err = client.Write(C0C1); err == nil {
// read S0 S1
if _, err = io.ReadFull(client.Reader, C0C1); err == nil {
if C0C1[0] != RTMP_HANDSHAKE_VERSION {
err = errors.New("S1 C1 Error")
// C2
} else if _, err = client.Write(C0C1[1:]); err == nil {
_, err = io.ReadFull(client.Reader, C0C1[1:]) // S2
}
}
}
C2 := S1C1[1 : 1536+1]
client.Write(C2)
return nil
return
}
func (nc *NetConnection) simple_handshake(C1 []byte) error {

24
main.go
View File

@@ -3,6 +3,7 @@ package rtmp
import (
"context"
"net/http"
"strconv"
"time"
"go.uber.org/zap"
@@ -28,11 +29,9 @@ func (c *RTMPConfig) OnEvent(event any) {
RTMPPlugin.Info("server rtmp start at", zap.String("listen addr", c.ListenAddr))
go c.Listen(RTMPPlugin, c)
}
if c.PullOnStart {
for streamPath, url := range c.PullList {
if err := RTMPPlugin.Pull(streamPath, url, new(RTMPPuller), false); err != nil {
RTMPPlugin.Error("pull", zap.String("streamPath", streamPath), zap.String("url", url), zap.Error(err))
}
for streamPath, url := range c.PullOnStart {
if err := RTMPPlugin.Pull(streamPath, url, new(RTMPPuller), 0); err != nil {
RTMPPlugin.Error("pull", zap.String("streamPath", streamPath), zap.String("url", url), zap.Error(err))
}
}
case config.Config:
@@ -51,14 +50,12 @@ func (c *RTMPConfig) OnEvent(event any) {
}
}
case *Stream: //按需拉流
if c.PullOnSubscribe {
for streamPath, url := range c.PullList {
if streamPath == v.Path {
if err := RTMPPlugin.Pull(streamPath, url, new(RTMPPuller), false); err != nil {
RTMPPlugin.Error("pull", zap.String("streamPath", streamPath), zap.String("url", url), zap.Error(err))
}
break
for streamPath, url := range c.PullOnSub {
if streamPath == v.Path {
if err := RTMPPlugin.Pull(streamPath, url, new(RTMPPuller), 0); err != nil {
RTMPPlugin.Error("pull", zap.String("streamPath", streamPath), zap.String("url", url), zap.Error(err))
}
break
}
}
}
@@ -87,7 +84,8 @@ func (*RTMPConfig) API_list(w http.ResponseWriter, r *http.Request) {
}
func (*RTMPConfig) API_Pull(rw http.ResponseWriter, r *http.Request) {
err := RTMPPlugin.Pull(r.URL.Query().Get("streamPath"), r.URL.Query().Get("target"), new(RTMPPuller), r.URL.Query().Has("save"))
save, _ := strconv.Atoi(r.URL.Query().Get("save"))
err := RTMPPlugin.Pull(r.URL.Query().Get("streamPath"), r.URL.Query().Get("target"), new(RTMPPuller), save)
if err != nil {
http.Error(rw, err.Error(), http.StatusBadRequest)
} else {