"""OCR the text of a comic/manga page, group it into speech bubbles, translate it.

Pipeline (see ``translate_manga_text``):
EasyOCR detection → token filtering → DBSCAN clustering + bounding-box merge
→ optional crop re-read of low-quality bubbles → GoogleTranslator → text and
JSON export.
"""

import json
import os
import re

import cv2
import easyocr
import numpy as np
from deep_translator import GoogleTranslator
from sklearn.cluster import DBSCAN

# ─────────────────────────────────────────────
# LANGUAGE CODE REFERENCE
# ─────────────────────────────────────────────

SUPPORTED_LANGUAGES = {
    "Vietnamese": "vi",
    "Japanese": "ja",
    "English": "en",
    "Spanish": "es",
    "Korean": "ko",
    "Chinese (Simplified)": "ch_sim",
    "Chinese (Traditional)": "ch_tra",
    "French": "fr",
    "German": "de",
    "Italian": "it",
    "Portuguese": "pt",
    "Arabic": "ar",
    "Russian": "ru",
    "Thai": "th",
    "Catalan": "ca",
}

# The codes above follow EasyOCR's naming. The two Chinese codes are
# EasyOCR-specific and must be rewritten before being handed to
# GoogleTranslator; every other code is passed through unchanged.
TRANSLATOR_CODE_OVERRIDES = {
    "ch_sim": "zh-CN",
    "ch_tra": "zh-TW",
}


def _translator_code(lang_code):
    """Return the code to give GoogleTranslator for a SUPPORTED_LANGUAGES code."""
    return TRANSLATOR_CODE_OVERRIDES.get(lang_code, lang_code)


def _language_name(lang_code):
    """Return the display name for *lang_code*, or the code itself if unknown."""
    for name, code in SUPPORTED_LANGUAGES.items():
        if code == lang_code:
            return name
    return lang_code


# ─────────────────────────────────────────────
# SOUND EFFECT FILTER
# ─────────────────────────────────────────────

SOUND_EFFECT_PATTERNS = [
    r"^b+i+p+$", r"^sha+$", r"^ha+$", r"^ah+$", r"^oh+$",
    r"^ugh+$", r"^gr+$", r"^bam+$", r"^pow+$", r"^crash+$",
    r"^boom+$", r"^bang+$", r"^crack+$", r"^whoosh+$", r"^thud+$",
    r"^snap+$", r"^zip+$", r"^swoosh+$",
]


def is_sound_effect(text):
    """Return True if *text* matches one of SOUND_EFFECT_PATTERNS.

    The text is lower-cased and everything except ``a``-``z`` is removed
    before matching, so punctuation and digits never prevent a match.
    """
    cleaned = re.sub(r"[^a-z]", "", text.strip().lower())
    # `cleaned` is already lower-case, so no IGNORECASE flag is needed.
    return any(re.fullmatch(p, cleaned) for p in SOUND_EFFECT_PATTERNS)


# ─────────────────────────────────────────────
# TOKEN FILTER
# ─────────────────────────────────────────────

def should_keep_token(text, confidence, confidence_threshold,
                      min_text_length, filter_sound_effects):
    """Decide whether a single OCR detection is kept.

    Rules, applied in order:
        1. Drop if confidence is below ``confidence_threshold``
        2. Drop if shorter than ``min_text_length`` (after stripping)
        3. Drop pure digit strings
        4. Drop single non-alphabetic characters
        5. Drop sound effects when ``filter_sound_effects`` is true
        6. Keep everything else

    Returns:
        (keep, reason) — ``reason`` is ``"ok"`` when kept, otherwise a short
        description of the rule that rejected the token.
    """
    cleaned = text.strip()
    if confidence < confidence_threshold:
        return False, f"low confidence ({confidence:.2f})"
    if len(cleaned) < min_text_length:
        return False, "too short"
    if re.fullmatch(r"\d+", cleaned):
        return False, "pure digits"
    if len(cleaned) == 1 and not cleaned.isalpha():
        return False, "single symbol"
    if filter_sound_effects and is_sound_effect(cleaned):
        return False, "sound effect"
    return True, "ok"


