feat: add batch v2 to webrtc

This commit is contained in:
langhuihui
2025-04-17 20:36:05 +08:00
committed by dexter
parent 5f4815c7ed
commit d25f85f9a3
11 changed files with 2672 additions and 214 deletions

View File

@@ -1,14 +1,12 @@
package plugin_webrtc
import (
"encoding/json"
"fmt"
"io"
"net/http"
"strings"
. "github.com/pion/webrtc/v4"
"m7s.live/v5/pkg/task"
. "m7s.live/v5/plugin/webrtc/pkg"
)
@@ -37,10 +35,10 @@ func (conf *WebRTCPlugin) servePush(w http.ResponseWriter, r *http.Request) {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
conn := Connection{
conn := MultipleConnection{
PLI: conf.PLI,
SDP: string(bytes),
}
conn.SDP = string(bytes)
conn.Logger = conf.Logger
if conn.PeerConnection, err = conf.api.NewPeerConnection(Configuration{
ICEServers: conf.ICEServers,
@@ -75,7 +73,7 @@ func (conf *WebRTCPlugin) servePlay(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/sdp")
streamPath := r.PathValue("streamPath")
rawQuery := r.URL.RawQuery
var conn Connection
var conn MultipleConnection
conn.EnableDC = conf.EnableDC
bytes, err := io.ReadAll(r.Body)
defer func() {
@@ -114,140 +112,3 @@ func (conf *WebRTCPlugin) servePlay(w http.ResponseWriter, r *http.Request) {
http.Error(w, err.Error(), http.StatusBadRequest)
}
}
// Batch 通过单个 PeerConnection 实现多个流的推拉
func (conf *WebRTCPlugin) Batch(w http.ResponseWriter, r *http.Request) {
conn := NewSingleConnection()
conn.EnableDC = true // Enable DataChannel for signaling
conn.Logger = conf.Logger
bytes, err := io.ReadAll(r.Body)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
conn.SDP = string(bytes)
if conn.PeerConnection, err = conf.api.NewPeerConnection(Configuration{
ICEServers: conf.ICEServers,
}); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
// Create data channel for signaling
conn.PeerConnection.OnDataChannel(func(dataChannel *DataChannel) {
conf.Debug("data channel created", "label", dataChannel.Label())
dataChannel.OnMessage(func(msg DataChannelMessage) {
conf.Debug("received data channel message", "length", len(msg.Data), "is_string", msg.IsString)
var signal Signal
if err := json.Unmarshal(msg.Data, &signal); err != nil {
conf.Error("failed to unmarshal signal", "error", err)
return
}
conf.Debug("signal received", "type", signal.Type, "stream_path", signal.StreamPath)
switch signal.Type {
case SignalTypePublish:
if publisher, err := conf.Publish(conf.Context, signal.StreamPath); err == nil {
conn.Publisher = publisher
conn.Publisher.RemoteAddr = r.RemoteAddr
conn.Receive()
// Renegotiate SDP after successful publish
if answer, err := conn.GetAnswer(); err == nil {
answerSignal := NewAnswerSingal(answer.SDP)
conf.Debug("sending answer signal", "stream_path", signal.StreamPath)
dataChannel.SendText(answerSignal)
} else {
errSignal := NewErrorSignal(err.Error(), signal.StreamPath)
conf.Debug("sending error signal", "error", err.Error(), "stream_path", signal.StreamPath)
dataChannel.SendText(errSignal)
}
} else {
errSignal := NewErrorSignal(err.Error(), signal.StreamPath)
conf.Debug("sending error signal", "error", err.Error(), "stream_path", signal.StreamPath)
dataChannel.SendText(errSignal)
}
case SignalTypeSubscribe:
if err := conn.SetRemoteDescription(SessionDescription{
Type: SDPTypeOffer,
SDP: signal.Offer,
}); err != nil {
errSignal := NewErrorSignal("Failed to set remote description: "+err.Error(), "")
conf.Debug("sending error signal", "error", err.Error())
dataChannel.SendText(errSignal)
return
}
// First remove subscribers that are not in the new list
for streamPath := range conn.Subscribers {
found := false
for _, newPath := range signal.StreamList {
if streamPath == newPath {
found = true
break
}
}
if !found {
conn.RemoveSubscriber(streamPath)
}
}
// Then add new subscribers
for _, streamPath := range signal.StreamList {
// Skip if already subscribed
if conn.HasSubscriber(streamPath) {
continue
}
if subscriber, err := conf.Subscribe(conf.Context, streamPath); err == nil {
subscriber.RemoteAddr = r.RemoteAddr
conn.AddSubscriber(streamPath, subscriber)
} else {
errSignal := NewErrorSignal(err.Error(), streamPath)
conf.Debug("sending error signal", "error", err.Error(), "stream_path", streamPath)
dataChannel.SendText(errSignal)
}
}
case SignalTypeUnpublish:
// Handle stream removal
if conn.Publisher != nil && conn.Publisher.StreamPath == signal.StreamPath {
conn.Publisher.Stop(task.ErrStopByUser)
conn.Publisher = nil
// Renegotiate SDP after unpublish
if answer, err := conn.GetAnswer(); err == nil {
answerSignal := NewAnswerSingal(answer.SDP)
conf.Debug("sending answer signal", "stream_path", signal.StreamPath)
dataChannel.SendText(answerSignal)
} else {
errSignal := NewErrorSignal(err.Error(), signal.StreamPath)
conf.Debug("sending error signal", "error", err.Error(), "stream_path", signal.StreamPath)
dataChannel.SendText(errSignal)
}
}
case SignalTypeAnswer:
// Handle received answer from browser
if err := conn.SetRemoteDescription(SessionDescription{
Type: SDPTypeAnswer,
SDP: signal.Answer,
}); err != nil {
errSignal := NewErrorSignal("Failed to set remote description: "+err.Error(), "")
conf.Debug("sending error signal", "error", err.Error())
dataChannel.SendText(errSignal)
}
}
})
})
conf.AddTask(conn)
if err = conn.WaitStarted(); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if err = conn.SetRemoteDescription(SessionDescription{Type: SDPTypeOffer, SDP: conn.SDP}); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if answer, err := conn.GetAnswer(); err == nil {
w.Header().Set("Content-Type", "application/sdp")
w.Write([]byte(answer.SDP))
} else {
http.Error(w, err.Error(), http.StatusBadRequest)
}
}

376
plugin/webrtc/batchv2.go Normal file
View File

@@ -0,0 +1,376 @@
package plugin_webrtc
import (
"encoding/json"
"net"
"net/http"
"strings"
"github.com/gobwas/ws"
"github.com/gobwas/ws/wsutil"
. "github.com/pion/webrtc/v4"
"m7s.live/v5/pkg/codec"
"m7s.live/v5/pkg/task"
. "m7s.live/v5/plugin/webrtc/pkg"
)
// BatchV2 通过WebSocket方式实现单PeerConnection传输多个流的功能
func (conf *WebRTCPlugin) BatchV2(w http.ResponseWriter, r *http.Request) {
// 检查是否是WebSocket请求
if r.Header.Get("Upgrade") != "websocket" {
http.Error(w, "WebSocket protocol required", http.StatusBadRequest)
return
}
// 升级HTTP连接为WebSocket连接
wsConn, _, _, err := ws.UpgradeHTTP(r, w)
if err != nil {
conf.Error("failed to upgrade to WebSocket", "error", err)
http.Error(w, "Failed to upgrade to WebSocket: "+err.Error(), http.StatusInternalServerError)
return
}
// 创建一个WebSocket处理器
wsHandler := &WebSocketHandler{
conn: wsConn,
config: conf,
}
// 创建PeerConnection并设置高级配置
if wsHandler.PeerConnection, err = conf.api.NewPeerConnection(Configuration{
// 本地测试不需要配置 ICE 服务器
ICETransportPolicy: ICETransportPolicyAll,
BundlePolicy: BundlePolicyMaxBundle,
RTCPMuxPolicy: RTCPMuxPolicyRequire,
ICECandidatePoolSize: 1,
}); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
// 添加任务
conf.AddTask(wsHandler).WaitStopped()
}
// WebSocketHandler 处理WebSocket连接和信令
type WebSocketHandler struct {
SingleConnection
conn net.Conn
config *WebRTCPlugin
}
// Go 处理WebSocket消息
func (wsh *WebSocketHandler) Go() (err error) {
var msg []byte
// 等待初始SDP offer
msg, err = wsutil.ReadClientText(wsh.conn)
if err != nil {
return err
}
// 解析初始SDP offer
var initialSignal struct {
Type string `json:"type"`
SDP string `json:"sdp"`
}
if err = json.Unmarshal(msg, &initialSignal); err != nil {
return err
}
if initialSignal.Type != "offer" {
return wsh.sendError("Initial message must be an SDP offer")
}
// 设置远程描述
wsh.SDP = initialSignal.SDP
// 验证SDP是否包含ICE ufrag
if !wsh.validateSDP(initialSignal.SDP) {
return wsh.sendError("Invalid SDP: missing ICE credentials")
}
// 设置远程描述
if err = wsh.SetRemoteDescription(SessionDescription{
Type: SDPTypeOffer,
SDP: initialSignal.SDP,
}); err != nil {
wsh.Error("Failed to set remote description", "error", err)
return err
}
// 创建并发送应答
if answer, err := wsh.GetAnswer(); err == nil {
wsh.sendAnswer(answer.SDP)
} else {
return err
}
wsh.Info("WebSocket connection established")
for {
msg, err := wsutil.ReadClientText(wsh.conn)
if err != nil {
wsh.Error("WebSocket read error", "error", err)
return err
}
var signal Signal
if err := json.Unmarshal(msg, &signal); err != nil {
wsh.Error("Failed to unmarshal signal", "error", err)
wsh.sendError("Invalid signal format: " + err.Error())
continue
}
wsh.Debug("Signal received", "type", signal.Type, "stream_path", signal.StreamPath)
switch signal.Type {
case SignalTypePublish:
wsh.handlePublish(signal)
case SignalTypeSubscribe:
wsh.handleSubscribe(signal)
case SignalTypeUnsubscribe:
wsh.handleUnsubscribe(signal)
case SignalTypeUnpublish:
wsh.handleUnpublish(signal)
case SignalTypeAnswer:
wsh.handleAnswer(signal)
case SignalTypeGetStreamList:
wsh.handleGetStreamList()
default:
wsh.sendError("Unknown signal type: " + string(signal.Type))
}
}
}
// Dispose 清理资源
func (wsh *WebSocketHandler) Dispose() {
wsh.PeerConnection.Close()
wsh.conn.Close()
}
// sendJSON 发送JSON消息
func (wsh *WebSocketHandler) sendJSON(data interface{}) error {
jsonData, err := json.Marshal(data)
if err != nil {
return err
}
return wsutil.WriteServerText(wsh.conn, jsonData)
}
// sendAnswer 发送SDP应答
func (wsh *WebSocketHandler) sendAnswer(sdp string) error {
return wsh.sendJSON(SignalSDP{
Type: "answer",
SDP: sdp,
})
}
// sendError 发送错误消息
func (wsh *WebSocketHandler) sendError(message string) error {
return wsh.sendJSON(SignalError{
Type: "error",
Message: message,
})
}
// handlePublish 处理发布信号
func (wsh *WebSocketHandler) handlePublish(signal Signal) {
if publisher, err := wsh.config.Publish(wsh.config.Context, signal.StreamPath); err == nil {
wsh.Publisher = publisher
wsh.Receive()
// 重新协商SDP
if answer, err := wsh.GetAnswer(); err == nil {
wsh.sendAnswer(answer.SDP)
} else {
wsh.sendError(err.Error())
}
} else {
wsh.sendError(err.Error())
}
}
// handleSubscribe 处理订阅信号
func (wsh *WebSocketHandler) handleSubscribe(signal Signal) {
// 验证SDP是否包含ICE ufrag
if !wsh.validateSDP(signal.Offer) {
wsh.sendError("Invalid SDP: missing ICE credentials")
return
}
wsh.Debug("Received subscribe request", "streams", signal.StreamList)
// 设置远程描述
if err := wsh.SetRemoteDescription(SessionDescription{
Type: SDPTypeOffer,
SDP: signal.Offer,
}); err != nil {
wsh.sendError("Failed to set remote description: " + err.Error())
return
}
// 只添加新的订阅不处理移除操作移除操作由unsubscribe信号处理
for _, streamPath := range signal.StreamList {
// 跳过已订阅的流
if wsh.HasSubscriber(streamPath) {
continue
}
conf := wsh.config.GetCommonConf().Subscribe
// Disable audio as it's not needed in batchv2
conf.SubAudio = false
if subscriber, err := wsh.config.SubscribeWithConfig(wsh.config.Context, streamPath, conf); err == nil {
subscriber.RemoteAddr = wsh.RemoteAddr()
wsh.AddSubscriber(subscriber).WaitStarted()
wsh.Info("Subscribed to new stream", "stream", streamPath)
} else {
wsh.sendError(err.Error())
}
}
// 发送应答
if answer, err := wsh.GetAnswer(); err == nil {
wsh.Info("Created answer for subscribe request", "streams", signal.StreamList)
// 记录应答SDP中的编解码器信息
if strings.Contains(answer.SDP, "H264") {
wsh.Debug("Answer contains H264 codec")
// 提取profile-level-id和sprop-parameter-sets
if strings.Contains(answer.SDP, "profile-level-id=") {
wsh.Debug("Answer contains profile-level-id")
}
if strings.Contains(answer.SDP, "sprop-parameter-sets=") {
wsh.Debug("Answer contains sprop-parameter-sets")
}
}
wsh.sendAnswer(answer.SDP)
} else {
wsh.Error("Failed to create answer", "error", err)
wsh.sendError("Failed to create answer: " + err.Error())
}
}
// handleUnsubscribe 处理取消订阅信号
func (wsh *WebSocketHandler) handleUnsubscribe(signal Signal) {
// 验证SDP是否包含ICE ufrag
if !wsh.validateSDP(signal.Offer) {
wsh.sendError("Invalid SDP: missing ICE credentials")
return
}
wsh.Debug("Received unsubscribe request", "streams", signal.StreamList)
// 设置远程描述
if err := wsh.SetRemoteDescription(SessionDescription{
Type: SDPTypeOffer,
SDP: signal.Offer,
}); err != nil {
wsh.sendError("Failed to set remote description: " + err.Error())
return
}
// 移除指定的订阅
for _, streamPath := range signal.StreamList {
if wsh.HasSubscriber(streamPath) {
// 获取RemoteStream对象
if remoteStream, ok := wsh.Get(streamPath); ok {
wsh.RemoveSubscriber(remoteStream)
wsh.Info("Unsubscribed from stream", "stream", streamPath)
}
}
}
// 发送应答
if answer, err := wsh.GetAnswer(); err == nil {
wsh.Info("Created answer for unsubscribe request", "streams", signal.StreamList)
wsh.sendAnswer(answer.SDP)
} else {
wsh.Error("Failed to create answer", "error", err)
wsh.sendError("Failed to create answer: " + err.Error())
}
}
// handleUnpublish 处理取消发布信号
func (wsh *WebSocketHandler) handleUnpublish(signal Signal) {
if wsh.Publisher != nil && wsh.Publisher.StreamPath == signal.StreamPath {
wsh.Publisher.Stop(task.ErrStopByUser)
wsh.Publisher = nil
// 重新协商SDP
if answer, err := wsh.GetAnswer(); err == nil {
wsh.sendAnswer(answer.SDP)
} else {
wsh.sendError(err.Error())
}
} else {
wsh.sendError("Not publishing this stream")
}
}
// handleAnswer 处理应答信号
func (wsh *WebSocketHandler) handleAnswer(signal Signal) {
// 验证SDP是否包含ICE ufrag
if !wsh.validateSDP(signal.Answer) {
wsh.sendError("Invalid SDP: missing ICE credentials")
return
}
if err := wsh.SetRemoteDescription(SessionDescription{
Type: SDPTypeAnswer,
SDP: signal.Answer,
}); err != nil {
wsh.sendError("Failed to set remote description: " + err.Error())
}
}
// RemoteAddr 获取远程地址
func (wsh *WebSocketHandler) RemoteAddr() string {
if wsh.conn != nil {
return wsh.conn.RemoteAddr().String()
}
return ""
}
// validateSDP 验证SDP是否包含必要的ICE凭证
func (wsh *WebSocketHandler) validateSDP(sdp string) bool {
// 检查SDP是否为空
if sdp == "" {
return false
}
// 检查SDP是否包含ICE ufrag或pwd
hasUfrag := strings.Contains(sdp, "a=ice-ufrag:")
hasPwd := strings.Contains(sdp, "a=ice-pwd:")
// 在开发环境中,我们可以放宽要求,只要有一个就可以
return hasUfrag || hasPwd
}
// handleGetStreamList 处理获取流列表信号
func (wsh *WebSocketHandler) handleGetStreamList() {
// 获取所有可用的流列表
var streams []StreamInfo
// 遍历所有流检查是否有H.264视频编码
for publisher := range wsh.config.Server.Streams.SafeRange {
// 检查是否有视频轨道
if publisher.HasVideoTrack() {
// 获取视频编解码器上下文
ctx := publisher.GetVideoCodecCtx()
if ctx != nil {
switch ctx := ctx.GetBase().(type) {
case *codec.H264Ctx:
// 获取视频信息
streams = append(streams, StreamInfo{
Path: publisher.StreamPath,
Codec: "H264",
Width: uint32(ctx.Width()),
Height: uint32(ctx.Height()),
Fps: uint32(publisher.VideoTrack.FPS),
})
}
}
}
}
// 发送流列表响应
wsh.sendJSON(StreamListResponse{
Type: "streamList",
Streams: streams,
})
}

View File

@@ -169,6 +169,26 @@ func (p *WebRTCPlugin) testPage(w http.ResponseWriter, r *http.Request) {
name = "web/push.html"
case "pull":
name = "web/pull.html"
case "batchv2":
name = "web/batchv2.html"
default:
name = "web/" + name
}
// Set appropriate MIME type based on file extension
if strings.HasSuffix(name, ".html") {
w.Header().Set("Content-Type", "text/html; charset=utf-8")
} else if strings.HasSuffix(name, ".js") {
w.Header().Set("Content-Type", "application/javascript")
// } else if strings.HasSuffix(name, ".css") {
// w.Header().Set("Content-Type", "text/css")
// } else if strings.HasSuffix(name, ".json") {
// w.Header().Set("Content-Type", "application/json")
// } else if strings.HasSuffix(name, ".png") {
// w.Header().Set("Content-Type", "image/png")
// } else if strings.HasSuffix(name, ".jpg") || strings.HasSuffix(name, ".jpeg") {
// w.Header().Set("Content-Type", "image/jpeg")
// } else if strings.HasSuffix(name, ".svg") {
// w.Header().Set("Content-Type", "image/svg+xml")
}
f, err := web.Open(name)
if err != nil {

View File

@@ -7,10 +7,12 @@ import (
type SignalType string
const (
SignalTypeSubscribe SignalType = "subscribe"
SignalTypePublish SignalType = "publish"
SignalTypeUnpublish SignalType = "unpublish"
SignalTypeAnswer SignalType = "answer"
SignalTypeSubscribe SignalType = "subscribe"
SignalTypeUnsubscribe SignalType = "unsubscribe"
SignalTypePublish SignalType = "publish"
SignalTypeUnpublish SignalType = "unpublish"
SignalTypeAnswer SignalType = "answer"
SignalTypeGetStreamList SignalType = "getStreamList"
)
type Signal struct {
@@ -55,6 +57,19 @@ type SignalError struct {
StreamPath string `json:"streamPath,omitempty"`
}
type StreamInfo struct {
Path string `json:"path"`
Codec string `json:"codec"`
Width uint32 `json:"width"`
Height uint32 `json:"height"`
Fps uint32 `json:"fps"`
}
type StreamListResponse struct {
Type string `json:"type"`
Streams []StreamInfo `json:"streams"`
}
func NewErrorSignal(message string, streamPath string) string {
s := SignalError{
Type: "error",
@@ -64,3 +79,12 @@ func NewErrorSignal(message string, streamPath string) string {
b, _ := json.Marshal(s)
return string(b)
}
func NewStreamListResponse(streams []StreamInfo) string {
s := StreamListResponse{
Type: "streamList",
Streams: streams,
}
b, _ := json.Marshal(s)
return string(b)
}

View File

@@ -14,7 +14,7 @@ type PullRequest struct {
}
type Client struct {
Connection
MultipleConnection
pullCtx m7s.PullJob
pushCtx m7s.PushJob
direction string

View File

@@ -14,7 +14,7 @@ import (
type (
CFClient struct {
Connection
MultipleConnection
pullCtx m7s.PullJob
pushCtx m7s.PushJob
direction string

View File

@@ -4,6 +4,8 @@ import (
"errors"
"fmt"
"net"
"regexp"
"strings"
"time"
"github.com/pion/rtcp"
@@ -19,40 +21,9 @@ import (
)
type Connection struct {
task.Task
*PeerConnection
SDP string
// LocalSDP *sdp.SessionDescription
Publisher *m7s.Publisher
Subscriber *m7s.Subscriber
EnableDC bool
PLI time.Duration
}
func (IO *Connection) Start() (err error) {
if IO.Publisher != nil {
IO.Depend(IO.Publisher)
IO.Receive()
}
if IO.Subscriber != nil {
IO.Depend(IO.Subscriber)
IO.Send()
}
IO.OnICECandidate(func(ice *ICECandidate) {
if ice != nil {
IO.Info(ice.ToJSON().Candidate)
}
})
IO.OnConnectionStateChange(func(state PeerConnectionState) {
IO.Info("Connection State has changed:" + state.String())
switch state {
case PeerConnectionStateConnected:
case PeerConnectionStateDisconnected, PeerConnectionStateFailed, PeerConnectionStateClosed:
IO.Stop(errors.New("connection state:" + state.String()))
}
})
return
Publisher *m7s.Publisher
SDP string
}
func (IO *Connection) GetOffer() (*SessionDescription, error) {
@@ -81,7 +52,50 @@ func (IO *Connection) GetAnswer() (*SessionDescription, error) {
return IO.LocalDescription(), nil
}
func (IO *Connection) Receive() {
type MultipleConnection struct {
task.Task
Connection
// LocalSDP *sdp.SessionDescription
Subscriber *m7s.Subscriber
EnableDC bool
PLI time.Duration
}
func (IO *MultipleConnection) Start() (err error) {
if IO.Publisher != nil {
IO.Depend(IO.Publisher)
IO.Receive()
}
if IO.Subscriber != nil {
IO.Depend(IO.Subscriber)
IO.Send()
}
IO.OnICECandidate(func(ice *ICECandidate) {
if ice != nil {
IO.Info(ice.ToJSON().Candidate)
}
})
// 监听ICE连接状态变化
IO.OnICEConnectionStateChange(func(state ICEConnectionState) {
IO.Debug("ICE connection state changed", "state", state.String())
if state == ICEConnectionStateFailed {
IO.Error("ICE connection failed")
}
})
IO.OnConnectionStateChange(func(state PeerConnectionState) {
IO.Info("Connection State has changed:" + state.String())
switch state {
case PeerConnectionStateConnected:
case PeerConnectionStateDisconnected, PeerConnectionStateFailed, PeerConnectionStateClosed:
IO.Stop(errors.New("connection state:" + state.String()))
}
})
return
}
func (IO *MultipleConnection) Receive() {
puber := IO.Publisher
IO.OnTrack(func(track *TrackRemote, receiver *RTPReceiver) {
IO.Info("OnTrack", "kind", track.Kind().String(), "payloadType", uint8(track.Codec().PayloadType))
@@ -190,18 +204,216 @@ func (IO *Connection) Receive() {
})
}
func (IO *Connection) SendSubscriber(subscriber *m7s.Subscriber) (err error) {
var useDC bool
var audioTLSRTP, videoTLSRTP *TrackLocalStaticRTP
var audioSender, videoSender *RTPSender
vctx, actx := subscriber.Publisher.GetVideoCodecCtx(), subscriber.Publisher.GetAudioCodecCtx()
if IO.EnableDC && vctx != nil && vctx.FourCC() == codec.FourCC_H265 {
useDC = true
}
if IO.EnableDC && actx != nil && actx.FourCC() == codec.FourCC_MP4A {
useDC = true
// H264CodecParams represents the parameters for an H.264 codec
type H264CodecParams struct {
ProfileLevelID string
PacketizationMode string
LevelAsymmetryAllowed string
SpropParameterSets string
OtherParams map[string]string
}
// parseH264Params parses H.264 codec parameters from an fmtp line
func parseH264Params(fmtpLine string) H264CodecParams {
params := H264CodecParams{
OtherParams: make(map[string]string),
}
// Split the fmtp line into key-value pairs
kvPairs := strings.Split(fmtpLine, ";")
for _, kv := range kvPairs {
kv = strings.TrimSpace(kv)
if kv == "" {
continue
}
parts := strings.SplitN(kv, "=", 2)
key := strings.TrimSpace(parts[0])
var value string
if len(parts) > 1 {
value = strings.TrimSpace(parts[1])
}
switch key {
case "profile-level-id":
params.ProfileLevelID = value
case "packetization-mode":
params.PacketizationMode = value
case "level-asymmetry-allowed":
params.LevelAsymmetryAllowed = value
case "sprop-parameter-sets":
params.SpropParameterSets = value
default:
params.OtherParams[key] = value
}
}
return params
}
// extractH264CodecParams extracts all H.264 codec parameters from an SDP
func extractH264CodecParams(sdp string) []H264CodecParams {
var result []H264CodecParams
// Find all fmtp lines for H.264 codecs
// First, find all a=rtpmap lines for H.264
rtpmapRegex := regexp.MustCompile(`a=rtpmap:(\d+) H264/\d+`)
rtpmapMatches := rtpmapRegex.FindAllStringSubmatch(sdp, -1)
for _, rtpmapMatch := range rtpmapMatches {
if len(rtpmapMatch) < 2 {
continue
}
// Get the payload type
payloadType := rtpmapMatch[1]
// Find the corresponding fmtp line
fmtpRegex := regexp.MustCompile(`a=fmtp:` + payloadType + ` ([^\r\n]+)`)
fmtpMatch := fmtpRegex.FindStringSubmatch(sdp)
if len(fmtpMatch) >= 2 {
// Parse the fmtp line
params := parseH264Params(fmtpMatch[1])
result = append(result, params)
}
}
return result
}
// findClosestProfileLevelID finds the closest matching profile-level-id
func findClosestProfileLevelID(availableIDs []string, currentID string) string {
// If current ID is empty, return the first available one
if currentID == "" && len(availableIDs) > 0 {
return availableIDs[0]
}
// If current ID is in the available ones, use it
for _, id := range availableIDs {
if strings.EqualFold(id, currentID) {
return currentID
}
}
// Try to match the profile part (first two characters)
if len(currentID) >= 2 {
currentProfile := currentID[:2]
for _, id := range availableIDs {
if len(id) >= 2 && strings.EqualFold(id[:2], currentProfile) {
return id
}
}
}
// If no match found, return the first available one
if len(availableIDs) > 0 {
return availableIDs[0]
}
// Fallback to the current one
return currentID
}
// findBestMatchingH264Codec finds the best matching H.264 codec configuration
func findBestMatchingH264Codec(sdp string, currentFmtpLine string) string {
// If no SDP or no current fmtp line, return the current one
if sdp == "" || currentFmtpLine == "" {
return currentFmtpLine
}
// Parse current parameters
currentParams := parseH264Params(currentFmtpLine)
// Extract all H.264 codec parameters from the SDP
availableParams := extractH264CodecParams(sdp)
// If no available parameters found, return the current one
if len(availableParams) == 0 {
return currentFmtpLine
}
// Extract all available profile-level-ids
var availableProfileLevelIDs []string
var packetizationModeMap = make(map[string]string)
for _, params := range availableParams {
if params.ProfileLevelID != "" {
availableProfileLevelIDs = append(availableProfileLevelIDs, params.ProfileLevelID)
// Store packetization mode for each profile-level-id
if params.PacketizationMode != "" {
packetizationModeMap[params.ProfileLevelID] = params.PacketizationMode
}
}
}
// Find the closest matching profile-level-id
closestProfileLevelID := findClosestProfileLevelID(availableProfileLevelIDs, currentParams.ProfileLevelID)
// Create result parameters
resultParams := H264CodecParams{
ProfileLevelID: closestProfileLevelID,
SpropParameterSets: currentParams.SpropParameterSets, // Always use original sprop-parameter-sets
LevelAsymmetryAllowed: "1", // Default to 1
}
// Use matching packetization mode if available
if mode, ok := packetizationModeMap[closestProfileLevelID]; ok {
resultParams.PacketizationMode = mode
} else if currentParams.PacketizationMode != "" {
resultParams.PacketizationMode = currentParams.PacketizationMode
} else {
resultParams.PacketizationMode = "1" // Default to 1
}
// Build and return the fmtp line
return buildFmtpLine(resultParams)
}
// buildFmtpLine builds an fmtp line from H.264 codec parameters
func buildFmtpLine(params H264CodecParams) string {
var parts []string
// Add profile-level-id if present
if params.ProfileLevelID != "" {
parts = append(parts, "profile-level-id="+params.ProfileLevelID)
}
// Add packetization-mode if present
if params.PacketizationMode != "" {
parts = append(parts, "packetization-mode="+params.PacketizationMode)
}
// Add level-asymmetry-allowed if present
if params.LevelAsymmetryAllowed != "" {
parts = append(parts, "level-asymmetry-allowed="+params.LevelAsymmetryAllowed)
}
// Add sprop-parameter-sets if present
if params.SpropParameterSets != "" {
parts = append(parts, "sprop-parameter-sets="+params.SpropParameterSets)
}
// Add other parameters
for k, v := range params.OtherParams {
parts = append(parts, k+"="+v)
}
return strings.Join(parts, ";")
}
func (IO *MultipleConnection) SendSubscriber(subscriber *m7s.Subscriber) (audioSender, videoSender *RTPSender, err error) {
var useDC bool
var audioTLSRTP, videoTLSRTP *TrackLocalStaticRTP
vctx, actx := subscriber.Publisher.GetVideoCodecCtx(), subscriber.Publisher.GetAudioCodecCtx()
if IO.EnableDC {
if IO.EnableDC && vctx != nil && vctx.FourCC() == codec.FourCC_H265 {
useDC = true
}
if IO.EnableDC && actx != nil && actx.FourCC() == codec.FourCC_MP4A {
useDC = true
}
}
if vctx != nil && !useDC {
videoCodec := vctx.FourCC()
var rcc RTPCodecParameters
@@ -217,6 +429,20 @@ func (IO *Connection) SendSubscriber(subscriber *m7s.Subscriber) (err error) {
return
}
}
// // For H.264, adjust codec parameters based on SDP
// if rcc.MimeType == MimeTypeH264 && IO.SDP != "" {
// // Find best matching codec configuration
// originalFmtpLine := rcc.SDPFmtpLine
// bestMatchingFmtpLine := findBestMatchingH264Codec(IO.SDP, rcc.SDPFmtpLine)
// // Update the codec parameters if a better match was found
// if bestMatchingFmtpLine != originalFmtpLine {
// rcc.SDPFmtpLine = bestMatchingFmtpLine
// IO.Info("Adjusted H.264 codec parameters", "from", originalFmtpLine, "to", bestMatchingFmtpLine)
// }
// }
videoTLSRTP, err = NewTrackLocalStaticRTP(rcc.RTPCodecCapability, videoCodec.String(), subscriber.StreamPath)
if err != nil {
return
@@ -324,49 +550,148 @@ func (IO *Connection) SendSubscriber(subscriber *m7s.Subscriber) (err error) {
return
}
func (IO *Connection) Send() (err error) {
func (IO *MultipleConnection) Send() (err error) {
if IO.Subscriber != nil {
err = IO.SendSubscriber(IO.Subscriber)
_, _, err = IO.SendSubscriber(IO.Subscriber)
}
return
}
func (IO *Connection) Dispose() {
func (IO *MultipleConnection) Dispose() {
IO.PeerConnection.Close()
}
type RemoteStream struct {
task.Task
pc *Connection
suber *m7s.Subscriber
videoTLSRTP *TrackLocalStaticRTP
videoSender *RTPSender
}
func (r *RemoteStream) GetKey() string {
return r.suber.StreamPath
}
func (r *RemoteStream) Start() (err error) {
vctx := r.suber.Publisher.GetVideoCodecCtx()
videoCodec := vctx.FourCC()
var rcc RTPCodecParameters
if ctx, ok := vctx.(mrtp.IRTPCtx); ok {
rcc = ctx.GetRTPCodecParameter()
} else {
var rtpCtx mrtp.RTPData
var tmpAVTrack AVTrack
tmpAVTrack.ICodecCtx, _, err = rtpCtx.ConvertCtx(vctx)
if err == nil {
rcc = tmpAVTrack.ICodecCtx.(mrtp.IRTPCtx).GetRTPCodecParameter()
} else {
return
}
}
// // For H.264, adjust codec parameters based on SDP
// if rcc.MimeType == MimeTypeH264 && r.pc.SDP != "" {
// // Find best matching codec configuration
// originalFmtpLine := rcc.SDPFmtpLine
// bestMatchingFmtpLine := findBestMatchingH264Codec(r.pc.SDP, rcc.SDPFmtpLine)
// // Update the codec parameters if a better match was found
// if bestMatchingFmtpLine != originalFmtpLine {
// rcc.SDPFmtpLine = bestMatchingFmtpLine
// r.Info("Adjusted H.264 codec parameters", "from", originalFmtpLine, "to", bestMatchingFmtpLine)
// }
// }
r.videoTLSRTP, err = NewTrackLocalStaticRTP(rcc.RTPCodecCapability, videoCodec.String(), r.suber.StreamPath)
if err != nil {
return
}
r.videoSender, err = r.pc.AddTrack(r.videoTLSRTP)
return
}
func (r *RemoteStream) Go() (err error) {
go func() {
rtcpBuf := make([]byte, 1500)
for {
if n, _, rtcpErr := r.videoSender.Read(rtcpBuf); rtcpErr != nil {
r.suber.Warn("rtcp read error", "error", rtcpErr)
return
} else {
if p, err := rtcp.Unmarshal(rtcpBuf[:n]); err == nil {
for _, pp := range p {
switch pp.(type) {
case *rtcp.PictureLossIndication:
// fmt.Println("PictureLossIndication")
}
}
}
}
}
}()
return m7s.PlayBlock(r.suber, (func(frame *mrtp.Audio) (err error))(nil), func(frame *mrtp.Video) error {
for _, p := range frame.Packets {
if err := r.videoTLSRTP.WriteRTP(p); err != nil {
return err
}
}
return nil
})
}
// SingleConnection extends Connection to handle multiple subscribers in a single WebRTC connection
type SingleConnection struct {
task.Manager[string, *RemoteStream]
Connection
Subscribers map[string]*m7s.Subscriber // map streamPath to subscriber
}
func NewSingleConnection() *SingleConnection {
return &SingleConnection{
Subscribers: make(map[string]*m7s.Subscriber),
}
func (c *SingleConnection) Start() (err error) {
c.OnICECandidate(func(ice *ICECandidate) {
if ice != nil {
c.Info(ice.ToJSON().Candidate)
}
})
// 监听ICE连接状态变化
c.OnICEConnectionStateChange(func(state ICEConnectionState) {
c.Debug("ICE connection state changed", "state", state.String())
if state == ICEConnectionStateFailed {
c.Error("ICE connection failed")
}
})
c.OnConnectionStateChange(func(state PeerConnectionState) {
c.Info("Connection State has changed:" + state.String())
switch state {
case PeerConnectionStateConnected:
case PeerConnectionStateDisconnected, PeerConnectionStateFailed, PeerConnectionStateClosed:
c.Stop(errors.New("connection state:" + state.String()))
}
})
return
}
func (c *SingleConnection) Receive() {
c.OnTrack(func(track *TrackRemote, receiver *RTPReceiver) {
c.Info("OnTrack", "kind", track.Kind().String(), "payloadType", uint8(track.Codec().PayloadType))
})
}
// AddSubscriber adds a new subscriber to the connection and starts sending
func (c *SingleConnection) AddSubscriber(streamPath string, subscriber *m7s.Subscriber) {
c.Subscribers[streamPath] = subscriber
if err := c.SendSubscriber(subscriber); err != nil {
c.Error("failed to start subscriber", "error", err, "streamPath", streamPath)
subscriber.Stop(err)
delete(c.Subscribers, streamPath)
}
func (c *SingleConnection) AddSubscriber(subscriber *m7s.Subscriber) (remoteStream *RemoteStream) {
remoteStream = &RemoteStream{suber: subscriber, pc: &c.Connection}
subscriber.Depend(remoteStream)
c.Add(remoteStream)
return
}
// RemoveSubscriber removes a subscriber from the connection
func (c *SingleConnection) RemoveSubscriber(streamPath string) {
if subscriber, ok := c.Subscribers[streamPath]; ok {
subscriber.Stop(task.ErrStopByUser)
delete(c.Subscribers, streamPath)
}
func (c *SingleConnection) RemoveSubscriber(remoteStream *RemoteStream) {
c.RemoveTrack(remoteStream.videoSender)
remoteStream.Stop(task.ErrStopByUser)
}
// HasSubscriber checks if a stream is already subscribed
func (c *SingleConnection) HasSubscriber(streamPath string) bool {
_, ok := c.Subscribers[streamPath]
return ok
return c.Has(streamPath)
}

View File

@@ -0,0 +1,591 @@
/**
* BatchV2Client - WebRTC client for Monibuca BatchV2 protocol
* Handles WebRTC connection, publishing, and subscribing to multiple streams
*/
class BatchV2Client {
ws = null;
pc = null;
localStream = null;
subscribedStreams = new Set();
videoSenders = new Map();
streamToTransceiver = new Map();
eventListeners = new Map();
wsUrl;
/**
* Create a new BatchV2Client
* @param host Optional host for WebSocket connection. Defaults to current location
*/
constructor(host) {
// Determine WebSocket URL
const wsProtocol = location.protocol === 'https:' ? 'wss:' : 'ws:';
this.wsUrl = host ?
`${wsProtocol}//${host}/webrtc/batchv2` :
`${wsProtocol}//${location.host}/webrtc/batchv2`;
}
/**
* Connect to the WebRTC server
* @returns Promise that resolves when connection is established
*/
async connect() {
try {
this.log(`Connecting to ${this.wsUrl}...`);
// Create WebSocket connection
this.ws = new WebSocket(this.wsUrl);
return new Promise((resolve, reject) => {
if (!this.ws) {
reject(new Error('WebSocket not initialized'));
return;
}
this.ws.onopen = async () => {
this.log('WebSocket connection established', 'success');
// Create and initialize PeerConnection
const configuration = {
iceTransportPolicy: 'all',
bundlePolicy: 'max-bundle',
rtcpMuxPolicy: 'require',
iceCandidatePoolSize: 1
};
this.pc = new RTCPeerConnection(configuration);
// Use addTransceiver to create sender and receiver
const videoTransceiver = this.pc.addTransceiver('video', {
direction: 'sendrecv'
});
// Store sender reference
this.videoSenders.set('placeholder', videoTransceiver.sender);
this.log('Added placeholder tracks to PeerConnection', 'info');
// Set up event handlers
this.setupPeerConnectionEventHandlers();
const offer = await this.pc.createOffer();
await this.pc.setLocalDescription(offer);
// Send offer to server
this.sendMessage({
type: 'offer',
sdp: this.pc.localDescription.sdp
});
this.emit('connected', null);
resolve();
};
this.ws.onmessage = this.handleWebSocketMessage.bind(this);
this.ws.onclose = () => {
this.log('WebSocket connection closed');
this.cleanup();
this.emit('disconnected', null);
reject(new Error('WebSocket connection closed'));
};
this.ws.onerror = (error) => {
this.log(`WebSocket error: ${error}`, 'error');
this.cleanup();
this.emit('error', { message: 'WebSocket error' });
reject(new Error('WebSocket error'));
};
});
}
catch (error) {
this.log(`Connection error: ${error.message}`, 'error');
this.cleanup();
this.emit('error', { message: error.message });
throw error;
}
}
/**
* Disconnect from the WebRTC server
*/
disconnect() {
this.cleanup();
}
/**
* Start publishing a stream
* @param streamPath Path for the stream
* @returns Promise that resolves when publishing starts
*/
async startPublishing(streamPath) {
try {
if (!streamPath) {
throw new Error('Please enter a valid stream path');
}
if (!this.pc || !this.ws) {
throw new Error('Not connected to server');
}
// Get user media - only video needed
this.localStream = await navigator.mediaDevices.getUserMedia({
video: true,
audio: false
});
// Get actual video track
const videoTrack = this.localStream.getVideoTracks()[0];
// Use existing sender to replace track
const videoSender = this.videoSenders.get('placeholder');
if (videoSender) {
await videoSender.replaceTrack(videoTrack);
this.log('Replaced placeholder video track with real track', 'success');
}
// Update sender mapping
this.videoSenders.delete('placeholder');
this.videoSenders.set(streamPath, videoSender);
// Create new offer
const offer = await this.pc.createOffer();
await this.pc.setLocalDescription(offer);
// Wait for ICE gathering to complete with a timeout
await this.waitForIceGathering();
// Send publish signal
this.sendMessage({
type: 'publish',
streamPath: streamPath,
offer: this.pc.localDescription.sdp
});
this.log(`Started publishing to ${streamPath}`, 'success');
this.emit('publishStarted', { streamPath });
return Promise.resolve();
}
catch (error) {
this.log(`Publishing error: ${error.message}`, 'error');
this.emit('error', { message: error.message });
throw error;
}
}
/**
* Stop publishing a stream
* @param streamPath Path of the stream to stop
* @returns Promise that resolves when publishing stops
*/
async stopPublishing(streamPath) {
try {
if (!this.pc || !this.ws) {
throw new Error('Not connected to server');
}
// Get current sender
const videoSender = this.videoSenders.get(streamPath);
// Set track to null
if (videoSender) {
await videoSender.replaceTrack(null);
this.log('Removed video track', 'info');
// Update sender mapping
this.videoSenders.delete(streamPath);
this.videoSenders.set('placeholder', videoSender);
}
// Stop local stream
if (this.localStream) {
this.localStream.getTracks().forEach(track => track.stop());
this.localStream = null;
}
// Create new offer
const offer = await this.pc.createOffer();
await this.pc.setLocalDescription(offer);
// Wait for ICE gathering to complete with a timeout
await this.waitForIceGathering();
// Send unpublish signal
this.sendMessage({
type: 'unpublish',
streamPath: streamPath
});
this.log(`Stopped publishing to ${streamPath}`, 'success');
this.emit('publishStopped', { streamPath });
return Promise.resolve();
}
catch (error) {
this.log(`Error stopping publish: ${error.message}`, 'error');
this.emit('error', { message: error.message });
throw error;
}
}
/**
* Get list of available streams
*/
getStreamList() {
if (!this.ws) {
this.log('Not connected to server', 'error');
return;
}
// Send getStreamList signal
this.sendMessage({
type: 'getStreamList'
});
this.log('Requested stream list', 'info');
}
/**
* Subscribe to streams
* @param streamPaths Array of stream paths to subscribe to
* @returns Promise that resolves when subscription is complete
*/
async subscribeToStreams(streamPaths) {
try {
if (!this.pc || !this.ws) {
throw new Error('Not connected to server');
}
if (streamPaths.length === 0) {
throw new Error('Please select at least one stream');
}
// Get the current subscribed streams before clearing
const previousStreams = new Set(this.subscribedStreams);
// Clear current subscriptions
this.subscribedStreams.clear();
// Add all selected streams to the subscription list
streamPaths.forEach(path => {
this.subscribedStreams.add(path);
});
// Find streams that were previously subscribed but are no longer in the list
const removedStreams = [];
previousStreams.forEach(stream => {
if (!this.subscribedStreams.has(stream)) {
// Get the transceiver associated with this stream
const transceiver = this.streamToTransceiver.get(stream);
// Set the transceiver to inactive if it exists
if (transceiver) {
transceiver.direction = 'inactive';
this.log(`Set transceiver for removed stream ${stream} to inactive`, 'info');
this.streamToTransceiver.delete(stream);
}
// Add to removed streams list
removedStreams.push(stream);
// Emit stream removed event
this.emit('streamRemoved', { streamPath: stream });
}
});
// Send unsubscribe signal for removed streams
if (removedStreams.length > 0) {
await this.sendUnsubscribeSignal(removedStreams);
}
// Find streams that need to be newly added
const newStreamPaths = Array.from(this.subscribedStreams).filter(path => !previousStreams.has(path));
this.log(`New stream paths: ${newStreamPaths.join(', ')}`, 'info');
// If there are new streams to subscribe to
if (newStreamPaths.length > 0) {
// Get all video transceivers that are inactive
const availableTransceivers = this.pc.getTransceivers().filter(transceiver => transceiver.direction === 'inactive');
this.log(`Available transceivers: ${availableTransceivers.length}`, 'info');
// Use available transceivers for new streams
let remainingNewStreams = [...newStreamPaths];
while (remainingNewStreams.length > 0 && availableTransceivers.length > 0) {
remainingNewStreams.pop();
availableTransceivers.pop().direction = 'recvonly';
}
const transceiverToAdd = remainingNewStreams.length;
// If available transceivers are not enough, create new ones
if (transceiverToAdd > 0) {
this.log(`Adding ${transceiverToAdd} new video transceivers`, 'info');
for (let i = 0; i < transceiverToAdd; i++) {
this.pc.addTransceiver('video', { direction: 'recvonly' });
}
}
// Create offer
const offer = await this.pc.createOffer();
await this.pc.setLocalDescription(offer);
// Send subscribe signal only for the new streams
this.sendMessage({
type: 'subscribe',
streamList: newStreamPaths,
offer: this.pc.localDescription.sdp
});
this.log(`Subscribing to ${newStreamPaths.length} new streams`, 'success');
}
this.log(`Total playing streams: ${this.subscribedStreams.size}`, 'success');
return Promise.resolve();
}
catch (error) {
this.log(`Error playing streams: ${error.message}`, 'error');
this.emit('error', { message: error.message });
throw error;
}
}
/**
* Send unsubscribe signal to the server
* @param streamPaths Array of stream paths to unsubscribe from
* @returns Promise that resolves when the unsubscribe signal is sent
*/
async sendUnsubscribeSignal(streamPaths) {
if (!this.ws || !this.pc) {
this.log('Not connected to server', 'error');
return;
}
if (streamPaths.length === 0) {
return;
}
try {
// Create offer for SDP exchange
const offer = await this.pc.createOffer();
await this.pc.setLocalDescription(offer);
// Wait for ICE gathering to complete
await this.waitForIceGathering();
// Send unsubscribe signal with SDP
this.sendMessage({
type: 'unsubscribe',
streamList: streamPaths,
offer: this.pc.localDescription.sdp
});
this.log(`Sent unsubscribe signal for ${streamPaths.length} streams`, 'info');
}
catch (error) {
this.log(`Error sending unsubscribe signal: ${error.message}`, 'error');
throw error;
}
}
/**
* Unsubscribe from a stream
* @param streamPath Path of the stream to unsubscribe from
* @returns Promise that resolves when unsubscription is complete
*/
async unsubscribeFromStream(streamPath) {
try {
if (!this.pc || !this.ws) {
throw new Error('Not connected to server');
}
// Get the transceiver associated with this stream
const transceiver = this.streamToTransceiver.get(streamPath);
// Set the transceiver to inactive if it exists
if (transceiver) {
transceiver.direction = 'inactive';
this.log(`Set transceiver for ${streamPath} to inactive`, 'info');
this.streamToTransceiver.delete(streamPath);
// Send unsubscribe signal with SDP exchange
await this.sendUnsubscribeSignal([streamPath]);
}
this.subscribedStreams.delete(streamPath);
this.emit('streamRemoved', { streamPath });
this.log(`Removed ${streamPath} from subscription list`, 'info');
return Promise.resolve();
}
catch (error) {
this.log(`Error unsubscribing from stream: ${error.message}`, 'error');
this.emit('error', { message: error.message });
throw error;
}
}
/**
* Get the local media stream
* @returns The local media stream or null if not publishing
*/
getLocalStream() {
return this.localStream;
}
/**
* Get the list of currently subscribed streams
* @returns Array of stream paths
*/
getSubscribedStreams() {
return Array.from(this.subscribedStreams);
}
/**
* Add event listener
* @param event Event type
* @param listener Event listener function
*/
on(event, listener) {
if (!this.eventListeners.has(event)) {
this.eventListeners.set(event, []);
}
this.eventListeners.get(event).push(listener);
}
/**
* Remove event listener
* @param event Event type
* @param listener Event listener function to remove
*/
off(event, listener) {
if (!this.eventListeners.has(event)) {
return;
}
const listeners = this.eventListeners.get(event);
const index = listeners.indexOf(listener);
if (index !== -1) {
listeners.splice(index, 1);
}
}
/**
* Emit an event
* @param event Event type
* @param data Event data
*/
emit(event, data) {
if (!this.eventListeners.has(event)) {
return;
}
const listeners = this.eventListeners.get(event);
for (const listener of listeners) {
listener(data);
}
}
/**
* Log a message and emit a log event
* @param message Message to log
* @param level Log level
*/
log(message, level = 'info') {
this.emit('log', { message, level, time: new Date() });
}
/**
* Set up event handlers for the peer connection
*/
setupPeerConnectionEventHandlers() {
if (!this.pc) {
return;
}
this.pc.onicecandidate = event => {
if (event.candidate) {
this.log('ICE candidate: ' + event.candidate.candidate);
}
else {
this.log('ICE gathering complete');
}
};
this.pc.onicegatheringstatechange = () => {
this.log(`ICE gathering state: ${this.pc.iceGatheringState}`);
this.emit('iceStateChange', { state: this.pc.iceGatheringState });
};
this.pc.oniceconnectionstatechange = () => {
this.log(`ICE connection state: ${this.pc.iceConnectionState}`);
this.emit('iceStateChange', { state: this.pc.iceConnectionState });
if (this.pc.iceConnectionState === 'failed') {
this.log('ICE connection failed', 'error');
}
};
this.pc.onconnectionstatechange = () => {
this.log(`Connection state changed: ${this.pc.connectionState}`);
this.emit('connectionStateChange', { state: this.pc.connectionState });
if (this.pc.connectionState === 'connected') {
this.log('PeerConnection established successfully', 'success');
}
};
this.pc.ontrack = this.handleTrackEvent.bind(this);
}
/**
* Handle track events from the peer connection
* @param event Track event
*/
handleTrackEvent(event) {
this.log(`Track received: ${event.track.kind}/${event.track.id}`, 'success');
// Get transceiver directly from event
const transceiver = event.transceiver;
if (!transceiver) {
this.log(`Could not find transceiver for track: ${event.track.id}`, 'warning');
}
// Add track statistics
const stats = {};
event.track.onunmute = () => {
this.log(`Track unmuted: ${event.track.kind}/${event.track.id}`, 'success');
};
// Periodically get statistics
const statsInterval = setInterval(async () => {
if (!this.pc || this.pc.connectionState !== 'connected') {
this.log('Connection state changed, stopping stats collection', 'info');
clearInterval(statsInterval);
return;
}
try {
const rtcStats = await this.pc.getStats(event.track);
rtcStats.forEach(stat => {
if (stat.type === 'inbound-rtp' && stat.kind === event.track.kind) {
const packetsReceived = stat.packetsReceived || 0;
const prevPackets = stats[event.track.id] || 0;
if (prevPackets !== packetsReceived) {
stats[event.track.id] = packetsReceived;
}
}
});
}
catch (e) {
this.log(`Error getting stats: ${e.message}`, 'error');
}
}, 5000); // Update every 5 seconds
if (event.track.kind === 'video' && event.streams[0]) {
const streamId = event.streams[0].id;
this.streamToTransceiver.set(streamId, transceiver);
// Emit stream added event with stream information
this.emit('streamAdded', {
streamId,
stream: event.streams[0],
track: event.track
});
}
}
/**
* Handle WebSocket messages
* @param event WebSocket message event
*/
async handleWebSocketMessage(event) {
const message = JSON.parse(event.data);
this.log(`Received message: ${message.type}`);
if ('type' in message) {
if (message.type === 'answer') {
const answer = new RTCSessionDescription({
type: 'answer',
sdp: message.sdp
});
await this.pc.setRemoteDescription(answer);
this.log('Remote description set', 'success');
}
else if (message.type === 'error') {
this.log(`Error: ${message.message}`, 'error');
this.emit('error', { message: message.message });
}
else if (message.type === 'streamList') {
this.log(`Received stream list with ${message.streams.length} streams`, 'info');
this.emit('streamList', { streams: message.streams });
}
}
}
/**
* Send a message to the WebSocket server
* @param message Message to send
*/
sendMessage(message) {
if (!this.ws) {
this.log('Not connected to server', 'error');
return;
}
this.ws.send(JSON.stringify(message));
}
/**
* Wait for ICE gathering to complete with a timeout
* @param timeout Timeout in milliseconds
* @returns Promise that resolves when ICE gathering is complete or timeout is reached
*/
async waitForIceGathering(timeout = 2000) {
if (!this.pc) {
return Promise.reject(new Error('PeerConnection not initialized'));
}
return Promise.race([
new Promise(resolve => {
if (this.pc.iceGatheringState === 'complete') {
resolve();
}
else {
const checkState = () => {
if (this.pc.iceGatheringState === 'complete') {
this.pc.removeEventListener('icegatheringstatechange', checkState);
resolve();
}
};
this.pc.addEventListener('icegatheringstatechange', checkState);
}
}),
new Promise(resolve => setTimeout(resolve, timeout))
]);
}
/**
* Clean up all resources
*/
cleanup() {
// Close WebSocket
if (this.ws) {
this.ws.close();
this.ws = null;
}
// Close PeerConnection
if (this.pc) {
this.pc.close();
this.pc = null;
}
// Stop local stream
if (this.localStream) {
this.localStream.getTracks().forEach(track => track.stop());
this.localStream = null;
}
// Clear subscribed streams
this.subscribedStreams.clear();
// Clear senders and transceiver mappings
this.videoSenders.clear();
this.streamToTransceiver.clear();
this.log('Connection cleaned up', 'info');
}
}
export default BatchV2Client;

View File

@@ -0,0 +1,745 @@
/**
* BatchV2Client - WebRTC client for Monibuca BatchV2 protocol
* Handles WebRTC connection, publishing, and subscribing to multiple streams
*/
// Types for stream information
interface StreamInfo {
path: string;
width: number;
height: number;
fps: number;
}
// Types for WebSocket messages
type MessageType =
| { type: 'offer', sdp: string; }
| { type: 'answer', sdp: string; }
| { type: 'error', message: string; }
| { type: 'streamList', streams: StreamInfo[]; }
| { type: 'publish', streamPath: string, offer: string; }
| { type: 'unpublish', streamPath: string; }
| { type: 'subscribe', streamList: string[], offer: string; }
| { type: 'unsubscribe', streamList: string[], offer: string; }
| { type: 'getStreamList'; };
// Event types for the client
type EventType =
| 'connected'
| 'disconnected'
| 'error'
| 'streamList'
| 'publishStarted'
| 'publishStopped'
| 'streamAdded'
| 'streamRemoved'
| 'iceStateChange'
| 'connectionStateChange'
| 'log';
type LogLevel = 'info' | 'error' | 'success' | 'warning';
// Event listener type
interface EventListener {
(data: any): void;
}
class BatchV2Client {
private ws: WebSocket | null = null;
private pc: RTCPeerConnection | null = null;
private localStream: MediaStream | null = null;
private subscribedStreams: Set<string> = new Set();
private videoSenders: Map<string, RTCRtpSender> = new Map();
private streamToTransceiver: Map<string, RTCRtpTransceiver> = new Map();
private eventListeners: Map<EventType, EventListener[]> = new Map();
private wsUrl: string;
/**
* Create a new BatchV2Client
* @param host Optional host for WebSocket connection. Defaults to current location
*/
constructor(host?: string) {
// Determine WebSocket URL
const wsProtocol = location.protocol === 'https:' ? 'wss:' : 'ws:';
this.wsUrl = host ?
`${wsProtocol}//${host}/webrtc/batchv2` :
`${wsProtocol}//${location.host}/webrtc/batchv2`;
}
/**
* Connect to the WebRTC server
* @returns Promise that resolves when connection is established
*/
public async connect(): Promise<void> {
try {
this.log(`Connecting to ${this.wsUrl}...`);
// Create WebSocket connection
this.ws = new WebSocket(this.wsUrl);
return new Promise((resolve, reject) => {
if (!this.ws) {
reject(new Error('WebSocket not initialized'));
return;
}
this.ws.onopen = async () => {
this.log('WebSocket connection established', 'success');
// Create and initialize PeerConnection
const configuration: RTCConfiguration = {
iceTransportPolicy: 'all',
bundlePolicy: 'max-bundle',
rtcpMuxPolicy: 'require',
iceCandidatePoolSize: 1
};
this.pc = new RTCPeerConnection(configuration);
// Use addTransceiver to create sender and receiver
const videoTransceiver = this.pc.addTransceiver('video', {
direction: 'sendrecv'
});
// Store sender reference
this.videoSenders.set('placeholder', videoTransceiver.sender);
this.log('Added placeholder tracks to PeerConnection', 'info');
// Set up event handlers
this.setupPeerConnectionEventHandlers();
const offer = await this.pc.createOffer();
await this.pc.setLocalDescription(offer);
// Send offer to server
this.sendMessage({
type: 'offer',
sdp: this.pc.localDescription!.sdp
});
this.emit('connected', null);
resolve();
};
this.ws.onmessage = this.handleWebSocketMessage.bind(this);
this.ws.onclose = () => {
this.log('WebSocket connection closed');
this.cleanup();
this.emit('disconnected', null);
reject(new Error('WebSocket connection closed'));
};
this.ws.onerror = (error) => {
this.log(`WebSocket error: ${error}`, 'error');
this.cleanup();
this.emit('error', { message: 'WebSocket error' });
reject(new Error('WebSocket error'));
};
});
} catch (error: any) {
this.log(`Connection error: ${error.message}`, 'error');
this.cleanup();
this.emit('error', { message: error.message });
throw error;
}
}
/**
* Disconnect from the WebRTC server
*/
public disconnect(): void {
this.cleanup();
}
/**
* Start publishing a stream
* @param streamPath Path for the stream
* @returns Promise that resolves when publishing starts
*/
public async startPublishing(streamPath: string): Promise<void> {
try {
if (!streamPath) {
throw new Error('Please enter a valid stream path');
}
if (!this.pc || !this.ws) {
throw new Error('Not connected to server');
}
// Get user media - only video needed
this.localStream = await navigator.mediaDevices.getUserMedia({
video: true,
audio: false
});
// Get actual video track
const videoTrack = this.localStream.getVideoTracks()[0];
// Use existing sender to replace track
const videoSender = this.videoSenders.get('placeholder');
if (videoSender) {
await videoSender.replaceTrack(videoTrack);
this.log('Replaced placeholder video track with real track', 'success');
}
// Update sender mapping
this.videoSenders.delete('placeholder');
this.videoSenders.set(streamPath, videoSender!);
// Create new offer
const offer = await this.pc.createOffer();
await this.pc.setLocalDescription(offer);
// Wait for ICE gathering to complete with a timeout
await this.waitForIceGathering();
// Send publish signal
this.sendMessage({
type: 'publish',
streamPath: streamPath,
offer: this.pc.localDescription!.sdp
});
this.log(`Started publishing to ${streamPath}`, 'success');
this.emit('publishStarted', { streamPath });
return Promise.resolve();
} catch (error: any) {
this.log(`Publishing error: ${error.message}`, 'error');
this.emit('error', { message: error.message });
throw error;
}
}
/**
* Stop publishing a stream
* @param streamPath Path of the stream to stop
* @returns Promise that resolves when publishing stops
*/
public async stopPublishing(streamPath: string): Promise<void> {
try {
if (!this.pc || !this.ws) {
throw new Error('Not connected to server');
}
// Get current sender
const videoSender = this.videoSenders.get(streamPath);
// Set track to null
if (videoSender) {
await videoSender.replaceTrack(null);
this.log('Removed video track', 'info');
// Update sender mapping
this.videoSenders.delete(streamPath);
this.videoSenders.set('placeholder', videoSender);
}
// Stop local stream
if (this.localStream) {
this.localStream.getTracks().forEach(track => track.stop());
this.localStream = null;
}
// Create new offer
const offer = await this.pc.createOffer();
await this.pc.setLocalDescription(offer);
// Wait for ICE gathering to complete with a timeout
await this.waitForIceGathering();
// Send unpublish signal
this.sendMessage({
type: 'unpublish',
streamPath: streamPath
});
this.log(`Stopped publishing to ${streamPath}`, 'success');
this.emit('publishStopped', { streamPath });
return Promise.resolve();
} catch (error: any) {
this.log(`Error stopping publish: ${error.message}`, 'error');
this.emit('error', { message: error.message });
throw error;
}
}
/**
* Get list of available streams
*/
public getStreamList(): void {
if (!this.ws) {
this.log('Not connected to server', 'error');
return;
}
// Send getStreamList signal
this.sendMessage({
type: 'getStreamList'
});
this.log('Requested stream list', 'info');
}
/**
* Subscribe to streams
* @param streamPaths Array of stream paths to subscribe to
* @returns Promise that resolves when subscription is complete
*/
public async subscribeToStreams(streamPaths: string[]): Promise<void> {
try {
if (!this.pc || !this.ws) {
throw new Error('Not connected to server');
}
if (streamPaths.length === 0) {
throw new Error('Please select at least one stream');
}
// Get the current subscribed streams before clearing
const previousStreams = new Set(this.subscribedStreams);
// Clear current subscriptions
this.subscribedStreams.clear();
// Add all selected streams to the subscription list
streamPaths.forEach(path => {
this.subscribedStreams.add(path);
});
// Find streams that were previously subscribed but are no longer in the list
const removedStreams: string[] = [];
previousStreams.forEach(stream => {
if (!this.subscribedStreams.has(stream)) {
// Get the transceiver associated with this stream
const transceiver = this.streamToTransceiver.get(stream);
// Set the transceiver to inactive if it exists
if (transceiver) {
transceiver.direction = 'inactive';
this.log(`Set transceiver for removed stream ${stream} to inactive`, 'info');
this.streamToTransceiver.delete(stream);
}
// Add to removed streams list
removedStreams.push(stream);
// Emit stream removed event
this.emit('streamRemoved', { streamPath: stream });
}
});
// Send unsubscribe signal for removed streams
if (removedStreams.length > 0) {
await this.sendUnsubscribeSignal(removedStreams);
}
// Find streams that need to be newly added
const newStreamPaths = Array.from(this.subscribedStreams).filter(
path => !previousStreams.has(path)
);
this.log(`New stream paths: ${newStreamPaths.join(', ')}`, 'info');
// If there are new streams to subscribe to
if (newStreamPaths.length > 0) {
// Get all video transceivers that are inactive
const availableTransceivers = this.pc.getTransceivers().filter(
transceiver => transceiver.direction === 'inactive'
);
this.log(`Available transceivers: ${availableTransceivers.length}`, 'info');
// Use available transceivers for new streams
let remainingNewStreams = [...newStreamPaths];
while (remainingNewStreams.length > 0 && availableTransceivers.length > 0) {
remainingNewStreams.pop();
availableTransceivers.pop()!.direction = 'recvonly';
}
const transceiverToAdd = remainingNewStreams.length;
// If available transceivers are not enough, create new ones
if (transceiverToAdd > 0) {
this.log(`Adding ${transceiverToAdd} new video transceivers`, 'info');
for (let i = 0; i < transceiverToAdd; i++) {
this.pc.addTransceiver('video', { direction: 'recvonly' });
}
}
// Create offer
const offer = await this.pc.createOffer();
await this.pc.setLocalDescription(offer);
// Send subscribe signal only for the new streams
this.sendMessage({
type: 'subscribe',
streamList: newStreamPaths,
offer: this.pc.localDescription!.sdp
});
this.log(`Subscribing to ${newStreamPaths.length} new streams`, 'success');
}
this.log(`Total playing streams: ${this.subscribedStreams.size}`, 'success');
return Promise.resolve();
} catch (error: any) {
this.log(`Error playing streams: ${error.message}`, 'error');
this.emit('error', { message: error.message });
throw error;
}
}
/**
* Send unsubscribe signal to the server
* @param streamPaths Array of stream paths to unsubscribe from
* @returns Promise that resolves when the unsubscribe signal is sent
*/
private async sendUnsubscribeSignal(streamPaths: string[]): Promise<void> {
if (!this.ws || !this.pc) {
this.log('Not connected to server', 'error');
return;
}
if (streamPaths.length === 0) {
return;
}
try {
// Create offer for SDP exchange
const offer = await this.pc.createOffer();
await this.pc.setLocalDescription(offer);
// Wait for ICE gathering to complete
await this.waitForIceGathering();
// Send unsubscribe signal with SDP
this.sendMessage({
type: 'unsubscribe',
streamList: streamPaths,
offer: this.pc.localDescription!.sdp
});
this.log(`Sent unsubscribe signal for ${streamPaths.length} streams`, 'info');
} catch (error: any) {
this.log(`Error sending unsubscribe signal: ${error.message}`, 'error');
throw error;
}
}
/**
* Unsubscribe from a stream
* @param streamPath Path of the stream to unsubscribe from
* @returns Promise that resolves when unsubscription is complete
*/
public async unsubscribeFromStream(streamPath: string): Promise<void> {
try {
if (!this.pc || !this.ws) {
throw new Error('Not connected to server');
}
// Get the transceiver associated with this stream
const transceiver = this.streamToTransceiver.get(streamPath);
// Set the transceiver to inactive if it exists
if (transceiver) {
transceiver.direction = 'inactive';
this.log(`Set transceiver for ${streamPath} to inactive`, 'info');
this.streamToTransceiver.delete(streamPath);
// Send unsubscribe signal with SDP exchange
await this.sendUnsubscribeSignal([streamPath]);
}
this.subscribedStreams.delete(streamPath);
this.emit('streamRemoved', { streamPath });
this.log(`Removed ${streamPath} from subscription list`, 'info');
return Promise.resolve();
} catch (error: any) {
this.log(`Error unsubscribing from stream: ${error.message}`, 'error');
this.emit('error', { message: error.message });
throw error;
}
}
/**
* Get the local media stream
* @returns The local media stream or null if not publishing
*/
public getLocalStream(): MediaStream | null {
return this.localStream;
}
/**
* Get the list of currently subscribed streams
* @returns Array of stream paths
*/
public getSubscribedStreams(): string[] {
return Array.from(this.subscribedStreams);
}
/**
* Add event listener
* @param event Event type
* @param listener Event listener function
*/
public on(event: EventType, listener: EventListener): void {
if (!this.eventListeners.has(event)) {
this.eventListeners.set(event, []);
}
this.eventListeners.get(event)!.push(listener);
}
/**
* Remove event listener
* @param event Event type
* @param listener Event listener function to remove
*/
public off(event: EventType, listener: EventListener): void {
if (!this.eventListeners.has(event)) {
return;
}
const listeners = this.eventListeners.get(event)!;
const index = listeners.indexOf(listener);
if (index !== -1) {
listeners.splice(index, 1);
}
}
/**
* Emit an event
* @param event Event type
* @param data Event data
*/
private emit(event: EventType, data: any): void {
if (!this.eventListeners.has(event)) {
return;
}
const listeners = this.eventListeners.get(event)!;
for (const listener of listeners) {
listener(data);
}
}
/**
* Log a message and emit a log event
* @param message Message to log
* @param level Log level
*/
private log(message: string, level: LogLevel = 'info'): void {
this.emit('log', { message, level, time: new Date() });
}
/**
* Set up event handlers for the peer connection
*/
private setupPeerConnectionEventHandlers(): void {
if (!this.pc) {
return;
}
this.pc.onicecandidate = event => {
if (event.candidate) {
this.log('ICE candidate: ' + event.candidate.candidate);
} else {
this.log('ICE gathering complete');
}
};
this.pc.onicegatheringstatechange = () => {
this.log(`ICE gathering state: ${this.pc!.iceGatheringState}`);
this.emit('iceStateChange', { state: this.pc!.iceGatheringState });
};
this.pc.oniceconnectionstatechange = () => {
this.log(`ICE connection state: ${this.pc!.iceConnectionState}`);
this.emit('iceStateChange', { state: this.pc!.iceConnectionState });
if (this.pc!.iceConnectionState === 'failed') {
this.log('ICE connection failed', 'error');
}
};
this.pc.onconnectionstatechange = () => {
this.log(`Connection state changed: ${this.pc!.connectionState}`);
this.emit('connectionStateChange', { state: this.pc!.connectionState });
if (this.pc!.connectionState === 'connected') {
this.log('PeerConnection established successfully', 'success');
}
};
this.pc.ontrack = this.handleTrackEvent.bind(this);
}
/**
* Handle track events from the peer connection
* @param event Track event
*/
private handleTrackEvent(event: RTCTrackEvent): void {
this.log(`Track received: ${event.track.kind}/${event.track.id}`, 'success');
// Get transceiver directly from event
const transceiver = event.transceiver;
if (!transceiver) {
this.log(`Could not find transceiver for track: ${event.track.id}`, 'warning');
}
// Add track statistics
const stats: Record<string, number> = {};
event.track.onunmute = () => {
this.log(`Track unmuted: ${event.track.kind}/${event.track.id}`, 'success');
};
// Periodically get statistics
const statsInterval = setInterval(async () => {
if (!this.pc || this.pc.connectionState !== 'connected') {
this.log('Connection state changed, stopping stats collection', 'info');
clearInterval(statsInterval);
return;
}
try {
const rtcStats = await this.pc.getStats(event.track);
rtcStats.forEach(stat => {
if (stat.type === 'inbound-rtp' && stat.kind === event.track.kind) {
const packetsReceived = stat.packetsReceived || 0;
const prevPackets = stats[event.track.id] || 0;
if (prevPackets !== packetsReceived) {
stats[event.track.id] = packetsReceived;
}
}
});
} catch (e: any) {
this.log(`Error getting stats: ${e.message}`, 'error');
}
}, 5000); // Update every 5 seconds
if (event.track.kind === 'video' && event.streams[0]) {
const streamId = event.streams[0].id;
this.streamToTransceiver.set(streamId, transceiver);
// Emit stream added event with stream information
this.emit('streamAdded', {
streamId,
stream: event.streams[0],
track: event.track
});
}
}
/**
* Handle WebSocket messages
* @param event WebSocket message event
*/
private async handleWebSocketMessage(event: MessageEvent): Promise<void> {
const message = JSON.parse(event.data) as MessageType;
this.log(`Received message: ${(message as any).type}`);
if ('type' in message) {
if (message.type === 'answer') {
const answer = new RTCSessionDescription({
type: 'answer',
sdp: message.sdp
});
await this.pc!.setRemoteDescription(answer);
this.log('Remote description set', 'success');
} else if (message.type === 'error') {
this.log(`Error: ${message.message}`, 'error');
this.emit('error', { message: message.message });
} else if (message.type === 'streamList') {
this.log(`Received stream list with ${message.streams.length} streams`, 'info');
this.emit('streamList', { streams: message.streams });
}
}
}
/**
* Send a message to the WebSocket server
* @param message Message to send
*/
private sendMessage(message: any): void {
if (!this.ws) {
this.log('Not connected to server', 'error');
return;
}
this.ws.send(JSON.stringify(message));
}
/**
* Wait for ICE gathering to complete with a timeout
* @param timeout Timeout in milliseconds
* @returns Promise that resolves when ICE gathering is complete or timeout is reached
*/
private async waitForIceGathering(timeout: number = 2000): Promise<void> {
if (!this.pc) {
return Promise.reject(new Error('PeerConnection not initialized'));
}
return Promise.race([
new Promise<void>(resolve => {
if (this.pc!.iceGatheringState === 'complete') {
resolve();
} else {
const checkState = () => {
if (this.pc!.iceGatheringState === 'complete') {
this.pc!.removeEventListener('icegatheringstatechange', checkState);
resolve();
}
};
this.pc!.addEventListener('icegatheringstatechange', checkState);
}
}),
new Promise<void>(resolve => setTimeout(resolve, timeout))
]);
}
/**
* Clean up all resources
*/
private cleanup(): void {
// Close WebSocket
if (this.ws) {
this.ws.close();
this.ws = null;
}
// Close PeerConnection
if (this.pc) {
this.pc.close();
this.pc = null;
}
// Stop local stream
if (this.localStream) {
this.localStream.getTracks().forEach(track => track.stop());
this.localStream = null;
}
// Clear subscribed streams
this.subscribedStreams.clear();
// Clear senders and transceiver mappings
this.videoSenders.clear();
this.streamToTransceiver.clear();
this.log('Connection cleaned up', 'info');
}
}
export default BatchV2Client;

View File

@@ -0,0 +1,503 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>WebRTC BatchV2 Test</title>
<style>
body {
font-family: Arial, sans-serif;
max-width: 1200px;
margin: 0 auto;
padding: 20px;
}
.container {
display: flex;
flex-wrap: wrap;
gap: 20px;
}
.control-panel {
flex: 1;
min-width: 300px;
border: 1px solid #ccc;
padding: 15px;
border-radius: 5px;
}
.video-container {
flex: 2;
min-width: 400px;
display: flex;
flex-direction: column;
gap: 10px;
}
.video-wrapper {
position: relative;
margin-bottom: 10px;
}
video {
width: 100%;
background-color: #000;
border-radius: 5px;
}
#remoteVideos {
display: flex;
flex-wrap: wrap;
gap: 5px;
width: 100%;
justify-content: flex-start;
min-height: 200px;
max-height: 600px;
overflow-y: auto;
}
#remoteVideos .video-wrapper {
flex-grow: 1;
margin-bottom: 5px;
}
.stream-info {
position: absolute;
top: 5px;
left: 5px;
background-color: rgba(0, 0, 0, 0.7);
color: white;
padding: 3px 8px;
border-radius: 3px;
font-size: 12px;
}
button {
padding: 8px 12px;
margin: 5px 0;
cursor: pointer;
}
input,
select {
width: 100%;
padding: 8px;
margin: 5px 0 10px;
box-sizing: border-box;
}
.log-container {
margin-top: 20px;
border: 1px solid #ccc;
padding: 10px;
height: 1000px;
overflow-y: auto;
font-family: monospace;
font-size: 12px;
background-color: #f5f5f5;
}
.log-entry {
margin: 2px 0;
border-bottom: 1px solid #eee;
padding-bottom: 2px;
}
.log-time {
color: #666;
margin-right: 5px;
}
.error {
color: red;
}
.success {
color: green;
}
.info {
color: blue;
}
</style>
</head>
<body>
<h1>WebRTC BatchV2 Test</h1>
<p>Test the BatchV2 functionality with WebSocket signaling for multiple streams over a single PeerConnection</p>
<div class="container">
<div class="control-panel">
<h2>Controls</h2>
<h3>Connection</h3>
<button type="button" id="connectBtn">Connect</button>
<button type="button" id="disconnectBtn" disabled>Disconnect</button>
<h3>Publish Stream</h3>
<label for="publishStreamPath">Stream Path:</label>
<input type="text" id="publishStreamPath" value="live/test" placeholder="e.g., live/test">
<button type="button" id="startPublishBtn" disabled>Start Publishing</button>
<button type="button" id="stopPublishBtn" disabled>Stop Publishing</button>
<h3>Available Streams</h3>
<button type="button" id="getStreamsBtn" disabled>Get Stream List</button>
<div id="availableStreams">
<h4>Available Streams:</h4>
<label for="streamSelect">Select streams to play:</label>
<select id="streamSelect" size="5" multiple aria-label="Available streams"></select>
</div>
<div>
<button type="button" id="playSelectedBtn" disabled>Play Selected Streams</button>
</div>
<div id="subscribedStreams">
<h4>Subscribed Streams:</h4>
<ul id="streamList"></ul>
</div>
</div>
<div class="video-container">
<h2>Local Video</h2>
<div class="video-wrapper">
<video id="localVideo" autoplay muted></video>
<div class="stream-info" id="localStreamInfo">Not publishing</div>
</div>
<h2>Remote Videos</h2>
<div id="remoteVideos" class="remote-videos-container"></div>
</div>
</div>
<div class="log-container" id="logContainer"></div>
<script type="module">
import BatchV2Client from './BatchV2Client.js';
// DOM elements
const connectBtn = document.getElementById('connectBtn');
const disconnectBtn = document.getElementById('disconnectBtn');
const startPublishBtn = document.getElementById('startPublishBtn');
const stopPublishBtn = document.getElementById('stopPublishBtn');
const getStreamsBtn = document.getElementById('getStreamsBtn');
const playSelectedBtn = document.getElementById('playSelectedBtn');
const publishStreamPath = document.getElementById('publishStreamPath');
const streamSelect = document.getElementById('streamSelect');
const streamList = document.getElementById('streamList');
const localVideo = document.getElementById('localVideo');
const localStreamInfo = document.getElementById('localStreamInfo');
const remoteVideos = document.getElementById('remoteVideos');
const logContainer = document.getElementById('logContainer');
let client = null;
let remoteVideoCount = 0;
// Logging function
function log(message, type = 'info') {
const logEntry = document.createElement('div');
logEntry.className = `log-entry ${type}`;
const time = document.createElement('span');
time.className = 'log-time';
time.textContent = new Date().toLocaleTimeString();
logEntry.appendChild(time);
logEntry.appendChild(document.createTextNode(message));
logContainer.appendChild(logEntry);
logContainer.scrollTop = logContainer.scrollHeight;
}
// Function to remove video element for a specific stream
function removeVideoElement(streamId) {
const videoElement = document.getElementById(`video-${streamId}`);
if (videoElement) {
const wrapper = videoElement.closest('.video-wrapper');
if (wrapper) {
wrapper.remove();
remoteVideoCount--;
log(`Removed video element for stream ${streamId}`, 'info');
updateRemoteVideoLayout();
}
}
}
// Update stream list UI
function updateStreamListUI() {
streamList.innerHTML = '';
if (!client) return;
const subscribed = client.getSubscribedStreams();
subscribed.forEach(streamPath => {
const li = document.createElement('li');
li.textContent = streamPath;
const removeBtn = document.createElement('button');
removeBtn.textContent = 'Remove';
removeBtn.style.marginLeft = '10px';
removeBtn.onclick = async () => {
try {
await client.unsubscribeFromStream(streamPath);
// The 'streamRemoved' event will handle UI updates
} catch (error) {
log(`Error removing stream ${streamPath}: ${error.message}`, 'error');
}
};
li.appendChild(removeBtn);
streamList.appendChild(li);
});
if (remoteVideos.querySelectorAll('.video-wrapper').length > 0) {
updateRemoteVideoLayout();
}
}
// Cleanup function
function cleanupUI() {
// Clear remote videos
remoteVideos.innerHTML = '';
remoteVideoCount = 0;
// Clear subscribed streams UI
streamList.innerHTML = '';
// Reset UI elements state
localVideo.srcObject = null;
localStreamInfo.textContent = 'Not publishing';
connectBtn.disabled = false;
disconnectBtn.disabled = true;
startPublishBtn.disabled = true;
stopPublishBtn.disabled = true;
getStreamsBtn.disabled = true;
playSelectedBtn.disabled = true;
// Clear stream select
streamSelect.innerHTML = '';
log('UI cleaned up', 'info');
}
// Function to update the layout of remote videos
function updateRemoteVideoLayout() {
const videoWrappers = remoteVideos.querySelectorAll('.video-wrapper');
if (videoWrappers.length === 0) return;
// Get the local video dimensions as a reference
const localVideoWrapper = document.querySelector('.video-container .video-wrapper');
const localVideoHeight = localVideoWrapper ? localVideoWrapper.offsetHeight : 300;
// Calculate the optimal grid layout
const count = videoWrappers.length;
let columns;
// Determine the number of columns based on the count
if (count <= 1) {
columns = 1;
} else if (count <= 4) {
columns = 2;
} else if (count <= 9) {
columns = 3;
} else {
columns = 4;
}
// Calculate rows based on count and columns
const rows = Math.ceil(count / columns);
// Calculate the width percentage for each video
const widthPercentage = (100 / columns) - 1; // Subtract 1% for the gap
// Calculate the height for each video to fit within the container
// and maintain proper spacing
const containerHeight = Math.min(localVideoHeight, 600); // Cap at 600px max
const heightPerVideo = (containerHeight / rows) - 10; // 10px for gap
// Apply the width and height to each video wrapper
videoWrappers.forEach(wrapper => {
wrapper.style.width = `${widthPercentage}%`;
wrapper.style.minWidth = `${widthPercentage}%`;
wrapper.style.height = `${heightPerVideo}px`;
wrapper.style.minHeight = '120px'; // Minimum height
});
log(`Updated remote video layout: ${count} videos in ${columns} columns and ${rows} rows`, 'info');
}
// --- Event Listeners for UI elements ---
connectBtn.addEventListener('click', async () => {
if (client) return; // Already connected or connecting
client = new BatchV2Client();
// Setup event listeners for the client
client.on('log', (data) => log(data.message, data.level));
client.on('error', (data) => log(`Client Error: ${data.message}`, 'error'));
client.on('connected', () => {
log('Client connected successfully', 'success');
connectBtn.disabled = true;
disconnectBtn.disabled = false;
startPublishBtn.disabled = false;
getStreamsBtn.disabled = false;
playSelectedBtn.disabled = false;
});
client.on('disconnected', () => {
log('Client disconnected');
cleanupUI();
client = null;
});
client.on('publishStarted', (data) => {
log(`Publishing started for ${data.streamPath}`, 'success');
localVideo.srcObject = client.getLocalStream();
localStreamInfo.textContent = `Publishing: ${data.streamPath}`;
startPublishBtn.disabled = true;
stopPublishBtn.disabled = false;
});
client.on('publishStopped', (data) => {
log(`Publishing stopped for ${data.streamPath}`, 'success');
localVideo.srcObject = null;
localStreamInfo.textContent = 'Not publishing';
startPublishBtn.disabled = false;
stopPublishBtn.disabled = true;
});
client.on('streamList', (data) => {
log(`Received stream list with ${data.streams.length} streams`, 'info');
streamSelect.innerHTML = ''; // Clear previous options
if (data.streams.length === 0) {
const option = document.createElement('option');
option.disabled = true;
option.textContent = 'No H.264 streams available';
streamSelect.appendChild(option);
} else {
data.streams.forEach(stream => {
const option = document.createElement('option');
option.value = stream.path;
option.textContent = `${stream.path} (${stream.width}x${stream.height} @ ${stream.fps}fps)`;
option.dataset.width = stream.width;
option.dataset.height = stream.height;
option.dataset.fps = stream.fps;
streamSelect.appendChild(option);
});
}
});
client.on('streamAdded', (data) => {
log(`Stream added: ${data.streamId}`, 'success');
let videoElement = document.getElementById(`video-${data.streamId}`);
if (!videoElement) {
const videoWrapper = document.createElement('div');
videoWrapper.className = 'video-wrapper';
videoElement = document.createElement('video');
videoElement.id = `video-${data.streamId}`;
videoElement.autoplay = true;
videoElement.playsInline = true;
videoElement.srcObject = data.stream;
videoElement.style.width = '100%';
videoElement.style.height = '100%';
const streamInfo = document.createElement('div');
streamInfo.className = 'stream-info';
streamInfo.textContent = `Stream: ${data.streamId}`;
videoWrapper.appendChild(videoElement);
videoWrapper.appendChild(streamInfo);
remoteVideos.appendChild(videoWrapper);
videoElement.play().catch(e => log(`Autoplay failed for ${data.streamId}: ${e.message}`, 'warning'));
log(`Created video element for stream ${data.streamId}`, 'success');
remoteVideoCount++;
updateRemoteVideoLayout();
updateStreamListUI(); // Update the list of subscribed streams
} else {
videoElement.srcObject = data.stream;
log(`Updated existing video element for stream ${data.streamId}`, 'info');
}
});
client.on('streamRemoved', (data) => {
log(`Stream removed: ${data.streamPath}`, 'info');
removeVideoElement(data.streamPath);
updateStreamListUI(); // Update the list of subscribed streams
});
try {
await client.connect();
} catch (error) {
log(`Connection failed: ${error.message}`, 'error');
cleanupUI();
client = null;
}
});
disconnectBtn.addEventListener('click', () => {
if (client) {
client.disconnect(); // This will trigger the 'disconnected' event and cleanup
} else {
cleanupUI(); // Ensure UI is cleaned up even if client wasn't fully connected
}
});
startPublishBtn.addEventListener('click', async () => {
if (!client) return;
const streamPath = publishStreamPath.value.trim();
try {
await client.startPublishing(streamPath);
} catch (error) {
log(`Failed to start publishing: ${error.message}`, 'error');
// UI updates are handled by 'publishStarted'/'publishStopped' events
}
});
stopPublishBtn.addEventListener('click', async () => {
if (!client) return;
const streamPath = publishStreamPath.value.trim();
try {
await client.stopPublishing(streamPath);
} catch (error) {
log(`Failed to stop publishing: ${error.message}`, 'error');
// UI updates are handled by 'publishStarted'/'publishStopped' events
}
});
getStreamsBtn.addEventListener('click', () => {
if (client) {
client.getStreamList();
}
});
playSelectedBtn.addEventListener('click', async () => {
if (!client) return;
const selectedOptions = Array.from(streamSelect.selectedOptions);
const streamPaths = selectedOptions.map(option => option.value);
try {
await client.subscribeToStreams(streamPaths);
log(`Subscribing to ${streamPaths.length} streams`, 'info');
// UI updates (adding videos, updating list) are handled by 'streamAdded' event
updateStreamListUI(); // Ensure the list reflects the new subscriptions immediately
} catch (error) {
log(`Failed to subscribe to streams: ${error.message}`, 'error');
}
});
// Add window resize event listener
window.addEventListener('resize', () => {
if (remoteVideos.querySelectorAll('.video-wrapper').length > 0) {
updateRemoteVideoLayout();
}
});
</script>
</body>
</html>

View File

@@ -0,0 +1,13 @@
{
"compilerOptions": {
"target": "ESNext", // 或者 "es6", "es2015", "es2020" 等,根据你的需求
"module": "esnext", // 或者 "commonjs", "amd", "umd" 等,根据你的需求
"lib": ["dom", "es6","ESNext"], // 包含浏览器相关的库
"moduleResolution": "node",
"strict": true,
"esModuleInterop": true,
"skipLibCheck": true,
"forceConsistentCasingInFileNames": true
},
"include": ["BatchV2Client.ts"]
}