diff --git a/src/distributed_job_manager.py b/src/distributed_job_manager.py
index 1120fd9..203e90a 100644
--- a/src/distributed_job_manager.py
+++ b/src/distributed_job_manager.py
@@ -88,32 +88,71 @@ class DistributedJobManager:
 
         Parameters:
             local_job (BaseRenderWorker): The local parent job worker.
-            subjob_data (dict): subjob data sent from remote server.
+            subjob_data (dict): Subjob data sent from the remote server.
 
         Returns:
             None
         """
 
-        subjob_status = string_to_status(subjob_data['status'])
         subjob_id = subjob_data['id']
         subjob_hostname = next((hostname.split('@')[1] for hostname in local_job.children if
                                 hostname.split('@')[0] == subjob_id), None)
-        local_job.children[f'{subjob_id}@{subjob_hostname}'] = subjob_data
+        subjob_key = f'{subjob_id}@{subjob_hostname}'
 
-        logname = f"{local_job.id}:{subjob_id}@{subjob_hostname}"
+        # Update the local job's subjob data
+        local_job.children[subjob_key] = subjob_data
+
+        logname = f"{local_job.id}:{subjob_key}"
+        subjob_status = string_to_status(subjob_data['status'])
         logger.debug(f"Subjob status changed: {logname} -> {subjob_status.value}")
 
-        # Download complete or partial render jobs
-        if subjob_status in [RenderStatus.COMPLETED, RenderStatus.CANCELLED, RenderStatus.ERROR] and \
-                subjob_data['file_count']:
-            download_result = cls.download_from_subjob(local_job, subjob_id, subjob_hostname)
-            if not download_result:
-                # todo: handle error
+        # Handle downloading for completed, cancelled, or error'd subjobs
+        if (subjob_status in [RenderStatus.COMPLETED, RenderStatus.CANCELLED, RenderStatus.ERROR]
+                and subjob_data['file_count']):
+            if not cls.download_from_subjob(local_job, subjob_id, subjob_hostname):
                 logger.error(f"Unable to download subjob files from {logname} with status {subjob_status.value}")
 
+        # Handle cancelled or errored subjobs by determining missing frames and scheduling a new job
         if subjob_status == RenderStatus.CANCELLED or subjob_status == RenderStatus.ERROR:
-            # todo: determine missing frames and schedule new job
-            pass
+            cls.handle_cancelled_or_errored_subjob(local_job, subjob_data)
+
+    @classmethod
+    def handle_cancelled_or_errored_subjob(cls, local_job, subjob_data):
+        """
+        Handles cancelled or errored subjobs by determining missing frames and scheduling a new job.
+
+        Parameters:
+            local_job (BaseRenderWorker): The local parent job worker.
+            subjob_data (dict): Subjob data for the cancelled or errored subjob.
+
+        Returns:
+            None
+        """
+        # Determine missing frames based on subjob_data
+        missing_frames = cls.determine_missing_frames(subjob_data)
+
+        if missing_frames:
+            # Schedule a new job with the missing frames
+            new_job_data = {
+                # todo: Set the necessary data for the new job
+            }
+            cls.__create_subjob()
+
+    @staticmethod
+    def determine_missing_frames(subjob_data):
+        """
+        Determine missing frames in the subjob.
+
+        Parameters:
+            subjob_data (dict): Subjob data.
+
+        Returns:
+            list: List of missing frame numbers.
+        """
+        # todo: Implement the logic to determine missing frames based on subjob_data
+        missing_frames = []
+        return missing_frames
+
 
     @staticmethod
     def download_from_subjob(local_job, subjob_id, subjob_hostname):
@@ -247,6 +286,7 @@ class DistributedJobManager:
 
         # check that job posts were all successful.
         if not all(d.get('submission_results') is not None for d in subjob_servers):
+            # todo: rewrite this code - should not have to have all submissions go through
             raise ValueError("Failed to create all subjobs")  # look into recalculating job #s and use exising jobs
 
         # start subjobs