# ─────────────────────────────────────────────
# BOUNDING BOX
#
# Rules (match the red square exactly):
#   Width  = widest single quad's width
#   Height = sum of ALL quad heights stacked
#   X      = centered on the widest quad's CX
#   Y      = topmost Y1 of all quads
# ─────────────────────────────────────────────

def get_cluster_bbox_from_ocr(ocr_bboxes, image_shape, padding_px=10):
    """Compute the erase rectangle for one bubble from its OCR quads.

    Steps:
        1. Per quad: measure width, height and horizontal center
        2. Width  = width of the widest single quad
        3. Height = sum of every quad's height
        4. X      = widest quad's center ± max_w / 2
        5. Y      = top of the topmost quad; bottom = Y + total height

    Args:
        ocr_bboxes: list of quads, each an iterable of (x, y) points.
        image_shape: (height, width, ...) used for clamping.
        padding_px: expansion applied on every side before clamping.

    Returns:
        (x1, y1, x2, y2) clamped to the image bounds; (0, 0, 0, 0) when
        ``ocr_bboxes`` is empty. Coordinates may be floats.
    """
    img_h, img_w = image_shape[:2]

    if not ocr_bboxes:
        return 0, 0, 0, 0

    # ── Per-quad metrics ──────────────────────────────────────────
    quad_metrics = []
    for quad in ocr_bboxes:
        xs = [pt[0] for pt in quad]
        ys = [pt[1] for pt in quad]
        qx1, qx2 = min(xs), max(xs)
        qy1, qy2 = min(ys), max(ys)
        quad_metrics.append({
            "x1": qx1,
            "x2": qx2,
            "y1": qy1,
            "y2": qy2,
            "w": qx2 - qx1,
            "h": qy2 - qy1,
            "cx": (qx1 + qx2) / 2.0,
        })

    # ── Width: widest single quad ─────────────────────────────────
    widest = max(quad_metrics, key=lambda q: q["w"])
    max_w = widest["w"]
    center_x = widest["cx"]

    # ── Height: sum of all quad heights ──────────────────────────
    total_h = sum(q["h"] for q in quad_metrics)

    # ── Box edges ─────────────────────────────────────────────────
    box_x1 = center_x - max_w / 2.0
    box_x2 = center_x + max_w / 2.0
    box_y1 = min(q["y1"] for q in quad_metrics)
    box_y2 = box_y1 + total_h

    # ── Padding + clamp ───────────────────────────────────────────
    x1 = max(0, box_x1 - padding_px)
    y1 = max(0, box_y1 - padding_px)
    x2 = min(img_w, box_x2 + padding_px)
    y2 = min(img_h, box_y2 + padding_px)

    return x1, y1, x2, y2


def get_cluster_bbox(items):
    """Fallback center-point bbox — used only during the merge step.

    ``items`` is a list of ``(cy, cx, text)`` tuples; the box spans all
    center points, expanded by a fixed 30 px on every side.
    """
    half = 30
    x1 = min(cx for _, cx, _ in items) - half
    y1 = min(cy for cy, _, _ in items) - half
    x2 = max(cx for _, cx, _ in items) + half
    y2 = max(cy for cy, _, _ in items) + half
    return x1, y1, x2, y2


def boxes_are_close(bbox_a, bbox_b, proximity_px=80):
    """Return True if *bbox_b* intersects *bbox_a* grown by ``proximity_px``."""
    ax1, ay1, ax2, ay2 = bbox_a
    bx1, by1, bx2, by2 = bbox_b
    ax1 -= proximity_px
    ay1 -= proximity_px
    ax2 += proximity_px
    ay2 += proximity_px
    return not (ax2 < bx1 or bx2 < ax1 or ay2 < by1 or by2 < ay1)


# ─────────────────────────────────────────────
# TEXT LINE FILTER
# ─────────────────────────────────────────────

def has_translatable_content(text):
    """Return True if *text* contains at least one letter.

    ``str.isalpha`` is Unicode-aware, so accented letters (È, é, ñ, ü …)
    count as letters.
    """
    return any(ch.isalpha() for ch in text)


# ─────────────────────────────────────────────
# POST-CLUSTER MERGE (Union-Find)
# ─────────────────────────────────────────────

