From 700bd1e3efb59c1b01bebab3b8344e24aebcf657 Mon Sep 17 00:00:00 2001 From: blakeblackshear Date: Wed, 17 Jul 2019 22:25:59 -0500 Subject: [PATCH] use a thread to capture frames from the subprocess so it can be killed properly --- frigate/video.py | 175 +++++++++++++++++++++++++---------------------- 1 file changed, 92 insertions(+), 83 deletions(-) diff --git a/frigate/video.py b/frigate/video.py index 5581e5e1e..09f464f82 100644 --- a/frigate/video.py +++ b/frigate/video.py @@ -12,60 +12,6 @@ from . object_detection import FramePrepper from . objects import ObjectCleaner, BestPersonFrame from . mqtt import MqttObjectPublisher -# fetch the frames as fast a possible and store current frame in a shared memory array -def fetch_frames(shared_arr, shared_frame_time, frame_lock, frame_ready, frame_shape, rtsp_url, take_frame=1, ffmpeg_hwaccel_args=[]): - # convert shared memory array into numpy and shape into image array - arr = tonumpyarray(shared_arr).reshape(frame_shape) - frame_size = frame_shape[0] * frame_shape[1] * frame_shape[2] - - ffmpeg_global_args = [ - '-hide_banner', '-loglevel', 'panic' - ] - ffmpeg_input_args = [ - '-avoid_negative_ts', 'make_zero', - '-fflags', 'nobuffer', - '-flags', 'low_delay', - '-strict', 'experimental', - '-fflags', '+genpts', - '-rtsp_transport', 'tcp', - '-stimeout', '5000000', - '-use_wallclock_as_timestamps', '1' - ] - - ffmpeg_cmd = (['ffmpeg'] + - ffmpeg_global_args + - ffmpeg_hwaccel_args + - ffmpeg_input_args + - ['-i', rtsp_url, - '-f', 'rawvideo', - '-pix_fmt', 'rgb24', - 'pipe:']) - - print(" ".join(ffmpeg_cmd)) - - pipe = sp.Popen(ffmpeg_cmd, stdout = sp.PIPE, bufsize=frame_size) - - frame_num = 0 - while True: - raw_image = pipe.stdout.read(frame_size) - frame_num += 1 - if (frame_num % take_frame) != 0: - continue - frame = ( - np - .frombuffer(raw_image, np.uint8) - .reshape(frame_shape) - ) - - with frame_lock: - shared_frame_time.value = datetime.datetime.now().timestamp() - arr[:] = frame - # Notify with the condition that a new frame is ready - with frame_ready: - frame_ready.notify_all() - - pipe.stdout.flush() - # Stores 2 seconds worth of frames when motion is detected so they can be used for other threads class FrameTracker(threading.Thread): def __init__(self, shared_frame, frame_time, frame_ready, frame_lock, recent_frames): @@ -125,13 +71,48 @@ class CameraWatchdog(threading.Thread): while True: # wait a bit before checking - time.sleep(60) + time.sleep(10) - if (datetime.datetime.now().timestamp() - self.camera.shared_frame_time.value) > 2: + if (datetime.datetime.now().timestamp() - self.camera.frame_time.value) > 2: print("last frame is more than 2 seconds old, restarting camera capture...") self.camera.start_or_restart_capture() time.sleep(5) +# Thread to read the stdout of the ffmpeg process and update the current frame +class CameraCapture(threading.Thread): + def __init__(self, camera): + threading.Thread.__init__(self) + self.camera = camera + + def run(self): + frame_num = 0 + while True: + if self.camera.ffmpeg_process.poll() != None: + print("ffmpeg process is not running. exiting capture thread...") + break + + raw_image = self.camera.ffmpeg_process.stdout.read(self.camera.frame_size) + + if len(raw_image) == 0: + print("ffmpeg didnt return a frame. something is wrong. exiting capture thread...") + break + + frame_num += 1 + if (frame_num % self.camera.take_frame) != 0: + continue + + with self.camera.frame_lock: + self.camera.frame_time.value = datetime.datetime.now().timestamp() + + self.camera.current_frame[:] = ( + np + .frombuffer(raw_image, np.uint8) + .reshape(self.camera.frame_shape) + ) + # Notify with the condition that a new frame is ready + with self.camera.frame_ready: + self.camera.frame_ready.notify_all() + class Camera: def __init__(self, name, config, prepped_frame_queue, mqtt_client, mqtt_prefix): self.name = name @@ -143,15 +124,14 @@ class Camera: self.ffmpeg_hwaccel_args = self.config.get('ffmpeg_hwaccel_args', []) self.regions = self.config['regions'] self.frame_shape = get_frame_shape(self.rtsp_url) + self.frame_size = self.frame_shape[0] * self.frame_shape[1] * self.frame_shape[2] self.mqtt_client = mqtt_client self.mqtt_topic_prefix = '{}/{}'.format(mqtt_prefix, self.name) - # compute the flattened array length from the shape of the frame - flat_array_length = self.frame_shape[0] * self.frame_shape[1] * self.frame_shape[2] - # create shared array for storing the full frame image data - self.shared_frame_array = mp.Array(ctypes.c_uint8, flat_array_length) + # create a numpy array for the current frame in initialize to zeros + self.current_frame = np.zeros(self.frame_shape, np.uint8) # create shared value for storing the frame_time - self.shared_frame_time = mp.Value('d', 0.0) + self.frame_time = mp.Value('d', 0.0) # Lock to control access to the frame self.frame_lock = mp.Lock() # Condition for notifying that a new frame is ready @@ -159,10 +139,8 @@ class Camera: # Condition for notifying that objects were parsed self.objects_parsed = mp.Condition() - # shape current frame so it can be treated as a numpy image - self.shared_frame_np = tonumpyarray(self.shared_frame_array).reshape(self.frame_shape) - - self.capture_process = None + self.ffmpeg_process = None + self.capture_thread = None # for each region, create a separate thread to resize the region and prep for detection self.detection_prep_threads = [] @@ -175,8 +153,8 @@ class Camera: region['threshold'] = 0.5 self.detection_prep_threads.append(FramePrepper( self.name, - self.shared_frame_np, - self.shared_frame_time, + self.current_frame, + self.frame_time, self.frame_ready, self.frame_lock, region['size'], region['x_offset'], region['y_offset'], region['threshold'], @@ -184,7 +162,7 @@ class Camera: )) # start a thread to store recent motion frames for processing - self.frame_tracker = FrameTracker(self.shared_frame_np, self.shared_frame_time, + self.frame_tracker = FrameTracker(self.current_frame, self.frame_time, self.frame_ready, self.frame_lock, self.recent_frames) self.frame_tracker.start() @@ -215,20 +193,51 @@ class Camera: def start_or_restart_capture(self): - if not self.capture_process is None: - print("Terminating the existing capture process...") - self.capture_process.terminate() - del self.capture_process - self.capture_process = None + if not self.ffmpeg_process is None: + print("Killing the existing ffmpeg process...") + self.ffmpeg_process.kill() + self.ffmpeg_process.wait() + print("Waiting for the capture thread to exit...") + self.capture_thread.join() + self.ffmpeg_process = None + self.capture_thread = None # create the process to capture frames from the RTSP stream and store in a shared array - print("Creating a new capture process...") - self.capture_process = mp.Process(target=fetch_frames, args=(self.shared_frame_array, - self.shared_frame_time, self.frame_lock, self.frame_ready, self.frame_shape, - self.rtsp_url, self.take_frame, self.ffmpeg_hwaccel_args)) - self.capture_process.daemon = True - print("Starting a new capture process...") - self.capture_process.start() + print("Creating a new ffmpeg process...") + self.start_ffmpeg() + + print("Creating a new capture thread...") + self.capture_thread = CameraCapture(self) + print("Starting a new capture thread...") + self.capture_thread.start() + + def start_ffmpeg(self): + ffmpeg_global_args = [ + '-hide_banner', '-loglevel', 'panic' + ] + ffmpeg_input_args = [ + '-avoid_negative_ts', 'make_zero', + '-fflags', 'nobuffer', + '-flags', 'low_delay', + '-strict', 'experimental', + '-fflags', '+genpts', + '-rtsp_transport', 'tcp', + '-stimeout', '5000000', + '-use_wallclock_as_timestamps', '1' + ] + + ffmpeg_cmd = (['ffmpeg'] + + ffmpeg_global_args + + self.ffmpeg_hwaccel_args + + ffmpeg_input_args + + ['-i', self.rtsp_url, + '-f', 'rawvideo', + '-pix_fmt', 'rgb24', + 'pipe:']) + + print(" ".join(ffmpeg_cmd)) + + self.ffmpeg_process = sp.Popen(ffmpeg_cmd, stdout = sp.PIPE, bufsize=self.frame_size) def start(self): self.start_or_restart_capture() @@ -238,10 +247,10 @@ class Camera: self.watchdog.start() def join(self): - self.capture_process.join() + self.capture_thread.join() def get_capture_pid(self): - return self.capture_process.pid + return self.ffmpeg_process.pid def add_objects(self, objects): if len(objects) == 0: @@ -291,7 +300,7 @@ class Camera: detected_objects = self.detected_objects.copy() # lock and make a copy of the current frame with self.frame_lock: - frame = self.shared_frame_np.copy() + frame = self.current_frame.copy() # draw the bounding boxes on the screen for obj in detected_objects: