2785 lines
102 KiB
Python
2785 lines
102 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
|
||
import os
|
||
import re
|
||
import json
|
||
import cv2
|
||
import numpy as np
|
||
import warnings
|
||
from typing import List, Tuple, Dict, Any, Optional
|
||
|
||
from deep_translator import GoogleTranslator
|
||
|
||
# macOS Native Vision imports
|
||
import Vision
|
||
import Quartz
|
||
from Foundation import NSData
|
||
|
||
warnings.filterwarnings("ignore", category=UserWarning)
|
||
|
||
# ============================================================
|
||
# CONFIG
|
||
# ============================================================
|
||
# NOTE(review): presumably the fraction of the page height treated as the "top band";
# no use of this constant is visible in this part of the file — confirm against callers.
TOP_BAND_RATIO = 0.08
|
||
|
||
# ============================================================
|
||
# REGION-FIRST LAYOUT HELPERS
|
||
# ============================================================
|
||
import math
|
||
from difflib import SequenceMatcher
|
||
|
||
# ============================================================
|
||
# FIX: COMMON SHORT ENGLISH WORDS (1–2 chars)
|
||
# Prevents OCR from discarding or misclassifying valid short tokens.
|
||
# Source: most frequent 1-char and 2-char English words.
|
||
# ============================================================
|
||
# One-letter English words.
SHORT_ENGLISH_WORDS_1 = set("AI")

# Short English words. NOTE: besides the two-letter words this set also
# carries "BUT" and "I"; both are kept so the protected set is unchanged.
SHORT_ENGLISH_WORDS_2 = set(
    "AM AN AS AT BE BY DO GO HE IF "
    "IN IS IT ME MY NO OF OH OK ON "
    "OR SO TO UP US WE BUT I".split()
)

# Union of both sets; is_meaningful_text() checks English tokens against it.
SHORT_ENGLISH_PROTECTED = SHORT_ENGLISH_WORDS_1.union(SHORT_ENGLISH_WORDS_2)
|
||
|
||
|
||
# Function words whose presence marks text as spoken dialogue
# (used by stopword_ratio() and looks_like_sfx_text()).
DIALOGUE_STOPWORDS = set(
    # pronouns / possessives
    "I YOU HE SHE WE THEY IT ME MY YOUR OUR "
    # verbs / negations
    "IS ARE WAS WERE AM DO DID DON'T DIDN'T NOT "
    # question words / prepositions
    "WHAT WHY HOW WHO IN ON AT TO OF FOR WITH "
    # conjunctions / determiners / misc
    "AND BUT SO THAT THIS THERE HERE THAN ALL RIGHT".split()
)
|
||
|
||
# FIX: SFX_HINTS contains ONLY pure onomatopoeia — no words
|
||
# that could appear in dialogue (MORNING, GOOD, etc. removed)
|
||
# Pure onomatopoeia vocabulary. Single-word entries are listed per category;
# the only multi-word entry ("TMP TMP") is added separately.
SFX_HINTS = set(
    # impact / hits
    "BAM BOOM WHAM SLAM SMACK THUD CRACK CRASH BANG POW "
    "BIFF BONK CLUNK CLANG THWACK WHAP WHUMP FWAP FWUP FWOOP "
    # motion / air
    "FSHOO WHOOSH SWISH SWOOSH WOOSH ZOOM VROOM WHIRR WHIZZ "
    # bells / rings
    "RRRING RING RINGG DING DONG JINGLE CHIME "
    # body / breath
    "SNIF SNIFF GULP GASP WHEEZE PANT GRUNT GROAN SNORE "
    # misc short
    "GRRP GRRR TICK TOCK DRIP PLOP SQUEAK CREAK RUSTLE THUMP "
    # typing / tech
    "BEEP BOOP BUZZ CLICK CLACK "
    # specific manga sfx
    "FWMP FTMP FWIP FWSH SHFF SHFFT TMP STEP STOMP".split()
)
SFX_HINTS.add("TMP TMP")
|
||
|
||
# FIX: REACTION_HINTS — short emotional utterances only
|
||
# Proper nouns and greetings removed (they are dialogue)
|
||
# Short emotional utterances, with and without trailing punctuation.
REACTION_HINTS = set(
    "HUH HUH?! HUH? HUH?? "
    "OH OH! OOH OOH! "
    "AH AH! UH EH EH? "
    "TCH TSK "
    "WHAT?! WHAT? "
    "NO! YES! "
    "EEK EEEEP EEEP".split()
)
|
||
|
||
# ============================================================
|
||
# FIX: narration and dialogue are treated as the same output type.
|
||
# Narration boxes are kept structurally but labelled as dialogue
|
||
# so they are translated and rendered identically.
|
||
# ============================================================
|
||
# Region labels that are handled the same way as plain dialogue.
DIALOGUE_EQUIVALENT_TYPES = set(("dialogue", "narration", "reaction"))

# Caption openers: looks_like_narration_text() checks whether the normalized
# text starts with one of these.
NARRATION_HINTS = set((
    "AND SO",
    "MEANWHILE",
    "LATER",
    "THEN",
    "TO BE CONTINUED",
))
|
||
|
||
# FIX: Added common sentence-leading words that are 2–3 chars
|
||
# and would otherwise be dropped by the alpha-count gate.
|
||
# Short utterances that must survive the alpha-count gate.
# NOTE: the same name is assigned again further down in this file; whichever
# assignment runs last is the one is_meaningful_text() sees.
_MANGA_INTERJECTIONS = set(
    # interjections, commands, names and sfx-like tokens
    "HUH HUH? HUH?? HUH?! "
    "OH OH! OOH OOH! "
    "AH AH! UH UH... "
    "HEY HEY! "
    "EH EH? "
    "WOW WOW! "
    "YES NO NO! "
    "RUN GO GO! "
    "STOP WAIT "
    "WHAT WHAT? WHAT?! "
    "WHY WHY? "
    "HOW HOW? "
    "OK OK! OKAY "
    "EEEEP EEEP "
    "OMIGOSH "
    "BECKY BECKY! "
    "HMM HMM... "
    "TSK TCH "
    "GRRR I A "
    "FWUP FWAP "
    "SHIVER "
    "RRRING "
    "MORNING MORNING. "
    # sentence starters and conjunctions
    "BUT AND SO OR IF AS "
    "YET NOR FOR "
    # common short dialogue words
    "GET GOT NOT NOW TOO "
    "YOU HIM HER ITS OUR "
    "CAN DID HAS HAD LET "
    "SAY SEE TRY USE "
    "ALL ANY ONE OWN NEW "
    "OLD BIG BAD ODD".split()
)
|
||
|
||
|
||
def normalise_region_type(region_type: str) -> str:
    """Map the "narration" label onto "dialogue"; return any other label unchanged."""
    return "dialogue" if region_type == "narration" else region_type
|
||
|
||
|
||
def xyxy_width(b):
    """Return the width of an (x1, y1, x2, y2) box, clamped to a minimum of 1."""
    span = b[2] - b[0]
    return span if span > 1 else 1
|
||
|
||
def xyxy_height(b):
    """Return the height of an (x1, y1, x2, y2) box, clamped to a minimum of 1."""
    span = b[3] - b[1]
    return span if span > 1 else 1
|
||
|
||
def xyxy_center(b):
    """Return the (x, y) midpoint of an (x1, y1, x2, y2) box as floats."""
    mid_x = (b[0] + b[2]) / 2.0
    mid_y = (b[1] + b[3]) / 2.0
    return (mid_x, mid_y)
|
||
|
||
def box_distance(a, b):
    """Return the Euclidean distance between the midpoints of two xyxy boxes."""
    delta_x = (a[0] + a[2]) / 2.0 - (b[0] + b[2]) / 2.0
    delta_y = (a[1] + a[3]) / 2.0 - (b[1] + b[3]) / 2.0
    return math.hypot(delta_x, delta_y)
|
||
|
||
def horizontal_overlap_ratio(a, b):
    """Return the shared x-extent of two xyxy boxes divided by the narrower box's width.

    Widths are clamped to at least 1, so degenerate boxes never divide by zero.
    """
    shared = max(0, min(a[2], b[2]) - max(a[0], b[0]))
    narrowest = min(max(1, a[2] - a[0]), max(1, b[2] - b[0]))
    return shared / max(1, narrowest)
|
||
|
||
def vertical_overlap_ratio(a, b):
    """Return the shared y-extent of two xyxy boxes divided by the shorter box's height.

    Heights are clamped to at least 1, so degenerate boxes never divide by zero.
    """
    shared = max(0, min(a[3], b[3]) - max(a[1], b[1]))
    shortest = min(max(1, a[3] - a[1]), max(1, b[3] - b[1]))
    return shared / max(1, shortest)
|
||
|
||
def box_expand(b, pad, iw, ih):
    """Grow an xyxy box by `pad` on every side and clamp it to an iw x ih image.

    Coordinates are truncated to int; the result lies within
    [0, iw - 1] horizontally and [0, ih - 1] vertically.
    """
    left = int(b[0] - pad)
    top = int(b[1] - pad)
    right = int(b[2] + pad)
    bottom = int(b[3] + pad)
    return (max(0, left), max(0, top), min(iw - 1, right), min(ih - 1, bottom))
|
||
|
||
def count_alpha(text):
    """Count characters of `text` matching the Latin letter class below (None counts as empty)."""
    return sum(1 for _ in re.finditer(r"[A-ZÀ-Ýa-zà-ÿ]", text or ""))
|
||
|
||
def uppercase_ratio(text):
    """Return the fraction of Latin letters in `text` that are uppercase (0.0 if none)."""
    letters = re.findall(r"[A-Za-zÀ-ÿ]", text or "")
    total = len(letters)
    if total == 0:
        return 0.0
    capitals = len([ch for ch in letters if ch.isupper()])
    return capitals / total
|
||
|
||
def punctuation_ratio(text):
    """Return the share of characters in `text` that are one of ! ? . , ' " - (0.0 if empty)."""
    if not text:
        return 0.0
    marks = sum(1 for _ in re.finditer(r"[!?.,'\"-]", text))
    return marks / max(1, len(text))
|
||
|
||
def stopword_ratio(text):
    """Return the fraction of word tokens in the normalized text that are dialogue stopwords.

    Tokens are runs of A-Z and apostrophes; returns 0.0 when there are none.
    """
    tokens = re.findall(r"[A-Z']+", normalize_text(text or ""))
    if not tokens:
        return 0.0
    matched = [tok for tok in tokens if tok in DIALOGUE_STOPWORDS]
    return len(matched) / len(tokens)
|
||
|
||
def looks_like_sfx_text(text: str) -> bool:
    """Decide whether `text` reads as a sound effect rather than speech.

    The text is normalized (upper-cased) first. It is rejected when any of
    these hold:
      - it mentions a known character name, with or without attached
        punctuation ("BECKY", "BECKY!", "BECKY'S");
      - it contains any dialogue stopword;
      - it has sentence punctuation (. ? ! ,) and more than 2 words;
      - it has more than 3 words.

    Otherwise it is accepted when it matches the SFX vocabulary exactly, or
    when it is a single 2-8 letter, consonant-heavy, all-caps token.

    Args:
        text: Raw or normalized OCR text; None/empty yields False.

    Returns:
        True when the text looks like a sound effect.
    """
    t = normalize_text(text or "")
    if not t:
        return False

    alpha = re.sub(r"[^A-Z]", "", t)
    words = t.split()

    # Hard block: proper nouns are always dialogue. Whitespace-split words keep
    # their punctuation ("BECKY!"), so the bare letter runs are checked too;
    # otherwise a punctuated name slips through to the consonant heuristic.
    name_candidates = set(words)
    name_candidates.update(re.findall(r"[A-Z]+", t))
    if any(name in name_candidates for name in KNOWN_NAMES):
        return False

    # Hard block: any stopword present means dialogue.
    toks = re.findall(r"[A-Z']+", t)
    if any(tok in DIALOGUE_STOPWORDS for tok in toks):
        return False

    # Hard block: sentence punctuation on a longer phrase implies speech.
    if re.search(r"[.?!,]", t) and len(words) > 2:
        return False

    # Hard block: more than 3 words is almost certainly dialogue.
    if len(words) > 3:
        return False

    # Exact sfx vocabulary match (with or without punctuation/spaces).
    if t in SFX_HINTS or alpha in SFX_HINTS:
        return True

    # Onomatopoeia heuristic: one short all-caps token dominated by consonants
    # (e.g. GRRP, THUD, FWUP).
    if (2 <= len(alpha) <= 8
            and uppercase_ratio(t) > 0.90
            and stopword_ratio(t) < 0.05
            and len(words) == 1):
        vowels = len(re.findall(r"[AEIOU]", alpha))
        consonants = len(alpha) - vowels
        if consonants >= len(alpha) * 0.55:
            return True

    return False
|
||
|
||
def looks_like_reaction_text(text):
    """Return True when the normalized text is a known reaction or a short punctuated burst.

    A "short punctuated burst" has at most 5 A-Z letters and a punctuation
    ratio above 0.10.
    """
    t = normalize_text(text or "")
    compact = re.sub(r"[^A-Z?!]", "", t)
    if t in REACTION_HINTS or compact in REACTION_HINTS:
        return True
    letters_only = re.sub(r"[^A-Z]", "", t)
    return len(letters_only) <= 5 and punctuation_ratio(t) > 0.10
|
||
|
||
def looks_like_narration_text(text):
    """Return True when the normalized text looks like a caption.

    Either it starts with a narration hint, or it is a sentence of at least
    five words that ends with a full stop and is mostly uppercase.
    """
    t = normalize_text(text or "")
    if t.startswith(tuple(NARRATION_HINTS)):
        return True
    is_long_sentence = len(t.split()) >= 5 and t.endswith(".")
    return is_long_sentence and uppercase_ratio(t) > 0.75
|
||
|
||
def contour_features_for_box(image_bgr, box_xyxy):
    """Measure brightness, edge density and whiteness of the image crop under a box.

    Returns a dict with keys "mean_brightness" (mean gray level / 255),
    "edge_density" (share of Canny edge pixels) and "whiteness_ratio"
    (share of gray pixels above 220). An empty crop yields fixed values
    (0.0, 1.0, 0.0).
    """
    x1, y1, x2, y2 = box_xyxy
    patch = image_bgr[y1:y2, x1:x2]
    if patch.size == 0:
        return {"mean_brightness": 0.0, "edge_density": 1.0, "whiteness_ratio": 0.0}

    gray = cv2.cvtColor(patch, cv2.COLOR_BGR2GRAY)
    edge_map = cv2.Canny(gray, 50, 150)

    return {
        "mean_brightness": float(np.mean(gray)) / 255.0,
        "edge_density": float(np.mean(edge_map > 0)),
        "whiteness_ratio": float(np.mean(gray > 220)),
    }
|
||
|
||
def classify_region_type(image_bgr, box_xyxy, lines):
    """Classify a text region as "sfx", "reaction" or "dialogue".

    Decision order:
      1. "sfx"      - looks_like_sfx_text() and at most 3 words
      2. "reaction" - at most 2 words, looks_like_reaction_text() and a
                      stopword ratio below 0.10
      3. "dialogue" - everything else (narration is folded into dialogue)

    Args:
        image_bgr: Page image. Kept for interface compatibility; the
            decision is purely text-based, so it is not read.
        box_xyxy: Region box. Kept for interface compatibility; not read.
        lines: Iterable of text lines belonging to the region.

    Returns:
        One of "sfx", "reaction", "dialogue".
    """
    # The image-feature extraction and banner-shape test that used to run here
    # never influenced the result (narration returned the same value as the
    # default), so the Canny pass is no longer paid for on every region.
    text = normalize_text(" ".join(lines))
    word_count = len(text.split())

    if word_count <= 3 and looks_like_sfx_text(text):
        return "sfx"

    if (word_count <= 2
            and looks_like_reaction_text(text)
            and stopword_ratio(text) < 0.10):
        return "reaction"

    return "dialogue"
|
||
|
||
def text_similarity(a, b):
    """Return the SequenceMatcher ratio between the normalized forms of two strings."""
    left = normalize_text(a or "")
    right = normalize_text(b or "")
    matcher = SequenceMatcher(None, left, right)
    return matcher.ratio()
|
||
|
||
def dedupe_repeated_phrase(text):
    """Remove OCR doubling from normalized text.

    Texts under four words are returned as-is (normalized). If the text is one
    phrase repeated twice, a single copy is returned. Otherwise any word longer
    than two characters that immediately repeats the previous word is dropped.
    """
    normalized = normalize_text(text or "")
    tokens = normalized.split()
    if len(tokens) < 4:
        return normalized

    middle, remainder = divmod(len(tokens), 2)
    if remainder == 0 and tokens[:middle] == tokens[middle:]:
        return " ".join(tokens[:middle])

    # A skipped word always equals the last kept one, so comparing with the
    # previous token is the same as comparing with the last kept token.
    survivors = [
        tok for pos, tok in enumerate(tokens)
        if pos == 0 or tok != tokens[pos - 1] or len(tok) <= 2
    ]
    return " ".join(survivors)
