From 1cdb7810bf39a95f6e0799aee8d08ef217415fe3 Mon Sep 17 00:00:00 2001 From: Brett Date: Sun, 4 Aug 2024 16:45:46 -0500 Subject: [PATCH] New PreviewManager to handle generating previews asynchronously (#86) * Add PreviewManager * Refactoring and better error handling * Integrate PreviewManager into api_server.py * Integrate PreviewManager into distributed_job_manager.py * Add method to preview_manager.py to delete previews and integrate it into api_server * Misc logging improvements * Misc code cleanup * Replace existing preview on job completion - Minor code fixes --- src/api/api_server.py | 168 +++++++++++++++----------------- src/api/preview_manager.py | 109 +++++++++++++++++++++ src/distributed_job_manager.py | 12 ++- src/engines/core/base_worker.py | 6 +- src/render_queue.py | 6 +- 5 files changed, 203 insertions(+), 98 deletions(-) create mode 100644 src/api/preview_manager.py diff --git a/src/api/api_server.py b/src/api/api_server.py index 2900360..d358874 100755 --- a/src/api/api_server.py +++ b/src/api/api_server.py @@ -19,17 +19,17 @@ import yaml from flask import Flask, request, send_file, after_this_request, Response, redirect, url_for, abort from src.api.add_job_helpers import handle_uploaded_project_files, process_zipped_project +from src.api.preview_manager import PreviewManager from src.api.serverproxy_manager import ServerProxyManager from src.distributed_job_manager import DistributedJobManager from src.engines.core.base_worker import string_to_status, RenderStatus from src.engines.engine_manager import EngineManager from src.render_queue import RenderQueue, JobNotFoundError +from src.utilities.benchmark import cpu_benchmark, disk_io_benchmark from src.utilities.config import Config from src.utilities.misc_helper import system_safe_path, current_system_os, current_system_cpu, \ current_system_os_version, num_to_alphanumeric -from src.utilities.server_helper import generate_thumbnail_for_job from src.utilities.zeroconf_server import ZeroconfServer 
-from src.utilities.benchmark import cpu_benchmark, disk_io_benchmark logger = logging.getLogger() server = Flask(__name__) @@ -85,43 +85,31 @@ def long_polling_jobs(): @server.route('/api/job//thumbnail') def job_thumbnail(job_id): - big_thumb = request.args.get('size', False) == "big" - video_ok = request.args.get('video_ok', False) - found_job = RenderQueue.job_with_id(job_id, none_ok=True) - if found_job: - os.makedirs(server.config['THUMBS_FOLDER'], exist_ok=True) - thumb_video_path = os.path.join(server.config['THUMBS_FOLDER'], found_job.id + '.mp4') - thumb_image_path = os.path.join(server.config['THUMBS_FOLDER'], found_job.id + '.jpg') - big_video_path = os.path.join(server.config['THUMBS_FOLDER'], found_job.id + '_big.mp4') - big_image_path = os.path.join(server.config['THUMBS_FOLDER'], found_job.id + '_big.jpg') + try: + big_thumb = request.args.get('size', False) == "big" + video_ok = request.args.get('video_ok', False) + found_job = RenderQueue.job_with_id(job_id, none_ok=False) - # generate regular thumb if it doesn't exist - if not os.path.exists(thumb_video_path) and not os.path.exists(thumb_video_path + '_IN-PROGRESS') and \ - found_job.status not in [RenderStatus.CANCELLED, RenderStatus.ERROR]: - generate_thumbnail_for_job(found_job, thumb_video_path, thumb_image_path, max_width=240) + # trigger a thumbnail update - just in case + PreviewManager.update_previews_for_job(found_job, wait_until_completion=True, timeout=60) + previews = PreviewManager.get_previews_for_job(found_job) + all_previews_list = previews.get('output', previews.get('input', [])) - # generate big thumb if it doesn't exist - if not os.path.exists(big_video_path) and not os.path.exists(big_image_path + '_IN-PROGRESS') and \ - found_job.status not in [RenderStatus.CANCELLED, RenderStatus.ERROR]: - generate_thumbnail_for_job(found_job, big_video_path, big_image_path, max_width=800) + video_previews = [x for x in all_previews_list if x['kind'] == 'video'] + image_previews = [x for x 
in all_previews_list if x['kind'] == 'image'] + filtered_list = video_previews if video_previews and video_ok else image_previews - # generated videos - if video_ok: - if big_thumb and os.path.exists(big_video_path) and not os.path.exists( - big_video_path + '_IN-PROGRESS'): - return send_file(big_video_path, mimetype="video/mp4") - elif os.path.exists(thumb_video_path) and not os.path.exists(thumb_video_path + '_IN-PROGRESS'): - return send_file(thumb_video_path, mimetype="video/mp4") - - # Generated thumbs - if big_thumb and os.path.exists(big_image_path): - return send_file(big_image_path, mimetype='image/jpeg') - elif os.path.exists(thumb_image_path): - return send_file(thumb_image_path, mimetype='image/jpeg') - - return found_job.status.value, 200 - return found_job.status.value, 404 + # todo - sort by size or other metrics here + if filtered_list: + preview_to_send = filtered_list[0] + mime_types = {'image': 'image/jpeg', 'video': 'video/mp4'} + file_mime_type = mime_types.get(preview_to_send['kind'], 'unknown') + return send_file(preview_to_send['filename'], mimetype=file_mime_type) + except Exception as e: + logger.exception(f'Error getting thumbnail: {e}') + return f'Error getting thumbnail: {e}', 500 + return "No thumbnail available", 404 # Get job file routing @@ -305,14 +293,7 @@ def delete_job(job_id): if server.config['UPLOAD_FOLDER'] in output_dir and os.path.exists(output_dir): shutil.rmtree(output_dir) - # Remove any thumbnails - for filename in os.listdir(server.config['THUMBS_FOLDER']): - if job_id in filename: - os.remove(os.path.join(server.config['THUMBS_FOLDER'], filename)) - - thumb_path = os.path.join(server.config['THUMBS_FOLDER'], found_job.id + '.mp4') - if os.path.exists(thumb_path): - os.remove(thumb_path) + PreviewManager.delete_previews_for_job(found_job) # See if we own the project_dir (i.e. 
was it uploaded) project_dir = os.path.dirname(os.path.dirname(found_job.input_path)) @@ -495,54 +476,59 @@ def start_server(): RenderQueue.evaluate_queue() time.sleep(delay_sec) - # get hostname - local_hostname = socket.gethostname() - local_hostname = local_hostname + (".local" if not local_hostname.endswith(".local") else "") - - # load flask settings - server.config['HOSTNAME'] = local_hostname - server.config['PORT'] = int(Config.port_number) - server.config['UPLOAD_FOLDER'] = system_safe_path(os.path.expanduser(Config.upload_folder)) - server.config['THUMBS_FOLDER'] = system_safe_path(os.path.join(os.path.expanduser(Config.upload_folder), 'thumbs')) - server.config['MAX_CONTENT_PATH'] = Config.max_content_path - server.config['enable_split_jobs'] = Config.enable_split_jobs - - # Setup directory for saving engines to - EngineManager.engines_path = system_safe_path(os.path.join(os.path.join(os.path.expanduser(Config.upload_folder), - 'engines'))) - os.makedirs(EngineManager.engines_path, exist_ok=True) - - # Debug info - logger.debug(f"Upload directory: {server.config['UPLOAD_FOLDER']}") - logger.debug(f"Thumbs directory: {server.config['THUMBS_FOLDER']}") - logger.debug(f"Engines directory: {EngineManager.engines_path}") - - # disable most Flask logging - flask_log = logging.getLogger('werkzeug') - flask_log.setLevel(Config.flask_log_level.upper()) - - # check for updates for render engines if configured or on first launch - if Config.update_engines_on_launch or not EngineManager.get_engines(): - EngineManager.update_all_engines() - - # Set up the RenderQueue object - RenderQueue.load_state(database_directory=server.config['UPLOAD_FOLDER']) - ServerProxyManager.subscribe_to_listener() - DistributedJobManager.subscribe_to_listener() - - thread = threading.Thread(target=eval_loop, kwargs={'delay_sec': Config.queue_eval_seconds}, daemon=True) - thread.start() - - logger.info(f"Starting Zordon Render Server - Hostname: '{server.config['HOSTNAME']}:'") - 
ZeroconfServer.configure("_zordon._tcp.local.", server.config['HOSTNAME'], server.config['PORT']) - ZeroconfServer.properties = {'system_cpu': current_system_cpu(), 'system_cpu_cores': multiprocessing.cpu_count(), - 'system_os': current_system_os(), - 'system_os_version': current_system_os_version()} - ZeroconfServer.start() - try: - server.run(host='0.0.0.0', port=server.config['PORT'], debug=Config.flask_debug_enable, - use_reloader=False, threaded=True) + # get hostname + local_hostname = socket.gethostname() + local_hostname = local_hostname + (".local" if not local_hostname.endswith(".local") else "") + + # load flask settings + server.config['HOSTNAME'] = local_hostname + server.config['PORT'] = int(Config.port_number) + server.config['UPLOAD_FOLDER'] = system_safe_path(os.path.expanduser(Config.upload_folder)) + server.config['MAX_CONTENT_PATH'] = Config.max_content_path + server.config['enable_split_jobs'] = Config.enable_split_jobs + + # Setup storage directories + EngineManager.engines_path = system_safe_path(os.path.join(os.path.join(os.path.expanduser(Config.upload_folder), + 'engines'))) + os.makedirs(EngineManager.engines_path, exist_ok=True) + PreviewManager.storage_path = system_safe_path(os.path.join(os.path.expanduser(Config.upload_folder), 'previews')) + + server.config['THUMBS_FOLDER'] = PreviewManager.storage_path # todo: remove this + + # Debug info + logger.debug(f"Upload directory: {server.config['UPLOAD_FOLDER']}") + logger.debug(f"Thumbs directory: {PreviewManager.storage_path}") + logger.debug(f"Engines directory: {EngineManager.engines_path}") + + # disable most Flask logging + flask_log = logging.getLogger('werkzeug') + flask_log.setLevel(Config.flask_log_level.upper()) + + # check for updates for render engines if configured or on first launch + if Config.update_engines_on_launch or not EngineManager.get_engines(): + EngineManager.update_all_engines() + + # Set up the RenderQueue object + 
import logging
import os
import subprocess
import threading
from pathlib import Path

