diff --git a/tiktok2bsky.py b/tiktok2bsky.py index 3653541..f83dd44 100644 --- a/tiktok2bsky.py +++ b/tiktok2bsky.py @@ -34,13 +34,21 @@ from atproto import Client from dotenv import load_dotenv from playwright.sync_api import sync_playwright -# playwright-stealth 1.x uses stealth_sync, 2.x uses Stealth class + +# ───────────────────────────────────────────────────────────────────────────── +# playwright-stealth: support v1.x (stealth_sync) and v2.x (Stealth class) +# ───────────────────────────────────────────────────────────────────────────── +_STEALTH_V2 = None # None = not available at all + try: from playwright_stealth import stealth_sync _STEALTH_V2 = False except ImportError: - from playwright_stealth import Stealth - _STEALTH_V2 = True + try: + from playwright_stealth import Stealth + _STEALTH_V2 = True + except ImportError: + pass # stealth disabled — warning emitted at runtime # ───────────────────────────────────────────────────────────────────────────── @@ -146,7 +154,6 @@ def load_state() -> dict: def save_state(state: dict): - # Prune to last STATE_MAX_ENTRIES posted = state.get("posted", {}) if len(posted) > STATE_MAX_ENTRIES: sorted_keys = sorted( @@ -180,7 +187,7 @@ def mark_as_posted(video_id: str, state: dict, meta: dict = None): # Cookie helpers # ───────────────────────────────────────────────────────────────────────────── def load_cookies_from_file(path: str) -> list: - """Load cookies from a JSON file (format produced by generate_tiktok_cookies.py).""" + """Load cookies from a JSON file.""" if not os.path.exists(path): logging.warning(f"⚠️ Cookie file not found: {path}") return [] @@ -279,7 +286,6 @@ def is_transient_error(error_obj) -> bool: def get_rate_limit_wait_seconds(error_obj, default_delay: float) -> float: """ Parse rate-limit response headers and return a bounded wait time in seconds. - Supports retry-after, x-ratelimit-after, and ratelimit-reset (unix timestamp). """ try: now_ts = int(time.time()) @@ -300,7 +306,6 @@ def get_rate_limit_wait_seconds(error_obj, default_delay: float) -> float: except Exception: pass - # repr() fallback — parse headers embedded in the exception string text = repr(error_obj) for pattern, is_timestamp in [ (r"'retry-after':\s*'(\d+)'", False), @@ -318,6 +323,54 @@ def get_rate_limit_wait_seconds(error_obj, default_delay: float) -> float: return default_delay +# ───────────────────────────────────────────────────────────────────────────── +# playwright-stealth application helper +# ───────────────────────────────────────────────────────────────────────────── +def apply_stealth(page): + """ + Apply playwright-stealth to a page object. + + Handles all known API variants: + v1.x → stealth_sync(page) + v2.x → Stealth().use_sync(page) returns a new wrapped page + v2.x → Stealth().use(page) alternate name + none → skip gracefully with a warning + + Always returns a page object (wrapped or original). + """ + if _STEALTH_V2 is None: + logging.warning("⚠️ playwright-stealth not installed. Skipping stealth.") + return page + + try: + if _STEALTH_V2: + # v2.x — probe for known method names + stealth = Stealth() + if hasattr(stealth, "use_sync"): + page = stealth.use_sync(page) + logging.info("🥷 playwright-stealth v2.x applied (use_sync).") + elif hasattr(stealth, "use"): + page = stealth.use(page) + logging.info("🥷 playwright-stealth v2.x applied (use).") + else: + logging.warning( + "⚠️ playwright-stealth v2.x: no known apply method found " + "(tried use_sync, use). Skipping stealth." + ) + else: + # v1.x + stealth_sync(page) + logging.info("🥷 playwright-stealth v1.x applied (stealth_sync).") + + except Exception as e: + logging.warning( + f"⚠️ playwright-stealth could not be applied: " + f"{type(e).__name__}: {e}. Continuing without stealth." + ) + + return page + + # ───────────────────────────────────────────────────────────────────────────── # Bluesky client # ───────────────────────────────────────────────────────────────────────────── @@ -327,14 +380,17 @@ def connect_bluesky(handle: str, app_password: str, base_url: str) -> Client: for attempt in range(1, BSKY_LOGIN_MAX_RETRIES + 1): try: - logging.info(f"🔐 Bluesky login attempt {attempt}/{BSKY_LOGIN_MAX_RETRIES} for {handle}") + logging.info( + f"🔐 Bluesky login attempt {attempt}/{BSKY_LOGIN_MAX_RETRIES} for {handle}" + ) client.login(handle, app_password) client.me = client.get_profile(handle) logging.info(f"✅ Bluesky login successful as {handle}") return client except Exception as e: logging.warning( - f"⚠️ Bluesky login {type(e).__name__}: {e} (attempt {attempt}/{BSKY_LOGIN_MAX_RETRIES})" + f"⚠️ Bluesky login {type(e).__name__}: {e} " + f"(attempt {attempt}/{BSKY_LOGIN_MAX_RETRIES})" ) if is_rate_limited_error(e): delay = get_rate_limit_wait_seconds(e, BSKY_LOGIN_BASE_DELAY) @@ -355,7 +411,9 @@ def connect_bluesky(handle: str, app_password: str, base_url: str) -> Client: logging.warning(f"⏳ Retrying login in {wait:.1f}s.") time.sleep(wait) else: - logging.error(f"❌ Bluesky login failed after {BSKY_LOGIN_MAX_RETRIES} attempts.") + logging.error( + f"❌ Bluesky login failed after {BSKY_LOGIN_MAX_RETRIES} attempts." + ) raise raise RuntimeError("Bluesky login failed: exhausted all retries.") @@ -388,12 +446,12 @@ def compress_video( input_path: str, output_path: str, max_duration: int = VIDEO_MAX_DURATION_S, - max_size_bytes: int = None, # resolved at call-time from get_video_size_limit() + max_size_bytes: int = None, ) -> bool: """ Re-encode input_path → output_path using libx264, targeting max_size_bytes. - Key fixes applied: + Fixes applied: • pad=ceil(iw/2)*2:ceil(ih/2)*2 — ensures even dimensions (libx264 requirement) • -maxrate == -b:v — hard ceiling, no burst above target • post-encode size guard — rejects file if still over limit @@ -471,12 +529,12 @@ def compress_video( return True except Exception as e: - logging.error(f"❌ compress_video error: {e}") + logging.error(f"❌ compress_video error: {type(e).__name__}: {e}") return False # ───────────────────────────────────────────────────────────────────────────── -# yt-dlp download +# yt-dlp helpers # ───────────────────────────────────────────────────────────────────────────── def get_best_impersonation_target() -> str | None: """ @@ -491,7 +549,6 @@ def get_best_impersonation_target() -> str | None: if target in available: logging.info(f"🎭 yt-dlp impersonation target: {target}") return target - # fallback: return first available if available: target = sorted(available)[0] logging.info(f"🎭 yt-dlp impersonation target (fallback): {target}") @@ -509,10 +566,10 @@ def download_video_ytdlp(url: str, output_path: str, cookies_path: str = None) - impersonate = get_best_impersonation_target() ydl_opts = { - "outtmpl": output_path, - "format": "bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best", - "quiet": False, - "no_warnings": False, + "outtmpl": output_path, + "format": "bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best", + "quiet": False, + "no_warnings": False, "merge_output_format": "mp4", } @@ -544,10 +601,7 @@ def download_video_ytdlp(url: str, output_path: str, cookies_path: str = None) - def download_video(url: str, output_path: str, cookies_path: str = None) -> bool: - """ - Download a TikTok video. Routes directly to yt-dlp with browser impersonation. - """ - cookies = load_cookies_from_file(cookies_path) if cookies_path else [] + """Download a TikTok video via yt-dlp with browser impersonation.""" logging.info(f"⬇️ Downloading: {url}") return download_video_ytdlp(url, output_path, cookies_path=cookies_path) @@ -563,8 +617,8 @@ def upload_video_to_bluesky( """ Upload a video file to Bluesky as a blob. - Fix 1 applied: exception is logged as type(e).__name__: e - so the actual error (413, 403, network error, etc.) is always visible. + Fix 1: exception is always logged as type(e).__name__: e + so the actual error (413, 403, network error, etc.) is visible in logs. """ size_mb = os.path.getsize(video_path) / 1024 / 1024 logging.info(f"⬆️ Uploading to Bluesky ({size_mb:.1f} MB)...") @@ -581,12 +635,8 @@ def upload_video_to_bluesky( return blob.blob except Exception as e: - # ── Fix 1: always log the full exception type and message ────── + # Fix 1 — always log the full exception type and message err_detail = f"{type(e).__name__}: {e}" - logging.warning( - f"⚠️ Blob upload attempt {attempt}/{BSKY_UPLOAD_MAX_RETRIES} " - f"failed: {err_detail}. Retrying in {delay:.1f}s..." - ) if attempt >= BSKY_UPLOAD_MAX_RETRIES: logging.error( @@ -595,6 +645,10 @@ def upload_video_to_bluesky( ) return None + logging.warning( + f"⚠️ Blob upload attempt {attempt}/{BSKY_UPLOAD_MAX_RETRIES} " + f"failed: {err_detail}. Retrying in {delay:.1f}s..." + ) time.sleep(delay + random.uniform(0, BSKY_UPLOAD_JITTER_MAX)) delay = min(delay * 2, BSKY_UPLOAD_MAX_DELAY) @@ -618,7 +672,6 @@ def post_video_to_bluesky( video_embed = models.AppBskyEmbedVideo.Main( video=blob, ) - client.send_post( text=caption, embed=video_embed, @@ -690,14 +743,14 @@ def scrape_tiktok_profile_playwright( page = context.new_page() - if _STEALTH_V2: - Stealth().apply(page) - else: - stealth_sync(page) + # Apply stealth — gracefully handles all v1/v2/missing variants + page = apply_stealth(page) for attempt in range(1, PLAYWRIGHT_MAX_RELOADS + 1): try: - logging.info(f"🌐 Loading profile (attempt {attempt}/{PLAYWRIGHT_MAX_RELOADS})...") + logging.info( + f"🌐 Loading profile (attempt {attempt}/{PLAYWRIGHT_MAX_RELOADS})..." + ) page.goto( profile_url, wait_until="domcontentloaded", @@ -720,7 +773,9 @@ def scrape_tiktok_profile_playwright( logging.warning(f"⚠️ Video grid not found on attempt {attempt}.") ts = int(time.time()) page.screenshot(path=f"screenshot_no_grid_{attempt}_{ts}.png") - logging.info(f"📸 Screenshot saved: screenshot_no_grid_{attempt}_{ts}.png") + logging.info( + f"📸 Screenshot saved: screenshot_no_grid_{attempt}_{ts}.png" + ) time.sleep(3) continue @@ -738,8 +793,8 @@ def scrape_tiktok_profile_playwright( else f"https://www.tiktok.com{link}" ) videos.append({ - "video_id": video_id, - "url": full_url, + "video_id": video_id, + "url": full_url, "timestamp": None, }) except Exception: @@ -750,7 +805,10 @@ def scrape_tiktok_profile_playwright( break except Exception as e: - logging.warning(f"⚠️ Playwright attempt {attempt} error: {type(e).__name__}: {e}") + logging.warning( + f"⚠️ Playwright attempt {attempt} error: " + f"{type(e).__name__}: {e}" + ) ts = int(time.time()) try: page.screenshot(path=f"screenshot_error_{attempt}_{ts}.png") @@ -759,17 +817,31 @@ def scrape_tiktok_profile_playwright( time.sleep(3) if not videos: - logging.warning("⚠️ Video grid not found on attempt 3.") + logging.warning( + f"⚠️ Video grid not found on attempt {PLAYWRIGHT_MAX_RELOADS}." + ) ts = int(time.time()) try: - page.screenshot(path=f"screenshot_no_grid_3_{ts}.png") - logging.info(f"📸 Screenshot saved: screenshot_no_grid_3_{ts}.png") + page.screenshot(path=f"screenshot_no_grid_{PLAYWRIGHT_MAX_RELOADS}_{ts}.png") + logging.info( + f"📸 Screenshot saved: " + f"screenshot_no_grid_{PLAYWRIGHT_MAX_RELOADS}_{ts}.png" + ) except Exception: pass - page.close() - context.close() - browser.close() + try: + page.close() + except Exception: + pass + try: + context.close() + except Exception: + pass + try: + browser.close() + except Exception: + pass return videos @@ -794,10 +866,10 @@ def scrape_tiktok_profile_ytdlp( impersonate = get_best_impersonation_target() ydl_opts = { - "extract_flat": True, - "quiet": True, - "no_warnings": True, - "playlistend": limit, + "extract_flat": True, + "quiet": True, + "no_warnings": True, + "playlistend": limit, } if cookies_path and os.path.exists(cookies_path): ydl_opts["cookiefile"] = cookies_path @@ -837,7 +909,9 @@ def scrape_tiktok_profile_ytdlp( return videos[:limit] except Exception as e: - logging.error(f"❌ yt-dlp profile scrape failed: {type(e).__name__}: {e}") + logging.error( + f"❌ yt-dlp profile scrape failed: {type(e).__name__}: {e}" + ) return [] @@ -850,9 +924,8 @@ def build_caption(video_info: dict, tiktok_handle: str, max_len: int = 290) -> s url = video_info.get("url", "") if desc: - # Truncate description to leave room for the URL - url_len = len(url) + 1 # +1 for newline - max_desc = max_len - url_len + url_len = len(url) + 1 # +1 for newline + max_desc = max_len - url_len if len(desc) > max_desc: desc = desc[: max_desc - 1] + "…" return f"{desc}\n{url}" @@ -888,7 +961,7 @@ def process_videos( logging.info(f"⏭️ Already posted: {video_id}") continue - # Age filter (only if timestamp is available) + # Age filter (only when timestamp is available) ts = video.get("timestamp") if ts: try: @@ -896,7 +969,8 @@ def process_videos( age_days = (now - video_time).days if age_days > max_age_days: logging.info( - f"⏭️ Video {video_id} too old ({age_days}d > {max_age_days}d). Skipping." + f"⏭️ Video {video_id} too old " + f"({age_days}d > {max_age_days}d). Skipping." ) continue except Exception: @@ -904,9 +978,6 @@ def process_videos( logging.info(f"🎬 Processing video {video_id}: {video_url}") - # Re-load cookies for each video (in case file was refreshed) - load_cookies_from_file(cookies_path) - with tempfile.TemporaryDirectory() as tmpdir: raw_path = os.path.join(tmpdir, f"{video_id}_raw.mp4") comp_path = os.path.join(tmpdir, f"{video_id}.mp4") @@ -952,17 +1023,43 @@ def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser( description="Cross-post TikTok videos to Bluesky." ) - parser.add_argument("--tiktok-handle", required=True, help="TikTok username (without @)") - parser.add_argument("--bsky-handle", required=True, help="Bluesky handle") - parser.add_argument("--bsky-app-password", required=True, help="Bluesky app password") - parser.add_argument("--bsky-base-url", default=DEFAULT_BSKY_BASE_URL, - help=f"Bluesky PDS base URL (default: {DEFAULT_BSKY_BASE_URL})") - parser.add_argument("--bsky-langs", nargs="+", default=DEFAULT_BSKY_LANGS, - help="BCP-47 language tags for posts (default: es)") - parser.add_argument("--cookies-path", default=TIKTOK_COOKIES_PATH, - help=f"Path to TikTok cookies JSON (default: {TIKTOK_COOKIES_PATH})") - parser.add_argument("--max-age-days", type=int, default=VIDEO_MAX_AGE_DAYS, - help=f"Skip videos older than N days (default: {VIDEO_MAX_AGE_DAYS})") + parser.add_argument( + "--tiktok-handle", + required=True, + help="TikTok username (without @)", + ) + parser.add_argument( + "--bsky-handle", + required=True, + help="Bluesky handle", + ) + parser.add_argument( + "--bsky-app-password", + required=True, + help="Bluesky app password", + ) + parser.add_argument( + "--bsky-base-url", + default=DEFAULT_BSKY_BASE_URL, + help=f"Bluesky PDS base URL (default: {DEFAULT_BSKY_BASE_URL})", + ) + parser.add_argument( + "--bsky-langs", + nargs="+", + default=DEFAULT_BSKY_LANGS, + help="BCP-47 language tags for posts (default: es)", + ) + parser.add_argument( + "--cookies-path", + default=TIKTOK_COOKIES_PATH, + help=f"Path to TikTok cookies JSON (default: {TIKTOK_COOKIES_PATH})", + ) + parser.add_argument( + "--max-age-days", + type=int, + default=VIDEO_MAX_AGE_DAYS, + help=f"Skip videos older than N days (default: {VIDEO_MAX_AGE_DAYS})", + ) return parser.parse_args() @@ -970,7 +1067,7 @@ def main(): load_dotenv() args = parse_args() - # ── Fix 2: resolve video size limit based on PDS ────────────────────── + # Fix 2 — resolve video size limit based on PDS video_max_size_bytes = get_video_size_limit(args.bsky_base_url) logging.info("=" * 60) @@ -1004,17 +1101,8 @@ def main(): ) if not videos: - logging.warning("⚠️ Playwright grid scraping failed. Trying API fallback...") + logging.warning("⚠️ Playwright grid scraping failed. Trying yt-dlp fallback...") ts = int(time.time()) - # Try to save a screenshot if playwright left a page open - try: - import glob - for f in glob.glob("screenshot_no_grid_*.png"): - pass # already saved inside scrape function - except Exception: - pass - - # Save a "playwright failed" screenshot placeholder in logs logging.info(f"📸 Screenshot saved: screenshot_playwright_failed_{ts}.png") videos = scrape_tiktok_profile_ytdlp(