875 lines
26 KiB
Python
875 lines
26 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
|
|
import os
|
|
import re
|
|
import json
|
|
import cv2
|
|
import numpy as np
|
|
import warnings
|
|
from typing import List, Tuple, Dict, Any, Optional
|
|
|
|
from deep_translator import GoogleTranslator
|
|
|
|
# macOS Native Vision imports
|
|
import Vision
|
|
import Quartz
|
|
from Foundation import NSData
|
|
|
|
warnings.filterwarnings("ignore", category=UserWarning)

# ============================================================
# CONFIG
# ============================================================

# Names handled by apply_glossary(): every case-insensitive, whole-word match
# of a key is replaced by its value.
GLOSSARY = {
    "ANYA": "ANYA",
    "STARLIGHT ANYA": "STARLIGHT ANYA",
    "MR. HENDERSON": "MR. HENDERSON",
    "HENDERSON": "HENDERSON",
    "STELLA STAR": "STELLA STAR",
}

# Full-match patterns tested by is_sound_effect() against the lower-cased,
# letters-only form of a detection.
SOUND_EFFECT_PATTERNS = [
    r"^b+i+p+$",
    r"^sha+$",
    r"^ha+$",
    r"^ah+$",
    r"^ugh+$",
    r"^bam+$",
    r"^pow+$",
    r"^boom+$",
    r"^bang+$",
    r"^crash+$",
    r"^thud+$",
    r"^zip+$",
    r"^swoosh+$",
    r"^chirp+$",
]

# Full-match patterns tested by is_title_text() against the lower-cased text.
TITLE_PATTERNS = [
    r"^(mission|chapter|episode|vol\.?|volume)\s*\d+$",
    r"^(spy|family|spy.family)$",
    r"^by\s+.+$",
]

# Full-match patterns tested by is_noise_text() against the stripped text.
NOISE_PATTERNS = [
    r"^[^a-zA-Z0-9\?!.¡¿]+$",
    r"^BOX[#\s0-9A-Z\-]*$",
    r"^[0-9]{1,3}\s*[Xx]\s*[0-9]{1,3}$",
]

# Fraction of the image height, measured from the top edge, inside which
# translate_manga_text() applies a stricter confidence check.
TOP_BAND_RATIO = 0.08
|
|
|
|
|
|
# ============================================================
|
|
# HELPERS
|
|
# ============================================================
|
|
def normalize_text(text: str) -> str:
    """Return *text* upper-cased with quotes, spacing and ellipses tidied.

    ``None``/empty input yields ``""``. Typographic quotes and the ellipsis
    character become ASCII, whitespace runs collapse to one space, spaces
    before ``, . ; : ! ?`` / after ``¡ ¿`` / just inside parentheses are
    removed, and runs of four or more dots shrink to ``...``.
    """
    cleaned = (text or "").strip().upper()

    # Typographic characters -> plain ASCII equivalents.
    ascii_equivalents = (
        ("\u201c", "\""),
        ("\u201d", "\""),
        ("\u2018", "'"),
        ("\u2019", "'"),
        ("\u2026", "..."),
    )
    for fancy, plain in ascii_equivalents:
        cleaned = cleaned.replace(fancy, plain)

    # Order matters: whitespace is collapsed before the spacing rules run.
    rewrites = (
        (r"\s+", " "),
        (r"\s+([,.;:!?])", r"\1"),
        (r"([¡¿])\s+", r"\1"),
        (r"\(\s+", "("),
        (r"\s+\)", ")"),
        (r"\.{4,}", "..."),
    )
    for pattern, replacement in rewrites:
        cleaned = re.sub(pattern, replacement, cleaned)

    return cleaned.strip()
|
|
|
|
|
|
def apply_glossary(text: str) -> str:
    """Replace every glossary term found in *text* with its canonical form.

    Terms are matched case-insensitively on word boundaries. Longer keys are
    processed first so multi-word names are handled before the shorter names
    they contain. ``None``/empty input yields ``""``.
    """
    out = text or ""
    for key in sorted(GLOSSARY, key=len, reverse=True):
        canonical = GLOSSARY[key]
        # A callable replacement is inserted verbatim. A plain string would be
        # parsed as a regex template, so a backslash or group reference inside
        # a glossary value would be misinterpreted or raise.
        out = re.sub(
            rf"\b{re.escape(key)}\b",
            lambda _match, _value=canonical: _value,
            out,
            flags=re.IGNORECASE,
        )
    return out
|
|
|
|
|
|
def postprocess_translation_general(text: str) -> str:
    """Normalise translated *text* and tame repeated punctuation.

    Runs ``normalize_text`` first, then collapses leftover whitespace runs,
    reduces three or more consecutive ``!``/``?`` to two, and shortens runs of
    four or more dots to ``...``.
    """
    result = normalize_text(text)
    result = re.sub(r"\s{2,}", " ", result)
    result = result.strip()

    punctuation_rules = (
        (r"([!?]){3,}", r"\1\1"),
        (r"\.{4,}", "..."),
    )
    for pattern, replacement in punctuation_rules:
        result = re.sub(pattern, replacement, result)
    return result
|
|
|
|
|
|
def is_sound_effect(text: str) -> bool:
    """Return True when *text* fully matches one of SOUND_EFFECT_PATTERNS.

    The text is lower-cased and stripped of everything except ``a``-``z``
    before matching.
    """
    lowered = (text or "").strip().lower()
    letters_only = re.sub(r"[^a-z]", "", lowered)
    for pattern in SOUND_EFFECT_PATTERNS:
        if re.fullmatch(pattern, letters_only, re.IGNORECASE):
            return True
    return False
