From af6d6e15258243b2c010c221df7d1353ba22de76 Mon Sep 17 00:00:00 2001 From: Brett Date: Fri, 23 Aug 2024 19:26:05 -0500 Subject: [PATCH] Document all the things! (#117) Add lots of docstrings everywhere --- src/api/api_server.py | 344 ++++++++++++++-------------- src/api/server_proxy.py | 146 +++++++----- src/distributed_job_manager.py | 84 +++---- src/engines/core/base_downloader.py | 134 ++++++++++- src/engines/core/base_engine.py | 164 +++++++++---- src/engines/core/base_worker.py | 66 ++++-- src/engines/engine_manager.py | 14 ++ src/init.py | 16 +- src/render_queue.py | 111 +++++---- src/ui/main_window.py | 16 +- 10 files changed, 699 insertions(+), 396 deletions(-) diff --git a/src/api/api_server.py b/src/api/api_server.py index b353520..664c09d 100755 --- a/src/api/api_server.py +++ b/src/api/api_server.py @@ -10,143 +10,84 @@ import ssl import tempfile import time from datetime import datetime -from zipfile import ZipFile import psutil import yaml -from flask import Flask, request, send_file, after_this_request, Response, redirect, url_for, abort +from flask import Flask, request, send_file, after_this_request, Response, redirect, url_for from sqlalchemy.orm.exc import DetachedInstanceError from src.api.add_job_helpers import handle_uploaded_project_files, process_zipped_project from src.api.preview_manager import PreviewManager from src.distributed_job_manager import DistributedJobManager -from src.engines.core.base_worker import string_to_status, RenderStatus from src.engines.engine_manager import EngineManager from src.render_queue import RenderQueue, JobNotFoundError -from src.utilities.benchmark import cpu_benchmark, disk_io_benchmark from src.utilities.config import Config from src.utilities.misc_helper import system_safe_path, current_system_os, current_system_cpu, \ current_system_os_version, num_to_alphanumeric -from src.utilities.zeroconf_server import ZeroconfServer +from src.utilities.status_utils import string_to_status logger = 
logging.getLogger() server = Flask(__name__) ssl._create_default_https_context = ssl._create_unverified_context # disable SSL for downloads -categories = [RenderStatus.RUNNING, RenderStatus.ERROR, RenderStatus.NOT_STARTED, RenderStatus.SCHEDULED, - RenderStatus.COMPLETED, RenderStatus.CANCELLED] + +def start_server(hostname=None): + + # get hostname + if not hostname: + local_hostname = socket.gethostname() + hostname = local_hostname + (".local" if not local_hostname.endswith(".local") else "") + + # load flask settings + server.config['HOSTNAME'] = hostname + server.config['PORT'] = int(Config.port_number) + server.config['UPLOAD_FOLDER'] = system_safe_path(os.path.expanduser(Config.upload_folder)) + server.config['MAX_CONTENT_PATH'] = Config.max_content_path + server.config['enable_split_jobs'] = Config.enable_split_jobs + + # disable most Flask logging + flask_log = logging.getLogger('werkzeug') + flask_log.setLevel(Config.flask_log_level.upper()) + + logger.debug('Starting API server') + server.run(host='0.0.0.0', port=server.config['PORT'], debug=Config.flask_debug_enable, use_reloader=False, + threaded=True) -# -- Error Handlers -- - -@server.errorhandler(JobNotFoundError) -def handle_job_not_found(job_error): - return str(job_error), 400 - - -@server.errorhandler(DetachedInstanceError) -def handle_detached_instance(error): - # logger.debug(f"detached instance: {error}") - return "Unavailable", 503 - - -@server.errorhandler(Exception) -def handle_general_error(general_error): - err_msg = f"Server error: {general_error}" - logger.error(err_msg) - return err_msg, 500 - - -# -- Jobs -- - - -def sorted_jobs(all_jobs, sort_by_date=True): - if not sort_by_date: - sorted_job_list = [] - if all_jobs: - for status_category in categories: - found_jobs = [x for x in all_jobs if x.status == status_category.value] - if found_jobs: - sorted_found_jobs = sorted(found_jobs, key=lambda d: d.date_created, reverse=True) - sorted_job_list.extend(sorted_found_jobs) - else: - 
sorted_job_list = sorted(all_jobs, key=lambda d: d.date_created, reverse=True) - return sorted_job_list - +# -------------------------------------------- +# Get All Jobs +# -------------------------------------------- @server.get('/api/jobs') def jobs_json(): - try: - all_jobs = [x.json() for x in RenderQueue.all_jobs()] - job_cache_int = int(json.dumps(all_jobs).__hash__()) - job_cache_token = num_to_alphanumeric(job_cache_int) - return {'jobs': all_jobs, 'token': job_cache_token} - except DetachedInstanceError as e: - raise e - except Exception as e: - logger.error(f"Error fetching jobs_json: {e}") - raise e + """Retrieves all jobs from the render queue in JSON format. + + This endpoint fetches all jobs currently in the render queue, converts them to JSON format, + and returns them along with a cache token that represents the current state of the job list. + + Returns: + dict: A dictionary containing: + - 'jobs' (list[dict]): A list of job dictionaries, each representing a job in the queue. + - 'token' (str): A cache token generated from the hash of the job list. 
+ """ + all_jobs = [x.json() for x in RenderQueue.all_jobs()] + job_cache_int = int(json.dumps(all_jobs).__hash__()) + job_cache_token = num_to_alphanumeric(job_cache_int) + return {'jobs': all_jobs, 'token': job_cache_token} @server.get('/api/jobs_long_poll') def long_polling_jobs(): - try: - hash_token = request.args.get('token', None) - start_time = time.time() - while True: - all_jobs = jobs_json() - if all_jobs['token'] != hash_token: - return all_jobs - # Break after 30 seconds to avoid gateway timeout - if time.time() - start_time > 30: - return {}, 204 - time.sleep(1) - except DetachedInstanceError as e: - raise e - except Exception as e: - logger.error(f"Error fetching long_polling_jobs: {e}") - raise e - - -@server.route('/api/job//thumbnail') -def job_thumbnail(job_id): - - try: - big_thumb = request.args.get('size', False) == "big" - video_ok = request.args.get('video_ok', False) - found_job = RenderQueue.job_with_id(job_id, none_ok=False) - - # trigger a thumbnail update - just in case - PreviewManager.update_previews_for_job(found_job, wait_until_completion=True, timeout=60) - previews = PreviewManager.get_previews_for_job(found_job) - all_previews_list = previews.get('output', previews.get('input', [])) - - video_previews = [x for x in all_previews_list if x['kind'] == 'video'] - image_previews = [x for x in all_previews_list if x['kind'] == 'image'] - filtered_list = video_previews if video_previews and video_ok else image_previews - - # todo - sort by size or other metrics here - if filtered_list: - preview_to_send = filtered_list[0] - mime_types = {'image': 'image/jpeg', 'video': 'video/mp4'} - file_mime_type = mime_types.get(preview_to_send['kind'], 'unknown') - return send_file(preview_to_send['filename'], mimetype=file_mime_type) - except Exception as e: - logger.error(f'Error getting thumbnail: {e}') - return f'Error getting thumbnail: {e}', 500 - return "No thumbnail available", 404 - - -# Get job file routing 
-@server.route('/api/job//file/', methods=['GET']) -def get_job_file(job_id, filename): - found_job = RenderQueue.job_with_id(job_id) - try: - for full_path in found_job.file_list(): - if filename in full_path: - return send_file(path_or_file=full_path) - except FileNotFoundError: - abort(404) + hash_token = request.args.get('token', None) + start_time = time.time() + while True: + all_jobs = jobs_json() + if all_jobs['token'] != hash_token: + return all_jobs + # Break after 30 seconds to avoid gateway timeout + if time.time() - start_time > 30: + return {}, 204 + time.sleep(1) @server.get('/api/jobs/') @@ -159,20 +100,33 @@ def filtered_jobs_json(status_val): return f'Cannot find jobs with status {status_val}', 400 -@server.post('/api/job//send_subjob_update_notification') -def subjob_update_notification(job_id): - subjob_details = request.json - DistributedJobManager.handle_subjob_update_notification(RenderQueue.job_with_id(job_id), subjob_data=subjob_details) - return Response(status=200) - +# -------------------------------------------- +# Job Details / File Handling +# -------------------------------------------- @server.get('/api/job/') -def get_job_status(job_id): +def get_job_details(job_id): + """Retrieves the details of a requested job in JSON format + + Args: + job_id (str): The ID of the render job. + + Returns: + dict: A JSON representation of the job's details. + """ return RenderQueue.job_with_id(job_id).json() @server.get('/api/job//logs') def get_job_logs(job_id): + """Retrieves the log file for a specific render job. + + Args: + job_id (str): The ID of the render job. + + Returns: + Response: The log file's content as plain text, or an empty response if the log file is not found. 
+ """ found_job = RenderQueue.job_with_id(job_id) log_path = system_safe_path(found_job.log_path()) log_data = None @@ -188,7 +142,7 @@ def get_file_list(job_id): @server.route('/api/job//download') -def download_file(job_id): +def download_requested_file(job_id): requested_filename = request.args.get('filename') if not requested_filename: @@ -203,7 +157,7 @@ def download_file(job_id): @server.route('/api/job//download_all') -def download_all(job_id): +def download_all_files(job_id): zip_filename = None @after_this_request @@ -218,6 +172,7 @@ def download_all(job_id): found_job = RenderQueue.job_with_id(job_id) output_dir = os.path.dirname(found_job.output_path) if os.path.exists(output_dir): + from zipfile import ZipFile zip_filename = system_safe_path(os.path.join(tempfile.gettempdir(), pathlib.Path(found_job.input_path).stem + '.zip')) with ZipFile(zip_filename, 'w') as zipObj: @@ -229,6 +184,10 @@ def download_all(job_id): return f'Cannot find project files for job {job_id}', 500 +# -------------------------------------------- +# System Environment / Status +# -------------------------------------------- + @server.get('/api/presets') def presets(): presets_path = system_safe_path('config/presets.yaml') @@ -260,13 +219,28 @@ def snapshot(): return server_data -@server.get('/api/_detected_clients') -def detected_clients(): - # todo: dev/debug only. Should not ship this - probably. 
- return ZeroconfServer.found_hostnames() +@server.route('/api/status') +def status(): + return {"timestamp": datetime.now().isoformat(), + "system_os": current_system_os(), + "system_os_version": current_system_os_version(), + "system_cpu": current_system_cpu(), + "cpu_percent": psutil.cpu_percent(percpu=False), + "cpu_percent_per_cpu": psutil.cpu_percent(percpu=True), + "cpu_count": psutil.cpu_count(logical=False), + "memory_total": psutil.virtual_memory().total, + "memory_available": psutil.virtual_memory().available, + "memory_percent": psutil.virtual_memory().percent, + "job_counts": RenderQueue.job_counts(), + "hostname": server.config['HOSTNAME'], + "port": server.config['PORT'] + } -# New version +# -------------------------------------------- +# Job Lifecyle (Create, Cancel, Delete) +# -------------------------------------------- + @server.post('/api/add_job') def add_job_handler(): # Process request data @@ -353,31 +327,9 @@ def delete_job(job_id): return f"Error deleting job: {e}", 500 -@server.get('/api/clear_history') -def clear_history(): - RenderQueue.clear_history() - return 'success' - - -@server.route('/api/status') -def status(): - - # Get system info - return {"timestamp": datetime.now().isoformat(), - "system_os": current_system_os(), - "system_os_version": current_system_os_version(), - "system_cpu": current_system_cpu(), - "cpu_percent": psutil.cpu_percent(percpu=False), - "cpu_percent_per_cpu": psutil.cpu_percent(percpu=True), - "cpu_count": psutil.cpu_count(logical=False), - "memory_total": psutil.virtual_memory().total, - "memory_available": psutil.virtual_memory().available, - "memory_percent": psutil.virtual_memory().percent, - "job_counts": RenderQueue.job_counts(), - "hostname": server.config['HOSTNAME'], - "port": server.config['PORT'] - } - +# -------------------------------------------- +# Engine Info and Management: +# -------------------------------------------- @server.get('/api/renderer_info') def renderer_info(): @@ -499,35 
+451,95 @@ def get_renderer_help(renderer): return f"Cannot find renderer '{renderer}'", 400 +# -------------------------------------------- +# Miscellaneous: +# -------------------------------------------- +@server.post('/api/job//send_subjob_update_notification') +def subjob_update_notification(job_id): + subjob_details = request.json + DistributedJobManager.handle_subjob_update_notification(RenderQueue.job_with_id(job_id), subjob_data=subjob_details) + return Response(status=200) + + +@server.route('/api/job//thumbnail') +def job_thumbnail(job_id): + + try: + big_thumb = request.args.get('size', False) == "big" + video_ok = request.args.get('video_ok', False) + found_job = RenderQueue.job_with_id(job_id, none_ok=False) + + # trigger a thumbnail update - just in case + PreviewManager.update_previews_for_job(found_job, wait_until_completion=True, timeout=60) + previews = PreviewManager.get_previews_for_job(found_job) + all_previews_list = previews.get('output', previews.get('input', [])) + + video_previews = [x for x in all_previews_list if x['kind'] == 'video'] + image_previews = [x for x in all_previews_list if x['kind'] == 'image'] + filtered_list = video_previews if video_previews and video_ok else image_previews + + # todo - sort by size or other metrics here + if filtered_list: + preview_to_send = filtered_list[0] + mime_types = {'image': 'image/jpeg', 'video': 'video/mp4'} + file_mime_type = mime_types.get(preview_to_send['kind'], 'unknown') + return send_file(preview_to_send['filename'], mimetype=file_mime_type) + except Exception as e: + logger.error(f'Error getting thumbnail: {e}') + return f'Error getting thumbnail: {e}', 500 + return "No thumbnail available", 404 + + +# -------------------------------------------- +# System Benchmarks: +# -------------------------------------------- + @server.get('/api/cpu_benchmark') def get_cpu_benchmark_score(): + from src.utilities.benchmark import cpu_benchmark return str(cpu_benchmark(10)) 
@server.get('/api/disk_benchmark') def get_disk_benchmark(): + from src.utilities.benchmark import disk_io_benchmark results = disk_io_benchmark() return {'write_speed': results[0], 'read_speed': results[-1]} -def start_server(hostname=None): +# -------------------------------------------- +# Error Handlers: +# -------------------------------------------- - # get hostname - if not hostname: - local_hostname = socket.gethostname() - hostname = local_hostname + (".local" if not local_hostname.endswith(".local") else "") +@server.errorhandler(JobNotFoundError) +def handle_job_not_found(job_error): + return str(job_error), 400 - # load flask settings - server.config['HOSTNAME'] = hostname - server.config['PORT'] = int(Config.port_number) - server.config['UPLOAD_FOLDER'] = system_safe_path(os.path.expanduser(Config.upload_folder)) - server.config['MAX_CONTENT_PATH'] = Config.max_content_path - server.config['enable_split_jobs'] = Config.enable_split_jobs - # disable most Flask logging - flask_log = logging.getLogger('werkzeug') - flask_log.setLevel(Config.flask_log_level.upper()) +@server.errorhandler(DetachedInstanceError) +def handle_detached_instance(_): + return "Unavailable", 503 - logger.debug('Starting API server') - server.run(host='0.0.0.0', port=server.config['PORT'], debug=Config.flask_debug_enable, use_reloader=False, - threaded=True) + +@server.errorhandler(Exception) +def handle_general_error(general_error): + err_msg = f"Server error: {general_error}" + logger.error(err_msg) + return err_msg, 500 + + +# -------------------------------------------- +# Debug / Development Only: +# -------------------------------------------- + +@server.get('/api/_debug/detected_clients') +def detected_clients(): + # todo: dev/debug only. Should not ship this - probably. 
+ from src.utilities.zeroconf_server import ZeroconfServer + return ZeroconfServer.found_hostnames() + + +@server.get('/api/_debug/clear_history') +def clear_history(): + RenderQueue.clear_history() + return 'success' diff --git a/src/api/server_proxy.py b/src/api/server_proxy.py index 6c8fcc1..e2ba538 100644 --- a/src/api/server_proxy.py +++ b/src/api/server_proxy.py @@ -10,7 +10,6 @@ from urllib.parse import urljoin from src.utilities.misc_helper import is_localhost from src.utilities.status_utils import RenderStatus -from src.utilities.zeroconf_server import ZeroconfServer status_colors = {RenderStatus.ERROR: "red", RenderStatus.CANCELLED: 'orange1', RenderStatus.COMPLETED: 'green', RenderStatus.NOT_STARTED: "yellow", RenderStatus.SCHEDULED: 'purple', @@ -26,15 +25,8 @@ LOOPBACK = '127.0.0.1' class RenderServerProxy: - """ - The ServerProxy class is responsible for interacting with a remote server. - It provides methods to request data from the server and store the status of the server. - - Attributes: - system_cpu (str): The CPU type of the system. - system_cpu_count (int): The number of CPUs in the system. - system_os (str): The operating system of the system. - system_os_version (str): The version of the operating system. + """The ServerProxy class is responsible for interacting with a remote server. + It provides convenience methods to request data from the server and store the status of the server. 
""" def __init__(self, hostname, server_port="8080"): @@ -55,6 +47,10 @@ class RenderServerProxy: self.system_os = None self.system_os_version = None + # -------------------------------------------- + # Basics / Connection: + # -------------------------------------------- + def __repr__(self): return f"" @@ -73,6 +69,10 @@ class RenderServerProxy: running_jobs = [x for x in self.__jobs_cache if x['status'] == 'running'] if self.__jobs_cache else [] return f"{len(running_jobs)} running" if running_jobs else "Ready" + # -------------------------------------------- + # Requests: + # -------------------------------------------- + def request_data(self, payload, timeout=5): try: req = self.request(payload, timeout) @@ -103,6 +103,10 @@ class RenderServerProxy: hostname = LOOPBACK if self.is_localhost else self.hostname return requests.get(f'http://{hostname}:{self.port}/api/{payload}', timeout=timeout) + # -------------------------------------------- + # Background Updates: + # -------------------------------------------- + def start_background_update(self): if self.__update_in_background: return @@ -119,17 +123,6 @@ class RenderServerProxy: self.__background_thread.daemon = True self.__background_thread.start() - def stop_background_update(self): - self.__update_in_background = False - - def get_job_info(self, job_id, timeout=5): - return self.request_data(f'job/{job_id}', timeout=timeout) - - def get_all_jobs(self, timeout=5, ignore_token=False): - if not self.__update_in_background or ignore_token: - self.__update_job_cache(timeout, ignore_token) - return self.__jobs_cache.copy() if self.__jobs_cache else None - def __update_job_cache(self, timeout=40, ignore_token=False): if self.__offline_flags: # if we're offline, don't bother with the long poll @@ -147,15 +140,21 @@ class RenderServerProxy: self.__jobs_cache = sorted_jobs self.__jobs_cache_token = status_result['token'] + def stop_background_update(self): + self.__update_in_background = False + + # 
-------------------------------------------- + # Get System Info: + # -------------------------------------------- + + def get_all_jobs(self, timeout=5, ignore_token=False): + if not self.__update_in_background or ignore_token: + self.__update_job_cache(timeout, ignore_token) + return self.__jobs_cache.copy() if self.__jobs_cache else None + def get_data(self, timeout=5): return self.request_data('full_status', timeout=timeout) - def cancel_job(self, job_id, confirm=False): - return self.request_data(f'job/{job_id}/cancel?confirm={confirm}') - - def delete_job(self, job_id, confirm=False): - return self.request_data(f'job/{job_id}/delete?confirm={confirm}') - def get_status(self): status = self.request_data('status') if status and not self.system_cpu: @@ -165,26 +164,19 @@ class RenderServerProxy: self.system_os_version = status['system_os_version'] return status - def is_engine_available(self, engine_name): - return self.request_data(f'{engine_name}/is_available') + # -------------------------------------------- + # Get Job Info: + # -------------------------------------------- - def get_all_engines(self): - return self.request_data('all_engines') + def get_job_info(self, job_id, timeout=5): + return self.request_data(f'job/{job_id}', timeout=timeout) - def send_subjob_update_notification(self, parent_id, subjob): - """ - Notifies the parent job of an update in a subjob. + def get_job_files_list(self, job_id): + return self.request_data(f"job/{job_id}/file_list") - Args: - parent_id (str): The ID of the parent job. - subjob (Job): The subjob that has updated. - - Returns: - Response: The response from the server. 
- """ - hostname = LOOPBACK if self.is_localhost else self.hostname - return requests.post(f'http://{hostname}:{self.port}/api/job/{parent_id}/send_subjob_update_notification', - json=subjob.json()) + # -------------------------------------------- + # Job Lifecycle: + # -------------------------------------------- def post_job_to_server(self, file_path, job_list, callback=None): """ @@ -232,29 +224,36 @@ class RenderServerProxy: except Exception as e: logger.error(f"An error occurred: {e}") - def get_job_files_list(self, job_id): - return self.request_data(f"job/{job_id}/file_list") + def cancel_job(self, job_id, confirm=False): + return self.request_data(f'job/{job_id}/cancel?confirm={confirm}') - def download_all_job_files(self, job_id, save_path): + def delete_job(self, job_id, confirm=False): + return self.request_data(f'job/{job_id}/delete?confirm={confirm}') + + def send_subjob_update_notification(self, parent_id, subjob): + """ + Notifies the parent job of an update in a subjob. + + Args: + parent_id (str): The ID of the parent job. + subjob (Job): The subjob that has updated. + + Returns: + Response: The response from the server. 
+ """ hostname = LOOPBACK if self.is_localhost else self.hostname - url = f"http://{hostname}:{self.port}/api/job/{job_id}/download_all" - return self.__download_file_from_url(url, output_filepath=save_path) + return requests.post(f'http://{hostname}:{self.port}/api/job/{parent_id}/send_subjob_update_notification', + json=subjob.json()) - def download_job_file(self, job_id, job_filename, save_path): - hostname = LOOPBACK if self.is_localhost else self.hostname - url = f"http://{hostname}:{self.port}/api/job/{job_id}/download?filename={job_filename}" - return self.__download_file_from_url(url, output_filepath=save_path) + # -------------------------------------------- + # Renderers: + # -------------------------------------------- - @staticmethod - def __download_file_from_url(url, output_filepath): - with requests.get(url, stream=True) as r: - r.raise_for_status() - with open(output_filepath, 'wb') as f: - for chunk in r.iter_content(chunk_size=8192): - f.write(chunk) - return output_filepath + def is_engine_available(self, engine_name): + return self.request_data(f'{engine_name}/is_available') - # --- Renderer --- # + def get_all_engines(self): + return self.request_data('all_engines') def get_renderer_info(self, response_type='standard', timeout=5): """ @@ -285,3 +284,26 @@ class RenderServerProxy: form_data = {'engine': engine, 'version': version, 'system_cpu': system_cpu} hostname = LOOPBACK if self.is_localhost else self.hostname return requests.post(f'http://{hostname}:{self.port}/api/delete_engine', json=form_data) + + # -------------------------------------------- + # Download Files: + # -------------------------------------------- + + def download_all_job_files(self, job_id, save_path): + hostname = LOOPBACK if self.is_localhost else self.hostname + url = f"http://{hostname}:{self.port}/api/job/{job_id}/download_all" + return self.__download_file_from_url(url, output_filepath=save_path) + + def download_job_file(self, job_id, job_filename, save_path): + 
hostname = LOOPBACK if self.is_localhost else self.hostname + url = f"http://{hostname}:{self.port}/api/job/{job_id}/download?filename={job_filename}" + return self.__download_file_from_url(url, output_filepath=save_path) + + @staticmethod + def __download_file_from_url(url, output_filepath): + with requests.get(url, stream=True) as r: + r.raise_for_status() + with open(output_filepath, 'wb') as f: + for chunk in r.iter_content(chunk_size=8192): + f.write(chunk) + return output_filepath diff --git a/src/distributed_job_manager.py b/src/distributed_job_manager.py index 68098e1..d3cb38a 100644 --- a/src/distributed_job_manager.py +++ b/src/distributed_job_manager.py @@ -39,10 +39,10 @@ class DistributedJobManager: """ Responds to the 'frame_complete' pubsub message for local jobs. - Parameters: - job_id (str): The ID of the job that has changed status. - old_status (str): The previous status of the job. - new_status (str): The new (current) status of the job. + Args: + job_id (str): The ID of the job that has changed status. + old_status (str): The previous status of the job. + new_status (str): The new (current) status of the job. Note: Do not call directly. Instead, call via the 'frame_complete' pubsub message. """ @@ -75,10 +75,10 @@ class DistributedJobManager: Responds to the 'status_change' pubsub message for local jobs. If it's a child job, it notifies the parent job about the status change. - Parameters: - job_id (str): The ID of the job that has changed status. - old_status (str): The previous status of the job. - new_status (str): The new (current) status of the job. + Args: + job_id (str): The ID of the job that has changed status. + old_status (str): The previous status of the job. + new_status (str): The new (current) status of the job. Note: Do not call directly. Instead, call via the 'status_change' pubsub message. 
""" @@ -129,14 +129,12 @@ class DistributedJobManager: # -------------------------------------------- @classmethod - def create_render_job(cls, job_data, loaded_project_local_path): - """ - Creates render jobs. - - This method job data and a local path to a loaded project. It creates and returns new a render job. + def create_render_job(cls, new_job_attributes, loaded_project_local_path): + """Creates render jobs. Pass in dict of job_data and the local path to the project. It creates and returns a new + render job. Args: - job_data (dict): Job data. + new_job_attributes (dict): Dict of desired attributes for new job (frame count, renderer, output path, etc) loaded_project_local_path (str): The local path to the loaded project. Returns: @@ -144,7 +142,7 @@ class DistributedJobManager: """ # get new output path in output_dir - output_path = job_data.get('output_path') + output_path = new_job_attributes.get('output_path') if not output_path: loaded_project_filename = os.path.basename(loaded_project_local_path) output_filename = os.path.splitext(loaded_project_filename)[0] @@ -158,27 +156,27 @@ class DistributedJobManager: logger.debug(f"New job output path: {output_path}") # create & configure jobs - worker = EngineManager.create_worker(renderer=job_data['renderer'], + worker = EngineManager.create_worker(renderer=new_job_attributes['renderer'], input_path=loaded_project_local_path, output_path=output_path, - engine_version=job_data.get('engine_version'), - args=job_data.get('args', {}), - parent=job_data.get('parent'), - name=job_data.get('name')) - worker.status = job_data.get("initial_status", worker.status) # todo: is this necessary? 
- worker.priority = int(job_data.get('priority', worker.priority)) - worker.start_frame = int(job_data.get("start_frame", worker.start_frame)) - worker.end_frame = int(job_data.get("end_frame", worker.end_frame)) + engine_version=new_job_attributes.get('engine_version'), + args=new_job_attributes.get('args', {}), + parent=new_job_attributes.get('parent'), + name=new_job_attributes.get('name')) + worker.status = new_job_attributes.get("initial_status", worker.status) # todo: is this necessary? + worker.priority = int(new_job_attributes.get('priority', worker.priority)) + worker.start_frame = int(new_job_attributes.get("start_frame", worker.start_frame)) + worker.end_frame = int(new_job_attributes.get("end_frame", worker.end_frame)) worker.watchdog_timeout = Config.worker_process_timeout worker.hostname = socket.gethostname() # determine if we can / should split the job - if job_data.get("enable_split_jobs", False) and (worker.total_frames > 1) and not worker.parent: - cls.split_into_subjobs_async(worker, job_data, loaded_project_local_path) + if new_job_attributes.get("enable_split_jobs", False) and (worker.total_frames > 1) and not worker.parent: + cls.split_into_subjobs_async(worker, new_job_attributes, loaded_project_local_path) else: worker.status = RenderStatus.NOT_STARTED - RenderQueue.add_to_render_queue(worker, force_start=job_data.get('force_start', False)) + RenderQueue.add_to_render_queue(worker, force_start=new_job_attributes.get('force_start', False)) PreviewManager.update_previews_for_job(worker) return worker @@ -189,8 +187,7 @@ class DistributedJobManager: @classmethod def handle_subjob_update_notification(cls, local_job, subjob_data): - """ - Responds to a notification from a remote subjob and the host requests any subsequent updates from the subjob. + """Responds to a notification from a remote subjob and the host requests any subsequent updates from the subjob. Args: local_job (BaseRenderWorker): The local parent job worker. 
@@ -214,6 +211,14 @@ class DistributedJobManager:
 
     @classmethod
     def wait_for_subjobs(cls, parent_job):
