mirror of
https://github.com/blw1138/Zordon.git
synced 2025-12-17 16:58:12 +00:00
Compare commits
10 Commits
feature/84
...
feature/bl
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a464910426 | ||
|
|
a4ff36ac56 | ||
| 51a5a63944 | |||
| 3600eeb21b | |||
| 6afb6e65a6 | |||
|
|
90d5e9b7af | ||
| 4df41a2079 | |||
| 1cdb7810bf | |||
|
|
21011e47ca | ||
|
|
86977b9d6d |
@@ -3,7 +3,7 @@ update_engines_on_launch: true
|
||||
max_content_path: 100000000
|
||||
server_log_level: info
|
||||
log_buffer_length: 250
|
||||
subjob_connection_timeout: 120
|
||||
worker_process_timeout: 120
|
||||
flask_log_level: error
|
||||
flask_debug_enable: false
|
||||
queue_eval_seconds: 1
|
||||
|
||||
279
dashboard.py
Normal file
279
dashboard.py
Normal file
@@ -0,0 +1,279 @@
|
||||
#!/usr/bin/env python3
|
||||
import datetime
|
||||
import os.path
|
||||
import socket
|
||||
import threading
|
||||
import time
|
||||
import traceback
|
||||
|
||||
from rich import box
|
||||
from rich.console import Console
|
||||
from rich.layout import Layout
|
||||
from rich.live import Live
|
||||
from rich.panel import Panel
|
||||
from rich.table import Column
|
||||
from rich.table import Table
|
||||
from rich.text import Text
|
||||
from rich.tree import Tree
|
||||
|
||||
from src.engines.core.base_worker import RenderStatus, string_to_status
|
||||
from src.api.server_proxy import RenderServerProxy
|
||||
from src.utilities.misc_helper import get_time_elapsed
|
||||
from start_server import start_server
|
||||
|
||||
"""
|
||||
The RenderDashboard is designed to be run on a remote machine or on the local server
|
||||
This provides a detailed status of all jobs running on the server
|
||||
"""
|
||||
|
||||
status_colors = {RenderStatus.ERROR: "red", RenderStatus.CANCELLED: 'orange1', RenderStatus.COMPLETED: 'green',
|
||||
RenderStatus.NOT_STARTED: "yellow", RenderStatus.SCHEDULED: 'purple',
|
||||
RenderStatus.RUNNING: 'cyan'}
|
||||
|
||||
categories = [RenderStatus.RUNNING, RenderStatus.ERROR, RenderStatus.NOT_STARTED, RenderStatus.SCHEDULED,
|
||||
RenderStatus.COMPLETED, RenderStatus.CANCELLED, RenderStatus.UNDEFINED]
|
||||
|
||||
renderer_colors = {'ffmpeg': '[magenta]', 'blender': '[orange1]', 'aerender': '[purple]'}
|
||||
|
||||
local_hostname = socket.gethostname()
|
||||
|
||||
|
||||
def status_string_to_color(status_string):
|
||||
job_status = string_to_status(status_string)
|
||||
job_color = '[{}]'.format(status_colors[job_status])
|
||||
return job_color
|
||||
|
||||
|
||||
def sorted_jobs(all_jobs):
|
||||
|
||||
sort_by_date = True
|
||||
if not sort_by_date:
|
||||
sorted_job_list = []
|
||||
if all_jobs:
|
||||
for status_category in categories:
|
||||
found_jobs = [x for x in all_jobs if x['status'] == status_category.value]
|
||||
if found_jobs:
|
||||
sorted_found_jobs = sorted(found_jobs, key=lambda d: datetime.datetime.fromisoformat(d['date_created']), reverse=True)
|
||||
sorted_job_list.extend(sorted_found_jobs)
|
||||
else:
|
||||
sorted_job_list = sorted(all_jobs, key=lambda d: datetime.datetime.fromisoformat(d['date_created']), reverse=True)
|
||||
return sorted_job_list
|
||||
|
||||
|
||||
def create_node_tree(all_server_data) -> Tree:
|
||||
main_tree = Tree("[magenta]Server Cluster")
|
||||
|
||||
for server_host, server_data in all_server_data['servers'].items():
|
||||
|
||||
node_title_local = f"[cyan bold]{server_host}[/] [yellow](This Computer)[default]"
|
||||
node_title_remote = f"[cyan]{server_host} [magenta](Remote)[default]"
|
||||
node_tree_text = node_title_local if (server_host == local_hostname) else node_title_remote
|
||||
|
||||
if server_data.get('is_online', False):
|
||||
|
||||
node_tree_text = node_tree_text + " - [green]Running"
|
||||
node_tree = Tree(node_tree_text)
|
||||
|
||||
stats_text = f"CPU: [yellow]{server_data['status']['cpu_percent']}% [default]| RAM: " \
|
||||
f"[yellow]{server_data['status']['memory_percent']}% [default]| Cores: " \
|
||||
f"[yellow]{server_data['status']['cpu_count']} [default]| " \
|
||||
f"{server_data['status']['platform'].split('-')[0]}"
|
||||
|
||||
node_tree.add(Tree(stats_text))
|
||||
|
||||
running_jobs = [job for job in server_data['jobs'] if job['status'] == RenderStatus.RUNNING.value]
|
||||
not_started = [job for job in server_data['jobs'] if job['status'] == RenderStatus.NOT_STARTED.value]
|
||||
scheduled = [job for job in server_data['jobs'] if job['status'] == RenderStatus.SCHEDULED.value]
|
||||
jobs_to_display = running_jobs + not_started + scheduled
|
||||
|
||||
jobs_tree = Tree(f"Running: [green]{len(running_jobs)} [default]| Queued: [cyan]{len(not_started)}"
|
||||
f"[default] | Scheduled: [cyan]{len(scheduled)}")
|
||||
|
||||
for job in jobs_to_display:
|
||||
renderer = f"{renderer_colors[job['renderer']]}{job['renderer']}[default]"
|
||||
filename = os.path.basename(job['input_path']).split('.')[0]
|
||||
if job['status'] == RenderStatus.RUNNING.value:
|
||||
jobs_tree.add(f"[bold]{renderer} {filename} ({job['id']}) - {status_string_to_color(job['status'])}{(float(job['percent_complete']) * 100):.1f}%")
|
||||
else:
|
||||
jobs_tree.add(f"{filename} ({job['id']}) - {status_string_to_color(job['status'])}{job['status'].title()}")
|
||||
|
||||
if not jobs_to_display:
|
||||
jobs_tree.add("[italic]No running jobs")
|
||||
|
||||
node_tree.add(jobs_tree)
|
||||
main_tree.add(node_tree)
|
||||
else:
|
||||
# if server is offline
|
||||
node_tree_text = node_tree_text + " - [red]Offline"
|
||||
node_tree = Tree(node_tree_text)
|
||||
main_tree.add(node_tree)
|
||||
return main_tree
|
||||
|
||||
|
||||
def create_jobs_table(all_server_data) -> Table:
|
||||
table = Table("ID", "Name", "Renderer", Column(header="Priority", justify="center"),
|
||||
Column(header="Status", justify="center"), Column(header="Time Elapsed", justify="right"),
|
||||
Column(header="# Frames", justify="right"), "Client", show_lines=True,
|
||||
box=box.HEAVY_HEAD)
|
||||
|
||||
all_jobs = []
|
||||
for server_name, server_data in all_server_data['servers'].items():
|
||||
for job in server_data['jobs']:
|
||||
#todo: clean this up
|
||||
all_jobs.append(job)
|
||||
|
||||
all_jobs = sorted_jobs(all_jobs)
|
||||
|
||||
for job in all_jobs:
|
||||
|
||||
job_status = string_to_status(job['status'])
|
||||
job_color = '[{}]'.format(status_colors[job_status])
|
||||
job_text = f"{job_color}" + job_status.value.title()
|
||||
|
||||
if job_status == RenderStatus.ERROR and job['errors']:
|
||||
job_text = job_text + "\n" + "\n".join(job['errors'])
|
||||
|
||||
# Project name
|
||||
project_name = job_color + (job['name'] or os.path.basename(job['input_path']))
|
||||
elapsed_time = get_time_elapsed(datetime.datetime.fromisoformat(job['start_time']),
|
||||
datetime.datetime.fromisoformat(job['end_time']))
|
||||
|
||||
if job_status == RenderStatus.RUNNING:
|
||||
job_text = f"{job_color}[bold]Running - {float(job['percent_complete']) * 100:.1f}%"
|
||||
elapsed_time = "[bold]" + elapsed_time
|
||||
project_name = "[bold]" + project_name
|
||||
elif job_status == RenderStatus.CANCELLED or job_status == RenderStatus.ERROR:
|
||||
project_name = "[strike]" + project_name
|
||||
|
||||
# Priority
|
||||
priority_color = ["red", "yellow", "cyan"][(job['priority'] - 1)]
|
||||
|
||||
client_name = job['client'] or 'unknown'
|
||||
client_colors = {'unknown': '[red]', local_hostname: '[yellow]'}
|
||||
client_title = client_colors.get(client_name, '[magenta]') + client_name
|
||||
|
||||
table.add_row(
|
||||
job['id'],
|
||||
project_name,
|
||||
renderer_colors.get(job['renderer'], '[cyan]') + job['renderer'] + '[default]-' + job['renderer_version'],
|
||||
f"[{priority_color}]{job['priority']}",
|
||||
job_text,
|
||||
elapsed_time,
|
||||
str(max(int(job['total_frames']), 1)),
|
||||
client_title
|
||||
)
|
||||
|
||||
return table
|
||||
|
||||
|
||||
def create_status_panel(all_server_data):
|
||||
for key, value in all_server_data['servers'].items():
|
||||
if key == local_hostname:
|
||||
return str(value['status'])
|
||||
return "no status"
|
||||
|
||||
|
||||
class KeyboardThread(threading.Thread):
|
||||
|
||||
def __init__(self, input_cbk = None, name='keyboard-input-thread'):
|
||||
self.input_cbk = input_cbk
|
||||
super(KeyboardThread, self).__init__(name=name)
|
||||
self.start()
|
||||
|
||||
def run(self):
|
||||
while True:
|
||||
self.input_cbk(input()) #waits to get input + Return
|
||||
|
||||
|
||||
def my_callback(inp):
|
||||
#evaluate the keyboard input
|
||||
print('You Entered:', inp)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
||||
get_server_ip = input("Enter server IP or None for local: ") or local_hostname
|
||||
|
||||
server_proxy = RenderServerProxy(get_server_ip, "8080")
|
||||
|
||||
if not server_proxy.connect():
|
||||
if server_proxy.hostname == local_hostname:
|
||||
start_server_input = input("Local server not running. Start server? (y/n) ")
|
||||
if start_server_input and start_server_input[0].lower() == "y":
|
||||
# Startup the local server
|
||||
start_server()
|
||||
test = server_proxy.connect()
|
||||
print(f"connected? {test}")
|
||||
else:
|
||||
print(f"\nUnable to connect to server: {server_proxy.hostname}")
|
||||
print("\nVerify IP address is correct and server is running")
|
||||
exit(1)
|
||||
|
||||
# start the Keyboard thread
|
||||
# kthread = KeyboardThread(my_callback)
|
||||
|
||||
# Console Layout
|
||||
console = Console()
|
||||
layout = Layout()
|
||||
|
||||
# Divide the "screen" in to three parts
|
||||
layout.split(
|
||||
Layout(name="header", size=3),
|
||||
Layout(ratio=1, name="main")
|
||||
# Layout(size=10, name="footer"),
|
||||
)
|
||||
# Divide the "main" layout in to "side" and "body"
|
||||
layout["main"].split_row(
|
||||
Layout(name="side"),
|
||||
Layout(name="body",
|
||||
ratio=3))
|
||||
# Divide the "side" layout in to two
|
||||
layout["side"].split(Layout(name="side_top"), Layout(name="side_bottom"))
|
||||
|
||||
# Server connection header
|
||||
header_text = Text(f"Connected to server: ")
|
||||
header_text.append(f"{server_proxy.hostname} ", style="green")
|
||||
if server_proxy.hostname == local_hostname:
|
||||
header_text.append("(This Computer)", style="yellow")
|
||||
else:
|
||||
header_text.append("(Remote)", style="magenta")
|
||||
|
||||
# background process to update server data independent of the UI
|
||||
def fetch_server_data(server):
|
||||
while True:
|
||||
fetched_data = server.get_data(timeout=5)
|
||||
if fetched_data:
|
||||
server.fetched_status_data = fetched_data
|
||||
time.sleep(1)
|
||||
|
||||
x = threading.Thread(target=fetch_server_data, args=(server_proxy,))
|
||||
x.daemon = True
|
||||
x.start()
|
||||
|
||||
# draw and update the UI
|
||||
with Live(console=console, screen=False, refresh_per_second=1, transient=True) as live:
|
||||
while True:
|
||||
try:
|
||||
if server_proxy.fetched_status_data:
|
||||
|
||||
server_online = False
|
||||
if server_proxy.fetched_status_data.get('timestamp', None):
|
||||
timestamp = datetime.datetime.fromisoformat(server_proxy.fetched_status_data['timestamp'])
|
||||
time_diff = datetime.datetime.now() - timestamp
|
||||
server_online = time_diff.seconds < 10 # client is offline if not updated in certain time
|
||||
|
||||
layout["body"].update(create_jobs_table(server_proxy.fetched_status_data))
|
||||
layout["side_top"].update(Panel(create_node_tree(server_proxy.fetched_status_data)))
|
||||
layout["side_bottom"].update(Panel(create_status_panel(server_proxy.fetched_status_data)))
|
||||
|
||||
online_text = "Online" if server_online else "Offline"
|
||||
online_color = "green" if server_online else "red"
|
||||
layout["header"].update(Panel(Text(f"Zordon Render Client - Version 0.0.1 alpha - {online_text}",
|
||||
justify="center", style=online_color)))
|
||||
live.update(layout, refresh=False)
|
||||
except Exception as e:
|
||||
print(f"Exception updating table: {e}")
|
||||
traceback.print_exception(e)
|
||||
time.sleep(1)
|
||||
# # # todo: Add input prompt to manage running jobs (ie add, cancel, get info, etc)
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
#!/usr/bin/env python3
|
||||
from src.api.api_server import start_server
|
||||
from init import run
|
||||
|
||||
if __name__ == '__main__':
|
||||
start_server()
|
||||
run(server_only=True)
|
||||
|
||||
8
setup.py
8
setup.py
@@ -11,13 +11,7 @@ from setuptools import setup
|
||||
APP = ['main.py']
|
||||
DATA_FILES = [('config', glob.glob('config/*.*')),
|
||||
('resources', glob.glob('resources/*.*'))]
|
||||
OPTIONS = {
|
||||
'excludes': ['PySide6'],
|
||||
'includes': ['zeroconf', 'zeroconf._services.info'],
|
||||
'plist': {
|
||||
'LSMinimumSystemVersion': '10.15', # Specify minimum macOS version
|
||||
},
|
||||
}
|
||||
OPTIONS = {}
|
||||
|
||||
setup(
|
||||
app=APP,
|
||||
|
||||
@@ -49,12 +49,11 @@ def handle_uploaded_project_files(request, jobs_list, upload_directory):
|
||||
raise ValueError(f"Error downloading file from URL: {project_url}")
|
||||
elif local_path and os.path.exists(local_path):
|
||||
referred_name = os.path.basename(local_path)
|
||||
|
||||
else:
|
||||
raise ValueError("Cannot find any valid project paths")
|
||||
|
||||
# Prepare the local filepath
|
||||
cleaned_path_name = os.path.splitext(referred_name)[0].replace(' ', '_')
|
||||
cleaned_path_name = jobs_list[0].get('name', os.path.splitext(referred_name)[0]).replace(' ', '-')
|
||||
job_dir = os.path.join(upload_directory, '-'.join(
|
||||
[datetime.now().strftime("%Y.%m.%d_%H.%M.%S"), renderer, cleaned_path_name]))
|
||||
os.makedirs(job_dir, exist_ok=True)
|
||||
|
||||
@@ -2,14 +2,12 @@
|
||||
import concurrent.futures
|
||||
import json
|
||||
import logging
|
||||
import multiprocessing
|
||||
import os
|
||||
import pathlib
|
||||
import shutil
|
||||
import socket
|
||||
import ssl
|
||||
import tempfile
|
||||
import threading
|
||||
import time
|
||||
from datetime import datetime
|
||||
from zipfile import ZipFile
|
||||
@@ -17,19 +15,19 @@ from zipfile import ZipFile
|
||||
import psutil
|
||||
import yaml
|
||||
from flask import Flask, request, send_file, after_this_request, Response, redirect, url_for, abort
|
||||
from sqlalchemy.orm.exc import DetachedInstanceError
|
||||
|
||||
from src.api.add_job_helpers import handle_uploaded_project_files, process_zipped_project
|
||||
from src.api.serverproxy_manager import ServerProxyManager
|
||||
from src.api.preview_manager import PreviewManager
|
||||
from src.distributed_job_manager import DistributedJobManager
|
||||
from src.engines.core.base_worker import string_to_status, RenderStatus
|
||||
from src.engines.engine_manager import EngineManager
|
||||
from src.render_queue import RenderQueue, JobNotFoundError
|
||||
from src.utilities.benchmark import cpu_benchmark, disk_io_benchmark
|
||||
from src.utilities.config import Config
|
||||
from src.utilities.misc_helper import system_safe_path, current_system_os, current_system_cpu, \
|
||||
current_system_os_version, num_to_alphanumeric
|
||||
from src.utilities.server_helper import generate_thumbnail_for_job
|
||||
from src.utilities.zeroconf_server import ZeroconfServer
|
||||
from src.utilities.benchmark import cpu_benchmark, disk_io_benchmark
|
||||
|
||||
logger = logging.getLogger()
|
||||
server = Flask(__name__)
|
||||
@@ -39,6 +37,29 @@ categories = [RenderStatus.RUNNING, RenderStatus.ERROR, RenderStatus.NOT_STARTED
|
||||
RenderStatus.COMPLETED, RenderStatus.CANCELLED]
|
||||
|
||||
|
||||
# -- Error Handlers --
|
||||
|
||||
@server.errorhandler(JobNotFoundError)
|
||||
def handle_job_not_found(job_error):
|
||||
return str(job_error), 400
|
||||
|
||||
|
||||
@server.errorhandler(DetachedInstanceError)
|
||||
def handle_detached_instance(error):
|
||||
# logger.debug(f"detached instance: {error}")
|
||||
return "Unavailable", 503
|
||||
|
||||
|
||||
@server.errorhandler(Exception)
|
||||
def handle_general_error(general_error):
|
||||
err_msg = f"Server error: {general_error}"
|
||||
logger.error(err_msg)
|
||||
return err_msg, 500
|
||||
|
||||
|
||||
# -- Jobs --
|
||||
|
||||
|
||||
def sorted_jobs(all_jobs, sort_by_date=True):
|
||||
if not sort_by_date:
|
||||
sorted_job_list = []
|
||||
@@ -60,9 +81,11 @@ def jobs_json():
|
||||
job_cache_int = int(json.dumps(all_jobs).__hash__())
|
||||
job_cache_token = num_to_alphanumeric(job_cache_int)
|
||||
return {'jobs': all_jobs, 'token': job_cache_token}
|
||||
except DetachedInstanceError as e:
|
||||
raise e
|
||||
except Exception as e:
|
||||
logger.exception(f"Exception fetching jobs_json: {e}")
|
||||
return {}, 500
|
||||
logger.error(f"Error fetching jobs_json: {e}")
|
||||
raise e
|
||||
|
||||
|
||||
@server.get('/api/jobs_long_poll')
|
||||
@@ -78,50 +101,40 @@ def long_polling_jobs():
|
||||
if time.time() - start_time > 30:
|
||||
return {}, 204
|
||||
time.sleep(1)
|
||||
except DetachedInstanceError as e:
|
||||
raise e
|
||||
except Exception as e:
|
||||
logger.exception(f"Exception fetching long_polling_jobs: {e}")
|
||||
return {}, 500
|
||||
logger.error(f"Error fetching long_polling_jobs: {e}")
|
||||
raise e
|
||||
|
||||
|
||||
@server.route('/api/job/<job_id>/thumbnail')
|
||||
def job_thumbnail(job_id):
|
||||
|
||||
try:
|
||||
big_thumb = request.args.get('size', False) == "big"
|
||||
video_ok = request.args.get('video_ok', False)
|
||||
found_job = RenderQueue.job_with_id(job_id, none_ok=True)
|
||||
if found_job:
|
||||
found_job = RenderQueue.job_with_id(job_id, none_ok=False)
|
||||
|
||||
os.makedirs(server.config['THUMBS_FOLDER'], exist_ok=True)
|
||||
thumb_video_path = os.path.join(server.config['THUMBS_FOLDER'], found_job.id + '.mp4')
|
||||
thumb_image_path = os.path.join(server.config['THUMBS_FOLDER'], found_job.id + '.jpg')
|
||||
big_video_path = os.path.join(server.config['THUMBS_FOLDER'], found_job.id + '_big.mp4')
|
||||
big_image_path = os.path.join(server.config['THUMBS_FOLDER'], found_job.id + '_big.jpg')
|
||||
# trigger a thumbnail update - just in case
|
||||
PreviewManager.update_previews_for_job(found_job, wait_until_completion=True, timeout=60)
|
||||
previews = PreviewManager.get_previews_for_job(found_job)
|
||||
all_previews_list = previews.get('output', previews.get('input', []))
|
||||
|
||||
# generate regular thumb if it doesn't exist
|
||||
if not os.path.exists(thumb_video_path) and not os.path.exists(thumb_video_path + '_IN-PROGRESS') and \
|
||||
found_job.status not in [RenderStatus.CANCELLED, RenderStatus.ERROR]:
|
||||
generate_thumbnail_for_job(found_job, thumb_video_path, thumb_image_path, max_width=240)
|
||||
video_previews = [x for x in all_previews_list if x['kind'] == 'video']
|
||||
image_previews = [x for x in all_previews_list if x['kind'] == 'image']
|
||||
filtered_list = video_previews if video_previews and video_ok else image_previews
|
||||
|
||||
# generate big thumb if it doesn't exist
|
||||
if not os.path.exists(big_video_path) and not os.path.exists(big_image_path + '_IN-PROGRESS') and \
|
||||
found_job.status not in [RenderStatus.CANCELLED, RenderStatus.ERROR]:
|
||||
generate_thumbnail_for_job(found_job, big_video_path, big_image_path, max_width=800)
|
||||
|
||||
# generated videos
|
||||
if video_ok:
|
||||
if big_thumb and os.path.exists(big_video_path) and not os.path.exists(
|
||||
big_video_path + '_IN-PROGRESS'):
|
||||
return send_file(big_video_path, mimetype="video/mp4")
|
||||
elif os.path.exists(thumb_video_path) and not os.path.exists(thumb_video_path + '_IN-PROGRESS'):
|
||||
return send_file(thumb_video_path, mimetype="video/mp4")
|
||||
|
||||
# Generated thumbs
|
||||
if big_thumb and os.path.exists(big_image_path):
|
||||
return send_file(big_image_path, mimetype='image/jpeg')
|
||||
elif os.path.exists(thumb_image_path):
|
||||
return send_file(thumb_image_path, mimetype='image/jpeg')
|
||||
|
||||
return found_job.status.value, 200
|
||||
return found_job.status.value, 404
|
||||
# todo - sort by size or other metrics here
|
||||
if filtered_list:
|
||||
preview_to_send = filtered_list[0]
|
||||
mime_types = {'image': 'image/jpeg', 'video': 'video/mp4'}
|
||||
file_mime_type = mime_types.get(preview_to_send['kind'], 'unknown')
|
||||
return send_file(preview_to_send['filename'], mimetype=file_mime_type)
|
||||
except Exception as e:
|
||||
logger.error(f'Error getting thumbnail: {e}')
|
||||
return f'Error getting thumbnail: {e}', 500
|
||||
return "No thumbnail available", 404
|
||||
|
||||
|
||||
# Get job file routing
|
||||
@@ -146,22 +159,17 @@ def filtered_jobs_json(status_val):
|
||||
return f'Cannot find jobs with status {status_val}', 400
|
||||
|
||||
|
||||
@server.post('/api/job/<job_id>/notify_parent_of_status_change')
|
||||
def subjob_status_change(job_id):
|
||||
@server.post('/api/job/<job_id>/send_subjob_update_notification')
|
||||
def subjob_update_notification(job_id):
|
||||
try:
|
||||
subjob_details = request.json
|
||||
logger.info(f"Subjob to job id: {job_id} is now {subjob_details['status']}")
|
||||
DistributedJobManager.handle_subjob_status_change(RenderQueue.job_with_id(job_id), subjob_data=subjob_details)
|
||||
DistributedJobManager.handle_subjob_update_notification(RenderQueue.job_with_id(job_id), subjob_data=subjob_details)
|
||||
return Response(status=200)
|
||||
except JobNotFoundError:
|
||||
return "Job not found", 404
|
||||
|
||||
|
||||
@server.errorhandler(JobNotFoundError)
|
||||
def handle_job_not_found(job_error):
|
||||
return f'Cannot find job with ID {job_error.job_id}', 400
|
||||
|
||||
|
||||
@server.get('/api/job/<job_id>')
|
||||
def get_job_status(job_id):
|
||||
return RenderQueue.job_with_id(job_id).json()
|
||||
@@ -180,7 +188,22 @@ def get_job_logs(job_id):
|
||||
|
||||
@server.get('/api/job/<job_id>/file_list')
|
||||
def get_file_list(job_id):
|
||||
return RenderQueue.job_with_id(job_id).file_list()
|
||||
return [os.path.basename(x) for x in RenderQueue.job_with_id(job_id).file_list()]
|
||||
|
||||
|
||||
@server.route('/api/job/<job_id>/download')
|
||||
def download_file(job_id):
|
||||
|
||||
requested_filename = request.args.get('filename')
|
||||
if not requested_filename:
|
||||
return 'Filename required', 400
|
||||
|
||||
found_job = RenderQueue.job_with_id(job_id)
|
||||
for job_filename in found_job.file_list():
|
||||
if os.path.basename(job_filename).lower() == requested_filename.lower():
|
||||
return send_file(job_filename, as_attachment=True, )
|
||||
|
||||
return f"File '{requested_filename}' not found", 404
|
||||
|
||||
|
||||
@server.route('/api/job/<job_id>/download_all')
|
||||
@@ -305,14 +328,10 @@ def delete_job(job_id):
|
||||
if server.config['UPLOAD_FOLDER'] in output_dir and os.path.exists(output_dir):
|
||||
shutil.rmtree(output_dir)
|
||||
|
||||
# Remove any thumbnails
|
||||
for filename in os.listdir(server.config['THUMBS_FOLDER']):
|
||||
if job_id in filename:
|
||||
os.remove(os.path.join(server.config['THUMBS_FOLDER'], filename))
|
||||
|
||||
thumb_path = os.path.join(server.config['THUMBS_FOLDER'], found_job.id + '.mp4')
|
||||
if os.path.exists(thumb_path):
|
||||
os.remove(thumb_path)
|
||||
try:
|
||||
PreviewManager.delete_previews_for_job(found_job)
|
||||
except Exception as e:
|
||||
logger.error(f"Error deleting previews for {found_job}: {e}")
|
||||
|
||||
# See if we own the project_dir (i.e. was it uploaded)
|
||||
project_dir = os.path.dirname(os.path.dirname(found_job.input_path))
|
||||
@@ -463,7 +482,6 @@ def delete_engine_download():
|
||||
@server.get('/api/renderer/<renderer>/args')
|
||||
def get_renderer_args(renderer):
|
||||
try:
|
||||
# todo: possibly deprecate
|
||||
renderer_engine_class = EngineManager.engine_with_name(renderer)
|
||||
return renderer_engine_class().get_arguments()
|
||||
except LookupError:
|
||||
@@ -490,60 +508,24 @@ def get_disk_benchmark():
|
||||
return {'write_speed': results[0], 'read_speed': results[-1]}
|
||||
|
||||
|
||||
def start_server():
|
||||
def eval_loop(delay_sec=1):
|
||||
while True:
|
||||
RenderQueue.evaluate_queue()
|
||||
time.sleep(delay_sec)
|
||||
def start_server(hostname=None):
|
||||
|
||||
# get hostname
|
||||
if not hostname:
|
||||
local_hostname = socket.gethostname()
|
||||
local_hostname = local_hostname + (".local" if not local_hostname.endswith(".local") else "")
|
||||
hostname = local_hostname + (".local" if not local_hostname.endswith(".local") else "")
|
||||
|
||||
# load flask settings
|
||||
server.config['HOSTNAME'] = local_hostname
|
||||
server.config['HOSTNAME'] = hostname
|
||||
server.config['PORT'] = int(Config.port_number)
|
||||
server.config['UPLOAD_FOLDER'] = system_safe_path(os.path.expanduser(Config.upload_folder))
|
||||
server.config['THUMBS_FOLDER'] = system_safe_path(os.path.join(os.path.expanduser(Config.upload_folder), 'thumbs'))
|
||||
server.config['MAX_CONTENT_PATH'] = Config.max_content_path
|
||||
server.config['enable_split_jobs'] = Config.enable_split_jobs
|
||||
|
||||
# Setup directory for saving engines to
|
||||
EngineManager.engines_path = system_safe_path(os.path.join(os.path.join(os.path.expanduser(Config.upload_folder),
|
||||
'engines')))
|
||||
os.makedirs(EngineManager.engines_path, exist_ok=True)
|
||||
|
||||
# Debug info
|
||||
logger.debug(f"Upload directory: {server.config['UPLOAD_FOLDER']}")
|
||||
logger.debug(f"Thumbs directory: {server.config['THUMBS_FOLDER']}")
|
||||
logger.debug(f"Engines directory: {EngineManager.engines_path}")
|
||||
|
||||
# disable most Flask logging
|
||||
flask_log = logging.getLogger('werkzeug')
|
||||
flask_log.setLevel(Config.flask_log_level.upper())
|
||||
|
||||
# check for updates for render engines if configured or on first launch
|
||||
if Config.update_engines_on_launch or not EngineManager.get_engines():
|
||||
EngineManager.update_all_engines()
|
||||
|
||||
# Set up the RenderQueue object
|
||||
RenderQueue.load_state(database_directory=server.config['UPLOAD_FOLDER'])
|
||||
ServerProxyManager.subscribe_to_listener()
|
||||
DistributedJobManager.subscribe_to_listener()
|
||||
|
||||
thread = threading.Thread(target=eval_loop, kwargs={'delay_sec': Config.queue_eval_seconds}, daemon=True)
|
||||
thread.start()
|
||||
|
||||
logger.info(f"Starting Zordon Render Server - Hostname: '{server.config['HOSTNAME']}:'")
|
||||
ZeroconfServer.configure("_zordon._tcp.local.", server.config['HOSTNAME'], server.config['PORT'])
|
||||
ZeroconfServer.properties = {'system_cpu': current_system_cpu(), 'system_cpu_cores': multiprocessing.cpu_count(),
|
||||
'system_os': current_system_os(),
|
||||
'system_os_version': current_system_os_version()}
|
||||
ZeroconfServer.start()
|
||||
|
||||
try:
|
||||
server.run(host='0.0.0.0', port=server.config['PORT'], debug=Config.flask_debug_enable,
|
||||
use_reloader=False, threaded=True)
|
||||
finally:
|
||||
RenderQueue.save_state()
|
||||
ZeroconfServer.stop()
|
||||
logger.debug('Starting API server')
|
||||
server.run(host='0.0.0.0', port=server.config['PORT'], debug=Config.flask_debug_enable, use_reloader=False,
|
||||
threaded=True)
|
||||
|
||||
113
src/api/preview_manager.py
Normal file
113
src/api/preview_manager.py
Normal file
@@ -0,0 +1,113 @@
|
||||
import logging
|
||||
import os
|
||||
import subprocess
|
||||
import threading
|
||||
from pathlib import Path
|
||||
|
||||
from src.utilities.ffmpeg_helper import generate_thumbnail, save_first_frame
|
||||
|
||||
logger = logging.getLogger()
|
||||
supported_video_formats = ['.mp4', '.mov', '.avi', '.mpg', '.mpeg', '.mxf', '.m4v', 'mkv']
|
||||
supported_image_formats = ['.jpg', '.png', '.exr', '.tif']
|
||||
|
||||
|
||||
class PreviewManager:
|
||||
|
||||
storage_path = None
|
||||
_running_jobs = {}
|
||||
|
||||
@classmethod
|
||||
def __generate_job_preview_worker(cls, job, replace_existing=False, max_width=320):
|
||||
|
||||
# Determine best source file to use for thumbs
|
||||
job_file_list = job.file_list()
|
||||
source_files = job_file_list if job_file_list else [job.input_path]
|
||||
preview_label = "output" if job_file_list else "input"
|
||||
|
||||
# filter by type
|
||||
found_image_files = [f for f in source_files if os.path.splitext(f)[-1].lower() in supported_image_formats]
|
||||
found_video_files = [f for f in source_files if os.path.splitext(f)[-1].lower() in supported_video_formats]
|
||||
|
||||
# check if we even have any valid files to work from
|
||||
if source_files and not found_video_files and not found_image_files:
|
||||
logger.warning(f"No valid image or video files found in files from job: {job}")
|
||||
return
|
||||
|
||||
os.makedirs(cls.storage_path, exist_ok=True)
|
||||
base_path = os.path.join(cls.storage_path, f"{job.id}-{preview_label}-{max_width}")
|
||||
preview_video_path = base_path + '.mp4'
|
||||
preview_image_path = base_path + '.jpg'
|
||||
|
||||
if replace_existing:
|
||||
for x in [preview_image_path, preview_video_path]:
|
||||
try:
|
||||
os.remove(x)
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
# Generate image previews
|
||||
if (found_video_files or found_image_files) and not os.path.exists(preview_image_path):
|
||||
try:
|
||||
path_of_source = found_image_files[-1] if found_image_files else found_video_files[-1]
|
||||
logger.debug(f"Generating image preview for {path_of_source}")
|
||||
save_first_frame(source_path=path_of_source, dest_path=preview_image_path, max_width=max_width)
|
||||
logger.debug(f"Successfully created image preview for {path_of_source}")
|
||||
except Exception as e:
|
||||
logger.error(f"Error generating image preview for {job}: {e}")
|
||||
|
||||
# Generate video previews
|
||||
if found_video_files and not os.path.exists(preview_video_path):
|
||||
try:
|
||||
path_of_source = found_video_files[0]
|
||||
logger.debug(f"Generating video preview for {path_of_source}")
|
||||
generate_thumbnail(source_path=path_of_source, dest_path=preview_video_path, max_width=max_width)
|
||||
logger.debug(f"Successfully created video preview for {path_of_source}")
|
||||
except subprocess.CalledProcessError as e:
|
||||
logger.error(f"Error generating video preview for {job}: {e}")
|
||||
|
||||
@classmethod
|
||||
def update_previews_for_job(cls, job, replace_existing=False, wait_until_completion=False, timeout=None):
|
||||
job_thread = cls._running_jobs.get(job.id)
|
||||
if job_thread and job_thread.is_alive():
|
||||
logger.debug(f'Preview generation job already running for {job}')
|
||||
else:
|
||||
job_thread = threading.Thread(target=cls.__generate_job_preview_worker, args=(job, replace_existing,))
|
||||
job_thread.start()
|
||||
cls._running_jobs[job.id] = job_thread
|
||||
|
||||
if wait_until_completion:
|
||||
job_thread.join(timeout=timeout)
|
||||
|
||||
@classmethod
|
||||
def get_previews_for_job(cls, job):
|
||||
|
||||
results = {}
|
||||
try:
|
||||
directory_path = Path(cls.storage_path)
|
||||
preview_files_for_job = [f for f in directory_path.iterdir() if f.is_file() and f.name.startswith(job.id)]
|
||||
|
||||
for preview_filename in preview_files_for_job:
|
||||
try:
|
||||
pixel_width = str(preview_filename).split('-')[-1]
|
||||
preview_label = str(os.path.basename(preview_filename)).split('-')[1]
|
||||
extension = os.path.splitext(preview_filename)[-1].lower()
|
||||
kind = 'video' if extension in supported_video_formats else \
|
||||
'image' if extension in supported_image_formats else 'unknown'
|
||||
results[preview_label] = results.get(preview_label, [])
|
||||
results[preview_label].append({'filename': str(preview_filename), 'width': pixel_width, 'kind': kind})
|
||||
except IndexError: # ignore invalid filenames
|
||||
pass
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
return results
|
||||
|
||||
@classmethod
|
||||
def delete_previews_for_job(cls, job):
|
||||
all_previews = cls.get_previews_for_job(job)
|
||||
flattened_list = [item for sublist in all_previews.values() for item in sublist]
|
||||
for preview in flattened_list:
|
||||
try:
|
||||
logger.debug(f"Removing preview: {preview['filename']}")
|
||||
os.remove(preview['filename'])
|
||||
except OSError as e:
|
||||
logger.error(f"Error removing preview '{preview.get('filename')}': {e}")
|
||||
@@ -171,19 +171,19 @@ class RenderServerProxy:
|
||||
def get_all_engines(self):
|
||||
return self.request_data('all_engines')
|
||||
|
||||
def notify_parent_of_status_change(self, parent_id, subjob):
|
||||
def send_subjob_update_notification(self, parent_id, subjob):
|
||||
"""
|
||||
Notifies the parent job of a status change in a subjob.
|
||||
Notifies the parent job of an update in a subjob.
|
||||
|
||||
Args:
|
||||
parent_id (str): The ID of the parent job.
|
||||
subjob (Job): The subjob that has changed status.
|
||||
subjob (Job): The subjob that has updated.
|
||||
|
||||
Returns:
|
||||
Response: The response from the server.
|
||||
"""
|
||||
hostname = LOOPBACK if self.is_localhost else self.hostname
|
||||
return requests.post(f'http://{hostname}:{self.port}/api/job/{parent_id}/notify_parent_of_status_change',
|
||||
return requests.post(f'http://{hostname}:{self.port}/api/job/{parent_id}/send_subjob_update_notification',
|
||||
json=subjob.json())
|
||||
|
||||
def post_job_to_server(self, file_path, job_list, callback=None):
|
||||
@@ -232,19 +232,27 @@ class RenderServerProxy:
|
||||
except Exception as e:
|
||||
logger.error(f"An error occurred: {e}")
|
||||
|
||||
def get_job_files(self, job_id, save_path):
|
||||
def get_job_files_list(self, job_id):
|
||||
return self.request_data(f"job/{job_id}/file_list")
|
||||
|
||||
def download_all_job_files(self, job_id, save_path):
|
||||
hostname = LOOPBACK if self.is_localhost else self.hostname
|
||||
url = f"http://{hostname}:{self.port}/api/job/{job_id}/download_all"
|
||||
return self.download_file(url, filename=save_path)
|
||||
return self.__download_file_from_url(url, output_filepath=save_path)
|
||||
|
||||
def download_job_file(self, job_id, job_filename, save_path):
|
||||
hostname = LOOPBACK if self.is_localhost else self.hostname
|
||||
url = f"http://{hostname}:{self.port}/api/job/{job_id}/download?filename={job_filename}"
|
||||
return self.__download_file_from_url(url, output_filepath=save_path)
|
||||
|
||||
@staticmethod
|
||||
def download_file(url, filename):
|
||||
def __download_file_from_url(url, output_filepath):
|
||||
with requests.get(url, stream=True) as r:
|
||||
r.raise_for_status()
|
||||
with open(filename, 'wb') as f:
|
||||
with open(output_filepath, 'wb') as f:
|
||||
for chunk in r.iter_content(chunk_size=8192):
|
||||
f.write(chunk)
|
||||
return filename
|
||||
return output_filepath
|
||||
|
||||
# --- Renderer --- #
|
||||
|
||||
|
||||
@@ -10,9 +10,11 @@ import requests
|
||||
from plyer import notification
|
||||
from pubsub import pub
|
||||
|
||||
from src.api.preview_manager import PreviewManager
|
||||
from src.api.server_proxy import RenderServerProxy
|
||||
from src.engines.engine_manager import EngineManager
|
||||
from src.render_queue import RenderQueue
|
||||
from src.utilities.config import Config
|
||||
from src.utilities.misc_helper import get_file_size_human
|
||||
from src.utilities.status_utils import RenderStatus, string_to_status
|
||||
from src.utilities.zeroconf_server import ZeroconfServer
|
||||
@@ -32,6 +34,43 @@ class DistributedJobManager:
|
||||
This should be called once, typically during the initialization phase.
|
||||
"""
|
||||
pub.subscribe(cls.__local_job_status_changed, 'status_change')
|
||||
pub.subscribe(cls.__local_job_frame_complete, 'frame_complete')
|
||||
|
||||
@classmethod
|
||||
def __local_job_frame_complete(cls, job_id, frame_number, update_interval=5):
|
||||
|
||||
"""
|
||||
Responds to the 'frame_complete' pubsub message for local jobs.
|
||||
|
||||
Parameters:
|
||||
job_id (str): The ID of the job that has changed status.
|
||||
old_status (str): The previous status of the job.
|
||||
new_status (str): The new (current) status of the job.
|
||||
|
||||
Note: Do not call directly. Instead, call via the 'frame_complete' pubsub message.
|
||||
"""
|
||||
|
||||
render_job = RenderQueue.job_with_id(job_id, none_ok=True)
|
||||
if not render_job: # ignore jobs not in the queue
|
||||
return
|
||||
|
||||
logger.debug(f"Job {job_id} has completed frame #{frame_number}")
|
||||
replace_existing_previews = (frame_number % update_interval) == 0
|
||||
cls.__job_update_shared(render_job, replace_existing_previews)
|
||||
|
||||
@classmethod
|
||||
def __job_update_shared(cls, render_job, replace_existing_previews=False):
|
||||
# update previews
|
||||
PreviewManager.update_previews_for_job(job=render_job, replace_existing=replace_existing_previews)
|
||||
|
||||
# notify parent to allow individual frames to be copied instead of waiting until the end
|
||||
if render_job.parent:
|
||||
parent_id, parent_hostname = render_job.parent.split('@')[0], render_job.parent.split('@')[-1]
|
||||
try:
|
||||
logger.debug(f'Job {render_job.id} updating parent {parent_id}@{parent_hostname}')
|
||||
RenderServerProxy(parent_hostname).send_subjob_update_notification(parent_id, render_job)
|
||||
except Exception as e:
|
||||
logger.error(f"Error notifying parent {parent_hostname} about update in subjob {render_job.id}: {e}")
|
||||
|
||||
@classmethod
|
||||
def __local_job_status_changed(cls, job_id, old_status, new_status):
|
||||
@@ -52,15 +91,15 @@ class DistributedJobManager:
|
||||
return
|
||||
|
||||
logger.debug(f"Job {job_id} status change: {old_status} -> {new_status}")
|
||||
if render_job.parent: # If local job is a subjob from a remote server
|
||||
parent_id, hostname = render_job.parent.split('@')[0], render_job.parent.split('@')[-1]
|
||||
RenderServerProxy(hostname).notify_parent_of_status_change(parent_id=parent_id, subjob=render_job)
|
||||
|
||||
# handle cancelling all the children
|
||||
elif render_job.children and new_status in [RenderStatus.CANCELLED, RenderStatus.ERROR]:
|
||||
cls.__job_update_shared(render_job, replace_existing_previews=(render_job.status == RenderStatus.COMPLETED))
|
||||
|
||||
# Handle children
|
||||
if render_job.children:
|
||||
if new_status in [RenderStatus.CANCELLED, RenderStatus.ERROR]: # Cancel children if necessary
|
||||
for child in render_job.children:
|
||||
child_id, hostname = child.split('@')
|
||||
RenderServerProxy(hostname).cancel_job(child_id, confirm=True)
|
||||
child_id, child_hostname = child.split('@')
|
||||
RenderServerProxy(child_hostname).cancel_job(child_id, confirm=True)
|
||||
|
||||
# UI Notifications
|
||||
try:
|
||||
@@ -97,8 +136,7 @@ class DistributedJobManager:
|
||||
"""
|
||||
Creates render jobs.
|
||||
|
||||
This method takes a list of job data, a local path to a loaded project, and a job directory. It creates a render
|
||||
job for each job data in the list and appends the result to a list. The list of results is then returned.
|
||||
This method job data and a local path to a loaded project. It creates and returns new a render job.
|
||||
|
||||
Args:
|
||||
job_data (dict): Job data.
|
||||
@@ -134,6 +172,7 @@ class DistributedJobManager:
|
||||
worker.priority = int(job_data.get('priority', worker.priority))
|
||||
worker.start_frame = int(job_data.get("start_frame", worker.start_frame))
|
||||
worker.end_frame = int(job_data.get("end_frame", worker.end_frame))
|
||||
worker.watchdog_timeout = Config.worker_process_timeout
|
||||
worker.hostname = socket.gethostname()
|
||||
|
||||
# determine if we can / should split the job
|
||||
@@ -143,6 +182,7 @@ class DistributedJobManager:
|
||||
logger.debug("Not splitting into subjobs")
|
||||
|
||||
RenderQueue.add_to_render_queue(worker, force_start=job_data.get('force_start', False))
|
||||
PreviewManager.update_previews_for_job(worker)
|
||||
|
||||
return worker
|
||||
|
||||
@@ -151,9 +191,9 @@ class DistributedJobManager:
|
||||
# --------------------------------------------
|
||||
|
||||
@classmethod
|
||||
def handle_subjob_status_change(cls, local_job, subjob_data):
|
||||
def handle_subjob_update_notification(cls, local_job, subjob_data):
|
||||
"""
|
||||
Responds to a status change from a remote subjob and triggers the creation or modification of subjobs as needed.
|
||||
Responds to a notification from a remote subjob and the host requests any subsequent updates from the subjob.
|
||||
|
||||
Args:
|
||||
local_job (BaseRenderWorker): The local parent job worker.
|
||||
@@ -162,27 +202,40 @@ class DistributedJobManager:
|
||||
|
||||
subjob_status = string_to_status(subjob_data['status'])
|
||||
subjob_id = subjob_data['id']
|
||||
subjob_hostname = next((hostname.split('@')[1] for hostname in local_job.children if
|
||||
hostname.split('@')[0] == subjob_id), None)
|
||||
local_job.children[f'{subjob_id}@{subjob_hostname}'] = subjob_data
|
||||
subjob_hostname = subjob_data['hostname']
|
||||
subjob_key = f'{subjob_id}@{subjob_hostname}'
|
||||
old_status = local_job.children.get(subjob_key, {}).get('status')
|
||||
local_job.children[subjob_key] = subjob_data
|
||||
|
||||
logname = f"{local_job.id}:{subjob_id}@{subjob_hostname}"
|
||||
logname = f"<Parent: {local_job.id} | Child: {subjob_key}>"
|
||||
if old_status != subjob_status.value:
|
||||
logger.debug(f"Subjob status changed: {logname} -> {subjob_status.value}")
|
||||
|
||||
# Download complete or partial render jobs
|
||||
if subjob_status in [RenderStatus.COMPLETED, RenderStatus.CANCELLED, RenderStatus.ERROR] and \
|
||||
subjob_data['file_count']:
|
||||
download_result = cls.download_from_subjob(local_job, subjob_id, subjob_hostname)
|
||||
if not download_result:
|
||||
# todo: handle error
|
||||
logger.error(f"Unable to download subjob files from {logname} with status {subjob_status.value}")
|
||||
|
||||
if subjob_status == RenderStatus.CANCELLED or subjob_status == RenderStatus.ERROR:
|
||||
# todo: determine missing frames and schedule new job
|
||||
pass
|
||||
cls.download_missing_frames_from_subjob(local_job, subjob_id, subjob_hostname)
|
||||
|
||||
@staticmethod
|
||||
def download_from_subjob(local_job, subjob_id, subjob_hostname):
|
||||
def download_missing_frames_from_subjob(local_job, subjob_id, subjob_hostname):
|
||||
|
||||
try:
|
||||
local_files = [os.path.basename(x) for x in local_job.file_list()]
|
||||
subjob_proxy = RenderServerProxy(subjob_hostname)
|
||||
subjob_files = subjob_proxy.get_job_files_list(job_id=subjob_id) or []
|
||||
|
||||
for subjob_filename in subjob_files:
|
||||
if subjob_filename not in local_files:
|
||||
try:
|
||||
logger.debug(f"Downloading new file '{subjob_filename}' from {subjob_hostname}")
|
||||
local_save_path = os.path.join(os.path.dirname(local_job.output_path), subjob_filename)
|
||||
subjob_proxy.download_job_file(job_id=subjob_id, job_filename=subjob_filename,
|
||||
save_path=local_save_path)
|
||||
logger.debug(f'Downloaded successfully - {local_save_path}')
|
||||
except Exception as e:
|
||||
logger.error(f"Error downloading file '{subjob_filename}' from {subjob_hostname}: {e}")
|
||||
except Exception as e:
|
||||
logger.exception(f'Uncaught exception while trying to download from subjob: {e}')
|
||||
|
||||
@staticmethod
|
||||
def download_all_from_subjob(local_job, subjob_id, subjob_hostname):
|
||||
"""
|
||||
Downloads and extracts files from a completed subjob on a remote server.
|
||||
|
||||
@@ -203,7 +256,7 @@ class DistributedJobManager:
|
||||
try:
|
||||
local_job.children[child_key]['download_status'] = 'working'
|
||||
logger.info(f"Downloading completed subjob files from {subjob_hostname} to localhost")
|
||||
RenderServerProxy(subjob_hostname).get_job_files(subjob_id, zip_file_path)
|
||||
RenderServerProxy(subjob_hostname).download_all_job_files(subjob_id, zip_file_path)
|
||||
logger.info(f"File transfer complete for {logname} - Transferred {get_file_size_human(zip_file_path)}")
|
||||
except Exception as e:
|
||||
logger.error(f"Error downloading files from remote server: {e}")
|
||||
@@ -227,6 +280,7 @@ class DistributedJobManager:
|
||||
|
||||
@classmethod
|
||||
def wait_for_subjobs(cls, local_job):
|
||||
# todo: rewrite this method
|
||||
logger.debug(f"Waiting for subjobs for job {local_job}")
|
||||
local_job.status = RenderStatus.WAITING_FOR_SUBJOBS
|
||||
statuses_to_download = [RenderStatus.CANCELLED, RenderStatus.ERROR, RenderStatus.COMPLETED]
|
||||
@@ -266,10 +320,10 @@ class DistributedJobManager:
|
||||
|
||||
# Check if job is finished, but has not had files copied yet over yet
|
||||
if download_status is None and subjob_data['file_count'] and status in statuses_to_download:
|
||||
download_result = cls.download_from_subjob(local_job, subjob_id, subjob_hostname)
|
||||
if not download_result:
|
||||
logger.error("Failed to download from subjob")
|
||||
# todo: error handling here
|
||||
try:
|
||||
cls.download_missing_frames_from_subjob(local_job, subjob_id, subjob_hostname)
|
||||
except Exception as e:
|
||||
logger.error(f"Error downloading missing frames from subjob: {e}")
|
||||
|
||||
# Any finished jobs not successfully downloaded at this point are skipped
|
||||
if local_job.children[child_key].get('download_status', None) is None and \
|
||||
|
||||
@@ -1,74 +1,22 @@
|
||||
import glob
|
||||
import logging
|
||||
import subprocess
|
||||
|
||||
from src.engines.core.base_engine import BaseRenderEngine
|
||||
|
||||
logger = logging.getLogger()
|
||||
|
||||
|
||||
class AERender(BaseRenderEngine):
|
||||
|
||||
file_extensions = ['aepx']
|
||||
supported_extensions = ['.aep']
|
||||
|
||||
def version(self):
|
||||
version = None
|
||||
try:
|
||||
render_path = self.renderer_path()
|
||||
if render_path:
|
||||
ver_out = subprocess.run([render_path, '-version'], capture_output=True, text=True)
|
||||
version = ver_out.stdout.split(" ")[-1].strip()
|
||||
ver_out = subprocess.check_output([render_path, '-version'], timeout=SUBPROCESS_TIMEOUT)
|
||||
version = ver_out.decode('utf-8').split(" ")[-1].strip()
|
||||
except Exception as e:
|
||||
logger.error(f'Failed to get {self.name()} version: {e}')
|
||||
return version
|
||||
|
||||
@classmethod
|
||||
def default_renderer_path(cls):
|
||||
paths = glob.glob('/Applications/*After Effects*/aerender')
|
||||
if len(paths) > 1:
|
||||
logger.warning('Multiple After Effects installations detected')
|
||||
elif not paths:
|
||||
logger.error('After Effects installation not found')
|
||||
return paths[0]
|
||||
|
||||
def get_project_info(self, project_path, timeout=10):
|
||||
scene_info = {}
|
||||
try:
|
||||
import xml.etree.ElementTree as ET
|
||||
|
||||
tree = ET.parse(project_path)
|
||||
root = tree.getroot()
|
||||
namespace = {'ae': 'http://www.adobe.com/products/aftereffects'}
|
||||
|
||||
comp_names = []
|
||||
for item in root.findall(".//ae:Item", namespace):
|
||||
if item.find("ae:Layr", namespace) is not None:
|
||||
for string in item.findall("./ae:string", namespace):
|
||||
comp_names.append(string.text)
|
||||
scene_info['comp_names'] = comp_names
|
||||
except Exception as e:
|
||||
logger.error(f'Error getting file details for .aepx file: {e}')
|
||||
return scene_info
|
||||
|
||||
def run_javascript(self, script_path, project_path, timeout=None):
|
||||
# todo: implement
|
||||
pass
|
||||
|
||||
@classmethod
|
||||
def get_output_formats(cls):
|
||||
# todo: create implementation
|
||||
return []
|
||||
|
||||
def ui_options(self, project_info):
|
||||
from src.engines.aerender.aerender_ui import AERenderUI
|
||||
return AERenderUI.get_options(self, project_info)
|
||||
|
||||
@classmethod
|
||||
def worker_class(cls):
|
||||
from src.engines.aerender.aerender_worker import AERenderWorker
|
||||
return AERenderWorker
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
x = AERender().get_project_info('/Users/brett/ae_testing/project.aepx')
|
||||
print(x)
|
||||
|
||||
@@ -1,8 +0,0 @@
|
||||
|
||||
class AERenderUI:
|
||||
@staticmethod
|
||||
def get_options(instance, project_info):
|
||||
options = [
|
||||
{'name': 'comp', 'options': project_info.get('comp_names', [])}
|
||||
]
|
||||
return options
|
||||
@@ -9,39 +9,72 @@ import time
|
||||
from src.engines.core.base_worker import BaseRenderWorker, timecode_to_frames
|
||||
from src.engines.aerender.aerender_engine import AERender
|
||||
|
||||
logger = logging.getLogger()
|
||||
|
||||
def aerender_path():
|
||||
paths = glob.glob('/Applications/*After Effects*/aerender')
|
||||
if len(paths) > 1:
|
||||
logging.warning('Multiple After Effects installations detected')
|
||||
elif not paths:
|
||||
logging.error('After Effects installation not found')
|
||||
else:
|
||||
return paths[0]
|
||||
|
||||
|
||||
class AERenderWorker(BaseRenderWorker):
|
||||
|
||||
supported_extensions = ['.aep']
|
||||
engine = AERender
|
||||
|
||||
def __init__(self, input_path, output_path, engine_path, args=None, parent=None, name=None):
|
||||
super(AERenderWorker, self).__init__(input_path=input_path, output_path=output_path, engine_path=engine_path,
|
||||
args=args, parent=parent, name=name)
|
||||
def __init__(self, input_path, output_path, args=None, parent=None, name=None):
|
||||
super(AERenderWorker, self).__init__(input_path=input_path, output_path=output_path, args=args,
|
||||
parent=parent, name=name)
|
||||
|
||||
# temp files for processing stdout
|
||||
self.__progress_history = []
|
||||
self.__temp_attributes = {}
|
||||
self.comp = args.get('comp', None)
|
||||
self.render_settings = args.get('render_settings', None)
|
||||
self.omsettings = args.get('omsettings', None)
|
||||
|
||||
self.progress = 0
|
||||
self.progress_history = []
|
||||
self.attributes = {}
|
||||
|
||||
def generate_worker_subprocess(self):
|
||||
|
||||
comp = self.args.get('comp', 'Comp 1')
|
||||
render_settings = self.args.get('render_settings', None)
|
||||
omsettings = self.args.get('omsettings', None)
|
||||
|
||||
command = [self.renderer_path, '-project', self.input_path, '-comp', f'"{comp}"']
|
||||
|
||||
if render_settings:
|
||||
command.extend(['-RStemplate', render_settings])
|
||||
|
||||
if omsettings:
|
||||
command.extend(['-OMtemplate', omsettings])
|
||||
|
||||
command.extend(['-s', self.start_frame,
|
||||
'-e', self.end_frame,
|
||||
'-output', self.output_path])
|
||||
return command
|
||||
if os.path.exists('nexrender-cli-macos'):
|
||||
logging.info('nexrender found')
|
||||
# {
|
||||
# "template": {
|
||||
# "src": String,
|
||||
# "composition": String,
|
||||
#
|
||||
# "frameStart": Number,
|
||||
# "frameEnd": Number,
|
||||
# "frameIncrement": Number,
|
||||
#
|
||||
# "continueOnMissing": Boolean,
|
||||
# "settingsTemplate": String,
|
||||
# "outputModule": String,
|
||||
# "outputExt": String,
|
||||
# },
|
||||
# "assets": [],
|
||||
# "actions": {
|
||||
# "prerender": [],
|
||||
# "postrender": [],
|
||||
# },
|
||||
# "onChange": Function,
|
||||
# "onRenderProgress": Function
|
||||
# }
|
||||
job = {'template':
|
||||
{
|
||||
'src': 'file://' + self.input_path, 'composition': self.comp.replace('"', ''),
|
||||
'settingsTemplate': self.render_settings.replace('"', ''),
|
||||
'outputModule': self.omsettings.replace('"', ''), 'outputExt': 'mov'}
|
||||
}
|
||||
x = ['./nexrender-cli-macos', "'{}'".format(json.dumps(job))]
|
||||
else:
|
||||
logging.info('nexrender not found')
|
||||
x = [aerender_path(), '-project', self.input_path, '-comp', self.comp, '-RStemplate', self.render_settings,
|
||||
'-OMtemplate', self.omsettings, '-output', self.output_path]
|
||||
return x
|
||||
|
||||
def _parse_stdout(self, line):
|
||||
|
||||
@@ -50,12 +83,12 @@ class AERenderWorker(BaseRenderWorker):
|
||||
# print 'progress'
|
||||
trimmed = line.replace('PROGRESS:', '').strip()
|
||||
if len(trimmed):
|
||||
self.__progress_history.append(line)
|
||||
self.progress_history.append(line)
|
||||
if 'Seconds' in trimmed:
|
||||
self._update_progress(line)
|
||||
elif ': ' in trimmed:
|
||||
tmp = trimmed.split(': ')
|
||||
self.__temp_attributes[tmp[0].strip()] = tmp[1].strip()
|
||||
self.attributes[tmp[0].strip()] = tmp[1].strip()
|
||||
elif line.startswith('WARNING:'):
|
||||
trimmed = line.replace('WARNING:', '').strip()
|
||||
self.warnings.append(trimmed)
|
||||
@@ -66,28 +99,28 @@ class AERenderWorker(BaseRenderWorker):
|
||||
def _update_progress(self, line):
|
||||
|
||||
if not self.total_frames:
|
||||
duration_string = self.__temp_attributes.get('Duration', None)
|
||||
frame_rate = self.__temp_attributes.get('Frame Rate', '0').split(' ')[0]
|
||||
duration_string = self.attributes.get('Duration', None)
|
||||
frame_rate = self.attributes.get('Frame Rate', '0').split(' ')[0]
|
||||
self.total_frames = timecode_to_frames(duration_string.split('Duration:')[-1], float(frame_rate))
|
||||
|
||||
match = re.match(r'PROGRESS:.*\((?P<frame>\d+)\): (?P<time>\d+)', line).groupdict()
|
||||
self.current_frame = match['frame']
|
||||
self.last_frame = match['frame']
|
||||
|
||||
def average_frame_duration(self):
|
||||
|
||||
total_durations = 0
|
||||
|
||||
for line in self.__progress_history:
|
||||
for line in self.progress_history:
|
||||
match = re.match(r'PROGRESS:.*\((?P<frame>\d+)\): (?P<time>\d+)', line)
|
||||
if match:
|
||||
total_durations += int(match.group(2))
|
||||
|
||||
average = float(total_durations) / self.current_frame
|
||||
average = float(total_durations) / self.last_frame
|
||||
return average
|
||||
|
||||
def percent_complete(self):
|
||||
if self.total_frames:
|
||||
return (float(self.current_frame) / float(self.total_frames)) * 100
|
||||
return (float(self.last_frame) / float(self.total_frames)) * 100
|
||||
else:
|
||||
return 0
|
||||
|
||||
@@ -95,11 +128,8 @@ class AERenderWorker(BaseRenderWorker):
|
||||
if __name__ == '__main__':
|
||||
logging.basicConfig(format='%(asctime)s - %(message)s', datefmt='%d-%b-%y %H:%M:%S', level=logging.DEBUG)
|
||||
|
||||
r = AERenderWorker(input_path='/Users/brett/ae_testing/project.aepx',
|
||||
output_path='/Users/brett/ae_testing/project.mp4',
|
||||
engine_path=AERenderWorker.engine.default_renderer_path(),
|
||||
args={'start_frame': 1, 'end_frame': 5})
|
||||
|
||||
r = AERenderWorker('/Users/brett/Desktop/Youtube_Vids/Film_Formats/Frame_Animations.aep', '"Film Pan"',
|
||||
'"Draft Settings"', '"ProRes"', '/Users/brett/Desktop/test_render')
|
||||
r.start()
|
||||
while r.is_running():
|
||||
time.sleep(0.1)
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import json
|
||||
import re
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
|
||||
from src.engines.core.base_engine import *
|
||||
from src.utilities.misc_helper import system_safe_path
|
||||
@@ -11,22 +12,25 @@ class Blender(BaseRenderEngine):
|
||||
|
||||
install_paths = ['/Applications/Blender.app/Contents/MacOS/Blender']
|
||||
binary_names = {'linux': 'blender', 'windows': 'blender.exe', 'macos': 'Blender'}
|
||||
file_extensions = ['blend']
|
||||
|
||||
@staticmethod
|
||||
def downloader():
|
||||
from src.engines.blender.blender_downloader import BlenderDownloader
|
||||
return BlenderDownloader
|
||||
|
||||
@classmethod
|
||||
def worker_class(cls):
|
||||
@staticmethod
|
||||
def worker_class():
|
||||
from src.engines.blender.blender_worker import BlenderRenderWorker
|
||||
return BlenderRenderWorker
|
||||
|
||||
def ui_options(self, project_info):
|
||||
def ui_options(self):
|
||||
from src.engines.blender.blender_ui import BlenderUI
|
||||
return BlenderUI.get_options(self)
|
||||
|
||||
@staticmethod
|
||||
def supported_extensions():
|
||||
return ['blend']
|
||||
|
||||
def version(self):
|
||||
version = None
|
||||
try:
|
||||
@@ -112,7 +116,7 @@ class Blender(BaseRenderEngine):
|
||||
logger.error(f'Error packing .blend file: {e}')
|
||||
return None
|
||||
|
||||
def get_arguments(self): # possibly deprecate
|
||||
def get_arguments(self):
|
||||
help_text = subprocess.check_output([self.renderer_path(), '-h']).decode('utf-8')
|
||||
lines = help_text.splitlines()
|
||||
|
||||
@@ -145,7 +149,13 @@ class Blender(BaseRenderEngine):
|
||||
return options
|
||||
|
||||
def system_info(self):
|
||||
return {'render_devices': self.get_render_devices()}
|
||||
with ThreadPoolExecutor() as executor:
|
||||
future_render_devices = executor.submit(self.get_render_devices)
|
||||
future_engines = executor.submit(self.supported_render_engines)
|
||||
render_devices = future_render_devices.result()
|
||||
engines = future_engines.result()
|
||||
|
||||
return {'render_devices': render_devices, 'engines': engines}
|
||||
|
||||
def get_render_devices(self):
|
||||
script_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'scripts', 'get_system_info.py')
|
||||
|
||||
@@ -12,12 +12,17 @@ class BlenderRenderWorker(BaseRenderWorker):
|
||||
engine = Blender
|
||||
|
||||
def __init__(self, input_path, output_path, engine_path, args=None, parent=None, name=None):
|
||||
super(BlenderRenderWorker, self).__init__(input_path=input_path, output_path=output_path,
|
||||
engine_path=engine_path, args=args, parent=parent, name=name)
|
||||
super(BlenderRenderWorker, self).__init__(input_path=input_path, output_path=output_path, engine_path=engine_path, args=args, parent=parent, name=name)
|
||||
|
||||
# Stats
|
||||
self.__frame_percent_complete = 0.0
|
||||
self.current_frame = -1 # todo: is this necessary?
|
||||
|
||||
# Scene Info
|
||||
self.scene_info = Blender(engine_path).get_project_info(input_path)
|
||||
self.start_frame = int(self.scene_info.get('start_frame', 1))
|
||||
self.end_frame = int(self.scene_info.get('end_frame', self.start_frame))
|
||||
self.project_length = (self.end_frame - self.start_frame) + 1
|
||||
self.current_frame = -1
|
||||
|
||||
def generate_worker_subprocess(self):
|
||||
|
||||
@@ -35,11 +40,13 @@ class BlenderRenderWorker(BaseRenderWorker):
|
||||
if custom_camera:
|
||||
python_exp = python_exp + f"bpy.context.scene.camera = bpy.data.objects['{custom_camera}'];"
|
||||
|
||||
# Setup Render Engines
|
||||
self.args['engine'] = self.args.get('engine', 'CYCLES').upper() # set default render engine
|
||||
# Configure Cycles
|
||||
if self.args['engine'] == 'CYCLES':
|
||||
# Set Render Device (gpu/cpu/any)
|
||||
blender_engine = self.args.get('engine', 'BLENDER_EEVEE').upper()
|
||||
if blender_engine == 'CYCLES':
|
||||
render_device = self.args.get('render_device', 'any').lower()
|
||||
if render_device not in {'any', 'gpu', 'cpu'}:
|
||||
if render_device not in ['any', 'gpu', 'cpu']:
|
||||
raise AttributeError(f"Invalid Cycles render device: {render_device}")
|
||||
|
||||
use_gpu = render_device in {'any', 'gpu'}
|
||||
@@ -57,7 +64,10 @@ class BlenderRenderWorker(BaseRenderWorker):
|
||||
# Export format
|
||||
export_format = self.args.get('export_format', None) or 'JPEG'
|
||||
|
||||
path_without_ext = os.path.splitext(self.output_path)[0] + "_"
|
||||
main_part, ext = os.path.splitext(self.output_path)
|
||||
# Remove the extension only if it is not composed entirely of digits
|
||||
path_without_ext = main_part if not ext[1:].isdigit() else self.output_path
|
||||
path_without_ext += "_"
|
||||
cmd.extend(['-E', blender_engine, '-o', path_without_ext, '-F', export_format])
|
||||
|
||||
# set frame range
|
||||
@@ -102,6 +112,7 @@ class BlenderRenderWorker(BaseRenderWorker):
|
||||
output_file_number = output_filename_match.groups()[0]
|
||||
try:
|
||||
self.current_frame = int(output_file_number)
|
||||
self._send_frame_complete_notification()
|
||||
except ValueError:
|
||||
pass
|
||||
elif render_stats_match:
|
||||
@@ -110,15 +121,15 @@ class BlenderRenderWorker(BaseRenderWorker):
|
||||
logger.info(f'Frame #{self.current_frame} - '
|
||||
f'{frame_count} of {self.total_frames} completed in {time_completed} | '
|
||||
f'Total Elapsed Time: {datetime.now() - self.start_time}')
|
||||
else:
|
||||
logger.debug(f'DEBUG: {line}')
|
||||
else:
|
||||
pass
|
||||
# if len(line.strip()):
|
||||
# logger.debug(line.strip())
|
||||
|
||||
def percent_complete(self):
|
||||
if self.total_frames <= 1:
|
||||
if self.status == RenderStatus.COMPLETED:
|
||||
return 1
|
||||
elif self.total_frames <= 1:
|
||||
return self.__frame_percent_complete
|
||||
else:
|
||||
whole_frame_percent = (self.current_frame - self.start_frame) / self.total_frames
|
||||
|
||||
@@ -19,8 +19,8 @@ for cam_obj in bpy.data.cameras:
|
||||
|
||||
data = {'cameras': cameras,
|
||||
'engine': scene.render.engine,
|
||||
'start_frame': scene.frame_start,
|
||||
'end_frame': scene.frame_end,
|
||||
'frame_start': scene.frame_start,
|
||||
'frame_end': scene.frame_end,
|
||||
'resolution_x': scene.render.resolution_x,
|
||||
'resolution_y': scene.render.resolution_y,
|
||||
'resolution_percentage': scene.render.resolution_percentage,
|
||||
|
||||
@@ -9,12 +9,12 @@ SUBPROCESS_TIMEOUT = 5
|
||||
class BaseRenderEngine(object):
|
||||
|
||||
install_paths = []
|
||||
file_extensions = []
|
||||
supported_extensions = []
|
||||
|
||||
def __init__(self, custom_path=None):
|
||||
self.custom_renderer_path = custom_path
|
||||
if not self.renderer_path() or not os.path.exists(self.renderer_path()):
|
||||
raise FileNotFoundError(f"Cannot find path ({self.renderer_path()}) for renderer '{self.name()}'")
|
||||
raise FileNotFoundError(f"Cannot find path to renderer for {self.name()} instance")
|
||||
|
||||
if not os.access(self.renderer_path(), os.X_OK):
|
||||
logger.warning(f"Path is not executable. Setting permissions to 755 for {self.renderer_path()}")
|
||||
@@ -47,18 +47,19 @@ class BaseRenderEngine(object):
|
||||
def downloader(): # override when subclassing if using a downloader class
|
||||
return None
|
||||
|
||||
@classmethod
|
||||
def worker_class(cls): # override when subclassing to link worker class
|
||||
raise NotImplementedError(f"Worker class not implemented for engine {cls.name()}")
|
||||
@staticmethod
|
||||
def worker_class(): # override when subclassing to link worker class
|
||||
raise NotImplementedError("Worker class not implemented")
|
||||
|
||||
def ui_options(self, project_info): # override to return options for ui
|
||||
def ui_options(self): # override to return options for ui
|
||||
return {}
|
||||
|
||||
def get_help(self): # override if renderer uses different help flag
|
||||
path = self.renderer_path()
|
||||
if not path:
|
||||
raise FileNotFoundError("renderer path not found")
|
||||
help_doc = subprocess.run([path, '-h'], capture_output=True, text=True).stdout.strip()
|
||||
help_doc = subprocess.check_output([path, '-h'], stderr=subprocess.STDOUT,
|
||||
timeout=SUBPROCESS_TIMEOUT).decode('utf-8')
|
||||
return help_doc
|
||||
|
||||
def get_project_info(self, project_path, timeout=10):
|
||||
@@ -68,10 +69,6 @@ class BaseRenderEngine(object):
|
||||
def get_output_formats(cls):
|
||||
raise NotImplementedError(f"get_output_formats not implemented for {cls.__name__}")
|
||||
|
||||
@classmethod
|
||||
def supported_extensions(cls):
|
||||
return cls.file_extensions
|
||||
|
||||
def get_arguments(self):
|
||||
pass
|
||||
|
||||
|
||||
@@ -5,6 +5,7 @@ import logging
|
||||
import os
|
||||
import subprocess
|
||||
import threading
|
||||
import time
|
||||
from datetime import datetime
|
||||
|
||||
import psutil
|
||||
@@ -81,11 +82,8 @@ class BaseRenderWorker(Base):
|
||||
# Frame Ranges
|
||||
self.project_length = 0 # is this necessary?
|
||||
self.current_frame = 0
|
||||
|
||||
# Get Project Info
|
||||
self.scene_info = self.engine(engine_path).get_project_info(project_path=input_path)
|
||||
self.start_frame = int(self.scene_info.get('start_frame', 1))
|
||||
self.end_frame = int(self.scene_info.get('end_frame', self.start_frame))
|
||||
self.start_frame = 0
|
||||
self.end_frame = None
|
||||
|
||||
# Logging
|
||||
self.start_time = None
|
||||
@@ -97,10 +95,15 @@ class BaseRenderWorker(Base):
|
||||
self.errors = []
|
||||
|
||||
# Threads and processes
|
||||
self.__thread = threading.Thread(target=self.run, args=())
|
||||
self.__thread = threading.Thread(target=self.__run, args=())
|
||||
self.__thread.daemon = True
|
||||
self.__process = None
|
||||
self.last_output = None
|
||||
self.__last_output_time = None
|
||||
self.watchdog_timeout = 120
|
||||
|
||||
def __repr__(self):
|
||||
return f"<Job id:{self.id} p{self.priority} {self.renderer}-{self.renderer_version} '{self.name}' status:{self.status.value}>"
|
||||
|
||||
@property
|
||||
def total_frames(self):
|
||||
@@ -124,19 +127,16 @@ class BaseRenderWorker(Base):
|
||||
self._status = RenderStatus.CANCELLED.value
|
||||
return string_to_status(self._status)
|
||||
|
||||
def validate(self):
|
||||
if not os.path.exists(self.input_path):
|
||||
raise FileNotFoundError(f"Cannot find input path: {self.input_path}")
|
||||
self.generate_subprocess()
|
||||
def _send_frame_complete_notification(self):
|
||||
pub.sendMessage('frame_complete', job_id=self.id, frame_number=self.current_frame)
|
||||
|
||||
def generate_subprocess(self):
|
||||
# Convert raw args from string if available and catch conflicts
|
||||
generated_args = [str(x) for x in self.generate_worker_subprocess()]
|
||||
generated_args_flags = [x for x in generated_args if x.startswith('-')]
|
||||
if len(generated_args_flags) != len(set(generated_args_flags)):
|
||||
msg = "Cannot generate subprocess - Multiple arg conflicts detected"
|
||||
msg = f"Cannot generate subprocess - Multiple arg conflicts detected: {generated_args}"
|
||||
logger.error(msg)
|
||||
logger.debug(f"Generated args for subprocess: {generated_args}")
|
||||
raise ValueError(msg)
|
||||
return generated_args
|
||||
|
||||
@@ -178,11 +178,12 @@ class BaseRenderWorker(Base):
|
||||
|
||||
self.status = RenderStatus.RUNNING
|
||||
self.start_time = datetime.now()
|
||||
logger.info(f'Starting {self.engine.name()} {self.renderer_version} Render for {self.input_path} | '
|
||||
f'Frame Count: {self.total_frames}')
|
||||
self.__thread.start()
|
||||
|
||||
def run(self):
|
||||
def __run(self):
|
||||
logger.info(f'Starting {self.engine.name()} {self.renderer_version} Render for {self.input_path} | '
|
||||
f'Frame Count: {self.total_frames}')
|
||||
|
||||
# Setup logging
|
||||
log_dir = os.path.dirname(self.log_path())
|
||||
os.makedirs(log_dir, exist_ok=True)
|
||||
@@ -195,7 +196,7 @@ class BaseRenderWorker(Base):
|
||||
|
||||
f.write(f"{self.start_time.isoformat()} - Starting {self.engine.name()} {self.renderer_version} "
|
||||
f"render for {self.input_path}\n\n")
|
||||
f.write(f"Running command: \"{' '.join(subprocess_cmds)}\"\n")
|
||||
f.write(f"Running command: {subprocess_cmds}\n")
|
||||
f.write('=' * 80 + '\n\n')
|
||||
|
||||
while True:
|
||||
@@ -210,50 +211,44 @@ class BaseRenderWorker(Base):
|
||||
else:
|
||||
f.write(f'\n{"=" * 20} Attempt #{failed_attempts + 1} {"=" * 20}\n\n')
|
||||
logger.warning(f"Restarting render - Attempt #{failed_attempts + 1}")
|
||||
|
||||
# Start process and get updates
|
||||
self.status = RenderStatus.RUNNING
|
||||
self.__process = subprocess.Popen(subprocess_cmds, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
|
||||
universal_newlines=False)
|
||||
|
||||
for c in io.TextIOWrapper(self.__process.stdout, encoding="utf-8"): # or another encoding
|
||||
f.write(c)
|
||||
f.flush()
|
||||
os.fsync(f.fileno())
|
||||
self.last_output = c.strip()
|
||||
self._parse_stdout(c.strip())
|
||||
|
||||
f.write('\n')
|
||||
|
||||
# Check return codes and process
|
||||
return_code = self.__process.wait()
|
||||
return_code = self.__setup_and_run_process(f, subprocess_cmds)
|
||||
self.end_time = datetime.now()
|
||||
|
||||
if self.status in [RenderStatus.CANCELLED, RenderStatus.ERROR]: # user cancelled
|
||||
message = f"{'=' * 50}\n\n{self.engine.name()} render ended with code {return_code} " \
|
||||
f"after {self.time_elapsed()}\n\n"
|
||||
f.write(message)
|
||||
|
||||
# Teardown
|
||||
if self.status in [RenderStatus.CANCELLED, RenderStatus.ERROR]:
|
||||
message = f"{self.engine.name()} render ended with status '{self.status}' " \
|
||||
f"after {self.time_elapsed()}"
|
||||
f.write(message)
|
||||
return
|
||||
|
||||
# if file output hasn't increased, return as error, otherwise restart process.
|
||||
if len(self.file_list()) <= initial_file_count:
|
||||
err_msg = f"File count has not increased. Count is still {len(self.file_list())}"
|
||||
f.write(f'Error: {err_msg}\n\n')
|
||||
self.errors.append(err_msg)
|
||||
self.status = RenderStatus.ERROR
|
||||
|
||||
# Handle completed - All else counts as failed attempt
|
||||
if (self.status == RenderStatus.COMPLETED) and not return_code:
|
||||
file_count_has_increased = len(self.file_list()) > initial_file_count
|
||||
if (self.status == RenderStatus.RUNNING) and file_count_has_increased and not return_code:
|
||||
message = (f"{'=' * 50}\n\n{self.engine.name()} render completed successfully in "
|
||||
f"{self.time_elapsed()}\n")
|
||||
f.write(message)
|
||||
break
|
||||
|
||||
# Handle non-zero return codes
|
||||
message = f"{'=' * 50}\n\n{self.engine.name()} render failed with code {return_code} " \
|
||||
f"after {self.time_elapsed()}\n\n"
|
||||
f.write(message)
|
||||
self.errors.append(message)
|
||||
if return_code:
|
||||
err_msg = f"{self.engine.name()} render failed with code {return_code}"
|
||||
logger.error(err_msg)
|
||||
self.errors.append(err_msg)
|
||||
|
||||
# handle instances where renderer exits ok but doesnt generate files
|
||||
if not return_code and not file_count_has_increased:
|
||||
err_msg = (f"{self.engine.name()} render exited ok, but file count has not increased. "
|
||||
f"Count is still {len(self.file_list())}")
|
||||
f.write(f'Error: {err_msg}\n\n')
|
||||
self.errors.append(err_msg)
|
||||
|
||||
# only count the attempt as failed if renderer creates no output - ignore error codes for now
|
||||
if not file_count_has_increased:
|
||||
failed_attempts += 1
|
||||
|
||||
if self.children:
|
||||
@@ -266,6 +261,65 @@ class BaseRenderWorker(Base):
|
||||
self.status = RenderStatus.COMPLETED
|
||||
logger.info(f"Render {self.id}-{self.name} completed successfully after {self.time_elapsed()}")
|
||||
|
||||
def __setup_and_run_process(self, f, subprocess_cmds):
|
||||
|
||||
def watchdog():
|
||||
logger.debug(f'Starting process watchdog for {self} with {self.watchdog_timeout}s timeout')
|
||||
while self.__process.poll() is None:
|
||||
time_since_last_update = time.time() - self.__last_output_time
|
||||
if time_since_last_update > self.watchdog_timeout:
|
||||
logger.error(f"Process for {self} terminated due to exceeding timeout ({self.watchdog_timeout}s)")
|
||||
self.__process.kill()
|
||||
break
|
||||
# logger.debug(f'Watchdog for {self} - Time since last update: {time_since_last_update}')
|
||||
time.sleep(1)
|
||||
|
||||
logger.debug(f'Stopping process watchdog for {self}')
|
||||
|
||||
return_code = -1
|
||||
watchdog_thread = threading.Thread(target=watchdog)
|
||||
watchdog_thread.daemon = True
|
||||
|
||||
try:
|
||||
# Start process and get updates
|
||||
self.__process = subprocess.Popen(subprocess_cmds, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
|
||||
universal_newlines=False)
|
||||
|
||||
# Start watchdog
|
||||
self.__last_output_time = time.time()
|
||||
watchdog_thread.start()
|
||||
|
||||
for c in io.TextIOWrapper(self.__process.stdout, encoding="utf-8"): # or another encoding
|
||||
self.last_output = c.strip()
|
||||
self.__last_output_time = time.time()
|
||||
try:
|
||||
f.write(c)
|
||||
f.flush()
|
||||
os.fsync(f.fileno())
|
||||
except Exception as e:
|
||||
logger.error(f"Error saving log to disk: {e}")
|
||||
|
||||
try:
|
||||
self._parse_stdout(c.strip())
|
||||
except Exception as e:
|
||||
logger.error(f'Error parsing stdout: {e}')
|
||||
|
||||
f.write('\n')
|
||||
|
||||
# Check return codes and process
|
||||
return_code = self.__process.wait()
|
||||
except Exception as e:
|
||||
message = f'Uncaught error running render process: {e}'
|
||||
f.write(message)
|
||||
logger.exception(message)
|
||||
self.__process.kill()
|
||||
|
||||
# let watchdog end before continuing - prevents multiple watchdogs running when process restarts
|
||||
if watchdog_thread.is_alive():
|
||||
watchdog_thread.join()
|
||||
|
||||
return return_code
|
||||
|
||||
def post_processing(self):
|
||||
pass
|
||||
|
||||
|
||||
@@ -6,7 +6,6 @@ import concurrent.futures
|
||||
|
||||
from src.engines.blender.blender_engine import Blender
|
||||
from src.engines.ffmpeg.ffmpeg_engine import FFMPEG
|
||||
from src.engines.aerender.aerender_engine import AERender
|
||||
from src.utilities.misc_helper import system_safe_path, current_system_os, current_system_cpu
|
||||
|
||||
logger = logging.getLogger()
|
||||
@@ -19,7 +18,7 @@ class EngineManager:
|
||||
|
||||
@staticmethod
|
||||
def supported_engines():
|
||||
return [Blender, FFMPEG, AERender]
|
||||
return [Blender, FFMPEG]
|
||||
|
||||
@classmethod
|
||||
def engine_with_name(cls, engine_name):
|
||||
@@ -80,20 +79,17 @@ class EngineManager:
|
||||
'type': 'system'
|
||||
}
|
||||
|
||||
if not filter_name:
|
||||
with concurrent.futures.ThreadPoolExecutor() as executor:
|
||||
futures = {
|
||||
executor.submit(fetch_engine_details, eng): eng.name()
|
||||
for eng in cls.supported_engines()
|
||||
if eng.default_renderer_path()
|
||||
if eng.default_renderer_path() and (not filter_name or filter_name == eng.name())
|
||||
}
|
||||
|
||||
for future in concurrent.futures.as_completed(futures):
|
||||
result = future.result()
|
||||
if result:
|
||||
results.append(result)
|
||||
else:
|
||||
results.append(fetch_engine_details(cls.engine_with_name(filter_name)))
|
||||
|
||||
return results
|
||||
|
||||
@@ -176,7 +172,7 @@ class EngineManager:
|
||||
|
||||
if background:
|
||||
return thread
|
||||
else:
|
||||
|
||||
thread.join()
|
||||
found_engine = cls.is_version_downloaded(engine, version, system_os, cpu) # Check that engine downloaded
|
||||
if not found_engine:
|
||||
@@ -208,13 +204,19 @@ class EngineManager:
|
||||
def engine_update_task(engine_class):
|
||||
logger.debug(f"Checking for updates to {engine_class.name()}")
|
||||
latest_version = engine_class.downloader().find_most_recent_version()
|
||||
if latest_version:
|
||||
logger.debug(f"Latest version of {engine_class.name()} available: {latest_version.get('version')}")
|
||||
if not cls.is_version_downloaded(engine_class.name(), latest_version.get('version')):
|
||||
logger.info(f"Downloading latest version of {engine_class.name()}...")
|
||||
cls.download_engine(engine=engine_class.name(), version=latest_version['version'], background=True)
|
||||
else:
|
||||
logger.warning(f"Unable to get check for updates for {engine.name()}")
|
||||
|
||||
if not latest_version:
|
||||
logger.warning(f"Could not find most recent version of {engine.name()} to download")
|
||||
return
|
||||
|
||||
version_num = latest_version.get('version')
|
||||
if cls.is_version_downloaded(engine_class.name(), version_num):
|
||||
logger.debug(f"Latest version of {engine_class.name()} ({version_num}) already downloaded")
|
||||
return
|
||||
|
||||
# download the engine
|
||||
logger.info(f"Downloading latest version of {engine_class.name()} ({version_num})...")
|
||||
cls.download_engine(engine=engine_class.name(), version=version_num, background=True)
|
||||
|
||||
logger.info(f"Checking for updates for render engines...")
|
||||
threads = []
|
||||
@@ -298,6 +300,6 @@ if __name__ == '__main__':
|
||||
|
||||
# print(EngineManager.newest_engine_version('blender', 'macos', 'arm64'))
|
||||
# EngineManager.delete_engine_download('blender', '3.2.1', 'macos', 'a')
|
||||
EngineManager.engines_path = "/Users/brettwilliams/zordon-uploads/engines/"
|
||||
EngineManager.engines_path = "/Users/brettwilliams/zordon-uploads/engines"
|
||||
# print(EngineManager.is_version_downloaded("ffmpeg", "6.0"))
|
||||
print(EngineManager.get_engines())
|
||||
|
||||
@@ -132,8 +132,8 @@ class FFMPEGDownloader(EngineDownloader):
|
||||
system_os = system_os or current_system_os()
|
||||
cpu = cpu or current_system_cpu()
|
||||
return cls.all_versions(system_os, cpu)[0]
|
||||
except (IndexError, requests.exceptions.RequestException):
|
||||
logger.error(f"Cannot get most recent version of ffmpeg")
|
||||
except (IndexError, requests.exceptions.RequestException) as e:
|
||||
logger.error(f"Cannot get most recent version of ffmpeg: {e}")
|
||||
return {}
|
||||
|
||||
@classmethod
|
||||
|
||||
@@ -12,26 +12,24 @@ class FFMPEG(BaseRenderEngine):
|
||||
from src.engines.ffmpeg.ffmpeg_downloader import FFMPEGDownloader
|
||||
return FFMPEGDownloader
|
||||
|
||||
@classmethod
|
||||
def worker_class(cls):
|
||||
@staticmethod
|
||||
def worker_class():
|
||||
from src.engines.ffmpeg.ffmpeg_worker import FFMPEGRenderWorker
|
||||
return FFMPEGRenderWorker
|
||||
|
||||
def ui_options(self, project_info):
|
||||
def ui_options(self):
|
||||
from src.engines.ffmpeg.ffmpeg_ui import FFMPEGUI
|
||||
return FFMPEGUI.get_options(self, project_info)
|
||||
return FFMPEGUI.get_options(self)
|
||||
|
||||
@classmethod
|
||||
def supported_extensions(cls):
|
||||
if not cls.file_extensions:
|
||||
help_text = (subprocess.check_output([cls().renderer_path(), '-h', 'full'], stderr=subprocess.STDOUT)
|
||||
.decode('utf-8'))
|
||||
found = re.findall(r'extensions that .* is allowed to access \(default "(.*)"', help_text)
|
||||
found_extensions = set()
|
||||
for match in found:
|
||||
found_extensions.update(match.split(','))
|
||||
cls.file_extensions = list(found_extensions)
|
||||
return cls.file_extensions
|
||||
return list(found_extensions)
|
||||
|
||||
def version(self):
|
||||
version = None
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
class FFMPEGUI:
|
||||
@staticmethod
|
||||
def get_options(instance, project_info):
|
||||
def get_options(instance):
|
||||
options = []
|
||||
return options
|
||||
|
||||
126
src/init.py
126
src/init.py
@@ -1,22 +1,27 @@
|
||||
''' app/init.py '''
|
||||
import logging
|
||||
import multiprocessing
|
||||
import os
|
||||
import socket
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
from collections import deque
|
||||
|
||||
from PyQt6.QtCore import QObject, pyqtSignal
|
||||
from PyQt6.QtWidgets import QApplication
|
||||
|
||||
from src.api.api_server import start_server
|
||||
from src.api.preview_manager import PreviewManager
|
||||
from src.api.serverproxy_manager import ServerProxyManager
|
||||
from src.distributed_job_manager import DistributedJobManager
|
||||
from src.engines.engine_manager import EngineManager
|
||||
from src.render_queue import RenderQueue
|
||||
from src.ui.main_window import MainWindow
|
||||
from src.utilities.config import Config
|
||||
from src.utilities.misc_helper import system_safe_path
|
||||
from src.utilities.misc_helper import system_safe_path, current_system_cpu, current_system_os, current_system_os_version
|
||||
from src.utilities.zeroconf_server import ZeroconfServer
|
||||
|
||||
logger = logging.getLogger()
|
||||
|
||||
|
||||
def run() -> int:
|
||||
def run(server_only=False) -> int:
|
||||
"""
|
||||
Initializes the application and runs it.
|
||||
|
||||
@@ -24,44 +29,94 @@ def run() -> int:
|
||||
int: The exit status code.
|
||||
"""
|
||||
|
||||
# setup logging
|
||||
logging.basicConfig(format='%(asctime)s: %(levelname)s: %(module)s: %(message)s', datefmt='%d-%b-%y %H:%M:%S',
|
||||
level=Config.server_log_level.upper())
|
||||
logging.getLogger("requests").setLevel(logging.WARNING) # suppress noisy requests/urllib3 logging
|
||||
logging.getLogger("urllib3").setLevel(logging.WARNING)
|
||||
|
||||
# Setup logging for console ui
|
||||
buffer_handler = __setup_buffer_handler() if not server_only else None
|
||||
|
||||
logger.info(f"Starting Zordon Render Server")
|
||||
return_code = 0
|
||||
try:
|
||||
# Load Config YAML
|
||||
Config.setup_config_dir()
|
||||
Config.load_config(system_safe_path(os.path.join(Config.config_dir(), 'config.yaml')))
|
||||
|
||||
# configure default paths
|
||||
EngineManager.engines_path = system_safe_path(
|
||||
os.path.join(os.path.join(os.path.expanduser(Config.upload_folder),
|
||||
'engines')))
|
||||
logging.basicConfig(format='%(asctime)s: %(levelname)s: %(module)s: %(message)s', datefmt='%d-%b-%y %H:%M:%S',
|
||||
level=Config.server_log_level.upper())
|
||||
os.makedirs(EngineManager.engines_path, exist_ok=True)
|
||||
PreviewManager.storage_path = system_safe_path(
|
||||
os.path.join(os.path.expanduser(Config.upload_folder), 'previews'))
|
||||
|
||||
app: QApplication = QApplication(sys.argv)
|
||||
# Debug info
|
||||
logger.debug(f"Upload directory: {os.path.expanduser(Config.upload_folder)}")
|
||||
logger.debug(f"Thumbs directory: {PreviewManager.storage_path}")
|
||||
logger.debug(f"Engines directory: {EngineManager.engines_path}")
|
||||
|
||||
# Start server in background
|
||||
background_server = threading.Thread(target=start_server)
|
||||
background_server.daemon = True
|
||||
background_server.start()
|
||||
# Set up the RenderQueue object
|
||||
RenderQueue.load_state(database_directory=system_safe_path(os.path.expanduser(Config.upload_folder)))
|
||||
ServerProxyManager.subscribe_to_listener()
|
||||
DistributedJobManager.subscribe_to_listener()
|
||||
|
||||
# Setup logging for console ui
|
||||
buffer_handler = BufferingHandler()
|
||||
buffer_handler.setFormatter(logging.getLogger().handlers[0].formatter)
|
||||
logger = logging.getLogger()
|
||||
logger.addHandler(buffer_handler)
|
||||
# check for updates for render engines if configured or on first launch
|
||||
if Config.update_engines_on_launch or not EngineManager.get_engines():
|
||||
EngineManager.update_all_engines()
|
||||
|
||||
window: MainWindow = MainWindow()
|
||||
window.buffer_handler = buffer_handler
|
||||
window.show()
|
||||
# get hostname
|
||||
local_hostname = socket.gethostname()
|
||||
local_hostname = local_hostname + (".local" if not local_hostname.endswith(".local") else "")
|
||||
|
||||
return_code = app.exec()
|
||||
# configure and start API server
|
||||
api_server = threading.Thread(target=start_server, args=(local_hostname,))
|
||||
api_server.daemon = True
|
||||
api_server.start()
|
||||
|
||||
# start zeroconf server
|
||||
ZeroconfServer.configure("_zordon._tcp.local.", local_hostname, Config.port_number)
|
||||
ZeroconfServer.properties = {'system_cpu': current_system_cpu(),
|
||||
'system_cpu_cores': multiprocessing.cpu_count(),
|
||||
'system_os': current_system_os(),
|
||||
'system_os_version': current_system_os_version()}
|
||||
ZeroconfServer.start()
|
||||
logger.info(f"Zordon Render Server started - Hostname: {local_hostname}")
|
||||
|
||||
RenderQueue.evaluation_inverval = Config.queue_eval_seconds
|
||||
RenderQueue.start()
|
||||
|
||||
# start in gui or server only (cli) mode
|
||||
logger.debug(f"Launching in {'server only' if server_only else 'GUI'} mode")
|
||||
if server_only: # CLI only
|
||||
api_server.join()
|
||||
else: # GUI
|
||||
return_code = __show_gui(buffer_handler)
|
||||
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
except Exception as e:
|
||||
logging.error(f"Unhandled exception: {e}")
|
||||
return_code = 1
|
||||
finally:
|
||||
# shut down gracefully
|
||||
logger.info(f"Zordon Render Server is preparing to shut down")
|
||||
try:
|
||||
RenderQueue.prepare_for_shutdown()
|
||||
except Exception as e:
|
||||
logger.exception(f"Exception during prepare for shutdown: {e}")
|
||||
ZeroconfServer.stop()
|
||||
logger.info(f"Zordon Render Server has shut down")
|
||||
return sys.exit(return_code)
|
||||
|
||||
|
||||
class BufferingHandler(logging.Handler, QObject):
|
||||
def __setup_buffer_handler():
|
||||
# lazy load GUI frameworks
|
||||
from PyQt6.QtCore import QObject, pyqtSignal
|
||||
|
||||
class BufferingHandler(logging.Handler, QObject):
|
||||
new_record = pyqtSignal(str)
|
||||
|
||||
def __init__(self, capacity=100):
|
||||
@@ -70,9 +125,34 @@ class BufferingHandler(logging.Handler, QObject):
|
||||
self.buffer = deque(maxlen=capacity) # Define a buffer with a fixed capacity
|
||||
|
||||
def emit(self, record):
|
||||
try:
|
||||
msg = self.format(record)
|
||||
self.buffer.append(msg) # Add message to the buffer
|
||||
self.new_record.emit(msg) # Emit signal
|
||||
except RuntimeError:
|
||||
pass
|
||||
|
||||
def get_buffer(self):
|
||||
return list(self.buffer) # Return a copy of the buffer
|
||||
|
||||
buffer_handler = BufferingHandler()
|
||||
buffer_handler.setFormatter(logging.getLogger().handlers[0].formatter)
|
||||
logger = logging.getLogger()
|
||||
logger.addHandler(buffer_handler)
|
||||
return buffer_handler
|
||||
|
||||
|
||||
def __show_gui(buffer_handler):
|
||||
# lazy load GUI frameworks
|
||||
from PyQt6.QtWidgets import QApplication
|
||||
|
||||
# load application
|
||||
app: QApplication = QApplication(sys.argv)
|
||||
|
||||
# configure main window
|
||||
from src.ui.main_window import MainWindow
|
||||
window: MainWindow = MainWindow()
|
||||
window.buffer_handler = buffer_handler
|
||||
window.show()
|
||||
|
||||
return app.exec()
|
||||
|
||||
@@ -2,12 +2,13 @@ import logging
|
||||
import os
|
||||
from datetime import datetime
|
||||
|
||||
from pubsub import pub
|
||||
from sqlalchemy import create_engine
|
||||
from sqlalchemy.orm import sessionmaker
|
||||
from sqlalchemy.orm.exc import DetachedInstanceError
|
||||
|
||||
from src.utilities.status_utils import RenderStatus
|
||||
from src.engines.engine_manager import EngineManager
|
||||
from src.engines.core.base_worker import Base
|
||||
from src.utilities.status_utils import RenderStatus
|
||||
|
||||
logger = logging.getLogger()
|
||||
|
||||
@@ -17,6 +18,9 @@ class JobNotFoundError(Exception):
|
||||
super().__init__(args)
|
||||
self.job_id = job_id
|
||||
|
||||
def __str__(self):
|
||||
return f"Cannot find job with ID: {self.job_id}"
|
||||
|
||||
|
||||
class RenderQueue:
|
||||
engine = None
|
||||
@@ -24,18 +28,46 @@ class RenderQueue:
|
||||
job_queue = []
|
||||
maximum_renderer_instances = {'blender': 1, 'aerender': 1, 'ffmpeg': 4}
|
||||
last_saved_counts = {}
|
||||
is_running = False
|
||||
__eval_thread = None
|
||||
evaluation_inverval = 1
|
||||
|
||||
def __init__(self):
|
||||
pass
|
||||
# --------------------------------------------
|
||||
# Start / Stop Background Updates
|
||||
# --------------------------------------------
|
||||
|
||||
@classmethod
|
||||
def start(cls):
|
||||
logger.debug("Starting render queue updates")
|
||||
cls.is_running = True
|
||||
cls.evaluate_queue()
|
||||
|
||||
@classmethod
|
||||
def __local_job_status_changed(cls, job_id, old_status, new_status):
|
||||
render_job = RenderQueue.job_with_id(job_id, none_ok=True)
|
||||
if render_job and cls.is_running: # ignore changes from render jobs not in the queue yet
|
||||
logger.debug(f"RenderQueue detected job {job_id} has changed from {old_status} -> {new_status}")
|
||||
RenderQueue.evaluate_queue()
|
||||
|
||||
@classmethod
|
||||
def stop(cls):
|
||||
logger.debug("Stopping render queue updates")
|
||||
cls.is_running = False
|
||||
|
||||
# --------------------------------------------
|
||||
# Queue Management
|
||||
# --------------------------------------------
|
||||
|
||||
@classmethod
|
||||
def add_to_render_queue(cls, render_job, force_start=False):
|
||||
logger.debug('Adding priority {} job to render queue: {}'.format(render_job.priority, render_job))
|
||||
logger.info(f"Adding job to render queue: {render_job}")
|
||||
cls.job_queue.append(render_job)
|
||||
if force_start and render_job.status in (RenderStatus.NOT_STARTED, RenderStatus.SCHEDULED):
|
||||
if cls.is_running and force_start and render_job.status in (RenderStatus.NOT_STARTED, RenderStatus.SCHEDULED):
|
||||
cls.start_job(render_job)
|
||||
cls.session.add(render_job)
|
||||
cls.save_state()
|
||||
if cls.is_running:
|
||||
cls.evaluate_queue()
|
||||
|
||||
@classmethod
|
||||
def all_jobs(cls):
|
||||
@@ -81,6 +113,7 @@ class RenderQueue:
|
||||
cls.session = sessionmaker(bind=cls.engine)()
|
||||
from src.engines.core.base_worker import BaseRenderWorker
|
||||
cls.job_queue = cls.session.query(BaseRenderWorker).all()
|
||||
pub.subscribe(cls.__local_job_status_changed, 'status_change')
|
||||
|
||||
@classmethod
|
||||
def save_state(cls):
|
||||
@@ -89,6 +122,7 @@ class RenderQueue:
|
||||
@classmethod
|
||||
def prepare_for_shutdown(cls):
|
||||
logger.debug("Closing session")
|
||||
cls.stop()
|
||||
running_jobs = cls.jobs_with_status(RenderStatus.RUNNING) # cancel all running jobs
|
||||
[cls.cancel_job(job) for job in running_jobs]
|
||||
cls.save_state()
|
||||
@@ -105,6 +139,7 @@ class RenderQueue:
|
||||
|
||||
@classmethod
|
||||
def evaluate_queue(cls):
|
||||
try:
|
||||
not_started = cls.jobs_with_status(RenderStatus.NOT_STARTED, priority_sorted=True)
|
||||
for job in not_started:
|
||||
if cls.is_available_for_job(job.renderer, job.priority):
|
||||
@@ -118,22 +153,24 @@ class RenderQueue:
|
||||
|
||||
if cls.last_saved_counts != cls.job_counts():
|
||||
cls.save_state()
|
||||
except DetachedInstanceError:
|
||||
pass
|
||||
|
||||
@classmethod
|
||||
def start_job(cls, job):
|
||||
logger.info(f'Starting render: {job.name} - Priority {job.priority}')
|
||||
logger.info(f'Starting job: {job}')
|
||||
job.start()
|
||||
cls.save_state()
|
||||
|
||||
@classmethod
|
||||
def cancel_job(cls, job):
|
||||
logger.info(f'Cancelling job ID: {job.id}')
|
||||
logger.info(f'Cancelling job: {job}')
|
||||
job.stop()
|
||||
return job.status == RenderStatus.CANCELLED
|
||||
|
||||
@classmethod
|
||||
def delete_job(cls, job):
|
||||
logger.info(f"Deleting job ID: {job.id}")
|
||||
logger.info(f"Deleting job: {job}")
|
||||
job.stop()
|
||||
cls.job_queue.remove(job)
|
||||
cls.session.delete(job)
|
||||
|
||||
@@ -55,7 +55,7 @@ class NewRenderJobForm(QWidget):
|
||||
self.priority_input = None
|
||||
self.end_frame_input = None
|
||||
self.start_frame_input = None
|
||||
self.output_path_input = None
|
||||
self.render_name_input = None
|
||||
self.scene_file_input = None
|
||||
self.scene_file_browse_button = None
|
||||
self.job_name_input = None
|
||||
@@ -136,11 +136,11 @@ class NewRenderJobForm(QWidget):
|
||||
self.output_settings_group = QGroupBox("Output Settings")
|
||||
output_settings_layout = QVBoxLayout(self.output_settings_group)
|
||||
# output path
|
||||
output_path_layout = QHBoxLayout()
|
||||
output_path_layout.addWidget(QLabel("Render name:"))
|
||||
self.output_path_input = QLineEdit()
|
||||
output_path_layout.addWidget(self.output_path_input)
|
||||
output_settings_layout.addLayout(output_path_layout)
|
||||
render_name_layout = QHBoxLayout()
|
||||
render_name_layout.addWidget(QLabel("Render name:"))
|
||||
self.render_name_input = QLineEdit()
|
||||
render_name_layout.addWidget(self.render_name_input)
|
||||
output_settings_layout.addLayout(render_name_layout)
|
||||
# file format
|
||||
file_format_layout = QHBoxLayout()
|
||||
file_format_layout.addWidget(QLabel("Format:"))
|
||||
@@ -281,7 +281,7 @@ class NewRenderJobForm(QWidget):
|
||||
|
||||
output_name, _ = os.path.splitext(os.path.basename(self.scene_file_input.text()))
|
||||
output_name = output_name.replace(' ', '_')
|
||||
self.output_path_input.setText(output_name)
|
||||
self.render_name_input.setText(output_name)
|
||||
file_name = self.scene_file_input.text()
|
||||
|
||||
# setup bg worker
|
||||
@@ -292,7 +292,7 @@ class NewRenderJobForm(QWidget):
|
||||
def browse_output_path(self):
|
||||
directory = QFileDialog.getExistingDirectory(self, "Select Output Directory")
|
||||
if directory:
|
||||
self.output_path_input.setText(directory)
|
||||
self.render_name_input.setText(directory)
|
||||
|
||||
def args_help_button_clicked(self):
|
||||
url = (f'http://{self.server_proxy.hostname}:{self.server_proxy.port}/api/renderer/'
|
||||
@@ -316,28 +316,16 @@ class NewRenderJobForm(QWidget):
|
||||
self.renderer_type.setCurrentIndex(0) #todo: find out why we don't have renderer info yet
|
||||
# not ideal but if we don't have the renderer info we have to pick something
|
||||
|
||||
self.output_path_input.setText(os.path.basename(input_path))
|
||||
|
||||
# cleanup progress UI
|
||||
self.load_file_group.setHidden(True)
|
||||
self.toggle_renderer_enablement(True)
|
||||
|
||||
# -- Load scene data
|
||||
# start / end frames
|
||||
self.start_frame_input.setValue(self.project_info.get('start_frame', 0))
|
||||
self.end_frame_input.setValue(self.project_info.get('end_frame', 0))
|
||||
self.start_frame_input.setEnabled(bool(self.project_info.get('start_frame')))
|
||||
self.end_frame_input.setEnabled(bool(self.project_info.get('start_frame')))
|
||||
|
||||
# resolution
|
||||
self.resolution_x_input.setValue(self.project_info.get('resolution_x', 1920))
|
||||
self.resolution_y_input.setValue(self.project_info.get('resolution_y', 1080))
|
||||
self.resolution_x_input.setEnabled(bool(self.project_info.get('resolution_x')))
|
||||
self.resolution_y_input.setEnabled(bool(self.project_info.get('resolution_y')))
|
||||
|
||||
# frame rate
|
||||
self.frame_rate_input.setValue(self.project_info.get('fps', 24))
|
||||
self.frame_rate_input.setEnabled(bool(self.project_info.get('fps')))
|
||||
# Load scene data
|
||||
self.start_frame_input.setValue(self.project_info.get('frame_start'))
|
||||
self.end_frame_input.setValue(self.project_info.get('frame_end'))
|
||||
self.resolution_x_input.setValue(self.project_info.get('resolution_x'))
|
||||
self.resolution_y_input.setValue(self.project_info.get('resolution_y'))
|
||||
self.frame_rate_input.setValue(self.project_info.get('fps'))
|
||||
|
||||
# Cameras
|
||||
self.cameras_list.clear()
|
||||
@@ -360,7 +348,7 @@ class NewRenderJobForm(QWidget):
|
||||
# Dynamic Engine Options
|
||||
clear_layout(self.renderer_options_layout) # clear old options
|
||||
# dynamically populate option list
|
||||
self.current_engine_options = engine().ui_options(self.project_info)
|
||||
self.current_engine_options = engine().ui_options()
|
||||
for option in self.current_engine_options:
|
||||
h_layout = QHBoxLayout()
|
||||
label = QLabel(option['name'].replace('_', ' ').capitalize() + ':')
|
||||
@@ -461,14 +449,16 @@ class SubmitWorker(QThread):
|
||||
job_json = {'owner': psutil.Process().username() + '@' + socket.gethostname(),
|
||||
'renderer': self.window.renderer_type.currentText().lower(),
|
||||
'engine_version': self.window.renderer_version_combo.currentText(),
|
||||
'args': {'raw': self.window.raw_args.text()},
|
||||
'output_path': self.window.output_path_input.text(),
|
||||
'args': {'raw': self.window.raw_args.text(),
|
||||
'export_format': self.window.file_format_combo.currentText()},
|
||||
'output_path': self.window.render_name_input.text(),
|
||||
'start_frame': self.window.start_frame_input.value(),
|
||||
'end_frame': self.window.end_frame_input.value(),
|
||||
'priority': self.window.priority_input.currentIndex() + 1,
|
||||
'notes': self.window.notes_input.toPlainText(),
|
||||
'enable_split_jobs': self.window.enable_splitjobs.isChecked(),
|
||||
'split_jobs_same_os': self.window.splitjobs_same_os.isChecked()}
|
||||
'split_jobs_same_os': self.window.splitjobs_same_os.isChecked(),
|
||||
'name': self.window.render_name_input.text()}
|
||||
|
||||
# get the dynamic args
|
||||
for i in range(self.window.renderer_options_layout.count()):
|
||||
@@ -497,7 +487,8 @@ class SubmitWorker(QThread):
|
||||
for cam in selected_cameras:
|
||||
job_copy = copy.deepcopy(job_json)
|
||||
job_copy['args']['camera'] = cam
|
||||
job_copy['name'] = pathlib.Path(input_path).stem.replace(' ', '_') + "-" + cam.replace(' ', '')
|
||||
job_copy['name'] = job_copy['name'].replace(' ', '-') + "_" + cam.replace(' ', '')
|
||||
job_copy['output_path'] = job_copy['name']
|
||||
job_list.append(job_copy)
|
||||
else:
|
||||
job_list = [job_json]
|
||||
@@ -506,12 +497,8 @@ class SubmitWorker(QThread):
|
||||
engine = EngineManager.engine_with_name(self.window.renderer_type.currentText().lower())
|
||||
input_path = engine().perform_presubmission_tasks(input_path)
|
||||
# submit
|
||||
result = None
|
||||
try:
|
||||
result = self.window.server_proxy.post_job_to_server(file_path=input_path, job_list=job_list,
|
||||
callback=create_callback)
|
||||
except Exception as e:
|
||||
pass
|
||||
self.message_signal.emit(result)
|
||||
|
||||
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
import sys
|
||||
import logging
|
||||
|
||||
from PyQt6.QtGui import QFont
|
||||
@@ -16,7 +15,10 @@ class QSignalHandler(logging.Handler, QObject):
|
||||
|
||||
def emit(self, record):
|
||||
msg = self.format(record)
|
||||
try:
|
||||
self.new_record.emit(msg) # Emit signal
|
||||
except RuntimeError:
|
||||
pass
|
||||
|
||||
|
||||
class ConsoleWindow(QMainWindow):
|
||||
|
||||
@@ -183,8 +183,13 @@ class MainWindow(QMainWindow):
|
||||
|
||||
def __background_update(self):
|
||||
while True:
|
||||
try:
|
||||
self.update_servers()
|
||||
self.fetch_jobs()
|
||||
except RuntimeError:
|
||||
pass
|
||||
except Exception as e:
|
||||
logger.error(f"Uncaught exception in background update: {e}")
|
||||
time.sleep(0.5)
|
||||
|
||||
def closeEvent(self, event):
|
||||
@@ -372,13 +377,17 @@ class MainWindow(QMainWindow):
|
||||
|
||||
def load_image_path(self, image_path):
|
||||
# Load and set the image using QPixmap
|
||||
try:
|
||||
pixmap = QPixmap(image_path)
|
||||
if not pixmap:
|
||||
logger.error("Error loading image")
|
||||
return
|
||||
self.image_label.setPixmap(pixmap)
|
||||
except Exception as e:
|
||||
logger.error(f"Error loading image path: {e}")
|
||||
|
||||
def load_image_data(self, pillow_image):
|
||||
try:
|
||||
# Convert the Pillow Image to a QByteArray (byte buffer)
|
||||
byte_array = QByteArray()
|
||||
buffer = QBuffer(byte_array)
|
||||
@@ -396,6 +405,8 @@ class MainWindow(QMainWindow):
|
||||
logger.error("Error loading image")
|
||||
return
|
||||
self.image_label.setPixmap(pixmap)
|
||||
except Exception as e:
|
||||
logger.error(f"Error loading image data: {e}")
|
||||
|
||||
def update_servers(self):
|
||||
found_servers = list(set(ZeroconfServer.found_hostnames() + self.added_hostnames))
|
||||
|
||||
@@ -32,20 +32,23 @@ class StatusBar(QStatusBar):
|
||||
|
||||
# Check for status change every 1s on background thread
|
||||
while True:
|
||||
try:
|
||||
# update status label - get download status
|
||||
new_status = proxy.status()
|
||||
new_image_name = image_names.get(new_status, 'Synchronize.png')
|
||||
image_path = os.path.join(resources_dir(), new_image_name)
|
||||
self.label.setPixmap((QPixmap(image_path).scaled(16, 16, Qt.AspectRatioMode.KeepAspectRatio)))
|
||||
|
||||
# add download status
|
||||
if EngineManager.download_tasks:
|
||||
if len(EngineManager.download_tasks) == 1:
|
||||
task = EngineManager.download_tasks[0]
|
||||
new_status = f"{new_status} | Downloading {task.engine.capitalize()} {task.version}..."
|
||||
else:
|
||||
new_status = f"{new_status} | Downloading {len(EngineManager.download_tasks)} engines"
|
||||
|
||||
self.messageLabel.setText(new_status)
|
||||
|
||||
# update status image
|
||||
new_image_name = image_names.get(new_status, 'Synchronize.png')
|
||||
new_image_path = os.path.join(resources_dir(), new_image_name)
|
||||
self.label.setPixmap((QPixmap(new_image_path).scaled(16, 16, Qt.AspectRatioMode.KeepAspectRatio)))
|
||||
except RuntimeError: # ignore runtime errors during shutdown
|
||||
pass
|
||||
time.sleep(1)
|
||||
|
||||
background_thread = threading.Thread(target=background_update,)
|
||||
|
||||
@@ -10,7 +10,7 @@ class Config:
|
||||
max_content_path = 100000000
|
||||
server_log_level = 'debug'
|
||||
log_buffer_length = 250
|
||||
subjob_connection_timeout = 120
|
||||
worker_process_timeout = 120
|
||||
flask_log_level = 'error'
|
||||
flask_debug_enable = False
|
||||
queue_eval_seconds = 1
|
||||
@@ -28,7 +28,7 @@ class Config:
|
||||
cls.max_content_path = cfg.get('max_content_path', cls.max_content_path)
|
||||
cls.server_log_level = cfg.get('server_log_level', cls.server_log_level)
|
||||
cls.log_buffer_length = cfg.get('log_buffer_length', cls.log_buffer_length)
|
||||
cls.subjob_connection_timeout = cfg.get('subjob_connection_timeout', cls.subjob_connection_timeout)
|
||||
cls.worker_process_timeout = cfg.get('worker_process_timeout', cls.worker_process_timeout)
|
||||
cls.flask_log_level = cfg.get('flask_log_level', cls.flask_log_level)
|
||||
cls.flask_debug_enable = cfg.get('flask_debug_enable', cls.flask_debug_enable)
|
||||
cls.queue_eval_seconds = cfg.get('queue_eval_seconds', cls.queue_eval_seconds)
|
||||
|
||||
@@ -2,7 +2,8 @@ import logging
|
||||
import socket
|
||||
|
||||
from pubsub import pub
|
||||
from zeroconf import Zeroconf, ServiceInfo, ServiceBrowser, ServiceStateChange, NonUniqueNameException
|
||||
from zeroconf import Zeroconf, ServiceInfo, ServiceBrowser, ServiceStateChange, NonUniqueNameException, \
|
||||
NotRunningException
|
||||
|
||||
logger = logging.getLogger()
|
||||
|
||||
@@ -31,12 +32,14 @@ class ZeroconfServer:
|
||||
def start(cls, listen_only=False):
|
||||
if not cls.service_type:
|
||||
raise RuntimeError("The 'configure' method must be run before starting the zeroconf server")
|
||||
logger.debug("Starting zeroconf service")
|
||||
if not listen_only:
|
||||
cls._register_service()
|
||||
cls._browse_services()
|
||||
|
||||
@classmethod
|
||||
def stop(cls):
|
||||
logger.debug("Stopping zeroconf service")
|
||||
cls._unregister_service()
|
||||
cls.zeroconf.close()
|
||||
|
||||
@@ -73,6 +76,7 @@ class ZeroconfServer:
|
||||
|
||||
@classmethod
|
||||
def _on_service_discovered(cls, zeroconf, service_type, name, state_change):
|
||||
try:
|
||||
info = zeroconf.get_service_info(service_type, name)
|
||||
hostname = name.split(f'.{cls.service_type}')[0]
|
||||
logger.debug(f"Zeroconf: {hostname} {state_change}")
|
||||
@@ -82,6 +86,8 @@ class ZeroconfServer:
|
||||
else:
|
||||
cls.client_cache.pop(hostname)
|
||||
pub.sendMessage('zeroconf_state_change', hostname=hostname, state_change=state_change)
|
||||
except NotRunningException:
|
||||
pass
|
||||
|
||||
@classmethod
|
||||
def found_hostnames(cls):
|
||||
@@ -104,9 +110,15 @@ class ZeroconfServer:
|
||||
|
||||
# Example usage:
|
||||
if __name__ == "__main__":
|
||||
import time
|
||||
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
ZeroconfServer.configure("_zordon._tcp.local.", "foobar.local", 8080)
|
||||
try:
|
||||
ZeroconfServer.start()
|
||||
input("Server running - Press enter to end")
|
||||
while True:
|
||||
time.sleep(0.1)
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
ZeroconfServer.stop()
|
||||
|
||||
Reference in New Issue
Block a user