Files
Zordon/lib/job_server.py
2022-12-11 14:19:59 -08:00

427 lines
15 KiB
Python
Executable File

#!/usr/bin/env python3
import json
import logging
import os
import pathlib
import shutil
import json2html
from datetime import datetime
from zipfile import ZipFile
import requests
import yaml
from flask import Flask, request, render_template, send_file, after_this_request, Response, redirect, url_for, abort
from werkzeug.utils import secure_filename
from lib.render_job import RenderJob
from lib.render_queue import RenderQueue
from lib.server_helper import post_job_to_server
from utilities.render_worker import RenderWorkerFactory, string_to_status, RenderStatus
logger = logging.getLogger()
server = Flask(__name__, template_folder='../templates')
categories = [RenderStatus.RUNNING, RenderStatus.ERROR, RenderStatus.NOT_STARTED, RenderStatus.SCHEDULED,
RenderStatus.COMPLETED, RenderStatus.CANCELLED]
def sorted_jobs(all_jobs, sort_by_date=True):
if not sort_by_date:
sorted_job_list = []
if all_jobs:
for status_category in categories:
found_jobs = [x for x in all_jobs if x.render_status() == status_category.value]
if found_jobs:
sorted_found_jobs = sorted(found_jobs, key=lambda d: d.date_created, reverse=True)
sorted_job_list.extend(sorted_found_jobs)
else:
sorted_job_list = sorted(all_jobs, key=lambda d: d.date_created, reverse=True)
return sorted_job_list
@server.route('/')
@server.route('/index')
def index():
with open('utilities/presets.yaml') as f:
presets = yaml.load(f, Loader=yaml.FullLoader)
return render_template('index.html', all_jobs=sorted_jobs(RenderQueue.job_queue), hostname=RenderQueue.host_name,
renderer_info=renderer_info(), render_clients=RenderQueue.render_clients, preset_list=presets)
@server.route('/ui/job/<job_id>/full_details')
def job_detail(job_id):
found_job = RenderQueue.job_with_id(job_id)
if found_job:
table_html = json2html.json2html.convert(json=found_job.json(), table_attributes='class="table is-narrow is-striped"')
media_url = None
if found_job.file_list():
media_basename = os.path.basename(found_job.file_list()[0])
media_url = f"/api/job/{job_id}/file/{media_basename}"
return render_template('details.html', detail_table=table_html, media_url=media_url)
return f'Cannot find job with ID {job_id}', 400
# Get job file routing
@server.route('/api/job/<job_id>/file/<filename>', methods=['GET'])
def get_job_file(job_id, filename):
found_job = RenderQueue.job_with_id(job_id)
try:
for full_path in found_job.file_list():
if filename in full_path:
return send_file(path_or_file=full_path)
except FileNotFoundError:
abort(404)
@server.get('/api/jobs')
def jobs_json():
return [x.json() for x in RenderQueue.job_queue]
@server.get('/api/jobs/<status_val>')
def filtered_jobs_json(status_val):
state = string_to_status(status_val)
jobs = [x.json() for x in RenderQueue.jobs_with_status(state)]
if jobs:
return jobs
else:
return f'Cannot find jobs with status {status_val}', 400
@server.get('/api/job_status/<job_id>')
def get_job_status(job_id):
found_job = RenderQueue.job_with_id(job_id)
if found_job:
return found_job.json()
else:
return f'Cannot find job with ID {job_id}', 400
@server.get('/api/job/<job_id>/logs')
def get_job_logs(job_id):
found_job = RenderQueue.job_with_id(job_id)
if found_job:
log_path = found_job.worker.log_path
log_data = None
if log_path and os.path.exists(log_path):
with open(log_path) as file:
log_data = file.read()
return Response(log_data, mimetype='text/plain')
else:
return f'Cannot find job with ID {job_id}', 400
@server.get('/api/job/<job_id>/file_list')
def get_file_list(job_id):
found_job = RenderQueue.job_with_id(job_id)
if found_job:
return '\n'.join(found_job.file_list())
else:
return f'Cannot find job with ID {job_id}', 400
@server.route('/api/job/<job_id>/download_all')
def download_all(job_id):
zip_filename = None
@after_this_request
def clear_zip(response):
if zip_filename and os.path.exists(zip_filename):
os.remove(zip_filename)
return response
found_job = RenderQueue.job_with_id(job_id)
if found_job:
output_dir = os.path.dirname(found_job.worker.output_path)
if os.path.exists(output_dir):
zip_filename = os.path.join('/tmp', pathlib.Path(found_job.worker.input_path).stem + '.zip')
with ZipFile(zip_filename, 'w') as zipObj:
for f in os.listdir(output_dir):
zipObj.write(filename=os.path.join(output_dir, f),
arcname=os.path.basename(f))
return send_file(zip_filename, mimetype="zip", as_attachment=True, )
else:
return f'Cannot find project files for job {job_id}', 500
else:
return f'Cannot find job with ID {job_id}', 400
@server.post('/api/register_client')
def register_client():
client_hostname = request.values['hostname']
x = RenderQueue.register_client(client_hostname)
return "Success" if x else "Fail"
@server.post('/api/unregister_client')
def unregister_client():
client_hostname = request.values['hostname']
x = RenderQueue.unregister_client(client_hostname)
return "Success" if x else "Fail"
@server.get('/api/clients')
def render_clients():
return RenderQueue.render_clients
@server.get('/api/full_status')
def full_status():
full_results = {'timestamp': datetime.now().isoformat(), 'servers': {}}
try:
for client_hostname in RenderQueue.render_clients:
is_online = False
if client_hostname == RenderQueue.host_name:
snapshot_results = snapshot()
is_online = True
else:
snapshot_results = {}
try:
snapshot_request = requests.get(f'http://{client_hostname}:8080/snapshot', timeout=1)
snapshot_results = snapshot_request.json()
is_online = snapshot_request.ok
except requests.ConnectionError as e:
pass
server_data = {'status': snapshot_results.get('status', {}), 'jobs': snapshot_results.get('jobs', {}),
'is_online': is_online}
full_results['servers'][client_hostname] = server_data
except Exception as e:
logger.error(f"Exception fetching full status: {e}")
return full_results
@server.get('/api/snapshot')
def snapshot():
server_status = RenderQueue.status()
server_jobs = [x.json() for x in RenderQueue.job_queue]
server_data = {'status': server_status, 'jobs': server_jobs, 'timestamp': datetime.now().isoformat()}
return server_data
@server.post('/api/add_job')
def add_job_handler():
try:
"""Create new job and add to server render queue"""
if request.is_json:
jobs_list = [request.json]
elif request.form.get('json', None):
jobs_list = json.loads(request.form['json'])
else:
form_dict = dict(request.form)
args = {}
arg_keys = [k for k in form_dict.keys() if '-arg_' in k]
for key in arg_keys:
if form_dict['renderer'] in key:
cleaned_key = key.split('-arg_')[-1]
args[cleaned_key] = form_dict[key]
form_dict.pop(key)
args['raw'] = form_dict.get('raw_args', None)
form_dict['args'] = args
jobs_list = [form_dict]
# handle uploaded files
uploaded_file = request.files.get('file', None)
uploaded_file_local_path = None
job_dir = None
if uploaded_file and uploaded_file.filename:
logger.info(f"Receiving uploaded file {uploaded_file.filename}")
job_dir = os.path.join(server.config['UPLOAD_FOLDER'], (uploaded_file.filename + "_" +
datetime.now().strftime("%Y.%m.%d_%H.%M.%S")))
os.makedirs(job_dir, exist_ok=True)
uploaded_file_local_path = os.path.join(job_dir, secure_filename(uploaded_file.filename))
uploaded_file.save(uploaded_file_local_path)
# convert job input paths for uploaded files and add jobs
results = []
for job in jobs_list:
if uploaded_file_local_path:
job['input_path'] = uploaded_file_local_path
output_dir = os.path.join(job_dir, job.get('name', None) or 'output')
os.makedirs(output_dir, exist_ok=True)
job['output_path'] = os.path.join(output_dir, os.path.basename(job['output_path']))
remove_job_dir = len(jobs_list) == 1 and uploaded_file_local_path # remove failed job dir for single file uploads only
add_result = add_job(job, remove_job_dir_on_failure=remove_job_dir)
results.append(add_result)
# return any errors from results list
for response in results:
if response.get('error', None):
if len(results) == 1:
return results, response.get('code', 500)
else:
return results, 400
if request.args.get('redirect', False):
return redirect(url_for('index'))
else:
return results
except Exception as e:
logger.exception(f"Unknown error adding job: {e}")
return 'unknown error', 500
def add_job(job_params, remove_job_dir_on_failure=False):
def remove_job_dir():
if remove_job_dir_on_failure and job_dir and os.path.exists(job_dir):
logger.debug(f"Removing job dir: {job_dir}")
shutil.rmtree(job_dir)
name = job_params.get("name", None)
job_owner = job_params.get("owner", None)
renderer = job_params.get("renderer", None)
input_path = job_params.get("input_path", None)
output_path = job_params.get("output_path", None)
priority = int(job_params.get('priority', 2))
args = job_params.get('args', {})
client = job_params.get('client', RenderQueue.host_name)
force_start = job_params.get('force_start', False)
custom_id = None
job_dir = None
# check for minimum render requirements
if None in [renderer, input_path, output_path]:
err_msg = 'Cannot add job: Missing required parameters'
logger.error(err_msg)
return {'error': err_msg, 'code': 400}
# local renders
if client == RenderQueue.host_name:
logger.info(f"Creating job locally - {name if name else input_path}")
try:
render_job = RenderJob(renderer, input_path, output_path, args, priority, job_owner, client,
notify=False, custom_id=custom_id, name=name)
RenderQueue.add_to_render_queue(render_job, force_start=force_start)
return render_job.json()
except Exception as e:
err_msg = f"Error creating job: {str(e)}"
logger.exception(err_msg)
remove_job_dir()
return {'error': err_msg, 'code': 400}
# client renders
elif client in RenderQueue.render_clients:
# see if host is available
if RenderQueue.is_client_available(client):
# call uploader on remote client
try:
logger.info(f"Uploading file {input_path} to client {client}")
job_data = request.json
response = post_job_to_server(input_path, job_data, client)
if response.ok:
logger.info("Job submitted successfully!")
return response.json() if response.json() else "Job ok"
else:
remove_job_dir()
return {'error': "Job rejected by client", 'code': 400}
except requests.ConnectionError as e:
err_msg = f"Error submitting job to client: {client}"
logger.error(err_msg)
remove_job_dir()
return {'error': err_msg, 'code': 500}
else:
# client is not available
err_msg = f"Render client '{client}' is unreachable"
logger.error(err_msg)
remove_job_dir()
return {'error': err_msg, 'code': 503}
else:
err_msg = f"Unknown render client: '{client}'"
logger.error(err_msg)
remove_job_dir()
return {'error': err_msg, 'code': 400}
@server.get('/api/job/<job_id>/cancel')
def cancel_job(job_id):
found_job = RenderQueue.job_with_id(job_id)
if not found_job:
return f'Cannot find job with ID {job_id}', 400
elif not request.args.get('confirm', False):
return 'Confirmation required to cancel job', 400
if RenderQueue.cancel_job(found_job):
if request.args.get('redirect', False):
return redirect(url_for('index'))
else:
return "Job cancelled"
else:
return "Unknown error", 500
@server.route('/api/job/<job_id>/delete', methods=['POST', 'GET'])
def delete_job(job_id):
try:
found_job = RenderQueue.job_with_id(job_id)
if not found_job:
return f'Cannot find job with ID {job_id}', 400
elif not request.args.get('confirm', False):
return 'Confirmation required to delete job', 400
# First, remove all render files and logs
files_to_delete = found_job.file_list()
files_to_delete.append(found_job.log_path())
for d in files_to_delete:
if os.path.exists(d):
os.remove(d)
# Check if we can remove the 'output' directory
output_dir = os.path.dirname(files_to_delete[0])
if server.config['UPLOAD_FOLDER'] in output_dir and os.path.exists(output_dir):
shutil.rmtree(output_dir)
# See if we own the input file (i.e. was it uploaded)
input_dir = os.path.dirname(found_job.worker.input_path)
if server.config['UPLOAD_FOLDER'] in input_dir and os.path.exists(input_dir):
shutil.rmtree(input_dir)
RenderQueue.delete_job(found_job)
if request.args.get('redirect', False):
return redirect(url_for('index'))
else:
return "Job deleted", 200
except Exception as e:
return f"Unknown error: {e}", 500
@server.get('/api/clear_history')
def clear_history():
RenderQueue.clear_history()
return 'success'
@server.route('/api/status')
def status():
return RenderQueue.status()
@server.get('/api/renderer_info')
def renderer_info():
renderer_data = {}
for r in RenderWorkerFactory.supported_renderers():
renderer_class = RenderWorkerFactory.class_for_name(r)
renderer_data[r] = {'available': renderer_class.renderer_path() is not None,
'version': renderer_class.version(),
'supported_extensions': renderer_class.supported_extensions,
'supported_export_formats': renderer_class.supported_export_formats}
return renderer_data
@server.route('/upload')
def upload_file_page():
return render_template('upload.html', render_clients=RenderQueue.render_clients,
supported_renderers=RenderWorkerFactory.supported_renderers())