1292 lines
44 KiB
Python
1292 lines
44 KiB
Python
import argparse
|
|
import arrow
|
|
import hashlib
|
|
import json
|
|
import logging
|
|
import re
|
|
import httpx
|
|
import time
|
|
import os
|
|
import subprocess
|
|
from urllib.parse import urlparse
|
|
from dotenv import load_dotenv
|
|
from atproto import Client, client_utils, models
|
|
from playwright.sync_api import sync_playwright
|
|
from moviepy import VideoFileClip
|
|
|
|
# --- Configuration ---
LOG_PATH = "twitter2bsky.log"  # file target for the log handler below
STATE_PATH = "twitter2bsky_state.json"  # persistent dedupe memory on disk
SCRAPE_TWEET_LIMIT = 30  # max on-screen tweets parsed per scrape cycle
DEDUPE_BSKY_LIMIT = 30  # recent Bluesky posts fetched for duplicate detection
TWEET_MAX_AGE_DAYS = 3  # tweets older than this are skipped during sync
VIDEO_MAX_DURATION_SECONDS = 179  # videos are trimmed to just under 3 minutes

# Retry/backoff tuning (seconds) for Bluesky blob uploads.
BSKY_BLOB_UPLOAD_MAX_RETRIES = 5
BSKY_BLOB_UPLOAD_BASE_DELAY = 10
BSKY_BLOB_UPLOAD_MAX_DELAY = 300
MEDIA_DOWNLOAD_TIMEOUT = 30  # HTTP timeout when downloading tweet media

# --- Logging Setup ---
# Mirror every log line to both the log file and the console.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[logging.FileHandler(LOG_PATH, encoding="utf-8"), logging.StreamHandler()],
    level=logging.INFO,
)
|
|
|
|
|
|
# --- Custom Classes ---
|
|
class ScrapedMedia:
    """One media attachment (photo or video) scraped from a tweet."""

    def __init__(self, url, media_type="photo"):
        # Attribute names mirror the Twitter API media object fields so the
        # rest of the pipeline can treat scraped media like API media.
        self.type = media_type
        self.media_url_https = url
|
|
|
|
|
|
class ScrapedTweet:
    """Minimal tweet model produced by the Playwright scraper."""

    def __init__(self, created_on, text, media_urls, tweet_url=None):
        self.created_on = created_on
        self.text = text
        self.tweet_url = tweet_url
        # media_urls is a list of (url, media_type) pairs.
        self.media = [ScrapedMedia(u, kind) for (u, kind) in media_urls]
|
|
|
|
|
|
# --- Helpers ---
|
|
def take_error_screenshot(page, error_msg):
    """Capture the current page to a timestamped PNG for post-mortem debugging."""
    logging.info(f"📸 Taking screenshot... Shot: {error_msg}")
    shot_name = f"screenshot_{time.strftime('%Y%m%d_%H%M%S')}.png"
    page.screenshot(path=shot_name)
    logging.info(f"📸 Screenshot saved as: {shot_name}")
|
|
|
|
|
|
def is_valid_url(url):
    """Return True when a HEAD request to *url* answers with a non-5xx status."""
    try:
        # follow_redirects so shortened links (t.co etc.) still validate.
        status = httpx.head(url, timeout=5, follow_redirects=True).status_code
        return status < 500
    except Exception:
        # DNS errors, timeouts and malformed URLs all count as invalid.
        return False
|
|
|
|
|
|
def clean_url(url):
    """Strip whitespace and trailing ellipsis/dots, then validate the URL.

    Returns the cleaned URL, or None when it does not pass is_valid_url().
    """
    candidate = re.sub(r"\s+", "", url.strip())
    candidate = re.sub(r"[…\.]+$", "", candidate)
    return candidate if is_valid_url(candidate) else None
|
|
|
|
|
|
def canonicalize_url(url):
    """Return a trimmed copy of *url*, or None for falsy input (no other normalization)."""
    return url.strip() if url else None
|
|
|
|
|
|
def canonicalize_tweet_url(url):
    """
    Canonicalize x.com/twitter.com status URLs for internal dedupe only.
    """
    if not url:
        return None

    stripped = url.strip()
    status_pattern = r"https?://(?:www\.)?(?:x\.com|twitter\.com)/([^/]+)/status/(\d+)"
    found = re.search(status_pattern, stripped, re.IGNORECASE)

    if found is None:
        # Not a status link: a lowercased copy is still a stable dedupe key.
        return stripped.lower()

    # Normalize to the x.com form with a lowercase handle.
    return f"https://x.com/{found.group(1).lower()}/status/{found.group(2)}"
|
|
|
|
|
|
def is_x_or_twitter_domain(url):
    """True when *url*'s hostname is one of the known x.com/twitter.com hosts."""
    known_hosts = ("x.com", "www.x.com", "twitter.com", "www.twitter.com", "mobile.twitter.com")
    try:
        host = (urlparse(url).hostname or "").lower()
    except Exception:
        # Unparseable input is simply "not an X domain".
        return False
    return host in known_hosts
|
|
|
|
|
|
def extract_urls_from_text(text):
    """Return every http(s) URL-ish token in *text*; empty list for falsy input."""
    return re.findall(r"https?://[^\s]+", text) if text else []
|
|
|
|
|
|
def extract_non_x_urls_from_text(text):
    """Extract URLs from *text*, trim trailing dots/ellipses, and drop X/Twitter links."""
    kept = []
    for raw_url in extract_urls_from_text(text):
        candidate = re.sub(r"[…\.]+$", "", raw_url.strip())
        if candidate and not is_x_or_twitter_domain(candidate):
            kept.append(candidate)
    return kept
|
|
|
|
|
|
def extract_urls_from_facets(record):
    """
    Extract link URLs from Bluesky rich text facets if present.
    """
    found = []

    try:
        for facet in getattr(record, "facets", None) or []:
            for feature in getattr(facet, "features", None) or []:
                link = getattr(feature, "uri", None)
                if link:
                    found.append(link)
    except Exception as e:
        # Best-effort: return whatever was collected before the failure.
        logging.debug(f"Could not extract facet URLs: {e}")

    return found
|
|
|
|
|
|
def get_rate_limit_wait_seconds(error_obj, default_delay):
    """
    Try to extract a sensible wait time from atproto/http error objects.
    """
    try:
        response_headers = getattr(error_obj, "headers", None)
        if not response_headers:
            return default_delay

        reset_raw = response_headers.get("ratelimit-reset") or response_headers.get("RateLimit-Reset")
        if not reset_raw:
            return default_delay

        # Wait one second past the advertised reset, but never less than the default.
        remaining = int(reset_raw) - int(time.time()) + 1
        return max(remaining, default_delay)
    except Exception:
        # Malformed headers fall back to the caller's default.
        return default_delay
|
|
|
|
|
|
def upload_blob_with_retry(client, binary_data, media_label="media"):
    """
    Retry Bluesky blob upload when rate-limited.

    Returns the uploaded blob on success. Returns None immediately on any
    non-rate-limit error, or after exhausting BSKY_BLOB_UPLOAD_MAX_RETRIES
    rate-limited attempts.
    """
    final_error = None

    for attempt_number in range(1, BSKY_BLOB_UPLOAD_MAX_RETRIES + 1):
        try:
            return client.upload_blob(binary_data).blob
        except Exception as upload_error:
            final_error = upload_error
            message = str(upload_error)

            # Anything that is not a rate limit is fatal right away.
            if "429" not in message and "RateLimitExceeded" not in message:
                logging.warning(f"Could not upload {media_label}: {upload_error}")
                return None

            # Exponential backoff, capped, then stretched by any server hint.
            exponential = BSKY_BLOB_UPLOAD_BASE_DELAY * (2 ** (attempt_number - 1))
            fallback_delay = min(exponential, BSKY_BLOB_UPLOAD_MAX_DELAY)
            wait_seconds = get_rate_limit_wait_seconds(upload_error, fallback_delay)

            if attempt_number < BSKY_BLOB_UPLOAD_MAX_RETRIES:
                logging.warning(
                    f"⏳ Bluesky blob upload rate-limited for {media_label}. "
                    f"Retry {attempt_number}/{BSKY_BLOB_UPLOAD_MAX_RETRIES} after {wait_seconds}s."
                )
                time.sleep(wait_seconds)
            else:
                logging.warning(
                    f"❌ Exhausted blob upload retries for {media_label} after rate limiting: {upload_error}"
                )

    logging.warning(f"Could not upload {media_label}: {final_error}")
    return None
|
|
|
|
|
|
def get_blob_from_url(media_url, client, http_client):
    """
    Download media and upload to Bluesky with retry support for upload rate limits.
    """
    try:
        response = http_client.get(media_url, timeout=MEDIA_DOWNLOAD_TIMEOUT, follow_redirects=True)

        if response.status_code != 200:
            logging.warning(f"Could not fetch media {media_url}: HTTP {response.status_code}")
            return None

        body = response.content
        if not body:
            logging.warning(f"Could not fetch media {media_url}: empty response body")
            return None

        return upload_blob_with_retry(client, body, media_label=media_url)
    except Exception as e:
        logging.warning(f"Could not fetch media {media_url}: {e}")
        return None
|
|
|
|
|
|
def get_blob_from_file(file_path, client):
    """Read a local file and upload its bytes to Bluesky as a blob (None on failure)."""
    try:
        with open(file_path, "rb") as handle:
            payload = handle.read()
        return upload_blob_with_retry(client, payload, media_label=file_path)
    except Exception as e:
        logging.warning(f"Could not upload local file {file_path}: {e}")
        return None
|
|
|
|
|
|
def prepare_post_text(text):
    """
    Prepare the final public text exactly as it should be posted to Bluesky.

    Does NOT append the source X URL.
    """
    body = (text or "").strip()

    # Short enough already — post verbatim.
    if len(body) <= 295:
        return body

    clipped = body[:290]
    cut = clipped.rfind(" ")
    # Prefer truncating at a word boundary when one exists.
    if cut > 0:
        clipped = clipped[:cut]
    return clipped + "..."
|
|
|
|
|
|
def normalize_post_text(text):
    """
    Normalize post text for duplicate detection.
    """
    if not text:
        return ""

    # Treat CR as a newline, collapse all runs of whitespace, lowercase.
    collapsed = re.sub(r"\s+", " ", text.replace("\r", "\n"))
    return collapsed.strip().lower()
|
|
|
|
|
|
def build_media_fingerprint(tweet):
    """
    Build a deterministic media fingerprint from scraped tweet media.

    Photos are keyed by their URL with the volatile name/format query params
    stripped; videos are keyed by the canonical tweet URL. Returns the literal
    string "no-media" when there is nothing to fingerprint.
    """
    if not tweet or not tweet.media:
        return "no-media"

    entries = []
    for item in tweet.media:
        kind = getattr(item, "type", "unknown")
        stable_key = getattr(item, "media_url_https", "") or ""

        if kind == "photo":
            # Drop size/format query params so the same image always matches.
            stable_key = re.sub(r"[?&]name=\w+", "", stable_key)
            stable_key = re.sub(r"[?&]format=\w+", "", stable_key)
        elif kind == "video":
            stable_key = canonicalize_tweet_url(tweet.tweet_url or stable_key or "")

        entries.append(f"{kind}:{stable_key}")

    # Sort so media ordering never changes the fingerprint.
    entries.sort()
    return hashlib.sha256("|".join(entries).encode("utf-8")).hexdigest()
|
|
|
|
|
|
def build_bsky_media_fingerprint(post_view):
    """
    Best-effort media fingerprint from Bluesky embed structure.
    """
    try:
        embed = getattr(post_view, "embed", None)
        if not embed:
            return "no-media"

        pieces = []

        for img in getattr(embed, "images", None) or []:
            blob = getattr(img, "image", None)
            ref = getattr(blob, "ref", None) or getattr(blob, "cid", None) or str(blob)
            pieces.append(f"photo:{ref}")

        video_blob = getattr(embed, "video", None)
        if video_blob:
            ref = getattr(video_blob, "ref", None) or getattr(video_blob, "cid", None) or str(video_blob)
            pieces.append(f"video:{ref}")

        external = getattr(embed, "external", None)
        if external:
            link = getattr(external, "uri", None) or str(external)
            pieces.append(f"external:{link}")

        if not pieces:
            return "no-media"

        # Sort so attachment order never changes the fingerprint.
        pieces.sort()
        return hashlib.sha256("|".join(pieces).encode("utf-8")).hexdigest()

    except Exception as e:
        logging.debug(f"Could not build Bluesky media fingerprint: {e}")
        return "no-media"
|
|
|
|
|
|
def build_text_media_key(normalized_text, media_fingerprint):
    """SHA-256 over text plus media fingerprint: the combined dedupe key."""
    combined = f"{normalized_text}||{media_fingerprint}"
    return hashlib.sha256(combined.encode("utf-8")).hexdigest()
|
|
|
|
|
|
# --- Local State Management ---
|
|
def default_state():
    """Return a fresh, empty persistent-state structure (schema version 1)."""
    return {
        "version": 1,               # schema version for future migrations
        "posted_tweets": {},        # state_key -> posted-tweet record
        "posted_by_bsky_uri": {},   # Bluesky URI -> state_key index
        "updated_at": None,         # set on each successful save
    }
|
|
|
|
|
|
def load_state(state_path=STATE_PATH):
    """Load the persisted dedupe state, reinitializing on any problem."""
    if not os.path.exists(state_path):
        logging.info(f"🧠 No state file found at {state_path}. Starting with empty memory.")
        return default_state()

    try:
        with open(state_path, "r", encoding="utf-8") as handle:
            loaded = json.load(handle)
    except Exception as e:
        logging.warning(f"⚠️ Could not load state file {state_path}: {e}. Reinitializing.")
        return default_state()

    if not isinstance(loaded, dict):
        logging.warning("⚠️ State file is invalid. Reinitializing.")
        return default_state()

    # Backfill keys that may be missing from older state files.
    loaded.setdefault("version", 1)
    loaded.setdefault("posted_tweets", {})
    loaded.setdefault("posted_by_bsky_uri", {})
    loaded.setdefault("updated_at", None)

    return loaded
|
|
|
|
|
|
def save_state(state, state_path=STATE_PATH):
    """Persist *state* as pretty-printed JSON via write-temp-then-rename."""
    try:
        state["updated_at"] = arrow.utcnow().isoformat()
        scratch_path = f"{state_path}.tmp"

        with open(scratch_path, "w", encoding="utf-8") as handle:
            json.dump(state, handle, ensure_ascii=False, indent=2, sort_keys=True)

        # Rename last so a crash mid-write never corrupts the real file.
        os.replace(scratch_path, state_path)
        logging.info(f"💾 State saved to {state_path}")

    except Exception as e:
        logging.error(f"❌ Failed to save state file {state_path}: {e}")
|
|
|
|
|
|
def remember_posted_tweet(state, candidate, bsky_uri=None):
    """
    Store successful post in local state.

    Primary key is canonical tweet URL when available.
    Fallback key uses text_media_key.
    """
    canonical_tweet_url = candidate.get("canonical_tweet_url")
    state_key = canonical_tweet_url or f"textmedia:{candidate['text_media_key']}"

    state["posted_tweets"][state_key] = {
        "canonical_tweet_url": canonical_tweet_url,
        "normalized_text": candidate["normalized_text"],
        "raw_text": candidate["raw_text"],
        "media_fingerprint": candidate["media_fingerprint"],
        "text_media_key": candidate["text_media_key"],
        "canonical_non_x_urls": sorted(candidate["canonical_non_x_urls"]),
        "bsky_uri": bsky_uri,
        "tweet_created_on": candidate["tweet"].created_on,
        "tweet_url": candidate["tweet"].tweet_url,
        "posted_at": arrow.utcnow().isoformat(),
    }

    # Secondary index: lets a Bluesky URI be traced back to its state entry.
    if bsky_uri:
        state["posted_by_bsky_uri"][bsky_uri] = state_key
|
|
|
|
|
|
def candidate_matches_state(candidate, state):
    """
    Strong private dedupe using local persistent state.

    Match order:
    1. canonical tweet URL
    2. text + media fingerprint
    3. normalized text

    Args:
        candidate: dict with "canonical_tweet_url", "text_media_key" and
            "normalized_text" keys (as built by the sync loop).
        state: persistent state dict (see default_state()).

    Returns:
        (matched, reason): matched is a bool; reason names the first signal
        that hit, or None when nothing matched.
    """
    canonical_tweet_url = candidate["canonical_tweet_url"]
    text_media_key = candidate["text_media_key"]
    normalized_text = candidate["normalized_text"]

    posted_tweets = state.get("posted_tweets", {})

    if canonical_tweet_url and canonical_tweet_url in posted_tweets:
        return True, "state:tweet_url"

    # The keys are never used below, so iterate values() directly; the
    # text+media signal is checked across ALL records before falling back
    # to text-only, matching the documented priority order.
    records = posted_tweets.values()

    if any(record.get("text_media_key") == text_media_key for record in records):
        return True, "state:text_media_fingerprint"

    if any(record.get("normalized_text") == normalized_text for record in records):
        return True, "state:normalized_text"

    return False, None
|
|
|
|
|
|
def prune_state(state, max_entries=5000):
    """
    Keep state file from growing forever.

    Prunes oldest records by posted_at if necessary.
    """
    posted_tweets = state.get("posted_tweets", {})

    if len(posted_tweets) <= max_entries:
        return state

    # Rank keys newest-first; records without a posted_at sort last.
    ranked_keys = sorted(
        posted_tweets,
        key=lambda key: posted_tweets[key].get("posted_at") or "",
        reverse=True,
    )
    keep_keys = set(ranked_keys[:max_entries])

    state["posted_tweets"] = {
        key: record for key, record in posted_tweets.items() if key in keep_keys
    }
    # Drop index entries whose target record was pruned.
    state["posted_by_bsky_uri"] = {
        uri: key
        for uri, key in state.get("posted_by_bsky_uri", {}).items()
        if key in keep_keys
    }
    return state
|
|
|
|
|
|
# --- Bluesky Post History ---
|
|
def get_recent_bsky_posts(client, handle, limit=30):
    """
    Fetch recent top-level Bluesky posts for duplicate detection.

    Returns a list of dicts with dedupe keys.
    """
    collected = []

    try:
        feed_response = client.get_author_feed(handle, limit=limit)

        for item in feed_response.feed:
            try:
                # Reposts carry a "reason"; replies carry record.reply. Skip
                # both so dedupe only compares against original top-level posts.
                if item.reason is not None:
                    continue

                record = item.post.record
                if getattr(record, "reply", None) is not None:
                    continue

                text = getattr(record, "text", "") or ""
                normalized_text = normalize_post_text(text)

                # Collect links from both the raw text and the rich-text facets.
                link_candidates = extract_non_x_urls_from_text(text) + extract_urls_from_facets(record)

                canonical_non_x_urls = set()
                for link in link_candidates:
                    if is_x_or_twitter_domain(link):
                        continue
                    canonical = canonicalize_url(link)
                    if canonical:
                        canonical_non_x_urls.add(canonical)

                media_fingerprint = build_bsky_media_fingerprint(item.post)

                collected.append({
                    "uri": getattr(item.post, "uri", None),
                    "text": text,
                    "normalized_text": normalized_text,
                    "canonical_non_x_urls": canonical_non_x_urls,
                    "media_fingerprint": media_fingerprint,
                    "text_media_key": build_text_media_key(normalized_text, media_fingerprint),
                    "created_at": getattr(record, "created_at", None),
                })

            except Exception as e:
                # One malformed feed item must not abort the whole fetch.
                logging.debug(f"Skipping one Bluesky feed item during dedupe fetch: {e}")

    except Exception as e:
        logging.warning(f"⚠️ Could not fetch recent Bluesky posts for duplicate detection: {e}")

    return collected
|
|
|
|
|
|
def make_rich(content):
    """Convert plain tweet text into a Bluesky rich-text TextBuilder.

    Repairs URLs broken across line breaks, turns http:// links into https://,
    emits validated URLs as clickable link facets, and emits #hashtags as tag
    facets. Everything else is passed through as plain text, preserving the
    original line/word spacing.
    """
    text_builder = client_utils.TextBuilder()

    def repair_url(match):
        # Regex below matches across newlines, so a "URL" here may contain a
        # line break that the scraper introduced mid-link.
        raw = match.group(0)

        if "\n" not in raw and "\r" not in raw:
            # Simple case: just trim trailing ellipsis/dots.
            return re.sub(r"[…\.]+$", "", raw)

        # Try gluing the pieces back together into one URL.
        glued = raw.replace("\n", "").replace("\r", "")
        test_url = re.sub(r"[…\.]+$", "", glued)

        if is_valid_url(test_url):
            return test_url

        # Glued form is dead; if the first fragment alone resolves, the break
        # was a real line break after a complete URL — keep the original.
        parts = raw.split("\n")
        test_part0 = re.sub(r"[…\.]+$", "", parts[0])
        if is_valid_url(test_part0):
            return raw

        # Neither form validated; fall back to the glued guess.
        return test_url

    # NOTE: the character class excludes only space/tab, so the match can
    # span newlines — that is what lets repair_url see broken links.
    content = re.sub(r"https?://[^\ \t]+", repair_url, content.strip())
    lines = content.splitlines()

    for line_idx, line in enumerate(lines):
        if not line.strip():
            # Blank line: keep the newline unless it's the very last line.
            if line_idx < len(lines) - 1:
                text_builder.text("\n")
            continue

        words = line.split(" ")
        for i, word in enumerate(words):
            if not word:
                # Consecutive spaces produce empty tokens; re-emit the space.
                if i < len(words) - 1:
                    text_builder.text(" ")
                continue

            if word.startswith("http://") or word.startswith("https://"):
                # Upgrade to https before validating/linking.
                if word.startswith("http://"):
                    word = word.replace("http://", "https://", 1)

                word = re.sub(r"[…\.]+$", "", word)
                clean_url_value = clean_url(word)

                if clean_url_value and is_valid_url(clean_url_value):
                    text_builder.link(clean_url_value, clean_url_value)
                else:
                    # Unresolvable URL: keep it as plain text.
                    text_builder.text(word)

            elif word.startswith("#"):
                # Tag facet keeps the visible "#word" but strips trailing
                # punctuation from the indexed tag value.
                clean_tag = word[1:].rstrip(".,;:!?)'\"…")
                text_builder.tag(word, clean_tag)

            else:
                text_builder.text(word)

            # Restore the single space between words.
            if i < len(words) - 1:
                text_builder.text(" ")

        # Restore the newline between lines (not after the last one).
        if line_idx < len(lines) - 1:
            text_builder.text("\n")

    return text_builder
|
|
|
|
|
|
def build_dynamic_alt(raw_text):
    """Derive media alt text from the tweet body: links removed, capped at 150 chars."""
    alt = re.sub(r"https?://\S+", "", raw_text.replace("\n", " ").strip()).strip()

    if len(alt) > 150:
        return alt[:147] + "..."
    if not alt:
        # Fallback alt text (Catalan): "Video or image attached to the tweet".
        return "Vídeo o imatge adjunta al tuit"
    return alt
|
|
|
|
|
|
def build_video_embed(video_blob, alt_text):
    """Build a Bluesky video embed; returns None when atproto lacks the model."""
    try:
        return models.AppBskyEmbedVideo.Main(video=video_blob, alt=alt_text)
    except AttributeError:
        # Older atproto releases don't ship the video embed model at all.
        logging.error("❌ Your atproto version does not support AppBskyEmbedVideo. Upgrade atproto.")
        return None
