From 34fbdaa4d96040b10dee6fdee4a5926d4d58c7e6 Mon Sep 17 00:00:00 2001 From: Brett Williams Date: Fri, 30 Jun 2023 19:49:57 -0500 Subject: [PATCH] Refactor: DistributedJobManager with pub/sub status change notifications (#25) * Add pubsub to render_queue and base_worker * Refactor: Convert ZeroconfServer to Singleton with Class Methods * New API for subjob servers to notify parent job servers of status changes * Refactor: Move all subjob-related methods to distributed_job_manager.py * Rewrite for wait_for_subjobs * Fix: DistributedJobManager.find_available_servers() takes 1 positional argument but 3 were given * DistributedJobManager should now notify / be notified about background job changes * Fix the make_ready API. Change children keyname to be id@hostname so it can be unique * Fixes * Image sequence to movie needs to find the actual start frame * Fix: subjob_status_change did not return a valid response * Fix client renderer selection * Small fix for subjob status checking * Fix issue with divide_frames_equally * Fix issue where downloads were not occurring * Fix issue where old status was being reported * Add docstrings and code cleanup --- dashboard.py | 8 +- lib/client/dashboard_window.py | 6 +- lib/client/new_job_window.py | 5 +- lib/distributed_job_manager.py | 338 +++++++++++++++++++++++++++++++++ lib/render_queue.py | 12 +- lib/server/api_server.py | 130 +++++-------- lib/server/server_proxy.py | 8 +- lib/server/zeroconf_server.py | 96 ++++++---- lib/utilities/misc_helper.py | 17 ++ lib/utilities/server_helper.py | 72 ------- lib/workers/base_worker.py | 56 ++---- lib/workers/blender_worker.py | 10 +- 12 files changed, 503 insertions(+), 255 deletions(-) create mode 100644 lib/distributed_job_manager.py diff --git a/dashboard.py b/dashboard.py index 83a58ba..703a5d4 100755 --- a/dashboard.py +++ b/dashboard.py @@ -82,9 +82,9 @@ def create_node_tree(all_server_data) -> Tree: node_tree.add(Tree(stats_text)) - running_jobs = [job for job in server_data['jobs'] if job['status'] == 'running'] - not_started = [job for job in server_data['jobs'] if job['status'] == 'not_started'] - scheduled = [job for job in server_data['jobs'] if job['status'] == 'scheduled'] + running_jobs = [job for job in server_data['jobs'] if job['status'] == RenderStatus.RUNNING.value] + not_started = [job for job in server_data['jobs'] if job['status'] == RenderStatus.NOT_STARTED.value] + scheduled = [job for job in server_data['jobs'] if job['status'] == RenderStatus.SCHEDULED.value] jobs_to_display = running_jobs + not_started + scheduled jobs_tree = Tree(f"Running: [green]{len(running_jobs)} [default]| Queued: [cyan]{len(not_started)}" @@ -93,7 +93,7 @@ def create_node_tree(all_server_data) -> Tree: for job in jobs_to_display: renderer = f"{renderer_colors[job['renderer']]}{job['renderer']}[default]" filename = os.path.basename(job['input_path']).split('.')[0] - if job['status'] == 'running': + if job['status'] == RenderStatus.RUNNING.value: jobs_tree.add(f"[bold]{renderer} {filename} ({job['id']}) - {status_string_to_color(job['status'])}{(float(job['percent_complete']) * 100):.1f}%") else: jobs_tree.add(f"{filename} ({job['id']}) - {status_string_to_color(job['status'])}{job['status'].title()}") diff --git a/lib/client/dashboard_window.py b/lib/client/dashboard_window.py index 30fe991..34a3268 100644 --- a/lib/client/dashboard_window.py +++ b/lib/client/dashboard_window.py @@ -44,8 +44,8 @@ class DashboardWindow: self.added_hostnames = [] # Setup zeroconf - self.zeroconf = 
ZeroconfServer("_zordon._tcp.local.", socket.gethostname(), 8080) - self.zeroconf.start(listen_only=True) + ZeroconfServer.configure("_zordon._tcp.local.", socket.gethostname(), 8080) + ZeroconfServer.start(listen_only=True) # Setup photo preview photo_pad = tk.Frame(self.root, background="gray") @@ -299,7 +299,7 @@ class DashboardWindow: tree.item(item, values=new_values) break - current_servers = list(set(self.zeroconf.found_clients() + self.added_hostnames)) + current_servers = list(set(ZeroconfServer.found_clients() + self.added_hostnames)) for hostname in current_servers: if not self.server_proxies.get(hostname, None): new_proxy = RenderServerProxy(hostname=hostname) diff --git a/lib/client/new_job_window.py b/lib/client/new_job_window.py index ed530a3..46c3c2c 100755 --- a/lib/client/new_job_window.py +++ b/lib/client/new_job_window.py @@ -175,9 +175,8 @@ class NewJobWindow(Frame): self.presets = self.server_proxy.request_data('presets', timeout=3) or {} # update available renders - available_renderers = [x for x in self.renderer_info.keys() if self.renderer_info[x].get('available', False)] - self.renderer_combo['values'] = available_renderers - if available_renderers: + self.renderer_combo['values'] = list(self.renderer_info.keys()) + if self.renderer_info.keys(): self.renderer_combo.current(0) self.refresh_renderer_settings() diff --git a/lib/distributed_job_manager.py b/lib/distributed_job_manager.py new file mode 100644 index 0000000..cc13d68 --- /dev/null +++ b/lib/distributed_job_manager.py @@ -0,0 +1,338 @@ +import logging +import os +import socket +import time +import zipfile + +from pubsub import pub +from lib.render_queue import RenderQueue +from lib.server.server_proxy import RenderServerProxy +from lib.server.zeroconf_server import ZeroconfServer +from lib.utilities.misc_helper import get_file_size_human +from lib.workers.base_worker import RenderStatus, string_to_status + +logger = logging.getLogger() + + +class DistributedJobManager: + + def __init__(self): + pass + + @classmethod + def start(cls): + """ + Subscribes the private class method '__local_job_status_changed' to the 'status_change' pubsub message. + This should be called once, typically during the initialization phase. + """ + pub.subscribe(cls.__local_job_status_changed, 'status_change') + + @classmethod + def __local_job_status_changed(cls, job_id, old_status, new_status): + """ + Responds to the 'status_change' pubsub message for local jobs. + If it's a child job, it notifies the parent job about the status change. + + Parameters: + job_id (str): The ID of the job that has changed status. + old_status (str): The previous status of the job. + new_status (str): The new (current) status of the job. + + Note: Do not call directly. Instead, call via the 'status_change' pubsub message. 
+ """ + + render_job = RenderQueue.job_with_id(job_id, none_ok=True) + if not render_job: # ignore jobs created but not yet added to queue + return + + logger.debug(f"Job {job_id} status change: {old_status} -> {new_status}") + if render_job.parent: # If local job is a subjob from a remote server + parent_id, hostname = render_job.parent.split('@')[0], render_job.parent.split('@')[-1] + RenderServerProxy(hostname).notify_parent_of_status_change(parent_id=parent_id, subjob=render_job) + + elif render_job.children and new_status == RenderStatus.CANCELLED: + # todo: handle cancelling all the children + pass + + @classmethod + def handle_subjob_status_change(cls, local_job, subjob_data): + """ + Responds to a status change from a remote subjob and triggers the creation or modification of subjobs as needed. + + Parameters: + local_job (BaseRenderWorker): The local parent job worker. + subjob_data (dict): subjob data sent from remote server. + + Returns: + None + """ + + subjob_status = string_to_status(subjob_data['status']) + subjob_id = subjob_data['id'] + subjob_hostname = next((hostname.split('@')[1] for hostname in local_job.children if + hostname.split('@')[0] == subjob_id), None) + local_job.children[f'{subjob_id}@{subjob_hostname}'] = subjob_data + + logname = f"{local_job.id}:{subjob_id}@{subjob_hostname}" + logger.debug(f"Subjob status changed: {logname} -> {subjob_status.value}") + + # Download complete or partial render jobs + if subjob_status in [RenderStatus.COMPLETED, RenderStatus.CANCELLED, RenderStatus.ERROR] and \ + subjob_data['file_count']: + download_result = cls.download_from_subjob(local_job, subjob_id, subjob_hostname) + if not download_result: + # todo: handle error + logger.error(f"Unable to download subjob files from {logname} with status {subjob_status.value}") + + if subjob_status == RenderStatus.CANCELLED or subjob_status == RenderStatus.ERROR: + # todo: determine missing frames and schedule new job + pass + + @staticmethod + def download_from_subjob(local_job, subjob_id, subjob_hostname): + """ + Downloads and extracts files from a completed subjob on a remote server. + + Parameters: + local_job (BaseRenderWorker): The local parent job worker. + subjob_id (str or int): The ID of the subjob. + subjob_hostname (str): The hostname of the remote server where the subjob is located. + + Returns: + bool: True if the files have been downloaded and extracted successfully, False otherwise. 
+ """ + + child_key = f'{subjob_id}@{subjob_hostname}' + logname = f"{local_job.id}:{child_key}" + zip_file_path = local_job.output_path + f'_{subjob_hostname}_{subjob_id}.zip' + + # download zip file from server + try: + local_job.children[child_key]['download_status'] = 'working' + logger.info(f"Downloading completed subjob files from {subjob_hostname} to localhost") + RenderServerProxy(subjob_hostname).get_job_files(subjob_id, zip_file_path) + logger.info(f"File transfer complete for {logname} - Transferred {get_file_size_human(zip_file_path)}") + except Exception as e: + logger.exception(f"Exception downloading files from remote server: {e}") + local_job.children[child_key]['download_status'] = 'failed' + return False + + # extract zip + try: + logger.debug(f"Extracting zip file: {zip_file_path}") + extract_path = os.path.dirname(zip_file_path) + with zipfile.ZipFile(zip_file_path, 'r') as zip_ref: + zip_ref.extractall(extract_path) + logger.info(f"Successfully extracted zip to: {extract_path}") + os.remove(zip_file_path) + local_job.children[child_key]['download_status'] = 'complete' + except Exception as e: + logger.exception(f"Exception extracting zip file: {e}") + local_job.children[child_key]['download_status'] = 'failed' + + return local_job.children[child_key].get('download_status', None) == 'complete' + + @classmethod + def wait_for_subjobs(cls, local_job): + logger.debug(f"Waiting for subjobs for job {local_job}") + local_job.status = RenderStatus.WAITING_FOR_SUBJOBS + statuses_to_download = [RenderStatus.CANCELLED, RenderStatus.ERROR, RenderStatus.COMPLETED] + + def subjobs_not_downloaded(): + return {k: v for k, v in local_job.children.items() if 'download_status' not in v or + v['download_status'] == 'working' or v['download_status'] is None} + + logger.debug(f'subjobs_not_downloaded: {subjobs_not_downloaded()}') + + while len(subjobs_not_downloaded()): + for child_key, subjob_cached_data in subjobs_not_downloaded().items(): + + subjob_id = child_key.split('@')[0] + subjob_hostname = child_key.split('@')[-1] + + # Fetch info from server and handle failing case + subjob_data = RenderServerProxy(subjob_hostname).get_job_info(subjob_id) + if not subjob_data: + logger.warning(f"No response from: {subjob_hostname}") + # todo: handle timeout / missing server situations + continue + + # Update parent job cache but keep the download status + download_status = local_job.children[child_key].get('download_status', None) + local_job.children[child_key] = subjob_data + local_job.children[child_key]['download_status'] = download_status + + status = string_to_status(subjob_data.get('status', '')) + status_msg = f"Subjob {child_key} | {status} | " \ + f"{float(subjob_data.get('percent_complete')) * 100.0}%" + logger.debug(status_msg) + + # Still working in another thread - keep waiting + if download_status == 'working': + continue + + # Check if job is finished, but has not had files copied yet over yet + if download_status is None and subjob_data['file_count'] and status in statuses_to_download: + download_result = cls.download_from_subjob(local_job, subjob_id, subjob_hostname) + if not download_result: + logger.error("Failed to download from subjob") + # todo: error handling here + + # Any finished jobs not successfully downloaded at this point are skipped + if local_job.children[child_key].get('download_status', None) is None and \ + status in statuses_to_download: + logger.warning(f"Skipping waiting on downloading from subjob: {child_key}") + 
local_job.children[child_key]['download_status'] = 'skipped' + + if subjobs_not_downloaded(): + logger.debug(f"Waiting on {len(subjobs_not_downloaded())} subjobs on " + f"{', '.join(list(subjobs_not_downloaded().keys()))}") + time.sleep(5) + + @classmethod + def split_into_subjobs(cls, worker, job_data, project_path): + + # Check availability + available_servers = cls.find_available_servers(worker.renderer) + subjob_servers = cls.distribute_server_work(worker.start_frame, worker.end_frame, available_servers) + local_hostname = socket.gethostname() + + # Prep and submit these sub-jobs + logger.info(f"Job {worker.id} split plan: {subjob_servers}") + try: + for server_data in subjob_servers: + server_hostname = server_data['hostname'] + if server_hostname != local_hostname: + post_results = cls.__create_subjob(job_data, local_hostname, project_path, server_data, + server_hostname, worker) + if post_results.ok: + server_data['submission_results'] = post_results.json()[0] + else: + logger.error(f"Failed to create subjob on {server_hostname}") + break + else: + # truncate parent render_job + worker.start_frame = max(server_data['frame_range'][0], worker.start_frame) + worker.end_frame = min(server_data['frame_range'][-1], worker.end_frame) + logger.info(f"Local job now rendering from {worker.start_frame} to {worker.end_frame}") + server_data['submission_results'] = worker.json() + + # check that job posts were all successful. + if not all(d.get('submission_results') is not None for d in subjob_servers): + raise ValueError("Failed to create all subjobs") # look into recalculating job #s and use exising jobs + + # start subjobs + logger.debug(f"Starting {len(subjob_servers) - 1} attempted subjobs") + for server_data in subjob_servers: + if server_data['hostname'] != local_hostname: + child_key = f"{server_data['submission_results']['id']}@{server_data['hostname']}" + worker.children[child_key] = server_data['submission_results'] + worker.name = f"{worker.name}[{worker.start_frame}-{worker.end_frame}]" + + except Exception as e: + # cancel all the subjobs + logger.error(f"Failed to split job into subjobs: {e}") + logger.debug(f"Cancelling {len(subjob_servers) - 1} attempted subjobs") + # [RenderServerProxy(hostname).cancel_job(results['id'], confirm=True) for hostname, results in + # submission_results.items()] # todo: fix this + + @staticmethod + def __create_subjob(job_data, local_hostname, project_path, server_data, server_hostname, worker): + subjob = job_data.copy() + subjob['name'] = f"{worker.name}[{server_data['frame_range'][0]}-{server_data['frame_range'][-1]}]" + subjob['parent'] = f"{worker.id}@{local_hostname}" + subjob['start_frame'] = server_data['frame_range'][0] + subjob['end_frame'] = server_data['frame_range'][-1] + logger.debug(f"Posting subjob with frames {subjob['start_frame']}-" + f"{subjob['end_frame']} to {server_hostname}") + post_results = RenderServerProxy(server_hostname).post_job_to_server( + file_path=project_path, job_list=[subjob]) + return post_results + + @staticmethod + def distribute_server_work(start_frame, end_frame, available_servers, method='cpu_count'): + """ + Splits the frame range among available servers proportionally based on their performance (CPU count). + + :param start_frame: int, The start frame number of the animation to be rendered. + :param end_frame: int, The end frame number of the animation to be rendered. + :param available_servers: list, A list of available server dictionaries. 
Each server dictionary should include + 'hostname' and 'cpu_count' keys (see find_available_servers) + :param method: str, Optional. Specifies the distribution method. Possible values are 'cpu_count' and 'equally' + + + :return: A list of server dictionaries where each dictionary includes the frame range and total number of frames + to be rendered by the server. + """ + + # Calculate respective frames for each server + def divide_frames_by_cpu_count(frame_start, frame_end, servers): + total_frames = frame_end - frame_start + 1 + total_performance = sum(server['cpu_count'] for server in servers) + + frame_ranges = {} + current_frame = frame_start + allocated_frames = 0 + + for i, server in enumerate(servers): + if i == len(servers) - 1: # if it's the last server + # Give all remaining frames to the last server + num_frames = total_frames - allocated_frames + else: + num_frames = round((server['cpu_count'] / total_performance) * total_frames) + allocated_frames += num_frames + + frame_end_for_server = current_frame + num_frames - 1 + + if current_frame <= frame_end_for_server: + frame_ranges[server['hostname']] = (current_frame, frame_end_for_server) + current_frame = frame_end_for_server + 1 + + return frame_ranges + + def divide_frames_equally(frame_start, frame_end, servers): + frame_range = frame_end - frame_start + 1 + frames_per_server = frame_range // len(servers) + leftover_frames = frame_range % len(servers) + + frame_ranges = {} + current_start = frame_start + for i, server in enumerate(servers): + current_end = current_start + frames_per_server - 1 + if leftover_frames > 0: + current_end += 1 + leftover_frames -= 1 + if current_start <= current_end: + frame_ranges[server['hostname']] = (current_start, current_end) + current_start = current_end + 1 + + return frame_ranges + + if method == 'equally': + breakdown = divide_frames_equally(start_frame, end_frame, available_servers) + # elif method == 'benchmark_score': # todo: implement benchmark score + # pass + else: + breakdown = divide_frames_by_cpu_count(start_frame, end_frame, available_servers) + + server_breakdown = [server for server in available_servers if breakdown.get(server['hostname']) is not None] + for server in server_breakdown: + server['frame_range'] = breakdown[server['hostname']] + server['total_frames'] = breakdown[server['hostname']][-1] - breakdown[server['hostname']][0] + 1 + return server_breakdown + + @staticmethod + def find_available_servers(renderer): + """ + Scan the Zeroconf network for currently available render servers supporting a specific renderer. 
+ + :param renderer: str, The renderer type to search for + :return: A list of dictionaries with each dict containing hostname and cpu_count of available servers + """ + available_servers = [] + for hostname in ZeroconfServer.found_clients(): + response = RenderServerProxy(hostname).get_status() + if response and response.get('renderers', {}).get(renderer, {}).get('is_available', False): + available_servers.append({'hostname': hostname, 'cpu_count': int(response['cpu_count'])}) + + return available_servers diff --git a/lib/render_queue.py b/lib/render_queue.py index 99770d8..d710148 100755 --- a/lib/render_queue.py +++ b/lib/render_queue.py @@ -4,8 +4,8 @@ from datetime import datetime from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker -from .workers.base_worker import RenderStatus, BaseRenderWorker, Base -from .workers.worker_factory import RenderWorkerFactory +from lib.workers.base_worker import RenderStatus, BaseRenderWorker, Base +from lib.workers.worker_factory import RenderWorkerFactory logger = logging.getLogger() @@ -28,6 +28,14 @@ class RenderQueue: def __init__(self): pass + @classmethod + def start_queue(cls): + cls.load_state() + + @classmethod + def job_status_change(cls, job_id, status): + logger.debug(f"Job status changed: {job_id} -> {status}") + @classmethod def add_to_render_queue(cls, render_job, force_start=False): logger.debug('Adding priority {} job to render queue: {}'.format(render_job.priority, render_job)) diff --git a/lib/server/api_server.py b/lib/server/api_server.py index 98ccbfd..e75b7ed 100755 --- a/lib/server/api_server.py +++ b/lib/server/api_server.py @@ -1,9 +1,13 @@ #!/usr/bin/env python3 import json +import logging +import os import pathlib import platform import shutil +import socket import ssl +import threading import time import zipfile from datetime import datetime @@ -11,13 +15,16 @@ from urllib.request import urlretrieve from zipfile import ZipFile import json2html +import psutil import yaml from flask import Flask, request, render_template, send_file, after_this_request, Response, redirect, url_for, abort from werkzeug.utils import secure_filename +from lib.distributed_job_manager import DistributedJobManager from lib.render_queue import RenderQueue, JobNotFoundError +from lib.server.server_proxy import RenderServerProxy from lib.server.zeroconf_server import ZeroconfServer -from lib.utilities.server_helper import * +from lib.utilities.server_helper import generate_thumbnail_for_job from lib.workers.base_worker import string_to_status, RenderStatus from lib.workers.worker_factory import RenderWorkerFactory @@ -157,6 +164,17 @@ def filtered_jobs_json(status_val): return f'Cannot find jobs with status {status_val}', 400 +@server.post('/api/job//notify_parent_of_status_change') +def subjob_status_change(job_id): + try: + subjob_details = request.json + logger.info(f"Subjob to job id: {job_id} is now {subjob_details['status']}") + DistributedJobManager.handle_subjob_status_change(RenderQueue.job_with_id(job_id), subjob_data=subjob_details) + return Response(status=200) + except JobNotFoundError: + return "Job not found", 404 + + @server.errorhandler(JobNotFoundError) def handle_job_not_found(job_error): return f'Cannot find job with ID {job_error.job_id}', 400 @@ -187,10 +205,12 @@ def get_file_list(job_id): def make_job_ready(job_id): try: found_job = RenderQueue.job_with_id(job_id) - if found_job.status in [RenderStatus.NOT_READY, RenderStatus.NOT_STARTED]: + if found_job.status in [RenderStatus.CONFIGURING, 
RenderStatus.NOT_STARTED]: if found_job.children: - for hostname, child_id in found_job.children.items(): - RenderServerProxy(hostname).request_data(f'/api/job//make_ready') + for child_key in found_job.children.keys(): + child_id = child_key.split('@')[0] + hostname = child_key.split('@')[-1] + RenderServerProxy(hostname).request_data(f'job/{child_id}/make_ready') found_job.status = RenderStatus.NOT_STARTED RenderQueue.save_state() return found_job.json(), 200 @@ -255,7 +275,7 @@ def snapshot(): @server.get('/api/_detected_clients') def detected_clients(): # todo: dev/debug only. Should not ship this - probably. - return server.config['ZEROCONF_SERVER'].found_clients() + return ZeroconfServer.found_clients() @server.post('/api/add_job') @@ -312,8 +332,9 @@ def add_job_handler(): return "Cannot find any valid project paths", 400 # prep local filepath - job_dir = os.path.join(server.config['UPLOAD_FOLDER'], '_'.join( - [datetime.now().strftime("%Y.%m.%d_%H.%M.%S"), renderer, os.path.splitext(referred_name)[0]])) + cleaned_path_name = os.path.splitext(referred_name)[0].replace(' ', '_') + job_dir = os.path.join(server.config['UPLOAD_FOLDER'], '-'.join( + [datetime.now().strftime("%Y.%m.%d_%H.%M.%S"), renderer, cleaned_path_name])) os.makedirs(job_dir, exist_ok=True) upload_dir = os.path.join(job_dir, 'source') os.makedirs(upload_dir, exist_ok=True) @@ -387,14 +408,15 @@ def add_job_handler(): # determine if we can / should split the job if server.config.get('enable_split_jobs', False) and (worker.total_frames > 1) and not worker.parent: - create_subjobs(worker, job_data, zip_path or loaded_project_local_path) + DistributedJobManager.split_into_subjobs(worker, job_data, zip_path or loaded_project_local_path) RenderQueue.add_to_render_queue(worker, force_start=job_data.get('force_start', False)) - make_job_ready(worker.id) + if not worker.parent: + make_job_ready(worker.id) results.append(worker.json()) except Exception as e: - err_msg = f"Error creating render job: {e}" - logger.error(err_msg) + err_msg = f"Exception creating render job: {e}" + logger.exception(err_msg) results.append({'error': err_msg}) # return any errors from results list @@ -413,61 +435,6 @@ def add_job_handler(): return 'unknown error', 500 -def create_subjobs(worker, job_data, project_path): - - # Check availablity - local_hostname = server.config['HOSTNAME'] - found_servers = [x for x in server.config['ZEROCONF_SERVER'].found_clients() if local_hostname not in x] - - subjob_servers = find_available_servers(found_servers, worker.renderer, worker.start_frame, worker.end_frame) - - # Prep and submit these sub-jobs - logger.info(f"Job {worker.id} split plan: {subjob_servers}") - submission_results = {} - try: - for server_data in subjob_servers: - server_hostname = server_data['hostname'] - if server_hostname != local_hostname: - subjob = job_data.copy() - subjob['name'] = f"{worker.name}[{server_data['frame_range'][0]}-{server_data['frame_range'][-1]}]" - subjob['parent'] = f"{worker.id}@{local_hostname}" - subjob['start_frame'] = server_data['frame_range'][0] - subjob['end_frame'] = server_data['frame_range'][-1] - - logger.debug(f"Posting subjob with frames {subjob['start_frame']}-" - f"{subjob['end_frame']} to {server_hostname}") - post_results = RenderServerProxy(server_hostname).post_job_to_server( - file_path=project_path, job_list=[subjob]) - if post_results.ok: - server_data['submission_results'] = post_results.json()[0] - else: - logger.error(f"Failed to create subjob on {server_hostname}") - break - else: - # 
truncate parent render_job - worker.start_frame = max(server_data['frame_range'][0], worker.start_frame) - worker.end_frame = min(server_data['frame_range'][-1], worker.end_frame) - logger.info(f"Local job now rendering from {worker.start_frame} to {worker.end_frame}") - server_data['submission_results'] = worker.json() - - # check that job posts were all successful. - if not all(d.get('submission_results') is not None for d in subjob_servers): - raise ValueError("Failed to create all subjobs") # look into recalculating job numbers and use exising jobs - - # start subjobs - logger.debug(f"Starting {len(subjob_servers) - 1} attempted subjobs") - for server_data in subjob_servers: - if server_data['hostname'] != local_hostname: - worker.children[server_data['hostname']] = server_data['submission_results']['id'] - worker.name = f"{worker.name}[{worker.start_frame}-{worker.end_frame}]" - - except Exception as e: - # cancel all the subjobs - logger.error(f"Failed to split job into subjobs: {e}") - logger.debug(f"Cancelling {len(subjob_servers) - 1} attempted subjobs") - [RenderServerProxy(hostname).cancel_job(results['id'], confirm=True) for hostname, results in submission_results.items()] - - @server.get('/api/job//cancel') def cancel_job(job_id): if not request.args.get('confirm', False): @@ -534,10 +501,11 @@ def status(): renderer_data = {} for render_class in RenderWorkerFactory.supported_classes(): - renderer_data[render_class.engine.name()] = \ - {'version': render_class.engine.version(), - 'is_ready': RenderQueue.is_available_for_job(render_class.engine.name()) - } + if render_class.engine.renderer_path(): # only return renderers installed on host + renderer_data[render_class.engine.name()] = \ + {'version': render_class.engine.version(), + 'is_available': RenderQueue.is_available_for_job(render_class.engine.name()) + } return {"timestamp": datetime.now().isoformat(), "platform": platform.platform(), @@ -559,12 +527,12 @@ def renderer_info(): renderer_data = {} for r in RenderWorkerFactory.supported_renderers(): engine = RenderWorkerFactory.class_for_name(r).engine - engine_available = engine.renderer_path() is not None - renderer_data[r] = {'available': engine_available, - 'version': engine.version() if engine_available else None, - 'supported_extensions': engine.supported_extensions, - 'supported_export_formats': engine.get_output_formats() if engine_available else None, - 'path': engine.renderer_path()} + if engine.renderer_path(): + renderer_data[r] = {'is_available': RenderQueue.is_available_for_job(engine.name()), + 'version': engine.version(), + 'supported_extensions': engine.supported_extensions, + 'supported_export_formats': engine.get_output_formats(), + 'path': engine.renderer_path()} return renderer_data @@ -602,15 +570,15 @@ def start_server(background_thread=False): flask_log.setLevel(config.get('flask_log_level', 'ERROR').upper()) # Set up the RenderQueue object - RenderQueue.load_state() + RenderQueue.start_queue() + DistributedJobManager.start() thread = threading.Thread(target=eval_loop, kwargs={'delay_sec': config.get('queue_eval_seconds', 1)}, daemon=True) thread.start() logging.info(f"Starting Zordon Render Server - Hostname: '{server.config['HOSTNAME']}:'") - zeroconf_server = ZeroconfServer("_zordon._tcp.local.", server.config['HOSTNAME'], server.config['PORT']) - zeroconf_server.start() - server.config['ZEROCONF_SERVER'] = zeroconf_server + ZeroconfServer.configure("_zordon._tcp.local.", server.config['HOSTNAME'], server.config['PORT']) + ZeroconfServer.start() 
try: if background_thread: @@ -623,4 +591,4 @@ def start_server(background_thread=False): use_reloader=False, threaded=True) finally: RenderQueue.save_state() - zeroconf_server.stop() + ZeroconfServer.stop() diff --git a/lib/server/server_proxy.py b/lib/server/server_proxy.py index ec55019..aaff5c4 100644 --- a/lib/server/server_proxy.py +++ b/lib/server/server_proxy.py @@ -10,9 +10,9 @@ from requests_toolbelt.multipart import MultipartEncoder, MultipartEncoderMonito status_colors = {RenderStatus.ERROR: "red", RenderStatus.CANCELLED: 'orange1', RenderStatus.COMPLETED: 'green', RenderStatus.NOT_STARTED: "yellow", RenderStatus.SCHEDULED: 'purple', - RenderStatus.RUNNING: 'cyan'} + RenderStatus.RUNNING: 'cyan', RenderStatus.WAITING_FOR_SUBJOBS: 'blue'} -categories = [RenderStatus.RUNNING, RenderStatus.WAITING, RenderStatus.ERROR, RenderStatus.NOT_STARTED, RenderStatus.SCHEDULED, +categories = [RenderStatus.RUNNING, RenderStatus.WAITING_FOR_SUBJOBS, RenderStatus.ERROR, RenderStatus.NOT_STARTED, RenderStatus.SCHEDULED, RenderStatus.COMPLETED, RenderStatus.CANCELLED, RenderStatus.UNDEFINED] logger = logging.getLogger() @@ -114,6 +114,10 @@ class RenderServerProxy: def get_status(self): return self.request_data('status') + def notify_parent_of_status_change(self, parent_id, subjob): + return requests.post(f'http://{self.hostname}:{self.port}/api/job/{parent_id}/notify_parent_of_status_change', + json=subjob.json()) + def post_job_to_server(self, file_path, job_list, callback=None): # bypass uploading file if posting to localhost diff --git a/lib/server/zeroconf_server.py b/lib/server/zeroconf_server.py index 80334bf..93014a4 100644 --- a/lib/server/zeroconf_server.py +++ b/lib/server/zeroconf_server.py @@ -6,68 +6,80 @@ from zeroconf import Zeroconf, ServiceInfo, ServiceBrowser, ServiceStateChange logger = logging.getLogger() -class ZeroconfServer(): - def __init__(self, service_type, server_name, server_port): - self.service_type = service_type - self.server_name = server_name - self.server_port = server_port - self.server_ip = None - self.zeroconf = Zeroconf() - self.service_info = None - self.client_cache = {} - self.properties = {} +class ZeroconfServer: + service_type = None + server_name = None + server_port = None + server_ip = None + zeroconf = Zeroconf() + service_info = None + client_cache = {} + properties = {} - def start(self, listen_only=False): + @classmethod + def configure(cls, service_type, server_name, server_port): + cls.service_type = service_type + cls.server_name = server_name + cls.server_port = server_port + + @classmethod + def start(cls, listen_only=False): if not listen_only: - self._register_service() - self._browse_services() + cls._register_service() + cls._browse_services() - def stop(self): - self._unregister_service() - self.zeroconf.close() + @classmethod + def stop(cls): + cls._unregister_service() + cls.zeroconf.close() - def _register_service(self): - self.server_ip = socket.gethostbyname(socket.gethostname()) + @classmethod + def _register_service(cls): + cls.server_ip = socket.gethostbyname(socket.gethostname()) info = ServiceInfo( - self.service_type, - f"{self.server_name}.{self.service_type}", - addresses=[socket.inet_aton(self.server_ip)], - port=self.server_port, - properties=self.properties, + cls.service_type, + f"{cls.server_name}.{cls.service_type}", + addresses=[socket.inet_aton(cls.server_ip)], + port=cls.server_port, + properties=cls.properties, ) - self.service_info = info - self.zeroconf.register_service(info) - logger.info(f"Registered 
zeroconf service: {self.service_info.name}") + cls.service_info = info + cls.zeroconf.register_service(info) + logger.info(f"Registered zeroconf service: {cls.service_info.name}") - def _unregister_service(self): - if self.service_info: - self.zeroconf.unregister_service(self.service_info) - logger.info(f"Unregistered zeroconf service: {self.service_info.name}") - self.service_info = None + @classmethod + def _unregister_service(cls): + if cls.service_info: + cls.zeroconf.unregister_service(cls.service_info) + logger.info(f"Unregistered zeroconf service: {cls.service_info.name}") + cls.service_info = None - def _browse_services(self): - browser = ServiceBrowser(self.zeroconf, self.service_type, [self._on_service_discovered]) + @classmethod + def _browse_services(cls): + browser = ServiceBrowser(cls.zeroconf, cls.service_type, [cls._on_service_discovered]) - def _on_service_discovered(self, zeroconf, service_type, name, state_change): + @classmethod + def _on_service_discovered(cls, zeroconf, service_type, name, state_change): info = zeroconf.get_service_info(service_type, name) logger.debug(f"Zeroconf: {name} {state_change}") - if service_type == self.service_type: + if service_type == cls.service_type: if state_change == ServiceStateChange.Added or state_change == ServiceStateChange.Updated: - self.client_cache[name] = info + cls.client_cache[name] = info else: - self.client_cache.pop(name) + cls.client_cache.pop(name) - def found_clients(self): - return [x.split(f'.{self.service_type}')[0] for x in self.client_cache.keys()] + @classmethod + def found_clients(cls): + return [x.split(f'.{cls.service_type}')[0] for x in cls.client_cache.keys()] # Example usage: if __name__ == "__main__": - server = ZeroconfServer("_zordon._tcp.local.", "foobar.local", 8080) + ZeroconfServer.configure("_zordon._tcp.local.", "foobar.local", 8080) try: - server.start() + ZeroconfServer.start() input("Server running - Press enter to end") finally: - server.stop() + ZeroconfServer.stop() diff --git a/lib/utilities/misc_helper.py b/lib/utilities/misc_helper.py index a5cc0af..15821e1 100644 --- a/lib/utilities/misc_helper.py +++ b/lib/utilities/misc_helper.py @@ -86,3 +86,20 @@ def get_time_elapsed(start_time=None, end_time=None): elapsed_time_string = strfdelta(elapsed_time) if elapsed_time else None return elapsed_time_string + + +def get_file_size_human(file_path): + size_in_bytes = os.path.getsize(file_path) + + # Convert size to a human readable format + if size_in_bytes < 1024: + return f"{size_in_bytes} B" + elif size_in_bytes < 1024 ** 2: + return f"{size_in_bytes / 1024:.2f} KB" + elif size_in_bytes < 1024 ** 3: + return f"{size_in_bytes / 1024 ** 2:.2f} MB" + elif size_in_bytes < 1024 ** 4: + return f"{size_in_bytes / 1024 ** 3:.2f} GB" + else: + return f"{size_in_bytes / 1024 ** 4:.2f} TB" + diff --git a/lib/utilities/server_helper.py b/lib/utilities/server_helper.py index ebad005..7c8bb33 100644 --- a/lib/utilities/server_helper.py +++ b/lib/utilities/server_helper.py @@ -49,75 +49,3 @@ def generate_thumbnail_for_job(job, thumb_video_path, thumb_image_path, max_widt if video_files and not os.path.exists(thumb_video_path): x = threading.Thread(target=generate_thumb_thread, args=(video_files[0],)) x.start() - - -def divide_frames_evenly(start_frame, end_frame, num_servers): - frame_range = end_frame - start_frame + 1 - frames_per_server = frame_range // num_servers - leftover_frames = frame_range % num_servers - - ranges = [] - current_start = start_frame - for i in range(num_servers): - current_end = 
current_start + frames_per_server - 1 - if leftover_frames > 0: - current_end += 1 - leftover_frames -= 1 - if current_start <= current_end: - ranges.append((current_start, current_end)) - current_start = current_end + 1 - - return ranges - - -def divide_frames_by_cpu_count(frame_start, frame_end, servers): - total_frames = frame_end - frame_start + 1 - total_performance = sum(server['cpu_count'] for server in servers) - - frame_ranges = {} - current_frame = frame_start - allocated_frames = 0 - - for i, server in enumerate(servers): - if i == len(servers) - 1: # if it's the last server - # Give all remaining frames to the last server - num_frames = total_frames - allocated_frames - else: - num_frames = round((server['cpu_count'] / total_performance) * total_frames) - allocated_frames += num_frames - - frame_end_for_server = current_frame + num_frames - 1 - - if current_frame <= frame_end_for_server: - frame_ranges[server['hostname']] = (current_frame, frame_end_for_server) - current_frame = frame_end_for_server + 1 - - return frame_ranges - - -def find_available_servers(server_list, renderer, start_frame, end_frame): - local_hostname = socket.gethostname() - subjob_servers = [{'hostname': local_hostname, 'cpu_count': psutil.cpu_count(logical=False)}] - for hostname in server_list: - if hostname != local_hostname: - response = RenderServerProxy(hostname).get_status() - if response and response.get('renderers', {}).get(renderer, {}).get('is_ready', False): - subjob_servers.append({'hostname': hostname, 'cpu_count': int(response['cpu_count'])}) - - if len(subjob_servers) == 1: - logger.debug("No available servers to split job with. Skipping subjob creation.") - return subjob_servers - - # Calculate respective frames for each server - breakdown = divide_frames_by_cpu_count(start_frame, end_frame, subjob_servers) - subjob_servers = [server for server in subjob_servers if breakdown.get(server['hostname']) is not None] - for server in subjob_servers: - server['frame_range'] = breakdown[server['hostname']] - server['total_frames'] = breakdown[server['hostname']][-1] - breakdown[server['hostname']][0] + 1 - - return subjob_servers - - -if __name__ == "__main__": - found_servers = ['kamino.local', 'deathstar.local'] - print(find_available_servers(found_servers, 'blender', 1, 5)) diff --git a/lib/workers/base_worker.py b/lib/workers/base_worker.py index 5367564..5eb73f0 100644 --- a/lib/workers/base_worker.py +++ b/lib/workers/base_worker.py @@ -5,12 +5,11 @@ import logging import os import subprocess import threading -import time -import zipfile from datetime import datetime from enum import Enum import psutil +from pubsub import pub from sqlalchemy import Column, Integer, String, DateTime, JSON from sqlalchemy.ext.declarative import declarative_base @@ -27,8 +26,8 @@ class RenderStatus(Enum): CANCELLED = "cancelled" ERROR = "error" SCHEDULED = "scheduled" - WAITING = "waiting" - NOT_READY = "not_ready" + WAITING_FOR_SUBJOBS = "waiting_for_subjobs" + CONFIGURING = "configuring" UNDEFINED = "undefined" @@ -101,7 +100,7 @@ class BaseRenderWorker(Base): self.end_time = None # History - self.status = RenderStatus.NOT_READY + self.status = RenderStatus.CONFIGURING self.warnings = [] self.errors = [] @@ -120,8 +119,11 @@ class BaseRenderWorker(Base): return self._status @status.setter - def status(self, value): - self._status = value.value + def status(self, new_status): + if self._status != new_status.value: + old_status = self._status + self._status = new_status.value + pub.sendMessage('status_change', 
job_id=self.id, old_status=old_status, new_status=new_status) @status.getter def status(self): @@ -230,45 +232,9 @@ class BaseRenderWorker(Base): logger.info(message) f.write(message) - from lib.server.server_proxy import RenderServerProxy - - # Wait on children jobs, if necessary if self.children: - self.status = RenderStatus.WAITING - subjobs_still_running = self.children.copy() - while len(subjobs_still_running): - for hostname, job_id in subjobs_still_running.copy().items(): - proxy = RenderServerProxy(hostname) - response = proxy.get_job_info(job_id) - if not response: - logger.warning(f"No response from: {hostname}") - else: - status = string_to_status(response.get('status', '')) - status_msg = f"Subjob {job_id}@{hostname} | Status: {status} | {response.get('percent_complete')}%" - - if status in [RenderStatus.CANCELLED, RenderStatus.ERROR, RenderStatus.COMPLETED]: - logger.info(f"Downloading completed subjob files from {hostname} to localhost") - try: - zip_file_path = self.output_path + f'_{hostname}_{job_id}.zip' - proxy.get_job_files(job_id, zip_file_path) - logger.debug("Zip file download successfully - Preparing to unzip.") - extract_path = os.path.dirname(zip_file_path) - with zipfile.ZipFile(zip_file_path, 'r') as zip_ref: - zip_ref.extractall(extract_path) - logger.info(f"Successfully extracted zip to: {extract_path}") - os.remove(zip_file_path) - except Exception as e: - err_msg = f"Error transferring output from subjob {job_id}@{hostname}: {e}" - logger.exception(err_msg) - self.errors.append(err_msg) - finally: - subjobs_still_running.pop(hostname) - else: - logger.debug(status_msg) - logger.debug(f"Waiting on {len(subjobs_still_running)} subjobs on {', '.join(list(subjobs_still_running.keys()))}") - time.sleep(5) - - logger.info("All subjobs complete") + from lib.distributed_job_manager import DistributedJobManager + DistributedJobManager.wait_for_subjobs(local_job=self) # Post Render Work logger.debug("Starting post-processing work") diff --git a/lib/workers/blender_worker.py b/lib/workers/blender_worker.py index 17a5062..a6771f2 100644 --- a/lib/workers/blender_worker.py +++ b/lib/workers/blender_worker.py @@ -124,12 +124,20 @@ class BlenderRenderWorker(BaseRenderWorker): output_dir_files = os.listdir(os.path.dirname(self.output_path)) if self.total_frames > 1 and len(output_dir_files) > 1 and not self.parent: logger.info("Generating preview for image sequence") + + # Calculate what the real start frame # is if we have child objects + start_frame = self.start_frame + if self.children: + min_child_frame = min(int(child["start_frame"]) for child in self.children.values()) + start_frame = min(min_child_frame, self.start_frame) + logger.debug(f"Post processing start frame #{start_frame}") + try: pattern = os.path.splitext(self.output_path)[0] + "_%04d" + most_common_extension(output_dir_files) image_sequence_to_video(source_glob_pattern=pattern, output_path=self.output_path + '.mov', framerate=self.scene_info['fps'], - start_frame=self.start_frame) + start_frame=start_frame) logger.info('Successfully generated preview video from image sequence') except Exception as e: logger.error(f'Error generating video from image sequence: {e}')
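
The sketches below are illustrative notes on the new distributed-job flow; they are not part of the patch, and any concrete hostnames, job ids, and ports in them are assumptions. First, the pub/sub wiring: BaseRenderWorker's status setter now publishes a 'status_change' message (only when the value actually changes), and DistributedJobManager.start() subscribes its handler to that topic. A minimal standalone sketch of the same wiring, with a made-up listener and values:

    from pubsub import pub

    # Listener with the keyword signature the worker publishes:
    # job_id, old_status, new_status. Hypothetical example listener.
    def on_status_change(job_id, old_status, new_status):
        print(f"job {job_id}: {old_status} -> {new_status}")

    pub.subscribe(on_status_change, 'status_change')

    # Publishing side, mirroring what BaseRenderWorker's status setter does
    # when the status actually changes (values here are illustrative):
    pub.sendMessage('status_change', job_id='f3a91c',
                    old_status='not_started', new_status='running')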
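Children are now keyed as 'id@hostname' (and a subjob's parent field uses the matching 'parent_id@hostname' form), so the same job id can exist on several hosts without colliding. A small illustration of how the patch builds and parses those keys; the id and hostname are hypothetical:

    subjob_id, subjob_hostname = 'f3a91c', 'kamino.local'
    child_key = f"{subjob_id}@{subjob_hostname}"     # 'f3a91c@kamino.local'

    # make_job_ready and wait_for_subjobs recover the parts the same way:
    child_id = child_key.split('@')[0]               # 'f3a91c'
    child_host = child_key.split('@')[-1]            # 'kamino.local'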
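When a local job that has a parent changes state, the status-change handler parses the parent key and calls RenderServerProxy.notify_parent_of_status_change, which POSTs the subjob's JSON to the new /api/job/<parent_id>/notify_parent_of_status_change endpoint on the parent's host. A sketch of that HTTP round trip using requests directly; the hostname, port, ids, and payload values are assumptions, and the payload shows only the fields handle_subjob_status_change and the wait loop actually read:

    import requests

    parent_id, parent_host, port = '7c2d4e', 'deathstar.local', 8080   # assumed values
    subjob_payload = {                 # subset of the subjob worker's JSON
        'id': 'f3a91c',
        'status': 'completed',
        'percent_complete': 1.0,
        'file_count': 240,
    }

    resp = requests.post(
        f"http://{parent_host}:{port}/api/job/{parent_id}/notify_parent_of_status_change",
        json=subjob_payload)
    # 200 on success; 404 if the parent job id is unknown on that host
    print(resp.status_code)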
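DistributedJobManager.distribute_server_work splits the frame range across the available servers in proportion to each server's cpu_count (or evenly with method='equally'), and the last server absorbs any rounding remainder. An in-repo usage sketch; the hostnames and CPU counts are made up, and the commented results are simply what the proportional arithmetic works out to for those inputs:

    from lib.distributed_job_manager import DistributedJobManager

    servers = [{'hostname': 'kamino.local', 'cpu_count': 16},
               {'hostname': 'deathstar.local', 'cpu_count': 8}]

    plan = DistributedJobManager.distribute_server_work(1, 240, servers)
    # 240 frames split 16:8 ->
    #   kamino.local:    frame_range (1, 160),   total_frames 160
    #   deathstar.local: frame_range (161, 240), total_frames 80

    plan_equal = DistributedJobManager.distribute_server_work(1, 240, servers, method='equally')
    # method='equally' -> (1, 120) and (121, 240), 120 frames each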