mirror of
https://github.com/blw1138/Zordon.git
synced 2025-12-17 16:58:12 +00:00
481 lines
18 KiB
Python
Executable File
481 lines
18 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
import json
|
|
import logging
|
|
import os
|
|
import pathlib
|
|
import shutil
|
|
import socket
|
|
import threading
|
|
import time
|
|
from datetime import datetime
|
|
from zipfile import ZipFile
|
|
|
|
import json2html
|
|
import requests
|
|
import yaml
|
|
from flask import Flask, request, render_template, send_file, after_this_request, Response, redirect, url_for, abort
|
|
from werkzeug.utils import secure_filename
|
|
|
|
from lib.scheduled_job import ScheduledJob
|
|
from lib.render_queue import RenderQueue, JobNotFoundError
|
|
from lib.render_workers.render_worker import RenderWorkerFactory, string_to_status, RenderStatus
|
|
from lib.utilities.server_helper import post_job_to_server, generate_thumbnail_for_job
|
|
|
|
# Root logger; its format and level are configured by start_server().
logger = logging.getLogger()

# Flask application object that every route in this module is registered on.
server = Flask(__name__, static_folder='static', template_folder='templates')

# Status buckets, in the order sorted_jobs() walks them when grouping by status.
categories = [
    RenderStatus.RUNNING,
    RenderStatus.ERROR,
    RenderStatus.NOT_STARTED,
    RenderStatus.SCHEDULED,
    RenderStatus.COMPLETED,
    RenderStatus.CANCELLED,
]
|
|
|
|
|
|
def sorted_jobs(all_jobs, sort_by_date=True):
    """Return the given jobs ordered for display.

    Args:
        all_jobs: Collection of job objects exposing ``date_created`` and
            ``render_status()``. ``None`` or an empty collection yields ``[]``.
        sort_by_date: When true (the default), order purely by creation date,
            newest first. When false, group the jobs by status in the order of
            the module-level ``categories`` list, newest first inside each
            group; jobs whose status is not in ``categories`` are omitted.

    Returns:
        A new list; the input collection is not modified.
    """
    if not all_jobs:
        # Covers None as well as empty collections (sorted(None) would raise).
        return []

    def newest_first(jobs):
        return sorted(jobs, key=lambda job: job.date_created, reverse=True)

    if sort_by_date:
        return newest_first(all_jobs)

    grouped = []
    for status_category in categories:
        # Other routes in this file compare render_status() with RenderStatus
        # members, so accept either the member itself or its raw value.
        matching = [job for job in all_jobs
                    if job.render_status() in (status_category, status_category.value)]
        grouped.extend(newest_first(matching))
    return grouped
|
|
|
|
|
|
@server.route('/')
@server.route('/index')
def index():
    """Render the main page with all jobs, renderer info, clients and presets."""
    with open('config/presets.yaml') as presets_file:
        preset_list = yaml.load(presets_file, Loader=yaml.FullLoader)

    # Built in the same order the template arguments were originally evaluated.
    template_context = {
        'all_jobs': sorted_jobs(RenderQueue.all_jobs()),
        'hostname': RenderQueue.hostname,
        'renderer_info': renderer_info(),
        'render_clients': render_clients(),
        'preset_list': preset_list,
    }
    return render_template('index.html', **template_context)
|
|
|
|
|
|
@server.route('/ui/job/<job_id>/full_details')
def job_detail(job_id):
    """Render the detail page for one job.

    The page receives the job's JSON converted to an HTML table and, when the
    job is completed and has output files, a URL to its first output file.
    """
    job = RenderQueue.job_with_id(job_id)
    detail_table = json2html.json2html.convert(
        json=job.json(),
        table_attributes='class="table is-narrow is-striped is-fullwidth"',
    )

    has_finished_output = job.file_list() and job.render_status() == RenderStatus.COMPLETED
    if has_finished_output:
        first_output_name = os.path.basename(job.file_list()[0])
        media_url = f"/api/job/{job_id}/file/{first_output_name}"
    else:
        media_url = None

    return render_template(
        'details.html',
        detail_table=detail_table,
        media_url=media_url,
        hostname=RenderQueue.hostname,
        job_status=job.render_status().value.title(),
        job=job,
        renderer_info=renderer_info(),
    )
