"""Manga page OCR → speech-bubble clustering → translation pipeline.

The module reads text from a page image with EasyOCR, filters out noise
tokens, groups the remaining detections into bubbles (DBSCAN followed by a
bounding-box proximity merge), optionally re-reads low-quality bubbles from
an upscaled crop, translates each bubble with GoogleTranslator and exports
the bubble boxes as JSON.
"""

import json
import os
import re
import tempfile

import cv2
import easyocr
import numpy as np
from deep_translator import GoogleTranslator
from sklearn.cluster import DBSCAN

# ─────────────────────────────────────────────
# LANGUAGE CODE REFERENCE
# ─────────────────────────────────────────────

SUPPORTED_LANGUAGES = {
    "Vietnamese": "vi",
    "Japanese": "ja",
    "English": "en",
    "Spanish": "es",
    "Korean": "ko",
    "Chinese (Simplified)": "ch_sim",
    "Chinese (Traditional)": "ch_tra",
    "French": "fr",
    "German": "de",
    "Italian": "it",
    "Portuguese": "pt",
    "Arabic": "ar",
    "Russian": "ru",
    "Thai": "th",
    "Catalan": "ca",
}

# The codes above follow the OCR engine's naming. The translator names the
# two Chinese variants differently, so they are remapped before the
# translator is built; every other code is passed through unchanged.
_TRANSLATOR_LANG_CODES = {
    "ch_sim": "zh-CN",
    "ch_tra": "zh-TW",
}


def _language_name(code):
    """Return the display name for *code*, or *code* itself if unknown."""
    for name, lang_code in SUPPORTED_LANGUAGES.items():
        if lang_code == code:
            return name
    return code


# ─────────────────────────────────────────────
# SOUND EFFECT FILTER
# ─────────────────────────────────────────────

SOUND_EFFECT_PATTERNS = [
    r"^b+i+p+$",
    r"^sha+$",
    r"^ha+$",
    r"^ah+$",
    r"^oh+$",
    r"^ugh+$",
    r"^gr+$",
    r"^bam+$",
    r"^pow+$",
    r"^crash+$",
    r"^boom+$",
    r"^bang+$",
    r"^crack+$",
    r"^whoosh+$",
    r"^thud+$",
    r"^snap+$",
    r"^zip+$",
    r"^swoosh+$",
]


def is_sound_effect(text):
    """Return True if *text* matches one of SOUND_EFFECT_PATTERNS.

    The text is lower-cased and everything except ASCII letters a-z is
    removed before matching, so "BOOM!!" is tested as "boom".
    """
    cleaned = re.sub(r"[^a-z]", "", text.strip().lower())
    return any(
        re.fullmatch(p, cleaned, re.IGNORECASE)
        for p in SOUND_EFFECT_PATTERNS
    )


# ─────────────────────────────────────────────
# TOKEN CLASSIFIER
# ─────────────────────────────────────────────

def classify_token(text, confidence, confidence_threshold,
                   min_text_length, filter_sound_effects):
    """
    Returns one of: "alpha" | "punct" | "noise"

    Rules (in order):
      1. confidence below threshold        → noise
      2. shorter than min_text_length      → noise
      3. pure digit string                 → noise
      4. single non-alpha character        → noise
      5. sound effect (if filter enabled)  → noise
      6. 2+ chars with no letters          → punct
      7. has at least one letter           → alpha
    """
    cleaned = text.strip()

    if confidence < confidence_threshold:
        return "noise"
    if len(cleaned) < min_text_length:
        return "noise"
    if re.fullmatch(r"\d+", cleaned):
        return "noise"
    if len(cleaned) == 1 and not cleaned.isalpha():
        return "noise"
    if filter_sound_effects and is_sound_effect(cleaned):
        return "noise"
    if not any(ch.isalpha() for ch in cleaned):
        return "punct"
    return "alpha"


def should_keep_token(text, confidence, confidence_threshold,
                      min_text_length, filter_sound_effects):
    """
    Backward-compatible wrapper.
    Returns (keep: bool, category: str).
    """
    cat = classify_token(text, confidence, confidence_threshold,
                         min_text_length, filter_sound_effects)
    return cat != "noise", cat


