559 lines
21 KiB
Python
559 lines
21 KiB
Python
import re
|
||
import os
|
||
import cv2
|
||
import numpy as np
|
||
import easyocr
|
||
from deep_translator import GoogleTranslator
|
||
from sklearn.cluster import DBSCAN
|
||
|
||
|
||
# ─────────────────────────────────────────────
|
||
# LANGUAGE CODE REFERENCE
|
||
# ─────────────────────────────────────────────
|
||
# Display name → language code.  Insertion order is the order in which
# list_languages() prints the table.
SUPPORTED_LANGUAGES = dict(
    [
        ("Vietnamese", "vi"),
        ("Japanese", "ja"),
        ("English", "en"),
        ("Spanish", "es"),
        ("Korean", "ko"),
        ("Chinese (Simplified)", "ch_sim"),
        ("Chinese (Traditional)", "ch_tra"),
        ("French", "fr"),
        ("German", "de"),
        ("Italian", "it"),
        ("Portuguese", "pt"),
        ("Arabic", "ar"),
        ("Russian", "ru"),
        ("Thai", "th"),
        ("Catalan", "ca"),
    ]
)
|
||
|
||
# ─────────────────────────────────────────────
|
||
# SOUND EFFECT FILTER
|
||
# ─────────────────────────────────────────────
|
||
# Regular expressions describing onomatopoeia / sound effects.  They are
# matched against text that has been lower-cased and reduced to the ASCII
# letters a-z (see is_sound_effect).
SOUND_EFFECT_PATTERNS = [
    r"^b+i+p+$",
    r"^sha+$",
    r"^ha+$",
    r"^ah+$",
    r"^oh+$",
    r"^ugh+$",
    r"^gr+$",
    r"^bam+$",
    r"^pow+$",
    r"^crash+$",
    r"^boom+$",
    r"^bang+$",
    r"^crack+$",
    r"^whoosh+$",
    r"^thud+$",
    r"^snap+$",
    r"^zip+$",
    r"^swoosh+$",
]


def is_sound_effect(text):
    """
    Tell whether *text* looks like a sound effect.

    The text is stripped, lower-cased and every character outside a-z is
    dropped (spaces, digits, punctuation, accented letters).  The result
    must match one of SOUND_EFFECT_PATTERNS in full.

    Returns:
        True when a pattern matches, otherwise False.
    """
    lowered = text.strip().lower()
    letters_only = "".join(ch for ch in lowered if "a" <= ch <= "z")

    for pattern in SOUND_EFFECT_PATTERNS:
        if re.fullmatch(pattern, letters_only, re.IGNORECASE):
            return True
    return False
|
||
|
||
|
||
# ─────────────────────────────────────────────
|
||
# BOUNDING BOX HELPERS
|
||
# ─────────────────────────────────────────────
|
||
def get_cluster_bbox(items, half=30):
    """
    Return an approximate bounding box around a cluster of text centres.

    Each text block is approximated by a square of side ``2 * half``
    centred on its (cx, cy) point; the result is the box enclosing all of
    those squares.

    Args:
        items : Iterable of (cy, cx, text) tuples (note: Y comes first).
        half  : Half-size, in pixels, assumed for every text block
                (default: 30, the value that used to be hard-coded).

    Returns:
        (x1, y1, x2, y2) tuple.

    Raises:
        ValueError: if *items* is empty.
    """
    # Materialise once so generators are accepted and we iterate a single time.
    points = [(cy, cx) for cy, cx, _ in items]
    if not points:
        raise ValueError("get_cluster_bbox() requires at least one item")

    ys = [cy for cy, _ in points]
    xs = [cx for _, cx in points]
    return min(xs) - half, min(ys) - half, max(xs) + half, max(ys) + half
