10 Commits

Author SHA1 Message Date
Brett Williams
a464910426 Always add render engine to Blender args 2024-08-08 23:20:38 -05:00
Brett Williams
a4ff36ac56 Add render engines to system_info 2024-08-08 23:13:31 -05:00
51a5a63944 Use pubsub messages instead of a background thread to process changes (#92)
* Use pubsub messages instead of a background thread to process changes to the RenderQueue

* Misc logging improvements
2024-08-08 23:01:26 -05:00
3600eeb21b Refactor: Move all initialization logic out of api_server and into init (#91)
* Zeroconf logging improvements

* Ignore RuntimeErrors in background threads - Prevents issues during shutdown

* Migrate start up code from api_server.py to init.py

* Add error handlers to the API server to handle detached instances

* Integrate RenderQueue eval loop into RenderQueue object

* Silently catch RuntimeErrors on evaluate_queue

* Stop background queue updates in prepare_for_shutdown
2024-08-08 04:47:22 -05:00
6afb6e65a6 Integrate watchdog into render worker (#88)
* Add a watchdog to base_worker

* Logging cleanup

* Prevent multiple watchdogs from running if render process restarts

* Add process timeout parameter to Config

* Refactor

* Add error handling to process output parsing

* Fix issue where start_time was not getting set consistently
2024-08-06 10:48:24 -05:00
Brett Williams
90d5e9b7af Misc logging cleanup 2024-08-05 10:57:56 -05:00
4df41a2079 Download frames from subjobs as frames are completed (#87)
* Add a frame complete notification to BaseWorker and distributed_job_manager.py

* Add API to download individual files to API server and ServerProxy

* Rename subjob notification API and add download_missing_frames_from_subjob

* Subjobs will now notify parent when a frame is complete

* Fix missed rename

* Add some misc logging

* Better error handling

* Fix frame download file path issue

* Download missing frames at job completion and misc cleanup

* Misc cleanup

* Code cleanup
2024-08-04 21:30:10 -05:00
1cdb7810bf New PreviewManager to handle generating previews asynchronously (#86)
* Add PreviewManager

* Refactoring and better error handling

* Integrate PreviewManager into api_server.py

* Integrate PreviewManager into distributed_job_manager.py

* Add method to preview_manager.py to delete previews and integrate it into api_server

* Misc logging improvements

* Misc code cleanup

* Replace existing preview on job completion - Minor code fixes
2024-08-04 16:45:46 -05:00
Brett Williams
21011e47ca Fix issue where tests would never complete correctly 2024-08-04 11:48:36 -05:00
Brett Williams
86977b9d6d Fix issue where custom job name was being ignored 2024-08-04 11:47:56 -05:00
20 changed files with 713 additions and 334 deletions

View File

@@ -3,7 +3,7 @@ update_engines_on_launch: true
max_content_path: 100000000 max_content_path: 100000000
server_log_level: info server_log_level: info
log_buffer_length: 250 log_buffer_length: 250
subjob_connection_timeout: 120 worker_process_timeout: 120
flask_log_level: error flask_log_level: error
flask_debug_enable: false flask_debug_enable: false
queue_eval_seconds: 1 queue_eval_seconds: 1

View File

@@ -1,5 +1,5 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
from src.api.api_server import start_server from init import run
if __name__ == '__main__': if __name__ == '__main__':
start_server() run(server_only=True)

View File

@@ -49,12 +49,11 @@ def handle_uploaded_project_files(request, jobs_list, upload_directory):
raise ValueError(f"Error downloading file from URL: {project_url}") raise ValueError(f"Error downloading file from URL: {project_url}")
elif local_path and os.path.exists(local_path): elif local_path and os.path.exists(local_path):
referred_name = os.path.basename(local_path) referred_name = os.path.basename(local_path)
else: else:
raise ValueError("Cannot find any valid project paths") raise ValueError("Cannot find any valid project paths")
# Prepare the local filepath # Prepare the local filepath
cleaned_path_name = os.path.splitext(referred_name)[0].replace(' ', '_') cleaned_path_name = jobs_list[0].get('name', os.path.splitext(referred_name)[0]).replace(' ', '-')
job_dir = os.path.join(upload_directory, '-'.join( job_dir = os.path.join(upload_directory, '-'.join(
[datetime.now().strftime("%Y.%m.%d_%H.%M.%S"), renderer, cleaned_path_name])) [datetime.now().strftime("%Y.%m.%d_%H.%M.%S"), renderer, cleaned_path_name]))
os.makedirs(job_dir, exist_ok=True) os.makedirs(job_dir, exist_ok=True)

View File

@@ -2,14 +2,12 @@
import concurrent.futures import concurrent.futures
import json import json
import logging import logging
import multiprocessing
import os import os
import pathlib import pathlib
import shutil import shutil
import socket import socket
import ssl import ssl
import tempfile import tempfile
import threading
import time import time
from datetime import datetime from datetime import datetime
from zipfile import ZipFile from zipfile import ZipFile
@@ -17,19 +15,19 @@ from zipfile import ZipFile
import psutil import psutil
import yaml import yaml
from flask import Flask, request, send_file, after_this_request, Response, redirect, url_for, abort from flask import Flask, request, send_file, after_this_request, Response, redirect, url_for, abort
from sqlalchemy.orm.exc import DetachedInstanceError
from src.api.add_job_helpers import handle_uploaded_project_files, process_zipped_project from src.api.add_job_helpers import handle_uploaded_project_files, process_zipped_project
from src.api.serverproxy_manager import ServerProxyManager from src.api.preview_manager import PreviewManager
from src.distributed_job_manager import DistributedJobManager from src.distributed_job_manager import DistributedJobManager
from src.engines.core.base_worker import string_to_status, RenderStatus from src.engines.core.base_worker import string_to_status, RenderStatus
from src.engines.engine_manager import EngineManager from src.engines.engine_manager import EngineManager
from src.render_queue import RenderQueue, JobNotFoundError from src.render_queue import RenderQueue, JobNotFoundError
from src.utilities.benchmark import cpu_benchmark, disk_io_benchmark
from src.utilities.config import Config from src.utilities.config import Config
from src.utilities.misc_helper import system_safe_path, current_system_os, current_system_cpu, \ from src.utilities.misc_helper import system_safe_path, current_system_os, current_system_cpu, \
current_system_os_version, num_to_alphanumeric current_system_os_version, num_to_alphanumeric
from src.utilities.server_helper import generate_thumbnail_for_job
from src.utilities.zeroconf_server import ZeroconfServer from src.utilities.zeroconf_server import ZeroconfServer
from src.utilities.benchmark import cpu_benchmark, disk_io_benchmark
logger = logging.getLogger() logger = logging.getLogger()
server = Flask(__name__) server = Flask(__name__)
@@ -39,6 +37,29 @@ categories = [RenderStatus.RUNNING, RenderStatus.ERROR, RenderStatus.NOT_STARTED
RenderStatus.COMPLETED, RenderStatus.CANCELLED] RenderStatus.COMPLETED, RenderStatus.CANCELLED]
# -- Error Handlers --
@server.errorhandler(JobNotFoundError)
def handle_job_not_found(job_error):
return str(job_error), 400
@server.errorhandler(DetachedInstanceError)
def handle_detached_instance(error):
# logger.debug(f"detached instance: {error}")
return "Unavailable", 503
@server.errorhandler(Exception)
def handle_general_error(general_error):
err_msg = f"Server error: {general_error}"
logger.error(err_msg)
return err_msg, 500
# -- Jobs --
def sorted_jobs(all_jobs, sort_by_date=True): def sorted_jobs(all_jobs, sort_by_date=True):
if not sort_by_date: if not sort_by_date:
sorted_job_list = [] sorted_job_list = []
@@ -60,9 +81,11 @@ def jobs_json():
job_cache_int = int(json.dumps(all_jobs).__hash__()) job_cache_int = int(json.dumps(all_jobs).__hash__())
job_cache_token = num_to_alphanumeric(job_cache_int) job_cache_token = num_to_alphanumeric(job_cache_int)
return {'jobs': all_jobs, 'token': job_cache_token} return {'jobs': all_jobs, 'token': job_cache_token}
except DetachedInstanceError as e:
raise e
except Exception as e: except Exception as e:
logger.exception(f"Exception fetching jobs_json: {e}") logger.error(f"Error fetching jobs_json: {e}")
return {}, 500 raise e
@server.get('/api/jobs_long_poll') @server.get('/api/jobs_long_poll')
@@ -78,50 +101,40 @@ def long_polling_jobs():
if time.time() - start_time > 30: if time.time() - start_time > 30:
return {}, 204 return {}, 204
time.sleep(1) time.sleep(1)
except DetachedInstanceError as e:
raise e
except Exception as e: except Exception as e:
logger.exception(f"Exception fetching long_polling_jobs: {e}") logger.error(f"Error fetching long_polling_jobs: {e}")
return {}, 500 raise e
@server.route('/api/job/<job_id>/thumbnail') @server.route('/api/job/<job_id>/thumbnail')
def job_thumbnail(job_id): def job_thumbnail(job_id):
try:
big_thumb = request.args.get('size', False) == "big" big_thumb = request.args.get('size', False) == "big"
video_ok = request.args.get('video_ok', False) video_ok = request.args.get('video_ok', False)
found_job = RenderQueue.job_with_id(job_id, none_ok=True) found_job = RenderQueue.job_with_id(job_id, none_ok=False)
if found_job:
os.makedirs(server.config['THUMBS_FOLDER'], exist_ok=True) # trigger a thumbnail update - just in case
thumb_video_path = os.path.join(server.config['THUMBS_FOLDER'], found_job.id + '.mp4') PreviewManager.update_previews_for_job(found_job, wait_until_completion=True, timeout=60)
thumb_image_path = os.path.join(server.config['THUMBS_FOLDER'], found_job.id + '.jpg') previews = PreviewManager.get_previews_for_job(found_job)
big_video_path = os.path.join(server.config['THUMBS_FOLDER'], found_job.id + '_big.mp4') all_previews_list = previews.get('output', previews.get('input', []))
big_image_path = os.path.join(server.config['THUMBS_FOLDER'], found_job.id + '_big.jpg')
# generate regular thumb if it doesn't exist video_previews = [x for x in all_previews_list if x['kind'] == 'video']
if not os.path.exists(thumb_video_path) and not os.path.exists(thumb_video_path + '_IN-PROGRESS') and \ image_previews = [x for x in all_previews_list if x['kind'] == 'image']
found_job.status not in [RenderStatus.CANCELLED, RenderStatus.ERROR]: filtered_list = video_previews if video_previews and video_ok else image_previews
generate_thumbnail_for_job(found_job, thumb_video_path, thumb_image_path, max_width=240)
# generate big thumb if it doesn't exist # todo - sort by size or other metrics here
if not os.path.exists(big_video_path) and not os.path.exists(big_image_path + '_IN-PROGRESS') and \ if filtered_list:
found_job.status not in [RenderStatus.CANCELLED, RenderStatus.ERROR]: preview_to_send = filtered_list[0]
generate_thumbnail_for_job(found_job, big_video_path, big_image_path, max_width=800) mime_types = {'image': 'image/jpeg', 'video': 'video/mp4'}
file_mime_type = mime_types.get(preview_to_send['kind'], 'unknown')
# generated videos return send_file(preview_to_send['filename'], mimetype=file_mime_type)
if video_ok: except Exception as e:
if big_thumb and os.path.exists(big_video_path) and not os.path.exists( logger.error(f'Error getting thumbnail: {e}')
big_video_path + '_IN-PROGRESS'): return f'Error getting thumbnail: {e}', 500
return send_file(big_video_path, mimetype="video/mp4") return "No thumbnail available", 404
elif os.path.exists(thumb_video_path) and not os.path.exists(thumb_video_path + '_IN-PROGRESS'):
return send_file(thumb_video_path, mimetype="video/mp4")
# Generated thumbs
if big_thumb and os.path.exists(big_image_path):
return send_file(big_image_path, mimetype='image/jpeg')
elif os.path.exists(thumb_image_path):
return send_file(thumb_image_path, mimetype='image/jpeg')
return found_job.status.value, 200
return found_job.status.value, 404
# Get job file routing # Get job file routing
@@ -146,22 +159,17 @@ def filtered_jobs_json(status_val):
return f'Cannot find jobs with status {status_val}', 400 return f'Cannot find jobs with status {status_val}', 400
@server.post('/api/job/<job_id>/notify_parent_of_status_change') @server.post('/api/job/<job_id>/send_subjob_update_notification')
def subjob_status_change(job_id): def subjob_update_notification(job_id):
try: try:
subjob_details = request.json subjob_details = request.json
logger.info(f"Subjob to job id: {job_id} is now {subjob_details['status']}") logger.info(f"Subjob to job id: {job_id} is now {subjob_details['status']}")
DistributedJobManager.handle_subjob_status_change(RenderQueue.job_with_id(job_id), subjob_data=subjob_details) DistributedJobManager.handle_subjob_update_notification(RenderQueue.job_with_id(job_id), subjob_data=subjob_details)
return Response(status=200) return Response(status=200)
except JobNotFoundError: except JobNotFoundError:
return "Job not found", 404 return "Job not found", 404
@server.errorhandler(JobNotFoundError)
def handle_job_not_found(job_error):
return f'Cannot find job with ID {job_error.job_id}', 400
@server.get('/api/job/<job_id>') @server.get('/api/job/<job_id>')
def get_job_status(job_id): def get_job_status(job_id):
return RenderQueue.job_with_id(job_id).json() return RenderQueue.job_with_id(job_id).json()
@@ -180,7 +188,22 @@ def get_job_logs(job_id):
@server.get('/api/job/<job_id>/file_list') @server.get('/api/job/<job_id>/file_list')
def get_file_list(job_id): def get_file_list(job_id):
return RenderQueue.job_with_id(job_id).file_list() return [os.path.basename(x) for x in RenderQueue.job_with_id(job_id).file_list()]
@server.route('/api/job/<job_id>/download')
def download_file(job_id):
requested_filename = request.args.get('filename')
if not requested_filename:
return 'Filename required', 400
found_job = RenderQueue.job_with_id(job_id)
for job_filename in found_job.file_list():
if os.path.basename(job_filename).lower() == requested_filename.lower():
return send_file(job_filename, as_attachment=True, )
return f"File '{requested_filename}' not found", 404
@server.route('/api/job/<job_id>/download_all') @server.route('/api/job/<job_id>/download_all')
@@ -305,14 +328,10 @@ def delete_job(job_id):
if server.config['UPLOAD_FOLDER'] in output_dir and os.path.exists(output_dir): if server.config['UPLOAD_FOLDER'] in output_dir and os.path.exists(output_dir):
shutil.rmtree(output_dir) shutil.rmtree(output_dir)
# Remove any thumbnails try:
for filename in os.listdir(server.config['THUMBS_FOLDER']): PreviewManager.delete_previews_for_job(found_job)
if job_id in filename: except Exception as e:
os.remove(os.path.join(server.config['THUMBS_FOLDER'], filename)) logger.error(f"Error deleting previews for {found_job}: {e}")
thumb_path = os.path.join(server.config['THUMBS_FOLDER'], found_job.id + '.mp4')
if os.path.exists(thumb_path):
os.remove(thumb_path)
# See if we own the project_dir (i.e. was it uploaded) # See if we own the project_dir (i.e. was it uploaded)
project_dir = os.path.dirname(os.path.dirname(found_job.input_path)) project_dir = os.path.dirname(os.path.dirname(found_job.input_path))
@@ -489,60 +508,24 @@ def get_disk_benchmark():
return {'write_speed': results[0], 'read_speed': results[-1]} return {'write_speed': results[0], 'read_speed': results[-1]}
def start_server(): def start_server(hostname=None):
def eval_loop(delay_sec=1):
while True:
RenderQueue.evaluate_queue()
time.sleep(delay_sec)
# get hostname # get hostname
if not hostname:
local_hostname = socket.gethostname() local_hostname = socket.gethostname()
local_hostname = local_hostname + (".local" if not local_hostname.endswith(".local") else "") hostname = local_hostname + (".local" if not local_hostname.endswith(".local") else "")
# load flask settings # load flask settings
server.config['HOSTNAME'] = local_hostname server.config['HOSTNAME'] = hostname
server.config['PORT'] = int(Config.port_number) server.config['PORT'] = int(Config.port_number)
server.config['UPLOAD_FOLDER'] = system_safe_path(os.path.expanduser(Config.upload_folder)) server.config['UPLOAD_FOLDER'] = system_safe_path(os.path.expanduser(Config.upload_folder))
server.config['THUMBS_FOLDER'] = system_safe_path(os.path.join(os.path.expanduser(Config.upload_folder), 'thumbs'))
server.config['MAX_CONTENT_PATH'] = Config.max_content_path server.config['MAX_CONTENT_PATH'] = Config.max_content_path
server.config['enable_split_jobs'] = Config.enable_split_jobs server.config['enable_split_jobs'] = Config.enable_split_jobs
# Setup directory for saving engines to
EngineManager.engines_path = system_safe_path(os.path.join(os.path.join(os.path.expanduser(Config.upload_folder),
'engines')))
os.makedirs(EngineManager.engines_path, exist_ok=True)
# Debug info
logger.debug(f"Upload directory: {server.config['UPLOAD_FOLDER']}")
logger.debug(f"Thumbs directory: {server.config['THUMBS_FOLDER']}")
logger.debug(f"Engines directory: {EngineManager.engines_path}")
# disable most Flask logging # disable most Flask logging
flask_log = logging.getLogger('werkzeug') flask_log = logging.getLogger('werkzeug')
flask_log.setLevel(Config.flask_log_level.upper()) flask_log.setLevel(Config.flask_log_level.upper())
# check for updates for render engines if configured or on first launch logger.debug('Starting API server')
if Config.update_engines_on_launch or not EngineManager.get_engines(): server.run(host='0.0.0.0', port=server.config['PORT'], debug=Config.flask_debug_enable, use_reloader=False,
EngineManager.update_all_engines() threaded=True)
# Set up the RenderQueue object
RenderQueue.load_state(database_directory=server.config['UPLOAD_FOLDER'])
ServerProxyManager.subscribe_to_listener()
DistributedJobManager.subscribe_to_listener()
thread = threading.Thread(target=eval_loop, kwargs={'delay_sec': Config.queue_eval_seconds}, daemon=True)
thread.start()
logger.info(f"Starting Zordon Render Server - Hostname: '{server.config['HOSTNAME']}:'")
ZeroconfServer.configure("_zordon._tcp.local.", server.config['HOSTNAME'], server.config['PORT'])
ZeroconfServer.properties = {'system_cpu': current_system_cpu(), 'system_cpu_cores': multiprocessing.cpu_count(),
'system_os': current_system_os(),
'system_os_version': current_system_os_version()}
ZeroconfServer.start()
try:
server.run(host='0.0.0.0', port=server.config['PORT'], debug=Config.flask_debug_enable,
use_reloader=False, threaded=True)
finally:
RenderQueue.save_state()
ZeroconfServer.stop()

113
src/api/preview_manager.py Normal file
View File

@@ -0,0 +1,113 @@
import logging
import os
import subprocess
import threading
from pathlib import Path
from src.utilities.ffmpeg_helper import generate_thumbnail, save_first_frame
logger = logging.getLogger()

# NOTE: every entry must carry a leading dot, because extensions are compared
# against os.path.splitext(...)[-1], which includes the '.'. ('.mkv' was
# previously listed as 'mkv' and therefore never matched.)
supported_video_formats = ['.mp4', '.mov', '.avi', '.mpg', '.mpeg', '.mxf', '.m4v', '.mkv']
supported_image_formats = ['.jpg', '.png', '.exr', '.tif']
class PreviewManager:
    """Generates and serves image/video previews for render jobs.

    Previews are generated asynchronously in background threads and written to
    ``storage_path`` with filenames of the form
    ``<job.id>-<label>-<max_width>.<ext>`` where label is 'output' or 'input'.
    """
    # Directory previews are written to; must be assigned before use.
    storage_path = None
    # Maps job.id -> worker Thread, so at most one generation runs per job.
    _running_jobs = {}
    @classmethod
    def __generate_job_preview_worker(cls, job, replace_existing=False, max_width=320):
        """Worker body that generates previews for *job* (runs on a thread).

        Prefers the job's rendered output files; falls back to the input file.
        Writes a .jpg still and, when a video source exists, an .mp4 preview.

        Args:
            job: Render job providing ``id``, ``file_list()`` and ``input_path``.
            replace_existing (bool): Remove existing previews before generating.
            max_width (int): Maximum pixel width of generated previews.
        """
        # Determine best source file to use for thumbs
        job_file_list = job.file_list()
        source_files = job_file_list if job_file_list else [job.input_path]
        preview_label = "output" if job_file_list else "input"
        # filter by type (extensions from splitext include the leading dot)
        found_image_files = [f for f in source_files if os.path.splitext(f)[-1].lower() in supported_image_formats]
        found_video_files = [f for f in source_files if os.path.splitext(f)[-1].lower() in supported_video_formats]
        # check if we even have any valid files to work from
        if source_files and not found_video_files and not found_image_files:
            logger.warning(f"No valid image or video files found in files from job: {job}")
            return
        os.makedirs(cls.storage_path, exist_ok=True)
        base_path = os.path.join(cls.storage_path, f"{job.id}-{preview_label}-{max_width}")
        preview_video_path = base_path + '.mp4'
        preview_image_path = base_path + '.jpg'
        if replace_existing:
            for x in [preview_image_path, preview_video_path]:
                try:
                    os.remove(x)
                except OSError:
                    pass
        # Generate image previews
        if (found_video_files or found_image_files) and not os.path.exists(preview_image_path):
            try:
                path_of_source = found_image_files[-1] if found_image_files else found_video_files[-1]
                logger.debug(f"Generating image preview for {path_of_source}")
                save_first_frame(source_path=path_of_source, dest_path=preview_image_path, max_width=max_width)
                logger.debug(f"Successfully created image preview for {path_of_source}")
            except Exception as e:
                logger.error(f"Error generating image preview for {job}: {e}")
        # Generate video previews
        if found_video_files and not os.path.exists(preview_video_path):
            try:
                path_of_source = found_video_files[0]
                logger.debug(f"Generating video preview for {path_of_source}")
                generate_thumbnail(source_path=path_of_source, dest_path=preview_video_path, max_width=max_width)
                logger.debug(f"Successfully created video preview for {path_of_source}")
            except subprocess.CalledProcessError as e:
                logger.error(f"Error generating video preview for {job}: {e}")
    @classmethod
    def update_previews_for_job(cls, job, replace_existing=False, wait_until_completion=False, timeout=None):
        """Start background preview generation for *job* (or join the running one).

        If a worker thread for this job is already alive, no new one is started.

        Args:
            job: The render job to generate previews for.
            replace_existing (bool): Forwarded to the worker; replace previews.
            wait_until_completion (bool): Block until the worker finishes.
            timeout (float | None): Max seconds to wait when blocking.
        """
        job_thread = cls._running_jobs.get(job.id)
        if job_thread and job_thread.is_alive():
            logger.debug(f'Preview generation job already running for {job}')
        else:
            job_thread = threading.Thread(target=cls.__generate_job_preview_worker, args=(job, replace_existing,))
            job_thread.start()
            cls._running_jobs[job.id] = job_thread
        if wait_until_completion:
            job_thread.join(timeout=timeout)
    @classmethod
    def get_previews_for_job(cls, job):
        """Return previews on disk for *job*, grouped by label.

        Returns:
            dict: e.g. ``{'output': [{'filename': ..., 'width': '320',
            'kind': 'image'|'video'|'unknown'}, ...]}``; empty dict when the
            storage directory does not exist or no previews match.
        """
        results = {}
        try:
            directory_path = Path(cls.storage_path)
            preview_files_for_job = [f for f in directory_path.iterdir() if f.is_file() and f.name.startswith(job.id)]
            for preview_filename in preview_files_for_job:
                try:
                    # Filenames look like '<job.id>-<label>-<width>.<ext>'; strip
                    # the extension before taking the last '-' segment so width is
                    # a bare number (previously this yielded e.g. '320.jpg').
                    pixel_width = os.path.splitext(str(preview_filename))[0].split('-')[-1]
                    # NOTE(review): assumes job.id contains no '-' — confirm id format.
                    preview_label = str(os.path.basename(preview_filename)).split('-')[1]
                    extension = os.path.splitext(preview_filename)[-1].lower()
                    kind = 'video' if extension in supported_video_formats else \
                        'image' if extension in supported_image_formats else 'unknown'
                    results[preview_label] = results.get(preview_label, [])
                    results[preview_label].append({'filename': str(preview_filename), 'width': pixel_width, 'kind': kind})
                except IndexError:  # ignore invalid filenames
                    pass
        except FileNotFoundError:
            pass
        return results
    @classmethod
    def delete_previews_for_job(cls, job):
        """Delete every preview file on disk belonging to *job*.

        Failures to remove individual files are logged and do not abort the loop.
        """
        all_previews = cls.get_previews_for_job(job)
        flattened_list = [item for sublist in all_previews.values() for item in sublist]
        for preview in flattened_list:
            try:
                logger.debug(f"Removing preview: {preview['filename']}")
                os.remove(preview['filename'])
            except OSError as e:
                logger.error(f"Error removing preview '{preview.get('filename')}': {e}")

View File

@@ -171,19 +171,19 @@ class RenderServerProxy:
def get_all_engines(self): def get_all_engines(self):
return self.request_data('all_engines') return self.request_data('all_engines')
def notify_parent_of_status_change(self, parent_id, subjob): def send_subjob_update_notification(self, parent_id, subjob):
""" """
Notifies the parent job of a status change in a subjob. Notifies the parent job of an update in a subjob.
Args: Args:
parent_id (str): The ID of the parent job. parent_id (str): The ID of the parent job.
subjob (Job): The subjob that has changed status. subjob (Job): The subjob that has updated.
Returns: Returns:
Response: The response from the server. Response: The response from the server.
""" """
hostname = LOOPBACK if self.is_localhost else self.hostname hostname = LOOPBACK if self.is_localhost else self.hostname
return requests.post(f'http://{hostname}:{self.port}/api/job/{parent_id}/notify_parent_of_status_change', return requests.post(f'http://{hostname}:{self.port}/api/job/{parent_id}/send_subjob_update_notification',
json=subjob.json()) json=subjob.json())
def post_job_to_server(self, file_path, job_list, callback=None): def post_job_to_server(self, file_path, job_list, callback=None):
@@ -232,19 +232,27 @@ class RenderServerProxy:
except Exception as e: except Exception as e:
logger.error(f"An error occurred: {e}") logger.error(f"An error occurred: {e}")
def get_job_files(self, job_id, save_path): def get_job_files_list(self, job_id):
return self.request_data(f"job/{job_id}/file_list")
def download_all_job_files(self, job_id, save_path):
hostname = LOOPBACK if self.is_localhost else self.hostname hostname = LOOPBACK if self.is_localhost else self.hostname
url = f"http://{hostname}:{self.port}/api/job/{job_id}/download_all" url = f"http://{hostname}:{self.port}/api/job/{job_id}/download_all"
return self.download_file(url, filename=save_path) return self.__download_file_from_url(url, output_filepath=save_path)
def download_job_file(self, job_id, job_filename, save_path):
hostname = LOOPBACK if self.is_localhost else self.hostname
url = f"http://{hostname}:{self.port}/api/job/{job_id}/download?filename={job_filename}"
return self.__download_file_from_url(url, output_filepath=save_path)
@staticmethod @staticmethod
def download_file(url, filename): def __download_file_from_url(url, output_filepath):
with requests.get(url, stream=True) as r: with requests.get(url, stream=True) as r:
r.raise_for_status() r.raise_for_status()
with open(filename, 'wb') as f: with open(output_filepath, 'wb') as f:
for chunk in r.iter_content(chunk_size=8192): for chunk in r.iter_content(chunk_size=8192):
f.write(chunk) f.write(chunk)
return filename return output_filepath
# --- Renderer --- # # --- Renderer --- #

View File

@@ -10,9 +10,11 @@ import requests
from plyer import notification from plyer import notification
from pubsub import pub from pubsub import pub
from src.api.preview_manager import PreviewManager
from src.api.server_proxy import RenderServerProxy from src.api.server_proxy import RenderServerProxy
from src.engines.engine_manager import EngineManager from src.engines.engine_manager import EngineManager
from src.render_queue import RenderQueue from src.render_queue import RenderQueue
from src.utilities.config import Config
from src.utilities.misc_helper import get_file_size_human from src.utilities.misc_helper import get_file_size_human
from src.utilities.status_utils import RenderStatus, string_to_status from src.utilities.status_utils import RenderStatus, string_to_status
from src.utilities.zeroconf_server import ZeroconfServer from src.utilities.zeroconf_server import ZeroconfServer
@@ -32,6 +34,43 @@ class DistributedJobManager:
This should be called once, typically during the initialization phase. This should be called once, typically during the initialization phase.
""" """
pub.subscribe(cls.__local_job_status_changed, 'status_change') pub.subscribe(cls.__local_job_status_changed, 'status_change')
pub.subscribe(cls.__local_job_frame_complete, 'frame_complete')
@classmethod
def __local_job_frame_complete(cls, job_id, frame_number, update_interval=5):
"""
Responds to the 'frame_complete' pubsub message for local jobs.
Parameters:
job_id (str): The ID of the job that has changed status.
old_status (str): The previous status of the job.
new_status (str): The new (current) status of the job.
Note: Do not call directly. Instead, call via the 'frame_complete' pubsub message.
"""
render_job = RenderQueue.job_with_id(job_id, none_ok=True)
if not render_job: # ignore jobs not in the queue
return
logger.debug(f"Job {job_id} has completed frame #{frame_number}")
replace_existing_previews = (frame_number % update_interval) == 0
cls.__job_update_shared(render_job, replace_existing_previews)
@classmethod
def __job_update_shared(cls, render_job, replace_existing_previews=False):
# update previews
PreviewManager.update_previews_for_job(job=render_job, replace_existing=replace_existing_previews)
# notify parent to allow individual frames to be copied instead of waiting until the end
if render_job.parent:
parent_id, parent_hostname = render_job.parent.split('@')[0], render_job.parent.split('@')[-1]
try:
logger.debug(f'Job {render_job.id} updating parent {parent_id}@{parent_hostname}')
RenderServerProxy(parent_hostname).send_subjob_update_notification(parent_id, render_job)
except Exception as e:
logger.error(f"Error notifying parent {parent_hostname} about update in subjob {render_job.id}: {e}")
@classmethod @classmethod
def __local_job_status_changed(cls, job_id, old_status, new_status): def __local_job_status_changed(cls, job_id, old_status, new_status):
@@ -52,15 +91,15 @@ class DistributedJobManager:
return return
logger.debug(f"Job {job_id} status change: {old_status} -> {new_status}") logger.debug(f"Job {job_id} status change: {old_status} -> {new_status}")
if render_job.parent: # If local job is a subjob from a remote server
parent_id, hostname = render_job.parent.split('@')[0], render_job.parent.split('@')[-1]
RenderServerProxy(hostname).notify_parent_of_status_change(parent_id=parent_id, subjob=render_job)
# handle cancelling all the children cls.__job_update_shared(render_job, replace_existing_previews=(render_job.status == RenderStatus.COMPLETED))
elif render_job.children and new_status in [RenderStatus.CANCELLED, RenderStatus.ERROR]:
# Handle children
if render_job.children:
if new_status in [RenderStatus.CANCELLED, RenderStatus.ERROR]: # Cancel children if necessary
for child in render_job.children: for child in render_job.children:
child_id, hostname = child.split('@') child_id, child_hostname = child.split('@')
RenderServerProxy(hostname).cancel_job(child_id, confirm=True) RenderServerProxy(child_hostname).cancel_job(child_id, confirm=True)
# UI Notifications # UI Notifications
try: try:
@@ -97,8 +136,7 @@ class DistributedJobManager:
""" """
Creates render jobs. Creates render jobs.
This method takes a list of job data, a local path to a loaded project, and a job directory. It creates a render This method takes job data and a local path to a loaded project. It creates and returns a new render job.
job for each job data in the list and appends the result to a list. The list of results is then returned.
Args: Args:
job_data (dict): Job data. job_data (dict): Job data.
@@ -134,6 +172,7 @@ class DistributedJobManager:
worker.priority = int(job_data.get('priority', worker.priority)) worker.priority = int(job_data.get('priority', worker.priority))
worker.start_frame = int(job_data.get("start_frame", worker.start_frame)) worker.start_frame = int(job_data.get("start_frame", worker.start_frame))
worker.end_frame = int(job_data.get("end_frame", worker.end_frame)) worker.end_frame = int(job_data.get("end_frame", worker.end_frame))
worker.watchdog_timeout = Config.worker_process_timeout
worker.hostname = socket.gethostname() worker.hostname = socket.gethostname()
# determine if we can / should split the job # determine if we can / should split the job
@@ -143,6 +182,7 @@ class DistributedJobManager:
logger.debug("Not splitting into subjobs") logger.debug("Not splitting into subjobs")
RenderQueue.add_to_render_queue(worker, force_start=job_data.get('force_start', False)) RenderQueue.add_to_render_queue(worker, force_start=job_data.get('force_start', False))
PreviewManager.update_previews_for_job(worker)
return worker return worker
@@ -151,9 +191,9 @@ class DistributedJobManager:
# -------------------------------------------- # --------------------------------------------
@classmethod @classmethod
def handle_subjob_status_change(cls, local_job, subjob_data): def handle_subjob_update_notification(cls, local_job, subjob_data):
""" """
Responds to a status change from a remote subjob and triggers the creation or modification of subjobs as needed. Responds to a notification from a remote subjob and the host requests any subsequent updates from the subjob.
Args: Args:
local_job (BaseRenderWorker): The local parent job worker. local_job (BaseRenderWorker): The local parent job worker.
@@ -162,27 +202,40 @@ class DistributedJobManager:
subjob_status = string_to_status(subjob_data['status']) subjob_status = string_to_status(subjob_data['status'])
subjob_id = subjob_data['id'] subjob_id = subjob_data['id']
subjob_hostname = next((hostname.split('@')[1] for hostname in local_job.children if subjob_hostname = subjob_data['hostname']
hostname.split('@')[0] == subjob_id), None) subjob_key = f'{subjob_id}@{subjob_hostname}'
local_job.children[f'{subjob_id}@{subjob_hostname}'] = subjob_data old_status = local_job.children.get(subjob_key, {}).get('status')
local_job.children[subjob_key] = subjob_data
logname = f"{local_job.id}:{subjob_id}@{subjob_hostname}" logname = f"<Parent: {local_job.id} | Child: {subjob_key}>"
if old_status != subjob_status.value:
logger.debug(f"Subjob status changed: {logname} -> {subjob_status.value}") logger.debug(f"Subjob status changed: {logname} -> {subjob_status.value}")
# Download complete or partial render jobs cls.download_missing_frames_from_subjob(local_job, subjob_id, subjob_hostname)
if subjob_status in [RenderStatus.COMPLETED, RenderStatus.CANCELLED, RenderStatus.ERROR] and \
subjob_data['file_count']:
download_result = cls.download_from_subjob(local_job, subjob_id, subjob_hostname)
if not download_result:
# todo: handle error
logger.error(f"Unable to download subjob files from {logname} with status {subjob_status.value}")
if subjob_status == RenderStatus.CANCELLED or subjob_status == RenderStatus.ERROR:
# todo: determine missing frames and schedule new job
pass
@staticmethod @staticmethod
def download_from_subjob(local_job, subjob_id, subjob_hostname): def download_missing_frames_from_subjob(local_job, subjob_id, subjob_hostname):
try:
local_files = [os.path.basename(x) for x in local_job.file_list()]
subjob_proxy = RenderServerProxy(subjob_hostname)
subjob_files = subjob_proxy.get_job_files_list(job_id=subjob_id) or []
for subjob_filename in subjob_files:
if subjob_filename not in local_files:
try:
logger.debug(f"Downloading new file '{subjob_filename}' from {subjob_hostname}")
local_save_path = os.path.join(os.path.dirname(local_job.output_path), subjob_filename)
subjob_proxy.download_job_file(job_id=subjob_id, job_filename=subjob_filename,
save_path=local_save_path)
logger.debug(f'Downloaded successfully - {local_save_path}')
except Exception as e:
logger.error(f"Error downloading file '{subjob_filename}' from {subjob_hostname}: {e}")
except Exception as e:
logger.exception(f'Uncaught exception while trying to download from subjob: {e}')
@staticmethod
def download_all_from_subjob(local_job, subjob_id, subjob_hostname):
""" """
Downloads and extracts files from a completed subjob on a remote server. Downloads and extracts files from a completed subjob on a remote server.
@@ -203,7 +256,7 @@ class DistributedJobManager:
try: try:
local_job.children[child_key]['download_status'] = 'working' local_job.children[child_key]['download_status'] = 'working'
logger.info(f"Downloading completed subjob files from {subjob_hostname} to localhost") logger.info(f"Downloading completed subjob files from {subjob_hostname} to localhost")
RenderServerProxy(subjob_hostname).get_job_files(subjob_id, zip_file_path) RenderServerProxy(subjob_hostname).download_all_job_files(subjob_id, zip_file_path)
logger.info(f"File transfer complete for {logname} - Transferred {get_file_size_human(zip_file_path)}") logger.info(f"File transfer complete for {logname} - Transferred {get_file_size_human(zip_file_path)}")
except Exception as e: except Exception as e:
logger.error(f"Error downloading files from remote server: {e}") logger.error(f"Error downloading files from remote server: {e}")
@@ -227,6 +280,7 @@ class DistributedJobManager:
@classmethod @classmethod
def wait_for_subjobs(cls, local_job): def wait_for_subjobs(cls, local_job):
# todo: rewrite this method
logger.debug(f"Waiting for subjobs for job {local_job}") logger.debug(f"Waiting for subjobs for job {local_job}")
local_job.status = RenderStatus.WAITING_FOR_SUBJOBS local_job.status = RenderStatus.WAITING_FOR_SUBJOBS
statuses_to_download = [RenderStatus.CANCELLED, RenderStatus.ERROR, RenderStatus.COMPLETED] statuses_to_download = [RenderStatus.CANCELLED, RenderStatus.ERROR, RenderStatus.COMPLETED]
@@ -266,10 +320,10 @@ class DistributedJobManager:
# Check if job is finished, but has not had files copied yet over yet # Check if job is finished, but has not had files copied yet over yet
if download_status is None and subjob_data['file_count'] and status in statuses_to_download: if download_status is None and subjob_data['file_count'] and status in statuses_to_download:
download_result = cls.download_from_subjob(local_job, subjob_id, subjob_hostname) try:
if not download_result: cls.download_missing_frames_from_subjob(local_job, subjob_id, subjob_hostname)
logger.error("Failed to download from subjob") except Exception as e:
# todo: error handling here logger.error(f"Error downloading missing frames from subjob: {e}")
# Any finished jobs not successfully downloaded at this point are skipped # Any finished jobs not successfully downloaded at this point are skipped
if local_job.children[child_key].get('download_status', None) is None and \ if local_job.children[child_key].get('download_status', None) is None and \

View File

@@ -1,5 +1,6 @@
import json import json
import re import re
from concurrent.futures import ThreadPoolExecutor
from src.engines.core.base_engine import * from src.engines.core.base_engine import *
from src.utilities.misc_helper import system_safe_path from src.utilities.misc_helper import system_safe_path
@@ -148,7 +149,13 @@ class Blender(BaseRenderEngine):
return options return options
def system_info(self): def system_info(self):
return {'render_devices': self.get_render_devices()} with ThreadPoolExecutor() as executor:
future_render_devices = executor.submit(self.get_render_devices)
future_engines = executor.submit(self.supported_render_engines)
render_devices = future_render_devices.result()
engines = future_engines.result()
return {'render_devices': render_devices, 'engines': engines}
def get_render_devices(self): def get_render_devices(self):
script_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'scripts', 'get_system_info.py') script_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'scripts', 'get_system_info.py')

View File

@@ -40,11 +40,13 @@ class BlenderRenderWorker(BaseRenderWorker):
if custom_camera: if custom_camera:
python_exp = python_exp + f"bpy.context.scene.camera = bpy.data.objects['{custom_camera}'];" python_exp = python_exp + f"bpy.context.scene.camera = bpy.data.objects['{custom_camera}'];"
# Setup Render Engines
self.args['engine'] = self.args.get('engine', 'CYCLES').upper() # set default render engine
# Configure Cycles
if self.args['engine'] == 'CYCLES':
# Set Render Device (gpu/cpu/any) # Set Render Device (gpu/cpu/any)
blender_engine = self.args.get('engine', 'BLENDER_EEVEE').upper()
if blender_engine == 'CYCLES':
render_device = self.args.get('render_device', 'any').lower() render_device = self.args.get('render_device', 'any').lower()
if render_device not in {'any', 'gpu', 'cpu'}: if render_device not in ['any', 'gpu', 'cpu']:
raise AttributeError(f"Invalid Cycles render device: {render_device}") raise AttributeError(f"Invalid Cycles render device: {render_device}")
use_gpu = render_device in {'any', 'gpu'} use_gpu = render_device in {'any', 'gpu'}
@@ -62,7 +64,10 @@ class BlenderRenderWorker(BaseRenderWorker):
# Export format # Export format
export_format = self.args.get('export_format', None) or 'JPEG' export_format = self.args.get('export_format', None) or 'JPEG'
path_without_ext = os.path.splitext(self.output_path)[0] + "_" main_part, ext = os.path.splitext(self.output_path)
# Remove the extension only if it is not composed entirely of digits
path_without_ext = main_part if not ext[1:].isdigit() else self.output_path
path_without_ext += "_"
cmd.extend(['-E', blender_engine, '-o', path_without_ext, '-F', export_format]) cmd.extend(['-E', blender_engine, '-o', path_without_ext, '-F', export_format])
# set frame range # set frame range
@@ -107,6 +112,7 @@ class BlenderRenderWorker(BaseRenderWorker):
output_file_number = output_filename_match.groups()[0] output_file_number = output_filename_match.groups()[0]
try: try:
self.current_frame = int(output_file_number) self.current_frame = int(output_file_number)
self._send_frame_complete_notification()
except ValueError: except ValueError:
pass pass
elif render_stats_match: elif render_stats_match:
@@ -115,15 +121,15 @@ class BlenderRenderWorker(BaseRenderWorker):
logger.info(f'Frame #{self.current_frame} - ' logger.info(f'Frame #{self.current_frame} - '
f'{frame_count} of {self.total_frames} completed in {time_completed} | ' f'{frame_count} of {self.total_frames} completed in {time_completed} | '
f'Total Elapsed Time: {datetime.now() - self.start_time}') f'Total Elapsed Time: {datetime.now() - self.start_time}')
else:
logger.debug(f'DEBUG: {line}')
else: else:
pass pass
# if len(line.strip()): # if len(line.strip()):
# logger.debug(line.strip()) # logger.debug(line.strip())
def percent_complete(self): def percent_complete(self):
if self.total_frames <= 1: if self.status == RenderStatus.COMPLETED:
return 1
elif self.total_frames <= 1:
return self.__frame_percent_complete return self.__frame_percent_complete
else: else:
whole_frame_percent = (self.current_frame - self.start_frame) / self.total_frames whole_frame_percent = (self.current_frame - self.start_frame) / self.total_frames

View File

@@ -5,6 +5,7 @@ import logging
import os import os
import subprocess import subprocess
import threading import threading
import time
from datetime import datetime from datetime import datetime
import psutil import psutil
@@ -94,10 +95,15 @@ class BaseRenderWorker(Base):
self.errors = [] self.errors = []
# Threads and processes # Threads and processes
self.__thread = threading.Thread(target=self.run, args=()) self.__thread = threading.Thread(target=self.__run, args=())
self.__thread.daemon = True self.__thread.daemon = True
self.__process = None self.__process = None
self.last_output = None self.last_output = None
self.__last_output_time = None
self.watchdog_timeout = 120
def __repr__(self):
return f"<Job id:{self.id} p{self.priority} {self.renderer}-{self.renderer_version} '{self.name}' status:{self.status.value}>"
@property @property
def total_frames(self): def total_frames(self):
@@ -121,19 +127,16 @@ class BaseRenderWorker(Base):
self._status = RenderStatus.CANCELLED.value self._status = RenderStatus.CANCELLED.value
return string_to_status(self._status) return string_to_status(self._status)
def validate(self): def _send_frame_complete_notification(self):
if not os.path.exists(self.input_path): pub.sendMessage('frame_complete', job_id=self.id, frame_number=self.current_frame)
raise FileNotFoundError(f"Cannot find input path: {self.input_path}")
self.generate_subprocess()
def generate_subprocess(self): def generate_subprocess(self):
# Convert raw args from string if available and catch conflicts # Convert raw args from string if available and catch conflicts
generated_args = [str(x) for x in self.generate_worker_subprocess()] generated_args = [str(x) for x in self.generate_worker_subprocess()]
generated_args_flags = [x for x in generated_args if x.startswith('-')] generated_args_flags = [x for x in generated_args if x.startswith('-')]
if len(generated_args_flags) != len(set(generated_args_flags)): if len(generated_args_flags) != len(set(generated_args_flags)):
msg = "Cannot generate subprocess - Multiple arg conflicts detected" msg = f"Cannot generate subprocess - Multiple arg conflicts detected: {generated_args}"
logger.error(msg) logger.error(msg)
logger.debug(f"Generated args for subprocess: {generated_args}")
raise ValueError(msg) raise ValueError(msg)
return generated_args return generated_args
@@ -175,11 +178,12 @@ class BaseRenderWorker(Base):
self.status = RenderStatus.RUNNING self.status = RenderStatus.RUNNING
self.start_time = datetime.now() self.start_time = datetime.now()
logger.info(f'Starting {self.engine.name()} {self.renderer_version} Render for {self.input_path} | '
f'Frame Count: {self.total_frames}')
self.__thread.start() self.__thread.start()
def run(self): def __run(self):
logger.info(f'Starting {self.engine.name()} {self.renderer_version} Render for {self.input_path} | '
f'Frame Count: {self.total_frames}')
# Setup logging # Setup logging
log_dir = os.path.dirname(self.log_path()) log_dir = os.path.dirname(self.log_path())
os.makedirs(log_dir, exist_ok=True) os.makedirs(log_dir, exist_ok=True)
@@ -209,48 +213,42 @@ class BaseRenderWorker(Base):
logger.warning(f"Restarting render - Attempt #{failed_attempts + 1}") logger.warning(f"Restarting render - Attempt #{failed_attempts + 1}")
self.status = RenderStatus.RUNNING self.status = RenderStatus.RUNNING
# Start process and get updates return_code = self.__setup_and_run_process(f, subprocess_cmds)
self.__process = subprocess.Popen(subprocess_cmds, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
universal_newlines=False)
for c in io.TextIOWrapper(self.__process.stdout, encoding="utf-8"): # or another encoding
f.write(c)
f.flush()
os.fsync(f.fileno())
self.last_output = c.strip()
self._parse_stdout(c.strip())
f.write('\n')
# Check return codes and process
return_code = self.__process.wait()
self.end_time = datetime.now() self.end_time = datetime.now()
if self.status in [RenderStatus.CANCELLED, RenderStatus.ERROR]: # user cancelled message = f"{'=' * 50}\n\n{self.engine.name()} render ended with code {return_code} " \
f"after {self.time_elapsed()}\n\n"
f.write(message)
# Teardown
if self.status in [RenderStatus.CANCELLED, RenderStatus.ERROR]:
message = f"{self.engine.name()} render ended with status '{self.status}' " \ message = f"{self.engine.name()} render ended with status '{self.status}' " \
f"after {self.time_elapsed()}" f"after {self.time_elapsed()}"
f.write(message) f.write(message)
return return
# if file output hasn't increased, return as error, otherwise restart process. # if file output hasn't increased, return as error, otherwise restart process.
if len(self.file_list()) <= initial_file_count: file_count_has_increased = len(self.file_list()) > initial_file_count
err_msg = f"File count has not increased. Count is still {len(self.file_list())}" if (self.status == RenderStatus.RUNNING) and file_count_has_increased and not return_code:
f.write(f'Error: {err_msg}\n\n')
self.errors.append(err_msg)
self.status = RenderStatus.ERROR
# Handle completed - All else counts as failed attempt
if (self.status == RenderStatus.COMPLETED) and not return_code:
message = (f"{'=' * 50}\n\n{self.engine.name()} render completed successfully in " message = (f"{'=' * 50}\n\n{self.engine.name()} render completed successfully in "
f"{self.time_elapsed()}\n") f"{self.time_elapsed()}\n")
f.write(message) f.write(message)
break break
# Handle non-zero return codes if return_code:
message = f"{'=' * 50}\n\n{self.engine.name()} render failed with code {return_code} " \ err_msg = f"{self.engine.name()} render failed with code {return_code}"
f"after {self.time_elapsed()}\n\n" logger.error(err_msg)
f.write(message) self.errors.append(err_msg)
self.errors.append(message)
# handle instances where renderer exits ok but doesnt generate files
if not return_code and not file_count_has_increased:
err_msg = (f"{self.engine.name()} render exited ok, but file count has not increased. "
f"Count is still {len(self.file_list())}")
f.write(f'Error: {err_msg}\n\n')
self.errors.append(err_msg)
# only count the attempt as failed if renderer creates no output - ignore error codes for now
if not file_count_has_increased:
failed_attempts += 1 failed_attempts += 1
if self.children: if self.children:
@@ -263,6 +261,65 @@ class BaseRenderWorker(Base):
self.status = RenderStatus.COMPLETED self.status = RenderStatus.COMPLETED
logger.info(f"Render {self.id}-{self.name} completed successfully after {self.time_elapsed()}") logger.info(f"Render {self.id}-{self.name} completed successfully after {self.time_elapsed()}")
def __setup_and_run_process(self, f, subprocess_cmds):
def watchdog():
logger.debug(f'Starting process watchdog for {self} with {self.watchdog_timeout}s timeout')
while self.__process.poll() is None:
time_since_last_update = time.time() - self.__last_output_time
if time_since_last_update > self.watchdog_timeout:
logger.error(f"Process for {self} terminated due to exceeding timeout ({self.watchdog_timeout}s)")
self.__process.kill()
break
# logger.debug(f'Watchdog for {self} - Time since last update: {time_since_last_update}')
time.sleep(1)
logger.debug(f'Stopping process watchdog for {self}')
return_code = -1
watchdog_thread = threading.Thread(target=watchdog)
watchdog_thread.daemon = True
try:
# Start process and get updates
self.__process = subprocess.Popen(subprocess_cmds, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
universal_newlines=False)
# Start watchdog
self.__last_output_time = time.time()
watchdog_thread.start()
for c in io.TextIOWrapper(self.__process.stdout, encoding="utf-8"): # or another encoding
self.last_output = c.strip()
self.__last_output_time = time.time()
try:
f.write(c)
f.flush()
os.fsync(f.fileno())
except Exception as e:
logger.error(f"Error saving log to disk: {e}")
try:
self._parse_stdout(c.strip())
except Exception as e:
logger.error(f'Error parsing stdout: {e}')
f.write('\n')
# Check return codes and process
return_code = self.__process.wait()
except Exception as e:
message = f'Uncaught error running render process: {e}'
f.write(message)
logger.exception(message)
self.__process.kill()
# let watchdog end before continuing - prevents multiple watchdogs running when process restarts
if watchdog_thread.is_alive():
watchdog_thread.join()
return return_code
def post_processing(self): def post_processing(self):
pass pass

View File

@@ -172,7 +172,7 @@ class EngineManager:
if background: if background:
return thread return thread
else:
thread.join() thread.join()
found_engine = cls.is_version_downloaded(engine, version, system_os, cpu) # Check that engine downloaded found_engine = cls.is_version_downloaded(engine, version, system_os, cpu) # Check that engine downloaded
if not found_engine: if not found_engine:
@@ -204,13 +204,19 @@ class EngineManager:
def engine_update_task(engine_class): def engine_update_task(engine_class):
logger.debug(f"Checking for updates to {engine_class.name()}") logger.debug(f"Checking for updates to {engine_class.name()}")
latest_version = engine_class.downloader().find_most_recent_version() latest_version = engine_class.downloader().find_most_recent_version()
if latest_version:
logger.debug(f"Latest version of {engine_class.name()} available: {latest_version.get('version')}") if not latest_version:
if not cls.is_version_downloaded(engine_class.name(), latest_version.get('version')): logger.warning(f"Could not find most recent version of {engine.name()} to download")
logger.info(f"Downloading latest version of {engine_class.name()}...") return
cls.download_engine(engine=engine_class.name(), version=latest_version['version'], background=True)
else: version_num = latest_version.get('version')
logger.warning(f"Unable to get check for updates for {engine.name()}") if cls.is_version_downloaded(engine_class.name(), version_num):
logger.debug(f"Latest version of {engine_class.name()} ({version_num}) already downloaded")
return
# download the engine
logger.info(f"Downloading latest version of {engine_class.name()} ({version_num})...")
cls.download_engine(engine=engine_class.name(), version=version_num, background=True)
logger.info(f"Checking for updates for render engines...") logger.info(f"Checking for updates for render engines...")
threads = [] threads = []

View File

@@ -132,8 +132,8 @@ class FFMPEGDownloader(EngineDownloader):
system_os = system_os or current_system_os() system_os = system_os or current_system_os()
cpu = cpu or current_system_cpu() cpu = cpu or current_system_cpu()
return cls.all_versions(system_os, cpu)[0] return cls.all_versions(system_os, cpu)[0]
except (IndexError, requests.exceptions.RequestException): except (IndexError, requests.exceptions.RequestException) as e:
logger.error(f"Cannot get most recent version of ffmpeg") logger.error(f"Cannot get most recent version of ffmpeg: {e}")
return {} return {}
@classmethod @classmethod

View File

@@ -1,22 +1,27 @@
''' app/init.py ''' ''' app/init.py '''
import logging import logging
import multiprocessing
import os import os
import socket
import sys import sys
import threading import threading
import time
from collections import deque from collections import deque
from PyQt6.QtCore import QObject, pyqtSignal
from PyQt6.QtWidgets import QApplication
from src.api.api_server import start_server from src.api.api_server import start_server
from src.api.preview_manager import PreviewManager
from src.api.serverproxy_manager import ServerProxyManager
from src.distributed_job_manager import DistributedJobManager
from src.engines.engine_manager import EngineManager from src.engines.engine_manager import EngineManager
from src.render_queue import RenderQueue from src.render_queue import RenderQueue
from src.ui.main_window import MainWindow
from src.utilities.config import Config from src.utilities.config import Config
from src.utilities.misc_helper import system_safe_path from src.utilities.misc_helper import system_safe_path, current_system_cpu, current_system_os, current_system_os_version
from src.utilities.zeroconf_server import ZeroconfServer
logger = logging.getLogger()
def run() -> int: def run(server_only=False) -> int:
""" """
Initializes the application and runs it. Initializes the application and runs it.
@@ -24,44 +29,94 @@ def run() -> int:
int: The exit status code. int: The exit status code.
""" """
# setup logging
logging.basicConfig(format='%(asctime)s: %(levelname)s: %(module)s: %(message)s', datefmt='%d-%b-%y %H:%M:%S',
level=Config.server_log_level.upper())
logging.getLogger("requests").setLevel(logging.WARNING) # suppress noisy requests/urllib3 logging
logging.getLogger("urllib3").setLevel(logging.WARNING)
# Setup logging for console ui
buffer_handler = __setup_buffer_handler() if not server_only else None
logger.info(f"Starting Zordon Render Server")
return_code = 0
try: try:
# Load Config YAML # Load Config YAML
Config.setup_config_dir() Config.setup_config_dir()
Config.load_config(system_safe_path(os.path.join(Config.config_dir(), 'config.yaml'))) Config.load_config(system_safe_path(os.path.join(Config.config_dir(), 'config.yaml')))
# configure default paths
EngineManager.engines_path = system_safe_path( EngineManager.engines_path = system_safe_path(
os.path.join(os.path.join(os.path.expanduser(Config.upload_folder), os.path.join(os.path.join(os.path.expanduser(Config.upload_folder),
'engines'))) 'engines')))
logging.basicConfig(format='%(asctime)s: %(levelname)s: %(module)s: %(message)s', datefmt='%d-%b-%y %H:%M:%S', os.makedirs(EngineManager.engines_path, exist_ok=True)
level=Config.server_log_level.upper()) PreviewManager.storage_path = system_safe_path(
os.path.join(os.path.expanduser(Config.upload_folder), 'previews'))
app: QApplication = QApplication(sys.argv) # Debug info
logger.debug(f"Upload directory: {os.path.expanduser(Config.upload_folder)}")
logger.debug(f"Thumbs directory: {PreviewManager.storage_path}")
logger.debug(f"Engines directory: {EngineManager.engines_path}")
# Start server in background # Set up the RenderQueue object
background_server = threading.Thread(target=start_server) RenderQueue.load_state(database_directory=system_safe_path(os.path.expanduser(Config.upload_folder)))
background_server.daemon = True ServerProxyManager.subscribe_to_listener()
background_server.start() DistributedJobManager.subscribe_to_listener()
# Setup logging for console ui # check for updates for render engines if configured or on first launch
buffer_handler = BufferingHandler() if Config.update_engines_on_launch or not EngineManager.get_engines():
buffer_handler.setFormatter(logging.getLogger().handlers[0].formatter) EngineManager.update_all_engines()
logger = logging.getLogger()
logger.addHandler(buffer_handler)
window: MainWindow = MainWindow() # get hostname
window.buffer_handler = buffer_handler local_hostname = socket.gethostname()
window.show() local_hostname = local_hostname + (".local" if not local_hostname.endswith(".local") else "")
return_code = app.exec() # configure and start API server
api_server = threading.Thread(target=start_server, args=(local_hostname,))
api_server.daemon = True
api_server.start()
# start zeroconf server
ZeroconfServer.configure("_zordon._tcp.local.", local_hostname, Config.port_number)
ZeroconfServer.properties = {'system_cpu': current_system_cpu(),
'system_cpu_cores': multiprocessing.cpu_count(),
'system_os': current_system_os(),
'system_os_version': current_system_os_version()}
ZeroconfServer.start()
logger.info(f"Zordon Render Server started - Hostname: {local_hostname}")
RenderQueue.evaluation_inverval = Config.queue_eval_seconds
RenderQueue.start()
# start in gui or server only (cli) mode
logger.debug(f"Launching in {'server only' if server_only else 'GUI'} mode")
if server_only: # CLI only
api_server.join()
else: # GUI
return_code = __show_gui(buffer_handler)
except KeyboardInterrupt:
pass
except Exception as e: except Exception as e:
logging.error(f"Unhandled exception: {e}") logging.error(f"Unhandled exception: {e}")
return_code = 1 return_code = 1
finally: finally:
# shut down gracefully
logger.info(f"Zordon Render Server is preparing to shut down")
try:
RenderQueue.prepare_for_shutdown() RenderQueue.prepare_for_shutdown()
except Exception as e:
logger.exception(f"Exception during prepare for shutdown: {e}")
ZeroconfServer.stop()
logger.info(f"Zordon Render Server has shut down")
return sys.exit(return_code) return sys.exit(return_code)
class BufferingHandler(logging.Handler, QObject): def __setup_buffer_handler():
# lazy load GUI frameworks
from PyQt6.QtCore import QObject, pyqtSignal
class BufferingHandler(logging.Handler, QObject):
new_record = pyqtSignal(str) new_record = pyqtSignal(str)
def __init__(self, capacity=100): def __init__(self, capacity=100):
@@ -70,9 +125,34 @@ class BufferingHandler(logging.Handler, QObject):
self.buffer = deque(maxlen=capacity) # Define a buffer with a fixed capacity self.buffer = deque(maxlen=capacity) # Define a buffer with a fixed capacity
def emit(self, record): def emit(self, record):
try:
msg = self.format(record) msg = self.format(record)
self.buffer.append(msg) # Add message to the buffer self.buffer.append(msg) # Add message to the buffer
self.new_record.emit(msg) # Emit signal self.new_record.emit(msg) # Emit signal
except RuntimeError:
pass
def get_buffer(self): def get_buffer(self):
return list(self.buffer) # Return a copy of the buffer return list(self.buffer) # Return a copy of the buffer
buffer_handler = BufferingHandler()
buffer_handler.setFormatter(logging.getLogger().handlers[0].formatter)
logger = logging.getLogger()
logger.addHandler(buffer_handler)
return buffer_handler
def __show_gui(buffer_handler):
# lazy load GUI frameworks
from PyQt6.QtWidgets import QApplication
# load application
app: QApplication = QApplication(sys.argv)
# configure main window
from src.ui.main_window import MainWindow
window: MainWindow = MainWindow()
window.buffer_handler = buffer_handler
window.show()
return app.exec()

View File

@@ -2,12 +2,13 @@ import logging
import os import os
from datetime import datetime from datetime import datetime
from pubsub import pub
from sqlalchemy import create_engine from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm.exc import DetachedInstanceError
from src.utilities.status_utils import RenderStatus
from src.engines.engine_manager import EngineManager
from src.engines.core.base_worker import Base from src.engines.core.base_worker import Base
from src.utilities.status_utils import RenderStatus
logger = logging.getLogger() logger = logging.getLogger()
@@ -17,6 +18,9 @@ class JobNotFoundError(Exception):
super().__init__(args) super().__init__(args)
self.job_id = job_id self.job_id = job_id
def __str__(self):
return f"Cannot find job with ID: {self.job_id}"
class RenderQueue: class RenderQueue:
engine = None engine = None
@@ -24,18 +28,46 @@ class RenderQueue:
job_queue = [] job_queue = []
maximum_renderer_instances = {'blender': 1, 'aerender': 1, 'ffmpeg': 4} maximum_renderer_instances = {'blender': 1, 'aerender': 1, 'ffmpeg': 4}
last_saved_counts = {} last_saved_counts = {}
is_running = False
__eval_thread = None
evaluation_inverval = 1
def __init__(self): # --------------------------------------------
pass # Start / Stop Background Updates
# --------------------------------------------
@classmethod
def start(cls):
logger.debug("Starting render queue updates")
cls.is_running = True
cls.evaluate_queue()
@classmethod
def __local_job_status_changed(cls, job_id, old_status, new_status):
render_job = RenderQueue.job_with_id(job_id, none_ok=True)
if render_job and cls.is_running: # ignore changes from render jobs not in the queue yet
logger.debug(f"RenderQueue detected job {job_id} has changed from {old_status} -> {new_status}")
RenderQueue.evaluate_queue()
@classmethod
def stop(cls):
logger.debug("Stopping render queue updates")
cls.is_running = False
# --------------------------------------------
# Queue Management
# --------------------------------------------
@classmethod @classmethod
def add_to_render_queue(cls, render_job, force_start=False): def add_to_render_queue(cls, render_job, force_start=False):
logger.debug('Adding priority {} job to render queue: {}'.format(render_job.priority, render_job)) logger.info(f"Adding job to render queue: {render_job}")
cls.job_queue.append(render_job) cls.job_queue.append(render_job)
if force_start and render_job.status in (RenderStatus.NOT_STARTED, RenderStatus.SCHEDULED): if cls.is_running and force_start and render_job.status in (RenderStatus.NOT_STARTED, RenderStatus.SCHEDULED):
cls.start_job(render_job) cls.start_job(render_job)
cls.session.add(render_job) cls.session.add(render_job)
cls.save_state() cls.save_state()
if cls.is_running:
cls.evaluate_queue()
@classmethod @classmethod
def all_jobs(cls): def all_jobs(cls):
@@ -81,6 +113,7 @@ class RenderQueue:
cls.session = sessionmaker(bind=cls.engine)() cls.session = sessionmaker(bind=cls.engine)()
from src.engines.core.base_worker import BaseRenderWorker from src.engines.core.base_worker import BaseRenderWorker
cls.job_queue = cls.session.query(BaseRenderWorker).all() cls.job_queue = cls.session.query(BaseRenderWorker).all()
pub.subscribe(cls.__local_job_status_changed, 'status_change')
@classmethod @classmethod
def save_state(cls): def save_state(cls):
@@ -89,6 +122,7 @@ class RenderQueue:
@classmethod @classmethod
def prepare_for_shutdown(cls): def prepare_for_shutdown(cls):
logger.debug("Closing session") logger.debug("Closing session")
cls.stop()
running_jobs = cls.jobs_with_status(RenderStatus.RUNNING) # cancel all running jobs running_jobs = cls.jobs_with_status(RenderStatus.RUNNING) # cancel all running jobs
[cls.cancel_job(job) for job in running_jobs] [cls.cancel_job(job) for job in running_jobs]
cls.save_state() cls.save_state()
@@ -105,6 +139,7 @@ class RenderQueue:
@classmethod @classmethod
def evaluate_queue(cls): def evaluate_queue(cls):
try:
not_started = cls.jobs_with_status(RenderStatus.NOT_STARTED, priority_sorted=True) not_started = cls.jobs_with_status(RenderStatus.NOT_STARTED, priority_sorted=True)
for job in not_started: for job in not_started:
if cls.is_available_for_job(job.renderer, job.priority): if cls.is_available_for_job(job.renderer, job.priority):
@@ -118,22 +153,24 @@ class RenderQueue:
if cls.last_saved_counts != cls.job_counts(): if cls.last_saved_counts != cls.job_counts():
cls.save_state() cls.save_state()
except DetachedInstanceError:
pass
@classmethod @classmethod
def start_job(cls, job): def start_job(cls, job):
logger.info(f'Starting render: {job.name} - Priority {job.priority}') logger.info(f'Starting job: {job}')
job.start() job.start()
cls.save_state() cls.save_state()
@classmethod @classmethod
def cancel_job(cls, job): def cancel_job(cls, job):
logger.info(f'Cancelling job ID: {job.id}') logger.info(f'Cancelling job: {job}')
job.stop() job.stop()
return job.status == RenderStatus.CANCELLED return job.status == RenderStatus.CANCELLED
@classmethod @classmethod
def delete_job(cls, job): def delete_job(cls, job):
logger.info(f"Deleting job ID: {job.id}") logger.info(f"Deleting job: {job}")
job.stop() job.stop()
cls.job_queue.remove(job) cls.job_queue.remove(job)
cls.session.delete(job) cls.session.delete(job)

View File

@@ -55,7 +55,7 @@ class NewRenderJobForm(QWidget):
self.priority_input = None self.priority_input = None
self.end_frame_input = None self.end_frame_input = None
self.start_frame_input = None self.start_frame_input = None
self.output_path_input = None self.render_name_input = None
self.scene_file_input = None self.scene_file_input = None
self.scene_file_browse_button = None self.scene_file_browse_button = None
self.job_name_input = None self.job_name_input = None
@@ -136,11 +136,11 @@ class NewRenderJobForm(QWidget):
self.output_settings_group = QGroupBox("Output Settings") self.output_settings_group = QGroupBox("Output Settings")
output_settings_layout = QVBoxLayout(self.output_settings_group) output_settings_layout = QVBoxLayout(self.output_settings_group)
# output path # output path
output_path_layout = QHBoxLayout() render_name_layout = QHBoxLayout()
output_path_layout.addWidget(QLabel("Render name:")) render_name_layout.addWidget(QLabel("Render name:"))
self.output_path_input = QLineEdit() self.render_name_input = QLineEdit()
output_path_layout.addWidget(self.output_path_input) render_name_layout.addWidget(self.render_name_input)
output_settings_layout.addLayout(output_path_layout) output_settings_layout.addLayout(render_name_layout)
# file format # file format
file_format_layout = QHBoxLayout() file_format_layout = QHBoxLayout()
file_format_layout.addWidget(QLabel("Format:")) file_format_layout.addWidget(QLabel("Format:"))
@@ -281,7 +281,7 @@ class NewRenderJobForm(QWidget):
output_name, _ = os.path.splitext(os.path.basename(self.scene_file_input.text())) output_name, _ = os.path.splitext(os.path.basename(self.scene_file_input.text()))
output_name = output_name.replace(' ', '_') output_name = output_name.replace(' ', '_')
self.output_path_input.setText(output_name) self.render_name_input.setText(output_name)
file_name = self.scene_file_input.text() file_name = self.scene_file_input.text()
# setup bg worker # setup bg worker
@@ -292,7 +292,7 @@ class NewRenderJobForm(QWidget):
def browse_output_path(self): def browse_output_path(self):
directory = QFileDialog.getExistingDirectory(self, "Select Output Directory") directory = QFileDialog.getExistingDirectory(self, "Select Output Directory")
if directory: if directory:
self.output_path_input.setText(directory) self.render_name_input.setText(directory)
def args_help_button_clicked(self): def args_help_button_clicked(self):
url = (f'http://{self.server_proxy.hostname}:{self.server_proxy.port}/api/renderer/' url = (f'http://{self.server_proxy.hostname}:{self.server_proxy.port}/api/renderer/'
@@ -316,8 +316,6 @@ class NewRenderJobForm(QWidget):
self.renderer_type.setCurrentIndex(0) #todo: find out why we don't have renderer info yet self.renderer_type.setCurrentIndex(0) #todo: find out why we don't have renderer info yet
# not ideal but if we don't have the renderer info we have to pick something # not ideal but if we don't have the renderer info we have to pick something
self.output_path_input.setText(os.path.basename(input_path))
# cleanup progress UI # cleanup progress UI
self.load_file_group.setHidden(True) self.load_file_group.setHidden(True)
self.toggle_renderer_enablement(True) self.toggle_renderer_enablement(True)
@@ -451,14 +449,16 @@ class SubmitWorker(QThread):
job_json = {'owner': psutil.Process().username() + '@' + socket.gethostname(), job_json = {'owner': psutil.Process().username() + '@' + socket.gethostname(),
'renderer': self.window.renderer_type.currentText().lower(), 'renderer': self.window.renderer_type.currentText().lower(),
'engine_version': self.window.renderer_version_combo.currentText(), 'engine_version': self.window.renderer_version_combo.currentText(),
'args': {'raw': self.window.raw_args.text()}, 'args': {'raw': self.window.raw_args.text(),
'output_path': self.window.output_path_input.text(), 'export_format': self.window.file_format_combo.currentText()},
'output_path': self.window.render_name_input.text(),
'start_frame': self.window.start_frame_input.value(), 'start_frame': self.window.start_frame_input.value(),
'end_frame': self.window.end_frame_input.value(), 'end_frame': self.window.end_frame_input.value(),
'priority': self.window.priority_input.currentIndex() + 1, 'priority': self.window.priority_input.currentIndex() + 1,
'notes': self.window.notes_input.toPlainText(), 'notes': self.window.notes_input.toPlainText(),
'enable_split_jobs': self.window.enable_splitjobs.isChecked(), 'enable_split_jobs': self.window.enable_splitjobs.isChecked(),
'split_jobs_same_os': self.window.splitjobs_same_os.isChecked()} 'split_jobs_same_os': self.window.splitjobs_same_os.isChecked(),
'name': self.window.render_name_input.text()}
# get the dynamic args # get the dynamic args
for i in range(self.window.renderer_options_layout.count()): for i in range(self.window.renderer_options_layout.count()):
@@ -487,7 +487,8 @@ class SubmitWorker(QThread):
for cam in selected_cameras: for cam in selected_cameras:
job_copy = copy.deepcopy(job_json) job_copy = copy.deepcopy(job_json)
job_copy['args']['camera'] = cam job_copy['args']['camera'] = cam
job_copy['name'] = pathlib.Path(input_path).stem.replace(' ', '_') + "-" + cam.replace(' ', '') job_copy['name'] = job_copy['name'].replace(' ', '-') + "_" + cam.replace(' ', '')
job_copy['output_path'] = job_copy['name']
job_list.append(job_copy) job_list.append(job_copy)
else: else:
job_list = [job_json] job_list = [job_json]

View File

@@ -1,4 +1,3 @@
import sys
import logging import logging
from PyQt6.QtGui import QFont from PyQt6.QtGui import QFont
@@ -16,7 +15,10 @@ class QSignalHandler(logging.Handler, QObject):
def emit(self, record): def emit(self, record):
msg = self.format(record) msg = self.format(record)
try:
self.new_record.emit(msg) # Emit signal self.new_record.emit(msg) # Emit signal
except RuntimeError:
pass
class ConsoleWindow(QMainWindow): class ConsoleWindow(QMainWindow):

View File

@@ -183,8 +183,13 @@ class MainWindow(QMainWindow):
def __background_update(self): def __background_update(self):
while True: while True:
try:
self.update_servers() self.update_servers()
self.fetch_jobs() self.fetch_jobs()
except RuntimeError:
pass
except Exception as e:
logger.error(f"Uncaught exception in background update: {e}")
time.sleep(0.5) time.sleep(0.5)
def closeEvent(self, event): def closeEvent(self, event):
@@ -372,13 +377,17 @@ class MainWindow(QMainWindow):
def load_image_path(self, image_path): def load_image_path(self, image_path):
# Load and set the image using QPixmap # Load and set the image using QPixmap
try:
pixmap = QPixmap(image_path) pixmap = QPixmap(image_path)
if not pixmap: if not pixmap:
logger.error("Error loading image") logger.error("Error loading image")
return return
self.image_label.setPixmap(pixmap) self.image_label.setPixmap(pixmap)
except Exception as e:
logger.error(f"Error loading image path: {e}")
def load_image_data(self, pillow_image): def load_image_data(self, pillow_image):
try:
# Convert the Pillow Image to a QByteArray (byte buffer) # Convert the Pillow Image to a QByteArray (byte buffer)
byte_array = QByteArray() byte_array = QByteArray()
buffer = QBuffer(byte_array) buffer = QBuffer(byte_array)
@@ -396,6 +405,8 @@ class MainWindow(QMainWindow):
logger.error("Error loading image") logger.error("Error loading image")
return return
self.image_label.setPixmap(pixmap) self.image_label.setPixmap(pixmap)
except Exception as e:
logger.error(f"Error loading image data: {e}")
def update_servers(self): def update_servers(self):
found_servers = list(set(ZeroconfServer.found_hostnames() + self.added_hostnames)) found_servers = list(set(ZeroconfServer.found_hostnames() + self.added_hostnames))

View File

@@ -32,20 +32,23 @@ class StatusBar(QStatusBar):
# Check for status change every 1s on background thread # Check for status change every 1s on background thread
while True: while True:
try:
# update status label - get download status
new_status = proxy.status() new_status = proxy.status()
new_image_name = image_names.get(new_status, 'Synchronize.png')
image_path = os.path.join(resources_dir(), new_image_name)
self.label.setPixmap((QPixmap(image_path).scaled(16, 16, Qt.AspectRatioMode.KeepAspectRatio)))
# add download status
if EngineManager.download_tasks: if EngineManager.download_tasks:
if len(EngineManager.download_tasks) == 1: if len(EngineManager.download_tasks) == 1:
task = EngineManager.download_tasks[0] task = EngineManager.download_tasks[0]
new_status = f"{new_status} | Downloading {task.engine.capitalize()} {task.version}..." new_status = f"{new_status} | Downloading {task.engine.capitalize()} {task.version}..."
else: else:
new_status = f"{new_status} | Downloading {len(EngineManager.download_tasks)} engines" new_status = f"{new_status} | Downloading {len(EngineManager.download_tasks)} engines"
self.messageLabel.setText(new_status) self.messageLabel.setText(new_status)
# update status image
new_image_name = image_names.get(new_status, 'Synchronize.png')
new_image_path = os.path.join(resources_dir(), new_image_name)
self.label.setPixmap((QPixmap(new_image_path).scaled(16, 16, Qt.AspectRatioMode.KeepAspectRatio)))
except RuntimeError: # ignore runtime errors during shutdown
pass
time.sleep(1) time.sleep(1)
background_thread = threading.Thread(target=background_update,) background_thread = threading.Thread(target=background_update,)

View File

@@ -10,7 +10,7 @@ class Config:
max_content_path = 100000000 max_content_path = 100000000
server_log_level = 'debug' server_log_level = 'debug'
log_buffer_length = 250 log_buffer_length = 250
subjob_connection_timeout = 120 worker_process_timeout = 120
flask_log_level = 'error' flask_log_level = 'error'
flask_debug_enable = False flask_debug_enable = False
queue_eval_seconds = 1 queue_eval_seconds = 1
@@ -28,7 +28,7 @@ class Config:
cls.max_content_path = cfg.get('max_content_path', cls.max_content_path) cls.max_content_path = cfg.get('max_content_path', cls.max_content_path)
cls.server_log_level = cfg.get('server_log_level', cls.server_log_level) cls.server_log_level = cfg.get('server_log_level', cls.server_log_level)
cls.log_buffer_length = cfg.get('log_buffer_length', cls.log_buffer_length) cls.log_buffer_length = cfg.get('log_buffer_length', cls.log_buffer_length)
cls.subjob_connection_timeout = cfg.get('subjob_connection_timeout', cls.subjob_connection_timeout) cls.worker_process_timeout = cfg.get('worker_process_timeout', cls.worker_process_timeout)
cls.flask_log_level = cfg.get('flask_log_level', cls.flask_log_level) cls.flask_log_level = cfg.get('flask_log_level', cls.flask_log_level)
cls.flask_debug_enable = cfg.get('flask_debug_enable', cls.flask_debug_enable) cls.flask_debug_enable = cfg.get('flask_debug_enable', cls.flask_debug_enable)
cls.queue_eval_seconds = cfg.get('queue_eval_seconds', cls.queue_eval_seconds) cls.queue_eval_seconds = cfg.get('queue_eval_seconds', cls.queue_eval_seconds)

View File

@@ -2,7 +2,8 @@ import logging
import socket import socket
from pubsub import pub from pubsub import pub
from zeroconf import Zeroconf, ServiceInfo, ServiceBrowser, ServiceStateChange, NonUniqueNameException from zeroconf import Zeroconf, ServiceInfo, ServiceBrowser, ServiceStateChange, NonUniqueNameException, \
NotRunningException
logger = logging.getLogger() logger = logging.getLogger()
@@ -31,12 +32,14 @@ class ZeroconfServer:
def start(cls, listen_only=False): def start(cls, listen_only=False):
if not cls.service_type: if not cls.service_type:
raise RuntimeError("The 'configure' method must be run before starting the zeroconf server") raise RuntimeError("The 'configure' method must be run before starting the zeroconf server")
logger.debug("Starting zeroconf service")
if not listen_only: if not listen_only:
cls._register_service() cls._register_service()
cls._browse_services() cls._browse_services()
@classmethod @classmethod
def stop(cls): def stop(cls):
logger.debug("Stopping zeroconf service")
cls._unregister_service() cls._unregister_service()
cls.zeroconf.close() cls.zeroconf.close()
@@ -73,6 +76,7 @@ class ZeroconfServer:
@classmethod @classmethod
def _on_service_discovered(cls, zeroconf, service_type, name, state_change): def _on_service_discovered(cls, zeroconf, service_type, name, state_change):
try:
info = zeroconf.get_service_info(service_type, name) info = zeroconf.get_service_info(service_type, name)
hostname = name.split(f'.{cls.service_type}')[0] hostname = name.split(f'.{cls.service_type}')[0]
logger.debug(f"Zeroconf: {hostname} {state_change}") logger.debug(f"Zeroconf: {hostname} {state_change}")
@@ -82,6 +86,8 @@ class ZeroconfServer:
else: else:
cls.client_cache.pop(hostname) cls.client_cache.pop(hostname)
pub.sendMessage('zeroconf_state_change', hostname=hostname, state_change=state_change) pub.sendMessage('zeroconf_state_change', hostname=hostname, state_change=state_change)
except NotRunningException:
pass
@classmethod @classmethod
def found_hostnames(cls): def found_hostnames(cls):
@@ -104,9 +110,15 @@ class ZeroconfServer:
# Example usage: # Example usage:
if __name__ == "__main__": if __name__ == "__main__":
import time
logging.basicConfig(level=logging.DEBUG)
ZeroconfServer.configure("_zordon._tcp.local.", "foobar.local", 8080) ZeroconfServer.configure("_zordon._tcp.local.", "foobar.local", 8080)
try: try:
ZeroconfServer.start() ZeroconfServer.start()
input("Server running - Press enter to end") while True:
time.sleep(0.1)
except KeyboardInterrupt:
pass
finally: finally:
ZeroconfServer.stop() ZeroconfServer.stop()