import concurrent.futures import os import time import logging logger = logging.getLogger() def cpu_workload(n): # Simple arithmetic operation for workload while n > 0: n -= 1 return n def cpu_benchmark(duration_seconds=10): # Determine the number of available CPU cores num_cores = os.cpu_count() # Calculate workload per core, assuming a large number for the workload workload_per_core = 10000000 # Record start time start_time = time.time() # Use ProcessPoolExecutor to utilize all CPU cores with concurrent.futures.ProcessPoolExecutor() as executor: # Launching tasks for each core futures = [executor.submit(cpu_workload, workload_per_core) for _ in range(num_cores)] # Wait for all futures to complete, with a timeout to limit the benchmark duration concurrent.futures.wait(futures, timeout=duration_seconds) # Record end time end_time = time.time() # Calculate the total number of operations (workload) done by all cores total_operations = workload_per_core * num_cores # Calculate the total time taken total_time = end_time - start_time # Calculate operations per second as the score score = total_operations / total_time score = score * 0.0001 return int(score) def disk_io_benchmark(file_size_mb=100, filename='benchmark_test_file'): write_speed = None read_speed = None # Measure write speed start_time = time.time() with open(filename, 'wb') as f: f.write(os.urandom(file_size_mb * 1024 * 1024)) # Write random bytes to file end_time = time.time() write_time = end_time - start_time write_speed = file_size_mb / write_time # Measure read speed start_time = time.time() with open(filename, 'rb') as f: content = f.read() end_time = time.time() read_time = end_time - start_time read_speed = file_size_mb / read_time # Cleanup os.remove(filename) logger.debug(f"Disk Write Speed: {write_speed:.2f} MB/s") logger.debug(f"Disk Read Speed: {read_speed:.2f} MB/s") return write_speed, read_speed if __name__ == '__main__': print(cpu_benchmark()) print(disk_io_benchmark())