mirror of
https://github.com/tsightler/ring-mqtt.git
synced 2025-09-26 21:01:12 +08:00
Use MJPEG for livesnaps
This commit is contained in:
@@ -6,7 +6,6 @@ export class SnapshotStream {
|
||||
constructor(device) {
|
||||
this.mqttCamera = device
|
||||
this.status = 'inactive'
|
||||
this.interval = null
|
||||
this.livesnaps = {
|
||||
active: false,
|
||||
session: false,
|
||||
@@ -16,12 +15,9 @@ export class SnapshotStream {
|
||||
active: false,
|
||||
session: false
|
||||
}
|
||||
this.rtsp = {
|
||||
active: false,
|
||||
session: false
|
||||
}
|
||||
this.snapshot = {
|
||||
active: false,
|
||||
interval: false,
|
||||
session: false
|
||||
}
|
||||
}
|
||||
@@ -44,61 +40,40 @@ export class SnapshotStream {
|
||||
}
|
||||
|
||||
async startSnapshotStream(rtspPublishUrl) {
|
||||
this.rtsp.session = spawn(pathToFfmpeg, [
|
||||
'-f', 'mpegts',
|
||||
'-probesize', '32k',
|
||||
'-analyzeduration', '0',
|
||||
'-i', 'pipe:',
|
||||
'-ss', '.2',
|
||||
'-c:v', 'copy',
|
||||
'-avioflags', 'direct',
|
||||
'-f', 'rtsp',
|
||||
'-rtsp_transport', 'tcp',
|
||||
rtspPublishUrl
|
||||
])
|
||||
|
||||
// Handle process exit
|
||||
this.rtsp.session.on('close', () => {
|
||||
this.mqttCamera.debug('Snapshot stream transcoding session has ended')
|
||||
this.stop()
|
||||
})
|
||||
|
||||
// Return a promise that resolves when the process is ready
|
||||
return new Promise((resolve, reject) => {
|
||||
this.rtsp.session.on('spawn', async () => {
|
||||
this.snapshot.session = spawn(pathToFfmpeg, [
|
||||
'-hide_banner',
|
||||
'-f', 'image2pipe',
|
||||
'-probesize', '32k',
|
||||
'-analyzeduration', '0',
|
||||
'-i', 'pipe:',
|
||||
'-vf', 'scale=1280:720:force_original_aspect_ratio=decrease,pad=1280:720:(ow-iw)/2:(oh-ih)/2',
|
||||
'-sws_flags', 'lanczos',
|
||||
'-c:v', 'libx264',
|
||||
'-b:v', '6M',
|
||||
'-r', '5',
|
||||
'-g', '1',
|
||||
'-preset', 'ultrafast',
|
||||
'-tune', 'zerolatency',
|
||||
'-avioflags', 'direct',
|
||||
'-f', 'mpegts',
|
||||
'pipe:1'
|
||||
])
|
||||
this.snapshot.session = spawn(pathToFfmpeg, [
|
||||
'-f', 'image2pipe',
|
||||
'-probesize', '32k',
|
||||
'-analyzeduration', '0',
|
||||
'-i', 'pipe:',
|
||||
'-vf', 'fps=5,scale=1280:720:force_original_aspect_ratio=decrease,pad=1280:720:(ow-iw)/2:(oh-ih)/2',
|
||||
'-c:v', 'libx264',
|
||||
'-b:v', '6M',
|
||||
'-r', '5',
|
||||
'-g', '1',
|
||||
'-preset', 'ultrafast',
|
||||
'-tune', 'zerolatency',
|
||||
'-avioflags', 'direct',
|
||||
'-f', 'rtsp',
|
||||
'-rtsp_transport', 'tcp',
|
||||
rtspPublishUrl
|
||||
])
|
||||
|
||||
this.snapshot.session.on('spawn', async () => {
|
||||
this.startSnapshotInterval()
|
||||
this.mqttCamera.debug('Snapshot stream transcoding session has started')
|
||||
this.status = 'active'
|
||||
this.mqttCamera.publishStreamState()
|
||||
resolve()
|
||||
})
|
||||
|
||||
this.snapshot.session.stdout.once('data', () => {
|
||||
this.snapshot.session.stdout.pipe(this.rtsp.session.stdin)
|
||||
})
|
||||
this.snapshot.session.on('spawn', async () => {
|
||||
this.startSnapshotInterval()
|
||||
this.mqttCamera.debug('Snapshot stream transcoding session has started')
|
||||
this.status = 'active'
|
||||
this.mqttCamera.publishStreamState()
|
||||
resolve()
|
||||
})
|
||||
|
||||
this.rtsp.session.on('error', reject)
|
||||
this.snapshot.session.on('close', () => {
|
||||
this.mqttCamera.debug('Snapshot stream transcoding session has ended')
|
||||
this.stop()
|
||||
})
|
||||
|
||||
this.snapshot.session.on('error', reject)
|
||||
|
||||
})
|
||||
}
|
||||
|
||||
@@ -120,6 +95,7 @@ export class SnapshotStream {
|
||||
this.keepalive.session.on('spawn', async () => {
|
||||
this.keepalive.active = true
|
||||
|
||||
const mjpegParser = new MjpegParser()
|
||||
const liveStream = this.mqttCamera.streams.live
|
||||
const liveStreamStartTimeout = Date.now() + 5000
|
||||
while (!liveStream.altVideoData && Date.now() < liveStreamStartTimeout) {
|
||||
@@ -129,25 +105,15 @@ export class SnapshotStream {
|
||||
if (liveStream.altVideoData) {
|
||||
this.livesnaps.session = spawn(pathToFfmpeg, [
|
||||
'-hide_banner',
|
||||
'-protocol_whitelist', 'pipe,udp,rtp,fd,file,crypto',
|
||||
'-fflags', 'nobuffer',
|
||||
'-flags', 'low_delay',
|
||||
'-use_wallclock_as_timestamps', '1',
|
||||
'-itsoffset', '-0.2',
|
||||
'-protocol_whitelist', 'pipe,udp,rtp,file,crypto',
|
||||
'-probesize', '32K',
|
||||
'-analyzeduration', '0',
|
||||
'-f', 'sdp',
|
||||
'-i', 'pipe:',
|
||||
'-vf', 'scale=1280:720:force_original_aspect_ratio=decrease,pad=1280:720:(ow-iw)/2:(oh-ih)/2',
|
||||
'-sws_flags', 'lanczos',
|
||||
'-c:v', 'libx264',
|
||||
'-b:v', '6M',
|
||||
'-preset', 'ultrafast',
|
||||
'-tune', 'zerolatency',
|
||||
'-r', '5',
|
||||
'-g', '1',
|
||||
'-avioflags', 'direct',
|
||||
'-f', 'mpegts',
|
||||
'-vf', 'fps=5,scale=1280:720:force_original_aspect_ratio=decrease,pad=1280:720:(ow-iw)/2:(oh-ih)/2',
|
||||
'-c:v', 'mjpeg',
|
||||
'-q:v', '3',
|
||||
'-f', 'image2pipe',
|
||||
'pipe:1'
|
||||
])
|
||||
|
||||
@@ -157,16 +123,16 @@ export class SnapshotStream {
|
||||
this.livesnaps.session.stdin.end()
|
||||
})
|
||||
|
||||
this.livesnaps.session.stdout.once('data', () => {
|
||||
this.snapshot.session.stdout.unpipe(this.rtsp.session.stdin)
|
||||
this.livesnaps.session.stdout.pipe(this.rtsp.session.stdin)
|
||||
this.livesnaps.session.stdout.on('data', (chunk) => {
|
||||
const jpegImage = mjpegParser.processChunk(chunk)
|
||||
if (jpegImage) {
|
||||
this.livesnaps.image = jpegImage
|
||||
}
|
||||
})
|
||||
|
||||
this.livesnaps.session.on('close', async () => {
|
||||
this.mqttCamera.debug('The live snapshot stream has stopped')
|
||||
this.mqttCamera.updateSnapshot('interval')
|
||||
this.livesnaps.session.stdout.unpipe(this.rtsp.session.stdin)
|
||||
this.snapshot.session.stdout.pipe(this.rtsp.session.stdin)
|
||||
Object.assign(this.livesnaps, { active: false, session: false, image: false })
|
||||
})
|
||||
|
||||
@@ -195,10 +161,10 @@ export class SnapshotStream {
|
||||
}
|
||||
|
||||
startSnapshotInterval() {
|
||||
this.interval = setInterval(async () => {
|
||||
this.snapshot.interval = setInterval(async () => {
|
||||
if (this.status === 'active') {
|
||||
try {
|
||||
this.snapshot.session.stdin.write(this.mqttCamera.data.snapshot.image)
|
||||
this.snapshot.session.stdin.write(this.livesnaps.image || this.mqttCamera.data.snapshot.image)
|
||||
} catch {
|
||||
this.mqttCamera.debug('Writing image to snapshot stream failed')
|
||||
this.stop()
|
||||
@@ -210,22 +176,22 @@ export class SnapshotStream {
|
||||
}
|
||||
|
||||
async stop() {
|
||||
clearInterval(this.interval)
|
||||
this.interval = null
|
||||
clearInterval(this.snapshot.interval)
|
||||
this.snapshot.interval = false
|
||||
this.status = 'inactive'
|
||||
this.mqttCamera.publishStreamState()
|
||||
|
||||
if (!this.rtsp.session) return
|
||||
if (!this.snapshot.session) return
|
||||
|
||||
const rtspSession = this.rtsp.session
|
||||
this.rtsp.session = false
|
||||
const oldSession = this.snapshot.session
|
||||
this.snapshot.session = false
|
||||
|
||||
try {
|
||||
rtspSession.stdin.end()
|
||||
oldSession.stdin.end()
|
||||
await Promise.race([
|
||||
new Promise((resolve) => {
|
||||
rtspSession.stdin.once('finish', () => {
|
||||
rtspSession.kill()
|
||||
oldSession.stdin.once('finish', () => {
|
||||
oldSession.kill()
|
||||
resolve()
|
||||
})
|
||||
}),
|
||||
@@ -236,7 +202,44 @@ export class SnapshotStream {
|
||||
})
|
||||
])
|
||||
} catch {
|
||||
rtspSession.kill('SIGKILL')
|
||||
oldSession.kill('SIGKILL')
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class MjpegParser {
|
||||
constructor() {
|
||||
this.buffer = Buffer.alloc(0)
|
||||
this.frameStart = null
|
||||
}
|
||||
|
||||
processChunk(chunk) {
|
||||
this.buffer = Buffer.concat([this.buffer, chunk])
|
||||
|
||||
if (this.frameStart === null) {
|
||||
for (let i = 0; i < this.buffer.length - 1; i++) {
|
||||
if (this.buffer[i] === 0xFF && this.buffer[i + 1] === 0xD8) {
|
||||
this.frameStart = i
|
||||
break
|
||||
}
|
||||
}
|
||||
if (this.frameStart === null) return null
|
||||
}
|
||||
|
||||
for (let i = this.frameStart + 2; i < this.buffer.length - 1; i++) {
|
||||
if (this.buffer[i] === 0xFF && this.buffer[i + 1] === 0xD9) {
|
||||
const frame = this.buffer.slice(this.frameStart, i + 2)
|
||||
this.buffer = this.buffer.slice(i + 2)
|
||||
this.frameStart = null
|
||||
return frame
|
||||
}
|
||||
}
|
||||
|
||||
if (this.buffer.length > 1000000) {
|
||||
this.buffer = Buffer.alloc(0)
|
||||
this.frameStart = null
|
||||
}
|
||||
|
||||
return null
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user