def merge_nearby_clusters(raw_clusters, raw_quads, proximity_px=80):
    """Merge clusters whose fallback bounding boxes lie within ``proximity_px``.

    Args:
        raw_clusters: label → list of ``(cy, cx, text)`` items.
        raw_quads: label → list of OCR quads (same labels as above).
        proximity_px: distance passed to ``boxes_are_close``.

    Returns:
        (merged_clusters, merged_quads), both keyed by the union-find root
        label of each merged group.
    """
    labels = list(raw_clusters.keys())
    bboxes = {lbl: get_cluster_bbox(raw_clusters[lbl]) for lbl in labels}
    parent = {lbl: lbl for lbl in labels}

    def find(x):
        # Path halving keeps the trees shallow.
        while parent[x] != x:
            parent[x] = parent[parent[x]]
            x = parent[x]
        return x

    def union(x, y):
        parent[find(x)] = find(y)

    for i, a in enumerate(labels):
        for b in labels[i + 1:]:
            if boxes_are_close(bboxes[a], bboxes[b], proximity_px):
                union(a, b)

    merged_clusters = {}
    merged_quads = {}
    for lbl in labels:
        root = find(lbl)
        merged_clusters.setdefault(root, []).extend(raw_clusters[lbl])
        merged_quads.setdefault(root, []).extend(raw_quads[lbl])

    return merged_clusters, merged_quads


# ─────────────────────────────────────────────
# CROP-BASED OCR RE-READ
# ─────────────────────────────────────────────

def reread_cluster_crop(image, bbox, reader, source_lang,
                        padding_px=20, upscale_factor=2.5):
    """Re-run OCR on an upscaled, sharpened crop of one bubble.

    Args:
        image: full page as an OpenCV (NumPy) image.
        bbox: (x1, y1, x2, y2) of the bubble in page coordinates.
        reader: object exposing ``readtext`` (an ``easyocr.Reader``).
        source_lang: unused; kept for interface compatibility.
        padding_px: extra margin added around ``bbox`` before cropping.
        upscale_factor: resize factor applied to the crop.

    Returns:
        The re-read text with lines joined by ``fix_hyphens``, or ``None``
        when the crop is empty or OCR finds no text.
    """
    img_h, img_w = image.shape[:2]
    x1, y1, x2, y2 = bbox
    x1 = max(0, int(x1) - padding_px)
    y1 = max(0, int(y1) - padding_px)
    x2 = min(img_w, int(x2) + padding_px)
    y2 = min(img_h, int(y2) + padding_px)

    crop = image[y1:y2, x1:x2]
    if crop.size == 0:
        return None

    # Never ask cv2.resize for a 0-pixel dimension (tiny crop / small factor).
    new_w = max(1, int(crop.shape[1] * upscale_factor))
    new_h = max(1, int(crop.shape[0] * upscale_factor))
    upscaled = cv2.resize(crop, (new_w, new_h),
                          interpolation=cv2.INTER_CUBIC)

    kernel = np.array([[0, -1, 0], [-1, 5, -1], [0, -1, 0]])
    sharpened = cv2.filter2D(upscaled, -1, kernel)

    # Hand the array to EasyOCR directly instead of round-tripping through a
    # fixed-name temp file in the working directory: no disk I/O, no name
    # collision between concurrent runs, no failure in a read-only directory.
    crop_results = reader.readtext(sharpened, paragraph=False)
    if not crop_results:
        return None

    # Order detections top-to-bottom by the y of each quad's first point.
    crop_results.sort(key=lambda r: r[0][0][1])
    lines = [t.strip() for _, t, _ in crop_results if t.strip()]
    return fix_hyphens(lines) if lines else None


# ─────────────────────────────────────────────
# DBSCAN BUBBLE CLUSTERING
# ─────────────────────────────────────────────

