From e8992fc91ab42aace32e3bea442f2ee0daedcd6b Mon Sep 17 00:00:00 2001 From: Brett Date: Tue, 20 Jan 2026 23:13:38 -0600 Subject: [PATCH 1/2] Add missing docstrings and pylint improvements (#130) --- README.md | 195 ++++++++++++++++++++++++++++---- server.py | 9 +- src/api/api_server.py | 23 +--- src/api/job_import_handler.py | 85 ++++++++++++-- src/api/preview_manager.py | 37 +++++- src/engines/core/base_engine.py | 1 + src/engines/engine_manager.py | 40 +++---- src/render_queue.py | 5 +- 8 files changed, 312 insertions(+), 83 deletions(-) diff --git a/README.md b/README.md index 5914293..11a7eed 100644 --- a/README.md +++ b/README.md @@ -4,44 +4,193 @@ # Zordon -A lightweight, zero-install, distributed rendering and management tool designed to streamline and optimize rendering workflows across multiple machines +A Python-based distributed rendering management tool that supports Blender, FFmpeg, and other render engines. Zordon efficiently manages render jobs across multiple machines, making it ideal for small render farms in home studios or small businesses. ## What is Zordon? -Zordon is tool designed for small render farms, such as those used in home studios or small businesses, to efficiently manage and run render jobs for Blender, FFMPEG, and other video renderers. It simplifies the process of distributing rendering tasks across multiple available machines, optimizing the rendering workflow for artists, animators, and video professionals. +Zordon is a tool designed for small render farms, such as those used in home studios or small businesses, to efficiently manage and run render jobs for Blender, FFmpeg, and other video renderers. It simplifies the process of distributing rendering tasks across multiple available machines, optimizing the rendering workflow for artists, animators, and video professionals. + +The system works by: +- **Server**: Central coordinator that manages job queues and distributes tasks to available workers +- **Clients**: Lightweight workers that run on rendering machines and execute assigned jobs +- **API**: RESTful endpoints for programmatic job submission and monitoring + +## Features + +- **Distributed Rendering**: Queue and distribute render jobs across multiple machines +- **Multi-Engine Support**: Compatible with Blender, FFmpeg, and extensible to other render engines +- **Desktop UI**: PyQt6 interface for job management and monitoring +- **REST API**: Flask-based API for programmatic access +- **Cross-Platform**: Runs on Windows, macOS, and Linux +- **Zero-Install Clients**: Lightweight client executables for worker machines + +## Installation + +### Prerequisites + +- Python 3.11 or later +- Git + +### Setup + +1. Clone the repository: + ```bash + git clone https://github.com/blw1138/Zordon.git + cd Zordon + ``` + +2. Install dependencies: + ```bash + pip install -r requirements.txt + ``` + +3. (Optional) Install PyInstaller for building executables: + ```bash + pip install pyinstaller pyinstaller_versionfile + ``` + +## Usage + +### Quick Start + +1. **Start the Server**: Run the central server to coordinate jobs. + ```bash + python server.py + ``` + +2. **Launch Clients**: On each rendering machine, run the client to connect to the server. + ```bash + python client.py + ``` -Notice: This should be considered a beta and is meant for casual / hobbiest use. Do not use in mission critical environments! -## Supported Renderers +### Detailed Workflow -Zordon supports or plans to support the following renderers: +#### Setting Up a Render Farm + +1. Choose one machine as the server (preferably a dedicated machine with good network connectivity). +2. Build and distribute client executables to worker machines: + ```bash + pyinstaller client.spec + ``` + Copy the generated executable to each worker machine. + +3. Ensure all machines can communicate via network (same subnet recommended). + +#### Submitting Render Jobs + +Jobs can be submitted via the desktop UI or programmatically via the API: + +- **Via UI**: Use the desktop interface to upload project files, specify render settings, and queue jobs. +- **Via API**: Send POST requests to `/api/jobs` with job configuration in JSON format. + +Example API request: +```bash +curl -X POST http://localhost:5000/api/jobs \ + -H "Content-Type: application/json" \ + -d '{ + "engine": "blender", + "project_path": "/path/to/project.blend", + "output_path": "/path/to/output", + "frames": "1-100", + "settings": {"resolution": "1920x1080"} + }' +``` + +#### Monitoring and Managing Jobs + +- **UI**: View job status, progress, logs, and worker availability in real-time. +- **API Endpoints**: + - `GET /api/jobs`: List all jobs + - `GET /api/jobs/{id}`: Get job details + - `DELETE /api/jobs/{id}`: Cancel a job + - `GET /api/workers`: List connected workers + +#### Worker Management + +Workers automatically connect to the server when started. You can: +- View worker status and capabilities in the dashboard +- Configure worker priorities and resource limits +- Monitor CPU/GPU usage per worker + +### Development Mode + +For development and testing: + +Run the server: +```bash +python server.py +``` + +Run a client (can run multiple for testing): +```bash +python client.py +``` + + +### Building Executables + +Build server executable: +```bash +pyinstaller server.spec +``` + +Build client executable: +```bash +pyinstaller client.spec +``` + +## Configuration + +Settings are stored in `src/utilities/config.py`. Supports YAML/JSON for data serialization and environment-specific configurations. + +## Architecture + +Zordon follows a modular architecture with the following key components: + +- **API Server** (`src/api/`): Flask-based REST API +- **Engine System** (`src/engines/`): Pluggable render engines (Blender, FFmpeg, etc.) +- **UI** (`src/ui/`): PyQt6-based interface +- **Job Management** (`src/render_queue.py`): Distributed job queue + +Design patterns include Factory Pattern for engine creation, Observer Pattern for status updates, and Strategy Pattern for different worker implementations. + +## Contributing + +1. Fork the repository +2. Create a feature branch: `git checkout -b feature/your-feature` +3. Follow the code style guidelines in `AGENTS.md` +4. Test the build: `pyinstaller server.spec` +5. Submit a pull request + +### Commit Message Format + +``` +feat: add support for new render engine +fix: resolve crash when engine path is invalid +docs: update API documentation +refactor: simplify job status handling +``` + +## Supported Render Engines - **Blender** -- **FFMPEG** -- **Adobe After Effects** ([coming soon](https://github.com/blw1138/Zordon/issues/84)) -- **Cinema 4D** ([planned](https://github.com/blw1138/Zordon/issues/105)) -- **Autodesk Maya** ([planned](https://github.com/blw1138/Zordon/issues/106)) +- **FFmpeg** +- **Adobe After Effects** (planned) +- **Cinema 4D** (planned) +- **Autodesk Maya** (planned) ## System Requirements - Windows 10 or later - macOS Ventura (13.0) or later -- Linux (Supported versions TBD) - -## Build using Pyinstaller - -Zordon is regularly tested with Python 3.11 and later. It's packaged and distributed with pyinstaller. It is supported on Windows, macOS and Linux. - -``` -git clone https://github.com/blw1138/Zordon.git -pip3 install -r requirements.txt -pip3 install pyinstaller -pip3 install pyinstaller_versionfile -pyinstaller client.spec -pyinstaller server.spec -``` +- Linux (supported versions TBD) ## License Zordon is licensed under the MIT License. See the [LICENSE](LICENSE.txt) file for more details. + +## Notice + +This software is in beta and intended for casual/hobbyist use. Not recommended for mission-critical environments. \ No newline at end of file diff --git a/server.py b/server.py index 4450810..40520ac 100755 --- a/server.py +++ b/server.py @@ -16,7 +16,7 @@ from src.engines.engine_manager import EngineManager from src.render_queue import RenderQueue from src.utilities.config import Config from src.utilities.misc_helper import (get_gpu_info, current_system_cpu, current_system_os, - current_system_os_version, current_system_cpu_brand, check_for_updates) + current_system_os_version, current_system_cpu_brand) from src.utilities.zeroconf_server import ZeroconfServer from src.version import APP_NAME, APP_VERSION @@ -38,17 +38,16 @@ class ZordonServer: Config.load_config(config_path) # configure default paths - EngineManager.engines_path = Path(Config.upload_folder).expanduser()/ "engines" + EngineManager.engines_path = str(Path(Config.upload_folder).expanduser()/ "engines") os.makedirs(EngineManager.engines_path, exist_ok=True) PreviewManager.storage_path = Path(Config.upload_folder).expanduser() / "previews" self.api_server = None - self.server_hostname = None + self.server_hostname: str = socket.gethostname() def start_server(self): def existing_process(process_name): - import psutil current_pid = os.getpid() current_process = psutil.Process(current_pid) for proc in psutil.process_iter(['pid', 'name', 'ppid']): @@ -79,7 +78,7 @@ class ZordonServer: ServerProxyManager.subscribe_to_listener() DistributedJobManager.subscribe_to_listener() - # get hostname + # update hostname self.server_hostname = socket.gethostname() # configure and start API server diff --git a/src/api/api_server.py b/src/api/api_server.py index 4d5ed2c..4c48df6 100755 --- a/src/api/api_server.py +++ b/src/api/api_server.py @@ -300,26 +300,7 @@ def add_job_handler(): return err_msg, 500 try: - loaded_project_local_path = processed_job_data['__loaded_project_local_path'] - created_jobs = [] - if processed_job_data.get("child_jobs"): - for child_job_diffs in processed_job_data["child_jobs"]: - processed_child_job_data = processed_job_data.copy() - processed_child_job_data.pop("child_jobs") - processed_child_job_data.update(child_job_diffs) - child_job = DistributedJobManager.create_render_job(processed_child_job_data, loaded_project_local_path) - created_jobs.append(child_job) - else: - new_job = DistributedJobManager.create_render_job(processed_job_data, loaded_project_local_path) - created_jobs.append(new_job) - - # Save notes to .txt - if processed_job_data.get("notes"): - parent_dir = Path(loaded_project_local_path).parent.parent - notes_name = processed_job_data['name'] + "-notes.txt" - with (Path(parent_dir) / notes_name).open("w") as f: - f.write(processed_job_data["notes"]) - return [x.json() for x in created_jobs] + return JobImportHandler.create_jobs_from_processed_data(processed_job_data) except Exception as e: logger.exception(f"Error creating render job: {e}") return 'unknown error', 500 @@ -447,6 +428,7 @@ def engine_info(): return result except Exception as e: + traceback.print_exc(e) logger.error(f"Error fetching details for engine '{engine.name()}': {e}") return {} @@ -650,6 +632,7 @@ def handle_404(error): @server.errorhandler(Exception) def handle_general_error(general_error): + traceback.print_exception(type(general_error), general_error, general_error.__traceback__) err_msg = f"Server error: {general_error}" logger.error(err_msg) return err_msg, 500 diff --git a/src/api/job_import_handler.py b/src/api/job_import_handler.py index 305a74c..59be69e 100644 --- a/src/api/job_import_handler.py +++ b/src/api/job_import_handler.py @@ -5,19 +5,77 @@ import shutil import tempfile import zipfile from datetime import datetime +from pathlib import Path import requests -from pathlib import Path from tqdm import tqdm from werkzeug.utils import secure_filename +from distributed_job_manager import DistributedJobManager + logger = logging.getLogger() class JobImportHandler: + """Handles job import operations for rendering projects. + + This class provides functionality to validate, download, and process + job data and project files for the rendering queue system. + """ + + @classmethod + def create_jobs_from_processed_data(cls, processed_job_data: dict) -> list[dict]: + """ Takes processed job data and creates new jobs + + Args: processed_job_data: Dictionary containing job information""" + loaded_project_local_path = processed_job_data['__loaded_project_local_path'] + + # prepare child job data + job_data_to_create = [] + if processed_job_data.get("child_jobs"): + for child_job_diffs in processed_job_data["child_jobs"]: + processed_child_job_data = processed_job_data.copy() + processed_child_job_data.pop("child_jobs") + processed_child_job_data.update(child_job_diffs) + job_data_to_create.append(processed_child_job_data) + else: + job_data_to_create.append(processed_job_data) + + # create the jobs + created_jobs = [] + for job_data in job_data_to_create: + new_job = DistributedJobManager.create_render_job(job_data, loaded_project_local_path) + created_jobs.append(new_job) + + # Save notes to .txt + if processed_job_data.get("notes"): + parent_dir = Path(loaded_project_local_path).parent.parent + notes_name = processed_job_data['name'] + "-notes.txt" + with (Path(parent_dir) / notes_name).open("w") as f: + f.write(processed_job_data["notes"]) + return [x.json() for x in created_jobs] @classmethod def validate_job_data(cls, new_job_data: dict, upload_directory: Path, uploaded_file=None) -> dict: + """Validates and prepares job data for import. + + This method validates the job data dictionary, handles project file + acquisition (upload, download, or local copy), and prepares the job + directory structure. + + Args: + new_job_data: Dictionary containing job information including + 'name', 'engine_name', and optionally 'url' or 'local_path'. + upload_directory: Base directory for storing uploaded jobs. + uploaded_file: Optional uploaded file object from the request. + + Returns: + The validated job data dictionary with additional metadata. + + Raises: + KeyError: If required fields 'name' or 'engine_name' are missing. + FileNotFoundError: If no valid project file can be found. + """ loaded_project_local_path = None # check for required keys @@ -25,7 +83,7 @@ class JobImportHandler: engine_name = new_job_data.get('engine_name') if not job_name: raise KeyError("Missing job name") - elif not engine_name: + if not engine_name: raise KeyError("Missing engine name") project_url = new_job_data.get('url', None) @@ -45,8 +103,7 @@ class JobImportHandler: # Prepare the local filepath cleaned_path_name = job_name.replace(' ', '-') - timestamp = datetime.now().strftime("%Y.%m.%d_%H.%M.%S") - folder_name = f"{cleaned_path_name}-{engine_name}-{timestamp}" + folder_name = f"{cleaned_path_name}-{engine_name}-{datetime.now().strftime('%Y.%m.%d_%H.%M.%S')}" job_dir = Path(upload_directory) / folder_name os.makedirs(job_dir, exist_ok=True) project_source_dir = Path(job_dir) / 'source' @@ -81,12 +138,24 @@ class JobImportHandler: @staticmethod def download_project_from_url(project_url: str): + """Downloads a project file from the given URL. + + Downloads the file from the specified URL to a temporary directory + with progress tracking. Returns the filename and temporary path. + + Args: + project_url: The URL to download the project file from. + + Returns: + A tuple of (filename, temp_file_path) if successful, + (None, None) if download fails. + """ # This nested function is to handle downloading from a URL logger.info(f"Downloading project from url: {project_url}") referred_name = os.path.basename(project_url) try: - response = requests.get(project_url, stream=True) + response = requests.get(project_url, stream=True, timeout=300) if response.status_code == 200: # Get the total file size from the "Content-Length" header file_size = int(response.headers.get("Content-Length", 0)) @@ -113,8 +182,8 @@ class JobImportHandler: """ Processes a zipped project. - This method takes a path to a zip file, extracts its contents, and returns the path to the extracted project file. - If the zip file contains more than one project file or none, an error is raised. + This method takes a path to a zip file, extracts its contents, and returns the path to the extracted project + file. If the zip file contains more than one project file or none, an error is raised. Args: zip_path (Path): The path to the zip file. @@ -148,5 +217,5 @@ class JobImportHandler: except (zipfile.BadZipFile, zipfile.LargeZipFile) as e: logger.error(f"Error processing zip file: {e}") - raise ValueError(f"Error processing zip file: {e}") + raise ValueError(f'Error processing zip file: {e}') from e return extracted_project_path diff --git a/src/api/preview_manager.py b/src/api/preview_manager.py index 6f9db38..be41d5e 100644 --- a/src/api/preview_manager.py +++ b/src/api/preview_manager.py @@ -12,12 +12,20 @@ supported_image_formats = ['.jpg', '.png', '.exr', '.tif', '.tga', '.bmp', '.web class PreviewManager: + """Manages generation, storage, and retrieval of preview images and videos for rendering jobs.""" storage_path = None _running_jobs = {} @classmethod def __generate_job_preview_worker(cls, job, replace_existing=False, max_width=480): + """Generates image and video previews for a given job. + + Args: + job: The job object containing file information. + replace_existing (bool): Whether to replace existing previews. Defaults to False. + max_width (int): Maximum width for the preview images/videos. Defaults to 480. + """ # Determine best source file to use for thumbs job_file_list = job.file_list() @@ -67,20 +75,36 @@ class PreviewManager: @classmethod def update_previews_for_job(cls, job, replace_existing=False, wait_until_completion=False, timeout=None): + """Updates previews for a given job by starting a background thread. + + Args: + job: The job object. + replace_existing (bool): Whether to replace existing previews. Defaults to False. + wait_until_completion (bool): Whether to wait for the thread to complete. Defaults to False. + timeout (float): Timeout for waiting, if applicable. + """ job_thread = cls._running_jobs.get(job.id) if job_thread and job_thread.is_alive(): logger.debug(f'Preview generation job already running for {job}') - else: - job_thread = threading.Thread(target=cls.__generate_job_preview_worker, args=(job, replace_existing,)) - job_thread.start() - cls._running_jobs[job.id] = job_thread + return + + job_thread = threading.Thread(target=cls.__generate_job_preview_worker, args=(job, replace_existing,)) + job_thread.start() + cls._running_jobs[job.id] = job_thread if wait_until_completion: job_thread.join(timeout=timeout) @classmethod def get_previews_for_job(cls, job): + """Retrieves previews for a given job. + Args: + job: The job object. + + Returns: + dict: A dictionary containing preview information. + """ results = {} try: directory_path = Path(cls.storage_path) @@ -103,6 +127,11 @@ class PreviewManager: @classmethod def delete_previews_for_job(cls, job): + """Deletes all previews associated with a given job. + + Args: + job: The job object. + """ all_previews = cls.get_previews_for_job(job) flattened_list = [item for sublist in all_previews.values() for item in sublist] for preview in flattened_list: diff --git a/src/engines/core/base_engine.py b/src/engines/core/base_engine.py index 365ead5..2428652 100644 --- a/src/engines/core/base_engine.py +++ b/src/engines/core/base_engine.py @@ -19,6 +19,7 @@ class BaseRenderEngine(object): """ install_paths: List[str] = [] + binary_names: Dict[str, str] = {} # -------------------------------------------- # Required Overrides for Subclasses: diff --git a/src/engines/engine_manager.py b/src/engines/engine_manager.py index d1d36a1..b4ba308 100644 --- a/src/engines/engine_manager.py +++ b/src/engines/engine_manager.py @@ -122,7 +122,7 @@ class EngineManager: binary_name = eng.binary_names.get(result_dict['system_os'], binary_name) # Find the path to the binary file - search_root = cls.engines_path / directory + search_root = Path(cls.engines_path) / directory match = next((p for p in search_root.rglob(binary_name) if p.is_file()), None) path = str(match) if match else None result_dict['path'] = path @@ -458,27 +458,27 @@ class EngineDownloadWorker(threading.Thread): self.percent_complete = current_progress def run(self): - """Execute the download process. + """Execute the download process. - Checks if engine version already exists, then downloads if not found. - Handles cleanup and error reporting. - """ - try: - existing_download = EngineManager.is_version_installed(self.engine, self.version, self.system_os, self.cpu, - ignore_system=True) - if existing_download: - logger.info(f"Requested download of {self.engine} {self.version}, but local copy already exists") - return existing_download + Checks if engine version already exists, then downloads if not found. + Handles cleanup and error reporting. + """ + try: + existing_download = EngineManager.is_version_installed(self.engine, self.version, self.system_os, self.cpu, + ignore_system=True) + if existing_download: + logger.info(f"Requested download of {self.engine} {self.version}, but local copy already exists") + return existing_download - # Get the appropriate downloader class based on the engine type - downloader = EngineManager.engine_class_with_name(self.engine).downloader() - downloader.download_engine( self.version, download_location=EngineManager.engines_path, - system_os=self.system_os, cpu=self.cpu, timeout=300, progress_callback=self._update_progress) - except Exception as e: - logger.error(f"Error in download worker: {e}") - finally: - # remove itself from the downloader list - EngineManager.download_tasks.remove(self) + # Get the appropriate downloader class based on the engine type + downloader = EngineManager.engine_class_with_name(self.engine).downloader() + downloader.download_engine( self.version, download_location=EngineManager.engines_path, + system_os=self.system_os, cpu=self.cpu, timeout=300, progress_callback=self._update_progress) + except Exception as e: + logger.error(f"Error in download worker: {e}") + finally: + # remove itself from the downloader list + EngineManager.download_tasks.remove(self) if __name__ == '__main__': diff --git a/src/render_queue.py b/src/render_queue.py index cb7fa4f..b94612f 100755 --- a/src/render_queue.py +++ b/src/render_queue.py @@ -1,4 +1,5 @@ import logging +from collections import Counter from datetime import datetime from pathlib import Path from typing import List, Dict, Any, Optional @@ -122,7 +123,6 @@ class RenderQueue: cls.engine = create_engine(f"sqlite:///{database_directory / 'database.db'}") Base.metadata.create_all(cls.engine) cls.session = sessionmaker(bind=cls.engine)() - from src.engines.core.base_worker import BaseRenderWorker cls.job_queue = cls.session.query(BaseRenderWorker).all() pub.subscribe(cls.__local_job_status_changed, 'status_change') @@ -135,7 +135,7 @@ class RenderQueue: logger.debug("Closing session") cls.stop() running_jobs = cls.jobs_with_status(RenderStatus.RUNNING) # cancel all running jobs - [cls.cancel_job(job) for job in running_jobs] + _ = [cls.cancel_job(job) for job in running_jobs] cls.save_state() cls.session.close() @@ -145,7 +145,6 @@ class RenderQueue: @classmethod def renderer_instances(cls): - from collections import Counter all_instances = [x.engine_name for x in cls.running_jobs()] return Counter(all_instances) From fa4a97f6fad61a556f39a9a0330cbcfae9c62cd7 Mon Sep 17 00:00:00 2001 From: Brett Date: Fri, 5 Jun 2026 22:01:20 -0500 Subject: [PATCH 2/2] Refactor: ApplicationContext DI wiring (#131) * refactor: wire all services through ApplicationContext - Created src/application_context.py as DI container with TYPE_CHECKING imports - server.py now instantiates all services in dependency order via ApplicationContext - Fixed infinite recursion bug: 48 instance methods renamed with underscore prefix to avoid shadowing by same-named @classmethod forwarders - ZeroconfServer: instantiate Zeroconf() in __init__, add _sync_class() to configure forwarder, direct _configure/_start calls during wiring - Config, EngineManager, PreviewManager: all forwarders and _sync_class() intact - RenderQueue: load_state and subscribe moved to __init__, threading.Lock retained - DistributedJobManager: subscribe_to_listener moved to __init__ * fix: greedy regex in get_render_devices swallows BlenderKit log output Changed regex from greedy [\s\S]* to non-greedy .*? with re.DOTALL so it stops at the first ] (the end of the GPU data JSON array) instead of matching through timestamped log lines like [19:36:22.109, __init__.py:2881] that contain trailing brackets. * fix: AttributeError on .enabled in update_job_count prevents options from rendering * refactor: log silent AttributeError catches, add _sync_class to remaining services, drop dead ctx slot --- server.py | 68 ++-- src/api/job_import_handler.py | 2 +- src/api/preview_manager.py | 87 +++--- src/application_context.py | 23 ++ src/distributed_job_manager.py | 213 +++++-------- src/engines/blender/blender_engine.py | 4 +- src/engines/engine_manager.py | 433 +++++++++++--------------- src/render_queue.py | 305 +++++++++++------- src/ui/add_job_window.py | 9 +- src/ui/main_window.py | 8 +- src/utilities/config.py | 49 ++- src/utilities/zeroconf_server.py | 164 ++++++---- 12 files changed, 714 insertions(+), 651 deletions(-) create mode 100644 src/application_context.py diff --git a/server.py b/server.py index 40520ac..5b4192c 100755 --- a/server.py +++ b/server.py @@ -11,6 +11,7 @@ from src.api.api_server import API_VERSION from src.api.api_server import start_api_server from src.api.preview_manager import PreviewManager from src.api.serverproxy_manager import ServerProxyManager +from src.application_context import ApplicationContext from src.distributed_job_manager import DistributedJobManager from src.engines.engine_manager import EngineManager from src.render_queue import RenderQueue @@ -26,21 +27,46 @@ logger = logging.getLogger() class ZordonServer: def __init__(self): + self.ctx = ApplicationContext() + # setup logging logging.basicConfig(format='%(asctime)s: %(levelname)s: %(module)s: %(message)s', datefmt='%d-%b-%y %H:%M:%S', level=Config.server_log_level.upper()) logging.getLogger("requests").setLevel(logging.WARNING) # suppress noisy requests/urllib3 logging logging.getLogger("urllib3").setLevel(logging.WARNING) - # Load Config YAML + # ---- Bootstrap Config ---- Config.setup_config_dir() config_path = Path(Config.config_dir()) / "config.yaml" - Config.load_config(config_path) + self.ctx.config = Config() + self.ctx.config.load(config_path) + Config._default_instance = self.ctx.config + Config._sync_class() - # configure default paths - EngineManager.engines_path = str(Path(Config.upload_folder).expanduser()/ "engines") - os.makedirs(EngineManager.engines_path, exist_ok=True) - PreviewManager.storage_path = Path(Config.upload_folder).expanduser() / "previews" + # ---- Engine Manager ---- + self.ctx.engine_manager = EngineManager() + self.ctx.engine_manager.engines_path = Path(Config.upload_folder).expanduser() / "engines" + os.makedirs(self.ctx.engine_manager.engines_path, exist_ok=True) + EngineManager._default_instance = self.ctx.engine_manager + EngineManager._sync_class() + + # ---- Preview Manager ---- + self.ctx.preview_manager = PreviewManager() + self.ctx.preview_manager.storage_path = Path(Config.upload_folder).expanduser() / "previews" + PreviewManager._default_instance = self.ctx.preview_manager + PreviewManager._sync_class() + + # ---- Render Queue ---- + self.ctx.render_queue = RenderQueue() + self.ctx.render_queue.load_state(database_directory=Path(Config.upload_folder).expanduser()) + RenderQueue._default_instance = self.ctx.render_queue + RenderQueue._sync_class() + + # ---- Distributed Job Manager ---- + self.ctx.distributed_job_manager = DistributedJobManager() + self.ctx.distributed_job_manager.subscribe_to_listener() + DistributedJobManager._default_instance = self.ctx.distributed_job_manager + DistributedJobManager._sync_class() self.api_server = None self.server_hostname: str = socket.gethostname() @@ -73,10 +99,8 @@ class ZordonServer: logger.debug(f"Upload directory: {Path(Config.upload_folder).expanduser()}") logger.debug(f"Thumbs directory: {PreviewManager.storage_path}") logger.debug(f"Engines directory: {EngineManager.engines_path}") - # Set up the RenderQueue object - RenderQueue.load_state(database_directory=Path(Config.upload_folder).expanduser()) + ServerProxyManager.subscribe_to_listener() - DistributedJobManager.subscribe_to_listener() # update hostname self.server_hostname = socket.gethostname() @@ -87,16 +111,21 @@ class ZordonServer: self.api_server.start() # start zeroconf server - ZeroconfServer.configure(f"_{APP_NAME.lower()}._tcp.local.", self.server_hostname, Config.port_number) - ZeroconfServer.properties = {'system_cpu': current_system_cpu(), - 'system_cpu_brand': current_system_cpu_brand(), - 'system_cpu_cores': multiprocessing.cpu_count(), - 'system_os': current_system_os(), - 'system_os_version': current_system_os_version(), - 'system_memory': round(psutil.virtual_memory().total / (1024**3)), # in GB - 'gpu_info': get_gpu_info(), - 'api_version': API_VERSION} - ZeroconfServer.start() + ctx = self.ctx + ctx.zeroconf_server = ZeroconfServer() + ctx.zeroconf_server._configure(f"_{APP_NAME.lower()}._tcp.local.", self.server_hostname, Config.port_number) + ctx.zeroconf_server.properties = {'system_cpu': current_system_cpu(), + 'system_cpu_brand': current_system_cpu_brand(), + 'system_cpu_cores': multiprocessing.cpu_count(), + 'system_os': current_system_os(), + 'system_os_version': current_system_os_version(), + 'system_memory': round(psutil.virtual_memory().total / (1024**3)), + 'gpu_info': get_gpu_info(), + 'api_version': API_VERSION} + ZeroconfServer._default_instance = ctx.zeroconf_server + ZeroconfServer._sync_class() + ctx.zeroconf_server._start() + logger.info(f"{APP_NAME} Render Server started - Hostname: {self.server_hostname}") RenderQueue.start() # Start evaluating the render queue @@ -112,6 +141,7 @@ class ZordonServer: logger.exception(f"Exception during prepare for shutdown: {e}") logger.info(f"{APP_NAME} Render Server has shut down") + if __name__ == '__main__': server = ZordonServer() try: diff --git a/src/api/job_import_handler.py b/src/api/job_import_handler.py index 59be69e..3fafd53 100644 --- a/src/api/job_import_handler.py +++ b/src/api/job_import_handler.py @@ -11,7 +11,7 @@ import requests from tqdm import tqdm from werkzeug.utils import secure_filename -from distributed_job_manager import DistributedJobManager +from src.distributed_job_manager import DistributedJobManager logger = logging.getLogger() diff --git a/src/api/preview_manager.py b/src/api/preview_manager.py index be41d5e..a01670d 100644 --- a/src/api/preview_manager.py +++ b/src/api/preview_manager.py @@ -3,6 +3,7 @@ import os import subprocess import threading from pathlib import Path +from typing import Dict, Optional from src.utilities.ffmpeg_helper import generate_thumbnail, save_first_frame @@ -12,20 +13,20 @@ supported_image_formats = ['.jpg', '.png', '.exr', '.tif', '.tga', '.bmp', '.web class PreviewManager: - """Manages generation, storage, and retrieval of preview images and videos for rendering jobs.""" + _default_instance: Optional['PreviewManager'] = None - storage_path = None - _running_jobs = {} + storage_path: Optional[str] = None + + def __init__(self) -> None: + self.storage_path = None + self._running_jobs: Dict = {} @classmethod - def __generate_job_preview_worker(cls, job, replace_existing=False, max_width=480): - """Generates image and video previews for a given job. + def _sync_class(cls) -> None: + if cls._default_instance is not None: + cls.storage_path = cls._default_instance.storage_path - Args: - job: The job object containing file information. - replace_existing (bool): Whether to replace existing previews. Defaults to False. - max_width (int): Maximum width for the preview images/videos. Defaults to 480. - """ + def _generate_job_preview_worker(self, job, replace_existing=False, max_width=480): # Determine best source file to use for thumbs job_file_list = job.file_list() @@ -41,8 +42,8 @@ class PreviewManager: logger.warning(f"No valid image or video files found in files from job: {job}") return - os.makedirs(cls.storage_path, exist_ok=True) - base_path = os.path.join(cls.storage_path, f"{job.id}-{preview_label}-{max_width}") + os.makedirs(self.storage_path, exist_ok=True) + base_path = os.path.join(self.storage_path, f"{job.id}-{preview_label}-{max_width}") preview_video_path = base_path + '.mp4' preview_image_path = base_path + '.jpg' @@ -73,41 +74,23 @@ class PreviewManager: except subprocess.CalledProcessError as e: logger.error(f"Error generating video preview for {job}: {e}") - @classmethod - def update_previews_for_job(cls, job, replace_existing=False, wait_until_completion=False, timeout=None): - """Updates previews for a given job by starting a background thread. - - Args: - job: The job object. - replace_existing (bool): Whether to replace existing previews. Defaults to False. - wait_until_completion (bool): Whether to wait for the thread to complete. Defaults to False. - timeout (float): Timeout for waiting, if applicable. - """ - job_thread = cls._running_jobs.get(job.id) + def _update_previews_for_job(self, job, replace_existing=False, wait_until_completion=False, timeout=None): + job_thread = self._running_jobs.get(job.id) if job_thread and job_thread.is_alive(): logger.debug(f'Preview generation job already running for {job}') - return - - job_thread = threading.Thread(target=cls.__generate_job_preview_worker, args=(job, replace_existing,)) - job_thread.start() - cls._running_jobs[job.id] = job_thread + else: + job_thread = threading.Thread(target=self._generate_job_preview_worker, args=(job, replace_existing,)) + job_thread.start() + self._running_jobs[job.id] = job_thread if wait_until_completion: job_thread.join(timeout=timeout) - @classmethod - def get_previews_for_job(cls, job): - """Retrieves previews for a given job. + def _get_previews_for_job(self, job): - Args: - job: The job object. - - Returns: - dict: A dictionary containing preview information. - """ results = {} try: - directory_path = Path(cls.storage_path) + directory_path = Path(self.storage_path) preview_files_for_job = [f for f in directory_path.iterdir() if f.is_file() and f.name.startswith(job.id)] for preview_filename in preview_files_for_job: @@ -125,14 +108,8 @@ class PreviewManager: pass return results - @classmethod - def delete_previews_for_job(cls, job): - """Deletes all previews associated with a given job. - - Args: - job: The job object. - """ - all_previews = cls.get_previews_for_job(job) + def _delete_previews_for_job(self, job): + all_previews = self.get_previews_for_job(job) flattened_list = [item for sublist in all_previews.values() for item in sublist] for preview in flattened_list: try: @@ -140,3 +117,21 @@ class PreviewManager: os.remove(preview['filename']) except OSError as e: logger.error(f"Error removing preview '{preview.get('filename')}': {e}") + + # --- Forwarders for backward compatibility --- + + @classmethod + def update_previews_for_job(cls, job, replace_existing=False, wait_until_completion=False, timeout=None): + if cls._default_instance is not None: + cls._default_instance._update_previews_for_job(job, replace_existing, wait_until_completion, timeout) + + @classmethod + def get_previews_for_job(cls, job): + if cls._default_instance is not None: + return cls._default_instance._get_previews_for_job(job) + return {} + + @classmethod + def delete_previews_for_job(cls, job): + if cls._default_instance is not None: + cls._default_instance._delete_previews_for_job(job) diff --git a/src/application_context.py b/src/application_context.py new file mode 100644 index 0000000..64b7cfa --- /dev/null +++ b/src/application_context.py @@ -0,0 +1,23 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING, Optional + +if TYPE_CHECKING: + from src.api.preview_manager import PreviewManager + from src.distributed_job_manager import DistributedJobManager + from src.engines.engine_manager import EngineManager + from src.render_queue import RenderQueue + from src.utilities.config import Config + from src.utilities.zeroconf_server import ZeroconfServer + + +class ApplicationContext: + """Holds all service instances. Single source of truth for wiring.""" + + def __init__(self) -> None: + self.config: Optional[Config] = None + self.engine_manager: Optional[EngineManager] = None + self.preview_manager: Optional[PreviewManager] = None + self.zeroconf_server: Optional[ZeroconfServer] = None + self.render_queue: Optional[RenderQueue] = None + self.distributed_job_manager: Optional[DistributedJobManager] = None diff --git a/src/distributed_job_manager.py b/src/distributed_job_manager.py index ccd5047..e817261 100644 --- a/src/distributed_job_manager.py +++ b/src/distributed_job_manager.py @@ -3,8 +3,9 @@ import os import socket import threading import time +from typing import Optional +from pathlib import Path -from click import Path from plyer import notification from pubsub import pub @@ -21,47 +22,32 @@ logger = logging.getLogger() class DistributedJobManager: - - def __init__(self): - pass + _default_instance: Optional['DistributedJobManager'] = None @classmethod - def subscribe_to_listener(cls): - """ - Subscribes the private class method '__local_job_status_changed' to the 'status_change' pubsub message. - This should be called once, typically during the initialization phase. - """ - pub.subscribe(cls.__local_job_status_changed, 'status_change') - pub.subscribe(cls.__local_job_frame_complete, 'frame_complete') + def _sync_class(cls) -> None: + if cls._default_instance is not None: + pass # no class-level attributes to sync - @classmethod - def __local_job_frame_complete(cls, job_id, frame_number, update_interval=5): + def __init__(self) -> None: + self.background_worker: Optional[threading.Thread] = None - """ - Responds to the 'frame_complete' pubsub message for local jobs. - - Args: - job_id (str): The ID of the job that has changed status. - old_status (str): The previous status of the job. - new_status (str): The new (current) status of the job. - - Note: Do not call directly. Instead, call via the 'frame_complete' pubsub message. - """ + def _subscribe_to_listener(self) -> None: + pub.subscribe(self._local_job_status_changed, 'status_change') + pub.subscribe(self._local_job_frame_complete, 'frame_complete') + def _local_job_frame_complete(self, job_id, frame_number, update_interval=5) -> None: render_job = RenderQueue.job_with_id(job_id, none_ok=True) - if not render_job: # ignore jobs not in the queue + if not render_job: return logger.debug(f"Job {job_id} has completed frame #{frame_number}") replace_existing_previews = (frame_number % update_interval) == 0 - cls.__job_update_shared(render_job, replace_existing_previews) + self._job_update_shared(render_job, replace_existing_previews) - @classmethod - def __job_update_shared(cls, render_job, replace_existing_previews=False): - # update previews + def _job_update_shared(self, render_job, replace_existing_previews=False) -> None: PreviewManager.update_previews_for_job(job=render_job, replace_existing=replace_existing_previews) - # notify parent to allow individual frames to be copied instead of waiting until the end if render_job.parent: parent_id, parent_hostname = render_job.parent.split('@')[0], render_job.parent.split('@')[-1] try: @@ -70,57 +56,41 @@ class DistributedJobManager: except Exception as e: logger.error(f"Error notifying parent {parent_hostname} about update in subjob {render_job.id}: {e}") - @classmethod - def __local_job_status_changed(cls, job_id: str, old_status: str, new_status: str): - """ - Responds to the 'status_change' pubsub message for local jobs. - If it's a child job, it notifies the parent job about the status change. - - Args: - job_id (str): The ID of the job that has changed status. - old_status (str): The previous status of the job. - new_status (str): The new (current) status of the job. - - Note: Do not call directly. Instead, call via the 'status_change' pubsub message. - """ - + def _local_job_status_changed(self, job_id: str, old_status: str, new_status: str) -> None: render_job = RenderQueue.job_with_id(job_id, none_ok=True) - if not render_job: # ignore jobs created but not yet added to queue + if not render_job: return logger.debug(f"Job {job_id} status change: {old_status} -> {new_status}") + self._job_update_shared(render_job, replace_existing_previews=(render_job.status == RenderStatus.COMPLETED)) - cls.__job_update_shared(render_job, replace_existing_previews=(render_job.status == RenderStatus.COMPLETED)) - - # Handle children if render_job.children: - if new_status in [RenderStatus.CANCELLED, RenderStatus.ERROR]: # Cancel children if necessary + if new_status in (RenderStatus.CANCELLED, RenderStatus.ERROR): for child in render_job.children: child_id, child_hostname = child.split('@') RenderServerProxy(child_hostname).cancel_job(child_id, confirm=True) - # UI Notifications try: if new_status == RenderStatus.COMPLETED: logger.debug("Show render complete notification") notification.notify( title='Render Job Complete', message=f'{render_job.name} completed succesfully', - timeout=10 # Display time in seconds + timeout=10 ) elif new_status == RenderStatus.ERROR: logger.debug("Show render error notification") notification.notify( title='Render Job Failed', message=f'{render_job.name} failed rendering', - timeout=10 # Display time in seconds + timeout=10 ) elif new_status == RenderStatus.RUNNING: logger.debug("Show render started notification") notification.notify( title='Render Job Started', message=f'{render_job.name} started rendering', - timeout=10 # Display time in seconds + timeout=10 ) except Exception as e: logger.debug(f"Unable to show UI notification: {e}") @@ -129,30 +99,15 @@ class DistributedJobManager: # Create Job # -------------------------------------------- - @classmethod - def create_render_job(cls, new_job_attributes: dict, loaded_project_local_path: Path): - """Creates render jobs. Pass in dict of job_data and the local path to the project. It creates and returns a new - render job. - - Args: - new_job_attributes (dict): Dict of desired attributes for new job (frame count, renderer, output path, etc) - loaded_project_local_path (Path): The local path to the loaded project. - - Returns: - worker: Created job worker - """ - - # get new output path in output_dir + def _create_render_job(self, new_job_attributes: dict, loaded_project_local_path: Path): output_path = new_job_attributes.get('output_path') output_filename = loaded_project_local_path.name if output_path else loaded_project_local_path.stem - # Prepare output path output_dir = loaded_project_local_path.parent.parent / "output" output_path = output_dir / output_filename os.makedirs(output_dir, exist_ok=True) logger.debug(f"New job output path: {output_path}") - # create & configure jobs worker = EngineManager.create_worker(engine_name=new_job_attributes['engine_name'], input_path=loaded_project_local_path, output_path=output_path, @@ -160,16 +115,15 @@ class DistributedJobManager: args=new_job_attributes.get('args', {}), parent=new_job_attributes.get('parent'), name=new_job_attributes.get('name')) - worker.status = new_job_attributes.get("initial_status", worker.status) # todo: is this necessary? + worker.status = new_job_attributes.get("initial_status", worker.status) worker.priority = int(new_job_attributes.get('priority', worker.priority)) worker.start_frame = int(new_job_attributes.get("start_frame", worker.start_frame)) worker.end_frame = int(new_job_attributes.get("end_frame", worker.end_frame)) worker.watchdog_timeout = Config.worker_process_timeout worker.hostname = socket.gethostname() - # determine if we can / should split the job if new_job_attributes.get("enable_split_jobs", False) and (worker.total_frames > 1) and not worker.parent: - cls.split_into_subjobs_async(worker, new_job_attributes, loaded_project_local_path) + self.split_into_subjobs_async(worker, new_job_attributes, loaded_project_local_path) else: worker.status = RenderStatus.NOT_STARTED @@ -182,15 +136,7 @@ class DistributedJobManager: # Handling Subjobs # -------------------------------------------- - @classmethod - def handle_subjob_update_notification(cls, local_job, subjob_data: dict): - """Responds to a notification from a remote subjob and the host requests any subsequent updates from the subjob. - - Args: - local_job (BaseRenderWorker): The local parent job worker. - subjob_data (dict): Subjob data sent from the remote server. - """ - + def _handle_subjob_update_notification(self, local_job, subjob_data: dict) -> None: subjob_status = string_to_status(subjob_data['status']) subjob_id = subjob_data['id'] subjob_hostname = subjob_data['hostname'] @@ -206,19 +152,10 @@ class DistributedJobManager: if subjob_data['status'] == 'completed' and download_success: local_job.children[subjob_key]['download_status'] = 'completed' - @classmethod - def wait_for_subjobs(cls, parent_job): - """Check the status of subjobs and waits until they are all finished. Download rendered frames from subjobs - when they are completed. - - Args: - parent_job: Worker object that has child jobs - - Returns: - """ + def _wait_for_subjobs(self, parent_job) -> None: logger.debug(f"Waiting for subjobs for job {parent_job}") parent_job.status = RenderStatus.WAITING_FOR_SUBJOBS - statuses_to_download = [RenderStatus.CANCELLED, RenderStatus.ERROR, RenderStatus.COMPLETED] + statuses_to_download = (RenderStatus.CANCELLED, RenderStatus.ERROR, RenderStatus.COMPLETED) def subjobs_not_downloaded(): return {k: v for k, v in parent_job.children.items() if 'download_status' not in v or @@ -230,21 +167,17 @@ class DistributedJobManager: sleep_counter = 0 while parent_job.status == RenderStatus.WAITING_FOR_SUBJOBS: - if sleep_counter % server_delay == 0: # only ping servers every x seconds - for child_key, subjob_cached_data in subjobs_not_downloaded().items(): - + if sleep_counter % server_delay == 0: + for child_key in subjobs_not_downloaded(): subjob_id = child_key.split('@')[0] subjob_hostname = child_key.split('@')[-1] - # Fetch info from server and handle failing case subjob_data = RenderServerProxy(subjob_hostname).get_job_info(subjob_id) if not subjob_data: logger.warning(f"No response from {subjob_hostname}") - # timeout / missing server situations parent_job.children[child_key]['download_status'] = f'error: No response from {subjob_hostname}' continue - # Update parent job cache but keep the download status download_status = parent_job.children[child_key].get('download_status', None) parent_job.children[child_key] = subjob_data parent_job.children[child_key]['download_status'] = download_status @@ -254,8 +187,7 @@ class DistributedJobManager: f"{float(subjob_data.get('percent_complete')) * 100.0}%" logger.debug(status_msg) - # Check if job is finished, but has not had files copied yet over yet - if download_status is None and subjob_data['file_count'] and status in statuses_to_download: + if download_status is None and subjob_data.get('file_count') and status in statuses_to_download: try: download_missing_frames_from_subjob(parent_job, subjob_id, subjob_hostname) parent_job.children[child_key]['download_status'] = 'complete' @@ -263,7 +195,6 @@ class DistributedJobManager: logger.error(f"Error downloading missing frames from subjob: {e}") parent_job.children[child_key]['download_status'] = 'error: {}' - # Any finished jobs not successfully downloaded at this point are skipped if parent_job.children[child_key].get('download_status', None) is None and \ status in statuses_to_download: logger.warning(f"Skipping waiting on downloading from subjob: {child_key}") @@ -274,42 +205,22 @@ class DistributedJobManager: f"{', '.join(list(subjobs_not_downloaded().keys()))}") time.sleep(1) sleep_counter += 1 - else: # exit the loop + else: parent_job.status = RenderStatus.RUNNING # -------------------------------------------- # Creating Subjobs # -------------------------------------------- - @classmethod - def split_into_subjobs_async(cls, parent_worker, new_job_attributes, project_path, system_os=None): - # todo: I don't love this + def _split_into_subjobs_async(self, parent_worker, new_job_attributes, project_path, system_os=None) -> None: parent_worker.status = RenderStatus.CONFIGURING - cls.background_worker = threading.Thread(target=cls.split_into_subjobs, args=(parent_worker, new_job_attributes, - project_path, system_os)) - cls.background_worker.start() + self.background_worker = threading.Thread(target=self.split_into_subjobs, args=( + parent_worker, new_job_attributes, project_path, system_os)) + self.background_worker.start() - @classmethod - def split_into_subjobs(cls, parent_worker, new_job_attributes, project_path, system_os=None, specific_servers=None): - """ - Splits a job into subjobs and distributes them among available servers. - - This method checks the availability of servers, distributes the work among them, and creates subjobs on each - server. If a server is the local host, it adjusts the frame range of the parent job instead of creating a - subjob. - - Args: - parent_worker (Worker): The parent job what we're creating the subjobs for. - new_job_attributes (dict): Dict of desired attributes for new job (frame count, engine, output path, etc) - project_path (str): The path to the project. - system_os (str, optional): Required OS. Default is any. - specific_servers (list, optional): List of specific servers to split work between. Defaults to all found. - """ - - # Check availability - available_servers = specific_servers if specific_servers else cls.find_available_servers(parent_worker.engine_name, - system_os) - # skip if theres no external servers found + def split_into_subjobs(self, parent_worker, new_job_attributes, project_path, system_os=None, specific_servers=None) -> None: + available_servers = specific_servers if specific_servers else self.find_available_servers( + parent_worker.engine_name, system_os) external_servers = [x for x in available_servers if x['hostname'] != parent_worker.hostname] if not external_servers: parent_worker.status = RenderStatus.NOT_STARTED @@ -318,34 +229,29 @@ class DistributedJobManager: logger.debug(f"Splitting into subjobs - Available servers: {[x['hostname'] for x in available_servers]}") all_subjob_server_data = distribute_server_work(parent_worker.start_frame, parent_worker.end_frame, available_servers) - # Prep and submit these sub-jobs logger.info(f"Job {parent_worker.id} split plan: {all_subjob_server_data}") try: for subjob_data in all_subjob_server_data: subjob_hostname = subjob_data['hostname'] - post_results = cls.__create_subjob(new_job_attributes, project_path, subjob_data, subjob_hostname, + post_results = self._create_subjob(new_job_attributes, project_path, subjob_data, subjob_hostname, parent_worker) if not post_results.ok: ValueError(f"Failed to create subjob on {subjob_hostname}") - # save child info submission_results = post_results.json()[0] child_key = f"{submission_results['id']}@{subjob_hostname}" parent_worker.children[child_key] = submission_results - # start subjobs logger.debug(f"Created {len(all_subjob_server_data)} subjobs successfully") parent_worker.name = f"{parent_worker.name} (Parent)" - parent_worker.status = RenderStatus.NOT_STARTED # todo: this won't work with scheduled starts + parent_worker.status = RenderStatus.NOT_STARTED except Exception as e: - # cancel all the subjobs logger.error(f"Failed to split job into subjobs: {e}") logger.debug(f"Cancelling {len(all_subjob_server_data) - 1} attempted subjobs") RenderServerProxy(parent_worker.hostname).cancel_job(parent_worker.id, confirm=True) @staticmethod - def __create_subjob(new_job_attributes: dict, project_path, server_data, server_hostname: str, parent_worker): - """Convenience method to create subjobs for a parent worker""" + def _create_subjob(new_job_attributes: dict, project_path, server_data, server_hostname, parent_worker): subjob = new_job_attributes.copy() subjob['name'] = f"{parent_worker.name}[{server_data['frame_range'][0]}-{server_data['frame_range'][-1]}]" subjob['parent'] = f"{parent_worker.id}@{parent_worker.hostname}" @@ -364,13 +270,6 @@ class DistributedJobManager: @staticmethod def find_available_servers(engine_name: str, system_os=None): - """ - Scan the Zeroconf network for currently available render servers supporting a specific engine. - - :param engine_name: str, The engine type to search for - :param system_os: str, Restrict results to servers running a specific OS - :return: A list of dictionaries with each dict containing hostname and cpu_count of available servers - """ from api.api_server import API_VERSION found_available_servers = [] for hostname in ZeroconfServer.found_hostnames(): @@ -383,6 +282,34 @@ class DistributedJobManager: return found_available_servers + # --- Forwarders for backward compatibility --- + + @classmethod + def subscribe_to_listener(cls): + if cls._default_instance is not None: + cls._default_instance._subscribe_to_listener() + + @classmethod + def create_render_job(cls, new_job_attributes, loaded_project_local_path): + if cls._default_instance is not None: + return cls._default_instance._create_render_job(new_job_attributes, loaded_project_local_path) + raise RuntimeError("DistributedJobManager is not initialized") + + @classmethod + def handle_subjob_update_notification(cls, local_job, subjob_data): + if cls._default_instance is not None: + cls._default_instance._handle_subjob_update_notification(local_job, subjob_data) + + @classmethod + def wait_for_subjobs(cls, parent_job): + if cls._default_instance is not None: + cls._default_instance._wait_for_subjobs(parent_job) + + @classmethod + def split_into_subjobs_async(cls, parent_worker, new_job_attributes, project_path, system_os=None): + if cls._default_instance is not None: + cls._default_instance._split_into_subjobs_async(parent_worker, new_job_attributes, project_path, system_os) + if __name__ == '__main__': logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') diff --git a/src/engines/blender/blender_engine.py b/src/engines/blender/blender_engine.py index 4cb0277..15d268e 100644 --- a/src/engines/blender/blender_engine.py +++ b/src/engines/blender/blender_engine.py @@ -173,7 +173,7 @@ class Blender(BaseRenderEngine): script_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'scripts', 'get_system_info.py') results = self.run_python_script(script_path=script_path) output = results.stdout.decode() - match = re.search(r"GPU DATA:(\[[\s\S]*\])", output) + match = re.search(r"GPU DATA:(\[.*?\])", output, re.DOTALL) if match: gpu_data_json = match.group(1) gpus_info = json.loads(gpu_data_json) @@ -193,5 +193,5 @@ class Blender(BaseRenderEngine): if __name__ == "__main__": - x = Blender().get_render_devices() + x = Blender().system_info() print(x) diff --git a/src/engines/engine_manager.py b/src/engines/engine_manager.py index b4ba308..cd1e6ba 100644 --- a/src/engines/engine_manager.py +++ b/src/engines/engine_manager.py @@ -20,128 +20,78 @@ class EngineManager: if possible. """ + _default_instance: Optional['EngineManager'] = None + engines_path: Optional[str] = None download_tasks: List[Any] = [] + def __init__(self) -> None: + self.engines_path: Optional[str] = None + self.download_tasks: List[Any] = [] + + @classmethod + def _sync_class(cls) -> None: + if cls._default_instance is not None: + cls.engines_path = cls._default_instance.engines_path + cls.download_tasks = cls._default_instance.download_tasks + @staticmethod def supported_engines() -> list[type[BaseRenderEngine]]: - """Return list of supported engine classes. - - Returns: - List[Type[BaseRenderEngine]]: List of available engine classes. - """ return ENGINE_CLASSES # --- Installed Engines --- - @classmethod - def engine_class_for_project_path(cls, path: str) -> Type[BaseRenderEngine]: - """Find engine class that can handle the given project file. - - Args: - path: Path to project file. - - Returns: - Type[BaseRenderEngine]: Engine class that can handle the file. - """ + def _engine_class_for_project_path(self, path: str) -> Type[BaseRenderEngine]: _, extension = os.path.splitext(path) extension = extension.lower().strip('.') - for engine_class in cls.supported_engines(): - engine = cls.get_latest_engine_instance(engine_class) + for engine_class in self.supported_engines(): + engine = self.get_latest_engine_instance(engine_class) if extension in engine.supported_extensions(): return engine_class - undefined_renderer_support = [x for x in cls.supported_engines() if not cls.get_latest_engine_instance(x).supported_extensions()] + undefined_renderer_support = [x for x in self.supported_engines() if not self.get_latest_engine_instance(x).supported_extensions()] return undefined_renderer_support[0] - @classmethod - def engine_class_with_name(cls, engine_name: str) -> Optional[Type[BaseRenderEngine]]: - """Find engine class by name. - - Args: - engine_name: Name of engine to find. - - Returns: - Optional[Type[BaseRenderEngine]]: Engine class if found, None otherwise. - """ - for obj in cls.supported_engines(): + def _engine_class_with_name(self, engine_name: str) -> Optional[Type[BaseRenderEngine]]: + for obj in self.supported_engines(): if obj.name().lower() == engine_name.lower(): return obj return None - @classmethod - def get_latest_engine_instance(cls, engine_class: Type[BaseRenderEngine]) -> BaseRenderEngine: - """Create instance of latest installed engine version. - - Args: - engine_class: Engine class to instantiate. - - Returns: - BaseRenderEngine: Instance of engine with latest version. - """ - newest = cls.newest_installed_engine_data(engine_class.name()) + def _get_latest_engine_instance(self, engine_class: Type[BaseRenderEngine]) -> BaseRenderEngine: + newest = self.newest_installed_engine_data(engine_class.name()) engine = engine_class(newest["path"]) return engine - @classmethod - def get_installed_engine_data(cls, filter_name: Optional[str] = None, include_corrupt: bool = False, + def _get_installed_engine_data(self, filter_name: Optional[str] = None, include_corrupt: bool = False, ignore_system: bool = False) -> List[Dict[str, Any]]: - """Get data about installed render engines. - - Args: - filter_name: Optional engine name to filter by. - include_corrupt: Whether to include potentially corrupted installations. - ignore_system: Whether to ignore system-installed engines. - - Returns: - List[Dict[str, Any]]: List of installed engine data. - - Raises: - FileNotFoundError: If engines path is not set. - """ - if not cls.engines_path: + if not self.engines_path: raise FileNotFoundError("Engine path is not set") - # Parse downloaded engine directory results = [] try: - all_items = os.listdir(cls.engines_path) - all_directories = [item for item in all_items if os.path.isdir(os.path.join(cls.engines_path, item))] - keys = ["engine", "version", "system_os", "cpu"] # Define keys for result dictionary + all_items = os.listdir(self.engines_path) + all_directories = [item for item in all_items if os.path.isdir(os.path.join(self.engines_path, item))] + keys = ["engine", "version", "system_os", "cpu"] for directory in all_directories: - # Split directory name into segments segments = directory.split('-') - # Create a dictionary mapping keys to corresponding segments result_dict = {keys[i]: segments[i] for i in range(min(len(keys), len(segments)))} result_dict['type'] = 'managed' - # Initialize binary_name with engine name binary_name = result_dict['engine'].lower() - # Determine the correct binary name based on the engine and system_os - eng = cls.engine_class_with_name(result_dict['engine']) + eng = self.engine_class_with_name(result_dict['engine']) binary_name = eng.binary_names.get(result_dict['system_os'], binary_name) - # Find the path to the binary file - search_root = Path(cls.engines_path) / directory + search_root = self.engines_path / directory match = next((p for p in search_root.rglob(binary_name) if p.is_file()), None) path = str(match) if match else None result_dict['path'] = path - # fetch version number from binary - helps detect corrupted downloads - disabled due to perf issues - # binary_version = eng(path).version() - # if not binary_version: - # logger.warning(f"Possible corrupt {eng.name()} {result_dict['version']} install detected: {path}") - # if not include_corrupt: - # continue - # result_dict['version'] = binary_version or 'error' - - # Add the result dictionary to results if it matches the filter_name or if no filter is applied if not filter_name or filter_name == result_dict['engine']: results.append(result_dict) except FileNotFoundError as e: logger.warning(f"Cannot find local engines download directory: {e}") - # add system installs to this list - use bg thread because it can be slow def fetch_engine_details(eng, include_corrupt=False): version = eng().version() if not version and not include_corrupt: @@ -160,7 +110,7 @@ class EngineManager: with concurrent.futures.ThreadPoolExecutor() as executor: futures = { executor.submit(fetch_engine_details, eng, include_corrupt): eng.name() - for eng in cls.supported_engines() + for eng in self.supported_engines() if eng.default_engine_path() and (not filter_name or filter_name == eng.name()) } @@ -173,96 +123,55 @@ class EngineManager: # --- Check for Updates --- - @classmethod - def update_all_engines(cls) -> None: - """Check for and download updates for all downloadable engines.""" - for engine in cls.downloadable_engines(): - update_available = cls.is_engine_update_available(engine) + def _update_all_engines(self) -> None: + for engine in self.downloadable_engines(): + update_available = self.is_engine_update_available(engine) if update_available: update_available['name'] = engine.name() - cls.download_engine(engine.name(), update_available['version'], background=True) + self.download_engine(engine.name(), update_available['version'], background=True) - @classmethod - def all_version_data_for_engine(cls, engine_name:str, include_corrupt=False, ignore_system=False) -> list: - """Get all version data for a specific engine. - - Args: - engine_name: Name of engine to query. - include_corrupt: Whether to include corrupt installations. - ignore_system: Whether to ignore system installations. - - Returns: - list: Sorted list of engine version data (newest first). - """ - versions = cls.get_installed_engine_data(filter_name=engine_name, include_corrupt=include_corrupt, ignore_system=ignore_system) + def _all_version_data_for_engine(self, engine_name: str, include_corrupt=False, ignore_system=False) -> list: + versions = self.get_installed_engine_data(filter_name=engine_name, include_corrupt=include_corrupt, ignore_system=ignore_system) sorted_versions = sorted(versions, key=lambda x: x['version'], reverse=True) return sorted_versions - @classmethod - def newest_installed_engine_data(cls, engine_name:str, system_os=None, cpu=None, ignore_system=None) -> list: - """Get newest installed engine data for specific platform. - - Args: - engine_name: Name of engine to query. - system_os: Operating system to filter by (defaults to current). - cpu: CPU architecture to filter by (defaults to current). - ignore_system: Whether to ignore system installations. - - Returns: - list: Newest engine data or empty list if not found. - """ + def _newest_installed_engine_data(self, engine_name: str, system_os=None, cpu=None, ignore_system=None) -> list: system_os = system_os or current_system_os() cpu = cpu or current_system_cpu() try: - filtered = [x for x in cls.all_version_data_for_engine(engine_name, ignore_system=ignore_system) + filtered = [x for x in self.all_version_data_for_engine(engine_name, ignore_system=ignore_system) if x['system_os'] == system_os and x['cpu'] == cpu] return filtered[0] except IndexError: logger.error(f"Cannot find newest engine version for {engine_name}-{system_os}-{cpu}") return [] - @classmethod - def is_version_installed(cls, engine_name:str, version:str, system_os=None, cpu=None, ignore_system=False): - """Check if specific engine version is installed. - - Args: - engine_name: Name of engine to check. - version: Version string to check. - system_os: Operating system to check (defaults to current). - cpu: CPU architecture to check (defaults to current). - ignore_system: Whether to ignore system installations. - - Returns: - Engine data if found, False otherwise. - """ + def _is_version_installed(self, engine_name: str, version: str, system_os=None, cpu=None, ignore_system=False): system_os = system_os or current_system_os() cpu = cpu or current_system_cpu() - filtered = [x for x in cls.get_installed_engine_data(filter_name=engine_name, ignore_system=ignore_system) if + filtered = [x for x in self.get_installed_engine_data(filter_name=engine_name, ignore_system=ignore_system) if x['system_os'] == system_os and x['cpu'] == cpu and x['version'] == version] return filtered[0] if filtered else False - @classmethod - def version_is_available_to_download(cls, engine_name:str, version, system_os=None, cpu=None): + def _version_is_available_to_download(self, engine_name: str, version, system_os=None, cpu=None): try: - downloader = cls.engine_class_with_name(engine_name).downloader() + downloader = self.engine_class_with_name(engine_name).downloader() return downloader.version_is_available_to_download(version=version, system_os=system_os, cpu=cpu) except Exception as e: logger.debug(f"Exception in version_is_available_to_download: {e}") return None - @classmethod - def find_most_recent_version(cls, engine_name:str, system_os=None, cpu=None, lts_only=False) -> dict: + def _find_most_recent_version(self, engine_name: str, system_os=None, cpu=None, lts_only=False) -> dict: try: - downloader = cls.engine_class_with_name(engine_name).downloader() + downloader = self.engine_class_with_name(engine_name).downloader() return downloader.find_most_recent_version(system_os=system_os, cpu=cpu) except Exception as e: logger.debug(f"Exception in find_most_recent_version: {e}") return {} - @classmethod - def is_engine_update_available(cls, engine_class: Type[BaseRenderEngine], ignore_system_installs=False): + def _is_engine_update_available(self, engine_class: Type[BaseRenderEngine], ignore_system_installs=False): logger.debug(f"Checking for updates to {engine_class.name()}") latest_version = engine_class.downloader().find_most_recent_version() @@ -271,7 +180,7 @@ class EngineManager: return None version_num = latest_version.get('version') - if cls.is_version_installed(engine_class.name(), version_num, ignore_system=ignore_system_installs): + if self.is_version_installed(engine_class.name(), version_num, ignore_system=ignore_system_installs): logger.debug(f"Latest version of {engine_class.name()} ({version_num}) already downloaded") return None @@ -279,18 +188,11 @@ class EngineManager: # --- Downloads --- - @classmethod - def downloadable_engines(cls): - """Get list of engines that support downloading. + def _downloadable_engines(self): + return [engine for engine in self.supported_engines() if hasattr(engine, "downloader") and engine.downloader()] - Returns: - List[Type[BaseRenderEngine]]: Engines with downloader capability. - """ - return [engine for engine in cls.supported_engines() if hasattr(engine, "downloader") and engine.downloader()] - - @classmethod - def get_existing_download_task(cls, engine_name, version, system_os=None, cpu=None): - for task in cls.download_tasks: + def _get_existing_download_task(self, engine_name, version, system_os=None, cpu=None): + for task in self.download_tasks: task_parts = task.name.split('-') task_engine, task_version, task_system_os, task_cpu = task_parts[:4] @@ -299,50 +201,45 @@ class EngineManager: return task return None - @classmethod - def download_engine(cls, engine_name, version, system_os=None, cpu=None, background=False, ignore_system=False): - - engine_to_download = cls.engine_class_with_name(engine_name) - existing_task = cls.get_existing_download_task(engine_name, version, system_os, cpu) + def _download_engine(self, engine_name, version, system_os=None, cpu=None, background=False, ignore_system=False): + engine_to_download = self.engine_class_with_name(engine_name) + existing_task = self.get_existing_download_task(engine_name, version, system_os, cpu) if existing_task: logger.debug(f"Already downloading {engine_name} {version}") if not background: - existing_task.join() # If download task exists, wait until it's done downloading + existing_task.join() return None elif not engine_to_download.downloader(): logger.warning("No valid downloader for this engine. Please update this software manually.") return None - elif not cls.engines_path: + elif not self.engines_path: raise FileNotFoundError("Engines path must be set before requesting downloads") thread = EngineDownloadWorker(engine_name, version, system_os, cpu) - cls.download_tasks.append(thread) + self.download_tasks.append(thread) thread.start() if background: return thread thread.join() - found_engine = cls.is_version_installed(engine_name, version, system_os, cpu, ignore_system) # Check that engine downloaded + found_engine = self.is_version_installed(engine_name, version, system_os, cpu, ignore_system) if not found_engine: logger.error(f"Error downloading {engine_name}") return found_engine - @classmethod - def delete_engine_download(cls, engine_name, version, system_os=None, cpu=None): + def _delete_engine_download(self, engine_name, version, system_os=None, cpu=None): logger.info(f"Requested deletion of engine: {engine_name}-{version}") - found = cls.is_version_installed(engine_name, version, system_os, cpu) - if found and found['type'] == 'managed': # don't delete system installs - # find the root directory of the engine executable + found = self.is_version_installed(engine_name, version, system_os, cpu) + if found and found['type'] == 'managed': root_dir_name = '-'.join([engine_name, version, found['system_os'], found['cpu']]) remove_path = os.path.join(found['path'].split(root_dir_name)[0], root_dir_name) - # delete the file path logger.info(f"Deleting engine at path: {remove_path}") shutil.rmtree(remove_path, ignore_errors=False) logger.info(f"Engine {engine_name}-{version}-{found['system_os']}-{found['cpu']} successfully deleted") return True - elif found: # these are managed by the system / user. Don't delete these. + elif found: logger.error(f'Cannot delete requested {engine_name} {version}. Managed externally.') else: logger.error(f"Cannot find engine: {engine_name}-{version}") @@ -350,52 +247,16 @@ class EngineManager: # --- Background Tasks --- - @classmethod - def active_downloads(cls) -> list: - """Get list of currently active download tasks. + def _active_downloads(self) -> list: + return [x for x in self.download_tasks if x.is_alive()] - Returns: - list: List of active EngineDownloadWorker threads. - """ - return [x for x in cls.download_tasks if x.is_alive()] + def _create_worker(self, engine_name: str, input_path: Path, output_path: Path, engine_version=None, args=None, parent=None, name=None): + worker_class = self.engine_class_with_name(engine_name).worker_class() - @classmethod - def create_worker(cls, engine_name: str, input_path: Path, output_path: Path, engine_version=None, args=None, parent=None, name=None): - """ - Create and return a worker instance for a specific engine. - - This resolves the appropriate engine binary/path for the requested engine and version, - downloading the engine if necessary (when a specific version is requested and not found - locally). The returned worker is constructed with string paths for compatibility with - worker implementations that expect `str` rather than `Path`. - - Args: - engine_name: The engine name used to resolve an engine class and its worker. - input_path: Path to the input file/folder for the worker to process. - output_path: Path where the worker should write output. - engine_version: Optional engine version to use. If `None` or `'latest'`, the newest - installed version is used. If a specific version is provided and not installed, - the engine will be downloaded. - args: Optional arguments passed through to the worker (engine-specific). - parent: Optional Qt/GUI parent object passed through to the worker constructor. - name: Optional name/label passed through to the worker constructor. - - Returns: - An instance of the engine-specific worker class. - - Raises: - FileNotFoundError: If no versions of the engine are installed, if the requested - version cannot be found or downloaded, or if the engine path cannot be resolved. - """ - - worker_class = cls.engine_class_with_name(engine_name).worker_class() - - # check to make sure we have versions installed - all_versions = cls.all_version_data_for_engine(engine_name) + all_versions = self.all_version_data_for_engine(engine_name) if not all_versions: raise FileNotFoundError(f"Cannot find any installed '{engine_name}' engines") - # Find the path to the requested engine version or use default engine_path = None if engine_version and engine_version != 'latest': for ver in all_versions: @@ -403,9 +264,8 @@ class EngineManager: engine_path = ver['path'] break - # Download the required engine if not found locally if not engine_path: - download_result = cls.download_engine(engine_name, engine_version) + download_result = self.download_engine(engine_name, engine_version) if not download_result: raise FileNotFoundError(f"Cannot download requested version: {engine_name} {engine_version}") engine_path = download_result['path'] @@ -420,28 +280,109 @@ class EngineManager: return worker_class(input_path=str(input_path), output_path=str(output_path), engine_path=engine_path, args=args, parent=parent, name=name) + # --- Forwarders for backward compatibility --- + + @classmethod + def engine_class_for_project_path(cls, path): + if cls._default_instance is not None: + return cls._default_instance._engine_class_for_project_path(path) + + @classmethod + def engine_class_with_name(cls, engine_name): + if cls._default_instance is not None: + return cls._default_instance._engine_class_with_name(engine_name) + + @classmethod + def get_latest_engine_instance(cls, engine_class): + if cls._default_instance is not None: + return cls._default_instance._get_latest_engine_instance(engine_class) + + @classmethod + def get_installed_engine_data(cls, filter_name=None, include_corrupt=False, ignore_system=False): + if cls._default_instance is not None: + return cls._default_instance._get_installed_engine_data(filter_name, include_corrupt, ignore_system) + return [] + + @classmethod + def update_all_engines(cls): + if cls._default_instance is not None: + cls._default_instance._update_all_engines() + + @classmethod + def all_version_data_for_engine(cls, engine_name, include_corrupt=False, ignore_system=False): + if cls._default_instance is not None: + return cls._default_instance._all_version_data_for_engine(engine_name, include_corrupt, ignore_system) + return [] + + @classmethod + def newest_installed_engine_data(cls, engine_name, system_os=None, cpu=None, ignore_system=None): + if cls._default_instance is not None: + return cls._default_instance._newest_installed_engine_data(engine_name, system_os, cpu, ignore_system) + return [] + + @classmethod + def is_version_installed(cls, engine_name, version, system_os=None, cpu=None, ignore_system=False): + if cls._default_instance is not None: + return cls._default_instance._is_version_installed(engine_name, version, system_os, cpu, ignore_system) + return False + + @classmethod + def version_is_available_to_download(cls, engine_name, version, system_os=None, cpu=None): + if cls._default_instance is not None: + return cls._default_instance._version_is_available_to_download(engine_name, version, system_os, cpu) + return None + + @classmethod + def find_most_recent_version(cls, engine_name, system_os=None, cpu=None, lts_only=False): + if cls._default_instance is not None: + return cls._default_instance._find_most_recent_version(engine_name, system_os, cpu, lts_only) + return {} + + @classmethod + def is_engine_update_available(cls, engine_class, ignore_system_installs=False): + if cls._default_instance is not None: + return cls._default_instance._is_engine_update_available(engine_class, ignore_system_installs) + return None + + @classmethod + def downloadable_engines(cls): + if cls._default_instance is not None: + return cls._default_instance._downloadable_engines() + return [] + + @classmethod + def get_existing_download_task(cls, engine_name, version, system_os=None, cpu=None): + if cls._default_instance is not None: + return cls._default_instance._get_existing_download_task(engine_name, version, system_os, cpu) + return None + + @classmethod + def download_engine(cls, engine_name, version, system_os=None, cpu=None, background=False, ignore_system=False): + if cls._default_instance is not None: + return cls._default_instance._download_engine(engine_name, version, system_os, cpu, background, ignore_system) + return None + + @classmethod + def delete_engine_download(cls, engine_name, version, system_os=None, cpu=None): + if cls._default_instance is not None: + return cls._default_instance._delete_engine_download(engine_name, version, system_os, cpu) + return False + + @classmethod + def active_downloads(cls): + if cls._default_instance is not None: + return cls._default_instance._active_downloads() + return [] + + @classmethod + def create_worker(cls, engine_name, input_path, output_path, engine_version=None, args=None, parent=None, name=None): + if cls._default_instance is not None: + return cls._default_instance._create_worker(engine_name, input_path, output_path, engine_version, args, parent, name) + raise RuntimeError("EngineManager is not initialized") + class EngineDownloadWorker(threading.Thread): - """A thread worker for downloading a specific version of a rendering engine. - - This class handles the process of downloading a rendering engine in a separate thread, - ensuring that the download process does not block the main application. - - Attributes: - engine (str): The name of the rendering engine to download. - version (str): The version of the rendering engine to download. - system_os (str, optional): The operating system for which to download the engine. Defaults to current OS type. - cpu (str, optional): Requested CPU architecture. Defaults to system CPU type. - """ def __init__(self, engine, version, system_os=None, cpu=None): - """Initialize download worker for specific engine version. - - Args: - engine: Name of engine to download. - version: Version of engine to download. - system_os: Target operating system (defaults to current). - cpu: Target CPU architecture (defaults to current). - """ super().__init__() self.engine = engine self.version = version @@ -450,35 +391,27 @@ class EngineDownloadWorker(threading.Thread): self.percent_complete = 0 def _update_progress(self, current_progress): - """Update download progress. - - Args: - current_progress: Current download progress percentage (0-100). - """ self.percent_complete = current_progress -def run(self): - """Execute the download process. + def run(self): + try: + existing_download = EngineManager.is_version_installed(self.engine, self.version, self.system_os, self.cpu, + ignore_system=True) + if existing_download: + logger.info(f"Requested download of {self.engine} {self.version}, but local copy already exists") + return existing_download - Checks if engine version already exists, then downloads if not found. - Handles cleanup and error reporting. - """ - try: - existing_download = EngineManager.is_version_installed(self.engine, self.version, self.system_os, self.cpu, - ignore_system=True) - if existing_download: - logger.info(f"Requested download of {self.engine} {self.version}, but local copy already exists") - return existing_download - - # Get the appropriate downloader class based on the engine type - downloader = EngineManager.engine_class_with_name(self.engine).downloader() - downloader.download_engine( self.version, download_location=EngineManager.engines_path, - system_os=self.system_os, cpu=self.cpu, timeout=300, progress_callback=self._update_progress) - except Exception as e: - logger.error(f"Error in download worker: {e}") - finally: - # remove itself from the downloader list - EngineManager.download_tasks.remove(self) + downloader = EngineManager.engine_class_with_name(self.engine).downloader() + downloader.download_engine(self.version, download_location=EngineManager.engines_path, + system_os=self.system_os, cpu=self.cpu, timeout=300, progress_callback=self._update_progress) + except Exception as e: + logger.error(f"Error in download worker: {e}") + finally: + try: + if EngineManager._default_instance is not None: + EngineManager._default_instance.download_tasks.remove(self) + except ValueError: + pass if __name__ == '__main__': diff --git a/src/render_queue.py b/src/render_queue.py index b94612f..beb41d5 100755 --- a/src/render_queue.py +++ b/src/render_queue.py @@ -1,12 +1,13 @@ import logging +import threading from collections import Counter from datetime import datetime from pathlib import Path -from typing import List, Dict, Any, Optional +from typing import Any, Dict, List, Optional from pubsub import pub from sqlalchemy import create_engine -from sqlalchemy.orm import sessionmaker +from sqlalchemy.orm import Session, sessionmaker from sqlalchemy.orm.exc import DetachedInstanceError from src.engines.core.base_worker import Base, BaseRenderWorker @@ -25,182 +26,274 @@ class JobNotFoundError(Exception): class RenderQueue: - engine: Optional[create_engine] = None - session: Optional[sessionmaker] = None - job_queue: List[BaseRenderWorker] = [] - maximum_renderer_instances: Dict[str, int] = {'blender': 1, 'aerender': 1, 'ffmpeg': 4} - last_saved_counts: Dict[str, int] = {} - is_running: bool = False + _default_instance: Optional['RenderQueue'] = None + + @classmethod + def _sync_class(cls) -> None: + if cls._default_instance is not None: + pass # no class-level attributes to sync + + def __init__(self) -> None: + self.engine: Optional[create_engine] = None + self.session: Optional[Session] = None + self.job_queue: List[BaseRenderWorker] = [] + self.maximum_renderer_instances: Dict[str, int] = {'blender': 1, 'aerender': 1, 'ffmpeg': 4} + self.last_saved_counts: Dict[str, int] = {} + self.is_running: bool = False + self._lock = threading.Lock() # -------------------------------------------- # Render Queue Evaluation: # -------------------------------------------- - @classmethod - def start(cls): - """Start evaluating the render queue""" + def _start(self) -> None: logger.debug("Starting render queue updates") - cls.is_running = True - cls.evaluate_queue() + self.is_running = True + self.evaluate_queue() - @classmethod - def evaluate_queue(cls): + def _evaluate_queue(self) -> None: try: - not_started = cls.jobs_with_status(RenderStatus.NOT_STARTED, priority_sorted=True) + not_started = self.jobs_with_status(RenderStatus.NOT_STARTED, priority_sorted=True) for job in not_started: - if cls.is_available_for_job(job.engine_name, job.priority): - cls.start_job(job) + if self.is_available_for_job(job.engine_name, job.priority): + self.start_job(job) - scheduled = cls.jobs_with_status(RenderStatus.SCHEDULED, priority_sorted=True) + scheduled = self.jobs_with_status(RenderStatus.SCHEDULED, priority_sorted=True) for job in scheduled: if job.scheduled_start <= datetime.now(): logger.debug(f"Starting scheduled job: {job}") - cls.start_job(job) + self.start_job(job) - if cls.last_saved_counts != cls.job_counts(): - cls.save_state() + if self.last_saved_counts != self.job_counts(): + self.save_state() except DetachedInstanceError: pass - @classmethod - def __local_job_status_changed(cls, job_id, old_status, new_status): - render_job = RenderQueue.job_with_id(job_id, none_ok=True) - if render_job and cls.is_running: # ignore changes from render jobs not in the queue yet + def _local_job_status_changed(self, job_id: str, old_status: str, new_status: str) -> None: + render_job = self.job_with_id(job_id, none_ok=True) + if render_job and self.is_running: logger.debug(f"RenderQueue detected job {job_id} has changed from {old_status} -> {new_status}") - RenderQueue.evaluate_queue() + self.evaluate_queue() - @classmethod - def stop(cls): + def _stop(self) -> None: logger.debug("Stopping render queue updates") - cls.is_running = False + self.is_running = False # -------------------------------------------- # Fetch Jobs: # -------------------------------------------- - @classmethod - def all_jobs(cls): - return cls.job_queue + def _all_jobs(self) -> List[BaseRenderWorker]: + return self.job_queue - @classmethod - def running_jobs(cls): - return cls.jobs_with_status(RenderStatus.RUNNING) + def _running_jobs(self) -> List[BaseRenderWorker]: + return self.jobs_with_status(RenderStatus.RUNNING) - @classmethod - def pending_jobs(cls): - pending_jobs = cls.jobs_with_status(RenderStatus.NOT_STARTED) - pending_jobs.extend(cls.jobs_with_status(RenderStatus.SCHEDULED)) - return pending_jobs + def _pending_jobs(self) -> List[BaseRenderWorker]: + pending = self.jobs_with_status(RenderStatus.NOT_STARTED) + pending.extend(self.jobs_with_status(RenderStatus.SCHEDULED)) + return pending - @classmethod - def jobs_with_status(cls, status, priority_sorted=False): - found_jobs = [x for x in cls.all_jobs() if x.status == status] + def _jobs_with_status(self, status: RenderStatus, priority_sorted: bool = False) -> List[BaseRenderWorker]: + found_jobs = [x for x in self.all_jobs() if x.status == status] if priority_sorted: found_jobs = sorted(found_jobs, key=lambda a: a.priority, reverse=False) return found_jobs - @classmethod - def job_with_id(cls, job_id, none_ok=False): - found_job = next((x for x in cls.all_jobs() if x.id == job_id), None) + def _job_with_id(self, job_id: str, none_ok: bool = False) -> Optional[BaseRenderWorker]: + found_job = next((x for x in self.all_jobs() if x.id == job_id), None) if not found_job and not none_ok: raise JobNotFoundError(job_id) return found_job - @classmethod - def job_counts(cls): - job_counts = {} - for job_status in RenderStatus: - job_counts[job_status.value] = len(cls.jobs_with_status(job_status)) - return job_counts + def _job_counts(self) -> Dict[str, int]: + counts = Counter(x.status for x in self.all_jobs()) + return {s.value: counts.get(s, 0) for s in RenderStatus} # -------------------------------------------- # Startup / Shutdown: # -------------------------------------------- - @classmethod - def load_state(cls, database_directory: Path): - if not cls.engine: - cls.engine = create_engine(f"sqlite:///{database_directory / 'database.db'}") - Base.metadata.create_all(cls.engine) - cls.session = sessionmaker(bind=cls.engine)() - cls.job_queue = cls.session.query(BaseRenderWorker).all() - pub.subscribe(cls.__local_job_status_changed, 'status_change') + def _load_state(self, database_directory: Path) -> None: + self.engine = create_engine(f"sqlite:///{database_directory / 'database.db'}") + Base.metadata.create_all(self.engine) + self.session = sessionmaker(bind=self.engine)() + from src.engines.core.base_worker import BaseRenderWorker + self.job_queue = self.session.query(BaseRenderWorker).all() + pub.subscribe(self._local_job_status_changed, 'status_change') - @classmethod - def save_state(cls): - cls.session.commit() + def _save_state(self) -> None: + if self.session: + self.session.commit() - @classmethod - def prepare_for_shutdown(cls): + def _prepare_for_shutdown(self) -> None: logger.debug("Closing session") - cls.stop() - running_jobs = cls.jobs_with_status(RenderStatus.RUNNING) # cancel all running jobs - _ = [cls.cancel_job(job) for job in running_jobs] - cls.save_state() - cls.session.close() + self.stop() + running_jobs = self.jobs_with_status(RenderStatus.RUNNING) + for job in running_jobs: + self.cancel_job(job) + self.save_state() + if self.session: + self.session.close() # -------------------------------------------- # Renderer Availability: # -------------------------------------------- - @classmethod - def renderer_instances(cls): - all_instances = [x.engine_name for x in cls.running_jobs()] + def renderer_instances(self) -> Counter: + all_instances = [x.engine_name for x in self.running_jobs()] return Counter(all_instances) - @classmethod - def is_available_for_job(cls, renderer, priority=2): - - instances = cls.renderer_instances() - higher_priority_jobs = [x for x in cls.running_jobs() if x.priority < priority] - max_allowed_instances = cls.maximum_renderer_instances.get(renderer, 1) - maxed_out_instances = renderer in instances.keys() and instances[renderer] >= max_allowed_instances + def _is_available_for_job(self, renderer: str, priority: int = 2) -> bool: + instances = self.renderer_instances() + higher_priority_jobs = [x for x in self.running_jobs() if x.priority < priority] + max_allowed_instances = self.maximum_renderer_instances.get(renderer, 1) + maxed_out_instances = renderer in instances and instances[renderer] >= max_allowed_instances return not maxed_out_instances and not higher_priority_jobs # -------------------------------------------- # Job Lifecycle Management: # -------------------------------------------- - @classmethod - def add_to_render_queue(cls, render_job, force_start=False): + def _add_to_render_queue(self, render_job: BaseRenderWorker, force_start: bool = False) -> None: logger.info(f"Adding job to render queue: {render_job}") - cls.job_queue.append(render_job) - if cls.is_running and force_start and render_job.status in (RenderStatus.NOT_STARTED, RenderStatus.SCHEDULED): - cls.start_job(render_job) - cls.session.add(render_job) - cls.save_state() - if cls.is_running: - cls.evaluate_queue() + with self._lock: + self.job_queue.append(render_job) + if force_start and render_job.status in (RenderStatus.NOT_STARTED, RenderStatus.SCHEDULED): + self.start_job(render_job) + self.session.add(render_job) + self.save_state() + if self.is_running: + self.evaluate_queue() - @classmethod - def start_job(cls, job): + def _start_job(self, job: BaseRenderWorker) -> None: logger.info(f'Starting job: {job}') job.start() - cls.save_state() + self.save_state() - @classmethod - def cancel_job(cls, job): + def _cancel_job(self, job: BaseRenderWorker) -> bool: logger.info(f'Cancelling job: {job}') job.stop() return job.status == RenderStatus.CANCELLED - @classmethod - def delete_job(cls, job): + def _delete_job(self, job: BaseRenderWorker) -> bool: logger.info(f"Deleting job: {job}") - job.stop() - cls.job_queue.remove(job) - cls.session.delete(job) - cls.save_state() + with self._lock: + job.stop() + self.job_queue.remove(job) + self.session.delete(job) + self.save_state() return True # -------------------------------------------- # Miscellaneous: # -------------------------------------------- + def _clear_history(self) -> None: + for job in list(self.all_jobs()): + if job.status in (RenderStatus.CANCELLED, RenderStatus.COMPLETED, RenderStatus.ERROR): + self.delete_job(job) + self.save_state() + + # --- Forwarders for backward compatibility --- + + @classmethod + def start(cls): + if cls._default_instance is not None: + cls._default_instance._start() + + @classmethod + def evaluate_queue(cls): + if cls._default_instance is not None: + cls._default_instance._evaluate_queue() + + @classmethod + def stop(cls): + if cls._default_instance is not None: + cls._default_instance._stop() + + @classmethod + def all_jobs(cls): + if cls._default_instance is not None: + return cls._default_instance.job_queue + return [] + + @classmethod + def running_jobs(cls): + if cls._default_instance is not None: + return cls._default_instance._running_jobs() + return [] + + @classmethod + def pending_jobs(cls): + if cls._default_instance is not None: + return cls._default_instance._pending_jobs() + return [] + + @classmethod + def jobs_with_status(cls, status, priority_sorted=False): + if cls._default_instance is not None: + return cls._default_instance._jobs_with_status(status, priority_sorted) + return [] + + @classmethod + def job_with_id(cls, job_id, none_ok=False): + if cls._default_instance is not None: + return cls._default_instance._job_with_id(job_id, none_ok) + if not none_ok: + raise JobNotFoundError(job_id) + return None + + @classmethod + def job_counts(cls): + if cls._default_instance is not None: + return cls._default_instance._job_counts() + return {} + + @classmethod + def load_state(cls, database_directory): + if cls._default_instance is not None: + cls._default_instance._load_state(database_directory) + + @classmethod + def save_state(cls): + if cls._default_instance is not None: + cls._default_instance._save_state() + + @classmethod + def prepare_for_shutdown(cls): + if cls._default_instance is not None: + cls._default_instance._prepare_for_shutdown() + + @classmethod + def is_available_for_job(cls, renderer, priority=2): + if cls._default_instance is not None: + return cls._default_instance._is_available_for_job(renderer, priority) + return True + + @classmethod + def add_to_render_queue(cls, render_job, force_start=False): + if cls._default_instance is not None: + cls._default_instance._add_to_render_queue(render_job, force_start) + + @classmethod + def start_job(cls, job): + if cls._default_instance is not None: + cls._default_instance._start_job(job) + + @classmethod + def cancel_job(cls, job): + if cls._default_instance is not None: + return cls._default_instance._cancel_job(job) + return False + + @classmethod + def delete_job(cls, job): + if cls._default_instance is not None: + return cls._default_instance._delete_job(job) + return False + @classmethod def clear_history(cls): - to_remove = [x for x in cls.all_jobs() if x.status in [RenderStatus.CANCELLED, - RenderStatus.COMPLETED, RenderStatus.ERROR]] - for job_to_remove in to_remove: - cls.delete_job(job_to_remove) - cls.save_state() + if cls._default_instance is not None: + cls._default_instance._clear_history() diff --git a/src/ui/add_job_window.py b/src/ui/add_job_window.py index ae9da14..1e73688 100644 --- a/src/ui/add_job_window.py +++ b/src/ui/add_job_window.py @@ -1,8 +1,11 @@ +import logging import socket from pathlib import Path import psutil from PyQt6.QtCore import QThread, pyqtSignal, Qt, pyqtSlot + +logger = logging.getLogger(__name__) from PyQt6.QtWidgets import ( QApplication, QWidget, QVBoxLayout, QHBoxLayout, QLabel, QLineEdit, QPushButton, QFileDialog, QSpinBox, QComboBox, QGroupBox, QCheckBox, QProgressBar, QPlainTextEdit, QDoubleSpinBox, QMessageBox, QListWidget, QListWidgetItem, @@ -306,7 +309,7 @@ class NewRenderJobForm(QWidget): def update_job_count(self, changed_item=None): checked = 1 - if self.cameras_group.enabled: + if self.cameras_group.isEnabled(): checked = 0 total = self.cameras_list.count() @@ -463,8 +466,8 @@ class NewRenderJobForm(QWidget): text_box = QLineEdit() h_layout.addWidget(text_box) self.engine_options_layout.addLayout(h_layout) - except AttributeError: - pass + except AttributeError as e: + logger.error(f"AttributeError in post_get_project_info_update: {e}") def toggle_engine_enablement(self, enabled=False): """Toggle on/off all the render settings""" diff --git a/src/ui/main_window.py b/src/ui/main_window.py index a4029c3..daa3871 100644 --- a/src/ui/main_window.py +++ b/src/ui/main_window.py @@ -247,9 +247,8 @@ class MainWindow(QMainWindow): # Update server information display self.update_server_info_display(new_hostname) - except AttributeError: - # Handle cases where the server list view might not be properly initialized - pass + except AttributeError as e: + logger.error(f"AttributeError in server_picked: {e}") def update_server_info_display(self, hostname): """Updates the server information section of the UI.""" @@ -405,7 +404,8 @@ class MainWindow(QMainWindow): id_item = self.job_list_view.item(selected_row.row(), 0) job_ids.append(id_item.text()) return job_ids - except AttributeError: + except AttributeError as e: + logger.error(f"AttributeError in selected_job_ids: {e}") return [] diff --git a/src/utilities/config.py b/src/utilities/config.py index ffb4ab6..c152f3a 100644 --- a/src/utilities/config.py +++ b/src/utilities/config.py @@ -1,12 +1,24 @@ import os from pathlib import Path +from typing import Optional import yaml from src.utilities.misc_helper import current_system_os, copy_directory_contents +_CONFIG_ATTRS = [ + 'upload_folder', 'update_engines_on_launch', 'max_content_path', + 'server_log_level', 'log_buffer_length', 'worker_process_timeout', + 'flask_log_level', 'flask_debug_enable', 'queue_eval_seconds', + 'port_number', 'enable_split_jobs', 'download_timeout_seconds', +] + class Config: - # Initialize class variables with default values + _default_instance: Optional['Config'] = None + + # Class-level defaults — mutated by _sync_class() so existing + # callers (Config.upload_folder) continue to work during the + # migration to instance-based access. upload_folder = "~/zordon-uploads/" update_engines_on_launch = True max_content_path = 100000000 @@ -20,23 +32,30 @@ class Config: enable_split_jobs = True download_timeout_seconds = 120 - @classmethod - def load_config(cls, config_path): + def __init__(self) -> None: + for attr in _CONFIG_ATTRS: + setattr(self, attr, getattr(Config, attr)) + + def load(self, config_path: Path) -> None: with open(config_path, 'r') as ymlfile: cfg = yaml.safe_load(ymlfile) + for attr in _CONFIG_ATTRS: + if attr in cfg: + setattr(self, attr, cfg[attr]) + self.upload_folder = str(Path(self.upload_folder).expanduser()) - cls.upload_folder = str(Path(cfg.get('upload_folder', cls.upload_folder)).expanduser()) - cls.update_engines_on_launch = cfg.get('update_engines_on_launch', cls.update_engines_on_launch) - cls.max_content_path = cfg.get('max_content_path', cls.max_content_path) - cls.server_log_level = cfg.get('server_log_level', cls.server_log_level) - cls.log_buffer_length = cfg.get('log_buffer_length', cls.log_buffer_length) - cls.worker_process_timeout = cfg.get('worker_process_timeout', cls.worker_process_timeout) - cls.flask_log_level = cfg.get('flask_log_level', cls.flask_log_level) - cls.flask_debug_enable = cfg.get('flask_debug_enable', cls.flask_debug_enable) - cls.queue_eval_seconds = cfg.get('queue_eval_seconds', cls.queue_eval_seconds) - cls.port_number = cfg.get('port_number', cls.port_number) - cls.enable_split_jobs = cfg.get('enable_split_jobs', cls.enable_split_jobs) - cls.download_timeout_seconds = cfg.get('download_timeout_seconds', cls.download_timeout_seconds) + @classmethod + def _sync_class(cls) -> None: + if cls._default_instance is not None: + for attr in _CONFIG_ATTRS: + setattr(cls, attr, getattr(cls._default_instance, attr)) + + @classmethod + def load_config(cls, config_path: Path) -> None: + instance = Config() + instance.load(config_path) + cls._default_instance = instance + cls._sync_class() @classmethod def config_dir(cls) -> Path: diff --git a/src/utilities/zeroconf_server.py b/src/utilities/zeroconf_server.py index e1d526e..fe6d659 100644 --- a/src/utilities/zeroconf_server.py +++ b/src/utilities/zeroconf_server.py @@ -1,5 +1,6 @@ import logging import socket +from typing import Dict, List, Optional from pubsub import pub from zeroconf import Zeroconf, ServiceInfo, ServiceBrowser, ServiceStateChange, NonUniqueNameException, \ @@ -9,105 +10,144 @@ logger = logging.getLogger() class ZeroconfServer: - service_type = None - server_name = None - server_port = None - server_ip = None - zeroconf = Zeroconf() - service_info = None - client_cache = {} - properties = {} + _default_instance: Optional['ZeroconfServer'] = None - @classmethod - def configure(cls, service_type, server_name, server_port): - cls.service_type = service_type - cls.server_name = server_name - cls.server_port = server_port - try: # Stop any previously running instances + service_type: Optional[str] = None + server_name: Optional[str] = None + server_port: Optional[int] = None + properties: Dict = {} + + def __init__(self) -> None: + self.service_type: Optional[str] = None + self.server_name: Optional[str] = None + self.server_port: Optional[int] = None + self.server_ip: Optional[str] = None + self.zeroconf: Zeroconf = Zeroconf() + self.service_info: Optional[ServiceInfo] = None + self.client_cache: Dict = {} + self.properties: Dict = {} + + def _configure(self, service_type: str, server_name: str, server_port: int) -> None: + self.service_type = service_type + self.server_name = server_name + self.server_port = server_port + try: socket.gethostbyname(socket.gethostname()) except socket.gaierror: - cls.stop() + self.stop() - @classmethod - def start(cls, listen_only=False): - if not cls.service_type: + def _start(self, listen_only: bool = False) -> None: + if not self.service_type: raise RuntimeError("The 'configure' method must be run before starting the zeroconf server") - elif not listen_only: - logger.debug(f"Starting zeroconf service") - cls._register_service() + if not listen_only: + logger.debug("Starting zeroconf service") + self._register_service() else: - logger.debug(f"Starting zeroconf service - Listen only mode") - cls._browse_services() + logger.debug("Starting zeroconf service - Listen only mode") + self._browse_services() - @classmethod - def stop(cls): + def _stop(self) -> None: logger.debug("Stopping zeroconf service") - cls._unregister_service() - cls.zeroconf.close() + self._unregister_service() + if self.zeroconf: + self.zeroconf.close() - @classmethod - def _register_service(cls): + def _register_service(self) -> None: try: - cls.server_ip = socket.gethostbyname(socket.gethostname()) + self.server_ip = socket.gethostbyname(socket.gethostname()) info = ServiceInfo( - cls.service_type, - f"{cls.server_name}.{cls.service_type}", - addresses=[socket.inet_aton(cls.server_ip)], - port=cls.server_port, - properties=cls.properties, + self.service_type, + f"{self.server_name}.{self.service_type}", + addresses=[socket.inet_aton(self.server_ip)], + port=self.server_port, + properties=self.properties, ) - cls.service_info = info - cls.zeroconf.register_service(info) - logger.info(f"Registered zeroconf service: {cls.service_info.name}") + self.service_info = info + self.zeroconf.register_service(info) + logger.info(f"Registered zeroconf service: {self.service_info.name}") except (NonUniqueNameException, socket.gaierror) as e: logger.error(f"Error establishing zeroconf: {e}") - @classmethod - def _unregister_service(cls): - if cls.service_info: - cls.zeroconf.unregister_service(cls.service_info) - logger.info(f"Unregistered zeroconf service: {cls.service_info.name}") - cls.service_info = None + def _unregister_service(self) -> None: + if self.service_info: + self.zeroconf.unregister_service(self.service_info) + logger.info(f"Unregistered zeroconf service: {self.service_info.name}") + self.service_info = None - @classmethod - def _browse_services(cls): - browser = ServiceBrowser(cls.zeroconf, cls.service_type, [cls._on_service_discovered]) - browser.is_alive() + def _browse_services(self) -> None: + ServiceBrowser(self.zeroconf, self.service_type, [self._on_service_discovered]) - @classmethod - def _on_service_discovered(cls, zeroconf, service_type, name, state_change): + def _on_service_discovered(self, zeroconf, service_type, name, state_change) -> None: try: info = zeroconf.get_service_info(service_type, name) - hostname = name.split(f'.{cls.service_type}')[0] + hostname = name.split(f'.{self.service_type}')[0] logger.debug(f"Zeroconf: {hostname} {state_change}") - if service_type == cls.service_type: - if state_change == ServiceStateChange.Added or state_change == ServiceStateChange.Updated: - cls.client_cache[hostname] = info + if service_type == self.service_type: + if state_change in (ServiceStateChange.Added, ServiceStateChange.Updated): + self.client_cache[hostname] = info else: - cls.client_cache.pop(hostname) + self.client_cache.pop(hostname, None) pub.sendMessage('zeroconf_state_change', hostname=hostname, state_change=state_change) except NotRunningException: pass @classmethod - def found_hostnames(cls): + def _sync_class(cls) -> None: + if cls._default_instance is not None: + inst = cls._default_instance + cls.service_type = inst.service_type + cls.server_name = inst.server_name + cls.server_port = inst.server_port + cls.server_ip = inst.server_ip + cls.properties = inst.properties + + def _found_hostnames(self) -> List[str]: local_hostname = socket.gethostname() def sort_key(hostname): - # Return 0 if it's the local hostname so it comes first, else return 1 return False if hostname == local_hostname else True - # Sort the list with the local hostname first - sorted_hostnames = sorted(cls.client_cache.keys(), key=sort_key) + sorted_hostnames = sorted(self.client_cache.keys(), key=sort_key) return sorted_hostnames + def _get_hostname_properties(self, hostname: str) -> Dict: + server_info = self.client_cache.get(hostname) + if server_info is None: + return {} + decoded_server_info = {key.decode('utf-8'): value.decode('utf-8') for key, value in server_info.properties.items()} + return decoded_server_info + + # --- Forwarders for backward compatibility --- + + @classmethod + def configure(cls, service_type, server_name, server_port): + if cls._default_instance is not None: + cls._default_instance._configure(service_type, server_name, server_port) + cls._sync_class() + + @classmethod + def start(cls, listen_only=False): + if cls._default_instance is not None: + cls._default_instance._start(listen_only) + + @classmethod + def stop(cls): + if cls._default_instance is not None: + cls._default_instance._stop() + + @classmethod + def found_hostnames(cls): + if cls._default_instance is not None: + return cls._default_instance._found_hostnames() + return [] + @classmethod def get_hostname_properties(cls, hostname): - server_info = cls.client_cache.get(hostname).properties - decoded_server_info = {key.decode('utf-8'): value.decode('utf-8') for key, value in server_info.items()} - return decoded_server_info + if cls._default_instance is not None: + return cls._default_instance._get_hostname_properties(hostname) + return {} # Example usage: