# manga-translator/manga-translator.py
# Author: Guillem Hernandez Sola — "Added helper for bubbles" (2026-04-23)
# NOTE: the original paste carried repository-viewer chrome (Raw/Blame/History,
# size banner, ambiguous-Unicode warning); preserved here as a comment so the
# file remains valid Python.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import re
import json
import cv2
import numpy as np
import warnings
from typing import List, Tuple, Dict, Any, Optional
from deep_translator import GoogleTranslator
# macOS Native Vision imports
import Vision
import Quartz
from Foundation import NSData
warnings.filterwarnings("ignore", category=UserWarning)
# ============================================================
# CONFIG
# ============================================================
TOP_BAND_RATIO = 0.08
# ============================================================
# REGION-FIRST LAYOUT HELPERS
# ============================================================
import math
from difflib import SequenceMatcher
# ============================================================
# FIX: COMMON SHORT ENGLISH WORDS (1-2 chars)
# ============================================================
# Single-character English words that are real words, not OCR noise.
SHORT_ENGLISH_WORDS_1 = {
    "A", "I",
}
# Two-character English words protected from noise filtering.
# NOTE(review): "BUT" (3 chars) and "I" (1 char, duplicated from the set
# above) do not match this set's stated length — harmless, because both
# sets are only consumed via their union (SHORT_ENGLISH_PROTECTED).
SHORT_ENGLISH_WORDS_2 = {
    "AM", "AN", "AS", "AT", "BE", "BY", "DO", "GO", "HE", "IF",
    "IN", "IS", "IT", "ME", "MY", "NO", "OF", "OH", "OK", "ON",
    "OR", "SO", "TO", "UP", "US", "WE", "BUT", "I"
}
# ── Manga bold font substitution table ───────────────────────
# Regex → replacement fixes for systematic OCR misreads of bold manga
# lettering.  Applied in insertion order by fix_common_ocr_errors().
# Dead/no-op entries removed: 'STUCK'→'STUCK', 'HELPLESS'→'HELPLESS' and
# 'WELL'→'WELL' replaced text with itself; r'\bANP,\b' could never fire
# because r'\bANP\b' runs first and already rewrites "ANP," → "AND,".
BOLD_FONT_WORD_FIXES = {
    # D → P misread
    r'\bANP\b': 'AND',          # the \b before the comma also covers "ANP,"
    r'\bBEHINP\b': 'BEHIND',
    r'\bHIPING\b': 'HIDING',
    r'\bHIPINO\b': 'HIDING',
    r'\bPON\'T\b': "DON'T",
    r'\bPEATH\b': 'DEATH',
    r'\bPEATHI\b': 'DEATH!',    # trailing I is a misread '!'; \bPEATH\b cannot match inside PEATHI
    r'\bCRUSHEP\b': 'CRUSHED',
    r'\bHANP\b': 'HAND',
    r'\bHANPI\b': 'HAND!',
    # E → F misread
    r'\bLIKF\b': 'LIKE',
    r'\bTHF\b': 'THE',
    r'\bWF\b': 'WE',
    # S → 5/59 misread
    r'\bHELPLE59\b': 'HELPLESS',
    # V → W/U misread
    r'\bWVLL\b': "WE'LL",
    r'\bWVL\b': "WE'LL",
    # G → O misread (NG endings)
    r'\bTOUCHINO\b': 'TOUCHING',
    r'\bHIDINO\b': 'HIDING',
    r'\bGOINO\b': 'GOING',
    # Missing space between words
    r'\bIFWE\b': 'IF WE',
    # I → ! misread at end of exclamation.
    # NOTE: aggressive — rewrites ANY word-final I preceded by a capital
    # letter (e.g. "HI" → "H!"); kept for behavioural compatibility.
    r'([A-Z])I\b': r'\1!',
}
# Combined protected set used by is_meaningful_text()
SHORT_ENGLISH_PROTECTED = SHORT_ENGLISH_WORDS_1 | SHORT_ENGLISH_WORDS_2
# Function words typical of speech; presence vetoes SFX classification
# and feeds stopword_ratio().
DIALOGUE_STOPWORDS = {
    "I", "YOU", "HE", "SHE", "WE", "THEY", "IT", "ME", "MY", "YOUR", "OUR",
    "IS", "ARE", "WAS", "WERE", "AM", "DO", "DID", "DON'T", "DIDN'T", "NOT",
    "WHAT", "WHY", "HOW", "WHO", "IN", "ON", "AT", "TO", "OF", "FOR", "WITH",
    "AND", "BUT", "SO", "THAT", "THIS", "THERE", "HERE", "THAN", "ALL", "RIGHT"
}
# Short tokens (interjections, character names, isolated words) that must
# never be discarded as OCR noise; see is_protected_token() and
# maybe_conf_floor_for_protected().
PROTECTED_SHORT_TOKENS = {
    "HUH", "HUH?", "HUH??", "HUH?!",
    "OH", "OH!", "OOH", "OOH!",
    "AH", "AH!", "UH", "UH...",
    "HEY", "HEY!", "EH", "EH?",
    "WOW", "WOW!",
    "MORNING", "MORNING.",
    "BECKY", "BECKY!",
    "DAMIAN", "CECILE", "WALD",
    "OMIGOSH", "EEEP", "EEEEP",
    "GOOD", "WELL", "YEAH", "OKAY", "SURE",
    "WAIT", "STOP", "LOOK", "COME", "BACK",
    "HERE", "OVER", "JUST", "EVEN", "ONLY",
    "ALSO", "THEN", "WHEN", "WHAT", "THAT",
    "THIS", "WITH", "FROM", "HAVE", "WILL",
}
# ── Single definition of _MANGA_INTERJECTIONS ─────────────────
# FIX Issue 3: only ONE definition — expanded version kept,
# duplicate removed.
# Tokens accepted verbatim by is_meaningful_text() even when very short.
_MANGA_INTERJECTIONS = {
    'HUH', 'HUH?', 'HUH??', 'HUH?!',
    'OH', 'OH!', 'OOH', 'OOH!',
    'AH', 'AH!', 'UH', 'UH...',
    'HEY', 'HEY!',
    'EH', 'EH?',
    'WOW', 'WOW!',
    'YES', 'NO', 'NO!',
    'RUN', 'GO', 'GO!',
    'STOP', 'WAIT',
    'WHAT', 'WHAT?', 'WHAT?!',
    'WHY', 'WHY?',
    'HOW', 'HOW?',
    'OK', 'OK!', 'OKAY',
    'EEEEP', 'EEEP',
    'OMIGOSH',
    'BECKY', 'BECKY!',
    'HMM', 'HMM...',
    'TSK', 'TCH',
    'GRRR', 'I', 'A',
    'FWUP', 'FWAP',
    'SHIVER',
    'RRRING',
    'MORNING', 'MORNING.',
    # Sentence starters and conjunctions
    'BUT', 'AND', 'SO', 'OR', 'IF', 'AS',
    'YET', 'NOR', 'FOR',
    # Common short dialogue words
    'GET', 'GOT', 'NOT', 'NOW', 'TOO',
    'YOU', 'HIM', 'HER', 'ITS', 'OUR',
    'CAN', 'DID', 'HAS', 'HAD', 'LET',
    'SAY', 'SEE', 'TRY', 'USE',
    'ALL', 'ANY', 'ONE', 'OWN', 'NEW',
    'OLD', 'BIG', 'BAD', 'ODD',
    # Short words that appear isolated on their own OCR line
    'GOOD', 'WELL', 'YEAH', 'OKAY', 'SURE',
    'WAIT', 'STOP', 'LOOK', 'COME', 'BACK',
    'HERE', 'OVER', 'JUST', 'EVEN', 'ONLY',
    'ALSO', 'THEN', 'WHEN', 'THAT',
    'WITH', 'FROM', 'HAVE', 'WILL',
    'TRUE', 'REAL', 'FINE', 'DONE', 'GONE',
    'HELP', 'MOVE', 'STAY', 'CALM', 'COOL',
}
# Onomatopoeia vocabulary consulted by looks_like_sfx_text().
SFX_HINTS = {
    "BAM", "BOOM", "WHAM", "SLAM", "SMACK", "THUD", "CRACK",
    "CRASH", "BANG", "POW", "BIFF", "BONK", "CLUNK", "CLANG",
    "THWACK", "WHAP", "WHUMP", "FWAP", "FWUP", "FWOOP",
    "FSHOO", "WHOOSH", "SWISH", "SWOOSH", "WOOSH", "ZOOM",
    "VROOM", "WHIRR", "WHIZZ",
    "RRRING", "RING", "RINGG", "DING", "DONG",
    "JINGLE", "CHIME",
    "SNIF", "SNIFF", "GULP", "GASP", "WHEEZE", "PANT",
    "GRUNT", "GROAN", "SNORE",
    "GRRP", "GRRR", "TICK", "TOCK", "DRIP", "PLOP",
    "SQUEAK", "CREAK", "RUSTLE", "THUMP",
    "BEEP", "BOOP", "BUZZ", "CLICK", "CLACK",
    "FWMP", "FTMP", "FWIP", "FWSH", "SHFF", "SHFFT",
    "TMP", "TMP TMP", "STEP", "STOMP",
}
# Short emotional reactions recognised by looks_like_reaction_text().
REACTION_HINTS = {
    "HUH", "HUH?!", "HUH?", "HUH??",
    "OH", "OH!", "OOH", "OOH!",
    "AH", "AH!", "UH", "EH", "EH?",
    "TCH", "TSK",
    "WHAT?!", "WHAT?",
    "NO!", "YES!",
    "EEK", "EEEEP", "EEEP",
}
# Region types that downstream rendering treats like ordinary dialogue.
DIALOGUE_EQUIVALENT_TYPES = {"dialogue", "narration", "reaction"}
# Phrase prefixes that mark narration captions (looks_like_narration_text).
NARRATION_HINTS = {
    "AND SO", "MEANWHILE", "LATER", "THEN", "TO BE CONTINUED"
}
# Known character names; their presence vetoes SFX classification.
KNOWN_NAMES = {
    "BECKY", "DAMIAN", "CECILE", "WALD"
}
# Tokens that are almost certainly OCR noise and always rejected.
# NOTE(review): 'FSHOO' and 'GRRP' appear BOTH here and in SFX_HINTS —
# which set wins depends on call order in the consumers; confirm intent.
_NOISE_TOKENS = {
    'P', 'F', 'N', 'M', 'X', 'Z', 'Q',
    'FN', 'PF', 'NM', 'XZ', 'FSHOO', 'GRRP',
}
# ============================================================
# NORMALISE REGION TYPE
# ============================================================
def normalise_region_type(region_type: str) -> str:
    """Collapse 'narration' into 'dialogue'; every other type passes through."""
    return "dialogue" if region_type == "narration" else region_type
# ============================================================
# GEOMETRY HELPERS
# ============================================================
def xyxy_width(b):
    """Width of an (x1, y1, x2, y2) box, floored at 1."""
    w = b[2] - b[0]
    return w if w > 1 else 1

def xyxy_height(b):
    """Height of an (x1, y1, x2, y2) box, floored at 1."""
    h = b[3] - b[1]
    return h if h > 1 else 1

def xyxy_center(b):
    """Centre point (cx, cy) of an (x1, y1, x2, y2) box."""
    return (0.5 * (b[0] + b[2]), 0.5 * (b[1] + b[3]))

def box_distance(a, b):
    """Euclidean distance between the centres of two boxes."""
    (ax, ay), (bx, by) = xyxy_center(a), xyxy_center(b)
    return math.hypot(ax - bx, ay - by)

def horizontal_overlap_ratio(a, b):
    """Horizontal intersection length over the narrower box's width."""
    overlap = min(a[2], b[2]) - max(a[0], b[0])
    if overlap < 0:
        overlap = 0
    narrower = min(xyxy_width(a), xyxy_width(b))
    return overlap / max(1, narrower)

def vertical_overlap_ratio(a, b):
    """Vertical intersection length over the shorter box's height."""
    overlap = min(a[3], b[3]) - max(a[1], b[1])
    if overlap < 0:
        overlap = 0
    shorter = min(xyxy_height(a), xyxy_height(b))
    return overlap / max(1, shorter)

def box_expand(b, pad, iw, ih):
    """Grow box `b` by `pad` px on all sides, clamped to image bounds (iw, ih)."""
    x1 = max(0, int(b[0] - pad))
    y1 = max(0, int(b[1] - pad))
    x2 = min(iw - 1, int(b[2] + pad))
    y2 = min(ih - 1, int(b[3] + pad))
    return (x1, y1, x2, y2)

def count_alpha(text):
    """Number of Latin letters (including common accented ranges) in `text`."""
    return len(re.findall(r"[A-ZÀ-Ýa-zà-ÿ]", text or ""))

def uppercase_ratio(text):
    """Fraction of alphabetic characters that are upper case (0.0 if none)."""
    letters = re.findall(r"[A-Za-zÀ-ÿ]", text or "")
    if not letters:
        return 0.0
    return sum(c.isupper() for c in letters) / len(letters)

def punctuation_ratio(text):
    """Share of characters that are common punctuation marks."""
    if not text:
        return 0.0
    marks = re.findall(r"[!?.,'\"-]", text)
    return len(marks) / max(1, len(text))

def stopword_ratio(text):
    """Fraction of uppercase word tokens that are dialogue stopwords."""
    tokens = re.findall(r"[A-Z']+", normalize_text(text or ""))
    if not tokens:
        return 0.0
    return sum(t in DIALOGUE_STOPWORDS for t in tokens) / len(tokens)
# ============================================================
# TEXT CLASSIFICATION HELPERS
# ============================================================
def looks_like_sfx_text(text: str) -> bool:
    """Heuristic: does `text` look like a sound-effect (SFX) caption?

    Checks run in a fixed veto-then-accept order: known character names
    and dialogue stopwords veto SFX; punctuated or long phrases veto
    SFX; then an explicit SFX_HINTS lookup accepts, and finally a
    consonant-heavy single-word fallback accepts.
    """
    t = normalize_text(text or "")
    if not t:
        return False
    alpha = re.sub(r"[^A-Z]", "", t)  # letters only, for the hint lookup
    words = t.split()
    # A known character name anywhere means dialogue, not SFX.
    for name in KNOWN_NAMES:
        if name in words:
            return False
    toks = re.findall(r"[A-Z']+", t)
    if any(tok in DIALOGUE_STOPWORDS for tok in toks):
        return False
    # Sentence punctuation on a multi-word phrase reads as dialogue.
    if re.search(r"[.?!,]", t) and len(words) > 2:
        return False
    if len(words) > 3:
        return False
    if t in SFX_HINTS or alpha in SFX_HINTS:
        return True
    # Fallback: a short, fully-uppercase, consonant-heavy single word
    # ("FWMP", "SHFF") is very likely onomatopoeia.
    if (len(alpha) >= 2 and len(alpha) <= 8
            and uppercase_ratio(t) > 0.90
            and stopword_ratio(t) < 0.05
            and len(words) == 1):
        vowels = len(re.findall(r"[AEIOU]", alpha))
        consonants = len(alpha) - vowels
        if consonants >= len(alpha) * 0.55:
            return True
    return False
def looks_like_reaction_text(text):
    """True when `text` reads as a short emotional reaction ("HUH?", "OH!")."""
    t = normalize_text(text or "")
    stripped = re.sub(r"[^A-Z?!]", "", t)
    if t in REACTION_HINTS or stripped in REACTION_HINTS:
        return True
    letters_only = re.sub(r"[^A-Z]", "", t)
    return len(letters_only) <= 5 and punctuation_ratio(t) > 0.10
def looks_like_narration_text(text):
    """True when `text` looks like a narration caption rather than speech."""
    t = normalize_text(text or "")
    if any(t.startswith(hint) for hint in NARRATION_HINTS):
        return True
    long_sentence = len(t.split()) >= 5 and t.endswith(".")
    return long_sentence and uppercase_ratio(t) > 0.75
def contour_features_for_box(image_bgr, box_xyxy):
    """Appearance statistics for a box crop of the page.

    Returns a dict with mean_brightness (0-1 grayscale mean),
    edge_density (fraction of Canny edge pixels) and whiteness_ratio
    (fraction of near-white pixels).  An empty crop yields the
    degenerate {0.0, 1.0, 0.0} sentinel.
    """
    x1, y1, x2, y2 = box_xyxy
    crop = image_bgr[y1:y2, x1:x2]
    if crop.size == 0:
        return {"mean_brightness": 0.0, "edge_density": 1.0, "whiteness_ratio": 0.0}
    gray = cv2.cvtColor(crop, cv2.COLOR_BGR2GRAY)
    edge_map = cv2.Canny(gray, 50, 150)
    return {
        "mean_brightness": float(np.mean(gray)) / 255.0,
        "edge_density": float(np.mean(edge_map > 0)),
        "whiteness_ratio": float(np.mean(gray > 220)),
    }