# ─────────────────────────────────────────────
# BOUNDING BOX
#
# Flat union of ALL quad corners.
# Handles every layout correctly:
#   • "HN" + "..." same line  → horizontal union
#   • Multi-line bubbles      → vertical union
#   • Rotated/skewed quads    → all 4 corners included
# ─────────────────────────────────────────────

def get_cluster_bbox_from_ocr(ocr_bboxes, image_shape, padding_px=10):
    """
    Computes the bubble erase bbox by taking the flat
    union of ALL quad corners.

    Args:
        ocr_bboxes  : List of EasyOCR quad bboxes
                      Each = [[x0,y0],[x1,y1],[x2,y2],[x3,y3]]
        image_shape : (height, width) for clamping
        padding_px  : Expansion on each side (default: 10)

    Returns:
        (x1, y1, x2, y2) clamped to image bounds;
        (0, 0, 0, 0) when ocr_bboxes is empty
    """
    img_h, img_w = image_shape[:2]

    if not ocr_bboxes:
        return 0, 0, 0, 0

    all_x = [pt[0] for quad in ocr_bboxes for pt in quad]
    all_y = [pt[1] for quad in ocr_bboxes for pt in quad]

    x1 = max(0, min(all_x) - padding_px)
    y1 = max(0, min(all_y) - padding_px)
    x2 = min(img_w, max(all_x) + padding_px)
    y2 = min(img_h, max(all_y) + padding_px)

    return x1, y1, x2, y2


def get_cluster_bbox(items):
    """Fallback center-point bbox — used only during merge step.

    *items* is a list of (cy, cx, text) tuples; the box spans the extreme
    center points expanded by a fixed 30 px on every side (not clamped).
    """
    half = 30
    x1 = min(cx for _, cx, _ in items) - half
    y1 = min(cy for cy, _, _ in items) - half
    x2 = max(cx for _, cx, _ in items) + half
    y2 = max(cy for cy, _, _ in items) + half
    return x1, y1, x2, y2


def boxes_are_close(bbox_a, bbox_b, proximity_px=80):
    """Return True if bbox_b intersects bbox_a grown by proximity_px."""
    ax1, ay1, ax2, ay2 = bbox_a
    bx1, by1, bx2, by2 = bbox_b

    # Only the first box is expanded; the test is then a plain
    # axis-aligned rectangle overlap check.
    ax1 -= proximity_px
    ay1 -= proximity_px
    ax2 += proximity_px
    ay2 += proximity_px

    return not (ax2 < bx1 or bx2 < ax1 or ay2 < by1 or by2 < ay1)


# ─────────────────────────────────────────────
# POST-CLUSTER MERGE (Union-Find)
# ─────────────────────────────────────────────

def merge_nearby_clusters(raw_clusters, raw_quads, proximity_px=80):
    """Merge clusters whose center-point boxes lie within proximity_px.

    Args:
        raw_clusters : label → list of (cy, cx, text)
        raw_quads    : label → list of quads (same labels as raw_clusters)
        proximity_px : Expansion used by boxes_are_close

    Returns:
        (merged_clusters, merged_quads), both keyed by the union-find root
        label of each merged group.
    """
    labels = list(raw_clusters.keys())
    bboxes = {lbl: get_cluster_bbox(raw_clusters[lbl]) for lbl in labels}
    parent = {lbl: lbl for lbl in labels}

    def find(x):
        # Path halving: point each visited node at its grandparent.
        while parent[x] != x:
            parent[x] = parent[parent[x]]
            x = parent[x]
        return x

    def union(x, y):
        parent[find(x)] = find(y)

    # Every unordered pair is tested once.
    for i, a in enumerate(labels):
        for b in labels[i + 1:]:
            if boxes_are_close(bboxes[a], bboxes[b], proximity_px):
                union(a, b)

    merged_clusters = {}
    merged_quads = {}
    for lbl in labels:
        root = find(lbl)
        merged_clusters.setdefault(root, []).extend(raw_clusters[lbl])
        merged_quads.setdefault(root, []).extend(raw_quads[lbl])

    return merged_clusters, merged_quads


