diff --git a/src/api/api_server.py b/src/api/api_server.py
index 55b6ec4..6938d71 100755
--- a/src/api/api_server.py
+++ b/src/api/api_server.py
@@ -28,6 +28,7 @@ from src.utilities.misc_helper import system_safe_path, current_system_os, curre
     current_system_os_version, num_to_alphanumeric
 from src.utilities.server_helper import generate_thumbnail_for_job
 from src.utilities.zeroconf_server import ZeroconfServer
+from src.utilities.benchmark import cpu_benchmark, disk_io_benchmark
 
 logger = logging.getLogger()
 server = Flask(__name__)
@@ -450,6 +451,17 @@ def get_renderer_help(renderer):
     return f"Cannot find renderer '{renderer}'", 400
 
 
+@server.get('/api/cpu_benchmark')
+def get_cpu_benchmark_score():
+    return str(cpu_benchmark(10))
+
+
+@server.get('/api/disk_benchmark')
+def get_disk_benchmark():
+    write_speed, read_speed = disk_io_benchmark()
+    return {'write_speed': write_speed, 'read_speed': read_speed}
+
+
 def start_server():
     def eval_loop(delay_sec=1):
         while True:
diff --git a/src/distributed_job_manager.py b/src/distributed_job_manager.py
index 28b3cf4..1ba9ac1 100644
--- a/src/distributed_job_manager.py
+++ b/src/distributed_job_manager.py
@@ -4,8 +4,10 @@
 import socket
 import time
 import zipfile
+import requests
 from plyer import notification
 from pubsub import pub
+from concurrent.futures import ThreadPoolExecutor
 
 from src.api.server_proxy import RenderServerProxy
 from src.render_queue import RenderQueue
@@ -291,17 +293,18 @@ class DistributedJobManager:
         return post_results
 
     @staticmethod
-    def distribute_server_work(start_frame, end_frame, available_servers, method='cpu_count'):
+    def distribute_server_work(start_frame, end_frame, available_servers, method='cpu_benchmark'):
         """
-        Splits the frame range among available servers proportionally based on their performance (CPU count).
+        Splits the frame range among available servers proportionally based on their relative performance.
 
         Args:
             start_frame (int): The start frame number of the animation to be rendered.
             end_frame (int): The end frame number of the animation to be rendered.
             available_servers (list): A list of available server dictionaries. Each server dictionary should include
                 'hostname' and 'cpu_count' keys (see find_available_servers).
-            method (str, optional): Specifies the distribution method. Possible values are 'cpu_count' and 'equally'.
-                Defaults to 'cpu_count'.
+            method (str, optional): Specifies the distribution method. Possible values are 'cpu_benchmark',
+                'cpu_count' and 'evenly'.
+                Defaults to 'cpu_benchmark'.
 
         Returns:
             list: A list of server dictionaries where each dictionary includes the frame range and total number of
@@ -310,7 +313,7 @@ class DistributedJobManager:
         # Calculate respective frames for each server
         def divide_frames_by_cpu_count(frame_start, frame_end, servers):
             total_frames = frame_end - frame_start + 1
-            total_performance = sum(server['cpu_count'] for server in servers)
+            total_cpus = sum(server['cpu_count'] for server in servers)
 
             frame_ranges = {}
             current_frame = frame_start
@@ -321,7 +324,47 @@ class DistributedJobManager:
                     # Give all remaining frames to the last server
                     num_frames = total_frames - allocated_frames
                 else:
-                    num_frames = round((server['cpu_count'] / total_performance) * total_frames)
+                    num_frames = round((server['cpu_count'] / total_cpus) * total_frames)
+                allocated_frames += num_frames
+
+                frame_end_for_server = current_frame + num_frames - 1
+
+                if current_frame <= frame_end_for_server:
+                    frame_ranges[server['hostname']] = (current_frame, frame_end_for_server)
+                    current_frame = frame_end_for_server + 1
+
+            return frame_ranges
+
+        def divide_frames_by_benchmark(frame_start, frame_end, servers):
+
+            def fetch_benchmark(server):
+                try:
+                    benchmark = requests.get(f'http://{server["hostname"]}:{ZeroconfServer.server_port}'
+                                             f'/api/cpu_benchmark', timeout=60).text  # avoid hanging on a dead host
+                    server['cpu_benchmark'] = benchmark
+                    logger.debug(f'Benchmark for {server["hostname"]}: {benchmark}')
+                except requests.exceptions.RequestException as e:
+                    logger.error(f'Error fetching benchmark for {server["hostname"]}: {e}')
+
+            # Number of threads to use (one per server so all benchmark requests run concurrently)
+            threads = len(servers)
+
+            with ThreadPoolExecutor(max_workers=threads) as executor:
+                executor.map(fetch_benchmark, servers)
+
+            total_frames = frame_end - frame_start + 1
+            total_performance = sum(int(server.get('cpu_benchmark', 0)) for server in servers)
+
+            frame_ranges = {}
+            current_frame = frame_start
+            allocated_frames = 0
+
+            for i, server in enumerate(servers):
+                if i == len(servers) - 1:  # if it's the last server
+                    # Give all remaining frames to the last server
+                    num_frames = total_frames - allocated_frames
+                else:
+                    num_frames = round((int(server.get('cpu_benchmark', 0)) / total_performance) * total_frames)
                 allocated_frames += num_frames
 
                 frame_end_for_server = current_frame + num_frames - 1
@@ -350,12 +393,18 @@ class DistributedJobManager:
 
             return frame_ranges
 
-        if method == 'equally':
-            breakdown = divide_frames_equally(start_frame, end_frame, available_servers)
-        # elif method == 'benchmark_score':  # todo: implement benchmark score
-        #     pass
+        if len(available_servers) == 1:
+            breakdown = {available_servers[0]['hostname']: (start_frame, end_frame)}
         else:
-            breakdown = divide_frames_by_cpu_count(start_frame, end_frame, available_servers)
+            logger.debug(f'Splitting between {len(available_servers)} servers by {method} method')
+            if method == 'evenly':
+                breakdown = divide_frames_equally(start_frame, end_frame, available_servers)
+            elif method == 'cpu_benchmark':
+                breakdown = divide_frames_by_benchmark(start_frame, end_frame, available_servers)
+            elif method == 'cpu_count':
+                breakdown = divide_frames_by_cpu_count(start_frame, end_frame, available_servers)
+            else:
+                raise ValueError(f"Invalid distribution method: {method}")
 
         server_breakdown = [server for server in available_servers if breakdown.get(server['hostname']) is not None]
         for server in server_breakdown:
@@ -381,3 +430,17 @@ class DistributedJobManager:
                 available_servers.append(response)
 
         return available_servers
+
+
+if __name__ == '__main__':
+    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
+
+    ZeroconfServer.configure("_zordon._tcp.local.", 'testing', 8080)
+    ZeroconfServer.start(listen_only=True)
+    print("Starting Zeroconf...")
+    time.sleep(2)
+    available_servers = DistributedJobManager.find_available_servers('blender')
+    print(f"AVAILABLE SERVERS: {available_servers}")
+    results = DistributedJobManager.distribute_server_work(1, 100, available_servers)
+    print(f"RESULTS: {results}")
+    ZeroconfServer.stop()
diff --git a/src/utilities/benchmark.py b/src/utilities/benchmark.py
new file mode 100644
index 0000000..09a7836
--- /dev/null
+++ b/src/utilities/benchmark.py
@@ -0,0 +1,78 @@
+import concurrent.futures
+import os
+import time
+import logging
+
+logger = logging.getLogger()
+
+
+def cpu_workload(n):
+    # Simple arithmetic operation for workload
+    while n > 0:
+        n -= 1
+    return n
+
+
+def cpu_benchmark(duration_seconds=10):
+    # Determine the number of available CPU cores
+    num_cores = os.cpu_count()
+
+    # Calculate workload per core, assuming a large number for the workload
+    workload_per_core = 10000000
+
+    # Record start time
+    start_time = time.time()
+
+    # Use ProcessPoolExecutor to utilize all CPU cores
+    with concurrent.futures.ProcessPoolExecutor() as executor:
+        # Launching tasks for each core
+        futures = [executor.submit(cpu_workload, workload_per_core) for _ in range(num_cores)]
+
+        # Wait for the workers (the executor still blocks on exit, so duration_seconds is only a soft cap)
+        concurrent.futures.wait(futures, timeout=duration_seconds)
+
+    # Record end time
+    end_time = time.time()
+
+    # Calculate the total number of operations (workload) done by all cores
+    total_operations = workload_per_core * num_cores
+    # Calculate the total time taken
+    total_time = end_time - start_time
+    # Calculate operations per second as the score
+    score = total_operations / total_time
+    score = score * 0.0001  # scale down to a smaller, easier-to-compare number
+
+    return int(score)
+
+
+def disk_io_benchmark(file_size_mb=100, filename='benchmark_test_file'):
+    write_speed = None
+    read_speed = None
+
+    # Measure write speed
+    start_time = time.time()
+    with open(filename, 'wb') as f:
+        f.write(os.urandom(file_size_mb * 1024 * 1024))  # Write random bytes to file
+    end_time = time.time()
+    write_time = end_time - start_time
+    write_speed = file_size_mb / write_time
+
+    # Measure read speed
+    start_time = time.time()
+    with open(filename, 'rb') as f:
+        f.read()  # discard the data; only the elapsed time matters
+    end_time = time.time()
+    read_time = end_time - start_time
+    read_speed = file_size_mb / read_time
+
+    # Cleanup
+    os.remove(filename)
+
+    logger.debug(f"Disk Write Speed: {write_speed:.2f} MB/s")
+    logger.debug(f"Disk Read Speed: {read_speed:.2f} MB/s")
+    return write_speed, read_speed
+
+
+if __name__ == '__main__':
+    print(cpu_benchmark())
+    print(disk_io_benchmark())
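For a quick sanity check of the split logic, the standalone sketch below (not part of the patch) mirrors the proportional allocation used by divide_frames_by_benchmark; the hostnames and benchmark scores are made up for illustration.

servers = [
    {'hostname': 'render-01', 'cpu_benchmark': 4000},  # hypothetical scores
    {'hostname': 'render-02', 'cpu_benchmark': 2000},
    {'hostname': 'render-03', 'cpu_benchmark': 1000},
]
start_frame, end_frame = 1, 100

total_frames = end_frame - start_frame + 1
total_performance = sum(s['cpu_benchmark'] for s in servers)

frame_ranges = {}
current_frame = start_frame
allocated_frames = 0
for i, server in enumerate(servers):
    if i == len(servers) - 1:
        # the last server absorbs whatever rounding left over
        num_frames = total_frames - allocated_frames
    else:
        num_frames = round(server['cpu_benchmark'] / total_performance * total_frames)
    allocated_frames += num_frames
    frame_ranges[server['hostname']] = (current_frame, current_frame + num_frames - 1)
    current_frame += num_frames

print(frame_ranges)
# {'render-01': (1, 57), 'render-02': (58, 86), 'render-03': (87, 100)}

Handing the remainder to the last server keeps the total exact even when the rounded shares drift by a frame; in the patch itself, the current_frame <= frame_end_for_server guard additionally drops any server that ends up with zero frames.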