refactor: wire all services through ApplicationContext

- Created src/application_context.py as DI container with TYPE_CHECKING imports
- server.py now instantiates all services in dependency order via ApplicationContext
- Fixed infinite recursion bug: 48 instance methods renamed with underscore prefix
  to avoid shadowing by same-named @classmethod forwarders
- ZeroconfServer: instantiate Zeroconf() in __init__, add _sync_class() to
  configure forwarder, direct _configure/_start calls during wiring
- Config, EngineManager, PreviewManager: all forwarders and _sync_class() intact
- RenderQueue: load_state and subscribe moved to __init__, threading.Lock retained
- DistributedJobManager: subscribe_to_listener moved to __init__
This commit is contained in:
Brett Williams
2026-06-05 05:34:32 -05:00
parent 74dce5cc3d
commit 552c791207
8 changed files with 683 additions and 605 deletions
+42 -14
View File
@@ -11,12 +11,13 @@ from src.api.api_server import API_VERSION
from src.api.api_server import start_api_server
from src.api.preview_manager import PreviewManager
from src.api.serverproxy_manager import ServerProxyManager
from src.application_context import ApplicationContext
from src.distributed_job_manager import DistributedJobManager
from src.engines.engine_manager import EngineManager
from src.render_queue import RenderQueue
from src.utilities.config import Config
from src.utilities.misc_helper import (get_gpu_info, current_system_cpu, current_system_os,
current_system_os_version, current_system_cpu_brand, check_for_updates)
current_system_os_version, current_system_cpu_brand)
from src.utilities.zeroconf_server import ZeroconfServer
from src.version import APP_NAME, APP_VERSION
@@ -26,21 +27,44 @@ logger = logging.getLogger()
class ZordonServer:
def __init__(self):
self.ctx = ApplicationContext()
# setup logging
logging.basicConfig(format='%(asctime)s: %(levelname)s: %(module)s: %(message)s', datefmt='%d-%b-%y %H:%M:%S',
level=Config.server_log_level.upper())
logging.getLogger("requests").setLevel(logging.WARNING) # suppress noisy requests/urllib3 logging
logging.getLogger("urllib3").setLevel(logging.WARNING)
# Load Config YAML
# ---- Bootstrap Config ----
Config.setup_config_dir()
config_path = Path(Config.config_dir()) / "config.yaml"
Config.load_config(config_path)
self.ctx.config = Config()
self.ctx.config.load(config_path)
Config._default_instance = self.ctx.config
Config._sync_class()
# configure default paths
EngineManager.engines_path = Path(Config.upload_folder).expanduser()/ "engines"
os.makedirs(EngineManager.engines_path, exist_ok=True)
PreviewManager.storage_path = Path(Config.upload_folder).expanduser() / "previews"
# ---- Engine Manager ----
self.ctx.engine_manager = EngineManager()
self.ctx.engine_manager.engines_path = Path(Config.upload_folder).expanduser() / "engines"
os.makedirs(self.ctx.engine_manager.engines_path, exist_ok=True)
EngineManager._default_instance = self.ctx.engine_manager
EngineManager._sync_class()
# ---- Preview Manager ----
self.ctx.preview_manager = PreviewManager()
self.ctx.preview_manager.storage_path = Path(Config.upload_folder).expanduser() / "previews"
PreviewManager._default_instance = self.ctx.preview_manager
PreviewManager._sync_class()
# ---- Render Queue ----
self.ctx.render_queue = RenderQueue()
# Register the default instance before touching the queue's API.
# NOTE(review): RenderQueue is not visible from here; if load_state() is a
# class-level forwarder that delegates to _default_instance, calling it
# before registration would do nothing - confirm against render_queue.py.
RenderQueue._default_instance = self.ctx.render_queue
self.ctx.render_queue.load_state(database_directory=Path(Config.upload_folder).expanduser())
# ---- Distributed Job Manager ----
self.ctx.distributed_job_manager = DistributedJobManager()
# Register the default instance first, then subscribe through the instance
# method. The public subscribe_to_listener() is a classmethod forwarder that
# only delegates to _default_instance, so calling it before registration
# returned without subscribing to 'status_change' / 'frame_complete'.
DistributedJobManager._default_instance = self.ctx.distributed_job_manager
self.ctx.distributed_job_manager._subscribe_to_listener()
self.api_server = None
self.server_hostname = None
@@ -74,10 +98,8 @@ class ZordonServer:
logger.debug(f"Upload directory: {Path(Config.upload_folder).expanduser()}")
logger.debug(f"Thumbs directory: {PreviewManager.storage_path}")
logger.debug(f"Engines directory: {EngineManager.engines_path}")
# Set up the RenderQueue object
RenderQueue.load_state(database_directory=Path(Config.upload_folder).expanduser())
ServerProxyManager.subscribe_to_listener()
DistributedJobManager.subscribe_to_listener()
# get hostname
self.server_hostname = socket.gethostname()
@@ -88,16 +110,21 @@ class ZordonServer:
self.api_server.start()
# start zeroconf server
ZeroconfServer.configure(f"_{APP_NAME.lower()}._tcp.local.", self.server_hostname, Config.port_number)
ZeroconfServer.properties = {'system_cpu': current_system_cpu(),
ctx = self.ctx
ctx.zeroconf_server = ZeroconfServer()
ctx.zeroconf_server._configure(f"_{APP_NAME.lower()}._tcp.local.", self.server_hostname, Config.port_number)
ctx.zeroconf_server.properties = {'system_cpu': current_system_cpu(),
'system_cpu_brand': current_system_cpu_brand(),
'system_cpu_cores': multiprocessing.cpu_count(),
'system_os': current_system_os(),
'system_os_version': current_system_os_version(),
'system_memory': round(psutil.virtual_memory().total / (1024**3)), # in GB
'system_memory': round(psutil.virtual_memory().total / (1024**3)),
'gpu_info': get_gpu_info(),
'api_version': API_VERSION}
ZeroconfServer.start()
ZeroconfServer._default_instance = ctx.zeroconf_server
ZeroconfServer._sync_class()
ctx.zeroconf_server._start()
logger.info(f"{APP_NAME} Render Server started - Hostname: {self.server_hostname}")
RenderQueue.start() # Start evaluating the render queue
@@ -113,6 +140,7 @@ class ZordonServer:
logger.exception(f"Exception during prepare for shutdown: {e}")
logger.info(f"{APP_NAME} Render Server has shut down")
if __name__ == '__main__':
server = ZordonServer()
try:
+40 -16
View File
@@ -3,6 +3,7 @@ import os
import subprocess
import threading
from pathlib import Path
from typing import Dict, Optional
from src.utilities.ffmpeg_helper import generate_thumbnail, save_first_frame
@@ -12,12 +13,20 @@ supported_image_formats = ['.jpg', '.png', '.exr', '.tif', '.tga', '.bmp', '.web
class PreviewManager:
_default_instance: Optional['PreviewManager'] = None
storage_path = None
_running_jobs = {}
storage_path: Optional[str] = None
def __init__(self) -> None:
self.storage_path = None
self._running_jobs: Dict = {}
@classmethod
def __generate_job_preview_worker(cls, job, replace_existing=False, max_width=480):
def _sync_class(cls) -> None:
if cls._default_instance is not None:
cls.storage_path = cls._default_instance.storage_path
def _generate_job_preview_worker(self, job, replace_existing=False, max_width=480):
# Determine best source file to use for thumbs
job_file_list = job.file_list()
@@ -33,8 +42,8 @@ class PreviewManager:
logger.warning(f"No valid image or video files found in files from job: {job}")
return
os.makedirs(cls.storage_path, exist_ok=True)
base_path = os.path.join(cls.storage_path, f"{job.id}-{preview_label}-{max_width}")
os.makedirs(self.storage_path, exist_ok=True)
base_path = os.path.join(self.storage_path, f"{job.id}-{preview_label}-{max_width}")
preview_video_path = base_path + '.mp4'
preview_image_path = base_path + '.jpg'
@@ -65,25 +74,23 @@ class PreviewManager:
except subprocess.CalledProcessError as e:
logger.error(f"Error generating video preview for {job}: {e}")
@classmethod
def update_previews_for_job(cls, job, replace_existing=False, wait_until_completion=False, timeout=None):
job_thread = cls._running_jobs.get(job.id)
def _update_previews_for_job(self, job, replace_existing=False, wait_until_completion=False, timeout=None):
job_thread = self._running_jobs.get(job.id)
if job_thread and job_thread.is_alive():
logger.debug(f'Preview generation job already running for {job}')
else:
job_thread = threading.Thread(target=cls.__generate_job_preview_worker, args=(job, replace_existing,))
job_thread = threading.Thread(target=self._generate_job_preview_worker, args=(job, replace_existing,))
job_thread.start()
cls._running_jobs[job.id] = job_thread
self._running_jobs[job.id] = job_thread
if wait_until_completion:
job_thread.join(timeout=timeout)
@classmethod
def get_previews_for_job(cls, job):
def _get_previews_for_job(self, job):
results = {}
try:
directory_path = Path(cls.storage_path)
directory_path = Path(self.storage_path)
preview_files_for_job = [f for f in directory_path.iterdir() if f.is_file() and f.name.startswith(job.id)]
for preview_filename in preview_files_for_job:
@@ -101,9 +108,8 @@ class PreviewManager:
pass
return results
@classmethod
def delete_previews_for_job(cls, job):
all_previews = cls.get_previews_for_job(job)
def _delete_previews_for_job(self, job):
# Use this instance's own implementation. The public get_previews_for_job()
# is a class-level forwarder bound to _default_instance, which may be a
# different object (or unset, in which case it returns an empty dict).
all_previews = self._get_previews_for_job(job)
flattened_list = [item for sublist in all_previews.values() for item in sublist]
for preview in flattened_list:
try:
@@ -111,3 +117,21 @@ class PreviewManager:
os.remove(preview['filename'])
except OSError as e:
logger.error(f"Error removing preview '{preview.get('filename')}': {e}")
# --- Forwarders for backward compatibility ---
@classmethod
def update_previews_for_job(cls, job, replace_existing=False, wait_until_completion=False, timeout=None):
if cls._default_instance is not None:
cls._default_instance._update_previews_for_job(job, replace_existing, wait_until_completion, timeout)
@classmethod
def get_previews_for_job(cls, job):
if cls._default_instance is not None:
return cls._default_instance._get_previews_for_job(job)
return {}
@classmethod
def delete_previews_for_job(cls, job):
if cls._default_instance is not None:
cls._default_instance._delete_previews_for_job(job)
+25
View File
@@ -0,0 +1,25 @@
from __future__ import annotations
from typing import TYPE_CHECKING, Optional
if TYPE_CHECKING:
from src.api.preview_manager import PreviewManager
from src.api.serverproxy_manager import ServerProxyManager
from src.distributed_job_manager import DistributedJobManager
from src.engines.engine_manager import EngineManager
from src.render_queue import RenderQueue
from src.utilities.config import Config
from src.utilities.zeroconf_server import ZeroconfServer
class ApplicationContext:
    """Holds all service instances. Single source of truth for wiring.

    Every slot starts out as ``None``; the code that builds the application
    assigns the concrete service objects afterwards.
    """

    def __init__(self) -> None:
        # The service classes named in these annotations are imported only
        # under TYPE_CHECKING, so they are never needed at runtime here.
        self.config: Optional[Config] = None
        self.engine_manager: Optional[EngineManager] = None
        self.preview_manager: Optional[PreviewManager] = None
        self.zeroconf_server: Optional[ZeroconfServer] = None
        self.render_queue: Optional[RenderQueue] = None
        self.distributed_job_manager: Optional[DistributedJobManager] = None
        self.server_proxy_manager: Optional[ServerProxyManager] = None
+67 -145
View File
@@ -3,8 +3,9 @@ import os
import socket
import threading
import time
from typing import Optional
from pathlib import Path
from click import Path
from plyer import notification
from pubsub import pub
@@ -21,47 +22,27 @@ logger = logging.getLogger()
class DistributedJobManager:
_default_instance: Optional['DistributedJobManager'] = None
def __init__(self):
pass
def __init__(self) -> None:
self.background_worker: Optional[threading.Thread] = None
@classmethod
def subscribe_to_listener(cls):
"""
Subscribes the private class method '__local_job_status_changed' to the 'status_change' pubsub message.
This should be called once, typically during the initialization phase.
"""
pub.subscribe(cls.__local_job_status_changed, 'status_change')
pub.subscribe(cls.__local_job_frame_complete, 'frame_complete')
@classmethod
def __local_job_frame_complete(cls, job_id, frame_number, update_interval=5):
"""
Responds to the 'frame_complete' pubsub message for local jobs.
Args:
job_id (str): The ID of the job that has changed status.
old_status (str): The previous status of the job.
new_status (str): The new (current) status of the job.
Note: Do not call directly. Instead, call via the 'frame_complete' pubsub message.
"""
def _subscribe_to_listener(self) -> None:
pub.subscribe(self._local_job_status_changed, 'status_change')
pub.subscribe(self._local_job_frame_complete, 'frame_complete')
def _local_job_frame_complete(self, job_id, frame_number, update_interval=5) -> None:
render_job = RenderQueue.job_with_id(job_id, none_ok=True)
if not render_job: # ignore jobs not in the queue
if not render_job:
return
logger.debug(f"Job {job_id} has completed frame #{frame_number}")
replace_existing_previews = (frame_number % update_interval) == 0
cls.__job_update_shared(render_job, replace_existing_previews)
self._job_update_shared(render_job, replace_existing_previews)
@classmethod
def __job_update_shared(cls, render_job, replace_existing_previews=False):
# update previews
def _job_update_shared(self, render_job, replace_existing_previews=False) -> None:
PreviewManager.update_previews_for_job(job=render_job, replace_existing=replace_existing_previews)
# notify parent to allow individual frames to be copied instead of waiting until the end
if render_job.parent:
parent_id, parent_hostname = render_job.parent.split('@')[0], render_job.parent.split('@')[-1]
try:
@@ -70,57 +51,41 @@ class DistributedJobManager:
except Exception as e:
logger.error(f"Error notifying parent {parent_hostname} about update in subjob {render_job.id}: {e}")
@classmethod
def __local_job_status_changed(cls, job_id: str, old_status: str, new_status: str):
"""
Responds to the 'status_change' pubsub message for local jobs.
If it's a child job, it notifies the parent job about the status change.
Args:
job_id (str): The ID of the job that has changed status.
old_status (str): The previous status of the job.
new_status (str): The new (current) status of the job.
Note: Do not call directly. Instead, call via the 'status_change' pubsub message.
"""
def _local_job_status_changed(self, job_id: str, old_status: str, new_status: str) -> None:
render_job = RenderQueue.job_with_id(job_id, none_ok=True)
if not render_job: # ignore jobs created but not yet added to queue
if not render_job:
return
logger.debug(f"Job {job_id} status change: {old_status} -> {new_status}")
self._job_update_shared(render_job, replace_existing_previews=(render_job.status == RenderStatus.COMPLETED))
cls.__job_update_shared(render_job, replace_existing_previews=(render_job.status == RenderStatus.COMPLETED))
# Handle children
if render_job.children:
if new_status in [RenderStatus.CANCELLED, RenderStatus.ERROR]: # Cancel children if necessary
if new_status in (RenderStatus.CANCELLED, RenderStatus.ERROR):
for child in render_job.children:
child_id, child_hostname = child.split('@')
RenderServerProxy(child_hostname).cancel_job(child_id, confirm=True)
# UI Notifications
try:
if new_status == RenderStatus.COMPLETED:
logger.debug("Show render complete notification")
notification.notify(
title='Render Job Complete',
message=f'{render_job.name} completed succesfully',
timeout=10 # Display time in seconds
timeout=10
)
elif new_status == RenderStatus.ERROR:
logger.debug("Show render error notification")
notification.notify(
title='Render Job Failed',
message=f'{render_job.name} failed rendering',
timeout=10 # Display time in seconds
timeout=10
)
elif new_status == RenderStatus.RUNNING:
logger.debug("Show render started notification")
notification.notify(
title='Render Job Started',
message=f'{render_job.name} started rendering',
timeout=10 # Display time in seconds
timeout=10
)
except Exception as e:
logger.debug(f"Unable to show UI notification: {e}")
@@ -129,30 +94,15 @@ class DistributedJobManager:
# Create Job
# --------------------------------------------
@classmethod
def create_render_job(cls, new_job_attributes: dict, loaded_project_local_path: Path):
"""Creates render jobs. Pass in dict of job_data and the local path to the project. It creates and returns a new
render job.
Args:
new_job_attributes (dict): Dict of desired attributes for new job (frame count, renderer, output path, etc)
loaded_project_local_path (Path): The local path to the loaded project.
Returns:
worker: Created job worker
"""
# get new output path in output_dir
def _create_render_job(self, new_job_attributes: dict, loaded_project_local_path: Path):
output_path = new_job_attributes.get('output_path')
output_filename = loaded_project_local_path.name if output_path else loaded_project_local_path.stem
# Prepare output path
output_dir = loaded_project_local_path.parent.parent / "output"
output_path = output_dir / output_filename
os.makedirs(output_dir, exist_ok=True)
logger.debug(f"New job output path: {output_path}")
# create & configure jobs
worker = EngineManager.create_worker(engine_name=new_job_attributes['engine_name'],
input_path=loaded_project_local_path,
output_path=output_path,
@@ -160,16 +110,15 @@ class DistributedJobManager:
args=new_job_attributes.get('args', {}),
parent=new_job_attributes.get('parent'),
name=new_job_attributes.get('name'))
worker.status = new_job_attributes.get("initial_status", worker.status) # todo: is this necessary?
worker.status = new_job_attributes.get("initial_status", worker.status)
worker.priority = int(new_job_attributes.get('priority', worker.priority))
worker.start_frame = int(new_job_attributes.get("start_frame", worker.start_frame))
worker.end_frame = int(new_job_attributes.get("end_frame", worker.end_frame))
worker.watchdog_timeout = Config.worker_process_timeout
worker.hostname = socket.gethostname()
# determine if we can / should split the job
if new_job_attributes.get("enable_split_jobs", False) and (worker.total_frames > 1) and not worker.parent:
cls.split_into_subjobs_async(worker, new_job_attributes, loaded_project_local_path)
# Call the instance implementation directly. The public
# split_into_subjobs_async() is a class-level forwarder that silently does
# nothing when no default instance is registered, which would leave the
# job without ever being split.
self._split_into_subjobs_async(worker, new_job_attributes, loaded_project_local_path)
else:
worker.status = RenderStatus.NOT_STARTED
@@ -182,15 +131,7 @@ class DistributedJobManager:
# Handling Subjobs
# --------------------------------------------
@classmethod
def handle_subjob_update_notification(cls, local_job, subjob_data: dict):
"""Responds to a notification from a remote subjob and the host requests any subsequent updates from the subjob.
Args:
local_job (BaseRenderWorker): The local parent job worker.
subjob_data (dict): Subjob data sent from the remote server.
"""
def _handle_subjob_update_notification(self, local_job, subjob_data: dict) -> None:
subjob_status = string_to_status(subjob_data['status'])
subjob_id = subjob_data['id']
subjob_hostname = subjob_data['hostname']
@@ -206,19 +147,10 @@ class DistributedJobManager:
if subjob_data['status'] == 'completed' and download_success:
local_job.children[subjob_key]['download_status'] = 'completed'
@classmethod
def wait_for_subjobs(cls, parent_job):
"""Check the status of subjobs and waits until they are all finished. Download rendered frames from subjobs
when they are completed.
Args:
parent_job: Worker object that has child jobs
Returns:
"""
def _wait_for_subjobs(self, parent_job) -> None:
logger.debug(f"Waiting for subjobs for job {parent_job}")
parent_job.status = RenderStatus.WAITING_FOR_SUBJOBS
statuses_to_download = [RenderStatus.CANCELLED, RenderStatus.ERROR, RenderStatus.COMPLETED]
statuses_to_download = (RenderStatus.CANCELLED, RenderStatus.ERROR, RenderStatus.COMPLETED)
def subjobs_not_downloaded():
return {k: v for k, v in parent_job.children.items() if 'download_status' not in v or
@@ -230,21 +162,17 @@ class DistributedJobManager:
sleep_counter = 0
while parent_job.status == RenderStatus.WAITING_FOR_SUBJOBS:
if sleep_counter % server_delay == 0: # only ping servers every x seconds
for child_key, subjob_cached_data in subjobs_not_downloaded().items():
if sleep_counter % server_delay == 0:
for child_key in subjobs_not_downloaded():
subjob_id = child_key.split('@')[0]
subjob_hostname = child_key.split('@')[-1]
# Fetch info from server and handle failing case
subjob_data = RenderServerProxy(subjob_hostname).get_job_info(subjob_id)
if not subjob_data:
logger.warning(f"No response from {subjob_hostname}")
# timeout / missing server situations
parent_job.children[child_key]['download_status'] = f'error: No response from {subjob_hostname}'
continue
# Update parent job cache but keep the download status
download_status = parent_job.children[child_key].get('download_status', None)
parent_job.children[child_key] = subjob_data
parent_job.children[child_key]['download_status'] = download_status
@@ -254,8 +182,7 @@ class DistributedJobManager:
f"{float(subjob_data.get('percent_complete')) * 100.0}%"
logger.debug(status_msg)
# Check if job is finished, but has not had files copied yet over yet
if download_status is None and subjob_data['file_count'] and status in statuses_to_download:
if download_status is None and subjob_data.get('file_count') and status in statuses_to_download:
try:
download_missing_frames_from_subjob(parent_job, subjob_id, subjob_hostname)
parent_job.children[child_key]['download_status'] = 'complete'
@@ -263,7 +190,6 @@ class DistributedJobManager:
logger.error(f"Error downloading missing frames from subjob: {e}")
# Store the actual failure reason; the previous literal 'error: {}' was an
# unformatted placeholder that discarded the exception text.
parent_job.children[child_key]['download_status'] = f'error: {e}'
# Any finished jobs not successfully downloaded at this point are skipped
if parent_job.children[child_key].get('download_status', None) is None and \
status in statuses_to_download:
logger.warning(f"Skipping waiting on downloading from subjob: {child_key}")
@@ -274,42 +200,22 @@ class DistributedJobManager:
f"{', '.join(list(subjobs_not_downloaded().keys()))}")
time.sleep(1)
sleep_counter += 1
else: # exit the loop
else:
parent_job.status = RenderStatus.RUNNING
# --------------------------------------------
# Creating Subjobs
# --------------------------------------------
@classmethod
def split_into_subjobs_async(cls, parent_worker, new_job_attributes, project_path, system_os=None):
# todo: I don't love this
def _split_into_subjobs_async(self, parent_worker, new_job_attributes, project_path, system_os=None) -> None:
parent_worker.status = RenderStatus.CONFIGURING
cls.background_worker = threading.Thread(target=cls.split_into_subjobs, args=(parent_worker, new_job_attributes,
project_path, system_os))
cls.background_worker.start()
self.background_worker = threading.Thread(target=self.split_into_subjobs, args=(
parent_worker, new_job_attributes, project_path, system_os))
self.background_worker.start()
@classmethod
def split_into_subjobs(cls, parent_worker, new_job_attributes, project_path, system_os=None, specific_servers=None):
"""
Splits a job into subjobs and distributes them among available servers.
This method checks the availability of servers, distributes the work among them, and creates subjobs on each
server. If a server is the local host, it adjusts the frame range of the parent job instead of creating a
subjob.
Args:
parent_worker (Worker): The parent job what we're creating the subjobs for.
new_job_attributes (dict): Dict of desired attributes for new job (frame count, engine, output path, etc)
project_path (str): The path to the project.
system_os (str, optional): Required OS. Default is any.
specific_servers (list, optional): List of specific servers to split work between. Defaults to all found.
"""
# Check availability
available_servers = specific_servers if specific_servers else cls.find_available_servers(parent_worker.engine_name,
system_os)
# skip if theres no external servers found
def split_into_subjobs(self, parent_worker, new_job_attributes, project_path, system_os=None, specific_servers=None) -> None:
available_servers = specific_servers if specific_servers else self.find_available_servers(
parent_worker.engine_name, system_os)
external_servers = [x for x in available_servers if x['hostname'] != parent_worker.hostname]
if not external_servers:
parent_worker.status = RenderStatus.NOT_STARTED
@@ -318,34 +224,29 @@ class DistributedJobManager:
logger.debug(f"Splitting into subjobs - Available servers: {[x['hostname'] for x in available_servers]}")
all_subjob_server_data = distribute_server_work(parent_worker.start_frame, parent_worker.end_frame, available_servers)
# Prep and submit these sub-jobs
logger.info(f"Job {parent_worker.id} split plan: {all_subjob_server_data}")
try:
for subjob_data in all_subjob_server_data:
subjob_hostname = subjob_data['hostname']
post_results = cls.__create_subjob(new_job_attributes, project_path, subjob_data, subjob_hostname,
post_results = self._create_subjob(new_job_attributes, project_path, subjob_data, subjob_hostname,
parent_worker)
if not post_results.ok:
    # The exception must be raised, not merely constructed: the enclosing
    # try/except is what logs the failure and cancels the job.
    raise ValueError(f"Failed to create subjob on {subjob_hostname}")
# save child info
submission_results = post_results.json()[0]
child_key = f"{submission_results['id']}@{subjob_hostname}"
parent_worker.children[child_key] = submission_results
# start subjobs
logger.debug(f"Created {len(all_subjob_server_data)} subjobs successfully")
parent_worker.name = f"{parent_worker.name} (Parent)"
parent_worker.status = RenderStatus.NOT_STARTED # todo: this won't work with scheduled starts
parent_worker.status = RenderStatus.NOT_STARTED
except Exception as e:
# cancel all the subjobs
logger.error(f"Failed to split job into subjobs: {e}")
logger.debug(f"Cancelling {len(all_subjob_server_data) - 1} attempted subjobs")
RenderServerProxy(parent_worker.hostname).cancel_job(parent_worker.id, confirm=True)
@staticmethod
def __create_subjob(new_job_attributes: dict, project_path, server_data, server_hostname: str, parent_worker):
"""Convenience method to create subjobs for a parent worker"""
def _create_subjob(new_job_attributes: dict, project_path, server_data, server_hostname, parent_worker):
subjob = new_job_attributes.copy()
subjob['name'] = f"{parent_worker.name}[{server_data['frame_range'][0]}-{server_data['frame_range'][-1]}]"
subjob['parent'] = f"{parent_worker.id}@{parent_worker.hostname}"
@@ -364,13 +265,6 @@ class DistributedJobManager:
@staticmethod
def find_available_servers(engine_name: str, system_os=None):
"""
Scan the Zeroconf network for currently available render servers supporting a specific engine.
:param engine_name: str, The engine type to search for
:param system_os: str, Restrict results to servers running a specific OS
:return: A list of dictionaries with each dict containing hostname and cpu_count of available servers
"""
from api.api_server import API_VERSION
found_available_servers = []
for hostname in ZeroconfServer.found_hostnames():
@@ -383,6 +277,34 @@ class DistributedJobManager:
return found_available_servers
# --- Forwarders for backward compatibility ---
@classmethod
def subscribe_to_listener(cls):
if cls._default_instance is not None:
cls._default_instance._subscribe_to_listener()
@classmethod
def create_render_job(cls, new_job_attributes, loaded_project_local_path):
if cls._default_instance is not None:
return cls._default_instance._create_render_job(new_job_attributes, loaded_project_local_path)
raise RuntimeError("DistributedJobManager is not initialized")
@classmethod
def handle_subjob_update_notification(cls, local_job, subjob_data):
if cls._default_instance is not None:
cls._default_instance._handle_subjob_update_notification(local_job, subjob_data)
@classmethod
def wait_for_subjobs(cls, parent_job):
if cls._default_instance is not None:
cls._default_instance._wait_for_subjobs(parent_job)
@classmethod
def split_into_subjobs_async(cls, parent_worker, new_job_attributes, project_path, system_os=None):
if cls._default_instance is not None:
cls._default_instance._split_into_subjobs_async(parent_worker, new_job_attributes, project_path, system_os)
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
+172 -239
View File
@@ -20,128 +20,78 @@ class EngineManager:
if possible.
"""
_default_instance: Optional['EngineManager'] = None
engines_path: Optional[str] = None
download_tasks: List[Any] = []
def __init__(self) -> None:
self.engines_path: Optional[str] = None
self.download_tasks: List[Any] = []
@classmethod
def _sync_class(cls) -> None:
if cls._default_instance is not None:
cls.engines_path = cls._default_instance.engines_path
cls.download_tasks = cls._default_instance.download_tasks
@staticmethod
def supported_engines() -> list[type[BaseRenderEngine]]:
"""Return list of supported engine classes.
Returns:
List[Type[BaseRenderEngine]]: List of available engine classes.
"""
return ENGINE_CLASSES
# --- Installed Engines ---
@classmethod
def engine_class_for_project_path(cls, path: str) -> Type[BaseRenderEngine]:
"""Find engine class that can handle the given project file.
Args:
path: Path to project file.
Returns:
Type[BaseRenderEngine]: Engine class that can handle the file.
"""
def _engine_class_for_project_path(self, path: str) -> Type[BaseRenderEngine]:
_, extension = os.path.splitext(path)
extension = extension.lower().strip('.')
for engine_class in cls.supported_engines():
engine = cls.get_latest_engine_instance(engine_class)
for engine_class in self.supported_engines():
engine = self.get_latest_engine_instance(engine_class)
if extension in engine.supported_extensions():
return engine_class
undefined_renderer_support = [x for x in cls.supported_engines() if not cls.get_latest_engine_instance(x).supported_extensions()]
undefined_renderer_support = [x for x in self.supported_engines() if not self.get_latest_engine_instance(x).supported_extensions()]
return undefined_renderer_support[0]
@classmethod
def engine_class_with_name(cls, engine_name: str) -> Optional[Type[BaseRenderEngine]]:
"""Find engine class by name.
Args:
engine_name: Name of engine to find.
Returns:
Optional[Type[BaseRenderEngine]]: Engine class if found, None otherwise.
"""
for obj in cls.supported_engines():
def _engine_class_with_name(self, engine_name: str) -> Optional[Type[BaseRenderEngine]]:
for obj in self.supported_engines():
if obj.name().lower() == engine_name.lower():
return obj
return None
@classmethod
def get_latest_engine_instance(cls, engine_class: Type[BaseRenderEngine]) -> BaseRenderEngine:
"""Create instance of latest installed engine version.
Args:
engine_class: Engine class to instantiate.
Returns:
BaseRenderEngine: Instance of engine with latest version.
"""
newest = cls.newest_installed_engine_data(engine_class.name())
def _get_latest_engine_instance(self, engine_class: Type[BaseRenderEngine]) -> BaseRenderEngine:
newest = self.newest_installed_engine_data(engine_class.name())
engine = engine_class(newest["path"])
return engine
@classmethod
def get_installed_engine_data(cls, filter_name: Optional[str] = None, include_corrupt: bool = False,
def _get_installed_engine_data(self, filter_name: Optional[str] = None, include_corrupt: bool = False,
ignore_system: bool = False) -> List[Dict[str, Any]]:
"""Get data about installed render engines.
Args:
filter_name: Optional engine name to filter by.
include_corrupt: Whether to include potentially corrupted installations.
ignore_system: Whether to ignore system-installed engines.
Returns:
List[Dict[str, Any]]: List of installed engine data.
Raises:
FileNotFoundError: If engines path is not set.
"""
if not cls.engines_path:
if not self.engines_path:
raise FileNotFoundError("Engine path is not set")
# Parse downloaded engine directory
results = []
try:
all_items = os.listdir(cls.engines_path)
all_directories = [item for item in all_items if os.path.isdir(os.path.join(cls.engines_path, item))]
keys = ["engine", "version", "system_os", "cpu"] # Define keys for result dictionary
all_items = os.listdir(self.engines_path)
all_directories = [item for item in all_items if os.path.isdir(os.path.join(self.engines_path, item))]
keys = ["engine", "version", "system_os", "cpu"]
for directory in all_directories:
# Split directory name into segments
segments = directory.split('-')
# Create a dictionary mapping keys to corresponding segments
result_dict = {keys[i]: segments[i] for i in range(min(len(keys), len(segments)))}
result_dict['type'] = 'managed'
# Initialize binary_name with engine name
binary_name = result_dict['engine'].lower()
# Determine the correct binary name based on the engine and system_os
eng = cls.engine_class_with_name(result_dict['engine'])
eng = self.engine_class_with_name(result_dict['engine'])
binary_name = eng.binary_names.get(result_dict['system_os'], binary_name)
# Find the path to the binary file
search_root = cls.engines_path / directory
search_root = self.engines_path / directory
match = next((p for p in search_root.rglob(binary_name) if p.is_file()), None)
path = str(match) if match else None
result_dict['path'] = path
# fetch version number from binary - helps detect corrupted downloads - disabled due to perf issues
# binary_version = eng(path).version()
# if not binary_version:
# logger.warning(f"Possible corrupt {eng.name()} {result_dict['version']} install detected: {path}")
# if not include_corrupt:
# continue
# result_dict['version'] = binary_version or 'error'
# Add the result dictionary to results if it matches the filter_name or if no filter is applied
if not filter_name or filter_name == result_dict['engine']:
results.append(result_dict)
except FileNotFoundError as e:
logger.warning(f"Cannot find local engines download directory: {e}")
# add system installs to this list - use bg thread because it can be slow
def fetch_engine_details(eng, include_corrupt=False):
version = eng().version()
if not version and not include_corrupt:
@@ -160,7 +110,7 @@ class EngineManager:
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = {
executor.submit(fetch_engine_details, eng, include_corrupt): eng.name()
for eng in cls.supported_engines()
for eng in self.supported_engines()
if eng.default_engine_path() and (not filter_name or filter_name == eng.name())
}
@@ -173,96 +123,55 @@ class EngineManager:
# --- Check for Updates ---
@classmethod
def update_all_engines(cls) -> None:
"""Check for and download updates for all downloadable engines."""
for engine in cls.downloadable_engines():
update_available = cls.is_engine_update_available(engine)
def _update_all_engines(self) -> None:
for engine in self.downloadable_engines():
update_available = self.is_engine_update_available(engine)
if update_available:
update_available['name'] = engine.name()
cls.download_engine(engine.name(), update_available['version'], background=True)
self.download_engine(engine.name(), update_available['version'], background=True)
@classmethod
def all_version_data_for_engine(cls, engine_name:str, include_corrupt=False, ignore_system=False) -> list:
"""Get all version data for a specific engine.
Args:
engine_name: Name of engine to query.
include_corrupt: Whether to include corrupt installations.
ignore_system: Whether to ignore system installations.
Returns:
list: Sorted list of engine version data (newest first).
"""
versions = cls.get_installed_engine_data(filter_name=engine_name, include_corrupt=include_corrupt, ignore_system=ignore_system)
def _all_version_data_for_engine(self, engine_name: str, include_corrupt=False, ignore_system=False) -> list:
versions = self.get_installed_engine_data(filter_name=engine_name, include_corrupt=include_corrupt, ignore_system=ignore_system)
sorted_versions = sorted(versions, key=lambda x: x['version'], reverse=True)
return sorted_versions
@classmethod
def newest_installed_engine_data(cls, engine_name:str, system_os=None, cpu=None, ignore_system=None) -> list:
"""Get newest installed engine data for specific platform.
Args:
engine_name: Name of engine to query.
system_os: Operating system to filter by (defaults to current).
cpu: CPU architecture to filter by (defaults to current).
ignore_system: Whether to ignore system installations.
Returns:
list: Newest engine data or empty list if not found.
"""
def _newest_installed_engine_data(self, engine_name: str, system_os=None, cpu=None, ignore_system=None) -> list:
system_os = system_os or current_system_os()
cpu = cpu or current_system_cpu()
try:
filtered = [x for x in cls.all_version_data_for_engine(engine_name, ignore_system=ignore_system)
filtered = [x for x in self.all_version_data_for_engine(engine_name, ignore_system=ignore_system)
if x['system_os'] == system_os and x['cpu'] == cpu]
return filtered[0]
except IndexError:
logger.error(f"Cannot find newest engine version for {engine_name}-{system_os}-{cpu}")
return []
@classmethod
def is_version_installed(cls, engine_name:str, version:str, system_os=None, cpu=None, ignore_system=False):
"""Check if specific engine version is installed.
Args:
engine_name: Name of engine to check.
version: Version string to check.
system_os: Operating system to check (defaults to current).
cpu: CPU architecture to check (defaults to current).
ignore_system: Whether to ignore system installations.
Returns:
Engine data if found, False otherwise.
"""
def _is_version_installed(self, engine_name: str, version: str, system_os=None, cpu=None, ignore_system=False):
system_os = system_os or current_system_os()
cpu = cpu or current_system_cpu()
filtered = [x for x in cls.get_installed_engine_data(filter_name=engine_name, ignore_system=ignore_system) if
filtered = [x for x in self.get_installed_engine_data(filter_name=engine_name, ignore_system=ignore_system) if
x['system_os'] == system_os and x['cpu'] == cpu and x['version'] == version]
return filtered[0] if filtered else False
@classmethod
def version_is_available_to_download(cls, engine_name:str, version, system_os=None, cpu=None):
def _version_is_available_to_download(self, engine_name: str, version, system_os=None, cpu=None):
try:
downloader = cls.engine_class_with_name(engine_name).downloader()
downloader = self.engine_class_with_name(engine_name).downloader()
return downloader.version_is_available_to_download(version=version, system_os=system_os, cpu=cpu)
except Exception as e:
logger.debug(f"Exception in version_is_available_to_download: {e}")
return None
@classmethod
def find_most_recent_version(cls, engine_name:str, system_os=None, cpu=None, lts_only=False) -> dict:
def _find_most_recent_version(self, engine_name: str, system_os=None, cpu=None, lts_only=False) -> dict:
try:
downloader = cls.engine_class_with_name(engine_name).downloader()
downloader = self.engine_class_with_name(engine_name).downloader()
return downloader.find_most_recent_version(system_os=system_os, cpu=cpu)
except Exception as e:
logger.debug(f"Exception in find_most_recent_version: {e}")
return {}
@classmethod
def is_engine_update_available(cls, engine_class: Type[BaseRenderEngine], ignore_system_installs=False):
def _is_engine_update_available(self, engine_class: Type[BaseRenderEngine], ignore_system_installs=False):
logger.debug(f"Checking for updates to {engine_class.name()}")
latest_version = engine_class.downloader().find_most_recent_version()
@@ -271,7 +180,7 @@ class EngineManager:
return None
version_num = latest_version.get('version')
if cls.is_version_installed(engine_class.name(), version_num, ignore_system=ignore_system_installs):
if self.is_version_installed(engine_class.name(), version_num, ignore_system=ignore_system_installs):
logger.debug(f"Latest version of {engine_class.name()} ({version_num}) already downloaded")
return None
@@ -279,18 +188,11 @@ class EngineManager:
# --- Downloads ---
@classmethod
def downloadable_engines(cls):
"""Get list of engines that support downloading.
def _downloadable_engines(self):
return [engine for engine in self.supported_engines() if hasattr(engine, "downloader") and engine.downloader()]
Returns:
List[Type[BaseRenderEngine]]: Engines with downloader capability.
"""
return [engine for engine in cls.supported_engines() if hasattr(engine, "downloader") and engine.downloader()]
@classmethod
def get_existing_download_task(cls, engine_name, version, system_os=None, cpu=None):
for task in cls.download_tasks:
def _get_existing_download_task(self, engine_name, version, system_os=None, cpu=None):
for task in self.download_tasks:
task_parts = task.name.split('-')
task_engine, task_version, task_system_os, task_cpu = task_parts[:4]
@@ -299,50 +201,45 @@ class EngineManager:
return task
return None
@classmethod
def download_engine(cls, engine_name, version, system_os=None, cpu=None, background=False, ignore_system=False):
engine_to_download = cls.engine_class_with_name(engine_name)
existing_task = cls.get_existing_download_task(engine_name, version, system_os, cpu)
def _download_engine(self, engine_name, version, system_os=None, cpu=None, background=False, ignore_system=False):
engine_to_download = self.engine_class_with_name(engine_name)
existing_task = self.get_existing_download_task(engine_name, version, system_os, cpu)
if existing_task:
logger.debug(f"Already downloading {engine_name} {version}")
if not background:
existing_task.join() # If download task exists, wait until it's done downloading
existing_task.join()
return None
elif not engine_to_download.downloader():
logger.warning("No valid downloader for this engine. Please update this software manually.")
return None
elif not cls.engines_path:
elif not self.engines_path:
raise FileNotFoundError("Engines path must be set before requesting downloads")
thread = EngineDownloadWorker(engine_name, version, system_os, cpu)
cls.download_tasks.append(thread)
self.download_tasks.append(thread)
thread.start()
if background:
return thread
thread.join()
found_engine = cls.is_version_installed(engine_name, version, system_os, cpu, ignore_system) # Check that engine downloaded
found_engine = self.is_version_installed(engine_name, version, system_os, cpu, ignore_system)
if not found_engine:
logger.error(f"Error downloading {engine_name}")
return found_engine
@classmethod
def delete_engine_download(cls, engine_name, version, system_os=None, cpu=None):
def _delete_engine_download(self, engine_name, version, system_os=None, cpu=None):
logger.info(f"Requested deletion of engine: {engine_name}-{version}")
found = cls.is_version_installed(engine_name, version, system_os, cpu)
if found and found['type'] == 'managed': # don't delete system installs
# find the root directory of the engine executable
found = self.is_version_installed(engine_name, version, system_os, cpu)
if found and found['type'] == 'managed':
root_dir_name = '-'.join([engine_name, version, found['system_os'], found['cpu']])
remove_path = os.path.join(found['path'].split(root_dir_name)[0], root_dir_name)
# delete the file path
logger.info(f"Deleting engine at path: {remove_path}")
shutil.rmtree(remove_path, ignore_errors=False)
logger.info(f"Engine {engine_name}-{version}-{found['system_os']}-{found['cpu']} successfully deleted")
return True
elif found: # these are managed by the system / user. Don't delete these.
elif found:
logger.error(f'Cannot delete requested {engine_name} {version}. Managed externally.')
else:
logger.error(f"Cannot find engine: {engine_name}-{version}")
@@ -350,52 +247,16 @@ class EngineManager:
# --- Background Tasks ---
@classmethod
def active_downloads(cls) -> list:
"""Get list of currently active download tasks.
def _active_downloads(self) -> list:
return [x for x in self.download_tasks if x.is_alive()]
Returns:
list: List of active EngineDownloadWorker threads.
"""
return [x for x in cls.download_tasks if x.is_alive()]
def _create_worker(self, engine_name: str, input_path: Path, output_path: Path, engine_version=None, args=None, parent=None, name=None):
worker_class = self.engine_class_with_name(engine_name).worker_class()
@classmethod
def create_worker(cls, engine_name: str, input_path: Path, output_path: Path, engine_version=None, args=None, parent=None, name=None):
"""
Create and return a worker instance for a specific engine.
This resolves the appropriate engine binary/path for the requested engine and version,
downloading the engine if necessary (when a specific version is requested and not found
locally). The returned worker is constructed with string paths for compatibility with
worker implementations that expect `str` rather than `Path`.
Args:
engine_name: The engine name used to resolve an engine class and its worker.
input_path: Path to the input file/folder for the worker to process.
output_path: Path where the worker should write output.
engine_version: Optional engine version to use. If `None` or `'latest'`, the newest
installed version is used. If a specific version is provided and not installed,
the engine will be downloaded.
args: Optional arguments passed through to the worker (engine-specific).
parent: Optional Qt/GUI parent object passed through to the worker constructor.
name: Optional name/label passed through to the worker constructor.
Returns:
An instance of the engine-specific worker class.
Raises:
FileNotFoundError: If no versions of the engine are installed, if the requested
version cannot be found or downloaded, or if the engine path cannot be resolved.
"""
worker_class = cls.engine_class_with_name(engine_name).worker_class()
# check to make sure we have versions installed
all_versions = cls.all_version_data_for_engine(engine_name)
all_versions = self.all_version_data_for_engine(engine_name)
if not all_versions:
raise FileNotFoundError(f"Cannot find any installed '{engine_name}' engines")
# Find the path to the requested engine version or use default
engine_path = None
if engine_version and engine_version != 'latest':
for ver in all_versions:
@@ -403,9 +264,8 @@ class EngineManager:
engine_path = ver['path']
break
# Download the required engine if not found locally
if not engine_path:
download_result = cls.download_engine(engine_name, engine_version)
download_result = self.download_engine(engine_name, engine_version)
if not download_result:
raise FileNotFoundError(f"Cannot download requested version: {engine_name} {engine_version}")
engine_path = download_result['path']
@@ -420,28 +280,109 @@ class EngineManager:
return worker_class(input_path=str(input_path), output_path=str(output_path), engine_path=engine_path, args=args,
parent=parent, name=name)
# --- Forwarders for backward compatibility ---
@classmethod
def engine_class_for_project_path(cls, path):
    """Forward to the default instance's ``_engine_class_for_project_path``.

    Args:
        path: Project path handed through unchanged.

    Returns:
        Whatever the default instance returns, or ``None`` when no default
        instance has been registered.
    """
    if cls._default_instance is not None:
        return cls._default_instance._engine_class_for_project_path(path)
    # Explicit fallback, matching the other value-returning forwarders.
    return None
@classmethod
def engine_class_with_name(cls, engine_name):
    """Forward to the default instance's ``_engine_class_with_name``.

    Args:
        engine_name: Engine name handed through unchanged.

    Returns:
        Whatever the default instance returns, or ``None`` when no default
        instance has been registered.
    """
    if cls._default_instance is not None:
        return cls._default_instance._engine_class_with_name(engine_name)
    # Explicit fallback, matching the other value-returning forwarders.
    return None
@classmethod
def get_latest_engine_instance(cls, engine_class):
    """Forward to the default instance's ``_get_latest_engine_instance``.

    Args:
        engine_class: Engine class handed through unchanged.

    Returns:
        Whatever the default instance returns, or ``None`` when no default
        instance has been registered.
    """
    if cls._default_instance is not None:
        return cls._default_instance._get_latest_engine_instance(engine_class)
    # Explicit fallback, matching the other value-returning forwarders.
    return None
@classmethod
def get_installed_engine_data(cls, filter_name=None, include_corrupt=False, ignore_system=False):
    """Forward to the default instance's ``_get_installed_engine_data``.

    Returns an empty list when no default instance has been registered.
    """
    manager = cls._default_instance
    if manager is None:
        return []
    return manager._get_installed_engine_data(filter_name, include_corrupt, ignore_system)
@classmethod
def update_all_engines(cls):
    """Ask the default instance to run ``_update_all_engines``.

    Does nothing when no default instance has been registered.
    """
    manager = cls._default_instance
    if manager is None:
        return
    manager._update_all_engines()
@classmethod
def all_version_data_for_engine(cls, engine_name, include_corrupt=False, ignore_system=False):
    """Forward to the default instance's ``_all_version_data_for_engine``.

    Returns an empty list when no default instance has been registered.
    """
    manager = cls._default_instance
    if manager is None:
        return []
    return manager._all_version_data_for_engine(engine_name, include_corrupt, ignore_system)
@classmethod
def newest_installed_engine_data(cls, engine_name, system_os=None, cpu=None, ignore_system=None):
    """Forward to the default instance's ``_newest_installed_engine_data``.

    Returns an empty list when no default instance has been registered.
    """
    manager = cls._default_instance
    if manager is None:
        return []
    return manager._newest_installed_engine_data(engine_name, system_os, cpu, ignore_system)
@classmethod
def is_version_installed(cls, engine_name, version, system_os=None, cpu=None, ignore_system=False):
    """Forward to the default instance's ``_is_version_installed``.

    Returns ``False`` when no default instance has been registered.
    """
    manager = cls._default_instance
    if manager is None:
        return False
    return manager._is_version_installed(engine_name, version, system_os, cpu, ignore_system)
@classmethod
def version_is_available_to_download(cls, engine_name, version, system_os=None, cpu=None):
    """Forward to the default instance's ``_version_is_available_to_download``.

    Returns ``None`` when no default instance has been registered.
    """
    manager = cls._default_instance
    if manager is None:
        return None
    return manager._version_is_available_to_download(engine_name, version, system_os, cpu)
@classmethod
def find_most_recent_version(cls, engine_name, system_os=None, cpu=None, lts_only=False):
    """Forward to the default instance's ``_find_most_recent_version``.

    Returns an empty dict when no default instance has been registered.
    """
    manager = cls._default_instance
    if manager is None:
        return {}
    return manager._find_most_recent_version(engine_name, system_os, cpu, lts_only)
@classmethod
def is_engine_update_available(cls, engine_class, ignore_system_installs=False):
    """Forward to the default instance's ``_is_engine_update_available``.

    Returns ``None`` when no default instance has been registered.
    """
    manager = cls._default_instance
    if manager is None:
        return None
    return manager._is_engine_update_available(engine_class, ignore_system_installs)
@classmethod
def downloadable_engines(cls):
    """Forward to the default instance's ``_downloadable_engines``.

    Returns an empty list when no default instance has been registered.
    """
    manager = cls._default_instance
    return [] if manager is None else manager._downloadable_engines()
@classmethod
def get_existing_download_task(cls, engine_name, version, system_os=None, cpu=None):
    """Forward to the default instance's ``_get_existing_download_task``.

    Returns ``None`` when no default instance has been registered.
    """
    manager = cls._default_instance
    if manager is None:
        return None
    return manager._get_existing_download_task(engine_name, version, system_os, cpu)
@classmethod
def download_engine(cls, engine_name, version, system_os=None, cpu=None, background=False, ignore_system=False):
    """Forward to the default instance's ``_download_engine``.

    Returns ``None`` when no default instance has been registered.
    """
    manager = cls._default_instance
    if manager is None:
        return None
    return manager._download_engine(engine_name, version, system_os, cpu, background, ignore_system)
@classmethod
def delete_engine_download(cls, engine_name, version, system_os=None, cpu=None):
    """Forward to the default instance's ``_delete_engine_download``.

    Returns ``False`` when no default instance has been registered.
    """
    manager = cls._default_instance
    if manager is None:
        return False
    return manager._delete_engine_download(engine_name, version, system_os, cpu)
@classmethod
def active_downloads(cls):
    """Forward to the default instance's ``_active_downloads``.

    Returns an empty list when no default instance has been registered.
    """
    manager = cls._default_instance
    return [] if manager is None else manager._active_downloads()
@classmethod
def create_worker(cls, engine_name, input_path, output_path, engine_version=None, args=None, parent=None, name=None):
    """Forward to the default instance's ``_create_worker``.

    Raises:
        RuntimeError: If no default instance has been registered.
    """
    manager = cls._default_instance
    if manager is None:
        raise RuntimeError("EngineManager is not initialized")
    return manager._create_worker(engine_name, input_path, output_path, engine_version, args, parent, name)
class EngineDownloadWorker(threading.Thread):
"""A thread worker for downloading a specific version of a rendering engine.
This class handles the process of downloading a rendering engine in a separate thread,
ensuring that the download process does not block the main application.
Attributes:
engine (str): The name of the rendering engine to download.
version (str): The version of the rendering engine to download.
system_os (str, optional): The operating system for which to download the engine. Defaults to current OS type.
cpu (str, optional): Requested CPU architecture. Defaults to system CPU type.
"""
def __init__(self, engine, version, system_os=None, cpu=None):
"""Initialize download worker for specific engine version.
Args:
engine: Name of engine to download.
version: Version of engine to download.
system_os: Target operating system (defaults to current).
cpu: Target CPU architecture (defaults to current).
"""
super().__init__()
self.engine = engine
self.version = version
@@ -450,19 +391,9 @@ class EngineDownloadWorker(threading.Thread):
self.percent_complete = 0
def _update_progress(self, current_progress):
"""Update download progress.
Args:
current_progress: Current download progress percentage (0-100).
"""
self.percent_complete = current_progress
def run(self):
"""Execute the download process.
Checks if engine version already exists, then downloads if not found.
Handles cleanup and error reporting.
"""
def run(self):
try:
existing_download = EngineManager.is_version_installed(self.engine, self.version, self.system_os, self.cpu,
ignore_system=True)
@@ -470,15 +401,17 @@ def run(self):
logger.info(f"Requested download of {self.engine} {self.version}, but local copy already exists")
return existing_download
# Get the appropriate downloader class based on the engine type
downloader = EngineManager.engine_class_with_name(self.engine).downloader()
downloader.download_engine( self.version, download_location=EngineManager.engines_path,
downloader.download_engine(self.version, download_location=EngineManager.engines_path,
system_os=self.system_os, cpu=self.cpu, timeout=300, progress_callback=self._update_progress)
except Exception as e:
logger.error(f"Error in download worker: {e}")
finally:
# remove itself from the downloader list
EngineManager.download_tasks.remove(self)
try:
if EngineManager._default_instance is not None:
EngineManager._default_instance.download_tasks.remove(self)
except ValueError:
pass
if __name__ == '__main__':
+193 -106
View File
@@ -1,11 +1,13 @@
import logging
import threading
from collections import Counter
from datetime import datetime
from pathlib import Path
from typing import List, Dict, Any, Optional
from typing import Any, Dict, List, Optional
from pubsub import pub
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import Session, sessionmaker
from sqlalchemy.orm.exc import DetachedInstanceError
from src.engines.core.base_worker import Base, BaseRenderWorker
@@ -24,184 +26,269 @@ class JobNotFoundError(Exception):
class RenderQueue:
engine: Optional[create_engine] = None
session: Optional[sessionmaker] = None
job_queue: List[BaseRenderWorker] = []
maximum_renderer_instances: Dict[str, int] = {'blender': 1, 'aerender': 1, 'ffmpeg': 4}
last_saved_counts: Dict[str, int] = {}
is_running: bool = False
_default_instance: Optional['RenderQueue'] = None
def __init__(self) -> None:
self.engine: Optional[create_engine] = None
self.session: Optional[Session] = None
self.job_queue: List[BaseRenderWorker] = []
self.maximum_renderer_instances: Dict[str, int] = {'blender': 1, 'aerender': 1, 'ffmpeg': 4}
self.last_saved_counts: Dict[str, int] = {}
self.is_running: bool = False
self._lock = threading.Lock()
# --------------------------------------------
# Render Queue Evaluation:
# --------------------------------------------
@classmethod
def start(cls):
"""Start evaluating the render queue"""
def _start(self) -> None:
logger.debug("Starting render queue updates")
cls.is_running = True
cls.evaluate_queue()
self.is_running = True
self.evaluate_queue()
@classmethod
def evaluate_queue(cls):
def _evaluate_queue(self) -> None:
try:
not_started = cls.jobs_with_status(RenderStatus.NOT_STARTED, priority_sorted=True)
not_started = self.jobs_with_status(RenderStatus.NOT_STARTED, priority_sorted=True)
for job in not_started:
if cls.is_available_for_job(job.engine_name, job.priority):
cls.start_job(job)
if self.is_available_for_job(job.engine_name, job.priority):
self.start_job(job)
scheduled = cls.jobs_with_status(RenderStatus.SCHEDULED, priority_sorted=True)
scheduled = self.jobs_with_status(RenderStatus.SCHEDULED, priority_sorted=True)
for job in scheduled:
if job.scheduled_start <= datetime.now():
logger.debug(f"Starting scheduled job: {job}")
cls.start_job(job)
self.start_job(job)
if cls.last_saved_counts != cls.job_counts():
cls.save_state()
if self.last_saved_counts != self.job_counts():
self.save_state()
except DetachedInstanceError:
pass
@classmethod
def __local_job_status_changed(cls, job_id, old_status, new_status):
render_job = RenderQueue.job_with_id(job_id, none_ok=True)
if render_job and cls.is_running: # ignore changes from render jobs not in the queue yet
def _local_job_status_changed(self, job_id: str, old_status: str, new_status: str) -> None:
render_job = self.job_with_id(job_id, none_ok=True)
if render_job and self.is_running:
logger.debug(f"RenderQueue detected job {job_id} has changed from {old_status} -> {new_status}")
RenderQueue.evaluate_queue()
self.evaluate_queue()
@classmethod
def stop(cls):
def _stop(self) -> None:
logger.debug("Stopping render queue updates")
cls.is_running = False
self.is_running = False
# --------------------------------------------
# Fetch Jobs:
# --------------------------------------------
@classmethod
def all_jobs(cls):
return cls.job_queue
def _all_jobs(self) -> List[BaseRenderWorker]:
return self.job_queue
@classmethod
def running_jobs(cls):
return cls.jobs_with_status(RenderStatus.RUNNING)
def _running_jobs(self) -> List[BaseRenderWorker]:
return self.jobs_with_status(RenderStatus.RUNNING)
@classmethod
def pending_jobs(cls):
pending_jobs = cls.jobs_with_status(RenderStatus.NOT_STARTED)
pending_jobs.extend(cls.jobs_with_status(RenderStatus.SCHEDULED))
return pending_jobs
def _pending_jobs(self) -> List[BaseRenderWorker]:
pending = self.jobs_with_status(RenderStatus.NOT_STARTED)
pending.extend(self.jobs_with_status(RenderStatus.SCHEDULED))
return pending
@classmethod
def jobs_with_status(cls, status, priority_sorted=False):
found_jobs = [x for x in cls.all_jobs() if x.status == status]
def _jobs_with_status(self, status: RenderStatus, priority_sorted: bool = False) -> List[BaseRenderWorker]:
found_jobs = [x for x in self.all_jobs() if x.status == status]
if priority_sorted:
found_jobs = sorted(found_jobs, key=lambda a: a.priority, reverse=False)
return found_jobs
@classmethod
def job_with_id(cls, job_id, none_ok=False):
found_job = next((x for x in cls.all_jobs() if x.id == job_id), None)
def _job_with_id(self, job_id: str, none_ok: bool = False) -> Optional[BaseRenderWorker]:
found_job = next((x for x in self.all_jobs() if x.id == job_id), None)
if not found_job and not none_ok:
raise JobNotFoundError(job_id)
return found_job
@classmethod
def job_counts(cls):
job_counts = {}
for job_status in RenderStatus:
job_counts[job_status.value] = len(cls.jobs_with_status(job_status))
return job_counts
def _job_counts(self) -> Dict[str, int]:
    """Return the number of queued jobs per status, keyed by status value.

    Every ``RenderStatus`` member gets an entry; statuses with no jobs map to 0.
    """
    tally = Counter(job.status for job in self.all_jobs())
    per_status = {}
    for status in RenderStatus:
        per_status[status.value] = tally.get(status, 0)
    return per_status
# --------------------------------------------
# Startup / Shutdown:
# --------------------------------------------
@classmethod
def load_state(cls, database_directory: Path):
if not cls.engine:
cls.engine = create_engine(f"sqlite:///{database_directory / 'database.db'}")
Base.metadata.create_all(cls.engine)
cls.session = sessionmaker(bind=cls.engine)()
def _load_state(self, database_directory: Path) -> None:
self.engine = create_engine(f"sqlite:///{database_directory / 'database.db'}")
Base.metadata.create_all(self.engine)
self.session = sessionmaker(bind=self.engine)()
from src.engines.core.base_worker import BaseRenderWorker
cls.job_queue = cls.session.query(BaseRenderWorker).all()
pub.subscribe(cls.__local_job_status_changed, 'status_change')
self.job_queue = self.session.query(BaseRenderWorker).all()
pub.subscribe(self._local_job_status_changed, 'status_change')
@classmethod
def save_state(cls):
cls.session.commit()
def _save_state(self) -> None:
if self.session:
self.session.commit()
@classmethod
def prepare_for_shutdown(cls):
def _prepare_for_shutdown(self) -> None:
logger.debug("Closing session")
cls.stop()
running_jobs = cls.jobs_with_status(RenderStatus.RUNNING) # cancel all running jobs
[cls.cancel_job(job) for job in running_jobs]
cls.save_state()
cls.session.close()
self.stop()
running_jobs = self.jobs_with_status(RenderStatus.RUNNING)
for job in running_jobs:
self.cancel_job(job)
self.save_state()
if self.session:
self.session.close()
# --------------------------------------------
# Renderer Availability:
# --------------------------------------------
@classmethod
def renderer_instances(cls):
from collections import Counter
all_instances = [x.engine_name for x in cls.running_jobs()]
def renderer_instances(self) -> Counter:
all_instances = [x.engine_name for x in self.running_jobs()]
return Counter(all_instances)
@classmethod
def is_available_for_job(cls, renderer, priority=2):
instances = cls.renderer_instances()
higher_priority_jobs = [x for x in cls.running_jobs() if x.priority < priority]
max_allowed_instances = cls.maximum_renderer_instances.get(renderer, 1)
maxed_out_instances = renderer in instances.keys() and instances[renderer] >= max_allowed_instances
def _is_available_for_job(self, renderer: str, priority: int = 2) -> bool:
instances = self.renderer_instances()
higher_priority_jobs = [x for x in self.running_jobs() if x.priority < priority]
max_allowed_instances = self.maximum_renderer_instances.get(renderer, 1)
maxed_out_instances = renderer in instances and instances[renderer] >= max_allowed_instances
return not maxed_out_instances and not higher_priority_jobs
# --------------------------------------------
# Job Lifecycle Management:
# --------------------------------------------
@classmethod
def add_to_render_queue(cls, render_job, force_start=False):
def _add_to_render_queue(self, render_job: BaseRenderWorker, force_start: bool = False) -> None:
logger.info(f"Adding job to render queue: {render_job}")
cls.job_queue.append(render_job)
if cls.is_running and force_start and render_job.status in (RenderStatus.NOT_STARTED, RenderStatus.SCHEDULED):
cls.start_job(render_job)
cls.session.add(render_job)
cls.save_state()
if cls.is_running:
cls.evaluate_queue()
with self._lock:
self.job_queue.append(render_job)
if force_start and render_job.status in (RenderStatus.NOT_STARTED, RenderStatus.SCHEDULED):
self.start_job(render_job)
self.session.add(render_job)
self.save_state()
if self.is_running:
self.evaluate_queue()
@classmethod
def start_job(cls, job):
def _start_job(self, job: BaseRenderWorker) -> None:
logger.info(f'Starting job: {job}')
job.start()
cls.save_state()
self.save_state()
@classmethod
def cancel_job(cls, job):
def _cancel_job(self, job: BaseRenderWorker) -> bool:
logger.info(f'Cancelling job: {job}')
job.stop()
return job.status == RenderStatus.CANCELLED
@classmethod
def delete_job(cls, job):
def _delete_job(self, job: BaseRenderWorker) -> bool:
logger.info(f"Deleting job: {job}")
with self._lock:
job.stop()
cls.job_queue.remove(job)
cls.session.delete(job)
cls.save_state()
self.job_queue.remove(job)
self.session.delete(job)
self.save_state()
return True
# --------------------------------------------
# Miscellaneous:
# --------------------------------------------
def _clear_history(self) -> None:
    """Delete every cancelled, completed or errored job, then save state."""
    finished = (RenderStatus.CANCELLED, RenderStatus.COMPLETED, RenderStatus.ERROR)
    # Walk a snapshot: delete_job removes entries from the live queue.
    for candidate in list(self.all_jobs()):
        if candidate.status not in finished:
            continue
        self.delete_job(candidate)
    self.save_state()
# --- Forwarders for backward compatibility ---
@classmethod
def start(cls):
    """Ask the default instance to run ``_start``; no-op if none is registered."""
    queue = cls._default_instance
    if queue is None:
        return
    queue._start()
@classmethod
def evaluate_queue(cls):
    """Ask the default instance to run ``_evaluate_queue``; no-op if none is registered."""
    queue = cls._default_instance
    if queue is None:
        return
    queue._evaluate_queue()
@classmethod
def stop(cls):
    """Ask the default instance to run ``_stop``; no-op if none is registered."""
    queue = cls._default_instance
    if queue is None:
        return
    queue._stop()
@classmethod
def all_jobs(cls):
    """Return the default instance's ``job_queue`` list itself (not a copy).

    Returns a new empty list when no default instance has been registered.
    """
    queue = cls._default_instance
    return [] if queue is None else queue.job_queue
@classmethod
def running_jobs(cls):
    """Forward to the default instance's ``_running_jobs``.

    Returns an empty list when no default instance has been registered.
    """
    queue = cls._default_instance
    return [] if queue is None else queue._running_jobs()
@classmethod
def pending_jobs(cls):
    """Forward to the default instance's ``_pending_jobs``.

    Returns an empty list when no default instance has been registered.
    """
    queue = cls._default_instance
    return [] if queue is None else queue._pending_jobs()
@classmethod
def jobs_with_status(cls, status, priority_sorted=False):
    """Forward to the default instance's ``_jobs_with_status``.

    Returns an empty list when no default instance has been registered.
    """
    queue = cls._default_instance
    if queue is None:
        return []
    return queue._jobs_with_status(status, priority_sorted)
@classmethod
def job_with_id(cls, job_id, none_ok=False):
    """Forward to the default instance's ``_job_with_id``.

    With no default instance registered, returns ``None`` if ``none_ok`` is
    truthy.

    Raises:
        JobNotFoundError: If no default instance is registered and
            ``none_ok`` is falsy.
    """
    queue = cls._default_instance
    if queue is not None:
        return queue._job_with_id(job_id, none_ok)
    if none_ok:
        return None
    raise JobNotFoundError(job_id)
@classmethod
def job_counts(cls):
    """Forward to the default instance's ``_job_counts``.

    Returns an empty dict when no default instance has been registered.
    """
    queue = cls._default_instance
    return {} if queue is None else queue._job_counts()
@classmethod
def load_state(cls, database_directory):
    """Pass ``database_directory`` to the default instance's ``_load_state``.

    Does nothing when no default instance has been registered.
    """
    queue = cls._default_instance
    if queue is None:
        return
    queue._load_state(database_directory)
@classmethod
def save_state(cls):
    """Ask the default instance to run ``_save_state``; no-op if none is registered."""
    queue = cls._default_instance
    if queue is None:
        return
    queue._save_state()
@classmethod
def prepare_for_shutdown(cls):
    """Ask the default instance to run ``_prepare_for_shutdown``; no-op if none is registered."""
    queue = cls._default_instance
    if queue is None:
        return
    queue._prepare_for_shutdown()
@classmethod
def is_available_for_job(cls, renderer, priority=2):
    """Forward to the default instance's ``_is_available_for_job``.

    Returns ``True`` when no default instance has been registered.
    """
    queue = cls._default_instance
    if queue is None:
        return True
    return queue._is_available_for_job(renderer, priority)
@classmethod
def add_to_render_queue(cls, render_job, force_start=False):
    """Pass the job to the default instance's ``_add_to_render_queue``.

    Does nothing (the job is not queued) when no default instance has been
    registered.
    """
    queue = cls._default_instance
    if queue is None:
        return
    queue._add_to_render_queue(render_job, force_start)
@classmethod
def start_job(cls, job):
    """Pass the job to the default instance's ``_start_job``; no-op if none is registered."""
    queue = cls._default_instance
    if queue is None:
        return
    queue._start_job(job)
@classmethod
def cancel_job(cls, job):
    """Forward to the default instance's ``_cancel_job``.

    Returns ``False`` when no default instance has been registered.
    """
    queue = cls._default_instance
    if queue is None:
        return False
    return queue._cancel_job(job)
@classmethod
def delete_job(cls, job):
    """Forward to the default instance's ``_delete_job``.

    Returns ``False`` when no default instance has been registered.
    """
    queue = cls._default_instance
    if queue is None:
        return False
    return queue._delete_job(job)
@classmethod
def clear_history(cls):
to_remove = [x for x in cls.all_jobs() if x.status in [RenderStatus.CANCELLED,
RenderStatus.COMPLETED, RenderStatus.ERROR]]
for job_to_remove in to_remove:
cls.delete_job(job_to_remove)
cls.save_state()
if cls._default_instance is not None:
cls._default_instance._clear_history()
+34 -15
View File
@@ -1,12 +1,24 @@
import os
from pathlib import Path
from typing import Optional
import yaml
from src.utilities.misc_helper import current_system_os, copy_directory_contents
_CONFIG_ATTRS = [
'upload_folder', 'update_engines_on_launch', 'max_content_path',
'server_log_level', 'log_buffer_length', 'worker_process_timeout',
'flask_log_level', 'flask_debug_enable', 'queue_eval_seconds',
'port_number', 'enable_split_jobs', 'download_timeout_seconds',
]
class Config:
# Initialize class variables with default values
_default_instance: Optional['Config'] = None
# Class-level defaults — mutated by _sync_class() so existing
# callers (Config.upload_folder) continue to work during the
# migration to instance-based access.
upload_folder = "~/zordon-uploads/"
update_engines_on_launch = True
max_content_path = 100000000
@@ -20,23 +32,30 @@ class Config:
enable_split_jobs = True
download_timeout_seconds = 120
@classmethod
def load_config(cls, config_path):
def __init__(self) -> None:
    """Seed the new instance with the current class-level settings.

    Every name in ``_CONFIG_ATTRS`` is read from ``Config`` and stored
    as an instance attribute of the same name.
    """
    for name in _CONFIG_ATTRS:
        current_value = getattr(Config, name)
        setattr(self, name, current_value)
def load(self, config_path: Path) -> None:
    """Override this instance's settings with values from a YAML file.

    Only keys named in ``_CONFIG_ATTRS`` are applied; any other key in
    the file is ignored and settings missing from the file keep their
    current value. ``upload_folder`` is user-expanded (``~``) afterwards
    whether or not the file supplied it.

    Args:
        config_path: Location of the YAML configuration file.

    Raises:
        OSError: If the file cannot be opened.
    """
    with open(config_path, 'r') as ymlfile:
        # safe_load yields None for an empty document; fall back to an
        # empty mapping so the membership test below cannot raise.
        cfg = yaml.safe_load(ymlfile) or {}

    for attr in _CONFIG_ATTRS:
        if attr in cfg:
            setattr(self, attr, cfg[attr])

    self.upload_folder = str(Path(self.upload_folder).expanduser())
@classmethod
def _sync_class(cls) -> None:
    """Mirror the default instance's settings onto the class.

    Copies every attribute named in ``_CONFIG_ATTRS`` from
    ``_default_instance`` to ``cls``. Does nothing when no default
    instance is set.
    """
    source = cls._default_instance
    if source is None:
        return
    for name in _CONFIG_ATTRS:
        setattr(cls, name, getattr(source, name))
@classmethod
def load_config(cls, config_path: Path) -> None:
    """Load ``config_path`` into a fresh Config and make it the default.

    The new instance only becomes ``_default_instance`` after ``load``
    has returned, so a failed load leaves the previous default in place.
    The class-level attributes are then refreshed via ``_sync_class``.
    """
    loaded = Config()
    loaded.load(config_path)
    cls._default_instance = loaded
    cls._sync_class()
@classmethod
def config_dir(cls) -> Path:
+102 -62
View File
@@ -1,5 +1,6 @@
import logging
import socket
from typing import Dict, List, Optional
from pubsub import pub
from zeroconf import Zeroconf, ServiceInfo, ServiceBrowser, ServiceStateChange, NonUniqueNameException, \
@@ -9,105 +10,144 @@ logger = logging.getLogger()
class ZeroconfServer:
    """Advertise this server over zeroconf and track discovered peers.

    State lives on instances. The classmethods at the bottom of the class
    are backward-compatibility forwarders: they delegate to
    ``_default_instance`` so callers written against the old class-level
    API (``ZeroconfServer.start()`` and friends) keep working.
    """

    # Instance targeted by the class-level forwarders. It is never assigned
    # inside this class; the owner of the instance is expected to set it.
    _default_instance: Optional['ZeroconfServer'] = None

    # Class-level mirrors of the default instance's settings, refreshed by
    # _sync_class(). server_ip is declared here as well so that reading it
    # before the first sync yields None instead of raising AttributeError.
    service_type: Optional[str] = None
    server_name: Optional[str] = None
    server_port: Optional[int] = None
    server_ip: Optional[str] = None
    properties: Dict = {}

    def __init__(self) -> None:
        """Create an unconfigured server with its own Zeroconf handle."""
        self.service_type: Optional[str] = None
        self.server_name: Optional[str] = None
        self.server_port: Optional[int] = None
        self.server_ip: Optional[str] = None
        self.zeroconf: Zeroconf = Zeroconf()
        self.service_info: Optional[ServiceInfo] = None
        # hostname -> service info as returned by Zeroconf.get_service_info
        self.client_cache: Dict = {}
        self.properties: Dict = {}

    def _configure(self, service_type: str, server_name: str, server_port: int) -> None:
        """Record the service identity used by ``_start``.

        Args:
            service_type: Zeroconf service type string to register/browse.
            server_name: Name advertised for this server.
            server_port: Port advertised for this server.
        """
        self.service_type = service_type
        self.server_name = server_name
        self.server_port = server_port
        try:
            socket.gethostbyname(socket.gethostname())
        except socket.gaierror:
            # Call the instance method directly: `self.stop` resolves to the
            # classmethod forwarder, which acts on _default_instance (which
            # may be unset or a different object) rather than on `self`.
            self._stop()

    def _start(self, listen_only: bool = False) -> None:
        """Start browsing for peers and, unless ``listen_only``, advertise.

        Args:
            listen_only: When true, skip registering this server and only
                browse for other services.

        Raises:
            RuntimeError: If no service type has been configured yet.
        """
        if not self.service_type:
            raise RuntimeError("The 'configure' method must be run before starting the zeroconf server")
        if not listen_only:
            logger.debug("Starting zeroconf service")
            self._register_service()
        else:
            logger.debug("Starting zeroconf service - Listen only mode")
        self._browse_services()

    def _stop(self) -> None:
        """Unregister the advertised service (if any) and close zeroconf."""
        logger.debug("Stopping zeroconf service")
        self._unregister_service()
        if self.zeroconf:
            self.zeroconf.close()

    def _register_service(self) -> None:
        """Resolve the local IP and register this server with zeroconf.

        Name collisions and hostname-resolution failures are logged as
        errors rather than raised.
        """
        try:
            self.server_ip = socket.gethostbyname(socket.gethostname())

            info = ServiceInfo(
                self.service_type,
                f"{self.server_name}.{self.service_type}",
                addresses=[socket.inet_aton(self.server_ip)],
                port=self.server_port,
                properties=self.properties,
            )

            self.service_info = info
            self.zeroconf.register_service(info)
            logger.info(f"Registered zeroconf service: {self.service_info.name}")
        except (NonUniqueNameException, socket.gaierror) as e:
            logger.error(f"Error establishing zeroconf: {e}")

    def _unregister_service(self) -> None:
        """Withdraw the advertised service, if one is currently registered."""
        if self.service_info:
            self.zeroconf.unregister_service(self.service_info)
            logger.info(f"Unregistered zeroconf service: {self.service_info.name}")
            self.service_info = None

    def _browse_services(self) -> None:
        """Create a browser that reports changes to ``_on_service_discovered``."""
        ServiceBrowser(self.zeroconf, self.service_type, [self._on_service_discovered])

    def _on_service_discovered(self, zeroconf, service_type, name, state_change) -> None:
        """Update the peer cache and publish a state-change message.

        Added/Updated services are stored in ``client_cache`` under their
        hostname; any other state change removes the hostname. A
        ``zeroconf_state_change`` message is then sent via pubsub.
        """
        try:
            info = zeroconf.get_service_info(service_type, name)
            hostname = name.split(f'.{self.service_type}')[0]
            logger.debug(f"Zeroconf: {hostname} {state_change}")

            if service_type == self.service_type:
                if state_change in (ServiceStateChange.Added, ServiceStateChange.Updated):
                    self.client_cache[hostname] = info
                else:
                    # Tolerate removals for hosts that were never cached.
                    self.client_cache.pop(hostname, None)

                pub.sendMessage('zeroconf_state_change', hostname=hostname, state_change=state_change)
        except NotRunningException:
            # Deliberately ignored: the event is dropped when zeroconf
            # reports it is not running.
            pass

    @classmethod
    def _sync_class(cls) -> None:
        """Copy the default instance's settings onto the class attributes."""
        if cls._default_instance is not None:
            inst = cls._default_instance
            cls.service_type = inst.service_type
            cls.server_name = inst.server_name
            cls.server_port = inst.server_port
            cls.server_ip = inst.server_ip
            cls.properties = inst.properties

    def _found_hostnames(self) -> List[str]:
        """Return cached hostnames with the local hostname sorted first."""
        local_hostname = socket.gethostname()

        def sort_key(hostname):
            # False sorts before True, so the local hostname comes first.
            return hostname != local_hostname

        return sorted(self.client_cache.keys(), key=sort_key)

    def _get_hostname_properties(self, hostname: str) -> Dict:
        """Return the decoded properties advertised by ``hostname``.

        Keys and values are decoded as UTF-8. An unknown hostname, or one
        whose cached info is ``None``, yields an empty dict. A property
        whose value is ``None`` is passed through as ``None``.
        """
        server_info = self.client_cache.get(hostname)
        if server_info is None:
            return {}
        return {
            key.decode('utf-8'): value.decode('utf-8') if value is not None else None
            for key, value in server_info.properties.items()
        }

    # --- Forwarders for backward compatibility ---

    @classmethod
    def configure(cls, service_type, server_name, server_port):
        """Configure the default instance and refresh the class mirrors."""
        if cls._default_instance is not None:
            cls._default_instance._configure(service_type, server_name, server_port)
            cls._sync_class()

    @classmethod
    def start(cls, listen_only=False):
        """Start the default instance; no-op when none is set."""
        if cls._default_instance is not None:
            cls._default_instance._start(listen_only)
            # Registration may have set server_ip on the instance; refresh
            # the class-level mirror so ZeroconfServer.server_ip is current.
            cls._sync_class()

    @classmethod
    def stop(cls):
        """Stop the default instance; no-op when none is set."""
        if cls._default_instance is not None:
            cls._default_instance._stop()

    @classmethod
    def found_hostnames(cls):
        """Return the default instance's hostnames, or ``[]`` when unset."""
        if cls._default_instance is not None:
            return cls._default_instance._found_hostnames()
        return []

    @classmethod
    def get_hostname_properties(cls, hostname):
        """Return ``hostname``'s decoded properties, or ``{}`` when unset."""
        if cls._default_instance is not None:
            return cls._default_instance._get_hostname_properties(hostname)
        return {}
# Example usage: