#!/usr/bin/env python
"""Flask render server: keeps a queue of render jobs and exposes it over HTTP."""
import json
import logging
import numbers
import os
import platform
import socket
import threading
import time
import uuid
from collections import Counter
from datetime import datetime

import psutil
import requests
import yaml
from flask import Flask, jsonify, render_template, request

from utilities.render_worker import RenderWorkerFactory, RenderStatus, string_to_status

data = 'foo'
app = Flask(__name__)
logger = logging.getLogger()
local_hostname = socket.gethostname()

JSON_FILE = 'job_history.json'
# todo: move history to sqlite db


class RenderJob:
    """A render worker plus the queue metadata (priority, owner, schedule) around it."""

    def __init__(self, render, priority=2, owner=None, client=None, notify=None, custom_id=None):
        """Wrap ``render`` (a worker exposing ``renderer``, ``input`` and ``status``) as a job."""
        self.id = custom_id or self.generate_id()
        self.owner = owner
        self.render = render
        self.priority = priority
        self.client = client
        self.notify = notify
        self.date_created = datetime.now()
        self.scheduled_start = None
        self.renderer = render.renderer
        self.name = os.path.basename(render.input) + '_' + self.date_created.isoformat()
        self.archived = False

    def render_status(self):
        """Returns status of render job"""
        try:
            # A job with a start time that has not begun yet is reported as SCHEDULED.
            if self.scheduled_start and self.render.status == RenderStatus.NOT_STARTED:
                return RenderStatus.SCHEDULED
            return self.render.status
        except Exception as e:
            logger.warning("render_status error: {}".format(e))
            return RenderStatus.ERROR

    def json(self):
        """Converts RenderJob into JSON format

        Returns the JSON string, or an empty string if serialization fails
        (the error is logged).
        """

        def date_serializer(o):
            # Non-datetime objects fall through and are serialized as null.
            if isinstance(o, datetime):
                return o.isoformat()
            return None

        json_string = ''
        try:
            d = self.__dict__.copy()
            d['status'] = self.render_status().value
            d['render'] = self.render.__dict__.copy()
            for key in ['thread', 'process']:  # remove unwanted keys from JSON
                d['render'].pop(key, None)
            d['render']['status'] = d['status']

            # Jobs from the current session compute percent complete via a method;
            # jobs restored from history carry a saved number under the same name,
            # so check which of the two we have before using it.
            percent_complete = self.render.percent_complete
            if not isinstance(percent_complete, numbers.Number):
                percent_complete = percent_complete()
            d['render']['percent_complete'] = percent_complete

            json_string = json.dumps(d, default=date_serializer)
        except Exception as e:
            logger.error("Error converting to JSON: {}".format(e))
        return json_string

    @classmethod
    def generate_id(cls):
        """Return the first group of a random UUID4 as a short job id."""
        return str(uuid.uuid4()).split('-')[0]


