# post2bsky/twitter2bsky_daemon.py
# (source exported from repository web UI — commit f85457a045 "Added new yml",
#  Guillem Hernandez Sola, 2026-03-30 17:38:35 +02:00; 630 lines, 22 KiB, Python)
import argparse
import arrow
import logging
import re
import httpx
import time
import os
import subprocess
from dotenv import load_dotenv
from atproto import Client, client_utils, models
from playwright.sync_api import sync_playwright
from moviepy import VideoFileClip
# --- Logging Setup ---
LOG_PATH = "twitter2bsky.log"

# Mirror every record to both the rolling log file and the console.
_log_handlers = [
    logging.FileHandler(LOG_PATH, encoding="utf-8"),
    logging.StreamHandler(),
]
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=_log_handlers,
)
# --- Custom Classes ---
class ScrapedMedia:
    """A single media attachment scraped from a tweet.

    Attribute names mirror the old Twitter API media objects so the
    downstream posting code can consume them unchanged.
    """

    def __init__(self, url, media_type="photo"):
        # "photo" or "video"
        self.media_url_https = url
        self.type = media_type
class ScrapedTweet:
    """A tweet scraped from the DOM, with media normalised to ScrapedMedia."""

    def __init__(self, created_on, text, media_urls, tweet_url=None):
        # created_on: ISO timestamp string from the <time datetime=...> attribute.
        self.created_on = created_on
        self.text = text
        self.tweet_url = tweet_url
        # media_urls is an iterable of (url, media_type) pairs.
        self.media = []
        for media_url, media_kind in media_urls:
            self.media.append(ScrapedMedia(media_url, media_kind))
# --- Helpers ---
def take_error_screenshot(page, error_msg):
    """Capture a timestamped PNG of the current page state for debugging."""
    logging.info(f"📸 Taking screenshot... Shot: {error_msg}")
    shot_name = "screenshot_{}.png".format(time.strftime("%Y%m%d_%H%M%S"))
    page.screenshot(path=shot_name)
    logging.info(f"📸 Screenshot saved as: {shot_name}")
def is_valid_url(url):
    """Return True when a HEAD request to *url* answers with a non-5xx status.

    Any network/protocol failure counts as invalid.
    """
    try:
        resp = httpx.head(url, timeout=5, follow_redirects=True)
    except Exception:
        return False
    return resp.status_code < 500
def clean_url(url):
    """Normalise *url* (drop whitespace and a trailing ellipsis/dots).

    Returns the cleaned URL only if it is reachable, otherwise None.
    """
    candidate = re.sub(r"\s+", "", url.strip())
    candidate = re.sub(r"[…\.]+$", "", candidate)
    return candidate if is_valid_url(candidate) else None
def get_blob_from_url(media_url, client):
    """Fetches remote media and uploads it to Bluesky.

    Returns the uploaded blob reference, or None on any fetch/upload failure.
    """
    try:
        resp = httpx.get(media_url, timeout=30, follow_redirects=True)
        if resp.status_code == 200:
            return client.upload_blob(resp.content).blob
    except Exception as exc:
        logging.warning(f"Could not fetch media {media_url}: {exc}")
    return None
def get_blob_from_file(file_path, client):
    """Uploads a local file to Bluesky.

    Returns the uploaded blob reference, or None on read/upload failure.
    """
    try:
        with open(file_path, "rb") as fh:
            payload = fh.read()
        return client.upload_blob(payload).blob
    except Exception as exc:
        logging.warning(f"Could not upload local file {file_path}: {exc}")
        return None
def get_last_bsky(client, handle):
    """Return the timestamp of the author's newest original Bluesky post.

    Reposts (items with a `reason`) and replies are skipped.  Falls back to
    the Unix epoch when no original post is found, so every tweet counts
    as new.
    """
    feed = client.get_author_feed(handle).feed
    for item in feed:
        is_repost = item.reason is not None
        is_reply = getattr(item.post.record, "reply", None) is not None
        if not is_repost and not is_reply:
            return arrow.get(item.post.record.created_at)
    return arrow.get(0)