|
|
|
|
|
|
def is_title_text(text: str) -> bool:
    """Return True when the stripped, lower-cased *text* fully matches one of
    TITLE_PATTERNS."""
    candidate = (text or "").strip().lower()
    for pattern in TITLE_PATTERNS:
        if re.fullmatch(pattern, candidate, re.IGNORECASE):
            return True
    return False
|
|
|
|
|
|
def looks_like_box_tag(t: str) -> bool:
    """Return True when *t* looks like a ``BOX#<n>`` label or a near variant.

    The text is upper-cased and reduced to ``A-Z``, ``0-9`` and ``#`` before
    matching. Accepted shapes: an optional leading ``B``/``E``/``F``, then one
    of ``O``/``0``/``D``, then ``X``, an optional ``#`` and up to three digits.
    ``None``/empty input yields False.
    """
    compact = re.sub(r"[^A-Z0-9#]", "", (t or "").upper())
    # One pattern is enough: the narrower ``B[O0D]X\d{0,3}`` form is already a
    # subset of this one, so a second check could never add a match.
    return re.fullmatch(r"[BEF]?[O0D]X#?\d{0,3}", compact) is not None
|
|
|
|
|
|
def is_noise_text(text: str) -> bool:
    """Decide whether an OCR string is noise rather than dialogue.

    Strings made only of ``?``/``!`` are always kept. Otherwise a string is
    noise when it matches a NOISE_PATTERNS entry, looks like a box tag, is at
    most two characters long with none of ``A-Z``, ``0-9``, ``?``, ``!``, or
    is at most six characters long and more than 60% symbols.
    """
    candidate = (text or "").strip()

    # Explicitly allow standalone punctuation like ? or !
    if re.fullmatch(r"[\?\!]+", candidate):
        return False

    matches_noise_pattern = any(re.fullmatch(p, candidate) for p in NOISE_PATTERNS)
    if matches_noise_pattern or looks_like_box_tag(candidate):
        return True

    if len(candidate) <= 2 and not re.search(r"[A-Z0-9\?\!]", candidate):
        return True

    # Symbols are characters that are neither alphanumeric nor whitespace.
    symbol_count = sum(1 for ch in candidate if not (ch.isalnum() or ch.isspace()))
    symbol_ratio = symbol_count / max(1, len(candidate))
    return len(candidate) <= 6 and symbol_ratio > 0.60
|
|
|
|
|
|
def quad_bbox(quad):
    """Return the integer ``(x1, y1, x2, y2)`` box enclosing the points of *quad*.

    Each point is indexed as ``p[0]`` (x) and ``p[1]`` (y); the extremes are
    truncated with ``int``.
    """
    left = int(min(pt[0] for pt in quad))
    top = int(min(pt[1] for pt in quad))
    right = int(max(pt[0] for pt in quad))
    bottom = int(max(pt[1] for pt in quad))
    return (left, top, right, bottom)
|
|
|
|
|
|
def quad_center(quad):
    """Return the ``(cx, cy)`` centre of the integer bounding box of *quad*."""
    left, top, right, bottom = quad_bbox(quad)
    center_x = (left + right) / 2.0
    center_y = (top + bottom) / 2.0
    return (center_x, center_y)
|
|
|
|
|
|
def boxes_union_xyxy(boxes):
    """Return the integer ``(x1, y1, x2, y2)`` box covering every box in *boxes*.

    ``None`` entries are ignored; if nothing remains the result is ``None``.
    """
    present = [box for box in boxes if box is not None]
    if not present:
        return None

    left = min(box[0] for box in present)
    top = min(box[1] for box in present)
    right = max(box[2] for box in present)
    bottom = max(box[3] for box in present)
    return (int(left), int(top), int(right), int(bottom))
|
|
|
|
|
|
def bbox_area_xyxy(b):
    """Return the integer area of an ``(x1, y1, x2, y2)`` box.

    ``None`` gives 0, and a negative width or height is treated as 0.
    """
    if b is None:
        return 0
    width = max(0, b[2] - b[0])
    height = max(0, b[3] - b[1])
    return int(width * height)
|
|
|
|
|
|
def xyxy_to_xywh(b):
    """Convert an ``(x1, y1, x2, y2)`` box into an ``{"x", "y", "w", "h"}`` dict.

    ``None`` is passed through; width and height are floored at 0.
    """
    if b is None:
        return None
    left, top, right, bottom = b
    width = max(0, right - left)
    height = max(0, bottom - top)
    return {"x": int(left), "y": int(top), "w": int(width), "h": int(height)}
|
|
|
|
|
|
def overlap_or_near(a, b, gap=0):
    """Return True when boxes *a* and *b* overlap or lie within *gap* of each
    other on both axes.

    Both boxes are ``(x1, y1, x2, y2)``. The per-axis separation is 0 when the
    boxes overlap on that axis.
    """
    a_left, a_top, a_right, a_bottom = a
    b_left, b_top, b_right, b_bottom = b

    horizontal_gap = max(0, max(a_left, b_left) - min(a_right, b_right))
    if horizontal_gap > gap:
        return False

    vertical_gap = max(0, max(a_top, b_top) - min(a_bottom, b_bottom))
    return vertical_gap <= gap