|
|
|
|
|
|
@server.route('/ui/job/<job_id>/thumbnail')
def job_thumbnail(job_id):
    """Serve a preview for a job: video thumbnail, still image, or a spinner.

    Unknown jobs and jobs without any thumbnail yet get the spinner image.
    """
    job = RenderQueue.job_with_id(job_id, none_ok=True)
    if not job:
        return send_file('static/images/spinner.gif', mimetype="image/gif")

    thumbs_dir = server.config['THUMBS_FOLDER']
    os.makedirs(thumbs_dir, exist_ok=True)
    video_thumb = os.path.join(thumbs_dir, job.id + '.mp4')
    image_thumb = os.path.join(thumbs_dir, job.id + '.jpg')
    # NOTE(review): presumably the thumbnail generator creates this marker
    # while the video is still being produced -- confirm in server_helper.
    in_progress_marker = video_thumb + '_IN-PROGRESS'

    # Generate only when there is neither a video nor an in-progress marker.
    if not (os.path.exists(video_thumb) or os.path.exists(in_progress_marker)):
        generate_thumbnail_for_job(job, video_thumb, image_thumb, max_width=240)

    if os.path.exists(video_thumb) and not os.path.exists(in_progress_marker):
        return send_file(video_thumb, mimetype="video/mp4")
    if os.path.exists(image_thumb):
        return send_file(image_thumb, mimetype='image/jpeg')
    return send_file('static/images/spinner.gif', mimetype="image/gif")
|
|
|
|
|
|
# Get job file routing
@server.route('/api/job/<job_id>/file/<filename>', methods=['GET'])
def get_job_file(job_id, filename):
    """Send one of a job's output files to the client.

    Args:
        job_id: ID of the job that owns the file.
        filename: Name of the requested file. A file whose base name equals
            ``filename`` is preferred; the previous "path contains filename"
            match is kept as a fallback so existing URLs keep working.

    Returns:
        The file as a Flask response. Aborts with 404 when no file of the job
        matches or when a FileNotFoundError is raised while locating/sending it.
    """
    found_job = RenderQueue.job_with_id(job_id)
    try:
        job_files = list(found_job.file_list() or [])
        exact_matches = [path for path in job_files if os.path.basename(path) == filename]
        candidates = exact_matches or [path for path in job_files if filename in path]
        if candidates:
            return send_file(path_or_file=candidates[0])
    except FileNotFoundError:
        abort(404)
    # Without this the view would return None, which Flask reports as a 500.
    abort(404)
|
|
|
|
|
|
@server.get('/api/jobs')
def jobs_json():
    """Return the JSON representation of every job in the render queue."""
    serialized_jobs = []
    for job in RenderQueue.all_jobs():
        serialized_jobs.append(job.json())
    return serialized_jobs
|
|
|
|
|
|
@server.get('/api/jobs/<status_val>')
def filtered_jobs_json(status_val):
    """Return the JSON of all jobs with the given status, or a 400 if none exist."""
    wanted_status = string_to_status(status_val)
    matching = [job.json() for job in RenderQueue.jobs_with_status(wanted_status)]
    if not matching:
        return f'Cannot find jobs with status {status_val}', 400
    return matching
|
|
|
|
|
|
@server.errorhandler(JobNotFoundError)
def handle_job_not_found(job_error):
    """Turn a JobNotFoundError raised inside a route into a 400 text response."""
    message = f'Cannot find job with ID {job_error.job_id}'
    return message, 400
|
|
|
|
|
|
@server.get('/api/job/<job_id>')
def get_job_status(job_id):
    """Return the JSON representation of a single job."""
    job = RenderQueue.job_with_id(job_id)
    return job.json()
|
|
|
|
|
|
@server.get('/api/job/<job_id>/logs')
def get_job_logs(job_id):
    """Return a job's log file as plain text; the body is empty if no log exists."""
    job = RenderQueue.job_with_id(job_id)
    log_path = job.log_path()
    if not (log_path and os.path.exists(log_path)):
        return Response(None, mimetype='text/plain')

    with open(log_path) as log_file:
        contents = log_file.read()
    return Response(contents, mimetype='text/plain')
