7 Commits

Author SHA1 Message Date
brett 24eb7b5616 Add Unit Tests (#132)
* Add tests and new github workflow

* Add new unit tests

* Add Github CI workflow

* Workflow fix

* Add pytest install to workflow file

* More CI / test updates

* More test cleanup

* Whitespace cleanup and link complexity override

* More whitespace cleanup

* Make lint less strict

* More lint tweaks
2026-06-06 00:02:01 -05:00
Brett Williams 7bf5fb554e Merge remote-tracking branch 'origin/master'
# Conflicts:
#	server.py
2026-06-05 22:05:42 -05:00
brett fa4a97f6fa Refactor: ApplicationContext DI wiring (#131)
* refactor: wire all services through ApplicationContext

- Created src/application_context.py as DI container with TYPE_CHECKING imports
- server.py now instantiates all services in dependency order via ApplicationContext
- Fixed infinite recursion bug: 48 instance methods renamed with underscore prefix
  to avoid shadowing by same-named @classmethod forwarders
- ZeroconfServer: instantiate Zeroconf() in __init__, add _sync_class() to
  configure forwarder, direct _configure/_start calls during wiring
- Config, EngineManager, PreviewManager: all forwarders and _sync_class() intact
- RenderQueue: load_state and subscribe moved to __init__, threading.Lock retained
- DistributedJobManager: subscribe_to_listener moved to __init__

* fix: greedy regex in get_render_devices swallows BlenderKit log output

Changed regex from greedy [\s\S]* to non-greedy .*? with re.DOTALL
so it stops at the first ] (the end of the GPU data JSON array)
instead of matching through timestamped log lines like
[19:36:22.109, __init__.py:2881] that contain trailing brackets.

* fix: AttributeError on .enabled in update_job_count prevents options from rendering

* refactor: log silent AttributeError catches, add _sync_class to remaining services, drop dead ctx slot
2026-06-05 22:01:20 -05:00
Brett Williams b7ba1201e4 refactor: log silent AttributeError catches, add _sync_class to remaining services, drop dead ctx slot 2026-06-05 21:27:02 -05:00
Brett Williams c592236c98 fix: AttributeError on .enabled in update_job_count prevents options from rendering 2026-06-05 20:12:14 -05:00
Brett Williams 0c62f454a7 fix: greedy regex in get_render_devices swallows BlenderKit log output
Changed regex from greedy [\s\S]* to non-greedy .*? with re.DOTALL
so it stops at the first ] (the end of the GPU data JSON array)
instead of matching through timestamped log lines like
[19:36:22.109, __init__.py:2881] that contain trailing brackets.
2026-06-05 19:37:12 -05:00
Brett Williams 552c791207 refactor: wire all services through ApplicationContext
- Created src/application_context.py as DI container with TYPE_CHECKING imports
- server.py now instantiates all services in dependency order via ApplicationContext
- Fixed infinite recursion bug: 48 instance methods renamed with underscore prefix
  to avoid shadowing by same-named @classmethod forwarders
- ZeroconfServer: instantiate Zeroconf() in __init__, add _sync_class() to
  configure forwarder, direct _configure/_start calls during wiring