|
|
|
|
|
|
def ocr_candidate_score(text: str) -> float:
    """Score how much *text* looks like clean dialogue, clamped to ``[0, 1]``.

    The score rewards letters, spaces and common punctuation, and penalises
    unexpected symbols, isolated capital letters, digit runs and a two
    character sequence repeated three times in a row. Empty or
    whitespace-only input scores 0.0.
    """
    if not text:
        return 0.0
    stripped = text.strip()
    length = len(stripped)
    if length == 0:
        return 0.0

    # Character-class shares of the stripped string.
    alpha_share = sum(1 for ch in stripped if ch.isalpha()) / length
    space_share = sum(1 for ch in stripped if ch.isspace()) / length
    punct_share = sum(1 for ch in stripped if ch in ".,!?'-:;()[]\"¡¿") / length
    unexpected = re.findall(r"[^\w\s\.\,\!\?\-\'\:\;\(\)\[\]\"¡¿]", stripped)
    bad_share = len(unexpected) / length

    # Fixed deductions, applied once each when the pattern occurs anywhere.
    deductions = (
        (r"\b[A-Z]\b", 0.05),
        (r"[0-9]{2,}", 0.08),
        (r"(..)\1\1", 0.08),
    )
    penalty = 0.0
    for pattern, amount in deductions:
        if re.search(pattern, stripped):
            penalty += amount

    score = (0.62 * alpha_share) + (0.10 * space_share) + (0.20 * punct_share) - (0.45 * bad_share) - penalty
    return max(0.0, min(1.0, score))
|
|
|
|
|
|
# ============================================================
|
|
# OCR ENGINES (Apple Native Vision)
|
|
# ============================================================
|
|
class MacVisionDetector:
    """OCR front end backed by Apple's Vision framework through PyObjC.

    ``read`` returns a list of ``(quad, text, confidence)`` tuples, where
    ``quad`` holds four ``[x, y]`` integer corners (top-left, top-right,
    bottom-right, bottom-left) in pixel coordinates of the input image.
    """

    def __init__(self, source_lang="en"):
        """Pick the Vision recognition language for *source_lang*.

        Codes missing from the internal map fall back to ``"en-US"``.
        """
        lang_map = {"en": "en-US", "es": "es-ES", "ca": "ca-ES", "fr": "fr-FR", "ja": "ja-JP"}
        apple_lang = lang_map.get(source_lang, "en-US")
        self.langs = [apple_lang]
        print(f"⚡ Using Apple Vision OCR (Language: {self.langs})")

    def read(self, image_path_or_array):
        """Run text recognition on an image path (``str``) or an image array.

        Returns a list of ``(quad, text, confidence)`` tuples. The list is
        empty when the image cannot be loaded, is empty, or cannot be
        PNG-encoded. Vision errors are printed and produce no detections.
        """
        if isinstance(image_path_or_array, str):
            img = cv2.imread(image_path_or_array)
        else:
            img = image_path_or_array

        if img is None or img.size == 0:
            return []

        ih, iw = img.shape[:2]

        success, buffer = cv2.imencode('.png', img)
        if not success:
            return []

        # Serialise once and reuse for both the payload and its length.
        png_bytes = buffer.tobytes()
        ns_data = NSData.dataWithBytes_length_(png_bytes, len(png_bytes))
        handler = Vision.VNImageRequestHandler.alloc().initWithData_options_(ns_data, None)
        results = []

        def completion_handler(request, error):
            if error:
                print(f"Vision API Error: {error}")
                return

            # ``results()`` may be nil when nothing was recognised.
            for observation in (request.results() or []):
                candidates = observation.topCandidates_(1)
                if candidates is None or len(candidates) == 0:
                    # No candidate text for this observation; nothing to record.
                    continue
                candidate = candidates[0]
                text = candidate.string()
                confidence = candidate.confidence()

                # The bounding box is scaled by the image size and flipped
                # vertically: Vision measures y from the bottom edge, the
                # output quads measure it from the top.
                bbox = observation.boundingBox()
                x = bbox.origin.x * iw
                y_bottom_left = bbox.origin.y * ih
                w = bbox.size.width * iw
                h = bbox.size.height * ih

                y = ih - y_bottom_left - h

                quad = [
                    [int(x), int(y)],
                    [int(x + w), int(y)],
                    [int(x + w), int(y + h)],
                    [int(x), int(y + h)]
                ]

                results.append((quad, text, confidence))

        request = Vision.VNRecognizeTextRequest.alloc().initWithCompletionHandler_(completion_handler)
        request.setRecognitionLevel_(Vision.VNRequestTextRecognitionLevelAccurate)
        request.setUsesLanguageCorrection_(True)
        request.setRecognitionLanguages_(self.langs)

        handler.performRequests_error_([request], None)

        return results
|
|
|
|
|
|
# ============================================================
|
|
# PREPROCESS
|
|
# ============================================================
|
|
def preprocess_variant(crop_bgr, mode):
    """Return a single-channel preprocessing variant of a BGR crop.

    *mode* is one of ``"raw"``, ``"clahe"``, ``"adaptive"``, ``"otsu"``,
    ``"invert"``, ``"bilateral"`` or ``"morph_open"``; any other value yields
    the plain grayscale image.
    """
    gray = cv2.cvtColor(crop_bgr, cv2.COLOR_BGR2GRAY)

    def otsu_binarize(src):
        # Threshold chosen automatically by Otsu's method.
        _, binary = cv2.threshold(src, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
        return binary

    # Builders are lazy so only the requested variant is computed.
    builders = {
        "raw": lambda: gray,
        "clahe": lambda: cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8)).apply(gray),
        "adaptive": lambda: cv2.adaptiveThreshold(
            cv2.GaussianBlur(gray, (3, 3), 0),
            255,
            cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
            cv2.THRESH_BINARY,
            35,
            11,
        ),
        "otsu": lambda: otsu_binarize(cv2.GaussianBlur(gray, (3, 3), 0)),
        "invert": lambda: 255 - gray,
        "bilateral": lambda: otsu_binarize(cv2.bilateralFilter(gray, 7, 60, 60)),
        "morph_open": lambda: cv2.morphologyEx(
            otsu_binarize(gray), cv2.MORPH_OPEN, np.ones((2, 2), np.uint8)
        ),
    }

    build = builders.get(mode)
    return gray if build is None else build()