|
|
|
|
|
|
@server.get('/api/job/<job_id>/file_list')
def get_file_list(job_id):
    """Return the list of files produced by a job.

    The route previously returned the job object itself, which is not a value
    Flask can turn into a response; it now returns the job's ``file_list()``
    as a plain list so it is serialised like the other list-returning routes.
    """
    found_job = RenderQueue.job_with_id(job_id)
    return list(found_job.file_list() or [])
|
|
|
|
|
|
@server.route('/api/job/<job_id>/download_all')
def download_all(job_id):
    """Zip every entry of a job's output directory and send it as a download.

    The archive is named after the stem of the job's input file, written to
    the system temporary directory, and removed again once the request has
    been handled.

    Returns:
        The zip archive as an attachment, or a 500 text response when the
        job's output directory does not exist.
    """
    import tempfile  # local import: not part of this module's import block

    zip_filename = None

    @after_this_request
    def clear_zip(response):
        # Reads zip_filename when the request finishes, i.e. after it was set below.
        if zip_filename and os.path.exists(zip_filename):
            os.remove(zip_filename)
        return response

    found_job = RenderQueue.job_with_id(job_id)
    output_dir = os.path.dirname(found_job.output_path)
    if not os.path.exists(output_dir):
        return f'Cannot find project files for job {job_id}', 500

    # gettempdir() instead of a hard-coded '/tmp' keeps this working on
    # platforms without that directory.
    zip_filename = os.path.join(tempfile.gettempdir(), pathlib.Path(found_job.input_path).stem + '.zip')
    with ZipFile(zip_filename, 'w') as zip_obj:
        for entry in os.listdir(output_dir):
            zip_obj.write(filename=os.path.join(output_dir, entry), arcname=os.path.basename(entry))
    # "zip" alone is not a valid MIME type; application/zip is the registered one.
    return send_file(zip_filename, mimetype="application/zip", as_attachment=True)
|
|
|
|
|
|
@server.post('/api/register_client')
def register_client():
    """Register the host named by the ``hostname`` request value as a render client."""
    hostname = request.values['hostname']
    result = RenderQueue.register_client(hostname)
    return result
|
|
|
|
|
|
@server.post('/api/unregister_client')
def unregister_client():
    """Unregister the render client named by the ``hostname`` request value."""
    hostname = request.values['hostname']
    result = RenderQueue.unregister_client(hostname)
    return result
|
|
|
|
|
|
@server.get('/api/clients')
def render_clients():
    """Return the hostnames of all render clients known to the queue."""
    hostnames = []
    for render_client in RenderQueue.render_clients():
        hostnames.append(render_client.hostname)
    return hostnames
|
|
|
|
|
|
@server.get('/api/presets')
def presets():
    """Return the presets loaded from ``config/presets.yaml``."""
    with open('config/presets.yaml') as presets_file:
        loaded_presets = yaml.load(presets_file, Loader=yaml.FullLoader)
    return loaded_presets
|
|
|
|
|
|
@server.get('/api/full_status')
def full_status():
    """Collect a status snapshot from this server and every registered client.

    Returns:
        dict with a ``timestamp`` and a ``servers`` mapping of hostname to
        ``{'status': ..., 'jobs': ..., 'is_online': bool}``. A client that
        cannot be reached, times out, or answers with something other than a
        JSON object is reported with empty data and ``is_online=False``; it no
        longer stops the remaining clients from being queried.
    """
    full_results = {'timestamp': datetime.now().isoformat(), 'servers': {}}

    try:
        for client_hostname in render_clients():
            snapshot_results = {}
            is_online = False
            if client_hostname == RenderQueue.hostname:
                # This host: read the snapshot directly instead of over HTTP.
                snapshot_results = snapshot()
                is_online = True
            else:
                try:
                    # The snapshot route of this module is '/api/snapshot'.
                    # NOTE(review): the port is still the hard-coded 8080 -- confirm
                    # it matches the port remote servers are started on.
                    snapshot_request = requests.get(f'http://{client_hostname}:8080/api/snapshot', timeout=1)
                    payload = snapshot_request.json() if snapshot_request.ok else {}
                    if isinstance(payload, dict):
                        snapshot_results = payload
                    is_online = snapshot_request.ok
                except (requests.RequestException, ValueError) as e:
                    # Connection errors, timeouts and undecodable bodies only
                    # mark this one client as offline.
                    logger.debug(f"Could not fetch snapshot from {client_hostname}: {e}")
            server_data = {'status': snapshot_results.get('status', {}), 'jobs': snapshot_results.get('jobs', {}),
                           'is_online': is_online}
            full_results['servers'][client_hostname] = server_data
    except Exception as e:
        logger.error(f"Exception fetching full status: {e}")

    return full_results
