Download frames from subjobs as frames are completed (#87)

* Add a frame complete notification to BaseWorker and distributed_job_manager.py

* Add API to download individual files to API server and ServerProxy

* Rename subjob notification API and add download_missing_frames_from_subjob

* Subjobs will now notify parent when a frame is complete

* Fix missed rename

* Add some misc logging

* Better error handling

* Fix frame download file path issue

* Download missing frames at job completion and misc cleanup

* Misc cleanup

* Code cleanup
This commit is contained in:
2024-08-04 21:30:10 -05:00
committed by GitHub
parent 1cdb7810bf
commit 4df41a2079
6 changed files with 150 additions and 73 deletions

View File

@@ -134,12 +134,12 @@ def filtered_jobs_json(status_val):
return f'Cannot find jobs with status {status_val}', 400 return f'Cannot find jobs with status {status_val}', 400
@server.post('/api/job/<job_id>/notify_parent_of_status_change') @server.post('/api/job/<job_id>/send_subjob_update_notification')
def subjob_status_change(job_id): def subjob_update_notification(job_id):
try: try:
subjob_details = request.json subjob_details = request.json
logger.info(f"Subjob to job id: {job_id} is now {subjob_details['status']}") logger.info(f"Subjob to job id: {job_id} is now {subjob_details['status']}")
DistributedJobManager.handle_subjob_status_change(RenderQueue.job_with_id(job_id), subjob_data=subjob_details) DistributedJobManager.handle_subjob_update_notification(RenderQueue.job_with_id(job_id), subjob_data=subjob_details)
return Response(status=200) return Response(status=200)
except JobNotFoundError: except JobNotFoundError:
return "Job not found", 404 return "Job not found", 404
@@ -168,7 +168,22 @@ def get_job_logs(job_id):
@server.get('/api/job/<job_id>/file_list') @server.get('/api/job/<job_id>/file_list')
def get_file_list(job_id): def get_file_list(job_id):
return RenderQueue.job_with_id(job_id).file_list() return [os.path.basename(x) for x in RenderQueue.job_with_id(job_id).file_list()]
@server.route('/api/job/<job_id>/download')
def download_file(job_id):
requested_filename = request.args.get('filename')
if not requested_filename:
return 'Filename required', 400
found_job = RenderQueue.job_with_id(job_id)
for job_filename in found_job.file_list():
if os.path.basename(job_filename).lower() == requested_filename.lower():
return send_file(job_filename, as_attachment=True, )
return f"File '{requested_filename}' not found", 404
@server.route('/api/job/<job_id>/download_all') @server.route('/api/job/<job_id>/download_all')
@@ -293,7 +308,10 @@ def delete_job(job_id):
if server.config['UPLOAD_FOLDER'] in output_dir and os.path.exists(output_dir): if server.config['UPLOAD_FOLDER'] in output_dir and os.path.exists(output_dir):
shutil.rmtree(output_dir) shutil.rmtree(output_dir)
PreviewManager.delete_previews_for_job(found_job) try:
PreviewManager.delete_previews_for_job(found_job)
except Exception as e:
logger.error(f"Error deleting previews for {found_job}: {e}")
# See if we own the project_dir (i.e. was it uploaded) # See if we own the project_dir (i.e. was it uploaded)
project_dir = os.path.dirname(os.path.dirname(found_job.input_path)) project_dir = os.path.dirname(os.path.dirname(found_job.input_path))
@@ -473,7 +491,10 @@ def get_disk_benchmark():
def start_server(): def start_server():
def eval_loop(delay_sec=1): def eval_loop(delay_sec=1):
while True: while True:
RenderQueue.evaluate_queue() try:
RenderQueue.evaluate_queue()
except Exception as e:
logger.error(f"Uncaught error while evaluating queue: {e}")
time.sleep(delay_sec) time.sleep(delay_sec)
try: try:

View File

@@ -33,6 +33,7 @@ class PreviewManager:
logger.warning(f"No valid image or video files found in files from job: {job}") logger.warning(f"No valid image or video files found in files from job: {job}")
return return
os.makedirs(cls.storage_path, exist_ok=True)
base_path = os.path.join(cls.storage_path, f"{job.id}-{preview_label}-{max_width}") base_path = os.path.join(cls.storage_path, f"{job.id}-{preview_label}-{max_width}")
preview_video_path = base_path + '.mp4' preview_video_path = base_path + '.mp4'
preview_image_path = base_path + '.jpg' preview_image_path = base_path + '.jpg'
@@ -80,21 +81,24 @@ class PreviewManager:
@classmethod @classmethod
def get_previews_for_job(cls, job): def get_previews_for_job(cls, job):
directory_path = Path(cls.storage_path)
preview_files_for_job = [f for f in directory_path.iterdir() if f.is_file() and f.name.startswith(job.id)]
results = {} results = {}
for preview_filename in preview_files_for_job: try:
try: directory_path = Path(cls.storage_path)
pixel_width = str(preview_filename).split('-')[-1] preview_files_for_job = [f for f in directory_path.iterdir() if f.is_file() and f.name.startswith(job.id)]
preview_label = str(os.path.basename(preview_filename)).split('-')[1]
extension = os.path.splitext(preview_filename)[-1].lower() for preview_filename in preview_files_for_job:
kind = 'video' if extension in supported_video_formats else \ try:
'image' if extension in supported_image_formats else 'unknown' pixel_width = str(preview_filename).split('-')[-1]
results[preview_label] = results.get(preview_label, []) preview_label = str(os.path.basename(preview_filename)).split('-')[1]
results[preview_label].append({'filename': str(preview_filename), 'width': pixel_width, 'kind': kind}) extension = os.path.splitext(preview_filename)[-1].lower()
except IndexError: # ignore invalid filenames kind = 'video' if extension in supported_video_formats else \
pass 'image' if extension in supported_image_formats else 'unknown'
results[preview_label] = results.get(preview_label, [])
results[preview_label].append({'filename': str(preview_filename), 'width': pixel_width, 'kind': kind})
except IndexError: # ignore invalid filenames
pass
except FileNotFoundError:
pass
return results return results
@classmethod @classmethod

View File

@@ -171,19 +171,19 @@ class RenderServerProxy:
def get_all_engines(self): def get_all_engines(self):
return self.request_data('all_engines') return self.request_data('all_engines')
def notify_parent_of_status_change(self, parent_id, subjob): def send_subjob_update_notification(self, parent_id, subjob):
""" """
Notifies the parent job of a status change in a subjob. Notifies the parent job of an update in a subjob.
Args: Args:
parent_id (str): The ID of the parent job. parent_id (str): The ID of the parent job.
subjob (Job): The subjob that has changed status. subjob (Job): The subjob that has updated.
Returns: Returns:
Response: The response from the server. Response: The response from the server.
""" """
hostname = LOOPBACK if self.is_localhost else self.hostname hostname = LOOPBACK if self.is_localhost else self.hostname
return requests.post(f'http://{hostname}:{self.port}/api/job/{parent_id}/notify_parent_of_status_change', return requests.post(f'http://{hostname}:{self.port}/api/job/{parent_id}/send_subjob_update_notification',
json=subjob.json()) json=subjob.json())
def post_job_to_server(self, file_path, job_list, callback=None): def post_job_to_server(self, file_path, job_list, callback=None):
@@ -232,19 +232,27 @@ class RenderServerProxy:
except Exception as e: except Exception as e:
logger.error(f"An error occurred: {e}") logger.error(f"An error occurred: {e}")
def get_job_files(self, job_id, save_path): def get_job_files_list(self, job_id):
return self.request_data(f"job/{job_id}/file_list")
def download_all_job_files(self, job_id, save_path):
hostname = LOOPBACK if self.is_localhost else self.hostname hostname = LOOPBACK if self.is_localhost else self.hostname
url = f"http://{hostname}:{self.port}/api/job/{job_id}/download_all" url = f"http://{hostname}:{self.port}/api/job/{job_id}/download_all"
return self.download_file(url, filename=save_path) return self.__download_file_from_url(url, output_filepath=save_path)
def download_job_file(self, job_id, job_filename, save_path):
hostname = LOOPBACK if self.is_localhost else self.hostname
url = f"http://{hostname}:{self.port}/api/job/{job_id}/download?filename={job_filename}"
return self.__download_file_from_url(url, output_filepath=save_path)
@staticmethod @staticmethod
def download_file(url, filename): def __download_file_from_url(url, output_filepath):
with requests.get(url, stream=True) as r: with requests.get(url, stream=True) as r:
r.raise_for_status() r.raise_for_status()
with open(filename, 'wb') as f: with open(output_filepath, 'wb') as f:
for chunk in r.iter_content(chunk_size=8192): for chunk in r.iter_content(chunk_size=8192):
f.write(chunk) f.write(chunk)
return filename return output_filepath
# --- Renderer --- # # --- Renderer --- #

View File

@@ -33,6 +33,43 @@ class DistributedJobManager:
This should be called once, typically during the initialization phase. This should be called once, typically during the initialization phase.
""" """
pub.subscribe(cls.__local_job_status_changed, 'status_change') pub.subscribe(cls.__local_job_status_changed, 'status_change')
pub.subscribe(cls.__local_job_frame_complete, 'frame_complete')
@classmethod
def __local_job_frame_complete(cls, job_id, frame_number, update_interval=5):
"""
Responds to the 'frame_complete' pubsub message for local jobs.
Parameters:
job_id (str): The ID of the job that has changed status.
old_status (str): The previous status of the job.
new_status (str): The new (current) status of the job.
Note: Do not call directly. Instead, call via the 'frame_complete' pubsub message.
"""
render_job = RenderQueue.job_with_id(job_id, none_ok=True)
if not render_job: # ignore jobs not in the queue
return
logger.debug(f"Job {job_id} has completed frame #{frame_number}")
replace_existing_previews = (frame_number % update_interval) == 0
cls.__job_update_shared(render_job, replace_existing_previews)
@classmethod
def __job_update_shared(cls, render_job, replace_existing_previews=False):
# update previews
PreviewManager.update_previews_for_job(job=render_job, replace_existing=replace_existing_previews)
# notify parent to allow individual frames to be copied instead of waiting until the end
if render_job.parent:
parent_id, parent_hostname = render_job.parent.split('@')[0], render_job.parent.split('@')[-1]
try:
logger.debug(f'Job {render_job.id} updating parent {parent_id}@{parent_hostname}')
RenderServerProxy(parent_hostname).send_subjob_update_notification(parent_id, render_job)
except Exception as e:
logger.error(f"Error notifying parent {parent_hostname} about update in subjob {render_job.id}: {e}")
@classmethod @classmethod
def __local_job_status_changed(cls, job_id, old_status, new_status): def __local_job_status_changed(cls, job_id, old_status, new_status):
@@ -54,20 +91,14 @@ class DistributedJobManager:
logger.debug(f"Job {job_id} status change: {old_status} -> {new_status}") logger.debug(f"Job {job_id} status change: {old_status} -> {new_status}")
# Update previews - Force replace if job has completed successfully cls.__job_update_shared(render_job, replace_existing_previews=(render_job.status == RenderStatus.COMPLETED))
PreviewManager.update_previews_for_job(render_job,
replace_existing=(render_job.status == RenderStatus.COMPLETED))
# Notify parent if necessary # Handle children
if render_job.parent: # If local job is a subjob from a remote server if render_job.children:
parent_id, hostname = render_job.parent.split('@')[0], render_job.parent.split('@')[-1] if new_status in [RenderStatus.CANCELLED, RenderStatus.ERROR]: # Cancel children if necessary
RenderServerProxy(hostname).notify_parent_of_status_change(parent_id=parent_id, subjob=render_job) for child in render_job.children:
child_id, child_hostname = child.split('@')
# Cancelling children if necessary RenderServerProxy(child_hostname).cancel_job(child_id, confirm=True)
if render_job.children and new_status in [RenderStatus.CANCELLED, RenderStatus.ERROR]:
for child in render_job.children:
child_id, hostname = child.split('@')
RenderServerProxy(hostname).cancel_job(child_id, confirm=True)
# UI Notifications # UI Notifications
try: try:
@@ -159,9 +190,9 @@ class DistributedJobManager:
# -------------------------------------------- # --------------------------------------------
@classmethod @classmethod
def handle_subjob_status_change(cls, local_job, subjob_data): def handle_subjob_update_notification(cls, local_job, subjob_data):
""" """
Responds to a status change from a remote subjob and triggers the creation or modification of subjobs as needed. Responds to a notification from a remote subjob and the host requests any subsequent updates from the subjob.
Args: Args:
local_job (BaseRenderWorker): The local parent job worker. local_job (BaseRenderWorker): The local parent job worker.
@@ -170,27 +201,40 @@ class DistributedJobManager:
subjob_status = string_to_status(subjob_data['status']) subjob_status = string_to_status(subjob_data['status'])
subjob_id = subjob_data['id'] subjob_id = subjob_data['id']
subjob_hostname = next((hostname.split('@')[1] for hostname in local_job.children if subjob_hostname = subjob_data['hostname']
hostname.split('@')[0] == subjob_id), None) subjob_key = f'{subjob_id}@{subjob_hostname}'
local_job.children[f'{subjob_id}@{subjob_hostname}'] = subjob_data old_status = local_job.children.get(subjob_key, {}).get('status')
local_job.children[subjob_key] = subjob_data
logname = f"{local_job.id}:{subjob_id}@{subjob_hostname}" logname = f"<Parent: {local_job.id} | Child: {subjob_key}>"
logger.debug(f"Subjob status changed: {logname} -> {subjob_status.value}") if old_status != subjob_status.value:
logger.debug(f"Subjob status changed: {logname} -> {subjob_status.value}")
# Download complete or partial render jobs cls.download_missing_frames_from_subjob(local_job, subjob_id, subjob_hostname)
if subjob_status in [RenderStatus.COMPLETED, RenderStatus.CANCELLED, RenderStatus.ERROR] and \
subjob_data['file_count']:
download_result = cls.download_from_subjob(local_job, subjob_id, subjob_hostname)
if not download_result:
# todo: handle error
logger.error(f"Unable to download subjob files from {logname} with status {subjob_status.value}")
if subjob_status == RenderStatus.CANCELLED or subjob_status == RenderStatus.ERROR:
# todo: determine missing frames and schedule new job
pass
@staticmethod @staticmethod
def download_from_subjob(local_job, subjob_id, subjob_hostname): def download_missing_frames_from_subjob(local_job, subjob_id, subjob_hostname):
try:
local_files = [os.path.basename(x) for x in local_job.file_list()]
subjob_proxy = RenderServerProxy(subjob_hostname)
subjob_files = subjob_proxy.get_job_files_list(job_id=subjob_id) or []
for subjob_filename in subjob_files:
if subjob_filename not in local_files:
logger.info(f"Missing file '{subjob_filename}' from {subjob_hostname}")
try:
local_save_path = os.path.join(os.path.dirname(local_job.output_path), subjob_filename)
subjob_proxy.download_job_file(job_id=subjob_id, job_filename=subjob_filename,
save_path=local_save_path)
logger.debug(f'Downloaded successfully - {local_save_path}')
except Exception as e:
logger.error(f"Error downloading file '{subjob_filename}': {e}")
except Exception as e:
logger.exception(f'Uncaught exception while trying to download from subjob: {e}')
@staticmethod
def download_all_from_subjob(local_job, subjob_id, subjob_hostname):
""" """
Downloads and extracts files from a completed subjob on a remote server. Downloads and extracts files from a completed subjob on a remote server.
@@ -211,7 +255,7 @@ class DistributedJobManager:
try: try:
local_job.children[child_key]['download_status'] = 'working' local_job.children[child_key]['download_status'] = 'working'
logger.info(f"Downloading completed subjob files from {subjob_hostname} to localhost") logger.info(f"Downloading completed subjob files from {subjob_hostname} to localhost")
RenderServerProxy(subjob_hostname).get_job_files(subjob_id, zip_file_path) RenderServerProxy(subjob_hostname).download_all_job_files(subjob_id, zip_file_path)
logger.info(f"File transfer complete for {logname} - Transferred {get_file_size_human(zip_file_path)}") logger.info(f"File transfer complete for {logname} - Transferred {get_file_size_human(zip_file_path)}")
except Exception as e: except Exception as e:
logger.error(f"Error downloading files from remote server: {e}") logger.error(f"Error downloading files from remote server: {e}")
@@ -235,6 +279,7 @@ class DistributedJobManager:
@classmethod @classmethod
def wait_for_subjobs(cls, local_job): def wait_for_subjobs(cls, local_job):
# todo: rewrite this method
logger.debug(f"Waiting for subjobs for job {local_job}") logger.debug(f"Waiting for subjobs for job {local_job}")
local_job.status = RenderStatus.WAITING_FOR_SUBJOBS local_job.status = RenderStatus.WAITING_FOR_SUBJOBS
statuses_to_download = [RenderStatus.CANCELLED, RenderStatus.ERROR, RenderStatus.COMPLETED] statuses_to_download = [RenderStatus.CANCELLED, RenderStatus.ERROR, RenderStatus.COMPLETED]
@@ -274,10 +319,10 @@ class DistributedJobManager:
# Check if job is finished, but has not had files copied yet over yet # Check if job is finished, but has not had files copied yet over yet
if download_status is None and subjob_data['file_count'] and status in statuses_to_download: if download_status is None and subjob_data['file_count'] and status in statuses_to_download:
download_result = cls.download_from_subjob(local_job, subjob_id, subjob_hostname) try:
if not download_result: cls.download_missing_frames_from_subjob(local_job, subjob_id, subjob_hostname)
logger.error("Failed to download from subjob") except Exception as e:
# todo: error handling here logger.error(f"Error downloading missing frames from subjob: {e}")
# Any finished jobs not successfully downloaded at this point are skipped # Any finished jobs not successfully downloaded at this point are skipped
if local_job.children[child_key].get('download_status', None) is None and \ if local_job.children[child_key].get('download_status', None) is None and \

View File

@@ -110,6 +110,7 @@ class BlenderRenderWorker(BaseRenderWorker):
output_file_number = output_filename_match.groups()[0] output_file_number = output_filename_match.groups()[0]
try: try:
self.current_frame = int(output_file_number) self.current_frame = int(output_file_number)
self._send_frame_complete_notification()
except ValueError: except ValueError:
pass pass
elif render_stats_match: elif render_stats_match:
@@ -118,15 +119,15 @@ class BlenderRenderWorker(BaseRenderWorker):
logger.info(f'Frame #{self.current_frame} - ' logger.info(f'Frame #{self.current_frame} - '
f'{frame_count} of {self.total_frames} completed in {time_completed} | ' f'{frame_count} of {self.total_frames} completed in {time_completed} | '
f'Total Elapsed Time: {datetime.now() - self.start_time}') f'Total Elapsed Time: {datetime.now() - self.start_time}')
else:
logger.debug(f'DEBUG: {line}')
else: else:
pass pass
# if len(line.strip()): # if len(line.strip()):
# logger.debug(line.strip()) # logger.debug(line.strip())
def percent_complete(self): def percent_complete(self):
if self.total_frames <= 1: if self.status == RenderStatus.COMPLETED:
return 1
elif self.total_frames <= 1:
return self.__frame_percent_complete return self.__frame_percent_complete
else: else:
whole_frame_percent = (self.current_frame - self.start_frame) / self.total_frames whole_frame_percent = (self.current_frame - self.start_frame) / self.total_frames

View File

@@ -124,10 +124,8 @@ class BaseRenderWorker(Base):
self._status = RenderStatus.CANCELLED.value self._status = RenderStatus.CANCELLED.value
return string_to_status(self._status) return string_to_status(self._status)
def validate(self): def _send_frame_complete_notification(self):
if not os.path.exists(self.input_path): pub.sendMessage('frame_complete', job_id=self.id, frame_number=self.current_frame)
raise FileNotFoundError(f"Cannot find input path: {self.input_path}")
self.generate_subprocess()
def generate_subprocess(self): def generate_subprocess(self):
# Convert raw args from string if available and catch conflicts # Convert raw args from string if available and catch conflicts