Added new yml

This commit is contained in:
Guillem Hernandez Sola
2026-03-30 18:03:07 +02:00
parent 9faabf48d0
commit fdfe377986

View File

@@ -62,7 +62,6 @@ def clean_url(url):
def get_blob_from_url(media_url, client): def get_blob_from_url(media_url, client):
"""Fetches remote media and uploads it to Bluesky."""
try: try:
r = httpx.get(media_url, timeout=30, follow_redirects=True) r = httpx.get(media_url, timeout=30, follow_redirects=True)
if r.status_code == 200: if r.status_code == 200:
@@ -73,7 +72,6 @@ def get_blob_from_url(media_url, client):
def get_blob_from_file(file_path, client): def get_blob_from_file(file_path, client):
"""Uploads a local file to Bluesky."""
try: try:
with open(file_path, "rb") as f: with open(file_path, "rb") as f:
return client.upload_blob(f.read()).blob return client.upload_blob(f.read()).blob
@@ -170,10 +168,7 @@ def build_dynamic_alt(raw_text):
def build_video_embed(video_blob, alt_text): def build_video_embed(video_blob, alt_text):
try: try:
return models.AppBskyEmbedVideo.Main( return models.AppBskyEmbedVideo.Main(video=video_blob, alt=alt_text)
video=video_blob,
alt=alt_text
)
except AttributeError: except AttributeError:
logging.error("❌ Your atproto version does not support AppBskyEmbedVideo. Upgrade atproto.") logging.error("❌ Your atproto version does not support AppBskyEmbedVideo. Upgrade atproto.")
return None return None
@@ -332,18 +327,28 @@ def scrape_tweets_via_playwright(username, password, email, target_handle):
def extract_video_url_from_tweet_page(context, tweet_url): def extract_video_url_from_tweet_page(context, tweet_url):
""" """
Prefer HLS playlist first because it usually contains the full playable stream Open tweet page and capture media requests.
with audio + video and proper timing metadata.
Fallback to direct video MP4 only if no HLS playlist is found. Strategy:
- listen for network responses
- wait for player
- scroll into view
- click player
- poll for a few seconds
- retry interaction once if needed
Preference:
1. HLS .m3u8
2. real video .mp4
Ignore: Ignore:
- .m4s fragments - .m4s
- audio-only MP4 URLs - audio-only mp4
""" """
page = context.new_page() page = context.new_page()
best_m3u8_url = None best_m3u8_url = None
best_video_mp4_url = None best_video_mp4_url = None
seen_urls = set()
def is_audio_only_mp4(url, content_type): def is_audio_only_mp4(url, content_type):
url_l = url.lower() url_l = url.lower()
@@ -359,6 +364,10 @@ def extract_video_url_from_tweet_page(context, tweet_url):
nonlocal best_m3u8_url, best_video_mp4_url nonlocal best_m3u8_url, best_video_mp4_url
try: try:
url = response.url url = response.url
if url in seen_urls:
return
seen_urls.add(url)
url_l = url.lower() url_l = url.lower()
content_type = response.headers.get("content-type", "") content_type = response.headers.get("content-type", "")
content_type_l = content_type.lower() content_type_l = content_type.lower()
@@ -376,37 +385,76 @@ def extract_video_url_from_tweet_page(context, tweet_url):
logging.info(f"📺 Found HLS playlist URL: {url}") logging.info(f"📺 Found HLS playlist URL: {url}")
return return
if (".mp4" in url_l or "video/mp4" in content_type_l or "audio/mp4" in content_type_l) and is_audio_only_mp4(url, content_type): if (".mp4" in url_l or "video/mp4" in content_type_l or "audio/mp4" in content_type_l):
logging.info(f"🔇 Ignoring audio-only MP4: {url}") if is_audio_only_mp4(url, content_type):
return logging.info(f"🔇 Ignoring audio-only MP4: {url}")
return
if ".mp4" in url_l or "video/mp4" in content_type_l:
if best_video_mp4_url is None: if best_video_mp4_url is None:
best_video_mp4_url = url best_video_mp4_url = url
logging.info(f"🎥 Found VIDEO MP4 URL: {url}") logging.info(f"🎥 Found VIDEO MP4 URL: {url}")
return return
except Exception: except Exception as e:
pass logging.debug(f"Response parsing error: {e}")
page.on("response", handle_response) page.on("response", handle_response)
def current_best():
return best_m3u8_url or best_video_mp4_url
try: try:
logging.info(f"🎬 Opening tweet page to capture video URL: {tweet_url}") logging.info(f"🎬 Opening tweet page to capture video URL: {tweet_url}")
page.goto(tweet_url, wait_until="networkidle", timeout=30000) page.goto(tweet_url, wait_until="domcontentloaded", timeout=30000)
time.sleep(5) time.sleep(3)
video_player = page.locator('[data-testid="videoPlayer"]').first player = page.locator('[data-testid="videoPlayer"]').first
if video_player.count() > 0:
if player.count() > 0:
try: try:
video_player.click(force=True, timeout=3000) player.scroll_into_view_if_needed(timeout=5000)
time.sleep(5)
except Exception: except Exception:
pass pass
selected_url = best_m3u8_url or best_video_mp4_url try:
player.click(force=True, timeout=5000)
logging.info("▶️ Clicked video player")
except Exception as e:
logging.info(f"⚠️ First player click failed: {e}")
else:
logging.warning("⚠️ No video player locator found on tweet page")
for _ in range(12):
if current_best():
break
time.sleep(1)
if not current_best() and player.count() > 0:
logging.info("🔁 No media URL found yet, retrying player interaction...")
try:
player.click(force=True, timeout=5000)
time.sleep(2)
except Exception as e:
logging.info(f"⚠️ Retry click failed: {e}")
try:
page.keyboard.press("Space")
time.sleep(1)
except Exception:
pass
for _ in range(8):
if current_best():
break
time.sleep(1)
selected_url = current_best()
if selected_url: if selected_url:
logging.info(f"✅ Selected media URL for download: {selected_url}") logging.info(f"✅ Selected media URL for download: {selected_url}")
else:
logging.warning(f"⚠️ No playable media URL detected on tweet page: {tweet_url}")
return selected_url return selected_url
except Exception as e: except Exception as e:
@@ -418,12 +466,6 @@ def extract_video_url_from_tweet_page(context, tweet_url):
# --- Video Processing --- # --- Video Processing ---
def download_and_crop_video(video_url, output_path): def download_and_crop_video(video_url, output_path):
"""
Downloads a video from MP4 or HLS (.m3u8), then trims it to max 59 seconds.
Uses ffmpeg for download and MoviePy for crop.
HLS is preferred because it usually produces a complete muxed file.
"""
temp_input = output_path.replace(".mp4", "_source.mp4") temp_input = output_path.replace(".mp4", "_source.mp4")
temp_output = output_path.replace(".mp4", "_cropped.mp4") temp_output = output_path.replace(".mp4", "_cropped.mp4")
@@ -545,8 +587,7 @@ def sync_feeds(args):
for tweet in reversed(tweets): for tweet in reversed(tweets):
tweet_time = arrow.get(tweet.created_on) tweet_time = arrow.get(tweet.created_on)
#if tweet_time <= last_bsky_time: if tweet_time <= last_bsky_time:
if False:
continue continue
logging.info(f"📝 Found new tweet from {tweet_time}. Posting to Bluesky...") logging.info(f"📝 Found new tweet from {tweet_time}. Posting to Bluesky...")