Convert render_queue and scheduled_job to use SQL instead of JSON

This commit is contained in:
Brett Williams
2023-05-24 09:58:02 -05:00
parent dd2ae2d71a
commit e11c5e7e58
4 changed files with 76 additions and 152 deletions

View File

@@ -1,22 +1,17 @@
import json
import logging import logging
import os
import platform import platform
from datetime import datetime from datetime import datetime
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
import psutil import psutil
import requests import requests
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from .scheduled_job import ScheduledJob, Base
from .render_workers.render_worker import RenderStatus from .render_workers.render_worker import RenderStatus
from .scheduled_job import ScheduledJob, Base
logger = logging.getLogger() logger = logging.getLogger()
JSON_FILE = 'server_state.json'
#todo: move history to sqlite db
class JobNotFoundError(Exception): class JobNotFoundError(Exception):
def __init__(self, job_id, *args): def __init__(self, job_id, *args):
@@ -25,7 +20,6 @@ class JobNotFoundError(Exception):
class RenderQueue: class RenderQueue:
engine = create_engine('sqlite:///database.db') engine = create_engine('sqlite:///database.db')
Base.metadata.create_all(engine) Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine) Session = sessionmaker(bind=engine)
@@ -47,17 +41,21 @@ class RenderQueue:
def add_to_render_queue(cls, render_job, force_start=False, client=None): def add_to_render_queue(cls, render_job, force_start=False, client=None):
if not client or render_job.client == cls.host_name: if not client or render_job.client == cls.host_name:
logger.debug('Adding priority {} job to render queue: {}'.format(render_job.priority, render_job.worker)) logger.debug('Adding priority {} job to render queue: {}'.format(render_job.priority, render_job.worker()))
render_job.client = cls.host_name render_job.client = cls.host_name
cls.job_queue.append(render_job) cls.job_queue.append(render_job)
if force_start: if force_start:
cls.start_job(render_job) cls.start_job(render_job)
cls.session.add(render_job) cls.session.add(render_job)
cls.session.commit() cls.save_state()
else: else:
# todo: implement client rendering # todo: implement client rendering
logger.warning('remote client rendering not implemented yet') logger.warning('remote client rendering not implemented yet')
@classmethod
def all_jobs(cls):
return cls.job_queue
@classmethod @classmethod
def running_jobs(cls): def running_jobs(cls):
return cls.jobs_with_status(RenderStatus.RUNNING) return cls.jobs_with_status(RenderStatus.RUNNING)
@@ -70,89 +68,33 @@ class RenderQueue:
@classmethod @classmethod
def jobs_with_status(cls, status, priority_sorted=False): def jobs_with_status(cls, status, priority_sorted=False):
found_jobs = [x for x in cls.job_queue if x.render_status() == status] found_jobs = [x for x in cls.all_jobs() if x.render_status() == status]
if priority_sorted: if priority_sorted:
found_jobs = sorted(found_jobs, key=lambda a: a.priority, reverse=False) found_jobs = sorted(found_jobs, key=lambda a: a.priority, reverse=False)
return found_jobs return found_jobs
@classmethod @classmethod
def job_with_id(cls, job_id, none_ok=False): def job_with_id(cls, job_id, none_ok=False):
found_job = next((x for x in cls.job_queue if x.id == job_id), None) found_job = next((x for x in cls.all_jobs() if x.id == job_id), None)
if not found_job and not none_ok: if not found_job and not none_ok:
raise JobNotFoundError(job_id) raise JobNotFoundError(job_id)
return found_job return found_job
@classmethod @classmethod
def clear_history(cls): def clear_history(cls):
to_remove = [x for x in cls.job_queue if x.render_status() in [RenderStatus.CANCELLED, to_remove = [x for x in cls.all_jobs() if x.render_status() in [RenderStatus.CANCELLED,
RenderStatus.COMPLETED, RenderStatus.ERROR]] RenderStatus.COMPLETED, RenderStatus.ERROR]]
for job_to_remove in to_remove: for job_to_remove in to_remove:
cls.job_queue.remove(job_to_remove) cls.job_queue.remove(job_to_remove)
cls.save_state() cls.save_state()
@classmethod @classmethod
def load_state(cls, json_path=None): def load_state(cls):
"""Load state history from JSON file""" cls.job_queue = cls.session.query(ScheduledJob).all()
input_path = json_path or JSON_FILE
if os.path.exists(input_path):
with open(input_path) as f:
# load saved data
saved_state = json.load(f)
cls.render_clients = saved_state.get('clients', {})
for job in saved_state.get('jobs', []):
try:
render_job = ScheduledJob(renderer=job['renderer'], input_path=job['worker']['input_path'],
output_path=job['worker']['output_path'], args=job['worker']['args'],
priority=job['priority'], client=job['client'])
# Load Worker values
for key, val in job['worker'].items():
if val and key in ['start_time', 'end_time']: # convert date strings back into date objects
render_job.worker.__dict__[key] = datetime.fromisoformat(val)
else:
render_job.worker.__dict__[key] = val
render_job.worker.status = RenderStatus[job['status'].upper()]
job.pop('worker', None)
# Create RenderJob with re-created Renderer object
for key, val in job.items():
if key in ['date_created']: # convert date strings back to datetime objects
render_job.__dict__[key] = datetime.fromisoformat(val)
else:
import types
if hasattr(render_job, key):
if getattr(render_job, key) and not isinstance(getattr(render_job, key), types.MethodType):
render_job.__dict__[key] = val
# Handle older loaded jobs that were cancelled before closing
if render_job.render_status() == RenderStatus.RUNNING:
render_job.worker.status = RenderStatus.CANCELLED
# finally add back to render queue
cls.job_queue.append(render_job)
except Exception as e:
logger.exception(f"Unable to load job: {job['id']} - {e}")
cls.last_saved_counts = cls.job_counts()
@classmethod @classmethod
def save_state(cls, json_path=None): def save_state(cls):
"""Save state history to JSON file""" cls.session.commit()
try:
logger.debug("Saving Render History")
output = {'timestamp': datetime.now().isoformat(),
'jobs': [j.json() for j in cls.job_queue],
'clients': cls.render_clients}
output_path = json_path or JSON_FILE
with open(output_path, 'w') as f:
json.dump(output, f, indent=4)
cls.last_saved_counts = cls.job_counts()
except Exception as e:
logger.error("Error saving state JSON: {}".format(e))
@classmethod @classmethod
def evaluate_queue(cls): def evaluate_queue(cls):
@@ -162,7 +104,7 @@ class RenderQueue:
not_started = cls.jobs_with_status(RenderStatus.NOT_STARTED, priority_sorted=True) not_started = cls.jobs_with_status(RenderStatus.NOT_STARTED, priority_sorted=True)
if not_started: if not_started:
for job in not_started: for job in not_started:
renderer = job.worker.engine.name() renderer = job.worker().engine.name()
higher_priority_jobs = [x for x in cls.running_jobs() if x.priority < job.priority] higher_priority_jobs = [x for x in cls.running_jobs() if x.priority < job.priority]
max_renderers = renderer in instances.keys() and instances[ max_renderers = renderer in instances.keys() and instances[
renderer] >= cls.maximum_renderer_instances.get(renderer, 1) renderer] >= cls.maximum_renderer_instances.get(renderer, 1)
@@ -195,12 +137,14 @@ class RenderQueue:
logger.info(f"Deleting job ID: {job.id}") logger.info(f"Deleting job ID: {job.id}")
job.stop() job.stop()
cls.job_queue.remove(job) cls.job_queue.remove(job)
cls.session.delete(job)
cls.save_state()
return True return True
@classmethod @classmethod
def renderer_instances(cls): def renderer_instances(cls):
from collections import Counter from collections import Counter
all_instances = [x.worker.engine.name() for x in cls.running_jobs()] all_instances = [x.worker().engine.name() for x in cls.running_jobs()]
return Counter(all_instances) return Counter(all_instances)
@classmethod @classmethod
@@ -208,13 +152,11 @@ class RenderQueue:
job_counts = {} job_counts = {}
for job_status in RenderStatus: for job_status in RenderStatus:
job_counts[job_status.value] = len(cls.jobs_with_status(job_status)) job_counts[job_status.value] = len(cls.jobs_with_status(job_status))
return job_counts return job_counts
@classmethod @classmethod
def status(cls): def status(cls):
return {"timestamp": datetime.now().isoformat(),
stats = {"timestamp": datetime.now().isoformat(),
"platform": platform.platform(), "platform": platform.platform(),
"cpu_percent": psutil.cpu_percent(percpu=False), "cpu_percent": psutil.cpu_percent(percpu=False),
"cpu_percent_per_cpu": psutil.cpu_percent(percpu=True), "cpu_percent_per_cpu": psutil.cpu_percent(percpu=True),
@@ -226,8 +168,6 @@ class RenderQueue:
"host_name": cls.host_name "host_name": cls.host_name
} }
return stats
@classmethod @classmethod
def register_client(cls, hostname): def register_client(cls, hostname):
@@ -273,4 +213,3 @@ class RenderQueue:
except requests.ConnectionError as e: except requests.ConnectionError as e:
pass pass
return False return False

