Misc cleanup

This commit is contained in:
Brett Williams
2023-10-25 18:53:41 -05:00
parent fa0bdf807f
commit 0fe50bc175
4 changed files with 35 additions and 43 deletions

View File

@@ -166,8 +166,8 @@ def create_render_jobs(jobs_list, loaded_project_local_path, job_dir, enable_spl
DistributedJobManager.split_into_subjobs(worker, job_data, loaded_project_local_path)
else:
logger.debug("Not splitting into subjobs")
RenderQueue.add_to_render_queue(worker, force_start=job_data.get('force_start', False))
RenderQueue.add_to_render_queue(worker, force_start=job_data.get('force_start', False))
if not worker.parent:
from src.api.api_server import make_job_ready
make_job_ready(worker.id)

View File

@@ -82,7 +82,7 @@ class DistributedJobManager:
)
@classmethod
def handle_subjob_status_change(cls, local_job_id, subjob_data):
def handle_subjob_status_change(cls, parent_job_id, subjob_data):
"""
Responds to a status change from a remote subjob and triggers the creation or modification of subjobs as needed.
@@ -93,54 +93,35 @@ class DistributedJobManager:
Returns:
None
"""
local_job = RenderQueue.job_with_id(local_job_id)
parent_job = RenderQueue.job_with_id(parent_job_id)
subjob_id = subjob_data['id']
subjob_hostname = next((hostname.split('@')[1] for hostname in local_job.children if
subjob_hostname = next((hostname.split('@')[1] for hostname in parent_job.children if
hostname.split('@')[0] == subjob_id), None)
subjob_key = f'{subjob_id}@{subjob_hostname}'
        # Update the parent job's subjob data
local_job.children = dict(local_job.children) # copy as dict to work around sqlalchemy update issue
local_job.children[subjob_key] = subjob_data
parent_job.children = dict(parent_job.children) # copy as dict to work around sqlalchemy update issue
parent_job.children[subjob_key] = subjob_data
logname = f"{local_job_id}:{subjob_key}"
logname = f"{parent_job_id}:{subjob_key}"
subjob_status = string_to_status(subjob_data['status'])
logger.debug(f"Subjob status changed: {logname} -> {subjob_status.value}")
        # Handle downloading for completed, cancelled, or errored subjobs
if (subjob_status in [RenderStatus.COMPLETED, RenderStatus.CANCELLED, RenderStatus.ERROR]
and subjob_data['file_count']):
if not cls.download_from_subjob(local_job, subjob_id, subjob_hostname):
if not cls.download_from_subjob(parent_job, subjob_id, subjob_hostname):
logger.error(f"Unable to download subjob files from {logname} with status {subjob_status.value}")
# Handle cancelled or errored subjobs by determining missing frames and scheduling a new job
if subjob_status == RenderStatus.CANCELLED or subjob_status == RenderStatus.ERROR:
cls.handle_cancelled_or_errored_subjob(local_job, subjob_data)
@classmethod
def handle_cancelled_or_errored_subjob(cls, local_job, subjob_data):
"""
Handles cancelled or errored subjobs by determining missing frames and scheduling a new job.
Parameters:
local_job (BaseRenderWorker): The local parent job worker.
subjob_data (dict): Subjob data for the cancelled or errored subjob.
Returns:
None
"""
# Determine missing frames based on subjob_data
missing_frames = cls.determine_missing_frames(subjob_data)
if missing_frames:
# Schedule a new job with the missing frames
new_job_data = {
# todo: Set the necessary data for the new job
}
cls.__create_subjob()
logger.info("Creating a new subjob")
cls.new_create_subjob(parent_job.id, socket.gethostname(),
parent_job.children[subjob_key]['start_frame'],
parent_job.children[subjob_key]['end_frame'])
@staticmethod
def determine_missing_frames(subjob_data):
def determine_missing_frames(parent_job_id):
"""
Determine missing frames in the subjob.
@@ -175,13 +156,11 @@ class DistributedJobManager:
# download zip file from server
try:
local_job.children[child_key]['download_status'] = 'working'
logger.info(f"Downloading completed subjob files from {subjob_hostname} to localhost")
RenderServerProxy(subjob_hostname).get_job_files(subjob_id, zip_file_path)
logger.info(f"File transfer complete for {logname} - Transferred {get_file_size_human(zip_file_path)}")
except Exception as e:
logger.exception(f"Exception downloading files from remote server: {e}")
local_job.children[child_key]['download_status'] = 'failed'
return False
# extract zip
@@ -192,15 +171,26 @@ class DistributedJobManager:
zip_ref.extractall(extract_path)
logger.info(f"Successfully extracted zip to: {extract_path}")
os.remove(zip_file_path)
local_job.children[child_key]['download_status'] = 'complete'
return True
except Exception as e:
logger.exception(f"Exception extracting zip file: {e}")
local_job.children[child_key]['download_status'] = 'failed'
return False
return local_job.children[child_key].get('download_status', None) == 'complete'
@classmethod
def wait_for_subjobs(cls, parent_job):
"""
Wait for subjobs to complete and update the parent job's status.
This method continuously checks the status of subjobs until all of them are either completed, canceled, or in error
status. It updates the parent job's children with the latest subjob information.
Parameters:
parent_job (BaseRenderWorker): The parent job worker.
Returns:
None
"""
logger.debug(f"Waiting for subjobs for job {parent_job}")
parent_job.status = RenderStatus.WAITING_FOR_SUBJOBS
@@ -330,7 +320,7 @@ class DistributedJobManager:
subjob_data = {'renderer': parent_job.engine.name(), 'input_path': parent_job.input_path,
'args': parent_job.args, 'output_path': parent_job.output_path,
'engine_version': parent_job.renderer_version, 'start_frame': start_frame,
'end_frame': end_frame, 'parent': f"{parent_job_id}@{socket.gethostname()}"}
'end_frame': end_frame, 'parent': f"{parent_job_id}@{local_hostname}"}
logger.debug(f"new subjob data: {subjob_data}")

View File

@@ -237,7 +237,7 @@ class BaseRenderWorker(Base):
if self.children:
from src.distributed_job_manager import DistributedJobManager
DistributedJobManager.wait_for_subjobs(local_job=self)
DistributedJobManager.wait_for_subjobs(parent_job=self)
# Post Render Work
logger.debug("Starting post-processing work")

View File

@@ -65,10 +65,12 @@ class RenderQueue:
@classmethod
def job_with_id(cls, job_id, none_ok=False):
found_job = next((x for x in cls.all_jobs() if x.id == job_id), None)
if not found_job and not none_ok:
raise JobNotFoundError(job_id)
return found_job
for job in cls.all_jobs():
if job.id == job_id:
return job
if not none_ok:
raise JobNotFoundError(f"Cannot find job with id: {job_id}")
return None
@classmethod
def clear_history(cls):