"""Grab de-duplicated video snapshots at the timestamps found in transcripts.

For each transcript listed in ``VIDEOS`` the script collects the timestamp
lines from the transcript, downloads the matching video, saves one JPEG per
timestamp (skipping frames that look like one already saved), and finally
deletes the downloaded video.
"""

import os
import re
import sys

import cv2
import imagehash
import yt_dlp
from PIL import Image

# Transcript file name (inside REFERENCES_DIR) -> URL of the matching video.
VIDEOS = {
    "Forth Day 2020 - Preview of x64 & ColorForth & SPIR V - Onat.txt": "https://youtu.be/ajZAECYdJvE",
    "Neokineogfx - 4th And Beyond - Transcript.txt": "https://youtu.be/Awkdt30Ruvk",
    "Silicon Valley Forth Interest Group - Metaprogramming VAMP in KYRA, a Next-gen Forth-like language --- Onat Türkçüoğlu -- 2025-04-26.txt": "https://youtu.be/J9U_5tjdegY"
}

REFERENCES_DIR = "C:/projects/forth/bootslop/references"
SNAPSHOTS_DIR = os.path.join(REFERENCES_DIR, "snapshots")

os.makedirs(SNAPSHOTS_DIR, exist_ok=True)

# A line consisting solely of MM:SS or H:MM:SS.
_TIMESTAMP_RE = re.compile(r'^(\d+:)?\d+:\d{2}$')


def parse_timestamps(filepath):
    """Return the sorted, unique timestamps (in seconds) found in a transcript.

    Only lines that, once stripped, consist entirely of ``MM:SS`` or
    ``H:MM:SS`` are treated as timestamps; every other line is ignored.

    Args:
        filepath: Path of the UTF-8 encoded transcript file.

    Returns:
        A list of integer second offsets in ascending order, without
        duplicates.
    """
    found = set()
    with open(filepath, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            if not _TIMESTAMP_RE.match(line):
                continue
            # Fold the colon-separated fields base-60, which covers both the
            # two-field (MM:SS) and three-field (H:MM:SS) forms.
            seconds = 0
            for field in line.split(':'):
                seconds = seconds * 60 + int(field)
            found.add(seconds)
    return sorted(found)


def download_video(url, output_path):
    """Download the video at ``url`` with yt-dlp, writing it to ``output_path``.

    An MP4 format is requested first, falling back to the best available
    format. yt-dlp runs with its regular output and warnings silenced.

    Args:
        url: Address of the video to download.
        output_path: Value passed to yt-dlp as its output template.
    """
    ydl_opts = {
        'format': 'best[ext=mp4]/best',
        'outtmpl': output_path,
        'quiet': True,
        'no_warnings': True,
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        ydl.download([url])


def extract_and_dedup_frames(video_path, timestamps, out_dir, *, max_hash_distance=5):
    """Save one JPEG per timestamp, skipping frames that repeat an earlier one.

    Each frame is compared by perceptual hash against every frame already
    saved during this call; it is written only when it is not a near
    duplicate of any of them.

    Args:
        video_path: Path of the video file to read.
        timestamps: Iterable of integer offsets, in seconds, to sample.
        out_dir: Directory that receives the JPEG files (created if missing).
        max_hash_distance: A frame whose hash distance to a saved frame is
            strictly below this value counts as a duplicate. Defaults to 5.
    """
    os.makedirs(out_dir, exist_ok=True)

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"Failed to open video {video_path}")
        return

    saved_hashes = []
    try:
        for t_sec in timestamps:
            # Seek to the requested offset (the property is in milliseconds).
            cap.set(cv2.CAP_PROP_POS_MSEC, t_sec * 1000)
            ret, frame = cap.read()
            if not ret:
                # Report the skip instead of dropping the timestamp silently.
                print(f"Could not read a frame at {t_sec}s, skipping")
                continue

            # OpenCV delivers BGR; Pillow expects RGB.
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            pil_img = Image.fromarray(frame_rgb)

            # Subtracting two hashes yields their Hamming distance.
            h = imagehash.phash(pil_img)
            if any(h - saved_h < max_hash_distance for saved_h in saved_hashes):
                continue

            saved_hashes.append(h)
            out_filename = os.path.join(out_dir, f"frame_{t_sec:04d}s.jpg")
            pil_img.save(out_filename, quality=90)
            print(f"Saved {out_filename}")
    finally:
        # Release the capture even when decoding or saving raises.
        cap.release()


def main():
    """Process every transcript in ``VIDEOS``: parse, download, extract, clean up."""
    for transcript_file, url in VIDEOS.items():
        filepath = os.path.join(REFERENCES_DIR, transcript_file)
        if not os.path.exists(filepath):
            print(f"Transcript not found: {filepath}")
            continue

        print(f"Processing {transcript_file}...")
        timestamps = parse_timestamps(filepath)
        print(f"Found {len(timestamps)} timestamps.")

        video_name = os.path.splitext(transcript_file)[0]
        video_path = os.path.join(SNAPSHOTS_DIR, f"{video_name}.mp4")
        out_dir = os.path.join(SNAPSHOTS_DIR, video_name)

        # Reuse a video file that is already present on disk.
        if not os.path.exists(video_path):
            print(f"Downloading video from {url}...")
            download_video(url, video_path)

        print("Extracting frames...")
        extract_and_dedup_frames(video_path, timestamps, out_dir)

        # Clean up video to save space
        if os.path.exists(video_path):
            os.remove(video_path)


if __name__ == '__main__':
    main()