from src.utilities.ffmpeg_helper import generate_thumbnail, save_first_frame

logger = logging.getLogger()

# File extensions (lowercase, including the leading dot) that previews can be
# generated from.  NOTE: fixed '.mkv' — it was listed as 'mkv' and could never
# match the os.path.splitext() comparison below.
supported_video_formats = ['.mp4', '.mov', '.avi', '.mpg', '.mpeg', '.mxf', '.m4v', '.mkv']
supported_image_formats = ['.jpg', '.png', '.exr', '.tif']


class PreviewManager:
    """Generates and serves image/video previews ("thumbnails") for render jobs.

    Preview generation runs asynchronously on background threads (at most one
    in-flight generation per job).  Results are written to ``storage_path``
    using the naming scheme ``<job.id>-<label>-<max_width>.<ext>`` where
    *label* is ``"output"`` (rendered files) or ``"input"`` (the source file).
    """

    # Directory previews are written to.  Must be assigned (e.g. by the API
    # server at startup) before previews are generated or queried.
    storage_path = None

    # job.id -> threading.Thread for in-flight generation; used to avoid
    # starting duplicate work for the same job.
    _running_jobs = {}

    @classmethod
    def __generate_job_preview_worker(cls, job, replace_existing=False, max_width=320):
        """Thread worker: create image and (if possible) video previews for *job*.

        Errors are logged, never raised — this runs on a daemon-style
        background thread with no caller to report to.
        """
        # Prefer the job's rendered output files; fall back to its input file.
        job_file_list = job.file_list()
        source_files = job_file_list if job_file_list else [job.input_path]
        preview_label = "output" if job_file_list else "input"

        # Split candidate files by media type (extension match, case-insensitive).
        found_image_files = [f for f in source_files if os.path.splitext(f)[-1].lower() in supported_image_formats]
        found_video_files = [f for f in source_files if os.path.splitext(f)[-1].lower() in supported_video_formats]

        # Nothing usable to generate a preview from.
        if source_files and not found_video_files and not found_image_files:
            logger.warning(f"No valid image or video files found in files from job: {job}")
            return

        # FIX: ensure the storage directory exists before writing into it —
        # nothing else is guaranteed to have created it.
        os.makedirs(cls.storage_path, exist_ok=True)
        base_path = os.path.join(cls.storage_path, f"{job.id}-{preview_label}-{max_width}")
        preview_video_path = base_path + '.mp4'
        preview_image_path = base_path + '.jpg'

        if replace_existing:
            for stale_path in [preview_image_path, preview_video_path]:
                try:
                    os.remove(stale_path)
                except OSError:
                    pass  # file may simply not exist yet

        # Generate the still-image preview (last image file is used; for video
        # sources the last video file is used).
        if (found_video_files or found_image_files) and not os.path.exists(preview_image_path):
            try:
                path_of_source = found_image_files[-1] if found_image_files else found_video_files[-1]
                logger.debug(f"Generating image preview for {path_of_source}")
                save_first_frame(source_path=path_of_source, dest_path=preview_image_path, max_width=max_width)
                logger.debug(f"Successfully created image preview for {path_of_source}")
            except Exception as e:
                logger.error(f"Error generating image preview for {job}: {e}")

        # Generate the video preview from the first video file, if any.
        if found_video_files and not os.path.exists(preview_video_path):
            try:
                path_of_source = found_video_files[0]
                logger.debug(f"Generating video preview for {path_of_source}")
                generate_thumbnail(source_path=path_of_source, dest_path=preview_video_path, max_width=max_width)
                logger.debug(f"Successfully created video preview for {path_of_source}")
            except subprocess.CalledProcessError as e:
                logger.error(f"Error generating video preview for {job}: {e}")

    @classmethod
    def update_previews_for_job(cls, job, replace_existing=False, wait_until_completion=False, timeout=None):
        """Start (or join) asynchronous preview generation for *job*.

        If a generation thread for this job is already running it is reused;
        otherwise a new one is started.  With ``wait_until_completion`` the
        call blocks up to ``timeout`` seconds for the thread to finish.
        """
        job_thread = cls._running_jobs.get(job.id)
        if job_thread and job_thread.is_alive():
            logger.debug(f'Preview generation job already running for {job}')
        else:
            job_thread = threading.Thread(target=cls.__generate_job_preview_worker,
                                          args=(job, replace_existing,))
            job_thread.start()
            cls._running_jobs[job.id] = job_thread

        if wait_until_completion:
            job_thread.join(timeout=timeout)

    @classmethod
    def get_previews_for_job(cls, job):
        """Return previews on disk for *job*.

        Result shape: ``{label: [{'filename': str, 'width': str, 'kind': str}]}``
        where *kind* is 'image', 'video' or 'unknown'.
        """
        directory_path = Path(cls.storage_path)
        # FIX: no storage directory yet means no previews — don't let
        # iterdir() raise FileNotFoundError.
        if not directory_path.is_dir():
            return {}
        preview_files_for_job = [f for f in directory_path.iterdir()
                                 if f.is_file() and f.name.startswith(job.id)]

        results = {}
        for preview_file in preview_files_for_job:
            try:
                # Filenames look like "<job.id>-<label>-<width>.<ext>".
                # FIX: parse the width from the stem so the extension isn't
                # included (the old code split the full path and got "320.jpg").
                pixel_width = preview_file.stem.split('-')[-1]
                # NOTE(review): assumes job ids contain no '-' — verify.
                preview_label = preview_file.name.split('-')[1]
                extension = preview_file.suffix.lower()
                kind = 'video' if extension in supported_video_formats else \
                    'image' if extension in supported_image_formats else 'unknown'
                results.setdefault(preview_label, []).append(
                    {'filename': str(preview_file), 'width': pixel_width, 'kind': kind})
            except IndexError:  # ignore filenames that don't match the scheme
                pass
        return results

    @classmethod
    def delete_previews_for_job(cls, job):
        """Delete every preview file previously generated for *job*.

        Failures to remove individual files are logged and skipped.
        """
        all_previews = cls.get_previews_for_job(job)
        flattened_list = [item for sublist in all_previews.values() for item in sublist]
        for preview in flattened_list:
            try:
                logger.debug(f"Removing preview: {preview['filename']}")
                os.remove(preview['filename'])
            except OSError as e:
                logger.error(f"Error removing preview '{preview.get('filename')}': {e}")
