#!/usr/bin/env python3
import logging
import os
import shutil
import tempfile
import zipfile
from datetime import datetime
from urllib.parse import urlparse

import requests
from tqdm import tqdm
from werkzeug.utils import secure_filename

from src.distributed_job_manager import DistributedJobManager
from src.engines.engine_manager import EngineManager
from src.render_queue import RenderQueue

logger = logging.getLogger()

# (connect, read) timeouts in seconds for project downloads. The read timeout
# bounds the wait between two chunks, not the duration of the whole transfer.
_DOWNLOAD_TIMEOUT = (10, 60)
# Size of the chunks streamed to disk while downloading a project.
_DOWNLOAD_CHUNK_SIZE = 64 * 1024


def handle_uploaded_project_files(request, jobs_list, upload_directory):
    """Stage the project file of a new job inside a fresh job directory.

    The project source is picked in this order of priority:

    1. a file uploaded in ``request.files['file']``,
    2. the ``'url'`` entry of the first job (downloaded to a temp file first),
    3. the ``'local_path'`` entry of the first job (copied).

    The file ends up in
    ``<upload_directory>/<timestamp>-<renderer>-<project name>/source/``.

    Args:
        request: Object whose ``files.get('file')`` yields the upload (if any);
            the upload must provide ``filename`` and ``save(path)``.
        jobs_list: List of job dicts. Only the first entry is consulted, for
            its ``'url'``, ``'local_path'`` and ``'renderer'`` keys.
        upload_directory: Root directory under which the job directory is made.

    Returns:
        Tuple ``(loaded_project_local_path, referred_name)``: the path of the
        staged project file and the original base name of the project.

    Raises:
        ValueError: If ``jobs_list`` is empty, no usable project source is
            found, the URL download fails, or the uploaded filename has no
            safe representation.
    """
    if not jobs_list:
        raise ValueError("Cannot find any valid project paths: no jobs were provided")

    first_job = jobs_list[0]
    uploaded_project = request.files.get('file', None)
    project_url = first_job.get('url', None)
    local_path = first_job.get('local_path', None)
    renderer = first_job.get('renderer')

    downloaded_file_url = None
    safe_upload_name = None

    # Decide once where the project comes from; the staging step below reuses it.
    if uploaded_project and uploaded_project.filename:
        source_kind = 'upload'
        referred_name = os.path.basename(uploaded_project.filename)
        # secure_filename() may return an empty string; joining that onto the
        # source directory would make the save target the directory itself.
        safe_upload_name = secure_filename(uploaded_project.filename)
        if not safe_upload_name:
            raise ValueError(f"Invalid uploaded filename: {uploaded_project.filename!r}")
    elif project_url:
        source_kind = 'url'
        referred_name, downloaded_file_url = download_project_from_url(project_url)
        if not referred_name:
            raise ValueError(f"Error downloading file from URL: {project_url}")
    elif local_path and os.path.exists(local_path):
        source_kind = 'local'
        referred_name = os.path.basename(local_path)
    else:
        raise ValueError("Cannot find any valid project paths")

    # Prepare the local filepath
    cleaned_path_name = os.path.splitext(referred_name)[0].replace(' ', '_')
    timestamp = datetime.now().strftime("%Y.%m.%d_%H.%M.%S")
    job_dir = os.path.join(upload_directory, '-'.join([timestamp, renderer, cleaned_path_name]))
    os.makedirs(job_dir, exist_ok=True)
    project_source_dir = os.path.join(job_dir, 'source')
    os.makedirs(project_source_dir, exist_ok=True)

    # Move projects to their work directories
    if source_kind == 'upload':
        loaded_project_local_path = os.path.join(project_source_dir, safe_upload_name)
        uploaded_project.save(loaded_project_local_path)
        logger.info(f"Transfer complete for {loaded_project_local_path.split(upload_directory)[-1]}")
    elif source_kind == 'url':
        loaded_project_local_path = os.path.join(project_source_dir, referred_name)
        shutil.move(downloaded_file_url, loaded_project_local_path)
        logger.info(f"Download complete for {loaded_project_local_path.split(upload_directory)[-1]}")
    else:
        loaded_project_local_path = os.path.join(project_source_dir, referred_name)
        shutil.copy(local_path, loaded_project_local_path)
        logger.info(f"Import complete for {loaded_project_local_path.split(upload_directory)[-1]}")

    return loaded_project_local_path, referred_name


def download_project_from_url(project_url):
    """Download a project file from a URL into the system temp directory.

    The file name is taken from the path component of the URL, so a query
    string or fragment never ends up in the name.

    Args:
        project_url: URL of the project file to fetch.

    Returns:
        Tuple ``(referred_name, downloaded_file_url)`` with the file name and
        the path of the downloaded temp file, or ``(None, None)`` when the URL
        has no file name, the server does not answer with HTTP 200, or any
        error occurs during the transfer.
    """
    logger.info(f"Downloading project from url: {project_url}")

    referred_name = os.path.basename(urlparse(project_url).path)
    if not referred_name:
        logger.error(f"Error downloading file: no file name in URL {project_url}")
        return None, None

    downloaded_file_url = None
    try:
        # The context manager releases the streamed connection on every exit path.
        with requests.get(project_url, stream=True, timeout=_DOWNLOAD_TIMEOUT) as response:
            if response.status_code != 200:
                logger.error(f"Error downloading file: HTTP status {response.status_code}")
                return None, None

            # Get the total file size from the "Content-Length" header
            file_size = int(response.headers.get("Content-Length", 0))

            downloaded_file_url = os.path.join(tempfile.gettempdir(), referred_name)
            with open(downloaded_file_url, "wb") as file, \
                    tqdm(total=file_size, unit="B", unit_scale=True) as progress_bar:
                for chunk in response.iter_content(chunk_size=_DOWNLOAD_CHUNK_SIZE):
                    if chunk:
                        file.write(chunk)
                        progress_bar.update(len(chunk))
    except Exception as e:
        # Best-effort contract: callers expect (None, None) rather than an exception.
        logger.error(f"Error downloading file: {e}")
        # Only set once we started writing, so this never touches unrelated files.
        if downloaded_file_url and os.path.exists(downloaded_file_url):
            try:
                os.remove(downloaded_file_url)
            except OSError as cleanup_error:
                logger.warning(f"Could not remove partial download {downloaded_file_url}: {cleanup_error}")
        return None, None

    return referred_name, downloaded_file_url