# ─────────────────────────────────────────────
# CROP-BASED OCR RE-READ
# ─────────────────────────────────────────────

def reread_cluster_crop(image, bbox, reader, source_lang,
                        padding_px=20, upscale_factor=2.5):
    """Re-run OCR on an upscaled, sharpened crop of one bubble.

    Args:
        image          : Full page image array (indexed [y, x])
        bbox           : (x1, y1, x2, y2) of the bubble
        reader         : Object exposing readtext(path, paragraph=False)
        source_lang    : Unused here; kept for interface compatibility
        padding_px     : Extra margin added around bbox before cropping
        upscale_factor : Resize factor applied to the crop

    Returns:
        The re-read text with lines joined by fix_hyphens, or None when
        the crop is empty, cannot be written, or yields no text.
    """
    img_h, img_w = image.shape[:2]
    x1, y1, x2, y2 = bbox
    x1 = max(0, int(x1) - padding_px)
    y1 = max(0, int(y1) - padding_px)
    x2 = min(img_w, int(x2) + padding_px)
    y2 = min(img_h, int(y2) + padding_px)

    crop = image[y1:y2, x1:x2]
    if crop.size == 0:
        return None

    new_w = int(crop.shape[1] * upscale_factor)
    new_h = int(crop.shape[0] * upscale_factor)
    if new_w < 1 or new_h < 1:
        # A tiny crop combined with a small factor would make the resize
        # target empty, which cv2.resize rejects.
        return None

    upscaled = cv2.resize(crop, (new_w, new_h),
                          interpolation=cv2.INTER_CUBIC)
    kernel = np.array([[0, -1, 0], [-1, 5, -1], [0, -1, 0]])
    sharpened = cv2.filter2D(upscaled, -1, kernel)

    # Use a unique temp file instead of a fixed name in the working
    # directory so concurrent runs cannot overwrite each other's crop.
    # The crop is still handed to the reader as a file path, the same way
    # the full page is read.
    fd, temp_path = tempfile.mkstemp(prefix="_temp_crop_ocr_",
                                     suffix=".png")
    os.close(fd)
    try:
        if not cv2.imwrite(temp_path, sharpened):
            return None
        crop_results = reader.readtext(temp_path, paragraph=False)
    finally:
        if os.path.exists(temp_path):
            os.remove(temp_path)

    if not crop_results:
        return None

    # Order detections top-to-bottom by the y of each quad's first corner.
    crop_results.sort(key=lambda r: r[0][0][1])
    lines = [t.strip() for _, t, _ in crop_results if t.strip()]
    return fix_hyphens(lines) if lines else None


# ─────────────────────────────────────────────
# DBSCAN BUBBLE CLUSTERING
# ─────────────────────────────────────────────