def make_rich(content):
    """Build a Bluesky rich-text object from plain tweet text.

    URLs (possibly wrapped across lines or cut with a trailing ellipsis by
    Twitter) become link facets, #hashtags become tag facets, and everything
    else is emitted as plain text with line breaks and spacing preserved.
    NOTE: URL validation performs live HEAD requests via is_valid_url.
    """
    text_builder = client_utils.TextBuilder()

    def repair_url(match):
        # The URL regex below may span a newline, so a single match can
        # contain a line break Twitter inserted mid-URL.
        raw = match.group(0)
        if "\n" not in raw and "\r" not in raw:
            # Single-line URL: only strip trailing ellipsis/dots.
            return re.sub(r"[…\.]+$", "", raw)
        # Re-glue the pieces across the break and check if the result is live.
        glued = raw.replace("\n", "").replace("\r", "")
        test_url = re.sub(r"[…\.]+$", "", glued)
        if is_valid_url(test_url):
            return test_url
        # Glued form is dead; if only the first line is a complete, live URL,
        # the rest was ordinary text — keep the match untouched.
        parts = raw.split("\n")
        test_part0 = re.sub(r"[…\.]+$", "", parts[0])
        if is_valid_url(test_part0):
            return raw
        return test_url

    # [^\ \t]+ deliberately excludes only space/tab (not newlines), so
    # wrapped URLs can be repaired by the callback above.
    content = re.sub(r"https?://[^\ \t]+", repair_url, content.strip())
    lines = content.splitlines()
    for line_idx, line in enumerate(lines):
        if not line.strip():
            # Blank line: keep the paragraph break (no trailing newline
            # after the last line).
            if line_idx < len(lines) - 1:
                text_builder.text("\n")
            continue
        words = line.split(" ")
        for i, word in enumerate(words):
            if not word:
                # Consecutive spaces yield empty tokens; re-emit the space.
                if i < len(words) - 1:
                    text_builder.text(" ")
                continue
            if word.startswith("http://") or word.startswith("https://"):
                if word.startswith("http://"):
                    # Normalise links to https before validating.
                    word = word.replace("http://", "https://", 1)
                word = re.sub(r"[…\.]+$", "", word)
                clean_url_value = clean_url(word)
                if clean_url_value and is_valid_url(clean_url_value):
                    text_builder.link(clean_url_value, clean_url_value)
                else:
                    # Unreachable URL: keep it as plain text.
                    text_builder.text(word)
            elif word.startswith("#"):
                # Tag facet: display text keeps trailing punctuation,
                # the tag value does not.
                clean_tag = word[1:].rstrip(".,;:!?)'\"")
                text_builder.tag(word, clean_tag)
            else:
                text_builder.text(word)
            if i < len(words) - 1:
                text_builder.text(" ")
        if line_idx < len(lines) - 1:
            text_builder.text("\n")
    return text_builder
def build_dynamic_alt(raw_text):
    """Derive alt text for attached media from the tweet body.

    Newlines become spaces, URLs are removed, and the result is capped at
    150 characters.  An empty result falls back to a generic Catalan
    description.
    """
    flattened = raw_text.replace("\n", " ").strip()
    alt = re.sub(r"https?://\S+", "", flattened).strip()
    if not alt:
        return "Vídeo o imatge adjunta al tuit"
    if len(alt) > 150:
        return alt[:147] + "..."
    return alt
def build_video_embed(video_blob, alt_text):
    """Wrap an uploaded video blob in a Bluesky video embed.

    Returns None (with an error log) when the installed atproto version
    lacks AppBskyEmbedVideo.
    """
    try:
        embed = models.AppBskyEmbedVideo.Main(video=video_blob, alt=alt_text)
    except AttributeError:
        logging.error("❌ Your atproto version does not support AppBskyEmbedVideo. Upgrade atproto.")
        return None
    return embed
