Add ability to add multiple jobs with the same source file

Brett Williams
2022-10-30 01:50:33 -07:00
parent a8a90d49f3
commit 5785989af4
4 changed files with 114 additions and 75 deletions
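
For illustration, the /add_job endpoint now accepts a JSON list of jobs rather than a single job object, and every job in the list can point at one uploaded source file. A hypothetical submission using the field names from add_job() below (the file name, renderer, and values are made up):

import json
import requests

jobs = [
    {"name": "shot01_preview", "renderer": "blender",
     "output_path": "preview/frame_####.png", "priority": 2, "args": {}},
    {"name": "shot01_final", "renderer": "blender",
     "output_path": "final/frame_####.png", "priority": 3, "args": {}},
]
with open("shot01.blend", "rb") as f:
    files = {"file": ("shot01.blend", f, "application/octet-stream"),
             "json": (None, json.dumps(jobs), "application/json")}
    response = requests.post("http://render-host:8080/add_job", files=files)
# The server rewrites each job's input_path to point at the uploaded copy and
# gives each job its own output directory named after the job, so both renders
# share the one .blend file without clobbering each other's output or logs.
print(response.json())  # one result entry per job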

View File

@@ -110,7 +110,7 @@ def create_node_tree(all_server_data) -> Tree:
def create_jobs_table(all_server_data) -> Table:
table = Table("ID", "Project", "Output", "Renderer", Column(header="Priority", justify="center"),
table = Table("ID", "Name", "Output", "Renderer", Column(header="Priority", justify="center"),
Column(header="Status", justify="center"), Column(header="Time Elapsed", justify="right"),
Column(header="# Frames", justify="right"), "Client", show_lines=True,
box=box.HEAVY_HEAD)
@@ -135,8 +135,7 @@ def create_jobs_table(all_server_data) -> Table:
elapsed_time = job['worker'].get('time_elapsed', 'unknown')
# Project name
project_name = job_color + os.path.basename(job['worker']['input_path'])
project_name = project_name.replace(".", "[default].")
project_name = job_color + job['name']
if job_status == RenderStatus.RUNNING:
job_text = f"{job_color}[bold]Running - {float(job['worker']['percent_complete']) * 100:.1f}%"

View File

@@ -141,81 +141,124 @@ def add_job_handler():
try:
"""Create new job and add to server render queue"""
json_string = request.form.get('json', None)
uploaded_file = request.files.get('file', None)
if not json_string:
return 'missing json data', 400
print(json_string)
# handle uploaded files
uploaded_file_local_path = None
job_dir = None
if uploaded_file and uploaded_file.filename:
logger.info(f"Receiving uploaded file {uploaded_file.filename}")
new_id = RenderJob.generate_id()
job_dir = os.path.join(server.config['UPLOAD_FOLDER'], new_id + "-" + uploaded_file.filename)
if not os.path.exists(job_dir):
os.makedirs(job_dir)
uploaded_file_local_path = os.path.join(job_dir, secure_filename(uploaded_file.filename))
uploaded_file.save(uploaded_file_local_path)
local_path = os.path.join(job_dir, secure_filename(uploaded_file.filename))
uploaded_file.save(local_path)
input_path = local_path
output_dir = os.path.join(job_dir, 'output')
os.makedirs(output_dir, exist_ok=True)
output_path = os.path.join(output_dir, os.path.basename(output_path))
# convert job input paths for uploaded files and add jobs
jobs_list = json.loads(json_string)
results = []
for job in jobs_list:
if uploaded_file_local_path:
job['input_path'] = uploaded_file_local_path
output_dir = os.path.join(job_dir, job.get('name', 'output'))
os.makedirs(output_dir, exist_ok=True)
job['output_path'] = os.path.join(output_dir, os.path.basename(job['output_path']))
remove_job_dir = len(jobs_list) == 1 # remove failed job dir for single file uploads only
add_result = add_job(job, remove_job_dir_on_failure=remove_job_dir)
results.append(add_result)
# local renders
if client == RenderQueue.host_name:
logger.info(f"Creating job locally - {input_path}")
try:
render_job = RenderJob(renderer, input_path, output_path, args, priority, job_owner, client,
notify=False, custom_id=custom_id)
RenderQueue.add_to_render_queue(render_job, force_start=force_start)
return render_job.json()
except Exception as e:
err_msg = f"Error creating job: {str(e)}"
logger.exception(err_msg)
remove_job_dir()
return err_msg, 400
# client renders
elif client in RenderQueue.render_clients:
# see if host is available
if RenderQueue.is_client_available(client):
# call uploader on remote client
try:
logger.info(f"Uploading file {input_path} to client {client}")
job_data = request.json
response = post_job_to_server(input_path, job_data)
if response.ok:
logger.info("Job submitted successfully!")
return response.json() if response.json() else "Job ok"
else:
remove_job_dir()
return 'Job rejected by client', 403
except requests.ConnectionError as e:
err_msg = f"Error submitting job to client: {client}"
logger.error(err_msg)
remove_job_dir()
return err_msg, 500
else:
# client is not available
err_msg = f"Render client '{client}' is unreachable"
logger.error(err_msg)
remove_job_dir()
return err_msg, 503
else:
err_msg = f"Unknown render client: '{client}'"
logger.error(err_msg)
remove_job_dir()
return err_msg, 400
# return any errors from results list
for response in results:
if response.get('error', None):
if len(results) == 1:
return results, response.get('code', 500)
else:
return results, 400
return results
except Exception as e:
logger.exception(f"Unknown error adding job: {e}")
remove_job_dir()
return 'unknown error', 500
def add_job(job_params, remove_job_dir_on_failure=False):
def remove_job_dir():
if remove_job_dir_on_failure and job_dir and os.path.exists(job_dir):
logger.debug(f"Removing job dir: {job_dir}")
shutil.rmtree(job_dir)
name = job_params.get("name", None)
job_owner = job_params.get("owner", None)
renderer = job_params.get("renderer", None)
input_path = job_params.get("input_path", None)
output_path = job_params.get("output_path", None)
priority = int(job_params.get('priority', 2))
args = job_params.get('args', {})
client = job_params.get('client', RenderQueue.host_name)
force_start = job_params.get('force_start', False)
custom_id = None
job_dir = None
# check for minimum render requirements
if None in [renderer, input_path, output_path]:
err_msg = 'Cannot add job: Missing required parameters'
logger.error(err_msg)
return {'error': err_msg, 'code': 400}
# local renders
if client == RenderQueue.host_name:
logger.info(f"Creating job locally - {name if name else input_path}")
try:
render_job = RenderJob(renderer, input_path, output_path, args, priority, job_owner, client,
notify=False, custom_id=custom_id, name=name)
RenderQueue.add_to_render_queue(render_job, force_start=force_start)
return render_job.json_safe_copy()
except Exception as e:
err_msg = f"Error creating job: {str(e)}"
logger.exception(err_msg)
remove_job_dir()
return {'error': err_msg, 'code': 400}
# client renders
elif client in RenderQueue.render_clients:
# see if host is available
if RenderQueue.is_client_available(client):
# call uploader on remote client
try:
logger.info(f"Uploading file {input_path} to client {client}")
job_data = request.json
response = post_job_to_server(input_path, job_data, client)
if response.ok:
logger.info("Job submitted successfully!")
return response.json() if response.json() else "Job ok"
else:
remove_job_dir()
return {'error': "Job rejected by client", 'code': 400}
except requests.ConnectionError as e:
err_msg = f"Error submitting job to client: {client}"
logger.error(err_msg)
remove_job_dir()
return {'error': err_msg, 'code': 500}
else:
# client is not available
err_msg = f"Render client '{client}' is unreachable"
logger.error(err_msg)
remove_job_dir()
return {'error': err_msg, 'code': 503}
else:
err_msg = f"Unknown render client: '{client}'"
logger.error(err_msg)
remove_job_dir()
return {'error': err_msg, 'code': 400}
@server.get('/cancel_job')
def cancel_job():
job_id = request.args.get('id', None)
@@ -269,10 +312,11 @@ def upload_file_page():
supported_renderers=RenderWorkerFactory.supported_renderers())
def post_job_to_server(input_path, job_json, client, server_port=8080):
#todo: move this to a helper file
def post_job_to_server(input_path, job_list, client, server_port=8080):
# Pack job data and submit to server
job_files = {'file': (os.path.basename(input_path), open(input_path, 'rb'), 'application/octet-stream'),
'json': (None, json.dumps(job_json), 'application/json')}
'json': (None, json.dumps(job_list), 'application/json')}
req = requests.post(f'http://{client}:{server_port}/add_job', files=job_files)
return req
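
A usage sketch of the updated helper, assuming a reachable render client named render-node-02 and a shared .blend file (all names and paths here are illustrative):

jobs = [{"name": "shot01_preview", "renderer": "blender", "output_path": "preview/####.png"},
        {"name": "shot01_final", "renderer": "blender", "output_path": "final/####.png"}]
response = post_job_to_server("/renders/shot01.blend", jobs, "render-node-02")
if response.ok:
    print(response.json())  # per-job results from the remote client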

View File

@@ -12,7 +12,8 @@ logger = logging.getLogger()
class RenderJob:
def __init__(self, renderer, input_path, output_path, args, priority=2, owner=None, client=None, notify=None, custom_id=None):
def __init__(self, renderer, input_path, output_path, args, priority=2, owner=None, client=None, notify=None,
custom_id=None, name=None):
self.id = custom_id or self.generate_id()
self.owner = owner
self.priority = priority
@@ -21,22 +22,17 @@ class RenderJob:
self.date_created = datetime.now()
self.scheduled_start = None
self.renderer = renderer
self.name = os.path.basename(input_path) + '_' + self.date_created.isoformat()
self.name = name or os.path.basename(input_path) + '_' + self.date_created.isoformat()
self.archived = False
self.worker = RenderWorkerFactory.create_worker(renderer, input_path, output_path, args)
self.worker.log_path = os.path.join(os.path.dirname(input_path), os.path.basename(input_path) + '.log')
self.worker.log_path = os.path.join(os.path.dirname(input_path), self.name + '.log')
def render_status(self):
"""Returns status of render job"""
try:
if self.scheduled_start and self.worker.status == RenderStatus.NOT_STARTED:
return RenderStatus.SCHEDULED
else:
return self.worker.status
except Exception as e:
logger.warning("render_status error: {}".format(e))
return RenderStatus.ERROR
if self.scheduled_start and self.worker.status == RenderStatus.NOT_STARTED:
return RenderStatus.SCHEDULED
else:
return self.worker.status
def file_hash(self):
return hashlib.md5(open(self.worker.input_path, 'rb').read()).hexdigest()
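
Because the worker log path is now derived from the job name rather than the input file name, two jobs that render the same source file no longer write to the same log. A minimal sketch (the renderer, paths, and job names are illustrative):

shared_input = "/uploads/abc123-shot01.blend/shot01.blend"
job_a = RenderJob("blender", shared_input, "/uploads/abc123-shot01.blend/preview/####.png",
                  args={}, name="shot01_preview")
job_b = RenderJob("blender", shared_input, "/uploads/abc123-shot01.blend/final/####.png",
                  args={}, name="shot01_final")
# job_a.worker.log_path now ends in shot01_preview.log and job_b's in shot01_final.log,
# where previously both would have ended in shot01.blend.log.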

View File

@@ -128,7 +128,7 @@ class RenderQueue:
try:
logger.debug("Saving Render History")
output = {'timestamp': datetime.now().isoformat(),
'jobs': [json.loads(j.json()) for j in cls.job_queue],
'jobs': [j.json_safe_copy() for j in cls.job_queue],
'clients': cls.render_clients}
output_path = json_path or JSON_FILE
with open(output_path, 'w') as f:
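
json_safe_copy() itself is not shown in this diff; presumably it returns a plain dict that json.dump() can serialize directly, replacing the old json.loads(j.json()) round trip. A hypothetical sketch of such a helper on RenderJob:

def json_safe_copy(self):
    # Hypothetical implementation: primitive values only, so save_render_history()
    # can hand the result straight to json.dump().
    return {
        "id": self.id,
        "name": self.name,
        "owner": self.owner,
        "priority": self.priority,
        "renderer": self.renderer,
        "status": str(self.render_status()),
        "date_created": self.date_created.isoformat(),
    }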