mirror of
https://github.com/blw1138/Zordon.git
synced 2025-12-17 08:48:13 +00:00
248 lines
8.1 KiB
Python
Executable File
248 lines
8.1 KiB
Python
Executable File
import logging
|
|
import platform
|
|
from datetime import datetime
|
|
|
|
import psutil
|
|
import requests
|
|
from sqlalchemy import create_engine, Column, String, Integer
|
|
from sqlalchemy.orm import sessionmaker
|
|
|
|
from .render_workers.render_worker import RenderStatus
|
|
from .scheduled_job import ScheduledJob, Base
|
|
|
|
# Module-wide logger. getLogger() is called without a name, so this is the root logger.
logger = logging.getLogger()
|
|
|
|
|
|
class JobNotFoundError(Exception):
    """Raised when a job ID cannot be found in the render queue.

    Attributes:
        job_id: The ID that was looked up.
    """

    def __init__(self, job_id, *args):
        """Create the error.

        Args:
            job_id: The job ID that could not be found.
            *args: Optional message arguments forwarded to ``Exception``.
                When omitted, a default message naming the job ID is used.
        """
        if not args:
            args = (f"Job not found: {job_id}",)
        # Unpack the arguments: passing the tuple itself would nest it inside
        # ``self.args`` and make ``str(error)`` render as a tuple.
        super().__init__(*args)
        self.job_id = job_id
|
|
|
|
|
|
class RenderClient(Base):
    """Database row describing a render client, identified by its hostname."""

    __tablename__ = 'render_clients'

    id = Column(Integer, primary_key=True)
    hostname = Column(String)

    def __init__(self, hostname):
        """Create a client record for ``hostname``."""
        self.hostname = hostname

    def is_available(self, timeout=3):
        """Return True if the client's status endpoint answers successfully.

        Sends a GET request to ``http://<hostname>:8080/api/status``.

        Args:
            timeout: Timeout in seconds passed to ``requests.get``.

        Returns:
            bool: ``response.ok`` when a response arrives; False when the
            request fails for any reason (refused connection, timeout, bad
            URL, ...).
        """
        try:
            response = requests.get(f"http://{self.hostname}:8080/api/status", timeout=timeout)
        except requests.RequestException as e:
            # A read timeout is not a ConnectionError, so catch the common
            # base class; an unreachable client must not raise to the caller.
            logger.debug(f"Client '{self.hostname}' is not available: {e}")
            return False
        return response.ok

    def __repr__(self):
        return f"<RenderClient(id={self.id!r}, hostname={self.hostname!r})>"
