From a5e9ac0014544b444b0d6d0edfe28bcbdc1200aa Mon Sep 17 00:00:00 2001
From: Brett Williams
Date: Wed, 25 Oct 2023 15:54:14 -0500
Subject: [PATCH] wait_for_subjobs rewrite

---
 src/distributed_job_manager.py | 80 ++++++++++++++++++----------------
 1 file changed, 43 insertions(+), 37 deletions(-)

diff --git a/src/distributed_job_manager.py b/src/distributed_job_manager.py
index 4180ab3..39dd3de 100644
--- a/src/distributed_job_manager.py
+++ b/src/distributed_job_manager.py
@@ -203,60 +203,66 @@ class DistributedJobManager:
         return local_job.children[child_key].get('download_status', None) == 'complete'
 
     @classmethod
-    def wait_for_subjobs(cls, local_job):
-        logger.debug(f"Waiting for subjobs for job {local_job}")
-        local_job.status = RenderStatus.WAITING_FOR_SUBJOBS
-        statuses_to_download = [RenderStatus.CANCELLED, RenderStatus.ERROR, RenderStatus.COMPLETED]
+    def wait_for_subjobs(cls, parent_job):
+        logger.debug(f"Waiting for subjobs for job {parent_job}")
+        parent_job.status = RenderStatus.WAITING_FOR_SUBJOBS
 
-        def subjobs_not_downloaded():
-            return {k: v for k, v in local_job.children.items() if 'download_status' not in v or
-                    v['download_status'] == 'working' or v['download_status'] is None}
+        server_proxys = {}
+        def incomplete_subjobs():
+            incomplete = {}
+            for child in parent_job.children.keys():
+                subjob_id, subjob_hostname = child.split('@')
+                if not server_proxys.get(subjob_hostname):
+                    server_proxys[subjob_hostname] = RenderServerProxy(subjob_hostname)
+                proxy = server_proxys.get(subjob_hostname)
+                job_info = proxy.get_job_info(subjob_id)
+                if job_info and job_info.get('status') not in ['completed', 'cancelled', 'error']:
+                    incomplete[child] = job_info
+            return incomplete
 
-        logger.info(f'Waiting on {len(subjobs_not_downloaded())} subjobs for {local_job.id}')
+        logger.info(f'Waiting on {len(incomplete_subjobs())} subjobs for {parent_job.id}')
+        while len(incomplete_subjobs()):
 
-        while len(subjobs_not_downloaded()):
-            for child_key, subjob_cached_data in subjobs_not_downloaded().items():
+            for child_key, subjob_cached_data in incomplete_subjobs().items():
                 subjob_id = child_key.split('@')[0]
                 subjob_hostname = child_key.split('@')[-1]
+                if not server_proxys.get(subjob_hostname):
+                    server_proxys[subjob_hostname] = RenderServerProxy(subjob_hostname)
+
                 # Fetch info from server and handle failing case
-                subjob_data = RenderServerProxy(subjob_hostname).get_job_info(subjob_id)
+                subjob_data = server_proxys[subjob_hostname].get_job_info(subjob_id)
                 if not subjob_data:
-                    logger.warning(f"No response from: {subjob_hostname}")
-                    # todo: handle timeout / missing server situations
+                    last_connection = datetime.datetime.now() - server_proxys[subjob_hostname].last_contact
+                    logger.warning(f"No response from: {subjob_hostname} - Last connection: {last_connection}")
+
+                    last_connection_max_time = 12
+                    if last_connection.seconds > last_connection_max_time:
+                        # After this long offline, consider the child's render failed
+                        logger.error(f"{subjob_hostname} has been offline for over {last_connection_max_time} seconds - Assuming render failed")
+                        logger.warning("Spinning up new subjob to replace offline server")
+                        parent_job.children[child_key]['errors'] = ['Renderer went offline']
+
+                        # Schedule a new subjob on this host to replace the failed one
+                        local_hostname = socket.gethostname()
+                        cls.new_create_subjob(parent_job.id, local_hostname,
+                                              parent_job.children[child_key]['start_frame'],
+                                              parent_job.children[child_key]['end_frame'])
                    continue
 
-                # Update parent job cache but keep the download status
-                download_status = local_job.children[child_key].get('download_status', None)
-                local_job.children[child_key] = subjob_data
-                local_job.children[child_key]['download_status'] = download_status
+                # Update parent job cache
+                parent_job.children[child_key] = subjob_data
 
+                # Log subjob status and completion percentage
                 status = string_to_status(subjob_data.get('status', ''))
                 status_msg = f"Subjob {child_key} | {status} | " \
                              f"{float(subjob_data.get('percent_complete')) * 100.0}%"
                 logger.debug(status_msg)
 
-                # Still working in another thread - keep waiting
-                if download_status == 'working':
-                    continue
-
-                # Check if job is finished, but has not had files copied yet over yet
-                if download_status is None and subjob_data['file_count'] and status in statuses_to_download:
-                    download_result = cls.download_from_subjob(local_job, subjob_id, subjob_hostname)
-                    if not download_result:
-                        logger.error("Failed to download from subjob")
-                        # todo: error handling here
-
-                # Any finished jobs not successfully downloaded at this point are skipped
-                if local_job.children[child_key].get('download_status', None) is None and \
-                        status in statuses_to_download:
-                    logger.warning(f"Skipping waiting on downloading from subjob: {child_key}")
-                    local_job.children[child_key]['download_status'] = 'skipped'
-
-            if subjobs_not_downloaded():
-                logger.debug(f"Waiting on {len(subjobs_not_downloaded())} subjobs on "
-                             f"{', '.join(list(subjobs_not_downloaded().keys()))}")
+            if incomplete_subjobs():
+                logger.debug(f"Waiting on {len(incomplete_subjobs())} subjobs on "
+                             f"{', '.join(list(incomplete_subjobs().keys()))}")
                 time.sleep(5)
 
     @classmethod