New PreviewManager to handle generating previews asynchronously (#86)

* Add PreviewManager

* Refactoring and better error handling

* Integrate PreviewManager into api_server.py

* Integrate PreviewManager into distributed_job_manager.py

* Add method to preview_manager.py to delete previews and integrate it into api_server

* Misc logging improvements

* Misc code cleanup

* Replace existing preview on job completion - Minor code fixes
This commit is contained in:
2024-08-04 16:45:46 -05:00
committed by GitHub
parent 21011e47ca
commit 1cdb7810bf
5 changed files with 203 additions and 98 deletions

View File

@@ -19,17 +19,17 @@ import yaml
from flask import Flask, request, send_file, after_this_request, Response, redirect, url_for, abort
from src.api.add_job_helpers import handle_uploaded_project_files, process_zipped_project
from src.api.preview_manager import PreviewManager
from src.api.serverproxy_manager import ServerProxyManager
from src.distributed_job_manager import DistributedJobManager
from src.engines.core.base_worker import string_to_status, RenderStatus
from src.engines.engine_manager import EngineManager
from src.render_queue import RenderQueue, JobNotFoundError
from src.utilities.benchmark import cpu_benchmark, disk_io_benchmark
from src.utilities.config import Config
from src.utilities.misc_helper import system_safe_path, current_system_os, current_system_cpu, \
current_system_os_version, num_to_alphanumeric
from src.utilities.server_helper import generate_thumbnail_for_job
from src.utilities.zeroconf_server import ZeroconfServer
from src.utilities.benchmark import cpu_benchmark, disk_io_benchmark
logger = logging.getLogger()
server = Flask(__name__)
@@ -85,43 +85,31 @@ def long_polling_jobs():
@server.route('/api/job/<job_id>/thumbnail')
def job_thumbnail(job_id):
big_thumb = request.args.get('size', False) == "big"
video_ok = request.args.get('video_ok', False)
found_job = RenderQueue.job_with_id(job_id, none_ok=True)
if found_job:
os.makedirs(server.config['THUMBS_FOLDER'], exist_ok=True)
thumb_video_path = os.path.join(server.config['THUMBS_FOLDER'], found_job.id + '.mp4')
thumb_image_path = os.path.join(server.config['THUMBS_FOLDER'], found_job.id + '.jpg')
big_video_path = os.path.join(server.config['THUMBS_FOLDER'], found_job.id + '_big.mp4')
big_image_path = os.path.join(server.config['THUMBS_FOLDER'], found_job.id + '_big.jpg')
try:
big_thumb = request.args.get('size', False) == "big"
video_ok = request.args.get('video_ok', False)
found_job = RenderQueue.job_with_id(job_id, none_ok=False)
# generate regular thumb if it doesn't exist
if not os.path.exists(thumb_video_path) and not os.path.exists(thumb_video_path + '_IN-PROGRESS') and \
found_job.status not in [RenderStatus.CANCELLED, RenderStatus.ERROR]:
generate_thumbnail_for_job(found_job, thumb_video_path, thumb_image_path, max_width=240)
# trigger a thumbnail update - just in case
PreviewManager.update_previews_for_job(found_job, wait_until_completion=True, timeout=60)
previews = PreviewManager.get_previews_for_job(found_job)
all_previews_list = previews.get('output', previews.get('input', []))
# generate big thumb if it doesn't exist
if not os.path.exists(big_video_path) and not os.path.exists(big_image_path + '_IN-PROGRESS') and \
found_job.status not in [RenderStatus.CANCELLED, RenderStatus.ERROR]:
generate_thumbnail_for_job(found_job, big_video_path, big_image_path, max_width=800)
video_previews = [x for x in all_previews_list if x['kind'] == 'video']
image_previews = [x for x in all_previews_list if x['kind'] == 'image']
filtered_list = video_previews if video_previews and video_ok else image_previews
# generated videos
if video_ok:
if big_thumb and os.path.exists(big_video_path) and not os.path.exists(
big_video_path + '_IN-PROGRESS'):
return send_file(big_video_path, mimetype="video/mp4")
elif os.path.exists(thumb_video_path) and not os.path.exists(thumb_video_path + '_IN-PROGRESS'):
return send_file(thumb_video_path, mimetype="video/mp4")
# Generated thumbs
if big_thumb and os.path.exists(big_image_path):
return send_file(big_image_path, mimetype='image/jpeg')
elif os.path.exists(thumb_image_path):
return send_file(thumb_image_path, mimetype='image/jpeg')
return found_job.status.value, 200
return found_job.status.value, 404
# todo - sort by size or other metrics here
if filtered_list:
preview_to_send = filtered_list[0]
mime_types = {'image': 'image/jpeg', 'video': 'video/mp4'}
file_mime_type = mime_types.get(preview_to_send['kind'], 'unknown')
return send_file(preview_to_send['filename'], mimetype=file_mime_type)
except Exception as e:
logger.exception(f'Error getting thumbnail: {e}')
return f'Error getting thumbnail: {e}', 500
return "No thumbnail available", 404
# Get job file routing
@@ -305,14 +293,7 @@ def delete_job(job_id):
if server.config['UPLOAD_FOLDER'] in output_dir and os.path.exists(output_dir):
shutil.rmtree(output_dir)
# Remove any thumbnails
for filename in os.listdir(server.config['THUMBS_FOLDER']):
if job_id in filename:
os.remove(os.path.join(server.config['THUMBS_FOLDER'], filename))
thumb_path = os.path.join(server.config['THUMBS_FOLDER'], found_job.id + '.mp4')
if os.path.exists(thumb_path):
os.remove(thumb_path)
PreviewManager.delete_previews_for_job(found_job)
# See if we own the project_dir (i.e. was it uploaded)
project_dir = os.path.dirname(os.path.dirname(found_job.input_path))
@@ -495,54 +476,59 @@ def start_server():
RenderQueue.evaluate_queue()
time.sleep(delay_sec)
# get hostname
local_hostname = socket.gethostname()
local_hostname = local_hostname + (".local" if not local_hostname.endswith(".local") else "")
# load flask settings
server.config['HOSTNAME'] = local_hostname
server.config['PORT'] = int(Config.port_number)
server.config['UPLOAD_FOLDER'] = system_safe_path(os.path.expanduser(Config.upload_folder))
server.config['THUMBS_FOLDER'] = system_safe_path(os.path.join(os.path.expanduser(Config.upload_folder), 'thumbs'))
server.config['MAX_CONTENT_PATH'] = Config.max_content_path
server.config['enable_split_jobs'] = Config.enable_split_jobs
# Setup directory for saving engines to
EngineManager.engines_path = system_safe_path(os.path.join(os.path.join(os.path.expanduser(Config.upload_folder),
'engines')))
os.makedirs(EngineManager.engines_path, exist_ok=True)
# Debug info
logger.debug(f"Upload directory: {server.config['UPLOAD_FOLDER']}")
logger.debug(f"Thumbs directory: {server.config['THUMBS_FOLDER']}")
logger.debug(f"Engines directory: {EngineManager.engines_path}")
# disable most Flask logging
flask_log = logging.getLogger('werkzeug')
flask_log.setLevel(Config.flask_log_level.upper())
# check for updates for render engines if configured or on first launch
if Config.update_engines_on_launch or not EngineManager.get_engines():
EngineManager.update_all_engines()
# Set up the RenderQueue object
RenderQueue.load_state(database_directory=server.config['UPLOAD_FOLDER'])
ServerProxyManager.subscribe_to_listener()
DistributedJobManager.subscribe_to_listener()
thread = threading.Thread(target=eval_loop, kwargs={'delay_sec': Config.queue_eval_seconds}, daemon=True)
thread.start()
logger.info(f"Starting Zordon Render Server - Hostname: '{server.config['HOSTNAME']}:'")
ZeroconfServer.configure("_zordon._tcp.local.", server.config['HOSTNAME'], server.config['PORT'])
ZeroconfServer.properties = {'system_cpu': current_system_cpu(), 'system_cpu_cores': multiprocessing.cpu_count(),
'system_os': current_system_os(),
'system_os_version': current_system_os_version()}
ZeroconfServer.start()
try:
server.run(host='0.0.0.0', port=server.config['PORT'], debug=Config.flask_debug_enable,
use_reloader=False, threaded=True)
# get hostname
local_hostname = socket.gethostname()
local_hostname = local_hostname + (".local" if not local_hostname.endswith(".local") else "")
# load flask settings
server.config['HOSTNAME'] = local_hostname
server.config['PORT'] = int(Config.port_number)
server.config['UPLOAD_FOLDER'] = system_safe_path(os.path.expanduser(Config.upload_folder))
server.config['MAX_CONTENT_PATH'] = Config.max_content_path
server.config['enable_split_jobs'] = Config.enable_split_jobs
# Setup storage directories
EngineManager.engines_path = system_safe_path(os.path.join(os.path.join(os.path.expanduser(Config.upload_folder),
'engines')))
os.makedirs(EngineManager.engines_path, exist_ok=True)
PreviewManager.storage_path = system_safe_path(os.path.join(os.path.expanduser(Config.upload_folder), 'previews'))
server.config['THUMBS_FOLDER'] = PreviewManager.storage_path # todo: remove this
# Debug info
logger.debug(f"Upload directory: {server.config['UPLOAD_FOLDER']}")
logger.debug(f"Thumbs directory: {PreviewManager.storage_path}")
logger.debug(f"Engines directory: {EngineManager.engines_path}")
# disable most Flask logging
flask_log = logging.getLogger('werkzeug')
flask_log.setLevel(Config.flask_log_level.upper())
# check for updates for render engines if configured or on first launch
if Config.update_engines_on_launch or not EngineManager.get_engines():
EngineManager.update_all_engines()
# Set up the RenderQueue object
RenderQueue.load_state(database_directory=server.config['UPLOAD_FOLDER'])
ServerProxyManager.subscribe_to_listener()
DistributedJobManager.subscribe_to_listener()
thread = threading.Thread(target=eval_loop, kwargs={'delay_sec': Config.queue_eval_seconds}, daemon=True)
thread.start()
logger.info(f"Starting Zordon Render Server - Hostname: '{server.config['HOSTNAME']}:'")
ZeroconfServer.configure("_zordon._tcp.local.", server.config['HOSTNAME'], server.config['PORT'])
ZeroconfServer.properties = {'system_cpu': current_system_cpu(), 'system_cpu_cores': multiprocessing.cpu_count(),
'system_os': current_system_os(),
'system_os_version': current_system_os_version()}
ZeroconfServer.start()
try:
server.run(host='0.0.0.0', port=server.config['PORT'], debug=Config.flask_debug_enable,
use_reloader=False, threaded=True)
finally:
RenderQueue.save_state()
finally:
RenderQueue.save_state()
ZeroconfServer.stop()

109
src/api/preview_manager.py Normal file
View File

@@ -0,0 +1,109 @@
import logging
import os
import subprocess
import threading
from pathlib import Path
from src.utilities.ffmpeg_helper import generate_thumbnail, save_first_frame
logger = logging.getLogger()
supported_video_formats = ['.mp4', '.mov', '.avi', '.mpg', '.mpeg', '.mxf', '.m4v', 'mkv']
supported_image_formats = ['.jpg', '.png', '.exr', '.tif']
class PreviewManager:
storage_path = None
_running_jobs = {}
@classmethod
def __generate_job_preview_worker(cls, job, replace_existing=False, max_width=320):
# Determine best source file to use for thumbs
job_file_list = job.file_list()
source_files = job_file_list if job_file_list else [job.input_path]
preview_label = "output" if job_file_list else "input"
# filter by type
found_image_files = [f for f in source_files if os.path.splitext(f)[-1].lower() in supported_image_formats]
found_video_files = [f for f in source_files if os.path.splitext(f)[-1].lower() in supported_video_formats]
# check if we even have any valid files to work from
if source_files and not found_video_files and not found_image_files:
logger.warning(f"No valid image or video files found in files from job: {job}")
return
base_path = os.path.join(cls.storage_path, f"{job.id}-{preview_label}-{max_width}")
preview_video_path = base_path + '.mp4'
preview_image_path = base_path + '.jpg'
if replace_existing:
for x in [preview_image_path, preview_video_path]:
try:
os.remove(x)
except OSError:
pass
# Generate image previews
if (found_video_files or found_image_files) and not os.path.exists(preview_image_path):
try:
path_of_source = found_image_files[-1] if found_image_files else found_video_files[-1]
logger.debug(f"Generating image preview for {path_of_source}")
save_first_frame(source_path=path_of_source, dest_path=preview_image_path, max_width=max_width)
logger.debug(f"Successfully created image preview for {path_of_source}")
except Exception as e:
logger.error(f"Error generating image preview for {job}: {e}")
# Generate video previews
if found_video_files and not os.path.exists(preview_video_path):
try:
path_of_source = found_video_files[0]
logger.debug(f"Generating video preview for {path_of_source}")
generate_thumbnail(source_path=path_of_source, dest_path=preview_video_path, max_width=max_width)
logger.debug(f"Successfully created video preview for {path_of_source}")
except subprocess.CalledProcessError as e:
logger.error(f"Error generating video preview for {job}: {e}")
@classmethod
def update_previews_for_job(cls, job, replace_existing=False, wait_until_completion=False, timeout=None):
job_thread = cls._running_jobs.get(job.id)
if job_thread and job_thread.is_alive():
logger.debug(f'Preview generation job already running for {job}')
else:
job_thread = threading.Thread(target=cls.__generate_job_preview_worker, args=(job, replace_existing,))
job_thread.start()
cls._running_jobs[job.id] = job_thread
if wait_until_completion:
job_thread.join(timeout=timeout)
@classmethod
def get_previews_for_job(cls, job):
directory_path = Path(cls.storage_path)
preview_files_for_job = [f for f in directory_path.iterdir() if f.is_file() and f.name.startswith(job.id)]
results = {}
for preview_filename in preview_files_for_job:
try:
pixel_width = str(preview_filename).split('-')[-1]
preview_label = str(os.path.basename(preview_filename)).split('-')[1]
extension = os.path.splitext(preview_filename)[-1].lower()
kind = 'video' if extension in supported_video_formats else \
'image' if extension in supported_image_formats else 'unknown'
results[preview_label] = results.get(preview_label, [])
results[preview_label].append({'filename': str(preview_filename), 'width': pixel_width, 'kind': kind})
except IndexError: # ignore invalid filenames
pass
return results
@classmethod
def delete_previews_for_job(cls, job):
all_previews = cls.get_previews_for_job(job)
flattened_list = [item for sublist in all_previews.values() for item in sublist]
for preview in flattened_list:
try:
logger.debug(f"Removing preview: {preview['filename']}")
os.remove(preview['filename'])
except OSError as e:
logger.error(f"Error removing preview '{preview.get('filename')}': {e}")

View File

@@ -10,6 +10,7 @@ import requests
from plyer import notification
from pubsub import pub
from src.api.preview_manager import PreviewManager
from src.api.server_proxy import RenderServerProxy
from src.engines.engine_manager import EngineManager
from src.render_queue import RenderQueue
@@ -52,12 +53,18 @@ class DistributedJobManager:
return
logger.debug(f"Job {job_id} status change: {old_status} -> {new_status}")
# Update previews - Force replace if job has completed successfully
PreviewManager.update_previews_for_job(render_job,
replace_existing=(render_job.status == RenderStatus.COMPLETED))
# Notify parent if necessary
if render_job.parent: # If local job is a subjob from a remote server
parent_id, hostname = render_job.parent.split('@')[0], render_job.parent.split('@')[-1]
RenderServerProxy(hostname).notify_parent_of_status_change(parent_id=parent_id, subjob=render_job)
# handle cancelling all the children
elif render_job.children and new_status in [RenderStatus.CANCELLED, RenderStatus.ERROR]:
# Cancelling children if necessary
if render_job.children and new_status in [RenderStatus.CANCELLED, RenderStatus.ERROR]:
for child in render_job.children:
child_id, hostname = child.split('@')
RenderServerProxy(hostname).cancel_job(child_id, confirm=True)
@@ -143,6 +150,7 @@ class DistributedJobManager:
logger.debug("Not splitting into subjobs")
RenderQueue.add_to_render_queue(worker, force_start=job_data.get('force_start', False))
PreviewManager.update_previews_for_job(worker)
return worker

View File

@@ -99,6 +99,9 @@ class BaseRenderWorker(Base):
self.__process = None
self.last_output = None
def __repr__(self):
return f"<{self.__class__.__name__}|{self.id}|{self.name}|{self.status}|{self.input_path}>"
@property
def total_frames(self):
return (self.end_frame or self.project_length) - self.start_frame + 1
@@ -131,9 +134,8 @@ class BaseRenderWorker(Base):
generated_args = [str(x) for x in self.generate_worker_subprocess()]
generated_args_flags = [x for x in generated_args if x.startswith('-')]
if len(generated_args_flags) != len(set(generated_args_flags)):
msg = "Cannot generate subprocess - Multiple arg conflicts detected"
msg = f"Cannot generate subprocess - Multiple arg conflicts detected: {generated_args}"
logger.error(msg)
logger.debug(f"Generated args for subprocess: {generated_args}")
raise ValueError(msg)
return generated_args

View File

@@ -121,19 +121,19 @@ class RenderQueue:
@classmethod
def start_job(cls, job):
logger.info(f'Starting render: {job.name} - Priority {job.priority}')
logger.info(f'Starting job: {job} - Priority {job.priority}')
job.start()
cls.save_state()
@classmethod
def cancel_job(cls, job):
logger.info(f'Cancelling job ID: {job.id}')
logger.info(f'Cancelling job: {job}')
job.stop()
return job.status == RenderStatus.CANCELLED
@classmethod
def delete_job(cls, job):
logger.info(f"Deleting job ID: {job.id}")
logger.info(f"Deleting job: {job}")
job.stop()
cls.job_queue.remove(job)
cls.session.delete(job)