886 lines
31 KiB
Python
886 lines
31 KiB
Python
import re
|
||
import os
|
||
import json
|
||
import difflib
|
||
import cv2
|
||
import numpy as np
|
||
import easyocr
|
||
from deep_translator import GoogleTranslator
|
||
|
||
|
||
# ─────────────────────────────────────────────
|
||
# LANGUAGE CODE REFERENCE
|
||
# ─────────────────────────────────────────────
|
||
SUPPORTED_LANGUAGES = {
|
||
"Vietnamese" : "vi",
|
||
"Japanese" : "ja",
|
||
"English" : "en",
|
||
"Spanish" : "es",
|
||
"Korean" : "ko",
|
||
"Chinese (Simplified)" : "ch_sim",
|
||
"Chinese (Traditional)": "ch_tra",
|
||
"French" : "fr",
|
||
"German" : "de",
|
||
"Italian" : "it",
|
||
"Portuguese" : "pt",
|
||
"Arabic" : "ar",
|
||
"Russian" : "ru",
|
||
"Thai" : "th",
|
||
"Catalan" : "ca",
|
||
}
|
||
|
||
# ─────────────────────────────────────────────
|
||
# DOMAIN GLOSSARY
|
||
# ─────────────────────────────────────────────
|
||
GLOSSARY = {
|
||
"ANYA": "ANYA",
|
||
"STELLA STAR": "STELLA STAR",
|
||
"MR. HENDERSON": "MR. HENDERSON",
|
||
"STARLIGHT ANYA": "STARLIGHT ANYA",
|
||
}
|
||
|
||
# Phrase-level fallback (source IT -> target CA)
|
||
PHRASE_MAP_IT_CA = {
|
||
"LA BAMBINA È ILLESA!": "LA NENA ESTÀ IL·LESA!",
|
||
"L'UOMO E LA DONNA SONO MORTI!": "L'HOME I LA DONA SÓN MORTS!",
|
||
"IL BAMBINO È FERITO GRAVEMENTE, MA È ANCORA VIVO!!": "EL NEN ESTÀ GREUMENT FERIT, PERÒ ENCARA ÉS VIU!!",
|
||
"UN CASO URGENTE...?": "UN CAS URGENT...?",
|
||
"UN CASO URGENTE,?": "UN CAS URGENT?",
|
||
}
|
||
|
||
ITALIAN_OCR_FIXES = [
|
||
(r"\bL'LOMO\b", "L'UOMO"),
|
||
(r"\bLOMO\b", "UOMO"),
|
||
(r"\bMORT I\b", "MORTI"),
|
||
(r"\bI[L1]LESA\b", "ILLESA"),
|
||
(r"\bBAM8INA\b", "BAMBINA"),
|
||
(r"\bBAM8INO\b", "BAMBINO"),
|
||
(r",\?", "?"),
|
||
(r"\?{2,}", "?"),
|
||
(r"\!{3,}", "!!"),
|
||
]
|
||
|
||
|
||
# ─────────────────────────────────────────────
|
||
# SOUND EFFECT FILTER
|
||
# ─────────────────────────────────────────────
|
||
SOUND_EFFECT_PATTERNS = [
|
||
r"^b+i+p+$", r"^sha+$", r"^ha+$", r"^ah+$",
|
||
r"^oh+$", r"^ugh+$", r"^gr+$", r"^bam+$",
|
||
r"^pow+$", r"^crash+$", r"^boom+$", r"^bang+$",
|
||
r"^crack+$", r"^whoosh+$", r"^thud+$", r"^snap+$",
|
||
r"^zip+$", r"^swoosh+$", r"^chirp+$", r"^tweet+$",
|
||
]
|
||
|
||
def is_sound_effect(text):
|
||
cleaned = re.sub(r"[^a-z]", "", text.strip().lower())
|
||
return any(re.fullmatch(p, cleaned, re.IGNORECASE) for p in SOUND_EFFECT_PATTERNS)
|
||
|
||
|
||
# ─────────────────────────────────────────────
|
||
# TITLE / LOGO / AUTHOR FILTER
|
||
# ─────────────────────────────────────────────
|
||
TITLE_PATTERNS = [
|
||
r"^(mission|chapter|episode|vol\.?|volume)\s*\d+$",
|
||
r"^(spy|family|spy.family)$",
|
||
r"^by\s+.+$",
|
||
r"^[a-z]{1,4}\s+[a-z]+\s+[a-z]+$",
|
||
]
|
||
|
||
def is_title_text(text):
|
||
cleaned = text.strip().lower()
|
||
return any(re.fullmatch(p, cleaned, re.IGNORECASE) for p in TITLE_PATTERNS)
|
||
|
||
|
||
# ─────────────────────────────────────────────
|
||
# GARBAGE TOKEN FILTER
|
||
# ─────────────────────────────────────────────
|
||
GARBAGE_PATTERNS = [
|
||
r"^[^a-zA-Z]*$",
|
||
r"^.{1,2}$",
|
||
r".*\d+.*",
|
||
r"^[A-Z]{1,4}$",
|
||
]
|
||
|
||
def is_garbage(text):
|
||
t = text.strip()
|
||
return any(re.fullmatch(p, t) for p in GARBAGE_PATTERNS)
|
||
|
||
|
||
# ─────────────────────────────────────────────
|
||
# TOKEN CLASSIFIER
|
||
# ─────────────────────────────────────────────
|
||
def classify_token(text, confidence, confidence_threshold, min_text_length, filter_sound_effects):
|
||
cleaned = text.strip()
|
||
|
||
if confidence < confidence_threshold:
|
||
return "noise"
|
||
if len(cleaned) < min_text_length:
|
||
return "noise"
|
||
if re.fullmatch(r"\d+", cleaned):
|
||
return "noise"
|
||
if len(cleaned) == 1 and not cleaned.isalpha():
|
||
return "noise"
|
||
if filter_sound_effects and is_sound_effect(cleaned):
|
||
return "noise"
|
||
if is_title_text(cleaned):
|
||
return "noise"
|
||
if is_garbage(cleaned):
|
||
return "noise"
|
||
if not any(ch.isalpha() for ch in cleaned):
|
||
return "punct"
|
||
return "alpha"
|
||
|
||
def should_keep_token(text, confidence, confidence_threshold, min_text_length, filter_sound_effects):
|
||
cat = classify_token(text, confidence, confidence_threshold, min_text_length, filter_sound_effects)
|
||
return cat != "noise", cat
|
||
|
||
|
||
# ─────────────────────────────────────────────
|
||
# QUAD / BBOX HELPERS
|
||
# ─────────────────────────────────────────────
|
||
def quad_bbox(quad):
|
||
xs = [pt[0] for pt in quad]
|
||
ys = [pt[1] for pt in quad]
|
||
return min(xs), min(ys), max(xs), max(ys)
|
||
|
||
def quad_center(quad):
|
||
x1, y1, x2, y2 = quad_bbox(quad)
|
||
return (x1 + x2) / 2.0, (y1 + y2) / 2.0
|
||
|
||
def quad_h(quad):
|
||
x1, y1, x2, y2 = quad_bbox(quad)
|
||
return max(1.0, y2 - y1)
|
||
|
||
def bbox_center(b):
|
||
x1, y1, x2, y2 = b
|
||
return (x1 + x2) / 2.0, (y1 + y2) / 2.0
|
||
|
||
def bbox_h(b):
|
||
return max(1.0, b[3] - b[1])
|
||
|
||
def distance_pt(a, b):
|
||
return ((a[0]-b[0])**2 + (a[1]-b[1])**2) ** 0.5
|
||
|
||
def quads_bbox(quads, image_shape, padding_px=10):
|
||
img_h, img_w = image_shape[:2]
|
||
all_x = [pt[0] for quad in quads for pt in quad]
|
||
all_y = [pt[1] for quad in quads for pt in quad]
|
||
x1 = max(0, min(all_x) - padding_px)
|
||
y1 = max(0, min(all_y) - padding_px)
|
||
x2 = min(img_w, max(all_x) + padding_px)
|
||
y2 = min(img_h, max(all_y) + padding_px)
|
||
return x1, y1, x2, y2
|
||
|
||
def bboxes_overlap_or_touch(a, b, gap_px=0):
|
||
ax1, ay1, ax2, ay2 = a
|
||
bx1, by1, bx2, by2 = b
|
||
gap_x = max(0, max(ax1, bx1) - min(ax2, bx2))
|
||
gap_y = max(0, max(ay1, by1) - min(ay2, by2))
|
||
return gap_x <= gap_px and gap_y <= gap_px
|
||
|
||
|
||
# ─────────────────────────────────────────────
|
||
# TEXT NORMALIZATION
|
||
# ─────────────────────────────────────────────
|
||
def normalize_ocr_text(text):
|
||
t = text.strip().upper()
|
||
t = t.replace("“", "\"").replace("”", "\"")
|
||
t = t.replace("’", "'").replace("‘", "'")
|
||
t = t.replace("…", "...")
|
||
t = re.sub(r"\s+", " ", t)
|
||
t = re.sub(r"\s+([,.;:!?])", r"\1", t)
|
||
t = re.sub(r"\(\s+", "(", t)
|
||
t = re.sub(r"\s+\)", ")", t)
|
||
t = re.sub(r"\.{4,}", "...", t)
|
||
t = re.sub(r",\?", "?", t)
|
||
return t.strip()
|
||
|
||
def italian_post_ocr_cleanup(text):
|
||
t = normalize_ocr_text(text)
|
||
for pat, rep in ITALIAN_OCR_FIXES:
|
||
t = re.sub(pat, rep, t, flags=re.IGNORECASE)
|
||
t = re.sub(r"\s{2,}", " ", t).strip().upper()
|
||
return t
|
||
|
||
def fix_hyphens(lines):
|
||
if not lines:
|
||
return ""
|
||
merged = lines[0]
|
||
for line in lines[1:]:
|
||
line = line.strip()
|
||
if merged.endswith("-"):
|
||
merged = merged[:-1] + line
|
||
else:
|
||
merged = merged + " " + line
|
||
merged = re.sub(r" {2,}", " ", merged).strip()
|
||
return normalize_ocr_text(merged)
|
||
|
||
def apply_glossary(text, glossary):
|
||
out = text
|
||
keys = sorted(glossary.keys(), key=len, reverse=True)
|
||
for k in keys:
|
||
v = glossary[k]
|
||
out = re.sub(rf"\b{re.escape(k)}\b", v, out, flags=re.IGNORECASE)
|
||
return out
|
||
|
||
|
||
# ─────────────────────────────────────────────
|
||
# TRANSLATION SAFETY
|
||
# ─────────────────────────────────────────────
|
||
def fuzzy_phrase_match(source_text, phrase_map, min_ratio=0.88):
|
||
if source_text in phrase_map:
|
||
return phrase_map[source_text], 1.0, source_text
|
||
|
||
best_key, best_ratio = None, 0.0
|
||
for k in phrase_map.keys():
|
||
ratio = difflib.SequenceMatcher(None, source_text, k).ratio()
|
||
if ratio > best_ratio:
|
||
best_ratio = ratio
|
||
best_key = k
|
||
|
||
if best_key and best_ratio >= min_ratio:
|
||
return phrase_map[best_key], best_ratio, best_key
|
||
return None, best_ratio, best_key
|
||
|
||
def looks_suspicious_translation(src, tgt):
|
||
t = normalize_ocr_text(tgt)
|
||
bad_tokens = ["NEETA", "LOMO", "MORT I", "ESTA IL", "MORT I LA"]
|
||
if any(b in t for b in bad_tokens):
|
||
return True
|
||
if len(t) < 3:
|
||
return True
|
||
return False
|
||
|
||
|
||
# ─────────────────────────────────────────────
|
||
# LINE REBUILD (shared)
|
||
# ─────────────────────────────────────────────
|
||
def rebuild_bubble_lines_from_indices(indices, ocr_results):
|
||
if not indices:
|
||
return []
|
||
|
||
token_bboxes = [quad_bbox(ocr_results[i][0]) for i in indices]
|
||
items = []
|
||
for i, bx in zip(indices, token_bboxes):
|
||
xc = (bx[0] + bx[2]) / 2.0
|
||
yc = (bx[1] + bx[3]) / 2.0
|
||
h = max(1.0, bx[3] - bx[1])
|
||
items.append((i, xc, yc, h))
|
||
|
||
line_tol = max(6.0, float(np.median([it[3] for it in items])) * 0.6)
|
||
items.sort(key=lambda t: t[2])
|
||
|
||
lines = []
|
||
for it in items:
|
||
i, xc, yc, h = it
|
||
placed = False
|
||
for ln in lines:
|
||
if abs(yc - ln["yc"]) <= line_tol:
|
||
ln["members"].append((i, xc, yc))
|
||
ln["yc"] = np.mean([m[2] for m in ln["members"]])
|
||
placed = True
|
||
break
|
||
if not placed:
|
||
lines.append({"yc": yc, "members": [(i, xc, yc)]})
|
||
|
||
lines.sort(key=lambda ln: ln["yc"])
|
||
out = []
|
||
for ln in lines:
|
||
mem = sorted(ln["members"], key=lambda m: m[1])
|
||
toks = [ocr_results[i][1] for i, _, _ in mem]
|
||
line = " ".join(toks)
|
||
line = re.sub(r"\s+([,.;:!?])", r"\1", line)
|
||
line = re.sub(r"\(\s+", "(", line)
|
||
line = re.sub(r"\s+\)", ")", line)
|
||
out.append(normalize_ocr_text(line))
|
||
return out
|
||
|
||
|
||
# ─────────────────────────────────────────────
|
||
# GROUPING (pass 1)
|
||
# ─────────────────────────────────────────────
|
||
def group_quads_by_overlap(ocr_results, image_shape, gap_px=18, bbox_padding=10):
|
||
n = len(ocr_results)
|
||
if n == 0:
|
||
return {}, {}, {}
|
||
|
||
token_bboxes = [quad_bbox(r[0]) for r in ocr_results]
|
||
token_centers = [quad_center(r[0]) for r in ocr_results]
|
||
token_heights = [quad_h(r[0]) for r in ocr_results]
|
||
median_h = float(np.median(token_heights)) if token_heights else 12.0
|
||
dist_thresh = max(20.0, median_h * 2.2)
|
||
|
||
parent = list(range(n))
|
||
|
||
def find(x):
|
||
while parent[x] != x:
|
||
parent[x] = parent[parent[x]]
|
||
x = parent[x]
|
||
return x
|
||
|
||
def union(x, y):
|
||
parent[find(x)] = find(y)
|
||
|
||
for i in range(n):
|
||
for j in range(i + 1, n):
|
||
ov = bboxes_overlap_or_touch(token_bboxes[i], token_bboxes[j], gap_px=gap_px)
|
||
if ov:
|
||
union(i, j)
|
||
continue
|
||
cx1, cy1 = token_centers[i]
|
||
cx2, cy2 = token_centers[j]
|
||
d = ((cx1 - cx2) ** 2 + (cy1 - cy2) ** 2) ** 0.5
|
||
if d <= dist_thresh and abs(cy1 - cy2) <= median_h * 3.0:
|
||
union(i, j)
|
||
|
||
groups = {}
|
||
for i in range(n):
|
||
root = find(i)
|
||
groups.setdefault(root, []).append(i)
|
||
|
||
def group_sort_key(indices):
|
||
ys = [token_bboxes[i][1] for i in indices]
|
||
xs = [token_bboxes[i][0] for i in indices]
|
||
return (min(ys) // 150, min(xs))
|
||
|
||
sorted_groups = sorted(groups.values(), key=group_sort_key)
|
||
|
||
bubble_dict = {}
|
||
bbox_dict = {}
|
||
ocr_quads = {}
|
||
bubble_indices = {}
|
||
|
||
for gid, indices in enumerate(sorted_groups, start=1):
|
||
idxs = sorted(indices, key=lambda k: token_bboxes[k][1])
|
||
lines = rebuild_bubble_lines_from_indices(idxs, ocr_results)
|
||
quads = [ocr_results[k][0] for k in idxs]
|
||
bb = quads_bbox(quads, image_shape, padding_px=bbox_padding)
|
||
|
||
bubble_dict[gid] = lines
|
||
ocr_quads[gid] = quads
|
||
bbox_dict[gid] = bb
|
||
bubble_indices[gid] = idxs
|
||
|
||
return bubble_dict, bbox_dict, ocr_quads, bubble_indices
|
||
|
||
|
||
# ─────────────────────────────────────────────
|
||
# ORPHAN ABSORPTION (pass 2)
|
||
# ─────────────────────────────────────────────
|
||
def absorb_orphan_tokens_into_bubbles(
|
||
ocr_results,
|
||
bubble_dict,
|
||
bbox_dict,
|
||
ocr_quads,
|
||
bubble_indices,
|
||
image_shape,
|
||
bbox_padding=2,
|
||
gap_factor=1.9,
|
||
max_center_dist_factor=3.2,
|
||
):
|
||
n = len(ocr_results)
|
||
token_bboxes = [quad_bbox(r[0]) for r in ocr_results]
|
||
token_centers = [bbox_center(b) for b in token_bboxes]
|
||
token_heights = [bbox_h(b) for b in token_bboxes]
|
||
median_h = float(np.median(token_heights)) if token_heights else 12.0
|
||
|
||
used = set()
|
||
for bid, idxs in bubble_indices.items():
|
||
for i in idxs:
|
||
used.add(i)
|
||
|
||
orphan_indices = [i for i in range(n) if i not in used]
|
||
|
||
for i in orphan_indices:
|
||
tb = token_bboxes[i]
|
||
tc = token_centers[i]
|
||
|
||
best_bid = None
|
||
best_score = 1e18
|
||
|
||
for bid, bb in bbox_dict.items():
|
||
bc = bbox_center(bb)
|
||
dist = distance_pt(tc, bc)
|
||
bh = bbox_h(bb)
|
||
|
||
max_dist = max(60.0, median_h * max_center_dist_factor + bh * 0.15)
|
||
if dist > max_dist:
|
||
continue
|
||
|
||
near = bboxes_overlap_or_touch(tb, bb, gap_px=int(median_h * gap_factor))
|
||
y_ok = abs(tc[1] - bc[1]) <= max(bh * 0.65, median_h * 4.0)
|
||
|
||
if near or y_ok:
|
||
score = dist - (25.0 if near else 0.0)
|
||
if score < best_score:
|
||
best_score = score
|
||
best_bid = bid
|
||
|
||
if best_bid is not None:
|
||
bubble_indices.setdefault(best_bid, [])
|
||
bubble_indices[best_bid].append(i)
|
||
|
||
# rebuild bubbles after absorption
|
||
new_bubble_dict = {}
|
||
new_ocr_quads = {}
|
||
new_bbox_dict = {}
|
||
new_bubble_indices = {}
|
||
|
||
for bid in sorted(bubble_dict.keys()):
|
||
idxs = sorted(set(bubble_indices.get(bid, [])), key=lambda k: token_bboxes[k][1])
|
||
if not idxs:
|
||
idxs = []
|
||
|
||
lines = rebuild_bubble_lines_from_indices(idxs, ocr_results) if idxs else bubble_dict.get(bid, [])
|
||
quads = [ocr_results[k][0] for k in idxs] if idxs else ocr_quads.get(bid, [])
|
||
|
||
if quads:
|
||
bb = quads_bbox(quads, image_shape, padding_px=bbox_padding)
|
||
else:
|
||
bb = bbox_dict[bid]
|
||
|
||
new_bubble_dict[bid] = lines
|
||
new_ocr_quads[bid] = quads
|
||
new_bbox_dict[bid] = bb
|
||
new_bubble_indices[bid] = idxs
|
||
|
||
return new_bubble_dict, new_bbox_dict, new_ocr_quads, new_bubble_indices
|
||
|
||
|
||
# ─────────────────────────────────────────────
|
||
# OCR QUALITY SCORE
|
||
# ─────────────────────────────────────────────
|
||
def ocr_quality_score(text):
|
||
if not text or len(text) < 2:
|
||
return 0.0
|
||
alpha_ratio = sum(1 for c in text if c.isalpha()) / max(1, len(text))
|
||
penalty = 0.0
|
||
for p in [r",,", r"\.\.-", r"[^\w\s\'\!\?\.,\-]{2,}"]:
|
||
if re.search(p, text):
|
||
penalty += 0.2
|
||
bonus = 0.05 if re.search(r"[.!?]$", text) else 0.0
|
||
return max(0.0, min(1.0, alpha_ratio - penalty + bonus))
|
||
|
||
|
||
# ─────────────────────────────────────────────
|
||
# OCR VARIANTS
|
||
# ─────────────────────────────────────────────
|
||
def preprocess_variant(crop_bgr, mode):
|
||
gray = cv2.cvtColor(crop_bgr, cv2.COLOR_BGR2GRAY)
|
||
if mode == "raw":
|
||
return gray
|
||
if mode == "clahe":
|
||
clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
|
||
return clahe.apply(gray)
|
||
if mode == "adaptive":
|
||
den = cv2.GaussianBlur(gray, (3, 3), 0)
|
||
return cv2.adaptiveThreshold(den, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY, 35, 11)
|
||
return gray
|
||
|
||
def run_ocr_on_img_array(reader, img_arr):
|
||
temp_path = "_temp_crop_ocr.png"
|
||
cv2.imwrite(temp_path, img_arr)
|
||
try:
|
||
return reader.readtext(temp_path, paragraph=False)
|
||
finally:
|
||
if os.path.exists(temp_path):
|
||
os.remove(temp_path)
|
||
|
||
def reread_cluster_crop(image, bbox, reader, source_lang="en", padding_px=20, upscale_factor=2.5):
|
||
img_h, img_w = image.shape[:2]
|
||
x1, y1, x2, y2 = bbox
|
||
x1 = max(0, int(x1) - padding_px)
|
||
y1 = max(0, int(y1) - padding_px)
|
||
x2 = min(img_w, int(x2) + padding_px)
|
||
y2 = min(img_h, int(y2) + padding_px)
|
||
|
||
crop = image[y1:y2, x1:x2]
|
||
if crop.size == 0:
|
||
return None
|
||
|
||
new_w = int(crop.shape[1] * upscale_factor)
|
||
new_h = int(crop.shape[0] * upscale_factor)
|
||
upscaled = cv2.resize(crop, (new_w, new_h), interpolation=cv2.INTER_CUBIC)
|
||
|
||
candidates = []
|
||
for mode in ("raw", "clahe", "adaptive"):
|
||
proc = preprocess_variant(upscaled, mode)
|
||
res = run_ocr_on_img_array(reader, proc)
|
||
if not res:
|
||
continue
|
||
res.sort(key=lambda r: (r[0][0][1], r[0][0][0]))
|
||
lines = [normalize_ocr_text(t) for _, t, _ in res if t.strip()]
|
||
merged = fix_hyphens(lines) if lines else ""
|
||
if source_lang == "it":
|
||
merged = italian_post_ocr_cleanup(merged)
|
||
score = ocr_quality_score(merged)
|
||
candidates.append((score, mode, merged))
|
||
|
||
if not candidates:
|
||
return None
|
||
candidates.sort(key=lambda x: x[0], reverse=True)
|
||
return candidates[0][2] if candidates[0][2] else None
|
||
|
||
|
||
# ─────────────────────────────────────────────
|
||
# AUTO GAP
|
||
# ─────────────────────────────────────────────
|
||
def compute_auto_gap(image_path, base_gap=18, reference_width=750):
|
||
image = cv2.imread(image_path)
|
||
if image is None:
|
||
return base_gap
|
||
img_w = image.shape[1]
|
||
return base_gap * (img_w / reference_width)
|
||
|
||
|
||
# ─────────────────────────────────────────────
|
||
# READING ORDER
|
||
# ─────────────────────────────────────────────
|
||
def estimate_reading_order(bbox_dict, mode="ltr"):
|
||
items = []
|
||
for bid, (x1, y1, x2, y2) in bbox_dict.items():
|
||
cx = (x1 + x2) / 2.0
|
||
cy = (y1 + y2) / 2.0
|
||
items.append((bid, x1, y1, x2, y2, cx, cy))
|
||
|
||
items.sort(key=lambda t: t[6])
|
||
rows = []
|
||
row_tol = 90
|
||
for it in items:
|
||
placed = False
|
||
for row in rows:
|
||
if abs(it[6] - row["cy"]) <= row_tol:
|
||
row["items"].append(it)
|
||
row["cy"] = np.mean([x[6] for x in row["items"]])
|
||
placed = True
|
||
break
|
||
if not placed:
|
||
rows.append({"cy": it[6], "items": [it]})
|
||
|
||
rows.sort(key=lambda r: r["cy"])
|
||
order = []
|
||
for r in rows:
|
||
if mode == "rtl":
|
||
r["items"].sort(key=lambda t: t[5], reverse=True)
|
||
else:
|
||
r["items"].sort(key=lambda t: t[5])
|
||
order.extend([it[0] for it in r["items"]])
|
||
|
||
return {bid: idx + 1 for idx, bid in enumerate(order)}
|
||
|
||
|
||
# ─────────────────────────────────────────────
|
||
# EXPORTERS
|
||
# ─────────────────────────────────────────────
|
||
def export_bubble_boxes(
|
||
bbox_dict,
|
||
ocr_quads_dict,
|
||
reading_order_map,
|
||
filepath="bubbles.json",
|
||
bbox_expand_ratio=0.16,
|
||
image_shape=None,
|
||
):
|
||
export = {}
|
||
for bubble_id, (x1, y1, x2, y2) in bbox_dict.items():
|
||
quads = ocr_quads_dict.get(bubble_id, [])
|
||
|
||
w_orig = x2 - x1
|
||
h_orig = y2 - y1
|
||
pad_x = int(w_orig * bbox_expand_ratio)
|
||
pad_y = int(h_orig * bbox_expand_ratio)
|
||
|
||
if image_shape is not None:
|
||
img_h, img_w = image_shape[:2]
|
||
ex1 = max(0, x1 - pad_x)
|
||
ey1 = max(0, y1 - pad_y)
|
||
ex2 = min(img_w, x2 + pad_x)
|
||
ey2 = min(img_h, y2 + pad_y)
|
||
else:
|
||
ex1, ey1, ex2, ey2 = x1 - pad_x, y1 - pad_y, x2 + pad_x, y2 + pad_y
|
||
|
||
export[str(bubble_id)] = {
|
||
"x": int(ex1),
|
||
"y": int(ey1),
|
||
"w": int(ex2 - ex1),
|
||
"h": int(ey2 - ey1),
|
||
"x_tight": int(x1),
|
||
"y_tight": int(y1),
|
||
"w_tight": int(w_orig),
|
||
"h_tight": int(h_orig),
|
||
"reading_order": int(reading_order_map.get(bubble_id, bubble_id)),
|
||
"quad_bboxes": [
|
||
{
|
||
"x": int(quad_bbox(q)[0]),
|
||
"y": int(quad_bbox(q)[1]),
|
||
"w": int(quad_bbox(q)[2] - quad_bbox(q)[0]),
|
||
"h": int(quad_bbox(q)[3] - quad_bbox(q)[1]),
|
||
}
|
||
for q in quads
|
||
],
|
||
"quads": [[[int(pt[0]), int(pt[1])] for pt in quad] for quad in quads],
|
||
}
|
||
|
||
with open(filepath, "w", encoding="utf-8") as f:
|
||
json.dump(export, f, indent=2, ensure_ascii=False)
|
||
|
||
def write_output(output_lines, filepath):
|
||
with open(filepath, "w", encoding="utf-8") as f:
|
||
f.write("\n".join(output_lines))
|
||
|
||
|
||
# ─────────────────────────────────────────────
|
||
# DEBUG IMAGE
|
||
# ─────────────────────────────────────────────
|
||
def save_debug_clusters(image_path, ocr_results, bubble_dict, bbox_dict):
|
||
image = cv2.imread(image_path)
|
||
if image is None:
|
||
return
|
||
|
||
np.random.seed(42)
|
||
num_bubbles = max(bubble_dict.keys(), default=1)
|
||
colors = [tuple(int(c) for c in col) for col in np.random.randint(50, 230, size=(num_bubbles + 2, 3))]
|
||
|
||
# draw all OCR quads lightly
|
||
for bbox, text, _ in ocr_results:
|
||
pts = np.array(bbox, dtype=np.int32)
|
||
cv2.polylines(image, [pts], isClosed=True, color=(180, 180, 180), thickness=1)
|
||
|
||
# draw bubble bboxes
|
||
for bubble_id, (x1, y1, x2, y2) in bbox_dict.items():
|
||
color = colors[(bubble_id - 1) % len(colors)]
|
||
cv2.rectangle(image, (int(x1), int(y1)), (int(x2), int(y2)), color, 2)
|
||
cv2.putText(image, f"BOX#{bubble_id}", (int(x1) + 2, int(y1) + 16),
|
||
cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)
|
||
|
||
cv2.imwrite("debug_clusters.png", image)
|
||
|
||
|
||
# ─────────────────────────────────────────────
|
||
# CORE FUNCTION
|
||
# ─────────────────────────────────────────────
|
||
def translate_manga_text(
|
||
image_path,
|
||
source_lang="en",
|
||
target_lang="ca",
|
||
confidence_threshold=0.12,
|
||
export_to_file=None,
|
||
export_bubbles_to="bubbles.json",
|
||
min_text_length=2,
|
||
gap_px="auto",
|
||
filter_sound_effects=True,
|
||
quality_threshold=0.62,
|
||
upscale_factor=2.5,
|
||
bbox_padding=3,
|
||
debug=False,
|
||
reading_mode="ltr",
|
||
):
|
||
# gap resolve
|
||
if gap_px == "auto":
|
||
resolved_gap = compute_auto_gap(image_path)
|
||
else:
|
||
resolved_gap = float(gap_px)
|
||
|
||
full_image = cv2.imread(image_path)
|
||
if full_image is None:
|
||
print(f"❌ Could not load image: {image_path}")
|
||
return
|
||
|
||
# OCR init
|
||
print("\nLoading OCR model...")
|
||
ocr_lang_list = ["en", "es"] if source_lang == "ca" else [source_lang]
|
||
reader = easyocr.Reader(ocr_lang_list)
|
||
|
||
# Translator init
|
||
translator = GoogleTranslator(source=source_lang, target=target_lang)
|
||
|
||
# OCR full image
|
||
print(f"\nRunning OCR on: {image_path}")
|
||
results = reader.readtext(image_path, paragraph=False)
|
||
print(f" Raw detections: {len(results)}")
|
||
|
||
# Filter tokens
|
||
filtered = []
|
||
skipped = 0
|
||
for bbox, text, confidence in results:
|
||
cleaned = normalize_ocr_text(text)
|
||
keep, _ = should_keep_token(cleaned, confidence, confidence_threshold, min_text_length, filter_sound_effects)
|
||
if keep:
|
||
filtered.append((bbox, cleaned, confidence))
|
||
else:
|
||
skipped += 1
|
||
|
||
print(f" ✅ {len(filtered)} kept, {skipped} skipped.\n")
|
||
if not filtered:
|
||
print("⚠️ No text detected after filtering.")
|
||
return
|
||
|
||
# Pass 1 grouping
|
||
bubble_dict, bbox_dict, ocr_quads, bubble_indices = group_quads_by_overlap(
|
||
filtered,
|
||
image_shape=full_image.shape,
|
||
gap_px=resolved_gap,
|
||
bbox_padding=bbox_padding,
|
||
)
|
||
|
||
# Pass 2 orphan absorption
|
||
bubble_dict, bbox_dict, ocr_quads, bubble_indices = absorb_orphan_tokens_into_bubbles(
|
||
ocr_results=filtered,
|
||
bubble_dict=bubble_dict,
|
||
bbox_dict=bbox_dict,
|
||
ocr_quads=ocr_quads,
|
||
bubble_indices=bubble_indices,
|
||
image_shape=full_image.shape,
|
||
bbox_padding=bbox_padding,
|
||
)
|
||
|
||
print(f" ✅ {len(bubble_dict)} bubble(s) detected after absorption.\n")
|
||
|
||
if debug:
|
||
save_debug_clusters(image_path, filtered, bubble_dict, bbox_dict)
|
||
|
||
# merge lines
|
||
clean_bubbles = {i: fix_hyphens(lines) for i, lines in bubble_dict.items() if lines}
|
||
|
||
# OCR quality + reread
|
||
print("Checking OCR quality per bubble...")
|
||
for i, text in clean_bubbles.items():
|
||
if source_lang == "it":
|
||
text = italian_post_ocr_cleanup(text)
|
||
clean_bubbles[i] = text
|
||
|
||
score = ocr_quality_score(text)
|
||
status = "✅" if score >= quality_threshold else "🔁"
|
||
print(f" #{i}: score={score:.2f} {status} '{text[:65]}'")
|
||
|
||
if score < quality_threshold:
|
||
reread = reread_cluster_crop(
|
||
full_image,
|
||
bbox_dict[i],
|
||
reader,
|
||
source_lang=source_lang,
|
||
upscale_factor=upscale_factor,
|
||
)
|
||
if reread:
|
||
clean_bubbles[i] = reread
|
||
|
||
# Reading order + glossary prepass
|
||
reading_order_map = estimate_reading_order(bbox_dict, mode=reading_mode)
|
||
for i in list(clean_bubbles.keys()):
|
||
clean_bubbles[i] = apply_glossary(clean_bubbles[i], GLOSSARY)
|
||
|
||
# Translate
|
||
header = "BUBBLE|ORDER|ORIGINAL|TRANSLATED|FLAGS"
|
||
divider = "─" * 120
|
||
output_lines = [header, divider]
|
||
|
||
print()
|
||
print(f"{'BUBBLE':<8} {'ORDER':<6} {'ORIGINAL':<50} {'TRANSLATED':<50} FLAGS")
|
||
print(divider)
|
||
|
||
ordered_ids = sorted(clean_bubbles.keys(), key=lambda b: reading_order_map.get(b, b))
|
||
translated_count = 0
|
||
|
||
for i in ordered_ids:
|
||
src = clean_bubbles[i].strip()
|
||
if not src:
|
||
continue
|
||
|
||
flags = []
|
||
forced_translation = None
|
||
|
||
# phrase-map pass
|
||
if source_lang == "it" and target_lang == "ca":
|
||
exact = PHRASE_MAP_IT_CA.get(src)
|
||
if exact:
|
||
forced_translation = exact
|
||
flags.append("PHRASE_EXACT")
|
||
else:
|
||
fuzzy, ratio, _ = fuzzy_phrase_match(src, PHRASE_MAP_IT_CA, min_ratio=0.88)
|
||
if fuzzy:
|
||
forced_translation = fuzzy
|
||
flags.append(f"PHRASE_FUZZY:{ratio:.2f}")
|
||
|
||
if forced_translation is not None:
|
||
tgt = forced_translation
|
||
else:
|
||
try:
|
||
tgt = translator.translate(src)
|
||
except Exception as e:
|
||
tgt = f"[Translation error: {e}]"
|
||
|
||
if tgt is None:
|
||
tgt = "[No translation returned]"
|
||
|
||
tgt = normalize_ocr_text(tgt)
|
||
tgt = apply_glossary(tgt, GLOSSARY)
|
||
|
||
# suspicious retry
|
||
if looks_suspicious_translation(src, tgt):
|
||
flags.append("SUSPICIOUS_RETRY")
|
||
retry_src = italian_post_ocr_cleanup(src) if source_lang == "it" else src
|
||
try:
|
||
retry_tgt = translator.translate(retry_src)
|
||
if retry_tgt:
|
||
retry_tgt = normalize_ocr_text(retry_tgt)
|
||
retry_tgt = apply_glossary(retry_tgt, GLOSSARY)
|
||
if not looks_suspicious_translation(src, retry_tgt):
|
||
tgt = retry_tgt
|
||
flags.append("RETRY_OK")
|
||
else:
|
||
if source_lang == "it" and target_lang == "ca":
|
||
fallback, ratio, _ = fuzzy_phrase_match(src, PHRASE_MAP_IT_CA, min_ratio=0.80)
|
||
if fallback:
|
||
tgt = fallback
|
||
flags.append(f"FALLBACK_MAP:{ratio:.2f}")
|
||
except Exception:
|
||
pass
|
||
|
||
tgt = tgt.upper()
|
||
translated_count += 1
|
||
ro = reading_order_map.get(i, i)
|
||
|
||
output_lines.append(f"#{i}|{ro}|{src}|{tgt}|{','.join(flags) if flags else '-'}")
|
||
print(f"#{i:<7} {ro:<6} {src:<50} {tgt:<50} {','.join(flags) if flags else '-'}")
|
||
|
||
output_lines.append(divider)
|
||
summary = f"✅ Done! {translated_count} bubble(s) translated, {skipped} detection(s) skipped."
|
||
output_lines.append(summary)
|
||
print(divider)
|
||
print(summary)
|
||
|
||
if export_to_file:
|
||
write_output(output_lines, export_to_file)
|
||
|
||
if export_bubbles_to:
|
||
export_bubble_boxes(
|
||
bbox_dict,
|
||
ocr_quads,
|
||
reading_order_map=reading_order_map,
|
||
filepath=export_bubbles_to,
|
||
bbox_expand_ratio=0.16,
|
||
image_shape=full_image.shape,
|
||
)
|
||
|
||
|
||
# ─────────────────────────────────────────────
|
||
# ENTRY POINT
|
||
# ─────────────────────────────────────────────
|
||
if __name__ == "__main__":
|
||
translate_manga_text(
|
||
image_path="001-page.png",
|
||
source_lang="it",
|
||
target_lang="ca",
|
||
confidence_threshold=0.12,
|
||
min_text_length=2,
|
||
export_to_file="output.txt",
|
||
export_bubbles_to="bubbles.json",
|
||
gap_px="auto",
|
||
filter_sound_effects=True,
|
||
quality_threshold=0.62,
|
||
upscale_factor=2.5,
|
||
bbox_padding=3,
|
||
debug=True,
|
||
reading_mode="ltr",
|
||
)
|