From 0fe50bc1755778a888bbc5367e9a42b0ffb57064 Mon Sep 17 00:00:00 2001 From: Brett Williams Date: Wed, 25 Oct 2023 18:53:41 -0500 Subject: [PATCH] Misc cleanup --- src/api/add_job_helpers.py | 2 +- src/distributed_job_manager.py | 64 ++++++++++++++------------------- src/engines/core/base_worker.py | 2 +- src/render_queue.py | 10 +++--- 4 files changed, 35 insertions(+), 43 deletions(-) diff --git a/src/api/add_job_helpers.py b/src/api/add_job_helpers.py index 28280b3..39dd590 100644 --- a/src/api/add_job_helpers.py +++ b/src/api/add_job_helpers.py @@ -166,8 +166,8 @@ def create_render_jobs(jobs_list, loaded_project_local_path, job_dir, enable_spl DistributedJobManager.split_into_subjobs(worker, job_data, loaded_project_local_path) else: logger.debug("Not splitting into subjobs") + RenderQueue.add_to_render_queue(worker, force_start=job_data.get('force_start', False)) - RenderQueue.add_to_render_queue(worker, force_start=job_data.get('force_start', False)) if not worker.parent: from src.api.api_server import make_job_ready make_job_ready(worker.id) diff --git a/src/distributed_job_manager.py b/src/distributed_job_manager.py index 5444cb2..9791ffc 100644 --- a/src/distributed_job_manager.py +++ b/src/distributed_job_manager.py @@ -82,7 +82,7 @@ class DistributedJobManager: ) @classmethod - def handle_subjob_status_change(cls, local_job_id, subjob_data): + def handle_subjob_status_change(cls, parent_job_id, subjob_data): """ Responds to a status change from a remote subjob and triggers the creation or modification of subjobs as needed. @@ -93,54 +93,35 @@ class DistributedJobManager: Returns: None """ - local_job = RenderQueue.job_with_id(local_job_id) + parent_job = RenderQueue.job_with_id(parent_job_id) subjob_id = subjob_data['id'] - subjob_hostname = next((hostname.split('@')[1] for hostname in local_job.children if + subjob_hostname = next((hostname.split('@')[1] for hostname in parent_job.children if hostname.split('@')[0] == subjob_id), None) subjob_key = f'{subjob_id}@{subjob_hostname}' # Update the local job's subjob data - local_job.children = dict(local_job.children) # copy as dict to work around sqlalchemy update issue - local_job.children[subjob_key] = subjob_data + parent_job.children = dict(parent_job.children) # copy as dict to work around sqlalchemy update issue + parent_job.children[subjob_key] = subjob_data - logname = f"{local_job_id}:{subjob_key}" + logname = f"{parent_job_id}:{subjob_key}" subjob_status = string_to_status(subjob_data['status']) logger.debug(f"Subjob status changed: {logname} -> {subjob_status.value}") # Handle downloading for completed, cancelled, or error'd subjobs if (subjob_status in [RenderStatus.COMPLETED, RenderStatus.CANCELLED, RenderStatus.ERROR] and subjob_data['file_count']): - if not cls.download_from_subjob(local_job, subjob_id, subjob_hostname): + if not cls.download_from_subjob(parent_job, subjob_id, subjob_hostname): logger.error(f"Unable to download subjob files from {logname} with status {subjob_status.value}") # Handle cancelled or errored subjobs by determining missing frames and scheduling a new job if subjob_status == RenderStatus.CANCELLED or subjob_status == RenderStatus.ERROR: - cls.handle_cancelled_or_errored_subjob(local_job, subjob_data) - - @classmethod - def handle_cancelled_or_errored_subjob(cls, local_job, subjob_data): - """ - Handles cancelled or errored subjobs by determining missing frames and scheduling a new job. - - Parameters: - local_job (BaseRenderWorker): The local parent job worker. - subjob_data (dict): Subjob data for the cancelled or errored subjob. - - Returns: - None - """ - # Determine missing frames based on subjob_data - missing_frames = cls.determine_missing_frames(subjob_data) - - if missing_frames: - # Schedule a new job with the missing frames - new_job_data = { - # todo: Set the necessary data for the new job - } - cls.__create_subjob() + logger.info("Creating a new subjob") + cls.new_create_subjob(parent_job.id, socket.gethostname(), + parent_job.children[subjob_key]['start_frame'], + parent_job.children[subjob_key]['end_frame']) @staticmethod - def determine_missing_frames(subjob_data): + def determine_missing_frames(parent_job_id): """ Determine missing frames in the subjob. @@ -175,13 +156,11 @@ class DistributedJobManager: # download zip file from server try: - local_job.children[child_key]['download_status'] = 'working' logger.info(f"Downloading completed subjob files from {subjob_hostname} to localhost") RenderServerProxy(subjob_hostname).get_job_files(subjob_id, zip_file_path) logger.info(f"File transfer complete for {logname} - Transferred {get_file_size_human(zip_file_path)}") except Exception as e: logger.exception(f"Exception downloading files from remote server: {e}") - local_job.children[child_key]['download_status'] = 'failed' return False # extract zip @@ -192,15 +171,26 @@ class DistributedJobManager: zip_ref.extractall(extract_path) logger.info(f"Successfully extracted zip to: {extract_path}") os.remove(zip_file_path) - local_job.children[child_key]['download_status'] = 'complete' + return True except Exception as e: logger.exception(f"Exception extracting zip file: {e}") - local_job.children[child_key]['download_status'] = 'failed' + return False - return local_job.children[child_key].get('download_status', None) == 'complete' @classmethod def wait_for_subjobs(cls, parent_job): + """ + Wait for subjobs to complete and update the parent job's status. + + This method continuously checks the status of subjobs until all of them are either completed, canceled, or in error + status. It updates the parent job's children with the latest subjob information. + + Parameters: + parent_job (BaseRenderWorker): The parent job worker. + + Returns: + None + """ logger.debug(f"Waiting for subjobs for job {parent_job}") parent_job.status = RenderStatus.WAITING_FOR_SUBJOBS @@ -330,7 +320,7 @@ class DistributedJobManager: subjob_data = {'renderer': parent_job.engine.name(), 'input_path': parent_job.input_path, 'args': parent_job.args, 'output_path': parent_job.output_path, 'engine_version': parent_job.renderer_version, 'start_frame': start_frame, - 'end_frame': end_frame, 'parent': f"{parent_job_id}@{socket.gethostname()}"} + 'end_frame': end_frame, 'parent': f"{parent_job_id}@{local_hostname}"} logger.debug(f"new subjob data: {subjob_data}") diff --git a/src/engines/core/base_worker.py b/src/engines/core/base_worker.py index c3ccbf9..3902b6c 100644 --- a/src/engines/core/base_worker.py +++ b/src/engines/core/base_worker.py @@ -237,7 +237,7 @@ class BaseRenderWorker(Base): if self.children: from src.distributed_job_manager import DistributedJobManager - DistributedJobManager.wait_for_subjobs(local_job=self) + DistributedJobManager.wait_for_subjobs(parent_job=self) # Post Render Work logger.debug("Starting post-processing work") diff --git a/src/render_queue.py b/src/render_queue.py index 2a081b7..702ce5e 100755 --- a/src/render_queue.py +++ b/src/render_queue.py @@ -65,10 +65,12 @@ class RenderQueue: @classmethod def job_with_id(cls, job_id, none_ok=False): - found_job = next((x for x in cls.all_jobs() if x.id == job_id), None) - if not found_job and not none_ok: - raise JobNotFoundError(job_id) - return found_job + for job in cls.all_jobs(): + if job.id == job_id: + return job + if not none_ok: + raise JobNotFoundError(f"Cannot find job with id: {job_id}") + return None @classmethod def clear_history(cls):