From fd6af10d56b8c324d9e7701c11282c3d67daf401 Mon Sep 17 00:00:00 2001 From: Brett Williams Date: Wed, 12 Oct 2022 13:44:41 -0700 Subject: [PATCH] Refactor and reorganize code. Split server into manager and server files. --- dashboard.py | 10 +- lib/__init__.py | 0 lib/render_job.py | 70 +++++ lib/render_manager.py | 255 ++++++++++++++++++ server.py | 293 +++++++++++++++++++++ templates/upload.html | 37 +-- utilities/__init__.py | 0 zordon_server.py | 596 ------------------------------------------ 8 files changed, 630 insertions(+), 631 deletions(-) create mode 100644 lib/__init__.py create mode 100644 lib/render_job.py create mode 100755 lib/render_manager.py create mode 100755 server.py create mode 100644 utilities/__init__.py delete mode 100755 zordon_server.py diff --git a/dashboard.py b/dashboard.py index e106b63..33354bb 100755 --- a/dashboard.py +++ b/dashboard.py @@ -17,8 +17,8 @@ from rich.table import Table from rich.text import Text from rich.tree import Tree -import zordon_server -from zordon_server import RenderStatus, string_to_status +from utilities.render_worker import RenderStatus, string_to_status +from zordon_server import start_server """ The RenderDashboard is designed to be run on a remote machine or on the local server @@ -235,10 +235,10 @@ if __name__ == '__main__': if not client.connect(): if client.server_ip == local_hostname: - start_server = input("Local server not running. Start server? (y/n) ") - if start_server and start_server[0].lower() == "y": + start_server_input = input("Local server not running. Start server? (y/n) ") + if start_server_input and start_server_input[0].lower() == "y": # Startup the local server - zordon_server.RenderServer.start(background_thread=True) + start_server(background_thread=True) test = client.connect() print(f"connected? {test}") else: diff --git a/lib/__init__.py b/lib/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/lib/render_job.py b/lib/render_job.py new file mode 100644 index 0000000..91474a2 --- /dev/null +++ b/lib/render_job.py @@ -0,0 +1,70 @@ +import json +import logging +import os +import uuid +from datetime import datetime + +from utilities.render_worker import RenderStatus + +logger = logging.getLogger() + + +class RenderJob: + + def __init__(self, render, priority=2, owner=None, client=None, notify=None, custom_id=None): + self.id = custom_id or self.generate_id() + self.owner = owner + self.render = render + self.priority = priority + self.client = client + self.notify = notify + self.date_created = datetime.now() + self.scheduled_start = None + self.renderer = render.renderer + self.name = os.path.basename(render.input) + '_' + self.date_created.isoformat() + self.archived = False + + def render_status(self): + """Returns status of render job""" + try: + if self.scheduled_start and self.render.status == RenderStatus.NOT_STARTED: + return RenderStatus.SCHEDULED + else: + return self.render.status + except Exception as e: + logger.warning("render_status error: {}".format(e)) + return RenderStatus.ERROR + + def json(self): + """Converts RenderJob into JSON format""" + import numbers + + def date_serializer(o): + if isinstance(o, datetime): + return o.isoformat() + + json_string = '' + + try: + d = self.__dict__.copy() + d['status'] = self.render_status().value + d['render'] = self.render.__dict__.copy() + for key in ['thread', 'process']: # remove unwanted keys from JSON + d['render'].pop(key, None) + d['render']['status'] = d['status'] + + # jobs from current_session generate percent completed + # jobs after loading server pull in a saved value. Have to check if callable object or not + + percent_complete = self.render.percent_complete if isinstance(self.render.percent_complete, numbers.Number) \ + else self.render.percent_complete() + d['render']['percent_complete'] = percent_complete + + json_string = json.dumps(d, default=date_serializer) + except Exception as e: + logger.error("Error converting to JSON: {}".format(e)) + return json_string + + @classmethod + def generate_id(cls): + return str(uuid.uuid4()).split('-')[0] \ No newline at end of file diff --git a/lib/render_manager.py b/lib/render_manager.py new file mode 100755 index 0000000..efe7dd7 --- /dev/null +++ b/lib/render_manager.py @@ -0,0 +1,255 @@ +import json +import logging +import os +import platform +from datetime import datetime + +import psutil +import requests + +from lib.render_job import RenderJob +from utilities.render_worker import RenderWorkerFactory, RenderStatus + +logger = logging.getLogger() + +JSON_FILE = 'server_state.json' +#todo: move history to sqlite db + + +class RenderManager: + render_queue = [] + render_clients = [] + maximum_renderer_instances = {'Blender': 2, 'After Effects': 1, 'ffmpeg': 4} + host_name = None + port = 8080 + client_mode = False + server_hostname = None + + last_saved_counts = {} + + def __init__(self): + pass + + @classmethod + def add_to_render_queue(cls, render_job, force_start=False, client=None): + + if not client or render_job.client == cls.host_name: + logger.debug('Adding priority {} job to render queue: {}'.format(render_job.priority, render_job.render)) + render_job.client = cls.host_name + cls.render_queue.append(render_job) + if force_start: + cls.start_job(render_job) + else: + cls.evaluate_queue() + else: + # todo: implement client rendering + logger.warning('remote client rendering not implemented yet') + + @classmethod + def running_jobs(cls): + return cls.jobs_with_status(RenderStatus.RUNNING) + + @classmethod + def pending_jobs(cls): + pending_jobs = cls.jobs_with_status(RenderStatus.NOT_STARTED) + pending_jobs.extend(cls.jobs_with_status(RenderStatus.SCHEDULED)) + return pending_jobs + + @classmethod + def jobs_with_status(cls, status, priority_sorted=False, include_archived=True): + found_jobs = [x for x in cls.render_queue if x.render_status() == status] + if not include_archived: + found_jobs = [x for x in found_jobs if not x.archived] + if priority_sorted: + found_jobs = sorted(found_jobs, key=lambda a: a.priority, reverse=False) + return found_jobs + + @classmethod + def job_with_id(cls, job_id): + found_job = next((x for x in cls.render_queue if x.id == job_id), None) + return found_job + + @classmethod + def clear_history(cls): + to_remove = [x for x in cls.render_queue if x.render_status() in [RenderStatus.CANCELLED, + RenderStatus.COMPLETED, RenderStatus.ERROR]] + for x in to_remove: + x.archived = True + cls.save_state() + + @classmethod + def load_state(cls, json_path=None): + """Load state history from JSON file""" + + input_path = json_path or JSON_FILE + if os.path.exists(input_path): + with open(input_path) as f: + + # load saved data + saved_state = json.load(f) + cls.render_clients = saved_state.get('clients', {}) + + for job in saved_state.get('jobs', []): + + # Identify renderer type and recreate Renderer object + job_render_object = RenderWorkerFactory.create_worker(job['renderer'], input_path=job['render']['input'], output_path=job['render']['output']) + + # Load Renderer values + for key, val in job['render'].items(): + if val and key in ['start_time', 'end_time']: # convert date strings back into date objects + job_render_object.__dict__[key] = datetime.fromisoformat(val) + else: + job_render_object.__dict__[key] = val + + job_render_object.status = RenderStatus[job['status'].upper()] + job.pop('render', None) + + # Create RenderJob with re-created Renderer object + new_job = RenderJob(job_render_object, job['priority'], job['client']) + for key, val in job.items(): + if key in ['date_created']: # convert date strings back to datetime objects + new_job.__dict__[key] = datetime.fromisoformat(val) + else: + new_job.__dict__[key] = val + new_job.__delattr__('status') + + # Handle older loaded jobs that were cancelled before closing + if new_job.render_status() == RenderStatus.RUNNING: + new_job.render.status = RenderStatus.CANCELLED + + # finally add back to render queue + cls.render_queue.append(new_job) + + cls.last_saved_counts = cls.job_counts() + + @classmethod + def save_state(cls, json_path=None): + """Save state history to JSON file""" + try: + logger.debug("Saving Render History") + output = {'timestamp': datetime.now().isoformat(), + 'jobs': [json.loads(j.json()) for j in cls.render_queue], + 'clients': cls.render_clients} + output_path = json_path or JSON_FILE + with open(output_path, 'w') as f: + json.dump(output, f, indent=4) + cls.last_saved_counts = cls.job_counts() + except Exception as e: + logger.error("Error saving state JSON: {}".format(e)) + + @classmethod + def evaluate_queue(cls): + + instances = cls.renderer_instances() + + not_started = cls.jobs_with_status(RenderStatus.NOT_STARTED, priority_sorted=True) + if not_started: + for job in not_started: + renderer = job.render.renderer + higher_priority_jobs = [x for x in cls.running_jobs() if x.priority < job.priority] + max_renderers = renderer in instances.keys() and instances[ + renderer] >= cls.maximum_renderer_instances.get(renderer, 1) + + if not max_renderers and not higher_priority_jobs: + cls.start_job(job) + + scheduled = cls.jobs_with_status(RenderStatus.SCHEDULED, priority_sorted=True) + for job in scheduled: + if job.scheduled_start <= datetime.now(): + cls.start_job(job) + + if cls.last_saved_counts != cls.job_counts(): + cls.save_state() + + @classmethod + def start_job(cls, job): + logger.info('Starting {}render: {} - Priority {}'.format('scheduled ' if job.scheduled_start else '', job.name, + job.priority)) + job.render.start() + + @classmethod + def cancel_job(cls, job): + logger.info('Cancelling job ID: {}'.format(job.id)) + if job.render_status() in [RenderStatus.NOT_STARTED, RenderStatus.RUNNING, RenderStatus.ERROR]: + job.render.stop() + job.render.status = RenderStatus.CANCELLED + return True + return False + + @classmethod + def renderer_instances(cls): + from collections import Counter + all_instances = [x.render.renderer for x in cls.running_jobs()] + return Counter(all_instances) + + @classmethod + def job_counts(cls): + job_counts = {} + for job_status in RenderStatus: + job_counts[job_status.value] = len(cls.jobs_with_status(job_status)) + + return job_counts + + @classmethod + def status(cls): + + stats = {"timestamp": datetime.now().isoformat(), + "platform": platform.platform(), + "cpu_percent": psutil.cpu_percent(percpu=False), + "cpu_percent_per_cpu": psutil.cpu_percent(percpu=True), + "cpu_count": psutil.cpu_count(), + "memory_total": psutil.virtual_memory().total, + "memory_available": psutil.virtual_memory().available, + "memory_percent": psutil.virtual_memory().percent, + "job_counts": cls.job_counts(), + "host_name": cls.host_name + } + + return stats + + @classmethod + def all_jobs(cls): + all_jobs = [x for x in cls.render_queue if not x.archived] + return all_jobs + + @classmethod + def register_client(cls, hostname): + + #todo: check to make sure not adding ourselves + + success = False + + if hostname in cls.render_clients: + logger.warning(f"Client '{hostname}' already registered") + return success + + try: + response = requests.get(f"http://{hostname}:8080/status", timeout=3) + if response.ok: + cls.render_clients.append(hostname) + logger.info(f"Client '{hostname}' successfully registered") + success = True + cls.save_state() + except requests.ConnectionError as e: + logger.error(f"Cannot connect to client at hostname: {hostname}") + return success + + @classmethod + def unregister_client(cls, hostname): + success = False + if hostname in cls.render_clients and hostname != cls.host_name: + cls.render_clients.remove(hostname) + logger.info(f"Client '{hostname}' successfully unregistered") + success = True + return success + + @staticmethod + def is_client_available(client_hostname, timeout=3): + try: + response = requests.get(f"http://{client_hostname}:8080/status", timeout=timeout) + if response.ok: + return True + except requests.ConnectionError as e: + pass + return False + diff --git a/server.py b/server.py new file mode 100755 index 0000000..89994c6 --- /dev/null +++ b/server.py @@ -0,0 +1,293 @@ +#!/usr/bin/env python3 + +import json +import logging +import os +import socket +import threading +import time +from datetime import datetime + +import requests +import yaml +from flask import Flask, jsonify, request, render_template +from werkzeug.utils import secure_filename + +from lib.render_job import RenderJob +from lib.render_manager import RenderManager +from utilities.render_worker import RenderWorkerFactory, string_to_status + +logger = logging.getLogger() +app = Flask(__name__) + + +@app.get('/jobs') +def jobs_json(): + return [json.loads(x.json()) for x in RenderManager.render_queue if not x.archived] + + +@app.get('/jobs/') +def filtered_jobs_json(status_val): + state = string_to_status(status_val) + jobs = [json.loads(x.json()) for x in RenderManager.jobs_with_status(state)] + if jobs: + return jobs + else: + return {'error', f'Cannot find jobs with status {status_val}'}, 400 + + +@app.get('/job_status/') +def get_job_status(job_id): + found_job = RenderManager.job_with_id(job_id) + if found_job: + logger.info("Founbd jobs") + return found_job.json() + else: + return {'error': f'Cannot find job with ID {job_id}'}, 400 + + +@app.post('/register_client') +def register_client(): + client_hostname = request.values['hostname'] + x = RenderManager.register_client(client_hostname) + return "Success" if x else "Fail" + + +@app.post('/unregister_client') +def unregister_client(): + client_hostname = request.values['hostname'] + x = RenderManager.unregister_client(client_hostname) + return "Success" if x else "Fail" + + +@app.get('/full_status') +def full_status(): + full_results = {'timestamp': datetime.now().isoformat(), 'servers': {}} + + try: + for client_hostname in RenderManager.render_clients: + is_online = False + if client_hostname == RenderManager.host_name: + snapshot_results = snapshot() + is_online = True + else: + snapshot_results = {} + try: + snapshot_request = requests.get(f'http://{client_hostname}:8080/snapshot', timeout=1) + snapshot_results = snapshot_request.json() + is_online = snapshot_request.ok + except requests.ConnectionError as e: + pass + server_data = {'status': snapshot_results.get('status', {}), 'jobs': snapshot_results.get('jobs', {}), + 'is_online': is_online} + full_results['servers'][client_hostname] = server_data + except Exception as e: + logger.error(f"Exception fetching full status: {e}") + + return full_results + + +@app.get('/snapshot') +def snapshot(): + server_status = RenderManager.status() + server_jobs = [json.loads(x.json()) for x in RenderManager.render_queue if not x.archived] + server_data = {'status': server_status, 'jobs': server_jobs, 'timestamp': datetime.now().isoformat()} + return server_data + + +@app.post('/add_job') +def add_job(): + """Create new job and add to server render queue""" + renderer = request.json.get("renderer", None) + input_path = request.json.get("input", None) + output_path = request.json.get("output", None) + priority = request.json.get('priority', 2) + args = request.json.get('args', None) + client = request.json.get('client', RenderManager.host_name) + force_start = request.json.get('force_start', False) + + if None in [renderer, input_path, output_path]: + return {'error': 'missing required parameters'}, 400 + + if client == RenderManager.host_name: + logger.info(f"Creating job locally - {input_path}") + # Local Renders + try: + render_job = RenderWorkerFactory.create_worker(renderer, input_path, output_path, args) + except ValueError as e: + logger.exception(e) + return {'error': str(e)}, 400 + + new_job = RenderJob(render_job, priority=priority) + RenderManager.add_to_render_queue(new_job, force_start=force_start) + + return new_job.json() + + elif client in RenderManager.render_clients: + + # see if host is available + if RenderManager.is_client_available(client): + + if args and renderer == 'blender' and args.get('pack_files', False): + from utilities.blender_worker import pack_blender_files + new_path = pack_blender_files(path=input_path) + if new_path: + logger.info(f'Packed Blender file successfully: {new_path}') + input_path = new_path + else: + err_msg = f'Failed to pack Blender file: {input_path}' + logger.error(err_msg) + return {'error': err_msg}, 400 + + # call uploader on remote client + try: + job_files = {'file': open(input_path, 'rb')} + job_data = request.json + job_data['input'] = input_path + logger.info(f"Uploading file {input_path} to client {client}") + response = requests.post(f"http://{client}:8080/uploader", files=job_files, data=job_data) + if response.ok: + logger.info("Job submitted successfully!") + return response.json() if response.json() else "Job ok" + else: + return {'error', 'Job rejected by client'}, 400 + except requests.ConnectionError as e: + err_msg = f"Error submitting job to client: {client}" + logger.error(err_msg) + return {'error', err_msg}, 400 + else: + # client is not available + err_msg = f"Render client '{client}' is unreachable" + logger.error(err_msg) + return {'error', err_msg}, 400 + + else: + err_msg = f"Unknown render client: '{client}'" + logger.error(err_msg) + return {'error', err_msg}, 400 + + +@app.get('/cancel_job') +def cancel_job(): + job_id = request.args.get('id', None) + confirm = request.args.get('confirm', False) + if not job_id: + return {'error': 'job id not found'}, 400 + elif not confirm: + return {'error': 'confirmation required'}, 400 + else: + found = [x for x in RenderManager.render_queue if x.id == job_id] + if len(found) > 1: + # logger.error('Multiple jobs found for ID {}'.format(job_id)) + return jsonify({'error': 'multiple jobs found for ID {}'.format(job_id)}) + elif found: + success = RenderManager.cancel_job(found[0]) + return jsonify({'result': success}) + return {'error': 'job not found'}, 400 + + +@app.get('/clear_history') +def clear_history(): + RenderManager.clear_history() + return {'result': True} + + +@app.route('/status') +def status(): + return RenderManager.status() + + +@app.route('/') +def default(): + return "Server running" + + +@app.route('/upload') +def upload_file_page(): + return render_template('upload.html') + + +@app.route('/uploader', methods=['POST']) +def upload_file(): + if request.method == 'POST': + try: + + uploaded_file = request.files['file'] + if not uploaded_file.filename: + return {'error': 'no file uploaded'} + + # generate directory to use + logger.debug(f"Receiving uploaded file {uploaded_file.filename}") + new_id = RenderJob.generate_id() + job_dir = os.path.join(app.config['UPLOAD_FOLDER'], new_id + "-" + uploaded_file.filename) + if not os.path.exists(job_dir): + os.makedirs(job_dir) + + local_path = os.path.join(job_dir, secure_filename(uploaded_file.filename)) + uploaded_file.save(local_path) + renderer = request.values['renderer'] + + # todo: finish output_path - currently placeholder data + output_path = os.path.join(job_dir, uploaded_file.filename + "-output.mp4") + + try: + render_job = RenderWorkerFactory.create_worker(renderer, local_path, output_path, + args=request.values.get('args', None)) + new_job = RenderJob(render_job, custom_id=new_id) + RenderManager.add_to_render_queue(new_job) + return new_job.json() + except ValueError as e: + logger.exception(e) + return {'error': str(e)}, 400 + + except Exception as e: + logger.exception(e) + + return {'error': 'unknown error'} + + +def start_server(background_thread=False): + + def eval_loop(delay_sec=1): + while True: + RenderManager.evaluate_queue() + time.sleep(delay_sec) + + logging.basicConfig(format='%(asctime)s: %(levelname)s: %(module)s: %(message)s', datefmt='%d-%b-%y %H:%M:%S', + level=logging.INFO) + + with open('config.yaml') as f: + config = yaml.load(f, Loader=yaml.FullLoader) + + app.config['UPLOAD_FOLDER'] = config['upload_folder'] + app.config['MAX_CONTENT_PATH'] = config['max_content_path'] + # app.config['RESULT_STATIC_PATH'] = 'static' + + # Get hostname and render clients + RenderManager.host_name = socket.gethostname() + app.config['HOSTNAME'] = RenderManager.host_name + if not RenderManager.render_clients: + RenderManager.render_clients = [RenderManager.host_name] + + # disable most Flask logging + flask_log = logging.getLogger('werkzeug') + flask_log.setLevel(config.get('flask_log_level', 'ERROR').upper()) + + # Setup the RenderManager object + RenderManager.load_state() + + thread = threading.Thread(target=eval_loop, kwargs={'delay_sec': config.get('queue_eval_seconds', 1)}, daemon=True) + thread.start() + + if background_thread: + server_thread = threading.Thread( + target=lambda: app.run(host='0.0.0.0', port=RenderManager.port, debug=False, use_reloader=False)) + server_thread.start() + server_thread.join() + else: + app.run(host='0.0.0.0', port=RenderManager.port, debug=config.get('flask_debug_enable', False), + use_reloader=False) + + +if __name__ == '__main__': + start_server() \ No newline at end of file diff --git a/templates/upload.html b/templates/upload.html index 52f2b57..edea94a 100644 --- a/templates/upload.html +++ b/templates/upload.html @@ -1,32 +1,9 @@ - - -

Upload a file

- -
-
-
-
-
- - -
-
-
- Blender Engine: - - - - -
-
- - -
- + +
+ + +
+ \ No newline at end of file diff --git a/utilities/__init__.py b/utilities/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/zordon_server.py b/zordon_server.py deleted file mode 100755 index 24351bb..0000000 --- a/zordon_server.py +++ /dev/null @@ -1,596 +0,0 @@ -#!/usr/bin/env python -import argparse -import json -import logging -import os -import platform -import socket -import threading -import time -import uuid -from datetime import datetime - -import psutil -import requests -import yaml -from flask import Flask, jsonify, request, render_template -from werkzeug.utils import secure_filename - -from utilities.render_worker import RenderWorkerFactory, RenderStatus, string_to_status - -app = Flask(__name__) - -logger = logging.getLogger() -local_hostname = socket.gethostname() - -JSON_FILE = 'server_state.json' -#todo: move history to sqlite db - - -class RenderJob: - - def __init__(self, render, priority=2, owner=None, client=None, notify=None, custom_id=None): - self.id = custom_id or self.generate_id() - self.owner = owner - self.render = render - self.priority = priority - self.client = client - self.notify = notify - self.date_created = datetime.now() - self.scheduled_start = None - self.renderer = render.renderer - self.name = os.path.basename(render.input) + '_' + self.date_created.isoformat() - self.archived = False - - def render_status(self): - """Returns status of render job""" - try: - if self.scheduled_start and self.render.status == RenderStatus.NOT_STARTED: - return RenderStatus.SCHEDULED - else: - return self.render.status - except Exception as e: - logger.warning("render_status error: {}".format(e)) - return RenderStatus.ERROR - - def json(self): - """Converts RenderJob into JSON format""" - import numbers - - def date_serializer(o): - if isinstance(o, datetime): - return o.isoformat() - - json_string = '' - - try: - d = self.__dict__.copy() - d['status'] = self.render_status().value - d['render'] = self.render.__dict__.copy() - for key in ['thread', 'process']: # remove unwanted keys from JSON - d['render'].pop(key, None) - d['render']['status'] = d['status'] - - # jobs from current_session generate percent completed - # jobs after loading server pull in a saved value. Have to check if callable object or not - - percent_complete = self.render.percent_complete if isinstance(self.render.percent_complete, numbers.Number) \ - else self.render.percent_complete() - d['render']['percent_complete'] = percent_complete - - json_string = json.dumps(d, default=date_serializer) - except Exception as e: - logger.error("Error converting to JSON: {}".format(e)) - return json_string - - @classmethod - def generate_id(cls): - return str(uuid.uuid4()).split('-')[0] - - -class RenderServer: - render_queue = [] - render_clients = [] - maximum_renderer_instances = {'Blender': 2, 'After Effects': 1, 'ffmpeg': 4} - host_name = None - port = 8080 - client_mode = False - server_hostname = None - - last_saved_counts = {} - - def __init__(self): - pass - - @classmethod - def add_to_render_queue(cls, render_job, force_start=False, client=None): - - if not client or render_job.client == cls.host_name: - logger.debug('Adding priority {} job to render queue: {}'.format(render_job.priority, render_job.render)) - render_job.client = cls.host_name - cls.render_queue.append(render_job) - if force_start: - cls.start_job(render_job) - else: - cls.evaluate_queue() - else: - # todo: implement client rendering - logger.warning('remote client rendering not implemented yet') - - @classmethod - def running_jobs(cls): - return cls.jobs_with_status(RenderStatus.RUNNING) - - @classmethod - def pending_jobs(cls): - pending_jobs = cls.jobs_with_status(RenderStatus.NOT_STARTED) - pending_jobs.extend(cls.jobs_with_status(RenderStatus.SCHEDULED)) - return pending_jobs - - @classmethod - def jobs_with_status(cls, status, priority_sorted=False, include_archived=True): - found_jobs = [x for x in cls.render_queue if x.render_status() == status] - if not include_archived: - found_jobs = [x for x in found_jobs if not x.archived] - if priority_sorted: - found_jobs = sorted(found_jobs, key=lambda a: a.priority, reverse=False) - return found_jobs - - @classmethod - def job_with_id(cls, job_id): - found_job = next((x for x in cls.render_queue if x.id == job_id), None) - return found_job - - @classmethod - def clear_history(cls): - to_remove = [x for x in cls.render_queue if x.render_status() in [RenderStatus.CANCELLED, - RenderStatus.COMPLETED, RenderStatus.ERROR]] - for x in to_remove: - x.archived = True - cls.save_state() - - @classmethod - def load_state(cls, json_path=None): - """Load state history from JSON file""" - - input_path = json_path or JSON_FILE - if os.path.exists(input_path): - with open(input_path) as f: - - # load saved data - saved_state = json.load(f) - cls.render_clients = saved_state.get('clients', {}) - - for job in saved_state.get('jobs', []): - - # Identify renderer type and recreate Renderer object - job_render_object = RenderWorkerFactory.create_worker(job['renderer'], input_path=job['render']['input'], output_path=job['render']['output']) - - # Load Renderer values - for key, val in job['render'].items(): - if val and key in ['start_time', 'end_time']: # convert date strings back into date objects - job_render_object.__dict__[key] = datetime.fromisoformat(val) - else: - job_render_object.__dict__[key] = val - - job_render_object.status = RenderStatus[job['status'].upper()] - job.pop('render', None) - - # Create RenderJob with re-created Renderer object - new_job = RenderJob(job_render_object, job['priority'], job['client']) - for key, val in job.items(): - if key in ['date_created']: # convert date strings back to datetime objects - new_job.__dict__[key] = datetime.fromisoformat(val) - else: - new_job.__dict__[key] = val - new_job.__delattr__('status') - - # Handle older loaded jobs that were cancelled before closing - if new_job.render_status() == RenderStatus.RUNNING: - new_job.render.status = RenderStatus.CANCELLED - - # finally add back to render queue - cls.render_queue.append(new_job) - - cls.last_saved_counts = cls.job_counts() - - @classmethod - def save_state(cls, json_path=None): - """Save state history to JSON file""" - try: - logger.debug("Saving Render History") - output = {'timestamp': datetime.now().isoformat(), - 'jobs': [json.loads(j.json()) for j in cls.render_queue], - 'clients': cls.render_clients} - output_path = json_path or JSON_FILE - with open(output_path, 'w') as f: - json.dump(output, f, indent=4) - cls.last_saved_counts = cls.job_counts() - except Exception as e: - logger.error("Error saving state JSON: {}".format(e)) - - @classmethod - def evaluate_queue(cls): - - instances = cls.renderer_instances() - - not_started = cls.jobs_with_status(RenderStatus.NOT_STARTED, priority_sorted=True) - if not_started: - for job in not_started: - renderer = job.render.renderer - higher_priority_jobs = [x for x in cls.running_jobs() if x.priority < job.priority] - max_renderers = renderer in instances.keys() and instances[ - renderer] >= cls.maximum_renderer_instances.get(renderer, 1) - - if not max_renderers and not higher_priority_jobs: - cls.start_job(job) - - scheduled = cls.jobs_with_status(RenderStatus.SCHEDULED, priority_sorted=True) - for job in scheduled: - if job.scheduled_start <= datetime.now(): - cls.start_job(job) - - if cls.last_saved_counts != cls.job_counts(): - cls.save_state() - - @classmethod - def start_job(cls, job): - logger.info('Starting {}render: {} - Priority {}'.format('scheduled ' if job.scheduled_start else '', job.name, - job.priority)) - job.render.start() - - @classmethod - def cancel_job(cls, job): - logger.info('Cancelling job ID: {}'.format(job.id)) - if job.render_status() in [RenderStatus.NOT_STARTED, RenderStatus.RUNNING, RenderStatus.ERROR]: - job.render.stop() - job.render.status = RenderStatus.CANCELLED - return True - return False - - @classmethod - def renderer_instances(cls): - from collections import Counter - all_instances = [x.render.renderer for x in cls.running_jobs()] - return Counter(all_instances) - - @classmethod - def job_counts(cls): - job_counts = {} - for job_status in RenderStatus: - job_counts[job_status.value] = len(RenderServer.jobs_with_status(job_status)) - - return job_counts - - @classmethod - def status(cls): - - stats = {"timestamp": datetime.now().isoformat(), - "platform": platform.platform(), - "cpu_percent": psutil.cpu_percent(percpu=False), - "cpu_percent_per_cpu": psutil.cpu_percent(percpu=True), - "cpu_count": psutil.cpu_count(), - "memory_total": psutil.virtual_memory().total, - "memory_available": psutil.virtual_memory().available, - "memory_percent": psutil.virtual_memory().percent, - "job_counts": RenderServer.job_counts(), - "host_name": RenderServer.host_name - } - - return stats - - @classmethod - def all_jobs(cls): - all_jobs = [x for x in RenderServer.render_queue if not x.archived] - return all_jobs - - @classmethod - def register_client(cls, hostname): - - success = False - - if hostname in cls.render_clients: - logger.warning(f"Client '{hostname}' already registered") - return success - - try: - response = requests.get(f"http://{hostname}:8080/status", timeout=3) - if response.ok: - cls.render_clients.append(hostname) - logger.info(f"Client '{hostname}' successfully registered") - success = True - cls.save_state() - except requests.ConnectionError as e: - logger.error(f"Cannot connect to client at hostname: {hostname}") - return success - - @classmethod - def unregister_client(cls, hostname): - success = False - if hostname in cls.render_clients and hostname != cls.host_name: - cls.render_clients.remove(hostname) - logger.info(f"Client '{hostname}' successfully unregistered") - success = True - return success - - @staticmethod - def is_client_available(client_hostname, timeout=3): - try: - response = requests.get(f"http://{client_hostname}:8080/status", timeout=timeout) - if response.ok: - return True - except requests.ConnectionError as e: - pass - return False - - @classmethod - def start(cls, background_thread=False): - - def eval_loop(delay_sec=1): - while True: - cls.evaluate_queue() - time.sleep(delay_sec) - - logging.basicConfig(format='%(asctime)s: %(levelname)s: %(module)s: %(message)s', datefmt='%d-%b-%y %H:%M:%S', - level=logging.INFO) - - with open('config.yaml') as f: - config = yaml.load(f, Loader=yaml.FullLoader) - - app.config['UPLOAD_FOLDER'] = config['upload_folder'] - app.config['MAX_CONTENT_PATH'] = config['max_content_path'] - app.config['RESULT_STATIC_PATH'] = 'static/' - - # Get hostname and render clients - cls.host_name = socket.gethostname() - if not RenderServer.render_clients: - cls.render_clients = [RenderServer.host_name] - - if not cls.client_mode: - logger.info(f"Starting Zordon in Server Mode - {cls.host_name}") - else: - logger.info(f"Starting Zordon in Client Mode - {cls.host_name}") - - # disable most Flask logging - flask_log = logging.getLogger('werkzeug') - flask_log.setLevel(config.get('flask_log_level', 'ERROR').upper()) - - # Setup the RenderServer object - cls.load_state() - - thread = threading.Thread(target=eval_loop, kwargs={'delay_sec': config.get('queue_eval_seconds', 1)}, daemon=True) - thread.start() - - if background_thread: - server_thread = threading.Thread( - target=lambda: app.run(host='0.0.0.0', port=cls.port, debug=False, use_reloader=False)) - server_thread.start() - server_thread.join() - else: - app.run(host='0.0.0.0', port=cls.port, debug=config.get('flask_debug_enable', False), - use_reloader=False) - - -@app.get('/jobs') -def jobs_json(): - return [json.loads(x.json()) for x in RenderServer.render_queue if not x.archived] - - -@app.get('/jobs/') -def filtered_jobs_json(status_val): - state = string_to_status(status_val) - jobs = [json.loads(x.json()) for x in RenderServer.jobs_with_status(state)] - return jobs - - -@app.get('/job_status/') -def get_job_status(job_id): - found_job = RenderServer.job_with_id(job_id) - if found_job: - return found_job.json() - else: - return None - - -@app.post('/register_client') -def register_client(): - client_hostname = request.values['hostname'] - x = RenderServer.register_client(client_hostname) - return "Success" if x else "Fail" - - -@app.post('/unregister_client') -def unregister_client(): - client_hostname = request.values['hostname'] - x = RenderServer.unregister_client(client_hostname) - return "Success" if x else "Fail" - - -@app.get('/full_status') -def full_status(): - full_results = {'timestamp': datetime.now().isoformat(), 'servers': {}} - - try: - for client_hostname in RenderServer.render_clients: - is_online = False - if client_hostname == local_hostname: - snapshot_results = snapshot() - is_online = True - else: - snapshot_results = {} - try: - snapshot_request = requests.get(f'http://{client_hostname}:8080/snapshot', timeout=1) - snapshot_results = snapshot_request.json() - is_online = snapshot_request.ok - except requests.ConnectionError as e: - pass - server_data = {'status': snapshot_results.get('status', {}), 'jobs': snapshot_results.get('jobs', {}), - 'is_online': is_online} - full_results['servers'][client_hostname] = server_data - except Exception as e: - logger.error(f"Exception fetching full status: {e}") - - return full_results - - -@app.get('/snapshot') -def snapshot(): - server_status = RenderServer.status() - server_jobs = [json.loads(x.json()) for x in RenderServer.render_queue if not x.archived] - server_data = {'status': server_status, 'jobs': server_jobs, 'timestamp': datetime.now().isoformat()} - return server_data - - -@app.post('/add_job') -def add_job(): - """Create new job and add to server render queue""" - renderer = request.json.get("renderer", None) - input_path = request.json.get("input", None) - output_path = request.json.get("output", None) - priority = request.json.get('priority', 2) - args = request.json.get('args', None) - client = request.json.get('client', RenderServer.host_name) - force_start = request.json.get('force_start', False) - - if None in [renderer, input_path, output_path]: - return {'error': 'missing required parameters'}, 400 - - if client == RenderServer.host_name: - # Local Renders - try: - render_job = RenderWorkerFactory.create_worker(renderer, input_path, output_path, args) - except ValueError as e: - logger.exception(e) - return {'error': str(e)}, 400 - - new_job = RenderJob(render_job, priority=priority) - RenderServer.add_to_render_queue(new_job, force_start=force_start) - - return new_job.json() - - elif client in RenderServer.render_clients: - - # see if host is available - if RenderServer.is_client_available(client): - - if args and renderer == 'blender' and args.get('pack_files', False): - from utilities.blender_worker import pack_blender_files - new_path = pack_blender_files(path=input_path) - if new_path: - logger.info(f'Packed Blender file successfully: {new_path}') - input_path = new_path - else: - err_msg = f'Failed to pack Blender file: {input_path}' - logger.error(err_msg) - return {'error': err_msg}, 400 - - # call uploader on remote client - try: - job_files = {'file': open(input_path, 'rb')} - job_data = request.json - job_data['input'] = input_path - response = requests.post(f"http://{client}:8080/uploader", files=job_files, data=job_data) - if response.ok: - logger.info("Job submitted successfully!") - return response.json() if response.json() else "Job ok" - else: - return {'error', 'Job rejected by client'}, 400 - except requests.ConnectionError as e: - err_msg = f"Error submitting job to client: {client}" - logger.error(err_msg) - return {'error', err_msg}, 400 - else: - # client is not available - err_msg = f"Render client '{client}' is unreachable" - logger.error(err_msg) - return {'error', err_msg}, 400 - - else: - err_msg = f"Unknown render client: '{client}'" - logger.error(err_msg) - return {'error', err_msg}, 400 - - -@app.get('/cancel_job') -def cancel_job(): - job_id = request.args.get('id', None) - confirm = request.args.get('confirm', False) - if not job_id: - return {'error': 'job id not found'}, 400 - elif not confirm: - return {'error': 'confirmation required'}, 400 - else: - found = [x for x in RenderServer.render_queue if x.id == job_id] - if len(found) > 1: - # logger.error('Multiple jobs found for ID {}'.format(job_id)) - return jsonify({'error': 'multiple jobs found for ID {}'.format(job_id)}) - elif found: - success = RenderServer.cancel_job(found[0]) - return jsonify({'result': success}) - return {'error': 'job not found'}, 400 - - -@app.get('/clear_history') -def clear_history(): - RenderServer.clear_history() - return {'result': True} - - -@app.route('/status') -def status(): - return RenderServer.status() - - -@app.route('/') -def default(): - return "Server running" - - -@app.route('/upload') -def upload_file_page(): - return render_template('upload.html') - - -@app.route('/uploader', methods=['POST']) -def upload_file(): - - if request.method == 'POST': - try: - - uploaded_file = request.files['file'] - if not uploaded_file.filename: - return {'error': 'no file uploaded'} - - # generate directory to use - logger.debug(f"Receiving uploaded file {uploaded_file.filename}") - new_id = RenderJob.generate_id() - job_dir = os.path.join(app.config['UPLOAD_FOLDER'], new_id + "-" + uploaded_file.filename) - if not os.path.exists(job_dir): - os.makedirs(job_dir) - - local_path = os.path.join(job_dir, secure_filename(uploaded_file.filename)) - uploaded_file.save(local_path) - renderer = request.values['renderer'] - - # todo: finish output_path - currently placeholder data - output_path = os.path.join(job_dir, uploaded_file.filename + "-output.mp4") - - try: - render_job = RenderWorkerFactory.create_worker(renderer, local_path, output_path, - args=request.values.get('args', None)) - new_job = RenderJob(render_job, custom_id=new_id) - RenderServer.add_to_render_queue(new_job) - return new_job.json() - except ValueError as e: - logger.exception(e) - return {'error': str(e)}, 400 - - except Exception as e: - logger.exception(e) - - return {'error': 'unknown error'} - - -if __name__ == '__main__': - RenderServer.start() -