#!/usr/bin/env python3
"""Flask web UI and JSON API for the render server."""
import json
import logging
import os
import pathlib
import shutil
import socket
import threading
import time
from datetime import datetime
from zipfile import ZipFile

import json2html
import requests
import yaml
from flask import Flask, request, render_template, send_file, after_this_request, Response, redirect, url_for, abort
from werkzeug.utils import secure_filename

from lib.scheduled_job import ScheduledJob
from lib.render_queue import RenderQueue, JobNotFoundError
from lib.render_workers.worker_factory import RenderWorkerFactory
from lib.render_workers.base_worker import string_to_status, RenderStatus
from lib.utilities.server_helper import post_job_to_server, generate_thumbnail_for_job

logger = logging.getLogger()
server = Flask(__name__, template_folder='templates', static_folder='static')

# Order in which status groups are listed when jobs are grouped by status rather than sorted purely by date.
categories = [RenderStatus.RUNNING, RenderStatus.ERROR, RenderStatus.NOT_STARTED, RenderStatus.SCHEDULED,
              RenderStatus.COMPLETED, RenderStatus.CANCELLED]


def sorted_jobs(all_jobs, sort_by_date=True):
    """Return the jobs ordered newest-first.

    :param all_jobs: iterable of job objects exposing ``date_created`` and ``render_status()``;
        ``None`` or an empty iterable yields an empty list.
    :param sort_by_date: when True, sort every job by ``date_created`` (newest first). When False,
        group jobs in the order given by ``categories`` and sort newest-first inside each group;
        jobs whose status is not listed in ``categories`` are left out.
    :return: a new list of jobs.
    """
    jobs = list(all_jobs or [])
    if sort_by_date:
        return sorted(jobs, key=lambda d: d.date_created, reverse=True)

    sorted_job_list = []
    for status_category in categories:
        # Other code in this file compares render_status() with the enum member itself, so accept
        # both the member and its raw value here instead of silently matching nothing.
        found_jobs = [x for x in jobs if x.render_status() in (status_category, status_category.value)]
        sorted_job_list.extend(sorted(found_jobs, key=lambda d: d.date_created, reverse=True))
    return sorted_job_list


@server.route('/')
@server.route('/index')
def index():
    """Render the main page listing all jobs, renderer info, render clients and presets."""
    with open('config/presets.yaml') as f:
        # NOTE(review): FullLoader is acceptable only because this is a local, trusted config file;
        # consider yaml.safe_load if presets never need custom tags.
        presets = yaml.load(f, Loader=yaml.FullLoader)

    return render_template('index.html', all_jobs=sorted_jobs(RenderQueue.all_jobs()),
                           hostname=RenderQueue.hostname, renderer_info=renderer_info(),
                           render_clients=render_clients(), preset_list=presets)


@server.route('/ui/job/<job_id>/full_details')
def job_detail(job_id):
    """Render the details page for one job.

    The job's JSON is converted to an HTML table; when the job is completed and has output files,
    a URL to the first output file is passed to the template as ``media_url``.
    """
    found_job = RenderQueue.job_with_id(job_id)
    table_html = json2html.json2html.convert(json=found_job.json(),
                                             table_attributes='class="table is-narrow is-striped is-fullwidth"')

    media_url = None
    job_files = found_job.file_list()
    if job_files and found_job.render_status() == RenderStatus.COMPLETED:
        media_basename = os.path.basename(job_files[0])
        media_url = f"/api/job/{job_id}/file/{media_basename}"

    return render_template('details.html', detail_table=table_html, media_url=media_url,
                           hostname=RenderQueue.hostname, job_status=found_job.render_status().value.title(),
                           job=found_job, renderer_info=renderer_info())


@server.route('/ui/job/<job_id>/thumbnail')
def job_thumbnail(job_id):
    """Send a thumbnail for the job: video if available, else still image, else a spinner GIF."""
    found_job = RenderQueue.job_with_id(job_id, none_ok=True)
    if found_job:
        thumbs_folder = server.config['THUMBS_FOLDER']
        os.makedirs(thumbs_folder, exist_ok=True)
        thumb_video_path = os.path.join(thumbs_folder, found_job.id + '.mp4')
        thumb_image_path = os.path.join(thumbs_folder, found_job.id + '.jpg')
        # A file with this suffix marks a video thumbnail that is still being written.
        in_progress_path = thumb_video_path + '_IN-PROGRESS'

        # Only start generation when no thumbnail exists and none is currently being produced.
        if not os.path.exists(thumb_video_path) and not os.path.exists(in_progress_path):
            generate_thumbnail_for_job(found_job, thumb_video_path, thumb_image_path, max_width=240)

        if os.path.exists(thumb_video_path) and not os.path.exists(in_progress_path):
            return send_file(thumb_video_path, mimetype="video/mp4")
        elif os.path.exists(thumb_image_path):
            return send_file(thumb_image_path, mimetype='image/jpeg')

    # Unknown job or thumbnail not ready yet.
    return send_file('static/images/spinner.gif', mimetype="image/gif")


# Get job file routing
@server.route('/api/job/<job_id>/file/<filename>', methods=['GET'])
def get_job_file(job_id, filename):
    """Send the first output file of the job whose path contains ``filename``.

    Responds with 404 when no output file matches or the matching file is missing on disk.
    """
    found_job = RenderQueue.job_with_id(job_id)
    try:
        for full_path in found_job.file_list():
            if filename in full_path:
                return send_file(path_or_file=full_path)
    except FileNotFoundError:
        abort(404)
    # No output file matched: a view must not return None, so answer with an explicit 404.
    abort(404)


@server.get('/api/jobs')
def jobs_json():
    """Return the JSON representation of every job in the queue."""
    return [x.json() for x in RenderQueue.all_jobs()]


@server.get('/api/jobs/<status_val>')
def filtered_jobs_json(status_val):
    """Return the JSON of all jobs with the given status, or a 400 message when there are none."""
    state = string_to_status(status_val)
    jobs = [x.json() for x in RenderQueue.jobs_with_status(state)]
    if jobs:
        return jobs
    else:
        return f'Cannot find jobs with status {status_val}', 400
@server.errorhandler(JobNotFoundError)
def handle_job_not_found(job_error):
    """Translate a JobNotFoundError raised by any view into a 400 response."""
    return f'Cannot find job with ID {job_error.job_id}', 400


@server.get('/api/job/<job_id>')
def get_job_status(job_id):
    """Return the JSON representation of a single job."""
    return RenderQueue.job_with_id(job_id).json()


@server.get('/api/job/<job_id>/logs')
def get_job_logs(job_id):
    """Return the job's log file as plain text (empty body when there is no log file)."""
    found_job = RenderQueue.job_with_id(job_id)
    log_path = found_job.log_path()
    log_data = None
    if log_path and os.path.exists(log_path):
        with open(log_path) as file:
            log_data = file.read()
    return Response(log_data, mimetype='text/plain')


@server.get('/api/job/<job_id>/file_list')
def get_file_list(job_id):
    """Return the list of output file paths for the job."""
    # Return the file list, not the job object itself (which Flask cannot serialise).
    return list(RenderQueue.job_with_id(job_id).file_list() or [])


@server.route('/api/job/<job_id>/download_all')
def download_all(job_id):
    """Zip every file in the job's output directory and send it as an attachment.

    The temporary zip file is removed once the request has been handled. Responds with 500 when
    the output directory does not exist.
    """
    zip_filename = None

    @after_this_request
    def clear_zip(response):
        # zip_filename is read at call time, so this sees the path assigned further down.
        if zip_filename and os.path.exists(zip_filename):
            os.remove(zip_filename)
        return response

    found_job = RenderQueue.job_with_id(job_id)
    output_dir = os.path.dirname(found_job.output_path)
    if os.path.exists(output_dir):
        zip_filename = os.path.join('/tmp', pathlib.Path(found_job.input_path).stem + '.zip')
        with ZipFile(zip_filename, 'w') as zipObj:
            for f in os.listdir(output_dir):
                zipObj.write(filename=os.path.join(output_dir, f), arcname=os.path.basename(f))
        return send_file(zip_filename, mimetype="application/zip", as_attachment=True)
    else:
        return f'Cannot find project files for job {job_id}', 500


