mirror of
https://github.com/tsightler/ring-mqtt.git
synced 2025-10-29 03:02:26 +08:00
v5.0.0-beta3
This commit is contained in:
@@ -1,5 +1,30 @@
|
||||
const utils = require('./utils')
|
||||
// Simple worker thread implementation for WebRTC live calls.
|
||||
//
|
||||
// Ring now uses WebRTC as the primary streaming protocol for both web browsers
|
||||
// and mobile apps. As NodeJS does not have a native WebRTC implementation
|
||||
// ring-client-api leverages the werift package which is an implementation of
|
||||
// WebRTC completely in Typescript.
|
||||
//
|
||||
// The werift code offers excellent WebRTC compatibility but handling the inbound
|
||||
// RTP traffic, including the decryption of SRTP packets, conpletely within
|
||||
// Typescript/Javascript has quite significant CPU overhead. While this isn't too
|
||||
// on typical Intel processors it is far more noticable on low-power CPUs like the
|
||||
// ARM processors in common devices such as the Raspberry Pi 3/4. Even as few
|
||||
// as 2 streams can completely saturate a single core and keep the main NodeJS
|
||||
// thread nearly busy.
|
||||
//
|
||||
// This implementation allows live calls to be assigned to a pool of worker threads
|
||||
// rather than running in the main thread, allowing these calls to take advantage
|
||||
// of the additional CPU cores. This increases the number of parallel WebRTC streams
|
||||
// that can be supported on these low powered CPUs. Testing shows that an RPi 4 is
|
||||
// able to support ~5-6 streams while using ~75-80% of the available CPU resources.
|
||||
//
|
||||
// Note that to get the best reliability and least videos artifacts, the default Linux
|
||||
// buffers (net.core.rmem_default/max) should be increased from their default values
|
||||
// (~200K) to at least 1MB, with 2MB being even better.
|
||||
|
||||
const { Worker } = require('worker_threads')
|
||||
const utils = require('./utils')
|
||||
const colors = require('colors/safe')
|
||||
|
||||
class StreamWorkers {
|
||||
@@ -10,70 +35,64 @@ class StreamWorkers {
|
||||
init() {
|
||||
const cpuCores = utils.getCpuCores()
|
||||
const numWorkers = cpuCores > 4 ? 4 : Math.round(cpuCores/1.5)
|
||||
utils.debug(`Detected ${cpuCores} CPU cores, using ${numWorkers} live stream worker threads`)
|
||||
utils.debug(`Detected ${cpuCores} CPU cores, starting ${numWorkers} live stream worker${numWorkers>1 ? 's' : ''}`)
|
||||
|
||||
for (let i = 0; i < numWorkers; i++) {
|
||||
this.streamWorkers[i] = {
|
||||
liveCall: new Worker('./lib/livecall.js', {
|
||||
workerData: {
|
||||
workerId: i+1
|
||||
}
|
||||
}),
|
||||
liveCall: new Worker('./lib/livecall.js'),
|
||||
sessions: {}
|
||||
}
|
||||
|
||||
this.streamWorkers[i].liveCall.on('message', (data) => {
|
||||
const session = this.getSession(data.sessionId)
|
||||
if (session) {
|
||||
const deviceId = data.liveCallData.deviceId
|
||||
const workerId = this.getWorkerId(deviceId)
|
||||
if (workerId >= 0) {
|
||||
switch (data.state) {
|
||||
case 'active':
|
||||
utils.event.emit(`${session.deviceId}_livestream`, 'active', data.sessionId)
|
||||
utils.event.emit(`${deviceId}_livestream`, 'active')
|
||||
this.streamWorkers[workerId].sessions[deviceId].streamData.sessionId = data.liveCallData.sessionId
|
||||
console.log(this.streamWorkers[workerId].sessions[deviceId].streamData)
|
||||
break;
|
||||
case 'inactive':
|
||||
utils.event.emit(`${session.deviceId}_livestream`, 'inactive', data.sessionId)
|
||||
delete this.streamWorkers[session.workerId].sessions[data.sessionId]
|
||||
utils.event.emit(`${deviceId}_livestream`, 'inactive')
|
||||
delete this.streamWorkers[workerId].sessions[deviceId]
|
||||
break;
|
||||
case 'failed':
|
||||
utils.event.emit(`${session.deviceId}_livestream`, 'failed', data.sessionId)
|
||||
delete this.streamWorkers[session.workerId].sessions[data.sessionId]
|
||||
utils.event.emit(`${deviceId}_livestream`, 'failed')
|
||||
delete this.streamWorkers[workerId].sessions[deviceId]
|
||||
break;
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
utils.event.on('start_livestream', (deviceId, deviceName, sessionId, rtspPublishUrl) => {
|
||||
let workerId = 0
|
||||
for (const id in this.streamWorkers) {
|
||||
if (Object.keys(this.streamWorkers[id].sessions).length < Object.keys(this.streamWorkers[workerId].sessions).length) {
|
||||
workerId = id
|
||||
}
|
||||
utils.event.on('start_livestream', (streamData) => {
|
||||
if (this.getWorkerId(streamData.deviceId) < 0) {
|
||||
// Create an array with the number of active sessions per worker
|
||||
const workerSessions = this.streamWorkers.map(worker => Object.keys(worker.sessions).length)
|
||||
// Find the fewest number of active sessions for any worker
|
||||
const fewestSessions = Math.min(...workerSessions)
|
||||
// Find index of first worker that matches the fewest active sessions
|
||||
const workerId = workerSessions.indexOf(fewestSessions)
|
||||
utils.debug(colors.green(`[${streamData.deviceName}] `)+`Live stream assigned to worker ${parseInt(workerId)+1} with ${fewestSessions} active session${fewestSessions === 1 ? '' : 's'}`)
|
||||
this.streamWorkers[workerId].sessions[streamData.deviceId] = { streamData }
|
||||
this.streamWorkers[workerId].liveCall.postMessage({ command: 'start', streamData })
|
||||
console.log(streamData)
|
||||
}
|
||||
utils.debug(colors.green(`[${deviceName}] `)+`Live stream request allocated to worker ${parseInt(workerId)+1} with ${Object.keys(this.streamWorkers[workerId].sessions).length} active sessions`)
|
||||
this.streamWorkers[workerId].sessions[sessionId] = { deviceId, workerId }
|
||||
this.streamWorkers[workerId].liveCall.postMessage({ command: 'start', deviceName, sessionId, rtspPublishUrl })
|
||||
})
|
||||
|
||||
utils.event.on('stop_livestream', (sessionId) => {
|
||||
const session = this.getSession(sessionId)
|
||||
if (session) {
|
||||
this.streamWorkers[session.workerId].liveCall.postMessage({ command: 'stop', sessionId })
|
||||
utils.event.on('stop_livestream', (deviceId) => {
|
||||
const workerId = this.getWorkerId(deviceId)
|
||||
if (workerId >= 0) {
|
||||
this.streamWorkers[workerId].liveCall.postMessage({ command: 'stop', streamData: this.streamWorkers[workerId].sessions[deviceId].streamData })
|
||||
} else {
|
||||
utils.debug('Received requirest to stop live stream session but no active session found')
|
||||
utils.debug('Received request to stop live stream session but no active session found')
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
getSession(sessionId) {
|
||||
let allSessions = {}
|
||||
for (const worker of this.streamWorkers) {
|
||||
allSessions = { ...allSessions, ...worker.sessions }
|
||||
}
|
||||
if (allSessions.hasOwnProperty(sessionId)) {
|
||||
return allSessions[sessionId]
|
||||
} else {
|
||||
return false
|
||||
}
|
||||
getWorkerId(deviceId) {
|
||||
return this.streamWorkers.findIndex(worker => worker.sessions.hasOwnProperty(deviceId))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user