import logging
import os
import zipfile
from concurrent.futures import ThreadPoolExecutor

import requests

from src.api.server_proxy import RenderServerProxy
from src.utilities.misc_helper import get_file_size_human
from src.utilities.zeroconf_server import ZeroconfServer

logger = logging.getLogger()


def download_missing_frames_from_subjob(local_job, subjob_id, subjob_hostname):
    """
    Downloads every file a subjob reports that is not already present for the local job.

    Remote filenames are compared against the basenames of ``local_job.file_list()``; missing
    files are saved into the directory containing ``local_job.output_path``. A failure on one
    file is logged and the remaining files are still attempted.

    Parameters:
        local_job (BaseRenderWorker): The local parent job worker.
        subjob_id (str or int): The ID of the subjob.
        subjob_hostname (str): The hostname of the remote server where the subjob is located.

    Returns:
        bool: True if every missing file was downloaded (or nothing was missing), False otherwise.
    """
    success = True
    try:
        # set: O(1) membership checks inside the loop below
        local_files = {os.path.basename(x) for x in local_job.file_list()}
        subjob_proxy = RenderServerProxy(subjob_hostname)
        subjob_files = subjob_proxy.get_job_files_list(job_id=subjob_id) or []
        output_dir = os.path.dirname(local_job.output_path)

        for subjob_filename in subjob_files:
            if subjob_filename in local_files:
                continue
            try:
                logger.debug(f"Downloading new file '{subjob_filename}' from {subjob_hostname}")
                # The filename comes from a remote host: keep only its final component so a
                # name such as '../x' cannot write outside the job's output directory.
                local_save_path = os.path.join(output_dir, os.path.basename(subjob_filename))
                subjob_proxy.download_job_file(job_id=subjob_id, job_filename=subjob_filename,
                                               save_path=local_save_path)
                logger.debug(f'Downloaded successfully - {local_save_path}')
            except Exception as e:
                logger.error(f"Error downloading file '{subjob_filename}' from {subjob_hostname}: {e}")
                success = False
    except Exception as e:
        logger.exception(f'Uncaught exception while trying to download from subjob: {e}')
        success = False
    return success


def download_all_from_subjob(local_job, subjob_id, subjob_hostname):
    """
    Downloads and extracts files from a completed subjob on a remote server.

    The subjob's files are fetched as a single zip written next to ``local_job.output_path``,
    extracted into that same directory, and the zip is deleted after a successful extraction.
    Progress is recorded in ``local_job.children['<subjob_id>@<subjob_hostname>']['download_status']``
    as 'working', then 'complete' or 'failed'.

    Parameters:
        local_job (BaseRenderWorker): The local parent job worker.
        subjob_id (str or int): The ID of the subjob.
        subjob_hostname (str): The hostname of the remote server where the subjob is located.

    Returns:
        bool: True if the files have been downloaded and extracted successfully, False otherwise.
    """
    child_key = f'{subjob_id}@{subjob_hostname}'
    logname = f"{local_job.id}:{child_key}"
    # suffix appended to the output path itself (not a directory join), so the zip sits beside it
    zip_file_path = local_job.output_path + f'_{subjob_hostname}_{subjob_id}.zip'

    # download zip file from server
    try:
        local_job.children[child_key]['download_status'] = 'working'
        logger.info(f"Downloading completed subjob files from {subjob_hostname} to localhost")
        RenderServerProxy(subjob_hostname).download_all_job_files(subjob_id, zip_file_path)
        logger.info(f"File transfer complete for {logname} - Transferred {get_file_size_human(zip_file_path)}")
    except Exception as e:
        logger.error(f"Error downloading files from remote server: {e}")
        local_job.children[child_key]['download_status'] = 'failed'
        return False

    # extract zip
    try:
        logger.debug(f"Extracting zip file: {zip_file_path}")
        extract_path = os.path.dirname(zip_file_path)
        with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
            zip_ref.extractall(extract_path)
        logger.info(f"Successfully extracted zip to: {extract_path}")
        os.remove(zip_file_path)
        local_job.children[child_key]['download_status'] = 'complete'
    except Exception as e:
        # the zip is intentionally left on disk when extraction fails
        logger.exception(f"Exception extracting zip file: {e}")
        local_job.children[child_key]['download_status'] = 'failed'

    return local_job.children[child_key].get('download_status', None) == 'complete'


def _divide_frames_equally(frame_start, frame_end, servers):
    """Split [frame_start, frame_end] into contiguous, near-equal ranges keyed by hostname.

    The first ``frame_count % len(servers)`` servers receive one extra frame; servers that would
    receive no frames are omitted from the result.
    """
    frame_range = frame_end - frame_start + 1
    frames_per_server = frame_range // len(servers)
    leftover_frames = frame_range % len(servers)

    frame_ranges = {}
    current_start = frame_start
    for server in servers:
        current_end = current_start + frames_per_server - 1
        if leftover_frames > 0:
            current_end += 1
            leftover_frames -= 1
        if current_start <= current_end:
            frame_ranges[server['hostname']] = (current_start, current_end)
        current_start = current_end + 1
    return frame_ranges


