Cleanup dependencies on old attributes. Remove scheduled_job.py.

This commit is contained in:
Brett Williams
2023-05-30 22:53:37 -05:00
parent 6533481391
commit 130bdbb3b1
2 changed files with 44 additions and 188 deletions

View File

@@ -5,6 +5,7 @@ import os
import subprocess import subprocess
import threading import threading
import json import json
import glob
from datetime import datetime from datetime import datetime
from enum import Enum from enum import Enum
@@ -37,7 +38,7 @@ def string_to_status(string):
class BaseRenderWorker(Base): class BaseRenderWorker(Base):
__tablename__ = 'render_workers' __tablename__ = 'render_workers'
id = Column(Integer, primary_key=True) id = Column(String, primary_key=True)
input_path = Column(String) input_path = Column(String)
output_path = Column(String) output_path = Column(String)
date_created = Column(DateTime) date_created = Column(DateTime)
@@ -46,6 +47,7 @@ class BaseRenderWorker(Base):
renderer = Column(String) renderer = Column(String)
renderer_version = Column(String) renderer_version = Column(String)
priority = Column(Integer) priority = Column(Integer)
total_frames = Column(Integer)
owner = Column(String) owner = Column(String)
client = Column(String) client = Column(String)
name = Column(String) name = Column(String)
@@ -65,7 +67,12 @@ class BaseRenderWorker(Base):
if not self.engine: if not self.engine:
raise NotImplementedError("Engine not defined") raise NotImplementedError("Engine not defined")
def generate_id():
import uuid
return str(uuid.uuid4()).split('-')[0]
# Essential Info # Essential Info
self.id = generate_id()
self.input_path = input_path self.input_path = input_path
self.output_path = output_path self.output_path = output_path
self.args = args or {} self.args = args or {}
@@ -82,7 +89,6 @@ class BaseRenderWorker(Base):
self.current_frame = 0 self.current_frame = 0
# Logging # Logging
self.log_path = None
self.start_time = None self.start_time = None
self.end_time = None self.end_time = None
@@ -139,6 +145,11 @@ class BaseRenderWorker(Base):
def generate_worker_subprocess(self): def generate_worker_subprocess(self):
raise NotImplementedError("generate_worker_subprocess not implemented") raise NotImplementedError("generate_worker_subprocess not implemented")
def log_path(self):
filename = (self.name or os.path.basename(self.input_path)) + '_' + \
self.date_created.strftime("%Y.%m.%d_%H.%M.%S") + '.log'
return os.path.join(os.path.dirname(self.input_path), filename)
def start(self): def start(self):
if not os.path.exists(self.input_path): if not os.path.exists(self.input_path):
@@ -160,17 +171,9 @@ class BaseRenderWorker(Base):
self.__thread.start() self.__thread.start()
def run(self): def run(self):
# Setup logging # Setup logging
try: log_dir = os.path.dirname(self.log_path())
if not self.log_path: os.makedirs(log_dir, exist_ok=True)
log_dir = os.path.join(os.path.dirname(self.input_path), 'logs')
if not os.path.exists(log_dir):
os.makedirs(log_dir)
self.log_path = os.path.join(log_dir, os.path.basename(self.input_path)) + '.log'
logger.info('Logs saved in {}'.format(self.log_path))
except Exception as e:
logger.error("Error setting up logging: {}".format(e))
while self.failed_attempts < self.maximum_attempts and self.status is not RenderStatus.COMPLETED: while self.failed_attempts < self.maximum_attempts and self.status is not RenderStatus.COMPLETED:
@@ -184,7 +187,7 @@ class BaseRenderWorker(Base):
universal_newlines=False) universal_newlines=False)
self.start_time = datetime.now() self.start_time = datetime.now()
with open(self.log_path, "a") as f: with open(self.log_path(), "a") as f:
f.write(f"{self.start_time.isoformat()} - Starting {self.engine.name()} {self.engine.version()} " f.write(f"{self.start_time.isoformat()} - Starting {self.engine.name()} {self.engine.version()} "
f"Render for {self.input_path}") f"Render for {self.input_path}")
@@ -295,27 +298,41 @@ class BaseRenderWorker(Base):
elapsed_time_string = strfdelta(elapsed_time) if elapsed_time else "Unknown" elapsed_time_string = strfdelta(elapsed_time) if elapsed_time else "Unknown"
return elapsed_time_string return elapsed_time_string
def file_list(self):
job_dir = os.path.dirname(self.output_path)
file_list = glob.glob(os.path.join(job_dir, '*'))
file_list.sort()
return file_list
def json(self): def json(self):
worker_data = self.__dict__.copy() job_dict = {
worker_data['percent_complete'] = self.percent_complete() 'id': self.id,
worker_data['time_elapsed'] = self.time_elapsed() 'name': self.name,
worker_data['status'] = self.status.value 'input_path': self.input_path,
worker_data['renderer'] = self.renderer 'output_path': self.output_path,
worker_data['name'] = self.name 'priority': self.priority,
worker_data['renderer_version'] = self.renderer_version 'owner': self.owner,
keys_to_remove = ['thread', 'process'] # remove unwanted keys from dict 'client': self.client,
for key in worker_data.keys(): 'date_created': self.date_created,
if key.startswith('_'): 'start_time': self.start_time,
keys_to_remove.append(key) 'status': self.status.value,
for key in keys_to_remove: 'time_elapsed': self.time_elapsed(),
worker_data.pop(key, None) 'file_hash': self.file_hash,
'percent_complete': self.percent_complete(),
'file_count': len(self.file_list()),
'renderer': self.renderer,
'renderer_version': self.renderer_version,
'errors': getattr(self, 'errors', None),
'total_frames': self.total_frames,
'last_output': getattr(self, 'last_output', None)
}
# convert to json and back to auto-convert dates to iso format # convert to json and back to auto-convert dates to iso format
def date_serializer(o): def date_serializer(o):
if isinstance(o, datetime): if isinstance(o, datetime):
return o.isoformat() return o.isoformat()
json_convert = json.dumps(worker_data, default=date_serializer) json_convert = json.dumps(job_dict, default=date_serializer)
worker_json = json.loads(json_convert) worker_json = json.loads(json_convert)
return worker_json return worker_json

