Multi client jobs (#15)

* Add API to expose if RenderQueue is available to take new jobs for a given renderer and priority

* Fix issue with calculating Blender percent complete when not starting at 1

* Rename owner / client properties to parent / children

* Add make_ready method to API

* Create and submit subjobs to other servers

* Update make_ready to update children jobs and some misc fixes

* Misc GUI cleanup
This commit is contained in:
2023-06-15 02:01:50 -05:00
committed by GitHub
parent 78a389080c
commit 69715e8afa
10 changed files with 215 additions and 71 deletions

View File

@@ -4,4 +4,5 @@ server_log_level: info
flask_log_level: error
flask_debug_enable: false
queue_eval_seconds: 1
port_number: 8080
port_number: 8080
enable_split_jobs: true

View File

@@ -85,7 +85,7 @@ class DashboardWindow:
self.job_tree.tag_configure('running', background='lawn green', font=('', 0, 'bold'))
self.job_tree.bind("<<TreeviewSelect>>", self.job_picked)
self.job_tree["columns"] = ("id", "Name", "Renderer", "Priority", "Status", "Time Elapsed", "Frames",
"Date Added", "Owner", "")
"Date Added", "Parent", "")
# Format the columns
self.job_tree.column("id", width=0, stretch=False)
@@ -96,7 +96,7 @@ class DashboardWindow:
self.job_tree.column("Time Elapsed", width=100, stretch=False)
self.job_tree.column("Frames", width=50, stretch=False)
self.job_tree.column("Date Added", width=150, stretch=True)
self.job_tree.column("Owner", width=250, stretch=True)
self.job_tree.column("Parent", width=250, stretch=True)
# Create the column headings
for name in self.job_tree['columns']:
@@ -200,7 +200,7 @@ class DashboardWindow:
def stop_job(self):
job_ids = self.selected_job_ids()
for job_id in job_ids:
self.current_server_proxy.request_data(f'job/{job_id}/cancel?confirm=true')
self.current_server_proxy.cancel_job(job_id, confirm=True)
self.update_jobs(clear_table=True)
def delete_job(self):
@@ -250,7 +250,8 @@ class DashboardWindow:
self.set_image(self.default_image)
# update button status
job = next((d for d in self.current_server_proxy.get_jobs() if d.get('id') == job_id), None)
current_jobs = self.current_server_proxy.get_jobs() or []
job = next((d for d in current_jobs if d.get('id') == job_id), None)
stop_button_state = 'normal' if job and job['status'] == 'running' else 'disabled'
self.stop_button.config(state=stop_button_state)
@@ -304,11 +305,14 @@ class DashboardWindow:
new_proxy.start_background_update()
self.server_proxies[hostname] = new_proxy
for hostname, proxy in self.server_proxies.items():
if hostname not in self.server_tree.get_children():
self.server_tree.insert("", tk.END, iid=hostname, values=(hostname, proxy.status(), ))
else:
update_row(self.server_tree, hostname, new_values=(hostname, proxy.status()))
try:
for hostname, proxy in self.server_proxies.items():
if hostname not in self.server_tree.get_children():
self.server_tree.insert("", tk.END, iid=hostname, values=(hostname, proxy.status(), ))
else:
update_row(self.server_tree, hostname, new_values=(hostname, proxy.status()))
except RuntimeError:
pass
# remove any servers that don't belong
for row in self.server_tree.get_children():
@@ -351,7 +355,7 @@ class DashboardWindow:
get_time_elapsed(start_time, end_time),
job['total_frames'],
job['date_created'],
job['owner'])
job['parent'])
try:
if self.job_tree.exists(job['id']):
update_row(self.job_tree, job['id'], new_values=values, tags=tags)

View File

@@ -67,7 +67,7 @@ class RenderQueue:
@classmethod
def clear_history(cls):
to_remove = [x for x in cls.all_jobs() if x.status in [RenderStatus.CANCELLED,
RenderStatus.COMPLETED, RenderStatus.ERROR]]
RenderStatus.COMPLETED, RenderStatus.ERROR]]
for job_to_remove in to_remove:
cls.delete_job(job_to_remove)
cls.save_state()
@@ -80,18 +80,19 @@ class RenderQueue:
def save_state(cls):
cls.session.commit()
@classmethod
def is_available_for_job(cls, renderer, priority):
    """Return True when the queue can start a job for *renderer* at *priority*.

    The queue is unavailable when a strictly higher-priority job (lower
    priority number) is already running, or when the renderer has reached
    its configured maximum number of concurrent instances (default 1).
    """
    for running in cls.running_jobs():
        if running.priority < priority:
            return False
    active = cls.renderer_instances()
    limit = cls.maximum_renderer_instances.get(renderer, 1)
    return not (renderer in active and active[renderer] >= limit)
