first commit

This commit is contained in:
erroot
2024-03-17 22:55:02 +08:00
commit e5081e623e
12 changed files with 1898 additions and 0 deletions

21
LICENSE Normal file
View File

@@ -0,0 +1,21 @@
MIT License
Copyright (c) 2022 Monibuca
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

84
README.md Normal file
View File

@@ -0,0 +1,84 @@
## 预览插件
## 主要功能
基于websocket 实现两个m7s 之间的级联
- 支持http 请求代理方式级联
- 支持公网级联下级平台在net 后面如4G 网络)
- 支持上级平台只开放一个端口实现级联http/https端口要求支持websocket 配置网关代理时需要注意
- 支持音视频级联(下级平台推流到上级平台)
- 支持推流协议: rtmp 推流rtsp 推流, websocket flv 推流等上级平台端口资源有限限制开发只有一个https 端口)
注: rtmp 推流rtsp 推流 是m7s 开发源插件功能,需要使能相关插件
m7s 基于webscoket 实现平台级联
## 插件地址
https://github.com/erroot/plugin-erwscascade
## 插件引入
```go
import (
_ "m7s.live/plugin/erwscascade/v4"
)
```
## 配置
# websocket级联配置
erwscascade:
cid: "test-c001" #本机平台ID 不配置则随机uuid
server: #级联上级平台配置,支持同时接入多个上级平台
-
protocol: "wss" #支持的协议ws,wss
host: "47.111.28.16"
port: 8441
conextpath: ""
push:
repush: -1
pushlist:
njtv/glgc: ws://127.0.0.1:8450/erwscascade/wspush/on #推送本地流到上级平台新的streamPath 为 streamPath-cid
## API
## server API
- `/erwscascade/httpproxy/xx_m7s_url_xx?cid=test-c001` http协议透传接口
- xx_m7s_url_xx 含义是 m7s 普通url 链接
- cid 客户端ID
- 示例1: webrtc sdp 交互信息透传
浏览器请求链接https://www.server.com.cn:8441/erwscascade/httpproxy?httpPath=webrtc/play/njtv/glgc-ts?cid=test-c001
server 收到请求后通过ws链路 把http 请求转换封装为json对象,发送给client, client 解析转发给自己的webrtc 插件接口把结果再发送给sever,server 再把结果响应给浏览器
POST sdp ws sdp POST sdp
|--------------------> --------------------> -------------------->|
browser <-> | -- server -- -- client -- |<-->client service--
|<-------------------- <-------------------- <-------------------- |
RSP sdp ws sdp RSP sdp
- 示例2请求下级平台test-c001,通过erwscascade ws 推流接口推流到上级 推送本地的流njtv/glgc 到上级平台 ws://127.0.0.1:8450/erwscascade/wspush/on 这个地址
http://127.0.0.1:8450/erwscascade/httpproxy/?cid=test-c001&httpPath=/erwscascade/api/push?streamPath=njtv/glgc&target=ws://127.0.0.1:8450/erwscascade/wspush/on
- 示例3: 请求下级平台 test-c001, 通过rtmp 推流接口推送流到上级 推送本地的流njtv/glgc 到上级平台 rtmp://127.0.0.1:1945/njtv/glgc-rtmp-push 这个地址
http://127.0.0.1:8450/erwscascade/httpproxy/?cid=test-c001&httpPath=/rtmp/api/push?streamPath=njtv/glgc&target=rtmp://127.0.0.1:1945/njtv/glgc-rtmp-push
*
type ProxyMessage struct {
Url string `json:"url"`
Header http.Header `json:"header"`
Method string `json:"method"`
Body []byte `json:"body"`
}
type CascadingWsMessage struct {
Sn int `json:"sn"`
Type MessageType `json:"type"`
Pad []byte `json:"pad"`
}
const (
CInfo MessageType = iota
HTTPProxyReq
HTTPProxyRsp
// 在此添加更多的枚举成员
)
## 使用erwscascade注意事项
- 本地测试需要本地启动https服务并配置有效的证书

325
client.go Normal file
View File