class RenderServer:
    """Class-level registry of render jobs; all state lives on the class itself."""

    render_queue = []
    render_clients = []
    maximum_renderer_instances = {'Blender': 2, 'After Effects': 1, 'ffmpeg': 4}
    host_name = socket.gethostname()
    port = 8080
    last_saved_counts = {}

    def __init__(self):
        pass

    @classmethod
    def add_to_render_queue(cls, render_job, force_start=False):
        """Queue a local job; start it immediately if ``force_start``, else re-evaluate the queue."""
        if not render_job.client:
            logger.debug('Adding priority {} job to render queue: {}'.format(render_job.priority, render_job.render))
            cls.render_queue.append(render_job)
            if force_start:
                cls.start_job(render_job)
            else:
                cls.evaluate_queue()
        else:
            # todo: implement client rendering
            logger.warning('remote client rendering not implemented yet')

    @classmethod
    def running_jobs(cls):
        """Return all jobs whose status is RUNNING."""
        return cls.jobs_with_status(RenderStatus.RUNNING)

    @classmethod
    def pending_jobs(cls):
        """Return NOT_STARTED jobs followed by SCHEDULED jobs."""
        pending_jobs = cls.jobs_with_status(RenderStatus.NOT_STARTED)
        pending_jobs.extend(cls.jobs_with_status(RenderStatus.SCHEDULED))
        return pending_jobs

    @classmethod
    def jobs_with_status(cls, status, priority_sorted=False, include_archived=True):
        """Return queued jobs with ``status``, optionally dropping archived ones and sorting by priority."""
        found_jobs = [x for x in cls.render_queue if x.render_status() == status]
        if not include_archived:
            found_jobs = [x for x in found_jobs if not x.archived]
        if priority_sorted:
            found_jobs = sorted(found_jobs, key=lambda a: a.priority, reverse=False)
        return found_jobs

    @classmethod
    def clear_history(cls):
        """Mark every cancelled, completed or errored job as archived."""
        finished = (RenderStatus.CANCELLED, RenderStatus.COMPLETED, RenderStatus.ERROR)
        for job in cls.render_queue:
            if job.render_status() in finished:
                job.archived = True

    @classmethod
    def load_history(cls, json_path=None):
        """Rebuild the render queue from the JSON history file, if it exists."""
        input_path = json_path or JSON_FILE
        if os.path.exists(input_path):
            # Context manager so the handle is released even if parsing fails.
            with open(input_path) as f:
                job_list = json.load(f)

            for job in job_list:
                render_info = job['render']

                # Identify renderer type and recreate Renderer object
                job_render_object = RenderWorkerFactory.create_worker(job['renderer'],
                                                                      input_path=render_info['input'],
                                                                      output_path=render_info['output'])

                # Load Renderer values
                for key, val in render_info.items():
                    if val and key in ['start_time', 'end_time']:  # convert date strings back into date objects
                        job_render_object.__dict__[key] = datetime.fromisoformat(val)
                    else:
                        job_render_object.__dict__[key] = val

                job_render_object.status = RenderStatus[job['status'].upper()]
                job.pop('render', None)

                # Create RenderJob with re-created Renderer object
                new_job = RenderJob(job_render_object, priority=job['priority'], client=job['client'])
                for key, val in job.items():
                    if key == 'status':
                        # Status is derived from the worker, never stored on the job.
                        continue
                    if val and key in ['date_created', 'scheduled_start']:
                        # Both are compared against datetimes later, so they
                        # must not stay as ISO strings.
                        new_job.__dict__[key] = datetime.fromisoformat(val)
                    else:
                        new_job.__dict__[key] = val

                # Handle older loaded jobs that were cancelled before closing
                if new_job.render_status() == RenderStatus.RUNNING:
                    new_job.render.status = RenderStatus.CANCELLED

                # finally add back to render queue
                cls.render_queue.append(new_job)

        # NOTE(review): whether this assignment sat inside or outside the `if`
        # is ambiguous in the collapsed source - confirm.
        cls.last_saved_counts = cls.job_counts()

    @classmethod
    def save_history(cls, json_path=None):
        """Save job history to JSON file"""
        try:
            logger.debug("Saving Render History")
            new_list = [json.loads(job.json()) for job in cls.render_queue]
            output_path = json_path or JSON_FILE
            with open(output_path, 'w') as f:
                json.dump(new_list, f, indent=4)
            cls.last_saved_counts = cls.job_counts()
        except Exception as e:
            logger.error("Error saving jobs JSON: {}".format(e))

    @classmethod
    def evaluate_queue(cls):
        """Start eligible pending and due scheduled jobs, then save history if the counts changed."""
        not_started = cls.jobs_with_status(RenderStatus.NOT_STARTED, priority_sorted=True)
        for job in not_started:
            renderer = job.render.renderer
            # Recount on every iteration: a job started earlier in this pass
            # must count against the per-renderer limit.
            instances = cls.renderer_instances()
            higher_priority_jobs = [x for x in cls.running_jobs() if x.priority < job.priority]
            max_renderers = renderer in instances and \
                instances[renderer] >= cls.maximum_renderer_instances.get(renderer, 1)
            if not max_renderers and not higher_priority_jobs:
                cls.start_job(job)

        scheduled = cls.jobs_with_status(RenderStatus.SCHEDULED, priority_sorted=True)
        for job in scheduled:
            if job.scheduled_start <= datetime.now():
                cls.start_job(job)

        if cls.last_saved_counts != cls.job_counts():
            cls.save_history()

    @classmethod
    def start_job(cls, job):
        """Log and start the job's render worker."""
        logger.info('Starting {}render: {} - Priority {}'.format('scheduled ' if job.scheduled_start else '',
                                                                 job.name, job.priority))
        job.render.start()

    @classmethod
    def cancel_job(cls, job):
        """Stop a NOT_STARTED, RUNNING or ERROR job and mark it cancelled; return whether it was cancelled."""
        logger.info('Cancelling job ID: {}'.format(job.id))
        if job.render_status() in [RenderStatus.NOT_STARTED, RenderStatus.RUNNING, RenderStatus.ERROR]:
            job.render.stop()
            job.render.status = RenderStatus.CANCELLED
            return True
        return False

    @classmethod
    def renderer_instances(cls):
        """Return a Counter of renderer name -> number of running jobs."""
        return Counter(x.render.renderer for x in cls.running_jobs())

    @classmethod
    def job_counts(cls):
        """Return a dict of status value -> number of queued jobs with that status."""
        return {job_status.value: len(cls.jobs_with_status(job_status)) for job_status in RenderStatus}

    @classmethod
    def status(cls):
        """Return a snapshot of host resource usage and job counts."""
        memory = psutil.virtual_memory()  # sample once so the three figures agree
        stats = {"timestamp": datetime.now().isoformat(),
                 "platform": platform.platform(),
                 "cpu_percent": psutil.cpu_percent(percpu=False),
                 "cpu_count": psutil.cpu_count(),
                 "memory_total": memory.total,
                 "memory_available": memory.available,
                 "memory_percent": memory.percent,
                 "job_counts": cls.job_counts(),
                 "host_name": cls.host_name
                 }
        return stats

    @classmethod
    def all_jobs(cls):
        """Return every job that has not been archived."""
        return [x for x in cls.render_queue if not x.archived]

    @classmethod
    def register_client(cls, hostname):
        # todo: register this
        response = requests.get("https://{}/_register_".format(hostname))
        print(response.status_code)

    @classmethod
    def unregister_client(cls):
        pass


