mirror of
https://github.com/blw1138/Zordon.git
synced 2025-12-17 16:58:12 +00:00
Compare commits
10 Commits
feature/84
...
feature/bl
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a464910426 | ||
|
|
a4ff36ac56 | ||
| 51a5a63944 | |||
| 3600eeb21b | |||
| 6afb6e65a6 | |||
|
|
90d5e9b7af | ||
| 4df41a2079 | |||
| 1cdb7810bf | |||
|
|
21011e47ca | ||
|
|
86977b9d6d |
@@ -3,7 +3,7 @@ update_engines_on_launch: true
|
|||||||
max_content_path: 100000000
|
max_content_path: 100000000
|
||||||
server_log_level: info
|
server_log_level: info
|
||||||
log_buffer_length: 250
|
log_buffer_length: 250
|
||||||
subjob_connection_timeout: 120
|
worker_process_timeout: 120
|
||||||
flask_log_level: error
|
flask_log_level: error
|
||||||
flask_debug_enable: false
|
flask_debug_enable: false
|
||||||
queue_eval_seconds: 1
|
queue_eval_seconds: 1
|
||||||
|
|||||||
@@ -1,5 +1,5 @@
|
|||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
from src.api.api_server import start_server
|
from init import run
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
start_server()
|
run(server_only=True)
|
||||||
|
|||||||
@@ -49,12 +49,11 @@ def handle_uploaded_project_files(request, jobs_list, upload_directory):
|
|||||||
raise ValueError(f"Error downloading file from URL: {project_url}")
|
raise ValueError(f"Error downloading file from URL: {project_url}")
|
||||||
elif local_path and os.path.exists(local_path):
|
elif local_path and os.path.exists(local_path):
|
||||||
referred_name = os.path.basename(local_path)
|
referred_name = os.path.basename(local_path)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
raise ValueError("Cannot find any valid project paths")
|
raise ValueError("Cannot find any valid project paths")
|
||||||
|
|
||||||
# Prepare the local filepath
|
# Prepare the local filepath
|
||||||
cleaned_path_name = os.path.splitext(referred_name)[0].replace(' ', '_')
|
cleaned_path_name = jobs_list[0].get('name', os.path.splitext(referred_name)[0]).replace(' ', '-')
|
||||||
job_dir = os.path.join(upload_directory, '-'.join(
|
job_dir = os.path.join(upload_directory, '-'.join(
|
||||||
[datetime.now().strftime("%Y.%m.%d_%H.%M.%S"), renderer, cleaned_path_name]))
|
[datetime.now().strftime("%Y.%m.%d_%H.%M.%S"), renderer, cleaned_path_name]))
|
||||||
os.makedirs(job_dir, exist_ok=True)
|
os.makedirs(job_dir, exist_ok=True)
|
||||||
|
|||||||
@@ -2,14 +2,12 @@
|
|||||||
import concurrent.futures
|
import concurrent.futures
|
||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
import multiprocessing
|
|
||||||
import os
|
import os
|
||||||
import pathlib
|
import pathlib
|
||||||
import shutil
|
import shutil
|
||||||
import socket
|
import socket
|
||||||
import ssl
|
import ssl
|
||||||
import tempfile
|
import tempfile
|
||||||
import threading
|
|
||||||
import time
|
import time
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from zipfile import ZipFile
|
from zipfile import ZipFile
|
||||||
@@ -17,19 +15,19 @@ from zipfile import ZipFile
|
|||||||
import psutil
|
import psutil
|
||||||
import yaml
|
import yaml
|
||||||
from flask import Flask, request, send_file, after_this_request, Response, redirect, url_for, abort
|
from flask import Flask, request, send_file, after_this_request, Response, redirect, url_for, abort
|
||||||
|
from sqlalchemy.orm.exc import DetachedInstanceError
|
||||||
|
|
||||||
from src.api.add_job_helpers import handle_uploaded_project_files, process_zipped_project
|
from src.api.add_job_helpers import handle_uploaded_project_files, process_zipped_project
|
||||||
from src.api.serverproxy_manager import ServerProxyManager
|
from src.api.preview_manager import PreviewManager
|
||||||
from src.distributed_job_manager import DistributedJobManager
|
from src.distributed_job_manager import DistributedJobManager
|
||||||
from src.engines.core.base_worker import string_to_status, RenderStatus
|
from src.engines.core.base_worker import string_to_status, RenderStatus
|
||||||
from src.engines.engine_manager import EngineManager
|
from src.engines.engine_manager import EngineManager
|
||||||
from src.render_queue import RenderQueue, JobNotFoundError
|
from src.render_queue import RenderQueue, JobNotFoundError
|
||||||
|
from src.utilities.benchmark import cpu_benchmark, disk_io_benchmark
|
||||||
from src.utilities.config import Config
|
from src.utilities.config import Config
|
||||||
from src.utilities.misc_helper import system_safe_path, current_system_os, current_system_cpu, \
|
from src.utilities.misc_helper import system_safe_path, current_system_os, current_system_cpu, \
|
||||||
current_system_os_version, num_to_alphanumeric
|
current_system_os_version, num_to_alphanumeric
|
||||||
from src.utilities.server_helper import generate_thumbnail_for_job
|
|
||||||
from src.utilities.zeroconf_server import ZeroconfServer
|
from src.utilities.zeroconf_server import ZeroconfServer
|
||||||
from src.utilities.benchmark import cpu_benchmark, disk_io_benchmark
|
|
||||||
|
|
||||||
logger = logging.getLogger()
|
logger = logging.getLogger()
|
||||||
server = Flask(__name__)
|
server = Flask(__name__)
|
||||||
@@ -39,6 +37,29 @@ categories = [RenderStatus.RUNNING, RenderStatus.ERROR, RenderStatus.NOT_STARTED
|
|||||||
RenderStatus.COMPLETED, RenderStatus.CANCELLED]
|
RenderStatus.COMPLETED, RenderStatus.CANCELLED]
|
||||||
|
|
||||||
|
|
||||||
|
# -- Error Handlers --
|
||||||
|
|
||||||
|
@server.errorhandler(JobNotFoundError)
|
||||||
|
def handle_job_not_found(job_error):
|
||||||
|
return str(job_error), 400
|
||||||
|
|
||||||
|
|
||||||
|
@server.errorhandler(DetachedInstanceError)
|
||||||
|
def handle_detached_instance(error):
|
||||||
|
# logger.debug(f"detached instance: {error}")
|
||||||
|
return "Unavailable", 503
|
||||||
|
|
||||||
|
|
||||||
|
@server.errorhandler(Exception)
|
||||||
|
def handle_general_error(general_error):
|
||||||
|
err_msg = f"Server error: {general_error}"
|
||||||
|
logger.error(err_msg)
|
||||||
|
return err_msg, 500
|
||||||
|
|
||||||
|
|
||||||
|
# -- Jobs --
|
||||||
|
|
||||||
|
|
||||||
def sorted_jobs(all_jobs, sort_by_date=True):
|
def sorted_jobs(all_jobs, sort_by_date=True):
|
||||||
if not sort_by_date:
|
if not sort_by_date:
|
||||||
sorted_job_list = []
|
sorted_job_list = []
|
||||||
@@ -60,9 +81,11 @@ def jobs_json():
|
|||||||
job_cache_int = int(json.dumps(all_jobs).__hash__())
|
job_cache_int = int(json.dumps(all_jobs).__hash__())
|
||||||
job_cache_token = num_to_alphanumeric(job_cache_int)
|
job_cache_token = num_to_alphanumeric(job_cache_int)
|
||||||
return {'jobs': all_jobs, 'token': job_cache_token}
|
return {'jobs': all_jobs, 'token': job_cache_token}
|
||||||
|
except DetachedInstanceError as e:
|
||||||
|
raise e
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.exception(f"Exception fetching jobs_json: {e}")
|
logger.error(f"Error fetching jobs_json: {e}")
|
||||||
return {}, 500
|
raise e
|
||||||
|
|
||||||
|
|
||||||
@server.get('/api/jobs_long_poll')
|
@server.get('/api/jobs_long_poll')
|
||||||
@@ -78,50 +101,40 @@ def long_polling_jobs():
|
|||||||
if time.time() - start_time > 30:
|
if time.time() - start_time > 30:
|
||||||
return {}, 204
|
return {}, 204
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
|
except DetachedInstanceError as e:
|
||||||
|
raise e
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.exception(f"Exception fetching long_polling_jobs: {e}")
|
logger.error(f"Error fetching long_polling_jobs: {e}")
|
||||||
return {}, 500
|
raise e
|
||||||
|
|
||||||
|
|
||||||
@server.route('/api/job/<job_id>/thumbnail')
|
@server.route('/api/job/<job_id>/thumbnail')
|
||||||
def job_thumbnail(job_id):
|
def job_thumbnail(job_id):
|
||||||
|
|
||||||
|
try:
|
||||||
big_thumb = request.args.get('size', False) == "big"
|
big_thumb = request.args.get('size', False) == "big"
|
||||||
video_ok = request.args.get('video_ok', False)
|
video_ok = request.args.get('video_ok', False)
|
||||||
found_job = RenderQueue.job_with_id(job_id, none_ok=True)
|
found_job = RenderQueue.job_with_id(job_id, none_ok=False)
|
||||||
if found_job:
|
|
||||||
|
|
||||||
os.makedirs(server.config['THUMBS_FOLDER'], exist_ok=True)
|
# trigger a thumbnail update - just in case
|
||||||
thumb_video_path = os.path.join(server.config['THUMBS_FOLDER'], found_job.id + '.mp4')
|
PreviewManager.update_previews_for_job(found_job, wait_until_completion=True, timeout=60)
|
||||||
thumb_image_path = os.path.join(server.config['THUMBS_FOLDER'], found_job.id + '.jpg')
|
previews = PreviewManager.get_previews_for_job(found_job)
|
||||||
big_video_path = os.path.join(server.config['THUMBS_FOLDER'], found_job.id + '_big.mp4')
|
all_previews_list = previews.get('output', previews.get('input', []))
|
||||||
big_image_path = os.path.join(server.config['THUMBS_FOLDER'], found_job.id + '_big.jpg')
|
|
||||||
|
|
||||||
# generate regular thumb if it doesn't exist
|
video_previews = [x for x in all_previews_list if x['kind'] == 'video']
|
||||||
if not os.path.exists(thumb_video_path) and not os.path.exists(thumb_video_path + '_IN-PROGRESS') and \
|
image_previews = [x for x in all_previews_list if x['kind'] == 'image']
|
||||||
found_job.status not in [RenderStatus.CANCELLED, RenderStatus.ERROR]:
|
filtered_list = video_previews if video_previews and video_ok else image_previews
|
||||||
generate_thumbnail_for_job(found_job, thumb_video_path, thumb_image_path, max_width=240)
|
|
||||||
|
|
||||||
# generate big thumb if it doesn't exist
|
# todo - sort by size or other metrics here
|
||||||
if not os.path.exists(big_video_path) and not os.path.exists(big_image_path + '_IN-PROGRESS') and \
|
if filtered_list:
|
||||||
found_job.status not in [RenderStatus.CANCELLED, RenderStatus.ERROR]:
|
preview_to_send = filtered_list[0]
|
||||||
generate_thumbnail_for_job(found_job, big_video_path, big_image_path, max_width=800)
|
mime_types = {'image': 'image/jpeg', 'video': 'video/mp4'}
|
||||||
|
file_mime_type = mime_types.get(preview_to_send['kind'], 'unknown')
|
||||||
# generated videos
|
return send_file(preview_to_send['filename'], mimetype=file_mime_type)
|
||||||
if video_ok:
|
except Exception as e:
|
||||||
if big_thumb and os.path.exists(big_video_path) and not os.path.exists(
|
logger.error(f'Error getting thumbnail: {e}')
|
||||||
big_video_path + '_IN-PROGRESS'):
|
return f'Error getting thumbnail: {e}', 500
|
||||||
return send_file(big_video_path, mimetype="video/mp4")
|
return "No thumbnail available", 404
|
||||||
elif os.path.exists(thumb_video_path) and not os.path.exists(thumb_video_path + '_IN-PROGRESS'):
|
|
||||||
return send_file(thumb_video_path, mimetype="video/mp4")
|
|
||||||
|
|
||||||
# Generated thumbs
|
|
||||||
if big_thumb and os.path.exists(big_image_path):
|
|
||||||
return send_file(big_image_path, mimetype='image/jpeg')
|
|
||||||
elif os.path.exists(thumb_image_path):
|
|
||||||
return send_file(thumb_image_path, mimetype='image/jpeg')
|
|
||||||
|
|
||||||
return found_job.status.value, 200
|
|
||||||
return found_job.status.value, 404
|
|
||||||
|
|
||||||
|
|
||||||
# Get job file routing
|
# Get job file routing
|
||||||
@@ -146,22 +159,17 @@ def filtered_jobs_json(status_val):
|
|||||||
return f'Cannot find jobs with status {status_val}', 400
|
return f'Cannot find jobs with status {status_val}', 400
|
||||||
|
|
||||||
|
|
||||||
@server.post('/api/job/<job_id>/notify_parent_of_status_change')
|
@server.post('/api/job/<job_id>/send_subjob_update_notification')
|
||||||
def subjob_status_change(job_id):
|
def subjob_update_notification(job_id):
|
||||||
try:
|
try:
|
||||||
subjob_details = request.json
|
subjob_details = request.json
|
||||||
logger.info(f"Subjob to job id: {job_id} is now {subjob_details['status']}")
|
logger.info(f"Subjob to job id: {job_id} is now {subjob_details['status']}")
|
||||||
DistributedJobManager.handle_subjob_status_change(RenderQueue.job_with_id(job_id), subjob_data=subjob_details)
|
DistributedJobManager.handle_subjob_update_notification(RenderQueue.job_with_id(job_id), subjob_data=subjob_details)
|
||||||
return Response(status=200)
|
return Response(status=200)
|
||||||
except JobNotFoundError:
|
except JobNotFoundError:
|
||||||
return "Job not found", 404
|
return "Job not found", 404
|
||||||
|
|
||||||
|
|
||||||
@server.errorhandler(JobNotFoundError)
|
|
||||||
def handle_job_not_found(job_error):
|
|
||||||
return f'Cannot find job with ID {job_error.job_id}', 400
|
|
||||||
|
|
||||||
|
|
||||||
@server.get('/api/job/<job_id>')
|
@server.get('/api/job/<job_id>')
|
||||||
def get_job_status(job_id):
|
def get_job_status(job_id):
|
||||||
return RenderQueue.job_with_id(job_id).json()
|
return RenderQueue.job_with_id(job_id).json()
|
||||||
@@ -180,7 +188,22 @@ def get_job_logs(job_id):
|
|||||||
|
|
||||||
@server.get('/api/job/<job_id>/file_list')
|
@server.get('/api/job/<job_id>/file_list')
|
||||||
def get_file_list(job_id):
|
def get_file_list(job_id):
|
||||||
return RenderQueue.job_with_id(job_id).file_list()
|
return [os.path.basename(x) for x in RenderQueue.job_with_id(job_id).file_list()]
|
||||||
|
|
||||||
|
|
||||||
|
@server.route('/api/job/<job_id>/download')
|
||||||
|
def download_file(job_id):
|
||||||
|
|
||||||
|
requested_filename = request.args.get('filename')
|
||||||
|
if not requested_filename:
|
||||||
|
return 'Filename required', 400
|
||||||
|
|
||||||
|
found_job = RenderQueue.job_with_id(job_id)
|
||||||
|
for job_filename in found_job.file_list():
|
||||||
|
if os.path.basename(job_filename).lower() == requested_filename.lower():
|
||||||
|
return send_file(job_filename, as_attachment=True, )
|
||||||
|
|
||||||
|
return f"File '{requested_filename}' not found", 404
|
||||||
|
|
||||||
|
|
||||||
@server.route('/api/job/<job_id>/download_all')
|
@server.route('/api/job/<job_id>/download_all')
|
||||||
@@ -305,14 +328,10 @@ def delete_job(job_id):
|
|||||||
if server.config['UPLOAD_FOLDER'] in output_dir and os.path.exists(output_dir):
|
if server.config['UPLOAD_FOLDER'] in output_dir and os.path.exists(output_dir):
|
||||||
shutil.rmtree(output_dir)
|
shutil.rmtree(output_dir)
|
||||||
|
|
||||||
# Remove any thumbnails
|
try:
|
||||||
for filename in os.listdir(server.config['THUMBS_FOLDER']):
|
PreviewManager.delete_previews_for_job(found_job)
|
||||||
if job_id in filename:
|
except Exception as e:
|
||||||
os.remove(os.path.join(server.config['THUMBS_FOLDER'], filename))
|
logger.error(f"Error deleting previews for {found_job}: {e}")
|
||||||
|
|
||||||
thumb_path = os.path.join(server.config['THUMBS_FOLDER'], found_job.id + '.mp4')
|
|
||||||
if os.path.exists(thumb_path):
|
|
||||||
os.remove(thumb_path)
|
|
||||||
|
|
||||||
# See if we own the project_dir (i.e. was it uploaded)
|
# See if we own the project_dir (i.e. was it uploaded)
|
||||||
project_dir = os.path.dirname(os.path.dirname(found_job.input_path))
|
project_dir = os.path.dirname(os.path.dirname(found_job.input_path))
|
||||||
@@ -489,60 +508,24 @@ def get_disk_benchmark():
|
|||||||
return {'write_speed': results[0], 'read_speed': results[-1]}
|
return {'write_speed': results[0], 'read_speed': results[-1]}
|
||||||
|
|
||||||
|
|
||||||
def start_server():
|
def start_server(hostname=None):
|
||||||
def eval_loop(delay_sec=1):
|
|
||||||
while True:
|
|
||||||
RenderQueue.evaluate_queue()
|
|
||||||
time.sleep(delay_sec)
|
|
||||||
|
|
||||||
# get hostname
|
# get hostname
|
||||||
|
if not hostname:
|
||||||
local_hostname = socket.gethostname()
|
local_hostname = socket.gethostname()
|
||||||
local_hostname = local_hostname + (".local" if not local_hostname.endswith(".local") else "")
|
hostname = local_hostname + (".local" if not local_hostname.endswith(".local") else "")
|
||||||
|
|
||||||
# load flask settings
|
# load flask settings
|
||||||
server.config['HOSTNAME'] = local_hostname
|
server.config['HOSTNAME'] = hostname
|
||||||
server.config['PORT'] = int(Config.port_number)
|
server.config['PORT'] = int(Config.port_number)
|
||||||
server.config['UPLOAD_FOLDER'] = system_safe_path(os.path.expanduser(Config.upload_folder))
|
server.config['UPLOAD_FOLDER'] = system_safe_path(os.path.expanduser(Config.upload_folder))
|
||||||
server.config['THUMBS_FOLDER'] = system_safe_path(os.path.join(os.path.expanduser(Config.upload_folder), 'thumbs'))
|
|
||||||
server.config['MAX_CONTENT_PATH'] = Config.max_content_path
|
server.config['MAX_CONTENT_PATH'] = Config.max_content_path
|
||||||
server.config['enable_split_jobs'] = Config.enable_split_jobs
|
server.config['enable_split_jobs'] = Config.enable_split_jobs
|
||||||
|
|
||||||
# Setup directory for saving engines to
|
|
||||||
EngineManager.engines_path = system_safe_path(os.path.join(os.path.join(os.path.expanduser(Config.upload_folder),
|
|
||||||
'engines')))
|
|
||||||
os.makedirs(EngineManager.engines_path, exist_ok=True)
|
|
||||||
|
|
||||||
# Debug info
|
|
||||||
logger.debug(f"Upload directory: {server.config['UPLOAD_FOLDER']}")
|
|
||||||
logger.debug(f"Thumbs directory: {server.config['THUMBS_FOLDER']}")
|
|
||||||
logger.debug(f"Engines directory: {EngineManager.engines_path}")
|
|
||||||
|
|
||||||
# disable most Flask logging
|
# disable most Flask logging
|
||||||
flask_log = logging.getLogger('werkzeug')
|
flask_log = logging.getLogger('werkzeug')
|
||||||
flask_log.setLevel(Config.flask_log_level.upper())
|
flask_log.setLevel(Config.flask_log_level.upper())
|
||||||
|
|
||||||
# check for updates for render engines if configured or on first launch
|
logger.debug('Starting API server')
|
||||||
if Config.update_engines_on_launch or not EngineManager.get_engines():
|
server.run(host='0.0.0.0', port=server.config['PORT'], debug=Config.flask_debug_enable, use_reloader=False,
|
||||||
EngineManager.update_all_engines()
|
threaded=True)
|
||||||
|
|
||||||
# Set up the RenderQueue object
|
|
||||||
RenderQueue.load_state(database_directory=server.config['UPLOAD_FOLDER'])
|
|
||||||
ServerProxyManager.subscribe_to_listener()
|
|
||||||
DistributedJobManager.subscribe_to_listener()
|
|
||||||
|
|
||||||
thread = threading.Thread(target=eval_loop, kwargs={'delay_sec': Config.queue_eval_seconds}, daemon=True)
|
|
||||||
thread.start()
|
|
||||||
|
|
||||||
logger.info(f"Starting Zordon Render Server - Hostname: '{server.config['HOSTNAME']}:'")
|
|
||||||
ZeroconfServer.configure("_zordon._tcp.local.", server.config['HOSTNAME'], server.config['PORT'])
|
|
||||||
ZeroconfServer.properties = {'system_cpu': current_system_cpu(), 'system_cpu_cores': multiprocessing.cpu_count(),
|
|
||||||
'system_os': current_system_os(),
|
|
||||||
'system_os_version': current_system_os_version()}
|
|
||||||
ZeroconfServer.start()
|
|
||||||
|
|
||||||
try:
|
|
||||||
server.run(host='0.0.0.0', port=server.config['PORT'], debug=Config.flask_debug_enable,
|
|
||||||
use_reloader=False, threaded=True)
|
|
||||||
finally:
|
|
||||||
RenderQueue.save_state()
|
|
||||||
ZeroconfServer.stop()
|
|
||||||
|
|||||||
113
src/api/preview_manager.py
Normal file
113
src/api/preview_manager.py
Normal file
@@ -0,0 +1,113 @@
|
|||||||
|
import logging
|
||||||
|
import os
|
||||||
|
import subprocess
|
||||||
|
import threading
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
from src.utilities.ffmpeg_helper import generate_thumbnail, save_first_frame
|
||||||
|
|
||||||
|
logger = logging.getLogger()
|
||||||
|
supported_video_formats = ['.mp4', '.mov', '.avi', '.mpg', '.mpeg', '.mxf', '.m4v', 'mkv']
|
||||||
|
supported_image_formats = ['.jpg', '.png', '.exr', '.tif']
|
||||||
|
|
||||||
|
|
||||||
|
class PreviewManager:
|
||||||
|
|
||||||
|
storage_path = None
|
||||||
|
_running_jobs = {}
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def __generate_job_preview_worker(cls, job, replace_existing=False, max_width=320):
|
||||||
|
|
||||||
|
# Determine best source file to use for thumbs
|
||||||
|
job_file_list = job.file_list()
|
||||||
|
source_files = job_file_list if job_file_list else [job.input_path]
|
||||||
|
preview_label = "output" if job_file_list else "input"
|
||||||
|
|
||||||
|
# filter by type
|
||||||
|
found_image_files = [f for f in source_files if os.path.splitext(f)[-1].lower() in supported_image_formats]
|
||||||
|
found_video_files = [f for f in source_files if os.path.splitext(f)[-1].lower() in supported_video_formats]
|
||||||
|
|
||||||
|
# check if we even have any valid files to work from
|
||||||
|
if source_files and not found_video_files and not found_image_files:
|
||||||
|
logger.warning(f"No valid image or video files found in files from job: {job}")
|
||||||
|
return
|
||||||
|
|
||||||
|
os.makedirs(cls.storage_path, exist_ok=True)
|
||||||
|
base_path = os.path.join(cls.storage_path, f"{job.id}-{preview_label}-{max_width}")
|
||||||
|
preview_video_path = base_path + '.mp4'
|
||||||
|
preview_image_path = base_path + '.jpg'
|
||||||
|
|
||||||
|
if replace_existing:
|
||||||
|
for x in [preview_image_path, preview_video_path]:
|
||||||
|
try:
|
||||||
|
os.remove(x)
|
||||||
|
except OSError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
# Generate image previews
|
||||||
|
if (found_video_files or found_image_files) and not os.path.exists(preview_image_path):
|
||||||
|
try:
|
||||||
|
path_of_source = found_image_files[-1] if found_image_files else found_video_files[-1]
|
||||||
|
logger.debug(f"Generating image preview for {path_of_source}")
|
||||||
|
save_first_frame(source_path=path_of_source, dest_path=preview_image_path, max_width=max_width)
|
||||||
|
logger.debug(f"Successfully created image preview for {path_of_source}")
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error generating image preview for {job}: {e}")
|
||||||
|
|
||||||
|
# Generate video previews
|
||||||
|
if found_video_files and not os.path.exists(preview_video_path):
|
||||||
|
try:
|
||||||
|
path_of_source = found_video_files[0]
|
||||||
|
logger.debug(f"Generating video preview for {path_of_source}")
|
||||||
|
generate_thumbnail(source_path=path_of_source, dest_path=preview_video_path, max_width=max_width)
|
||||||
|
logger.debug(f"Successfully created video preview for {path_of_source}")
|
||||||
|
except subprocess.CalledProcessError as e:
|
||||||
|
logger.error(f"Error generating video preview for {job}: {e}")
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def update_previews_for_job(cls, job, replace_existing=False, wait_until_completion=False, timeout=None):
|
||||||
|
job_thread = cls._running_jobs.get(job.id)
|
||||||
|
if job_thread and job_thread.is_alive():
|
||||||
|
logger.debug(f'Preview generation job already running for {job}')
|
||||||
|
else:
|
||||||
|
job_thread = threading.Thread(target=cls.__generate_job_preview_worker, args=(job, replace_existing,))
|
||||||
|
job_thread.start()
|
||||||
|
cls._running_jobs[job.id] = job_thread
|
||||||
|
|
||||||
|
if wait_until_completion:
|
||||||
|
job_thread.join(timeout=timeout)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def get_previews_for_job(cls, job):
|
||||||
|
|
||||||
|
results = {}
|
||||||
|
try:
|
||||||
|
directory_path = Path(cls.storage_path)
|
||||||
|
preview_files_for_job = [f for f in directory_path.iterdir() if f.is_file() and f.name.startswith(job.id)]
|
||||||
|
|
||||||
|
for preview_filename in preview_files_for_job:
|
||||||
|
try:
|
||||||
|
pixel_width = str(preview_filename).split('-')[-1]
|
||||||
|
preview_label = str(os.path.basename(preview_filename)).split('-')[1]
|
||||||
|
extension = os.path.splitext(preview_filename)[-1].lower()
|
||||||
|
kind = 'video' if extension in supported_video_formats else \
|
||||||
|
'image' if extension in supported_image_formats else 'unknown'
|
||||||
|
results[preview_label] = results.get(preview_label, [])
|
||||||
|
results[preview_label].append({'filename': str(preview_filename), 'width': pixel_width, 'kind': kind})
|
||||||
|
except IndexError: # ignore invalid filenames
|
||||||
|
pass
|
||||||
|
except FileNotFoundError:
|
||||||
|
pass
|
||||||
|
return results
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def delete_previews_for_job(cls, job):
|
||||||
|
all_previews = cls.get_previews_for_job(job)
|
||||||
|
flattened_list = [item for sublist in all_previews.values() for item in sublist]
|
||||||
|
for preview in flattened_list:
|
||||||
|
try:
|
||||||
|
logger.debug(f"Removing preview: {preview['filename']}")
|
||||||
|
os.remove(preview['filename'])
|
||||||
|
except OSError as e:
|
||||||
|
logger.error(f"Error removing preview '{preview.get('filename')}': {e}")
|
||||||
@@ -171,19 +171,19 @@ class RenderServerProxy:
|
|||||||
def get_all_engines(self):
|
def get_all_engines(self):
|
||||||
return self.request_data('all_engines')
|
return self.request_data('all_engines')
|
||||||
|
|
||||||
def notify_parent_of_status_change(self, parent_id, subjob):
|
def send_subjob_update_notification(self, parent_id, subjob):
|
||||||
"""
|
"""
|
||||||
Notifies the parent job of a status change in a subjob.
|
Notifies the parent job of an update in a subjob.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
parent_id (str): The ID of the parent job.
|
parent_id (str): The ID of the parent job.
|
||||||
subjob (Job): The subjob that has changed status.
|
subjob (Job): The subjob that has updated.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
Response: The response from the server.
|
Response: The response from the server.
|
||||||
"""
|
"""
|
||||||
hostname = LOOPBACK if self.is_localhost else self.hostname
|
hostname = LOOPBACK if self.is_localhost else self.hostname
|
||||||
return requests.post(f'http://{hostname}:{self.port}/api/job/{parent_id}/notify_parent_of_status_change',
|
return requests.post(f'http://{hostname}:{self.port}/api/job/{parent_id}/send_subjob_update_notification',
|
||||||
json=subjob.json())
|
json=subjob.json())
|
||||||
|
|
||||||
def post_job_to_server(self, file_path, job_list, callback=None):
|
def post_job_to_server(self, file_path, job_list, callback=None):
|
||||||
@@ -232,19 +232,27 @@ class RenderServerProxy:
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"An error occurred: {e}")
|
logger.error(f"An error occurred: {e}")
|
||||||
|
|
||||||
def get_job_files(self, job_id, save_path):
|
def get_job_files_list(self, job_id):
|
||||||
|
return self.request_data(f"job/{job_id}/file_list")
|
||||||
|
|
||||||
|
def download_all_job_files(self, job_id, save_path):
|
||||||
hostname = LOOPBACK if self.is_localhost else self.hostname
|
hostname = LOOPBACK if self.is_localhost else self.hostname
|
||||||
url = f"http://{hostname}:{self.port}/api/job/{job_id}/download_all"
|
url = f"http://{hostname}:{self.port}/api/job/{job_id}/download_all"
|
||||||
return self.download_file(url, filename=save_path)
|
return self.__download_file_from_url(url, output_filepath=save_path)
|
||||||
|
|
||||||
|
def download_job_file(self, job_id, job_filename, save_path):
|
||||||
|
hostname = LOOPBACK if self.is_localhost else self.hostname
|
||||||
|
url = f"http://{hostname}:{self.port}/api/job/{job_id}/download?filename={job_filename}"
|
||||||
|
return self.__download_file_from_url(url, output_filepath=save_path)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def download_file(url, filename):
|
def __download_file_from_url(url, output_filepath):
|
||||||
with requests.get(url, stream=True) as r:
|
with requests.get(url, stream=True) as r:
|
||||||
r.raise_for_status()
|
r.raise_for_status()
|
||||||
with open(filename, 'wb') as f:
|
with open(output_filepath, 'wb') as f:
|
||||||
for chunk in r.iter_content(chunk_size=8192):
|
for chunk in r.iter_content(chunk_size=8192):
|
||||||
f.write(chunk)
|
f.write(chunk)
|
||||||
return filename
|
return output_filepath
|
||||||
|
|
||||||
# --- Renderer --- #
|
# --- Renderer --- #
|
||||||
|
|
||||||
|
|||||||
@@ -10,9 +10,11 @@ import requests
|
|||||||
from plyer import notification
|
from plyer import notification
|
||||||
from pubsub import pub
|
from pubsub import pub
|
||||||
|
|
||||||
|
from src.api.preview_manager import PreviewManager
|
||||||
from src.api.server_proxy import RenderServerProxy
|
from src.api.server_proxy import RenderServerProxy
|
||||||
from src.engines.engine_manager import EngineManager
|
from src.engines.engine_manager import EngineManager
|
||||||
from src.render_queue import RenderQueue
|
from src.render_queue import RenderQueue
|
||||||
|
from src.utilities.config import Config
|
||||||
from src.utilities.misc_helper import get_file_size_human
|
from src.utilities.misc_helper import get_file_size_human
|
||||||
from src.utilities.status_utils import RenderStatus, string_to_status
|
from src.utilities.status_utils import RenderStatus, string_to_status
|
||||||
from src.utilities.zeroconf_server import ZeroconfServer
|
from src.utilities.zeroconf_server import ZeroconfServer
|
||||||
@@ -32,6 +34,43 @@ class DistributedJobManager:
|
|||||||
This should be called once, typically during the initialization phase.
|
This should be called once, typically during the initialization phase.
|
||||||
"""
|
"""
|
||||||
pub.subscribe(cls.__local_job_status_changed, 'status_change')
|
pub.subscribe(cls.__local_job_status_changed, 'status_change')
|
||||||
|
pub.subscribe(cls.__local_job_frame_complete, 'frame_complete')
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def __local_job_frame_complete(cls, job_id, frame_number, update_interval=5):
|
||||||
|
|
||||||
|
"""
|
||||||
|
Responds to the 'frame_complete' pubsub message for local jobs.
|
||||||
|
|
||||||
|
Parameters:
|
||||||
|
job_id (str): The ID of the job that has changed status.
|
||||||
|
old_status (str): The previous status of the job.
|
||||||
|
new_status (str): The new (current) status of the job.
|
||||||
|
|
||||||
|
Note: Do not call directly. Instead, call via the 'frame_complete' pubsub message.
|
||||||
|
"""
|
||||||
|
|
||||||
|
render_job = RenderQueue.job_with_id(job_id, none_ok=True)
|
||||||
|
if not render_job: # ignore jobs not in the queue
|
||||||
|
return
|
||||||
|
|
||||||
|
logger.debug(f"Job {job_id} has completed frame #{frame_number}")
|
||||||
|
replace_existing_previews = (frame_number % update_interval) == 0
|
||||||
|
cls.__job_update_shared(render_job, replace_existing_previews)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def __job_update_shared(cls, render_job, replace_existing_previews=False):
|
||||||
|
# update previews
|
||||||
|
PreviewManager.update_previews_for_job(job=render_job, replace_existing=replace_existing_previews)
|
||||||
|
|
||||||
|
# notify parent to allow individual frames to be copied instead of waiting until the end
|
||||||
|
if render_job.parent:
|
||||||
|
parent_id, parent_hostname = render_job.parent.split('@')[0], render_job.parent.split('@')[-1]
|
||||||
|
try:
|
||||||
|
logger.debug(f'Job {render_job.id} updating parent {parent_id}@{parent_hostname}')
|
||||||
|
RenderServerProxy(parent_hostname).send_subjob_update_notification(parent_id, render_job)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error notifying parent {parent_hostname} about update in subjob {render_job.id}: {e}")
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def __local_job_status_changed(cls, job_id, old_status, new_status):
|
def __local_job_status_changed(cls, job_id, old_status, new_status):
|
||||||
@@ -52,15 +91,15 @@ class DistributedJobManager:
|
|||||||
return
|
return
|
||||||
|
|
||||||
logger.debug(f"Job {job_id} status change: {old_status} -> {new_status}")
|
logger.debug(f"Job {job_id} status change: {old_status} -> {new_status}")
|
||||||
if render_job.parent: # If local job is a subjob from a remote server
|
|
||||||
parent_id, hostname = render_job.parent.split('@')[0], render_job.parent.split('@')[-1]
|
|
||||||
RenderServerProxy(hostname).notify_parent_of_status_change(parent_id=parent_id, subjob=render_job)
|
|
||||||
|
|
||||||
# handle cancelling all the children
|
cls.__job_update_shared(render_job, replace_existing_previews=(render_job.status == RenderStatus.COMPLETED))
|
||||||
elif render_job.children and new_status in [RenderStatus.CANCELLED, RenderStatus.ERROR]:
|
|
||||||
|
# Handle children
|
||||||
|
if render_job.children:
|
||||||
|
if new_status in [RenderStatus.CANCELLED, RenderStatus.ERROR]: # Cancel children if necessary
|
||||||
for child in render_job.children:
|
for child in render_job.children:
|
||||||
child_id, hostname = child.split('@')
|
child_id, child_hostname = child.split('@')
|
||||||
RenderServerProxy(hostname).cancel_job(child_id, confirm=True)
|
RenderServerProxy(child_hostname).cancel_job(child_id, confirm=True)
|
||||||
|
|
||||||
# UI Notifications
|
# UI Notifications
|
||||||
try:
|
try:
|
||||||
@@ -97,8 +136,7 @@ class DistributedJobManager:
|
|||||||
"""
|
"""
|
||||||
Creates render jobs.
|
Creates render jobs.
|
||||||
|
|
||||||
This method takes a list of job data, a local path to a loaded project, and a job directory. It creates a render
|
This method job data and a local path to a loaded project. It creates and returns new a render job.
|
||||||
job for each job data in the list and appends the result to a list. The list of results is then returned.
|
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
job_data (dict): Job data.
|
job_data (dict): Job data.
|
||||||
@@ -134,6 +172,7 @@ class DistributedJobManager:
|
|||||||
worker.priority = int(job_data.get('priority', worker.priority))
|
worker.priority = int(job_data.get('priority', worker.priority))
|
||||||
worker.start_frame = int(job_data.get("start_frame", worker.start_frame))
|
worker.start_frame = int(job_data.get("start_frame", worker.start_frame))
|
||||||
worker.end_frame = int(job_data.get("end_frame", worker.end_frame))
|
worker.end_frame = int(job_data.get("end_frame", worker.end_frame))
|
||||||
|
worker.watchdog_timeout = Config.worker_process_timeout
|
||||||
worker.hostname = socket.gethostname()
|
worker.hostname = socket.gethostname()
|
||||||
|
|
||||||
# determine if we can / should split the job
|
# determine if we can / should split the job
|
||||||
@@ -143,6 +182,7 @@ class DistributedJobManager:
|
|||||||
logger.debug("Not splitting into subjobs")
|
logger.debug("Not splitting into subjobs")
|
||||||
|
|
||||||
RenderQueue.add_to_render_queue(worker, force_start=job_data.get('force_start', False))
|
RenderQueue.add_to_render_queue(worker, force_start=job_data.get('force_start', False))
|
||||||
|
PreviewManager.update_previews_for_job(worker)
|
||||||
|
|
||||||
return worker
|
return worker
|
||||||
|
|
||||||
@@ -151,9 +191,9 @@ class DistributedJobManager:
|
|||||||
# --------------------------------------------
|
# --------------------------------------------
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def handle_subjob_status_change(cls, local_job, subjob_data):
|
def handle_subjob_update_notification(cls, local_job, subjob_data):
|
||||||
"""
|
"""
|
||||||
Responds to a status change from a remote subjob and triggers the creation or modification of subjobs as needed.
|
Responds to a notification from a remote subjob and the host requests any subsequent updates from the subjob.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
local_job (BaseRenderWorker): The local parent job worker.
|
local_job (BaseRenderWorker): The local parent job worker.
|
||||||
@@ -162,27 +202,40 @@ class DistributedJobManager:
|
|||||||
|
|
||||||
subjob_status = string_to_status(subjob_data['status'])
|
subjob_status = string_to_status(subjob_data['status'])
|
||||||
subjob_id = subjob_data['id']
|
subjob_id = subjob_data['id']
|
||||||
subjob_hostname = next((hostname.split('@')[1] for hostname in local_job.children if
|
subjob_hostname = subjob_data['hostname']
|
||||||
hostname.split('@')[0] == subjob_id), None)
|
subjob_key = f'{subjob_id}@{subjob_hostname}'
|
||||||
local_job.children[f'{subjob_id}@{subjob_hostname}'] = subjob_data
|
old_status = local_job.children.get(subjob_key, {}).get('status')
|
||||||
|
local_job.children[subjob_key] = subjob_data
|
||||||
|
|
||||||
logname = f"{local_job.id}:{subjob_id}@{subjob_hostname}"
|
logname = f"<Parent: {local_job.id} | Child: {subjob_key}>"
|
||||||
|
if old_status != subjob_status.value:
|
||||||
logger.debug(f"Subjob status changed: {logname} -> {subjob_status.value}")
|
logger.debug(f"Subjob status changed: {logname} -> {subjob_status.value}")
|
||||||
|
|
||||||
# Download complete or partial render jobs
|
cls.download_missing_frames_from_subjob(local_job, subjob_id, subjob_hostname)
|
||||||
if subjob_status in [RenderStatus.COMPLETED, RenderStatus.CANCELLED, RenderStatus.ERROR] and \
|
|
||||||
subjob_data['file_count']:
|
|
||||||
download_result = cls.download_from_subjob(local_job, subjob_id, subjob_hostname)
|
|
||||||
if not download_result:
|
|
||||||
# todo: handle error
|
|
||||||
logger.error(f"Unable to download subjob files from {logname} with status {subjob_status.value}")
|
|
||||||
|
|
||||||
if subjob_status == RenderStatus.CANCELLED or subjob_status == RenderStatus.ERROR:
|
|
||||||
# todo: determine missing frames and schedule new job
|
|
||||||
pass
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def download_from_subjob(local_job, subjob_id, subjob_hostname):
|
def download_missing_frames_from_subjob(local_job, subjob_id, subjob_hostname):
|
||||||
|
|
||||||
|
try:
|
||||||
|
local_files = [os.path.basename(x) for x in local_job.file_list()]
|
||||||
|
subjob_proxy = RenderServerProxy(subjob_hostname)
|
||||||
|
subjob_files = subjob_proxy.get_job_files_list(job_id=subjob_id) or []
|
||||||
|
|
||||||
|
for subjob_filename in subjob_files:
|
||||||
|
if subjob_filename not in local_files:
|
||||||
|
try:
|
||||||
|
logger.debug(f"Downloading new file '{subjob_filename}' from {subjob_hostname}")
|
||||||
|
local_save_path = os.path.join(os.path.dirname(local_job.output_path), subjob_filename)
|
||||||
|
subjob_proxy.download_job_file(job_id=subjob_id, job_filename=subjob_filename,
|
||||||
|
save_path=local_save_path)
|
||||||
|
logger.debug(f'Downloaded successfully - {local_save_path}')
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error downloading file '{subjob_filename}' from {subjob_hostname}: {e}")
|
||||||
|
except Exception as e:
|
||||||
|
logger.exception(f'Uncaught exception while trying to download from subjob: {e}')
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def download_all_from_subjob(local_job, subjob_id, subjob_hostname):
|
||||||
"""
|
"""
|
||||||
Downloads and extracts files from a completed subjob on a remote server.
|
Downloads and extracts files from a completed subjob on a remote server.
|
||||||
|
|
||||||
@@ -203,7 +256,7 @@ class DistributedJobManager:
|
|||||||
try:
|
try:
|
||||||
local_job.children[child_key]['download_status'] = 'working'
|
local_job.children[child_key]['download_status'] = 'working'
|
||||||
logger.info(f"Downloading completed subjob files from {subjob_hostname} to localhost")
|
logger.info(f"Downloading completed subjob files from {subjob_hostname} to localhost")
|
||||||
RenderServerProxy(subjob_hostname).get_job_files(subjob_id, zip_file_path)
|
RenderServerProxy(subjob_hostname).download_all_job_files(subjob_id, zip_file_path)
|
||||||
logger.info(f"File transfer complete for {logname} - Transferred {get_file_size_human(zip_file_path)}")
|
logger.info(f"File transfer complete for {logname} - Transferred {get_file_size_human(zip_file_path)}")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error downloading files from remote server: {e}")
|
logger.error(f"Error downloading files from remote server: {e}")
|
||||||
@@ -227,6 +280,7 @@ class DistributedJobManager:
|
|||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def wait_for_subjobs(cls, local_job):
|
def wait_for_subjobs(cls, local_job):
|
||||||
|
# todo: rewrite this method
|
||||||
logger.debug(f"Waiting for subjobs for job {local_job}")
|
logger.debug(f"Waiting for subjobs for job {local_job}")
|
||||||
local_job.status = RenderStatus.WAITING_FOR_SUBJOBS
|
local_job.status = RenderStatus.WAITING_FOR_SUBJOBS
|
||||||
statuses_to_download = [RenderStatus.CANCELLED, RenderStatus.ERROR, RenderStatus.COMPLETED]
|
statuses_to_download = [RenderStatus.CANCELLED, RenderStatus.ERROR, RenderStatus.COMPLETED]
|
||||||
@@ -266,10 +320,10 @@ class DistributedJobManager:
|
|||||||
|
|
||||||
# Check if job is finished, but has not had files copied yet over yet
|
# Check if job is finished, but has not had files copied yet over yet
|
||||||
if download_status is None and subjob_data['file_count'] and status in statuses_to_download:
|
if download_status is None and subjob_data['file_count'] and status in statuses_to_download:
|
||||||
download_result = cls.download_from_subjob(local_job, subjob_id, subjob_hostname)
|
try:
|
||||||
if not download_result:
|
cls.download_missing_frames_from_subjob(local_job, subjob_id, subjob_hostname)
|
||||||
logger.error("Failed to download from subjob")
|
except Exception as e:
|
||||||
# todo: error handling here
|
logger.error(f"Error downloading missing frames from subjob: {e}")
|
||||||
|
|
||||||
# Any finished jobs not successfully downloaded at this point are skipped
|
# Any finished jobs not successfully downloaded at this point are skipped
|
||||||
if local_job.children[child_key].get('download_status', None) is None and \
|
if local_job.children[child_key].get('download_status', None) is None and \
|
||||||
|
|||||||
@@ -1,5 +1,6 @@
|
|||||||
import json
|
import json
|
||||||
import re
|
import re
|
||||||
|
from concurrent.futures import ThreadPoolExecutor
|
||||||
|
|
||||||
from src.engines.core.base_engine import *
|
from src.engines.core.base_engine import *
|
||||||
from src.utilities.misc_helper import system_safe_path
|
from src.utilities.misc_helper import system_safe_path
|
||||||
@@ -148,7 +149,13 @@ class Blender(BaseRenderEngine):
|
|||||||
return options
|
return options
|
||||||
|
|
||||||
def system_info(self):
|
def system_info(self):
|
||||||
return {'render_devices': self.get_render_devices()}
|
with ThreadPoolExecutor() as executor:
|
||||||
|
future_render_devices = executor.submit(self.get_render_devices)
|
||||||
|
future_engines = executor.submit(self.supported_render_engines)
|
||||||
|
render_devices = future_render_devices.result()
|
||||||
|
engines = future_engines.result()
|
||||||
|
|
||||||
|
return {'render_devices': render_devices, 'engines': engines}
|
||||||
|
|
||||||
def get_render_devices(self):
|
def get_render_devices(self):
|
||||||
script_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'scripts', 'get_system_info.py')
|
script_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'scripts', 'get_system_info.py')
|
||||||
|
|||||||
@@ -40,11 +40,13 @@ class BlenderRenderWorker(BaseRenderWorker):
|
|||||||
if custom_camera:
|
if custom_camera:
|
||||||
python_exp = python_exp + f"bpy.context.scene.camera = bpy.data.objects['{custom_camera}'];"
|
python_exp = python_exp + f"bpy.context.scene.camera = bpy.data.objects['{custom_camera}'];"
|
||||||
|
|
||||||
|
# Setup Render Engines
|
||||||
|
self.args['engine'] = self.args.get('engine', 'CYCLES').upper() # set default render engine
|
||||||
|
# Configure Cycles
|
||||||
|
if self.args['engine'] == 'CYCLES':
|
||||||
# Set Render Device (gpu/cpu/any)
|
# Set Render Device (gpu/cpu/any)
|
||||||
blender_engine = self.args.get('engine', 'BLENDER_EEVEE').upper()
|
|
||||||
if blender_engine == 'CYCLES':
|
|
||||||
render_device = self.args.get('render_device', 'any').lower()
|
render_device = self.args.get('render_device', 'any').lower()
|
||||||
if render_device not in {'any', 'gpu', 'cpu'}:
|
if render_device not in ['any', 'gpu', 'cpu']:
|
||||||
raise AttributeError(f"Invalid Cycles render device: {render_device}")
|
raise AttributeError(f"Invalid Cycles render device: {render_device}")
|
||||||
|
|
||||||
use_gpu = render_device in {'any', 'gpu'}
|
use_gpu = render_device in {'any', 'gpu'}
|
||||||
@@ -62,7 +64,10 @@ class BlenderRenderWorker(BaseRenderWorker):
|
|||||||
# Export format
|
# Export format
|
||||||
export_format = self.args.get('export_format', None) or 'JPEG'
|
export_format = self.args.get('export_format', None) or 'JPEG'
|
||||||
|
|
||||||
path_without_ext = os.path.splitext(self.output_path)[0] + "_"
|
main_part, ext = os.path.splitext(self.output_path)
|
||||||
|
# Remove the extension only if it is not composed entirely of digits
|
||||||
|
path_without_ext = main_part if not ext[1:].isdigit() else self.output_path
|
||||||
|
path_without_ext += "_"
|
||||||
cmd.extend(['-E', blender_engine, '-o', path_without_ext, '-F', export_format])
|
cmd.extend(['-E', blender_engine, '-o', path_without_ext, '-F', export_format])
|
||||||
|
|
||||||
# set frame range
|
# set frame range
|
||||||
@@ -107,6 +112,7 @@ class BlenderRenderWorker(BaseRenderWorker):
|
|||||||
output_file_number = output_filename_match.groups()[0]
|
output_file_number = output_filename_match.groups()[0]
|
||||||
try:
|
try:
|
||||||
self.current_frame = int(output_file_number)
|
self.current_frame = int(output_file_number)
|
||||||
|
self._send_frame_complete_notification()
|
||||||
except ValueError:
|
except ValueError:
|
||||||
pass
|
pass
|
||||||
elif render_stats_match:
|
elif render_stats_match:
|
||||||
@@ -115,15 +121,15 @@ class BlenderRenderWorker(BaseRenderWorker):
|
|||||||
logger.info(f'Frame #{self.current_frame} - '
|
logger.info(f'Frame #{self.current_frame} - '
|
||||||
f'{frame_count} of {self.total_frames} completed in {time_completed} | '
|
f'{frame_count} of {self.total_frames} completed in {time_completed} | '
|
||||||
f'Total Elapsed Time: {datetime.now() - self.start_time}')
|
f'Total Elapsed Time: {datetime.now() - self.start_time}')
|
||||||
else:
|
|
||||||
logger.debug(f'DEBUG: {line}')
|
|
||||||
else:
|
else:
|
||||||
pass
|
pass
|
||||||
# if len(line.strip()):
|
# if len(line.strip()):
|
||||||
# logger.debug(line.strip())
|
# logger.debug(line.strip())
|
||||||
|
|
||||||
def percent_complete(self):
|
def percent_complete(self):
|
||||||
if self.total_frames <= 1:
|
if self.status == RenderStatus.COMPLETED:
|
||||||
|
return 1
|
||||||
|
elif self.total_frames <= 1:
|
||||||
return self.__frame_percent_complete
|
return self.__frame_percent_complete
|
||||||
else:
|
else:
|
||||||
whole_frame_percent = (self.current_frame - self.start_frame) / self.total_frames
|
whole_frame_percent = (self.current_frame - self.start_frame) / self.total_frames
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ import logging
|
|||||||
import os
|
import os
|
||||||
import subprocess
|
import subprocess
|
||||||
import threading
|
import threading
|
||||||
|
import time
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
|
|
||||||
import psutil
|
import psutil
|
||||||
@@ -94,10 +95,15 @@ class BaseRenderWorker(Base):
|
|||||||
self.errors = []
|
self.errors = []
|
||||||
|
|
||||||
# Threads and processes
|
# Threads and processes
|
||||||
self.__thread = threading.Thread(target=self.run, args=())
|
self.__thread = threading.Thread(target=self.__run, args=())
|
||||||
self.__thread.daemon = True
|
self.__thread.daemon = True
|
||||||
self.__process = None
|
self.__process = None
|
||||||
self.last_output = None
|
self.last_output = None
|
||||||
|
self.__last_output_time = None
|
||||||
|
self.watchdog_timeout = 120
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return f"<Job id:{self.id} p{self.priority} {self.renderer}-{self.renderer_version} '{self.name}' status:{self.status.value}>"
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def total_frames(self):
|
def total_frames(self):
|
||||||
@@ -121,19 +127,16 @@ class BaseRenderWorker(Base):
|
|||||||
self._status = RenderStatus.CANCELLED.value
|
self._status = RenderStatus.CANCELLED.value
|
||||||
return string_to_status(self._status)
|
return string_to_status(self._status)
|
||||||
|
|
||||||
def validate(self):
|
def _send_frame_complete_notification(self):
|
||||||
if not os.path.exists(self.input_path):
|
pub.sendMessage('frame_complete', job_id=self.id, frame_number=self.current_frame)
|
||||||
raise FileNotFoundError(f"Cannot find input path: {self.input_path}")
|
|
||||||
self.generate_subprocess()
|
|
||||||
|
|
||||||
def generate_subprocess(self):
|
def generate_subprocess(self):
|
||||||
# Convert raw args from string if available and catch conflicts
|
# Convert raw args from string if available and catch conflicts
|
||||||
generated_args = [str(x) for x in self.generate_worker_subprocess()]
|
generated_args = [str(x) for x in self.generate_worker_subprocess()]
|
||||||
generated_args_flags = [x for x in generated_args if x.startswith('-')]
|
generated_args_flags = [x for x in generated_args if x.startswith('-')]
|
||||||
if len(generated_args_flags) != len(set(generated_args_flags)):
|
if len(generated_args_flags) != len(set(generated_args_flags)):
|
||||||
msg = "Cannot generate subprocess - Multiple arg conflicts detected"
|
msg = f"Cannot generate subprocess - Multiple arg conflicts detected: {generated_args}"
|
||||||
logger.error(msg)
|
logger.error(msg)
|
||||||
logger.debug(f"Generated args for subprocess: {generated_args}")
|
|
||||||
raise ValueError(msg)
|
raise ValueError(msg)
|
||||||
return generated_args
|
return generated_args
|
||||||
|
|
||||||
@@ -175,11 +178,12 @@ class BaseRenderWorker(Base):
|
|||||||
|
|
||||||
self.status = RenderStatus.RUNNING
|
self.status = RenderStatus.RUNNING
|
||||||
self.start_time = datetime.now()
|
self.start_time = datetime.now()
|
||||||
logger.info(f'Starting {self.engine.name()} {self.renderer_version} Render for {self.input_path} | '
|
|
||||||
f'Frame Count: {self.total_frames}')
|
|
||||||
self.__thread.start()
|
self.__thread.start()
|
||||||
|
|
||||||
def run(self):
|
def __run(self):
|
||||||
|
logger.info(f'Starting {self.engine.name()} {self.renderer_version} Render for {self.input_path} | '
|
||||||
|
f'Frame Count: {self.total_frames}')
|
||||||
|
|
||||||
# Setup logging
|
# Setup logging
|
||||||
log_dir = os.path.dirname(self.log_path())
|
log_dir = os.path.dirname(self.log_path())
|
||||||
os.makedirs(log_dir, exist_ok=True)
|
os.makedirs(log_dir, exist_ok=True)
|
||||||
@@ -209,48 +213,42 @@ class BaseRenderWorker(Base):
|
|||||||
logger.warning(f"Restarting render - Attempt #{failed_attempts + 1}")
|
logger.warning(f"Restarting render - Attempt #{failed_attempts + 1}")
|
||||||
self.status = RenderStatus.RUNNING
|
self.status = RenderStatus.RUNNING
|
||||||
|
|
||||||
# Start process and get updates
|
return_code = self.__setup_and_run_process(f, subprocess_cmds)
|
||||||
self.__process = subprocess.Popen(subprocess_cmds, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
|
|
||||||
universal_newlines=False)
|
|
||||||
|
|
||||||
for c in io.TextIOWrapper(self.__process.stdout, encoding="utf-8"): # or another encoding
|
|
||||||
f.write(c)
|
|
||||||
f.flush()
|
|
||||||
os.fsync(f.fileno())
|
|
||||||
self.last_output = c.strip()
|
|
||||||
self._parse_stdout(c.strip())
|
|
||||||
|
|
||||||
f.write('\n')
|
|
||||||
|
|
||||||
# Check return codes and process
|
|
||||||
return_code = self.__process.wait()
|
|
||||||
self.end_time = datetime.now()
|
self.end_time = datetime.now()
|
||||||
|
|
||||||
if self.status in [RenderStatus.CANCELLED, RenderStatus.ERROR]: # user cancelled
|
message = f"{'=' * 50}\n\n{self.engine.name()} render ended with code {return_code} " \
|
||||||
|
f"after {self.time_elapsed()}\n\n"
|
||||||
|
f.write(message)
|
||||||
|
|
||||||
|
# Teardown
|
||||||
|
if self.status in [RenderStatus.CANCELLED, RenderStatus.ERROR]:
|
||||||
message = f"{self.engine.name()} render ended with status '{self.status}' " \
|
message = f"{self.engine.name()} render ended with status '{self.status}' " \
|
||||||
f"after {self.time_elapsed()}"
|
f"after {self.time_elapsed()}"
|
||||||
f.write(message)
|
f.write(message)
|
||||||
return
|
return
|
||||||
|
|
||||||
# if file output hasn't increased, return as error, otherwise restart process.
|
# if file output hasn't increased, return as error, otherwise restart process.
|
||||||
if len(self.file_list()) <= initial_file_count:
|
file_count_has_increased = len(self.file_list()) > initial_file_count
|
||||||
err_msg = f"File count has not increased. Count is still {len(self.file_list())}"
|
if (self.status == RenderStatus.RUNNING) and file_count_has_increased and not return_code:
|
||||||
f.write(f'Error: {err_msg}\n\n')
|
|
||||||
self.errors.append(err_msg)
|
|
||||||
self.status = RenderStatus.ERROR
|
|
||||||
|
|
||||||
# Handle completed - All else counts as failed attempt
|
|
||||||
if (self.status == RenderStatus.COMPLETED) and not return_code:
|
|
||||||
message = (f"{'=' * 50}\n\n{self.engine.name()} render completed successfully in "
|
message = (f"{'=' * 50}\n\n{self.engine.name()} render completed successfully in "
|
||||||
f"{self.time_elapsed()}\n")
|
f"{self.time_elapsed()}\n")
|
||||||
f.write(message)
|
f.write(message)
|
||||||
break
|
break
|
||||||
|
|
||||||
# Handle non-zero return codes
|
if return_code:
|
||||||
message = f"{'=' * 50}\n\n{self.engine.name()} render failed with code {return_code} " \
|
err_msg = f"{self.engine.name()} render failed with code {return_code}"
|
||||||
f"after {self.time_elapsed()}\n\n"
|
logger.error(err_msg)
|
||||||
f.write(message)
|
self.errors.append(err_msg)
|
||||||
self.errors.append(message)
|
|
||||||
|
# handle instances where renderer exits ok but doesnt generate files
|
||||||
|
if not return_code and not file_count_has_increased:
|
||||||
|
err_msg = (f"{self.engine.name()} render exited ok, but file count has not increased. "
|
||||||
|
f"Count is still {len(self.file_list())}")
|
||||||
|
f.write(f'Error: {err_msg}\n\n')
|
||||||
|
self.errors.append(err_msg)
|
||||||
|
|
||||||
|
# only count the attempt as failed if renderer creates no output - ignore error codes for now
|
||||||
|
if not file_count_has_increased:
|
||||||
failed_attempts += 1
|
failed_attempts += 1
|
||||||
|
|
||||||
if self.children:
|
if self.children:
|
||||||
@@ -263,6 +261,65 @@ class BaseRenderWorker(Base):
|
|||||||
self.status = RenderStatus.COMPLETED
|
self.status = RenderStatus.COMPLETED
|
||||||
logger.info(f"Render {self.id}-{self.name} completed successfully after {self.time_elapsed()}")
|
logger.info(f"Render {self.id}-{self.name} completed successfully after {self.time_elapsed()}")
|
||||||
|
|
||||||
|
def __setup_and_run_process(self, f, subprocess_cmds):
|
||||||
|
|
||||||
|
def watchdog():
|
||||||
|
logger.debug(f'Starting process watchdog for {self} with {self.watchdog_timeout}s timeout')
|
||||||
|
while self.__process.poll() is None:
|
||||||
|
time_since_last_update = time.time() - self.__last_output_time
|
||||||
|
if time_since_last_update > self.watchdog_timeout:
|
||||||
|
logger.error(f"Process for {self} terminated due to exceeding timeout ({self.watchdog_timeout}s)")
|
||||||
|
self.__process.kill()
|
||||||
|
break
|
||||||
|
# logger.debug(f'Watchdog for {self} - Time since last update: {time_since_last_update}')
|
||||||
|
time.sleep(1)
|
||||||
|
|
||||||
|
logger.debug(f'Stopping process watchdog for {self}')
|
||||||
|
|
||||||
|
return_code = -1
|
||||||
|
watchdog_thread = threading.Thread(target=watchdog)
|
||||||
|
watchdog_thread.daemon = True
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Start process and get updates
|
||||||
|
self.__process = subprocess.Popen(subprocess_cmds, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
|
||||||
|
universal_newlines=False)
|
||||||
|
|
||||||
|
# Start watchdog
|
||||||
|
self.__last_output_time = time.time()
|
||||||
|
watchdog_thread.start()
|
||||||
|
|
||||||
|
for c in io.TextIOWrapper(self.__process.stdout, encoding="utf-8"): # or another encoding
|
||||||
|
self.last_output = c.strip()
|
||||||
|
self.__last_output_time = time.time()
|
||||||
|
try:
|
||||||
|
f.write(c)
|
||||||
|
f.flush()
|
||||||
|
os.fsync(f.fileno())
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error saving log to disk: {e}")
|
||||||
|
|
||||||
|
try:
|
||||||
|
self._parse_stdout(c.strip())
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f'Error parsing stdout: {e}')
|
||||||
|
|
||||||
|
f.write('\n')
|
||||||
|
|
||||||
|
# Check return codes and process
|
||||||
|
return_code = self.__process.wait()
|
||||||
|
except Exception as e:
|
||||||
|
message = f'Uncaught error running render process: {e}'
|
||||||
|
f.write(message)
|
||||||
|
logger.exception(message)
|
||||||
|
self.__process.kill()
|
||||||
|
|
||||||
|
# let watchdog end before continuing - prevents multiple watchdogs running when process restarts
|
||||||
|
if watchdog_thread.is_alive():
|
||||||
|
watchdog_thread.join()
|
||||||
|
|
||||||
|
return return_code
|
||||||
|
|
||||||
def post_processing(self):
|
def post_processing(self):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|||||||
@@ -172,7 +172,7 @@ class EngineManager:
|
|||||||
|
|
||||||
if background:
|
if background:
|
||||||
return thread
|
return thread
|
||||||
else:
|
|
||||||
thread.join()
|
thread.join()
|
||||||
found_engine = cls.is_version_downloaded(engine, version, system_os, cpu) # Check that engine downloaded
|
found_engine = cls.is_version_downloaded(engine, version, system_os, cpu) # Check that engine downloaded
|
||||||
if not found_engine:
|
if not found_engine:
|
||||||
@@ -204,13 +204,19 @@ class EngineManager:
|
|||||||
def engine_update_task(engine_class):
|
def engine_update_task(engine_class):
|
||||||
logger.debug(f"Checking for updates to {engine_class.name()}")
|
logger.debug(f"Checking for updates to {engine_class.name()}")
|
||||||
latest_version = engine_class.downloader().find_most_recent_version()
|
latest_version = engine_class.downloader().find_most_recent_version()
|
||||||
if latest_version:
|
|
||||||
logger.debug(f"Latest version of {engine_class.name()} available: {latest_version.get('version')}")
|
if not latest_version:
|
||||||
if not cls.is_version_downloaded(engine_class.name(), latest_version.get('version')):
|
logger.warning(f"Could not find most recent version of {engine.name()} to download")
|
||||||
logger.info(f"Downloading latest version of {engine_class.name()}...")
|
return
|
||||||
cls.download_engine(engine=engine_class.name(), version=latest_version['version'], background=True)
|
|
||||||
else:
|
version_num = latest_version.get('version')
|
||||||
logger.warning(f"Unable to get check for updates for {engine.name()}")
|
if cls.is_version_downloaded(engine_class.name(), version_num):
|
||||||
|
logger.debug(f"Latest version of {engine_class.name()} ({version_num}) already downloaded")
|
||||||
|
return
|
||||||
|
|
||||||
|
# download the engine
|
||||||
|
logger.info(f"Downloading latest version of {engine_class.name()} ({version_num})...")
|
||||||
|
cls.download_engine(engine=engine_class.name(), version=version_num, background=True)
|
||||||
|
|
||||||
logger.info(f"Checking for updates for render engines...")
|
logger.info(f"Checking for updates for render engines...")
|
||||||
threads = []
|
threads = []
|
||||||
|
|||||||
@@ -132,8 +132,8 @@ class FFMPEGDownloader(EngineDownloader):
|
|||||||
system_os = system_os or current_system_os()
|
system_os = system_os or current_system_os()
|
||||||
cpu = cpu or current_system_cpu()
|
cpu = cpu or current_system_cpu()
|
||||||
return cls.all_versions(system_os, cpu)[0]
|
return cls.all_versions(system_os, cpu)[0]
|
||||||
except (IndexError, requests.exceptions.RequestException):
|
except (IndexError, requests.exceptions.RequestException) as e:
|
||||||
logger.error(f"Cannot get most recent version of ffmpeg")
|
logger.error(f"Cannot get most recent version of ffmpeg: {e}")
|
||||||
return {}
|
return {}
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
|
|||||||
124
src/init.py
124
src/init.py
@@ -1,22 +1,27 @@
|
|||||||
''' app/init.py '''
|
''' app/init.py '''
|
||||||
import logging
|
import logging
|
||||||
|
import multiprocessing
|
||||||
import os
|
import os
|
||||||
|
import socket
|
||||||
import sys
|
import sys
|
||||||
import threading
|
import threading
|
||||||
|
import time
|
||||||
from collections import deque
|
from collections import deque
|
||||||
|
|
||||||
from PyQt6.QtCore import QObject, pyqtSignal
|
|
||||||
from PyQt6.QtWidgets import QApplication
|
|
||||||
|
|
||||||
from src.api.api_server import start_server
|
from src.api.api_server import start_server
|
||||||
|
from src.api.preview_manager import PreviewManager
|
||||||
|
from src.api.serverproxy_manager import ServerProxyManager
|
||||||
|
from src.distributed_job_manager import DistributedJobManager
|
||||||
from src.engines.engine_manager import EngineManager
|
from src.engines.engine_manager import EngineManager
|
||||||
from src.render_queue import RenderQueue
|
from src.render_queue import RenderQueue
|
||||||
from src.ui.main_window import MainWindow
|
|
||||||
from src.utilities.config import Config
|
from src.utilities.config import Config
|
||||||
from src.utilities.misc_helper import system_safe_path
|
from src.utilities.misc_helper import system_safe_path, current_system_cpu, current_system_os, current_system_os_version
|
||||||
|
from src.utilities.zeroconf_server import ZeroconfServer
|
||||||
|
|
||||||
|
logger = logging.getLogger()
|
||||||
|
|
||||||
|
|
||||||
def run() -> int:
|
def run(server_only=False) -> int:
|
||||||
"""
|
"""
|
||||||
Initializes the application and runs it.
|
Initializes the application and runs it.
|
||||||
|
|
||||||
@@ -24,43 +29,93 @@ def run() -> int:
|
|||||||
int: The exit status code.
|
int: The exit status code.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
# setup logging
|
||||||
|
logging.basicConfig(format='%(asctime)s: %(levelname)s: %(module)s: %(message)s', datefmt='%d-%b-%y %H:%M:%S',
|
||||||
|
level=Config.server_log_level.upper())
|
||||||
|
logging.getLogger("requests").setLevel(logging.WARNING) # suppress noisy requests/urllib3 logging
|
||||||
|
logging.getLogger("urllib3").setLevel(logging.WARNING)
|
||||||
|
|
||||||
|
# Setup logging for console ui
|
||||||
|
buffer_handler = __setup_buffer_handler() if not server_only else None
|
||||||
|
|
||||||
|
logger.info(f"Starting Zordon Render Server")
|
||||||
|
return_code = 0
|
||||||
try:
|
try:
|
||||||
# Load Config YAML
|
# Load Config YAML
|
||||||
Config.setup_config_dir()
|
Config.setup_config_dir()
|
||||||
Config.load_config(system_safe_path(os.path.join(Config.config_dir(), 'config.yaml')))
|
Config.load_config(system_safe_path(os.path.join(Config.config_dir(), 'config.yaml')))
|
||||||
|
|
||||||
|
# configure default paths
|
||||||
EngineManager.engines_path = system_safe_path(
|
EngineManager.engines_path = system_safe_path(
|
||||||
os.path.join(os.path.join(os.path.expanduser(Config.upload_folder),
|
os.path.join(os.path.join(os.path.expanduser(Config.upload_folder),
|
||||||
'engines')))
|
'engines')))
|
||||||
logging.basicConfig(format='%(asctime)s: %(levelname)s: %(module)s: %(message)s', datefmt='%d-%b-%y %H:%M:%S',
|
os.makedirs(EngineManager.engines_path, exist_ok=True)
|
||||||
level=Config.server_log_level.upper())
|
PreviewManager.storage_path = system_safe_path(
|
||||||
|
os.path.join(os.path.expanduser(Config.upload_folder), 'previews'))
|
||||||
|
|
||||||
app: QApplication = QApplication(sys.argv)
|
# Debug info
|
||||||
|
logger.debug(f"Upload directory: {os.path.expanduser(Config.upload_folder)}")
|
||||||
|
logger.debug(f"Thumbs directory: {PreviewManager.storage_path}")
|
||||||
|
logger.debug(f"Engines directory: {EngineManager.engines_path}")
|
||||||
|
|
||||||
# Start server in background
|
# Set up the RenderQueue object
|
||||||
background_server = threading.Thread(target=start_server)
|
RenderQueue.load_state(database_directory=system_safe_path(os.path.expanduser(Config.upload_folder)))
|
||||||
background_server.daemon = True
|
ServerProxyManager.subscribe_to_listener()
|
||||||
background_server.start()
|
DistributedJobManager.subscribe_to_listener()
|
||||||
|
|
||||||
# Setup logging for console ui
|
# check for updates for render engines if configured or on first launch
|
||||||
buffer_handler = BufferingHandler()
|
if Config.update_engines_on_launch or not EngineManager.get_engines():
|
||||||
buffer_handler.setFormatter(logging.getLogger().handlers[0].formatter)
|
EngineManager.update_all_engines()
|
||||||
logger = logging.getLogger()
|
|
||||||
logger.addHandler(buffer_handler)
|
|
||||||
|
|
||||||
window: MainWindow = MainWindow()
|
# get hostname
|
||||||
window.buffer_handler = buffer_handler
|
local_hostname = socket.gethostname()
|
||||||
window.show()
|
local_hostname = local_hostname + (".local" if not local_hostname.endswith(".local") else "")
|
||||||
|
|
||||||
return_code = app.exec()
|
# configure and start API server
|
||||||
|
api_server = threading.Thread(target=start_server, args=(local_hostname,))
|
||||||
|
api_server.daemon = True
|
||||||
|
api_server.start()
|
||||||
|
|
||||||
|
# start zeroconf server
|
||||||
|
ZeroconfServer.configure("_zordon._tcp.local.", local_hostname, Config.port_number)
|
||||||
|
ZeroconfServer.properties = {'system_cpu': current_system_cpu(),
|
||||||
|
'system_cpu_cores': multiprocessing.cpu_count(),
|
||||||
|
'system_os': current_system_os(),
|
||||||
|
'system_os_version': current_system_os_version()}
|
||||||
|
ZeroconfServer.start()
|
||||||
|
logger.info(f"Zordon Render Server started - Hostname: {local_hostname}")
|
||||||
|
|
||||||
|
RenderQueue.evaluation_inverval = Config.queue_eval_seconds
|
||||||
|
RenderQueue.start()
|
||||||
|
|
||||||
|
# start in gui or server only (cli) mode
|
||||||
|
logger.debug(f"Launching in {'server only' if server_only else 'GUI'} mode")
|
||||||
|
if server_only: # CLI only
|
||||||
|
api_server.join()
|
||||||
|
else: # GUI
|
||||||
|
return_code = __show_gui(buffer_handler)
|
||||||
|
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
pass
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logging.error(f"Unhandled exception: {e}")
|
logging.error(f"Unhandled exception: {e}")
|
||||||
return_code = 1
|
return_code = 1
|
||||||
finally:
|
finally:
|
||||||
|
# shut down gracefully
|
||||||
|
logger.info(f"Zordon Render Server is preparing to shut down")
|
||||||
|
try:
|
||||||
RenderQueue.prepare_for_shutdown()
|
RenderQueue.prepare_for_shutdown()
|
||||||
|
except Exception as e:
|
||||||
|
logger.exception(f"Exception during prepare for shutdown: {e}")
|
||||||
|
ZeroconfServer.stop()
|
||||||
|
logger.info(f"Zordon Render Server has shut down")
|
||||||
return sys.exit(return_code)
|
return sys.exit(return_code)
|
||||||
|
|
||||||
|
|
||||||
|
def __setup_buffer_handler():
|
||||||
|
# lazy load GUI frameworks
|
||||||
|
from PyQt6.QtCore import QObject, pyqtSignal
|
||||||
|
|
||||||
class BufferingHandler(logging.Handler, QObject):
|
class BufferingHandler(logging.Handler, QObject):
|
||||||
new_record = pyqtSignal(str)
|
new_record = pyqtSignal(str)
|
||||||
|
|
||||||
@@ -70,9 +125,34 @@ class BufferingHandler(logging.Handler, QObject):
|
|||||||
self.buffer = deque(maxlen=capacity) # Define a buffer with a fixed capacity
|
self.buffer = deque(maxlen=capacity) # Define a buffer with a fixed capacity
|
||||||
|
|
||||||
def emit(self, record):
|
def emit(self, record):
|
||||||
|
try:
|
||||||
msg = self.format(record)
|
msg = self.format(record)
|
||||||
self.buffer.append(msg) # Add message to the buffer
|
self.buffer.append(msg) # Add message to the buffer
|
||||||
self.new_record.emit(msg) # Emit signal
|
self.new_record.emit(msg) # Emit signal
|
||||||
|
except RuntimeError:
|
||||||
|
pass
|
||||||
|
|
||||||
def get_buffer(self):
|
def get_buffer(self):
|
||||||
return list(self.buffer) # Return a copy of the buffer
|
return list(self.buffer) # Return a copy of the buffer
|
||||||
|
|
||||||
|
buffer_handler = BufferingHandler()
|
||||||
|
buffer_handler.setFormatter(logging.getLogger().handlers[0].formatter)
|
||||||
|
logger = logging.getLogger()
|
||||||
|
logger.addHandler(buffer_handler)
|
||||||
|
return buffer_handler
|
||||||
|
|
||||||
|
|
||||||
|
def __show_gui(buffer_handler):
|
||||||
|
# lazy load GUI frameworks
|
||||||
|
from PyQt6.QtWidgets import QApplication
|
||||||
|
|
||||||
|
# load application
|
||||||
|
app: QApplication = QApplication(sys.argv)
|
||||||
|
|
||||||
|
# configure main window
|
||||||
|
from src.ui.main_window import MainWindow
|
||||||
|
window: MainWindow = MainWindow()
|
||||||
|
window.buffer_handler = buffer_handler
|
||||||
|
window.show()
|
||||||
|
|
||||||
|
return app.exec()
|
||||||
|
|||||||
@@ -2,12 +2,13 @@ import logging
|
|||||||
import os
|
import os
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
|
|
||||||
|
from pubsub import pub
|
||||||
from sqlalchemy import create_engine
|
from sqlalchemy import create_engine
|
||||||
from sqlalchemy.orm import sessionmaker
|
from sqlalchemy.orm import sessionmaker
|
||||||
|
from sqlalchemy.orm.exc import DetachedInstanceError
|
||||||
|
|
||||||
from src.utilities.status_utils import RenderStatus
|
|
||||||
from src.engines.engine_manager import EngineManager
|
|
||||||
from src.engines.core.base_worker import Base
|
from src.engines.core.base_worker import Base
|
||||||
|
from src.utilities.status_utils import RenderStatus
|
||||||
|
|
||||||
logger = logging.getLogger()
|
logger = logging.getLogger()
|
||||||
|
|
||||||
@@ -17,6 +18,9 @@ class JobNotFoundError(Exception):
|
|||||||
super().__init__(args)
|
super().__init__(args)
|
||||||
self.job_id = job_id
|
self.job_id = job_id
|
||||||
|
|
||||||
|
def __str__(self):
|
||||||
|
return f"Cannot find job with ID: {self.job_id}"
|
||||||
|
|
||||||
|
|
||||||
class RenderQueue:
|
class RenderQueue:
|
||||||
engine = None
|
engine = None
|
||||||
@@ -24,18 +28,46 @@ class RenderQueue:
|
|||||||
job_queue = []
|
job_queue = []
|
||||||
maximum_renderer_instances = {'blender': 1, 'aerender': 1, 'ffmpeg': 4}
|
maximum_renderer_instances = {'blender': 1, 'aerender': 1, 'ffmpeg': 4}
|
||||||
last_saved_counts = {}
|
last_saved_counts = {}
|
||||||
|
is_running = False
|
||||||
|
__eval_thread = None
|
||||||
|
evaluation_inverval = 1
|
||||||
|
|
||||||
def __init__(self):
|
# --------------------------------------------
|
||||||
pass
|
# Start / Stop Background Updates
|
||||||
|
# --------------------------------------------
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def start(cls):
|
||||||
|
logger.debug("Starting render queue updates")
|
||||||
|
cls.is_running = True
|
||||||
|
cls.evaluate_queue()
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def __local_job_status_changed(cls, job_id, old_status, new_status):
|
||||||
|
render_job = RenderQueue.job_with_id(job_id, none_ok=True)
|
||||||
|
if render_job and cls.is_running: # ignore changes from render jobs not in the queue yet
|
||||||
|
logger.debug(f"RenderQueue detected job {job_id} has changed from {old_status} -> {new_status}")
|
||||||
|
RenderQueue.evaluate_queue()
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def stop(cls):
|
||||||
|
logger.debug("Stopping render queue updates")
|
||||||
|
cls.is_running = False
|
||||||
|
|
||||||
|
# --------------------------------------------
|
||||||
|
# Queue Management
|
||||||
|
# --------------------------------------------
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def add_to_render_queue(cls, render_job, force_start=False):
|
def add_to_render_queue(cls, render_job, force_start=False):
|
||||||
logger.debug('Adding priority {} job to render queue: {}'.format(render_job.priority, render_job))
|
logger.info(f"Adding job to render queue: {render_job}")
|
||||||
cls.job_queue.append(render_job)
|
cls.job_queue.append(render_job)
|
||||||
if force_start and render_job.status in (RenderStatus.NOT_STARTED, RenderStatus.SCHEDULED):
|
if cls.is_running and force_start and render_job.status in (RenderStatus.NOT_STARTED, RenderStatus.SCHEDULED):
|
||||||
cls.start_job(render_job)
|
cls.start_job(render_job)
|
||||||
cls.session.add(render_job)
|
cls.session.add(render_job)
|
||||||
cls.save_state()
|
cls.save_state()
|
||||||
|
if cls.is_running:
|
||||||
|
cls.evaluate_queue()
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def all_jobs(cls):
|
def all_jobs(cls):
|
||||||
@@ -81,6 +113,7 @@ class RenderQueue:
|
|||||||
cls.session = sessionmaker(bind=cls.engine)()
|
cls.session = sessionmaker(bind=cls.engine)()
|
||||||
from src.engines.core.base_worker import BaseRenderWorker
|
from src.engines.core.base_worker import BaseRenderWorker
|
||||||
cls.job_queue = cls.session.query(BaseRenderWorker).all()
|
cls.job_queue = cls.session.query(BaseRenderWorker).all()
|
||||||
|
pub.subscribe(cls.__local_job_status_changed, 'status_change')
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def save_state(cls):
|
def save_state(cls):
|
||||||
@@ -89,6 +122,7 @@ class RenderQueue:
|
|||||||
@classmethod
|
@classmethod
|
||||||
def prepare_for_shutdown(cls):
|
def prepare_for_shutdown(cls):
|
||||||
logger.debug("Closing session")
|
logger.debug("Closing session")
|
||||||
|
cls.stop()
|
||||||
running_jobs = cls.jobs_with_status(RenderStatus.RUNNING) # cancel all running jobs
|
running_jobs = cls.jobs_with_status(RenderStatus.RUNNING) # cancel all running jobs
|
||||||
[cls.cancel_job(job) for job in running_jobs]
|
[cls.cancel_job(job) for job in running_jobs]
|
||||||
cls.save_state()
|
cls.save_state()
|
||||||
@@ -105,6 +139,7 @@ class RenderQueue:
|
|||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def evaluate_queue(cls):
|
def evaluate_queue(cls):
|
||||||
|
try:
|
||||||
not_started = cls.jobs_with_status(RenderStatus.NOT_STARTED, priority_sorted=True)
|
not_started = cls.jobs_with_status(RenderStatus.NOT_STARTED, priority_sorted=True)
|
||||||
for job in not_started:
|
for job in not_started:
|
||||||
if cls.is_available_for_job(job.renderer, job.priority):
|
if cls.is_available_for_job(job.renderer, job.priority):
|
||||||
@@ -118,22 +153,24 @@ class RenderQueue:
|
|||||||
|
|
||||||
if cls.last_saved_counts != cls.job_counts():
|
if cls.last_saved_counts != cls.job_counts():
|
||||||
cls.save_state()
|
cls.save_state()
|
||||||
|
except DetachedInstanceError:
|
||||||
|
pass
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def start_job(cls, job):
|
def start_job(cls, job):
|
||||||
logger.info(f'Starting render: {job.name} - Priority {job.priority}')
|
logger.info(f'Starting job: {job}')
|
||||||
job.start()
|
job.start()
|
||||||
cls.save_state()
|
cls.save_state()
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def cancel_job(cls, job):
|
def cancel_job(cls, job):
|
||||||
logger.info(f'Cancelling job ID: {job.id}')
|
logger.info(f'Cancelling job: {job}')
|
||||||
job.stop()
|
job.stop()
|
||||||
return job.status == RenderStatus.CANCELLED
|
return job.status == RenderStatus.CANCELLED
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def delete_job(cls, job):
|
def delete_job(cls, job):
|
||||||
logger.info(f"Deleting job ID: {job.id}")
|
logger.info(f"Deleting job: {job}")
|
||||||
job.stop()
|
job.stop()
|
||||||
cls.job_queue.remove(job)
|
cls.job_queue.remove(job)
|
||||||
cls.session.delete(job)
|
cls.session.delete(job)
|
||||||
|
|||||||
@@ -55,7 +55,7 @@ class NewRenderJobForm(QWidget):
|
|||||||
self.priority_input = None
|
self.priority_input = None
|
||||||
self.end_frame_input = None
|
self.end_frame_input = None
|
||||||
self.start_frame_input = None
|
self.start_frame_input = None
|
||||||
self.output_path_input = None
|
self.render_name_input = None
|
||||||
self.scene_file_input = None
|
self.scene_file_input = None
|
||||||
self.scene_file_browse_button = None
|
self.scene_file_browse_button = None
|
||||||
self.job_name_input = None
|
self.job_name_input = None
|
||||||
@@ -136,11 +136,11 @@ class NewRenderJobForm(QWidget):
|
|||||||
self.output_settings_group = QGroupBox("Output Settings")
|
self.output_settings_group = QGroupBox("Output Settings")
|
||||||
output_settings_layout = QVBoxLayout(self.output_settings_group)
|
output_settings_layout = QVBoxLayout(self.output_settings_group)
|
||||||
# output path
|
# output path
|
||||||
output_path_layout = QHBoxLayout()
|
render_name_layout = QHBoxLayout()
|
||||||
output_path_layout.addWidget(QLabel("Render name:"))
|
render_name_layout.addWidget(QLabel("Render name:"))
|
||||||
self.output_path_input = QLineEdit()
|
self.render_name_input = QLineEdit()
|
||||||
output_path_layout.addWidget(self.output_path_input)
|
render_name_layout.addWidget(self.render_name_input)
|
||||||
output_settings_layout.addLayout(output_path_layout)
|
output_settings_layout.addLayout(render_name_layout)
|
||||||
# file format
|
# file format
|
||||||
file_format_layout = QHBoxLayout()
|
file_format_layout = QHBoxLayout()
|
||||||
file_format_layout.addWidget(QLabel("Format:"))
|
file_format_layout.addWidget(QLabel("Format:"))
|
||||||
@@ -281,7 +281,7 @@ class NewRenderJobForm(QWidget):
|
|||||||
|
|
||||||
output_name, _ = os.path.splitext(os.path.basename(self.scene_file_input.text()))
|
output_name, _ = os.path.splitext(os.path.basename(self.scene_file_input.text()))
|
||||||
output_name = output_name.replace(' ', '_')
|
output_name = output_name.replace(' ', '_')
|
||||||
self.output_path_input.setText(output_name)
|
self.render_name_input.setText(output_name)
|
||||||
file_name = self.scene_file_input.text()
|
file_name = self.scene_file_input.text()
|
||||||
|
|
||||||
# setup bg worker
|
# setup bg worker
|
||||||
@@ -292,7 +292,7 @@ class NewRenderJobForm(QWidget):
|
|||||||
def browse_output_path(self):
|
def browse_output_path(self):
|
||||||
directory = QFileDialog.getExistingDirectory(self, "Select Output Directory")
|
directory = QFileDialog.getExistingDirectory(self, "Select Output Directory")
|
||||||
if directory:
|
if directory:
|
||||||
self.output_path_input.setText(directory)
|
self.render_name_input.setText(directory)
|
||||||
|
|
||||||
def args_help_button_clicked(self):
|
def args_help_button_clicked(self):
|
||||||
url = (f'http://{self.server_proxy.hostname}:{self.server_proxy.port}/api/renderer/'
|
url = (f'http://{self.server_proxy.hostname}:{self.server_proxy.port}/api/renderer/'
|
||||||
@@ -316,8 +316,6 @@ class NewRenderJobForm(QWidget):
|
|||||||
self.renderer_type.setCurrentIndex(0) #todo: find out why we don't have renderer info yet
|
self.renderer_type.setCurrentIndex(0) #todo: find out why we don't have renderer info yet
|
||||||
# not ideal but if we don't have the renderer info we have to pick something
|
# not ideal but if we don't have the renderer info we have to pick something
|
||||||
|
|
||||||
self.output_path_input.setText(os.path.basename(input_path))
|
|
||||||
|
|
||||||
# cleanup progress UI
|
# cleanup progress UI
|
||||||
self.load_file_group.setHidden(True)
|
self.load_file_group.setHidden(True)
|
||||||
self.toggle_renderer_enablement(True)
|
self.toggle_renderer_enablement(True)
|
||||||
@@ -451,14 +449,16 @@ class SubmitWorker(QThread):
|
|||||||
job_json = {'owner': psutil.Process().username() + '@' + socket.gethostname(),
|
job_json = {'owner': psutil.Process().username() + '@' + socket.gethostname(),
|
||||||
'renderer': self.window.renderer_type.currentText().lower(),
|
'renderer': self.window.renderer_type.currentText().lower(),
|
||||||
'engine_version': self.window.renderer_version_combo.currentText(),
|
'engine_version': self.window.renderer_version_combo.currentText(),
|
||||||
'args': {'raw': self.window.raw_args.text()},
|
'args': {'raw': self.window.raw_args.text(),
|
||||||
'output_path': self.window.output_path_input.text(),
|
'export_format': self.window.file_format_combo.currentText()},
|
||||||
|
'output_path': self.window.render_name_input.text(),
|
||||||
'start_frame': self.window.start_frame_input.value(),
|
'start_frame': self.window.start_frame_input.value(),
|
||||||
'end_frame': self.window.end_frame_input.value(),
|
'end_frame': self.window.end_frame_input.value(),
|
||||||
'priority': self.window.priority_input.currentIndex() + 1,
|
'priority': self.window.priority_input.currentIndex() + 1,
|
||||||
'notes': self.window.notes_input.toPlainText(),
|
'notes': self.window.notes_input.toPlainText(),
|
||||||
'enable_split_jobs': self.window.enable_splitjobs.isChecked(),
|
'enable_split_jobs': self.window.enable_splitjobs.isChecked(),
|
||||||
'split_jobs_same_os': self.window.splitjobs_same_os.isChecked()}
|
'split_jobs_same_os': self.window.splitjobs_same_os.isChecked(),
|
||||||
|
'name': self.window.render_name_input.text()}
|
||||||
|
|
||||||
# get the dynamic args
|
# get the dynamic args
|
||||||
for i in range(self.window.renderer_options_layout.count()):
|
for i in range(self.window.renderer_options_layout.count()):
|
||||||
@@ -487,7 +487,8 @@ class SubmitWorker(QThread):
|
|||||||
for cam in selected_cameras:
|
for cam in selected_cameras:
|
||||||
job_copy = copy.deepcopy(job_json)
|
job_copy = copy.deepcopy(job_json)
|
||||||
job_copy['args']['camera'] = cam
|
job_copy['args']['camera'] = cam
|
||||||
job_copy['name'] = pathlib.Path(input_path).stem.replace(' ', '_') + "-" + cam.replace(' ', '')
|
job_copy['name'] = job_copy['name'].replace(' ', '-') + "_" + cam.replace(' ', '')
|
||||||
|
job_copy['output_path'] = job_copy['name']
|
||||||
job_list.append(job_copy)
|
job_list.append(job_copy)
|
||||||
else:
|
else:
|
||||||
job_list = [job_json]
|
job_list = [job_json]
|
||||||
|
|||||||
@@ -1,4 +1,3 @@
|
|||||||
import sys
|
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
from PyQt6.QtGui import QFont
|
from PyQt6.QtGui import QFont
|
||||||
@@ -16,7 +15,10 @@ class QSignalHandler(logging.Handler, QObject):
|
|||||||
|
|
||||||
def emit(self, record):
|
def emit(self, record):
|
||||||
msg = self.format(record)
|
msg = self.format(record)
|
||||||
|
try:
|
||||||
self.new_record.emit(msg) # Emit signal
|
self.new_record.emit(msg) # Emit signal
|
||||||
|
except RuntimeError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
class ConsoleWindow(QMainWindow):
|
class ConsoleWindow(QMainWindow):
|
||||||
|
|||||||
@@ -183,8 +183,13 @@ class MainWindow(QMainWindow):
|
|||||||
|
|
||||||
def __background_update(self):
|
def __background_update(self):
|
||||||
while True:
|
while True:
|
||||||
|
try:
|
||||||
self.update_servers()
|
self.update_servers()
|
||||||
self.fetch_jobs()
|
self.fetch_jobs()
|
||||||
|
except RuntimeError:
|
||||||
|
pass
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Uncaught exception in background update: {e}")
|
||||||
time.sleep(0.5)
|
time.sleep(0.5)
|
||||||
|
|
||||||
def closeEvent(self, event):
|
def closeEvent(self, event):
|
||||||
@@ -372,13 +377,17 @@ class MainWindow(QMainWindow):
|
|||||||
|
|
||||||
def load_image_path(self, image_path):
|
def load_image_path(self, image_path):
|
||||||
# Load and set the image using QPixmap
|
# Load and set the image using QPixmap
|
||||||
|
try:
|
||||||
pixmap = QPixmap(image_path)
|
pixmap = QPixmap(image_path)
|
||||||
if not pixmap:
|
if not pixmap:
|
||||||
logger.error("Error loading image")
|
logger.error("Error loading image")
|
||||||
return
|
return
|
||||||
self.image_label.setPixmap(pixmap)
|
self.image_label.setPixmap(pixmap)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error loading image path: {e}")
|
||||||
|
|
||||||
def load_image_data(self, pillow_image):
|
def load_image_data(self, pillow_image):
|
||||||
|
try:
|
||||||
# Convert the Pillow Image to a QByteArray (byte buffer)
|
# Convert the Pillow Image to a QByteArray (byte buffer)
|
||||||
byte_array = QByteArray()
|
byte_array = QByteArray()
|
||||||
buffer = QBuffer(byte_array)
|
buffer = QBuffer(byte_array)
|
||||||
@@ -396,6 +405,8 @@ class MainWindow(QMainWindow):
|
|||||||
logger.error("Error loading image")
|
logger.error("Error loading image")
|
||||||
return
|
return
|
||||||
self.image_label.setPixmap(pixmap)
|
self.image_label.setPixmap(pixmap)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error loading image data: {e}")
|
||||||
|
|
||||||
def update_servers(self):
|
def update_servers(self):
|
||||||
found_servers = list(set(ZeroconfServer.found_hostnames() + self.added_hostnames))
|
found_servers = list(set(ZeroconfServer.found_hostnames() + self.added_hostnames))
|
||||||
|
|||||||
@@ -32,20 +32,23 @@ class StatusBar(QStatusBar):
|
|||||||
|
|
||||||
# Check for status change every 1s on background thread
|
# Check for status change every 1s on background thread
|
||||||
while True:
|
while True:
|
||||||
|
try:
|
||||||
|
# update status label - get download status
|
||||||
new_status = proxy.status()
|
new_status = proxy.status()
|
||||||
new_image_name = image_names.get(new_status, 'Synchronize.png')
|
|
||||||
image_path = os.path.join(resources_dir(), new_image_name)
|
|
||||||
self.label.setPixmap((QPixmap(image_path).scaled(16, 16, Qt.AspectRatioMode.KeepAspectRatio)))
|
|
||||||
|
|
||||||
# add download status
|
|
||||||
if EngineManager.download_tasks:
|
if EngineManager.download_tasks:
|
||||||
if len(EngineManager.download_tasks) == 1:
|
if len(EngineManager.download_tasks) == 1:
|
||||||
task = EngineManager.download_tasks[0]
|
task = EngineManager.download_tasks[0]
|
||||||
new_status = f"{new_status} | Downloading {task.engine.capitalize()} {task.version}..."
|
new_status = f"{new_status} | Downloading {task.engine.capitalize()} {task.version}..."
|
||||||
else:
|
else:
|
||||||
new_status = f"{new_status} | Downloading {len(EngineManager.download_tasks)} engines"
|
new_status = f"{new_status} | Downloading {len(EngineManager.download_tasks)} engines"
|
||||||
|
|
||||||
self.messageLabel.setText(new_status)
|
self.messageLabel.setText(new_status)
|
||||||
|
|
||||||
|
# update status image
|
||||||
|
new_image_name = image_names.get(new_status, 'Synchronize.png')
|
||||||
|
new_image_path = os.path.join(resources_dir(), new_image_name)
|
||||||
|
self.label.setPixmap((QPixmap(new_image_path).scaled(16, 16, Qt.AspectRatioMode.KeepAspectRatio)))
|
||||||
|
except RuntimeError: # ignore runtime errors during shutdown
|
||||||
|
pass
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
|
|
||||||
background_thread = threading.Thread(target=background_update,)
|
background_thread = threading.Thread(target=background_update,)
|
||||||
|
|||||||
@@ -10,7 +10,7 @@ class Config:
|
|||||||
max_content_path = 100000000
|
max_content_path = 100000000
|
||||||
server_log_level = 'debug'
|
server_log_level = 'debug'
|
||||||
log_buffer_length = 250
|
log_buffer_length = 250
|
||||||
subjob_connection_timeout = 120
|
worker_process_timeout = 120
|
||||||
flask_log_level = 'error'
|
flask_log_level = 'error'
|
||||||
flask_debug_enable = False
|
flask_debug_enable = False
|
||||||
queue_eval_seconds = 1
|
queue_eval_seconds = 1
|
||||||
@@ -28,7 +28,7 @@ class Config:
|
|||||||
cls.max_content_path = cfg.get('max_content_path', cls.max_content_path)
|
cls.max_content_path = cfg.get('max_content_path', cls.max_content_path)
|
||||||
cls.server_log_level = cfg.get('server_log_level', cls.server_log_level)
|
cls.server_log_level = cfg.get('server_log_level', cls.server_log_level)
|
||||||
cls.log_buffer_length = cfg.get('log_buffer_length', cls.log_buffer_length)
|
cls.log_buffer_length = cfg.get('log_buffer_length', cls.log_buffer_length)
|
||||||
cls.subjob_connection_timeout = cfg.get('subjob_connection_timeout', cls.subjob_connection_timeout)
|
cls.worker_process_timeout = cfg.get('worker_process_timeout', cls.worker_process_timeout)
|
||||||
cls.flask_log_level = cfg.get('flask_log_level', cls.flask_log_level)
|
cls.flask_log_level = cfg.get('flask_log_level', cls.flask_log_level)
|
||||||
cls.flask_debug_enable = cfg.get('flask_debug_enable', cls.flask_debug_enable)
|
cls.flask_debug_enable = cfg.get('flask_debug_enable', cls.flask_debug_enable)
|
||||||
cls.queue_eval_seconds = cfg.get('queue_eval_seconds', cls.queue_eval_seconds)
|
cls.queue_eval_seconds = cfg.get('queue_eval_seconds', cls.queue_eval_seconds)
|
||||||
|
|||||||
@@ -2,7 +2,8 @@ import logging
|
|||||||
import socket
|
import socket
|
||||||
|
|
||||||
from pubsub import pub
|
from pubsub import pub
|
||||||
from zeroconf import Zeroconf, ServiceInfo, ServiceBrowser, ServiceStateChange, NonUniqueNameException
|
from zeroconf import Zeroconf, ServiceInfo, ServiceBrowser, ServiceStateChange, NonUniqueNameException, \
|
||||||
|
NotRunningException
|
||||||
|
|
||||||
logger = logging.getLogger()
|
logger = logging.getLogger()
|
||||||
|
|
||||||
@@ -31,12 +32,14 @@ class ZeroconfServer:
|
|||||||
def start(cls, listen_only=False):
|
def start(cls, listen_only=False):
|
||||||
if not cls.service_type:
|
if not cls.service_type:
|
||||||
raise RuntimeError("The 'configure' method must be run before starting the zeroconf server")
|
raise RuntimeError("The 'configure' method must be run before starting the zeroconf server")
|
||||||
|
logger.debug("Starting zeroconf service")
|
||||||
if not listen_only:
|
if not listen_only:
|
||||||
cls._register_service()
|
cls._register_service()
|
||||||
cls._browse_services()
|
cls._browse_services()
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def stop(cls):
|
def stop(cls):
|
||||||
|
logger.debug("Stopping zeroconf service")
|
||||||
cls._unregister_service()
|
cls._unregister_service()
|
||||||
cls.zeroconf.close()
|
cls.zeroconf.close()
|
||||||
|
|
||||||
@@ -73,6 +76,7 @@ class ZeroconfServer:
|
|||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def _on_service_discovered(cls, zeroconf, service_type, name, state_change):
|
def _on_service_discovered(cls, zeroconf, service_type, name, state_change):
|
||||||
|
try:
|
||||||
info = zeroconf.get_service_info(service_type, name)
|
info = zeroconf.get_service_info(service_type, name)
|
||||||
hostname = name.split(f'.{cls.service_type}')[0]
|
hostname = name.split(f'.{cls.service_type}')[0]
|
||||||
logger.debug(f"Zeroconf: {hostname} {state_change}")
|
logger.debug(f"Zeroconf: {hostname} {state_change}")
|
||||||
@@ -82,6 +86,8 @@ class ZeroconfServer:
|
|||||||
else:
|
else:
|
||||||
cls.client_cache.pop(hostname)
|
cls.client_cache.pop(hostname)
|
||||||
pub.sendMessage('zeroconf_state_change', hostname=hostname, state_change=state_change)
|
pub.sendMessage('zeroconf_state_change', hostname=hostname, state_change=state_change)
|
||||||
|
except NotRunningException:
|
||||||
|
pass
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def found_hostnames(cls):
|
def found_hostnames(cls):
|
||||||
@@ -104,9 +110,15 @@ class ZeroconfServer:
|
|||||||
|
|
||||||
# Example usage:
|
# Example usage:
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
import time
|
||||||
|
|
||||||
|
logging.basicConfig(level=logging.DEBUG)
|
||||||
ZeroconfServer.configure("_zordon._tcp.local.", "foobar.local", 8080)
|
ZeroconfServer.configure("_zordon._tcp.local.", "foobar.local", 8080)
|
||||||
try:
|
try:
|
||||||
ZeroconfServer.start()
|
ZeroconfServer.start()
|
||||||
input("Server running - Press enter to end")
|
while True:
|
||||||
|
time.sleep(0.1)
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
pass
|
||||||
finally:
|
finally:
|
||||||
ZeroconfServer.stop()
|
ZeroconfServer.stop()
|
||||||
|
|||||||
Reference in New Issue
Block a user