import glob
import hashlib
import json
import logging
import os
import uuid
from datetime import datetime

from sqlalchemy import Column, Integer, String, DateTime, JSON, event
from sqlalchemy.ext.declarative import declarative_base

from .render_workers.render_worker import RenderStatus, RenderWorkerFactory

logger = logging.getLogger()
Base = declarative_base()


def _md5_hexdigest(path, chunk_size=65536):
    """Return the hex MD5 digest of the file at *path*.

    The file is read in ``chunk_size``-byte pieces inside a ``with`` block so
    the handle is always closed and the whole file is never held in memory.
    """
    digest = hashlib.md5()
    with open(path, 'rb') as handle:
        for chunk in iter(lambda: handle.read(chunk_size), b''):
            digest.update(chunk)
    return digest.hexdigest()


class ScheduledJob(Base):
    """Database-backed record of a render job plus its in-memory worker.

    The mapped columns are persisted in the ``scheduled_jobs`` table.  The
    ``worker_object`` attribute is *not* a column: it is only set by
    ``__init__``, which is why the methods below guard every use of it with
    ``hasattr(self, 'worker_object')``.
    """

    __tablename__ = 'scheduled_jobs'

    id = Column(String, primary_key=True)
    renderer = Column(String)
    input_path = Column(String)
    output_path = Column(String)
    priority = Column(Integer)
    owner = Column(String)
    client = Column(String)
    notify = Column(String)
    date_created = Column(DateTime)
    scheduled_start = Column(DateTime)
    name = Column(String)
    file_hash = Column(String)
    # Last value returned by worker_object.json(); refreshed by worker_data()
    # and by the before_insert / before_update listeners.
    worker_data_store = Column(JSON)

    def __init__(self, renderer, input_path, output_path, args, priority=2, owner=None, client=None,
                 notify=None, custom_id=None, name=None):
        """Create a job and its render worker.

        Args:
            renderer: Renderer identifier passed to ``RenderWorkerFactory.create_worker``.
            input_path: Path of the file to render.
            output_path: Path the render output is written to.
            args: Extra arguments forwarded to the worker factory.
            priority: Job priority (defaults to 2).
            owner: Optional owner stored on the job.
            client: Optional client stored on the job.
            notify: Optional notification target stored on the job.
            custom_id: Explicit job id; a generated id is used when falsy.
            name: Explicit job name; when falsy, the input file's base name
                followed by the creation timestamp is used.

        Raises:
            Whatever ``RenderWorkerFactory.create_worker`` or the worker's
            ``validate()`` raise, and ``OSError`` if the input exists but
            cannot be read for hashing.
        """
        self.id = custom_id or self.generate_id()
        self.renderer = renderer
        self.input_path = input_path
        self.output_path = output_path
        self.priority = priority
        self.owner = owner
        self.client = client
        self.notify = notify
        self.date_created = datetime.now()
        self.scheduled_start = None

        if not name:
            timestamp = self.date_created.strftime("%Y.%m.%d_%H.%M.%S")
            name = os.path.basename(input_path) + '_' + timestamp
        self.name = name

        # log_path() depends on self.name, so the worker is wired up only
        # after the name has been resolved.
        self.worker_object = RenderWorkerFactory.create_worker(renderer, input_path, output_path, args)
        self.worker_object.log_path = self.log_path()
        self.worker_object.validate()

        # Hash the input only when it is present on disk; otherwise leave None.
        self.file_hash = _md5_hexdigest(input_path) if os.path.exists(input_path) else None

    def worker_data(self):
        """Return the worker's data, refreshing the stored copy when it changed.

        When a ``worker_object`` is attached, its ``json()`` result replaces
        ``worker_data_store`` if the two differ.  Without a worker object the
        stored value is returned as-is (it may be ``None``).
        """
        if hasattr(self, 'worker_object'):
            fetched = self.worker_object.json()
            if fetched != self.worker_data_store:
                self.worker_data_store = fetched
        return self.worker_data_store

    @staticmethod
    def before_insert(mapper, connection, target):
        """SQLAlchemy ``before_insert`` listener: snapshot worker data onto *target*."""
        logger.debug(f"Before insert: {target.name}")
        if hasattr(target, 'worker_object'):
            target.worker_data_store = target.worker_object.json()

    @staticmethod
    def before_update(mapper, connection, target):
        """SQLAlchemy ``before_update`` listener: snapshot worker data onto *target*."""
        logger.debug(f"Before update: {target.name}")
        if hasattr(target, 'worker_object'):
            target.worker_data_store = target.worker_object.json()

    @classmethod
    def register_user_events(cls):
        """Attach the ``before_insert`` / ``before_update`` listeners to this class."""
        event.listen(cls, 'before_insert', cls.before_insert)
        event.listen(cls, 'before_update', cls.before_update)

    def render_status(self):
        """Return the job's effective ``RenderStatus``.

        The status recorded in the worker data is adjusted in two cases:

        * a worker object is attached, ``scheduled_start`` is set and the
          worker reports ``NOT_STARTED`` -> ``SCHEDULED``;
        * no worker object is attached and the stored status is ``RUNNING``
          -> ``ERROR``.

        Any exception while reading the status is logged and results in
        ``RenderStatus.UNDEFINED``.
        """
        # NOTE(review): the `else` branch is taken to pair with the
        # `hasattr(self, 'worker_object')` check (presumably a job without an
        # attached worker cannot really be running) - confirm against history.
        try:
            worker_status = RenderStatus(self.worker_data()['status'])
            if hasattr(self, 'worker_object'):
                if self.scheduled_start and worker_status == RenderStatus.NOT_STARTED:
                    return RenderStatus.SCHEDULED
            elif worker_status == RenderStatus.RUNNING:
                return RenderStatus.ERROR
        except Exception as e:
            logger.error(f"Exception fetching render status: {e}")
            worker_status = RenderStatus.UNDEFINED
        return worker_status

    def json(self):
        """Converts RenderJob into JSON-friendly dict

        Returns:
            A dict of plain JSON types (datetimes become ISO-8601 strings).
            If building the dict fails, ``None`` is returned; if only the
            JSON round-trip fails, the unconverted dict is returned.  Both
            failures are logged rather than raised.
        """

        def date_serializer(o):
            # Only datetimes are converted; any other non-serializable object
            # deliberately becomes JSON null instead of aborting the dump.
            if isinstance(o, datetime):
                return o.isoformat()
            return None

        job_dict = None
        try:
            status_value = self.render_status().value
            # Take a single snapshot so 'time_elapsed', 'percent_complete' and
            # 'worker' are consistent with each other.
            worker_snapshot = self.worker_data()
            job_dict = {
                'id': self.id,
                'name': self.name,
                'input_path': self.input_path,
                'output_path': self.output_path,
                'priority': self.priority,
                'owner': self.owner,
                'client': self.client,
                'notify': self.notify,
                'date_created': self.date_created,
                'scheduled_start': self.scheduled_start,
                'status': status_value,
                'time_elapsed': worker_snapshot.get('time_elapsed', None),
                'file_hash': self.file_hash,
                'percent_complete': worker_snapshot.get('percent_complete', None),
                'file_list': self.file_list(),
                'renderer': self.renderer,
                'worker': worker_snapshot,
            }

            # convert to json and back to auto-convert dates to iso format
            json_convert = json.dumps(job_dict, default=date_serializer)
            job_dict = json.loads(json_convert)
        except Exception as e:
            logger.exception(e)
            logger.error("Error converting to JSON: {}".format(e))
        return job_dict

    def start(self):
        """Start the attached worker, if there is one."""
        if hasattr(self, 'worker_object'):
            self.worker_object.start()

    def stop(self):
        """Stop the attached worker, if there is one."""
        if hasattr(self, 'worker_object'):
            self.worker_object.stop()

    def work_path(self):
        """Return the directory that contains ``output_path``."""
        return os.path.dirname(self.output_path)

    def file_list(self):
        """Return the paths of all entries in the output directory (via ``glob``)."""
        return glob.glob(os.path.join(self.work_path(), '*'))

    def log_path(self):
        """Return the log file path: ``<input directory>/<job name>.log``."""
        return os.path.join(os.path.dirname(self.input_path), self.name + '.log')

    @classmethod
    def generate_id(cls):
        """Return a short id: the first dash-separated group of a random UUID4."""
        return str(uuid.uuid4()).split('-')[0]