@@ -0,0 +1,325 @@
package erwscascade
import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net"
"net/http"
"net/url"
"strings"
"time"
"github.com/gobwas/ws"
"github.com/gobwas/ws/wsutil"
"github.com/google/uuid"
)
var wsclients = make(map[string]*CascadingWsClient)
var cSn int = 0
type CascadingWsClient struct {
URL string
Conn net.Conn
IsClosed bool
IsRecving bool
}
type ProxyMessage struct {
Url string `json:"url"`
Header http.Header `json:"header"`
Method string `json:"method"`
Body []byte `json:"body"`
}
// MessageType 定义枚举类型 MessageType
type MessageType int
const (
CInfo MessageType = iota
HTTPProxyReq
HTTPProxyRsp
// 在此添加更多的枚举成员
)
func (m MessageType) String() string {
types := [...]string{"CInfo", "HTTPProxyReq", "HTTPProxyRsp"}
if m < CInfo || m > HTTPProxyRsp {
return "Unknown"
}
return types[m]
}
type CascadingWsMessage struct {
Sn int `json:"sn"`
Type MessageType `json:"type"`
Pad []byte `json:"pad"`
}
type ClientInfo struct {
Name string `json:"name"`
Serial string `json:"serial"`
}
func NewCascadingWsClient(url string) *CascadingWsClient {
return &CascadingWsClient{
URL: url,
IsClosed: true,
IsRecving: false,
}
}
func (c *CascadingWsClient) Connect() error {
log.Printf("try 2 ws Connect: %v", c.URL)
// 创建自定义的 TLS 配置
tlsConfig := &tls.Config{
InsecureSkipVerify: true,
}
// 创建 Dialer
dialer := ws.Dialer{
TLSConfig: tlsConfig,
}
//conn, _, _, err := ws.Dial(context.Background(), c.URL)
conn, _, _, err := dialer.Dial(context.Background(), c.URL)
if err != nil {
log.Printf("ws Connect faild: %v", err)
return err
}
c.Conn = conn
c.IsClosed = false
return nil
}
func (c *CascadingWsClient) Close() {
c.Conn.Close()
c.IsClosed = true
}
func (c *CascadingWsClient) SendClientInfo() error {
clientInfo := ClientInfo{
Name: "html 客户端",
Serial: "SNVR-01029-003932-1330",
}
infoBytes, _ := json.Marshal(clientInfo)
cSn++
msg := CascadingWsMessage{
Sn: cSn,
Type: CInfo,
Pad: infoBytes,
}
msgBytes, _ := json.Marshal(msg)
err := wsutil.WriteClientMessage(c.Conn, ws.OpText, msgBytes)
if err != nil {
return err
}
return nil
}
func (c *CascadingWsClient) Reconnect() error {
if !c.IsClosed {
//c.Close()
//链接成功
return nil
}
err := c.Connect()
if err != nil {
//5 秒后重新链接
time.Sleep(time.Duration(5) * time.Second)
log.Printf("Connect: %v", err)
return err
}
log.Printf("connect 2 ws server sucess....")
err = c.SendClientInfo()
if err != nil {
log.Printf("SendClientInfo: %v", err)
return err
}
if !c.IsRecving {
go c.receiveWsMessages()
}
return nil
}
func (c *CascadingWsClient) onWsProxyMessages(wsMessage CascadingWsMessage, proxyMessage ProxyMessage) error {
targetURL := proxyMessage.Url
log.Printf("Parsed ProxyMessage:%v,%v,%v\n",
proxyMessage.Url,
proxyMessage.Header,
proxyMessage.Method)
if !strings.HasPrefix(proxyMessage.Url, "http:") {
//log.Println("字符串以'http:'打头")
targetURL = "http://127.0.0.1:8440" + proxyMessage.Url
}
// 发起代理请求
req, err := http.NewRequest(proxyMessage.Method, targetURL, bytes.NewBuffer(proxyMessage.Body))
if err != nil {
log.Printf("Error creating HTTP request: %v", err)
return err
}
if proxyMessage.Header != nil {
//req.Header = proxyMessage.Header
for key, values := range proxyMessage.Header {
// 添加自定义的 Header
req.Header.Set(key, values[0])
}
}
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
log.Printf("Error sending HTTP request: %v", err)
return err
}
defer resp.Body.Close()
// 读取响应内容
respBody, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Printf("Error reading response body: %v", err)
return err
}
var rspProxyMessage CascadingWsMessage
// 将响应内容填充到 CascadingWsProxyMessage 中并返回给服务器端
rspProxyMessage.Pad = respBody
rspProxyMessage.Sn = wsMessage.Sn
rspProxyMessage.Type = HTTPProxyRsp
responseJSON, _ := json.Marshal(rspProxyMessage)
err = wsutil.WriteClientMessage(c.Conn, ws.OpText, responseJSON)
if err != nil {
log.Printf("Error sending response: %v", err)
//延迟10秒
return err
}
return nil
}
func (c *CascadingWsClient) receiveWsMessages() {
c.IsRecving = true
for {
if c.IsClosed {
//延迟1秒
time.Sleep(time.Duration(1) * time.Second)
continue
}
msg, _, err := wsutil.ReadServerData(c.Conn)
if err != nil {
// 判断错误类型
// if opErr, ok := err.(*net.OpError); ok {
// // 检测错误消息中是否包含特定子字符串
// if strings.Contains(opErr.Err.Error(), "An existing connection was forcibly closed by the remote host") {
// log.Println("Connection forcibly closed by remote host")
// // 在这里可以添加处理逻辑,比如自动重连等
// } else {
// log.Println("Other net.OpError occurred:", opErr)
// }
// } else if netErr, ok := err.(net.Error); ok && netErr.Temporary() {
// log.Println("Temporary error occurred, retrying...")
// // 在这里可以添加自动重连的逻辑
// c.Close()
// continue
// } else if err == net.ErrClosed {
// // 在这里可以添加自动重连的逻辑
// log.Println("Connection closed by server,try reconnect...")
// c.Close()
// continue
// }
log.Printf("Error reading message: %v", err)
//延迟10秒
//continue
//断开重连
c.Close()
break
}
// 解析收到的消息为 CascadingWsProxyMessage
var wsMessage CascadingWsMessage
err = json.Unmarshal(msg, &wsMessage)
if err != nil {
log.Printf("Error decoding message: %v", err)
continue
}
// 根据 MessageType 的值解析 Pad 字段为 ProxyMessage
var proxyMessage ProxyMessage
if wsMessage.Type == HTTPProxyReq {
err := json.Unmarshal(wsMessage.Pad, &proxyMessage)
if err != nil {
log.Println("Error parsing ProxyMessage:", err)
continue
}
c.onWsProxyMessages(wsMessage, proxyMessage)
} else {
log.Println("MessageType is:", wsMessage.Type.String())
}
}
c.IsRecving = false
}
func (p *ErWsCascadeConfig) onClientSetup() {
cid := p.Cid
if cid == "" {
// 创建一个新的UUID
newUUID, err := uuid.NewRandom()
if err != nil {
log.Println("生成UUID错误:", err)
return
}
// 将UUID转换为字符串形式
cid = newUUID.String()
}
go func() {
for idx, s := range p.ServerConfig {
//client.Reconnect()
protocol := "ws"
if s.Protocol == "https" {
protocol = "wss"
} else if s.Protocol == "wss" {
protocol = "wss"
}
host := s.Host
if s.Port != 80 && s.Port != 443 {
host += fmt.Sprintf(":%d", s.Port)
}
u := url.URL{
Scheme: protocol,
Host: host,
Path: s.ConextPath + "/erwscascade/wsocket/register?cid=" + cid,
}
client := NewCascadingWsClient(u.String())
wsclients[fmt.Sprintf("%d", idx)] = client
}
// 保持连接
for {
for _, client := range wsclients {
client.Reconnect()
}
time.Sleep(time.Duration(5) * time.Second)
}
}()
}

3
go.mod Normal file
View File

@@ -0,0 +1,3 @@
module m7s.live/plugin/erwscascade/v4
go 1.19

347
main.go Normal file
View File

