Triple Dedupe

This commit is contained in:
2026-04-04 20:31:08 +00:00
parent ade283c8b0
commit 586f7e29f1

View File

@@ -1,5 +1,6 @@
import argparse import argparse
import arrow import arrow
import hashlib
import logging import logging
import re import re
import httpx import httpx
@@ -16,6 +17,7 @@ LOG_PATH = "twitter2bsky.log"
SCRAPE_TWEET_LIMIT = 30 SCRAPE_TWEET_LIMIT = 30
DEDUPE_BSKY_LIMIT = 30 DEDUPE_BSKY_LIMIT = 30
TWEET_MAX_AGE_DAYS = 3 TWEET_MAX_AGE_DAYS = 3
APPEND_SOURCE_TWEET_URL = True
# --- Logging Setup --- # --- Logging Setup ---
logging.basicConfig( logging.basicConfig(
@@ -67,6 +69,50 @@ def clean_url(url):
return None return None
def canonicalize_tweet_url(url):
    """
    Reduce an x.com / twitter.com status URL to one canonical form for dedupe.

    Status URLs collapse to "https://x.com/<handle>/status/<id>" (handle
    lowercased, query string dropped). Anything else is just lowercased;
    empty input yields None.
    """
    if not url:
        return None
    stripped = url.strip()
    status_pattern = re.compile(
        r"https?://(?:www\.)?(?:x\.com|twitter\.com)/([^/]+)/status/(\d+)",
        re.IGNORECASE,
    )
    hit = status_pattern.search(stripped)
    if hit is None:
        # Not a status link: lowercase as a weak normalization for comparison.
        return stripped.lower()
    return f"https://x.com/{hit.group(1).lower()}/status/{hit.group(2)}"
def extract_urls_from_text(text):
    """
    Extract all http(s) URLs from free-form text.

    The permissive "anything until whitespace" match would otherwise keep
    trailing sentence punctuation (e.g. "see https://x.com/a/status/1." would
    yield a URL ending in "."), which breaks the lowercase-fallback dedupe
    comparison downstream — so common trailing punctuation is stripped.

    Returns an empty list for None/empty input.
    """
    if not text:
        return []
    urls = re.findall(r"https?://[^\s]+", text)
    # Strip punctuation that commonly trails a pasted link but is not part of it.
    return [url.rstrip(".,;:!?)]}\"'") for url in urls]
def extract_urls_from_facets(record):
    """
    Pull link URIs out of a Bluesky record's rich-text facets, if any.

    Best effort: returns whatever URIs were collected before any structural
    surprise, and never raises.
    """
    found = []
    try:
        for facet in getattr(record, "facets", None) or []:
            for feature in getattr(facet, "features", None) or []:
                link = getattr(feature, "uri", None)
                if link:
                    found.append(link)
    except Exception as e:
        logging.debug(f"Could not extract facet URLs: {e}")
    return found
def get_blob_from_url(media_url, client): def get_blob_from_url(media_url, client):
try: try:
r = httpx.get(media_url, timeout=30, follow_redirects=True) r = httpx.get(media_url, timeout=30, follow_redirects=True)
@@ -86,12 +132,21 @@ def get_blob_from_file(file_path, client):
return None return None
def prepare_post_text(text): def prepare_post_text(text, tweet_url=None):
""" """
Prepare the final text exactly as it would be posted to Bluesky. Prepare the final text exactly as it would be posted to Bluesky.
Optionally append source tweet URL for stronger dedupe.
""" """
raw_text = (text or "").strip() raw_text = (text or "").strip()
if APPEND_SOURCE_TWEET_URL and tweet_url:
canonical_url = canonicalize_tweet_url(tweet_url)
if canonical_url and canonical_url not in raw_text:
if raw_text:
raw_text = f"{raw_text}\n\n{canonical_url}"
else:
raw_text = canonical_url
if len(raw_text) > 295: if len(raw_text) > 295:
truncated = raw_text[:290] truncated = raw_text[:290]
last_space = truncated.rfind(" ") last_space = truncated.rfind(" ")
@@ -115,10 +170,85 @@ def normalize_post_text(text):
return text.lower() return text.lower()
def build_media_fingerprint(tweet):
    """
    Deterministic fingerprint of a scraped tweet's media set.

    Each media item contributes a "type:stable-url" entry: photos have their
    variable size/format query params stripped; videos key on the canonical
    tweet URL because CDN rendition URLs vary. Entries are sorted before
    hashing so media order never matters. Returns the literal "no-media"
    when the tweet has no media at all.
    """
    if not tweet or not tweet.media:
        return "no-media"

    entries = []
    for item in tweet.media:
        kind = getattr(item, "type", "unknown")
        url = getattr(item, "media_url_https", "") or ""
        if kind == "photo":
            # Drop ?name=/... and ?format=/... params Twitter appends per rendition.
            url = re.sub(r"[?&]format=\w+", "", re.sub(r"[?&]name=\w+", "", url))
        elif kind == "video":
            # Video URLs differ per rendition; the tweet URL is the stable anchor.
            url = canonicalize_tweet_url(tweet.tweet_url or url or "")
        entries.append(f"{kind}:{url}")

    digest_input = "|".join(sorted(entries))
    return hashlib.sha256(digest_input.encode("utf-8")).hexdigest()
def build_bsky_media_fingerprint(post_view):
    """
    Best-effort stable fingerprint of a Bluesky post's embedded media.

    Walks the embed's images / video / external entries and hashes their
    sorted "type:ref" signatures. This won't always line up with X source
    media IDs, but it is stable across already-posted Bluesky items, which
    is all the dedupe comparison needs. Returns "no-media" when nothing
    usable is found or the embed shape is unexpected (never raises).
    """
    try:
        embed = getattr(post_view, "embed", None)
        if not embed:
            return "no-media"

        signatures = []

        for img in getattr(embed, "images", None) or []:
            blob = getattr(img, "image", None)
            ref = getattr(blob, "ref", None) or getattr(blob, "cid", None) or str(blob)
            signatures.append(f"photo:{ref}")

        vid = getattr(embed, "video", None)
        if vid:
            ref = getattr(vid, "ref", None) or getattr(vid, "cid", None) or str(vid)
            signatures.append(f"video:{ref}")

        ext = getattr(embed, "external", None)
        if ext:
            signatures.append(f"external:{getattr(ext, 'uri', None) or str(ext)}")

        if not signatures:
            return "no-media"
        return hashlib.sha256("|".join(sorted(signatures)).encode("utf-8")).hexdigest()
    except Exception as e:
        logging.debug(f"Could not build Bluesky media fingerprint: {e}")
        return "no-media"
def build_text_media_key(normalized_text, media_fingerprint):
    """Combined dedupe key: sha256 hex digest over "text||media-fingerprint"."""
    combined = f"{normalized_text}||{media_fingerprint}"
    return hashlib.sha256(combined.encode("utf-8")).hexdigest()
def get_recent_bsky_posts(client, handle, limit=30): def get_recent_bsky_posts(client, handle, limit=30):
""" """
Fetch recent top-level Bluesky posts for duplicate detection. Fetch recent top-level Bluesky posts for duplicate detection.
Returns a list of dicts with original and normalized text. Returns a list of dicts with dedupe keys.
""" """
recent_posts = [] recent_posts = []
@@ -135,15 +265,30 @@ def get_recent_bsky_posts(client, handle, limit=30):
continue continue
text = getattr(record, "text", "") or "" text = getattr(record, "text", "") or ""
prepared = prepare_post_text(text) normalized_text = normalize_post_text(text)
normalized = normalize_post_text(prepared)
if normalized: urls = []
recent_posts.append({ urls.extend(extract_urls_from_text(text))
"text": prepared, urls.extend(extract_urls_from_facets(record))
"normalized_text": normalized,
"created_at": getattr(record, "created_at", None), canonical_urls = set()
}) for url in urls:
canonical = canonicalize_tweet_url(url)
if canonical:
canonical_urls.add(canonical)
media_fingerprint = build_bsky_media_fingerprint(item.post)
text_media_key = build_text_media_key(normalized_text, media_fingerprint)
recent_posts.append({
"uri": getattr(item.post, "uri", None),
"text": text,
"normalized_text": normalized_text,
"canonical_urls": canonical_urls,
"media_fingerprint": media_fingerprint,
"text_media_key": text_media_key,
"created_at": getattr(record, "created_at", None),
})
except Exception as e: except Exception as e:
logging.debug(f"Skipping one Bluesky feed item during dedupe fetch: {e}") logging.debug(f"Skipping one Bluesky feed item during dedupe fetch: {e}")
@@ -603,6 +748,32 @@ def download_and_crop_video(video_url, output_path):
pass pass
def candidate_matches_existing(candidate, recent_bsky_posts):
    """
    Decide whether a candidate tweet duplicates a recent Bluesky post.

    Signals are checked per existing post, strongest first:
      1. canonical source-tweet URL appears in the post's extracted URLs
      2. combined text+media fingerprint key matches
      3. normalized text alone matches

    Returns (True, reason) on the first hit, otherwise (False, None).
    """
    url = candidate["canonical_tweet_url"]
    key = candidate["text_media_key"]
    text = candidate["normalized_text"]

    for posted in recent_bsky_posts:
        if url and url in posted["canonical_urls"]:
            return True, "tweet_url"
        if key == posted["text_media_key"]:
            return True, "text_media_fingerprint"
        if text == posted["normalized_text"]:
            return True, "normalized_text"

    return False, None
# --- Main Sync Function --- # --- Main Sync Function ---
def sync_feeds(args): def sync_feeds(args):
logging.info("🔄 Starting sync cycle...") logging.info("🔄 Starting sync cycle...")
@@ -626,10 +797,8 @@ def sync_feeds(args):
args.bsky_handle, args.bsky_handle,
limit=DEDUPE_BSKY_LIMIT limit=DEDUPE_BSKY_LIMIT
) )
recent_bsky_text_set = {post["normalized_text"] for post in recent_bsky_posts if post["normalized_text"]}
logging.info(f"🧠 Loaded {len(recent_bsky_posts)} recent Bluesky posts for 30-vs-30 duplicate detection.") logging.info(f"🧠 Loaded {len(recent_bsky_posts)} recent Bluesky posts for advanced duplicate detection.")
logging.info(f"🧠 Built normalized Bluesky dedupe set with {len(recent_bsky_text_set)} entries.")
too_old_cutoff = arrow.utcnow().shift(days=-TWEET_MAX_AGE_DAYS) too_old_cutoff = arrow.utcnow().shift(days=-TWEET_MAX_AGE_DAYS)
logging.info(f"🕒 Will ignore tweets older than: {too_old_cutoff}") logging.info(f"🕒 Will ignore tweets older than: {too_old_cutoff}")
@@ -644,33 +813,41 @@ def sync_feeds(args):
logging.info(f"⏭️ Skipping old tweet from {tweet_time}") logging.info(f"⏭️ Skipping old tweet from {tweet_time}")
continue continue
prepared_text = prepare_post_text(tweet.text) prepared_text = prepare_post_text(tweet.text, tweet.tweet_url)
normalized_text = normalize_post_text(prepared_text) normalized_text = normalize_post_text(prepared_text)
if not normalized_text: if not normalized_text:
logging.info(f"⏭️ Skipping empty/blank tweet from {tweet_time}") logging.info(f"⏭️ Skipping empty/blank tweet from {tweet_time}")
continue continue
media_fingerprint = build_media_fingerprint(tweet)
text_media_key = build_text_media_key(normalized_text, media_fingerprint)
canonical_tweet_url = canonicalize_tweet_url(tweet.tweet_url)
candidate_tweets.append({ candidate_tweets.append({
"tweet": tweet, "tweet": tweet,
"tweet_time": tweet_time, "tweet_time": tweet_time,
"raw_text": prepared_text, "raw_text": prepared_text,
"normalized_text": normalized_text, "normalized_text": normalized_text,
"media_fingerprint": media_fingerprint,
"text_media_key": text_media_key,
"canonical_tweet_url": canonical_tweet_url,
}) })
except Exception as e: except Exception as e:
logging.warning(f"⚠️ Failed to prepare candidate tweet: {e}") logging.warning(f"⚠️ Failed to prepare candidate tweet: {e}")
logging.info(f"🧪 Prepared {len(candidate_tweets)} candidate tweets for comparison against recent Bluesky posts.") logging.info(f"🧪 Prepared {len(candidate_tweets)} candidate tweets for advanced dedupe comparison.")
tweets_to_post = [] tweets_to_post = []
for candidate in candidate_tweets: for candidate in candidate_tweets:
if candidate["normalized_text"] in recent_bsky_text_set: is_dup, reason = candidate_matches_existing(candidate, recent_bsky_posts)
logging.info("⏭️ Skipping candidate because text already exists in the last 30 Bluesky posts.") if is_dup:
logging.info(f"⏭️ Skipping candidate due to duplicate match on: {reason}")
continue continue
tweets_to_post.append(candidate) tweets_to_post.append(candidate)
logging.info(f"📬 {len(tweets_to_post)} tweets remain after 30-vs-30 duplicate filtering.") logging.info(f"📬 {len(tweets_to_post)} tweets remain after advanced duplicate filtering.")
if not tweets_to_post: if not tweets_to_post:
logging.info("✅ No new tweets need posting after duplicate comparison.") logging.info("✅ No new tweets need posting after duplicate comparison.")
@@ -760,7 +937,17 @@ def sync_feeds(args):
else: else:
bsky_client.send_post(text=rich_text, langs=["ca"]) bsky_client.send_post(text=rich_text, langs=["ca"])
recent_bsky_text_set.add(candidate["normalized_text"]) recent_bsky_posts.insert(0, {
"uri": None,
"text": raw_text,
"normalized_text": candidate["normalized_text"],
"canonical_urls": {candidate["canonical_tweet_url"]} if candidate["canonical_tweet_url"] else set(),
"media_fingerprint": candidate["media_fingerprint"],
"text_media_key": candidate["text_media_key"],
"created_at": None,
})
recent_bsky_posts = recent_bsky_posts[:DEDUPE_BSKY_LIMIT]
new_posts += 1 new_posts += 1
logging.info(f"✅ Posted new tweet to Bluesky: {raw_text}") logging.info(f"✅ Posted new tweet to Bluesky: {raw_text}")
time.sleep(5) time.sleep(5)
@@ -817,4 +1004,4 @@ def main():
if __name__ == "__main__": if __name__ == "__main__":
main() main()