Compare commits

...

7 Commits

Author SHA1 Message Date
Blake Blackshear
68bfa6010d skip frames in the capture thread instead 2020-04-19 10:07:27 -05:00
Blake Blackshear
a810c56811 expose frame time at each step of processing 2020-04-19 07:49:23 -05:00
Blake Blackshear
5333b8ae1b ensure the previous frame is deleted when the new one is stored 2020-04-10 07:05:07 -04:00
Blake Blackshear
2bc57d271c move ffmpeg capture to a separate thread and use a queue 2020-03-14 15:32:51 -05:00
Blake Blackshear
8507bbbb31 make object processor resilient to plasma failures 2020-03-13 16:35:58 -05:00
Blake Blackshear
b6fcb88e5c remove sharedarray references 2020-03-13 15:50:27 -05:00
Blake Blackshear
d3cd4afa65 handle various scenarios with external process failures 2020-03-09 21:12:19 -05:00
7 changed files with 275 additions and 142 deletions

View File

@@ -25,7 +25,6 @@ RUN apt -qq update && apt -qq install --no-install-recommends -y \
imutils \
scipy \
&& python3.7 -m pip install -U \
SharedArray \
Flask \
paho-mqtt \
PyYAML \

View File

@@ -110,13 +110,6 @@ cameras:
################
take_frame: 1
################
# The expected framerate for the camera. Frigate will try and ensure it maintains this framerate
# by dropping frames as necessary. Setting this lower than the actual framerate will allow frigate
# to process every frame at the expense of realtime processing.
################
fps: 5
################
# Configuration for the snapshots in the debug view and mqtt
################

View File