|
||
|
||
def dehyphenate_linebreak_artifacts(text):
    """Rejoin words split as "WORD- PART" into "WORDPART" in the normalized text."""
    normalized = normalize_text(text or "")
    return re.sub(
        r"\b([A-Z]+)- ([A-Z]+)\b",
        lambda hit: hit.group(1) + hit.group(2),
        normalized,
    )
|
||
|
||
def _replace_whole_phrase(text, wrong, right):
    """Replace `wrong` with `right` only where it is not part of a longer word."""
    pattern = re.escape(wrong)
    # Guard an edge only when the phrase itself starts/ends with a word
    # character; phrases ending in a space already carry their own boundary.
    if wrong[0].isalnum():
        pattern = r"(?<!\w)" + pattern
    if wrong[-1].isalnum():
        pattern = pattern + r"(?!\w)"
    return re.sub(pattern, lambda _hit: right, text)


def fix_common_dialogue_ocr(text):
    """Repair frequent OCR mistakes in dialogue text.

    Steps, in order: normalize; apply a table of known misreads; re-attach
    split contractions ("DID NT" -> "DIDN'T", "YOU RE" -> "YOU'RE", ...);
    pull punctuation back onto the preceding word; de-hyphenate line-break
    splits; drop doubled phrases/words; collapse whitespace.

    Table entries are matched as whole words/phrases. A plain substring
    replace would corrupt ordinary text: "YOU REALLY" -> "YOU'REALLY",
    "WILL GO" -> "WI'LL GO", "SIGNIFICANT" -> "SIGNIFICAN'T".

    Args:
        text: Raw or normalized text; None/empty returns "".

    Returns:
        The corrected, upper-cased text.
    """
    t = normalize_text(text or "")
    if not t:
        return t

    # Applied in insertion order.
    replacements = {
        "1'M": "I'M",
        "1 DIDN'T": "I DIDN'T",
        "1 HATE": "I HATE",
        "1 WAS": "I WAS",
        "YO U": "YOU",
        "YOU RE": "YOU'RE",
        "YOURE": "YOU'RE",
        "I LL": "I'LL",
        "ILL ": "I'LL ",
        "DONT": "DON'T",
        "DIDNT": "DIDN'T",
        "CANT": "CAN'T",
        "WONT": "WON'T",
        "THATS": "THAT'S",
        "MOMS": "MOM'S",
        "DADS": "DAD'S",
        "LEARN- ING": "LEARNING",
        "COV- ERED": "COVERED",
        "SY ON": "SY-ON",
        "P PROPERLY": "P-PROPERLY",
        "SH SHUT": "SH- SHUT",
    }

    for wrong, right in replacements.items():
        t = _replace_whole_phrase(t, wrong, right)

    # Contraction suffixes that OCR detached from their word.
    t = re.sub(r"\b([A-Z]+) NT\b", r"\1N'T", t)
    t = re.sub(r"\b([A-Z]+) RE\b", r"\1'RE", t)
    t = re.sub(r"\b([A-Z]+) VE\b", r"\1'VE", t)
    t = re.sub(r"\b([A-Z]+) LL\b", r"\1'LL", t)
    t = re.sub(r"\b([A-Z]+) S\b", r"\1'S", t)

    t = re.sub(r"\s+([,.;:!?])", r"\1", t)
    t = dehyphenate_linebreak_artifacts(t)
    t = dedupe_repeated_phrase(t)

    # Drop immediate word repeats, judged on letters only so that
    # punctuation does not count towards the length threshold.
    cleaned = []
    for w in t.split():
        if cleaned and cleaned[-1] == w and len(re.sub(r"[^A-Z]", "", w)) > 2:
            continue
        cleaned.append(w)
    t = " ".join(cleaned)

    return re.sub(r"\s{2,}", " ", t).strip()
|
||
|
||
def region_text_role_hint(text: str) -> str:
    """Guess a region role from text alone: "sfx", "reaction" or "dialogue".

    Applies the same word-count and text checks as classify_region_type()
    without needing the image. Narration is reported as "dialogue".
    """
    n_words = len(normalize_text(text or "").split())

    if looks_like_sfx_text(text) and n_words <= 3:
        return "sfx"

    is_reaction = (
        n_words <= 2
        and looks_like_reaction_text(text)
        and stopword_ratio(text) < 0.10
    )
    return "reaction" if is_reaction else "dialogue"
|
||
|
||
|
||
def correct_region_text(text, region_type="dialogue"):
    """Clean up region text according to its role and report the score gain.

    Dialogue-like roles go through fix_common_dialogue_ocr(); "sfx" only gets
    repeated-phrase removal; any other role is just normalized.

    Returns:
        (corrected_text, gain) where gain is the non-negative increase of
        ocr_candidate_score() from the normalized input to the corrected text.
        Empty input returns ("", 0.0).
    """
    before = normalize_text(text or "")
    if not before:
        return before, 0.0

    if region_type in {"dialogue", "reaction", "narration"}:
        after = fix_common_dialogue_ocr(before)
    elif region_type == "sfx":
        after = dedupe_repeated_phrase(before)
    else:
        after = before

    score_before = ocr_candidate_score(before)
    score_after = ocr_candidate_score(after)
    return after, max(0.0, score_after - score_before)
|
||
|
||
def compute_region_confidence(raw_text, corrected_text, box_xyxy, region_type, image_bgr):
    """Blend text quality and image features into a confidence in [0, 1].

    Weights: 0.55 text score, 0.15 whiteness, 0.10 low edge density,
    0.10 correction gain, plus 0.08 when the role is one of
    dialogue/reaction/narration/sfx.
    """
    features = contour_features_for_box(image_bgr, box_xyxy)
    corrected_score = ocr_candidate_score(corrected_text)
    improvement = max(0.0, corrected_score - ocr_candidate_score(raw_text))

    if region_type in {"dialogue", "reaction", "narration", "sfx"}:
        role_bonus = 0.08
    else:
        role_bonus = 0.0

    calmness = 1.0 - min(1.0, features["edge_density"] * 2.0)
    total = (
        0.55 * corrected_score
        + 0.15 * features["whiteness_ratio"]
        + 0.10 * calmness
        + 0.10 * improvement
        + role_bonus
    )
    return max(0.0, min(1.0, total))
|
||
|
||
def build_region_flags(raw_text, corrected_text, region_type, conf):
    """Return the review flags that apply to a region, in a fixed order.

    Possible flags: REGION_UNKNOWN, SFX, LOW_CONF (conf < 0.45),
    HEAVY_CORRECTION (raw vs corrected similarity < 0.75) and
    LONG_TEXT (more than 22 words).
    """
    checks = (
        (region_type == "unknown", "REGION_UNKNOWN"),
        (region_type == "sfx", "SFX"),
        (conf < 0.45, "LOW_CONF"),
        (text_similarity(raw_text, corrected_text) < 0.75, "HEAVY_CORRECTION"),
        (len(corrected_text.split()) > 22, "LONG_TEXT"),
    )
    return [flag for applies, flag in checks if applies]
|
||
|
||
# ============================================================
|
||
# HELPERS
|
||
# ============================================================
|
||
def normalize_text(text: str) -> str:
    """Return a canonical upper-case form of `text` (None is treated as "").

    Curly quotes and the ellipsis character become ASCII, whitespace runs
    collapse to one space, spaces before , . ; : ! ? and after ¡ ¿ are
    removed, padding inside parentheses is removed, and runs of four or
    more dots shrink to "...".
    """
    t = (text or "").strip().upper()

    # Typographic characters -> ASCII equivalents.
    t = t.translate({
        0x201C: '"',
        0x201D: '"',
        0x2018: "'",
        0x2019: "'",
        0x2026: "...",
    })

    # Order matters: whitespace is collapsed before punctuation is re-attached.
    rules = (
        (r"\s+", " "),
        (r"\s+([,.;:!?])", r"\1"),
        (r"([¡¿])\s+", r"\1"),
        (r"\(\s+", "("),
        (r"\s+\)", ")"),
        (r"\.{4,}", "..."),
    )
    for pattern, replacement in rules:
        t = re.sub(pattern, replacement, t)

    return t.strip()
|
||
|
||
def postprocess_translation_general(text: str) -> str:
    """Normalize translated text and tame runaway punctuation.

    After normalize_text(), runs of three or more ! / ? collapse to two
    copies of the last mark in the run, and four or more dots become "...".
    """
    cleaned = re.sub(r"\s{2,}", " ", normalize_text(text)).strip()
    cleaned = re.sub(r"([!?]){3,}", r"\1\1", cleaned)
    return re.sub(r"\.{4,}", "...", cleaned)
|
||
|
||
def fix_common_ocr_errors(text: str) -> str:
    """Fix character-level OCR confusions.

    A capital O between digits, or after a digit and before a non-letter,
    becomes 0; a vertical bar becomes I; a backtick becomes an apostrophe.
    """
    def _zero_between(hit):
        return hit.group(1) + "0" + hit.group(2)

    fixed = re.sub(r'(\d)O(\d)', _zero_between, text)
    fixed = re.sub(r'(\d)O([^a-zA-Z])', _zero_between, fixed)
    return fixed.translate({ord('|'): 'I', ord('`'): "'"})
|
||
|
||
def is_valid_language(text: str, source_lang: str) -> bool:
    """Check whether `text` plausibly belongs to the script of `source_lang`.

    Non-word characters are ignored. For Latin-script languages any Arabic,
    kana, CJK or Hangul character rejects the text; otherwise the share of
    Latin letters must reach a length-dependent threshold. For Japanese,
    Korean and Chinese the share of native-script characters must be at
    least 0.4 (or one character for texts of three characters or fewer).
    Unrecognised languages always pass.
    """
    if not text:
        return False
    compact = re.sub(r'[^\w]', '', text)
    if not compact:
        return False

    lang = source_lang.lower()
    size = len(compact)

    latin_langs = ('en', 'english', 'es', 'spanish', 'fr', 'french',
                   'it', 'italian', 'ca', 'catalan', 'de', 'german')
    if lang in latin_langs:
        foreign = re.search(
            r'[\u0600-\u06FF\u0750-\u077F\u3040-\u30FF'
            r'\u3400-\u4DBF\u4E00-\u9FFF\uAC00-\uD7AF\u1100-\u11FF]',
            compact)
        if foreign:
            return False
        latin = len(re.findall(r'[a-zA-ZÀ-ÿ]', compact))
        if size <= 3:
            return latin >= 1
        required = 0.55 if size <= 6 else 0.45
        return (latin / size) >= required

    # Native-script character class per CJK language alias.
    japanese = r'[\u3040-\u30FF\u3400-\u4DBF\u4E00-\u9FFF]'
    korean = r'[\uAC00-\uD7AF\u1100-\u11FF]'
    chinese = r'[\u4E00-\u9FFF\u3400-\u4DBF]'
    script_by_lang = {
        'ja': japanese, 'japanese': japanese,
        'ko': korean, 'korean': korean,
        'zh': chinese, 'chinese': chinese,
    }

    script = script_by_lang.get(lang)
    if script is None:
        return True

    native = len(re.findall(script, compact))
    if size <= 3:
        return native >= 1
    return (native / size) >= 0.4
|
||
|
||
|
||
# Tokens that is_meaningful_text() rejects outright when the whole
# normalized text equals one of them.
_NOISE_TOKENS = set(
    "P F N M X Z Q "
    "FN PF NM XZ FSHOO GRRP".split()
)
|
||
|
||
# This assignment runs after the earlier definition of the same name and is
# therefore the set is_meaningful_text() actually sees. It previously listed
# only the base interjections, silently discarding the sentence starters and
# short dialogue words added to the earlier definition. It now carries the
# full vocabulary so both definitions agree.
_MANGA_INTERJECTIONS = {
    # interjections, commands, names and sfx-like tokens
    'HUH', 'HUH?', 'HUH??', 'HUH?!',
    'OH', 'OH!', 'OOH', 'OOH!',
    'AH', 'AH!', 'UH', 'UH...',
    'HEY', 'HEY!',
    'EH', 'EH?',
    'WOW', 'WOW!',
    'YES', 'NO', 'NO!',
    'RUN', 'GO', 'GO!',
    'STOP', 'WAIT',
    'WHAT', 'WHAT?', 'WHAT?!',
    'WHY', 'WHY?',
    'HOW', 'HOW?',
    'OK', 'OK!', 'OKAY',
    'EEEEP', 'EEEP',
    'OMIGOSH',
    'BECKY', 'BECKY!',
    'HMM', 'HMM...',
    'TSK', 'TCH',
    'GRRR', 'I', 'A',
    'FWUP', 'FWAP',
    'SHIVER',
    'RRRING',
    'MORNING', 'MORNING.',
    # sentence starters and conjunctions
    'BUT', 'AND', 'SO', 'OR', 'IF', 'AS',
    'YET', 'NOR', 'FOR',
    # common short dialogue words
    'GET', 'GOT', 'NOT', 'NOW', 'TOO',
    'YOU', 'HIM', 'HER', 'ITS', 'OUR',
    'CAN', 'DID', 'HAS', 'HAD', 'LET',
    'SAY', 'SEE', 'TRY', 'USE',
    'ALL', 'ANY', 'ONE', 'OWN', 'NEW',
    'OLD', 'BIG', 'BAD', 'ODD',
}
|
||
|
||
def group_indices_into_vertical_columns(indices, ocr,
                                        x_tolerance_factor=1.4,
                                        min_vertical_span_factor=1.8):
    """Cluster OCR entries into columns by horizontal centre.

    Entries are visited left to right; each joins the first column whose
    running mean centre-x lies within max(10, median width *
    x_tolerance_factor), otherwise it starts a new column. Members of a
    column are ordered top to bottom and columns are returned left to right.

    NOTE: every column is kept regardless of size or vertical span, so
    `min_vertical_span_factor` currently has no influence on the result.

    Returns:
        A list of columns, each a list of OCR indices.
    """
    if not indices:
        return []

    entries = []  # (ocr index, centre x, centre y)
    widths = []
    for idx in indices:
        box = quad_bbox(ocr[idx][0])
        entries.append((idx, (box[0] + box[2]) / 2.0, (box[1] + box[3]) / 2.0))
        widths.append(max(1, box[2] - box[0]))

    x_tol = max(10.0, float(np.median(widths)) * x_tolerance_factor)

    columns = []  # each item: [running mean centre x, member entries]
    for entry in sorted(entries, key=lambda e: e[1]):
        home = next((col for col in columns if abs(entry[1] - col[0]) <= x_tol), None)
        if home is None:
            columns.append([entry[1], [entry]])
        else:
            home[1].append(entry)
            home[0] = float(np.mean([member[1] for member in home[1]]))

    stacked = [sorted(members, key=lambda e: e[2]) for _, members in columns]
    stacked.sort(key=lambda col: np.mean([member[1] for member in col]))
    return [[member[0] for member in col] for col in stacked]
|
||
|
||
def group_indices_into_horizontal_rows(indices, ocr, row_tol_factor=0.75):
    """Cluster OCR entries into text rows by vertical centre.

    Entries are visited top to bottom; each joins the first row whose running
    mean centre-y lies within max(6, median height * row_tol_factor),
    otherwise it starts a new row. Members of a row are ordered left to right
    and rows are returned in the order they were created.

    Returns:
        A list of rows, each a list of OCR indices.
    """
    if not indices:
        return []

    entries = []  # (ocr index, centre x, centre y)
    heights = []
    for idx in indices:
        box = quad_bbox(ocr[idx][0])
        entries.append((idx, (box[0] + box[2]) / 2.0, (box[1] + box[3]) / 2.0))
        heights.append(max(1, box[3] - box[1]))

    row_tol = max(6.0, float(np.median(heights)) * row_tol_factor)

    entries.sort(key=lambda e: e[2])
    rows = []  # each item: [running mean centre y, member entries]
    for entry in entries:
        home = next((row for row in rows if abs(entry[2] - row[0]) <= row_tol), None)
        if home is None:
            rows.append([entry[2], [entry]])
        else:
            home[1].append(entry)
            home[0] = float(np.mean([member[2] for member in home[1]]))

    return [
        [member[0] for member in sorted(members, key=lambda e: e[1])]
        for _, members in rows
    ]
|
||
|
||
def score_text_groups(groups, ocr):
    """Score how readable a grouping of OCR entries is.

    Each group's texts are joined and scored with ocr_candidate_score(). The
    result is the mean score, plus a bonus of 0.05 per average word (capped
    at 0.5), minus 0.08 for every group beyond the fourth. Returns 0.0 when
    there are no groups or no non-empty text.
    """
    if not groups:
        return 0.0

    merged = []
    for grp in groups:
        pieces = [piece for piece in (normalize_text(ocr[i][1]) for i in grp) if piece]
        joined = normalize_text(" ".join(pieces))
        if joined:
            merged.append(joined)

    if not merged:
        return 0.0

    mean_score = float(np.mean([ocr_candidate_score(txt) for txt in merged]))
    mean_words = float(np.mean([len(txt.split()) for txt in merged]))
    penalty = max(0.0, len(groups) - 4) * 0.08

    return mean_score + min(0.5, mean_words * 0.05) - penalty