|
||
|
||
|
||
def boxes_are_close(bbox_a, bbox_b, proximity_px=80):
    """
    Decide whether two axis-aligned boxes touch, overlap, or lie within
    proximity_px pixels of one another.

    The first box is grown by proximity_px on every side and then tested
    for intersection with the second box.

    Args:
        bbox_a, bbox_b : (x1, y1, x2, y2) tuples.
        proximity_px   : Allowed gap in pixels (default: 80).

    Returns:
        True if the grown first box intersects the second, else False.
    """
    left_a, top_a, right_a, bottom_a = bbox_a
    left_b, top_b, right_b, bottom_b = bbox_b

    # Grow box A by the allowed gap.
    left_a, top_a = left_a - proximity_px, top_a - proximity_px
    right_a, bottom_a = right_a + proximity_px, bottom_a + proximity_px

    apart_horizontally = right_a < left_b or right_b < left_a
    apart_vertically = bottom_a < top_b or bottom_b < top_a
    return not (apart_horizontally or apart_vertically)
|
||
|
||
|
||
# ─────────────────────────────────────────────
|
||
# POST-CLUSTER MERGE (Union-Find)
|
||
# ─────────────────────────────────────────────
|
||
def merge_nearby_clusters(raw_clusters, proximity_px=80):
    """
    Fuse clusters whose approximate bounding boxes lie within
    proximity_px pixels of each other (transitively), using union-find.

    This repairs speech bubbles that the first clustering pass split in
    two, without having to raise eps for the whole page.

    Args:
        raw_clusters : dict mapping cluster label → list of (cy, cx, text).
        proximity_px : Maximum gap between boxes to merge (default: 80).

    Returns:
        dict mapping a representative label → concatenated item list.
        Items keep the order of the input clusters they came from.
    """
    cluster_ids = list(raw_clusters.keys())
    boxes = {cid: get_cluster_bbox(raw_clusters[cid]) for cid in cluster_ids}

    # Disjoint-set forest: every cluster starts out as its own root.
    parent = {cid: cid for cid in cluster_ids}

    def find(node):
        # Walk to the root, halving the path as we go.
        while parent[node] != node:
            parent[node] = parent[parent[node]]
            node = parent[node]
        return node

    # Examine every unordered pair exactly once.
    for pos, first in enumerate(cluster_ids):
        for second in cluster_ids[pos + 1:]:
            if boxes_are_close(boxes[first], boxes[second], proximity_px):
                new_root = find(second)
                parent[find(first)] = new_root

    merged = {}
    for cid in cluster_ids:
        merged.setdefault(find(cid), []).extend(raw_clusters[cid])
    return merged
