113 lines
3.7 KiB
Python
113 lines
3.7 KiB
Python
import os
|
|
import re
|
|
import sys
|
|
import imagehash
|
|
from PIL import Image
|
|
import cv2
|
|
import yt_dlp
|
|
|
|
# Transcript filename -> YouTube URL for each talk to snapshot.
# The transcript files are expected to live directly in REFERENCES_DIR.
VIDEOS = {
    "Forth Day 2020 - Preview of x64 & ColorForth & SPIR V - Onat.txt": "https://youtu.be/ajZAECYdJvE",
    "Neokineogfx - 4th And Beyond - Transcript.txt": "https://youtu.be/Awkdt30Ruvk",
    "Silicon Valley Forth Interest Group - Metaprogramming VAMP in KYRA, a Next-gen Forth-like language --- Onat Türkçüoğlu -- 2025-04-26.txt": "https://youtu.be/J9U_5tjdegY"
}

# Root folder holding the transcripts; snapshots (videos + extracted frames)
# are written to a subdirectory beneath it.
REFERENCES_DIR = "C:/projects/forth/bootslop/references"
SNAPSHOTS_DIR = os.path.join(REFERENCES_DIR, "snapshots")

# Ensure the output directory exists before any processing starts.
os.makedirs(SNAPSHOTS_DIR, exist_ok=True)
|
|
|
|
def parse_timestamps(filepath):
    """Extract unique timestamps (as integer seconds) from a transcript file.

    Scans *filepath* line by line for lines consisting solely of a
    timestamp in ``MM:SS`` or ``H:MM:SS`` form and converts each to a
    number of seconds.

    Args:
        filepath: Path to a UTF-8 encoded transcript file.

    Returns:
        Sorted list of unique timestamps, in seconds.
    """
    # Whole-line timestamp: optional hours field, minutes, then exactly
    # two seconds digits. Compiled once instead of re-matching the raw
    # pattern on every line.
    timestamp_re = re.compile(r'^(\d+:)?\d+:\d{2}$')
    timestamps = set()  # collect into a set directly to dedup as we go
    with open(filepath, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            if timestamp_re.match(line):
                parts = list(map(int, line.split(':')))
                if len(parts) == 2:
                    seconds = parts[0] * 60 + parts[1]
                else:
                    seconds = parts[0] * 3600 + parts[1] * 60 + parts[2]
                timestamps.add(seconds)
    return sorted(timestamps)
|
|
|
|
def download_video(url, output_path):
    """Fetch the video at *url* and save it to *output_path* via yt-dlp."""
    options = {
        'format': 'best[ext=mp4]/best',  # prefer a single mp4 stream when available
        'outtmpl': output_path,
        'quiet': True,
        'no_warnings': True,
    }
    with yt_dlp.YoutubeDL(options) as downloader:
        downloader.download([url])
|
|
|
|
def extract_and_dedup_frames(video_path, timestamps, out_dir):
    """Grab one frame per timestamp from *video_path*, skipping near-duplicates.

    For each timestamp the video is seeked to that position, the frame is
    decoded, perceptually hashed, and saved as a JPEG into *out_dir* unless
    it is visually near-identical to a previously saved frame.

    Args:
        video_path: Path to a video file readable by OpenCV.
        timestamps: Iterable of timestamps in seconds.
        out_dir: Directory to write ``frame_####s.jpg`` files into
            (created if missing).
    """
    os.makedirs(out_dir, exist_ok=True)
    cap = cv2.VideoCapture(video_path)

    if not cap.isOpened():
        print(f"Failed to open video {video_path}")
        return

    # Fix: the original computed an `fps` value here that was never used.
    saved_hashes = []

    try:
        for t_sec in timestamps:
            # Seek by milliseconds to the requested position.
            cap.set(cv2.CAP_PROP_POS_MSEC, t_sec * 1000)
            ret, frame = cap.read()
            if not ret:
                continue

            # OpenCV decodes to BGR; Pillow expects RGB.
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            pil_img = Image.fromarray(frame_rgb)

            # Perceptual hash; Hamming distance < 5 means "visually the
            # same slide", so skip the frame as a duplicate.
            h = imagehash.phash(pil_img)
            if any(h - saved_h < 5 for saved_h in saved_hashes):
                continue

            saved_hashes.append(h)
            out_filename = os.path.join(out_dir, f"frame_{t_sec:04d}s.jpg")
            pil_img.save(out_filename, quality=90)
            print(f"Saved {out_filename}")
    finally:
        # Release the capture even if decoding/saving raises.
        cap.release()
|
|
|
|
def main():
    """Process every configured transcript: parse timestamps, download the
    matching video, extract deduplicated frames, then delete the video."""
    for transcript_file, url in VIDEOS.items():
        filepath = os.path.join(REFERENCES_DIR, transcript_file)
        if not os.path.exists(filepath):
            print(f"Transcript not found: {filepath}")
            continue

        print(f"Processing {transcript_file}...")
        timestamps = parse_timestamps(filepath)
        print(f"Found {len(timestamps)} timestamps.")

        # Derive per-video paths from the transcript's base name.
        stem = os.path.splitext(transcript_file)[0]
        video_path = os.path.join(SNAPSHOTS_DIR, f"{stem}.mp4")
        out_dir = os.path.join(SNAPSHOTS_DIR, stem)

        # Only download when a previous run's video isn't already present.
        if not os.path.exists(video_path):
            print(f"Downloading video from {url}...")
            download_video(url, video_path)

        print("Extracting frames...")
        extract_and_dedup_frames(video_path, timestamps, out_dir)

        # Remove the downloaded video afterwards to save disk space.
        if os.path.exists(video_path):
            os.remove(video_path)
|
|
|
|
# Script entry point: run the full transcript -> snapshot pipeline.
if __name__ == '__main__':
    main()
|