|
|
|
|
|
|
def rotate_image_keep_bounds(img, angle_deg):
    """Rotate *img* by *angle_deg* degrees about its centre without cropping.

    The output canvas is enlarged to contain the whole rotated image, and the
    uncovered corners are filled with white.
    """
    h, w = img.shape[:2]
    c = (w / 2, h / 2)
    M = cv2.getRotationMatrix2D(c, angle_deg, 1.0)
    cos = abs(M[0, 0])
    sin = abs(M[0, 1])

    # Size of the axis-aligned box that contains the rotated image.
    new_w = int((h * sin) + (w * cos))
    new_h = int((h * cos) + (w * sin))
    # Shift so the rotated content is centred in the enlarged canvas.
    M[0, 2] += (new_w / 2) - c[0]
    M[1, 2] += (new_h / 2) - c[1]

    # A scalar border value only sets the first channel, which paints the
    # border blue on BGR images. A full triple gives white on colour input;
    # single-channel input uses just the first component.
    return cv2.warpAffine(
        img, M, (new_w, new_h), flags=cv2.INTER_CUBIC, borderValue=(255, 255, 255)
    )
|
|
|
|
|
|
def rebuild_text_from_vision_result(res):
    """Rebuild one normalised string from ``(quad, text, confidence)`` results.

    Detections with blank text are dropped. The rest are clustered into rows
    by vertical centre (tolerance ``max(6, 0.75 * median height)``), rows are
    ordered top to bottom and their members left to right, and everything is
    joined with spaces. Returns ``""`` when nothing usable remains.
    """
    if not res:
        return ""

    tokens = []
    for quad, raw_text, _confidence in res:
        if not raw_text or not raw_text.strip():
            continue
        box = quad_bbox(quad)
        tokens.append(
            {
                "text": raw_text,
                "xc": (box[0] + box[2]) / 2.0,
                "yc": (box[1] + box[3]) / 2.0,
                "h": max(1.0, box[3] - box[1]),
            }
        )

    if not tokens:
        return ""

    median_height = float(np.median([tok["h"] for tok in tokens]))
    row_tol = max(6.0, median_height * 0.75)

    # Greedy row assignment: visit top to bottom and join the first row whose
    # running mean centre is within tolerance.
    tokens.sort(key=lambda tok: tok["yc"])
    rows = []
    for tok in tokens:
        target = next(
            (row for row in rows if abs(tok["yc"] - row["yc"]) <= row_tol),
            None,
        )
        if target is None:
            rows.append({"yc": tok["yc"], "m": [tok]})
        else:
            target["m"].append(tok)
            target["yc"] = float(np.mean([member["yc"] for member in target["m"]]))

    rows.sort(key=lambda row: row["yc"])

    lines = []
    for row in rows:
        ordered = sorted(row["m"], key=lambda tok: tok["xc"])
        line = normalize_text(" ".join(tok["text"] for tok in ordered))
        if line:
            lines.append(line)

    return normalize_text(" ".join(lines))
|
|
|
|
|
|
def reread_bubble_with_vision(
    image_bgr,
    bbox_xyxy,
    vision_detector: MacVisionDetector,
    upscale=3.0,
    pad=24
):
    """Re-OCR one bubble region under several preprocessing variants.

    The padded box is cropped from *image_bgr*, enlarged by *upscale*, and
    each preprocessing mode is read at three small rotation angles. The text
    with the highest ``ocr_candidate_score`` wins.

    Returns ``(text, score, "vision-reread")``, or ``(None, 0.0, "none")``
    when the crop is empty or no attempt scored above zero.
    """
    img_h, img_w = image_bgr.shape[:2]
    left, top, right, bottom = bbox_xyxy
    left = max(0, int(left - pad))
    top = max(0, int(top - pad))
    right = min(img_w, int(right + pad))
    bottom = min(img_h, int(bottom + pad))

    crop = image_bgr[top:bottom, left:right]
    if crop.size == 0:
        return None, 0.0, "none"

    target_size = (int(crop.shape[1] * upscale), int(crop.shape[0] * upscale))
    enlarged = cv2.resize(crop, target_size, interpolation=cv2.INTER_CUBIC)

    best_text, best_score = "", 0.0
    for mode in ("raw", "clahe", "adaptive", "otsu", "invert", "bilateral", "morph_open"):
        processed = preprocess_variant(enlarged, mode)
        if len(processed.shape) == 2:
            # Convert single-channel variants to three channels before rotating.
            processed = cv2.cvtColor(processed, cv2.COLOR_GRAY2BGR)

        for angle in (0.0, 1.5, -1.5):
            rotated = rotate_image_keep_bounds(processed, angle)
            detections = vision_detector.read(rotated)
            candidate = rebuild_text_from_vision_result(detections)
            score = ocr_candidate_score(candidate)
            if score > best_score:
                best_text, best_score = candidate, score

    if not best_text:
        return None, 0.0, "none"
    return best_text, best_score, "vision-reread"