|
||
|
||
|
||
# ─────────────────────────────────────────────
|
||
# CROP-BASED OCR RE-READ
|
||
# For each cluster bounding box, crop the
|
||
# original image with padding and re-run OCR
|
||
# at higher quality. This fixes garbled text
|
||
# in small or low-contrast bubbles.
|
||
# ─────────────────────────────────────────────
|
||
def reread_cluster_crop(
    image,
    bbox,
    reader,
    source_lang,
    padding_px=20,
    upscale_factor=2.5,
):
    """
    Crop a cluster region from the full image, upscale and sharpen it,
    and re-run OCR on the crop for better accuracy on small text.

    The processed crop is handed to the reader as an in-memory array; no
    temporary file is written, so concurrent runs cannot clobber each
    other and the working directory need not be writable.

    Args:
        image         : Full-page image as numpy array (BGR)
        bbox          : (x1, y1, x2, y2) cluster bounding box
        reader        : Initialized EasyOCR Reader
        source_lang   : Language code string (currently unused; kept so
                        existing callers keep working)
        padding_px    : Pixels of padding around the crop (default: 20)
        upscale_factor: How much to enlarge the crop before OCR (default: 2.5)

    Returns:
        Single cleaned string with all OCR lines merged top-to-bottom,
        or None if the crop is empty or OCR found nothing.
    """
    img_h, img_w = image.shape[:2]
    x1, y1, x2, y2 = bbox

    # Add padding, clamp to image bounds.
    left = max(0, int(x1) - padding_px)
    top = max(0, int(y1) - padding_px)
    right = min(img_w, int(x2) + padding_px)
    bottom = min(img_h, int(y2) + padding_px)

    # A box lying entirely above/left of the image yields a negative end
    # index, which numpy would interpret as "count from the end" and return
    # an unrelated region — reject degenerate boxes explicitly.
    if right <= left or bottom <= top:
        return None

    crop = image[top:bottom, left:right]
    if crop.size == 0:
        return None

    # Upscale for better OCR on small text (never request a 0-pixel side).
    new_w = max(1, int(crop.shape[1] * upscale_factor))
    new_h = max(1, int(crop.shape[0] * upscale_factor))
    upscaled = cv2.resize(crop, (new_w, new_h), interpolation=cv2.INTER_CUBIC)

    # Light sharpening to improve OCR on manga fonts.
    kernel = np.array([[0, -1, 0], [-1, 5, -1], [0, -1, 0]], dtype=np.float32)
    sharpened = cv2.filter2D(upscaled, -1, kernel)

    # readtext accepts an OpenCV-style array directly.
    crop_results = reader.readtext(sharpened, paragraph=False)
    if not crop_results:
        return None

    # Order detections top-to-bottom by the Y of their top-left corner.
    ordered = sorted(crop_results, key=lambda det: det[0][0][1])
    lines = [text.strip() for _, text, _ in ordered if text.strip()]

    return fix_hyphens(lines) if lines else None
|
||
|
||
|
||
# ─────────────────────────────────────────────
|
||
# DBSCAN BUBBLE CLUSTERING
|
||
# ─────────────────────────────────────────────
|
||
def cluster_into_bubbles(ocr_results, eps=80, min_samples=1, proximity_px=80):
    """
    Group OCR detections into speech bubbles with a two-pass clustering:
      Pass 1 — DBSCAN on the detections' centre points
      Pass 2 — Bounding-box proximity merge (merge_nearby_clusters)

    Bubbles are numbered from 1 in reading order: rows of 150 px first,
    then left-to-right within a row.  Lines inside a bubble are ordered
    top-to-bottom by centre Y.

    Args:
        ocr_results  : list of (quad, text, confidence), where quad is a
                       sequence of (x, y) corner points.
        eps          : DBSCAN neighbourhood radius in pixels (default: 80)
        min_samples  : DBSCAN min_samples (default: 1)
        proximity_px : Gap allowed by the merge pass (default: 80)

    Returns:
        bubble_dict : bubble_id → list of text lines
        bbox_dict   : bubble_id → (x1, y1, x2, y2) enclosing the actual
                      OCR quads of the bubble.  The centre ± fixed-size
                      approximation is used only for merging; using it for
                      cropping would cut off wide text lines.
    """
    if not ocr_results:
        return {}, {}

    centers = []
    for quad, _text, _confidence in ocr_results:
        xs = [pt[0] for pt in quad]
        ys = [pt[1] for pt in quad]
        centers.append([float(sum(xs)) / len(xs), float(sum(ys)) / len(ys)])

    centers_array = np.array(centers, dtype=np.float32)

    db = DBSCAN(eps=eps, min_samples=min_samples, metric="euclidean")
    labels = db.fit_predict(centers_array)

    raw_clusters = {}
    # Remember which OCR quads produced each (cy, cx, text) item so the true
    # extent can be recovered after merging.
    quads_by_item = {}
    # DBSCAN marks noise as -1; give every noise point its own fresh label.
    noise_counter = int(max(labels, default=0)) + 1

    for idx, label in enumerate(labels):
        label = int(label)
        if label == -1:
            label = noise_counter
            noise_counter += 1
        quad, text, _ = ocr_results[idx]
        item = (centers[idx][1], centers[idx][0], text)
        raw_clusters.setdefault(label, []).append(item)
        quads_by_item.setdefault(item, []).append(quad)

    print(f" DBSCAN pass: {len(raw_clusters)} cluster(s)")

    merged_clusters = merge_nearby_clusters(raw_clusters, proximity_px=proximity_px)
    print(f" After merge: {len(merged_clusters)} cluster(s)")

    # Sort in reading order.
    row_band_px = 150

    def cluster_sort_key(items):
        return (min(cy for cy, cx, _ in items) // row_band_px,
                min(cx for cy, cx, _ in items))

    sorted_clusters = sorted(merged_clusters.values(), key=cluster_sort_key)

    bubble_dict = {}
    bbox_dict = {}

    for i, items in enumerate(sorted_clusters, start=1):
        items_sorted = sorted(items, key=lambda t: t[0])
        bubble_dict[i] = [text for _, _, text in items_sorted]

        corner_points = [
            pt
            for item in items
            for quad in quads_by_item.get(item, [])
            for pt in quad
        ]
        if corner_points:
            xs = [pt[0] for pt in corner_points]
            ys = [pt[1] for pt in corner_points]
            bbox_dict[i] = (min(xs), min(ys), max(xs), max(ys))
        else:
            # Should not happen; fall back to the centre-based approximation.
            bbox_dict[i] = get_cluster_bbox(items)

    return bubble_dict, bbox_dict