# --- Playwright Scraping ---
def scrape_tweets_via_playwright(username, password, email, target_handle):
    """Logs in (or loads session) and scrapes tweets directly from the DOM.

    Returns a list of ScrapedTweet built from up to the first 10 <article>
    elements on the profile page, or [] when login fails.
    """
    tweets = []
    state_file = "twitter_browser_state.json"
    with sync_playwright() as p:
        browser = p.chromium.launch(
            headless=True,
            # Hides the most common headless-automation fingerprint.
            args=["--disable-blink-features=AutomationControlled"]
        )
        # Plain desktop-Chrome user agent (no headless marker).
        clean_ua = (
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
            "AppleWebKit/537.36 (KHTML, like Gecko) "
            "Chrome/145.0.7632.6 Safari/537.36"
        )
        context = None
        needs_login = True
        # First try to reuse a previously saved session state.
        if os.path.exists(state_file):
            logging.info("✅ Found existing browser state. Attempting to bypass login...")
            context = browser.new_context(
                user_agent=clean_ua,
                viewport={"width": 1920, "height": 1080},
                storage_state=state_file
            )
            page = context.new_page()
            page.goto("https://x.com/home")
            time.sleep(4)
            # Logged-in home shows the compose button; otherwise we got
            # bounced off /home.
            if page.locator('[data-testid="SideNav_NewTweet_Button"]').is_visible() or "/home" in page.url:
                logging.info("✅ Session is valid!")
                needs_login = False
            else:
                logging.warning("⚠️ Saved session expired or invalid. Re-logging in...")
                context.close()
                os.remove(state_file)
        if needs_login:
            logging.info("🚀 Launching fresh browser for automated Twitter login...")
            context = browser.new_context(
                user_agent=clean_ua,
                viewport={"width": 1920, "height": 1080}
            )
            page = context.new_page()
            try:
                page.goto("https://x.com")
                sign_in_button = page.get_by_text("Sign in", exact=True)
                sign_in_button.wait_for(state="visible", timeout=15000)
                sign_in_button.click(force=True)
                page.wait_for_selector('h1:has-text("Sign in to X")', state="visible", timeout=25000)
                logging.info(f"👤 Entering username: {username}...")
                time.sleep(1)
                username_input = page.locator('input[autocomplete="username"]')
                username_input.wait_for(state="visible", timeout=15000)
                username_input.click(force=True)
                # Typed with a per-key delay to look less like a bot.
                username_input.press_sequentially(username, delay=100)
                page.locator('button:has-text("Next")').first.click(force=True)
                # The next screen is either the password field or a
                # security challenge asking for email/phone.
                page.wait_for_selector(
                    'input[name="password"], input[data-testid="ocfEnterTextTextInput"], input[name="text"]',
                    timeout=15000
                )
                time.sleep(1)
                if page.locator('input[data-testid="ocfEnterTextTextInput"]').is_visible() or page.locator('input[name="text"]').is_visible():
                    logging.warning("🛡️ Security challenge detected! Entering email/phone...")
                    page.fill('input[data-testid="ocfEnterTextTextInput"], input[name="text"]', email)
                    sec_next = page.locator('[data-testid="ocfEnterTextNextButton"], span:has-text("Next")').first
                    if sec_next.is_visible():
                        sec_next.click(force=True)
                    else:
                        page.keyboard.press("Enter")
                    page.wait_for_selector('input[name="password"]', timeout=15000)
                    time.sleep(1)
                logging.info("🔑 Entering password...")
                page.fill('input[name="password"]', password)
                page.locator('span:has-text("Log in")').first.click()
                page.wait_for_url("**/home", timeout=20000)
                time.sleep(3)
                # Persist cookies/session so future runs can skip this flow.
                context.storage_state(path=state_file)
                logging.info("✅ Login successful. Browser state saved.")
            except Exception as e:
                take_error_screenshot(page, "login_failed")
                logging.error(f"❌ Login failed: {e}")
                browser.close()
                return []
        logging.info(f"🌐 Navigating to https://x.com/{target_handle} to scrape tweets...")
        page = context.new_page()
        page.goto(f"https://x.com/{target_handle}")
        try:
            page.wait_for_selector("article", timeout=20000)
            time.sleep(3)
            articles = page.locator("article").all()
            logging.info(f"📊 Found {len(articles)} tweets on screen. Parsing...")
            # Only the first 10 rendered articles are parsed.
            for article in articles[:10]:
                try:
                    time_el = article.locator("time").first
                    if not time_el.is_visible():
                        # No timestamp — not a regular tweet card; skip it.
                        continue
                    created_at = time_el.get_attribute("datetime")
                    tweet_url = None
                    # The permalink is the anchor wrapping the timestamp.
                    time_link = article.locator("a:has(time)").first
                    if time_link.is_visible():
                        href = time_link.get_attribute("href")
                        if href:
                            tweet_url = f"https://x.com{href}" if href.startswith("/") else href
                    text_locator = article.locator('[data-testid="tweetText"]').first
                    text = text_locator.inner_text() if text_locator.is_visible() else ""
                    media_urls = []
                    photo_locators = article.locator('[data-testid="tweetPhoto"] img').all()
                    for img in photo_locators:
                        src = img.get_attribute("src")
                        if src:
                            # Swap the thumbnail size parameter for the
                            # full-size variant.
                            src = re.sub(r"&name=\w+", "&name=large", src)
                            media_urls.append((src, "photo"))
                    # Video URLs can't be read from the timeline DOM; store a
                    # marker so the sync step resolves the real URL later.
                    video_locators = article.locator('[data-testid="videoPlayer"]').all()
                    if video_locators:
                        media_urls.append((tweet_url or "", "video"))
                    tweets.append(ScrapedTweet(created_at, text, media_urls, tweet_url=tweet_url))
                except Exception as e:
                    logging.warning(f"⚠️ Failed to parse a specific tweet: {e}")
                    continue
        except Exception as e:
            take_error_screenshot(page, "scrape_failed")
            logging.error(f"❌ Failed to scrape profile: {e}")
        browser.close()
    return tweets
