mirror of
https://github.com/blw1138/Zordon.git
synced 2025-12-17 08:48:13 +00:00
Compare commits
21 Commits
github-act
...
#24_genera
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
298a2ccc64 | ||
|
|
496e5f78a5 | ||
|
|
c1e5fd1129 | ||
|
|
f6073b2954 | ||
|
|
cc1d6ba452 | ||
|
|
a5e9ac0014 | ||
|
|
1d44716a1f | ||
|
|
ba81be7088 | ||
|
|
8574486443 | ||
|
|
fca2a9f441 | ||
|
|
0730b20c52 | ||
|
|
80ffda8447 | ||
|
|
3b975418de | ||
|
|
b646c1f848 | ||
|
|
0fe50bc175 | ||
|
|
fa0bdf807f | ||
|
|
5b102a5ea4 | ||
|
|
006a97a17a | ||
|
|
3e567060f8 | ||
|
|
7dff2e3393 | ||
|
|
0f4a9b5ddd |
@@ -166,8 +166,8 @@ def create_render_jobs(jobs_list, loaded_project_local_path, job_dir, enable_spl
|
||||
DistributedJobManager.split_into_subjobs(worker, job_data, loaded_project_local_path)
|
||||
else:
|
||||
logger.debug("Not splitting into subjobs")
|
||||
RenderQueue.add_to_render_queue(worker, force_start=job_data.get('force_start', False))
|
||||
|
||||
RenderQueue.add_to_render_queue(worker, force_start=job_data.get('force_start', False))
|
||||
if not worker.parent:
|
||||
from src.api.api_server import make_job_ready
|
||||
make_job_ready(worker.id)
|
||||
|
||||
@@ -168,7 +168,7 @@ def subjob_status_change(job_id):
|
||||
try:
|
||||
subjob_details = request.json
|
||||
logger.info(f"Subjob to job id: {job_id} is now {subjob_details['status']}")
|
||||
DistributedJobManager.handle_subjob_status_change(RenderQueue.job_with_id(job_id), subjob_data=subjob_details)
|
||||
DistributedJobManager.handle_subjob_status_change(job_id, subjob_data=subjob_details)
|
||||
return Response(status=200)
|
||||
except JobNotFoundError:
|
||||
return "Job not found", 404
|
||||
@@ -286,8 +286,10 @@ def add_job_handler():
|
||||
try:
|
||||
if request.is_json:
|
||||
jobs_list = [request.json] if not isinstance(request.json, list) else request.json
|
||||
logger.debug(f"Received add_job JSON: {jobs_list}")
|
||||
elif request.form.get('json', None):
|
||||
jobs_list = json.loads(request.form['json'])
|
||||
logger.debug(f"Received add_job form: {jobs_list}")
|
||||
else:
|
||||
# Cleanup flat form data into nested structure
|
||||
form_dict = {k: v for k, v in dict(request.form).items() if v}
|
||||
@@ -301,6 +303,7 @@ def add_job_handler():
|
||||
args['raw'] = form_dict.get('raw_args', None)
|
||||
form_dict['args'] = args
|
||||
jobs_list = [form_dict]
|
||||
logger.debug(f"Received add_job data: {jobs_list}")
|
||||
except Exception as e:
|
||||
err_msg = f"Error processing job data: {e}"
|
||||
logger.error(err_msg)
|
||||
|
||||
@@ -4,6 +4,7 @@ import os
|
||||
import socket
|
||||
import threading
|
||||
import time
|
||||
from datetime import datetime
|
||||
|
||||
import requests
|
||||
from requests_toolbelt.multipart import MultipartEncoder, MultipartEncoderMonitor
|
||||
@@ -33,6 +34,9 @@ class RenderServerProxy:
|
||||
self.__background_thread = None
|
||||
self.__offline_flags = 0
|
||||
self.update_cadence = 5
|
||||
self.last_contact = datetime.now()
|
||||
# to prevent errors, the last contact datetime is set to when the class is initialized - you must keep an
|
||||
# instance of this class alive to accurately know the delay
|
||||
|
||||
def connect(self):
|
||||
status = self.request_data('status')
|
||||
@@ -55,6 +59,7 @@ class RenderServerProxy:
|
||||
req = self.request(payload, timeout)
|
||||
if req.ok and req.status_code == 200:
|
||||
self.__offline_flags = 0
|
||||
self.last_contact = datetime.now()
|
||||
return req.json()
|
||||
except json.JSONDecodeError as e:
|
||||
logger.debug(f"JSON decode error: {e}")
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import datetime
|
||||
import logging
|
||||
import os
|
||||
import socket
|
||||
@@ -85,38 +86,60 @@ class DistributedJobManager:
|
||||
logger.debug(f"Unable to show UI notification: {e}")
|
||||
|
||||
@classmethod
|
||||
def handle_subjob_status_change(cls, local_job, subjob_data):
|
||||
def handle_subjob_status_change(cls, parent_job_id, subjob_data):
|
||||
"""
|
||||
Responds to a status change from a remote subjob and triggers the creation or modification of subjobs as needed.
|
||||
|
||||
Parameters:
|
||||
local_job (BaseRenderWorker): The local parent job worker.
|
||||
subjob_data (dict): subjob data sent from remote server.
|
||||
local_job_id (str): ID for local parent job worker.
|
||||
subjob_data (dict): Subjob data sent from the remote server.
|
||||
|
||||
Returns:
|
||||
None
|
||||
"""
|
||||
|
||||
subjob_status = string_to_status(subjob_data['status'])
|
||||
parent_job = RenderQueue.job_with_id(parent_job_id)
|
||||
subjob_id = subjob_data['id']
|
||||
subjob_hostname = next((hostname.split('@')[1] for hostname in local_job.children if
|
||||
subjob_hostname = next((hostname.split('@')[1] for hostname in parent_job.children if
|
||||
hostname.split('@')[0] == subjob_id), None)
|
||||
local_job.children[f'{subjob_id}@{subjob_hostname}'] = subjob_data
|
||||
subjob_key = f'{subjob_id}@{subjob_hostname}'
|
||||
|
||||
logname = f"{local_job.id}:{subjob_id}@{subjob_hostname}"
|
||||
# Update the local job's subjob data
|
||||
parent_job.children = dict(parent_job.children) # copy as dict to work around sqlalchemy update issue
|
||||
parent_job.children[subjob_key] = subjob_data
|
||||
|
||||
logname = f"{parent_job_id}:{subjob_key}"
|
||||
subjob_status = string_to_status(subjob_data['status'])
|
||||
logger.debug(f"Subjob status changed: {logname} -> {subjob_status.value}")
|
||||
|
||||
# Download complete or partial render jobs
|
||||
if subjob_status in [RenderStatus.COMPLETED, RenderStatus.CANCELLED, RenderStatus.ERROR] and \
|
||||
subjob_data['file_count']:
|
||||
download_result = cls.download_from_subjob(local_job, subjob_id, subjob_hostname)
|
||||
if not download_result:
|
||||
# todo: handle error
|
||||
# Handle downloading for completed, cancelled, or error'd subjobs
|
||||
if (subjob_status in [RenderStatus.COMPLETED, RenderStatus.CANCELLED, RenderStatus.ERROR]
|
||||
and subjob_data['file_count']):
|
||||
if not cls.download_from_subjob(parent_job, subjob_id, subjob_hostname):
|
||||
logger.error(f"Unable to download subjob files from {logname} with status {subjob_status.value}")
|
||||
|
||||
# Handle cancelled or errored subjobs by determining missing frames and scheduling a new job
|
||||
if subjob_status == RenderStatus.CANCELLED or subjob_status == RenderStatus.ERROR:
|
||||
# todo: determine missing frames and schedule new job
|
||||
pass
|
||||
logger.info("Creating a new subjob")
|
||||
cls.new_create_subjob(parent_job.id, socket.gethostname(),
|
||||
parent_job.children[subjob_key]['start_frame'],
|
||||
parent_job.children[subjob_key]['end_frame'])
|
||||
# todo: determine why we don't wait for the new subjobs we create when replacing an error'd job
|
||||
|
||||
@staticmethod
|
||||
def determine_missing_frames(parent_job_id):
|
||||
"""
|
||||
Determine missing frames in the subjob.
|
||||
|
||||
Parameters:
|
||||
subjob_data (dict): Subjob data.
|
||||
|
||||
Returns:
|
||||
list: List of missing frame numbers.
|
||||
"""
|
||||
# todo: Implement the logic to determine missing frames based on subjob_data
|
||||
missing_frames = []
|
||||
return missing_frames
|
||||
|
||||
|
||||
@staticmethod
|
||||
def download_from_subjob(local_job, subjob_id, subjob_hostname):
|
||||
@@ -138,13 +161,11 @@ class DistributedJobManager:
|
||||
|
||||
# download zip file from server
|
||||
try:
|
||||
local_job.children[child_key]['download_status'] = 'working'
|
||||
logger.info(f"Downloading completed subjob files from {subjob_hostname} to localhost")
|
||||
RenderServerProxy(subjob_hostname).get_job_files(subjob_id, zip_file_path)
|
||||
logger.info(f"File transfer complete for {logname} - Transferred {get_file_size_human(zip_file_path)}")
|
||||
except Exception as e:
|
||||
logger.exception(f"Exception downloading files from remote server: {e}")
|
||||
local_job.children[child_key]['download_status'] = 'failed'
|
||||
return False
|
||||
|
||||
# extract zip
|
||||
@@ -155,129 +176,173 @@ class DistributedJobManager:
|
||||
zip_ref.extractall(extract_path)
|
||||
logger.info(f"Successfully extracted zip to: {extract_path}")
|
||||
os.remove(zip_file_path)
|
||||
local_job.children[child_key]['download_status'] = 'complete'
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.exception(f"Exception extracting zip file: {e}")
|
||||
local_job.children[child_key]['download_status'] = 'failed'
|
||||
return False
|
||||
|
||||
return local_job.children[child_key].get('download_status', None) == 'complete'
|
||||
|
||||
@classmethod
|
||||
def wait_for_subjobs(cls, local_job):
|
||||
logger.debug(f"Waiting for subjobs for job {local_job}")
|
||||
local_job.status = RenderStatus.WAITING_FOR_SUBJOBS
|
||||
statuses_to_download = [RenderStatus.CANCELLED, RenderStatus.ERROR, RenderStatus.COMPLETED]
|
||||
def wait_for_subjobs(cls, parent_job):
|
||||
"""
|
||||
Wait for subjobs to complete and update the parent job's status.
|
||||
|
||||
def subjobs_not_downloaded():
|
||||
return {k: v for k, v in local_job.children.items() if 'download_status' not in v or
|
||||
v['download_status'] == 'working' or v['download_status'] is None}
|
||||
This method continuously checks the status of subjobs until all of them are either completed, canceled, or in error
|
||||
status. It updates the parent job's children with the latest subjob information.
|
||||
|
||||
logger.info(f'Waiting on {len(subjobs_not_downloaded())} subjobs for {local_job.id}')
|
||||
Parameters:
|
||||
parent_job (BaseRenderWorker): The parent job worker.
|
||||
|
||||
while len(subjobs_not_downloaded()):
|
||||
for child_key, subjob_cached_data in subjobs_not_downloaded().items():
|
||||
Returns:
|
||||
None
|
||||
"""
|
||||
logger.debug(f"Waiting for subjobs for job {parent_job}")
|
||||
parent_job.status = RenderStatus.WAITING_FOR_SUBJOBS
|
||||
server_proxys = {}
|
||||
|
||||
subjob_id = child_key.split('@')[0]
|
||||
subjob_hostname = child_key.split('@')[-1]
|
||||
def fetch_subjob_info(child_key):
|
||||
"""
|
||||
Fetch subjob information from the remote server using a RenderServerProxy.
|
||||
|
||||
Parameters:
|
||||
child_key (str): The key representing the subjob.
|
||||
|
||||
Returns:
|
||||
dict: Subjob information.
|
||||
"""
|
||||
subjob_id, subjob_hostname = child_key.split('@')
|
||||
if subjob_hostname not in server_proxys:
|
||||
server_proxys[subjob_hostname] = RenderServerProxy(subjob_hostname)
|
||||
return server_proxys[subjob_hostname].get_job_info(subjob_id)
|
||||
|
||||
while True:
|
||||
incomplete_jobs = {}
|
||||
|
||||
for child_key in list(
|
||||
parent_job.children.keys()): # Create a list to avoid dictionary modification during iteration
|
||||
subjob_data = fetch_subjob_info(child_key)
|
||||
|
||||
# Fetch info from server and handle failing case
|
||||
subjob_data = RenderServerProxy(subjob_hostname).get_job_info(subjob_id)
|
||||
if not subjob_data:
|
||||
logger.warning(f"No response from: {subjob_hostname}")
|
||||
# todo: handle timeout / missing server situations
|
||||
subjob_id, subjob_hostname = child_key.split('@')
|
||||
last_connection = datetime.datetime.now() - server_proxys[subjob_hostname].last_contact
|
||||
logger.warning(f"No response from: {subjob_hostname} - Last connection: {last_connection}")
|
||||
|
||||
last_connection_max_time = 12
|
||||
if last_connection.seconds > last_connection_max_time:
|
||||
logger.error(
|
||||
f"{subjob_hostname} has been offline for over {last_connection_max_time} seconds - Assuming render failed")
|
||||
logger.warning(f"Spinning up a new subjob to replace the offlined server")
|
||||
parent_job.children[child_key]['errors'] = ['Renderer went offline']
|
||||
parent_job.children[child_key]['status'] = RenderStatus.ERROR
|
||||
|
||||
cls.handle_subjob_status_change(parent_job_id=parent_job.id,
|
||||
subjob_data=parent_job.children[child_key])
|
||||
continue
|
||||
|
||||
# Update parent job cache but keep the download status
|
||||
download_status = local_job.children[child_key].get('download_status', None)
|
||||
local_job.children[child_key] = subjob_data
|
||||
local_job.children[child_key]['download_status'] = download_status
|
||||
parent_job.children[child_key] = subjob_data
|
||||
|
||||
status = string_to_status(subjob_data.get('status', ''))
|
||||
status_msg = f"Subjob {child_key} | {status} | " \
|
||||
f"{float(subjob_data.get('percent_complete')) * 100.0}%"
|
||||
status_msg = f"Subjob {child_key} | {status} | {float(subjob_data.get('percent_complete', 0)) * 100.0}%"
|
||||
logger.debug(status_msg)
|
||||
|
||||
# Still working in another thread - keep waiting
|
||||
if download_status == 'working':
|
||||
continue
|
||||
if status not in [RenderStatus.COMPLETED, RenderStatus.CANCELLED, RenderStatus.ERROR]:
|
||||
incomplete_jobs[child_key] = subjob_data
|
||||
|
||||
# Check if job is finished, but has not had files copied yet over yet
|
||||
if download_status is None and subjob_data['file_count'] and status in statuses_to_download:
|
||||
download_result = cls.download_from_subjob(local_job, subjob_id, subjob_hostname)
|
||||
if not download_result:
|
||||
logger.error("Failed to download from subjob")
|
||||
# todo: error handling here
|
||||
|
||||
# Any finished jobs not successfully downloaded at this point are skipped
|
||||
if local_job.children[child_key].get('download_status', None) is None and \
|
||||
status in statuses_to_download:
|
||||
logger.warning(f"Skipping waiting on downloading from subjob: {child_key}")
|
||||
local_job.children[child_key]['download_status'] = 'skipped'
|
||||
|
||||
if subjobs_not_downloaded():
|
||||
logger.debug(f"Waiting on {len(subjobs_not_downloaded())} subjobs on "
|
||||
f"{', '.join(list(subjobs_not_downloaded().keys()))}")
|
||||
time.sleep(5)
|
||||
if incomplete_jobs:
|
||||
logger.debug(f"Waiting on {len(incomplete_jobs)} subjobs on {', '.join(list(incomplete_jobs.keys()))}")
|
||||
else:
|
||||
logger.debug("No more incomplete subjobs")
|
||||
if not cls.completion_hold_enabled:
|
||||
break
|
||||
time.sleep(5)
|
||||
|
||||
@classmethod
|
||||
def split_into_subjobs(cls, worker, job_data, project_path):
|
||||
def split_into_subjobs(cls, parent_worker, job_data, project_path):
|
||||
|
||||
# Check availability
|
||||
available_servers = cls.find_available_servers(worker.renderer)
|
||||
available_servers = cls.find_available_servers(parent_worker.renderer)
|
||||
logger.debug(f"Splitting into subjobs - Available servers: {available_servers}")
|
||||
subjob_servers = cls.distribute_server_work(worker.start_frame, worker.end_frame, available_servers)
|
||||
subjob_frame_ranges = cls.distribute_server_work(parent_worker.start_frame, parent_worker.end_frame, available_servers)
|
||||
local_hostname = socket.gethostname()
|
||||
|
||||
# Prep and submit these sub-jobs
|
||||
logger.info(f"Job {worker.id} split plan: {subjob_servers}")
|
||||
simple_ranges = [f"{x['hostname']}:[{x['frame_range'][0]}-{x['frame_range'][1]}]" for x in subjob_frame_ranges]
|
||||
logger.info(f"Job {parent_worker.id} split plan: {','.join(simple_ranges)}")
|
||||
try:
|
||||
for server_data in subjob_servers:
|
||||
server_hostname = server_data['hostname']
|
||||
|
||||
# setup parent render job first - truncate frames
|
||||
local_range = [x for x in subjob_frame_ranges if x['hostname'] == local_hostname][0]
|
||||
parent_worker.start_frame = max(local_range['frame_range'][0], parent_worker.start_frame)
|
||||
parent_worker.end_frame = min(local_range['frame_range'][-1], parent_worker.end_frame)
|
||||
logger.info(f"Local job now rendering from {parent_worker.start_frame} to {parent_worker.end_frame}")
|
||||
RenderQueue.add_to_render_queue(parent_worker) # add range-adjusted parent to render queue
|
||||
|
||||
# setup remote subjobs
|
||||
submission_results = {}
|
||||
for subjob_server_data in subjob_frame_ranges:
|
||||
server_hostname = subjob_server_data['hostname']
|
||||
if server_hostname != local_hostname:
|
||||
post_results = cls.__create_subjob(job_data, local_hostname, project_path, server_data,
|
||||
server_hostname, worker)
|
||||
post_results = cls.new_create_subjob(parent_worker.id, server_hostname,
|
||||
subjob_server_data['frame_range'][0],
|
||||
subjob_server_data['frame_range'][-1])
|
||||
|
||||
if post_results.ok:
|
||||
server_data['submission_results'] = post_results.json()[0]
|
||||
subjob_server_data['submission_results'] = post_results.json()[0]
|
||||
else:
|
||||
logger.error(f"Failed to create subjob on {server_hostname}")
|
||||
break
|
||||
else:
|
||||
# truncate parent render_job
|
||||
worker.start_frame = max(server_data['frame_range'][0], worker.start_frame)
|
||||
worker.end_frame = min(server_data['frame_range'][-1], worker.end_frame)
|
||||
logger.info(f"Local job now rendering from {worker.start_frame} to {worker.end_frame}")
|
||||
server_data['submission_results'] = worker.json()
|
||||
subjob_server_data['submission_results'] = [True]
|
||||
|
||||
# check that job posts were all successful.
|
||||
if not all(d.get('submission_results') is not None for d in subjob_servers):
|
||||
raise ValueError("Failed to create all subjobs") # look into recalculating job #s and use exising jobs
|
||||
# if not all(d.get('submission_results') is not None for d in subjob_frame_ranges):
|
||||
# # todo: rewrite this code - should not have to have all submissions go through
|
||||
# raise ValueError("Failed to create all subjobs") # look into recalculating job #s and use exising jobs
|
||||
|
||||
# start subjobs
|
||||
logger.debug(f"Starting {len(subjob_servers) - 1} attempted subjobs")
|
||||
for server_data in subjob_servers:
|
||||
if server_data['hostname'] != local_hostname:
|
||||
child_key = f"{server_data['submission_results']['id']}@{server_data['hostname']}"
|
||||
worker.children[child_key] = server_data['submission_results']
|
||||
worker.name = f"{worker.name}[{worker.start_frame}-{worker.end_frame}]"
|
||||
logger.debug(f"Starting {len(subjob_frame_ranges) - 1} attempted subjobs")
|
||||
for subjob_server_data in subjob_frame_ranges:
|
||||
if subjob_server_data['hostname'] != local_hostname:
|
||||
child_key = f"{subjob_server_data['submission_results']['id']}@{subjob_server_data['hostname']}"
|
||||
parent_worker.children[child_key] = subjob_server_data['submission_results']
|
||||
parent_worker.name = f"{parent_worker.name}[{parent_worker.start_frame}-{parent_worker.end_frame}]"
|
||||
|
||||
except Exception as e:
|
||||
# cancel all the subjobs
|
||||
logger.error(f"Failed to split job into subjobs: {e}")
|
||||
logger.debug(f"Cancelling {len(subjob_servers) - 1} attempted subjobs")
|
||||
logger.exception(f"Failed to split job into subjobs: {e}")
|
||||
logger.debug(f"Cancelling {len(subjob_frame_ranges) - 1} attempted subjobs")
|
||||
# [RenderServerProxy(hostname).cancel_job(results['id'], confirm=True) for hostname, results in
|
||||
# submission_results.items()] # todo: fix this
|
||||
|
||||
@staticmethod
|
||||
def __create_subjob(job_data, local_hostname, project_path, server_data, server_hostname, worker):
|
||||
subjob = job_data.copy()
|
||||
subjob['name'] = f"{worker.name}[{server_data['frame_range'][0]}-{server_data['frame_range'][-1]}]"
|
||||
subjob['parent'] = f"{worker.id}@{local_hostname}"
|
||||
subjob['start_frame'] = server_data['frame_range'][0]
|
||||
subjob['end_frame'] = server_data['frame_range'][-1]
|
||||
logger.debug(f"Posting subjob with frames {subjob['start_frame']}-"
|
||||
f"{subjob['end_frame']} to {server_hostname}")
|
||||
post_results = RenderServerProxy(server_hostname).post_job_to_server(
|
||||
file_path=project_path, job_list=[subjob])
|
||||
def new_create_subjob(parent_job_id, remote_hostname, start_frame, end_frame):
|
||||
"""
|
||||
Create and post a subjob to a remote render server.
|
||||
|
||||
Parameters:
|
||||
- parent_job_id (str): ID of the parent job.
|
||||
- remote_hostname (str): Remote server's hostname/address.
|
||||
- start_frame (int): Starting frame of the subjob.
|
||||
- end_frame (int): Ending frame of the subjob.
|
||||
|
||||
Example:
|
||||
new_create_subjob('parent_job_123', 'remote-server.example.com', 1, 100)
|
||||
"""
|
||||
logger.info(f"parentID: {parent_job_id}")
|
||||
local_hostname = socket.gethostname()
|
||||
parent_job = RenderQueue.job_with_id(parent_job_id)
|
||||
subjob_data = {'renderer': parent_job.engine.name(), 'input_path': parent_job.input_path,
|
||||
'args': parent_job.args, 'output_path': parent_job.output_path,
|
||||
'engine_version': parent_job.renderer_version, 'start_frame': start_frame,
|
||||
'end_frame': end_frame, 'parent': f"{parent_job_id}@{local_hostname}"}
|
||||
|
||||
logger.info(f"Creating subjob {os.path.basename(parent_job.input_path)} [{start_frame}-{end_frame}] "
|
||||
f"for {remote_hostname}")
|
||||
post_results = RenderServerProxy(remote_hostname).post_job_to_server(
|
||||
file_path=parent_job.input_path, job_list=[subjob_data])
|
||||
post_results_json = post_results.json()[0]
|
||||
|
||||
parent_job.children[f"{post_results_json['id']}@{remote_hostname}"] = post_results_json
|
||||
return post_results
|
||||
|
||||
@staticmethod
|
||||
|
||||
@@ -28,7 +28,6 @@ class BlenderRenderWorker(BaseRenderWorker):
|
||||
self.start_frame = int(self.scene_info.get('start_frame', 1))
|
||||
self.end_frame = int(self.scene_info.get('end_frame', self.start_frame))
|
||||
self.project_length = (self.end_frame - self.start_frame) + 1
|
||||
self.current_frame = -1
|
||||
|
||||
def generate_worker_subprocess(self):
|
||||
|
||||
|
||||
@@ -35,6 +35,7 @@ class BaseRenderWorker(Base):
|
||||
project_length = Column(Integer)
|
||||
start_frame = Column(Integer)
|
||||
end_frame = Column(Integer, nullable=True)
|
||||
current_frame = Column(Integer)
|
||||
parent = Column(String, nullable=True)
|
||||
children = Column(JSON)
|
||||
name = Column(String)
|
||||
@@ -75,7 +76,7 @@ class BaseRenderWorker(Base):
|
||||
|
||||
# Frame Ranges
|
||||
self.project_length = -1
|
||||
self.current_frame = 0 # should this be a 1 ?
|
||||
self.current_frame = -1 # negative indicates not started
|
||||
self.start_frame = 0 # should this be a 1 ?
|
||||
self.end_frame = None
|
||||
|
||||
@@ -236,7 +237,7 @@ class BaseRenderWorker(Base):
|
||||
|
||||
if self.children:
|
||||
from src.distributed_job_manager import DistributedJobManager
|
||||
DistributedJobManager.wait_for_subjobs(local_job=self)
|
||||
DistributedJobManager.wait_for_subjobs(parent_job=self)
|
||||
|
||||
# Post Render Work
|
||||
logger.debug("Starting post-processing work")
|
||||
@@ -304,7 +305,6 @@ class BaseRenderWorker(Base):
|
||||
'children': self.children,
|
||||
'date_created': self.date_created,
|
||||
'start_time': self.start_time,
|
||||
'end_time': self.end_time,
|
||||
'status': self.status.value,
|
||||
'file_hash': self.file_hash,
|
||||
'percent_complete': self.percent_complete(),
|
||||
@@ -314,6 +314,7 @@ class BaseRenderWorker(Base):
|
||||
'errors': getattr(self, 'errors', None),
|
||||
'start_frame': self.start_frame,
|
||||
'end_frame': self.end_frame,
|
||||
'current_frame': self.current_frame,
|
||||
'total_frames': self.total_frames,
|
||||
'last_output': getattr(self, 'last_output', None),
|
||||
'log_path': self.log_path()
|
||||
|
||||
@@ -19,7 +19,6 @@ class FFMPEGRenderWorker(BaseRenderWorker):
|
||||
"/dev/null"], stderr=subprocess.STDOUT).decode('utf-8')
|
||||
found_frames = re.findall('frame=\s*(\d+)', stream_info)
|
||||
self.project_length = found_frames[-1] if found_frames else '-1'
|
||||
self.current_frame = -1
|
||||
|
||||
def generate_worker_subprocess(self):
|
||||
|
||||
|
||||
@@ -65,10 +65,12 @@ class RenderQueue:
|
||||
|
||||
@classmethod
|
||||
def job_with_id(cls, job_id, none_ok=False):
|
||||
found_job = next((x for x in cls.all_jobs() if x.id == job_id), None)
|
||||
if not found_job and not none_ok:
|
||||
raise JobNotFoundError(job_id)
|
||||
return found_job
|
||||
for job in cls.all_jobs():
|
||||
if job.id == job_id:
|
||||
return job
|
||||
if not none_ok:
|
||||
raise JobNotFoundError(f"Cannot find job with id: {job_id}")
|
||||
return None
|
||||
|
||||
@classmethod
|
||||
def clear_history(cls):
|
||||
|
||||
Reference in New Issue
Block a user