Reconstructed from a whitespace-collapsed copy of this patch. Line breaks and
indentation were restored by reading the code, so the indentation and blank
lines of unchanged context lines are a best effort and must be checked against
twitter2bsky_daemon.py before applying.

diff --git a/twitter2bsky_daemon.py b/twitter2bsky_daemon.py
index a42c8a0..cd25af1 100644
--- a/twitter2bsky_daemon.py
+++ b/twitter2bsky_daemon.py
@@ -1,5 +1,6 @@
 import argparse
 import arrow
+import hashlib
 import logging
 import re
 import httpx
@@ -16,6 +17,7 @@ LOG_PATH = "twitter2bsky.log"
 SCRAPE_TWEET_LIMIT = 30
 DEDUPE_BSKY_LIMIT = 30
 TWEET_MAX_AGE_DAYS = 3
+APPEND_SOURCE_TWEET_URL = True
 
 # --- Logging Setup ---
 logging.basicConfig(
@@ -67,6 +69,59 @@ def clean_url(url):
     return None
 
 
+def canonicalize_tweet_url(url):
+    """
+    Canonicalize x.com/twitter.com status URLs for dedupe.
+
+    Tweet status URLs (including www./mobile./m. hosts) become
+    https://x.com/<lowercased handle>/status/<id>; any other URL is
+    returned stripped and lowercased. Returns None for empty input.
+    """
+    if not url:
+        return None
+
+    url = url.strip()
+
+    match = re.search(
+        r"https?://(?:(?:www|mobile|m)\.)?(?:x\.com|twitter\.com)/([^/]+)/status/(\d+)",
+        url,
+        re.IGNORECASE,
+    )
+    if not match:
+        return url.lower()
+
+    handle = match.group(1).lower()
+    tweet_id = match.group(2)
+    return f"https://x.com/{handle}/status/{tweet_id}"
+
+
+def extract_urls_from_text(text):
+    """Return all http(s) URLs found in text, or an empty list for empty input."""
+    if not text:
+        return []
+    return re.findall(r"https?://[^\s]+", text)
+
+
+def extract_urls_from_facets(record):
+    """
+    Extract link URLs from Bluesky rich text facets if present.
+    """
+    urls = []
+
+    try:
+        facets = getattr(record, "facets", None) or []
+        for facet in facets:
+            features = getattr(facet, "features", None) or []
+            for feature in features:
+                uri = getattr(feature, "uri", None)
+                if uri:
+                    urls.append(uri)
+    except Exception as e:
+        logging.debug(f"Could not extract facet URLs: {e}")
+
+    return urls
+
+
 def get_blob_from_url(media_url, client):
     try:
         r = httpx.get(media_url, timeout=30, follow_redirects=True)
@@ -86,12 +141,24 @@ def get_blob_from_file(file_path, client):
         return None
 
 
-def prepare_post_text(text):
+def prepare_post_text(text, tweet_url=None):
     """
     Prepare the final text exactly as it would be posted to Bluesky.
+    Optionally append source tweet URL for stronger dedupe.
     """
     raw_text = (text or "").strip()
 
+    # NOTE(review): the URL is appended before the length check below, so a
+    # long tweet may get its source URL truncated — confirm whether the text
+    # should be shortened first to leave room for the URL.
+    if APPEND_SOURCE_TWEET_URL and tweet_url:
+        canonical_url = canonicalize_tweet_url(tweet_url)
+        if canonical_url and canonical_url not in raw_text:
+            if raw_text:
+                raw_text = f"{raw_text}\n\n{canonical_url}"
+            else:
+                raw_text = canonical_url
+
     if len(raw_text) > 295:
         truncated = raw_text[:290]
         last_space = truncated.rfind(" ")
@@ -115,10 +182,103 @@ def normalize_post_text(text):
     return text.lower()
 
 
+def build_media_fingerprint(tweet):
+    """
+    Build a deterministic media fingerprint from scraped tweet media.
+    Uses media type + canonicalized/stable media URL components.
+    """
+    if not tweet or not tweet.media:
+        return "no-media"
+
+    parts = []
+
+    for media in tweet.media:
+        media_type = getattr(media, "type", "unknown")
+        media_url = getattr(media, "media_url_https", "") or ""
+
+        stable_value = media_url
+
+        if media_type == "photo":
+            stable_value = re.sub(r"[?&]name=\w+", "", stable_value)
+            stable_value = re.sub(r"[?&]format=\w+", "", stable_value)
+        elif media_type == "video":
+            stable_value = canonicalize_tweet_url(tweet.tweet_url or media_url or "")
+
+        parts.append(f"{media_type}:{stable_value}")
+
+    parts.sort()
+    raw = "|".join(parts)
+    return hashlib.sha256(raw.encode("utf-8")).hexdigest()
+
+
+def build_bsky_media_fingerprint(post_view):
+    """
+    Build a best-effort media fingerprint from Bluesky embed structure.
+    This won't always perfectly match X source media IDs, but it gives a stable
+    signature for comparison among already-posted Bluesky items.
+
+    Accepts record-style embeds (blob refs under image/video) as well as
+    hydrated view embeds (thumb/fullsize URLs, video cid/playlist).
+    Returns "no-media" when no embed is identifiable or on any error.
+    """
+    try:
+        embed = getattr(post_view, "embed", None)
+        if not embed:
+            return "no-media"
+
+        parts = []
+
+        images = getattr(embed, "images", None)
+        if images:
+            for img in images:
+                image_obj = getattr(img, "image", None)
+                # Hydrated view images expose thumb/fullsize URLs, not an image
+                # blob; use them so photos do not all collapse to "photo:None".
+                ref = (
+                    getattr(image_obj, "ref", None)
+                    or getattr(image_obj, "cid", None)
+                    or getattr(img, "fullsize", None)
+                    or getattr(img, "thumb", None)
+                    or str(image_obj)
+                )
+                parts.append(f"photo:{ref}")
+
+        video = getattr(embed, "video", None)
+        if video:
+            ref = getattr(video, "ref", None) or getattr(video, "cid", None) or str(video)
+            parts.append(f"video:{ref}")
+        else:
+            # Video view embeds expose cid/playlist directly on the embed.
+            ref = getattr(embed, "cid", None) or getattr(embed, "playlist", None)
+            if ref:
+                parts.append(f"video:{ref}")
+
+        external = getattr(embed, "external", None)
+        if external:
+            uri = getattr(external, "uri", None) or str(external)
+            parts.append(f"external:{uri}")
+
+        if not parts:
+            return "no-media"
+
+        parts.sort()
+        raw = "|".join(parts)
+        return hashlib.sha256(raw.encode("utf-8")).hexdigest()
+
+    except Exception as e:
+        logging.debug(f"Could not build Bluesky media fingerprint: {e}")
+        return "no-media"
+
+
+def build_text_media_key(normalized_text, media_fingerprint):
+    """Hash normalized text together with a media fingerprint into one dedupe key."""
+    return hashlib.sha256(f"{normalized_text}||{media_fingerprint}".encode("utf-8")).hexdigest()
+
+
 def get_recent_bsky_posts(client, handle, limit=30):
     """
     Fetch recent top-level Bluesky posts for duplicate detection.
-    Returns a list of dicts with original and normalized text.
+    Returns a list of dicts with dedupe keys.
     """
     recent_posts = []
 
@@ -135,15 +295,30 @@ def get_recent_bsky_posts(client, handle, limit=30):
                     continue
 
                 text = getattr(record, "text", "") or ""
-                prepared = prepare_post_text(text)
-                normalized = normalize_post_text(prepared)
+                normalized_text = normalize_post_text(text)
 
-                if normalized:
-                    recent_posts.append({
-                        "text": prepared,
-                        "normalized_text": normalized,
-                        "created_at": getattr(record, "created_at", None),
-                    })
+                urls = []
+                urls.extend(extract_urls_from_text(text))
+                urls.extend(extract_urls_from_facets(record))
+
+                canonical_urls = set()
+                for url in urls:
+                    canonical = canonicalize_tweet_url(url)
+                    if canonical:
+                        canonical_urls.add(canonical)
+
+                media_fingerprint = build_bsky_media_fingerprint(item.post)
+                text_media_key = build_text_media_key(normalized_text, media_fingerprint)
+
+                recent_posts.append({
+                    "uri": getattr(item.post, "uri", None),
+                    "text": text,
+                    "normalized_text": normalized_text,
+                    "canonical_urls": canonical_urls,
+                    "media_fingerprint": media_fingerprint,
+                    "text_media_key": text_media_key,
+                    "created_at": getattr(record, "created_at", None),
+                })
 
             except Exception as e:
                 logging.debug(f"Skipping one Bluesky feed item during dedupe fetch: {e}")
@@ -603,6 +778,42 @@ def download_and_crop_video(video_url, output_path):
                 pass
 
 
+def candidate_matches_existing(candidate, recent_bsky_posts):
+    """
+    Multi-signal dedupe:
+    1. canonical tweet URL
+    2. text + media fingerprint
+    3. normalized text only
+
+    Signals are checked in that priority order across all recent posts, so
+    the reported reason is always the strongest signal that matched.
+    Returns (True, reason) on a match, otherwise (False, None).
+    """
+    candidate_url = candidate.get("canonical_tweet_url")
+    candidate_text_media_key = candidate.get("text_media_key")
+    candidate_normalized_text = candidate.get("normalized_text")
+
+    if candidate_url and any(
+        candidate_url in (existing.get("canonical_urls") or ())
+        for existing in recent_bsky_posts
+    ):
+        return True, "tweet_url"
+
+    if candidate_text_media_key and any(
+        candidate_text_media_key == existing.get("text_media_key")
+        for existing in recent_bsky_posts
+    ):
+        return True, "text_media_fingerprint"
+
+    if candidate_normalized_text and any(
+        candidate_normalized_text == existing.get("normalized_text")
+        for existing in recent_bsky_posts
+    ):
+        return True, "normalized_text"
+
+    return False, None
+
+
 # --- Main Sync Function ---
 def sync_feeds(args):
     logging.info("🔄 Starting sync cycle...")
@@ -626,10 +837,8 @@ def sync_feeds(args):
             args.bsky_handle,
             limit=DEDUPE_BSKY_LIMIT
         )