View File

@@ -3,7 +3,6 @@ import hashlib
import json import json
import logging import logging
import os import os
import threading
import uuid import uuid
from datetime import datetime from datetime import datetime
@@ -20,10 +19,17 @@ class ScheduledJob(Base):
__tablename__ = 'scheduled_jobs' __tablename__ = 'scheduled_jobs'
id = Column(String, primary_key=True) id = Column(String, primary_key=True)
# Get file hash on bg thread renderer = Column(String)
def __get_file_hash(self): input_path = Column(String)
if os.path.exists(self.worker.input_path): output_path = Column(String)
self.file_hash = hashlib.md5(open(self.worker.input_path, 'rb').read()).hexdigest() priority = Column(Integer)
owner = Column(String)
client = Column(String)
notify = Column(String)
date_created = Column(DateTime)
scheduled_start = Column(DateTime)
name = Column(String)
file_hash = Column(String)
def __init__(self, renderer, input_path, output_path, args, priority=2, owner=None, client=None, notify=None, def __init__(self, renderer, input_path, output_path, args, priority=2, owner=None, client=None, notify=None,
custom_id=None, name=None): custom_id=None, name=None):
@@ -45,18 +51,23 @@ class ScheduledJob(Base):
self.worker_object.validate() self.worker_object.validate()
self.file_hash = None self.file_hash = None
threading.Thread(target=self.__get_file_hash).start() # get file hash on bg thread if not self.file_hash and os.path.exists(input_path):
self.file_hash = hashlib.md5(open(input_path, 'rb').read()).hexdigest()
def worker(self):
if hasattr(self, 'worker_object'):
return self.worker_object
else:
return {}
def render_status(self): def render_status(self):
if self.scheduled_start and self.worker.status == RenderStatus.NOT_STARTED: try:
if self.scheduled_start and self.worker().status == RenderStatus.NOT_STARTED:
return RenderStatus.SCHEDULED return RenderStatus.SCHEDULED
else: else:
return self.worker.status return self.worker().status
except:
def file_hash(self): return RenderStatus.CANCELLED
if os.path.exists(self.worker.input_path):
return hashlib.md5(open(self.worker.input_path, 'rb').read()).hexdigest()
return None
def json(self): def json(self):
"""Converts RenderJob into JSON-friendly dict""" """Converts RenderJob into JSON-friendly dict"""
@@ -94,44 +105,15 @@ class ScheduledJob(Base):
return job_dict return job_dict
def start(self): def start(self):
self.worker.start() if hasattr(self, 'worker'):
self.worker().start()
def stop(self): def stop(self):
self.worker.stop() if hasattr(self, 'worker'):
self.worker().stop()
def time_elapsed(self):
from string import Template
class DeltaTemplate(Template):
delimiter = "%"
def strfdelta(tdelta, fmt='%H:%M:%S'):
d = {"D": tdelta.days}
hours, rem = divmod(tdelta.seconds, 3600)
minutes, seconds = divmod(rem, 60)
d["H"] = '{:02d}'.format(hours)
d["M"] = '{:02d}'.format(minutes)
d["S"] = '{:02d}'.format(seconds)
t = DeltaTemplate(fmt)
return t.substitute(**d)
# calculate elapsed time
elapsed_time = None
start_time = self.worker.start_time
end_time = self.worker.end_time
if start_time:
if end_time:
elapsed_time = end_time - start_time
elif self.render_status() == RenderStatus.RUNNING:
elapsed_time = datetime.now() - start_time
elapsed_time_string = strfdelta(elapsed_time) if elapsed_time else "Unknown"
return elapsed_time_string
def frame_count(self): def frame_count(self):
return self.worker.total_frames return self.worker().total_frames
def work_path(self): def work_path(self):
return os.path.dirname(self.output_path) return os.path.dirname(self.output_path)
@@ -144,7 +126,10 @@ class ScheduledJob(Base):
return os.path.join(os.path.dirname(self.input_path), self.name + '.log') return os.path.join(os.path.dirname(self.input_path), self.name + '.log')
def percent_complete(self): def percent_complete(self):
return self.worker.percent_complete() try:
return self.worker().percent_complete()
except:
return -1
@classmethod @classmethod
def generate_id(cls): def generate_id(cls):

