import logging import os import socket import time import zipfile from pubsub import pub from lib.render_queue import RenderQueue from lib.server.server_proxy import RenderServerProxy from lib.server.zeroconf_server import ZeroconfServer from lib.utilities.misc_helper import get_file_size_human from lib.workers.base_worker import RenderStatus, string_to_status logger = logging.getLogger() class DistributedJobManager: def __init__(self): pass @classmethod def start(cls): """ Subscribes the private class method '__local_job_status_changed' to the 'status_change' pubsub message. This should be called once, typically during the initialization phase. """ pub.subscribe(cls.__local_job_status_changed, 'status_change') @classmethod def __local_job_status_changed(cls, job_id, old_status, new_status): """ Responds to the 'status_change' pubsub message for local jobs. If it's a child job, it notifies the parent job about the status change. Parameters: job_id (str): The ID of the job that has changed status. old_status (str): The previous status of the job. new_status (str): The new (current) status of the job. Note: Do not call directly. Instead, call via the 'status_change' pubsub message. """ render_job = RenderQueue.job_with_id(job_id, none_ok=True) if not render_job: # ignore jobs created but not yet added to queue return logger.debug(f"Job {job_id} status change: {old_status} -> {new_status}") if render_job.parent: # If local job is a subjob from a remote server parent_id, hostname = render_job.parent.split('@')[0], render_job.parent.split('@')[-1] RenderServerProxy(hostname).notify_parent_of_status_change(parent_id=parent_id, subjob=render_job) elif render_job.children and new_status == RenderStatus.CANCELLED: # todo: handle cancelling all the children pass @classmethod def handle_subjob_status_change(cls, local_job, subjob_data): """ Responds to a status change from a remote subjob and triggers the creation or modification of subjobs as needed. Parameters: local_job (BaseRenderWorker): The local parent job worker. subjob_data (dict): subjob data sent from remote server. Returns: None """ subjob_status = string_to_status(subjob_data['status']) subjob_id = subjob_data['id'] subjob_hostname = next((hostname.split('@')[1] for hostname in local_job.children if hostname.split('@')[0] == subjob_id), None) local_job.children[f'{subjob_id}@{subjob_hostname}'] = subjob_data logname = f"{local_job.id}:{subjob_id}@{subjob_hostname}" logger.debug(f"Subjob status changed: {logname} -> {subjob_status.value}") # Download complete or partial render jobs if subjob_status in [RenderStatus.COMPLETED, RenderStatus.CANCELLED, RenderStatus.ERROR] and \ subjob_data['file_count']: download_result = cls.download_from_subjob(local_job, subjob_id, subjob_hostname) if not download_result: # todo: handle error logger.error(f"Unable to download subjob files from {logname} with status {subjob_status.value}") if subjob_status == RenderStatus.CANCELLED or subjob_status == RenderStatus.ERROR: # todo: determine missing frames and schedule new job pass @staticmethod def download_from_subjob(local_job, subjob_id, subjob_hostname): """ Downloads and extracts files from a completed subjob on a remote server. Parameters: local_job (BaseRenderWorker): The local parent job worker. subjob_id (str or int): The ID of the subjob. subjob_hostname (str): The hostname of the remote server where the subjob is located. Returns: bool: True if the files have been downloaded and extracted successfully, False otherwise. """ child_key = f'{subjob_id}@{subjob_hostname}' logname = f"{local_job.id}:{child_key}" zip_file_path = local_job.output_path + f'_{subjob_hostname}_{subjob_id}.zip' # download zip file from server try: local_job.children[child_key]['download_status'] = 'working' logger.info(f"Downloading completed subjob files from {subjob_hostname} to localhost") RenderServerProxy(subjob_hostname).get_job_files(subjob_id, zip_file_path) logger.info(f"File transfer complete for {logname} - Transferred {get_file_size_human(zip_file_path)}") except Exception as e: logger.exception(f"Exception downloading files from remote server: {e}") local_job.children[child_key]['download_status'] = 'failed' return False # extract zip try: logger.debug(f"Extracting zip file: {zip_file_path}") extract_path = os.path.dirname(zip_file_path) with zipfile.ZipFile(zip_file_path, 'r') as zip_ref: zip_ref.extractall(extract_path) logger.info(f"Successfully extracted zip to: {extract_path}") os.remove(zip_file_path) local_job.children[child_key]['download_status'] = 'complete' except Exception as e: logger.exception(f"Exception extracting zip file: {e}") local_job.children[child_key]['download_status'] = 'failed' return local_job.children[child_key].get('download_status', None) == 'complete' @classmethod def wait_for_subjobs(cls, local_job): logger.debug(f"Waiting for subjobs for job {local_job}") local_job.status = RenderStatus.WAITING_FOR_SUBJOBS statuses_to_download = [RenderStatus.CANCELLED, RenderStatus.ERROR, RenderStatus.COMPLETED] def subjobs_not_downloaded(): return {k: v for k, v in local_job.children.items() if 'download_status' not in v or v['download_status'] == 'working' or v['download_status'] is None} logger.debug(f'subjobs_not_downloaded: {subjobs_not_downloaded()}') while len(subjobs_not_downloaded()): for child_key, subjob_cached_data in subjobs_not_downloaded().items(): subjob_id = child_key.split('@')[0] subjob_hostname = child_key.split('@')[-1] # Fetch info from server and handle failing case subjob_data = RenderServerProxy(subjob_hostname).get_job_info(subjob_id) if not subjob_data: logger.warning(f"No response from: {subjob_hostname}") # todo: handle timeout / missing server situations continue # Update parent job cache but keep the download status download_status = local_job.children[child_key].get('download_status', None) local_job.children[child_key] = subjob_data local_job.children[child_key]['download_status'] = download_status status = string_to_status(subjob_data.get('status', '')) status_msg = f"Subjob {child_key} | {status} | " \ f"{float(subjob_data.get('percent_complete')) * 100.0}%" logger.debug(status_msg) # Still working in another thread - keep waiting if download_status == 'working': continue # Check if job is finished, but has not had files copied yet over yet if download_status is None and subjob_data['file_count'] and status in statuses_to_download: download_result = cls.download_from_subjob(local_job, subjob_id, subjob_hostname) if not download_result: logger.error("Failed to download from subjob") # todo: error handling here # Any finished jobs not successfully downloaded at this point are skipped if local_job.children[child_key].get('download_status', None) is None and \ status in statuses_to_download: logger.warning(f"Skipping waiting on downloading from subjob: {child_key}") local_job.children[child_key]['download_status'] = 'skipped' if subjobs_not_downloaded(): logger.debug(f"Waiting on {len(subjobs_not_downloaded())} subjobs on " f"{', '.join(list(subjobs_not_downloaded().keys()))}") time.sleep(5) @classmethod def split_into_subjobs(cls, worker, job_data, project_path): # Check availability available_servers = cls.find_available_servers(worker.renderer) subjob_servers = cls.distribute_server_work(worker.start_frame, worker.end_frame, available_servers) local_hostname = socket.gethostname() # Prep and submit these sub-jobs logger.info(f"Job {worker.id} split plan: {subjob_servers}") try: for server_data in subjob_servers: server_hostname = server_data['hostname'] if server_hostname != local_hostname: post_results = cls.__create_subjob(job_data, local_hostname, project_path, server_data, server_hostname, worker) if post_results.ok: server_data['submission_results'] = post_results.json()[0] else: logger.error(f"Failed to create subjob on {server_hostname}") break else: # truncate parent render_job worker.start_frame = max(server_data['frame_range'][0], worker.start_frame) worker.end_frame = min(server_data['frame_range'][-1], worker.end_frame) logger.info(f"Local job now rendering from {worker.start_frame} to {worker.end_frame}") server_data['submission_results'] = worker.json() # check that job posts were all successful. if not all(d.get('submission_results') is not None for d in subjob_servers): raise ValueError("Failed to create all subjobs") # look into recalculating job #s and use exising jobs # start subjobs logger.debug(f"Starting {len(subjob_servers) - 1} attempted subjobs") for server_data in subjob_servers: if server_data['hostname'] != local_hostname: child_key = f"{server_data['submission_results']['id']}@{server_data['hostname']}" worker.children[child_key] = server_data['submission_results'] worker.name = f"{worker.name}[{worker.start_frame}-{worker.end_frame}]" except Exception as e: # cancel all the subjobs logger.error(f"Failed to split job into subjobs: {e}") logger.debug(f"Cancelling {len(subjob_servers) - 1} attempted subjobs") # [RenderServerProxy(hostname).cancel_job(results['id'], confirm=True) for hostname, results in # submission_results.items()] # todo: fix this @staticmethod def __create_subjob(job_data, local_hostname, project_path, server_data, server_hostname, worker): subjob = job_data.copy() subjob['name'] = f"{worker.name}[{server_data['frame_range'][0]}-{server_data['frame_range'][-1]}]" subjob['parent'] = f"{worker.id}@{local_hostname}" subjob['start_frame'] = server_data['frame_range'][0] subjob['end_frame'] = server_data['frame_range'][-1] logger.debug(f"Posting subjob with frames {subjob['start_frame']}-" f"{subjob['end_frame']} to {server_hostname}") post_results = RenderServerProxy(server_hostname).post_job_to_server( file_path=project_path, job_list=[subjob]) return post_results @staticmethod def distribute_server_work(start_frame, end_frame, available_servers, method='cpu_count'): """ Splits the frame range among available servers proportionally based on their performance (CPU count). :param start_frame: int, The start frame number of the animation to be rendered. :param end_frame: int, The end frame number of the animation to be rendered. :param available_servers: list, A list of available server dictionaries. Each server dictionary should include 'hostname' and 'cpu_count' keys (see find_available_servers) :param method: str, Optional. Specifies the distribution method. Possible values are 'cpu_count' and 'equally' :return: A list of server dictionaries where each dictionary includes the frame range and total number of frames to be rendered by the server. """ # Calculate respective frames for each server def divide_frames_by_cpu_count(frame_start, frame_end, servers): total_frames = frame_end - frame_start + 1 total_performance = sum(server['cpu_count'] for server in servers) frame_ranges = {} current_frame = frame_start allocated_frames = 0 for i, server in enumerate(servers): if i == len(servers) - 1: # if it's the last server # Give all remaining frames to the last server num_frames = total_frames - allocated_frames else: num_frames = round((server['cpu_count'] / total_performance) * total_frames) allocated_frames += num_frames frame_end_for_server = current_frame + num_frames - 1 if current_frame <= frame_end_for_server: frame_ranges[server['hostname']] = (current_frame, frame_end_for_server) current_frame = frame_end_for_server + 1 return frame_ranges def divide_frames_equally(frame_start, frame_end, servers): frame_range = frame_end - frame_start + 1 frames_per_server = frame_range // len(servers) leftover_frames = frame_range % len(servers) frame_ranges = {} current_start = frame_start for i, server in enumerate(servers): current_end = current_start + frames_per_server - 1 if leftover_frames > 0: current_end += 1 leftover_frames -= 1 if current_start <= current_end: frame_ranges[server['hostname']] = (current_start, current_end) current_start = current_end + 1 return frame_ranges if method == 'equally': breakdown = divide_frames_equally(start_frame, end_frame, available_servers) # elif method == 'benchmark_score': # todo: implement benchmark score # pass else: breakdown = divide_frames_by_cpu_count(start_frame, end_frame, available_servers) server_breakdown = [server for server in available_servers if breakdown.get(server['hostname']) is not None] for server in server_breakdown: server['frame_range'] = breakdown[server['hostname']] server['total_frames'] = breakdown[server['hostname']][-1] - breakdown[server['hostname']][0] + 1 return server_breakdown @staticmethod def find_available_servers(renderer): """ Scan the Zeroconf network for currently available render servers supporting a specific renderer. :param renderer: str, The renderer type to search for :return: A list of dictionaries with each dict containing hostname and cpu_count of available servers """ available_servers = [] for hostname in ZeroconfServer.found_clients(): response = RenderServerProxy(hostname).get_status() if response and response.get('renderers', {}).get(renderer, {}).get('is_available', False): available_servers.append({'hostname': hostname, 'cpu_count': int(response['cpu_count'])}) return available_servers