|
|
|
|
|
|
# ============================================================
|
|
# LINES + BUBBLES
|
|
# ============================================================
|
|
def build_lines_from_indices(indices, ocr):
    """Assemble the text lines formed by the OCR entries at *indices*.

    Entries are clustered into rows by vertical centre (tolerance
    ``max(6, 0.75 * median height)``). Each row's texts are joined left to
    right and normalised. Rows that end up empty or are classified as noise
    are dropped. Returns the lines top to bottom.
    """
    if not indices:
        return []

    entries = []
    for idx in indices:
        box = quad_bbox(ocr[idx][0])
        entries.append(
            {
                "i": idx,
                "xc": (box[0] + box[2]) / 2.0,
                "yc": (box[1] + box[3]) / 2.0,
                "h": max(1.0, box[3] - box[1]),
            }
        )

    median_height = float(np.median([e["h"] for e in entries])) if entries else 10.0
    row_tol = max(6.0, median_height * 0.75)

    # Greedy row assignment against each row's running mean centre.
    entries.sort(key=lambda e: e["yc"])
    rows = []
    for entry in entries:
        target = next(
            (row for row in rows if abs(entry["yc"] - row["yc"]) <= row_tol),
            None,
        )
        if target is None:
            rows.append({"yc": entry["yc"], "m": [entry]})
        else:
            target["m"].append(entry)
            target["yc"] = float(np.mean([member["yc"] for member in target["m"]]))

    rows.sort(key=lambda row: row["yc"])

    lines = []
    for row in rows:
        ordered = sorted(row["m"], key=lambda e: e["xc"])
        joined = normalize_text(" ".join(ocr[e["i"]][1] for e in ordered))
        if joined and not is_noise_text(joined):
            lines.append(joined)
    return lines
|
|
|
|
|
|
def build_line_boxes_from_indices(indices, ocr, image_shape=None):
    """Compute padded ``(x1, y1, x2, y2)`` boxes for the text lines of a bubble.

    Non-noise OCR entries at *indices* are clustered into rows by vertical
    centre. Each row is split into chunks wherever the horizontal gap between
    neighbours exceeds ``max(8, 1.25 * median height)``. Every chunk becomes
    one padded box. When *image_shape* is given, boxes are clamped to the
    image and boxes left without positive width and height are dropped.
    The result is sorted by ``(y1, x1)``.
    """
    if not indices:
        return []

    tokens = []
    for idx in indices:
        box = quad_bbox(ocr[idx][0])
        text = normalize_text(ocr[idx][1])
        if is_noise_text(text):
            continue
        tokens.append(
            {
                "i": idx,
                "b": box,
                "txt": text,
                "xc": (box[0] + box[2]) / 2.0,
                "yc": (box[1] + box[3]) / 2.0,
                "h": max(1.0, box[3] - box[1]),
            }
        )

    if not tokens:
        return []

    median_height = float(np.median([tok["h"] for tok in tokens]))
    row_tol = max(6.0, median_height * 0.90)
    gap_x_tol = max(8.0, median_height * 1.25)
    pad = max(3, int(round(median_height * 0.22)))
    # Vertical padding is asymmetric: more above the line than below it.
    pad_top = int(round(pad * 1.35))
    pad_bottom = int(round(pad * 0.95))

    # Greedy row assignment against each row's running mean centre.
    rows = []
    for tok in sorted(tokens, key=lambda item: item["yc"]):
        target = next(
            (row for row in rows if abs(tok["yc"] - row["yc"]) <= row_tol),
            None,
        )
        if target is None:
            rows.append({"yc": tok["yc"], "m": [tok]})
        else:
            target["m"].append(tok)
            target["yc"] = float(np.mean([member["yc"] for member in target["m"]]))

    rows.sort(key=lambda row: row["yc"])

    line_boxes = []
    for row in rows:
        ordered = sorted(row["m"], key=lambda item: item["xc"])
        if not ordered:
            continue

        # Split the row wherever neighbouring boxes are too far apart.
        chunks = [[ordered[0]]]
        for tok in ordered[1:]:
            previous_box = chunks[-1][-1]["b"]
            if tok["b"][0] - previous_box[2] <= gap_x_tol:
                chunks[-1].append(tok)
            else:
                chunks.append([tok])

        for chunk in chunks:
            union = boxes_union_xyxy([tok["b"] for tok in chunk])
            if union:
                left, top, right, bottom = union
                line_boxes.append(
                    (left - pad, top - pad_top, right + pad, bottom + pad_bottom)
                )

    if image_shape is not None:
        img_h, img_w = image_shape[:2]
        clamped = []
        for box in line_boxes:
            left = max(0, int(box[0]))
            top = max(0, int(box[1]))
            right = min(img_w - 1, int(box[2]))
            bottom = min(img_h - 1, int(box[3]))
            if right > left and bottom > top:
                clamped.append((left, top, right, bottom))
        line_boxes = clamped

    line_boxes.sort(key=lambda box: (box[1], box[0]))
    return line_boxes
|
|
|
|
|
|
def auto_gap(image_path, base=18, ref_w=750):
    """Scale the grouping gap *base* by the image width relative to *ref_w*.

    Returns *base* unchanged when the image cannot be read.
    """
    page = cv2.imread(image_path)
    if page is not None:
        width = page.shape[1]
        return base * (width / ref_w)
    return base