View File

@@ -49,7 +49,7 @@ def index():
with open('config/presets.yaml') as f: with open('config/presets.yaml') as f:
presets = yaml.load(f, Loader=yaml.FullLoader) presets = yaml.load(f, Loader=yaml.FullLoader)
return render_template('index.html', all_jobs=sorted_jobs(RenderQueue.job_queue), return render_template('index.html', all_jobs=sorted_jobs(RenderQueue.all_jobs()),
hostname=RenderQueue.host_name, renderer_info=renderer_info(), hostname=RenderQueue.host_name, renderer_info=renderer_info(),
render_clients=RenderQueue.render_clients, preset_list=presets) render_clients=RenderQueue.render_clients, preset_list=presets)
@@ -101,7 +101,7 @@ def get_job_file(job_id, filename):
@server.get('/api/jobs') @server.get('/api/jobs')
def jobs_json(): def jobs_json():
return [x.json() for x in RenderQueue.job_queue] return [x.json() for x in RenderQueue.all_jobs()]
@server.get('/api/jobs/<status_val>') @server.get('/api/jobs/<status_val>')
@@ -127,7 +127,7 @@ def get_job_status(job_id):
@server.get('/api/job/<job_id>/logs') @server.get('/api/job/<job_id>/logs')
def get_job_logs(job_id): def get_job_logs(job_id):
found_job = RenderQueue.job_with_id(job_id) found_job = RenderQueue.job_with_id(job_id)
log_path = found_job.worker.log_path log_path = found_job.log_path()
log_data = None log_data = None
if log_path and os.path.exists(log_path): if log_path and os.path.exists(log_path):
with open(log_path) as file: with open(log_path) as file:
@@ -151,9 +151,9 @@ def download_all(job_id):
return response return response
found_job = RenderQueue.job_with_id(job_id) found_job = RenderQueue.job_with_id(job_id)
output_dir = os.path.dirname(found_job.worker.output_path) output_dir = os.path.dirname(found_job.output_path)
if os.path.exists(output_dir): if os.path.exists(output_dir):
zip_filename = os.path.join('/tmp', pathlib.Path(found_job.worker.input_path).stem + '.zip') zip_filename = os.path.join('/tmp', pathlib.Path(found_job.input_path).stem + '.zip')
with ZipFile(zip_filename, 'w') as zipObj: with ZipFile(zip_filename, 'w') as zipObj:
for f in os.listdir(output_dir): for f in os.listdir(output_dir):
zipObj.write(filename=os.path.join(output_dir, f), zipObj.write(filename=os.path.join(output_dir, f),
@@ -219,7 +219,7 @@ def full_status():
@server.get('/api/snapshot') @server.get('/api/snapshot')
def snapshot(): def snapshot():
server_status = RenderQueue.status() server_status = RenderQueue.status()
server_jobs = [x.json() for x in RenderQueue.job_queue] server_jobs = [x.json() for x in RenderQueue.all_jobs()]
server_data = {'status': server_status, 'jobs': server_jobs, 'timestamp': datetime.now().isoformat()} server_data = {'status': server_status, 'jobs': server_jobs, 'timestamp': datetime.now().isoformat()}
return server_data return server_data
@@ -405,7 +405,7 @@ def delete_job(job_id):
os.remove(thumb_path) os.remove(thumb_path)
# See if we own the input file (i.e. was it uploaded) # See if we own the input file (i.e. was it uploaded)
input_dir = os.path.dirname(found_job.worker.input_path) input_dir = os.path.dirname(found_job.input_path)
if server.config['UPLOAD_FOLDER'] in input_dir and os.path.exists(input_dir): if server.config['UPLOAD_FOLDER'] in input_dir and os.path.exists(input_dir):
shutil.rmtree(input_dir) shutil.rmtree(input_dir)

View File

@@ -33,7 +33,7 @@ def generate_thumbnail_for_job(job, thumb_video_path, thumb_image_path, max_widt
elif len(job.file_list()) > 1: # if image sequence, use second to last file (last may be in use) elif len(job.file_list()) > 1: # if image sequence, use second to last file (last may be in use)
source_path = [job.file_list()[-2]] source_path = [job.file_list()[-2]]
else: else:
source_path = [job.worker.input_path] # use source if nothing else source_path = [job.input_path] # use source if nothing else
if source_path: if source_path:
# Todo: convert image sequence to animated movie # Todo: convert image sequence to animated movie