Files
Zordon/lib/job_server.py
2022-12-07 14:39:50 -08:00

335 lines
12 KiB
Python
Executable File

#!/usr/bin/env python3
import json
import logging
import os
import pathlib
import shutil
from datetime import datetime
from zipfile import ZipFile
import requests
from flask import Flask, request, render_template, send_file, after_this_request, Response
from werkzeug.utils import secure_filename
from lib.render_job import RenderJob
from lib.render_queue import RenderQueue
from utilities.render_worker import RenderWorkerFactory, string_to_status
# Root logger (no name is passed to getLogger), shared by every handler below.
logger = logging.getLogger()
# Flask application object; all routes in this module are registered on it.
server = Flask(__name__)
@server.get('/api/jobs')
def jobs_json():
    """Return a JSON-safe copy of every job in the render queue."""
    safe_jobs = []
    for queued_job in RenderQueue.job_queue:
        safe_jobs.append(queued_job.json_safe_copy())
    return safe_jobs
@server.get('/api/jobs/<status_val>')
def filtered_jobs_json(status_val):
    """Return JSON-safe copies of the jobs whose status matches ``status_val``.

    The status string from the URL is converted with ``string_to_status``.
    Responds with a 400 and a plain-text message when no job matches.
    """
    wanted_status = string_to_status(status_val)
    matching = []
    for job in RenderQueue.jobs_with_status(wanted_status):
        matching.append(job.json_safe_copy())
    if not matching:
        return f'Cannot find jobs with status {status_val}', 400
    return matching
@server.get('/api/job_status/<job_id>')
def get_job_status(job_id):
    """Return the JSON-safe representation of the job with ID ``job_id``.

    Responds with a 400 and a plain-text message when the queue has no such job.
    """
    job = RenderQueue.job_with_id(job_id)
    if not job:
        return f'Cannot find job with ID {job_id}', 400
    return job.json_safe_copy()
@server.get('/api/job/<job_id>/logs')
def get_job_logs(job_id):
    """Return the worker log of job ``job_id`` as a ``text/plain`` response.

    The response body is built from ``None`` when the job has no log path or
    the log file does not exist. Responds with a 400 when the job is unknown.
    """
    job = RenderQueue.job_with_id(job_id)
    if not job:
        return f'Cannot find job with ID {job_id}', 400

    path = job.worker.log_path
    contents = None
    if path and os.path.exists(path):
        with open(path) as log_file:
            contents = log_file.read()
    return Response(contents, mimetype='text/plain')
@server.get('/api/file_list/<job_id>')
def get_file_list(job_id):
    """List the entries in the output directory of job ``job_id``.

    The directory is the parent of the worker's ``output_path``.

    Returns:
        A list of entry names on success; a 400 with a plain-text message when
        the job is unknown; a 500 with a plain-text message when the output
        directory does not exist.
    """
    found_job = RenderQueue.job_with_id(job_id)
    if not found_job:
        return f'Cannot find job with ID {job_id}', 400

    job_dir = os.path.dirname(found_job.worker.output_path)
    # Without this check os.listdir raises FileNotFoundError and the client
    # gets an opaque error page instead of a readable message.
    if not os.path.isdir(job_dir):
        return f'Cannot find project files for job {job_id}', 500
    return os.listdir(job_dir)
@server.route('/api/download_all/<job_id>')
def download_all(job_id):
    """Send every entry of a job's output directory as a single zip download.

    The archive is built in a uniquely named temporary file, so concurrent
    downloads cannot overwrite each other and the path handed to ``send_file``
    is absolute. The temporary file is removed once the request has been
    handled.

    Returns:
        The zip attachment (named after the stem of the job's input file) on
        success; a 500 when the output directory does not exist; a 400 when
        the job is unknown.
    """
    import tempfile  # function-scope import: only this handler needs it

    zip_path = None

    @after_this_request
    def clear_zip(response):
        # zip_path is read when this hook runs, i.e. after it was assigned below.
        if zip_path and os.path.exists(zip_path):
            try:
                os.remove(zip_path)
            except OSError as e:
                # Some platforms refuse to delete a file that is still open.
                logger.warning(f"Could not remove temporary zip {zip_path}: {e}")
        return response

    found_job = RenderQueue.job_with_id(job_id)
    if not found_job:
        return f'Cannot find job with ID {job_id}', 400

    output_dir = os.path.dirname(found_job.worker.output_path)
    if not os.path.exists(output_dir):
        return f'Cannot find project files for job {job_id}', 500

    download_name = pathlib.Path(found_job.worker.input_path).stem + '.zip'
    zip_fd, zip_path = tempfile.mkstemp(suffix='.zip')
    os.close(zip_fd)  # ZipFile reopens the path itself
    with ZipFile(zip_path, 'w') as zip_obj:
        for entry in os.listdir(output_dir):
            zip_obj.write(filename=os.path.join(output_dir, entry), arcname=entry)
    return send_file(zip_path, mimetype='application/zip', as_attachment=True,
                     download_name=download_name)