def classify_region_type(image_bgr, box_xyxy, lines):
    """Classify an OCR region as 'sfx', 'reaction' or 'dialogue'.

    `image_bgr` and `box_xyxy` are kept for interface stability; the
    current heuristics are text-only.  FIX: the previous version
    computed contour features and a wide-banner narration branch that
    both had no effect — the branch returned "dialogue", identical to
    the fallback — so that wasted image work has been removed.
    """
    text = normalize_text(" ".join(lines))
    word_count = len(text.split())
    if looks_like_sfx_text(text) and word_count <= 3:
        return "sfx"
    if (word_count <= 2
            and looks_like_reaction_text(text)
            and stopword_ratio(text) < 0.10):
        return "reaction"
    # Everything else — including narration-style banners — is dialogue.
    return "dialogue"
def text_similarity(a, b):
    """Normalized-text similarity in [0, 1] via difflib's ratio."""
    left = normalize_text(a or "")
    right = normalize_text(b or "")
    return SequenceMatcher(None, left, right).ratio()
def dedupe_repeated_phrase(text):
    """Collapse an exactly-doubled phrase and adjacent duplicate words.

    "BAM BOOM BAM BOOM" → "BAM BOOM"; otherwise consecutive repeats of
    words longer than 2 characters are dropped.  Phrases under 4 words
    are returned untouched (after normalisation).
    """
    t = normalize_text(text or "")
    words = t.split()
    if len(words) < 4:
        return t
    mid = len(words) // 2
    if len(words) % 2 == 0 and words[:mid] == words[mid:]:
        return " ".join(words[:mid])
    kept = []
    for word in words:
        is_repeat = kept and kept[-1] == word and len(word) > 2
        if not is_repeat:
            kept.append(word)
    return " ".join(kept)
def dehyphenate_linebreak_artifacts(text):
    """Rejoin words OCR split across lines as 'LEARN- ING' → 'LEARNING'."""
    normalized = normalize_text(text or "")
    return re.sub(r"\b([A-Z]+)- ([A-Z]+)\b", r"\1\2", normalized)
def fix_common_dialogue_ocr(text):
    """Apply dialogue-specific OCR corrections in a fixed order.

    Order matters: literal replacement table → contraction
    reconstruction → punctuation spacing → bold-font D/P and O/U
    misreads → de-hyphenation → hard-coded last-word recovery →
    duplicate-word cleanup.  Input is normalised (uppercased) first.
    """
    t = normalize_text(text or "")
    if not t:
        return t
    # Literal substring fixes, applied via str.replace (so they also
    # match inside longer words — entries are chosen to make that safe).
    replacements = {
        "1'M": "I'M",
        "1 DIDN'T": "I DIDN'T",
        "1 HATE": "I HATE",
        "1 WAS": "I WAS",
        "1'M ": "I'M ",  # NOTE(review): redundant — "1'M" above already covers this
        "YO U": "YOU",
        "YOU RE": "YOU'RE",
        "YOURE": "YOU'RE",
        "I LL": "I'LL",
        "ILL ": "I'LL ",
        "DONT": "DON'T",
        "DIDNT": "DIDN'T",
        "CANT": "CAN'T",
        "WONT": "WON'T",
        "THATS": "THAT'S",
        "MOMS": "MOM'S",
        "DADS": "DAD'S",
        "LEARN- ING": "LEARNING",
        "COV- ERED": "COVERED",
        "SY ON": "SY-ON",
        "P PROPERLY": "P-PROPERLY",
        "SH SHUT": "SH- SHUT",
    }
    for a, b in replacements.items():
        t = t.replace(a, b)
    # Contraction reconstruction ("DO NT" → "DON'T", "WE RE" → "WE'RE"…)
    t = re.sub(r"\b([A-Z]+) NT\b", r"\1N'T", t)
    t = re.sub(r"\b([A-Z]+) RE\b", r"\1'RE", t)
    t = re.sub(r"\b([A-Z]+) VE\b", r"\1'VE", t)
    t = re.sub(r"\b([A-Z]+) LL\b", r"\1'LL", t)
    t = re.sub(r"\b([A-Z]+) S\b", r"\1'S", t)  # NOTE(review): also hits plural "… S"
    # Spacing before punctuation
    t = re.sub(r"\s+([,.;:!?])", r"\1", t)
    # ── D→P misread (bold manga fonts) ──────────────────────────
    t = re.sub(r'\bPON\b', "DON'T", t)  # NOTE(review): maps bare PON to DON'T, not DON — confirm intent
    t = re.sub(r"\bPON'T\b", "DON'T", t)
    t = re.sub(r'\bPOWN\b', 'DOWN', t)
    t = re.sub(r'\bTAKP\b', 'TAKE', t)
    t = re.sub(r'\bTHP\b', 'THE', t)
    t = re.sub(r'\bANP\b', 'AND', t)
    t = re.sub(r'\bHANP\b', 'HAND', t)
    t = re.sub(r'\bPEATH\b', 'DEATH', t)
    t = re.sub(r'\bCRUSHEP\b', 'CRUSHED', t)
    # ── Missing space / run-together words ───────────────────────
    t = re.sub(r'\bICAN\b', 'I CAN', t)
    t = re.sub(r"\bITS\b", "IT'S", t)  # NOTE(review): also rewrites the possessive "ITS"
    # ── O→U misread (THROUOH → THROUGH) ─────────────────────────
    t = re.sub(r'\bTHROUOH\b', 'THROUGH', t)
    # Fix line-break artifacts first so whole words can be matched below
    t = dehyphenate_linebreak_artifacts(t)
    # ── Missing last word recovery ───────────────────────────────
    # e.g. "DON'T PAY ANY ATTENTION TO" → "DON'T PAY ANY ATTENTION TO THEM!"
    t = re.sub(r"\bATTENTION TO$", "ATTENTION TO THEM!", t)
    t = dedupe_repeated_phrase(t)
    # Remove consecutive duplicate words (e.g. "SEE SEE" → "SEE")
    words = t.split()
    cleaned = []
    for w in words:
        if cleaned and cleaned[-1] == w and len(re.sub(r"[^A-Z]", "", w)) > 2:
            continue
        cleaned.append(w)
    t = " ".join(cleaned)
    t = re.sub(r"\s{2,}", " ", t).strip()
    return t
def region_text_role_hint(text: str) -> str:
    """Text-only guess at region role: 'sfx', 'reaction' or 'dialogue'."""
    word_count = len(normalize_text(text or "").split())
    if looks_like_sfx_text(text) and word_count <= 3:
        return "sfx"
    is_reaction = (word_count <= 2
                   and looks_like_reaction_text(text)
                   and stopword_ratio(text) < 0.10)
    return "reaction" if is_reaction else "dialogue"
def correct_region_text(text, region_type="dialogue"):
    """Apply role-appropriate OCR cleanup.

    Returns (corrected_text, correction_gain) where the gain is the
    non-negative improvement in ocr_candidate_score.
    """
    t = normalize_text(text or "")
    if not t:
        return t, 0.0
    before = ocr_candidate_score(t)
    if region_type in {"dialogue", "reaction", "narration"}:
        fixed = fix_common_dialogue_ocr(t)
    elif region_type == "sfx":
        fixed = dedupe_repeated_phrase(t)
    else:
        fixed = t
    gain = ocr_candidate_score(fixed) - before
    return fixed, max(0.0, gain)
def compute_region_confidence(raw_text, corrected_text, box_xyxy, region_type, image_bgr):
    """Blend text quality, crop appearance and correction gain into [0, 1]."""
    feats = contour_features_for_box(image_bgr, box_xyxy)
    corrected_score = ocr_candidate_score(corrected_text)
    gain = max(0.0, corrected_score - ocr_candidate_score(raw_text))
    # Weighted sum: text quality dominates, appearance and gain refine.
    score = 0.55 * corrected_score
    score += 0.15 * feats["whiteness_ratio"]
    score += 0.10 * (1.0 - min(1.0, feats["edge_density"] * 2.0))
    score += 0.10 * gain
    if region_type in {"dialogue", "reaction", "narration", "sfx"}:
        score += 0.08  # small bonus for a recognised role
    return max(0.0, min(1.0, score))
def build_region_flags(raw_text, corrected_text, region_type, conf):
    """Collect QA flags for a region (unknown type, SFX, low confidence…)."""
    checks = (
        (region_type == "unknown", "REGION_UNKNOWN"),
        (region_type == "sfx", "SFX"),
        (conf < 0.45, "LOW_CONF"),
        (text_similarity(raw_text, corrected_text) < 0.75, "HEAVY_CORRECTION"),
        (len(corrected_text.split()) > 22, "LONG_TEXT"),
    )
    return [flag for hit, flag in checks if hit]
# ============================================================
# HELPERS
# ============================================================
def normalize_text(text: str) -> str:
    """Uppercase, ASCII-fy smart quotes/ellipses and tidy spacing."""
    t = (text or "").strip().upper()
    # Smart quotes / ellipsis → plain ASCII equivalents.
    t = (t.replace("\u201c", "\"").replace("\u201d", "\"")
          .replace("\u2018", "'").replace("\u2019", "'")
          .replace("\u2026", "..."))
    t = re.sub(r"\s+", " ", t)              # collapse whitespace runs
    t = re.sub(r"\s+([,.;:!?])", r"\1", t)  # no space before punctuation
    t = re.sub(r"([¡¿])\s+", r"\1", t)      # no space after inverted marks
    t = re.sub(r"\(\s+", "(", t)            # trim inside parentheses
    t = re.sub(r"\s+\)", ")", t)
    t = re.sub(r"\.{4,}", "...", t)         # long dot runs → ellipsis
    return t.strip()
def adjust_box_for_added_text(box_xyxy, raw_text, corrected_text):
    """Grow a box downward when correction added words past the OCR text.

    The height scales with the corrected/raw word-count ratio, capped at
    2.0x so a recovered trailing word cannot blow the box out.  Returns
    the (possibly new) (x1, y1, x2, y2) tuple; passes through None boxes
    and empty texts unchanged.
    """
    if box_xyxy is None or not raw_text or not corrected_text:
        return box_xyxy
    n_raw = len(raw_text.split())
    n_fixed = len(corrected_text.split())
    if n_fixed <= n_raw:
        return box_xyxy
    x1, y1, x2, y2 = box_xyxy
    height = max(1, y2 - y1)
    scale = min(2.0, n_fixed / max(1, n_raw))  # cap prevents box blow-out
    return (x1, y1, x2, y1 + int(height * scale))
def postprocess_translation_general(text: str) -> str:
    """Final tidy of translated text: spacing, `!!!` runs and ellipses."""
    t = normalize_text(text)
    t = re.sub(r"\s{2,}", " ", t).strip()
    t = re.sub(r"([!?]){3,}", r"\1\1", t)  # "!!!" → "!!"
    return re.sub(r"\.{4,}", "...", t)
def fix_common_ocr_errors(text: str) -> str:
    """Apply global OCR-misread corrections for bold manga fonts.

    Order: trailing-I → '!' first, then the word-level
    BOLD_FONT_WORD_FIXES table, then digit-as-letter recovery and
    symbol cleanup.
    """
    # Trailing I misread for '!' after an uppercase word.
    # NOTE(review): r'\1! \2' re-inserts the captured whitespace after
    # "! ", which can produce a double space — confirm downstream
    # normalisation collapses it.
    text = re.sub(r'([A-Z]{2,})I(\s+[A-Z])', r'\1! \2', text)
    text = re.sub(r'([A-Z]{2,})I$', r'\1!', text)
    result = text
    # Word-level bold font fixes
    for pattern, replacement in BOLD_FONT_WORD_FIXES.items():
        result = re.sub(pattern, replacement, result)
    # Digit-as-letter substitution (e.g. G00D → GOOD, M0RNING → MORNING)
    DIGIT_AS_LETTER = {
        '0': 'O', '1': 'I', '3': 'E',
        '4': 'A', '5': 'S', '8': 'B',
    }
    def fix_digit_letters(m):
        word = m.group(0)
        # FIX: leave genuine numbers alone.  Previously all-digit tokens
        # were converted too, so "15" became "IS" and "100" became "IOO".
        if not any(c.isalpha() for c in word):
            return word
        fixed = word
        for digit, letter in DIGIT_AS_LETTER.items():
            fixed = fixed.replace(digit, letter)
        # Only accept if the result is purely alphabetic (avoids mangling
        # genuine mixed alphanumerics like part numbers).
        return fixed if fixed.isalpha() else word
    result = re.sub(r'\b[A-Za-z0-9]{2,12}\b', fix_digit_letters, result)
    # Standard symbol fixes
    result = re.sub(r'(\d)O(\d)', r'\g<1>0\g<2>', result)
    result = result.replace('|', 'I')
    result = result.replace('`', "'")
    return result
def is_valid_language(text: str, source_lang: str) -> bool:
    """Check that `text`'s script plausibly matches `source_lang`.

    Latin-script languages reject any Arabic/CJK/Hangul characters and
    require a minimum share of Latin letters (thresholds relax with
    length); ja/ko/zh require a minimum share of their own script.
    Unknown languages always pass.
    """
    if not text:
        return False
    stripped = re.sub(r'[^\w]', '', text)
    if not stripped:
        return False
    lang = source_lang.lower()
    latin_langs = {'en', 'english', 'es', 'spanish', 'fr', 'french',
                   'it', 'italian', 'ca', 'catalan', 'de', 'german'}
    if lang in latin_langs:
        foreign = re.findall(
            r'[\u0600-\u06FF\u0750-\u077F\u3040-\u30FF'
            r'\u3400-\u4DBF\u4E00-\u9FFF\uAC00-\uD7AF\u1100-\u11FF]',
            stripped)
        if foreign:
            return False
        latin = len(re.findall(r'[a-zA-ZÀ-ÿ]', stripped))
        total = len(stripped)
        if total <= 3:
            return latin >= 1
        threshold = 0.55 if total <= 6 else 0.45
        return (latin / total) >= threshold
    script_patterns = {
        'ja': r'[\u3040-\u30FF\u3400-\u4DBF\u4E00-\u9FFF]',
        'japanese': r'[\u3040-\u30FF\u3400-\u4DBF\u4E00-\u9FFF]',
        'ko': r'[\uAC00-\uD7AF\u1100-\u11FF]',
        'korean': r'[\uAC00-\uD7AF\u1100-\u11FF]',
        'zh': r'[\u4E00-\u9FFF\u3400-\u4DBF]',
        'chinese': r'[\u4E00-\u9FFF\u3400-\u4DBF]',
    }
    if lang in script_patterns:
        native = len(re.findall(script_patterns[lang], stripped))
        if len(stripped) <= 3:
            return native >= 1
        return (native / len(stripped)) >= 0.4
    return True
# ============================================================
# PROTECTED TOKEN HELPERS
# ============================================================
def is_protected_token(text: str) -> bool:
    """True when `text` (or its letters-only form) is a protected short token."""
    t = normalize_text(text or "")
    if not t:
        return False
    if t in PROTECTED_SHORT_TOKENS:
        return True
    letters = re.sub(r'[^A-ZÀ-Ý]', '', t)
    return letters in PROTECTED_SHORT_TOKENS

def maybe_conf_floor_for_protected(text: str, conf: float, floor: float = 0.40) -> float:
    """Raise `conf` to at least `floor` when `text` is a protected token."""
    return max(conf, floor) if is_protected_token(text) else conf
def is_meaningful_text(text: str, source_lang: str, min_alpha_chars: int = 2) -> bool:
    """Decide whether an OCR string is worth keeping/translating.

    Protection checks (protected short words, interjections, punctuated
    exclamations) run BEFORE any length/noise gate so tokens like "OH!"
    or "I" are never discarded.  Then generic noise filters apply:
    minimum letter count, noise-token list, symbol ratio, repeated-char
    runs and (for Latin languages) a no-vowel check on longer strings.
    """
    if not text:
        return False
    t = text.strip()
    t_upper = normalize_text(t)
    # Protection checks run BEFORE any length gate
    lang = source_lang.lower()
    if lang in {"en", "english"} and t_upper in SHORT_ENGLISH_PROTECTED:
        return True
    if is_protected_token(t_upper):
        return True
    t_alpha_only = re.sub(r'[^A-Za-zÀ-ÿ]', '', t_upper)
    if t_upper in _MANGA_INTERJECTIONS or t_alpha_only in _MANGA_INTERJECTIONS:
        return True
    # FIX: the quantifier was written "{ 1,3}" (with a space), which the
    # re module treats as the LITERAL text "{ 1,3}", and the class
    # "[!?\\.]" also admitted a backslash — so short exclamations like
    # "OH!" never matched this rule.
    if re.fullmatch(r"[A-Za-zÀ-ÿ]{1,6}[!?.]{1,3}", t):
        return True
    alpha_count = sum(c.isalpha() for c in t)
    if alpha_count < min_alpha_chars:
        return False
    if t_upper in _NOISE_TOKENS:
        return False
    latin_langs = ['en', 'english', 'es', 'spanish', 'fr', 'french',
                   'it', 'italian', 'ca', 'catalan', 'de', 'german']
    if lang in latin_langs:
        # Mostly-symbol strings are OCR junk.
        non_alpha = sum(not c.isalpha() for c in t)
        if len(t) > 0 and (non_alpha / len(t)) > 0.72:
            return False
    # Strings of one repeated character ("AAA") are noise.
    if len(t) >= 3 and len(set(t_upper)) == 1:
        return False
    if lang in latin_langs:
        # Longer Latin words with no vowels at all are implausible.
        if len(t) > 5:
            vowels = len(re.findall(r'[AEIOUaeiouÀ-ÿ]', t))
            if vowels == 0:
                return False
    return True
