Save worker information to database for persistence across runs and abstract away data source

This commit is contained in:
Brett Williams
2023-05-24 12:50:16 -05:00
parent e11c5e7e58
commit 2a7eddb1eb
3 changed files with 41 additions and 29 deletions

View File

@@ -24,6 +24,7 @@ class RenderQueue:
Base.metadata.create_all(engine) Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine) Session = sessionmaker(bind=engine)
session = Session() session = Session()
ScheduledJob.register_user_events()
job_queue = [] job_queue = []
render_clients = [] render_clients = []
maximum_renderer_instances = {'blender': 2, 'aerender': 1, 'ffmpeg': 4} maximum_renderer_instances = {'blender': 2, 'aerender': 1, 'ffmpeg': 4}
@@ -41,7 +42,7 @@ class RenderQueue:
def add_to_render_queue(cls, render_job, force_start=False, client=None): def add_to_render_queue(cls, render_job, force_start=False, client=None):
if not client or render_job.client == cls.host_name: if not client or render_job.client == cls.host_name:
logger.debug('Adding priority {} job to render queue: {}'.format(render_job.priority, render_job.worker())) logger.debug('Adding priority {} job to render queue: {}'.format(render_job.priority, render_job.worker_object))
render_job.client = cls.host_name render_job.client = cls.host_name
cls.job_queue.append(render_job) cls.job_queue.append(render_job)
if force_start: if force_start:
@@ -85,7 +86,7 @@ class RenderQueue:
to_remove = [x for x in cls.all_jobs() if x.render_status() in [RenderStatus.CANCELLED, to_remove = [x for x in cls.all_jobs() if x.render_status() in [RenderStatus.CANCELLED,
RenderStatus.COMPLETED, RenderStatus.ERROR]] RenderStatus.COMPLETED, RenderStatus.ERROR]]
for job_to_remove in to_remove: for job_to_remove in to_remove:
cls.job_queue.remove(job_to_remove) cls.delete_job(job_to_remove)
cls.save_state() cls.save_state()
@classmethod @classmethod
@@ -104,10 +105,9 @@ class RenderQueue:
not_started = cls.jobs_with_status(RenderStatus.NOT_STARTED, priority_sorted=True) not_started = cls.jobs_with_status(RenderStatus.NOT_STARTED, priority_sorted=True)
if not_started: if not_started:
for job in not_started: for job in not_started:
renderer = job.worker().engine.name()
higher_priority_jobs = [x for x in cls.running_jobs() if x.priority < job.priority] higher_priority_jobs = [x for x in cls.running_jobs() if x.priority < job.priority]
max_renderers = renderer in instances.keys() and instances[ max_renderers = job.renderer in instances.keys() and instances[
renderer] >= cls.maximum_renderer_instances.get(renderer, 1) job.renderer] >= cls.maximum_renderer_instances.get(job.renderer, 1)
if not max_renderers and not higher_priority_jobs: if not max_renderers and not higher_priority_jobs:
cls.start_job(job) cls.start_job(job)
@@ -144,7 +144,7 @@ class RenderQueue:
@classmethod @classmethod
def renderer_instances(cls): def renderer_instances(cls):
from collections import Counter from collections import Counter
all_instances = [x.worker().engine.name() for x in cls.running_jobs()] all_instances = [x.renderer for x in cls.running_jobs()]
return Counter(all_instances) return Counter(all_instances)
@classmethod @classmethod

View File

@@ -242,6 +242,8 @@ class BaseRenderWorker(object):
def json(self): def json(self):
worker_data = self.__dict__.copy() worker_data = self.__dict__.copy()
worker_data['percent_complete'] = self.percent_complete()
worker_data['time_elapsed'] = self.time_elapsed()
keys_to_remove = ['thread', 'process'] # remove unwanted keys from dict keys_to_remove = ['thread', 'process'] # remove unwanted keys from dict
for key in worker_data.keys(): for key in worker_data.keys():
if key.startswith('_'): if key.startswith('_'):

View File