logger.debug(f"Job {job_id} status change: {old_status} -> {new_status}") + + # Update previews - Force replace if job has completed successfully + PreviewManager.update_previews_for_job(render_job, + replace_existing=(render_job.status == RenderStatus.COMPLETED)) + + # Notify parent if necessary if render_job.parent: # If local job is a subjob from a remote server parent_id, hostname = render_job.parent.split('@')[0], render_job.parent.split('@')[-1] RenderServerProxy(hostname).notify_parent_of_status_change(parent_id=parent_id, subjob=render_job) - # handle cancelling all the children - elif render_job.children and new_status in [RenderStatus.CANCELLED, RenderStatus.ERROR]: + # Cancelling children if necessary + if render_job.children and new_status in [RenderStatus.CANCELLED, RenderStatus.ERROR]: for child in render_job.children: child_id, hostname = child.split('@') RenderServerProxy(hostname).cancel_job(child_id, confirm=True) @@ -143,6 +150,7 @@ class DistributedJobManager: logger.debug("Not splitting into subjobs") RenderQueue.add_to_render_queue(worker, force_start=job_data.get('force_start', False)) + PreviewManager.update_previews_for_job(worker) return worker diff --git a/src/engines/core/base_worker.py b/src/engines/core/base_worker.py index ae87550..0e7e23b 100644 --- a/src/engines/core/base_worker.py +++ b/src/engines/core/base_worker.py @@ -99,6 +99,9 @@ class BaseRenderWorker(Base): self.__process = None self.last_output = None + def __repr__(self): + return f"<{self.__class__.__name__}|{self.id}|{self.name}|{self.status}|{self.input_path}>" + @property def total_frames(self): return (self.end_frame or self.project_length) - self.start_frame + 1 @@ -131,9 +134,8 @@ class BaseRenderWorker(Base): generated_args = [str(x) for x in self.generate_worker_subprocess()] generated_args_flags = [x for x in generated_args if x.startswith('-')] if len(generated_args_flags) != len(set(generated_args_flags)): - msg = "Cannot generate subprocess - Multiple arg 
conflicts detected" + msg = f"Cannot generate subprocess - Multiple arg conflicts detected: {generated_args}" logger.error(msg) - logger.debug(f"Generated args for subprocess: {generated_args}") raise ValueError(msg) return generated_args diff --git a/src/render_queue.py b/src/render_queue.py index 23170c6..8a7c9df 100755 --- a/src/render_queue.py +++ b/src/render_queue.py @@ -121,19 +121,19 @@ class RenderQueue: @classmethod def start_job(cls, job): - logger.info(f'Starting render: {job.name} - Priority {job.priority}') + logger.info(f'Starting job: {job} - Priority {job.priority}') job.start() cls.save_state() @classmethod def cancel_job(cls, job): - logger.info(f'Cancelling job ID: {job.id}') + logger.info(f'Cancelling job: {job}') job.stop() return job.status == RenderStatus.CANCELLED @classmethod def delete_job(cls, job): - logger.info(f"Deleting job ID: {job.id}") + logger.info(f"Deleting job: {job}") job.stop() cls.job_queue.remove(job) cls.session.delete(job)