mirror of
https://github.com/blw1138/Zordon.git
synced 2025-12-17 16:58:12 +00:00
Major file reorganization
This commit is contained in:
485
lib/server/job_server.py
Executable file
485
lib/server/job_server.py
Executable file
@@ -0,0 +1,485 @@
|
||||
#!/usr/bin/env python3
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import pathlib
|
||||
import shutil
|
||||
import socket
|
||||
import threading
|
||||
import time
|
||||
from datetime import datetime
|
||||
from zipfile import ZipFile
|
||||
|
||||
import json2html
|
||||
import requests
|
||||
import yaml
|
||||
from flask import Flask, request, render_template, send_file, after_this_request, Response, redirect, url_for, abort
|
||||
from werkzeug.utils import secure_filename
|
||||
|
||||
from lib.render_job import RenderJob
|
||||
from lib.render_queue import RenderQueue, JobNotFoundError
|
||||
from lib.render_workers.render_worker import RenderWorkerFactory, string_to_status, RenderStatus
|
||||
from lib.utilities.server_helper import post_job_to_server, generate_thumbnail_for_job
|
||||
|
||||
logger = logging.getLogger()
|
||||
server = Flask(__name__, template_folder='templates', static_folder='static')
|
||||
|
||||
categories = [RenderStatus.RUNNING, RenderStatus.ERROR, RenderStatus.NOT_STARTED, RenderStatus.SCHEDULED,
|
||||
RenderStatus.COMPLETED, RenderStatus.CANCELLED]
|
||||
|
||||
|
||||
def sorted_jobs(all_jobs, sort_by_date=True):
|
||||
if not sort_by_date:
|
||||
sorted_job_list = []
|
||||
if all_jobs:
|
||||
for status_category in categories:
|
||||
found_jobs = [x for x in all_jobs if x.render_status() == status_category.value]
|
||||
if found_jobs:
|
||||
sorted_found_jobs = sorted(found_jobs, key=lambda d: d.date_created, reverse=True)
|
||||
sorted_job_list.extend(sorted_found_jobs)
|
||||
else:
|
||||
sorted_job_list = sorted(all_jobs, key=lambda d: d.date_created, reverse=True)
|
||||
return sorted_job_list
|
||||
|
||||
|
||||
@server.route('/')
|
||||
@server.route('/index')
|
||||
def index():
|
||||
|
||||
with open('config/presets.yaml') as f:
|
||||
presets = yaml.load(f, Loader=yaml.FullLoader)
|
||||
|
||||
return render_template('index.html', all_jobs=sorted_jobs(RenderQueue.job_queue),
|
||||
hostname=RenderQueue.host_name, renderer_info=renderer_info(),
|
||||
render_clients=RenderQueue.render_clients, preset_list=presets)
|
||||
|
||||
|
||||
@server.route('/ui/job/<job_id>/full_details')
|
||||
def job_detail(job_id):
|
||||
found_job = RenderQueue.job_with_id(job_id)
|
||||
table_html = json2html.json2html.convert(json=found_job.json(),
|
||||
table_attributes='class="table is-narrow is-striped is-fullwidth"')
|
||||
media_url = None
|
||||
if found_job.file_list() and found_job.render_status() == RenderStatus.COMPLETED:
|
||||
media_basename = os.path.basename(found_job.file_list()[0])
|
||||
media_url = f"/api/job/{job_id}/file/{media_basename}"
|
||||
return render_template('details.html', detail_table=table_html, media_url=media_url,
|
||||
hostname=RenderQueue.host_name, job_status=found_job.render_status().value.title(),
|
||||
job=found_job, renderer_info=renderer_info())
|
||||
|
||||
|
||||
@server.route('/ui/job/<job_id>/thumbnail')
|
||||
def job_thumbnail(job_id):
|
||||
found_job = RenderQueue.job_with_id(job_id, none_ok=True)
|
||||
if found_job:
|
||||
|
||||
os.makedirs(server.config['THUMBS_FOLDER'], exist_ok=True)
|
||||
thumb_video_path = os.path.join(server.config['THUMBS_FOLDER'], found_job.id + '.mp4')
|
||||
thumb_image_path = os.path.join(server.config['THUMBS_FOLDER'], found_job.id + '.jpg')
|
||||
|
||||
if not os.path.exists(thumb_video_path) and not os.path.exists(thumb_video_path + '_IN-PROGRESS'):
|
||||
generate_thumbnail_for_job(found_job, thumb_video_path, thumb_image_path, max_width=240)
|
||||
|
||||
if os.path.exists(thumb_video_path) and not os.path.exists(thumb_video_path + '_IN-PROGRESS'):
|
||||
return send_file(thumb_video_path, mimetype="video/mp4")
|
||||
elif os.path.exists(thumb_image_path):
|
||||
return send_file(thumb_image_path, mimetype='image/jpeg')
|
||||
return send_file('static/images/spinner.gif', mimetype="image/gif")
|
||||
|
||||
|
||||
# Get job file routing
|
||||
@server.route('/api/job/<job_id>/file/<filename>', methods=['GET'])
|
||||
def get_job_file(job_id, filename):
|
||||
found_job = RenderQueue.job_with_id(job_id)
|
||||
try:
|
||||
for full_path in found_job.file_list():
|
||||
if filename in full_path:
|
||||
return send_file(path_or_file=full_path)
|
||||
except FileNotFoundError:
|
||||
abort(404)
|
||||
|
||||
|
||||
@server.get('/api/jobs')
|
||||
def jobs_json():
|
||||
return [x.json() for x in RenderQueue.job_queue]
|
||||
|
||||
|
||||
@server.get('/api/jobs/<status_val>')
|
||||
def filtered_jobs_json(status_val):
|
||||
state = string_to_status(status_val)
|
||||
jobs = [x.json() for x in RenderQueue.jobs_with_status(state)]
|
||||
if jobs:
|
||||
return jobs
|
||||
else:
|
||||
return f'Cannot find jobs with status {status_val}', 400
|
||||
|
||||
|
||||
@server.errorhandler(JobNotFoundError)
|
||||
def handle_job_not_found(job_error):
|
||||
return f'Cannot find job with ID {job_error.job_id}', 400
|
||||
|
||||
|
||||
@server.get('/api/job/<job_id>')
|
||||
def get_job_status(job_id):
|
||||
return RenderQueue.job_with_id(job_id).json()
|
||||
|
||||
|
||||
@server.get('/api/job/<job_id>/logs')
|
||||
def get_job_logs(job_id):
|
||||
found_job = RenderQueue.job_with_id(job_id)
|
||||
log_path = found_job.worker.log_path
|
||||
log_data = None
|
||||
if log_path and os.path.exists(log_path):
|
||||
with open(log_path) as file:
|
||||
log_data = file.read()
|
||||
return Response(log_data, mimetype='text/plain')
|
||||
|
||||
|
||||
@server.get('/api/job/<job_id>/file_list')
|
||||
def get_file_list(job_id):
|
||||
return RenderQueue.job_with_id(job_id)
|
||||
|
||||
|
||||
@server.route('/api/job/<job_id>/download_all')
|
||||
def download_all(job_id):
|
||||
zip_filename = None
|
||||
|
||||
@after_this_request
|
||||
def clear_zip(response):
|
||||
if zip_filename and os.path.exists(zip_filename):
|
||||
os.remove(zip_filename)
|
||||
return response
|
||||
|
||||
found_job = RenderQueue.job_with_id(job_id)
|
||||
output_dir = os.path.dirname(found_job.worker.output_path)
|
||||
if os.path.exists(output_dir):
|
||||
zip_filename = os.path.join('/tmp', pathlib.Path(found_job.worker.input_path).stem + '.zip')
|
||||
with ZipFile(zip_filename, 'w') as zipObj:
|
||||
for f in os.listdir(output_dir):
|
||||
zipObj.write(filename=os.path.join(output_dir, f),
|
||||
arcname=os.path.basename(f))
|
||||
return send_file(zip_filename, mimetype="zip", as_attachment=True, )
|
||||
else:
|
||||
return f'Cannot find project files for job {job_id}', 500
|
||||
|
||||
|
||||
@server.post('/api/register_client')
|
||||
def register_client():
|
||||
client_hostname = request.values['hostname']
|
||||
x = RenderQueue.register_client(client_hostname)
|
||||
return "Success" if x else "Fail"
|
||||
|
||||
|
||||
@server.post('/api/unregister_client')
|
||||
def unregister_client():
|
||||
client_hostname = request.values['hostname']
|
||||
x = RenderQueue.unregister_client(client_hostname)
|
||||
return "Success" if x else "Fail"
|
||||
|
||||
|
||||
@server.get('/api/clients')
|
||||
def render_clients():
|
||||
return RenderQueue.render_clients
|
||||
|
||||
|
||||
@server.get('/api/full_status')
|
||||
def full_status():
|
||||
full_results = {'timestamp': datetime.now().isoformat(), 'servers': {}}
|
||||
|
||||
try:
|
||||
for client_hostname in RenderQueue.render_clients:
|
||||
is_online = False
|
||||
if client_hostname == RenderQueue.host_name:
|
||||
snapshot_results = snapshot()
|
||||
is_online = True
|
||||
else:
|
||||
snapshot_results = {}
|
||||
try:
|
||||
snapshot_request = requests.get(f'http://{client_hostname}:8080/snapshot', timeout=1)
|
||||
snapshot_results = snapshot_request.json()
|
||||
is_online = snapshot_request.ok
|
||||
except requests.ConnectionError as e:
|
||||
pass
|
||||
server_data = {'status': snapshot_results.get('status', {}), 'jobs': snapshot_results.get('jobs', {}),
|
||||
'is_online': is_online}
|
||||
full_results['servers'][client_hostname] = server_data
|
||||
except Exception as e:
|
||||
logger.error(f"Exception fetching full status: {e}")
|
||||
|
||||
return full_results
|
||||
|
||||
|
||||
@server.get('/api/snapshot')
|
||||
def snapshot():
|
||||
server_status = RenderQueue.status()
|
||||
server_jobs = [x.json() for x in RenderQueue.job_queue]
|
||||
server_data = {'status': server_status, 'jobs': server_jobs, 'timestamp': datetime.now().isoformat()}
|
||||
return server_data
|
||||
|
||||
|
||||
@server.post('/api/add_job')
|
||||
def add_job_handler():
|
||||
try:
|
||||
"""Create new job and add to server render queue"""
|
||||
if request.is_json:
|
||||
jobs_list = [request.json]
|
||||
elif request.form.get('json', None):
|
||||
jobs_list = json.loads(request.form['json'])
|
||||
else:
|
||||
# Cleanup flat form data into nested structure
|
||||
form_dict = {k: v for k, v in dict(request.form).items() if v}
|
||||
args = {}
|
||||
arg_keys = [k for k in form_dict.keys() if '-arg_' in k]
|
||||
for key in arg_keys:
|
||||
if form_dict['renderer'] in key or 'AnyRenderer' in key:
|
||||
cleaned_key = key.split('-arg_')[-1]
|
||||
args[cleaned_key] = form_dict[key]
|
||||
form_dict.pop(key)
|
||||
args['raw'] = form_dict.get('raw_args', None)
|
||||
form_dict['args'] = args
|
||||
jobs_list = [form_dict]
|
||||
|
||||
# handle uploaded files
|
||||
uploaded_file = request.files.get('file', None)
|
||||
uploaded_file_local_path = None
|
||||
job_dir = None
|
||||
if uploaded_file and uploaded_file.filename:
|
||||
logger.info(f"Receiving uploaded file {uploaded_file.filename}")
|
||||
job_dir = os.path.join(server.config['UPLOAD_FOLDER'], (uploaded_file.filename + "_" +
|
||||
datetime.now().strftime("%Y.%m.%d_%H.%M.%S")))
|
||||
os.makedirs(job_dir, exist_ok=True)
|
||||
uploaded_file_local_path = os.path.join(job_dir, secure_filename(uploaded_file.filename))
|
||||
uploaded_file.save(uploaded_file_local_path)
|
||||
|
||||
# convert job input paths for uploaded files and add jobs
|
||||
results = []
|
||||
for job in jobs_list:
|
||||
if uploaded_file_local_path:
|
||||
job['input_path'] = uploaded_file_local_path
|
||||
output_dir = os.path.join(job_dir, job.get('name', None) or 'output')
|
||||
os.makedirs(output_dir, exist_ok=True)
|
||||
job['output_path'] = os.path.join(output_dir, os.path.basename(job['output_path']))
|
||||
remove_job_dir = len(jobs_list) == 1 and uploaded_file_local_path # remove failed job dir for single file uploads only
|
||||
add_result = add_job(job, remove_job_dir_on_failure=remove_job_dir)
|
||||
results.append(add_result)
|
||||
|
||||
# return any errors from results list
|
||||
for response in results:
|
||||
if response.get('error', None):
|
||||
if len(results) == 1:
|
||||
return results, response.get('code', 500)
|
||||
else:
|
||||
return results, 400
|
||||
|
||||
if request.args.get('redirect', False):
|
||||
return redirect(url_for('index'))
|
||||
else:
|
||||
return results
|
||||
|
||||
except Exception as e:
|
||||
logger.exception(f"Unknown error adding job: {e}")
|
||||
return 'unknown error', 500
|
||||
|
||||
|
||||
def add_job(job_params, remove_job_dir_on_failure=False):
|
||||
def remove_job_dir():
|
||||
if remove_job_dir_on_failure and job_dir and os.path.exists(job_dir):
|
||||
logger.debug(f"Removing job dir: {job_dir}")
|
||||
shutil.rmtree(job_dir)
|
||||
|
||||
name = job_params.get("name", None)
|
||||
job_owner = job_params.get("owner", None)
|
||||
renderer = job_params.get("renderer", None)
|
||||
input_path = job_params.get("input_path", None)
|
||||
output_path = job_params.get("output_path", None)
|
||||
priority = int(job_params.get('priority', 2))
|
||||
args = job_params.get('args', {})
|
||||
client = job_params.get('client', None) or RenderQueue.host_name
|
||||
force_start = job_params.get('force_start', False)
|
||||
custom_id = None
|
||||
job_dir = None
|
||||
|
||||
# check for minimum render requirements
|
||||
if None in [renderer, input_path, output_path]:
|
||||
err_msg = 'Cannot add job: Missing required parameters'
|
||||
logger.error(err_msg)
|
||||
return {'error': err_msg, 'code': 400}
|
||||
|
||||
# local renders
|
||||
if client == RenderQueue.host_name:
|
||||
logger.info(f"Creating job locally - {name if name else input_path}")
|
||||
try:
|
||||
render_job = RenderJob(renderer, input_path, output_path, args, priority, job_owner, client,
|
||||
notify=False, custom_id=custom_id, name=name)
|
||||
RenderQueue.add_to_render_queue(render_job, force_start=force_start)
|
||||
return render_job.json()
|
||||
except Exception as e:
|
||||
err_msg = f"Error creating job: {str(e)}"
|
||||
logger.exception(err_msg)
|
||||
remove_job_dir()
|
||||
return {'error': err_msg, 'code': 400}
|
||||
|
||||
# client renders
|
||||
elif client in RenderQueue.render_clients:
|
||||
|
||||
# see if host is available
|
||||
if RenderQueue.is_client_available(client):
|
||||
|
||||
# call uploader on remote client
|
||||
try:
|
||||
logger.info(f"Uploading file {input_path} to client {client}")
|
||||
job_data = request.json
|
||||
response = post_job_to_server(input_path, job_data, client)
|
||||
if response.ok:
|
||||
logger.info("Job submitted successfully!")
|
||||
return response.json() if response.json() else "Job ok"
|
||||
else:
|
||||
remove_job_dir()
|
||||
return {'error': "Job rejected by client", 'code': 400}
|
||||
except requests.ConnectionError as e:
|
||||
err_msg = f"Error submitting job to client: {client}"
|
||||
logger.error(err_msg)
|
||||
remove_job_dir()
|
||||
return {'error': err_msg, 'code': 500}
|
||||
else:
|
||||
# client is not available
|
||||
err_msg = f"Render client '{client}' is unreachable"
|
||||
logger.error(err_msg)
|
||||
remove_job_dir()
|
||||
return {'error': err_msg, 'code': 503}
|
||||
|
||||
else:
|
||||
err_msg = f"Unknown render client: '{client}'"
|
||||
logger.error(err_msg)
|
||||
remove_job_dir()
|
||||
return {'error': err_msg, 'code': 400}
|
||||
|
||||
|
||||
@server.get('/api/job/<job_id>/cancel')
|
||||
def cancel_job(job_id):
|
||||
if not request.args.get('confirm', False):
|
||||
return 'Confirmation required to cancel job', 400
|
||||
|
||||
if RenderQueue.cancel_job(RenderQueue.job_with_id(job_id)):
|
||||
if request.args.get('redirect', False):
|
||||
return redirect(url_for('index'))
|
||||
else:
|
||||
return "Job cancelled"
|
||||
else:
|
||||
return "Unknown error", 500
|
||||
|
||||
|
||||
@server.route('/api/job/<job_id>/delete', methods=['POST', 'GET'])
|
||||
def delete_job(job_id):
|
||||
try:
|
||||
if not request.args.get('confirm', False):
|
||||
return 'Confirmation required to delete job', 400
|
||||
|
||||
# First, remove all render files and logs
|
||||
found_job = RenderQueue.job_with_id(job_id)
|
||||
files_to_delete = found_job.file_list()
|
||||
files_to_delete.append(found_job.log_path())
|
||||
for d in files_to_delete:
|
||||
if os.path.exists(d):
|
||||
os.remove(d)
|
||||
|
||||
# Check if we can remove the 'output' directory
|
||||
output_dir = os.path.dirname(files_to_delete[0])
|
||||
if server.config['UPLOAD_FOLDER'] in output_dir and os.path.exists(output_dir):
|
||||
shutil.rmtree(output_dir)
|
||||
|
||||
# Remove any thumbnails
|
||||
for filename in os.listdir(server.config['THUMBS_FOLDER']):
|
||||
if job_id in filename:
|
||||
os.remove(os.path.join(server.config['THUMBS_FOLDER'], filename))
|
||||
|
||||
thumb_path = os.path.join(server.config['THUMBS_FOLDER'], found_job.id + '.mp4')
|
||||
if os.path.exists(thumb_path):
|
||||
os.remove(thumb_path)
|
||||
|
||||
# See if we own the input file (i.e. was it uploaded)
|
||||
input_dir = os.path.dirname(found_job.worker.input_path)
|
||||
if server.config['UPLOAD_FOLDER'] in input_dir and os.path.exists(input_dir):
|
||||
shutil.rmtree(input_dir)
|
||||
|
||||
RenderQueue.delete_job(found_job)
|
||||
if request.args.get('redirect', False):
|
||||
return redirect(url_for('index'))
|
||||
else:
|
||||
return "Job deleted", 200
|
||||
|
||||
except Exception as e:
|
||||
return f"Unknown error: {e}", 500
|
||||
|
||||
|
||||
@server.get('/api/clear_history')
|
||||
def clear_history():
|
||||
RenderQueue.clear_history()
|
||||
return 'success'
|
||||
|
||||
|
||||
@server.route('/api/status')
|
||||
def status():
|
||||
return RenderQueue.status()
|
||||
|
||||
|
||||
@server.get('/api/renderer_info')
|
||||
def renderer_info():
|
||||
renderer_data = {}
|
||||
for r in RenderWorkerFactory.supported_renderers():
|
||||
renderer_class = RenderWorkerFactory.class_for_name(r)
|
||||
renderer_data[r] = {'available': renderer_class.renderer_path() is not None,
|
||||
'version': renderer_class.version(),
|
||||
'supported_extensions': renderer_class.supported_extensions,
|
||||
'supported_export_formats': renderer_class.supported_export_formats}
|
||||
return renderer_data
|
||||
|
||||
|
||||
@server.route('/upload')
|
||||
def upload_file_page():
|
||||
return render_template('upload.html', render_clients=RenderQueue.render_clients,
|
||||
supported_renderers=RenderWorkerFactory.supported_renderers())
|
||||
|
||||
|
||||
def start_server(background_thread=False):
|
||||
def eval_loop(delay_sec=1):
|
||||
while True:
|
||||
RenderQueue.evaluate_queue()
|
||||
time.sleep(delay_sec)
|
||||
|
||||
with open('config/config.yaml') as f:
|
||||
config = yaml.load(f, Loader=yaml.FullLoader)
|
||||
|
||||
logging.basicConfig(format='%(asctime)s: %(levelname)s: %(module)s: %(message)s', datefmt='%d-%b-%y %H:%M:%S',
|
||||
level=config.get('server_log_level', 'INFO').upper())
|
||||
|
||||
server.config['UPLOAD_FOLDER'] = os.path.expanduser(config['upload_folder'])
|
||||
server.config['THUMBS_FOLDER'] = os.path.join(os.path.expanduser(config['upload_folder']), 'thumbs')
|
||||
server.config['MAX_CONTENT_PATH'] = config['max_content_path']
|
||||
|
||||
# Get hostname and render clients
|
||||
RenderQueue.host_name = socket.gethostname()
|
||||
server.config['HOSTNAME'] = RenderQueue.host_name
|
||||
if not RenderQueue.render_clients:
|
||||
RenderQueue.render_clients = [RenderQueue.host_name]
|
||||
|
||||
# disable most Flask logging
|
||||
flask_log = logging.getLogger('werkzeug')
|
||||
flask_log.setLevel(config.get('flask_log_level', 'ERROR').upper())
|
||||
|
||||
# Set up the RenderQueue object
|
||||
RenderQueue.load_state()
|
||||
|
||||
thread = threading.Thread(target=eval_loop, kwargs={'delay_sec': config.get('queue_eval_seconds', 1)}, daemon=True)
|
||||
thread.start()
|
||||
|
||||
logging.info(f"Starting Zordon Render Server - Hostname: '{RenderQueue.host_name}'")
|
||||
|
||||
if background_thread:
|
||||
server_thread = threading.Thread(
|
||||
target=lambda: server.run(host='0.0.0.0', port=RenderQueue.port, debug=False, use_reloader=False))
|
||||
server_thread.start()
|
||||
server_thread.join()
|
||||
else:
|
||||
server.run(host='0.0.0.0', port=RenderQueue.port, debug=config.get('flask_debug_enable', False),
|
||||
use_reloader=False, threaded=True)
|
||||
Reference in New Issue
Block a user