import logging import os import socket import threading import time from typing import Optional from pathlib import Path from plyer import notification from pubsub import pub from src.api.preview_manager import PreviewManager from src.api.server_proxy import RenderServerProxy from src.engines.engine_manager import EngineManager from src.render_queue import RenderQueue from src.utilities.config import Config from src.utilities.server_helper import download_missing_frames_from_subjob, distribute_server_work from src.utilities.status_utils import RenderStatus, string_to_status from src.utilities.zeroconf_server import ZeroconfServer logger = logging.getLogger() class DistributedJobManager: _default_instance: Optional['DistributedJobManager'] = None @classmethod def _sync_class(cls) -> None: if cls._default_instance is not None: pass # no class-level attributes to sync def __init__(self) -> None: self.background_worker: Optional[threading.Thread] = None def _subscribe_to_listener(self) -> None: pub.subscribe(self._local_job_status_changed, 'status_change') pub.subscribe(self._local_job_frame_complete, 'frame_complete') def _local_job_frame_complete(self, job_id, frame_number, update_interval=5) -> None: render_job = RenderQueue.job_with_id(job_id, none_ok=True) if not render_job: return logger.debug(f"Job {job_id} has completed frame #{frame_number}") replace_existing_previews = (frame_number % update_interval) == 0 self._job_update_shared(render_job, replace_existing_previews) def _job_update_shared(self, render_job, replace_existing_previews=False) -> None: PreviewManager.update_previews_for_job(job=render_job, replace_existing=replace_existing_previews) if render_job.parent: parent_id, parent_hostname = render_job.parent.split('@')[0], render_job.parent.split('@')[-1] try: logger.debug(f'Job {render_job.id} updating parent {parent_id}@{parent_hostname}') RenderServerProxy(parent_hostname).send_subjob_update_notification(parent_id, render_job) except Exception as e: logger.error(f"Error notifying parent {parent_hostname} about update in subjob {render_job.id}: {e}") def _local_job_status_changed(self, job_id: str, old_status: str, new_status: str) -> None: render_job = RenderQueue.job_with_id(job_id, none_ok=True) if not render_job: return logger.debug(f"Job {job_id} status change: {old_status} -> {new_status}") self._job_update_shared(render_job, replace_existing_previews=(render_job.status == RenderStatus.COMPLETED)) if render_job.children: if new_status in (RenderStatus.CANCELLED, RenderStatus.ERROR): for child in render_job.children: child_id, child_hostname = child.split('@') RenderServerProxy(child_hostname).cancel_job(child_id, confirm=True) try: if new_status == RenderStatus.COMPLETED: logger.debug("Show render complete notification") notification.notify( title='Render Job Complete', message=f'{render_job.name} completed succesfully', timeout=10 ) elif new_status == RenderStatus.ERROR: logger.debug("Show render error notification") notification.notify( title='Render Job Failed', message=f'{render_job.name} failed rendering', timeout=10 ) elif new_status == RenderStatus.RUNNING: logger.debug("Show render started notification") notification.notify( title='Render Job Started', message=f'{render_job.name} started rendering', timeout=10 ) except Exception as e: logger.debug(f"Unable to show UI notification: {e}") # -------------------------------------------- # Create Job # -------------------------------------------- def _create_render_job(self, new_job_attributes: dict, loaded_project_local_path: Path): output_path = new_job_attributes.get('output_path') output_filename = loaded_project_local_path.name if output_path else loaded_project_local_path.stem output_dir = loaded_project_local_path.parent.parent / "output" output_path = output_dir / output_filename os.makedirs(output_dir, exist_ok=True) logger.debug(f"New job output path: {output_path}") worker = EngineManager.create_worker(engine_name=new_job_attributes['engine_name'], input_path=loaded_project_local_path, output_path=output_path, engine_version=new_job_attributes.get('engine_version'), args=new_job_attributes.get('args', {}), parent=new_job_attributes.get('parent'), name=new_job_attributes.get('name')) worker.status = new_job_attributes.get("initial_status", worker.status) worker.priority = int(new_job_attributes.get('priority', worker.priority)) worker.start_frame = int(new_job_attributes.get("start_frame", worker.start_frame)) worker.end_frame = int(new_job_attributes.get("end_frame", worker.end_frame)) worker.watchdog_timeout = Config.worker_process_timeout worker.hostname = socket.gethostname() if new_job_attributes.get("enable_split_jobs", False) and (worker.total_frames > 1) and not worker.parent: self.split_into_subjobs_async(worker, new_job_attributes, loaded_project_local_path) else: worker.status = RenderStatus.NOT_STARTED RenderQueue.add_to_render_queue(worker, force_start=new_job_attributes.get('force_start', False)) PreviewManager.update_previews_for_job(worker) return worker # -------------------------------------------- # Handling Subjobs # -------------------------------------------- def _handle_subjob_update_notification(self, local_job, subjob_data: dict) -> None: subjob_status = string_to_status(subjob_data['status']) subjob_id = subjob_data['id'] subjob_hostname = subjob_data['hostname'] subjob_key = f'{subjob_id}@{subjob_hostname}' old_status = local_job.children.get(subjob_key, {}).get('status') local_job.children[subjob_key] = subjob_data logname = f"" if old_status != subjob_status.value: logger.debug(f"Subjob status changed: {logname} -> {subjob_status.value}") download_success = download_missing_frames_from_subjob(local_job, subjob_id, subjob_hostname) if subjob_data['status'] == 'completed' and download_success: local_job.children[subjob_key]['download_status'] = 'completed' def _wait_for_subjobs(self, parent_job) -> None: logger.debug(f"Waiting for subjobs for job {parent_job}") parent_job.status = RenderStatus.WAITING_FOR_SUBJOBS statuses_to_download = (RenderStatus.CANCELLED, RenderStatus.ERROR, RenderStatus.COMPLETED) def subjobs_not_downloaded(): return {k: v for k, v in parent_job.children.items() if 'download_status' not in v or v['download_status'] == 'working' or v['download_status'] is None} logger.info(f'Waiting on {len(subjobs_not_downloaded())} subjobs for {parent_job.id}') server_delay = 10 sleep_counter = 0 while parent_job.status == RenderStatus.WAITING_FOR_SUBJOBS: if sleep_counter % server_delay == 0: for child_key in subjobs_not_downloaded(): subjob_id = child_key.split('@')[0] subjob_hostname = child_key.split('@')[-1] subjob_data = RenderServerProxy(subjob_hostname).get_job_info(subjob_id) if not subjob_data: logger.warning(f"No response from {subjob_hostname}") parent_job.children[child_key]['download_status'] = f'error: No response from {subjob_hostname}' continue download_status = parent_job.children[child_key].get('download_status', None) parent_job.children[child_key] = subjob_data parent_job.children[child_key]['download_status'] = download_status status = string_to_status(subjob_data.get('status', '')) status_msg = f"Subjob {child_key} | {status} | " \ f"{float(subjob_data.get('percent_complete')) * 100.0}%" logger.debug(status_msg) if download_status is None and subjob_data.get('file_count') and status in statuses_to_download: try: download_missing_frames_from_subjob(parent_job, subjob_id, subjob_hostname) parent_job.children[child_key]['download_status'] = 'complete' except Exception as e: logger.error(f"Error downloading missing frames from subjob: {e}") parent_job.children[child_key]['download_status'] = 'error: {}' if parent_job.children[child_key].get('download_status', None) is None and \ status in statuses_to_download: logger.warning(f"Skipping waiting on downloading from subjob: {child_key}") parent_job.children[child_key]['download_status'] = 'skipped' if subjobs_not_downloaded(): logger.debug(f"Waiting on {len(subjobs_not_downloaded())} subjobs on " f"{', '.join(list(subjobs_not_downloaded().keys()))}") time.sleep(1) sleep_counter += 1 else: parent_job.status = RenderStatus.RUNNING # -------------------------------------------- # Creating Subjobs # -------------------------------------------- def _split_into_subjobs_async(self, parent_worker, new_job_attributes, project_path, system_os=None) -> None: parent_worker.status = RenderStatus.CONFIGURING self.background_worker = threading.Thread(target=self.split_into_subjobs, args=( parent_worker, new_job_attributes, project_path, system_os)) self.background_worker.start() def split_into_subjobs(self, parent_worker, new_job_attributes, project_path, system_os=None, specific_servers=None) -> None: available_servers = specific_servers if specific_servers else self.find_available_servers( parent_worker.engine_name, system_os) external_servers = [x for x in available_servers if x['hostname'] != parent_worker.hostname] if not external_servers: parent_worker.status = RenderStatus.NOT_STARTED return logger.debug(f"Splitting into subjobs - Available servers: {[x['hostname'] for x in available_servers]}") all_subjob_server_data = distribute_server_work(parent_worker.start_frame, parent_worker.end_frame, available_servers) logger.info(f"Job {parent_worker.id} split plan: {all_subjob_server_data}") try: for subjob_data in all_subjob_server_data: subjob_hostname = subjob_data['hostname'] post_results = self._create_subjob(new_job_attributes, project_path, subjob_data, subjob_hostname, parent_worker) if not post_results.ok: ValueError(f"Failed to create subjob on {subjob_hostname}") submission_results = post_results.json()[0] child_key = f"{submission_results['id']}@{subjob_hostname}" parent_worker.children[child_key] = submission_results logger.debug(f"Created {len(all_subjob_server_data)} subjobs successfully") parent_worker.name = f"{parent_worker.name} (Parent)" parent_worker.status = RenderStatus.NOT_STARTED except Exception as e: logger.error(f"Failed to split job into subjobs: {e}") logger.debug(f"Cancelling {len(all_subjob_server_data) - 1} attempted subjobs") RenderServerProxy(parent_worker.hostname).cancel_job(parent_worker.id, confirm=True) @staticmethod def _create_subjob(new_job_attributes: dict, project_path, server_data, server_hostname, parent_worker): subjob = new_job_attributes.copy() subjob['name'] = f"{parent_worker.name}[{server_data['frame_range'][0]}-{server_data['frame_range'][-1]}]" subjob['parent'] = f"{parent_worker.id}@{parent_worker.hostname}" subjob['start_frame'] = server_data['frame_range'][0] subjob['end_frame'] = server_data['frame_range'][-1] subjob['engine_version'] = parent_worker.engine_version logger.debug(f"Posting subjob with frames {subjob['start_frame']}-" f"{subjob['end_frame']} to {server_hostname}") post_results = RenderServerProxy(server_hostname).post_job_to_server( file_path=project_path, job_data=subjob) return post_results # -------------------------------------------- # Server Handling # -------------------------------------------- @staticmethod def find_available_servers(engine_name: str, system_os=None): from api.api_server import API_VERSION found_available_servers = [] for hostname in ZeroconfServer.found_hostnames(): host_properties = ZeroconfServer.get_hostname_properties(hostname) if host_properties.get('api_version') == API_VERSION: if not system_os or (system_os and system_os == host_properties.get('system_os')): response = RenderServerProxy(hostname).is_engine_available(engine_name) if response and response.get('available', False): found_available_servers.append(response) return found_available_servers # --- Forwarders for backward compatibility --- @classmethod def subscribe_to_listener(cls): if cls._default_instance is not None: cls._default_instance._subscribe_to_listener() @classmethod def create_render_job(cls, new_job_attributes, loaded_project_local_path): if cls._default_instance is not None: return cls._default_instance._create_render_job(new_job_attributes, loaded_project_local_path) raise RuntimeError("DistributedJobManager is not initialized") @classmethod def handle_subjob_update_notification(cls, local_job, subjob_data): if cls._default_instance is not None: cls._default_instance._handle_subjob_update_notification(local_job, subjob_data) @classmethod def wait_for_subjobs(cls, parent_job): if cls._default_instance is not None: cls._default_instance._wait_for_subjobs(parent_job) @classmethod def split_into_subjobs_async(cls, parent_worker, new_job_attributes, project_path, system_os=None): if cls._default_instance is not None: cls._default_instance._split_into_subjobs_async(parent_worker, new_job_attributes, project_path, system_os) if __name__ == '__main__': logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') ZeroconfServer.configure("_zordon._tcp.local.", 'testing', 8080) ZeroconfServer.start(listen_only=True) print("Starting Zeroconf...") time.sleep(2) available_servers = DistributedJobManager.find_available_servers('blender') print(f"AVAILABLE SERVERS ({len(available_servers)}): {available_servers}") # results = distribute_server_work(1, 100, available_servers) # print(f"RESULTS: {results}") ZeroconfServer.stop()