import logging import os import socket import threading import time import zipfile from concurrent.futures import ThreadPoolExecutor import requests from plyer import notification from pubsub import pub from src.api.preview_manager import PreviewManager from src.api.server_proxy import RenderServerProxy from src.engines.engine_manager import EngineManager from src.render_queue import RenderQueue from src.utilities.config import Config from src.utilities.misc_helper import get_file_size_human from src.utilities.status_utils import RenderStatus, string_to_status from src.utilities.zeroconf_server import ZeroconfServer logger = logging.getLogger() class DistributedJobManager: def __init__(self): pass @classmethod def subscribe_to_listener(cls): """ Subscribes the private class method '__local_job_status_changed' to the 'status_change' pubsub message. This should be called once, typically during the initialization phase. """ pub.subscribe(cls.__local_job_status_changed, 'status_change') pub.subscribe(cls.__local_job_frame_complete, 'frame_complete') @classmethod def __local_job_frame_complete(cls, job_id, frame_number, update_interval=5): """ Responds to the 'frame_complete' pubsub message for local jobs. Parameters: job_id (str): The ID of the job that has changed status. old_status (str): The previous status of the job. new_status (str): The new (current) status of the job. Note: Do not call directly. Instead, call via the 'frame_complete' pubsub message. """ render_job = RenderQueue.job_with_id(job_id, none_ok=True) if not render_job: # ignore jobs not in the queue return logger.debug(f"Job {job_id} has completed frame #{frame_number}") replace_existing_previews = (frame_number % update_interval) == 0 cls.__job_update_shared(render_job, replace_existing_previews) @classmethod def __job_update_shared(cls, render_job, replace_existing_previews=False): # update previews PreviewManager.update_previews_for_job(job=render_job, replace_existing=replace_existing_previews) # notify parent to allow individual frames to be copied instead of waiting until the end if render_job.parent: parent_id, parent_hostname = render_job.parent.split('@')[0], render_job.parent.split('@')[-1] try: logger.debug(f'Job {render_job.id} updating parent {parent_id}@{parent_hostname}') RenderServerProxy(parent_hostname).send_subjob_update_notification(parent_id, render_job) except Exception as e: logger.error(f"Error notifying parent {parent_hostname} about update in subjob {render_job.id}: {e}") @classmethod def __local_job_status_changed(cls, job_id, old_status, new_status): """ Responds to the 'status_change' pubsub message for local jobs. If it's a child job, it notifies the parent job about the status change. Parameters: job_id (str): The ID of the job that has changed status. old_status (str): The previous status of the job. new_status (str): The new (current) status of the job. Note: Do not call directly. Instead, call via the 'status_change' pubsub message. """ render_job = RenderQueue.job_with_id(job_id, none_ok=True) if not render_job: # ignore jobs created but not yet added to queue return logger.debug(f"Job {job_id} status change: {old_status} -> {new_status}") cls.__job_update_shared(render_job, replace_existing_previews=(render_job.status == RenderStatus.COMPLETED)) # Handle children if render_job.children: if new_status in [RenderStatus.CANCELLED, RenderStatus.ERROR]: # Cancel children if necessary for child in render_job.children: child_id, child_hostname = child.split('@') RenderServerProxy(child_hostname).cancel_job(child_id, confirm=True) # UI Notifications try: if new_status == RenderStatus.COMPLETED: logger.debug("Show render complete notification") notification.notify( title='Render Job Complete', message=f'{render_job.name} completed succesfully', timeout=10 # Display time in seconds ) elif new_status == RenderStatus.ERROR: logger.debug("Show render error notification") notification.notify( title='Render Job Failed', message=f'{render_job.name} failed rendering', timeout=10 # Display time in seconds ) elif new_status == RenderStatus.RUNNING: logger.debug("Show render started notification") notification.notify( title='Render Job Started', message=f'{render_job.name} started rendering', timeout=10 # Display time in seconds ) except Exception as e: logger.debug(f"Unable to show UI notification: {e}") # -------------------------------------------- # Create Job # -------------------------------------------- @classmethod def create_render_job(cls, job_data, loaded_project_local_path): """ Creates render jobs. This method job data and a local path to a loaded project. It creates and returns new a render job. Args: job_data (dict): Job data. loaded_project_local_path (str): The local path to the loaded project. Returns: worker: Created job worker """ # get new output path in output_dir output_path = job_data.get('output_path') if not output_path: loaded_project_filename = os.path.basename(loaded_project_local_path) output_filename = os.path.splitext(loaded_project_filename)[0] else: output_filename = os.path.basename(output_path) # Prepare output path output_dir = os.path.join(os.path.dirname(os.path.dirname(loaded_project_local_path)), 'output') output_path = os.path.join(output_dir, output_filename) os.makedirs(output_dir, exist_ok=True) logger.debug(f"New job output path: {output_path}") # create & configure jobs worker = EngineManager.create_worker(renderer=job_data['renderer'], input_path=loaded_project_local_path, output_path=output_path, engine_version=job_data.get('engine_version'), args=job_data.get('args', {}), parent=job_data.get('parent'), name=job_data.get('name')) worker.status = job_data.get("initial_status", worker.status) # todo: is this necessary? worker.priority = int(job_data.get('priority', worker.priority)) worker.start_frame = int(job_data.get("start_frame", worker.start_frame)) worker.end_frame = int(job_data.get("end_frame", worker.end_frame)) worker.watchdog_timeout = Config.worker_process_timeout worker.hostname = socket.gethostname() # determine if we can / should split the job if job_data.get("enable_split_jobs", False) and (worker.total_frames > 1) and not worker.parent: cls.split_into_subjobs_async(worker, job_data, loaded_project_local_path) else: worker.status = RenderStatus.NOT_STARTED RenderQueue.add_to_render_queue(worker, force_start=job_data.get('force_start', False)) PreviewManager.update_previews_for_job(worker) return worker # -------------------------------------------- # Handling Subjobs # -------------------------------------------- @classmethod def handle_subjob_update_notification(cls, local_job, subjob_data): """ Responds to a notification from a remote subjob and the host requests any subsequent updates from the subjob. Args: local_job (BaseRenderWorker): The local parent job worker. subjob_data (dict): Subjob data sent from the remote server. """ subjob_status = string_to_status(subjob_data['status']) subjob_id = subjob_data['id'] subjob_hostname = subjob_data['hostname'] subjob_key = f'{subjob_id}@{subjob_hostname}' old_status = local_job.children.get(subjob_key, {}).get('status') local_job.children[subjob_key] = subjob_data logname = f"" if old_status != subjob_status.value: logger.debug(f"Subjob status changed: {logname} -> {subjob_status.value}") download_success = cls.download_missing_frames_from_subjob(local_job, subjob_id, subjob_hostname) if subjob_data['status'] == 'completed' and download_success: local_job.children[subjob_key]['download_status'] = 'completed' @staticmethod def download_missing_frames_from_subjob(local_job, subjob_id, subjob_hostname): success = True try: local_files = [os.path.basename(x) for x in local_job.file_list()] subjob_proxy = RenderServerProxy(subjob_hostname) subjob_files = subjob_proxy.get_job_files_list(job_id=subjob_id) or [] for subjob_filename in subjob_files: if subjob_filename not in local_files: try: logger.debug(f"Downloading new file '{subjob_filename}' from {subjob_hostname}") local_save_path = os.path.join(os.path.dirname(local_job.output_path), subjob_filename) subjob_proxy.download_job_file(job_id=subjob_id, job_filename=subjob_filename, save_path=local_save_path) logger.debug(f'Downloaded successfully - {local_save_path}') except Exception as e: logger.error(f"Error downloading file '{subjob_filename}' from {subjob_hostname}: {e}") success = False except Exception as e: logger.exception(f'Uncaught exception while trying to download from subjob: {e}') success = False return success @staticmethod def download_all_from_subjob(local_job, subjob_id, subjob_hostname): """ Downloads and extracts files from a completed subjob on a remote server. Parameters: local_job (BaseRenderWorker): The local parent job worker. subjob_id (str or int): The ID of the subjob. subjob_hostname (str): The hostname of the remote server where the subjob is located. Returns: bool: True if the files have been downloaded and extracted successfully, False otherwise. """ child_key = f'{subjob_id}@{subjob_hostname}' logname = f"{local_job.id}:{child_key}" zip_file_path = local_job.output_path + f'_{subjob_hostname}_{subjob_id}.zip' # download zip file from server try: local_job.children[child_key]['download_status'] = 'working' logger.info(f"Downloading completed subjob files from {subjob_hostname} to localhost") RenderServerProxy(subjob_hostname).download_all_job_files(subjob_id, zip_file_path) logger.info(f"File transfer complete for {logname} - Transferred {get_file_size_human(zip_file_path)}") except Exception as e: logger.error(f"Error downloading files from remote server: {e}") local_job.children[child_key]['download_status'] = 'failed' return False # extract zip try: logger.debug(f"Extracting zip file: {zip_file_path}") extract_path = os.path.dirname(zip_file_path) with zipfile.ZipFile(zip_file_path, 'r') as zip_ref: zip_ref.extractall(extract_path) logger.info(f"Successfully extracted zip to: {extract_path}") os.remove(zip_file_path) local_job.children[child_key]['download_status'] = 'complete' except Exception as e: logger.exception(f"Exception extracting zip file: {e}") local_job.children[child_key]['download_status'] = 'failed' return local_job.children[child_key].get('download_status', None) == 'complete' @classmethod def wait_for_subjobs(cls, parent_job): logger.debug(f"Waiting for subjobs for job {parent_job}") parent_job.status = RenderStatus.WAITING_FOR_SUBJOBS statuses_to_download = [RenderStatus.CANCELLED, RenderStatus.ERROR, RenderStatus.COMPLETED] def subjobs_not_downloaded(): return {k: v for k, v in parent_job.children.items() if 'download_status' not in v or v['download_status'] == 'working' or v['download_status'] is None} logger.info(f'Waiting on {len(subjobs_not_downloaded())} subjobs for {parent_job.id}') server_delay = 10 sleep_counter = 0 while parent_job.status == RenderStatus.WAITING_FOR_SUBJOBS: if sleep_counter % server_delay == 0: # only ping servers every x seconds for child_key, subjob_cached_data in subjobs_not_downloaded().items(): subjob_id = child_key.split('@')[0] subjob_hostname = child_key.split('@')[-1] # Fetch info from server and handle failing case subjob_data = RenderServerProxy(subjob_hostname).get_job_info(subjob_id) if not subjob_data: logger.warning(f"No response from {subjob_hostname}") # timeout / missing server situations parent_job.children[child_key]['download_status'] = f'error: No response from {subjob_hostname}' continue # Update parent job cache but keep the download status download_status = parent_job.children[child_key].get('download_status', None) parent_job.children[child_key] = subjob_data parent_job.children[child_key]['download_status'] = download_status status = string_to_status(subjob_data.get('status', '')) status_msg = f"Subjob {child_key} | {status} | " \ f"{float(subjob_data.get('percent_complete')) * 100.0}%" logger.debug(status_msg) # Check if job is finished, but has not had files copied yet over yet if download_status is None and subjob_data['file_count'] and status in statuses_to_download: try: cls.download_missing_frames_from_subjob(parent_job, subjob_id, subjob_hostname) parent_job.children[child_key]['download_status'] = 'complete' except Exception as e: logger.error(f"Error downloading missing frames from subjob: {e}") parent_job.children[child_key]['download_status'] = 'error: {}' # Any finished jobs not successfully downloaded at this point are skipped if parent_job.children[child_key].get('download_status', None) is None and \ status in statuses_to_download: logger.warning(f"Skipping waiting on downloading from subjob: {child_key}") parent_job.children[child_key]['download_status'] = 'skipped' if subjobs_not_downloaded(): logger.debug(f"Waiting on {len(subjobs_not_downloaded())} subjobs on " f"{', '.join(list(subjobs_not_downloaded().keys()))}") time.sleep(1) sleep_counter += 1 else: # exit the loop parent_job.status = RenderStatus.RUNNING # -------------------------------------------- # Creating Subjobs # -------------------------------------------- @classmethod def split_into_subjobs_async(cls, parent_worker, job_data, project_path, system_os=None): # todo: I don't love this parent_worker.status = RenderStatus.CONFIGURING cls.background_worker = threading.Thread(target=cls.split_into_subjobs, args=(parent_worker, job_data, project_path, system_os)) cls.background_worker.start() @classmethod def split_into_subjobs(cls, parent_worker, job_data, project_path, system_os=None, specific_servers=None): """ Splits a job into subjobs and distributes them among available servers. This method checks the availability of servers, distributes the work among them, and creates subjobs on each server. If a server is the local host, it adjusts the frame range of the parent job instead of creating a subjob. Args: parent_worker (Worker): The worker that is handling the job. job_data (dict): The data for the job to be split. project_path (str): The path to the project associated with the job. system_os (str, optional): The operating system of the servers. Default is any OS. specific_servers (list, optional): List of specific servers to split work between. Defaults to all found. """ # Check availability available_servers = specific_servers if specific_servers else cls.find_available_servers(parent_worker.renderer, system_os) # skip if theres no external servers found external_servers = [x for x in available_servers if x['hostname'] != parent_worker.hostname] if not external_servers: parent_worker.status = RenderStatus.NOT_STARTED return logger.debug(f"Splitting into subjobs - Available servers: {[x['hostname'] for x in available_servers]}") all_subjob_server_data = cls.distribute_server_work(parent_worker.start_frame, parent_worker.end_frame, available_servers) # Prep and submit these sub-jobs logger.info(f"Job {parent_worker.id} split plan: {all_subjob_server_data}") try: for subjob_data in all_subjob_server_data: subjob_hostname = subjob_data['hostname'] post_results = cls.__create_subjob(job_data, project_path, subjob_data, subjob_hostname, parent_worker) if not post_results.ok: ValueError(f"Failed to create subjob on {subjob_hostname}") # save child info submission_results = post_results.json()[0] child_key = f"{submission_results['id']}@{subjob_hostname}" parent_worker.children[child_key] = submission_results # start subjobs logger.debug(f"Created {len(all_subjob_server_data)} subjobs successfully") parent_worker.name = f"{parent_worker.name} (Parent)" parent_worker.status = RenderStatus.NOT_STARTED # todo: this won't work with scheduled starts except Exception as e: # cancel all the subjobs logger.error(f"Failed to split job into subjobs: {e}") logger.debug(f"Cancelling {len(all_subjob_server_data) - 1} attempted subjobs") RenderServerProxy(parent_worker.hostname).cancel_job(parent_worker.id, confirm=True) @staticmethod def __create_subjob(job_data, project_path, server_data, server_hostname, parent_worker): subjob = job_data.copy() subjob['name'] = f"{parent_worker.name}[{server_data['frame_range'][0]}-{server_data['frame_range'][-1]}]" subjob['parent'] = f"{parent_worker.id}@{parent_worker.hostname}" subjob['start_frame'] = server_data['frame_range'][0] subjob['end_frame'] = server_data['frame_range'][-1] subjob['engine_version'] = parent_worker.renderer_version logger.debug(f"Posting subjob with frames {subjob['start_frame']}-" f"{subjob['end_frame']} to {server_hostname}") post_results = RenderServerProxy(server_hostname).post_job_to_server( file_path=project_path, job_list=[subjob]) return post_results # -------------------------------------------- # Server Handling # -------------------------------------------- @staticmethod def distribute_server_work(start_frame, end_frame, available_servers, method='cpu_benchmark'): """ Splits the frame range among available servers proportionally based on their performance (CPU count). Args: start_frame (int): The start frame number of the animation to be rendered. end_frame (int): The end frame number of the animation to be rendered. available_servers (list): A list of available server dictionaries. Each server dictionary should include 'hostname' and 'cpu_count' keys (see find_available_servers). method (str, optional): Specifies the distribution method. Possible values are 'cpu_benchmark', 'cpu_count' and 'evenly'. Defaults to 'cpu_benchmark'. Returns: list: A list of server dictionaries where each dictionary includes the frame range and total number of frames to be rendered by the server. """ # Calculate respective frames for each server def divide_frames_by_cpu_count(frame_start, frame_end, servers): total_frames = frame_end - frame_start + 1 total_cpus = sum(server['cpu_count'] for server in servers) frame_ranges = {} current_frame = frame_start allocated_frames = 0 for i, server in enumerate(servers): if i == len(servers) - 1: # if it's the last server # Give all remaining frames to the last server num_frames = total_frames - allocated_frames else: num_frames = round((server['cpu_count'] / total_cpus) * total_frames) allocated_frames += num_frames frame_end_for_server = current_frame + num_frames - 1 if current_frame <= frame_end_for_server: frame_ranges[server['hostname']] = (current_frame, frame_end_for_server) current_frame = frame_end_for_server + 1 return frame_ranges def divide_frames_by_benchmark(frame_start, frame_end, servers): def fetch_benchmark(server): try: benchmark = requests.get(f'http://{server["hostname"]}:{ZeroconfServer.server_port}' f'/api/cpu_benchmark').text server['cpu_benchmark'] = benchmark logger.debug(f'Benchmark for {server["hostname"]}: {benchmark}') except requests.exceptions.RequestException as e: logger.error(f'Error fetching benchmark for {server["hostname"]}: {e}') # Number of threads to use (can adjust based on your needs or number of servers) threads = len(servers) with ThreadPoolExecutor(max_workers=threads) as executor: executor.map(fetch_benchmark, servers) total_frames = frame_end - frame_start + 1 total_performance = sum(int(server['cpu_benchmark']) for server in servers) frame_ranges = {} current_frame = frame_start allocated_frames = 0 for i, server in enumerate(servers): if i == len(servers) - 1: # if it's the last server # Give all remaining frames to the last server num_frames = total_frames - allocated_frames else: num_frames = round((int(server['cpu_benchmark']) / total_performance) * total_frames) allocated_frames += num_frames frame_end_for_server = current_frame + num_frames - 1 if current_frame <= frame_end_for_server: frame_ranges[server['hostname']] = (current_frame, frame_end_for_server) current_frame = frame_end_for_server + 1 return frame_ranges def divide_frames_equally(frame_start, frame_end, servers): frame_range = frame_end - frame_start + 1 frames_per_server = frame_range // len(servers) leftover_frames = frame_range % len(servers) frame_ranges = {} current_start = frame_start for i, server in enumerate(servers): current_end = current_start + frames_per_server - 1 if leftover_frames > 0: current_end += 1 leftover_frames -= 1 if current_start <= current_end: frame_ranges[server['hostname']] = (current_start, current_end) current_start = current_end + 1 return frame_ranges if len(available_servers) == 1: breakdown = {available_servers[0]['hostname']: (start_frame, end_frame)} else: logger.debug(f'Splitting between {len(available_servers)} servers by {method} method') if method == 'evenly': breakdown = divide_frames_equally(start_frame, end_frame, available_servers) elif method == 'cpu_benchmark': breakdown = divide_frames_by_benchmark(start_frame, end_frame, available_servers) elif method == 'cpu_count': breakdown = divide_frames_by_cpu_count(start_frame, end_frame, available_servers) else: raise ValueError(f"Invalid distribution method: {method}") server_breakdown = [server for server in available_servers if breakdown.get(server['hostname']) is not None] for server in server_breakdown: server['frame_range'] = breakdown[server['hostname']] server['total_frames'] = breakdown[server['hostname']][-1] - breakdown[server['hostname']][0] + 1 return server_breakdown @staticmethod def find_available_servers(engine_name, system_os=None): """ Scan the Zeroconf network for currently available render servers supporting a specific engine. :param engine_name: str, The engine type to search for :param system_os: str, Restrict results to servers running a specific OS :return: A list of dictionaries with each dict containing hostname and cpu_count of available servers """ available_servers = [] for hostname in ZeroconfServer.found_hostnames(): host_properties = ZeroconfServer.get_hostname_properties(hostname) if not system_os or (system_os and system_os == host_properties.get('system_os')): response = RenderServerProxy(hostname).is_engine_available(engine_name) if response and response.get('available', False): available_servers.append(response) return available_servers if __name__ == '__main__': logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') ZeroconfServer.configure("_zordon._tcp.local.", 'testing', 8080) ZeroconfServer.start(listen_only=True) print("Starting Zeroconf...") time.sleep(2) available_servers = DistributedJobManager.find_available_servers('blender') print(f"AVAILABLE SERVERS ({len(available_servers)}): {available_servers}") # results = DistributedJobManager.distribute_server_work(1, 100, available_servers) # print(f"RESULTS: {results}") ZeroconfServer.stop()