Refactor: ApplicationContext DI wiring (#131)

* refactor: wire all services through ApplicationContext

- Created src/application_context.py as DI container with TYPE_CHECKING imports
- server.py now instantiates all services in dependency order via ApplicationContext
- Fixed infinite recursion bug: 48 instance methods renamed with underscore prefix
  to avoid shadowing by same-named @classmethod forwarders
- ZeroconfServer: instantiate Zeroconf() in __init__, add _sync_class() to
  configure forwarder, direct _configure/_start calls during wiring
- Config, EngineManager, PreviewManager: all forwarders and _sync_class() intact
- RenderQueue: load_state and subscribe moved to __init__, threading.Lock retained
- DistributedJobManager: subscribe_to_listener moved to __init__

* fix: greedy regex in get_render_devices swallows BlenderKit log output

Changed regex from greedy [\s\S]* to non-greedy .*? with re.DOTALL
so it stops at the first ] (the end of the GPU data JSON array)
instead of matching through timestamped log lines like
[19:36:22.109, __init__.py:2881] that contain trailing brackets.

* fix: AttributeError on .enabled in update_job_count prevents options from rendering

* refactor: log silent AttributeError catches, add _sync_class to remaining services, drop dead ctx slot
This commit is contained in:
2026-06-05 22:01:20 -05:00
committed by GitHub
parent e8992fc91a
commit fa4a97f6fa
12 changed files with 714 additions and 651 deletions
+43 -13
View File
@@ -11,6 +11,7 @@ from src.api.api_server import API_VERSION
from src.api.api_server import start_api_server from src.api.api_server import start_api_server
from src.api.preview_manager import PreviewManager from src.api.preview_manager import PreviewManager
from src.api.serverproxy_manager import ServerProxyManager from src.api.serverproxy_manager import ServerProxyManager
from src.application_context import ApplicationContext
from src.distributed_job_manager import DistributedJobManager from src.distributed_job_manager import DistributedJobManager
from src.engines.engine_manager import EngineManager from src.engines.engine_manager import EngineManager
from src.render_queue import RenderQueue from src.render_queue import RenderQueue
@@ -26,21 +27,46 @@ logger = logging.getLogger()
class ZordonServer: class ZordonServer:
def __init__(self): def __init__(self):
self.ctx = ApplicationContext()
# setup logging # setup logging
logging.basicConfig(format='%(asctime)s: %(levelname)s: %(module)s: %(message)s', datefmt='%d-%b-%y %H:%M:%S', logging.basicConfig(format='%(asctime)s: %(levelname)s: %(module)s: %(message)s', datefmt='%d-%b-%y %H:%M:%S',
level=Config.server_log_level.upper()) level=Config.server_log_level.upper())
logging.getLogger("requests").setLevel(logging.WARNING) # suppress noisy requests/urllib3 logging logging.getLogger("requests").setLevel(logging.WARNING) # suppress noisy requests/urllib3 logging
logging.getLogger("urllib3").setLevel(logging.WARNING) logging.getLogger("urllib3").setLevel(logging.WARNING)
# Load Config YAML # ---- Bootstrap Config ----
Config.setup_config_dir() Config.setup_config_dir()
config_path = Path(Config.config_dir()) / "config.yaml" config_path = Path(Config.config_dir()) / "config.yaml"
Config.load_config(config_path) self.ctx.config = Config()
self.ctx.config.load(config_path)
Config._default_instance = self.ctx.config
Config._sync_class()
# configure default paths # ---- Engine Manager ----
EngineManager.engines_path = str(Path(Config.upload_folder).expanduser()/ "engines") self.ctx.engine_manager = EngineManager()
os.makedirs(EngineManager.engines_path, exist_ok=True) self.ctx.engine_manager.engines_path = Path(Config.upload_folder).expanduser() / "engines"
PreviewManager.storage_path = Path(Config.upload_folder).expanduser() / "previews" os.makedirs(self.ctx.engine_manager.engines_path, exist_ok=True)
EngineManager._default_instance = self.ctx.engine_manager
EngineManager._sync_class()
# ---- Preview Manager ----
self.ctx.preview_manager = PreviewManager()
self.ctx.preview_manager.storage_path = Path(Config.upload_folder).expanduser() / "previews"
PreviewManager._default_instance = self.ctx.preview_manager
PreviewManager._sync_class()
# ---- Render Queue ----
self.ctx.render_queue = RenderQueue()
self.ctx.render_queue.load_state(database_directory=Path(Config.upload_folder).expanduser())
RenderQueue._default_instance = self.ctx.render_queue
RenderQueue._sync_class()
# ---- Distributed Job Manager ----
self.ctx.distributed_job_manager = DistributedJobManager()
self.ctx.distributed_job_manager.subscribe_to_listener()
DistributedJobManager._default_instance = self.ctx.distributed_job_manager
DistributedJobManager._sync_class()
self.api_server = None self.api_server = None
self.server_hostname: str = socket.gethostname() self.server_hostname: str = socket.gethostname()
@@ -73,10 +99,8 @@ class ZordonServer:
logger.debug(f"Upload directory: {Path(Config.upload_folder).expanduser()}") logger.debug(f"Upload directory: {Path(Config.upload_folder).expanduser()}")
logger.debug(f"Thumbs directory: {PreviewManager.storage_path}") logger.debug(f"Thumbs directory: {PreviewManager.storage_path}")
logger.debug(f"Engines directory: {EngineManager.engines_path}") logger.debug(f"Engines directory: {EngineManager.engines_path}")
# Set up the RenderQueue object
RenderQueue.load_state(database_directory=Path(Config.upload_folder).expanduser())
ServerProxyManager.subscribe_to_listener() ServerProxyManager.subscribe_to_listener()
DistributedJobManager.subscribe_to_listener()
# update hostname # update hostname
self.server_hostname = socket.gethostname() self.server_hostname = socket.gethostname()
@@ -87,16 +111,21 @@ class ZordonServer:
self.api_server.start() self.api_server.start()
# start zeroconf server # start zeroconf server
ZeroconfServer.configure(f"_{APP_NAME.lower()}._tcp.local.", self.server_hostname, Config.port_number) ctx = self.ctx
ZeroconfServer.properties = {'system_cpu': current_system_cpu(), ctx.zeroconf_server = ZeroconfServer()
ctx.zeroconf_server._configure(f"_{APP_NAME.lower()}._tcp.local.", self.server_hostname, Config.port_number)
ctx.zeroconf_server.properties = {'system_cpu': current_system_cpu(),
'system_cpu_brand': current_system_cpu_brand(), 'system_cpu_brand': current_system_cpu_brand(),
'system_cpu_cores': multiprocessing.cpu_count(), 'system_cpu_cores': multiprocessing.cpu_count(),
'system_os': current_system_os(), 'system_os': current_system_os(),
'system_os_version': current_system_os_version(), 'system_os_version': current_system_os_version(),
'system_memory': round(psutil.virtual_memory().total / (1024**3)), # in GB 'system_memory': round(psutil.virtual_memory().total / (1024**3)),
'gpu_info': get_gpu_info(), 'gpu_info': get_gpu_info(),
'api_version': API_VERSION} 'api_version': API_VERSION}
ZeroconfServer.start() ZeroconfServer._default_instance = ctx.zeroconf_server
ZeroconfServer._sync_class()
ctx.zeroconf_server._start()
logger.info(f"{APP_NAME} Render Server started - Hostname: {self.server_hostname}") logger.info(f"{APP_NAME} Render Server started - Hostname: {self.server_hostname}")
RenderQueue.start() # Start evaluating the render queue RenderQueue.start() # Start evaluating the render queue
@@ -112,6 +141,7 @@ class ZordonServer:
logger.exception(f"Exception during prepare for shutdown: {e}") logger.exception(f"Exception during prepare for shutdown: {e}")
logger.info(f"{APP_NAME} Render Server has shut down") logger.info(f"{APP_NAME} Render Server has shut down")
if __name__ == '__main__': if __name__ == '__main__':
server = ZordonServer() server = ZordonServer()
try: try:
+1 -1
View File
@@ -11,7 +11,7 @@ import requests
from tqdm import tqdm from tqdm import tqdm
from werkzeug.utils import secure_filename from werkzeug.utils import secure_filename
from distributed_job_manager import DistributedJobManager from src.distributed_job_manager import DistributedJobManager
logger = logging.getLogger() logger = logging.getLogger()
+40 -45
View File
@@ -3,6 +3,7 @@ import os
import subprocess import subprocess
import threading import threading
from pathlib import Path from pathlib import Path
from typing import Dict, Optional
from src.utilities.ffmpeg_helper import generate_thumbnail, save_first_frame from src.utilities.ffmpeg_helper import generate_thumbnail, save_first_frame
@@ -12,20 +13,20 @@ supported_image_formats = ['.jpg', '.png', '.exr', '.tif', '.tga', '.bmp', '.web
class PreviewManager: class PreviewManager:
"""Manages generation, storage, and retrieval of preview images and videos for rendering jobs.""" _default_instance: Optional['PreviewManager'] = None
storage_path = None storage_path: Optional[str] = None
_running_jobs = {}
def __init__(self) -> None:
self.storage_path = None
self._running_jobs: Dict = {}
@classmethod @classmethod
def __generate_job_preview_worker(cls, job, replace_existing=False, max_width=480): def _sync_class(cls) -> None:
"""Generates image and video previews for a given job. if cls._default_instance is not None:
cls.storage_path = cls._default_instance.storage_path
Args: def _generate_job_preview_worker(self, job, replace_existing=False, max_width=480):
job: The job object containing file information.
replace_existing (bool): Whether to replace existing previews. Defaults to False.
max_width (int): Maximum width for the preview images/videos. Defaults to 480.
"""
# Determine best source file to use for thumbs # Determine best source file to use for thumbs
job_file_list = job.file_list() job_file_list = job.file_list()
@@ -41,8 +42,8 @@ class PreviewManager:
logger.warning(f"No valid image or video files found in files from job: {job}") logger.warning(f"No valid image or video files found in files from job: {job}")
return return
os.makedirs(cls.storage_path, exist_ok=True) os.makedirs(self.storage_path, exist_ok=True)
base_path = os.path.join(cls.storage_path, f"{job.id}-{preview_label}-{max_width}") base_path = os.path.join(self.storage_path, f"{job.id}-{preview_label}-{max_width}")
preview_video_path = base_path + '.mp4' preview_video_path = base_path + '.mp4'
preview_image_path = base_path + '.jpg' preview_image_path = base_path + '.jpg'
@@ -73,41 +74,23 @@ class PreviewManager:
except subprocess.CalledProcessError as e: except subprocess.CalledProcessError as e:
logger.error(f"Error generating video preview for {job}: {e}") logger.error(f"Error generating video preview for {job}: {e}")
@classmethod def _update_previews_for_job(self, job, replace_existing=False, wait_until_completion=False, timeout=None):
def update_previews_for_job(cls, job, replace_existing=False, wait_until_completion=False, timeout=None): job_thread = self._running_jobs.get(job.id)
"""Updates previews for a given job by starting a background thread.
Args:
job: The job object.
replace_existing (bool): Whether to replace existing previews. Defaults to False.
wait_until_completion (bool): Whether to wait for the thread to complete. Defaults to False.
timeout (float): Timeout for waiting, if applicable.
"""
job_thread = cls._running_jobs.get(job.id)
if job_thread and job_thread.is_alive(): if job_thread and job_thread.is_alive():
logger.debug(f'Preview generation job already running for {job}') logger.debug(f'Preview generation job already running for {job}')
return else:
job_thread = threading.Thread(target=self._generate_job_preview_worker, args=(job, replace_existing,))
job_thread = threading.Thread(target=cls.__generate_job_preview_worker, args=(job, replace_existing,))
job_thread.start() job_thread.start()
cls._running_jobs[job.id] = job_thread self._running_jobs[job.id] = job_thread
if wait_until_completion: if wait_until_completion:
job_thread.join(timeout=timeout) job_thread.join(timeout=timeout)
@classmethod def _get_previews_for_job(self, job):
def get_previews_for_job(cls, job):
"""Retrieves previews for a given job.
Args:
job: The job object.
Returns:
dict: A dictionary containing preview information.
"""
results = {} results = {}
try: try:
directory_path = Path(cls.storage_path) directory_path = Path(self.storage_path)
preview_files_for_job = [f for f in directory_path.iterdir() if f.is_file() and f.name.startswith(job.id)] preview_files_for_job = [f for f in directory_path.iterdir() if f.is_file() and f.name.startswith(job.id)]
for preview_filename in preview_files_for_job: for preview_filename in preview_files_for_job:
@@ -125,14 +108,8 @@ class PreviewManager:
pass pass
return results return results
@classmethod def _delete_previews_for_job(self, job):
def delete_previews_for_job(cls, job): all_previews = self.get_previews_for_job(job)
"""Deletes all previews associated with a given job.
Args:
job: The job object.
"""
all_previews = cls.get_previews_for_job(job)
flattened_list = [item for sublist in all_previews.values() for item in sublist] flattened_list = [item for sublist in all_previews.values() for item in sublist]
for preview in flattened_list: for preview in flattened_list:
try: try:
@@ -140,3 +117,21 @@ class PreviewManager:
os.remove(preview['filename']) os.remove(preview['filename'])
except OSError as e: except OSError as e:
logger.error(f"Error removing preview '{preview.get('filename')}': {e}") logger.error(f"Error removing preview '{preview.get('filename')}': {e}")
# --- Forwarders for backward compatibility ---
@classmethod
def update_previews_for_job(cls, job, replace_existing=False, wait_until_completion=False, timeout=None):
if cls._default_instance is not None:
cls._default_instance._update_previews_for_job(job, replace_existing, wait_until_completion, timeout)
@classmethod
def get_previews_for_job(cls, job):
if cls._default_instance is not None:
return cls._default_instance._get_previews_for_job(job)
return {}
@classmethod
def delete_previews_for_job(cls, job):
if cls._default_instance is not None:
cls._default_instance._delete_previews_for_job(job)
+23
View File
@@ -0,0 +1,23 @@
from __future__ import annotations
from typing import TYPE_CHECKING, Optional
if TYPE_CHECKING:
from src.api.preview_manager import PreviewManager
from src.distributed_job_manager import DistributedJobManager
from src.engines.engine_manager import EngineManager
from src.render_queue import RenderQueue
from src.utilities.config import Config
from src.utilities.zeroconf_server import ZeroconfServer
class ApplicationContext:
"""Holds all service instances. Single source of truth for wiring."""
def __init__(self) -> None:
self.config: Optional[Config] = None
self.engine_manager: Optional[EngineManager] = None
self.preview_manager: Optional[PreviewManager] = None
self.zeroconf_server: Optional[ZeroconfServer] = None
self.render_queue: Optional[RenderQueue] = None
self.distributed_job_manager: Optional[DistributedJobManager] = None
+70 -143
View File
@@ -3,8 +3,9 @@ import os
import socket import socket
import threading import threading
import time import time
from typing import Optional
from pathlib import Path
from click import Path
from plyer import notification from plyer import notification
from pubsub import pub from pubsub import pub
@@ -21,47 +22,32 @@ logger = logging.getLogger()
class DistributedJobManager: class DistributedJobManager:
_default_instance: Optional['DistributedJobManager'] = None
def __init__(self):
pass
@classmethod @classmethod
def subscribe_to_listener(cls): def _sync_class(cls) -> None:
""" if cls._default_instance is not None:
Subscribes the private class method '__local_job_status_changed' to the 'status_change' pubsub message. pass # no class-level attributes to sync
This should be called once, typically during the initialization phase.
"""
pub.subscribe(cls.__local_job_status_changed, 'status_change')
pub.subscribe(cls.__local_job_frame_complete, 'frame_complete')
@classmethod def __init__(self) -> None:
def __local_job_frame_complete(cls, job_id, frame_number, update_interval=5): self.background_worker: Optional[threading.Thread] = None
""" def _subscribe_to_listener(self) -> None:
Responds to the 'frame_complete' pubsub message for local jobs. pub.subscribe(self._local_job_status_changed, 'status_change')
pub.subscribe(self._local_job_frame_complete, 'frame_complete')
Args:
job_id (str): The ID of the job that has changed status.
old_status (str): The previous status of the job.
new_status (str): The new (current) status of the job.
Note: Do not call directly. Instead, call via the 'frame_complete' pubsub message.
"""
def _local_job_frame_complete(self, job_id, frame_number, update_interval=5) -> None:
render_job = RenderQueue.job_with_id(job_id, none_ok=True) render_job = RenderQueue.job_with_id(job_id, none_ok=True)
if not render_job: # ignore jobs not in the queue if not render_job:
return return
logger.debug(f"Job {job_id} has completed frame #{frame_number}") logger.debug(f"Job {job_id} has completed frame #{frame_number}")
replace_existing_previews = (frame_number % update_interval) == 0 replace_existing_previews = (frame_number % update_interval) == 0
cls.__job_update_shared(render_job, replace_existing_previews) self._job_update_shared(render_job, replace_existing_previews)
@classmethod def _job_update_shared(self, render_job, replace_existing_previews=False) -> None:
def __job_update_shared(cls, render_job, replace_existing_previews=False):
# update previews
PreviewManager.update_previews_for_job(job=render_job, replace_existing=replace_existing_previews) PreviewManager.update_previews_for_job(job=render_job, replace_existing=replace_existing_previews)
# notify parent to allow individual frames to be copied instead of waiting until the end
if render_job.parent: if render_job.parent:
parent_id, parent_hostname = render_job.parent.split('@')[0], render_job.parent.split('@')[-1] parent_id, parent_hostname = render_job.parent.split('@')[0], render_job.parent.split('@')[-1]
try: try:
@@ -70,57 +56,41 @@ class DistributedJobManager:
except Exception as e: except Exception as e:
logger.error(f"Error notifying parent {parent_hostname} about update in subjob {render_job.id}: {e}") logger.error(f"Error notifying parent {parent_hostname} about update in subjob {render_job.id}: {e}")
@classmethod def _local_job_status_changed(self, job_id: str, old_status: str, new_status: str) -> None:
def __local_job_status_changed(cls, job_id: str, old_status: str, new_status: str):
"""
Responds to the 'status_change' pubsub message for local jobs.
If it's a child job, it notifies the parent job about the status change.
Args:
job_id (str): The ID of the job that has changed status.
old_status (str): The previous status of the job.
new_status (str): The new (current) status of the job.
Note: Do not call directly. Instead, call via the 'status_change' pubsub message.
"""
render_job = RenderQueue.job_with_id(job_id, none_ok=True) render_job = RenderQueue.job_with_id(job_id, none_ok=True)
if not render_job: # ignore jobs created but not yet added to queue if not render_job:
return return
logger.debug(f"Job {job_id} status change: {old_status} -> {new_status}") logger.debug(f"Job {job_id} status change: {old_status} -> {new_status}")
self._job_update_shared(render_job, replace_existing_previews=(render_job.status == RenderStatus.COMPLETED))
cls.__job_update_shared(render_job, replace_existing_previews=(render_job.status == RenderStatus.COMPLETED))
# Handle children
if render_job.children: if render_job.children:
if new_status in [RenderStatus.CANCELLED, RenderStatus.ERROR]: # Cancel children if necessary if new_status in (RenderStatus.CANCELLED, RenderStatus.ERROR):
for child in render_job.children: for child in render_job.children:
child_id, child_hostname = child.split('@') child_id, child_hostname = child.split('@')
RenderServerProxy(child_hostname).cancel_job(child_id, confirm=True) RenderServerProxy(child_hostname).cancel_job(child_id, confirm=True)
# UI Notifications
try: try:
if new_status == RenderStatus.COMPLETED: if new_status == RenderStatus.COMPLETED:
logger.debug("Show render complete notification") logger.debug("Show render complete notification")
notification.notify( notification.notify(
title='Render Job Complete', title='Render Job Complete',
message=f'{render_job.name} completed succesfully', message=f'{render_job.name} completed succesfully',
timeout=10 # Display time in seconds timeout=10
) )
elif new_status == RenderStatus.ERROR: elif new_status == RenderStatus.ERROR:
logger.debug("Show render error notification") logger.debug("Show render error notification")
notification.notify( notification.notify(
title='Render Job Failed', title='Render Job Failed',
message=f'{render_job.name} failed rendering', message=f'{render_job.name} failed rendering',
timeout=10 # Display time in seconds timeout=10
) )
elif new_status == RenderStatus.RUNNING: elif new_status == RenderStatus.RUNNING:
logger.debug("Show render started notification") logger.debug("Show render started notification")
notification.notify( notification.notify(
title='Render Job Started', title='Render Job Started',
message=f'{render_job.name} started rendering', message=f'{render_job.name} started rendering',
timeout=10 # Display time in seconds timeout=10
) )
except Exception as e: except Exception as e:
logger.debug(f"Unable to show UI notification: {e}") logger.debug(f"Unable to show UI notification: {e}")
@@ -129,30 +99,15 @@ class DistributedJobManager:
# Create Job # Create Job
# -------------------------------------------- # --------------------------------------------
@classmethod def _create_render_job(self, new_job_attributes: dict, loaded_project_local_path: Path):
def create_render_job(cls, new_job_attributes: dict, loaded_project_local_path: Path):
"""Creates render jobs. Pass in dict of job_data and the local path to the project. It creates and returns a new
render job.
Args:
new_job_attributes (dict): Dict of desired attributes for new job (frame count, renderer, output path, etc)
loaded_project_local_path (Path): The local path to the loaded project.
Returns:
worker: Created job worker
"""
# get new output path in output_dir
output_path = new_job_attributes.get('output_path') output_path = new_job_attributes.get('output_path')
output_filename = loaded_project_local_path.name if output_path else loaded_project_local_path.stem output_filename = loaded_project_local_path.name if output_path else loaded_project_local_path.stem
# Prepare output path
output_dir = loaded_project_local_path.parent.parent / "output" output_dir = loaded_project_local_path.parent.parent / "output"
output_path = output_dir / output_filename output_path = output_dir / output_filename
os.makedirs(output_dir, exist_ok=True) os.makedirs(output_dir, exist_ok=True)
logger.debug(f"New job output path: {output_path}") logger.debug(f"New job output path: {output_path}")
# create & configure jobs
worker = EngineManager.create_worker(engine_name=new_job_attributes['engine_name'], worker = EngineManager.create_worker(engine_name=new_job_attributes['engine_name'],
input_path=loaded_project_local_path, input_path=loaded_project_local_path,
output_path=output_path, output_path=output_path,
@@ -160,16 +115,15 @@ class DistributedJobManager:
args=new_job_attributes.get('args', {}), args=new_job_attributes.get('args', {}),
parent=new_job_attributes.get('parent'), parent=new_job_attributes.get('parent'),
name=new_job_attributes.get('name')) name=new_job_attributes.get('name'))
worker.status = new_job_attributes.get("initial_status", worker.status) # todo: is this necessary? worker.status = new_job_attributes.get("initial_status", worker.status)
worker.priority = int(new_job_attributes.get('priority', worker.priority)) worker.priority = int(new_job_attributes.get('priority', worker.priority))
worker.start_frame = int(new_job_attributes.get("start_frame", worker.start_frame)) worker.start_frame = int(new_job_attributes.get("start_frame", worker.start_frame))
worker.end_frame = int(new_job_attributes.get("end_frame", worker.end_frame)) worker.end_frame = int(new_job_attributes.get("end_frame", worker.end_frame))
worker.watchdog_timeout = Config.worker_process_timeout worker.watchdog_timeout = Config.worker_process_timeout
worker.hostname = socket.gethostname() worker.hostname = socket.gethostname()
# determine if we can / should split the job
if new_job_attributes.get("enable_split_jobs", False) and (worker.total_frames > 1) and not worker.parent: if new_job_attributes.get("enable_split_jobs", False) and (worker.total_frames > 1) and not worker.parent:
cls.split_into_subjobs_async(worker, new_job_attributes, loaded_project_local_path) self.split_into_subjobs_async(worker, new_job_attributes, loaded_project_local_path)
else: else:
worker.status = RenderStatus.NOT_STARTED worker.status = RenderStatus.NOT_STARTED
@@ -182,15 +136,7 @@ class DistributedJobManager:
# Handling Subjobs # Handling Subjobs
# -------------------------------------------- # --------------------------------------------
@classmethod def _handle_subjob_update_notification(self, local_job, subjob_data: dict) -> None:
def handle_subjob_update_notification(cls, local_job, subjob_data: dict):
"""Responds to a notification from a remote subjob and the host requests any subsequent updates from the subjob.
Args:
local_job (BaseRenderWorker): The local parent job worker.
subjob_data (dict): Subjob data sent from the remote server.
"""
subjob_status = string_to_status(subjob_data['status']) subjob_status = string_to_status(subjob_data['status'])
subjob_id = subjob_data['id'] subjob_id = subjob_data['id']
subjob_hostname = subjob_data['hostname'] subjob_hostname = subjob_data['hostname']
@@ -206,19 +152,10 @@ class DistributedJobManager:
if subjob_data['status'] == 'completed' and download_success: if subjob_data['status'] == 'completed' and download_success:
local_job.children[subjob_key]['download_status'] = 'completed' local_job.children[subjob_key]['download_status'] = 'completed'
@classmethod def _wait_for_subjobs(self, parent_job) -> None:
def wait_for_subjobs(cls, parent_job):
"""Check the status of subjobs and waits until they are all finished. Download rendered frames from subjobs
when they are completed.
Args:
parent_job: Worker object that has child jobs
Returns:
"""
logger.debug(f"Waiting for subjobs for job {parent_job}") logger.debug(f"Waiting for subjobs for job {parent_job}")
parent_job.status = RenderStatus.WAITING_FOR_SUBJOBS parent_job.status = RenderStatus.WAITING_FOR_SUBJOBS
statuses_to_download = [RenderStatus.CANCELLED, RenderStatus.ERROR, RenderStatus.COMPLETED] statuses_to_download = (RenderStatus.CANCELLED, RenderStatus.ERROR, RenderStatus.COMPLETED)
def subjobs_not_downloaded(): def subjobs_not_downloaded():
return {k: v for k, v in parent_job.children.items() if 'download_status' not in v or return {k: v for k, v in parent_job.children.items() if 'download_status' not in v or
@@ -230,21 +167,17 @@ class DistributedJobManager:
sleep_counter = 0 sleep_counter = 0
while parent_job.status == RenderStatus.WAITING_FOR_SUBJOBS: while parent_job.status == RenderStatus.WAITING_FOR_SUBJOBS:
if sleep_counter % server_delay == 0: # only ping servers every x seconds if sleep_counter % server_delay == 0:
for child_key, subjob_cached_data in subjobs_not_downloaded().items(): for child_key in subjobs_not_downloaded():
subjob_id = child_key.split('@')[0] subjob_id = child_key.split('@')[0]
subjob_hostname = child_key.split('@')[-1] subjob_hostname = child_key.split('@')[-1]
# Fetch info from server and handle failing case
subjob_data = RenderServerProxy(subjob_hostname).get_job_info(subjob_id) subjob_data = RenderServerProxy(subjob_hostname).get_job_info(subjob_id)
if not subjob_data: if not subjob_data:
logger.warning(f"No response from {subjob_hostname}") logger.warning(f"No response from {subjob_hostname}")
# timeout / missing server situations
parent_job.children[child_key]['download_status'] = f'error: No response from {subjob_hostname}' parent_job.children[child_key]['download_status'] = f'error: No response from {subjob_hostname}'
continue continue
# Update parent job cache but keep the download status
download_status = parent_job.children[child_key].get('download_status', None) download_status = parent_job.children[child_key].get('download_status', None)
parent_job.children[child_key] = subjob_data parent_job.children[child_key] = subjob_data
parent_job.children[child_key]['download_status'] = download_status parent_job.children[child_key]['download_status'] = download_status
@@ -254,8 +187,7 @@ class DistributedJobManager:
f"{float(subjob_data.get('percent_complete')) * 100.0}%" f"{float(subjob_data.get('percent_complete')) * 100.0}%"
logger.debug(status_msg) logger.debug(status_msg)
# Check if job is finished, but has not had files copied yet over yet if download_status is None and subjob_data.get('file_count') and status in statuses_to_download:
if download_status is None and subjob_data['file_count'] and status in statuses_to_download:
try: try:
download_missing_frames_from_subjob(parent_job, subjob_id, subjob_hostname) download_missing_frames_from_subjob(parent_job, subjob_id, subjob_hostname)
parent_job.children[child_key]['download_status'] = 'complete' parent_job.children[child_key]['download_status'] = 'complete'
@@ -263,7 +195,6 @@ class DistributedJobManager:
logger.error(f"Error downloading missing frames from subjob: {e}") logger.error(f"Error downloading missing frames from subjob: {e}")
parent_job.children[child_key]['download_status'] = 'error: {}' parent_job.children[child_key]['download_status'] = 'error: {}'
# Any finished jobs not successfully downloaded at this point are skipped
if parent_job.children[child_key].get('download_status', None) is None and \ if parent_job.children[child_key].get('download_status', None) is None and \
status in statuses_to_download: status in statuses_to_download:
logger.warning(f"Skipping waiting on downloading from subjob: {child_key}") logger.warning(f"Skipping waiting on downloading from subjob: {child_key}")
@@ -274,42 +205,22 @@ class DistributedJobManager:
f"{', '.join(list(subjobs_not_downloaded().keys()))}") f"{', '.join(list(subjobs_not_downloaded().keys()))}")
time.sleep(1) time.sleep(1)
sleep_counter += 1 sleep_counter += 1
else: # exit the loop else:
parent_job.status = RenderStatus.RUNNING parent_job.status = RenderStatus.RUNNING
# -------------------------------------------- # --------------------------------------------
# Creating Subjobs # Creating Subjobs
# -------------------------------------------- # --------------------------------------------
@classmethod def _split_into_subjobs_async(self, parent_worker, new_job_attributes, project_path, system_os=None) -> None:
def split_into_subjobs_async(cls, parent_worker, new_job_attributes, project_path, system_os=None):
# todo: I don't love this
parent_worker.status = RenderStatus.CONFIGURING parent_worker.status = RenderStatus.CONFIGURING
cls.background_worker = threading.Thread(target=cls.split_into_subjobs, args=(parent_worker, new_job_attributes, self.background_worker = threading.Thread(target=self.split_into_subjobs, args=(
project_path, system_os)) parent_worker, new_job_attributes, project_path, system_os))
cls.background_worker.start() self.background_worker.start()
@classmethod def split_into_subjobs(self, parent_worker, new_job_attributes, project_path, system_os=None, specific_servers=None) -> None:
def split_into_subjobs(cls, parent_worker, new_job_attributes, project_path, system_os=None, specific_servers=None): available_servers = specific_servers if specific_servers else self.find_available_servers(
""" parent_worker.engine_name, system_os)
Splits a job into subjobs and distributes them among available servers.
This method checks the availability of servers, distributes the work among them, and creates subjobs on each
server. If a server is the local host, it adjusts the frame range of the parent job instead of creating a
subjob.
Args:
parent_worker (Worker): The parent job what we're creating the subjobs for.
new_job_attributes (dict): Dict of desired attributes for new job (frame count, engine, output path, etc)
project_path (str): The path to the project.
system_os (str, optional): Required OS. Default is any.
specific_servers (list, optional): List of specific servers to split work between. Defaults to all found.
"""
# Check availability
available_servers = specific_servers if specific_servers else cls.find_available_servers(parent_worker.engine_name,
system_os)
# skip if theres no external servers found
external_servers = [x for x in available_servers if x['hostname'] != parent_worker.hostname] external_servers = [x for x in available_servers if x['hostname'] != parent_worker.hostname]
if not external_servers: if not external_servers:
parent_worker.status = RenderStatus.NOT_STARTED parent_worker.status = RenderStatus.NOT_STARTED
@@ -318,34 +229,29 @@ class DistributedJobManager:
logger.debug(f"Splitting into subjobs - Available servers: {[x['hostname'] for x in available_servers]}") logger.debug(f"Splitting into subjobs - Available servers: {[x['hostname'] for x in available_servers]}")
all_subjob_server_data = distribute_server_work(parent_worker.start_frame, parent_worker.end_frame, available_servers) all_subjob_server_data = distribute_server_work(parent_worker.start_frame, parent_worker.end_frame, available_servers)
# Prep and submit these sub-jobs
logger.info(f"Job {parent_worker.id} split plan: {all_subjob_server_data}") logger.info(f"Job {parent_worker.id} split plan: {all_subjob_server_data}")
try: try:
for subjob_data in all_subjob_server_data: for subjob_data in all_subjob_server_data:
subjob_hostname = subjob_data['hostname'] subjob_hostname = subjob_data['hostname']
post_results = cls.__create_subjob(new_job_attributes, project_path, subjob_data, subjob_hostname, post_results = self._create_subjob(new_job_attributes, project_path, subjob_data, subjob_hostname,
parent_worker) parent_worker)
if not post_results.ok: if not post_results.ok:
ValueError(f"Failed to create subjob on {subjob_hostname}") ValueError(f"Failed to create subjob on {subjob_hostname}")
# save child info
submission_results = post_results.json()[0] submission_results = post_results.json()[0]
child_key = f"{submission_results['id']}@{subjob_hostname}" child_key = f"{submission_results['id']}@{subjob_hostname}"
parent_worker.children[child_key] = submission_results parent_worker.children[child_key] = submission_results
# start subjobs
logger.debug(f"Created {len(all_subjob_server_data)} subjobs successfully") logger.debug(f"Created {len(all_subjob_server_data)} subjobs successfully")
parent_worker.name = f"{parent_worker.name} (Parent)" parent_worker.name = f"{parent_worker.name} (Parent)"
parent_worker.status = RenderStatus.NOT_STARTED # todo: this won't work with scheduled starts parent_worker.status = RenderStatus.NOT_STARTED
except Exception as e: except Exception as e:
# cancel all the subjobs
logger.error(f"Failed to split job into subjobs: {e}") logger.error(f"Failed to split job into subjobs: {e}")
logger.debug(f"Cancelling {len(all_subjob_server_data) - 1} attempted subjobs") logger.debug(f"Cancelling {len(all_subjob_server_data) - 1} attempted subjobs")
RenderServerProxy(parent_worker.hostname).cancel_job(parent_worker.id, confirm=True) RenderServerProxy(parent_worker.hostname).cancel_job(parent_worker.id, confirm=True)
@staticmethod @staticmethod
def __create_subjob(new_job_attributes: dict, project_path, server_data, server_hostname: str, parent_worker): def _create_subjob(new_job_attributes: dict, project_path, server_data, server_hostname, parent_worker):
"""Convenience method to create subjobs for a parent worker"""
subjob = new_job_attributes.copy() subjob = new_job_attributes.copy()
subjob['name'] = f"{parent_worker.name}[{server_data['frame_range'][0]}-{server_data['frame_range'][-1]}]" subjob['name'] = f"{parent_worker.name}[{server_data['frame_range'][0]}-{server_data['frame_range'][-1]}]"
subjob['parent'] = f"{parent_worker.id}@{parent_worker.hostname}" subjob['parent'] = f"{parent_worker.id}@{parent_worker.hostname}"
@@ -364,13 +270,6 @@ class DistributedJobManager:
@staticmethod @staticmethod
def find_available_servers(engine_name: str, system_os=None): def find_available_servers(engine_name: str, system_os=None):
"""
Scan the Zeroconf network for currently available render servers supporting a specific engine.
:param engine_name: str, The engine type to search for
:param system_os: str, Restrict results to servers running a specific OS
:return: A list of dictionaries with each dict containing hostname and cpu_count of available servers
"""
from api.api_server import API_VERSION from api.api_server import API_VERSION
found_available_servers = [] found_available_servers = []
for hostname in ZeroconfServer.found_hostnames(): for hostname in ZeroconfServer.found_hostnames():
@@ -383,6 +282,34 @@ class DistributedJobManager:
return found_available_servers return found_available_servers
# --- Forwarders for backward compatibility ---
@classmethod
def subscribe_to_listener(cls):
if cls._default_instance is not None:
cls._default_instance._subscribe_to_listener()
@classmethod
def create_render_job(cls, new_job_attributes, loaded_project_local_path):
if cls._default_instance is not None:
return cls._default_instance._create_render_job(new_job_attributes, loaded_project_local_path)
raise RuntimeError("DistributedJobManager is not initialized")
@classmethod
def handle_subjob_update_notification(cls, local_job, subjob_data):
if cls._default_instance is not None:
cls._default_instance._handle_subjob_update_notification(local_job, subjob_data)
@classmethod
def wait_for_subjobs(cls, parent_job):
if cls._default_instance is not None:
cls._default_instance._wait_for_subjobs(parent_job)
@classmethod
def split_into_subjobs_async(cls, parent_worker, new_job_attributes, project_path, system_os=None):
if cls._default_instance is not None:
cls._default_instance._split_into_subjobs_async(parent_worker, new_job_attributes, project_path, system_os)
if __name__ == '__main__': if __name__ == '__main__':
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
+2 -2
View File
@@ -173,7 +173,7 @@ class Blender(BaseRenderEngine):
script_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'scripts', 'get_system_info.py') script_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'scripts', 'get_system_info.py')
results = self.run_python_script(script_path=script_path) results = self.run_python_script(script_path=script_path)
output = results.stdout.decode() output = results.stdout.decode()
match = re.search(r"GPU DATA:(\[[\s\S]*\])", output) match = re.search(r"GPU DATA:(\[.*?\])", output, re.DOTALL)
if match: if match:
gpu_data_json = match.group(1) gpu_data_json = match.group(1)
gpus_info = json.loads(gpu_data_json) gpus_info = json.loads(gpu_data_json)
@@ -193,5 +193,5 @@ class Blender(BaseRenderEngine):
if __name__ == "__main__": if __name__ == "__main__":
x = Blender().get_render_devices() x = Blender().system_info()
print(x) print(x)
+170 -237
View File
@@ -20,128 +20,78 @@ class EngineManager:
if possible. if possible.
""" """
_default_instance: Optional['EngineManager'] = None
engines_path: Optional[str] = None engines_path: Optional[str] = None
download_tasks: List[Any] = [] download_tasks: List[Any] = []
def __init__(self) -> None:
self.engines_path: Optional[str] = None
self.download_tasks: List[Any] = []
@classmethod
def _sync_class(cls) -> None:
if cls._default_instance is not None:
cls.engines_path = cls._default_instance.engines_path
cls.download_tasks = cls._default_instance.download_tasks
@staticmethod @staticmethod
def supported_engines() -> list[type[BaseRenderEngine]]: def supported_engines() -> list[type[BaseRenderEngine]]:
"""Return list of supported engine classes.
Returns:
List[Type[BaseRenderEngine]]: List of available engine classes.
"""
return ENGINE_CLASSES return ENGINE_CLASSES
# --- Installed Engines --- # --- Installed Engines ---
@classmethod def _engine_class_for_project_path(self, path: str) -> Type[BaseRenderEngine]:
def engine_class_for_project_path(cls, path: str) -> Type[BaseRenderEngine]:
"""Find engine class that can handle the given project file.
Args:
path: Path to project file.
Returns:
Type[BaseRenderEngine]: Engine class that can handle the file.
"""
_, extension = os.path.splitext(path) _, extension = os.path.splitext(path)
extension = extension.lower().strip('.') extension = extension.lower().strip('.')
for engine_class in cls.supported_engines(): for engine_class in self.supported_engines():
engine = cls.get_latest_engine_instance(engine_class) engine = self.get_latest_engine_instance(engine_class)
if extension in engine.supported_extensions(): if extension in engine.supported_extensions():
return engine_class return engine_class
undefined_renderer_support = [x for x in cls.supported_engines() if not cls.get_latest_engine_instance(x).supported_extensions()] undefined_renderer_support = [x for x in self.supported_engines() if not self.get_latest_engine_instance(x).supported_extensions()]
return undefined_renderer_support[0] return undefined_renderer_support[0]
@classmethod def _engine_class_with_name(self, engine_name: str) -> Optional[Type[BaseRenderEngine]]:
def engine_class_with_name(cls, engine_name: str) -> Optional[Type[BaseRenderEngine]]: for obj in self.supported_engines():
"""Find engine class by name.
Args:
engine_name: Name of engine to find.
Returns:
Optional[Type[BaseRenderEngine]]: Engine class if found, None otherwise.
"""
for obj in cls.supported_engines():
if obj.name().lower() == engine_name.lower(): if obj.name().lower() == engine_name.lower():
return obj return obj
return None return None
@classmethod def _get_latest_engine_instance(self, engine_class: Type[BaseRenderEngine]) -> BaseRenderEngine:
def get_latest_engine_instance(cls, engine_class: Type[BaseRenderEngine]) -> BaseRenderEngine: newest = self.newest_installed_engine_data(engine_class.name())
"""Create instance of latest installed engine version.
Args:
engine_class: Engine class to instantiate.
Returns:
BaseRenderEngine: Instance of engine with latest version.
"""
newest = cls.newest_installed_engine_data(engine_class.name())
engine = engine_class(newest["path"]) engine = engine_class(newest["path"])
return engine return engine
@classmethod def _get_installed_engine_data(self, filter_name: Optional[str] = None, include_corrupt: bool = False,
def get_installed_engine_data(cls, filter_name: Optional[str] = None, include_corrupt: bool = False,
ignore_system: bool = False) -> List[Dict[str, Any]]: ignore_system: bool = False) -> List[Dict[str, Any]]:
"""Get data about installed render engines. if not self.engines_path:
Args:
filter_name: Optional engine name to filter by.
include_corrupt: Whether to include potentially corrupted installations.
ignore_system: Whether to ignore system-installed engines.
Returns:
List[Dict[str, Any]]: List of installed engine data.
Raises:
FileNotFoundError: If engines path is not set.
"""
if not cls.engines_path:
raise FileNotFoundError("Engine path is not set") raise FileNotFoundError("Engine path is not set")
# Parse downloaded engine directory
results = [] results = []
try: try:
all_items = os.listdir(cls.engines_path) all_items = os.listdir(self.engines_path)
all_directories = [item for item in all_items if os.path.isdir(os.path.join(cls.engines_path, item))] all_directories = [item for item in all_items if os.path.isdir(os.path.join(self.engines_path, item))]
keys = ["engine", "version", "system_os", "cpu"] # Define keys for result dictionary keys = ["engine", "version", "system_os", "cpu"]
for directory in all_directories: for directory in all_directories:
# Split directory name into segments
segments = directory.split('-') segments = directory.split('-')
# Create a dictionary mapping keys to corresponding segments
result_dict = {keys[i]: segments[i] for i in range(min(len(keys), len(segments)))} result_dict = {keys[i]: segments[i] for i in range(min(len(keys), len(segments)))}
result_dict['type'] = 'managed' result_dict['type'] = 'managed'
# Initialize binary_name with engine name
binary_name = result_dict['engine'].lower() binary_name = result_dict['engine'].lower()
# Determine the correct binary name based on the engine and system_os eng = self.engine_class_with_name(result_dict['engine'])
eng = cls.engine_class_with_name(result_dict['engine'])
binary_name = eng.binary_names.get(result_dict['system_os'], binary_name) binary_name = eng.binary_names.get(result_dict['system_os'], binary_name)
# Find the path to the binary file search_root = self.engines_path / directory
search_root = Path(cls.engines_path) / directory
match = next((p for p in search_root.rglob(binary_name) if p.is_file()), None) match = next((p for p in search_root.rglob(binary_name) if p.is_file()), None)
path = str(match) if match else None path = str(match) if match else None
result_dict['path'] = path result_dict['path'] = path
# fetch version number from binary - helps detect corrupted downloads - disabled due to perf issues
# binary_version = eng(path).version()
# if not binary_version:
# logger.warning(f"Possible corrupt {eng.name()} {result_dict['version']} install detected: {path}")
# if not include_corrupt:
# continue
# result_dict['version'] = binary_version or 'error'
# Add the result dictionary to results if it matches the filter_name or if no filter is applied
if not filter_name or filter_name == result_dict['engine']: if not filter_name or filter_name == result_dict['engine']:
results.append(result_dict) results.append(result_dict)
except FileNotFoundError as e: except FileNotFoundError as e:
logger.warning(f"Cannot find local engines download directory: {e}") logger.warning(f"Cannot find local engines download directory: {e}")
# add system installs to this list - use bg thread because it can be slow
def fetch_engine_details(eng, include_corrupt=False): def fetch_engine_details(eng, include_corrupt=False):
version = eng().version() version = eng().version()
if not version and not include_corrupt: if not version and not include_corrupt:
@@ -160,7 +110,7 @@ class EngineManager:
with concurrent.futures.ThreadPoolExecutor() as executor: with concurrent.futures.ThreadPoolExecutor() as executor:
futures = { futures = {
executor.submit(fetch_engine_details, eng, include_corrupt): eng.name() executor.submit(fetch_engine_details, eng, include_corrupt): eng.name()
for eng in cls.supported_engines() for eng in self.supported_engines()
if eng.default_engine_path() and (not filter_name or filter_name == eng.name()) if eng.default_engine_path() and (not filter_name or filter_name == eng.name())
} }
@@ -173,96 +123,55 @@ class EngineManager:
# --- Check for Updates --- # --- Check for Updates ---
@classmethod def _update_all_engines(self) -> None:
def update_all_engines(cls) -> None: for engine in self.downloadable_engines():
"""Check for and download updates for all downloadable engines.""" update_available = self.is_engine_update_available(engine)
for engine in cls.downloadable_engines():
update_available = cls.is_engine_update_available(engine)
if update_available: if update_available:
update_available['name'] = engine.name() update_available['name'] = engine.name()
cls.download_engine(engine.name(), update_available['version'], background=True) self.download_engine(engine.name(), update_available['version'], background=True)
@classmethod def _all_version_data_for_engine(self, engine_name: str, include_corrupt=False, ignore_system=False) -> list:
def all_version_data_for_engine(cls, engine_name:str, include_corrupt=False, ignore_system=False) -> list: versions = self.get_installed_engine_data(filter_name=engine_name, include_corrupt=include_corrupt, ignore_system=ignore_system)
"""Get all version data for a specific engine.
Args:
engine_name: Name of engine to query.
include_corrupt: Whether to include corrupt installations.
ignore_system: Whether to ignore system installations.
Returns:
list: Sorted list of engine version data (newest first).
"""
versions = cls.get_installed_engine_data(filter_name=engine_name, include_corrupt=include_corrupt, ignore_system=ignore_system)
sorted_versions = sorted(versions, key=lambda x: x['version'], reverse=True) sorted_versions = sorted(versions, key=lambda x: x['version'], reverse=True)
return sorted_versions return sorted_versions
@classmethod def _newest_installed_engine_data(self, engine_name: str, system_os=None, cpu=None, ignore_system=None) -> list:
def newest_installed_engine_data(cls, engine_name:str, system_os=None, cpu=None, ignore_system=None) -> list:
"""Get newest installed engine data for specific platform.
Args:
engine_name: Name of engine to query.
system_os: Operating system to filter by (defaults to current).
cpu: CPU architecture to filter by (defaults to current).
ignore_system: Whether to ignore system installations.
Returns:
list: Newest engine data or empty list if not found.
"""
system_os = system_os or current_system_os() system_os = system_os or current_system_os()
cpu = cpu or current_system_cpu() cpu = cpu or current_system_cpu()
try: try:
filtered = [x for x in cls.all_version_data_for_engine(engine_name, ignore_system=ignore_system) filtered = [x for x in self.all_version_data_for_engine(engine_name, ignore_system=ignore_system)
if x['system_os'] == system_os and x['cpu'] == cpu] if x['system_os'] == system_os and x['cpu'] == cpu]
return filtered[0] return filtered[0]
except IndexError: except IndexError:
logger.error(f"Cannot find newest engine version for {engine_name}-{system_os}-{cpu}") logger.error(f"Cannot find newest engine version for {engine_name}-{system_os}-{cpu}")
return [] return []
@classmethod def _is_version_installed(self, engine_name: str, version: str, system_os=None, cpu=None, ignore_system=False):
def is_version_installed(cls, engine_name:str, version:str, system_os=None, cpu=None, ignore_system=False):
"""Check if specific engine version is installed.
Args:
engine_name: Name of engine to check.
version: Version string to check.
system_os: Operating system to check (defaults to current).
cpu: CPU architecture to check (defaults to current).
ignore_system: Whether to ignore system installations.
Returns:
Engine data if found, False otherwise.
"""
system_os = system_os or current_system_os() system_os = system_os or current_system_os()
cpu = cpu or current_system_cpu() cpu = cpu or current_system_cpu()
filtered = [x for x in cls.get_installed_engine_data(filter_name=engine_name, ignore_system=ignore_system) if filtered = [x for x in self.get_installed_engine_data(filter_name=engine_name, ignore_system=ignore_system) if
x['system_os'] == system_os and x['cpu'] == cpu and x['version'] == version] x['system_os'] == system_os and x['cpu'] == cpu and x['version'] == version]
return filtered[0] if filtered else False return filtered[0] if filtered else False
@classmethod def _version_is_available_to_download(self, engine_name: str, version, system_os=None, cpu=None):
def version_is_available_to_download(cls, engine_name:str, version, system_os=None, cpu=None):
try: try:
downloader = cls.engine_class_with_name(engine_name).downloader() downloader = self.engine_class_with_name(engine_name).downloader()
return downloader.version_is_available_to_download(version=version, system_os=system_os, cpu=cpu) return downloader.version_is_available_to_download(version=version, system_os=system_os, cpu=cpu)
except Exception as e: except Exception as e:
logger.debug(f"Exception in version_is_available_to_download: {e}") logger.debug(f"Exception in version_is_available_to_download: {e}")
return None return None
@classmethod def _find_most_recent_version(self, engine_name: str, system_os=None, cpu=None, lts_only=False) -> dict:
def find_most_recent_version(cls, engine_name:str, system_os=None, cpu=None, lts_only=False) -> dict:
try: try:
downloader = cls.engine_class_with_name(engine_name).downloader() downloader = self.engine_class_with_name(engine_name).downloader()
return downloader.find_most_recent_version(system_os=system_os, cpu=cpu) return downloader.find_most_recent_version(system_os=system_os, cpu=cpu)
except Exception as e: except Exception as e:
logger.debug(f"Exception in find_most_recent_version: {e}") logger.debug(f"Exception in find_most_recent_version: {e}")
return {} return {}
@classmethod def _is_engine_update_available(self, engine_class: Type[BaseRenderEngine], ignore_system_installs=False):
def is_engine_update_available(cls, engine_class: Type[BaseRenderEngine], ignore_system_installs=False):
logger.debug(f"Checking for updates to {engine_class.name()}") logger.debug(f"Checking for updates to {engine_class.name()}")
latest_version = engine_class.downloader().find_most_recent_version() latest_version = engine_class.downloader().find_most_recent_version()
@@ -271,7 +180,7 @@ class EngineManager:
return None return None
version_num = latest_version.get('version') version_num = latest_version.get('version')
if cls.is_version_installed(engine_class.name(), version_num, ignore_system=ignore_system_installs): if self.is_version_installed(engine_class.name(), version_num, ignore_system=ignore_system_installs):
logger.debug(f"Latest version of {engine_class.name()} ({version_num}) already downloaded") logger.debug(f"Latest version of {engine_class.name()} ({version_num}) already downloaded")
return None return None
@@ -279,18 +188,11 @@ class EngineManager:
# --- Downloads --- # --- Downloads ---
@classmethod def _downloadable_engines(self):
def downloadable_engines(cls): return [engine for engine in self.supported_engines() if hasattr(engine, "downloader") and engine.downloader()]
"""Get list of engines that support downloading.
Returns: def _get_existing_download_task(self, engine_name, version, system_os=None, cpu=None):
List[Type[BaseRenderEngine]]: Engines with downloader capability. for task in self.download_tasks:
"""
return [engine for engine in cls.supported_engines() if hasattr(engine, "downloader") and engine.downloader()]
@classmethod
def get_existing_download_task(cls, engine_name, version, system_os=None, cpu=None):
for task in cls.download_tasks:
task_parts = task.name.split('-') task_parts = task.name.split('-')
task_engine, task_version, task_system_os, task_cpu = task_parts[:4] task_engine, task_version, task_system_os, task_cpu = task_parts[:4]
@@ -299,50 +201,45 @@ class EngineManager:
return task return task
return None return None
@classmethod def _download_engine(self, engine_name, version, system_os=None, cpu=None, background=False, ignore_system=False):
def download_engine(cls, engine_name, version, system_os=None, cpu=None, background=False, ignore_system=False): engine_to_download = self.engine_class_with_name(engine_name)
existing_task = self.get_existing_download_task(engine_name, version, system_os, cpu)
engine_to_download = cls.engine_class_with_name(engine_name)
existing_task = cls.get_existing_download_task(engine_name, version, system_os, cpu)
if existing_task: if existing_task:
logger.debug(f"Already downloading {engine_name} {version}") logger.debug(f"Already downloading {engine_name} {version}")
if not background: if not background:
existing_task.join() # If download task exists, wait until it's done downloading existing_task.join()
return None return None
elif not engine_to_download.downloader(): elif not engine_to_download.downloader():
logger.warning("No valid downloader for this engine. Please update this software manually.") logger.warning("No valid downloader for this engine. Please update this software manually.")
return None return None
elif not cls.engines_path: elif not self.engines_path:
raise FileNotFoundError("Engines path must be set before requesting downloads") raise FileNotFoundError("Engines path must be set before requesting downloads")
thread = EngineDownloadWorker(engine_name, version, system_os, cpu) thread = EngineDownloadWorker(engine_name, version, system_os, cpu)
cls.download_tasks.append(thread) self.download_tasks.append(thread)
thread.start() thread.start()
if background: if background:
return thread return thread
thread.join() thread.join()
found_engine = cls.is_version_installed(engine_name, version, system_os, cpu, ignore_system) # Check that engine downloaded found_engine = self.is_version_installed(engine_name, version, system_os, cpu, ignore_system)
if not found_engine: if not found_engine:
logger.error(f"Error downloading {engine_name}") logger.error(f"Error downloading {engine_name}")
return found_engine return found_engine
@classmethod def _delete_engine_download(self, engine_name, version, system_os=None, cpu=None):
def delete_engine_download(cls, engine_name, version, system_os=None, cpu=None):
logger.info(f"Requested deletion of engine: {engine_name}-{version}") logger.info(f"Requested deletion of engine: {engine_name}-{version}")
found = cls.is_version_installed(engine_name, version, system_os, cpu) found = self.is_version_installed(engine_name, version, system_os, cpu)
if found and found['type'] == 'managed': # don't delete system installs if found and found['type'] == 'managed':
# find the root directory of the engine executable
root_dir_name = '-'.join([engine_name, version, found['system_os'], found['cpu']]) root_dir_name = '-'.join([engine_name, version, found['system_os'], found['cpu']])
remove_path = os.path.join(found['path'].split(root_dir_name)[0], root_dir_name) remove_path = os.path.join(found['path'].split(root_dir_name)[0], root_dir_name)
# delete the file path
logger.info(f"Deleting engine at path: {remove_path}") logger.info(f"Deleting engine at path: {remove_path}")
shutil.rmtree(remove_path, ignore_errors=False) shutil.rmtree(remove_path, ignore_errors=False)
logger.info(f"Engine {engine_name}-{version}-{found['system_os']}-{found['cpu']} successfully deleted") logger.info(f"Engine {engine_name}-{version}-{found['system_os']}-{found['cpu']} successfully deleted")
return True return True
elif found: # these are managed by the system / user. Don't delete these. elif found:
logger.error(f'Cannot delete requested {engine_name} {version}. Managed externally.') logger.error(f'Cannot delete requested {engine_name} {version}. Managed externally.')
else: else:
logger.error(f"Cannot find engine: {engine_name}-{version}") logger.error(f"Cannot find engine: {engine_name}-{version}")
@@ -350,52 +247,16 @@ class EngineManager:
# --- Background Tasks --- # --- Background Tasks ---
@classmethod def _active_downloads(self) -> list:
def active_downloads(cls) -> list: return [x for x in self.download_tasks if x.is_alive()]
"""Get list of currently active download tasks.
Returns: def _create_worker(self, engine_name: str, input_path: Path, output_path: Path, engine_version=None, args=None, parent=None, name=None):
list: List of active EngineDownloadWorker threads. worker_class = self.engine_class_with_name(engine_name).worker_class()
"""
return [x for x in cls.download_tasks if x.is_alive()]
@classmethod all_versions = self.all_version_data_for_engine(engine_name)
def create_worker(cls, engine_name: str, input_path: Path, output_path: Path, engine_version=None, args=None, parent=None, name=None):
"""
Create and return a worker instance for a specific engine.
This resolves the appropriate engine binary/path for the requested engine and version,
downloading the engine if necessary (when a specific version is requested and not found
locally). The returned worker is constructed with string paths for compatibility with
worker implementations that expect `str` rather than `Path`.
Args:
engine_name: The engine name used to resolve an engine class and its worker.
input_path: Path to the input file/folder for the worker to process.
output_path: Path where the worker should write output.
engine_version: Optional engine version to use. If `None` or `'latest'`, the newest
installed version is used. If a specific version is provided and not installed,
the engine will be downloaded.
args: Optional arguments passed through to the worker (engine-specific).
parent: Optional Qt/GUI parent object passed through to the worker constructor.
name: Optional name/label passed through to the worker constructor.
Returns:
An instance of the engine-specific worker class.
Raises:
FileNotFoundError: If no versions of the engine are installed, if the requested
version cannot be found or downloaded, or if the engine path cannot be resolved.
"""
worker_class = cls.engine_class_with_name(engine_name).worker_class()
# check to make sure we have versions installed
all_versions = cls.all_version_data_for_engine(engine_name)
if not all_versions: if not all_versions:
raise FileNotFoundError(f"Cannot find any installed '{engine_name}' engines") raise FileNotFoundError(f"Cannot find any installed '{engine_name}' engines")
# Find the path to the requested engine version or use default
engine_path = None engine_path = None
if engine_version and engine_version != 'latest': if engine_version and engine_version != 'latest':
for ver in all_versions: for ver in all_versions:
@@ -403,9 +264,8 @@ class EngineManager:
engine_path = ver['path'] engine_path = ver['path']
break break
# Download the required engine if not found locally
if not engine_path: if not engine_path:
download_result = cls.download_engine(engine_name, engine_version) download_result = self.download_engine(engine_name, engine_version)
if not download_result: if not download_result:
raise FileNotFoundError(f"Cannot download requested version: {engine_name} {engine_version}") raise FileNotFoundError(f"Cannot download requested version: {engine_name} {engine_version}")
engine_path = download_result['path'] engine_path = download_result['path']
@@ -420,28 +280,109 @@ class EngineManager:
return worker_class(input_path=str(input_path), output_path=str(output_path), engine_path=engine_path, args=args, return worker_class(input_path=str(input_path), output_path=str(output_path), engine_path=engine_path, args=args,
parent=parent, name=name) parent=parent, name=name)
# --- Forwarders for backward compatibility ---
@classmethod
def engine_class_for_project_path(cls, path):
if cls._default_instance is not None:
return cls._default_instance._engine_class_for_project_path(path)
@classmethod
def engine_class_with_name(cls, engine_name):
if cls._default_instance is not None:
return cls._default_instance._engine_class_with_name(engine_name)
@classmethod
def get_latest_engine_instance(cls, engine_class):
if cls._default_instance is not None:
return cls._default_instance._get_latest_engine_instance(engine_class)
@classmethod
def get_installed_engine_data(cls, filter_name=None, include_corrupt=False, ignore_system=False):
if cls._default_instance is not None:
return cls._default_instance._get_installed_engine_data(filter_name, include_corrupt, ignore_system)
return []
@classmethod
def update_all_engines(cls):
if cls._default_instance is not None:
cls._default_instance._update_all_engines()
@classmethod
def all_version_data_for_engine(cls, engine_name, include_corrupt=False, ignore_system=False):
if cls._default_instance is not None:
return cls._default_instance._all_version_data_for_engine(engine_name, include_corrupt, ignore_system)
return []
@classmethod
def newest_installed_engine_data(cls, engine_name, system_os=None, cpu=None, ignore_system=None):
if cls._default_instance is not None:
return cls._default_instance._newest_installed_engine_data(engine_name, system_os, cpu, ignore_system)
return []
@classmethod
def is_version_installed(cls, engine_name, version, system_os=None, cpu=None, ignore_system=False):
if cls._default_instance is not None:
return cls._default_instance._is_version_installed(engine_name, version, system_os, cpu, ignore_system)
return False
@classmethod
def version_is_available_to_download(cls, engine_name, version, system_os=None, cpu=None):
if cls._default_instance is not None:
return cls._default_instance._version_is_available_to_download(engine_name, version, system_os, cpu)
return None
@classmethod
def find_most_recent_version(cls, engine_name, system_os=None, cpu=None, lts_only=False):
if cls._default_instance is not None:
return cls._default_instance._find_most_recent_version(engine_name, system_os, cpu, lts_only)
return {}
@classmethod
def is_engine_update_available(cls, engine_class, ignore_system_installs=False):
if cls._default_instance is not None:
return cls._default_instance._is_engine_update_available(engine_class, ignore_system_installs)
return None
@classmethod
def downloadable_engines(cls):
if cls._default_instance is not None:
return cls._default_instance._downloadable_engines()
return []
@classmethod
def get_existing_download_task(cls, engine_name, version, system_os=None, cpu=None):
if cls._default_instance is not None:
return cls._default_instance._get_existing_download_task(engine_name, version, system_os, cpu)
return None
@classmethod
def download_engine(cls, engine_name, version, system_os=None, cpu=None, background=False, ignore_system=False):
if cls._default_instance is not None:
return cls._default_instance._download_engine(engine_name, version, system_os, cpu, background, ignore_system)
return None
@classmethod
def delete_engine_download(cls, engine_name, version, system_os=None, cpu=None):
if cls._default_instance is not None:
return cls._default_instance._delete_engine_download(engine_name, version, system_os, cpu)
return False
@classmethod
def active_downloads(cls):
if cls._default_instance is not None:
return cls._default_instance._active_downloads()
return []
@classmethod
def create_worker(cls, engine_name, input_path, output_path, engine_version=None, args=None, parent=None, name=None):
if cls._default_instance is not None:
return cls._default_instance._create_worker(engine_name, input_path, output_path, engine_version, args, parent, name)
raise RuntimeError("EngineManager is not initialized")
class EngineDownloadWorker(threading.Thread): class EngineDownloadWorker(threading.Thread):
"""A thread worker for downloading a specific version of a rendering engine.
This class handles the process of downloading a rendering engine in a separate thread,
ensuring that the download process does not block the main application.
Attributes:
engine (str): The name of the rendering engine to download.
version (str): The version of the rendering engine to download.
system_os (str, optional): The operating system for which to download the engine. Defaults to current OS type.
cpu (str, optional): Requested CPU architecture. Defaults to system CPU type.
"""
def __init__(self, engine, version, system_os=None, cpu=None): def __init__(self, engine, version, system_os=None, cpu=None):
"""Initialize download worker for specific engine version.
Args:
engine: Name of engine to download.
version: Version of engine to download.
system_os: Target operating system (defaults to current).
cpu: Target CPU architecture (defaults to current).
"""
super().__init__() super().__init__()
self.engine = engine self.engine = engine
self.version = version self.version = version
@@ -450,19 +391,9 @@ class EngineDownloadWorker(threading.Thread):
self.percent_complete = 0 self.percent_complete = 0
def _update_progress(self, current_progress): def _update_progress(self, current_progress):
"""Update download progress.
Args:
current_progress: Current download progress percentage (0-100).
"""
self.percent_complete = current_progress self.percent_complete = current_progress
def run(self): def run(self):
"""Execute the download process.
Checks if engine version already exists, then downloads if not found.
Handles cleanup and error reporting.
"""
try: try:
existing_download = EngineManager.is_version_installed(self.engine, self.version, self.system_os, self.cpu, existing_download = EngineManager.is_version_installed(self.engine, self.version, self.system_os, self.cpu,
ignore_system=True) ignore_system=True)
@@ -470,15 +401,17 @@ def run(self):
logger.info(f"Requested download of {self.engine} {self.version}, but local copy already exists") logger.info(f"Requested download of {self.engine} {self.version}, but local copy already exists")
return existing_download return existing_download
# Get the appropriate downloader class based on the engine type
downloader = EngineManager.engine_class_with_name(self.engine).downloader() downloader = EngineManager.engine_class_with_name(self.engine).downloader()
downloader.download_engine(self.version, download_location=EngineManager.engines_path, downloader.download_engine(self.version, download_location=EngineManager.engines_path,
system_os=self.system_os, cpu=self.cpu, timeout=300, progress_callback=self._update_progress) system_os=self.system_os, cpu=self.cpu, timeout=300, progress_callback=self._update_progress)
except Exception as e: except Exception as e:
logger.error(f"Error in download worker: {e}") logger.error(f"Error in download worker: {e}")
finally: finally:
# remove itself from the downloader list try:
EngineManager.download_tasks.remove(self) if EngineManager._default_instance is not None:
EngineManager._default_instance.download_tasks.remove(self)
except ValueError:
pass
if __name__ == '__main__': if __name__ == '__main__':
+198 -105
View File
@@ -1,12 +1,13 @@
import logging import logging
import threading
from collections import Counter from collections import Counter
from datetime import datetime from datetime import datetime
from pathlib import Path from pathlib import Path
from typing import List, Dict, Any, Optional from typing import Any, Dict, List, Optional
from pubsub import pub from pubsub import pub
from sqlalchemy import create_engine from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker from sqlalchemy.orm import Session, sessionmaker
from sqlalchemy.orm.exc import DetachedInstanceError from sqlalchemy.orm.exc import DetachedInstanceError
from src.engines.core.base_worker import Base, BaseRenderWorker from src.engines.core.base_worker import Base, BaseRenderWorker
@@ -25,182 +26,274 @@ class JobNotFoundError(Exception):
class RenderQueue: class RenderQueue:
engine: Optional[create_engine] = None _default_instance: Optional['RenderQueue'] = None
session: Optional[sessionmaker] = None
job_queue: List[BaseRenderWorker] = [] @classmethod
maximum_renderer_instances: Dict[str, int] = {'blender': 1, 'aerender': 1, 'ffmpeg': 4} def _sync_class(cls) -> None:
last_saved_counts: Dict[str, int] = {} if cls._default_instance is not None:
is_running: bool = False pass # no class-level attributes to sync
def __init__(self) -> None:
self.engine: Optional[create_engine] = None
self.session: Optional[Session] = None
self.job_queue: List[BaseRenderWorker] = []
self.maximum_renderer_instances: Dict[str, int] = {'blender': 1, 'aerender': 1, 'ffmpeg': 4}
self.last_saved_counts: Dict[str, int] = {}
self.is_running: bool = False
self._lock = threading.Lock()
# -------------------------------------------- # --------------------------------------------
# Render Queue Evaluation: # Render Queue Evaluation:
# -------------------------------------------- # --------------------------------------------
@classmethod def _start(self) -> None:
def start(cls):
"""Start evaluating the render queue"""
logger.debug("Starting render queue updates") logger.debug("Starting render queue updates")
cls.is_running = True self.is_running = True
cls.evaluate_queue() self.evaluate_queue()
@classmethod def _evaluate_queue(self) -> None:
def evaluate_queue(cls):
try: try:
not_started = cls.jobs_with_status(RenderStatus.NOT_STARTED, priority_sorted=True) not_started = self.jobs_with_status(RenderStatus.NOT_STARTED, priority_sorted=True)
for job in not_started: for job in not_started:
if cls.is_available_for_job(job.engine_name, job.priority): if self.is_available_for_job(job.engine_name, job.priority):
cls.start_job(job) self.start_job(job)
scheduled = cls.jobs_with_status(RenderStatus.SCHEDULED, priority_sorted=True) scheduled = self.jobs_with_status(RenderStatus.SCHEDULED, priority_sorted=True)
for job in scheduled: for job in scheduled:
if job.scheduled_start <= datetime.now(): if job.scheduled_start <= datetime.now():
logger.debug(f"Starting scheduled job: {job}") logger.debug(f"Starting scheduled job: {job}")
cls.start_job(job) self.start_job(job)
if cls.last_saved_counts != cls.job_counts(): if self.last_saved_counts != self.job_counts():
cls.save_state() self.save_state()
except DetachedInstanceError: except DetachedInstanceError:
pass pass
@classmethod def _local_job_status_changed(self, job_id: str, old_status: str, new_status: str) -> None:
def __local_job_status_changed(cls, job_id, old_status, new_status): render_job = self.job_with_id(job_id, none_ok=True)
render_job = RenderQueue.job_with_id(job_id, none_ok=True) if render_job and self.is_running:
if render_job and cls.is_running: # ignore changes from render jobs not in the queue yet
logger.debug(f"RenderQueue detected job {job_id} has changed from {old_status} -> {new_status}") logger.debug(f"RenderQueue detected job {job_id} has changed from {old_status} -> {new_status}")
RenderQueue.evaluate_queue() self.evaluate_queue()
@classmethod def _stop(self) -> None:
def stop(cls):
logger.debug("Stopping render queue updates") logger.debug("Stopping render queue updates")
cls.is_running = False self.is_running = False
# -------------------------------------------- # --------------------------------------------
# Fetch Jobs: # Fetch Jobs:
# -------------------------------------------- # --------------------------------------------
@classmethod def _all_jobs(self) -> List[BaseRenderWorker]:
def all_jobs(cls): return self.job_queue
return cls.job_queue
@classmethod def _running_jobs(self) -> List[BaseRenderWorker]:
def running_jobs(cls): return self.jobs_with_status(RenderStatus.RUNNING)
return cls.jobs_with_status(RenderStatus.RUNNING)
@classmethod def _pending_jobs(self) -> List[BaseRenderWorker]:
def pending_jobs(cls): pending = self.jobs_with_status(RenderStatus.NOT_STARTED)
pending_jobs = cls.jobs_with_status(RenderStatus.NOT_STARTED) pending.extend(self.jobs_with_status(RenderStatus.SCHEDULED))
pending_jobs.extend(cls.jobs_with_status(RenderStatus.SCHEDULED)) return pending
return pending_jobs
@classmethod def _jobs_with_status(self, status: RenderStatus, priority_sorted: bool = False) -> List[BaseRenderWorker]:
def jobs_with_status(cls, status, priority_sorted=False): found_jobs = [x for x in self.all_jobs() if x.status == status]
found_jobs = [x for x in cls.all_jobs() if x.status == status]
if priority_sorted: if priority_sorted:
found_jobs = sorted(found_jobs, key=lambda a: a.priority, reverse=False) found_jobs = sorted(found_jobs, key=lambda a: a.priority, reverse=False)
return found_jobs return found_jobs
@classmethod def _job_with_id(self, job_id: str, none_ok: bool = False) -> Optional[BaseRenderWorker]:
def job_with_id(cls, job_id, none_ok=False): found_job = next((x for x in self.all_jobs() if x.id == job_id), None)
found_job = next((x for x in cls.all_jobs() if x.id == job_id), None)
if not found_job and not none_ok: if not found_job and not none_ok:
raise JobNotFoundError(job_id) raise JobNotFoundError(job_id)
return found_job return found_job
@classmethod def _job_counts(self) -> Dict[str, int]:
def job_counts(cls): counts = Counter(x.status for x in self.all_jobs())
job_counts = {} return {s.value: counts.get(s, 0) for s in RenderStatus}
for job_status in RenderStatus:
job_counts[job_status.value] = len(cls.jobs_with_status(job_status))
return job_counts
# -------------------------------------------- # --------------------------------------------
# Startup / Shutdown: # Startup / Shutdown:
# -------------------------------------------- # --------------------------------------------
@classmethod def _load_state(self, database_directory: Path) -> None:
def load_state(cls, database_directory: Path): self.engine = create_engine(f"sqlite:///{database_directory / 'database.db'}")
if not cls.engine: Base.metadata.create_all(self.engine)
cls.engine = create_engine(f"sqlite:///{database_directory / 'database.db'}") self.session = sessionmaker(bind=self.engine)()
Base.metadata.create_all(cls.engine) from src.engines.core.base_worker import BaseRenderWorker
cls.session = sessionmaker(bind=cls.engine)() self.job_queue = self.session.query(BaseRenderWorker).all()
cls.job_queue = cls.session.query(BaseRenderWorker).all() pub.subscribe(self._local_job_status_changed, 'status_change')
pub.subscribe(cls.__local_job_status_changed, 'status_change')
@classmethod def _save_state(self) -> None:
def save_state(cls): if self.session:
cls.session.commit() self.session.commit()
@classmethod def _prepare_for_shutdown(self) -> None:
def prepare_for_shutdown(cls):
logger.debug("Closing session") logger.debug("Closing session")
cls.stop() self.stop()
running_jobs = cls.jobs_with_status(RenderStatus.RUNNING) # cancel all running jobs running_jobs = self.jobs_with_status(RenderStatus.RUNNING)
_ = [cls.cancel_job(job) for job in running_jobs] for job in running_jobs:
cls.save_state() self.cancel_job(job)
cls.session.close() self.save_state()
if self.session:
self.session.close()
# -------------------------------------------- # --------------------------------------------
# Renderer Availability: # Renderer Availability:
# -------------------------------------------- # --------------------------------------------
@classmethod def renderer_instances(self) -> Counter:
def renderer_instances(cls): all_instances = [x.engine_name for x in self.running_jobs()]
all_instances = [x.engine_name for x in cls.running_jobs()]
return Counter(all_instances) return Counter(all_instances)
@classmethod def _is_available_for_job(self, renderer: str, priority: int = 2) -> bool:
def is_available_for_job(cls, renderer, priority=2): instances = self.renderer_instances()
higher_priority_jobs = [x for x in self.running_jobs() if x.priority < priority]
instances = cls.renderer_instances() max_allowed_instances = self.maximum_renderer_instances.get(renderer, 1)
higher_priority_jobs = [x for x in cls.running_jobs() if x.priority < priority] maxed_out_instances = renderer in instances and instances[renderer] >= max_allowed_instances
max_allowed_instances = cls.maximum_renderer_instances.get(renderer, 1)
maxed_out_instances = renderer in instances.keys() and instances[renderer] >= max_allowed_instances
return not maxed_out_instances and not higher_priority_jobs return not maxed_out_instances and not higher_priority_jobs
# -------------------------------------------- # --------------------------------------------
# Job Lifecycle Management: # Job Lifecycle Management:
# -------------------------------------------- # --------------------------------------------
@classmethod def _add_to_render_queue(self, render_job: BaseRenderWorker, force_start: bool = False) -> None:
def add_to_render_queue(cls, render_job, force_start=False):
logger.info(f"Adding job to render queue: {render_job}") logger.info(f"Adding job to render queue: {render_job}")
cls.job_queue.append(render_job) with self._lock:
if cls.is_running and force_start and render_job.status in (RenderStatus.NOT_STARTED, RenderStatus.SCHEDULED): self.job_queue.append(render_job)
cls.start_job(render_job) if force_start and render_job.status in (RenderStatus.NOT_STARTED, RenderStatus.SCHEDULED):
cls.session.add(render_job) self.start_job(render_job)
cls.save_state() self.session.add(render_job)
if cls.is_running: self.save_state()
cls.evaluate_queue() if self.is_running:
self.evaluate_queue()
@classmethod def _start_job(self, job: BaseRenderWorker) -> None:
def start_job(cls, job):
logger.info(f'Starting job: {job}') logger.info(f'Starting job: {job}')
job.start() job.start()
cls.save_state() self.save_state()
@classmethod def _cancel_job(self, job: BaseRenderWorker) -> bool:
def cancel_job(cls, job):
logger.info(f'Cancelling job: {job}') logger.info(f'Cancelling job: {job}')
job.stop() job.stop()
return job.status == RenderStatus.CANCELLED return job.status == RenderStatus.CANCELLED
@classmethod def _delete_job(self, job: BaseRenderWorker) -> bool:
def delete_job(cls, job):
logger.info(f"Deleting job: {job}") logger.info(f"Deleting job: {job}")
with self._lock:
job.stop() job.stop()
cls.job_queue.remove(job) self.job_queue.remove(job)
cls.session.delete(job) self.session.delete(job)
cls.save_state() self.save_state()
return True return True
# -------------------------------------------- # --------------------------------------------
# Miscellaneous: # Miscellaneous:
# -------------------------------------------- # --------------------------------------------
def _clear_history(self) -> None:
for job in list(self.all_jobs()):
if job.status in (RenderStatus.CANCELLED, RenderStatus.COMPLETED, RenderStatus.ERROR):
self.delete_job(job)
self.save_state()
# --- Forwarders for backward compatibility ---
@classmethod
def start(cls):
if cls._default_instance is not None:
cls._default_instance._start()
@classmethod
def evaluate_queue(cls):
if cls._default_instance is not None:
cls._default_instance._evaluate_queue()
@classmethod
def stop(cls):
if cls._default_instance is not None:
cls._default_instance._stop()
@classmethod
def all_jobs(cls):
if cls._default_instance is not None:
return cls._default_instance.job_queue
return []
@classmethod
def running_jobs(cls):
if cls._default_instance is not None:
return cls._default_instance._running_jobs()
return []
@classmethod
def pending_jobs(cls):
if cls._default_instance is not None:
return cls._default_instance._pending_jobs()
return []
@classmethod
def jobs_with_status(cls, status, priority_sorted=False):
if cls._default_instance is not None:
return cls._default_instance._jobs_with_status(status, priority_sorted)
return []
@classmethod
def job_with_id(cls, job_id, none_ok=False):
if cls._default_instance is not None:
return cls._default_instance._job_with_id(job_id, none_ok)
if not none_ok:
raise JobNotFoundError(job_id)
return None
@classmethod
def job_counts(cls):
if cls._default_instance is not None:
return cls._default_instance._job_counts()
return {}
@classmethod
def load_state(cls, database_directory):
if cls._default_instance is not None:
cls._default_instance._load_state(database_directory)
@classmethod
def save_state(cls):
if cls._default_instance is not None:
cls._default_instance._save_state()
@classmethod
def prepare_for_shutdown(cls):
if cls._default_instance is not None:
cls._default_instance._prepare_for_shutdown()
@classmethod
def is_available_for_job(cls, renderer, priority=2):
if cls._default_instance is not None:
return cls._default_instance._is_available_for_job(renderer, priority)
return True
@classmethod
def add_to_render_queue(cls, render_job, force_start=False):
if cls._default_instance is not None:
cls._default_instance._add_to_render_queue(render_job, force_start)
@classmethod
def start_job(cls, job):
if cls._default_instance is not None:
cls._default_instance._start_job(job)
@classmethod
def cancel_job(cls, job):
if cls._default_instance is not None:
return cls._default_instance._cancel_job(job)
return False
@classmethod
def delete_job(cls, job):
if cls._default_instance is not None:
return cls._default_instance._delete_job(job)
return False
@classmethod @classmethod
def clear_history(cls): def clear_history(cls):
to_remove = [x for x in cls.all_jobs() if x.status in [RenderStatus.CANCELLED, if cls._default_instance is not None:
RenderStatus.COMPLETED, RenderStatus.ERROR]] cls._default_instance._clear_history()
for job_to_remove in to_remove:
cls.delete_job(job_to_remove)
cls.save_state()
+6 -3
View File
@@ -1,8 +1,11 @@
import logging
import socket import socket
from pathlib import Path from pathlib import Path
import psutil import psutil
from PyQt6.QtCore import QThread, pyqtSignal, Qt, pyqtSlot from PyQt6.QtCore import QThread, pyqtSignal, Qt, pyqtSlot
logger = logging.getLogger(__name__)
from PyQt6.QtWidgets import ( from PyQt6.QtWidgets import (
QApplication, QWidget, QVBoxLayout, QHBoxLayout, QLabel, QLineEdit, QPushButton, QFileDialog, QSpinBox, QComboBox, QApplication, QWidget, QVBoxLayout, QHBoxLayout, QLabel, QLineEdit, QPushButton, QFileDialog, QSpinBox, QComboBox,
QGroupBox, QCheckBox, QProgressBar, QPlainTextEdit, QDoubleSpinBox, QMessageBox, QListWidget, QListWidgetItem, QGroupBox, QCheckBox, QProgressBar, QPlainTextEdit, QDoubleSpinBox, QMessageBox, QListWidget, QListWidgetItem,
@@ -306,7 +309,7 @@ class NewRenderJobForm(QWidget):
def update_job_count(self, changed_item=None): def update_job_count(self, changed_item=None):
checked = 1 checked = 1
if self.cameras_group.enabled: if self.cameras_group.isEnabled():
checked = 0 checked = 0
total = self.cameras_list.count() total = self.cameras_list.count()
@@ -463,8 +466,8 @@ class NewRenderJobForm(QWidget):
text_box = QLineEdit() text_box = QLineEdit()
h_layout.addWidget(text_box) h_layout.addWidget(text_box)
self.engine_options_layout.addLayout(h_layout) self.engine_options_layout.addLayout(h_layout)
except AttributeError: except AttributeError as e:
pass logger.error(f"AttributeError in post_get_project_info_update: {e}")
def toggle_engine_enablement(self, enabled=False): def toggle_engine_enablement(self, enabled=False):
"""Toggle on/off all the render settings""" """Toggle on/off all the render settings"""
+4 -4
View File
@@ -247,9 +247,8 @@ class MainWindow(QMainWindow):
# Update server information display # Update server information display
self.update_server_info_display(new_hostname) self.update_server_info_display(new_hostname)
except AttributeError: except AttributeError as e:
# Handle cases where the server list view might not be properly initialized logger.error(f"AttributeError in server_picked: {e}")
pass
def update_server_info_display(self, hostname): def update_server_info_display(self, hostname):
"""Updates the server information section of the UI.""" """Updates the server information section of the UI."""
@@ -405,7 +404,8 @@ class MainWindow(QMainWindow):
id_item = self.job_list_view.item(selected_row.row(), 0) id_item = self.job_list_view.item(selected_row.row(), 0)
job_ids.append(id_item.text()) job_ids.append(id_item.text())
return job_ids return job_ids
except AttributeError: except AttributeError as e:
logger.error(f"AttributeError in selected_job_ids: {e}")
return [] return []
+34 -15
View File
@@ -1,12 +1,24 @@
import os import os
from pathlib import Path from pathlib import Path
from typing import Optional
import yaml import yaml
from src.utilities.misc_helper import current_system_os, copy_directory_contents from src.utilities.misc_helper import current_system_os, copy_directory_contents
_CONFIG_ATTRS = [
'upload_folder', 'update_engines_on_launch', 'max_content_path',
'server_log_level', 'log_buffer_length', 'worker_process_timeout',
'flask_log_level', 'flask_debug_enable', 'queue_eval_seconds',
'port_number', 'enable_split_jobs', 'download_timeout_seconds',
]
class Config: class Config:
# Initialize class variables with default values _default_instance: Optional['Config'] = None
# Class-level defaults — mutated by _sync_class() so existing
# callers (Config.upload_folder) continue to work during the
# migration to instance-based access.
upload_folder = "~/zordon-uploads/" upload_folder = "~/zordon-uploads/"
update_engines_on_launch = True update_engines_on_launch = True
max_content_path = 100000000 max_content_path = 100000000
@@ -20,23 +32,30 @@ class Config:
enable_split_jobs = True enable_split_jobs = True
download_timeout_seconds = 120 download_timeout_seconds = 120
@classmethod def __init__(self) -> None:
def load_config(cls, config_path): for attr in _CONFIG_ATTRS:
setattr(self, attr, getattr(Config, attr))
def load(self, config_path: Path) -> None:
with open(config_path, 'r') as ymlfile: with open(config_path, 'r') as ymlfile:
cfg = yaml.safe_load(ymlfile) cfg = yaml.safe_load(ymlfile)
for attr in _CONFIG_ATTRS:
if attr in cfg:
setattr(self, attr, cfg[attr])
self.upload_folder = str(Path(self.upload_folder).expanduser())
cls.upload_folder = str(Path(cfg.get('upload_folder', cls.upload_folder)).expanduser()) @classmethod
cls.update_engines_on_launch = cfg.get('update_engines_on_launch', cls.update_engines_on_launch) def _sync_class(cls) -> None:
cls.max_content_path = cfg.get('max_content_path', cls.max_content_path) if cls._default_instance is not None:
cls.server_log_level = cfg.get('server_log_level', cls.server_log_level) for attr in _CONFIG_ATTRS:
cls.log_buffer_length = cfg.get('log_buffer_length', cls.log_buffer_length) setattr(cls, attr, getattr(cls._default_instance, attr))
cls.worker_process_timeout = cfg.get('worker_process_timeout', cls.worker_process_timeout)
cls.flask_log_level = cfg.get('flask_log_level', cls.flask_log_level) @classmethod
cls.flask_debug_enable = cfg.get('flask_debug_enable', cls.flask_debug_enable) def load_config(cls, config_path: Path) -> None:
cls.queue_eval_seconds = cfg.get('queue_eval_seconds', cls.queue_eval_seconds) instance = Config()
cls.port_number = cfg.get('port_number', cls.port_number) instance.load(config_path)
cls.enable_split_jobs = cfg.get('enable_split_jobs', cls.enable_split_jobs) cls._default_instance = instance
cls.download_timeout_seconds = cfg.get('download_timeout_seconds', cls.download_timeout_seconds) cls._sync_class()
@classmethod @classmethod
def config_dir(cls) -> Path: def config_dir(cls) -> Path:
+102 -62
View File
@@ -1,5 +1,6 @@
import logging import logging
import socket import socket
from typing import Dict, List, Optional
from pubsub import pub from pubsub import pub
from zeroconf import Zeroconf, ServiceInfo, ServiceBrowser, ServiceStateChange, NonUniqueNameException, \ from zeroconf import Zeroconf, ServiceInfo, ServiceBrowser, ServiceStateChange, NonUniqueNameException, \
@@ -9,105 +10,144 @@ logger = logging.getLogger()
class ZeroconfServer: class ZeroconfServer:
service_type = None _default_instance: Optional['ZeroconfServer'] = None
server_name = None
server_port = None
server_ip = None
zeroconf = Zeroconf()
service_info = None
client_cache = {}
properties = {}
@classmethod service_type: Optional[str] = None
def configure(cls, service_type, server_name, server_port): server_name: Optional[str] = None
cls.service_type = service_type server_port: Optional[int] = None
cls.server_name = server_name properties: Dict = {}
cls.server_port = server_port
try: # Stop any previously running instances def __init__(self) -> None:
self.service_type: Optional[str] = None
self.server_name: Optional[str] = None
self.server_port: Optional[int] = None
self.server_ip: Optional[str] = None
self.zeroconf: Zeroconf = Zeroconf()
self.service_info: Optional[ServiceInfo] = None
self.client_cache: Dict = {}
self.properties: Dict = {}
def _configure(self, service_type: str, server_name: str, server_port: int) -> None:
self.service_type = service_type
self.server_name = server_name
self.server_port = server_port
try:
socket.gethostbyname(socket.gethostname()) socket.gethostbyname(socket.gethostname())
except socket.gaierror: except socket.gaierror:
cls.stop() self.stop()
@classmethod def _start(self, listen_only: bool = False) -> None:
def start(cls, listen_only=False): if not self.service_type:
if not cls.service_type:
raise RuntimeError("The 'configure' method must be run before starting the zeroconf server") raise RuntimeError("The 'configure' method must be run before starting the zeroconf server")
elif not listen_only: if not listen_only:
logger.debug(f"Starting zeroconf service") logger.debug("Starting zeroconf service")
cls._register_service() self._register_service()
else: else:
logger.debug(f"Starting zeroconf service - Listen only mode") logger.debug("Starting zeroconf service - Listen only mode")
cls._browse_services() self._browse_services()
@classmethod def _stop(self) -> None:
def stop(cls):
logger.debug("Stopping zeroconf service") logger.debug("Stopping zeroconf service")
cls._unregister_service() self._unregister_service()
cls.zeroconf.close() if self.zeroconf:
self.zeroconf.close()
@classmethod def _register_service(self) -> None:
def _register_service(cls):
try: try:
cls.server_ip = socket.gethostbyname(socket.gethostname()) self.server_ip = socket.gethostbyname(socket.gethostname())
info = ServiceInfo( info = ServiceInfo(
cls.service_type, self.service_type,
f"{cls.server_name}.{cls.service_type}", f"{self.server_name}.{self.service_type}",
addresses=[socket.inet_aton(cls.server_ip)], addresses=[socket.inet_aton(self.server_ip)],
port=cls.server_port, port=self.server_port,
properties=cls.properties, properties=self.properties,
) )
cls.service_info = info self.service_info = info
cls.zeroconf.register_service(info) self.zeroconf.register_service(info)
logger.info(f"Registered zeroconf service: {cls.service_info.name}") logger.info(f"Registered zeroconf service: {self.service_info.name}")
except (NonUniqueNameException, socket.gaierror) as e: except (NonUniqueNameException, socket.gaierror) as e:
logger.error(f"Error establishing zeroconf: {e}") logger.error(f"Error establishing zeroconf: {e}")
@classmethod def _unregister_service(self) -> None:
def _unregister_service(cls): if self.service_info:
if cls.service_info: self.zeroconf.unregister_service(self.service_info)
cls.zeroconf.unregister_service(cls.service_info) logger.info(f"Unregistered zeroconf service: {self.service_info.name}")
logger.info(f"Unregistered zeroconf service: {cls.service_info.name}") self.service_info = None
cls.service_info = None
@classmethod def _browse_services(self) -> None:
def _browse_services(cls): ServiceBrowser(self.zeroconf, self.service_type, [self._on_service_discovered])
browser = ServiceBrowser(cls.zeroconf, cls.service_type, [cls._on_service_discovered])
browser.is_alive()
@classmethod def _on_service_discovered(self, zeroconf, service_type, name, state_change) -> None:
def _on_service_discovered(cls, zeroconf, service_type, name, state_change):
try: try:
info = zeroconf.get_service_info(service_type, name) info = zeroconf.get_service_info(service_type, name)
hostname = name.split(f'.{cls.service_type}')[0] hostname = name.split(f'.{self.service_type}')[0]
logger.debug(f"Zeroconf: {hostname} {state_change}") logger.debug(f"Zeroconf: {hostname} {state_change}")
if service_type == cls.service_type: if service_type == self.service_type:
if state_change == ServiceStateChange.Added or state_change == ServiceStateChange.Updated: if state_change in (ServiceStateChange.Added, ServiceStateChange.Updated):
cls.client_cache[hostname] = info self.client_cache[hostname] = info
else: else:
cls.client_cache.pop(hostname) self.client_cache.pop(hostname, None)
pub.sendMessage('zeroconf_state_change', hostname=hostname, state_change=state_change) pub.sendMessage('zeroconf_state_change', hostname=hostname, state_change=state_change)
except NotRunningException: except NotRunningException:
pass pass
@classmethod @classmethod
def found_hostnames(cls): def _sync_class(cls) -> None:
if cls._default_instance is not None:
inst = cls._default_instance
cls.service_type = inst.service_type
cls.server_name = inst.server_name
cls.server_port = inst.server_port
cls.server_ip = inst.server_ip
cls.properties = inst.properties
def _found_hostnames(self) -> List[str]:
local_hostname = socket.gethostname() local_hostname = socket.gethostname()
def sort_key(hostname): def sort_key(hostname):
# Return 0 if it's the local hostname so it comes first, else return 1
return False if hostname == local_hostname else True return False if hostname == local_hostname else True
# Sort the list with the local hostname first sorted_hostnames = sorted(self.client_cache.keys(), key=sort_key)
sorted_hostnames = sorted(cls.client_cache.keys(), key=sort_key)
return sorted_hostnames return sorted_hostnames
def _get_hostname_properties(self, hostname: str) -> Dict:
server_info = self.client_cache.get(hostname)
if server_info is None:
return {}
decoded_server_info = {key.decode('utf-8'): value.decode('utf-8') for key, value in server_info.properties.items()}
return decoded_server_info
# --- Forwarders for backward compatibility ---
@classmethod
def configure(cls, service_type, server_name, server_port):
if cls._default_instance is not None:
cls._default_instance._configure(service_type, server_name, server_port)
cls._sync_class()
@classmethod
def start(cls, listen_only=False):
if cls._default_instance is not None:
cls._default_instance._start(listen_only)
@classmethod
def stop(cls):
if cls._default_instance is not None:
cls._default_instance._stop()
@classmethod
def found_hostnames(cls):
if cls._default_instance is not None:
return cls._default_instance._found_hostnames()
return []
@classmethod @classmethod
def get_hostname_properties(cls, hostname): def get_hostname_properties(cls, hostname):
server_info = cls.client_cache.get(hostname).properties if cls._default_instance is not None:
decoded_server_info = {key.decode('utf-8'): value.decode('utf-8') for key, value in server_info.items()} return cls._default_instance._get_hostname_properties(hostname)
return decoded_server_info return {}
# Example usage: # Example usage: