Added new yml
This commit is contained in:
@@ -18,19 +18,22 @@ logging.basicConfig(
|
|||||||
level=logging.INFO,
|
level=logging.INFO,
|
||||||
)
|
)
|
||||||
|
|
||||||
# --- Custom Classes to replace Tweety ---
|
# --- Custom Classes ---
|
||||||
class ScrapedMedia:
|
class ScrapedMedia:
|
||||||
def __init__(self, url, media_type="photo"):
|
def __init__(self, url, media_type="photo"):
|
||||||
self.type = media_type # Type can be "photo" or "video"
|
self.type = media_type
|
||||||
self.media_url_https = url
|
self.media_url_https = url
|
||||||
|
|
||||||
|
|
||||||
class ScrapedTweet:
|
class ScrapedTweet:
|
||||||
def __init__(self, created_on, text, media_urls):
|
def __init__(self, created_on, text, media_urls, tweet_url=None):
|
||||||
self.created_on = created_on
|
self.created_on = created_on
|
||||||
self.text = text
|
self.text = text
|
||||||
|
self.tweet_url = tweet_url
|
||||||
self.media = [ScrapedMedia(url, media_type) for url, media_type in media_urls]
|
self.media = [ScrapedMedia(url, media_type) for url, media_type in media_urls]
|
||||||
|
|
||||||
# --- 1. Playwright Scraping Logic ---
|
|
||||||
|
# --- Helpers ---
|
||||||
def take_error_screenshot(page, error_msg):
|
def take_error_screenshot(page, error_msg):
|
||||||
logging.info(f"📸 Taking screenshot... Shot: {error_msg}")
|
logging.info(f"📸 Taking screenshot... Shot: {error_msg}")
|
||||||
timestamp = time.strftime("%Y%m%d_%H%M%S")
|
timestamp = time.strftime("%Y%m%d_%H%M%S")
|
||||||
@@ -38,31 +41,163 @@ def take_error_screenshot(page, error_msg):
|
|||||||
page.screenshot(path=screenshot_name)
|
page.screenshot(path=screenshot_name)
|
||||||
logging.info(f"📸 Screenshot saved as: {screenshot_name}")
|
logging.info(f"📸 Screenshot saved as: {screenshot_name}")
|
||||||
|
|
||||||
|
|
||||||
|
def is_valid_url(url):
|
||||||
|
try:
|
||||||
|
response = httpx.head(url, timeout=5, follow_redirects=True)
|
||||||
|
return response.status_code < 500
|
||||||
|
except Exception:
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def clean_url(url):
|
||||||
|
trimmed_url = url.strip()
|
||||||
|
cleaned_url = re.sub(r"\s+", "", trimmed_url)
|
||||||
|
cleaned_url = re.sub(r"[…\.]+$", "", cleaned_url)
|
||||||
|
|
||||||
|
if is_valid_url(cleaned_url):
|
||||||
|
return cleaned_url
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def get_blob_from_url(media_url, client):
|
||||||
|
"""Fetches remote media and uploads it to Bluesky."""
|
||||||
|
try:
|
||||||
|
r = httpx.get(media_url, timeout=30, follow_redirects=True)
|
||||||
|
if r.status_code == 200:
|
||||||
|
return client.upload_blob(r.content).blob
|
||||||
|
except Exception as e:
|
||||||
|
logging.warning(f"Could not fetch media {media_url}: {e}")
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def get_blob_from_file(file_path, client):
|
||||||
|
"""Uploads a local file to Bluesky."""
|
||||||
|
try:
|
||||||
|
with open(file_path, "rb") as f:
|
||||||
|
return client.upload_blob(f.read()).blob
|
||||||
|
except Exception as e:
|
||||||
|
logging.warning(f"Could not upload local file {file_path}: {e}")
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def get_last_bsky(client, handle):
|
||||||
|
timeline = client.get_author_feed(handle)
|
||||||
|
for titem in timeline.feed:
|
||||||
|
if titem.reason is None and getattr(titem.post.record, "reply", None) is None:
|
||||||
|
return arrow.get(titem.post.record.created_at)
|
||||||
|
return arrow.get(0)
|
||||||
|
|
||||||
|
|
||||||
|
def make_rich(content):
|
||||||
|
text_builder = client_utils.TextBuilder()
|
||||||
|
|
||||||
|
def repair_url(match):
|
||||||
|
raw = match.group(0)
|
||||||
|
|
||||||
|
if "\n" not in raw and "\r" not in raw:
|
||||||
|
return re.sub(r"[…\.]+$", "", raw)
|
||||||
|
|
||||||
|
glued = raw.replace("\n", "").replace("\r", "")
|
||||||
|
test_url = re.sub(r"[…\.]+$", "", glued)
|
||||||
|
|
||||||
|
if is_valid_url(test_url):
|
||||||
|
return test_url
|
||||||
|
|
||||||
|
parts = raw.split("\n")
|
||||||
|
test_part0 = re.sub(r"[…\.]+$", "", parts[0])
|
||||||
|
if is_valid_url(test_part0):
|
||||||
|
return raw
|
||||||
|
|
||||||
|
return test_url
|
||||||
|
|
||||||
|
content = re.sub(r"https?://[^\ \t]+", repair_url, content.strip())
|
||||||
|
lines = content.splitlines()
|
||||||
|
|
||||||
|
for line_idx, line in enumerate(lines):
|
||||||
|
if not line.strip():
|
||||||
|
if line_idx < len(lines) - 1:
|
||||||
|
text_builder.text("\n")
|
||||||
|
continue
|
||||||
|
|
||||||
|
words = line.split(" ")
|
||||||
|
for i, word in enumerate(words):
|
||||||
|
if not word:
|
||||||
|
if i < len(words) - 1:
|
||||||
|
text_builder.text(" ")
|
||||||
|
continue
|
||||||
|
|
||||||
|
if word.startswith("http://") or word.startswith("https://"):
|
||||||
|
if word.startswith("http://"):
|
||||||
|
word = word.replace("http://", "https://", 1)
|
||||||
|
|
||||||
|
word = re.sub(r"[…\.]+$", "", word)
|
||||||
|
clean_url_value = clean_url(word)
|
||||||
|
|
||||||
|
if clean_url_value and is_valid_url(clean_url_value):
|
||||||
|
text_builder.link(clean_url_value, clean_url_value)
|
||||||
|
else:
|
||||||
|
text_builder.text(word)
|
||||||
|
|
||||||
|
elif word.startswith("#"):
|
||||||
|
clean_tag = word[1:].rstrip(".,;:!?)'\"…")
|
||||||
|
text_builder.tag(word, clean_tag)
|
||||||
|
|
||||||
|
else:
|
||||||
|
text_builder.text(word)
|
||||||
|
|
||||||
|
if i < len(words) - 1:
|
||||||
|
text_builder.text(" ")
|
||||||
|
|
||||||
|
if line_idx < len(lines) - 1:
|
||||||
|
text_builder.text("\n")
|
||||||
|
|
||||||
|
return text_builder
|
||||||
|
|
||||||
|
|
||||||
|
def build_dynamic_alt(raw_text):
|
||||||
|
dynamic_alt = raw_text.replace("\n", " ").strip()
|
||||||
|
dynamic_alt = re.sub(r"https?://\S+", "", dynamic_alt).strip()
|
||||||
|
|
||||||
|
if len(dynamic_alt) > 150:
|
||||||
|
dynamic_alt = dynamic_alt[:147] + "..."
|
||||||
|
elif not dynamic_alt:
|
||||||
|
dynamic_alt = "Vídeo o imatge adjunta al tuit"
|
||||||
|
|
||||||
|
return dynamic_alt
|
||||||
|
|
||||||
|
|
||||||
|
# --- Playwright Scraping ---
|
||||||
def scrape_tweets_via_playwright(username, password, email, target_handle):
|
def scrape_tweets_via_playwright(username, password, email, target_handle):
|
||||||
"""Logs in (or loads session) and scrapes tweets directly from the DOM."""
|
"""Logs in (or loads session) and scrapes tweets directly from the DOM."""
|
||||||
tweets = []
|
tweets = []
|
||||||
state_file = "twitter_browser_state.json"
|
state_file = "twitter_browser_state.json"
|
||||||
|
|
||||||
with sync_playwright() as p:
|
with sync_playwright() as p:
|
||||||
browser = p.chromium.launch(headless=True, args=["--disable-blink-features=AutomationControlled"])
|
browser = p.chromium.launch(
|
||||||
clean_ua = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/145.0.7632.6 Safari/537.36"
|
headless=True,
|
||||||
|
args=["--disable-blink-features=AutomationControlled"]
|
||||||
|
)
|
||||||
|
clean_ua = (
|
||||||
|
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
|
||||||
|
"AppleWebKit/537.36 (KHTML, like Gecko) "
|
||||||
|
"Chrome/145.0.7632.6 Safari/537.36"
|
||||||
|
)
|
||||||
|
|
||||||
context = None
|
context = None
|
||||||
needs_login = True
|
needs_login = True
|
||||||
|
|
||||||
# 1. Try to load existing session
|
|
||||||
if os.path.exists(state_file):
|
if os.path.exists(state_file):
|
||||||
logging.info("✅ Found existing browser state. Attempting to bypass login...")
|
logging.info("✅ Found existing browser state. Attempting to bypass login...")
|
||||||
context = browser.new_context(
|
context = browser.new_context(
|
||||||
user_agent=clean_ua,
|
user_agent=clean_ua,
|
||||||
viewport={'width': 1920, 'height': 1080},
|
viewport={"width": 1920, "height": 1080},
|
||||||
storage_state=state_file
|
storage_state=state_file
|
||||||
)
|
)
|
||||||
page = context.new_page()
|
page = context.new_page()
|
||||||
page.goto("https://x.com/home")
|
page.goto("https://x.com/home")
|
||||||
time.sleep(4)
|
time.sleep(4)
|
||||||
|
|
||||||
# Check if we are actually logged in
|
|
||||||
if page.locator('[data-testid="SideNav_NewTweet_Button"]').is_visible() or "/home" in page.url:
|
if page.locator('[data-testid="SideNav_NewTweet_Button"]').is_visible() or "/home" in page.url:
|
||||||
logging.info("✅ Session is valid!")
|
logging.info("✅ Session is valid!")
|
||||||
needs_login = False
|
needs_login = False
|
||||||
@@ -70,35 +205,37 @@ def scrape_tweets_via_playwright(username, password, email, target_handle):
|
|||||||
logging.warning("⚠️ Saved session expired or invalid. Re-logging in...")
|
logging.warning("⚠️ Saved session expired or invalid. Re-logging in...")
|
||||||
context.close()
|
context.close()
|
||||||
os.remove(state_file)
|
os.remove(state_file)
|
||||||
|
|
||||||
# 2. Perform Login if needed
|
|
||||||
if needs_login:
|
if needs_login:
|
||||||
logging.info("🚀 Launching fresh browser for automated Twitter login...")
|
logging.info("🚀 Launching fresh browser for automated Twitter login...")
|
||||||
context = browser.new_context(
|
context = browser.new_context(
|
||||||
user_agent=clean_ua,
|
user_agent=clean_ua,
|
||||||
viewport={'width': 1920, 'height': 1080}
|
viewport={"width": 1920, "height": 1080}
|
||||||
)
|
)
|
||||||
page = context.new_page()
|
page = context.new_page()
|
||||||
|
|
||||||
try:
|
try:
|
||||||
page.goto("https://x.com")
|
page.goto("https://x.com")
|
||||||
sign_in_button = page.get_by_text("Sign in", exact=True)
|
sign_in_button = page.get_by_text("Sign in", exact=True)
|
||||||
sign_in_button.wait_for(state="visible", timeout=15000)
|
sign_in_button.wait_for(state="visible", timeout=15000)
|
||||||
sign_in_button.click(force=True)
|
sign_in_button.click(force=True)
|
||||||
|
|
||||||
page.wait_for_selector('h1:has-text("Sign in to X")', state='visible', timeout=25000)
|
page.wait_for_selector('h1:has-text("Sign in to X")', state="visible", timeout=25000)
|
||||||
logging.info(f"👤 Entering username: {username}...")
|
logging.info(f"👤 Entering username: {username}...")
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
|
|
||||||
username_input = page.locator('input[autocomplete="username"]')
|
username_input = page.locator('input[autocomplete="username"]')
|
||||||
username_input.wait_for(state="visible", timeout=15000)
|
username_input.wait_for(state="visible", timeout=15000)
|
||||||
username_input.click(force=True)
|
username_input.click(force=True)
|
||||||
username_input.press_sequentially(username, delay=100)
|
username_input.press_sequentially(username, delay=100)
|
||||||
|
|
||||||
page.locator('button:has-text("Next")').first.click(force=True)
|
page.locator('button:has-text("Next")').first.click(force=True)
|
||||||
page.wait_for_selector('input[name="password"], input[data-testid="ocfEnterTextTextInput"], input[name="text"]', timeout=15000)
|
page.wait_for_selector(
|
||||||
|
'input[name="password"], input[data-testid="ocfEnterTextTextInput"], input[name="text"]',
|
||||||
|
timeout=15000
|
||||||
|
)
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
|
|
||||||
if page.locator('input[data-testid="ocfEnterTextTextInput"]').is_visible() or page.locator('input[name="text"]').is_visible():
|
if page.locator('input[data-testid="ocfEnterTextTextInput"]').is_visible() or page.locator('input[name="text"]').is_visible():
|
||||||
logging.warning("🛡️ Security challenge detected! Entering email/phone...")
|
logging.warning("🛡️ Security challenge detected! Entering email/phone...")
|
||||||
page.fill('input[data-testid="ocfEnterTextTextInput"], input[name="text"]', email)
|
page.fill('input[data-testid="ocfEnterTextTextInput"], input[name="text"]', email)
|
||||||
@@ -113,250 +250,223 @@ def scrape_tweets_via_playwright(username, password, email, target_handle):
|
|||||||
logging.info("🔑 Entering password...")
|
logging.info("🔑 Entering password...")
|
||||||
page.fill('input[name="password"]', password)
|
page.fill('input[name="password"]', password)
|
||||||
page.locator('span:has-text("Log in")').first.click()
|
page.locator('span:has-text("Log in")').first.click()
|
||||||
|
|
||||||
page.wait_for_url("**/home", timeout=20000)
|
page.wait_for_url("**/home", timeout=20000)
|
||||||
time.sleep(3)
|
time.sleep(3)
|
||||||
|
|
||||||
# Save state for next time
|
|
||||||
context.storage_state(path=state_file)
|
context.storage_state(path=state_file)
|
||||||
logging.info("✅ Login successful. Browser state saved.")
|
logging.info("✅ Login successful. Browser state saved.")
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
take_error_screenshot(page, "login_failed")
|
take_error_screenshot(page, "login_failed")
|
||||||
logging.error(f"❌ Login failed: {e}")
|
logging.error(f"❌ Login failed: {e}")
|
||||||
browser.close()
|
browser.close()
|
||||||
return []
|
return []
|
||||||
|
|
||||||
# 3. Scrape the target profile
|
|
||||||
logging.info(f"🌐 Navigating to https://x.com/{target_handle} to scrape tweets...")
|
logging.info(f"🌐 Navigating to https://x.com/{target_handle} to scrape tweets...")
|
||||||
|
page = context.new_page()
|
||||||
page.goto(f"https://x.com/{target_handle}")
|
page.goto(f"https://x.com/{target_handle}")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
page.wait_for_selector('article', timeout=20000)
|
page.wait_for_selector("article", timeout=20000)
|
||||||
time.sleep(3) # Let DOM settle and images load
|
time.sleep(3)
|
||||||
|
|
||||||
articles = page.locator('article').all()
|
articles = page.locator("article").all()
|
||||||
logging.info(f"📊 Found {len(articles)} tweets on screen. Parsing...")
|
logging.info(f"📊 Found {len(articles)} tweets on screen. Parsing...")
|
||||||
|
|
||||||
for article in articles[:10]: # Check top 10 tweets
|
for article in articles[:10]:
|
||||||
try:
|
try:
|
||||||
# Get Time
|
time_el = article.locator("time").first
|
||||||
time_el = article.locator('time').first
|
|
||||||
if not time_el.is_visible():
|
if not time_el.is_visible():
|
||||||
continue # Skip ads or invalid articles
|
continue
|
||||||
created_at = time_el.get_attribute('datetime')
|
|
||||||
|
created_at = time_el.get_attribute("datetime")
|
||||||
# Get Text
|
|
||||||
|
tweet_url = None
|
||||||
|
time_link = article.locator("a:has(time)").first
|
||||||
|
if time_link.is_visible():
|
||||||
|
href = time_link.get_attribute("href")
|
||||||
|
if href:
|
||||||
|
tweet_url = f"https://x.com{href}" if href.startswith("/") else href
|
||||||
|
|
||||||
text_locator = article.locator('[data-testid="tweetText"]').first
|
text_locator = article.locator('[data-testid="tweetText"]').first
|
||||||
text = text_locator.inner_text() if text_locator.is_visible() else ""
|
text = text_locator.inner_text() if text_locator.is_visible() else ""
|
||||||
|
|
||||||
# Get Media URLs
|
|
||||||
media_urls = []
|
media_urls = []
|
||||||
|
|
||||||
photo_locators = article.locator('[data-testid="tweetPhoto"] img').all()
|
photo_locators = article.locator('[data-testid="tweetPhoto"] img').all()
|
||||||
for img in photo_locators:
|
for img in photo_locators:
|
||||||
src = img.get_attribute('src')
|
src = img.get_attribute("src")
|
||||||
if src:
|
if src:
|
||||||
src = re.sub(r'&name=\w+', '&name=large', src)
|
src = re.sub(r"&name=\w+", "&name=large", src)
|
||||||
media_urls.append((src, "photo"))
|
media_urls.append((src, "photo"))
|
||||||
|
|
||||||
# Get Video URLs
|
|
||||||
video_locators = article.locator('[data-testid="videoPlayer"]').all()
|
video_locators = article.locator('[data-testid="videoPlayer"]').all()
|
||||||
for video in video_locators:
|
if video_locators:
|
||||||
video_url = video.get_attribute('src')
|
media_urls.append((tweet_url or "", "video"))
|
||||||
if video_url:
|
|
||||||
media_urls.append((video_url, "video"))
|
tweets.append(ScrapedTweet(created_at, text, media_urls, tweet_url=tweet_url))
|
||||||
|
|
||||||
tweets.append(ScrapedTweet(created_at, text, media_urls))
|
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logging.warning(f"⚠️ Failed to parse a specific tweet: {e}")
|
logging.warning(f"⚠️ Failed to parse a specific tweet: {e}")
|
||||||
continue
|
continue
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
take_error_screenshot(page, "scrape_failed")
|
take_error_screenshot(page, "scrape_failed")
|
||||||
logging.error(f"❌ Failed to scrape profile: {e}")
|
logging.error(f"❌ Failed to scrape profile: {e}")
|
||||||
|
|
||||||
browser.close()
|
browser.close()
|
||||||
return tweets
|
return tweets
|
||||||
|
|
||||||
# --- 2. URL Validation Function ---
|
|
||||||
def is_valid_url(url):
|
def extract_video_url_from_tweet_page(context, tweet_url):
|
||||||
|
"""
|
||||||
|
Opens a tweet page and captures the first real MP4 video request.
|
||||||
|
"""
|
||||||
|
page = context.new_page()
|
||||||
|
found_video_url = None
|
||||||
|
|
||||||
|
def handle_response(response):
|
||||||
|
nonlocal found_video_url
|
||||||
|
try:
|
||||||
|
url = response.url
|
||||||
|
content_type = response.headers.get("content-type", "")
|
||||||
|
|
||||||
|
if found_video_url:
|
||||||
|
return
|
||||||
|
|
||||||
|
if ".mp4" in url or "video/mp4" in content_type:
|
||||||
|
found_video_url = url
|
||||||
|
logging.info(f"🎥 Found video URL: {url}")
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
page.on("response", handle_response)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Follow redirects and accept standard anti-bot codes (like 403) so we don't discard real news links
|
logging.info(f"🎬 Opening tweet page to capture video URL: {tweet_url}")
|
||||||
response = httpx.head(url, timeout=5, follow_redirects=True)
|
page.goto(tweet_url, wait_until="networkidle", timeout=30000)
|
||||||
return response.status_code < 500
|
time.sleep(5)
|
||||||
except Exception:
|
|
||||||
return False
|
|
||||||
|
|
||||||
# --- 3. URL Cleaning Function ---
|
video_player = page.locator('[data-testid="videoPlayer"]').first
|
||||||
def clean_url(url):
|
if video_player.count() > 0:
|
||||||
trimmed_url = url.strip()
|
try:
|
||||||
cleaned_url = re.sub(r'\s+', '', trimmed_url)
|
video_player.click(force=True, timeout=3000)
|
||||||
|
time.sleep(3)
|
||||||
# Strip trailing ellipsis (unicode '…' or ascii '...') and trailing periods
|
except Exception:
|
||||||
cleaned_url = re.sub(r'[…\.]+$', '', cleaned_url)
|
pass
|
||||||
|
|
||||||
if is_valid_url(cleaned_url):
|
|
||||||
return cleaned_url
|
|
||||||
return None
|
|
||||||
|
|
||||||
# --- 4. Video Processing ---
|
return found_video_url
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logging.warning(f"⚠️ Could not extract video URL from tweet page {tweet_url}: {e}")
|
||||||
|
return None
|
||||||
|
finally:
|
||||||
|
page.close()
|
||||||
|
|
||||||
|
|
||||||
|
# --- Video Processing ---
|
||||||
def download_and_crop_video(video_url, output_path):
|
def download_and_crop_video(video_url, output_path):
|
||||||
"""Downloads the video and crops it to 59 seconds."""
|
"""Downloads the video and crops it to max 59 seconds."""
|
||||||
try:
|
try:
|
||||||
# Download the video
|
response = httpx.get(video_url, timeout=60, follow_redirects=True)
|
||||||
response = httpx.get(video_url, timeout=10)
|
if response.status_code != 200:
|
||||||
if response.status_code == 200:
|
logging.error(f"❌ Failed to download video: {video_url} (status {response.status_code})")
|
||||||
with open(output_path, 'wb') as f:
|
|
||||||
f.write(response.content)
|
|
||||||
logging.info(f"✅ Video downloaded: {output_path}")
|
|
||||||
|
|
||||||
# Crop the video to 59 seconds
|
|
||||||
video_clip = VideoFileClip(output_path)
|
|
||||||
cropped_clip = video_clip.subclip(0, min(59, video_clip.duration))
|
|
||||||
cropped_clip.write_videofile(output_path, codec='libx264')
|
|
||||||
logging.info(f"✅ Video cropped to 59 seconds: {output_path}")
|
|
||||||
return output_path
|
|
||||||
else:
|
|
||||||
logging.error(f"❌ Failed to download video: {video_url}")
|
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
with open(output_path, "wb") as f:
|
||||||
|
f.write(response.content)
|
||||||
|
logging.info(f"✅ Video downloaded: {output_path}")
|
||||||
|
|
||||||
|
video_clip = VideoFileClip(output_path)
|
||||||
|
end_time = min(59, float(video_clip.duration))
|
||||||
|
|
||||||
|
if hasattr(video_clip, "subclipped"):
|
||||||
|
cropped_clip = video_clip.subclipped(0, end_time)
|
||||||
|
else:
|
||||||
|
cropped_clip = video_clip.subclip(0, end_time)
|
||||||
|
|
||||||
|
temp_output = output_path.replace(".mp4", "_cropped.mp4")
|
||||||
|
cropped_clip.write_videofile(
|
||||||
|
temp_output,
|
||||||
|
codec="libx264",
|
||||||
|
audio_codec="aac",
|
||||||
|
logger=None
|
||||||
|
)
|
||||||
|
|
||||||
|
video_clip.close()
|
||||||
|
cropped_clip.close()
|
||||||
|
|
||||||
|
os.replace(temp_output, output_path)
|
||||||
|
logging.info(f"✅ Video cropped to 59 seconds: {output_path}")
|
||||||
|
return output_path
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logging.error(f"❌ Error processing video: {e}")
|
logging.error(f"❌ Error processing video: {e}")
|
||||||
return None
|
return None
|
||||||
|
|
||||||
# --- 5. Formatting & Bluesky Logic ---
|
|
||||||
def get_last_bsky(client, handle):
|
|
||||||
timeline = client.get_author_feed(handle)
|
|
||||||
for titem in timeline.feed:
|
|
||||||
if titem.reason is None and getattr(titem.post.record, "reply", None) is None:
|
|
||||||
return arrow.get(titem.post.record.created_at)
|
|
||||||
return arrow.get(0)
|
|
||||||
|
|
||||||
def make_rich(content):
|
def build_video_embed(video_blob, alt_text):
|
||||||
text_builder = client_utils.TextBuilder()
|
|
||||||
|
|
||||||
# 1. Smart URL Repair: Find URLs broken by newlines and glue them back together
|
|
||||||
def repair_url(match):
|
|
||||||
raw = match.group(0)
|
|
||||||
|
|
||||||
# If there are no newlines, just strip trailing ellipsis and return
|
|
||||||
if '\n' not in raw and '\r' not in raw:
|
|
||||||
return re.sub(r'[…\.]+$', '', raw)
|
|
||||||
|
|
||||||
# Try removing all newlines
|
|
||||||
glued = raw.replace('\n', '').replace('\r', '')
|
|
||||||
|
|
||||||
# Strip trailing ellipsis for validation
|
|
||||||
test_url = re.sub(r'[…\.]+$', '', glued)
|
|
||||||
|
|
||||||
# If gluing it creates a valid URL, it was definitely broken by Twitter's DOM
|
|
||||||
if is_valid_url(test_url):
|
|
||||||
return test_url
|
|
||||||
|
|
||||||
# If it's NOT a valid URL, maybe the newline was supposed to be there (e.g., URL\nNextParagraph)
|
|
||||||
parts = raw.split('\n')
|
|
||||||
test_part0 = re.sub(r'[…\.]+$', '', parts[0])
|
|
||||||
if is_valid_url(test_part0):
|
|
||||||
return raw # Return original to preserve the paragraph break
|
|
||||||
|
|
||||||
# Fallback: assume it's a broken URL and glue it anyway
|
|
||||||
return test_url
|
|
||||||
|
|
||||||
# This regex grabs http:// or https:// followed by any non-space characters (including newlines)
|
|
||||||
content = re.sub(r'https?://[^\ \t]+', repair_url, content.strip())
|
|
||||||
|
|
||||||
# 2. Split content into lines to preserve actual paragraph breaks
|
|
||||||
lines = content.splitlines()
|
|
||||||
|
|
||||||
for line_idx, line in enumerate(lines):
|
|
||||||
# Handle empty lines to preserve spacing
|
|
||||||
if not line.strip():
|
|
||||||
if line_idx < len(lines) - 1:
|
|
||||||
text_builder.text("\n")
|
|
||||||
continue
|
|
||||||
|
|
||||||
# Split by space to process words, URLs, and tags
|
|
||||||
words = line.split(" ")
|
|
||||||
for i, word in enumerate(words):
|
|
||||||
if not word: # Handle double spaces gracefully
|
|
||||||
if i < len(words) - 1:
|
|
||||||
text_builder.text(" ")
|
|
||||||
continue
|
|
||||||
|
|
||||||
# Check and convert URLs
|
|
||||||
if word.startswith("http://") or word.startswith("https://"):
|
|
||||||
# Ensure the URL is converted to https if it starts with http
|
|
||||||
if word.startswith("http://"):
|
|
||||||
word = word.replace("http://", "https://", 1)
|
|
||||||
|
|
||||||
# Strip trailing ellipsis right here so it doesn't fail validation
|
|
||||||
word = re.sub(r'[…\.]+$', '', word)
|
|
||||||
|
|
||||||
# Clean the URL further using the clean_url function
|
|
||||||
clean_url_value = clean_url(word)
|
|
||||||
if clean_url_value and is_valid_url(clean_url_value):
|
|
||||||
text_builder.link(clean_url_value, clean_url_value)
|
|
||||||
else:
|
|
||||||
text_builder.text(word) # Add as plain text if invalid
|
|
||||||
elif word.startswith("#"):
|
|
||||||
clean_tag = word[1:].rstrip(".,;:!?)'\"…")
|
|
||||||
text_builder.tag(word, clean_tag)
|
|
||||||
else:
|
|
||||||
text_builder.text(word)
|
|
||||||
|
|
||||||
if i < len(words) - 1:
|
|
||||||
text_builder.text(" ") # Add space between words
|
|
||||||
|
|
||||||
# Add a line break after each processed line, except the very last one
|
|
||||||
if line_idx < len(lines) - 1:
|
|
||||||
text_builder.text("\n")
|
|
||||||
|
|
||||||
return text_builder
|
|
||||||
|
|
||||||
def get_blob_from_url(media_url, client):
|
|
||||||
"""Fetches and uploads the media (image or video) and returns the blob."""
|
|
||||||
try:
|
try:
|
||||||
r = httpx.get(media_url, timeout=10)
|
return models.AppBskyEmbedVideo.Main(
|
||||||
if r.status_code == 200:
|
video=video_blob,
|
||||||
return client.upload_blob(r.content).blob
|
alt=alt_text
|
||||||
except Exception as e:
|
)
|
||||||
logging.warning(f"Could not fetch media {media_url}: {e}")
|
except AttributeError:
|
||||||
return None
|
logging.error("❌ Your atproto version does not support AppBskyEmbedVideo. Upgrade atproto.")
|
||||||
|
return None
|
||||||
|
|
||||||
# --- 6. Main Sync Function ---
|
|
||||||
|
# --- Main Sync Function ---
|
||||||
def sync_feeds(args):
|
def sync_feeds(args):
|
||||||
logging.info("🔄 Starting sync cycle...")
|
logging.info("🔄 Starting sync cycle...")
|
||||||
try:
|
try:
|
||||||
# 1. Fetch Tweets via Playwright
|
|
||||||
tweets = scrape_tweets_via_playwright(
|
tweets = scrape_tweets_via_playwright(
|
||||||
args.twitter_username,
|
args.twitter_username,
|
||||||
args.twitter_password,
|
args.twitter_password,
|
||||||
args.twitter_email,
|
args.twitter_email,
|
||||||
args.twitter_handle
|
args.twitter_handle
|
||||||
)
|
)
|
||||||
|
|
||||||
if not tweets:
|
if not tweets:
|
||||||
logging.warning("⚠️ No tweets found or failed to fetch. Skipping Bluesky sync for this cycle.")
|
logging.warning("⚠️ No tweets found or failed to fetch. Skipping Bluesky sync for this cycle.")
|
||||||
return
|
return
|
||||||
|
|
||||||
# 2. Connect to Bluesky
|
|
||||||
bsky_client = Client()
|
bsky_client = Client()
|
||||||
bsky_client.login(args.bsky_handle, args.bsky_password)
|
bsky_client.login(args.bsky_handle, args.bsky_password)
|
||||||
last_bsky_time = get_last_bsky(bsky_client, args.bsky_handle)
|
last_bsky_time = get_last_bsky(bsky_client, args.bsky_handle)
|
||||||
|
|
||||||
# 3. Process and Post
|
|
||||||
new_posts = 0
|
|
||||||
|
|
||||||
for tweet in reversed(tweets):
|
new_posts = 0
|
||||||
tweet_time = arrow.get(tweet.created_on)
|
state_file = "twitter_browser_state.json"
|
||||||
|
|
||||||
if tweet_time > last_bsky_time: # Only post new tweets
|
with sync_playwright() as p:
|
||||||
#if True: # For testing, post all tweets regardless of time
|
browser = p.chromium.launch(
|
||||||
|
headless=True,
|
||||||
|
args=["--disable-blink-features=AutomationControlled"]
|
||||||
|
)
|
||||||
|
context_kwargs = {
|
||||||
|
"user_agent": (
|
||||||
|
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
|
||||||
|
"AppleWebKit/537.36 (KHTML, like Gecko) "
|
||||||
|
"Chrome/145.0.7632.6 Safari/537.36"
|
||||||
|
),
|
||||||
|
"viewport": {"width": 1920, "height": 1080},
|
||||||
|
}
|
||||||
|
if os.path.exists(state_file):
|
||||||
|
context_kwargs["storage_state"] = state_file
|
||||||
|
|
||||||
|
context = browser.new_context(**context_kwargs)
|
||||||
|
|
||||||
|
for tweet in reversed(tweets):
|
||||||
|
tweet_time = arrow.get(tweet.created_on)
|
||||||
|
|
||||||
|
if tweet_time <= last_bsky_time:
|
||||||
|
continue
|
||||||
|
|
||||||
logging.info(f"📝 Found new tweet from {tweet_time}. Posting to Bluesky...")
|
logging.info(f"📝 Found new tweet from {tweet_time}. Posting to Bluesky...")
|
||||||
|
|
||||||
raw_text = tweet.text.strip()
|
raw_text = tweet.text.strip()
|
||||||
|
|
||||||
# Smart truncation: Don't cut off in the middle of a word/URL
|
|
||||||
if len(raw_text) > 295:
|
if len(raw_text) > 295:
|
||||||
truncated = raw_text[:290]
|
truncated = raw_text[:290]
|
||||||
last_space = truncated.rfind(" ")
|
last_space = truncated.rfind(" ")
|
||||||
@@ -365,113 +475,107 @@ def sync_feeds(args):
|
|||||||
else:
|
else:
|
||||||
raw_text = truncated + "..."
|
raw_text = truncated + "..."
|
||||||
logging.info("✂️ Tweet exceeded 300 characters. Truncated safely for Bluesky.")
|
logging.info("✂️ Tweet exceeded 300 characters. Truncated safely for Bluesky.")
|
||||||
|
|
||||||
# Generate rich_text (This is now a TextBuilder object)
|
|
||||||
rich_text = make_rich(raw_text)
|
rich_text = make_rich(raw_text)
|
||||||
|
dynamic_alt = build_dynamic_alt(raw_text)
|
||||||
|
|
||||||
|
image_embeds = []
|
||||||
|
video_embed = None
|
||||||
|
|
||||||
images = []
|
|
||||||
if tweet.media:
|
if tweet.media:
|
||||||
# --- NEW: Generate dynamic alt text based on the tweet content ---
|
|
||||||
# 1. Remove line breaks and extra spaces
|
|
||||||
dynamic_alt = raw_text.replace('\n', ' ').strip()
|
|
||||||
# 2. Remove URLs to keep the text clean
|
|
||||||
dynamic_alt = re.sub(r'https?://\S+', '', dynamic_alt).strip()
|
|
||||||
|
|
||||||
# 3. Truncate gracefully if it's too long
|
|
||||||
if len(dynamic_alt) > 150:
|
|
||||||
dynamic_alt = dynamic_alt[:147] + "..."
|
|
||||||
elif not dynamic_alt:
|
|
||||||
# Fallback if the tweet is literally just an image with no text
|
|
||||||
dynamic_alt = "Imatge adjunta al tuit"
|
|
||||||
# -----------------------------------------------------------------
|
|
||||||
|
|
||||||
for media in tweet.media:
|
for media in tweet.media:
|
||||||
if media.type == "photo":
|
if media.type == "photo":
|
||||||
blob = get_blob_from_url(media.media_url_https, bsky_client)
|
blob = get_blob_from_url(media.media_url_https, bsky_client)
|
||||||
if blob:
|
if blob:
|
||||||
# Inject our dynamic alt text here!
|
image_embeds.append(
|
||||||
images.append(models.AppBskyEmbedImages.Image(alt=dynamic_alt, image=blob))
|
models.AppBskyEmbedImages.Image(
|
||||||
elif media.type == "video":
|
alt=dynamic_alt,
|
||||||
# Download and crop the video
|
image=blob
|
||||||
video_path = "temp_video.mp4"
|
)
|
||||||
cropped_video_path = download_and_crop_video(media.media_url_https, video_path)
|
)
|
||||||
if cropped_video_path:
|
|
||||||
blob = get_blob_from_url(cropped_video_path, bsky_client)
|
elif media.type == "video":
|
||||||
if blob:
|
if not tweet.tweet_url:
|
||||||
images.append(models.AppBskyEmbedImages.Image(alt=dynamic_alt, image=blob))
|
logging.warning("⚠️ Tweet has video marker but no tweet URL. Skipping video.")
|
||||||
os.remove(video_path) # Clean up the temporary video file
|
continue
|
||||||
|
|
||||||
|
temp_video_path = "temp_video.mp4"
|
||||||
|
|
||||||
|
try:
|
||||||
|
real_video_url = extract_video_url_from_tweet_page(context, tweet.tweet_url)
|
||||||
|
if not real_video_url:
|
||||||
|
logging.warning(f"⚠️ Could not resolve real video URL for {tweet.tweet_url}")
|
||||||
|
continue
|
||||||
|
|
||||||
|
cropped_video_path = download_and_crop_video(real_video_url, temp_video_path)
|
||||||
|
if not cropped_video_path:
|
||||||
|
continue
|
||||||
|
|
||||||
|
video_blob = get_blob_from_file(cropped_video_path, bsky_client)
|
||||||
|
if not video_blob:
|
||||||
|
continue
|
||||||
|
|
||||||
|
video_embed = build_video_embed(video_blob, dynamic_alt)
|
||||||
|
|
||||||
|
finally:
|
||||||
|
if os.path.exists(temp_video_path):
|
||||||
|
os.remove(temp_video_path)
|
||||||
|
|
||||||
# 🌐 Posting with Catalan language tag
|
|
||||||
try:
|
try:
|
||||||
if images:
|
if video_embed:
|
||||||
embed = models.AppBskyEmbedImages.Main(images=images)
|
bsky_client.send_post(text=rich_text, embed=video_embed, langs=["ca"])
|
||||||
|
elif image_embeds:
|
||||||
|
embed = models.AppBskyEmbedImages.Main(images=image_embeds)
|
||||||
bsky_client.send_post(text=rich_text, embed=embed, langs=["ca"])
|
bsky_client.send_post(text=rich_text, embed=embed, langs=["ca"])
|
||||||
else:
|
else:
|
||||||
bsky_client.send_post(text=rich_text, langs=["ca"])
|
bsky_client.send_post(text=rich_text, langs=["ca"])
|
||||||
|
|
||||||
new_posts += 1
|
new_posts += 1
|
||||||
logging.info(f"✅ Posted new tweet to Bluesky: {raw_text}")
|
logging.info(f"✅ Posted new tweet to Bluesky: {raw_text}")
|
||||||
time.sleep(5) # Rate limit handling
|
time.sleep(5)
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logging.error(f"❌ Failed to post tweet to Bluesky: {e}")
|
logging.error(f"❌ Failed to post tweet to Bluesky: {e}")
|
||||||
|
|
||||||
|
browser.close()
|
||||||
|
|
||||||
logging.info(f"✅ Sync complete. Posted {new_posts} new updates.")
|
logging.info(f"✅ Sync complete. Posted {new_posts} new updates.")
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logging.error(f"❌ Error during sync cycle: {e}")
|
logging.error(f"❌ Error during sync cycle: {e}")
|
||||||
|
|
||||||
# --- 7. Download and Crop Video Function ---
|
|
||||||
def download_and_crop_video(video_url, output_path):
    """Download a video and trim it to at most 59 seconds.

    Parameters
    ----------
    video_url : str
        Direct URL of the video to fetch.
    output_path : str
        Local path where the trimmed video is written.

    Returns
    -------
    str | None
        ``output_path`` on success, ``None`` on any failure (bad HTTP
        status, download error, or processing error). Errors are logged,
        never raised, so callers can simply ``if not path: continue``.
    """
    video_clip = None
    cropped_clip = None
    try:
        # Download the raw video. Media CDNs frequently redirect, so follow
        # redirects (matching the is_valid_url helper); 30s is a more
        # realistic budget for a video payload than the previous 10s.
        response = httpx.get(video_url, timeout=30, follow_redirects=True)
        if response.status_code != 200:
            logging.error(f"❌ Failed to download video: {video_url}")
            return None

        with open(output_path, 'wb') as f:
            f.write(response.content)
        logging.info(f"✅ Video downloaded: {output_path}")

        # Trim to 59 seconds (Bluesky's ~60s limit). Write to a temporary
        # sibling file: VideoFileClip reads the source lazily, so writing
        # over the very file being read can corrupt the result.
        tmp_path = output_path + ".crop.mp4"
        video_clip = VideoFileClip(output_path)
        cropped_clip = video_clip.subclip(0, min(59, video_clip.duration))
        cropped_clip.write_videofile(tmp_path, codec='libx264')

        # Close the ffmpeg readers BEFORE replacing the file (required on
        # Windows, harmless elsewhere), then atomically swap in the result.
        cropped_clip.close()
        cropped_clip = None
        video_clip.close()
        video_clip = None
        os.replace(tmp_path, output_path)

        logging.info(f"✅ Video cropped to 59 seconds: {output_path}")
        return output_path
    except Exception as e:
        logging.error(f"❌ Error processing video: {e}")
        return None
    finally:
        # Release ffmpeg reader subprocesses on every exit path; the
        # original leaked both clips.
        if cropped_clip is not None:
            cropped_clip.close()
        if video_clip is not None:
            video_clip.close()
