diff --git a/config/config.yaml b/config/config.yaml index 7839071..31222f7 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -4,4 +4,5 @@ server_log_level: info flask_log_level: error flask_debug_enable: false queue_eval_seconds: 1 -port_number: 8080 \ No newline at end of file +port_number: 8080 +enable_split_jobs: true \ No newline at end of file diff --git a/lib/client/dashboard_window.py b/lib/client/dashboard_window.py index 23dafd4..f642595 100644 --- a/lib/client/dashboard_window.py +++ b/lib/client/dashboard_window.py @@ -85,7 +85,7 @@ class DashboardWindow: self.job_tree.tag_configure('running', background='lawn green', font=('', 0, 'bold')) self.job_tree.bind("<>", self.job_picked) self.job_tree["columns"] = ("id", "Name", "Renderer", "Priority", "Status", "Time Elapsed", "Frames", - "Date Added", "Owner", "") + "Date Added", "Parent", "") # Format the columns self.job_tree.column("id", width=0, stretch=False) @@ -96,7 +96,7 @@ class DashboardWindow: self.job_tree.column("Time Elapsed", width=100, stretch=False) self.job_tree.column("Frames", width=50, stretch=False) self.job_tree.column("Date Added", width=150, stretch=True) - self.job_tree.column("Owner", width=250, stretch=True) + self.job_tree.column("Parent", width=250, stretch=True) # Create the column headings for name in self.job_tree['columns']: @@ -200,7 +200,7 @@ class DashboardWindow: def stop_job(self): job_ids = self.selected_job_ids() for job_id in job_ids: - self.current_server_proxy.request_data(f'job/{job_id}/cancel?confirm=true') + self.current_server_proxy.cancel_job(job_id, confirm=True) self.update_jobs(clear_table=True) def delete_job(self): @@ -250,7 +250,8 @@ class DashboardWindow: self.set_image(self.default_image) # update button status - job = next((d for d in self.current_server_proxy.get_jobs() if d.get('id') == job_id), None) + current_jobs = self.current_server_proxy.get_jobs() or [] + job = next((d for d in current_jobs if d.get('id') == job_id), None) stop_button_state = 'normal' if job and job['status'] == 'running' else 'disabled' self.stop_button.config(state=stop_button_state) @@ -304,11 +305,14 @@ class DashboardWindow: new_proxy.start_background_update() self.server_proxies[hostname] = new_proxy - for hostname, proxy in self.server_proxies.items(): - if hostname not in self.server_tree.get_children(): - self.server_tree.insert("", tk.END, iid=hostname, values=(hostname, proxy.status(), )) - else: - update_row(self.server_tree, hostname, new_values=(hostname, proxy.status())) + try: + for hostname, proxy in self.server_proxies.items(): + if hostname not in self.server_tree.get_children(): + self.server_tree.insert("", tk.END, iid=hostname, values=(hostname, proxy.status(), )) + else: + update_row(self.server_tree, hostname, new_values=(hostname, proxy.status())) + except RuntimeError: + pass # remove any servers that don't belong for row in self.server_tree.get_children(): @@ -351,7 +355,7 @@ class DashboardWindow: get_time_elapsed(start_time, end_time), job['total_frames'], job['date_created'], - job['owner']) + job['parent']) try: if self.job_tree.exists(job['id']): update_row(self.job_tree, job['id'], new_values=values, tags=tags) diff --git a/lib/render_queue.py b/lib/render_queue.py index 215ea68..e72b2b0 100755 --- a/lib/render_queue.py +++ b/lib/render_queue.py @@ -67,7 +67,7 @@ class RenderQueue: @classmethod def clear_history(cls): to_remove = [x for x in cls.all_jobs() if x.status in [RenderStatus.CANCELLED, - RenderStatus.COMPLETED, RenderStatus.ERROR]] + RenderStatus.COMPLETED, RenderStatus.ERROR]] for job_to_remove in to_remove: cls.delete_job(job_to_remove) cls.save_state() @@ -80,18 +80,19 @@ class RenderQueue: def save_state(cls): cls.session.commit() + @classmethod + def is_available_for_job(cls, renderer, priority): + instances = cls.renderer_instances() + higher_priority_jobs = [x for x in cls.running_jobs() if x.priority < priority] + max_renderers = renderer in instances.keys() and instances[renderer] >= cls.maximum_renderer_instances.get(renderer, 1) + return not max_renderers and not higher_priority_jobs + @classmethod def evaluate_queue(cls): not_started = cls.jobs_with_status(RenderStatus.NOT_STARTED, priority_sorted=True) - if not_started: - for job in not_started: - instances = cls.renderer_instances() - higher_priority_jobs = [x for x in cls.running_jobs() if x.priority < job.priority] - max_renderers = job.renderer in instances.keys() and instances[ - job.renderer] >= cls.maximum_renderer_instances.get(job.renderer, 1) - - if not max_renderers and not higher_priority_jobs: - cls.start_job(job) + for job in not_started: + if cls.is_available_for_job(job.renderer, job.priority): + cls.start_job(job) scheduled = cls.jobs_with_status(RenderStatus.SCHEDULED, priority_sorted=True) for job in scheduled: @@ -106,6 +107,7 @@ class RenderQueue: def start_job(cls, job): logger.info(f'Starting render: {job.name} - Priority {job.priority}') job.start() + cls.save_state() @classmethod def cancel_job(cls, job): diff --git a/lib/server/api_server.py b/lib/server/api_server.py index 4f408c3..db30efc 100755 --- a/lib/server/api_server.py +++ b/lib/server/api_server.py @@ -20,10 +20,11 @@ from flask import Flask, request, render_template, send_file, after_this_request from werkzeug.utils import secure_filename from lib.render_queue import RenderQueue, JobNotFoundError -from lib.workers.base_worker import string_to_status, RenderStatus -from lib.workers.worker_factory import RenderWorkerFactory +from lib.server.server_proxy import RenderServerProxy from lib.server.zeroconf_server import ZeroconfServer from lib.utilities.server_helper import generate_thumbnail_for_job +from lib.workers.base_worker import string_to_status, RenderStatus +from lib.workers.worker_factory import RenderWorkerFactory logger = logging.getLogger() server = Flask(__name__, template_folder='templates', static_folder='static') @@ -186,6 +187,23 @@ def get_file_list(job_id): return RenderQueue.job_with_id(job_id).file_list() +@server.get('/api/job//make_ready') +def make_job_ready(job_id): + try: + found_job = RenderQueue.job_with_id(job_id) + if found_job.status in [RenderStatus.NOT_READY, RenderStatus.NOT_STARTED]: + if found_job.children: + for child_name in found_job.children.split(','): + child_id, hostname = child_name.split('@') + RenderServerProxy(hostname).request_data(f'/api/job//make_ready') + found_job.status = RenderStatus.NOT_STARTED + RenderQueue.save_state() + return found_job.json(), 200 + except Exception as e: + return "Error making job ready: {e}", 500 + return "Not valid command", 405 + + @server.route('/api/job//download_all') def download_all(job_id): zip_filename = None @@ -239,9 +257,31 @@ def snapshot(): return server_data +@server.get('/api/_detected_clients') +def detected_clients(): + # todo: dev/debug only. Should not ship this - probably. + return server.config['ZEROCONF_SERVER'].found_clients() + + +@server.route('/api/is_available_for_job', methods=['POST', 'GET']) +def available_for_job(): + """ + Check queue to see if it can take a job with a given renderer and priority + """ + renderer = request.args.get('renderer') + priority = request.args.get('priority') + + if not renderer or not priority: + return {"error": "Both 'renderer' and 'priority' parameters are required"}, 400 + elif renderer not in RenderWorkerFactory.supported_renderers(): + return {"error": f"Unsupported renderer: {renderer}"}, 400 + else: + return {"is_available": RenderQueue.is_available_for_job(renderer, priority), + 'renderer': renderer, 'priority': priority}, 200 + + @server.post('/api/add_job') def add_job_handler(): - # initial handling of raw data try: if request.is_json: @@ -253,11 +293,11 @@ def add_job_handler(): form_dict = {k: v for k, v in dict(request.form).items() if v} args = {} arg_keys = [k for k in form_dict.keys() if '-arg_' in k] - for key in arg_keys: - if form_dict['renderer'] in key or 'AnyRenderer' in key: - cleaned_key = key.split('-arg_')[-1] - args[cleaned_key] = form_dict[key] - form_dict.pop(key) + for server_hostname in arg_keys: + if form_dict['renderer'] in server_hostname or 'AnyRenderer' in server_hostname: + cleaned_key = server_hostname.split('-arg_')[-1] + args[cleaned_key] = form_dict[server_hostname] + form_dict.pop(server_hostname) args['raw'] = form_dict.get('raw_args', None) form_dict['args'] = args jobs_list = [form_dict] @@ -269,6 +309,7 @@ def add_job_handler(): # start handling project files try: # handle uploaded files + logger.debug(f"Incoming new job request: {jobs_list}") uploaded_project = request.files.get('file', None) project_url = jobs_list[0].get('url', None) input_path = jobs_list[0].get('input_path', None) @@ -342,31 +383,36 @@ def add_job_handler(): # create and add jobs to render queue results = [] - for job in jobs_list: + for job_data in jobs_list: try: # prepare output paths - output_dir = os.path.join(job_dir, job.get('name', None) or 'output') + output_dir = os.path.join(job_dir, job_data.get('name', None) or 'output') os.makedirs(output_dir, exist_ok=True) # get new output path in output_dir - job['output_path'] = os.path.join(output_dir, os.path.basename( - job.get('name', None) or job.get('output_path', None) or loaded_project_local_path + job_data['output_path'] = os.path.join(output_dir, os.path.basename( + job_data.get('name', None) or job_data.get('output_path', None) or loaded_project_local_path )) # create & configure jobs - render_job = RenderWorkerFactory.create_worker(renderer=job['renderer'], + worker = RenderWorkerFactory.create_worker(renderer=job_data['renderer'], input_path=loaded_project_local_path, - output_path=job["output_path"], - args=job.get('args', {})) - render_job.client = server.config['HOSTNAME'] - render_job.owner = job.get("owner", render_job.owner) - render_job.name = job.get("name", render_job.name) - render_job.priority = int(job.get('priority', render_job.priority)) - render_job.start_frame = job.get("start_frame", render_job.start_frame) - render_job.end_frame = job.get("end_frame", render_job.end_frame) + output_path=job_data["output_path"], + args=job_data.get('args', {})) + worker.status = job_data.get("initial_status", worker.status) + worker.parent = job_data.get("parent", worker.parent) + worker.name = job_data.get("name", worker.name) + worker.priority = int(job_data.get('priority', worker.priority)) + worker.start_frame = job_data.get("start_frame", worker.start_frame) + worker.end_frame = job_data.get("end_frame", worker.end_frame) - RenderQueue.add_to_render_queue(render_job, force_start=job.get('force_start', False)) - results.append(render_job.json()) + # determine if we can / should split the job + if server.config.get('enable_split_jobs', False) and (worker.total_frames > 1) and not worker.parent: + create_subjobs(worker, job_data, loaded_project_local_path) + + RenderQueue.add_to_render_queue(worker, force_start=job_data.get('force_start', False)) + make_job_ready(worker.id) + results.append(worker.json()) except Exception as e: err_msg = f"Error creating render job: {e}" logger.error(err_msg) @@ -388,6 +434,89 @@ def add_job_handler(): return 'unknown error', 500 +def create_subjobs(worker, job_data, project_path): + + # Check availablity + local_hostname = server.config['HOSTNAME'] + found_servers = [x for x in server.config['ZEROCONF_SERVER'].found_clients() if local_hostname not in x] + available_servers = [local_hostname] + [hostname for hostname in found_servers if + RenderServerProxy(hostname).is_available_for_job(renderer=worker.renderer, + priority=worker.priority)] + + if len(available_servers) <= 1: + logger.debug("No available servers to split job with. Skipping subjob creation.") + return + + logger.info(f"Found {len(available_servers) - 1} additional available servers | " + f"Breaking up job into {len(available_servers)} jobs") + logger.debug(f"Available servers: {available_servers}") + + def divide_frames(start_frame, end_frame, num_servers): + frame_range = end_frame - start_frame + 1 + frames_per_server = frame_range // num_servers + leftover_frames = frame_range % num_servers + + ranges = [] + current_start = start_frame + for i in range(num_servers): + current_end = current_start + frames_per_server - 1 + if leftover_frames > 0: + current_end += 1 + leftover_frames -= 1 + if current_start <= current_end: + ranges.append((current_start, current_end)) + current_start = current_end + 1 + + return ranges + + # Calculate respective frames for each server + server_frame_ranges = {} + for idx, frame_range in enumerate(divide_frames(worker.start_frame, worker.end_frame, len(available_servers))): + server_frame_ranges[available_servers[idx]] = frame_range + + logger.info(f"Job {worker.id} split plan: {server_frame_ranges}") + + # Prep and submit these sub-jobs + submission_results = {} + try: + for server_hostname, frame_range in server_frame_ranges.items(): + if server_hostname != local_hostname: + subjob = job_data.copy() + subjob['name'] = f"{worker.name}[{frame_range[0]}-{frame_range[-1]}]" + subjob['parent'] = f"{worker.id}@{local_hostname}" + subjob['start_frame'] = frame_range[0] + subjob['end_frame'] = frame_range[-1] + + logger.debug(f"Posting subjob with frames {subjob['start_frame']}-" + f"{subjob['end_frame']} to {server_hostname}") + post_results = RenderServerProxy(server_hostname).post_job_to_server( + input_path=project_path, job_list=[subjob]) + if post_results.ok: + submission_results[server_hostname] = post_results.json()[0] + else: + logger.error(f"Failed to create subjob on {server_hostname}") + break + + # check that job posts were all successful. + if len(submission_results) != (len(server_frame_ranges) - 1): + raise ValueError("Failed to create all subjobs") # look into recalculating job numbers and use exising jobs + + # truncate parent render_job + worker.end_frame = min(server_frame_ranges[local_hostname][-1], worker.end_frame) + logger.info(f"Local job now rendering from {worker.start_frame} to {worker.end_frame}") + + # start subjobs + logger.debug(f"Starting {len(server_frame_ranges) - 1} attempted subjobs") + worker.children = ",".join([f"{results['id']}@{hostname}" for hostname, results in submission_results.items()]) + worker.name = f"{worker.name}[{worker.start_frame}-{worker.end_frame}]" + + except Exception as e: + # cancel all the subjobs + logger.error(f"Failed to split job into subjobs: {e}") + logger.debug(f"Cancelling {len(server_frame_ranges) - 1} attempted subjobs") + [RenderServerProxy(hostname).cancel_job(results['id'], confirm=True) for hostname, results in submission_results.items()] + + @server.get('/api/job//cancel') def cancel_job(job_id): if not request.args.get('confirm', False): @@ -506,6 +635,7 @@ def start_server(background_thread=False): server.config['UPLOAD_FOLDER'] = os.path.expanduser(config['upload_folder']) server.config['THUMBS_FOLDER'] = os.path.join(os.path.expanduser(config['upload_folder']), 'thumbs') server.config['MAX_CONTENT_PATH'] = config['max_content_path'] + server.config['enable_split_jobs'] = config.get('enable_split_jobs', False) # disable most Flask logging flask_log = logging.getLogger('werkzeug') @@ -520,6 +650,7 @@ def start_server(background_thread=False): logging.info(f"Starting Zordon Render Server - Hostname: '{server.config['HOSTNAME']}:'") zeroconf_server = ZeroconfServer("_zordon._tcp.local.", server.config['HOSTNAME'], server.config['PORT']) zeroconf_server.start() + server.config['ZEROCONF_SERVER'] = zeroconf_server try: if background_thread: @@ -531,4 +662,5 @@ def start_server(background_thread=False): server.run(host='0.0.0.0', port=server.config['PORT'], debug=config.get('flask_debug_enable', False), use_reloader=False, threaded=True) finally: + RenderQueue.save_state() zeroconf_server.stop() diff --git a/lib/server/server_proxy.py b/lib/server/server_proxy.py index ca8d539..858ebf7 100644 --- a/lib/server/server_proxy.py +++ b/lib/server/server_proxy.py @@ -84,7 +84,7 @@ class RenderServerProxy: self.__update_in_background = False def get_jobs(self, timeout=5, ignore_token=False): - if not self.__update_in_background: + if not self.__update_in_background or ignore_token: self.__update_job_cache(timeout, ignore_token) return self.__jobs_cache.copy() if self.__jobs_cache else None @@ -104,6 +104,14 @@ class RenderServerProxy: all_data = self.request_data('full_status', timeout=timeout) return all_data + def cancel_job(self, job_id, confirm=False): + return self.request_data(f'job/{job_id}/cancel?confirm={confirm}') + + def is_available_for_job(self, renderer, priority=2): + request_data = self.request_data(f'is_available_for_job?renderer={renderer}&priority={priority}', + timeout=1) or {} + return request_data.get('is_available', False) + def post_job_to_server(self, input_path, job_list, callback=None): # Prepare the form data encoder = MultipartEncoder({ diff --git a/lib/workers/aerender_worker.py b/lib/workers/aerender_worker.py index 7dbe604..4079152 100644 --- a/lib/workers/aerender_worker.py +++ b/lib/workers/aerender_worker.py @@ -22,10 +22,9 @@ class AERenderWorker(BaseRenderWorker): supported_extensions = ['.aep'] engine = AERender - def __init__(self, input_path, output_path, priority=2, args=None, owner=None, - client=None, name=None): + def __init__(self, input_path, output_path, args=None, parent=None, name=None): super(AERenderWorker, self).__init__(input_path=input_path, output_path=output_path, args=args, - client=client, priority=priority, owner=owner, name=name) + parent=parent, name=name) self.comp = args.get('comp', None) self.render_settings = args.get('render_settings', None) diff --git a/lib/workers/base_worker.py b/lib/workers/base_worker.py index ec9afe7..35306b5 100644 --- a/lib/workers/base_worker.py +++ b/lib/workers/base_worker.py @@ -26,6 +26,7 @@ class RenderStatus(Enum): CANCELLED = "cancelled" ERROR = "error" SCHEDULED = "scheduled" + NOT_READY = "not_ready" UNDEFINED = "undefined" @@ -51,15 +52,15 @@ class BaseRenderWorker(Base): project_length = Column(Integer) start_frame = Column(Integer) end_frame = Column(Integer, nullable=True) - owner = Column(String) - client = Column(String) + parent = Column(String, nullable=True) + children = Column(String, nullable=True) name = Column(String) file_hash = Column(String) _status = Column(String) engine = None - def __init__(self, input_path, output_path, priority=2, args=None, ignore_extensions=True, owner=None, client=None, + def __init__(self, input_path, output_path, priority=2, args=None, ignore_extensions=True, parent=None, name=None): if not ignore_extensions: @@ -83,9 +84,9 @@ class BaseRenderWorker(Base): self.renderer = self.engine.name() self.renderer_version = self.engine.version() self.priority = priority - self.owner = owner - self.client = client - self.name = name + self.parent = parent + self.children = None + self.name = name or os.path.basename(input_path) # Frame Ranges self.project_length = -1 @@ -98,7 +99,7 @@ class BaseRenderWorker(Base): self.end_time = None # History - self.status = RenderStatus.NOT_STARTED + self.status = RenderStatus.NOT_READY self.warnings = [] self.errors = [] self.failed_attempts = 0 @@ -127,8 +128,7 @@ class BaseRenderWorker(Base): def status(self): if self._status in [RenderStatus.RUNNING.value, RenderStatus.NOT_STARTED.value]: if not hasattr(self, 'errors'): - self._status = RenderStatus.CANCELLED - return RenderStatus.CANCELLED + self._status = RenderStatus.CANCELLED.value return string_to_status(self._status) def validate(self): @@ -184,6 +184,7 @@ class BaseRenderWorker(Base): return self.status = RenderStatus.RUNNING + self.start_time = datetime.now() logger.info(f'Starting {self.engine.name()} {self.engine.version()} Render for {self.input_path} | Frame Count: {self.total_frames}') self.__thread.start() @@ -202,7 +203,6 @@ class BaseRenderWorker(Base): logger.debug("Renderer commands generated - {}".format(" ".join(subprocess_cmds))) self.__process = subprocess.Popen(subprocess_cmds, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, universal_newlines=False) - self.start_time = datetime.now() with open(self.log_path(), "a") as f: @@ -298,8 +298,8 @@ class BaseRenderWorker(Base): 'input_path': self.input_path, 'output_path': self.output_path, 'priority': self.priority, - 'owner': self.owner, - 'client': self.client, + 'parent': self.parent, + 'children': self.children, 'date_created': self.date_created, 'start_time': self.start_time, 'end_time': self.end_time, @@ -310,6 +310,8 @@ class BaseRenderWorker(Base): 'renderer': self.renderer, 'renderer_version': self.renderer_version, 'errors': getattr(self, 'errors', None), + 'start_frame': self.start_frame, + 'end_frame': self.end_frame, 'total_frames': self.total_frames, 'last_output': getattr(self, 'last_output', None), 'log_path': self.log_path() diff --git a/lib/workers/blender_worker.py b/lib/workers/blender_worker.py index d4285c4..dc48ee2 100644 --- a/lib/workers/blender_worker.py +++ b/lib/workers/blender_worker.py @@ -13,10 +13,9 @@ class BlenderRenderWorker(BaseRenderWorker): engine = Blender - def __init__(self, input_path, output_path, priority=2, args=None, owner=None, - client=None, name=None): + def __init__(self, input_path, output_path, args=None, parent=None, name=None): super(BlenderRenderWorker, self).__init__(input_path=input_path, output_path=output_path, args=args, - client=client, priority=priority, owner=owner, name=name) + parent=parent, name=name) # Args self.blender_engine = self.args.get('engine', 'BLENDER_EEVEE').upper() @@ -112,7 +111,7 @@ class BlenderRenderWorker(BaseRenderWorker): if self.total_frames <= 1: return self.__frame_percent_complete else: - whole_frame_percent = (self.current_frame - 1) / self.total_frames + whole_frame_percent = (self.current_frame - self.start_frame) / self.total_frames adjusted_frame_percent = self.__frame_percent_complete / self.total_frames total_percent = whole_frame_percent + adjusted_frame_percent return max(total_percent, 0) diff --git a/lib/workers/ffmpeg_worker.py b/lib/workers/ffmpeg_worker.py index 4f63be4..4522dc3 100644 --- a/lib/workers/ffmpeg_worker.py +++ b/lib/workers/ffmpeg_worker.py @@ -8,10 +8,9 @@ class FFMPEGRenderWorker(BaseRenderWorker): engine = FFMPEG - def __init__(self, input_path, output_path, priority=2, args=None, owner=None, - client=None, name=None): + def __init__(self, input_path, output_path, args=None, parent=None, name=None): super(FFMPEGRenderWorker, self).__init__(input_path=input_path, output_path=output_path, args=args, - client=client, priority=priority, owner=owner, name=name) + parent=parent, name=name) stream_info = subprocess.check_output([self.engine.renderer_path(), "-i", # https://stackoverflow.com/a/61604105 input_path, "-map", "0:v:0", "-c", "copy", "-f", "null", "-y", diff --git a/lib/workers/worker_factory.py b/lib/workers/worker_factory.py index 4d40220..d4a95f7 100644 --- a/lib/workers/worker_factory.py +++ b/lib/workers/worker_factory.py @@ -10,11 +10,9 @@ class RenderWorkerFactory: return classes @staticmethod - def create_worker(renderer, input_path, output_path, priority=2, args=None, owner=None, - client=None, name=None): + def create_worker(renderer, input_path, output_path, args=None, parent=None, name=None): worker_class = RenderWorkerFactory.class_for_name(renderer) - return worker_class(input_path=input_path, output_path=output_path, args=args, priority=priority, owner=owner, - client=client, name=name) + return worker_class(input_path=input_path, output_path=output_path, args=args, parent=parent, name=name) @staticmethod def supported_renderers():