@server.post('/api/register_client')
def register_client():
    """Register the render client named by the ``hostname`` request value."""
    client_hostname = request.values['hostname']
    return RenderQueue.register_client(client_hostname)


@server.post('/api/unregister_client')
def unregister_client():
    """Unregister the render client named by the ``hostname`` request value."""
    client_hostname = request.values['hostname']
    return RenderQueue.unregister_client(client_hostname)


@server.get('/api/clients')
def render_clients():
    """Return the hostnames of all registered render clients."""
    return [c.hostname for c in RenderQueue.render_clients()]


@server.get('/api/presets')
def presets():
    """Return the contents of config/presets.yaml."""
    with open('config/presets.yaml') as f:
        # NOTE(review): FullLoader is acceptable only because this is a local, trusted config file.
        preset_data = yaml.load(f, Loader=yaml.FullLoader)
    return preset_data


@server.get('/api/full_status')
def full_status():
    """Collect a status snapshot from this server and from every registered render client.

    :return: dict with a ``timestamp`` and a ``servers`` mapping of hostname to
        ``{'status', 'jobs', 'is_online'}``. Clients that cannot be reached or that answer with
        something unusable are reported with ``is_online`` False and empty data.
    """
    full_results = {'timestamp': datetime.now().isoformat(), 'servers': {}}

    try:
        for client_hostname in render_clients():
            is_online = False
            if client_hostname == RenderQueue.hostname:
                snapshot_results = snapshot()
                is_online = True
            else:
                snapshot_results = {}
                try:
                    # The snapshot route of this server lives under /api.
                    # NOTE(review): port 8080 is hard-coded; confirm clients always listen there.
                    snapshot_request = requests.get(f'http://{client_hostname}:8080/api/snapshot', timeout=1)
                    remote_data = snapshot_request.json()
                    if isinstance(remote_data, dict):
                        snapshot_results = remote_data
                        is_online = snapshot_request.ok
                except (requests.RequestException, ValueError) as e:
                    # One unreachable or misbehaving client must not hide the remaining ones.
                    logger.debug(f"Could not fetch snapshot from {client_hostname}: {e}")

            server_data = {'status': snapshot_results.get('status', {}), 'jobs': snapshot_results.get('jobs', {}),
                           'is_online': is_online}
            full_results['servers'][client_hostname] = server_data
    except Exception as e:
        logger.error(f"Exception fetching full status: {e}")
    return full_results


@server.get('/api/snapshot')
def snapshot():
    """Return this server's queue status and all of its jobs, with a timestamp."""
    server_status = RenderQueue.status()
    server_jobs = [x.json() for x in RenderQueue.all_jobs()]
    server_data = {'status': server_status, 'jobs': server_jobs, 'timestamp': datetime.now().isoformat()}
    return server_data


