From 526272fe30522534f3b6025116933bfeb695ef54 Mon Sep 17 00:00:00 2001 From: Guillem Hernandez Sola Date: Mon, 13 Apr 2026 17:40:19 +0200 Subject: [PATCH] fix(sync): sanitize visible tweet URLs by resolving t.co links and removing concatenated duplicates 2 --- twitter2bsky_daemon.py | 170 +++++++++++++++++++++++++---------------- 1 file changed, 104 insertions(+), 66 deletions(-) diff --git a/twitter2bsky_daemon.py b/twitter2bsky_daemon.py index 299b110..2a43dc6 100644 --- a/twitter2bsky_daemon.py +++ b/twitter2bsky_daemon.py @@ -96,10 +96,6 @@ def strip_trailing_url_punctuation(url): def split_concatenated_urls(text): - """ - Insert whitespace between concatenated URLs like: - https://t.co/aaahttps://t.co/bbb - """ if not text: return text @@ -290,6 +286,23 @@ def canonicalize_url(url): return strip_trailing_url_punctuation(url.strip()) +def normalize_urlish_token(token): + if not token: + return None + + token = strip_trailing_url_punctuation(token.strip()) + if not token: + return None + + if token.startswith(("http://", "https://")): + return token + + if token.startswith("www."): + return f"https://{token}" + + return None + + def canonicalize_tweet_url(url): if not url: return None @@ -306,7 +319,8 @@ def canonicalize_tweet_url(url): def is_x_or_twitter_domain(url): try: - hostname = (urlparse(url).hostname or "").lower() + normalized = normalize_urlish_token(url) or url + hostname = (urlparse(normalized).hostname or "").lower() return hostname in {"x.com", "www.x.com", "twitter.com", "www.twitter.com", "mobile.twitter.com"} except Exception: return False @@ -314,7 +328,8 @@ def is_x_or_twitter_domain(url): def is_tco_domain(url): try: - hostname = (urlparse(url).hostname or "").lower() + normalized = normalize_urlish_token(url) or url + hostname = (urlparse(normalized).hostname or "").lower() return hostname == "t.co" except Exception: return False @@ -331,7 +346,9 @@ def extract_urls_from_text(text): return [] repaired = repair_broken_urls(text) - return re.findall(r"https?://[^\s#]+", repaired) + + pattern = r'(?:(?:https?://)|(?:www\.))[^\s<>"\']+' + return re.findall(pattern, repaired) def extract_quoted_text_from_og_title(og_title): @@ -510,7 +527,8 @@ def resolve_url_if_needed(url, http_client): if not url: return None - cleaned = canonicalize_url(url) + normalized = normalize_urlish_token(url) or url + cleaned = canonicalize_url(normalized) if not cleaned: return None @@ -537,7 +555,8 @@ def extract_non_x_urls_from_text(text): result = [] for url in urls: - cleaned = strip_trailing_url_punctuation(url) + normalized = normalize_urlish_token(url) + cleaned = strip_trailing_url_punctuation(normalized or url) if not cleaned: continue @@ -587,71 +606,94 @@ def extract_first_resolved_external_url(text, http_client): def sanitize_visible_urls_in_text(text, http_client): """ - Resolve visible t.co URLs in the text, split malformed concatenations, - and deduplicate repeated URLs. + Resolve visible t.co URLs in the text, remove x.com/twitter.com URLs from + visible text, normalize www. URLs, and deduplicate repeated external URLs. """ if not text: return text, None working = clean_post_text(text) - urls = extract_urls_from_text(working) + url_pattern = r'(?:(?:https?://)|(?:www\.))[^\s<>"\']+' + urls = re.findall(url_pattern, working) if not urls: return working, None replacements = {} first_external_resolved = None - seen_final_urls = set() + seen_external_per_line = set() for raw_url in urls: - cleaned = canonicalize_url(raw_url) + normalized = normalize_urlish_token(raw_url) + cleaned = canonicalize_url(normalized or raw_url) if not cleaned: continue + if is_x_or_twitter_domain(cleaned): + replacements[raw_url] = "" + logging.info(f"🧹 Removing X/Twitter URL from visible text: {cleaned}") + continue + final_url = cleaned if is_tco_domain(cleaned): resolved = resolve_url_if_needed(cleaned, http_client) if resolved: final_url = resolved + if is_x_or_twitter_domain(final_url): + replacements[raw_url] = "" + logging.info(f"🧹 Removing resolved X/Twitter URL from visible text: {final_url}") + continue + + if normalized and normalized.startswith("https://www."): + final_url = normalized + elif normalized and normalized.startswith("http://www."): + final_url = normalized + if is_external_non_x_url(final_url) and not first_external_resolved: first_external_resolved = final_url - replacements[cleaned] = final_url + replacements[raw_url] = final_url def replace_match(match): raw = match.group(0) - cleaned = canonicalize_url(raw) - replacement = replacements.get(cleaned, raw) - return replacement + return replacements.get(raw, raw) - working = re.sub(r"https?://[^\s#]+", replace_match, working) + working = re.sub(url_pattern, replace_match, working) - # Deduplicate same visible URL repeated back to back or multiple times. deduped_lines = [] for line in working.splitlines(): - line_urls = re.findall(r"https?://[^\s#]+", line) + line_urls = re.findall(url_pattern, line) if len(line_urls) > 1: - rebuilt = line - unique_urls = [] - for url in line_urls: - c = canonicalize_url(url) - if c and c not in seen_final_urls: - unique_urls.append(url) - seen_final_urls.add(c) + prefix = re.sub(url_pattern, "", line).strip() + kept_urls = [] - if unique_urls: - prefix = re.sub(r"https?://[^\s#]+", "", line).strip() - if prefix: - rebuilt = prefix + " " + " ".join(unique_urls) - else: - rebuilt = " ".join(unique_urls) + seen_external_per_line.clear() + for url in line_urls: + normalized = normalize_urlish_token(url) or url + canonical = canonicalize_url(normalized) + + if not canonical: + continue + if is_x_or_twitter_domain(canonical): + continue + if canonical in seen_external_per_line: + continue + + seen_external_per_line.add(canonical) + kept_urls.append(url) + + if prefix and kept_urls: + rebuilt = prefix + " " + " ".join(kept_urls) + elif prefix: + rebuilt = prefix else: - rebuilt = re.sub(r"https?://[^\s#]+", "", line).strip() + rebuilt = " ".join(kept_urls) deduped_lines.append(rebuilt.strip()) else: - deduped_lines.append(line.strip()) + cleaned_line = re.sub(r"\s{2,}", " ", line).strip() + deduped_lines.append(cleaned_line) working = "\n".join(deduped_lines) working = re.sub(r"[ \t]+", " ", working) @@ -698,7 +740,8 @@ def remove_url_from_visible_text(text, url_to_remove): new_line = line for url in line_urls: - cleaned_candidate = canonicalize_url(strip_trailing_url_punctuation(url)) + normalized = normalize_urlish_token(url) or url + cleaned_candidate = canonicalize_url(strip_trailing_url_punctuation(normalized)) if cleaned_candidate == canonical_target: pattern = re.escape(url) new_line = re.sub(pattern, "", new_line) @@ -727,7 +770,7 @@ def looks_like_title_plus_url_post(text): urls_in_last_line = extract_ordered_non_x_urls(last_line) total_urls = extract_ordered_non_x_urls(repaired) - return len(urls_in_last_line) == 1 and len(total_urls) == 1 and last_line.startswith(("http://", "https://")) + return len(urls_in_last_line) == 1 and len(total_urls) == 1 and last_line.startswith(("http://", "https://", "www.")) def looks_like_url_and_tag_tail(text, primary_non_x_url=None): @@ -740,10 +783,10 @@ def looks_like_url_and_tag_tail(text, primary_non_x_url=None): return False tail = repaired[idx:].strip() - if not tail.startswith(("http://", "https://")): + if not tail.startswith(("http://", "https://", "www.")): return False - if re.search(r"https?://\S+.*#[^\s#]+", tail): + if re.search(r"(?:https?://|www\.)\S+.*#[^\s#]+", tail): return True return False @@ -1147,7 +1190,7 @@ def get_recent_bsky_posts(client, handle, limit=30): canonical_non_x_urls = set() for url in urls: if not is_tco_domain(url) and not is_x_or_twitter_domain(url): - canonical = canonicalize_url(url) + canonical = canonicalize_url(normalize_urlish_token(url) or url) if canonical: canonical_non_x_urls.add(canonical) @@ -1567,20 +1610,22 @@ def make_rich(content): continue cleaned_word = strip_trailing_url_punctuation(word) + normalized_candidate = normalize_urlish_token(cleaned_word) - if cleaned_word.startswith("http://") or cleaned_word.startswith("https://"): - if cleaned_word.startswith("http://"): - cleaned_word = cleaned_word.replace("http://", "https://", 1) - - clean_url_value = clean_url(cleaned_word) - - if clean_url_value and is_valid_url(clean_url_value): - text_builder.link(clean_url_value, clean_url_value) - trailing = word[len(cleaned_word):] - if trailing: - text_builder.text(trailing) - else: + if normalized_candidate: + if is_x_or_twitter_domain(normalized_candidate): text_builder.text(word) + else: + clean_url_value = clean_url(normalized_candidate) + + if clean_url_value and is_valid_url(clean_url_value): + display_text = cleaned_word + text_builder.link(display_text, clean_url_value) + trailing = word[len(cleaned_word):] + if trailing: + text_builder.text(trailing) + else: + text_builder.text(word) elif cleaned_word.startswith("#") and len(cleaned_word) > 1: clean_tag = cleaned_word[1:].rstrip(".,;:!?)'\"…") @@ -1607,7 +1652,7 @@ def make_rich(content): def build_dynamic_alt(raw_text): dynamic_alt = clean_post_text(raw_text) dynamic_alt = dynamic_alt.replace("\n", " ").strip() - dynamic_alt = re.sub(r"https?://\S+", "", dynamic_alt).strip() + dynamic_alt = re.sub(r"(?:(?:https?://)|(?:www\.))\S+", "", dynamic_alt).strip() if len(dynamic_alt) > 150: dynamic_alt = dynamic_alt[:147] + "..." @@ -2115,18 +2160,11 @@ def sync_feeds(args): has_video = any(getattr(m, "type", None) == "video" for m in (tweet.media or [])) has_photo = any(getattr(m, "type", None) == "photo" for m in (tweet.media or [])) - if primary_non_x_url and not has_video and not has_photo: - raw_text = choose_final_visible_text( - full_clean_text, - primary_non_x_url=primary_non_x_url, - prefer_full_text_without_url=False, - ) - else: - raw_text = choose_final_visible_text( - full_clean_text, - primary_non_x_url=primary_non_x_url, - prefer_full_text_without_url=False, - ) + raw_text = choose_final_visible_text( + full_clean_text, + primary_non_x_url=primary_non_x_url, + prefer_full_text_without_url=False, + ) media_fingerprint = build_media_fingerprint(tweet) text_media_key = build_text_media_key(normalized_text, media_fingerprint)