|
||
|
||
|
||
# ─────────────────────────────────────────────
|
||
# HYPHEN REMOVAL
|
||
# ─────────────────────────────────────────────
|
||
def fix_hyphens(lines):
    """
    Join OCR lines into one string, undoing end-of-line hyphenation.

    Every line is stripped first and blank lines are ignored, so a
    trailing space after a hyphen or an empty detection between two
    halves of a word no longer prevents the halves from being rejoined.
    A line ending in "-" is glued to the next line without the hyphen;
    all other lines are joined with a single space.

    Args:
        lines : list of text lines, top-to-bottom (may be empty or None).

    Returns:
        The merged string with runs of spaces collapsed, or "" when there
        is nothing to merge.
    """
    pieces = [ln.strip() for ln in (lines or []) if ln and ln.strip()]
    if not pieces:
        return ""

    merged = pieces[0]
    for piece in pieces[1:]:
        if merged.endswith("-"):
            merged = merged[:-1] + piece
        else:
            merged = merged + " " + piece

    return re.sub(r" {2,}", " ", merged).strip()
|
||
|
||
|
||
# ─────────────────────────────────────────────
|
||
# AUTO EPS
|
||
# ─────────────────────────────────────────────
|
||
def compute_auto_eps(image_path, base_eps=80, reference_width=750):
    """
    Scale the clustering radius linearly with the image width.

    Args:
        image_path      : Path of the image to measure.
        base_eps        : eps that suits an image reference_width pixels
                          wide (default: 80)
        reference_width : Width in pixels that base_eps refers to
                          (default: 750)

    Returns:
        base_eps * (image width / reference_width), or base_eps unchanged
        when the image cannot be read.
    """
    page = cv2.imread(image_path)
    if page is None:
        # Unreadable image: fall back to the unscaled value.
        return base_eps

    width_px = page.shape[1]
    auto_eps = base_eps * (width_px / reference_width)
    print(f" ℹ️ Image width: {width_px}px → auto eps: {auto_eps:.1f}px")
    return auto_eps
