Files
Zordon/lib/render_queue.py
Brett Williams 9ac22a5555 Misc Cleanup
2022-12-11 18:01:30 -08:00

258 lines
9.4 KiB
Python
Executable File

import json
import logging
import os
import platform
from datetime import datetime
import psutil
import requests
from lib.render_job import RenderJob
from utilities.render_worker import RenderStatus
logger = logging.getLogger()
JSON_FILE = 'server_state.json'
#todo: move history to sqlite db
class RenderQueue:
job_queue = []
render_clients = []
maximum_renderer_instances = {'Blender': 2, 'After Effects': 1, 'ffmpeg': 4}
host_name = None
port = 8080
client_mode = False
server_hostname = None
last_saved_counts = {}
def __init__(self):
pass
@classmethod
def add_to_render_queue(cls, render_job, force_start=False, client=None):
if not client or render_job.client == cls.host_name:
logger.debug('Adding priority {} job to render queue: {}'.format(render_job.priority, render_job.worker))
render_job.client = cls.host_name
cls.job_queue.append(render_job)
if force_start:
cls.start_job(render_job)
else:
# todo: implement client rendering
logger.warning('remote client rendering not implemented yet')
@classmethod
def running_jobs(cls):
return cls.jobs_with_status(RenderStatus.RUNNING)
@classmethod
def pending_jobs(cls):
pending_jobs = cls.jobs_with_status(RenderStatus.NOT_STARTED)
pending_jobs.extend(cls.jobs_with_status(RenderStatus.SCHEDULED))
return pending_jobs
@classmethod
def jobs_with_status(cls, status, priority_sorted=False):
found_jobs = [x for x in cls.job_queue if x.render_status() == status]
if priority_sorted:
found_jobs = sorted(found_jobs, key=lambda a: a.priority, reverse=False)
return found_jobs
@classmethod
def job_with_id(cls, job_id):
found_job = next((x for x in cls.job_queue if x.id == job_id), None)
return found_job
@classmethod
def clear_history(cls):
to_remove = [x for x in cls.job_queue if x.render_status() in [RenderStatus.CANCELLED,
RenderStatus.COMPLETED, RenderStatus.ERROR]]
for job_to_remove in to_remove:
cls.job_queue.remove(job_to_remove)
cls.save_state()
@classmethod
def load_state(cls, json_path=None):
"""Load state history from JSON file"""
input_path = json_path or JSON_FILE
if os.path.exists(input_path):
with open(input_path) as f:
# load saved data
saved_state = json.load(f)
cls.render_clients = saved_state.get('clients', {})
for job in saved_state.get('jobs', []):
try:
render_job = RenderJob(renderer=job['renderer'], input_path=job['worker']['input_path'],
output_path=job['worker']['output_path'], args=job['worker']['args'],
priority=job['priority'], client=job['client'])
# Load Worker values
for key, val in job['worker'].items():
if val and key in ['start_time', 'end_time']: # convert date strings back into date objects
render_job.worker.__dict__[key] = datetime.fromisoformat(val)
else:
render_job.worker.__dict__[key] = val
render_job.worker.status = RenderStatus[job['status'].upper()]
job.pop('worker', None)
# Create RenderJob with re-created Renderer object
for key, val in job.items():
if key in ['date_created']: # convert date strings back to datetime objects
render_job.__dict__[key] = datetime.fromisoformat(val)
else:
render_job.__dict__[key] = val
render_job.__delattr__('status')
# Handle older loaded jobs that were cancelled before closing
if render_job.render_status() == RenderStatus.RUNNING:
render_job.worker.status = RenderStatus.CANCELLED
# finally add back to render queue
cls.job_queue.append(render_job)
except Exception as e:
logger.error(f"Unable to load job: {job} - {e}")
cls.last_saved_counts = cls.job_counts()
@classmethod
def save_state(cls, json_path=None):
"""Save state history to JSON file"""
try:
logger.debug("Saving Render History")
output = {'timestamp': datetime.now().isoformat(),
'jobs': [j.json() for j in cls.job_queue],
'clients': cls.render_clients}
output_path = json_path or JSON_FILE
with open(output_path, 'w') as f:
json.dump(output, f, indent=4)
cls.last_saved_counts = cls.job_counts()
except Exception as e:
logger.error("Error saving state JSON: {}".format(e))
@classmethod
def evaluate_queue(cls):
instances = cls.renderer_instances()
not_started = cls.jobs_with_status(RenderStatus.NOT_STARTED, priority_sorted=True)
if not_started:
for job in not_started:
renderer = job.worker.renderer
higher_priority_jobs = [x for x in cls.running_jobs() if x.priority < job.priority]
max_renderers = renderer in instances.keys() and instances[
renderer] >= cls.maximum_renderer_instances.get(renderer, 1)
if not max_renderers and not higher_priority_jobs:
cls.start_job(job)
scheduled = cls.jobs_with_status(RenderStatus.SCHEDULED, priority_sorted=True)
for job in scheduled:
if job.scheduled_start <= datetime.now():
cls.start_job(job)
if cls.last_saved_counts != cls.job_counts():
cls.save_state()
@classmethod
def start_job(cls, job):
logger.info('Starting {}render: {} - Priority {}'.format('scheduled ' if job.scheduled_start else '', job.name,
job.priority))
job.start()
@classmethod
def cancel_job(cls, job):
logger.info(f'Cancelling job ID: {job.id}')
job.stop()
return job.render_status() == RenderStatus.CANCELLED
@classmethod
def delete_job(cls, job):
logger.info(f"Deleting job ID: {job.id}")
job.stop()
cls.job_queue.remove(job)
return True
@classmethod
def renderer_instances(cls):
from collections import Counter
all_instances = [x.worker.renderer for x in cls.running_jobs()]
return Counter(all_instances)
@classmethod
def job_counts(cls):
job_counts = {}
for job_status in RenderStatus:
job_counts[job_status.value] = len(cls.jobs_with_status(job_status))
return job_counts
@classmethod
def status(cls):
stats = {"timestamp": datetime.now().isoformat(),
"platform": platform.platform(),
"cpu_percent": psutil.cpu_percent(percpu=False),
"cpu_percent_per_cpu": psutil.cpu_percent(percpu=True),
"cpu_count": psutil.cpu_count(),
"memory_total": psutil.virtual_memory().total,
"memory_available": psutil.virtual_memory().available,
"memory_percent": psutil.virtual_memory().percent,
"job_counts": cls.job_counts(),
"host_name": cls.host_name
}
return stats
@classmethod
def register_client(cls, hostname):
err_msg = None
if hostname == cls.host_name:
err_msg = "Cannot register same hostname as server"
elif hostname in cls.render_clients:
err_msg = f"Client '{hostname}' already registered"
else:
try:
response = requests.get(f"http://{hostname}:8080/api/status", timeout=3)
if response.ok:
cls.render_clients.append(hostname)
logger.info(f"Client '{hostname}' successfully registered")
cls.save_state()
else:
err_msg = f'Response from server not ok: {response.text}'
except requests.ConnectionError as e:
err_msg = f"Cannot connect to client at hostname: {hostname}"
if err_msg:
logger.warning(err_msg)
return err_msg, 400
else:
return 'success'
@classmethod
def unregister_client(cls, hostname):
success = False
if hostname in cls.render_clients and hostname != cls.host_name:
cls.render_clients.remove(hostname)
logger.info(f"Client '{hostname}' successfully unregistered")
success = True
return success
@staticmethod
def is_client_available(client_hostname, timeout=3):
try:
response = requests.get(f"http://{client_hostname}:8080/api/status", timeout=timeout)
if response.ok:
return True
except requests.ConnectionError as e:
pass
return False