pyffstream.py
0be6a56b
 """
 Python library to ease processing of video frames.
 """
 
 import sys
 import os
 import signal
 import argparse
 import json
 import datetime
 import shlex
 import ffmpeg  # ffmpeg-python
 import numpy as np
 import PIL.Image
 import cv2
 
 
 def error(message):
     """Report a fatal error and terminate the program.

     The message is written to standard error, prefixed with the program
     name taken from ``sys.argv[0]``, and the process then exits with
     status 1 by raising ``SystemExit``.
     """
     print(f"{sys.argv[0]}: Error: {message}", file=sys.stderr)
     # Use sys.exit rather than the bare exit() helper: the latter is only
     # injected by the site module for interactive use and may be missing.
     sys.exit(1)
 
 
 def resize(image, width, height, resample):
     """Return ``image`` resized to ``width`` x ``height`` as a NumPy array.

     The input array is wrapped in a PIL image, resized with the given
     ``resample`` filter, and converted back to an array.
     """
     source = PIL.Image.fromarray(image)
     target_size = (width, height)
     resized = source.resize(target_size, resample)
     return np.array(resized)
 
 
 def text(image, text, x, y, thickness, font=cv2.FONT_HERSHEY_SIMPLEX):
     """Draw white text with a black outline onto ``image``.

     The text is scaled down (never up) so that it fits the image width
     minus a fixed margin, and the requested position is clamped so the
     text stays inside that margin.

     Args:
         image: Image array; only its first two dimensions (height, width)
             are inspected here.
         text: String to draw.
         x: Requested horizontal position of the text origin.
         y: Requested vertical position of the text origin.
         thickness: Stroke thickness of the white text; the black outline
             is drawn three times as thick.
         font: OpenCV font identifier.

     Returns:
         The image returned by ``cv2.putText``, or ``image`` unchanged when
         the text has no measurable width.
     """
     margin = 10
     height, width = image.shape[:2]
     size, _ = cv2.getTextSize(text, font, 1, 1)
     if size[0] <= 0:
         # Nothing measurable to draw (presumably an empty string); bail
         # out instead of dividing by a zero text width below.
         return image
     scale = min(1, (width - 2 * margin) / size[0])
     # np.clip returns NumPy scalars; convert to plain ints before handing
     # the coordinates to OpenCV.
     x = int(np.clip(x, margin, width - int(scale * size[0]) - margin))
     y = int(np.clip(y, margin + int(scale * size[1]), height - margin))
     black = (0, 0, 0)
     white = (255, 255, 255)
     # Thick black pass first, then the thinner white pass on top, which
     # leaves a dark outline around the glyphs.
     image = cv2.putText(image, text, (x, y), font, scale, black, 3 * thickness)
     image = cv2.putText(image, text, (x, y), font, scale, white, thickness)
     return image
 
 
 def fix_rect(args, x, y, w, h):
     """Adjust a rectangle to the output aspect ratio inside the working image.

     The rectangle is grown to ``args.output_aspect``, shrunk if it exceeds
     ``args.working_width`` x ``args.working_height``, re-centered on the
     original rectangle and finally shifted so it lies inside the working
     image.

     Returns:
         A ``(x, y, width, height)`` tuple for the adjusted rectangle.
     """
     aspect = args.output_aspect
     limit_w = args.working_width
     limit_h = args.working_height

     # Enlarge whichever side is too short for the target aspect ratio.
     grown_w = max(w, int(h * aspect))
     grown_h = max(h, int(w / aspect))

     # Scale down (never up) so the region fits the working image.
     shrink = min(1, limit_w / grown_w, limit_h / grown_h)
     new_w = int(grown_w * shrink)
     new_h = int(grown_h * shrink)

     def place(origin, old_extent, new_extent, limit):
         # Center the new extent on the old one, then push it back inside
         # the range [0, limit] along this axis.
         centered = origin - (new_extent - old_extent) // 2
         overflow = max(0, centered + new_extent - limit)
         return max(0, centered) - overflow

     new_x = place(x, w, new_w, limit_w)
     new_y = place(y, h, new_h, limit_h)
     return new_x, new_y, new_w, new_h
 
 
 def run(description, process, init=None, args_pre=None, args_post=None):
     """Run a frame-processing command-line tool around FFmpeg.

     Parses the command line, probes the input video, decodes it frame by
     frame through an FFmpeg pipe as RGB data, hands every selected frame
     (resized to the working size) to ``process`` and optionally previews
     the results and encodes them into the debug and output files.

     Args:
         description: Program description shown in the argparse help.
         process: Callable invoked as ``process(args, state, frame,
             frame_num)`` for every selected frame; it must return an
             ``(output_frame, debug_frame)`` pair.
         init: Optional callable invoked once as ``init(args)`` before
             processing starts; its return value is passed to ``process``
             as ``state`` (``None`` when no ``init`` is given).
         args_pre: Optional callable that receives the parser before the
             standard arguments are added.
         args_post: Optional callable that receives the parser after the
             standard arguments are added.

     The parsed ``args`` namespace is extended with probed and derived
     values (``input_*``, ``output_*``, ``working_*``, ``duration``,
     ``frame_count``, ``thickness``) before ``init`` and ``process`` see it.
     Invalid arguments terminate the program through ``error``.
     """
     # Parse arguments
     parser = argparse.ArgumentParser(description=description)
     if args_pre is not None:
         args_pre(parser)
     parser.add_argument(
         '--start-frame', metavar='FRAME', type=int,
         help="starting input frame to process (inclusive)")
     parser.add_argument(
         '--end-frame', metavar='FRAME', type=int,
         help="ending input frame to process (exclusive)")
     parser.add_argument(
         '--decimate', metavar="COUNT", type=int, default=1,
         help="only use every %(metavar)s input frame")
     parser.add_argument(
         '--width', type=int,
         help="width of output video")
     parser.add_argument(
         '--height', type=int,
         help="height of output video")
     parser.add_argument(
         '--codec',
         help="codec of output video"
     )
     parser.add_argument(
         '--pix-fmt',
         help="pixel format of output video")
     parser.add_argument(
         '--extra-args', metavar='ARGS',
         help="""
         extra arguments to pass to FFmpeg, as a JSON object (do not specify the
         leading dash for keys, use a null value for arguments that do not take
         a parameter)
         """)
     parser.add_argument(
         '--no-preview', action='store_true',
         help="do not show any previews")
     parser.add_argument(
         '--no-write', action='store_true',
         help="do not write any files")
     parser.add_argument(
         '--no-audio', action='store_true',
         help="do not include audio in output files")
     parser.add_argument(
         '--overwrite', action='store_true',
         help="overwrite output files")
     parser.add_argument(
         '--debug', metavar='DEBUG_FILE',
         help="produce debug output (leave empty to base filename on input)")
     parser.add_argument(
         '--output', metavar='OUTPUT_FILE',
         help="produce final output (leave empty to base filename on input)")
     parser.add_argument(
         'input', metavar='INPUT_FILE',
         help="input file")
     if args_post is not None:
         args_post(parser)
     args = parser.parse_args()

     # Check arguments.
     root, ext = os.path.splitext(os.path.basename(args.input))
     for kind in ("debug", "output"):
         # An explicitly empty value means "derive the name from the input".
         if getattr(args, kind) == "":
             setattr(args, kind, f"{root}_{kind}{ext}")
     for path in (args.output, args.debug):
         # The option may have been omitted entirely (None); only check
         # paths that were actually requested.
         if path and os.path.isfile(path) and not args.overwrite:
             error(f"File exists: '{path}', use --overwrite to overwrite.")
     if not os.path.isfile(args.input):
         error(f"File does not exist: '{args.input}'.")
     if args.decimate < 0:
         error("Decimate count must not be negative.")
     # A count of zero is treated as "no decimation" so the frame selection
     # below never divides by zero.
     args.decimate = args.decimate or 1

     # Probe input.
     probe = ffmpeg.probe(args.input, select_streams='v')
     if not probe['streams']:
         error(f"No video stream found in '{args.input}'.")
     video = probe['streams'][0]

     # Common parameters.
     args.duration = float(probe['format']['duration'])
     args.frame_count = int(video['nb_frames'])

     # Input parameters.
     args.input_codec = video['codec_name']
     args.input_pix_fmt = video['pix_fmt']
     args.input_width = video['width']
     args.input_height = video['height']
     args.input_aspect = args.input_width / args.input_height
     # r_frame_rate is a "numerator/denominator" string.
     rate_num, rate_den = map(int, video['r_frame_rate'].split("/"))
     args.input_fps = rate_num / rate_den

     # Output parameters.
     args.output_codec = args.codec or args.input_codec
     args.output_pix_fmt = args.pix_fmt or args.input_pix_fmt
     args.output_width = args.width or args.input_width
     args.output_height = args.height or int(
         args.output_width / args.input_aspect
     )
     args.output_aspect = args.output_width / args.output_height
     args.output_fps = args.input_fps / args.decimate

     # Working parameters: at most the input size, otherwise twice the
     # output size expressed in the input aspect ratio.
     args.working_width = min(
         args.input_width,
         2 * max(
             args.output_width,
             int(args.output_height * args.input_aspect)
         )
     )
     args.working_height = min(
         args.input_height,
         2 * max(
             args.output_height,
             int(args.output_width / args.input_aspect)
         )
     )
     # Round odd dimensions up to the next even value.
     args.working_width += args.working_width % 2
     args.working_height += args.working_height % 2
     args.thickness = max(1, int(args.working_width / 1000))

     # Fill in default arguments.
     if args.start_frame is None:
         args.start_frame = 0
     if args.end_frame is None:
         args.end_frame = args.frame_count
     if args.extra_args is None:
         args.extra_args = {}
     else:
         try:
             args.extra_args = json.loads(args.extra_args)
         except json.JSONDecodeError:
             error("Extra arguments is not valid JSON.")
         if not isinstance(args.extra_args, dict):
             error("Extra arguments is not a JSON object.")

     # Open files.
     debug_size = f'{args.working_width}x{args.working_height}'
     output_size = f'{args.output_width}x{args.output_height}'
     pipe_args = {
         'format': 'rawvideo',
         'pix_fmt': 'rgb24',
     }
     output_args = {
         'vcodec': args.output_codec,
         'pix_fmt': args.output_pix_fmt,
         'shortest': None,
         **args.extra_args,
     }
     audio_args = (
         [
             ffmpeg
             .input(args.input)
             .audio
             .filter(
                 'atrim',
                 start=f"{args.start_frame/args.input_fps}s",
                 end=f"{args.end_frame/args.input_fps}s",
             )
             # atrim does not rewrite timestamps, so reset them; otherwise
             # the trimmed audio keeps its original offset while the piped
             # video starts at zero.
             .filter('asetpts', 'PTS-STARTPTS')
         ]
         if not args.no_audio else []
     )
     input_stream = (
         ffmpeg
         .input(args.input)
         .output('pipe:', **pipe_args)
         .global_args('-loglevel', 'error')
         .run_async(pipe_stdout=True)
     )
     debug_stream = None
     output_stream = None
     if not args.no_write:
         if args.debug:
             debug_stream = (
                 ffmpeg
                 .input('pipe:', **pipe_args, s=debug_size, r=args.output_fps)
                 .output(
                     *audio_args, args.debug, **output_args, r=args.output_fps)
                 .global_args('-loglevel', 'error')
                 .run_async(pipe_stdin=True, overwrite_output=args.overwrite)
             )
         if args.output:
             output_stream = (
                 ffmpeg
                 .input('pipe:', **pipe_args, s=output_size, r=args.output_fps)
                 .output(
                     *audio_args, args.output, **output_args, r=args.output_fps)
                 .global_args('-loglevel', 'error')
                 .run_async(pipe_stdin=True, overwrite_output=args.overwrite)
             )

     # Set up signal handler: Ctrl-C only raises a flag so the loop can stop
     # between frames and the encoders can be closed cleanly.
     sigint = False

     def sigint_handler(signum, frame):
         nonlocal sigint
         sigint = True

     previous_sigint = signal.signal(signal.SIGINT, sigint_handler)

     # Call init.
     state = init(args) if init is not None else None

     # Loop invariants.
     frame_size = args.input_width * args.input_height * 3
     time_duration = datetime.timedelta(seconds=int(args.duration))
     # Command line shown in the debug overlay; built once from a copy so
     # that sys.argv itself is left untouched.
     command = list(sys.argv)
     if command:
         command[0] = os.path.basename(command[0])
     command_line = " ".join(map(shlex.quote, command))

     # Process.
     try:
         for frame_num in range(args.frame_count):
             # Check for end frame.
             if frame_num >= args.end_frame:
                 break

             # Gather and print info.
             time_now = datetime.timedelta(
                 seconds=int(frame_num / args.input_fps))
             elapsed_frames = f"{frame_num} / {args.frame_count-1}"
             elapsed_time = f"{time_now} / {time_duration}"
             sys.stdout.write(f"{elapsed_frames} ({elapsed_time})\r")

             # Read input.
             if sigint:
                 break
             input_bytes = input_stream.stdout.read(frame_size)
             if len(input_bytes) < frame_size:
                 # End of stream, or a truncated final frame that cannot be
                 # reshaped into a full image.
                 break
             input_frame = (
                 np
                 .frombuffer(input_bytes, np.uint8)
                 .reshape([args.input_height, args.input_width, 3])
             )

             # Check for start frame. Frames before it still have to be
             # read from the pipe, they are just not processed.
             if frame_num < args.start_frame:
                 continue

             # Check for decimate frame.
             if (frame_num - args.start_frame) % args.decimate != 0:
                 continue

             # Resize to working size.
             frame = resize(
                 input_frame,
                 args.working_width,
                 args.working_height,
                 PIL.Image.NEAREST,
             )

             # Call process.
             output_frame, debug_frame = process(args, state, frame, frame_num)

             # Show info: frame counter top-left, time top-right, command
             # line at the bottom (text() clamps positions into the image).
             if args.debug:
                 debug_frame = text(
                     debug_frame,
                     elapsed_frames,
                     0,
                     0,
                     args.thickness,
                 )
                 debug_frame = text(
                     debug_frame,
                     elapsed_time,
                     args.working_width,
                     0,
                     args.thickness,
                 )
                 debug_frame = text(
                     debug_frame,
                     command_line,
                     0,
                     args.working_height,
                     args.thickness,
                 )

             # Show preview windows.
             if not args.no_preview:
                 if args.debug:
                     cv2.imshow(
                         f"{args.debug}",
                         cv2.cvtColor(debug_frame, cv2.COLOR_RGB2BGR),
                     )
                 if args.output:
                     cv2.imshow(
                         f"{args.output}",
                         cv2.cvtColor(output_frame, cv2.COLOR_RGB2BGR),
                     )
                 # 'q' or Escape stops processing.
                 if cv2.waitKey(1) in (ord('q'), 27):
                     break

             # Write files.
             if not args.no_write:
                 if sigint:
                     break
                 try:
                     if debug_stream is not None:
                         debug_stream.stdin.write(
                             debug_frame
                             .astype(np.uint8)
                             .tobytes()
                         )
                     if output_stream is not None:
                         output_stream.stdin.write(
                             output_frame
                             .astype(np.uint8)
                             .tobytes()
                         )
                 except BrokenPipeError:
                     # FFmpeg has probably written some error message to stderr,
                     # so just break.
                     break
     except KeyboardInterrupt:
         pass
     finally:
         print("")
         # Close and wait.
         for writer in (debug_stream, output_stream):
             if writer is None:
                 continue
             try:
                 writer.stdin.close()
             except BrokenPipeError:
                 # The encoder already went away; there is nothing left to
                 # flush, but the remaining cleanup must still run.
                 pass
             writer.wait()
         input_stream.send_signal(signal.SIGINT)
         input_stream.communicate()
         # Hand SIGINT back to whoever owned it before this run. A previous
         # handler of None was not installed from Python and cannot be
         # restored through signal.signal.
         if previous_sigint is not None:
             signal.signal(signal.SIGINT, previous_sigint)