def cluster_into_bubbles(ocr_results, image_shape,
                         eps=80, min_samples=1,
                         proximity_px=80, bbox_padding=10,
                         row_band_px=150):
    """
    Two-pass clustering:
      Pass 1 — DBSCAN on center points
      Pass 2 — Bounding-box proximity merge

    Token handling per cluster:
      "alpha" tokens → translation text + bbox
      "punct" tokens → bbox included, appended to nearest
                       alpha line by Y distance
                       (e.g. "..." joins "HN" → "HN...")

    Bbox uses flat union of ALL quad corners:
      min/max of all x,y across every quad in the cluster.

    Clusters are numbered from 1 in reading order: by horizontal band of
    height row_band_px (top to bottom), then by leftmost center x.

    Returns:
        bubble_dict : cluster_id → list of text lines
        bbox_dict   : cluster_id → (x1, y1, x2, y2)
        ocr_quads   : cluster_id → list of ALL raw quads
    """
    if not ocr_results:
        return {}, {}, {}

    # Center of each quad = mean of its four corners.
    centers = []
    for bbox, _, _ in ocr_results:
        xs = [pt[0] for pt in bbox]
        ys = [pt[1] for pt in bbox]
        centers.append([sum(xs) / 4, sum(ys) / 4])

    centers_array = np.array(centers, dtype=np.float32)
    db = DBSCAN(eps=eps, min_samples=min_samples, metric="euclidean")
    labels = db.fit_predict(centers_array)

    raw_clusters = {}
    raw_quads = {}
    # Points labelled -1 each become their own cluster, numbered after the
    # largest label DBSCAN assigned.
    noise_counter = int(max(labels, default=0)) + 1

    for idx, label in enumerate(labels):
        label = int(label)
        if label == -1:
            label = noise_counter
            noise_counter += 1
        bbox, text, _ = ocr_results[idx]
        raw_clusters.setdefault(label, []).append(
            (centers[idx][1], centers[idx][0], text))
        raw_quads.setdefault(label, []).append(bbox)

    print(f" DBSCAN pass: {len(raw_clusters)} cluster(s)")

    merged_clusters, merged_quads = merge_nearby_clusters(
        raw_clusters, raw_quads, proximity_px=proximity_px
    )
    print(f" After merge: {len(merged_clusters)} cluster(s)")

    def cluster_sort_key(items):
        return (min(cy for cy, cx, _ in items) // row_band_px,
                min(cx for cy, cx, _ in items))

    sorted_labels = sorted(
        merged_clusters.keys(),
        key=lambda lbl: cluster_sort_key(merged_clusters[lbl])
    )

    bubble_dict = {}
    bbox_dict = {}
    ocr_quads = {}

    for i, lbl in enumerate(sorted_labels, start=1):
        items = merged_clusters[lbl]
        quads = merged_quads[lbl]
        items_sorted = sorted(items, key=lambda t: t[0])

        # ── Separate alpha and punct tokens ───────────────────────
        alpha_lines = []   # (cy, text)
        punct_tokens = []  # (cy, text)
        for cy, cx, text in items_sorted:
            if any(ch.isalpha() for ch in text):
                alpha_lines.append((cy, text))
            else:
                punct_tokens.append((cy, text))

        # ── Append punct to closest alpha line by Y ───────────────
        for pcy, ptext in punct_tokens:
            if alpha_lines:
                closest_idx = min(
                    range(len(alpha_lines)),
                    key=lambda k: abs(alpha_lines[k][0] - pcy)
                )
                cy_a, text_a = alpha_lines[closest_idx]
                alpha_lines[closest_idx] = (cy_a, text_a + ptext)

        text_lines = [t for _, t in alpha_lines]

        # Fallback: no alpha at all → keep everything as-is
        if not text_lines:
            text_lines = [text for _, _, text in items_sorted]

        bubble_dict[i] = text_lines
        ocr_quads[i] = quads  # ALL quads → full bbox coverage
        bbox_dict[i] = get_cluster_bbox_from_ocr(
            quads, image_shape, padding_px=bbox_padding
        )

        b = bbox_dict[i]
        print(f" Cluster #{i}: {len(quads)} quad(s) "
              f"bbox=({int(b[0])},{int(b[1])})→"
              f"({int(b[2])},{int(b[3])}) "
              f"w={int(b[2]-b[0])} h={int(b[3]-b[1])} "
              f"text={text_lines}")

    return bubble_dict, bbox_dict, ocr_quads


# ─────────────────────────────────────────────
# HYPHEN REMOVAL
# ─────────────────────────────────────────────

def fix_hyphens(lines):
    """
    Joins lines, merging mid-word hyphens.
    e.g. ["GRAVEMEN-", "TE"] → "GRAVEMENTE"

    Lines not ending in "-" are joined with a single space; runs of
    spaces are collapsed. Returns "" for an empty list.
    """
    if not lines:
        return ""

    merged = lines[0]
    for line in lines[1:]:
        line = line.strip()
        if merged.endswith("-"):
            merged = merged[:-1] + line
        else:
            merged = merged + " " + line

    return re.sub(r" {2,}", " ", merged).strip()


# ─────────────────────────────────────────────
# AUTO EPS
# ─────────────────────────────────────────────

