14 Commits

Author SHA1 Message Date
Brett Williams
a76b0340f9 Disable parts of add_job UI when missing critical project data instead of crashing 2024-08-04 12:06:29 -05:00
Brett Williams
f9c114bf32 Add UI options for aerender 2024-08-04 10:00:16 -05:00
Brett Williams
dad9b8c250 Misc improvements 2024-08-04 01:19:22 -05:00
Brett Williams
8826382f86 Misc worker cleanup 2024-08-04 01:03:24 -05:00
Brett Williams
58822c4a20 get_project_info gets comp names from .aepx files now 2024-08-04 01:01:21 -05:00
Brett Williams
dc7f3877b2 Cleanup subprocess generation 2024-08-03 21:23:25 -05:00
Brett Williams
b1280ad445 Add AERender to supported_engines in EngineManager 2024-08-03 21:03:01 -05:00
Brett Williams
0cebb93ba2 Merge remote-tracking branch 'origin/feature/84-after-effects-support' into feature/84-after-effects-support 2024-08-03 20:56:54 -05:00
Brett Williams
a2785400ac Fix generate_worker_subprocess in aerender_worker.py 2024-08-03 20:55:48 -05:00
Brett Williams
d9201b5082 Changes to engine file extensions structure 2024-08-03 20:55:48 -05:00
Brett Williams
7d633d97c2 Fix getting path to After Effects 2024-08-03 20:55:48 -05:00
Brett Williams
e6e2ff8e07 Fix generate_worker_subprocess in aerender_worker.py 2024-08-03 20:45:43 -05:00
Brett Williams
7986960b21 Changes to engine file extensions structure 2024-08-03 20:21:26 -05:00
Brett Williams
ebb847b09e Fix getting path to After Effects 2024-08-03 20:00:36 -05:00
29 changed files with 522 additions and 1125 deletions

View File

@@ -3,7 +3,7 @@ update_engines_on_launch: true
max_content_path: 100000000
server_log_level: info
log_buffer_length: 250
worker_process_timeout: 120
subjob_connection_timeout: 120
flask_log_level: error
flask_debug_enable: false
queue_eval_seconds: 1

View File

@@ -1,279 +0,0 @@
#!/usr/bin/env python3
import datetime
import os.path
import socket
import threading
import time
import traceback
from rich import box
from rich.console import Console
from rich.layout import Layout
from rich.live import Live
from rich.panel import Panel
from rich.table import Column
from rich.table import Table
from rich.text import Text
from rich.tree import Tree
from src.engines.core.base_worker import RenderStatus, string_to_status
from src.api.server_proxy import RenderServerProxy
from src.utilities.misc_helper import get_time_elapsed
from start_server import start_server
"""
The RenderDashboard is designed to be run on a remote machine or on the local server
This provides a detailed status of all jobs running on the server
"""
status_colors = {RenderStatus.ERROR: "red", RenderStatus.CANCELLED: 'orange1', RenderStatus.COMPLETED: 'green',
RenderStatus.NOT_STARTED: "yellow", RenderStatus.SCHEDULED: 'purple',
RenderStatus.RUNNING: 'cyan'}
categories = [RenderStatus.RUNNING, RenderStatus.ERROR, RenderStatus.NOT_STARTED, RenderStatus.SCHEDULED,
RenderStatus.COMPLETED, RenderStatus.CANCELLED, RenderStatus.UNDEFINED]
renderer_colors = {'ffmpeg': '[magenta]', 'blender': '[orange1]', 'aerender': '[purple]'}
local_hostname = socket.gethostname()
def status_string_to_color(status_string):
job_status = string_to_status(status_string)
job_color = '[{}]'.format(status_colors[job_status])
return job_color
def sorted_jobs(all_jobs):
sort_by_date = True
if not sort_by_date:
sorted_job_list = []
if all_jobs:
for status_category in categories:
found_jobs = [x for x in all_jobs if x['status'] == status_category.value]
if found_jobs:
sorted_found_jobs = sorted(found_jobs, key=lambda d: datetime.datetime.fromisoformat(d['date_created']), reverse=True)
sorted_job_list.extend(sorted_found_jobs)
else:
sorted_job_list = sorted(all_jobs, key=lambda d: datetime.datetime.fromisoformat(d['date_created']), reverse=True)
return sorted_job_list
def create_node_tree(all_server_data) -> Tree:
main_tree = Tree("[magenta]Server Cluster")
for server_host, server_data in all_server_data['servers'].items():
node_title_local = f"[cyan bold]{server_host}[/] [yellow](This Computer)[default]"
node_title_remote = f"[cyan]{server_host} [magenta](Remote)[default]"
node_tree_text = node_title_local if (server_host == local_hostname) else node_title_remote
if server_data.get('is_online', False):
node_tree_text = node_tree_text + " - [green]Running"
node_tree = Tree(node_tree_text)
stats_text = f"CPU: [yellow]{server_data['status']['cpu_percent']}% [default]| RAM: " \
f"[yellow]{server_data['status']['memory_percent']}% [default]| Cores: " \
f"[yellow]{server_data['status']['cpu_count']} [default]| " \
f"{server_data['status']['platform'].split('-')[0]}"
node_tree.add(Tree(stats_text))
running_jobs = [job for job in server_data['jobs'] if job['status'] == RenderStatus.RUNNING.value]
not_started = [job for job in server_data['jobs'] if job['status'] == RenderStatus.NOT_STARTED.value]
scheduled = [job for job in server_data['jobs'] if job['status'] == RenderStatus.SCHEDULED.value]
jobs_to_display = running_jobs + not_started + scheduled
jobs_tree = Tree(f"Running: [green]{len(running_jobs)} [default]| Queued: [cyan]{len(not_started)}"
f"[default] | Scheduled: [cyan]{len(scheduled)}")
for job in jobs_to_display:
renderer = f"{renderer_colors[job['renderer']]}{job['renderer']}[default]"
filename = os.path.basename(job['input_path']).split('.')[0]
if job['status'] == RenderStatus.RUNNING.value:
jobs_tree.add(f"[bold]{renderer} {filename} ({job['id']}) - {status_string_to_color(job['status'])}{(float(job['percent_complete']) * 100):.1f}%")
else:
jobs_tree.add(f"{filename} ({job['id']}) - {status_string_to_color(job['status'])}{job['status'].title()}")
if not jobs_to_display:
jobs_tree.add("[italic]No running jobs")
node_tree.add(jobs_tree)
main_tree.add(node_tree)
else:
# if server is offline
node_tree_text = node_tree_text + " - [red]Offline"
node_tree = Tree(node_tree_text)
main_tree.add(node_tree)
return main_tree
def create_jobs_table(all_server_data) -> Table:
table = Table("ID", "Name", "Renderer", Column(header="Priority", justify="center"),
Column(header="Status", justify="center"), Column(header="Time Elapsed", justify="right"),
Column(header="# Frames", justify="right"), "Client", show_lines=True,
box=box.HEAVY_HEAD)
all_jobs = []
for server_name, server_data in all_server_data['servers'].items():
for job in server_data['jobs']:
#todo: clean this up
all_jobs.append(job)
all_jobs = sorted_jobs(all_jobs)
for job in all_jobs:
job_status = string_to_status(job['status'])
job_color = '[{}]'.format(status_colors[job_status])
job_text = f"{job_color}" + job_status.value.title()
if job_status == RenderStatus.ERROR and job['errors']:
job_text = job_text + "\n" + "\n".join(job['errors'])
# Project name
project_name = job_color + (job['name'] or os.path.basename(job['input_path']))
elapsed_time = get_time_elapsed(datetime.datetime.fromisoformat(job['start_time']),
datetime.datetime.fromisoformat(job['end_time']))
if job_status == RenderStatus.RUNNING:
job_text = f"{job_color}[bold]Running - {float(job['percent_complete']) * 100:.1f}%"
elapsed_time = "[bold]" + elapsed_time
project_name = "[bold]" + project_name
elif job_status == RenderStatus.CANCELLED or job_status == RenderStatus.ERROR:
project_name = "[strike]" + project_name
# Priority
priority_color = ["red", "yellow", "cyan"][(job['priority'] - 1)]
client_name = job['client'] or 'unknown'
client_colors = {'unknown': '[red]', local_hostname: '[yellow]'}
client_title = client_colors.get(client_name, '[magenta]') + client_name
table.add_row(
job['id'],
project_name,
renderer_colors.get(job['renderer'], '[cyan]') + job['renderer'] + '[default]-' + job['renderer_version'],
f"[{priority_color}]{job['priority']}",
job_text,
elapsed_time,
str(max(int(job['total_frames']), 1)),
client_title
)
return table
def create_status_panel(all_server_data):
for key, value in all_server_data['servers'].items():
if key == local_hostname:
return str(value['status'])
return "no status"
class KeyboardThread(threading.Thread):
def __init__(self, input_cbk = None, name='keyboard-input-thread'):
self.input_cbk = input_cbk
super(KeyboardThread, self).__init__(name=name)
self.start()
def run(self):
while True:
self.input_cbk(input()) #waits to get input + Return
def my_callback(inp):
#evaluate the keyboard input
print('You Entered:', inp)
if __name__ == '__main__':
get_server_ip = input("Enter server IP or None for local: ") or local_hostname
server_proxy = RenderServerProxy(get_server_ip, "8080")
if not server_proxy.connect():
if server_proxy.hostname == local_hostname:
start_server_input = input("Local server not running. Start server? (y/n) ")
if start_server_input and start_server_input[0].lower() == "y":
# Startup the local server
start_server()
test = server_proxy.connect()
print(f"connected? {test}")
else:
print(f"\nUnable to connect to server: {server_proxy.hostname}")
print("\nVerify IP address is correct and server is running")
exit(1)
# start the Keyboard thread
# kthread = KeyboardThread(my_callback)
# Console Layout
console = Console()
layout = Layout()
# Divide the "screen" in to three parts
layout.split(
Layout(name="header", size=3),
Layout(ratio=1, name="main")
# Layout(size=10, name="footer"),
)
# Divide the "main" layout in to "side" and "body"
layout["main"].split_row(
Layout(name="side"),
Layout(name="body",
ratio=3))
# Divide the "side" layout in to two
layout["side"].split(Layout(name="side_top"), Layout(name="side_bottom"))
# Server connection header
header_text = Text(f"Connected to server: ")
header_text.append(f"{server_proxy.hostname} ", style="green")
if server_proxy.hostname == local_hostname:
header_text.append("(This Computer)", style="yellow")
else:
header_text.append("(Remote)", style="magenta")
# background process to update server data independent of the UI
def fetch_server_data(server):
while True:
fetched_data = server.get_data(timeout=5)
if fetched_data:
server.fetched_status_data = fetched_data
time.sleep(1)
x = threading.Thread(target=fetch_server_data, args=(server_proxy,))
x.daemon = True
x.start()
# draw and update the UI
with Live(console=console, screen=False, refresh_per_second=1, transient=True) as live:
while True:
try:
if server_proxy.fetched_status_data:
server_online = False
if server_proxy.fetched_status_data.get('timestamp', None):
timestamp = datetime.datetime.fromisoformat(server_proxy.fetched_status_data['timestamp'])
time_diff = datetime.datetime.now() - timestamp
server_online = time_diff.seconds < 10 # client is offline if not updated in certain time
layout["body"].update(create_jobs_table(server_proxy.fetched_status_data))
layout["side_top"].update(Panel(create_node_tree(server_proxy.fetched_status_data)))
layout["side_bottom"].update(Panel(create_status_panel(server_proxy.fetched_status_data)))
online_text = "Online" if server_online else "Offline"
online_color = "green" if server_online else "red"
layout["header"].update(Panel(Text(f"Zordon Render Client - Version 0.0.1 alpha - {online_text}",
justify="center", style=online_color)))
live.update(layout, refresh=False)
except Exception as e:
print(f"Exception updating table: {e}")
traceback.print_exception(e)
time.sleep(1)
# # # todo: Add input prompt to manage running jobs (ie add, cancel, get info, etc)