-        recent_bsky_text_set = {post["normalized_text"] for post in recent_bsky_posts if post["normalized_text"]}
 
-        logging.info(f"🧠 Loaded {len(recent_bsky_posts)} recent Bluesky posts for 30-vs-30 duplicate detection.")
-        logging.info(f"🧠 Built normalized Bluesky dedupe set with {len(recent_bsky_text_set)} entries.")
+        logging.info(f"🧠 Loaded {len(recent_bsky_posts)} recent Bluesky posts for advanced duplicate detection.")
 
         too_old_cutoff = arrow.utcnow().shift(days=-TWEET_MAX_AGE_DAYS)
         logging.info(f"🕒 Will ignore tweets older than: {too_old_cutoff}")
@@ -644,33 +853,41 @@ def sync_feeds(args):
                     logging.info(f"⏭️ Skipping old tweet from {tweet_time}")
                     continue
 
-                prepared_text = prepare_post_text(tweet.text)
+                prepared_text = prepare_post_text(tweet.text, tweet.tweet_url)
                 normalized_text = normalize_post_text(prepared_text)
 
                 if not normalized_text:
                     logging.info(f"⏭️ Skipping empty/blank tweet from {tweet_time}")
                     continue
 
+                media_fingerprint = build_media_fingerprint(tweet)
+                text_media_key = build_text_media_key(normalized_text, media_fingerprint)
+                canonical_tweet_url = canonicalize_tweet_url(tweet.tweet_url)
+
                 candidate_tweets.append({
                     "tweet": tweet,
                     "tweet_time": tweet_time,
                     "raw_text": prepared_text,
                     "normalized_text": normalized_text,
+                    "media_fingerprint": media_fingerprint,
+                    "text_media_key": text_media_key,
+                    "canonical_tweet_url": canonical_tweet_url,
                 })
 
             except Exception as e:
                 logging.warning(f"⚠️ Failed to prepare candidate tweet: {e}")
 
