From e757506787a16b95a5bbbe72bc143f46cb110fad Mon Sep 17 00:00:00 2001
From: Brett
Date: Sat, 10 Aug 2024 21:19:01 -0500
Subject: [PATCH] Parent creates local subjobs instead of truncating original
 (#95)

* Parent worker now creates a subjob on the local host and waits for it

* Improve wait_for_subjobs logic

* Fix setting end_time for base_worker

* API cleanup

* Code refactoring

* Cleanup
---
 src/api/api_server.py           |  10 +-
 src/distributed_job_manager.py  | 132 +++++++++++++------------
 src/engines/core/base_worker.py | 164 ++++++++++++++++++--------------
 3 files changed, 168 insertions(+), 138 deletions(-)

Illustrative sketches of the changed endpoint, polling, frame-distribution,
and retry behavior follow the last hunk as review notes.

diff --git a/src/api/api_server.py b/src/api/api_server.py
index 1eb0f6b..77d3ba1 100755
--- a/src/api/api_server.py
+++ b/src/api/api_server.py
@@ -161,13 +161,9 @@ def filtered_jobs_json(status_val):
 
 @server.post('/api/job/<job_id>/send_subjob_update_notification')
 def subjob_update_notification(job_id):
-    try:
-        subjob_details = request.json
-        logger.info(f"Subjob to job id: {job_id} is now {subjob_details['status']}")
-        DistributedJobManager.handle_subjob_update_notification(RenderQueue.job_with_id(job_id), subjob_data=subjob_details)
-        return Response(status=200)
-    except JobNotFoundError:
-        return "Job not found", 404
+    subjob_details = request.json
+    DistributedJobManager.handle_subjob_update_notification(RenderQueue.job_with_id(job_id), subjob_data=subjob_details)
+    return Response(status=200)
 
 
 @server.get('/api/job/<job_id>')
diff --git a/src/distributed_job_manager.py b/src/distributed_job_manager.py
index d01a5af..aac7788 100644
--- a/src/distributed_job_manager.py
+++ b/src/distributed_job_manager.py
@@ -179,7 +179,7 @@ class DistributedJobManager:
         if job_data.get("enable_split_jobs", False) and (worker.total_frames > 1) and not worker.parent:
             cls.split_into_subjobs_async(worker, job_data, loaded_project_local_path)
         else:
-            logger.debug("Not splitting into subjobs")
+            worker.status = RenderStatus.NOT_STARTED
 
         RenderQueue.add_to_render_queue(worker, force_start=job_data.get('force_start', False))
         PreviewManager.update_previews_for_job(worker)
@@ -211,11 +211,13 @@
         if old_status != subjob_status.value:
             logger.debug(f"Subjob status changed: {logname} -> {subjob_status.value}")
 
-        cls.download_missing_frames_from_subjob(local_job, subjob_id, subjob_hostname)
+        download_success = cls.download_missing_frames_from_subjob(local_job, subjob_id, subjob_hostname)
+        if subjob_data['status'] == 'completed' and download_success:
+            local_job.children[subjob_key]['download_status'] = 'complete'
 
     @staticmethod
     def download_missing_frames_from_subjob(local_job, subjob_id, subjob_hostname):
-
+        success = True
         try:
             local_files = [os.path.basename(x) for x in local_job.file_list()]
             subjob_proxy = RenderServerProxy(subjob_hostname)
@@ -231,8 +233,11 @@
                     logger.debug(f'Downloaded successfully - {local_save_path}')
                 except Exception as e:
                     logger.error(f"Error downloading file '{subjob_filename}' from {subjob_hostname}: {e}")
+                    success = False
         except Exception as e:
             logger.exception(f'Uncaught exception while trying to download from subjob: {e}')
+            success = False
+        return success
 
     @staticmethod
     def download_all_from_subjob(local_job, subjob_id, subjob_hostname):
@@ -279,62 +284,67 @@
         return local_job.children[child_key].get('download_status', None) == 'complete'
 
     @classmethod
-    def wait_for_subjobs(cls, local_job):
-        # todo: rewrite this method
-        logger.debug(f"Waiting for subjobs for job {local_job}")
-        local_job.status = RenderStatus.WAITING_FOR_SUBJOBS
+    def wait_for_subjobs(cls, parent_job):
+        logger.debug(f"Waiting for subjobs for job {parent_job}")
+        parent_job.status = RenderStatus.WAITING_FOR_SUBJOBS
         statuses_to_download = [RenderStatus.CANCELLED, RenderStatus.ERROR, RenderStatus.COMPLETED]
 
         def subjobs_not_downloaded():
-            return {k: v for k, v in local_job.children.items() if 'download_status' not in v or
+            return {k: v for k, v in parent_job.children.items() if 'download_status' not in v or
                     v['download_status'] == 'working' or v['download_status'] is None}
 
-        logger.info(f'Waiting on {len(subjobs_not_downloaded())} subjobs for {local_job.id}')
+        logger.info(f'Waiting on {len(subjobs_not_downloaded())} subjobs for {parent_job.id}')
 
-        while len(subjobs_not_downloaded()):
-            for child_key, subjob_cached_data in subjobs_not_downloaded().items():
+        server_delay = 10
+        sleep_counter = 0
+        while parent_job.status == RenderStatus.WAITING_FOR_SUBJOBS:
 
-                subjob_id = child_key.split('@')[0]
-                subjob_hostname = child_key.split('@')[-1]
+            if sleep_counter % server_delay == 0:  # only ping servers every server_delay seconds
+                for child_key, subjob_cached_data in subjobs_not_downloaded().items():
 
-                # Fetch info from server and handle failing case
-                subjob_data = RenderServerProxy(subjob_hostname).get_job_info(subjob_id)
-                if not subjob_data:
-                    logger.warning(f"No response from: {subjob_hostname}")
-                    # todo: handle timeout / missing server situations
-                    continue
+                    subjob_id = child_key.split('@')[0]
+                    subjob_hostname = child_key.split('@')[-1]
 
-                # Update parent job cache but keep the download status
-                download_status = local_job.children[child_key].get('download_status', None)
-                local_job.children[child_key] = subjob_data
-                local_job.children[child_key]['download_status'] = download_status
+                    # Fetch info from server and handle failing case
+                    subjob_data = RenderServerProxy(subjob_hostname).get_job_info(subjob_id)
+                    if not subjob_data:
+                        logger.warning(f"No response from {subjob_hostname}")
+                        # handle timeout / missing server situations
+                        parent_job.children[child_key]['download_status'] = f'error: No response from {subjob_hostname}'
+                        continue
 
-                status = string_to_status(subjob_data.get('status', ''))
-                status_msg = f"Subjob {child_key} | {status} | " \
-                             f"{float(subjob_data.get('percent_complete')) * 100.0}%"
-                logger.debug(status_msg)
+                    # Update parent job cache but keep the download status
+                    download_status = parent_job.children[child_key].get('download_status', None)
+                    parent_job.children[child_key] = subjob_data
+                    parent_job.children[child_key]['download_status'] = download_status
 
-                # Still working in another thread - keep waiting
-                if download_status == 'working':
-                    continue
+                    status = string_to_status(subjob_data.get('status', ''))
+                    status_msg = f"Subjob {child_key} | {status} | " \
+                                 f"{float(subjob_data.get('percent_complete')) * 100.0}%"
+                    logger.debug(status_msg)
 
-                # Check if job is finished, but has not had files copied yet over yet
-                if download_status is None and subjob_data['file_count'] and status in statuses_to_download:
-                    try:
-                        cls.download_missing_frames_from_subjob(local_job, subjob_id, subjob_hostname)
-                    except Exception as e:
-                        logger.error(f"Error downloading missing frames from subjob: {e}")
+                    # Check if job is finished, but has not had files copied over yet
+                    if download_status is None and subjob_data['file_count'] and status in statuses_to_download:
+                        try:
+                            cls.download_missing_frames_from_subjob(parent_job, subjob_id, subjob_hostname)
+                            parent_job.children[child_key]['download_status'] = 'complete'
+                        except Exception as e:
+                            logger.error(f"Error downloading missing frames from subjob: {e}")
+                            parent_job.children[child_key]['download_status'] = f'error: {e}'
 
-                # Any finished jobs not successfully downloaded at this point are skipped
-                if local_job.children[child_key].get('download_status', None) is None and \
-                        status in statuses_to_download:
-                    logger.warning(f"Skipping waiting on downloading from subjob: {child_key}")
-                    local_job.children[child_key]['download_status'] = 'skipped'
+                    # Any finished jobs not successfully downloaded at this point are skipped
+                    if parent_job.children[child_key].get('download_status', None) is None and \
+                            status in statuses_to_download:
+                        logger.warning(f"Skipping waiting on downloading from subjob: {child_key}")
+                        parent_job.children[child_key]['download_status'] = 'skipped'
 
             if subjobs_not_downloaded():
                 logger.debug(f"Waiting on {len(subjobs_not_downloaded())} subjobs on "
                              f"{', '.join(list(subjobs_not_downloaded().keys()))}")
-                time.sleep(5)
+                time.sleep(1)
+                sleep_counter += 1
+            else:  # exit the loop
+                parent_job.status = RenderStatus.RUNNING
 
     # --------------------------------------------
     # Creating Subjobs
@@ -366,9 +376,15 @@
         """
 
         # Check availability
-        parent_worker.status = RenderStatus.CONFIGURING
-        available_servers = specific_servers if specific_servers else cls.find_available_servers(parent_worker.renderer, system_os)
-        logger.debug(f"Splitting into subjobs - Available servers: {available_servers}")
+        available_servers = specific_servers if specific_servers else cls.find_available_servers(parent_worker.renderer,
+                                                                                                 system_os)
+        # skip if there are no external servers found
+        external_servers = [x for x in available_servers if x['hostname'] != parent_worker.hostname]
+        if not external_servers:
+            parent_worker.status = RenderStatus.NOT_STARTED
+            return
+
+        logger.debug(f"Splitting into subjobs - Available servers: {[x['hostname'] for x in available_servers]}")
         all_subjob_server_data = cls.distribute_server_work(parent_worker.start_frame, parent_worker.end_frame,
                                                             available_servers)
 
@@ -376,26 +392,20 @@
         # Prep and submit these sub-jobs
         try:
             for subjob_data in all_subjob_server_data:
                 subjob_hostname = subjob_data['hostname']
-                if subjob_hostname != parent_worker.hostname:
-                    post_results = cls.__create_subjob(job_data, project_path, subjob_data, subjob_hostname,
-                                                       parent_worker)
-                    if not post_results.ok:
-                        ValueError(f"Failed to create subjob on {subjob_hostname}")
+                post_results = cls.__create_subjob(job_data, project_path, subjob_data, subjob_hostname,
+                                                   parent_worker)
+                if not post_results.ok:
+                    raise ValueError(f"Failed to create subjob on {subjob_hostname}")
 
-                    # save child info
-                    submission_results = post_results.json()[0]
-                    child_key = f"{submission_results['id']}@{subjob_hostname}"
-                    parent_worker.children[child_key] = submission_results
-                else:
-                    # truncate parent render_job
-                    parent_worker.start_frame = max(subjob_data['frame_range'][0], parent_worker.start_frame)
-                    parent_worker.end_frame = min(subjob_data['frame_range'][-1], parent_worker.end_frame)
-                    logger.info(f"Local job now rendering from {parent_worker.start_frame} to {parent_worker.end_frame}")
+                # save child info
+                submission_results = post_results.json()[0]
+                child_key = f"{submission_results['id']}@{subjob_hostname}"
+                parent_worker.children[child_key] = submission_results
 
         # start subjobs
-        logger.debug(f"Created {len(all_subjob_server_data) - 1} subjobs successfully")
f"{parent_worker.name}[{parent_worker.start_frame}-{parent_worker.end_frame}]" + logger.debug(f"Created {len(all_subjob_server_data)} subjobs successfully") + parent_worker.name = f"{parent_worker.name} (Parent)" parent_worker.status = RenderStatus.NOT_STARTED # todo: this won't work with scheduled starts except Exception as e: # cancel all the subjobs diff --git a/src/engines/core/base_worker.py b/src/engines/core/base_worker.py index c1fbe75..0f16530 100644 --- a/src/engines/core/base_worker.py +++ b/src/engines/core/base_worker.py @@ -90,7 +90,7 @@ class BaseRenderWorker(Base): self.end_time = None # History - self.status = RenderStatus.NOT_STARTED + self.status = RenderStatus.CONFIGURING self.warnings = [] self.errors = [] @@ -158,7 +158,7 @@ class BaseRenderWorker(Base): def start(self): - if self.status not in [RenderStatus.SCHEDULED, RenderStatus.NOT_STARTED]: + if self.status not in [RenderStatus.SCHEDULED, RenderStatus.NOT_STARTED, RenderStatus.CONFIGURING]: logger.error(f"Trying to start job with status: {self.status}") return @@ -176,90 +176,114 @@ class BaseRenderWorker(Base): self.errors.append(msg) return - self.status = RenderStatus.RUNNING + self.status = RenderStatus.RUNNING if not self.children else RenderStatus.WAITING_FOR_SUBJOBS self.start_time = datetime.now() self.__thread.start() + # handle multiple attempts at running subprocess + def __run__subprocess_cycle(self, log_file): + subprocess_cmds = self.generate_subprocess() + initial_file_count = len(self.file_list()) + failed_attempts = 0 + + log_file.write(f"Running command: {subprocess_cmds}\n") + log_file.write('=' * 80 + '\n\n') + + while True: + # Log attempt # + if failed_attempts: + if failed_attempts >= self.maximum_attempts: + err_msg = f"Maximum attempts exceeded ({self.maximum_attempts})" + logger.error(err_msg) + self.status = RenderStatus.ERROR + self.errors.append(err_msg) + return + else: + log_file.write(f'\n{"=" * 20} Attempt #{failed_attempts + 1} {"=" * 20}\n\n') + logger.warning(f"Restarting render - Attempt #{failed_attempts + 1}") + self.status = RenderStatus.RUNNING + + return_code = self.__setup_and_run_process(log_file, subprocess_cmds) + + message = f"{'=' * 50}\n\n{self.engine.name()} render ended with code {return_code} " \ + f"after {self.time_elapsed()}\n\n" + log_file.write(message) + + # don't try again if we've been cancelled + if self.status in [RenderStatus.CANCELLED, RenderStatus.ERROR]: + return + + # if file output hasn't increased, return as error, otherwise restart process. + file_count_has_increased = len(self.file_list()) > initial_file_count + if (self.status == RenderStatus.RUNNING) and file_count_has_increased and not return_code: + break + + if return_code: + err_msg = f"{self.engine.name()} render failed with code {return_code}" + logger.error(err_msg) + self.errors.append(err_msg) + + # handle instances where renderer exits ok but doesnt generate files + if not return_code and not file_count_has_increased: + err_msg = (f"{self.engine.name()} render exited ok, but file count has not increased. 
" + f"Count is still {len(self.file_list())}") + log_file.write(f'Error: {err_msg}\n\n') + self.errors.append(err_msg) + + # only count the attempt as failed if renderer creates no output - reset counter on successful output + failed_attempts = 0 if file_count_has_increased else failed_attempts + 1 + + def __run__wait_for_subjobs(self, logfile): + from src.distributed_job_manager import DistributedJobManager + DistributedJobManager.wait_for_subjobs(parent_job=self) + + @staticmethod + def log_and_print(message, log_file): + logger.info(message) + log_file.write(f"{message}\n") + def __run(self): - logger.info(f'Starting {self.engine.name()} {self.renderer_version} Render for {self.input_path} | ' - f'Frame Count: {self.total_frames}') # Setup logging log_dir = os.path.dirname(self.log_path()) os.makedirs(log_dir, exist_ok=True) - subprocess_cmds = self.generate_subprocess() - initial_file_count = len(self.file_list()) - failed_attempts = 0 + with open(self.log_path(), "a") as log_file: - with open(self.log_path(), "a") as f: + self.log_and_print(f"{self.start_time.isoformat()} - Starting " + f"{self.engine.name()} {self.renderer_version} render job for {self.name} " + f"({self.input_path})", log_file) + log_file.write(f"\n") + if not self.children: + self.__run__subprocess_cycle(log_file) + else: + self.__run__wait_for_subjobs(log_file) - f.write(f"{self.start_time.isoformat()} - Starting {self.engine.name()} {self.renderer_version} " - f"render for {self.input_path}\n\n") - f.write(f"Running command: {subprocess_cmds}\n") - f.write('=' * 80 + '\n\n') - - while True: - # Log attempt # - if failed_attempts: - if failed_attempts >= self.maximum_attempts: - err_msg = f"Maximum attempts exceeded ({self.maximum_attempts})" - logger.error(err_msg) - self.status = RenderStatus.ERROR - self.errors.append(err_msg) - return - else: - f.write(f'\n{"=" * 20} Attempt #{failed_attempts + 1} {"=" * 20}\n\n') - logger.warning(f"Restarting render - Attempt #{failed_attempts + 1}") - self.status = RenderStatus.RUNNING - - return_code = self.__setup_and_run_process(f, subprocess_cmds) + if self.status in [RenderStatus.CANCELLED, RenderStatus.ERROR]: self.end_time = datetime.now() + message = f"{self.engine.name()} render ended with status '{self.status}' " \ + f"after {self.time_elapsed()}" + self.log_and_print(message, log_file) + return - message = f"{'=' * 50}\n\n{self.engine.name()} render ended with code {return_code} " \ - f"after {self.time_elapsed()}\n\n" - f.write(message) + # Validate Output + file_list_length = len(self.file_list()) + expected_list_length = (self.end_frame - self.start_frame + 1) if self.end_frame else 1 - # Teardown - if self.status in [RenderStatus.CANCELLED, RenderStatus.ERROR]: - message = f"{self.engine.name()} render ended with status '{self.status}' " \ - f"after {self.time_elapsed()}" - f.write(message) - return + if file_list_length not in (expected_list_length, 1): + logger.error(f"Expected length: {expected_list_length} | actual length: {len(self.file_list())}") + # todo: create new subjob - # if file output hasn't increased, return as error, otherwise restart process. 
- file_count_has_increased = len(self.file_list()) > initial_file_count - if (self.status == RenderStatus.RUNNING) and file_count_has_increased and not return_code: - message = (f"{'=' * 50}\n\n{self.engine.name()} render completed successfully in " - f"{self.time_elapsed()}\n") - f.write(message) - break + # Post Render Work + if not self.parent: + logger.debug(f"Starting post-processing work for {self}") + self.post_processing() + logger.debug(f"Completed post-processing work for {self}") - if return_code: - err_msg = f"{self.engine.name()} render failed with code {return_code}" - logger.error(err_msg) - self.errors.append(err_msg) - - # handle instances where renderer exits ok but doesnt generate files - if not return_code and not file_count_has_increased: - err_msg = (f"{self.engine.name()} render exited ok, but file count has not increased. " - f"Count is still {len(self.file_list())}") - f.write(f'Error: {err_msg}\n\n') - self.errors.append(err_msg) - - # only count the attempt as failed if renderer creates no output - ignore error codes for now - if not file_count_has_increased: - failed_attempts += 1 - - if self.children: - from src.distributed_job_manager import DistributedJobManager - DistributedJobManager.wait_for_subjobs(local_job=self) - - # Post Render Work - logger.debug("Starting post-processing work") - self.post_processing() - self.status = RenderStatus.COMPLETED - logger.info(f"Render {self.id}-{self.name} completed successfully after {self.time_elapsed()}") + self.status = RenderStatus.COMPLETED + self.end_time = datetime.now() + message = f"Render {self.name} completed successfully after {self.time_elapsed()}" + self.log_and_print(message, log_file) def __setup_and_run_process(self, f, subprocess_cmds):
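-- 

Review notes:

1. The slimmed-down notification endpoint no longer catches JobNotFoundError
itself, so callers should expect the framework's default error response for an
unknown job id. For illustration, a payload a subjob server might POST; only
'status' is clearly required by the handler, and the hostname, port, and extra
fields here are assumptions mirroring what the parent caches in job.children:

    import requests

    # Hypothetical subjob -> parent notification.
    payload = {
        "id": "43",
        "status": "completed",      # 'completed' triggers the missing-frame download
        "percent_complete": 1.0,
        "file_count": 240,
    }
    resp = requests.post(
        "http://parent-host:8080/api/job/42/send_subjob_update_notification",  # host/port assumed
        json=payload,
        timeout=10,
    )
    print(resp.status_code)  # 200 once the parent has processed the update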
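2. The reworked wait_for_subjobs sleeps in one-second ticks so a cancel or an
external status change is noticed quickly, but it only contacts the subjob
servers every server_delay ticks. A minimal, self-contained sketch of that
cadence and of the download_status bookkeeping ('working', 'complete',
'skipped', 'error: ...'); FakeProxy-style stubs stand in for
RenderServerProxy and the real job objects:

    import time

    # Stand-in for parent_job.children, keyed "{id}@{hostname}" as in the patch.
    children = {
        "42@render-01": {"status": "running"},   # no settled download_status yet
        "43@render-02": {"status": "completed", "download_status": "complete"},
    }

    def subjobs_not_downloaded():
        # Mirrors the predicate in wait_for_subjobs: anything without a settled
        # download_status is still being waited on.
        return {k: v for k, v in children.items() if 'download_status' not in v or
                v['download_status'] == 'working' or v['download_status'] is None}

    def fake_poll(child_key):
        # Placeholder for RenderServerProxy(host).get_job_info(job_id) plus the
        # download step; here we just pretend the files arrived.
        subjob_id, _, hostname = child_key.partition('@')
        print(f"polling job {subjob_id} on {hostname}")
        children[child_key]['download_status'] = 'complete'

    server_delay = 10   # seconds between server pings, as in the patch
    sleep_counter = 0
    waiting = True
    while waiting:
        if sleep_counter % server_delay == 0:      # ping servers every server_delay seconds
            for child_key in list(subjobs_not_downloaded()):
                fake_poll(child_key)
        if subjobs_not_downloaded():
            time.sleep(1)                          # short sleep keeps the loop responsive
            sleep_counter += 1
        else:
            waiting = False                        # all subjobs settled -> parent resumes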
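3. distribute_server_work is called above but not shown in this patch, so the
even split below is an assumption about its contract: it takes the parent's
inclusive frame range plus the available-server list and yields one
{'hostname', 'frame_range'} dict per server, which __create_subjob then posts
(the "{id}@{hostname}" child_key format does come from the patch):

    def distribute_server_work(start_frame, end_frame, available_servers):
        # Hypothetical even split of an inclusive frame range across servers;
        # the real method may weight servers differently.
        total = end_frame - start_frame + 1
        base, extra = divmod(total, len(available_servers))
        chunks, cursor = [], start_frame
        for i, server in enumerate(available_servers):
            size = base + (1 if i < extra else 0)
            if size == 0:
                continue  # more servers than frames: leave surplus servers idle
            chunks.append({'hostname': server['hostname'],
                           'frame_range': [cursor, cursor + size - 1]})
            cursor += size
        return chunks

    servers = [{'hostname': 'render-01'}, {'hostname': 'render-02'}, {'hostname': 'parent-host'}]
    print(distribute_server_work(1, 10, servers))
    # -> [{'hostname': 'render-01', 'frame_range': [1, 4]},
    #     {'hostname': 'render-02', 'frame_range': [5, 7]},
    #     {'hostname': 'parent-host', 'frame_range': [8, 10]}]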
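4. The attempt accounting in __run__subprocess_cycle only counts a cycle as
failed when the renderer produced no new output files; any cycle that grows
the output resets the counter. A sketch of just that policy, with the status
checks simplified away and run_once()/count_files() standing in for
__setup_and_run_process and file_list():

    maximum_attempts = 3

    def run_retry_cycle(run_once, count_files):
        # run_once() -> process return code; count_files() -> rendered file count.
        initial_file_count = count_files()
        failed_attempts = 0
        while True:
            if failed_attempts >= maximum_attempts:
                return "error: maximum attempts exceeded"
            return_code = run_once()
            file_count_has_increased = count_files() > initial_file_count
            if file_count_has_increased and not return_code:
                return "completed"
            # Only a run with no new output counts against the limit;
            # any progress resets the counter, as in the patch.
            failed_attempts = 0 if file_count_has_increased else failed_attempts + 1

    attempts = iter([1, 0])       # one failing run, then a clean exit code
    files = iter([0, 0, 1])       # hypothetical: output appears on the second run
    print(run_retry_cycle(lambda: next(attempts), lambda: next(files)))  # -> completed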