Streaming pipeline enhancements

Streaming tweaks

Continued streaming tweaks

Tweak snapshot ffmpeg process handling

Snapshot stream performance tuning

Tweak live stream startup

More snapshot stream tweaks

Tweak livesnaps pipeline

Tweak livesnaps stream

Refactor stream worker code

Snapshot stream improvements

Split camera stream handlers from main code
This commit is contained in:
Tom Sightler
2025-02-03 23:27:19 -05:00
parent 7ea1db8eee
commit 2960a083f3
12 changed files with 660 additions and 452 deletions

View File

@@ -1,112 +0,0 @@
import { parentPort, workerData } from 'worker_threads'
import { WebrtcConnection } from '../lib/streaming/webrtc-connection.js'
import { StreamingSession } from '../lib/streaming/streaming-session.js'
const deviceName = workerData.deviceName
const doorbotId = workerData.doorbotId
let liveStream = false
let streamStopping = false
parentPort.on("message", async(data) => {
const streamData = data.streamData
switch (data.command) {
case 'start':
if (streamStopping) {
parentPort.postMessage({type: 'log_error', data: "Live stream could not be started because it is in stopping state"})
parentPort.postMessage({type: 'state', data: 'failed'})
} else if (!liveStream) {
startLiveStream(streamData)
} else {
parentPort.postMessage({type: 'log_error', data: "Live stream could not be started because there is already an active stream"})
parentPort.postMessage({type: 'state', data: 'active'})
}
break;
case 'stop':
if (liveStream) {
stopLiveStream()
}
break;
}
})
async function startLiveStream(streamData) {
parentPort.postMessage({type: 'log_info', data: 'Live stream WebRTC worker received start command'})
try {
const cameraData = {
name: deviceName,
id: doorbotId
}
const streamConnection = new WebrtcConnection(streamData.ticket, cameraData)
liveStream = new StreamingSession(cameraData, streamConnection)
liveStream.connection.pc.onConnectionState.subscribe(async (data) => {
switch(data) {
case 'connected':
parentPort.postMessage({type: 'state', data: 'active'})
parentPort.postMessage({type: 'log_info', data: 'Live stream WebRTC session is connected'})
break;
case 'failed':
parentPort.postMessage({type: 'state', data: 'failed'})
parentPort.postMessage({type: 'log_info', data: 'Live stream WebRTC connection has failed'})
liveStream.stop()
await new Promise(res => setTimeout(res, 2000))
liveStream = false
break;
}
})
parentPort.postMessage({type: 'log_info', data: 'Live stream transcoding process is starting'})
await liveStream.startTranscoding({
// The native AVC video stream is copied to the RTSP server unmodified while the audio
// stream is converted into two output streams using both AAC and Opus codecs. This
// provides a stream with wide compatibility across various media player technologies.
audio: [
'-map', '0:v',
'-map', '0:a',
'-map', '0:a',
'-c:a:0', 'aac',
'-c:a:1', 'copy',
],
video: [
'-c:v', 'copy'
],
output: [
'-flags', '+global_header',
'-f', 'rtsp',
'-rtsp_transport', 'tcp',
streamData.rtspPublishUrl
]
})
parentPort.postMessage({type: 'log_info', data: 'Live stream transcoding process has started'})
liveStream.onCallEnded.subscribe(() => {
parentPort.postMessage({type: 'log_info', data: 'Live stream WebRTC session has disconnected'})
parentPort.postMessage({type: 'state', data: 'inactive'})
liveStream = false
})
} catch(error) {
parentPort.postMessage({type: 'log_error', data: error})
parentPort.postMessage({type: 'state', data: 'failed'})
liveStream = false
}
}
async function stopLiveStream() {
if (!streamStopping) {
streamStopping = true
let stopTimeout = 10
liveStream.stop()
do {
await new Promise(res => setTimeout(res, 200))
if (liveStream) {
parentPort.postMessage({type: 'log_info', data: 'Live stream failed to stop on request, deleting anyway...'})
parentPort.postMessage({type: 'state', data: 'inactive'})
liveStream = false
}
stopTimeout--
} while (liveStream && stopTimeout)
streamStopping = false
}
}

View File

