#!/usr/bin/env python3 import json import logging import os import pathlib import shutil import socket import threading import time from datetime import datetime from zipfile import ZipFile import requests import yaml from flask import Flask, request, render_template, send_file, after_this_request from werkzeug.utils import secure_filename from lib.render_job import RenderJob from lib.render_queue import RenderQueue from utilities.render_worker import RenderWorkerFactory, string_to_status logger = logging.getLogger() app = Flask(__name__) @app.get('/jobs') def jobs_json(): return [json.loads(x.json()) for x in RenderQueue.job_queue if not x.archived] @app.get('/jobs/') def filtered_jobs_json(status_val): state = string_to_status(status_val) jobs = [json.loads(x.json()) for x in RenderQueue.jobs_with_status(state)] if jobs: return jobs else: return f'Cannot find jobs with status {status_val}', 400 @app.get('/job_status/') def get_job_status(job_id): found_job = RenderQueue.job_with_id(job_id) if found_job: return found_job.json() else: return f'Cannot find job with ID {job_id}', 400 @app.get('/file_list/') def get_file_list(job_id): found_job = RenderQueue.job_with_id(job_id) if found_job: job_dir = os.path.dirname(found_job.render.output_path) return os.listdir(job_dir) else: return f'Cannot find job with ID {job_id}', 400 @app.route('/download_all/') def download_all(job_id): zip_filename = None @after_this_request def clear_zip(response): if zip_filename and os.path.exists(zip_filename): os.remove(zip_filename) return response found_job = RenderQueue.job_with_id(job_id) if found_job: output_dir = os.path.dirname(found_job.render.output_path) if os.path.exists(output_dir): zip_filename = pathlib.Path(found_job.render.input_path).stem + '.zip' with ZipFile(zip_filename, 'w') as zipObj: for f in os.listdir(output_dir): zipObj.write(filename=os.path.join(output_dir, f), arcname=os.path.basename(f)) return send_file(zip_filename, mimetype="zip", as_attachment=True, ) else: return f'Cannot find project files for job {job_id}', 500 else: return f'Cannot find job with ID {job_id}', 400 @app.post('/register_client') def register_client(): client_hostname = request.values['hostname'] x = RenderQueue.register_client(client_hostname) return "Success" if x else "Fail" @app.post('/unregister_client') def unregister_client(): client_hostname = request.values['hostname'] x = RenderQueue.unregister_client(client_hostname) return "Success" if x else "Fail" @app.get('/clients') def render_clients(): return RenderQueue.render_clients @app.get('/full_status') def full_status(): full_results = {'timestamp': datetime.now().isoformat(), 'servers': {}} try: for client_hostname in RenderQueue.render_clients: is_online = False if client_hostname == RenderQueue.host_name: snapshot_results = snapshot() is_online = True else: snapshot_results = {} try: snapshot_request = requests.get(f'http://{client_hostname}:8080/snapshot', timeout=1) snapshot_results = snapshot_request.json() is_online = snapshot_request.ok except requests.ConnectionError as e: pass server_data = {'status': snapshot_results.get('status', {}), 'jobs': snapshot_results.get('jobs', {}), 'is_online': is_online} full_results['servers'][client_hostname] = server_data except Exception as e: logger.error(f"Exception fetching full status: {e}") return full_results @app.get('/snapshot') def snapshot(): server_status = RenderQueue.status() server_jobs = [json.loads(x.json()) for x in RenderQueue.job_queue if not x.archived] server_data = {'status': server_status, 'jobs': server_jobs, 'timestamp': datetime.now().isoformat()} return server_data @app.post('/add_job') def add_job(): def remove_job_dir(): if job_dir and os.path.exists(job_dir): logger.debug(f"Removing job dir: {job_dir}") shutil.rmtree(job_dir) try: """Create new job and add to server render queue""" json_string = request.form.get('json', None) if not json_string: return 'missing json data', 400 request_params = json.loads(json_string) job_owner = request_params.get("owner", None) renderer = request_params.get("renderer", None) input_path = request_params.get("input", None) output_path = request_params.get("output", None) priority = int(request_params.get('priority', 2)) args = request_params.get('args', {}) client = request_params.get('client', RenderQueue.host_name) force_start = request_params.get('force_start', False) uploaded_file = request.files.get('file', None) html_origin = request_params.get('origin', None) == 'html' custom_id = None job_dir = None # check for minimum render requirements if None in [renderer, input_path or (uploaded_file and uploaded_file.filename), output_path]: err_msg = 'Cannot add job: Missing required parameters' logger.error(err_msg) return err_msg, 400 # handle uploaded files if uploaded_file and uploaded_file.filename: logger.info(f"Receiving uploaded file {uploaded_file.filename}") new_id = RenderJob.generate_id() job_dir = os.path.join(app.config['UPLOAD_FOLDER'], new_id + "-" + uploaded_file.filename) if not os.path.exists(job_dir): os.makedirs(job_dir) local_path = os.path.join(job_dir, secure_filename(uploaded_file.filename)) uploaded_file.save(local_path) input_path = local_path output_dir = os.path.join(job_dir, 'output') os.makedirs(output_dir, exist_ok=True) output_path = os.path.join(output_dir, os.path.basename(output_path)) # local renders if client == RenderQueue.host_name: logger.info(f"Creating job locally - {input_path}") try: render_job = RenderWorkerFactory.create_worker(renderer, input_path, output_path, args) render_job.log_path = os.path.join(os.path.dirname(input_path), os.path.basename(input_path) + '.log') except Exception as e: err_msg = f"Error creating job: {str(e)}" logger.exception(err_msg) remove_job_dir() return err_msg, 400 new_job = RenderJob(render_job, priority=priority, owner=job_owner, custom_id=custom_id) RenderQueue.add_to_render_queue(new_job, force_start=force_start) return new_job.json() # client renders elif client in RenderQueue.render_clients: # see if host is available if RenderQueue.is_client_available(client): # call uploader on remote client try: logger.info(f"Uploading file {input_path} to client {client}") job_data = request.json response = post_job_to_server(input_path, job_data) if response.ok: logger.info("Job submitted successfully!") return response.json() if response.json() else "Job ok" else: remove_job_dir() return 'Job rejected by client', 403 except requests.ConnectionError as e: err_msg = f"Error submitting job to client: {client}" logger.error(err_msg) remove_job_dir() return err_msg, 500 else: # client is not available err_msg = f"Render client '{client}' is unreachable" logger.error(err_msg) remove_job_dir() return err_msg, 503 else: err_msg = f"Unknown render client: '{client}'" logger.error(err_msg) remove_job_dir() return err_msg, 400 except Exception as e: logger.exception(f"Unknown error adding job: {e}") remove_job_dir() return 'unknown error', 500 @app.get('/cancel_job') def cancel_job(): job_id = request.args.get('id', None) confirm = request.args.get('confirm', False) if not job_id: return 'job id not found', 400 elif not confirm: return 'confirmation required', 400 else: found = [x for x in RenderQueue.job_queue if x.id == job_id] if len(found) > 1: # logger.error('Multiple jobs found for ID {}'.format(job_id)) return f'multiple jobs found for ID {job_id}', 400 elif found: success = RenderQueue.cancel_job(found[0]) return success return 'job not found', 400 @app.get('/clear_history') def clear_history(): RenderQueue.clear_history() return 'success' @app.route('/status') def status(): return RenderQueue.status() @app.get('/renderer_info') def renderer_info(): renderer_data = {} for r in RenderWorkerFactory.supported_renderers(): renderer_class = RenderWorkerFactory.class_for_name(r) renderer_data[r] = {'available': renderer_class.renderer_path() is not None, 'version': renderer_class.version(), 'supported_extensions': renderer_class.supported_extensions, 'supported_export_formats': renderer_class.supported_export_formats} return renderer_data @app.route('/') def default(): return "Server running" @app.route('/upload') def upload_file_page(): return render_template('upload.html', render_clients=RenderQueue.render_clients, supported_renderers=RenderWorkerFactory.supported_renderers()) def post_job_to_server(input_path, job_json, client, server_port=8080): # Pack job data and submit to server job_files = {'file': (os.path.basename(input_path), open(input_path, 'rb'), 'application/octet-stream'), 'json': (None, json.dumps(job_json), 'application/json')} req = requests.post(f'http://{client}:{server_port}/add_job', files=job_files) return req def start_server(background_thread=False): def eval_loop(delay_sec=1): while True: RenderQueue.evaluate_queue() time.sleep(delay_sec) with open('config.yaml') as f: config = yaml.load(f, Loader=yaml.FullLoader) logging.basicConfig(format='%(asctime)s: %(levelname)s: %(module)s: %(message)s', datefmt='%d-%b-%y %H:%M:%S', level=config.get('server_log_level', 'INFO').upper()) app.config['UPLOAD_FOLDER'] = config['upload_folder'] app.config['MAX_CONTENT_PATH'] = config['max_content_path'] # app.config['RESULT_STATIC_PATH'] = 'static' # Get hostname and render clients RenderQueue.host_name = socket.gethostname() app.config['HOSTNAME'] = RenderQueue.host_name if not RenderQueue.render_clients: RenderQueue.render_clients = [RenderQueue.host_name] # disable most Flask logging flask_log = logging.getLogger('werkzeug') flask_log.setLevel(config.get('flask_log_level', 'ERROR').upper()) # Setup the RenderManager object RenderQueue.load_state() thread = threading.Thread(target=eval_loop, kwargs={'delay_sec': config.get('queue_eval_seconds', 1)}, daemon=True) thread.start() if background_thread: server_thread = threading.Thread( target=lambda: app.run(host='0.0.0.0', port=RenderQueue.port, debug=False, use_reloader=False)) server_thread.start() server_thread.join() else: app.run(host='0.0.0.0', port=RenderQueue.port, debug=config.get('flask_debug_enable', False), use_reloader=False) if __name__ == '__main__': start_server()