21 Commits

Author  SHA1  Message  Date
Brett Williams  298a2ccc64  Merge remote-tracking branch 'origin/#24_generate_new_subjobs_on_error' into #24_generate_new_subjobs_on_error  2023-10-26 04:22:09 -05:00
Brett Williams  496e5f78a5  Split_into_subjobs WIP  2023-10-26 04:21:52 -05:00
Brett Williams  c1e5fd1129  Updated wait_for_subjobs  2023-10-26 04:21:52 -05:00
Brett Williams  f6073b2954  Add last connected to server_proxy.py  2023-10-26 04:21:52 -05:00
Brett Williams  cc1d6ba452  Misc cleanup  2023-10-26 04:21:52 -05:00
Brett Williams  a5e9ac0014  wait_for_subjobs rewrite  2023-10-26 04:21:52 -05:00
Brett Williams  1d44716a1f  Added new_create_subjob method  2023-10-26 04:21:52 -05:00
Brett Williams  ba81be7088  Move the current_frame attribute to base_worker.py  2023-10-26 04:21:52 -05:00
Brett Williams  8574486443  Missed a line  2023-10-26 04:21:52 -05:00
Brett Williams  fca2a9f441  Fix issue where subjobs were not updating parent job json  2023-10-26 04:21:52 -05:00
Brett Williams  0730b20c52  Added two stubs for methods needed for dynamic subjob generation  2023-10-26 04:21:51 -05:00
Brett Williams  80ffda8447  Split_into_subjobs WIP  2023-10-25 19:00:41 -05:00
Brett Williams  3b975418de  Updated wait_for_subjobs  2023-10-25 18:59:01 -05:00
Brett Williams  b646c1f848  Add last connected to server_proxy.py  2023-10-25 18:54:36 -05:00
Brett Williams  0fe50bc175  Misc cleanup  2023-10-25 18:53:41 -05:00
Brett Williams  fa0bdf807f  wait_for_subjobs rewrite  2023-10-25 15:54:14 -05:00
Brett Williams  5b102a5ea4  Added new_create_subjob method  2023-10-25 15:07:17 -05:00
Brett Williams  006a97a17a  Move the current_frame attribute to base_worker.py  2023-10-25 13:41:14 -05:00
Brett Williams  3e567060f8  Missed a line  2023-10-25 11:47:12 -05:00
Brett Williams  7dff2e3393  Fix issue where subjobs were not updating parent job json  2023-10-25 11:46:09 -05:00
Brett Williams  0f4a9b5ddd  Added two stubs for methods needed for dynamic subjob generation  2023-10-25 07:28:49 -05:00
8 changed files with 182 additions and 108 deletions

View File

@@ -166,8 +166,8 @@ def create_render_jobs(jobs_list, loaded_project_local_path, job_dir, enable_spl
DistributedJobManager.split_into_subjobs(worker, job_data, loaded_project_local_path)
else:
logger.debug("Not splitting into subjobs")
RenderQueue.add_to_render_queue(worker, force_start=job_data.get('force_start', False))
RenderQueue.add_to_render_queue(worker, force_start=job_data.get('force_start', False))
if not worker.parent:
from src.api.api_server import make_job_ready
make_job_ready(worker.id)

View File

@@ -168,7 +168,7 @@ def subjob_status_change(job_id):
try:
subjob_details = request.json
logger.info(f"Subjob to job id: {job_id} is now {subjob_details['status']}")
DistributedJobManager.handle_subjob_status_change(RenderQueue.job_with_id(job_id), subjob_data=subjob_details)
DistributedJobManager.handle_subjob_status_change(job_id, subjob_data=subjob_details)
return Response(status=200)
except JobNotFoundError:
return "Job not found", 404
@@ -286,8 +286,10 @@ def add_job_handler():
try:
if request.is_json:
jobs_list = [request.json] if not isinstance(request.json, list) else request.json
logger.debug(f"Received add_job JSON: {jobs_list}")
elif request.form.get('json', None):
jobs_list = json.loads(request.form['json'])
logger.debug(f"Received add_job form: {jobs_list}")
else:
# Cleanup flat form data into nested structure
form_dict = {k: v for k, v in dict(request.form).items() if v}
@@ -301,6 +303,7 @@ def add_job_handler():
args['raw'] = form_dict.get('raw_args', None)
form_dict['args'] = args
jobs_list = [form_dict]
logger.debug(f"Received add_job data: {jobs_list}")
except Exception as e:
err_msg = f"Error processing job data: {e}"
logger.error(err_msg)

View File

@@ -4,6 +4,7 @@ import os
import socket
import threading
import time
from datetime import datetime
import requests
from requests_toolbelt.multipart import MultipartEncoder, MultipartEncoderMonitor
@@ -33,6 +34,9 @@ class RenderServerProxy:
self.__background_thread = None
self.__offline_flags = 0
self.update_cadence = 5
self.last_contact = datetime.now()
# last_contact defaults to instance-creation time so it is never None - keep an
# instance of this class alive to accurately measure the time since the last response
def connect(self):
status = self.request_data('status')
@@ -55,6 +59,7 @@ class RenderServerProxy:
req = self.request(payload, timeout)
if req.ok and req.status_code == 200:
self.__offline_flags = 0
self.last_contact = datetime.now()
return req.json()
except json.JSONDecodeError as e:
logger.debug(f"JSON decode error: {e}")

View File

@@ -1,3 +1,4 @@
import datetime
import logging
import os
import socket
@@ -85,38 +86,60 @@ class DistributedJobManager:
logger.debug(f"Unable to show UI notification: {e}")
@classmethod
def handle_subjob_status_change(cls, local_job, subjob_data):
def handle_subjob_status_change(cls, parent_job_id, subjob_data):
"""
Responds to a status change from a remote subjob and triggers the creation or modification of subjobs as needed.
Parameters:
local_job (BaseRenderWorker): The local parent job worker.
subjob_data (dict): subjob data sent from remote server.
parent_job_id (str): ID of the local parent job worker.
subjob_data (dict): Subjob data sent from the remote server.
Returns:
None
"""
subjob_status = string_to_status(subjob_data['status'])
parent_job = RenderQueue.job_with_id(parent_job_id)
subjob_id = subjob_data['id']
subjob_hostname = next((hostname.split('@')[1] for hostname in local_job.children if
subjob_hostname = next((hostname.split('@')[1] for hostname in parent_job.children if
hostname.split('@')[0] == subjob_id), None)
local_job.children[f'{subjob_id}@{subjob_hostname}'] = subjob_data
subjob_key = f'{subjob_id}@{subjob_hostname}'
logname = f"{local_job.id}:{subjob_id}@{subjob_hostname}"
# Update the parent job's subjob data
parent_job.children = dict(parent_job.children)  # copy as dict to work around a SQLAlchemy update issue
parent_job.children[subjob_key] = subjob_data
logname = f"{parent_job_id}:{subjob_key}"
subjob_status = string_to_status(subjob_data['status'])
logger.debug(f"Subjob status changed: {logname} -> {subjob_status.value}")
# Download complete or partial render jobs
if subjob_status in [RenderStatus.COMPLETED, RenderStatus.CANCELLED, RenderStatus.ERROR] and \
subjob_data['file_count']:
download_result = cls.download_from_subjob(local_job, subjob_id, subjob_hostname)
if not download_result:
# todo: handle error
# Handle downloading for completed, cancelled, or error'd subjobs
if (subjob_status in [RenderStatus.COMPLETED, RenderStatus.CANCELLED, RenderStatus.ERROR]
and subjob_data['file_count']):
if not cls.download_from_subjob(parent_job, subjob_id, subjob_hostname):
logger.error(f"Unable to download subjob files from {logname} with status {subjob_status.value}")
# Handle cancelled or errored subjobs by determining missing frames and scheduling a new job
if subjob_status == RenderStatus.CANCELLED or subjob_status == RenderStatus.ERROR:
# todo: determine missing frames and schedule new job
pass
logger.info("Creating a new subjob")
cls.new_create_subjob(parent_job.id, socket.gethostname(),
parent_job.children[subjob_key]['start_frame'],
parent_job.children[subjob_key]['end_frame'])
# todo: determine why we don't wait for the new subjobs we create when replacing an error'd job
@staticmethod
def determine_missing_frames(parent_job_id):
"""
Determine which frames of the parent job are still missing across its subjobs.
Parameters:
parent_job_id (str): ID of the local parent job.
Returns:
list: List of missing frame numbers.
"""
# todo: Implement the logic to determine missing frames from the parent job's subjob data
missing_frames = []
return missing_frames
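A possible shape for the stub above, assuming each entry in parent_job.children carries the 'start_frame', 'end_frame', and 'status' keys used elsewhere in this diff:

parent_job = RenderQueue.job_with_id(parent_job_id)
expected = set(range(parent_job.start_frame, parent_job.end_frame + 1))
rendered = set()
for child in parent_job.children.values():
    # Only frames from subjobs that actually completed count as rendered
    if string_to_status(child.get('status', '')) == RenderStatus.COMPLETED:
        rendered.update(range(child['start_frame'], child['end_frame'] + 1))
missing_frames = sorted(expected - rendered)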
@staticmethod
def download_from_subjob(local_job, subjob_id, subjob_hostname):
@@ -138,13 +161,11 @@ class DistributedJobManager:
# download zip file from server
try:
local_job.children[child_key]['download_status'] = 'working'
logger.info(f"Downloading completed subjob files from {subjob_hostname} to localhost")
RenderServerProxy(subjob_hostname).get_job_files(subjob_id, zip_file_path)
logger.info(f"File transfer complete for {logname} - Transferred {get_file_size_human(zip_file_path)}")
except Exception as e:
logger.exception(f"Exception downloading files from remote server: {e}")
local_job.children[child_key]['download_status'] = 'failed'
return False
# extract zip
@@ -155,129 +176,173 @@ class DistributedJobManager:
zip_ref.extractall(extract_path)
logger.info(f"Successfully extracted zip to: {extract_path}")
os.remove(zip_file_path)
local_job.children[child_key]['download_status'] = 'complete'
return True
except Exception as e:
logger.exception(f"Exception extracting zip file: {e}")
local_job.children[child_key]['download_status'] = 'failed'
return False
return local_job.children[child_key].get('download_status', None) == 'complete'
@classmethod
def wait_for_subjobs(cls, local_job):
logger.debug(f"Waiting for subjobs for job {local_job}")
local_job.status = RenderStatus.WAITING_FOR_SUBJOBS
statuses_to_download = [RenderStatus.CANCELLED, RenderStatus.ERROR, RenderStatus.COMPLETED]
def wait_for_subjobs(cls, parent_job):
"""
Wait for subjobs to complete and update the parent job's status.
def subjobs_not_downloaded():
return {k: v for k, v in local_job.children.items() if 'download_status' not in v or
v['download_status'] == 'working' or v['download_status'] is None}
This method polls the subjobs' status until all of them are completed, cancelled, or errored, updating the parent
job's children with the latest subjob information along the way.
logger.info(f'Waiting on {len(subjobs_not_downloaded())} subjobs for {local_job.id}')
Parameters:
parent_job (BaseRenderWorker): The parent job worker.
while len(subjobs_not_downloaded()):
for child_key, subjob_cached_data in subjobs_not_downloaded().items():
Returns:
None
"""
logger.debug(f"Waiting for subjobs for job {parent_job}")
parent_job.status = RenderStatus.WAITING_FOR_SUBJOBS
server_proxys = {}
subjob_id = child_key.split('@')[0]
subjob_hostname = child_key.split('@')[-1]
def fetch_subjob_info(child_key):
"""
Fetch subjob information from the remote server using a RenderServerProxy.
Parameters:
child_key (str): The key representing the subjob.
Returns:
dict: Subjob information.
"""
subjob_id, subjob_hostname = child_key.split('@')
if subjob_hostname not in server_proxys:
server_proxys[subjob_hostname] = RenderServerProxy(subjob_hostname)
return server_proxys[subjob_hostname].get_job_info(subjob_id)
while True:
incomplete_jobs = {}
for child_key in list(
parent_job.children.keys()): # Create a list to avoid dictionary modification during iteration
subjob_data = fetch_subjob_info(child_key)
# Fetch info from the server and handle the failure case
subjob_data = RenderServerProxy(subjob_hostname).get_job_info(subjob_id)
if not subjob_data:
logger.warning(f"No response from: {subjob_hostname}")
# todo: handle timeout / missing server situations
subjob_id, subjob_hostname = child_key.split('@')
last_connection = datetime.datetime.now() - server_proxys[subjob_hostname].last_contact
logger.warning(f"No response from: {subjob_hostname} - Last connection: {last_connection}")
last_connection_max_time = 12
if last_connection.seconds > last_connection_max_time:
logger.error(
f"{subjob_hostname} has been offline for over {last_connection_max_time} seconds - Assuming render failed")
logger.warning(f"Spinning up a new subjob to replace the offlined server")
parent_job.children[child_key]['errors'] = ['Renderer went offline']
parent_job.children[child_key]['status'] = RenderStatus.ERROR
cls.handle_subjob_status_change(parent_job_id=parent_job.id,
subjob_data=parent_job.children[child_key])
continue
# Update parent job cache but keep the download status
download_status = local_job.children[child_key].get('download_status', None)
local_job.children[child_key] = subjob_data
local_job.children[child_key]['download_status'] = download_status
parent_job.children[child_key] = subjob_data
status = string_to_status(subjob_data.get('status', ''))
status_msg = f"Subjob {child_key} | {status} | " \
f"{float(subjob_data.get('percent_complete')) * 100.0}%"
status_msg = f"Subjob {child_key} | {status} | {float(subjob_data.get('percent_complete', 0)) * 100.0}%"
logger.debug(status_msg)
# Still working in another thread - keep waiting
if download_status == 'working':
continue
if status not in [RenderStatus.COMPLETED, RenderStatus.CANCELLED, RenderStatus.ERROR]:
incomplete_jobs[child_key] = subjob_data
# Check if job is finished, but has not had files copied over yet
if download_status is None and subjob_data['file_count'] and status in statuses_to_download:
download_result = cls.download_from_subjob(local_job, subjob_id, subjob_hostname)
if not download_result:
logger.error("Failed to download from subjob")
# todo: error handling here
# Any finished jobs not successfully downloaded at this point are skipped
if local_job.children[child_key].get('download_status', None) is None and \
status in statuses_to_download:
logger.warning(f"Skipping waiting on downloading from subjob: {child_key}")
local_job.children[child_key]['download_status'] = 'skipped'
if subjobs_not_downloaded():
logger.debug(f"Waiting on {len(subjobs_not_downloaded())} subjobs on "
f"{', '.join(list(subjobs_not_downloaded().keys()))}")
time.sleep(5)
if incomplete_jobs:
logger.debug(f"Waiting on {len(incomplete_jobs)} subjobs on {', '.join(list(incomplete_jobs.keys()))}")
else:
logger.debug("No more incomplete subjobs")
if not cls.completion_hold_enabled:
break
time.sleep(5)
@classmethod
def split_into_subjobs(cls, worker, job_data, project_path):
def split_into_subjobs(cls, parent_worker, job_data, project_path):
# Check availability
available_servers = cls.find_available_servers(worker.renderer)
available_servers = cls.find_available_servers(parent_worker.renderer)
logger.debug(f"Splitting into subjobs - Available servers: {available_servers}")
subjob_servers = cls.distribute_server_work(worker.start_frame, worker.end_frame, available_servers)
subjob_frame_ranges = cls.distribute_server_work(parent_worker.start_frame, parent_worker.end_frame, available_servers)
local_hostname = socket.gethostname()
# Prep and submit these sub-jobs
logger.info(f"Job {worker.id} split plan: {subjob_servers}")
simple_ranges = [f"{x['hostname']}:[{x['frame_range'][0]}-{x['frame_range'][1]}]" for x in subjob_frame_ranges]
logger.info(f"Job {parent_worker.id} split plan: {','.join(simple_ranges)}")
try:
for server_data in subjob_servers:
server_hostname = server_data['hostname']
# setup parent render job first - truncate frames
local_range = [x for x in subjob_frame_ranges if x['hostname'] == local_hostname][0]
parent_worker.start_frame = max(local_range['frame_range'][0], parent_worker.start_frame)
parent_worker.end_frame = min(local_range['frame_range'][-1], parent_worker.end_frame)
logger.info(f"Local job now rendering from {parent_worker.start_frame} to {parent_worker.end_frame}")
RenderQueue.add_to_render_queue(parent_worker) # add range-adjusted parent to render queue
# setup remote subjobs
submission_results = {}
for subjob_server_data in subjob_frame_ranges:
server_hostname = subjob_server_data['hostname']
if server_hostname != local_hostname:
post_results = cls.__create_subjob(job_data, local_hostname, project_path, server_data,
server_hostname, worker)
post_results = cls.new_create_subjob(parent_worker.id, server_hostname,
subjob_server_data['frame_range'][0],
subjob_server_data['frame_range'][-1])
if post_results.ok:
server_data['submission_results'] = post_results.json()[0]
subjob_server_data['submission_results'] = post_results.json()[0]
else:
logger.error(f"Failed to create subjob on {server_hostname}")
break
else:
# truncate parent render_job
worker.start_frame = max(server_data['frame_range'][0], worker.start_frame)
worker.end_frame = min(server_data['frame_range'][-1], worker.end_frame)
logger.info(f"Local job now rendering from {worker.start_frame} to {worker.end_frame}")
server_data['submission_results'] = worker.json()
subjob_server_data['submission_results'] = [True]
# check that job posts were all successful.
if not all(d.get('submission_results') is not None for d in subjob_servers):
raise ValueError("Failed to create all subjobs") # look into recalculating job #s and using existing jobs
# if not all(d.get('submission_results') is not None for d in subjob_frame_ranges):
# # todo: rewrite this check - it should not require every submission to succeed
# raise ValueError("Failed to create all subjobs") # look into recalculating job #s and using existing jobs
# start subjobs
logger.debug(f"Starting {len(subjob_servers) - 1} attempted subjobs")
for server_data in subjob_servers:
if server_data['hostname'] != local_hostname:
child_key = f"{server_data['submission_results']['id']}@{server_data['hostname']}"
worker.children[child_key] = server_data['submission_results']
worker.name = f"{worker.name}[{worker.start_frame}-{worker.end_frame}]"
logger.debug(f"Starting {len(subjob_frame_ranges) - 1} attempted subjobs")
for subjob_server_data in subjob_frame_ranges:
if subjob_server_data['hostname'] != local_hostname:
child_key = f"{subjob_server_data['submission_results']['id']}@{subjob_server_data['hostname']}"
parent_worker.children[child_key] = subjob_server_data['submission_results']
parent_worker.name = f"{parent_worker.name}[{parent_worker.start_frame}-{parent_worker.end_frame}]"
except Exception as e:
# cancel all the subjobs
logger.error(f"Failed to split job into subjobs: {e}")
logger.debug(f"Cancelling {len(subjob_servers) - 1} attempted subjobs")
logger.exception(f"Failed to split job into subjobs: {e}")
logger.debug(f"Cancelling {len(subjob_frame_ranges) - 1} attempted subjobs")
# [RenderServerProxy(hostname).cancel_job(results['id'], confirm=True) for hostname, results in
# submission_results.items()] # todo: fix this
@staticmethod
def __create_subjob(job_data, local_hostname, project_path, server_data, server_hostname, worker):
subjob = job_data.copy()
subjob['name'] = f"{worker.name}[{server_data['frame_range'][0]}-{server_data['frame_range'][-1]}]"
subjob['parent'] = f"{worker.id}@{local_hostname}"
subjob['start_frame'] = server_data['frame_range'][0]
subjob['end_frame'] = server_data['frame_range'][-1]
logger.debug(f"Posting subjob with frames {subjob['start_frame']}-"
f"{subjob['end_frame']} to {server_hostname}")
post_results = RenderServerProxy(server_hostname).post_job_to_server(
file_path=project_path, job_list=[subjob])
def new_create_subjob(parent_job_id, remote_hostname, start_frame, end_frame):
"""
Create and post a subjob to a remote render server.
Parameters:
- parent_job_id (str): ID of the parent job.
- remote_hostname (str): Remote server's hostname/address.
- start_frame (int): Starting frame of the subjob.
- end_frame (int): Ending frame of the subjob.
Example:
new_create_subjob('parent_job_123', 'remote-server.example.com', 1, 100)
"""
logger.info(f"parentID: {parent_job_id}")
local_hostname = socket.gethostname()
parent_job = RenderQueue.job_with_id(parent_job_id)
subjob_data = {'renderer': parent_job.engine.name(), 'input_path': parent_job.input_path,
'args': parent_job.args, 'output_path': parent_job.output_path,
'engine_version': parent_job.renderer_version, 'start_frame': start_frame,
'end_frame': end_frame, 'parent': f"{parent_job_id}@{local_hostname}"}
logger.info(f"Creating subjob {os.path.basename(parent_job.input_path)} [{start_frame}-{end_frame}] "
f"for {remote_hostname}")
post_results = RenderServerProxy(remote_hostname).post_job_to_server(
file_path=parent_job.input_path, job_list=[subjob_data])
post_results_json = post_results.json()[0]
parent_job.children[f"{post_results_json['id']}@{remote_hostname}"] = post_results_json
return post_results
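distribute_server_work is called by split_into_subjobs above but is not shown in this diff. A minimal sketch of the return shape those call sites rely on, assuming an even contiguous split and that find_available_servers yields hostnames:

def distribute_server_work(start_frame, end_frame, available_servers):
    # Sketch only: split [start_frame, end_frame] into contiguous chunks, one per
    # server, in the list-of-dicts shape the split_into_subjobs call sites expect.
    total_frames = end_frame - start_frame + 1
    base, remainder = divmod(total_frames, len(available_servers))
    ranges, cursor = [], start_frame
    for i, hostname in enumerate(available_servers):
        size = base + (1 if i < remainder else 0)
        ranges.append({'hostname': hostname, 'frame_range': [cursor, cursor + size - 1]})
        cursor += size
    return ranges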
@staticmethod

View File

@@ -28,7 +28,6 @@ class BlenderRenderWorker(BaseRenderWorker):
self.start_frame = int(self.scene_info.get('start_frame', 1))
self.end_frame = int(self.scene_info.get('end_frame', self.start_frame))
self.project_length = (self.end_frame - self.start_frame) + 1
self.current_frame = -1
def generate_worker_subprocess(self):

View File

@@ -35,6 +35,7 @@ class BaseRenderWorker(Base):
project_length = Column(Integer)
start_frame = Column(Integer)
end_frame = Column(Integer, nullable=True)
current_frame = Column(Integer)
parent = Column(String, nullable=True)
children = Column(JSON)
name = Column(String)
@@ -75,7 +76,7 @@ class BaseRenderWorker(Base):
# Frame Ranges
self.project_length = -1
self.current_frame = 0 # should this be a 1 ?
self.current_frame = -1 # negative indicates not started
self.start_frame = 0 # should this be a 1 ?
self.end_frame = None
@@ -236,7 +237,7 @@ class BaseRenderWorker(Base):
if self.children:
from src.distributed_job_manager import DistributedJobManager
DistributedJobManager.wait_for_subjobs(local_job=self)
DistributedJobManager.wait_for_subjobs(parent_job=self)
# Post Render Work
logger.debug("Starting post-processing work")
@@ -304,7 +305,6 @@ class BaseRenderWorker(Base):
'children': self.children,
'date_created': self.date_created,
'start_time': self.start_time,
'end_time': self.end_time,
'status': self.status.value,
'file_hash': self.file_hash,
'percent_complete': self.percent_complete(),
@@ -314,6 +314,7 @@ class BaseRenderWorker(Base):
'errors': getattr(self, 'errors', None),
'start_frame': self.start_frame,
'end_frame': self.end_frame,
'current_frame': self.current_frame,
'total_frames': self.total_frames,
'last_output': getattr(self, 'last_output', None),
'log_path': self.log_path()
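With current_frame now persisted on the base class, each renderer subclass is responsible for keeping it fresh. A hypothetical helper showing how a worker might parse it from renderer output, reusing the frame= pattern the FFMPEG worker greps for below:

import re

def _update_progress(self, output_line):
    # Hypothetical helper (not in this diff): pull the latest frame number from a log line
    match = re.search(r'frame=\s*(\d+)', output_line)
    if match:
        self.current_frame = int(match.group(1))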

View File

@@ -19,7 +19,6 @@ class FFMPEGRenderWorker(BaseRenderWorker):
"/dev/null"], stderr=subprocess.STDOUT).decode('utf-8')
found_frames = re.findall('frame=\s*(\d+)', stream_info)
self.project_length = found_frames[-1] if found_frames else '-1'
self.current_frame = -1
def generate_worker_subprocess(self):

View File

@@ -65,10 +65,12 @@ class RenderQueue:
@classmethod
def job_with_id(cls, job_id, none_ok=False):
found_job = next((x for x in cls.all_jobs() if x.id == job_id), None)
if not found_job and not none_ok:
raise JobNotFoundError(job_id)
return found_job
for job in cls.all_jobs():
if job.id == job_id:
return job
if not none_ok:
raise JobNotFoundError(f"Cannot find job with id: {job_id}")
return None
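The rewritten lookup keeps the same contract: raise JobNotFoundError unless none_ok is set. An illustrative call site (the job id is made up):

job = RenderQueue.job_with_id('abc123', none_ok=True)  # hypothetical id
if job is None:
    logger.warning("Job abc123 is no longer in the queue")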
@classmethod
def clear_history(cls):