#!/usr/bin/env python3
"""FFMPEG render engine description and render worker.

``FFMPEG`` queries the ffmpeg executable for its version, encoders and
formats. ``FFMPEGRenderWorker`` builds the ffmpeg command line for a job and
tracks progress by parsing ffmpeg's ``frame=`` statistics lines.
"""
import os
import re
import subprocess

from .render_worker import *


class FFMPEG(BaseRenderEngine):
    """Engine wrapper that asks the ffmpeg binary about its capabilities."""

    # First line of `ffmpeg -version` looks like "ffmpeg version X Copyright ...".
    _VERSION_RE = re.compile(r".*version\s*(\S+)\s*Copyright")

    # NOTE(review): the group names ('type', 'name', 'description') are
    # assumed -- confirm against whatever consumes full_report().
    _ENCODER_RE = re.compile(
        r'(?P<type>[VASFXBD.]{6})\s+(?P<name>\S{2,})\s+(?P<description>.*)')
    _FORMAT_RE = re.compile(
        r'(?P<type>[DE]{1,2})\s+(?P<name>\S{2,})\s+(?P<description>.*)')

    @classmethod
    def _query(cls, flag):
        """Run the renderer with a single ``flag`` and return its stdout as text.

        stderr is discarded. Raises whatever ``subprocess.check_output``
        raises (e.g. ``CalledProcessError`` on a non-zero exit status).
        """
        raw = subprocess.check_output([cls.renderer_path(), flag],
                                      stderr=subprocess.DEVNULL)
        return raw.decode('utf-8')

    @classmethod
    def version(cls):
        """Return the ffmpeg version string, or ``None`` if it cannot be determined.

        Any failure (binary missing, unexpected output, ...) is logged and
        swallowed on purpose so that callers always get a value back.
        """
        version = None
        try:
            ver_out = subprocess.check_output(
                [cls.renderer_path(), '-version']).decode('utf-8')
            match = cls._VERSION_RE.match(ver_out)
            if match:
                version = match.group(1)
        except Exception as e:
            logger.error("Failed to get FFMPEG version: {}".format(e))
        return version

    @classmethod
    def get_encoders(cls):
        """Return one dict per encoder line printed by ``ffmpeg -encoders``.

        Each dict maps the pattern's group names to the matched text.
        """
        encoders_raw = cls._query('-encoders')
        return [m.groupdict() for m in cls._ENCODER_RE.finditer(encoders_raw)]

    @classmethod
    def get_formats(cls):
        """Return one dict per format line printed by ``ffmpeg -formats``.

        Each dict maps the pattern's group names to the matched text.
        """
        formats_raw = cls._query('-formats')
        return [m.groupdict() for m in cls._FORMAT_RE.finditer(formats_raw)]

    @classmethod
    def full_report(cls):
        """Return a dict with the version, help text, encoders and formats.

        ``get_help`` is not defined in this class; presumably it is provided
        by ``BaseRenderEngine``.
        """
        return {'version': cls.version(),
                'help_text': cls.get_help(),
                'encoders': cls.get_encoders(),
                'formats': cls.get_formats()}


class FFMPEGRenderWorker(BaseRenderWorker):
    """Render worker that transcodes ``input_path`` to ``output_path`` with ffmpeg.

    Recognised keys in ``args`` (read via ``self.args``, which is presumably
    set by ``BaseRenderWorker.__init__``):

    * ``x_resolution`` / ``y_resolution`` -- when both are set, adds a
      ``scale`` video filter.
    * ``raw`` -- a string of extra ffmpeg arguments, split on whitespace.
    """

    engine = FFMPEG

    # Matches every "frame=  123" occurrence in ffmpeg's statistics output.
    _FRAME_COUNT_RE = re.compile(r'frame=\s*(\d+)')
    # Matches a full progress line; the time group is kept so that only real
    # statistics lines (which carry "time=") are treated as progress.
    _PROGRESS_RE = re.compile(
        r'frame=\s*(?P<current_frame>\d+)\s*fps.*time=(?P<time_elapsed>\S+)')

    def __init__(self, input_path, output_path, args=None):
        """Create the worker and count the frames of the first video stream.

        Raises ``subprocess.CalledProcessError`` if the probing ffmpeg run
        exits with a non-zero status.
        """
        super(FFMPEGRenderWorker, self).__init__(input_path=input_path,
                                                 output_path=output_path,
                                                 ignore_extensions=True,
                                                 args=args)

        # Stream-copy the first video stream into the null muxer and read the
        # last reported frame number: https://stackoverflow.com/a/61604105
        # ffmpeg prints its "frame=" statistics on stderr, so stderr has to be
        # captured (merged into stdout) rather than discarded.
        probe_cmd = [self.engine.renderer_path(), "-i", input_path,
                     "-map", "0:v:0", "-c", "copy", "-f", "null",
                     "-y", os.devnull]
        stream_info = subprocess.check_output(
            probe_cmd, stderr=subprocess.STDOUT).decode('utf-8', errors='replace')

        found_frames = self._FRAME_COUNT_RE.findall(stream_info)
        # -1 means "unknown"; percent_complete() treats it as no progress.
        self.total_frames = int(found_frames[-1]) if found_frames else -1
        self.frame = 0

        # Stats
        self.current_frame = -1

    def generate_worker_subprocess(self):
        """Return the ffmpeg command line for this job as a list of arguments."""
        args = self.args or {}
        cmd = [self.engine.renderer_path(), '-y', '-stats', '-i', self.input_path]

        # Resize frame (only when both dimensions are given)
        x_res = args.get('x_resolution')
        y_res = args.get('y_resolution')
        if x_res and y_res:
            cmd.extend(['-vf', f"scale={x_res}:{y_res}"])

        # Convert raw args from string if available. split() without an
        # argument collapses repeated whitespace, so no empty arguments are
        # handed to ffmpeg. Quoted arguments containing spaces are not supported.
        raw_args = args.get('raw')
        if raw_args:
            cmd.extend(raw_args.split())

        # Close with output path
        cmd.append(self.output_path)
        return cmd

    def percent_complete(self):
        """Return progress as a fraction between 0.0 and 1.0.

        Returns 0.0 while the total frame count is unknown (non-positive) or
        before the first progress line has been parsed.
        """
        total = float(self.total_frames)
        if total <= 0:
            return 0.0
        fraction = float(self.current_frame) / total
        return min(max(fraction, 0.0), 1.0)

    def _parse_stdout(self, line):
        """Update ``current_frame`` from a progress line, or record an error.

        Lines that are not progress lines but contain "not found" are
        appended to ``self.errors``; every other line is ignored.
        """
        found = self._PROGRESS_RE.search(line)
        if found:
            self.current_frame = int(found.group('current_frame'))
        elif "not found" in line:
            self.errors.append(line)


if __name__ == '__main__':

    print(FFMPEG.full_report())

    # logging.basicConfig(format='%(asctime)s - %(message)s', datefmt='%d-%b-%y %H:%M:%S', level=logging.DEBUG)
    #
    # test_movie = '/Users/brettwilliams/Desktop/dark_knight_rises.mp4'
    #
    # r = FFMPEGRenderWorker(test_movie, '/Users/brettwilliams/Desktop/test-ffmpeg.mp4', args=['-c:v', 'libx265', '-vtag', 'hvc1'])
    # # r = FFMPEGRenderer(test_movie, '/Users/brettwilliams/Desktop/dark_knight_rises-output.mp4')
    # r.start()
    # while r.is_running():
    #     time.sleep(1)