/** * BatchV2Client - WebRTC client for Monibuca BatchV2 protocol * Handles WebRTC connection, publishing, and subscribing to multiple streams */ class BatchV2Client { ws = null; pc = null; localStream = null; subscribedStreams = new Set(); videoSenders = new Map(); streamToTransceiver = new Map(); eventListeners = new Map(); wsUrl; /** * Create a new BatchV2Client * @param host Optional host for WebSocket connection. Defaults to current location */ constructor(host) { // Determine WebSocket URL const wsProtocol = location.protocol === 'https:' ? 'wss:' : 'ws:'; this.wsUrl = host ? `${wsProtocol}//${host}/webrtc/batchv2` : `${wsProtocol}//${location.host}/webrtc/batchv2`; } /** * Connect to the WebRTC server * @returns Promise that resolves when connection is established */ async connect() { try { this.log(`Connecting to ${this.wsUrl}...`); // Create WebSocket connection this.ws = new WebSocket(this.wsUrl); return new Promise((resolve, reject) => { if (!this.ws) { reject(new Error('WebSocket not initialized')); return; } this.ws.onopen = async () => { this.log('WebSocket connection established', 'success'); // Create and initialize PeerConnection const configuration = { iceTransportPolicy: 'all', bundlePolicy: 'max-bundle', rtcpMuxPolicy: 'require', iceCandidatePoolSize: 1 }; this.pc = new RTCPeerConnection(configuration); // Use addTransceiver to create sender and receiver const videoTransceiver = this.pc.addTransceiver('video', { direction: 'sendrecv' }); // Store sender reference this.videoSenders.set('placeholder', videoTransceiver.sender); this.log('Added placeholder tracks to PeerConnection', 'info'); // Set up event handlers this.setupPeerConnectionEventHandlers(); const offer = await this.pc.createOffer(); await this.pc.setLocalDescription(offer); // Send offer to server this.sendMessage({ type: 'offer', sdp: this.pc.localDescription.sdp }); this.emit('connected', null); resolve(); }; this.ws.onmessage = this.handleWebSocketMessage.bind(this); this.ws.onclose = () => { this.log('WebSocket connection closed'); this.cleanup(); this.emit('disconnected', null); reject(new Error('WebSocket connection closed')); }; this.ws.onerror = (error) => { this.log(`WebSocket error: ${error}`, 'error'); this.cleanup(); this.emit('error', { message: 'WebSocket error' }); reject(new Error('WebSocket error')); }; }); } catch (error) { this.log(`Connection error: ${error.message}`, 'error'); this.cleanup(); this.emit('error', { message: error.message }); throw error; } } /** * Disconnect from the WebRTC server */ disconnect() { this.cleanup(); } /** * Start publishing a stream * @param streamPath Path for the stream * @returns Promise that resolves when publishing starts */ async startPublishing(streamPath) { try { if (!streamPath) { throw new Error('Please enter a valid stream path'); } if (!this.pc || !this.ws) { throw new Error('Not connected to server'); } // Get user media - only video needed this.localStream = await navigator.mediaDevices.getUserMedia({ video: true, audio: false }); // Get actual video track const videoTrack = this.localStream.getVideoTracks()[0]; // Use existing sender to replace track const videoSender = this.videoSenders.get('placeholder'); if (videoSender) { await videoSender.replaceTrack(videoTrack); this.log('Replaced placeholder video track with real track', 'success'); } // Update sender mapping this.videoSenders.delete('placeholder'); this.videoSenders.set(streamPath, videoSender); // Create new offer const offer = await this.pc.createOffer(); await this.pc.setLocalDescription(offer); // Wait for ICE gathering to complete with a timeout await this.waitForIceGathering(); // Send publish signal this.sendMessage({ type: 'publish', streamPath: streamPath, offer: this.pc.localDescription.sdp }); this.log(`Started publishing to ${streamPath}`, 'success'); this.emit('publishStarted', { streamPath }); return Promise.resolve(); } catch (error) { this.log(`Publishing error: ${error.message}`, 'error'); this.emit('error', { message: error.message }); throw error; } } /** * Stop publishing a stream * @param streamPath Path of the stream to stop * @returns Promise that resolves when publishing stops */ async stopPublishing(streamPath) { try { if (!this.pc || !this.ws) { throw new Error('Not connected to server'); } // Get current sender const videoSender = this.videoSenders.get(streamPath); // Set track to null if (videoSender) { await videoSender.replaceTrack(null); this.log('Removed video track', 'info'); // Update sender mapping this.videoSenders.delete(streamPath); this.videoSenders.set('placeholder', videoSender); } // Stop local stream if (this.localStream) { this.localStream.getTracks().forEach(track => track.stop()); this.localStream = null; } // Create new offer const offer = await this.pc.createOffer(); await this.pc.setLocalDescription(offer); // Wait for ICE gathering to complete with a timeout await this.waitForIceGathering(); // Send unpublish signal this.sendMessage({ type: 'unpublish', streamPath: streamPath }); this.log(`Stopped publishing to ${streamPath}`, 'success'); this.emit('publishStopped', { streamPath }); return Promise.resolve(); } catch (error) { this.log(`Error stopping publish: ${error.message}`, 'error'); this.emit('error', { message: error.message }); throw error; } } /** * Get list of available streams */ getStreamList() { if (!this.ws) { this.log('Not connected to server', 'error'); return; } // Send getStreamList signal this.sendMessage({ type: 'getStreamList' }); this.log('Requested stream list', 'info'); } /** * Subscribe to streams * @param streamPaths Array of stream paths to subscribe to * @returns Promise that resolves when subscription is complete */ async subscribeToStreams(streamPaths) { try { if (!this.pc || !this.ws) { throw new Error('Not connected to server'); } if (streamPaths.length === 0) { throw new Error('Please select at least one stream'); } // Get the current subscribed streams before clearing const previousStreams = new Set(this.subscribedStreams); // Clear current subscriptions this.subscribedStreams.clear(); // Add all selected streams to the subscription list streamPaths.forEach(path => { this.subscribedStreams.add(path); }); // Find streams that were previously subscribed but are no longer in the list const removedStreams = []; previousStreams.forEach(stream => { if (!this.subscribedStreams.has(stream)) { // Get the transceiver associated with this stream const transceiver = this.streamToTransceiver.get(stream); // Set the transceiver to inactive if it exists if (transceiver) { transceiver.direction = 'inactive'; this.log(`Set transceiver for removed stream ${stream} to inactive`, 'info'); this.streamToTransceiver.delete(stream); } // Add to removed streams list removedStreams.push(stream); // Emit stream removed event this.emit('streamRemoved', { streamPath: stream }); } }); // Send unsubscribe signal for removed streams if (removedStreams.length > 0) { await this.sendUnsubscribeSignal(removedStreams); } // Find streams that need to be newly added const newStreamPaths = Array.from(this.subscribedStreams).filter(path => !previousStreams.has(path)); this.log(`New stream paths: ${newStreamPaths.join(', ')}`, 'info'); // If there are new streams to subscribe to if (newStreamPaths.length > 0) { // Get all video transceivers that are inactive const availableTransceivers = this.pc.getTransceivers().filter(transceiver => transceiver.direction === 'inactive'); this.log(`Available transceivers: ${availableTransceivers.length}`, 'info'); // Use available transceivers for new streams let remainingNewStreams = [...newStreamPaths]; while (remainingNewStreams.length > 0 && availableTransceivers.length > 0) { remainingNewStreams.pop(); availableTransceivers.pop().direction = 'recvonly'; } const transceiverToAdd = remainingNewStreams.length; // If available transceivers are not enough, create new ones if (transceiverToAdd > 0) { this.log(`Adding ${transceiverToAdd} new video transceivers`, 'info'); for (let i = 0; i < transceiverToAdd; i++) { this.pc.addTransceiver('video', { direction: 'recvonly' }); } } // Create offer const offer = await this.pc.createOffer(); await this.pc.setLocalDescription(offer); // Send subscribe signal only for the new streams this.sendMessage({ type: 'subscribe', streamList: newStreamPaths, offer: this.pc.localDescription.sdp }); this.log(`Subscribing to ${newStreamPaths.length} new streams`, 'success'); } this.log(`Total playing streams: ${this.subscribedStreams.size}`, 'success'); return Promise.resolve(); } catch (error) { this.log(`Error playing streams: ${error.message}`, 'error'); this.emit('error', { message: error.message }); throw error; } } /** * Send unsubscribe signal to the server * @param streamPaths Array of stream paths to unsubscribe from * @returns Promise that resolves when the unsubscribe signal is sent */ async sendUnsubscribeSignal(streamPaths) { if (!this.ws || !this.pc) { this.log('Not connected to server', 'error'); return; } if (streamPaths.length === 0) { return; } try { // Create offer for SDP exchange const offer = await this.pc.createOffer(); await this.pc.setLocalDescription(offer); // Wait for ICE gathering to complete await this.waitForIceGathering(); // Send unsubscribe signal with SDP this.sendMessage({ type: 'unsubscribe', streamList: streamPaths, offer: this.pc.localDescription.sdp }); this.log(`Sent unsubscribe signal for ${streamPaths.length} streams`, 'info'); } catch (error) { this.log(`Error sending unsubscribe signal: ${error.message}`, 'error'); throw error; } } /** * Unsubscribe from a stream * @param streamPath Path of the stream to unsubscribe from * @returns Promise that resolves when unsubscription is complete */ async unsubscribeFromStream(streamPath) { try { if (!this.pc || !this.ws) { throw new Error('Not connected to server'); } // Get the transceiver associated with this stream const transceiver = this.streamToTransceiver.get(streamPath); // Set the transceiver to inactive if it exists if (transceiver) { transceiver.direction = 'inactive'; this.log(`Set transceiver for ${streamPath} to inactive`, 'info'); this.streamToTransceiver.delete(streamPath); // Send unsubscribe signal with SDP exchange await this.sendUnsubscribeSignal([streamPath]); } this.subscribedStreams.delete(streamPath); this.emit('streamRemoved', { streamPath }); this.log(`Removed ${streamPath} from subscription list`, 'info'); return Promise.resolve(); } catch (error) { this.log(`Error unsubscribing from stream: ${error.message}`, 'error'); this.emit('error', { message: error.message }); throw error; } } /** * Get the local media stream * @returns The local media stream or null if not publishing */ getLocalStream() { return this.localStream; } /** * Get the list of currently subscribed streams * @returns Array of stream paths */ getSubscribedStreams() { return Array.from(this.subscribedStreams); } /** * Add event listener * @param event Event type * @param listener Event listener function */ on(event, listener) { if (!this.eventListeners.has(event)) { this.eventListeners.set(event, []); } this.eventListeners.get(event).push(listener); } /** * Remove event listener * @param event Event type * @param listener Event listener function to remove */ off(event, listener) { if (!this.eventListeners.has(event)) { return; } const listeners = this.eventListeners.get(event); const index = listeners.indexOf(listener); if (index !== -1) { listeners.splice(index, 1); } } /** * Emit an event * @param event Event type * @param data Event data */ emit(event, data) { if (!this.eventListeners.has(event)) { return; } const listeners = this.eventListeners.get(event); for (const listener of listeners) { listener(data); } } /** * Log a message and emit a log event * @param message Message to log * @param level Log level */ log(message, level = 'info') { this.emit('log', { message, level, time: new Date() }); } /** * Set up event handlers for the peer connection */ setupPeerConnectionEventHandlers() { if (!this.pc) { return; } this.pc.onicecandidate = event => { if (event.candidate) { this.log('ICE candidate: ' + event.candidate.candidate); } else { this.log('ICE gathering complete'); } }; this.pc.onicegatheringstatechange = () => { this.log(`ICE gathering state: ${this.pc.iceGatheringState}`); this.emit('iceStateChange', { state: this.pc.iceGatheringState }); }; this.pc.oniceconnectionstatechange = () => { this.log(`ICE connection state: ${this.pc.iceConnectionState}`); this.emit('iceStateChange', { state: this.pc.iceConnectionState }); if (this.pc.iceConnectionState === 'failed') { this.log('ICE connection failed', 'error'); } }; this.pc.onconnectionstatechange = () => { this.log(`Connection state changed: ${this.pc.connectionState}`); this.emit('connectionStateChange', { state: this.pc.connectionState }); if (this.pc.connectionState === 'connected') { this.log('PeerConnection established successfully', 'success'); } }; this.pc.ontrack = this.handleTrackEvent.bind(this); } /** * Handle track events from the peer connection * @param event Track event */ handleTrackEvent(event) { this.log(`Track received: ${event.track.kind}/${event.track.id}`, 'success'); // Get transceiver directly from event const transceiver = event.transceiver; if (!transceiver) { this.log(`Could not find transceiver for track: ${event.track.id}`, 'warning'); } // Add track statistics const stats = {}; event.track.onunmute = () => { this.log(`Track unmuted: ${event.track.kind}/${event.track.id}`, 'success'); }; // Periodically get statistics const statsInterval = setInterval(async () => { if (!this.pc || this.pc.connectionState !== 'connected') { this.log('Connection state changed, stopping stats collection', 'info'); clearInterval(statsInterval); return; } try { const rtcStats = await this.pc.getStats(event.track); rtcStats.forEach(stat => { if (stat.type === 'inbound-rtp' && stat.kind === event.track.kind) { const packetsReceived = stat.packetsReceived || 0; const prevPackets = stats[event.track.id] || 0; if (prevPackets !== packetsReceived) { stats[event.track.id] = packetsReceived; } } }); } catch (e) { this.log(`Error getting stats: ${e.message}`, 'error'); } }, 5000); // Update every 5 seconds if (event.track.kind === 'video' && event.streams[0]) { const streamId = event.streams[0].id; this.streamToTransceiver.set(streamId, transceiver); // Emit stream added event with stream information this.emit('streamAdded', { streamId, stream: event.streams[0], track: event.track }); } } /** * Handle WebSocket messages * @param event WebSocket message event */ async handleWebSocketMessage(event) { const message = JSON.parse(event.data); this.log(`Received message: ${message.type}`); if ('type' in message) { if (message.type === 'answer') { const answer = new RTCSessionDescription({ type: 'answer', sdp: message.sdp }); await this.pc.setRemoteDescription(answer); this.log('Remote description set', 'success'); } else if (message.type === 'error') { this.log(`Error: ${message.message}`, 'error'); this.emit('error', { message: message.message }); } else if (message.type === 'streamList') { this.log(`Received stream list with ${message.streams.length} streams`, 'info'); this.emit('streamList', { streams: message.streams }); } } } /** * Send a message to the WebSocket server * @param message Message to send */ sendMessage(message) { if (!this.ws) { this.log('Not connected to server', 'error'); return; } this.ws.send(JSON.stringify(message)); } /** * Wait for ICE gathering to complete with a timeout * @param timeout Timeout in milliseconds * @returns Promise that resolves when ICE gathering is complete or timeout is reached */ async waitForIceGathering(timeout = 2000) { if (!this.pc) { return Promise.reject(new Error('PeerConnection not initialized')); } return Promise.race([ new Promise(resolve => { if (this.pc.iceGatheringState === 'complete') { resolve(); } else { const checkState = () => { if (this.pc.iceGatheringState === 'complete') { this.pc.removeEventListener('icegatheringstatechange', checkState); resolve(); } }; this.pc.addEventListener('icegatheringstatechange', checkState); } }), new Promise(resolve => setTimeout(resolve, timeout)) ]); } /** * Clean up all resources */ cleanup() { // Close WebSocket if (this.ws) { this.ws.close(); this.ws = null; } // Close PeerConnection if (this.pc) { this.pc.close(); this.pc = null; } // Stop local stream if (this.localStream) { this.localStream.getTracks().forEach(track => track.stop()); this.localStream = null; } // Clear subscribed streams this.subscribedStreams.clear(); // Clear senders and transceiver mappings this.videoSenders.clear(); this.streamToTransceiver.clear(); this.log('Connection cleaned up', 'info'); } } export default BatchV2Client;