diff --git a/src/distributed_job_manager.py b/src/distributed_job_manager.py
index 39ec380..ee6d2f7 100644
--- a/src/distributed_job_manager.py
+++ b/src/distributed_job_manager.py
@@ -129,12 +129,12 @@ class DistributedJobManager:
 
     # --------------------------------------------
     @classmethod
-    def create_render_job(cls, job_data, loaded_project_local_path):
+    def create_render_job(cls, new_job_attributes, loaded_project_local_path):
-        """Creates render jobs. Pass in dict of job_data and the local path to the project. It creates and returns a
+        """Creates render jobs. Pass in dict of new_job_attributes and the local path to the project. It creates and returns a
         new render job.
 
         Args:
-            job_data (dict): Job data.
+            new_job_attributes (dict): Dict of desired attributes for new job (frame count, renderer, output path, etc)
             loaded_project_local_path (str): The local path to the loaded project.
 
         Returns:
@@ -142,7 +142,7 @@ class DistributedJobManager:
         """
 
         # get new output path in output_dir
-        output_path = job_data.get('output_path')
+        output_path = new_job_attributes.get('output_path')
         if not output_path:
             loaded_project_filename = os.path.basename(loaded_project_local_path)
             output_filename = os.path.splitext(loaded_project_filename)[0]
@@ -156,27 +156,27 @@ class DistributedJobManager:
         logger.debug(f"New job output path: {output_path}")
 
         # create & configure jobs
-        worker = EngineManager.create_worker(renderer=job_data['renderer'],
+        worker = EngineManager.create_worker(renderer=new_job_attributes['renderer'],
                                              input_path=loaded_project_local_path,
                                              output_path=output_path,
-                                             engine_version=job_data.get('engine_version'),
-                                             args=job_data.get('args', {}),
-                                             parent=job_data.get('parent'),
-                                             name=job_data.get('name'))
-        worker.status = job_data.get("initial_status", worker.status)  # todo: is this necessary?
-        worker.priority = int(job_data.get('priority', worker.priority))
-        worker.start_frame = int(job_data.get("start_frame", worker.start_frame))
-        worker.end_frame = int(job_data.get("end_frame", worker.end_frame))
+                                             engine_version=new_job_attributes.get('engine_version'),
+                                             args=new_job_attributes.get('args', {}),
+                                             parent=new_job_attributes.get('parent'),
+                                             name=new_job_attributes.get('name'))
+        worker.status = new_job_attributes.get("initial_status", worker.status)  # todo: is this necessary?
+        worker.priority = int(new_job_attributes.get('priority', worker.priority))
+        worker.start_frame = int(new_job_attributes.get("start_frame", worker.start_frame))
+        worker.end_frame = int(new_job_attributes.get("end_frame", worker.end_frame))
         worker.watchdog_timeout = Config.worker_process_timeout
         worker.hostname = socket.gethostname()
 
         # determine if we can / should split the job
-        if job_data.get("enable_split_jobs", False) and (worker.total_frames > 1) and not worker.parent:
-            cls.split_into_subjobs_async(worker, job_data, loaded_project_local_path)
+        if new_job_attributes.get("enable_split_jobs", False) and (worker.total_frames > 1) and not worker.parent:
+            cls.split_into_subjobs_async(worker, new_job_attributes, loaded_project_local_path)
         else:
             worker.status = RenderStatus.NOT_STARTED
-            RenderQueue.add_to_render_queue(worker, force_start=job_data.get('force_start', False))
+            RenderQueue.add_to_render_queue(worker, force_start=new_job_attributes.get('force_start', False))
 
         PreviewManager.update_previews_for_job(worker)
 
         return worker
@@ -285,15 +285,15 @@ class DistributedJobManager:
 
     # --------------------------------------------
     @classmethod
-    def split_into_subjobs_async(cls, parent_worker, job_data, project_path, system_os=None):
+    def split_into_subjobs_async(cls, parent_worker, new_job_attributes, project_path, system_os=None):
         # todo: I don't love this
         parent_worker.status = RenderStatus.CONFIGURING
-        cls.background_worker = threading.Thread(target=cls.split_into_subjobs, args=(parent_worker, job_data,
+        cls.background_worker = threading.Thread(target=cls.split_into_subjobs, args=(parent_worker, new_job_attributes,
                                                                                       project_path, system_os))
         cls.background_worker.start()
 
     @classmethod
-    def split_into_subjobs(cls, parent_worker, job_data, project_path, system_os=None, specific_servers=None):
+    def split_into_subjobs(cls, parent_worker, new_job_attributes, project_path, system_os=None, specific_servers=None):
         """
         Splits a job into subjobs and distributes them among available servers.
 
@@ -302,10 +302,10 @@ class DistributedJobManager:
         subjob.
 
         Args:
-            parent_worker (Worker): The worker that is handling the job.
-            job_data (dict): The data for the job to be split.
-            project_path (str): The path to the project associated with the job.
-            system_os (str, optional): The operating system of the servers. Default is any OS.
+            parent_worker (Worker): The parent job that we're creating the subjobs for.
+            new_job_attributes (dict): Dict of desired attributes for new job (frame count, renderer, output path, etc)
+            project_path (str): The path to the project.
+            system_os (str, optional): Required OS. Default is any.
             specific_servers (list, optional): List of specific servers to split work between. Defaults to all found.
         """
 
@@ -326,7 +326,7 @@ class DistributedJobManager:
         try:
             for subjob_data in all_subjob_server_data:
                 subjob_hostname = subjob_data['hostname']
-                post_results = cls.__create_subjob(job_data, project_path, subjob_data, subjob_hostname,
+                post_results = cls.__create_subjob(new_job_attributes, project_path, subjob_data, subjob_hostname,
                                                    parent_worker)
                 if not post_results.ok:
-                    ValueError(f"Failed to create subjob on {subjob_hostname}")
+                    raise ValueError(f"Failed to create subjob on {subjob_hostname}")
@@ -347,8 +347,9 @@ class DistributedJobManager:
             RenderServerProxy(parent_worker.hostname).cancel_job(parent_worker.id, confirm=True)
 
     @staticmethod
-    def __create_subjob(job_data, project_path, server_data, server_hostname, parent_worker):
-        subjob = job_data.copy()
+    def __create_subjob(new_job_attributes, project_path, server_data, server_hostname, parent_worker):
+        """Convenience method to create subjobs for a parent worker"""
+        subjob = new_job_attributes.copy()
         subjob['name'] = f"{parent_worker.name}[{server_data['frame_range'][0]}-{server_data['frame_range'][-1]}]"
         subjob['parent'] = f"{parent_worker.id}@{parent_worker.hostname}"
         subjob['start_frame'] = server_data['frame_range'][0]