From fa4a97f6fad61a556f39a9a0330cbcfae9c62cd7 Mon Sep 17 00:00:00 2001 From: Brett Date: Fri, 5 Jun 2026 22:01:20 -0500 Subject: [PATCH] Refactor: ApplicationContext DI wiring (#131) * refactor: wire all services through ApplicationContext - Created src/application_context.py as DI container with TYPE_CHECKING imports - server.py now instantiates all services in dependency order via ApplicationContext - Fixed infinite recursion bug: 48 instance methods renamed with underscore prefix to avoid shadowing by same-named @classmethod forwarders - ZeroconfServer: instantiate Zeroconf() in __init__, add _sync_class() to configure forwarder, direct _configure/_start calls during wiring - Config, EngineManager, PreviewManager: all forwarders and _sync_class() intact - RenderQueue: load_state and subscribe moved to __init__, threading.Lock retained - DistributedJobManager: subscribe_to_listener moved to __init__ * fix: greedy regex in get_render_devices swallows BlenderKit log output Changed regex from greedy [\s\S]* to non-greedy .*? with re.DOTALL so it stops at the first ] (the end of the GPU data JSON array) instead of matching through timestamped log lines like [19:36:22.109, __init__.py:2881] that contain trailing brackets. * fix: AttributeError on .enabled in update_job_count prevents options from rendering * refactor: log silent AttributeError catches, add _sync_class to remaining services, drop dead ctx slot --- server.py | 68 ++-- src/api/job_import_handler.py | 2 +- src/api/preview_manager.py | 87 +++--- src/application_context.py | 23 ++ src/distributed_job_manager.py | 213 +++++-------- src/engines/blender/blender_engine.py | 4 +- src/engines/engine_manager.py | 433 +++++++++++--------------- src/render_queue.py | 305 +++++++++++------- src/ui/add_job_window.py | 9 +- src/ui/main_window.py | 8 +- src/utilities/config.py | 49 ++- src/utilities/zeroconf_server.py | 164 ++++++---- 12 files changed, 714 insertions(+), 651 deletions(-) create mode 100644 src/application_context.py diff --git a/server.py b/server.py index 40520ac..5b4192c 100755 --- a/server.py +++ b/server.py @@ -11,6 +11,7 @@ from src.api.api_server import API_VERSION from src.api.api_server import start_api_server from src.api.preview_manager import PreviewManager from src.api.serverproxy_manager import ServerProxyManager +from src.application_context import ApplicationContext from src.distributed_job_manager import DistributedJobManager from src.engines.engine_manager import EngineManager from src.render_queue import RenderQueue @@ -26,21 +27,46 @@ logger = logging.getLogger() class ZordonServer: def __init__(self): + self.ctx = ApplicationContext() + # setup logging logging.basicConfig(format='%(asctime)s: %(levelname)s: %(module)s: %(message)s', datefmt='%d-%b-%y %H:%M:%S', level=Config.server_log_level.upper()) logging.getLogger("requests").setLevel(logging.WARNING) # suppress noisy requests/urllib3 logging logging.getLogger("urllib3").setLevel(logging.WARNING) - # Load Config YAML + # ---- Bootstrap Config ---- Config.setup_config_dir() config_path = Path(Config.config_dir()) / "config.yaml" - Config.load_config(config_path) + self.ctx.config = Config() + self.ctx.config.load(config_path) + Config._default_instance = self.ctx.config + Config._sync_class() - # configure default paths - EngineManager.engines_path = str(Path(Config.upload_folder).expanduser()/ "engines") - os.makedirs(EngineManager.engines_path, exist_ok=True) - PreviewManager.storage_path = Path(Config.upload_folder).expanduser() / "previews" + # ---- Engine Manager ---- + self.ctx.engine_manager = EngineManager() + self.ctx.engine_manager.engines_path = Path(Config.upload_folder).expanduser() / "engines" + os.makedirs(self.ctx.engine_manager.engines_path, exist_ok=True) + EngineManager._default_instance = self.ctx.engine_manager + EngineManager._sync_class() + + # ---- Preview Manager ---- + self.ctx.preview_manager = PreviewManager() + self.ctx.preview_manager.storage_path = Path(Config.upload_folder).expanduser() / "previews" + PreviewManager._default_instance = self.ctx.preview_manager + PreviewManager._sync_class() + + # ---- Render Queue ---- + self.ctx.render_queue = RenderQueue() + self.ctx.render_queue.load_state(database_directory=Path(Config.upload_folder).expanduser()) + RenderQueue._default_instance = self.ctx.render_queue + RenderQueue._sync_class() + + # ---- Distributed Job Manager ---- + self.ctx.distributed_job_manager = DistributedJobManager() + self.ctx.distributed_job_manager.subscribe_to_listener() + DistributedJobManager._default_instance = self.ctx.distributed_job_manager + DistributedJobManager._sync_class() self.api_server = None self.server_hostname: str = socket.gethostname() @@ -73,10 +99,8 @@ class ZordonServer: logger.debug(f"Upload directory: {Path(Config.upload_folder).expanduser()}") logger.debug(f"Thumbs directory: {PreviewManager.storage_path}") logger.debug(f"Engines directory: {EngineManager.engines_path}") - # Set up the RenderQueue object - RenderQueue.load_state(database_directory=Path(Config.upload_folder).expanduser()) + ServerProxyManager.subscribe_to_listener() - DistributedJobManager.subscribe_to_listener() # update hostname self.server_hostname = socket.gethostname() @@ -87,16 +111,21 @@ class ZordonServer: self.api_server.start() # start zeroconf server - ZeroconfServer.configure(f"_{APP_NAME.lower()}._tcp.local.", self.server_hostname, Config.port_number) - ZeroconfServer.properties = {'system_cpu': current_system_cpu(), - 'system_cpu_brand': current_system_cpu_brand(), - 'system_cpu_cores': multiprocessing.cpu_count(), - 'system_os': current_system_os(), - 'system_os_version': current_system_os_version(), - 'system_memory': round(psutil.virtual_memory().total / (1024**3)), # in GB - 'gpu_info': get_gpu_info(), - 'api_version': API_VERSION} - ZeroconfServer.start() + ctx = self.ctx + ctx.zeroconf_server = ZeroconfServer() + ctx.zeroconf_server._configure(f"_{APP_NAME.lower()}._tcp.local.", self.server_hostname, Config.port_number) + ctx.zeroconf_server.properties = {'system_cpu': current_system_cpu(), + 'system_cpu_brand': current_system_cpu_brand(), + 'system_cpu_cores': multiprocessing.cpu_count(), + 'system_os': current_system_os(), + 'system_os_version': current_system_os_version(), + 'system_memory': round(psutil.virtual_memory().total / (1024**3)), + 'gpu_info': get_gpu_info(), + 'api_version': API_VERSION} + ZeroconfServer._default_instance = ctx.zeroconf_server + ZeroconfServer._sync_class() + ctx.zeroconf_server._start() + logger.info(f"{APP_NAME} Render Server started - Hostname: {self.server_hostname}") RenderQueue.start() # Start evaluating the render queue @@ -112,6 +141,7 @@ class ZordonServer: logger.exception(f"Exception during prepare for shutdown: {e}") logger.info(f"{APP_NAME} Render Server has shut down") + if __name__ == '__main__': server = ZordonServer() try: diff --git a/src/api/job_import_handler.py b/src/api/job_import_handler.py index 59be69e..3fafd53 100644 --- a/src/api/job_import_handler.py +++ b/src/api/job_import_handler.py @@ -11,7 +11,7 @@ import requests from tqdm import tqdm from werkzeug.utils import secure_filename -from distributed_job_manager import DistributedJobManager +from src.distributed_job_manager import DistributedJobManager logger = logging.getLogger() diff --git a/src/api/preview_manager.py b/src/api/preview_manager.py index be41d5e..a01670d 100644 --- a/src/api/preview_manager.py +++ b/src/api/preview_manager.py @@ -3,6 +3,7 @@ import os import subprocess import threading from pathlib import Path +from typing import Dict, Optional from src.utilities.ffmpeg_helper import generate_thumbnail, save_first_frame @@ -12,20 +13,20 @@ supported_image_formats = ['.jpg', '.png', '.exr', '.tif', '.tga', '.bmp', '.web class PreviewManager: - """Manages generation, storage, and retrieval of preview images and videos for rendering jobs.""" + _default_instance: Optional['PreviewManager'] = None - storage_path = None - _running_jobs = {} + storage_path: Optional[str] = None + + def __init__(self) -> None: + self.storage_path = None + self._running_jobs: Dict = {} @classmethod - def __generate_job_preview_worker(cls, job, replace_existing=False, max_width=480): - """Generates image and video previews for a given job. + def _sync_class(cls) -> None: + if cls._default_instance is not None: + cls.storage_path = cls._default_instance.storage_path - Args: - job: The job object containing file information. - replace_existing (bool): Whether to replace existing previews. Defaults to False. - max_width (int): Maximum width for the preview images/videos. Defaults to 480. - """ + def _generate_job_preview_worker(self, job, replace_existing=False, max_width=480): # Determine best source file to use for thumbs job_file_list = job.file_list() @@ -41,8 +42,8 @@ class PreviewManager: logger.warning(f"No valid image or video files found in files from job: {job}") return - os.makedirs(cls.storage_path, exist_ok=True) - base_path = os.path.join(cls.storage_path, f"{job.id}-{preview_label}-{max_width}") + os.makedirs(self.storage_path, exist_ok=True) + base_path = os.path.join(self.storage_path, f"{job.id}-{preview_label}-{max_width}") preview_video_path = base_path + '.mp4' preview_image_path = base_path + '.jpg' @@ -73,41 +74,23 @@ class PreviewManager: except subprocess.CalledProcessError as e: logger.error(f"Error generating video preview for {job}: {e}") - @classmethod - def update_previews_for_job(cls, job, replace_existing=False, wait_until_completion=False, timeout=None): - """Updates previews for a given job by starting a background thread. - - Args: - job: The job object. - replace_existing (bool): Whether to replace existing previews. Defaults to False. - wait_until_completion (bool): Whether to wait for the thread to complete. Defaults to False. - timeout (float): Timeout for waiting, if applicable. - """ - job_thread = cls._running_jobs.get(job.id) + def _update_previews_for_job(self, job, replace_existing=False, wait_until_completion=False, timeout=None): + job_thread = self._running_jobs.get(job.id) if job_thread and job_thread.is_alive(): logger.debug(f'Preview generation job already running for {job}') - return - - job_thread = threading.Thread(target=cls.__generate_job_preview_worker, args=(job, replace_existing,)) - job_thread.start() - cls._running_jobs[job.id] = job_thread + else: + job_thread = threading.Thread(target=self._generate_job_preview_worker, args=(job, replace_existing,)) + job_thread.start() + self._running_jobs[job.id] = job_thread if wait_until_completion: job_thread.join(timeout=timeout) - @classmethod - def get_previews_for_job(cls, job): - """Retrieves previews for a given job. + def _get_previews_for_job(self, job): - Args: - job: The job object. - - Returns: - dict: A dictionary containing preview information. - """ results = {} try: - directory_path = Path(cls.storage_path) + directory_path = Path(self.storage_path) preview_files_for_job = [f for f in directory_path.iterdir() if f.is_file() and f.name.startswith(job.id)] for preview_filename in preview_files_for_job: @@ -125,14 +108,8 @@ class PreviewManager: pass return results - @classmethod - def delete_previews_for_job(cls, job): - """Deletes all previews associated with a given job. - - Args: - job: The job object. - """ - all_previews = cls.get_previews_for_job(job) + def _delete_previews_for_job(self, job): + all_previews = self.get_previews_for_job(job) flattened_list = [item for sublist in all_previews.values() for item in sublist] for preview in flattened_list: try: @@ -140,3 +117,21 @@ class PreviewManager: os.remove(preview['filename']) except OSError as e: logger.error(f"Error removing preview '{preview.get('filename')}': {e}") + + # --- Forwarders for backward compatibility --- + + @classmethod + def update_previews_for_job(cls, job, replace_existing=False, wait_until_completion=False, timeout=None): + if cls._default_instance is not None: + cls._default_instance._update_previews_for_job(job, replace_existing, wait_until_completion, timeout) + + @classmethod + def get_previews_for_job(cls, job): + if cls._default_instance is not None: + return cls._default_instance._get_previews_for_job(job) + return {} + + @classmethod + def delete_previews_for_job(cls, job): + if cls._default_instance is not None: + cls._default_instance._delete_previews_for_job(job) diff --git a/src/application_context.py b/src/application_context.py new file mode 100644 index 0000000..64b7cfa --- /dev/null +++ b/src/application_context.py @@ -0,0 +1,23 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING, Optional + +if TYPE_CHECKING: + from src.api.preview_manager import PreviewManager + from src.distributed_job_manager import DistributedJobManager + from src.engines.engine_manager import EngineManager + from src.render_queue import RenderQueue + from src.utilities.config import Config + from src.utilities.zeroconf_server import ZeroconfServer + + +class ApplicationContext: + """Holds all service instances. Single source of truth for wiring.""" + + def __init__(self) -> None: + self.config: Optional[Config] = None + self.engine_manager: Optional[EngineManager] = None + self.preview_manager: Optional[PreviewManager] = None + self.zeroconf_server: Optional[ZeroconfServer] = None + self.render_queue: Optional[RenderQueue] = None + self.distributed_job_manager: Optional[DistributedJobManager] = None diff --git a/src/distributed_job_manager.py b/src/distributed_job_manager.py index ccd5047..e817261 100644 --- a/src/distributed_job_manager.py +++ b/src/distributed_job_manager.py @@ -3,8 +3,9 @@ import os import socket import threading import time +from typing import Optional +from pathlib import Path -from click import Path from plyer import notification from pubsub import pub @@ -21,47 +22,32 @@ logger = logging.getLogger() class DistributedJobManager: - - def __init__(self): - pass + _default_instance: Optional['DistributedJobManager'] = None @classmethod - def subscribe_to_listener(cls): - """ - Subscribes the private class method '__local_job_status_changed' to the 'status_change' pubsub message. - This should be called once, typically during the initialization phase. - """ - pub.subscribe(cls.__local_job_status_changed, 'status_change') - pub.subscribe(cls.__local_job_frame_complete, 'frame_complete') + def _sync_class(cls) -> None: + if cls._default_instance is not None: + pass # no class-level attributes to sync - @classmethod - def __local_job_frame_complete(cls, job_id, frame_number, update_interval=5): + def __init__(self) -> None: + self.background_worker: Optional[threading.Thread] = None - """ - Responds to the 'frame_complete' pubsub message for local jobs. - - Args: - job_id (str): The ID of the job that has changed status. - old_status (str): The previous status of the job. - new_status (str): The new (current) status of the job. - - Note: Do not call directly. Instead, call via the 'frame_complete' pubsub message. - """ + def _subscribe_to_listener(self) -> None: + pub.subscribe(self._local_job_status_changed, 'status_change') + pub.subscribe(self._local_job_frame_complete, 'frame_complete') + def _local_job_frame_complete(self, job_id, frame_number, update_interval=5) -> None: render_job = RenderQueue.job_with_id(job_id, none_ok=True) - if not render_job: # ignore jobs not in the queue + if not render_job: return logger.debug(f"Job {job_id} has completed frame #{frame_number}") replace_existing_previews = (frame_number % update_interval) == 0 - cls.__job_update_shared(render_job, replace_existing_previews) + self._job_update_shared(render_job, replace_existing_previews) - @classmethod - def __job_update_shared(cls, render_job, replace_existing_previews=False): - # update previews + def _job_update_shared(self, render_job, replace_existing_previews=False) -> None: PreviewManager.update_previews_for_job(job=render_job, replace_existing=replace_existing_previews) - # notify parent to allow individual frames to be copied instead of waiting until the end if render_job.parent: parent_id, parent_hostname = render_job.parent.split('@')[0], render_job.parent.split('@')[-1] try: @@ -70,57 +56,41 @@ class DistributedJobManager: except Exception as e: logger.error(f"Error notifying parent {parent_hostname} about update in subjob {render_job.id}: {e}") - @classmethod - def __local_job_status_changed(cls, job_id: str, old_status: str, new_status: str): - """ - Responds to the 'status_change' pubsub message for local jobs. - If it's a child job, it notifies the parent job about the status change. - - Args: - job_id (str): The ID of the job that has changed status. - old_status (str): The previous status of the job. - new_status (str): The new (current) status of the job. - - Note: Do not call directly. Instead, call via the 'status_change' pubsub message. - """ - + def _local_job_status_changed(self, job_id: str, old_status: str, new_status: str) -> None: render_job = RenderQueue.job_with_id(job_id, none_ok=True) - if not render_job: # ignore jobs created but not yet added to queue + if not render_job: return logger.debug(f"Job {job_id} status change: {old_status} -> {new_status}") + self._job_update_shared(render_job, replace_existing_previews=(render_job.status == RenderStatus.COMPLETED)) - cls.__job_update_shared(render_job, replace_existing_previews=(render_job.status == RenderStatus.COMPLETED)) - - # Handle children if render_job.children: - if new_status in [RenderStatus.CANCELLED, RenderStatus.ERROR]: # Cancel children if necessary + if new_status in (RenderStatus.CANCELLED, RenderStatus.ERROR): for child in render_job.children: child_id, child_hostname = child.split('@') RenderServerProxy(child_hostname).cancel_job(child_id, confirm=True) - # UI Notifications try: if new_status == RenderStatus.COMPLETED: logger.debug("Show render complete notification") notification.notify( title='Render Job Complete', message=f'{render_job.name} completed succesfully', - timeout=10 # Display time in seconds + timeout=10 ) elif new_status == RenderStatus.ERROR: logger.debug("Show render error notification") notification.notify( title='Render Job Failed', message=f'{render_job.name} failed rendering', - timeout=10 # Display time in seconds + timeout=10 ) elif new_status == RenderStatus.RUNNING: logger.debug("Show render started notification") notification.notify( title='Render Job Started', message=f'{render_job.name} started rendering', - timeout=10 # Display time in seconds + timeout=10 ) except Exception as e: logger.debug(f"Unable to show UI notification: {e}") @@ -129,30 +99,15 @@ class DistributedJobManager: # Create Job # -------------------------------------------- - @classmethod - def create_render_job(cls, new_job_attributes: dict, loaded_project_local_path: Path): - """Creates render jobs. Pass in dict of job_data and the local path to the project. It creates and returns a new - render job. - - Args: - new_job_attributes (dict): Dict of desired attributes for new job (frame count, renderer, output path, etc) - loaded_project_local_path (Path): The local path to the loaded project. - - Returns: - worker: Created job worker - """ - - # get new output path in output_dir + def _create_render_job(self, new_job_attributes: dict, loaded_project_local_path: Path): output_path = new_job_attributes.get('output_path') output_filename = loaded_project_local_path.name if output_path else loaded_project_local_path.stem - # Prepare output path output_dir = loaded_project_local_path.parent.parent / "output" output_path = output_dir / output_filename os.makedirs(output_dir, exist_ok=True) logger.debug(f"New job output path: {output_path}") - # create & configure jobs worker = EngineManager.create_worker(engine_name=new_job_attributes['engine_name'], input_path=loaded_project_local_path, output_path=output_path, @@ -160,16 +115,15 @@ class DistributedJobManager: args=new_job_attributes.get('args', {}), parent=new_job_attributes.get('parent'), name=new_job_attributes.get('name')) - worker.status = new_job_attributes.get("initial_status", worker.status) # todo: is this necessary? + worker.status = new_job_attributes.get("initial_status", worker.status) worker.priority = int(new_job_attributes.get('priority', worker.priority)) worker.start_frame = int(new_job_attributes.get("start_frame", worker.start_frame)) worker.end_frame = int(new_job_attributes.get("end_frame", worker.end_frame)) worker.watchdog_timeout = Config.worker_process_timeout worker.hostname = socket.gethostname() - # determine if we can / should split the job if new_job_attributes.get("enable_split_jobs", False) and (worker.total_frames > 1) and not worker.parent: - cls.split_into_subjobs_async(worker, new_job_attributes, loaded_project_local_path) + self.split_into_subjobs_async(worker, new_job_attributes, loaded_project_local_path) else: worker.status = RenderStatus.NOT_STARTED @@ -182,15 +136,7 @@ class DistributedJobManager: # Handling Subjobs # -------------------------------------------- - @classmethod - def handle_subjob_update_notification(cls, local_job, subjob_data: dict): - """Responds to a notification from a remote subjob and the host requests any subsequent updates from the subjob. - - Args: - local_job (BaseRenderWorker): The local parent job worker. - subjob_data (dict): Subjob data sent from the remote server. - """ - + def _handle_subjob_update_notification(self, local_job, subjob_data: dict) -> None: subjob_status = string_to_status(subjob_data['status']) subjob_id = subjob_data['id'] subjob_hostname = subjob_data['hostname'] @@ -206,19 +152,10 @@ class DistributedJobManager: if subjob_data['status'] == 'completed' and download_success: local_job.children[subjob_key]['download_status'] = 'completed' - @classmethod - def wait_for_subjobs(cls, parent_job): - """Check the status of subjobs and waits until they are all finished. Download rendered frames from subjobs - when they are completed. - - Args: - parent_job: Worker object that has child jobs - - Returns: - """ + def _wait_for_subjobs(self, parent_job) -> None: logger.debug(f"Waiting for subjobs for job {parent_job}") parent_job.status = RenderStatus.WAITING_FOR_SUBJOBS - statuses_to_download = [RenderStatus.CANCELLED, RenderStatus.ERROR, RenderStatus.COMPLETED] + statuses_to_download = (RenderStatus.CANCELLED, RenderStatus.ERROR, RenderStatus.COMPLETED) def subjobs_not_downloaded(): return {k: v for k, v in parent_job.children.items() if 'download_status' not in v or @@ -230,21 +167,17 @@ class DistributedJobManager: sleep_counter = 0 while parent_job.status == RenderStatus.WAITING_FOR_SUBJOBS: - if sleep_counter % server_delay == 0: # only ping servers every x seconds - for child_key, subjob_cached_data in subjobs_not_downloaded().items(): - + if sleep_counter % server_delay == 0: + for child_key in subjobs_not_downloaded(): subjob_id = child_key.split('@')[0] subjob_hostname = child_key.split('@')[-1] - # Fetch info from server and handle failing case subjob_data = RenderServerProxy(subjob_hostname).get_job_info(subjob_id) if not subjob_data: logger.warning(f"No response from {subjob_hostname}") - # timeout / missing server situations parent_job.children[child_key]['download_status'] = f'error: No response from {subjob_hostname}' continue - # Update parent job cache but keep the download status download_status = parent_job.children[child_key].get('download_status', None) parent_job.children[child_key] = subjob_data parent_job.children[child_key]['download_status'] = download_status @@ -254,8 +187,7 @@ class DistributedJobManager: f"{float(subjob_data.get('percent_complete')) * 100.0}%" logger.debug(status_msg) - # Check if job is finished, but has not had files copied yet over yet - if download_status is None and subjob_data['file_count'] and status in statuses_to_download: + if download_status is None and subjob_data.get('file_count') and status in statuses_to_download: try: download_missing_frames_from_subjob(parent_job, subjob_id, subjob_hostname) parent_job.children[child_key]['download_status'] = 'complete' @@ -263,7 +195,6 @@ class DistributedJobManager: logger.error(f"Error downloading missing frames from subjob: {e}") parent_job.children[child_key]['download_status'] = 'error: {}' - # Any finished jobs not successfully downloaded at this point are skipped if parent_job.children[child_key].get('download_status', None) is None and \ status in statuses_to_download: logger.warning(f"Skipping waiting on downloading from subjob: {child_key}") @@ -274,42 +205,22 @@ class DistributedJobManager: f"{', '.join(list(subjobs_not_downloaded().keys()))}") time.sleep(1) sleep_counter += 1 - else: # exit the loop + else: parent_job.status = RenderStatus.RUNNING # -------------------------------------------- # Creating Subjobs # -------------------------------------------- - @classmethod - def split_into_subjobs_async(cls, parent_worker, new_job_attributes, project_path, system_os=None): - # todo: I don't love this + def _split_into_subjobs_async(self, parent_worker, new_job_attributes, project_path, system_os=None) -> None: parent_worker.status = RenderStatus.CONFIGURING - cls.background_worker = threading.Thread(target=cls.split_into_subjobs, args=(parent_worker, new_job_attributes, - project_path, system_os)) - cls.background_worker.start() + self.background_worker = threading.Thread(target=self.split_into_subjobs, args=( + parent_worker, new_job_attributes, project_path, system_os)) + self.background_worker.start() - @classmethod - def split_into_subjobs(cls, parent_worker, new_job_attributes, project_path, system_os=None, specific_servers=None): - """ - Splits a job into subjobs and distributes them among available servers. - - This method checks the availability of servers, distributes the work among them, and creates subjobs on each - server. If a server is the local host, it adjusts the frame range of the parent job instead of creating a - subjob. - - Args: - parent_worker (Worker): The parent job what we're creating the subjobs for. - new_job_attributes (dict): Dict of desired attributes for new job (frame count, engine, output path, etc) - project_path (str): The path to the project. - system_os (str, optional): Required OS. Default is any. - specific_servers (list, optional): List of specific servers to split work between. Defaults to all found. - """ - - # Check availability - available_servers = specific_servers if specific_servers else cls.find_available_servers(parent_worker.engine_name, - system_os) - # skip if theres no external servers found + def split_into_subjobs(self, parent_worker, new_job_attributes, project_path, system_os=None, specific_servers=None) -> None: + available_servers = specific_servers if specific_servers else self.find_available_servers( + parent_worker.engine_name, system_os) external_servers = [x for x in available_servers if x['hostname'] != parent_worker.hostname] if not external_servers: parent_worker.status = RenderStatus.NOT_STARTED @@ -318,34 +229,29 @@ class DistributedJobManager: logger.debug(f"Splitting into subjobs - Available servers: {[x['hostname'] for x in available_servers]}") all_subjob_server_data = distribute_server_work(parent_worker.start_frame, parent_worker.end_frame, available_servers) - # Prep and submit these sub-jobs logger.info(f"Job {parent_worker.id} split plan: {all_subjob_server_data}") try: for subjob_data in all_subjob_server_data: subjob_hostname = subjob_data['hostname'] - post_results = cls.__create_subjob(new_job_attributes, project_path, subjob_data, subjob_hostname, + post_results = self._create_subjob(new_job_attributes, project_path, subjob_data, subjob_hostname, parent_worker) if not post_results.ok: ValueError(f"Failed to create subjob on {subjob_hostname}") - # save child info submission_results = post_results.json()[0] child_key = f"{submission_results['id']}@{subjob_hostname}" parent_worker.children[child_key] = submission_results - # start subjobs logger.debug(f"Created {len(all_subjob_server_data)} subjobs successfully") parent_worker.name = f"{parent_worker.name} (Parent)" - parent_worker.status = RenderStatus.NOT_STARTED # todo: this won't work with scheduled starts + parent_worker.status = RenderStatus.NOT_STARTED except Exception as e: - # cancel all the subjobs logger.error(f"Failed to split job into subjobs: {e}") logger.debug(f"Cancelling {len(all_subjob_server_data) - 1} attempted subjobs") RenderServerProxy(parent_worker.hostname).cancel_job(parent_worker.id, confirm=True) @staticmethod - def __create_subjob(new_job_attributes: dict, project_path, server_data, server_hostname: str, parent_worker): - """Convenience method to create subjobs for a parent worker""" + def _create_subjob(new_job_attributes: dict, project_path, server_data, server_hostname, parent_worker): subjob = new_job_attributes.copy() subjob['name'] = f"{parent_worker.name}[{server_data['frame_range'][0]}-{server_data['frame_range'][-1]}]" subjob['parent'] = f"{parent_worker.id}@{parent_worker.hostname}" @@ -364,13 +270,6 @@ class DistributedJobManager: @staticmethod def find_available_servers(engine_name: str, system_os=None): - """ - Scan the Zeroconf network for currently available render servers supporting a specific engine. - - :param engine_name: str, The engine type to search for - :param system_os: str, Restrict results to servers running a specific OS - :return: A list of dictionaries with each dict containing hostname and cpu_count of available servers - """ from api.api_server import API_VERSION found_available_servers = [] for hostname in ZeroconfServer.found_hostnames(): @@ -383,6 +282,34 @@ class DistributedJobManager: return found_available_servers + # --- Forwarders for backward compatibility --- + + @classmethod + def subscribe_to_listener(cls): + if cls._default_instance is not None: + cls._default_instance._subscribe_to_listener() + + @classmethod + def create_render_job(cls, new_job_attributes, loaded_project_local_path): + if cls._default_instance is not None: + return cls._default_instance._create_render_job(new_job_attributes, loaded_project_local_path) + raise RuntimeError("DistributedJobManager is not initialized") + + @classmethod + def handle_subjob_update_notification(cls, local_job, subjob_data): + if cls._default_instance is not None: + cls._default_instance._handle_subjob_update_notification(local_job, subjob_data) + + @classmethod + def wait_for_subjobs(cls, parent_job): + if cls._default_instance is not None: + cls._default_instance._wait_for_subjobs(parent_job) + + @classmethod + def split_into_subjobs_async(cls, parent_worker, new_job_attributes, project_path, system_os=None): + if cls._default_instance is not None: + cls._default_instance._split_into_subjobs_async(parent_worker, new_job_attributes, project_path, system_os) + if __name__ == '__main__': logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') diff --git a/src/engines/blender/blender_engine.py b/src/engines/blender/blender_engine.py index 4cb0277..15d268e 100644 --- a/src/engines/blender/blender_engine.py +++ b/src/engines/blender/blender_engine.py @@ -173,7 +173,7 @@ class Blender(BaseRenderEngine): script_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'scripts', 'get_system_info.py') results = self.run_python_script(script_path=script_path) output = results.stdout.decode() - match = re.search(r"GPU DATA:(\[[\s\S]*\])", output) + match = re.search(r"GPU DATA:(\[.*?\])", output, re.DOTALL) if match: gpu_data_json = match.group(1) gpus_info = json.loads(gpu_data_json) @@ -193,5 +193,5 @@ class Blender(BaseRenderEngine): if __name__ == "__main__": - x = Blender().get_render_devices() + x = Blender().system_info() print(x) diff --git a/src/engines/engine_manager.py b/src/engines/engine_manager.py index b4ba308..cd1e6ba 100644 --- a/src/engines/engine_manager.py +++ b/src/engines/engine_manager.py @@ -20,128 +20,78 @@ class EngineManager: if possible. """ + _default_instance: Optional['EngineManager'] = None + engines_path: Optional[str] = None download_tasks: List[Any] = [] + def __init__(self) -> None: + self.engines_path: Optional[str] = None + self.download_tasks: List[Any] = [] + + @classmethod + def _sync_class(cls) -> None: + if cls._default_instance is not None: + cls.engines_path = cls._default_instance.engines_path + cls.download_tasks = cls._default_instance.download_tasks + @staticmethod def supported_engines() -> list[type[BaseRenderEngine]]: - """Return list of supported engine classes. - - Returns: - List[Type[BaseRenderEngine]]: List of available engine classes. - """ return ENGINE_CLASSES # --- Installed Engines --- - @classmethod - def engine_class_for_project_path(cls, path: str) -> Type[BaseRenderEngine]: - """Find engine class that can handle the given project file. - - Args: - path: Path to project file. - - Returns: - Type[BaseRenderEngine]: Engine class that can handle the file. - """ + def _engine_class_for_project_path(self, path: str) -> Type[BaseRenderEngine]: _, extension = os.path.splitext(path) extension = extension.lower().strip('.') - for engine_class in cls.supported_engines(): - engine = cls.get_latest_engine_instance(engine_class) + for engine_class in self.supported_engines(): + engine = self.get_latest_engine_instance(engine_class) if extension in engine.supported_extensions(): return engine_class - undefined_renderer_support = [x for x in cls.supported_engines() if not cls.get_latest_engine_instance(x).supported_extensions()] + undefined_renderer_support = [x for x in self.supported_engines() if not self.get_latest_engine_instance(x).supported_extensions()] return undefined_renderer_support[0] - @classmethod - def engine_class_with_name(cls, engine_name: str) -> Optional[Type[BaseRenderEngine]]: - """Find engine class by name. - - Args: - engine_name: Name of engine to find. - - Returns: - Optional[Type[BaseRenderEngine]]: Engine class if found, None otherwise. - """ - for obj in cls.supported_engines(): + def _engine_class_with_name(self, engine_name: str) -> Optional[Type[BaseRenderEngine]]: + for obj in self.supported_engines(): if obj.name().lower() == engine_name.lower(): return obj return None - @classmethod - def get_latest_engine_instance(cls, engine_class: Type[BaseRenderEngine]) -> BaseRenderEngine: - """Create instance of latest installed engine version. - - Args: - engine_class: Engine class to instantiate. - - Returns: - BaseRenderEngine: Instance of engine with latest version. - """ - newest = cls.newest_installed_engine_data(engine_class.name()) + def _get_latest_engine_instance(self, engine_class: Type[BaseRenderEngine]) -> BaseRenderEngine: + newest = self.newest_installed_engine_data(engine_class.name()) engine = engine_class(newest["path"]) return engine - @classmethod - def get_installed_engine_data(cls, filter_name: Optional[str] = None, include_corrupt: bool = False, + def _get_installed_engine_data(self, filter_name: Optional[str] = None, include_corrupt: bool = False, ignore_system: bool = False) -> List[Dict[str, Any]]: - """Get data about installed render engines. - - Args: - filter_name: Optional engine name to filter by. - include_corrupt: Whether to include potentially corrupted installations. - ignore_system: Whether to ignore system-installed engines. - - Returns: - List[Dict[str, Any]]: List of installed engine data. - - Raises: - FileNotFoundError: If engines path is not set. - """ - if not cls.engines_path: + if not self.engines_path: raise FileNotFoundError("Engine path is not set") - # Parse downloaded engine directory results = [] try: - all_items = os.listdir(cls.engines_path) - all_directories = [item for item in all_items if os.path.isdir(os.path.join(cls.engines_path, item))] - keys = ["engine", "version", "system_os", "cpu"] # Define keys for result dictionary + all_items = os.listdir(self.engines_path) + all_directories = [item for item in all_items if os.path.isdir(os.path.join(self.engines_path, item))] + keys = ["engine", "version", "system_os", "cpu"] for directory in all_directories: - # Split directory name into segments segments = directory.split('-') - # Create a dictionary mapping keys to corresponding segments result_dict = {keys[i]: segments[i] for i in range(min(len(keys), len(segments)))} result_dict['type'] = 'managed' - # Initialize binary_name with engine name binary_name = result_dict['engine'].lower() - # Determine the correct binary name based on the engine and system_os - eng = cls.engine_class_with_name(result_dict['engine']) + eng = self.engine_class_with_name(result_dict['engine']) binary_name = eng.binary_names.get(result_dict['system_os'], binary_name) - # Find the path to the binary file - search_root = Path(cls.engines_path) / directory + search_root = self.engines_path / directory match = next((p for p in search_root.rglob(binary_name) if p.is_file()), None) path = str(match) if match else None result_dict['path'] = path - # fetch version number from binary - helps detect corrupted downloads - disabled due to perf issues - # binary_version = eng(path).version() - # if not binary_version: - # logger.warning(f"Possible corrupt {eng.name()} {result_dict['version']} install detected: {path}") - # if not include_corrupt: - # continue - # result_dict['version'] = binary_version or 'error' - - # Add the result dictionary to results if it matches the filter_name or if no filter is applied if not filter_name or filter_name == result_dict['engine']: results.append(result_dict) except FileNotFoundError as e: logger.warning(f"Cannot find local engines download directory: {e}") - # add system installs to this list - use bg thread because it can be slow def fetch_engine_details(eng, include_corrupt=False): version = eng().version() if not version and not include_corrupt: @@ -160,7 +110,7 @@ class EngineManager: with concurrent.futures.ThreadPoolExecutor() as executor: futures = { executor.submit(fetch_engine_details, eng, include_corrupt): eng.name() - for eng in cls.supported_engines() + for eng in self.supported_engines() if eng.default_engine_path() and (not filter_name or filter_name == eng.name()) } @@ -173,96 +123,55 @@ class EngineManager: # --- Check for Updates --- - @classmethod - def update_all_engines(cls) -> None: - """Check for and download updates for all downloadable engines.""" - for engine in cls.downloadable_engines(): - update_available = cls.is_engine_update_available(engine) + def _update_all_engines(self) -> None: + for engine in self.downloadable_engines(): + update_available = self.is_engine_update_available(engine) if update_available: update_available['name'] = engine.name() - cls.download_engine(engine.name(), update_available['version'], background=True) + self.download_engine(engine.name(), update_available['version'], background=True) - @classmethod - def all_version_data_for_engine(cls, engine_name:str, include_corrupt=False, ignore_system=False) -> list: - """Get all version data for a specific engine. - - Args: - engine_name: Name of engine to query. - include_corrupt: Whether to include corrupt installations. - ignore_system: Whether to ignore system installations. - - Returns: - list: Sorted list of engine version data (newest first). - """ - versions = cls.get_installed_engine_data(filter_name=engine_name, include_corrupt=include_corrupt, ignore_system=ignore_system) + def _all_version_data_for_engine(self, engine_name: str, include_corrupt=False, ignore_system=False) -> list: + versions = self.get_installed_engine_data(filter_name=engine_name, include_corrupt=include_corrupt, ignore_system=ignore_system) sorted_versions = sorted(versions, key=lambda x: x['version'], reverse=True) return sorted_versions - @classmethod - def newest_installed_engine_data(cls, engine_name:str, system_os=None, cpu=None, ignore_system=None) -> list: - """Get newest installed engine data for specific platform. - - Args: - engine_name: Name of engine to query. - system_os: Operating system to filter by (defaults to current). - cpu: CPU architecture to filter by (defaults to current). - ignore_system: Whether to ignore system installations. - - Returns: - list: Newest engine data or empty list if not found. - """ + def _newest_installed_engine_data(self, engine_name: str, system_os=None, cpu=None, ignore_system=None) -> list: system_os = system_os or current_system_os() cpu = cpu or current_system_cpu() try: - filtered = [x for x in cls.all_version_data_for_engine(engine_name, ignore_system=ignore_system) + filtered = [x for x in self.all_version_data_for_engine(engine_name, ignore_system=ignore_system) if x['system_os'] == system_os and x['cpu'] == cpu] return filtered[0] except IndexError: logger.error(f"Cannot find newest engine version for {engine_name}-{system_os}-{cpu}") return [] - @classmethod - def is_version_installed(cls, engine_name:str, version:str, system_os=None, cpu=None, ignore_system=False): - """Check if specific engine version is installed. - - Args: - engine_name: Name of engine to check. - version: Version string to check. - system_os: Operating system to check (defaults to current). - cpu: CPU architecture to check (defaults to current). - ignore_system: Whether to ignore system installations. - - Returns: - Engine data if found, False otherwise. - """ + def _is_version_installed(self, engine_name: str, version: str, system_os=None, cpu=None, ignore_system=False): system_os = system_os or current_system_os() cpu = cpu or current_system_cpu() - filtered = [x for x in cls.get_installed_engine_data(filter_name=engine_name, ignore_system=ignore_system) if + filtered = [x for x in self.get_installed_engine_data(filter_name=engine_name, ignore_system=ignore_system) if x['system_os'] == system_os and x['cpu'] == cpu and x['version'] == version] return filtered[0] if filtered else False - @classmethod - def version_is_available_to_download(cls, engine_name:str, version, system_os=None, cpu=None): + def _version_is_available_to_download(self, engine_name: str, version, system_os=None, cpu=None): try: - downloader = cls.engine_class_with_name(engine_name).downloader() + downloader = self.engine_class_with_name(engine_name).downloader() return downloader.version_is_available_to_download(version=version, system_os=system_os, cpu=cpu) except Exception as e: logger.debug(f"Exception in version_is_available_to_download: {e}") return None - @classmethod - def find_most_recent_version(cls, engine_name:str, system_os=None, cpu=None, lts_only=False) -> dict: + def _find_most_recent_version(self, engine_name: str, system_os=None, cpu=None, lts_only=False) -> dict: try: - downloader = cls.engine_class_with_name(engine_name).downloader() + downloader = self.engine_class_with_name(engine_name).downloader() return downloader.find_most_recent_version(system_os=system_os, cpu=cpu) except Exception as e: logger.debug(f"Exception in find_most_recent_version: {e}") return {} - @classmethod - def is_engine_update_available(cls, engine_class: Type[BaseRenderEngine], ignore_system_installs=False): + def _is_engine_update_available(self, engine_class: Type[BaseRenderEngine], ignore_system_installs=False): logger.debug(f"Checking for updates to {engine_class.name()}") latest_version = engine_class.downloader().find_most_recent_version() @@ -271,7 +180,7 @@ class EngineManager: return None version_num = latest_version.get('version') - if cls.is_version_installed(engine_class.name(), version_num, ignore_system=ignore_system_installs): + if self.is_version_installed(engine_class.name(), version_num, ignore_system=ignore_system_installs): logger.debug(f"Latest version of {engine_class.name()} ({version_num}) already downloaded") return None @@ -279,18 +188,11 @@ class EngineManager: # --- Downloads --- - @classmethod - def downloadable_engines(cls): - """Get list of engines that support downloading. + def _downloadable_engines(self): + return [engine for engine in self.supported_engines() if hasattr(engine, "downloader") and engine.downloader()] - Returns: - List[Type[BaseRenderEngine]]: Engines with downloader capability. - """ - return [engine for engine in cls.supported_engines() if hasattr(engine, "downloader") and engine.downloader()] - - @classmethod - def get_existing_download_task(cls, engine_name, version, system_os=None, cpu=None): - for task in cls.download_tasks: + def _get_existing_download_task(self, engine_name, version, system_os=None, cpu=None): + for task in self.download_tasks: task_parts = task.name.split('-') task_engine, task_version, task_system_os, task_cpu = task_parts[:4] @@ -299,50 +201,45 @@ class EngineManager: return task return None - @classmethod - def download_engine(cls, engine_name, version, system_os=None, cpu=None, background=False, ignore_system=False): - - engine_to_download = cls.engine_class_with_name(engine_name) - existing_task = cls.get_existing_download_task(engine_name, version, system_os, cpu) + def _download_engine(self, engine_name, version, system_os=None, cpu=None, background=False, ignore_system=False): + engine_to_download = self.engine_class_with_name(engine_name) + existing_task = self.get_existing_download_task(engine_name, version, system_os, cpu) if existing_task: logger.debug(f"Already downloading {engine_name} {version}") if not background: - existing_task.join() # If download task exists, wait until it's done downloading + existing_task.join() return None elif not engine_to_download.downloader(): logger.warning("No valid downloader for this engine. Please update this software manually.") return None - elif not cls.engines_path: + elif not self.engines_path: raise FileNotFoundError("Engines path must be set before requesting downloads") thread = EngineDownloadWorker(engine_name, version, system_os, cpu) - cls.download_tasks.append(thread) + self.download_tasks.append(thread) thread.start() if background: return thread thread.join() - found_engine = cls.is_version_installed(engine_name, version, system_os, cpu, ignore_system) # Check that engine downloaded + found_engine = self.is_version_installed(engine_name, version, system_os, cpu, ignore_system) if not found_engine: logger.error(f"Error downloading {engine_name}") return found_engine - @classmethod - def delete_engine_download(cls, engine_name, version, system_os=None, cpu=None): + def _delete_engine_download(self, engine_name, version, system_os=None, cpu=None): logger.info(f"Requested deletion of engine: {engine_name}-{version}") - found = cls.is_version_installed(engine_name, version, system_os, cpu) - if found and found['type'] == 'managed': # don't delete system installs - # find the root directory of the engine executable + found = self.is_version_installed(engine_name, version, system_os, cpu) + if found and found['type'] == 'managed': root_dir_name = '-'.join([engine_name, version, found['system_os'], found['cpu']]) remove_path = os.path.join(found['path'].split(root_dir_name)[0], root_dir_name) - # delete the file path logger.info(f"Deleting engine at path: {remove_path}") shutil.rmtree(remove_path, ignore_errors=False) logger.info(f"Engine {engine_name}-{version}-{found['system_os']}-{found['cpu']} successfully deleted") return True - elif found: # these are managed by the system / user. Don't delete these. + elif found: logger.error(f'Cannot delete requested {engine_name} {version}. Managed externally.') else: logger.error(f"Cannot find engine: {engine_name}-{version}") @@ -350,52 +247,16 @@ class EngineManager: # --- Background Tasks --- - @classmethod - def active_downloads(cls) -> list: - """Get list of currently active download tasks. + def _active_downloads(self) -> list: + return [x for x in self.download_tasks if x.is_alive()] - Returns: - list: List of active EngineDownloadWorker threads. - """ - return [x for x in cls.download_tasks if x.is_alive()] + def _create_worker(self, engine_name: str, input_path: Path, output_path: Path, engine_version=None, args=None, parent=None, name=None): + worker_class = self.engine_class_with_name(engine_name).worker_class() - @classmethod - def create_worker(cls, engine_name: str, input_path: Path, output_path: Path, engine_version=None, args=None, parent=None, name=None): - """ - Create and return a worker instance for a specific engine. - - This resolves the appropriate engine binary/path for the requested engine and version, - downloading the engine if necessary (when a specific version is requested and not found - locally). The returned worker is constructed with string paths for compatibility with - worker implementations that expect `str` rather than `Path`. - - Args: - engine_name: The engine name used to resolve an engine class and its worker. - input_path: Path to the input file/folder for the worker to process. - output_path: Path where the worker should write output. - engine_version: Optional engine version to use. If `None` or `'latest'`, the newest - installed version is used. If a specific version is provided and not installed, - the engine will be downloaded. - args: Optional arguments passed through to the worker (engine-specific). - parent: Optional Qt/GUI parent object passed through to the worker constructor. - name: Optional name/label passed through to the worker constructor. - - Returns: - An instance of the engine-specific worker class. - - Raises: - FileNotFoundError: If no versions of the engine are installed, if the requested - version cannot be found or downloaded, or if the engine path cannot be resolved. - """ - - worker_class = cls.engine_class_with_name(engine_name).worker_class() - - # check to make sure we have versions installed - all_versions = cls.all_version_data_for_engine(engine_name) + all_versions = self.all_version_data_for_engine(engine_name) if not all_versions: raise FileNotFoundError(f"Cannot find any installed '{engine_name}' engines") - # Find the path to the requested engine version or use default engine_path = None if engine_version and engine_version != 'latest': for ver in all_versions: @@ -403,9 +264,8 @@ class EngineManager: engine_path = ver['path'] break - # Download the required engine if not found locally if not engine_path: - download_result = cls.download_engine(engine_name, engine_version) + download_result = self.download_engine(engine_name, engine_version) if not download_result: raise FileNotFoundError(f"Cannot download requested version: {engine_name} {engine_version}") engine_path = download_result['path'] @@ -420,28 +280,109 @@ class EngineManager: return worker_class(input_path=str(input_path), output_path=str(output_path), engine_path=engine_path, args=args, parent=parent, name=name) + # --- Forwarders for backward compatibility --- + + @classmethod + def engine_class_for_project_path(cls, path): + if cls._default_instance is not None: + return cls._default_instance._engine_class_for_project_path(path) + + @classmethod + def engine_class_with_name(cls, engine_name): + if cls._default_instance is not None: + return cls._default_instance._engine_class_with_name(engine_name) + + @classmethod + def get_latest_engine_instance(cls, engine_class): + if cls._default_instance is not None: + return cls._default_instance._get_latest_engine_instance(engine_class) + + @classmethod + def get_installed_engine_data(cls, filter_name=None, include_corrupt=False, ignore_system=False): + if cls._default_instance is not None: + return cls._default_instance._get_installed_engine_data(filter_name, include_corrupt, ignore_system) + return [] + + @classmethod + def update_all_engines(cls): + if cls._default_instance is not None: + cls._default_instance._update_all_engines() + + @classmethod + def all_version_data_for_engine(cls, engine_name, include_corrupt=False, ignore_system=False): + if cls._default_instance is not None: + return cls._default_instance._all_version_data_for_engine(engine_name, include_corrupt, ignore_system) + return [] + + @classmethod + def newest_installed_engine_data(cls, engine_name, system_os=None, cpu=None, ignore_system=None): + if cls._default_instance is not None: + return cls._default_instance._newest_installed_engine_data(engine_name, system_os, cpu, ignore_system) + return [] + + @classmethod + def is_version_installed(cls, engine_name, version, system_os=None, cpu=None, ignore_system=False): + if cls._default_instance is not None: + return cls._default_instance._is_version_installed(engine_name, version, system_os, cpu, ignore_system) + return False + + @classmethod + def version_is_available_to_download(cls, engine_name, version, system_os=None, cpu=None): + if cls._default_instance is not None: + return cls._default_instance._version_is_available_to_download(engine_name, version, system_os, cpu) + return None + + @classmethod + def find_most_recent_version(cls, engine_name, system_os=None, cpu=None, lts_only=False): + if cls._default_instance is not None: + return cls._default_instance._find_most_recent_version(engine_name, system_os, cpu, lts_only) + return {} + + @classmethod + def is_engine_update_available(cls, engine_class, ignore_system_installs=False): + if cls._default_instance is not None: + return cls._default_instance._is_engine_update_available(engine_class, ignore_system_installs) + return None + + @classmethod + def downloadable_engines(cls): + if cls._default_instance is not None: + return cls._default_instance._downloadable_engines() + return [] + + @classmethod + def get_existing_download_task(cls, engine_name, version, system_os=None, cpu=None): + if cls._default_instance is not None: + return cls._default_instance._get_existing_download_task(engine_name, version, system_os, cpu) + return None + + @classmethod + def download_engine(cls, engine_name, version, system_os=None, cpu=None, background=False, ignore_system=False): + if cls._default_instance is not None: + return cls._default_instance._download_engine(engine_name, version, system_os, cpu, background, ignore_system) + return None + + @classmethod + def delete_engine_download(cls, engine_name, version, system_os=None, cpu=None): + if cls._default_instance is not None: + return cls._default_instance._delete_engine_download(engine_name, version, system_os, cpu) + return False + + @classmethod + def active_downloads(cls): + if cls._default_instance is not None: + return cls._default_instance._active_downloads() + return [] + + @classmethod + def create_worker(cls, engine_name, input_path, output_path, engine_version=None, args=None, parent=None, name=None): + if cls._default_instance is not None: + return cls._default_instance._create_worker(engine_name, input_path, output_path, engine_version, args, parent, name) + raise RuntimeError("EngineManager is not initialized") + class EngineDownloadWorker(threading.Thread): - """A thread worker for downloading a specific version of a rendering engine. - - This class handles the process of downloading a rendering engine in a separate thread, - ensuring that the download process does not block the main application. - - Attributes: - engine (str): The name of the rendering engine to download. - version (str): The version of the rendering engine to download. - system_os (str, optional): The operating system for which to download the engine. Defaults to current OS type. - cpu (str, optional): Requested CPU architecture. Defaults to system CPU type. - """ def __init__(self, engine, version, system_os=None, cpu=None): - """Initialize download worker for specific engine version. - - Args: - engine: Name of engine to download. - version: Version of engine to download. - system_os: Target operating system (defaults to current). - cpu: Target CPU architecture (defaults to current). - """ super().__init__() self.engine = engine self.version = version @@ -450,35 +391,27 @@ class EngineDownloadWorker(threading.Thread): self.percent_complete = 0 def _update_progress(self, current_progress): - """Update download progress. - - Args: - current_progress: Current download progress percentage (0-100). - """ self.percent_complete = current_progress -def run(self): - """Execute the download process. + def run(self): + try: + existing_download = EngineManager.is_version_installed(self.engine, self.version, self.system_os, self.cpu, + ignore_system=True) + if existing_download: + logger.info(f"Requested download of {self.engine} {self.version}, but local copy already exists") + return existing_download - Checks if engine version already exists, then downloads if not found. - Handles cleanup and error reporting. - """ - try: - existing_download = EngineManager.is_version_installed(self.engine, self.version, self.system_os, self.cpu, - ignore_system=True) - if existing_download: - logger.info(f"Requested download of {self.engine} {self.version}, but local copy already exists") - return existing_download - - # Get the appropriate downloader class based on the engine type - downloader = EngineManager.engine_class_with_name(self.engine).downloader() - downloader.download_engine( self.version, download_location=EngineManager.engines_path, - system_os=self.system_os, cpu=self.cpu, timeout=300, progress_callback=self._update_progress) - except Exception as e: - logger.error(f"Error in download worker: {e}") - finally: - # remove itself from the downloader list - EngineManager.download_tasks.remove(self) + downloader = EngineManager.engine_class_with_name(self.engine).downloader() + downloader.download_engine(self.version, download_location=EngineManager.engines_path, + system_os=self.system_os, cpu=self.cpu, timeout=300, progress_callback=self._update_progress) + except Exception as e: + logger.error(f"Error in download worker: {e}") + finally: + try: + if EngineManager._default_instance is not None: + EngineManager._default_instance.download_tasks.remove(self) + except ValueError: + pass if __name__ == '__main__': diff --git a/src/render_queue.py b/src/render_queue.py index b94612f..beb41d5 100755 --- a/src/render_queue.py +++ b/src/render_queue.py @@ -1,12 +1,13 @@ import logging +import threading from collections import Counter from datetime import datetime from pathlib import Path -from typing import List, Dict, Any, Optional +from typing import Any, Dict, List, Optional from pubsub import pub from sqlalchemy import create_engine -from sqlalchemy.orm import sessionmaker +from sqlalchemy.orm import Session, sessionmaker from sqlalchemy.orm.exc import DetachedInstanceError from src.engines.core.base_worker import Base, BaseRenderWorker @@ -25,182 +26,274 @@ class JobNotFoundError(Exception): class RenderQueue: - engine: Optional[create_engine] = None - session: Optional[sessionmaker] = None - job_queue: List[BaseRenderWorker] = [] - maximum_renderer_instances: Dict[str, int] = {'blender': 1, 'aerender': 1, 'ffmpeg': 4} - last_saved_counts: Dict[str, int] = {} - is_running: bool = False + _default_instance: Optional['RenderQueue'] = None + + @classmethod + def _sync_class(cls) -> None: + if cls._default_instance is not None: + pass # no class-level attributes to sync + + def __init__(self) -> None: + self.engine: Optional[create_engine] = None + self.session: Optional[Session] = None + self.job_queue: List[BaseRenderWorker] = [] + self.maximum_renderer_instances: Dict[str, int] = {'blender': 1, 'aerender': 1, 'ffmpeg': 4} + self.last_saved_counts: Dict[str, int] = {} + self.is_running: bool = False + self._lock = threading.Lock() # -------------------------------------------- # Render Queue Evaluation: # -------------------------------------------- - @classmethod - def start(cls): - """Start evaluating the render queue""" + def _start(self) -> None: logger.debug("Starting render queue updates") - cls.is_running = True - cls.evaluate_queue() + self.is_running = True + self.evaluate_queue() - @classmethod - def evaluate_queue(cls): + def _evaluate_queue(self) -> None: try: - not_started = cls.jobs_with_status(RenderStatus.NOT_STARTED, priority_sorted=True) + not_started = self.jobs_with_status(RenderStatus.NOT_STARTED, priority_sorted=True) for job in not_started: - if cls.is_available_for_job(job.engine_name, job.priority): - cls.start_job(job) + if self.is_available_for_job(job.engine_name, job.priority): + self.start_job(job) - scheduled = cls.jobs_with_status(RenderStatus.SCHEDULED, priority_sorted=True) + scheduled = self.jobs_with_status(RenderStatus.SCHEDULED, priority_sorted=True) for job in scheduled: if job.scheduled_start <= datetime.now(): logger.debug(f"Starting scheduled job: {job}") - cls.start_job(job) + self.start_job(job) - if cls.last_saved_counts != cls.job_counts(): - cls.save_state() + if self.last_saved_counts != self.job_counts(): + self.save_state() except DetachedInstanceError: pass - @classmethod - def __local_job_status_changed(cls, job_id, old_status, new_status): - render_job = RenderQueue.job_with_id(job_id, none_ok=True) - if render_job and cls.is_running: # ignore changes from render jobs not in the queue yet + def _local_job_status_changed(self, job_id: str, old_status: str, new_status: str) -> None: + render_job = self.job_with_id(job_id, none_ok=True) + if render_job and self.is_running: logger.debug(f"RenderQueue detected job {job_id} has changed from {old_status} -> {new_status}") - RenderQueue.evaluate_queue() + self.evaluate_queue() - @classmethod - def stop(cls): + def _stop(self) -> None: logger.debug("Stopping render queue updates") - cls.is_running = False + self.is_running = False # -------------------------------------------- # Fetch Jobs: # -------------------------------------------- - @classmethod - def all_jobs(cls): - return cls.job_queue + def _all_jobs(self) -> List[BaseRenderWorker]: + return self.job_queue - @classmethod - def running_jobs(cls): - return cls.jobs_with_status(RenderStatus.RUNNING) + def _running_jobs(self) -> List[BaseRenderWorker]: + return self.jobs_with_status(RenderStatus.RUNNING) - @classmethod - def pending_jobs(cls): - pending_jobs = cls.jobs_with_status(RenderStatus.NOT_STARTED) - pending_jobs.extend(cls.jobs_with_status(RenderStatus.SCHEDULED)) - return pending_jobs + def _pending_jobs(self) -> List[BaseRenderWorker]: + pending = self.jobs_with_status(RenderStatus.NOT_STARTED) + pending.extend(self.jobs_with_status(RenderStatus.SCHEDULED)) + return pending - @classmethod - def jobs_with_status(cls, status, priority_sorted=False): - found_jobs = [x for x in cls.all_jobs() if x.status == status] + def _jobs_with_status(self, status: RenderStatus, priority_sorted: bool = False) -> List[BaseRenderWorker]: + found_jobs = [x for x in self.all_jobs() if x.status == status] if priority_sorted: found_jobs = sorted(found_jobs, key=lambda a: a.priority, reverse=False) return found_jobs - @classmethod - def job_with_id(cls, job_id, none_ok=False): - found_job = next((x for x in cls.all_jobs() if x.id == job_id), None) + def _job_with_id(self, job_id: str, none_ok: bool = False) -> Optional[BaseRenderWorker]: + found_job = next((x for x in self.all_jobs() if x.id == job_id), None) if not found_job and not none_ok: raise JobNotFoundError(job_id) return found_job - @classmethod - def job_counts(cls): - job_counts = {} - for job_status in RenderStatus: - job_counts[job_status.value] = len(cls.jobs_with_status(job_status)) - return job_counts + def _job_counts(self) -> Dict[str, int]: + counts = Counter(x.status for x in self.all_jobs()) + return {s.value: counts.get(s, 0) for s in RenderStatus} # -------------------------------------------- # Startup / Shutdown: # -------------------------------------------- - @classmethod - def load_state(cls, database_directory: Path): - if not cls.engine: - cls.engine = create_engine(f"sqlite:///{database_directory / 'database.db'}") - Base.metadata.create_all(cls.engine) - cls.session = sessionmaker(bind=cls.engine)() - cls.job_queue = cls.session.query(BaseRenderWorker).all() - pub.subscribe(cls.__local_job_status_changed, 'status_change') + def _load_state(self, database_directory: Path) -> None: + self.engine = create_engine(f"sqlite:///{database_directory / 'database.db'}") + Base.metadata.create_all(self.engine) + self.session = sessionmaker(bind=self.engine)() + from src.engines.core.base_worker import BaseRenderWorker + self.job_queue = self.session.query(BaseRenderWorker).all() + pub.subscribe(self._local_job_status_changed, 'status_change') - @classmethod - def save_state(cls): - cls.session.commit() + def _save_state(self) -> None: + if self.session: + self.session.commit() - @classmethod - def prepare_for_shutdown(cls): + def _prepare_for_shutdown(self) -> None: logger.debug("Closing session") - cls.stop() - running_jobs = cls.jobs_with_status(RenderStatus.RUNNING) # cancel all running jobs - _ = [cls.cancel_job(job) for job in running_jobs] - cls.save_state() - cls.session.close() + self.stop() + running_jobs = self.jobs_with_status(RenderStatus.RUNNING) + for job in running_jobs: + self.cancel_job(job) + self.save_state() + if self.session: + self.session.close() # -------------------------------------------- # Renderer Availability: # -------------------------------------------- - @classmethod - def renderer_instances(cls): - all_instances = [x.engine_name for x in cls.running_jobs()] + def renderer_instances(self) -> Counter: + all_instances = [x.engine_name for x in self.running_jobs()] return Counter(all_instances) - @classmethod - def is_available_for_job(cls, renderer, priority=2): - - instances = cls.renderer_instances() - higher_priority_jobs = [x for x in cls.running_jobs() if x.priority < priority] - max_allowed_instances = cls.maximum_renderer_instances.get(renderer, 1) - maxed_out_instances = renderer in instances.keys() and instances[renderer] >= max_allowed_instances + def _is_available_for_job(self, renderer: str, priority: int = 2) -> bool: + instances = self.renderer_instances() + higher_priority_jobs = [x for x in self.running_jobs() if x.priority < priority] + max_allowed_instances = self.maximum_renderer_instances.get(renderer, 1) + maxed_out_instances = renderer in instances and instances[renderer] >= max_allowed_instances return not maxed_out_instances and not higher_priority_jobs # -------------------------------------------- # Job Lifecycle Management: # -------------------------------------------- - @classmethod - def add_to_render_queue(cls, render_job, force_start=False): + def _add_to_render_queue(self, render_job: BaseRenderWorker, force_start: bool = False) -> None: logger.info(f"Adding job to render queue: {render_job}") - cls.job_queue.append(render_job) - if cls.is_running and force_start and render_job.status in (RenderStatus.NOT_STARTED, RenderStatus.SCHEDULED): - cls.start_job(render_job) - cls.session.add(render_job) - cls.save_state() - if cls.is_running: - cls.evaluate_queue() + with self._lock: + self.job_queue.append(render_job) + if force_start and render_job.status in (RenderStatus.NOT_STARTED, RenderStatus.SCHEDULED): + self.start_job(render_job) + self.session.add(render_job) + self.save_state() + if self.is_running: + self.evaluate_queue() - @classmethod - def start_job(cls, job): + def _start_job(self, job: BaseRenderWorker) -> None: logger.info(f'Starting job: {job}') job.start() - cls.save_state() + self.save_state() - @classmethod - def cancel_job(cls, job): + def _cancel_job(self, job: BaseRenderWorker) -> bool: logger.info(f'Cancelling job: {job}') job.stop() return job.status == RenderStatus.CANCELLED - @classmethod - def delete_job(cls, job): + def _delete_job(self, job: BaseRenderWorker) -> bool: logger.info(f"Deleting job: {job}") - job.stop() - cls.job_queue.remove(job) - cls.session.delete(job) - cls.save_state() + with self._lock: + job.stop() + self.job_queue.remove(job) + self.session.delete(job) + self.save_state() return True # -------------------------------------------- # Miscellaneous: # -------------------------------------------- + def _clear_history(self) -> None: + for job in list(self.all_jobs()): + if job.status in (RenderStatus.CANCELLED, RenderStatus.COMPLETED, RenderStatus.ERROR): + self.delete_job(job) + self.save_state() + + # --- Forwarders for backward compatibility --- + + @classmethod + def start(cls): + if cls._default_instance is not None: + cls._default_instance._start() + + @classmethod + def evaluate_queue(cls): + if cls._default_instance is not None: + cls._default_instance._evaluate_queue() + + @classmethod + def stop(cls): + if cls._default_instance is not None: + cls._default_instance._stop() + + @classmethod + def all_jobs(cls): + if cls._default_instance is not None: + return cls._default_instance.job_queue + return [] + + @classmethod + def running_jobs(cls): + if cls._default_instance is not None: + return cls._default_instance._running_jobs() + return [] + + @classmethod + def pending_jobs(cls): + if cls._default_instance is not None: + return cls._default_instance._pending_jobs() + return [] + + @classmethod + def jobs_with_status(cls, status, priority_sorted=False): + if cls._default_instance is not None: + return cls._default_instance._jobs_with_status(status, priority_sorted) + return [] + + @classmethod + def job_with_id(cls, job_id, none_ok=False): + if cls._default_instance is not None: + return cls._default_instance._job_with_id(job_id, none_ok) + if not none_ok: + raise JobNotFoundError(job_id) + return None + + @classmethod + def job_counts(cls): + if cls._default_instance is not None: + return cls._default_instance._job_counts() + return {} + + @classmethod + def load_state(cls, database_directory): + if cls._default_instance is not None: + cls._default_instance._load_state(database_directory) + + @classmethod + def save_state(cls): + if cls._default_instance is not None: + cls._default_instance._save_state() + + @classmethod + def prepare_for_shutdown(cls): + if cls._default_instance is not None: + cls._default_instance._prepare_for_shutdown() + + @classmethod + def is_available_for_job(cls, renderer, priority=2): + if cls._default_instance is not None: + return cls._default_instance._is_available_for_job(renderer, priority) + return True + + @classmethod + def add_to_render_queue(cls, render_job, force_start=False): + if cls._default_instance is not None: + cls._default_instance._add_to_render_queue(render_job, force_start) + + @classmethod + def start_job(cls, job): + if cls._default_instance is not None: + cls._default_instance._start_job(job) + + @classmethod + def cancel_job(cls, job): + if cls._default_instance is not None: + return cls._default_instance._cancel_job(job) + return False + + @classmethod + def delete_job(cls, job): + if cls._default_instance is not None: + return cls._default_instance._delete_job(job) + return False + @classmethod def clear_history(cls): - to_remove = [x for x in cls.all_jobs() if x.status in [RenderStatus.CANCELLED, - RenderStatus.COMPLETED, RenderStatus.ERROR]] - for job_to_remove in to_remove: - cls.delete_job(job_to_remove) - cls.save_state() + if cls._default_instance is not None: + cls._default_instance._clear_history() diff --git a/src/ui/add_job_window.py b/src/ui/add_job_window.py index ae9da14..1e73688 100644 --- a/src/ui/add_job_window.py +++ b/src/ui/add_job_window.py @@ -1,8 +1,11 @@ +import logging import socket from pathlib import Path import psutil from PyQt6.QtCore import QThread, pyqtSignal, Qt, pyqtSlot + +logger = logging.getLogger(__name__) from PyQt6.QtWidgets import ( QApplication, QWidget, QVBoxLayout, QHBoxLayout, QLabel, QLineEdit, QPushButton, QFileDialog, QSpinBox, QComboBox, QGroupBox, QCheckBox, QProgressBar, QPlainTextEdit, QDoubleSpinBox, QMessageBox, QListWidget, QListWidgetItem, @@ -306,7 +309,7 @@ class NewRenderJobForm(QWidget): def update_job_count(self, changed_item=None): checked = 1 - if self.cameras_group.enabled: + if self.cameras_group.isEnabled(): checked = 0 total = self.cameras_list.count() @@ -463,8 +466,8 @@ class NewRenderJobForm(QWidget): text_box = QLineEdit() h_layout.addWidget(text_box) self.engine_options_layout.addLayout(h_layout) - except AttributeError: - pass + except AttributeError as e: + logger.error(f"AttributeError in post_get_project_info_update: {e}") def toggle_engine_enablement(self, enabled=False): """Toggle on/off all the render settings""" diff --git a/src/ui/main_window.py b/src/ui/main_window.py index a4029c3..daa3871 100644 --- a/src/ui/main_window.py +++ b/src/ui/main_window.py @@ -247,9 +247,8 @@ class MainWindow(QMainWindow): # Update server information display self.update_server_info_display(new_hostname) - except AttributeError: - # Handle cases where the server list view might not be properly initialized - pass + except AttributeError as e: + logger.error(f"AttributeError in server_picked: {e}") def update_server_info_display(self, hostname): """Updates the server information section of the UI.""" @@ -405,7 +404,8 @@ class MainWindow(QMainWindow): id_item = self.job_list_view.item(selected_row.row(), 0) job_ids.append(id_item.text()) return job_ids - except AttributeError: + except AttributeError as e: + logger.error(f"AttributeError in selected_job_ids: {e}") return [] diff --git a/src/utilities/config.py b/src/utilities/config.py index ffb4ab6..c152f3a 100644 --- a/src/utilities/config.py +++ b/src/utilities/config.py @@ -1,12 +1,24 @@ import os from pathlib import Path +from typing import Optional import yaml from src.utilities.misc_helper import current_system_os, copy_directory_contents +_CONFIG_ATTRS = [ + 'upload_folder', 'update_engines_on_launch', 'max_content_path', + 'server_log_level', 'log_buffer_length', 'worker_process_timeout', + 'flask_log_level', 'flask_debug_enable', 'queue_eval_seconds', + 'port_number', 'enable_split_jobs', 'download_timeout_seconds', +] + class Config: - # Initialize class variables with default values + _default_instance: Optional['Config'] = None + + # Class-level defaults — mutated by _sync_class() so existing + # callers (Config.upload_folder) continue to work during the + # migration to instance-based access. upload_folder = "~/zordon-uploads/" update_engines_on_launch = True max_content_path = 100000000 @@ -20,23 +32,30 @@ class Config: enable_split_jobs = True download_timeout_seconds = 120 - @classmethod - def load_config(cls, config_path): + def __init__(self) -> None: + for attr in _CONFIG_ATTRS: + setattr(self, attr, getattr(Config, attr)) + + def load(self, config_path: Path) -> None: with open(config_path, 'r') as ymlfile: cfg = yaml.safe_load(ymlfile) + for attr in _CONFIG_ATTRS: + if attr in cfg: + setattr(self, attr, cfg[attr]) + self.upload_folder = str(Path(self.upload_folder).expanduser()) - cls.upload_folder = str(Path(cfg.get('upload_folder', cls.upload_folder)).expanduser()) - cls.update_engines_on_launch = cfg.get('update_engines_on_launch', cls.update_engines_on_launch) - cls.max_content_path = cfg.get('max_content_path', cls.max_content_path) - cls.server_log_level = cfg.get('server_log_level', cls.server_log_level) - cls.log_buffer_length = cfg.get('log_buffer_length', cls.log_buffer_length) - cls.worker_process_timeout = cfg.get('worker_process_timeout', cls.worker_process_timeout) - cls.flask_log_level = cfg.get('flask_log_level', cls.flask_log_level) - cls.flask_debug_enable = cfg.get('flask_debug_enable', cls.flask_debug_enable) - cls.queue_eval_seconds = cfg.get('queue_eval_seconds', cls.queue_eval_seconds) - cls.port_number = cfg.get('port_number', cls.port_number) - cls.enable_split_jobs = cfg.get('enable_split_jobs', cls.enable_split_jobs) - cls.download_timeout_seconds = cfg.get('download_timeout_seconds', cls.download_timeout_seconds) + @classmethod + def _sync_class(cls) -> None: + if cls._default_instance is not None: + for attr in _CONFIG_ATTRS: + setattr(cls, attr, getattr(cls._default_instance, attr)) + + @classmethod + def load_config(cls, config_path: Path) -> None: + instance = Config() + instance.load(config_path) + cls._default_instance = instance + cls._sync_class() @classmethod def config_dir(cls) -> Path: diff --git a/src/utilities/zeroconf_server.py b/src/utilities/zeroconf_server.py index e1d526e..fe6d659 100644 --- a/src/utilities/zeroconf_server.py +++ b/src/utilities/zeroconf_server.py @@ -1,5 +1,6 @@ import logging import socket +from typing import Dict, List, Optional from pubsub import pub from zeroconf import Zeroconf, ServiceInfo, ServiceBrowser, ServiceStateChange, NonUniqueNameException, \ @@ -9,105 +10,144 @@ logger = logging.getLogger() class ZeroconfServer: - service_type = None - server_name = None - server_port = None - server_ip = None - zeroconf = Zeroconf() - service_info = None - client_cache = {} - properties = {} + _default_instance: Optional['ZeroconfServer'] = None - @classmethod - def configure(cls, service_type, server_name, server_port): - cls.service_type = service_type - cls.server_name = server_name - cls.server_port = server_port - try: # Stop any previously running instances + service_type: Optional[str] = None + server_name: Optional[str] = None + server_port: Optional[int] = None + properties: Dict = {} + + def __init__(self) -> None: + self.service_type: Optional[str] = None + self.server_name: Optional[str] = None + self.server_port: Optional[int] = None + self.server_ip: Optional[str] = None + self.zeroconf: Zeroconf = Zeroconf() + self.service_info: Optional[ServiceInfo] = None + self.client_cache: Dict = {} + self.properties: Dict = {} + + def _configure(self, service_type: str, server_name: str, server_port: int) -> None: + self.service_type = service_type + self.server_name = server_name + self.server_port = server_port + try: socket.gethostbyname(socket.gethostname()) except socket.gaierror: - cls.stop() + self.stop() - @classmethod - def start(cls, listen_only=False): - if not cls.service_type: + def _start(self, listen_only: bool = False) -> None: + if not self.service_type: raise RuntimeError("The 'configure' method must be run before starting the zeroconf server") - elif not listen_only: - logger.debug(f"Starting zeroconf service") - cls._register_service() + if not listen_only: + logger.debug("Starting zeroconf service") + self._register_service() else: - logger.debug(f"Starting zeroconf service - Listen only mode") - cls._browse_services() + logger.debug("Starting zeroconf service - Listen only mode") + self._browse_services() - @classmethod - def stop(cls): + def _stop(self) -> None: logger.debug("Stopping zeroconf service") - cls._unregister_service() - cls.zeroconf.close() + self._unregister_service() + if self.zeroconf: + self.zeroconf.close() - @classmethod - def _register_service(cls): + def _register_service(self) -> None: try: - cls.server_ip = socket.gethostbyname(socket.gethostname()) + self.server_ip = socket.gethostbyname(socket.gethostname()) info = ServiceInfo( - cls.service_type, - f"{cls.server_name}.{cls.service_type}", - addresses=[socket.inet_aton(cls.server_ip)], - port=cls.server_port, - properties=cls.properties, + self.service_type, + f"{self.server_name}.{self.service_type}", + addresses=[socket.inet_aton(self.server_ip)], + port=self.server_port, + properties=self.properties, ) - cls.service_info = info - cls.zeroconf.register_service(info) - logger.info(f"Registered zeroconf service: {cls.service_info.name}") + self.service_info = info + self.zeroconf.register_service(info) + logger.info(f"Registered zeroconf service: {self.service_info.name}") except (NonUniqueNameException, socket.gaierror) as e: logger.error(f"Error establishing zeroconf: {e}") - @classmethod - def _unregister_service(cls): - if cls.service_info: - cls.zeroconf.unregister_service(cls.service_info) - logger.info(f"Unregistered zeroconf service: {cls.service_info.name}") - cls.service_info = None + def _unregister_service(self) -> None: + if self.service_info: + self.zeroconf.unregister_service(self.service_info) + logger.info(f"Unregistered zeroconf service: {self.service_info.name}") + self.service_info = None - @classmethod - def _browse_services(cls): - browser = ServiceBrowser(cls.zeroconf, cls.service_type, [cls._on_service_discovered]) - browser.is_alive() + def _browse_services(self) -> None: + ServiceBrowser(self.zeroconf, self.service_type, [self._on_service_discovered]) - @classmethod - def _on_service_discovered(cls, zeroconf, service_type, name, state_change): + def _on_service_discovered(self, zeroconf, service_type, name, state_change) -> None: try: info = zeroconf.get_service_info(service_type, name) - hostname = name.split(f'.{cls.service_type}')[0] + hostname = name.split(f'.{self.service_type}')[0] logger.debug(f"Zeroconf: {hostname} {state_change}") - if service_type == cls.service_type: - if state_change == ServiceStateChange.Added or state_change == ServiceStateChange.Updated: - cls.client_cache[hostname] = info + if service_type == self.service_type: + if state_change in (ServiceStateChange.Added, ServiceStateChange.Updated): + self.client_cache[hostname] = info else: - cls.client_cache.pop(hostname) + self.client_cache.pop(hostname, None) pub.sendMessage('zeroconf_state_change', hostname=hostname, state_change=state_change) except NotRunningException: pass @classmethod - def found_hostnames(cls): + def _sync_class(cls) -> None: + if cls._default_instance is not None: + inst = cls._default_instance + cls.service_type = inst.service_type + cls.server_name = inst.server_name + cls.server_port = inst.server_port + cls.server_ip = inst.server_ip + cls.properties = inst.properties + + def _found_hostnames(self) -> List[str]: local_hostname = socket.gethostname() def sort_key(hostname): - # Return 0 if it's the local hostname so it comes first, else return 1 return False if hostname == local_hostname else True - # Sort the list with the local hostname first - sorted_hostnames = sorted(cls.client_cache.keys(), key=sort_key) + sorted_hostnames = sorted(self.client_cache.keys(), key=sort_key) return sorted_hostnames + def _get_hostname_properties(self, hostname: str) -> Dict: + server_info = self.client_cache.get(hostname) + if server_info is None: + return {} + decoded_server_info = {key.decode('utf-8'): value.decode('utf-8') for key, value in server_info.properties.items()} + return decoded_server_info + + # --- Forwarders for backward compatibility --- + + @classmethod + def configure(cls, service_type, server_name, server_port): + if cls._default_instance is not None: + cls._default_instance._configure(service_type, server_name, server_port) + cls._sync_class() + + @classmethod + def start(cls, listen_only=False): + if cls._default_instance is not None: + cls._default_instance._start(listen_only) + + @classmethod + def stop(cls): + if cls._default_instance is not None: + cls._default_instance._stop() + + @classmethod + def found_hostnames(cls): + if cls._default_instance is not None: + return cls._default_instance._found_hostnames() + return [] + @classmethod def get_hostname_properties(cls, hostname): - server_info = cls.client_cache.get(hostname).properties - decoded_server_info = {key.decode('utf-8'): value.decode('utf-8') for key, value in server_info.items()} - return decoded_server_info + if cls._default_instance is not None: + return cls._default_instance._get_hostname_properties(hostname) + return {} # Example usage: