commit 1c304e77f5bea24dcfdd9e98fb07212a42593d05 Author: Brett Williams Date: Tue Oct 4 23:09:09 2022 -0700 Initial commit diff --git a/config.yaml b/config.yaml new file mode 100644 index 0000000..e69de29 diff --git a/dashboard.py b/dashboard.py new file mode 100755 index 0000000..46b1a87 --- /dev/null +++ b/dashboard.py @@ -0,0 +1,282 @@ +#!/usr/bin/env python +import datetime +import json +import os.path +import socket +import time +import threading +import traceback + +import psutil +import requests +import click +from rich import box +from rich.console import Console +from rich.live import Live +from rich.table import Column +from rich.table import Table +from rich.text import Text +from rich.layout import Layout +from rich.panel import Panel +from rich.tree import Tree + +import zordon_server +from zordon_server import RenderStatus +from zordon_server import string_to_status + +""" +The RenderDashboard is designed to be run on a remote machine or on the local server +This provides a detailed status of all jobs running on the server +""" + +status_colors = {RenderStatus.ERROR: "red", RenderStatus.CANCELLED: 'orange1', RenderStatus.COMPLETED: 'green', + RenderStatus.NOT_STARTED: "yellow", RenderStatus.SCHEDULED: 'purple', + RenderStatus.RUNNING: 'cyan'} + +categories = [RenderStatus.RUNNING, RenderStatus.ERROR, RenderStatus.NOT_STARTED, RenderStatus.SCHEDULED, + RenderStatus.COMPLETED, RenderStatus.CANCELLED] + +local_hostname = socket.gethostname() + + +def status_string_to_color(status_string): + job_status = string_to_status(status_string) + job_color = '[{}]'.format(status_colors[job_status]) + return job_color + + +def sorted_jobs(all_jobs): + # sorted_jobs = [] + # if all_jobs: + # for status_category in categories: + # found_jobs = [x for x in all_jobs if x['status'] == status_category.value] + # if found_jobs: + # sorted_found_jobs = sorted(found_jobs, key=lambda d: datetime.datetime.fromisoformat(d['date_created']), reverse=True) + # sorted_jobs.extend(sorted_found_jobs) + sorted_jobs = sorted(all_jobs, key=lambda d: datetime.datetime.fromisoformat(d['date_created']), reverse=True) + return sorted_jobs + + +def create_node_tree(all_server_data) -> Tree: + main_tree = Tree("[magenta]Server Cluster") + + for server_host, server_data in all_server_data['servers'].items(): + + if server_host == local_hostname: + node_tree_text = f"[cyan bold]{server_host}[/] [yellow](This Computer)[default] - [green]Running" + else: + node_tree_text = f"[cyan]{server_host} [magenta](Remote)[default] - [green]Running" + + node_tree = Tree(node_tree_text) + + stats_text = f"CPU: {server_data['status']['cpu_percent']}% | RAM: {server_data['status']['memory_percent']}% | " \ + f"Cores: {server_data['status']['cpu_count']} | {server_data['status']['platform'].split('-')[0]}" + + node_tree.add(Tree(stats_text)) + + running_jobs = [job for job in server_data['jobs'] if job['status'] == 'running'] + not_started = [job for job in server_data['jobs'] if job['status'] == 'not_started'] + scheduled = [job for job in server_data['jobs'] if job['status'] == 'scheduled'] + + jobs_tree = Tree(f"Running: [green]{len(running_jobs)} [default]| Queued: [cyan]{len(not_started)}" + f"[default] | Scheduled: [cyan]{len(scheduled)}") + + if running_jobs or not_started or scheduled: + for job in running_jobs: + filename = os.path.basename(job['render']['input']).split('.')[0] + jobs_tree.add(f"[bold]{filename} ({job['id']}) - {status_string_to_color(job['status'])}{(float(job['render']['percent_complete']) * 100):.1f}%") + for job in not_started: + filename = os.path.basename(job['render']['input']).split('.')[0] + jobs_tree.add(f"{filename} ({job['id']}) - {status_string_to_color(job['status'])}{job['status'].title()}") + for job in scheduled: + filename = os.path.basename(job['render']['input']).split('.')[0] + jobs_tree.add(f"{filename} ({job['id']}) - {status_string_to_color(job['status'])}{job['status'].title()}") + else: + jobs_tree.add("[italic]No running jobs") + + node_tree.add(jobs_tree) + main_tree.add(node_tree) + return main_tree + + +def create_jobs_table(all_server_data) -> Table: + table = Table("ID", "Project", "Output", "Renderer", Column(header="Priority", justify="center"), + Column(header="Status", justify="center"), Column(header="Time Elapsed", justify="right"), + Column(header="# Frames", justify="right"), "Node", show_lines=True, + box=box.HEAVY_HEAD) + + all_jobs = [] + for server_name, server_data in all_server_data['servers'].items(): + for job in server_data['jobs']: + job['node'] = server_name + all_jobs.append(job) + + all_jobs = sorted_jobs(all_jobs) + + for job in all_jobs: + + job_status = string_to_status(job['status']) + job_color = '[{}]'.format(status_colors[job_status]) + job_text = f"{job_color}" + job_status.value + + elapsed_time = job['render'].get('time_elapsed', 'unknown') + + # Project name + project_name = job_color + os.path.basename(job['render']['input']) + project_name = project_name.replace(".", "[default].") + + if job_status == RenderStatus.RUNNING: + job_text = f"{job_color}[bold]Running - {float(job['render']['percent_complete']) * 100:.1f}%" + delta = datetime.datetime.now() - datetime.datetime.fromisoformat(job['render']['start_time']) + elapsed_time = "[bold]" + str(delta) + project_name = "[bold]" + project_name + elif job_status == RenderStatus.CANCELLED or job_status == RenderStatus.ERROR: + project_name = "[strike]" + project_name + + # Priority + priority_color = ["red", "yellow", "cyan"][(job['priority'] - 1)] + + node_title = ("[yellow]" if job['node'] == local_hostname else "[magenta]") + job['node'] + renderer_colors = {'ffmpeg': '[magenta]', 'Blender': '[orange1]'} + + table.add_row( + job['id'], + project_name, + os.path.basename(job['render']['output']), + renderer_colors.get(job['renderer'], '[cyan]') + job['renderer'] + '[default]-' + job['render']['renderer_version'], + f"[{priority_color}]{job['priority']}", + job_text, + elapsed_time, + str(max(int(job['render']['total_frames']), 1)), + node_title + ) + + return table + + +def create_status_panel(all_server_data): + for key, value in all_server_data['servers'].items(): + if key == local_hostname: + return str(value['status']) + return "no status" + + +class RenderDashboard: + + def __init__(self, server_ip=None, server_port="8080"): + self.server_ip = server_ip + self.server_port = server_port + self.local_hostname = local_hostname + + def connect(self): + status = self.request_data('status') + return status + + def request_data(self, payload, timeout=2): + try: + req = requests.get(f'http://{self.server_ip}:{self.server_port}/{payload}', timeout=timeout) + if req.ok: + return req.json() + except Exception as e: + pass + # print(f"Exception fetching server data: {e}") + return None + + def get_jobs(self): + all_jobs = self.request_data('jobs') + sorted_jobs = [] + if all_jobs: + for status_category in categories: + found_jobs = [x for x in all_jobs if x['status'] == status_category.value] + if found_jobs: + sorted_jobs.extend(found_jobs) + return sorted_jobs + + def get_data(self): + all_data = self.request_data('full_status') + return all_data + + +class KeyboardThread(threading.Thread): + + def __init__(self, input_cbk = None, name='keyboard-input-thread'): + self.input_cbk = input_cbk + super(KeyboardThread, self).__init__(name=name) + self.start() + + def run(self): + while True: + self.input_cbk(input()) #waits to get input + Return + + +def my_callback(inp): + #evaluate the keyboard input + print('You Entered:', inp) + + +if __name__ == '__main__': + + server_ip = input("Enter server IP or None for local: ") or local_hostname + + client = RenderDashboard(server_ip, "8080") + + if not client.connect(): + if server_ip == local_hostname: + start_server = input("Local server not running. Start server? (y/n) ") + if start_server and start_server[0].lower() == "y": + # Startup the local server + zordon_server.start_server(background_thread=True) + test = client.connect() + print(f"connected? {test}") + else: + print(f"\nUnable to connect to server: {server_ip}") + print("\nVerify IP address is correct and server is running") + exit(1) + + # start the Keyboard thread + kthread = KeyboardThread(my_callback) + + # Console Layout + console = Console() + layout = Layout() + + # Divide the "screen" in to three parts + layout.split( + Layout(name="header", size=3), + Layout(ratio=1, name="main"), + Layout(size=10, name="footer"), + ) + # Divide the "main" layout in to "side" and "body" + layout["main"].split_row( + Layout(name="side"), + Layout(name="body", + ratio=3)) + # Divide the "side" layout in to two + layout["side"].split(Layout(name="side_top"), Layout(name="side_bottom")) + + # Server connection header + header_text = Text(f"Connected to server: ") + header_text.append(f"{server_ip} ", style="green") + if server_ip == local_hostname: + header_text.append("(This Computer)", style="yellow") + else: + header_text.append("(Remote)", style="magenta") + + layout["header"].update(Panel(Text("Zordon Render Client - Version 0.0.1 alpha", justify="center"))) + + with Live(console=console, screen=False, refresh_per_second=1, transient=True) as live: + while True: + + server_data = client.get_data() + try: + if server_data: + layout["body"].update(create_jobs_table(server_data)) + layout["side_top"].update(Panel(create_node_tree(server_data))) + layout["side_bottom"].update(Panel(create_status_panel(server_data))) + live.update(layout, refresh=False) + except Exception as e: + print(f"Exception updating table: {e}") + traceback.print_exception(e) + time.sleep(1) + # # # todo: Add input prompt to manage running jobs (ie add, cancel, get info, etc) + diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..f19acb4 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,7 @@ +click~=8.1.3 +requests==2.28.1 +psutil~=5.9.0 +PyYAML~=6.0 +Flask==2.2.2 +rich==12.6.0 +ffmpeg-python diff --git a/utilities/aerender.py b/utilities/aerender.py new file mode 100644 index 0000000..806b404 --- /dev/null +++ b/utilities/aerender.py @@ -0,0 +1,143 @@ +#! /usr/bin/python +from utilities.generic_renderer import * +import glob +import json +import re + + +def aerender_path(): + paths = glob.glob('/Applications/*After Effects*/aerender') + if len(paths) > 1: + logging.warning('Multiple After Effects installations detected') + elif not paths: + logging.error('After Effects installation not found') + else: + return paths[0] + + +class AERenderer(Renderer): + + @staticmethod + def version(): + version = None + try: + x = subprocess.Popen([aerender_path(), '-version'], stdout=subprocess.PIPE) + x.wait() + ver_out = str(x.stdout.read().strip()) + version = ver_out.split(" ")[-1].strip() + except Exception as e: + logging.error('failed getting version: {}'.format(e)) + return version + + renderer = 'After Effects' + render_engine = 'aerender' + supported_extensions = ['.aep'] + + def __init__(self, project, comp, render_settings, omsettings, output): + super(AERenderer, self).__init__(input=project, output=output) + + self.comp = comp + self.render_settings = render_settings + self.omsettings = omsettings + + self.progress = 0 + self.progress_history = [] + self.attributes = {} + + def _generate_subprocess(self): + + if os.path.exists('nexrender-cli-macos'): + logging.info('nexrender found') + # { + # "template": { + # "src": String, + # "composition": String, + # + # "frameStart": Number, + # "frameEnd": Number, + # "frameIncrement": Number, + # + # "continueOnMissing": Boolean, + # "settingsTemplate": String, + # "outputModule": String, + # "outputExt": String, + # }, + # "assets": [], + # "actions": { + # "prerender": [], + # "postrender": [], + # }, + # "onChange": Function, + # "onRenderProgress": Function + # } + job = {'template': + { + 'src': 'file://' + self.input, 'composition': self.comp.replace('"', ''), + 'settingsTemplate': self.render_settings.replace('"', ''), + 'outputModule': self.omsettings.replace('"', ''), 'outputExt': 'mov'} + } + x = ['./nexrender-cli-macos', "'{}'".format(json.dumps(job))] + else: + logging.info('nexrender not found') + x = [aerender_path(), '-project', self.input, '-comp', self.comp, '-RStemplate', self.render_settings, + '-OMtemplate', self.omsettings, '-output', self.output] + return x + + def _parse_stdout(self, line): + + # print line + if line.startswith('PROGRESS:'): + # print 'progress' + trimmed = line.replace('PROGRESS:', '').strip() + if len(trimmed): + self.progress_history.append(line) + if 'Seconds' in trimmed: + self._update_progress(line) + elif ': ' in trimmed: + tmp = trimmed.split(': ') + self.attributes[tmp[0].strip()] = tmp[1].strip() + elif line.startswith('WARNING:'): + trimmed = line.replace('WARNING:', '').strip() + self.warnings.append(trimmed) + logging.warning(trimmed) + elif line.startswith('aerender ERROR') or 'ERROR:' in line: + self.errors.append(line) + logging.error(line) + + def _update_progress(self, line): + + if not self.total_frames: + duration_string = self.attributes.get('Duration', None) + frame_rate = self.attributes.get('Frame Rate', '0').split(' ')[0] + self.total_frames = timecode_to_frames(duration_string.split('Duration:')[-1], float(frame_rate)) + + match = re.match(r'PROGRESS:.*\((?P\d+)\): (?P