@classmethod
def evaluate_queue(cls):
not_started = cls.jobs_with_status(RenderStatus.NOT_STARTED, priority_sorted=True)
if not_started:
for job in not_started:
instances = cls.renderer_instances()
higher_priority_jobs = [x for x in cls.running_jobs() if x.priority < job.priority]
max_renderers = job.renderer in instances.keys() and instances[
job.renderer] >= cls.maximum_renderer_instances.get(job.renderer, 1)
if not max_renderers and not higher_priority_jobs:
cls.start_job(job)
for job in not_started:
if cls.is_available_for_job(job.renderer, job.priority):
cls.start_job(job)
scheduled = cls.jobs_with_status(RenderStatus.SCHEDULED, priority_sorted=True)
for job in scheduled:
@@ -106,6 +107,7 @@ class RenderQueue:
def start_job(cls, job):
logger.info(f'Starting render: {job.name} - Priority {job.priority}')
job.start()
cls.save_state()
@classmethod
def cancel_job(cls, job):

View File

@@ -20,10 +20,11 @@ from flask import Flask, request, render_template, send_file, after_this_request
from werkzeug.utils import secure_filename
from lib.render_queue import RenderQueue, JobNotFoundError
from lib.workers.base_worker import string_to_status, RenderStatus
from lib.workers.worker_factory import RenderWorkerFactory
from lib.server.server_proxy import RenderServerProxy
from lib.server.zeroconf_server import ZeroconfServer
from lib.utilities.server_helper import generate_thumbnail_for_job
from lib.workers.base_worker import string_to_status, RenderStatus
from lib.workers.worker_factory import RenderWorkerFactory
logger = logging.getLogger()
server = Flask(__name__, template_folder='templates', static_folder='static')
@@ -186,6 +187,23 @@ def get_file_list(job_id):
return RenderQueue.job_with_id(job_id).file_list()
@server.get('/api/job/<job_id>/make_ready')
def make_job_ready(job_id):
    """Promote a NOT_READY/NOT_STARTED job (and its remote children) to NOT_STARTED.

    Returns the job's JSON with 200 on success, 500 on any error, and 405 when
    the job is in a state that cannot be made ready.
    """
    try:
        found_job = RenderQueue.job_with_id(job_id)
        if found_job.status in [RenderStatus.NOT_READY, RenderStatus.NOT_STARTED]:
            if found_job.children:
                # children is a comma-separated list of "<id>@<hostname>" entries
                for child_name in found_job.children.split(','):
                    child_id, hostname = child_name.split('@')
                    # Bug fix: the path used a literal '<child_id>' placeholder
                    # (never interpolated) and a spurious '/api/' prefix — other
                    # request_data calls pass paths without it.
                    RenderServerProxy(hostname).request_data(f'job/{child_id}/make_ready')
            found_job.status = RenderStatus.NOT_STARTED
            RenderQueue.save_state()
            return found_job.json(), 200
    except Exception as e:
        # Bug fix: the message was missing the f-prefix, so '{e}' was returned literally.
        return f"Error making job ready: {e}", 500
    return "Not valid command", 405
@server.route('/api/job/<job_id>/download_all')
def download_all(job_id):
zip_filename = None
@@ -239,9 +257,31 @@ def snapshot():
return server_data
@server.get('/api/_detected_clients')
def detected_clients():
# todo: dev/debug only. Should not ship this - probably.
return server.config['ZEROCONF_SERVER'].found_clients()
@server.route('/api/is_available_for_job', methods=['POST', 'GET'])
def available_for_job():
    """
    Check queue to see if it can take a job with a given renderer and priority.

    Query parameters: 'renderer' (str) and 'priority' (int). Returns 400 when
    either is missing/invalid or the renderer is unsupported, otherwise 200
    with {'is_available': bool, 'renderer': ..., 'priority': ...}.
    """
    renderer = request.args.get('renderer')
    # Bug fix: priority arrived as a string and was compared against integer
    # job priorities in RenderQueue.is_available_for_job (TypeError on
    # Python 3). type=int yields None for missing or non-numeric values.
    priority = request.args.get('priority', type=int)
    if not renderer or priority is None:
        return {"error": "Both 'renderer' and 'priority' parameters are required"}, 400
    if renderer not in RenderWorkerFactory.supported_renderers():
        return {"error": f"Unsupported renderer: {renderer}"}, 400
    return {"is_available": RenderQueue.is_available_for_job(renderer, priority),
            'renderer': renderer, 'priority': priority}, 200
