"""
Python library to ease processing of video frames.
"""
import sys
import os
import signal
import argparse
import json
import datetime
import shlex
import ffmpeg # ffmpeg-python
import numpy as np
import PIL.Image
import cv2
def error(message):
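    """Print an error message to stderr and exit with a non-zero status."""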
print(f"{sys.argv[0]}: Error: {message}", file=sys.stderr)
exit(1)
def resize(image, width, height, resample):
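    """Resize an RGB image (an H x W x 3 array) to (width, height) via PIL."""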
return np.array(
PIL.Image
.fromarray(image)
.resize((width, height), resample)
)
def text(image, text, x, y, thickness, font=cv2.FONT_HERSHEY_SIMPLEX):
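    """Draw white text with a black outline, clamped to fit inside the image."""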
margin = 10
height, width = image.shape[:2]
size, _ = cv2.getTextSize(text, font, 1, 1)
scale = min(1, (width - 2 * margin) / size[0])
x = np.clip(x, margin, width - int(scale * size[0]) - margin)
y = np.clip(y, margin + int(scale * size[1]), height - margin)
black = (0, 0, 0)
white = (255, 255, 255)
image = cv2.putText(image, text, (x, y), font, scale, black, 3 * thickness)
image = cv2.putText(image, text, (x, y), font, scale, white, thickness)
return image
def fix_rect(args, x, y, w, h):
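    """Grow (x, y, w, h) to the output aspect ratio and clamp it to the working image."""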
# Grow into correct aspect ratio.
nw = max(w, int(h * args.output_aspect))
nh = max(h, int(w / args.output_aspect))
# Optionally shrink into working image size.
scale = min(1, args.working_width / nw, args.working_height / nh)
nw = int(nw * scale)
nh = int(nh * scale)
# Center new region on old region.
nx = x - (nw - w) // 2
ny = y - (nh - h) // 2
# Push into working image.
nx = max(0, nx) - max(0, (nx+nw) - args.working_width)
ny = max(0, ny) - max(0, (ny+nh) - args.working_height)
# Return.
return nx, ny, nw, nh
def run(description, process, init=None, args_pre=None, args_post=None):
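    """
    Parse command-line arguments, decode the input with FFmpeg, and drive the
    per-frame processing loop.

    For every selected frame, process(args, state, frame, frame_num) receives
    an RGB frame at the working resolution and must return a tuple
    (output_frame, debug_frame), sized for the output and working resolutions
    respectively.  If given, init(args) is called once before the loop and its
    return value is passed to process() as state; args_pre(parser) and
    args_post(parser) can register extra command-line arguments before and
    after the built-in ones.
    """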
# Parse arguments
parser = argparse.ArgumentParser(description=description)
if args_pre is not None:
args_pre(parser)
parser.add_argument(
'--start-frame', metavar='FRAME', type=int,
help="starting input frame to process (inclusive)")
parser.add_argument(
'--end-frame', metavar='FRAME', type=int,
help="ending input frame to process (exclusive)")
parser.add_argument(
'--decimate', metavar="COUNT", type=int, default=1,
help="only use every %(metavar)s input frame")
parser.add_argument(
'--width', type=int,
help="width of output video")
parser.add_argument(
'--height', type=int,
help="height of output video")
parser.add_argument(
'--codec',
help="codec of output video"
)
parser.add_argument(
'--pix-fmt',
help="pixel format of output video")
parser.add_argument(
'--extra-args', metavar='ARGS',
help="""
extra arguments to pass to FFmpeg, as a JSON object (do not specify the
leading dash for keys, use a null value for arguments that do not take
a parameter)
""")
parser.add_argument(
'--no-preview', action='store_true',
help="do not show any previews")
parser.add_argument(
'--no-write', action='store_true',
help="do not write any files")
parser.add_argument(
'--no-audio', action='store_true',
help="do not include audio in output files")
parser.add_argument(
'--overwrite', action='store_true',
help="overwrite output files")
parser.add_argument(
'--debug', metavar='DEBUG_FILE',
help="produce debug output (leave empty to base filename on input)")
parser.add_argument(
'--output', metavar='OUTPUT_FILE',
help="produce final output (leave empty to base filename on input)")
parser.add_argument(
'input', metavar='INPUT_FILE',
help="input file")
if args_post is not None:
args_post(parser)
args = parser.parse_args()
# Check arguments.
root, ext = os.path.splitext(os.path.basename(args.input))
for file in ["debug", "output"]:
if getattr(args, file) == "":
setattr(args, file, f"{root}_{file}{ext}")
    if args.output and os.path.isfile(args.output) and not args.overwrite:
        error(f"File exists: '{args.output}', use --overwrite to overwrite.")
    if args.debug and os.path.isfile(args.debug) and not args.overwrite:
        error(f"File exists: '{args.debug}', use --overwrite to overwrite.")
if not os.path.isfile(args.input):
error(f"File does not exist: '{args.input}'.")
# Probe input.
probe = ffmpeg.probe(args.input, select_streams='v')
# Common parameters.
args.duration = float(probe['format']['duration'])
args.frame_count = int(probe['streams'][0]['nb_frames'])
# Input parameters.
args.input_codec = probe['streams'][0]['codec_name']
args.input_pix_fmt = probe['streams'][0]['pix_fmt']
args.input_width = probe['streams'][0]['width']
args.input_height = probe['streams'][0]['height']
args.input_aspect = args.input_width / args.input_height
    fps_num, fps_den = map(int, probe['streams'][0]['r_frame_rate'].split("/"))
    args.input_fps = fps_num / fps_den
# Output parameters.
args.output_codec = args.codec or args.input_codec
args.output_pix_fmt = args.pix_fmt or args.input_pix_fmt
args.output_width = args.width or args.input_width
args.output_height = args.height or int(
args.output_width / args.input_aspect
)
args.output_aspect = args.output_width / args.output_height
args.output_fps = args.input_fps / (args.decimate or 1)
# Working parameters.
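    # The working size is the input size capped at roughly twice the output
    # size (at the input aspect ratio), so per-frame processing stays cheap
    # without limiting output quality.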
args.working_width = min(
args.input_width,
2 * max(
args.output_width,
int(args.output_height * args.input_aspect)
)
)
args.working_height = min(
args.input_height,
2 * max(
args.output_height,
int(args.output_width / args.input_aspect)
)
)
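    # Round the working dimensions up to even values; chroma-subsampled pixel
    # formats such as yuv420p require even frame sizes.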
args.working_width += args.working_width % 2
args.working_height += args.working_height % 2
args.thickness = max(1, int(args.working_width / 1000))
# Fill in default arguments.
if args.start_frame is None:
args.start_frame = 0
if args.end_frame is None:
args.end_frame = args.frame_count
if args.extra_args is None:
args.extra_args = {}
else:
try:
args.extra_args = json.loads(args.extra_args)
        except json.decoder.JSONDecodeError:
            error("--extra-args is not valid JSON.")
        if not isinstance(args.extra_args, dict):
            error("--extra-args is not a JSON object.")
# Open files.
debug_size = f'{args.working_width}x{args.working_height}'
output_size = f'{args.output_width}x{args.output_height}'
pipe_args = {
'format': 'rawvideo',
'pix_fmt': 'rgb24',
}
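    # Frames are exchanged with FFmpeg as raw RGB24 bytes over pipes, so every
    # frame read or written below is exactly width * height * 3 bytes.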
output_args = {
'vcodec': args.output_codec,
'pix_fmt': args.output_pix_fmt,
'shortest': None,
**args.extra_args,
}
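    # A kwarg with a None value (here 'shortest') is passed to FFmpeg as a bare
    # flag, mirroring the --extra-args convention described above.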
audio_args = (
[
ffmpeg
.input(args.input)
.audio
# This works badly for some reason.
.filter(
'atrim',
start=f"{args.start_frame/args.input_fps}s",
end=f"{args.end_frame/args.input_fps}s",
)
]
if not args.no_audio else []
)
input_stream = (
ffmpeg
.input(args.input)
.output('pipe:', **pipe_args)
.global_args('-loglevel', 'error')
# .global_args('-stats')
.run_async(pipe_stdout=True)
)
if not args.no_write:
if args.debug:
debug_stream = (
ffmpeg
.input('pipe:', **pipe_args, s=debug_size, r=args.output_fps)
.output(
*audio_args, args.debug, **output_args, r=args.output_fps)
.global_args('-loglevel', 'error')
.run_async(pipe_stdin=True, overwrite_output=args.overwrite)
)
if args.output:
output_stream = (
ffmpeg
.input('pipe:', **pipe_args, s=output_size, r=args.output_fps)
.output(
*audio_args, args.output, **output_args, r=args.output_fps)
.global_args('-loglevel', 'error')
.run_async(pipe_stdin=True, overwrite_output=args.overwrite)
)
# Set up signal handler.
sigint = False
def sigint_handler(signum, frame):
nonlocal sigint
sigint = True
signal.signal(signal.SIGINT, sigint_handler)
# Call init.
if init is not None:
state = init(args)
else:
state = None
# Process.
try:
for frame_num in range(args.frame_count):
# Check for end frame.
if frame_num >= args.end_frame:
break
# Gather and print info.
time_now = datetime.timedelta(
seconds=int(frame_num / args.input_fps))
time_duration = datetime.timedelta(
seconds=int(args.duration))
elapsed_frames = f"{frame_num} / {args.frame_count-1}"
elapsed_time = f"{time_now} / {time_duration}"
            argv = list(sys.argv)
            if argv:
                argv[0] = os.path.basename(argv[0])
            argv = " ".join(map(shlex.quote, argv))
sys.stdout.write(f"{elapsed_frames} ({elapsed_time})\r")
# Read input.
if sigint:
break
            frame_size = args.input_width * args.input_height * 3
            input_bytes = input_stream.stdout.read(frame_size)
            if len(input_bytes) < frame_size:
                break
input_frame = (
np
.frombuffer(input_bytes, np.uint8)
.reshape([args.input_height, args.input_width, 3])
)
# Check for start frame.
if frame_num < args.start_frame:
continue
# Check for decimate frame.
if (frame_num - args.start_frame) % args.decimate != 0:
continue
# Resize to working size.
frame = resize(
input_frame,
args.working_width,
args.working_height,
PIL.Image.NEAREST,
)
# Call process.
output_frame, debug_frame = process(args, state, frame, frame_num)
# Show info.
if args.debug:
debug_frame = text(
debug_frame,
elapsed_frames,
0,
0,
args.thickness,
)
debug_frame = text(
debug_frame,
elapsed_time,
args.working_width,
0,
args.thickness,
)
debug_frame = text(
debug_frame,
argv,
0,
args.working_height,
args.thickness,
)
# Show preview windows.
if not args.no_preview:
if args.debug:
cv2.imshow(
f"{args.debug}",
cv2.cvtColor(debug_frame, cv2.COLOR_RGB2BGR),
)
if args.output:
cv2.imshow(
f"{args.output}",
cv2.cvtColor(output_frame, cv2.COLOR_RGB2BGR),
)
                if (cv2.waitKey(1) & 0xFF) in (ord('q'), 27):
break
# Write files.
if not args.no_write:
if sigint:
break
try:
if args.debug:
debug_stream.stdin.write(
debug_frame
.astype(np.uint8)
.tobytes()
)
if args.output:
output_stream.stdin.write(
output_frame
.astype(np.uint8)
.tobytes()
)
except BrokenPipeError:
# FFmpeg has probably written some error message to stderr,
# so just break.
break
except KeyboardInterrupt:
pass
finally:
print("")
# Close and wait.
if not args.no_write:
if args.debug:
debug_stream.stdin.close()
debug_stream.wait()
if args.output:
output_stream.stdin.close()
output_stream.wait()
input_stream.send_signal(signal.SIGINT)
input_stream.communicate()