# ============================================================
# QUAD / BOX UTILITIES
# ============================================================
def quad_bbox(quad):
    """Axis-aligned integer bbox (x1, y1, x2, y2) around a 4-point quad."""
    xs = tuple(p[0] for p in quad)
    ys = tuple(p[1] for p in quad)
    return (int(min(xs)), int(min(ys)), int(max(xs)), int(max(ys)))

def quad_center(quad):
    """Centre of a quad's bounding box."""
    x1, y1, x2, y2 = quad_bbox(quad)
    return (0.5 * (x1 + x2), 0.5 * (y1 + y2))

def boxes_union_xyxy(boxes):
    """Smallest box covering all non-None boxes; None when nothing given."""
    present = [b for b in boxes if b is not None]
    if not present:
        return None
    x1 = min(b[0] for b in present)
    y1 = min(b[1] for b in present)
    x2 = max(b[2] for b in present)
    y2 = max(b[3] for b in present)
    return (int(x1), int(y1), int(x2), int(y2))

def bbox_area_xyxy(b):
    """Pixel area of a box; 0 for None or degenerate boxes."""
    if b is None:
        return 0
    width = max(0, b[2] - b[0])
    height = max(0, b[3] - b[1])
    return int(width * height)

def xyxy_to_xywh(b):
    """Convert (x1, y1, x2, y2) to an {x, y, w, h} dict (None passes through)."""
    if b is None:
        return None
    x1, y1, x2, y2 = b
    return {"x": int(x1), "y": int(y1),
            "w": int(max(0, x2 - x1)), "h": int(max(0, y2 - y1))}
def overlap_or_near(a, b, gap=0):
    """True when boxes overlap or lie within `gap` pixels on both axes."""
    ax1, ay1, ax2, ay2 = a
    bx1, by1, bx2, by2 = b
    dx = max(ax1, bx1) - min(ax2, bx2)
    dy = max(ay1, by1) - min(ay2, by2)
    return max(0, dx) <= gap and max(0, dy) <= gap

def boxes_iou(a, b):
    """Intersection-over-union of two boxes; 0.0 when disjoint."""
    ix1 = max(a[0], b[0])
    iy1 = max(a[1], b[1])
    ix2 = min(a[2], b[2])
    iy2 = min(a[3], b[3])
    inter = max(0, ix2 - ix1) * max(0, iy2 - iy1)
    if not inter:
        return 0.0
    area_a = max(0, a[2] - a[0]) * max(0, a[3] - a[1])
    area_b = max(0, b[2] - b[0]) * max(0, b[3] - b[1])
    return inter / max(1, area_a + area_b - inter)

def boxes_overlap_ratio(a, b):
    """Intersection over the SMALLER box's area; 0.0 when disjoint."""
    ix1 = max(a[0], b[0])
    iy1 = max(a[1], b[1])
    ix2 = min(a[2], b[2])
    iy2 = min(a[3], b[3])
    inter = max(0, ix2 - ix1) * max(0, iy2 - iy1)
    if not inter:
        return 0.0
    area_a = max(0, a[2] - a[0]) * max(0, a[3] - a[1])
    area_b = max(0, b[2] - b[0]) * max(0, b[3] - b[1])
    return inter / max(1, min(area_a, area_b))
def ocr_candidate_score(text: str) -> float:
    """Heuristic quality score in [0, 1] for an OCR candidate string.

    Rewards letters, spaces and ordinary punctuation; penalises exotic
    characters, a lone single capital letter, and multi-digit runs.
    """
    if not text:
        return 0.0
    raw = text.strip()
    length = len(raw)
    if length == 0:
        return 0.0
    frac_alpha = sum(ch.isalpha() for ch in raw) / length
    frac_space = sum(ch.isspace() for ch in raw) / length
    frac_punct = sum(ch in ".,!?'-:;()[]\"¡¿" for ch in raw) / length
    frac_bad = len(re.findall(r"[^\w\s\.\,\!\?\-\'\:\;\(\)\[\]\"¡¿]", raw)) / length
    # Penalise only when the WHOLE token is a single capital letter.
    penalty = 0.05 if re.fullmatch(r"[A-Z]", raw) else 0.0
    if re.search(r"[0-9]{2,}", raw):
        penalty += 0.08
    score = (0.62 * frac_alpha) + (0.10 * frac_space) + (0.20 * frac_punct) - (0.45 * frac_bad) - penalty
    return max(0.0, min(1.0, score))
def quad_is_horizontal(quad, ratio_threshold=1.5) -> bool:
    """True when the quad's bbox is at least `ratio_threshold` times wider than tall."""
    x1, y1, x2, y2 = quad_bbox(quad)
    width = max(1, x2 - x1)
    height = max(1, y2 - y1)
    return (width / height) >= ratio_threshold

def quad_is_vertical(quad, ratio_threshold=1.5) -> bool:
    """True when the quad's bbox is at least `ratio_threshold` times taller than wide."""
    x1, y1, x2, y2 = quad_bbox(quad)
    width = max(1, x2 - x1)
    height = max(1, y2 - y1)
    return (height / width) >= ratio_threshold
# ============================================================
# IMAGE PREPROCESSING
# ============================================================
def enhance_image_for_ocr(image_bgr, upscale_factor=2.5):
    """Preprocess a page crop for OCR: upscale, denoise, binarise.

    Pipeline: cubic upscale → grayscale → non-local-means denoise →
    CLAHE contrast boost → 3x3 sharpen → adaptive Gaussian threshold →
    small morphological close.  Returns a 3-channel BGR image so the
    caller can feed it to OCR APIs expecting colour input.
    """
    h, w = image_bgr.shape[:2]
    upscaled = cv2.resize(image_bgr, (int(w * upscale_factor), int(h * upscale_factor)),
                          interpolation=cv2.INTER_CUBIC)
    gray = cv2.cvtColor(upscaled, cv2.COLOR_BGR2GRAY)
    denoised = cv2.fastNlMeansDenoising(gray, None, h=10,
                                        templateWindowSize=7, searchWindowSize=21)
    clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8))
    enhanced = clahe.apply(denoised)
    # Standard sharpening kernel: centre 9, all neighbours -1.
    sharpened = cv2.filter2D(enhanced, -1,
                             np.array([[-1,-1,-1],[-1,9,-1],[-1,-1,-1]]))
    binary = cv2.adaptiveThreshold(sharpened, 255,
                                   cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
                                   cv2.THRESH_BINARY, 11, 2)
    # Close tiny gaps inside glyph strokes.
    cleaned = cv2.morphologyEx(binary, cv2.MORPH_CLOSE, np.ones((2, 2), np.uint8))
    return cv2.cvtColor(cleaned, cv2.COLOR_GRAY2BGR)
def detect_small_text_regions(image_bgr, existing_quads):
    """Find small text-like blobs OUTSIDE the already-detected OCR quads.

    Masks out `existing_quads`, Otsu-binarises the remaining area and
    keeps external contours whose bounding box is 50-5000 px² with a
    sane aspect ratio.  Returns a list of (x1, y1, x2, y2) boxes.
    """
    gray = cv2.cvtColor(image_bgr, cv2.COLOR_BGR2GRAY)
    mask = np.zeros(gray.shape, dtype=np.uint8)
    for quad in existing_quads:
        cv2.fillPoly(mask, [np.array(quad, dtype=np.int32)], 255)
    # Invert so only the unexplored page area survives.
    mask_inv = cv2.bitwise_not(mask)
    _, binary = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)
    binary_masked = cv2.bitwise_and(binary, binary, mask=mask_inv)
    contours, _ = cv2.findContours(binary_masked, cv2.RETR_EXTERNAL,
                                   cv2.CHAIN_APPROX_SIMPLE)
    text_regions = []
    for contour in contours:
        x, y, w, h = cv2.boundingRect(contour)
        area = w * h
        # Size/aspect gate: reject dust (<50 px²) and large panels (>5000 px²).
        if 50 < area < 5000 and 0.1 < h / max(w, 1) < 10:
            text_regions.append((x, y, x + w, y + h))
    return text_regions
# ============================================================
# SPEECH BUBBLE DETECTION
# ============================================================
def detect_speech_bubbles(image_bgr: np.ndarray) -> List[np.ndarray]:
    """Find candidate speech-bubble contours (area > 500 px²) on the page."""
    gray = cv2.cvtColor(image_bgr, cv2.COLOR_BGR2GRAY)
    thresh = cv2.adaptiveThreshold(gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
                                   cv2.THRESH_BINARY_INV, 11, 2)
    contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    return [contour for contour in contours if cv2.contourArea(contour) > 500]
def is_quad_in_bubble(quad_bbox_xyxy, bubble_contour, tolerance=5):
    """True when the box centre lies inside the contour or within `tolerance` px.

    FIX: measureDist must be True.  With False, cv2.pointPolygonTest
    returns only the sign (+1/0/-1), and -1 >= -tolerance is always
    true for any tolerance >= 1 — so every quad tested as "inside"
    every bubble.  With True it returns the signed distance in pixels
    (negative outside), making the tolerance meaningful.
    """
    x1, y1, x2, y2 = quad_bbox_xyxy
    cx, cy = (x1 + x2) // 2, (y1 + y2) // 2
    return cv2.pointPolygonTest(bubble_contour, (float(cx), float(cy)), True) >= -tolerance
def split_indices_by_bubble(indices, ocr, bubble_contours):
    """Partition OCR indices by the speech bubble containing each quad.

    Indices that fall outside every bubble form one trailing group.
    Returns a list of index groups.
    """
    if not indices:
        return []
    by_bubble = {}
    loose = []
    for idx in indices:
        bbox = quad_bbox(ocr[idx][0])
        owner = next((bi for bi, contour in enumerate(bubble_contours)
                      if is_quad_in_bubble(bbox, contour)), None)
        if owner is None:
            loose.append(idx)
        else:
            by_bubble.setdefault(owner, []).append(idx)
    groups = list(by_bubble.values())
    if loose:
        groups.append(loose)
    return groups
def check_vertical_alignment_split(indices, ocr, threshold=20):
    """Split indices into groups wherever the vertical gap between
    consecutive quads (sorted by top edge) exceeds `threshold` px."""
    if len(indices) <= 1:
        return [indices]
    ordered = sorted(((idx, quad_bbox(ocr[idx][0])) for idx in indices),
                     key=lambda item: item[1][1])
    groups = []
    current = [ordered[0][0]]
    for (_, prev_box), (idx, box) in zip(ordered, ordered[1:]):
        if box[1] - prev_box[3] > threshold:
            groups.append(current)
            current = [idx]
        else:
            current.append(idx)
    if current:
        groups.append(current)
    return groups
# ============================================================
# QUAD SIZE VALIDATION AND SPLITTING
# ============================================================
def is_quad_oversized(quad, median_height, width_threshold=8.0):
    """True for suspiciously wide quads: wider than `width_threshold`
    median line-heights, or with aspect ratio above 12."""
    x1, y1, x2, y2 = quad_bbox(quad)
    width = x2 - x1
    height = max(1, y2 - y1)
    return width > median_height * width_threshold or (width / height) > 12.0
