forth_bootslop/process_visuals.py

import os
import re
import asyncio

import cv2
import numpy as np
import imagehash
import yt_dlp
from PIL import Image

from winsdk.windows.media.ocr import OcrEngine
from winsdk.windows.globalization import Language
from winsdk.windows.graphics.imaging import BitmapDecoder
from winsdk.windows.storage import StorageFile
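
# Third-party dependencies (assumed PyPI package names, as a best guess):
#   pip install yt-dlp opencv-python numpy ImageHash Pillow winsdk
# winsdk wraps the Windows Runtime, so the OCR path only works on Windows.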

VIDEOS = {
    "Forth Day 2020 - Preview of x64 & ColorForth & SPIR V - Onat.txt": "https://youtu.be/ajZAECYdJvE",
    "Neokineogfx - 4th And Beyond - Transcript.txt": "https://youtu.be/Awkdt30Ruvk",
    "Silicon Valley Forth Interest Group - Metaprogramming VAMP in KYRA, a Next-gen Forth-like language --- Onat Türkçüoğlu -- 2025-04-26.txt": "https://youtu.be/J9U_5tjdegY",
}

REFERENCES_DIR = "C:/projects/forth/bootslop/references"
OUT_DIR = os.path.join(REFERENCES_DIR, "processed_visuals")
os.makedirs(OUT_DIR, exist_ok=True)

def parse_timestamps(filepath):
    """Collect unique timestamps (as seconds) from lines that look like M:SS or H:MM:SS."""
    timestamps = []
    with open(filepath, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            if re.match(r'^(\d+:)?\d+:\d{2}$', line):
                parts = list(map(int, line.split(':')))
                if len(parts) == 2:
                    seconds = parts[0] * 60 + parts[1]
                else:
                    seconds = parts[0] * 3600 + parts[1] * 60 + parts[2]
                timestamps.append(seconds)
    return sorted(set(timestamps))
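# Example: "3:07" parses to 187 s and "1:02:03" to 3723 s; lines that are not a
# bare timestamp are ignored.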

def download_highres(url, output_path):
    ydl_opts = {
        'format': 'bestvideo[ext=mp4]/best',
        'outtmpl': output_path,
        'quiet': True,
        'no_warnings': True,
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        ydl.download([url])
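# Note: 'bestvideo[ext=mp4]/best' prefers a video-only MP4 stream (no audio
# track), falling back to the best muxed format; that is fine here because only
# still frames are sampled from the file.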

async def ocr_image(img_cv2):
    # Round-trip through a temporary PNG so the WinRT bitmap decoder can load the frame
    temp_path = os.path.join(OUT_DIR, "temp_ocr.png")
    cv2.imwrite(temp_path, img_cv2)
    file = await StorageFile.get_file_from_path_async(os.path.abspath(temp_path))
    stream = await file.open_read_async()
    decoder = await BitmapDecoder.create_async(stream)
    bitmap = await decoder.get_software_bitmap_async()
    engine = OcrEngine.try_create_from_language(Language("en-US"))
    if not engine:
        # The en-US OCR language pack is not installed
        return None
    return await engine.recognize_async(bitmap)
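# Caveat worth verifying: the Windows OCR engine rejects images larger than
# OcrEngine.max_image_dimension on either axis, so 4K frames may need to be
# downscaled before recognition.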

def get_word_color(img_hsv, rect):
    x, y, w, h = int(rect.x), int(rect.y), int(rect.width), int(rect.height)
    roi = img_hsv[y:y+h, x:x+w]
    if roi.size == 0:
        return "WHITE"
    # Mask out background (low saturation/value)
    mask = (roi[:, :, 1] > 60) & (roi[:, :, 2] > 60)
    if np.sum(mask) < 5:
        # Too few saturated pixels; classify by brightness alone
        if np.mean(roi[:, :, 2]) > 128:
            return "WHITE"
        return "DIM"
    avg_h = np.median(roi[:, :, 0][mask])
    # OpenCV hue is 0-179
    if avg_h < 12 or avg_h > 165:
        return "RED"
    elif 12 <= avg_h < 25:
        return "ORANGE"
    elif 25 <= avg_h < 40:
        return "YELLOW"
    elif 40 <= avg_h < 85:
        return "GREEN"
    elif 85 <= avg_h < 130:
        return "CYAN"
    elif 130 <= avg_h < 150:
        return "BLUE"
    elif 150 <= avg_h <= 165:
        return "MAGENTA"
    return "UNKNOWN"
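# OpenCV stores hue as degrees/2 (0-179), so e.g. pure green at 120 degrees
# lands at hue 60, inside the 40-85 GREEN band above.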

def detect_faces(gray_img):
    cascade_path = cv2.data.haarcascades + 'haarcascade_frontalface_default.xml'
    if not os.path.exists(cascade_path):
        return []
    face_cascade = cv2.CascadeClassifier(cascade_path)
    # scaleFactor=1.3, minNeighbors=5: coarse but fast detection settings
    return face_cascade.detectMultiScale(gray_img, 1.3, 5)
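# This cascade only catches frontal faces; a presenter seen in profile slips
# through, which just means fewer frames get skipped by the face/text heuristic
# in process_video below.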

async def process_video(video_path, timestamps, video_name):
    out_dir = os.path.join(OUT_DIR, video_name)
    os.makedirs(out_dir, exist_ok=True)
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"Failed to open {video_path}")
        return
    saved_hashes = []
    markdown_lines = [f"# OCR and Visual Log for {video_name}\n"]
    for t_sec in timestamps:
        cap.set(cv2.CAP_PROP_POS_MSEC, t_sec * 1000)
        ret, frame = cap.read()
        if not ret:
            continue
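        # Perceptual-hash dedup: imagehash's subtraction is the Hamming distance
        # between two 64-bit pHashes, so "< 5" means at most 4 differing bits,
        # i.e. the same slide is most likely still on screen.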
        frame_hash = imagehash.phash(Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)))
        is_duplicate = any(frame_hash - saved_h < 5 for saved_h in saved_hashes)
        if is_duplicate:
            continue
        saved_hashes.append(frame_hash)
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
        faces = detect_faces(gray)
        total_area = frame.shape[0] * frame.shape[1]
        face_area = sum(fw * fh for (fx, fy, fw, fh) in faces)
        ocr_result = await ocr_image(frame)
        words = []
        text_area = 0
        lines_output = []
        if ocr_result and ocr_result.lines:
            for line in ocr_result.lines:
                line_str = []
                for word in line.words:
                    words.append(word)
                    text_area += word.bounding_rect.width * word.bounding_rect.height
                    color = get_word_color(hsv, word.bounding_rect)
                    line_str.append(f"<{color}>{word.text}</{color}>")
                lines_output.append(" ".join(line_str))
        # Heuristic to discard useless frames: mostly face (>5% of the frame)
        # with almost no text (<1%)
        if face_area > total_area * 0.05 and text_area < total_area * 0.01:
            continue
        markdown_lines.append(f"## Time {t_sec}s")
        if lines_output:
            markdown_lines.append("\n".join(lines_output))
        # Crop the block of recognized text; pad is defined outside the branch
        # because the visual-content pass below also uses it
        pad = 30
        if words:
            min_x = int(min(w.bounding_rect.x for w in words))
            min_y = int(min(w.bounding_rect.y for w in words))
            max_x = int(max(w.bounding_rect.x + w.bounding_rect.width for w in words))
            max_y = int(max(w.bounding_rect.y + w.bounding_rect.height for w in words))
            min_x = max(0, min_x - pad)
            min_y = max(0, min_y - pad)
            max_x = min(frame.shape[1], max_x + pad)
            max_y = min(frame.shape[0], max_y + pad)
            code_crop = frame[min_y:max_y, min_x:max_x]
            code_path = os.path.join(out_dir, f"code_{t_sec:04d}s.jpg")
            Image.fromarray(cv2.cvtColor(code_crop, cv2.COLOR_BGR2RGB)).save(code_path)
            markdown_lines.append(f"\n*Saved code image: {code_path}*")
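        # Note: the crop above is the bounding box of *all* OCR'd words, so a
        # stray caption in a corner will stretch it to cover most of the frame.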
        # Find non-text visual content (e.g. diagrams) outside the text area.
        # A very simple heuristic: a large patch of strong Canny edges that is
        # neither recognized text nor a detected face.
        edges = cv2.Canny(gray, 50, 150)
        if words:
            # Mask out the text area (these bounds only exist when OCR found words)
            edges[min_y:max_y, min_x:max_x] = 0
        # Mask out faces
        for (fx, fy, fw, fh) in faces:
            edges[fy:fy+fh, fx:fx+fw] = 0
        non_text_pixels = cv2.countNonZero(edges)
        if non_text_pixels > 5000:
            # Significant visual structure outside the text and faces;
            # find its bounding box
            pts = cv2.findNonZero(edges)
            if pts is not None:
                bx, by, bw, bh = cv2.boundingRect(pts)
                if bw > 100 and bh > 100:  # large enough to be worth saving
                    visual_crop = frame[max(0, by-pad):min(frame.shape[0], by+bh+pad),
                                        max(0, bx-pad):min(frame.shape[1], bx+bw+pad)]
                    visual_path = os.path.join(out_dir, f"visual_{t_sec:04d}s.jpg")
                    Image.fromarray(cv2.cvtColor(visual_crop, cv2.COLOR_BGR2RGB)).save(visual_path)
                    markdown_lines.append(f"\n*Saved non-text visual: {visual_path}*")
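        # Limitation: boundingRect over every remaining edge pixel yields one
        # box, so two separate diagrams on a slide are merged into a single crop.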
        markdown_lines.append("\n---\n")
    cap.release()
    with open(os.path.join(OUT_DIR, f"{video_name}_ocr.md"), "w", encoding="utf-8") as f:
        f.write("\n".join(markdown_lines))

async def main():
    for transcript_file, url in VIDEOS.items():
        filepath = os.path.join(REFERENCES_DIR, transcript_file)
        if not os.path.exists(filepath):
            continue
        print(f"Processing {transcript_file}...")
        timestamps = parse_timestamps(filepath)
        video_name = os.path.splitext(transcript_file)[0]
        video_path = os.path.join(OUT_DIR, f"{video_name}.mp4")
        if not os.path.exists(video_path):
            print("Downloading high-res video...")
            download_highres(url, video_path)
        print("Extracting, OCRing, and semantically tagging frames...")
        await process_video(video_path, timestamps, video_name)

if __name__ == '__main__':
    asyncio.run(main())