@app.get('/jobs')
def jobs_json():
    """Return all non-archived jobs as JSON."""
    return jsonify([json.loads(x.json()) for x in RenderServer.render_queue if not x.archived])


# The URL variable is required: the view takes `status_val` as an argument.
@app.get('/jobs/<status_val>')
def filtered_jobs_json(status_val):
    """Return the jobs whose status matches ``status_val`` as JSON."""
    state = string_to_status(status_val)
    jobs = [json.loads(x.json()) for x in RenderServer.jobs_with_status(state)]
    return jsonify(jobs)


@app.get('/full_status')
def full_status():
    """Return status and jobs for every known server (currently only this host)."""
    full_results = {'timestamp': datetime.now().isoformat(), 'servers': {}}

    # todo: iterate through all servers
    server_list = [socket.gethostname()]

    try:
        for server_hostname in server_list:
            if server_hostname == local_hostname:
                server_status = RenderServer.status()
                server_jobs = [json.loads(x.json()) for x in RenderServer.render_queue if not x.archived]
            else:
                server_status = requests.get(f'http://{server_hostname}:8080/status', timeout=1).json()
                server_jobs = requests.get(f'http://{server_hostname}:8080/jobs', timeout=1).json()
            server_data = {'status': server_status, 'jobs': server_jobs}
            full_results['servers'][server_hostname] = server_data
    except Exception as e:
        logger.error(f"Exception fetching full status: {e}")
    return full_results


@app.post('/add_job')
def add_job():
    """Create new job and add to server render queue"""
    renderer = request.json["renderer"]
    input_path = request.json["input"]
    output_path = request.json["output"]
    priority = request.json.get('priority', 2)
    args = request.json.get('args', None)
    force_start = request.json.get('force_start', False)
    return add_job_handler(renderer, input_path, output_path, args=args, priority=priority, force_start=force_start)