-        logging.info(f"🧪 Prepared {len(candidate_tweets)} candidate tweets for comparison against recent Bluesky posts.")
+        logging.info(f"🧪 Prepared {len(candidate_tweets)} candidate tweets for advanced dedupe comparison.")
 
         tweets_to_post = []
         for candidate in candidate_tweets:
-            if candidate["normalized_text"] in recent_bsky_text_set:
-                logging.info("⏭️ Skipping candidate because text already exists in the last 30 Bluesky posts.")
+            is_dup, reason = candidate_matches_existing(candidate, recent_bsky_posts)
+            if is_dup:
+                logging.info(f"⏭️ Skipping candidate due to duplicate match on: {reason}")
                 continue
             tweets_to_post.append(candidate)
 
-        logging.info(f"📬 {len(tweets_to_post)} tweets remain after 30-vs-30 duplicate filtering.")
+        logging.info(f"📬 {len(tweets_to_post)} tweets remain after advanced duplicate filtering.")
 
         if not tweets_to_post:
             logging.info("✅ No new tweets need posting after duplicate comparison.")
@@ -760,7 +977,17 @@ def sync_feeds(args):
                     else:
                         bsky_client.send_post(text=rich_text, langs=["ca"])
 
-                    recent_bsky_text_set.add(candidate["normalized_text"])
+                    recent_bsky_posts.insert(0, {
+                        "uri": None,
+                        "text": raw_text,
+                        "normalized_text": candidate["normalized_text"],
+                        "canonical_urls": {candidate["canonical_tweet_url"]} if candidate["canonical_tweet_url"] else set(),
+                        "media_fingerprint": candidate["media_fingerprint"],
+                        "text_media_key": candidate["text_media_key"],
+                        "created_at": None,
+                    })
+                    recent_bsky_posts = recent_bsky_posts[:DEDUPE_BSKY_LIMIT]
+
                     new_posts += 1
                     logging.info(f"✅ Posted new tweet to Bluesky: {raw_text}")
                     time.sleep(5)
@@ -817,4 +1044,4 @@ def main():
 
 
 if __name__ == "__main__":
-    main()
\ No newline at end of file
+    main()