# manga-translator.py — OCR-and-translate pipeline for manga pages.
# Repository: manga-translator; author: Guillem Hernandez Sola.
# NOTE: web-viewer chrome from the original paste was converted to this
# comment header so the file parses as Python. The original source also
# contained typographic Unicode characters (curly quotes, ellipsis, box
# drawing) that are fragile in copy/paste — see normalize_ocr_text.
import re
import os
import json
import difflib
import cv2
import numpy as np
import easyocr
from deep_translator import GoogleTranslator
# ─────────────────────────────────────────────
# LANGUAGE CODE REFERENCE
# ─────────────────────────────────────────────
# Human-readable language names -> language codes passed to EasyOCR and
# deep-translator. NOTE(review): "ch_sim"/"ch_tra" are EasyOCR-style codes;
# GoogleTranslator uses different codes for Chinese — confirm before using
# those entries for translation.
SUPPORTED_LANGUAGES = {
"Vietnamese" : "vi",
"Japanese" : "ja",
"English" : "en",
"Spanish" : "es",
"Korean" : "ko",
"Chinese (Simplified)" : "ch_sim",
"Chinese (Traditional)": "ch_tra",
"French" : "fr",
"German" : "de",
"Italian" : "it",
"Portuguese" : "pt",
"Arabic" : "ar",
"Russian" : "ru",
"Thai" : "th",
"Catalan" : "ca",
}
# ─────────────────────────────────────────────
# DOMAIN GLOSSARY
# ─────────────────────────────────────────────
# Identity-mapped character/proper names: apply_glossary() restores the
# canonical casing/spelling after OCR and protects them from translation.
GLOSSARY = {
"ANYA": "ANYA",
"STELLA STAR": "STELLA STAR",
"MR. HENDERSON": "MR. HENDERSON",
"STARLIGHT ANYA": "STARLIGHT ANYA",
}
# Phrase-level fallback (source IT -> target CA)
# Hand-checked translations for phrases the machine translator tends to
# garble; matched exactly or fuzzily (see fuzzy_phrase_match).
PHRASE_MAP_IT_CA = {
"LA BAMBINA È ILLESA!": "LA NENA ESTÀ IL·LESA!",
"L'UOMO E LA DONNA SONO MORTI!": "L'HOME I LA DONA SÓN MORTS!",
"IL BAMBINO È FERITO GRAVEMENTE, MA È ANCORA VIVO!!": "EL NEN ESTÀ GREUMENT FERIT, PERÒ ENCARA ÉS VIU!!",
"UN CASO URGENTE...?": "UN CAS URGENT...?",
"UN CASO URGENTE,?": "UN CAS URGENT?",
}
# (regex, replacement) pairs fixing recurrent Italian OCR misreads
# (e.g. "8" for "B", split words, punctuation artifacts). Applied
# case-insensitively by italian_post_ocr_cleanup().
ITALIAN_OCR_FIXES = [
(r"\bL'LOMO\b", "L'UOMO"),
(r"\bLOMO\b", "UOMO"),
(r"\bMORT I\b", "MORTI"),
(r"\bI[L1]LESA\b", "ILLESA"),
(r"\bBAM8INA\b", "BAMBINA"),
(r"\bBAM8INO\b", "BAMBINO"),
(r",\?", "?"),
(r"\?{2,}", "?"),
(r"\!{3,}", "!!"),
]
# ─────────────────────────────────────────────
# SOUND EFFECT FILTER
# ─────────────────────────────────────────────
SOUND_EFFECT_PATTERNS = [
    r"^b+i+p+$", r"^sha+$", r"^ha+$", r"^ah+$",
    r"^oh+$", r"^ugh+$", r"^gr+$", r"^bam+$",
    r"^pow+$", r"^crash+$", r"^boom+$", r"^bang+$",
    r"^crack+$", r"^whoosh+$", r"^thud+$", r"^snap+$",
    r"^zip+$", r"^swoosh+$", r"^chirp+$", r"^tweet+$",
]
def is_sound_effect(text):
    """Return True when the token, reduced to lowercase letters only,
    matches a known onomatopoeia pattern (BIP, BOOM, CRASH, ...)."""
    letters_only = re.sub(r"[^a-z]", "", text.strip().lower())
    for pattern in SOUND_EFFECT_PATTERNS:
        if re.fullmatch(pattern, letters_only, re.IGNORECASE):
            return True
    return False
# ─────────────────────────────────────────────
# TITLE / LOGO / AUTHOR FILTER
# ─────────────────────────────────────────────
TITLE_PATTERNS = [
    r"^(mission|chapter|episode|vol\.?|volume)\s*\d+$",
    r"^(spy|family|spy.family)$",
    r"^by\s+.+$",
    r"^[a-z]{1,4}\s+[a-z]+\s+[a-z]+$",
]
def is_title_text(text):
    """Return True when the token looks like a chapter heading, series
    logo, or author credit rather than dialogue."""
    lowered = text.strip().lower()
    for pattern in TITLE_PATTERNS:
        if re.fullmatch(pattern, lowered, re.IGNORECASE):
            return True
    return False