|
|
|
|
|
|
@server.get('/api/snapshot')
def snapshot():
    """Return this server's queue status and the JSON of all its jobs, timestamped."""
    return {
        'status': RenderQueue.status(),
        'jobs': [job.json() for job in RenderQueue.all_jobs()],
        'timestamp': datetime.now().isoformat(),
    }
|
|
|
|
|
|
def _jobs_from_request():
    """Return the job dicts described by the current request as a list.

    Accepts a JSON body, a ``json`` form field, or flat form fields (where
    ``<prefix>-arg_<name>`` fields are folded into a nested ``args`` dict).
    """
    if request.is_json:
        payload = request.json
    elif request.form.get('json', None):
        payload = json.loads(request.form['json'])
    else:
        # Cleanup flat form data into nested structure
        form_dict = {k: v for k, v in dict(request.form).items() if v}
        renderer = form_dict.get('renderer')
        args = {}
        for key in [k for k in form_dict if '-arg_' in k]:
            value = form_dict.pop(key)
            # Keep only arguments addressed to the chosen renderer or to any renderer.
            if (renderer and renderer in key) or 'AnyRenderer' in key:
                args[key.split('-arg_')[-1]] = value
        args['raw'] = form_dict.get('raw_args', None)
        form_dict['args'] = args
        payload = form_dict
    # A single job may arrive as a bare object; several jobs arrive as a list.
    return payload if isinstance(payload, list) else [payload]


@server.post('/api/add_job')
def add_job_handler():
    """Create new job(s) and add them to the server render queue.

    An optional uploaded ``file`` is stored in a fresh, timestamped directory
    below the upload folder; every submitted job then uses it as its input and
    renders into a sub-directory of that directory.

    Returns:
        The list of per-job results (or a redirect to the index page when the
        ``redirect`` query argument is set). If any job failed, the list is
        returned with that job's error code (single job) or 400 (several
        jobs). Unexpected exceptions yield ``'unknown error'`` with 500.
    """
    try:
        jobs_list = _jobs_from_request()
        if not all(isinstance(job, dict) for job in jobs_list):
            return 'Invalid job data', 400

        # handle uploaded files
        uploaded_file = request.files.get('file', None)
        uploaded_file_local_path = None
        job_dir = None
        if uploaded_file and uploaded_file.filename:
            logger.info(f"Receiving uploaded file {uploaded_file.filename}")
            # The filename comes from the client: sanitise it before it becomes
            # part of a directory name so it cannot escape the upload folder.
            safe_name = secure_filename(uploaded_file.filename) or 'upload'
            job_dir = os.path.join(server.config['UPLOAD_FOLDER'],
                                   safe_name + "_" + datetime.now().strftime("%Y.%m.%d_%H.%M.%S"))
            os.makedirs(job_dir, exist_ok=True)
            uploaded_file_local_path = os.path.join(job_dir, safe_name)
            uploaded_file.save(uploaded_file_local_path)

        # remove failed job dir for single file uploads only
        remove_job_dir = bool(len(jobs_list) == 1 and uploaded_file_local_path)

        # convert job input paths for uploaded files and add jobs
        results = []
        for job in jobs_list:
            if uploaded_file_local_path:
                job['input_path'] = uploaded_file_local_path
                # The job name is client-supplied as well, so sanitise it too.
                output_dir = os.path.join(job_dir, secure_filename(str(job.get('name') or '')) or 'output')
                os.makedirs(output_dir, exist_ok=True)
                if job.get('output_path'):
                    job['output_path'] = os.path.join(output_dir, os.path.basename(job['output_path']))
                # A missing output_path is left for add_job to report as a 400.
            results.append(add_job(job, remove_job_dir_on_failure=remove_job_dir))

        # return any errors from results list
        for response in results:
            # add_job may hand back a non-dict value for remotely submitted jobs.
            if isinstance(response, dict) and response.get('error', None):
                if len(results) == 1:
                    return results, response.get('code', 500)
                else:
                    return results, 400

        if request.args.get('redirect', False):
            return redirect(url_for('index'))
        else:
            return results

    except Exception as e:
        logger.exception(f"Unknown error adding job: {e}")
        return 'unknown error', 500