@server.post('/api/add_job')
def add_job_handler():
    """Create new job(s) and add them to the server render queue.

    Accepts a JSON body (one job), a form field ``json`` (a job or a list of jobs) or flat form
    data whose ``<renderer>-arg_<name>`` fields are folded into a nested ``args`` dict. An
    optional uploaded ``file`` is stored in a new job directory and becomes the input of every job.

    :return: list of per-job results; paired with an error status code when any job failed, or a
        redirect to the index page when the ``redirect`` query argument is set.
    """
    try:
        if request.is_json:
            jobs_list = [request.json]
        elif request.form.get('json', None):
            jobs_list = json.loads(request.form['json'])
            if isinstance(jobs_list, dict):
                # A single job object is accepted as well as a list of jobs.
                jobs_list = [jobs_list]
        else:
            # Cleanup flat form data into nested structure
            form_dict = {k: v for k, v in dict(request.form).items() if v}
            form_renderer = form_dict.get('renderer')
            args = {}
            arg_keys = [k for k in form_dict.keys() if '-arg_' in k]
            for key in arg_keys:
                if (form_renderer and form_renderer in key) or 'AnyRenderer' in key:
                    cleaned_key = key.split('-arg_')[-1]
                    args[cleaned_key] = form_dict[key]
                form_dict.pop(key)
            args['raw'] = form_dict.get('raw_args', None)
            form_dict['args'] = args
            jobs_list = [form_dict]

        # handle uploaded files
        uploaded_file = request.files.get('file', None)
        uploaded_file_local_path = None
        job_dir = None
        if uploaded_file and uploaded_file.filename:
            logger.info(f"Receiving uploaded file {uploaded_file.filename}")
            # The filename and renderer come from the client: sanitise both before they become
            # part of a directory name so they cannot escape the upload folder.
            safe_filename = secure_filename(uploaded_file.filename) or 'upload'
            first_renderer = jobs_list[0].get('renderer') if jobs_list else None
            safe_renderer = secure_filename(str(first_renderer or '')) or 'unknown'
            job_dir = os.path.join(server.config['UPLOAD_FOLDER'], '_'.join(
                [datetime.now().strftime("%Y.%m.%d_%H.%M.%S"), safe_renderer, safe_filename]))
            os.makedirs(job_dir, exist_ok=True)
            upload_dir = os.path.join(job_dir, 'source')
            os.makedirs(upload_dir, exist_ok=True)
            uploaded_file_local_path = os.path.join(upload_dir, safe_filename)
            uploaded_file.save(uploaded_file_local_path)

        # convert job input paths for uploaded files and add jobs
        results = []
        for job in jobs_list:
            if uploaded_file_local_path:
                job['input_path'] = uploaded_file_local_path
                output_dir = os.path.join(job_dir, secure_filename(str(job.get('name') or '')) or 'output')
                os.makedirs(output_dir, exist_ok=True)
                # Prefer the job name, then the requested output path, for the output file name.
                output_name = os.path.basename(str(job.get('name') or job.get('output_path') or 'output'))
                job['output_path'] = os.path.join(output_dir, output_name or 'output')
            # remove failed job dir for single file uploads only
            remove_job_dir = bool(len(jobs_list) == 1 and uploaded_file_local_path)
            add_result = add_job(job, remove_job_dir_on_failure=remove_job_dir, job_dir=job_dir)
            results.append(add_result)

        # return any errors from results list
        for response in results:
            # Remote submissions may return a non-dict payload, so check the type first.
            if isinstance(response, dict) and response.get('error', None):
                if len(results) == 1:
                    return results, response.get('code', 500)
                else:
                    return results, 400

        if request.args.get('redirect', False):
            return redirect(url_for('index'))
        else:
            return results
    except Exception as e:
        logger.exception(f"Unknown error adding job: {e}")
        return 'unknown error', 500


