Rename methods and add docstrings to distributed_job_manager.py

This commit is contained in:
Brett Williams
2024-08-20 23:16:48 -05:00
parent 429d6bf0e9
commit 400a9e997b

View File

@@ -129,12 +129,12 @@ class DistributedJobManager:
# -------------------------------------------- # --------------------------------------------
@classmethod @classmethod
def create_render_job(cls, job_data, loaded_project_local_path): def create_render_job(cls, new_job_attributes, loaded_project_local_path):
"""Creates render jobs. Pass in dict of job_data and the local path to the project. It creates and returns a new """Creates render jobs. Pass in dict of job_data and the local path to the project. It creates and returns a new
render job. render job.
Args: Args:
job_data (dict): Job data. new_job_attributes (dict): Dict of desired attributes for new job (frame count, renderer, output path, etc)
loaded_project_local_path (str): The local path to the loaded project. loaded_project_local_path (str): The local path to the loaded project.
Returns: Returns:
@@ -142,7 +142,7 @@ class DistributedJobManager:
""" """
# get new output path in output_dir # get new output path in output_dir
output_path = job_data.get('output_path') output_path = new_job_attributes.get('output_path')
if not output_path: if not output_path:
loaded_project_filename = os.path.basename(loaded_project_local_path) loaded_project_filename = os.path.basename(loaded_project_local_path)
output_filename = os.path.splitext(loaded_project_filename)[0] output_filename = os.path.splitext(loaded_project_filename)[0]
@@ -156,27 +156,27 @@ class DistributedJobManager:
logger.debug(f"New job output path: {output_path}") logger.debug(f"New job output path: {output_path}")
# create & configure jobs # create & configure jobs
worker = EngineManager.create_worker(renderer=job_data['renderer'], worker = EngineManager.create_worker(renderer=new_job_attributes['renderer'],
input_path=loaded_project_local_path, input_path=loaded_project_local_path,
output_path=output_path, output_path=output_path,
engine_version=job_data.get('engine_version'), engine_version=new_job_attributes.get('engine_version'),
args=job_data.get('args', {}), args=new_job_attributes.get('args', {}),
parent=job_data.get('parent'), parent=new_job_attributes.get('parent'),
name=job_data.get('name')) name=new_job_attributes.get('name'))
worker.status = job_data.get("initial_status", worker.status) # todo: is this necessary? worker.status = new_job_attributes.get("initial_status", worker.status) # todo: is this necessary?
worker.priority = int(job_data.get('priority', worker.priority)) worker.priority = int(new_job_attributes.get('priority', worker.priority))
worker.start_frame = int(job_data.get("start_frame", worker.start_frame)) worker.start_frame = int(new_job_attributes.get("start_frame", worker.start_frame))
worker.end_frame = int(job_data.get("end_frame", worker.end_frame)) worker.end_frame = int(new_job_attributes.get("end_frame", worker.end_frame))
worker.watchdog_timeout = Config.worker_process_timeout worker.watchdog_timeout = Config.worker_process_timeout
worker.hostname = socket.gethostname() worker.hostname = socket.gethostname()
# determine if we can / should split the job # determine if we can / should split the job
if job_data.get("enable_split_jobs", False) and (worker.total_frames > 1) and not worker.parent: if new_job_attributes.get("enable_split_jobs", False) and (worker.total_frames > 1) and not worker.parent:
cls.split_into_subjobs_async(worker, job_data, loaded_project_local_path) cls.split_into_subjobs_async(worker, new_job_attributes, loaded_project_local_path)
else: else:
worker.status = RenderStatus.NOT_STARTED worker.status = RenderStatus.NOT_STARTED
RenderQueue.add_to_render_queue(worker, force_start=job_data.get('force_start', False)) RenderQueue.add_to_render_queue(worker, force_start=new_job_attributes.get('force_start', False))
PreviewManager.update_previews_for_job(worker) PreviewManager.update_previews_for_job(worker)
return worker return worker
@@ -285,15 +285,15 @@ class DistributedJobManager:
# -------------------------------------------- # --------------------------------------------
@classmethod @classmethod
def split_into_subjobs_async(cls, parent_worker, job_data, project_path, system_os=None): def split_into_subjobs_async(cls, parent_worker, new_job_attributes, project_path, system_os=None):
# todo: I don't love this # todo: I don't love this
parent_worker.status = RenderStatus.CONFIGURING parent_worker.status = RenderStatus.CONFIGURING
cls.background_worker = threading.Thread(target=cls.split_into_subjobs, args=(parent_worker, job_data, cls.background_worker = threading.Thread(target=cls.split_into_subjobs, args=(parent_worker, new_job_attributes,
project_path, system_os)) project_path, system_os))
cls.background_worker.start() cls.background_worker.start()
@classmethod @classmethod
def split_into_subjobs(cls, parent_worker, job_data, project_path, system_os=None, specific_servers=None): def split_into_subjobs(cls, parent_worker, new_job_attributes, project_path, system_os=None, specific_servers=None):
""" """
Splits a job into subjobs and distributes them among available servers. Splits a job into subjobs and distributes them among available servers.
@@ -302,10 +302,10 @@ class DistributedJobManager:
subjob. subjob.
Args: Args:
parent_worker (Worker): The worker that is handling the job. parent_worker (Worker): The parent job that we're creating the subjobs for.
job_data (dict): The data for the job to be split. new_job_attributes (dict): Dict of desired attributes for new job (frame count, renderer, output path, etc)
project_path (str): The path to the project associated with the job. project_path (str): The path to the project.
system_os (str, optional): The operating system of the servers. Default is any OS. system_os (str, optional): Required OS. Default is any.
specific_servers (list, optional): List of specific servers to split work between. Defaults to all found. specific_servers (list, optional): List of specific servers to split work between. Defaults to all found.
""" """
@@ -326,7 +326,7 @@ class DistributedJobManager:
try: try:
for subjob_data in all_subjob_server_data: for subjob_data in all_subjob_server_data:
subjob_hostname = subjob_data['hostname'] subjob_hostname = subjob_data['hostname']
post_results = cls.__create_subjob(job_data, project_path, subjob_data, subjob_hostname, post_results = cls.__create_subjob(new_job_attributes, project_path, subjob_data, subjob_hostname,
parent_worker) parent_worker)
if not post_results.ok: if not post_results.ok:
ValueError(f"Failed to create subjob on {subjob_hostname}") ValueError(f"Failed to create subjob on {subjob_hostname}")
@@ -347,8 +347,9 @@ class DistributedJobManager:
RenderServerProxy(parent_worker.hostname).cancel_job(parent_worker.id, confirm=True) RenderServerProxy(parent_worker.hostname).cancel_job(parent_worker.id, confirm=True)
@staticmethod @staticmethod
def __create_subjob(job_data, project_path, server_data, server_hostname, parent_worker): def __create_subjob(new_job_attributes, project_path, server_data, server_hostname, parent_worker):
subjob = job_data.copy() """Convenience method to create subjobs for a parent worker"""
subjob = new_job_attributes.copy()
subjob['name'] = f"{parent_worker.name}[{server_data['frame_range'][0]}-{server_data['frame_range'][-1]}]" subjob['name'] = f"{parent_worker.name}[{server_data['frame_range'][0]}-{server_data['frame_range'][-1]}]"
subjob['parent'] = f"{parent_worker.id}@{parent_worker.hostname}" subjob['parent'] = f"{parent_worker.id}@{parent_worker.hostname}"
subjob['start_frame'] = server_data['frame_range'][0] subjob['start_frame'] = server_data['frame_range'][0]