From c1e5fd11299a543f767f7b63ea72a8053ebbbde1 Mon Sep 17 00:00:00 2001 From: Brett Williams Date: Wed, 25 Oct 2023 18:59:01 -0500 Subject: [PATCH] Updated wait_for_subjobs --- src/distributed_job_manager.py | 72 +++++++++++++++++----------------- 1 file changed, 36 insertions(+), 36 deletions(-) diff --git a/src/distributed_job_manager.py b/src/distributed_job_manager.py index 8a9e431..c076f07 100644 --- a/src/distributed_job_manager.py +++ b/src/distributed_job_manager.py @@ -198,62 +198,62 @@ class DistributedJobManager: parent_job.status = RenderStatus.WAITING_FOR_SUBJOBS server_proxys = {} - def incomplete_subjobs(): - incomplete = {} - for child in parent_job.children.keys(): - subjob_id, subjob_hostname = child.split('@') - if not server_proxys.get(subjob_hostname): - server_proxys[subjob_hostname] = RenderServerProxy(subjob_hostname) - proxy = server_proxys.get(subjob_hostname) - job_info = proxy.get_job_info(subjob_id) - if job_info and job_info.get('status') not in ['completed', 'cancelled', 'error']: - incomplete[child] = job_info - return incomplete - logger.info(f'Waiting on {len(incomplete_subjobs())} subjobs for {parent_job.id}') - while len(incomplete_subjobs()): + def fetch_subjob_info(child_key): + """ + Fetch subjob information from the remote server using a RenderServerProxy. - for child_key, subjob_cached_data in incomplete_subjobs().items(): + Parameters: + child_key (str): The key representing the subjob. - subjob_id = child_key.split('@')[0] - subjob_hostname = child_key.split('@')[-1] + Returns: + dict: Subjob information. + """ + subjob_id, subjob_hostname = child_key.split('@') + if subjob_hostname not in server_proxys: + server_proxys[subjob_hostname] = RenderServerProxy(subjob_hostname) + return server_proxys[subjob_hostname].get_job_info(subjob_id) - if not server_proxys.get(subjob_hostname): - server_proxys[subjob_hostname] = RenderServerProxy(subjob_hostname) + while True: + incomplete_jobs = {} + + for child_key in list( + parent_job.children.keys()): # Create a list to avoid dictionary modification during iteration + subjob_data = fetch_subjob_info(child_key) - # Fetch info from server and handle failing case - subjob_data = server_proxys[subjob_hostname].get_job_info(subjob_id) if not subjob_data: + subjob_id, subjob_hostname = child_key.split('@') last_connection = datetime.datetime.now() - server_proxys[subjob_hostname].last_contact logger.warning(f"No response from: {subjob_hostname} - Last connection: {last_connection}") last_connection_max_time = 12 if last_connection.seconds > last_connection_max_time: - # after a certain amount of time offline, consider the child offline - logger.error(f"{subjob_hostname} has been offline for over {last_connection_max_time} seconds - Assuming render failed") - logger.warning(f"Spinning up new subjob to replace offlined server") + logger.error( + f"{subjob_hostname} has been offline for over {last_connection_max_time} seconds - Assuming render failed") + logger.warning(f"Spinning up a new subjob to replace the offlined server") parent_job.children[child_key]['errors'] = ['Renderer went offline'] + parent_job.children[child_key]['status'] = RenderStatus.ERROR - # schedule a new job to replace the failed one - local_hostname = socket.gethostname() - cls.new_create_subjob(parent_job.id, local_hostname, - parent_job.children[child_key]['start_frame'], - parent_job.children[child_key]['end_frame']) + cls.handle_subjob_status_change(parent_job_id=parent_job.id, + subjob_data=parent_job.children[child_key]) continue - # Update parent job cache parent_job.children[child_key] = subjob_data - # percentage status = string_to_status(subjob_data.get('status', '')) - status_msg = f"Subjob {child_key} | {status} | " \ - f"{float(subjob_data.get('percent_complete')) * 100.0}%" + status_msg = f"Subjob {child_key} | {status} | {float(subjob_data.get('percent_complete', 0)) * 100.0}%" logger.debug(status_msg) - if incomplete_subjobs(): - logger.debug(f"Waiting on {len(incomplete_subjobs())} subjobs on " - f"{', '.join(list(incomplete_subjobs().keys()))}") - time.sleep(5) + if status not in [RenderStatus.COMPLETED, RenderStatus.CANCELLED, RenderStatus.ERROR]: + incomplete_jobs[child_key] = subjob_data + + if incomplete_jobs: + logger.debug(f"Waiting on {len(incomplete_jobs)} subjobs on {', '.join(list(incomplete_jobs.keys()))}") + else: + logger.debug("No more incomplete subjobs") + if not cls.completion_hold_enabled: + break + time.sleep(5) @classmethod def split_into_subjobs(cls, worker, job_data, project_path):