|
|
|
|
|
|
class RenderQueue:
    """Render job queue persisted to a SQLite database through SQLAlchemy.

    All state is stored on the class and every operation is a class method or
    static method, so the class is meant to be used directly rather than
    instantiated.
    """

    # Database setup runs once, when this class body is executed.
    engine = create_engine('sqlite:///database.db')
    Base.metadata.create_all(engine)
    Session = sessionmaker(bind=engine)
    session = Session()
    # NOTE(review): presumably installs event hooks on ScheduledJob - confirm in scheduled_job.
    ScheduledJob.register_user_events()

    # Jobs currently known to the queue (see load_state / add_to_render_queue).
    job_queue = []
    # Cap on simultaneously running jobs per renderer; unlisted renderers default to 1.
    maximum_renderer_instances = {'blender': 2, 'aerender': 1, 'ffmpeg': 4}
    hostname = None
    port = 8080
    client_mode = False
    server_hostname = None

    # Job counts recorded at the last save performed by evaluate_queue.
    last_saved_counts = {}

    def __init__(self):
        pass

    @classmethod
    def add_to_render_queue(cls, render_job, force_start=False, client=None):
        """Add a job to the local queue and persist it.

        Args:
            render_job: Job to enqueue. Its ``client`` attribute is set to
                this host's hostname.
            force_start: When True, start the job immediately instead of
                waiting for ``evaluate_queue``.
            client: Target client. Remote rendering is not implemented, so a
                job aimed at another client is only logged with a warning.
        """
        if not client or render_job.client == cls.hostname:
            logger.debug(f'Adding priority {render_job.priority} job to render queue: {render_job.worker_object}')
            render_job.client = cls.hostname
            cls.job_queue.append(render_job)
            if force_start:
                cls.start_job(render_job)
            cls.session.add(render_job)
            cls.save_state()
        else:
            # todo: implement client rendering
            logger.warning('remote client rendering not implemented yet')

    @classmethod
    def all_jobs(cls):
        """Return the list of all jobs in the queue (the live list, not a copy)."""
        return cls.job_queue

    @classmethod
    def running_jobs(cls):
        """Return the jobs whose status is RUNNING."""
        return cls.jobs_with_status(RenderStatus.RUNNING)

    @classmethod
    def pending_jobs(cls):
        """Return NOT_STARTED jobs followed by SCHEDULED jobs."""
        return [*cls.jobs_with_status(RenderStatus.NOT_STARTED),
                *cls.jobs_with_status(RenderStatus.SCHEDULED)]

    @classmethod
    def jobs_with_status(cls, status, priority_sorted=False):
        """Return the jobs whose ``render_status()`` equals ``status``.

        Args:
            status: The ``RenderStatus`` value to match.
            priority_sorted: When True, sort the result by ascending
                ``priority`` value.
        """
        found_jobs = [x for x in cls.all_jobs() if x.render_status() == status]
        if priority_sorted:
            found_jobs.sort(key=lambda a: a.priority)
        return found_jobs

    @classmethod
    def job_with_id(cls, job_id, none_ok=False):
        """Return the job with the given ID.

        Args:
            job_id: ID to look up.
            none_ok: When True, return None for an unknown ID instead of raising.

        Raises:
            JobNotFoundError: If no job matches and ``none_ok`` is False.
        """
        found_job = next((x for x in cls.all_jobs() if x.id == job_id), None)
        if found_job is None and not none_ok:
            raise JobNotFoundError(job_id)
        return found_job

    @classmethod
    def clear_history(cls):
        """Delete every job that is CANCELLED, COMPLETED or ERROR."""
        finished_states = (RenderStatus.CANCELLED, RenderStatus.COMPLETED, RenderStatus.ERROR)
        # Build the list first: delete_job mutates job_queue while we iterate.
        to_remove = [x for x in cls.all_jobs() if x.render_status() in finished_states]
        for job_to_remove in to_remove:
            cls.delete_job(job_to_remove)
        cls.save_state()

    @classmethod
    def load_state(cls):
        """Replace the in-memory queue with all ScheduledJob rows from the database."""
        cls.job_queue = cls.session.query(ScheduledJob).all()

    @classmethod
    def save_state(cls):
        """Commit the current database session."""
        cls.session.commit()

    @classmethod
    def evaluate_queue(cls):
        """Start eligible jobs and persist the queue if job counts changed.

        A NOT_STARTED job is started only when its renderer is below its
        instance limit and no running job has a lower ``priority`` value.
        SCHEDULED jobs are started once their ``scheduled_start`` has passed.
        """
        instances = cls.renderer_instances()

        for job in cls.jobs_with_status(RenderStatus.NOT_STARTED, priority_sorted=True):
            higher_priority_jobs = [x for x in cls.running_jobs() if x.priority < job.priority]
            limit = cls.maximum_renderer_instances.get(job.renderer, 1)
            # Counter returns 0 for renderers with no running job; a limit of 0
            # therefore prevents that renderer from being started here.
            max_renderers = instances[job.renderer] >= limit

            if not max_renderers and not higher_priority_jobs:
                cls.start_job(job)
                # Count the job just started so that several jobs launched in
                # the same pass cannot exceed the renderer's limit.
                instances[job.renderer] += 1

        for job in cls.jobs_with_status(RenderStatus.SCHEDULED, priority_sorted=True):
            if job.scheduled_start <= datetime.now():
                cls.start_job(job)

        current_counts = cls.job_counts()
        if cls.last_saved_counts != current_counts:
            cls.save_state()
            # Remember what was saved; otherwise every pass would commit again.
            cls.last_saved_counts = current_counts

    @classmethod
    def start_job(cls, job):
        """Log and start ``job``."""
        logger.info(f"Starting {'scheduled ' if job.scheduled_start else ''}render: {job.name} - "
                    f"Priority {job.priority}")
        job.start()

    @classmethod
    def cancel_job(cls, job):
        """Stop ``job`` and return True if its status is now CANCELLED."""
        logger.info(f'Cancelling job ID: {job.id}')
        job.stop()
        return job.render_status() == RenderStatus.CANCELLED

    @classmethod
    def delete_job(cls, job):
        """Stop ``job``, remove it from the queue and the database, and commit.

        Returns:
            bool: Always True.
        """
        logger.info(f"Deleting job ID: {job.id}")
        job.stop()
        cls.job_queue.remove(job)
        cls.session.delete(job)
        cls.save_state()
        return True

    @classmethod
    def renderer_instances(cls):
        """Return a Counter mapping renderer name to its number of running jobs."""
        from collections import Counter
        return Counter(x.renderer for x in cls.running_jobs())

    @classmethod
    def job_counts(cls):
        """Return a dict mapping each ``RenderStatus`` value to its job count."""
        return {job_status.value: len(cls.jobs_with_status(job_status)) for job_status in RenderStatus}

    @classmethod
    def status(cls):
        """Return a dict with host system metrics and queue job counts."""
        # Sample memory once so the three memory fields describe the same instant.
        memory = psutil.virtual_memory()
        return {"timestamp": datetime.now().isoformat(),
                "platform": platform.platform(),
                "cpu_percent": psutil.cpu_percent(percpu=False),
                "cpu_percent_per_cpu": psutil.cpu_percent(percpu=True),
                "cpu_count": psutil.cpu_count(),
                "memory_total": memory.total,
                "memory_available": memory.available,
                "memory_percent": memory.percent,
                "job_counts": cls.job_counts(),
                "host_name": cls.hostname
                }

    @classmethod
    def render_clients(cls):
        """Return all registered clients, registering this host if there are none."""
        all_clients = cls.session.query(RenderClient).all()
        if not all_clients:
            cls.session.add(RenderClient(hostname=cls.hostname))
            cls.save_state()
            all_clients = cls.session.query(RenderClient).all()
        return all_clients

    @classmethod
    def client_with_hostname(cls, hostname):
        """Return the first registered client with ``hostname``, or None."""
        return cls.session.query(RenderClient).filter(RenderClient.hostname == hostname).first()

    @classmethod
    def register_client(cls, hostname):
        """Register a reachable client under ``hostname``.

        Returns:
            The client's hostname on success, otherwise an
            ``(error_message, 400)`` tuple.
        """
        new_client = None
        err_msg = None

        if hostname == cls.hostname:
            err_msg = "Cannot register same hostname as server"
        elif cls.client_with_hostname(hostname):
            err_msg = f"Client '{hostname}' already registered"
        else:
            candidate = RenderClient(hostname=hostname)
            # Only clients that answer their status endpoint are registered;
            # unreachable ones are rejected.
            if candidate.is_available():
                cls.session.add(candidate)
                cls.save_state()
                logger.info(f"Client '{hostname}' successfully registered")
                new_client = candidate
            else:
                err_msg = f"Cannot connect to client at hostname: {hostname}"

        if err_msg:
            logger.warning(err_msg)
            return err_msg, 400
        return new_client.hostname

    @classmethod
    def unregister_client(cls, hostname):
        """Remove a registered client; this host itself cannot be removed.

        Returns:
            str: ``"True"`` if a client was removed, otherwise ``"False"``.
        """
        success = False
        client = cls.client_with_hostname(hostname)
        if client and hostname != cls.hostname:
            cls.session.delete(client)
            cls.save_state()
            logger.info(f"Client '{hostname}' successfully unregistered")
            success = True
        return str(success)

    @staticmethod
    def is_client_available(client_hostname, timeout=3):
        """Return True if ``http://<client_hostname>:8080/api/status`` responds OK.

        Args:
            client_hostname: Host to probe.
            timeout: Timeout in seconds passed to ``requests.get``.

        Returns:
            bool: ``response.ok`` when a response arrives; False when the
            request fails (refused connection, timeout, ...).
        """
        try:
            response = requests.get(f"http://{client_hostname}:8080/api/status", timeout=timeout)
        except requests.RequestException:
            # Timeouts are not ConnectionError subclasses in every case, so
            # catch the shared base class and report the client as unavailable.
            return False
        return response.ok
|