|
|
|
|
|
|
def group_tokens(ocr, image_shape, gap_px=18, bbox_padding=3):
    """Cluster OCR detections into bubbles.

    Two detections are linked when their boxes overlap or lie within *gap_px*
    of each other, or when their centres are within
    ``max(20, 2.2 * median height)`` and their vertical offset is at most
    ``3 * median height``. Linked detections form groups, which are numbered
    from 1 in top-to-bottom, then left-to-right order of their top-left extent.

    Returns four dicts keyed by bubble id: text lines, padded and clamped
    ``(x1, y1, x2, y2)`` box, member quads, and member indices into *ocr*.
    All four are empty when *ocr* is empty.
    """
    count = len(ocr)
    if count == 0:
        return {}, {}, {}, {}

    boxes = [quad_bbox(det[0]) for det in ocr]
    centers = [quad_center(det[0]) for det in ocr]
    heights = [max(1.0, box[3] - box[1]) for box in boxes]
    med_h = float(np.median(heights)) if heights else 12.0
    dist_thresh = max(20.0, med_h * 2.2)

    # Union-find over detection indices.
    parent = list(range(count))

    def find(node):
        while parent[node] != node:
            parent[node] = parent[parent[node]]
            node = parent[node]
        return node

    def unite(a, b):
        parent[find(a)] = find(b)

    for i in range(count):
        for j in range(i + 1, count):
            linked = overlap_or_near(boxes[i], boxes[j], gap=gap_px)
            if not linked:
                (cx_i, cy_i), (cx_j, cy_j) = centers[i], centers[j]
                distance = ((cx_i - cx_j) ** 2 + (cy_i - cy_j) ** 2) ** 0.5
                linked = distance <= dist_thresh and abs(cy_i - cy_j) <= med_h * 3.0
            if linked:
                unite(i, j)

    groups = {}
    for i in range(count):
        groups.setdefault(find(i), []).append(i)

    def group_origin(members):
        # Sort key: smallest top edge first, then smallest left edge.
        return (
            min(boxes[m][1] for m in members),
            min(boxes[m][0] for m in members),
        )

    ordered_groups = sorted(groups.values(), key=group_origin)

    bubbles, bubble_boxes, bubble_quads, bubble_indices = {}, {}, {}, {}
    img_h, img_w = image_shape[:2]

    for bid, members in enumerate(ordered_groups, start=1):
        members = sorted(members, key=lambda k: boxes[k][1])
        lines = build_lines_from_indices(members, ocr)
        quads = [ocr[k][0] for k in members]
        union = boxes_union_xyxy([quad_bbox(q) for q in quads])
        if union is None:
            continue

        left, top, right, bottom = union
        left = max(0, left - bbox_padding)
        top = max(0, top - bbox_padding)
        right = min(img_w - 1, right + bbox_padding)
        bottom = min(img_h - 1, bottom + bbox_padding)

        bubbles[bid] = lines
        bubble_boxes[bid] = (left, top, right, bottom)
        bubble_quads[bid] = quads
        bubble_indices[bid] = members

    return bubbles, bubble_boxes, bubble_quads, bubble_indices
|
|
|
|
|
|
# ============================================================
|
|
# DEBUG / EXPORT
|
|
# ============================================================
|
|
def _wrap_overlay_words(text, max_chars=25):
    """Greedily wrap *text* into lines for the debug overlay.

    A word joins the current line while the line (including its trailing
    space) plus the word stays under *max_chars*. A word that is too long on
    its own still gets a line to itself, and no empty line is emitted before it.
    """
    wrapped = []
    current = ""
    for word in text.split():
        if len(current) + len(word) < max_chars:
            current += word + " "
            continue
        if current:
            wrapped.append(current.strip())
        current = word + " "
    if current:
        wrapped.append(current.strip())
    return wrapped


def save_debug_clusters(
    image_path,
    ocr,
    bubble_boxes,
    bubble_indices,
    clean_lines=None,
    out_path="debug_clusters.png"
):
    """Write a debug image showing OCR quads, bubble boxes and bubble text.

    Every OCR quad is filled white and outlined in grey. Every bubble box is
    drawn in green with a ``BOX#<id>`` label. When *clean_lines* has text for
    a bubble, that text is word-wrapped and drawn below its box. Nothing is
    written when the image cannot be read. *bubble_indices* is accepted for
    interface compatibility and is not used.
    """
    img = cv2.imread(image_path)
    if img is None:
        return

    # White-fill each OCR quad before drawing its outline.
    for bbox, _txt, _conf in ocr:
        pts = np.array(bbox, dtype=np.int32)
        cv2.fillPoly(img, [pts], (255, 255, 255))
        cv2.polylines(img, [pts], True, (180, 180, 180), 1)

    for bid, bb in bubble_boxes.items():
        x1, y1, x2, y2 = bb

        # Green bubble bounding box + ID label.
        cv2.rectangle(img, (x1, y1), (x2, y2), (0, 220, 0), 2)
        cv2.putText(img, f"BOX#{bid}", (x1 + 2, max(15, y1 + 16)),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 220, 0), 2)

        # Text overlay below the bubble box.
        if clean_lines and bid in clean_lines:
            y_text = y2 + 18
            for line in _wrap_overlay_words(clean_lines[bid]):
                # Thick black pass first so the thin coloured pass stays
                # readable on any background.
                cv2.putText(img, line, (x1, y_text),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 3)
                cv2.putText(img, line, (x1, y_text),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0), 1)
                y_text += 18

    cv2.imwrite(out_path, img)
