Updated wait_for_subjobs

This commit is contained in:
Brett Williams
2023-10-25 18:59:01 -05:00
parent b646c1f848
commit 3b975418de

View File

@@ -195,61 +195,61 @@ class DistributedJobManager:
parent_job.status = RenderStatus.WAITING_FOR_SUBJOBS parent_job.status = RenderStatus.WAITING_FOR_SUBJOBS
server_proxys = {} server_proxys = {}
def incomplete_subjobs():
incomplete = {} def fetch_subjob_info(child_key):
for child in parent_job.children.keys(): """
subjob_id, subjob_hostname = child.split('@') Fetch subjob information from the remote server using a RenderServerProxy.
if not server_proxys.get(subjob_hostname):
Parameters:
child_key (str): The key representing the subjob.
Returns:
dict: Subjob information.
"""
subjob_id, subjob_hostname = child_key.split('@')
if subjob_hostname not in server_proxys:
server_proxys[subjob_hostname] = RenderServerProxy(subjob_hostname) server_proxys[subjob_hostname] = RenderServerProxy(subjob_hostname)
proxy = server_proxys.get(subjob_hostname) return server_proxys[subjob_hostname].get_job_info(subjob_id)
job_info = proxy.get_job_info(subjob_id)
if job_info and job_info.get('status') not in ['completed', 'cancelled', 'error']:
incomplete[child] = job_info
return incomplete
logger.info(f'Waiting on {len(incomplete_subjobs())} subjobs for {parent_job.id}') while True:
while len(incomplete_subjobs()): incomplete_jobs = {}
for child_key, subjob_cached_data in incomplete_subjobs().items(): for child_key in list(
parent_job.children.keys()): # Create a list to avoid dictionary modification during iteration
subjob_data = fetch_subjob_info(child_key)
subjob_id = child_key.split('@')[0]
subjob_hostname = child_key.split('@')[-1]
if not server_proxys.get(subjob_hostname):
server_proxys[subjob_hostname] = RenderServerProxy(subjob_hostname)
# Fetch info from server and handle failing case
subjob_data = server_proxys[subjob_hostname].get_job_info(subjob_id)
if not subjob_data: if not subjob_data:
subjob_id, subjob_hostname = child_key.split('@')
last_connection = datetime.datetime.now() - server_proxys[subjob_hostname].last_contact last_connection = datetime.datetime.now() - server_proxys[subjob_hostname].last_contact
logger.warning(f"No response from: {subjob_hostname} - Last connection: {last_connection}") logger.warning(f"No response from: {subjob_hostname} - Last connection: {last_connection}")
last_connection_max_time = 12 last_connection_max_time = 12
if last_connection.seconds > last_connection_max_time: if last_connection.seconds > last_connection_max_time:
# after a certain amount of time offline, consider the child offline logger.error(
logger.error(f"{subjob_hostname} has been offline for over {last_connection_max_time} seconds - Assuming render failed") f"{subjob_hostname} has been offline for over {last_connection_max_time} seconds - Assuming render failed")
logger.warning(f"Spinning up new subjob to replace offlined server") logger.warning(f"Spinning up a new subjob to replace the offlined server")
parent_job.children[child_key]['errors'] = ['Renderer went offline'] parent_job.children[child_key]['errors'] = ['Renderer went offline']
parent_job.children[child_key]['status'] = RenderStatus.ERROR
# schedule a new job to replace the failed one cls.handle_subjob_status_change(parent_job_id=parent_job.id,
local_hostname = socket.gethostname() subjob_data=parent_job.children[child_key])
cls.new_create_subjob(parent_job.id, local_hostname,
parent_job.children[child_key]['start_frame'],
parent_job.children[child_key]['end_frame'])
continue continue
# Update parent job cache
parent_job.children[child_key] = subjob_data parent_job.children[child_key] = subjob_data
# percentage
status = string_to_status(subjob_data.get('status', '')) status = string_to_status(subjob_data.get('status', ''))
status_msg = f"Subjob {child_key} | {status} | " \ status_msg = f"Subjob {child_key} | {status} | {float(subjob_data.get('percent_complete', 0)) * 100.0}%"
f"{float(subjob_data.get('percent_complete')) * 100.0}%"
logger.debug(status_msg) logger.debug(status_msg)
if incomplete_subjobs(): if status not in [RenderStatus.COMPLETED, RenderStatus.CANCELLED, RenderStatus.ERROR]:
logger.debug(f"Waiting on {len(incomplete_subjobs())} subjobs on " incomplete_jobs[child_key] = subjob_data
f"{', '.join(list(incomplete_subjobs().keys()))}")
if incomplete_jobs:
logger.debug(f"Waiting on {len(incomplete_jobs)} subjobs on {', '.join(list(incomplete_jobs.keys()))}")
else:
logger.debug("No more incomplete subjobs")
if not cls.completion_hold_enabled:
break
time.sleep(5) time.sleep(5)
@classmethod @classmethod