#!/usr/bin/env python3
"""Flask HTTP front end for the render queue.

Exposes endpoints to inspect jobs, register render clients, submit and cancel
jobs, and aggregate status across the render clients known to ``RenderQueue``.
"""
import json
import logging
import os
import shutil
import socket
import threading
import time
from datetime import datetime

import requests
import yaml
from flask import Flask, request, render_template
from werkzeug.utils import secure_filename

from lib.render_job import RenderJob
from lib.render_queue import RenderQueue
from utilities.render_worker import RenderWorkerFactory, string_to_status

logger = logging.getLogger()
app = Flask(__name__)


def _active_jobs():
    """Return the decoded JSON of every queued job that is not archived."""
    return [json.loads(job.json()) for job in RenderQueue.job_queue if not job.archived]


@app.get('/jobs')
def jobs_json():
    """List all non-archived jobs in the render queue."""
    return _active_jobs()


@app.get('/jobs/<status_val>')
def filtered_jobs_json(status_val):
    """List the jobs whose status matches ``status_val``.

    Args:
        status_val: Status name taken from the URL; converted with
            ``string_to_status`` before querying the queue.

    Returns:
        The matching jobs, or an ``{'error': ...}`` body with HTTP 400 when
        no job has that status.
    """
    state = string_to_status(status_val)
    jobs = [json.loads(job.json()) for job in RenderQueue.jobs_with_status(state)]
    if jobs:
        return jobs
    return {'error': f'Cannot find jobs with status {status_val}'}, 400


@app.get('/job_status/<job_id>')
def get_job_status(job_id):
    """Return the JSON description of the job with ID ``job_id``.

    Returns an ``{'error': ...}`` body with HTTP 400 when the queue has no
    such job.
    """
    found_job = RenderQueue.job_with_id(job_id)
    if found_job:
        logger.info("Founbd jobs")
        return found_job.json()
    return {'error': f'Cannot find job with ID {job_id}'}, 400


@app.post('/register_client')
def register_client():
    """Register the render client named by the ``hostname`` request value."""
    client_hostname = request.values['hostname']
    registered = RenderQueue.register_client(client_hostname)
    return "Success" if registered else "Fail"


@app.post('/unregister_client')
def unregister_client():
    """Unregister the render client named by the ``hostname`` request value."""
    client_hostname = request.values['hostname']
    unregistered = RenderQueue.unregister_client(client_hostname)
    return "Success" if unregistered else "Fail"


@app.get('/clients')
def render_clients():
    """Return the render clients currently known to the queue."""
    return RenderQueue.render_clients


def _fetch_remote_snapshot(client_hostname):
    """Fetch ``/snapshot`` from a remote render client.

    Returns:
        A ``(snapshot, is_online)`` tuple. On any request or decoding failure
        the snapshot is ``{}`` and ``is_online`` is ``False``.
    """
    # NOTE(review): the port is hard-coded here while the local server binds
    # to RenderQueue.port - confirm that all clients really listen on 8080.
    try:
        response = requests.get(f'http://{client_hostname}:8080/snapshot', timeout=1)
        payload = response.json()
    except (requests.RequestException, ValueError) as e:
        # Covers refused connections, timeouts and non-JSON bodies; the client
        # is reported as offline instead of aborting the whole status report.
        logger.debug(f"Could not fetch snapshot from {client_hostname}: {e}")
        return {}, False
    if not isinstance(payload, dict):
        # The caller reads the snapshot with .get(); ignore any other shape.
        return {}, False
    return payload, response.ok


@app.get('/full_status')
def full_status():
    """Aggregate the status and jobs of every known render client.

    The local host is answered from ``snapshot()`` directly; every other
    client is queried over HTTP. A client that cannot be reached is included
    with ``is_online`` set to ``False`` and empty status/jobs.
    """
    full_results = {'timestamp': datetime.now().isoformat(), 'servers': {}}
    try:
        for client_hostname in RenderQueue.render_clients:
            if client_hostname == RenderQueue.host_name:
                snapshot_results = snapshot()
                is_online = True
            else:
                snapshot_results, is_online = _fetch_remote_snapshot(client_hostname)
            full_results['servers'][client_hostname] = {
                'status': snapshot_results.get('status', {}),
                'jobs': snapshot_results.get('jobs', {}),
                'is_online': is_online,
            }
    except Exception as e:
        # Best effort: return whatever was collected before the failure.
        logger.error(f"Exception fetching full status: {e}")
    return full_results


@app.get('/snapshot')
def snapshot():
    """Return this server's queue status and non-archived jobs with a timestamp."""
    return {
        'status': RenderQueue.status(),
        'jobs': _active_jobs(),
        'timestamp': datetime.now().isoformat(),
    }