|
||
|
||
def detect_internal_text_layout(indices, ocr, reading_mode="ltr"):
    """Decide, per vertical block, whether text runs in rows or in columns.

    Each block is grouped both ways and both groupings are scored; columns
    win when there are at least two of them and their score is no more than
    0.03 below the row score.

    Returns:
        {"mode": "block-mixed", "blocks": [{"mode": ..., "groups": ...}, ...]},
        or {"mode": "horizontal", "blocks": []} when `indices` is empty.
    """
    if not indices:
        return {"mode": "horizontal", "blocks": []}

    resolved = []
    for block in split_indices_into_vertical_blocks(indices, ocr):
        rows = group_indices_into_horizontal_rows(block, ocr)
        cols = group_indices_into_vertical_columns(block, ocr)

        row_score = score_text_groups(rows, ocr)
        col_score = score_text_groups(cols, ocr)

        use_columns = len(cols) >= 2 and col_score >= row_score - 0.03
        resolved.append({
            "mode": "vertical" if use_columns else "horizontal",
            "groups": cols if use_columns else rows,
        })

    return {"mode": "block-mixed", "blocks": resolved}
|
||
|
||
|
||
def build_text_from_layout(indices, ocr, reading_mode="ltr"):
    """Assemble the text lines of a region following its detected layout.

    Horizontal blocks emit one line per row in the given order. Vertical
    blocks emit one line per column, columns ordered left to right (right to
    left when reading_mode == "rtl") and entries inside a column ordered top
    to bottom. Empty lines are skipped.
    """
    def center_x(i):
        box = quad_bbox(ocr[i][0])
        return (box[0] + box[2]) / 2.0

    def center_y(i):
        box = quad_bbox(ocr[i][0])
        return (box[1] + box[3]) / 2.0

    def joined_text(order):
        return normalize_text(" ".join(
            ocr[i][1] for i in order if normalize_text(ocr[i][1])
        ))

    layout = detect_internal_text_layout(indices, ocr, reading_mode=reading_mode)
    output_lines = []

    for block in layout["blocks"]:
        groups = block["groups"]
        mode = block["mode"]

        if mode == "horizontal":
            ordered_groups = groups
        elif mode == "vertical":
            columns = sorted(
                groups,
                key=lambda grp: np.mean([center_x(i) for i in grp]),
                reverse=(reading_mode == "rtl"),
            )
            ordered_groups = [sorted(grp, key=center_y) for grp in columns]
        else:
            continue

        for grp in ordered_groups:
            line = joined_text(grp)
            if line:
                output_lines.append(line)

    return output_lines
|
||
|
||
# ============================================================
|
||
# FIX: BUBBLE CONTOUR MEMBERSHIP CACHE
|
||
# Pre-compute which speech-bubble contour each OCR quad belongs to
|
||
# so that two quads in *different* contours are NEVER merged.
|
||
# ============================================================
|
||
|
||
def build_quad_to_bubble_map(ocr: list, bubble_contours: list) -> Dict[int, int]:
    """Map every OCR index to the first bubble contour containing its box centre.

    Containment is tested with cv2.pointPolygonTest (points on the contour
    count as inside). Entries whose centre lies in no contour map to -1.
    """
    mapping: Dict[int, int] = {}
    for idx, entry in enumerate(ocr):
        bbox = quad_bbox(entry[0])
        center = (
            float((bbox[0] + bbox[2]) / 2.0),
            float((bbox[1] + bbox[3]) / 2.0),
        )
        mapping[idx] = next(
            (
                cidx
                for cidx, contour in enumerate(bubble_contours)
                if cv2.pointPolygonTest(contour, center, False) >= 0
            ),
            -1,
        )
    return mapping
|
||
|
||
|
||
def same_bubble_contour(idx_a: int, idx_b: int,
                        quad_to_bubble: Dict[int, int]) -> bool:
    """Return True only when both quads are assigned to the same contour.

    A quad that is missing from the map or mapped to -1 (outside every
    contour) never matches anything, including another outside quad.
    """
    first = quad_to_bubble.get(idx_a, -1)
    second = quad_to_bubble.get(idx_b, -1)
    both_assigned = first != -1 and second != -1
    return both_assigned and first == second
|
||
|
||
|
||
# ============================================================
|
||
# REGION PROPOSAL FROM OCR GEOMETRY (FIXED)
|
||
# ============================================================
|
||
def propose_text_regions_from_ocr(ocr, image_shape, image_bgr=None):
    """Merge neighbouring OCR boxes into larger text regions.

    Boxes are joined with a union-find when they are geometrically close
    (same line, stacked, or near each other in the same band) and
    orientation_compatible() agrees. All distance thresholds scale with the
    median box height.

    When `image_bgr` is given, speech-bubble contours are detected first and
    two quads assigned to *different* contours (or one inside a bubble and one
    outside) are never merged. Two quads that both lie outside every contour
    are not "in different contours", so they fall back to the geometric rules.
    Without this, free-floating text, and all text on a page where no bubble
    is detected, would be split into one region per OCR box.

    Args:
        ocr: Sequence of OCR entries; entry[0] is the quad, entry[1] the text.
        image_shape: Shape of the page image; the first two items are (height, width).
        image_bgr: Optional page image used for bubble-contour detection.

    Returns:
        (region_lines, region_boxes, region_quads, region_indices), four dicts
        keyed by region id (starting at 1, ordered by top edge). All four are
        empty when `ocr` is empty.
    """
    ih, iw = image_shape[:2]
    if not ocr:
        return {}, {}, {}, {}

    boxes = [quad_bbox(entry[0]) for entry in ocr]
    centers = [xyxy_center(b) for b in boxes]
    med_h = float(np.median([max(1, b[3] - b[1]) for b in boxes]))

    # Contour membership per OCR index; stays empty when no image is supplied,
    # in which case every lookup below yields -1 and nothing is blocked.
    quad_to_bubble: Dict[int, int] = {}
    if image_bgr is not None:
        bubble_contours = detect_speech_bubbles(image_bgr)
        quad_to_bubble = build_quad_to_bubble_map(ocr, bubble_contours)

    parent = list(range(len(ocr)))

    def find(x):
        # Path-halving find.
        while parent[x] != x:
            parent[x] = parent[parent[x]]
            x = parent[x]
        return x

    def union(a, b):
        ra, rb = find(a), find(b)
        if ra != rb:
            parent[rb] = ra

    for i in range(len(ocr)):
        bi = boxes[i]
        contour_i = quad_to_bubble.get(i, -1)
        for j in range(i + 1, len(ocr)):
            # Hard block: different contour assignments never merge. Equal
            # assignments, including "both outside" (-1), continue to the
            # geometric tests.
            if contour_i != quad_to_bubble.get(j, -1):
                continue

            bj = boxes[j]
            dx = abs(centers[i][0] - centers[j][0])
            dy = abs(centers[i][1] - centers[j][1])

            hov = horizontal_overlap_ratio(bi, bj)
            vov = vertical_overlap_ratio(bi, bj)
            dist = box_distance(bi, bj)

            same_band = dy <= med_h * 1.4
            stacked = hov >= 0.35 and dy <= med_h * 2.0
            same_line = vov >= 0.45 and dx <= med_h * 3.5
            near = dist <= med_h * 2.8

            if same_line or stacked or (near and (same_band or hov > 0.25)):
                if orientation_compatible(i, j, ocr):
                    union(i, j)

    groups = {}
    for i in range(len(ocr)):
        groups.setdefault(find(i), []).append(i)

    region_lines = {}
    region_boxes = {}
    region_quads = {}
    region_indices = {}
    next_id = 1
    pad = max(2, int(med_h * 0.25))

    # Regions are numbered by the top edge of their highest box.
    for _, idxs in sorted(groups.items(), key=lambda kv: min(boxes[i][1] for i in kv[1])):
        idxs = sorted(idxs, key=lambda i: (boxes[i][1], boxes[i][0]))
        ub = boxes_union_xyxy([boxes[i] for i in idxs])
        if ub is None:
            continue
        region_lines[next_id] = build_lines_from_indices(idxs, ocr)
        region_boxes[next_id] = box_expand(ub, pad=pad, iw=iw, ih=ih)
        region_quads[next_id] = [ocr[i][0] for i in idxs]
        region_indices[next_id] = idxs
        next_id += 1

    return region_lines, region_boxes, region_quads, region_indices
|
||
|
||
# ============================================================
|
||
# RECONCILE REGION-FIRST AND BUBBLE-FIRST GROUPS (FIXED)
|
||
# ============================================================
|
||
def reconcile_region_and_bubble_groups(region_lines, region_boxes, region_quads, region_indices,
                                       bubbles, bubble_boxes, bubble_quads, bubble_indices,
                                       ocr):
    """Merge region-first and bubble-first groupings into one set of groups.

    All candidates are pooled. Walking the pool in order, each unclaimed
    candidate gathers every later unclaimed candidate that overlaps it
    (overlap ratio >= 0.70, IoU >= 0.45, or at least one shared OCR index);
    only the best-scoring member of each cluster is kept. Survivors are
    ordered by box centre (top to bottom, then left to right) and renumbered
    from 1.

    Only the *_boxes and *_indices dicts and `ocr` are read; lines and quads
    are rebuilt from the surviving indices.

    Returns:
        (lines, boxes, quads, indices) dicts keyed by the new ids, or four
        empty dicts when there are no candidates.
    """
    candidates = [
        ("region", rid, region_boxes[rid], region_indices[rid])
        for rid in region_boxes
    ]
    candidates += [
        ("bubble", bid, bubble_boxes[bid], bubble_indices[bid])
        for bid in bubble_boxes
    ]

    if not candidates:
        return {}, {}, {}, {}

    role_weights = {
        "dialogue": 0.8,
        "narration": 0.75,
        "reaction": 0.7,
        "sfx": 0.2,
        "unknown": 0.1,
    }

    def group_score(box, idxs):
        # Favour groups with more OCR entries, more words, better text and a
        # dialogue-like role; box area is a small tie-breaker.
        text = normalize_text(" ".join(build_lines_from_indices(idxs, ocr)))
        role_bonus = role_weights.get(region_text_role_hint(text), 0.1)
        area_bonus = min(1.0, bbox_area_xyxy(box) / 50000.0)
        return (
            len(idxs) * 2.0 +
            min(20, len(text.split())) * 0.5 +
            min(1.0, ocr_candidate_score(text)) +
            role_bonus +
            area_bonus * 0.25
        )

    claimed = set()
    survivors = []

    for i, (_, _, box_i, idx_i) in enumerate(candidates):
        if i in claimed:
            continue
        claimed.add(i)
        cluster = [i]
        members_i = set(idx_i)

        for j in range(i + 1, len(candidates)):
            if j in claimed:
                continue
            _, _, box_j, idx_j = candidates[j]

            ovs = boxes_overlap_ratio(box_i, box_j)
            iou = boxes_iou(box_i, box_j)
            shared = len(members_i.intersection(idx_j))

            if ovs >= 0.70 or iou >= 0.45 or shared > 0:
                cluster.append(j)
                claimed.add(j)

        winner = max(
            cluster,
            key=lambda k: group_score(candidates[k][2], candidates[k][3]),
        )
        survivors.append(candidates[winner])

    survivors.sort(key=lambda item: (
        (item[2][1] + item[2][3]) / 2.0,
        (item[2][0] + item[2][2]) / 2.0,
    ))

    out_lines, out_boxes, out_quads, out_indices = {}, {}, {}, {}

    for new_id, (_, _, box, idxs) in enumerate(survivors, start=1):
        ordered = sorted(
            set(idxs),
            key=lambda k: (quad_bbox(ocr[k][0])[1], quad_bbox(ocr[k][0])[0]),
        )
        out_lines[new_id] = build_lines_from_indices(ordered, ocr)
        out_boxes[new_id] = box
        out_quads[new_id] = [ocr[k][0] for k in ordered]
        out_indices[new_id] = ordered

    return out_lines, out_boxes, out_quads, out_indices
|
||
|
||
# ============================================================
|
||
# PROTECTED TOKENS / SHORT DIALOGUE SAFETY NET
|
||
# ============================================================
|
||
# Character names; text mentioning one is treated as dialogue, never sfx.
KNOWN_NAMES = set("BECKY DAMIAN CECILE WALD".split())

# Short tokens that must never be discarded: every known name plus the
# interjections and greetings below.
PROTECTED_SHORT_TOKENS = KNOWN_NAMES.union(
    "HUH HUH? HUH?? HUH?! "
    "OH OH! OOH OOH! "
    "AH AH! UH UH... "
    "HEY HEY! EH EH? "
    "WOW WOW! "
    "MORNING MORNING. "
    "BECKY! "
    "OMIGOSH EEEP EEEEP".split()
)
|
||
|
||
def is_protected_token(text: str) -> bool:
    """Return True when *text* is one of the explicitly protected tokens.

    The text is passed through ``normalize_text`` and looked up in
    ``PROTECTED_SHORT_TOKENS`` twice: once as-is, and once with every
    character outside ``A-Z`` / ``À-Ý`` removed.
    """
    normalized = normalize_text(text or "")
    if not normalized:
        return False
    if normalized in PROTECTED_SHORT_TOKENS:
        return True
    letters_only = re.sub(r"[^A-ZÀ-Ý]", "", normalized)
    return letters_only in PROTECTED_SHORT_TOKENS
|
||
|
||
def maybe_conf_floor_for_protected(text: str, conf: float, floor: float = 0.40) -> float:
    """Raise *conf* to at least *floor* when *text* is a protected token.

    Non-protected text keeps its confidence unchanged.
    """
    return max(conf, floor) if is_protected_token(text) else conf
|
||
|
||
def is_meaningful_text(text: str, source_lang: str, min_alpha_chars: int = 2) -> bool:
    """Decide whether an OCR string should be kept.

    All protection checks run before any length gate, so short but valid
    utterances are accepted before a noise heuristic can discard them.

    Args:
        text: Raw OCR text.
        source_lang: Language code or name (e.g. ``"en"`` / ``"english"``),
            compared case-insensitively. ``None`` is treated as unknown.
        min_alpha_chars: Minimum number of alphabetic characters required
            once no protection rule has matched.

    Returns:
        True if the text should be kept, False if it looks like noise.
    """
    if not text:
        return False

    t = text.strip()
    if not t:
        # Whitespace-only input has nothing to protect or score.
        return False
    t_upper = normalize_text(t)
    lang = (source_lang or "").lower()

    # Languages for which the Latin-script heuristics below apply.
    latin_langs = {
        'en', 'english', 'es', 'spanish', 'fr', 'french',
        'it', 'italian', 'ca', 'catalan', 'de', 'german',
    }

    # 1. Common 1–2 char English words
    if lang in {"en", "english"} and t_upper in SHORT_ENGLISH_PROTECTED:
        return True

    # 2. Explicitly protected tokens (names, interjections)
    if is_protected_token(t_upper):
        return True

    # 3. Manga interjections and sentence starters
    t_alpha_only = re.sub(r'[^A-Za-zÀ-ÿ]', '', t_upper)
    if t_upper in _MANGA_INTERJECTIONS or t_alpha_only in _MANGA_INTERJECTIONS:
        return True

    # 4. Short punctuated utterances like "Huh?" / "Oh!"
    #    (the class is "!", "?" or "."; the earlier pattern also let a
    #    literal backslash through by accident)
    if re.fullmatch(r"[A-Za-zÀ-ÿ]{1,6}[!?.]{1,3}", t):
        return True

    # ── Alpha character count gate ─────────────────────────────
    alpha_count = sum(c.isalpha() for c in t)
    if alpha_count < min_alpha_chars:
        return False

    # ── Noise token blocklist ──────────────────────────────────
    if t_upper in _NOISE_TOKENS:
        return False

    # ── Mostly non-alphabetic text in a Latin-script language ──
    if lang in latin_langs:
        non_alpha = len(t) - alpha_count
        if (non_alpha / len(t)) > 0.72:
            return False

    # ── Repeated single character (e.g. "AAAA") ───────────────
    if len(t) >= 3 and len(set(t_upper)) == 1:
        return False

    # ── No vowels in a long word → likely noise ────────────────
    if lang in latin_langs and len(t) > 5:
        if not re.search(r'[AEIOUaeiouÀ-ÿ]', t):
            return False

    return True
|
||
|
||
def quad_bbox(quad):
    """Return the axis-aligned bounding box of *quad* as ``(x1, y1, x2, y2)``.

    *quad* is an iterable of points indexable as ``p[0]`` (x) and ``p[1]``
    (y). Each extreme is truncated to ``int``.
    """
    xs = [p[0] for p in quad]
    ys = [p[1] for p in quad]
    return (int(min(xs)), int(min(ys)), int(max(xs)), int(max(ys)))
|
||
|
||
def quad_center(quad):
    """Return the centre ``(cx, cy)`` of the bounding box of *quad* as floats."""
    x1, y1, x2, y2 = quad_bbox(quad)
    return ((x1 + x2) / 2.0, (y1 + y2) / 2.0)
|
||
|
||
def boxes_union_xyxy(boxes):
    """Return the smallest ``(x1, y1, x2, y2)`` box enclosing all *boxes*.

    ``None`` entries are ignored. Returns ``None`` when no usable box
    remains.
    """
    valid = [b for b in boxes if b is not None]
    if not valid:
        return None
    return (
        int(min(b[0] for b in valid)),
        int(min(b[1] for b in valid)),
        int(max(b[2] for b in valid)),
        int(max(b[3] for b in valid)),
    )