|
|
|
|
|
|
def estimate_reading_order(bbox_dict, mode="ltr", row_tol=90):
    """Assign a 1-based reading position to every bubble.

    Bubbles are clustered into rows by the vertical centre of their boxes.
    Rows are read top to bottom, and bubbles within a row left to right, or
    right to left when *mode* is ``"rtl"``.

    Args:
        bbox_dict: mapping of bubble id to ``(x1, y1, x2, y2)``.
        mode: ``"rtl"`` reverses the in-row order; any other value reads
            left to right.
        row_tol: largest vertical distance, in pixels, between a bubble's
            centre and a row's mean centre for the bubble to join that row.
            The default of 90 keeps the previous fixed behaviour; scale it
            for pages of a different resolution.

    Returns:
        dict mapping bubble id to reading position (empty for empty input).
    """
    centers = [
        (bid, (x1 + x2) / 2.0, (y1 + y2) / 2.0)
        for bid, (x1, y1, x2, y2) in bbox_dict.items()
    ]
    centers.sort(key=lambda item: item[2])

    # Greedy row assignment against each row's running mean centre.
    rows = []
    for item in centers:
        target = next(
            (row for row in rows if abs(item[2] - row["cy"]) <= row_tol),
            None,
        )
        if target is None:
            rows.append({"cy": item[2], "items": [item]})
        else:
            target["items"].append(item)
            target["cy"] = float(np.mean([member[2] for member in target["items"]]))

    rows.sort(key=lambda row: row["cy"])

    order = []
    for row in rows:
        row["items"].sort(key=lambda item: item[1], reverse=(mode == "rtl"))
        order.extend(member[0] for member in row["items"])

    return {bid: position for position, bid in enumerate(order, start=1)}
|
|
|
|
|
|
def export_bubbles(filepath, bbox_dict, quads_dict, indices_dict, ocr, reading_map, image_shape):
    """Write per-bubble geometry to *filepath* as UTF-8 JSON.

    For every bubble in *bbox_dict* the record holds its box (``x``, ``y``,
    ``w``, ``h``), reading order (the bubble id when absent from
    *reading_map*), the member quads and their boxes, the union of the quad
    boxes, the per-line boxes, and the union and area of those line boxes.
    Records are keyed by the bubble id as a string.
    """
    payload = {}
    for bid, (x1, y1, x2, y2) in bbox_dict.items():
        quads = quads_dict.get(bid, [])
        member_indices = indices_dict.get(bid, [])

        quad_boxes = [quad_bbox(q) for q in quads]
        line_boxes = build_line_boxes_from_indices(member_indices, ocr, image_shape=image_shape)
        line_union = boxes_union_xyxy(line_boxes)

        quad_box_records = []
        for qb in quad_boxes:
            quad_box_records.append(
                {"x": int(qb[0]), "y": int(qb[1]), "w": int(qb[2] - qb[0]), "h": int(qb[3] - qb[1])}
            )

        payload[str(bid)] = {
            "x": int(x1),
            "y": int(y1),
            "w": int(x2 - x1),
            "h": int(y2 - y1),
            "reading_order": int(reading_map.get(bid, bid)),
            "quad_bboxes": quad_box_records,
            "quads": [[[int(pt[0]), int(pt[1])] for pt in q] for q in quads],
            "text_bbox": xyxy_to_xywh(boxes_union_xyxy(quad_boxes)),
            "line_bboxes": [xyxy_to_xywh(lb) for lb in line_boxes],
            # xyxy_to_xywh passes None through, so a missing union stays None.
            "line_union_bbox": xyxy_to_xywh(line_union),
            "line_union_area": int(bbox_area_xyxy(line_union)),
        }

    with open(filepath, "w", encoding="utf-8") as handle:
        json.dump(payload, handle, indent=2, ensure_ascii=False)
