From 4df41a2079360197acf8a4369affaa603e46d908 Mon Sep 17 00:00:00 2001
From: Brett
Date: Sun, 4 Aug 2024 21:30:10 -0500
Subject: [PATCH] Download frames from subjobs as frames are completed (#87)

* Add a frame complete notification to BaseWorker and distributed_job_manager.py
* Add API to download individual files to API server and ServerProxy
* Rename subjob notification API and add download_missing_frames_from_subjob
* Subjobs will now notify parent when a frame is complete
* Fix missed rename
* Add some misc logging
* Better error handling
* Fix frame download file path issue
* Download missing frames at job completion and misc cleanup
* Misc cleanup
* Code cleanup
---
 src/api/api_server.py                 |  33 +++++--
 src/api/preview_manager.py            |  32 ++++---
 src/api/server_proxy.py               |  26 ++++--
 src/distributed_job_manager.py        | 119 ++++++++++++++++++--------
 src/engines/blender/blender_worker.py |   7 +-
 src/engines/core/base_worker.py       |   6 +-
 6 files changed, 150 insertions(+), 73 deletions(-)

diff --git a/src/api/api_server.py b/src/api/api_server.py
index d358874..838e19d 100755
--- a/src/api/api_server.py
+++ b/src/api/api_server.py
@@ -134,12 +134,12 @@ def filtered_jobs_json(status_val):
     return f'Cannot find jobs with status {status_val}', 400
 
 
-@server.post('/api/job/<job_id>/notify_parent_of_status_change')
-def subjob_status_change(job_id):
+@server.post('/api/job/<job_id>/send_subjob_update_notification')
+def subjob_update_notification(job_id):
     try:
         subjob_details = request.json
         logger.info(f"Subjob to job id: {job_id} is now {subjob_details['status']}")
-        DistributedJobManager.handle_subjob_status_change(RenderQueue.job_with_id(job_id), subjob_data=subjob_details)
+        DistributedJobManager.handle_subjob_update_notification(RenderQueue.job_with_id(job_id), subjob_data=subjob_details)
         return Response(status=200)
     except JobNotFoundError:
         return "Job not found", 404
@@ -168,7 +168,22 @@ def get_job_logs(job_id):
 
 @server.get('/api/job/<job_id>/file_list')
 def get_file_list(job_id):
-    return RenderQueue.job_with_id(job_id).file_list()
+    return [os.path.basename(x) for x in RenderQueue.job_with_id(job_id).file_list()]
+
+
+@server.route('/api/job/<job_id>/download')
+def download_file(job_id):
+
+    requested_filename = request.args.get('filename')
+    if not requested_filename:
+        return 'Filename required', 400
+
+    found_job = RenderQueue.job_with_id(job_id)
+    for job_filename in found_job.file_list():
+        if os.path.basename(job_filename).lower() == requested_filename.lower():
+            return send_file(job_filename, as_attachment=True)
+
+    return f"File '{requested_filename}' not found", 404
 
 
 @server.route('/api/job/<job_id>/download_all')
@@ -293,7 +308,10 @@ def delete_job(job_id):
         if server.config['UPLOAD_FOLDER'] in output_dir and os.path.exists(output_dir):
             shutil.rmtree(output_dir)
 
-        PreviewManager.delete_previews_for_job(found_job)
+        try:
+            PreviewManager.delete_previews_for_job(found_job)
+        except Exception as e:
+            logger.error(f"Error deleting previews for {found_job}: {e}")
 
         # See if we own the project_dir (i.e. was it uploaded)
         project_dir = os.path.dirname(os.path.dirname(found_job.input_path))
@@ -473,7 +491,10 @@ def get_disk_benchmark():
 def start_server():
     def eval_loop(delay_sec=1):
         while True:
-            RenderQueue.evaluate_queue()
+            try:
+                RenderQueue.evaluate_queue()
+            except Exception as e:
+                logger.error(f"Uncaught error while evaluating queue: {e}")
             time.sleep(delay_sec)
 
     try:
diff --git a/src/api/preview_manager.py b/src/api/preview_manager.py
index 9a9b2d8..7d6c7b1 100644
--- a/src/api/preview_manager.py
+++ b/src/api/preview_manager.py
@@ -33,6 +33,7 @@ class PreviewManager:
             logger.warning(f"No valid image or video files found in files from job: {job}")
             return
 
+        os.makedirs(cls.storage_path, exist_ok=True)
         base_path = os.path.join(cls.storage_path, f"{job.id}-{preview_label}-{max_width}")
         preview_video_path = base_path + '.mp4'
         preview_image_path = base_path + '.jpg'
@@ -80,21 +81,24 @@
 
     @classmethod
     def get_previews_for_job(cls, job):
-        directory_path = Path(cls.storage_path)
-        preview_files_for_job = [f for f in directory_path.iterdir() if f.is_file() and f.name.startswith(job.id)]
-        results = {}
-        for preview_filename in preview_files_for_job:
-            try:
-                pixel_width = str(preview_filename).split('-')[-1]
-                preview_label = str(os.path.basename(preview_filename)).split('-')[1]
-                extension = os.path.splitext(preview_filename)[-1].lower()
-                kind = 'video' if extension in supported_video_formats else \
-                    'image' if extension in supported_image_formats else 'unknown'
-                results[preview_label] = results.get(preview_label, [])
-                results[preview_label].append({'filename': str(preview_filename), 'width': pixel_width, 'kind': kind})
-            except IndexError:  # ignore invalid filenames
-                pass
+        results = {}
+        try:
+            directory_path = Path(cls.storage_path)
+            preview_files_for_job = [f for f in directory_path.iterdir() if f.is_file() and f.name.startswith(job.id)]
+            for preview_filename in preview_files_for_job:
+                try:
+                    pixel_width = str(preview_filename).split('-')[-1]
+                    preview_label = str(os.path.basename(preview_filename)).split('-')[1]
+                    extension = os.path.splitext(preview_filename)[-1].lower()
+                    kind = 'video' if extension in supported_video_formats else \
+                        'image' if extension in supported_image_formats else 'unknown'
+                    results[preview_label] = results.get(preview_label, [])
+                    results[preview_label].append({'filename': str(preview_filename), 'width': pixel_width, 'kind': kind})
+                except IndexError:  # ignore invalid filenames
+                    pass
+        except FileNotFoundError:
+            pass
         return results
 
     @classmethod
diff --git a/src/api/server_proxy.py b/src/api/server_proxy.py
index 42cff23..6c8fcc1 100644
--- a/src/api/server_proxy.py
+++ b/src/api/server_proxy.py
@@ -171,19 +171,19 @@ class RenderServerProxy:
     def get_all_engines(self):
         return self.request_data('all_engines')
 
-    def notify_parent_of_status_change(self, parent_id, subjob):
+    def send_subjob_update_notification(self, parent_id, subjob):
         """
-        Notifies the parent job of a status change in a subjob.
+        Notifies the parent job of an update to a subjob.
 
         Args:
            parent_id (str): The ID of the parent job.
-           subjob (Job): The subjob that has changed status.
+           subjob (Job): The subjob that has been updated.
 
         Returns:
            Response: The response from the server.
""" hostname = LOOPBACK if self.is_localhost else self.hostname - return requests.post(f'http://{hostname}:{self.port}/api/job/{parent_id}/notify_parent_of_status_change', + return requests.post(f'http://{hostname}:{self.port}/api/job/{parent_id}/send_subjob_update_notification', json=subjob.json()) def post_job_to_server(self, file_path, job_list, callback=None): @@ -232,19 +232,27 @@ class RenderServerProxy: except Exception as e: logger.error(f"An error occurred: {e}") - def get_job_files(self, job_id, save_path): + def get_job_files_list(self, job_id): + return self.request_data(f"job/{job_id}/file_list") + + def download_all_job_files(self, job_id, save_path): hostname = LOOPBACK if self.is_localhost else self.hostname url = f"http://{hostname}:{self.port}/api/job/{job_id}/download_all" - return self.download_file(url, filename=save_path) + return self.__download_file_from_url(url, output_filepath=save_path) + + def download_job_file(self, job_id, job_filename, save_path): + hostname = LOOPBACK if self.is_localhost else self.hostname + url = f"http://{hostname}:{self.port}/api/job/{job_id}/download?filename={job_filename}" + return self.__download_file_from_url(url, output_filepath=save_path) @staticmethod - def download_file(url, filename): + def __download_file_from_url(url, output_filepath): with requests.get(url, stream=True) as r: r.raise_for_status() - with open(filename, 'wb') as f: + with open(output_filepath, 'wb') as f: for chunk in r.iter_content(chunk_size=8192): f.write(chunk) - return filename + return output_filepath # --- Renderer --- # diff --git a/src/distributed_job_manager.py b/src/distributed_job_manager.py index 1350362..014f6cf 100644 --- a/src/distributed_job_manager.py +++ b/src/distributed_job_manager.py @@ -33,6 +33,43 @@ class DistributedJobManager: This should be called once, typically during the initialization phase. """ pub.subscribe(cls.__local_job_status_changed, 'status_change') + pub.subscribe(cls.__local_job_frame_complete, 'frame_complete') + + @classmethod + def __local_job_frame_complete(cls, job_id, frame_number, update_interval=5): + + """ + Responds to the 'frame_complete' pubsub message for local jobs. + + Parameters: + job_id (str): The ID of the job that has changed status. + old_status (str): The previous status of the job. + new_status (str): The new (current) status of the job. + + Note: Do not call directly. Instead, call via the 'frame_complete' pubsub message. 
+ """ + + render_job = RenderQueue.job_with_id(job_id, none_ok=True) + if not render_job: # ignore jobs not in the queue + return + + logger.debug(f"Job {job_id} has completed frame #{frame_number}") + replace_existing_previews = (frame_number % update_interval) == 0 + cls.__job_update_shared(render_job, replace_existing_previews) + + @classmethod + def __job_update_shared(cls, render_job, replace_existing_previews=False): + # update previews + PreviewManager.update_previews_for_job(job=render_job, replace_existing=replace_existing_previews) + + # notify parent to allow individual frames to be copied instead of waiting until the end + if render_job.parent: + parent_id, parent_hostname = render_job.parent.split('@')[0], render_job.parent.split('@')[-1] + try: + logger.debug(f'Job {render_job.id} updating parent {parent_id}@{parent_hostname}') + RenderServerProxy(parent_hostname).send_subjob_update_notification(parent_id, render_job) + except Exception as e: + logger.error(f"Error notifying parent {parent_hostname} about update in subjob {render_job.id}: {e}") @classmethod def __local_job_status_changed(cls, job_id, old_status, new_status): @@ -54,20 +91,14 @@ class DistributedJobManager: logger.debug(f"Job {job_id} status change: {old_status} -> {new_status}") - # Update previews - Force replace if job has completed successfully - PreviewManager.update_previews_for_job(render_job, - replace_existing=(render_job.status == RenderStatus.COMPLETED)) + cls.__job_update_shared(render_job, replace_existing_previews=(render_job.status == RenderStatus.COMPLETED)) - # Notify parent if necessary - if render_job.parent: # If local job is a subjob from a remote server - parent_id, hostname = render_job.parent.split('@')[0], render_job.parent.split('@')[-1] - RenderServerProxy(hostname).notify_parent_of_status_change(parent_id=parent_id, subjob=render_job) - - # Cancelling children if necessary - if render_job.children and new_status in [RenderStatus.CANCELLED, RenderStatus.ERROR]: - for child in render_job.children: - child_id, hostname = child.split('@') - RenderServerProxy(hostname).cancel_job(child_id, confirm=True) + # Handle children + if render_job.children: + if new_status in [RenderStatus.CANCELLED, RenderStatus.ERROR]: # Cancel children if necessary + for child in render_job.children: + child_id, child_hostname = child.split('@') + RenderServerProxy(child_hostname).cancel_job(child_id, confirm=True) # UI Notifications try: @@ -159,9 +190,9 @@ class DistributedJobManager: # -------------------------------------------- @classmethod - def handle_subjob_status_change(cls, local_job, subjob_data): + def handle_subjob_update_notification(cls, local_job, subjob_data): """ - Responds to a status change from a remote subjob and triggers the creation or modification of subjobs as needed. + Responds to a notification from a remote subjob and the host requests any subsequent updates from the subjob. Args: local_job (BaseRenderWorker): The local parent job worker. 
@@ -170,27 +201,40 @@
         subjob_status = string_to_status(subjob_data['status'])
         subjob_id = subjob_data['id']
-        subjob_hostname = next((hostname.split('@')[1] for hostname in local_job.children if
-                                hostname.split('@')[0] == subjob_id), None)
-        local_job.children[f'{subjob_id}@{subjob_hostname}'] = subjob_data
+        subjob_hostname = subjob_data['hostname']
+        subjob_key = f'{subjob_id}@{subjob_hostname}'
+        old_status = local_job.children.get(subjob_key, {}).get('status')
+        local_job.children[subjob_key] = subjob_data
 
-        logname = f"{local_job.id}:{subjob_id}@{subjob_hostname}"
-        logger.debug(f"Subjob status changed: {logname} -> {subjob_status.value}")
+        logname = f"{local_job.id}:{subjob_id}@{subjob_hostname}"
+        if old_status != subjob_status.value:
+            logger.debug(f"Subjob status changed: {logname} -> {subjob_status.value}")
 
-        # Download complete or partial render jobs
-        if subjob_status in [RenderStatus.COMPLETED, RenderStatus.CANCELLED, RenderStatus.ERROR] and \
-                subjob_data['file_count']:
-            download_result = cls.download_from_subjob(local_job, subjob_id, subjob_hostname)
-            if not download_result:
-                # todo: handle error
-                logger.error(f"Unable to download subjob files from {logname} with status {subjob_status.value}")
-
-        if subjob_status == RenderStatus.CANCELLED or subjob_status == RenderStatus.ERROR:
-            # todo: determine missing frames and schedule new job
-            pass
+        cls.download_missing_frames_from_subjob(local_job, subjob_id, subjob_hostname)
 
     @staticmethod
-    def download_from_subjob(local_job, subjob_id, subjob_hostname):
+    def download_missing_frames_from_subjob(local_job, subjob_id, subjob_hostname):
+
+        try:
+            local_files = [os.path.basename(x) for x in local_job.file_list()]
+            subjob_proxy = RenderServerProxy(subjob_hostname)
+            subjob_files = subjob_proxy.get_job_files_list(job_id=subjob_id) or []
+
+            for subjob_filename in subjob_files:
+                if subjob_filename not in local_files:
+                    logger.info(f"Missing file '{subjob_filename}' from {subjob_hostname}")
+                    try:
+                        local_save_path = os.path.join(os.path.dirname(local_job.output_path), subjob_filename)
+                        subjob_proxy.download_job_file(job_id=subjob_id, job_filename=subjob_filename,
+                                                       save_path=local_save_path)
+                        logger.debug(f'Downloaded successfully - {local_save_path}')
+                    except Exception as e:
+                        logger.error(f"Error downloading file '{subjob_filename}': {e}")
+        except Exception as e:
+            logger.exception(f'Uncaught exception while trying to download from subjob: {e}')
+
+    @staticmethod
+    def download_all_from_subjob(local_job, subjob_id, subjob_hostname):
         """
         Downloads and extracts files from a completed subjob on a remote server.
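
The reconciliation step above boils down to a basename set difference followed by per-file downloads. A minimal standalone sketch of that idea (names, IDs, and paths are placeholders; it assumes a RenderServerProxy instance as used elsewhere in this patch):

    import os

    def missing_basenames(local_paths, remote_names):
        # Compare by basename, mirroring how the parent and subjob report files.
        have = {os.path.basename(p) for p in local_paths}
        return [name for name in remote_names if name not in have]

    # Hypothetical usage against a subjob host:
    # proxy = RenderServerProxy('worker-01.local')
    # remote = proxy.get_job_files_list(job_id='subjob-123') or []
    # for name in missing_basenames(local_job.file_list(), remote):
    #     proxy.download_job_file(job_id='subjob-123', job_filename=name,
    #                             save_path=os.path.join(output_dir, name))
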
@@ -211,7 +255,7 @@
         try:
             local_job.children[child_key]['download_status'] = 'working'
             logger.info(f"Downloading completed subjob files from {subjob_hostname} to localhost")
-            RenderServerProxy(subjob_hostname).get_job_files(subjob_id, zip_file_path)
+            RenderServerProxy(subjob_hostname).download_all_job_files(subjob_id, zip_file_path)
             logger.info(f"File transfer complete for {logname} - Transferred {get_file_size_human(zip_file_path)}")
         except Exception as e:
             logger.error(f"Error downloading files from remote server: {e}")
@@ -235,6 +279,7 @@
 
     @classmethod
     def wait_for_subjobs(cls, local_job):
+        # todo: rewrite this method
         logger.debug(f"Waiting for subjobs for job {local_job}")
         local_job.status = RenderStatus.WAITING_FOR_SUBJOBS
         statuses_to_download = [RenderStatus.CANCELLED, RenderStatus.ERROR, RenderStatus.COMPLETED]
@@ -274,10 +319,10 @@
 
                 # Check if job is finished, but has not had files copied yet over yet
                 if download_status is None and subjob_data['file_count'] and status in statuses_to_download:
-                    download_result = cls.download_from_subjob(local_job, subjob_id, subjob_hostname)
-                    if not download_result:
-                        logger.error("Failed to download from subjob")
-                        # todo: error handling here
+                    try:
+                        cls.download_missing_frames_from_subjob(local_job, subjob_id, subjob_hostname)
+                    except Exception as e:
+                        logger.error(f"Error downloading missing frames from subjob: {e}")
 
                 # Any finished jobs not successfully downloaded at this point are skipped
                 if local_job.children[child_key].get('download_status', None) is None and \
diff --git a/src/engines/blender/blender_worker.py b/src/engines/blender/blender_worker.py
index b789388..da06a2e 100644
--- a/src/engines/blender/blender_worker.py
+++ b/src/engines/blender/blender_worker.py
@@ -110,6 +110,7 @@ class BlenderRenderWorker(BaseRenderWorker):
             output_file_number = output_filename_match.groups()[0]
             try:
                 self.current_frame = int(output_file_number)
+                self._send_frame_complete_notification()
             except ValueError:
                 pass
         elif render_stats_match:
@@ -118,15 +119,15 @@
                 logger.info(f'Frame #{self.current_frame} - '
                             f'{frame_count} of {self.total_frames} completed in {time_completed} | '
                             f'Total Elapsed Time: {datetime.now() - self.start_time}')
-            else:
-                logger.debug(f'DEBUG: {line}')
         else:
             pass
             # if len(line.strip()):
             #     logger.debug(line.strip())
 
     def percent_complete(self):
-        if self.total_frames <= 1:
+        if self.status == RenderStatus.COMPLETED:
+            return 1
+        elif self.total_frames <= 1:
             return self.__frame_percent_complete
         else:
             whole_frame_percent = (self.current_frame - self.start_frame) / self.total_frames
diff --git a/src/engines/core/base_worker.py b/src/engines/core/base_worker.py
index 0e7e23b..ee3e00c 100644
--- a/src/engines/core/base_worker.py
+++ b/src/engines/core/base_worker.py
@@ -124,10 +124,8 @@ class BaseRenderWorker(Base):
             self._status = RenderStatus.CANCELLED.value
         return string_to_status(self._status)
 
-    def validate(self):
-        if not os.path.exists(self.input_path):
-            raise FileNotFoundError(f"Cannot find input path: {self.input_path}")
-        self.generate_subprocess()
+    def _send_frame_complete_notification(self):
+        pub.sendMessage('frame_complete', job_id=self.id, frame_number=self.current_frame)
 
     def generate_subprocess(self):
         # Convert raw args from string if available and catch conflicts
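
Taken together, the worker publishes a 'frame_complete' pubsub message and DistributedJobManager reacts to it. A minimal end-to-end sketch of that hook, assuming the pub object comes from PyPubSub (the patch itself does not show the import):

    from pubsub import pub

    def on_frame_complete(job_id, frame_number):
        # Stand-in for DistributedJobManager.__local_job_frame_complete: refresh
        # previews every few frames and notify the parent job's host.
        print(f'job {job_id} finished frame {frame_number}')

    pub.subscribe(on_frame_complete, 'frame_complete')

    # BaseRenderWorker._send_frame_complete_notification() does the equivalent of:
    pub.sendMessage('frame_complete', job_id='job-abc', frame_number=7)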