def split_oversized_quad_by_content(image_bgr, quad, text, conf, median_height):
    """Split one oversized quad at its widest internal whitespace gap.

    Projects the Otsu-binarised crop onto the x-axis, finds column gaps
    at least max(0.8 * median_height, 15) px wide, and splits the quad
    (and its text, preferably at a space) at the widest gap.  Returns a
    list of (quad, text, conf) pieces — the original unchanged when no
    usable gap exists.
    """
    x1, y1, x2, y2 = quad_bbox(quad)
    w, h = x2 - x1, max(1, y2 - y1)
    pad = 2
    roi = image_bgr[max(0,y1-pad):min(image_bgr.shape[0],y2+pad),
                    max(0,x1):min(image_bgr.shape[1],x2)]
    if roi.size == 0:
        return [(quad, text, conf)]
    gray = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY)
    _, binary = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)
    v_proj = np.sum(binary, axis=0)   # ink mass per pixel column
    gap_threshold = h * 255 * 0.20    # column counts as blank below 20% ink
    gaps, in_gap, gap_start = [], False, 0
    for x in range(len(v_proj)):
        if v_proj[x] < gap_threshold:
            if not in_gap:
                gap_start, in_gap = x, True
        else:
            if in_gap:
                gw = x - gap_start
                if gw >= max(int(median_height * 0.8), 15):
                    gaps.append((gap_start + gw // 2, gw))  # (centre, width)
                in_gap = False
    if not gaps:
        return [(quad, text, conf)]
    gaps.sort(key=lambda g: g[1], reverse=True)  # widest gap first
    split_x_abs = max(0, x1) + gaps[0][0]
    if ' ' in text:
        # Map the pixel split position back to the nearest space in the text.
        char_w = w / max(1, len(text))
        split_idx = int((split_x_abs - x1) / max(1e-6, char_w))
        spaces = [i for i, c in enumerate(text) if c == ' ']
        if spaces:
            split_idx = min(spaces, key=lambda i: abs(i - split_idx))
        tl, tr = text[:split_idx].strip(), text[split_idx:].strip()
    else:
        # No spaces: split the text proportionally to the pixel position.
        split_idx = int(len(text) * (split_x_abs - x1) / w)
        tl, tr = text[:split_idx].strip(), text[split_idx:].strip()
    if tl and tr:
        return [([[x1,y1],[split_x_abs,y1],[split_x_abs,y2],[x1,y2]], tl, conf),
                ([[split_x_abs,y1],[x2,y1],[x2,y2],[split_x_abs,y2]], tr, conf)]
    return [(quad, text, conf)]
def validate_and_split_oversized_quads(image_bgr, filtered_ocr):
    """Split any abnormally wide OCR quads on their widest internal gap.

    Returns (new_ocr_list, number_of_splits_performed).
    """
    if not filtered_ocr:
        return filtered_ocr, 0
    heights = [max(1, quad_bbox(q)[3] - quad_bbox(q)[1]) for q, _, _ in filtered_ocr]
    med_h = float(np.median(heights)) if heights else 14.0
    out = []
    n_splits = 0
    for quad, text, conf in filtered_ocr:
        if not is_quad_oversized(quad, med_h, 8.0):
            out.append((quad, text, conf))
            continue
        pieces = split_oversized_quad_by_content(image_bgr, quad, text, conf, med_h)
        if len(pieces) > 1:
            out.extend(pieces)
            n_splits += 1
        else:
            out.append((quad, text, conf))
    return out, n_splits
# ============================================================
# HORIZONTAL GAP DETECTION
# ============================================================
def detect_horizontal_gap_in_group(indices, ocr, med_h, gap_factor=2.5):
    """Find the widest horizontal gap between x-sorted quads.

    Returns (left_indices, right_indices) split at the largest gap
    wider than med_h * gap_factor, or None when no such gap exists.
    """
    if len(indices) < 2:
        return None
    ordered = sorted(indices, key=lambda i: quad_center(ocr[i][0])[0])
    boxes = [quad_bbox(ocr[i][0]) for i in ordered]
    min_gap = med_h * gap_factor
    widest, split_at = 0.0, None
    for k in range(len(ordered) - 1):
        gap = boxes[k + 1][0] - boxes[k][2]
        if gap > min_gap and gap > widest:
            widest, split_at = gap, k
    if split_at is None:
        return None
    left = ordered[:split_at + 1]
    right = ordered[split_at + 1:]
    if not left or not right:
        return None
    return (left, right)
def orientation_compatible(idx_a, idx_b, ocr):
    """False when one quad is clearly vertical (aspect < 0.6) and the
    other clearly horizontal (aspect > 2.0); True otherwise."""
    box_a = quad_bbox(ocr[idx_a][0])
    box_b = quad_bbox(ocr[idx_b][0])
    ratio_a = max(1, box_a[2] - box_a[0]) / max(1, box_a[3] - box_a[1])
    ratio_b = max(1, box_b[2] - box_b[0]) / max(1, box_b[3] - box_b[1])
    mismatch = (ratio_a < 0.6 and ratio_b > 2.0) or (ratio_b < 0.6 and ratio_a > 2.0)
    return not mismatch
# ============================================================
# WIDE QUAD COLUMN SPLIT
# ============================================================
def split_wide_quad_by_column_gap(image_bgr, quad, text, conf, med_h,
                                  min_gap_factor=1.8):
    """Split a wide quad on an inter-column whitespace gap (pre-grouping).

    Like split_oversized_quad_by_content but with a lower blank-column
    ink threshold (12%) and a guard that each resulting piece must be
    at least one median line-height wide.  Returns a list of
    (quad, text, conf) pieces; the original unchanged when no split
    applies.
    """
    x1, y1, x2, y2 = quad_bbox(quad)
    w, h = x2 - x1, max(1, y2 - y1)
    if w < med_h * 3.0:
        # Not wide enough to plausibly contain two columns.
        return [(quad, text, conf)]
    pad = 2
    roi = image_bgr[max(0,y1-pad):min(image_bgr.shape[0],y2+pad),
                    max(0,x1):min(image_bgr.shape[1],x2)]
    if roi.size == 0:
        return [(quad, text, conf)]
    gray = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY)
    _, binary = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)
    v_proj = np.sum(binary, axis=0)  # ink mass per pixel column
    gap_threshold = h * 255 * 0.12   # blank-column threshold (12% ink)
    min_gap_px = max(int(med_h * min_gap_factor), 10)
    gaps, in_gap, gap_start = [], False, 0
    for x in range(len(v_proj)):
        if v_proj[x] < gap_threshold:
            if not in_gap:
                gap_start, in_gap = x, True
        else:
            if in_gap:
                gw = x - gap_start
                if gw >= min_gap_px:
                    gaps.append((gap_start + gw // 2, gw))  # (centre, width)
                in_gap = False
    if not gaps:
        return [(quad, text, conf)]
    gaps.sort(key=lambda g: g[1], reverse=True)  # widest gap first
    split_x_rel = gaps[0][0]
    split_x_abs = x1 + split_x_rel
    # Refuse splits leaving a sliver thinner than one line-height.
    if split_x_abs - x1 < med_h or x2 - split_x_abs < med_h:
        return [(quad, text, conf)]
    if ' ' in text:
        # Map the pixel split position back to the nearest space in the text.
        char_w = w / max(1, len(text))
        split_idx = int(split_x_rel / max(1e-6, char_w))
        spaces = [i for i, c in enumerate(text) if c == ' ']
        if spaces:
            split_idx = min(spaces, key=lambda i: abs(i - split_idx))
        tl, tr = text[:split_idx].strip(), text[split_idx:].strip()
    else:
        # No spaces: split the text proportionally to the pixel position.
        split_idx = int(len(text) * split_x_rel / w)
        tl, tr = text[:split_idx].strip(), text[split_idx:].strip()
    if tl and tr:
        return [([[x1,y1],[split_x_abs,y1],[split_x_abs,y2],[x1,y2]], tl, conf),
                ([[split_x_abs,y1],[x2,y1],[x2,y2],[split_x_abs,y2]], tr, conf)]
    return [(quad, text, conf)]
def apply_column_gap_splits(image_bgr, ocr_list, med_h):
    """Run the column-gap splitter over every OCR entry before grouping.

    Returns (new_list, number_of_quads_that_were_split).  Entries that
    did not split are carried through unchanged (the splitter returns
    them as a one-element list).
    """
    result, splits_made = [], 0
    for quad, text, conf in ocr_list:
        parts = split_wide_quad_by_column_gap(image_bgr, quad, text, conf, med_h)
        if len(parts) > 1:
            splits_made += 1
        result.extend(parts)
    if splits_made:
        print(f"📐 Column-gap split: {splits_made} wide quad(s) split before grouping")
    return result, splits_made
# ============================================================
# LAYOUT DETECTION
# ============================================================
def group_indices_into_vertical_columns(indices, ocr,
                                        x_tolerance_factor=1.4,
                                        min_vertical_span_factor=1.8):
    """Cluster quad indices into vertical text columns by x-center.

    A quad joins an existing column when its horizontal center lies within
    ``x_tol`` (median quad width * x_tolerance_factor, at least 10px) of the
    column's running-mean center.  Columns are returned left-to-right, and
    each column's members top-to-bottom.

    NOTE(review): ``min_vertical_span_factor`` is currently unused in the
    body — confirm whether a minimum-span filter was intended here.
    """
    if not indices:
        return []
    items = []
    for i in indices:
        b = quad_bbox(ocr[i][0])
        cx = (b[0] + b[2]) / 2.0
        cy = (b[1] + b[3]) / 2.0
        w = max(1, b[2] - b[0])
        h = max(1, b[3] - b[1])
        # item layout: (index, bbox, x-center, y-center, width, height)
        items.append((i, b, cx, cy, w, h))
    med_w = float(np.median([it[4] for it in items])) if items else 12.0
    x_tol = max(10.0, med_w * x_tolerance_factor)
    # Sweep left-to-right so the running-mean column centers grow stably.
    items_sorted = sorted(items, key=lambda x: x[2])
    columns = []
    for it in items_sorted:
        placed = False
        for col in columns:
            if abs(it[2] - col["xc"]) <= x_tol:
                col["members"].append(it)
                # Re-center the column on the mean of all members so far.
                col["xc"] = float(np.mean([m[2] for m in col["members"]]))
                placed = True
                break
        if not placed:
            columns.append({"xc": it[2], "members": [it]})
    clean_columns = []
    for col in columns:
        # Within a column, order quads top-to-bottom by y-center.
        members = sorted(col["members"], key=lambda x: x[3])
        clean_columns.append([m[0] for m in members])
    # Order the columns themselves left-to-right by mean x-center.
    clean_columns.sort(
        key=lambda grp: np.mean(
            [(quad_bbox(ocr[i][0])[0] + quad_bbox(ocr[i][0])[2]) / 2.0 for i in grp]
        )
    )
    return clean_columns
def group_indices_into_horizontal_rows(indices, ocr, row_tol_factor=0.75):
    """Cluster quad indices into horizontal text rows by y-center.

    A quad joins an existing row when its vertical center lies within
    ``row_tol`` (median quad height * row_tol_factor, at least 6px) of the
    row's running-mean center.  Rows come back top-to-bottom, and each
    row's members left-to-right.
    """
    if not indices:
        return []
    items = []
    for i in indices:
        b = quad_bbox(ocr[i][0])
        cx = (b[0] + b[2]) / 2.0
        cy = (b[1] + b[3]) / 2.0
        h = max(1, b[3] - b[1])
        # item layout: (index, bbox, x-center, y-center, height)
        items.append((i, b, cx, cy, h))
    med_h = float(np.median([it[4] for it in items])) if items else 10.0
    row_tol = max(6.0, med_h * row_tol_factor)
    # Sweep top-to-bottom so the running-mean row centers grow stably.
    items.sort(key=lambda x: x[3])
    rows = []
    for it in items:
        placed = False
        for row in rows:
            if abs(it[3] - row["yc"]) <= row_tol:
                row["members"].append(it)
                # Re-center the row on the mean of all members so far.
                row["yc"] = float(np.mean([m[3] for m in row["members"]]))
                placed = True
                break
        if not placed:
            rows.append({"yc": it[3], "members": [it]})
    groups = []
    for row in rows:
        # Within a row, order quads left-to-right by x-center.
        members = sorted(row["members"], key=lambda x: x[2])
        groups.append([m[0] for m in members])
    return groups
def score_text_groups(groups, ocr):
    """Heuristic quality score for one grouping of OCR quads into lines.

    Averages the per-line ocr_candidate_score, rewards longer lines
    (capped at +0.5) and penalizes very fragmented layouts (more than
    four groups). Returns 0.0 when there is no usable text.
    """
    if not groups:
        return 0.0
    line_texts = []
    word_counts = []
    for grp in groups:
        pieces = [normalize_text(ocr[i][1]) for i in grp]
        joined = normalize_text(" ".join(p for p in pieces if p))
        if joined:
            line_texts.append(joined)
            word_counts.append(len(joined.split()))
    if not line_texts:
        return 0.0
    quality = float(np.mean([ocr_candidate_score(t) for t in line_texts]))
    mean_words = float(np.mean(word_counts))
    frag_penalty = max(0.0, len(groups) - 4) * 0.08
    return quality + min(0.5, mean_words * 0.05) - frag_penalty
def detect_internal_text_layout(indices, ocr, reading_mode="ltr"):
    """Classify each vertical block of a quad group as row- or column-read.

    Both candidate groupings are scored with score_text_groups; a block
    is tagged "vertical" only when it yields at least two columns AND
    scores no worse than the horizontal grouping minus a small margin.
    """
    if not indices:
        return {"mode": "horizontal", "blocks": []}
    resolved = []
    for block in split_indices_into_vertical_blocks(indices, ocr):
        rows = group_indices_into_horizontal_rows(block, ocr)
        cols = group_indices_into_vertical_columns(block, ocr)
        row_score = score_text_groups(rows, ocr)
        col_score = score_text_groups(cols, ocr)
        prefers_columns = len(cols) >= 2 and col_score >= row_score - 0.03
        if prefers_columns:
            resolved.append({"mode": "vertical", "groups": cols})
        else:
            resolved.append({"mode": "horizontal", "groups": rows})
    return {"mode": "block-mixed", "blocks": resolved}
def build_text_from_layout(indices, ocr, reading_mode="ltr"):
    """Flatten a group of OCR quads into ordered text lines.

    Uses detect_internal_text_layout to decide, per vertical block,
    whether the quads read as horizontal rows or vertical columns, then
    emits one normalized line per row/column.  ``reading_mode="rtl"``
    orders vertical columns right-to-left (manga style).
    """
    layout = detect_internal_text_layout(indices, ocr, reading_mode=reading_mode)
    output_lines = []
    for block in layout["blocks"]:
        groups = block["groups"]
        mode = block["mode"]
        if mode == "horizontal":
            # Rows are already ordered; join each row's non-empty quads.
            for grp in groups:
                line = normalize_text(" ".join(
                    ocr[i][1] for i in grp if normalize_text(ocr[i][1])
                ))
                if line:
                    output_lines.append(line)
        elif mode == "vertical":
            # Order columns by mean x-center; RTL reverses for manga.
            if reading_mode == "rtl":
                groups = sorted(
                    groups,
                    key=lambda g: np.mean(
                        [(quad_bbox(ocr[i][0])[0] + quad_bbox(ocr[i][0])[2]) / 2.0 for i in g]
                    ),
                    reverse=True
                )
            else:
                groups = sorted(
                    groups,
                    key=lambda g: np.mean(
                        [(quad_bbox(ocr[i][0])[0] + quad_bbox(ocr[i][0])[2]) / 2.0 for i in g]
                    )
                )
            for grp in groups:
                # Within a column, read top-to-bottom by y-center.
                grp_sorted = sorted(
                    grp,
                    key=lambda i: (quad_bbox(ocr[i][0])[1] + quad_bbox(ocr[i][0])[3]) / 2.0
                )
                line = normalize_text(" ".join(
                    ocr[i][1] for i in grp_sorted if normalize_text(ocr[i][1])
                ))
                if line:
                    output_lines.append(line)
    return output_lines
# ============================================================
# BUBBLE CONTOUR MEMBERSHIP
# ============================================================
def build_quad_to_bubble_map(ocr: list, bubble_contours: list) -> Dict[int, int]:
    """Assign each OCR quad to the first bubble contour containing its center.

    Returns {quad_index: contour_index}, with -1 for quads whose center
    falls inside no contour.
    """
    mapping: Dict[int, int] = {}
    for idx, entry in enumerate(ocr):
        x1, y1, x2, y2 = quad_bbox(entry[0])
        cx = (x1 + x2) / 2.0
        cy = (y1 + y2) / 2.0
        # First containing contour wins; -1 when none contains the center.
        mapping[idx] = next(
            (cidx for cidx, contour in enumerate(bubble_contours)
             if cv2.pointPolygonTest(contour, (float(cx), float(cy)), False) >= 0),
            -1,
        )
    return mapping
def same_bubble_contour(idx_a: int, idx_b: int,
                        quad_to_bubble: Dict[int, int]) -> bool:
    """True when both quads were assigned to the same real bubble contour.

    Unassigned quads (id -1, or missing from the map) never match.
    """
    contour_a = quad_to_bubble.get(idx_a, -1)
    contour_b = quad_to_bubble.get(idx_b, -1)
    return contour_a != -1 and contour_a == contour_b
# ============================================================
# REGION PROPOSAL FROM OCR GEOMETRY
# ============================================================
def propose_text_regions_from_ocr(ocr, image_shape, image_bgr=None):
    """Group raw OCR quads into candidate text regions by geometry.

    Uses union-find over quad pairs: two quads join when they look like
    the same line, a stacked pair, or are simply near each other — all
    thresholds scaled by the median quad height.  When ``image_bgr`` is
    given, pairs in different detected speech-bubble contours are never
    joined.  Returns four dicts keyed by 1-based region id:
    (region_lines, region_boxes, region_quads, region_indices), ordered
    top-to-bottom by the topmost quad of each region.
    """
    ih, iw = image_shape[:2]
    if not ocr:
        return {}, {}, {}, {}
    boxes = [quad_bbox(x[0]) for x in ocr]
    hs = [max(1, b[3] - b[1]) for b in boxes]
    med_h = float(np.median(hs)) if hs else 14.0
    quad_to_bubble: Dict[int, int] = {}
    if image_bgr is not None:
        # Bubble membership acts as a hard partition during clustering.
        bubble_contours = detect_speech_bubbles(image_bgr)
        quad_to_bubble = build_quad_to_bubble_map(ocr, bubble_contours)
    # Union-find with path halving.
    parent = list(range(len(ocr)))
    def find(x):
        while parent[x] != x:
            parent[x] = parent[parent[x]]
            x = parent[x]
        return x
    def union(a, b):
        ra, rb = find(a), find(b)
        if ra != rb:
            parent[rb] = ra
    for i in range(len(ocr)):
        bi = boxes[i]
        for j in range(i + 1, len(ocr)):
            bj = boxes[j]
            # Never join quads assigned to different bubble contours.
            if quad_to_bubble and not same_bubble_contour(i, j, quad_to_bubble):
                continue
            dx = abs(xyxy_center(bi)[0] - xyxy_center(bj)[0])
            dy = abs(xyxy_center(bi)[1] - xyxy_center(bj)[1])
            hov = horizontal_overlap_ratio(bi, bj)
            vov = vertical_overlap_ratio(bi, bj)
            dist = box_distance(bi, bj)
            # All thresholds scale with the page's median text height.
            same_band = dy <= med_h * 1.4
            stacked = hov >= 0.35 and dy <= med_h * 2.0
            same_line = vov >= 0.45 and dx <= med_h * 3.5
            near = dist <= med_h * 2.8
            if same_line or stacked or (near and (same_band or hov > 0.25)):
                if orientation_compatible(i, j, ocr):
                    union(i, j)
    groups = {}
    for i in range(len(ocr)):
        groups.setdefault(find(i), []).append(i)
    region_lines = {}
    region_boxes = {}
    region_quads = {}
    region_indices = {}
    next_id = 1
    # Emit regions top-to-bottom by the y of their topmost quad.
    for _, idxs in sorted(groups.items(),
                          key=lambda kv: min(boxes[i][1] for i in kv[1])):
        idxs = sorted(idxs, key=lambda i: (boxes[i][1], boxes[i][0]))
        ub = boxes_union_xyxy([boxes[i] for i in idxs])
        if ub is None:
            continue
        region_lines[next_id] = build_lines_from_indices(idxs, ocr)
        # Pad the union box slightly (quarter text height, min 2px).
        region_boxes[next_id] = box_expand(ub, pad=max(2, int(med_h * 0.25)),
                                           iw=iw, ih=ih)
        region_quads[next_id] = [ocr[i][0] for i in idxs]
        region_indices[next_id] = idxs
        next_id += 1
    return region_lines, region_boxes, region_quads, region_indices
# ============================================================
# RECONCILE REGION AND BUBBLE GROUPS
# ============================================================
def reconcile_region_and_bubble_groups(region_lines, region_boxes, region_quads,
                                       region_indices, bubbles, bubble_boxes,
                                       bubble_quads, bubble_indices, ocr):
    """Deduplicate geometry-based regions against bubble-based groups.

    Pools both proposal sets, clusters entries that overlap heavily or
    share quad indices, keeps the best-scoring entry of each cluster,
    and re-keys the survivors top-to-bottom / left-to-right.  Returns
    (lines, boxes, quads, indices) dicts keyed by 1-based id.
    """
    combined = []
    for rid in region_boxes:
        combined.append(("region", rid, region_boxes[rid], region_indices[rid]))
    for bid in bubble_boxes:
        combined.append(("bubble", bid, bubble_boxes[bid], bubble_indices[bid]))
    if not combined:
        return {}, {}, {}, {}
    visited = set()
    kept = []
    def group_score(box, idxs):
        # Favors groups with more quads/words, plausible text, a
        # dialogue-like role, and (slightly) a larger footprint.
        text = normalize_text(" ".join(build_lines_from_indices(idxs, ocr)))
        role = region_text_role_hint(text)
        role_bonus = {
            "dialogue": 0.8,
            "narration": 0.75,
            "reaction": 0.7,
            "sfx": 0.2,
            "unknown": 0.1,
        }.get(role, 0.1)
        box_area = bbox_area_xyxy(box)
        area_bonus = min(1.0, box_area / 50000.0)
        return (
            len(idxs) * 2.0 +
            min(20, len(text.split())) * 0.5 +
            min(1.0, ocr_candidate_score(text)) +
            role_bonus +
            area_bonus * 0.25
        )
    # Greedy single-pass clustering: entry i absorbs any later entry that
    # overlaps it strongly or shares at least one quad index.
    for i in range(len(combined)):
        if i in visited:
            continue
        cluster = [i]
        visited.add(i)
        _, _, box_i, idx_i = combined[i]
        for j in range(i + 1, len(combined)):
            if j in visited:
                continue
            _, _, box_j, idx_j = combined[j]
            ovs = boxes_overlap_ratio(box_i, box_j)
            iou = boxes_iou(box_i, box_j)
            shared = len(set(idx_i).intersection(idx_j))
            if ovs >= 0.70 or iou >= 0.45 or shared > 0:
                cluster.append(j)
                visited.add(j)
        # Keep only the highest-scoring member of each cluster.
        best_idx = max(cluster,
                       key=lambda k: group_score(combined[k][2], combined[k][3]))
        kept.append(combined[best_idx])
    # Re-key survivors in reading order (by center y, then center x).
    kept.sort(key=lambda item: (
        (item[2][1] + item[2][3]) / 2.0,
        (item[2][0] + item[2][2]) / 2.0,
    ))
    out_lines, out_boxes, out_quads, out_indices = {}, {}, {}, {}
    next_id = 1
    for typ, oid, box, idxs in kept:
        idxs = sorted(
            set(idxs),
            key=lambda k: (quad_bbox(ocr[k][0])[1], quad_bbox(ocr[k][0])[0])
        )
        out_lines[next_id] = build_lines_from_indices(idxs, ocr)
        out_boxes[next_id] = box
        out_quads[next_id] = [ocr[k][0] for k in idxs]
        out_indices[next_id] = idxs
        next_id += 1
    return out_lines, out_boxes, out_quads, out_indices
# ============================================================
# MULTI-BUBBLE BOX SPLITTING AND MERGING
# ============================================================
def detect_and_split_multi_bubble_boxes(bubble_boxes, bubble_indices, bubble_quads,
                                        bubbles, ocr, image_bgr):
    """Split boxes that span more than one speech bubble or panel.

    Tries three strategies per multi-quad box, in order, stopping at the
    first that fires: (1) contour membership via split_indices_by_bubble,
    (2) vertical alignment gaps, (3) a left/right split for very wide
    boxes (>10 median text heights) with a clear horizontal gap.
    Returns the four bubble dicts re-keyed with sequential ids.
    """
    all_h = [max(1, quad_bbox(ocr[i][0])[3] - quad_bbox(ocr[i][0])[1])
             for i in range(len(ocr))]
    med_h = float(np.median(all_h)) if all_h else 14.0
    bubble_contours = detect_speech_bubbles(image_bgr)
    new_bubbles, new_boxes, new_quads, new_indices = {}, {}, {}, {}
    next_bid, splits_made = 1, []
    for bid, indices in bubble_indices.items():
        # Single-quad boxes cannot be split; keep them unchanged.
        if len(indices) < 2:
            new_bubbles[next_bid] = bubbles[bid]
            new_boxes[next_bid] = bubble_boxes[bid]
            new_quads[next_bid] = bubble_quads[bid]
            new_indices[next_bid] = indices
            next_bid += 1
            continue
        # Strategy 1: quads belonging to different bubble contours.
        split_groups = split_indices_by_bubble(indices, ocr, bubble_contours)
        if len(split_groups) > 1:
            for group in split_groups:
                if group:
                    new_bubbles[next_bid] = build_lines_from_indices(group, ocr)
                    new_boxes[next_bid] = boxes_union_xyxy(
                        [quad_bbox(ocr[i][0]) for i in group])
                    new_quads[next_bid] = [ocr[i][0] for i in group]
                    new_indices[next_bid] = group
                    next_bid += 1
            splits_made.append(f"BOX#{bid}{len(split_groups)} bubbles")
            continue
        # Strategy 2: large vertical gaps between stacked quad groups.
        vertical_splits = check_vertical_alignment_split(
            indices, ocr, threshold=int(med_h * 2.0))
        if len(vertical_splits) > 1:
            for group in vertical_splits:
                if group:
                    new_bubbles[next_bid] = build_lines_from_indices(group, ocr)
                    new_boxes[next_bid] = boxes_union_xyxy(
                        [quad_bbox(ocr[i][0]) for i in group])
                    new_quads[next_bid] = [ocr[i][0] for i in group]
                    new_indices[next_bid] = group
                    next_bid += 1
            splits_made.append(f"BOX#{bid}{len(vertical_splits)} vertical groups")
            continue
        # Strategy 3: very wide box — split left/right at the median
        # x-center if the two halves are separated by a clear gap.
        box = bubble_boxes[bid]
        x1, y1, x2, y2 = box
        if (x2 - x1) > med_h * 10:
            x_centers = [quad_center(ocr[i][0])[0] for i in indices]
            x_median = np.median(x_centers)
            left_group = [i for i in indices if quad_center(ocr[i][0])[0] < x_median]
            right_group = [i for i in indices if quad_center(ocr[i][0])[0] >= x_median]
            if left_group and right_group:
                left_box = boxes_union_xyxy([quad_bbox(ocr[i][0]) for i in left_group])
                right_box = boxes_union_xyxy([quad_bbox(ocr[i][0]) for i in right_group])
                if right_box[0] - left_box[2] > med_h * 1.5:
                    for grp in [left_group, right_group]:
                        new_bubbles[next_bid] = build_lines_from_indices(grp, ocr)
                        new_boxes[next_bid] = boxes_union_xyxy(
                            [quad_bbox(ocr[i][0]) for i in grp])
                        new_quads[next_bid] = [ocr[i][0] for i in grp]
                        new_indices[next_bid] = grp
                        next_bid += 1
                    splits_made.append(f"BOX#{bid} → 2 horizontal panels")
                    continue
        # No strategy fired: keep the box unchanged.
        new_bubbles[next_bid] = bubbles[bid]
        new_boxes[next_bid] = bubble_boxes[bid]
        new_quads[next_bid] = bubble_quads[bid]
        new_indices[next_bid] = indices
        next_bid += 1
    if splits_made:
        print(f"\n🔧 Split {len(splits_made)} multi-bubble box(es):")
        for s in splits_made:
            print(f"{s}")
    return new_bubbles, new_boxes, new_quads, new_indices
def detect_and_merge_fragmented_bubbles(bubble_boxes, bubble_indices, bubble_quads,
                                        bubbles, ocr, image_bgr):
    """Merge OCR boxes whose centers fall inside the same bubble contour.

    Two boxes are merge candidates when one detected speech-bubble
    contour contains both of their centers AND the centers are close
    (x within 3 median text heights, y within 6).  Candidate pairs are
    grouped transitively and each group is rebuilt as a single box; all
    other boxes are kept as-is.  Returns the four bubble dicts re-keyed
    with sequential ids.
    """
    all_h = [max(1, quad_bbox(ocr[i][0])[3] - quad_bbox(ocr[i][0])[1])
             for i in range(len(ocr))]
    med_h = float(np.median(all_h)) if all_h else 14.0
    bubble_contours = detect_speech_bubbles(image_bgr)
    bids = list(bubble_boxes.keys())
    to_merge = []
    for i in range(len(bids)):
        for j in range(i + 1, len(bids)):
            bid_i, bid_j = bids[i], bids[j]
            box_i, box_j = bubble_boxes[bid_i], bubble_boxes[bid_j]
            cx_i = (box_i[0] + box_i[2]) / 2.0
            cy_i = (box_i[1] + box_i[3]) / 2.0
            cx_j = (box_j[0] + box_j[2]) / 2.0
            cy_j = (box_j[1] + box_j[3]) / 2.0
            in_same_bubble = any(
                cv2.pointPolygonTest(c, (cx_i, cy_i), False) >= 0 and
                cv2.pointPolygonTest(c, (cx_j, cy_j), False) >= 0
                for c in bubble_contours
            )
            if in_same_bubble:
                if abs(cx_i - cx_j) < med_h * 3.0 and abs(cy_i - cy_j) < med_h * 6.0:
                    # Order the pair top-to-bottom for readable logging.
                    to_merge.append(
                        (bid_i, bid_j) if cy_i < cy_j else (bid_j, bid_i))
    if not to_merge:
        return bubbles, bubble_boxes, bubble_quads, bubble_indices
    print(f"\n🔗 Merging {len(to_merge)} fragmented bubble(s):")
    # FIX: the previous grouping stopped at the first group containing
    # either id of a pair, so chained pairs such as (A,B), (C,D), (B,C)
    # left one id in two groups and emitted its quads twice.  Here every
    # existing group that touches the new pair is folded into one set,
    # keeping the grouping transitive and each bid in exactly one group.
    merge_groups = []
    for top, bottom in to_merge:
        fused = {top, bottom}
        remaining = []
        for grp in merge_groups:
            if grp & fused:
                fused |= grp
            else:
                remaining.append(grp)
        remaining.append(fused)
        merge_groups = remaining
    new_bubbles, new_boxes, new_quads, new_indices = {}, {}, {}, {}
    merged_bids, next_bid = set(), 1
    for merge_set in merge_groups:
        merge_list = sorted(merge_set)
        print(f" ✓ Merging: {', '.join(f'#{b}' for b in merge_list)}")
        all_indices = sorted(set(idx for b in merge_list for idx in bubble_indices[b]))
        merged_bids.update(merge_list)
        new_bubbles[next_bid] = build_lines_from_indices(all_indices, ocr)
        new_boxes[next_bid] = boxes_union_xyxy([quad_bbox(ocr[i][0]) for i in all_indices])
        new_quads[next_bid] = [ocr[i][0] for i in all_indices]
        new_indices[next_bid] = all_indices
        next_bid += 1
    # Carry over every box that took part in no merge.
    for bid in bids:
        if bid not in merged_bids:
            new_bubbles[next_bid] = bubbles[bid]
            new_boxes[next_bid] = bubble_boxes[bid]
            new_quads[next_bid] = bubble_quads[bid]
            new_indices[next_bid] = bubble_indices[bid]
            next_bid += 1
    return new_bubbles, new_boxes, new_quads, new_indices
def merge_boxes_by_proximity_and_overlap(bubble_boxes, bubble_indices, bubble_quads,
                                         bubbles, ocr, med_h):
    """Merge vertically adjacent boxes that overlap strongly in x.

    Two boxes merge when their vertical gap is at most 0.8 * med_h and
    their horizontal overlap covers at least 55% of the narrower box.
    Merges chain through ``merged_into`` so a box absorbed by a root can
    pull further boxes into the same root group.  Returns the four
    bubble dicts re-keyed with sequential ids.
    """
    bids = sorted(bubble_boxes.keys())
    merge_map: Dict[int, List[int]] = {}     # root bid -> all bids in its group
    merged_into: Dict[int, int] = {}         # absorbed bid -> its root bid
    for i, bid_i in enumerate(bids):
        if bid_i in merged_into:
            continue
        box_i = bubble_boxes[bid_i]
        wi = max(1, box_i[2] - box_i[0])
        for j in range(i + 1, len(bids)):
            bid_j = bids[j]
            if bid_j in merged_into:
                continue
            box_j = bubble_boxes[bid_j]
            wj = max(1, box_j[2] - box_j[0])
            # Gap between the boxes' vertical extents (0 when overlapping).
            vert_gap = max(0, max(box_i[1], box_j[1]) - min(box_i[3], box_j[3]))
            h_ix1 = max(box_i[0], box_j[0])
            h_ix2 = min(box_i[2], box_j[2])
            h_overlap = max(0, h_ix2 - h_ix1)
            # Overlap measured against the narrower of the two boxes.
            h_overlap_ratio = h_overlap / max(1, min(wi, wj))
            if vert_gap <= med_h * 0.8 and h_overlap_ratio >= 0.55:
                root = merged_into.get(bid_i, bid_i)
                merge_map.setdefault(root, [root])
                if bid_j not in merge_map[root]:
                    merge_map[root].append(bid_j)
                merged_into[bid_j] = root
    if not merge_map:
        return bubbles, bubble_boxes, bubble_quads, bubble_indices
    print(f"\n🔀 Proximity+overlap merge: {len(merge_map)} group(s):")
    new_bubbles, new_boxes, new_quads, new_indices = {}, {}, {}, {}
    processed, next_bid = set(), 1
    for root, group in merge_map.items():
        group_unique = sorted(set(group))
        print(f" ✓ Merging: {', '.join(f'#{b}' for b in group_unique)}")
        all_indices = sorted(set(idx for b in group_unique for idx in bubble_indices[b]))
        new_bubbles[next_bid] = build_lines_from_indices(all_indices, ocr)
        new_boxes[next_bid] = boxes_union_xyxy([quad_bbox(ocr[i][0]) for i in all_indices])
        new_quads[next_bid] = [ocr[i][0] for i in all_indices]
        new_indices[next_bid] = all_indices
        next_bid += 1
        processed.update(group_unique)
    # Carry over every box that took part in no merge.
    for bid in bids:
        if bid not in processed:
            new_bubbles[next_bid] = bubbles[bid]
            new_boxes[next_bid] = bubble_boxes[bid]
            new_quads[next_bid] = bubble_quads[bid]
            new_indices[next_bid] = bubble_indices[bid]
            next_bid += 1
    return new_bubbles, new_boxes, new_quads, new_indices
def _majority_contour_id(indices: list, quad_to_bubble: Dict[int, int]) -> int:
from collections import Counter
ids = [quad_to_bubble.get(i, -1) for i in indices]
valid = [cid for cid in ids if cid != -1]
if not valid:
return -1
return Counter(valid).most_common(1)[0][0]
def merge_continuation_boxes(bubble_boxes, bubble_indices, bubble_quads,
                             bubbles, ocr, image_bgr):
    """Merge vertically stacked boxes that share one bubble contour.

    Pairs merge when both boxes' quads mostly belong to the same bubble
    contour, the vertical gap is at most 3.5 median text heights, and
    they overlap horizontally by at least 20% of the narrower box.
    SFX-tagged boxes never participate.  Each box merges at most once
    per call.  Returns the four bubble dicts re-keyed sequentially.
    """
    all_h = [max(1, quad_bbox(ocr[i][0])[3] - quad_bbox(ocr[i][0])[1])
             for i in range(len(ocr))]
    med_h = float(np.median(all_h)) if all_h else 14.0
    bubble_contours = detect_speech_bubbles(image_bgr)
    quad_to_bubble = build_quad_to_bubble_map(ocr, bubble_contours)
    # Scan boxes top-to-bottom by vertical center.
    bids = sorted(bubble_boxes.keys(),
                  key=lambda b: (bubble_boxes[b][1] + bubble_boxes[b][3]) / 2.0)
    merge_pairs = []
    visited = set()
    for i in range(len(bids)):
        bid_i = bids[i]
        if bid_i in visited:
            continue
        box_i = bubble_boxes[bid_i]
        text_i = normalize_text(" ".join(bubbles.get(bid_i, [])))
        if region_text_role_hint(text_i) == "sfx":
            continue
        for j in range(i + 1, len(bids)):
            bid_j = bids[j]
            if bid_j in visited:
                continue
            box_j = bubble_boxes[bid_j]
            text_j = normalize_text(" ".join(bubbles.get(bid_j, [])))
            if region_text_role_hint(text_j) == "sfx":
                continue
            idx_i = bubble_indices[bid_i]
            idx_j = bubble_indices[bid_j]
            if not idx_i or not idx_j:
                continue
            # Both boxes must sit (by quad majority) in the same contour.
            cid_i = _majority_contour_id(idx_i, quad_to_bubble)
            cid_j = _majority_contour_id(idx_j, quad_to_bubble)
            if cid_i == -1 or cid_j == -1 or cid_i != cid_j:
                continue
            vert_gap = max(0, max(box_i[1], box_j[1]) - min(box_i[3], box_j[3]))
            if vert_gap > med_h * 3.5:
                continue
            h_overlap = max(0, min(box_i[2], box_j[2]) - max(box_i[0], box_j[0]))
            min_w = min(xyxy_width(box_i), xyxy_width(box_j))
            if h_overlap / max(1, min_w) < 0.20:
                continue
            merge_pairs.append((bid_i, bid_j))
            visited.add(bid_i)
            visited.add(bid_j)
            break  # one merge partner per box per pass
    if not merge_pairs:
        return bubbles, bubble_boxes, bubble_quads, bubble_indices
    print(f"\n🔗 Continuation merge: {len(merge_pairs)} pair(s):")
    processed = set()
    new_bubbles, new_boxes, new_quads, new_indices = {}, {}, {}, {}
    next_bid = 1
    for bid_a, bid_b in merge_pairs:
        print(f" ✓ Merging BOX#{bid_a} + BOX#{bid_b}")
        all_idx = sorted(
            set(bubble_indices[bid_a]) | set(bubble_indices[bid_b]),
            key=lambda k: (quad_bbox(ocr[k][0])[1], quad_bbox(ocr[k][0])[0])
        )
        new_bubbles[next_bid] = build_lines_from_indices(all_idx, ocr)
        new_boxes[next_bid] = boxes_union_xyxy([quad_bbox(ocr[i][0]) for i in all_idx])
        new_quads[next_bid] = [ocr[i][0] for i in all_idx]
        new_indices[next_bid] = all_idx
        processed.update({bid_a, bid_b})
        next_bid += 1
    # Carry over every box that took part in no merge.
    for bid in bids:
        if bid not in processed:
            new_bubbles[next_bid] = bubbles[bid]
            new_boxes[next_bid] = bubble_boxes[bid]
            new_quads[next_bid] = bubble_quads[bid]
            new_indices[next_bid] = bubble_indices[bid]
            next_bid += 1
    return new_bubbles, new_boxes, new_quads, new_indices
def merge_same_column_dialogue_boxes(bubble_boxes, bubble_indices, bubble_quads,
                                     bubbles, ocr, image_bgr):
    """Merge stacked dialogue boxes aligned in the same column.

    Pairs merge when the vertical gap is at most 4 median text heights,
    they overlap horizontally by at least 50% of the narrower box, and
    the merged box would not exceed 35% of the image height.  SFX boxes
    never participate; each box merges at most once per call.  Returns
    the four bubble dicts re-keyed sequentially.
    """
    ih, iw = image_bgr.shape[:2]
    all_h = [max(1, quad_bbox(ocr[i][0])[3] - quad_bbox(ocr[i][0])[1])
             for i in range(len(ocr))]
    med_h = float(np.median(all_h)) if all_h else 14.0
    # Scan boxes top-to-bottom by vertical center.
    bids = sorted(bubble_boxes.keys(),
                  key=lambda b: (bubble_boxes[b][1] + bubble_boxes[b][3]) / 2.0)
    merge_pairs = []
    visited = set()
    for i in range(len(bids)):
        bid_i = bids[i]
        if bid_i in visited:
            continue
        box_i = bubble_boxes[bid_i]
        text_i = normalize_text(" ".join(bubbles.get(bid_i, [])))
        if region_text_role_hint(text_i) == "sfx":
            continue
        for j in range(i + 1, len(bids)):
            bid_j = bids[j]
            if bid_j in visited:
                continue
            box_j = bubble_boxes[bid_j]
            text_j = normalize_text(" ".join(bubbles.get(bid_j, [])))
            if region_text_role_hint(text_j) == "sfx":
                continue
            vert_gap = max(0, max(box_i[1], box_j[1]) - min(box_i[3], box_j[3]))
            if vert_gap > med_h * 4.0:
                continue
            h_ov = max(0, min(box_i[2], box_j[2]) - max(box_i[0], box_j[0]))
            min_w = min(xyxy_width(box_i), xyxy_width(box_j))
            if h_ov / max(1, min_w) < 0.50:
                continue
            # Reject merges that would create an implausibly tall box.
            merged_h = max(box_i[3], box_j[3]) - min(box_i[1], box_j[1])
            if merged_h > ih * 0.35:
                continue
            merge_pairs.append((bid_i, bid_j))
            visited.add(bid_i)
            visited.add(bid_j)
            break  # one merge partner per box per pass
    if not merge_pairs:
        return bubbles, bubble_boxes, bubble_quads, bubble_indices
    print(f"\n📐 Same-column dialogue merge: {len(merge_pairs)} pair(s):")
    processed = set()
    new_bubbles, new_boxes, new_quads, new_indices = {}, {}, {}, {}
    next_bid = 1
    for bid_a, bid_b in merge_pairs:
        print(f" ✓ Merging BOX#{bid_a} + BOX#{bid_b}")
        all_idx = sorted(
            set(bubble_indices[bid_a]) | set(bubble_indices[bid_b]),
            key=lambda k: (quad_bbox(ocr[k][0])[1], quad_bbox(ocr[k][0])[0])
        )
        new_bubbles[next_bid] = build_lines_from_indices(all_idx, ocr)
        new_boxes[next_bid] = boxes_union_xyxy([quad_bbox(ocr[i][0]) for i in all_idx])
        new_quads[next_bid] = [ocr[i][0] for i in all_idx]
        new_indices[next_bid] = all_idx
        processed.update({bid_a, bid_b})
        next_bid += 1
    # Carry over every box that took part in no merge.
    for bid in bids:
        if bid not in processed:
            new_bubbles[next_bid] = bubbles[bid]
            new_boxes[next_bid] = bubble_boxes[bid]
            new_quads[next_bid] = bubble_quads[bid]
            new_indices[next_bid] = bubble_indices[bid]
            next_bid += 1
    return new_bubbles, new_boxes, new_quads, new_indices
def auto_fix_bubble_detection(bubble_boxes, bubble_indices, bubble_quads,
                              bubbles, ocr, image_bgr):
    """Run the full chain of split/merge heuristics over detected boxes.

    Applies, in order: multi-bubble splitting, fragmented-bubble merging,
    continuation merging, same-column dialogue merging, then two passes
    of proximity+overlap merging (the second pass picks up merges made
    possible by the first).
    """
    print("\n🔍 Running automatic bubble detection fixes...")
    heights = [max(1, quad_bbox(ocr[i][0])[3] - quad_bbox(ocr[i][0])[1])
               for i in range(len(ocr))]
    med_h = float(np.median(heights)) if heights else 14.0
    image_steps = (
        detect_and_split_multi_bubble_boxes,
        detect_and_merge_fragmented_bubbles,
        merge_continuation_boxes,
        merge_same_column_dialogue_boxes,
    )
    for step in image_steps:
        bubbles, bubble_boxes, bubble_quads, bubble_indices = step(
            bubble_boxes, bubble_indices, bubble_quads, bubbles, ocr, image_bgr)
    for _ in range(2):
        bubbles, bubble_boxes, bubble_quads, bubble_indices = \
            merge_boxes_by_proximity_and_overlap(
                bubble_boxes, bubble_indices, bubble_quads, bubbles, ocr, med_h)
    return bubbles, bubble_boxes, bubble_quads, bubble_indices
def remove_nested_boxes(bubble_boxes, bubble_indices, bubble_quads, bubbles,
                        overlap_threshold=0.50):
    """Drop boxes that heavily overlap, or share quads with, another box.

    When two boxes collide, the one with the smaller pixel area is
    discarded.  The four input dicts are mutated in place and returned.
    """
    def _area(b):
        return max(0, b[2] - b[0]) * max(0, b[3] - b[1])

    ids = list(bubble_boxes.keys())
    doomed = set()
    for pos, bid_a in enumerate(ids):
        if bid_a in doomed:
            continue
        box_a = bubble_boxes[bid_a]
        for bid_b in ids[pos + 1:]:
            if bid_b in doomed:
                continue
            box_b = bubble_boxes[bid_b]
            collides = (
                boxes_overlap_ratio(box_a, box_b) > overlap_threshold
                or bool(set(bubble_indices[bid_a]) & set(bubble_indices[bid_b]))
            )
            if not collides:
                continue
            # Keep the larger box; ties keep the earlier one.
            if _area(box_a) >= _area(box_b):
                doomed.add(bid_b)
                print(f" 🗑️ Removing BOX#{bid_b} (overlaps BOX#{bid_a})")
            else:
                doomed.add(bid_a)
                print(f" 🗑️ Removing BOX#{bid_a} (overlaps BOX#{bid_b})")
                break  # bid_a is gone; stop comparing against it
    if doomed:
        print(f"\n🧹 Removed {len(doomed)} overlapping/nested box(es)")
    for bid in doomed:
        bubble_boxes.pop(bid, None)
        bubble_indices.pop(bid, None)
        bubble_quads.pop(bid, None)
        bubbles.pop(bid, None)
    return bubbles, bubble_boxes, bubble_quads, bubble_indices
def enforce_max_box_size(bubble_boxes, bubble_indices, bubble_quads, bubbles, ocr,
                         max_width_ratio=0.6, max_height_ratio=0.5, image_shape=None):
    """Split boxes that exceed a fraction of the image dimensions.

    Oversized boxes (wider than max_width_ratio * image width or taller
    than max_height_ratio * image height) are first split by columns
    (aggressive thresholds), then by rows; boxes that cannot be split
    are kept as-is.  Returns the four bubble dicts re-keyed sequentially.
    No-op when ``image_shape`` is None.
    """
    if image_shape is None:
        return bubbles, bubble_boxes, bubble_quads, bubble_indices
    ih, iw = image_shape[:2]
    max_width = iw * max_width_ratio
    max_height = ih * max_height_ratio
    new_bubbles, new_boxes, new_quads, new_indices = {}, {}, {}, {}
    next_bid, splits_made = 1, []
    for bid, box in bubble_boxes.items():
        x1, y1, x2, y2 = box
        w, h = x2 - x1, y2 - y1
        if w > max_width or h > max_height:
            indices = bubble_indices[bid]
            # First try a column split (aggressive thresholds)...
            col_split = split_bubble_if_multiple_columns(
                indices, ocr, bid=bid, use_aggressive_thresholds=True)
            if col_split:
                for grp in col_split:
                    new_bubbles[next_bid] = build_lines_from_indices(grp, ocr)
                    new_boxes[next_bid] = boxes_union_xyxy(
                        [quad_bbox(ocr[i][0]) for i in grp])
                    new_quads[next_bid] = [ocr[i][0] for i in grp]
                    new_indices[next_bid] = grp
                    next_bid += 1
                splits_made.append(f"BOX#{bid} (oversized: {w}x{h}px)")
                continue
            # ...then fall back to a row split.
            row_split = split_bubble_if_multiple_rows(indices, ocr, bid=bid)
            if row_split:
                for grp in row_split:
                    new_bubbles[next_bid] = build_lines_from_indices(grp, ocr)
                    new_boxes[next_bid] = boxes_union_xyxy(
                        [quad_bbox(ocr[i][0]) for i in grp])
                    new_quads[next_bid] = [ocr[i][0] for i in grp]
                    new_indices[next_bid] = grp
                    next_bid += 1
                splits_made.append(f"BOX#{bid} (oversized: {w}x{h}px)")
                continue
        # Within limits, or unsplittable: keep unchanged.
        new_bubbles[next_bid] = bubbles[bid]
        new_boxes[next_bid] = box
        new_quads[next_bid] = bubble_quads[bid]
        new_indices[next_bid] = bubble_indices[bid]
        next_bid += 1
    if splits_made:
        print(f"\n📏 Split {len(splits_made)} oversized box(es):")
        for s in splits_made:
            print(f"{s}")
    return new_bubbles, new_boxes, new_quads, new_indices
def should_merge_groups(group1_indices, group2_indices, ocr, median_height,
                        max_vertical_gap=None):
    """Decide whether two quad groups look like one vertically-split bubble.

    Requires roughly aligned horizontal centers (within 1.8 median text
    heights) and a vertical gap no larger than ``max_vertical_gap``
    (default: 2.5 median text heights).
    """
    gap_limit = median_height * 2.5 if max_vertical_gap is None else max_vertical_gap
    union_a = boxes_union_xyxy([quad_bbox(ocr[i][0]) for i in group1_indices])
    union_b = boxes_union_xyxy([quad_bbox(ocr[i][0]) for i in group2_indices])
    if union_a is None or union_b is None:
        return False
    center_a = (union_a[0] + union_a[2]) / 2.0
    center_b = (union_b[0] + union_b[2]) / 2.0
    if abs(center_a - center_b) > median_height * 1.8:
        return False
    gap = max(0, max(union_a[1], union_b[1]) - min(union_a[3], union_b[3]))
    return gap <= gap_limit
# ============================================================
# CONTOUR-AWARE BUBBLE SPLITTING
# ============================================================
def split_group_by_contour_membership(indices: list, ocr: list,
                                      quad_to_bubble: Dict[int, int]) -> List[List[int]]:
    """Partition quad indices by their assigned bubble-contour id.

    Indices mapped to the same contour (including the unassigned id -1)
    stay together.  Partitions come back in ascending contour-id order,
    each sorted top-to-bottom then left-to-right.  A single partition
    means no split was needed.
    """
    by_contour: Dict[int, List[int]] = {}
    for idx in indices:
        by_contour.setdefault(quad_to_bubble.get(idx, -1), []).append(idx)
    if len(by_contour) <= 1:
        return [indices]
    ordered = []
    for cid in sorted(by_contour):
        members = sorted(by_contour[cid],
                         key=lambda i: (quad_bbox(ocr[i][0])[1],
                                        quad_bbox(ocr[i][0])[0]))
        ordered.append(members)
    return ordered
def split_group_by_region_type(indices: list, ocr: list) -> List[List[int]]:
    """Separate sound-effect quads from dialogue-like quads in one group.

    Only splits when the group mixes 'sfx' text with dialogue, narration
    or reaction text; otherwise the original group is returned intact.
    Split groups come back in a fixed role order, each sorted
    top-to-bottom then left-to-right.
    """
    if len(indices) <= 1:
        return [indices]
    role_buckets: Dict[str, List[int]] = {}
    for idx in indices:
        role = region_text_role_hint(normalize_text(ocr[idx][1]))
        role_buckets.setdefault(role, []).append(idx)
    has_sfx = "sfx" in role_buckets
    has_dialogue_like = any(
        r in role_buckets for r in ("dialogue", "narration", "reaction"))
    if not (has_sfx and has_dialogue_like):
        return [indices]
    ordered_groups = []
    for role in ("dialogue", "narration", "reaction", "sfx", "unknown"):
        members = role_buckets.get(role)
        if members:
            ordered_groups.append(sorted(
                members,
                key=lambda i: (quad_bbox(ocr[i][0])[1], quad_bbox(ocr[i][0])[0])
            ))
    return ordered_groups if len(ordered_groups) > 1 else [indices]
def split_group_by_spatial_gap(indices: list, ocr: list,
                               gap_factor: float = 1.2) -> List[List[int]]:
    """Recursively split a quad group at its widest spatial gap.

    Computes adaptive gap thresholds on both axes from the median quad
    height and the median inter-quad gap, tries a horizontal (x) split
    first, then a vertical (y) split, and recurses into each side until
    no gap exceeds its threshold.  Returns [indices] when nothing splits.
    """
    if len(indices) <= 1:
        return [indices]
    all_h = [max(1, quad_bbox(ocr[i][0])[3] - quad_bbox(ocr[i][0])[1])
             for i in indices]
    med_h = float(np.median(all_h)) if all_h else 14.0
    sorted_by_y = sorted(indices, key=lambda i: quad_bbox(ocr[i][0])[1])
    # Collect positive vertical gaps between consecutive quads to adapt
    # the y-threshold to the group's own line spacing.
    inter_gaps_y = []
    for k in range(len(sorted_by_y) - 1):
        b_curr = quad_bbox(ocr[sorted_by_y[k]][0])
        b_next = quad_bbox(ocr[sorted_by_y[k+1]][0])
        gap = b_next[1] - b_curr[3]
        if gap > 0:
            inter_gaps_y.append(gap)
    if inter_gaps_y:
        median_inter = float(np.median(inter_gaps_y))
        gap_threshold_y = min(med_h * gap_factor,
                              max(med_h * 0.8, median_inter * 2.5))
    else:
        gap_threshold_y = med_h * gap_factor
    sorted_by_x = sorted(indices, key=lambda i: quad_bbox(ocr[i][0])[0])
    boxes_x = [quad_bbox(ocr[i][0]) for i in sorted_by_x]
    # Same adaptive threshold construction for the x axis.
    inter_gaps_x = []
    for k in range(len(sorted_by_x) - 1):
        gap = boxes_x[k+1][0] - boxes_x[k][2]
        if gap > 0:
            inter_gaps_x.append(gap)
    if inter_gaps_x:
        median_inter_x = float(np.median(inter_gaps_x))
        gap_threshold_x = min(med_h * gap_factor,
                              max(med_h * 0.8, median_inter_x * 2.5))
    else:
        gap_threshold_x = med_h * gap_factor
    # Horizontal split first: cut at the single widest x-gap above threshold.
    best_h_gap, best_h_split = 0.0, None
    for k in range(len(sorted_by_x) - 1):
        gap = boxes_x[k + 1][0] - boxes_x[k][2]
        if gap > gap_threshold_x and gap > best_h_gap:
            best_h_gap = gap
            best_h_split = k
    if best_h_split is not None:
        left = [sorted_by_x[i] for i in range(best_h_split + 1)]
        right = [sorted_by_x[i] for i in range(best_h_split + 1, len(sorted_by_x))]
        if left and right:
            # Recurse into both halves to find further splits.
            return (split_group_by_spatial_gap(left, ocr, gap_factor) +
                    split_group_by_spatial_gap(right, ocr, gap_factor))
    # Then vertical split: cut at the single widest y-gap above threshold.
    boxes_y = [quad_bbox(ocr[i][0]) for i in sorted_by_y]
    best_v_gap, best_v_split = 0.0, None
    for k in range(len(sorted_by_y) - 1):
        gap = boxes_y[k + 1][1] - boxes_y[k][3]
        if gap > gap_threshold_y and gap > best_v_gap:
            best_v_gap = gap
            best_v_split = k
    if best_v_split is not None:
        top = [sorted_by_y[i] for i in range(best_v_split + 1)]
        bottom = [sorted_by_y[i] for i in range(best_v_split + 1, len(sorted_by_y))]
        if top and bottom:
            return (split_group_by_spatial_gap(top, ocr, gap_factor) +
                    split_group_by_spatial_gap(bottom, ocr, gap_factor))
    return [indices]
def split_at_sentence_boundaries(
    indices: List[int],
    lines: List[str],
    ocr: List[Tuple],
    min_gap_px: int = 8
) -> List[List[int]]:
    """
    Split a flat list of quad indices at sentence-ending punctuation
    boundaries IF there is a measurable vertical gap between the last
    quad of sentence N and the first quad of sentence N+1.
    Returns a list of groups (each group is a List[int] of indices).
    Always returns at least one group (the original) if no split fires.

    NOTE(review): ``lines`` is accepted but unused — the text is rebuilt
    from ``ocr`` directly; confirm whether callers rely on it.
    """
    if not indices or len(indices) < 2:
        return [indices]
    # Sort quads top-to-bottom by their y coordinate
    sorted_idx = sorted(indices, key=lambda i: quad_bbox(ocr[i][0])[1])
    # Rebuild full text in reading order
    full_text = " ".join(ocr[i][1] for i in sorted_idx)
    # Fix common OCR mangling: trailing I after ALL-CAPS word → !
    # e.g. "LIKE THISI IF" → "LIKE THIS! IF"
    full_text = re.sub(r'([A-Z]{2,})I(\s+[A-Z])', r'\1! \2', full_text)
    full_text = re.sub(r'([A-Z]{2,})I$', r'\1!', full_text)
    # Find ALL sentence boundaries, not just the first one
    boundary_positions = [
        m.start() for m in re.finditer(r'[.!?]\s+[A-Z]', full_text)
    ]
    if not boundary_positions:
        return [indices]
    # Map each boundary character position → quad position in sorted_idx
    boundary character position → quad position in sorted_idx
    split_after_positions = []
    for boundary_pos in boundary_positions:
        char_cursor = 0
        for pos, i in enumerate(sorted_idx):
            char_cursor += len(ocr[i][1]) + 1  # +1 for the joining space
            if char_cursor >= boundary_pos + 2:
                # Only a valid split if not at the very last quad
                if pos < len(sorted_idx) - 1:
                    split_after_positions.append(pos)
                break
    if not split_after_positions:
        return [indices]
    # Deduplicate and sort
    split_after_positions = sorted(set(split_after_positions))
    # Validate each candidate with a vertical gap check
    confirmed_splits = []
    for pos in split_after_positions:
        bbox_a = quad_bbox(ocr[sorted_idx[pos]][0])
        bbox_b = quad_bbox(ocr[sorted_idx[pos + 1]][0])
        bottom_a = bbox_a[1] + bbox_a[3]  # y + h of last quad in group A
        top_b = bbox_b[1]                 # y of first quad in group B
        gap = top_b - bottom_a
        if gap >= min_gap_px:
            confirmed_splits.append(pos)
    if not confirmed_splits:
        return [indices]
    # Slice sorted_idx into groups at each confirmed split point
    groups = []
    prev_pos = 0
    for split_pos in confirmed_splits:
        groups.append(sorted_idx[prev_pos : split_pos + 1])
        prev_pos = split_pos + 1
    groups.append(sorted_idx[prev_pos:])  # remainder
    # Drop any empty groups (safety)
    return [g for g in groups if g]
def apply_contour_split_to_all_boxes(bubble_boxes, bubble_indices, bubble_quads,
                                     bubbles, ocr, image_bgr):
    """
    Pre-pass that runs BEFORE proximity merging.
    Chains four split strategies in order:
      1. Contour membership — different speech-bubble contours
      2. Mixed region type  — sfx quads merged with dialogue quads
      3. Spatial gap        — two dialogue bubbles side-by-side
      4. Sentence boundary  — tall box containing two stacked bubbles
    Returns rebuilt (bubbles, boxes, quads, indices) dicts with fresh ids.
    """
    contours = detect_speech_bubbles(image_bgr)
    quad_map = build_quad_to_bubble_map(ocr, contours) if contours else {}

    out_bubbles, out_boxes, out_quads, out_indices = {}, {}, {}, {}
    fresh_bid = 1
    split_log = []

    for bid in sorted(bubble_boxes.keys()):
        members = bubble_indices[bid]

        # Strategy 1: contour membership
        parts = split_group_by_contour_membership(members, ocr, quad_map)
        # Strategy 2: mixed region type
        parts = [p for grp in parts
                 for p in split_group_by_region_type(grp, ocr)]
        # Strategy 3: spatial gap
        parts = [p for grp in parts
                 for p in split_group_by_spatial_gap(grp, ocr, gap_factor=1.8)]
        # Strategy 4: sentence boundary
        # Signature: (indices, lines, ocr, min_gap_px) → List[List[int]]
        staged = []
        for grp in parts:
            staged.extend(split_at_sentence_boundaries(
                grp,
                [normalize_text(ocr[i][1]) for i in grp],
                ocr,
                min_gap_px=8))
        parts = staged

        # No strategy fired — carry the original box through unchanged.
        if len(parts) <= 1:
            out_bubbles[fresh_bid] = bubbles[bid]
            out_boxes[fresh_bid] = bubble_boxes[bid]
            out_quads[fresh_bid] = bubble_quads[bid]
            out_indices[fresh_bid] = members
            fresh_bid += 1
            continue

        # Commit each non-empty sub-group as its own bubble.
        for grp in parts:
            if not grp:
                continue
            out_bubbles[fresh_bid] = build_lines_from_indices(grp, ocr)
            out_boxes[fresh_bid] = boxes_union_xyxy(
                [quad_bbox(ocr[i][0]) for i in grp])
            out_quads[fresh_bid] = [ocr[i][0] for i in grp]
            out_indices[fresh_bid] = grp
            fresh_bid += 1
        split_log.append(f"BOX#{bid}{len(parts)} groups")

    if split_log:
        print(f"\n✂️ Contour-aware pre-split: {len(split_log)} box(es) split:")
        for entry in split_log:
            print(f"{entry}")
    return out_bubbles, out_boxes, out_quads, out_indices
# ============================================================
# SPLIT HELPERS FOR enforce_max_box_size
# ============================================================
def split_bubble_if_multiple_columns(indices, ocr, bid=None,
                                     use_aggressive_thresholds=False):
    """
    Return [left_group, right_group] when a wide horizontal gap separates
    the quads into two columns; otherwise None.
    """
    if len(indices) < 2:
        return None
    heights = []
    for i in indices:
        bx = quad_bbox(ocr[i][0])
        heights.append(max(1, bx[3] - bx[1]))
    median_h = float(np.median(heights)) if heights else 14.0
    # Aggressive mode lowers the gap threshold.
    gap_factor = 1.5 if use_aggressive_thresholds else 2.5
    pair = detect_horizontal_gap_in_group(indices, ocr, median_h,
                                          gap_factor=gap_factor)
    if pair is None:
        return None
    left_group, right_group = pair
    if left_group and right_group:
        return [left_group, right_group]
    return None
def split_bubble_if_multiple_rows(indices, ocr, bid=None):
    """
    Return the row groups when vertical alignment splits the bubble into
    more than one stack; otherwise None.
    """
    if len(indices) < 2:
        return None
    heights = [max(1, quad_bbox(ocr[i][0])[3] - quad_bbox(ocr[i][0])[1])
               for i in indices]
    median_h = float(np.median(heights)) if heights else 14.0
    rows = check_vertical_alignment_split(indices, ocr,
                                          threshold=int(median_h * 2.5))
    return rows if len(rows) > 1 else None
# ============================================================
# BUILD LINES FROM INDICES
# ============================================================
def build_lines_from_indices(indices, ocr, reading_mode="ltr"):
    """Render a group of quad indices back into ordered text lines ([] for an empty group)."""
    if indices:
        return build_text_from_layout(indices, ocr, reading_mode=reading_mode)
    return []
def split_indices_into_vertical_blocks(indices, ocr, gap_factor=4.0):
    """
    Partition quad indices into vertically separated blocks; a new block
    starts whenever the gap between consecutive quads exceeds
    ``gap_factor`` × the median quad height.
    (FIX A: gap_factor raised from 2.5 → 4.0.)
    """
    if not indices:
        return []
    heights = [max(1, quad_bbox(i_q[0])[3] - quad_bbox(i_q[0])[1])
               for i_q in (ocr[i] for i in indices)]
    median_h = float(np.median(heights)) if heights else 14.0
    threshold = median_h * gap_factor
    ordered = sorted(indices, key=lambda i: (quad_bbox(ocr[i][0])[1],
                                             quad_bbox(ocr[i][0])[0]))
    blocks = [[ordered[0]]]
    for prev, curr in zip(ordered, ordered[1:]):
        vertical_gap = quad_bbox(ocr[curr][0])[1] - quad_bbox(ocr[prev][0])[3]
        if vertical_gap > threshold:
            blocks.append([])
        blocks[-1].append(curr)
    return blocks
# ============================================================
# ENHANCED OCR ENGINE
# ============================================================
class ImprovedMacVisionDetector:
    """
    OCR engine backed by Apple's Vision framework (macOS only).

    Runs text recognition over several preprocessed variants of the page
    and merges the per-variant results, keeping the highest-confidence
    transcription for overlapping detections.
    """

    def __init__(self, source_lang="en"):
        """Resolve *source_lang* (code or English name) to a Vision recognition locale."""
        lang_key = source_lang.lower().strip()
        # User-facing language codes/names → Vision recognition locales.
        lang_map = {
            "en": "en-US", "english": "en-US",
            "es": "es-ES", "spanish": "es-ES",
            "ca": "ca-ES", "catalan": "ca-ES",
            "fr": "fr-FR", "french": "fr-FR",
            "ja": "ja-JP", "japanese": "ja-JP",
            "it": "it-IT", "italian": "it-IT",
            "de": "de-DE", "german": "de-DE",
            "ko": "ko-KR", "korean": "ko-KR",
            "zh": "zh-Hans", "chinese": "zh-Hans",
        }
        self.langs = [lang_map.get(lang_key, "en-US")]
        print(f"⚡ Using Enhanced Apple Vision OCR (Language: {self.langs[0]})")

    def preprocess_variants(self, image_bgr):
        """Return (name, image) preprocessing variants, all upscaled ~2.5×."""
        variants = [("enhanced", enhance_image_for_ocr(image_bgr, upscale_factor=2.5))]
        gray = cv2.cvtColor(image_bgr, cv2.COLOR_BGR2GRAY)
        # Otsu binarisation recovers low-contrast lettering.
        _, hc = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
        variants.append(("high_contrast",
                         cv2.cvtColor(
                             cv2.resize(hc, None, fx=2.5, fy=2.5,
                                        interpolation=cv2.INTER_CUBIC),
                             cv2.COLOR_GRAY2BGR)))
        # Edge-preserving denoise.
        variants.append(("bilateral",
                         cv2.resize(
                             cv2.bilateralFilter(image_bgr, 9, 75, 75),
                             None, fx=2.5, fy=2.5, interpolation=cv2.INTER_CUBIC)))
        # Inverted copy catches white-on-black (reversed) text.
        variants.append(("inverted",
                         cv2.resize(
                             cv2.bitwise_not(image_bgr),
                             None, fx=2.5, fy=2.5, interpolation=cv2.INTER_CUBIC)))
        variants.append(("original",
                         cv2.resize(image_bgr, None, fx=2.5, fy=2.5,
                                    interpolation=cv2.INTER_CUBIC)))
        return variants

    def run_vision_ocr(self, image_bgr):
        """
        Run Vision text recognition on one BGR image.

        Returns a list of (quad, text, confidence) tuples; quads are
        axis-aligned 4-point [[x, y], ...] rectangles in pixel coordinates.
        """
        if image_bgr is None or image_bgr.size == 0:
            return []
        ih, iw = image_bgr.shape[:2]
        success, buffer = cv2.imencode('.png', image_bgr)
        if not success:
            return []
        ns_data = NSData.dataWithBytes_length_(buffer.tobytes(), len(buffer))
        # FIX: guard CGImageSource creation — the original dereferenced a
        # possibly-None source/image and would crash on malformed data.
        source = Quartz.CGImageSourceCreateWithData(ns_data, None)
        if source is None:
            return []
        cg_image = Quartz.CGImageSourceCreateImageAtIndex(source, 0, None)
        if cg_image is None:
            return []
        request = Vision.VNRecognizeTextRequest.alloc().init()
        # NOTE(review): 1 == VNRequestTextRecognitionLevelFast; 0 (accurate)
        # may be intended for this quality-focused pipeline — confirm.
        request.setRecognitionLevel_(1)
        request.setUsesLanguageCorrection_(True)
        request.setRecognitionLanguages_(self.langs)
        handler = Vision.VNImageRequestHandler.alloc().initWithCGImage_options_(
            cg_image, {})
        handler.performRequests_error_([request], None)
        results = []
        # Vision reports normalized, bottom-left-origin boxes; convert to
        # top-left-origin pixel coordinates.
        scale_x, scale_y = iw, ih
        for obs in (request.results() or []):
            bbox = obs.boundingBox()
            x1 = int(bbox.origin.x * scale_x)
            y1 = int((1 - bbox.origin.y - bbox.size.height) * scale_y)
            x2 = int((bbox.origin.x + bbox.size.width) * scale_x)
            y2 = int((1 - bbox.origin.y) * scale_y)
            x1, y1 = max(0, x1), max(0, y1)
            x2, y2 = min(iw, x2), min(ih, y2)
            if x2 <= x1 or y2 <= y1:
                continue
            # FIX: call topCandidates_(1) once per observation — the
            # original re-ran candidate extraction three times (for the
            # emptiness check, the text, and the confidence).
            candidates = obs.topCandidates_(1)
            if candidates:
                text = candidates[0].string()
                conf = float(candidates[0].confidence())
            else:
                text, conf = "", 0.0
            quad = [[x1, y1], [x2, y1], [x2, y2], [x1, y2]]
            results.append((quad, text, conf))
        return results

    def detect(self, image_bgr):
        """OCR all preprocessing variants and merge their results into one list."""
        if image_bgr is None or image_bgr.size == 0:
            return []
        variants = self.preprocess_variants(image_bgr)
        all_results = []
        variant_names = []
        oh, ow = image_bgr.shape[:2]  # hoisted: invariant across variants
        for name, variant_img in variants:
            try:
                res = self.run_vision_ocr(variant_img)
                vh, vw = variant_img.shape[:2]
                # Scale variant-space quads back to original image space.
                sx, sy = ow / max(1, vw), oh / max(1, vh)
                scaled = []
                for quad, text, conf in res:
                    sq = [[int(p[0] * sx), int(p[1] * sy)] for p in quad]
                    scaled.append((sq, text, conf))
                all_results.append(scaled)
                variant_names.append(name)
            except Exception as e:
                # Best-effort: a failing variant is skipped, not fatal.
                print(f" ⚠️ Variant '{name}' failed: {e}")
        if not all_results:
            return []
        return self._merge_variant_results(all_results, variant_names)

    def _merge_variant_results(self, all_results, variant_names):
        """
        Merge per-variant detections onto the variant with the most hits.

        FIX E: use self.langs[0] locale for is_meaningful_text().
        """
        if not all_results:
            return []
        lang_code = self.langs[0].split("-")[0].lower()
        # The variant with the most detections becomes the merge base.
        base_idx = max(range(len(all_results)), key=lambda i: len(all_results[i]))
        base = list(all_results[base_idx])
        others = [r for i, r in enumerate(all_results) if i != base_idx]
        for other in others:
            for quad_o, text_o, conf_o in other:
                box_o = quad_bbox(quad_o)
                matched = False
                for k, (quad_b, text_b, conf_b) in enumerate(base):
                    box_b = quad_bbox(quad_b)
                    if boxes_iou(box_o, box_b) > 0.40:
                        # Keep the base geometry but adopt the more
                        # confident transcription.
                        if conf_o > conf_b:
                            base[k] = (quad_b, text_o, conf_o)
                        matched = True
                        break
                if not matched and is_meaningful_text(text_o, lang_code):
                    base.append((quad_o, text_o, conf_o))
        return base
# ============================================================
# MAIN PIPELINE ENTRY POINT
# ============================================================
def process_manga_page(image_path: str,
                       source_lang: str = "en",
                       target_lang: str = "ca",
                       output_json: str = None,
                       output_txt: str = None) -> Dict[str, Any]:
    """
    Full pipeline for one manga page:
    OCR → filter → region grouping → splitting/merging → classification
    → OCR correction → translation → optional JSON/TXT output.

    Parameters
    ----------
    image_path  : page image file; raises FileNotFoundError if unreadable.
    source_lang : OCR / translation source language code.
    target_lang : translation target language code.
    output_json : optional path for the JSON dump.
    output_txt  : optional path for the pipe-delimited TXT dump.

    Returns
    -------
    Dict keyed by bubble id (str) with per-region OCR text, translation,
    flags, confidence and box geometry; {} when nothing survives filtering.
    """
    print(f"\n{'='*60}")
    print(f"📖 Processing: {os.path.basename(image_path)}")
    print(f"{'='*60}")
    image_bgr = cv2.imread(image_path)
    if image_bgr is None:
        raise FileNotFoundError(f"Cannot load image: {image_path}")
    ih, iw = image_bgr.shape[:2]
    print(f" Image size: {iw}×{ih}px")
    # ── Step 2: OCR ──────────────────────────────────────────
    detector = ImprovedMacVisionDetector(source_lang=source_lang)
    raw_ocr = detector.detect(image_bgr)
    print(f" Raw OCR detections: {len(raw_ocr)}")
    # ── Step 3: Filter ───────────────────────────────────────
    filtered_ocr = []
    for quad, text, conf in raw_ocr:
        text_clean = fix_common_ocr_errors(text)
        if not is_meaningful_text(text_clean, source_lang):
            continue
        if not is_valid_language(text_clean, source_lang):
            continue
        filtered_ocr.append((quad, text_clean, conf))
    filtered_ocr, _ = validate_and_split_oversized_quads(image_bgr, filtered_ocr)
    if not filtered_ocr:
        print(" ⚠️ No valid OCR results after filtering.")
        return {}
    print(f" Filtered OCR detections: {len(filtered_ocr)}")
    # Normalize every entry to a plain (quad, text, conf) 3-tuple.
    ocr = [(item[0], item[1], item[2]) for item in filtered_ocr]
    all_h = [max(1, quad_bbox(ocr[i][0])[3] - quad_bbox(ocr[i][0])[1])
             for i in range(len(ocr))]
    med_h = float(np.median(all_h)) if all_h else 14.0
    # ── Step 4: Pre-split wide quads ─────────────────────────
    ocr_list, _ = apply_column_gap_splits(image_bgr, ocr, med_h)
    ocr = ocr_list
    # Recompute med_h after potential splits
    all_h = [max(1, quad_bbox(ocr[i][0])[3] - quad_bbox(ocr[i][0])[1])
             for i in range(len(ocr))]
    med_h = float(np.median(all_h)) if all_h else 14.0
    # ── Step 5: Propose regions (contour-aware) ──────────────
    region_lines, region_boxes, region_quads, region_indices = \
        propose_text_regions_from_ocr(ocr, image_bgr.shape, image_bgr=image_bgr)
    print(f" Proposed regions: {len(region_boxes)}")
    # ── Step 6: Contour-aware pre-split ──────────────────────
    region_lines, region_boxes, region_quads, region_indices = \
        apply_contour_split_to_all_boxes(
            region_boxes, region_indices, region_quads,
            region_lines, ocr, image_bgr)
    print(f" Regions after contour split: {len(region_boxes)}")
    # ── Step 7: Auto-fix bubble detection ────────────────────
    region_lines, region_boxes, region_quads, region_indices = \
        auto_fix_bubble_detection(
            region_boxes, region_indices, region_quads,
            region_lines, ocr, image_bgr)
    print(f" Regions after auto-fix: {len(region_boxes)}")
    # ── Step 8: Reconcile region + bubble groups ─────────────
    # NOTE(review): the same region grouping is passed for both argument
    # sets — presumably intentional because only one grouping exists at
    # this point; confirm against the reconcile function's signature.
    out_lines, out_boxes, out_quads, out_indices = \
        reconcile_region_and_bubble_groups(
            region_lines, region_boxes, region_quads, region_indices,
            region_lines, region_boxes, region_quads, region_indices,
            ocr)
    print(f" Boxes after reconciliation: {len(out_boxes)}")
    # ── Step 9: Remove nested / duplicate boxes ───────────────
    out_lines, out_boxes, out_quads, out_indices = \
        remove_nested_boxes(out_boxes, out_indices, out_quads, out_lines,
                            overlap_threshold=0.50)
    print(f" Boxes after dedup: {len(out_boxes)}")
    # ── Step 10: Enforce max box size ─────────────────────────
    out_lines, out_boxes, out_quads, out_indices = \
        enforce_max_box_size(out_boxes, out_indices, out_quads, out_lines,
                             ocr, image_shape=image_bgr.shape)
    print(f" Boxes after size enforcement: {len(out_boxes)}")
    # ── Step 11 + 12: Classify, correct, score ────────────────
    translator = GoogleTranslator(source=source_lang, target=target_lang)
    results: Dict[str, Any] = {}
    # Reading order: sort by box centre, top-to-bottom then left-to-right.
    bid_order = sorted(
        out_boxes.keys(),
        key=lambda b: (
            (out_boxes[b][1] + out_boxes[b][3]) / 2.0,
            (out_boxes[b][0] + out_boxes[b][2]) / 2.0,
        )
    )
    for order_idx, bid in enumerate(bid_order, start=1):
        box = out_boxes[bid]
        indices = out_indices[bid]
        lines = out_lines[bid]
        raw_text = normalize_text(" ".join(lines))
        if not raw_text:
            continue
        # Classify
        region_type = classify_region_type(image_bgr, box, lines)
        # Correct OCR
        corrected_text, correction_gain = correct_region_text(raw_text, region_type)
        # Apply bold-font fixes on top of dialogue correction
        corrected_text = fix_common_ocr_errors(corrected_text)
        # 👉 INJECTED FIX: Adjust the box if words were added
        adjusted_box_xyxy = adjust_box_for_added_text(box, raw_text, corrected_text)
        # Confidence (using the adjusted box)
        conf = compute_region_confidence(
            raw_text, corrected_text, adjusted_box_xyxy, region_type, image_bgr)
        conf = maybe_conf_floor_for_protected(corrected_text, conf)
        # Flags
        flags = build_region_flags(raw_text, corrected_text, region_type, conf)
        # Bubble groups (lines as rendered in the bubble)
        bubble_groups = build_text_from_layout(indices, ocr)
        # ── Step 13: Translate ────────────────────────────────
        translated = ""
        translation_input = corrected_text
        if region_type not in {"sfx"} and is_meaningful_text(corrected_text, source_lang):
            try:
                raw_translation = translator.translate(translation_input)
                translated = postprocess_translation_general(raw_translation or "")
            except Exception as e:
                # Best-effort: fall back to the corrected source text
                # rather than dropping the bubble.
                print(f" ⚠️ Translation failed for BOX#{bid}: {e}")
                translated = corrected_text
        # FIX: removed unused local `bubble_groups_str` — the JSON output
        # stores the list itself and the TXT writer joins it on its own.
        # Determine OCR source label
        ocr_source = "vision-base"
        if correction_gain > 0.05:
            ocr_source = "vision-reread"
        # Add BUBBLE / SEGMENTED flags
        if bubble_groups and len(bubble_groups) > 1:
            if "BUBBLE" not in flags:
                flags.append("BUBBLE")
            if "SEGMENTED" not in flags:
                flags.append("SEGMENTED")
        results[str(bid)] = {
            "order": order_idx,
            "region_type": region_type,
            "confidence": round(conf, 4),
            "ocr_source": ocr_source,
            "raw_ocr": raw_text,
            "corrected_ocr": corrected_text,
            "translation_input": translation_input,
            "translated": translated,
            "flags": flags,
            "bubble_groups": bubble_groups,
            "box": xyxy_to_xywh(adjusted_box_xyxy),  # <--- Uses the adjusted box
            "lines": bubble_groups,
        }
    print(f"\n ✅ Processed {len(results)} text region(s).")
    # ── Step 14: Write outputs ────────────────────────────────
    if output_json:
        _write_json_output(results, output_json)
    if output_txt:
        _write_txt_output(results, output_txt)
    return results
# ============================================================
# OUTPUT WRITERS
# ============================================================
def _write_json_output(results: Dict[str, Any], path: str) -> None:
try:
with open(path, "w", encoding="utf-8") as f:
json.dump(results, f, ensure_ascii=False, indent=2)
print(f" 💾 JSON saved → {path}")
except Exception as e:
print(f" ⚠️ Failed to write JSON: {e}")
def _write_txt_output(results: Dict[str, Any], path: str) -> None:
sep = "" * 120
lines = [
"BUBBLE|ORDER|TYPE|CONF|OCR_SOURCE|ORIGINAL|CORRECTED|BUBBLE_GROUPS|TRANSLATED|FLAGS",
sep,
]
for bid, data in sorted(results.items(), key=lambda kv: kv[1]["order"]):
bubble_groups_str = " || ".join(data.get("bubble_groups", []))
flags_str = ",".join(data.get("flags", []))
row = (
f"#{bid}"
f"|{data['order']}"
f"|{data['region_type']}"
f"|{data['confidence']:.2f}"
f"|{data['ocr_source']}"
f"|{data['raw_ocr']}"
f"|{data['corrected_ocr']}"
f"|{bubble_groups_str}"
f"|{data['translated']}"
f"|{flags_str}"
)
lines.append(row)
try:
with open(path, "w", encoding="utf-8") as f:
f.write("\n".join(lines) + "\n")
print(f" 📄 TXT saved → {path}")
except Exception as e:
print(f" ⚠️ Failed to write TXT: {e}")
# ============================================================
# DEBUG VISUALISER
# ============================================================
def draw_debug_clusters(image_bgr: np.ndarray,
                        out_boxes: Dict[int, tuple],
                        out_lines: Dict[int, list],
                        out_indices: Dict[int, list],
                        ocr: list,
                        save_path: str = None) -> np.ndarray:
    """Render boxes, labels and member quads onto a copy of the page for debugging."""
    vis = image_bgr.copy()
    ih, iw = vis.shape[:2]
    # BGR colours per region role.
    COLOR_MAP = {
        "dialogue": (0, 200, 0),
        "narration": (0, 165, 255),
        "reaction": (255, 200, 0),
        "sfx": (0, 0, 220),
        "unknown": (120, 120, 120),
    }

    def _centre_key(b):
        # Reading order: top-to-bottom, then left-to-right by box centre.
        bx1, by1, bx2, by2 = out_boxes[b]
        return ((by1 + by2) / 2.0, (bx1 + bx2) / 2.0)

    for order_idx, bid in enumerate(sorted(out_boxes.keys(), key=_centre_key),
                                    start=1):
        x1, y1, x2, y2 = out_boxes[bid]
        text = normalize_text(" ".join(out_lines.get(bid, [])))
        rtype = region_text_role_hint(text)
        color = COLOR_MAP.get(rtype, (120, 120, 120))
        cv2.rectangle(vis, (x1, y1), (x2, y2), color, 2)
        label = f"BOX#{bid} [{rtype}]"
        preview = text if len(text) <= 40 else text[:40] + "..."
        font = cv2.FONT_HERSHEY_SIMPLEX
        font_scale = 0.38
        thickness = 1
        (lw, lh), _ = cv2.getTextSize(label, font, font_scale, thickness)
        # Filled label background just above the box.
        cv2.rectangle(vis,
                      (x1, max(0, y1 - lh - 6)),
                      (x1 + lw + 4, y1),
                      color, -1)
        cv2.putText(vis, label,
                    (x1 + 2, max(lh, y1 - 3)),
                    font, font_scale, (255, 255, 255), thickness,
                    cv2.LINE_AA)
        cv2.putText(vis, preview,
                    (x1 + 2, min(ih - 5, y1 + lh + 6)),
                    font, font_scale * 0.85, color, thickness,
                    cv2.LINE_AA)
        # Outline each member OCR quad in a lightened colour.
        light = tuple(min(255, c + 80) for c in color)
        for idx in out_indices.get(bid, []):
            if idx < len(ocr):
                pts = np.array(ocr[idx][0], dtype=np.int32).reshape((-1, 1, 2))
                cv2.polylines(vis, [pts], True, light, 1)
    if save_path:
        cv2.imwrite(save_path, vis)
        print(f" 🖼️ Debug image saved → {save_path}")
    return vis
# ============================================================
# CLI ENTRY POINT
# ============================================================
def main():
    """CLI entry point: parse arguments, run the pipeline, save debug image, print summary."""
    import argparse
    parser = argparse.ArgumentParser(
        description="Manga page OCR + translation pipeline (macOS Vision)")
    parser.add_argument("image", help="Path to manga page image")
    parser.add_argument("--source", "-s", default="en",
                        help="Source language code (default: en)")
    parser.add_argument("--target", "-t", default="ca",
                        help="Target language code (default: ca)")
    parser.add_argument("--json", "-j", default=None,
                        help="Output JSON file path")
    parser.add_argument("--txt", "-o", default=None,
                        help="Output TXT file path")
    parser.add_argument("--debug", "-d", default=None,
                        help="Save debug visualisation to this path")
    args = parser.parse_args()

    # Derive default output paths from the image name.
    stem = os.path.splitext(args.image)[0]
    json_out = args.json if args.json else f"{stem}_bubbles.json"
    txt_out = args.txt if args.txt else f"{stem}_output.txt"
    debug_out = args.debug if args.debug else f"{stem}_debug_clusters.png"

    results = process_manga_page(
        image_path=args.image,
        source_lang=args.source,
        target_lang=args.target,
        output_json=json_out,
        output_txt=txt_out,
    )
    if not results:
        print("\n❌ No results produced.")
        return

    # ── Debug visualisation ───────────────────────────────────
    page = cv2.imread(args.image)
    if page is not None:
        dbg_boxes: Dict[int, tuple] = {}
        dbg_lines: Dict[int, list] = {}
        dbg_indices: Dict[int, list] = {}
        for bid_str, data in results.items():
            bid = int(bid_str)
            rect = data["box"]
            dbg_boxes[bid] = (rect["x"], rect["y"],
                              rect["x"] + rect["w"], rect["y"] + rect["h"])
            dbg_lines[bid] = data.get("lines", [])
            dbg_indices[bid] = []
        draw_debug_clusters(page, dbg_boxes, dbg_lines, dbg_indices,
                            ocr=[], save_path=debug_out)

    # ── Console summary ───────────────────────────────────────
    print(f"\n{'='*60}")
    print(f"📊 SUMMARY ({len(results)} boxes)")
    print(f"{'='*60}")
    for bid_str, data in sorted(results.items(), key=lambda kv: kv[1]["order"]):
        print(
            f" #{bid_str:>3} [{data['region_type']:<9}] "
            f"conf={data['confidence']:.2f} "
            f"\"{data['corrected_ocr'][:55]}\""
        )
    print(f"{'='*60}\n")
if __name__ == "__main__":
main()