def compute_auto_eps(image_path, base_eps=80, reference_width=750):
    """Scale base_eps by image width relative to reference_width.

    Returns base_eps unchanged when the image cannot be loaded.
    """
    image = cv2.imread(image_path)
    if image is None:
        return base_eps

    img_w = image.shape[1]
    scaled = base_eps * (img_w / reference_width)
    print(f" ℹ️ Image width: {img_w}px → auto eps: {scaled:.1f}px")
    return scaled


# ─────────────────────────────────────────────
# OCR QUALITY SCORE
# ─────────────────────────────────────────────

def ocr_quality_score(text):
    """Return a 0.0–1.0 heuristic score for OCR output quality.

    The score is the fraction of alphabetic characters minus 0.2 for each
    garbage pattern found, clamped to [0, 1]. Empty or one-character text
    scores 0.0.
    """
    if not text or len(text) < 2:
        return 0.0

    alpha_ratio = sum(1 for c in text if c.isalpha()) / len(text)
    garbage = [r",,", r"\.\.-", r"[^\w\s\'\!\?\.,-]{2,}"]
    penalty = sum(0.2 for p in garbage if re.search(p, text))
    return max(0.0, min(1.0, alpha_ratio - penalty))


# ─────────────────────────────────────────────
# BUBBLE JSON EXPORT
# ─────────────────────────────────────────────

def export_bubble_boxes(bbox_dict, ocr_quads_dict,
                        filepath="bubbles.json"):
    """Write bubble boxes and their quads to *filepath* as JSON.

    Each bubble id (as a string key) maps to integer "x", "y", "w", "h"
    and a "quads" list of integer [x, y] corner points. A summary of the
    exported boxes is printed afterwards.
    """
    export = {}
    for bubble_id, (x1, y1, x2, y2) in bbox_dict.items():
        quads = ocr_quads_dict.get(bubble_id, [])
        export[str(bubble_id)] = {
            "x": int(x1),
            "y": int(y1),
            "w": int(x2 - x1),
            "h": int(y2 - y1),
            "quads": [[[int(pt[0]), int(pt[1])] for pt in quad]
                      for quad in quads],
        }

    with open(filepath, "w", encoding="utf-8") as f:
        json.dump(export, f, indent=2, ensure_ascii=False)

    print(f"\n📦 Bubble boxes saved → {filepath}")
    for bid, v in export.items():
        print(f" #{bid}: ({v['x']},{v['y']}) "
              f"{v['w']}×{v['h']}px "
              f"[{len(v['quads'])} quad(s)]")


# ─────────────────────────────────────────────
# DEBUG CLUSTER IMAGE
# ─────────────────────────────────────────────