@@ -0,0 +1,347 @@
package erwscascade
import (
"embed"
"fmt"
"io/ioutil"
"mime"
"net/http"
"net/url"
"path/filepath"
"strings"
"go.uber.org/zap"
. "m7s.live/engine/v4"
"m7s.live/engine/v4/config"
"m7s.live/engine/v4/util"
)
//go:embed ui
var f embed.FS
var defaultYaml DefaultYaml
type ServerConfig struct {
Protocol string `default:"https" desc:"上级平台端口" yaml:"protocol"`
Host string `default:"127.0.0.1" desc:"上级平台IP" yaml:"host"`
Port int `default:"8440" desc:"上级平台端口" yaml:"port"`
ConextPath string `default:"" desc:"上级平台根目录" yaml:"conextpath"`
}
type ErWsCascadeConfig struct {
DefaultYaml
Cid string `default:"" desc:"客户端ID" yaml:"cid"`
//erwscascade/wsocket/register
ServerConfig []ServerConfig `yaml:"server"`
config.Publish
config.Subscribe
config.Push
}
func (p *ErWsCascadeConfig) push(streamPath string, url string) {
if err := ErWsCascadePlugin.Push(streamPath, url, NewWscPusher(p), false); err != nil {
ErWsCascadePlugin.Error("push", zap.String("streamPath", streamPath), zap.String("url", url), zap.Error(err))
}
}
func (p *ErWsCascadeConfig) OnEvent(event any) {
switch event.(type) {
case FirstConfig:
p.onClientSetup()
for streamPath, url := range p.PushList {
p.push(streamPath, url)
}
break
case config.Config:
break
case SEclose:
break
}
}
var ErWsCascadePlugin = InstallPlugin(&ErWsCascadeConfig{})
func (p *ErWsCascadeConfig) Ui_(w http.ResponseWriter, r *http.Request) {
ss := strings.Split(r.URL.Path, "/")
if b, err := f.ReadFile("ui/" + ss[len(ss)-1]); err == nil {
w.Header().Set("Content-Type", mime.TypeByExtension(filepath.Ext(ss[len(ss)-1])))
w.Write(b)
} else {
//w.Header().Set("Cross-Origin-Opener-Policy", "same-origin")
//w.Header().Set("Cross-Origin-Embedder-Policy", "require-corp")
b, err = f.ReadFile("ui/index.html")
w.Write(b)
}
}
// http 接口向上级平台推流
func (p *ErWsCascadeConfig) API_Push(rw http.ResponseWriter, r *http.Request) {
query := r.URL.Query()
err := ErWsCascadePlugin.Push(query.Get("streamPath"), query.Get("target"), NewWscPusher(p), query.Has("save"))
if err != nil {
util.ReturnError(util.APIErrorQueryParse, err.Error(), rw, r)
} else {
util.ReturnOK(rw, r)
}
}
/*
http ws 代理转发接口,实现 m7s 上级代理转发下级api
示例1请求下级平台test-c001,通过erwscascade ws 推流接口推流到上级 推送本地的流njtv/glgc 到上级平台 ws://127.0.0.1:8450/erwscascade/wspush/on 这个地址
http://127.0.0.1:8450/erwscascade/httpproxy/?cid=test-c001&httpPath=/erwscascade/api/push?streamPath=njtv/glgc&target=ws://127.0.0.1:8450/erwscascade/wspush/on
示例2: 请求下级平台 test-c001, 通过rtmp 推流接口推送流到上级 推送本地的流njtv/glgc 到上级平台 rtmp://127.0.0.1:1945/njtv/glgc-rtmp-push 这个地址
http://127.0.0.1:8450/erwscascade/httpproxy/?cid=test-c001&httpPath=/rtmp/api/push?streamPath=njtv/glgc&target=rtmp://127.0.0.1:1945/njtv/glgc-rtmp-push
*/
func (p *ErWsCascadeConfig) HttpProxy_(w http.ResponseWriter, r *http.Request) {
///erwscascade/httpProxy/webrtc/play/live/webrtc
// 修改请求 URL
//fmt.Printf("config: %v\n", config.Gloub)
// 去除原始请求中的 /proxy 部分
//targetPath := strings.TrimPrefix(r.URL.Path, "/httpproxy")
// if r.URL.RawQuery != "" {
// targetURL += "?" + r.URL.RawQuery
// }
// 构造新的查询参数
queryParams := r.URL.Query()
cid := queryParams.Get("cid")
if cid != "" {
queryParams.Del("cid")
} else {
util.ReturnError(util.APIErrorQueryParse, "invalid cid", w, r)
return
}
httpPath := queryParams.Get("httpPath")
if httpPath != "" {
queryParams.Del("httpPath")
} else {
util.ReturnError(util.APIErrorQueryParse, "invalid httpPath", w, r)
return
}
// 重新构造查询参数
newQuery := queryParams.Encode() //特殊字符被编码了
//代理到下级平台
if newQuery != "" {
if strings.Contains(httpPath, "?") {
httpPath += "&"
} else {
httpPath += "?"
}
httpPath += newQuery
}
decodedStr, err := url.QueryUnescape(httpPath) //url 解码
if err != nil {
util.ReturnError(util.APIErrorQueryParse, "QueryUnescape faild", w, r)
return
}
req := ProxyMessage{
Url: decodedStr, //url 解码
Method: r.Method,
}
// 创建一个新的 http.Header 对象
newHeader := make(http.Header)
// 获取 Content-Type
contentType := r.Header.Get("Content-Type")
if contentType != "" {
newHeader.Set("Content-Type", contentType)
}
// 获取 Set-Cookie
setCookie := r.Header.Get("Set-Cookie")
if setCookie != "" {
newHeader.Set("Set-Cookie", setCookie)
}
req.Header = newHeader
if r.ContentLength > 0 {
// 读取 Body 的内容
body, err := ioutil.ReadAll(r.Body)
if err != nil {
fmt.Printf("Error reading request body: %v\n", err)
return
}
req.Body = body
}
//ws send msg
rsp, err := p.transWsProxyMessage(cid, req)
if err != nil {
fmt.Printf("WsProxy faild: %v\n", err)
return
}
// 将读取的 Body 内容作为响应返回给客户端
w.Write(rsp)
return
// //本机代理
// targetURL := "http://127.0.0.1:8440" + targetPath
// if newQuery != "" {
// targetURL += "?" + newQuery
// }
// // 创建新的请求
// req, err := http.NewRequest(r.Method, targetURL, r.Body)
// if err != nil {
// http.Error(w, err.Error(), http.StatusInternalServerError)
// return
// }
// // 复制请求头
// req.Header = make(http.Header)
// for key, values := range r.Header {
// for _, value := range values {
// req.Header.Add(key, value)
// }
// }
// // 发送请求
// resp, err := http.DefaultClient.Do(req)
// if err != nil {
// http.Error(w, err.Error(), http.StatusInternalServerError)
// return
// }
// defer resp.Body.Close()
// // 将响应写回给客户端
// for key, values := range resp.Header {
// for _, value := range values {
// w.Header().Add(key, value)
// }
// w.WriteHeader(resp.StatusCode)
// io.Copy(w, resp.Body)
// }
}
/*
func (p *ErWsCascadeConfig) ServeHTTP(w http.ResponseWriter, r *http.Request) {
var conn net.Conn
var err error
//下级平台 ws接入链接
if r.Header.Get("Upgrade") == "websocket" {
// 判断 URL 中是否包含 "register" 关键字
if matched, _ := regexp.MatchString("/register", r.URL.Path); matched {
// 获取 cid 参数
cid := r.URL.Query().Get("cid")
fmt.Printf("register in client id %s\n", cid)
conn, _, _, err = ws.UpgradeHTTP(r, w)
if err != nil {
fmt.Printf("UpgradeHTTP error:%v", err)
return
}
connectionsLock.Lock()
connections[cid] = &conn
connectionsThreads[cid] = make(chan struct{})
connectionsLock.Unlock()
// 启动线程接收客户端消息
go p.receiveWsMessages(&conn, cid)
}
return
}
if r.URL.Path == "/erwscascade/" {
var s string
Streams.Range(func(streamPath string, _ *Stream) {
s += fmt.Sprintf("<a href='%s'>%s</a><br>", streamPath, streamPath)
})
if s != "" {
s = "<b>Live Streams</b><br>" + s
}
for name, p := range Plugins {
if pullcfg, ok := p.Config.(config.PullConfig); ok {
if pullonsub := pullcfg.GetPullConfig().PullOnSub; pullonsub != nil {
s += fmt.Sprintf("<b>%s pull stream on subscribe</b><br>", name)
for streamPath, url := range pullonsub {
s += fmt.Sprintf("<a href='%s'>%s</a> <-- %s<br>", streamPath, streamPath, url)
}
}
}
}
w.Write([]byte(s))
return
}
ss := strings.Split(r.URL.Path, "/")
if b, err := f.ReadFile("ui/" + ss[len(ss)-1]); err == nil {
w.Header().Set("Content-Type", mime.TypeByExtension(filepath.Ext(ss[len(ss)-1])))
w.Write(b)
} else {
//w.Header().Set("Cross-Origin-Opener-Policy", "same-origin")
//w.Header().Set("Cross-Origin-Embedder-Policy", "require-corp")
b, err = f.ReadFile("ui/index.html")
w.Write(b)
}
}
*/
/*
type CascadingStream struct {
Source string
StreamPath string
}
func filterStreams() (ss []*CascadingStream) {
//优先获取配置文件中视频流
for name, p := range Plugins {
if pullcfg, ok := p.Config.(config.PullConfig); ok {
if pullonstart := pullcfg.GetPullConfig().PullOnStart; pullonstart != nil {
//s += fmt.Sprintf("<b>%s pull stream on subscribe</b><br>", name)
//for streamPath, url := range pullonsub {
var sourcename string = name
sourcename += "Pull"
for streamPath := range pullonstart {
ss = append(ss, &CascadingStream{sourcename, streamPath})
//s += fmt.Sprintf("<a href='%s'>%s</a> <-- %s<br>", streamPath, streamPath, url)
}
}
if pullonsub := pullcfg.GetPullConfig().PullOnSub; pullonsub != nil {
//s += fmt.Sprintf("<b>%s pull stream on subscribe</b><br>", name)
//for streamPath, url := range pullonsub {
var sourcename string = name
sourcename += "Pull"
for streamPath := range pullonsub {
ss = append(ss, &CascadingStream{sourcename, streamPath})
//s += fmt.Sprintf("<a href='%s'>%s</a> <-- %s<br>", streamPath, streamPath, url)
}
}
}
}
//过滤出动态添加的视频流
//Streams.RLock()
//defer Streams.RUnlock()
Streams.Range(func(streamPath string, s *Stream) {
//s += fmt.Sprintf("<a href='%s'>%s</a><br>", streamPath, streamPath)
var isrepeat bool = false
for _, s := range ss {
if streamPath == s.StreamPath {
isrepeat = true
}
}
if !isrepeat {
ss = append(ss, &CascadingStream{"api", streamPath})
}
})
return
}
func (*ErWsCascadeConfig) API_streamslist(w http.ResponseWriter, r *http.Request) {
util.ReturnFetchValue(filterStreams, w, r)
}
*/