def cluster_into_bubbles(ocr_results, image_shape, eps=80, min_samples=1,
                         proximity_px=80, bbox_padding=10):
    """Group OCR detections into speech bubbles.

    Two-pass clustering:
        Pass 1 — DBSCAN on quad center points
        Pass 2 — bounding-box proximity merge

    Bubbles are numbered from 1, ordered by 150 px row band and then by
    leftmost center. All quads contribute to the bbox regardless of content.

    Args:
        ocr_results: list of ``(quad, text, confidence)`` tuples.
        image_shape: (height, width, ...) used to clamp the bboxes.
        eps, min_samples: DBSCAN parameters.
        proximity_px: merge distance for pass 2.
        bbox_padding: padding passed to ``get_cluster_bbox_from_ocr``.

    Returns:
        bubble_dict: cluster_id → list of translatable text lines
            (all lines when none contains a letter)
        bbox_dict:   cluster_id → (x1, y1, x2, y2)
        ocr_quads:   cluster_id → list of ALL raw OCR quads
    """
    if not ocr_results:
        return {}, {}, {}

    centers = []
    for quad, _text, _confidence in ocr_results:
        xs = [pt[0] for pt in quad]
        ys = [pt[1] for pt in quad]
        centers.append([sum(xs) / 4, sum(ys) / 4])

    centers_array = np.array(centers, dtype=np.float32)
    db = DBSCAN(eps=eps, min_samples=min_samples, metric="euclidean")
    labels = db.fit_predict(centers_array)

    raw_clusters = {}
    raw_quads = {}
    # DBSCAN marks noise as -1; give every noise point its own fresh label.
    noise_counter = int(max(labels, default=0)) + 1

    for idx, label in enumerate(labels):
        label = int(label)
        if label == -1:
            label = noise_counter
            noise_counter += 1
        quad, text, _ = ocr_results[idx]
        raw_clusters.setdefault(label, []).append(
            (centers[idx][1], centers[idx][0], text))
        raw_quads.setdefault(label, []).append(quad)

    print(f" DBSCAN pass: {len(raw_clusters)} cluster(s)")

    merged_clusters, merged_quads = merge_nearby_clusters(
        raw_clusters, raw_quads, proximity_px=proximity_px
    )
    print(f" After merge: {len(merged_clusters)} cluster(s)")

    row_band_px = 150

    def cluster_sort_key(items):
        return (min(cy for cy, cx, _ in items) // row_band_px,
                min(cx for cy, cx, _ in items))

    sorted_labels = sorted(
        merged_clusters.keys(),
        key=lambda lbl: cluster_sort_key(merged_clusters[lbl])
    )

    bubble_dict = {}
    bbox_dict = {}
    ocr_quads = {}

    for i, lbl in enumerate(sorted_labels, start=1):
        items = merged_clusters[lbl]
        quads = merged_quads[lbl]

        # Lines are read top-to-bottom by center y.
        items_sorted = sorted(items, key=lambda t: t[0])
        text_lines = [
            text for _, _, text in items_sorted
            if has_translatable_content(text)
        ]
        if not text_lines:
            text_lines = [text for _, _, text in items_sorted]

        bubble_dict[i] = text_lines
        ocr_quads[i] = quads
        bbox_dict[i] = get_cluster_bbox_from_ocr(
            quads, image_shape, padding_px=bbox_padding
        )

        b = bbox_dict[i]
        print(f" Cluster #{i}: {len(quads)} quad(s) "
              f"bbox=({int(b[0])},{int(b[1])})→"
              f"({int(b[2])},{int(b[3])}) "
              f"w={int(b[2]-b[0])} h={int(b[3]-b[1])}")

    return bubble_dict, bbox_dict, ocr_quads


# ─────────────────────────────────────────────
# HYPHEN REMOVAL
# ─────────────────────────────────────────────

def fix_hyphens(lines):
    """Join *lines* into one string, gluing words split by a trailing hyphen.

    A line ending in ``-`` is concatenated with the next line (hyphen
    dropped); otherwise lines are joined with a single space. Runs of spaces
    are collapsed. Returns ``""`` for an empty list.
    """
    if not lines:
        return ""
    merged = lines[0]
    for line in lines[1:]:
        line = line.strip()
        merged = (merged[:-1] + line if merged.endswith("-")
                  else merged + " " + line)
    return re.sub(r" {2,}", " ", merged).strip()