@@ -1,15 +1,27 @@
import RingPolledDevice from './base-polled-device.js'
import utils from '../lib/utils.js'
import pathToFfmpeg from 'ffmpeg-for-homebridge'
import { Worker } from 'worker_threads'
import { spawn } from 'child_process'
import { parseISO, addSeconds } from 'date-fns';
import { parseISO, addSeconds } from 'date-fns'
import chalk from 'chalk'
import { LiveStream } from './stream/live.js'
import { SnapshotStream } from './stream/snapshot.js'
import { EventStream } from './stream/event.js'
export default class Camera extends RingPolledDevice {
constructor(deviceInfo, events) {
super(deviceInfo, 'camera')
this.streams = {
live: new LiveStream(this),
snapshot: new SnapshotStream(this),
event: new EventStream(this)
}
this.rtspCredentials = utils.config().livestream_user && utils.config().livestream_pass
? `${utils.config().livestream_user}:${utils.config().livestream_pass}@`
: ''
const savedState = this.getSavedState()
this.hasBattery1 = Boolean(this.device.data.hasOwnProperty('battery_voltage'))
@@ -22,7 +34,7 @@ export default class Camera extends RingPolledDevice {
this.data = {
motion: {
active_ding: false,
duration: savedState?.motion?.duration ? savedState.motion.duration : 180,
duration: savedState?.motion?.duration ? savedState.motion.duration : 30,
publishedDuration: false,
last_ding: 0,
last_ding_expires: 0,
@@ -36,7 +48,7 @@ export default class Camera extends RingPolledDevice {
...this.device.isDoorbot ? {
ding: {
active_ding: false,
duration: savedState?.ding?.duration ? savedState.ding.duration : 180,
duration: savedState?.ding?.duration ? savedState.ding.duration : 30,
publishedDurations: false,
last_ding: 0,
last_ding_expires: 0,
@@ -46,48 +58,36 @@ export default class Camera extends RingPolledDevice {
}
} : {},
snapshot: {
mode: savedState?.snapshot?.mode
? savedState.snapshot.mode.replace(/(^\w{1})|(\s+\w{1})/g, letter => letter.toUpperCase())
: 'Auto',
ding: false,
motion: false,
interval: false,
autoInterval: savedState?.snapshot?.autoInterval
? savedState.snapshot.autoInterval
: true,
demandTimestamp: 0,
ding: false,
mode: savedState?.snapshot?.mode
? savedState.snapshot.mode.replace(/(^\w{1})|(\s+\w{1})/g, letter => letter.toUpperCase())
: 'Auto',
motion: false,
image: null,
imageType: null,
imageTime: 0,
interval: false,
intervalDuration: savedState?.snapshot?.intervalDuration
? savedState.snapshot.intervalDuration
: (this.device.operatingOnBattery) ? 600 : 30,
intervalTimerId: null,
cache: null,
cacheType: null,
timestamp: null,
onDemandTimestamp: 0
intervalTimerId: null
},
stream: {
live: {
state: 'OFF',
status: 'inactive',
session: false,
publishedStatus: '',
worker: new Worker('./devices/camera-livestream.js', {
workerData: {
doorbotId: this.device.id,
deviceName: this.deviceData.name
}
})
},
event: {
state: 'OFF',
status: 'inactive',
session: false,
publishedStatus: ''
},
snapshot: {
state: 'OFF',
status: 'inactive',
session: false,
publishedStatus: ''
},
keepalive:{
active: false,
@@ -255,41 +255,12 @@ export default class Camera extends RingPolledDevice {
}
}
this.data.stream.live.worker.on('message', (message) => {
if (message.type === 'state') {
switch (message.data) {
case 'active':
this.data.stream.live.status = 'active'
this.data.stream.live.session = true
break;
case 'inactive':
this.data.stream.live.status = 'inactive'
this.data.stream.live.session = false
break;
case 'failed':
this.data.stream.live.status = 'failed'
this.data.stream.live.session = false
break;
}
this.publishStreamState()
} else {
switch (message.type) {
case 'log_info':
this.debug(message.data, 'wrtc')
break;
case 'log_error':
this.debug(chalk.redBright(message.data), 'wrtc')
break;
}
}
})
this.device.onNewNotification.subscribe(notification => {
this.processNotification(notification)
})
this.updateSnapshotMode()
this.scheduleSnapshotRefresh()
this.scheduleSnapshotUpdate()
this.updateDeviceState()
}
@@ -400,26 +371,24 @@ export default class Camera extends RingPolledDevice {
}
let stillImageUrlBase = 'localhost'
let streamSourceUrlBase
let rtspSourceHost
if (process.env.RUNMODE === 'addon') {
// For the addon we get some values populated from the startup script
// that queries the HA API via bashio
stillImageUrlBase = process.env.HAHOSTNAME
streamSourceUrlBase = process.env.ADDONHOSTNAME
rtspSourceHost = process.env.ADDONHOSTNAME
} else if (process.env.RUNMODE === 'docker') {
// For docker we don't have any API to query so we just use the IP of the docker container
// since it probably doesn't have a DNS entry
streamSourceUrlBase = await utils.getHostIp()
rtspSourceHost = await utils.getHostIp()
} else {
// For the stadalone install we try to get the host FQDN
streamSourceUrlBase = await utils.getHostFqdn()
rtspSourceHost = await utils.getHostFqdn()
}
// Set some helper attributes for streaming
this.data.stream.live.stillImageURL = `https://${stillImageUrlBase}:8123{{ states.camera.${this.device.name.toLowerCase().replace(" ","_")}_snapshot.attributes.entity_picture }}`,
this.data.stream.live.streamSource = (utils.config().livestream_user && utils.config().livestream_pass)
? `rtsp://${utils.config().livestream_user}:${utils.config().livestream_pass}@${streamSourceUrlBase}:8554/${this.deviceId}_live`
: `rtsp://${streamSourceUrlBase}:8554/${this.deviceId}_live`
this.data.stream.live.streamSource = `rtsp://${this.rtspCredentials}${rtspSourceHost}:8554/${this.deviceId}_live`
}
updateSnapshotMode() {
@@ -472,7 +441,7 @@ export default class Camera extends RingPolledDevice {
this.publishDingDurationState(isPublish)
this.publishSnapshotMode()
if (this.data.snapshot.motion || this.data.snapshot.ding || this.data.snapshot.interval) {
this.data.snapshot.cache ? this.publishSnapshot() : this.refreshSnapshot('interval')
this.data.snapshot.image ? this.publishSnapshot() : this.updateSnapshot('interval')
this.publishSnapshotInterval(isPublish)
}
this.publishAttributes()
@@ -512,6 +481,10 @@ export default class Camera extends RingPolledDevice {
}
this.debug(`Received ${dingKind} push notification, expires in ${this.data[dingKind].duration} seconds`)
if (this.data.stream.snapshot.status === 'active') {
this.streams.snapshot.startLivesnaps(this.data[dingKind].duration)
}
// Is this a new Ding or refresh of active ding?
const newDing = Boolean(!this.data[dingKind].active_ding)
this.data[dingKind].active_ding = true
@@ -525,11 +498,11 @@ export default class Camera extends RingPolledDevice {
if (dingKind === 'motion') {
this.data[dingKind].is_person = Boolean(pushData.data?.event?.ding?.detection_type === 'human')
if (this.data.snapshot.motion) {
this.refreshSnapshot('motion', pushData?.img?.snapshot_uuid)
this.updateSnapshot('motion', pushData?.img?.snapshot_uuid)
}
} else if (this.data.snapshot.ding) {
// If doorbell press and snapshots on ding are enabled, publish a new snapshot
this.refreshSnapshot('ding', pushData?.img?.snapshot_uuid)
this.updateSnapshot('ding', pushData?.img?.snapshot_uuid)
}
// Publish MQTT active sensor state
@@ -678,7 +651,7 @@ export default class Camera extends RingPolledDevice {
if (this.data.snapshot.autoInterval && this.data.snapshot.intervalDuration !== this.device.data.settings.lite_24x7.frequency_secs) {
this.data.snapshot.intervalDuration = this.device.data.settings.lite_24x7.frequency_secs
clearInterval(this.data.snapshot.intervalTimerId)
this.scheduleSnapshotRefresh()
this.scheduleSnapshotUpdate()
}
this.mqttPublish(this.entity.snapshot_interval.state_topic, this.data.snapshot.intervalDuration.toString())
}
@@ -689,22 +662,22 @@ export default class Camera extends RingPolledDevice {
}
publishStreamState(isPublish) {
['live', 'event', 'snapshot'].forEach(type => {
Object.entries(this.streams).forEach(([type, stream]) => {
const entityProp = (type === 'live') ? 'stream' : `${type}_stream`
if (this.entity.hasOwnProperty(entityProp)) {
const streamState = (this.data.stream[type].status === 'active' || this.data.stream[type].status === 'activating') ? 'ON' : 'OFF'
const streamState = (stream.status === 'active' || stream.status === 'activating') ? 'ON' : 'OFF'
if (streamState !== this.data.stream[type].state || isPublish) {
this.data.stream[type].state = streamState
this.mqttPublish(this.entity[entityProp].state_topic, this.data.stream[type].state)
// Publish state to IPC broker as well
utils.event.emit('mqtt_ipc_publish', this.entity[entityProp].state_topic, this.data.stream[type].state)
this.mqttPublish(this.entity[entityProp].state_topic, streamState)
utils.event.emit('mqtt_ipc_publish', this.entity[entityProp].state_topic, streamState)
}
if (this.data.stream[type].publishedStatus !== this.data.stream[type].status || isPublish) {
this.data.stream[type].publishedStatus = this.data.stream[type].status
const attributes = { status: this.data.stream[type].status }
if (this.data.stream[type].status !== stream.status || isPublish) {
this.data.stream[type].status = stream.status
const attributes = { status: stream.status }
this.mqttPublish(this.entity[entityProp].json_attributes_topic, JSON.stringify(attributes), 'attr')
// Publish attribute state to IPC broker as well
utils.event.emit('mqtt_ipc_publish', this.entity[entityProp].json_attributes_topic, JSON.stringify(attributes))
}
}
@@ -735,24 +708,24 @@ export default class Camera extends RingPolledDevice {
// Publish snapshot image/metadata
publishSnapshot() {
this.mqttPublish(this.entity.snapshot.topic, this.data.snapshot.cache, 'mqtt', '<binary_image_data>')
this.mqttPublish(this.entity.snapshot.topic, this.data.snapshot.image, 'mqtt', '<binary_image_data>')
const attributes = {
timestamp: this.data.snapshot.timestamp,
type: this.data.snapshot.cacheType
timestamp: this.data.snapshot.imageTime,
type: this.data.snapshot.imageType
}
this.mqttPublish(this.entity.snapshot.json_attributes_topic, JSON.stringify(attributes), 'attr')
}
// Refresh snapshot on scheduled interval
scheduleSnapshotRefresh() {
scheduleSnapshotUpdate() {
this.data.snapshot.intervalTimerId = setInterval(() => {
if (this.isOnline() && this.data.snapshot.interval && !(this.data.snapshot.motion && this.data.motion.active_ding)) {
this.refreshSnapshot('interval')
this.updateSnapshot('interval')
}
}, this.data.snapshot.intervalDuration * 1000)
}
async refreshSnapshot(type, image_uuid) {
async updateSnapshot(type, image_uuid) {
let newSnapshot = false
let loop = 3
@@ -767,7 +740,7 @@ export default class Camera extends RingPolledDevice {
case 'interval':
case 'on-demand':
this.debug(`Requesting an updated ${type} snapshot`)
newSnapshot = await this.device.getNextSnapshot({ force: true })
newSnapshot = await this.device.getNextSnapshot({ afterMs: (this.data.snapshot.imageTime + 1) * 1000 , maxWaitMs: 2500, force: true })
break;
case 'motion':
case 'ding':
@@ -776,15 +749,14 @@ export default class Camera extends RingPolledDevice {
newSnapshot = await this.device.getNextSnapshot({ uuid: image_uuid })
} else if (!this.device.operatingOnBattery) {
this.debug(`Requesting an updated ${type} snapshot`)
newSnapshot = await this.device.getNextSnapshot({ force: true })
newSnapshot = await this.device.getNextSnapshot({ afterMs: (this.data.snapshot.imageTime + 1) * 1000 , maxWaitMs: 2500, force: true })
} else {
this.debug(`The ${type} notification did not contain image UUID and battery cameras are unable to snapshot while recording`)
loop = 0 // Don't retry in this case
}
break;
}
} catch (err) {
this.debug(err)
} catch {
if (loop > 1) {
this.debug(`Failed to retrieve updated ${type} snapshot, retrying in one second...`)
await utils.sleep(1)
@@ -797,178 +769,18 @@ export default class Camera extends RingPolledDevice {
if (newSnapshot) {
this.debug(`Successfully retrieved updated ${type} snapshot`)
this.data.snapshot.cache = newSnapshot
this.data.snapshot.cacheType = type
this.data.snapshot.timestamp = Math.round(Date.now()/1000)
this.data.snapshot.image = newSnapshot
this.data.snapshot.imageType = type
this.data.snapshot.imageTime = Math.round(Date.now()/1000)
this.publishSnapshot()
}
}
async startLiveStream(rtspPublishUrl) {
this.data.stream.live.session = true
const streamData = {
rtspPublishUrl,
ticket: null
}
try {
this.debug('Acquiring a live stream WebRTC signaling session ticket')
const response = await this.device.restClient.request({
method: 'POST',
url: 'https://app.ring.com/api/v1/clap/ticket/request/signalsocket'
})
streamData.ticket = response.ticket
} catch(error) {
if (error?.response?.statusCode === 403) {
this.debug(`Camera returned 403 when starting a live stream. This usually indicates that live streaming is blocked by Modes settings. Check your Ring app and verify that you are able to stream from this camera with the current Modes settings.`)
} else {
this.debug(error)
}
}
if (streamData.ticket) {
this.debug('Live stream WebRTC signaling session ticket acquired, starting live stream worker')
this.data.stream.live.worker.postMessage({ command: 'start', streamData })
} else {
this.debug('Live stream failed to initialize WebRTC signaling session')
this.data.stream.live.status = 'failed'
this.data.stream.live.session = false
this.publishStreamState()
}
}
async startEventStream(rtspPublishUrl) {
const eventSelect = this.data.event_select.state.split(' ')
const eventType = eventSelect[0].toLowerCase().replace('-', '_')
const eventNumber = eventSelect[1]
if (this.data.event_select.recordingUrl.match(/Recording Not Found|Transcoding in Progress/)) {
this.debug(`No recording available for the ${(eventNumber==1?"":eventNumber==2?"2nd ":eventNumber==3?"3rd ":eventNumber+"th ")}most recent ${eventType} event!`)
this.data.stream.event.status = 'failed'
this.data.stream.event.session = false
this.publishStreamState()
return
}
this.debug(`Streaming the ${(eventNumber==1?"":eventNumber==2?"2nd ":eventNumber==3?"3rd ":eventNumber+"th ")}most recently recorded ${eventType} event`)
try {
if (this.data.event_select.transcoded || this.hevcEnabled) {
// If camera is in HEVC mode, recordings are also in HEVC so transcode the video back to H.264/AVC on the fly
// Ring videos transcoded for download are not optimized for RTSP streaming (limited keyframes) so they must
// also be re-transcoded on-the-fly to allow streamers to join early
this.data.stream.event.session = spawn(pathToFfmpeg, [
'-re',
'-i', this.data.event_select.recordingUrl,
'-map', '0:v',
'-map', '0:a',
'-map', '0:a',
'-c:v', 'libx264',
'-g', '20',
'-keyint_min', '10',
'-crf', '23',
'-preset', 'ultrafast',
'-c:a:0', 'copy',
'-c:a:1', 'libopus',
'-flags', '+global_header',
'-rtsp_transport', 'tcp',
'-f', 'rtsp',
rtspPublishUrl
])
} else {
this.data.stream.event.session = spawn(pathToFfmpeg, [
'-re',
'-i', this.data.event_select.recordingUrl,
'-map', '0:v',
'-map', '0:a',
'-map', '0:a',
'-c:v', 'copy',
'-c:a:0', 'copy',
'-c:a:1', 'libopus',
'-flags', '+global_header',
'-rtsp_transport', 'tcp',
'-f', 'rtsp',
rtspPublishUrl
])
}
this.data.stream.event.session.on('spawn', async () => {
this.debug(`The recorded ${eventType} event stream has started`)
this.data.stream.event.status = 'active'
this.publishStreamState()
})
this.data.stream.event.session.on('close', async () => {
this.debug(`The recorded ${eventType} event stream has ended`)
this.data.stream.event.status = 'inactive'
this.data.stream.event.session = false
this.publishStreamState()
})
} catch(e) {
this.debug(e)
this.data.stream.event.status = 'failed'
this.data.stream.event.session = false
this.publishStreamState()
}
}
async startSnapshotStream(rtspPublishUrl) {
if (!this.data.snapshot.cache) {
this.debug(`Could not start the snapshot stream because No snapshot is available in the snapshot cache`)
this.data.stream.snapshot.status = 'failed'
this.data.stream.snapshot.session = false
this.publishStreamState()
return
}
this.debug(`Starting a snapshot stream using the most recently cached snapshot`)
try {
this.data.stream.snapshot.session = spawn(pathToFfmpeg, [
'-f', 'image2pipe',
'-framerate', '5',
'-i', '-',
'-c:v', 'libx264',
'-preset', 'ultrafast',
'-tune', 'zerolatency',
'-vf', `scale=1280:720:force_original_aspect_ratio=decrease,pad=1280:720:(ow-iw)/2:(oh-ih)/2`,
'-flags', '+global_header',
'-rtsp_transport', 'tcp',
'-f', 'rtsp',
rtspPublishUrl
])
this.data.stream.snapshot.session.on('spawn', async () => {
this.debug(`The snapshot stream has started`)
this.data.stream.snapshot.status = 'active'
this.publishStreamState()
this.data.stream.snapshot.session.stdin.write(this.data.snapshot.cache)
})
this.data.stream.snapshot.session.on('close', async () => {
this.debug(`The snapshot stream has ended`)
this.data.stream.snapshot.status = 'inactive'
this.data.stream.snapshot.session = false
this.publishStreamState()
})
} catch(e) {
this.debug(e)
this.data.stream.snapshot.status = 'failed'
this.data.stream.snapshot.session = false
this.publishStreamState()
}
}
async startKeepaliveStream() {
const duration = 86400
if (this.data.stream.keepalive.active) { return }
this.data.stream.keepalive.active = true
const rtspPublishUrl = (utils.config().livestream_user && utils.config().livestream_pass)
? `rtsp://${utils.config().livestream_user}:${utils.config().livestream_pass}@localhost:8554/${this.deviceId}_live`
: `rtsp://localhost:8554/${this.deviceId}_live`
this.debug(`Starting a keepalive stream for camera`)
// Keepalive stream is used only when the live stream is started
@@ -976,7 +788,7 @@ export default class Camera extends RingPolledDevice {
// trigger rtsp server to start the on-demand stream and keep it running
// when there are no other RTSP readers.
this.data.stream.keepalive.session = spawn(pathToFfmpeg, [
'-i', rtspPublishUrl,
'-i', `rtsp://${this.rtspCredentials}localhost:8554/${this.deviceId}_live`,
'-map', '0:a:0',
'-c:a', 'copy',
'-f', 'null',
@@ -1176,13 +988,13 @@ export default class Camera extends RingPolledDevice {
this.takeSnapshot(message)
break;
case 'stream/command':
this.setLiveStreamState(message)
this.setStreamState(message, 'live')
break;
case 'event_stream/command':
this.setEventStreamState(message)
this.setStreamState(message, 'event')
break;
case 'snapshot_stream/command':
this.setSnapshotStreamState(message)
this.setStreamState(message, 'snapshot')
break;
case 'event_select/command':
this.setEventSelect(message)
@@ -1314,7 +1126,7 @@ export default class Camera extends RingPolledDevice {
this.publishSnapshotMode()
}
clearInterval(this.data.snapshot.intervalTimerId)
this.scheduleSnapshotRefresh()
this.scheduleSnapshotUpdate()
this.publishSnapshotInterval()
this.debug('Snapshot refresh interval has been set to '+this.data.snapshot.intervalDuration+' seconds')
this.updateDeviceState()
@@ -1324,11 +1136,11 @@ export default class Camera extends RingPolledDevice {
takeSnapshot(message) {
if (message.toLowerCase() === 'press') {
this.debug('Received command to take an on-demand snapshot')
if (this.data.snapshot.onDemandTimestamp + 10 > Math.round(Date.now()/1000 ) ) {
if (this.data.snapshot.demandTimestamp + 10 > Math.round(Date.now()/1000 ) ) {
this.debug('On-demand snapshots are limited to one snapshot every 10 seconds')
} else {
this.data.snapshot.onDemandTimestamp = Math.round(Date.now()/1000)
this.refreshSnapshot('on-demand')
this.data.snapshot.demandTimestamp = Math.round(Date.now()/1000)
this.updateSnapshot('on-demand')
}
} else {
this.debug(`Received invalid command via on-demand snapshot topic: ${message}`)
@@ -1348,7 +1160,7 @@ export default class Camera extends RingPolledDevice {
if (snapshotMode === 'Auto') {
this.debug(`Snapshot mode has been set to ${snapshotMode}, resetting to default values for camera type`)
clearInterval(this.data.snapshot.intervalTimerId)
this.scheduleSnapshotRefresh()
this.scheduleSnapshotUpdate()
this.publishSnapshotInterval()
} else {
this.debug(`Snapshot mode has been set to ${snapshotMode}`)
@@ -1358,99 +1170,41 @@ export default class Camera extends RingPolledDevice {
} else {
this.debug(`Received invalid command for snapshot mode`)
}
}
}
setLiveStreamState(message) {
setStreamState(message, streamType) {
const command = message.toLowerCase()
this.debug(`Received set live stream state ${message}`)
this.debug(`Received set ${streamType} stream state ${message}`)
if (command.startsWith('on-demand')) {
if (this.data.stream.live.status === 'active' || this.data.stream.live.status === 'activating') {
const stream = this.streams[streamType]
if (stream.status === 'active' || stream.status === 'activating') {
this.publishStreamState()
} else {
this.data.stream.live.status = 'activating'
stream.status = 'activating'
this.publishStreamState()
this.startLiveStream(message.split(' ')[1]) // Portion after space is the RTSP publish URL
stream.start(message.split(' ')[1])
}
} else {
switch (command) {
case 'on':
// Stream was manually started, create a dummy, audio only
// RTSP source stream to trigger stream startup and keep it active
this.startKeepaliveStream()
break;
if (streamType === 'live') {
// Only live stream supports manual activation
this.startKeepaliveStream()
} else {
this.debug(`${streamType} stream can only be started on-demand!`)
}
break
case 'off':
if (this.data.stream.keepalive.session) {
if (streamType === 'live' && this.data.stream.keepalive.session) {
this.debug('Stopping the keepalive stream')
this.data.stream.keepalive.session.kill()
} else if (this.data.stream.live.session) {
this.data.stream.live.worker.postMessage({ command: 'stop' })
} else {
this.data.stream.live.status = 'inactive'
this.publishStreamState()
this.streams[streamType].stop()
}
break;
break
default:
this.debug(`Received unknown command for live stream`)
}
}
}
setEventStreamState(message) {
const command = message.toLowerCase()
this.debug(`Received set event stream state ${message}`)
if (command.startsWith('on-demand')) {
if (this.data.stream.event.status === 'active' || this.data.stream.event.status === 'activating') {
this.publishStreamState()
} else {
this.data.stream.event.status = 'activating'
this.publishStreamState()
this.startEventStream(message.split(' ')[1]) // Portion after backslash is RTSP publish URL
}
} else {
switch (command) {
case 'on':
this.debug(`Event stream can only be started on-demand!`)
break;
case 'off':
if (this.data.stream.event.session) {
this.data.stream.event.session.kill()
} else {
this.data.stream.event.status = 'inactive'
this.publishStreamState()
}
break;
default:
this.debug(`Received unknown command for event stream`)
}
}
}
setSnapshotStreamState(message) {
const command = message.toLowerCase()
this.debug(`Received set snapshot stream state ${message}`)
if (command.startsWith('on-demand')) {
if (this.data.stream.snapshot.status === 'active' || this.data.stream.event.status === 'activating') {
this.publishStreamState()
} else {
this.data.stream.snapshot.status = 'activating'
this.publishStreamState()
this.startSnapshotStream(message.split(' ')[1]) // Portion after backslash is RTSP publish URL
}
} else {
switch (command) {
case 'on':
this.debug(`Snapshot stream can only be started on-demand!`)
break;
case 'off':
if (this.data.stream.snapshot.session) {
this.data.stream.snapshot.session.kill()
} else {
this.data.stream.snapshot.status = 'inactive'
this.publishStreamState()
}
break;
default:
this.debug(`Received unknown command for snapshot stream`)
this.debug(`Received unknown command for ${streamType} stream`)
}
}
}

91
devices/stream/event.js Normal file
View File

@@ -0,0 +1,91 @@
import { spawn } from 'child_process'
import pathToFfmpeg from 'ffmpeg-for-homebridge'
export class EventStream {
constructor(device) {
this.mqttCamera = device
this.status = 'inactive'
this.session = false
}
async start(rtspPublishUrl) {
const eventSelect = this.mqttCamera.data.event_select.state.split(' ')
const eventType = eventSelect[0].toLowerCase().replace('-', '_')
const eventNumber = eventSelect[1]
if (this.mqttCamera.data.event_select.recordingUrl.match(/Recording Not Found|Transcoding in Progress/)) {
this.mqttCamera.debug(`No recording available for the ${(eventNumber==1?"":eventNumber==2?"2nd ":eventNumber==3?"3rd ":eventNumber+"th ")}most recent ${eventType} event!`)
this.status = 'failed'
this.session = false
this.mqttCamera.publishStreamState()
return
}
this.mqttCamera.debug(`Streaming the ${(eventNumber==1?"":eventNumber==2?"2nd ":eventNumber==3?"3rd ":eventNumber+"th ")}most recently recorded ${eventType} event`)
try {
if (this.mqttCamera.data.event_select.transcoded || this.mqttCamera.hevcEnabled) {
// If camera is in HEVC mode, recordings are also in HEVC so transcode to H.264/AVC
this.session = spawn(pathToFfmpeg, [
'-re',
'-i', this.mqttCamera.data.event_select.recordingUrl,
'-map', '0:v',
'-map', '0:a',
'-map', '0:a',
'-c:v', 'libx264',
'-g', '20',
'-keyint_min', '10',
'-crf', '23',
'-preset', 'ultrafast',
'-c:a:0', 'copy',
'-c:a:1', 'libopus',
'-flags', '+global_header',
'-rtsp_transport', 'tcp',
'-f', 'rtsp',
rtspPublishUrl
])
} else {
this.session = spawn(pathToFfmpeg, [
'-re',
'-i', this.mqttCamera.data.event_select.recordingUrl,
'-map', '0:v',
'-map', '0:a',
'-map', '0:a',
'-c:v', 'copy',
'-c:a:0', 'copy',
'-c:a:1', 'libopus',
'-flags', '+global_header',
'-rtsp_transport', 'tcp',
'-f', 'rtsp',
rtspPublishUrl
])
}
this.session.on('spawn', async () => {
this.mqttCamera.debug(`The recorded ${eventType} event stream has started`)
this.status = 'active'
this.mqttCamera.publishStreamState()
})
this.session.on('close', async () => {
this.mqttCamera.debug(`The recorded ${eventType} event stream has ended`)
this.status = 'inactive'
this.session = false
this.mqttCamera.publishStreamState()
})
} catch(e) {
this.mqttCamera.debug(e)
this.status = 'failed'
this.session = false
this.mqttCamera.publishStreamState()
}
}
async stop() {
if (this.session) {
this.session.kill()
this.session = false
}
this.status = 'inactive'
}
}

87
devices/stream/live.js Normal file
View File

@@ -0,0 +1,87 @@
import { Worker } from 'worker_threads'
export class LiveStream {
constructor(device) {
this.mqttCamera = device
this.session = false
this.status = 'inactive'
this.worker = new Worker('./devices/stream/worker.js', {
workerData: {
doorbotId: this.mqttCamera.device.id,
deviceName: this.mqttCamera.deviceData.name
}
})
this.worker.on('message', (message) => {
if (message.type === 'state') {
switch (message.data) {
case 'active':
this.status = 'active'
this.session = true
break
case 'inactive':
this.status = 'inactive'
this.session = false
break
case 'failed':
this.status = 'failed'
this.session = false
break
}
this.mqttCamera.publishStreamState()
} else {
switch (message.type) {
case 'log_info':
this.mqttCamera.debug(message.data, 'wrtc')
break
case 'log_error':
this.mqttCamera.debug(message.data, 'wrtc-error')
break
}
}
})
}
async start(rtspPublishUrl) {
this.session = true
const streamData = {
rtspPublishUrl,
ticket: null
}
try {
this.mqttCamera.debug('Acquiring a live stream WebRTC signaling session ticket')
const response = await this.mqttCamera.device.restClient.request({
method: 'POST',
url: 'https://app.ring.com/api/v1/clap/ticket/request/signalsocket'
})
streamData.ticket = response.ticket
} catch(error) {
if (error?.response?.statusCode === 403) {
this.mqttCamera.debug('Camera returned 403 when starting a live stream. This usually indicates that live streaming is blocked by Modes settings.')
} else {
this.mqttCamera.debug(error)
}
}
if (streamData.ticket) {
this.mqttCamera.debug('Live stream WebRTC signaling session ticket acquired, starting live stream worker')
this.worker.postMessage({ command: 'start', streamData })
} else {
this.mqttCamera.debug('Live stream failed to initialize WebRTC signaling session')
this.status = 'failed'
this.session = false
this.mqttCamera.publishStreamState()
}
}
async stop() {
if (this.session) {
this.worker.postMessage({ command: 'stop' })
}
this.status = 'inactive'
this.session = false
}
}

193
devices/stream/snapshot.js Normal file
View File

@@ -0,0 +1,193 @@
import { spawn } from 'child_process'
import utils from '../../lib/utils.js'
import pathToFfmpeg from 'ffmpeg-for-homebridge'
export class SnapshotStream {
constructor(device) {
this.mqttCamera = device
this.status = 'inactive'
this.interval = null
this.rtsp = {
active: false,
session: false
}
this.snapshot = {
active: false,
session: false
}
this.livesnaps = {
active: false,
session: false,
timeout: 0
}
}
async start(rtspPublishUrl) {
if (!this.mqttCamera.data.snapshot.image) {
this.mqttCamera.debug('Snapshot stream failed to start - No available snapshot')
this.status = 'failed'
this.mqttCamera.publishStreamState()
return
}
try {
await this.startSnapshotStream(rtspPublishUrl)
} catch {
this.mqttCamera.debug('Snapshot stream failed to start - Failed to spawn ffmpeg')
this.status = 'failed'
this.mqttCamera.publishStreamState()
}
}
async startSnapshotStream(rtspPublishUrl) {
this.rtsp.session = spawn(pathToFfmpeg, [
'-f', 'mpegts',
'-probesize', '32k',
'-analyzeduration', '0',
'-i', '-',
'-ss', '0.2',
'-c:v', 'copy',
'-avioflags', 'direct',
'-f', 'rtsp',
'-rtsp_transport', 'tcp',
rtspPublishUrl
])
// Handle process exit
this.rtsp.session.on('close', () => {
this.mqttCamera.debug('Snapshot stream transcoding session has ended')
this.stop()
})
// Return a promise that resolves when the process is ready
return new Promise((resolve, reject) => {
this.rtsp.session.on('spawn', async () => {
this.snapshot.session = spawn(pathToFfmpeg, [
'-f', 'image2pipe',
'-probesize', '32k',
'-analyzeduration', '0',
'-i', '-',
'-vf', 'scale=1280:720:force_original_aspect_ratio=decrease,pad=1280:720:(ow-iw)/2:(oh-ih)/2',
'-sws_flags', 'lanczos',
'-c:v', 'libx264',
'-b:v', '2M',
'-preset', 'ultrafast',
'-tune', 'zerolatency',
'-r', '5',
'-g', '1',
'-avioflags', 'direct',
'-f', 'mpegts',
'-'
])
this.snapshot.session.on('spawn', async () => {
this.startSnapshotInterval()
this.mqttCamera.debug('Snapshot stream transcoding session has started')
this.status = 'active'
this.mqttCamera.publishStreamState()
resolve()
})
this.snapshot.session.stdout.once('data', () => {
this.snapshot.session.stdout.pipe(this.rtsp.session.stdin)
})
})
this.rtsp.session.on('error', reject)
})
}
async startLivesnaps(duration) {
if (this.livesnaps.active) { return }
this.livesnaps.active = true
this.mqttCamera.debug('Starting a live snapshot stream for camera')
this.livesnaps.session = spawn(pathToFfmpeg, [
'-rtsp_transport', 'tcp',
'-probesize', '32K',
'-analyzeduration', '0',
'-i', `rtsp://${this.mqttCamera.rtspCredentials}localhost:8554/${this.mqttCamera.deviceId}_live`,
'-vf', 'scale=1280:720:force_original_aspect_ratio=decrease,pad=1280:720:(ow-iw)/2:(oh-ih)/2',
'-sws_flags', 'lanczos',
'-c:v', 'libx264',
'-b:v', '2M',
'-preset', 'ultrafast',
'-tune', 'zerolatency',
'-r', '5',
'-g', '1',
'-avioflags', 'direct',
'-f', 'mpegts',
'pipe:1'
])
this.livesnaps.session.stdout.once('data', () => {
this.snapshot.session.stdout.unpipe(this.rtsp.session.stdin)
this.livesnaps.session.stdout.pipe(this.rtsp.session.stdin)
})
this.livesnaps.session.on('close', async () => {
this.mqttCamera.debug('The live snapshot stream has stopped')
this.mqttCamera.updateSnapshot('interval')
this.livesnaps.session.stdout.unpipe(this.rtsp.session.stdin)
this.snapshot.session.stdout.pipe(this.rtsp.session.stdin)
Object.assign(this.livesnaps, { active: false, session: false, image: false })
})
// The livesnap stream will stop after the specified duration
this.livesnaps.timeout = Math.floor(Date.now()/1000) + duration + 5
while (this.livesnaps.active && Math.floor(Date.now()/1000) < this.livesnaps.timeout) {
await utils.sleep(1)
}
if (this.livesnaps.session) {
this.livesnaps.session.kill()
}
}
startSnapshotInterval() {
this.interval = setInterval(async () => {
if (this.status === 'active') {
try {
this.snapshot.session.stdin.write(this.mqttCamera.data.snapshot.image)
} catch {
this.mqttCamera.debug('Writing image to snapshot stream failed')
this.stop()
}
} else {
this.stop()
}
}, 40)
}
async stop() {
clearInterval(this.interval)
this.interval = null
this.status = 'inactive'
this.mqttCamera.publishStreamState()
if (!this.rtsp.session) return
const rtspSession = this.rtsp.session
this.rtsp.session = false
try {
rtspSession.stdin.end()
await Promise.race([
new Promise((resolve) => {
rtspSession.stdin.once('finish', () => {
rtspSession.kill()
resolve()
})
}),
new Promise((_, reject) => {
setTimeout(() => {
reject()
}, 2000)
})
])
} catch {
rtspSession.kill('SIGKILL')
}
}
}

188
devices/stream/worker.js Normal file
View File

@@ -0,0 +1,188 @@
import { parentPort, workerData } from 'worker_threads'
import { WebrtcConnection } from './lib/webrtc-connection.js'
import { StreamingSession } from './lib/streaming-session.js'
class LiveStreamWorker {
constructor(deviceName, doorbotId) {
this.deviceName = deviceName
this.doorbotId = doorbotId
this.liveStream = null
this.stopping = false
this.initializeMessageHandler()
}
initializeMessageHandler() {
parentPort.on("message", async (data) => {
const { command, streamData } = data
try {
switch (command) {
case 'start':
await this.handleStartCommand(streamData)
break;
case 'stop':
await this.handleStopCommand()
break;
default:
this.logError(`Unknown command received: ${command}`)
}
} catch (error) {
this.logError(`Error handling command ${command}: ${error.message}`)
this.updateState('failed')
}
})
}
async handleStartCommand(streamData) {
if (this.isStreamStopping) {
this.logError("Live stream could not be started because it is in stopping state")
this.updateState('failed')
return
}
if (this.liveStream) {
this.logError("Live stream could not be started because there is already an active stream")
this.updateState('active')
return
}
await this.startLiveStream(streamData)
}
async handleStopCommand() {
if (this.liveStream) {
await this.stopLiveStream()
}
}
async startLiveStream(streamData) {
this.logInfo('Live stream WebRTC worker received start command')
try {
const cameraData = {
name: this.deviceName,
id: this.doorbotId
}
const streamConnection = new WebrtcConnection(streamData.ticket, cameraData)
this.liveStream = new StreamingSession(cameraData, streamConnection)
this.setupConnectionStateHandler()
await this.startTranscodingProcess(streamData.rtspPublishUrl)
this.setupCallEndedHandler()
} catch (error) {
this.handleStreamError(error)
}
}
setupConnectionStateHandler() {
this.liveStream.connection.pc.onConnectionState.subscribe(async (state) => {
switch(state) {
case 'connected':
this.updateState('active')
this.logInfo('Live stream WebRTC session is connected')
break
case 'failed':
this.updateState('failed')
this.logInfo('Live stream WebRTC connection has failed')
await this.handleConnectionFailure()
break
}
})
}
async handleConnectionFailure() {
this.liveStream.stop()
await new Promise(res => setTimeout(res, 2000))
this.liveStream = null
}
async startTranscodingProcess(rtspPublishUrl) {
this.logInfo('Live stream transcoding process is starting')
const transcodingConfig = {
input: [
'-use_wallclock_as_timestamps', '1',
'-itsoffset', '0.2',
'-probesize', '32K',
'-analyzeduration', '0',
],
audio: [
'-map', '0:a',
'-c:a:0', 'aac',
'-map', '0:a',
'-c:a:1', 'copy'
],
video: [
'-map', '0:v',
'-c:v', 'copy'
],
output: [
'-flags', '+global_header',
'-avioflags', 'direct',
'-f', 'rtsp',
'-rtsp_transport', 'tcp',
rtspPublishUrl
]}
await this.liveStream.startTranscoding(transcodingConfig)
this.logInfo('Live stream transcoding process has started')
}
setupCallEndedHandler() {
this.liveStream.onCallEnded.subscribe(() => {
this.logInfo('Live stream WebRTC session has disconnected')
this.updateState('inactive')
this.liveStream = null
});
}
async stopLiveStream() {
if (this.stopping) { return }
this.stopping = true
let stopTimeout = 10
this.liveStream.stop()
while (this.liveStream && stopTimeout > 0) {
await new Promise(res => setTimeout(res, 200))
stopTimeout -= 1
if (this.liveStream) {
this.logInfo('Live stream failed to stop on request, trying again...')
this.liveStream.stop()
} else {
this.logError('Live stream failed to stop on request, deleting anyway...')
this.updateState('inactive')
this.liveStream = null
}
}
this.stopping = false
}
handleStreamError(error) {
this.logError(error)
this.updateState('failed')
this.liveStream = null
}
// Helper methods for logging and state updates
logInfo(message) {
parentPort.postMessage({ type: 'log_info', data: message })
}
logError(message) {
parentPort.postMessage({ type: 'log_error', data: message })
}
updateState(state) {
parentPort.postMessage({ type: 'state', data: state })
}
}
// Initialize the worker
const worker = new LiveStreamWorker(workerData.deviceName, workerData.doorbotId)
export default worker

View File

@@ -1,14 +1,21 @@
## v5.9.0
The release introduces a new feature, the snapshot stream.
This release includes a new feature I refer to as the **Snapshot Stream**. This "psuedo-live" RTSP video stream with the goal to make it possible, even easy, to use Ring cameras, even battery powered ones, with video systems that require a 24x7 live stream, while not actually keeping a live stream, thus preserving all of the existing features of Ring camera motion notifications, while fully integrating with NVR tools such as Frigate.
It does this by producing an H.264/AVC video stream, running at 720p and 5 FPS, generated from standard Ring snapshots, however, when motion or ding events are detected, the code temporarily activates a live stream and graps images directly from the stream, producing a video that is usable for NVR tools. Please read more about the snapshot stream in the [Video Streaming](https://github.com/tsightler/ring-mqtt/wiki/Video-Streaming) section of the project wiki.
**Minor Enhancements**
- Significantly reduced live stream startup time, live streams now consistently start in <1.5 seconds on most hardware.
**Dependency Updates**
- chalk 5.4.0
- chalk 5.4.1
- ring-client-api 14.0.0-beta.0
- werift 0.21.12
## v5.8.0
The 5.8.x branch is focused primarily on cleaning up various portions of the code to improve long-term maintainability, there are no major features planned for this branch.
**Minor Enhancements**
- The web based authenticator has been completely re-written in this release. It now serves an singlem dynamic html document, from memory, properly displays error messages, more safely handles sensitive data such as passwords/2fa codes (information is deleted from session state immediately after it is submitted) and uses an updated theme that integrates more cleanly into the Home Assistant UI, including automatic support for light/dark mode.
- The web based authenticator has been completely re-written in this release. It now serves an singlem dynamic html document, from memory, properly displays error messages, more safely handles sensitive data such as passwords/2fa codes (information is deleted from session state immediately after it is submitted) and uses an updated theme that integrates more cleanly into the Home Assistant UI, including automatic support for light/dark mode.
**Dependency Updates**
- @homebridge/camera-utils 2.2.7

View File

@@ -73,7 +73,7 @@ logger() {
}
# Trap signals so that the MQTT command to stop the stream can be published on exit
trap stop INT TERM EXIT
trap stop INT TERM
logger "Sending command to activate ${type} stream ON-DEMAND"
mosquitto_pub -i "${client_id}_pub" -L "mqtt://127.0.0.1:51883/${command_topic}" -m "ON-DEMAND ${rtsp_pub_url}" &