|
||
|
||
def bbox_area_xyxy(b):
    """Return the integer area of an ``(x1, y1, x2, y2)`` box.

    ``None`` and inverted boxes yield 0.
    """
    if b is None:
        return 0
    width = max(0, b[2] - b[0])
    height = max(0, b[3] - b[1])
    return int(width * height)
|
||
|
||
def xyxy_to_xywh(b):
    """Convert an ``(x1, y1, x2, y2)`` box to an ``{"x", "y", "w", "h"}`` dict.

    Width and height are clamped at 0. ``None`` is passed through.
    """
    if b is None:
        return None
    x1, y1, x2, y2 = b
    return {
        "x": int(x1),
        "y": int(y1),
        "w": int(max(0, x2 - x1)),
        "h": int(max(0, y2 - y1)),
    }
|
||
|
||
def overlap_or_near(a, b, gap=0):
    """Return True when boxes *a* and *b* overlap or lie within *gap* pixels.

    The horizontal and vertical separations are measured independently
    and both must be at most *gap*.
    """
    ax1, ay1, ax2, ay2 = a
    bx1, by1, bx2, by2 = b
    gap_x = max(0, max(ax1, bx1) - min(ax2, bx2))
    gap_y = max(0, max(ay1, by1) - min(ay2, by2))
    return gap_x <= gap and gap_y <= gap
|
||
|
||
def boxes_iou(a, b):
    """Return intersection-over-union of two ``(x1, y1, x2, y2)`` boxes.

    Returns 0.0 when the boxes do not intersect. The denominator is
    floored at 1 to avoid division by zero.
    """
    ax1, ay1, ax2, ay2 = a
    bx1, by1, bx2, by2 = b
    inter_w = max(0, min(ax2, bx2) - max(ax1, bx1))
    inter_h = max(0, min(ay2, by2) - max(ay1, by1))
    inter = inter_w * inter_h
    if inter == 0:
        return 0.0
    area_a = max(0, ax2 - ax1) * max(0, ay2 - ay1)
    area_b = max(0, bx2 - bx1) * max(0, by2 - by1)
    return inter / max(1, area_a + area_b - inter)
|
||
|
||
def boxes_overlap_ratio(a, b):
    """Ratio of intersection to the SMALLER box area.

    Returns 0.0 when the boxes do not intersect. The denominator is
    floored at 1 to avoid division by zero.
    """
    ax1, ay1, ax2, ay2 = a
    bx1, by1, bx2, by2 = b
    inter_w = max(0, min(ax2, bx2) - max(ax1, bx1))
    inter_h = max(0, min(ay2, by2) - max(ay1, by1))
    inter = inter_w * inter_h
    if inter == 0:
        return 0.0
    area_a = max(0, ax2 - ax1) * max(0, ay2 - ay1)
    area_b = max(0, bx2 - bx1) * max(0, by2 - by1)
    return inter / max(1, min(area_a, area_b))
|
||
|
||
def ocr_candidate_score(text: str) -> float:
    """Score an OCR string in ``[0.0, 1.0]`` by its character make-up.

    Alphabetic characters, whitespace and common punctuation raise the
    score; characters outside word/space/punctuation classes lower it.
    A small penalty applies when the text contains an isolated capital
    letter or a run of two or more digits. Empty or whitespace-only text
    scores 0.0.
    """
    if not text:
        return 0.0
    t = text.strip()
    n = len(t)
    if n == 0:
        return 0.0

    alpha = sum(c.isalpha() for c in t) / n
    spaces = sum(c.isspace() for c in t) / n
    punct_ok = sum(c in ".,!?'-:;()[]\"¡¿" for c in t) / n
    bad = len(re.findall(r"[^\w\s\.\,\!\?\-\'\:\;\(\)\[\]\"¡¿]", t)) / n

    penalty = 0.0
    if re.search(r"\b[A-Z]\b", t):
        penalty += 0.05
    if re.search(r"[0-9]{2,}", t):
        penalty += 0.08

    score = (0.62 * alpha) + (0.10 * spaces) + (0.20 * punct_ok) - (0.45 * bad) - penalty
    return max(0.0, min(1.0, score))
|
||
|
||
def quad_is_horizontal(quad, ratio_threshold=1.5) -> bool:
    """Return True when the quad's bounding box is at least *ratio_threshold* times wider than tall.

    Width and height are floored at 1 before the ratio is taken.
    """
    x1, y1, x2, y2 = quad_bbox(quad)
    width = max(1, x2 - x1)
    height = max(1, y2 - y1)
    return (width / height) >= ratio_threshold
|
||
|
||
def quad_is_vertical(quad, ratio_threshold=1.5) -> bool:
    """Return True when the quad's bounding box is at least *ratio_threshold* times taller than wide.

    Width and height are floored at 1 before the ratio is taken.
    """
    x1, y1, x2, y2 = quad_bbox(quad)
    width = max(1, x2 - x1)
    height = max(1, y2 - y1)
    return (height / width) >= ratio_threshold
|
||
|
||
|
||
# ============================================================
# ENHANCED IMAGE PREPROCESSING
# ============================================================
|
||
def enhance_image_for_ocr(image_bgr, upscale_factor=2.5):
    """Return an upscaled, binarised BGR copy of *image_bgr*.

    Steps, in order: cubic upscale by *upscale_factor*, grayscale,
    non-local-means denoise, CLAHE contrast equalisation, 3x3 sharpening
    kernel, Gaussian adaptive threshold, 2x2 morphological close. The
    result is converted back to 3-channel BGR, so it is
    ``upscale_factor`` times larger than the input in each dimension.
    """
    h, w = image_bgr.shape[:2]
    target_size = (int(w * upscale_factor), int(h * upscale_factor))
    upscaled = cv2.resize(image_bgr, target_size, interpolation=cv2.INTER_CUBIC)

    gray = cv2.cvtColor(upscaled, cv2.COLOR_BGR2GRAY)
    denoised = cv2.fastNlMeansDenoising(gray, None, h=10,
                                        templateWindowSize=7, searchWindowSize=21)

    clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8))
    enhanced = clahe.apply(denoised)

    sharpen_kernel = np.array([[-1, -1, -1],
                               [-1, 9, -1],
                               [-1, -1, -1]])
    sharpened = cv2.filter2D(enhanced, -1, sharpen_kernel)

    binary = cv2.adaptiveThreshold(sharpened, 255,
                                   cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
                                   cv2.THRESH_BINARY, 11, 2)
    cleaned = cv2.morphologyEx(binary, cv2.MORPH_CLOSE, np.ones((2, 2), np.uint8))
    return cv2.cvtColor(cleaned, cv2.COLOR_GRAY2BGR)
|
||
|
||
def detect_small_text_regions(image_bgr, existing_quads):
    """Find small dark blobs outside the already-detected quads.

    The areas covered by *existing_quads* are masked out, the rest of the
    image is Otsu-thresholded (inverted), and the bounding boxes of the
    external contours are filtered by area (50 < area < 5000) and aspect
    ratio (0.1 < h/w < 10).

    Returns:
        A list of ``(x1, y1, x2, y2)`` boxes.
    """
    gray = cv2.cvtColor(image_bgr, cv2.COLOR_BGR2GRAY)

    # Mark pixels already covered by known quads so they are ignored.
    covered = np.zeros(gray.shape, dtype=np.uint8)
    for quad in existing_quads:
        cv2.fillPoly(covered, [np.array(quad, dtype=np.int32)], 255)
    uncovered = cv2.bitwise_not(covered)

    _, binary = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)
    binary_masked = cv2.bitwise_and(binary, binary, mask=uncovered)
    contours, _ = cv2.findContours(binary_masked, cv2.RETR_EXTERNAL,
                                   cv2.CHAIN_APPROX_SIMPLE)

    text_regions = []
    for contour in contours:
        x, y, w, h = cv2.boundingRect(contour)
        area = w * h
        if 50 < area < 5000 and 0.1 < h / max(w, 1) < 10:
            text_regions.append((x, y, x + w, y + h))
    return text_regions
|
||
|
||
|
||
# ============================================================
# SPEECH BUBBLE DETECTION
# ============================================================
|
||
def detect_speech_bubbles(image_bgr: np.ndarray) -> List[np.ndarray]:
    """Return candidate bubble contours found in *image_bgr*.

    The image is converted to grayscale and adaptive-thresholded
    (Gaussian, inverted). External contours with an area above 500 are
    returned.
    """
    gray = cv2.cvtColor(image_bgr, cv2.COLOR_BGR2GRAY)
    thresh = cv2.adaptiveThreshold(gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
                                   cv2.THRESH_BINARY_INV, 11, 2)
    contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    return [c for c in contours if cv2.contourArea(c) > 500]
|
||
|
||
def is_quad_in_bubble(quad_bbox_xyxy, bubble_contour, tolerance=5):
    """Return True when the box centre lies inside *bubble_contour*.

    A centre up to *tolerance* pixels outside the contour still counts
    as inside.

    The signed distance is requested from ``cv2.pointPolygonTest``
    (``measureDist=True``). With ``measureDist=False`` the function only
    returns +1 / 0 / -1, so comparing against ``-tolerance`` was always
    true and every quad matched every contour.
    """
    x1, y1, x2, y2 = quad_bbox_xyxy
    cx, cy = (x1 + x2) // 2, (y1 + y2) // 2
    signed_dist = cv2.pointPolygonTest(bubble_contour, (float(cx), float(cy)), True)
    return signed_dist >= -tolerance
|
||
|
||
def split_indices_by_bubble(indices, ocr, bubble_contours):
    """Partition OCR *indices* by the first bubble contour that contains them.

    Each index is assigned to the first contour for which
    ``is_quad_in_bubble`` is true. Indices matching no contour are
    collected into one trailing group.

    Returns:
        A list of index lists (empty when *indices* is empty).
    """
    if not indices:
        return []

    by_bubble = {}
    unmatched = []
    for idx in indices:
        bbox = quad_bbox(ocr[idx][0])
        for bubble_id, contour in enumerate(bubble_contours):
            if is_quad_in_bubble(bbox, contour):
                by_bubble.setdefault(bubble_id, []).append(idx)
                break
        else:
            unmatched.append(idx)

    groups = list(by_bubble.values())
    if unmatched:
        groups.append(unmatched)
    return groups
|
||
|
||
def check_vertical_alignment_split(indices, ocr, threshold=20):
    """Split *indices* into groups separated by large vertical gaps.

    Quads are sorted by the top edge of their bounding box. A new group
    starts whenever a quad's top is more than *threshold* pixels below
    the bottom of the quad immediately before it in that order.

    Returns:
        A list of index lists; ``[indices]`` when there is at most one
        index.
    """
    if len(indices) <= 1:
        return [indices]

    ordered = sorted(((idx, quad_bbox(ocr[idx][0])) for idx in indices),
                     key=lambda item: item[1][1])

    groups = []
    current = [ordered[0][0]]
    for (_, prev_box), (idx, box) in zip(ordered, ordered[1:]):
        if box[1] - prev_box[3] > threshold:
            groups.append(current)
            current = [idx]
        else:
            current.append(idx)
    groups.append(current)
    return groups
|
||
|
||
|
||
# ============================================================
# QUAD SIZE VALIDATION AND SPLITTING
# ============================================================
|
||
def is_quad_oversized(quad, median_height, width_threshold=8.0):
    """Return True when *quad* is unusually wide.

    A quad is oversized if its width exceeds
    ``median_height * width_threshold`` or its width/height ratio
    exceeds 12.
    """
    x1, y1, x2, y2 = quad_bbox(quad)
    width = x2 - x1
    height = max(1, y2 - y1)
    return width > median_height * width_threshold or width / height > 12.0
