1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
|
import argparse
import cv2
import numpy as np
from creedsolo import RSCodec
from raptorq import Encoder
# Command-line interface.
# NOTE: argparse %-formats every help string, so a literal percent sign must be
# written as "%%"; a bare "%" makes --help (and format_help) raise.
parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
# The input file is opened unconditionally further down, so it is mandatory;
# requiring it here turns a late TypeError from open(None) into a usage error.
parser.add_argument("-i", "--input", help="input file", required=True)
parser.add_argument("-o", "--output", help="output video file", default="vid.mkv")
# Note the flag letters: -x is the grid HEIGHT and -y is the grid WIDTH.
parser.add_argument("-x", "--height", help="grid height", default=100, type=int)
parser.add_argument("-y", "--width", help="grid width", default=100, type=int)
parser.add_argument("-l", "--level", help="error correction level", default=0.1, type=float)
parser.add_argument("-f", "--fps", help="frame rate", default=30, type=int)
parser.add_argument("-m", "--mix", help="mix frames with original video", action="store_true")
parser.add_argument(
    "-v", "--version",
    help="0: 10%% corners w/ two-sided one-cell padding; 1: 15%% corners w/ four-sided 25%% padding.",
    default=0, choices=[0, 1], type=int,
)
args = parser.parse_args()

# The level is the fraction of every 255-byte Reed-Solomon codeword spent on
# parity. A level of 1 divides by zero in the repair-packet count below, and
# values outside [0, 1) fail obscurely inside the codecs, so reject them here.
if not 0 <= args.level < 1:
    parser.error("--level must satisfy 0 <= level < 1")
# Parity symbols per 255-byte Reed-Solomon codeword.
ecc_symbols = int(args.level * 255)