@server.post('/api/add_job')
def add_job_handler():
# initial handling of raw data
try:
if request.is_json:
@@ -253,11 +293,11 @@ def add_job_handler():
form_dict = {k: v for k, v in dict(request.form).items() if v}
args = {}
arg_keys = [k for k in form_dict.keys() if '-arg_' in k]
for key in arg_keys:
if form_dict['renderer'] in key or 'AnyRenderer' in key:
cleaned_key = key.split('-arg_')[-1]
args[cleaned_key] = form_dict[key]
form_dict.pop(key)
for server_hostname in arg_keys:
if form_dict['renderer'] in server_hostname or 'AnyRenderer' in server_hostname:
cleaned_key = server_hostname.split('-arg_')[-1]
args[cleaned_key] = form_dict[server_hostname]
form_dict.pop(server_hostname)
args['raw'] = form_dict.get('raw_args', None)
form_dict['args'] = args
jobs_list = [form_dict]
@@ -269,6 +309,7 @@ def add_job_handler():
# start handling project files
try:
# handle uploaded files
logger.debug(f"Incoming new job request: {jobs_list}")
uploaded_project = request.files.get('file', None)
project_url = jobs_list[0].get('url', None)
input_path = jobs_list[0].get('input_path', None)
@@ -342,31 +383,36 @@ def add_job_handler():
# create and add jobs to render queue
results = []
for job in jobs_list:
for job_data in jobs_list:
try:
# prepare output paths
output_dir = os.path.join(job_dir, job.get('name', None) or 'output')
output_dir = os.path.join(job_dir, job_data.get('name', None) or 'output')
os.makedirs(output_dir, exist_ok=True)
# get new output path in output_dir
job['output_path'] = os.path.join(output_dir, os.path.basename(
job.get('name', None) or job.get('output_path', None) or loaded_project_local_path
job_data['output_path'] = os.path.join(output_dir, os.path.basename(
job_data.get('name', None) or job_data.get('output_path', None) or loaded_project_local_path
))
# create & configure jobs
render_job = RenderWorkerFactory.create_worker(renderer=job['renderer'],
worker = RenderWorkerFactory.create_worker(renderer=job_data['renderer'],
input_path=loaded_project_local_path,
output_path=job["output_path"],
args=job.get('args', {}))
render_job.client = server.config['HOSTNAME']
render_job.owner = job.get("owner", render_job.owner)
render_job.name = job.get("name", render_job.name)
render_job.priority = int(job.get('priority', render_job.priority))
render_job.start_frame = job.get("start_frame", render_job.start_frame)
render_job.end_frame = job.get("end_frame", render_job.end_frame)
output_path=job_data["output_path"],
args=job_data.get('args', {}))
worker.status = job_data.get("initial_status", worker.status)
worker.parent = job_data.get("parent", worker.parent)
worker.name = job_data.get("name", worker.name)
worker.priority = int(job_data.get('priority', worker.priority))
worker.start_frame = job_data.get("start_frame", worker.start_frame)
worker.end_frame = job_data.get("end_frame", worker.end_frame)
RenderQueue.add_to_render_queue(render_job, force_start=job.get('force_start', False))
results.append(render_job.json())
# determine if we can / should split the job
if server.config.get('enable_split_jobs', False) and (worker.total_frames > 1) and not worker.parent:
create_subjobs(worker, job_data, loaded_project_local_path)
RenderQueue.add_to_render_queue(worker, force_start=job_data.get('force_start', False))
make_job_ready(worker.id)
results.append(worker.json())
except Exception as e:
err_msg = f"Error creating render job: {e}"
logger.error(err_msg)
@@ -388,6 +434,89 @@ def add_job_handler():
return 'unknown error', 500
def create_subjobs(worker, job_data, project_path):
    """Split *worker* across other available render servers.

    Queries discovered zeroconf servers for availability, divides the frame
    range evenly, posts one sub-job per remote server, then truncates the
    local worker's frame range and records its children as comma-separated
    "<id>@<hostname>" entries. On any failure every successfully posted
    sub-job is cancelled and the local worker keeps its full frame range.
    """
    # Check availability
    local_hostname = server.config['HOSTNAME']
    found_servers = [x for x in server.config['ZEROCONF_SERVER'].found_clients() if local_hostname not in x]
    available_servers = [local_hostname] + [hostname for hostname in found_servers if
                                            RenderServerProxy(hostname).is_available_for_job(renderer=worker.renderer,
                                                                                             priority=worker.priority)]
    if len(available_servers) <= 1:
        logger.debug("No available servers to split job with. Skipping subjob creation.")
        return
    logger.info(f"Found {len(available_servers) - 1} additional available servers | "
                f"Breaking up job into {len(available_servers)} jobs")
    logger.debug(f"Available servers: {available_servers}")

    def divide_frames(start_frame, end_frame, num_servers):
        # Evenly divide the inclusive range [start_frame, end_frame] into up
        # to num_servers contiguous ranges, spreading any remainder one extra
        # frame at a time across the first ranges.
        frame_range = end_frame - start_frame + 1
        frames_per_server = frame_range // num_servers
        leftover_frames = frame_range % num_servers
        ranges = []
        current_start = start_frame
        for _ in range(num_servers):
            current_end = current_start + frames_per_server - 1
            if leftover_frames > 0:
                current_end += 1
                leftover_frames -= 1
            if current_start <= current_end:
                ranges.append((current_start, current_end))
            current_start = current_end + 1
        return ranges

    # Calculate respective frames for each server (local server is index 0)
    server_frame_ranges = {}
    for idx, frame_range in enumerate(divide_frames(worker.start_frame, worker.end_frame, len(available_servers))):
        server_frame_ranges[available_servers[idx]] = frame_range
    logger.info(f"Job {worker.id} split plan: {server_frame_ranges}")

    # Prep and submit these sub-jobs
    submission_results = {}
    try:
        for server_hostname, frame_range in server_frame_ranges.items():
            if server_hostname == local_hostname:
                continue
            subjob = job_data.copy()
            subjob['name'] = f"{worker.name}[{frame_range[0]}-{frame_range[-1]}]"
            subjob['parent'] = f"{worker.id}@{local_hostname}"
            subjob['start_frame'] = frame_range[0]
            subjob['end_frame'] = frame_range[-1]
            logger.debug(f"Posting subjob with frames {subjob['start_frame']}-"
                         f"{subjob['end_frame']} to {server_hostname}")
            post_results = RenderServerProxy(server_hostname).post_job_to_server(
                input_path=project_path, job_list=[subjob])
            # Guard against a None response as well as an HTTP error status
            if post_results is not None and post_results.ok:
                submission_results[server_hostname] = post_results.json()[0]
            else:
                logger.error(f"Failed to create subjob on {server_hostname}")
                break

        # Check that job posts were all successful.
        if len(submission_results) != (len(server_frame_ranges) - 1):
            # TODO: consider recalculating frame ranges and reusing the existing subjobs
            raise ValueError("Failed to create all subjobs")

        # Truncate the local (parent) worker to its own share of the frames
        worker.end_frame = min(server_frame_ranges[local_hostname][-1], worker.end_frame)
        logger.info(f"Local job now rendering from {worker.start_frame} to {worker.end_frame}")

        # Record the subjobs as children and start them
        logger.debug(f"Starting {len(server_frame_ranges) - 1} attempted subjobs")
        worker.children = ",".join(f"{results['id']}@{hostname}"
                                   for hostname, results in submission_results.items())
        worker.name = f"{worker.name}[{worker.start_frame}-{worker.end_frame}]"
    except Exception as e:
        # Roll back: cancel every subjob that was successfully posted
        logger.error(f"Failed to split job into subjobs: {e}")
        logger.debug(f"Cancelling {len(submission_results)} attempted subjobs")
        for hostname, results in submission_results.items():
            RenderServerProxy(hostname).cancel_job(results['id'], confirm=True)
@server.get('/api/job/<job_id>/cancel')
def cancel_job(job_id):
if not request.args.get('confirm', False):
@@ -506,6 +635,7 @@ def start_server(background_thread=False):
server.config['UPLOAD_FOLDER'] = os.path.expanduser(config['upload_folder'])
server.config['THUMBS_FOLDER'] = os.path.join(os.path.expanduser(config['upload_folder']), 'thumbs')
server.config['MAX_CONTENT_PATH'] = config['max_content_path']
server.config['enable_split_jobs'] = config.get('enable_split_jobs', False)
# disable most Flask logging
flask_log = logging.getLogger('werkzeug')
@@ -520,6 +650,7 @@ def start_server(background_thread=False):
logging.info(f"Starting Zordon Render Server - Hostname: '{server.config['HOSTNAME']}:'")
zeroconf_server = ZeroconfServer("_zordon._tcp.local.", server.config['HOSTNAME'], server.config['PORT'])
zeroconf_server.start()
server.config['ZEROCONF_SERVER'] = zeroconf_server
try:
if background_thread:
@@ -531,4 +662,5 @@ def start_server(background_thread=False):
server.run(host='0.0.0.0', port=server.config['PORT'], debug=config.get('flask_debug_enable', False),
use_reloader=False, threaded=True)
finally:
RenderQueue.save_state()
zeroconf_server.stop()

View File

@@ -84,7 +84,7 @@ class RenderServerProxy:
self.__update_in_background = False
def get_jobs(self, timeout=5, ignore_token=False):
if not self.__update_in_background:
if not self.__update_in_background or ignore_token:
self.__update_job_cache(timeout, ignore_token)
return self.__jobs_cache.copy() if self.__jobs_cache else None
@@ -104,6 +104,14 @@ class RenderServerProxy:
all_data = self.request_data('full_status', timeout=timeout)
return all_data
def cancel_job(self, job_id, confirm=False):
    """Request cancellation of job *job_id* on the remote server.

    Bug fix: the previous f-string sent confirm=False as the non-empty
    string 'False', which the server's truthiness check
    (request.args.get('confirm', False)) treated as confirmation. Only
    send the parameter when actually confirming.
    """
    if confirm:
        return self.request_data(f'job/{job_id}/cancel?confirm=true')
    return self.request_data(f'job/{job_id}/cancel')
def is_available_for_job(self, renderer, priority=2):
    """Ask the remote server whether it can take a job for *renderer* at *priority*.

    Uses a short 1s timeout; returns False when the server is unreachable
    or gives no usable answer.
    """
    response = self.request_data(
        f'is_available_for_job?renderer={renderer}&priority={priority}', timeout=1)
    if not response:
        return False
    return response.get('is_available', False)
def post_job_to_server(self, input_path, job_list, callback=None):
# Prepare the form data
encoder = MultipartEncoder({

View File

@@ -22,10 +22,9 @@ class AERenderWorker(BaseRenderWorker):
supported_extensions = ['.aep']
engine = AERender
def __init__(self, input_path, output_path, priority=2, args=None, owner=None,
client=None, name=None):
def __init__(self, input_path, output_path, args=None, parent=None, name=None):
super(AERenderWorker, self).__init__(input_path=input_path, output_path=output_path, args=args,
client=client, priority=priority, owner=owner, name=name)
parent=parent, name=name)
self.comp = args.get('comp', None)
self.render_settings = args.get('render_settings', None)

View File

@@ -26,6 +26,7 @@ class RenderStatus(Enum):
CANCELLED = "cancelled"
ERROR = "error"
SCHEDULED = "scheduled"
NOT_READY = "not_ready"
UNDEFINED = "undefined"
@@ -51,15 +52,15 @@ class BaseRenderWorker(Base):
project_length = Column(Integer)
start_frame = Column(Integer)
end_frame = Column(Integer, nullable=True)
owner = Column(String)
client = Column(String)
parent = Column(String, nullable=True)
children = Column(String, nullable=True)
name = Column(String)
file_hash = Column(String)
_status = Column(String)
engine = None
def __init__(self, input_path, output_path, priority=2, args=None, ignore_extensions=True, owner=None, client=None,
def __init__(self, input_path, output_path, priority=2, args=None, ignore_extensions=True, parent=None,
name=None):
if not ignore_extensions:
@@ -83,9 +84,9 @@ class BaseRenderWorker(Base):
self.renderer = self.engine.name()
self.renderer_version = self.engine.version()
self.priority = priority
self.owner = owner
self.client = client
self.name = name
self.parent = parent
self.children = None
self.name = name or os.path.basename(input_path)
# Frame Ranges
self.project_length = -1
@@ -98,7 +99,7 @@ class BaseRenderWorker(Base):
self.end_time = None
# History
self.status = RenderStatus.NOT_STARTED
self.status = RenderStatus.NOT_READY
self.warnings = []
self.errors = []
self.failed_attempts = 0
@@ -127,8 +128,7 @@ class BaseRenderWorker(Base):
def status(self):
if self._status in [RenderStatus.RUNNING.value, RenderStatus.NOT_STARTED.value]:
if not hasattr(self, 'errors'):
self._status = RenderStatus.CANCELLED
return RenderStatus.CANCELLED
self._status = RenderStatus.CANCELLED.value
return string_to_status(self._status)
def validate(self):
@@ -184,6 +184,7 @@ class BaseRenderWorker(Base):
return
self.status = RenderStatus.RUNNING
self.start_time = datetime.now()
logger.info(f'Starting {self.engine.name()} {self.engine.version()} Render for {self.input_path} | Frame Count: {self.total_frames}')
self.__thread.start()
@@ -202,7 +203,6 @@ class BaseRenderWorker(Base):
logger.debug("Renderer commands generated - {}".format(" ".join(subprocess_cmds)))
self.__process = subprocess.Popen(subprocess_cmds, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
universal_newlines=False)
self.start_time = datetime.now()
with open(self.log_path(), "a") as f:
@@ -298,8 +298,8 @@ class BaseRenderWorker(Base):
'input_path': self.input_path,
'output_path': self.output_path,
'priority': self.priority,
'owner': self.owner,
'client': self.client,
'parent': self.parent,
'children': self.children,
'date_created': self.date_created,
'start_time': self.start_time,
'end_time': self.end_time,
@@ -310,6 +310,8 @@ class BaseRenderWorker(Base):
'renderer': self.renderer,
'renderer_version': self.renderer_version,
'errors': getattr(self, 'errors', None),
'start_frame': self.start_frame,
'end_frame': self.end_frame,
'total_frames': self.total_frames,
'last_output': getattr(self, 'last_output', None),
'log_path': self.log_path()

View File

@@ -13,10 +13,9 @@ class BlenderRenderWorker(BaseRenderWorker):
engine = Blender
def __init__(self, input_path, output_path, priority=2, args=None, owner=None,
client=None, name=None):
def __init__(self, input_path, output_path, args=None, parent=None, name=None):
super(BlenderRenderWorker, self).__init__(input_path=input_path, output_path=output_path, args=args,
client=client, priority=priority, owner=owner, name=name)
parent=parent, name=name)
# Args
self.blender_engine = self.args.get('engine', 'BLENDER_EEVEE').upper()
@@ -112,7 +111,7 @@ class BlenderRenderWorker(BaseRenderWorker):
if self.total_frames <= 1:
return self.__frame_percent_complete
else:
whole_frame_percent = (self.current_frame - 1) / self.total_frames
whole_frame_percent = (self.current_frame - self.start_frame) / self.total_frames
adjusted_frame_percent = self.__frame_percent_complete / self.total_frames
total_percent = whole_frame_percent + adjusted_frame_percent
return max(total_percent, 0)

View File

@@ -8,10 +8,9 @@ class FFMPEGRenderWorker(BaseRenderWorker):
engine = FFMPEG
def __init__(self, input_path, output_path, priority=2, args=None, owner=None,
client=None, name=None):
def __init__(self, input_path, output_path, args=None, parent=None, name=None):
super(FFMPEGRenderWorker, self).__init__(input_path=input_path, output_path=output_path, args=args,
client=client, priority=priority, owner=owner, name=name)
parent=parent, name=name)
stream_info = subprocess.check_output([self.engine.renderer_path(), "-i", # https://stackoverflow.com/a/61604105
input_path, "-map", "0:v:0", "-c", "copy", "-f", "null", "-y",

View File

@@ -10,11 +10,9 @@ class RenderWorkerFactory:
return classes
@staticmethod
def create_worker(renderer, input_path, output_path, priority=2, args=None, owner=None,
client=None, name=None):
def create_worker(renderer, input_path, output_path, args=None, parent=None, name=None):
worker_class = RenderWorkerFactory.class_for_name(renderer)
return worker_class(input_path=input_path, output_path=output_path, args=args, priority=priority, owner=owner,
client=client, name=name)
return worker_class(input_path=input_path, output_path=output_path, args=args, parent=parent, name=name)
@staticmethod
def supported_renderers():