BIN
result.jpg Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 523 KiB

207
server.go Normal file
View File

@@ -0,0 +1,207 @@
package erwscascade
import (
"encoding/json"
"errors"
"log"
"net"
"net/http"
"net/url"
"regexp"
"strings"
"sync"
"time"
"github.com/gobwas/ws"
"github.com/gobwas/ws/wsutil"
)
var responseChan = make(chan CascadingWsMessage)
type WsClientConn struct {
Conn *net.Conn
RspChan chan CascadingWsMessage
}
var clientConnections = make(map[string]*WsClientConn)
var connectionsThreads = make(map[string]chan struct{})
var connectionsLock sync.RWMutex
var gSn int = 0
func (p *ErWsCascadeConfig) transWsProxyMessage(cid string, req ProxyMessage) ([]byte, error) {
connectionsLock.Lock()
client, ok := clientConnections[cid]
connectionsLock.Unlock()
if !ok {
return nil, errors.New("no find client")
}
reqPad, _ := json.Marshal(req)
gSn++
reqMsg := CascadingWsMessage{
Sn: gSn,
Type: HTTPProxyReq,
Pad: reqPad,
}
reqBytes, _ := json.Marshal(reqMsg)
log.Printf("server proxy msg sn:%v, type:%v\n", reqMsg.Sn, reqMsg.Type)
err := wsutil.WriteServerMessage(*client.Conn, ws.OpText, reqBytes)
if err != nil {
log.Println("WriteServerMessage err:", err)
return nil, err
}
// 设置超时时间为 10 秒
timeout := time.After(10 * time.Second)
for {
select {
case rspMsg := <-client.RspChan:
log.Printf("client rsp msg sn:%v, type:%v\n", rspMsg.Sn, rspMsg.Type)
// 收到响应消息
if rspMsg.Sn == reqMsg.Sn {
return rspMsg.Pad, nil
} else {
//继续等超时
continue
}
case <-timeout:
log.Println("Timeout: No response received within 10 seconds")
return nil, errors.New("time out")
}
}
return nil, nil
}
func (p *ErWsCascadeConfig) sendWsMessageToClient(cid string, message string) {
connectionsLock.Lock()
client, ok := clientConnections[cid]
connectionsLock.Unlock()
if ok {
err := wsutil.WriteServerMessage(*client.Conn, ws.OpText, []byte(message))
if err != nil {
log.Println("Error sending message to client:", err)
}
}
}
func (p *ErWsCascadeConfig) receiveWsMessages(client *WsClientConn, cid string) {
for {
msg, _, err := wsutil.ReadClientData(*client.Conn)
if err != nil {
log.Println("Error reading message:", err)
break
}
log.Printf("Received message from client: %s\n", cid)
// Handle client messages here
//p.sendWsMessageToClient(cid, "Server RSP")
// 解析收到的消息为 CascadingWsProxyMessage
var wsMessage CascadingWsMessage
err = json.Unmarshal(msg, &wsMessage)
if err != nil {
log.Printf("Error decoding message: %v", err)
continue
}
if wsMessage.Type == HTTPProxyRsp {
//通知阻塞函数
client.RspChan <- wsMessage
} else if wsMessage.Type == CInfo {
var clientInfo ClientInfo
err := json.Unmarshal(wsMessage.Pad, &clientInfo)
if err != nil {
log.Println("Error parsing clientInfo:", err)
continue
}
log.Println("Parsed ClientInfo:", clientInfo)
} else {
log.Println("MessageType is:", wsMessage.Type.String())
}
}
log.Printf("ws client offline cid: %s\n", cid)
// 断开连接时清理操作
connectionsLock.Lock()
delete(clientConnections, cid)
close(connectionsThreads[cid])
connectionsLock.Unlock()
}
func (p *ErWsCascadeConfig) Wsocket_(w http.ResponseWriter, r *http.Request) {
if r.Header.Get("Upgrade") == "websocket" {
// 判断 URL 中是否包含 "register" 关键字
//customHeader := r.Header.Get("X-Custom-Header")
// 获取完整的 URL
fullURL := r.URL.String()
//判断URL编码特殊字符%2F,%2B,%3F,%25包含这些则自动转码?= 等
log.Printf("Full URL: %s\n", fullURL)
if matched, _ := regexp.MatchString("/register", r.URL.Path); matched {
// 获取 cid 参数
//cid := r.URL.Query().Get("cid")
// 解码 URL
decodedURL, err := url.QueryUnescape(fullURL)
if err != nil {
log.Printf("Error decoding URL: %v\n", err)
return
}
// 手动解析解码后的 URL 中的参数
var queryString string
if strings.Contains(decodedURL, "?") {
queryString = strings.Split(decodedURL, "?")[1]
}
queryParams, err := url.ParseQuery(queryString)
if err != nil {
log.Printf("Error parsing query string: %v\n", err)
return
}
cid := queryParams.Get("cid")
log.Printf("CID value: %s\n", cid)
conn, _, _, err := ws.UpgradeHTTP(r, w)
if err != nil {
log.Printf("UpgradeHTTP error:%v", err)
return
}
if cid == "" {
log.Printf("invalid cid refuse connect\n")
conn.Close()
return
}
client := WsClientConn{
Conn: &conn,
RspChan: make(chan CascadingWsMessage),
}
connectionsLock.Lock()
clientConnections[cid] = &client
connectionsThreads[cid] = make(chan struct{})
connectionsLock.Unlock()
// 启动线程接收客户端消息
go p.receiveWsMessages(&client, cid)
}
return
} else {
return
}
}