@app.post('/add_job')
def add_job():
    """Create a new job and add it to a render queue.

    Expects a form field ``json`` holding the job parameters (``renderer``,
    ``input``, ``output`` and optionally ``owner``, ``priority``, ``args``,
    ``client``, ``force_start``) and optionally an uploaded ``file``. The job
    is queued locally when ``client`` is this host, otherwise it is forwarded
    to the named render client.

    Returns:
        The new job's JSON on success, or an error message with an HTTP
        status code (400, 403, 500 or 503) on failure.
    """
    # Bound before the try block so remove_job_dir() can be called from any
    # error path, including failures that happen before an upload is handled.
    job_dir = None

    def remove_job_dir():
        """Delete the per-job upload directory, if one was created."""
        if job_dir and os.path.exists(job_dir):
            logger.debug(f"Removing job dir: {job_dir}")
            shutil.rmtree(job_dir)

    try:
        json_string = request.form.get('json', None)
        if not json_string:
            return 'missing json data', 400

        request_params = json.loads(json_string)
        job_owner = request_params.get("owner", None)
        renderer = request_params.get("renderer", None)
        input_path = request_params.get("input", None)
        output_path = request_params.get("output", None)
        priority = int(request_params.get('priority', 2))
        args = request_params.get('args', {})
        client = request_params.get('client', RenderQueue.host_name)
        force_start = request_params.get('force_start', False)
        uploaded_file = request.files.get('file', None)
        custom_id = None

        # check for minimum render requirements; empty values count as missing
        has_upload = bool(uploaded_file and uploaded_file.filename)
        if not renderer or not output_path or not (input_path or has_upload):
            err_msg = 'Cannot add job: Missing required parameters'
            logger.error(err_msg)
            return err_msg, 400

        # handle uploaded files
        if has_upload:
            logger.info(f"Receiving uploaded file {uploaded_file.filename}")
            # The filename is client-controlled: sanitise it before it becomes
            # part of any path, including the per-job directory name.
            safe_name = secure_filename(uploaded_file.filename)
            if not safe_name:
                err_msg = 'Cannot add job: Invalid upload filename'
                logger.error(err_msg)
                return err_msg, 400
            new_id = RenderJob.generate_id()
            job_dir = os.path.join(app.config['UPLOAD_FOLDER'], f"{new_id}-{safe_name}")
            os.makedirs(job_dir, exist_ok=True)
            local_path = os.path.join(job_dir, safe_name)
            uploaded_file.save(local_path)
            input_path = local_path
            output_path = os.path.join(job_dir, os.path.basename(output_path))

        # local renders
        if client == RenderQueue.host_name:
            logger.info(f"Creating job locally - {input_path}")
            try:
                render_job = RenderWorkerFactory.create_worker(renderer, input_path, output_path, args)
            except ValueError as e:
                err_msg = f"Error creating job: {str(e)}"
                logger.exception(err_msg)
                remove_job_dir()
                return err_msg, 400
            new_job = RenderJob(render_job, priority=priority, owner=job_owner, custom_id=custom_id)
            RenderQueue.add_to_render_queue(new_job, force_start=force_start)
            return new_job.json()

        # client renders
        if client not in RenderQueue.render_clients:
            err_msg = f"Unknown render client: '{client}'"
            logger.error(err_msg)
            remove_job_dir()
            return err_msg, 400

        # see if host is available
        if not RenderQueue.is_client_available(client):
            err_msg = f"Render client '{client}' is unreachable"
            logger.error(err_msg)
            remove_job_dir()
            return err_msg, 503

        # call uploader on remote client
        try:
            logger.info(f"Uploading file {input_path} to client {client}")
            # Forward the parameters parsed from the 'json' form field; this
            # request is multipart, so it has no JSON body to read.
            response = post_job_to_server(input_path, request_params, client)
        except requests.ConnectionError:
            err_msg = f"Error submitting job to client: {client}"
            logger.error(err_msg)
            remove_job_dir()
            return err_msg, 500

        if not response.ok:
            remove_job_dir()
            return 'Job rejected by client', 403

        logger.info("Job submitted successfully!")
        payload = response.json()
        return payload if payload else "Job ok"

    except Exception as e:
        logger.exception(f"Unknown error adding job: {e}")
        remove_job_dir()
        return 'unknown error', 500