def _divide_frames_by_weight(frame_start, frame_end, servers, weights):
    """Split [frame_start, frame_end] into contiguous ranges proportional to ``weights``.

    ``weights[i]`` belongs to ``servers[i]``. Servers with a non-positive weight are excluded;
    if none remain, the frames are split evenly across all ``servers`` instead. Ranges are handed
    out in list order and the last weighted server receives whatever frames remain.
    """
    weighted = [(server, weight) for server, weight in zip(servers, weights) if weight > 0]
    if not weighted:
        logger.warning('No server has a usable weight; splitting frames evenly instead')
        return _divide_frames_equally(frame_start, frame_end, servers)

    total_frames = frame_end - frame_start + 1
    total_weight = sum(weight for _, weight in weighted)

    frame_ranges = {}
    current_frame = frame_start
    allocated_frames = 0
    for i, (server, weight) in enumerate(weighted):
        remaining_frames = total_frames - allocated_frames
        if i == len(weighted) - 1:
            # Give all remaining frames to the last server
            num_frames = remaining_frames
        else:
            # Cap at what is still unassigned: several shares rounding up could otherwise
            # allocate more than total_frames and push ranges past frame_end.
            num_frames = min(round((weight / total_weight) * total_frames), remaining_frames)
        allocated_frames += num_frames

        frame_end_for_server = current_frame + num_frames - 1
        if current_frame <= frame_end_for_server:
            frame_ranges[server['hostname']] = (current_frame, frame_end_for_server)
        current_frame = frame_end_for_server + 1
    return frame_ranges


def _fetch_cpu_benchmark(server):
    """Fetch ``server``'s CPU benchmark, store the raw text on it, and return it as an int.

    On a request failure any previously stored ``server['cpu_benchmark']`` is used instead.
    Returns 0 when no numeric benchmark is available, so the server receives no frames.
    """
    try:
        # NOTE(review): no timeout is set; if the endpoint runs the benchmark on demand, a short
        # timeout could cut it off — confirm expected latency before adding one.
        benchmark = requests.get(f'http://{server["hostname"]}:{ZeroconfServer.server_port}'
                                 f'/api/cpu_benchmark').text
        server['cpu_benchmark'] = benchmark
        logger.debug(f'Benchmark for {server["hostname"]}: {benchmark}')
    except requests.exceptions.RequestException as e:
        logger.error(f'Error fetching benchmark for {server["hostname"]}: {e}')

    benchmark = server.get('cpu_benchmark')
    try:
        return max(int(benchmark), 0)
    except (TypeError, ValueError):
        logger.warning(f'No usable benchmark for {server["hostname"]}: {benchmark!r}')
        return 0


def distribute_server_work(start_frame, end_frame, available_servers, method='evenly'):
    """
    Splits the frame range among available servers.

    Args:
        start_frame (int): The start frame number of the animation to be rendered.
        end_frame (int): The end frame number of the animation to be rendered.
        available_servers (list): A list of available server dictionaries. Each server dictionary should include
            'hostname' and 'cpu_count' keys (see find_available_servers).
        method (str, optional): Specifies the distribution method. Possible values are:
            'evenly' - near-equal contiguous ranges;
            'cpu_count' - proportional to each server's 'cpu_count';
            'cpu_benchmark' - proportional to the score each server returns from /api/cpu_benchmark
            (the raw response text is stored on the server dict as 'cpu_benchmark'; servers without a
            usable score get no frames, and if none have one the split falls back to 'evenly').
            Defaults to 'evenly'. Ignored when only one server is available.

    Returns:
        list: The server dictionaries (the same objects passed in) that were assigned at least one frame,
            each updated in place with 'frame_range' (first, last) and 'total_frames'.
            Empty if ``available_servers`` is empty.

    Raises:
        ValueError: If ``method`` is not recognised and more than one server is available.
    """
    if not available_servers:
        logger.warning('No available servers to distribute frames to')
        return []

    if len(available_servers) == 1:
        breakdown = {available_servers[0]['hostname']: (start_frame, end_frame)}
    else:
        logger.debug(f'Splitting between {len(available_servers)} servers by {method} method')
        if method == 'evenly':
            breakdown = _divide_frames_equally(start_frame, end_frame, available_servers)
        elif method == 'cpu_benchmark':
            with ThreadPoolExecutor(max_workers=len(available_servers)) as executor:
                # list() consumes the results in server order and re-raises any unexpected
                # worker exception here instead of silently dropping it
                benchmarks = list(executor.map(_fetch_cpu_benchmark, available_servers))
            breakdown = _divide_frames_by_weight(start_frame, end_frame, available_servers, benchmarks)
        elif method == 'cpu_count':
            cpu_counts = [server['cpu_count'] for server in available_servers]
            breakdown = _divide_frames_by_weight(start_frame, end_frame, available_servers, cpu_counts)
        else:
            raise ValueError(f"Invalid distribution method: {method}")

    server_breakdown = [server for server in available_servers if server['hostname'] in breakdown]
    for server in server_breakdown:
        frame_range = breakdown[server['hostname']]
        server['frame_range'] = frame_range
        server['total_frames'] = frame_range[-1] - frame_range[0] + 1
    return server_breakdown