Combine RenderWorker creation into RenderJob creation. Misc cleanup.

This commit is contained in:
Brett Williams
2022-10-28 09:58:32 -07:00
parent 39d6e95e9a
commit 37f91c6f8a
10 changed files with 108 additions and 396 deletions

View File

@@ -48,7 +48,7 @@ def get_job_status(job_id):
def get_file_list(job_id):
found_job = RenderQueue.job_with_id(job_id)
if found_job:
job_dir = os.path.dirname(found_job.render.output_path)
job_dir = os.path.dirname(found_job.worker.output_path)
return os.listdir(job_dir)
else:
return f'Cannot find job with ID {job_id}', 400
@@ -67,9 +67,9 @@ def download_all(job_id):
found_job = RenderQueue.job_with_id(job_id)
if found_job:
output_dir = os.path.dirname(found_job.render.output_path)
output_dir = os.path.dirname(found_job.worker.output_path)
if os.path.exists(output_dir):
zip_filename = pathlib.Path(found_job.render.input_path).stem + '.zip'
zip_filename = pathlib.Path(found_job.worker.input_path).stem + '.zip'
with ZipFile(zip_filename, 'w') as zipObj:
for f in os.listdir(output_dir):
zipObj.write(filename=os.path.join(output_dir, f),
@@ -187,19 +187,16 @@ def add_job():
if client == RenderQueue.host_name:
logger.info(f"Creating job locally - {input_path}")
try:
render_job = RenderWorkerFactory.create_worker(renderer, input_path, output_path, args)
render_job.log_path = os.path.join(os.path.dirname(input_path), os.path.basename(input_path) + '.log')
render_job = RenderJob(renderer, input_path, output_path, args, priority, job_owner, client,
notify=False, custom_id=custom_id)
RenderQueue.add_to_render_queue(render_job, force_start=force_start)
return render_job.json()
except Exception as e:
err_msg = f"Error creating job: {str(e)}"
logger.exception(err_msg)
remove_job_dir()
return err_msg, 400
new_job = RenderJob(render_job, priority=priority, owner=job_owner, custom_id=custom_id)
RenderQueue.add_to_render_queue(new_job, force_start=force_start)
return new_job.json()
# client renders
elif client in RenderQueue.render_clients:

View File

@@ -1,40 +1,46 @@
import hashlib
import json
import logging
import os
import uuid
from datetime import datetime
from utilities.render_worker import RenderStatus
from utilities.render_worker import RenderStatus, RenderWorkerFactory
logger = logging.getLogger()
class RenderJob:
def __init__(self, render, priority=2, owner=None, client=None, notify=None, custom_id=None):
def __init__(self, renderer, input_path, output_path, args, priority=2, owner=None, client=None, notify=None, custom_id=None):
self.id = custom_id or self.generate_id()
self.owner = owner
self.render = render
self.priority = priority
self.client = client
self.notify = notify
self.date_created = datetime.now()
self.scheduled_start = None
self.renderer = render.renderer
self.name = os.path.basename(render.input_path) + '_' + self.date_created.isoformat()
self.renderer = renderer
self.name = os.path.basename(input_path) + '_' + self.date_created.isoformat()
self.archived = False
self.worker = RenderWorkerFactory.create_worker(renderer, input_path, output_path, args)
self.worker.log_path = os.path.join(os.path.dirname(input_path), os.path.basename(input_path) + '.log')
def render_status(self):
"""Returns status of render job"""
try:
if self.scheduled_start and self.render.status == RenderStatus.NOT_STARTED:
if self.scheduled_start and self.worker.status == RenderStatus.NOT_STARTED:
return RenderStatus.SCHEDULED
else:
return self.render.status
return self.worker.status
except Exception as e:
logger.warning("render_status error: {}".format(e))
return RenderStatus.ERROR
def file_hash(self):
return hashlib.md5(open(self.worker.input_path, 'rb').read()).hexdigest()
def json(self):
"""Converts RenderJob into JSON format"""
import numbers
@@ -48,20 +54,22 @@ class RenderJob:
try:
d = self.__dict__.copy()
d['status'] = self.render_status().value
d['render'] = self.render.__dict__.copy()
d['file_hash'] = self.file_hash if isinstance(self.file_hash, str) else self.file_hash()
d['worker'] = self.worker.__dict__.copy()
for key in ['thread', 'process']: # remove unwanted keys from JSON
d['render'].pop(key, None)
d['render']['status'] = d['status']
d['worker'].pop(key, None)
d['worker']['status'] = d['status']
# jobs from current_session generate percent completed
# jobs after loading server pull in a saved value. Have to check if callable object or not
percent_complete = self.render.percent_complete if isinstance(self.render.percent_complete, numbers.Number) \
else self.render.percent_complete()
d['render']['percent_complete'] = percent_complete
percent_complete = self.worker.percent_complete if isinstance(self.worker.percent_complete, numbers.Number) \
else self.worker.percent_complete()
d['worker']['percent_complete'] = percent_complete
json_string = json.dumps(d, default=date_serializer)
except Exception as e:
logger.exception(e)
logger.error("Error converting to JSON: {}".format(e))
return json_string

View File

@@ -8,7 +8,7 @@ import psutil
import requests
from lib.render_job import RenderJob
from utilities.render_worker import RenderWorkerFactory, RenderStatus
from utilities.render_worker import RenderStatus
logger = logging.getLogger()
@@ -34,7 +34,7 @@ class RenderQueue:
def add_to_render_queue(cls, render_job, force_start=False, client=None):
if not client or render_job.client == cls.host_name:
logger.debug('Adding priority {} job to render queue: {}'.format(render_job.priority, render_job.render))
logger.debug('Adding priority {} job to render queue: {}'.format(render_job.priority, render_job.worker))
render_job.client = cls.host_name
cls.job_queue.append(render_job)
if force_start:
@@ -91,36 +91,34 @@ class RenderQueue:
for job in saved_state.get('jobs', []):
# Identify renderer type and recreate Renderer object
job_render_object = RenderWorkerFactory.create_worker(job['renderer'],
input_path=job['render']['input_path'],
output_path=job['render']['output_path'])
render_job = RenderJob(renderer=job['renderer'], input_path=job['worker']['input_path'],
output_path=job['worker']['output_path'], args=job['worker']['args'],
priority=job['priority'], client=job['client'])
# Load Renderer values
for key, val in job['render'].items():
# Load Worker values
for key, val in job['worker'].items():
if val and key in ['start_time', 'end_time']: # convert date strings back into date objects
job_render_object.__dict__[key] = datetime.fromisoformat(val)
render_job.worker.__dict__[key] = datetime.fromisoformat(val)
else:
job_render_object.__dict__[key] = val
render_job.worker.__dict__[key] = val
job_render_object.status = RenderStatus[job['status'].upper()]
job.pop('render', None)
render_job.worker.status = RenderStatus[job['status'].upper()]
job.pop('worker', None)
# Create RenderJob with re-created Renderer object
new_job = RenderJob(job_render_object, job['priority'], job['client'])
for key, val in job.items():
if key in ['date_created']: # convert date strings back to datetime objects
new_job.__dict__[key] = datetime.fromisoformat(val)
render_job.__dict__[key] = datetime.fromisoformat(val)
else:
new_job.__dict__[key] = val
new_job.__delattr__('status')
render_job.__dict__[key] = val
render_job.__delattr__('status')
# Handle older loaded jobs that were cancelled before closing
if new_job.render_status() == RenderStatus.RUNNING:
new_job.render.status = RenderStatus.CANCELLED
if render_job.render_status() == RenderStatus.RUNNING:
render_job.worker.status = RenderStatus.CANCELLED
# finally add back to render queue
cls.job_queue.append(new_job)
cls.job_queue.append(render_job)
cls.last_saved_counts = cls.job_counts()
@@ -147,7 +145,7 @@ class RenderQueue:
not_started = cls.jobs_with_status(RenderStatus.NOT_STARTED, priority_sorted=True)
if not_started:
for job in not_started:
renderer = job.render.renderer
renderer = job.worker.renderer
higher_priority_jobs = [x for x in cls.running_jobs() if x.priority < job.priority]
max_renderers = renderer in instances.keys() and instances[
renderer] >= cls.maximum_renderer_instances.get(renderer, 1)
@@ -167,21 +165,21 @@ class RenderQueue:
def start_job(cls, job):
logger.info('Starting {}render: {} - Priority {}'.format('scheduled ' if job.scheduled_start else '', job.name,
job.priority))
job.render.start()
job.worker.start()
@classmethod
def cancel_job(cls, job):
logger.info('Cancelling job ID: {}'.format(job.id))
if job.render_status() in [RenderStatus.NOT_STARTED, RenderStatus.RUNNING, RenderStatus.ERROR]:
job.render.stop()
job.render.status = RenderStatus.CANCELLED
job.worker.stop()
job.worker.status = RenderStatus.CANCELLED
return True
return False
@classmethod
def renderer_instances(cls):
from collections import Counter
all_instances = [x.render.renderer for x in cls.running_jobs()]
all_instances = [x.worker.renderer for x in cls.running_jobs()]
return Counter(all_instances)
@classmethod
@@ -217,24 +215,29 @@ class RenderQueue:
@classmethod
def register_client(cls, hostname):
#todo: check to make sure not adding ourselves
err_msg = None
success = False
if hostname == cls.host_name:
err_msg = "Cannot register same hostname as server"
elif hostname in cls.render_clients:
err_msg = f"Client '{hostname}' already registered"
else:
try:
response = requests.get(f"http://{hostname}:8080/status", timeout=3)
if response.ok:
cls.render_clients.append(hostname)
logger.info(f"Client '{hostname}' successfully registered")
cls.save_state()
else:
err_msg = f'Response from server not ok: {response.text}'
except requests.ConnectionError as e:
err_msg = f"Cannot connect to client at hostname: {hostname}"
if hostname in cls.render_clients:
logger.warning(f"Client '{hostname}' already registered")
return success
try:
response = requests.get(f"http://{hostname}:8080/status", timeout=3)
if response.ok:
cls.render_clients.append(hostname)
logger.info(f"Client '{hostname}' successfully registered")
success = True
cls.save_state()
except requests.ConnectionError as e:
logger.error(f"Cannot connect to client at hostname: {hostname}")
return success
if err_msg:
logger.warning(err_msg)
return err_msg, 400
else:
return 'success'
@classmethod
def unregister_client(cls, hostname):