|
|
|
|
|
|
def add_job(job_params, remove_job_dir_on_failure=False):
    """Create a render job on this host or forward it to a registered render client.

    Args:
        job_params: dict describing the job. ``renderer``, ``input_path`` and
            ``output_path`` are required; ``name``, ``owner``, ``priority``
            (default 2), ``args``, ``client`` (default: this host) and
            ``force_start`` are optional.
        remove_job_dir_on_failure: when true, the directory containing
            ``input_path`` is deleted if the job cannot be added. As a safety
            net this only happens for directories strictly inside the
            configured upload folder.

    Returns:
        On success: the created job's JSON (local job), or the client's JSON
        reply, or ``"Job ok"`` when that reply is empty (remote job).
        On failure: ``{'error': message, 'code': http_status}``.
    """
    name = job_params.get("name", None)
    job_owner = job_params.get("owner", None)
    renderer = job_params.get("renderer", None)
    input_path = job_params.get("input_path", None)
    output_path = job_params.get("output_path", None)
    args = job_params.get('args', {})
    client = job_params.get('client', None) or RenderQueue.hostname
    force_start = job_params.get('force_start', False)
    custom_id = None
    # add_job_handler stores an uploaded input file directly inside its job
    # directory, so that directory is the parent of input_path. (It used to be
    # hard-wired to None, which made the cleanup below a no-op.)
    job_dir = os.path.dirname(input_path) if isinstance(input_path, str) else None

    def remove_job_dir():
        if not (remove_job_dir_on_failure and job_dir and os.path.exists(job_dir)):
            return
        upload_folder = server.config.get('UPLOAD_FOLDER')
        if not upload_folder:
            return
        upload_root = os.path.abspath(upload_folder)
        target = os.path.abspath(job_dir)
        try:
            inside = os.path.commonpath([upload_root, target]) == upload_root
        except ValueError:  # e.g. paths on different drives
            inside = False
        if not inside or target == upload_root:
            logger.warning(f"Not removing job dir outside upload folder: {job_dir}")
            return
        logger.debug(f"Removing job dir: {job_dir}")
        shutil.rmtree(job_dir)

    # check for minimum render requirements
    if None in [renderer, input_path, output_path]:
        err_msg = 'Cannot add job: Missing required parameters'
        logger.error(err_msg)
        remove_job_dir()
        return {'error': err_msg, 'code': 400}

    try:
        priority = int(job_params.get('priority', 2))
    except (TypeError, ValueError):
        err_msg = f"Cannot add job: Invalid priority '{job_params.get('priority')}'"
        logger.error(err_msg)
        remove_job_dir()
        return {'error': err_msg, 'code': 400}

    # local renders
    if client == RenderQueue.hostname:
        logger.info(f"Creating job locally - {name if name else input_path}")
        try:
            render_job = ScheduledJob(renderer, input_path, output_path, args, priority, job_owner, client,
                                      notify=False, custom_id=custom_id, name=name)
            RenderQueue.add_to_render_queue(render_job, force_start=force_start)
            return render_job.json()
        except Exception as e:
            err_msg = f"Error creating job: {str(e)}"
            logger.exception(err_msg)
            remove_job_dir()
            return {'error': err_msg, 'code': 400}

    # client renders
    # `client` is a hostname string, while the queue holds client objects (they
    # expose .hostname and .is_available()), so look the object up first.
    render_client = next((c for c in RenderQueue.render_clients()
                          if c == client or getattr(c, 'hostname', None) == client), None)

    if render_client is None:
        err_msg = f"Unknown render client: '{client}'"
        logger.error(err_msg)
        remove_job_dir()
        return {'error': err_msg, 'code': 400}

    if not render_client.is_available():
        # client is not available
        err_msg = f"Render client '{client}' is unreachable"
        logger.error(err_msg)
        remove_job_dir()
        return {'error': err_msg, 'code': 503}

    # call uploader on remote client
    try:
        logger.info(f"Uploading file {input_path} to client {client}")
        # Forward the job's own parameters; reading request.json here failed
        # for jobs that were submitted as form data.
        response = post_job_to_server(input_path, job_params, render_client.hostname)
        if response.ok:
            logger.info("Job submitted successfully!")
            return response.json() or "Job ok"
        remove_job_dir()
        return {'error': "Job rejected by client", 'code': 400}
    except requests.ConnectionError:
        err_msg = f"Error submitting job to client: {client}"
        logger.error(err_msg)
        remove_job_dir()
        return {'error': err_msg, 'code': 500}
