CPU benchmarks #48 (#76)

* Add benchmark.py

* Add cpu / disk benchmark APIs

* Add cpu_benchmark method to distributed_job_manager.py

* Make sure cpu_benchmark is an int

* Improve distributed_job_manager test
2024-02-11 05:19:24 -06:00
parent 79db960383
commit a31fe98964
3 changed files with 163 additions and 10 deletions


@@ -28,6 +28,7 @@
 from src.utilities.misc_helper import system_safe_path, current_system_os, current_system_os_version, num_to_alphanumeric
 from src.utilities.server_helper import generate_thumbnail_for_job
 from src.utilities.zeroconf_server import ZeroconfServer
+from src.utilities.benchmark import cpu_benchmark, disk_io_benchmark
 
 logger = logging.getLogger()
 server = Flask(__name__)
@@ -450,6 +451,17 @@ def get_renderer_help(renderer):
     return f"Cannot find renderer '{renderer}'", 400
 
 
+@server.get('/api/cpu_benchmark')
+def get_cpu_benchmark_score():
+    return str(cpu_benchmark(10))
+
+
+@server.get('/api/disk_benchmark')
+def get_disk_benchmark():
+    results = disk_io_benchmark()
+    return {'write_speed': results[0], 'read_speed': results[-1]}
+
+
 def start_server():
     def eval_loop(delay_sec=1):
         while True:
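A quick client-side sketch of the two new endpoints (the hostname and port here are placeholders, not part of this commit; in practice they come from Zeroconf discovery). Note that /api/cpu_benchmark runs cpu_benchmark(10) on the server, so the call can block for ten seconds or more, which is why a generous timeout is used:

import requests

host, port = 'render-node.local', 8080  # placeholders, not part of this commit

# The CPU endpoint returns the integer score as plain text
score = int(requests.get(f'http://{host}:{port}/api/cpu_benchmark', timeout=60).text)

# The disk endpoint returns JSON with write_speed and read_speed in MB/s
disk = requests.get(f'http://{host}:{port}/api/disk_benchmark', timeout=60).json()
print(score, disk['write_speed'], disk['read_speed'])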

distributed_job_manager.py

@@ -4,8 +4,10 @@ import socket
 import time
 import zipfile
 
+import requests
 from plyer import notification
 from pubsub import pub
+from concurrent.futures import ThreadPoolExecutor
 
 from src.api.server_proxy import RenderServerProxy
 from src.render_queue import RenderQueue
@@ -291,7 +293,7 @@ class DistributedJobManager:
         return post_results
 
     @staticmethod
-    def distribute_server_work(start_frame, end_frame, available_servers, method='cpu_count'):
+    def distribute_server_work(start_frame, end_frame, available_servers, method='cpu_benchmark'):
         """
         Splits the frame range among available servers proportionally based on their performance (CPU count).
@@ -300,8 +302,9 @@
             end_frame (int): The end frame number of the animation to be rendered.
             available_servers (list): A list of available server dictionaries. Each server dictionary should include
                 'hostname' and 'cpu_count' keys (see find_available_servers).
-            method (str, optional): Specifies the distribution method. Possible values are 'cpu_count' and 'equally'.
-                Defaults to 'cpu_count'.
+            method (str, optional): Specifies the distribution method. Possible values are 'cpu_benchmark', 'cpu_count'
+                and 'evenly'.
+                Defaults to 'cpu_benchmark'.
 
         Returns:
             list: A list of server dictionaries where each dictionary includes the frame range and total number of
@@ -310,7 +313,7 @@
         # Calculate respective frames for each server
         def divide_frames_by_cpu_count(frame_start, frame_end, servers):
             total_frames = frame_end - frame_start + 1
-            total_performance = sum(server['cpu_count'] for server in servers)
+            total_cpus = sum(server['cpu_count'] for server in servers)
 
             frame_ranges = {}
             current_frame = frame_start
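For intuition, a tiny worked example of the same proportional arithmetic (the hostnames and CPU counts are made up):

# Two servers, 100 frames (1-100), 8 + 16 = 24 CPUs total
servers = [{'hostname': 'node-a', 'cpu_count': 8},
           {'hostname': 'node-b', 'cpu_count': 16}]
total_frames = 100
total_cpus = 24

# Non-final servers get a rounded proportional share; the last server takes
# whatever remains, so every frame is assigned exactly once.
share_a = round((8 / total_cpus) * total_frames)   # 33 -> frames (1, 33)
share_b = total_frames - share_a                   # 67 -> frames (34, 100)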
@@ -321,7 +324,47 @@
                     # Give all remaining frames to the last server
                     num_frames = total_frames - allocated_frames
                 else:
-                    num_frames = round((server['cpu_count'] / total_performance) * total_frames)
+                    num_frames = round((server['cpu_count'] / total_cpus) * total_frames)
+                allocated_frames += num_frames
+                frame_end_for_server = current_frame + num_frames - 1
+                if current_frame <= frame_end_for_server:
+                    frame_ranges[server['hostname']] = (current_frame, frame_end_for_server)
+                current_frame = frame_end_for_server + 1
+            return frame_ranges
+
+        def divide_frames_by_benchmark(frame_start, frame_end, servers):
+            def fetch_benchmark(server):
+                try:
+                    benchmark = requests.get(f'http://{server["hostname"]}:{ZeroconfServer.server_port}'
+                                             f'/api/cpu_benchmark').text
+                    server['cpu_benchmark'] = benchmark
+                    logger.debug(f'Benchmark for {server["hostname"]}: {benchmark}')
+                except requests.exceptions.RequestException as e:
+                    logger.error(f'Error fetching benchmark for {server["hostname"]}: {e}')
+
+            # Number of threads to use (can adjust based on your needs or number of servers)
+            threads = len(servers)
+            with ThreadPoolExecutor(max_workers=threads) as executor:
+                executor.map(fetch_benchmark, servers)
+
+            total_frames = frame_end - frame_start + 1
+            total_performance = sum(int(server['cpu_benchmark']) for server in servers)
+
+            frame_ranges = {}
+            current_frame = frame_start
+            allocated_frames = 0
+
+            for i, server in enumerate(servers):
+                if i == len(servers) - 1:  # if it's the last server
+                    # Give all remaining frames to the last server
+                    num_frames = total_frames - allocated_frames
+                else:
+                    num_frames = round((int(server['cpu_benchmark']) / total_performance) * total_frames)
                 allocated_frames += num_frames
                 frame_end_for_server = current_frame + num_frames - 1
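One caveat in the hunk above: when a request fails, fetch_benchmark logs the error but never sets server['cpu_benchmark'], so the int(server['cpu_benchmark']) inside the sum would raise a KeyError for that server. A hedged defensive variant (the fallback to cpu_count is an assumption, not part of this commit):

def server_score(server):
    # Fall back to cpu_count (or 1) when the benchmark fetch failed or
    # returned non-numeric text, so one unreachable server doesn't break
    # the whole split.
    try:
        return int(server['cpu_benchmark'])
    except (KeyError, ValueError):
        return int(server.get('cpu_count', 1))

total_performance = sum(server_score(server) for server in servers)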
@@ -350,12 +393,18 @@
             return frame_ranges
 
-        if method == 'equally':
-            breakdown = divide_frames_equally(start_frame, end_frame, available_servers)
-        # elif method == 'benchmark_score':  # todo: implement benchmark score
-        #     pass
+        if len(available_servers) == 1:
+            breakdown = {available_servers[0]['hostname']: (start_frame, end_frame)}
         else:
-            breakdown = divide_frames_by_cpu_count(start_frame, end_frame, available_servers)
+            logger.debug(f'Splitting between {len(available_servers)} servers by {method} method')
+            if method == 'evenly':
+                breakdown = divide_frames_equally(start_frame, end_frame, available_servers)
+            elif method == 'cpu_benchmark':
+                breakdown = divide_frames_by_benchmark(start_frame, end_frame, available_servers)
+            elif method == 'cpu_count':
+                breakdown = divide_frames_by_cpu_count(start_frame, end_frame, available_servers)
+            else:
+                raise ValueError(f"Invalid distribution method: {method}")
 
         server_breakdown = [server for server in available_servers if breakdown.get(server['hostname']) is not None]
         for server in server_breakdown:
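A minimal offline usage sketch of the dispatcher (fake hostnames; 'evenly' is used here because 'cpu_benchmark' would trigger a network round-trip to each host):

fake_servers = [{'hostname': 'node-a', 'cpu_count': 8},
                {'hostname': 'node-b', 'cpu_count': 16}]
plan = DistributedJobManager.distribute_server_work(1, 100, fake_servers, method='evenly')
# Per the docstring, each returned server dict should now carry its assigned
# frame range, e.g. (1, 50) for node-a and (51, 100) for node-b.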
@@ -381,3 +430,17 @@
             available_servers.append(response)
         return available_servers
+
+
+if __name__ == '__main__':
+    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
+    ZeroconfServer.configure("_zordon._tcp.local.", 'testing', 8080)
+    ZeroconfServer.start(listen_only=True)
+    print("Starting Zeroconf...")
+    time.sleep(2)
+
+    available_servers = DistributedJobManager.find_available_servers('blender')
+    print(f"AVAILABLE SERVERS: {available_servers}")
+    results = DistributedJobManager.distribute_server_work(1, 100, available_servers)
+    print(f"RESULTS: {results}")
+
+    ZeroconfServer.stop()

src/utilities/benchmark.py (new file)

@@ -0,0 +1,78 @@
+import concurrent.futures
+import os
+import time
+import logging
+
+logger = logging.getLogger()
+
+
+def cpu_workload(n):
+    # Simple arithmetic operation for workload
+    while n > 0:
+        n -= 1
+    return n
+
+
+def cpu_benchmark(duration_seconds=10):
+    # Determine the number of available CPU cores
+    num_cores = os.cpu_count()
+
+    # Calculate workload per core, assuming a large number for the workload
+    workload_per_core = 10000000
+
+    # Record start time
+    start_time = time.time()
+
+    # Use ProcessPoolExecutor to utilize all CPU cores
+    with concurrent.futures.ProcessPoolExecutor() as executor:
+        # Launching tasks for each core
+        futures = [executor.submit(cpu_workload, workload_per_core) for _ in range(num_cores)]
+        # Wait for all futures to complete, with a timeout to limit the benchmark duration
+        concurrent.futures.wait(futures, timeout=duration_seconds)
+
+    # Record end time
+    end_time = time.time()
+
+    # Calculate the total number of operations (workload) done by all cores
+    total_operations = workload_per_core * num_cores
+
+    # Calculate the total time taken
+    total_time = end_time - start_time
+
+    # Calculate operations per second as the score
+    score = total_operations / total_time
+    score = score * 0.0001
+    return int(score)
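Note that concurrent.futures.wait(futures, timeout=duration_seconds) only stops waiting; the worker processes keep running and are joined when the with block exits, yet total_operations is computed as if every task finished within the window. A hedged alternative sketch that credits only completed tasks (the function name is illustrative, and it reuses cpu_workload from above):

import concurrent.futures
import os
import time

def cpu_benchmark_completed_only(duration_seconds=10, workload_per_core=10_000_000):
    # Sketch only: count just the tasks that finished before the timeout,
    # so a slow machine isn't scored as if all of its tasks completed.
    start = time.time()
    with concurrent.futures.ProcessPoolExecutor() as executor:
        futures = [executor.submit(cpu_workload, workload_per_core)
                   for _ in range(os.cpu_count())]
        done, _ = concurrent.futures.wait(futures, timeout=duration_seconds)
        elapsed = time.time() - start  # sample before the pool joins stragglers
    return int((workload_per_core * len(done)) / elapsed * 0.0001)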
+
+
+def disk_io_benchmark(file_size_mb=100, filename='benchmark_test_file'):
+    write_speed = None
+    read_speed = None
+
+    # Measure write speed
+    start_time = time.time()
+    with open(filename, 'wb') as f:
+        f.write(os.urandom(file_size_mb * 1024 * 1024))  # Write random bytes to file
+    end_time = time.time()
+    write_time = end_time - start_time
+    write_speed = file_size_mb / write_time
+
+    # Measure read speed
+    start_time = time.time()
+    with open(filename, 'rb') as f:
+        content = f.read()
+    end_time = time.time()
+    read_time = end_time - start_time
+    read_speed = file_size_mb / read_time
+
+    # Cleanup
+    os.remove(filename)
+
+    logger.debug(f"Disk Write Speed: {write_speed:.2f} MB/s")
+    logger.debug(f"Disk Read Speed: {read_speed:.2f} MB/s")
+
+    return write_speed, read_speed
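When reading these numbers, note that the write test mostly times the OS page cache (nothing is fsync'd) and includes the cost of os.urandom itself, while the read test rereads a file the OS has just cached, so both speeds can come out far above what the physical disk sustains. A hedged sketch of a stricter write measurement (the function name is an assumption):

import os
import time

def disk_write_benchmark_synced(file_size_mb=100, filename='benchmark_test_file'):
    # Sketch only: generate the payload before starting the clock, then flush
    # and fsync so the timing covers the data actually reaching the device.
    data = os.urandom(file_size_mb * 1024 * 1024)
    start = time.time()
    with open(filename, 'wb') as f:
        f.write(data)
        f.flush()
        os.fsync(f.fileno())
    speed_mb_s = file_size_mb / (time.time() - start)
    os.remove(filename)
    return speed_mb_s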
+
+
+if __name__ == '__main__':
+    print(cpu_benchmark())
+    print(disk_io_benchmark())