From 80ffda84475befdde6557b8309259bc2ec2361c9 Mon Sep 17 00:00:00 2001
From: Brett Williams
Date: Wed, 25 Oct 2023 19:00:41 -0500
Subject: [PATCH] Split_into_subjobs WIP

---
 src/distributed_job_manager.py | 71 ++++++++++++++++++++--------------
 1 file changed, 42 insertions(+), 29 deletions(-)

diff --git a/src/distributed_job_manager.py b/src/distributed_job_manager.py
index e9575f4..ac81c38 100644
--- a/src/distributed_job_manager.py
+++ b/src/distributed_job_manager.py
@@ -1,3 +1,4 @@
+import datetime
 import logging
 import os
 import socket
@@ -119,6 +120,7 @@ class DistributedJobManager:
             cls.new_create_subjob(parent_job.id, socket.gethostname(),
                                   parent_job.children[subjob_key]['start_frame'],
                                   parent_job.children[subjob_key]['end_frame'])
+        # todo: determine why we don't wait for the new subjobs we create when replacing an error'd job
 
     @staticmethod
     def determine_missing_frames(parent_job_id):
@@ -193,7 +195,6 @@ class DistributedJobManager:
         """
         logger.debug(f"Waiting for subjobs for job {parent_job}")
         parent_job.status = RenderStatus.WAITING_FOR_SUBJOBS
-
         server_proxys = {}
 
         def fetch_subjob_info(child_key):
@@ -253,51 +254,60 @@ class DistributedJobManager:
             time.sleep(5)
 
     @classmethod
-    def split_into_subjobs(cls, worker, job_data, project_path):
+    def split_into_subjobs(cls, parent_worker, job_data, project_path):
         # Check availability
-        available_servers = cls.find_available_servers(worker.renderer)
+        available_servers = cls.find_available_servers(parent_worker.renderer)
         logger.debug(f"Splitting into subjobs - Available servers: {available_servers}")
-        subjob_servers = cls.distribute_server_work(worker.start_frame, worker.end_frame, available_servers)
+        subjob_frame_ranges = cls.distribute_server_work(parent_worker.start_frame, parent_worker.end_frame, available_servers)
         local_hostname = socket.gethostname()
 
         # Prep and submit these sub-jobs
-        logger.info(f"Job {worker.id} split plan: {subjob_servers}")
+        simple_ranges = [f"{x['hostname']}:[{x['frame_range'][0]}-{x['frame_range'][1]}]" for x in subjob_frame_ranges]
+        logger.info(f"Job {parent_worker.id} split plan: {','.join(simple_ranges)}")
         try:
-            for server_data in subjob_servers:
-                server_hostname = server_data['hostname']
+
+            # setup parent render job first - truncate frames
+            local_range = [x for x in subjob_frame_ranges if x['hostname'] == local_hostname][0]
+            parent_worker.start_frame = max(local_range['frame_range'][0], parent_worker.start_frame)
+            parent_worker.end_frame = min(local_range['frame_range'][-1], parent_worker.end_frame)
+            logger.info(f"Local job now rendering from {parent_worker.start_frame} to {parent_worker.end_frame}")
+            RenderQueue.add_to_render_queue(parent_worker)  # add range-adjusted parent to render queue
+
+            # setup remote subjobs
+            submission_results = {}
+            for subjob_server_data in subjob_frame_ranges:
+                server_hostname = subjob_server_data['hostname']
                 if server_hostname != local_hostname:
-                    post_results = cls.__create_subjob(job_data, local_hostname, project_path, server_data,
-                                                       server_hostname, worker)
+                    post_results = cls.new_create_subjob(parent_worker.id, server_hostname,
+                                                         subjob_server_data['frame_range'][0],
+                                                         subjob_server_data['frame_range'][-1])
+
                     if post_results.ok:
-                        server_data['submission_results'] = post_results.json()[0]
+                        subjob_server_data['submission_results'] = post_results.json()[0]
                     else:
                         logger.error(f"Failed to create subjob on {server_hostname}")
                         break
                 else:
-                    # truncate parent render_job
-                    worker.start_frame = max(server_data['frame_range'][0], worker.start_frame)
-                    worker.end_frame = min(server_data['frame_range'][-1], worker.end_frame)
-                    logger.info(f"Local job now rendering from {worker.start_frame} to {worker.end_frame}")
-                    server_data['submission_results'] = worker.json()
+                    subjob_server_data['submission_results'] = [True]
 
             # check that job posts were all successful.
-            if not all(d.get('submission_results') is not None for d in subjob_servers):
-                # todo: rewrite this code - should not have to have all submissions go through
-                raise ValueError("Failed to create all subjobs")  # look into recalculating job #s and use exising jobs
+            # if not all(d.get('submission_results') is not None for d in subjob_frame_ranges):
+            #     # todo: rewrite this code - should not have to have all submissions go through
+            #     raise ValueError("Failed to create all subjobs")  # look into recalculating job #s and use exising jobs
 
            # start subjobs
-            logger.debug(f"Starting {len(subjob_servers) - 1} attempted subjobs")
-            for server_data in subjob_servers:
-                if server_data['hostname'] != local_hostname:
-                    child_key = f"{server_data['submission_results']['id']}@{server_data['hostname']}"
-                    worker.children[child_key] = server_data['submission_results']
-            worker.name = f"{worker.name}[{worker.start_frame}-{worker.end_frame}]"
+            logger.debug(f"Starting {len(subjob_frame_ranges) - 1} attempted subjobs")
+            for subjob_server_data in subjob_frame_ranges:
+                if subjob_server_data['hostname'] != local_hostname:
+                    child_key = f"{subjob_server_data['submission_results']['id']}@{subjob_server_data['hostname']}"
+                    parent_worker.children[child_key] = subjob_server_data['submission_results']
+            parent_worker.name = f"{parent_worker.name}[{parent_worker.start_frame}-{parent_worker.end_frame}]"
 
         except Exception as e:
             # cancel all the subjobs
-            logger.error(f"Failed to split job into subjobs: {e}")
-            logger.debug(f"Cancelling {len(subjob_servers) - 1} attempted subjobs")
+            logger.exception(f"Failed to split job into subjobs: {e}")
+            logger.debug(f"Cancelling {len(subjob_frame_ranges) - 1} attempted subjobs")
             # [RenderServerProxy(hostname).cancel_job(results['id'], confirm=True) for hostname, results in
             #  submission_results.items()]  # todo: fix this
@@ -316,17 +326,20 @@ class DistributedJobManager:
             new_create_subjob('parent_job_123', 'remote-server.example.com', 1, 100)
         """
         logger.info(f"parentID: {parent_job_id}")
+        local_hostname = socket.gethostname()
         parent_job = RenderQueue.job_with_id(parent_job_id)
         subjob_data = {'renderer': parent_job.engine.name(), 'input_path': parent_job.input_path,
                        'args': parent_job.args, 'output_path': parent_job.output_path,
                        'engine_version': parent_job.renderer_version,
                        'start_frame': start_frame, 'end_frame': end_frame,
                        'parent': f"{parent_job_id}@{local_hostname}"}
-        logger.debug(f"new subjob data: {subjob_data}")
-
-        logger.debug(f"Creating subjob {start_frame}-{end_frame} for {remote_hostname}")
+        logger.info(f"Creating subjob {os.path.basename(parent_job.input_path)} [{start_frame}-{end_frame}] "
+                    f"for {remote_hostname}")
         post_results = RenderServerProxy(remote_hostname).post_job_to_server(
             file_path=parent_job.input_path, job_list=[subjob_data])
+        post_results_json = post_results.json()[0]
+
+        parent_job.children[f"{post_results_json['id']}@{remote_hostname}"] = post_results_json
         return post_results
 
     @staticmethod
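Note (not part of the patch): the reworked submission loop assumes cls.distribute_server_work()
returns a list of dicts shaped like {'hostname': ..., 'frame_range': [start, end]}, with the local
host included in the plan. That helper is not shown in this diff, so the sketch below only
illustrates the assumed return shape with an even frame split; it is not the project's actual
implementation.

    def distribute_server_work(start_frame, end_frame, available_servers):
        """Evenly split the inclusive frame range across the given hostnames (illustrative only)."""
        total = end_frame - start_frame + 1
        chunk, remainder = divmod(total, len(available_servers))
        plan, cursor = [], start_frame
        for index, hostname in enumerate(available_servers):
            size = chunk + (1 if index < remainder else 0)  # spread leftover frames across the first hosts
            if size == 0:
                continue  # more servers than frames; leave the extras out of the plan
            plan.append({'hostname': hostname, 'frame_range': [cursor, cursor + size - 1]})
            cursor += size
        return plan

    # Example: distribute_server_work(1, 100, ['render01', 'render02', 'render03'])
    # -> [{'hostname': 'render01', 'frame_range': [1, 34]},
    #     {'hostname': 'render02', 'frame_range': [35, 67]},
    #     {'hostname': 'render03', 'frame_range': [68, 100]}]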