Code refactoring

This commit is contained in:
Brett Williams
2024-08-11 11:54:15 -05:00
parent d38e10ae9f
commit 33adcac592
2 changed files with 191 additions and 229 deletions

View File

@@ -1,47 +1,200 @@
import logging
import os
import subprocess
import threading
import zipfile
from concurrent.futures import ThreadPoolExecutor
from src.utilities.ffmpeg_helper import generate_thumbnail, save_first_frame
import requests
from src.api.server_proxy import RenderServerProxy
from src.utilities.misc_helper import get_file_size_human
from src.utilities.zeroconf_server import ZeroconfServer
logger = logging.getLogger()
def generate_thumbnail_for_job(job, thumb_video_path, thumb_image_path, max_width=320):
def download_missing_frames_from_subjob(local_job, subjob_id, subjob_hostname):
success = True
try:
local_files = [os.path.basename(x) for x in local_job.file_list()]
subjob_proxy = RenderServerProxy(subjob_hostname)
subjob_files = subjob_proxy.get_job_files_list(job_id=subjob_id) or []
# Simple thread to generate thumbs in background
def generate_thumb_thread(source):
in_progress_path = thumb_video_path + '_IN-PROGRESS'
subprocess.run(['touch', in_progress_path])
try:
logger.debug(f"Generating video thumbnail for {source}")
generate_thumbnail(source_path=source, dest_path=thumb_video_path, max_width=max_width)
except subprocess.CalledProcessError as err:
logger.error(f"Error generating video thumbnail for {source}: {err}")
for subjob_filename in subjob_files:
if subjob_filename not in local_files:
try:
logger.debug(f"Downloading new file '{subjob_filename}' from {subjob_hostname}")
local_save_path = os.path.join(os.path.dirname(local_job.output_path), subjob_filename)
subjob_proxy.download_job_file(job_id=subjob_id, job_filename=subjob_filename,
save_path=local_save_path)
logger.debug(f'Downloaded successfully - {local_save_path}')
except Exception as e:
logger.error(f"Error downloading file '{subjob_filename}' from {subjob_hostname}: {e}")
success = False
except Exception as e:
logger.exception(f'Uncaught exception while trying to download from subjob: {e}')
success = False
return success
try:
os.remove(in_progress_path)
except FileNotFoundError:
pass
# Determine best source file to use for thumbs
source_files = job.file_list() or [job.input_path]
if source_files:
video_formats = ['.mp4', '.mov', '.avi', '.mpg', '.mpeg', '.mxf', '.m4v', 'mkv']
image_formats = ['.jpg', '.png', '.exr']
def download_all_from_subjob(local_job, subjob_id, subjob_hostname):
"""
Downloads and extracts files from a completed subjob on a remote server.
image_files = [f for f in source_files if os.path.splitext(f)[-1].lower() in image_formats]
video_files = [f for f in source_files if os.path.splitext(f)[-1].lower() in video_formats]
Parameters:
local_job (BaseRenderWorker): The local parent job worker.
subjob_id (str or int): The ID of the subjob.
subjob_hostname (str): The hostname of the remote server where the subjob is located.
if (video_files or image_files) and not os.path.exists(thumb_image_path):
Returns:
bool: True if the files have been downloaded and extracted successfully, False otherwise.
"""
child_key = f'{subjob_id}@{subjob_hostname}'
logname = f"{local_job.id}:{child_key}"
zip_file_path = local_job.output_path + f'_{subjob_hostname}_{subjob_id}.zip'
# download zip file from server
try:
local_job.children[child_key]['download_status'] = 'working'
logger.info(f"Downloading completed subjob files from {subjob_hostname} to localhost")
RenderServerProxy(subjob_hostname).download_all_job_files(subjob_id, zip_file_path)
logger.info(f"File transfer complete for {logname} - Transferred {get_file_size_human(zip_file_path)}")
except Exception as e:
logger.error(f"Error downloading files from remote server: {e}")
local_job.children[child_key]['download_status'] = 'failed'
return False
# extract zip
try:
logger.debug(f"Extracting zip file: {zip_file_path}")
extract_path = os.path.dirname(zip_file_path)
with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
zip_ref.extractall(extract_path)
logger.info(f"Successfully extracted zip to: {extract_path}")
os.remove(zip_file_path)
local_job.children[child_key]['download_status'] = 'complete'
except Exception as e:
logger.exception(f"Exception extracting zip file: {e}")
local_job.children[child_key]['download_status'] = 'failed'
return local_job.children[child_key].get('download_status', None) == 'complete'
def distribute_server_work(start_frame, end_frame, available_servers, method='evenly'):
"""
Splits the frame range among available servers proportionally based on their performance (CPU count).
Args:
start_frame (int): The start frame number of the animation to be rendered.
end_frame (int): The end frame number of the animation to be rendered.
available_servers (list): A list of available server dictionaries. Each server dictionary should include
'hostname' and 'cpu_count' keys (see find_available_servers).
method (str, optional): Specifies the distribution method. Possible values are 'cpu_benchmark', 'cpu_count'
and 'evenly'.
Defaults to 'cpu_benchmark'.
Returns:
list: A list of server dictionaries where each dictionary includes the frame range and total number of
frames to be rendered by the server.
"""
# Calculate respective frames for each server
def divide_frames_by_cpu_count(frame_start, frame_end, servers):
total_frames = frame_end - frame_start + 1
total_cpus = sum(server['cpu_count'] for server in servers)
frame_ranges = {}
current_frame = frame_start
allocated_frames = 0
for i, server in enumerate(servers):
if i == len(servers) - 1: # if it's the last server
# Give all remaining frames to the last server
num_frames = total_frames - allocated_frames
else:
num_frames = round((server['cpu_count'] / total_cpus) * total_frames)
allocated_frames += num_frames
frame_end_for_server = current_frame + num_frames - 1
if current_frame <= frame_end_for_server:
frame_ranges[server['hostname']] = (current_frame, frame_end_for_server)
current_frame = frame_end_for_server + 1
return frame_ranges
def divide_frames_by_benchmark(frame_start, frame_end, servers):
def fetch_benchmark(server):
try:
path_of_source = image_files[0] if image_files else video_files[0]
logger.debug(f"Generating image thumbnail for {path_of_source}")
save_first_frame(source_path=path_of_source, dest_path=thumb_image_path, max_width=max_width)
except Exception as e:
logger.error(f"Exception saving first frame: {e}")
benchmark = requests.get(f'http://{server["hostname"]}:{ZeroconfServer.server_port}'
f'/api/cpu_benchmark').text
server['cpu_benchmark'] = benchmark
logger.debug(f'Benchmark for {server["hostname"]}: {benchmark}')
except requests.exceptions.RequestException as e:
logger.error(f'Error fetching benchmark for {server["hostname"]}: {e}')
if video_files and not os.path.exists(thumb_video_path):
x = threading.Thread(target=generate_thumb_thread, args=(video_files[0],))
x.start()
# Number of threads to use (can adjust based on your needs or number of servers)
threads = len(servers)
with ThreadPoolExecutor(max_workers=threads) as executor:
executor.map(fetch_benchmark, servers)
total_frames = frame_end - frame_start + 1
total_performance = sum(int(server['cpu_benchmark']) for server in servers)
frame_ranges = {}
current_frame = frame_start
allocated_frames = 0
for i, server in enumerate(servers):
if i == len(servers) - 1: # if it's the last server
# Give all remaining frames to the last server
num_frames = total_frames - allocated_frames
else:
num_frames = round((int(server['cpu_benchmark']) / total_performance) * total_frames)
allocated_frames += num_frames
frame_end_for_server = current_frame + num_frames - 1
if current_frame <= frame_end_for_server:
frame_ranges[server['hostname']] = (current_frame, frame_end_for_server)
current_frame = frame_end_for_server + 1
return frame_ranges
def divide_frames_equally(frame_start, frame_end, servers):
frame_range = frame_end - frame_start + 1
frames_per_server = frame_range // len(servers)
leftover_frames = frame_range % len(servers)
frame_ranges = {}
current_start = frame_start
for i, server in enumerate(servers):
current_end = current_start + frames_per_server - 1
if leftover_frames > 0:
current_end += 1
leftover_frames -= 1
if current_start <= current_end:
frame_ranges[server['hostname']] = (current_start, current_end)
current_start = current_end + 1
return frame_ranges
if len(available_servers) == 1:
breakdown = {available_servers[0]['hostname']: (start_frame, end_frame)}
else:
logger.debug(f'Splitting between {len(available_servers)} servers by {method} method')
if method == 'evenly':
breakdown = divide_frames_equally(start_frame, end_frame, available_servers)
elif method == 'cpu_benchmark':
breakdown = divide_frames_by_benchmark(start_frame, end_frame, available_servers)
elif method == 'cpu_count':
breakdown = divide_frames_by_cpu_count(start_frame, end_frame, available_servers)
else:
raise ValueError(f"Invalid distribution method: {method}")
server_breakdown = [server for server in available_servers if breakdown.get(server['hostname']) is not None]
for server in server_breakdown:
server['frame_range'] = breakdown[server['hostname']]
server['total_frames'] = breakdown[server['hostname']][-1] - breakdown[server['hostname']][0] + 1
return server_breakdown