import logging
import os
import socket
import threading
import time

from plyer import notification
from pubsub import pub

from src.api.preview_manager import PreviewManager
from src.api.server_proxy import RenderServerProxy
from src.engines.engine_manager import EngineManager
from src.render_queue import RenderQueue
from src.utilities.config import Config
from src.utilities.server_helper import download_missing_frames_from_subjob, distribute_server_work
from src.utilities.status_utils import RenderStatus, string_to_status
from src.utilities.zeroconf_server import ZeroconfServer

logger = logging.getLogger()


class DistributedJobManager:
    """Coordinates render jobs that are split into subjobs across several render servers.

    All functionality is exposed through class/static methods; instances carry no state.
    Child jobs are tracked on the parent worker in ``children``, a dict keyed by
    ``'<job_id>@<hostname>'``.
    """

    def __init__(self):
        pass

    @classmethod
    def subscribe_to_listener(cls):
        """
        Subscribes the private class methods '__local_job_status_changed' and '__local_job_frame_complete' to the
        'status_change' and 'frame_complete' pubsub messages respectively. This should be called once, typically
        during the initialization phase.
        """
        pub.subscribe(cls.__local_job_status_changed, 'status_change')
        pub.subscribe(cls.__local_job_frame_complete, 'frame_complete')

    @classmethod
    def __local_job_frame_complete(cls, job_id, frame_number, update_interval=5):
        """
        Responds to the 'frame_complete' pubsub message for local jobs.

        Args:
            job_id (str): The ID of the job that completed a frame.
            frame_number (int): The number of the frame that was just completed.
            update_interval (int): Existing previews are replaced only when frame_number is a multiple of this value.

        Note: Do not call directly. Instead, call via the 'frame_complete' pubsub message.
        """
        render_job = RenderQueue.job_with_id(job_id, none_ok=True)
        if not render_job:  # ignore jobs not in the queue
            return

        logger.debug(f"Job {job_id} has completed frame #{frame_number}")
        replace_existing_previews = (frame_number % update_interval) == 0
        cls.__job_update_shared(render_job, replace_existing_previews)

    @classmethod
    def __job_update_shared(cls, render_job, replace_existing_previews=False):
        """
        Shared handling for any local job update: refreshes previews and, for child jobs, notifies the parent.

        Args:
            render_job: The local job worker that was updated.
            replace_existing_previews (bool): Passed through to PreviewManager as 'replace_existing'.
        """
        # update previews
        PreviewManager.update_previews_for_job(job=render_job, replace_existing=replace_existing_previews)

        # notify parent to allow individual frames to be copied instead of waiting until the end
        if render_job.parent:
            parent_id, parent_hostname = render_job.parent.split('@')[0], render_job.parent.split('@')[-1]
            try:
                logger.debug(f'Job {render_job.id} updating parent {parent_id}@{parent_hostname}')
                RenderServerProxy(parent_hostname).send_subjob_update_notification(parent_id, render_job)
            except Exception as e:
                logger.error(f"Error notifying parent {parent_hostname} about update in subjob {render_job.id}: {e}")

    @classmethod
    def __local_job_status_changed(cls, job_id, old_status, new_status):
        """
        Responds to the 'status_change' pubsub message for local jobs.
        If it's a child job, it notifies the parent job about the status change.
        If it's a parent job that was cancelled or errored, it cancels its children.

        Args:
            job_id (str): The ID of the job that has changed status.
            old_status (str): The previous status of the job.
            new_status (str): The new (current) status of the job.

        Note: Do not call directly. Instead, call via the 'status_change' pubsub message.
        """
        render_job = RenderQueue.job_with_id(job_id, none_ok=True)
        if not render_job:  # ignore jobs created but not yet added to queue
            return

        logger.debug(f"Job {job_id} status change: {old_status} -> {new_status}")

        cls.__job_update_shared(render_job, replace_existing_previews=(render_job.status == RenderStatus.COMPLETED))

        # Handle children: cancel them if the parent was cancelled or failed
        if render_job.children and new_status in [RenderStatus.CANCELLED, RenderStatus.ERROR]:
            for child in render_job.children:
                # Best effort per child: one unreachable server must not prevent cancelling the others
                # or suppress the UI notification below.
                try:
                    child_id, child_hostname = child.split('@')[0], child.split('@')[-1]
                    RenderServerProxy(child_hostname).cancel_job(child_id, confirm=True)
                except Exception as e:
                    logger.error(f"Error cancelling subjob {child} of job {render_job.id}: {e}")

        # UI Notifications
        try:
            if new_status == RenderStatus.COMPLETED:
                logger.debug("Show render complete notification")
                notification.notify(
                    title='Render Job Complete',
                    message=f'{render_job.name} completed succesfully',
                    timeout=10  # Display time in seconds
                )
            elif new_status == RenderStatus.ERROR:
                logger.debug("Show render error notification")
                notification.notify(
                    title='Render Job Failed',
                    message=f'{render_job.name} failed rendering',
                    timeout=10  # Display time in seconds
                )
            elif new_status == RenderStatus.RUNNING:
                logger.debug("Show render started notification")
                notification.notify(
                    title='Render Job Started',
                    message=f'{render_job.name} started rendering',
                    timeout=10  # Display time in seconds
                )
        except Exception as e:
            logger.debug(f"Unable to show UI notification: {e}")

    # --------------------------------------------
    # Create Job
    # --------------------------------------------

    @classmethod
    def create_render_job(cls, new_job_attributes, loaded_project_local_path):
        """Creates render jobs. Pass in dict of job_data and the local path to the project. It creates and returns a new
        render job.

        Args:
            new_job_attributes (dict): Dict of desired attributes for new job (frame count, renderer, output path, etc)
            loaded_project_local_path (str): The local path to the loaded project.

        Returns:
            worker: Created job worker
        """

        # get new output path in output_dir; only the file name of a requested output path is kept
        output_path = new_job_attributes.get('output_path')
        if not output_path:
            loaded_project_filename = os.path.basename(loaded_project_local_path)
            output_filename = os.path.splitext(loaded_project_filename)[0]
        else:
            output_filename = os.path.basename(output_path)

        # Prepare output path: an 'output' directory beside the directory containing the project file
        output_dir = os.path.join(os.path.dirname(os.path.dirname(loaded_project_local_path)), 'output')
        output_path = os.path.join(output_dir, output_filename)
        os.makedirs(output_dir, exist_ok=True)
        logger.debug(f"New job output path: {output_path}")

        # create & configure jobs
        worker = EngineManager.create_worker(renderer=new_job_attributes['renderer'],
                                             input_path=loaded_project_local_path,
                                             output_path=output_path,
                                             engine_version=new_job_attributes.get('engine_version'),
                                             args=new_job_attributes.get('args', {}),
                                             parent=new_job_attributes.get('parent'),
                                             name=new_job_attributes.get('name'))
        worker.status = new_job_attributes.get("initial_status", worker.status)  # todo: is this necessary?
        worker.priority = int(new_job_attributes.get('priority', worker.priority))
        worker.start_frame = int(new_job_attributes.get("start_frame", worker.start_frame))
        worker.end_frame = int(new_job_attributes.get("end_frame", worker.end_frame))
        worker.watchdog_timeout = Config.worker_process_timeout
        worker.hostname = socket.gethostname()

        # determine if we can / should split the job (never split a job that is itself a subjob)
        if new_job_attributes.get("enable_split_jobs", False) and (worker.total_frames > 1) and not worker.parent:
            cls.split_into_subjobs_async(worker, new_job_attributes, loaded_project_local_path)
        else:
            worker.status = RenderStatus.NOT_STARTED

        RenderQueue.add_to_render_queue(worker, force_start=new_job_attributes.get('force_start', False))
        PreviewManager.update_previews_for_job(worker)

        return worker

    # --------------------------------------------
    # Handling Subjobs
    # --------------------------------------------

    @classmethod
    def handle_subjob_update_notification(cls, local_job, subjob_data):
        """Responds to a notification from a remote subjob and the host requests any subsequent updates from the subjob.

        Args:
            local_job (BaseRenderWorker): The local parent job worker.
            subjob_data (dict): Subjob data sent from the remote server. Must contain 'status', 'id' and 'hostname'.
        """

        subjob_status = string_to_status(subjob_data['status'])
        subjob_id = subjob_data['id']
        subjob_hostname = subjob_data['hostname']
        subjob_key = f'{subjob_id}@{subjob_hostname}'
        old_status = local_job.children.get(subjob_key, {}).get('status')
        # Replace the cached child data with the fresh data from the remote server
        local_job.children[subjob_key] = subjob_data

        if old_status != subjob_status.value:
            logger.debug(f"Subjob status changed: {subjob_key} | {old_status} -> {subjob_status.value}")

        # Pull whatever frames are available on every update, not only when the subjob finishes
        download_success = download_missing_frames_from_subjob(local_job, subjob_id, subjob_hostname)
        if subjob_data['status'] == 'completed' and download_success:
            local_job.children[subjob_key]['download_status'] = 'completed'

    @classmethod
    def wait_for_subjobs(cls, parent_job):
        """Check the status of subjobs and waits until they are all finished. Download rendered frames from subjobs
        when they are completed.

        Blocks the calling thread, polling once per second (and querying the remote servers every 10th poll),
        until no child is still pending download or until the parent's status is changed externally.

        Args:
            parent_job: Worker object that has child jobs

        Returns:
            None
        """
        logger.debug(f"Waiting for subjobs for job {parent_job}")
        parent_job.status = RenderStatus.WAITING_FOR_SUBJOBS
        statuses_to_download = [RenderStatus.CANCELLED, RenderStatus.ERROR, RenderStatus.COMPLETED]

        def subjobs_not_downloaded():
            # Children whose download is still pending: status missing, None or 'working'.
            # Returns a fresh dict, so parent_job.children may be mutated while iterating the result.
            return {k: v for k, v in parent_job.children.items()
                    if v.get('download_status') is None or v['download_status'] == 'working'}

        logger.info(f'Waiting on {len(subjobs_not_downloaded())} subjobs for {parent_job.id}')

        server_delay = 10
        sleep_counter = 0
        while parent_job.status == RenderStatus.WAITING_FOR_SUBJOBS:

            if sleep_counter % server_delay == 0:  # only ping servers every x seconds
                for child_key in subjobs_not_downloaded():

                    subjob_id = child_key.split('@')[0]
                    subjob_hostname = child_key.split('@')[-1]

                    # Fetch info from server and handle failing case
                    subjob_data = RenderServerProxy(subjob_hostname).get_job_info(subjob_id)
                    if not subjob_data:
                        logger.warning(f"No response from {subjob_hostname}")
                        # timeout / missing server situations
                        parent_job.children[child_key]['download_status'] = \
                            f'error: No response from {subjob_hostname}'
                        continue

                    # Update parent job cache but keep the download status
                    download_status = parent_job.children[child_key].get('download_status', None)
                    parent_job.children[child_key] = subjob_data
                    parent_job.children[child_key]['download_status'] = download_status

                    status = string_to_status(subjob_data.get('status', ''))
                    # A missing/None 'percent_complete' is reported as 0% instead of raising TypeError
                    percent_complete = float(subjob_data.get('percent_complete') or 0.0) * 100.0
                    logger.debug(f"Subjob {child_key} | {status} | {percent_complete}%")

                    # Check if job is finished, but has not had files copied over yet
                    if download_status is None and subjob_data.get('file_count') and status in statuses_to_download:
                        try:
                            download_missing_frames_from_subjob(parent_job, subjob_id, subjob_hostname)
                            # NOTE(review): handle_subjob_update_notification stores 'completed' here - confirm
                            # whether any reader depends on the exact value before unifying the two.
                            parent_job.children[child_key]['download_status'] = 'complete'
                        except Exception as e:
                            logger.error(f"Error downloading missing frames from subjob: {e}")
                            parent_job.children[child_key]['download_status'] = f'error: {e}'

                    # Any finished jobs not successfully downloaded at this point are skipped
                    if parent_job.children[child_key].get('download_status', None) is None and \
                            status in statuses_to_download:
                        logger.warning(f"Skipping waiting on downloading from subjob: {child_key}")
                        parent_job.children[child_key]['download_status'] = 'skipped'

            pending = subjobs_not_downloaded()
            if pending:
                logger.debug(f"Waiting on {len(pending)} subjobs on {', '.join(pending)}")
                time.sleep(1)
                sleep_counter += 1
            else:  # exit the loop
                parent_job.status = RenderStatus.RUNNING

    # --------------------------------------------
    # Creating Subjobs
    # --------------------------------------------

    @classmethod
    def split_into_subjobs_async(cls, parent_worker, new_job_attributes, project_path, system_os=None):
        """Marks the parent as CONFIGURING and runs split_into_subjobs on a background thread.

        The thread is stored on the class as 'background_worker'; a later call overwrites the reference.

        Args:
            parent_worker (Worker): The parent job what we're creating the subjobs for.
            new_job_attributes (dict): Dict of desired attributes for new job (frame count, renderer, output path, etc)
            project_path (str): The path to the project.
            system_os (str, optional): Required OS. Default is any.
        """
        # todo: I don't love this
        parent_worker.status = RenderStatus.CONFIGURING
        cls.background_worker = threading.Thread(target=cls.split_into_subjobs,
                                                 args=(parent_worker, new_job_attributes, project_path, system_os))
        cls.background_worker.start()

    @classmethod
    def split_into_subjobs(cls, parent_worker, new_job_attributes, project_path, system_os=None,
                           specific_servers=None):
        """
        Splits a job into subjobs and distributes them among available servers.

        This method checks the availability of servers, distributes the work among them, and creates subjobs on each
        server in the resulting plan. If no server other than the parent's own host is available, the job is not
        split and the parent is simply marked NOT_STARTED. If any subjob cannot be created, the parent job is
        cancelled.

        NOTE(review): a subjob is posted for every entry in the plan, including any entry for the local host -
        confirm this is intended.

        Args:
            parent_worker (Worker): The parent job what we're creating the subjobs for.
            new_job_attributes (dict): Dict of desired attributes for new job (frame count, renderer, output path, etc)
            project_path (str): The path to the project.
            system_os (str, optional): Required OS. Default is any.
            specific_servers (list, optional): List of specific servers to split work between. Defaults to all found.
        """

        # Check availability
        available_servers = specific_servers if specific_servers else cls.find_available_servers(parent_worker.renderer,
                                                                                                 system_os)
        # skip if theres no external servers found
        external_servers = [x for x in available_servers if x['hostname'] != parent_worker.hostname]
        if not external_servers:
            parent_worker.status = RenderStatus.NOT_STARTED
            return

        logger.debug(f"Splitting into subjobs - Available servers: {[x['hostname'] for x in available_servers]}")
        all_subjob_server_data = distribute_server_work(parent_worker.start_frame, parent_worker.end_frame,
                                                        available_servers)

        # Prep and submit these sub-jobs
        logger.info(f"Job {parent_worker.id} split plan: {all_subjob_server_data}")
        try:
            for subjob_data in all_subjob_server_data:
                subjob_hostname = subjob_data['hostname']
                post_results = cls.__create_subjob(new_job_attributes, project_path, subjob_data, subjob_hostname,
                                                   parent_worker)
                if not post_results.ok:
                    # Must be raised (not merely constructed) so the failure reaches the cancel path below
                    raise ValueError(f"Failed to create subjob on {subjob_hostname}")

                # save child info
                submission_results = post_results.json()[0]
                child_key = f"{submission_results['id']}@{subjob_hostname}"
                parent_worker.children[child_key] = submission_results

            # start subjobs
            logger.debug(f"Created {len(all_subjob_server_data)} subjobs successfully")
            parent_worker.name = f"{parent_worker.name} (Parent)"
            parent_worker.status = RenderStatus.NOT_STARTED  # todo: this won't work with scheduled starts
        except Exception as e:
            # cancel all the subjobs
            logger.error(f"Failed to split job into subjobs: {e}")
            logger.debug(f"Cancelling {len(all_subjob_server_data) - 1} attempted subjobs")
            RenderServerProxy(parent_worker.hostname).cancel_job(parent_worker.id, confirm=True)

    @staticmethod
    def __create_subjob(new_job_attributes, project_path, server_data, server_hostname, parent_worker):
        """Convenience method to create subjobs for a parent worker

        Args:
            new_job_attributes (dict): Attributes of the parent job; shallow-copied, not modified.
            project_path (str): The path to the project file posted to the server.
            server_data (dict): Plan entry for one server; its 'frame_range' gives the first and last frame.
            server_hostname (str): Hostname of the server that receives the subjob.
            parent_worker (Worker): The parent job the subjob belongs to.

        Returns:
            The result of RenderServerProxy.post_job_to_server for the subjob.
        """
        first_frame = server_data['frame_range'][0]
        last_frame = server_data['frame_range'][-1]

        subjob = new_job_attributes.copy()
        subjob['name'] = f"{parent_worker.name}[{first_frame}-{last_frame}]"
        subjob['parent'] = f"{parent_worker.id}@{parent_worker.hostname}"
        subjob['start_frame'] = first_frame
        subjob['end_frame'] = last_frame
        subjob['engine_version'] = parent_worker.renderer_version
        logger.debug(f"Posting subjob with frames {subjob['start_frame']}-"
                     f"{subjob['end_frame']} to {server_hostname}")
        post_results = RenderServerProxy(server_hostname).post_job_to_server(
            file_path=project_path, job_list=[subjob])
        return post_results

    # --------------------------------------------
    # Server Handling
    # --------------------------------------------

    @staticmethod
    def find_available_servers(engine_name, system_os=None):
        """
        Scan the Zeroconf network for currently available render servers supporting a specific engine.

        :param engine_name: str, The engine type to search for
        :param system_os: str, Restrict results to servers running a specific OS
        :return: A list of dictionaries with each dict containing hostname and cpu_count of available servers
        """
        available_servers = []
        for hostname in ZeroconfServer.found_hostnames():
            host_properties = ZeroconfServer.get_hostname_properties(hostname)
            # No OS restriction, or the host advertises the required OS
            if not system_os or system_os == host_properties.get('system_os'):
                response = RenderServerProxy(hostname).is_engine_available(engine_name)
                if response and response.get('available', False):
                    available_servers.append(response)

        return available_servers


if __name__ == '__main__':
    # Manual smoke test: discover render servers on the local network and list those offering 'blender'.
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

    ZeroconfServer.configure("_zordon._tcp.local.", 'testing', 8080)
    ZeroconfServer.start(listen_only=True)
    print("Starting Zeroconf...")
    time.sleep(2)  # give discovery a moment to find hosts
    available_servers = DistributedJobManager.find_available_servers('blender')
    print(f"AVAILABLE SERVERS ({len(available_servers)}): {available_servers}")
    # results = distribute_server_work(1, 100, available_servers)
    # print(f"RESULTS: {results}")
    ZeroconfServer.stop()