# Side length (in grid cells) of the square corner markers.
if args.version == 0:
    cheight = cwidth = max(args.height // 10, args.width // 10)
elif args.version == 1:
    # # cell borders are 0.0375% of width/height
    # assert args.height * 3 % 80 == args.width * 3 % 80 == 0 # TODO: less strict better ratio
    # cheight = int(args.height * 0.15)
    # cwidth = int(args.width * 0.15)
    cheight = cwidth = int(max(args.height, args.width) * 0.16)
else:
    raise NotImplementedError

# Width of the data strip that sits between two corner markers.
midwidth = args.width - 2 * cwidth
# Number of grid cells that carry data (everything except the four corners).
frame_size = args.height * args.width - 4 * cheight * cwidth
# Divide by 8 / 3 for 3-bit color
frame_bytes = frame_size * 3 // 8
# Per-byte XOR mask 0, 1, ..., 255, 0, 1, ... applied to every frame; the
# wrap-around is spelled out instead of relying on uint8 overflow in arange.
frame_xor = (np.arange(frame_bytes) % 256).astype(np.uint8)
# reedsolo breaks message into 255-byte chunks
# raptorq can add up to 4 extra bytes
rs_bytes = frame_bytes - (frame_bytes + 254) // 255 * ecc_symbols - 4
if rs_bytes <= 0:
    parser.error("grid too small for the requested error correction level")

with open(args.input, "rb") as f:
    data = f.read()

rsc = RSCodec(ecc_symbols)
encoder = Encoder.with_defaults(data, rs_bytes)
# The argument is the number of repair packets requested on top of the source
# packets, scaled from the error correction level.
packets = encoder.get_encoded_packets(int(len(data) / rs_bytes * (1 / (1 - args.level) - 1)))
# Make corners
# Each marker is a (cheight, cwidth, 3) array of 0/1 values. mkframe places
# them as: white top-left, red top-right, green bottom-left, blue bottom-right.
if args.version == 0:
    ones = np.ones((cheight - 1, cwidth - 1))
    zeros = np.zeros((cheight - 1, cwidth - 1))
    # A solid colored square plus one blank row and one blank column; the
    # pad widths put the blank cells on the sides facing the frame interior.
    wcorner = np.pad(np.dstack((ones, ones, ones)), ((0, 1), (0, 1), (0, 0)))
    rcorner = np.pad(np.dstack((ones, zeros, zeros)), ((0, 1), (1, 0), (0, 0)))
    gcorner = np.pad(np.dstack((zeros, ones, zeros)), ((1, 0), (0, 1), (0, 0)))
    bcorner = np.pad(np.dstack((zeros, zeros, ones)), ((1, 0), (1, 0), (0, 0)))
elif args.version == 1:
    zeros = np.zeros((cheight, cwidth, 3))
    wcorner = zeros.copy()
    rcorner = zeros.copy()
    gcorner = zeros.copy()
    bcorner = zeros.copy()
    # A colored square surrounded on all four sides by a black border that is
    # a quarter of the marker size.
    black_border_h, black_border_w = cheight // 4, cwidth // 4
    for corner_arr, ones_channel_ind in [(wcorner, 0), (wcorner, 1), (wcorner, 2),
                                         (rcorner, 0), (gcorner, 1), (bcorner, 2)]:
        # Explicit end bounds and a broadcast scalar: a "-border" end bound
        # selects nothing when the border is 0, and a fixed-shape block of
        # ones only fits the slice when the marker size is a multiple of 4.
        corner_arr[
            black_border_h:cheight - black_border_h,
            black_border_w:cwidth - black_border_w,
            ones_channel_ind,
        ] = 1

# Output flags for decoder
print(f"-x {args.height} -y {args.width} -l {args.level} -s {len(data)} -p {len(packets[0])} -v {args.version}", end="")
def mkframe(packet):
    """Render one encoded packet as an RGB grid image.

    The packet is Reed-Solomon encoded, zero-padded to ``frame_bytes``, XORed
    with ``frame_xor``, interleaved across its 255-byte chunks, unpacked into
    bits and laid out three bits per cell around the four corner markers.

    Relies on the module-level values ``rsc``, ``frame_bytes``, ``frame_xor``,
    ``frame_size``, ``cheight``, ``midwidth``, ``args`` and the corner arrays.

    Args:
        packet: Bytes-like payload of one encoded packet.

    Returns:
        A uint8 array of shape (args.height, args.width, 3) whose values are
        0 or 255.
    """
    frame = np.array(rsc.encode(bytearray(packet)))
    frame = np.pad(frame, (0, frame_bytes - len(frame))) ^ frame_xor

    reshape_len = frame_bytes // 255 * 255
    # Space out elements in each size 255 chunk
    frame[:reshape_len] = np.ravel(frame[:reshape_len].reshape(reshape_len // 255, 255), "F")

    bits = np.unpackbits(frame)
    # frame_bytes is floor(frame_size * 3 / 8), so up to 7 bits can be missing
    # for a full grid. Pad to exactly frame_size * 3 bits; padding only to the
    # next multiple of 3 leaves the reshape below short for many grid sizes.
    bits = np.pad(bits, (0, frame_size * 3 - len(bits)))
    cells = np.reshape(bits, (frame_size, 3))

    # Number of data cells in the strip between two corner markers.
    strip = cheight * midwidth
    top = np.concatenate(
        (wcorner, cells[:strip].reshape((cheight, midwidth, 3)), rcorner),
        axis=1,
    )
    middle = cells[strip: frame_size - strip].reshape(
        (args.height - 2 * cheight, args.width, 3)
    )
    bottom = np.concatenate(
        (gcorner, cells[frame_size - strip:].reshape((cheight, midwidth, 3)), bcorner),
        axis=1,
    )
    grid = np.concatenate((top, middle, bottom))
    # Scale the 0/1 cell values to full-intensity 8-bit color.
    return grid.astype(np.uint8) * 255
if args.mix:
    # Mix frames with original video
    cap = cv2.VideoCapture(args.input)
    out = None
    try:
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        # Integer number of video pixels per grid cell along each axis.
        hscale = height // args.height
        wscale = width // args.width
        out = cv2.VideoWriter(args.output, cv2.VideoWriter_fourcc(*"FFV1"), args.fps, (width, height))

        # Pixel bounds of the corner markers; these do not change per frame.
        top_end = hscale * cheight
        bottom_start = hscale * (args.height - cheight)
        left_end = wscale * cwidth
        right_start = wscale * (args.width - cwidth)

        i = 0
        while cap.isOpened():
            ret, vidframe = cap.read()
            if not ret:
                break
            # Blank the four corner regions of the source frame so the mask
            # below never touches the corner markers of the data frame.
            vidframe[:top_end, :left_end] = 0
            vidframe[:top_end, right_start:] = 0
            vidframe[bottom_start:, :left_end] = 0
            vidframe[bottom_start:, right_start:] = 0
            # Upscale the grid so each cell covers hscale x wscale pixels.
            frame = np.repeat(np.repeat(mkframe(packets[i]), hscale, 0), wscale, 1)
            # Zero every data value whose source-video value is neither 0 nor
            # 255 (for 8-bit frames, x % 255 == 0 only for those two values).
            # NOTE(review): the mask needs frame and vidframe to have the same
            # shape, i.e. the video size must be an exact multiple of the
            # grid size - confirm against expected inputs.
            frame[vidframe % 255 != 0] = 0
            out.write(cv2.cvtColor(frame, cv2.COLOR_RGB2BGR))
            # Cycle through the packets for as long as the video lasts.
            i = (i + 1) % len(packets)
    finally:
        # Release the reader and flush/close the writer even if a frame fails.
        cap.release()
        if out is not None:
            out.release()
else:
    # Create a new video
    out = cv2.VideoWriter(args.output, cv2.VideoWriter_fourcc(*"FFV1"), args.fps, (args.width, args.height))
    try:
        for packet in packets:
            out.write(cv2.cvtColor(mkframe(packet), cv2.COLOR_RGB2BGR))
    finally:
        # Flush and close the output file.
        out.release()
|