def add_job(job_params, remove_job_dir_on_failure=False, job_dir=None):
    """Create a job locally or forward it to a registered render client.

    :param job_params: dict describing the job; ``renderer``, ``input_path`` and ``output_path``
        are required, ``name``, ``owner``, ``priority``, ``args``, ``client`` and ``force_start``
        are optional.
    :param remove_job_dir_on_failure: delete ``job_dir`` when the job cannot be created.
    :param job_dir: directory created for this job's upload, if any; only used for cleanup.
    :return: the job's JSON on local success, the remote server's JSON payload (or ``"Job ok"``)
        on remote success, or ``{'error': message, 'code': http_status}`` on failure.
    """

    def remove_job_dir():
        if remove_job_dir_on_failure and job_dir and os.path.exists(job_dir):
            logger.debug(f"Removing job dir: {job_dir}")
            shutil.rmtree(job_dir)

    name = job_params.get("name", None)
    job_owner = job_params.get("owner", None)
    renderer = job_params.get("renderer", None)
    input_path = job_params.get("input_path", None)
    output_path = job_params.get("output_path", None)
    args = job_params.get('args', {})
    client = job_params.get('client', None) or RenderQueue.hostname
    force_start = job_params.get('force_start', False)
    custom_id = None

    try:
        priority = int(job_params.get('priority', 2))
    except (TypeError, ValueError):
        err_msg = f"Cannot add job: Invalid priority '{job_params.get('priority')}'"
        logger.error(err_msg)
        remove_job_dir()
        return {'error': err_msg, 'code': 400}

    # check for minimum render requirements
    if None in [renderer, input_path, output_path]:
        err_msg = 'Cannot add job: Missing required parameters'
        logger.error(err_msg)
        remove_job_dir()
        return {'error': err_msg, 'code': 400}

    # local renders
    if client == RenderQueue.hostname:
        logger.info(f"Creating job locally - {name if name else input_path}")
        try:
            render_job = ScheduledJob(renderer, input_path, output_path, args, priority, job_owner, client,
                                      notify=False, custom_id=custom_id, name=name)
            RenderQueue.add_to_render_queue(render_job, force_start=force_start)
            return render_job.json()
        except Exception as e:
            err_msg = f"Error creating job: {str(e)}"
            logger.exception(err_msg)
            remove_job_dir()
            return {'error': err_msg, 'code': 400}

    # client renders: `client` is a hostname string, so look up the matching client object
    remote_client = next((c for c in RenderQueue.render_clients() if c.hostname == client), None)
    if remote_client is None:
        err_msg = f"Unknown render client: '{client}'"
        logger.error(err_msg)
        remove_job_dir()
        return {'error': err_msg, 'code': 400}

    if not remote_client.is_available():
        # client is not available
        err_msg = f"Render client '{client}' is unreachable"
        logger.error(err_msg)
        remove_job_dir()
        return {'error': err_msg, 'code': 503}

    # call uploader on remote client
    try:
        logger.info(f"Uploading file {input_path} to client {client}")
        # Forward the job parameters themselves: request.json is unavailable for form submissions.
        response = post_job_to_server(input_path, job_params, remote_client.hostname)
        if response.ok:
            logger.info("Job submitted successfully!")
            try:
                payload = response.json()
            except ValueError:
                payload = None
            return payload if payload else "Job ok"
        else:
            remove_job_dir()
            return {'error': "Job rejected by client", 'code': 400}
    except requests.RequestException:
        err_msg = f"Error submitting job to client: {client}"
        logger.error(err_msg)
        remove_job_dir()
        return {'error': err_msg, 'code': 500}


@server.get('/api/job/<job_id>/cancel')
def cancel_job(job_id):
    """Cancel a job; requires the ``confirm`` query argument."""
    if not request.args.get('confirm', False):
        return 'Confirmation required to cancel job', 400

    if RenderQueue.cancel_job(RenderQueue.job_with_id(job_id)):
        if request.args.get('redirect', False):
            return redirect(url_for('index'))
        else:
            return "Job cancelled"
    else:
        return "Unknown error", 500


def _is_inside_dir(path, root):
    """Return True when ``path`` lies strictly below the directory ``root``."""
    try:
        path = os.path.abspath(path)
        root = os.path.abspath(root)
        return path != root and os.path.commonpath([path, root]) == root
    except ValueError:
        # commonpath raises when the two paths cannot be compared (e.g. different drives).
        return False


@server.route('/api/job/<job_id>/delete', methods=['POST', 'GET'])
def delete_job(job_id):
    """Delete a job, its thumbnails and any files the server stored for it in the upload folder.

    Requires the ``confirm`` query argument. Files outside the upload folder are never removed.
    """
    try:
        if not request.args.get('confirm', False):
            return 'Confirmation required to delete job', 400

        found_job = RenderQueue.job_with_id(job_id)
        upload_folder = server.config['UPLOAD_FOLDER']
        thumbs_folder = server.config['THUMBS_FOLDER']

        # Check if we can remove the 'output' directory. Require real containment: a substring
        # test would also match sibling folders or the upload folder itself.
        output_dir = os.path.dirname(os.path.dirname(found_job.output_path))
        if _is_inside_dir(output_dir, upload_folder) and os.path.exists(output_dir):
            shutil.rmtree(output_dir)

        # Remove any thumbnails (the folder only exists once a thumbnail has been requested)
        if os.path.isdir(thumbs_folder):
            for filename in os.listdir(thumbs_folder):
                if job_id in filename:
                    os.remove(os.path.join(thumbs_folder, filename))

        thumb_path = os.path.join(thumbs_folder, found_job.id + '.mp4')
        if os.path.exists(thumb_path):
            os.remove(thumb_path)

        # See if we own the input file (i.e. was it uploaded)
        input_dir = os.path.dirname(found_job.input_path)
        if _is_inside_dir(input_dir, upload_folder) and os.path.exists(input_dir):
            shutil.rmtree(input_dir)

        RenderQueue.delete_job(found_job)
        if request.args.get('redirect', False):
            return redirect(url_for('index'))
        else:
            return "Job deleted", 200
    except JobNotFoundError:
        # Let the registered error handler answer, as it does for every other job route.
        raise
    except Exception as e:
        logger.error(f"Error deleting job: {e}")
        return f"Error deleting job: {e}", 500