@server.post('/api/register_client')
def register_client():
    """Register the render client named by the ``hostname`` request value.

    Returns the string "Success" when ``RenderQueue.register_client`` reports a
    truthy result, otherwise "Fail".
    """
    hostname = request.values['hostname']
    registered = RenderQueue.register_client(hostname)
    if registered:
        return "Success"
    return "Fail"
@server.post('/api/unregister_client')
def unregister_client():
    """Unregister the render client named by the ``hostname`` request value.

    Returns the string "Success" when ``RenderQueue.unregister_client`` reports
    a truthy result, otherwise "Fail".
    """
    hostname = request.values['hostname']
    removed = RenderQueue.unregister_client(hostname)
    if removed:
        return "Success"
    return "Fail"
@server.get('/api/clients')
def render_clients():
    """Return the render clients currently held by ``RenderQueue``."""
    known_clients = RenderQueue.render_clients
    return known_clients
@server.get('/api/full_status')
def full_status():
    """Collect a status snapshot from this server and every registered client.

    The local host is read directly via ``snapshot()``. Every other client is
    queried over HTTP at ``/api/snapshot`` on the hard-coded port 8080 with a
    one-second timeout. A client that cannot be reached, times out, or answers
    with something other than a JSON object is reported with empty data
    instead of aborting the whole collection.

    Returns:
        dict with a ``timestamp`` and a ``servers`` mapping of hostname to
        ``{'status', 'jobs', 'is_online'}``.
    """
    full_results = {'timestamp': datetime.now().isoformat(), 'servers': {}}

    try:
        for client_hostname in RenderQueue.render_clients:
            is_online = False
            snapshot_results = {}
            if client_hostname == RenderQueue.host_name:
                snapshot_results = snapshot()
                is_online = True
            else:
                try:
                    snapshot_request = requests.get(f'http://{client_hostname}:8080/api/snapshot', timeout=1)
                    is_online = snapshot_request.ok
                    if is_online:
                        snapshot_results = snapshot_request.json()
                except (requests.RequestException, ValueError) as e:
                    # RequestException covers connection errors and timeouts;
                    # ValueError covers a body that is not valid JSON.
                    logger.debug(f"Could not fetch snapshot from {client_hostname}: {e}")

            if not isinstance(snapshot_results, dict):
                snapshot_results = {}
            server_data = {'status': snapshot_results.get('status', {}),
                           'jobs': snapshot_results.get('jobs', {}),
                           'is_online': is_online}
            full_results['servers'][client_hostname] = server_data
    except Exception as e:
        # Last-resort boundary: return whatever was gathered so far.
        logger.error(f"Exception fetching full status: {e}")
    return full_results
@server.get('/api/snapshot')
def snapshot():
    """Return this server's queue status, its jobs and the current timestamp."""
    return {
        'status': RenderQueue.status(),
        'jobs': [queued_job.json_safe_copy() for queued_job in RenderQueue.job_queue],
        'timestamp': datetime.now().isoformat(),
    }
@server.post('/api/add_job')
def add_job_handler():
    """Create new job(s) and add them to the server render queue.

    Job data is taken from the ``json`` form field (alongside an optional
    uploaded ``file``) or from a JSON request body. It may be a single job
    object or a list of job objects.

    When a file is uploaded it is stored in a fresh, timestamped directory
    under ``UPLOAD_FOLDER``. Each job's ``input_path`` is then pointed at the
    stored file and its ``output_path`` is moved into a per-job subdirectory.

    Returns:
        The list of per-job results. If any result carries an ``error`` the
        response status is that result's ``code`` (single job) or 400
        (several jobs). Missing or malformed job data yields a 400; any
        unexpected exception yields a 500.
    """
    try:
        json_string = request.form.get('json', None)
        if not json_string and not request.is_json:
            return 'missing json data', 400

        # handle uploaded files
        uploaded_file = request.files.get('file', None)
        uploaded_file_local_path = None
        job_dir = None
        if uploaded_file and uploaded_file.filename:
            logger.info(f"Receiving uploaded file {uploaded_file.filename}")
            # The client controls the filename: sanitise it before using it in
            # ANY path component, otherwise '../' sequences escape UPLOAD_FOLDER.
            safe_name = secure_filename(uploaded_file.filename)
            if not safe_name:
                return 'invalid file name', 400
            job_dir = os.path.join(server.config['UPLOAD_FOLDER'],
                                   safe_name + "_" + datetime.now().strftime("%Y.%m.%d_%H.%M.%S"))
            os.makedirs(job_dir, exist_ok=True)
            uploaded_file_local_path = os.path.join(job_dir, safe_name)
            uploaded_file.save(uploaded_file_local_path)

        # Accept either one job object or a list of them.
        try:
            jobs_list = json.loads(json_string) if json_string else request.json
        except ValueError:
            return 'invalid json data', 400
        if isinstance(jobs_list, dict):
            jobs_list = [jobs_list]
        if not isinstance(jobs_list, list):
            return 'invalid json data', 400

        # convert job input paths for uploaded files and add jobs
        results = []
        for job in jobs_list:
            if not isinstance(job, dict):
                results.append({'error': 'Invalid job description', 'code': 400})
                continue

            if uploaded_file_local_path:
                job['input_path'] = uploaded_file_local_path
                # The job name is client-supplied as well, so sanitise it too.
                output_dir_name = secure_filename(str(job.get('name', None) or '')) or 'output'
                output_dir = os.path.join(job_dir, output_dir_name)
                os.makedirs(output_dir, exist_ok=True)
                # A missing output_path is left for add_job to reject with a 400.
                if job.get('output_path'):
                    job['output_path'] = os.path.join(output_dir, os.path.basename(job['output_path']))

            # remove failed job dir for single file uploads only
            remove_job_dir = bool(len(jobs_list) == 1 and uploaded_file_local_path)
            results.append(add_job(job, remove_job_dir_on_failure=remove_job_dir))

        # return any errors from results list
        for response in results:
            if isinstance(response, dict) and response.get('error', None):
                status_code = response.get('code', 500) if len(results) == 1 else 400
                return results, status_code
        return results
    except Exception as e:
        logger.exception(f"Unknown error adding job: {e}")
        return 'unknown error', 500
def add_job(job_params, remove_job_dir_on_failure=False):
    """Create a render job locally or forward it to a registered render client.

    Args:
        job_params: dict describing the job. ``renderer``, ``input_path`` and
            ``output_path`` are required; ``name``, ``owner``, ``priority``
            (default 2), ``args``, ``client`` (default: this host) and
            ``force_start`` are optional.
        remove_job_dir_on_failure: when truthy, the directory containing
            ``input_path`` is deleted if the job cannot be added. The upload
            handler sets this for single-file uploads, where that directory
            is the per-upload job directory.

    Returns:
        dict: the created or remote job data on success, or
        ``{'error': <message>, 'code': <http status>}`` on failure.
    """
    name = job_params.get("name", None)
    job_owner = job_params.get("owner", None)
    renderer = job_params.get("renderer", None)
    input_path = job_params.get("input_path", None)
    output_path = job_params.get("output_path", None)
    args = job_params.get('args', {})
    client = job_params.get('client', RenderQueue.host_name)
    force_start = job_params.get('force_start', False)
    custom_id = None

    # Only resolved when cleanup was requested, so nothing is ever deleted
    # for jobs that reference pre-existing files.
    job_dir = os.path.dirname(input_path) if (remove_job_dir_on_failure and input_path) else None

    def remove_job_dir():
        if job_dir and os.path.exists(job_dir):
            logger.debug(f"Removing job dir: {job_dir}")
            shutil.rmtree(job_dir)

    def fail(err_msg, code):
        # Shared failure path: clean up the upload and build the error result.
        remove_job_dir()
        return {'error': err_msg, 'code': code}

    # check for minimum render requirements
    if None in [renderer, input_path, output_path]:
        err_msg = 'Cannot add job: Missing required parameters'
        logger.error(err_msg)
        return fail(err_msg, 400)

    try:
        priority = int(job_params.get('priority', 2))
    except (TypeError, ValueError):
        err_msg = 'Cannot add job: priority must be an integer'
        logger.error(err_msg)
        return fail(err_msg, 400)

    # local renders
    if client == RenderQueue.host_name:
        logger.info(f"Creating job locally - {name if name else input_path}")
        try:
            render_job = RenderJob(renderer, input_path, output_path, args, priority, job_owner, client,
                                   notify=False, custom_id=custom_id, name=name)
            RenderQueue.add_to_render_queue(render_job, force_start=force_start)
            return render_job.json_safe_copy()
        except Exception as e:
            err_msg = f"Error creating job: {str(e)}"
            logger.exception(err_msg)
            return fail(err_msg, 400)

    # client renders
    if client not in RenderQueue.render_clients:
        err_msg = f"Unknown render client: '{client}'"
        logger.error(err_msg)
        return fail(err_msg, 400)

    # see if host is available
    if not RenderQueue.is_client_available(client):
        err_msg = f"Render client '{client}' is unreachable"
        logger.error(err_msg)
        return fail(err_msg, 503)

    # call uploader on remote client
    try:
        logger.info(f"Uploading file {input_path} to client {client}")
        # Forward this job's own parameters as a one-element list. Reading
        # request.json here fails for multipart uploads, and the receiving
        # add_job handler iterates the payload as a list of jobs.
        response = post_job_to_server(input_path, [job_params], client)
    except (requests.RequestException, OSError) as e:
        err_msg = f"Error submitting job to client: {client}"
        logger.error(f"{err_msg} ({e})")
        return fail(err_msg, 500)

    if not response.ok:
        return fail("Job rejected by client", 400)

    logger.info("Job submitted successfully!")
    try:
        payload = response.json()
    except ValueError:
        payload = None
    # The remote handler answers with a list of results; unwrap the single
    # entry so callers always receive a dict.
    if isinstance(payload, list) and len(payload) == 1:
        payload = payload[0]
    if isinstance(payload, dict) and payload:
        return payload
    return {'status': 'Job ok'}
@server.get('/api/cancel_job')
def cancel_job():
    """Cancel the job whose ID is given in the ``id`` query argument.

    A ``confirm`` query argument is required. Empty values and the explicit
    negatives ``0``, ``false``, ``no`` and ``off`` (case-insensitive) are
    treated as "not confirmed"; any other value confirms.

    Returns:
        400 with a plain-text message when the id or confirmation is missing,
        when several jobs share the ID, or when no job matches. Otherwise the
        outcome of ``RenderQueue.cancel_job``: a boolean outcome is rendered
        as "Success" (200) or "Fail" (500), any other value is returned
        unchanged.
    """
    job_id = request.args.get('id', None)
    confirm = request.args.get('confirm', '')

    if not job_id:
        return 'job id not found', 400
    # Query values are strings, so "false" or "0" would otherwise be truthy.
    if str(confirm).strip().lower() in ('', '0', 'false', 'no', 'off'):
        return 'confirmation required', 400

    found = [x for x in RenderQueue.job_queue if x.id == job_id]
    if len(found) > 1:
        return f'multiple jobs found for ID {job_id}', 400
    if not found:
        return 'job not found', 400

    success = RenderQueue.cancel_job(found[0])
    # NOTE(review): assumes cancel_job reports a boolean outcome - confirm.
    # Flask cannot turn a bare bool into a response, so map it to text here.
    if isinstance(success, bool):
        return "Success" if success else ("Fail", 500)
    return success
@server.get('/api/clear_history')
def clear_history():
    """Call ``RenderQueue.clear_history()`` and acknowledge with 'success'."""
    acknowledgement = 'success'
    RenderQueue.clear_history()
    return acknowledgement
@server.route('/api/status')
def status():
    """Return the value of ``RenderQueue.status()`` for this server."""
    queue_status = RenderQueue.status()
    return queue_status
@server.get('/api/renderer_info')
def renderer_info():
    """Describe every renderer supported by ``RenderWorkerFactory``.

    Returns a mapping of renderer name to its availability (whether a
    renderer path was found), version, supported file extensions and
    supported export formats.
    """
    def describe(renderer_name):
        worker_class = RenderWorkerFactory.class_for_name(renderer_name)
        return {
            'available': worker_class.renderer_path() is not None,
            'version': worker_class.version(),
            'supported_extensions': worker_class.supported_extensions,
            'supported_export_formats': worker_class.supported_export_formats,
        }

    return {renderer_name: describe(renderer_name)
            for renderer_name in RenderWorkerFactory.supported_renderers()}
@server.route('/')
def default():
    """Root endpoint: answer with a fixed text showing the server is up."""
    banner = "Server running"
    return banner
@server.route('/upload')
def upload_file_page():
    """Render ``upload.html`` with the known render clients and renderers."""
    template_context = {
        'render_clients': RenderQueue.render_clients,
        'supported_renderers': RenderWorkerFactory.supported_renderers(),
    }
    return render_template('upload.html', **template_context)
# TODO: move this to a helper file
def post_job_to_server(input_path, job_list, client, server_port=8080):
    """Upload a project file and its job data to another server's add_job API.

    Args:
        input_path: path of the file to upload; it is sent under its basename.
        job_list: JSON-serialisable job data, sent as the ``json`` form field.
        client: hostname of the receiving server.
        server_port: port the receiving server listens on.

    Returns:
        The ``requests`` response object from the POST.
    """
    # Pack job data and submit to server. The context manager closes the file
    # handle once the upload has been sent; it used to be left open.
    with open(input_path, 'rb') as input_file:
        job_files = {'file': (os.path.basename(input_path), input_file, 'application/octet-stream'),
                     'json': (None, json.dumps(job_list), 'application/json')}
        return requests.post(f'http://{client}:{server_port}/api/add_job', files=job_files)