|
|
|
|
|
|
# --- Playwright Scraping ---
|
|
def scrape_tweets_via_playwright(username, password, email, target_handle):
    """Scrape recent tweets from an X profile with a headless Chromium session.

    Reuses a saved browser storage state when possible; otherwise performs an
    automated login (including the email/phone security challenge) and saves
    the state for next time. Returns a list of ScrapedTweet, or [] on login
    failure.
    """
    tweets = []
    # Persisted cookies/localStorage so subsequent runs can skip the login flow.
    state_file = "twitter_browser_state.json"

    with sync_playwright() as p:
        browser = p.chromium.launch(
            headless=True,
            # Reduces headless-automation fingerprinting by the site.
            args=["--disable-blink-features=AutomationControlled"]
        )
        clean_ua = (
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
            "AppleWebKit/537.36 (KHTML, like Gecko) "
            "Chrome/145.0.7632.6 Safari/537.36"
        )

        context = None
        needs_login = True

        # --- Try to reuse a previously saved session ---
        if os.path.exists(state_file):
            logging.info("✅ Found existing browser state. Attempting to bypass login...")
            context = browser.new_context(
                user_agent=clean_ua,
                viewport={"width": 1920, "height": 1080},
                storage_state=state_file
            )
            page = context.new_page()
            page.goto("https://x.com/home")
            time.sleep(4)

            # Seeing the compose button (or staying on /home) means we are logged in.
            if page.locator('[data-testid="SideNav_NewTweet_Button"]').is_visible() or "/home" in page.url:
                logging.info("✅ Session is valid!")
                needs_login = False
            else:
                logging.warning("⚠️ Saved session expired or invalid. Re-logging in...")
                context.close()
                os.remove(state_file)

        # --- Fresh automated login when no valid session exists ---
        if needs_login:
            logging.info("🚀 Launching fresh browser for automated Twitter login...")
            context = browser.new_context(
                user_agent=clean_ua,
                viewport={"width": 1920, "height": 1080}
            )
            page = context.new_page()

            try:
                page.goto("https://x.com")
                sign_in_button = page.get_by_text("Sign in", exact=True)
                sign_in_button.wait_for(state="visible", timeout=15000)
                sign_in_button.click(force=True)

                page.wait_for_selector('h1:has-text("Sign in to X")', state="visible", timeout=25000)
                logging.info(f"👤 Entering username: {username}...")
                time.sleep(1)

                username_input = page.locator('input[autocomplete="username"]')
                username_input.wait_for(state="visible", timeout=15000)
                username_input.click(force=True)
                # Typed character-by-character to look less like a bot.
                username_input.press_sequentially(username, delay=100)

                page.locator('button:has-text("Next")').first.click(force=True)
                # Either the password field appears, or a security challenge input.
                page.wait_for_selector(
                    'input[name="password"], input[data-testid="ocfEnterTextTextInput"], input[name="text"]',
                    timeout=15000
                )
                time.sleep(1)

                # --- Optional security challenge (asks for email/phone) ---
                if page.locator('input[data-testid="ocfEnterTextTextInput"]').is_visible() or page.locator('input[name="text"]').is_visible():
                    logging.warning("🛡️ Security challenge detected! Entering email/phone...")
                    page.fill('input[data-testid="ocfEnterTextTextInput"], input[name="text"]', email)
                    sec_next = page.locator('[data-testid="ocfEnterTextNextButton"], span:has-text("Next")').first
                    if sec_next.is_visible():
                        sec_next.click(force=True)
                    else:
                        # Fallback when the Next button isn't rendered as expected.
                        page.keyboard.press("Enter")
                    page.wait_for_selector('input[name="password"]', timeout=15000)
                    time.sleep(1)

                logging.info("🔑 Entering password...")
                page.fill('input[name="password"]', password)
                page.locator('span:has-text("Log in")').first.click()

                page.wait_for_url("**/home", timeout=20000)
                time.sleep(3)

                # Persist the session so the next run can skip this whole flow.
                context.storage_state(path=state_file)
                logging.info("✅ Login successful. Browser state saved.")

            except Exception as e:
                take_error_screenshot(page, "login_failed")
                logging.error(f"❌ Login failed: {e}")
                browser.close()
                return []

        # --- Scrape the target profile's visible tweets ---
        logging.info(f"🌐 Navigating to https://x.com/{target_handle} to scrape tweets...")
        page = context.new_page()
        page.goto(f"https://x.com/{target_handle}")

        try:
            page.wait_for_selector("article", timeout=20000)
            time.sleep(3)

            articles = page.locator("article").all()
            logging.info(f"📊 Found {len(articles)} tweets on screen. Parsing up to {SCRAPE_TWEET_LIMIT}...")

            for article in articles[:SCRAPE_TWEET_LIMIT]:
                try:
                    # The <time> element anchors both the timestamp and permalink.
                    time_el = article.locator("time").first
                    if not time_el.is_visible():
                        continue

                    created_at = time_el.get_attribute("datetime")

                    tweet_url = None
                    time_link = article.locator("a:has(time)").first
                    if time_link.is_visible():
                        href = time_link.get_attribute("href")
                        if href:
                            # hrefs are usually relative ("/user/status/id").
                            tweet_url = f"https://x.com{href}" if href.startswith("/") else href

                    text_locator = article.locator('[data-testid="tweetText"]').first
                    text = text_locator.inner_text() if text_locator.is_visible() else ""

                    media_urls = []

                    photo_locators = article.locator('[data-testid="tweetPhoto"] img').all()
                    for img in photo_locators:
                        src = img.get_attribute("src")
                        if src:
                            # Request the large variant instead of the thumbnail.
                            src = re.sub(r"&name=\w+", "&name=large", src)
                            media_urls.append((src, "photo"))

                    video_locators = article.locator('[data-testid="videoPlayer"]').all()
                    if video_locators:
                        # Videos can't be fetched from the timeline; record the
                        # tweet URL so the real stream is extracted later.
                        media_urls.append((tweet_url or "", "video"))

                    tweets.append(ScrapedTweet(created_at, text, media_urls, tweet_url=tweet_url))

                except Exception as e:
                    # One broken article must not abort the whole scrape.
                    logging.warning(f"⚠️ Failed to parse a specific tweet: {e}")
                    continue

        except Exception as e:
            take_error_screenshot(page, "scrape_failed")
            logging.error(f"❌ Failed to scrape profile: {e}")

        browser.close()
        return tweets