86
ui/clienttest.html Normal file
View File

@@ -0,0 +1,86 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Live Camera Feed</title>
</head>
<body>
<canvas id="canvas" width="640" height="480"></canvas>
<button id="wsMsgSendBtn" width="80" height="40">ws发送测试消息</button>
<script>
const imgWidth = 1920;
const imgHeight = 1080;
const canvas = document.getElementById('canvas');
const ctx = canvas.getContext('2d');
const ws = new WebSocket('ws://localhost:8440/erwscascade/wspush/on?cid=test-c001&streamPath=njtv/glgc');
ws.binaryType = 'arraybuffer';
const imageData = ctx.createImageData(imgWidth, imgHeight);
const tempCanvas = document.createElement('canvas');
tempCanvas.width = imgWidth;
tempCanvas.height = imgHeight;
const tempCtx = tempCanvas.getContext('2d');
const wsMsgSendBtn = document.getElementById('wsMsgSendBtn');
var deviceInfo={
"method": "deviceInfo",
"name": "html 客户端",
"serial": "SNVR-01029-003932-1330"
}
// 为该按钮绑定点击事件处理程序
wsMsgSendBtn.addEventListener('click', function() {
//alert('按钮被点击了!');
//调用sendMessage函数发送消息
sendMessage(JSON.stringify(deviceInfo));
});
// 在打开连接前添加自定义header
ws.onopen = function(event) {
// 设置自定义header [clientId]
//ws.headers = { 'clientId': 'your-client-id-html' };
console.log('open succes: ');
};
// 接收到服务器消息的处理
ws.onmessage = function(event) {
console.log('Received Message: ', event.data);
};
// 发送消息
function sendMessage(message) {
ws.send(message);
}
// 当WebSocket连接关闭时
ws.onclose = function(event) {
console.log("WebSocket disconnected");
};
// 当有错误发生时
ws.onerror = function(error) {
console.log("WebSocket error observed:", error);
};
</script>
</body>
</html>

116
ui/demo.html Normal file
View File

@@ -0,0 +1,116 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>测试WebRTC拉流</title>
<style>
</style>
</head>
<body>
<video id="video" width="640" height="480" autoplay muted controls>
</video>
<!-- <button id="sw" onclick="action()" type="button" style="width:100px;height:30px;display: block;">unpublish</button> -->
<pre>
<div style="display: flex;" style="width: 100vw;">
<div style="width: 50vw; padding-left: 20px; border-left: 1px solid red;">
<text>local</text>
<code id="localSdp">
</code>
</div>
<div style="width: 50vw; padding-left: 20px; border-left: 1px solid red;">
<text>remote</text>
<code id="remoteSdp" >
</code>
</div>
</div>
</pre>
</body>
<script>
let action = () => { console.log('action not set'); };
(async () => {
const pc = new RTCPeerConnection({
iceServers: [
{
urls: 'stun:stun.l.google.com:19302'
}
]
});
pc.ontrack = (e) => {
console.log('ontrack', e);
if (e.streams.length === 0) return;
document.getElementById('video').srcObject = e.streams[0];
document.getElementById('video').play();
};
pc.oniceconnectionstatechange = () => {
console.log('oniceconnectionstatechange', pc.iceConnectionState);
};
pc.onicecandidate = (e) => {
console.log('onicecandidate', e.candidate);
};
pc.addTransceiver('video', { direction: 'recvonly' });
pc.addTransceiver('audio', { direction: 'recvonly' });
// const dc = pc.createDataChannel('sdp');
const offer = await pc.createOffer();
const localSdp = offer.sdp;
document.getElementById('localSdp').innerText = localSdp;
await pc.setLocalDescription(offer);
const searchParams = new URLSearchParams(location.search);
const streamPath = searchParams.get('streamPath') || 'live/webrtc';
searchParams.delete('streamPath')
const result = await fetch(
`/erwscascade/proxy/webrtc/play/${streamPath}${searchParams.lenght?`?${searchParams.toString()}`:''}`,
{
method: 'POST',
mode: 'cors',
cache: 'no-cache',
credentials: 'include',
redirect: 'follow',
referrerPolicy: 'no-referrer',
headers: { 'Content-Type': 'application/sdp' },
body: offer.sdp,
},
);
const remoteSdp = await result.text();
document.getElementById('remoteSdp').innerText = remoteSdp;
await pc.setRemoteDescription(
new RTCSessionDescription({ type: 'answer', sdp: remoteSdp }),
);
// dc.onmessage = async (e) => {
// await pc.setRemoteDescription(
// new RTCSessionDescription({ type: 'answer', sdp: e.data }),
// );
// };
// const publish = async () => {
// videoTransceiver.direction = 'sendonly';
// audioTransceiver.direction = 'sendonly';
// const offer = await pc.createOffer();
// await pc.setLocalDescription(offer);
// dc.send('1' + offer.sdp);
// action = unpublish;
// document.getElementById('sw').innerText = 'unpublish';
// };
// const unpublish = async () => {
// videoTransceiver.direction = 'inactive';
// audioTransceiver.direction = 'inactive';
// const offer = await pc.createOffer();
// await pc.setLocalDescription(offer);
// dc.send('0' + offer.sdp);
// action = publish;
// document.getElementById('sw').innerText = 'publish';
// };
// action = unpublish;
})()
</script>
</html>

113
ui/subscribe.html Normal file
View File