|
|
|
|
|
|
# ============================================================
|
|
# PIPELINE
|
|
# ============================================================
|
|
def translate_manga_text(
    image_path="001-page.png",
    source_lang="en",
    target_lang="ca",
    confidence_threshold=0.12,
    min_text_length=1,
    gap_px="auto",
    filter_sound_effects=True,
    quality_threshold=0.62,
    export_to_file="output.txt",
    export_bubbles_to="bubbles.json",
    reading_mode="ltr",
    debug=True
):
    """Run the full page pipeline: OCR, filter, group, re-read, translate, export.

    Steps, in order:
      1. Load the image and OCR it with ``MacVisionDetector``.
      2. Drop detections that are low confidence, too short, noise, sound
         effects (when *filter_sound_effects*), titles, or weak long text in
         the top band of the page.
      3. Group the survivors into bubbles with ``group_tokens``.
      4. Re-read a bubble with ``reread_bubble_with_vision`` when its base
         text scores below *quality_threshold*.
      5. Translate each bubble in reading order with ``GoogleTranslator``.
      6. Write a pipe-delimited report to *export_to_file*, bubble geometry
         to *export_bubbles_to*, and (when *debug*) ``debug_clusters.png``.

    Args:
        image_path: path of the page image.
        source_lang: language code for OCR and the translator source.
        target_lang: language code for the translator target.
        confidence_threshold: detections below this confidence are skipped.
        min_text_length: detections whose normalised text is shorter are skipped.
        gap_px: ``"auto"`` to derive the grouping gap from the image width via
            ``auto_gap``; otherwise a value convertible to ``float``.
        filter_sound_effects: skip detections recognised as sound effects.
        quality_threshold: base-text score below which a bubble is re-read.
        export_to_file: path of the text report.
        export_bubbles_to: path of the bubble JSON.
        reading_mode: passed to ``estimate_reading_order`` (``"ltr"``/``"rtl"``).
        debug: when true, also write ``debug_clusters.png``.

    Returns:
        None. Returns early, after printing a message, when the image cannot
        be loaded or no detection survives filtering.
    """
    image = cv2.imread(image_path)
    if image is None:
        print(f"❌ Cannot load image: {image_path}")
        return

    resolved_gap = auto_gap(image_path) if gap_px == "auto" else float(gap_px)

    print("Loading OCR engines...")
    detector = MacVisionDetector(source_lang=source_lang)

    print("Running detection OCR (Apple Vision)...")
    raw = detector.read(image_path)
    print(f"Raw detections: {len(raw)}")

    filtered = []
    skipped = 0
    ih, iw = image.shape[:2]

    # Filter raw detections; every rejection counts towards `skipped`.
    for bbox, text, conf in raw:
        t = normalize_text(text)
        qb = quad_bbox(bbox)

        if conf < confidence_threshold:
            skipped += 1; continue
        if len(t) < min_text_length:
            skipped += 1; continue
        if is_noise_text(t):
            skipped += 1; continue
        if filter_sound_effects and is_sound_effect(t):
            skipped += 1; continue
        if is_title_text(t):
            skipped += 1; continue
        # Stricter rule in the top band of the page: drop text of 5+
        # characters unless its confidence is at least 0.70.
        if qb[1] < int(ih * TOP_BAND_RATIO):
            if conf < 0.70 and len(t) >= 5:
                skipped += 1; continue

        filtered.append((bbox, t, conf))

    print(f"Kept: {len(filtered)} | Skipped: {skipped}")
    if not filtered:
        print("⚠️ No text after filtering.")
        return

    bubbles, bubble_boxes, bubble_quads, bubble_indices = group_tokens(
        filtered, image.shape, gap_px=resolved_gap, bbox_padding=3
    )

    translator = GoogleTranslator(source=source_lang, target=target_lang)

    clean_lines: Dict[int, str] = {}   # bubble id -> final source text
    sources_used: Dict[int, str] = {}  # bubble id -> "vision-base" or "vision-reread"

    for bid, lines in bubbles.items():
        base_txt = normalize_text(" ".join(lines))
        base_sc = ocr_candidate_score(base_txt)

        txt = base_txt
        src_used = "vision-base"

        # Low-scoring bubbles get a second pass; the re-read replaces the base
        # text only when it scores more than 0.04 higher.
        if base_sc < quality_threshold:
            rr_txt, rr_sc, rr_src = reread_bubble_with_vision(
                image_bgr=image,
                bbox_xyxy=bubble_boxes[bid],
                vision_detector=detector,
                upscale=3.0,
                pad=24
            )
            if rr_txt and rr_sc > base_sc + 0.04:
                txt = rr_txt
                src_used = rr_src

        # Hard-coded fixes for specific OCR misreads.
        # NOTE(review): these look Spanish-specific — confirm they are still
        # wanted for other source languages.
        txt = txt.replace(" BOMPORTA", " IMPORTA")
        txt = txt.replace(" TESTO ", " ESTO ")
        txt = txt.replace(" MIVERDAD", " MI VERDAD")

        clean_lines[bid] = apply_glossary(normalize_text(txt))
        sources_used[bid] = src_used

    reading_map = estimate_reading_order(bubble_boxes, mode=reading_mode)

    if debug:
        save_debug_clusters(
            image_path=image_path,
            ocr=filtered,
            bubble_boxes=bubble_boxes,
            bubble_indices=bubble_indices,
            clean_lines=clean_lines,
            out_path="debug_clusters.png"
        )

    divider = "─" * 120
    out_lines = ["BUBBLE|ORDER|OCR_SOURCE|ORIGINAL|TRANSLATED|FLAGS", divider]

    print(divider)
    print(f"{'BUBBLE':<8} {'ORDER':<6} {'SOURCE':<12} {'ORIGINAL':<40} {'TRANSLATED':<40} FLAGS")
    print(divider)

    translated_count = 0
    # Emit bubbles in estimated reading order; bubbles with empty text are skipped.
    for bid in sorted(clean_lines.keys(), key=lambda x: reading_map.get(x, x)):
        src_txt = clean_lines[bid].strip()
        if not src_txt:
            continue

        flags = []
        try:
            tgt = translator.translate(src_txt) or ""
        except Exception as e:
            # Best effort: a failed translation is reported in the output row
            # and flagged instead of aborting the whole page.
            tgt = f"[Translation error: {e}]"
            flags.append("TRANSLATION_ERROR")

        tgt = apply_glossary(postprocess_translation_general(tgt)).upper()
        src_u = src_txt.upper()
        src_engine = sources_used.get(bid, "unknown")

        out_lines.append(
            f"#{bid}|{reading_map.get(bid,bid)}|{src_engine}|{src_u}|{tgt}|{','.join(flags) if flags else '-'}"
        )

        print(
            f"#{bid:<7} {reading_map.get(bid,bid):<6} {src_engine:<12} "
            f"{src_u[:40]:<40} {tgt[:40]:<40} {','.join(flags) if flags else '-'}"
        )
        # Counts every emitted row, including rows flagged TRANSLATION_ERROR.
        translated_count += 1

    out_lines.append(divider)
    out_lines.append(f"✅ Done! {translated_count} bubble(s) translated, {skipped} detection(s) skipped.")

    with open(export_to_file, "w", encoding="utf-8") as f:
        f.write("\n".join(out_lines))

    export_bubbles(
        export_bubbles_to,
        bbox_dict=bubble_boxes,
        quads_dict=bubble_quads,
        indices_dict=bubble_indices,
        ocr=filtered,
        reading_map=reading_map,
        image_shape=image.shape
    )

    print(divider)
    print(f"Saved: {export_to_file}")
    print(f"Saved: {export_bubbles_to}")
    if debug:
        print("Saved: debug_clusters.png")
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: process a single page with explicit settings.
    run_options = dict(
        image_path="004.png",
        source_lang="en",
        target_lang="ca",
        confidence_threshold=0.12,
        min_text_length=2,
        gap_px="auto",
        filter_sound_effects=True,
        quality_threshold=0.62,
        export_to_file="output.txt",
        export_bubbles_to="bubbles.json",
        reading_mode="ltr",
        debug=True,
    )
    translate_manga_text(**run_options)
|