|
|
|
|
|
|
def extract_video_url_from_tweet_page(context, tweet_url):
    """Open a tweet page and sniff network responses for a playable video URL.

    Prefers an HLS (.m3u8) playlist over a direct MP4, and filters out
    audio-only MP4 tracks. Returns the chosen URL or None when nothing
    playable was observed.
    """
    page = context.new_page()
    best_m3u8_url = None
    best_video_mp4_url = None
    # Guard so each response URL is classified only once.
    seen_urls = set()

    def is_audio_only_mp4(url, content_type):
        # Heuristics based on URL path segments and the Content-Type header.
        url_l = url.lower()
        content_type_l = content_type.lower()
        return (
            "/aud/" in url_l or
            "/audio/" in url_l or
            "mp4a" in url_l or
            ("audio/" in content_type_l and "video/" not in content_type_l)
        )

    def handle_response(response):
        # Runs for every network response on the page; records the first
        # HLS playlist and the first video MP4 it sees.
        nonlocal best_m3u8_url, best_video_mp4_url
        try:
            url = response.url
            if url in seen_urls:
                return
            seen_urls.add(url)

            url_l = url.lower()
            content_type = response.headers.get("content-type", "")
            content_type_l = content_type.lower()

            # .m4s segments are fragments, never a playable entry point.
            if ".m4s" in url_l:
                return

            if (
                ".m3u8" in url_l or
                "application/vnd.apple.mpegurl" in content_type_l or
                "application/x-mpegurl" in content_type_l
            ):
                if best_m3u8_url is None:
                    best_m3u8_url = url
                    logging.info(f"📺 Found HLS playlist URL: {url}")
                return

            if ".mp4" in url_l or "video/mp4" in content_type_l or "audio/mp4" in content_type_l:
                if is_audio_only_mp4(url, content_type):
                    logging.info(f"🔇 Ignoring audio-only MP4: {url}")
                    return

                if best_video_mp4_url is None:
                    best_video_mp4_url = url
                    logging.info(f"🎥 Found VIDEO MP4 URL: {url}")
                return

        except Exception as e:
            logging.debug(f"Response parsing error: {e}")

    page.on("response", handle_response)

    def current_best():
        # HLS playlist wins over a bare MP4 when both were captured.
        return best_m3u8_url or best_video_mp4_url

    try:
        logging.info(f"🎬 Opening tweet page to capture video URL: {tweet_url}")
        page.goto(tweet_url, wait_until="domcontentloaded", timeout=30000)
        time.sleep(3)

        player = page.locator('[data-testid="videoPlayer"]').first

        if player.count() > 0:
            try:
                player.scroll_into_view_if_needed(timeout=5000)
            except Exception:
                pass

            # Clicking the player triggers the media requests we sniff above.
            try:
                player.click(force=True, timeout=5000)
                logging.info("▶️ Clicked video player")
            except Exception as e:
                logging.info(f"⚠️ First player click failed: {e}")
        else:
            logging.warning("⚠️ No video player locator found on tweet page")

        # Poll up to ~12s for the response handler to capture something.
        for _ in range(12):
            if current_best():
                break
            time.sleep(1)

        # Second interaction attempt when nothing was captured yet.
        if not current_best() and player.count() > 0:
            logging.info("🔁 No media URL found yet, retrying player interaction...")
            try:
                player.click(force=True, timeout=5000)
                time.sleep(2)
            except Exception as e:
                logging.info(f"⚠️ Retry click failed: {e}")

            # Space toggles play/pause as an extra nudge.
            try:
                page.keyboard.press("Space")
                time.sleep(1)
            except Exception:
                pass

            # Shorter second polling window (~8s).
            for _ in range(8):
                if current_best():
                    break
                time.sleep(1)

        selected_url = current_best()
        if selected_url:
            logging.info(f"✅ Selected media URL for download: {selected_url}")
        else:
            logging.warning(f"⚠️ No playable media URL detected on tweet page: {tweet_url}")

        return selected_url

    except Exception as e:
        logging.warning(f"⚠️ Could not extract video URL from tweet page {tweet_url}: {e}")
        return None

    finally:
        # Always release the tab, even on failure.
        page.close()
|
|
|
|
|
|
# --- Video Processing ---
|
|
def download_and_crop_video(video_url, output_path):
    """Download a video with ffmpeg, trim it, and move it to *output_path*.

    Supports both HLS (.m3u8) playlists and direct MP4 URLs. The result is
    trimmed to at most VIDEO_MAX_DURATION_SECONDS.

    Args:
        video_url: .m3u8 playlist URL or direct .mp4 URL.
        output_path: final destination path (conventionally ending in .mp4).

    Returns:
        output_path on success, None on any failure.
    """
    # Derive temp names from the path root so a missing ".mp4" suffix can
    # never make a temp file collide with output_path itself (the original
    # str.replace(".mp4", ...) approach could).
    path_root, _ = os.path.splitext(output_path)
    temp_input = f"{path_root}_source.mp4"
    temp_output = f"{path_root}_cropped.mp4"

    video_clip = None
    cropped_clip = None

    try:
        logging.info(f"⬇️ Downloading video source with ffmpeg: {video_url}")

        video_url_l = video_url.lower()

        if ".m3u8" in video_url_l:
            logging.info("📺 Using HLS ffmpeg mode")
            download_cmd = [
                "ffmpeg",
                "-y",
                "-protocol_whitelist", "file,http,https,tcp,tls,crypto",
                "-allowed_extensions", "ALL",
                "-i", video_url,
                "-c", "copy",
                temp_input,
            ]
        else:
            logging.info("🎥 Using direct MP4 ffmpeg mode")
            download_cmd = [
                "ffmpeg",
                "-y",
                "-i", video_url,
                "-c", "copy",
                temp_input,
            ]

        # List-form argv (shell=False) keeps the URL out of any shell parsing.
        download_result = subprocess.run(
            download_cmd,
            capture_output=True,
            text=True
        )

        if download_result.returncode != 0:
            logging.error(f"❌ ffmpeg download failed:\n{download_result.stderr}")
            return None

        if not os.path.exists(temp_input) or os.path.getsize(temp_input) == 0:
            logging.error("❌ Downloaded video source file is missing or empty.")
            return None

        logging.info(f"✅ Video downloaded: {temp_input}")

        video_clip = VideoFileClip(temp_input)
        duration = float(video_clip.duration) if video_clip.duration else 0

        if duration <= 0:
            logging.error("❌ Downloaded video has invalid or unknown duration.")
            return None

        end_time = min(VIDEO_MAX_DURATION_SECONDS, duration)

        # moviepy >= 2.0 renamed subclip() to subclipped(); support both.
        if hasattr(video_clip, "subclipped"):
            cropped_clip = video_clip.subclipped(0, end_time)
        else:
            cropped_clip = video_clip.subclip(0, end_time)

        cropped_clip.write_videofile(
            temp_output,
            codec="libx264",
            audio_codec="aac",
            logger=None
        )

        if not os.path.exists(temp_output) or os.path.getsize(temp_output) == 0:
            logging.error("❌ Cropped video output is missing or empty.")
            return None

        os.replace(temp_output, output_path)
        logging.info(f"✅ Video cropped to {int(end_time)} seconds: {output_path}")
        return output_path

    except Exception as e:
        logging.error(f"❌ Error processing video: {e}")
        return None

    finally:
        # Close clips on every path — the original leaked file handles when
        # write_videofile raised, since clips were only closed on success.
        for clip in (cropped_clip, video_clip):
            if clip is not None:
                try:
                    clip.close()
                except Exception:
                    pass
        # Remove temp artifacts regardless of outcome.
        for path in [temp_input, temp_output]:
            if os.path.exists(path):
                try:
                    os.remove(path)
                except Exception:
                    pass
|
|
|
|
|
|
def candidate_matches_existing_bsky(candidate, recent_bsky_posts):
    """
    Multi-signal dedupe against recent Bluesky posts.

    Returns (matched, reason) where reason names the first signal that hit.
    """
    cand_urls = candidate["canonical_non_x_urls"]
    cand_key = candidate["text_media_key"]
    cand_text = candidate["normalized_text"]

    for existing in recent_bsky_posts:
        same_text = cand_text == existing["normalized_text"]

        # Strongest signal: identical text AND identical outbound link set.
        if cand_urls and same_text and cand_urls == existing["canonical_non_x_urls"]:
            return True, "bsky:normalized_text_plus_non_x_urls"

        if cand_key == existing["text_media_key"]:
            return True, "bsky:text_media_fingerprint"

        if same_text:
            return True, "bsky:normalized_text"

    return False, None
