fix(sync): sanitize visible tweet URLs by resolving t.co links and removing concatenated duplicates 2

This commit is contained in:
Guillem Hernandez Sola
2026-04-13 17:40:19 +02:00
parent ba313787b6
commit 526272fe30

View File

@@ -96,10 +96,6 @@ def strip_trailing_url_punctuation(url):
def split_concatenated_urls(text): def split_concatenated_urls(text):
"""
Insert whitespace between concatenated URLs like:
https://t.co/aaahttps://t.co/bbb
"""
if not text: if not text:
return text return text
@@ -290,6 +286,23 @@ def canonicalize_url(url):
return strip_trailing_url_punctuation(url.strip()) return strip_trailing_url_punctuation(url.strip())
def normalize_urlish_token(token):
    """
    Normalize a URL-like token into an absolute URL string.

    The token is whitespace-trimmed and passed through
    strip_trailing_url_punctuation() before being inspected.

    Returns:
        - the cleaned token unchanged when it starts with http:// or https://
        - the cleaned token prefixed with https:// when it starts with www.
        - None for empty input, for input that is empty after cleaning,
          or for anything that does not look like a URL
    """
    # Skip the helper entirely for empty/None input.
    candidate = strip_trailing_url_punctuation(token.strip()) if token else None
    if not candidate:
        return None

    already_absolute = candidate.startswith(("http://", "https://"))
    if already_absolute:
        return candidate

    # Bare "www." hosts are promoted to https; everything else is rejected.
    return f"https://{candidate}" if candidate.startswith("www.") else None