def extract_video_url_from_tweet_page(context, tweet_url):
    """
    Opens a tweet page and captures the best real video URL.
    Preference order:
    1. .mp4
    2. .m3u8
    Ignores .m4s fragment files.
    """
    page = context.new_page()
    best_mp4_url = None
    best_m3u8_url = None

    def handle_response(response):
        # Network sniffer: records the first MP4 and the first HLS playlist
        # URL the player requests while the page loads/plays.
        nonlocal best_mp4_url, best_m3u8_url
        try:
            url = response.url.lower()
            content_type = response.headers.get("content-type", "").lower()
            if ".m4s" in url:
                # Streaming fragment, not a standalone playable URL.
                return
            if ".mp4" in url or "video/mp4" in content_type:
                if best_mp4_url is None:
                    best_mp4_url = response.url
                    logging.info(f"🎥 Found MP4 video URL: {response.url}")
                return
            if ".m3u8" in url or "application/vnd.apple.mpegurl" in content_type or "application/x-mpegurl" in content_type:
                if best_m3u8_url is None:
                    best_m3u8_url = response.url
                    logging.info(f"📺 Found HLS playlist URL: {response.url}")
                return
        except Exception:
            # Never let the sniffer break page handling.
            pass

    page.on("response", handle_response)
    try:
        logging.info(f"🎬 Opening tweet page to capture video URL: {tweet_url}")
        page.goto(tweet_url, wait_until="networkidle", timeout=30000)
        time.sleep(5)
        # Click the player in case the video needs a nudge to start streaming.
        video_player = page.locator('[data-testid="videoPlayer"]').first
        if video_player.count() > 0:
            try:
                video_player.click(force=True, timeout=3000)
                time.sleep(5)
            except Exception:
                pass
        return best_mp4_url or best_m3u8_url
    except Exception as e:
        logging.warning(f"⚠️ Could not extract video URL from tweet page {tweet_url}: {e}")
        return None
    finally:
        page.close()
