1072 lines
32 KiB
Python
1072 lines
32 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
|
||
import os
|
||
import re
|
||
import json
|
||
import cv2
|
||
import numpy as np
|
||
|
||
from deep_translator import GoogleTranslator
|
||
|
||
# OCR engines
|
||
import easyocr
|
||
from paddleocr import PaddleOCR
|
||
|
||
|
||
# ============================================================
|
||
# CONFIG
|
||
# ============================================================
|
||
GLOSSARY = {
|
||
"ANYA": "ANYA",
|
||
"STARLIGHT ANYA": "STARLIGHT ANYA",
|
||
"MR. HENDERSON": "MR. HENDERSON",
|
||
"HENDERSON": "HENDERSON",
|
||
"STELLA STAR": "STELLA STAR",
|
||
}
|
||
|
||
SOUND_EFFECT_PATTERNS = [
|
||
r"^b+i+p+$", r"^sha+$", r"^ha+$", r"^ah+$", r"^oh+$",
|
||
r"^ugh+$", r"^bam+$", r"^pow+$", r"^boom+$", r"^bang+$",
|
||
r"^crash+$", r"^thud+$", r"^zip+$", r"^swoosh+$", r"^chirp+$"
|
||
]
|
||
|
||
TITLE_PATTERNS = [
|
||
r"^(mission|chapter|episode|vol\.?|volume)\s*\d+$",
|
||
r"^(spy|family|spy.family)$",
|
||
r"^by\s+.+$",
|
||
]
|
||
|
||
NOISE_PATTERNS = [
|
||
r"^[^a-zA-Z0-9\?!.¡¿]+$",
|
||
r"^BOX[#\s0-9A-Z\-]*$",
|
||
r"^[0-9]{1,3}\s*[Xx]\s*[0-9]{1,3}$",
|
||
]
|
||
|
||
TOP_BAND_RATIO = 0.08
|
||
|
||
|
||
# ============================================================
|
||
# TEXT HELPERS
|
||
# ============================================================
|
||
def normalize_text(text: str) -> str:
|
||
t = (text or "").strip().upper()
|
||
t = t.replace("“", "\"").replace("”", "\"")
|
||
t = t.replace("’", "'").replace("‘", "'")
|
||
t = t.replace("…", "...")
|
||
t = re.sub(r"\s+", " ", t)
|
||
t = re.sub(r"\s+([,.;:!?])", r"\1", t)
|
||
t = re.sub(r"([¡¿])\s+", r"\1", t)
|
||
t = re.sub(r"\(\s+", "(", t)
|
||
t = re.sub(r"\s+\)", ")", t)
|
||
t = re.sub(r"\.{4,}", "...", t)
|
||
return t.strip()
|
||
|
||
|
||
def apply_glossary(text: str) -> str:
|
||
out = text or ""
|
||
for k in sorted(GLOSSARY.keys(), key=len, reverse=True):
|
||
out = re.sub(rf"\b{re.escape(k)}\b", GLOSSARY[k], out, flags=re.IGNORECASE)
|
||
return out
|
||
|
||
|
||
def postprocess_translation_general(text: str) -> str:
|
||
t = normalize_text(text)
|
||
t = re.sub(r"\s{2,}", " ", t).strip()
|
||
t = re.sub(r"([!?]){3,}", r"\1\1", t)
|
||
t = re.sub(r"\.{4,}", "...", t)
|
||
return t
|
||
|
||
|
||
def is_sound_effect(text: str) -> bool:
|
||
cleaned = re.sub(r"[^a-z]", "", (text or "").strip().lower())
|
||
return any(re.fullmatch(p, cleaned, re.IGNORECASE) for p in SOUND_EFFECT_PATTERNS)
|
||
|
||
|
||
def is_title_text(text: str) -> bool:
|
||
t = (text or "").strip().lower()
|
||
return any(re.fullmatch(p, t, re.IGNORECASE) for p in TITLE_PATTERNS)
|
||
|
||
|
||
def is_noise_text(text: str) -> bool:
|
||
t = (text or "").strip()
|
||
if any(re.fullmatch(p, t) for p in NOISE_PATTERNS):
|
||
return True
|
||
|
||
if len(t) <= 2 and not re.search(r"[A-Z0-9]", t):
|
||
return True
|
||
|
||
symbol_ratio = sum(1 for c in t if not c.isalnum() and not c.isspace()) / max(1, len(t))
|
||
if len(t) <= 6 and symbol_ratio > 0.60:
|
||
return True
|
||
|
||
return False
|
||
|
||
|
||
# ============================================================
|
||
# GEOMETRY HELPERS
|
||
# ============================================================
|
||
def quad_bbox(quad):
|
||
xs = [p[0] for p in quad]
|
||
ys = [p[1] for p in quad]
|
||
return (int(min(xs)), int(min(ys)), int(max(xs)), int(max(ys)))
|
||
|
||
|
||
def quad_center(quad):
|
||
x1, y1, x2, y2 = quad_bbox(quad)
|
||
return ((x1 + x2) / 2.0, (y1 + y2) / 2.0)
|
||
|
||
|
||
def boxes_union_xyxy(boxes):
|
||
boxes = [b for b in boxes if b is not None]
|
||
if not boxes:
|
||
return None
|
||
return (
|
||
int(min(b[0] for b in boxes)),
|
||
int(min(b[1] for b in boxes)),
|
||
int(max(b[2] for b in boxes)),
|
||
int(max(b[3] for b in boxes)),
|
||
)
|
||
|
||
|
||
def bbox_area_xyxy(b):
|
||
if b is None:
|
||
return 0
|
||
return int(max(0, b[2] - b[0]) * max(0, b[3] - b[1]))
|
||
|
||
|
||
def xyxy_to_xywh(b):
|
||
if b is None:
|
||
return None
|
||
x1, y1, x2, y2 = b
|
||
return {"x": int(x1), "y": int(y1), "w": int(max(0, x2 - x1)), "h": int(max(0, y2 - y1))}
|
||
|
||
|
||
def overlap_or_near(a, b, gap=0):
|
||
ax1, ay1, ax2, ay2 = a
|
||
bx1, by1, bx2, by2 = b
|
||
gap_x = max(0, max(ax1, bx1) - min(ax2, bx2))
|
||
gap_y = max(0, max(ay1, by1) - min(ay2, by2))
|
||
return gap_x <= gap and gap_y <= gap
|
||
|
||
|
||
# ============================================================
|
||
# QUALITY
|
||
# ============================================================
|
||
def ocr_candidate_score(text: str) -> float:
|
||
if not text:
|
||
return 0.0
|
||
t = text.strip()
|
||
n = len(t)
|
||
if n == 0:
|
||
return 0.0
|
||
|
||
alpha = sum(c.isalpha() for c in t) / n
|
||
spaces = sum(c.isspace() for c in t) / n
|
||
punct_ok = sum(c in ".,!?'-:;()[]\"¡¿" for c in t) / n
|
||
bad = len(re.findall(r"[^\w\s\.\,\!\?\-\'\:\;\(\)\[\]\"¡¿]", t)) / n
|
||
|
||
penalty = 0.0
|
||
if re.search(r"\b[A-Z]\b", t):
|
||
penalty += 0.05
|
||
if re.search(r"[0-9]{2,}", t):
|
||
penalty += 0.08
|
||
if re.search(r"(..)\1\1", t):
|
||
penalty += 0.08
|
||
|
||
score = (0.62 * alpha) + (0.10 * spaces) + (0.20 * punct_ok) - (0.45 * bad) - penalty
|
||
return max(0.0, min(1.0, score))
|
||
|
||
|
||
# ============================================================
|
||
# OCR ENGINE WRAPPER (PADDLE + EASYOCR HYBRID)
|
||
# ============================================================
|
||
class HybridOCR:
|
||
def __init__(self, source_lang="en", use_gpu=False):
|
||
self.source_lang = source_lang
|
||
|
||
# Paddle language choice (single lang for Paddle)
|
||
# For manga EN/ES pages, latin model is robust.
|
||
if source_lang in ("en", "es", "ca", "fr", "de", "it", "pt"):
|
||
paddle_lang = "latin"
|
||
elif source_lang in ("ja",):
|
||
paddle_lang = "japan"
|
||
elif source_lang in ("ko",):
|
||
paddle_lang = "korean"
|
||
elif source_lang in ("ch", "zh", "zh-cn", "zh-tw"):
|
||
paddle_lang = "ch"
|
||
else:
|
||
paddle_lang = "latin"
|
||
|
||
# EasyOCR language list
|
||
if source_lang == "ca":
|
||
easy_langs = ["es", "en"]
|
||
elif source_lang == "en":
|
||
easy_langs = ["en", "es"]
|
||
elif source_lang == "es":
|
||
easy_langs = ["es", "en"]
|
||
else:
|
||
easy_langs = [source_lang]
|
||
|
||
self.paddle = PaddleOCR(
|
||
use_angle_cls=True,
|
||
lang=paddle_lang,
|
||
use_gpu=use_gpu,
|
||
show_log=False
|
||
)
|
||
self.easy = easyocr.Reader(easy_langs, gpu=use_gpu)
|
||
|
||
@staticmethod
|
||
def _paddle_to_std(result):
|
||
"""
|
||
Convert Paddle result to Easy-like:
|
||
[ (quad, text, conf), ... ]
|
||
"""
|
||
out = []
|
||
# paddle.ocr(...) returns list per image
|
||
# each item line: [ [ [x,y],...4pts ], (text, conf) ]
|
||
if not result:
|
||
return out
|
||
# result can be [None] or nested list
|
||
blocks = result if isinstance(result, list) else [result]
|
||
for blk in blocks:
|
||
if blk is None:
|
||
continue
|
||
if len(blk) == 0:
|
||
continue
|
||
# some versions wrap once more
|
||
if isinstance(blk[0], list) and len(blk[0]) > 0 and isinstance(blk[0][0], (list, tuple)) and len(blk[0]) == 2:
|
||
lines = blk
|
||
elif isinstance(blk[0], (list, tuple)) and len(blk[0]) >= 2:
|
||
lines = blk
|
||
else:
|
||
# maybe nested once more
|
||
if len(blk) == 1 and isinstance(blk[0], list):
|
||
lines = blk[0]
|
||
else:
|
||
lines = []
|
||
|
||
for ln in lines:
|
||
try:
|
||
pts, rec = ln
|
||
txt, conf = rec[0], float(rec[1])
|
||
quad = [[float(p[0]), float(p[1])] for p in pts]
|
||
out.append((quad, txt, conf))
|
||
except Exception:
|
||
continue
|
||
return out
|
||
|
||
def read_full_image(self, image_path):
|
||
"""
|
||
Primary: Paddle
|
||
Fallback merge: EasyOCR
|
||
Returns merged standardized detections.
|
||
"""
|
||
# Paddle
|
||
pr = self.paddle.ocr(image_path, cls=True)
|
||
paddle_det = self._paddle_to_std(pr)
|
||
|
||
# Easy
|
||
easy_det = self.easy.readtext(image_path, paragraph=False)
|
||
|
||
# Merge by IOU/text proximity
|
||
merged = list(paddle_det)
|
||
for eb in easy_det:
|
||
eq, et, ec = eb
|
||
ebox = quad_bbox(eq)
|
||
keep = True
|
||
for pb in paddle_det:
|
||
pq, pt, pc = pb
|
||
pbox = quad_bbox(pq)
|
||
|
||
ix1 = max(ebox[0], pbox[0]); iy1 = max(ebox[1], pbox[1])
|
||
ix2 = min(ebox[2], pbox[2]); iy2 = min(ebox[3], pbox[3])
|
||
inter = max(0, ix2 - ix1) * max(0, iy2 - iy1)
|
||
a1 = max(1, (ebox[2] - ebox[0]) * (ebox[3] - ebox[1]))
|
||
a2 = max(1, (pbox[2] - pbox[0]) * (pbox[3] - pbox[1]))
|
||
iou = inter / float(a1 + a2 - inter) if (a1 + a2 - inter) > 0 else 0.0
|
||
|
||
if iou > 0.55:
|
||
# if overlapped and paddle exists, keep paddle unless easy much higher conf
|
||
if float(ec) > float(pc) + 0.20:
|
||
# replace paddle with easy-like entry
|
||
try:
|
||
merged.remove(pb)
|
||
except Exception:
|
||
pass
|
||
merged.append((eq, et, float(ec)))
|
||
keep = False
|
||
break
|
||
|
||
if keep:
|
||
merged.append((eq, et, float(ec)))
|
||
|
||
return merged
|
||
|
||
def read_array_with_both(self, arr_gray_or_bgr):
|
||
"""
|
||
OCR from array (used in robust reread pass).
|
||
Returns merged detections in standardized format.
|
||
"""
|
||
tmp = "_tmp_ocr_hybrid.png"
|
||
cv2.imwrite(tmp, arr_gray_or_bgr)
|
||
try:
|
||
pr = self.paddle.ocr(tmp, cls=True)
|
||
paddle_det = self._paddle_to_std(pr)
|
||
easy_det = self.easy.readtext(tmp, paragraph=False)
|
||
|
||
merged = list(paddle_det)
|
||
|
||
for eb in easy_det:
|
||
eq, et, ec = eb
|
||
ebox = quad_bbox(eq)
|
||
keep = True
|
||
for pb in paddle_det:
|
||
pq, pt, pc = pb
|
||
pbox = quad_bbox(pq)
|
||
|
||
ix1 = max(ebox[0], pbox[0]); iy1 = max(ebox[1], pbox[1])
|
||
ix2 = min(ebox[2], pbox[2]); iy2 = min(ebox[3], pbox[3])
|
||
inter = max(0, ix2 - ix1) * max(0, iy2 - iy1)
|
||
a1 = max(1, (ebox[2] - ebox[0]) * (ebox[3] - ebox[1]))
|
||
a2 = max(1, (pbox[2] - pbox[0]) * (pbox[3] - pbox[1]))
|
||
iou = inter / float(a1 + a2 - inter) if (a1 + a2 - inter) > 0 else 0.0
|
||
|
||
if iou > 0.55:
|
||
if float(ec) > float(pc) + 0.20:
|
||
try:
|
||
merged.remove(pb)
|
||
except Exception:
|
||
pass
|
||
merged.append((eq, et, float(ec)))
|
||
keep = False
|
||
break
|
||
|
||
if keep:
|
||
merged.append((eq, et, float(ec)))
|
||
|
||
return merged
|
||
finally:
|
||
if os.path.exists(tmp):
|
||
os.remove(tmp)
|
||
|
||
|
||
# ============================================================
|
||
# PREPROCESS + ROBUST REREAD
|
||
# ============================================================
|
||
def preprocess_variant(crop_bgr, mode):
|
||
gray = cv2.cvtColor(crop_bgr, cv2.COLOR_BGR2GRAY)
|
||
|
||
if mode == "raw":
|
||
return gray
|
||
|
||
if mode == "clahe":
|
||
return cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8)).apply(gray)
|
||
|
||
if mode == "adaptive":
|
||
den = cv2.GaussianBlur(gray, (3, 3), 0)
|
||
return cv2.adaptiveThreshold(
|
||
den, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
|
||
cv2.THRESH_BINARY, 35, 11
|
||
)
|
||
|
||
if mode == "otsu":
|
||
den = cv2.GaussianBlur(gray, (3, 3), 0)
|
||
_, th = cv2.threshold(den, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
|
||
return th
|
||
|
||
if mode == "invert":
|
||
return 255 - gray
|
||
|
||
return gray
|
||
|
||
|
||
def rotate_image_keep_bounds(img, angle_deg):
|
||
h, w = img.shape[:2]
|
||
c = (w / 2, h / 2)
|
||
M = cv2.getRotationMatrix2D(c, angle_deg, 1.0)
|
||
cos = abs(M[0, 0]); sin = abs(M[0, 1])
|
||
|
||
new_w = int((h * sin) + (w * cos))
|
||
new_h = int((h * cos) + (w * sin))
|
||
|
||
M[0, 2] += (new_w / 2) - c[0]
|
||
M[1, 2] += (new_h / 2) - c[1]
|
||
|
||
return cv2.warpAffine(img, M, (new_w, new_h), flags=cv2.INTER_CUBIC, borderValue=255)
|
||
|
||
|
||
def rebuild_text_from_ocr_result(res):
|
||
if not res:
|
||
return ""
|
||
|
||
norm = []
|
||
for item in res:
|
||
if len(item) != 3:
|
||
continue
|
||
bbox, txt, conf = item
|
||
if not txt or not txt.strip():
|
||
continue
|
||
b = quad_bbox(bbox)
|
||
xc = (b[0] + b[2]) / 2.0
|
||
yc = (b[1] + b[3]) / 2.0
|
||
h = max(1.0, b[3] - b[1])
|
||
norm.append((b, txt, conf, xc, yc, h))
|
||
|
||
if not norm:
|
||
return ""
|
||
|
||
med_h = float(np.median([x[5] for x in norm]))
|
||
row_tol = max(6.0, med_h * 0.75)
|
||
|
||
norm.sort(key=lambda z: z[4]) # y
|
||
rows = []
|
||
for it in norm:
|
||
placed = False
|
||
for r in rows:
|
||
if abs(it[4] - r["yc"]) <= row_tol:
|
||
r["m"].append(it)
|
||
r["yc"] = float(np.mean([k[4] for k in r["m"]]))
|
||
placed = True
|
||
break
|
||
if not placed:
|
||
rows.append({"yc": it[4], "m": [it]})
|
||
|
||
rows.sort(key=lambda r: r["yc"])
|
||
lines = []
|
||
for r in rows:
|
||
mem = sorted(r["m"], key=lambda z: z[3]) # x
|
||
line = normalize_text(" ".join(x[1] for x in mem))
|
||
if line:
|
||
lines.append(line)
|
||
|
||
return normalize_text(" ".join(lines))
|
||
|
||
|
||
def reread_crop_robust(image, bbox, hybrid_ocr: HybridOCR, upscale=3.0, pad=24):
|
||
ih, iw = image.shape[:2]
|
||
x1, y1, x2, y2 = bbox
|
||
x1 = max(0, int(x1 - pad))
|
||
y1 = max(0, int(y1 - pad))
|
||
x2 = min(iw, int(x2 + pad))
|
||
y2 = min(ih, int(y2 + pad))
|
||
crop = image[y1:y2, x1:x2]
|
||
if crop.size == 0:
|
||
return None, 0.0
|
||
|
||
up = cv2.resize(
|
||
crop,
|
||
(int(crop.shape[1] * upscale), int(crop.shape[0] * upscale)),
|
||
interpolation=cv2.INTER_CUBIC
|
||
)
|
||
|
||
modes = ["raw", "clahe", "adaptive", "otsu", "invert"]
|
||
angles = [0.0, 1.5, -1.5]
|
||
|
||
best_text, best_score = "", 0.0
|
||
|
||
for mode in modes:
|
||
proc = preprocess_variant(up, mode)
|
||
|
||
if len(proc.shape) == 2:
|
||
proc3 = cv2.cvtColor(proc, cv2.COLOR_GRAY2BGR)
|
||
else:
|
||
proc3 = proc
|
||
|
||
for a in angles:
|
||
rot = rotate_image_keep_bounds(proc3, a)
|
||
res = hybrid_ocr.read_array_with_both(rot)
|
||
txt = rebuild_text_from_ocr_result(res)
|
||
sc = ocr_candidate_score(txt)
|
||
|
||
if sc > best_score:
|
||
best_text, best_score = txt, sc
|
||
|
||
if not best_text:
|
||
return None, 0.0
|
||
return best_text, best_score
|
||
|
||
|
||
# ============================================================
|
||
# LINE REBUILD + YELLOW BOXES
|
||
# ============================================================
|
||
def build_lines_from_indices(indices, ocr):
|
||
if not indices:
|
||
return []
|
||
|
||
items = []
|
||
for i in indices:
|
||
b = quad_bbox(ocr[i][0])
|
||
xc = (b[0] + b[2]) / 2.0
|
||
yc = (b[1] + b[3]) / 2.0
|
||
h = max(1.0, b[3] - b[1])
|
||
items.append((i, b, xc, yc, h))
|
||
|
||
med_h = float(np.median([it[4] for it in items])) if items else 10.0
|
||
row_tol = max(6.0, med_h * 0.75)
|
||
|
||
items.sort(key=lambda x: x[3])
|
||
rows = []
|
||
for it in items:
|
||
i, b, xc, yc, h = it
|
||
placed = False
|
||
for r in rows:
|
||
if abs(yc - r["yc"]) <= row_tol:
|
||
r["m"].append((i, b, xc, yc))
|
||
r["yc"] = float(np.mean([k[3] for k in r["m"]]))
|
||
placed = True
|
||
break
|
||
if not placed:
|
||
rows.append({"yc": yc, "m": [(i, b, xc, yc)]})
|
||
|
||
rows.sort(key=lambda r: r["yc"])
|
||
lines = []
|
||
for r in rows:
|
||
mem = sorted(r["m"], key=lambda z: z[2])
|
||
txt = normalize_text(" ".join(ocr[i][1] for i, _, _, _ in mem))
|
||
if txt and not is_noise_text(txt):
|
||
lines.append(txt)
|
||
|
||
return lines
|
||
|
||
|
||
def build_line_boxes_from_indices(indices, ocr, image_shape=None):
|
||
if not indices:
|
||
return []
|
||
|
||
items = []
|
||
for i in indices:
|
||
b = quad_bbox(ocr[i][0])
|
||
txt = normalize_text(ocr[i][1])
|
||
if is_noise_text(txt):
|
||
continue
|
||
|
||
xc = (b[0] + b[2]) / 2.0
|
||
yc = (b[1] + b[3]) / 2.0
|
||
w = max(1.0, b[2] - b[0])
|
||
h = max(1.0, b[3] - b[1])
|
||
|
||
items.append({
|
||
"i": i, "b": b, "txt": txt,
|
||
"xc": xc, "yc": yc, "w": w, "h": h
|
||
})
|
||
|
||
if not items:
|
||
return []
|
||
|
||
med_h = float(np.median([it["h"] for it in items]))
|
||
row_tol = max(6.0, med_h * 0.90)
|
||
gap_x_tol = max(8.0, med_h * 1.25)
|
||
pad = max(3, int(round(med_h * 0.22)))
|
||
|
||
def is_punct_like(t):
|
||
raw = (t or "").strip()
|
||
if raw == "":
|
||
return True
|
||
punct_ratio = sum(1 for c in raw if not c.isalnum()) / max(1, len(raw))
|
||
return punct_ratio >= 0.5 or len(raw) <= 2
|
||
|
||
items_sorted = sorted(items, key=lambda x: x["yc"])
|
||
rows = []
|
||
for it in items_sorted:
|
||
placed = False
|
||
for r in rows:
|
||
if abs(it["yc"] - r["yc"]) <= row_tol:
|
||
r["m"].append(it)
|
||
r["yc"] = float(np.mean([k["yc"] for k in r["m"]]))
|
||
placed = True
|
||
break
|
||
if not placed:
|
||
rows.append({"yc": it["yc"], "m": [it]})
|
||
|
||
rows.sort(key=lambda r: r["yc"])
|
||
out_boxes = []
|
||
|
||
for r in rows:
|
||
mem = sorted(r["m"], key=lambda z: z["xc"])
|
||
normal = [t for t in mem if not is_punct_like(t["txt"])]
|
||
punct = [t for t in mem if is_punct_like(t["txt"])]
|
||
|
||
if not normal:
|
||
normal = mem
|
||
punct = []
|
||
|
||
chunks = []
|
||
cur = [normal[0]]
|
||
for t in normal[1:]:
|
||
prev = cur[-1]["b"]
|
||
b = t["b"]
|
||
gap = b[0] - prev[2]
|
||
if gap <= gap_x_tol:
|
||
cur.append(t)
|
||
else:
|
||
chunks.append(cur)
|
||
cur = [t]
|
||
chunks.append(cur)
|
||
|
||
for p in punct:
|
||
pb = p["b"]
|
||
pxc, pyc = p["xc"], p["yc"]
|
||
best_k = -1
|
||
best_score = 1e18
|
||
|
||
for k, ch in enumerate(chunks):
|
||
ub = boxes_union_xyxy([x["b"] for x in ch])
|
||
cx = (ub[0] + ub[2]) / 2.0
|
||
cy = (ub[1] + ub[3]) / 2.0
|
||
dx = abs(pxc - cx)
|
||
dy = abs(pyc - cy)
|
||
score = dx + 1.8 * dy
|
||
|
||
near = overlap_or_near(pb, ub, gap=int(med_h * 1.25))
|
||
if near:
|
||
score -= med_h * 2.0
|
||
|
||
if score < best_score:
|
||
best_score = score
|
||
best_k = k
|
||
|
||
if best_k >= 0:
|
||
chunks[best_k].append(p)
|
||
else:
|
||
chunks.append([p])
|
||
|
||
for ch in chunks:
|
||
ub = boxes_union_xyxy([x["b"] for x in ch])
|
||
if ub:
|
||
x1, y1, x2, y2 = ub
|
||
pad_x = pad
|
||
pad_top = int(round(pad * 1.35))
|
||
pad_bot = int(round(pad * 0.95))
|
||
out_boxes.append((x1 - pad_x, y1 - pad_top, x2 + pad_x, y2 + pad_bot))
|
||
|
||
token_boxes = [it["b"] for it in items]
|
||
|
||
def inside(tb, lb):
|
||
return tb[0] >= lb[0] and tb[1] >= lb[1] and tb[2] <= lb[2] and tb[3] <= lb[3]
|
||
|
||
for tb in token_boxes:
|
||
if not any(inside(tb, lb) for lb in out_boxes):
|
||
x1, y1, x2, y2 = tb
|
||
pad_x = pad
|
||
pad_top = int(round(pad * 1.35))
|
||
pad_bot = int(round(pad * 0.95))
|
||
out_boxes.append((x1 - pad_x, y1 - pad_top, x2 + pad_x, y2 + pad_bot))
|
||
|
||
merged = []
|
||
for b in out_boxes:
|
||
merged_into = False
|
||
for i, m in enumerate(merged):
|
||
ix1 = max(b[0], m[0]); iy1 = max(b[1], m[1])
|
||
ix2 = min(b[2], m[2]); iy2 = min(b[3], m[3])
|
||
inter = max(0, ix2 - ix1) * max(0, iy2 - iy1)
|
||
a1 = max(1, (b[2] - b[0]) * (b[3] - b[1]))
|
||
a2 = max(1, (m[2] - m[0]) * (m[3] - m[1]))
|
||
iou = inter / float(a1 + a2 - inter) if (a1 + a2 - inter) > 0 else 0.0
|
||
if iou > 0.72:
|
||
merged[i] = boxes_union_xyxy([b, m])
|
||
merged_into = True
|
||
break
|
||
if not merged_into:
|
||
merged.append(b)
|
||
|
||
safe = []
|
||
for (x1, y1, x2, y2) in merged:
|
||
w = x2 - x1
|
||
h = y2 - y1
|
||
if w < 28:
|
||
d = (28 - w) // 2 + 2
|
||
x1 -= d; x2 += d
|
||
if h < 18:
|
||
d = (18 - h) // 2 + 2
|
||
y1 -= d; y2 += d
|
||
safe.append((x1, y1, x2, y2))
|
||
merged = safe
|
||
|
||
if image_shape is not None:
|
||
ih, iw = image_shape[:2]
|
||
clamped = []
|
||
for b in merged:
|
||
x1 = max(0, int(b[0]))
|
||
y1 = max(0, int(b[1]))
|
||
x2 = min(iw - 1, int(b[2]))
|
||
y2 = min(ih - 1, int(b[3]))
|
||
if x2 > x1 and y2 > y1:
|
||
clamped.append((x1, y1, x2, y2))
|
||
merged = clamped
|
||
else:
|
||
merged = [(int(b[0]), int(b[1]), int(b[2]), int(b[3])) for b in merged]
|
||
|
||
merged.sort(key=lambda z: (z[1], z[0]))
|
||
return merged
|
||
|
||
|
||
# ============================================================
|
||
# GROUPING
|
||
# ============================================================
|
||
def auto_gap(image_path, base=18, ref_w=750):
|
||
img = cv2.imread(image_path)
|
||
if img is None:
|
||
return base
|
||
return base * (img.shape[1] / ref_w)
|
||
|
||
|
||
def group_tokens(ocr, image_shape, gap_px=18, bbox_padding=3):
|
||
n = len(ocr)
|
||
if n == 0:
|
||
return {}, {}, {}, {}
|
||
|
||
boxes = [quad_bbox(r[0]) for r in ocr]
|
||
centers = [quad_center(r[0]) for r in ocr]
|
||
hs = [max(1.0, b[3] - b[1]) for b in boxes]
|
||
med_h = float(np.median(hs)) if hs else 12.0
|
||
dist_thresh = max(20.0, med_h * 2.2)
|
||
|
||
p = list(range(n))
|
||
|
||
def find(x):
|
||
while p[x] != x:
|
||
p[x] = p[p[x]]
|
||
x = p[x]
|
||
return x
|
||
|
||
def unite(a, b):
|
||
p[find(a)] = find(b)
|
||
|
||
for i in range(n):
|
||
for j in range(i + 1, n):
|
||
if overlap_or_near(boxes[i], boxes[j], gap=gap_px):
|
||
unite(i, j)
|
||
continue
|
||
cx1, cy1 = centers[i]
|
||
cx2, cy2 = centers[j]
|
||
d = ((cx1 - cx2) ** 2 + (cy1 - cy2) ** 2) ** 0.5
|
||
if d <= dist_thresh and abs(cy1 - cy2) <= med_h * 3.0:
|
||
unite(i, j)
|
||
|
||
groups = {}
|
||
for i in range(n):
|
||
groups.setdefault(find(i), []).append(i)
|
||
|
||
sorted_groups = sorted(
|
||
groups.values(),
|
||
key=lambda idxs: (
|
||
min(boxes[i][1] for i in idxs),
|
||
min(boxes[i][0] for i in idxs)
|
||
)
|
||
)
|
||
|
||
bubbles = {}
|
||
bubble_boxes = {}
|
||
bubble_quads = {}
|
||
bubble_indices = {}
|
||
|
||
ih, iw = image_shape[:2]
|
||
for bid, idxs in enumerate(sorted_groups, start=1):
|
||
idxs = sorted(idxs, key=lambda k: boxes[k][1])
|
||
|
||
lines = build_lines_from_indices(idxs, ocr)
|
||
quads = [ocr[k][0] for k in idxs]
|
||
ub = boxes_union_xyxy([quad_bbox(q) for q in quads])
|
||
if ub is None:
|
||
continue
|
||
|
||
x1, y1, x2, y2 = ub
|
||
x1 = max(0, x1 - bbox_padding)
|
||
y1 = max(0, y1 - bbox_padding)
|
||
x2 = min(iw - 1, x2 + bbox_padding)
|
||
y2 = min(ih - 1, y2 + bbox_padding)
|
||
|
||
bubbles[bid] = lines
|
||
bubble_boxes[bid] = (x1, y1, x2, y2)
|
||
bubble_quads[bid] = quads
|
||
bubble_indices[bid] = idxs
|
||
|
||
return bubbles, bubble_boxes, bubble_quads, bubble_indices
|
||
|
||
|
||
# ============================================================
|
||
# DEBUG
|
||
# ============================================================
|
||
def save_debug_clusters(image_path, ocr, bubble_boxes, bubble_indices, out_path="debug_clusters.png"):
|
||
img = cv2.imread(image_path)
|
||
if img is None:
|
||
return
|
||
|
||
for bbox, txt, conf in ocr:
|
||
pts = np.array(bbox, dtype=np.int32)
|
||
cv2.polylines(img, [pts], True, (180, 180, 180), 1)
|
||
|
||
for bid, bb in bubble_boxes.items():
|
||
x1, y1, x2, y2 = bb
|
||
cv2.rectangle(img, (x1, y1), (x2, y2), (0, 220, 0), 2)
|
||
cv2.putText(
|
||
img, f"BOX#{bid}", (x1 + 2, max(15, y1 + 16)),
|
||
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 220, 0), 2
|
||
)
|
||
|
||
idxs = bubble_indices.get(bid, [])
|
||
line_boxes = build_line_boxes_from_indices(idxs, ocr, image_shape=img.shape)
|
||
for lb in line_boxes:
|
||
lx1, ly1, lx2, ly2 = lb
|
||
cv2.rectangle(img, (lx1, ly1), (lx2, ly2), (0, 255, 255), 3)
|
||
|
||
cv2.imwrite(out_path, img)
|
||
|
||
|
||
# ============================================================
|
||
# EXPORT
|
||
# ============================================================
|
||
def estimate_reading_order(bbox_dict, mode="ltr"):
|
||
items = []
|
||
for bid, (x1, y1, x2, y2) in bbox_dict.items():
|
||
cx = (x1 + x2) / 2.0
|
||
cy = (y1 + y2) / 2.0
|
||
items.append((bid, cx, cy))
|
||
|
||
items.sort(key=lambda t: t[2])
|
||
|
||
rows = []
|
||
tol = 90
|
||
for it in items:
|
||
placed = False
|
||
for r in rows:
|
||
if abs(it[2] - r["cy"]) <= tol:
|
||
r["items"].append(it)
|
||
r["cy"] = float(np.mean([x[2] for x in r["items"]]))
|
||
placed = True
|
||
break
|
||
if not placed:
|
||
rows.append({"cy": it[2], "items": [it]})
|
||
|
||
rows.sort(key=lambda r: r["cy"])
|
||
order = []
|
||
for r in rows:
|
||
r["items"].sort(key=lambda x: x[1], reverse=(mode == "rtl"))
|
||
order.extend([z[0] for z in r["items"]])
|
||
|
||
return {bid: i + 1 for i, bid in enumerate(order)}
|
||
|
||
|
||
def export_bubbles(filepath, bbox_dict, quads_dict, indices_dict, ocr, reading_map, image_shape):
|
||
out = {}
|
||
|
||
for bid, bb in bbox_dict.items():
|
||
x1, y1, x2, y2 = bb
|
||
quads = quads_dict.get(bid, [])
|
||
idxs = indices_dict.get(bid, [])
|
||
|
||
qboxes = [quad_bbox(q) for q in quads]
|
||
text_union = boxes_union_xyxy(qboxes)
|
||
|
||
line_boxes_xyxy = build_line_boxes_from_indices(idxs, ocr, image_shape=image_shape)
|
||
line_union_xyxy = boxes_union_xyxy(line_boxes_xyxy)
|
||
line_union_area = bbox_area_xyxy(line_union_xyxy)
|
||
|
||
out[str(bid)] = {
|
||
"x": int(x1), "y": int(y1), "w": int(x2 - x1), "h": int(y2 - y1),
|
||
"reading_order": int(reading_map.get(bid, bid)),
|
||
"quad_bboxes": [
|
||
{"x": int(b[0]), "y": int(b[1]), "w": int(b[2] - b[0]), "h": int(b[3] - b[1])}
|
||
for b in qboxes
|
||
],
|
||
"quads": [
|
||
[[int(p[0]), int(p[1])] for p in q] for q in quads
|
||
],
|
||
"text_bbox": xyxy_to_xywh(text_union),
|
||
"line_bboxes": [xyxy_to_xywh(lb) for lb in line_boxes_xyxy],
|
||
"line_union_bbox": xyxy_to_xywh(line_union_xyxy) if line_union_xyxy else None,
|
||
"line_union_area": int(line_union_area),
|
||
}
|
||
|
||
with open(filepath, "w", encoding="utf-8") as f:
|
||
json.dump(out, f, indent=2, ensure_ascii=False)
|
||
|
||
|
||
# ============================================================
|
||
# MAIN PIPELINE
|
||
# ============================================================
|
||
def translate_manga_text(
|
||
image_path,
|
||
source_lang="en",
|
||
target_lang="ca",
|
||
confidence_threshold=0.12,
|
||
min_text_length=1,
|
||
gap_px="auto",
|
||
filter_sound_effects=True,
|
||
quality_threshold=0.62,
|
||
export_to_file="output.txt",
|
||
export_bubbles_to="bubbles.json",
|
||
reading_mode="ltr",
|
||
debug=True,
|
||
use_gpu=False
|
||
):
|
||
image = cv2.imread(image_path)
|
||
if image is None:
|
||
print(f"❌ Cannot load image: {image_path}")
|
||
return
|
||
|
||
resolved_gap = auto_gap(image_path) if gap_px == "auto" else float(gap_px)
|
||
|
||
print("Loading Hybrid OCR (Paddle + EasyOCR)...")
|
||
hybrid = HybridOCR(source_lang=source_lang, use_gpu=use_gpu)
|
||
|
||
print("Running OCR...")
|
||
raw = hybrid.read_full_image(image_path)
|
||
print(f"Raw detections (merged): {len(raw)}")
|
||
|
||
filtered = []
|
||
skipped = 0
|
||
ih, iw = image.shape[:2]
|
||
|
||
for bbox, text, conf in raw:
|
||
t = normalize_text(text)
|
||
qb = quad_bbox(bbox)
|
||
|
||
if conf < confidence_threshold:
|
||
skipped += 1
|
||
continue
|
||
if len(t) < min_text_length:
|
||
skipped += 1
|
||
continue
|
||
if is_noise_text(t):
|
||
skipped += 1
|
||
continue
|
||
if filter_sound_effects and is_sound_effect(t):
|
||
skipped += 1
|
||
continue
|
||
if is_title_text(t):
|
||
skipped += 1
|
||
continue
|
||
|
||
if qb[1] < int(ih * TOP_BAND_RATIO):
|
||
if conf < 0.70 and len(t) >= 5:
|
||
skipped += 1
|
||
continue
|
||
|
||
filtered.append((bbox, t, conf))
|
||
|
||
print(f"Kept: {len(filtered)} | Skipped: {skipped}")
|
||
if not filtered:
|
||
print("⚠️ No text after filtering.")
|
||
return
|
||
|
||
bubbles, bubble_boxes, bubble_quads, bubble_indices = group_tokens(
|
||
filtered, image.shape, gap_px=resolved_gap, bbox_padding=3
|
||
)
|
||
|
||
if debug:
|
||
save_debug_clusters(
|
||
image_path=image_path,
|
||
ocr=filtered,
|
||
bubble_boxes=bubble_boxes,
|
||
bubble_indices=bubble_indices,
|
||
out_path="debug_clusters.png"
|
||
)
|
||
|
||
translator = GoogleTranslator(source=source_lang, target=target_lang)
|
||
|
||
clean_lines = {}
|
||
for bid, lines in bubbles.items():
|
||
base_txt = normalize_text(" ".join(lines))
|
||
base_sc = ocr_candidate_score(base_txt)
|
||
|
||
if base_sc < quality_threshold:
|
||
rr_txt, rr_sc = reread_crop_robust(
|
||
image,
|
||
bubble_boxes[bid],
|
||
hybrid,
|
||
upscale=3.0,
|
||
pad=24
|
||
)
|
||
if rr_txt and rr_sc > base_sc + 0.06:
|
||
txt = rr_txt
|
||
else:
|
||
txt = base_txt
|
||
else:
|
||
txt = base_txt
|
||
|
||
txt = txt.replace(" BOMPORTA", " IMPORTA")
|
||
txt = txt.replace(" TESTO ", " ESTO ")
|
||
txt = txt.replace(" MIVERDAD", " MI VERDAD")
|
||
|
||
clean_lines[bid] = apply_glossary(normalize_text(txt))
|
||
|
||
reading_map = estimate_reading_order(bubble_boxes, mode=reading_mode)
|
||
|
||
divider = "─" * 120
|
||
out_lines = ["BUBBLE|ORDER|ORIGINAL|TRANSLATED|FLAGS", divider]
|
||
|
||
print(divider)
|
||
print(f"{'BUBBLE':<8} {'ORDER':<6} {'ORIGINAL':<50} {'TRANSLATED':<50} FLAGS")
|
||
print(divider)
|
||
|
||
translated_count = 0
|
||
for bid in sorted(clean_lines.keys(), key=lambda x: reading_map.get(x, x)):
|
||
src = clean_lines[bid].strip()
|
||
if not src:
|
||
continue
|
||
|
||
flags = []
|
||
try:
|
||
tgt = translator.translate(src) or ""
|
||
except Exception as e:
|
||
tgt = f"[Translation error: {e}]"
|
||
flags.append("TRANSLATION_ERROR")
|
||
|
||
tgt = apply_glossary(postprocess_translation_general(tgt)).upper()
|
||
src_u = src.upper()
|
||
|
||
out_lines.append(
|
||
f"#{bid}|{reading_map.get(bid,bid)}|{src_u}|{tgt}|{','.join(flags) if flags else '-'}"
|
||
)
|
||
|
||
print(
|
||
f"#{bid:<7} {reading_map.get(bid,bid):<6} "
|
||
f"{src_u[:50]:<50} {tgt[:50]:<50} {','.join(flags) if flags else '-'}"
|
||
)
|
||
translated_count += 1
|
||
|
||
out_lines.append(divider)
|
||
out_lines.append(f"✅ Done! {translated_count} bubble(s) translated, {skipped} detection(s) skipped.")
|
||
|
||
with open(export_to_file, "w", encoding="utf-8") as f:
|
||
f.write("\n".join(out_lines))
|
||
|
||
export_bubbles(
|
||
export_bubbles_to,
|
||
bbox_dict=bubble_boxes,
|
||
quads_dict=bubble_quads,
|
||
indices_dict=bubble_indices,
|
||
ocr=filtered,
|
||
reading_map=reading_map,
|
||
image_shape=image.shape
|
||
)
|
||
|
||
print(divider)
|
||
print(f"Saved: {export_to_file}")
|
||
print(f"Saved: {export_bubbles_to}")
|
||
if debug:
|
||
print("Saved: debug_clusters.png")
|
||
|
||
|
||
# ============================================================
|
||
# ENTRYPOINT
|
||
# ============================================================
|
||
if __name__ == "__main__":
|
||
translate_manga_text(
|
||
image_path="001-page.png",
|
||
source_lang="it",
|
||
target_lang="ca",
|
||
confidence_threshold=0.12,
|
||
min_text_length=1,
|
||
gap_px="auto",
|
||
filter_sound_effects=True,
|
||
quality_threshold=0.62,
|
||
export_to_file="output.txt",
|
||
export_bubbles_to="bubbles.json",
|
||
reading_mode="ltr",
|
||
debug=True,
|
||
use_gpu=False
|
||
)
|