# ─────────────────────────────────────────────
# GARBAGE TOKEN FILTER
# ─────────────────────────────────────────────
GARBAGE_PATTERNS = [
    r"^[^a-zA-Z]*$",
    r"^.{1,2}$",
    r".*\d+.*",
    r"^[A-Z]{1,4}$",
]
def is_garbage(text):
    """Return True for tokens unlikely to be dialogue: no letters at all,
    one or two characters, digit-bearing strings, or very short all-caps
    fragments."""
    stripped = text.strip()
    for pattern in GARBAGE_PATTERNS:
        if re.fullmatch(pattern, stripped):
            return True
    return False
# ─────────────────────────────────────────────
# TOKEN CLASSIFIER
# ─────────────────────────────────────────────
def classify_token(text, confidence, confidence_threshold, min_text_length, filter_sound_effects):
    """Classify an OCR token as "noise" (drop), "punct" (no letters), or
    "alpha" (dialogue candidate).

    Noise criteria: confidence below threshold, too short, digits-only,
    a lone non-letter character, a sound effect (when enabled), title/logo
    text, or generic garbage.
    """
    stripped = text.strip()
    is_noise = (
        confidence < confidence_threshold
        or len(stripped) < min_text_length
        or re.fullmatch(r"\d+", stripped) is not None
        or (len(stripped) == 1 and not stripped.isalpha())
        or (filter_sound_effects and is_sound_effect(stripped))
        or is_title_text(stripped)
        or is_garbage(stripped)
    )
    if is_noise:
        return "noise"
    if any(ch.isalpha() for ch in stripped):
        return "alpha"
    return "punct"
def should_keep_token(text, confidence, confidence_threshold, min_text_length, filter_sound_effects):
    """Return (keep, category) for an OCR token; only "noise" is dropped."""
    category = classify_token(text, confidence, confidence_threshold, min_text_length, filter_sound_effects)
    keep = category != "noise"
    return keep, category
# ─────────────────────────────────────────────
# QUAD / BBOX HELPERS
# ─────────────────────────────────────────────
def quad_bbox(quad):
    """Axis-aligned bounding box (x1, y1, x2, y2) of a 4-point quad."""
    xs, ys = zip(*((pt[0], pt[1]) for pt in quad))
    return min(xs), min(ys), max(xs), max(ys)
def quad_center(quad):
    """Center point (cx, cy) of a quad's axis-aligned bounding box."""
    x1, y1, x2, y2 = quad_bbox(quad)
    return 0.5 * (x1 + x2), 0.5 * (y1 + y2)
def quad_h(quad):
    """Height of a quad's bounding box, clamped to at least 1.0."""
    _, top, _, bottom = quad_bbox(quad)
    return max(1.0, bottom - top)
def bbox_center(b):
    """Center point (cx, cy) of an (x1, y1, x2, y2) box."""
    return (b[0] + b[2]) / 2.0, (b[1] + b[3]) / 2.0
def bbox_h(b):
    """Height of an (x1, y1, x2, y2) box, clamped to at least 1.0."""
    height = b[3] - b[1]
    return height if height > 1.0 else 1.0
def distance_pt(a, b):
    """Euclidean distance between points a and b."""
    dx = a[0] - b[0]
    dy = a[1] - b[1]
    return (dx * dx + dy * dy) ** 0.5
def quads_bbox(quads, image_shape, padding_px=10):
    """Padded union bounding box of several quads, clamped to the image.

    image_shape is (height, width[, channels]) as in numpy arrays.
    """
    img_h, img_w = image_shape[:2]
    xs = [pt[0] for quad in quads for pt in quad]
    ys = [pt[1] for quad in quads for pt in quad]
    left = max(0, min(xs) - padding_px)
    top = max(0, min(ys) - padding_px)
    right = min(img_w, max(xs) + padding_px)
    bottom = min(img_h, max(ys) + padding_px)
    return left, top, right, bottom
def bboxes_overlap_or_touch(a, b, gap_px=0):
    """True when boxes a and b overlap, touch, or lie within gap_px of each
    other on both axes."""
    ax1, ay1, ax2, ay2 = a
    bx1, by1, bx2, by2 = b
    # Per-axis separation; 0 means the intervals overlap or touch.
    horizontal_gap = max(0, max(ax1, bx1) - min(ax2, bx2))
    vertical_gap = max(0, max(ay1, by1) - min(ay2, by2))
    return horizontal_gap <= gap_px and vertical_gap <= gap_px
# ─────────────────────────────────────────────
# TEXT NORMALIZATION
# ─────────────────────────────────────────────
def normalize_ocr_text(text):
    """Uppercase and normalize an OCR string.

    Maps typographic quotes/ellipsis to ASCII, collapses whitespace,
    tightens spacing around punctuation and parentheses, and squashes
    OCR artifacts (long dot runs, stray ",?").
    """
    t = text.strip().upper()
    # BUGFIX: the typographic characters had been lost from the literals,
    # leaving replace("", '"') — which inserts the replacement between every
    # character. Use explicit escapes (U+201C/D curly double quotes,
    # U+2018/9 curly single quotes, U+2026 ellipsis) so they cannot be
    # silently dropped again.
    t = t.replace("\u201c", "\"").replace("\u201d", "\"")
    t = t.replace("\u2018", "'").replace("\u2019", "'")
    t = t.replace("\u2026", "...")
    t = re.sub(r"\s+", " ", t)              # collapse whitespace runs
    t = re.sub(r"\s+([,.;:!?])", r"\1", t)  # no space before punctuation
    t = re.sub(r"\(\s+", "(", t)            # "( X" -> "(X"
    t = re.sub(r"\s+\)", ")", t)            # "X )" -> "X)"
    t = re.sub(r"\.{4,}", "...", t)         # long dot runs -> ellipsis
    t = re.sub(r",\?", "?", t)              # OCR artifact ",?" -> "?"
    return t.strip()