|
|
|
|
|
|
@server.get('/api/job/<job_id>/cancel')
def cancel_job(job_id):
    """Cancel a job; a truthy ``confirm`` query argument is required.

    Responds with a redirect to the index page when ``redirect`` is set,
    otherwise with a short text message.
    """
    if not request.args.get('confirm', False):
        return 'Confirmation required to cancel job', 400

    job = RenderQueue.job_with_id(job_id)
    if not RenderQueue.cancel_job(job):
        return "Unknown error", 500

    if request.args.get('redirect', False):
        return redirect(url_for('index'))
    return "Job cancelled"
|
|
|
|
|
|
@server.route('/api/job/<job_id>/delete', methods=['POST', 'GET'])
def delete_job(job_id):
    """Delete a job together with the files this server owns for it.

    Requires a truthy ``confirm`` query argument. Removes the job's output and
    input directories when they lie strictly inside the upload folder, removes
    the job's thumbnails, and finally removes the job from the render queue.

    Returns:
        A redirect to the index page when ``redirect`` is set, otherwise
        ``"Job deleted"``. An unknown job ID is handled by the application's
        JobNotFoundError handler; any other failure yields a 500 with the
        error text.
    """
    def owned_by_server(path):
        # True only for existing paths strictly below the upload folder. A
        # plain substring test would also accept the upload folder itself.
        upload_root = os.path.abspath(server.config['UPLOAD_FOLDER'])
        candidate = os.path.abspath(path)
        try:
            inside = os.path.commonpath([upload_root, candidate]) == upload_root
        except ValueError:  # e.g. paths on different drives
            inside = False
        return inside and candidate != upload_root and os.path.exists(candidate)

    try:
        if not request.args.get('confirm', False):
            return 'Confirmation required to delete job', 400

        # Check if we can remove the 'output' directory
        found_job = RenderQueue.job_with_id(job_id)
        output_dir = os.path.dirname(os.path.dirname(found_job.output_path))
        if owned_by_server(output_dir):
            shutil.rmtree(output_dir)

        # Remove any thumbnails. The thumbs folder is only created when a
        # thumbnail is first requested, so it may not exist yet.
        thumbs_folder = server.config['THUMBS_FOLDER']
        if os.path.isdir(thumbs_folder):
            for filename in os.listdir(thumbs_folder):
                if job_id in filename:
                    os.remove(os.path.join(thumbs_folder, filename))

        thumb_path = os.path.join(thumbs_folder, found_job.id + '.mp4')
        if os.path.exists(thumb_path):
            os.remove(thumb_path)

        # See if we own the input file (i.e. was it uploaded)
        input_dir = os.path.dirname(found_job.input_path)
        if owned_by_server(input_dir):
            shutil.rmtree(input_dir)

        RenderQueue.delete_job(found_job)
        if request.args.get('redirect', False):
            return redirect(url_for('index'))
        else:
            return "Job deleted", 200

    except JobNotFoundError:
        # Let the app-wide handler answer, as it does for the other job routes.
        raise
    except Exception as e:
        logger.exception(f"Error deleting job: {e}")
        return f"Error deleting job: {e}", 500