+        """Checks the status of subjobs and waits until they are all finished. Downloads rendered frames from subjobs
+        when they are completed.
+
+        Args:
+            parent_job: Worker object that has child jobs
+
+        Returns:
+        """
         logger.debug(f"Waiting for subjobs for job {parent_job}")
         parent_job.status = RenderStatus.WAITING_FOR_SUBJOBS
         statuses_to_download = [RenderStatus.CANCELLED, RenderStatus.ERROR, RenderStatus.COMPLETED]
@@ -280,15 +285,15 @@ class DistributedJobManager:
     # --------------------------------------------
 
     @classmethod
-    def split_into_subjobs_async(cls, parent_worker, job_data, project_path, system_os=None):
+    def split_into_subjobs_async(cls, parent_worker, new_job_attributes, project_path, system_os=None):
         # todo: I don't love this
         parent_worker.status = RenderStatus.CONFIGURING
-        cls.background_worker = threading.Thread(target=cls.split_into_subjobs, args=(parent_worker, job_data,
+        cls.background_worker = threading.Thread(target=cls.split_into_subjobs, args=(parent_worker, new_job_attributes,
                                                                                       project_path, system_os))
         cls.background_worker.start()
 
     @classmethod
-    def split_into_subjobs(cls, parent_worker, job_data, project_path, system_os=None, specific_servers=None):
+    def split_into_subjobs(cls, parent_worker, new_job_attributes, project_path, system_os=None, specific_servers=None):
         """
         Splits a job into subjobs and distributes them among available servers.
 
@@ -297,10 +302,10 @@ class DistributedJobManager:
             subjob.
 
         Args:
-            parent_worker (Worker): The worker that is handling the job.
-            job_data (dict): The data for the job to be split.
-            project_path (str): The path to the project associated with the job.
-            system_os (str, optional): The operating system of the servers. Default is any OS.
+            parent_worker (Worker): The parent job that we're creating the subjobs for.
+ new_job_attributes (dict): Dict of desired attributes for new job (frame count, renderer, output path, etc) + project_path (str): The path to the project. + system_os (str, optional): Required OS. Default is any. specific_servers (list, optional): List of specific servers to split work between. Defaults to all found. """ @@ -321,7 +326,7 @@ class DistributedJobManager: try: for subjob_data in all_subjob_server_data: subjob_hostname = subjob_data['hostname'] - post_results = cls.__create_subjob(job_data, project_path, subjob_data, subjob_hostname, + post_results = cls.__create_subjob(new_job_attributes, project_path, subjob_data, subjob_hostname, parent_worker) if not post_results.ok: ValueError(f"Failed to create subjob on {subjob_hostname}") @@ -342,8 +347,9 @@ class DistributedJobManager: RenderServerProxy(parent_worker.hostname).cancel_job(parent_worker.id, confirm=True) @staticmethod - def __create_subjob(job_data, project_path, server_data, server_hostname, parent_worker): - subjob = job_data.copy() + def __create_subjob(new_job_attributes, project_path, server_data, server_hostname, parent_worker): + """Convenience method to create subjobs for a parent worker""" + subjob = new_job_attributes.copy() subjob['name'] = f"{parent_worker.name}[{server_data['frame_range'][0]}-{server_data['frame_range'][-1]}]" subjob['parent'] = f"{parent_worker.id}@{parent_worker.hostname}" subjob['start_frame'] = server_data['frame_range'][0] diff --git a/src/engines/core/base_downloader.py b/src/engines/core/base_downloader.py index 356edea..f1d370f 100644 --- a/src/engines/core/base_downloader.py +++ b/src/engines/core/base_downloader.py @@ -1,9 +1,7 @@ import logging import os import shutil -import tarfile import tempfile -import zipfile import requests from tqdm import tqdm @@ -12,26 +10,150 @@ logger = logging.getLogger() class EngineDownloader: + """A class responsible for downloading and extracting rendering engines from publicly available URLs. 
+ + Attributes: + supported_formats (list[str]): A list of file formats supported by the downloader. + """ supported_formats = ['.zip', '.tar.xz', '.dmg'] def __init__(self): pass + # -------------------------------------------- + # Required Overrides for Subclasses: + # -------------------------------------------- + @classmethod def find_most_recent_version(cls, system_os=None, cpu=None, lts_only=False): - raise NotImplementedError # implement this method in your engine subclass + """ + Finds the most recent version of the rendering engine available for download. + + This method should be overridden in a subclass to implement the logic for determining + the most recent version of the rendering engine, optionally filtering by long-term + support (LTS) versions, the operating system, and CPU architecture. + + Args: + system_os (str, optional): Desired OS ('linux', 'macos', 'windows'). Defaults to system os. + cpu (str, optional): The CPU architecture for which to download the engine. Default is system cpu. + lts_only (bool, optional): Limit the search to LTS (long-term support) versions only. Default is False. + + Returns: + dict: A dict with the following keys: + - 'cpu' (str): The CPU architecture. + - 'system_os' (str): The operating system. + - 'file' (str): The filename of the version's download file. + - 'url' (str): The remote URL for downloading the version. + - 'version' (str): The version number. + + Raises: + NotImplementedError: If the method is not overridden in a subclass. + """ + raise NotImplementedError(f"find_most_recent_version not implemented for {cls.__class__.__name__}") @classmethod def version_is_available_to_download(cls, version, system_os=None, cpu=None): - raise NotImplementedError # implement this method in your engine subclass + """Checks if a requested version of the rendering engine is available for download. 
+ + This method should be overridden in a subclass to implement the logic for determining + whether a given version of the rendering engine is available for download, based on the + operating system and CPU architecture. + + Args: + version (str): The requested renderer version to download. + system_os (str, optional): Desired OS ('linux', 'macos', 'windows'). Defaults to system os. + cpu (str, optional): The CPU architecture for which to download the engine. Default is system cpu. + + Returns: + bool: True if the version is available for download, False otherwise. + + Raises: + NotImplementedError: If the method is not overridden in a subclass. + """ + raise NotImplementedError(f"version_is_available_to_download not implemented for {cls.__class__.__name__}") @classmethod def download_engine(cls, version, download_location, system_os=None, cpu=None, timeout=120): - raise NotImplementedError # implement this method in your engine subclass + """Downloads the requested version of the rendering engine to the given download location. + + This method should be overridden in a subclass to implement the logic for downloading + a specific version of the rendering engine. The method is intended to handle the + downloading process based on the version, operating system, CPU architecture, and + timeout parameters. + + Args: + version (str): The requested renderer version to download. + download_location (str): The directory where the engine should be downloaded. + system_os (str, optional): Desired OS ('linux', 'macos', 'windows'). Defaults to system os. + cpu (str, optional): The CPU architecture for which to download the engine. Default is system cpu. + timeout (int, optional): The maximum time in seconds to wait for the download. Default is 120 seconds. + + Raises: + NotImplementedError: If the method is not overridden in a subclass. 
+ """ + raise NotImplementedError(f"download_engine not implemented for {cls.__class__.__name__}") + + # -------------------------------------------- + # Optional Overrides for Subclasses: + # -------------------------------------------- + + @classmethod + def all_versions(cls, system_os=None, cpu=None): + """Retrieves a list of available versions of the software for a specific operating system and CPU architecture. + + This method fetches all available versions for the given operating system and CPU type, constructing + a list of dictionaries containing details such as the version, CPU architecture, system OS, and the + remote URL for downloading each version. + + Args: + system_os (str, optional): Desired OS ('linux', 'macos', 'windows'). Defaults to system os. + cpu (str, optional): The CPU architecture for which to download the engine. Default is system cpu. + + Returns: + list[dict]: A list of dictionaries, each containing: + - 'cpu' (str): The CPU architecture. + - 'file' (str): The filename of the version's download file. + - 'system_os' (str): The operating system. + - 'url' (str): The remote URL for downloading the version. + - 'version' (str): The version number. + """ + return [] + + # -------------------------------------------- + # Do Not Override These Methods: + # -------------------------------------------- @classmethod def download_and_extract_app(cls, remote_url, download_location, timeout=120): + """Downloads an application from the given remote URL and extracts it to the specified location. + + This method handles the downloading of the application, supports multiple archive formats, + and extracts the contents to the specified `download_location`. It also manages temporary + files and logs progress throughout the process. + + Args: + remote_url (str): The URL of the application to download. + download_location (str): The directory where the application should be extracted. 
+ timeout (int, optional): The maximum time in seconds to wait for the download. Default is 120 seconds. + + Returns: + str: The path to the directory where the application was extracted. + + Raises: + Exception: Catches and logs any exceptions that occur during the download or extraction process. + + Supported Formats: + - `.tar.xz`: Extracted using the `tarfile` module. + - `.zip`: Extracted using the `zipfile` module. + - `.dmg`: macOS disk image files, handled using the `dmglib` library. + - Other formats will result in an error being logged. + + Notes: + - If the application already exists in the `download_location`, the method will log an error + and return without downloading or extracting. + - Temporary files created during the download process are cleaned up after completion. + """ # Create a temp download directory temp_download_dir = tempfile.mkdtemp() @@ -80,6 +202,7 @@ class EngineDownloader: # Extract the downloaded file # Process .tar.xz files if temp_downloaded_file_path.lower().endswith('.tar.xz'): + import tarfile try: with tarfile.open(temp_downloaded_file_path, 'r:xz') as tar: tar.extractall(path=download_location) @@ -93,6 +216,7 @@ class EngineDownloader: # Process .zip files elif temp_downloaded_file_path.lower().endswith('.zip'): + import zipfile try: with zipfile.ZipFile(temp_downloaded_file_path, 'r') as zip_ref: zip_ref.extractall(download_location) diff --git a/src/engines/core/base_engine.py b/src/engines/core/base_engine.py index 57fbf5c..ec09133 100644 --- a/src/engines/core/base_engine.py +++ b/src/engines/core/base_engine.py @@ -8,9 +8,21 @@ SUBPROCESS_TIMEOUT = 5 class BaseRenderEngine(object): + """Base class for render engines. This class provides common functionality and structure for various rendering + engines. Create subclasses and override the methods marked below to add additional renderers + + Attributes: + install_paths (list): A list of default installation paths where the render engine + might be found. 
This list can be populated with common paths to help locate the + executable on different operating systems or environments. + """ install_paths = [] + # -------------------------------------------- + # Required Overrides for Subclasses: + # -------------------------------------------- + def __init__(self, custom_path=None): self.custom_renderer_path = custom_path if not self.renderer_path() or not os.path.exists(self.renderer_path()): @@ -20,6 +32,115 @@ class BaseRenderEngine(object): logger.warning(f"Path is not executable. Setting permissions to 755 for {self.renderer_path()}") os.chmod(self.renderer_path(), 0o755) + def version(self): + """Return the version number as a string. + + Returns: + str: Version number. + + Raises: + NotImplementedError: If not overridden. + """ + raise NotImplementedError(f"version not implemented for {self.__class__.__name__}") + + def get_project_info(self, project_path, timeout=10): + """Extracts detailed project information from the given project path. + + Args: + project_path (str): The path to the project file. + timeout (int, optional): The maximum time (in seconds) to wait for the operation. Default is 10 seconds. + + Returns: + dict: A dictionary containing project information (subclasses should define the structure). + + Raises: + NotImplementedError: If the method is not overridden in a subclass. + """ + raise NotImplementedError(f"get_project_info not implemented for {self.__class__.__name__}") + + @classmethod + def get_output_formats(cls): + """Returns a list of available output formats supported by the renderer. + + Returns: + list[str]: A list of strings representing the available output formats. 
+        """
+        raise NotImplementedError(f"get_output_formats not implemented for {cls.__name__}")
+
+    @staticmethod
+    def worker_class():  # override when subclassing to link worker class
+        raise NotImplementedError("Worker class not implemented")
+
+    # --------------------------------------------
+    # Optional Overrides for Subclasses:
+    # --------------------------------------------
+
+    def supported_extensions(self):
+        """
+        Returns:
+            list[str]: list of supported extensions
+        """
+        return []
+
+    def get_help(self):
+        """Retrieves the help documentation for the renderer.
+
+        This method runs the renderer's help command (default: '-h') and captures the output.
+        Override this method if the renderer uses a different help flag.
+
+        Returns:
+            str: The help documentation as a string.
+
+        Raises:
+            FileNotFoundError: If the renderer path is not found.
+        """
+        path = self.renderer_path()
+        if not path:
+            raise FileNotFoundError("renderer path not found")
+        creationflags = subprocess.CREATE_NO_WINDOW if platform.system() == 'Windows' else 0
+        help_doc = subprocess.check_output([path, '-h'], stderr=subprocess.STDOUT,
+                                           timeout=SUBPROCESS_TIMEOUT, creationflags=creationflags).decode('utf-8')
+        return help_doc
+
+    def system_info(self):
+        """Return additional information about the system specific to the engine (configured GPUs, render engines, etc)
+
+        Returns:
+            dict: A dictionary with engine-specific system information
+        """
+        return {}
+
+    def perform_presubmission_tasks(self, project_path):
+        """Perform any pre-submission tasks on a project file before uploading it to a server (pack textures, etc.)
+
+        Override this method to:
+        1. Copy the project file to a temporary location (DO NOT MODIFY ORIGINAL PATH).
+        2. Perform additional modifications or tasks.
+        3. Return the path to the modified project file.
+
+        Args:
+            project_path (str): The original project file path.
+
+        Returns:
+            str: The path to the modified project file.
+ """ + return project_path + + def get_arguments(self): + pass + + @staticmethod + def downloader(): # override when subclassing if using a downloader class + return None + + @staticmethod + def ui_options(system_info): # override to return options for ui + return {} + + # -------------------------------------------- + # Do Not Override These Methods: + # -------------------------------------------- + def renderer_path(self): return self.custom_renderer_path or self.default_renderer_path() @@ -39,46 +160,3 @@ class BaseRenderEngine(object): except Exception as e: logger.exception(e) return path - - def version(self): - raise NotImplementedError("version not implemented") - - def supported_extensions(self): - return [] - - @staticmethod - def downloader(): # override when subclassing if using a downloader class - return None - - @staticmethod - def worker_class(): # override when subclassing to link worker class - raise NotImplementedError("Worker class not implemented") - - @staticmethod - def ui_options(system_info): # override to return options for ui - return {} - - def get_help(self): # override if renderer uses different help flag - path = self.renderer_path() - if not path: - raise FileNotFoundError("renderer path not found") - creationflags = subprocess.CREATE_NO_WINDOW if platform.system() == 'Windows' else 0 - help_doc = subprocess.check_output([path, '-h'], stderr=subprocess.STDOUT, - timeout=SUBPROCESS_TIMEOUT, creationflags=creationflags).decode('utf-8') - return help_doc - - def get_project_info(self, project_path, timeout=10): - raise NotImplementedError(f"get_project_info not implemented for {self.__name__}") - - @classmethod - def get_output_formats(cls): - raise NotImplementedError(f"get_output_formats not implemented for {cls.__name__}") - - def get_arguments(self): - pass - - def system_info(self): - pass - - def perform_presubmission_tasks(self, project_path): - return project_path diff --git a/src/engines/core/base_worker.py 
b/src/engines/core/base_worker.py
index 9676865..b2da574 100644
--- a/src/engines/core/base_worker.py
+++ b/src/engines/core/base_worker.py
@@ -48,6 +48,10 @@ class BaseRenderWorker(Base):
 
     engine = None
 
+    # --------------------------------------------
+    # Required Overrides for Subclasses:
+    # --------------------------------------------
+
     def __init__(self, input_path, output_path, engine_path, priority=2, args=None, ignore_extensions=True,
                  parent=None, name=None):
 
@@ -57,7 +61,7 @@ class BaseRenderWorker(Base):
             logger.error(err_meg)
             raise ValueError(err_meg)
         if not self.engine:
-            raise NotImplementedError("Engine not defined")
+            raise NotImplementedError(f"Engine not defined for {self.__class__.__name__}")
 
         def generate_id():
             import uuid
@@ -103,6 +107,50 @@ class BaseRenderWorker(Base):
         self.__last_output_time = None
         self.watchdog_timeout = 120
 
+    def generate_worker_subprocess(self):
+        """Generate and return a list of the command line arguments necessary to perform the requested job
+
+        Returns:
+            list[str]: list of command line arguments
+        """
+        raise NotImplementedError("generate_worker_subprocess not implemented")
+
+    def _parse_stdout(self, line):
+        """Parses a line of standard output from the renderer.
+
+        This method should be overridden in a subclass to implement the logic for processing
+        and interpreting a single line of output from the renderer's standard output stream.
+
+        On frame completion, the subclass should:
+        1. Update value of self.current_frame
+        2. Call self._send_frame_complete_notification()
+
+        Args:
+            line (str): A line of text from the renderer's standard output.
+
+        Raises:
+            NotImplementedError: If the method is not overridden in a subclass.
+ """ + raise NotImplementedError(f"_parse_stdout not implemented for {self.__class__.__name__}") + + # -------------------------------------------- + # Optional Overrides for Subclasses: + # -------------------------------------------- + + def percent_complete(self): + # todo: fix this + if self.status == RenderStatus.COMPLETED: + return 1.0 + return 0 + + def post_processing(self): + """Override to perform any engine-specific postprocessing""" + pass + + # -------------------------------------------- + # Do Not Override These Methods: + # -------------------------------------------- + def __repr__(self): return f"" @@ -142,16 +190,13 @@ class BaseRenderWorker(Base): return generated_args def get_raw_args(self): - raw_args_string = self.args.get('raw', None) + raw_args_string = self.args.get('raw', '') raw_args = None if raw_args_string: import shlex raw_args = shlex.split(raw_args_string) return raw_args - def generate_worker_subprocess(self): - raise NotImplementedError("generate_worker_subprocess not implemented") - def log_path(self): filename = (self.name or os.path.basename(self.input_path)) + '_' + \ self.date_created.strftime("%Y.%m.%d_%H.%M.%S") + '.log' @@ -387,9 +432,6 @@ class BaseRenderWorker(Base): except Exception as e: logger.error(f"Error stopping the process: {e}") - def post_processing(self): - pass - def is_running(self): if hasattr(self, '__thread'): return self.__thread.is_alive() @@ -418,14 +460,6 @@ class BaseRenderWorker(Base): if self.is_running(): # allow the log files to close self.__thread.join(timeout=5) - def percent_complete(self): - if self.status == RenderStatus.COMPLETED: - return 1.0 - return 0 - - def _parse_stdout(self, line): - raise NotImplementedError("_parse_stdout not implemented") - def time_elapsed(self): return get_time_elapsed(self.start_time, self.end_time) diff --git a/src/engines/engine_manager.py b/src/engines/engine_manager.py index 9721181..77018f8 100644 --- a/src/engines/engine_manager.py +++ 
b/src/engines/engine_manager.py @@ -12,6 +12,9 @@ logger = logging.getLogger() class EngineManager: + """Class that manages different versions of installed renderers and handles fetching and downloading new versions, + if possible. + """ engines_path = None download_tasks = [] @@ -283,6 +286,17 @@ class EngineManager: class EngineDownloadWorker(threading.Thread): + """A thread worker for downloading a specific version of a rendering engine. + + This class handles the process of downloading a rendering engine in a separate thread, + ensuring that the download process does not block the main application. + + Attributes: + engine (str): The name of the rendering engine to download. + version (str): The version of the rendering engine to download. + system_os (str, optional): The operating system for which to download the engine. Defaults to current OS type. + cpu (str, optional): Requested CPU architecture. Defaults to system CPU type. + """ def __init__(self, engine, version, system_os=None, cpu=None): super().__init__() self.engine = engine diff --git a/src/init.py b/src/init.py index c5cc400..6abdad4 100644 --- a/src/init.py +++ b/src/init.py @@ -1,11 +1,9 @@ -''' app/init.py ''' import logging import multiprocessing import os import socket import sys import threading -import time from collections import deque from src.api.api_server import start_server @@ -22,8 +20,10 @@ logger = logging.getLogger() def run(server_only=False) -> int: - """ - Initializes the application and runs it. + """Initializes the application and runs it. + + Args: + server_only: Run in server-only CLI mode. Default is False (runs in GUI mode). Returns: int: The exit status code. 
@@ -84,9 +84,7 @@ def run(server_only=False) -> int: 'system_os_version': current_system_os_version()} ZeroconfServer.start() logger.info(f"Zordon Render Server started - Hostname: {local_hostname}") - - RenderQueue.evaluation_inverval = Config.queue_eval_seconds - RenderQueue.start() + RenderQueue.start() # Start evaluating the render queue # start in gui or server only (cli) mode logger.debug(f"Launching in {'server only' if server_only else 'GUI'} mode") @@ -137,8 +135,8 @@ def __setup_buffer_handler(): buffer_handler = BufferingHandler() buffer_handler.setFormatter(logging.getLogger().handlers[0].formatter) - logger = logging.getLogger() - logger.addHandler(buffer_handler) + new_logger = logging.getLogger() + new_logger.addHandler(buffer_handler) return buffer_handler diff --git a/src/render_queue.py b/src/render_queue.py index e7cb504..c6df222 100755 --- a/src/render_queue.py +++ b/src/render_queue.py @@ -29,19 +29,37 @@ class RenderQueue: maximum_renderer_instances = {'blender': 1, 'aerender': 1, 'ffmpeg': 4} last_saved_counts = {} is_running = False - __eval_thread = None - evaluation_inverval = 1 # -------------------------------------------- - # Start / Stop Background Updates + # Render Queue Evaluation: # -------------------------------------------- @classmethod def start(cls): + """Start evaluating the render queue""" logger.debug("Starting render queue updates") cls.is_running = True cls.evaluate_queue() + @classmethod + def evaluate_queue(cls): + try: + not_started = cls.jobs_with_status(RenderStatus.NOT_STARTED, priority_sorted=True) + for job in not_started: + if cls.is_available_for_job(job.renderer, job.priority): + cls.start_job(job) + + scheduled = cls.jobs_with_status(RenderStatus.SCHEDULED, priority_sorted=True) + for job in scheduled: + if job.scheduled_start <= datetime.now(): + logger.debug(f"Starting scheduled job: {job}") + cls.start_job(job) + + if cls.last_saved_counts != cls.job_counts(): + cls.save_state() + except 
DetachedInstanceError: + pass + @classmethod def __local_job_status_changed(cls, job_id, old_status, new_status): render_job = RenderQueue.job_with_id(job_id, none_ok=True) @@ -55,20 +73,9 @@ class RenderQueue: cls.is_running = False # -------------------------------------------- - # Queue Management + # Fetch Jobs: # -------------------------------------------- - @classmethod - def add_to_render_queue(cls, render_job, force_start=False): - logger.info(f"Adding job to render queue: {render_job}") - cls.job_queue.append(render_job) - if cls.is_running and force_start and render_job.status in (RenderStatus.NOT_STARTED, RenderStatus.SCHEDULED): - cls.start_job(render_job) - cls.session.add(render_job) - cls.save_state() - if cls.is_running: - cls.evaluate_queue() - @classmethod def all_jobs(cls): return cls.job_queue @@ -98,12 +105,15 @@ class RenderQueue: return found_job @classmethod - def clear_history(cls): - to_remove = [x for x in cls.all_jobs() if x.status in [RenderStatus.CANCELLED, - RenderStatus.COMPLETED, RenderStatus.ERROR]] - for job_to_remove in to_remove: - cls.delete_job(job_to_remove) - cls.save_state() + def job_counts(cls): + job_counts = {} + for job_status in RenderStatus: + job_counts[job_status.value] = len(cls.jobs_with_status(job_status)) + return job_counts + + # -------------------------------------------- + # Startup / Shutdown: + # -------------------------------------------- @classmethod def load_state(cls, database_directory): @@ -128,6 +138,16 @@ class RenderQueue: cls.save_state() cls.session.close() + # -------------------------------------------- + # Renderer Availability: + # -------------------------------------------- + + @classmethod + def renderer_instances(cls): + from collections import Counter + all_instances = [x.renderer for x in cls.running_jobs()] + return Counter(all_instances) + @classmethod def is_available_for_job(cls, renderer, priority=2): @@ -137,24 +157,20 @@ class RenderQueue: maxed_out_instances = renderer in 
instances.keys() and instances[renderer] >= max_allowed_instances return not maxed_out_instances and not higher_priority_jobs + # -------------------------------------------- + # Job Lifecycle Management: + # -------------------------------------------- + @classmethod - def evaluate_queue(cls): - try: - not_started = cls.jobs_with_status(RenderStatus.NOT_STARTED, priority_sorted=True) - for job in not_started: - if cls.is_available_for_job(job.renderer, job.priority): - cls.start_job(job) - - scheduled = cls.jobs_with_status(RenderStatus.SCHEDULED, priority_sorted=True) - for job in scheduled: - if job.scheduled_start <= datetime.now(): - logger.debug(f"Starting scheduled job: {job}") - cls.start_job(job) - - if cls.last_saved_counts != cls.job_counts(): - cls.save_state() - except DetachedInstanceError: - pass + def add_to_render_queue(cls, render_job, force_start=False): + logger.info(f"Adding job to render queue: {render_job}") + cls.job_queue.append(render_job) + if cls.is_running and force_start and render_job.status in (RenderStatus.NOT_STARTED, RenderStatus.SCHEDULED): + cls.start_job(render_job) + cls.session.add(render_job) + cls.save_state() + if cls.is_running: + cls.evaluate_queue() @classmethod def start_job(cls, job): @@ -177,15 +193,14 @@ class RenderQueue: cls.save_state() return True - @classmethod - def renderer_instances(cls): - from collections import Counter - all_instances = [x.renderer for x in cls.running_jobs()] - return Counter(all_instances) + # -------------------------------------------- + # Miscellaneous: + # -------------------------------------------- @classmethod - def job_counts(cls): - job_counts = {} - for job_status in RenderStatus: - job_counts[job_status.value] = len(cls.jobs_with_status(job_status)) - return job_counts + def clear_history(cls): + to_remove = [x for x in cls.all_jobs() if x.status in [RenderStatus.CANCELLED, + RenderStatus.COMPLETED, RenderStatus.ERROR]] + for job_to_remove in to_remove: + 
cls.delete_job(job_to_remove) + cls.save_state() diff --git a/src/ui/main_window.py b/src/ui/main_window.py index 7577b07..e56782a 100644 --- a/src/ui/main_window.py +++ b/src/ui/main_window.py @@ -20,14 +20,14 @@ from src.render_queue import RenderQueue from src.utilities.misc_helper import get_time_elapsed, resources_dir, is_localhost from src.utilities.status_utils import RenderStatus from src.utilities.zeroconf_server import ZeroconfServer -from .add_job import NewRenderJobForm -from .console import ConsoleWindow -from .engine_browser import EngineBrowserWindow -from .log_viewer import LogViewer -from .widgets.menubar import MenuBar -from .widgets.proportional_image_label import ProportionalImageLabel -from .widgets.statusbar import StatusBar -from .widgets.toolbar import ToolBar +from src.ui.add_job import NewRenderJobForm +from src.ui.console import ConsoleWindow +from src.ui.engine_browser import EngineBrowserWindow +from src.ui.log_viewer import LogViewer +from src.ui.widgets.menubar import MenuBar +from src.ui.widgets.proportional_image_label import ProportionalImageLabel +from src.ui.widgets.statusbar import StatusBar +from src.ui.widgets.toolbar import ToolBar from src.api.serverproxy_manager import ServerProxyManager from src.utilities.misc_helper import launch_url