From 8a3e74660cb8ee7b769c649449cdf9de6b813add Mon Sep 17 00:00:00 2001
From: Brett
Date: Tue, 30 Jul 2024 19:22:38 -0500
Subject: [PATCH] Create subjobs after submission - #54 (#79)

* Force start in render queue only starts NOT_STARTED and SCHEDULED jobs
* Refactor adding jobs / subjobs
* Remove dead code
* Fixed issue with bulk job submission
* Cancel job now cancels all subjobs
* Misc fixes
* JSON now returns job hostname
* Add hostname as optional column in DB
* Misc fixes
* Error handling for removing zip file after download
* Clean up imports
* Fixed issue where worker child information would not be saved
---
 src/api/add_job_helpers.py      |  71 ---------------
 src/api/api_server.py           |  35 +++-----
 src/api/server_proxy.py         |   3 +-
 src/distributed_job_manager.py  | 148 ++++++++++++++++++++++++--------
 src/engines/core/base_worker.py |   8 +-
 src/engines/engine_manager.py   |   4 +-
 src/init.py                     |   9 +-
 src/render_queue.py             |   2 +-
 8 files changed, 138 insertions(+), 142 deletions(-)

diff --git a/src/api/add_job_helpers.py b/src/api/add_job_helpers.py
index fc7d910..755564a 100644
--- a/src/api/add_job_helpers.py
+++ b/src/api/add_job_helpers.py
@@ -10,10 +10,6 @@ import requests
 from tqdm import tqdm
 from werkzeug.utils import secure_filename
 
-from src.distributed_job_manager import DistributedJobManager
-from src.engines.engine_manager import EngineManager
-from src.render_queue import RenderQueue
-
 logger = logging.getLogger()
 
 
@@ -153,70 +149,3 @@ def process_zipped_project(zip_path):
         logger.error(f"Error processing zip file: {e}")
         raise ValueError(f"Error processing zip file: {e}")
     return extracted_project_path
-
-
-def create_render_jobs(jobs_list, loaded_project_local_path):
-    """
-    Creates render jobs.
-
-    This method takes a list of job data, a local path to a loaded project, and a job directory. It creates a render
-    job for each job data in the list and appends the result to a list. The list of results is then returned.
-
-    Args:
-        jobs_list (list): A list of job data.
-        loaded_project_local_path (str): The local path to the loaded project.
-
-    Returns:
-        list: A list of results from creating the render jobs.
- """ - results = [] - for job_data in jobs_list: - try: - # get new output path in output_dir - output_path = job_data.get('output_path') - if not output_path: - loaded_project_filename = os.path.basename(loaded_project_local_path) - output_filename = os.path.splitext(loaded_project_filename)[0] - else: - output_filename = os.path.basename(output_path) - - # Prepare output path - output_dir = os.path.join(os.path.dirname(os.path.dirname(loaded_project_local_path)), 'output') - output_path = os.path.join(output_dir, output_filename) - os.makedirs(output_dir, exist_ok=True) - logger.debug(f"New job output path: {output_path}") - - # create & configure jobs - worker = EngineManager.create_worker(renderer=job_data['renderer'], - input_path=loaded_project_local_path, - output_path=output_path, - engine_version=job_data.get('engine_version'), - args=job_data.get('args', {})) - worker.status = job_data.get("initial_status", worker.status) - worker.parent = job_data.get("parent", worker.parent) - worker.name = job_data.get("name", worker.name) - worker.priority = int(job_data.get('priority', worker.priority)) - worker.start_frame = int(job_data.get("start_frame", worker.start_frame)) - worker.end_frame = int(job_data.get("end_frame", worker.end_frame)) - - # determine if we can / should split the job - if job_data.get("enable_split_jobs", False) and (worker.total_frames > 1) and not worker.parent: - DistributedJobManager.split_into_subjobs(worker, job_data, loaded_project_local_path) - else: - logger.debug("Not splitting into subjobs") - - RenderQueue.add_to_render_queue(worker, force_start=job_data.get('force_start', False)) - if not worker.parent: - from src.api.api_server import make_job_ready - make_job_ready(worker.id) - results.append(worker.json()) - except FileNotFoundError as e: - err_msg = f"Cannot create job: {e}" - logger.error(err_msg) - results.append({'error': err_msg}) - except Exception as e: - err_msg = f"Exception creating render job: {e}" - logger.exception(err_msg) - results.append({'error': err_msg}) - - return results diff --git a/src/api/api_server.py b/src/api/api_server.py index aab98dc..8c9c418 100755 --- a/src/api/api_server.py +++ b/src/api/api_server.py @@ -17,7 +17,7 @@ import psutil import yaml from flask import Flask, request, send_file, after_this_request, Response, redirect, url_for, abort -from src.api.add_job_helpers import handle_uploaded_project_files, process_zipped_project, create_render_jobs +from src.api.add_job_helpers import handle_uploaded_project_files, process_zipped_project from src.api.serverproxy_manager import ServerProxyManager from src.distributed_job_manager import DistributedJobManager from src.engines.core.base_worker import string_to_status, RenderStatus @@ -182,24 +182,6 @@ def get_file_list(job_id): return RenderQueue.job_with_id(job_id).file_list() -@server.get('/api/job//make_ready') -def make_job_ready(job_id): - try: - found_job = RenderQueue.job_with_id(job_id) - if found_job.status in [RenderStatus.CONFIGURING, RenderStatus.NOT_STARTED]: - if found_job.children: - for child_key in found_job.children.keys(): - child_id = child_key.split('@')[0] - hostname = child_key.split('@')[-1] - ServerProxyManager.get_proxy_for_hostname(hostname).request_data(f'job/{child_id}/make_ready') - found_job.status = RenderStatus.NOT_STARTED - RenderQueue.save_state() - return found_job.json(), 200 - except Exception as e: - return f"Error making job ready: {e}", 500 - return "Not valid command", 405 - - @server.route('/api/job//download_all') def 
 def download_all(job_id):
     zip_filename = None
@@ -207,7 +189,10 @@ def download_all(job_id):
     @after_this_request
     def clear_zip(response):
         if zip_filename and os.path.exists(zip_filename):
-            os.remove(zip_filename)
+            try:
+                os.remove(zip_filename)
+            except Exception as e:
+                logger.warning(f"Error removing zip file '{zip_filename}': {e}")
         return response
 
     found_job = RenderQueue.job_with_id(job_id)
@@ -283,13 +268,13 @@ def add_job_handler():
         if loaded_project_local_path.lower().endswith('.zip'):
             loaded_project_local_path = process_zipped_project(loaded_project_local_path)
 
-        results = create_render_jobs(jobs_list, loaded_project_local_path)
-        for response in results:
-            if response.get('error', None):
-                return results, 400
+        results = []
+        for new_job_data in jobs_list:
+            new_job = DistributedJobManager.create_render_job(new_job_data, loaded_project_local_path)
+            results.append(new_job.json())
         return results, 200
     except Exception as e:
-        logger.exception(f"Unknown error adding job: {e}")
+        logger.exception(f"Error adding job: {e}")
         return 'unknown error', 500
 
 
diff --git a/src/api/server_proxy.py b/src/api/server_proxy.py
index 7aee89e..5f82431 100644
--- a/src/api/server_proxy.py
+++ b/src/api/server_proxy.py
@@ -17,7 +17,8 @@ status_colors = {RenderStatus.ERROR: "red", RenderStatus.CANCELLED: 'orange1', R
                  RenderStatus.RUNNING: 'cyan', RenderStatus.WAITING_FOR_SUBJOBS: 'blue'}
 
 categories = [RenderStatus.RUNNING, RenderStatus.WAITING_FOR_SUBJOBS, RenderStatus.ERROR, RenderStatus.NOT_STARTED,
-              RenderStatus.SCHEDULED, RenderStatus.COMPLETED, RenderStatus.CANCELLED, RenderStatus.UNDEFINED]
+              RenderStatus.SCHEDULED, RenderStatus.COMPLETED, RenderStatus.CANCELLED, RenderStatus.UNDEFINED,
+              RenderStatus.CONFIGURING]
 
 logger = logging.getLogger()
 OFFLINE_MAX = 4
diff --git a/src/distributed_job_manager.py b/src/distributed_job_manager.py
index 1ba9ac1..d18cdf4 100644
--- a/src/distributed_job_manager.py
+++ b/src/distributed_job_manager.py
@@ -1,15 +1,17 @@
 import logging
 import os
 import socket
+import threading
 import time
 import zipfile
+from concurrent.futures import ThreadPoolExecutor
 
 import requests
 from plyer import notification
 from pubsub import pub
-from concurrent.futures import ThreadPoolExecutor
 
 from src.api.server_proxy import RenderServerProxy
+from src.engines.engine_manager import EngineManager
 from src.render_queue import RenderQueue
 from src.utilities.misc_helper import get_file_size_human
 from src.utilities.status_utils import RenderStatus, string_to_status
@@ -86,6 +88,68 @@ class DistributedJobManager:
         except Exception as e:
             logger.debug(f"Unable to show UI notification: {e}")
 
+    # --------------------------------------------
+    # Create Job
+    # --------------------------------------------
+
+    @classmethod
+    def create_render_job(cls, job_data, loaded_project_local_path):
+        """
+        Creates a render job.
+
+        This method takes job data and the local path to a loaded project, creates and configures a render
+        worker, optionally splits it into subjobs, and adds it to the render queue.
+
+        Args:
+            job_data (dict): Job data.
+            loaded_project_local_path (str): The local path to the loaded project.
+
+        Returns:
+            worker: The created job worker.
+        """
+
+        # get new output path in output_dir
+        output_path = job_data.get('output_path')
+        if not output_path:
+            loaded_project_filename = os.path.basename(loaded_project_local_path)
+            output_filename = os.path.splitext(loaded_project_filename)[0]
+        else:
+            output_filename = os.path.basename(output_path)
+
+        # Prepare output path
+        output_dir = os.path.join(os.path.dirname(os.path.dirname(loaded_project_local_path)), 'output')
+        output_path = os.path.join(output_dir, output_filename)
+        os.makedirs(output_dir, exist_ok=True)
+        logger.debug(f"New job output path: {output_path}")
+
+        # create & configure jobs
+        worker = EngineManager.create_worker(renderer=job_data['renderer'],
+                                             input_path=loaded_project_local_path,
+                                             output_path=output_path,
+                                             engine_version=job_data.get('engine_version'),
+                                             args=job_data.get('args', {}))
+        worker.status = job_data.get("initial_status", worker.status)  # todo: is this necessary?
+        worker.parent = job_data.get("parent", worker.parent)
+        worker.name = job_data.get("name", worker.name)
+        worker.priority = int(job_data.get('priority', worker.priority))
+        worker.start_frame = int(job_data.get("start_frame", worker.start_frame))
+        worker.end_frame = int(job_data.get("end_frame", worker.end_frame))
+        worker.hostname = socket.gethostname()
+
+        # determine if we can / should split the job
+        if job_data.get("enable_split_jobs", False) and (worker.total_frames > 1) and not worker.parent:
+            cls.split_into_subjobs_async(worker, job_data, loaded_project_local_path)
+        else:
+            logger.debug("Not splitting into subjobs")
+
+        RenderQueue.add_to_render_queue(worker, force_start=job_data.get('force_start', False))
+
+        return worker
+
+    # --------------------------------------------
+    # Handling Subjobs
+    # --------------------------------------------
+
     @classmethod
     def handle_subjob_status_change(cls, local_job, subjob_data):
         """
@@ -142,7 +206,7 @@
                 RenderServerProxy(subjob_hostname).get_job_files(subjob_id, zip_file_path)
                 logger.info(f"File transfer complete for {logname} - Transferred {get_file_size_human(zip_file_path)}")
             except Exception as e:
-                logger.exception(f"Exception downloading files from remote server: {e}")
+                logger.error(f"Error downloading files from remote server: {e}")
                 local_job.children[child_key]['download_status'] = 'failed'
                 return False
@@ -218,8 +282,20 @@
                              f"{', '.join(list(subjobs_not_downloaded().keys()))}")
             time.sleep(5)
 
+    # --------------------------------------------
+    # Creating Subjobs
+    # --------------------------------------------
+
     @classmethod
-    def split_into_subjobs(cls, worker, job_data, project_path, system_os=None):
+    def split_into_subjobs_async(cls, parent_worker, job_data, project_path, system_os=None):
+        # todo: I don't love this
+        parent_worker.status = RenderStatus.CONFIGURING
+        cls.background_worker = threading.Thread(target=cls.split_into_subjobs, args=(parent_worker, job_data,
+                                                                                      project_path, system_os))
+        cls.background_worker.start()
+
+    @classmethod
+    def split_into_subjobs(cls, parent_worker, job_data, project_path, system_os=None, specific_servers=None):
         """
         Splits a job into subjobs and distributes them among available servers.
 
         This method checks the availability of servers, divides the work among them, and creates a subjob for each
         server.
 
         Args:
-            worker (Worker): The worker that is handling the job.
+            parent_worker (Worker): The worker that is handling the job.
             job_data (dict): The data for the job to be split.
             project_path (str): The path to the project associated with the job.
-            system_os (str, optional): The operating system of the servers. Defaults to None.
+            system_os (str, optional): The operating system of the servers. Default is any OS.
+            specific_servers (list, optional): List of specific servers to split work between. Defaults to all found.
         """
         # Check availability
-        available_servers = cls.find_available_servers(worker.renderer, system_os)
+        parent_worker.status = RenderStatus.CONFIGURING
+        available_servers = specific_servers if specific_servers else cls.find_available_servers(parent_worker.renderer, system_os)
         logger.debug(f"Splitting into subjobs - Available servers: {available_servers}")
-        subjob_servers = cls.distribute_server_work(worker.start_frame, worker.end_frame, available_servers)
-        local_hostname = socket.gethostname()
+        subjob_servers = cls.distribute_server_work(parent_worker.start_frame, parent_worker.end_frame, available_servers)
 
         # Prep and submit these sub-jobs
-        logger.info(f"Job {worker.id} split plan: {subjob_servers}")
+        logger.info(f"Job {parent_worker.id} split plan: {subjob_servers}")
         try:
-            for server_data in subjob_servers:
-                server_hostname = server_data['hostname']
-                if server_hostname != local_hostname:
-                    post_results = cls.__create_subjob(job_data, local_hostname, project_path, server_data,
-                                                       server_hostname, worker)
-                    if post_results.ok:
-                        server_data['submission_results'] = post_results.json()[0]
-                    else:
-                        logger.error(f"Failed to create subjob on {server_hostname}")
-                        break
+            for subjob_data in subjob_servers:
+                subjob_hostname = subjob_data['hostname']
+                if subjob_hostname != parent_worker.hostname:
+                    post_results = cls.__create_subjob(job_data, parent_worker.hostname, project_path, subjob_data,
+                                                       subjob_hostname, parent_worker)
+                    if not post_results.ok:
+                        raise ValueError(f"Failed to create subjob on {subjob_hostname}")
+
+                    # save child info
+                    submission_results = post_results.json()[0]
+                    child_key = f"{submission_results['id']}@{subjob_hostname}"
+                    parent_worker.children[child_key] = submission_results
                 else:
                     # truncate parent render_job
-                    worker.start_frame = max(server_data['frame_range'][0], worker.start_frame)
-                    worker.end_frame = min(server_data['frame_range'][-1], worker.end_frame)
-                    logger.info(f"Local job now rendering from {worker.start_frame} to {worker.end_frame}")
-                    server_data['submission_results'] = worker.json()
-
-            # check that job posts were all successful.
-            if not all(d.get('submission_results') is not None for d in subjob_servers):
-                raise ValueError("Failed to create all subjobs")  # look into recalculating job #s and use exising jobs
+                    parent_worker.start_frame = max(subjob_data['frame_range'][0], parent_worker.start_frame)
+                    parent_worker.end_frame = min(subjob_data['frame_range'][-1], parent_worker.end_frame)
+                    logger.info(f"Local job now rendering from {parent_worker.start_frame} to {parent_worker.end_frame}")
 
             # start subjobs
-            logger.debug(f"Starting {len(subjob_servers) - 1} attempted subjobs")
-            for server_data in subjob_servers:
-                if server_data['hostname'] != local_hostname:
-                    child_key = f"{server_data['submission_results']['id']}@{server_data['hostname']}"
-                    worker.children[child_key] = server_data['submission_results']
-            worker.name = f"{worker.name}[{worker.start_frame}-{worker.end_frame}]"
-
+            logger.debug(f"Created {len(subjob_servers) - 1} subjobs successfully")
+            parent_worker.name = f"{parent_worker.name}[{parent_worker.start_frame}-{parent_worker.end_frame}]"
+            parent_worker.status = RenderStatus.NOT_STARTED  # todo: this won't work with scheduled starts
         except Exception as e:
             # cancel all the subjobs
             logger.error(f"Failed to split job into subjobs: {e}")
             logger.debug(f"Cancelling {len(subjob_servers) - 1} attempted subjobs")
-            # [RenderServerProxy(hostname).cancel_job(results['id'], confirm=True) for hostname, results in
-            #  submission_results.items()]  # todo: fix this
+            RenderServerProxy(parent_worker.hostname).cancel_job(parent_worker.id, confirm=True)
 
     @staticmethod
     def __create_subjob(job_data, local_hostname, project_path, server_data, server_hostname, worker):
@@ -292,6 +362,10 @@
                                                                   file_path=project_path, job_list=[subjob])
         return post_results
 
+    # --------------------------------------------
+    # Server Handling
+    # --------------------------------------------
+
     @staticmethod
     def distribute_server_work(start_frame, end_frame, available_servers, method='cpu_benchmark'):
         """
@@ -440,7 +514,7 @@ if __name__ == '__main__':
     print("Starting Zeroconf...")
     time.sleep(2)
     available_servers = DistributedJobManager.find_available_servers('blender')
-    print(f"AVAILABLE SERVERS: {available_servers}")
+    print(f"AVAILABLE SERVERS ({len(available_servers)}): {available_servers}")
     results = DistributedJobManager.distribute_server_work(1, 100, available_servers)
     print(f"RESULTS: {results}")
     ZeroconfServer.stop()
diff --git a/src/engines/core/base_worker.py b/src/engines/core/base_worker.py
index f5132b4..70cafb3 100644
--- a/src/engines/core/base_worker.py
+++ b/src/engines/core/base_worker.py
@@ -11,6 +11,7 @@ import psutil
 from pubsub import pub
 from sqlalchemy import Column, Integer, String, DateTime, JSON
 from sqlalchemy.ext.declarative import declarative_base
+from sqlalchemy.ext.mutable import MutableDict
 
 from src.utilities.misc_helper import get_time_elapsed
 from src.utilities.status_utils import RenderStatus, string_to_status
@@ -23,6 +24,7 @@ class BaseRenderWorker(Base):
     __tablename__ = 'render_workers'
 
     id = Column(String, primary_key=True)
+    hostname = Column(String, nullable=True)
     input_path = Column(String)
     output_path = Column(String)
     date_created = Column(DateTime)
@@ -36,7 +38,7 @@ class BaseRenderWorker(Base):
     start_frame = Column(Integer)
     end_frame = Column(Integer, nullable=True)
     parent = Column(String, nullable=True)
-    children = Column(JSON)
+    children = Column(MutableDict.as_mutable(JSON))
     name = Column(String)
     file_hash = Column(String)
     _status = Column(String)
@@ -60,6 +62,7 @@ class BaseRenderWorker(Base):
 
         # Essential Info
         self.id = generate_id()
+        self.hostname = None
         self.input_path = input_path
         self.output_path = output_path
         self.args = args or {}
@@ -85,7 +88,7 @@ class BaseRenderWorker(Base):
         self.end_time = None
 
         # History
-        self.status = RenderStatus.CONFIGURING
+        self.status = RenderStatus.NOT_STARTED
         self.warnings = []
         self.errors = []
 
@@ -306,6 +309,7 @@ class BaseRenderWorker(Base):
         job_dict = {
             'id': self.id,
             'name': self.name,
+            'hostname': self.hostname,
             'input_path': self.input_path,
             'output_path': self.output_path,
             'priority': self.priority,
diff --git a/src/engines/engine_manager.py b/src/engines/engine_manager.py
index 2274557..de7ba49 100644
--- a/src/engines/engine_manager.py
+++ b/src/engines/engine_manager.py
@@ -208,7 +208,7 @@ class EngineManager:
         worker_class = cls.engine_with_name(renderer).worker_class()
 
         # check to make sure we have versions installed
-        all_versions = EngineManager.all_versions_for_engine(renderer)
+        all_versions = cls.all_versions_for_engine(renderer)
         if not all_versions:
             raise FileNotFoundError(f"Cannot find any installed {renderer} engines")
 
@@ -222,7 +222,7 @@
 
         # Download the required engine if not found locally
         if not engine_path:
-            download_result = EngineManager.download_engine(renderer, engine_version)
+            download_result = cls.download_engine(renderer, engine_version)
             if not download_result:
                 raise FileNotFoundError(f"Cannot download requested version: {renderer} {engine_version}")
             engine_path = download_result['path']
diff --git a/src/init.py b/src/init.py
index 7026a37..358a374 100644
--- a/src/init.py
+++ b/src/init.py
@@ -8,10 +8,10 @@ from collections import deque
 from PyQt6.QtCore import QObject, pyqtSignal
 from PyQt6.QtWidgets import QApplication
 
-from .render_queue import RenderQueue
-from .ui.main_window import MainWindow
-
 from src.api.api_server import start_server
+from src.engines.engine_manager import EngineManager
+from src.render_queue import RenderQueue
+from src.ui.main_window import MainWindow
 from src.utilities.config import Config
 from src.utilities.misc_helper import system_safe_path
@@ -28,6 +28,9 @@ def run() -> int:
     # Load Config YAML
     Config.setup_config_dir()
     Config.load_config(system_safe_path(os.path.join(Config.config_dir(), 'config.yaml')))
+    EngineManager.engines_path = system_safe_path(
+        os.path.join(os.path.expanduser(Config.upload_folder),
+                     'engines'))
     logging.basicConfig(format='%(asctime)s: %(levelname)s: %(module)s: %(message)s', datefmt='%d-%b-%y %H:%M:%S',
                         level=Config.server_log_level.upper())
 
diff --git a/src/render_queue.py b/src/render_queue.py
index 4507b38..884b7a3 100755
--- a/src/render_queue.py
+++ b/src/render_queue.py
@@ -32,7 +32,7 @@ class RenderQueue:
     def add_to_render_queue(cls, render_job, force_start=False):
         logger.debug('Adding priority {} job to render queue: {}'.format(render_job.priority, render_job))
         cls.job_queue.append(render_job)
-        if force_start:
+        if force_start and render_job.status in (RenderStatus.NOT_STARTED, RenderStatus.SCHEDULED):
             cls.start_job(render_job)
         cls.session.add(render_job)
         cls.save_state()
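
A note on the submit-then-split flow this patch introduces: create_render_job returns the parent worker to the API caller right away, while split_into_subjobs_async parks it in CONFIGURING and a background thread flips it to NOT_STARTED once the split plan is in place. Combined with the new guard in RenderQueue.add_to_render_queue (force_start only acts on NOT_STARTED or SCHEDULED jobs), a parent cannot be force-started mid-split. A minimal sketch of that interplay, using stand-in Status/Job types rather than the repo's classes:

    import threading
    import time
    from enum import Enum

    class Status(Enum):
        CONFIGURING = 'configuring'
        NOT_STARTED = 'not_started'
        SCHEDULED = 'scheduled'

    class Job:
        def __init__(self):
            self.status = Status.CONFIGURING  # parked until the split completes

    def fake_split(job):
        time.sleep(0.5)                  # stand-in for contacting remote servers
        job.status = Status.NOT_STARTED  # only now is the job eligible to start

    def add_to_queue(job, force_start):
        # mirrors the new guard in RenderQueue.add_to_render_queue
        if force_start and job.status in (Status.NOT_STARTED, Status.SCHEDULED):
            print('starting job')
        else:
            print(f'queued, not started (status={job.status.name})')

    job = Job()
    threading.Thread(target=fake_split, args=(job,), daemon=True).start()
    add_to_queue(job, force_start=True)  # queued, not started: split still running

The status flip at the end of the real split_into_subjobs is what replaces the removed make_job_ready round-trip.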
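
Throughout the patch a parent tracks each remote subjob in its children dict under a "<subjob_id>@<hostname>" key, built as f"{submission_results['id']}@{subjob_hostname}" on creation and taken apart with child_key.split('@') when forwarding requests. A tiny illustration of the convention, with made-up values:

    def make_child_key(subjob_id, hostname):
        # composition used when saving child info after a subjob is created
        return f"{subjob_id}@{hostname}"

    def parse_child_key(child_key):
        # mirrors child_key.split('@')[0] / [-1] in the status handlers
        subjob_id, _, hostname = child_key.partition('@')
        return subjob_id, hostname

    key = make_child_key('abc123', 'render-node-2')
    assert parse_child_key(key) == ('abc123', 'render-node-2')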
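
distribute_server_work(start_frame, end_frame, available_servers, method='cpu_benchmark') is not shown in this diff, so the sketch below is only a guess at what benchmark-weighted splitting looks like; the cpu_benchmark key, the rounding, and the remainder handling are assumptions, not the repo's code:

    def distribute_by_benchmark(start_frame, end_frame, servers):
        """Split an inclusive frame range in proportion to each server's score."""
        total_frames = end_frame - start_frame + 1
        total_score = sum(s['cpu_benchmark'] for s in servers)
        plan, cursor = [], start_frame
        for i, server in enumerate(servers):
            share = round(total_frames * server['cpu_benchmark'] / total_score)
            # the last server absorbs any rounding remainder
            last = end_frame if i == len(servers) - 1 else min(cursor + share - 1, end_frame)
            plan.append({'hostname': server['hostname'], 'frame_range': (cursor, last)})
            cursor = last + 1
        return plan

    print(distribute_by_benchmark(1, 100, [{'hostname': 'fast-node', 'cpu_benchmark': 300},
                                           {'hostname': 'slow-node', 'cpu_benchmark': 100}]))
    # [{'hostname': 'fast-node', 'frame_range': (1, 75)},
    #  {'hostname': 'slow-node', 'frame_range': (76, 100)}]

Whatever the real weighting, the parent keeps only its own slice afterwards, which is why split_into_subjobs truncates the local job's start_frame/end_frame.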
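
Finally, the bullet "Fixed issue where worker child information would not be saved" comes down to children = Column(MutableDict.as_mutable(JSON)): a plain JSON column never sees in-place mutations such as parent_worker.children[child_key] = submission_results, so those writes were silently dropped at commit time. A minimal sketch, assuming SQLAlchemy 1.4+ and a throwaway Demo model:

    from sqlalchemy import JSON, Column, Integer, create_engine
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy.ext.mutable import MutableDict
    from sqlalchemy.orm import Session

    Base = declarative_base()

    class Demo(Base):
        __tablename__ = 'demo'
        id = Column(Integer, primary_key=True)
        plain = Column(JSON)                            # no mutation events
        tracked = Column(MutableDict.as_mutable(JSON))  # emits change events

    engine = create_engine('sqlite://')
    Base.metadata.create_all(engine)

    with Session(engine) as session:
        row = Demo(plain={}, tracked={})
        session.add(row)
        session.commit()

        row.plain['a'] = 1    # in-place edit: SQLAlchemy never notices
        print(session.dirty)  # IdentitySet([]) - would be lost on commit

        row.tracked['a'] = 1  # MutableDict marks the attribute dirty
        print(session.dirty)  # IdentitySet([<Demo ...>]) - persisted on commit
        session.commit()

Note that MutableDict only tracks top-level keys; nested structures need nested mutable types or an explicit flag_modified() call.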