@@ -1,4 +1,7 @@
import os
import sys
import traceback
import signal
import cv2
import time
import datetime
@@ -12,7 +15,7 @@ import logging
from flask import Flask, Response, make_response, jsonify, request
import paho.mqtt.client as mqtt
from frigate.video import track_camera
from frigate.video import track_camera, get_ffmpeg_input, get_frame_shape, CameraCapture, start_or_restart_ffmpeg
from frigate.object_processing import TrackedObjectProcessor
from frigate.util import EventsPerSecond
from frigate.edgetpu import EdgeTPUProcess
@@ -58,41 +61,71 @@ GLOBAL_OBJECT_CONFIG = CONFIG.get('objects', {})
WEB_PORT = CONFIG.get('web_port', 5000)
DEBUG = (CONFIG.get('debug', '0') == '1')
def start_plasma_store():
plasma_cmd = ['plasma_store', '-m', '400000000', '-s', '/tmp/plasma']
plasma_process = sp.Popen(plasma_cmd, stdout=sp.DEVNULL)
time.sleep(1)
rc = plasma_process.poll()
if rc is not None:
return None
return plasma_process
class CameraWatchdog(threading.Thread):
def __init__(self, camera_processes, config, tflite_process, tracked_objects_queue, object_processor):
def __init__(self, camera_processes, config, tflite_process, tracked_objects_queue, plasma_process):
threading.Thread.__init__(self)
self.camera_processes = camera_processes
self.config = config
self.tflite_process = tflite_process
self.tracked_objects_queue = tracked_objects_queue
self.object_processor = object_processor
self.plasma_process = plasma_process
def run(self):
time.sleep(10)
while True:
# wait a bit before checking
time.sleep(30)
time.sleep(10)
# check the plasma process
rc = self.plasma_process.poll()
if rc != None:
print(f"plasma_process exited unexpectedly with {rc}")
self.plasma_process = start_plasma_store()
# check the detection process
if (self.tflite_process.detection_start.value > 0.0 and
datetime.datetime.now().timestamp() - self.tflite_process.detection_start.value > 10):
print("Detection appears to be stuck. Restarting detection process")
self.tflite_process.start_or_restart()
time.sleep(30)
elif not self.tflite_process.detect_process.is_alive():
print("Detection appears to have stopped. Restarting detection process")
self.tflite_process.start_or_restart()
# check the camera processes
for name, camera_process in self.camera_processes.items():
process = camera_process['process']
if not process.is_alive():
print(f"Process for {name} is not alive. Starting again...")
camera_process['fps'].value = float(self.config[name]['fps'])
camera_process['skipped_fps'].value = 0.0
print(f"Track process for {name} is not alive. Starting again...")
camera_process['process_fps'].value = 0.0
camera_process['detection_fps'].value = 0.0
process = mp.Process(target=track_camera, args=(name, self.config[name], FFMPEG_DEFAULT_CONFIG, GLOBAL_OBJECT_CONFIG,
self.tflite_process.detection_queue, self.tracked_objects_queue,
camera_process['fps'], camera_process['skipped_fps'], camera_process['detection_fps']))
camera_process['read_start'].value = 0.0
process = mp.Process(target=track_camera, args=(name, self.config[name], GLOBAL_OBJECT_CONFIG, camera_process['frame_queue'],
camera_process['frame_shape'], self.tflite_process.detection_queue, self.tracked_objects_queue,
camera_process['process_fps'], camera_process['detection_fps'],
camera_process['read_start'], camera_process['detection_frame']))
process.daemon = True
camera_process['process'] = process
process.start()
print(f"Camera_process started for {name}: {process.pid}")
print(f"Track process started for {name}: {process.pid}")
if not camera_process['capture_thread'].is_alive():
frame_shape = camera_process['frame_shape']
frame_size = frame_shape[0] * frame_shape[1] * frame_shape[2]
ffmpeg_process = start_or_restart_ffmpeg(camera_process['ffmpeg_cmd'], frame_size)
camera_capture = CameraCapture(name, ffmpeg_process, frame_shape, camera_process['frame_queue'],
camera_process['take_frame'], camera_process['camera_fps'], camera_process['detection_frame'])
camera_capture.start()
camera_process['ffmpeg_process'] = ffmpeg_process
camera_process['capture_thread'] = camera_capture
def main():
# connect to mqtt and setup last will
@@ -117,14 +150,7 @@ def main():
client.connect(MQTT_HOST, MQTT_PORT, 60)
client.loop_start()
# start plasma store
plasma_cmd = ['plasma_store', '-m', '400000000', '-s', '/tmp/plasma']
plasma_process = sp.Popen(plasma_cmd, stdout=sp.DEVNULL)
time.sleep(1)
rc = plasma_process.poll()
if rc is not None:
raise RuntimeError("plasma_store exited unexpectedly with "
"code %d" % (rc,))
plasma_process = start_plasma_store()
##
# Setup config defaults for cameras
@@ -135,7 +161,7 @@ def main():
}
# Queue for cameras to push tracked objects to
tracked_objects_queue = mp.Queue()
tracked_objects_queue = mp.SimpleQueue()
# Start the shared tflite process
tflite_process = EdgeTPUProcess()
@@ -143,14 +169,56 @@ def main():
# start the camera processes
camera_processes = {}
for name, config in CONFIG['cameras'].items():
# Merge the ffmpeg config with the global config
ffmpeg = config.get('ffmpeg', {})
ffmpeg_input = get_ffmpeg_input(ffmpeg['input'])
ffmpeg_global_args = ffmpeg.get('global_args', FFMPEG_DEFAULT_CONFIG['global_args'])
ffmpeg_hwaccel_args = ffmpeg.get('hwaccel_args', FFMPEG_DEFAULT_CONFIG['hwaccel_args'])
ffmpeg_input_args = ffmpeg.get('input_args', FFMPEG_DEFAULT_CONFIG['input_args'])
ffmpeg_output_args = ffmpeg.get('output_args', FFMPEG_DEFAULT_CONFIG['output_args'])
ffmpeg_cmd = (['ffmpeg'] +
ffmpeg_global_args +
ffmpeg_hwaccel_args +
ffmpeg_input_args +
['-i', ffmpeg_input] +
ffmpeg_output_args +
['pipe:'])
if 'width' in config and 'height' in config:
frame_shape = (config['height'], config['width'], 3)
else:
frame_shape = get_frame_shape(ffmpeg_input)
frame_size = frame_shape[0] * frame_shape[1] * frame_shape[2]
take_frame = config.get('take_frame', 1)
detection_frame = mp.Value('d', 0.0)
ffmpeg_process = start_or_restart_ffmpeg(ffmpeg_cmd, frame_size)
frame_queue = mp.SimpleQueue()
camera_fps = EventsPerSecond()
camera_fps.start()
camera_capture = CameraCapture(name, ffmpeg_process, frame_shape, frame_queue, take_frame, camera_fps, detection_frame)
camera_capture.start()
camera_processes[name] = {
'fps': mp.Value('d', float(config['fps'])),
'skipped_fps': mp.Value('d', 0.0),
'detection_fps': mp.Value('d', 0.0)
'camera_fps': camera_fps,
'take_frame': take_frame,
'process_fps': mp.Value('d', 0.0),
'detection_fps': mp.Value('d', 0.0),
'detection_frame': detection_frame,
'read_start': mp.Value('d', 0.0),
'ffmpeg_process': ffmpeg_process,
'ffmpeg_cmd': ffmpeg_cmd,
'frame_queue': frame_queue,
'frame_shape': frame_shape,
'capture_thread': camera_capture
}
camera_process = mp.Process(target=track_camera, args=(name, config, FFMPEG_DEFAULT_CONFIG, GLOBAL_OBJECT_CONFIG,
tflite_process.detection_queue, tracked_objects_queue,
camera_processes[name]['fps'], camera_processes[name]['skipped_fps'], camera_processes[name]['detection_fps']))
camera_process = mp.Process(target=track_camera, args=(name, config, GLOBAL_OBJECT_CONFIG, frame_queue, frame_shape,
tflite_process.detection_queue, tracked_objects_queue, camera_processes[name]['process_fps'],
camera_processes[name]['detection_fps'],
camera_processes[name]['read_start'], camera_processes[name]['detection_frame']))
camera_process.daemon = True
camera_processes[name]['process'] = camera_process
@@ -161,7 +229,7 @@ def main():
object_processor = TrackedObjectProcessor(CONFIG['cameras'], client, MQTT_TOPIC_PREFIX, tracked_objects_queue)
object_processor.start()
camera_watchdog = CameraWatchdog(camera_processes, CONFIG['cameras'], tflite_process, tracked_objects_queue, object_processor)
camera_watchdog = CameraWatchdog(camera_processes, CONFIG['cameras'], tflite_process, tracked_objects_queue, plasma_process)
camera_watchdog.start()
# create a flask app that encodes frames a mjpeg on demand
@@ -174,6 +242,23 @@ def main():
# return a healh
return "Frigate is running. Alive and healthy!"
@app.route('/debug/stack')
def processor_stack():
frame = sys._current_frames().get(object_processor.ident, None)
if frame:
return "<br>".join(traceback.format_stack(frame)), 200
else:
return "no frame found", 200
@app.route('/debug/print_stack')
def print_stack():
pid = int(request.args.get('pid', 0))
if pid == 0:
return "missing pid", 200
else:
os.kill(pid, signal.SIGUSR1)
return "check logs", 200
@app.route('/debug/stats')
def stats():
stats = {}
@@ -182,24 +267,32 @@ def main():
for name, camera_stats in camera_processes.items():
total_detection_fps += camera_stats['detection_fps'].value
capture_thread = camera_stats['capture_thread']
stats[name] = {
'fps': round(camera_stats['fps'].value, 2),
'skipped_fps': round(camera_stats['skipped_fps'].value, 2),
'detection_fps': round(camera_stats['detection_fps'].value, 2)
'camera_fps': round(capture_thread.fps.eps(), 2),
'process_fps': round(camera_stats['process_fps'].value, 2),
'skipped_fps': round(capture_thread.skipped_fps.eps(), 2),
'detection_fps': round(camera_stats['detection_fps'].value, 2),
'read_start': camera_stats['read_start'].value,
'pid': camera_stats['process'].pid,
'ffmpeg_pid': camera_stats['ffmpeg_process'].pid,
'frame_info': {
'read': capture_thread.current_frame,
'detect': camera_stats['detection_frame'].value,
'process': object_processor.camera_data[name]['current_frame_time']
}
}
stats['coral'] = {
'fps': round(total_detection_fps, 2),
'inference_speed': round(tflite_process.avg_inference_speed.value*1000, 2),
'detection_queue': tflite_process.detection_queue.qsize(),
'detection_start': tflite_process.detection_start.value
'detection_start': tflite_process.detection_start.value,
'pid': tflite_process.detect_process.pid
}
rc = plasma_process.poll()
rc = camera_watchdog.plasma_process.poll()
stats['plasma_store_rc'] = rc
stats['tracked_objects_queue'] = tracked_objects_queue.qsize()
return jsonify(stats)
@app.route('/<camera_name>/<label>/best.jpg')
@@ -244,7 +337,7 @@ def main():
app.run(host='0.0.0.0', port=WEB_PORT, debug=False)
camera_watchdog.join()
object_processor.join()
plasma_process.terminate()