View File

@@ -1,161 +0,0 @@
import glob
import hashlib
import json
import logging
import os
import uuid
from datetime import datetime
from sqlalchemy import Column, Integer, String, DateTime, JSON, event
from sqlalchemy.ext.declarative import declarative_base
from .render_workers.base_worker import RenderStatus
from .render_workers.worker_factory import RenderWorkerFactory
logger = logging.getLogger()
Base = declarative_base()
class ScheduledJob(Base):
__tablename__ = 'scheduled_jobs'
id = Column(String, primary_key=True)
renderer = Column(String)
input_path = Column(String)
output_path = Column(String)
priority = Column(Integer)
owner = Column(String)
client = Column(String)
notify = Column(String)
date_created = Column(DateTime)
scheduled_start = Column(DateTime)
name = Column(String)
file_hash = Column(String)
worker_data_store = Column(JSON)
def __init__(self, renderer, input_path, output_path, args, priority=2, owner=None, client=None, notify=None,
custom_id=None, name=None):
self.id = custom_id or self.generate_id()
self.renderer = renderer
self.input_path = input_path
self.output_path = output_path
self.priority = priority
self.owner = owner
self.client = client
self.notify = notify
self.date_created = datetime.now()
self.scheduled_start = None
self.name = name or os.path.basename(input_path) + '_' + self.date_created.strftime("%Y.%m.%d_%H.%M.%S")
self.worker_object = RenderWorkerFactory.create_worker(renderer, input_path, output_path, args)
self.worker_object.log_path = self.log_path()
self.worker_object.validate()
self.file_hash = None
if not self.file_hash and os.path.exists(input_path):
self.file_hash = hashlib.md5(open(input_path, 'rb').read()).hexdigest()
def worker_data(self):
if hasattr(self, 'worker_object'):
fetched = self.worker_object.json()
if fetched != self.worker_data_store:
self.worker_data_store = fetched
return self.worker_data_store
@staticmethod
def before_insert(mapper, connection, target):
logger.debug(f"Before insert: {target.name}")
if hasattr(target, 'worker_object'):
target.worker_data_store = target.worker_object.json()
@staticmethod
def before_update(mapper, connection, target):
logger.debug(f"Before update: {target.name}")
if hasattr(target, 'worker_object'):
target.worker_data_store = target.worker_object.json()
@classmethod
def register_user_events(cls):
event.listen(cls, 'before_insert', cls.before_insert)
event.listen(cls, 'before_update', cls.before_update)
def render_status(self):
try:
worker_status = RenderStatus(self.worker_data()['status'])
if hasattr(self, 'worker_object'):
if self.scheduled_start and worker_status == RenderStatus.NOT_STARTED:
worker_status = RenderStatus.SCHEDULED
else:
if worker_status == RenderStatus.RUNNING:
worker_status = RenderStatus.ERROR
elif worker_status == RenderStatus.NOT_STARTED:
worker_status = RenderStatus.CANCELLED
except Exception as e:
logger.error(f"Exception fetching render status: {e}")
worker_status = RenderStatus.UNDEFINED
return worker_status
def json(self):
"""Converts RenderJob into JSON-friendly dict"""
job_dict = None
try:
worker_data = self.worker_data()
job_dict = {
'id': self.id,
'name': self.name,
'input_path': self.input_path,
'output_path': self.output_path,
'priority': self.priority,
'owner': self.owner,
'client': self.client,
'notify': self.notify,
'date_created': self.date_created,
'scheduled_start': self.scheduled_start,
'start_time': worker_data.get('start_time', None),
'status': self.render_status().value,
'time_elapsed': worker_data.get('time_elapsed', None),
'file_hash': self.file_hash,
'percent_complete': worker_data.get('percent_complete', None),
'file_count': len(self.file_list()),
'renderer': self.renderer,
'renderer_version': worker_data.get('renderer_version', None),
'errors': worker_data.get('errors', []),
'total_frames': worker_data.get('total_frames', -1),
'last_output': worker_data.get('last_output', None)
}
# convert to json and back to auto-convert dates to iso format
def date_serializer(o):
if isinstance(o, datetime):
return o.isoformat()
json_convert = json.dumps(job_dict, default=date_serializer)
job_dict = json.loads(json_convert)
except Exception as e:
logger.exception(e)
logger.error("Error converting to JSON: {}".format(e))
return job_dict
def start(self):
if hasattr(self, 'worker_object'):
self.worker_object.start()
def stop(self):
if hasattr(self, 'worker_object'):
self.worker_object.stop()
def work_path(self):
return os.path.dirname(self.output_path)
def file_list(self):
job_dir = os.path.dirname(self.output_path)
file_list = glob.glob(os.path.join(job_dir, '*'))
file_list.sort()
return file_list
def log_path(self):
return os.path.join(os.path.dirname(self.input_path), self.name + '.log')
@classmethod
def generate_id(cls):
return str(uuid.uuid4()).split('-')[0]