#!/usr/bin/env python3
"""
Zoom in on video motion.
"""
import pyffstream
import numpy as np
import PIL.Image
import cv2
import scipy.signal


def args_pre(parser):
    """Register this filter's command-line arguments on *parser*.

    Called by pyffstream before parsing; only adds arguments, no parsing
    happens here.
    """
    parser.add_argument(
        '--margins', metavar=('L', 'R', 'T', 'B'), type=float, nargs=4,
        default=[0, 0, 0, 0],
        help="""
            margins (left, right, top, bottom, in percent) of output video
            (default: %(default)s)
        """)
    parser.add_argument(
        '--blur-factor', metavar='F', type=float, default=0.05,
        help="blur size factor (default: %(default)s)")
    parser.add_argument(
        '--blur-threshold', metavar='T', type=int, default=32,
        help="blur threshold (default: %(default)s)")
    parser.add_argument(
        '--lowpass-factor', metavar='F', type=float, default=0.00015,
        help="low-pass filter cutoff frequency factor (default: %(default)s)")


def init(args):
    """Derive working parameters from *args* and build per-run state.

    Returns a state object holding the MOG2 background subtractor and the
    low-pass filter state for the tracked rectangle coordinates.
    """
    # Set arguments.
    # Background-model history: ~30 seconds worth of output frames.
    args.history = int(30 * args.output_fps)
    # Gaussian kernel size must be odd; round up to the next odd integer.
    args.blur_size = (
        int(np.ceil(args.working_width * args.blur_factor)) // 2 * 2 + 1
    )
    # Below this many foreground pixels (0.1% of the frame) motion is
    # considered lost and the view resets to the whole frame.
    args.lost_size = args.working_width * args.working_height * 0.001
    # First-order Butterworth low-pass used to smooth rectangle coordinates
    # over time; cutoff scales with frame width and inversely with fps.
    args.b, args.a = scipy.signal.butter(
        1, args.working_width / args.output_fps * args.lowpass_factor
    )
    # Fraction of the history during which the filter state is frozen
    # (lets the background model settle before the view starts moving).
    args.history_track_ratio = 0.025
    # (scale ratio threshold, PIL resampling method) pairs, checked in
    # order; cheaper methods are used for milder downscales.
    args.resample_ratios = [
        # (1.0, PIL.Image.NEAREST),
        (0.5, PIL.Image.BILINEAR),
        (0.2, PIL.Image.BICUBIC),
        (0.0, PIL.Image.LANCZOS),
    ]

    # Set state.
    class State:
        pass
    state = State()
    state.background_subtractor = cv2.createBackgroundSubtractorMOG2(
        history=args.history
    )
    state.filter_state = []
    return state


def process(args, state, frame, frame_num):
    """Process one video frame.

    Tracks the moving region, smooths its bounding rectangle with a
    low-pass filter, crops to it, and resizes the crop to the output size.

    Returns a ``(output_frame, debug_frame)`` tuple; ``debug_frame`` is
    ``None`` unless ``args.debug`` is set.
    """
    # Create debug frame.
    if args.debug:
        debug_frame = frame.copy()
    else:
        debug_frame = None

    # Subtract background, blur and threshold.
    foreground = state.background_subtractor.apply(frame)
    mask = cv2.compare(
        cv2.GaussianBlur(foreground, (args.blur_size, args.blur_size), 0),
        args.blur_threshold,
        cv2.CMP_GE,
    )
    if args.debug:
        debug_frame[mask > 0] = (0, 255, 0)
        debug_frame[foreground > 0] = (255, 0, 0)

    # Nothing interesting?
    if np.count_nonzero(mask) < args.lost_size:
        # Reset rectangle.
        x, y, w, h = 0, 0, args.working_width, args.working_height
    else:
        # Find bounding rectangle.
        x, y, w, h = cv2.boundingRect(mask)
        if args.debug:
            cv2.rectangle(
                debug_frame, (x, y), (x+w, y+h), (0, 255, 0),
                2 * args.thickness
            )

    # Add rectangle margins.  Margins are a percentage of the larger
    # rectangle side; the result is clamped to the working frame.
    ml, mr, mt, mb = args.margins
    m = max(w, h)
    x = max(x - int(m * ml / 100), 0)
    y = max(y - int(m * mt / 100), 0)
    w = min(w + int(m * (ml+mr) / 100), args.working_width - x)
    h = min(h + int(m * (mt+mb) / 100), args.working_height - y)
    if args.debug:
        cv2.rectangle(
            debug_frame, (x, y), (x+w, y+h), (0, 0, 255),
            2 * args.thickness
        )

    # Filter rectangle.  Each corner coordinate runs through its own IIR
    # low-pass filter so the view pans/zooms smoothly instead of jumping.
    x1, y1, x2, y2 = x, y, x+w, y+h
    # Also (re)seed when the state is still empty so a first call at a
    # frame other than start_frame cannot crash the zip-unpack below.
    if frame_num == args.start_frame or not state.filter_state:
        # Seed the filter state so the filter starts at the current
        # coordinates instead of at zero (lfilter_zi gives the unit
        # step-response steady state).
        state.filter_state = [
            coord * scipy.signal.lfilter_zi(args.b, args.a)
            for coord in (x1, y1, x2, y2)
        ]
    (x1, y1, x2, y2), filter_state_next = zip(*(
        scipy.signal.lfilter(args.b, args.a, [coord], zi=zi)
        for coord, zi in zip((x1, y1, x2, y2), state.filter_state)
    ))
    # Freeze the filter state during the initial settle-in period so the
    # view does not chase the not-yet-stable background model.
    if frame_num >= args.start_frame + args.history * args.history_track_ratio:
        state.filter_state = filter_state_next
    x1, y1, x2, y2 = [int(coord[0]) for coord in (x1, y1, x2, y2)]
    x, y, w, h = x1, y1, x2-x1, y2-y1
    if args.debug:
        cv2.rectangle(
            debug_frame, (x, y), (x+w, y+h), (255, 0, 255),
            2 * args.thickness
        )

    # Fix rectangle (pyffstream clamps/adjusts it to valid output bounds).
    x, y, w, h = pyffstream.fix_rect(args, x, y, w, h)

    # Determine resampling method: pick the first entry whose ratio
    # threshold the crop-to-output scale meets (list ends at 0.0, so the
    # loop always breaks).
    scale = min(w / args.output_width, h / args.output_height)
    for i, (ratio, resample) in enumerate(args.resample_ratios):
        if scale >= ratio:
            break
    if args.debug:
        # Shade from green (mild downscale) to red (strong downscale).
        color_coeff = i / max(1, len(args.resample_ratios) - 1)
        color = (
            255 * (0 + color_coeff),
            255 * (1 - color_coeff),
            0,
        )
        cv2.rectangle(
            debug_frame, (x, y), (x+w, y+h), color,
            2 * args.thickness,
        )

    # Cut and resize.
    output_frame = pyffstream.resize(
        frame[y:y+h, x:x+w], args.output_width, args.output_height, resample
    )

    # Return.
    return output_frame, debug_frame


def main():
    """Entry point: hand control to the pyffstream runner."""
    pyffstream.run(__doc__, process, init, args_pre)


if __name__ == '__main__':
    main()