View File

@@ -1,5 +1,5 @@
#!/usr/bin/env python3
from init import run
from src.api.api_server import start_server
if __name__ == '__main__':
run(server_only=True)
start_server()

View File

@@ -11,7 +11,13 @@ from setuptools import setup
APP = ['main.py']
DATA_FILES = [('config', glob.glob('config/*.*')),
('resources', glob.glob('resources/*.*'))]
OPTIONS = {}
OPTIONS = {
'excludes': ['PySide6'],
'includes': ['zeroconf', 'zeroconf._services.info'],
'plist': {
'LSMinimumSystemVersion': '10.15', # Specify minimum macOS version
},
}
setup(
app=APP,

View File

@@ -49,11 +49,12 @@ def handle_uploaded_project_files(request, jobs_list, upload_directory):
raise ValueError(f"Error downloading file from URL: {project_url}")
elif local_path and os.path.exists(local_path):
referred_name = os.path.basename(local_path)
else:
raise ValueError("Cannot find any valid project paths")
# Prepare the local filepath
cleaned_path_name = jobs_list[0].get('name', os.path.splitext(referred_name)[0]).replace(' ', '-')
cleaned_path_name = os.path.splitext(referred_name)[0].replace(' ', '_')
job_dir = os.path.join(upload_directory, '-'.join(
[datetime.now().strftime("%Y.%m.%d_%H.%M.%S"), renderer, cleaned_path_name]))
os.makedirs(job_dir, exist_ok=True)

View File

@@ -2,12 +2,14 @@
import concurrent.futures
import json
import logging
import multiprocessing
import os
import pathlib
import shutil
import socket
import ssl
import tempfile
import threading
import time
from datetime import datetime
from zipfile import ZipFile
@@ -15,19 +17,19 @@ from zipfile import ZipFile
import psutil
import yaml
from flask import Flask, request, send_file, after_this_request, Response, redirect, url_for, abort
from sqlalchemy.orm.exc import DetachedInstanceError
from src.api.add_job_helpers import handle_uploaded_project_files, process_zipped_project
from src.api.preview_manager import PreviewManager
from src.api.serverproxy_manager import ServerProxyManager
from src.distributed_job_manager import DistributedJobManager
from src.engines.core.base_worker import string_to_status, RenderStatus
from src.engines.engine_manager import EngineManager
from src.render_queue import RenderQueue, JobNotFoundError
from src.utilities.benchmark import cpu_benchmark, disk_io_benchmark
from src.utilities.config import Config
from src.utilities.misc_helper import system_safe_path, current_system_os, current_system_cpu, \
current_system_os_version, num_to_alphanumeric
from src.utilities.server_helper import generate_thumbnail_for_job
from src.utilities.zeroconf_server import ZeroconfServer
from src.utilities.benchmark import cpu_benchmark, disk_io_benchmark
logger = logging.getLogger()
server = Flask(__name__)
@@ -37,29 +39,6 @@ categories = [RenderStatus.RUNNING, RenderStatus.ERROR, RenderStatus.NOT_STARTED
RenderStatus.COMPLETED, RenderStatus.CANCELLED]
# -- Error Handlers --
@server.errorhandler(JobNotFoundError)
def handle_job_not_found(job_error):
return str(job_error), 400
@server.errorhandler(DetachedInstanceError)
def handle_detached_instance(error):
# logger.debug(f"detached instance: {error}")
return "Unavailable", 503
@server.errorhandler(Exception)
def handle_general_error(general_error):
err_msg = f"Server error: {general_error}"
logger.error(err_msg)
return err_msg, 500
# -- Jobs --
def sorted_jobs(all_jobs, sort_by_date=True):
if not sort_by_date:
sorted_job_list = []
@@ -81,11 +60,9 @@ def jobs_json():
job_cache_int = int(json.dumps(all_jobs).__hash__())
job_cache_token = num_to_alphanumeric(job_cache_int)
return {'jobs': all_jobs, 'token': job_cache_token}
except DetachedInstanceError as e:
raise e
except Exception as e:
logger.error(f"Error fetching jobs_json: {e}")
raise e
logger.exception(f"Exception fetching jobs_json: {e}")
return {}, 500
@server.get('/api/jobs_long_poll')
@@ -101,40 +78,50 @@ def long_polling_jobs():
if time.time() - start_time > 30:
return {}, 204
time.sleep(1)
except DetachedInstanceError as e:
raise e
except Exception as e:
logger.error(f"Error fetching long_polling_jobs: {e}")
raise e
logger.exception(f"Exception fetching long_polling_jobs: {e}")
return {}, 500
@server.route('/api/job/<job_id>/thumbnail')
def job_thumbnail(job_id):
big_thumb = request.args.get('size', False) == "big"
video_ok = request.args.get('video_ok', False)
found_job = RenderQueue.job_with_id(job_id, none_ok=True)
if found_job:
try:
big_thumb = request.args.get('size', False) == "big"
video_ok = request.args.get('video_ok', False)
found_job = RenderQueue.job_with_id(job_id, none_ok=False)
os.makedirs(server.config['THUMBS_FOLDER'], exist_ok=True)
thumb_video_path = os.path.join(server.config['THUMBS_FOLDER'], found_job.id + '.mp4')
thumb_image_path = os.path.join(server.config['THUMBS_FOLDER'], found_job.id + '.jpg')
big_video_path = os.path.join(server.config['THUMBS_FOLDER'], found_job.id + '_big.mp4')
big_image_path = os.path.join(server.config['THUMBS_FOLDER'], found_job.id + '_big.jpg')
# trigger a thumbnail update - just in case
PreviewManager.update_previews_for_job(found_job, wait_until_completion=True, timeout=60)
previews = PreviewManager.get_previews_for_job(found_job)
all_previews_list = previews.get('output', previews.get('input', []))
# generate regular thumb if it doesn't exist
if not os.path.exists(thumb_video_path) and not os.path.exists(thumb_video_path + '_IN-PROGRESS') and \
found_job.status not in [RenderStatus.CANCELLED, RenderStatus.ERROR]:
generate_thumbnail_for_job(found_job, thumb_video_path, thumb_image_path, max_width=240)
video_previews = [x for x in all_previews_list if x['kind'] == 'video']
image_previews = [x for x in all_previews_list if x['kind'] == 'image']
filtered_list = video_previews if video_previews and video_ok else image_previews
# generate big thumb if it doesn't exist
if not os.path.exists(big_video_path) and not os.path.exists(big_image_path + '_IN-PROGRESS') and \
found_job.status not in [RenderStatus.CANCELLED, RenderStatus.ERROR]:
generate_thumbnail_for_job(found_job, big_video_path, big_image_path, max_width=800)
# todo - sort by size or other metrics here
if filtered_list:
preview_to_send = filtered_list[0]
mime_types = {'image': 'image/jpeg', 'video': 'video/mp4'}
file_mime_type = mime_types.get(preview_to_send['kind'], 'unknown')
return send_file(preview_to_send['filename'], mimetype=file_mime_type)
except Exception as e:
logger.error(f'Error getting thumbnail: {e}')
return f'Error getting thumbnail: {e}', 500
return "No thumbnail available", 404
# generated videos
if video_ok:
if big_thumb and os.path.exists(big_video_path) and not os.path.exists(
big_video_path + '_IN-PROGRESS'):
return send_file(big_video_path, mimetype="video/mp4")
elif os.path.exists(thumb_video_path) and not os.path.exists(thumb_video_path + '_IN-PROGRESS'):
return send_file(thumb_video_path, mimetype="video/mp4")
# Generated thumbs
if big_thumb and os.path.exists(big_image_path):
return send_file(big_image_path, mimetype='image/jpeg')
elif os.path.exists(thumb_image_path):
return send_file(thumb_image_path, mimetype='image/jpeg')
return found_job.status.value, 200
return found_job.status.value, 404
# Get job file routing
@@ -159,17 +146,22 @@ def filtered_jobs_json(status_val):
return f'Cannot find jobs with status {status_val}', 400
@server.post('/api/job/<job_id>/send_subjob_update_notification')
def subjob_update_notification(job_id):
@server.post('/api/job/<job_id>/notify_parent_of_status_change')
def subjob_status_change(job_id):
try:
subjob_details = request.json
logger.info(f"Subjob to job id: {job_id} is now {subjob_details['status']}")
DistributedJobManager.handle_subjob_update_notification(RenderQueue.job_with_id(job_id), subjob_data=subjob_details)
DistributedJobManager.handle_subjob_status_change(RenderQueue.job_with_id(job_id), subjob_data=subjob_details)
return Response(status=200)
except JobNotFoundError:
return "Job not found", 404
@server.errorhandler(JobNotFoundError)
def handle_job_not_found(job_error):
return f'Cannot find job with ID {job_error.job_id}', 400
@server.get('/api/job/<job_id>')
def get_job_status(job_id):
return RenderQueue.job_with_id(job_id).json()
@@ -188,22 +180,7 @@ def get_job_logs(job_id):
@server.get('/api/job/<job_id>/file_list')
def get_file_list(job_id):
return [os.path.basename(x) for x in RenderQueue.job_with_id(job_id).file_list()]
@server.route('/api/job/<job_id>/download')
def download_file(job_id):
requested_filename = request.args.get('filename')
if not requested_filename:
return 'Filename required', 400
found_job = RenderQueue.job_with_id(job_id)
for job_filename in found_job.file_list():
if os.path.basename(job_filename).lower() == requested_filename.lower():
return send_file(job_filename, as_attachment=True, )
return f"File '{requested_filename}' not found", 404
return RenderQueue.job_with_id(job_id).file_list()
@server.route('/api/job/<job_id>/download_all')
@@ -328,10 +305,14 @@ def delete_job(job_id):
if server.config['UPLOAD_FOLDER'] in output_dir and os.path.exists(output_dir):
shutil.rmtree(output_dir)
try:
PreviewManager.delete_previews_for_job(found_job)
except Exception as e:
logger.error(f"Error deleting previews for {found_job}: {e}")
# Remove any thumbnails
for filename in os.listdir(server.config['THUMBS_FOLDER']):
if job_id in filename:
os.remove(os.path.join(server.config['THUMBS_FOLDER'], filename))
thumb_path = os.path.join(server.config['THUMBS_FOLDER'], found_job.id + '.mp4')
if os.path.exists(thumb_path):
os.remove(thumb_path)
# See if we own the project_dir (i.e. was it uploaded)
project_dir = os.path.dirname(os.path.dirname(found_job.input_path))
@@ -482,6 +463,7 @@ def delete_engine_download():
@server.get('/api/renderer/<renderer>/args')
def get_renderer_args(renderer):
try:
# todo: possibly deprecate
renderer_engine_class = EngineManager.engine_with_name(renderer)
return renderer_engine_class().get_arguments()
except LookupError:
@@ -508,24 +490,60 @@ def get_disk_benchmark():
return {'write_speed': results[0], 'read_speed': results[-1]}
def start_server(hostname=None):
def start_server():
def eval_loop(delay_sec=1):
while True:
RenderQueue.evaluate_queue()
time.sleep(delay_sec)
# get hostname
if not hostname:
local_hostname = socket.gethostname()
hostname = local_hostname + (".local" if not local_hostname.endswith(".local") else "")
local_hostname = socket.gethostname()
local_hostname = local_hostname + (".local" if not local_hostname.endswith(".local") else "")
# load flask settings
server.config['HOSTNAME'] = hostname
server.config['HOSTNAME'] = local_hostname
server.config['PORT'] = int(Config.port_number)
server.config['UPLOAD_FOLDER'] = system_safe_path(os.path.expanduser(Config.upload_folder))
server.config['THUMBS_FOLDER'] = system_safe_path(os.path.join(os.path.expanduser(Config.upload_folder), 'thumbs'))
server.config['MAX_CONTENT_PATH'] = Config.max_content_path
server.config['enable_split_jobs'] = Config.enable_split_jobs
# Setup directory for saving engines to
EngineManager.engines_path = system_safe_path(os.path.join(os.path.join(os.path.expanduser(Config.upload_folder),
'engines')))
os.makedirs(EngineManager.engines_path, exist_ok=True)
# Debug info
logger.debug(f"Upload directory: {server.config['UPLOAD_FOLDER']}")
logger.debug(f"Thumbs directory: {server.config['THUMBS_FOLDER']}")
logger.debug(f"Engines directory: {EngineManager.engines_path}")
# disable most Flask logging
flask_log = logging.getLogger('werkzeug')
flask_log.setLevel(Config.flask_log_level.upper())
logger.debug('Starting API server')
server.run(host='0.0.0.0', port=server.config['PORT'], debug=Config.flask_debug_enable, use_reloader=False,
threaded=True)
# check for updates for render engines if configured or on first launch
if Config.update_engines_on_launch or not EngineManager.get_engines():
EngineManager.update_all_engines()
# Set up the RenderQueue object
RenderQueue.load_state(database_directory=server.config['UPLOAD_FOLDER'])
ServerProxyManager.subscribe_to_listener()
DistributedJobManager.subscribe_to_listener()
thread = threading.Thread(target=eval_loop, kwargs={'delay_sec': Config.queue_eval_seconds}, daemon=True)
thread.start()
logger.info(f"Starting Zordon Render Server - Hostname: '{server.config['HOSTNAME']}:'")
ZeroconfServer.configure("_zordon._tcp.local.", server.config['HOSTNAME'], server.config['PORT'])
ZeroconfServer.properties = {'system_cpu': current_system_cpu(), 'system_cpu_cores': multiprocessing.cpu_count(),
'system_os': current_system_os(),
'system_os_version': current_system_os_version()}
ZeroconfServer.start()
try:
server.run(host='0.0.0.0', port=server.config['PORT'], debug=Config.flask_debug_enable,
use_reloader=False, threaded=True)
finally:
RenderQueue.save_state()
ZeroconfServer.stop()

View File

@@ -1,113 +0,0 @@
import logging
import os
import subprocess
import threading
from pathlib import Path
from src.utilities.ffmpeg_helper import generate_thumbnail, save_first_frame
logger = logging.getLogger()
supported_video_formats = ['.mp4', '.mov', '.avi', '.mpg', '.mpeg', '.mxf', '.m4v', 'mkv']
supported_image_formats = ['.jpg', '.png', '.exr', '.tif']
class PreviewManager:
storage_path = None
_running_jobs = {}
@classmethod
def __generate_job_preview_worker(cls, job, replace_existing=False, max_width=320):
# Determine best source file to use for thumbs
job_file_list = job.file_list()
source_files = job_file_list if job_file_list else [job.input_path]
preview_label = "output" if job_file_list else "input"
# filter by type
found_image_files = [f for f in source_files if os.path.splitext(f)[-1].lower() in supported_image_formats]
found_video_files = [f for f in source_files if os.path.splitext(f)[-1].lower() in supported_video_formats]
# check if we even have any valid files to work from
if source_files and not found_video_files and not found_image_files:
logger.warning(f"No valid image or video files found in files from job: {job}")
return
os.makedirs(cls.storage_path, exist_ok=True)
base_path = os.path.join(cls.storage_path, f"{job.id}-{preview_label}-{max_width}")
preview_video_path = base_path + '.mp4'
preview_image_path = base_path + '.jpg'
if replace_existing:
for x in [preview_image_path, preview_video_path]:
try:
os.remove(x)
except OSError:
pass
# Generate image previews
if (found_video_files or found_image_files) and not os.path.exists(preview_image_path):
try:
path_of_source = found_image_files[-1] if found_image_files else found_video_files[-1]
logger.debug(f"Generating image preview for {path_of_source}")
save_first_frame(source_path=path_of_source, dest_path=preview_image_path, max_width=max_width)
logger.debug(f"Successfully created image preview for {path_of_source}")
except Exception as e:
logger.error(f"Error generating image preview for {job}: {e}")
# Generate video previews
if found_video_files and not os.path.exists(preview_video_path):
try:
path_of_source = found_video_files[0]
logger.debug(f"Generating video preview for {path_of_source}")
generate_thumbnail(source_path=path_of_source, dest_path=preview_video_path, max_width=max_width)
logger.debug(f"Successfully created video preview for {path_of_source}")
except subprocess.CalledProcessError as e:
logger.error(f"Error generating video preview for {job}: {e}")
@classmethod
def update_previews_for_job(cls, job, replace_existing=False, wait_until_completion=False, timeout=None):
job_thread = cls._running_jobs.get(job.id)
if job_thread and job_thread.is_alive():
logger.debug(f'Preview generation job already running for {job}')
else:
job_thread = threading.Thread(target=cls.__generate_job_preview_worker, args=(job, replace_existing,))
job_thread.start()
cls._running_jobs[job.id] = job_thread
if wait_until_completion:
job_thread.join(timeout=timeout)
@classmethod
def get_previews_for_job(cls, job):
results = {}
try:
directory_path = Path(cls.storage_path)
preview_files_for_job = [f for f in directory_path.iterdir() if f.is_file() and f.name.startswith(job.id)]
for preview_filename in preview_files_for_job:
try:
pixel_width = str(preview_filename).split('-')[-1]
preview_label = str(os.path.basename(preview_filename)).split('-')[1]
extension = os.path.splitext(preview_filename)[-1].lower()
kind = 'video' if extension in supported_video_formats else \
'image' if extension in supported_image_formats else 'unknown'
results[preview_label] = results.get(preview_label, [])
results[preview_label].append({'filename': str(preview_filename), 'width': pixel_width, 'kind': kind})
except IndexError: # ignore invalid filenames
pass
except FileNotFoundError:
pass
return results
@classmethod
def delete_previews_for_job(cls, job):
all_previews = cls.get_previews_for_job(job)
flattened_list = [item for sublist in all_previews.values() for item in sublist]
for preview in flattened_list:
try:
logger.debug(f"Removing preview: {preview['filename']}")
os.remove(preview['filename'])
except OSError as e:
logger.error(f"Error removing preview '{preview.get('filename')}': {e}")

View File

@@ -171,19 +171,19 @@ class RenderServerProxy:
def get_all_engines(self):
return self.request_data('all_engines')
def send_subjob_update_notification(self, parent_id, subjob):
def notify_parent_of_status_change(self, parent_id, subjob):
"""
Notifies the parent job of an update in a subjob.
Notifies the parent job of a status change in a subjob.
Args:
parent_id (str): The ID of the parent job.
subjob (Job): The subjob that has updated.
subjob (Job): The subjob that has changed status.
Returns:
Response: The response from the server.
"""
hostname = LOOPBACK if self.is_localhost else self.hostname
return requests.post(f'http://{hostname}:{self.port}/api/job/{parent_id}/send_subjob_update_notification',
return requests.post(f'http://{hostname}:{self.port}/api/job/{parent_id}/notify_parent_of_status_change',
json=subjob.json())
def post_job_to_server(self, file_path, job_list, callback=None):
@@ -232,27 +232,19 @@ class RenderServerProxy:
except Exception as e:
logger.error(f"An error occurred: {e}")
def get_job_files_list(self, job_id):
return self.request_data(f"job/{job_id}/file_list")
def download_all_job_files(self, job_id, save_path):
def get_job_files(self, job_id, save_path):
hostname = LOOPBACK if self.is_localhost else self.hostname
url = f"http://{hostname}:{self.port}/api/job/{job_id}/download_all"
return self.__download_file_from_url(url, output_filepath=save_path)
def download_job_file(self, job_id, job_filename, save_path):
hostname = LOOPBACK if self.is_localhost else self.hostname
url = f"http://{hostname}:{self.port}/api/job/{job_id}/download?filename={job_filename}"
return self.__download_file_from_url(url, output_filepath=save_path)
return self.download_file(url, filename=save_path)
@staticmethod
def __download_file_from_url(url, output_filepath):
def download_file(url, filename):
with requests.get(url, stream=True) as r:
r.raise_for_status()
with open(output_filepath, 'wb') as f:
with open(filename, 'wb') as f:
for chunk in r.iter_content(chunk_size=8192):
f.write(chunk)
return output_filepath
return filename
# --- Renderer --- #

View File

@@ -10,11 +10,9 @@ import requests
from plyer import notification
from pubsub import pub
from src.api.preview_manager import PreviewManager
from src.api.server_proxy import RenderServerProxy
from src.engines.engine_manager import EngineManager
from src.render_queue import RenderQueue
from src.utilities.config import Config
from src.utilities.misc_helper import get_file_size_human
from src.utilities.status_utils import RenderStatus, string_to_status
from src.utilities.zeroconf_server import ZeroconfServer
@@ -34,43 +32,6 @@ class DistributedJobManager:
This should be called once, typically during the initialization phase.
"""
pub.subscribe(cls.__local_job_status_changed, 'status_change')
pub.subscribe(cls.__local_job_frame_complete, 'frame_complete')
@classmethod
def __local_job_frame_complete(cls, job_id, frame_number, update_interval=5):
"""
Responds to the 'frame_complete' pubsub message for local jobs.
Parameters:
job_id (str): The ID of the job that has changed status.
old_status (str): The previous status of the job.
new_status (str): The new (current) status of the job.
Note: Do not call directly. Instead, call via the 'frame_complete' pubsub message.
"""
render_job = RenderQueue.job_with_id(job_id, none_ok=True)
if not render_job: # ignore jobs not in the queue
return
logger.debug(f"Job {job_id} has completed frame #{frame_number}")
replace_existing_previews = (frame_number % update_interval) == 0
cls.__job_update_shared(render_job, replace_existing_previews)
@classmethod
def __job_update_shared(cls, render_job, replace_existing_previews=False):
# update previews
PreviewManager.update_previews_for_job(job=render_job, replace_existing=replace_existing_previews)
# notify parent to allow individual frames to be copied instead of waiting until the end
if render_job.parent:
parent_id, parent_hostname = render_job.parent.split('@')[0], render_job.parent.split('@')[-1]
try:
logger.debug(f'Job {render_job.id} updating parent {parent_id}@{parent_hostname}')
RenderServerProxy(parent_hostname).send_subjob_update_notification(parent_id, render_job)
except Exception as e:
logger.error(f"Error notifying parent {parent_hostname} about update in subjob {render_job.id}: {e}")
@classmethod
def __local_job_status_changed(cls, job_id, old_status, new_status):
@@ -91,15 +52,15 @@ class DistributedJobManager:
return
logger.debug(f"Job {job_id} status change: {old_status} -> {new_status}")
if render_job.parent: # If local job is a subjob from a remote server
parent_id, hostname = render_job.parent.split('@')[0], render_job.parent.split('@')[-1]
RenderServerProxy(hostname).notify_parent_of_status_change(parent_id=parent_id, subjob=render_job)
cls.__job_update_shared(render_job, replace_existing_previews=(render_job.status == RenderStatus.COMPLETED))
# Handle children
if render_job.children:
if new_status in [RenderStatus.CANCELLED, RenderStatus.ERROR]: # Cancel children if necessary
for child in render_job.children:
child_id, child_hostname = child.split('@')
RenderServerProxy(child_hostname).cancel_job(child_id, confirm=True)
# handle cancelling all the children
elif render_job.children and new_status in [RenderStatus.CANCELLED, RenderStatus.ERROR]:
for child in render_job.children:
child_id, hostname = child.split('@')
RenderServerProxy(hostname).cancel_job(child_id, confirm=True)
# UI Notifications
try:
@@ -136,7 +97,8 @@ class DistributedJobManager:
"""
Creates render jobs.
This method job data and a local path to a loaded project. It creates and returns new a render job.
This method takes a list of job data, a local path to a loaded project, and a job directory. It creates a render
job for each job data in the list and appends the result to a list. The list of results is then returned.
Args:
job_data (dict): Job data.
@@ -172,7 +134,6 @@ class DistributedJobManager:
worker.priority = int(job_data.get('priority', worker.priority))
worker.start_frame = int(job_data.get("start_frame", worker.start_frame))
worker.end_frame = int(job_data.get("end_frame", worker.end_frame))
worker.watchdog_timeout = Config.worker_process_timeout
worker.hostname = socket.gethostname()
# determine if we can / should split the job
@@ -182,7 +143,6 @@ class DistributedJobManager:
logger.debug("Not splitting into subjobs")
RenderQueue.add_to_render_queue(worker, force_start=job_data.get('force_start', False))
PreviewManager.update_previews_for_job(worker)
return worker
@@ -191,9 +151,9 @@ class DistributedJobManager:
# --------------------------------------------
@classmethod
def handle_subjob_update_notification(cls, local_job, subjob_data):
def handle_subjob_status_change(cls, local_job, subjob_data):
"""
Responds to a notification from a remote subjob and the host requests any subsequent updates from the subjob.
Responds to a status change from a remote subjob and triggers the creation or modification of subjobs as needed.
Args:
local_job (BaseRenderWorker): The local parent job worker.
@@ -202,40 +162,27 @@ class DistributedJobManager:
subjob_status = string_to_status(subjob_data['status'])
subjob_id = subjob_data['id']
subjob_hostname = subjob_data['hostname']
subjob_key = f'{subjob_id}@{subjob_hostname}'
old_status = local_job.children.get(subjob_key, {}).get('status')
local_job.children[subjob_key] = subjob_data
subjob_hostname = next((hostname.split('@')[1] for hostname in local_job.children if
hostname.split('@')[0] == subjob_id), None)
local_job.children[f'{subjob_id}@{subjob_hostname}'] = subjob_data
logname = f"<Parent: {local_job.id} | Child: {subjob_key}>"
if old_status != subjob_status.value:
logger.debug(f"Subjob status changed: {logname} -> {subjob_status.value}")
logname = f"{local_job.id}:{subjob_id}@{subjob_hostname}"
logger.debug(f"Subjob status changed: {logname} -> {subjob_status.value}")
cls.download_missing_frames_from_subjob(local_job, subjob_id, subjob_hostname)
# Download complete or partial render jobs
if subjob_status in [RenderStatus.COMPLETED, RenderStatus.CANCELLED, RenderStatus.ERROR] and \
subjob_data['file_count']:
download_result = cls.download_from_subjob(local_job, subjob_id, subjob_hostname)
if not download_result:
# todo: handle error
logger.error(f"Unable to download subjob files from {logname} with status {subjob_status.value}")
if subjob_status == RenderStatus.CANCELLED or subjob_status == RenderStatus.ERROR:
# todo: determine missing frames and schedule new job
pass
@staticmethod
def download_missing_frames_from_subjob(local_job, subjob_id, subjob_hostname):
try:
local_files = [os.path.basename(x) for x in local_job.file_list()]
subjob_proxy = RenderServerProxy(subjob_hostname)
subjob_files = subjob_proxy.get_job_files_list(job_id=subjob_id) or []
for subjob_filename in subjob_files:
if subjob_filename not in local_files:
try:
logger.debug(f"Downloading new file '{subjob_filename}' from {subjob_hostname}")
local_save_path = os.path.join(os.path.dirname(local_job.output_path), subjob_filename)
subjob_proxy.download_job_file(job_id=subjob_id, job_filename=subjob_filename,
save_path=local_save_path)
logger.debug(f'Downloaded successfully - {local_save_path}')
except Exception as e:
logger.error(f"Error downloading file '{subjob_filename}' from {subjob_hostname}: {e}")
except Exception as e:
logger.exception(f'Uncaught exception while trying to download from subjob: {e}')
@staticmethod
def download_all_from_subjob(local_job, subjob_id, subjob_hostname):
def download_from_subjob(local_job, subjob_id, subjob_hostname):
"""
Downloads and extracts files from a completed subjob on a remote server.
@@ -256,7 +203,7 @@ class DistributedJobManager:
try:
local_job.children[child_key]['download_status'] = 'working'
logger.info(f"Downloading completed subjob files from {subjob_hostname} to localhost")
RenderServerProxy(subjob_hostname).download_all_job_files(subjob_id, zip_file_path)
RenderServerProxy(subjob_hostname).get_job_files(subjob_id, zip_file_path)
logger.info(f"File transfer complete for {logname} - Transferred {get_file_size_human(zip_file_path)}")
except Exception as e:
logger.error(f"Error downloading files from remote server: {e}")
@@ -280,7 +227,6 @@ class DistributedJobManager:
@classmethod
def wait_for_subjobs(cls, local_job):
# todo: rewrite this method
logger.debug(f"Waiting for subjobs for job {local_job}")
local_job.status = RenderStatus.WAITING_FOR_SUBJOBS
statuses_to_download = [RenderStatus.CANCELLED, RenderStatus.ERROR, RenderStatus.COMPLETED]
@@ -320,10 +266,10 @@ class DistributedJobManager:
# Check if job is finished, but has not had files copied yet over yet
if download_status is None and subjob_data['file_count'] and status in statuses_to_download:
try:
cls.download_missing_frames_from_subjob(local_job, subjob_id, subjob_hostname)
except Exception as e:
logger.error(f"Error downloading missing frames from subjob: {e}")
download_result = cls.download_from_subjob(local_job, subjob_id, subjob_hostname)
if not download_result:
logger.error("Failed to download from subjob")
# todo: error handling here
# Any finished jobs not successfully downloaded at this point are skipped
if local_job.children[child_key].get('download_status', None) is None and \

View File

@@ -1,22 +1,74 @@
import glob
import logging
import subprocess
from src.engines.core.base_engine import BaseRenderEngine
logger = logging.getLogger()
class AERender(BaseRenderEngine):
supported_extensions = ['.aep']
file_extensions = ['aepx']
def version(self):
version = None
try:
render_path = self.renderer_path()
if render_path:
ver_out = subprocess.check_output([render_path, '-version'], timeout=SUBPROCESS_TIMEOUT)
version = ver_out.decode('utf-8').split(" ")[-1].strip()
ver_out = subprocess.run([render_path, '-version'], capture_output=True, text=True)
version = ver_out.stdout.split(" ")[-1].strip()
except Exception as e:
logger.error(f'Failed to get {self.name()} version: {e}')
return version
@classmethod
def default_renderer_path(cls):
paths = glob.glob('/Applications/*After Effects*/aerender')
if len(paths) > 1:
logger.warning('Multiple After Effects installations detected')
elif not paths:
logger.error('After Effects installation not found')
return paths[0]
def get_project_info(self, project_path, timeout=10):
scene_info = {}
try:
import xml.etree.ElementTree as ET
tree = ET.parse(project_path)
root = tree.getroot()
namespace = {'ae': 'http://www.adobe.com/products/aftereffects'}
comp_names = []
for item in root.findall(".//ae:Item", namespace):
if item.find("ae:Layr", namespace) is not None:
for string in item.findall("./ae:string", namespace):
comp_names.append(string.text)
scene_info['comp_names'] = comp_names
except Exception as e:
logger.error(f'Error getting file details for .aepx file: {e}')
return scene_info
def run_javascript(self, script_path, project_path, timeout=None):
# todo: implement
pass
@classmethod
def get_output_formats(cls):
# todo: create implementation
return []
return []
def ui_options(self, project_info):
from src.engines.aerender.aerender_ui import AERenderUI
return AERenderUI.get_options(self, project_info)
@classmethod
def worker_class(cls):
from src.engines.aerender.aerender_worker import AERenderWorker
return AERenderWorker
if __name__ == "__main__":
x = AERender().get_project_info('/Users/brett/ae_testing/project.aepx')
print(x)

View File

@@ -0,0 +1,8 @@
class AERenderUI:
@staticmethod
def get_options(instance, project_info):
options = [
{'name': 'comp', 'options': project_info.get('comp_names', [])}
]
return options

View File

@@ -9,72 +9,39 @@ import time
from src.engines.core.base_worker import BaseRenderWorker, timecode_to_frames
from src.engines.aerender.aerender_engine import AERender
def aerender_path():
paths = glob.glob('/Applications/*After Effects*/aerender')
if len(paths) > 1:
logging.warning('Multiple After Effects installations detected')
elif not paths:
logging.error('After Effects installation not found')
else:
return paths[0]
logger = logging.getLogger()
class AERenderWorker(BaseRenderWorker):
supported_extensions = ['.aep']
engine = AERender
def __init__(self, input_path, output_path, args=None, parent=None, name=None):
super(AERenderWorker, self).__init__(input_path=input_path, output_path=output_path, args=args,
parent=parent, name=name)
def __init__(self, input_path, output_path, engine_path, args=None, parent=None, name=None):
super(AERenderWorker, self).__init__(input_path=input_path, output_path=output_path, engine_path=engine_path,
args=args, parent=parent, name=name)
self.comp = args.get('comp', None)
self.render_settings = args.get('render_settings', None)
self.omsettings = args.get('omsettings', None)
self.progress = 0
self.progress_history = []
self.attributes = {}
# temp files for processing stdout
self.__progress_history = []
self.__temp_attributes = {}
def generate_worker_subprocess(self):
if os.path.exists('nexrender-cli-macos'):
logging.info('nexrender found')
# {
# "template": {
# "src": String,
# "composition": String,
#
# "frameStart": Number,
# "frameEnd": Number,
# "frameIncrement": Number,
#
# "continueOnMissing": Boolean,
# "settingsTemplate": String,
# "outputModule": String,
# "outputExt": String,
# },
# "assets": [],
# "actions": {
# "prerender": [],
# "postrender": [],
# },
# "onChange": Function,
# "onRenderProgress": Function
# }
job = {'template':
{
'src': 'file://' + self.input_path, 'composition': self.comp.replace('"', ''),
'settingsTemplate': self.render_settings.replace('"', ''),
'outputModule': self.omsettings.replace('"', ''), 'outputExt': 'mov'}
}
x = ['./nexrender-cli-macos', "'{}'".format(json.dumps(job))]
else:
logging.info('nexrender not found')
x = [aerender_path(), '-project', self.input_path, '-comp', self.comp, '-RStemplate', self.render_settings,
'-OMtemplate', self.omsettings, '-output', self.output_path]
return x
comp = self.args.get('comp', 'Comp 1')
render_settings = self.args.get('render_settings', None)
omsettings = self.args.get('omsettings', None)
command = [self.renderer_path, '-project', self.input_path, '-comp', f'"{comp}"']
if render_settings:
command.extend(['-RStemplate', render_settings])
if omsettings:
command.extend(['-OMtemplate', omsettings])
command.extend(['-s', self.start_frame,
'-e', self.end_frame,
'-output', self.output_path])
return command
def _parse_stdout(self, line):
@@ -83,12 +50,12 @@ class AERenderWorker(BaseRenderWorker):
# print 'progress'
trimmed = line.replace('PROGRESS:', '').strip()
if len(trimmed):
self.progress_history.append(line)
self.__progress_history.append(line)
if 'Seconds' in trimmed:
self._update_progress(line)
elif ': ' in trimmed:
tmp = trimmed.split(': ')
self.attributes[tmp[0].strip()] = tmp[1].strip()
self.__temp_attributes[tmp[0].strip()] = tmp[1].strip()
elif line.startswith('WARNING:'):
trimmed = line.replace('WARNING:', '').strip()
self.warnings.append(trimmed)
@@ -99,28 +66,28 @@ class AERenderWorker(BaseRenderWorker):
def _update_progress(self, line):
if not self.total_frames:
duration_string = self.attributes.get('Duration', None)
frame_rate = self.attributes.get('Frame Rate', '0').split(' ')[0]
duration_string = self.__temp_attributes.get('Duration', None)
frame_rate = self.__temp_attributes.get('Frame Rate', '0').split(' ')[0]
self.total_frames = timecode_to_frames(duration_string.split('Duration:')[-1], float(frame_rate))
match = re.match(r'PROGRESS:.*\((?P<frame>\d+)\): (?P<time>\d+)', line).groupdict()
self.last_frame = match['frame']
self.current_frame = match['frame']
def average_frame_duration(self):
total_durations = 0
for line in self.progress_history:
for line in self.__progress_history:
match = re.match(r'PROGRESS:.*\((?P<frame>\d+)\): (?P<time>\d+)', line)
if match:
total_durations += int(match.group(2))
average = float(total_durations) / self.last_frame
average = float(total_durations) / self.current_frame
return average
def percent_complete(self):
if self.total_frames:
return (float(self.last_frame) / float(self.total_frames)) * 100
return (float(self.current_frame) / float(self.total_frames)) * 100
else:
return 0
@@ -128,8 +95,11 @@ class AERenderWorker(BaseRenderWorker):
if __name__ == '__main__':
logging.basicConfig(format='%(asctime)s - %(message)s', datefmt='%d-%b-%y %H:%M:%S', level=logging.DEBUG)
r = AERenderWorker('/Users/brett/Desktop/Youtube_Vids/Film_Formats/Frame_Animations.aep', '"Film Pan"',
'"Draft Settings"', '"ProRes"', '/Users/brett/Desktop/test_render')
r = AERenderWorker(input_path='/Users/brett/ae_testing/project.aepx',
output_path='/Users/brett/ae_testing/project.mp4',
engine_path=AERenderWorker.engine.default_renderer_path(),
args={'start_frame': 1, 'end_frame': 5})
r.start()
while r.is_running():
time.sleep(0.1)

View File

@@ -1,6 +1,5 @@
import json
import re
from concurrent.futures import ThreadPoolExecutor
from src.engines.core.base_engine import *
from src.utilities.misc_helper import system_safe_path
@@ -12,25 +11,22 @@ class Blender(BaseRenderEngine):
install_paths = ['/Applications/Blender.app/Contents/MacOS/Blender']
binary_names = {'linux': 'blender', 'windows': 'blender.exe', 'macos': 'Blender'}
file_extensions = ['blend']
@staticmethod
def downloader():
from src.engines.blender.blender_downloader import BlenderDownloader
return BlenderDownloader
@staticmethod
def worker_class():
@classmethod
def worker_class(cls):
from src.engines.blender.blender_worker import BlenderRenderWorker
return BlenderRenderWorker
def ui_options(self):
def ui_options(self, project_info):
from src.engines.blender.blender_ui import BlenderUI
return BlenderUI.get_options(self)
@staticmethod
def supported_extensions():
return ['blend']
def version(self):
version = None
try:
@@ -116,7 +112,7 @@ class Blender(BaseRenderEngine):
logger.error(f'Error packing .blend file: {e}')
return None
def get_arguments(self):
def get_arguments(self): # possibly deprecate
help_text = subprocess.check_output([self.renderer_path(), '-h']).decode('utf-8')
lines = help_text.splitlines()
@@ -149,13 +145,7 @@ class Blender(BaseRenderEngine):
return options
def system_info(self):
with ThreadPoolExecutor() as executor:
future_render_devices = executor.submit(self.get_render_devices)
future_engines = executor.submit(self.supported_render_engines)
render_devices = future_render_devices.result()
engines = future_engines.result()
return {'render_devices': render_devices, 'engines': engines}
return {'render_devices': self.get_render_devices()}
def get_render_devices(self):
script_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'scripts', 'get_system_info.py')

View File

@@ -12,17 +12,12 @@ class BlenderRenderWorker(BaseRenderWorker):
engine = Blender
def __init__(self, input_path, output_path, engine_path, args=None, parent=None, name=None):
super(BlenderRenderWorker, self).__init__(input_path=input_path, output_path=output_path, engine_path=engine_path, args=args, parent=parent, name=name)
super(BlenderRenderWorker, self).__init__(input_path=input_path, output_path=output_path,
engine_path=engine_path, args=args, parent=parent, name=name)
# Stats
self.__frame_percent_complete = 0.0
# Scene Info
self.scene_info = Blender(engine_path).get_project_info(input_path)
self.start_frame = int(self.scene_info.get('start_frame', 1))
self.end_frame = int(self.scene_info.get('end_frame', self.start_frame))
self.project_length = (self.end_frame - self.start_frame) + 1
self.current_frame = -1
self.current_frame = -1 # todo: is this necessary?
def generate_worker_subprocess(self):
@@ -40,13 +35,11 @@ class BlenderRenderWorker(BaseRenderWorker):
if custom_camera:
python_exp = python_exp + f"bpy.context.scene.camera = bpy.data.objects['{custom_camera}'];"
# Setup Render Engines
self.args['engine'] = self.args.get('engine', 'CYCLES').upper() # set default render engine
# Configure Cycles
if self.args['engine'] == 'CYCLES':
# Set Render Device (gpu/cpu/any)
# Set Render Device (gpu/cpu/any)
blender_engine = self.args.get('engine', 'BLENDER_EEVEE').upper()
if blender_engine == 'CYCLES':
render_device = self.args.get('render_device', 'any').lower()
if render_device not in ['any', 'gpu', 'cpu']:
if render_device not in {'any', 'gpu', 'cpu'}:
raise AttributeError(f"Invalid Cycles render device: {render_device}")
use_gpu = render_device in {'any', 'gpu'}
@@ -64,10 +57,7 @@ class BlenderRenderWorker(BaseRenderWorker):
# Export format
export_format = self.args.get('export_format', None) or 'JPEG'
main_part, ext = os.path.splitext(self.output_path)
# Remove the extension only if it is not composed entirely of digits
path_without_ext = main_part if not ext[1:].isdigit() else self.output_path
path_without_ext += "_"
path_without_ext = os.path.splitext(self.output_path)[0] + "_"
cmd.extend(['-E', blender_engine, '-o', path_without_ext, '-F', export_format])
# set frame range
@@ -112,7 +102,6 @@ class BlenderRenderWorker(BaseRenderWorker):
output_file_number = output_filename_match.groups()[0]
try:
self.current_frame = int(output_file_number)
self._send_frame_complete_notification()
except ValueError:
pass
elif render_stats_match:
@@ -121,15 +110,15 @@ class BlenderRenderWorker(BaseRenderWorker):
logger.info(f'Frame #{self.current_frame} - '
f'{frame_count} of {self.total_frames} completed in {time_completed} | '
f'Total Elapsed Time: {datetime.now() - self.start_time}')
else:
logger.debug(f'DEBUG: {line}')
else:
pass
# if len(line.strip()):
# logger.debug(line.strip())
def percent_complete(self):
if self.status == RenderStatus.COMPLETED:
return 1
elif self.total_frames <= 1:
if self.total_frames <= 1:
return self.__frame_percent_complete
else:
whole_frame_percent = (self.current_frame - self.start_frame) / self.total_frames

View File

@@ -19,8 +19,8 @@ for cam_obj in bpy.data.cameras:
data = {'cameras': cameras,
'engine': scene.render.engine,
'frame_start': scene.frame_start,
'frame_end': scene.frame_end,
'start_frame': scene.frame_start,
'end_frame': scene.frame_end,
'resolution_x': scene.render.resolution_x,
'resolution_y': scene.render.resolution_y,
'resolution_percentage': scene.render.resolution_percentage,

View File

@@ -9,12 +9,12 @@ SUBPROCESS_TIMEOUT = 5
class BaseRenderEngine(object):
install_paths = []
supported_extensions = []
file_extensions = []
def __init__(self, custom_path=None):
self.custom_renderer_path = custom_path
if not self.renderer_path() or not os.path.exists(self.renderer_path()):
raise FileNotFoundError(f"Cannot find path to renderer for {self.name()} instance")
raise FileNotFoundError(f"Cannot find path ({self.renderer_path()}) for renderer '{self.name()}'")
if not os.access(self.renderer_path(), os.X_OK):
logger.warning(f"Path is not executable. Setting permissions to 755 for {self.renderer_path()}")
@@ -47,19 +47,18 @@ class BaseRenderEngine(object):
def downloader(): # override when subclassing if using a downloader class
return None
@staticmethod
def worker_class(): # override when subclassing to link worker class
raise NotImplementedError("Worker class not implemented")
@classmethod
def worker_class(cls): # override when subclassing to link worker class
raise NotImplementedError(f"Worker class not implemented for engine {cls.name()}")
def ui_options(self): # override to return options for ui
def ui_options(self, project_info): # override to return options for ui
return {}
def get_help(self): # override if renderer uses different help flag
path = self.renderer_path()
if not path:
raise FileNotFoundError("renderer path not found")
help_doc = subprocess.check_output([path, '-h'], stderr=subprocess.STDOUT,
timeout=SUBPROCESS_TIMEOUT).decode('utf-8')
help_doc = subprocess.run([path, '-h'], capture_output=True, text=True).stdout.strip()
return help_doc
def get_project_info(self, project_path, timeout=10):
@@ -69,6 +68,10 @@ class BaseRenderEngine(object):
def get_output_formats(cls):
raise NotImplementedError(f"get_output_formats not implemented for {cls.__name__}")
@classmethod
def supported_extensions(cls):
return cls.file_extensions
def get_arguments(self):
pass

View File

@@ -5,7 +5,6 @@ import logging
import os
import subprocess
import threading
import time
from datetime import datetime
import psutil
@@ -82,8 +81,11 @@ class BaseRenderWorker(Base):
# Frame Ranges
self.project_length = 0 # is this necessary?
self.current_frame = 0
self.start_frame = 0
self.end_frame = None
# Get Project Info
self.scene_info = self.engine(engine_path).get_project_info(project_path=input_path)
self.start_frame = int(self.scene_info.get('start_frame', 1))
self.end_frame = int(self.scene_info.get('end_frame', self.start_frame))
# Logging
self.start_time = None
@@ -95,15 +97,10 @@ class BaseRenderWorker(Base):
self.errors = []
# Threads and processes
self.__thread = threading.Thread(target=self.__run, args=())
self.__thread = threading.Thread(target=self.run, args=())
self.__thread.daemon = True
self.__process = None
self.last_output = None
self.__last_output_time = None
self.watchdog_timeout = 120
def __repr__(self):
return f"<Job id:{self.id} p{self.priority} {self.renderer}-{self.renderer_version} '{self.name}' status:{self.status.value}>"
@property
def total_frames(self):
@@ -127,16 +124,19 @@ class BaseRenderWorker(Base):
self._status = RenderStatus.CANCELLED.value
return string_to_status(self._status)
def _send_frame_complete_notification(self):
pub.sendMessage('frame_complete', job_id=self.id, frame_number=self.current_frame)
def validate(self):
if not os.path.exists(self.input_path):
raise FileNotFoundError(f"Cannot find input path: {self.input_path}")
self.generate_subprocess()
def generate_subprocess(self):
# Convert raw args from string if available and catch conflicts
generated_args = [str(x) for x in self.generate_worker_subprocess()]
generated_args_flags = [x for x in generated_args if x.startswith('-')]
if len(generated_args_flags) != len(set(generated_args_flags)):
msg = f"Cannot generate subprocess - Multiple arg conflicts detected: {generated_args}"
msg = "Cannot generate subprocess - Multiple arg conflicts detected"
logger.error(msg)
logger.debug(f"Generated args for subprocess: {generated_args}")
raise ValueError(msg)
return generated_args
@@ -178,12 +178,11 @@ class BaseRenderWorker(Base):
self.status = RenderStatus.RUNNING
self.start_time = datetime.now()
self.__thread.start()
def __run(self):
logger.info(f'Starting {self.engine.name()} {self.renderer_version} Render for {self.input_path} | '
f'Frame Count: {self.total_frames}')
self.__thread.start()
def run(self):
# Setup logging
log_dir = os.path.dirname(self.log_path())
os.makedirs(log_dir, exist_ok=True)
@@ -196,7 +195,7 @@ class BaseRenderWorker(Base):
f.write(f"{self.start_time.isoformat()} - Starting {self.engine.name()} {self.renderer_version} "
f"render for {self.input_path}\n\n")
f.write(f"Running command: {subprocess_cmds}\n")
f.write(f"Running command: \"{' '.join(subprocess_cmds)}\"\n")
f.write('=' * 80 + '\n\n')
while True:
@@ -211,45 +210,51 @@ class BaseRenderWorker(Base):
else:
f.write(f'\n{"=" * 20} Attempt #{failed_attempts + 1} {"=" * 20}\n\n')
logger.warning(f"Restarting render - Attempt #{failed_attempts + 1}")
self.status = RenderStatus.RUNNING
return_code = self.__setup_and_run_process(f, subprocess_cmds)
# Start process and get updates
self.status = RenderStatus.RUNNING
self.__process = subprocess.Popen(subprocess_cmds, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
universal_newlines=False)
for c in io.TextIOWrapper(self.__process.stdout, encoding="utf-8"): # or another encoding
f.write(c)
f.flush()
os.fsync(f.fileno())
self.last_output = c.strip()
self._parse_stdout(c.strip())
f.write('\n')
# Check return codes and process
return_code = self.__process.wait()
self.end_time = datetime.now()
message = f"{'=' * 50}\n\n{self.engine.name()} render ended with code {return_code} " \
f"after {self.time_elapsed()}\n\n"
f.write(message)
# Teardown
if self.status in [RenderStatus.CANCELLED, RenderStatus.ERROR]:
if self.status in [RenderStatus.CANCELLED, RenderStatus.ERROR]: # user cancelled
message = f"{self.engine.name()} render ended with status '{self.status}' " \
f"after {self.time_elapsed()}"
f.write(message)
return
# if file output hasn't increased, return as error, otherwise restart process.
file_count_has_increased = len(self.file_list()) > initial_file_count
if (self.status == RenderStatus.RUNNING) and file_count_has_increased and not return_code:
if len(self.file_list()) <= initial_file_count:
err_msg = f"File count has not increased. Count is still {len(self.file_list())}"
f.write(f'Error: {err_msg}\n\n')
self.errors.append(err_msg)
self.status = RenderStatus.ERROR
# Handle completed - All else counts as failed attempt
if (self.status == RenderStatus.COMPLETED) and not return_code:
message = (f"{'=' * 50}\n\n{self.engine.name()} render completed successfully in "
f"{self.time_elapsed()}\n")
f.write(message)
break
if return_code:
err_msg = f"{self.engine.name()} render failed with code {return_code}"
logger.error(err_msg)
self.errors.append(err_msg)
# handle instances where renderer exits ok but doesnt generate files
if not return_code and not file_count_has_increased:
err_msg = (f"{self.engine.name()} render exited ok, but file count has not increased. "
f"Count is still {len(self.file_list())}")
f.write(f'Error: {err_msg}\n\n')
self.errors.append(err_msg)
# only count the attempt as failed if renderer creates no output - ignore error codes for now
if not file_count_has_increased:
failed_attempts += 1
# Handle non-zero return codes
message = f"{'=' * 50}\n\n{self.engine.name()} render failed with code {return_code} " \
f"after {self.time_elapsed()}\n\n"
f.write(message)
self.errors.append(message)
failed_attempts += 1
if self.children:
from src.distributed_job_manager import DistributedJobManager
@@ -261,65 +266,6 @@ class BaseRenderWorker(Base):
self.status = RenderStatus.COMPLETED
logger.info(f"Render {self.id}-{self.name} completed successfully after {self.time_elapsed()}")
def __setup_and_run_process(self, f, subprocess_cmds):
def watchdog():
logger.debug(f'Starting process watchdog for {self} with {self.watchdog_timeout}s timeout')
while self.__process.poll() is None:
time_since_last_update = time.time() - self.__last_output_time
if time_since_last_update > self.watchdog_timeout:
logger.error(f"Process for {self} terminated due to exceeding timeout ({self.watchdog_timeout}s)")
self.__process.kill()
break
# logger.debug(f'Watchdog for {self} - Time since last update: {time_since_last_update}')
time.sleep(1)
logger.debug(f'Stopping process watchdog for {self}')
return_code = -1
watchdog_thread = threading.Thread(target=watchdog)
watchdog_thread.daemon = True
try:
# Start process and get updates
self.__process = subprocess.Popen(subprocess_cmds, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
universal_newlines=False)
# Start watchdog
self.__last_output_time = time.time()
watchdog_thread.start()
for c in io.TextIOWrapper(self.__process.stdout, encoding="utf-8"): # or another encoding
self.last_output = c.strip()
self.__last_output_time = time.time()
try:
f.write(c)
f.flush()
os.fsync(f.fileno())
except Exception as e:
logger.error(f"Error saving log to disk: {e}")
try:
self._parse_stdout(c.strip())
except Exception as e:
logger.error(f'Error parsing stdout: {e}')
f.write('\n')
# Check return codes and process
return_code = self.__process.wait()
except Exception as e:
message = f'Uncaught error running render process: {e}'
f.write(message)
logger.exception(message)
self.__process.kill()
# let watchdog end before continuing - prevents multiple watchdogs running when process restarts
if watchdog_thread.is_alive():
watchdog_thread.join()
return return_code
def post_processing(self):
pass

View File

@@ -6,6 +6,7 @@ import concurrent.futures
from src.engines.blender.blender_engine import Blender
from src.engines.ffmpeg.ffmpeg_engine import FFMPEG
from src.engines.aerender.aerender_engine import AERender
from src.utilities.misc_helper import system_safe_path, current_system_os, current_system_cpu
logger = logging.getLogger()
@@ -18,7 +19,7 @@ class EngineManager:
@staticmethod
def supported_engines():
return [Blender, FFMPEG]
return [Blender, FFMPEG, AERender]
@classmethod
def engine_with_name(cls, engine_name):
@@ -79,17 +80,20 @@ class EngineManager:
'type': 'system'
}
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = {
executor.submit(fetch_engine_details, eng): eng.name()
for eng in cls.supported_engines()
if eng.default_renderer_path() and (not filter_name or filter_name == eng.name())
}
if not filter_name:
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = {
executor.submit(fetch_engine_details, eng): eng.name()
for eng in cls.supported_engines()
if eng.default_renderer_path()
}
for future in concurrent.futures.as_completed(futures):
result = future.result()
if result:
results.append(result)
for future in concurrent.futures.as_completed(futures):
result = future.result()
if result:
results.append(result)
else:
results.append(fetch_engine_details(cls.engine_with_name(filter_name)))
return results
@@ -110,7 +114,7 @@ class EngineManager:
return filtered[0]
except IndexError:
logger.error(f"Cannot find newest engine version for {engine}-{system_os}-{cpu}")
return None
return None
@classmethod
def is_version_downloaded(cls, engine, version, system_os=None, cpu=None):
@@ -172,12 +176,12 @@ class EngineManager:
if background:
return thread
thread.join()
found_engine = cls.is_version_downloaded(engine, version, system_os, cpu) # Check that engine downloaded
if not found_engine:
logger.error(f"Error downloading {engine}")
return found_engine
else:
thread.join()
found_engine = cls.is_version_downloaded(engine, version, system_os, cpu) # Check that engine downloaded
if not found_engine:
logger.error(f"Error downloading {engine}")
return found_engine
@classmethod
def delete_engine_download(cls, engine, version, system_os=None, cpu=None):
@@ -204,19 +208,13 @@ class EngineManager:
def engine_update_task(engine_class):
logger.debug(f"Checking for updates to {engine_class.name()}")
latest_version = engine_class.downloader().find_most_recent_version()
if not latest_version:
logger.warning(f"Could not find most recent version of {engine.name()} to download")
return
version_num = latest_version.get('version')
if cls.is_version_downloaded(engine_class.name(), version_num):
logger.debug(f"Latest version of {engine_class.name()} ({version_num}) already downloaded")
return
# download the engine
logger.info(f"Downloading latest version of {engine_class.name()} ({version_num})...")
cls.download_engine(engine=engine_class.name(), version=version_num, background=True)
if latest_version:
logger.debug(f"Latest version of {engine_class.name()} available: {latest_version.get('version')}")
if not cls.is_version_downloaded(engine_class.name(), latest_version.get('version')):
logger.info(f"Downloading latest version of {engine_class.name()}...")
cls.download_engine(engine=engine_class.name(), version=latest_version['version'], background=True)
else:
logger.warning(f"Unable to get check for updates for {engine.name()}")
logger.info(f"Checking for updates for render engines...")
threads = []
@@ -300,6 +298,6 @@ if __name__ == '__main__':
# print(EngineManager.newest_engine_version('blender', 'macos', 'arm64'))
# EngineManager.delete_engine_download('blender', '3.2.1', 'macos', 'a')
EngineManager.engines_path = "/Users/brettwilliams/zordon-uploads/engines"
EngineManager.engines_path = "/Users/brettwilliams/zordon-uploads/engines/"
# print(EngineManager.is_version_downloaded("ffmpeg", "6.0"))
print(EngineManager.get_engines())

View File

@@ -132,8 +132,8 @@ class FFMPEGDownloader(EngineDownloader):
system_os = system_os or current_system_os()
cpu = cpu or current_system_cpu()
return cls.all_versions(system_os, cpu)[0]
except (IndexError, requests.exceptions.RequestException) as e:
logger.error(f"Cannot get most recent version of ffmpeg: {e}")
except (IndexError, requests.exceptions.RequestException):
logger.error(f"Cannot get most recent version of ffmpeg")
return {}
@classmethod

View File

@@ -12,24 +12,26 @@ class FFMPEG(BaseRenderEngine):
from src.engines.ffmpeg.ffmpeg_downloader import FFMPEGDownloader
return FFMPEGDownloader
@staticmethod
def worker_class():
@classmethod
def worker_class(cls):
from src.engines.ffmpeg.ffmpeg_worker import FFMPEGRenderWorker
return FFMPEGRenderWorker
def ui_options(self):
def ui_options(self, project_info):
from src.engines.ffmpeg.ffmpeg_ui import FFMPEGUI
return FFMPEGUI.get_options(self)
return FFMPEGUI.get_options(self, project_info)
@classmethod
def supported_extensions(cls):
help_text = (subprocess.check_output([cls().renderer_path(), '-h', 'full'], stderr=subprocess.STDOUT)
.decode('utf-8'))
found = re.findall(r'extensions that .* is allowed to access \(default "(.*)"', help_text)
found_extensions = set()
for match in found:
found_extensions.update(match.split(','))
return list(found_extensions)
if not cls.file_extensions:
help_text = (subprocess.check_output([cls().renderer_path(), '-h', 'full'], stderr=subprocess.STDOUT)
.decode('utf-8'))
found = re.findall(r'extensions that .* is allowed to access \(default "(.*)"', help_text)
found_extensions = set()
for match in found:
found_extensions.update(match.split(','))
cls.file_extensions = list(found_extensions)
return cls.file_extensions
def version(self):
version = None

View File

@@ -1,5 +1,5 @@
class FFMPEGUI:
@staticmethod
def get_options(instance):
def get_options(instance, project_info):
options = []
return options

View File

@@ -1,27 +1,22 @@
''' app/init.py '''
import logging
import multiprocessing
import os
import socket
import sys
import threading
import time
from collections import deque
from PyQt6.QtCore import QObject, pyqtSignal
from PyQt6.QtWidgets import QApplication
from src.api.api_server import start_server
from src.api.preview_manager import PreviewManager
from src.api.serverproxy_manager import ServerProxyManager
from src.distributed_job_manager import DistributedJobManager
from src.engines.engine_manager import EngineManager
from src.render_queue import RenderQueue
from src.ui.main_window import MainWindow
from src.utilities.config import Config
from src.utilities.misc_helper import system_safe_path, current_system_cpu, current_system_os, current_system_os_version
from src.utilities.zeroconf_server import ZeroconfServer
logger = logging.getLogger()
from src.utilities.misc_helper import system_safe_path
def run(server_only=False) -> int:
def run() -> int:
"""
Initializes the application and runs it.
@@ -29,130 +24,55 @@ def run(server_only=False) -> int:
int: The exit status code.
"""
# setup logging
logging.basicConfig(format='%(asctime)s: %(levelname)s: %(module)s: %(message)s', datefmt='%d-%b-%y %H:%M:%S',
level=Config.server_log_level.upper())
logging.getLogger("requests").setLevel(logging.WARNING) # suppress noisy requests/urllib3 logging
logging.getLogger("urllib3").setLevel(logging.WARNING)
# Setup logging for console ui
buffer_handler = __setup_buffer_handler() if not server_only else None
logger.info(f"Starting Zordon Render Server")
return_code = 0
try:
# Load Config YAML
Config.setup_config_dir()
Config.load_config(system_safe_path(os.path.join(Config.config_dir(), 'config.yaml')))
# configure default paths
EngineManager.engines_path = system_safe_path(
os.path.join(os.path.join(os.path.expanduser(Config.upload_folder),
'engines')))
os.makedirs(EngineManager.engines_path, exist_ok=True)
PreviewManager.storage_path = system_safe_path(
os.path.join(os.path.expanduser(Config.upload_folder), 'previews'))
logging.basicConfig(format='%(asctime)s: %(levelname)s: %(module)s: %(message)s', datefmt='%d-%b-%y %H:%M:%S',
level=Config.server_log_level.upper())
# Debug info
logger.debug(f"Upload directory: {os.path.expanduser(Config.upload_folder)}")
logger.debug(f"Thumbs directory: {PreviewManager.storage_path}")
logger.debug(f"Engines directory: {EngineManager.engines_path}")
app: QApplication = QApplication(sys.argv)
# Set up the RenderQueue object
RenderQueue.load_state(database_directory=system_safe_path(os.path.expanduser(Config.upload_folder)))
ServerProxyManager.subscribe_to_listener()
DistributedJobManager.subscribe_to_listener()
# Start server in background
background_server = threading.Thread(target=start_server)
background_server.daemon = True
background_server.start()
# check for updates for render engines if configured or on first launch
if Config.update_engines_on_launch or not EngineManager.get_engines():
EngineManager.update_all_engines()
# Setup logging for console ui
buffer_handler = BufferingHandler()
buffer_handler.setFormatter(logging.getLogger().handlers[0].formatter)
logger = logging.getLogger()
logger.addHandler(buffer_handler)
# get hostname
local_hostname = socket.gethostname()
local_hostname = local_hostname + (".local" if not local_hostname.endswith(".local") else "")
window: MainWindow = MainWindow()
window.buffer_handler = buffer_handler
window.show()
# configure and start API server
api_server = threading.Thread(target=start_server, args=(local_hostname,))
api_server.daemon = True
api_server.start()
return_code = app.exec()
# start zeroconf server
ZeroconfServer.configure("_zordon._tcp.local.", local_hostname, Config.port_number)
ZeroconfServer.properties = {'system_cpu': current_system_cpu(),
'system_cpu_cores': multiprocessing.cpu_count(),
'system_os': current_system_os(),
'system_os_version': current_system_os_version()}
ZeroconfServer.start()
logger.info(f"Zordon Render Server started - Hostname: {local_hostname}")
RenderQueue.evaluation_inverval = Config.queue_eval_seconds
RenderQueue.start()
# start in gui or server only (cli) mode
logger.debug(f"Launching in {'server only' if server_only else 'GUI'} mode")
if server_only: # CLI only
api_server.join()
else: # GUI
return_code = __show_gui(buffer_handler)
except KeyboardInterrupt:
pass
except Exception as e:
logging.error(f"Unhandled exception: {e}")
return_code = 1
finally:
# shut down gracefully
logger.info(f"Zordon Render Server is preparing to shut down")
try:
RenderQueue.prepare_for_shutdown()
except Exception as e:
logger.exception(f"Exception during prepare for shutdown: {e}")
ZeroconfServer.stop()
logger.info(f"Zordon Render Server has shut down")
RenderQueue.prepare_for_shutdown()
return sys.exit(return_code)
def __setup_buffer_handler():
# lazy load GUI frameworks
from PyQt6.QtCore import QObject, pyqtSignal
class BufferingHandler(logging.Handler, QObject):
new_record = pyqtSignal(str)
class BufferingHandler(logging.Handler, QObject):
new_record = pyqtSignal(str)
def __init__(self, capacity=100):
logging.Handler.__init__(self)
QObject.__init__(self)
self.buffer = deque(maxlen=capacity) # Define a buffer with a fixed capacity
def __init__(self, capacity=100):
logging.Handler.__init__(self)
QObject.__init__(self)
self.buffer = deque(maxlen=capacity) # Define a buffer with a fixed capacity
def emit(self, record):
msg = self.format(record)
self.buffer.append(msg) # Add message to the buffer
self.new_record.emit(msg) # Emit signal
def emit(self, record):
try:
msg = self.format(record)
self.buffer.append(msg) # Add message to the buffer
self.new_record.emit(msg) # Emit signal
except RuntimeError:
pass
def get_buffer(self):
return list(self.buffer) # Return a copy of the buffer
buffer_handler = BufferingHandler()
buffer_handler.setFormatter(logging.getLogger().handlers[0].formatter)
logger = logging.getLogger()
logger.addHandler(buffer_handler)
return buffer_handler
def __show_gui(buffer_handler):
# lazy load GUI frameworks
from PyQt6.QtWidgets import QApplication
# load application
app: QApplication = QApplication(sys.argv)
# configure main window
from src.ui.main_window import MainWindow
window: MainWindow = MainWindow()
window.buffer_handler = buffer_handler
window.show()
return app.exec()
def get_buffer(self):
return list(self.buffer) # Return a copy of the buffer

View File

@@ -2,13 +2,12 @@ import logging
import os
from datetime import datetime
from pubsub import pub
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm.exc import DetachedInstanceError
from src.engines.core.base_worker import Base
from src.utilities.status_utils import RenderStatus
from src.engines.engine_manager import EngineManager
from src.engines.core.base_worker import Base
logger = logging.getLogger()
@@ -18,9 +17,6 @@ class JobNotFoundError(Exception):
super().__init__(args)
self.job_id = job_id
def __str__(self):
return f"Cannot find job with ID: {self.job_id}"
class RenderQueue:
engine = None
@@ -28,46 +24,18 @@ class RenderQueue:
job_queue = []
maximum_renderer_instances = {'blender': 1, 'aerender': 1, 'ffmpeg': 4}
last_saved_counts = {}
is_running = False
__eval_thread = None
evaluation_inverval = 1
# --------------------------------------------
# Start / Stop Background Updates
# --------------------------------------------
@classmethod
def start(cls):
logger.debug("Starting render queue updates")
cls.is_running = True
cls.evaluate_queue()
@classmethod
def __local_job_status_changed(cls, job_id, old_status, new_status):
render_job = RenderQueue.job_with_id(job_id, none_ok=True)
if render_job and cls.is_running: # ignore changes from render jobs not in the queue yet
logger.debug(f"RenderQueue detected job {job_id} has changed from {old_status} -> {new_status}")
RenderQueue.evaluate_queue()
@classmethod
def stop(cls):
logger.debug("Stopping render queue updates")
cls.is_running = False
# --------------------------------------------
# Queue Management
# --------------------------------------------
def __init__(self):
pass
@classmethod
def add_to_render_queue(cls, render_job, force_start=False):
logger.info(f"Adding job to render queue: {render_job}")
logger.debug('Adding priority {} job to render queue: {}'.format(render_job.priority, render_job))
cls.job_queue.append(render_job)
if cls.is_running and force_start and render_job.status in (RenderStatus.NOT_STARTED, RenderStatus.SCHEDULED):
if force_start and render_job.status in (RenderStatus.NOT_STARTED, RenderStatus.SCHEDULED):
cls.start_job(render_job)
cls.session.add(render_job)
cls.save_state()
if cls.is_running:
cls.evaluate_queue()
@classmethod
def all_jobs(cls):
@@ -113,7 +81,6 @@ class RenderQueue:
cls.session = sessionmaker(bind=cls.engine)()
from src.engines.core.base_worker import BaseRenderWorker
cls.job_queue = cls.session.query(BaseRenderWorker).all()
pub.subscribe(cls.__local_job_status_changed, 'status_change')
@classmethod
def save_state(cls):
@@ -122,7 +89,6 @@ class RenderQueue:
@classmethod
def prepare_for_shutdown(cls):
logger.debug("Closing session")
cls.stop()
running_jobs = cls.jobs_with_status(RenderStatus.RUNNING) # cancel all running jobs
[cls.cancel_job(job) for job in running_jobs]
cls.save_state()
@@ -139,38 +105,35 @@ class RenderQueue:
@classmethod
def evaluate_queue(cls):
try:
not_started = cls.jobs_with_status(RenderStatus.NOT_STARTED, priority_sorted=True)
for job in not_started:
if cls.is_available_for_job(job.renderer, job.priority):
cls.start_job(job)
not_started = cls.jobs_with_status(RenderStatus.NOT_STARTED, priority_sorted=True)
for job in not_started:
if cls.is_available_for_job(job.renderer, job.priority):
cls.start_job(job)
scheduled = cls.jobs_with_status(RenderStatus.SCHEDULED, priority_sorted=True)
for job in scheduled:
if job.scheduled_start <= datetime.now():
logger.debug(f"Starting scheduled job: {job}")
cls.start_job(job)
scheduled = cls.jobs_with_status(RenderStatus.SCHEDULED, priority_sorted=True)
for job in scheduled:
if job.scheduled_start <= datetime.now():
logger.debug(f"Starting scheduled job: {job}")
cls.start_job(job)
if cls.last_saved_counts != cls.job_counts():
cls.save_state()
except DetachedInstanceError:
pass
if cls.last_saved_counts != cls.job_counts():
cls.save_state()
@classmethod
def start_job(cls, job):
logger.info(f'Starting job: {job}')
logger.info(f'Starting render: {job.name} - Priority {job.priority}')
job.start()
cls.save_state()
@classmethod
def cancel_job(cls, job):
logger.info(f'Cancelling job: {job}')
logger.info(f'Cancelling job ID: {job.id}')
job.stop()
return job.status == RenderStatus.CANCELLED
@classmethod
def delete_job(cls, job):
logger.info(f"Deleting job: {job}")
logger.info(f"Deleting job ID: {job.id}")
job.stop()
cls.job_queue.remove(job)
cls.session.delete(job)

View File

@@ -55,7 +55,7 @@ class NewRenderJobForm(QWidget):
self.priority_input = None
self.end_frame_input = None
self.start_frame_input = None
self.render_name_input = None
self.output_path_input = None
self.scene_file_input = None
self.scene_file_browse_button = None
self.job_name_input = None
@@ -136,11 +136,11 @@ class NewRenderJobForm(QWidget):
self.output_settings_group = QGroupBox("Output Settings")
output_settings_layout = QVBoxLayout(self.output_settings_group)
# output path
render_name_layout = QHBoxLayout()
render_name_layout.addWidget(QLabel("Render name:"))
self.render_name_input = QLineEdit()
render_name_layout.addWidget(self.render_name_input)
output_settings_layout.addLayout(render_name_layout)
output_path_layout = QHBoxLayout()
output_path_layout.addWidget(QLabel("Render name:"))
self.output_path_input = QLineEdit()
output_path_layout.addWidget(self.output_path_input)
output_settings_layout.addLayout(output_path_layout)
# file format
file_format_layout = QHBoxLayout()
file_format_layout.addWidget(QLabel("Format:"))
@@ -281,7 +281,7 @@ class NewRenderJobForm(QWidget):
output_name, _ = os.path.splitext(os.path.basename(self.scene_file_input.text()))
output_name = output_name.replace(' ', '_')
self.render_name_input.setText(output_name)
self.output_path_input.setText(output_name)
file_name = self.scene_file_input.text()
# setup bg worker
@@ -292,7 +292,7 @@ class NewRenderJobForm(QWidget):
def browse_output_path(self):
directory = QFileDialog.getExistingDirectory(self, "Select Output Directory")
if directory:
self.render_name_input.setText(directory)
self.output_path_input.setText(directory)
def args_help_button_clicked(self):
url = (f'http://{self.server_proxy.hostname}:{self.server_proxy.port}/api/renderer/'
@@ -316,16 +316,28 @@ class NewRenderJobForm(QWidget):
self.renderer_type.setCurrentIndex(0) #todo: find out why we don't have renderer info yet
# not ideal but if we don't have the renderer info we have to pick something
self.output_path_input.setText(os.path.basename(input_path))
# cleanup progress UI
self.load_file_group.setHidden(True)
self.toggle_renderer_enablement(True)
# Load scene data
self.start_frame_input.setValue(self.project_info.get('frame_start'))
self.end_frame_input.setValue(self.project_info.get('frame_end'))
self.resolution_x_input.setValue(self.project_info.get('resolution_x'))
self.resolution_y_input.setValue(self.project_info.get('resolution_y'))
self.frame_rate_input.setValue(self.project_info.get('fps'))
# -- Load scene data
# start / end frames
self.start_frame_input.setValue(self.project_info.get('start_frame', 0))
self.end_frame_input.setValue(self.project_info.get('end_frame', 0))
self.start_frame_input.setEnabled(bool(self.project_info.get('start_frame')))
self.end_frame_input.setEnabled(bool(self.project_info.get('start_frame')))
# resolution
self.resolution_x_input.setValue(self.project_info.get('resolution_x', 1920))
self.resolution_y_input.setValue(self.project_info.get('resolution_y', 1080))
self.resolution_x_input.setEnabled(bool(self.project_info.get('resolution_x')))
self.resolution_y_input.setEnabled(bool(self.project_info.get('resolution_y')))
# frame rate
self.frame_rate_input.setValue(self.project_info.get('fps', 24))
self.frame_rate_input.setEnabled(bool(self.project_info.get('fps')))
# Cameras
self.cameras_list.clear()
@@ -348,7 +360,7 @@ class NewRenderJobForm(QWidget):
# Dynamic Engine Options
clear_layout(self.renderer_options_layout) # clear old options
# dynamically populate option list
self.current_engine_options = engine().ui_options()
self.current_engine_options = engine().ui_options(self.project_info)
for option in self.current_engine_options:
h_layout = QHBoxLayout()
label = QLabel(option['name'].replace('_', ' ').capitalize() + ':')
@@ -449,16 +461,14 @@ class SubmitWorker(QThread):
job_json = {'owner': psutil.Process().username() + '@' + socket.gethostname(),
'renderer': self.window.renderer_type.currentText().lower(),
'engine_version': self.window.renderer_version_combo.currentText(),
'args': {'raw': self.window.raw_args.text(),
'export_format': self.window.file_format_combo.currentText()},
'output_path': self.window.render_name_input.text(),
'args': {'raw': self.window.raw_args.text()},
'output_path': self.window.output_path_input.text(),
'start_frame': self.window.start_frame_input.value(),
'end_frame': self.window.end_frame_input.value(),
'priority': self.window.priority_input.currentIndex() + 1,
'notes': self.window.notes_input.toPlainText(),
'enable_split_jobs': self.window.enable_splitjobs.isChecked(),
'split_jobs_same_os': self.window.splitjobs_same_os.isChecked(),
'name': self.window.render_name_input.text()}
'split_jobs_same_os': self.window.splitjobs_same_os.isChecked()}
# get the dynamic args
for i in range(self.window.renderer_options_layout.count()):
@@ -487,8 +497,7 @@ class SubmitWorker(QThread):
for cam in selected_cameras:
job_copy = copy.deepcopy(job_json)
job_copy['args']['camera'] = cam
job_copy['name'] = job_copy['name'].replace(' ', '-') + "_" + cam.replace(' ', '')
job_copy['output_path'] = job_copy['name']
job_copy['name'] = pathlib.Path(input_path).stem.replace(' ', '_') + "-" + cam.replace(' ', '')
job_list.append(job_copy)
else:
job_list = [job_json]
@@ -497,8 +506,12 @@ class SubmitWorker(QThread):
engine = EngineManager.engine_with_name(self.window.renderer_type.currentText().lower())
input_path = engine().perform_presubmission_tasks(input_path)
# submit
result = self.window.server_proxy.post_job_to_server(file_path=input_path, job_list=job_list,
callback=create_callback)
result = None
try:
result = self.window.server_proxy.post_job_to_server(file_path=input_path, job_list=job_list,
callback=create_callback)
except Exception as e:
pass
self.message_signal.emit(result)

View File

@@ -1,3 +1,4 @@
import sys
import logging
from PyQt6.QtGui import QFont
@@ -15,10 +16,7 @@ class QSignalHandler(logging.Handler, QObject):
def emit(self, record):
msg = self.format(record)
try:
self.new_record.emit(msg) # Emit signal
except RuntimeError:
pass
self.new_record.emit(msg) # Emit signal
class ConsoleWindow(QMainWindow):

View File

@@ -183,13 +183,8 @@ class MainWindow(QMainWindow):
def __background_update(self):
while True:
try:
self.update_servers()
self.fetch_jobs()
except RuntimeError:
pass
except Exception as e:
logger.error(f"Uncaught exception in background update: {e}")
self.update_servers()
self.fetch_jobs()
time.sleep(0.5)
def closeEvent(self, event):
@@ -377,36 +372,30 @@ class MainWindow(QMainWindow):
def load_image_path(self, image_path):
# Load and set the image using QPixmap
try:
pixmap = QPixmap(image_path)
if not pixmap:
logger.error("Error loading image")
return
self.image_label.setPixmap(pixmap)
except Exception as e:
logger.error(f"Error loading image path: {e}")
pixmap = QPixmap(image_path)
if not pixmap:
logger.error("Error loading image")
return
self.image_label.setPixmap(pixmap)
def load_image_data(self, pillow_image):
try:
# Convert the Pillow Image to a QByteArray (byte buffer)
byte_array = QByteArray()
buffer = QBuffer(byte_array)
buffer.open(QIODevice.OpenModeFlag.WriteOnly)
pillow_image.save(buffer, "PNG")
buffer.close()
# Convert the Pillow Image to a QByteArray (byte buffer)
byte_array = QByteArray()
buffer = QBuffer(byte_array)
buffer.open(QIODevice.OpenModeFlag.WriteOnly)
pillow_image.save(buffer, "PNG")
buffer.close()
# Create a QImage from the QByteArray
image = QImage.fromData(byte_array)
# Create a QImage from the QByteArray
image = QImage.fromData(byte_array)
# Create a QPixmap from the QImage
pixmap = QPixmap.fromImage(image)
# Create a QPixmap from the QImage
pixmap = QPixmap.fromImage(image)
if not pixmap:
logger.error("Error loading image")
return
self.image_label.setPixmap(pixmap)
except Exception as e:
logger.error(f"Error loading image data: {e}")
if not pixmap:
logger.error("Error loading image")
return
self.image_label.setPixmap(pixmap)
def update_servers(self):
found_servers = list(set(ZeroconfServer.found_hostnames() + self.added_hostnames))

View File

@@ -32,23 +32,20 @@ class StatusBar(QStatusBar):
# Check for status change every 1s on background thread
while True:
try:
# update status label - get download status
new_status = proxy.status()
if EngineManager.download_tasks:
if len(EngineManager.download_tasks) == 1:
task = EngineManager.download_tasks[0]
new_status = f"{new_status} | Downloading {task.engine.capitalize()} {task.version}..."
else:
new_status = f"{new_status} | Downloading {len(EngineManager.download_tasks)} engines"
self.messageLabel.setText(new_status)
new_status = proxy.status()
new_image_name = image_names.get(new_status, 'Synchronize.png')
image_path = os.path.join(resources_dir(), new_image_name)
self.label.setPixmap((QPixmap(image_path).scaled(16, 16, Qt.AspectRatioMode.KeepAspectRatio)))
# update status image
new_image_name = image_names.get(new_status, 'Synchronize.png')
new_image_path = os.path.join(resources_dir(), new_image_name)
self.label.setPixmap((QPixmap(new_image_path).scaled(16, 16, Qt.AspectRatioMode.KeepAspectRatio)))
except RuntimeError: # ignore runtime errors during shutdown
pass
# add download status
if EngineManager.download_tasks:
if len(EngineManager.download_tasks) == 1:
task = EngineManager.download_tasks[0]
new_status = f"{new_status} | Downloading {task.engine.capitalize()} {task.version}..."
else:
new_status = f"{new_status} | Downloading {len(EngineManager.download_tasks)} engines"
self.messageLabel.setText(new_status)
time.sleep(1)
background_thread = threading.Thread(target=background_update,)

View File

@@ -10,7 +10,7 @@ class Config:
max_content_path = 100000000
server_log_level = 'debug'
log_buffer_length = 250
worker_process_timeout = 120
subjob_connection_timeout = 120
flask_log_level = 'error'
flask_debug_enable = False
queue_eval_seconds = 1
@@ -28,7 +28,7 @@ class Config:
cls.max_content_path = cfg.get('max_content_path', cls.max_content_path)
cls.server_log_level = cfg.get('server_log_level', cls.server_log_level)
cls.log_buffer_length = cfg.get('log_buffer_length', cls.log_buffer_length)
cls.worker_process_timeout = cfg.get('worker_process_timeout', cls.worker_process_timeout)
cls.subjob_connection_timeout = cfg.get('subjob_connection_timeout', cls.subjob_connection_timeout)
cls.flask_log_level = cfg.get('flask_log_level', cls.flask_log_level)
cls.flask_debug_enable = cfg.get('flask_debug_enable', cls.flask_debug_enable)
cls.queue_eval_seconds = cfg.get('queue_eval_seconds', cls.queue_eval_seconds)

View File

@@ -2,8 +2,7 @@ import logging
import socket
from pubsub import pub
from zeroconf import Zeroconf, ServiceInfo, ServiceBrowser, ServiceStateChange, NonUniqueNameException, \
NotRunningException
from zeroconf import Zeroconf, ServiceInfo, ServiceBrowser, ServiceStateChange, NonUniqueNameException
logger = logging.getLogger()
@@ -32,14 +31,12 @@ class ZeroconfServer:
def start(cls, listen_only=False):
if not cls.service_type:
raise RuntimeError("The 'configure' method must be run before starting the zeroconf server")
logger.debug("Starting zeroconf service")
if not listen_only:
cls._register_service()
cls._browse_services()
@classmethod
def stop(cls):
logger.debug("Stopping zeroconf service")
cls._unregister_service()
cls.zeroconf.close()
@@ -76,18 +73,15 @@ class ZeroconfServer:
@classmethod
def _on_service_discovered(cls, zeroconf, service_type, name, state_change):
try:
info = zeroconf.get_service_info(service_type, name)
hostname = name.split(f'.{cls.service_type}')[0]
logger.debug(f"Zeroconf: {hostname} {state_change}")
if service_type == cls.service_type:
if state_change == ServiceStateChange.Added or state_change == ServiceStateChange.Updated:
cls.client_cache[hostname] = info
else:
cls.client_cache.pop(hostname)
pub.sendMessage('zeroconf_state_change', hostname=hostname, state_change=state_change)
except NotRunningException:
pass
info = zeroconf.get_service_info(service_type, name)
hostname = name.split(f'.{cls.service_type}')[0]
logger.debug(f"Zeroconf: {hostname} {state_change}")
if service_type == cls.service_type:
if state_change == ServiceStateChange.Added or state_change == ServiceStateChange.Updated:
cls.client_cache[hostname] = info
else:
cls.client_cache.pop(hostname)
pub.sendMessage('zeroconf_state_change', hostname=hostname, state_change=state_change)
@classmethod
def found_hostnames(cls):
@@ -110,15 +104,9 @@ class ZeroconfServer:
# Example usage:
if __name__ == "__main__":
import time
logging.basicConfig(level=logging.DEBUG)
ZeroconfServer.configure("_zordon._tcp.local.", "foobar.local", 8080)
try:
ZeroconfServer.start()
while True:
time.sleep(0.1)
except KeyboardInterrupt:
pass
input("Server running - Press enter to end")
finally:
ZeroconfServer.stop()