|
|
||||||
|
|
||||||
# --- 8. Main Execution ---
|
|
||||||
def main():
|
def main():
|
||||||
load_dotenv()
|
load_dotenv()
|
||||||
|
|
||||||
parser = argparse.ArgumentParser(description="Twitter to Bluesky Sync")
|
parser = argparse.ArgumentParser(description="Twitter to Bluesky Sync")
|
||||||
|
|
||||||
parser.add_argument("--twitter-username", help="Your Twitter login username")
|
parser.add_argument("--twitter-username", help="Your Twitter login username")
|
||||||
parser.add_argument("--twitter-password", help="Your Twitter login password")
|
parser.add_argument("--twitter-password", help="Your Twitter login password")
|
||||||
parser.add_argument("--twitter-email", help="Your Twitter email for security challenges")
|
parser.add_argument("--twitter-email", help="Your Twitter email for security challenges")
|
||||||
parser.add_argument("--twitter-handle", help="The Twitter account to scrape")
|
parser.add_argument("--twitter-handle", help="The Twitter account to scrape")
|
||||||
parser.add_argument("--bsky-handle", help="Your Bluesky handle")
|
parser.add_argument("--bsky-handle", help="Your Bluesky handle")
|
||||||
parser.add_argument("--bsky-password", help="Your Bluesky app password")
|
parser.add_argument("--bsky-password", help="Your Bluesky app password")
|
||||||
|
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
|
|
||||||
args.twitter_username = args.twitter_username or os.getenv("TWITTER_USERNAME")
|
args.twitter_username = args.twitter_username or os.getenv("TWITTER_USERNAME")
|
||||||
args.twitter_password = args.twitter_password or os.getenv("TWITTER_PASSWORD")
|
args.twitter_password = args.twitter_password or os.getenv("TWITTER_PASSWORD")
|
||||||
args.twitter_email = args.twitter_email or os.getenv("TWITTER_EMAIL")
|
args.twitter_email = args.twitter_email or os.getenv("TWITTER_EMAIL")
|
||||||
args.bsky_handle = args.bsky_handle or os.getenv("BSKY_HANDLE")
|
args.bsky_handle = args.bsky_handle or os.getenv("BSKY_HANDLE")
|
||||||
args.bsky_password = args.bsky_password or os.getenv("BSKY_APP_PASSWORD")
|
args.bsky_password = args.bsky_password or os.getenv("BSKY_APP_PASSWORD")
|
||||||
|
|
||||||
args.twitter_handle = args.twitter_handle or os.getenv("TWITTER_HANDLE") or args.twitter_username
|
args.twitter_handle = args.twitter_handle or os.getenv("TWITTER_HANDLE") or args.twitter_username
|
||||||
|
|
||||||
missing_args = []
|
missing_args = []
|
||||||
if not args.twitter_username: missing_args.append("--twitter-username")
|
if not args.twitter_username:
|
||||||
if not args.twitter_password: missing_args.append("--twitter-password")
|
missing_args.append("--twitter-username")
|
||||||
if not args.bsky_handle: missing_args.append("--bsky-handle")
|
if not args.twitter_password:
|
||||||
if not args.bsky_password: missing_args.append("--bsky-password")
|
missing_args.append("--twitter-password")
|
||||||
|
if not args.bsky_handle:
|
||||||
|
missing_args.append("--bsky-handle")
|
||||||
|
if not args.bsky_password:
|
||||||
|
missing_args.append("--bsky-password")
|
||||||
|
|
||||||
if missing_args:
|
if missing_args:
|
||||||
logging.error(f"❌ Missing credentials! You forgot to provide: {', '.join(missing_args)}")
|
logging.error(f"❌ Missing credentials! You forgot to provide: {', '.join(missing_args)}")
|
||||||
@@ -479,7 +583,8 @@ def main():
|
|||||||
|
|
||||||
logging.info(f"🤖 Bot started. Will check @{args.twitter_handle}")
|
logging.info(f"🤖 Bot started. Will check @{args.twitter_handle}")
|
||||||
sync_feeds(args)
|
sync_feeds(args)
|
||||||
logging.info(f"🤖 Bot finished.")
|
logging.info("🤖 Bot finished.")
|
||||||
|
|
||||||
|
|
||||||
# Entry point: run the Twitter → Bluesky sync bot only when this file is
# executed as a script, not when it is imported as a module.
if __name__ == "__main__":
    main()
|
||||||
Reference in New Issue
Block a user