@@ -0,0 +1,113 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>测试WebRTC拉流</title>
<style>
</style>
</head>
<body>
<video id="video" width="640" height="480" autoplay muted controls>
</video>
<!-- <button id="sw" onclick="action()" type="button" style="width:100px;height:30px;display: block;">unpublish</button> -->
<pre>
<div style="display: flex;" style="width: 100vw;">
<div style="width: 50vw; padding-left: 20px; border-left: 1px solid red;">
<text>local</text>
<code id="localSdp">
</code>
</div>
<div style="width: 50vw; padding-left: 20px; border-left: 1px solid red;">
<text>remote</text>
<code id="remoteSdp" >
</code>
</div>
</div>
</pre>
</body>
<script>
let action = () => { console.log('action not set'); };
(async () => {
const pc = new RTCPeerConnection({
bundlePolicy: "max-bundle" ,
iceServers: [
// {
// //urls: 'stun:stun.l.google.com:19302'
// urls: "stun:erp.jshwx.com.cn:3478",
// },
// {
// urls: "turn:erp.jshwx.com.cn:3478",
// credential: "ert123",
// username: "ert"
// }
{
urls: "stun:stream.jshwx.com.cn:3478",
},
{
urls: "turn:stream.jshwx.com.cn:3478",
credential: "hwx",
username: "jshwx123"
}
]
});
pc.ontrack = (e) => {
console.log('ontrack', e);
if (e.streams.length === 0) return;
document.getElementById('video').srcObject = e.streams[0];
document.getElementById('video').play();
};
pc.oniceconnectionstatechange = () => {
console.log('oniceconnectionstatechange', pc.iceConnectionState);
};
pc.onicecandidate = (e) => {
console.log('onicecandidate', e.candidate);
};
pc.addTransceiver('video', { direction: 'recvonly' });
pc.addTransceiver('audio', { direction: 'recvonly' });
// const dc = pc.createDataChannel('sdp');
const offer = await pc.createOffer();
const localSdp = offer.sdp;
document.getElementById('localSdp').innerText = localSdp;
await pc.setLocalDescription(offer);
const searchParams = new URLSearchParams(location.search);
const streamPath = searchParams.get('streamPath') || 'live/webrtc';
searchParams.delete('streamPath')
if (searchParams.has("cid")){
searchParams.delete("cid")
}
const result = await fetch(
`/erwscascade/proxy/webrtc/play/${streamPath}${searchParams.lenght?`?${searchParams.toString()}`:''}`,
{
method: 'POST',
mode: 'cors',
cache: 'no-cache',
credentials: 'include',
redirect: 'follow',
referrerPolicy: 'no-referrer',
headers: { 'Content-Type': 'application/sdp' },
body: offer.sdp,
},
);
const remoteSdp = await result.text();
document.getElementById('remoteSdp').innerText = remoteSdp;
await pc.setRemoteDescription(
new RTCSessionDescription({ type: 'answer', sdp: remoteSdp }),
);
})()
</script>
</html>

355
wscPusher.go Normal file
View File