def italian_post_ocr_cleanup(text):
    """Normalize OCR text, then apply the Italian-specific correction
    patterns (ITALIAN_OCR_FIXES) case-insensitively."""
    cleaned = normalize_ocr_text(text)
    for pattern, replacement in ITALIAN_OCR_FIXES:
        cleaned = re.sub(pattern, replacement, cleaned, flags=re.IGNORECASE)
    return re.sub(r"\s{2,}", " ", cleaned).strip().upper()
def fix_hyphens(lines):
    """Join OCR lines into a single string, merging hyphenated line breaks
    ("FOO-" + "BAR" -> "FOOBAR"), then normalize the result. Returns ""
    for an empty list."""
    if not lines:
        return ""
    merged = lines[0]
    for raw in lines[1:]:
        part = raw.strip()
        merged = (merged[:-1] + part) if merged.endswith("-") else (merged + " " + part)
    merged = re.sub(r" {2,}", " ", merged).strip()
    return normalize_ocr_text(merged)
def apply_glossary(text, glossary):
    """Case-insensitively replace whole-word glossary terms in *text*.

    Longer keys are applied first so multi-word entries win over their
    substrings (e.g. "STARLIGHT ANYA" before "ANYA").
    """
    result = text
    for key in sorted(glossary, key=len, reverse=True):
        result = re.sub(rf"\b{re.escape(key)}\b", glossary[key], result, flags=re.IGNORECASE)
    return result
# ─────────────────────────────────────────────
# TRANSLATION SAFETY
# ─────────────────────────────────────────────
def fuzzy_phrase_match(source_text, phrase_map, min_ratio=0.88):
    """Look up *source_text* in *phrase_map*, exactly or fuzzily.

    Returns (translation, ratio, matched_key). translation is None when no
    key reaches *min_ratio*; ratio/matched_key then describe the best
    candidate found (key may be None if the map is empty or all scores 0).
    """
    if source_text in phrase_map:
        return phrase_map[source_text], 1.0, source_text
    best_key = None
    best_ratio = 0.0
    for candidate in phrase_map:
        score = difflib.SequenceMatcher(None, source_text, candidate).ratio()
        if score > best_ratio:
            best_ratio, best_key = score, candidate
    if best_key is not None and best_ratio >= min_ratio:
        return phrase_map[best_key], best_ratio, best_key
    return None, best_ratio, best_key
def looks_suspicious_translation(src, tgt):
    """Heuristic: flag a translation as suspect when it is very short or
    contains known garbled OCR/translation fragments. *src* is accepted for
    interface compatibility but not inspected."""
    normalized = normalize_ocr_text(tgt)
    garbled_fragments = ("NEETA", "LOMO", "MORT I", "ESTA IL", "MORT I LA")
    if any(fragment in normalized for fragment in garbled_fragments):
        return True
    return len(normalized) < 3
# ─────────────────────────────────────────────
# LINE REBUILD (shared)
# ─────────────────────────────────────────────
def rebuild_bubble_lines_from_indices(indices, ocr_results):
    """Rebuild the text lines of one bubble from its token indices.

    Tokens are greedily clustered into horizontal lines by y-center (the
    tolerance scales with the median token height), each line is ordered
    left-to-right, joined, and normalized. Returns a list of line strings.
    """
    if not indices:
        return []
    token_bboxes = [quad_bbox(ocr_results[i][0]) for i in indices]
    items = []
    for i, bx in zip(indices, token_bboxes):
        xc = (bx[0] + bx[2]) / 2.0
        yc = (bx[1] + bx[3]) / 2.0
        h = max(1.0, bx[3] - bx[1])
        items.append((i, xc, yc, h))
    # Line tolerance: 60% of the median token height, never below 6 px.
    line_tol = max(6.0, float(np.median([it[3] for it in items])) * 0.6)
    items.sort(key=lambda t: t[2])  # top-to-bottom
    lines = []
    for it in items:
        i, xc, yc, h = it
        placed = False
        for ln in lines:
            # Greedy: attach to the first line whose running-mean y-center
            # is within tolerance (order-sensitive by design).
            if abs(yc - ln["yc"]) <= line_tol:
                ln["members"].append((i, xc, yc))
                ln["yc"] = np.mean([m[2] for m in ln["members"]])
                placed = True
                break
        if not placed:
            lines.append({"yc": yc, "members": [(i, xc, yc)]})
    lines.sort(key=lambda ln: ln["yc"])
    out = []
    for ln in lines:
        mem = sorted(ln["members"], key=lambda m: m[1])  # left-to-right
        toks = [ocr_results[i][1] for i, _, _ in mem]
        line = " ".join(toks)
        # Tighten spacing introduced by the join before final normalization.
        line = re.sub(r"\s+([,.;:!?])", r"\1", line)
        line = re.sub(r"\(\s+", "(", line)
        line = re.sub(r"\s+\)", ")", line)
        out.append(normalize_ocr_text(line))
    return out
