diff --git a/lib/client/dashboard_window.py b/lib/client/dashboard_window.py index f642595..30fe991 100644 --- a/lib/client/dashboard_window.py +++ b/lib/client/dashboard_window.py @@ -10,6 +10,7 @@ from PIL import Image, ImageTk from lib.client.new_job_window import NewJobWindow from lib.server.server_proxy import RenderServerProxy from lib.server.zeroconf_server import ZeroconfServer +from lib.workers.base_worker import RenderStatus from lib.utilities.misc_helper import launch_url, file_exists_in_mounts, get_time_elapsed logger = logging.getLogger() @@ -82,7 +83,7 @@ class DashboardWindow: # Setup the Tree self.job_tree = ttk.Treeview(server_frame, show="headings") - self.job_tree.tag_configure('running', background='lawn green', font=('', 0, 'bold')) + self.job_tree.tag_configure(RenderStatus.RUNNING.value, background='lawn green', font=('', 0, 'bold')) self.job_tree.bind("<>", self.job_picked) self.job_tree["columns"] = ("id", "Name", "Renderer", "Priority", "Status", "Time Elapsed", "Frames", "Date Added", "Parent", "") @@ -206,7 +207,7 @@ class DashboardWindow: def delete_job(self): job_ids = self.selected_job_ids() if len(job_ids) == 1: - job = next((d for d in self.current_server_proxy.get_jobs() if d.get('id') == job_ids[0]), None) + job = next((d for d in self.current_server_proxy.get_all_jobs() if d.get('id') == job_ids[0]), None) display_name = job['name'] or os.path.basename(job['input_path']) message = f"Are you sure you want to delete the job:\n{display_name}?" else: @@ -250,9 +251,9 @@ class DashboardWindow: self.set_image(self.default_image) # update button status - current_jobs = self.current_server_proxy.get_jobs() or [] + current_jobs = self.current_server_proxy.get_all_jobs() or [] job = next((d for d in current_jobs if d.get('id') == job_id), None) - stop_button_state = 'normal' if job and job['status'] == 'running' else 'disabled' + stop_button_state = 'normal' if job and job['status'] == RenderStatus.RUNNING.value else 'disabled' self.stop_button.config(state=stop_button_state) generic_button_state = 'normal' if job else 'disabled' @@ -261,16 +262,16 @@ class DashboardWindow: self.logs_button.config(state=generic_button_state) def show_files(self): - output_path = None - if self.selected_job_ids(): - job = next((d for d in self.current_server_proxy.get_jobs() if d.get('id') == self.selected_job_ids()[0]), None) - output_path = os.path.dirname(job['output_path']) # check local filesystem - if not os.path.exists(output_path): - output_path = file_exists_in_mounts(output_path) # check any attached network shares - if output_path: - launch_url(output_path) - else: - messagebox.showerror("File Not Found", "The file could not be found. Check your network mounts.") + if not self.selected_job_ids(): + return + + job = next((d for d in self.current_server_proxy.get_all_jobs() if d.get('id') == self.selected_job_ids()[0]), None) + output_path = os.path.dirname(job['output_path']) # check local filesystem + if not os.path.exists(output_path): + output_path = file_exists_in_mounts(output_path) # check any attached network shares + if not output_path: + return messagebox.showerror("File Not Found", "The file could not be found. Check your network mounts.") + launch_url(output_path) def open_logs(self): if self.selected_job_ids(): @@ -338,21 +339,24 @@ class DashboardWindow: if clear_table: self.job_tree.delete(*self.job_tree.get_children()) - job_fetch = self.current_server_proxy.get_jobs(ignore_token=clear_table) + job_fetch = self.current_server_proxy.get_all_jobs(ignore_token=clear_table) if job_fetch: for job in job_fetch: - display_status = job['status'] if job['status'] != 'running' else \ + display_status = job['status'] if job['status'] != RenderStatus.RUNNING.value else \ ('%.0f%%' % (job['percent_complete'] * 100)) # if running, show percent, otherwise just show status tags = (job['status'],) start_time = datetime.datetime.fromisoformat(job['start_time']) if job['start_time'] else None end_time = datetime.datetime.fromisoformat(job['end_time']) if job['end_time'] else None + time_elapsed = "" if (job['status'] != RenderStatus.RUNNING.value and not end_time) else \ + get_time_elapsed(start_time, end_time) + values = (job['id'], job['name'] or os.path.basename(job['input_path']), job['renderer'] + "-" + job['renderer_version'], job['priority'], display_status, - get_time_elapsed(start_time, end_time), + time_elapsed, job['total_frames'], job['date_created'], job['parent']) diff --git a/lib/server/api_server.py b/lib/server/api_server.py index db30efc..9351898 100755 --- a/lib/server/api_server.py +++ b/lib/server/api_server.py @@ -193,8 +193,7 @@ def make_job_ready(job_id): found_job = RenderQueue.job_with_id(job_id) if found_job.status in [RenderStatus.NOT_READY, RenderStatus.NOT_STARTED]: if found_job.children: - for child_name in found_job.children.split(','): - child_id, hostname = child_name.split('@') + for hostname, child_id in found_job.children.items(): RenderServerProxy(hostname).request_data(f'/api/job//make_ready') found_job.status = RenderStatus.NOT_STARTED RenderQueue.save_state() @@ -323,7 +322,7 @@ def add_job_handler(): logger.info(f"Attempting to download URL: {project_url}") try: downloaded_file_url, info = urlretrieve(project_url) - referred_name = info.get_filename() + referred_name = info.get_filename() or os.path.basename(project_url) except Exception as e: err_msg = f"Error downloading file: {e}" logger.error(err_msg) @@ -507,7 +506,8 @@ def create_subjobs(worker, job_data, project_path): # start subjobs logger.debug(f"Starting {len(server_frame_ranges) - 1} attempted subjobs") - worker.children = ",".join([f"{results['id']}@{hostname}" for hostname, results in submission_results.items()]) + for hostname, results in submission_results.items(): + worker.children[hostname] = results['id'] worker.name = f"{worker.name}[{worker.start_frame}-{worker.end_frame}]" except Exception as e: diff --git a/lib/server/server_proxy.py b/lib/server/server_proxy.py index 858ebf7..e0e6192 100644 --- a/lib/server/server_proxy.py +++ b/lib/server/server_proxy.py @@ -83,7 +83,10 @@ class RenderServerProxy: def stop_background_update(self): self.__update_in_background = False - def get_jobs(self, timeout=5, ignore_token=False): + def get_job_info(self, job_id, timeout=5): + return self.request_data(f'job/{job_id}', timeout=timeout) + + def get_all_jobs(self, timeout=5, ignore_token=False): if not self.__update_in_background or ignore_token: self.__update_job_cache(timeout, ignore_token) return self.__jobs_cache.copy() if self.__jobs_cache else None @@ -130,3 +133,17 @@ class RenderServerProxy: response = requests.post(f'http://{self.hostname}:{self.port}/api/add_job', data=monitor, headers=headers) return response + + def get_job_files(self, job_id, save_path): + url = f"http://{self.hostname}:{self.port}/api/job/{job_id}/download_all" + return self.download_file(url, filename=save_path) + + @staticmethod + def download_file(url, filename): + with requests.get(url, stream=True) as r: + r.raise_for_status() + with open(filename, 'wb') as f: + for chunk in r.iter_content(chunk_size=8192): + f.write(chunk) + return filename + diff --git a/lib/utilities/ffmpeg_helper.py b/lib/utilities/ffmpeg_helper.py index c79e2cf..eda5c69 100644 --- a/lib/utilities/ffmpeg_helper.py +++ b/lib/utilities/ffmpeg_helper.py @@ -2,10 +2,9 @@ import subprocess from lib.engines.ffmpeg_engine import FFMPEG -def image_sequence_to_video(source_glob_pattern, output_path, framerate=24, encoder="libx264", pix_fmt="yuv420p"): +def image_sequence_to_video(source_glob_pattern, output_path, framerate=24, encoder="prores_ks", profile=4): subprocess.run([FFMPEG.renderer_path(), "-framerate", str(framerate), "-i", f"{source_glob_pattern}", - "-c:v", encoder, "-pix_fmt", pix_fmt, output_path], stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL, check=True) + "-c:v", encoder, "-profile:v", str(profile), '-pix_fmt', 'yuva444p10le', output_path], check=True) def save_first_frame(source_path, dest_path, max_width=1280): diff --git a/lib/workers/base_worker.py b/lib/workers/base_worker.py index ccbb64c..9cca1b3 100644 --- a/lib/workers/base_worker.py +++ b/lib/workers/base_worker.py @@ -1,18 +1,20 @@ #!/usr/bin/env python3 import io +import json import logging import os import subprocess import threading -import json +import time +import zipfile from datetime import datetime from enum import Enum -from sqlalchemy import Column, Integer, String, DateTime -from sqlalchemy.ext.declarative import declarative_base -from lib.utilities.misc_helper import get_time_elapsed - import psutil +from sqlalchemy import Column, Integer, String, DateTime, JSON +from sqlalchemy.ext.declarative import declarative_base + +from lib.utilities.misc_helper import get_time_elapsed logger = logging.getLogger() Base = declarative_base() @@ -25,6 +27,7 @@ class RenderStatus(Enum): CANCELLED = "cancelled" ERROR = "error" SCHEDULED = "scheduled" + WAITING = "waiting" NOT_READY = "not_ready" UNDEFINED = "undefined" @@ -52,7 +55,7 @@ class BaseRenderWorker(Base): start_frame = Column(Integer) end_frame = Column(Integer, nullable=True) parent = Column(String, nullable=True) - children = Column(String, nullable=True) + children = Column(JSON) name = Column(String) file_hash = Column(String) _status = Column(String) @@ -84,7 +87,7 @@ class BaseRenderWorker(Base): self.renderer_version = self.engine.version() self.priority = priority self.parent = parent - self.children = None + self.children = {} self.name = name or os.path.basename(input_path) # Frame Ranges @@ -101,14 +104,11 @@ class BaseRenderWorker(Base): self.status = RenderStatus.NOT_READY self.warnings = [] self.errors = [] - self.failed_attempts = 0 - self.maximum_attempts = 1 # Threads and processes self.__thread = threading.Thread(target=self.run, args=()) self.__thread.daemon = True self.__process = None - self.is_finished = False self.last_output = None @property @@ -192,56 +192,88 @@ class BaseRenderWorker(Base): log_dir = os.path.dirname(self.log_path()) os.makedirs(log_dir, exist_ok=True) - while self.failed_attempts < self.maximum_attempts and self.status is not RenderStatus.COMPLETED: + # Start process and get updates + subprocess_cmds = self.generate_subprocess() + logger.debug("Renderer commands generated - {}".format(" ".join(subprocess_cmds))) + self.__process = subprocess.Popen(subprocess_cmds, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, + universal_newlines=False) - if self.failed_attempts: - logger.info(f'Attempt #{self.failed_attempts} failed. Starting attempt #{self.failed_attempts + 1}') + with open(self.log_path(), "a") as f: - # Start process and get updates - subprocess_cmds = self.generate_subprocess() - logger.debug("Renderer commands generated - {}".format(" ".join(subprocess_cmds))) - self.__process = subprocess.Popen(subprocess_cmds, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, - universal_newlines=False) + f.write(f"{self.start_time.isoformat()} - Starting {self.engine.name()} {self.engine.version()} " + f"Render for {self.input_path}") + f.write(f"Running command: {' '.join(subprocess_cmds)}\n") + for c in io.TextIOWrapper(self.__process.stdout, encoding="utf-8"): # or another encoding + f.write(c) + logger.debug(f"{self.engine.name()}Worker: {c.strip()}") + self.last_output = c.strip() + self._parse_stdout(c.strip()) + f.write('\n') - with open(self.log_path(), "a") as f: - - f.write(f"{self.start_time.isoformat()} - Starting {self.engine.name()} {self.engine.version()} " - f"Render for {self.input_path}") - f.write(f"Running command: {' '.join(subprocess_cmds)}\n") - for c in io.TextIOWrapper(self.__process.stdout, encoding="utf-8"): # or another encoding - f.write(c) - logger.debug(f"{self.engine.name()}Worker: {c.strip()}") - self.last_output = c.strip() - self._parse_stdout(c.strip()) - f.write('\n') - - # Check return codes - return_code = self.__process.wait() - self.end_time = datetime.now() - # Return early if job was cancelled - if self.status in [RenderStatus.CANCELLED, RenderStatus.ERROR]: - self.is_finished = True - return - - if return_code: - message = f"{self.engine.name()} render failed with return_code {return_code} after {self.time_elapsed()}" - logger.error(message) - self.failed_attempts = self.failed_attempts + 1 - else: - message = f"{self.engine.name()} render completed successfully in {self.time_elapsed()}" - logger.info(message) - self.status = RenderStatus.COMPLETED + # Check return codes + return_code = self.__process.wait() + self.end_time = datetime.now() + # Return early if job was cancelled + if self.status in [RenderStatus.CANCELLED, RenderStatus.ERROR]: + return + if return_code: + message = f"{self.engine.name()} render failed with return_code {return_code} after {self.time_elapsed()}" + logger.error(message) f.write(message) + self.status = RenderStatus.ERROR + if not self.errors: + self.errors = [message] + return - if self.failed_attempts >= self.maximum_attempts and self.status is not RenderStatus.CANCELLED: - logger.error('{} Render of {} failed after {} attempts'.format(self.engine.name(), self.input_path, - self.failed_attempts)) - self.status = RenderStatus.ERROR - if not self.errors: - self.errors = [self.last_output] - self.is_finished = True + message = f"{self.engine.name()} render completed successfully in {self.time_elapsed()}" + logger.info(message) + f.write(message) + + from lib.server.server_proxy import RenderServerProxy + + # Wait on children jobs, if necessary + if self.children: + self.status = RenderStatus.WAITING + subjobs_still_running = self.children.copy() + while len(subjobs_still_running): + for hostname, job_id in subjobs_still_running.copy().items(): + proxy = RenderServerProxy(hostname) + response = proxy.get_job_info(job_id) + if not response: + logger.warning(f"No response from: {hostname}") + else: + status = string_to_status(response.get('status', '')) + status_msg = f"Subjob {job_id}@{hostname} | Status: {status} | {response.get('percent_complete')}%" + + if status in [RenderStatus.CANCELLED, RenderStatus.ERROR, RenderStatus.COMPLETED]: + logger.info(f"Downloading completed subjob files from {hostname} to localhost") + try: + zip_file_path = self.output_path + f'_{hostname}_{job_id}.zip' + proxy.get_job_files(job_id, zip_file_path) + logger.debug("Zip file download successfully - Preparing to unzip.") + extract_path = os.path.dirname(zip_file_path) + with zipfile.ZipFile(zip_file_path, 'r') as zip_ref: + zip_ref.extractall(extract_path) + logger.info(f"Successfully extracted zip to: {extract_path}") + os.remove(zip_file_path) + except Exception as e: + err_msg = f"Error transferring output from subjob {job_id}@{hostname}: {e}" + logger.exception(err_msg) + self.errors.append(err_msg) + finally: + subjobs_still_running.pop(hostname) + else: + logger.debug(status_msg) + logger.debug(f"Waiting on {len(subjobs_still_running)} subjobs on {', '.join(list(subjobs_still_running.keys()))}") + time.sleep(5) + + logger.info("All subjobs complete") + + # Post Render Work + logger.debug("Starting post-processing work") self.post_processing() + self.status = RenderStatus.COMPLETED def post_processing(self): pass @@ -260,7 +292,6 @@ class BaseRenderWorker(Base): def stop(self, is_error=False): if hasattr(self, '__process'): try: - self.maximum_attempts = 0 process = psutil.Process(self.__process.pid) for proc in process.children(recursive=True): proc.kill() @@ -285,10 +316,13 @@ class BaseRenderWorker(Base): return get_time_elapsed(self.start_time, self.end_time) def file_list(self): - job_dir = os.path.dirname(self.output_path) - file_list = [os.path.join(job_dir, file) for file in os.listdir(job_dir)] - file_list.sort() - return file_list + try: + job_dir = os.path.dirname(self.output_path) + file_list = [os.path.join(job_dir, file) for file in os.listdir(job_dir)] + file_list.sort() + return file_list + except FileNotFoundError: + return [] def json(self): job_dict = { diff --git a/lib/workers/blender_worker.py b/lib/workers/blender_worker.py index dc48ee2..61e4bd8 100644 --- a/lib/workers/blender_worker.py +++ b/lib/workers/blender_worker.py @@ -1,12 +1,10 @@ #!/usr/bin/env python3 -import json import re -try: - from .base_worker import * -except ImportError: - from base_worker import * +from collections import Counter -from ..engines.blender_engine import Blender +from lib.engines.blender_engine import Blender +from lib.utilities.ffmpeg_helper import image_sequence_to_video +from lib.workers.base_worker import * class BlenderRenderWorker(BaseRenderWorker): @@ -42,9 +40,7 @@ class BlenderRenderWorker(BaseRenderWorker): if self.camera: cmd.extend(['--python-expr', f"import bpy;bpy.context.scene.camera = bpy.data.objects['{self.camera}'];"]) - # add dash at end of given path to separate frame numbers - path_with_ending_dash = os.path.splitext(self.output_path)[0] + "-" + os.path.splitext(self.output_path)[1] - cmd.extend(['-E', self.blender_engine, '-o', path_with_ending_dash, '-F', self.export_format]) + cmd.extend(['-E', self.blender_engine, '-o', self.output_path, '-F', self.export_format]) # set frame range cmd.extend(['-s', self.start_frame, '-e', self.end_frame, '-a']) @@ -117,19 +113,20 @@ class BlenderRenderWorker(BaseRenderWorker): return max(total_percent, 0) def post_processing(self): - output_dir = os.listdir(os.path.dirname(self.output_path)) - if self.total_frames > 1 and len(output_dir) > 1: - from ..utilities.ffmpeg_helper import image_sequence_to_video + + def most_common_extension(file_paths): + extensions = [os.path.splitext(path)[1] for path in file_paths] + counter = Counter(extensions) + most_common_ext, _ = counter.most_common(1)[0] + return most_common_ext + + output_dir_files = os.listdir(os.path.dirname(self.output_path)) + if self.total_frames > 1 and len(output_dir_files) > 1: logger.info("Generating preview for image sequence") - - # get proper file extension - path_with_ending_dash = os.path.splitext(self.output_path)[0] + "-" - found_output = next(obj for obj in output_dir if os.path.basename(path_with_ending_dash) in obj) - glob_pattern = path_with_ending_dash + '%04d' + ('.' + found_output.split('.')[-1] if found_output else "") - try: - image_sequence_to_video(source_glob_pattern=glob_pattern, - output_path=self.output_path + '.mp4', + pattern = self.output_path + "%04d" + most_common_extension(output_dir_files) + image_sequence_to_video(source_glob_pattern=pattern, + output_path=self.output_path + '.mov', framerate=self.scene_info['fps']) logger.info('Successfully generated preview video from image sequence') except Exception as e: