mirror of
https://github.com/blw1138/Zordon.git
synced 2025-12-17 16:58:12 +00:00
Repo cleanup. Move job_server class to lib.
This commit is contained in:
303
lib/job_server.py
Executable file
303
lib/job_server.py
Executable file
@@ -0,0 +1,303 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import pathlib
|
||||
import shutil
|
||||
from datetime import datetime
|
||||
from zipfile import ZipFile
|
||||
|
||||
import requests
|
||||
from flask import Flask, request, render_template, send_file, after_this_request
|
||||
from werkzeug.utils import secure_filename
|
||||
|
||||
from lib.render_job import RenderJob
|
||||
from lib.render_queue import RenderQueue
|
||||
from utilities.render_worker import RenderWorkerFactory, string_to_status
|
||||
|
||||
logger = logging.getLogger()
|
||||
server = Flask(__name__)
|
||||
|
||||
|
||||
@server.get('/jobs')
|
||||
def jobs_json():
|
||||
return [json.loads(x.json()) for x in RenderQueue.job_queue if not x.archived]
|
||||
|
||||
|
||||
@server.get('/jobs/<status_val>')
|
||||
def filtered_jobs_json(status_val):
|
||||
state = string_to_status(status_val)
|
||||
jobs = [json.loads(x.json()) for x in RenderQueue.jobs_with_status(state)]
|
||||
if jobs:
|
||||
return jobs
|
||||
else:
|
||||
return f'Cannot find jobs with status {status_val}', 400
|
||||
|
||||
|
||||
@server.get('/job_status/<job_id>')
|
||||
def get_job_status(job_id):
|
||||
found_job = RenderQueue.job_with_id(job_id)
|
||||
if found_job:
|
||||
return found_job.json()
|
||||
else:
|
||||
return f'Cannot find job with ID {job_id}', 400
|
||||
|
||||
|
||||
@server.get('/file_list/<job_id>')
|
||||
def get_file_list(job_id):
|
||||
found_job = RenderQueue.job_with_id(job_id)
|
||||
if found_job:
|
||||
job_dir = os.path.dirname(found_job.render.output_path)
|
||||
return os.listdir(job_dir)
|
||||
else:
|
||||
return f'Cannot find job with ID {job_id}', 400
|
||||
|
||||
|
||||
@server.route('/download_all/<job_id>')
|
||||
def download_all(job_id):
|
||||
|
||||
zip_filename = None
|
||||
|
||||
@after_this_request
|
||||
def clear_zip(response):
|
||||
if zip_filename and os.path.exists(zip_filename):
|
||||
os.remove(zip_filename)
|
||||
return response
|
||||
|
||||
found_job = RenderQueue.job_with_id(job_id)
|
||||
if found_job:
|
||||
output_dir = os.path.dirname(found_job.render.output_path)
|
||||
if os.path.exists(output_dir):
|
||||
zip_filename = pathlib.Path(found_job.render.input_path).stem + '.zip'
|
||||
with ZipFile(zip_filename, 'w') as zipObj:
|
||||
for f in os.listdir(output_dir):
|
||||
zipObj.write(filename=os.path.join(output_dir, f),
|
||||
arcname=os.path.basename(f))
|
||||
return send_file(zip_filename, mimetype="zip", as_attachment=True, )
|
||||
else:
|
||||
return f'Cannot find project files for job {job_id}', 500
|
||||
else:
|
||||
return f'Cannot find job with ID {job_id}', 400
|
||||
|
||||
|
||||
@server.post('/register_client')
|
||||
def register_client():
|
||||
client_hostname = request.values['hostname']
|
||||
x = RenderQueue.register_client(client_hostname)
|
||||
return "Success" if x else "Fail"
|
||||
|
||||
|
||||
@server.post('/unregister_client')
|
||||
def unregister_client():
|
||||
client_hostname = request.values['hostname']
|
||||
x = RenderQueue.unregister_client(client_hostname)
|
||||
return "Success" if x else "Fail"
|
||||
|
||||
|
||||
@server.get('/clients')
|
||||
def render_clients():
|
||||
return RenderQueue.render_clients
|
||||
|
||||
|
||||
@server.get('/full_status')
|
||||
def full_status():
|
||||
full_results = {'timestamp': datetime.now().isoformat(), 'servers': {}}
|
||||
|
||||
try:
|
||||
for client_hostname in RenderQueue.render_clients:
|
||||
is_online = False
|
||||
if client_hostname == RenderQueue.host_name:
|
||||
snapshot_results = snapshot()
|
||||
is_online = True
|
||||
else:
|
||||
snapshot_results = {}
|
||||
try:
|
||||
snapshot_request = requests.get(f'http://{client_hostname}:8080/snapshot', timeout=1)
|
||||
snapshot_results = snapshot_request.json()
|
||||
is_online = snapshot_request.ok
|
||||
except requests.ConnectionError as e:
|
||||
pass
|
||||
server_data = {'status': snapshot_results.get('status', {}), 'jobs': snapshot_results.get('jobs', {}),
|
||||
'is_online': is_online}
|
||||
full_results['servers'][client_hostname] = server_data
|
||||
except Exception as e:
|
||||
logger.error(f"Exception fetching full status: {e}")
|
||||
|
||||
return full_results
|
||||
|
||||
|
||||
@server.get('/snapshot')
|
||||
def snapshot():
|
||||
server_status = RenderQueue.status()
|
||||
server_jobs = [json.loads(x.json()) for x in RenderQueue.job_queue if not x.archived]
|
||||
server_data = {'status': server_status, 'jobs': server_jobs, 'timestamp': datetime.now().isoformat()}
|
||||
return server_data
|
||||
|
||||
|
||||
@server.post('/add_job')
|
||||
def add_job():
|
||||
def remove_job_dir():
|
||||
if job_dir and os.path.exists(job_dir):
|
||||
logger.debug(f"Removing job dir: {job_dir}")
|
||||
shutil.rmtree(job_dir)
|
||||
|
||||
try:
|
||||
"""Create new job and add to server render queue"""
|
||||
json_string = request.form.get('json', None)
|
||||
if not json_string:
|
||||
return 'missing json data', 400
|
||||
|
||||
request_params = json.loads(json_string)
|
||||
job_owner = request_params.get("owner", None)
|
||||
renderer = request_params.get("renderer", None)
|
||||
input_path = request_params.get("input", None)
|
||||
output_path = request_params.get("output", None)
|
||||
priority = int(request_params.get('priority', 2))
|
||||
args = request_params.get('args', {})
|
||||
client = request_params.get('client', RenderQueue.host_name)
|
||||
force_start = request_params.get('force_start', False)
|
||||
uploaded_file = request.files.get('file', None)
|
||||
html_origin = request_params.get('origin', None) == 'html'
|
||||
custom_id = None
|
||||
job_dir = None
|
||||
|
||||
# check for minimum render requirements
|
||||
if None in [renderer, input_path or (uploaded_file and uploaded_file.filename), output_path]:
|
||||
err_msg = 'Cannot add job: Missing required parameters'
|
||||
logger.error(err_msg)
|
||||
return err_msg, 400
|
||||
|
||||
# handle uploaded files
|
||||
if uploaded_file and uploaded_file.filename:
|
||||
logger.info(f"Receiving uploaded file {uploaded_file.filename}")
|
||||
new_id = RenderJob.generate_id()
|
||||
job_dir = os.path.join(server.config['UPLOAD_FOLDER'], new_id + "-" + uploaded_file.filename)
|
||||
if not os.path.exists(job_dir):
|
||||
os.makedirs(job_dir)
|
||||
|
||||
local_path = os.path.join(job_dir, secure_filename(uploaded_file.filename))
|
||||
uploaded_file.save(local_path)
|
||||
input_path = local_path
|
||||
output_dir = os.path.join(job_dir, 'output')
|
||||
os.makedirs(output_dir, exist_ok=True)
|
||||
output_path = os.path.join(output_dir, os.path.basename(output_path))
|
||||
|
||||
# local renders
|
||||
if client == RenderQueue.host_name:
|
||||
logger.info(f"Creating job locally - {input_path}")
|
||||
try:
|
||||
render_job = RenderWorkerFactory.create_worker(renderer, input_path, output_path, args)
|
||||
render_job.log_path = os.path.join(os.path.dirname(input_path), os.path.basename(input_path) + '.log')
|
||||
except Exception as e:
|
||||
err_msg = f"Error creating job: {str(e)}"
|
||||
logger.exception(err_msg)
|
||||
remove_job_dir()
|
||||
return err_msg, 400
|
||||
|
||||
new_job = RenderJob(render_job, priority=priority, owner=job_owner, custom_id=custom_id)
|
||||
RenderQueue.add_to_render_queue(new_job, force_start=force_start)
|
||||
|
||||
return new_job.json()
|
||||
|
||||
# client renders
|
||||
elif client in RenderQueue.render_clients:
|
||||
|
||||
# see if host is available
|
||||
if RenderQueue.is_client_available(client):
|
||||
|
||||
# call uploader on remote client
|
||||
try:
|
||||
logger.info(f"Uploading file {input_path} to client {client}")
|
||||
job_data = request.json
|
||||
response = post_job_to_server(input_path, job_data)
|
||||
if response.ok:
|
||||
logger.info("Job submitted successfully!")
|
||||
return response.json() if response.json() else "Job ok"
|
||||
else:
|
||||
remove_job_dir()
|
||||
return 'Job rejected by client', 403
|
||||
except requests.ConnectionError as e:
|
||||
err_msg = f"Error submitting job to client: {client}"
|
||||
logger.error(err_msg)
|
||||
remove_job_dir()
|
||||
return err_msg, 500
|
||||
else:
|
||||
# client is not available
|
||||
err_msg = f"Render client '{client}' is unreachable"
|
||||
logger.error(err_msg)
|
||||
remove_job_dir()
|
||||
return err_msg, 503
|
||||
|
||||
else:
|
||||
err_msg = f"Unknown render client: '{client}'"
|
||||
logger.error(err_msg)
|
||||
remove_job_dir()
|
||||
return err_msg, 400
|
||||
|
||||
except Exception as e:
|
||||
logger.exception(f"Unknown error adding job: {e}")
|
||||
remove_job_dir()
|
||||
return 'unknown error', 500
|
||||
|
||||
|
||||
@server.get('/cancel_job')
|
||||
def cancel_job():
|
||||
job_id = request.args.get('id', None)
|
||||
confirm = request.args.get('confirm', False)
|
||||
if not job_id:
|
||||
return 'job id not found', 400
|
||||
elif not confirm:
|
||||
return 'confirmation required', 400
|
||||
else:
|
||||
found = [x for x in RenderQueue.job_queue if x.id == job_id]
|
||||
if len(found) > 1:
|
||||
# logger.error('Multiple jobs found for ID {}'.format(job_id))
|
||||
return f'multiple jobs found for ID {job_id}', 400
|
||||
elif found:
|
||||
success = RenderQueue.cancel_job(found[0])
|
||||
return success
|
||||
return 'job not found', 400
|
||||
|
||||
|
||||
@server.get('/clear_history')
|
||||
def clear_history():
|
||||
RenderQueue.clear_history()
|
||||
return 'success'
|
||||
|
||||
|
||||
@server.route('/status')
|
||||
def status():
|
||||
return RenderQueue.status()
|
||||
|
||||
|
||||
@server.get('/renderer_info')
|
||||
def renderer_info():
|
||||
renderer_data = {}
|
||||
for r in RenderWorkerFactory.supported_renderers():
|
||||
renderer_class = RenderWorkerFactory.class_for_name(r)
|
||||
renderer_data[r] = {'available': renderer_class.renderer_path() is not None,
|
||||
'version': renderer_class.version(),
|
||||
'supported_extensions': renderer_class.supported_extensions,
|
||||
'supported_export_formats': renderer_class.supported_export_formats}
|
||||
return renderer_data
|
||||
|
||||
|
||||
@server.route('/')
|
||||
def default():
|
||||
return "Server running"
|
||||
|
||||
|
||||
@server.route('/upload')
|
||||
def upload_file_page():
|
||||
return render_template('upload.html', render_clients=RenderQueue.render_clients,
|
||||
supported_renderers=RenderWorkerFactory.supported_renderers())
|
||||
|
||||
|
||||
def post_job_to_server(input_path, job_json, client, server_port=8080):
|
||||
# Pack job data and submit to server
|
||||
job_files = {'file': (os.path.basename(input_path), open(input_path, 'rb'), 'application/octet-stream'),
|
||||
'json': (None, json.dumps(job_json), 'application/json')}
|
||||
|
||||
req = requests.post(f'http://{client}:{server_port}/add_job', files=job_files)
|
||||
return req
|
||||
Reference in New Issue
Block a user