mirror of
https://github.com/blw1138/Zordon.git
synced 2025-12-17 16:58:12 +00:00
Split_into_subjobs WIP
This commit is contained in:
@@ -1,3 +1,4 @@
|
|||||||
|
import datetime
|
||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
import socket
|
import socket
|
||||||
@@ -119,6 +120,7 @@ class DistributedJobManager:
|
|||||||
cls.new_create_subjob(parent_job.id, socket.gethostname(),
|
cls.new_create_subjob(parent_job.id, socket.gethostname(),
|
||||||
parent_job.children[subjob_key]['start_frame'],
|
parent_job.children[subjob_key]['start_frame'],
|
||||||
parent_job.children[subjob_key]['end_frame'])
|
parent_job.children[subjob_key]['end_frame'])
|
||||||
|
# todo: determine why we don't wait for the new subjobs we create when replacing an error'd job
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def determine_missing_frames(parent_job_id):
|
def determine_missing_frames(parent_job_id):
|
||||||
@@ -193,7 +195,6 @@ class DistributedJobManager:
|
|||||||
"""
|
"""
|
||||||
logger.debug(f"Waiting for subjobs for job {parent_job}")
|
logger.debug(f"Waiting for subjobs for job {parent_job}")
|
||||||
parent_job.status = RenderStatus.WAITING_FOR_SUBJOBS
|
parent_job.status = RenderStatus.WAITING_FOR_SUBJOBS
|
||||||
|
|
||||||
server_proxys = {}
|
server_proxys = {}
|
||||||
|
|
||||||
def fetch_subjob_info(child_key):
|
def fetch_subjob_info(child_key):
|
||||||
@@ -253,51 +254,60 @@ class DistributedJobManager:
|
|||||||
time.sleep(5)
|
time.sleep(5)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def split_into_subjobs(cls, worker, job_data, project_path):
|
def split_into_subjobs(cls, parent_worker, job_data, project_path):
|
||||||
|
|
||||||
# Check availability
|
# Check availability
|
||||||
available_servers = cls.find_available_servers(worker.renderer)
|
available_servers = cls.find_available_servers(parent_worker.renderer)
|
||||||
logger.debug(f"Splitting into subjobs - Available servers: {available_servers}")
|
logger.debug(f"Splitting into subjobs - Available servers: {available_servers}")
|
||||||
subjob_servers = cls.distribute_server_work(worker.start_frame, worker.end_frame, available_servers)
|
subjob_frame_ranges = cls.distribute_server_work(parent_worker.start_frame, parent_worker.end_frame, available_servers)
|
||||||
local_hostname = socket.gethostname()
|
local_hostname = socket.gethostname()
|
||||||
|
|
||||||
# Prep and submit these sub-jobs
|
# Prep and submit these sub-jobs
|
||||||
logger.info(f"Job {worker.id} split plan: {subjob_servers}")
|
simple_ranges = [f"{x['hostname']}:[{x['frame_range'][0]}-{x['frame_range'][1]}]" for x in subjob_frame_ranges]
|
||||||
|
logger.info(f"Job {parent_worker.id} split plan: {','.join(simple_ranges)}")
|
||||||
try:
|
try:
|
||||||
for server_data in subjob_servers:
|
|
||||||
server_hostname = server_data['hostname']
|
# setup parent render job first - truncate frames
|
||||||
|
local_range = [x for x in subjob_frame_ranges if x['hostname'] == local_hostname][0]
|
||||||
|
parent_worker.start_frame = max(local_range['frame_range'][0], parent_worker.start_frame)
|
||||||
|
parent_worker.end_frame = min(local_range['frame_range'][-1], parent_worker.end_frame)
|
||||||
|
logger.info(f"Local job now rendering from {parent_worker.start_frame} to {parent_worker.end_frame}")
|
||||||
|
RenderQueue.add_to_render_queue(parent_worker) # add range-adjusted parent to render queue
|
||||||
|
|
||||||
|
# setup remote subjobs
|
||||||
|
submission_results = {}
|
||||||
|
for subjob_server_data in subjob_frame_ranges:
|
||||||
|
server_hostname = subjob_server_data['hostname']
|
||||||
if server_hostname != local_hostname:
|
if server_hostname != local_hostname:
|
||||||
post_results = cls.__create_subjob(job_data, local_hostname, project_path, server_data,
|
post_results = cls.new_create_subjob(parent_worker.id, server_hostname,
|
||||||
server_hostname, worker)
|
subjob_server_data['frame_range'][0],
|
||||||
|
subjob_server_data['frame_range'][-1])
|
||||||
|
|
||||||
if post_results.ok:
|
if post_results.ok:
|
||||||
server_data['submission_results'] = post_results.json()[0]
|
subjob_server_data['submission_results'] = post_results.json()[0]
|
||||||
else:
|
else:
|
||||||
logger.error(f"Failed to create subjob on {server_hostname}")
|
logger.error(f"Failed to create subjob on {server_hostname}")
|
||||||
break
|
break
|
||||||
else:
|
else:
|
||||||
# truncate parent render_job
|
subjob_server_data['submission_results'] = [True]
|
||||||
worker.start_frame = max(server_data['frame_range'][0], worker.start_frame)
|
|
||||||
worker.end_frame = min(server_data['frame_range'][-1], worker.end_frame)
|
|
||||||
logger.info(f"Local job now rendering from {worker.start_frame} to {worker.end_frame}")
|
|
||||||
server_data['submission_results'] = worker.json()
|
|
||||||
|
|
||||||
# check that job posts were all successful.
|
# check that job posts were all successful.
|
||||||
if not all(d.get('submission_results') is not None for d in subjob_servers):
|
# if not all(d.get('submission_results') is not None for d in subjob_frame_ranges):
|
||||||
# todo: rewrite this code - should not have to have all submissions go through
|
# # todo: rewrite this code - should not have to have all submissions go through
|
||||||
raise ValueError("Failed to create all subjobs") # look into recalculating job #s and use exising jobs
|
# raise ValueError("Failed to create all subjobs") # look into recalculating job #s and use exising jobs
|
||||||
|
|
||||||
# start subjobs
|
# start subjobs
|
||||||
logger.debug(f"Starting {len(subjob_servers) - 1} attempted subjobs")
|
logger.debug(f"Starting {len(subjob_frame_ranges) - 1} attempted subjobs")
|
||||||
for server_data in subjob_servers:
|
for subjob_server_data in subjob_frame_ranges:
|
||||||
if server_data['hostname'] != local_hostname:
|
if subjob_server_data['hostname'] != local_hostname:
|
||||||
child_key = f"{server_data['submission_results']['id']}@{server_data['hostname']}"
|
child_key = f"{subjob_server_data['submission_results']['id']}@{subjob_server_data['hostname']}"
|
||||||
worker.children[child_key] = server_data['submission_results']
|
parent_worker.children[child_key] = subjob_server_data['submission_results']
|
||||||
worker.name = f"{worker.name}[{worker.start_frame}-{worker.end_frame}]"
|
parent_worker.name = f"{parent_worker.name}[{parent_worker.start_frame}-{parent_worker.end_frame}]"
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
# cancel all the subjobs
|
# cancel all the subjobs
|
||||||
logger.error(f"Failed to split job into subjobs: {e}")
|
logger.exception(f"Failed to split job into subjobs: {e}")
|
||||||
logger.debug(f"Cancelling {len(subjob_servers) - 1} attempted subjobs")
|
logger.debug(f"Cancelling {len(subjob_frame_ranges) - 1} attempted subjobs")
|
||||||
# [RenderServerProxy(hostname).cancel_job(results['id'], confirm=True) for hostname, results in
|
# [RenderServerProxy(hostname).cancel_job(results['id'], confirm=True) for hostname, results in
|
||||||
# submission_results.items()] # todo: fix this
|
# submission_results.items()] # todo: fix this
|
||||||
|
|
||||||
@@ -316,17 +326,20 @@ class DistributedJobManager:
|
|||||||
new_create_subjob('parent_job_123', 'remote-server.example.com', 1, 100)
|
new_create_subjob('parent_job_123', 'remote-server.example.com', 1, 100)
|
||||||
"""
|
"""
|
||||||
logger.info(f"parentID: {parent_job_id}")
|
logger.info(f"parentID: {parent_job_id}")
|
||||||
|
local_hostname = socket.gethostname()
|
||||||
parent_job = RenderQueue.job_with_id(parent_job_id)
|
parent_job = RenderQueue.job_with_id(parent_job_id)
|
||||||
subjob_data = {'renderer': parent_job.engine.name(), 'input_path': parent_job.input_path,
|
subjob_data = {'renderer': parent_job.engine.name(), 'input_path': parent_job.input_path,
|
||||||
'args': parent_job.args, 'output_path': parent_job.output_path,
|
'args': parent_job.args, 'output_path': parent_job.output_path,
|
||||||
'engine_version': parent_job.renderer_version, 'start_frame': start_frame,
|
'engine_version': parent_job.renderer_version, 'start_frame': start_frame,
|
||||||
'end_frame': end_frame, 'parent': f"{parent_job_id}@{local_hostname}"}
|
'end_frame': end_frame, 'parent': f"{parent_job_id}@{local_hostname}"}
|
||||||
|
|
||||||
logger.debug(f"new subjob data: {subjob_data}")
|
logger.info(f"Creating subjob {os.path.basename(parent_job.input_path)} [{start_frame}-{end_frame}] "
|
||||||
|
f"for {remote_hostname}")
|
||||||
logger.debug(f"Creating subjob {start_frame}-{end_frame} for {remote_hostname}")
|
|
||||||
post_results = RenderServerProxy(remote_hostname).post_job_to_server(
|
post_results = RenderServerProxy(remote_hostname).post_job_to_server(
|
||||||
file_path=parent_job.input_path, job_list=[subjob_data])
|
file_path=parent_job.input_path, job_list=[subjob_data])
|
||||||
|
post_results_json = post_results.json()[0]
|
||||||
|
|
||||||
|
parent_job.children[f"{post_results_json['id']}@{remote_hostname}"] = post_results_json
|
||||||
return post_results
|
return post_results
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
|
|||||||
Reference in New Issue
Block a user