def canonicalize_tweet_url(url): def canonicalize_tweet_url(url):
if not url: if not url:
return None return None
@@ -306,7 +319,8 @@ def canonicalize_tweet_url(url):
def is_x_or_twitter_domain(url): def is_x_or_twitter_domain(url):
try: try:
hostname = (urlparse(url).hostname or "").lower() normalized = normalize_urlish_token(url) or url
hostname = (urlparse(normalized).hostname or "").lower()
return hostname in {"x.com", "www.x.com", "twitter.com", "www.twitter.com", "mobile.twitter.com"} return hostname in {"x.com", "www.x.com", "twitter.com", "www.twitter.com", "mobile.twitter.com"}
except Exception: except Exception:
return False return False
@@ -314,7 +328,8 @@ def is_x_or_twitter_domain(url):
def is_tco_domain(url): def is_tco_domain(url):
try: try:
hostname = (urlparse(url).hostname or "").lower() normalized = normalize_urlish_token(url) or url
hostname = (urlparse(normalized).hostname or "").lower()
return hostname == "t.co" return hostname == "t.co"
except Exception: except Exception:
return False return False
@@ -331,7 +346,9 @@ def extract_urls_from_text(text):
return [] return []
repaired = repair_broken_urls(text) repaired = repair_broken_urls(text)
return re.findall(r"https?://[^\s#]+", repaired)
pattern = r'(?:(?:https?://)|(?:www\.))[^\s<>"\']+'
return re.findall(pattern, repaired)
def extract_quoted_text_from_og_title(og_title): def extract_quoted_text_from_og_title(og_title):
@@ -510,7 +527,8 @@ def resolve_url_if_needed(url, http_client):
if not url: if not url:
return None return None
cleaned = canonicalize_url(url) normalized = normalize_urlish_token(url) or url
cleaned = canonicalize_url(normalized)
if not cleaned: if not cleaned:
return None return None
@@ -537,7 +555,8 @@ def extract_non_x_urls_from_text(text):
result = [] result = []
for url in urls: for url in urls:
cleaned = strip_trailing_url_punctuation(url) normalized = normalize_urlish_token(url)
cleaned = strip_trailing_url_punctuation(normalized or url)
if not cleaned: if not cleaned:
continue continue
@@ -587,71 +606,94 @@ def extract_first_resolved_external_url(text, http_client):
def sanitize_visible_urls_in_text(text, http_client): def sanitize_visible_urls_in_text(text, http_client):
""" """
Resolve visible t.co URLs in the text, split malformed concatenations, Resolve visible t.co URLs in the text, remove x.com/twitter.com URLs from
and deduplicate repeated URLs. visible text, normalize www. URLs, and deduplicate repeated external URLs.
""" """
if not text: if not text:
return text, None return text, None
working = clean_post_text(text) working = clean_post_text(text)
urls = extract_urls_from_text(working) url_pattern = r'(?:(?:https?://)|(?:www\.))[^\s<>"\']+'
urls = re.findall(url_pattern, working)
if not urls: if not urls:
return working, None return working, None
replacements = {} replacements = {}
first_external_resolved = None first_external_resolved = None
seen_final_urls = set() seen_external_per_line = set()
for raw_url in urls: for raw_url in urls:
cleaned = canonicalize_url(raw_url) normalized = normalize_urlish_token(raw_url)
cleaned = canonicalize_url(normalized or raw_url)
if not cleaned: if not cleaned:
continue continue
if is_x_or_twitter_domain(cleaned):
replacements[raw_url] = ""
logging.info(f"🧹 Removing X/Twitter URL from visible text: {cleaned}")
continue
final_url = cleaned final_url = cleaned
if is_tco_domain(cleaned): if is_tco_domain(cleaned):
resolved = resolve_url_if_needed(cleaned, http_client) resolved = resolve_url_if_needed(cleaned, http_client)
if resolved: if resolved:
final_url = resolved final_url = resolved
if is_x_or_twitter_domain(final_url):
replacements[raw_url] = ""
logging.info(f"🧹 Removing resolved X/Twitter URL from visible text: {final_url}")
continue
if normalized and normalized.startswith("https://www."):
final_url = normalized
elif normalized and normalized.startswith("http://www."):
final_url = normalized
if is_external_non_x_url(final_url) and not first_external_resolved: if is_external_non_x_url(final_url) and not first_external_resolved:
first_external_resolved = final_url first_external_resolved = final_url
replacements[cleaned] = final_url replacements[raw_url] = final_url
def replace_match(match): def replace_match(match):
raw = match.group(0) raw = match.group(0)
cleaned = canonicalize_url(raw) return replacements.get(raw, raw)
replacement = replacements.get(cleaned, raw)
return replacement
working = re.sub(r"https?://[^\s#]+", replace_match, working) working = re.sub(url_pattern, replace_match, working)
# Deduplicate same visible URL repeated back to back or multiple times.
deduped_lines = [] deduped_lines = []
for line in working.splitlines(): for line in working.splitlines():
line_urls = re.findall(r"https?://[^\s#]+", line) line_urls = re.findall(url_pattern, line)
if len(line_urls) > 1: if len(line_urls) > 1:
rebuilt = line prefix = re.sub(url_pattern, "", line).strip()
unique_urls = [] kept_urls = []
for url in line_urls:
c = canonicalize_url(url)
if c and c not in seen_final_urls:
unique_urls.append(url)
seen_final_urls.add(c)
if unique_urls: seen_external_per_line.clear()
prefix = re.sub(r"https?://[^\s#]+", "", line).strip() for url in line_urls:
if prefix: normalized = normalize_urlish_token(url) or url
rebuilt = prefix + " " + " ".join(unique_urls) canonical = canonicalize_url(normalized)
else:
rebuilt = " ".join(unique_urls) if not canonical:
continue
if is_x_or_twitter_domain(canonical):
continue
if canonical in seen_external_per_line:
continue
seen_external_per_line.add(canonical)
kept_urls.append(url)
if prefix and kept_urls:
rebuilt = prefix + " " + " ".join(kept_urls)
elif prefix:
rebuilt = prefix
else: else:
rebuilt = re.sub(r"https?://[^\s#]+", "", line).strip() rebuilt = " ".join(kept_urls)
deduped_lines.append(rebuilt.strip()) deduped_lines.append(rebuilt.strip())
else: else:
deduped_lines.append(line.strip()) cleaned_line = re.sub(r"\s{2,}", " ", line).strip()
deduped_lines.append(cleaned_line)
working = "\n".join(deduped_lines) working = "\n".join(deduped_lines)
working = re.sub(r"[ \t]+", " ", working) working = re.sub(r"[ \t]+", " ", working)
@@ -698,7 +740,8 @@ def remove_url_from_visible_text(text, url_to_remove):
new_line = line new_line = line
for url in line_urls: for url in line_urls:
cleaned_candidate = canonicalize_url(strip_trailing_url_punctuation(url)) normalized = normalize_urlish_token(url) or url
cleaned_candidate = canonicalize_url(strip_trailing_url_punctuation(normalized))
if cleaned_candidate == canonical_target: if cleaned_candidate == canonical_target:
pattern = re.escape(url) pattern = re.escape(url)
new_line = re.sub(pattern, "", new_line) new_line = re.sub(pattern, "", new_line)
@@ -727,7 +770,7 @@ def looks_like_title_plus_url_post(text):
urls_in_last_line = extract_ordered_non_x_urls(last_line) urls_in_last_line = extract_ordered_non_x_urls(last_line)
total_urls = extract_ordered_non_x_urls(repaired) total_urls = extract_ordered_non_x_urls(repaired)
return len(urls_in_last_line) == 1 and len(total_urls) == 1 and last_line.startswith(("http://", "https://")) return len(urls_in_last_line) == 1 and len(total_urls) == 1 and last_line.startswith(("http://", "https://", "www."))
def looks_like_url_and_tag_tail(text, primary_non_x_url=None): def looks_like_url_and_tag_tail(text, primary_non_x_url=None):
@@ -740,10 +783,10 @@ def looks_like_url_and_tag_tail(text, primary_non_x_url=None):
return False return False
tail = repaired[idx:].strip() tail = repaired[idx:].strip()
if not tail.startswith(("http://", "https://")): if not tail.startswith(("http://", "https://", "www.")):
return False return False
if re.search(r"https?://\S+.*#[^\s#]+", tail): if re.search(r"(?:https?://|www\.)\S+.*#[^\s#]+", tail):
return True return True
return False return False
@@ -1147,7 +1190,7 @@ def get_recent_bsky_posts(client, handle, limit=30):
canonical_non_x_urls = set() canonical_non_x_urls = set()
for url in urls: for url in urls:
if not is_tco_domain(url) and not is_x_or_twitter_domain(url): if not is_tco_domain(url) and not is_x_or_twitter_domain(url):
canonical = canonicalize_url(url) canonical = canonicalize_url(normalize_urlish_token(url) or url)
if canonical: if canonical:
canonical_non_x_urls.add(canonical) canonical_non_x_urls.add(canonical)
@@ -1567,20 +1610,22 @@ def make_rich(content):
continue continue
cleaned_word = strip_trailing_url_punctuation(word) cleaned_word = strip_trailing_url_punctuation(word)
normalized_candidate = normalize_urlish_token(cleaned_word)
if cleaned_word.startswith("http://") or cleaned_word.startswith("https://"): if normalized_candidate:
if cleaned_word.startswith("http://"): if is_x_or_twitter_domain(normalized_candidate):
cleaned_word = cleaned_word.replace("http://", "https://", 1)
clean_url_value = clean_url(cleaned_word)
if clean_url_value and is_valid_url(clean_url_value):
text_builder.link(clean_url_value, clean_url_value)
trailing = word[len(cleaned_word):]
if trailing:
text_builder.text(trailing)
else:
text_builder.text(word) text_builder.text(word)
else:
clean_url_value = clean_url(normalized_candidate)
if clean_url_value and is_valid_url(clean_url_value):
display_text = cleaned_word
text_builder.link(display_text, clean_url_value)
trailing = word[len(cleaned_word):]
if trailing:
text_builder.text(trailing)
else:
text_builder.text(word)
elif cleaned_word.startswith("#") and len(cleaned_word) > 1: elif cleaned_word.startswith("#") and len(cleaned_word) > 1:
clean_tag = cleaned_word[1:].rstrip(".,;:!?)'\"") clean_tag = cleaned_word[1:].rstrip(".,;:!?)'\"")
@@ -1607,7 +1652,7 @@ def make_rich(content):
def build_dynamic_alt(raw_text): def build_dynamic_alt(raw_text):
dynamic_alt = clean_post_text(raw_text) dynamic_alt = clean_post_text(raw_text)
dynamic_alt = dynamic_alt.replace("\n", " ").strip() dynamic_alt = dynamic_alt.replace("\n", " ").strip()
dynamic_alt = re.sub(r"https?://\S+", "", dynamic_alt).strip() dynamic_alt = re.sub(r"(?:(?:https?://)|(?:www\.))\S+", "", dynamic_alt).strip()
if len(dynamic_alt) > 150: if len(dynamic_alt) > 150:
dynamic_alt = dynamic_alt[:147] + "..." dynamic_alt = dynamic_alt[:147] + "..."
@@ -2115,18 +2160,11 @@ def sync_feeds(args):
has_video = any(getattr(m, "type", None) == "video" for m in (tweet.media or [])) has_video = any(getattr(m, "type", None) == "video" for m in (tweet.media or []))
has_photo = any(getattr(m, "type", None) == "photo" for m in (tweet.media or [])) has_photo = any(getattr(m, "type", None) == "photo" for m in (tweet.media or []))
if primary_non_x_url and not has_video and not has_photo: raw_text = choose_final_visible_text(
raw_text = choose_final_visible_text( full_clean_text,
full_clean_text, primary_non_x_url=primary_non_x_url,
primary_non_x_url=primary_non_x_url, prefer_full_text_without_url=False,
prefer_full_text_without_url=False, )
)
else:
raw_text = choose_final_visible_text(
full_clean_text,
primary_non_x_url=primary_non_x_url,
prefer_full_text_without_url=False,
)
media_fingerprint = build_media_fingerprint(tweet) media_fingerprint = build_media_fingerprint(tweet)
text_media_key = build_text_media_key(normalized_text, media_fingerprint) text_media_key = build_text_media_key(normalized_text, media_fingerprint)