import json import re from src.engines.core.base_engine import * class FFMPEG(BaseRenderEngine): binary_names = {'linux': 'ffmpeg', 'windows': 'ffmpeg.exe', 'macos': 'ffmpeg'} @staticmethod def downloader(): from src.engines.ffmpeg.ffmpeg_downloader import FFMPEGDownloader return FFMPEGDownloader @staticmethod def worker_class(): from src.engines.ffmpeg.ffmpeg_worker import FFMPEGRenderWorker return FFMPEGRenderWorker @classmethod def supported_extensions(cls): help_text = (subprocess.check_output([cls().renderer_path(), '-h', 'full'], stderr=subprocess.STDOUT) .decode('utf-8')) found = re.findall('extensions that .* is allowed to access \(default "(.*)"', help_text) found_extensions = set() for match in found: found_extensions.update(match.split(',')) return list(found_extensions) def version(self): version = None try: ver_out = subprocess.check_output([self.renderer_path(), '-version'], timeout=SUBPROCESS_TIMEOUT).decode('utf-8') match = re.match(".*version\s*(\S+)\s*Copyright", ver_out) if match: version = match.groups()[0] except Exception as e: logger.error("Failed to get FFMPEG version: {}".format(e)) return version def get_project_info(self, project_path, timeout=10): try: # Run ffprobe and parse the output as JSON cmd = [ 'ffprobe', '-v', 'quiet', '-print_format', 'json', '-show_streams', '-select_streams', 'v', project_path ] result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) video_info = json.loads(result.stdout) # Extract the necessary information video_stream = video_info['streams'][0] frame_rate = eval(video_stream['r_frame_rate']) duration = float(video_stream['duration']) width = video_stream['width'] height = video_stream['height'] # Calculate total frames (end frame) total_frames = int(duration * frame_rate) end_frame = total_frames - 1 # The start frame is typically 0 start_frame = 0 return { 'frame_start': start_frame, 'frame_end': end_frame, 'fps': frame_rate, 'resolution_x': width, 'resolution_y': height } except Exception as e: print(f"An error occurred: {e}") return None def get_encoders(self): raw_stdout = subprocess.check_output([self.renderer_path(), '-encoders'], stderr=subprocess.DEVNULL, timeout=SUBPROCESS_TIMEOUT).decode('utf-8') pattern = '(?P[VASFXBD.]{6})\s+(?P\S{2,})\s+(?P.*)' encoders = [m.groupdict() for m in re.finditer(pattern, raw_stdout)] return encoders def get_formats(self): encoders = [x['name'] for x in self.get_all_formats() if 'E' in x['type']] return encoders def get_all_formats(self): try: formats_raw = subprocess.check_output([self.renderer_path(), '-formats'], stderr=subprocess.DEVNULL, timeout=SUBPROCESS_TIMEOUT).decode('utf-8') pattern = '(?P[DE]{1,2})\s+(?P\S{2,})\s+(?P.*)' all_formats = [m.groupdict() for m in re.finditer(pattern, formats_raw)] return all_formats except Exception as e: logger.error(f"Error getting all formats: {e}") return [] def extension_for_format(self, ffmpeg_format): # Extract the common extension using regex muxer_flag = 'muxer' if 'E' in ffmpeg_format['type'] else 'demuxer' format_detail_raw = subprocess.check_output( [self.renderer_path(), '-hide_banner', '-h', f"{muxer_flag}={ffmpeg_format['id']}"]).decode('utf-8') pattern = r"Common extensions: (\w+)" common_extensions = re.findall(pattern, format_detail_raw) found_extensions = [] if common_extensions: found_extensions = common_extensions[0].split(',') return found_extensions def get_output_formats(self): return [x['id'] for x in self.get_all_formats() if 'E' in x['type'].upper()] def get_frame_count(self, path_to_file): raw_stdout = subprocess.check_output([self.renderer_path(), '-i', path_to_file, '-map', '0:v:0', '-c', 'copy', '-f', 'null', '-'], stderr=subprocess.STDOUT, timeout=SUBPROCESS_TIMEOUT).decode('utf-8') match = re.findall(r'frame=\s*(\d+)', raw_stdout) if match: frame_number = int(match[-1]) return frame_number def get_arguments(self): help_text = (subprocess.check_output([self.renderer_path(), '-h', 'long'], stderr=subprocess.STDOUT) .decode('utf-8')) lines = help_text.splitlines() options = {} current_category = None for line in lines: line = line.strip() if line.endswith(":") and "options" in line.lower(): current_category = line.split('options')[0].strip() options[current_category] = {} elif line: # Ignore blank lines split_line = line.split(' ', 2) flag = split_line[0] argument = split_line[1] if len(split_line) > 1 else '' description = split_line[2] if len(split_line) > 2 else '' if current_category: name = flag.strip('-') options[current_category][name] = { 'flag': flag, 'description': description.strip(), 'takes_argument': bool(argument) } return options if __name__ == "__main__": print(FFMPEG().get_all_formats())