# ─────────────────────────────────────────────
# GROUPING (pass 1)
# ─────────────────────────────────────────────
def group_quads_by_overlap(ocr_results, image_shape, gap_px=18, bbox_padding=10):
    """Cluster OCR tokens into speech bubbles (pass 1).

    Union-find over tokens: two tokens merge when their bounding boxes
    overlap or lie within ``gap_px``, or when their centers are closer than
    an adaptive threshold derived from the median token height.

    Returns (bubble_dict, bbox_dict, ocr_quads, bubble_indices), four dicts
    keyed by 1-based bubble id: text lines, padded bbox, token quads, and
    token indices.
    """
    n = len(ocr_results)
    if n == 0:
        # BUGFIX: previously returned only 3 empty dicts while every caller
        # unpacks 4 values, raising ValueError on empty input.
        return {}, {}, {}, {}
    token_bboxes = [quad_bbox(r[0]) for r in ocr_results]
    token_centers = [quad_center(r[0]) for r in ocr_results]
    token_heights = [quad_h(r[0]) for r in ocr_results]
    median_h = float(np.median(token_heights)) if token_heights else 12.0
    # Center-distance threshold scales with the page's text size.
    dist_thresh = max(20.0, median_h * 2.2)
    # Union-find with path halving.
    parent = list(range(n))
    def find(x):
        while parent[x] != x:
            parent[x] = parent[parent[x]]
            x = parent[x]
        return x
    def union(x, y):
        parent[find(x)] = find(y)
    for i in range(n):
        for j in range(i + 1, n):
            ov = bboxes_overlap_or_touch(token_bboxes[i], token_bboxes[j], gap_px=gap_px)
            if ov:
                union(i, j)
                continue
            cx1, cy1 = token_centers[i]
            cx2, cy2 = token_centers[j]
            d = ((cx1 - cx2) ** 2 + (cy1 - cy2) ** 2) ** 0.5
            # Nearby centers on roughly the same horizontal band also merge.
            if d <= dist_thresh and abs(cy1 - cy2) <= median_h * 3.0:
                union(i, j)
    groups = {}
    for i in range(n):
        groups.setdefault(find(i), []).append(i)
    def group_sort_key(indices):
        # Order groups by a coarse 150 px row band, then by left edge, so
        # bubble ids roughly follow reading order.
        ys = [token_bboxes[i][1] for i in indices]
        xs = [token_bboxes[i][0] for i in indices]
        return (min(ys) // 150, min(xs))
    sorted_groups = sorted(groups.values(), key=group_sort_key)
    bubble_dict = {}
    bbox_dict = {}
    ocr_quads = {}
    bubble_indices = {}
    for gid, indices in enumerate(sorted_groups, start=1):
        idxs = sorted(indices, key=lambda k: token_bboxes[k][1])
        lines = rebuild_bubble_lines_from_indices(idxs, ocr_results)
        quads = [ocr_results[k][0] for k in idxs]
        bb = quads_bbox(quads, image_shape, padding_px=bbox_padding)
        bubble_dict[gid] = lines
        ocr_quads[gid] = quads
        bbox_dict[gid] = bb
        bubble_indices[gid] = idxs
    return bubble_dict, bbox_dict, ocr_quads, bubble_indices
# ─────────────────────────────────────────────
# ORPHAN ABSORPTION (pass 2)
# ─────────────────────────────────────────────
def absorb_orphan_tokens_into_bubbles(
    ocr_results,
    bubble_dict,
    bbox_dict,
    ocr_quads,
    bubble_indices,
    image_shape,
    bbox_padding=2,
    gap_factor=1.9,
    max_center_dist_factor=3.2,
):
    """Attach tokens not claimed by pass-1 grouping to the nearest suitable
    bubble (pass 2), then rebuild every bubble's lines, quads, and bbox.

    A bubble qualifies for an orphan when the token's center is within a
    distance gate, and the token either nearly touches the bubble bbox or
    sits on roughly the same vertical band. Mutates ``bubble_indices`` in
    place while assigning orphans, but returns freshly built dicts.
    """
    n = len(ocr_results)
    token_bboxes = [quad_bbox(r[0]) for r in ocr_results]
    token_centers = [bbox_center(b) for b in token_bboxes]
    token_heights = [bbox_h(b) for b in token_bboxes]
    median_h = float(np.median(token_heights)) if token_heights else 12.0
    used = set()
    for bid, idxs in bubble_indices.items():
        for i in idxs:
            used.add(i)
    orphan_indices = [i for i in range(n) if i not in used]
    for i in orphan_indices:
        tb = token_bboxes[i]
        tc = token_centers[i]
        best_bid = None
        best_score = 1e18
        for bid, bb in bbox_dict.items():
            bc = bbox_center(bb)
            dist = distance_pt(tc, bc)
            bh = bbox_h(bb)
            # Hard distance gate scaled by token size and bubble height.
            max_dist = max(60.0, median_h * max_center_dist_factor + bh * 0.15)
            if dist > max_dist:
                continue
            near = bboxes_overlap_or_touch(tb, bb, gap_px=int(median_h * gap_factor))
            y_ok = abs(tc[1] - bc[1]) <= max(bh * 0.65, median_h * 4.0)
            if near or y_ok:
                # Bubbles that actually touch get a 25-point score bonus.
                score = dist - (25.0 if near else 0.0)
                if score < best_score:
                    best_score = score
                    best_bid = bid
        if best_bid is not None:
            bubble_indices.setdefault(best_bid, [])
            bubble_indices[best_bid].append(i)
    # rebuild bubbles after absorption
    new_bubble_dict = {}
    new_ocr_quads = {}
    new_bbox_dict = {}
    new_bubble_indices = {}
    for bid in sorted(bubble_dict.keys()):
        idxs = sorted(set(bubble_indices.get(bid, [])), key=lambda k: token_bboxes[k][1])
        if not idxs:
            idxs = []
        # Fall back to pass-1 data when a bubble somehow has no tokens.
        lines = rebuild_bubble_lines_from_indices(idxs, ocr_results) if idxs else bubble_dict.get(bid, [])
        quads = [ocr_results[k][0] for k in idxs] if idxs else ocr_quads.get(bid, [])
        if quads:
            bb = quads_bbox(quads, image_shape, padding_px=bbox_padding)
        else:
            bb = bbox_dict[bid]
        new_bubble_dict[bid] = lines
        new_ocr_quads[bid] = quads
        new_bbox_dict[bid] = bb
        new_bubble_indices[bid] = idxs
    return new_bubble_dict, new_bbox_dict, new_ocr_quads, new_bubble_indices
# ─────────────────────────────────────────────
# OCR QUALITY SCORE
# ─────────────────────────────────────────────
def ocr_quality_score(text):
    """Score OCR text quality in [0, 1]: the ratio of letters, minus 0.2
    per garbled-punctuation pattern found, plus 0.05 when the text ends in
    sentence punctuation. Empty or single-character text scores 0."""
    if not text or len(text) < 2:
        return 0.0
    letters = sum(c.isalpha() for c in text)
    score = letters / max(1, len(text))
    for garble in (r",,", r"\.\.-", r"[^\w\s\'\!\?\.,\-]{2,}"):
        if re.search(garble, text):
            score -= 0.2
    if re.search(r"[.!?]$", text):
        score += 0.05
    return min(1.0, max(0.0, score))
# ─────────────────────────────────────────────
# OCR VARIANTS
# ─────────────────────────────────────────────
def preprocess_variant(crop_bgr, mode):
    """Return a grayscale preprocessing variant of a BGR crop.

    Modes: "raw" (plain grayscale), "clahe" (contrast-limited adaptive
    histogram equalization), "adaptive" (Gaussian blur + adaptive
    threshold). Unknown modes fall back to plain grayscale.
    """
    gray = cv2.cvtColor(crop_bgr, cv2.COLOR_BGR2GRAY)
    if mode == "clahe":
        equalizer = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
        return equalizer.apply(gray)
    if mode == "adaptive":
        denoised = cv2.GaussianBlur(gray, (3, 3), 0)
        return cv2.adaptiveThreshold(denoised, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY, 35, 11)
    # "raw" and any unrecognized mode
    return gray
def run_ocr_on_img_array(reader, img_arr):
    """Run EasyOCR on an in-memory image array via a temporary PNG file.

    BUGFIX: the previous fixed filename "_temp_crop_ocr.png" in the working
    directory could collide between concurrent runs; use a unique temp file
    instead.
    """
    import tempfile
    fd, temp_path = tempfile.mkstemp(suffix=".png")
    os.close(fd)  # cv2.imwrite reopens the path itself
    try:
        cv2.imwrite(temp_path, img_arr)
        return reader.readtext(temp_path, paragraph=False)
    finally:
        if os.path.exists(temp_path):
            os.remove(temp_path)
def reread_cluster_crop(image, bbox, reader, source_lang="en", padding_px=20, upscale_factor=2.5):
    """Re-OCR a padded, upscaled crop of one bubble with several
    preprocessing variants; return the best-scoring merged text, or None
    when the crop is empty or nothing is detected.
    """
    img_h, img_w = image.shape[:2]
    x1, y1, x2, y2 = bbox
    # Pad and clamp the crop window to the image bounds.
    x1 = max(0, int(x1) - padding_px)
    y1 = max(0, int(y1) - padding_px)
    x2 = min(img_w, int(x2) + padding_px)
    y2 = min(img_h, int(y2) + padding_px)
    crop = image[y1:y2, x1:x2]
    if crop.size == 0:
        return None
    new_w = int(crop.shape[1] * upscale_factor)
    new_h = int(crop.shape[0] * upscale_factor)
    upscaled = cv2.resize(crop, (new_w, new_h), interpolation=cv2.INTER_CUBIC)
    candidates = []
    for mode in ("raw", "clahe", "adaptive"):
        proc = preprocess_variant(upscaled, mode)
        res = run_ocr_on_img_array(reader, proc)
        if not res:
            continue
        # Sort detections top-to-bottom, then left-to-right, before merging.
        res.sort(key=lambda r: (r[0][0][1], r[0][0][0]))
        lines = [normalize_ocr_text(t) for _, t, _ in res if t.strip()]
        merged = fix_hyphens(lines) if lines else ""
        if source_lang == "it":
            merged = italian_post_ocr_cleanup(merged)
        score = ocr_quality_score(merged)
        candidates.append((score, mode, merged))
    if not candidates:
        return None
    # Keep the highest-scoring variant; reject it if its text is empty.
    candidates.sort(key=lambda x: x[0], reverse=True)
    return candidates[0][2] if candidates[0][2] else None
# ─────────────────────────────────────────────
# AUTO GAP
# ─────────────────────────────────────────────
def compute_auto_gap(image_path, base_gap=18, reference_width=750):
    """Scale the clustering gap by image width relative to a reference
    width; fall back to ``base_gap`` when the image cannot be read."""
    img = cv2.imread(image_path)
    if img is None:
        return base_gap
    width = img.shape[1]
    return base_gap * (width / reference_width)
# ─────────────────────────────────────────────
# READING ORDER
# ─────────────────────────────────────────────
def estimate_reading_order(bbox_dict, mode="ltr"):
    """Assign a 1-based reading order to bubbles.

    Bubbles are clustered into horizontal rows (vertical tolerance of 90 px
    around the running-mean row center), rows are read top-to-bottom, and
    within a row bubbles run left-to-right ("ltr") or right-to-left
    ("rtl", manga style). Returns {bubble_id: order}.
    """
    entries = [
        (bid, x1, y1, x2, y2, (x1 + x2) / 2.0, (y1 + y2) / 2.0)
        for bid, (x1, y1, x2, y2) in bbox_dict.items()
    ]
    entries.sort(key=lambda e: e[6])  # top-to-bottom by y-center
    row_tol = 90
    rows = []
    for entry in entries:
        target = None
        for row in rows:
            if abs(entry[6] - row["cy"]) <= row_tol:
                target = row
                break
        if target is None:
            rows.append({"cy": entry[6], "items": [entry]})
        else:
            target["items"].append(entry)
            target["cy"] = np.mean([e[6] for e in target["items"]])
    rows.sort(key=lambda r: r["cy"])
    ordered = []
    for row in rows:
        row["items"].sort(key=lambda e: e[5], reverse=(mode == "rtl"))
        ordered.extend(e[0] for e in row["items"])
    return {bid: pos for pos, bid in enumerate(ordered, start=1)}
# ─────────────────────────────────────────────
# EXPORTERS
# ─────────────────────────────────────────────
def export_bubble_boxes(
    bbox_dict,
    ocr_quads_dict,
    reading_order_map,
    filepath="bubbles.json",
    bbox_expand_ratio=0.16,
    image_shape=None,
):
    """Write per-bubble geometry to a JSON file.

    Each entry carries the expanded box (padded by ``bbox_expand_ratio``
    per side, clamped to the image when ``image_shape`` is given), the
    tight box, the reading order, and per-token quad bboxes and corner
    points. Keys are the stringified bubble ids.
    """
    export = {}
    for bubble_id, (x1, y1, x2, y2) in bbox_dict.items():
        quads = ocr_quads_dict.get(bubble_id, [])
        w_orig = x2 - x1
        h_orig = y2 - y1
        pad_x = int(w_orig * bbox_expand_ratio)
        pad_y = int(h_orig * bbox_expand_ratio)
        if image_shape is not None:
            img_h, img_w = image_shape[:2]
            ex1 = max(0, x1 - pad_x)
            ey1 = max(0, y1 - pad_y)
            ex2 = min(img_w, x2 + pad_x)
            ey2 = min(img_h, y2 + pad_y)
        else:
            ex1, ey1, ex2, ey2 = x1 - pad_x, y1 - pad_y, x2 + pad_x, y2 + pad_y
        # Compute each token quad's bbox once instead of four times per field.
        quad_boxes = [quad_bbox(q) for q in quads]
        export[str(bubble_id)] = {
            "x": int(ex1),
            "y": int(ey1),
            "w": int(ex2 - ex1),
            "h": int(ey2 - ey1),
            "x_tight": int(x1),
            "y_tight": int(y1),
            "w_tight": int(w_orig),
            "h_tight": int(h_orig),
            "reading_order": int(reading_order_map.get(bubble_id, bubble_id)),
            "quad_bboxes": [
                {
                    "x": int(qx1),
                    "y": int(qy1),
                    "w": int(qx2 - qx1),
                    "h": int(qy2 - qy1),
                }
                for qx1, qy1, qx2, qy2 in quad_boxes
            ],
            "quads": [[[int(pt[0]), int(pt[1])] for pt in quad] for quad in quads],
        }
    with open(filepath, "w", encoding="utf-8") as f:
        json.dump(export, f, indent=2, ensure_ascii=False)
def write_output(output_lines, filepath):
    """Write the report lines to *filepath*, newline-separated (UTF-8)."""
    content = "\n".join(output_lines)
    with open(filepath, "w", encoding="utf-8") as handle:
        handle.write(content)
# ─────────────────────────────────────────────
# DEBUG IMAGE
# ─────────────────────────────────────────────
def save_debug_clusters(image_path, ocr_results, bubble_dict, bbox_dict):
    """Render all OCR quads (light grey) and labeled bubble boxes (colored)
    onto the page and save as debug_clusters.png. No-op when the image
    cannot be read."""
    canvas = cv2.imread(image_path)
    if canvas is None:
        return
    np.random.seed(42)  # deterministic palette across runs
    bubble_count = max(bubble_dict.keys(), default=1)
    palette = [
        tuple(int(c) for c in row)
        for row in np.random.randint(50, 230, size=(bubble_count + 2, 3))
    ]
    # draw all OCR quads lightly
    for quad, _text, _conf in ocr_results:
        pts = np.array(quad, dtype=np.int32)
        cv2.polylines(canvas, [pts], isClosed=True, color=(180, 180, 180), thickness=1)
    # draw bubble bboxes
    for bubble_id, (x1, y1, x2, y2) in bbox_dict.items():
        color = palette[(bubble_id - 1) % len(palette)]
        cv2.rectangle(canvas, (int(x1), int(y1)), (int(x2), int(y2)), color, 2)
        cv2.putText(canvas, f"BOX#{bubble_id}", (int(x1) + 2, int(y1) + 16),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)
    cv2.imwrite("debug_clusters.png", canvas)
# ─────────────────────────────────────────────
# CORE FUNCTION
# ─────────────────────────────────────────────
def translate_manga_text(
    image_path,
    source_lang="en",
    target_lang="ca",
    confidence_threshold=0.12,
    export_to_file=None,
    export_bubbles_to="bubbles.json",
    min_text_length=2,
    gap_px="auto",
    filter_sound_effects=True,
    quality_threshold=0.62,
    upscale_factor=2.5,
    bbox_padding=3,
    debug=False,
    reading_mode="ltr",
):
    """OCR a manga page, group detections into speech bubbles, translate
    each bubble, and print/export the results.

    Pipeline: EasyOCR -> token filtering -> two-pass bubble grouping ->
    per-bubble quality check with crop re-read -> reading-order estimation
    -> glossary/phrase-map aware translation -> text + JSON export.

    ``gap_px="auto"`` scales the clustering gap with image width;
    ``reading_mode`` is "ltr" or "rtl" (manga). Returns None; results go
    to stdout and to ``export_to_file`` / ``export_bubbles_to``.
    """
    # gap resolve
    if gap_px == "auto":
        resolved_gap = compute_auto_gap(image_path)
    else:
        resolved_gap = float(gap_px)
    full_image = cv2.imread(image_path)
    if full_image is None:
        print(f"❌ Could not load image: {image_path}")
        return
    # OCR init
    print("\nLoading OCR model...")
    # Catalan has no EasyOCR model; recognize with English+Spanish instead.
    ocr_lang_list = ["en", "es"] if source_lang == "ca" else [source_lang]
    reader = easyocr.Reader(ocr_lang_list)
    # Translator init
    translator = GoogleTranslator(source=source_lang, target=target_lang)
    # OCR full image
    print(f"\nRunning OCR on: {image_path}")
    results = reader.readtext(image_path, paragraph=False)
    print(f" Raw detections: {len(results)}")
    # Filter tokens (noise / sound effects / titles / garbage)
    filtered = []
    skipped = 0
    for bbox, text, confidence in results:
        cleaned = normalize_ocr_text(text)
        keep, _ = should_keep_token(cleaned, confidence, confidence_threshold, min_text_length, filter_sound_effects)
        if keep:
            filtered.append((bbox, cleaned, confidence))
        else:
            skipped += 1
    print(f"{len(filtered)} kept, {skipped} skipped.\n")
    if not filtered:
        print("⚠️ No text detected after filtering.")
        return
    # Pass 1 grouping
    bubble_dict, bbox_dict, ocr_quads, bubble_indices = group_quads_by_overlap(
        filtered,
        image_shape=full_image.shape,
        gap_px=resolved_gap,
        bbox_padding=bbox_padding,
    )
    # Pass 2 orphan absorption
    bubble_dict, bbox_dict, ocr_quads, bubble_indices = absorb_orphan_tokens_into_bubbles(
        ocr_results=filtered,
        bubble_dict=bubble_dict,
        bbox_dict=bbox_dict,
        ocr_quads=ocr_quads,
        bubble_indices=bubble_indices,
        image_shape=full_image.shape,
        bbox_padding=bbox_padding,
    )
    print(f"{len(bubble_dict)} bubble(s) detected after absorption.\n")
    if debug:
        save_debug_clusters(image_path, filtered, bubble_dict, bbox_dict)
    # merge lines (hyphen-aware) into one string per bubble
    clean_bubbles = {i: fix_hyphens(lines) for i, lines in bubble_dict.items() if lines}
    # OCR quality + reread: low-scoring bubbles get a second OCR pass on an
    # upscaled crop of their bbox.
    print("Checking OCR quality per bubble...")
    for i, text in clean_bubbles.items():
        if source_lang == "it":
            text = italian_post_ocr_cleanup(text)
        clean_bubbles[i] = text
        score = ocr_quality_score(text)
        # NOTE(review): the "good" status literal is an empty string — it
        # appears a non-ASCII marker (likely a check mark) was lost in
        # transfer; confirm against the original file.
        status = "" if score >= quality_threshold else "🔁"
        print(f" #{i}: score={score:.2f} {status} '{text[:65]}'")
        if score < quality_threshold:
            reread = reread_cluster_crop(
                full_image,
                bbox_dict[i],
                reader,
                source_lang=source_lang,
                upscale_factor=upscale_factor,
            )
            if reread:
                clean_bubbles[i] = reread
    # Reading order + glossary prepass
    reading_order_map = estimate_reading_order(bbox_dict, mode=reading_mode)
    for i in list(clean_bubbles.keys()):
        clean_bubbles[i] = apply_glossary(clean_bubbles[i], GLOSSARY)
    # Translate
    header = "BUBBLE|ORDER|ORIGINAL|TRANSLATED|FLAGS"
    # NOTE(review): "" * 120 yields an empty divider — the repeated
    # character (likely a box-drawing dash) was lost in transfer; confirm.
    divider = "" * 120
    output_lines = [header, divider]
    print()
    print(f"{'BUBBLE':<8} {'ORDER':<6} {'ORIGINAL':<50} {'TRANSLATED':<50} FLAGS")
    print(divider)
    # Iterate bubbles in estimated reading order.
    ordered_ids = sorted(clean_bubbles.keys(), key=lambda b: reading_order_map.get(b, b))
    translated_count = 0
    for i in ordered_ids:
        src = clean_bubbles[i].strip()
        if not src:
            continue
        flags = []
        forced_translation = None
        # phrase-map pass: curated IT->CA phrases beat machine translation
        if source_lang == "it" and target_lang == "ca":
            exact = PHRASE_MAP_IT_CA.get(src)
            if exact:
                forced_translation = exact
                flags.append("PHRASE_EXACT")
            else:
                fuzzy, ratio, _ = fuzzy_phrase_match(src, PHRASE_MAP_IT_CA, min_ratio=0.88)
                if fuzzy:
                    forced_translation = fuzzy
                    flags.append(f"PHRASE_FUZZY:{ratio:.2f}")
        if forced_translation is not None:
            tgt = forced_translation
        else:
            try:
                tgt = translator.translate(src)
            except Exception as e:
                tgt = f"[Translation error: {e}]"
            if tgt is None:
                tgt = "[No translation returned]"
            tgt = normalize_ocr_text(tgt)
            tgt = apply_glossary(tgt, GLOSSARY)
            # suspicious retry: re-translate a cleaned-up source, falling
            # back to the phrase map at a lower ratio if still suspect.
            if looks_suspicious_translation(src, tgt):
                flags.append("SUSPICIOUS_RETRY")
                retry_src = italian_post_ocr_cleanup(src) if source_lang == "it" else src
                try:
                    retry_tgt = translator.translate(retry_src)
                    if retry_tgt:
                        retry_tgt = normalize_ocr_text(retry_tgt)
                        retry_tgt = apply_glossary(retry_tgt, GLOSSARY)
                        if not looks_suspicious_translation(src, retry_tgt):
                            tgt = retry_tgt
                            flags.append("RETRY_OK")
                        else:
                            if source_lang == "it" and target_lang == "ca":
                                fallback, ratio, _ = fuzzy_phrase_match(src, PHRASE_MAP_IT_CA, min_ratio=0.80)
                                if fallback:
                                    tgt = fallback
                                    flags.append(f"FALLBACK_MAP:{ratio:.2f}")
                except Exception:
                    # best-effort retry: keep the first translation on failure
                    pass
        tgt = tgt.upper()
        translated_count += 1
        ro = reading_order_map.get(i, i)
        output_lines.append(f"#{i}|{ro}|{src}|{tgt}|{','.join(flags) if flags else '-'}")
        print(f"#{i:<7} {ro:<6} {src:<50} {tgt:<50} {','.join(flags) if flags else '-'}")
    output_lines.append(divider)
    summary = f"✅ Done! {translated_count} bubble(s) translated, {skipped} detection(s) skipped."
    output_lines.append(summary)
    print(divider)
    print(summary)
    if export_to_file:
        write_output(output_lines, export_to_file)
    if export_bubbles_to:
        export_bubble_boxes(
            bbox_dict,
            ocr_quads,
            reading_order_map=reading_order_map,
            filepath=export_bubbles_to,
            bbox_expand_ratio=0.16,
            image_shape=full_image.shape,
        )
# ─────────────────────────────────────────────
# ENTRY POINT
# ─────────────────────────────────────────────
if __name__ == "__main__":
    # Example invocation: translate an Italian manga page to Catalan with
    # debug cluster rendering enabled.
    translate_manga_text(
        image_path="001-page.png",
        source_lang="it",
        target_lang="ca",
        confidence_threshold=0.12,
        min_text_length=2,
        export_to_file="output.txt",
        export_bubbles_to="bubbles.json",
        gap_px="auto",
        filter_sound_effects=True,
        quality_threshold=0.62,
        upscale_factor=2.5,
        bbox_padding=3,
        debug=True,
        reading_mode="ltr",
    )