Check new

This commit is contained in:
Guillem Hernandez Sola
2026-04-23 15:46:14 +02:00
parent 037dadd920
commit 3ca01dae8c
2 changed files with 362 additions and 102 deletions

View File

@@ -1661,18 +1661,25 @@ def merge_boxes_by_proximity_and_overlap(bubble_boxes, bubble_indices, bubble_qu
return new_bubbles, new_boxes, new_quads, new_indices return new_bubbles, new_boxes, new_quads, new_indices
def _majority_contour_id(indices: list, quad_to_bubble: Dict[int, int]) -> int:
"""
FIX B helper: Returns the most common contour ID among all quads
in a box. Falls back to -1 only if truly no quad is inside any contour.
"""
from collections import Counter
ids = [quad_to_bubble.get(i, -1) for i in indices]
valid = [cid for cid in ids if cid != -1]
if not valid:
return -1
return Counter(valid).most_common(1)[0][0]
def merge_continuation_boxes(bubble_boxes, bubble_indices, bubble_quads, def merge_continuation_boxes(bubble_boxes, bubble_indices, bubble_quads,
bubbles, ocr, image_bgr): bubbles, ocr, image_bgr):
""" """
FIX: Merges boxes that are: FIX B: Uses majority contour vote instead of idx[0] only.
1. Inside the same speech-bubble contour Also relaxed vert_gap threshold from med_h*2.5 → med_h*3.5
2. Vertically adjacent (gap ≤ 2 × med_h) to catch boxes like 002/box9+10 that have a slightly larger gap.
3. Both classified as dialogue/reaction/narration
(never merges sfx into dialogue)
This fixes split detections like Box7+Box9 in 001 and
Box9+Box10 in 002 where one bubble was detected as two
separate regions due to an intervening SFX quad.
""" """
all_h = [max(1, quad_bbox(ocr[i][0])[3] - quad_bbox(ocr[i][0])[1]) all_h = [max(1, quad_bbox(ocr[i][0])[3] - quad_bbox(ocr[i][0])[1])
for i in range(len(ocr))] for i in range(len(ocr))]
@@ -1695,7 +1702,6 @@ def merge_continuation_boxes(bubble_boxes, bubble_indices, bubble_quads,
text_i = normalize_text(" ".join(bubbles.get(bid_i, []))) text_i = normalize_text(" ".join(bubbles.get(bid_i, [])))
role_i = region_text_role_hint(text_i) role_i = region_text_role_hint(text_i)
# Never merge sfx boxes into anything
if role_i == "sfx": if role_i == "sfx":
continue continue
@@ -1711,32 +1717,31 @@ def merge_continuation_boxes(bubble_boxes, bubble_indices, bubble_quads,
if role_j == "sfx": if role_j == "sfx":
continue continue
# Must share the same speech-bubble contour
idx_i = bubble_indices[bid_i] idx_i = bubble_indices[bid_i]
idx_j = bubble_indices[bid_j] idx_j = bubble_indices[bid_j]
if not idx_i or not idx_j: if not idx_i or not idx_j:
continue continue
cid_i = quad_to_bubble.get(idx_i[0], -1) # FIX B: majority vote instead of idx[0]
cid_j = quad_to_bubble.get(idx_j[0], -1) cid_i = _majority_contour_id(idx_i, quad_to_bubble)
cid_j = _majority_contour_id(idx_j, quad_to_bubble)
if cid_i == -1 or cid_j == -1 or cid_i != cid_j: if cid_i == -1 or cid_j == -1 or cid_i != cid_j:
continue continue
# Must be vertically adjacent # FIX B: relaxed from med_h*2.5 → med_h*3.5
vert_gap = max(0, max(box_i[1], box_j[1]) - min(box_i[3], box_j[3])) vert_gap = max(0, max(box_i[1], box_j[1]) - min(box_i[3], box_j[3]))
if vert_gap > med_h * 2.5: if vert_gap > med_h * 3.5:
continue continue
# Must have horizontal overlap
h_overlap = max(0, min(box_i[2], box_j[2]) - max(box_i[0], box_j[0])) h_overlap = max(0, min(box_i[2], box_j[2]) - max(box_i[0], box_j[0]))
min_w = min(xyxy_width(box_i), xyxy_width(box_j)) min_w = min(xyxy_width(box_i), xyxy_width(box_j))
if h_overlap / max(1, min_w) < 0.25: if h_overlap / max(1, min_w) < 0.20: # FIX B: relaxed from 0.25 → 0.20
continue continue
merge_pairs.append((bid_i, bid_j)) merge_pairs.append((bid_i, bid_j))
visited.add(bid_i) visited.add(bid_i)
visited.add(bid_j) visited.add(bid_j)
break # each box merges with at most one partner break
if not merge_pairs: if not merge_pairs:
return bubbles, bubble_boxes, bubble_quads, bubble_indices return bubbles, bubble_boxes, bubble_quads, bubble_indices
@@ -1770,15 +1775,116 @@ def merge_continuation_boxes(bubble_boxes, bubble_indices, bubble_quads,
return new_bubbles, new_boxes, new_quads, new_indices return new_bubbles, new_boxes, new_quads, new_indices
def merge_same_column_dialogue_boxes(bubble_boxes, bubble_indices, bubble_quads,
                                     bubbles, ocr, image_bgr):
    """
    FIX D: Merges dialogue boxes that share the same horizontal column
    (strong x-overlap) and are vertically close, even when they have
    different contour IDs.

    This catches 004/box2+6 where the speech bubble body and its
    continuation are detected as separate contours.

    Criteria:
    - Both boxes are dialogue (not sfx)
    - Horizontal overlap ratio ≥ 0.50 (same column)
    - Vertical gap ≤ med_h * 4.0
    - Combined height ≤ image_height * 0.35 (not a full-page merge)

    Boxes appear to be (x1, y1, x2, y2) — indices 0/2 are used as the
    horizontal extent and 1/3 as the vertical extent below.
    Returns renumbered (bubbles, boxes, quads, indices) dicts with
    consecutive IDs starting at 1; input dicts are not mutated.
    """
    # iw is currently unused; only the image height gates the merge
    ih, iw = image_bgr.shape[:2]
    # Median OCR quad height — the scale unit for the vertical-gap test
    all_h = [max(1, quad_bbox(ocr[i][0])[3] - quad_bbox(ocr[i][0])[1])
             for i in range(len(ocr))]
    med_h = float(np.median(all_h)) if all_h else 14.0

    # Scan boxes top-to-bottom by vertical center
    bids = sorted(bubble_boxes.keys(),
                  key=lambda b: (bubble_boxes[b][1] + bubble_boxes[b][3]) / 2.0)

    merge_pairs = []
    visited = set()
    for i in range(len(bids)):
        bid_i = bids[i]
        if bid_i in visited:
            continue
        box_i = bubble_boxes[bid_i]
        text_i = normalize_text(" ".join(bubbles.get(bid_i, [])))
        # Never merge sfx boxes into dialogue
        if region_text_role_hint(text_i) == "sfx":
            continue
        for j in range(i + 1, len(bids)):
            bid_j = bids[j]
            if bid_j in visited:
                continue
            box_j = bubble_boxes[bid_j]
            text_j = normalize_text(" ".join(bubbles.get(bid_j, [])))
            if region_text_role_hint(text_j) == "sfx":
                continue
            # Vertical gap check
            vert_gap = max(0, max(box_i[1], box_j[1]) - min(box_i[3], box_j[3]))
            if vert_gap > med_h * 4.0:
                continue
            # Horizontal overlap check (ratio vs the narrower box)
            h_ov = max(0, min(box_i[2], box_j[2]) - max(box_i[0], box_j[0]))
            min_w = min(xyxy_width(box_i), xyxy_width(box_j))
            if h_ov / max(1, min_w) < 0.50:
                continue
            # Combined height sanity check
            merged_h = (max(box_i[3], box_j[3]) - min(box_i[1], box_j[1]))
            if merged_h > ih * 0.35:
                continue
            merge_pairs.append((bid_i, bid_j))
            visited.add(bid_i)
            visited.add(bid_j)
            # Each box merges with at most one partner
            break
    if not merge_pairs:
        return bubbles, bubble_boxes, bubble_quads, bubble_indices

    print(f"\n📐 Same-column dialogue merge: {len(merge_pairs)} pair(s):")

    # Rebuild all four dicts with fresh consecutive IDs: merged pairs
    # first, then every untouched box in top-to-bottom order.
    processed = set()
    new_bubbles, new_boxes, new_quads, new_indices = {}, {}, {}, {}
    next_bid = 1
    for bid_a, bid_b in merge_pairs:
        print(f" ✓ Merging BOX#{bid_a} + BOX#{bid_b}")
        # Union of quad indices, re-sorted into reading order (y, then x)
        all_idx = sorted(
            set(bubble_indices[bid_a]) | set(bubble_indices[bid_b]),
            key=lambda k: (quad_bbox(ocr[k][0])[1], quad_bbox(ocr[k][0])[0])
        )
        new_bubbles[next_bid] = build_lines_from_indices(all_idx, ocr)
        new_boxes[next_bid] = boxes_union_xyxy([quad_bbox(ocr[i][0]) for i in all_idx])
        new_quads[next_bid] = [ocr[i][0] for i in all_idx]
        new_indices[next_bid] = all_idx
        processed.update({bid_a, bid_b})
        next_bid += 1
    for bid in bids:
        if bid not in processed:
            new_bubbles[next_bid] = bubbles[bid]
            new_boxes[next_bid] = bubble_boxes[bid]
            new_quads[next_bid] = bubble_quads[bid]
            new_indices[next_bid] = bubble_indices[bid]
            next_bid += 1
    return new_bubbles, new_boxes, new_quads, new_indices
def auto_fix_bubble_detection(bubble_boxes, bubble_indices, bubble_quads, def auto_fix_bubble_detection(bubble_boxes, bubble_indices, bubble_quads,
bubbles, ocr, image_bgr): bubbles, ocr, image_bgr):
""" """
Full fix pipeline: Full fix pipeline:
1. Split boxes that span multiple speech bubbles. 1. Split boxes spanning multiple bubbles.
2. Merge fragments detected inside the same contour. 2. Merge fragments inside the same contour.
3. Merge continuation boxes split across same bubble (NEW). 3. Merge continuation boxes (same bubble, split detection).
4. Proximity+overlap merge — pass 1. 4. FIX D: Merge same-column dialogue boxes.
5. Proximity+overlap merge — pass 2 (chain resolution). 5. Proximity+overlap merge — pass 1.
6. Proximity+overlap merge — pass 2.
""" """
print("\n🔍 Running automatic bubble detection fixes...") print("\n🔍 Running automatic bubble detection fixes...")
all_h = [max(1, quad_bbox(ocr[i][0])[3] - quad_bbox(ocr[i][0])[1]) all_h = [max(1, quad_bbox(ocr[i][0])[3] - quad_bbox(ocr[i][0])[1])
@@ -1793,11 +1899,15 @@ def auto_fix_bubble_detection(bubble_boxes, bubble_indices, bubble_quads,
detect_and_merge_fragmented_bubbles( detect_and_merge_fragmented_bubbles(
bubble_boxes, bubble_indices, bubble_quads, bubbles, ocr, image_bgr) bubble_boxes, bubble_indices, bubble_quads, bubbles, ocr, image_bgr)
# FIX: merge continuation boxes (same bubble, split detection)
bubbles, bubble_boxes, bubble_quads, bubble_indices = \ bubbles, bubble_boxes, bubble_quads, bubble_indices = \
merge_continuation_boxes( merge_continuation_boxes(
bubble_boxes, bubble_indices, bubble_quads, bubbles, ocr, image_bgr) bubble_boxes, bubble_indices, bubble_quads, bubbles, ocr, image_bgr)
# FIX D: same-column dialogue merge
bubbles, bubble_boxes, bubble_quads, bubble_indices = \
merge_same_column_dialogue_boxes(
bubble_boxes, bubble_indices, bubble_quads, bubbles, ocr, image_bgr)
# Pass 1 # Pass 1
bubbles, bubble_boxes, bubble_quads, bubble_indices = \ bubbles, bubble_boxes, bubble_quads, bubble_indices = \
merge_boxes_by_proximity_and_overlap( merge_boxes_by_proximity_and_overlap(
@@ -1991,14 +2101,16 @@ def split_group_by_region_type(indices: list, ocr: list) -> List[List[int]]:
def split_group_by_spatial_gap(indices: list, ocr: list, def split_group_by_spatial_gap(indices: list, ocr: list,
gap_factor: float = 1.8) -> List[List[int]]: gap_factor: float = 1.2) -> List[List[int]]:
""" """
Splits a group of OCR indices where a large spatial gap exists FIX C: Reduced gap_factor from 1.8 → 1.2 and added adaptive
between clusters — catches Box-22/007 where two dialogue bubbles minimum gap based on the actual inter-quad spacing distribution.
sit side-by-side with a visible horizontal gap.
Works in both axes: tries horizontal split first, then vertical. This catches tight splits like:
Returns original list if no significant gap is found. 007/box12: "YOU'RE A BIG MEAN JERK." vs "I HATE YOU, SY-ON BOY."
007/box15: three separate italic caption lines
007/box21: two side-by-side dialogue bubbles
008/box13: "AND I'M TOO CUTE..." vs "I WAS NOT!"
""" """
if len(indices) <= 1: if len(indices) <= 1:
return [indices] return [indices]
@@ -2006,16 +2118,47 @@ def split_group_by_spatial_gap(indices: list, ocr: list,
all_h = [max(1, quad_bbox(ocr[i][0])[3] - quad_bbox(ocr[i][0])[1]) all_h = [max(1, quad_bbox(ocr[i][0])[3] - quad_bbox(ocr[i][0])[1])
for i in indices] for i in indices]
med_h = float(np.median(all_h)) if all_h else 14.0 med_h = float(np.median(all_h)) if all_h else 14.0
gap_threshold = med_h * gap_factor
# ── Try horizontal split (left / right columns) ─────────── # ── Adaptive gap: use median inter-quad gap as baseline ───
sorted_by_y = sorted(indices, key=lambda i: quad_bbox(ocr[i][0])[1])
inter_gaps_y = []
for k in range(len(sorted_by_y) - 1):
b_curr = quad_bbox(ocr[sorted_by_y[k]][0])
b_next = quad_bbox(ocr[sorted_by_y[k+1]][0])
gap = b_next[1] - b_curr[3]
if gap > 0:
inter_gaps_y.append(gap)
# Adaptive threshold: max of (med_h * gap_factor) and
# (median_inter_gap * 2.5) — whichever is smaller wins
if inter_gaps_y:
median_inter = float(np.median(inter_gaps_y))
gap_threshold_y = min(med_h * gap_factor,
max(med_h * 0.8, median_inter * 2.5))
else:
gap_threshold_y = med_h * gap_factor
# ── Try horizontal split first ────────────────────────────
sorted_by_x = sorted(indices, key=lambda i: quad_bbox(ocr[i][0])[0]) sorted_by_x = sorted(indices, key=lambda i: quad_bbox(ocr[i][0])[0])
boxes_x = [quad_bbox(ocr[i][0]) for i in sorted_by_x] boxes_x = [quad_bbox(ocr[i][0]) for i in sorted_by_x]
inter_gaps_x = []
for k in range(len(sorted_by_x) - 1):
gap = boxes_x[k+1][0] - boxes_x[k][2]
if gap > 0:
inter_gaps_x.append(gap)
if inter_gaps_x:
median_inter_x = float(np.median(inter_gaps_x))
gap_threshold_x = min(med_h * gap_factor,
max(med_h * 0.8, median_inter_x * 2.5))
else:
gap_threshold_x = med_h * gap_factor
best_h_gap, best_h_split = 0.0, None best_h_gap, best_h_split = 0.0, None
for k in range(len(sorted_by_x) - 1): for k in range(len(sorted_by_x) - 1):
gap = boxes_x[k + 1][0] - boxes_x[k][2] gap = boxes_x[k + 1][0] - boxes_x[k][2]
if gap > gap_threshold and gap > best_h_gap: if gap > gap_threshold_x and gap > best_h_gap:
best_h_gap = gap best_h_gap = gap
best_h_split = k best_h_split = k
@@ -2023,16 +2166,17 @@ def split_group_by_spatial_gap(indices: list, ocr: list,
left = [sorted_by_x[i] for i in range(best_h_split + 1)] left = [sorted_by_x[i] for i in range(best_h_split + 1)]
right = [sorted_by_x[i] for i in range(best_h_split + 1, len(sorted_by_x))] right = [sorted_by_x[i] for i in range(best_h_split + 1, len(sorted_by_x))]
if left and right: if left and right:
return [left, right] # Recurse to catch further splits in each half
return (split_group_by_spatial_gap(left, ocr, gap_factor) +
split_group_by_spatial_gap(right, ocr, gap_factor))
# ── Try vertical split (top / bottom rows) ──────────────── # ── Try vertical split ────────────────────────────────────
sorted_by_y = sorted(indices, key=lambda i: quad_bbox(ocr[i][0])[1])
boxes_y = [quad_bbox(ocr[i][0]) for i in sorted_by_y] boxes_y = [quad_bbox(ocr[i][0]) for i in sorted_by_y]
best_v_gap, best_v_split = 0.0, None best_v_gap, best_v_split = 0.0, None
for k in range(len(sorted_by_y) - 1): for k in range(len(sorted_by_y) - 1):
gap = boxes_y[k + 1][1] - boxes_y[k][3] gap = boxes_y[k + 1][1] - boxes_y[k][3]
if gap > gap_threshold and gap > best_v_gap: if gap > gap_threshold_y and gap > best_v_gap:
best_v_gap = gap best_v_gap = gap
best_v_split = k best_v_split = k
@@ -2040,11 +2184,12 @@ def split_group_by_spatial_gap(indices: list, ocr: list,
top = [sorted_by_y[i] for i in range(best_v_split + 1)] top = [sorted_by_y[i] for i in range(best_v_split + 1)]
bottom = [sorted_by_y[i] for i in range(best_v_split + 1, len(sorted_by_y))] bottom = [sorted_by_y[i] for i in range(best_v_split + 1, len(sorted_by_y))]
if top and bottom: if top and bottom:
return [top, bottom] # Recurse to catch further splits in each half
return (split_group_by_spatial_gap(top, ocr, gap_factor) +
split_group_by_spatial_gap(bottom, ocr, gap_factor))
return [indices] return [indices]
def apply_contour_split_to_all_boxes(bubble_boxes, bubble_indices, bubble_quads, def apply_contour_split_to_all_boxes(bubble_boxes, bubble_indices, bubble_quads,
bubbles, ocr, image_bgr): bubbles, ocr, image_bgr):
""" """
@@ -2233,10 +2378,17 @@ class ImprovedMacVisionDetector:
Strategy: use the variant with the most detections as base, Strategy: use the variant with the most detections as base,
then fill gaps from other variants using IoU matching. then fill gaps from other variants using IoU matching.
""" """
"""
FIX E: Use self.langs[0] locale for is_meaningful_text()
instead of hardcoded "en", so short words like "BUT" and "I"
are protected when source_lang != "en".
"""
if not all_results: if not all_results:
return [] return []
# pick base = most detections # Derive source_lang string from self.langs[0] (e.g. "en-US" → "en")
lang_code = self.langs[0].split("-")[0].lower()
base_idx = max(range(len(all_results)), key=lambda i: len(all_results[i])) base_idx = max(range(len(all_results)), key=lambda i: len(all_results[i]))
base = list(all_results[base_idx]) base = list(all_results[base_idx])
others = [r for i, r in enumerate(all_results) if i != base_idx] others = [r for i, r in enumerate(all_results) if i != base_idx]
@@ -2248,17 +2400,16 @@ class ImprovedMacVisionDetector:
for k, (quad_b, text_b, conf_b) in enumerate(base): for k, (quad_b, text_b, conf_b) in enumerate(base):
box_b = quad_bbox(quad_b) box_b = quad_bbox(quad_b)
if boxes_iou(box_o, box_b) > 0.40: if boxes_iou(box_o, box_b) > 0.40:
# keep higher-confidence reading
if conf_o > conf_b: if conf_o > conf_b:
base[k] = (quad_b, text_o, conf_o) base[k] = (quad_b, text_o, conf_o)
matched = True matched = True
break break
if not matched and is_meaningful_text(text_o, "en"): # FIX E: use lang_code not hardcoded "en"
if not matched and is_meaningful_text(text_o, lang_code):
base.append((quad_o, text_o, conf_o)) base.append((quad_o, text_o, conf_o))
return base return base
# ============================================================ # ============================================================
# BUILD LINES FROM INDICES # BUILD LINES FROM INDICES
# ============================================================ # ============================================================
@@ -2271,12 +2422,14 @@ def build_lines_from_indices(indices, ocr, reading_mode="ltr"):
return [] return []
return build_text_from_layout(indices, ocr, reading_mode=reading_mode) return build_text_from_layout(indices, ocr, reading_mode=reading_mode)
def split_indices_into_vertical_blocks(indices, ocr, gap_factor=4.0):
def split_indices_into_vertical_blocks(indices, ocr, gap_factor=2.5):
""" """
Split indices into vertically separated blocks. FIX A: Raised gap_factor from 2.5 → 4.0
A new block starts when the vertical gap between consecutive
quads (sorted top-to-bottom) exceeds gap_factor * median_height. The old value cut off trailing punctuation tokens ("...!!", "DY",
"ENEMIES.") that sit a few pixels below the main text block.
A larger gap is needed before we consider two groups to be in
separate bubbles — contour splitting handles the real separations.
""" """
if not indices: if not indices:
return [] return []
@@ -2298,7 +2451,6 @@ def split_indices_into_vertical_blocks(indices, ocr, gap_factor=2.5):
return blocks return blocks
# ============================================================ # ============================================================
# SPLIT HELPERS FOR enforce_max_box_size # SPLIT HELPERS FOR enforce_max_box_size
# ============================================================ # ============================================================

View File

@@ -6,6 +6,8 @@ Translation OCR pipeline (Batch Processing Only)
Usage: Usage:
python pipeline-translator.py /path/to/chapter/folder python pipeline-translator.py /path/to/chapter/folder
python pipeline-translator.py /path/to/chapter/folder --start 2 --end 5
python pipeline-translator.py /path/to/chapter/folder --source en --target es
""" """
import os import os
@@ -14,6 +16,7 @@ import argparse
import importlib.util import importlib.util
from pathlib import Path from pathlib import Path
# ───────────────────────────────────────────────────────────── # ─────────────────────────────────────────────────────────────
# PIPELINE CONFIGURATION # PIPELINE CONFIGURATION
# Maps to the process_manga_page() signature in manga-translator.py # Maps to the process_manga_page() signature in manga-translator.py
@@ -23,14 +26,53 @@ PIPELINE_CONFIG = dict(
target_lang = "ca", target_lang = "ca",
) )
# ───────────────────────────────────────────────────────────── # ─────────────────────────────────────────────────────────────
# DYNAMIC MODULE LOADER # DYNAMIC MODULE LOADER
# FIX: Always evicts stale sys.modules entry and deletes
# __pycache__ for manga-translator.py before loading,
# so edits are ALWAYS picked up on every run.
# ───────────────────────────────────────────────────────────── # ─────────────────────────────────────────────────────────────
def purge_bytecode_cache(filepath: str) -> None:
    """
    Delete the compiled .pyc file for the given .py path so Python
    cannot silently use a stale cached version of the module.

    Non-fatal by design: any failure (missing __pycache__, permission
    error, ...) is reported as a warning and processing continues.
    """
    # FIX: removed unused "import py_compile" — only the cache-path
    # helper is needed here.
    from importlib.util import cache_from_source
    try:
        pyc_path = cache_from_source(filepath)
        if os.path.exists(pyc_path):
            os.remove(pyc_path)
            # Only report when something was actually purged
            print(f"🗑️ Purged bytecode cache: {pyc_path}")
    except Exception as e:
        # Non-fatal — just warn and continue
        print(f"⚠️ Could not purge bytecode cache: {e}")
def load_module(name: str, filepath: str):
    """
    Dynamically load a .py file as a module.

    FIX 1: Purge the .pyc cache so edits are always reflected.
    FIX 2: Evict any previously loaded version from sys.modules
           to prevent Python reusing a stale module object across
           multiple calls (e.g. when running in a REPL or test loop).

    Raises FileNotFoundError if no import spec can be built for the
    path; exceptions raised by the module's own top-level code
    propagate from exec_module().
    """
    # FIX 1: delete stale bytecode
    purge_bytecode_cache(filepath)
    # FIX 2: evict from module registry
    if name in sys.modules:
        del sys.modules[name]
    spec = importlib.util.spec_from_file_location(name, filepath)
    if spec is None or spec.loader is None:
        raise FileNotFoundError(f"Cannot load module spec for: {filepath}")
    module = importlib.util.module_from_spec(spec)
    sys.modules[name] = module  # register before exec (handles self-refs)
    spec.loader.exec_module(module)
    return module
@@ -39,6 +81,7 @@ def load_module(name: str, filepath: str):
# HELPERS # HELPERS
# ───────────────────────────────────────────────────────────── # ─────────────────────────────────────────────────────────────
def sorted_pages(chapter_dir: Path): def sorted_pages(chapter_dir: Path):
"""Return all image files in chapter_dir sorted by filename stem."""
exts = {".jpg", ".jpeg", ".png", ".webp"} exts = {".jpg", ".jpeg", ".png", ".webp"}
pages = [ pages = [
p for p in chapter_dir.iterdir() p for p in chapter_dir.iterdir()
@@ -48,6 +91,7 @@ def sorted_pages(chapter_dir: Path):
def make_page_workdir(chapter_dir: Path, page_stem: str) -> Path:
    """Create and return translated/<page_stem>/ inside chapter_dir."""
    out_dir = chapter_dir.joinpath("translated", page_stem)
    out_dir.mkdir(parents=True, exist_ok=True)
    return out_dir
@@ -55,10 +99,9 @@ def make_page_workdir(chapter_dir: Path, page_stem: str) -> Path:
def verify_translator_api(module) -> bool: def verify_translator_api(module) -> bool:
""" """
Checks that the loaded module exposes process_manga_page() Checks that the loaded module exposes process_manga_page() and
and that it accepts all keys defined in PIPELINE_CONFIG. that it accepts all keys defined in PIPELINE_CONFIG.
Prints a warning for any missing parameter so mismatches are Prints a clear warning for any missing parameter.
caught immediately rather than silently falling back to defaults.
""" """
import inspect import inspect
@@ -82,20 +125,55 @@ def verify_translator_api(module) -> bool:
return ok return ok
def sanity_check_fixes(module_path: Path) -> None:
    """
    Scan the translator source for the signature token of each known
    fix and abort the run (exit code 1) when any token is absent —
    typically because an edit was never saved to disk.
    """
    checks = {
        "Fix A (gap_factor=4.0)": "gap_factor=4.0",
        "Fix B (_majority_contour_id)": "_majority_contour_id",
        "Fix C (median_inter adaptive gap)": "median_inter",
        "Fix D (merge_same_column_dialogue)": "merge_same_column_dialogue_boxes",
        "Fix E (lang_code from self.langs)": "lang_code = self.langs",
    }
    print("\n🔎 Sanity-checking fixes in manga-translator.py:")
    source = module_path.read_text(encoding="utf-8")
    # Collect every fix whose token is absent, printing as we go
    missing = []
    for label, token in checks.items():
        if token in source:
            status = ""
        else:
            status = "❌ MISSING"
            missing.append(label)
        print(f" {status} {label}")
    if missing:
        print(
            "\n⚠️ One or more fixes are missing from manga-translator.py.\n"
            " Save the file and re-run. Aborting.\n"
        )
        sys.exit(1)
    else:
        print(" All fixes present.\n")
# ───────────────────────────────────────────────────────────── # ─────────────────────────────────────────────────────────────
# PER-PAGE PIPELINE # PER-PAGE PIPELINE
# ───────────────────────────────────────────────────────────── # ─────────────────────────────────────────────────────────────
def process_page(page_path: Path, workdir: Path, translator_module) -> bool: def process_page(page_path: Path, workdir: Path, translator_module) -> bool:
print(f"\n{'' * 70}") print(f"\n{'' * 70}")
print(f" PAGE : {page_path.name}") print(f" PAGE : {page_path.name}")
print(f" OUT : {workdir}")
print(f"{'' * 70}") print(f"{'' * 70}")
orig_dir = os.getcwd() orig_dir = os.getcwd()
try: try:
# Run inside the page's own workdir so debug images and
# output files land there automatically.
os.chdir(workdir) os.chdir(workdir)
# Use absolute paths so output always lands in workdir
# regardless of any internal os.getcwd() calls.
output_json = str(workdir / "bubbles.json") output_json = str(workdir / "bubbles.json")
output_txt = str(workdir / "output.txt") output_txt = str(workdir / "output.txt")
debug_path = str(workdir / "debug_clusters.png") debug_path = str(workdir / "debug_clusters.png")
@@ -109,17 +187,23 @@ def process_page(page_path: Path, workdir: Path, translator_module) -> bool:
**PIPELINE_CONFIG, **PIPELINE_CONFIG,
) )
# ── Optional debug visualisation ───────────────────── # ── Debug visualisation ───────────────────────────────
if results: # FIX: process_manga_page() already writes debug_clusters.png
# internally with full OCR quad data.
# We do NOT call draw_debug_clusters() here with ocr=[]
# because that would OVERWRITE the correct debug image with
# a degraded version that has no quad outlines.
#
# If process_manga_page() did not write a debug image
# (e.g. older version), we do a minimal fallback draw.
if results and not os.path.exists(debug_path):
try: try:
import cv2 import cv2
image_bgr = cv2.imread(str(page_path.resolve())) image_bgr = cv2.imread(str(page_path.resolve()))
if image_bgr is not None: if image_bgr is not None:
# Reconstruct vis_boxes / vis_lines from results dict vis_boxes: dict = {}
vis_boxes = {} vis_lines: dict = {}
vis_lines = {} vis_indices: dict = {}
vis_indices = {}
for bid_str, data in results.items(): for bid_str, data in results.items():
bid = int(bid_str) bid = int(bid_str)
@@ -133,6 +217,7 @@ def process_page(page_path: Path, workdir: Path, translator_module) -> bool:
vis_lines[bid] = data.get("lines", []) vis_lines[bid] = data.get("lines", [])
vis_indices[bid] = [] vis_indices[bid] = []
# Fallback only — ocr=[] means no quad outlines
translator_module.draw_debug_clusters( translator_module.draw_debug_clusters(
image_bgr = image_bgr, image_bgr = image_bgr,
out_boxes = vis_boxes, out_boxes = vis_boxes,
@@ -141,14 +226,22 @@ def process_page(page_path: Path, workdir: Path, translator_module) -> bool:
ocr = [], ocr = [],
save_path = debug_path, save_path = debug_path,
) )
print(f" 🖼️ Fallback debug image written → {debug_path}")
except Exception as e: except Exception as e:
print(f" ⚠️ Debug visualisation failed (non-fatal): {e}") print(f" ⚠️ Debug visualisation failed (non-fatal): {e}")
# ── Sanity-check outputs ────────────────────────────── # ── Sanity-check output files ─────────────────────────
all_good = True
for fname in ("output.txt", "bubbles.json"): for fname in ("output.txt", "bubbles.json"):
fpath = workdir / fname fpath = workdir / fname
if not fpath.exists() or fpath.stat().st_size == 0: if not fpath.exists():
print(f" ⚠️ {fname} is missing or empty after processing.") print(f" ⚠️ {fname} was NOT created.")
all_good = False
elif fpath.stat().st_size == 0:
print(f" ⚠️ {fname} exists but is EMPTY.")
all_good = False
else:
print(f" 📄 {fname}{fpath.stat().st_size} bytes")
if not results: if not results:
print(" ⚠️ process_manga_page() returned no results.") print(" ⚠️ process_manga_page() returned no results.")
@@ -172,7 +265,14 @@ def process_page(page_path: Path, workdir: Path, translator_module) -> bool:
# ───────────────────────────────────────────────────────────── # ─────────────────────────────────────────────────────────────
def main(): def main():
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
description="Manga Translation OCR Batch Pipeline" description="Manga Translation OCR Batch Pipeline",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
python pipeline-translator.py pages-for-tests
python pipeline-translator.py pages-for-tests --start 2 --end 4
python pipeline-translator.py pages-for-tests --source en --target es
"""
) )
parser.add_argument( parser.add_argument(
"chapter_dir", "chapter_dir",
@@ -194,24 +294,27 @@ def main():
"--target", "-t", default=None, "--target", "-t", default=None,
help=f"Override target language (default: {PIPELINE_CONFIG['target_lang']})" help=f"Override target language (default: {PIPELINE_CONFIG['target_lang']})"
) )
parser.add_argument(
"--skip-sanity", action="store_true",
help="Skip the fix sanity check (not recommended)"
)
args = parser.parse_args() args = parser.parse_args()
# Allow CLI overrides of source/target without touching PIPELINE_CONFIG # ── Apply CLI language overrides ─────────────────────────
config = dict(PIPELINE_CONFIG) config = dict(PIPELINE_CONFIG)
if args.source: if args.source:
config["source_lang"] = args.source config["source_lang"] = args.source
if args.target: if args.target:
config["target_lang"] = args.target config["target_lang"] = args.target
# Patch PIPELINE_CONFIG in-place so process_page() picks up overrides
PIPELINE_CONFIG.update(config) PIPELINE_CONFIG.update(config)
# ── Resolve chapter directory ─────────────────────────────
chapter_dir = Path(args.chapter_dir).resolve() chapter_dir = Path(args.chapter_dir).resolve()
if not chapter_dir.is_dir(): if not chapter_dir.is_dir():
print(f"❌ Not a directory: {chapter_dir}") print(f"❌ Not a directory: {chapter_dir}")
sys.exit(1) sys.exit(1)
# ── Load translator module ──────────────────────────────── # ── Locate manga-translator.py ────────────────────────────
script_dir = Path(__file__).parent script_dir = Path(__file__).parent
module_path = script_dir / "manga-translator.py" module_path = script_dir / "manga-translator.py"
@@ -219,6 +322,11 @@ def main():
print(f"❌ manga-translator.py not found in {script_dir}") print(f"❌ manga-translator.py not found in {script_dir}")
sys.exit(1) sys.exit(1)
# ── Sanity-check that all fixes are present ───────────────
if not args.skip_sanity:
sanity_check_fixes(module_path)
# ── Load translator module ────────────────────────────────
print(f"📦 Loading translator from: {module_path}") print(f"📦 Loading translator from: {module_path}")
try: try:
translator = load_module("manga_translator", str(module_path)) translator = load_module("manga_translator", str(module_path))
@@ -231,13 +339,12 @@ def main():
print("❌ Aborting — fix the parameter mismatch above first.") print("❌ Aborting — fix the parameter mismatch above first.")
sys.exit(1) sys.exit(1)
# ── Discover pages ──────────────────────────────────────── # ── Discover and slice pages ──────────────────────────────
all_pages = sorted_pages(chapter_dir) all_pages = sorted_pages(chapter_dir)
if not all_pages: if not all_pages:
print(f"❌ No images found in: {chapter_dir}") print(f"❌ No image files found in: {chapter_dir}")
sys.exit(1) sys.exit(1)
# Apply --start / --end slice (1-based, inclusive)
start_idx = max(0, args.start - 1) start_idx = max(0, args.start - 1)
end_idx = args.end if args.end is not None else len(all_pages) end_idx = args.end if args.end is not None else len(all_pages)
pages = all_pages[start_idx:end_idx] pages = all_pages[start_idx:end_idx]
@@ -246,37 +353,38 @@ def main():
print(f"❌ No pages in range [{args.start}, {args.end}]") print(f"❌ No pages in range [{args.start}, {args.end}]")
sys.exit(1) sys.exit(1)
# ── Summary header ──────────────────────────────────────── print(f"\n📚 Chapter : {chapter_dir.name}")
print(f"\n{'' * 70}") print(f" Pages : {len(pages)} of {len(all_pages)} total")
print(f" 📖 Chapter : {chapter_dir.name}") print(f" Source : {PIPELINE_CONFIG['source_lang']}")
print(f" 📄 Pages : {len(pages)} " print(f" Target : {PIPELINE_CONFIG['target_lang']}")
f"(of {len(all_pages)} total, " print(f" Output : {chapter_dir / 'translated'}\n")
f"range {args.start}{end_idx})")
print(f" 🌐 Lang : {PIPELINE_CONFIG['source_lang']}"
f"{PIPELINE_CONFIG['target_lang']}")
print(f"{'' * 70}\n")
succeeded, failed = [], [] # ── Process each page ─────────────────────────────────────
results_summary = []
for i, page_path in enumerate(pages, start=1): for page_num, page_path in enumerate(pages, start=start_idx + 1):
print(f"[{i}/{len(pages)}] {page_path.name}")
workdir = make_page_workdir(chapter_dir, page_path.stem) workdir = make_page_workdir(chapter_dir, page_path.stem)
success = process_page(page_path, workdir, translator)
results_summary.append((page_num, page_path.name, success))
if process_page(page_path, workdir, translator): # ── Final summary ─────────────────────────────────────────
succeeded.append(page_path.name)
else:
failed.append(page_path.name)
# ── Final report ──────────────────────────────────────────
print(f"\n{'' * 70}") print(f"\n{'' * 70}")
print(" PIPELINE COMPLETE") print(f" BATCH COMPLETE")
print(f"{len(succeeded)} page(s) succeeded") print(f"{'' * 70}")
if failed:
print(f"{len(failed)} page(s) failed:") passed = sum(1 for _, _, ok in results_summary if ok)
for name in failed: failed = len(results_summary) - passed
print(f"{name}")
for page_num, name, ok in results_summary:
status = "" if ok else ""
print(f" {status} [{page_num:>3}] {name}")
print(f"\n Total: {passed} succeeded, {failed} failed")
print(f"{'' * 70}\n") print(f"{'' * 70}\n")
if failed:
sys.exit(1)
if __name__ == "__main__": if __name__ == "__main__":
main() main()