@@ -0,0 +1,355 @@
package erwscascade
import (
"context"
"crypto/tls"
"errors"
"net"
"strconv"
"sync"
"time"
"github.com/gobwas/ws"
"github.com/gobwas/ws/wsutil"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
. "m7s.live/engine/v4"
"m7s.live/engine/v4/codec"
"m7s.live/engine/v4/util"
)
/**
subordinate (下级平台 订阅自己的 flv 流) ws 推送 到上级平台
streamPath: original
**/
type WscPusher struct {
Subscriber
Pusher
mutex sync.Mutex
Conn *net.Conn
Status int
Cc *ErWsCascadeConfig
absTS uint32 //绝对时间戳
buf util.Buffer
pool util.BytesPool
connectCount int // 统计链接次数
}
func NewWscPusher(cc *ErWsCascadeConfig) *WscPusher {
return &WscPusher{
Conn: nil,
Cc: cc,
Status: 0,
connectCount: 0,
buf: util.Buffer(make([]byte, len(codec.FLVHeader))),
pool: make(util.BytesPool, 17),
// buf: util.Buffer(make([]byte, len(codec.FLVHeader))),
// pool: make(util.BytesPool, 17),
}
}
func (pusher *WscPusher) Disconnect() {
// if pusher.Closer != nil {
// pusher.Closer.Close()
// }
pusher.mutex.Lock()
defer pusher.mutex.Unlock()
if pusher.Status == 0 {
return
}
pusher.Status = 0
//客户端主动断开webscocket 链接
if pusher.Conn != nil {
pusher.Info("WscPusher Disconnect to close ws connect")
(*pusher.Conn).Close()
pusher.Conn = nil
}
//移除订阅者
stream := pusher.GetSubscriber().Stream
stream.Receive(Unsubscribe(pusher)) // 通知stream移除订阅者
pusher.Subscriber.Stop()
}
//自动重连问题需要,需要修改 engin pusher.go badPusher 判断返回问题
func (pusher *WscPusher) Connect() (err error) {
pusher.connectCount++
// 创建自定义的 TLS 配置
tlsConfig := &tls.Config{
InsecureSkipVerify: true,
}
// 创建 Dialer
dialer := ws.Dialer{
TLSConfig: tlsConfig,
}
url := pusher.RemoteURL + "?cid=" + pusher.Cc.Cid
url += "&streamPath=" + pusher.StreamPath
pusher.Info("WscPusher try connect times:"+strconv.Itoa(pusher.connectCount), zap.String("remoteURL", url))
conn, _, _, err := dialer.Dial(context.Background(), url)
if err != nil {
pusher.Error("WscPusher connect faild", zap.Error(err))
return err
}
//
pusher.SetParentCtx(context.Background()) //注入context
pusher.RemoteAddr = url
//conn.SetWriteDeadline(time.Now().Add(pusher.WriteTimeout))
pusher.SetIO(conn)
pusher.Conn = &conn
pusher.Status = 1
//发送FlvHeader
pusher.WriteFlvHeader()
return nil
}
func (sub *WscPusher) WriteFlvHeader() {
at, vt := sub.Audio, sub.Video
hasAudio, hasVideo := at != nil, vt != nil
var amf util.AMF
amf.Marshal("onMetaData")
metaData := util.EcmaArray{
"MetaDataCreator": "m7s" + Engine.Version,
"hasVideo": hasVideo,
"hasAudio": hasAudio,
"hasMatadata": true,
"canSeekToEnd": false,
"duration": 0,
"hasKeyFrames": 0,
"framerate": 0,
"videodatarate": 0,
"filesize": 0,
}
var flags byte
if hasAudio {
flags |= (1 << 2)
metaData["audiocodecid"] = int(at.CodecID)
metaData["audiosamplerate"] = at.SampleRate
metaData["audiosamplesize"] = at.SampleSize
metaData["stereo"] = at.Channels == 2
}
if hasVideo {
flags |= 1
metaData["videocodecid"] = int(vt.CodecID)
metaData["width"] = vt.SPSInfo.Width
metaData["height"] = vt.SPSInfo.Height
}
amf.Marshal(metaData)
// 写入FLV头
if err := wsutil.WriteClientBinary(sub, []byte{'F', 'L', 'V', 0x01, flags, 0, 0, 0, 9, 0, 0, 0, 0}); err != nil {
sub.OnConnErr(zap.Error(err))
}
//codec.WriteFLVTag(sub, codec.FLV_TAG_TYPE_SCRIPT, 0, amf.Buffer)
//发送自定义FLV_TAG_TYPE_SCRIPT 脚本信息
buffers := codec.AVCC2FLV(codec.FLV_TAG_TYPE_SCRIPT, 0, amf.Buffer)
var data []byte
for _, buf := range buffers {
data = append(data, buf...)
}
if err := wsutil.WriteClientBinary(sub, data); err != nil {
sub.OnConnErr(zap.Error(err))
}
}
func (pusher *WscPusher) PlayFlv() {
go pusher.Subscriber.PlayFLV()
}
// 预留双向通信
func (pusher *WscPusher) ReadFLVTag() {
var startTs uint32
offsetTs := pusher.absTS
for {
//读取掩码头部信息
_, err := ws.ReadHeader(*pusher.Conn)
if err != nil {
// 处理读取头部信息错误
pusher.OnConnErr(zap.Error(err))
break
}
//log.Println(header)
//{"error": "frames from client to server must be masked"}
data, err := wsutil.ReadServerBinary(*pusher.Conn)
if err != nil {
//延迟10秒
pusher.OnConnErr(zap.Error(err))
break
}
// 解码数据
// mask := header.Mask
// for i, b := range data {
// data[i] = b ^ mask[i%4]
// }
t, timestamp, payload, err := pusher.ParseFLVTag(data)
if err != nil {
pusher.OnConnErr(zap.Error(err))
}
if startTs == 0 {
startTs = timestamp
}
pusher.absTS = offsetTs + (timestamp - startTs)
var frame util.BLL
mem := pusher.pool.Get(int(len(payload)))
frame.Push(mem)
//mem.Value = payload
copy(mem.Value, payload)
//log.Printf("type:%v, absTS:%v timestamp:%v\n", t, recever.absTS, timestamp)
switch t {
case codec.FLV_TAG_TYPE_AUDIO:
//recever.WriteAVCCAudio(recever.absTS, &frame, recever.pool)
case codec.FLV_TAG_TYPE_VIDEO:
//recever.WriteAVCCVideo(recever.absTS, &frame, recever.pool)
case codec.FLV_TAG_TYPE_SCRIPT:
//recever.Info("script", zap.ByteString("data", payload))
//frame.Recycle()
}
}
pusher.Info("WscPusher ReadFLVTag end...")
}
func (pusher *WscPusher) Push() (err error) {
pusher.Info("WscPusher try PlayFlv push...")
pusher.PlayFlv()
//这个循环对应Push 接口很重要否则会进入不断重连
//阻塞等待接收服务回传的flv tag 应用与后期开发平台级联对讲功能
go pusher.ReadFLVTag()
//pusher.Info("WscPusher PlayFlv end...")
for pusher.Status != 0 {
//判断
time.Sleep(15 * time.Second)
stream := pusher.GetSubscriber().Stream
//判断订阅的流是否关闭,如果关闭则,断开当前链接重新链接
//流被关闭,指的是流的发布者
if stream.IsClosed() {
pusher.OnConnErr(zap.Error(errors.New("stream close")))
}
//订阅者是否正常播放
if !pusher.IsPlaying() {
pusher.OnConnErr(zap.Error(errors.New("stream sub not palying")))
}
//pusher.Info(fmt.Sprintf("WscPusher push stream IsClosed:%v", stream.IsClosed()))
}
pusher.Info("WscPusher Push end...")
return nil
}
func (pusher *WscPusher) IsClosed() bool {
return pusher.Status != 1
}
// 统一处理读写错误
func (pusher *WscPusher) OnConnErr(reason ...zapcore.Field) {
pusher.Error("WscPusher OnConnErr", reason[0])
//停止订阅
//pusher.Stop(reason[0])
pusher.Disconnect()
}
func (pusher *WscPusher) ParseFLVTag(b []byte) (t byte, timestamp uint32, payload []byte, err error) {
if len(b) < 11 {
return 0, 0, nil, errors.New("FLV Tag data is too short")
}
t = b[0] // Tag类型
dataSize := uint32(b[1])<<16 | uint32(b[2])<<8 | uint32(b[3]) // 数据部分大小
timestamp = uint32(b[4])<<16 | uint32(b[5])<<8 | uint32(b[6]) // 时间戳
timestamp |= uint32(b[7]) << 24 // 扩展时间戳
//streamID := uint32(b[8])<<16 | uint32(b[9])<<8 | uint32(b[10]) // 流ID 默认为0
payload = b[11 : 11+dataSize] // 数据payload
//log.Printf("dataSize:%v ,buf len:%v", dataSize+1, len(b))
return t, timestamp, payload, nil
}
func (pusher *WscPusher) WriteFLVTag(tag FLVFrame) {
//pusher.Info("try WriteFLVTag...")
if pusher.Status == 0 {
//pusher.Info("WriteFLVTag status not ready...")
return
}
// 掩码密钥
mask := [4]byte{0x12, 0x34, 0x56, 0x78}
// 将FLVFrame转换为net.Buffers类型
buffers := net.Buffers(tag)
// 逐个取出字节缓冲区中的内容,并写入单独的字节切片
var data []byte
for _, buf := range buffers {
data = append(data, buf...)
}
//for debug
// t, timestamp, _, err := pusher.ParseFLVTag(data)
// if err != nil {
// pusher.OnConnErr(zap.Error(err))
// } else {
// //log.Println("type:", t)
// //log.Println("timestamp:", timestamp)
// //log.Println("payload:", payload)
// }
// 对FLV数据集进行掩码处理
// maskedFLVData := make([]byte, len(data))
// for i, b := range data {
// maskedFLVData[i] = b ^ mask[i%4]
// }
//{"error": "frames from client to server must be masked"}
//需要注意websocket 客户端主动向服务器发送二进制消息需要添加掩码否则服务器接收数据会提示错误frames from client to server must be masked
//发送ws.Header 掩码信息后我局域网测试可以骗过服务器实际上没有对flv 进行掩码,如果测试不行则打开前面注释,对数据进行掩码处理
//
if err := ws.WriteHeader(pusher, ws.Header{
Fin: true,
OpCode: ws.OpBinary,
Masked: true,
Mask: mask, //[4]byte{byte(rand.Intn(256)), byte(rand.Intn(256)), byte(rand.Intn(256)), byte(rand.Intn(256))},
Length: int64(len(data)),
}); err != nil {
pusher.OnConnErr(zap.Error(err))
return
}
if err := wsutil.WriteClientBinary(pusher, data); err != nil {
pusher.OnConnErr(zap.Error(err))
}
}
func (pusher *WscPusher) OnEvent(event any) {
switch v := event.(type) {
case ISubscriber:
case FLVFrame:
pusher.WriteFLVTag(v)
default:
pusher.Subscriber.OnEvent(event)
}
}

241
wssRecever.go Normal file
View File