@server.get('/api/clear_history')
def clear_history():
    """Clear the render queue's history."""
    RenderQueue.clear_history()
    return 'success'


@server.route('/api/status')
def status():
    """Return the render queue's status."""
    return RenderQueue.status()


@server.get('/api/renderer_info')
def renderer_info():
    """Return availability, version, path and supported formats for every supported renderer."""
    renderer_data = {}
    for r in RenderWorkerFactory.supported_renderers():
        engine = RenderWorkerFactory.class_for_name(r).engine
        engine_path = engine.renderer_path()
        engine_available = engine_path is not None
        renderer_data[r] = {'available': engine_available,
                            'version': engine.version() if engine_available else None,
                            'supported_extensions': engine.supported_extensions,
                            'supported_export_formats': engine.get_output_formats() if engine_available else None,
                            'path': engine_path}
    return renderer_data


@server.route('/upload')
def upload_file_page():
    """Render the upload page."""
    return render_template('upload.html', render_clients=render_clients(),
                           supported_renderers=RenderWorkerFactory.supported_renderers())


def start_server(background_thread=False):
    """Load the configuration, start the queue evaluation thread and run the Flask server.

    :param background_thread: when True, Flask runs inside a separate thread that is joined
        immediately (so this call still blocks); otherwise Flask runs in the calling thread.
    """

    def eval_loop(delay_sec=1):
        while True:
            try:
                RenderQueue.evaluate_queue()
            except Exception as e:
                # Keep the loop alive: one failed evaluation must not stop the queue for good.
                logger.exception(f"Error evaluating render queue: {e}")
            time.sleep(delay_sec)

    with open('config/config.yaml') as f:
        # NOTE(review): FullLoader is acceptable only because this is a local, trusted config file.
        config = yaml.load(f, Loader=yaml.FullLoader)

    logging.basicConfig(format='%(asctime)s: %(levelname)s: %(module)s: %(message)s', datefmt='%d-%b-%y %H:%M:%S',
                        level=config.get('server_log_level', 'INFO').upper())

    server.config['UPLOAD_FOLDER'] = os.path.expanduser(config['upload_folder'])
    server.config['THUMBS_FOLDER'] = os.path.join(os.path.expanduser(config['upload_folder']), 'thumbs')
    server.config['MAX_CONTENT_PATH'] = config['max_content_path']

    # Get hostname and render clients
    RenderQueue.hostname = socket.gethostname()
    server.config['HOSTNAME'] = RenderQueue.hostname

    # disable most Flask logging
    flask_log = logging.getLogger('werkzeug')
    flask_log.setLevel(config.get('flask_log_level', 'ERROR').upper())

    # Set up the RenderQueue object
    RenderQueue.load_state()

    thread = threading.Thread(target=eval_loop, kwargs={'delay_sec': config.get('queue_eval_seconds', 1)},
                              daemon=True)
    thread.start()

    logging.info(f"Starting Zordon Render Server - Hostname: '{RenderQueue.hostname}'")

    if background_thread:
        server_thread = threading.Thread(
            target=lambda: server.run(host='0.0.0.0', port=RenderQueue.port, debug=False, use_reloader=False))
        server_thread.start()
        server_thread.join()
    else:
        server.run(host='0.0.0.0', port=RenderQueue.port, debug=config.get('flask_debug_enable', False),
                   use_reloader=False, threaded=True)