|
|
|
|
|
|
# --- Main Sync Function ---
def sync_feeds(args):
    """Run one full sync cycle: scrape tweets, dedupe, and post to Bluesky.

    Steps:
      1. Load local posted-tweet state from STATE_PATH.
      2. Scrape recent tweets via Playwright.
      3. Log in to Bluesky and fetch recent posts for duplicate detection.
      4. Build candidate records (normalized text, media/text fingerprints,
         canonicalized URLs), filtering out old or blank tweets.
      5. Skip candidates that match local state or recent Bluesky posts.
      6. Post the remainder (video > images > text-only), persisting state
         after each successful post.

    Args:
        args: parsed argparse namespace carrying twitter_* and bsky_* credentials.

    All exceptions are caught at the top level and logged; the function never raises.
    """
    logging.info("🔄 Starting sync cycle...")
    try:
        state = load_state(STATE_PATH)

        tweets = scrape_tweets_via_playwright(
            args.twitter_username,
            args.twitter_password,
            args.twitter_email,
            args.twitter_handle
        )

        # No tweets means either an empty timeline or a scrape failure; both abort.
        if not tweets:
            logging.warning("⚠️ No tweets found or failed to fetch. Skipping Bluesky sync for this cycle.")
            return

        bsky_client = Client()
        bsky_client.login(args.bsky_handle, args.bsky_password)

        # Recent Bluesky posts form the remote side of duplicate detection.
        recent_bsky_posts = get_recent_bsky_posts(
            bsky_client,
            args.bsky_handle,
            limit=DEDUPE_BSKY_LIMIT
        )

        logging.info(f"🧠 Loaded {len(recent_bsky_posts)} recent Bluesky posts for duplicate detection.")
        logging.info(f"🧠 Local state currently tracks {len(state.get('posted_tweets', {}))} posted items.")

        # Anything older than TWEET_MAX_AGE_DAYS is ignored outright.
        too_old_cutoff = arrow.utcnow().shift(days=-TWEET_MAX_AGE_DAYS)
        logging.info(f"🕒 Will ignore tweets older than: {too_old_cutoff}")

        candidate_tweets = []

        # Iterate oldest-first so posts land on Bluesky in chronological order.
        for tweet in reversed(tweets):
            try:
                tweet_time = arrow.get(tweet.created_on)

                if tweet_time < too_old_cutoff:
                    logging.info(f"⏭️ Skipping old tweet from {tweet_time}")
                    continue

                prepared_text = prepare_post_text(tweet.text)
                normalized_text = normalize_post_text(prepared_text)

                if not normalized_text:
                    logging.info(f"⏭️ Skipping empty/blank tweet from {tweet_time}")
                    continue

                # Fingerprints used by both dedupe passes below.
                media_fingerprint = build_media_fingerprint(tweet)
                text_media_key = build_text_media_key(normalized_text, media_fingerprint)

                # Collect canonicalized outbound links (non-X/Twitter URLs only).
                canonical_non_x_urls = set()
                for url in extract_non_x_urls_from_text(prepared_text):
                    canonical = canonicalize_url(url)
                    if canonical:
                        canonical_non_x_urls.add(canonical)

                candidate_tweets.append({
                    "tweet": tweet,
                    "tweet_time": tweet_time,
                    "raw_text": prepared_text,
                    "normalized_text": normalized_text,
                    "media_fingerprint": media_fingerprint,
                    "text_media_key": text_media_key,
                    "canonical_tweet_url": canonicalize_tweet_url(tweet.tweet_url),
                    "canonical_non_x_urls": canonical_non_x_urls,
                })

            except Exception as e:
                # A single malformed tweet should not abort the whole cycle.
                logging.warning(f"⚠️ Failed to prepare candidate tweet: {e}")

        logging.info(f"🧪 Prepared {len(candidate_tweets)} candidate tweets for duplicate comparison.")

        # Two-stage dedupe: local state first (cheap), then recent Bluesky posts.
        tweets_to_post = []
        for candidate in candidate_tweets:
            is_dup_state, reason_state = candidate_matches_state(candidate, state)
            if is_dup_state:
                logging.info(f"⏭️ Skipping candidate due to local state duplicate match on: {reason_state}")
                continue

            is_dup_bsky, reason_bsky = candidate_matches_existing_bsky(candidate, recent_bsky_posts)
            if is_dup_bsky:
                logging.info(f"⏭️ Skipping candidate due to recent Bluesky duplicate match on: {reason_bsky}")
                continue

            tweets_to_post.append(candidate)

        logging.info(f"📬 {len(tweets_to_post)} tweets remain after duplicate filtering.")

        if not tweets_to_post:
            logging.info("✅ No new tweets need posting after duplicate comparison.")
            return

        new_posts = 0
        browser_state_file = "twitter_browser_state.json"

        # One browser + one HTTP client are shared across all posts in this cycle.
        with sync_playwright() as p, httpx.Client() as media_http_client:
            browser = p.chromium.launch(
                headless=True,
                args=["--disable-blink-features=AutomationControlled"]
            )
            context_kwargs = {
                "user_agent": (
                    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
                    "AppleWebKit/537.36 (KHTML, like Gecko) "
                    "Chrome/145.0.7632.6 Safari/537.36"
                ),
                "viewport": {"width": 1920, "height": 1080},
            }
            # Reuse a saved login session when one exists to avoid re-authenticating.
            if os.path.exists(browser_state_file):
                context_kwargs["storage_state"] = browser_state_file

            context = browser.new_context(**context_kwargs)

            for candidate in tweets_to_post:
                tweet = candidate["tweet"]
                tweet_time = candidate["tweet_time"]
                raw_text = candidate["raw_text"]

                logging.info(f"📝 Posting missing tweet from {tweet_time} to Bluesky...")

                rich_text = make_rich(raw_text)
                dynamic_alt = build_dynamic_alt(raw_text)

                image_embeds = []
                video_embed = None
                # Records media items that failed so the post can proceed degraded.
                media_upload_failures = []

                if tweet.media:
                    for media in tweet.media:
                        if media.type == "photo":
                            blob = get_blob_from_url(media.media_url_https, bsky_client, media_http_client)
                            if blob:
                                image_embeds.append(
                                    models.AppBskyEmbedImages.Image(
                                        alt=dynamic_alt,
                                        image=blob
                                    )
                                )
                            else:
                                media_upload_failures.append(f"photo:{media.media_url_https}")

                        elif media.type == "video":
                            # Video resolution requires loading the tweet page itself.
                            if not tweet.tweet_url:
                                logging.warning("⚠️ Tweet has video marker but no tweet URL. Skipping video.")
                                media_upload_failures.append("video:no_tweet_url")
                                continue

                            temp_video_path = "temp_video.mp4"

                            # NOTE(review): try/finally with no except — an unexpected
                            # exception here propagates to the outer handler and
                            # aborts the remaining candidates; confirm intended.
                            try:
                                real_video_url = extract_video_url_from_tweet_page(context, tweet.tweet_url)
                                if not real_video_url:
                                    logging.warning(f"⚠️ Could not resolve playable video URL for {tweet.tweet_url}")
                                    media_upload_failures.append(f"video:resolve_failed:{tweet.tweet_url}")
                                    continue

                                cropped_video_path = download_and_crop_video(real_video_url, temp_video_path)
                                if not cropped_video_path:
                                    logging.warning(f"⚠️ Video download/crop failed for {tweet.tweet_url}")
                                    media_upload_failures.append(f"video:crop_failed:{tweet.tweet_url}")
                                    continue

                                video_blob = get_blob_from_file(cropped_video_path, bsky_client)
                                if not video_blob:
                                    logging.warning(f"⚠️ Video upload blob failed for {tweet.tweet_url}")
                                    media_upload_failures.append(f"video:upload_failed:{tweet.tweet_url}")
                                    continue

                                video_embed = build_video_embed(video_blob, dynamic_alt)
                                if not video_embed:
                                    media_upload_failures.append(f"video:embed_failed:{tweet.tweet_url}")

                            finally:
                                # Always clean up the temp file, success or not.
                                if os.path.exists(temp_video_path):
                                    os.remove(temp_video_path)

                try:
                    post_result = None
                    post_mode = "text"

                    # Preference order: video embed > image gallery > text-only.
                    if video_embed:
                        post_result = bsky_client.send_post(text=rich_text, embed=video_embed, langs=["ca"])
                        post_mode = "video"
                    elif image_embeds:
                        embed = models.AppBskyEmbedImages.Main(images=image_embeds)
                        post_result = bsky_client.send_post(text=rich_text, embed=embed, langs=["ca"])
                        post_mode = f"images:{len(image_embeds)}"
                    else:
                        post_result = bsky_client.send_post(text=rich_text, langs=["ca"])
                        post_mode = "text_only"

                    bsky_uri = getattr(post_result, "uri", None)

                    # Persist state immediately after each post so a crash
                    # mid-cycle cannot cause double-posting on the next run.
                    remember_posted_tweet(state, candidate, bsky_uri=bsky_uri)
                    state = prune_state(state, max_entries=5000)
                    save_state(state, STATE_PATH)

                    # Also feed the new post back into the in-memory dedupe window.
                    recent_bsky_posts.insert(0, {
                        "uri": bsky_uri,
                        "text": raw_text,
                        "normalized_text": candidate["normalized_text"],
                        "canonical_non_x_urls": candidate["canonical_non_x_urls"],
                        "media_fingerprint": candidate["media_fingerprint"],
                        "text_media_key": candidate["text_media_key"],
                        "created_at": arrow.utcnow().isoformat(),
                    })
                    recent_bsky_posts = recent_bsky_posts[:DEDUPE_BSKY_LIMIT]

                    new_posts += 1

                    if media_upload_failures:
                        logging.warning(
                            f"✅ Posted tweet to Bluesky with degraded media mode ({post_mode}). "
                            f"Failed media items: {media_upload_failures}"
                        )
                    else:
                        logging.info(f"✅ Posted new tweet to Bluesky with mode {post_mode}: {raw_text}")

                    # Brief pause between posts — presumably rate-limit courtesy.
                    time.sleep(5)

                except Exception as e:
                    logging.error(f"❌ Failed to post tweet to Bluesky: {e}")

            browser.close()

        logging.info(f"✅ Sync complete. Posted {new_posts} new updates.")

    except Exception as e:
        logging.error(f"❌ Error during sync cycle: {e}")