|
||
|
||
def split_oversized_quad_by_content(image_bgr, quad, text, conf, median_height):
    """Split a wide quad in two at its widest internal blank column run.

    The quad's region is Otsu-thresholded and projected onto the x axis.
    Runs of columns whose ink sum is below 20% of full height count as
    gaps. Only gaps that end before the right edge and are at least
    ``max(0.8 * median_height, 15)`` pixels wide are considered. The text
    is divided at the space nearest the proportional split position, or
    proportionally by character when it has no space.

    Returns:
        ``[(quad, text, conf)]`` unchanged when no split applies,
        otherwise two ``(quad, text, conf)`` tuples (left, right) sharing
        the original confidence.
    """
    unsplit = [(quad, text, conf)]

    x1, y1, x2, y2 = quad_bbox(quad)
    w, h = x2 - x1, max(1, y2 - y1)
    pad = 2
    roi_x0 = max(0, x1)
    roi = image_bgr[max(0, y1 - pad):min(image_bgr.shape[0], y2 + pad),
                    roi_x0:min(image_bgr.shape[1], x2)]
    if roi.size == 0:
        return unsplit

    gray = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY)
    _, binary = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)
    v_proj = np.sum(binary, axis=0)

    gap_threshold = h * 255 * 0.20
    min_gap_px = max(int(median_height * 0.8), 15)

    # Collect (centre_x, width) for every sufficiently wide blank run that
    # is closed by an inked column; a run touching the right edge is ignored.
    gaps = []
    gap_start = None
    for x, ink in enumerate(v_proj):
        if ink < gap_threshold:
            if gap_start is None:
                gap_start = x
        elif gap_start is not None:
            gap_width = x - gap_start
            if gap_width >= min_gap_px:
                gaps.append((gap_start + gap_width // 2, gap_width))
            gap_start = None
    if not gaps:
        return unsplit

    # Widest gap wins; ties go to the leftmost one.
    split_x_abs = roi_x0 + max(gaps, key=lambda g: g[1])[0]
    offset = split_x_abs - x1

    if ' ' in text:
        char_w = w / max(1, len(text))
        approx_idx = int(offset / max(1e-6, char_w))
        space_positions = [i for i, c in enumerate(text) if c == ' ']
        split_idx = min(space_positions, key=lambda i: abs(i - approx_idx))
    else:
        split_idx = int(len(text) * offset / max(1, w))
    left_text, right_text = text[:split_idx].strip(), text[split_idx:].strip()

    if left_text and right_text:
        left_quad = [[x1, y1], [split_x_abs, y1], [split_x_abs, y2], [x1, y2]]
        right_quad = [[split_x_abs, y1], [x2, y1], [x2, y2], [split_x_abs, y2]]
        return [(left_quad, left_text, conf), (right_quad, right_text, conf)]
    return unsplit
|
||
|
||
def validate_and_split_oversized_quads(image_bgr, filtered_ocr):
    """Split every oversized quad in *filtered_ocr* where a content gap exists.

    The median quad height is used as the size reference. Quads flagged
    by ``is_quad_oversized`` are passed to
    ``split_oversized_quad_by_content``; all others pass through
    unchanged.

    Returns:
        ``(new_ocr_list, splits_made)`` where *splits_made* counts the
        quads that were actually split.
    """
    if not filtered_ocr:
        return filtered_ocr, 0

    heights = []
    for quad, _, _ in filtered_ocr:
        box = quad_bbox(quad)
        heights.append(max(1, box[3] - box[1]))
    median_height = float(np.median(heights)) if heights else 14.0

    result = []
    splits_made = 0
    for quad, text, conf in filtered_ocr:
        parts = [(quad, text, conf)]
        if is_quad_oversized(quad, median_height, 8.0):
            candidate = split_oversized_quad_by_content(
                image_bgr, quad, text, conf, median_height)
            if len(candidate) > 1:
                parts = candidate
                splits_made += 1
        result.extend(parts)
    return result, splits_made
|
||
|
||
|
||
# ============================================================
# HORIZONTAL GAP DETECTION AT QUAD LEVEL
# ============================================================
|
||
def detect_horizontal_gap_in_group(indices, ocr, med_h, gap_factor=2.5):
    """Find the widest horizontal gap that splits a group of quads in two.

    Quads are ordered by centre x. The gap between each neighbouring pair
    is the next box's left edge minus the current box's right edge. The
    largest gap exceeding ``med_h * gap_factor`` is used as the split
    point.

    Returns:
        ``(left_indices, right_indices)``, or ``None`` when there are
        fewer than two quads or no gap is wide enough.
    """
    if len(indices) < 2:
        return None

    ordered = sorted(indices, key=lambda i: quad_center(ocr[i][0])[0])
    boxes = [quad_bbox(ocr[i][0]) for i in ordered]
    gap_threshold = med_h * gap_factor

    best_gap, best_split = 0.0, None
    for k in range(len(ordered) - 1):
        gap = boxes[k + 1][0] - boxes[k][2]
        if gap > gap_threshold and gap > best_gap:
            best_gap, best_split = gap, k
    if best_split is None:
        return None

    left_group = ordered[:best_split + 1]
    right_group = ordered[best_split + 1:]
    if not left_group or not right_group:
        return None
    return (left_group, right_group)
|
||
|
||
def orientation_compatible(idx_a, idx_b, ocr):
    """Return False when one quad is clearly vertical and the other clearly horizontal.

    "Clearly vertical" means width/height < 0.6 and "clearly horizontal"
    means width/height > 2.0. Any other combination is compatible.
    """
    def aspect(idx):
        x1, y1, x2, y2 = quad_bbox(ocr[idx][0])
        return max(1, x2 - x1) / max(1, y2 - y1)

    ratio_a, ratio_b = aspect(idx_a), aspect(idx_b)
    a_tall_b_wide = ratio_a < 0.6 and ratio_b > 2.0
    b_tall_a_wide = ratio_b < 0.6 and ratio_a > 2.0
    return not (a_tall_b_wide or b_tall_a_wide)
|
||
|
||
|
||
# ============================================================
# WIDE QUAD COLUMN SPLIT — pre-grouping
# ============================================================
|
||
def split_wide_quad_by_column_gap(image_bgr, quad, text, conf, med_h,
                                  min_gap_factor=1.8):
    """Split a wide quad in two at its widest internal blank column run.

    Quads narrower than ``3 * med_h`` are returned unchanged. Otherwise
    the region is Otsu-thresholded and projected onto the x axis. Runs of
    columns whose ink sum is below 12% of full height count as gaps. Only
    gaps that end before the right edge and are at least
    ``max(med_h * min_gap_factor, 10)`` pixels wide are considered. The
    split is rejected when either half would be narrower than *med_h*.

    Returns:
        ``[(quad, text, conf)]`` unchanged when no split applies,
        otherwise two ``(quad, text, conf)`` tuples (left, right) sharing
        the original confidence.
    """
    unsplit = [(quad, text, conf)]

    x1, y1, x2, y2 = quad_bbox(quad)
    w, h = x2 - x1, max(1, y2 - y1)
    if w < med_h * 3.0:
        return unsplit

    pad = 2
    roi_x0 = max(0, x1)
    roi = image_bgr[max(0, y1 - pad):min(image_bgr.shape[0], y2 + pad),
                    roi_x0:min(image_bgr.shape[1], x2)]
    if roi.size == 0:
        return unsplit

    gray = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY)
    _, binary = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)
    v_proj = np.sum(binary, axis=0)

    gap_threshold = h * 255 * 0.12
    min_gap_px = max(int(med_h * min_gap_factor), 10)

    # Collect (centre_x, width) for every sufficiently wide blank run that
    # is closed by an inked column; a run touching the right edge is ignored.
    gaps = []
    gap_start = None
    for x, ink in enumerate(v_proj):
        if ink < gap_threshold:
            if gap_start is None:
                gap_start = x
        elif gap_start is not None:
            gap_width = x - gap_start
            if gap_width >= min_gap_px:
                gaps.append((gap_start + gap_width // 2, gap_width))
            gap_start = None
    if not gaps:
        return unsplit

    # Widest gap wins; ties go to the leftmost one. Projection columns are
    # relative to the ROI origin, which is clamped to the image, so the
    # absolute position is measured from roi_x0 rather than from x1.
    split_x_abs = roi_x0 + max(gaps, key=lambda g: g[1])[0]
    offset = split_x_abs - x1
    if offset < med_h or x2 - split_x_abs < med_h:
        return unsplit

    if ' ' in text:
        char_w = w / max(1, len(text))
        approx_idx = int(offset / max(1e-6, char_w))
        space_positions = [i for i, c in enumerate(text) if c == ' ']
        split_idx = min(space_positions, key=lambda i: abs(i - approx_idx))
    else:
        split_idx = int(len(text) * offset / max(1, w))
    left_text, right_text = text[:split_idx].strip(), text[split_idx:].strip()

    if left_text and right_text:
        left_quad = [[x1, y1], [split_x_abs, y1], [split_x_abs, y2], [x1, y2]]
        right_quad = [[split_x_abs, y1], [x2, y1], [x2, y2], [split_x_abs, y2]]
        return [(left_quad, left_text, conf), (right_quad, right_text, conf)]
    return unsplit
|
||
|
||
|
||
def apply_column_gap_splits(image_bgr, ocr_list, med_h):
    """Run ``split_wide_quad_by_column_gap`` over every OCR entry.

    Returns:
        ``(new_ocr_list, splits_made)`` where *splits_made* counts the
        entries that were split. A summary line is printed when at least
        one split happened.
    """
    result = []
    splits_made = 0
    for quad, text, conf in ocr_list:
        parts = split_wide_quad_by_column_gap(image_bgr, quad, text, conf, med_h)
        if len(parts) > 1:
            splits_made += 1
        result.extend(parts)
    if splits_made:
        print(f"📐 Column-gap split: {splits_made} wide quad(s) split before grouping")
    return result, splits_made
|
||
|
||
|
||
# ============================================================
# GENERALIZED BOX FIXING FUNCTIONS
# ============================================================
|
||
def detect_and_split_multi_bubble_boxes(bubble_boxes, bubble_indices, bubble_quads,
                                        bubbles, ocr, image_bgr):
    """Split boxes that appear to cover more than one speech bubble.

    For every box with at least two quads, three strategies are tried in
    order and the first that yields more than one group wins:

    1. contour membership (``split_indices_by_bubble``),
    2. a vertical gap larger than ``2 * med_h``,
    3. for boxes wider than ``10 * med_h``: a left/right split around the
       median quad centre, kept only if the halves are more than
       ``1.5 * med_h`` apart.

    Boxes that are not split are copied through. All output boxes are
    renumbered consecutively from 1.

    Returns:
        ``(bubbles, bubble_boxes, bubble_quads, bubble_indices)`` as new
        dicts.
    """
    all_h = [max(1, quad_bbox(ocr[i][0])[3] - quad_bbox(ocr[i][0])[1])
             for i in range(len(ocr))]
    med_h = float(np.median(all_h)) if all_h else 14.0
    bubble_contours = detect_speech_bubbles(image_bgr)

    new_bubbles, new_boxes, new_quads, new_indices = {}, {}, {}, {}
    next_bid = 1
    splits_made = []

    def emit_group(group):
        # Register a freshly built box for the given OCR indices.
        nonlocal next_bid
        new_bubbles[next_bid] = build_lines_from_indices(group, ocr)
        new_boxes[next_bid] = boxes_union_xyxy([quad_bbox(ocr[i][0]) for i in group])
        new_quads[next_bid] = [ocr[i][0] for i in group]
        new_indices[next_bid] = group
        next_bid += 1

    def keep_box(bid, indices):
        # Copy an existing box through under a new id.
        nonlocal next_bid
        new_bubbles[next_bid] = bubbles[bid]
        new_boxes[next_bid] = bubble_boxes[bid]
        new_quads[next_bid] = bubble_quads[bid]
        new_indices[next_bid] = indices
        next_bid += 1

    for bid, indices in bubble_indices.items():
        if len(indices) < 2:
            keep_box(bid, indices)
            continue

        # Strategy 1: quads fall into different bubble contours.
        split_groups = split_indices_by_bubble(indices, ocr, bubble_contours)
        if len(split_groups) > 1:
            for group in split_groups:
                if group:
                    emit_group(group)
            splits_made.append(f"BOX#{bid} → {len(split_groups)} bubbles")
            continue

        # Strategy 2: a large vertical gap between consecutive quads.
        vertical_splits = check_vertical_alignment_split(indices, ocr,
                                                         threshold=int(med_h * 2.0))
        if len(vertical_splits) > 1:
            for group in vertical_splits:
                if group:
                    emit_group(group)
            splits_made.append(f"BOX#{bid} → {len(vertical_splits)} vertical groups")
            continue

        # Strategy 3: very wide box with a clear gap between its halves.
        x1, y1, x2, y2 = bubble_boxes[bid]
        if (x2 - x1) > med_h * 10:
            x_centers = [quad_center(ocr[i][0])[0] for i in indices]
            x_median = np.median(x_centers)
            left_group = [i for i in indices if quad_center(ocr[i][0])[0] < x_median]
            right_group = [i for i in indices if quad_center(ocr[i][0])[0] >= x_median]
            if left_group and right_group:
                left_box = boxes_union_xyxy([quad_bbox(ocr[i][0]) for i in left_group])
                right_box = boxes_union_xyxy([quad_bbox(ocr[i][0]) for i in right_group])
                if right_box[0] - left_box[2] > med_h * 1.5:
                    for grp in (left_group, right_group):
                        emit_group(grp)
                    splits_made.append(f"BOX#{bid} → 2 horizontal panels")
                    continue

        keep_box(bid, indices)

    if splits_made:
        print(f"\n🔧 Split {len(splits_made)} multi-bubble box(es):")
        for s in splits_made:
            print(f" ✓ {s}")
    return new_bubbles, new_boxes, new_quads, new_indices
|
||
|
||
|
||
def detect_and_merge_fragmented_bubbles(bubble_boxes, bubble_indices, bubble_quads,
                                        bubbles, ocr, image_bgr):
    """Merge boxes whose centres lie inside the same detected contour.

    Two boxes are a merge candidate when some contour from
    ``detect_speech_bubbles`` contains both centres and the centres are
    closer than ``3 * med_h`` horizontally and ``6 * med_h`` vertically.
    Candidate pairs are combined transitively, so every box ends up in
    at most one merged group. Merged groups are emitted first, then the
    untouched boxes; ids are renumbered from 1.

    Returns:
        ``(bubbles, bubble_boxes, bubble_quads, bubble_indices)``; the
        inputs are returned unchanged when nothing is merged.
    """
    all_h = [max(1, quad_bbox(ocr[i][0])[3] - quad_bbox(ocr[i][0])[1])
             for i in range(len(ocr))]
    med_h = float(np.median(all_h)) if all_h else 14.0
    bubble_contours = detect_speech_bubbles(image_bgr)
    bids = list(bubble_boxes.keys())

    def center_of(box):
        return ((box[0] + box[2]) / 2.0, (box[1] + box[3]) / 2.0)

    to_merge = []
    for pos, bid_i in enumerate(bids):
        cx_i, cy_i = center_of(bubble_boxes[bid_i])
        for bid_j in bids[pos + 1:]:
            cx_j, cy_j = center_of(bubble_boxes[bid_j])
            in_same_bubble = any(
                cv2.pointPolygonTest(c, (cx_i, cy_i), False) >= 0 and
                cv2.pointPolygonTest(c, (cx_j, cy_j), False) >= 0
                for c in bubble_contours
            )
            if not in_same_bubble:
                continue
            if abs(cx_i - cx_j) < med_h * 3.0 and abs(cy_i - cy_j) < med_h * 6.0:
                to_merge.append((bid_i, bid_j) if cy_i < cy_j else (bid_j, bid_i))

    if not to_merge:
        return bubbles, bubble_boxes, bubble_quads, bubble_indices

    print(f"\n🔗 Merging {len(to_merge)} fragmented bubble(s):")

    # Build disjoint merge groups. A pair that touches two existing groups
    # fuses them; otherwise the bridging box would be emitted twice.
    merge_groups = []
    for top, bottom in to_merge:
        hits = [g for g in merge_groups if top in g or bottom in g]
        if not hits:
            merge_groups.append({top, bottom})
            continue
        target = hits[0]
        target.update((top, bottom))
        for other in hits[1:]:
            target |= other
        merge_groups = [g for g in merge_groups
                        if g is target or all(g is not o for o in hits[1:])]

    new_bubbles, new_boxes, new_quads, new_indices = {}, {}, {}, {}
    merged_bids = set()
    next_bid = 1
    for merge_set in merge_groups:
        merge_list = sorted(merge_set)
        print(f" ✓ Merging: {', '.join(f'#{b}' for b in merge_list)}")
        all_indices = sorted(set(idx for b in merge_list for idx in bubble_indices[b]))
        merged_bids.update(merge_list)
        new_bubbles[next_bid] = build_lines_from_indices(all_indices, ocr)
        new_boxes[next_bid] = boxes_union_xyxy([quad_bbox(ocr[i][0]) for i in all_indices])
        new_quads[next_bid] = [ocr[i][0] for i in all_indices]
        new_indices[next_bid] = all_indices
        next_bid += 1

    for bid in bids:
        if bid in merged_bids:
            continue
        new_bubbles[next_bid] = bubbles[bid]
        new_boxes[next_bid] = bubble_boxes[bid]
        new_quads[next_bid] = bubble_quads[bid]
        new_indices[next_bid] = bubble_indices[bid]
        next_bid += 1
    return new_bubbles, new_boxes, new_quads, new_indices
|
||
|
||
|
||
def merge_boxes_by_proximity_and_overlap(bubble_boxes, bubble_indices, bubble_quads,
                                         bubbles, ocr, med_h):
    """
    Merges boxes that are vertically close AND share significant horizontal overlap.

    FIX: Tightened thresholds to prevent cross-bubble merging:
        vert_gap:        med_h * 1.5 → med_h * 0.8
        h_overlap_ratio: 0.35        → 0.55

    This keeps legitimate fragment merges (same bubble, split by OCR)
    while blocking merges across adjacent bubbles that happen to be
    vertically stacked (the Box-8 / Box-6 failure cases).

    Each candidate is compared against the root box's ORIGINAL extent,
    not the growing union, so one call resolves only direct neighbours
    of a root.

    Returns:
        ``(bubbles, bubble_boxes, bubble_quads, bubble_indices)``; merged
        groups are emitted first and ids are renumbered from 1. The
        inputs are returned unchanged when nothing is merged.
    """
    bids = sorted(bubble_boxes.keys())
    merge_map: Dict[int, List[int]] = {}   # root id -> ids merged into it (root included)
    merged_into: Dict[int, int] = {}       # absorbed id -> root id

    for pos, bid_i in enumerate(bids):
        if bid_i in merged_into:
            continue
        box_i = bubble_boxes[bid_i]
        wi = max(1, box_i[2] - box_i[0])

        for bid_j in bids[pos + 1:]:
            if bid_j in merged_into:
                continue
            box_j = bubble_boxes[bid_j]
            wj = max(1, box_j[2] - box_j[0])

            vert_gap = max(0, max(box_i[1], box_j[1]) - min(box_i[3], box_j[3]))
            h_overlap = max(0, min(box_i[2], box_j[2]) - max(box_i[0], box_j[0]))
            h_overlap_ratio = h_overlap / max(1, min(wi, wj))

            # FIX: tightened from med_h*1.5 → med_h*0.8
            # FIX: tightened from 0.35 → 0.55
            if vert_gap <= med_h * 0.8 and h_overlap_ratio >= 0.55:
                root = merged_into.get(bid_i, bid_i)
                members = merge_map.setdefault(root, [root])
                if bid_j not in members:
                    members.append(bid_j)
                merged_into[bid_j] = root

    if not merge_map:
        return bubbles, bubble_boxes, bubble_quads, bubble_indices

    print(f"\n🔀 Proximity+overlap merge: {len(merge_map)} group(s):")
    new_bubbles, new_boxes, new_quads, new_indices = {}, {}, {}, {}
    processed = set()
    next_bid = 1

    for group in merge_map.values():
        group_unique = sorted(set(group))
        print(f" ✓ Merging: {', '.join(f'#{b}' for b in group_unique)}")
        all_indices = sorted(set(idx for b in group_unique for idx in bubble_indices[b]))
        new_bubbles[next_bid] = build_lines_from_indices(all_indices, ocr)
        new_boxes[next_bid] = boxes_union_xyxy([quad_bbox(ocr[i][0]) for i in all_indices])
        new_quads[next_bid] = [ocr[i][0] for i in all_indices]
        new_indices[next_bid] = all_indices
        next_bid += 1
        processed.update(group_unique)

    for bid in bids:
        if bid in processed:
            continue
        new_bubbles[next_bid] = bubbles[bid]
        new_boxes[next_bid] = bubble_boxes[bid]
        new_quads[next_bid] = bubble_quads[bid]
        new_indices[next_bid] = bubble_indices[bid]
        next_bid += 1

    return new_bubbles, new_boxes, new_quads, new_indices
|
||
|
||
def merge_continuation_boxes(bubble_boxes, bubble_indices, bubble_quads,
                             bubbles, ocr, image_bgr):
    """
    FIX: Merges pairs of boxes that are:
        1. Mapped to the same speech-bubble contour (judged by the FIRST
           OCR index of each box via ``build_quad_to_bubble_map``)
        2. Vertically adjacent (gap ≤ 2.5 × med_h)
        3. Horizontally overlapping by at least 25% of the narrower box
        4. Not hinted as "sfx" by ``region_text_role_hint``
           (never merges sfx into dialogue)

    This fixes split detections like Box7+Box9 in 001 and
    Box9+Box10 in 002 where one bubble was detected as two
    separate regions due to an intervening SFX quad.

    Boxes are scanned top to bottom and each box merges with at most one
    partner per call.

    Returns:
        ``(bubbles, bubble_boxes, bubble_quads, bubble_indices)``; merged
        pairs are emitted first and ids are renumbered from 1. The inputs
        are returned unchanged when nothing is merged.
    """
    all_h = [max(1, quad_bbox(ocr[i][0])[3] - quad_bbox(ocr[i][0])[1])
             for i in range(len(ocr))]
    med_h = float(np.median(all_h)) if all_h else 14.0
    bubble_contours = detect_speech_bubbles(image_bgr)
    quad_to_bubble = build_quad_to_bubble_map(ocr, bubble_contours)

    # Scan boxes from top to bottom by vertical centre.
    bids = sorted(bubble_boxes.keys(),
                  key=lambda b: (bubble_boxes[b][1] + bubble_boxes[b][3]) / 2.0)

    role_cache = {}

    def role_of(bid):
        # The role hint depends only on the box text; compute it once per box.
        if bid not in role_cache:
            text = normalize_text(" ".join(bubbles.get(bid, [])))
            role_cache[bid] = region_text_role_hint(text)
        return role_cache[bid]

    merge_pairs = []
    visited = set()

    for pos, bid_i in enumerate(bids):
        if bid_i in visited:
            continue
        # Never merge sfx boxes into anything
        if role_of(bid_i) == "sfx":
            continue
        box_i = bubble_boxes[bid_i]

        for bid_j in bids[pos + 1:]:
            if bid_j in visited:
                continue
            if role_of(bid_j) == "sfx":
                continue
            box_j = bubble_boxes[bid_j]

            # Must share the same speech-bubble contour
            idx_i = bubble_indices[bid_i]
            idx_j = bubble_indices[bid_j]
            if not idx_i or not idx_j:
                continue
            cid_i = quad_to_bubble.get(idx_i[0], -1)
            cid_j = quad_to_bubble.get(idx_j[0], -1)
            if cid_i == -1 or cid_j == -1 or cid_i != cid_j:
                continue

            # Must be vertically adjacent
            vert_gap = max(0, max(box_i[1], box_j[1]) - min(box_i[3], box_j[3]))
            if vert_gap > med_h * 2.5:
                continue

            # Must have horizontal overlap
            h_overlap = max(0, min(box_i[2], box_j[2]) - max(box_i[0], box_j[0]))
            min_w = min(xyxy_width(box_i), xyxy_width(box_j))
            if h_overlap / max(1, min_w) < 0.25:
                continue

            merge_pairs.append((bid_i, bid_j))
            visited.add(bid_i)
            visited.add(bid_j)
            break  # each box merges with at most one partner

    if not merge_pairs:
        return bubbles, bubble_boxes, bubble_quads, bubble_indices

    print(f"\n🔗 Continuation merge: {len(merge_pairs)} pair(s):")

    processed = set()
    new_bubbles, new_boxes, new_quads, new_indices = {}, {}, {}, {}
    next_bid = 1

    for bid_a, bid_b in merge_pairs:
        print(f" ✓ Merging BOX#{bid_a} + BOX#{bid_b}")
        # Reading order: top to bottom, then left to right.
        all_idx = sorted(
            set(bubble_indices[bid_a]) | set(bubble_indices[bid_b]),
            key=lambda k: (quad_bbox(ocr[k][0])[1], quad_bbox(ocr[k][0])[0])
        )
        new_bubbles[next_bid] = build_lines_from_indices(all_idx, ocr)
        new_boxes[next_bid] = boxes_union_xyxy([quad_bbox(ocr[i][0]) for i in all_idx])
        new_quads[next_bid] = [ocr[i][0] for i in all_idx]
        new_indices[next_bid] = all_idx
        processed.update({bid_a, bid_b})
        next_bid += 1

    for bid in bids:
        if bid in processed:
            continue
        new_bubbles[next_bid] = bubbles[bid]
        new_boxes[next_bid] = bubble_boxes[bid]
        new_quads[next_bid] = bubble_quads[bid]
        new_indices[next_bid] = bubble_indices[bid]
        next_bid += 1

    return new_bubbles, new_boxes, new_quads, new_indices
|
||
|
||
def auto_fix_bubble_detection(bubble_boxes, bubble_indices, bubble_quads,
                              bubbles, ocr, image_bgr):
    """
    Full fix pipeline:
        1. Split boxes that span multiple speech bubbles.
        2. Merge fragments detected inside the same contour.
        3. Merge continuation boxes split across same bubble (NEW).
        4. Proximity+overlap merge — pass 1.
        5. Proximity+overlap merge — pass 2 (chain resolution).

    Each stage takes and returns the four parallel dicts.

    Returns:
        ``(bubbles, bubble_boxes, bubble_quads, bubble_indices)`` after
        all stages.
    """
    print("\n🔍 Running automatic bubble detection fixes...")
    all_h = [max(1, quad_bbox(ocr[i][0])[3] - quad_bbox(ocr[i][0])[1])
             for i in range(len(ocr))]
    med_h = float(np.median(all_h)) if all_h else 14.0

    bubbles, bubble_boxes, bubble_quads, bubble_indices = \
        detect_and_split_multi_bubble_boxes(
            bubble_boxes, bubble_indices, bubble_quads, bubbles, ocr, image_bgr)

    bubbles, bubble_boxes, bubble_quads, bubble_indices = \
        detect_and_merge_fragmented_bubbles(
            bubble_boxes, bubble_indices, bubble_quads, bubbles, ocr, image_bgr)

    # FIX: merge continuation boxes (same bubble, split detection)
    bubbles, bubble_boxes, bubble_quads, bubble_indices = \
        merge_continuation_boxes(
            bubble_boxes, bubble_indices, bubble_quads, bubbles, ocr, image_bgr)

    # Two proximity passes: the second picks up chains the first pass
    # could only partially merge.
    for _ in range(2):
        bubbles, bubble_boxes, bubble_quads, bubble_indices = \
            merge_boxes_by_proximity_and_overlap(
                bubble_boxes, bubble_indices, bubble_quads, bubbles, ocr, med_h)

    return bubbles, bubble_boxes, bubble_quads, bubble_indices
|
||
|
||
|
||
def remove_nested_boxes(bubble_boxes, bubble_indices, bubble_quads, bubbles,
                        overlap_threshold=0.50):
    """Drop the smaller of any two boxes that overlap heavily or share OCR indices.

    Two boxes conflict when ``boxes_overlap_ratio`` exceeds
    *overlap_threshold* or when they have at least one OCR index in
    common. The box with the smaller area is removed (the later one on a
    tie).

    The four input dicts are MODIFIED IN PLACE and also returned.

    Returns:
        ``(bubbles, bubble_boxes, bubble_quads, bubble_indices)``.
    """
    bids = list(bubble_boxes.keys())
    to_remove = set()

    for pos, bid_i in enumerate(bids):
        if bid_i in to_remove:
            continue
        box_i = bubble_boxes[bid_i]
        area_i = max(0, box_i[2] - box_i[0]) * max(0, box_i[3] - box_i[1])

        for bid_j in bids[pos + 1:]:
            if bid_j in to_remove:
                continue
            box_j = bubble_boxes[bid_j]
            area_j = max(0, box_j[2] - box_j[0]) * max(0, box_j[3] - box_j[1])
            shared = set(bubble_indices[bid_i]).intersection(bubble_indices[bid_j])
            overlap = boxes_overlap_ratio(box_i, box_j)

            if overlap > overlap_threshold or len(shared) > 0:
                if area_i >= area_j:
                    to_remove.add(bid_j)
                    print(f" 🗑️ Removing BOX#{bid_j} (overlaps BOX#{bid_i})")
                else:
                    to_remove.add(bid_i)
                    print(f" 🗑️ Removing BOX#{bid_i} (overlaps BOX#{bid_j})")
                    # NOTE(review): indentation was lost in the stored file;
                    # the break is placed here because box i has just been
                    # discarded, so comparing it further is pointless —
                    # confirm against the original.
                    break

    if to_remove:
        print(f"\n🧹 Removed {len(to_remove)} overlapping/nested box(es)")
        for bid in to_remove:
            bubble_boxes.pop(bid, None)
            bubble_indices.pop(bid, None)
            bubble_quads.pop(bid, None)
            bubbles.pop(bid, None)
    return bubbles, bubble_boxes, bubble_quads, bubble_indices
|
||
|
||
|
||
def enforce_max_box_size(bubble_boxes, bubble_indices, bubble_quads, bubbles, ocr,
                         max_width_ratio=0.6, max_height_ratio=0.5, image_shape=None):
    """
    Split boxes that are too large relative to the image.

    A box is oversized when its width exceeds ``image width *
    max_width_ratio`` or its height exceeds ``image height *
    max_height_ratio``. Oversized boxes are split by columns first
    (``split_bubble_if_multiple_columns`` with aggressive thresholds) and,
    if that yields nothing, by rows (``split_bubble_if_multiple_rows``).
    Boxes that are not oversized, or that cannot be split, are kept as-is.

    All output boxes are renumbered consecutively starting at 1.

    Args:
        bubble_boxes: dict mapping box id -> (x1, y1, x2, y2).
        bubble_indices: dict mapping box id -> list of OCR indices.
        bubble_quads: dict mapping box id -> quads of that box.
        bubbles: dict mapping box id -> text lines of that box.
        ocr: list of OCR entries; ``ocr[i][0]`` is the quad of entry ``i``.
        max_width_ratio: maximum allowed box width as a fraction of image width.
        max_height_ratio: maximum allowed box height as a fraction of image height.
        image_shape: image shape whose first two items are (height, width).
            When ``None`` the inputs are returned untouched.

    Returns:
        ``(bubbles, boxes, quads, indices)`` as new, renumbered dicts (or
        the original dicts when ``image_shape`` is ``None``).
    """
    if image_shape is None:
        return bubbles, bubble_boxes, bubble_quads, bubble_indices

    ih, iw = image_shape[:2]
    max_width, max_height = iw * max_width_ratio, ih * max_height_ratio
    new_bubbles, new_boxes, new_quads, new_indices = {}, {}, {}, {}
    next_bid, splits_made = 1, []

    for bid, box in bubble_boxes.items():
        x1, y1, x2, y2 = box
        w, h = x2 - x1, y2 - y1

        if w > max_width or h > max_height:
            indices = bubble_indices[bid]
            # Columns are tried first; rows only if no column split exists.
            groups = (
                split_bubble_if_multiple_columns(
                    indices, ocr, bid=bid, use_aggressive_thresholds=True)
                or split_bubble_if_multiple_rows(indices, ocr, bid=bid)
            )
            if groups:
                for grp in groups:
                    new_bubbles[next_bid] = build_lines_from_indices(grp, ocr)
                    new_boxes[next_bid] = boxes_union_xyxy(
                        [quad_bbox(ocr[i][0]) for i in grp])
                    new_quads[next_bid] = [ocr[i][0] for i in grp]
                    new_indices[next_bid] = grp
                    next_bid += 1
                splits_made.append(f"BOX#{bid} (oversized: {w}x{h}px)")
                continue

        # Not oversized, or no usable split: carry the box over unchanged.
        new_bubbles[next_bid] = bubbles[bid]
        new_boxes[next_bid] = box
        new_quads[next_bid] = bubble_quads[bid]
        new_indices[next_bid] = bubble_indices[bid]
        next_bid += 1

    if splits_made:
        print(f"\n📏 Split {len(splits_made)} oversized box(es):")
        for s in splits_made:
            print(f" ✓ {s}")

    return new_bubbles, new_boxes, new_quads, new_indices
|
||
|
||
|
||
def should_merge_groups(group1_indices, group2_indices, ocr, median_height,
                        max_vertical_gap=None):
    """
    Decide whether two groups of OCR indices are close enough to merge.

    The groups are merged only when their bounding boxes are roughly
    centred on the same vertical axis (horizontal centre distance at most
    ``1.8 * median_height``) and the vertical gap between them is at most
    ``max_vertical_gap``.

    Args:
        group1_indices: OCR indices of the first group.
        group2_indices: OCR indices of the second group.
        ocr: list of OCR entries; ``ocr[i][0]`` is the quad of entry ``i``.
        median_height: reference text height used to scale the thresholds.
        max_vertical_gap: largest allowed vertical gap; defaults to
            ``2.5 * median_height`` when ``None``.

    Returns:
        ``True`` if the groups should be merged, ``False`` otherwise
        (including when either group has no bounding box).
    """
    if max_vertical_gap is None:
        max_vertical_gap = median_height * 2.5

    box1 = boxes_union_xyxy([quad_bbox(ocr[i][0]) for i in group1_indices])
    box2 = boxes_union_xyxy([quad_bbox(ocr[i][0]) for i in group2_indices])
    if box1 is None or box2 is None:
        return False

    cx1 = (box1[0] + box1[2]) / 2.0
    cx2 = (box2[0] + box2[2]) / 2.0
    if abs(cx1 - cx2) > median_height * 1.8:
        return False

    # Distance between the lower box's top and the upper box's bottom;
    # clamped to 0 when the boxes overlap vertically.
    vertical_gap = max(0, max(box1[1], box2[1]) - min(box1[3], box2[3]))
    return vertical_gap <= max_vertical_gap
|
||
|
||
|
||
# ============================================================
# FIX: CONTOUR-AWARE BUBBLE SPLITTING
# Splits a merged group using actual contour membership BEFORE
# any proximity/overlap merging pass runs.
# ============================================================
|
||
|
||
def split_group_by_contour_membership(indices: list, ocr: list,
                                      quad_to_bubble: Dict[int, int]) -> List[List[int]]:
    """
    Partition OCR indices by their assigned bubble contour.

    Each index is looked up in ``quad_to_bubble``; indices missing from the
    map are given contour id ``-1`` and therefore end up together in one
    shared group.

    Args:
        indices: OCR indices belonging to one candidate group.
        ocr: list of OCR entries; ``ocr[i][0]`` is the quad of entry ``i``.
        quad_to_bubble: mapping OCR index -> contour id.

    Returns:
        A list of index groups. If every index maps to the same contour id
        the result is ``[indices]`` (the original list, no split).
        Otherwise one group per contour id, ordered by ascending contour
        id, each group sorted top-to-bottom then left-to-right.
    """
    buckets: Dict[int, List[int]] = {}
    for idx in indices:
        buckets.setdefault(quad_to_bubble.get(idx, -1), []).append(idx)

    if len(buckets) <= 1:
        return [indices]

    def _reading_key(i):
        box = quad_bbox(ocr[i][0])
        return (box[1], box[0])

    return [sorted(buckets[cid], key=_reading_key) for cid in sorted(buckets)]
|
||
|
||
# ============================================================
# FIX: MIXED-TYPE GROUP SPLITTER
# Splits a group whose quads contain BOTH sfx-like and
# dialogue-like text into separate sub-groups.
# This fixes Box-12/007, Box-22/007, Box-13/008 where an SFX
# quad (RRRING, A MEAL-GRUBBING SHE-BEAST) was merged with a
# dialogue quad because they shared the same contour region.
# ============================================================
|
||
|
||
def split_group_by_region_type(indices: list, ocr: list) -> List[List[int]]:
    """
    Partition OCR indices by their inferred region type.

    The role of each entry comes from ``region_text_role_hint`` applied to
    the normalised text ``ocr[idx][1]``. A split happens only when the
    group mixes "sfx" with at least one of "dialogue", "narration" or
    "reaction"; otherwise the group is returned unchanged.

    Args:
        indices: OCR indices belonging to one candidate group.
        ocr: list of OCR entries; ``ocr[i][0]`` is the quad and
            ``ocr[i][1]`` the text of entry ``i``.

    Returns:
        A list of index groups: ``[indices]`` when no split applies,
        otherwise one group per role in the order dialogue, narration,
        reaction, sfx, unknown, followed by any other role names. Each
        group is sorted top-to-bottom then left-to-right.
    """
    if len(indices) <= 1:
        return [indices]

    typed: Dict[str, List[int]] = {}
    for idx in indices:
        role = region_text_role_hint(normalize_text(ocr[idx][1]))
        typed.setdefault(role, []).append(idx)

    # Only split when sfx (the usual contaminator) sits next to real text.
    has_sfx = "sfx" in typed
    has_dialogue = any(r in typed for r in ("dialogue", "narration", "reaction"))
    if not (has_sfx and has_dialogue):
        return [indices]

    known_roles = ("dialogue", "narration", "reaction", "sfx", "unknown")
    role_order = [r for r in known_roles if r in typed]
    # Keep indices whose role is not one of the known names instead of
    # silently dropping them from the output.
    role_order += sorted((r for r in typed if r not in known_roles), key=str)

    def _reading_key(i):
        box = quad_bbox(ocr[i][0])
        return (box[1], box[0])

    result = [sorted(typed[role], key=_reading_key) for role in role_order]
    return result if len(result) > 1 else [indices]
|
||
|
||
|
||
def split_group_by_spatial_gap(indices: list, ocr: list,
                               gap_factor: float = 1.8) -> List[List[int]]:
    """
    Split a group of OCR indices at its largest empty spatial gap.

    Catches cases such as Box-22/007 where two dialogue bubbles sit
    side-by-side with a visible horizontal gap.

    The horizontal axis is tried first, then the vertical axis. On each
    axis the quads are ordered by their near edge and the gap is measured
    from the furthest far edge seen so far, so a wide (or tall) earlier
    quad that spans the apparent gap prevents a false split. A gap
    qualifies when it exceeds ``gap_factor`` times the median quad height
    of the group; the largest qualifying gap is used.

    Args:
        indices: OCR indices belonging to one candidate group.
        ocr: list of OCR entries; ``ocr[i][0]`` is the quad of entry ``i``.
        gap_factor: multiplier applied to the median quad height to get
            the minimum gap size.

    Returns:
        ``[left, right]`` or ``[top, bottom]`` when a qualifying gap is
        found, otherwise ``[indices]``.
    """
    if len(indices) <= 1:
        return [indices]

    boxes = {i: quad_bbox(ocr[i][0]) for i in indices}
    heights = [max(1, boxes[i][3] - boxes[i][1]) for i in indices]
    med_h = float(np.median(heights)) if heights else 14.0
    gap_threshold = med_h * gap_factor

    def _split_along(near, far):
        """Return [before, after] for the largest gap on one axis, or None."""
        ordered = sorted(indices, key=lambda i: boxes[i][near])
        best_gap, cut = 0.0, None
        reach = boxes[ordered[0]][far]  # furthest far edge covered so far
        for k in range(1, len(ordered)):
            gap = boxes[ordered[k]][near] - reach
            if gap > gap_threshold and gap > best_gap:
                best_gap, cut = gap, k
            reach = max(reach, boxes[ordered[k]][far])
        if cut is None:
            return None
        return [ordered[:cut], ordered[cut:]]

    # Left / right columns first (x1 -> x2), then top / bottom rows (y1 -> y2).
    for near, far in ((0, 2), (1, 3)):
        parts = _split_along(near, far)
        if parts is not None:
            return parts

    return [indices]
|
||
|
||
|
||
def apply_contour_split_to_all_boxes(bubble_boxes, bubble_indices, bubble_quads,
                                     bubbles, ocr, image_bgr):
    """
    FIX: Pre-pass that runs BEFORE proximity merging.

    Chains three split strategies, each applied to every group produced by
    the previous one:
      1. Contour membership — different speech-bubble contours
      2. Mixed region type  — sfx quads merged with dialogue quads
      3. Spatial gap        — two dialogue bubbles side-by-side

    Primary fix for:
      Box-8/008  (4 bubbles merged)
      Box-6/008  (2 adjacent bubbles merged)
      Box-12/007 (RRRING + dialogue merged)
      Box-22/007 (two dialogue bubbles merged)
      Box-13/008 (RRRING + dialogue merged)

    Args:
        bubble_boxes: dict mapping box id -> (x1, y1, x2, y2).
        bubble_indices: dict mapping box id -> list of OCR indices.
        bubble_quads: dict mapping box id -> quads of that box.
        bubbles: dict mapping box id -> text lines of that box.
        ocr: list of OCR entries; ``ocr[i][0]`` is the quad of entry ``i``.
        image_bgr: page image passed to ``detect_speech_bubbles``.

    Returns:
        ``(bubbles, boxes, quads, indices)`` as new dicts, renumbered
        consecutively from 1 in ascending order of the original box ids.
    """
    # Nothing to split: skip contour detection entirely.
    if not bubble_boxes:
        return {}, {}, {}, {}

    bubble_contours = detect_speech_bubbles(image_bgr)
    quad_to_bubble = (build_quad_to_bubble_map(ocr, bubble_contours)
                      if bubble_contours else {})

    new_bubbles, new_boxes, new_quads, new_indices = {}, {}, {}, {}
    next_bid = 1
    splits_made = []

    for bid in sorted(bubble_boxes.keys()):
        indices = bubble_indices[bid]

        # Strategy 1: contour membership.
        groups = split_group_by_contour_membership(indices, ocr, quad_to_bubble)

        # Strategy 2: mixed region type, on every group from strategy 1.
        groups = [sub
                  for grp in groups
                  for sub in split_group_by_region_type(grp, ocr)]

        # Strategy 3: spatial gap, on every group from strategies 1 + 2.
        groups = [sub
                  for grp in groups
                  for sub in split_group_by_spatial_gap(grp, ocr, gap_factor=1.8)]

        if len(groups) <= 1:
            # No split: keep the existing lines / box / quads untouched.
            new_bubbles[next_bid] = bubbles[bid]
            new_boxes[next_bid] = bubble_boxes[bid]
            new_quads[next_bid] = bubble_quads[bid]
            new_indices[next_bid] = indices
            next_bid += 1
            continue

        for grp in groups:
            if not grp:
                continue
            new_bubbles[next_bid] = build_lines_from_indices(grp, ocr)
            new_boxes[next_bid] = boxes_union_xyxy(
                [quad_bbox(ocr[i][0]) for i in grp])
            new_quads[next_bid] = [ocr[i][0] for i in grp]
            new_indices[next_bid] = grp
            next_bid += 1

        splits_made.append(f"BOX#{bid} → {len(groups)} groups")

    if splits_made:
        print(f"\n✂️ Contour-aware pre-split: {len(splits_made)} box(es) split:")
        for s in splits_made:
            print(f" ✓ {s}")

    return new_bubbles, new_boxes, new_quads, new_indices
|
||
|
||
|
||
# ============================================================
# ENHANCED OCR ENGINE
# ============================================================
|
||
class ImprovedMacVisionDetector:
    """
    OCR front-end built on Apple's Vision text recogniser.

    ``detect`` runs the recogniser on several preprocessed variants of the
    image and merges the per-variant results into a single list of
    ``(quad, text, conf)`` tuples in original-image pixel coordinates.
    """

    def __init__(self, source_lang="en"):
        """
        Args:
            source_lang: language code or English language name
                (case-insensitive, surrounding whitespace ignored).
                Unrecognised values fall back to "en-US".
        """
        lang_key = source_lang.lower().strip()
        lang_map = {
            "en": "en-US", "english": "en-US",
            "es": "es-ES", "spanish": "es-ES",
            "ca": "ca-ES", "catalan": "ca-ES",
            "fr": "fr-FR", "french": "fr-FR",
            "ja": "ja-JP", "japanese": "ja-JP",
            "it": "it-IT", "italian": "it-IT",
            "de": "de-DE", "german": "de-DE",
            "ko": "ko-KR", "korean": "ko-KR",
            "zh": "zh-Hans", "chinese": "zh-Hans",
        }
        # Remembered so result merging can filter text with the same
        # language the rest of the pipeline uses.
        self.source_lang = lang_key
        self.langs = [lang_map.get(lang_key, "en-US")]
        print(f"⚡ Using Enhanced Apple Vision OCR (Language: {self.langs[0]})")

    def preprocess_variants(self, image_bgr):
        """
        Build the preprocessed images that OCR is run on.

        Returns:
            List of ``(name, image)`` pairs: "enhanced", "high_contrast"
            (Otsu threshold), "bilateral" (bilateral filter), "inverted"
            and "original". All are produced with a 2.5x scale factor.
        """
        scale = 2.5

        def _upscale(img):
            return cv2.resize(img, None, fx=scale, fy=scale,
                              interpolation=cv2.INTER_CUBIC)

        variants = [("enhanced", enhance_image_for_ocr(image_bgr, upscale_factor=2.5))]

        gray = cv2.cvtColor(image_bgr, cv2.COLOR_BGR2GRAY)
        _, hc = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
        variants.append(("high_contrast",
                         cv2.cvtColor(_upscale(hc), cv2.COLOR_GRAY2BGR)))
        variants.append(("bilateral",
                         _upscale(cv2.bilateralFilter(image_bgr, 9, 75, 75))))
        variants.append(("inverted", _upscale(cv2.bitwise_not(image_bgr))))
        variants.append(("original", _upscale(image_bgr)))
        return variants

    def run_vision_ocr(self, image_bgr):
        """
        Run one Vision text-recognition request on ``image_bgr``.

        The image is PNG-encoded and handed to Vision; each observation's
        normalised bounding box is converted to pixel coordinates with a
        top-left origin and clipped to the image.

        Returns:
            List of ``(quad, text, conf)`` where ``quad`` is
            ``[[x1,y1],[x2,y1],[x2,y2],[x1,y2]]``. Empty when the image is
            missing/empty or cannot be encoded.
        """
        if image_bgr is None or image_bgr.size == 0:
            return []
        ih, iw = image_bgr.shape[:2]

        success, buffer = cv2.imencode('.png', image_bgr)
        if not success:
            return []

        ns_data = NSData.dataWithBytes_length_(buffer.tobytes(), len(buffer))
        image_source = Quartz.CGImageSourceCreateWithData(ns_data, None)
        cg_image = Quartz.CGImageSourceCreateImageAtIndex(image_source, 0, None)

        request = Vision.VNRecognizeTextRequest.alloc().init()
        # NOTE(review): Apple's VNRequestTextRecognitionLevel enum defines
        # accurate = 0 and fast = 1, so this selects the fast path —
        # confirm that is intended.
        request.setRecognitionLevel_(1)
        request.setUsesLanguageCorrection_(True)
        request.setRecognitionLanguages_(self.langs)

        handler = Vision.VNImageRequestHandler.alloc().initWithCGImage_options_(
            cg_image, {})
        handler.performRequests_error_([request], None)

        results = []
        scale_x, scale_y = iw, ih
        for obs in (request.results() or []):
            bbox = obs.boundingBox()
            # The y axis is flipped (1 - ...) to get a top-left origin.
            x1 = int(bbox.origin.x * scale_x)
            y1 = int((1 - bbox.origin.y - bbox.size.height) * scale_y)
            x2 = int((bbox.origin.x + bbox.size.width) * scale_x)
            y2 = int((1 - bbox.origin.y) * scale_y)
            x1, y1 = max(0, x1), max(0, y1)
            x2, y2 = min(iw, x2), min(ih, y2)
            if x2 <= x1 or y2 <= y1:
                continue

            # Query the top candidate once instead of once per field.
            candidates = obs.topCandidates_(1)
            text = candidates[0].string() if candidates else ""
            conf = float(candidates[0].confidence()) if candidates else 0.0

            quad = [[x1, y1], [x2, y1], [x2, y2], [x1, y2]]
            results.append((quad, text, conf))
        return results

    def detect(self, image_bgr):
        """
        Multi-variant OCR with consensus merging.

        Each preprocessing variant is OCR'd independently and its quads are
        scaled back to the coordinate space of ``image_bgr``. A variant
        that raises is reported and skipped so the others still contribute.

        Returns list of (quad, text, conf) tuples.
        """
        if image_bgr is None or image_bgr.size == 0:
            return []

        variants = self.preprocess_variants(image_bgr)
        all_results = []
        variant_names = []

        for name, variant_img in variants:
            # Deliberate best-effort boundary: one failing variant must not
            # abort the whole page.
            try:
                res = self.run_vision_ocr(variant_img)
                # scale coordinates back to original image space
                vh, vw = variant_img.shape[:2]
                oh, ow = image_bgr.shape[:2]
                sx, sy = ow / max(1, vw), oh / max(1, vh)
                scaled = [
                    ([[int(p[0] * sx), int(p[1] * sy)] for p in quad], text, conf)
                    for quad, text, conf in res
                ]
                all_results.append(scaled)
                variant_names.append(name)
            except Exception as e:
                print(f" ⚠️ Variant '{name}' failed: {e}")

        if not all_results:
            return []

        return self._merge_variant_results(all_results, variant_names)

    def _merge_variant_results(self, all_results, variant_names):
        """
        Merge OCR results from multiple preprocessing variants.

        Strategy: use the variant with the most detections as base,
        then fill gaps from other variants using IoU matching. A detection
        from another variant that overlaps a base entry (IoU > 0.40)
        replaces that entry's text and confidence when it is more
        confident (the base quad is kept); an unmatched detection is
        appended if ``is_meaningful_text`` accepts it for the detector's
        source language.

        Args:
            all_results: one list of ``(quad, text, conf)`` per variant.
            variant_names: variant names parallel to ``all_results``
                (not used by the merge itself).

        Returns:
            Merged list of ``(quad, text, conf)`` tuples.
        """
        if not all_results:
            return []

        # Falls back to "en" for instances created without __init__.
        lang = getattr(self, "source_lang", "en")

        # pick base = most detections
        base_idx = max(range(len(all_results)), key=lambda i: len(all_results[i]))
        base = list(all_results[base_idx])
        others = [r for i, r in enumerate(all_results) if i != base_idx]

        for other in others:
            for quad_o, text_o, conf_o in other:
                box_o = quad_bbox(quad_o)
                matched = False
                for k, (quad_b, text_b, conf_b) in enumerate(base):
                    if boxes_iou(box_o, quad_bbox(quad_b)) > 0.40:
                        # keep higher-confidence reading
                        if conf_o > conf_b:
                            base[k] = (quad_b, text_o, conf_o)
                        matched = True
                        break
                if not matched and is_meaningful_text(text_o, lang):
                    base.append((quad_o, text_o, conf_o))

        return base
|
||
|
||
|
||
# ============================================================
# BUILD LINES FROM INDICES
# ============================================================
|
||
def build_lines_from_indices(indices, ocr, reading_mode="ltr"):
    """
    Build ordered text lines from a set of OCR quad indices.

    Thin wrapper around ``build_text_from_layout`` that short-circuits on
    empty input.

    Args:
        indices: OCR indices to assemble.
        ocr: list of OCR entries the indices refer to.
        reading_mode: forwarded unchanged to ``build_text_from_layout``.

    Returns:
        ``[]`` when ``indices`` is empty, otherwise whatever
        ``build_text_from_layout`` returns.
    """
    if not indices:
        return []
    return build_text_from_layout(indices, ocr, reading_mode=reading_mode)
|
||
|
||
|
||
def split_indices_into_vertical_blocks(indices, ocr, gap_factor=2.5):
    """
    Split indices into vertically separated blocks.

    Quads are sorted top-to-bottom (then left-to-right). A new block starts
    when the vertical gap between a quad's top and the previous quad's
    bottom exceeds ``gap_factor * median_height``, where the median is
    taken over the heights of the given quads.

    Args:
        indices: OCR indices to partition.
        ocr: list of OCR entries; ``ocr[i][0]`` is the quad of entry ``i``.
        gap_factor: multiplier applied to the median quad height.

    Returns:
        List of index blocks in top-to-bottom order; ``[]`` for empty input.
    """
    if not indices:
        return []

    boxes = {i: quad_bbox(ocr[i][0]) for i in indices}
    heights = [max(1, boxes[i][3] - boxes[i][1]) for i in indices]
    med_h = float(np.median(heights)) if heights else 14.0
    gap_th = med_h * gap_factor

    ordered = sorted(indices, key=lambda i: (boxes[i][1], boxes[i][0]))

    blocks = [[ordered[0]]]
    for prev_idx, curr_idx in zip(ordered, ordered[1:]):
        gap = boxes[curr_idx][1] - boxes[prev_idx][3]
        if gap > gap_th:
            blocks.append([])
        blocks[-1].append(curr_idx)

    return blocks
|
||
|
||
|
||
# ============================================================
# SPLIT HELPERS FOR enforce_max_box_size
# ============================================================
|
||
def split_bubble_if_multiple_columns(indices, ocr, bid=None,
                                     use_aggressive_thresholds=False):
    """
    Attempt to split indices into left/right column groups.

    Delegates gap detection to ``detect_horizontal_gap_in_group`` using the
    median quad height of the group and a gap factor of 1.5 (aggressive)
    or 2.5 (default).

    Args:
        indices: OCR indices of one box.
        ocr: list of OCR entries; ``ocr[i][0]`` is the quad of entry ``i``.
        bid: box id; accepted for call-site symmetry, not used here.
        use_aggressive_thresholds: use the smaller gap factor.

    Returns:
        ``[left_group, right_group]`` if a column gap is found and both
        sides are non-empty, else ``None``.
    """
    if len(indices) < 2:
        return None

    heights = []
    for i in indices:
        box = quad_bbox(ocr[i][0])
        heights.append(max(1, box[3] - box[1]))
    med_h = float(np.median(heights)) if heights else 14.0
    factor = 1.5 if use_aggressive_thresholds else 2.5

    result = detect_horizontal_gap_in_group(indices, ocr, med_h, gap_factor=factor)
    if result is None:
        return None

    left_group, right_group = result
    if not left_group or not right_group:
        return None
    return [left_group, right_group]
|
||
|
||
|
||
def split_bubble_if_multiple_rows(indices, ocr, bid=None):
    """
    Attempt to split indices into top/bottom row groups.

    Delegates to ``check_vertical_alignment_split`` with a threshold of
    ``int(2.5 * median quad height)``.

    Args:
        indices: OCR indices of one box.
        ocr: list of OCR entries; ``ocr[i][0]`` is the quad of entry ``i``.
        bid: box id; accepted for call-site symmetry, not used here.

    Returns:
        The list of groups when more than one is produced, else ``None``.
    """
    if len(indices) < 2:
        return None

    heights = []
    for i in indices:
        box = quad_bbox(ocr[i][0])
        heights.append(max(1, box[3] - box[1]))
    med_h = float(np.median(heights)) if heights else 14.0

    groups = check_vertical_alignment_split(indices, ocr,
                                            threshold=int(med_h * 2.5))
    # Tolerate a helper that reports "no split" with None or an empty list.
    if groups and len(groups) > 1:
        return groups
    return None
|
||
|
||
|
||
# ============================================================
# MAIN PIPELINE ENTRY POINT
# ============================================================
|
||
def process_manga_page(image_path: str,
                       source_lang: str = "en",
                       target_lang: str = "ca",
                       output_json: Optional[str] = None,
                       output_txt: Optional[str] = None) -> Dict[str, Any]:
    """
    Full manga page OCR + translation pipeline.

    Pipeline order:
      1. Load image
      2. Run multi-variant OCR
      3. Filter noise / invalid quads
      4. Pre-split wide quads by column gap
      5. Propose text regions (contour-aware, tightened thresholds)  <- FIX
      6. Contour-aware pre-split of merged groups                    <- FIX
      7. Auto-fix bubble detection (split multi-bubble, merge frags)
      8. Reconcile region + bubble groups (tightened IoU/overlap)    <- FIX
      9. Remove nested/duplicate boxes
      10. Enforce max box size
      11. Classify region types
      12. Correct OCR text
      13. Translate
      14. Build output

    Args:
        image_path: path of the page image to process.
        source_lang: language of the text on the page.
        target_lang: language to translate into.
        output_json: if given, results are also written there as JSON.
        output_txt: if given, a columnar text summary is also written there.

    Returns:
        Dict keyed by box id (as a string); each value holds the order,
        region type, confidence, OCR source, raw / corrected text,
        translation, flags, line groups and box of one region. An empty
        dict is returned when no OCR result survives filtering.

    Raises:
        FileNotFoundError: if the image cannot be loaded.
    """
    def _median_quad_height(entries):
        """Median quad height of the OCR entries (14.0 when there are none)."""
        heights = []
        for entry in entries:
            box = quad_bbox(entry[0])
            heights.append(max(1, box[3] - box[1]))
        return float(np.median(heights)) if heights else 14.0

    print(f"\n{'='*60}")
    print(f"📖 Processing: {os.path.basename(image_path)}")
    print(f"{'='*60}")

    # ── Step 1: Load image ───────────────────────────────────
    image_bgr = cv2.imread(image_path)
    if image_bgr is None:
        raise FileNotFoundError(f"Cannot load image: {image_path}")

    ih, iw = image_bgr.shape[:2]
    print(f" Image size: {iw}×{ih}px")

    # ── Step 2: OCR ──────────────────────────────────────────
    detector = ImprovedMacVisionDetector(source_lang=source_lang)
    raw_ocr = detector.detect(image_bgr)
    print(f" Raw OCR detections: {len(raw_ocr)}")

    # ── Step 3: Filter ───────────────────────────────────────
    filtered_ocr = []
    for quad, text, conf in raw_ocr:
        text_clean = fix_common_ocr_errors(text)
        if not is_meaningful_text(text_clean, source_lang):
            continue
        if not is_valid_language(text_clean, source_lang):
            continue
        filtered_ocr.append((quad, text_clean, conf))

    filtered_ocr, _ = validate_and_split_oversized_quads(image_bgr, filtered_ocr)

    if not filtered_ocr:
        print(" ⚠️ No valid OCR results after filtering.")
        return {}

    print(f" Filtered OCR detections: {len(filtered_ocr)}")

    # Build indexed OCR list for downstream functions
    ocr = [(item[0], item[1], item[2]) for item in filtered_ocr]
    med_h = _median_quad_height(ocr)

    # ── Step 4: Pre-split wide quads ─────────────────────────
    ocr, _ = apply_column_gap_splits(image_bgr, ocr, med_h)

    # Recompute med_h after potential splits
    med_h = _median_quad_height(ocr)

    # ── Step 5: Propose regions (contour-aware) ──────────────
    # FIX: pass image_bgr so contour membership gates merging
    region_lines, region_boxes, region_quads, region_indices = \
        propose_text_regions_from_ocr(ocr, image_bgr.shape, image_bgr=image_bgr)

    print(f" Proposed regions: {len(region_boxes)}")

    # ── Step 6: Contour-aware pre-split ──────────────────────
    # FIX: split any region that spans multiple speech-bubble contours
    # BEFORE any merging pass runs — primary fix for Box-8 / Box-6
    region_lines, region_boxes, region_quads, region_indices = \
        apply_contour_split_to_all_boxes(
            region_boxes, region_indices, region_quads,
            region_lines, ocr, image_bgr)

    print(f" Regions after contour split: {len(region_boxes)}")

    # ── Step 7: Auto-fix bubble detection ────────────────────
    region_lines, region_boxes, region_quads, region_indices = \
        auto_fix_bubble_detection(
            region_boxes, region_indices, region_quads,
            region_lines, ocr, image_bgr)

    print(f" Regions after auto-fix: {len(region_boxes)}")

    # ── Step 8: Reconcile region + bubble groups ─────────────
    # For this pipeline we use region groups as both inputs since
    # we have already applied contour splitting above.
    # bubble_* mirrors region_* here; reconcile deduplicates overlaps.
    out_lines, out_boxes, out_quads, out_indices = \
        reconcile_region_and_bubble_groups(
            region_lines, region_boxes, region_quads, region_indices,
            region_lines, region_boxes, region_quads, region_indices,
            ocr)

    print(f" Boxes after reconciliation: {len(out_boxes)}")

    # ── Step 9: Remove nested / duplicate boxes ───────────────
    out_lines, out_boxes, out_quads, out_indices = \
        remove_nested_boxes(out_boxes, out_indices, out_quads, out_lines,
                            overlap_threshold=0.50)

    print(f" Boxes after dedup: {len(out_boxes)}")

    # ── Step 10: Enforce max box size ─────────────────────────
    out_lines, out_boxes, out_quads, out_indices = \
        enforce_max_box_size(out_boxes, out_indices, out_quads, out_lines,
                             ocr, image_shape=image_bgr.shape)

    print(f" Boxes after size enforcement: {len(out_boxes)}")

    # ── Step 11 + 12: Classify, correct, score ────────────────
    translator = GoogleTranslator(source=source_lang, target=target_lang)

    results: Dict[str, Any] = {}

    # Reading order: by box centre, top-to-bottom then left-to-right.
    bid_order = sorted(
        out_boxes.keys(),
        key=lambda b: (
            (out_boxes[b][1] + out_boxes[b][3]) / 2.0,
            (out_boxes[b][0] + out_boxes[b][2]) / 2.0,
        )
    )

    for order_idx, bid in enumerate(bid_order, start=1):
        box = out_boxes[bid]
        indices = out_indices[bid]
        lines = out_lines[bid]

        raw_text = normalize_text(" ".join(lines))
        if not raw_text:
            continue

        # Classify
        region_type = classify_region_type(image_bgr, box, lines)

        # Correct OCR
        corrected_text, correction_gain = correct_region_text(raw_text, region_type)

        # Confidence
        conf = compute_region_confidence(
            raw_text, corrected_text, box, region_type, image_bgr)
        conf = maybe_conf_floor_for_protected(corrected_text, conf)

        # Flags
        flags = build_region_flags(raw_text, corrected_text, region_type, conf)

        # Bubble groups (lines as rendered in the bubble)
        bubble_groups = build_text_from_layout(indices, ocr)

        # ── Step 13: Translate ────────────────────────────────
        translated = ""
        translation_input = corrected_text

        # Sound effects are left untranslated.
        if region_type != "sfx" and is_meaningful_text(corrected_text, source_lang):
            try:
                raw_translation = translator.translate(translation_input)
                translated = postprocess_translation_general(raw_translation or "")
            except Exception as e:
                # Best effort: fall back to the corrected source text.
                print(f" ⚠️ Translation failed for BOX#{bid}: {e}")
                translated = corrected_text

        # Determine OCR source label
        ocr_source = "vision-reread" if correction_gain > 0.05 else "vision-base"

        # Add BUBBLE / SEGMENTED flags
        if bubble_groups and len(bubble_groups) > 1:
            for flag in ("BUBBLE", "SEGMENTED"):
                if flag not in flags:
                    flags.append(flag)

        results[str(bid)] = {
            "order": order_idx,
            "region_type": region_type,
            "confidence": round(conf, 4),
            "ocr_source": ocr_source,
            "raw_ocr": raw_text,
            "corrected_ocr": corrected_text,
            "translation_input": translation_input,
            "translated": translated,
            "flags": flags,
            "bubble_groups": bubble_groups,
            "box": xyxy_to_xywh(box),
            "lines": bubble_groups,
        }

    print(f"\n ✅ Processed {len(results)} text region(s).")

    # ── Step 14: Write outputs ────────────────────────────────
    if output_json:
        _write_json_output(results, output_json)

    if output_txt:
        _write_txt_output(results, output_txt)

    return results
|
||
|
||
|
||
# ============================================================
# OUTPUT WRITERS
# ============================================================
|
||
def _write_json_output(results: Dict[str, Any], path: str) -> None:
|
||
"""Write full results dict to a JSON file."""
|
||
try:
|
||
with open(path, "w", encoding="utf-8") as f:
|
||
json.dump(results, f, ensure_ascii=False, indent=2)
|
||
print(f" 💾 JSON saved → {path}")
|
||
except Exception as e:
|
||
print(f" ⚠️ Failed to write JSON: {e}")
|
||
|
||
|
||
def _write_txt_output(results: Dict[str, Any], path: str) -> None:
|
||
"""
|
||
Write a human-readable columnar summary to a .txt file.
|
||
|
||
Format:
|
||
BUBBLE|ORDER|TYPE|CONF|OCR_SOURCE|ORIGINAL|CORRECTED|BUBBLE_GROUPS|TRANSLATED|FLAGS
|
||
"""
|
||
sep = "─" * 120
|
||
lines = [
|
||
"BUBBLE|ORDER|TYPE|CONF|OCR_SOURCE|ORIGINAL|CORRECTED|BUBBLE_GROUPS|TRANSLATED|FLAGS",
|
||
sep,
|
||
]
|
||
|
||
for bid, data in sorted(results.items(), key=lambda kv: kv[1]["order"]):
|
||
bubble_groups_str = " || ".join(data.get("bubble_groups", []))
|
||
flags_str = ",".join(data.get("flags", []))
|
||
row = (
|
||
f"#{bid}"
|
||
f"|{data['order']}"
|
||
f"|{data['region_type']}"
|
||
f"|{data['confidence']:.2f}"
|
||
f"|{data['ocr_source']}"
|
||
f"|{data['raw_ocr']}"
|
||
f"|{data['corrected_ocr']}"
|
||
f"|{bubble_groups_str}"
|
||
f"|{data['translated']}"
|
||
f"|{flags_str}"
|
||
)
|
||
lines.append(row)
|
||
|
||
try:
|
||
with open(path, "w", encoding="utf-8") as f:
|
||
f.write("\n".join(lines) + "\n")
|
||
print(f" 📄 TXT saved → {path}")
|
||
except Exception as e:
|
||
print(f" ⚠️ Failed to write TXT: {e}")
|
||
|
||
|
||
# ============================================================
# DEBUG VISUALISER
# ============================================================
|
||
def draw_debug_clusters(image_bgr: np.ndarray,
                        out_boxes: Dict[int, tuple],
                        out_lines: Dict[int, list],
                        out_indices: Dict[int, list],
                        ocr: list,
                        save_path: Optional[str] = None) -> np.ndarray:
    """
    Draw all detected boxes with their IDs and a short text preview
    onto a copy of the image for visual debugging.

    The role used for colouring comes from ``region_text_role_hint``
    applied to the box's joined, normalised lines.

    Color coding:
      Green  = dialogue
      Orange = narration
      Cyan   = reaction
      Red    = sfx
      Grey   = unknown (and any role not in the colour map)

    Args:
        image_bgr: source image; it is copied, never drawn on directly.
        out_boxes: dict mapping box id -> (x1, y1, x2, y2).
        out_lines: dict mapping box id -> text lines.
        out_indices: dict mapping box id -> OCR indices whose quads are
            outlined in a lighter shade of the box colour.
        ocr: list of OCR entries; ``ocr[i][0]`` is the quad of entry ``i``.
        save_path: if given, the annotated image is also written there.

    Returns:
        The annotated copy of the image.
    """
    vis = image_bgr.copy()
    ih = vis.shape[0]

    # Colours are in OpenCV channel order (B, G, R).
    COLOR_MAP = {
        "dialogue": (0, 200, 0),
        "narration": (0, 165, 255),
        "reaction": (255, 200, 0),
        "sfx": (0, 0, 220),
        "unknown": (120, 120, 120),
    }

    font = cv2.FONT_HERSHEY_SIMPLEX
    font_scale = 0.38
    thickness = 1

    # Same reading order as the pipeline: by box centre, rows then columns.
    bid_order = sorted(
        out_boxes.keys(),
        key=lambda b: (
            (out_boxes[b][1] + out_boxes[b][3]) / 2.0,
            (out_boxes[b][0] + out_boxes[b][2]) / 2.0,
        )
    )

    for bid in bid_order:
        box = out_boxes[bid]
        lines = out_lines.get(bid, [])
        text = normalize_text(" ".join(lines))

        rtype = region_text_role_hint(text)
        color = COLOR_MAP.get(rtype, (120, 120, 120))

        x1, y1, x2, y2 = box
        cv2.rectangle(vis, (x1, y1), (x2, y2), color, 2)

        label = f"BOX#{bid} [{rtype}]"
        preview = (text[:40] + "...") if len(text) > 40 else text

        # label background
        (lw, lh), _ = cv2.getTextSize(label, font, font_scale, thickness)
        cv2.rectangle(vis,
                      (x1, max(0, y1 - lh - 6)),
                      (x1 + lw + 4, y1),
                      color, -1)
        cv2.putText(vis, label,
                    (x1 + 2, max(lh, y1 - 3)),
                    font, font_scale, (255, 255, 255), thickness,
                    cv2.LINE_AA)

        # preview text below label
        cv2.putText(vis, preview,
                    (x1 + 2, min(ih - 5, y1 + lh + 6)),
                    font, font_scale * 0.85, color, thickness,
                    cv2.LINE_AA)

        # draw individual OCR quad outlines in lighter shade
        lighter = tuple(min(255, c + 80) for c in color)
        for idx in out_indices.get(bid, []):
            q = ocr[idx][0]
            pts = np.array(q, dtype=np.int32).reshape((-1, 1, 2))
            cv2.polylines(vis, [pts], True, lighter, 1)

    if save_path:
        cv2.imwrite(save_path, vis)
        print(f" 🖼️ Debug image saved → {save_path}")

    return vis
|
||
|
||
|
||
# ============================================================
# CLI ENTRY POINT
# ============================================================
|
||
def main():
    """Command-line entry point for the manga OCR + translation pipeline.

    Parses the CLI arguments, passes the image path, language codes and the
    JSON/TXT output paths to ``process_manga_page``, then renders a debug
    image through ``draw_debug_clusters`` and prints a per-box summary.

    Output paths not supplied on the command line are derived from the
    image path with its extension removed: ``<base>_bubbles.json``,
    ``<base>_output.txt`` and ``<base>_debug_clusters.png``.

    Returns early (after printing a notice) when ``process_manga_page``
    yields a falsy result.
    """
    import argparse

    cli = argparse.ArgumentParser(
        description="Manga page OCR + translation pipeline (macOS Vision)")
    cli.add_argument("image", help="Path to manga page image")

    # Optional flags, registered in the order they appear in --help:
    # (long flag, short flag, default value, help text).
    optional_flags = (
        ("--source", "-s", "en", "Source language code (default: en)"),
        ("--target", "-t", "ca", "Target language code (default: ca)"),
        ("--json", "-j", None, "Output JSON file path"),
        ("--txt", "-o", None, "Output TXT file path"),
        ("--debug", "-d", None, "Save debug visualisation to this path"),
    )
    for long_flag, short_flag, fallback, help_text in optional_flags:
        cli.add_argument(long_flag, short_flag, default=fallback,
                         help=help_text)

    opts = cli.parse_args()

    # Any output path left unset falls back to a name built from the image
    # path minus its extension.
    stem = os.path.splitext(opts.image)[0]
    json_out = opts.json if opts.json else f"{stem}_bubbles.json"
    txt_out = opts.txt if opts.txt else f"{stem}_output.txt"
    debug_out = opts.debug if opts.debug else f"{stem}_debug_clusters.png"

    results = process_manga_page(
        image_path=opts.image,
        source_lang=opts.source,
        target_lang=opts.target,
        output_json=json_out,
        output_txt=txt_out,
    )

    if not results:
        print("\n❌ No results produced.")
        return

    # Debug visualisation: the per-box structures the visualiser expects are
    # rebuilt here from the results mapping.
    page_bgr = cv2.imread(opts.image)
    if page_bgr is not None:
        boxes_by_id: Dict[int, tuple] = {}
        lines_by_id: Dict[int, list] = {}
        quad_ids_by_id: Dict[int, list] = {}

        for key, entry in results.items():
            box_id = int(key)
            rect = entry["box"]
            left, top = rect["x"], rect["y"]
            # Stored box is x/y/w/h; the visualiser takes corner coordinates.
            boxes_by_id[box_id] = (
                left,
                top,
                left + rect["w"],
                top + rect["h"],
            )
            lines_by_id[box_id] = entry.get("lines", [])
            # OCR indices are not part of the results, so no quads are drawn.
            quad_ids_by_id[box_id] = []

        draw_debug_clusters(
            page_bgr,
            boxes_by_id,
            lines_by_id,
            quad_ids_by_id,
            ocr=[],  # raw OCR quads are not available at this stage
            save_path=debug_out,
        )

    # Console summary, one row per box, sorted by each entry's "order" value.
    rule = "=" * 60
    print(f"\n{rule}")
    print(f"📊 SUMMARY ({len(results)} boxes)")
    print(rule)

    ordered_entries = sorted(results.items(),
                             key=lambda pair: pair[1]["order"])
    for key, entry in ordered_entries:
        row = (
            f" #{key:>3} [{entry['region_type']:<9}] "
            f"conf={entry['confidence']:.2f} "
            f"\"{entry['corrected_ocr'][:55]}\""
        )
        print(row)

    print(f"{rule}\n")
|
||
|
||
|
||
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()