Split_into_subjobs WIP

Brett Williams
2023-10-25 19:00:41 -05:00
parent c1e5fd1129
commit 496e5f78a5


@@ -1,3 +1,4 @@
+import datetime
 import logging
 import os
 import socket
@@ -122,6 +123,7 @@ class DistributedJobManager:
             cls.new_create_subjob(parent_job.id, socket.gethostname(),
                                   parent_job.children[subjob_key]['start_frame'],
                                   parent_job.children[subjob_key]['end_frame'])
+            # todo: determine why we don't wait for the new subjobs we create when replacing an error'd job

     @staticmethod
     def determine_missing_frames(parent_job_id):
@@ -196,7 +198,6 @@ class DistributedJobManager:
""" """
logger.debug(f"Waiting for subjobs for job {parent_job}") logger.debug(f"Waiting for subjobs for job {parent_job}")
parent_job.status = RenderStatus.WAITING_FOR_SUBJOBS parent_job.status = RenderStatus.WAITING_FOR_SUBJOBS
server_proxys = {} server_proxys = {}
def fetch_subjob_info(child_key): def fetch_subjob_info(child_key):
@@ -256,51 +257,60 @@ class DistributedJobManager:
             time.sleep(5)

     @classmethod
-    def split_into_subjobs(cls, worker, job_data, project_path):
+    def split_into_subjobs(cls, parent_worker, job_data, project_path):
         # Check availability
-        available_servers = cls.find_available_servers(worker.renderer)
+        available_servers = cls.find_available_servers(parent_worker.renderer)
         logger.debug(f"Splitting into subjobs - Available servers: {available_servers}")
-        subjob_servers = cls.distribute_server_work(worker.start_frame, worker.end_frame, available_servers)
+        subjob_frame_ranges = cls.distribute_server_work(parent_worker.start_frame, parent_worker.end_frame, available_servers)
         local_hostname = socket.gethostname()

         # Prep and submit these sub-jobs
-        logger.info(f"Job {worker.id} split plan: {subjob_servers}")
+        simple_ranges = [f"{x['hostname']}:[{x['frame_range'][0]}-{x['frame_range'][1]}]" for x in subjob_frame_ranges]
+        logger.info(f"Job {parent_worker.id} split plan: {','.join(simple_ranges)}")
         try:
-            for server_data in subjob_servers:
-                server_hostname = server_data['hostname']
+            # setup parent render job first - truncate frames
+            local_range = [x for x in subjob_frame_ranges if x['hostname'] == local_hostname][0]
+            parent_worker.start_frame = max(local_range['frame_range'][0], parent_worker.start_frame)
+            parent_worker.end_frame = min(local_range['frame_range'][-1], parent_worker.end_frame)
+            logger.info(f"Local job now rendering from {parent_worker.start_frame} to {parent_worker.end_frame}")
+            RenderQueue.add_to_render_queue(parent_worker)  # add range-adjusted parent to render queue
+
+            # setup remote subjobs
+            submission_results = {}
+            for subjob_server_data in subjob_frame_ranges:
+                server_hostname = subjob_server_data['hostname']
                 if server_hostname != local_hostname:
-                    post_results = cls.__create_subjob(job_data, local_hostname, project_path, server_data,
-                                                       server_hostname, worker)
+                    post_results = cls.new_create_subjob(parent_worker.id, server_hostname,
+                                                         subjob_server_data['frame_range'][0],
+                                                         subjob_server_data['frame_range'][-1])
                     if post_results.ok:
-                        server_data['submission_results'] = post_results.json()[0]
+                        subjob_server_data['submission_results'] = post_results.json()[0]
                     else:
                         logger.error(f"Failed to create subjob on {server_hostname}")
                         break
                 else:
-                    # truncate parent render_job
-                    worker.start_frame = max(server_data['frame_range'][0], worker.start_frame)
-                    worker.end_frame = min(server_data['frame_range'][-1], worker.end_frame)
-                    logger.info(f"Local job now rendering from {worker.start_frame} to {worker.end_frame}")
-                    server_data['submission_results'] = worker.json()
+                    subjob_server_data['submission_results'] = [True]

             # check that job posts were all successful.
-            if not all(d.get('submission_results') is not None for d in subjob_servers):
-                # todo: rewrite this code - should not have to have all submissions go through
-                raise ValueError("Failed to create all subjobs")  # look into recalculating job #s and use exising jobs
+            # if not all(d.get('submission_results') is not None for d in subjob_frame_ranges):
+            # # todo: rewrite this code - should not have to have all submissions go through
+            # raise ValueError("Failed to create all subjobs")  # look into recalculating job #s and use exising jobs

             # start subjobs
-            logger.debug(f"Starting {len(subjob_servers) - 1} attempted subjobs")
-            for server_data in subjob_servers:
-                if server_data['hostname'] != local_hostname:
-                    child_key = f"{server_data['submission_results']['id']}@{server_data['hostname']}"
-                    worker.children[child_key] = server_data['submission_results']
-            worker.name = f"{worker.name}[{worker.start_frame}-{worker.end_frame}]"
+            logger.debug(f"Starting {len(subjob_frame_ranges) - 1} attempted subjobs")
+            for subjob_server_data in subjob_frame_ranges:
+                if subjob_server_data['hostname'] != local_hostname:
+                    child_key = f"{subjob_server_data['submission_results']['id']}@{subjob_server_data['hostname']}"
+                    parent_worker.children[child_key] = subjob_server_data['submission_results']
+            parent_worker.name = f"{parent_worker.name}[{parent_worker.start_frame}-{parent_worker.end_frame}]"
         except Exception as e:
             # cancel all the subjobs
-            logger.error(f"Failed to split job into subjobs: {e}")
-            logger.debug(f"Cancelling {len(subjob_servers) - 1} attempted subjobs")
+            logger.exception(f"Failed to split job into subjobs: {e}")
+            logger.debug(f"Cancelling {len(subjob_frame_ranges) - 1} attempted subjobs")
             # [RenderServerProxy(hostname).cancel_job(results['id'], confirm=True) for hostname, results in
             # submission_results.items()] # todo: fix this
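
Note: the split plan consumed above is a list of per-host dicts with 'hostname' and 'frame_range' keys, and the local host's entry is what truncates the parent job. distribute_server_work itself is not part of this commit, so the sketch below only illustrates that assumed shape with a naive even split; example_split_plan and the 'render-02'/'render-03' hostnames are hypothetical.

    # Minimal sketch of the split-plan shape assumed by split_into_subjobs.
    # The even split is illustrative only, not the real distribute_server_work.
    import socket

    def example_split_plan(start_frame, end_frame, hostnames):
        """Divide [start_frame, end_frame] into contiguous chunks, one per host."""
        total = end_frame - start_frame + 1
        chunk = max(1, total // len(hostnames))
        plan = []
        for i, hostname in enumerate(hostnames):
            first = start_frame + i * chunk
            last = end_frame if i == len(hostnames) - 1 else first + chunk - 1
            plan.append({'hostname': hostname, 'frame_range': [first, last]})
        return plan

    plan = example_split_plan(1, 100, [socket.gethostname(), 'render-02', 'render-03'])
    # The parent job keeps only its own slice, as in the truncation step above.
    local_entry = [x for x in plan if x['hostname'] == socket.gethostname()][0]
    print(local_entry['frame_range'][0], local_entry['frame_range'][-1])  # 1 33
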
@@ -319,17 +329,20 @@ class DistributedJobManager:
             new_create_subjob('parent_job_123', 'remote-server.example.com', 1, 100)
         """
         logger.info(f"parentID: {parent_job_id}")
+        local_hostname = socket.gethostname()
         parent_job = RenderQueue.job_with_id(parent_job_id)
         subjob_data = {'renderer': parent_job.engine.name(), 'input_path': parent_job.input_path,
                        'args': parent_job.args, 'output_path': parent_job.output_path,
                        'engine_version': parent_job.renderer_version, 'start_frame': start_frame,
                        'end_frame': end_frame, 'parent': f"{parent_job_id}@{local_hostname}"}
-        logger.debug(f"new subjob data: {subjob_data}")
-        logger.debug(f"Creating subjob {start_frame}-{end_frame} for {remote_hostname}")
+        logger.info(f"Creating subjob {os.path.basename(parent_job.input_path)} [{start_frame}-{end_frame}] "
+                    f"for {remote_hostname}")
         post_results = RenderServerProxy(remote_hostname).post_job_to_server(
             file_path=parent_job.input_path, job_list=[subjob_data])
+        post_results_json = post_results.json()[0]
+        parent_job.children[f"{post_results_json['id']}@{remote_hostname}"] = post_results_json
         return post_results

     @staticmethod
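
Note: the bookkeeping added in this hunk keys each child under "<child id>@<remote hostname>", mirroring the 'parent' field "<parent id>@<local hostname>" sent in the payload. The sketch below shows only those key formats and the payload keys taken from the diff; all concrete values (paths, versions, 'subjob_456') are placeholders, not real server output.

    # Sketch of the parent/child bookkeeping used around new_create_subjob.
    # Only the dict keys and the "<id>@<hostname>" formats come from the diff.
    import socket

    parent_job_id = 'parent_job_123'                  # from the docstring example
    remote_hostname = 'remote-server.example.com'
    local_hostname = socket.gethostname()

    subjob_data = {
        'renderer': 'example_renderer',               # placeholder; real code uses parent_job.engine.name()
        'input_path': '/projects/example_scene',      # placeholder path
        'args': '',
        'output_path': '/renders/example_scene/',     # placeholder path
        'engine_version': '1.0',
        'start_frame': 1,
        'end_frame': 100,
        # each child points back at its parent as "<parent id>@<local hostname>"
        'parent': f"{parent_job_id}@{local_hostname}",
    }

    # post_results.json()[0] is expected to carry an 'id'; the parent then
    # tracks the child under "<child id>@<remote hostname>".
    post_results_json = {'id': 'subjob_456'}          # placeholder response
    child_key = f"{post_results_json['id']}@{remote_hostname}"
    print(child_key)  # subjob_456@remote-server.example.com
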