Add missing docstrings and pylint improvements (#130)

This commit is contained in:
2026-01-20 23:13:38 -06:00
committed by GitHub
parent 74dce5cc3d
commit e8992fc91a
8 changed files with 312 additions and 83 deletions

View File

@@ -300,26 +300,7 @@ def add_job_handler():
return err_msg, 500
try:
loaded_project_local_path = processed_job_data['__loaded_project_local_path']
created_jobs = []
if processed_job_data.get("child_jobs"):
for child_job_diffs in processed_job_data["child_jobs"]:
processed_child_job_data = processed_job_data.copy()
processed_child_job_data.pop("child_jobs")
processed_child_job_data.update(child_job_diffs)
child_job = DistributedJobManager.create_render_job(processed_child_job_data, loaded_project_local_path)
created_jobs.append(child_job)
else:
new_job = DistributedJobManager.create_render_job(processed_job_data, loaded_project_local_path)
created_jobs.append(new_job)
# Save notes to .txt
if processed_job_data.get("notes"):
parent_dir = Path(loaded_project_local_path).parent.parent
notes_name = processed_job_data['name'] + "-notes.txt"
with (Path(parent_dir) / notes_name).open("w") as f:
f.write(processed_job_data["notes"])
return [x.json() for x in created_jobs]
return JobImportHandler.create_jobs_from_processed_data(processed_job_data)
except Exception as e:
logger.exception(f"Error creating render job: {e}")
return 'unknown error', 500
@@ -447,6 +428,7 @@ def engine_info():
return result
except Exception as e:
traceback.print_exc(e)
logger.error(f"Error fetching details for engine '{engine.name()}': {e}")
return {}
@@ -650,6 +632,7 @@ def handle_404(error):
@server.errorhandler(Exception)
def handle_general_error(general_error):
traceback.print_exception(type(general_error), general_error, general_error.__traceback__)
err_msg = f"Server error: {general_error}"
logger.error(err_msg)
return err_msg, 500

View File

@@ -5,19 +5,77 @@ import shutil
import tempfile
import zipfile
from datetime import datetime
from pathlib import Path
import requests
from pathlib import Path
from tqdm import tqdm
from werkzeug.utils import secure_filename
from distributed_job_manager import DistributedJobManager
logger = logging.getLogger()
class JobImportHandler:
"""Handles job import operations for rendering projects.
This class provides functionality to validate, download, and process
job data and project files for the rendering queue system.
"""
@classmethod
def create_jobs_from_processed_data(cls, processed_job_data: dict) -> list[dict]:
""" Takes processed job data and creates new jobs
Args: processed_job_data: Dictionary containing job information"""
loaded_project_local_path = processed_job_data['__loaded_project_local_path']
# prepare child job data
job_data_to_create = []
if processed_job_data.get("child_jobs"):
for child_job_diffs in processed_job_data["child_jobs"]:
processed_child_job_data = processed_job_data.copy()
processed_child_job_data.pop("child_jobs")
processed_child_job_data.update(child_job_diffs)
job_data_to_create.append(processed_child_job_data)
else:
job_data_to_create.append(processed_job_data)
# create the jobs
created_jobs = []
for job_data in job_data_to_create:
new_job = DistributedJobManager.create_render_job(job_data, loaded_project_local_path)
created_jobs.append(new_job)
# Save notes to .txt
if processed_job_data.get("notes"):
parent_dir = Path(loaded_project_local_path).parent.parent
notes_name = processed_job_data['name'] + "-notes.txt"
with (Path(parent_dir) / notes_name).open("w") as f:
f.write(processed_job_data["notes"])
return [x.json() for x in created_jobs]
@classmethod
def validate_job_data(cls, new_job_data: dict, upload_directory: Path, uploaded_file=None) -> dict:
"""Validates and prepares job data for import.
This method validates the job data dictionary, handles project file
acquisition (upload, download, or local copy), and prepares the job
directory structure.
Args:
new_job_data: Dictionary containing job information including
'name', 'engine_name', and optionally 'url' or 'local_path'.
upload_directory: Base directory for storing uploaded jobs.
uploaded_file: Optional uploaded file object from the request.
Returns:
The validated job data dictionary with additional metadata.
Raises:
KeyError: If required fields 'name' or 'engine_name' are missing.
FileNotFoundError: If no valid project file can be found.
"""
loaded_project_local_path = None
# check for required keys
@@ -25,7 +83,7 @@ class JobImportHandler:
engine_name = new_job_data.get('engine_name')
if not job_name:
raise KeyError("Missing job name")
elif not engine_name:
if not engine_name:
raise KeyError("Missing engine name")
project_url = new_job_data.get('url', None)
@@ -45,8 +103,7 @@ class JobImportHandler:
# Prepare the local filepath
cleaned_path_name = job_name.replace(' ', '-')
timestamp = datetime.now().strftime("%Y.%m.%d_%H.%M.%S")
folder_name = f"{cleaned_path_name}-{engine_name}-{timestamp}"
folder_name = f"{cleaned_path_name}-{engine_name}-{datetime.now().strftime('%Y.%m.%d_%H.%M.%S')}"
job_dir = Path(upload_directory) / folder_name
os.makedirs(job_dir, exist_ok=True)
project_source_dir = Path(job_dir) / 'source'
@@ -81,12 +138,24 @@ class JobImportHandler:
@staticmethod
def download_project_from_url(project_url: str):
"""Downloads a project file from the given URL.
Downloads the file from the specified URL to a temporary directory
with progress tracking. Returns the filename and temporary path.
Args:
project_url: The URL to download the project file from.
Returns:
A tuple of (filename, temp_file_path) if successful,
(None, None) if download fails.
"""
# This nested function is to handle downloading from a URL
logger.info(f"Downloading project from url: {project_url}")
referred_name = os.path.basename(project_url)
try:
response = requests.get(project_url, stream=True)
response = requests.get(project_url, stream=True, timeout=300)
if response.status_code == 200:
# Get the total file size from the "Content-Length" header
file_size = int(response.headers.get("Content-Length", 0))
@@ -113,8 +182,8 @@ class JobImportHandler:
"""
Processes a zipped project.
This method takes a path to a zip file, extracts its contents, and returns the path to the extracted project file.
If the zip file contains more than one project file or none, an error is raised.
This method takes a path to a zip file, extracts its contents, and returns the path to the extracted project
file. If the zip file contains more than one project file or none, an error is raised.
Args:
zip_path (Path): The path to the zip file.
@@ -148,5 +217,5 @@ class JobImportHandler:
except (zipfile.BadZipFile, zipfile.LargeZipFile) as e:
logger.error(f"Error processing zip file: {e}")
raise ValueError(f"Error processing zip file: {e}")
raise ValueError(f'Error processing zip file: {e}') from e
return extracted_project_path

View File

@@ -12,12 +12,20 @@ supported_image_formats = ['.jpg', '.png', '.exr', '.tif', '.tga', '.bmp', '.web
class PreviewManager:
"""Manages generation, storage, and retrieval of preview images and videos for rendering jobs."""
storage_path = None
_running_jobs = {}
@classmethod
def __generate_job_preview_worker(cls, job, replace_existing=False, max_width=480):
"""Generates image and video previews for a given job.
Args:
job: The job object containing file information.
replace_existing (bool): Whether to replace existing previews. Defaults to False.
max_width (int): Maximum width for the preview images/videos. Defaults to 480.
"""
# Determine best source file to use for thumbs
job_file_list = job.file_list()
@@ -67,20 +75,36 @@ class PreviewManager:
@classmethod
def update_previews_for_job(cls, job, replace_existing=False, wait_until_completion=False, timeout=None):
"""Updates previews for a given job by starting a background thread.
Args:
job: The job object.
replace_existing (bool): Whether to replace existing previews. Defaults to False.
wait_until_completion (bool): Whether to wait for the thread to complete. Defaults to False.
timeout (float): Timeout for waiting, if applicable.
"""
job_thread = cls._running_jobs.get(job.id)
if job_thread and job_thread.is_alive():
logger.debug(f'Preview generation job already running for {job}')
else:
job_thread = threading.Thread(target=cls.__generate_job_preview_worker, args=(job, replace_existing,))
job_thread.start()
cls._running_jobs[job.id] = job_thread
return
job_thread = threading.Thread(target=cls.__generate_job_preview_worker, args=(job, replace_existing,))
job_thread.start()
cls._running_jobs[job.id] = job_thread
if wait_until_completion:
job_thread.join(timeout=timeout)
@classmethod
def get_previews_for_job(cls, job):
"""Retrieves previews for a given job.
Args:
job: The job object.
Returns:
dict: A dictionary containing preview information.
"""
results = {}
try:
directory_path = Path(cls.storage_path)
@@ -103,6 +127,11 @@ class PreviewManager:
@classmethod
def delete_previews_for_job(cls, job):
"""Deletes all previews associated with a given job.
Args:
job: The job object.
"""
all_previews = cls.get_previews_for_job(job)
flattened_list = [item for sublist in all_previews.values() for item in sublist]
for preview in flattened_list:

View File

@@ -19,6 +19,7 @@ class BaseRenderEngine(object):
"""
install_paths: List[str] = []
binary_names: Dict[str, str] = {}
# --------------------------------------------
# Required Overrides for Subclasses:

View File

@@ -122,7 +122,7 @@ class EngineManager:
binary_name = eng.binary_names.get(result_dict['system_os'], binary_name)
# Find the path to the binary file
search_root = cls.engines_path / directory
search_root = Path(cls.engines_path) / directory
match = next((p for p in search_root.rglob(binary_name) if p.is_file()), None)
path = str(match) if match else None
result_dict['path'] = path
@@ -458,27 +458,27 @@ class EngineDownloadWorker(threading.Thread):
self.percent_complete = current_progress
def run(self):
"""Execute the download process.
"""Execute the download process.
Checks if engine version already exists, then downloads if not found.
Handles cleanup and error reporting.
"""
try:
existing_download = EngineManager.is_version_installed(self.engine, self.version, self.system_os, self.cpu,
ignore_system=True)
if existing_download:
logger.info(f"Requested download of {self.engine} {self.version}, but local copy already exists")
return existing_download
Checks if engine version already exists, then downloads if not found.
Handles cleanup and error reporting.
"""
try:
existing_download = EngineManager.is_version_installed(self.engine, self.version, self.system_os, self.cpu,
ignore_system=True)
if existing_download:
logger.info(f"Requested download of {self.engine} {self.version}, but local copy already exists")
return existing_download
# Get the appropriate downloader class based on the engine type
downloader = EngineManager.engine_class_with_name(self.engine).downloader()
downloader.download_engine( self.version, download_location=EngineManager.engines_path,
system_os=self.system_os, cpu=self.cpu, timeout=300, progress_callback=self._update_progress)
except Exception as e:
logger.error(f"Error in download worker: {e}")
finally:
# remove itself from the downloader list
EngineManager.download_tasks.remove(self)
# Get the appropriate downloader class based on the engine type
downloader = EngineManager.engine_class_with_name(self.engine).downloader()
downloader.download_engine( self.version, download_location=EngineManager.engines_path,
system_os=self.system_os, cpu=self.cpu, timeout=300, progress_callback=self._update_progress)
except Exception as e:
logger.error(f"Error in download worker: {e}")
finally:
# remove itself from the downloader list
EngineManager.download_tasks.remove(self)
if __name__ == '__main__':

View File

@@ -1,4 +1,5 @@
import logging
from collections import Counter
from datetime import datetime
from pathlib import Path
from typing import List, Dict, Any, Optional
@@ -122,7 +123,6 @@ class RenderQueue:
cls.engine = create_engine(f"sqlite:///{database_directory / 'database.db'}")
Base.metadata.create_all(cls.engine)
cls.session = sessionmaker(bind=cls.engine)()
from src.engines.core.base_worker import BaseRenderWorker
cls.job_queue = cls.session.query(BaseRenderWorker).all()
pub.subscribe(cls.__local_job_status_changed, 'status_change')
@@ -135,7 +135,7 @@ class RenderQueue:
logger.debug("Closing session")
cls.stop()
running_jobs = cls.jobs_with_status(RenderStatus.RUNNING) # cancel all running jobs
[cls.cancel_job(job) for job in running_jobs]
_ = [cls.cancel_job(job) for job in running_jobs]
cls.save_state()
cls.session.close()
@@ -145,7 +145,6 @@ class RenderQueue:
@classmethod
def renderer_instances(cls):
from collections import Counter
all_instances = [x.engine_name for x in cls.running_jobs()]
return Counter(all_instances)