From c9a9f26c032d125352e1cb41101c68fb661cf869 Mon Sep 17 00:00:00 2001 From: Guillem Hernandez Sola Date: Tue, 19 May 2026 19:52:20 +0200 Subject: [PATCH] New version --- tiktok2bsky.py | 1347 +++++++++++++++++++----------------------------- 1 file changed, 527 insertions(+), 820 deletions(-) diff --git a/tiktok2bsky.py b/tiktok2bsky.py index 8cf2be2..3653541 100644 --- a/tiktok2bsky.py +++ b/tiktok2bsky.py @@ -69,7 +69,6 @@ SCRAPE_VIDEO_LIMIT = 30 VIDEO_MAX_AGE_DAYS = 3 VIDEO_MAX_DURATION_S = 179 # Bluesky hard limit is 180s -VIDEO_MAX_SIZE_BYTES = 20 * 1024 * 1024 # 20 MB # Bluesky login retry config BSKY_LOGIN_MAX_RETRIES = 4 @@ -115,6 +114,20 @@ TIKTOK_GRID_ERROR_SEL = '[data-e2e="user-post-item-list-error"]' TIKTOK_REFRESH_BTN_SEL = 'button:has-text("Actualizar"), button:has-text("Refresh")' +# ───────────────────────────────────────────────────────────────────────────── +# Fix 2 — Dynamic video size limit based on PDS +# ───────────────────────────────────────────────────────────────────────────── +def get_video_size_limit(bsky_base_url: str) -> int: + """ + bsky.social supports ~50 MB blobs. Third-party PDS instances + typically cap at 10–20 MB. Use a conservative 10 MB for + anything that isn't the official PDS. + """ + if "bsky.social" in (bsky_base_url or ""): + return 20 * 1024 * 1024 # 20 MB — official PDS + return 10 * 1024 * 1024 # 10 MB — safe for third-party PDS + + # ───────────────────────────────────────────────────────────────────────────── # State management # ───────────────────────────────────────────────────────────────────────────── @@ -208,7 +221,7 @@ def inject_cookies_into_context(context, cookies: list): # ───────────────────────────────────────────────────────────────────────────── -# Bluesky error classification helpers (ported from twitter2bsky.py) +# Bluesky error classification helpers # ───────────────────────────────────────────────────────────────────────────── def is_rate_limited_error(error_obj) -> bool: text = repr(error_obj).lower() @@ -267,7 +280,6 @@ def get_rate_limit_wait_seconds(error_obj, default_delay: float) -> float: """ Parse rate-limit response headers and return a bounded wait time in seconds. Supports retry-after, x-ratelimit-after, and ratelimit-reset (unix timestamp). - Ported from twitter2bsky.py. """ try: now_ts = int(time.time()) @@ -299,230 +311,61 @@ def get_rate_limit_wait_seconds(error_obj, default_delay: float) -> float: if m: val = int(m.group(1)) if is_timestamp: - return min( - max(val - int(time.time()) + 1, default_delay), - BSKY_LOGIN_MAX_DELAY, - ) + wait = max(val - int(time.time()) + 1, default_delay) + return min(wait, BSKY_LOGIN_MAX_DELAY) return min(max(val, 1), BSKY_LOGIN_MAX_DELAY) return default_delay # ───────────────────────────────────────────────────────────────────────────── -# Bluesky helpers +# Bluesky client # ───────────────────────────────────────────────────────────────────────────── -def bsky_login(client: Client, handle: str, password: str, - base_url: str = DEFAULT_BSKY_BASE_URL) -> bool: - """ - Authenticate against the AT Protocol PDS. - - base_url is always https://bsky.social for standard Bluesky accounts — - even when the user's handle lives on a custom domain like eurosky.social. - The Client is re-initialised with the base URL baked in at construction - time, which is the only reliable way to override the internal session - resolver (mirrors create_bsky_client() in twitter2bsky.py). - """ - normalized_base_url = (base_url or DEFAULT_BSKY_BASE_URL).strip().rstrip("/") - logging.info(f"🔐 Connecting Bluesky client via base URL: {normalized_base_url}") - - # Re-initialise the client so the base URL is baked in from the start. - # Setting client.base_url after construction does not reliably override - # the internal session resolver in the atproto SDK. - client.__init__(base_url=normalized_base_url) +def connect_bluesky(handle: str, app_password: str, base_url: str) -> Client: + logging.info(f"🔐 Connecting Bluesky client via base URL: {base_url}") + client = Client(base_url=base_url) for attempt in range(1, BSKY_LOGIN_MAX_RETRIES + 1): try: - logging.info( - f"🔐 Bluesky login attempt {attempt}/{BSKY_LOGIN_MAX_RETRIES} " - f"for {handle}" - ) - client.login(handle, password) + logging.info(f"🔐 Bluesky login attempt {attempt}/{BSKY_LOGIN_MAX_RETRIES} for {handle}") + client.login(handle, app_password) + client.me = client.get_profile(handle) logging.info(f"✅ Bluesky login successful as {handle}") - return True - + return client except Exception as e: - - # ── 401 / auth errors — no point retrying ───────────────── - if is_auth_error(e): - logging.error( - f"❌ Bluesky login failed: invalid handle or app password.\n" - f" Handle : {handle}\n" - f" PDS : {normalized_base_url}\n" - f" Fix : regenerate app password at " - f"https://bsky.app/settings/app-passwords\n" - f" Detail : {repr(e)}" - ) - return False - - # ── Rate limit ───────────────────────────────────────────── + logging.warning( + f"⚠️ Bluesky login {type(e).__name__}: {e} (attempt {attempt}/{BSKY_LOGIN_MAX_RETRIES})" + ) if is_rate_limited_error(e): - if attempt < BSKY_LOGIN_MAX_RETRIES: - wait = get_rate_limit_wait_seconds( - e, default_delay=BSKY_LOGIN_BASE_DELAY - ) - wait += random.uniform(0, BSKY_LOGIN_JITTER_MAX) - logging.warning( - f"⏳ Bluesky login rate-limited " - f"(attempt {attempt}/{BSKY_LOGIN_MAX_RETRIES}). " - f"Retrying in {wait:.1f}s." - ) - time.sleep(wait) - continue - logging.error( - "❌ Exhausted Bluesky login retries due to rate limiting." - ) - return False - - # ── Transient / network errors ───────────────────────────── - if is_network_error(e) or is_transient_error(e): - if attempt < BSKY_LOGIN_MAX_RETRIES: - wait = min( - BSKY_LOGIN_BASE_DELAY * attempt, - BSKY_LOGIN_MAX_DELAY, - ) + random.uniform(0, BSKY_LOGIN_JITTER_MAX) - logging.warning( - f"⏳ Transient login failure " - f"(attempt {attempt}/{BSKY_LOGIN_MAX_RETRIES}). " - f"Retrying in {wait:.1f}s." - ) - time.sleep(wait) - continue - logging.error( - "❌ Exhausted Bluesky login retries after " - "transient/network errors." - ) - return False - - # ── Unexpected error — retry with backoff ────────────────── - if attempt < BSKY_LOGIN_MAX_RETRIES: - wait = min( - BSKY_LOGIN_BASE_DELAY * attempt, - BSKY_LOGIN_MAX_DELAY, - ) + random.uniform(0, BSKY_LOGIN_JITTER_MAX) + delay = get_rate_limit_wait_seconds(e, BSKY_LOGIN_BASE_DELAY) + jitter = random.uniform(0, BSKY_LOGIN_JITTER_MAX) + wait = delay + jitter logging.warning( - f"⏳ Unexpected login error " - f"(attempt {attempt}/{BSKY_LOGIN_MAX_RETRIES}): " - f"{repr(e)}. Retrying in {wait:.1f}s." + f"⏳ Bluesky login rate-limited (attempt {attempt}/{BSKY_LOGIN_MAX_RETRIES}). " + f"Retrying in {wait:.1f}s." ) time.sleep(wait) - continue - - logging.error( - f"❌ All Bluesky login attempts failed. Last error: {repr(e)}" - ) - return False - - return False - - -def bsky_get_recent_post_urls(client: Client, handle: str, - limit: int = 50) -> set: - """Return a set of URLs recently posted to Bluesky (to avoid duplicates).""" - urls: set = set() - try: - feed = client.get_author_feed(actor=handle, limit=limit) - for item in feed.feed: - post = item.post - if hasattr(post, "record") and hasattr(post.record, "embed"): - embed = post.record.embed - if hasattr(embed, "external") and hasattr(embed.external, "uri"): - urls.add(embed.external.uri) - if hasattr(post, "record") and hasattr(post.record, "text"): - text = post.record.text - found = re.findall(r"https?://\S+", text) - urls.update(found) - except Exception as e: - logging.warning(f"⚠️ Could not fetch recent Bluesky posts: {e}") - return urls - - -def bsky_upload_blob_with_retry(client: Client, data: bytes, - mime_type: str) -> object: - """Upload a blob to Bluesky with retry + exponential backoff.""" - for attempt in range(1, BSKY_UPLOAD_MAX_RETRIES + 1): - try: - resp = client.upload_blob(data) - logging.info( - f"✅ Blob uploaded ({len(data) / 1024 / 1024:.1f} MB) " - f"on attempt {attempt}." - ) - return resp.blob - except Exception as e: - is_rate_limit = is_rate_limited_error(e) - - if attempt == BSKY_UPLOAD_MAX_RETRIES: - logging.error( - f"❌ Blob upload failed after " - f"{BSKY_UPLOAD_MAX_RETRIES} attempts: {e}" + elif attempt < BSKY_LOGIN_MAX_RETRIES: + delay = min( + BSKY_LOGIN_BASE_DELAY * (2 ** (attempt - 1)), + BSKY_LOGIN_MAX_DELAY, ) + jitter = random.uniform(0, BSKY_LOGIN_JITTER_MAX) + wait = delay + jitter + logging.warning(f"⏳ Retrying login in {wait:.1f}s.") + time.sleep(wait) + else: + logging.error(f"❌ Bluesky login failed after {BSKY_LOGIN_MAX_RETRIES} attempts.") raise - delay = min( - BSKY_UPLOAD_BASE_DELAY * (2 ** (attempt - 1)) - + random.uniform(0, BSKY_UPLOAD_JITTER_MAX), - BSKY_UPLOAD_MAX_DELAY, - ) - if is_rate_limit: - delay = max( - get_rate_limit_wait_seconds(e, default_delay=delay), - 60.0, - ) - - logging.warning( - f"⚠️ Blob upload attempt {attempt} failed: {e}. " - f"Retrying in {delay:.1f}s..." - ) - time.sleep(delay) - - -def bsky_create_post_with_retry(client: Client, text: str, - embed=None, langs=None) -> bool: - """Create a Bluesky post with retry + exponential backoff.""" - for attempt in range(1, BSKY_UPLOAD_MAX_RETRIES + 1): - try: - kwargs = {"text": text} - if embed: - kwargs["embed"] = embed - if langs: - kwargs["langs"] = langs - client.send_post(**kwargs) - logging.info(f"✅ Post created on attempt {attempt}.") - return True - except Exception as e: - is_rate_limit = is_rate_limited_error(e) - - if attempt == BSKY_UPLOAD_MAX_RETRIES: - logging.error( - f"❌ Post creation failed after " - f"{BSKY_UPLOAD_MAX_RETRIES} attempts: {e}" - ) - return False - - delay = min( - BSKY_UPLOAD_BASE_DELAY * (2 ** (attempt - 1)) - + random.uniform(0, BSKY_UPLOAD_JITTER_MAX), - BSKY_UPLOAD_MAX_DELAY, - ) - if is_rate_limit: - delay = max( - get_rate_limit_wait_seconds(e, default_delay=delay), - 60.0, - ) - - logging.warning( - f"⚠️ Post creation attempt {attempt} failed: {e}. " - f"Retrying in {delay:.1f}s..." - ) - time.sleep(delay) - - return False + raise RuntimeError("Bluesky login failed: exhausted all retries.") # ───────────────────────────────────────────────────────────────────────────── -# Video processing helpers +# Video helpers # ───────────────────────────────────────────────────────────────────────────── def get_video_duration(path: str) -> float: - """Return video duration in seconds using ffprobe.""" + """Return video duration in seconds via ffprobe, or 0.0 on failure.""" try: result = subprocess.run( [ @@ -531,16 +374,33 @@ def get_video_duration(path: str) -> float: "-of", "default=noprint_wrappers=1:nokey=1", path, ], - capture_output=True, text=True, timeout=30, + capture_output=True, + text=True, + timeout=15, ) return float(result.stdout.strip()) except Exception as e: - logging.warning(f"⚠️ ffprobe failed: {e}") + logging.warning(f"⚠️ ffprobe failed for {path}: {e}") return 0.0 -def compress_video(input_path: str, output_path: str, - max_duration: int = VIDEO_MAX_DURATION_S, - max_size_bytes: int = VIDEO_MAX_SIZE_BYTES) -> bool: + +def compress_video( + input_path: str, + output_path: str, + max_duration: int = VIDEO_MAX_DURATION_S, + max_size_bytes: int = None, # resolved at call-time from get_video_size_limit() +) -> bool: + """ + Re-encode input_path → output_path using libx264, targeting max_size_bytes. + + Key fixes applied: + • pad=ceil(iw/2)*2:ceil(ih/2)*2 — ensures even dimensions (libx264 requirement) + • -maxrate == -b:v — hard ceiling, no burst above target + • post-encode size guard — rejects file if still over limit + """ + if max_size_bytes is None: + max_size_bytes = 20 * 1024 * 1024 # fallback + try: duration = get_video_duration(input_path) @@ -554,23 +414,30 @@ def compress_video(input_path: str, output_path: str, trim_to = min(duration, max_duration) # Target 85% of the size budget to leave headroom for container overhead - target_bits = max_size_bytes * 8 * 0.85 - total_kbps = int(target_bits / trim_to / 1000) - audio_kbps = 96 - video_kbps = max(200, total_kbps - audio_kbps) + target_bits = max_size_bytes * 8 * 0.85 + total_kbps = int(target_bits / trim_to / 1000) + audio_kbps = 96 + video_kbps = max(200, total_kbps - audio_kbps) logging.info( f"🎬 Compressing: duration={duration:.1f}s → trim={trim_to:.1f}s, " - f"video_bitrate={video_kbps}k (target ≤ {max_size_bytes // 1024 // 1024}MB)" + f"video_bitrate={video_kbps}k " + f"(target ≤ {max_size_bytes // 1024 // 1024}MB)" ) cmd = [ "ffmpeg", "-y", "-i", input_path, "-t", str(trim_to), - # Scale to 720p max, pad to even dimensions (required by libx264) - "-vf", "scale='min(1280,iw)':'min(720,ih)':force_original_aspect_ratio=decrease," - "pad=ceil(iw/2)*2:ceil(ih/2)*2", + # Scale to 720p max, then pad to even dimensions. + # The pad filter is required because libx264 needs width/height + # divisible by 2. Portrait TikTok videos (9:16) would otherwise + # produce odd widths like 405px and crash the encoder. + "-vf", ( + "scale='min(1280,iw)':'min(720,ih)'" + ":force_original_aspect_ratio=decrease," + "pad=ceil(iw/2)*2:ceil(ih/2)*2" + ), "-c:v", "libx264", "-b:v", f"{video_kbps}k", "-maxrate", f"{video_kbps}k", # hard ceiling — no burst above target @@ -607,678 +474,504 @@ def compress_video(input_path: str, output_path: str, logging.error(f"❌ compress_video error: {e}") return False -def download_video(url: str, output_path: str, - cookies: list = None) -> bool: - """ - Download a TikTok video using yt-dlp with impersonation. - Direct HTTP download is skipped — TikTok always returns HTML - for video page URLs, never a raw MP4. - """ - return download_video_ytdlp(url, output_path, cookies=cookies) -def download_video_ytdlp(url: str, output_path: str, - cookies: list = None) -> bool: +# ───────────────────────────────────────────────────────────────────────────── +# yt-dlp download +# ───────────────────────────────────────────────────────────────────────────── +def get_best_impersonation_target() -> str | None: """ - Download a video using yt-dlp with TikTok impersonation. - Requires curl-cffi: pip install curl-cffi + Dynamically select the best available curl_cffi impersonation target. + Returns None if curl_cffi is not installed or no target is available. """ - cookie_file = None + try: + from curl_cffi.requests import BrowserType + preferred = ["chrome126", "chrome124", "chrome", "safari"] + available = {t.value if hasattr(t, "value") else str(t) for t in BrowserType} + for target in preferred: + if target in available: + logging.info(f"🎭 yt-dlp impersonation target: {target}") + return target + # fallback: return first available + if available: + target = sorted(available)[0] + logging.info(f"🎭 yt-dlp impersonation target (fallback): {target}") + return target + except Exception as e: + logging.warning(f"⚠️ Could not check impersonation targets: {e}") + return None + + +def download_video_ytdlp(url: str, output_path: str, cookies_path: str = None) -> bool: + """ + Download a TikTok video using yt-dlp with browser impersonation. + Returns True on success, False on failure. + """ + impersonate = get_best_impersonation_target() + + ydl_opts = { + "outtmpl": output_path, + "format": "bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best", + "quiet": False, + "no_warnings": False, + "merge_output_format": "mp4", + } + + if cookies_path and os.path.exists(cookies_path): + ydl_opts["cookiefile"] = cookies_path + + if impersonate: + ydl_opts["impersonate"] = impersonate + try: import yt_dlp - - ydl_opts = { - "outtmpl": output_path, - "format": "bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best", - "quiet": True, - "no_warnings": False, - "merge_output_format": "mp4", - } - - # ── Impersonation: try targets in order of preference ────────── - # curl_cffi must be installed: pip install curl-cffi - impersonate_targets = ["chrome126", "chrome124", "chrome", "safari"] - impersonate_set = False - - try: - import yt_dlp.networking.impersonate as _imp - available = {str(t) for t in _imp.ImpersonateTarget.supported_targets()} - for target in impersonate_targets: - if any(target in a for a in available): - ydl_opts["impersonate"] = target - logging.info(f"🎭 yt-dlp impersonation target: {target}") - impersonate_set = True - break - if not impersonate_set: - logging.warning( - f"⚠️ No impersonation target available. " - f"Available: {available}. " - f"Install curl-cffi: pip install curl-cffi" - ) - except Exception as e: - logging.warning(f"⚠️ Could not check impersonation targets: {e}") - - if cookies: - cookie_file = _write_netscape_cookies(cookies) - if cookie_file: - ydl_opts["cookiefile"] = cookie_file - with yt_dlp.YoutubeDL(ydl_opts) as ydl: ydl.download([url]) - # Validate: must exist AND be a real video (> 50 KB) - if os.path.exists(output_path): - size = os.path.getsize(output_path) - if size > 50_000: - logging.info( - f"✅ yt-dlp download OK: {size / 1024 / 1024:.1f} MB" - ) - return True - logging.error( - f"❌ yt-dlp output too small ({size} bytes) — " - f"likely an HTML error page, not a video." + if os.path.exists(output_path) and os.path.getsize(output_path) > 50 * 1024: + size_mb = os.path.getsize(output_path) / 1024 / 1024 + logging.info(f"✅ yt-dlp download OK: {size_mb:.1f} MB") + return True + else: + logging.warning( + f"⚠️ yt-dlp output too small or missing: {output_path} " + f"({os.path.getsize(output_path) if os.path.exists(output_path) else 0} bytes)" ) return False - logging.error("❌ yt-dlp produced no output file.") - return False - except Exception as e: - logging.error(f"❌ yt-dlp download failed: {e}") + logging.error(f"❌ yt-dlp download failed for {url}: {type(e).__name__}: {e}") return False - finally: - if cookie_file and os.path.exists(cookie_file): - os.unlink(cookie_file) -def _write_netscape_cookies(cookies: list) -> str | None: - """Write cookies list to a Netscape-format temp file for yt-dlp.""" +def download_video(url: str, output_path: str, cookies_path: str = None) -> bool: + """ + Download a TikTok video. Routes directly to yt-dlp with browser impersonation. + """ + cookies = load_cookies_from_file(cookies_path) if cookies_path else [] + logging.info(f"⬇️ Downloading: {url}") + return download_video_ytdlp(url, output_path, cookies_path=cookies_path) + + +# ───────────────────────────────────────────────────────────────────────────── +# Bluesky upload +# ───────────────────────────────────────────────────────────────────────────── +def upload_video_to_bluesky( + client: Client, + video_path: str, + video_id: str, +) -> object | None: + """ + Upload a video file to Bluesky as a blob. + + Fix 1 applied: exception is logged as type(e).__name__: e + so the actual error (413, 403, network error, etc.) is always visible. + """ + size_mb = os.path.getsize(video_path) / 1024 / 1024 + logging.info(f"⬆️ Uploading to Bluesky ({size_mb:.1f} MB)...") + + with open(video_path, "rb") as f: + video_data = f.read() + + delay = BSKY_UPLOAD_BASE_DELAY + + for attempt in range(1, BSKY_UPLOAD_MAX_RETRIES + 1): + try: + blob = client.upload_blob(video_data) + logging.info(f"✅ Blob uploaded successfully for {video_id}") + return blob.blob + + except Exception as e: + # ── Fix 1: always log the full exception type and message ────── + err_detail = f"{type(e).__name__}: {e}" + logging.warning( + f"⚠️ Blob upload attempt {attempt}/{BSKY_UPLOAD_MAX_RETRIES} " + f"failed: {err_detail}. Retrying in {delay:.1f}s..." + ) + + if attempt >= BSKY_UPLOAD_MAX_RETRIES: + logging.error( + f"❌ Blob upload failed after {BSKY_UPLOAD_MAX_RETRIES} attempts: " + f"{err_detail}" + ) + return None + + time.sleep(delay + random.uniform(0, BSKY_UPLOAD_JITTER_MAX)) + delay = min(delay * 2, BSKY_UPLOAD_MAX_DELAY) + + return None + + +# ───────────────────────────────────────────────────────────────────────────── +# Bluesky post +# ───────────────────────────────────────────────────────────────────────────── +def post_video_to_bluesky( + client: Client, + blob, + caption: str, + langs: list[str], + video_id: str, +) -> bool: + """Create a Bluesky post embedding the uploaded video blob.""" + from atproto import models + try: - fd, path = tempfile.mkstemp(suffix=".txt", prefix="tiktok_cookies_") - with os.fdopen(fd, "w", encoding="utf-8") as f: - f.write("# Netscape HTTP Cookie File\n") - for c in cookies: - domain = c.get("domain", ".tiktok.com") - flag = "TRUE" if domain.startswith(".") else "FALSE" - path_val = c.get("path", "/") - secure = "TRUE" if c.get("secure") else "FALSE" - exp = int( - c.get("expirationDate", 0) or c.get("expires", 0) or 0 - ) - name = c.get("name", "") - value = c.get("value", "") - f.write( - f"{domain}\t{flag}\t{path_val}\t{secure}\t" - f"{exp}\t{name}\t{value}\n" - ) - return path + video_embed = models.AppBskyEmbedVideo.Main( + video=blob, + ) + + client.send_post( + text=caption, + embed=video_embed, + langs=langs, + ) + logging.info(f"✅ Posted video {video_id} to Bluesky.") + return True + except Exception as e: - logging.warning(f"⚠️ Could not write Netscape cookie file: {e}") - return None + logging.error( + f"❌ Failed to post video {video_id} to Bluesky: " + f"{type(e).__name__}: {e}" + ) + return False # ───────────────────────────────────────────────────────────────────────────── -# TikTok scraping via Playwright +# TikTok scraping — Playwright # ───────────────────────────────────────────────────────────────────────────── -def _dismiss_overlays(page): - """Dismiss cookie banners and RGPD modals.""" - for sel in TIKTOK_COOKIE_MODAL_SELS + TIKTOK_BANNER_SELS: +def dismiss_overlays(page) -> None: + """Try to dismiss cookie banners and modal overlays.""" + all_sels = TIKTOK_COOKIE_MODAL_SELS + TIKTOK_BANNER_SELS + for sel in all_sels: try: el = page.locator(sel).first if el.is_visible(timeout=1500): - el.click(timeout=2000) + el.click(timeout=1500) logging.info(f"🚫 Dismissed overlay: {sel}") time.sleep(0.5) except Exception: pass -def _take_debug_screenshot(page, label: str): - """Save a debug screenshot to workspace.""" - try: - path = f"screenshot_{label}_{int(time.time())}.png" - page.screenshot(path=path) - logging.info(f"📸 Screenshot saved: {path}") - except Exception: - pass - -TIKTOK_GDPR_SELS = [ - 'button:has-text("Entendido")', - 'button:has-text("Understood")', - 'button:has-text("Got it")', - '[class*="gdpr"] button', - '[class*="privacy"] button:has-text("Entendido")', -] - - -def _dismiss_all_overlays(page): - """Dismiss GDPR notices, cookie banners and any other modals.""" - for sel in TIKTOK_GDPR_SELS + TIKTOK_COOKIE_MODAL_SELS + TIKTOK_BANNER_SELS: - try: - el = page.locator(sel).first - if el.is_visible(timeout=1500): - el.click(timeout=2000) - logging.info(f"🚫 Dismissed overlay: {sel}") - time.sleep(0.6) - except Exception: - pass - - -def _try_refresh_grid(page, max_attempts: int = 4) -> bool: +def scrape_tiktok_profile_playwright( + handle: str, + cookies: list, + limit: int = SCRAPE_VIDEO_LIMIT, +) -> list[dict]: """ - Click the Actualizar / Refresh button up to max_attempts times, - waiting progressively longer each time. - Returns True if the video grid eventually appears. + Scrape the most recent video URLs from a TikTok profile page using Playwright. + Returns a list of dicts with keys: video_id, url, timestamp. """ - for i in range(1, max_attempts + 1): - wait_s = 4.0 * i - logging.info( - f"🔄 Grid error detected — clicking Actualizar " - f"(attempt {i}/{max_attempts}, waiting {wait_s:.0f}s)..." - ) - try: - page.locator(TIKTOK_REFRESH_BTN_SEL).first.click(timeout=3000) - except Exception: - pass - time.sleep(wait_s) - _dismiss_all_overlays(page) - try: - page.wait_for_selector(TIKTOK_VIDEO_GRID_SEL, timeout=6000) - logging.info("✅ Video grid appeared after refresh.") - return True - except Exception: - pass - return False - -def _scrape_via_api(handle: str, cookies: list) -> list: - """ - Fallback scraper using yt-dlp to list videos from a TikTok profile. - yt-dlp handles TikTok's request signing internally — no raw API needed. - Returns same list-of-dicts format as the Playwright scraper. - """ - logging.info(f"📦 yt-dlp profile scrape fallback for @{handle}...") - - cookie_file = None - videos = [] - - try: - import yt_dlp - - cookie_file = _write_netscape_cookies(cookies) - - ydl_opts = { - "quiet": True, - "no_warnings": False, - "extract_flat": True, # metadata only — no video download yet - "playlistend": SCRAPE_VIDEO_LIMIT, - "ignoreerrors": True, - } - if cookie_file: - ydl_opts["cookiefile"] = cookie_file - - profile_url = f"https://www.tiktok.com/@{handle}" - logging.info(f"🌐 yt-dlp extracting: {profile_url}") - - with yt_dlp.YoutubeDL(ydl_opts) as ydl: - info = ydl.extract_info(profile_url, download=False) - - if not info: - logging.warning("⚠️ yt-dlp returned no info for profile.") - return [] - - entries = info.get("entries") or [] - logging.info( - f"✅ yt-dlp returned {len(entries)} entries " - f"(playlist: {info.get('title', '?')})" - ) - - for entry in entries[:SCRAPE_VIDEO_LIMIT]: - try: - if not entry: - continue - - vid_id = str(entry.get("id") or "") - url = ( - entry.get("webpage_url") - or entry.get("url") - or "" - ) - desc = ( - entry.get("title") - or entry.get("description") - or "" - ) - - # Normalise URL - if vid_id and not url: - url = f"https://www.tiktok.com/@{handle}/video/{vid_id}" - - # Extract ID from URL if missing - if not vid_id and url: - m = re.search(r"/video/(\d+)", url) - if m: - vid_id = m.group(1) - - if not vid_id: - logging.debug(f"⏭️ Skipping entry with no ID: {entry}") - continue - - videos.append({ - "id": vid_id, - "url": url, - "desc": desc, - "timestamp": arrow.utcnow().isoformat(), - "video_url": url, - }) - logging.debug(f" 📹 {vid_id}: {desc[:60]}") - - except Exception as e: - logging.warning(f"⚠️ yt-dlp entry parse error: {e}") - - logging.info(f"✅ yt-dlp fallback produced {len(videos)} usable videos.") - - except Exception as e: - logging.error(f"❌ yt-dlp profile scrape failed: {e}") - - finally: - if cookie_file and os.path.exists(cookie_file): - os.unlink(cookie_file) - - return videos -def _resolve_tiktok_ids(handle: str, headers: dict) -> tuple[str | None, str | None]: - """ - Extract both the numeric user ID and secUid from the profile page HTML. - Returns (user_id, sec_uid) — either may be None. - """ - user_id = None - sec_uid = None - - try: - resp = httpx.get( - f"https://www.tiktok.com/@{handle}", - headers=headers, - timeout=15, - follow_redirects=True, - ) - html = resp.text - - # ── Numeric user ID ──────────────────────────────────────────── - id_patterns = [ - r'"authorId"\s*:\s*"(\d{15,25})"', - r'"author"\s*:\s*\{[^}]*"id"\s*:\s*"(\d{15,25})"', - r'"userId"\s*:\s*"(\d{15,25})"', - r'"uid"\s*:\s*"(\d{15,25})"', - r'"ownerUid"\s*:\s*"(\d{15,25})"', - r',"id":"(\d{15,25})","uniqueId":"' + re.escape(handle) + r'"', - r'"uniqueId":"' + re.escape(handle) + r'","id":"(\d{15,25})"', - ] - for pattern in id_patterns: - m = re.search(pattern, html, re.IGNORECASE) - if m: - user_id = m.group(1) - logging.info(f"✅ Resolved TikTok user ID: {user_id}") - break - - # ── secUid ───────────────────────────────────────────────────── - sec_patterns = [ - r'"secUid"\s*:\s*"([A-Za-z0-9_\-]{20,})"', - r'"authorSecId"\s*:\s*"([A-Za-z0-9_\-]{20,})"', - ] - for pattern in sec_patterns: - m = re.search(pattern, html, re.IGNORECASE) - if m: - sec_uid = m.group(1) - logging.info(f"✅ Resolved TikTok secUid: {sec_uid[:30]}...") - break - - if not user_id and not sec_uid: - # Window search fallback - handle_pos = html.find(f'"uniqueId":"{handle}"') - if handle_pos != -1: - window = html[max(0, handle_pos - 300): handle_pos + 300] - m = re.search(r'"id"\s*:\s*"(\d{15,25})"', window) - if m: - user_id = m.group(1) - logging.info(f"✅ Resolved TikTok user ID (window): {user_id}") - m = re.search(r'"secUid"\s*:\s*"([A-Za-z0-9_\-]{20,})"', window) - if m: - sec_uid = m.group(1) - logging.info(f"✅ Resolved TikTok secUid (window): {sec_uid[:30]}...") - - if not user_id and not sec_uid: - logging.warning( - f"⚠️ Could not resolve any TikTok ID for @{handle}. " - f"HTML length: {len(html)} chars." - ) - - except Exception as e: - logging.warning(f"⚠️ Could not resolve TikTok IDs: {e}") - - return user_id, sec_uid -def scrape_tiktoks_via_playwright(handle: str) -> list: - """ - Scrape recent videos from a public TikTok profile. - Returns a list of dicts: {id, url, desc, timestamp, video_url} - """ - profile_url = f"https://www.tiktok.com/@{handle.lstrip('@')}" - cookies = load_cookies_from_file(TIKTOK_COOKIES_PATH) - videos = [] - + profile_url = f"https://www.tiktok.com/@{handle}" logging.info(f"🕷️ Scraping TikTok profile: {profile_url}") + videos = [] + with sync_playwright() as p: browser = p.chromium.launch( headless=True, slow_mo=PLAYWRIGHT_SLOW_MO, args=[ + "--disable-blink-features=AutomationControlled", "--no-sandbox", "--disable-setuid-sandbox", - "--disable-blink-features=AutomationControlled", - "--disable-dev-shm-usage", - "--disable-gpu", ], ) - context = browser.new_context( user_agent=( "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " "AppleWebKit/537.36 (KHTML, like Gecko) " - "Chrome/124.0.0.0 Safari/537.36" + "Chrome/126.0.0.0 Safari/537.36" ), viewport={"width": 1280, "height": 900}, locale="es-ES", - timezone_id="Europe/Madrid", ) - if cookies: - inject_cookies_into_context(context, cookies) + inject_cookies_into_context(context, cookies) page = context.new_page() - # Stealth mode — compatible with both v1.x and v2.x if _STEALTH_V2: - Stealth().apply_stealth_sync(page) + Stealth().apply(page) else: stealth_sync(page) - page.add_init_script(""" - Object.defineProperty(navigator, 'webdriver', {get: () => undefined}); - window.chrome = { runtime: {} }; - Object.defineProperty(navigator, 'plugins', {get: () => [1, 2, 3]}); - Object.defineProperty(navigator, 'languages', {get: () => ['es-ES', 'es', 'en']}); - """) - - grid_loaded = False - for attempt in range(1, PLAYWRIGHT_MAX_RELOADS + 1): - logging.info( - f"🌐 Loading profile (attempt {attempt}/{PLAYWRIGHT_MAX_RELOADS})..." - ) try: + logging.info(f"🌐 Loading profile (attempt {attempt}/{PLAYWRIGHT_MAX_RELOADS})...") page.goto( profile_url, wait_until="domcontentloaded", timeout=PLAYWRIGHT_TIMEOUT_MS, ) - except Exception as e: - logging.warning(f"⚠️ page.goto failed on attempt {attempt}: {e}") - _take_debug_screenshot(page, f"goto_fail_{attempt}") - if attempt < PLAYWRIGHT_MAX_RELOADS: - time.sleep(3.0) - continue - break + time.sleep(3) + dismiss_overlays(page) - time.sleep(random.uniform(2.5, 4.0)) - - # ── Dismiss ALL overlays including GDPR ──────────────────── - _dismiss_all_overlays(page) - time.sleep(1.5) - - # ── Check for grid error and retry with Actualizar ───────── - try: - if page.locator(TIKTOK_GRID_ERROR_SEL).is_visible(timeout=2000): - if _try_refresh_grid(page, max_attempts=4): - grid_loaded = True - break - # Grid still broken — try a full page reload - logging.warning( - "⚠️ Grid still broken after Actualizar retries. " - "Reloading page..." - ) - if attempt < PLAYWRIGHT_MAX_RELOADS: - time.sleep(3.0) - continue - except Exception: - pass - - # ── Wait for video grid normally ─────────────────────────── - try: - page.wait_for_selector( - TIKTOK_VIDEO_GRID_SEL, - timeout=PLAYWRIGHT_TIMEOUT_MS, - ) - logging.info("✅ Video grid found.") - grid_loaded = True - break - except Exception: - logging.warning( - f"⚠️ Video grid not found on attempt {attempt}." - ) - _take_debug_screenshot(page, f"no_grid_{attempt}") - if attempt < PLAYWRIGHT_MAX_RELOADS: - time.sleep(3.0) - - if not grid_loaded: - logging.warning( - "⚠️ Playwright grid scraping failed. " - "Trying API fallback..." - ) - _take_debug_screenshot(page, "playwright_failed") - browser.close() - # ── API fallback ─────────────────────────────────────────── - return _scrape_via_api(handle, cookies) - - # ── Scroll to load more videos ───────────────────────────────── - logging.info("📜 Scrolling to load videos...") - for _ in range(5): - page.evaluate("window.scrollBy(0, window.innerHeight * 2)") - time.sleep(random.uniform(1.0, 2.0)) - - # ── Extract video items ──────────────────────────────────────── - items = page.locator(TIKTOK_VIDEO_ITEM_SEL).all() - logging.info(f"📋 Found {len(items)} video items in grid.") - - for item in items[:SCRAPE_VIDEO_LIMIT]: - try: - link_el = item.locator("a").first - href = link_el.get_attribute("href") or "" - if not href or "/video/" not in href: - continue - - if href.startswith("/"): - href = "https://www.tiktok.com" + href - - vid_match = re.search(r"/video/(\d+)", href) - if not vid_match: - continue - video_id = vid_match.group(1) - - desc = "" + # Wait for video grid try: - desc = item.get_attribute("aria-label") or "" - if not desc: - desc_el = item.locator( - '[class*="desc"], [class*="title"]' - ).first - desc = desc_el.inner_text(timeout=1000).strip() + page.wait_for_selector( + TIKTOK_VIDEO_GRID_SEL, + timeout=PLAYWRIGHT_TIMEOUT_MS, + ) except Exception: pass - videos.append({ - "id": video_id, - "url": href, - "desc": desc, - "timestamp": arrow.utcnow().isoformat(), - "video_url": href, - }) + grid = page.locator(TIKTOK_VIDEO_GRID_SEL).first + if not grid.is_visible(timeout=5000): + logging.warning(f"⚠️ Video grid not found on attempt {attempt}.") + ts = int(time.time()) + page.screenshot(path=f"screenshot_no_grid_{attempt}_{ts}.png") + logging.info(f"📸 Screenshot saved: screenshot_no_grid_{attempt}_{ts}.png") + time.sleep(3) + continue + + # Extract video links + items = page.locator(TIKTOK_VIDEO_ITEM_SEL).all() + for item in items[:limit]: + try: + link = item.locator("a").first.get_attribute("href") + if link and "/video/" in link: + vid_match = re.search(r"/video/(\d+)", link) + if vid_match: + video_id = vid_match.group(1) + full_url = ( + link if link.startswith("http") + else f"https://www.tiktok.com{link}" + ) + videos.append({ + "video_id": video_id, + "url": full_url, + "timestamp": None, + }) + except Exception: + pass + + if videos: + logging.info(f"✅ Playwright scraped {len(videos)} videos.") + break except Exception as e: - logging.warning(f"⚠️ Error parsing video item: {e}") - continue + logging.warning(f"⚠️ Playwright attempt {attempt} error: {type(e).__name__}: {e}") + ts = int(time.time()) + try: + page.screenshot(path=f"screenshot_error_{attempt}_{ts}.png") + except Exception: + pass + time.sleep(3) + if not videos: + logging.warning("⚠️ Video grid not found on attempt 3.") + ts = int(time.time()) + try: + page.screenshot(path=f"screenshot_no_grid_3_{ts}.png") + logging.info(f"📸 Screenshot saved: screenshot_no_grid_3_{ts}.png") + except Exception: + pass + + page.close() + context.close() browser.close() - # ── If Playwright found nothing, try API fallback ────────────────── - if not videos: - logging.warning( - "⚠️ Playwright returned 0 videos. Trying API fallback..." - ) - return _scrape_via_api(handle, cookies) - - logging.info(f"✅ Scraped {len(videos)} videos from @{handle}.") return videos + + # ───────────────────────────────────────────────────────────────────────────── -# Core: process a single TikTok video → post to Bluesky +# TikTok scraping — yt-dlp fallback # ───────────────────────────────────────────────────────────────────────────── -def process_tiktok(video: dict, client: Client, - langs: list, state: dict) -> bool: +def scrape_tiktok_profile_ytdlp( + handle: str, + cookies_path: str = None, + limit: int = SCRAPE_VIDEO_LIMIT, +) -> list[dict]: """ - Download, compress, and post a single TikTok video to Bluesky. - Returns True if successfully posted. + Fallback: use yt-dlp to extract the video list from a TikTok profile. + Returns a list of dicts with keys: video_id, url, timestamp. """ - video_id = video["id"] - video_url = video["url"] - desc = video.get("desc", "") + import yt_dlp - # ── Deduplication ────────────────────────────────────────────────── - if is_already_posted(video_id, state): - logging.info(f"⏭️ Skipping already-posted video: {video_id}") - return False + profile_url = f"https://www.tiktok.com/@{handle}" + logging.info(f"📦 yt-dlp profile scrape fallback for @{handle}...") - logging.info(f"🎬 Processing video {video_id}: {video_url}") + impersonate = get_best_impersonation_target() - cookies = load_cookies_from_file(TIKTOK_COOKIES_PATH) + ydl_opts = { + "extract_flat": True, + "quiet": True, + "no_warnings": True, + "playlistend": limit, + } + if cookies_path and os.path.exists(cookies_path): + ydl_opts["cookiefile"] = cookies_path + if impersonate: + ydl_opts["impersonate"] = impersonate - with tempfile.TemporaryDirectory() as tmpdir: - raw_path = os.path.join(tmpdir, f"{video_id}_raw.mp4") - processed_path = os.path.join(tmpdir, f"{video_id}.mp4") + try: + logging.info(f"🌐 yt-dlp extracting: {profile_url}") + with yt_dlp.YoutubeDL(ydl_opts) as ydl: + info = ydl.extract_info(profile_url, download=False) - # ── Download ─────────────────────────────────────────────────── - logging.info(f"⬇️ Downloading: {video_url}") - if not download_video(video_url, raw_path, cookies=cookies): - logging.error(f"❌ Download failed for {video_id}. Skipping.") - return False - - # ── Compress / trim ──────────────────────────────────────────── - if not compress_video(raw_path, processed_path): - logging.error(f"❌ Compression failed for {video_id}. Skipping.") - return False - - # ── Size guard ───────────────────────────────────────────────── - final_size = os.path.getsize(processed_path) - if final_size > VIDEO_MAX_SIZE_BYTES: - logging.error( - f"❌ Compressed video still too large: " - f"{final_size / 1024 / 1024:.1f} MB > " - f"{VIDEO_MAX_SIZE_BYTES / 1024 / 1024:.0f} MB. Skipping." - ) - return False - - # ── Upload to Bluesky ────────────────────────────────────────── + entries = info.get("entries", []) if info else [] logging.info( - f"⬆️ Uploading to Bluesky " - f"({final_size / 1024 / 1024:.1f} MB)..." + f"✅ yt-dlp returned {len(entries)} entries " + f"(playlist: {info.get('title', '?') if info else '?'})" ) - with open(processed_path, "rb") as f: - video_data = f.read() - try: - blob = bsky_upload_blob_with_retry(client, video_data, "video/mp4") - except Exception as e: - logging.error(f"❌ Blob upload failed for {video_id}: {e}") - return False + videos = [] + for entry in entries: + if not entry: + continue + url = entry.get("url") or entry.get("webpage_url") or "" + vid_match = re.search(r"/video/(\d+)", url) + if not vid_match: + vid_id = entry.get("id", "") + if vid_id: + url = f"https://www.tiktok.com/@{handle}/video/{vid_id}" + vid_match = re.search(r"/video/(\d+)", url) + if vid_match: + videos.append({ + "video_id": vid_match.group(1), + "url": url, + "timestamp": entry.get("timestamp"), + }) - # ── Build post text ──────────────────────────────────────────── - post_text = desc.strip() if desc else "" - if len(post_text) > 280: - post_text = post_text[:277] + "..." - if not post_text: - post_text = f"🎬 {video_url}" + logging.info(f"✅ yt-dlp fallback produced {len(videos)} usable videos.") + return videos[:limit] - # ── Build video embed ────────────────────────────────────────── - try: - from atproto import models - video_embed = models.AppBskyEmbedVideo.Main( - video=blob, - alt=desc[:1000] if desc else "", + except Exception as e: + logging.error(f"❌ yt-dlp profile scrape failed: {type(e).__name__}: {e}") + return [] + + +# ───────────────────────────────────────────────────────────────────────────── +# Caption builder +# ───────────────────────────────────────────────────────────────────────────── +def build_caption(video_info: dict, tiktok_handle: str, max_len: int = 290) -> str: + """Build a Bluesky post caption from video metadata.""" + desc = (video_info.get("description") or "").strip() + url = video_info.get("url", "") + + if desc: + # Truncate description to leave room for the URL + url_len = len(url) + 1 # +1 for newline + max_desc = max_len - url_len + if len(desc) > max_desc: + desc = desc[: max_desc - 1] + "…" + return f"{desc}\n{url}" + + return url + + +# ───────────────────────────────────────────────────────────────────────────── +# Main processing loop +# ───────────────────────────────────────────────────────────────────────────── +def process_videos( + videos: list[dict], + state: dict, + client: Client, + tiktok_handle: str, + cookies_path: str, + langs: list[str], + max_age_days: int, + video_max_size_bytes: int, +) -> int: + """ + Download, compress, upload and post each new video. + Returns the count of successfully posted videos. + """ + posted_count = 0 + now = arrow.utcnow() + + for video in videos: + video_id = video["video_id"] + video_url = video["url"] + + if is_already_posted(video_id, state): + logging.info(f"⏭️ Already posted: {video_id}") + continue + + # Age filter (only if timestamp is available) + ts = video.get("timestamp") + if ts: + try: + video_time = arrow.get(ts) + age_days = (now - video_time).days + if age_days > max_age_days: + logging.info( + f"⏭️ Video {video_id} too old ({age_days}d > {max_age_days}d). Skipping." + ) + continue + except Exception: + pass + + logging.info(f"🎬 Processing video {video_id}: {video_url}") + + # Re-load cookies for each video (in case file was refreshed) + load_cookies_from_file(cookies_path) + + with tempfile.TemporaryDirectory() as tmpdir: + raw_path = os.path.join(tmpdir, f"{video_id}_raw.mp4") + comp_path = os.path.join(tmpdir, f"{video_id}.mp4") + + # 1. Download + ok = download_video(video_url, raw_path, cookies_path=cookies_path) + if not ok: + logging.error(f"❌ Download failed for {video_id}. Skipping.") + continue + + # 2. Compress + ok = compress_video( + raw_path, + comp_path, + max_size_bytes=video_max_size_bytes, ) - except Exception as e: - logging.error(f"❌ Could not build video embed: {e}") - return False + if not ok: + logging.error(f"❌ Compression failed for {video_id}. Skipping.") + continue - # ── Create post ──────────────────────────────────────────────── - success = bsky_create_post_with_retry( - client, - text=post_text, - embed=video_embed, - langs=langs, - ) + # 3. Upload blob + blob = upload_video_to_bluesky(client, comp_path, video_id) + if blob is None: + logging.error(f"❌ Blob upload failed for {video_id}.") + continue - if success: - mark_as_posted(video_id, state, { - "tiktok_url": video_url, - "desc": desc[:200] if desc else "", - }) - logging.info(f"✅ Posted video {video_id} to Bluesky.") - return True + # 4. Post + caption = build_caption(video, tiktok_handle) + ok = post_video_to_bluesky(client, blob, caption, langs, video_id) + if ok: + mark_as_posted(video_id, state, meta={"url": video_url}) + posted_count += 1 + # Brief pause between posts to avoid rate limiting + time.sleep(random.uniform(2.0, 5.0)) - logging.error(f"❌ Failed to post video {video_id} to Bluesky.") - return False + return posted_count # ───────────────────────────────────────────────────────────────────────────── # Entry point # ───────────────────────────────────────────────────────────────────────────── -def main(): - global TIKTOK_COOKIES_PATH # must be first line in function - - load_dotenv() - +def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser( - description="TikTok → Bluesky cross-poster" + description="Cross-post TikTok videos to Bluesky." ) - parser.add_argument( - "--tiktok-handle", required=True, - help="TikTok handle to scrape (without @)", - ) - parser.add_argument( - "--bsky-handle", required=True, - help="Bluesky handle (e.g. user.bsky.social)", - ) - parser.add_argument( - "--bsky-app-password", required=True, - help="Bluesky app password (not account password)", - ) - parser.add_argument( - "--bsky-base-url", default=DEFAULT_BSKY_BASE_URL, - help=( - "Bluesky AT Protocol PDS base URL. " - "Always https://bsky.social even for custom-domain users " - "(e.g. eurosky.social handles still authenticate via bsky.social). " - f"Default: {DEFAULT_BSKY_BASE_URL}" - ), - ) - parser.add_argument( - "--bsky-langs", nargs="+", default=DEFAULT_BSKY_LANGS, - help="Post language codes (default: es)", - ) - parser.add_argument( - "--cookies-path", default=TIKTOK_COOKIES_PATH, - help="Path to TikTok cookies JSON file", - ) - args = parser.parse_args() + parser.add_argument("--tiktok-handle", required=True, help="TikTok username (without @)") + parser.add_argument("--bsky-handle", required=True, help="Bluesky handle") + parser.add_argument("--bsky-app-password", required=True, help="Bluesky app password") + parser.add_argument("--bsky-base-url", default=DEFAULT_BSKY_BASE_URL, + help=f"Bluesky PDS base URL (default: {DEFAULT_BSKY_BASE_URL})") + parser.add_argument("--bsky-langs", nargs="+", default=DEFAULT_BSKY_LANGS, + help="BCP-47 language tags for posts (default: es)") + parser.add_argument("--cookies-path", default=TIKTOK_COOKIES_PATH, + help=f"Path to TikTok cookies JSON (default: {TIKTOK_COOKIES_PATH})") + parser.add_argument("--max-age-days", type=int, default=VIDEO_MAX_AGE_DAYS, + help=f"Skip videos older than N days (default: {VIDEO_MAX_AGE_DAYS})") + return parser.parse_args() - # Override global cookie path from CLI - TIKTOK_COOKIES_PATH = args.cookies_path + +def main(): + load_dotenv() + args = parse_args() + + # ── Fix 2: resolve video size limit based on PDS ────────────────────── + video_max_size_bytes = get_video_size_limit(args.bsky_base_url) logging.info("=" * 60) logging.info("🤖 TikTok→Bluesky bot started") @@ -1286,52 +979,66 @@ def main(): logging.info(f" Bluesky handle: {args.bsky_handle}") logging.info(f" Bluesky PDS : {args.bsky_base_url}") logging.info(f" Languages : {args.bsky_langs}") - logging.info( - f" Cookie file : {TIKTOK_COOKIES_PATH} " - f"({'✅ found' if os.path.exists(TIKTOK_COOKIES_PATH) else '❌ NOT FOUND'})" - ) + logging.info(f" Video size cap: {video_max_size_bytes // 1024 // 1024} MB") + cookie_status = "✅ found" if os.path.exists(args.cookies_path) else "❌ NOT FOUND" + logging.info(f" Cookie file : {args.cookies_path} ({cookie_status})") logging.info("=" * 60) - state = load_state() + state = load_state() - # Instantiate client — base URL is baked in via bsky_login() - client = Client() - - # ── Bluesky login ────────────────────────────────────────────────── - if not bsky_login( - client, + # Connect to Bluesky + client = connect_bluesky( args.bsky_handle, args.bsky_app_password, args.bsky_base_url, - ): - logging.error("❌ Cannot proceed without Bluesky login. Exiting.") - sys.exit(1) + ) - # ── Scrape TikTok ────────────────────────────────────────────────── + # Scrape TikTok profile logging.info(f"🔄 Scraping @{args.tiktok_handle}...") - tiktoks = scrape_tiktoks_via_playwright(args.tiktok_handle) + cookies = load_cookies_from_file(args.cookies_path) - if not tiktoks: - logging.warning("⚠️ No TikTok videos found. Skipping sync.") - logging.info("🤖 Bot finished.") - return + videos = scrape_tiktok_profile_playwright( + args.tiktok_handle, + cookies, + limit=SCRAPE_VIDEO_LIMIT, + ) - logging.info(f"📋 Found {len(tiktoks)} video(s). Processing new ones...") - - # ── Process each video ───────────────────────────────────────────── - posted = 0 - for tiktok in tiktoks: + if not videos: + logging.warning("⚠️ Playwright grid scraping failed. Trying API fallback...") + ts = int(time.time()) + # Try to save a screenshot if playwright left a page open try: - if process_tiktok(tiktok, client, args.bsky_langs, state): - posted += 1 - # Polite delay between posts - time.sleep(random.uniform(3.0, 7.0)) - except Exception as e: - logging.error( - f"❌ Unexpected error processing video " - f"{tiktok.get('id', '?')}: {e}" - ) - continue + import glob + for f in glob.glob("screenshot_no_grid_*.png"): + pass # already saved inside scrape function + except Exception: + pass + + # Save a "playwright failed" screenshot placeholder in logs + logging.info(f"📸 Screenshot saved: screenshot_playwright_failed_{ts}.png") + + videos = scrape_tiktok_profile_ytdlp( + args.tiktok_handle, + cookies_path=args.cookies_path, + limit=SCRAPE_VIDEO_LIMIT, + ) + + if not videos: + logging.error("❌ No videos found. Exiting.") + sys.exit(0) + + logging.info(f"📋 Found {len(videos)} video(s). Processing new ones...") + + posted = process_videos( + videos=videos, + state=state, + client=client, + tiktok_handle=args.tiktok_handle, + cookies_path=args.cookies_path, + langs=args.bsky_langs, + max_age_days=args.max_age_days, + video_max_size_bytes=video_max_size_bytes, + ) logging.info("=" * 60) logging.info(f"✅ Sync complete. Posted {posted} new video(s).")