diff --git a/src/distributed_job_manager.py b/src/distributed_job_manager.py
index aac7788..68098e1 100644
--- a/src/distributed_job_manager.py
+++ b/src/distributed_job_manager.py
@@ -3,10 +3,7 @@
 import os
 import socket
 import threading
 import time
-import zipfile
-from concurrent.futures import ThreadPoolExecutor
 
-import requests
 from plyer import notification
 from pubsub import pub
@@ -15,7 +12,7 @@
 from src.api.server_proxy import RenderServerProxy
 from src.engines.engine_manager import EngineManager
 from src.render_queue import RenderQueue
 from src.utilities.config import Config
-from src.utilities.misc_helper import get_file_size_human
+from src.utilities.server_helper import download_missing_frames_from_subjob, distribute_server_work
 from src.utilities.status_utils import RenderStatus, string_to_status
 from src.utilities.zeroconf_server import ZeroconfServer
@@ -211,78 +208,10 @@ class DistributedJobManager:
             if old_status != subjob_status.value:
                 logger.debug(f"Subjob status changed: {logname} -> {subjob_status.value}")
 
-            download_success = cls.download_missing_frames_from_subjob(local_job, subjob_id, subjob_hostname)
+            download_success = download_missing_frames_from_subjob(local_job, subjob_id, subjob_hostname)
             if subjob_data['status'] == 'completed' and download_success:
                 local_job.children[subjob_key]['download_status'] = 'completed'
 
-    @staticmethod
-    def download_missing_frames_from_subjob(local_job, subjob_id, subjob_hostname):
-        success = True
-        try:
-            local_files = [os.path.basename(x) for x in local_job.file_list()]
-            subjob_proxy = RenderServerProxy(subjob_hostname)
-            subjob_files = subjob_proxy.get_job_files_list(job_id=subjob_id) or []
-
-            for subjob_filename in subjob_files:
-                if subjob_filename not in local_files:
-                    try:
-                        logger.debug(f"Downloading new file '{subjob_filename}' from {subjob_hostname}")
-                        local_save_path = os.path.join(os.path.dirname(local_job.output_path), subjob_filename)
-                        subjob_proxy.download_job_file(job_id=subjob_id, job_filename=subjob_filename,
-                                                       save_path=local_save_path)
-                        logger.debug(f'Downloaded successfully - {local_save_path}')
-                    except Exception as e:
-                        logger.error(f"Error downloading file '{subjob_filename}' from {subjob_hostname}: {e}")
-                        success = False
-        except Exception as e:
-            logger.exception(f'Uncaught exception while trying to download from subjob: {e}')
-            success = False
-        return success
-
-    @staticmethod
-    def download_all_from_subjob(local_job, subjob_id, subjob_hostname):
-        """
-        Downloads and extracts files from a completed subjob on a remote server.
-
-        Parameters:
-            local_job (BaseRenderWorker): The local parent job worker.
-            subjob_id (str or int): The ID of the subjob.
-            subjob_hostname (str): The hostname of the remote server where the subjob is located.
-
-        Returns:
-            bool: True if the files have been downloaded and extracted successfully, False otherwise.
-        """
-
-        child_key = f'{subjob_id}@{subjob_hostname}'
-        logname = f"{local_job.id}:{child_key}"
-        zip_file_path = local_job.output_path + f'_{subjob_hostname}_{subjob_id}.zip'
-
-        # download zip file from server
-        try:
-            local_job.children[child_key]['download_status'] = 'working'
-            logger.info(f"Downloading completed subjob files from {subjob_hostname} to localhost")
-            RenderServerProxy(subjob_hostname).download_all_job_files(subjob_id, zip_file_path)
-            logger.info(f"File transfer complete for {logname} - Transferred {get_file_size_human(zip_file_path)}")
-        except Exception as e:
-            logger.error(f"Error downloading files from remote server: {e}")
-            local_job.children[child_key]['download_status'] = 'failed'
-            return False
-
-        # extract zip
-        try:
-            logger.debug(f"Extracting zip file: {zip_file_path}")
-            extract_path = os.path.dirname(zip_file_path)
-            with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
-                zip_ref.extractall(extract_path)
-            logger.info(f"Successfully extracted zip to: {extract_path}")
-            os.remove(zip_file_path)
-            local_job.children[child_key]['download_status'] = 'complete'
-        except Exception as e:
-            logger.exception(f"Exception extracting zip file: {e}")
-            local_job.children[child_key]['download_status'] = 'failed'
-
-        return local_job.children[child_key].get('download_status', None) == 'complete'
-
     @classmethod
     def wait_for_subjobs(cls, parent_job):
         logger.debug(f"Waiting for subjobs for job {parent_job}")
@@ -326,7 +255,7 @@ class DistributedJobManager:
                 # Check if job is finished, but has not had files copied yet over yet
                 if download_status is None and subjob_data['file_count'] and status in statuses_to_download:
                     try:
-                        cls.download_missing_frames_from_subjob(parent_job, subjob_id, subjob_hostname)
+                        download_missing_frames_from_subjob(parent_job, subjob_id, subjob_hostname)
                         parent_job.children[child_key]['download_status'] = 'complete'
                     except Exception as e:
                         logger.error(f"Error downloading missing frames from subjob: {e}")
@@ -385,7 +314,7 @@ class DistributedJobManager:
             return
 
         logger.debug(f"Splitting into subjobs - Available servers: {[x['hostname'] for x in available_servers]}")
-        all_subjob_server_data = cls.distribute_server_work(parent_worker.start_frame, parent_worker.end_frame, available_servers)
+        all_subjob_server_data = distribute_server_work(parent_worker.start_frame, parent_worker.end_frame, available_servers)
 
         # Prep and submit these sub-jobs
         logger.info(f"Job {parent_worker.id} split plan: {all_subjob_server_data}")
@@ -430,126 +359,6 @@ class DistributedJobManager:
     # Server Handling
     # --------------------------------------------
 
-    @staticmethod
-    def distribute_server_work(start_frame, end_frame, available_servers, method='cpu_benchmark'):
-        """
-        Splits the frame range among available servers proportionally based on their performance (CPU count).
-
-        Args:
-            start_frame (int): The start frame number of the animation to be rendered.
-            end_frame (int): The end frame number of the animation to be rendered.
-            available_servers (list): A list of available server dictionaries. Each server dictionary should include
-                                      'hostname' and 'cpu_count' keys (see find_available_servers).
-            method (str, optional): Specifies the distribution method. Possible values are 'cpu_benchmark', 'cpu_count'
-                                    and 'evenly'.
-                                    Defaults to 'cpu_benchmark'.
-
-        Returns:
-            list: A list of server dictionaries where each dictionary includes the frame range and total number of
-                  frames to be rendered by the server.
-        """
-        # Calculate respective frames for each server
-        def divide_frames_by_cpu_count(frame_start, frame_end, servers):
-            total_frames = frame_end - frame_start + 1
-            total_cpus = sum(server['cpu_count'] for server in servers)
-
-            frame_ranges = {}
-            current_frame = frame_start
-            allocated_frames = 0
-
-            for i, server in enumerate(servers):
-                if i == len(servers) - 1:  # if it's the last server
-                    # Give all remaining frames to the last server
-                    num_frames = total_frames - allocated_frames
-                else:
-                    num_frames = round((server['cpu_count'] / total_cpus) * total_frames)
-                allocated_frames += num_frames
-
-                frame_end_for_server = current_frame + num_frames - 1
-
-                if current_frame <= frame_end_for_server:
-                    frame_ranges[server['hostname']] = (current_frame, frame_end_for_server)
-                    current_frame = frame_end_for_server + 1
-
-            return frame_ranges
-
-        def divide_frames_by_benchmark(frame_start, frame_end, servers):
-
-            def fetch_benchmark(server):
-                try:
-                    benchmark = requests.get(f'http://{server["hostname"]}:{ZeroconfServer.server_port}'
-                                             f'/api/cpu_benchmark').text
-                    server['cpu_benchmark'] = benchmark
-                    logger.debug(f'Benchmark for {server["hostname"]}: {benchmark}')
-                except requests.exceptions.RequestException as e:
-                    logger.error(f'Error fetching benchmark for {server["hostname"]}: {e}')
-
-            # Number of threads to use (can adjust based on your needs or number of servers)
-            threads = len(servers)
-
-            with ThreadPoolExecutor(max_workers=threads) as executor:
-                executor.map(fetch_benchmark, servers)
-
-            total_frames = frame_end - frame_start + 1
-            total_performance = sum(int(server['cpu_benchmark']) for server in servers)
-
-            frame_ranges = {}
-            current_frame = frame_start
-            allocated_frames = 0
-
-            for i, server in enumerate(servers):
-                if i == len(servers) - 1:  # if it's the last server
-                    # Give all remaining frames to the last server
-                    num_frames = total_frames - allocated_frames
-                else:
-                    num_frames = round((int(server['cpu_benchmark']) / total_performance) * total_frames)
-                allocated_frames += num_frames
-
-                frame_end_for_server = current_frame + num_frames - 1
-
-                if current_frame <= frame_end_for_server:
-                    frame_ranges[server['hostname']] = (current_frame, frame_end_for_server)
-                    current_frame = frame_end_for_server + 1
-
-            return frame_ranges
-
-        def divide_frames_equally(frame_start, frame_end, servers):
-            frame_range = frame_end - frame_start + 1
-            frames_per_server = frame_range // len(servers)
-            leftover_frames = frame_range % len(servers)
-
-            frame_ranges = {}
-            current_start = frame_start
-            for i, server in enumerate(servers):
-                current_end = current_start + frames_per_server - 1
-                if leftover_frames > 0:
-                    current_end += 1
-                    leftover_frames -= 1
-                if current_start <= current_end:
-                    frame_ranges[server['hostname']] = (current_start, current_end)
-                    current_start = current_end + 1
-
-            return frame_ranges
-
-        if len(available_servers) == 1:
-            breakdown = {available_servers[0]['hostname']: (start_frame, end_frame)}
-        else:
-            logger.debug(f'Splitting between {len(available_servers)} servers by {method} method')
-            if method == 'evenly':
-                breakdown = divide_frames_equally(start_frame, end_frame, available_servers)
-            elif method == 'cpu_benchmark':
-                breakdown = divide_frames_by_benchmark(start_frame, end_frame, available_servers)
-            elif method == 'cpu_count':
-                breakdown = divide_frames_by_cpu_count(start_frame, end_frame, available_servers)
-            else:
-                raise ValueError(f"Invalid distribution method: {method}")
-
-        server_breakdown = [server for server in available_servers if
-                            breakdown.get(server['hostname']) is not None]
-        for server in server_breakdown:
-            server['frame_range'] = breakdown[server['hostname']]
-            server['total_frames'] = breakdown[server['hostname']][-1] - breakdown[server['hostname']][0] + 1
-        return server_breakdown
-
     @staticmethod
     def find_available_servers(engine_name, system_os=None):
         """
@@ -579,6 +388,6 @@ if __name__ == '__main__':
     time.sleep(2)
     available_servers = DistributedJobManager.find_available_servers('blender')
     print(f"AVAILABLE SERVERS ({len(available_servers)}): {available_servers}")
-    # results = DistributedJobManager.distribute_server_work(1, 100, available_servers)
+    # results = distribute_server_work(1, 100, available_servers)
     # print(f"RESULTS: {results}")
     ZeroconfServer.stop()
diff --git a/src/utilities/server_helper.py b/src/utilities/server_helper.py
index bb37bd7..1040b7a 100644
--- a/src/utilities/server_helper.py
+++ b/src/utilities/server_helper.py
@@ -1,47 +1,200 @@
 import logging
 import os
-import subprocess
-import threading
+import zipfile
+from concurrent.futures import ThreadPoolExecutor
 
-from src.utilities.ffmpeg_helper import generate_thumbnail, save_first_frame
+import requests
+
+from src.api.server_proxy import RenderServerProxy
+from src.utilities.misc_helper import get_file_size_human
+from src.utilities.zeroconf_server import ZeroconfServer
 
 logger = logging.getLogger()
 
 
-def generate_thumbnail_for_job(job, thumb_video_path, thumb_image_path, max_width=320):
+def download_missing_frames_from_subjob(local_job, subjob_id, subjob_hostname):
+    """Download any files the subjob's server has that the local parent job is missing.
+
+    Returns True only if every missing file was downloaded successfully.
+    """
+    success = True
+    try:
+        local_files = [os.path.basename(x) for x in local_job.file_list()]
+        subjob_proxy = RenderServerProxy(subjob_hostname)
+        subjob_files = subjob_proxy.get_job_files_list(job_id=subjob_id) or []
 
-    # Simple thread to generate thumbs in background
-    def generate_thumb_thread(source):
-        in_progress_path = thumb_video_path + '_IN-PROGRESS'
-        subprocess.run(['touch', in_progress_path])
-        try:
-            logger.debug(f"Generating video thumbnail for {source}")
-            generate_thumbnail(source_path=source, dest_path=thumb_video_path, max_width=max_width)
-        except subprocess.CalledProcessError as err:
-            logger.error(f"Error generating video thumbnail for {source}: {err}")
+        for subjob_filename in subjob_files:
+            if subjob_filename not in local_files:
+                try:
+                    logger.debug(f"Downloading new file '{subjob_filename}' from {subjob_hostname}")
+                    local_save_path = os.path.join(os.path.dirname(local_job.output_path), subjob_filename)
+                    subjob_proxy.download_job_file(job_id=subjob_id, job_filename=subjob_filename,
+                                                   save_path=local_save_path)
+                    logger.debug(f'Downloaded successfully - {local_save_path}')
+                except Exception as e:
+                    logger.error(f"Error downloading file '{subjob_filename}' from {subjob_hostname}: {e}")
+                    success = False
+    except Exception as e:
+        logger.exception(f'Uncaught exception while trying to download from subjob: {e}')
+        success = False
+    return success
 
-        try:
-            os.remove(in_progress_path)
-        except FileNotFoundError:
-            pass
-
-    # Determine best source file to use for thumbs
-    source_files = job.file_list() or [job.input_path]
-    if source_files:
-        video_formats = ['.mp4', '.mov', '.avi', '.mpg', '.mpeg', '.mxf', '.m4v', 'mkv']
-        image_formats = ['.jpg', '.png', '.exr']
+
+def download_all_from_subjob(local_job, subjob_id, subjob_hostname):
+    """
+    Downloads and extracts files from a completed subjob on a remote server.
-        image_files = [f for f in source_files if os.path.splitext(f)[-1].lower() in image_formats]
-        video_files = [f for f in source_files if os.path.splitext(f)[-1].lower() in video_formats]
+    Parameters:
+        local_job (BaseRenderWorker): The local parent job worker.
+        subjob_id (str or int): The ID of the subjob.
+        subjob_hostname (str): The hostname of the remote server where the subjob is located.
 
-        if (video_files or image_files) and not os.path.exists(thumb_image_path):
+    Returns:
+        bool: True if the files have been downloaded and extracted successfully, False otherwise.
+    """
+
+    child_key = f'{subjob_id}@{subjob_hostname}'
+    logname = f"{local_job.id}:{child_key}"
+    zip_file_path = local_job.output_path + f'_{subjob_hostname}_{subjob_id}.zip'
+
+    # download zip file from server
+    try:
+        local_job.children[child_key]['download_status'] = 'working'
+        logger.info(f"Downloading completed subjob files from {subjob_hostname} to localhost")
+        RenderServerProxy(subjob_hostname).download_all_job_files(subjob_id, zip_file_path)
+        logger.info(f"File transfer complete for {logname} - Transferred {get_file_size_human(zip_file_path)}")
+    except Exception as e:
+        logger.error(f"Error downloading files from remote server: {e}")
+        local_job.children[child_key]['download_status'] = 'failed'
+        return False
+
+    # extract zip
+    try:
+        logger.debug(f"Extracting zip file: {zip_file_path}")
+        extract_path = os.path.dirname(zip_file_path)
+        with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
+            zip_ref.extractall(extract_path)
+        logger.info(f"Successfully extracted zip to: {extract_path}")
+        os.remove(zip_file_path)
+        local_job.children[child_key]['download_status'] = 'complete'
+    except Exception as e:
+        logger.exception(f"Exception extracting zip file: {e}")
+        local_job.children[child_key]['download_status'] = 'failed'
+
+    return local_job.children[child_key].get('download_status', None) == 'complete'
+
+
+def distribute_server_work(start_frame, end_frame, available_servers, method='cpu_benchmark'):
+    """
+    Splits the frame range among the available servers, either evenly or proportionally to each
+    server's CPU count or benchmark score, depending on the selected method.
+
+    Args:
+        start_frame (int): The start frame number of the animation to be rendered.
+        end_frame (int): The end frame number of the animation to be rendered.
+        available_servers (list): A list of available server dictionaries. Each server dictionary should include
+                                  'hostname' and 'cpu_count' keys (see find_available_servers).
+        method (str, optional): Specifies the distribution method. Possible values are 'cpu_benchmark', 'cpu_count'
+                                and 'evenly'. Defaults to 'cpu_benchmark'.
+
+    Returns:
+        list: A list of server dictionaries where each dictionary includes the frame range and total number of
+              frames to be rendered by the server.
+ """ + # Calculate respective frames for each server + def divide_frames_by_cpu_count(frame_start, frame_end, servers): + total_frames = frame_end - frame_start + 1 + total_cpus = sum(server['cpu_count'] for server in servers) + + frame_ranges = {} + current_frame = frame_start + allocated_frames = 0 + + for i, server in enumerate(servers): + if i == len(servers) - 1: # if it's the last server + # Give all remaining frames to the last server + num_frames = total_frames - allocated_frames + else: + num_frames = round((server['cpu_count'] / total_cpus) * total_frames) + allocated_frames += num_frames + + frame_end_for_server = current_frame + num_frames - 1 + + if current_frame <= frame_end_for_server: + frame_ranges[server['hostname']] = (current_frame, frame_end_for_server) + current_frame = frame_end_for_server + 1 + + return frame_ranges + + def divide_frames_by_benchmark(frame_start, frame_end, servers): + + def fetch_benchmark(server): try: - path_of_source = image_files[0] if image_files else video_files[0] - logger.debug(f"Generating image thumbnail for {path_of_source}") - save_first_frame(source_path=path_of_source, dest_path=thumb_image_path, max_width=max_width) - except Exception as e: - logger.error(f"Exception saving first frame: {e}") + benchmark = requests.get(f'http://{server["hostname"]}:{ZeroconfServer.server_port}' + f'/api/cpu_benchmark').text + server['cpu_benchmark'] = benchmark + logger.debug(f'Benchmark for {server["hostname"]}: {benchmark}') + except requests.exceptions.RequestException as e: + logger.error(f'Error fetching benchmark for {server["hostname"]}: {e}') - if video_files and not os.path.exists(thumb_video_path): - x = threading.Thread(target=generate_thumb_thread, args=(video_files[0],)) - x.start() + # Number of threads to use (can adjust based on your needs or number of servers) + threads = len(servers) + + with ThreadPoolExecutor(max_workers=threads) as executor: + executor.map(fetch_benchmark, servers) + + total_frames = frame_end - frame_start + 1 + total_performance = sum(int(server['cpu_benchmark']) for server in servers) + + frame_ranges = {} + current_frame = frame_start + allocated_frames = 0 + + for i, server in enumerate(servers): + if i == len(servers) - 1: # if it's the last server + # Give all remaining frames to the last server + num_frames = total_frames - allocated_frames + else: + num_frames = round((int(server['cpu_benchmark']) / total_performance) * total_frames) + allocated_frames += num_frames + + frame_end_for_server = current_frame + num_frames - 1 + + if current_frame <= frame_end_for_server: + frame_ranges[server['hostname']] = (current_frame, frame_end_for_server) + current_frame = frame_end_for_server + 1 + + return frame_ranges + + def divide_frames_equally(frame_start, frame_end, servers): + frame_range = frame_end - frame_start + 1 + frames_per_server = frame_range // len(servers) + leftover_frames = frame_range % len(servers) + + frame_ranges = {} + current_start = frame_start + for i, server in enumerate(servers): + current_end = current_start + frames_per_server - 1 + if leftover_frames > 0: + current_end += 1 + leftover_frames -= 1 + if current_start <= current_end: + frame_ranges[server['hostname']] = (current_start, current_end) + current_start = current_end + 1 + + return frame_ranges + + if len(available_servers) == 1: + breakdown = {available_servers[0]['hostname']: (start_frame, end_frame)} + else: + logger.debug(f'Splitting between {len(available_servers)} servers by {method} method') + if method == 'evenly': + 
breakdown = divide_frames_equally(start_frame, end_frame, available_servers) + elif method == 'cpu_benchmark': + breakdown = divide_frames_by_benchmark(start_frame, end_frame, available_servers) + elif method == 'cpu_count': + breakdown = divide_frames_by_cpu_count(start_frame, end_frame, available_servers) + else: + raise ValueError(f"Invalid distribution method: {method}") + + server_breakdown = [server for server in available_servers if breakdown.get(server['hostname']) is not None] + for server in server_breakdown: + server['frame_range'] = breakdown[server['hostname']] + server['total_frames'] = breakdown[server['hostname']][-1] - breakdown[server['hostname']][0] + 1 + return server_breakdown
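+
+
+if __name__ == '__main__':
+    # Illustrative sketch only: the hostnames and cpu_count values below are
+    # made-up placeholders, not servers this project defines. With method='evenly',
+    # 100 frames across three servers split 34/33/33; the 'cpu_count' and
+    # 'cpu_benchmark' methods would instead weight the split by reported capacity.
+    example_servers = [
+        {'hostname': 'render-a.local', 'cpu_count': 8},
+        {'hostname': 'render-b.local', 'cpu_count': 16},
+        {'hostname': 'render-c.local', 'cpu_count': 4},
+    ]
+    for server in distribute_server_work(1, 100, example_servers, method='evenly'):
+        print(f"{server['hostname']}: frames {server['frame_range']} ({server['total_frames']} total)")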