# --- Video Processing ---
def download_and_crop_video(video_url, output_path):
    """
    Downloads a video from MP4 or HLS (.m3u8), then trims it to max 59 seconds.
    Requires ffmpeg installed on the system.

    Returns ``output_path`` on success, ``None`` on any failure.  All errors
    are logged; temporary files and moviepy clip handles are always cleaned
    up (the original leaked both the cropped temp file and the open clip
    readers when an exception fired mid-processing).
    """
    temp_input = output_path.replace(".mp4", "_source.mp4")
    temp_output = output_path.replace(".mp4", "_cropped.mp4")
    video_clip = None
    cropped_clip = None
    try:
        logging.info(f"⬇️ Downloading video source with ffmpeg: {video_url}")
        # "-c copy" remuxes without re-encoding; ffmpeg handles both direct
        # MP4 URLs and HLS playlists as input.
        download_cmd = [
            "ffmpeg",
            "-y",
            "-i", video_url,
            "-c", "copy",
            temp_input,
        ]
        download_result = subprocess.run(
            download_cmd,
            capture_output=True,
            text=True
        )
        if download_result.returncode != 0:
            logging.error(f"❌ ffmpeg download failed:\n{download_result.stderr}")
            return None
        logging.info(f"✅ Video downloaded: {temp_input}")
        video_clip = VideoFileClip(temp_input)
        end_time = min(59, float(video_clip.duration))
        # moviepy >= 2.0 renamed subclip() to subclipped(); support both.
        if hasattr(video_clip, "subclipped"):
            cropped_clip = video_clip.subclipped(0, end_time)
        else:
            cropped_clip = video_clip.subclip(0, end_time)
        cropped_clip.write_videofile(
            temp_output,
            codec="libx264",
            audio_codec="aac",
            logger=None
        )
        os.replace(temp_output, output_path)
        logging.info(f"✅ Video cropped to 59 seconds: {output_path}")
        return output_path
    except Exception as e:
        logging.error(f"❌ Error processing video: {e}")
        return None
    finally:
        # Close the ffmpeg reader processes moviepy keeps open, then remove
        # whatever temp files remain (guarding against output_path lacking a
        # ".mp4" suffix, in which case the temp names equal the output).
        for clip in (cropped_clip, video_clip):
            if clip is not None:
                try:
                    clip.close()
                except Exception:
                    pass
        for leftover in (temp_input, temp_output):
            if leftover != output_path and os.path.exists(leftover):
                os.remove(leftover)
# --- Main Sync Function ---
def sync_feeds(args):
    """Run one scrape→post cycle: fetch recent tweets and mirror new ones to Bluesky."""
    logging.info("🔄 Starting sync cycle...")
    try:
        tweets = scrape_tweets_via_playwright(
            args.twitter_username,
            args.twitter_password,
            args.twitter_email,
            args.twitter_handle
        )
        if not tweets:
            logging.warning("⚠️ No tweets found or failed to fetch. Skipping Bluesky sync for this cycle.")
            return
        bsky_client = Client()
        bsky_client.login(args.bsky_handle, args.bsky_password)
        # Anything at or before the newest original Bluesky post is treated
        # as already synced.
        last_bsky_time = get_last_bsky(bsky_client, args.bsky_handle)
        new_posts = 0
        state_file = "twitter_browser_state.json"
        # A browser context is kept open for resolving video URLs from
        # individual tweet pages, reusing the saved login session if any.
        with sync_playwright() as p:
            browser = p.chromium.launch(
                headless=True,
                args=["--disable-blink-features=AutomationControlled"]
            )
            context_kwargs = {
                "user_agent": (
                    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
                    "AppleWebKit/537.36 (KHTML, like Gecko) "
                    "Chrome/145.0.7632.6 Safari/537.36"
                ),
                "viewport": {"width": 1920, "height": 1080},
            }
            if os.path.exists(state_file):
                context_kwargs["storage_state"] = state_file
            context = browser.new_context(**context_kwargs)
            # Oldest first, so posting order on Bluesky matches Twitter.
            for tweet in reversed(tweets):
                tweet_time = arrow.get(tweet.created_on)
                if tweet_time <= last_bsky_time:
                    continue
                logging.info(f"📝 Found new tweet from {tweet_time}. Posting to Bluesky...")
                raw_text = tweet.text.strip()
                # Stay under Bluesky's 300-char limit: cut at a word
                # boundary, leaving a margin for the appended ellipsis.
                if len(raw_text) > 295:
                    truncated = raw_text[:290]
                    last_space = truncated.rfind(" ")
                    if last_space > 0:
                        raw_text = truncated[:last_space] + "..."
                    else:
                        raw_text = truncated + "..."
                    logging.info("✂️ Tweet exceeded 300 characters. Truncated safely for Bluesky.")
                rich_text = make_rich(raw_text)
                dynamic_alt = build_dynamic_alt(raw_text)
                image_embeds = []
                video_embed = None
                if tweet.media:
                    for media in tweet.media:
                        if media.type == "photo":
                            blob = get_blob_from_url(media.media_url_https, bsky_client)
                            if blob:
                                image_embeds.append(
                                    models.AppBskyEmbedImages.Image(
                                        alt=dynamic_alt,
                                        image=blob
                                    )
                                )
                        elif media.type == "video":
                            # Videos need a second pass: open the tweet page,
                            # sniff the real media URL, then download + trim.
                            if not tweet.tweet_url:
                                logging.warning("⚠️ Tweet has video marker but no tweet URL. Skipping video.")
                                continue
                            temp_video_path = "temp_video.mp4"
                            try:
                                real_video_url = extract_video_url_from_tweet_page(context, tweet.tweet_url)
                                if not real_video_url:
                                    logging.warning(f"⚠️ Could not resolve playable video URL for {tweet.tweet_url}")
                                    continue
                                cropped_video_path = download_and_crop_video(real_video_url, temp_video_path)
                                if not cropped_video_path:
                                    logging.warning(f"⚠️ Video download/crop failed for {tweet.tweet_url}")
                                    continue
                                video_blob = get_blob_from_file(cropped_video_path, bsky_client)
                                if not video_blob:
                                    logging.warning(f"⚠️ Video upload blob failed for {tweet.tweet_url}")
                                    continue
                                video_embed = build_video_embed(video_blob, dynamic_alt)
                            finally:
                                # Always delete the temp file, even when one of
                                # the steps above bailed out with `continue`.
                                if os.path.exists(temp_video_path):
                                    os.remove(temp_video_path)
                try:
                    # A video embed takes precedence over images.
                    if video_embed:
                        bsky_client.send_post(text=rich_text, embed=video_embed, langs=["ca"])
                    elif image_embeds:
                        embed = models.AppBskyEmbedImages.Main(images=image_embeds)
                        bsky_client.send_post(text=rich_text, embed=embed, langs=["ca"])
                    else:
                        bsky_client.send_post(text=rich_text, langs=["ca"])
                    new_posts += 1
                    logging.info(f"✅ Posted new tweet to Bluesky: {raw_text}")
                    # Brief pause between posts.
                    time.sleep(5)
                except Exception as e:
                    logging.error(f"❌ Failed to post tweet to Bluesky: {e}")
            browser.close()
        logging.info(f"✅ Sync complete. Posted {new_posts} new updates.")
    except Exception as e:
        logging.error(f"❌ Error during sync cycle: {e}")