View File

@@ -3,11 +3,10 @@ import datetime
import hashlib
import multiprocessing as mp
import numpy as np
import SharedArray as sa
import pyarrow.plasma as plasma
import tflite_runtime.interpreter as tflite
from tflite_runtime.interpreter import load_delegate
from frigate.util import EventsPerSecond
from frigate.util import EventsPerSecond, listen
def load_labels(path, encoding='utf-8'):
"""Loads labels from file (with or without index numbers).
@@ -64,6 +63,7 @@ class ObjectDetector():
def run_detector(detection_queue, avg_speed, start):
print(f"Starting detection process: {os.getpid()}")
listen()
plasma_client = plasma.connect("/tmp/plasma")
object_detector = ObjectDetector()
@@ -87,7 +87,7 @@ def run_detector(detection_queue, avg_speed, start):
class EdgeTPUProcess():
def __init__(self):
self.detection_queue = mp.Queue()
self.detection_queue = mp.SimpleQueue()
self.avg_inference_speed = mp.Value('d', 0.01)
self.detection_start = mp.Value('d', 0.0)
self.detect_process = None

View File

@@ -1,6 +1,7 @@
import json
import hashlib
import datetime
import time
import copy
import cv2
import threading
@@ -8,9 +9,8 @@ import numpy as np
from collections import Counter, defaultdict
import itertools
import pyarrow.plasma as plasma
import SharedArray as sa
import matplotlib.pyplot as plt
from frigate.util import draw_box_with_label
from frigate.util import draw_box_with_label, PlasmaManager
from frigate.edgetpu import load_labels
PATH_TO_LABELS = '/labelmap.txt'
@@ -29,14 +29,15 @@ class TrackedObjectProcessor(threading.Thread):
self.client = client
self.topic_prefix = topic_prefix
self.tracked_objects_queue = tracked_objects_queue
self.plasma_client = plasma.connect("/tmp/plasma")
self.camera_data = defaultdict(lambda: {
'best_objects': {},
'object_status': defaultdict(lambda: defaultdict(lambda: 'OFF')),
'tracked_objects': {},
'current_frame': np.zeros((720,1280,3), np.uint8),
'current_frame_time': 0.0,
'object_id': None
})
self.plasma_client = PlasmaManager()
def get_best(self, camera, label):
if label in self.camera_data[camera]['best_objects']:
@@ -55,14 +56,12 @@ class TrackedObjectProcessor(threading.Thread):
best_objects = self.camera_data[camera]['best_objects']
current_object_status = self.camera_data[camera]['object_status']
self.camera_data[camera]['tracked_objects'] = tracked_objects
self.camera_data[camera]['current_frame_time'] = frame_time
###
# Draw tracked objects on the frame
###
object_id_hash = hashlib.sha1(str.encode(f"{camera}{frame_time}"))
object_id_bytes = object_id_hash.digest()
object_id = plasma.ObjectID(object_id_bytes)
current_frame = self.plasma_client.get(object_id, timeout_ms=0)
current_frame = self.plasma_client.get(f"{camera}{frame_time}")
if not current_frame is plasma.ObjectNotAvailable:
# draw the bounding boxes on the frame
@@ -86,15 +85,14 @@ class TrackedObjectProcessor(threading.Thread):
cv2.putText(current_frame, time_to_show, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, fontScale=.8, color=(255, 255, 255), thickness=2)
###
# Set the current frame as ready
# Set the current frame
###
self.camera_data[camera]['current_frame'] = current_frame
# store the object id, so you can delete it at the next loop
previous_object_id = self.camera_data[camera]['object_id']
if not previous_object_id is None:
self.plasma_client.delete([previous_object_id])
self.camera_data[camera]['object_id'] = object_id
# delete the previous frame from the plasma store and update the object id
if not self.camera_data[camera]['object_id'] is None:
self.plasma_client.delete(self.camera_data[camera]['object_id'])
self.camera_data[camera]['object_id'] = f"{camera}{frame_time}"
###
# Maintain the highest scoring recent object and frame for each label
@@ -146,4 +144,4 @@ class TrackedObjectProcessor(threading.Thread):
ret, jpg = cv2.imencode('.jpg', best_frame)
if ret:
jpg_bytes = jpg.tobytes()
self.client.publish(f"{self.topic_prefix}/{camera}/{obj_name}/snapshot", jpg_bytes, retain=True)
self.client.publish(f"{self.topic_prefix}/{camera}/{obj_name}/snapshot", jpg_bytes, retain=True)

View File

@@ -1,9 +1,14 @@
import datetime
import time
import signal
import traceback
import collections
import numpy as np
import cv2
import threading
import matplotlib.pyplot as plt
import hashlib
import pyarrow.plasma as plasma
def draw_box_with_label(frame, x_min, y_min, x_max, y_max, label, info, thickness=2, color=None, position='ul'):
if color is None:
@@ -127,3 +132,52 @@ class EventsPerSecond:
now = datetime.datetime.now().timestamp()
seconds = min(now-self._start, last_n_seconds)
return len([t for t in self._timestamps if t > (now-last_n_seconds)]) / seconds
def print_stack(sig, frame):
traceback.print_stack(frame)
def listen():
signal.signal(signal.SIGUSR1, print_stack)
class PlasmaManager:
def __init__(self):
self.connect()
def connect(self):
while True:
try:
self.plasma_client = plasma.connect("/tmp/plasma")
return
except:
print(f"TrackedObjectProcessor: unable to connect plasma client")
time.sleep(10)
def get(self, name, timeout_ms=0):
object_id = plasma.ObjectID(hashlib.sha1(str.encode(name)).digest())
while True:
try:
return self.plasma_client.get(object_id, timeout_ms=timeout_ms)
except:
self.connect()
time.sleep(1)
def put(self, name, obj):
object_id = plasma.ObjectID(hashlib.sha1(str.encode(name)).digest())
while True:
try:
self.plasma_client.put(obj, object_id)
return
except Exception as e:
print(f"Failed to put in plasma: {e}")
self.connect()
time.sleep(1)
def delete(self, name):
object_id = plasma.ObjectID(hashlib.sha1(str.encode(name)).digest())
while True:
try:
self.plasma_client.delete([object_id])
return
except:
self.connect()
time.sleep(1)

View File

@@ -5,17 +5,15 @@ import cv2
import queue
import threading
import ctypes
import pyarrow.plasma as plasma
import multiprocessing as mp
import subprocess as sp
import numpy as np
import hashlib
import pyarrow.plasma as plasma
import SharedArray as sa
import copy
import itertools
import json
from collections import defaultdict
from frigate.util import draw_box_with_label, area, calculate_region, clipped, intersection_over_union, intersection, EventsPerSecond
from frigate.util import draw_box_with_label, area, calculate_region, clipped, intersection_over_union, intersection, EventsPerSecond, listen, PlasmaManager
from frigate.objects import ObjectTracker
from frigate.edgetpu import RemoteObjectDetector
from frigate.motion import MotionDetector
@@ -104,33 +102,76 @@ def start_or_restart_ffmpeg(ffmpeg_cmd, frame_size, ffmpeg_process=None):
ffmpeg_process.terminate()
try:
print("Waiting for ffmpeg to exit gracefully...")
ffmpeg_process.wait(timeout=30)
ffmpeg_process.communicate(timeout=30)
except sp.TimeoutExpired:
print("FFmpeg didnt exit. Force killing...")
ffmpeg_process.kill()
ffmpeg_process.wait()
ffmpeg_process.communicate()
ffmpeg_process = None
print("Creating ffmpeg process...")
print(" ".join(ffmpeg_cmd))
return sp.Popen(ffmpeg_cmd, stdout = sp.PIPE, bufsize=frame_size*10)
process = sp.Popen(ffmpeg_cmd, stdout = sp.PIPE, stdin = sp.DEVNULL, bufsize=frame_size*10, start_new_session=True)
return process
def track_camera(name, config, ffmpeg_global_config, global_objects_config, detection_queue, detected_objects_queue, fps, skipped_fps, detection_fps):
class CameraCapture(threading.Thread):
def __init__(self, name, ffmpeg_process, frame_shape, frame_queue, take_frame, fps, detection_frame):
threading.Thread.__init__(self)
self.name = name
self.frame_shape = frame_shape
self.frame_size = frame_shape[0] * frame_shape[1] * frame_shape[2]
self.frame_queue = frame_queue
self.take_frame = take_frame
self.fps = fps
self.skipped_fps = EventsPerSecond()
self.plasma_client = PlasmaManager()
self.ffmpeg_process = ffmpeg_process
self.current_frame = 0
self.last_frame = 0
self.detection_frame = detection_frame
def run(self):
frame_num = 0
self.skipped_fps.start()
while True:
if self.ffmpeg_process.poll() != None:
print(f"{self.name}: ffmpeg process is not running. exiting capture thread...")
break
frame_bytes = self.ffmpeg_process.stdout.read(self.frame_size)
self.current_frame = datetime.datetime.now().timestamp()
if len(frame_bytes) == 0:
print(f"{self.name}: ffmpeg didnt return a frame. something is wrong.")
continue
self.fps.update()
frame_num += 1
if (frame_num % self.take_frame) != 0:
self.skipped_fps.update()
continue
# if the detection process is more than 1 second behind, skip this frame
if self.detection_frame.value > 0.0 and (self.last_frame - self.detection_frame.value) > 1:
self.skipped_fps.update()
continue
# put the frame in the plasma store
self.plasma_client.put(f"{self.name}{self.current_frame}",
np
.frombuffer(frame_bytes, np.uint8)
.reshape(self.frame_shape)
)
# add to the queue
self.frame_queue.put(self.current_frame)
self.last_frame = self.current_frame
def track_camera(name, config, global_objects_config, frame_queue, frame_shape, detection_queue, detected_objects_queue, fps, detection_fps, read_start, detection_frame):
print(f"Starting process for {name}: {os.getpid()}")
listen()
# Merge the ffmpeg config with the global config
ffmpeg = config.get('ffmpeg', {})
ffmpeg_input = get_ffmpeg_input(ffmpeg['input'])
ffmpeg_global_args = ffmpeg.get('global_args', ffmpeg_global_config['global_args'])
ffmpeg_hwaccel_args = ffmpeg.get('hwaccel_args', ffmpeg_global_config['hwaccel_args'])
ffmpeg_input_args = ffmpeg.get('input_args', ffmpeg_global_config['input_args'])
ffmpeg_output_args = ffmpeg.get('output_args', ffmpeg_global_config['output_args'])
ffmpeg_cmd = (['ffmpeg'] +
ffmpeg_global_args +
ffmpeg_hwaccel_args +
ffmpeg_input_args +
['-i', ffmpeg_input] +
ffmpeg_output_args +
['pipe:'])
detection_frame.value = 0.0
# Merge the tracked object config with the global config
camera_objects_config = config.get('objects', {})
@@ -144,22 +185,7 @@ def track_camera(name, config, ffmpeg_global_config, global_objects_config, dete
for obj in objects_with_config:
object_filters[obj] = {**global_object_filters.get(obj, {}), **camera_object_filters.get(obj, {})}
expected_fps = config['fps']
take_frame = config.get('take_frame', 1)
if 'width' in config and 'height' in config:
frame_shape = (config['height'], config['width'], 3)
else:
frame_shape = get_frame_shape(ffmpeg_input)
frame_size = frame_shape[0] * frame_shape[1] * frame_shape[2]
try:
sa.delete(name)
except:
pass
frame = sa.create(name, shape=frame_shape, dtype=np.uint8)
frame = np.zeros(frame_shape, np.uint8)
# load in the mask for object detection
if 'mask' in config:
@@ -175,60 +201,33 @@ def track_camera(name, config, ffmpeg_global_config, global_objects_config, dete
object_detector = RemoteObjectDetector(name, '/labelmap.txt', detection_queue)
object_tracker = ObjectTracker(10)
ffmpeg_process = start_or_restart_ffmpeg(ffmpeg_cmd, frame_size)
plasma_client = plasma.connect("/tmp/plasma")
frame_num = 0
plasma_client = PlasmaManager()
avg_wait = 0.0
fps_tracker = EventsPerSecond()
skipped_fps_tracker = EventsPerSecond()
fps_tracker.start()
skipped_fps_tracker.start()
object_detector.fps.start()
while True:
start = datetime.datetime.now().timestamp()
frame_bytes = ffmpeg_process.stdout.read(frame_size)
duration = datetime.datetime.now().timestamp()-start
read_start.value = datetime.datetime.now().timestamp()
frame_time = frame_queue.get()
duration = datetime.datetime.now().timestamp()-read_start.value
read_start.value = 0.0
avg_wait = (avg_wait*99+duration)/100
detection_frame.value = frame_time
# Get frame from plasma store
frame = plasma_client.get(f"{name}{frame_time}")
if not frame_bytes:
rc = ffmpeg_process.poll()
if rc is not None:
print(f"{name}: ffmpeg_process exited unexpectedly with {rc}")
ffmpeg_process = start_or_restart_ffmpeg(ffmpeg_cmd, frame_size, ffmpeg_process)
time.sleep(10)
else:
print(f"{name}: ffmpeg_process is still running but didnt return any bytes")
continue
# limit frame rate
frame_num += 1
if (frame_num % take_frame) != 0:
if frame is plasma.ObjectNotAvailable:
continue
fps_tracker.update()
fps.value = fps_tracker.eps()
detection_fps.value = object_detector.fps.eps()
frame_time = datetime.datetime.now().timestamp()
# Store frame in numpy array
frame[:] = (np
.frombuffer(frame_bytes, np.uint8)
.reshape(frame_shape))
# look for motion
motion_boxes = motion_detector.detect(frame)
# skip object detection if we are below the min_fps and wait time is less than half the average
if frame_num > 100 and fps.value < expected_fps-1 and duration < 0.5*avg_wait:
skipped_fps_tracker.update()
skipped_fps.value = skipped_fps_tracker.eps()
continue
skipped_fps.value = skipped_fps_tracker.eps()
tracked_objects = object_tracker.tracked_objects.values()
# merge areas of motion that intersect with a known tracked object into a single area to look at
@@ -328,7 +327,7 @@ def track_camera(name, config, ffmpeg_global_config, global_objects_config, dete
for index in idxs:
obj = group[index[0]]
if clipped(obj, frame_shape): #obj['clipped']:
if clipped(obj, frame_shape):
box = obj[2]
# calculate a new region that will hopefully get the entire object
region = calculate_region(frame_shape,
@@ -368,9 +367,6 @@ def track_camera(name, config, ffmpeg_global_config, global_objects_config, dete
# now that we have refined our detections, we need to track objects
object_tracker.match_and_update(frame_time, detections)
# put the frame in the plasma store
object_id = hashlib.sha1(str.encode(f"{name}{frame_time}")).digest()
plasma_client.put(frame, plasma.ObjectID(object_id))
# add to the queue
detected_objects_queue.put((name, frame_time, object_tracker.tracked_objects))