import os
import re
import asyncio

import cv2
import numpy as np
import imagehash
import yt_dlp
from PIL import Image

from winsdk.windows.globalization import Language
from winsdk.windows.graphics.imaging import (
    BitmapAlphaMode,
    BitmapDecoder,
    BitmapPixelFormat,
    SoftwareBitmap,
)
from winsdk.windows.media.ocr import OcrEngine
from winsdk.windows.storage import StorageFile

VIDEOS = {
    "Forth Day 2020 - Preview of x64 & ColorForth & SPIR V - Onat.txt":
        "https://youtu.be/ajZAECYdJvE",
    "Neokineogfx - 4th And Beyond - Transcript.txt":
        "https://youtu.be/Awkdt30Ruvk",
    "Silicon Valley Forth Interest Group - Metaprogramming VAMP in KYRA, a Next-gen Forth-like language --- Onat Türkçüoğlu -- 2025-04-26.txt":
        "https://youtu.be/J9U_5tjdegY",
}

REFERENCES_DIR = "C:/projects/forth/bootslop/references"
OUT_DIR = os.path.join(REFERENCES_DIR, "processed_visuals")
os.makedirs(OUT_DIR, exist_ok=True)


def parse_timestamps(filepath):
    """Collect 'M:SS' / 'H:MM:SS' lines from a transcript file as seconds."""
    timestamps = []
    with open(filepath, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            if re.match(r'^(\d+:)?\d+:\d{2}$', line):
                parts = list(map(int, line.split(':')))
                if len(parts) == 2:
                    seconds = parts[0] * 60 + parts[1]
                else:
                    seconds = parts[0] * 3600 + parts[1] * 60 + parts[2]
                timestamps.append(seconds)
    return sorted(set(timestamps))


def download_highres(url, output_path):
    ydl_opts = {
        # Video-only mp4 is fine here: we only extract frames, never audio.
        'format': 'bestvideo[ext=mp4]/best',
        'outtmpl': output_path,
        'quiet': True,
        'no_warnings': True,
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        ydl.download([url])


async def ocr_image(img_cv2):
    # The WinRT OCR engine wants a SoftwareBitmap; round-trip the frame
    # through a temp PNG on disk, which is the simplest interop path.
    temp_path = os.path.join(OUT_DIR, "temp_ocr.png")
    cv2.imwrite(temp_path, img_cv2)
    file = await StorageFile.get_file_from_path_async(os.path.abspath(temp_path))
    stream = await file.open_read_async()
    decoder = await BitmapDecoder.create_async(stream)
    bitmap = await decoder.get_software_bitmap_async()
    # The OCR engine only accepts certain pixel formats; normalize to BGRA8.
    bitmap = SoftwareBitmap.convert(
        bitmap, BitmapPixelFormat.BGRA8, BitmapAlphaMode.PREMULTIPLIED)
    engine = OcrEngine.try_create_from_language(Language("en-US"))
    if not engine:
        return None
    return await engine.recognize_async(bitmap)


def get_word_color(img_hsv, rect):
    """Classify the dominant hue inside an OCR word's bounding box."""
    x, y, w, h = int(rect.x), int(rect.y), int(rect.width), int(rect.height)
    roi = img_hsv[y:y+h, x:x+w]
    if roi.size == 0:
        return "WHITE"
    # Mask out background (low saturation/value)
    mask = (roi[:, :, 1] > 60) & (roi[:, :, 2] > 60)
    if np.sum(mask) < 5:
        # Too few saturated pixels: classify by brightness instead.
        if np.mean(roi[:, :, 2]) > 128:
            return "WHITE"
        return "DIM"
    med_h = np.median(roi[:, :, 0][mask])
    # OpenCV hue runs 0-179; red wraps around both ends of the range.
    if med_h < 12 or med_h > 165:
        return "RED"
    elif 12 <= med_h < 25:
        return "ORANGE"
    elif 25 <= med_h < 40:
        return "YELLOW"
    elif 40 <= med_h < 85:
        return "GREEN"
    elif 85 <= med_h < 130:
        return "CYAN"
    elif 130 <= med_h < 150:
        return "BLUE"
    elif 150 <= med_h <= 165:
        return "MAGENTA"
    return "UNKNOWN"


def detect_faces(gray_img):
    cascade_path = cv2.data.haarcascades + 'haarcascade_frontalface_default.xml'
    if not os.path.exists(cascade_path):
        return []
    face_cascade = cv2.CascadeClassifier(cascade_path)
    return face_cascade.detectMultiScale(gray_img, 1.3, 5)
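
# A minimal, hand-run sanity check for parse_timestamps (an illustrative
# sketch, not part of the pipeline; the sample lines below are hypothetical,
# not taken from the real transcripts). Both "M:SS" and "H:MM:SS" forms
# should parse, duplicates should collapse, and the result should be sorted.
def _selftest_parse_timestamps():
    import tempfile
    with tempfile.NamedTemporaryFile('w', suffix='.txt', delete=False,
                                     encoding='utf-8') as tmp:
        tmp.write("intro text\n1:23\n1:02:03\n1:23\n")
        path = tmp.name
    try:
        # 1:23 -> 83 s, 1:02:03 -> 3723 s; the duplicate 1:23 collapses.
        assert parse_timestamps(path) == [83, 3723]
    finally:
        os.remove(path)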
async def process_video(video_path, timestamps, video_name):
    out_dir = os.path.join(OUT_DIR, video_name)
    os.makedirs(out_dir, exist_ok=True)
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"Failed to open {video_path}")
        return

    saved_hashes = []
    markdown_lines = [f"# OCR and Visual Log for {video_name}\n"]
    for t_sec in timestamps:
        cap.set(cv2.CAP_PROP_POS_MSEC, t_sec * 1000)
        ret, frame = cap.read()
        if not ret:
            continue

        # Perceptual hash to skip frames that duplicate one already saved.
        frame_hash = imagehash.phash(
            Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)))
        if any(frame_hash - saved < 5 for saved in saved_hashes):
            continue
        saved_hashes.append(frame_hash)

        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
        faces = detect_faces(gray)
        total_area = frame.shape[0] * frame.shape[1]
        face_area = sum(w * h for (x, y, w, h) in faces)

        ocr_result = await ocr_image(frame)
        words = []
        text_area = 0
        lines_output = []
        if ocr_result and ocr_result.lines:
            for line in ocr_result.lines:
                line_parts = []
                for word in line.words:
                    words.append(word)
                    text_area += word.bounding_rect.width * word.bounding_rect.height
                    color = get_word_color(hsv, word.bounding_rect)
                    line_parts.append(f"<{color}>{word.text}")
                lines_output.append(" ".join(line_parts))

        # Heuristic to discard useless frames (mostly face, no code)
        if face_area > total_area * 0.05 and text_area < total_area * 0.01:
            continue  # Skip this frame

        markdown_lines.append(f"## Time {t_sec}s")
        if lines_output:
            markdown_lines.append("\n".join(lines_output))

        # Crop the code block around the union of all OCR word boxes.
        pad = 30
        text_bbox = None
        if words:
            min_x = int(min(w.bounding_rect.x for w in words))
            min_y = int(min(w.bounding_rect.y for w in words))
            max_x = int(max(w.bounding_rect.x + w.bounding_rect.width for w in words))
            max_y = int(max(w.bounding_rect.y + w.bounding_rect.height for w in words))
            min_x = max(0, min_x - pad)
            min_y = max(0, min_y - pad)
            max_x = min(frame.shape[1], max_x + pad)
            max_y = min(frame.shape[0], max_y + pad)
            text_bbox = (min_x, min_y, max_x, max_y)
            code_crop = frame[min_y:max_y, min_x:max_x]
            code_path = os.path.join(out_dir, f"code_{t_sec:04d}s.jpg")
            cv2.imwrite(code_path, code_crop)
            markdown_lines.append(f"\n*Saved code image: {code_path}*")

        # Find non-text visual content (e.g. diagrams) outside the text area.
        # A very simple heuristic: a large region of strong edges that is
        # neither text nor face.
        edges = cv2.Canny(gray, 50, 150)
        if text_bbox:
            # Mask out the text area (only when OCR actually found words;
            # otherwise there is no box to mask).
            tx0, ty0, tx1, ty1 = text_bbox
            edges[ty0:ty1, tx0:tx1] = 0
        # Mask out faces
        for (fx, fy, fw, fh) in faces:
            edges[fy:fy+fh, fx:fx+fw] = 0
        non_text_pixels = cv2.countNonZero(edges)
        if non_text_pixels > 5000:
            # There is significant visual structure outside the text and
            # faces. Find its bounding box and save it as a separate crop.
            pts = cv2.findNonZero(edges)
            if pts is not None:
                bx, by, bw, bh = cv2.boundingRect(pts)
                if bw > 100 and bh > 100:  # large enough to be meaningful
                    visual_crop = frame[
                        max(0, by - pad):min(frame.shape[0], by + bh + pad),
                        max(0, bx - pad):min(frame.shape[1], bx + bw + pad)]
                    visual_path = os.path.join(out_dir, f"visual_{t_sec:04d}s.jpg")
                    cv2.imwrite(visual_path, visual_crop)
                    markdown_lines.append(f"\n*Saved non-text visual: {visual_path}*")

        markdown_lines.append("\n---\n")

    cap.release()
    with open(os.path.join(OUT_DIR, f"{video_name}_ocr.md"), "w", encoding="utf-8") as f:
        f.write("\n".join(markdown_lines))


async def main():
    for transcript_file, url in VIDEOS.items():
        filepath = os.path.join(REFERENCES_DIR, transcript_file)
        if not os.path.exists(filepath):
            continue
        print(f"Processing {transcript_file}...")
        timestamps = parse_timestamps(filepath)
        video_name = os.path.splitext(transcript_file)[0]
        video_path = os.path.join(OUT_DIR, f"{video_name}.mp4")
        if not os.path.exists(video_path):
            print("Downloading high-res video...")
            download_highres(url, video_path)
        print("Extracting, OCRing, and semantically tagging frames...")
        await process_video(video_path, timestamps, video_name)


if __name__ == '__main__':
    asyncio.run(main())
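
# Hand-run sanity check for the hue buckets in get_word_color (an
# illustrative sketch; the BGR swatches below are hypothetical test colors,
# not frame data). Pure green sits at OpenCV hue ~60 (the GREEN band) and
# pure red at hue 0 (the RED band, which wraps around 0/179). Useful to
# re-run from a REPL when tweaking the thresholds.
def _selftest_get_word_color():
    class _Rect:
        # Stand-in for a winsdk OCR bounding rect covering the whole swatch.
        x, y, width, height = 0, 0, 4, 4
    for bgr, expected in [((0, 255, 0), "GREEN"), ((0, 0, 255), "RED")]:
        swatch = np.full((4, 4, 3), bgr, dtype=np.uint8)
        hsv = cv2.cvtColor(swatch, cv2.COLOR_BGR2HSV)
        assert get_word_color(hsv, _Rect()) == expected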