def save_debug_clusters(image_path, ocr_results, bubble_dict, bbox_dict):
    """Draw OCR quads and bubble boxes onto the page → debug_clusters.png.

    Does nothing when the image cannot be loaded. A quad is coloured by
    looking its text up among the bubble lines; text that is not found
    verbatim (for example a line that had punctuation appended during
    clustering) falls back to bubble id 0.
    """
    image = cv2.imread(image_path)
    if image is None:
        return

    # Private fixed-seed generator: stable colours between runs without
    # reseeding NumPy's global random state for the rest of the program.
    rng = np.random.RandomState(42)
    num_bubbles = max(bubble_dict.keys(), default=1)
    colors = [
        tuple(int(c) for c in col)
        for col in rng.randint(50, 230, size=(num_bubbles + 2, 3))
    ]

    text_to_bubble = {}
    for bubble_id, lines in bubble_dict.items():
        for line in lines:
            text_to_bubble[line] = bubble_id

    for bbox, text, _ in ocr_results:
        bubble_id = text_to_bubble.get(text, 0)
        color = colors[(bubble_id - 1) % len(colors)]
        pts = np.array(bbox, dtype=np.int32)
        cv2.polylines(image, [pts], isClosed=True,
                      color=color, thickness=1)

    for bubble_id, (x1, y1, x2, y2) in bbox_dict.items():
        color = colors[(bubble_id - 1) % len(colors)]
        cv2.rectangle(image, (int(x1), int(y1)),
                      (int(x2), int(y2)), color, 2)
        cv2.putText(image, f"BOX#{bubble_id}",
                    (int(x1) + 2, int(y1) + 16),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

    cv2.imwrite("debug_clusters.png", image)
    print(" 🐛 debug_clusters.png saved")


# ─────────────────────────────────────────────
# CORE FUNCTION
# ─────────────────────────────────────────────

def translate_manga_text(
    image_path,
    source_lang="it",
    target_lang="ca",
    confidence_threshold=0.10,
    export_to_file=None,
    export_bubbles_to="bubbles.json",
    min_text_length=2,
    cluster_eps="auto",
    proximity_px=80,
    filter_sound_effects=True,
    quality_threshold=0.5,
    upscale_factor=2.5,
    bbox_padding=10,
    debug=False,
):
    """Run the full OCR → cluster → translate → export pipeline.

    Args:
        image_path           : Path of the page image
        source_lang          : Language code of the page text
        target_lang          : Language code to translate into
        confidence_threshold : Minimum OCR confidence to keep a token
        export_to_file       : Optional path for the translation table
        export_bubbles_to    : Optional path for the bubble-box JSON
        min_text_length      : Minimum token length to keep
        cluster_eps          : DBSCAN eps in px, or "auto" to scale it
                               from the image width
        proximity_px         : Distance used when merging clusters
        filter_sound_effects : Drop tokens matching sound-effect patterns
        quality_threshold    : Bubbles scoring below this are re-read
                               from an upscaled crop
        upscale_factor       : Upscale factor for the crop re-read
        bbox_padding         : Padding added around each bubble box
        debug                : Also write debug_clusters.png

    Returns:
        None. Results are printed and optionally written to files.
    """
    # ── 1. Resolve eps ────────────────────────────────────────────
    if cluster_eps == "auto":
        print("Computing auto eps...")
        eps = compute_auto_eps(image_path)
    else:
        eps = float(cluster_eps)

    # ── 2. Load full image ────────────────────────────────────────
    full_image = cv2.imread(image_path)
    if full_image is None:
        print(f"❌ Could not load image: {image_path}")
        return

    # ── 3. Initialize OCR ─────────────────────────────────────────
    print("\nLoading OCR model...")
    # Catalan pages are read with the English + Spanish models.
    ocr_lang_list = ["en", "es"] if source_lang == "ca" \
        else [source_lang]
    reader = easyocr.Reader(ocr_lang_list)

    # ── 4. Initialize translator ──────────────────────────────────
    translator = GoogleTranslator(
        source=_TRANSLATOR_LANG_CODES.get(source_lang, source_lang),
        target=_TRANSLATOR_LANG_CODES.get(target_lang, target_lang),
    )

    # ── 5. Run OCR ────────────────────────────────────────────────
    print(f"\nRunning OCR on: {image_path}")
    results = reader.readtext(image_path, paragraph=False)
    print(f" Raw detections: {len(results)}")

    # ── 6. Filter tokens ──────────────────────────────────────────
    filtered = []
    skipped = 0
    for bbox, text, confidence in results:
        cleaned = text.strip()
        keep, category = should_keep_token(
            cleaned, confidence, confidence_threshold,
            min_text_length, filter_sound_effects
        )
        if keep:
            filtered.append((bbox, cleaned, confidence))
            if category == "punct":
                print(f" ✔ Punct kept: '{cleaned}'")
        else:
            # Only report an SFX skip when the SFX filter is active.
            if filter_sound_effects and is_sound_effect(cleaned):
                print(f" 🔇 SFX skipped: '{cleaned}'")
            skipped += 1

    print(f" ✅ {len(filtered)} kept, {skipped} skipped.\n")

    if not filtered:
        print("⚠️ No text detected after filtering.")
        return

    # ── 7. Cluster + merge ────────────────────────────────────────
    print(f"Clustering (eps={eps:.1f}px, "
          f"proximity={proximity_px}px, "
          f"bbox_padding={bbox_padding}px)...")
    bubble_dict, bbox_dict, ocr_quads = cluster_into_bubbles(
        filtered,
        image_shape=full_image.shape,
        eps=eps,
        proximity_px=proximity_px,
        bbox_padding=bbox_padding,
    )
    print(f" ✅ {len(bubble_dict)} bubble(s) after merge.\n")

    # ── 8. Debug clusters ─────────────────────────────────────────
    if debug:
        save_debug_clusters(image_path, filtered,
                            bubble_dict, bbox_dict)

    # ── 9. Fix hyphens ────────────────────────────────────────────
    clean_bubbles = {
        i: fix_hyphens(lines)
        for i, lines in bubble_dict.items()
        if lines
    }

    # ── 10. Quality check + crop re-read ──────────────────────────
    print("Checking OCR quality per bubble...")
    # Iterate over a snapshot because entries are replaced in the loop.
    for i, text in list(clean_bubbles.items()):
        score = ocr_quality_score(text)
        status = "✅" if score >= quality_threshold else "🔁"
        print(f" #{i}: score={score:.2f} {status} "
              f"'{text[:55]}'")

        if score < quality_threshold:
            print(f" → Re-reading #{i} from crop...")
            reread = reread_cluster_crop(
                full_image, bbox_dict[i], reader, source_lang,
                upscale_factor=upscale_factor,
            )
            if reread:
                print(f" → '{reread}'")
                clean_bubbles[i] = reread
            else:
                print(" → Nothing found, keeping original.")

    # ── 11. Translate & print ─────────────────────────────────────
    print()
    # Column titles follow the requested languages instead of being
    # fixed to Italian/Catalan.
    original_col = f"ORIGINAL ({_language_name(source_lang)})"
    translated_col = f"TRANSLATED ({_language_name(target_lang)})"
    header = f"{'BUBBLE':<8} {original_col:<50} {translated_col}"
    divider = "─" * 105
    output_lines = [header, divider]
    print(header)
    print(divider)

    translated_count = 0
    for i in sorted(clean_bubbles.keys()):
        bubble_text = clean_bubbles[i].strip()
        if not bubble_text:
            continue

        # Best-effort: one failing bubble must not abort the page, so the
        # error is shown in the table instead of being raised.
        try:
            translated = translator.translate(bubble_text)
        except Exception as e:
            translated = f"[Translation error: {e}]"
        else:
            if translated is None:
                translated = "[No translation returned]"
            else:
                # Count only bubbles that actually came back translated.
                translated_count += 1

        line = f"#{i:<7} {bubble_text:<50} {translated}"
        print(line)
        output_lines.append(line)

    output_lines.append(divider)
    summary = (f"✅ Done! {translated_count} bubble(s) "
               f"translated, {skipped} detection(s) skipped.")
    output_lines.append(summary)
    print(divider)
    print(summary)

    # ── 12. Export translations ───────────────────────────────────
    if export_to_file:
        with open(export_to_file, "w", encoding="utf-8") as f:
            f.write("\n".join(output_lines))
        print(f"📄 Translations saved → {export_to_file}")

    # ── 13. Export bubble boxes ───────────────────────────────────
    if export_bubbles_to:
        export_bubble_boxes(bbox_dict, ocr_quads,
                            filepath=export_bubbles_to)


# ─────────────────────────────────────────────
# HELPER
# ─────────────────────────────────────────────

def list_languages():
    """Print the SUPPORTED_LANGUAGES table (name and code)."""
    print(f"\n{'LANGUAGE':<30} {'CODE'}")
    print("─" * 40)
    for name, code in SUPPORTED_LANGUAGES.items():
        print(f"{name:<30} {code}")
    print("─" * 40)


# ─────────────────────────────────────────────
# ENTRY POINT
# ─────────────────────────────────────────────

if __name__ == "__main__":
    translate_manga_text(
        image_path="page.png",
        source_lang="it",
        target_lang="ca",
        confidence_threshold=0.10,
        min_text_length=2,
        export_to_file="output.txt",
        export_bubbles_to="bubbles.json",
        cluster_eps="auto",
        proximity_px=80,
        filter_sound_effects=True,
        quality_threshold=0.5,
        upscale_factor=2.5,
        bbox_padding=5,
        debug=True,
    )