# ─────────────────────────────────────────────
# AUTO EPS
# ─────────────────────────────────────────────

def compute_auto_eps(image_path, base_eps=80, reference_width=750):
    """Scale ``base_eps`` by image width relative to ``reference_width``.

    Returns ``base_eps`` unchanged when the image cannot be loaded.
    """
    image = cv2.imread(image_path)
    if image is None:
        return base_eps
    img_w = image.shape[1]
    scaled = base_eps * (img_w / reference_width)
    print(f" ℹ️ Image width: {img_w}px → auto eps: {scaled:.1f}px")
    return scaled


# ─────────────────────────────────────────────
# OCR QUALITY SCORE
# ─────────────────────────────────────────────

def ocr_quality_score(text):
    """Heuristic OCR quality in [0.0, 1.0].

    The score is the fraction of alphabetic characters, minus 0.2 for each
    garbage pattern found. Empty or single-character text scores 0.0.
    """
    if not text or len(text) < 2:
        return 0.0
    alpha_ratio = sum(1 for c in text if c.isalpha()) / len(text)
    garbage = [r",,", r"\.\.-", r"[^\w\s\'\!\?\.,-]{2,}"]
    penalty = sum(0.2 for p in garbage if re.search(p, text))
    return max(0.0, min(1.0, alpha_ratio - penalty))


# ─────────────────────────────────────────────
# BUBBLE JSON EXPORT
# ─────────────────────────────────────────────

def export_bubble_boxes(bbox_dict, ocr_quads_dict, filepath="bubbles.json"):
    """Write every bubble's box and raw quads to *filepath* as JSON.

    Each entry is keyed by the bubble id (as a string) and holds integer
    ``x``, ``y``, ``w``, ``h`` plus ``quads`` as lists of ``[x, y]`` points.
    A one-line summary per bubble is printed afterwards.
    """
    export = {}
    for bubble_id, (x1, y1, x2, y2) in bbox_dict.items():
        quads = ocr_quads_dict.get(bubble_id, [])
        export[str(bubble_id)] = {
            "x": int(x1),
            "y": int(y1),
            "w": int(x2 - x1),
            "h": int(y2 - y1),
            "quads": [[[int(pt[0]), int(pt[1])] for pt in quad]
                      for quad in quads],
        }

    with open(filepath, "w", encoding="utf-8") as f:
        json.dump(export, f, indent=2, ensure_ascii=False)

    print(f"\n📦 Bubble boxes saved → {filepath}")
    for bid, v in export.items():
        print(f" #{bid}: ({v['x']},{v['y']}) "
              f"{v['w']}×{v['h']}px [{len(v['quads'])} quad(s)]")


# ─────────────────────────────────────────────
# DEBUG CLUSTER IMAGE
# ─────────────────────────────────────────────

