mirror of https://github.com/blw1138/Zordon.git, synced 2025-12-17 08:48:13 +00:00
wait_for_subjobs rewrite
@@ -203,60 +203,66 @@ class DistributedJobManager:
         return local_job.children[child_key].get('download_status', None) == 'complete'

     @classmethod
-    def wait_for_subjobs(cls, local_job):
-        logger.debug(f"Waiting for subjobs for job {local_job}")
-        local_job.status = RenderStatus.WAITING_FOR_SUBJOBS
-        statuses_to_download = [RenderStatus.CANCELLED, RenderStatus.ERROR, RenderStatus.COMPLETED]
+    def wait_for_subjobs(cls, parent_job):
+        logger.debug(f"Waiting for subjobs for job {parent_job}")
+        parent_job.status = RenderStatus.WAITING_FOR_SUBJOBS

-        def subjobs_not_downloaded():
-            return {k: v for k, v in local_job.children.items() if 'download_status' not in v or
-                    v['download_status'] == 'working' or v['download_status'] is None}
+        server_proxys = {}
+
+        def incomplete_subjobs():
+            incomplete = {}
+            for child in parent_job.children.keys():
+                subjob_id, subjob_hostname = child.split('@')
+                if not server_proxys.get(subjob_hostname):
+                    server_proxys[subjob_hostname] = RenderServerProxy(subjob_hostname)
+                proxy = server_proxys.get(subjob_hostname)
+                job_info = proxy.get_job_info(subjob_id)
+                if job_info and job_info.get('status') not in ['completed', 'cancelled', 'error']:
+                    incomplete[child] = job_info
+            return incomplete

-        logger.info(f'Waiting on {len(subjobs_not_downloaded())} subjobs for {local_job.id}')
+        logger.info(f'Waiting on {len(incomplete_subjobs())} subjobs for {parent_job.id}')
+        while len(incomplete_subjobs()):

-        while len(subjobs_not_downloaded()):
-            for child_key, subjob_cached_data in subjobs_not_downloaded().items():
+            for child_key, subjob_cached_data in incomplete_subjobs().items():

                 subjob_id = child_key.split('@')[0]
                 subjob_hostname = child_key.split('@')[-1]

+                if not server_proxys.get(subjob_hostname):
+                    server_proxys[subjob_hostname] = RenderServerProxy(subjob_hostname)
+
                 # Fetch info from server and handle failing case
-                subjob_data = RenderServerProxy(subjob_hostname).get_job_info(subjob_id)
+                subjob_data = server_proxys[subjob_hostname].get_job_info(subjob_id)
                 if not subjob_data:
-                    logger.warning(f"No response from: {subjob_hostname}")
-                    # todo: handle timeout / missing server situations
+                    last_connection = datetime.datetime.now() - server_proxys[subjob_hostname].last_contact
+                    logger.warning(f"No response from: {subjob_hostname} - Last connection: {last_connection}")
+
+                    last_connection_max_time = 12
+                    if last_connection.seconds > last_connection_max_time:
+                        # after a certain amount of time offline, consider the child offline
+                        logger.error(f"{subjob_hostname} has been offline for over {last_connection_max_time} seconds - Assuming render failed")
+                        logger.warning(f"Spinning up new subjob to replace offlined server")
+                        parent_job.children[child_key]['errors'] = ['Renderer went offline']
+
+                        # schedule a new job to replace the failed one
+                        local_hostname = socket.gethostname()
+                        cls.new_create_subjob(parent_job.id, local_hostname,
+                                              parent_job.children[child_key]['start_frame'],
+                                              parent_job.children[child_key]['end_frame'])
                     continue

-                # Update parent job cache but keep the download status
-                download_status = local_job.children[child_key].get('download_status', None)
-                local_job.children[child_key] = subjob_data
-                local_job.children[child_key]['download_status'] = download_status
+                # Update parent job cache
+                parent_job.children[child_key] = subjob_data

+                # percentage
                 status = string_to_status(subjob_data.get('status', ''))
                 status_msg = f"Subjob {child_key} | {status} | " \
                              f"{float(subjob_data.get('percent_complete')) * 100.0}%"
                 logger.debug(status_msg)

-                # Still working in another thread - keep waiting
-                if download_status == 'working':
-                    continue
-
-                # Check if job is finished, but has not had files copied over yet
-                if download_status is None and subjob_data['file_count'] and status in statuses_to_download:
-                    download_result = cls.download_from_subjob(local_job, subjob_id, subjob_hostname)
-                    if not download_result:
-                        logger.error("Failed to download from subjob")
-                        # todo: error handling here
-
-                # Any finished jobs not successfully downloaded at this point are skipped
-                if local_job.children[child_key].get('download_status', None) is None and \
-                        status in statuses_to_download:
-                    logger.warning(f"Skipping waiting on downloading from subjob: {child_key}")
-                    local_job.children[child_key]['download_status'] = 'skipped'
-
-            if subjobs_not_downloaded():
-                logger.debug(f"Waiting on {len(subjobs_not_downloaded())} subjobs on "
-                             f"{', '.join(list(subjobs_not_downloaded().keys()))}")
+            if incomplete_subjobs():
+                logger.debug(f"Waiting on {len(incomplete_subjobs())} subjobs on "
+                             f"{', '.join(list(incomplete_subjobs().keys()))}")
+
                 time.sleep(5)

     @classmethod
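The rewritten method polls each subjob's host through one cached RenderServerProxy per hostname and, when a host stays silent past a threshold, reschedules that frame range locally. Below is a minimal sketch of that polling pattern, assuming a stubbed proxy in place of RenderServerProxy; the _StubProxy class, wait_for_children helper, and hostnames are illustrative only and not Zordon code.

# Sketch of the poll-until-complete pattern, with assumptions noted above.
import datetime
import time


class _StubProxy:
    """Stand-in for RenderServerProxy: returns canned job info and tracks last contact."""

    def __init__(self, hostname):
        self.hostname = hostname
        self.last_contact = datetime.datetime.now()
        self._polls = 0

    def get_job_info(self, job_id):
        # Pretend the subjob finishes after a few polls.
        self._polls += 1
        self.last_contact = datetime.datetime.now()
        status = 'completed' if self._polls >= 3 else 'rendering'
        return {'status': status, 'percent_complete': min(1.0, self._polls / 3)}


def wait_for_children(children, poll_interval=1, offline_after=12):
    """Poll every child key ('<job_id>@<hostname>') until none are still working."""
    proxies = {}  # one proxy per hostname, reused across polls

    def incomplete():
        still_working = {}
        for child in children:
            job_id, hostname = child.split('@')
            if hostname not in proxies:
                proxies[hostname] = _StubProxy(hostname)
            proxy = proxies[hostname]
            info = proxy.get_job_info(job_id)
            if info and info.get('status') not in ('completed', 'cancelled', 'error'):
                still_working[child] = info
            elif not info:
                # No answer: decide whether the host has been silent too long.
                silence = datetime.datetime.now() - proxy.last_contact
                if silence.seconds > offline_after:
                    print(f"{hostname} offline for {silence.seconds}s - would reschedule {child}")
        return still_working

    while incomplete():
        time.sleep(poll_interval)


if __name__ == '__main__':
    wait_for_children(['42@render-node-01', '43@render-node-02'])

As in the commit, the per-hostname proxy cache avoids reconnecting on every poll, and completion is decided from the reported status rather than from a per-child download flag.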