diff --git a/tiktok2bsky.py b/tiktok2bsky.py index f83dd44..4ee5cff 100644 --- a/tiktok2bsky.py +++ b/tiktok2bsky.py @@ -36,7 +36,7 @@ from playwright.sync_api import sync_playwright # ───────────────────────────────────────────────────────────────────────────── -# playwright-stealth: support v1.x (stealth_sync) and v2.x (Stealth class) +# playwright-stealth: detect installed version # ───────────────────────────────────────────────────────────────────────────── _STEALTH_V2 = None # None = not available at all @@ -96,9 +96,9 @@ PLAYWRIGHT_SLOW_MO = 50 PLAYWRIGHT_MAX_RELOADS = 3 # TikTok selectors -TIKTOK_VIDEO_GRID_SEL = '[data-e2e="user-post-item-list"]' -TIKTOK_VIDEO_ITEM_SEL = '[data-e2e="user-post-item"]' -TIKTOK_BANNER_SELS = [ +TIKTOK_VIDEO_GRID_SEL = '[data-e2e="user-post-item-list"]' +TIKTOK_VIDEO_ITEM_SEL = '[data-e2e="user-post-item"]' +TIKTOK_BANNER_SELS = [ '[id*="banner"]', '[class*="banner"]', '[data-e2e="recommend-modal-close"]', @@ -227,6 +227,63 @@ def inject_cookies_into_context(context, cookies: list): logging.warning(f"⚠️ Could not inject cookies: {e}") +def convert_json_cookies_to_netscape(json_path: str) -> str | None: + """ + Convert a JSON cookie file (browser extension format) to a Netscape + cookie file that yt-dlp can consume. + + Returns the path to a temporary Netscape file, or None on failure. + The caller is responsible for deleting the file when done. + + Netscape format columns (tab-separated): + domain include_subdomains path secure expiry name value + """ + try: + with open(json_path, "r", encoding="utf-8") as f: + cookies = json.load(f) + + tmp = tempfile.NamedTemporaryFile( + mode="w", + suffix=".txt", + delete=False, + encoding="utf-8", + ) + + tmp.write("# Netscape HTTP Cookie File\n") + tmp.write("# Generated by tiktok2bsky.py\n\n") + + for c in cookies: + domain = c.get("domain", ".tiktok.com") + # Netscape format requires domain to start with a dot for + # include_subdomains=TRUE to work correctly + include_sub = "TRUE" if domain.startswith(".") else "FALSE" + path = c.get("path", "/") + secure = "TRUE" if c.get("secure", False) else "FALSE" + expiry = int( + c.get("expirationDate") or c.get("expires") or 0 + ) + name = c.get("name", "") + value = c.get("value", "") + + tmp.write( + f"{domain}\t{include_sub}\t{path}\t" + f"{secure}\t{expiry}\t{name}\t{value}\n" + ) + + tmp.close() + logging.info( + f"🍪 Converted {len(cookies)} cookies to Netscape format: {tmp.name}" + ) + return tmp.name + + except Exception as e: + logging.warning( + f"⚠️ Could not convert cookies to Netscape format: " + f"{type(e).__name__}: {e}" + ) + return None + + # ───────────────────────────────────────────────────────────────────────────── # Bluesky error classification helpers # ───────────────────────────────────────────────────────────────────────────── @@ -323,54 +380,6 @@ def get_rate_limit_wait_seconds(error_obj, default_delay: float) -> float: return default_delay -# ───────────────────────────────────────────────────────────────────────────── -# playwright-stealth application helper -# ───────────────────────────────────────────────────────────────────────────── -def apply_stealth(page): - """ - Apply playwright-stealth to a page object. - - Handles all known API variants: - v1.x → stealth_sync(page) - v2.x → Stealth().use_sync(page) returns a new wrapped page - v2.x → Stealth().use(page) alternate name - none → skip gracefully with a warning - - Always returns a page object (wrapped or original). - """ - if _STEALTH_V2 is None: - logging.warning("⚠️ playwright-stealth not installed. Skipping stealth.") - return page - - try: - if _STEALTH_V2: - # v2.x — probe for known method names - stealth = Stealth() - if hasattr(stealth, "use_sync"): - page = stealth.use_sync(page) - logging.info("🥷 playwright-stealth v2.x applied (use_sync).") - elif hasattr(stealth, "use"): - page = stealth.use(page) - logging.info("🥷 playwright-stealth v2.x applied (use).") - else: - logging.warning( - "⚠️ playwright-stealth v2.x: no known apply method found " - "(tried use_sync, use). Skipping stealth." - ) - else: - # v1.x - stealth_sync(page) - logging.info("🥷 playwright-stealth v1.x applied (stealth_sync).") - - except Exception as e: - logging.warning( - f"⚠️ playwright-stealth could not be applied: " - f"{type(e).__name__}: {e}. Continuing without stealth." - ) - - return page - - # ───────────────────────────────────────────────────────────────────────────── # Bluesky client # ───────────────────────────────────────────────────────────────────────────── @@ -393,21 +402,18 @@ def connect_bluesky(handle: str, app_password: str, base_url: str) -> Client: f"(attempt {attempt}/{BSKY_LOGIN_MAX_RETRIES})" ) if is_rate_limited_error(e): - delay = get_rate_limit_wait_seconds(e, BSKY_LOGIN_BASE_DELAY) + delay = get_rate_limit_wait_seconds(e, BSKY_LOGIN_BASE_DELAY) jitter = random.uniform(0, BSKY_LOGIN_JITTER_MAX) - wait = delay + jitter + wait = delay + jitter logging.warning( f"⏳ Bluesky login rate-limited (attempt {attempt}/{BSKY_LOGIN_MAX_RETRIES}). " f"Retrying in {wait:.1f}s." ) time.sleep(wait) elif attempt < BSKY_LOGIN_MAX_RETRIES: - delay = min( - BSKY_LOGIN_BASE_DELAY * (2 ** (attempt - 1)), - BSKY_LOGIN_MAX_DELAY, - ) + delay = min(BSKY_LOGIN_BASE_DELAY * (2 ** (attempt - 1)), BSKY_LOGIN_MAX_DELAY) jitter = random.uniform(0, BSKY_LOGIN_JITTER_MAX) - wait = delay + jitter + wait = delay + jitter logging.warning(f"⏳ Retrying login in {wait:.1f}s.") time.sleep(wait) else: @@ -558,9 +564,14 @@ def get_best_impersonation_target() -> str | None: return None -def download_video_ytdlp(url: str, output_path: str, cookies_path: str = None) -> bool: +def download_video_ytdlp( + url: str, + output_path: str, + netscape_cookies_path: str = None, +) -> bool: """ Download a TikTok video using yt-dlp with browser impersonation. + Accepts a Netscape-format cookie file path (not JSON). Returns True on success, False on failure. """ impersonate = get_best_impersonation_target() @@ -573,8 +584,8 @@ def download_video_ytdlp(url: str, output_path: str, cookies_path: str = None) - "merge_output_format": "mp4", } - if cookies_path and os.path.exists(cookies_path): - ydl_opts["cookiefile"] = cookies_path + if netscape_cookies_path and os.path.exists(netscape_cookies_path): + ydl_opts["cookiefile"] = netscape_cookies_path if impersonate: ydl_opts["impersonate"] = impersonate @@ -596,14 +607,20 @@ def download_video_ytdlp(url: str, output_path: str, cookies_path: str = None) - return False except Exception as e: - logging.error(f"❌ yt-dlp download failed for {url}: {type(e).__name__}: {e}") + logging.error( + f"❌ yt-dlp download failed for {url}: {type(e).__name__}: {e}" + ) return False -def download_video(url: str, output_path: str, cookies_path: str = None) -> bool: +def download_video( + url: str, + output_path: str, + netscape_cookies_path: str = None, +) -> bool: """Download a TikTok video via yt-dlp with browser impersonation.""" logging.info(f"⬇️ Downloading: {url}") - return download_video_ytdlp(url, output_path, cookies_path=cookies_path) + return download_video_ytdlp(url, output_path, netscape_cookies_path=netscape_cookies_path) # ───────────────────────────────────────────────────────────────────────────── @@ -616,9 +633,7 @@ def upload_video_to_bluesky( ) -> object | None: """ Upload a video file to Bluesky as a blob. - - Fix 1: exception is always logged as type(e).__name__: e - so the actual error (413, 403, network error, etc.) is visible in logs. + Exception is always logged as type(e).__name__: e for full visibility. """ size_mb = os.path.getsize(video_path) / 1024 / 1024 logging.info(f"⬆️ Uploading to Bluesky ({size_mb:.1f} MB)...") @@ -635,7 +650,6 @@ def upload_video_to_bluesky( return blob.blob except Exception as e: - # Fix 1 — always log the full exception type and message err_detail = f"{type(e).__name__}: {e}" if attempt >= BSKY_UPLOAD_MAX_RETRIES: @@ -669,9 +683,7 @@ def post_video_to_bluesky( from atproto import models try: - video_embed = models.AppBskyEmbedVideo.Main( - video=blob, - ) + video_embed = models.AppBskyEmbedVideo.Main(video=blob) client.send_post( text=caption, embed=video_embed, @@ -713,6 +725,9 @@ def scrape_tiktok_profile_playwright( """ Scrape the most recent video URLs from a TikTok profile page using Playwright. Returns a list of dicts with keys: video_id, url, timestamp. + + Stealth fix: playwright-stealth v2.x must wrap the page via a context manager + on new_page(), not via .apply() or .use_sync() after the fact. """ profile_url = f"https://www.tiktok.com/@{handle}" logging.info(f"🕷️ Scraping TikTok profile: {profile_url}") @@ -741,80 +756,202 @@ def scrape_tiktok_profile_playwright( inject_cookies_into_context(context, cookies) - page = context.new_page() + # ── Stealth application ─────────────────────────────────────────── + # v1.x: stealth_sync(page) — called after new_page() + # v2.x: context manager on new_page — page must be created inside + # the Stealth() context, NOT wrapped after the fact. + # Stealth().use_sync(page) returns a SyncWrappingContextManager, + # not a Page — calling .goto() on it crashes. + # ───────────────────────────────────────────────────────────────── + page = None - # Apply stealth — gracefully handles all v1/v2/missing variants - page = apply_stealth(page) + if _STEALTH_V2 is None: + logging.warning("⚠️ playwright-stealth not installed. Skipping stealth.") + page = context.new_page() - for attempt in range(1, PLAYWRIGHT_MAX_RELOADS + 1): + elif _STEALTH_V2: + # v2.x — use as context manager so the page is created inside it try: - logging.info( - f"🌐 Loading profile (attempt {attempt}/{PLAYWRIGHT_MAX_RELOADS})..." - ) - page.goto( - profile_url, - wait_until="domcontentloaded", - timeout=PLAYWRIGHT_TIMEOUT_MS, - ) - time.sleep(3) - dismiss_overlays(page) + stealth_instance = Stealth() + with stealth_instance(context) as stealthy_context: + page = stealthy_context.new_page() + logging.info("🥷 playwright-stealth v2.x applied (context manager).") + # Run the scraping loop inside the context manager scope + for attempt in range(1, PLAYWRIGHT_MAX_RELOADS + 1): + try: + logging.info( + f"🌐 Loading profile " + f"(attempt {attempt}/{PLAYWRIGHT_MAX_RELOADS})..." + ) + page.goto( + profile_url, + wait_until="domcontentloaded", + timeout=PLAYWRIGHT_TIMEOUT_MS, + ) + time.sleep(3) + dismiss_overlays(page) - # Wait for video grid - try: - page.wait_for_selector( - TIKTOK_VIDEO_GRID_SEL, - timeout=PLAYWRIGHT_TIMEOUT_MS, - ) - except Exception: - pass - - grid = page.locator(TIKTOK_VIDEO_GRID_SEL).first - if not grid.is_visible(timeout=5000): - logging.warning(f"⚠️ Video grid not found on attempt {attempt}.") - ts = int(time.time()) - page.screenshot(path=f"screenshot_no_grid_{attempt}_{ts}.png") - logging.info( - f"📸 Screenshot saved: screenshot_no_grid_{attempt}_{ts}.png" - ) - time.sleep(3) - continue - - # Extract video links - items = page.locator(TIKTOK_VIDEO_ITEM_SEL).all() - for item in items[:limit]: - try: - link = item.locator("a").first.get_attribute("href") - if link and "/video/" in link: - vid_match = re.search(r"/video/(\d+)", link) - if vid_match: - video_id = vid_match.group(1) - full_url = ( - link if link.startswith("http") - else f"https://www.tiktok.com{link}" + try: + page.wait_for_selector( + TIKTOK_VIDEO_GRID_SEL, + timeout=PLAYWRIGHT_TIMEOUT_MS, ) - videos.append({ - "video_id": video_id, - "url": full_url, - "timestamp": None, - }) - except Exception: - pass + except Exception: + pass - if videos: - logging.info(f"✅ Playwright scraped {len(videos)} videos.") - break + grid = page.locator(TIKTOK_VIDEO_GRID_SEL).first + if not grid.is_visible(timeout=5000): + logging.warning( + f"⚠️ Video grid not found on attempt {attempt}." + ) + ts = int(time.time()) + page.screenshot( + path=f"screenshot_no_grid_{attempt}_{ts}.png" + ) + logging.info( + f"📸 Screenshot saved: " + f"screenshot_no_grid_{attempt}_{ts}.png" + ) + time.sleep(3) + continue + + items = page.locator(TIKTOK_VIDEO_ITEM_SEL).all() + for item in items[:limit]: + try: + link = item.locator("a").first.get_attribute("href") + if link and "/video/" in link: + vid_match = re.search(r"/video/(\d+)", link) + if vid_match: + video_id = vid_match.group(1) + full_url = ( + link if link.startswith("http") + else f"https://www.tiktok.com{link}" + ) + videos.append({ + "video_id": video_id, + "url": full_url, + "timestamp": None, + }) + except Exception: + pass + + if videos: + logging.info( + f"✅ Playwright scraped {len(videos)} videos." + ) + break + + except Exception as e: + logging.warning( + f"⚠️ Playwright attempt {attempt} error: " + f"{type(e).__name__}: {e}" + ) + ts = int(time.time()) + try: + page.screenshot( + path=f"screenshot_error_{attempt}_{ts}.png" + ) + except Exception: + pass + time.sleep(3) except Exception as e: logging.warning( - f"⚠️ Playwright attempt {attempt} error: " - f"{type(e).__name__}: {e}" + f"⚠️ playwright-stealth v2.x context manager failed: " + f"{type(e).__name__}: {e}. Falling back to no-stealth page." ) - ts = int(time.time()) + page = context.new_page() + + else: + # v1.x — create page then apply stealth + page = context.new_page() + try: + stealth_sync(page) + logging.info("🥷 playwright-stealth v1.x applied (stealth_sync).") + except Exception as e: + logging.warning( + f"⚠️ playwright-stealth v1.x failed: " + f"{type(e).__name__}: {e}. Continuing without stealth." + ) + + # ── Scraping loop for v1.x and no-stealth paths ─────────────────── + # (v2.x runs its loop inside the context manager above) + if page is not None and not videos and _STEALTH_V2 is not True: + for attempt in range(1, PLAYWRIGHT_MAX_RELOADS + 1): try: - page.screenshot(path=f"screenshot_error_{attempt}_{ts}.png") - except Exception: - pass - time.sleep(3) + logging.info( + f"🌐 Loading profile " + f"(attempt {attempt}/{PLAYWRIGHT_MAX_RELOADS})..." + ) + page.goto( + profile_url, + wait_until="domcontentloaded", + timeout=PLAYWRIGHT_TIMEOUT_MS, + ) + time.sleep(3) + dismiss_overlays(page) + + try: + page.wait_for_selector( + TIKTOK_VIDEO_GRID_SEL, + timeout=PLAYWRIGHT_TIMEOUT_MS, + ) + except Exception: + pass + + grid = page.locator(TIKTOK_VIDEO_GRID_SEL).first + if not grid.is_visible(timeout=5000): + logging.warning( + f"⚠️ Video grid not found on attempt {attempt}." + ) + ts = int(time.time()) + page.screenshot( + path=f"screenshot_no_grid_{attempt}_{ts}.png" + ) + logging.info( + f"📸 Screenshot saved: " + f"screenshot_no_grid_{attempt}_{ts}.png" + ) + time.sleep(3) + continue + + items = page.locator(TIKTOK_VIDEO_ITEM_SEL).all() + for item in items[:limit]: + try: + link = item.locator("a").first.get_attribute("href") + if link and "/video/" in link: + vid_match = re.search(r"/video/(\d+)", link) + if vid_match: + video_id = vid_match.group(1) + full_url = ( + link if link.startswith("http") + else f"https://www.tiktok.com{link}" + ) + videos.append({ + "video_id": video_id, + "url": full_url, + "timestamp": None, + }) + except Exception: + pass + + if videos: + logging.info(f"✅ Playwright scraped {len(videos)} videos.") + break + + except Exception as e: + logging.warning( + f"⚠️ Playwright attempt {attempt} error: " + f"{type(e).__name__}: {e}" + ) + ts = int(time.time()) + try: + page.screenshot( + path=f"screenshot_error_{attempt}_{ts}.png" + ) + except Exception: + pass + time.sleep(3) if not videos: logging.warning( @@ -822,26 +959,24 @@ def scrape_tiktok_profile_playwright( ) ts = int(time.time()) try: - page.screenshot(path=f"screenshot_no_grid_{PLAYWRIGHT_MAX_RELOADS}_{ts}.png") - logging.info( - f"📸 Screenshot saved: " - f"screenshot_no_grid_{PLAYWRIGHT_MAX_RELOADS}_{ts}.png" - ) + if page: + page.screenshot( + path=f"screenshot_no_grid_{PLAYWRIGHT_MAX_RELOADS}_{ts}.png" + ) + logging.info( + f"📸 Screenshot saved: " + f"screenshot_no_grid_{PLAYWRIGHT_MAX_RELOADS}_{ts}.png" + ) except Exception: pass - try: - page.close() - except Exception: - pass - try: - context.close() - except Exception: - pass - try: - browser.close() - except Exception: - pass + # ── Cleanup ─────────────────────────────────────────────────────── + for obj in (page, context, browser): + try: + if obj: + obj.close() + except Exception: + pass return videos @@ -851,11 +986,12 @@ def scrape_tiktok_profile_playwright( # ───────────────────────────────────────────────────────────────────────────── def scrape_tiktok_profile_ytdlp( handle: str, - cookies_path: str = None, + netscape_cookies_path: str = None, limit: int = SCRAPE_VIDEO_LIMIT, ) -> list[dict]: """ Fallback: use yt-dlp to extract the video list from a TikTok profile. + Accepts a Netscape-format cookie file path (not JSON). Returns a list of dicts with keys: video_id, url, timestamp. """ import yt_dlp @@ -871,8 +1007,8 @@ def scrape_tiktok_profile_ytdlp( "no_warnings": True, "playlistend": limit, } - if cookies_path and os.path.exists(cookies_path): - ydl_opts["cookiefile"] = cookies_path + if netscape_cookies_path and os.path.exists(netscape_cookies_path): + ydl_opts["cookiefile"] = netscape_cookies_path if impersonate: ydl_opts["impersonate"] = impersonate @@ -941,7 +1077,7 @@ def process_videos( state: dict, client: Client, tiktok_handle: str, - cookies_path: str, + netscape_cookies_path: str, langs: list[str], max_age_days: int, video_max_size_bytes: int, @@ -983,7 +1119,11 @@ def process_videos( comp_path = os.path.join(tmpdir, f"{video_id}.mp4") # 1. Download - ok = download_video(video_url, raw_path, cookies_path=cookies_path) + ok = download_video( + video_url, + raw_path, + netscape_cookies_path=netscape_cookies_path, + ) if not ok: logging.error(f"❌ Download failed for {video_id}. Skipping.") continue @@ -1090,48 +1230,69 @@ def main(): args.bsky_base_url, ) - # Scrape TikTok profile - logging.info(f"🔄 Scraping @{args.tiktok_handle}...") - cookies = load_cookies_from_file(args.cookies_path) + # Convert JSON cookies → Netscape format for yt-dlp + # Playwright uses the JSON cookies directly via inject_cookies_into_context() + # yt-dlp requires Netscape .txt format — convert once and reuse + netscape_cookies_path = convert_json_cookies_to_netscape(args.cookies_path) + if netscape_cookies_path: + logging.info(f"🍪 Netscape cookie file ready: {netscape_cookies_path}") + else: + logging.warning("⚠️ Could not create Netscape cookie file. yt-dlp will run without cookies.") - videos = scrape_tiktok_profile_playwright( - args.tiktok_handle, - cookies, - limit=SCRAPE_VIDEO_LIMIT, - ) + try: + # Scrape TikTok profile + logging.info(f"🔄 Scraping @{args.tiktok_handle}...") + cookies = load_cookies_from_file(args.cookies_path) - if not videos: - logging.warning("⚠️ Playwright grid scraping failed. Trying yt-dlp fallback...") - ts = int(time.time()) - logging.info(f"📸 Screenshot saved: screenshot_playwright_failed_{ts}.png") - - videos = scrape_tiktok_profile_ytdlp( + videos = scrape_tiktok_profile_playwright( args.tiktok_handle, - cookies_path=args.cookies_path, + cookies, limit=SCRAPE_VIDEO_LIMIT, ) - if not videos: - logging.error("❌ No videos found. Exiting.") - sys.exit(0) + if not videos: + logging.warning( + "⚠️ Playwright grid scraping failed. Trying yt-dlp fallback..." + ) + ts = int(time.time()) + logging.info(f"📸 Screenshot saved: screenshot_playwright_failed_{ts}.png") - logging.info(f"📋 Found {len(videos)} video(s). Processing new ones...") + videos = scrape_tiktok_profile_ytdlp( + args.tiktok_handle, + netscape_cookies_path=netscape_cookies_path, + limit=SCRAPE_VIDEO_LIMIT, + ) - posted = process_videos( - videos=videos, - state=state, - client=client, - tiktok_handle=args.tiktok_handle, - cookies_path=args.cookies_path, - langs=args.bsky_langs, - max_age_days=args.max_age_days, - video_max_size_bytes=video_max_size_bytes, - ) + if not videos: + logging.error("❌ No videos found. Exiting.") + sys.exit(0) - logging.info("=" * 60) - logging.info(f"✅ Sync complete. Posted {posted} new video(s).") - logging.info("🤖 Bot finished.") - logging.info("=" * 60) + logging.info(f"📋 Found {len(videos)} video(s). Processing new ones...") + + posted = process_videos( + videos=videos, + state=state, + client=client, + tiktok_handle=args.tiktok_handle, + netscape_cookies_path=netscape_cookies_path, + langs=args.bsky_langs, + max_age_days=args.max_age_days, + video_max_size_bytes=video_max_size_bytes, + ) + + logging.info("=" * 60) + logging.info(f"✅ Sync complete. Posted {posted} new video(s).") + logging.info("🤖 Bot finished.") + logging.info("=" * 60) + + finally: + # Always clean up the temporary Netscape cookie file + if netscape_cookies_path and os.path.exists(netscape_cookies_path): + try: + os.remove(netscape_cookies_path) + logging.info(f"🧹 Removed temporary Netscape cookie file: {netscape_cookies_path}") + except Exception as e: + logging.warning(f"⚠️ Could not remove Netscape cookie file: {e}") if __name__ == "__main__":