#!/usr/bin/env python3 import json import logging import os import socket import threading import time from datetime import datetime import requests import yaml from flask import Flask, jsonify, request, render_template from werkzeug.utils import secure_filename from lib.render_job import RenderJob from lib.render_manager import RenderManager from utilities.render_worker import RenderWorkerFactory, string_to_status logger = logging.getLogger() app = Flask(__name__) @app.get('/jobs') def jobs_json(): return [json.loads(x.json()) for x in RenderManager.render_queue if not x.archived] @app.get('/jobs/') def filtered_jobs_json(status_val): state = string_to_status(status_val) jobs = [json.loads(x.json()) for x in RenderManager.jobs_with_status(state)] if jobs: return jobs else: return {'error', f'Cannot find jobs with status {status_val}'}, 400 @app.get('/job_status/') def get_job_status(job_id): found_job = RenderManager.job_with_id(job_id) if found_job: logger.info("Founbd jobs") return found_job.json() else: return {'error': f'Cannot find job with ID {job_id}'}, 400 @app.post('/register_client') def register_client(): client_hostname = request.values['hostname'] x = RenderManager.register_client(client_hostname) return "Success" if x else "Fail" @app.post('/unregister_client') def unregister_client(): client_hostname = request.values['hostname'] x = RenderManager.unregister_client(client_hostname) return "Success" if x else "Fail" @app.get('/full_status') def full_status(): full_results = {'timestamp': datetime.now().isoformat(), 'servers': {}} try: for client_hostname in RenderManager.render_clients: is_online = False if client_hostname == RenderManager.host_name: snapshot_results = snapshot() is_online = True else: snapshot_results = {} try: snapshot_request = requests.get(f'http://{client_hostname}:8080/snapshot', timeout=1) snapshot_results = snapshot_request.json() is_online = snapshot_request.ok except requests.ConnectionError as e: pass server_data = {'status': snapshot_results.get('status', {}), 'jobs': snapshot_results.get('jobs', {}), 'is_online': is_online} full_results['servers'][client_hostname] = server_data except Exception as e: logger.error(f"Exception fetching full status: {e}") return full_results @app.get('/snapshot') def snapshot(): server_status = RenderManager.status() server_jobs = [json.loads(x.json()) for x in RenderManager.render_queue if not x.archived] server_data = {'status': server_status, 'jobs': server_jobs, 'timestamp': datetime.now().isoformat()} return server_data @app.post('/add_job') def add_job(): try: """Create new job and add to server render queue""" if request.is_json: request_params = request.get_json() else: # handle request values from HTML pages - may deprecate if HTML is not used or can pass JSON from HTML request_params = request.values renderer = request_params.get("renderer", None) input_path = request_params.get("input", None) output_path = request_params.get("output", "test-output.mp4") priority = int(request_params.get('priority', 2)) args = request_params.get('args', {}) client = request_params.get('client', RenderManager.host_name) force_start = request_params.get('force_start', False) uploaded_file = request.files.get('file', None) html_origin = request_params.get('origin', None) == 'html' custom_id = None if None in [renderer, input_path or (uploaded_file and uploaded_file.filename), output_path]: err_msg = 'Cannot add job: Missing required parameters' logger.error(err_msg) return {'error': err_msg} # cleanup args from html form and convert them into an args dict for key, val in request_params.items(): if key.startswith(renderer): cleaned_key = key.split('+')[-1] args[cleaned_key] = val # handle uploaded files if uploaded_file and uploaded_file.filename: logger.info(f"Receiving uploaded file {uploaded_file.filename}") new_id = RenderJob.generate_id() job_dir = os.path.join(app.config['UPLOAD_FOLDER'], new_id + "-" + uploaded_file.filename) if not os.path.exists(job_dir): os.makedirs(job_dir) local_path = os.path.join(job_dir, secure_filename(uploaded_file.filename)) uploaded_file.save(local_path) input_path = local_path # todo: finish output_path - currently placeholder data output_path = os.path.join(job_dir, uploaded_file.filename + "-output.mp4") # local renders if client == RenderManager.host_name: logger.info(f"Creating job locally - {input_path}") try: render_job = RenderWorkerFactory.create_worker(renderer, input_path, output_path, args) except ValueError as e: logger.exception(e) return {'error': str(e)}, 400 new_job = RenderJob(render_job, priority=priority, custom_id=custom_id) RenderManager.add_to_render_queue(new_job, force_start=force_start) return new_job.json() # client renders elif client in RenderManager.render_clients: # see if host is available if RenderManager.is_client_available(client): if args and renderer == 'blender' and args.get('pack_files', False): from utilities.blender_worker import pack_blender_files new_path = pack_blender_files(path=input_path) if new_path: logger.info(f'Packed Blender file successfully: {new_path}') input_path = new_path else: err_msg = f'Failed to pack Blender file: {input_path}' logger.error(err_msg) return {'error': err_msg}, 400 # call uploader on remote client try: job_files = {'file': open(input_path, 'rb')} job_data = request.json job_data['input'] = input_path logger.info(f"Uploading file {input_path} to client {client}") response = requests.post(f"http://{client}:8080/add_job", files=job_files, data=job_data) if response.ok: logger.info("Job submitted successfully!") return response.json() if response.json() else "Job ok" else: return {'error', 'Job rejected by client'}, 400 except requests.ConnectionError as e: err_msg = f"Error submitting job to client: {client}" logger.error(err_msg) return {'error', err_msg}, 400 else: # client is not available err_msg = f"Render client '{client}' is unreachable" logger.error(err_msg) return {'error', err_msg}, 400 else: err_msg = f"Unknown render client: '{client}'" logger.error(err_msg) return {'error', err_msg}, 400 except Exception as e: logger.exception(f"Unknown error adding job: {e}") return {'error', 'cannot add job'} @app.get('/cancel_job') def cancel_job(): job_id = request.args.get('id', None) confirm = request.args.get('confirm', False) if not job_id: return {'error': 'job id not found'}, 400 elif not confirm: return {'error': 'confirmation required'}, 400 else: found = [x for x in RenderManager.render_queue if x.id == job_id] if len(found) > 1: # logger.error('Multiple jobs found for ID {}'.format(job_id)) return jsonify({'error': 'multiple jobs found for ID {}'.format(job_id)}) elif found: success = RenderManager.cancel_job(found[0]) return jsonify({'result': success}) return {'error': 'job not found'}, 400 @app.get('/clear_history') def clear_history(): RenderManager.clear_history() return {'result': True} @app.route('/status') def status(): return RenderManager.status() @app.route('/') def default(): return "Server running" @app.route('/upload') def upload_file_page(): return render_template('upload.html', render_clients=RenderManager.render_clients, supported_renderers=RenderWorkerFactory.supported_renderers()) def start_server(background_thread=False): def eval_loop(delay_sec=1): while True: RenderManager.evaluate_queue() time.sleep(delay_sec) with open('config.yaml') as f: config = yaml.load(f, Loader=yaml.FullLoader) logging.basicConfig(format='%(asctime)s: %(levelname)s: %(module)s: %(message)s', datefmt='%d-%b-%y %H:%M:%S', level=config.get('server_log_level', 'INFO').upper()) app.config['UPLOAD_FOLDER'] = config['upload_folder'] app.config['MAX_CONTENT_PATH'] = config['max_content_path'] # app.config['RESULT_STATIC_PATH'] = 'static' # Get hostname and render clients RenderManager.host_name = socket.gethostname() app.config['HOSTNAME'] = RenderManager.host_name if not RenderManager.render_clients: RenderManager.render_clients = [RenderManager.host_name] # disable most Flask logging flask_log = logging.getLogger('werkzeug') flask_log.setLevel(config.get('flask_log_level', 'ERROR').upper()) # Setup the RenderManager object RenderManager.load_state() thread = threading.Thread(target=eval_loop, kwargs={'delay_sec': config.get('queue_eval_seconds', 1)}, daemon=True) thread.start() if background_thread: server_thread = threading.Thread( target=lambda: app.run(host='0.0.0.0', port=RenderManager.port, debug=False, use_reloader=False)) server_thread.start() server_thread.join() else: app.run(host='0.0.0.0', port=RenderManager.port, debug=config.get('flask_debug_enable', False), use_reloader=False) if __name__ == '__main__': start_server()