@@ -0,0 +1,241 @@
package erwscascade
import (
"errors"
"net"
"net/http"
"net/url"
"strings"
"github.com/gobwas/ws"
"github.com/gobwas/ws/wsutil"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
. "m7s.live/engine/v4"
"m7s.live/engine/v4/codec"
"m7s.live/engine/v4/util"
)
/**
Superior(上级平台) ws 接收推送 flv 视频流,发布为自己的流
streamPath: original-cid //原始流path 拼接cid
**/
type WssRecever struct {
Publisher
Cid string // 与配置文件中cid 不同
Conn *net.Conn
Status int
absTS uint32 //绝对时间戳
buf util.Buffer
pool util.BytesPool
Cc *ErWsCascadeConfig
}
func NewWssRecever(cc *ErWsCascadeConfig, cid string, conn *net.Conn) *WssRecever {
return &WssRecever{
Cc: cc,
Cid: cid,
Conn: conn,
buf: util.Buffer(make([]byte, len(codec.FLVHeader))),
pool: make(util.BytesPool, 17),
}
}
// 统一处理读写错误
func (recever *WssRecever) OnConnErr(reason ...zapcore.Field) {
ErWsCascadePlugin.Error("WssRecever OnConnErr", reason[0])
recever.Status = 0
//停止发布
recever.Stop(reason[0])
}
func (recever *WssRecever) ParseFLVTag(b []byte) (t byte, timestamp uint32, payload []byte, err error) {
if len(b) < 11 {
return 0, 0, nil, errors.New("FLV Tag data is too short")
}
t = b[0] // Tag类型
dataSize := uint32(b[1])<<16 | uint32(b[2])<<8 | uint32(b[3]) // 数据部分大小
timestamp = uint32(b[4])<<16 | uint32(b[5])<<8 | uint32(b[6]) // 时间戳
timestamp |= uint32(b[7]) << 24 // 扩展时间戳
//streamID := uint32(b[8])<<16 | uint32(b[9])<<8 | uint32(b[10]) // 流ID
payload = b[11 : 11+dataSize] // 数据payload
//log.Printf("dataSize:%v ,buf len:%v", dataSize+1, len(b))
return t, timestamp, payload, nil
}
func (recever *WssRecever) ReadFLVTag() {
var startTs uint32
offsetTs := recever.absTS
for {
//读取掩码头部信息
_, err := ws.ReadHeader(*recever.Conn)
if err != nil {
// 处理读取头部信息错误
recever.OnConnErr(zap.Error(err))
break
}
//log.Println(header)
//{"error": "frames from client to server must be masked"}
data, err := wsutil.ReadClientBinary(*recever.Conn)
if err != nil {
//延迟10秒
recever.OnConnErr(zap.Error(err))
break
}
// 解码数据
// mask := header.Mask
// for i, b := range data {
// data[i] = b ^ mask[i%4]
// }
t, timestamp, payload, err := recever.ParseFLVTag(data)
if err != nil {
recever.OnConnErr(zap.Error(err))
}
if startTs == 0 {
startTs = timestamp
}
recever.absTS = offsetTs + (timestamp - startTs)
var frame util.BLL
mem := recever.pool.Get(int(len(payload)))
frame.Push(mem)
//mem.Value = payload
copy(mem.Value, payload)
//log.Printf("type:%v, absTS:%v timestamp:%v\n", t, recever.absTS, timestamp)
switch t {
case codec.FLV_TAG_TYPE_AUDIO:
recever.WriteAVCCAudio(recever.absTS, &frame, recever.pool)
case codec.FLV_TAG_TYPE_VIDEO:
recever.WriteAVCCVideo(recever.absTS, &frame, recever.pool)
case codec.FLV_TAG_TYPE_SCRIPT:
recever.Info("script", zap.ByteString("data", payload))
//frame.Recycle()
}
}
}
/*
推送flv 数据
上级平台推送接口
ws://127.0.0.1:8450/erwscascade/wspush/on
*/
func (p *ErWsCascadeConfig) Wspush_(w http.ResponseWriter, r *http.Request) {
if r.Header.Get("Upgrade") != "websocket" {
return
}
//customHeader := r.Header.Get("X-Custom-Header")
// 获取完整的 URL
fullURL := r.URL.String()
//判断URL编码特殊字符%2F,%2B,%3F,%25包含这些则自动转码?= 等
ErWsCascadePlugin.Info("Full URL:" + fullURL)
// 获取 cid 参数
//cid := r.URL.Query().Get("cid")
// 解码 URL
decodedURL, err := url.QueryUnescape(fullURL)
if err != nil {
ErWsCascadePlugin.Error("Error decodedURL", zap.Error(err))
return
}
// 手动解析解码后的 URL 中的参数
var queryString string
if strings.Contains(decodedURL, "?") {
queryString = strings.Split(decodedURL, "?")[1]
}
queryParams, err := url.ParseQuery(queryString)
if err != nil {
ErWsCascadePlugin.Error("parse query string", zap.Error(err))
return
}
cid := queryParams.Get("cid")
ErWsCascadePlugin.Info("wspush CID:" + cid)
// 配置WebSocket服务器选项
conn, _, _, err := ws.UpgradeHTTP(r, w)
if err != nil {
ErWsCascadePlugin.Error("UpgradeHTTP error", zap.Error(err))
return
}
if cid == "" {
ErWsCascadePlugin.Error("wspush", zap.Error(errors.New("invalid cid refuse connect")))
conn.Close()
return
}
streamPath := queryParams.Get("streamPath")
if streamPath == "" {
ErWsCascadePlugin.Error("wspush", zap.Error(errors.New("invalid streamPath refuse connect")))
conn.Close()
return
}
//生成客户推送的streamPath 修改添加 cid
newStreamPath := streamPath + "-" + cid
//read flv head
head, err := wsutil.ReadClientBinary(conn)
if err != nil {
ErWsCascadePlugin.Error("wspush read flv head faild", zap.Error(err))
conn.Close()
return
}
if head[0] != 'F' || head[1] != 'L' || head[2] != 'V' {
ErWsCascadePlugin.Error("wspush", zap.Error(errors.New("invalid flv head faild")))
conn.Close()
return
}
configCopy := p.GetPublishConfig()
configCopy.PubAudio = head[4]&0x04 != 0
configCopy.PubVideo = head[4]&0x01 != 0
//读取校验自定义脚本
sc, err := wsutil.ReadClientBinary(conn)
if err != nil {
ErWsCascadePlugin.Error("wspush", zap.Error(errors.New("read FLV_TAG_TYPE_SCRIPT faild")))
conn.Close()
return
}
if sc[0] != codec.FLV_TAG_TYPE_SCRIPT {
ErWsCascadePlugin.Error("wspush", zap.Error(errors.New("parse FLV_TAG_TYPE_SCRIPT faild")))
conn.Close()
return
}
wssRecever := NewWssRecever(p, cid, &conn)
wssRecever.Config = &configCopy
s := Streams.Get(newStreamPath)
if s == nil || s.Publisher == nil {
if err := ErWsCascadePlugin.Publish(newStreamPath, wssRecever); err != nil {
//p.Stream.Tracks =
ErWsCascadePlugin.Error("wspush", zap.Error(err))
conn.Close()
return
}
puber := wssRecever.GetPublisher()
// 老流中的音视频轨道不可再使用
puber.AudioTrack = nil
puber.VideoTrack = nil
//log.Println("Wspush publish tm:", wssRecever.Publisher.Stream.PublishTimeout)
}
//阻塞读取数据
wssRecever.ReadFLVTag()
}