def add_job_handler(renderer, input_path, output_path, args=None, priority=2, force_start=False):
    """Validate the input path, build the worker and queue the job.

    Returns the new job's JSON string, or an ``(error dict, 400)`` pair when
    the input file is missing or the worker factory raises ValueError.
    """
    if not os.path.exists(input_path):
        err_msg = f"Cannot add job. Cannot find input file: {input_path}"
        logger.error(err_msg)
        return {"error": err_msg}, 400

    try:
        render_job = RenderWorkerFactory.create_worker(renderer, input_path, output_path, args)
    except ValueError as e:
        logger.exception(e)
        return {'error': str(e)}, 400

    new_job = RenderJob(render_job, priority=priority)
    RenderServer.add_to_render_queue(new_job, force_start=force_start)
    return new_job.json()


@app.get('/cancel_job')
def cancel_job():
    """Cancel the job named by the ``id`` query argument; ``confirm`` must also be supplied."""
    job_id = request.args.get('id', None)
    confirm = request.args.get('confirm', False)
    if not job_id:
        return jsonify({'error': 'job id not found'})
    elif not confirm:
        return jsonify({'error': 'confirmation required'})
    else:
        found = [x for x in RenderServer.render_queue if x.id == job_id]
        if len(found) > 1:
            return jsonify({'error': 'multiple jobs found for ID {}'.format(job_id)})
        elif found:
            success = RenderServer.cancel_job(found[0])
            return jsonify({'result': success})
        return jsonify({'error': 'job not found'})


@app.get('/clear_history')
def clear_history():
    """Archive all finished jobs."""
    RenderServer.clear_history()
    return jsonify({'result': True})


@app.route('/status')
def status():
    """Return this server's status snapshot as JSON."""
    return jsonify(RenderServer.status())


@app.route('/')
def default():
    return "Server running"


@app.route('/upload')
def upload_file_page():
    """Render the upload form template."""
    return render_template('upload.html')


@app.route('/uploader', methods=['GET', 'POST'])
def upload_file():
    # NOTE(review): only the opening of this handler survives in the source.
    # Everything after the filename check (presumably storing the upload and
    # queueing a job, plus the original `except` clause and the GET branch) is
    # missing and must be restored. The `except` clause and the 501 responses
    # below are placeholders, not the original behavior.
    if request.method == 'POST':
        try:
            uploaded_file = request.files['file']
            if not uploaded_file.filename:
                return {'error': 'no file uploaded'}
        except Exception as e:
            logger.error(f"Error handling upload: {e}")
            return {'error': str(e)}, 400
    return {'error': 'upload handling not implemented'}, 501


def eval_loop(delay_sec=1):
    # NOTE(review): the original definition of eval_loop is missing from the
    # source; only its use as a daemon-thread target in start_server is
    # visible. This minimal polling loop is a reconstruction - confirm the
    # interval and any extra work the original did.
    while True:
        RenderServer.evaluate_queue()
        time.sleep(delay_sec)


def start_server(background_thread=False):
    """Load config and history, then start the queue-evaluation and Flask threads.

    NOTE(review): the `def` line was missing from the source; the signature is
    reconstructed from the `start_server()` call below and the
    `background_thread` reference in the body - confirm.

    When ``background_thread`` is false the call blocks until the Flask
    thread exits.
    """
    # NOTE(review): FullLoader can construct some Python objects; prefer
    # yaml.safe_load if config.yaml only holds plain data.
    with open('config.yaml') as f:
        config = yaml.load(f, Loader=yaml.FullLoader)  # noqa: F841 - not used by the visible code

    # disable most Flask logging
    flask_log = logging.getLogger('werkzeug')
    flask_log.setLevel(logging.ERROR)

    logging.basicConfig(format='%(asctime)s - %(message)s', datefmt='%d-%b-%y %H:%M:%S', level=logging.INFO)

    # Setup the RenderServer object
    RenderServer.load_history()
    RenderServer.evaluate_queue()

    thread = threading.Thread(target=eval_loop, daemon=True)
    thread.start()

    server_thread = threading.Thread(target=lambda: app.run(host=RenderServer.host_name, port=RenderServer.port,
                                                            debug=False, use_reloader=False))
    server_thread.start()

    if not background_thread:
        server_thread.join()


if __name__ == '__main__':
    start_server()