@@ -6,7 +6,7 @@ import os
import uuid import uuid
from datetime import datetime from datetime import datetime
from sqlalchemy import Column, Integer, String, DateTime, JSON from sqlalchemy import Column, Integer, String, DateTime, JSON, event
from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.ext.declarative import declarative_base
from .render_workers.render_worker import RenderStatus, RenderWorkerFactory from .render_workers.render_worker import RenderStatus, RenderWorkerFactory
@@ -30,6 +30,7 @@ class ScheduledJob(Base):
scheduled_start = Column(DateTime) scheduled_start = Column(DateTime)
name = Column(String) name = Column(String)
file_hash = Column(String) file_hash = Column(String)
worker_data_store = Column(JSON)
def __init__(self, renderer, input_path, output_path, args, priority=2, owner=None, client=None, notify=None, def __init__(self, renderer, input_path, output_path, args, priority=2, owner=None, client=None, notify=None,
custom_id=None, name=None): custom_id=None, name=None):
@@ -54,18 +55,36 @@ class ScheduledJob(Base):
if not self.file_hash and os.path.exists(input_path): if not self.file_hash and os.path.exists(input_path):
self.file_hash = hashlib.md5(open(input_path, 'rb').read()).hexdigest() self.file_hash = hashlib.md5(open(input_path, 'rb').read()).hexdigest()
def worker(self): def worker_data(self):
if hasattr(self, 'worker_object'): if hasattr(self, 'worker_object'):
return self.worker_object fetched = self.worker_object.json()
else: if fetched != self.worker_data_store:
return {} self.worker_data_store = fetched
return self.worker_data_store
@staticmethod
def before_insert(mapper, connection, target):
logger.debug(f"Before insert: {target.name}")
if hasattr(target, 'worker_object'):
target.worker_data_store = target.worker_object.json()
@staticmethod
def before_update(mapper, connection, target):
logger.debug(f"Before update: {target.name}")
if hasattr(target, 'worker_object'):
target.worker_data_store = target.worker_object.json()
@classmethod
def register_user_events(cls):
event.listen(cls, 'before_insert', cls.before_insert)
event.listen(cls, 'before_update', cls.before_update)
def render_status(self): def render_status(self):
try: try:
if self.scheduled_start and self.worker().status == RenderStatus.NOT_STARTED: if self.scheduled_start and self.worker_object.status == RenderStatus.NOT_STARTED:
return RenderStatus.SCHEDULED return RenderStatus.SCHEDULED
else: else:
return self.worker().status return self.worker_object.status
except: except:
return RenderStatus.CANCELLED return RenderStatus.CANCELLED
@@ -85,12 +104,12 @@ class ScheduledJob(Base):
'date_created': self.date_created, 'date_created': self.date_created,
'scheduled_start': self.scheduled_start, 'scheduled_start': self.scheduled_start,
'status': self.render_status().value, 'status': self.render_status().value,
'time_elapsed': self.worker().time_elapsed() if hasattr(self.worker(), 'time_elapsed') else None, 'time_elapsed': self.worker_data().get('time_elapsed', None),
'file_hash': self.file_hash, 'file_hash': self.file_hash,
'percent_complete': self.percent_complete(), 'percent_complete': self.worker_data().get('percent_complete', None),
'file_list': self.file_list(), 'file_list': self.file_list(),
'renderer': self.renderer, 'renderer': self.renderer,
'worker': self.worker().json() if hasattr(self.worker(), 'json') else {}, 'worker': self.worker_data(),
} }
# convert to json and back to auto-convert dates to iso format # convert to json and back to auto-convert dates to iso format
@@ -105,15 +124,12 @@ class ScheduledJob(Base):
return job_dict return job_dict
def start(self): def start(self):
if hasattr(self, 'worker'): if hasattr(self, 'worker_object'):
self.worker().start() self.worker_object.start()
def stop(self): def stop(self):
if hasattr(self, 'worker'): if hasattr(self, 'worker_object'):
self.worker().stop() self.worker_object.stop()
def frame_count(self):
return self.worker().total_frames
def work_path(self): def work_path(self):
return os.path.dirname(self.output_path) return os.path.dirname(self.output_path)
@@ -125,12 +141,6 @@ class ScheduledJob(Base):
def log_path(self): def log_path(self):
return os.path.join(os.path.dirname(self.input_path), self.name + '.log') return os.path.join(os.path.dirname(self.input_path), self.name + '.log')
def percent_complete(self):
try:
return self.worker().percent_complete()
except:
return -1
@classmethod @classmethod
def generate_id(cls): def generate_id(cls):
return str(uuid.uuid4()).split('-')[0] return str(uuid.uuid4()).split('-')[0]