|
||
|
||
|
||
# ─────────────────────────────────────────────
|
||
# OCR QUALITY SCORE
|
||
# Heuristic to detect garbled OCR output.
|
||
# Low score = likely garbage, trigger re-read.
|
||
# ─────────────────────────────────────────────
|
||
def ocr_quality_score(text):
    """
    Heuristic 0.0–1.0 quality score for a piece of OCR output.

    The score is the fraction of characters that are alphabetic (spaces,
    digits and punctuation all count against it), minus 0.2 for each of
    these patterns found in the text:
      - two consecutive commas
      - the sequence "..-"
      - a run of two or more characters that are neither word characters,
        whitespace, nor one of ' ! ? . , -

    Empty text and text shorter than 2 characters score 0.0.  The result
    is clamped to the range [0.0, 1.0].
    """
    if not text or len(text) < 2:
        return 0.0

    letter_count = 0
    for ch in text:
        if ch.isalpha():
            letter_count += 1
    letter_ratio = letter_count / len(text)

    # Each suspicious pattern that occurs costs a flat 0.2.
    penalty = 0
    for pattern in (r",,", r"\.\.-", r"[^\w\s\'\!\?\.,-]{2,}"):
        if re.search(pattern, text):
            penalty += 0.2

    raw_score = letter_ratio - penalty
    if raw_score < 0.0:
        return 0.0
    if raw_score > 1.0:
        return 1.0
    return raw_score