def save_debug_clusters(image_path, ocr_results, bubble_dict, bbox_dict,
                        output_path="debug_clusters.png"):
    """Draw OCR quads and bubble boxes on the page and save the result.

    Quads are coloured by the bubble whose line list contains the same text;
    this lookup is by text only, so identical strings in different bubbles
    share the colour of the last one seen. Does nothing when the image
    cannot be loaded.

    Args:
        image_path: page image to annotate.
        ocr_results: list of ``(quad, text, confidence)`` tuples.
        bubble_dict: bubble id → list of text lines.
        bbox_dict: bubble id → (x1, y1, x2, y2).
        output_path: where the annotated image is written.
    """
    image = cv2.imread(image_path)
    if image is None:
        return

    # A private generator gives the same colours as seeding the global one
    # with 42, without disturbing random state used elsewhere.
    rng = np.random.RandomState(42)
    num_bubbles = max(bubble_dict.keys(), default=1)
    colors = [
        tuple(int(c) for c in col)
        for col in rng.randint(50, 230, size=(num_bubbles + 2, 3))
    ]

    text_to_bubble = {}
    for bubble_id, lines in bubble_dict.items():
        for line in lines:
            text_to_bubble[line] = bubble_id

    for quad, text, _ in ocr_results:
        bubble_id = text_to_bubble.get(text, 0)
        color = colors[(bubble_id - 1) % len(colors)]
        pts = np.array(quad, dtype=np.int32)
        cv2.polylines(image, [pts], isClosed=True, color=color, thickness=1)

    for bubble_id, (x1, y1, x2, y2) in bbox_dict.items():
        color = colors[(bubble_id - 1) % len(colors)]
        cv2.rectangle(image, (int(x1), int(y1)), (int(x2), int(y2)),
                      color, 2)
        cv2.putText(image, f"BOX#{bubble_id}",
                    (int(x1) + 2, int(y1) + 16),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

    cv2.imwrite(output_path, image)
    print(f" 🐛 {output_path} saved")


# ─────────────────────────────────────────────
# CORE FUNCTION
# ─────────────────────────────────────────────

def translate_manga_text(
    image_path,
    source_lang="it",
    target_lang="ca",
    confidence_threshold=0.10,
    export_to_file=None,
    export_bubbles_to="bubbles.json",
    min_text_length=2,
    cluster_eps="auto",
    proximity_px=80,
    filter_sound_effects=True,
    quality_threshold=0.5,
    upscale_factor=2.5,
    bbox_padding=10,
    debug=False,
):
    """OCR a page, group text into bubbles, translate and export the result.

    Args:
        image_path: path of the page image.
        source_lang, target_lang: codes from ``SUPPORTED_LANGUAGES``.
        confidence_threshold, min_text_length, filter_sound_effects:
            token filter settings (see ``should_keep_token``).
        export_to_file: optional path for the translation table.
        export_bubbles_to: optional path for the bubble-box JSON.
        cluster_eps: DBSCAN eps in pixels, or ``"auto"`` to scale by width.
        proximity_px: merge distance between clusters.
        quality_threshold: bubbles scoring below this are re-read from a crop.
        upscale_factor: resize factor for the crop re-read.
        bbox_padding: padding around each bubble box.
        debug: when true, also write the debug cluster image.

    Returns:
        None. Results are printed and optionally written to disk.
    """
    # ── 1. Resolve eps ────────────────────────────────────────────────────────
    if cluster_eps == "auto":
        print("Computing auto eps...")
        eps = compute_auto_eps(image_path)
    else:
        eps = float(cluster_eps)

    # ── 2. Load full image ────────────────────────────────────────────────────
    full_image = cv2.imread(image_path)
    if full_image is None:
        print(f"❌ Could not load image: {image_path}")
        return

    # ── 3. Initialize OCR ─────────────────────────────────────────────────────
    print("\nLoading OCR model...")
    # Catalan pages are read with the English + Spanish models.
    ocr_lang_list = ["en", "es"] if source_lang == "ca" else [source_lang]
    reader = easyocr.Reader(ocr_lang_list)

    # ── 4. Initialize translator ──────────────────────────────────────────────
    # EasyOCR-only codes (ch_sim / ch_tra) are rewritten for the translator.
    translator = GoogleTranslator(source=_translator_code(source_lang),
                                  target=_translator_code(target_lang))

    # ── 5. Run OCR ────────────────────────────────────────────────────────────
    print(f"\nRunning OCR on: {image_path}")
    results = reader.readtext(image_path, paragraph=False)
    print(f" Raw detections: {len(results)}")

    # ── 6. Filter ─────────────────────────────────────────────────────────────
    filtered = []
    skipped = 0

    for bbox, text, confidence in results:
        cleaned = text.strip()
        keep, reason = should_keep_token(
            cleaned, confidence,
            confidence_threshold, min_text_length,
            filter_sound_effects
        )
        if keep:
            filtered.append((bbox, cleaned, confidence))
        else:
            if reason == "sound effect":
                print(f" 🔇 SFX skipped: '{cleaned}'")
            skipped += 1

    print(f" ✅ {len(filtered)} kept, {skipped} skipped.\n")

    if not filtered:
        print("⚠️ No text detected after filtering.")
        return

    # ── 7. Cluster + merge ────────────────────────────────────────────────────
    print(f"Clustering (eps={eps:.1f}px, "
          f"proximity={proximity_px}px, "
          f"bbox_padding={bbox_padding}px)...")
    bubble_dict, bbox_dict, ocr_quads = cluster_into_bubbles(
        filtered,
        image_shape=full_image.shape,
        eps=eps,
        proximity_px=proximity_px,
        bbox_padding=bbox_padding,
    )
    print(f" ✅ {len(bubble_dict)} bubble(s) after merge.\n")

    # ── 8. Debug ──────────────────────────────────────────────────────────────
    if debug:
        save_debug_clusters(image_path, filtered, bubble_dict, bbox_dict)

    # ── 9. Fix hyphens ────────────────────────────────────────────────────────
    clean_bubbles = {
        i: fix_hyphens(lines)
        for i, lines in bubble_dict.items()
        if lines
    }

    # ── 10. Quality check + crop re-read ──────────────────────────────────────
    print("Checking OCR quality per bubble...")
    # Iterate over a snapshot: entries are replaced while looping.
    for i, text in list(clean_bubbles.items()):
        score = ocr_quality_score(text)
        status = "✅" if score >= quality_threshold else "🔁"
        print(f" #{i}: score={score:.2f} {status} '{text[:55]}'")

        if score < quality_threshold:
            print(f" → Re-reading #{i} from crop...")
            reread = reread_cluster_crop(
                full_image, bbox_dict[i], reader, source_lang,
                upscale_factor=upscale_factor,
            )
            if reread:
                print(f" → '{reread}'")
                clean_bubbles[i] = reread
            else:
                print(f" → Nothing found, keeping original.")

    # ── 11. Translate & print ─────────────────────────────────────────────────
    print()
    # Column titles follow the languages actually requested.
    original_label = f"ORIGINAL ({_language_name(source_lang)})"
    translated_label = f"TRANSLATED ({_language_name(target_lang)})"
    header = (f"{'BUBBLE':<8} "
              f"{original_label:<50} "
              f"{translated_label}")
    divider = "─" * 105
    output_lines = [header, divider]

    print(header)
    print(divider)

    translated_count = 0
    for i in sorted(clean_bubbles.keys()):
        bubble_text = clean_bubbles[i].strip()
        if not bubble_text:
            continue

        # Best-effort: one failed bubble must not abort the page, so any
        # translator error is reported in-line instead of being raised.
        try:
            translated = translator.translate(bubble_text)
        except Exception as e:
            translated = f"[Translation error: {e}]"
        else:
            if translated is None:
                translated = "[No translation returned]"
            else:
                # Only real translations count toward the summary.
                translated_count += 1

        line = f"#{i:<7} {bubble_text:<50} {translated}"
        print(line)
        output_lines.append(line)

    output_lines.append(divider)
    summary = (f"✅ Done! {translated_count} bubble(s) translated, "
               f"{skipped} detection(s) skipped.")
    output_lines.append(summary)
    print(divider)
    print(summary)

    # ── 12. Export translations ───────────────────────────────────────────────
    if export_to_file:
        with open(export_to_file, "w", encoding="utf-8") as f:
            f.write("\n".join(output_lines))
        print(f"📄 Translations saved → {export_to_file}")

    # ── 13. Export bubble boxes ───────────────────────────────────────────────
    if export_bubbles_to:
        export_bubble_boxes(bbox_dict, ocr_quads, filepath=export_bubbles_to)


# ─────────────────────────────────────────────
# HELPER
# ─────────────────────────────────────────────

def list_languages():
    """Print the table of supported language names and codes."""
    print(f"\n{'LANGUAGE':<30} {'CODE'}")
    print("─" * 40)
    for name, code in SUPPORTED_LANGUAGES.items():
        print(f"{name:<30} {code}")
    print("─" * 40)


# ─────────────────────────────────────────────
# ENTRY POINT
# ─────────────────────────────────────────────

if __name__ == "__main__":
    translate_manga_text(
        image_path="page.png",
        source_lang="it",
        target_lang="ca",
        confidence_threshold=0.10,
        min_text_length=2,
        export_to_file="output.txt",
        export_bubbles_to="bubbles.json",
        cluster_eps="auto",
        proximity_px=80,
        filter_sound_effects=True,
        quality_threshold=0.5,
        upscale_factor=2.5,
        bbox_padding=0,
        debug=True,
    )