@app.get('/cancel_job')
def cancel_job():
    """Cancel the job given by the ``id`` query argument.

    Requires a non-empty ``confirm`` query argument. Responds with HTTP 400
    when the ID is missing, confirmation is missing, no job matches, or more
    than one job matches.
    """
    job_id = request.args.get('id', None)
    # NOTE(review): any non-empty value (even "false") counts as confirmation.
    confirm = request.args.get('confirm', False)
    if not job_id:
        return 'job id not found', 400
    if not confirm:
        return 'confirmation required', 400

    found = [job for job in RenderQueue.job_queue if job.id == job_id]
    if len(found) > 1:
        return f'multiple jobs found for ID {job_id}', 400
    if not found:
        return 'job not found', 400

    success = RenderQueue.cancel_job(found[0])
    if isinstance(success, bool):
        # A bare bool is not a valid Flask response; report it the same way
        # the register/unregister endpoints do.
        return "Success" if success else "Fail"
    return success


@app.get('/clear_history')
def clear_history():
    """Ask the render queue to clear its history."""
    RenderQueue.clear_history()
    return 'success'


@app.route('/status')
def status():
    """Return the render queue's status report."""
    return RenderQueue.status()


@app.get('/renderer_info')
def renderer_info():
    """Describe every supported renderer: version, input extensions, export formats."""
    renderer_data = {}
    for name in RenderWorkerFactory.supported_renderers():
        renderer_class = RenderWorkerFactory.class_for_name(name)
        renderer_data[name] = {
            'version': renderer_class.version(),
            'supported_extensions': renderer_class.supported_extensions,
            'supported_export_formats': renderer_class.supported_export_formats,
        }
    return renderer_data


@app.route('/')
def default():
    """Liveness endpoint."""
    return "Server running"


@app.route('/upload')
def upload_file_page():
    """Render the HTML upload form with the known clients and renderers."""
    return render_template('upload.html',
                           render_clients=RenderQueue.render_clients,
                           supported_renderers=RenderWorkerFactory.supported_renderers())


def post_job_to_server(input_path, job_json, client, server_port=8080):
    """Upload a job file and its parameters to a render client's ``/add_job``.

    Args:
        input_path: Local path of the file to upload.
        job_json: JSON-serialisable job parameters, sent as the ``json`` part.
        client: Hostname of the target render client.
        server_port: Port the target server listens on.

    Returns:
        The ``requests.Response`` from the target server.
    """
    # Keep the file open only for the duration of the request so the handle
    # is always released, even when the POST raises.
    with open(input_path, 'rb') as input_file:
        job_files = {
            'file': (os.path.basename(input_path), input_file, 'application/octet-stream'),
            'json': (None, json.dumps(job_json), 'application/json'),
        }
        return requests.post(f'http://{client}:{server_port}/add_job', files=job_files)


def start_server(background_thread=False):
    """Load ``config.yaml``, start the queue evaluation loop and run Flask.

    Args:
        background_thread: When true, Flask runs in a separate thread with
            debug disabled. That thread is joined immediately, so this call
            still blocks until the server stops.
    """
    def eval_loop(delay_sec=1):
        """Re-evaluate the render queue forever, sleeping between passes."""
        while True:
            RenderQueue.evaluate_queue()
            time.sleep(delay_sec)

    with open('config.yaml') as f:
        # NOTE(review): yaml.safe_load is the safer choice unless the config
        # relies on FullLoader-only tags - confirm before switching.
        config = yaml.load(f, Loader=yaml.FullLoader)

    logging.basicConfig(format='%(asctime)s: %(levelname)s: %(module)s: %(message)s',
                        datefmt='%d-%b-%y %H:%M:%S',
                        level=config.get('server_log_level', 'INFO').upper())

    app.config['UPLOAD_FOLDER'] = config['upload_folder']
    app.config['MAX_CONTENT_PATH'] = config['max_content_path']

    # Get hostname and render clients
    RenderQueue.host_name = socket.gethostname()
    app.config['HOSTNAME'] = RenderQueue.host_name
    if not RenderQueue.render_clients:
        RenderQueue.render_clients = [RenderQueue.host_name]

    # disable most Flask logging
    flask_log = logging.getLogger('werkzeug')
    flask_log.setLevel(config.get('flask_log_level', 'ERROR').upper())

    # Load the queue's saved state, then evaluate it periodically in a daemon
    # thread so the loop never keeps the process alive on its own.
    RenderQueue.load_state()
    thread = threading.Thread(target=eval_loop,
                              kwargs={'delay_sec': config.get('queue_eval_seconds', 1)},
                              daemon=True)
    thread.start()

    if background_thread:
        server_thread = threading.Thread(
            target=lambda: app.run(host='0.0.0.0', port=RenderQueue.port, debug=False, use_reloader=False))
        server_thread.start()
        server_thread.join()
    else:
        app.run(host='0.0.0.0', port=RenderQueue.port,
                debug=config.get('flask_debug_enable', False), use_reloader=False)


if __name__ == '__main__':
    start_server()