def process_zipped_project(zip_path):
    """Extract a zipped project next to the archive and locate its project file.

    The archive is extracted into its own directory. After extraction that
    directory must contain exactly one regular file that is not a ``.zip``
    archive; that file is taken as the project file.

    Args:
        zip_path: Path of the zip archive to extract.

    Returns:
        Path of the single extracted project file.

    Raises:
        ValueError: If the archive is invalid, or if the directory does not
            hold exactly one non-zip file after extraction.
    """
    work_path = os.path.dirname(zip_path)

    try:
        with zipfile.ZipFile(zip_path, 'r') as myzip:
            myzip.extractall(work_path)

        # Regular files only, skipping zip archives (the source archive included).
        # Match on the extension: a substring test would also drop names that
        # merely contain ".zip", e.g. "my.zipper.blend".
        project_files = [
            entry for entry in os.listdir(work_path)
            if os.path.isfile(os.path.join(work_path, entry))
            and not entry.lower().endswith('.zip')
        ]
        logger.debug(f"Zip files: {project_files}")

        # TODO: optionally narrow project_files to the engine's supported extensions.

        # If there's more than 1 project file or none, raise an error
        if len(project_files) != 1:
            raise ValueError(f'Cannot find a valid project file in {os.path.basename(zip_path)}')

        extracted_project_path = os.path.join(work_path, project_files[0])
        logger.info(f"Extracted zip file to {extracted_project_path}")

    except (zipfile.BadZipFile, zipfile.LargeZipFile) as e:
        logger.error(f"Error processing zip file: {e}")
        raise ValueError(f"Error processing zip file: {e}") from e

    return extracted_project_path


def create_render_jobs(jobs_list, loaded_project_local_path, job_dir, enable_split_jobs=False):
    """Create a render worker for every job description and queue it.

    A failure while creating one job is logged and reported in that job's
    result entry; the remaining jobs are still processed.

    Args:
        jobs_list: List of job dicts. ``'renderer'`` is required; ``'name'``,
            ``'output_path'``, ``'engine_version'``, ``'args'``,
            ``'initial_status'``, ``'parent'``, ``'priority'``,
            ``'start_frame'``, ``'end_frame'`` and ``'force_start'`` are read
            when present.
        loaded_project_local_path: Path of the staged project file.
        job_dir: Directory in which the per-job output directory is created.
        enable_split_jobs: When true, parentless jobs with more than one frame
            are handed to ``DistributedJobManager.split_into_subjobs``.

    Returns:
        One entry per job, in order: ``worker.json()`` on success, or
        ``{'error': message}`` when the job could not be created.
    """
    results = []
    several_jobs = len(jobs_list) > 1
    # Outputs are rooted two levels above the project file.
    project_root = os.path.dirname(os.path.dirname(loaded_project_local_path))

    for job_data in jobs_list:
        try:
            # prepare output paths
            output_dir = os.path.join(job_dir, job_data.get('name') if several_jobs else 'output')
            os.makedirs(output_dir, exist_ok=True)

            # NOTE(review): output_dir is created above, but output_path below is
            # always rooted at <project_root>/output. With several jobs the two
            # differ -- confirm which location is intended.
            requested_output = job_data.get('output_path')
            if requested_output:
                output_filename = os.path.basename(requested_output)
            else:
                # Default to the project file name without its extension.
                project_filename = os.path.basename(loaded_project_local_path)
                output_filename = os.path.splitext(project_filename)[0]
            output_path = os.path.join(project_root, 'output', output_filename)
            logger.debug(f"New job output path: {output_path}")

            # create & configure jobs
            worker = EngineManager.create_worker(renderer=job_data['renderer'],
                                                 input_path=loaded_project_local_path,
                                                 output_path=output_path,
                                                 engine_version=job_data.get('engine_version'),
                                                 args=job_data.get('args', {}))
            worker.status = job_data.get("initial_status", worker.status)
            worker.parent = job_data.get("parent", worker.parent)
            worker.name = job_data.get("name", worker.name)
            worker.priority = int(job_data.get('priority', worker.priority))
            worker.start_frame = int(job_data.get("start_frame", worker.start_frame))
            worker.end_frame = int(job_data.get("end_frame", worker.end_frame))

            # determine if we can / should split the job
            if enable_split_jobs and (worker.total_frames > 1) and not worker.parent:
                DistributedJobManager.split_into_subjobs(worker, job_data, loaded_project_local_path)
            else:
                logger.debug("Not splitting into subjobs")

            RenderQueue.add_to_render_queue(worker, force_start=job_data.get('force_start', False))
            if not worker.parent:
                # Imported at call time rather than at module import time.
                from src.api.api_server import make_job_ready
                make_job_ready(worker.id)
            results.append(worker.json())
        except FileNotFoundError as e:
            err_msg = f"Cannot create job: {e}"
            logger.error(err_msg)
            results.append({'error': err_msg})
        except Exception as e:
            err_msg = f"Exception creating render job: {e}"
            logger.exception(err_msg)
            results.append({'error': err_msg})

    return results