# --- Main Execution ---
def main():
    """Parse CLI/env credentials and run a single sync pass."""
    load_dotenv()
    parser = argparse.ArgumentParser(description="Twitter to Bluesky Sync")
    parser.add_argument("--twitter-username", help="Your Twitter login username")
    parser.add_argument("--twitter-password", help="Your Twitter login password")
    parser.add_argument("--twitter-email", help="Your Twitter email for security challenges")
    parser.add_argument("--twitter-handle", help="The Twitter account to scrape")
    parser.add_argument("--bsky-handle", help="Your Bluesky handle")
    parser.add_argument("--bsky-password", help="Your Bluesky app password")
    args = parser.parse_args()

    # CLI flags win; otherwise fall back to environment variables (.env).
    env_fallbacks = {
        "twitter_username": "TWITTER_USERNAME",
        "twitter_password": "TWITTER_PASSWORD",
        "twitter_email": "TWITTER_EMAIL",
        "bsky_handle": "BSKY_HANDLE",
        "bsky_password": "BSKY_APP_PASSWORD",
    }
    for attr, env_name in env_fallbacks.items():
        if not getattr(args, attr):
            setattr(args, attr, os.getenv(env_name))
    # Scraped handle defaults to TWITTER_HANDLE, then the login username.
    args.twitter_handle = args.twitter_handle or os.getenv("TWITTER_HANDLE") or args.twitter_username

    required = [
        ("twitter_username", "--twitter-username"),
        ("twitter_password", "--twitter-password"),
        ("bsky_handle", "--bsky-handle"),
        ("bsky_password", "--bsky-password"),
    ]
    missing_args = [flag for attr, flag in required if not getattr(args, attr)]
    if missing_args:
        logging.error(f"❌ Missing credentials! You forgot to provide: {', '.join(missing_args)}")
        return

    logging.info(f"🤖 Bot started. Will check @{args.twitter_handle}")
    sync_feeds(args)
    logging.info("🤖 Bot finished.")
# Script entry point: perform one sync pass and exit.
if __name__ == "__main__":
    main()