Files
Zordon/lib/render_queue.py
2023-05-30 22:30:18 -05:00

244 lines
7.9 KiB
Python
Executable File

import logging
import platform
from datetime import datetime
import psutil
import requests
from sqlalchemy import create_engine, Column, String, Integer
from sqlalchemy.orm import sessionmaker
from .render_workers.base_worker import RenderStatus, BaseRenderWorker, Base
logger = logging.getLogger()
class JobNotFoundError(Exception):
def __init__(self, job_id, *args):
super().__init__(args)
self.job_id = job_id
class RenderClient(Base):
__tablename__ = 'render_clients'
id = Column(Integer, primary_key=True)
hostname = Column(String)
def __init__(self, hostname):
self.hostname = hostname
def is_available(self, timeout=3):
try:
response = requests.get(f"http://{self.hostname}:8080/api/status", timeout=timeout)
if response.ok:
return True
except requests.ConnectionError as e:
pass
return False
def __repr__(self):
return "client stuff"
class RenderQueue:
engine = create_engine('sqlite:///database.db')
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
session = Session()
job_queue = []
maximum_renderer_instances = {'blender': 1, 'aerender': 1, 'ffmpeg': 4}
hostname = None
port = 8080
client_mode = False
server_hostname = None
last_saved_counts = {}
def __init__(self):
pass
@classmethod
def add_to_render_queue(cls, render_job, force_start=False, client=None):
if not client or render_job.client == cls.hostname:
logger.debug('Adding priority {} job to render queue: {}'.format(render_job.priority, render_job))
render_job.client = cls.hostname
cls.job_queue.append(render_job)
if force_start:
cls.start_job(render_job)
cls.session.add(render_job)
cls.save_state()
else:
# todo: implement client rendering
logger.warning('remote client rendering not implemented yet')
@classmethod
def all_jobs(cls):
return cls.job_queue
@classmethod
def running_jobs(cls):
return cls.jobs_with_status(RenderStatus.RUNNING)
@classmethod
def pending_jobs(cls):
pending_jobs = cls.jobs_with_status(RenderStatus.NOT_STARTED)
pending_jobs.extend(cls.jobs_with_status(RenderStatus.SCHEDULED))
return pending_jobs
@classmethod
def jobs_with_status(cls, status, priority_sorted=False):
found_jobs = [x for x in cls.all_jobs() if x.status == status]
if priority_sorted:
found_jobs = sorted(found_jobs, key=lambda a: a.priority, reverse=False)
return found_jobs
@classmethod
def job_with_id(cls, job_id, none_ok=False):
found_job = next((x for x in cls.all_jobs() if x.id == job_id), None)
if not found_job and not none_ok:
raise JobNotFoundError(job_id)
return found_job
@classmethod
def clear_history(cls):
to_remove = [x for x in cls.all_jobs() if x.status in [RenderStatus.CANCELLED,
RenderStatus.COMPLETED, RenderStatus.ERROR]]
for job_to_remove in to_remove:
cls.delete_job(job_to_remove)
cls.save_state()
@classmethod
def load_state(cls):
cls.job_queue = cls.session.query(BaseRenderWorker).all()
@classmethod
def save_state(cls):
cls.session.commit()
@classmethod
def evaluate_queue(cls):
not_started = cls.jobs_with_status(RenderStatus.NOT_STARTED, priority_sorted=True)
if not_started:
for job in not_started:
instances = cls.renderer_instances()
higher_priority_jobs = [x for x in cls.running_jobs() if x.priority < job.priority]
max_renderers = job.renderer in instances.keys() and instances[
job.renderer] >= cls.maximum_renderer_instances.get(job.renderer, 1)
if not max_renderers and not higher_priority_jobs:
cls.start_job(job)
scheduled = cls.jobs_with_status(RenderStatus.SCHEDULED, priority_sorted=True)
for job in scheduled:
if job.scheduled_start <= datetime.now():
logger.debug(f"Starting scheduled job: {job}")
cls.start_job(job)
if cls.last_saved_counts != cls.job_counts():
cls.save_state()
@classmethod
def start_job(cls, job):
logger.info(f'Starting render: {job.name} - Priority {job.priority}')
job.start()
@classmethod
def cancel_job(cls, job):
logger.info(f'Cancelling job ID: {job.id}')
job.stop()
return job.status == RenderStatus.CANCELLED
@classmethod
def delete_job(cls, job):
logger.info(f"Deleting job ID: {job.id}")
job.stop()
cls.job_queue.remove(job)
cls.session.delete(job)
cls.save_state()
return True
@classmethod
def renderer_instances(cls):
from collections import Counter
all_instances = [x.renderer for x in cls.running_jobs()]
return Counter(all_instances)
@classmethod
def job_counts(cls):
job_counts = {}
for job_status in RenderStatus:
job_counts[job_status.value] = len(cls.jobs_with_status(job_status))
return job_counts
@classmethod
def status(cls):
return {"timestamp": datetime.now().isoformat(),
"platform": platform.platform(),
"cpu_percent": psutil.cpu_percent(percpu=False),
"cpu_percent_per_cpu": psutil.cpu_percent(percpu=True),
"cpu_count": psutil.cpu_count(),
"memory_total": psutil.virtual_memory().total,
"memory_available": psutil.virtual_memory().available,
"memory_percent": psutil.virtual_memory().percent,
"job_counts": cls.job_counts(),
"host_name": cls.hostname
}
@classmethod
def render_clients(cls):
all_clients = cls.session.query(RenderClient).all()
if not all_clients:
cls.session.add(RenderClient(hostname=cls.hostname))
cls.save_state()
all_clients = cls.session.query(RenderClient).all()
return all_clients
@classmethod
def client_with_hostname(cls, hostname):
return cls.session.query(RenderClient).filter(RenderClient.hostname == hostname).first()
@classmethod
def register_client(cls, hostname):
new_client = None
err_msg = None
if hostname == cls.hostname:
err_msg = "Cannot register same hostname as server"
elif cls.client_with_hostname(hostname):
err_msg = f"Client '{hostname}' already registered"
else:
new_client = RenderClient(hostname=hostname)
if not new_client.is_available():
cls.session.add(new_client)
logger.info(f"Client '{hostname}' successfully registered")
cls.save_state()
else:
err_msg = f"Cannot connect to client at hostname: {hostname}"
if err_msg:
logger.warning(err_msg)
return err_msg, 400
else:
return new_client.hostname
@classmethod
def unregister_client(cls, hostname):
success = False
client = cls.client_with_hostname(hostname)
if client and hostname != cls.hostname:
cls.session.delete(client)
cls.save_state()
logger.info(f"Client '{hostname}' successfully unregistered")
success = True
return str(success)
@staticmethod
def is_client_available(client_hostname, timeout=3):
try:
response = requests.get(f"http://{client_hostname}:8080/api/status", timeout=timeout)
if response.ok:
return True
except requests.ConnectionError as e:
pass
return False