import json
import logging
import os
import platform
from collections import Counter
from datetime import datetime

import psutil
import requests

from lib.render_job import RenderJob
from utilities.render_worker import RenderWorkerFactory, RenderStatus

logger = logging.getLogger()

JSON_FILE = 'server_state.json'
# todo: move history to sqlite db


class RenderQueue:
    """Process-wide queue of render jobs.

    All state lives in class attributes and every operation is a
    classmethod/staticmethod, so the class itself acts as the single shared
    queue; instances carry no state of their own.
    """

    # Every known job (pending, running, finished and archived).
    job_queue = []
    # Hostnames of registered remote render clients.
    render_clients = []
    # Maximum number of simultaneously running jobs per renderer name.
    # Renderers missing from this table are limited to one instance.
    maximum_renderer_instances = {'Blender': 2, 'After Effects': 1, 'ffmpeg': 4}
    host_name = None
    port = 8080
    client_mode = False
    server_hostname = None

    # Snapshot of job_counts() taken at the last save/load; used by
    # evaluate_queue() to decide whether the state file needs rewriting.
    last_saved_counts = {}

    def __init__(self):
        pass

    @classmethod
    def add_to_render_queue(cls, render_job, force_start=False, client=None):
        """Queue ``render_job`` for local rendering.

        The job is accepted when no ``client`` is given or when the job is
        already assigned to this host. With ``force_start`` the job is started
        immediately; otherwise the queue is re-evaluated. Jobs aimed at a
        remote client are only logged, since remote rendering is not
        implemented.
        """
        if client and render_job.client != cls.host_name:
            # todo: implement client rendering
            logger.warning('remote client rendering not implemented yet')
            return

        logger.debug('Adding priority {} job to render queue: {}'.format(render_job.priority, render_job.render))
        render_job.client = cls.host_name
        cls.job_queue.append(render_job)
        if force_start:
            cls.start_job(render_job)
        else:
            cls.evaluate_queue()

    @classmethod
    def running_jobs(cls):
        """Return all jobs whose render status is RUNNING."""
        return cls.jobs_with_status(RenderStatus.RUNNING)

    @classmethod
    def pending_jobs(cls):
        """Return NOT_STARTED jobs followed by SCHEDULED jobs."""
        pending_jobs = cls.jobs_with_status(RenderStatus.NOT_STARTED)
        pending_jobs.extend(cls.jobs_with_status(RenderStatus.SCHEDULED))
        return pending_jobs

    @classmethod
    def jobs_with_status(cls, status, priority_sorted=False, include_archived=True):
        """Return the jobs whose ``render_status()`` equals ``status``.

        :param status: the ``RenderStatus`` member to match.
        :param priority_sorted: sort ascending by ``priority`` (the rest of the
            class treats a lower number as the higher priority).
        :param include_archived: when False, drop jobs flagged ``archived``.
        """
        found_jobs = [x for x in cls.job_queue if x.render_status() == status]
        if not include_archived:
            found_jobs = [x for x in found_jobs if not x.archived]
        if priority_sorted:
            found_jobs = sorted(found_jobs, key=lambda a: a.priority, reverse=False)
        return found_jobs

    @classmethod
    def job_with_id(cls, job_id):
        """Return the first job whose ``id`` equals ``job_id``, or None."""
        return next((x for x in cls.job_queue if x.id == job_id), None)

    @classmethod
    def clear_history(cls):
        """Archive every cancelled, completed or errored job and save state."""
        finished_states = [RenderStatus.CANCELLED, RenderStatus.COMPLETED, RenderStatus.ERROR]
        for job in cls.job_queue:
            if job.render_status() in finished_states:
                job.archived = True
        cls.save_state()

    @classmethod
    def load_state(cls, json_path=None):
        """Load state history from JSON file.

        Reads ``json_path`` (default ``JSON_FILE``), restores the registered
        client list and re-creates every saved job, appending it to
        ``job_queue``. Does nothing when the file does not exist.
        """
        input_path = json_path or JSON_FILE
        if not os.path.exists(input_path):
            return

        with open(input_path, encoding='utf-8') as f:
            saved_state = json.load(f)

        # render_clients is used as a list everywhere else (append/remove),
        # so always load it as one, even if the key is missing or empty.
        cls.render_clients = list(saved_state.get('clients') or [])

        for job in saved_state.get('jobs', []):

            # Identify renderer type and recreate Renderer object
            job_render_object = RenderWorkerFactory.create_worker(job['renderer'],
                                                                  input_path=job['render']['input'],
                                                                  output_path=job['render']['output'])

            # Load Renderer values
            for key, val in job['render'].items():
                if val and key in ['start_time', 'end_time']:
                    # convert date strings back into date objects
                    job_render_object.__dict__[key] = datetime.fromisoformat(val)
                else:
                    job_render_object.__dict__[key] = val

            job_render_object.status = RenderStatus[job['status'].upper()]
            job.pop('render', None)

            # Create RenderJob with re-created Renderer object
            new_job = RenderJob(job_render_object, job['priority'], job['client'])
            # NOTE(review): only 'date_created' is converted back to a datetime;
            # if 'scheduled_start' is serialized as a string it stays a string
            # here while evaluate_queue() compares it to datetime.now() - confirm
            # against RenderJob.json().
            for key, val in job.items():
                if key in ['date_created']:
                    # convert date strings back to datetime objects
                    new_job.__dict__[key] = datetime.fromisoformat(val)
                else:
                    new_job.__dict__[key] = val
            # 'status' was copied in by the loop above; the status itself was
            # already applied to the render object, so drop the job attribute.
            delattr(new_job, 'status')

            # Handle older loaded jobs that were cancelled before closing
            if new_job.render_status() == RenderStatus.RUNNING:
                new_job.render.status = RenderStatus.CANCELLED

            # finally add back to render queue
            cls.job_queue.append(new_job)

        cls.last_saved_counts = cls.job_counts()

    @classmethod
    def save_state(cls, json_path=None):
        """Save state history to JSON file.

        Writes a timestamp, every job and the client list to ``json_path``
        (default ``JSON_FILE``). Saving is best-effort: any failure is logged
        and not re-raised.
        """
        try:
            logger.debug("Saving Render History")
            output = {'timestamp': datetime.now().isoformat(),
                      'jobs': [json.loads(j.json()) for j in cls.job_queue],
                      'clients': cls.render_clients}
            output_path = json_path or JSON_FILE
            with open(output_path, 'w', encoding='utf-8') as f:
                json.dump(output, f, indent=4)
            cls.last_saved_counts = cls.job_counts()
        except Exception as e:
            logger.error("Error saving state JSON: {}".format(e))

    @classmethod
    def evaluate_queue(cls):
        """Start every job that is currently allowed to run.

        A NOT_STARTED job is started when its renderer is below its instance
        limit and no running job has a higher priority (lower number). A
        SCHEDULED job is started once its ``scheduled_start`` has passed. The
        state file is rewritten if the per-status job counts changed since the
        last save.
        """
        instances = cls.renderer_instances()

        for job in cls.jobs_with_status(RenderStatus.NOT_STARTED, priority_sorted=True):
            renderer = job.render.renderer
            limit = cls.maximum_renderer_instances.get(renderer, 1)
            # Counter yields 0 for renderers with nothing running.
            if instances[renderer] >= limit:
                continue
            if any(x.priority < job.priority for x in cls.running_jobs()):
                continue
            cls.start_job(job)
            # Count the job just started so later jobs in this same pass
            # cannot push the renderer past its instance limit.
            instances[renderer] += 1

        for job in cls.jobs_with_status(RenderStatus.SCHEDULED, priority_sorted=True):
            if job.scheduled_start <= datetime.now():
                cls.start_job(job)

        if cls.last_saved_counts != cls.job_counts():
            cls.save_state()

    @classmethod
    def start_job(cls, job):
        """Log and start the render attached to ``job``."""
        logger.info('Starting {}render: {} - Priority {}'.format('scheduled ' if job.scheduled_start else '', job.name,
                                                                 job.priority))
        job.render.start()

    @classmethod
    def cancel_job(cls, job):
        """Stop ``job`` and mark it CANCELLED.

        Only jobs that are NOT_STARTED, RUNNING or ERROR are cancelled.

        :return: True if the job was cancelled, False otherwise.
        """
        logger.info('Cancelling job ID: {}'.format(job.id))
        if job.render_status() in [RenderStatus.NOT_STARTED, RenderStatus.RUNNING, RenderStatus.ERROR]:
            job.render.stop()
            job.render.status = RenderStatus.CANCELLED
            return True
        return False

    @classmethod
    def renderer_instances(cls):
        """Return a Counter mapping renderer name to number of running jobs."""
        return Counter(x.render.renderer for x in cls.running_jobs())

    @classmethod
    def job_counts(cls):
        """Return a dict mapping each RenderStatus value to its job count."""
        return {job_status.value: len(cls.jobs_with_status(job_status)) for job_status in RenderStatus}

    @classmethod
    def status(cls):
        """Return a dict of host metrics (platform, CPU, memory) and job counts."""
        # Take a single snapshot so the three memory fields agree.
        memory = psutil.virtual_memory()
        stats = {"timestamp": datetime.now().isoformat(),
                 "platform": platform.platform(),
                 "cpu_percent": psutil.cpu_percent(percpu=False),
                 "cpu_percent_per_cpu": psutil.cpu_percent(percpu=True),
                 "cpu_count": psutil.cpu_count(),
                 "memory_total": memory.total,
                 "memory_available": memory.available,
                 "memory_percent": memory.percent,
                 "job_counts": cls.job_counts(),
                 "host_name": cls.host_name
                 }
        return stats

    @classmethod
    def all_jobs(cls):
        """Return every job that has not been archived."""
        return [x for x in cls.job_queue if not x.archived]

    @classmethod
    def register_client(cls, hostname):
        """Register ``hostname`` as a render client if its status URL answers.

        The client is added, and the state saved, only when a GET to its
        ``/status`` endpoint returns an OK response.

        :return: True if the client was newly registered, False otherwise
            (already registered, unreachable, timed out or non-OK response).
        """
        # todo: check to make sure not adding ourselves

        if hostname in cls.render_clients:
            logger.warning(f"Client '{hostname}' already registered")
            return False

        try:
            response = requests.get(f"http://{hostname}:8080/status", timeout=3)
        except requests.RequestException:
            # RequestException also covers read timeouts, which are not
            # subclasses of ConnectionError and used to escape this method.
            logger.error(f"Cannot connect to client at hostname: {hostname}")
            return False

        if not response.ok:
            logger.warning(f"Client '{hostname}' responded with status {response.status_code}")
            return False

        cls.render_clients.append(hostname)
        logger.info(f"Client '{hostname}' successfully registered")
        cls.save_state()
        return True

    @classmethod
    def unregister_client(cls, hostname):
        """Remove ``hostname`` from the registered clients.

        This host itself (``host_name``) is never removed.

        :return: True if the client was removed, False otherwise.
        """
        if hostname not in cls.render_clients or hostname == cls.host_name:
            return False

        cls.render_clients.remove(hostname)
        logger.info(f"Client '{hostname}' successfully unregistered")
        # Persist the removal, mirroring register_client(); otherwise the
        # client would reappear the next time the state file is loaded.
        cls.save_state()
        return True

    @staticmethod
    def is_client_available(client_hostname, timeout=3):
        """Return True if the client's ``/status`` URL gives an OK response.

        Any request failure (connection error, timeout, ...) counts as
        "not available" rather than being raised.
        """
        try:
            response = requests.get(f"http://{client_hostname}:8080/status", timeout=timeout)
        except requests.RequestException:
            return False
        return bool(response.ok)