|
|
|
|
|
|
@server.get('/api/clear_history')
def clear_history():
    """Ask the render queue to clear its history and report success."""
    RenderQueue.clear_history()
    outcome = 'success'
    return outcome
|
|
|
|
|
|
@server.route('/api/status')
def status():
    """Return the render queue's status."""
    queue_status = RenderQueue.status()
    return queue_status
|
|
|
|
|
|
@server.get('/api/renderer_info')
def renderer_info():
    """Describe every supported renderer.

    Returns a dict keyed by renderer name; each entry reports availability,
    version, supported extensions, export formats and the renderer path.
    Version and export formats are None for unavailable renderers.
    """
    def describe(renderer_name):
        engine = RenderWorkerFactory.class_for_name(renderer_name).engine
        # A renderer counts as available when its engine reports a path.
        is_available = engine.renderer_path() is not None
        version = engine.version() if is_available else None
        extensions = engine.supported_extensions
        export_formats = engine.get_output_formats() if is_available else None
        return {
            'available': is_available,
            'version': version,
            'supported_extensions': extensions,
            'supported_export_formats': export_formats,
            'path': engine.renderer_path(),
        }

    return {name: describe(name) for name in RenderWorkerFactory.supported_renderers()}
|
|
|
|
|
|
@server.route('/upload')
def upload_file_page():
    """Render the upload form with the known clients and supported renderers."""
    clients = render_clients()
    renderers = RenderWorkerFactory.supported_renderers()
    return render_template('upload.html', render_clients=clients, supported_renderers=renderers)
|
|
|
|
|
|
def start_server(background_thread=False):
    """Configure and run the Zordon render server; blocks while the server runs.

    Reads ``config/config.yaml``, configures logging and the Flask app, loads
    the render queue state, starts a daemon thread that periodically evaluates
    the queue, and then runs the Flask server.

    Args:
        background_thread: when true, Flask runs on a separate thread (debug
            disabled) which is then joined; otherwise Flask runs on the
            calling thread with the debug setting taken from the config.
    """
    def eval_loop(delay_sec=1):
        while True:
            try:
                RenderQueue.evaluate_queue()
            except Exception:
                # An uncaught error would end this thread, and with it all
                # further queue evaluation, without any visible sign.
                logger.exception("Error evaluating render queue")
            time.sleep(delay_sec)

    with open('config/config.yaml') as f:
        config = yaml.load(f, Loader=yaml.FullLoader)

    logging.basicConfig(format='%(asctime)s: %(levelname)s: %(module)s: %(message)s', datefmt='%d-%b-%y %H:%M:%S',
                        level=config.get('server_log_level', 'INFO').upper())

    upload_folder = os.path.expanduser(config['upload_folder'])
    server.config['UPLOAD_FOLDER'] = upload_folder
    server.config['THUMBS_FOLDER'] = os.path.join(upload_folder, 'thumbs')
    server.config['MAX_CONTENT_PATH'] = config['max_content_path']

    # Get hostname and render clients
    RenderQueue.hostname = socket.gethostname()
    server.config['HOSTNAME'] = RenderQueue.hostname

    # disable most Flask logging
    flask_log = logging.getLogger('werkzeug')
    flask_log.setLevel(config.get('flask_log_level', 'ERROR').upper())

    # Set up the RenderQueue object
    RenderQueue.load_state()

    thread = threading.Thread(target=eval_loop, kwargs={'delay_sec': config.get('queue_eval_seconds', 1)}, daemon=True)
    thread.start()

    logging.info(f"Starting Zordon Render Server - Hostname: '{RenderQueue.hostname}'")

    if background_thread:
        server_thread = threading.Thread(
            target=lambda: server.run(host='0.0.0.0', port=RenderQueue.port, debug=False, use_reloader=False))
        server_thread.start()
        server_thread.join()
    else:
        server.run(host='0.0.0.0', port=RenderQueue.port, debug=config.get('flask_debug_enable', False),
                   use_reloader=False, threaded=True)