|
|
|
|
|
|
# --- Main Execution ---
def main():
    """Parse CLI flags (with .env fallbacks), validate credentials, run one sync."""
    load_dotenv()

    arg_parser = argparse.ArgumentParser(description="Twitter to Bluesky Sync")
    arg_parser.add_argument("--twitter-username", help="Your Twitter login username")
    arg_parser.add_argument("--twitter-password", help="Your Twitter login password")
    arg_parser.add_argument("--twitter-email", help="Your Twitter email for security challenges")
    arg_parser.add_argument("--twitter-handle", help="The Twitter account to scrape")
    arg_parser.add_argument("--bsky-handle", help="Your Bluesky handle")
    arg_parser.add_argument("--bsky-password", help="Your Bluesky app password")

    args = arg_parser.parse_args()

    # CLI flags take precedence; fall back to environment variables.
    env_fallbacks = {
        "twitter_username": "TWITTER_USERNAME",
        "twitter_password": "TWITTER_PASSWORD",
        "twitter_email": "TWITTER_EMAIL",
        "bsky_handle": "BSKY_HANDLE",
        "bsky_password": "BSKY_APP_PASSWORD",
    }
    for attr, env_var in env_fallbacks.items():
        if not getattr(args, attr):
            setattr(args, attr, os.getenv(env_var))

    # Scrape target defaults to the login username when not provided anywhere.
    args.twitter_handle = args.twitter_handle or os.getenv("TWITTER_HANDLE") or args.twitter_username

    # Required credentials; twitter_email and twitter_handle stay optional.
    required = (
        ("twitter_username", "--twitter-username"),
        ("twitter_password", "--twitter-password"),
        ("bsky_handle", "--bsky-handle"),
        ("bsky_password", "--bsky-password"),
    )
    missing_args = [flag for attr, flag in required if not getattr(args, attr)]

    if missing_args:
        logging.error(f"❌ Missing credentials! You forgot to provide: {', '.join(missing_args)}")
        return

    logging.info(f"🤖 Bot started. Will check @{args.twitter_handle}")
    sync_feeds(args)
    logging.info("🤖 Bot finished.")
|
|
|
|
|
|
# Script entry point: run a single sync pass when executed directly.
if __name__ == "__main__":
    main()
|