- Config, EngineManager, PreviewManager: all forwarders and _sync_class() intact
- RenderQueue: load_state and subscribe moved to __init__, threading.Lock retained
- DistributedJobManager: subscribe_to_listener moved to __init__
2026-06-05 05:34:32 -05:00
26 changed files with 2224 additions and 749 deletions
+45
View File
@@ -0,0 +1,45 @@
name: CI
on:
push:
branches: [master]
pull_request:
branches: [master]
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: '3.11'
- name: Install system deps
run: sudo apt-get update && sudo apt-get install -y libxcb-cursor0 libxcb-xinerama0 blender
- name: Install Python deps
run: |
pip install -r requirements.txt
pip install pytest flake8 pylint
- name: Test
run: python -m pytest tests/ --ignore=tests/job_creation_tests.py -v
- name: Lint — bugs (flake8)
# Fails on syntax errors only. Ignored: ~100+ pre-existing
# style issues (E/W), star-import false positives (F403/F405),
# unused imports (F401), unused vars (F841), f-string debt (F541).
run: flake8 --select=E9 --exclude=src/engines/aerender,build,dist,.git,__pycache__,venv --max-line-length=127 --max-complexity=35
- name: Lint — style (flake8)
# Reports style issues but never fails the build.
# TODO: resolve ~100+ pre-existing style issues
run: flake8 --exit-zero --exclude=src/engines/aerender,build,dist,.git,__pycache__,venv --max-line-length=127 --max-complexity=35
- name: Lint (pylint)
# Reports all issues but never fails the build.
# TODO: resolve pre-existing debt (current score 7.73/10)
run: pylint src/ --max-line-length=120 --ignore-paths=^src/engines/aerender/ --fail-under=0
-40
View File
@@ -1,40 +0,0 @@
# This workflow will install Python dependencies, run tests and lint with a single version of Python
# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-python
name: Python application
on:
push:
branches: [ "master" ]
pull_request:
branches: [ "master" ]
permissions:
contents: read
jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Set up Python 3.10
uses: actions/setup-python@v3
with:
python-version: "3.10"
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install flake8 pytest
if [ -f requirements.txt ]; then pip install -r requirements.txt; fi
- name: Lint with flake8
run: |
# stop the build if there are Python syntax errors or undefined names
flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics
# exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide
flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics
continue-on-error: false
# - name: Test with pytest
# run: |
# pytest
+2
View File
@@ -1,2 +1,4 @@
[pytest]
norecursedirs = src/engines/aerender .git build dist *.egg venv .venv env .env __pycache__ .pytest_cache
testpaths = tests
python_files = test_*.py job_creation_tests.py
+49 -19
View File
@@ -11,6 +11,7 @@ from src.api.api_server import API_VERSION
from src.api.api_server import start_api_server
from src.api.preview_manager import PreviewManager
from src.api.serverproxy_manager import ServerProxyManager
from src.application_context import ApplicationContext
from src.distributed_job_manager import DistributedJobManager
from src.engines.engine_manager import EngineManager
from src.render_queue import RenderQueue
@@ -26,21 +27,46 @@ logger = logging.getLogger()
class ZordonServer:
def __init__(self):
self.ctx = ApplicationContext()
# setup logging
logging.basicConfig(format='%(asctime)s: %(levelname)s: %(module)s: %(message)s', datefmt='%d-%b-%y %H:%M:%S',
level=Config.server_log_level.upper())
logging.getLogger("requests").setLevel(logging.WARNING) # suppress noisy requests/urllib3 logging
logging.getLogger("urllib3").setLevel(logging.WARNING)
# Load Config YAML
# ---- Bootstrap Config ----
Config.setup_config_dir()
config_path = Path(Config.config_dir()) / "config.yaml"
Config.load_config(config_path)
self.ctx.config = Config()
self.ctx.config.load(config_path)
Config._default_instance = self.ctx.config
Config._sync_class()
# configure default paths
EngineManager.engines_path = str(Path(Config.upload_folder).expanduser()/ "engines")
os.makedirs(EngineManager.engines_path, exist_ok=True)
PreviewManager.storage_path = Path(Config.upload_folder).expanduser() / "previews"
# ---- Engine Manager ----
self.ctx.engine_manager = EngineManager()
self.ctx.engine_manager.engines_path = Path(Config.upload_folder).expanduser() / "engines"
os.makedirs(self.ctx.engine_manager.engines_path, exist_ok=True)
EngineManager._default_instance = self.ctx.engine_manager
EngineManager._sync_class()
# ---- Preview Manager ----
self.ctx.preview_manager = PreviewManager()
self.ctx.preview_manager.storage_path = Path(Config.upload_folder).expanduser() / "previews"
PreviewManager._default_instance = self.ctx.preview_manager
PreviewManager._sync_class()
# ---- Render Queue ----
self.ctx.render_queue = RenderQueue()
self.ctx.render_queue.load_state(database_directory=Path(Config.upload_folder).expanduser())
RenderQueue._default_instance = self.ctx.render_queue
RenderQueue._sync_class()
# ---- Distributed Job Manager ----
self.ctx.distributed_job_manager = DistributedJobManager()
self.ctx.distributed_job_manager.subscribe_to_listener()
DistributedJobManager._default_instance = self.ctx.distributed_job_manager
DistributedJobManager._sync_class()
self.api_server = None
self.server_hostname: str = socket.gethostname()
@@ -73,10 +99,8 @@ class ZordonServer:
logger.debug(f"Upload directory: {Path(Config.upload_folder).expanduser()}")
logger.debug(f"Thumbs directory: {PreviewManager.storage_path}")
logger.debug(f"Engines directory: {EngineManager.engines_path}")
# Set up the RenderQueue object
RenderQueue.load_state(database_directory=Path(Config.upload_folder).expanduser())
ServerProxyManager.subscribe_to_listener()
DistributedJobManager.subscribe_to_listener()
# update hostname
self.server_hostname = socket.gethostname()
@@ -87,16 +111,21 @@ class ZordonServer:
self.api_server.start()
# start zeroconf server
ZeroconfServer.configure(f"_{APP_NAME.lower()}._tcp.local.", self.server_hostname, Config.port_number)
ZeroconfServer.properties = {'system_cpu': current_system_cpu(),
'system_cpu_brand': current_system_cpu_brand(),
'system_cpu_cores': multiprocessing.cpu_count(),
'system_os': current_system_os(),
'system_os_version': current_system_os_version(),
'system_memory': round(psutil.virtual_memory().total / (1024**3)), # in GB
'gpu_info': get_gpu_info(),
'api_version': API_VERSION}
ZeroconfServer.start()
ctx = self.ctx
ctx.zeroconf_server = ZeroconfServer()
ctx.zeroconf_server._configure(f"_{APP_NAME.lower()}._tcp.local.", self.server_hostname, Config.port_number)
ctx.zeroconf_server.properties = {'system_cpu': current_system_cpu(),
'system_cpu_brand': current_system_cpu_brand(),
'system_cpu_cores': multiprocessing.cpu_count(),
'system_os': current_system_os(),
'system_os_version': current_system_os_version(),
'system_memory': round(psutil.virtual_memory().total / (1024**3)),
'gpu_info': get_gpu_info(),
'api_version': API_VERSION}
ZeroconfServer._default_instance = ctx.zeroconf_server
ZeroconfServer._sync_class()
ctx.zeroconf_server._start()
logger.info(f"{APP_NAME} Render Server started - Hostname: {self.server_hostname}")
RenderQueue.start() # Start evaluating the render queue
@@ -112,6 +141,7 @@ class ZordonServer:
logger.exception(f"Exception during prepare for shutdown: {e}")
logger.info(f"{APP_NAME} Render Server has shut down")
if __name__ == '__main__':
server = ZordonServer()
try:
+1 -1
View File
@@ -11,7 +11,7 @@ import requests
from tqdm import tqdm
from werkzeug.utils import secure_filename
from distributed_job_manager import DistributedJobManager
from src.distributed_job_manager import DistributedJobManager
logger = logging.getLogger()
+41 -46
View File
@@ -3,6 +3,7 @@ import os
import subprocess
import threading
from pathlib import Path
from typing import Dict, Optional
from src.utilities.ffmpeg_helper import generate_thumbnail, save_first_frame
@@ -12,20 +13,20 @@ supported_image_formats = ['.jpg', '.png', '.exr', '.tif', '.tga', '.bmp', '.web
class PreviewManager:
"""Manages generation, storage, and retrieval of preview images and videos for rendering jobs."""
_default_instance: Optional['PreviewManager'] = None
storage_path = None
_running_jobs = {}
storage_path: Optional[str] = None
def __init__(self) -> None:
self.storage_path = None
self._running_jobs: Dict = {}
@classmethod
def __generate_job_preview_worker(cls, job, replace_existing=False, max_width=480):
"""Generates image and video previews for a given job.
def _sync_class(cls) -> None:
if cls._default_instance is not None:
cls.storage_path = cls._default_instance.storage_path
Args:
job: The job object containing file information.
replace_existing (bool): Whether to replace existing previews. Defaults to False.
max_width (int): Maximum width for the preview images/videos. Defaults to 480.
"""
def _generate_job_preview_worker(self, job, replace_existing=False, max_width=480):
# Determine best source file to use for thumbs
job_file_list = job.file_list()
@@ -41,8 +42,8 @@ class PreviewManager:
logger.warning(f"No valid image or video files found in files from job: {job}")
return
os.makedirs(cls.storage_path, exist_ok=True)
base_path = os.path.join(cls.storage_path, f"{job.id}-{preview_label}-{max_width}")
os.makedirs(self.storage_path, exist_ok=True)
base_path = os.path.join(self.storage_path, f"{job.id}-{preview_label}-{max_width}")
preview_video_path = base_path + '.mp4'
preview_image_path = base_path + '.jpg'
@@ -73,41 +74,23 @@ class PreviewManager:
except subprocess.CalledProcessError as e:
logger.error(f"Error generating video preview for {job}: {e}")
@classmethod
def update_previews_for_job(cls, job, replace_existing=False, wait_until_completion=False, timeout=None):
"""Updates previews for a given job by starting a background thread.
Args:
job: The job object.
replace_existing (bool): Whether to replace existing previews. Defaults to False.
wait_until_completion (bool): Whether to wait for the thread to complete. Defaults to False.
timeout (float): Timeout for waiting, if applicable.
"""
job_thread = cls._running_jobs.get(job.id)
def _update_previews_for_job(self, job, replace_existing=False, wait_until_completion=False, timeout=None):
job_thread = self._running_jobs.get(job.id)
if job_thread and job_thread.is_alive():
logger.debug(f'Preview generation job already running for {job}')
return
job_thread = threading.Thread(target=cls.__generate_job_preview_worker, args=(job, replace_existing,))
job_thread.start()
cls._running_jobs[job.id] = job_thread
else:
job_thread = threading.Thread(target=self._generate_job_preview_worker, args=(job, replace_existing,))
job_thread.start()
self._running_jobs[job.id] = job_thread
if wait_until_completion:
job_thread.join(timeout=timeout)
@classmethod
def get_previews_for_job(cls, job):
"""Retrieves previews for a given job.
def _get_previews_for_job(self, job):
Args:
job: The job object.
Returns:
dict: A dictionary containing preview information.
"""
results = {}
try:
directory_path = Path(cls.storage_path)
directory_path = Path(self.storage_path)
preview_files_for_job = [f for f in directory_path.iterdir() if f.is_file() and f.name.startswith(job.id)]
for preview_filename in preview_files_for_job:
@@ -125,14 +108,8 @@ class PreviewManager:
pass
return results
@classmethod
def delete_previews_for_job(cls, job):
"""Deletes all previews associated with a given job.
Args:
job: The job object.
"""
all_previews = cls.get_previews_for_job(job)
def _delete_previews_for_job(self, job):
all_previews = self.get_previews_for_job(job)
flattened_list = [item for sublist in all_previews.values() for item in sublist]
for preview in flattened_list:
try:
@@ -140,3 +117,21 @@ class PreviewManager:
os.remove(preview['filename'])
except OSError as e:
logger.error(f"Error removing preview '{preview.get('filename')}': {e}")
# --- Forwarders for backward compatibility ---
@classmethod
def update_previews_for_job(cls, job, replace_existing=False, wait_until_completion=False, timeout=None):
if cls._default_instance is not None:
cls._default_instance._update_previews_for_job(job, replace_existing, wait_until_completion, timeout)
@classmethod
def get_previews_for_job(cls, job):
if cls._default_instance is not None:
return cls._default_instance._get_previews_for_job(job)
return {}
@classmethod
def delete_previews_for_job(cls, job):
if cls._default_instance is not None:
cls._default_instance._delete_previews_for_job(job)
+23
View File
@@ -0,0 +1,23 @@
from __future__ import annotations
from typing import TYPE_CHECKING, Optional
if TYPE_CHECKING:
from src.api.preview_manager import PreviewManager
from src.distributed_job_manager import DistributedJobManager
from src.engines.engine_manager import EngineManager
from src.render_queue import RenderQueue
from src.utilities.config import Config
from src.utilities.zeroconf_server import ZeroconfServer
class ApplicationContext:
"""Holds all service instances. Single source of truth for wiring."""
def __init__(self) -> None:
self.config: Optional[Config] = None
self.engine_manager: Optional[EngineManager] = None
self.preview_manager: Optional[PreviewManager] = None
self.zeroconf_server: Optional[ZeroconfServer] = None
self.render_queue: Optional[RenderQueue] = None
self.distributed_job_manager: Optional[DistributedJobManager] = None
+71 -144
View File
@@ -3,8 +3,9 @@ import os
import socket
import threading
import time
from typing import Optional
from pathlib import Path
from click import Path
from plyer import notification
from pubsub import pub
@@ -21,47 +22,32 @@ logger = logging.getLogger()
class DistributedJobManager:
def __init__(self):
pass
_default_instance: Optional['DistributedJobManager'] = None
@classmethod
def subscribe_to_listener(cls):
"""
Subscribes the private class method '__local_job_status_changed' to the 'status_change' pubsub message.
This should be called once, typically during the initialization phase.
"""
pub.subscribe(cls.__local_job_status_changed, 'status_change')
pub.subscribe(cls.__local_job_frame_complete, 'frame_complete')
def _sync_class(cls) -> None:
if cls._default_instance is not None:
pass # no class-level attributes to sync
@classmethod
def __local_job_frame_complete(cls, job_id, frame_number, update_interval=5):
def __init__(self) -> None:
self.background_worker: Optional[threading.Thread] = None
"""
Responds to the 'frame_complete' pubsub message for local jobs.
Args:
job_id (str): The ID of the job that has changed status.
old_status (str): The previous status of the job.
new_status (str): The new (current) status of the job.
Note: Do not call directly. Instead, call via the 'frame_complete' pubsub message.
"""
def _subscribe_to_listener(self) -> None:
pub.subscribe(self._local_job_status_changed, 'status_change')
pub.subscribe(self._local_job_frame_complete, 'frame_complete')
def _local_job_frame_complete(self, job_id, frame_number, update_interval=5) -> None:
render_job = RenderQueue.job_with_id(job_id, none_ok=True)
if not render_job: # ignore jobs not in the queue
if not render_job:
return
logger.debug(f"Job {job_id} has completed frame #{frame_number}")
replace_existing_previews = (frame_number % update_interval) == 0
cls.__job_update_shared(render_job, replace_existing_previews)
self._job_update_shared(render_job, replace_existing_previews)
@classmethod
def __job_update_shared(cls, render_job, replace_existing_previews=False):
# update previews
def _job_update_shared(self, render_job, replace_existing_previews=False) -> None:
PreviewManager.update_previews_for_job(job=render_job, replace_existing=replace_existing_previews)
# notify parent to allow individual frames to be copied instead of waiting until the end
if render_job.parent:
parent_id, parent_hostname = render_job.parent.split('@')[0], render_job.parent.split('@')[-1]
try:
@@ -70,57 +56,41 @@ class DistributedJobManager:
except Exception as e:
logger.error(f"Error notifying parent {parent_hostname} about update in subjob {render_job.id}: {e}")
@classmethod
def __local_job_status_changed(cls, job_id: str, old_status: str, new_status: str):
"""
Responds to the 'status_change' pubsub message for local jobs.
If it's a child job, it notifies the parent job about the status change.
Args:
job_id (str): The ID of the job that has changed status.
old_status (str): The previous status of the job.
new_status (str): The new (current) status of the job.
Note: Do not call directly. Instead, call via the 'status_change' pubsub message.
"""
def _local_job_status_changed(self, job_id: str, old_status: str, new_status: str) -> None:
render_job = RenderQueue.job_with_id(job_id, none_ok=True)
if not render_job: # ignore jobs created but not yet added to queue
if not render_job:
return
logger.debug(f"Job {job_id} status change: {old_status} -> {new_status}")
self._job_update_shared(render_job, replace_existing_previews=(render_job.status == RenderStatus.COMPLETED))
cls.__job_update_shared(render_job, replace_existing_previews=(render_job.status == RenderStatus.COMPLETED))
# Handle children
if render_job.children:
if new_status in [RenderStatus.CANCELLED, RenderStatus.ERROR]: # Cancel children if necessary
if new_status in (RenderStatus.CANCELLED, RenderStatus.ERROR):
for child in render_job.children:
child_id, child_hostname = child.split('@')
RenderServerProxy(child_hostname).cancel_job(child_id, confirm=True)
# UI Notifications
try:
if new_status == RenderStatus.COMPLETED:
logger.debug("Show render complete notification")
notification.notify(
title='Render Job Complete',
message=f'{render_job.name} completed succesfully',
timeout=10 # Display time in seconds
timeout=10
)
elif new_status == RenderStatus.ERROR:
logger.debug("Show render error notification")
notification.notify(
title='Render Job Failed',
message=f'{render_job.name} failed rendering',
timeout=10 # Display time in seconds
timeout=10
)
elif new_status == RenderStatus.RUNNING:
logger.debug("Show render started notification")
notification.notify(
title='Render Job Started',
message=f'{render_job.name} started rendering',
timeout=10 # Display time in seconds
timeout=10
)
except Exception as e:
logger.debug(f"Unable to show UI notification: {e}")
@@ -129,30 +99,15 @@ class DistributedJobManager:
# Create Job
# --------------------------------------------
@classmethod
def create_render_job(cls, new_job_attributes: dict, loaded_project_local_path: Path):
"""Creates render jobs. Pass in dict of job_data and the local path to the project. It creates and returns a new
render job.
Args:
new_job_attributes (dict): Dict of desired attributes for new job (frame count, renderer, output path, etc)
loaded_project_local_path (Path): The local path to the loaded project.
Returns:
worker: Created job worker
"""
# get new output path in output_dir
def _create_render_job(self, new_job_attributes: dict, loaded_project_local_path: Path):
output_path = new_job_attributes.get('output_path')
output_filename = loaded_project_local_path.name if output_path else loaded_project_local_path.stem
# Prepare output path
output_dir = loaded_project_local_path.parent.parent / "output"
output_path = output_dir / output_filename
os.makedirs(output_dir, exist_ok=True)
logger.debug(f"New job output path: {output_path}")
# create & configure jobs
worker = EngineManager.create_worker(engine_name=new_job_attributes['engine_name'],
input_path=loaded_project_local_path,
output_path=output_path,
@@ -160,16 +115,15 @@ class DistributedJobManager:
args=new_job_attributes.get('args', {}),
parent=new_job_attributes.get('parent'),
name=new_job_attributes.get('name'))
worker.status = new_job_attributes.get("initial_status", worker.status) # todo: is this necessary?
worker.status = new_job_attributes.get("initial_status", worker.status)
worker.priority = int(new_job_attributes.get('priority', worker.priority))
worker.start_frame = int(new_job_attributes.get("start_frame", worker.start_frame))
worker.end_frame = int(new_job_attributes.get("end_frame", worker.end_frame))
worker.watchdog_timeout = Config.worker_process_timeout
worker.hostname = socket.gethostname()
# determine if we can / should split the job
if new_job_attributes.get("enable_split_jobs", False) and (worker.total_frames > 1) and not worker.parent:
cls.split_into_subjobs_async(worker, new_job_attributes, loaded_project_local_path)
self.split_into_subjobs_async(worker, new_job_attributes, loaded_project_local_path)
else:
worker.status = RenderStatus.NOT_STARTED
@@ -182,15 +136,7 @@ class DistributedJobManager:
# Handling Subjobs
# --------------------------------------------
@classmethod
def handle_subjob_update_notification(cls, local_job, subjob_data: dict):
"""Responds to a notification from a remote subjob and the host requests any subsequent updates from the subjob.
Args:
local_job (BaseRenderWorker): The local parent job worker.
subjob_data (dict): Subjob data sent from the remote server.
"""
def _handle_subjob_update_notification(self, local_job, subjob_data: dict) -> None:
subjob_status = string_to_status(subjob_data['status'])
subjob_id = subjob_data['id']
subjob_hostname = subjob_data['hostname']
@@ -206,19 +152,10 @@ class DistributedJobManager:
if subjob_data['status'] == 'completed' and download_success:
local_job.children[subjob_key]['download_status'] = 'completed'
@classmethod
def wait_for_subjobs(cls, parent_job):
"""Check the status of subjobs and waits until they are all finished. Download rendered frames from subjobs
when they are completed.
Args:
parent_job: Worker object that has child jobs
Returns:
"""
def _wait_for_subjobs(self, parent_job) -> None:
logger.debug(f"Waiting for subjobs for job {parent_job}")
parent_job.status = RenderStatus.WAITING_FOR_SUBJOBS
statuses_to_download = [RenderStatus.CANCELLED, RenderStatus.ERROR, RenderStatus.COMPLETED]
statuses_to_download = (RenderStatus.CANCELLED, RenderStatus.ERROR, RenderStatus.COMPLETED)
def subjobs_not_downloaded():
return {k: v for k, v in parent_job.children.items() if 'download_status' not in v or
@@ -230,21 +167,17 @@ class DistributedJobManager:
sleep_counter = 0
while parent_job.status == RenderStatus.WAITING_FOR_SUBJOBS:
if sleep_counter % server_delay == 0: # only ping servers every x seconds
for child_key, subjob_cached_data in subjobs_not_downloaded().items():
if sleep_counter % server_delay == 0:
for child_key in subjobs_not_downloaded():
subjob_id = child_key.split('@')[0]
subjob_hostname = child_key.split('@')[-1]
# Fetch info from server and handle failing case
subjob_data = RenderServerProxy(subjob_hostname).get_job_info(subjob_id)
if not subjob_data:
logger.warning(f"No response from {subjob_hostname}")
# timeout / missing server situations
parent_job.children[child_key]['download_status'] = f'error: No response from {subjob_hostname}'
continue
# Update parent job cache but keep the download status
download_status = parent_job.children[child_key].get('download_status', None)
parent_job.children[child_key] = subjob_data
parent_job.children[child_key]['download_status'] = download_status
@@ -254,8 +187,7 @@ class DistributedJobManager:
f"{float(subjob_data.get('percent_complete')) * 100.0}%"
logger.debug(status_msg)
# Check if job is finished, but has not had files copied yet over yet
if download_status is None and subjob_data['file_count'] and status in statuses_to_download:
if download_status is None and subjob_data.get('file_count') and status in statuses_to_download:
try:
download_missing_frames_from_subjob(parent_job, subjob_id, subjob_hostname)
parent_job.children[child_key]['download_status'] = 'complete'
@@ -263,7 +195,6 @@ class DistributedJobManager:
logger.error(f"Error downloading missing frames from subjob: {e}")
parent_job.children[child_key]['download_status'] = 'error: {}'
# Any finished jobs not successfully downloaded at this point are skipped
if parent_job.children[child_key].get('download_status', None) is None and \
status in statuses_to_download:
logger.warning(f"Skipping waiting on downloading from subjob: {child_key}")
@@ -274,42 +205,22 @@ class DistributedJobManager:
f"{', '.join(list(subjobs_not_downloaded().keys()))}")
time.sleep(1)
sleep_counter += 1
else: # exit the loop
else:
parent_job.status = RenderStatus.RUNNING
# --------------------------------------------
# Creating Subjobs
# --------------------------------------------
@classmethod
def split_into_subjobs_async(cls, parent_worker, new_job_attributes, project_path, system_os=None):
# todo: I don't love this
def _split_into_subjobs_async(self, parent_worker, new_job_attributes, project_path, system_os=None) -> None:
parent_worker.status = RenderStatus.CONFIGURING
cls.background_worker = threading.Thread(target=cls.split_into_subjobs, args=(parent_worker, new_job_attributes,
project_path, system_os))
cls.background_worker.start()
self.background_worker = threading.Thread(target=self.split_into_subjobs, args=(
parent_worker, new_job_attributes, project_path, system_os))
self.background_worker.start()
@classmethod
def split_into_subjobs(cls, parent_worker, new_job_attributes, project_path, system_os=None, specific_servers=None):
"""
Splits a job into subjobs and distributes them among available servers.
This method checks the availability of servers, distributes the work among them, and creates subjobs on each
server. If a server is the local host, it adjusts the frame range of the parent job instead of creating a
subjob.
Args:
parent_worker (Worker): The parent job what we're creating the subjobs for.
new_job_attributes (dict): Dict of desired attributes for new job (frame count, engine, output path, etc)
project_path (str): The path to the project.
system_os (str, optional): Required OS. Default is any.
specific_servers (list, optional): List of specific servers to split work between. Defaults to all found.
"""
# Check availability
available_servers = specific_servers if specific_servers else cls.find_available_servers(parent_worker.engine_name,
system_os)
# skip if theres no external servers found
def split_into_subjobs(self, parent_worker, new_job_attributes, project_path, system_os=None, specific_servers=None) -> None:
available_servers = specific_servers if specific_servers else self.find_available_servers(
parent_worker.engine_name, system_os)
external_servers = [x for x in available_servers if x['hostname'] != parent_worker.hostname]
if not external_servers:
parent_worker.status = RenderStatus.NOT_STARTED
@@ -318,34 +229,29 @@ class DistributedJobManager:
logger.debug(f"Splitting into subjobs - Available servers: {[x['hostname'] for x in available_servers]}")
all_subjob_server_data = distribute_server_work(parent_worker.start_frame, parent_worker.end_frame, available_servers)
# Prep and submit these sub-jobs
logger.info(f"Job {parent_worker.id} split plan: {all_subjob_server_data}")
try:
for subjob_data in all_subjob_server_data:
subjob_hostname = subjob_data['hostname']
post_results = cls.__create_subjob(new_job_attributes, project_path, subjob_data, subjob_hostname,
post_results = self._create_subjob(new_job_attributes, project_path, subjob_data, subjob_hostname,
parent_worker)
if not post_results.ok:
ValueError(f"Failed to create subjob on {subjob_hostname}")
# save child info
submission_results = post_results.json()[0]
child_key = f"{submission_results['id']}@{subjob_hostname}"
parent_worker.children[child_key] = submission_results
# start subjobs
logger.debug(f"Created {len(all_subjob_server_data)} subjobs successfully")
parent_worker.name = f"{parent_worker.name} (Parent)"
parent_worker.status = RenderStatus.NOT_STARTED # todo: this won't work with scheduled starts
parent_worker.status = RenderStatus.NOT_STARTED
except Exception as e:
# cancel all the subjobs
logger.error(f"Failed to split job into subjobs: {e}")
logger.debug(f"Cancelling {len(all_subjob_server_data) - 1} attempted subjobs")
RenderServerProxy(parent_worker.hostname).cancel_job(parent_worker.id, confirm=True)
@staticmethod
def __create_subjob(new_job_attributes: dict, project_path, server_data, server_hostname: str, parent_worker):
"""Convenience method to create subjobs for a parent worker"""
def _create_subjob(new_job_attributes: dict, project_path, server_data, server_hostname, parent_worker):
subjob = new_job_attributes.copy()
subjob['name'] = f"{parent_worker.name}[{server_data['frame_range'][0]}-{server_data['frame_range'][-1]}]"
subjob['parent'] = f"{parent_worker.id}@{parent_worker.hostname}"
@@ -364,14 +270,7 @@ class DistributedJobManager:
@staticmethod
def find_available_servers(engine_name: str, system_os=None):
"""
Scan the Zeroconf network for currently available render servers supporting a specific engine.
:param engine_name: str, The engine type to search for
:param system_os: str, Restrict results to servers running a specific OS
:return: A list of dictionaries with each dict containing hostname and cpu_count of available servers
"""
from api.api_server import API_VERSION
from src.api.api_server import API_VERSION
found_available_servers = []
for hostname in ZeroconfServer.found_hostnames():
host_properties = ZeroconfServer.get_hostname_properties(hostname)
@@ -383,6 +282,34 @@ class DistributedJobManager:
return found_available_servers
# --- Forwarders for backward compatibility ---
@classmethod
def subscribe_to_listener(cls):
if cls._default_instance is not None:
cls._default_instance._subscribe_to_listener()
@classmethod
def create_render_job(cls, new_job_attributes, loaded_project_local_path):
if cls._default_instance is not None:
return cls._default_instance._create_render_job(new_job_attributes, loaded_project_local_path)
raise RuntimeError("DistributedJobManager is not initialized")
@classmethod
def handle_subjob_update_notification(cls, local_job, subjob_data):
if cls._default_instance is not None:
cls._default_instance._handle_subjob_update_notification(local_job, subjob_data)
@classmethod
def wait_for_subjobs(cls, parent_job):
if cls._default_instance is not None:
cls._default_instance._wait_for_subjobs(parent_job)
@classmethod
def split_into_subjobs_async(cls, parent_worker, new_job_attributes, project_path, system_os=None):
if cls._default_instance is not None:
cls._default_instance._split_into_subjobs_async(parent_worker, new_job_attributes, project_path, system_os)
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
+2 -2
View File
@@ -173,7 +173,7 @@ class Blender(BaseRenderEngine):
script_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'scripts', 'get_system_info.py')
results = self.run_python_script(script_path=script_path)
output = results.stdout.decode()
match = re.search(r"GPU DATA:(\[[\s\S]*\])", output)
match = re.search(r"GPU DATA:(\[.*?\])", output, re.DOTALL)
if match:
gpu_data_json = match.group(1)
gpus_info = json.loads(gpu_data_json)
@@ -193,5 +193,5 @@ class Blender(BaseRenderEngine):
if __name__ == "__main__":
x = Blender().get_render_devices()
x = Blender().system_info()
print(x)
+185 -251
View File
@@ -20,128 +20,79 @@ class EngineManager:
if possible.
"""
_default_instance: Optional['EngineManager'] = None
engines_path: Optional[str] = None
download_tasks: List[Any] = []
def __init__(self) -> None:
self.engines_path: Optional[str] = None
self.download_tasks: List[Any] = []
@classmethod
def _sync_class(cls) -> None:
if cls._default_instance is not None:
cls.engines_path = cls._default_instance.engines_path
cls.download_tasks = cls._default_instance.download_tasks
@staticmethod
def supported_engines() -> list[type[BaseRenderEngine]]:
"""Return list of supported engine classes.
Returns:
List[Type[BaseRenderEngine]]: List of available engine classes.
"""
return ENGINE_CLASSES
# --- Installed Engines ---
@classmethod
def engine_class_for_project_path(cls, path: str) -> Type[BaseRenderEngine]:
"""Find engine class that can handle the given project file.
Args:
path: Path to project file.
Returns:
Type[BaseRenderEngine]: Engine class that can handle the file.
"""
def _engine_class_for_project_path(self, path: str) -> Type[BaseRenderEngine]:
_, extension = os.path.splitext(path)
extension = extension.lower().strip('.')
for engine_class in cls.supported_engines():
engine = cls.get_latest_engine_instance(engine_class)
for engine_class in self.supported_engines():
engine = self.get_latest_engine_instance(engine_class)
if extension in engine.supported_extensions():
return engine_class
undefined_renderer_support = [x for x in cls.supported_engines() if not cls.get_latest_engine_instance(x).supported_extensions()]
undefined_renderer_support = [x for x in self.supported_engines() if not self.get_latest_engine_instance(x).supported_extensions()]
return undefined_renderer_support[0]
@classmethod
def engine_class_with_name(cls, engine_name: str) -> Optional[Type[BaseRenderEngine]]:
"""Find engine class by name.
Args:
engine_name: Name of engine to find.
Returns:
Optional[Type[BaseRenderEngine]]: Engine class if found, None otherwise.
"""
for obj in cls.supported_engines():
def _engine_class_with_name(self, engine_name: str) -> Optional[Type[BaseRenderEngine]]:
for obj in self.supported_engines():
if obj.name().lower() == engine_name.lower():
return obj
return None
@classmethod
def get_latest_engine_instance(cls, engine_class: Type[BaseRenderEngine]) -> BaseRenderEngine:
"""Create instance of latest installed engine version.
Args:
engine_class: Engine class to instantiate.
Returns:
BaseRenderEngine: Instance of engine with latest version.
"""
newest = cls.newest_installed_engine_data(engine_class.name())
def _get_latest_engine_instance(self, engine_class: Type[BaseRenderEngine]) -> BaseRenderEngine:
newest = self.newest_installed_engine_data(engine_class.name())
engine = engine_class(newest["path"])
return engine
@classmethod
def get_installed_engine_data(cls, filter_name: Optional[str] = None, include_corrupt: bool = False,
def _get_installed_engine_data(self, filter_name: Optional[str] = None, include_corrupt: bool = False,
ignore_system: bool = False) -> List[Dict[str, Any]]:
"""Get data about installed render engines.
Args:
filter_name: Optional engine name to filter by.
include_corrupt: Whether to include potentially corrupted installations.
ignore_system: Whether to ignore system-installed engines.
Returns:
List[Dict[str, Any]]: List of installed engine data.
Raises:
FileNotFoundError: If engines path is not set.
"""
if not cls.engines_path:
if not self.engines_path:
raise FileNotFoundError("Engine path is not set")
# Parse downloaded engine directory
results = []
try:
all_items = os.listdir(cls.engines_path)
all_directories = [item for item in all_items if os.path.isdir(os.path.join(cls.engines_path, item))]
keys = ["engine", "version", "system_os", "cpu"] # Define keys for result dictionary
all_items = os.listdir(self.engines_path)
all_directories = [item for item in all_items if os.path.isdir(os.path.join(self.engines_path, item))]
keys = ["engine", "version", "system_os", "cpu"]
for directory in all_directories:
# Split directory name into segments
segments = directory.split('-')
# Create a dictionary mapping keys to corresponding segments
result_dict = {keys[i]: segments[i] for i in range(min(len(keys), len(segments)))}
result_dict['type'] = 'managed'
# Initialize binary_name with engine name
binary_name = result_dict['engine'].lower()
# Determine the correct binary name based on the engine and system_os
eng = cls.engine_class_with_name(result_dict['engine'])
binary_name = eng.binary_names.get(result_dict['system_os'], binary_name)
eng = self.engine_class_with_name(result_dict['engine'])
if eng:
binary_name = eng.binary_names.get(result_dict['system_os'], binary_name)
# Find the path to the binary file
search_root = Path(cls.engines_path) / directory
search_root = Path(self.engines_path) / directory
match = next((p for p in search_root.rglob(binary_name) if p.is_file()), None)
path = str(match) if match else None
result_dict['path'] = path
# fetch version number from binary - helps detect corrupted downloads - disabled due to perf issues
# binary_version = eng(path).version()
# if not binary_version:
# logger.warning(f"Possible corrupt {eng.name()} {result_dict['version']} install detected: {path}")
# if not include_corrupt:
# continue
# result_dict['version'] = binary_version or 'error'
# Add the result dictionary to results if it matches the filter_name or if no filter is applied
if not filter_name or filter_name == result_dict['engine']:
results.append(result_dict)
except FileNotFoundError as e:
logger.warning(f"Cannot find local engines download directory: {e}")
# add system installs to this list - use bg thread because it can be slow
def fetch_engine_details(eng, include_corrupt=False):
version = eng().version()
if not version and not include_corrupt:
@@ -160,7 +111,7 @@ class EngineManager:
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = {
executor.submit(fetch_engine_details, eng, include_corrupt): eng.name()
for eng in cls.supported_engines()
for eng in self.supported_engines()
if eng.default_engine_path() and (not filter_name or filter_name == eng.name())
}
@@ -173,96 +124,55 @@ class EngineManager:
# --- Check for Updates ---
@classmethod
def update_all_engines(cls) -> None:
"""Check for and download updates for all downloadable engines."""
for engine in cls.downloadable_engines():
update_available = cls.is_engine_update_available(engine)
def _update_all_engines(self) -> None:
for engine in self.downloadable_engines():
update_available = self.is_engine_update_available(engine)
if update_available:
update_available['name'] = engine.name()
cls.download_engine(engine.name(), update_available['version'], background=True)
self.download_engine(engine.name(), update_available['version'], background=True)
@classmethod
def all_version_data_for_engine(cls, engine_name:str, include_corrupt=False, ignore_system=False) -> list:
"""Get all version data for a specific engine.
Args:
engine_name: Name of engine to query.
include_corrupt: Whether to include corrupt installations.
ignore_system: Whether to ignore system installations.
Returns:
list: Sorted list of engine version data (newest first).
"""
versions = cls.get_installed_engine_data(filter_name=engine_name, include_corrupt=include_corrupt, ignore_system=ignore_system)
def _all_version_data_for_engine(self, engine_name: str, include_corrupt=False, ignore_system=False) -> list:
versions = self.get_installed_engine_data(filter_name=engine_name, include_corrupt=include_corrupt, ignore_system=ignore_system)
sorted_versions = sorted(versions, key=lambda x: x['version'], reverse=True)
return sorted_versions
@classmethod
def newest_installed_engine_data(cls, engine_name:str, system_os=None, cpu=None, ignore_system=None) -> list:
"""Get newest installed engine data for specific platform.
Args:
engine_name: Name of engine to query.
system_os: Operating system to filter by (defaults to current).
cpu: CPU architecture to filter by (defaults to current).
ignore_system: Whether to ignore system installations.
Returns:
list: Newest engine data or empty list if not found.
"""
def _newest_installed_engine_data(self, engine_name: str, system_os=None, cpu=None, ignore_system=None) -> list:
system_os = system_os or current_system_os()
cpu = cpu or current_system_cpu()
try:
filtered = [x for x in cls.all_version_data_for_engine(engine_name, ignore_system=ignore_system)
filtered = [x for x in self.all_version_data_for_engine(engine_name, ignore_system=ignore_system)
if x['system_os'] == system_os and x['cpu'] == cpu]
return filtered[0]
except IndexError:
logger.error(f"Cannot find newest engine version for {engine_name}-{system_os}-{cpu}")
return []
@classmethod
def is_version_installed(cls, engine_name:str, version:str, system_os=None, cpu=None, ignore_system=False):
"""Check if specific engine version is installed.
Args:
engine_name: Name of engine to check.
version: Version string to check.
system_os: Operating system to check (defaults to current).
cpu: CPU architecture to check (defaults to current).
ignore_system: Whether to ignore system installations.
Returns:
Engine data if found, False otherwise.
"""
def _is_version_installed(self, engine_name: str, version: str, system_os=None, cpu=None, ignore_system=False):
system_os = system_os or current_system_os()
cpu = cpu or current_system_cpu()
filtered = [x for x in cls.get_installed_engine_data(filter_name=engine_name, ignore_system=ignore_system) if
filtered = [x for x in self.get_installed_engine_data(filter_name=engine_name, ignore_system=ignore_system) if
x['system_os'] == system_os and x['cpu'] == cpu and x['version'] == version]
return filtered[0] if filtered else False
@classmethod
def version_is_available_to_download(cls, engine_name:str, version, system_os=None, cpu=None):
def _version_is_available_to_download(self, engine_name: str, version, system_os=None, cpu=None):
try:
downloader = cls.engine_class_with_name(engine_name).downloader()
downloader = self.engine_class_with_name(engine_name).downloader()
return downloader.version_is_available_to_download(version=version, system_os=system_os, cpu=cpu)
except Exception as e:
logger.debug(f"Exception in version_is_available_to_download: {e}")
return None
@classmethod
def find_most_recent_version(cls, engine_name:str, system_os=None, cpu=None, lts_only=False) -> dict:
def _find_most_recent_version(self, engine_name: str, system_os=None, cpu=None, lts_only=False) -> dict:
try:
downloader = cls.engine_class_with_name(engine_name).downloader()
downloader = self.engine_class_with_name(engine_name).downloader()
return downloader.find_most_recent_version(system_os=system_os, cpu=cpu)
except Exception as e:
logger.debug(f"Exception in find_most_recent_version: {e}")
return {}
@classmethod
def is_engine_update_available(cls, engine_class: Type[BaseRenderEngine], ignore_system_installs=False):
def _is_engine_update_available(self, engine_class: Type[BaseRenderEngine], ignore_system_installs=False):
logger.debug(f"Checking for updates to {engine_class.name()}")
latest_version = engine_class.downloader().find_most_recent_version()
@@ -271,7 +181,7 @@ class EngineManager:
return None
version_num = latest_version.get('version')
if cls.is_version_installed(engine_class.name(), version_num, ignore_system=ignore_system_installs):
if self.is_version_installed(engine_class.name(), version_num, ignore_system=ignore_system_installs):
logger.debug(f"Latest version of {engine_class.name()} ({version_num}) already downloaded")
return None
@@ -279,18 +189,11 @@ class EngineManager:
# --- Downloads ---
@classmethod
def downloadable_engines(cls):
"""Get list of engines that support downloading.
def _downloadable_engines(self):
return [engine for engine in self.supported_engines() if hasattr(engine, "downloader") and engine.downloader()]
Returns:
List[Type[BaseRenderEngine]]: Engines with downloader capability.
"""
return [engine for engine in cls.supported_engines() if hasattr(engine, "downloader") and engine.downloader()]
@classmethod
def get_existing_download_task(cls, engine_name, version, system_os=None, cpu=None):
for task in cls.download_tasks:
def _get_existing_download_task(self, engine_name, version, system_os=None, cpu=None):
for task in self.download_tasks:
task_parts = task.name.split('-')
task_engine, task_version, task_system_os, task_cpu = task_parts[:4]
@@ -299,50 +202,45 @@ class EngineManager:
return task
return None
@classmethod
def download_engine(cls, engine_name, version, system_os=None, cpu=None, background=False, ignore_system=False):
engine_to_download = cls.engine_class_with_name(engine_name)
existing_task = cls.get_existing_download_task(engine_name, version, system_os, cpu)
def _download_engine(self, engine_name, version, system_os=None, cpu=None, background=False, ignore_system=False):
engine_to_download = self.engine_class_with_name(engine_name)
existing_task = self.get_existing_download_task(engine_name, version, system_os, cpu)
if existing_task:
logger.debug(f"Already downloading {engine_name} {version}")
if not background:
existing_task.join() # If download task exists, wait until it's done downloading
existing_task.join()
return None
elif not engine_to_download.downloader():
logger.warning("No valid downloader for this engine. Please update this software manually.")
return None
elif not cls.engines_path:
elif not self.engines_path:
raise FileNotFoundError("Engines path must be set before requesting downloads")
thread = EngineDownloadWorker(engine_name, version, system_os, cpu)
cls.download_tasks.append(thread)
self.download_tasks.append(thread)
thread.start()
if background:
return thread
thread.join()
found_engine = cls.is_version_installed(engine_name, version, system_os, cpu, ignore_system) # Check that engine downloaded
found_engine = self.is_version_installed(engine_name, version, system_os, cpu, ignore_system)
if not found_engine:
logger.error(f"Error downloading {engine_name}")
return found_engine
@classmethod
def delete_engine_download(cls, engine_name, version, system_os=None, cpu=None):
def _delete_engine_download(self, engine_name, version, system_os=None, cpu=None):
logger.info(f"Requested deletion of engine: {engine_name}-{version}")
found = cls.is_version_installed(engine_name, version, system_os, cpu)
if found and found['type'] == 'managed': # don't delete system installs
# find the root directory of the engine executable
found = self.is_version_installed(engine_name, version, system_os, cpu)
if found and found['type'] == 'managed':
root_dir_name = '-'.join([engine_name, version, found['system_os'], found['cpu']])
remove_path = os.path.join(found['path'].split(root_dir_name)[0], root_dir_name)
# delete the file path
logger.info(f"Deleting engine at path: {remove_path}")
shutil.rmtree(remove_path, ignore_errors=False)
logger.info(f"Engine {engine_name}-{version}-{found['system_os']}-{found['cpu']} successfully deleted")
return True
elif found: # these are managed by the system / user. Don't delete these.
elif found:
logger.error(f'Cannot delete requested {engine_name} {version}. Managed externally.')
else:
logger.error(f"Cannot find engine: {engine_name}-{version}")
@@ -350,52 +248,16 @@ class EngineManager:
# --- Background Tasks ---
@classmethod
def active_downloads(cls) -> list:
"""Get list of currently active download tasks.
def _active_downloads(self) -> list:
return [x for x in self.download_tasks if x.is_alive()]
Returns:
list: List of active EngineDownloadWorker threads.
"""
return [x for x in cls.download_tasks if x.is_alive()]
def _create_worker(self, engine_name: str, input_path: Path, output_path: Path, engine_version=None, args=None, parent=None, name=None):
worker_class = self.engine_class_with_name(engine_name).worker_class()
@classmethod
def create_worker(cls, engine_name: str, input_path: Path, output_path: Path, engine_version=None, args=None, parent=None, name=None):
"""
Create and return a worker instance for a specific engine.
This resolves the appropriate engine binary/path for the requested engine and version,
downloading the engine if necessary (when a specific version is requested and not found
locally). The returned worker is constructed with string paths for compatibility with
worker implementations that expect `str` rather than `Path`.
Args:
engine_name: The engine name used to resolve an engine class and its worker.
input_path: Path to the input file/folder for the worker to process.
output_path: Path where the worker should write output.
engine_version: Optional engine version to use. If `None` or `'latest'`, the newest
installed version is used. If a specific version is provided and not installed,
the engine will be downloaded.
args: Optional arguments passed through to the worker (engine-specific).
parent: Optional Qt/GUI parent object passed through to the worker constructor.
name: Optional name/label passed through to the worker constructor.
Returns:
An instance of the engine-specific worker class.
Raises:
FileNotFoundError: If no versions of the engine are installed, if the requested
version cannot be found or downloaded, or if the engine path cannot be resolved.
"""
worker_class = cls.engine_class_with_name(engine_name).worker_class()
# check to make sure we have versions installed
all_versions = cls.all_version_data_for_engine(engine_name)
all_versions = self.all_version_data_for_engine(engine_name)
if not all_versions:
raise FileNotFoundError(f"Cannot find any installed '{engine_name}' engines")
# Find the path to the requested engine version or use default
engine_path = None
if engine_version and engine_version != 'latest':
for ver in all_versions:
@@ -403,9 +265,8 @@ class EngineManager:
engine_path = ver['path']
break
# Download the required engine if not found locally
if not engine_path:
download_result = cls.download_engine(engine_name, engine_version)
download_result = self.download_engine(engine_name, engine_version)
if not download_result:
raise FileNotFoundError(f"Cannot download requested version: {engine_name} {engine_version}")
engine_path = download_result['path']
@@ -420,28 +281,109 @@ class EngineManager:
return worker_class(input_path=str(input_path), output_path=str(output_path), engine_path=engine_path, args=args,
parent=parent, name=name)
# --- Forwarders for backward compatibility ---
@classmethod
def engine_class_for_project_path(cls, path):
if cls._default_instance is not None:
return cls._default_instance._engine_class_for_project_path(path)
@classmethod
def engine_class_with_name(cls, engine_name):
if cls._default_instance is not None:
return cls._default_instance._engine_class_with_name(engine_name)
@classmethod
def get_latest_engine_instance(cls, engine_class):
if cls._default_instance is not None:
return cls._default_instance._get_latest_engine_instance(engine_class)
@classmethod
def get_installed_engine_data(cls, filter_name=None, include_corrupt=False, ignore_system=False):
if cls._default_instance is not None:
return cls._default_instance._get_installed_engine_data(filter_name, include_corrupt, ignore_system)
return []
@classmethod
def update_all_engines(cls):
if cls._default_instance is not None:
cls._default_instance._update_all_engines()
@classmethod
def all_version_data_for_engine(cls, engine_name, include_corrupt=False, ignore_system=False):
if cls._default_instance is not None:
return cls._default_instance._all_version_data_for_engine(engine_name, include_corrupt, ignore_system)
return []
@classmethod
def newest_installed_engine_data(cls, engine_name, system_os=None, cpu=None, ignore_system=None):
if cls._default_instance is not None:
return cls._default_instance._newest_installed_engine_data(engine_name, system_os, cpu, ignore_system)
return []
@classmethod
def is_version_installed(cls, engine_name, version, system_os=None, cpu=None, ignore_system=False):
if cls._default_instance is not None:
return cls._default_instance._is_version_installed(engine_name, version, system_os, cpu, ignore_system)
return False
@classmethod
def version_is_available_to_download(cls, engine_name, version, system_os=None, cpu=None):
if cls._default_instance is not None:
return cls._default_instance._version_is_available_to_download(engine_name, version, system_os, cpu)
return None
@classmethod
def find_most_recent_version(cls, engine_name, system_os=None, cpu=None, lts_only=False):
if cls._default_instance is not None:
return cls._default_instance._find_most_recent_version(engine_name, system_os, cpu, lts_only)
return {}
@classmethod
def is_engine_update_available(cls, engine_class, ignore_system_installs=False):
if cls._default_instance is not None:
return cls._default_instance._is_engine_update_available(engine_class, ignore_system_installs)
return None
@classmethod
def downloadable_engines(cls):
if cls._default_instance is not None:
return cls._default_instance._downloadable_engines()
return []
@classmethod
def get_existing_download_task(cls, engine_name, version, system_os=None, cpu=None):
if cls._default_instance is not None:
return cls._default_instance._get_existing_download_task(engine_name, version, system_os, cpu)
return None
@classmethod
def download_engine(cls, engine_name, version, system_os=None, cpu=None, background=False, ignore_system=False):
if cls._default_instance is not None:
return cls._default_instance._download_engine(engine_name, version, system_os, cpu, background, ignore_system)
return None
@classmethod
def delete_engine_download(cls, engine_name, version, system_os=None, cpu=None):
if cls._default_instance is not None:
return cls._default_instance._delete_engine_download(engine_name, version, system_os, cpu)
return False
@classmethod
def active_downloads(cls):
if cls._default_instance is not None:
return cls._default_instance._active_downloads()
return []
@classmethod
def create_worker(cls, engine_name, input_path, output_path, engine_version=None, args=None, parent=None, name=None):
if cls._default_instance is not None:
return cls._default_instance._create_worker(engine_name, input_path, output_path, engine_version, args, parent, name)
raise RuntimeError("EngineManager is not initialized")
class EngineDownloadWorker(threading.Thread):
"""A thread worker for downloading a specific version of a rendering engine.
This class handles the process of downloading a rendering engine in a separate thread,
ensuring that the download process does not block the main application.
Attributes:
engine (str): The name of the rendering engine to download.
version (str): The version of the rendering engine to download.
system_os (str, optional): The operating system for which to download the engine. Defaults to current OS type.
cpu (str, optional): Requested CPU architecture. Defaults to system CPU type.
"""
def __init__(self, engine, version, system_os=None, cpu=None):
"""Initialize download worker for specific engine version.
Args:
engine: Name of engine to download.
version: Version of engine to download.
system_os: Target operating system (defaults to current).
cpu: Target CPU architecture (defaults to current).
"""
super().__init__()
self.engine = engine
self.version = version
@@ -450,35 +392,27 @@ class EngineDownloadWorker(threading.Thread):
self.percent_complete = 0
def _update_progress(self, current_progress):
"""Update download progress.
Args:
current_progress: Current download progress percentage (0-100).
"""
self.percent_complete = current_progress
def run(self):
"""Execute the download process.
def run(self):
try:
existing_download = EngineManager.is_version_installed(self.engine, self.version, self.system_os, self.cpu,
ignore_system=True)
if existing_download:
logger.info(f"Requested download of {self.engine} {self.version}, but local copy already exists")
return existing_download
Checks if engine version already exists, then downloads if not found.
Handles cleanup and error reporting.
"""
try:
existing_download = EngineManager.is_version_installed(self.engine, self.version, self.system_os, self.cpu,
ignore_system=True)
if existing_download:
logger.info(f"Requested download of {self.engine} {self.version}, but local copy already exists")
return existing_download
# Get the appropriate downloader class based on the engine type
downloader = EngineManager.engine_class_with_name(self.engine).downloader()
downloader.download_engine( self.version, download_location=EngineManager.engines_path,
system_os=self.system_os, cpu=self.cpu, timeout=300, progress_callback=self._update_progress)
except Exception as e:
logger.error(f"Error in download worker: {e}")
finally:
# remove itself from the downloader list
EngineManager.download_tasks.remove(self)
downloader = EngineManager.engine_class_with_name(self.engine).downloader()
downloader.download_engine(self.version, download_location=EngineManager.engines_path,
system_os=self.system_os, cpu=self.cpu, timeout=300, progress_callback=self._update_progress)
except Exception as e:
logger.error(f"Error in download worker: {e}")
finally:
try:
if EngineManager._default_instance is not None:
EngineManager._default_instance.download_tasks.remove(self)
except ValueError:
pass
if __name__ == '__main__':
+199 -106
View File
@@ -1,12 +1,13 @@
import logging
import threading
from collections import Counter
from datetime import datetime
from pathlib import Path
from typing import List, Dict, Any, Optional
from typing import Any, Dict, List, Optional
from pubsub import pub
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import Session, sessionmaker
from sqlalchemy.orm.exc import DetachedInstanceError
from src.engines.core.base_worker import Base, BaseRenderWorker
@@ -25,182 +26,274 @@ class JobNotFoundError(Exception):
class RenderQueue:
engine: Optional[create_engine] = None
session: Optional[sessionmaker] = None
job_queue: List[BaseRenderWorker] = []
maximum_renderer_instances: Dict[str, int] = {'blender': 1, 'aerender': 1, 'ffmpeg': 4}
last_saved_counts: Dict[str, int] = {}
is_running: bool = False
_default_instance: Optional['RenderQueue'] = None
@classmethod
def _sync_class(cls) -> None:
if cls._default_instance is not None:
pass # no class-level attributes to sync
def __init__(self) -> None:
self.engine: Optional[create_engine] = None
self.session: Optional[Session] = None
self.job_queue: List[BaseRenderWorker] = []
self.maximum_renderer_instances: Dict[str, int] = {'blender': 1, 'aerender': 1, 'ffmpeg': 4}
self.last_saved_counts: Dict[str, int] = {}
self.is_running: bool = False
self._lock = threading.Lock()
# --------------------------------------------
# Render Queue Evaluation:
# --------------------------------------------
@classmethod
def start(cls):
"""Start evaluating the render queue"""
def _start(self) -> None:
logger.debug("Starting render queue updates")
cls.is_running = True
cls.evaluate_queue()
self.is_running = True
self.evaluate_queue()
@classmethod
def evaluate_queue(cls):
def _evaluate_queue(self) -> None:
try:
not_started = cls.jobs_with_status(RenderStatus.NOT_STARTED, priority_sorted=True)
not_started = self.jobs_with_status(RenderStatus.NOT_STARTED, priority_sorted=True)
for job in not_started:
if cls.is_available_for_job(job.engine_name, job.priority):
cls.start_job(job)
if self.is_available_for_job(job.engine_name, job.priority):
self.start_job(job)
scheduled = cls.jobs_with_status(RenderStatus.SCHEDULED, priority_sorted=True)
scheduled = self.jobs_with_status(RenderStatus.SCHEDULED, priority_sorted=True)
for job in scheduled:
if job.scheduled_start <= datetime.now():
logger.debug(f"Starting scheduled job: {job}")
cls.start_job(job)
self.start_job(job)
if cls.last_saved_counts != cls.job_counts():
cls.save_state()
if self.last_saved_counts != self.job_counts():
self.save_state()
except DetachedInstanceError:
pass
@classmethod
def __local_job_status_changed(cls, job_id, old_status, new_status):
render_job = RenderQueue.job_with_id(job_id, none_ok=True)
if render_job and cls.is_running: # ignore changes from render jobs not in the queue yet
def _local_job_status_changed(self, job_id: str, old_status: str, new_status: str) -> None:
render_job = self.job_with_id(job_id, none_ok=True)
if render_job and self.is_running:
logger.debug(f"RenderQueue detected job {job_id} has changed from {old_status} -> {new_status}")
RenderQueue.evaluate_queue()
self.evaluate_queue()
@classmethod
def stop(cls):
def _stop(self) -> None:
logger.debug("Stopping render queue updates")
cls.is_running = False
self.is_running = False
# --------------------------------------------
# Fetch Jobs:
# --------------------------------------------
@classmethod
def all_jobs(cls):
return cls.job_queue
def _all_jobs(self) -> List[BaseRenderWorker]:
return self.job_queue
@classmethod
def running_jobs(cls):
return cls.jobs_with_status(RenderStatus.RUNNING)
def _running_jobs(self) -> List[BaseRenderWorker]:
return self.jobs_with_status(RenderStatus.RUNNING)
@classmethod
def pending_jobs(cls):
pending_jobs = cls.jobs_with_status(RenderStatus.NOT_STARTED)
pending_jobs.extend(cls.jobs_with_status(RenderStatus.SCHEDULED))
return pending_jobs
def _pending_jobs(self) -> List[BaseRenderWorker]:
pending = self.jobs_with_status(RenderStatus.NOT_STARTED)
pending.extend(self.jobs_with_status(RenderStatus.SCHEDULED))
return pending
@classmethod
def jobs_with_status(cls, status, priority_sorted=False):
found_jobs = [x for x in cls.all_jobs() if x.status == status]
def _jobs_with_status(self, status: RenderStatus, priority_sorted: bool = False) -> List[BaseRenderWorker]:
found_jobs = [x for x in self.all_jobs() if x.status == status]
if priority_sorted:
found_jobs = sorted(found_jobs, key=lambda a: a.priority, reverse=False)
return found_jobs
@classmethod
def job_with_id(cls, job_id, none_ok=False):
found_job = next((x for x in cls.all_jobs() if x.id == job_id), None)
def _job_with_id(self, job_id: str, none_ok: bool = False) -> Optional[BaseRenderWorker]:
found_job = next((x for x in self.all_jobs() if x.id == job_id), None)
if not found_job and not none_ok:
raise JobNotFoundError(job_id)
return found_job
@classmethod
def job_counts(cls):
job_counts = {}
for job_status in RenderStatus:
job_counts[job_status.value] = len(cls.jobs_with_status(job_status))
return job_counts
def _job_counts(self) -> Dict[str, int]:
counts = Counter(x.status for x in self.all_jobs())
return {s.value: counts.get(s, 0) for s in RenderStatus}
# --------------------------------------------
# Startup / Shutdown:
# --------------------------------------------
@classmethod
def load_state(cls, database_directory: Path):
if not cls.engine:
cls.engine = create_engine(f"sqlite:///{database_directory / 'database.db'}")
Base.metadata.create_all(cls.engine)
cls.session = sessionmaker(bind=cls.engine)()
cls.job_queue = cls.session.query(BaseRenderWorker).all()
pub.subscribe(cls.__local_job_status_changed, 'status_change')
def _load_state(self, database_directory: Path) -> None:
self.engine = create_engine(f"sqlite:///{database_directory / 'database.db'}")
Base.metadata.create_all(self.engine)
self.session = sessionmaker(bind=self.engine)()
from src.engines.core.base_worker import BaseRenderWorker
self.job_queue = self.session.query(BaseRenderWorker).all()
pub.subscribe(self._local_job_status_changed, 'status_change')
@classmethod
def save_state(cls):
cls.session.commit()
def _save_state(self) -> None:
if self.session:
self.session.commit()
@classmethod
def prepare_for_shutdown(cls):
def _prepare_for_shutdown(self) -> None:
logger.debug("Closing session")
cls.stop()
running_jobs = cls.jobs_with_status(RenderStatus.RUNNING) # cancel all running jobs
_ = [cls.cancel_job(job) for job in running_jobs]
cls.save_state()
cls.session.close()
self.stop()
running_jobs = self.jobs_with_status(RenderStatus.RUNNING)
for job in running_jobs:
self.cancel_job(job)
self.save_state()
if self.session:
self.session.close()
# --------------------------------------------
# Renderer Availability:
# --------------------------------------------
@classmethod
def renderer_instances(cls):
all_instances = [x.engine_name for x in cls.running_jobs()]
def renderer_instances(self) -> Counter:
all_instances = [x.engine_name for x in self.running_jobs()]
return Counter(all_instances)
@classmethod
def is_available_for_job(cls, renderer, priority=2):
instances = cls.renderer_instances()
higher_priority_jobs = [x for x in cls.running_jobs() if x.priority < priority]
max_allowed_instances = cls.maximum_renderer_instances.get(renderer, 1)
maxed_out_instances = renderer in instances.keys() and instances[renderer] >= max_allowed_instances
def _is_available_for_job(self, renderer: str, priority: int = 2) -> bool:
instances = self.renderer_instances()
higher_priority_jobs = [x for x in self.running_jobs() if x.priority < priority]
max_allowed_instances = self.maximum_renderer_instances.get(renderer, 1)
maxed_out_instances = renderer in instances and instances[renderer] >= max_allowed_instances
return not maxed_out_instances and not higher_priority_jobs
# --------------------------------------------
# Job Lifecycle Management:
# --------------------------------------------
@classmethod
def add_to_render_queue(cls, render_job, force_start=False):
def _add_to_render_queue(self, render_job: BaseRenderWorker, force_start: bool = False) -> None:
logger.info(f"Adding job to render queue: {render_job}")
cls.job_queue.append(render_job)
if cls.is_running and force_start and render_job.status in (RenderStatus.NOT_STARTED, RenderStatus.SCHEDULED):
cls.start_job(render_job)
cls.session.add(render_job)
cls.save_state()
if cls.is_running:
cls.evaluate_queue()
with self._lock:
self.job_queue.append(render_job)
if force_start and render_job.status in (RenderStatus.NOT_STARTED, RenderStatus.SCHEDULED):
self.start_job(render_job)
self.session.add(render_job)
self.save_state()
if self.is_running:
self.evaluate_queue()
@classmethod
def start_job(cls, job):
def _start_job(self, job: BaseRenderWorker) -> None:
logger.info(f'Starting job: {job}')
job.start()
cls.save_state()
self.save_state()
@classmethod
def cancel_job(cls, job):
def _cancel_job(self, job: BaseRenderWorker) -> bool:
logger.info(f'Cancelling job: {job}')
job.stop()
return job.status == RenderStatus.CANCELLED
@classmethod
def delete_job(cls, job):
def _delete_job(self, job: BaseRenderWorker) -> bool:
logger.info(f"Deleting job: {job}")
job.stop()
cls.job_queue.remove(job)
cls.session.delete(job)
cls.save_state()
with self._lock:
job.stop()
self.job_queue.remove(job)
self.session.delete(job)
self.save_state()
return True
# --------------------------------------------
# Miscellaneous:
# --------------------------------------------
def _clear_history(self) -> None:
for job in list(self.all_jobs()):
if job.status in (RenderStatus.CANCELLED, RenderStatus.COMPLETED, RenderStatus.ERROR):
self.delete_job(job)
self.save_state()
# --- Forwarders for backward compatibility ---
@classmethod
def start(cls):
if cls._default_instance is not None:
cls._default_instance._start()
@classmethod
def evaluate_queue(cls):
if cls._default_instance is not None:
cls._default_instance._evaluate_queue()
@classmethod
def stop(cls):
if cls._default_instance is not None:
cls._default_instance._stop()
@classmethod
def all_jobs(cls):
if cls._default_instance is not None:
return cls._default_instance.job_queue
return []
@classmethod
def running_jobs(cls):
if cls._default_instance is not None:
return cls._default_instance._running_jobs()
return []
@classmethod
def pending_jobs(cls):
if cls._default_instance is not None:
return cls._default_instance._pending_jobs()
return []
@classmethod
def jobs_with_status(cls, status, priority_sorted=False):
if cls._default_instance is not None:
return cls._default_instance._jobs_with_status(status, priority_sorted)
return []
@classmethod
def job_with_id(cls, job_id, none_ok=False):
if cls._default_instance is not None:
return cls._default_instance._job_with_id(job_id, none_ok)
if not none_ok:
raise JobNotFoundError(job_id)
return None
@classmethod
def job_counts(cls):
if cls._default_instance is not None:
return cls._default_instance._job_counts()
return {}
@classmethod
def load_state(cls, database_directory):
if cls._default_instance is not None:
cls._default_instance._load_state(database_directory)
@classmethod
def save_state(cls):
if cls._default_instance is not None:
cls._default_instance._save_state()
@classmethod
def prepare_for_shutdown(cls):
if cls._default_instance is not None:
cls._default_instance._prepare_for_shutdown()
@classmethod
def is_available_for_job(cls, renderer, priority=2):
if cls._default_instance is not None:
return cls._default_instance._is_available_for_job(renderer, priority)
return True
@classmethod
def add_to_render_queue(cls, render_job, force_start=False):
if cls._default_instance is not None:
cls._default_instance._add_to_render_queue(render_job, force_start)
@classmethod
def start_job(cls, job):
if cls._default_instance is not None:
cls._default_instance._start_job(job)
@classmethod
def cancel_job(cls, job):
if cls._default_instance is not None:
return cls._default_instance._cancel_job(job)
return False
@classmethod
def delete_job(cls, job):
if cls._default_instance is not None:
return cls._default_instance._delete_job(job)
return False
@classmethod
def clear_history(cls):
to_remove = [x for x in cls.all_jobs() if x.status in [RenderStatus.CANCELLED,
RenderStatus.COMPLETED, RenderStatus.ERROR]]
for job_to_remove in to_remove:
cls.delete_job(job_to_remove)
cls.save_state()
if cls._default_instance is not None:
cls._default_instance._clear_history()
+6 -3
View File
@@ -1,8 +1,11 @@
import logging
import socket
from pathlib import Path
import psutil
from PyQt6.QtCore import QThread, pyqtSignal, Qt, pyqtSlot
logger = logging.getLogger(__name__)
from PyQt6.QtWidgets import (
QApplication, QWidget, QVBoxLayout, QHBoxLayout, QLabel, QLineEdit, QPushButton, QFileDialog, QSpinBox, QComboBox,
QGroupBox, QCheckBox, QProgressBar, QPlainTextEdit, QDoubleSpinBox, QMessageBox, QListWidget, QListWidgetItem,
@@ -306,7 +309,7 @@ class NewRenderJobForm(QWidget):
def update_job_count(self, changed_item=None):
checked = 1
if self.cameras_group.enabled:
if self.cameras_group.isEnabled():
checked = 0
total = self.cameras_list.count()
@@ -463,8 +466,8 @@ class NewRenderJobForm(QWidget):
text_box = QLineEdit()
h_layout.addWidget(text_box)
self.engine_options_layout.addLayout(h_layout)
except AttributeError:
pass
except AttributeError as e:
logger.error(f"AttributeError in post_get_project_info_update: {e}")
def toggle_engine_enablement(self, enabled=False):
"""Toggle on/off all the render settings"""
+4 -4
View File
@@ -247,9 +247,8 @@ class MainWindow(QMainWindow):
# Update server information display
self.update_server_info_display(new_hostname)
except AttributeError:
# Handle cases where the server list view might not be properly initialized
pass
except AttributeError as e:
logger.error(f"AttributeError in server_picked: {e}")
def update_server_info_display(self, hostname):
"""Updates the server information section of the UI."""
@@ -405,7 +404,8 @@ class MainWindow(QMainWindow):
id_item = self.job_list_view.item(selected_row.row(), 0)
job_ids.append(id_item.text())
return job_ids
except AttributeError:
except AttributeError as e:
logger.error(f"AttributeError in selected_job_ids: {e}")
return []
+34 -15
View File
@@ -1,12 +1,24 @@
import os
from pathlib import Path
from typing import Optional
import yaml
from src.utilities.misc_helper import current_system_os, copy_directory_contents
_CONFIG_ATTRS = [
'upload_folder', 'update_engines_on_launch', 'max_content_path',
'server_log_level', 'log_buffer_length', 'worker_process_timeout',
'flask_log_level', 'flask_debug_enable', 'queue_eval_seconds',
'port_number', 'enable_split_jobs', 'download_timeout_seconds',
]
class Config:
# Initialize class variables with default values
_default_instance: Optional['Config'] = None
# Class-level defaults — mutated by _sync_class() so existing
# callers (Config.upload_folder) continue to work during the
# migration to instance-based access.
upload_folder = "~/zordon-uploads/"
update_engines_on_launch = True
max_content_path = 100000000
@@ -20,23 +32,30 @@ class Config:
enable_split_jobs = True
download_timeout_seconds = 120
@classmethod
def load_config(cls, config_path):
def __init__(self) -> None:
for attr in _CONFIG_ATTRS:
setattr(self, attr, getattr(Config, attr))
def load(self, config_path: Path) -> None:
with open(config_path, 'r') as ymlfile:
cfg = yaml.safe_load(ymlfile)
for attr in _CONFIG_ATTRS:
if attr in cfg:
setattr(self, attr, cfg[attr])
self.upload_folder = str(Path(self.upload_folder).expanduser())
cls.upload_folder = str(Path(cfg.get('upload_folder', cls.upload_folder)).expanduser())
cls.update_engines_on_launch = cfg.get('update_engines_on_launch', cls.update_engines_on_launch)
cls.max_content_path = cfg.get('max_content_path', cls.max_content_path)
cls.server_log_level = cfg.get('server_log_level', cls.server_log_level)
cls.log_buffer_length = cfg.get('log_buffer_length', cls.log_buffer_length)
cls.worker_process_timeout = cfg.get('worker_process_timeout', cls.worker_process_timeout)
cls.flask_log_level = cfg.get('flask_log_level', cls.flask_log_level)
cls.flask_debug_enable = cfg.get('flask_debug_enable', cls.flask_debug_enable)
cls.queue_eval_seconds = cfg.get('queue_eval_seconds', cls.queue_eval_seconds)
cls.port_number = cfg.get('port_number', cls.port_number)
cls.enable_split_jobs = cfg.get('enable_split_jobs', cls.enable_split_jobs)
cls.download_timeout_seconds = cfg.get('download_timeout_seconds', cls.download_timeout_seconds)
@classmethod
def _sync_class(cls) -> None:
if cls._default_instance is not None:
for attr in _CONFIG_ATTRS:
setattr(cls, attr, getattr(cls._default_instance, attr))
@classmethod
def load_config(cls, config_path: Path) -> None:
instance = Config()
instance.load(config_path)
cls._default_instance = instance
cls._sync_class()
@classmethod
def config_dir(cls) -> Path:
+42 -41
View File
@@ -340,55 +340,56 @@ def get_gpu_info() -> List[Dict[str, Any]]:
else: # Assume Linux or other
return get_linux_gpu_info()
COMMON_RESOLUTIONS = {
# SD
"SD_480p": (640, 480),
"NTSC_DVD": (720, 480),
"PAL_DVD": (720, 576),
# SD
"SD_480p": (640, 480),
"NTSC_DVD": (720, 480),
"PAL_DVD": (720, 576),
# HD
"HD_720p": (1280, 720),
"HD_900p": (1600, 900),
"HD_1080p": (1920, 1080),
# HD
"HD_720p": (1280, 720),
"HD_900p": (1600, 900),
"HD_1080p": (1920, 1080),
# Cinema / Film
"2K_DCI": (2048, 1080),
"4K_DCI": (4096, 2160),
# Cinema / Film
"2K_DCI": (2048, 1080),
"4K_DCI": (4096, 2160),
# UHD / Consumer
"UHD_4K": (3840, 2160),
"UHD_5K": (5120, 2880),
"UHD_8K": (7680, 4320),
# UHD / Consumer
"UHD_4K": (3840, 2160),
"UHD_5K": (5120, 2880),
"UHD_8K": (7680, 4320),
# Ultrawide / Aspect Variants
"UW_1080p": (2560, 1080),
"UW_1440p": (3440, 1440),
"UW_5K": (5120, 2160),
# Ultrawide / Aspect Variants
"UW_1080p": (2560, 1080),
"UW_1440p": (3440, 1440),
"UW_5K": (5120, 2160),
# Mobile / Social
"VERTICAL_1080x1920": (1080, 1920),
"SQUARE_1080": (1080, 1080),
# Mobile / Social
"VERTICAL_1080x1920": (1080, 1920),
"SQUARE_1080": (1080, 1080),
# Classic / Legacy
"VGA": (640, 480),
"SVGA": (800, 600),
"XGA": (1024, 768),
"WXGA": (1280, 800),
# Classic / Legacy
"VGA": (640, 480),
"SVGA": (800, 600),
"XGA": (1024, 768),
"WXGA": (1280, 800),
}
COMMON_FRAME_RATES = {
"23.976 (NTSC Film)": 23.976,
"24 (Cinema)": 24.0,
"25 (PAL)": 25.0,
"29.97 (NTSC)": 29.97,
"30": 30.0,
"48 (HFR Film)": 48.0,
"50 (PAL HFR)": 50.0,
"59.94": 59.94,
"60": 60.0,
"72": 72.0,
"90 (VR)": 90.0,
"120": 120.0,
"144 (Gaming)": 144.0,
"240 (HFR)": 240.0,
"23.976 (NTSC Film)": 23.976,
"24 (Cinema)": 24.0,
"25 (PAL)": 25.0,
"29.97 (NTSC)": 29.97,
"30": 30.0,
"48 (HFR Film)": 48.0,
"50 (PAL HFR)": 50.0,
"59.94": 59.94,
"60": 60.0,
"72": 72.0,
"90 (VR)": 90.0,
"120": 120.0,
"144 (Gaming)": 144.0,
"240 (HFR)": 240.0,
}
+102 -62
View File
@@ -1,5 +1,6 @@
import logging
import socket
from typing import Dict, List, Optional
from pubsub import pub
from zeroconf import Zeroconf, ServiceInfo, ServiceBrowser, ServiceStateChange, NonUniqueNameException, \
@@ -9,105 +10,144 @@ logger = logging.getLogger()
class ZeroconfServer:
service_type = None
server_name = None
server_port = None
server_ip = None
zeroconf = Zeroconf()
service_info = None
client_cache = {}
properties = {}
_default_instance: Optional['ZeroconfServer'] = None
@classmethod
def configure(cls, service_type, server_name, server_port):
cls.service_type = service_type
cls.server_name = server_name
cls.server_port = server_port
try: # Stop any previously running instances
service_type: Optional[str] = None
server_name: Optional[str] = None
server_port: Optional[int] = None
properties: Dict = {}
def __init__(self) -> None:
self.service_type: Optional[str] = None
self.server_name: Optional[str] = None
self.server_port: Optional[int] = None
self.server_ip: Optional[str] = None
self.zeroconf: Zeroconf = Zeroconf()
self.service_info: Optional[ServiceInfo] = None
self.client_cache: Dict = {}
self.properties: Dict = {}
def _configure(self, service_type: str, server_name: str, server_port: int) -> None:
self.service_type = service_type
self.server_name = server_name
self.server_port = server_port
try:
socket.gethostbyname(socket.gethostname())
except socket.gaierror:
cls.stop()
self.stop()
@classmethod
def start(cls, listen_only=False):
if not cls.service_type:
def _start(self, listen_only: bool = False) -> None:
if not self.service_type:
raise RuntimeError("The 'configure' method must be run before starting the zeroconf server")
elif not listen_only:
logger.debug(f"Starting zeroconf service")
cls._register_service()
if not listen_only:
logger.debug("Starting zeroconf service")
self._register_service()
else:
logger.debug(f"Starting zeroconf service - Listen only mode")
cls._browse_services()
logger.debug("Starting zeroconf service - Listen only mode")
self._browse_services()
@classmethod
def stop(cls):
def _stop(self) -> None:
logger.debug("Stopping zeroconf service")
cls._unregister_service()
cls.zeroconf.close()
self._unregister_service()
if self.zeroconf:
self.zeroconf.close()
@classmethod
def _register_service(cls):
def _register_service(self) -> None:
try:
cls.server_ip = socket.gethostbyname(socket.gethostname())
self.server_ip = socket.gethostbyname(socket.gethostname())
info = ServiceInfo(
cls.service_type,
f"{cls.server_name}.{cls.service_type}",
addresses=[socket.inet_aton(cls.server_ip)],
port=cls.server_port,
properties=cls.properties,
self.service_type,
f"{self.server_name}.{self.service_type}",
addresses=[socket.inet_aton(self.server_ip)],
port=self.server_port,
properties=self.properties,
)
cls.service_info = info
cls.zeroconf.register_service(info)
logger.info(f"Registered zeroconf service: {cls.service_info.name}")
self.service_info = info
self.zeroconf.register_service(info)
logger.info(f"Registered zeroconf service: {self.service_info.name}")
except (NonUniqueNameException, socket.gaierror) as e:
logger.error(f"Error establishing zeroconf: {e}")
@classmethod
def _unregister_service(cls):
if cls.service_info:
cls.zeroconf.unregister_service(cls.service_info)
logger.info(f"Unregistered zeroconf service: {cls.service_info.name}")
cls.service_info = None
def _unregister_service(self) -> None:
if self.service_info:
self.zeroconf.unregister_service(self.service_info)
logger.info(f"Unregistered zeroconf service: {self.service_info.name}")
self.service_info = None
@classmethod
def _browse_services(cls):
browser = ServiceBrowser(cls.zeroconf, cls.service_type, [cls._on_service_discovered])
browser.is_alive()
def _browse_services(self) -> None:
ServiceBrowser(self.zeroconf, self.service_type, [self._on_service_discovered])
@classmethod
def _on_service_discovered(cls, zeroconf, service_type, name, state_change):
def _on_service_discovered(self, zeroconf, service_type, name, state_change) -> None:
try:
info = zeroconf.get_service_info(service_type, name)
hostname = name.split(f'.{cls.service_type}')[0]
hostname = name.split(f'.{self.service_type}')[0]
logger.debug(f"Zeroconf: {hostname} {state_change}")
if service_type == cls.service_type:
if state_change == ServiceStateChange.Added or state_change == ServiceStateChange.Updated:
cls.client_cache[hostname] = info
if service_type == self.service_type:
if state_change in (ServiceStateChange.Added, ServiceStateChange.Updated):
self.client_cache[hostname] = info
else:
cls.client_cache.pop(hostname)
self.client_cache.pop(hostname, None)
pub.sendMessage('zeroconf_state_change', hostname=hostname, state_change=state_change)
except NotRunningException:
pass
@classmethod
def found_hostnames(cls):
def _sync_class(cls) -> None:
if cls._default_instance is not None:
inst = cls._default_instance
cls.service_type = inst.service_type
cls.server_name = inst.server_name
cls.server_port = inst.server_port
cls.server_ip = inst.server_ip
cls.properties = inst.properties
def _found_hostnames(self) -> List[str]:
local_hostname = socket.gethostname()
def sort_key(hostname):
# Return 0 if it's the local hostname so it comes first, else return 1
return False if hostname == local_hostname else True
# Sort the list with the local hostname first
sorted_hostnames = sorted(cls.client_cache.keys(), key=sort_key)
sorted_hostnames = sorted(self.client_cache.keys(), key=sort_key)
return sorted_hostnames
def _get_hostname_properties(self, hostname: str) -> Dict:
server_info = self.client_cache.get(hostname)
if server_info is None:
return {}
decoded_server_info = {key.decode('utf-8'): value.decode('utf-8') for key, value in server_info.properties.items()}
return decoded_server_info
# --- Forwarders for backward compatibility ---
@classmethod
def configure(cls, service_type, server_name, server_port):
if cls._default_instance is not None:
cls._default_instance._configure(service_type, server_name, server_port)
cls._sync_class()
@classmethod
def start(cls, listen_only=False):
if cls._default_instance is not None:
cls._default_instance._start(listen_only)
@classmethod
def stop(cls):
if cls._default_instance is not None:
cls._default_instance._stop()
@classmethod
def found_hostnames(cls):
if cls._default_instance is not None:
return cls._default_instance._found_hostnames()
return []
@classmethod
def get_hostname_properties(cls, hostname):
server_info = cls.client_cache.get(hostname).properties
decoded_server_info = {key.decode('utf-8'): value.decode('utf-8') for key, value in server_info.items()}
return decoded_server_info
if cls._default_instance is not None:
return cls._default_instance._get_hostname_properties(hostname)
return {}
# Example usage:
View File
+134
View File
@@ -0,0 +1,134 @@
from unittest.mock import patch
from pathlib import Path
import pytest
from src.render_queue import RenderQueue
from src.engines.engine_manager import EngineManager
from src.api.preview_manager import PreviewManager
from src.utilities.config import Config
from src.distributed_job_manager import DistributedJobManager
from src.utilities.zeroconf_server import ZeroconfServer
# ---------------------------------------------------------------------------
# Global pubsub patch real pubsub can fire callbacks across tests.
# Each service module does `from pubsub import pub` at import time, so we
# must patch each module-level reference individually.
# ---------------------------------------------------------------------------
_PUBSUB_TARGETS = [
'pubsub.pub',
'src.render_queue.pub',
'src.distributed_job_manager.pub',
'src.utilities.zeroconf_server.pub',
'src.engines.core.base_worker.pub',
]
@pytest.fixture(autouse=True)
def _patch_pubsub():
mocks = {}
patchers = []
for target in _PUBSUB_TARGETS:
p = patch(target)
patchers.append(p)
mocks[target] = p.start()
yield mocks.get('pubsub.pub')
for p in reversed(patchers):
p.stop()
# ---------------------------------------------------------------------------
# Temp directory for file-system-dependent tests
# ---------------------------------------------------------------------------
@pytest.fixture
def tmp_workspace(tmp_path: Path) -> Path:
ws = tmp_path / 'zordon_test'
ws.mkdir()
return ws
# ---------------------------------------------------------------------------
# Config fixture
# ---------------------------------------------------------------------------
@pytest.fixture
def config_instance(tmp_workspace: Path) -> Config:
orig = Config._default_instance
cfg = Config()
cfg.upload_folder = str(tmp_workspace / 'uploads')
Config._default_instance = cfg
Config._sync_class()
yield cfg
Config._default_instance = orig
Config._sync_class()
# ---------------------------------------------------------------------------
# EngineManager fixture
# ---------------------------------------------------------------------------
@pytest.fixture
def engine_manager_instance(tmp_workspace: Path) -> EngineManager:
orig = EngineManager._default_instance
em = EngineManager()
em.engines_path = str(tmp_workspace / 'engines')
EngineManager._default_instance = em
EngineManager._sync_class()
yield em
EngineManager._default_instance = orig
EngineManager._sync_class()
# ---------------------------------------------------------------------------
# RenderQueue fixture
# ---------------------------------------------------------------------------
@pytest.fixture
def render_queue_instance() -> RenderQueue:
orig = RenderQueue._default_instance
rq = RenderQueue()
RenderQueue._default_instance = rq
RenderQueue._sync_class()
yield rq
RenderQueue._default_instance = orig
RenderQueue._sync_class()
# ---------------------------------------------------------------------------
# DistributedJobManager fixture
# ---------------------------------------------------------------------------
@pytest.fixture
def distributed_job_manager_instance() -> DistributedJobManager:
orig = DistributedJobManager._default_instance
djm = DistributedJobManager()
DistributedJobManager._default_instance = djm
DistributedJobManager._sync_class()
yield djm
DistributedJobManager._default_instance = orig
DistributedJobManager._sync_class()
# ---------------------------------------------------------------------------
# PreviewManager fixture
# ---------------------------------------------------------------------------
@pytest.fixture
def preview_manager_instance(tmp_workspace: Path) -> PreviewManager:
orig = PreviewManager._default_instance
pm = PreviewManager()
pm.storage_path = str(tmp_workspace / 'previews')
PreviewManager._default_instance = pm
PreviewManager._sync_class()
yield pm
PreviewManager._default_instance = orig
PreviewManager._sync_class()
# ---------------------------------------------------------------------------
# ZeroconfServer fixture
# ---------------------------------------------------------------------------
@pytest.fixture
def zeroconf_server_instance() -> ZeroconfServer:
orig = ZeroconfServer._default_instance
zs = ZeroconfServer()
ZeroconfServer._default_instance = zs
ZeroconfServer._sync_class()
yield zs
ZeroconfServer._default_instance = orig
ZeroconfServer._sync_class()
+85
View File
@@ -0,0 +1,85 @@
import logging
import os
import time
import unittest
from src.api.server_proxy import RenderServerProxy
logger = logging.getLogger(__name__)
SERVER_HOST = os.environ.get('ZORDON_TEST_HOST', '127.0.0.1')
SERVER_PORT = os.environ.get('ZORDON_TEST_PORT', '8080')
@unittest.skipIf(os.environ.get('ZORDON_SKIP_INTEGRATION'),
'set ZORDON_SKIP_INTEGRATION to skip integration tests')
class SubmissionTestCase(unittest.TestCase):
"""Integration tests requiring a running Zordon server.
Start the server: python server.py
Run tests: ZORDON_TEST_HOST=127.0.0.1 python -m pytest tests/job_creation_tests.py
Override host/port via ZORDON_TEST_HOST and ZORDON_TEST_PORT env vars.
"""
render_server = None
test_job_id = None
@classmethod
def setUpClass(cls):
cls.render_server = RenderServerProxy(SERVER_HOST, SERVER_PORT)
@classmethod
def tearDownClass(cls):
cls.render_server = None
def test_connection_ok(self):
self.assertTrue(self.render_server.is_online(),
msg=f'Server not reachable at {SERVER_HOST}:{SERVER_PORT}')
def test_submit_job(self):
sample_file_path = os.path.join(os.path.dirname(__file__), 'resources',
'batman_sample.blend')
self.assertTrue(os.path.exists(sample_file_path),
msg=f'Test file not found: {sample_file_path}')
sample_job = {
'name': 'sample_blender_job',
'renderer': 'blender',
'start_frame': 1,
'end_frame': 5,
'args': {'engine': 'CYCLES'},
'enable_split_jobs': False,
}
response = self.render_server.post_job_to_server(
file_path=sample_file_path, job_list=[sample_job])
self.assertIsNotNone(response, msg='No response from server')
self.assertTrue(response.ok, msg=f'Server returned {response.status_code}')
logger.info('Submitted to server ok!')
job_data = response.json()
self.__class__.test_job_id = job_data[0]['id']
def test_wait_for_job_to_complete(self):
if not self.__class__.test_job_id:
self.skipTest('No job was submitted in test_submit_job')
file_count = 0
while True:
update_response = self.render_server.get_job_info(self.__class__.test_job_id)
if update_response:
print(f"Status: {update_response['status']}")
if update_response['status'] not in [
'not_started', 'running', 'waiting_for_subjobs', 'configuring'
]:
self.assertEqual(update_response['status'], 'completed',
msg=f"Job ended with status: {update_response['status']}")
break
if update_response['file_count'] > file_count:
file_count = update_response['file_count']
print(f"File count is now {file_count}")
time.sleep(1)
Binary file not shown.
+172
View File
@@ -0,0 +1,172 @@
from pathlib import Path
from unittest.mock import patch
from src.utilities.config import Config, _CONFIG_ATTRS
class TestConfigDefaults:
"""Default attribute values."""
def test_default_upload_folder(self):
assert Config.upload_folder == '~/zordon-uploads/'
def test_default_port(self):
assert Config.port_number == 8080
def test_default_server_log_level(self):
assert Config.server_log_level == 'debug'
def test_default_enable_split_jobs(self):
assert Config.enable_split_jobs is True
def test_default_worker_timeout(self):
assert Config.worker_process_timeout == 120
class TestConfigInstance:
"""Instance creation and attribute initialisation."""
def test_init_copies_class_attrs(self):
cfg = Config()
assert cfg.upload_folder == '~/zordon-uploads/'
assert cfg.port_number == 8080
def test_init_has_all_attrs(self):
cfg = Config()
for attr in _CONFIG_ATTRS:
assert hasattr(cfg, attr), f'missing attr: {attr}'
class TestConfigLoad:
"""Loading configuration from YAML."""
def test_load_sets_attributes(self, tmp_path):
config_file = tmp_path / 'config.yaml'
config_file.write_text('port_number: 9090\nserver_log_level: info\n')
cfg = Config()
cfg.load(config_file)
assert cfg.port_number == 9090
assert cfg.server_log_level == 'info'
def test_load_expands_upload_folder(self, tmp_path):
config_file = tmp_path / 'config.yaml'
config_file.write_text('upload_folder: ~/custom-uploads/\n')
cfg = Config()
cfg.load(config_file)
# expanduser strips trailing slash
assert cfg.upload_folder.endswith('/custom-uploads')
def test_load_ignores_unknown_keys(self, tmp_path):
config_file = tmp_path / 'config.yaml'
config_file.write_text('nonexistent_key: value\nport_number: 7070\n')
cfg = Config()
cfg.load(config_file)
assert cfg.port_number == 7070
def test_load_raises_on_missing_file(self):
cfg = Config()
try:
cfg.load(Path('/nonexistent/path.yaml'))
assert False, 'expected FileNotFoundError'
except FileNotFoundError:
pass
class TestConfigSyncClass:
"""_sync_class propagates instance attrs to class level."""
def test_sync_class_propagates_attrs(self):
orig = Config._default_instance
try:
cfg = Config()
cfg.port_number = 7777
Config._default_instance = cfg
Config._sync_class()
assert Config.port_number == 7777
finally:
Config._default_instance = orig
Config._sync_class()
def test_sync_class_noop_when_no_instance(self):
orig = Config._default_instance
try:
Config._default_instance = None
Config._sync_class()
finally:
Config._default_instance = orig
Config._sync_class()
class TestConfigLoadConfig:
"""Classmethod load_config — full bootstrap."""
def test_load_config_sets_default_instance(self, tmp_path):
orig = Config._default_instance
try:
config_file = tmp_path / 'config.yaml'
config_file.write_text('port_number: 9999\n')
Config._default_instance = None
Config.load_config(config_file)
assert Config._default_instance is not None
assert Config.port_number == 9999
finally:
Config._default_instance = orig
Config._sync_class()
class TestConfigDir:
"""config_dir() returns OS-specific path."""
@patch('src.utilities.config.current_system_os')
def test_config_dir_macos(self, mock_os):
mock_os.return_value = 'macos'
result = Config.config_dir()
assert 'Library/Application Support/Zordon' in str(result)
@patch('src.utilities.config.current_system_os')
def test_config_dir_windows(self, mock_os):
mock_os.return_value = 'windows'
with patch.dict('os.environ', {'APPDATA': 'C:\\Users\\Test\\AppData\\Roaming'}):
result = Config.config_dir()
assert 'Zordon' in str(result)
@patch('src.utilities.config.current_system_os')
def test_config_dir_linux(self, mock_os):
mock_os.return_value = 'linux'
result = Config.config_dir()
assert '.config/Zordon' in str(result)
class TestSetupConfigDir:
"""setup_config_dir creates dir and copies template."""
@patch('src.utilities.config.copy_directory_contents')
@patch('src.utilities.config.os.makedirs')
@patch('src.utilities.config.os.path.exists')
def test_creates_dir_when_missing(self, mock_exists, mock_makedirs, mock_copy):
mock_exists.return_value = False
Config.setup_config_dir()
mock_makedirs.assert_called_once()
@patch('src.utilities.config.copy_directory_contents')
@patch('src.utilities.config.os.makedirs')
@patch('src.utilities.config.os.path.exists')
def test_skips_when_dir_exists(self, mock_exists, mock_makedirs, mock_copy):
mock_exists.return_value = True
Config.setup_config_dir()
mock_makedirs.assert_not_called()
mock_copy.assert_not_called()
+138
View File
@@ -0,0 +1,138 @@
from unittest.mock import MagicMock, patch
import src.distributed_job_manager as djm_module
from src.distributed_job_manager import DistributedJobManager
from src.utilities.status_utils import RenderStatus
class TestSubscribeToListener:
"""PubSub subscription."""
def test_subscribes_to_status_change(self, distributed_job_manager_instance):
distributed_job_manager_instance._subscribe_to_listener()
# Check via the module-level reference (the one _subscribe_to_listener uses)
djm_module.pub.subscribe.assert_any_call(
distributed_job_manager_instance._local_job_status_changed,
'status_change',
)
djm_module.pub.subscribe.assert_any_call(
distributed_job_manager_instance._local_job_frame_complete,
'frame_complete',
)
class TestCreateRenderJob:
"""Creating a render job."""
@patch('src.distributed_job_manager.os.makedirs')
@patch('src.distributed_job_manager.EngineManager.create_worker')
def test_creates_worker_and_adds_to_queue(
self, mock_create_worker, mock_makedirs, distributed_job_manager_instance,
config_instance, tmp_path,
):
worker = MagicMock()
worker.total_frames = 10
worker.parent = None
worker.id = 'job-1'
mock_create_worker.return_value = worker
project_path = tmp_path / 'test_project.blend'
project_path.write_text('fake')
attrs = {
'engine_name': 'blender',
'args': {'engine': 'CYCLES'},
'name': 'Test Job',
'start_frame': 1,
'end_frame': 10,
'priority': 2,
'enable_split_jobs': False,
}
with patch('src.distributed_job_manager.RenderQueue.add_to_render_queue') as mock_add:
with patch('src.distributed_job_manager.PreviewManager.update_previews_for_job'):
result = DistributedJobManager.create_render_job(attrs, project_path)
assert result == worker
assert worker.status == RenderStatus.NOT_STARTED
mock_add.assert_called_once_with(worker, force_start=False)
@patch('src.distributed_job_manager.os.makedirs')
@patch('src.distributed_job_manager.EngineManager.create_worker')
def test_split_jobs_enabled_calls_split_async(
self, mock_create_worker, mock_makedirs, distributed_job_manager_instance,
config_instance, tmp_path,
):
worker = MagicMock()
worker.total_frames = 10
worker.parent = None
mock_create_worker.return_value = worker
project_path = tmp_path / 'test_project.blend'
project_path.write_text('fake')
attrs = {
'engine_name': 'blender',
'args': {},
'name': 'Split Job',
'start_frame': 1,
'end_frame': 10,
'priority': 2,
'enable_split_jobs': True,
}
# The forwarder passes system_os=None by default
with patch.object(distributed_job_manager_instance, '_split_into_subjobs_async') as mock_split:
with patch('src.distributed_job_manager.RenderQueue.add_to_render_queue'):
with patch('src.distributed_job_manager.PreviewManager.update_previews_for_job'):
DistributedJobManager.create_render_job(attrs, project_path)
mock_split.assert_called_once_with(worker, attrs, project_path, None)
class TestHandleSubjobUpdate:
"""Processing subjob update notifications."""
def test_updates_child_info(self, distributed_job_manager_instance):
parent_job = MagicMock()
parent_job.children = {}
subjob_data = {
'id': 'sub-1',
'hostname': 'worker-1',
'status': 'completed',
'percent_complete': 1.0,
'file_count': 5,
}
with patch('src.distributed_job_manager.download_missing_frames_from_subjob',
return_value=True):
DistributedJobManager.handle_subjob_update_notification(parent_job, subjob_data)
assert 'sub-1@worker-1' in parent_job.children
assert parent_job.children['sub-1@worker-1']['download_status'] == 'completed'
class TestFindAvailableServers:
"""Discovering remote servers."""
@patch('src.distributed_job_manager.ZeroconfServer.found_hostnames')
@patch('src.distributed_job_manager.ZeroconfServer.get_hostname_properties')
@patch('src.distributed_job_manager.RenderServerProxy')
def test_finds_matching_server(
self, mock_proxy_class, mock_get_props, mock_found_hostnames,
):
mock_found_hostnames.return_value = ['server-1.local']
mock_get_props.return_value = {'api_version': '0.1', 'system_os': 'macos'}
mock_proxy = MagicMock()
mock_proxy.is_engine_available.return_value = {
'available': True,
'hostname': 'server-1.local',
}
mock_proxy_class.return_value = mock_proxy
result = DistributedJobManager.find_available_servers('blender')
assert len(result) == 1
assert result[0]['hostname'] == 'server-1.local'
+239
View File
@@ -0,0 +1,239 @@
from unittest.mock import MagicMock, patch
import pytest
from src.engines.engine_manager import EngineManager, EngineDownloadWorker
class TestEngineManagerSyncClass:
"""_sync_class propagates instance attrs to class level."""
def test_sync_class_propagates_engines_path(self, engine_manager_instance):
assert EngineManager.engines_path == engine_manager_instance.engines_path
def test_sync_class_noop_when_no_instance(self):
orig = EngineManager._default_instance
try:
EngineManager._default_instance = None
EngineManager.engines_path = 'original'
EngineManager._sync_class()
assert EngineManager.engines_path == 'original'
finally:
EngineManager._default_instance = orig
EngineManager._sync_class()
def test_supported_engines_returns_list(self):
engines = EngineManager.supported_engines()
assert len(engines) >= 2
class TestEngineClassMapping:
"""Mapping file extensions and names to engine classes."""
def test_engine_class_with_name_finds_blender(self, engine_manager_instance):
cls = EngineManager.engine_class_with_name('blender')
assert cls is not None
assert cls.__name__ == 'Blender'
def test_engine_class_with_name_finds_ffmpeg(self, engine_manager_instance):
cls = EngineManager.engine_class_with_name('ffmpeg')
assert cls is not None
assert cls.__name__ == 'FFMPEG'
def test_engine_class_with_name_returns_none_for_unknown(self, engine_manager_instance):
cls = EngineManager.engine_class_with_name('nonexistent')
assert cls is None
def test_engine_class_with_name_case_insensitive(self, engine_manager_instance):
cls = EngineManager.engine_class_with_name('BLENDER')
assert cls is not None
assert cls.__name__ == 'Blender'
def test_engine_class_for_project_path_no_engines_path(self, engine_manager_instance):
engine_manager_instance.engines_path = None
with pytest.raises(FileNotFoundError):
EngineManager.engine_class_for_project_path('test.blend')
class TestGetInstalledEngineData:
"""Parsing directory listings for managed engines."""
def test_get_installed_engine_data_no_path(self, engine_manager_instance):
engine_manager_instance.engines_path = None
with pytest.raises(FileNotFoundError):
EngineManager.get_installed_engine_data()
def test_get_installed_engine_data_empty_dir(self, engine_manager_instance, tmp_path):
engine_manager_instance.engines_path = str(tmp_path / 'engines')
result = EngineManager.get_installed_engine_data(ignore_system=True)
assert result == []
@patch('src.engines.engine_manager.os.listdir')
@patch('src.engines.engine_manager.os.path.isdir')
def test_parse_managed_engine_directory(
self, mock_isdir, mock_listdir, engine_manager_instance, tmp_path,
):
engine_manager_instance.engines_path = str(tmp_path / 'engines')
mock_listdir.return_value = ['blender-3.6.0-macos-arm64']
mock_isdir.return_value = True
result = EngineManager.get_installed_engine_data(ignore_system=True)
assert len(result) == 1
assert result[0]['engine'] == 'blender'
assert result[0]['version'] == '3.6.0'
assert result[0]['system_os'] == 'macos'
assert result[0]['cpu'] == 'arm64'
assert result[0]['type'] == 'managed'
@patch('src.engines.engine_manager.os.listdir')
@patch('src.engines.engine_manager.os.path.isdir')
def test_filter_by_engine_name(
self, mock_isdir, mock_listdir, engine_manager_instance, tmp_path,
):
engine_manager_instance.engines_path = str(tmp_path / 'engines')
mock_listdir.return_value = ['blender-3.6.0-macos-arm64', 'ffmpeg-6.0-macos-arm64']
mock_isdir.return_value = True
blender_only = EngineManager.get_installed_engine_data(
filter_name='blender', ignore_system=True)
assert len(blender_only) == 1
assert blender_only[0]['engine'] == 'blender'
class TestNewestInstalledEngineData:
"""Filtering by system and CPU."""
@patch('src.engines.engine_manager.current_system_os')
@patch('src.engines.engine_manager.current_system_cpu')
def test_newest_filters_by_system_and_cpu(
self, mock_cpu, mock_os, engine_manager_instance, tmp_path,
):
mock_os.return_value = 'macos'
mock_cpu.return_value = 'arm64'
engine_manager_instance.engines_path = str(tmp_path / 'engines')
dirs = [
'blender-3.6.0-macos-arm64',
'blender-4.0.0-linux-x86_64',
'blender-4.1.0-macos-arm64',
]
with (patch('src.engines.engine_manager.os.listdir', return_value=dirs),
patch('src.engines.engine_manager.os.path.isdir', return_value=True)):
result = EngineManager.newest_installed_engine_data(
'blender', ignore_system=True)
assert result['version'] == '4.1.0'
assert result['system_os'] == 'macos'
assert result['cpu'] == 'arm64'
def test_newest_returns_empty_on_no_match(self, engine_manager_instance, tmp_path):
engine_manager_instance.engines_path = str(tmp_path / 'engines')
result = EngineManager.newest_installed_engine_data('nonexistent')
assert result == []
class TestIsVersionInstalled:
"""Checking whether a specific version is installed."""
@patch('src.engines.engine_manager.current_system_os')
@patch('src.engines.engine_manager.current_system_cpu')
def test_finds_matching_version(
self, mock_cpu, mock_os, engine_manager_instance, tmp_path,
):
mock_os.return_value = 'macos'
mock_cpu.return_value = 'arm64'
engine_manager_instance.engines_path = str(tmp_path / 'engines')
dirs = ['blender-3.6.0-macos-arm64', 'blender-4.1.0-macos-arm64']
with (patch('src.engines.engine_manager.os.listdir', return_value=dirs),
patch('src.engines.engine_manager.os.path.isdir', return_value=True)):
result = EngineManager.is_version_installed('blender', '3.6.0')
assert result is not False
@patch('src.engines.engine_manager.current_system_os')
@patch('src.engines.engine_manager.current_system_cpu')
def test_returns_false_on_no_match(
self, mock_cpu, mock_os, engine_manager_instance, tmp_path,
):
mock_os.return_value = 'macos'
mock_cpu.return_value = 'arm64'
engine_manager_instance.engines_path = str(tmp_path / 'engines')
result = EngineManager.is_version_installed('blender', '99.0.0')
assert result is False
class TestDeleteEngineDownload:
"""Deleting a managed engine directory."""
@patch('src.engines.engine_manager.shutil.rmtree')
@patch('src.engines.engine_manager.current_system_os', return_value='macos')
@patch('src.engines.engine_manager.current_system_cpu', return_value='arm64')
def test_delete_managed_engine(
self, mock_cpu, mock_os, mock_rmtree, engine_manager_instance, tmp_path,
):
engine_manager_instance.engines_path = str(tmp_path / 'engines')
engines_dir = tmp_path / 'engines' / 'blender-3.6.0-macos-arm64'
engines_dir.mkdir(parents=True)
(engines_dir / 'Blender').write_text('fake binary')
result = EngineManager.delete_engine_download('blender', '3.6.0')
assert result is True
mock_rmtree.assert_called_once_with(str(engines_dir), ignore_errors=False)
def test_delete_nonexistent_engine(self, engine_manager_instance, tmp_path):
engine_manager_instance.engines_path = str(tmp_path / 'engines')
result = EngineManager.delete_engine_download('nonexistent', '1.0.0')
assert result is False
class TestActiveDownloads:
"""Background download tracking."""
def test_active_downloads_empty_initially(self, engine_manager_instance):
assert EngineManager.active_downloads() == []
def test_download_tasks_tracked(self, engine_manager_instance):
task = MagicMock(spec=EngineDownloadWorker)
task.is_alive.return_value = True
task.name = 'blender-4.0.0-macos-arm64'
engine_manager_instance.download_tasks.append(task)
active = EngineManager.active_downloads()
assert len(active) == 1
class TestCreateWorker:
"""Creating worker instances for render jobs."""
def test_create_worker_raises_when_no_engines_installed(self, engine_manager_instance, tmp_path):
engine_manager_instance.engines_path = str(tmp_path / 'engines')
with patch.object(engine_manager_instance, '_get_installed_engine_data', return_value=[]):
with pytest.raises(FileNotFoundError, match='Cannot find any installed'):
EngineManager.create_worker(
'blender',
input_path=tmp_path / 'test.blend',
output_path=tmp_path / 'output',
)
def test_create_worker_raises_for_unknown_engine(self, engine_manager_instance):
with pytest.raises(AttributeError):
EngineManager.create_worker(
'nonexistent',
input_path='/tmp/test.blend',
output_path='/tmp/output',
)
class TestDownloadableEngines:
"""Engines with a downloader."""
def test_downloadable_engines_returns_list(self, engine_manager_instance):
engines = EngineManager.downloadable_engines()
assert isinstance(engines, list)
+169
View File
@@ -0,0 +1,169 @@
from unittest.mock import MagicMock, patch
from src.api.preview_manager import PreviewManager
def make_mock_job(job_id='test-job-1', input_path='/tmp/test.blend',
file_list=None, **kwargs):
job = MagicMock()
job.id = job_id
job.input_path = input_path
job.file_list.return_value = file_list or []
for k, v in kwargs.items():
setattr(job, k, v)
return job
class TestPreviewManagerDefaults:
"""Default state."""
def test_storage_path_none_initially(self, preview_manager_instance):
assert preview_manager_instance.storage_path is not None
def test_running_jobs_empty_initially(self, preview_manager_instance):
assert preview_manager_instance._running_jobs == {}
class TestGeneratePreviewWorker:
"""Core preview generation logic."""
def test_skips_when_no_supported_files(self, preview_manager_instance, tmp_path):
preview_manager_instance.storage_path = str(tmp_path)
job = make_mock_job(file_list=[str(tmp_path / 'output.txt')])
with patch.object(preview_manager_instance, '_generate_job_preview_worker') as mock_gen:
preview_manager_instance._update_previews_for_job(job)
mock_gen.assert_called_once_with(job, False)
def test_uses_input_path_when_no_file_list(self, preview_manager_instance, tmp_path):
"""When file_list is empty, falls back to input_path."""
job = make_mock_job(input_path=str(tmp_path / 'output.mp4'))
with patch.object(preview_manager_instance, '_generate_job_preview_worker') as mock_gen:
preview_manager_instance._update_previews_for_job(job)
mock_gen.assert_called_once()
def test_generate_preview_checks_existing_files(self, preview_manager_instance, tmp_path):
preview_manager_instance.storage_path = str(tmp_path)
job = make_mock_job(input_path=str(tmp_path / 'test.jpg'))
with patch('src.api.preview_manager.save_first_frame') as mock_save:
with patch('src.api.preview_manager.os.path.exists', return_value=False):
preview_manager_instance._generate_job_preview_worker(job)
# No file_list → falls back to input_path → label is "input"
expected_img = str(tmp_path / f'{job.id}-input-480.jpg')
mock_save.assert_called_once_with(
source_path=job.input_path,
dest_path=expected_img,
max_width=480,
)
class TestUpdatePreviewsForJob:
"""Dispatch of preview generation."""
def test_starts_new_thread(self, preview_manager_instance, tmp_path):
preview_manager_instance.storage_path = str(tmp_path)
job = make_mock_job(file_list=[str(tmp_path / 'test.mp4')])
with patch('src.api.preview_manager.threading.Thread') as mock_thread:
thread_instance = MagicMock()
mock_thread.return_value = thread_instance
preview_manager_instance._update_previews_for_job(job)
mock_thread.assert_called_once()
thread_instance.start.assert_called_once()
assert preview_manager_instance._running_jobs[job.id] == thread_instance
def test_reuses_existing_thread(self, preview_manager_instance, tmp_path):
preview_manager_instance.storage_path = str(tmp_path)
job = make_mock_job()
existing_thread = MagicMock()
existing_thread.is_alive.return_value = True
preview_manager_instance._running_jobs[job.id] = existing_thread
with patch('src.api.preview_manager.threading.Thread') as mock_thread:
preview_manager_instance._update_previews_for_job(job)
mock_thread.assert_not_called()
def test_join_when_wait_requested(self, preview_manager_instance, tmp_path):
preview_manager_instance.storage_path = str(tmp_path)
job = make_mock_job()
with patch('src.api.preview_manager.threading.Thread') as mock_thread:
thread_instance = MagicMock()
mock_thread.return_value = thread_instance
preview_manager_instance._update_previews_for_job(
job, wait_until_completion=True, timeout=30,
)
thread_instance.join.assert_called_once_with(timeout=30)
class TestGetPreviewsForJob:
"""Reading preview files."""
def test_returns_empty_when_no_previews(self, preview_manager_instance, tmp_path):
preview_manager_instance.storage_path = str(tmp_path)
job = make_mock_job()
result = preview_manager_instance._get_previews_for_job(job)
assert result == {}
def test_returns_preview_files(self, preview_manager_instance, tmp_path):
preview_manager_instance.storage_path = str(tmp_path)
job = make_mock_job(job_id='abc')
# Create a fake preview file
(tmp_path / 'abc-output-480.jpg').write_text('preview')
result = preview_manager_instance._get_previews_for_job(job)
assert 'output' in result
assert len(result['output']) == 1
assert result['output'][0]['kind'] == 'image'
class TestDeletePreviewsForJob:
"""Cleaning up preview files."""
def test_deletes_existing_previews(self, preview_manager_instance, tmp_path):
preview_manager_instance.storage_path = str(tmp_path)
job = make_mock_job(job_id='abc')
preview_file = tmp_path / 'abc-output-480.jpg'
preview_file.write_text('preview')
preview_manager_instance._delete_previews_for_job(job)
assert not preview_file.exists()
def test_no_error_when_no_previews(self, preview_manager_instance, tmp_path):
preview_manager_instance.storage_path = str(tmp_path)
job = make_mock_job()
preview_manager_instance._delete_previews_for_job(job)
class TestForwarders:
"""Classmethod forwarders delegate to instance."""
def test_update_previews_for_job_forwarder(self, preview_manager_instance):
job = make_mock_job()
with patch.object(preview_manager_instance, '_update_previews_for_job') as mock_method:
PreviewManager.update_previews_for_job(job)
mock_method.assert_called_once_with(job, False, False, None)
def test_get_previews_for_job_forwarder(self, preview_manager_instance):
job = make_mock_job()
with patch.object(preview_manager_instance, '_get_previews_for_job',
return_value={'input': []}) as mock_method:
result = PreviewManager.get_previews_for_job(job)
assert result == {'input': []}
mock_method.assert_called_once_with(job)
def test_delete_previews_for_job_forwarder(self, preview_manager_instance):
job = make_mock_job()
with patch.object(preview_manager_instance, '_delete_previews_for_job') as mock_method:
PreviewManager.delete_previews_for_job(job)
mock_method.assert_called_once_with(job)
+307
View File
@@ -0,0 +1,307 @@
from unittest.mock import MagicMock, patch
from datetime import datetime
import pytest
from src.render_queue import RenderQueue, JobNotFoundError
from src.utilities.status_utils import RenderStatus
def make_mock_job(job_id='test-1', status=RenderStatus.NOT_STARTED,
engine_name='blender', priority=2, **kwargs):
job = MagicMock()
job.id = job_id
job.status = status
job.engine_name = engine_name
job.priority = priority
job.name = kwargs.get('name', 'Test Job')
job.scheduled_start = kwargs.get('scheduled_start', None)
for k, v in kwargs.items():
setattr(job, k, v)
return job
class TestRenderQueueDefaults:
"""Default state."""
def test_init_sets_empty_queue(self, render_queue_instance):
assert render_queue_instance.job_queue == []
def test_init_sets_max_instances(self, render_queue_instance):
assert render_queue_instance.maximum_renderer_instances == {'blender': 1, 'aerender': 1, 'ffmpeg': 4}
def test_init_is_not_running(self, render_queue_instance):
assert render_queue_instance.is_running is False
class TestAllJobs:
"""Returning all jobs."""
def test_all_jobs_empty_initially(self, render_queue_instance):
assert RenderQueue.all_jobs() == []
def test_all_jobs_returns_job_list(self, render_queue_instance):
job = make_mock_job()
render_queue_instance.job_queue.append(job)
assert len(RenderQueue.all_jobs()) == 1
assert RenderQueue.all_jobs()[0] == job
class TestJobsWithStatus:
"""Filtering jobs by status."""
def test_jobs_with_status_returns_matching(self, render_queue_instance):
running = make_mock_job('job-1', RenderStatus.RUNNING)
pending = make_mock_job('job-2', RenderStatus.NOT_STARTED)
render_queue_instance.job_queue.extend([running, pending])
result = RenderQueue.jobs_with_status(RenderStatus.RUNNING)
assert len(result) == 1
assert result[0].id == 'job-1'
def test_jobs_with_status_empty_when_no_match(self, render_queue_instance):
job = make_mock_job('job-1', RenderStatus.COMPLETED)
render_queue_instance.job_queue.append(job)
result = RenderQueue.jobs_with_status(RenderStatus.RUNNING)
assert result == []
def test_jobs_with_status_sorts_by_priority(self, render_queue_instance):
high = make_mock_job('job-1', RenderStatus.NOT_STARTED, priority=1)
low = make_mock_job('job-2', RenderStatus.NOT_STARTED, priority=5)
render_queue_instance.job_queue.extend([low, high])
result = RenderQueue.jobs_with_status(RenderStatus.NOT_STARTED, priority_sorted=True)
assert result[0].id == 'job-1'
class TestRunningAndPending:
"""Convenience methods for running/pending."""
def test_running_jobs(self, render_queue_instance):
running = make_mock_job('job-1', RenderStatus.RUNNING)
pending = make_mock_job('job-2', RenderStatus.NOT_STARTED)
render_queue_instance.job_queue.extend([running, pending])
result = RenderQueue.running_jobs()
assert len(result) == 1
assert result[0].id == 'job-1'
def test_pending_jobs_includes_not_started_and_scheduled(self, render_queue_instance):
ns = make_mock_job('job-1', RenderStatus.NOT_STARTED)
sched = make_mock_job('job-2', RenderStatus.SCHEDULED)
running = make_mock_job('job-3', RenderStatus.RUNNING)
render_queue_instance.job_queue.extend([ns, sched, running])
result = RenderQueue.pending_jobs()
assert len(result) == 2
class TestJobWithId:
"""Lookup by job ID."""
def test_finds_job(self, render_queue_instance):
job = make_mock_job('abc-123')
render_queue_instance.job_queue.append(job)
found = RenderQueue.job_with_id('abc-123')
assert found == job
def test_raises_when_not_found(self, render_queue_instance):
with pytest.raises(JobNotFoundError, match='abc-123'):
RenderQueue.job_with_id('abc-123')
def test_returns_none_when_not_found_with_none_ok(self, render_queue_instance):
result = RenderQueue.job_with_id('abc-123', none_ok=True)
assert result is None
class TestJobCounts:
"""Counting jobs by status."""
def test_job_counts_returns_all_statuses(self, render_queue_instance):
render_queue_instance.job_queue.extend([
make_mock_job('j1', RenderStatus.RUNNING),
make_mock_job('j2', RenderStatus.COMPLETED),
make_mock_job('j3', RenderStatus.NOT_STARTED),
])
counts = RenderQueue.job_counts()
assert counts[RenderStatus.RUNNING.value] == 1
assert counts[RenderStatus.COMPLETED.value] == 1
assert counts[RenderStatus.NOT_STARTED.value] == 1
assert counts[RenderStatus.ERROR.value] == 0
assert counts[RenderStatus.CANCELLED.value] == 0
class TestIsAvailableForJob:
"""Renderer availability checks."""
def test_available_when_no_running_jobs(self, render_queue_instance):
assert RenderQueue.is_available_for_job('blender') is True
def test_not_available_when_maxed_out(self, render_queue_instance):
render_queue_instance.job_queue.append(
make_mock_job('j1', RenderStatus.RUNNING, engine_name='blender')
)
assert RenderQueue.is_available_for_job('blender') is False
def test_available_for_different_renderer(self, render_queue_instance):
render_queue_instance.job_queue.append(
make_mock_job('j1', RenderStatus.RUNNING, engine_name='blender')
)
assert RenderQueue.is_available_for_job('ffmpeg') is True
def test_blocked_by_higher_priority(self, render_queue_instance):
render_queue_instance.job_queue.append(
make_mock_job('j1', RenderStatus.RUNNING, engine_name='blender', priority=0)
)
assert RenderQueue.is_available_for_job('blender', priority=2) is False
def test_ffmpeg_allows_multiple_instances(self, render_queue_instance):
for i in range(4):
render_queue_instance.job_queue.append(
make_mock_job(f'j{i}', RenderStatus.RUNNING, engine_name='ffmpeg')
)
assert RenderQueue.is_available_for_job('ffmpeg') is False
class TestAddToRenderQueue:
"""Adding jobs."""
def test_add_job_appends_to_queue(self, render_queue_instance):
render_queue_instance.session = MagicMock()
job = make_mock_job()
RenderQueue.add_to_render_queue(job)
assert job in render_queue_instance.job_queue
def test_add_job_saves_state(self, render_queue_instance):
render_queue_instance.session = MagicMock()
job = make_mock_job()
RenderQueue.add_to_render_queue(job)
render_queue_instance.session.add.assert_called_once_with(job)
def test_add_job_force_start_calls_start(self, render_queue_instance):
render_queue_instance.session = MagicMock()
job = make_mock_job(status=RenderStatus.NOT_STARTED)
with patch.object(render_queue_instance, '_start_job') as mock_start:
RenderQueue.add_to_render_queue(job, force_start=True)
mock_start.assert_called_once_with(job)
def test_add_job_force_start_skips_completed(self, render_queue_instance):
render_queue_instance.session = MagicMock()
job = make_mock_job(status=RenderStatus.COMPLETED)
with patch.object(render_queue_instance, '_start_job') as mock_start:
RenderQueue.add_to_render_queue(job, force_start=True)
mock_start.assert_not_called()
def test_add_job_evaluates_queue_when_running(self, render_queue_instance):
render_queue_instance.session = MagicMock()
render_queue_instance.is_running = True
job = make_mock_job()
with patch.object(render_queue_instance, '_evaluate_queue') as mock_eval:
RenderQueue.add_to_render_queue(job)
mock_eval.assert_called_once()
class TestStartCancelDelete:
"""Job lifecycle."""
def test_start_job_calls_job_start(self, render_queue_instance):
job = make_mock_job()
with patch.object(render_queue_instance, '_save_state'):
RenderQueue.start_job(job)
job.start.assert_called_once()
def test_cancel_job_calls_job_stop(self, render_queue_instance):
job = make_mock_job()
job.stop.return_value = None
job.status = RenderStatus.CANCELLED
result = RenderQueue.cancel_job(job)
assert result is True
job.stop.assert_called_once()
def test_delete_job_removes_from_queue(self, render_queue_instance):
job = make_mock_job()
render_queue_instance.job_queue.append(job)
render_queue_instance.session = MagicMock()
result = RenderQueue.delete_job(job)
assert result is True
assert job not in render_queue_instance.job_queue
render_queue_instance.session.delete.assert_called_once_with(job)
class TestStartStop:
"""Starting and stopping the queue evaluation loop."""
def test_start_sets_running_and_evaluates(self, render_queue_instance):
with patch.object(render_queue_instance, '_evaluate_queue') as mock_eval:
RenderQueue.start()
assert render_queue_instance.is_running is True
mock_eval.assert_called_once()
def test_stop_clears_running(self, render_queue_instance):
render_queue_instance.is_running = True
RenderQueue.stop()
assert render_queue_instance.is_running is False
class TestEvaluateQueue:
"""Queue evaluation dispatches jobs."""
def test_evaluate_starts_not_started_jobs(self, render_queue_instance):
job = make_mock_job()
render_queue_instance.job_queue.append(job)
with patch.object(render_queue_instance, '_start_job') as mock_start:
with patch.object(render_queue_instance, '_save_state'):
RenderQueue.evaluate_queue()
mock_start.assert_called_once_with(job)
def test_evaluate_respects_max_instances(self, render_queue_instance):
# One already running, only 1 slot for blender
render_queue_instance.job_queue.append(
make_mock_job('running-1', RenderStatus.RUNNING, engine_name='blender')
)
waiting = make_mock_job('waiting-1', RenderStatus.NOT_STARTED, engine_name='blender')
render_queue_instance.job_queue.append(waiting)
with patch.object(render_queue_instance, '_start_job') as mock_start:
with patch.object(render_queue_instance, '_save_state'):
RenderQueue.evaluate_queue()
mock_start.assert_not_called()
def test_evaluate_starts_scheduled_jobs(self, render_queue_instance):
past = datetime(2020, 1, 1)
job = make_mock_job(status=RenderStatus.SCHEDULED, scheduled_start=past)
render_queue_instance.job_queue.append(job)
with patch.object(render_queue_instance, '_start_job') as mock_start:
with patch.object(render_queue_instance, '_save_state'):
RenderQueue.evaluate_queue()
mock_start.assert_called_once_with(job)
class TestClearHistory:
"""Clearing completed/cancelled/error jobs."""
def test_clear_history_removes_finished_jobs(self, render_queue_instance):
render_queue_instance.session = MagicMock()
jobs = [
make_mock_job('c', RenderStatus.COMPLETED),
make_mock_job('e', RenderStatus.ERROR),
make_mock_job('x', RenderStatus.CANCELLED),
make_mock_job('r', RenderStatus.RUNNING),
]
render_queue_instance.job_queue.extend(jobs)
RenderQueue.clear_history()
assert len(render_queue_instance.job_queue) == 1
assert render_queue_instance.job_queue[0].id == 'r'
+159
View File
@@ -0,0 +1,159 @@
from unittest.mock import MagicMock, patch
import pytest
from src.utilities.zeroconf_server import ZeroconfServer
class TestConfigure:
"""Configuring service parameters."""
def test_configure_sets_attributes(self, zeroconf_server_instance):
with patch('socket.gethostbyname', return_value='192.168.1.1'):
ZeroconfServer.configure('_zordon._tcp.local.', 'test-server', 8080)
assert zeroconf_server_instance.service_type == '_zordon._tcp.local.'
assert zeroconf_server_instance.server_name == 'test-server'
assert zeroconf_server_instance.server_port == 8080
def test_configure_calls_sync_class(self, zeroconf_server_instance):
with patch('socket.gethostbyname', return_value='192.168.1.1'):
ZeroconfServer.configure('_zordon._tcp.local.', 'test-server', 8080)
assert ZeroconfServer.service_type == '_zordon._tcp.local.'
assert ZeroconfServer.server_port == 8080
def test_configure_stops_on_gaierror(self, zeroconf_server_instance):
import socket
with patch('socket.gethostbyname', side_effect=socket.gaierror):
with patch.object(zeroconf_server_instance, '_stop') as mock_stop:
ZeroconfServer.configure('_zordon._tcp.local.', 'test-server', 8080)
mock_stop.assert_called_once()
class TestSyncClass:
"""_sync_class propagates instance attrs to class level."""
def test_sync_class_propagates_all_attrs(self, zeroconf_server_instance):
zeroconf_server_instance.service_type = '_test._tcp.'
zeroconf_server_instance.server_name = 'foo'
zeroconf_server_instance.server_port = 9999
zeroconf_server_instance.server_ip = '10.0.0.1'
zeroconf_server_instance.properties = {'key': 'val'}
ZeroconfServer._sync_class()
assert ZeroconfServer.service_type == '_test._tcp.'
assert ZeroconfServer.server_name == 'foo'
assert ZeroconfServer.server_port == 9999
assert ZeroconfServer.server_ip == '10.0.0.1'
assert ZeroconfServer.properties == {'key': 'val'}
class TestStartStop:
"""Service lifecycle."""
def test_start_raises_without_configure(self, zeroconf_server_instance):
with pytest.raises(RuntimeError, match='configure'):
ZeroconfServer.start()
@patch('src.utilities.zeroconf_server.ServiceBrowser')
def test_start_listen_only_skips_register(self, mock_browser, zeroconf_server_instance):
with patch('socket.gethostbyname', return_value='192.168.1.1'):
ZeroconfServer.configure('_zordon._tcp.local.', 'test', 8080)
with patch.object(zeroconf_server_instance, '_register_service') as mock_register:
ZeroconfServer.start(listen_only=True)
mock_register.assert_not_called()
@patch('src.utilities.zeroconf_server.ServiceBrowser')
def test_start_registers_service(self, mock_browser, zeroconf_server_instance):
with patch('socket.gethostbyname', return_value='192.168.1.1'):
ZeroconfServer.configure('_zordon._tcp.local.', 'test', 8080)
with patch.object(zeroconf_server_instance, '_register_service') as mock_register:
ZeroconfServer.start(listen_only=False)
mock_register.assert_called_once()
def test_stop_unregisters_and_closes(self, zeroconf_server_instance):
zeroconf_server_instance.service_info = MagicMock()
with patch.object(zeroconf_server_instance, '_unregister_service') as mock_unreg:
with patch.object(zeroconf_server_instance.zeroconf, 'close') as mock_close:
ZeroconfServer.stop()
mock_unreg.assert_called_once()
mock_close.assert_called_once()
class TestRegisterService:
"""Service registration with Zeroconf."""
@patch('src.utilities.zeroconf_server.ServiceInfo')
@patch('socket.gethostbyname')
def test_registers_service_info(self, mock_gethostbyname, mock_service_info,
zeroconf_server_instance):
mock_gethostbyname.return_value = '192.168.1.1'
zeroconf_server_instance.service_type = '_zordon._tcp.local.'
zeroconf_server_instance.server_name = 'test'
zeroconf_server_instance.server_port = 8080
zeroconf_server_instance.properties = {}
# Replace real Zeroconf with a mock so we don't actually register
zeroconf_server_instance.zeroconf = MagicMock()
mock_info = MagicMock()
mock_service_info.return_value = mock_info
with patch('socket.inet_aton', return_value=b'\xc0\xa8\x01\x01'):
zeroconf_server_instance._register_service()
zeroconf_server_instance.zeroconf.register_service.assert_called_once_with(mock_info)
assert zeroconf_server_instance.service_info == mock_info
class TestFoundHostnames:
"""Discovery cache."""
def test_found_hostnames_empty_initially(self, zeroconf_server_instance):
result = zeroconf_server_instance._found_hostnames()
assert result == []
@patch('socket.gethostname', return_value='my-machine')
def test_hostnames_sorted_with_local_first(self, mock_hostname, zeroconf_server_instance):
zeroconf_server_instance.client_cache = {
'other-machine': MagicMock(),
'my-machine': MagicMock(),
}
result = zeroconf_server_instance._found_hostnames()
# sort_key returns False (0) for local → sorted first
assert result[0] == 'my-machine'
def test_get_hostname_properties_returns_decoded(self, zeroconf_server_instance):
info = MagicMock()
info.properties = {b'key': b'value', b'num': b'42'}
zeroconf_server_instance.client_cache['server-1'] = info
result = zeroconf_server_instance._get_hostname_properties('server-1')
assert result == {'key': 'value', 'num': '42'}
def test_get_hostname_properties_returns_empty_for_unknown(self, zeroconf_server_instance):
result = zeroconf_server_instance._get_hostname_properties('unknown')
assert result == {}
class TestForwarders:
"""Classmethod forwarders delegate to instance."""
def test_found_hostnames_forwarder(self, zeroconf_server_instance):
zeroconf_server_instance.client_cache['svr'] = MagicMock()
result = ZeroconfServer.found_hostnames()
assert 'svr' in result
def test_get_hostname_properties_forwarder(self, zeroconf_server_instance):
info = MagicMock()
info.properties = {}
zeroconf_server_instance.client_cache['svr'] = info
result = ZeroconfServer.get_hostname_properties('svr')
assert result == {}