#!/usr/bin/env python3 import json import logging import multiprocessing import os import pathlib import shutil import socket import ssl import tempfile import threading import time from datetime import datetime from zipfile import ZipFile import json2html import psutil import yaml from flask import Flask, request, render_template, send_file, after_this_request, Response, redirect, url_for, abort from src.api.add_job_helpers import handle_uploaded_project_files, process_zipped_project, create_render_jobs from src.api.serverproxy_manager import ServerProxyManager from src.distributed_job_manager import DistributedJobManager from src.engines.core.base_worker import string_to_status, RenderStatus from src.engines.engine_manager import EngineManager from src.render_queue import RenderQueue, JobNotFoundError from src.utilities.config import Config from src.utilities.misc_helper import system_safe_path, current_system_os, current_system_cpu, \ current_system_os_version, config_dir from src.utilities.server_helper import generate_thumbnail_for_job from src.utilities.zeroconf_server import ZeroconfServer logger = logging.getLogger() server = Flask(__name__, template_folder='web/templates', static_folder='web/static') ssl._create_default_https_context = ssl._create_unverified_context # disable SSL for downloads categories = [RenderStatus.RUNNING, RenderStatus.ERROR, RenderStatus.NOT_STARTED, RenderStatus.SCHEDULED, RenderStatus.COMPLETED, RenderStatus.CANCELLED] def sorted_jobs(all_jobs, sort_by_date=True): if not sort_by_date: sorted_job_list = [] if all_jobs: for status_category in categories: found_jobs = [x for x in all_jobs if x.status == status_category.value] if found_jobs: sorted_found_jobs = sorted(found_jobs, key=lambda d: d.date_created, reverse=True) sorted_job_list.extend(sorted_found_jobs) else: sorted_job_list = sorted(all_jobs, key=lambda d: d.date_created, reverse=True) return sorted_job_list @server.route('/') @server.route('/index') def index(): with open(system_safe_path(os.path.join(config_dir(), 'presets.yaml'))) as f: render_presets = yaml.load(f, Loader=yaml.FullLoader) return render_template('index.html', all_jobs=sorted_jobs(RenderQueue.all_jobs()), hostname=server.config['HOSTNAME'], renderer_info=renderer_info(), render_clients=[server.config['HOSTNAME']], preset_list=render_presets) @server.get('/api/jobs') def jobs_json(): try: hash_token = request.args.get('token', None) all_jobs = [x.json() for x in RenderQueue.all_jobs()] job_cache_token = str(json.dumps(all_jobs).__hash__()) if hash_token and hash_token == job_cache_token: return [], 204 # no need to update else: return {'jobs': all_jobs, 'token': job_cache_token} except Exception as e: logger.exception(f"Exception fetching all_jobs_cached: {e}") return [], 500 @server.route('/ui/job//full_details') def job_detail(job_id): found_job = RenderQueue.job_with_id(job_id) table_html = json2html.json2html.convert(json=found_job.json(), table_attributes='class="table is-narrow is-striped is-fullwidth"') media_url = None if found_job.file_list() and found_job.status == RenderStatus.COMPLETED: media_basename = os.path.basename(found_job.file_list()[0]) media_url = f"/api/job/{job_id}/file/{media_basename}" return render_template('details.html', detail_table=table_html, media_url=media_url, hostname=server.config['HOSTNAME'], job_status=found_job.status.value.title(), job=found_job, renderer_info=renderer_info()) @server.route('/api/job//thumbnail') def job_thumbnail(job_id): big_thumb = request.args.get('size', False) == "big" video_ok = request.args.get('video_ok', False) found_job = RenderQueue.job_with_id(job_id, none_ok=True) if found_job: os.makedirs(server.config['THUMBS_FOLDER'], exist_ok=True) thumb_video_path = os.path.join(server.config['THUMBS_FOLDER'], found_job.id + '.mp4') thumb_image_path = os.path.join(server.config['THUMBS_FOLDER'], found_job.id + '.jpg') big_video_path = os.path.join(server.config['THUMBS_FOLDER'], found_job.id + '_big.mp4') big_image_path = os.path.join(server.config['THUMBS_FOLDER'], found_job.id + '_big.jpg') # generate regular thumb if it doesn't exist if not os.path.exists(thumb_video_path) and not os.path.exists(thumb_video_path + '_IN-PROGRESS') and \ found_job.status not in [RenderStatus.CANCELLED, RenderStatus.ERROR]: generate_thumbnail_for_job(found_job, thumb_video_path, thumb_image_path, max_width=240) # generate big thumb if it doesn't exist if not os.path.exists(big_video_path) and not os.path.exists(big_image_path + '_IN-PROGRESS') and \ found_job.status not in [RenderStatus.CANCELLED, RenderStatus.ERROR]: generate_thumbnail_for_job(found_job, big_video_path, big_image_path, max_width=800) # generated videos if video_ok: if big_thumb and os.path.exists(big_video_path) and not os.path.exists( big_video_path + '_IN-PROGRESS'): return send_file(big_video_path, mimetype="video/mp4") elif os.path.exists(thumb_video_path) and not os.path.exists(thumb_video_path + '_IN-PROGRESS'): return send_file(thumb_video_path, mimetype="video/mp4") # Generated thumbs if big_thumb and os.path.exists(big_image_path): return send_file(big_image_path, mimetype='image/jpeg') elif os.path.exists(thumb_image_path): return send_file(thumb_image_path, mimetype='image/jpeg') # Misc status icons if found_job.status == RenderStatus.RUNNING: return send_file('../web/static/images/gears.png', mimetype="image/png") elif found_job.status == RenderStatus.CANCELLED: return send_file('../web/static/images/cancelled.png', mimetype="image/png") elif found_job.status == RenderStatus.SCHEDULED: return send_file('../web/static/images/scheduled.png', mimetype="image/png") elif found_job.status == RenderStatus.NOT_STARTED: return send_file('../web/static/images/not_started.png', mimetype="image/png") # errors return send_file('../web/static/images/error.png', mimetype="image/png") # Get job file routing @server.route('/api/job//file/', methods=['GET']) def get_job_file(job_id, filename): found_job = RenderQueue.job_with_id(job_id) try: for full_path in found_job.file_list(): if filename in full_path: return send_file(path_or_file=full_path) except FileNotFoundError: abort(404) @server.get('/api/jobs/') def filtered_jobs_json(status_val): state = string_to_status(status_val) jobs = [x.json() for x in RenderQueue.jobs_with_status(state)] if jobs: return jobs else: return f'Cannot find jobs with status {status_val}', 400 @server.post('/api/job//notify_parent_of_status_change') def subjob_status_change(job_id): try: subjob_details = request.json logger.info(f"Subjob to job id: {job_id} is now {subjob_details['status']}") DistributedJobManager.handle_subjob_status_change(RenderQueue.job_with_id(job_id), subjob_data=subjob_details) return Response(status=200) except JobNotFoundError: return "Job not found", 404 @server.errorhandler(JobNotFoundError) def handle_job_not_found(job_error): return f'Cannot find job with ID {job_error.job_id}', 400 @server.get('/api/job/') def get_job_status(job_id): return RenderQueue.job_with_id(job_id).json() @server.get('/api/job//logs') def get_job_logs(job_id): found_job = RenderQueue.job_with_id(job_id) log_path = system_safe_path(found_job.log_path()) log_data = None if log_path and os.path.exists(log_path): with open(log_path) as file: log_data = file.read() return Response(log_data, mimetype='text/plain') @server.get('/api/job//file_list') def get_file_list(job_id): return RenderQueue.job_with_id(job_id).file_list() @server.get('/api/job//make_ready') def make_job_ready(job_id): try: found_job = RenderQueue.job_with_id(job_id) if found_job.status in [RenderStatus.CONFIGURING, RenderStatus.NOT_STARTED]: if found_job.children: for child_key in found_job.children.keys(): child_id = child_key.split('@')[0] hostname = child_key.split('@')[-1] ServerProxyManager.get_proxy_for_hostname(hostname).request_data(f'job/{child_id}/make_ready') found_job.status = RenderStatus.NOT_STARTED RenderQueue.save_state() return found_job.json(), 200 except Exception as e: return "Error making job ready: {e}", 500 return "Not valid command", 405 @server.route('/api/job//download_all') def download_all(job_id): zip_filename = None @after_this_request def clear_zip(response): if zip_filename and os.path.exists(zip_filename): os.remove(zip_filename) return response found_job = RenderQueue.job_with_id(job_id) output_dir = os.path.dirname(found_job.output_path) if os.path.exists(output_dir): zip_filename = system_safe_path(os.path.join(tempfile.gettempdir(), pathlib.Path(found_job.input_path).stem + '.zip')) with ZipFile(zip_filename, 'w') as zipObj: for f in os.listdir(output_dir): zipObj.write(filename=system_safe_path(os.path.join(output_dir, f)), arcname=os.path.basename(f)) return send_file(zip_filename, mimetype="zip", as_attachment=True, ) else: return f'Cannot find project files for job {job_id}', 500 @server.get('/api/presets') def presets(): presets_path = system_safe_path('config/presets.yaml') with open(presets_path) as f: presets = yaml.load(f, Loader=yaml.FullLoader) return presets @server.get('/api/full_status') def full_status(): full_results = {'timestamp': datetime.now().isoformat(), 'servers': {}} try: snapshot_results = snapshot() server_data = {'status': snapshot_results.get('status', {}), 'jobs': snapshot_results.get('jobs', {}), 'is_online': True} full_results['servers'][server.config['HOSTNAME']] = server_data except Exception as e: logger.error(f"Exception fetching full status: {e}") return full_results @server.get('/api/snapshot') def snapshot(): server_status = status() server_jobs = [x.json() for x in RenderQueue.all_jobs()] server_data = {'status': server_status, 'jobs': server_jobs, 'timestamp': datetime.now().isoformat()} return server_data @server.get('/api/_detected_clients') def detected_clients(): # todo: dev/debug only. Should not ship this - probably. return ZeroconfServer.found_hostnames() # New version @server.post('/api/add_job') def add_job_handler(): # Process request data try: if request.is_json: jobs_list = [request.json] if not isinstance(request.json, list) else request.json elif request.form.get('json', None): jobs_list = json.loads(request.form['json']) else: # Cleanup flat form data into nested structure form_dict = {k: v for k, v in dict(request.form).items() if v} args = {} arg_keys = [k for k in form_dict.keys() if '-arg_' in k] for server_hostname in arg_keys: if form_dict['renderer'] in server_hostname or 'AnyRenderer' in server_hostname: cleaned_key = server_hostname.split('-arg_')[-1] args[cleaned_key] = form_dict[server_hostname] form_dict.pop(server_hostname) args['raw'] = form_dict.get('raw_args', None) form_dict['args'] = args jobs_list = [form_dict] except Exception as e: err_msg = f"Error processing job data: {e}" logger.error(err_msg) return err_msg, 500 try: loaded_project_local_path, referred_name = handle_uploaded_project_files(request, jobs_list, server.config['UPLOAD_FOLDER']) if loaded_project_local_path.lower().endswith('.zip'): loaded_project_local_path = process_zipped_project(loaded_project_local_path) results = create_render_jobs(jobs_list, loaded_project_local_path, referred_name) for response in results: if response.get('error', None): return results, 400 if request.args.get('redirect', False): return redirect(url_for('index')) else: return results, 200 except Exception as e: logger.exception(f"Unknown error adding job: {e}") return 'unknown error', 500 @server.get('/api/job//cancel') def cancel_job(job_id): if not request.args.get('confirm', False): return 'Confirmation required to cancel job', 400 if RenderQueue.cancel_job(RenderQueue.job_with_id(job_id)): if request.args.get('redirect', False): return redirect(url_for('index')) else: return "Job cancelled" else: return "Unknown error", 500 @server.route('/api/job//delete', methods=['POST', 'GET']) def delete_job(job_id): try: if not request.args.get('confirm', False): return 'Confirmation required to delete job', 400 # Check if we can remove the 'output' directory found_job = RenderQueue.job_with_id(job_id) output_dir = os.path.dirname(found_job.output_path) if server.config['UPLOAD_FOLDER'] in output_dir and os.path.exists(output_dir): shutil.rmtree(output_dir) # Remove any thumbnails for filename in os.listdir(server.config['THUMBS_FOLDER']): if job_id in filename: os.remove(os.path.join(server.config['THUMBS_FOLDER'], filename)) thumb_path = os.path.join(server.config['THUMBS_FOLDER'], found_job.id + '.mp4') if os.path.exists(thumb_path): os.remove(thumb_path) # See if we own the project_dir (i.e. was it uploaded) project_dir = os.path.dirname(os.path.dirname(found_job.input_path)) if server.config['UPLOAD_FOLDER'] in project_dir and os.path.exists(project_dir): # check to see if any other projects are sharing the same project file project_dir_files = [f for f in os.listdir(project_dir) if not f.startswith('.')] if len(project_dir_files) == 0 or (len(project_dir_files) == 1 and 'source' in project_dir_files[0]): logger.info(f"Removing project directory: {project_dir}") shutil.rmtree(project_dir) RenderQueue.delete_job(found_job) if request.args.get('redirect', False): return redirect(url_for('index')) else: return "Job deleted", 200 except Exception as e: logger.error(f"Error deleting job: {e}") return f"Error deleting job: {e}", 500 @server.get('/api/clear_history') def clear_history(): RenderQueue.clear_history() return 'success' @server.route('/api/status') def status(): renderer_data = {} for render_class in EngineManager.supported_engines(): if EngineManager.all_versions_for_engine(render_class.name): # only return renderers installed on host renderer_data[render_class.engine.name()] = \ {'versions': EngineManager.all_versions_for_engine(render_class.engine.name()), 'is_available': RenderQueue.is_available_for_job(render_class.engine.name()) } # Get system info return {"timestamp": datetime.now().isoformat(), "system_os": current_system_os(), "system_os_version": current_system_os_version(), "system_cpu": current_system_cpu(), "cpu_percent": psutil.cpu_percent(percpu=False), "cpu_percent_per_cpu": psutil.cpu_percent(percpu=True), "cpu_count": psutil.cpu_count(logical=False), "memory_total": psutil.virtual_memory().total, "memory_available": psutil.virtual_memory().available, "memory_percent": psutil.virtual_memory().percent, "job_counts": RenderQueue.job_counts(), "renderers": renderer_data, "hostname": server.config['HOSTNAME'], "port": server.config['PORT'] } @server.get('/api/renderer_info') def renderer_info(): return_simple = request.args.get('simple', False) renderer_data = {} for engine in EngineManager.supported_engines(): # Get all installed versions of engine installed_versions = EngineManager.all_versions_for_engine(engine.name()) if installed_versions: install_path = installed_versions[0]['path'] renderer_data[engine.name()] = {'is_available': RenderQueue.is_available_for_job(engine.name()), 'versions': installed_versions} if not return_simple: renderer_data[engine.name()]['supported_extensions'] = engine.supported_extensions renderer_data[engine.name()]['supported_export_formats'] = engine(install_path).get_output_formats() return renderer_data @server.get('/api//is_available') def is_engine_available(engine_name): return {'engine': engine_name, 'available': RenderQueue.is_available_for_job(engine_name), 'cpu_count': int(psutil.cpu_count(logical=False)), 'versions': EngineManager.all_versions_for_engine(engine_name), 'hostname': server.config['HOSTNAME']} @server.get('/api/is_engine_available_to_download') def is_engine_available_to_download(): available_result = EngineManager.version_is_available_to_download(request.args.get('engine'), request.args.get('version'), request.args.get('system_os'), request.args.get('cpu')) return available_result if available_result else \ (f"Cannot find available download for {request.args.get('engine')} {request.args.get('version')}", 500) @server.get('/api/find_most_recent_version') def find_most_recent_version(): most_recent = EngineManager.find_most_recent_version(request.args.get('engine'), request.args.get('system_os'), request.args.get('cpu')) return most_recent if most_recent else \ (f"Error finding most recent version of {request.args.get('engine')}", 500) @server.post('/api/download_engine') def download_engine(): download_result = EngineManager.download_engine(request.args.get('engine'), request.args.get('version'), request.args.get('system_os'), request.args.get('cpu')) return download_result if download_result else \ (f"Error downloading {request.args.get('engine')} {request.args.get('version')}", 500) @server.post('/api/delete_engine') def delete_engine_download(): json_data = request.json delete_result = EngineManager.delete_engine_download(json_data.get('engine'), json_data.get('version'), json_data.get('system_os'), json_data.get('cpu')) return "Success" if delete_result else \ (f"Error deleting {json_data.get('engine')} {json_data.get('version')}", 500) @server.get('/api/renderer//args') def get_renderer_args(renderer): try: renderer_engine_class = EngineManager.engine_with_name(renderer) return renderer_engine_class().get_arguments() except LookupError: return f"Cannot find renderer '{renderer}'", 400 @server.get('/api/renderer//help') def get_renderer_help(renderer): try: renderer_engine_class = EngineManager.engine_with_name(renderer) return renderer_engine_class().get_help() except LookupError: return f"Cannot find renderer '{renderer}'", 400 @server.route('/upload') def upload_file_page(): return render_template('upload.html', supported_renderers=EngineManager.supported_engines()) def start_server(): def eval_loop(delay_sec=1): while True: RenderQueue.evaluate_queue() time.sleep(delay_sec) # get hostname local_hostname = socket.gethostname() local_hostname = local_hostname + (".local" if not local_hostname.endswith(".local") else "") # load flask settings server.config['HOSTNAME'] = local_hostname server.config['PORT'] = int(Config.port_number) server.config['UPLOAD_FOLDER'] = system_safe_path(os.path.expanduser(Config.upload_folder)) server.config['THUMBS_FOLDER'] = system_safe_path(os.path.join(os.path.expanduser(Config.upload_folder), 'thumbs')) server.config['MAX_CONTENT_PATH'] = Config.max_content_path server.config['enable_split_jobs'] = Config.enable_split_jobs # Setup directory for saving engines to EngineManager.engines_path = system_safe_path(os.path.join(os.path.join(os.path.expanduser(Config.upload_folder), 'engines'))) os.makedirs(EngineManager.engines_path, exist_ok=True) # Debug info logger.debug(f"Upload directory: {server.config['UPLOAD_FOLDER']}") logger.debug(f"Thumbs directory: {server.config['THUMBS_FOLDER']}") logger.debug(f"Engines directory: {EngineManager.engines_path}") # disable most Flask logging flask_log = logging.getLogger('werkzeug') flask_log.setLevel(Config.flask_log_level.upper()) # check for updates for render engines if config'd or on first launch if Config.update_engines_on_launch or not EngineManager.all_engines(): EngineManager.update_all_engines() # Set up the RenderQueue object RenderQueue.load_state(database_directory=server.config['UPLOAD_FOLDER']) DistributedJobManager.start() thread = threading.Thread(target=eval_loop, kwargs={'delay_sec': Config.queue_eval_seconds}, daemon=True) thread.start() logger.info(f"Starting Zordon Render Server - Hostname: '{server.config['HOSTNAME']}:'") ZeroconfServer.configure("_zordon._tcp.local.", server.config['HOSTNAME'], server.config['PORT']) ZeroconfServer.properties = {'system_cpu': current_system_cpu(), 'system_cpu_cores': multiprocessing.cpu_count(), 'system_os': current_system_os(), 'system_os_version': current_system_os_version()} ZeroconfServer.start() try: server.run(host='0.0.0.0', port=server.config['PORT'], debug=Config.flask_debug_enable, use_reloader=False, threaded=True) finally: RenderQueue.save_state() ZeroconfServer.stop()