Mirror of https://github.com/blw1138/Zordon.git, synced 2025-12-17 16:58:12 +00:00
Misc cleanup
@@ -166,8 +166,8 @@ def create_render_jobs(jobs_list, loaded_project_local_path, job_dir, enable_spl
             DistributedJobManager.split_into_subjobs(worker, job_data, loaded_project_local_path)
         else:
             logger.debug("Not splitting into subjobs")
+        RenderQueue.add_to_render_queue(worker, force_start=job_data.get('force_start', False))
 
-        RenderQueue.add_to_render_queue(worker, force_start=job_data.get('force_start', False))
         if not worker.parent:
             from src.api.api_server import make_job_ready
             make_job_ready(worker.id)
@@ -85,7 +85,7 @@ class DistributedJobManager:
             logger.debug(f"Unable to show UI notification: {e}")
 
     @classmethod
-    def handle_subjob_status_change(cls, local_job_id, subjob_data):
+    def handle_subjob_status_change(cls, parent_job_id, subjob_data):
         """
         Responds to a status change from a remote subjob and triggers the creation or modification of subjobs as needed.
 
@@ -96,54 +96,35 @@ class DistributedJobManager:
         Returns:
             None
         """
-        local_job = RenderQueue.job_with_id(local_job_id)
+        parent_job = RenderQueue.job_with_id(parent_job_id)
         subjob_id = subjob_data['id']
-        subjob_hostname = next((hostname.split('@')[1] for hostname in local_job.children if
+        subjob_hostname = next((hostname.split('@')[1] for hostname in parent_job.children if
                                 hostname.split('@')[0] == subjob_id), None)
         subjob_key = f'{subjob_id}@{subjob_hostname}'
 
         # Update the local job's subjob data
-        local_job.children = dict(local_job.children)  # copy as dict to work around sqlalchemy update issue
-        local_job.children[subjob_key] = subjob_data
+        parent_job.children = dict(parent_job.children)  # copy as dict to work around sqlalchemy update issue
+        parent_job.children[subjob_key] = subjob_data
 
-        logname = f"{local_job_id}:{subjob_key}"
+        logname = f"{parent_job_id}:{subjob_key}"
         subjob_status = string_to_status(subjob_data['status'])
         logger.debug(f"Subjob status changed: {logname} -> {subjob_status.value}")
 
         # Handle downloading for completed, cancelled, or error'd subjobs
         if (subjob_status in [RenderStatus.COMPLETED, RenderStatus.CANCELLED, RenderStatus.ERROR]
                 and subjob_data['file_count']):
-            if not cls.download_from_subjob(local_job, subjob_id, subjob_hostname):
+            if not cls.download_from_subjob(parent_job, subjob_id, subjob_hostname):
                 logger.error(f"Unable to download subjob files from {logname} with status {subjob_status.value}")
 
         # Handle cancelled or errored subjobs by determining missing frames and scheduling a new job
         if subjob_status == RenderStatus.CANCELLED or subjob_status == RenderStatus.ERROR:
-            cls.handle_cancelled_or_errored_subjob(local_job, subjob_data)
-
-    @classmethod
-    def handle_cancelled_or_errored_subjob(cls, local_job, subjob_data):
-        """
-        Handles cancelled or errored subjobs by determining missing frames and scheduling a new job.
-
-        Parameters:
-            local_job (BaseRenderWorker): The local parent job worker.
-            subjob_data (dict): Subjob data for the cancelled or errored subjob.
-
-        Returns:
-            None
-        """
-        # Determine missing frames based on subjob_data
-        missing_frames = cls.determine_missing_frames(subjob_data)
-
-        if missing_frames:
-            # Schedule a new job with the missing frames
-            new_job_data = {
-                # todo: Set the necessary data for the new job
-            }
-            cls.__create_subjob()
+            logger.info("Creating a new subjob")
+            cls.new_create_subjob(parent_job.id, socket.gethostname(),
+                                  parent_job.children[subjob_key]['start_frame'],
+                                  parent_job.children[subjob_key]['end_frame'])
 
     @staticmethod
-    def determine_missing_frames(subjob_data):
+    def determine_missing_frames(parent_job_id):
         """
         Determine missing frames in the subjob.
 
@@ -178,13 +159,11 @@ class DistributedJobManager:
 
         # download zip file from server
        try:
-            local_job.children[child_key]['download_status'] = 'working'
             logger.info(f"Downloading completed subjob files from {subjob_hostname} to localhost")
             RenderServerProxy(subjob_hostname).get_job_files(subjob_id, zip_file_path)
             logger.info(f"File transfer complete for {logname} - Transferred {get_file_size_human(zip_file_path)}")
         except Exception as e:
             logger.exception(f"Exception downloading files from remote server: {e}")
-            local_job.children[child_key]['download_status'] = 'failed'
             return False
 
         # extract zip
@@ -195,15 +174,26 @@ class DistributedJobManager:
                 zip_ref.extractall(extract_path)
                 logger.info(f"Successfully extracted zip to: {extract_path}")
                 os.remove(zip_file_path)
-                local_job.children[child_key]['download_status'] = 'complete'
+                return True
         except Exception as e:
             logger.exception(f"Exception extracting zip file: {e}")
-            local_job.children[child_key]['download_status'] = 'failed'
+            return False
 
-        return local_job.children[child_key].get('download_status', None) == 'complete'
 
     @classmethod
     def wait_for_subjobs(cls, parent_job):
+        """
+        Wait for subjobs to complete and update the parent job's status.
+
+        This method continuously checks the status of subjobs until all of them are either completed, canceled, or in error
+        status. It updates the parent job's children with the latest subjob information.
+
+        Parameters:
+            parent_job (BaseRenderWorker): The parent job worker.
+
+        Returns:
+            None
+        """
         logger.debug(f"Waiting for subjobs for job {parent_job}")
         parent_job.status = RenderStatus.WAITING_FOR_SUBJOBS
 
@@ -333,7 +323,7 @@ class DistributedJobManager:
         subjob_data = {'renderer': parent_job.engine.name(), 'input_path': parent_job.input_path,
                       'args': parent_job.args, 'output_path': parent_job.output_path,
                       'engine_version': parent_job.renderer_version, 'start_frame': start_frame,
-                      'end_frame': end_frame, 'parent': f"{parent_job_id}@{socket.gethostname()}"}
+                      'end_frame': end_frame, 'parent': f"{parent_job_id}@{local_hostname}"}
 
         logger.debug(f"new subjob data: {subjob_data}")
 
@@ -237,7 +237,7 @@ class BaseRenderWorker(Base):
 
         if self.children:
             from src.distributed_job_manager import DistributedJobManager
-            DistributedJobManager.wait_for_subjobs(local_job=self)
+            DistributedJobManager.wait_for_subjobs(parent_job=self)
 
         # Post Render Work
         logger.debug("Starting post-processing work")
@@ -65,10 +65,12 @@ class RenderQueue:
 
     @classmethod
     def job_with_id(cls, job_id, none_ok=False):
-        found_job = next((x for x in cls.all_jobs() if x.id == job_id), None)
-        if not found_job and not none_ok:
-            raise JobNotFoundError(job_id)
-        return found_job
+        for job in cls.all_jobs():
+            if job.id == job_id:
+                return job
+        if not none_ok:
+            raise JobNotFoundError(f"Cannot find job with id: {job_id}")
+        return None
 
     @classmethod
     def clear_history(cls):