|
||
|
||
|
||
# ─────────────────────────────────────────────
|
||
# DEBUG CLUSTER IMAGE
|
||
# ─────────────────────────────────────────────
|
||
def save_debug_clusters(image_path, ocr_results, bubble_dict,
                        output_path="debug_clusters.png"):
    """
    Write a copy of the page with every OCR detection outlined and
    labelled with the number of the bubble it was assigned to.

    Args:
        image_path  : Path of the page image.
        ocr_results : list of (quad, text, confidence) detections.
        bubble_dict : bubble_id → list of text lines.
        output_path : Where to save the annotated image
                      (default: "debug_clusters.png")

    Returns:
        None.  Does nothing when the image cannot be read.

    Note:
        Detections are matched to bubbles by their text, so when the same
        text occurs in several bubbles all of its detections are labelled
        with the last such bubble.  Unmatched detections are labelled #0.
    """
    image = cv2.imread(image_path)
    if image is None:
        return

    # Private, fixed-seed generator: same colours on every run without
    # reseeding the process-wide NumPy random state.
    rng = np.random.RandomState(42)
    num_bubbles = max(bubble_dict.keys(), default=1)
    colors = [
        tuple(int(c) for c in col)
        for col in rng.randint(50, 230, size=(num_bubbles + 2, 3))
    ]

    text_to_bubble = {}
    for bubble_id, lines in bubble_dict.items():
        for line in lines:
            text_to_bubble[line] = bubble_id

    for bbox, text, _ in ocr_results:
        bubble_id = text_to_bubble.get(text, 0)
        color = colors[(bubble_id - 1) % len(colors)]
        pts = np.array(bbox, dtype=np.int32)
        cv2.polylines(image, [pts], isClosed=True, color=color, thickness=2)
        # Label just above the first corner, kept inside the top edge.
        x = int(pts[0][0])
        y = max(int(pts[0][1]) - 5, 12)
        cv2.putText(image, f"#{bubble_id}", (x, y),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

    if cv2.imwrite(output_path, image):
        print(f" 🐛 Cluster debug saved → {output_path}")
    else:
        print(f" ⚠️ Could not write cluster debug image: {output_path}")
|
||
|
||
|
||
# ─────────────────────────────────────────────
|
||
# CORE FUNCTION
|
||
# ─────────────────────────────────────────────
|
||
# Language codes that the translator expects under a different spelling
# than the one used for OCR in this file.
_TRANSLATOR_LANG_CODES = {"ch_sim": "zh-CN", "ch_tra": "zh-TW"}


def _language_label(code):
    """Return the display name of *code* from SUPPORTED_LANGUAGES, or the code itself."""
    for name, known_code in SUPPORTED_LANGUAGES.items():
        if known_code == code:
            return name
    return code


def translate_manga_text(
    image_path,
    source_lang="it",
    target_lang="ca",
    confidence_threshold=0.15,
    export_to_file=None,
    min_text_length=2,
    cluster_eps="auto",
    proximity_px=80,
    filter_sound_effects=True,
    quality_threshold=0.5,   # below this → trigger crop re-read
    upscale_factor=2.5,      # crop upscale multiplier for re-read
    debug=False,
):
    """
    Full pipeline:
      OCR → filter → DBSCAN cluster → proximity merge
          → quality check → crop re-read if needed
          → fix hyphens → translate

    Results are printed as a table and optionally written to a text file.
    The table header names the actual source and target languages.

    Args:
        image_path           : Path to your image file
        source_lang          : Source language code (default: 'it')
        target_lang          : Target language code (default: 'ca')
        confidence_threshold : Min OCR confidence (default: 0.15)
        export_to_file       : Save output to .txt (default: None)
        min_text_length      : Min characters per detection (default: 2)
        cluster_eps          : DBSCAN eps or 'auto' (default: 'auto')
        proximity_px         : Post-merge proximity px (default: 80)
        filter_sound_effects : Skip onomatopoeia/SFX (default: True)
        quality_threshold    : Min quality score 0–1 before re-read (default: 0.5)
        upscale_factor       : Crop upscale for re-read (default: 2.5)
        debug                : Save debug_clusters.png (default: False)

    Returns:
        None.  Returns early (after printing a message) when the image
        cannot be loaded or no detection survives filtering.
    """

    # ── 1. Resolve eps ───────────────────────────────────────────────────────
    if cluster_eps == "auto":
        print("Computing auto eps...")
        eps = compute_auto_eps(image_path)
    else:
        eps = float(cluster_eps)

    # ── 2. Load full image (needed for crop re-reads) ────────────────────────
    full_image = cv2.imread(image_path)
    if full_image is None:
        print(f"❌ Could not load image: {image_path}")
        return

    # ── 3. Initialize OCR ────────────────────────────────────────────────────
    print("\nLoading OCR model...")
    # Catalan source text is read with the English + Spanish OCR models.
    ocr_lang_list = ["en", "es"] if source_lang == "ca" else [source_lang]
    reader = easyocr.Reader(ocr_lang_list)

    # ── 4. Initialize translator ─────────────────────────────────────────────
    # The Chinese codes used for OCR differ from the translator's spelling.
    translator = GoogleTranslator(
        source=_TRANSLATOR_LANG_CODES.get(source_lang, source_lang),
        target=_TRANSLATOR_LANG_CODES.get(target_lang, target_lang),
    )

    # ── 5. Run OCR on full image ─────────────────────────────────────────────
    print(f"\nRunning OCR on: {image_path}")
    results = reader.readtext(image_path, paragraph=False)
    print(f" Raw detections: {len(results)}")

    # ── 6. Filter detections ─────────────────────────────────────────────────
    filtered = []
    skipped = 0

    for bbox, text, confidence in results:
        cleaned = text.strip()
        if confidence < confidence_threshold:
            skipped += 1
            continue
        if len(cleaned) < min_text_length:
            skipped += 1
            continue
        # Digits / punctuation only: nothing to translate.
        if re.fullmatch(r"[\d\W]+", cleaned):
            skipped += 1
            continue
        if filter_sound_effects and is_sound_effect(cleaned):
            print(f" 🔇 SFX skipped: '{cleaned}'")
            skipped += 1
            continue
        filtered.append((bbox, cleaned, confidence))

    print(f" ✅ {len(filtered)} detection(s) kept, {skipped} skipped.\n")

    if not filtered:
        print("⚠️ No text detected after filtering.")
        return

    # ── 7. Cluster + merge ───────────────────────────────────────────────────
    print(f"Clustering detections (eps={eps:.1f}px, proximity={proximity_px}px)...")
    bubble_dict, bbox_dict = cluster_into_bubbles(
        filtered, eps=eps, proximity_px=proximity_px
    )
    print(f" ✅ {len(bubble_dict)} bubble(s) after merge.\n")

    # ── 8. Debug image ───────────────────────────────────────────────────────
    if debug:
        save_debug_clusters(image_path, filtered, bubble_dict)

    # ── 9. Fix hyphens → first-pass text ─────────────────────────────────────
    clean_bubbles = {
        i: fix_hyphens(lines)
        for i, lines in bubble_dict.items()
        if lines
    }

    # ── 10. Quality check → crop re-read for low-quality bubbles ─────────────
    print("Checking OCR quality per bubble...")
    # Iterate over a snapshot of the keys because entries are replaced below.
    for i in list(clean_bubbles):
        text = clean_bubbles[i]
        score = ocr_quality_score(text)
        status = "✅" if score >= quality_threshold else "🔁"
        print(f" Bubble #{i}: score={score:.2f} {status} '{text[:60]}'")

        if score < quality_threshold:
            print(f" → Re-reading bubble #{i} from crop...")
            reread = reread_cluster_crop(
                full_image,
                bbox_dict[i],
                reader,
                source_lang,
                upscale_factor=upscale_factor,
            )
            if reread:
                print(f" → Re-read result: '{reread}'")
                clean_bubbles[i] = reread
            else:
                print(f" → Re-read returned nothing, keeping original.")

    # ── 11. Translate & print ────────────────────────────────────────────────
    print()
    original_title = f"ORIGINAL ({_language_label(source_lang)})"
    translated_title = f"TRANSLATED ({_language_label(target_lang)})"
    header = f"{'BUBBLE':<8} {original_title:<50} {translated_title}"
    divider = "─" * 105

    output_lines = [header, divider]
    print(header)
    print(divider)

    translated_count = 0

    for i in sorted(clean_bubbles.keys()):
        bubble_text = clean_bubbles[i].strip()
        if not bubble_text:
            continue

        # Best effort: a failed translation is reported in the table rather
        # than aborting the remaining bubbles.
        try:
            translated = translator.translate(bubble_text)
        except Exception as e:
            translated = f"[Translation error: {e}]"

        if translated is None:
            translated = "[No translation returned]"

        translated_count += 1
        line = f"#{i:<7} {bubble_text:<50} {translated}"
        print(line)
        output_lines.append(line)

    output_lines.append(divider)
    summary = (
        f"✅ Done! {translated_count} bubble(s) translated, "
        f"{skipped} detection(s) skipped."
    )
    output_lines.append(summary)
    print(divider)
    print(summary)

    # ── 12. Export ───────────────────────────────────────────────────────────
    if export_to_file:
        with open(export_to_file, "w", encoding="utf-8") as f:
            f.write("\n".join(output_lines))
        print(f"📄 Output saved to: {export_to_file}")
|
||
|
||
|
||
# ─────────────────────────────────────────────
|
||
# HELPER
|
||
# ─────────────────────────────────────────────
|
||
def list_languages():
    """Print a two-column table (name, code) of SUPPORTED_LANGUAGES."""
    rule = "─" * 40

    print(f"\n{'LANGUAGE':<30} {'CODE'}")
    print(rule)
    for language_name in SUPPORTED_LANGUAGES:
        language_code = SUPPORTED_LANGUAGES[language_name]
        print(f"{language_name:<30} {language_code}")
    print(rule)
|
||
|
||
|
||
# ─────────────────────────────────────────────
|
||
# ENTRY POINT
|
||
# ─────────────────────────────────────────────
|
||
if __name__ == "__main__":
    # Example run: translate page.png from Italian to Catalan, write the
    # table to output.txt and save the cluster debug image.
    run_settings = dict(
        image_path="page.png",
        source_lang="it",
        target_lang="ca",
        confidence_threshold=0.15,
        min_text_length=2,
        export_to_file="output.txt",
        cluster_eps="auto",
        proximity_px=80,
        filter_sound_effects=True,
        quality_threshold=0.5,   # bubbles scoring below this get re-read
        upscale_factor=2.5,      # how much to enlarge the crop for re-read
        debug=True,
    )
    translate_manga_text(**run_settings)