From 0f4a9b5ddd65f583353df9633dcd9f2009ead31b Mon Sep 17 00:00:00 2001 From: Brett Williams Date: Wed, 25 Oct 2023 07:28:49 -0500 Subject: [PATCH 01/10] Added two stubs for methods needed for dynamic subjob generation --- src/distributed_job_manager.py | 64 +++++++++++++++++++++++++++------- 1 file changed, 52 insertions(+), 12 deletions(-) diff --git a/src/distributed_job_manager.py b/src/distributed_job_manager.py index 1120fd9..203e90a 100644 --- a/src/distributed_job_manager.py +++ b/src/distributed_job_manager.py @@ -88,32 +88,71 @@ class DistributedJobManager: Parameters: local_job (BaseRenderWorker): The local parent job worker. - subjob_data (dict): subjob data sent from remote server. + subjob_data (dict): Subjob data sent from the remote server. Returns: None """ - subjob_status = string_to_status(subjob_data['status']) subjob_id = subjob_data['id'] subjob_hostname = next((hostname.split('@')[1] for hostname in local_job.children if hostname.split('@')[0] == subjob_id), None) - local_job.children[f'{subjob_id}@{subjob_hostname}'] = subjob_data + subjob_key = f'{subjob_id}@{subjob_hostname}' - logname = f"{local_job.id}:{subjob_id}@{subjob_hostname}" + # Update the local job's subjob data + local_job.children[subjob_key] = subjob_data + + logname = f"{local_job.id}:{subjob_key}" + subjob_status = string_to_status(subjob_data['status']) logger.debug(f"Subjob status changed: {logname} -> {subjob_status.value}") - # Download complete or partial render jobs - if subjob_status in [RenderStatus.COMPLETED, RenderStatus.CANCELLED, RenderStatus.ERROR] and \ - subjob_data['file_count']: - download_result = cls.download_from_subjob(local_job, subjob_id, subjob_hostname) - if not download_result: - # todo: handle error + # Handle downloading for completed, cancelled, or error'd subjobs + if (subjob_status in [RenderStatus.COMPLETED, RenderStatus.CANCELLED, RenderStatus.ERROR] + and subjob_data['file_count']): + if not cls.download_from_subjob(local_job, subjob_id, subjob_hostname): logger.error(f"Unable to download subjob files from {logname} with status {subjob_status.value}") + # Handle cancelled or errored subjobs by determining missing frames and scheduling a new job if subjob_status == RenderStatus.CANCELLED or subjob_status == RenderStatus.ERROR: - # todo: determine missing frames and schedule new job - pass + cls.handle_cancelled_or_errored_subjob(local_job, subjob_data) + + @classmethod + def handle_cancelled_or_errored_subjob(cls, local_job, subjob_data): + """ + Handles cancelled or errored subjobs by determining missing frames and scheduling a new job. + + Parameters: + local_job (BaseRenderWorker): The local parent job worker. + subjob_data (dict): Subjob data for the cancelled or errored subjob. + + Returns: + None + """ + # Determine missing frames based on subjob_data + missing_frames = cls.determine_missing_frames(subjob_data) + + if missing_frames: + # Schedule a new job with the missing frames + new_job_data = { + # todo: Set the necessary data for the new job + } + cls.__create_subjob() + + @staticmethod + def determine_missing_frames(subjob_data): + """ + Determine missing frames in the subjob. + + Parameters: + subjob_data (dict): Subjob data. + + Returns: + list: List of missing frame numbers. + """ + # todo: Implement the logic to determine missing frames based on subjob_data + missing_frames = [] + return missing_frames + @staticmethod def download_from_subjob(local_job, subjob_id, subjob_hostname): @@ -247,6 +286,7 @@ class DistributedJobManager: # check that job posts were all successful. if not all(d.get('submission_results') is not None for d in subjob_servers): + # todo: rewrite this code - should not have to have all submissions go through raise ValueError("Failed to create all subjobs") # look into recalculating job #s and use exising jobs # start subjobs From 7dff2e339395bbdbaedadaf23207d493376145a6 Mon Sep 17 00:00:00 2001 From: Brett Williams Date: Wed, 25 Oct 2023 11:46:09 -0500 Subject: [PATCH 02/10] Fix issue where subjobs were not updating parent job json --- src/api/api_server.py | 2 +- src/distributed_job_manager.py | 7 ++++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/src/api/api_server.py b/src/api/api_server.py index 1eab8fb..dc6fe47 100755 --- a/src/api/api_server.py +++ b/src/api/api_server.py @@ -168,7 +168,7 @@ def subjob_status_change(job_id): try: subjob_details = request.json logger.info(f"Subjob to job id: {job_id} is now {subjob_details['status']}") - DistributedJobManager.handle_subjob_status_change(RenderQueue.job_with_id(job_id), subjob_data=subjob_details) + DistributedJobManager.handle_subjob_status_change(job_id, subjob_data=subjob_details) return Response(status=200) except JobNotFoundError: return "Job not found", 404 diff --git a/src/distributed_job_manager.py b/src/distributed_job_manager.py index 203e90a..f2fa30b 100644 --- a/src/distributed_job_manager.py +++ b/src/distributed_job_manager.py @@ -82,12 +82,12 @@ class DistributedJobManager: ) @classmethod - def handle_subjob_status_change(cls, local_job, subjob_data): + def handle_subjob_status_change(cls, local_job_id, subjob_data): """ Responds to a status change from a remote subjob and triggers the creation or modification of subjobs as needed. Parameters: - local_job (BaseRenderWorker): The local parent job worker. + local_job_id (str): ID for local parent job worker. subjob_data (dict): Subjob data sent from the remote server. Returns: @@ -100,9 +100,10 @@ class DistributedJobManager: subjob_key = f'{subjob_id}@{subjob_hostname}' # Update the local job's subjob data + local_job.children = dict(local_job.children) # copy as dict to work around sqlalchemy update issue local_job.children[subjob_key] = subjob_data - logname = f"{local_job.id}:{subjob_key}" + logname = f"{local_job_id}:{subjob_key}" subjob_status = string_to_status(subjob_data['status']) logger.debug(f"Subjob status changed: {logname} -> {subjob_status.value}") From 3e567060f8b32aaac1b4a7242aa296d50341b3d1 Mon Sep 17 00:00:00 2001 From: Brett Williams Date: Wed, 25 Oct 2023 11:47:12 -0500 Subject: [PATCH 03/10] Missed a line --- src/distributed_job_manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/distributed_job_manager.py b/src/distributed_job_manager.py index f2fa30b..c5895b8 100644 --- a/src/distributed_job_manager.py +++ b/src/distributed_job_manager.py @@ -93,7 +93,7 @@ class DistributedJobManager: Returns: None """ - + local_job = RenderQueue.job_with_id(local_job_id) subjob_id = subjob_data['id'] subjob_hostname = next((hostname.split('@')[1] for hostname in local_job.children if hostname.split('@')[0] == subjob_id), None) From 006a97a17a683ac79dbcf31f428149af2346ff35 Mon Sep 17 00:00:00 2001 From: Brett Williams Date: Wed, 25 Oct 2023 13:41:14 -0500 Subject: [PATCH 04/10] Move the current_frame attribute to base_worker.py --- src/api/api_server.py | 3 +++ src/engines/blender/blender_worker.py | 1 - src/engines/core/base_worker.py | 5 +++-- src/engines/ffmpeg/ffmpeg_worker.py | 1 - 4 files changed, 6 insertions(+), 4 deletions(-) diff --git a/src/api/api_server.py b/src/api/api_server.py index dc6fe47..25d2cdb 100755 --- a/src/api/api_server.py +++ b/src/api/api_server.py @@ -286,8 +286,10 @@ def add_job_handler(): try: if request.is_json: jobs_list = [request.json] if not isinstance(request.json, list) else request.json + logger.debug(f"Received add_job JSON: {jobs_list}") elif request.form.get('json', None): jobs_list = json.loads(request.form['json']) + logger.debug(f"Received add_job form: {jobs_list}") else: # Cleanup flat form data into nested structure form_dict = {k: v for k, v in dict(request.form).items() if v} @@ -301,6 +303,7 @@ def add_job_handler(): args['raw'] = form_dict.get('raw_args', None) form_dict['args'] = args jobs_list = [form_dict] + logger.debug(f"Received add_job data: {jobs_list}") except Exception as e: err_msg = f"Error processing job data: {e}" logger.error(err_msg) diff --git a/src/engines/blender/blender_worker.py b/src/engines/blender/blender_worker.py index 7c10a88..8128d0c 100644 --- a/src/engines/blender/blender_worker.py +++ b/src/engines/blender/blender_worker.py @@ -28,7 +28,6 @@ class BlenderRenderWorker(BaseRenderWorker): self.start_frame = int(self.scene_info.get('start_frame', 1)) self.end_frame = int(self.scene_info.get('end_frame', self.start_frame)) self.project_length = (self.end_frame - self.start_frame) + 1 - self.current_frame = -1 def generate_worker_subprocess(self): diff --git a/src/engines/core/base_worker.py b/src/engines/core/base_worker.py index 44eebfa..c3ccbf9 100644 --- a/src/engines/core/base_worker.py +++ b/src/engines/core/base_worker.py @@ -35,6 +35,7 @@ class BaseRenderWorker(Base): project_length = Column(Integer) start_frame = Column(Integer) end_frame = Column(Integer, nullable=True) + current_frame = Column(Integer) parent = Column(String, nullable=True) children = Column(JSON) name = Column(String) @@ -75,7 +76,7 @@ class BaseRenderWorker(Base): # Frame Ranges self.project_length = -1 - self.current_frame = 0 # should this be a 1 ? + self.current_frame = -1 # negative indicates not started self.start_frame = 0 # should this be a 1 ? self.end_frame = None @@ -304,7 +305,6 @@ class BaseRenderWorker(Base): 'children': self.children, 'date_created': self.date_created, 'start_time': self.start_time, - 'end_time': self.end_time, 'status': self.status.value, 'file_hash': self.file_hash, 'percent_complete': self.percent_complete(), @@ -314,6 +314,7 @@ class BaseRenderWorker(Base): 'errors': getattr(self, 'errors', None), 'start_frame': self.start_frame, 'end_frame': self.end_frame, + 'current_frame': self.current_frame, 'total_frames': self.total_frames, 'last_output': getattr(self, 'last_output', None), 'log_path': self.log_path() diff --git a/src/engines/ffmpeg/ffmpeg_worker.py b/src/engines/ffmpeg/ffmpeg_worker.py index 687d35b..7d69529 100644 --- a/src/engines/ffmpeg/ffmpeg_worker.py +++ b/src/engines/ffmpeg/ffmpeg_worker.py @@ -19,7 +19,6 @@ class FFMPEGRenderWorker(BaseRenderWorker): "/dev/null"], stderr=subprocess.STDOUT).decode('utf-8') found_frames = re.findall('frame=\s*(\d+)', stream_info) self.project_length = found_frames[-1] if found_frames else '-1' - self.current_frame = -1 def generate_worker_subprocess(self): From 5b102a5ea46154ca18bfa3ca9526f054182e53c8 Mon Sep 17 00:00:00 2001 From: Brett Williams Date: Wed, 25 Oct 2023 15:07:17 -0500 Subject: [PATCH 05/10] Added new_create_subjob method --- src/distributed_job_manager.py | 35 ++++++++++++++++++++++++---------- 1 file changed, 25 insertions(+), 10 deletions(-) diff --git a/src/distributed_job_manager.py b/src/distributed_job_manager.py index c5895b8..9c36bc0 100644 --- a/src/distributed_job_manager.py +++ b/src/distributed_job_manager.py @@ -306,16 +306,31 @@ class DistributedJobManager: # submission_results.items()] # todo: fix this @staticmethod - def __create_subjob(job_data, local_hostname, project_path, server_data, server_hostname, worker): - subjob = job_data.copy() - subjob['name'] = f"{worker.name}[{server_data['frame_range'][0]}-{server_data['frame_range'][-1]}]" - subjob['parent'] = f"{worker.id}@{local_hostname}" - subjob['start_frame'] = server_data['frame_range'][0] - subjob['end_frame'] = server_data['frame_range'][-1] - logger.debug(f"Posting subjob with frames {subjob['start_frame']}-" - f"{subjob['end_frame']} to {server_hostname}") - post_results = RenderServerProxy(server_hostname).post_job_to_server( - file_path=project_path, job_list=[subjob]) + def new_create_subjob(parent_job_id, remote_hostname, start_frame, end_frame): + """ + Create and post a subjob to a remote render server. + + Parameters: + - parent_job_id (str): ID of the parent job. + - remote_hostname (str): Remote server's hostname/address. + - start_frame (int): Starting frame of the subjob. + - end_frame (int): Ending frame of the subjob. + + Example: + new_create_subjob('parent_job_123', 'remote-server.example.com', 1, 100) + """ + logger.info(f"parentID: {parent_job_id}") + parent_job = RenderQueue.job_with_id(parent_job_id) + subjob_data = {'renderer': parent_job.engine.name(), 'input_path': parent_job.input_path, + 'args': parent_job.args, 'output_path': parent_job.output_path, + 'engine_version': parent_job.renderer_version, 'start_frame': start_frame, + 'end_frame': end_frame, 'parent': f"{parent_job_id}@{socket.gethostname()}"} + + logger.debug(f"new subjob data: {subjob_data}") + + logger.debug(f"Creating subjob {start_frame}-{end_frame} for {remote_hostname}") + post_results = RenderServerProxy(remote_hostname).post_job_to_server( + file_path=parent_job.input_path, job_list=[subjob_data]) return post_results @staticmethod From fa0bdf807fef6001f1079130fb9fd7ff57eb3850 Mon Sep 17 00:00:00 2001 From: Brett Williams Date: Wed, 25 Oct 2023 15:54:14 -0500 Subject: [PATCH 06/10] wait_for_subjobs rewrite --- src/distributed_job_manager.py | 80 ++++++++++++++++++---------------- 1 file changed, 43 insertions(+), 37 deletions(-) diff --git a/src/distributed_job_manager.py b/src/distributed_job_manager.py index 9c36bc0..5444cb2 100644 --- a/src/distributed_job_manager.py +++ b/src/distributed_job_manager.py @@ -200,60 +200,66 @@ class DistributedJobManager: return local_job.children[child_key].get('download_status', None) == 'complete' @classmethod - def wait_for_subjobs(cls, local_job): - logger.debug(f"Waiting for subjobs for job {local_job}") - local_job.status = RenderStatus.WAITING_FOR_SUBJOBS - statuses_to_download = [RenderStatus.CANCELLED, RenderStatus.ERROR, RenderStatus.COMPLETED] + def wait_for_subjobs(cls, parent_job): + logger.debug(f"Waiting for subjobs for job {parent_job}") + parent_job.status = RenderStatus.WAITING_FOR_SUBJOBS - def subjobs_not_downloaded(): - return {k: v for k, v in local_job.children.items() if 'download_status' not in v or - v['download_status'] == 'working' or v['download_status'] is None} + server_proxys = {} + def incomplete_subjobs(): + incomplete = {} + for child in parent_job.children.keys(): + subjob_id, subjob_hostname = child.split('@') + if not server_proxys.get(subjob_hostname): + server_proxys[subjob_hostname] = RenderServerProxy(subjob_hostname) + proxy = server_proxys.get(subjob_hostname) + job_info = proxy.get_job_info(subjob_id) + if job_info and job_info.get('status') not in ['completed', 'cancelled', 'error']: + incomplete[child] = job_info + return incomplete - logger.info(f'Waiting on {len(subjobs_not_downloaded())} subjobs for {local_job.id}') + logger.info(f'Waiting on {len(incomplete_subjobs())} subjobs for {parent_job.id}') + while len(incomplete_subjobs()): - while len(subjobs_not_downloaded()): - for child_key, subjob_cached_data in subjobs_not_downloaded().items(): + for child_key, subjob_cached_data in incomplete_subjobs().items(): subjob_id = child_key.split('@')[0] subjob_hostname = child_key.split('@')[-1] + if not server_proxys.get(subjob_hostname): + server_proxys[subjob_hostname] = RenderServerProxy(subjob_hostname) + # Fetch info from server and handle failing case - subjob_data = RenderServerProxy(subjob_hostname).get_job_info(subjob_id) + subjob_data = server_proxys[subjob_hostname].get_job_info(subjob_id) if not subjob_data: - logger.warning(f"No response from: {subjob_hostname}") - # todo: handle timeout / missing server situations + last_connection = datetime.datetime.now() - server_proxys[subjob_hostname].last_contact + logger.warning(f"No response from: {subjob_hostname} - Last connection: {last_connection}") + + last_connection_max_time = 12 + if last_connection.seconds > last_connection_max_time: + # after a certain amount of time offline, consider the child offline + logger.error(f"{subjob_hostname} has been offline for over {last_connection_max_time} seconds - Assuming render failed") + logger.warning(f"Spinning up new subjob to replace offlined server") + parent_job.children[child_key]['errors'] = ['Renderer went offline'] + + # schedule a new job to replace the failed one + local_hostname = socket.gethostname() + cls.new_create_subjob(parent_job.id, local_hostname, + parent_job.children[child_key]['start_frame'], + parent_job.children[child_key]['end_frame']) continue - # Update parent job cache but keep the download status - download_status = local_job.children[child_key].get('download_status', None) - local_job.children[child_key] = subjob_data - local_job.children[child_key]['download_status'] = download_status + # Update parent job cache + parent_job.children[child_key] = subjob_data + # percentage status = string_to_status(subjob_data.get('status', '')) status_msg = f"Subjob {child_key} | {status} | " \ f"{float(subjob_data.get('percent_complete')) * 100.0}%" logger.debug(status_msg) - # Still working in another thread - keep waiting - if download_status == 'working': - continue - - # Check if job is finished, but has not had files copied yet over yet - if download_status is None and subjob_data['file_count'] and status in statuses_to_download: - download_result = cls.download_from_subjob(local_job, subjob_id, subjob_hostname) - if not download_result: - logger.error("Failed to download from subjob") - # todo: error handling here - - # Any finished jobs not successfully downloaded at this point are skipped - if local_job.children[child_key].get('download_status', None) is None and \ - status in statuses_to_download: - logger.warning(f"Skipping waiting on downloading from subjob: {child_key}") - local_job.children[child_key]['download_status'] = 'skipped' - - if subjobs_not_downloaded(): - logger.debug(f"Waiting on {len(subjobs_not_downloaded())} subjobs on " - f"{', '.join(list(subjobs_not_downloaded().keys()))}") + if incomplete_subjobs(): + logger.debug(f"Waiting on {len(incomplete_subjobs())} subjobs on " + f"{', '.join(list(incomplete_subjobs().keys()))}") time.sleep(5) @classmethod From 0fe50bc1755778a888bbc5367e9a42b0ffb57064 Mon Sep 17 00:00:00 2001 From: Brett Williams Date: Wed, 25 Oct 2023 18:53:41 -0500 Subject: [PATCH 07/10] Misc cleanup --- src/api/add_job_helpers.py | 2 +- src/distributed_job_manager.py | 64 ++++++++++++++------------------- src/engines/core/base_worker.py | 2 +- src/render_queue.py | 10 +++--- 4 files changed, 35 insertions(+), 43 deletions(-) diff --git a/src/api/add_job_helpers.py b/src/api/add_job_helpers.py index 28280b3..39dd590 100644 --- a/src/api/add_job_helpers.py +++ b/src/api/add_job_helpers.py @@ -166,8 +166,8 @@ def create_render_jobs(jobs_list, loaded_project_local_path, job_dir, enable_spl DistributedJobManager.split_into_subjobs(worker, job_data, loaded_project_local_path) else: logger.debug("Not splitting into subjobs") + RenderQueue.add_to_render_queue(worker, force_start=job_data.get('force_start', False)) - RenderQueue.add_to_render_queue(worker, force_start=job_data.get('force_start', False)) if not worker.parent: from src.api.api_server import make_job_ready make_job_ready(worker.id) diff --git a/src/distributed_job_manager.py b/src/distributed_job_manager.py index 5444cb2..9791ffc 100644 --- a/src/distributed_job_manager.py +++ b/src/distributed_job_manager.py @@ -82,7 +82,7 @@ class DistributedJobManager: ) @classmethod - def handle_subjob_status_change(cls, local_job_id, subjob_data): + def handle_subjob_status_change(cls, parent_job_id, subjob_data): """ Responds to a status change from a remote subjob and triggers the creation or modification of subjobs as needed. @@ -93,54 +93,35 @@ class DistributedJobManager: Returns: None """ - local_job = RenderQueue.job_with_id(local_job_id) + parent_job = RenderQueue.job_with_id(parent_job_id) subjob_id = subjob_data['id'] - subjob_hostname = next((hostname.split('@')[1] for hostname in local_job.children if + subjob_hostname = next((hostname.split('@')[1] for hostname in parent_job.children if hostname.split('@')[0] == subjob_id), None) subjob_key = f'{subjob_id}@{subjob_hostname}' # Update the local job's subjob data - local_job.children = dict(local_job.children) # copy as dict to work around sqlalchemy update issue - local_job.children[subjob_key] = subjob_data + parent_job.children = dict(parent_job.children) # copy as dict to work around sqlalchemy update issue + parent_job.children[subjob_key] = subjob_data - logname = f"{local_job_id}:{subjob_key}" + logname = f"{parent_job_id}:{subjob_key}" subjob_status = string_to_status(subjob_data['status']) logger.debug(f"Subjob status changed: {logname} -> {subjob_status.value}") # Handle downloading for completed, cancelled, or error'd subjobs if (subjob_status in [RenderStatus.COMPLETED, RenderStatus.CANCELLED, RenderStatus.ERROR] and subjob_data['file_count']): - if not cls.download_from_subjob(local_job, subjob_id, subjob_hostname): + if not cls.download_from_subjob(parent_job, subjob_id, subjob_hostname): logger.error(f"Unable to download subjob files from {logname} with status {subjob_status.value}") # Handle cancelled or errored subjobs by determining missing frames and scheduling a new job if subjob_status == RenderStatus.CANCELLED or subjob_status == RenderStatus.ERROR: - cls.handle_cancelled_or_errored_subjob(local_job, subjob_data) - - @classmethod - def handle_cancelled_or_errored_subjob(cls, local_job, subjob_data): - """ - Handles cancelled or errored subjobs by determining missing frames and scheduling a new job. - - Parameters: - local_job (BaseRenderWorker): The local parent job worker. - subjob_data (dict): Subjob data for the cancelled or errored subjob. - - Returns: - None - """ - # Determine missing frames based on subjob_data - missing_frames = cls.determine_missing_frames(subjob_data) - - if missing_frames: - # Schedule a new job with the missing frames - new_job_data = { - # todo: Set the necessary data for the new job - } - cls.__create_subjob() + logger.info("Creating a new subjob") + cls.new_create_subjob(parent_job.id, socket.gethostname(), + parent_job.children[subjob_key]['start_frame'], + parent_job.children[subjob_key]['end_frame']) @staticmethod - def determine_missing_frames(subjob_data): + def determine_missing_frames(parent_job_id): """ Determine missing frames in the subjob. @@ -175,13 +156,11 @@ class DistributedJobManager: # download zip file from server try: - local_job.children[child_key]['download_status'] = 'working' logger.info(f"Downloading completed subjob files from {subjob_hostname} to localhost") RenderServerProxy(subjob_hostname).get_job_files(subjob_id, zip_file_path) logger.info(f"File transfer complete for {logname} - Transferred {get_file_size_human(zip_file_path)}") except Exception as e: logger.exception(f"Exception downloading files from remote server: {e}") - local_job.children[child_key]['download_status'] = 'failed' return False # extract zip @@ -192,15 +171,26 @@ class DistributedJobManager: zip_ref.extractall(extract_path) logger.info(f"Successfully extracted zip to: {extract_path}") os.remove(zip_file_path) - local_job.children[child_key]['download_status'] = 'complete' + return True except Exception as e: logger.exception(f"Exception extracting zip file: {e}") - local_job.children[child_key]['download_status'] = 'failed' + return False - return local_job.children[child_key].get('download_status', None) == 'complete' @classmethod def wait_for_subjobs(cls, parent_job): + """ + Wait for subjobs to complete and update the parent job's status. + + This method continuously checks the status of subjobs until all of them are either completed, canceled, or in error + status. It updates the parent job's children with the latest subjob information. + + Parameters: + parent_job (BaseRenderWorker): The parent job worker. + + Returns: + None + """ logger.debug(f"Waiting for subjobs for job {parent_job}") parent_job.status = RenderStatus.WAITING_FOR_SUBJOBS @@ -330,7 +320,7 @@ class DistributedJobManager: subjob_data = {'renderer': parent_job.engine.name(), 'input_path': parent_job.input_path, 'args': parent_job.args, 'output_path': parent_job.output_path, 'engine_version': parent_job.renderer_version, 'start_frame': start_frame, - 'end_frame': end_frame, 'parent': f"{parent_job_id}@{socket.gethostname()}"} + 'end_frame': end_frame, 'parent': f"{parent_job_id}@{local_hostname}"} logger.debug(f"new subjob data: {subjob_data}") diff --git a/src/engines/core/base_worker.py b/src/engines/core/base_worker.py index c3ccbf9..3902b6c 100644 --- a/src/engines/core/base_worker.py +++ b/src/engines/core/base_worker.py @@ -237,7 +237,7 @@ class BaseRenderWorker(Base): if self.children: from src.distributed_job_manager import DistributedJobManager - DistributedJobManager.wait_for_subjobs(local_job=self) + DistributedJobManager.wait_for_subjobs(parent_job=self) # Post Render Work logger.debug("Starting post-processing work") diff --git a/src/render_queue.py b/src/render_queue.py index 2a081b7..702ce5e 100755 --- a/src/render_queue.py +++ b/src/render_queue.py @@ -65,10 +65,12 @@ class RenderQueue: @classmethod def job_with_id(cls, job_id, none_ok=False): - found_job = next((x for x in cls.all_jobs() if x.id == job_id), None) - if not found_job and not none_ok: - raise JobNotFoundError(job_id) - return found_job + for job in cls.all_jobs(): + if job.id == job_id: + return job + if not none_ok: + raise JobNotFoundError(f"Cannot find job with id: {job_id}") + return None @classmethod def clear_history(cls): From b646c1f84808b3fd7fe1bf6ed11f61a0b5612811 Mon Sep 17 00:00:00 2001 From: Brett Williams Date: Wed, 25 Oct 2023 18:54:36 -0500 Subject: [PATCH 08/10] Add last connected to server_proxy.py --- src/api/server_proxy.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/api/server_proxy.py b/src/api/server_proxy.py index 295e0ed..bfa9a55 100644 --- a/src/api/server_proxy.py +++ b/src/api/server_proxy.py @@ -4,6 +4,7 @@ import os import socket import threading import time +from datetime import datetime import requests from requests_toolbelt.multipart import MultipartEncoder, MultipartEncoderMonitor @@ -33,6 +34,9 @@ class RenderServerProxy: self.__background_thread = None self.__offline_flags = 0 self.update_cadence = 5 + self.last_contact = datetime.now() + # to prevent errors, the last contact datetime is set to when the class is initialized - you must keep an + # instance of this class alive to accurately know the delay def connect(self): status = self.request_data('status') @@ -55,6 +59,7 @@ class RenderServerProxy: req = self.request(payload, timeout) if req.ok and req.status_code == 200: self.__offline_flags = 0 + self.last_contact = datetime.now() return req.json() except json.JSONDecodeError as e: logger.debug(f"JSON decode error: {e}") From 3b975418dec18e111e132297a5e26619acd756d5 Mon Sep 17 00:00:00 2001 From: Brett Williams Date: Wed, 25 Oct 2023 18:59:01 -0500 Subject: [PATCH 09/10] Updated wait_for_subjobs --- src/distributed_job_manager.py | 72 +++++++++++++++++----------------- 1 file changed, 36 insertions(+), 36 deletions(-) diff --git a/src/distributed_job_manager.py b/src/distributed_job_manager.py index 9791ffc..e9575f4 100644 --- a/src/distributed_job_manager.py +++ b/src/distributed_job_manager.py @@ -195,62 +195,62 @@ class DistributedJobManager: parent_job.status = RenderStatus.WAITING_FOR_SUBJOBS server_proxys = {} - def incomplete_subjobs(): - incomplete = {} - for child in parent_job.children.keys(): - subjob_id, subjob_hostname = child.split('@') - if not server_proxys.get(subjob_hostname): - server_proxys[subjob_hostname] = RenderServerProxy(subjob_hostname) - proxy = server_proxys.get(subjob_hostname) - job_info = proxy.get_job_info(subjob_id) - if job_info and job_info.get('status') not in ['completed', 'cancelled', 'error']: - incomplete[child] = job_info - return incomplete - logger.info(f'Waiting on {len(incomplete_subjobs())} subjobs for {parent_job.id}') - while len(incomplete_subjobs()): + def fetch_subjob_info(child_key): + """ + Fetch subjob information from the remote server using a RenderServerProxy. - for child_key, subjob_cached_data in incomplete_subjobs().items(): + Parameters: + child_key (str): The key representing the subjob. - subjob_id = child_key.split('@')[0] - subjob_hostname = child_key.split('@')[-1] + Returns: + dict: Subjob information. + """ + subjob_id, subjob_hostname = child_key.split('@') + if subjob_hostname not in server_proxys: + server_proxys[subjob_hostname] = RenderServerProxy(subjob_hostname) + return server_proxys[subjob_hostname].get_job_info(subjob_id) - if not server_proxys.get(subjob_hostname): - server_proxys[subjob_hostname] = RenderServerProxy(subjob_hostname) + while True: + incomplete_jobs = {} + + for child_key in list( + parent_job.children.keys()): # Create a list to avoid dictionary modification during iteration + subjob_data = fetch_subjob_info(child_key) - # Fetch info from server and handle failing case - subjob_data = server_proxys[subjob_hostname].get_job_info(subjob_id) if not subjob_data: + subjob_id, subjob_hostname = child_key.split('@') last_connection = datetime.datetime.now() - server_proxys[subjob_hostname].last_contact logger.warning(f"No response from: {subjob_hostname} - Last connection: {last_connection}") last_connection_max_time = 12 if last_connection.seconds > last_connection_max_time: - # after a certain amount of time offline, consider the child offline - logger.error(f"{subjob_hostname} has been offline for over {last_connection_max_time} seconds - Assuming render failed") - logger.warning(f"Spinning up new subjob to replace offlined server") + logger.error( + f"{subjob_hostname} has been offline for over {last_connection_max_time} seconds - Assuming render failed") + logger.warning(f"Spinning up a new subjob to replace the offlined server") parent_job.children[child_key]['errors'] = ['Renderer went offline'] + parent_job.children[child_key]['status'] = RenderStatus.ERROR - # schedule a new job to replace the failed one - local_hostname = socket.gethostname() - cls.new_create_subjob(parent_job.id, local_hostname, - parent_job.children[child_key]['start_frame'], - parent_job.children[child_key]['end_frame']) + cls.handle_subjob_status_change(parent_job_id=parent_job.id, + subjob_data=parent_job.children[child_key]) continue - # Update parent job cache parent_job.children[child_key] = subjob_data - # percentage status = string_to_status(subjob_data.get('status', '')) - status_msg = f"Subjob {child_key} | {status} | " \ - f"{float(subjob_data.get('percent_complete')) * 100.0}%" + status_msg = f"Subjob {child_key} | {status} | {float(subjob_data.get('percent_complete', 0)) * 100.0}%" logger.debug(status_msg) - if incomplete_subjobs(): - logger.debug(f"Waiting on {len(incomplete_subjobs())} subjobs on " - f"{', '.join(list(incomplete_subjobs().keys()))}") - time.sleep(5) + if status not in [RenderStatus.COMPLETED, RenderStatus.CANCELLED, RenderStatus.ERROR]: + incomplete_jobs[child_key] = subjob_data + + if incomplete_jobs: + logger.debug(f"Waiting on {len(incomplete_jobs)} subjobs on {', '.join(list(incomplete_jobs.keys()))}") + else: + logger.debug("No more incomplete subjobs") + if not cls.completion_hold_enabled: + break + time.sleep(5) @classmethod def split_into_subjobs(cls, worker, job_data, project_path): From 80ffda84475befdde6557b8309259bc2ec2361c9 Mon Sep 17 00:00:00 2001 From: Brett Williams Date: Wed, 25 Oct 2023 19:00:41 -0500 Subject: [PATCH 10/10] Split_into_subjobs WIP --- src/distributed_job_manager.py | 71 ++++++++++++++++++++-------------- 1 file changed, 42 insertions(+), 29 deletions(-) diff --git a/src/distributed_job_manager.py b/src/distributed_job_manager.py index e9575f4..ac81c38 100644 --- a/src/distributed_job_manager.py +++ b/src/distributed_job_manager.py @@ -1,3 +1,4 @@ +import datetime import logging import os import socket @@ -119,6 +120,7 @@ class DistributedJobManager: cls.new_create_subjob(parent_job.id, socket.gethostname(), parent_job.children[subjob_key]['start_frame'], parent_job.children[subjob_key]['end_frame']) + # todo: determine why we don't wait for the new subjobs we create when replacing an error'd job @staticmethod def determine_missing_frames(parent_job_id): @@ -193,7 +195,6 @@ class DistributedJobManager: """ logger.debug(f"Waiting for subjobs for job {parent_job}") parent_job.status = RenderStatus.WAITING_FOR_SUBJOBS - server_proxys = {} def fetch_subjob_info(child_key): @@ -253,51 +254,60 @@ class DistributedJobManager: time.sleep(5) @classmethod - def split_into_subjobs(cls, worker, job_data, project_path): + def split_into_subjobs(cls, parent_worker, job_data, project_path): # Check availability - available_servers = cls.find_available_servers(worker.renderer) + available_servers = cls.find_available_servers(parent_worker.renderer) logger.debug(f"Splitting into subjobs - Available servers: {available_servers}") - subjob_servers = cls.distribute_server_work(worker.start_frame, worker.end_frame, available_servers) + subjob_frame_ranges = cls.distribute_server_work(parent_worker.start_frame, parent_worker.end_frame, available_servers) local_hostname = socket.gethostname() # Prep and submit these sub-jobs - logger.info(f"Job {worker.id} split plan: {subjob_servers}") + simple_ranges = [f"{x['hostname']}:[{x['frame_range'][0]}-{x['frame_range'][1]}]" for x in subjob_frame_ranges] + logger.info(f"Job {parent_worker.id} split plan: {','.join(simple_ranges)}") try: - for server_data in subjob_servers: - server_hostname = server_data['hostname'] + + # setup parent render job first - truncate frames + local_range = [x for x in subjob_frame_ranges if x['hostname'] == local_hostname][0] + parent_worker.start_frame = max(local_range['frame_range'][0], parent_worker.start_frame) + parent_worker.end_frame = min(local_range['frame_range'][-1], parent_worker.end_frame) + logger.info(f"Local job now rendering from {parent_worker.start_frame} to {parent_worker.end_frame}") + RenderQueue.add_to_render_queue(parent_worker) # add range-adjusted parent to render queue + + # setup remote subjobs + submission_results = {} + for subjob_server_data in subjob_frame_ranges: + server_hostname = subjob_server_data['hostname'] if server_hostname != local_hostname: - post_results = cls.__create_subjob(job_data, local_hostname, project_path, server_data, - server_hostname, worker) + post_results = cls.new_create_subjob(parent_worker.id, server_hostname, + subjob_server_data['frame_range'][0], + subjob_server_data['frame_range'][-1]) + if post_results.ok: - server_data['submission_results'] = post_results.json()[0] + subjob_server_data['submission_results'] = post_results.json()[0] else: logger.error(f"Failed to create subjob on {server_hostname}") break else: - # truncate parent render_job - worker.start_frame = max(server_data['frame_range'][0], worker.start_frame) - worker.end_frame = min(server_data['frame_range'][-1], worker.end_frame) - logger.info(f"Local job now rendering from {worker.start_frame} to {worker.end_frame}") - server_data['submission_results'] = worker.json() + subjob_server_data['submission_results'] = [True] # check that job posts were all successful. - if not all(d.get('submission_results') is not None for d in subjob_servers): - # todo: rewrite this code - should not have to have all submissions go through - raise ValueError("Failed to create all subjobs") # look into recalculating job #s and use exising jobs + # if not all(d.get('submission_results') is not None for d in subjob_frame_ranges): + # # todo: rewrite this code - should not have to have all submissions go through + # raise ValueError("Failed to create all subjobs") # look into recalculating job #s and use exising jobs # start subjobs - logger.debug(f"Starting {len(subjob_servers) - 1} attempted subjobs") - for server_data in subjob_servers: - if server_data['hostname'] != local_hostname: - child_key = f"{server_data['submission_results']['id']}@{server_data['hostname']}" - worker.children[child_key] = server_data['submission_results'] - worker.name = f"{worker.name}[{worker.start_frame}-{worker.end_frame}]" + logger.debug(f"Starting {len(subjob_frame_ranges) - 1} attempted subjobs") + for subjob_server_data in subjob_frame_ranges: + if subjob_server_data['hostname'] != local_hostname: + child_key = f"{subjob_server_data['submission_results']['id']}@{subjob_server_data['hostname']}" + parent_worker.children[child_key] = subjob_server_data['submission_results'] + parent_worker.name = f"{parent_worker.name}[{parent_worker.start_frame}-{parent_worker.end_frame}]" except Exception as e: # cancel all the subjobs - logger.error(f"Failed to split job into subjobs: {e}") - logger.debug(f"Cancelling {len(subjob_servers) - 1} attempted subjobs") + logger.exception(f"Failed to split job into subjobs: {e}") + logger.debug(f"Cancelling {len(subjob_frame_ranges) - 1} attempted subjobs") # [RenderServerProxy(hostname).cancel_job(results['id'], confirm=True) for hostname, results in # submission_results.items()] # todo: fix this @@ -316,17 +326,20 @@ class DistributedJobManager: new_create_subjob('parent_job_123', 'remote-server.example.com', 1, 100) """ logger.info(f"parentID: {parent_job_id}") + local_hostname = socket.gethostname() parent_job = RenderQueue.job_with_id(parent_job_id) subjob_data = {'renderer': parent_job.engine.name(), 'input_path': parent_job.input_path, 'args': parent_job.args, 'output_path': parent_job.output_path, 'engine_version': parent_job.renderer_version, 'start_frame': start_frame, 'end_frame': end_frame, 'parent': f"{parent_job_id}@{local_hostname}"} - logger.debug(f"new subjob data: {subjob_data}") - - logger.debug(f"Creating subjob {start_frame}-{end_frame} for {remote_hostname}") + logger.info(f"Creating subjob {os.path.basename(parent_job.input_path)} [{start_frame}-{end_frame}] " + f"for {remote_hostname}") post_results = RenderServerProxy(remote_hostname).post_job_to_server( file_path=parent_job.input_path, job_list=[subjob_data]) + post_results_json = post_results.json()[0] + + parent_job.children[f"{post_results_json['id']}@{remote_hostname}"] = post_results_json return post_results @staticmethod