# Selectors for the buttons that acknowledge TikTok's GDPR / privacy notice
# (Spanish and English labels, plus class-name based fallbacks).
TIKTOK_GDPR_SELS = [
    'button:has-text("Entendido")',
    'button:has-text("Understood")',
    'button:has-text("Got it")',
    '[class*="gdpr"] button',
    '[class*="privacy"] button:has-text("Entendido")',
]


def _dismiss_all_overlays(page):
    """Dismiss GDPR notices, cookie banners and any other modals.

    Tries every selector from TIKTOK_GDPR_SELS, TIKTOK_COOKIE_MODAL_SELS and
    TIKTOK_BANNER_SELS, in that order, and clicks the first matching element
    when it is visible. Strictly best-effort: a selector that raises is
    logged at debug level and skipped, it never propagates to the caller.

    Args:
        page: Page object exposing ``locator()`` (a Playwright page, judging
            by the calls made here).
    """
    for sel in TIKTOK_GDPR_SELS + TIKTOK_COOKIE_MODAL_SELS + TIKTOK_BANNER_SELS:
        try:
            el = page.locator(sel).first
            # NOTE(review): recent Playwright releases document the
            # is_visible() timeout as ignored (the check returns
            # immediately) — confirm against the installed version.
            if el.is_visible(timeout=1500):
                el.click(timeout=2000)
                logging.info(f"🚫 Dismissed overlay: {sel}")
                # Short pause after a click before probing the next
                # selector (presumably to let the overlay close).
                time.sleep(0.6)
        except Exception as e:
            # A missing or un-clickable overlay is expected; keep going.
            logging.debug(f"Overlay selector {sel!r} skipped: {e}")


def _try_refresh_grid(page, max_attempts: int = 4) -> bool:
    """Click the Actualizar / Refresh button until the video grid appears.

    Each attempt clicks TIKTOK_REFRESH_BTN_SEL, sleeps ``4 * attempt``
    seconds (4 s, 8 s, 12 s, ...), dismisses overlays again, and then waits
    up to 6 s for TIKTOK_VIDEO_GRID_SEL.

    Args:
        page: Page object to operate on.
        max_attempts: Maximum number of click-and-wait rounds.

    Returns:
        True as soon as the grid selector is found; False when every attempt
        fails (or when ``max_attempts`` is less than 1).
    """
    for i in range(1, max_attempts + 1):
        wait_s = 4.0 * i
        logging.info(
            f"🔄 Grid error detected — clicking Actualizar "
            f"(attempt {i}/{max_attempts}, waiting {wait_s:.0f}s)..."
        )
        try:
            page.locator(TIKTOK_REFRESH_BTN_SEL).first.click(timeout=3000)
        except Exception as e:
            # The button may be gone already; still wait and re-check below.
            logging.debug(f"Refresh button click failed: {e}")
        time.sleep(wait_s)
        _dismiss_all_overlays(page)
        try:
            page.wait_for_selector(TIKTOK_VIDEO_GRID_SEL, timeout=6000)
        except Exception:
            continue
        logging.info("✅ Video grid appeared after refresh.")
        return True
    return False


def _build_cookie_header(cookies) -> str:
    """Join cookie dicts into a ``name=value; name=value`` header string.

    Entries with an empty or missing name or value are dropped. ``None`` or
    an empty list yields an empty string.
    """
    return "; ".join(
        f"{c.get('name', '')}={c.get('value', '')}"
        for c in cookies or []
        if c.get("name") and c.get("value")
    )


def _scrape_via_api(handle: str, cookies: list | None) -> list:
    """Fallback: hit TikTok's internal item_list API directly with httpx.

    Args:
        handle: TikTok handle; interpolated after ``@`` in the profile URL.
        cookies: Cookie dicts carrying ``name`` and ``value`` keys. May be
            ``None`` or empty, in which case no Cookie header is sent.

    Returns:
        A list of dicts with the keys ``id``, ``url``, ``desc``,
        ``timestamp`` and ``video_url`` — the same keys the Playwright
        scraper emits. ``timestamp`` is the time of scraping, not the post's
        creation time. Returns an empty list when the user ID cannot be
        resolved or the request fails; this function does not raise.
    """
    logging.info(f"🌐 Trying TikTok API fallback for @{handle}...")

    headers = {
        "User-Agent": (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
            "AppleWebKit/537.36 (KHTML, like Gecko) "
            "Chrome/124.0.0.0 Safari/537.36"
        ),
        "Referer": f"https://www.tiktok.com/@{handle}",
        "Accept": "application/json, text/plain, */*",
        "Accept-Language": "es-ES,es;q=0.9",
    }
    # Build a cookie header string from the injected cookies; skip the
    # header entirely when there is nothing to send.
    cookie_header = _build_cookie_header(cookies)
    if cookie_header:
        headers["Cookie"] = cookie_header

    # Resolve the numeric user ID from the profile page HTML
    user_id = _resolve_tiktok_user_id(handle, headers)
    if not user_id:
        logging.warning("⚠️ Could not resolve TikTok user ID for API fallback.")
        return []

    # NOTE(review): secUid is sent empty — confirm the endpoint accepts the
    # numeric id on its own.
    params = {
        "aid": "1988",
        "app_name": "tiktok_web",
        "count": str(SCRAPE_VIDEO_LIMIT),
        "cursor": "0",
        "secUid": "",
        "id": user_id,
        "type": "1",
        "sourceType": "8",
        "appId": "1233",
        "region": "ES",
        "priority_region": "ES",
        "language": "es",
    }

    videos = []
    try:
        resp = httpx.get(
            "https://www.tiktok.com/api/post/item_list/",
            params=params,
            headers=headers,
            timeout=20,
            follow_redirects=True,
        )
        resp.raise_for_status()
        data = resp.json()

        # "itemList" may be absent or null; treat both as "no videos".
        for item in data.get("itemList") or []:
            try:
                vid_id = item.get("id", "")
                if not vid_id:
                    # Without an id the video URL would be meaningless.
                    logging.warning("⚠️ API item without an id skipped.")
                    continue
                url = f"https://www.tiktok.com/@{handle}/video/{vid_id}"
                videos.append({
                    "id": vid_id,
                    "url": url,
                    "desc": item.get("desc", ""),
                    "timestamp": arrow.utcnow().isoformat(),
                    "video_url": url,
                })
            except Exception as e:
                # One malformed item must not discard the rest.
                logging.warning(f"⚠️ API item parse error: {e}")

        logging.info(f"✅ API fallback returned {len(videos)} videos.")

    except Exception as e:
        logging.warning(f"⚠️ TikTok API fallback failed: {e}")

    return videos


def _resolve_tiktok_user_id(handle: str, headers: dict) -> str | None:
    """Extract the numeric TikTok user ID from the profile page HTML.

    Args:
        handle: TikTok handle; interpolated after ``@`` in the profile URL.
        headers: HTTP headers sent with the profile request.

    Returns:
        The first all-digit ID matched by one of the patterns below, or
        ``None`` when the request fails, returns an error status, or no
        pattern matches. Failures are logged, never raised.
    """
    patterns = (
        # TikTok embeds user data in a __UNIVERSAL_DATA_FOR_REHYDRATION__ script tag
        r'"uniqueId"\s*:\s*"[^"]+"\s*,\s*"id"\s*:\s*"(\d+)"',
        # Fallback pattern
        r'"authorId"\s*:\s*"(\d+)"',
    )
    try:
        resp = httpx.get(
            f"https://www.tiktok.com/@{handle}",
            headers=headers,
            timeout=15,
            follow_redirects=True,
        )
        # Do not scan an error page for IDs; surface the status instead.
        resp.raise_for_status()
        html = resp.text
        for pattern in patterns:
            match = re.search(pattern, html)
            if match:
                uid = match.group(1)
                logging.info(f"✅ Resolved TikTok user ID: {uid}")
                return uid
    except Exception as e:
        logging.warning(f"⚠️ Could not resolve TikTok user ID: {e}")
    return None
@@ -800,28 +954,36 @@ def scrape_tiktoks_via_playwright(handle: str) -> list: break time.sleep(random.uniform(2.5, 4.0)) - _dismiss_overlays(page) + + # ── Dismiss ALL overlays including GDPR ──────────────────── + _dismiss_all_overlays(page) time.sleep(1.5) - # Check for grid error state + # ── Check for grid error and retry with Actualizar ───────── try: if page.locator(TIKTOK_GRID_ERROR_SEL).is_visible(timeout=2000): - logging.warning("⚠️ Grid error state detected. Clicking Refresh...") - try: - page.locator(TIKTOK_REFRESH_BTN_SEL).first.click(timeout=3000) + if _try_refresh_grid(page, max_attempts=4): + grid_loaded = True + break + # Grid still broken — try a full page reload + logging.warning( + "⚠️ Grid still broken after Actualizar retries. " + "Reloading page..." + ) + if attempt < PLAYWRIGHT_MAX_RELOADS: time.sleep(3.0) - except Exception: - pass + continue except Exception: pass - # Wait for video grid + # ── Wait for video grid normally ─────────────────────────── try: page.wait_for_selector( TIKTOK_VIDEO_GRID_SEL, timeout=PLAYWRIGHT_TIMEOUT_MS, ) logging.info("✅ Video grid found.") + grid_loaded = True break except Exception: logging.warning( @@ -830,11 +992,16 @@ def scrape_tiktoks_via_playwright(handle: str) -> list: _take_debug_screenshot(page, f"no_grid_{attempt}") if attempt < PLAYWRIGHT_MAX_RELOADS: time.sleep(3.0) - else: - logging.error("❌ Video grid never loaded after all attempts.") - _take_debug_screenshot(page, "final_fail") + + if not grid_loaded: + logging.warning( + "⚠️ Playwright grid scraping failed. " + "Trying API fallback..." 
+ ) + _take_debug_screenshot(page, "playwright_failed") browser.close() - return [] + # ── API fallback ─────────────────────────────────────────── + return _scrape_via_api(handle, cookies) # ── Scroll to load more videos ───────────────────────────────── logging.info("📜 Scrolling to load videos...") @@ -848,23 +1015,19 @@ def scrape_tiktoks_via_playwright(handle: str) -> list: for item in items[:SCRAPE_VIDEO_LIMIT]: try: - # Get the link link_el = item.locator("a").first href = link_el.get_attribute("href") or "" if not href or "/video/" not in href: continue - # Normalise URL if href.startswith("/"): href = "https://www.tiktok.com" + href - # Extract video ID vid_match = re.search(r"/video/(\d+)", href) if not vid_match: continue video_id = vid_match.group(1) - # Get description (best-effort) desc = "" try: desc = item.get_attribute("aria-label") or "" @@ -881,7 +1044,7 @@ def scrape_tiktoks_via_playwright(handle: str) -> list: "url": href, "desc": desc, "timestamp": arrow.utcnow().isoformat(), - "video_url": href, # resolved later during download + "video_url": href, }) except Exception as e: @@ -890,10 +1053,15 @@ def scrape_tiktoks_via_playwright(handle: str) -> list: browser.close() + # ── If Playwright found nothing, try API fallback ────────────────── + if not videos: + logging.warning( + "⚠️ Playwright returned 0 videos. Trying API fallback..." + ) + return _scrape_via_api(handle, cookies) + logging.info(f"✅ Scraped {len(videos)} videos from @{handle}.") return videos - - # ───────────────────────────────────────────────────────────────────────────── # Core: process a single TikTok video → post to Bluesky # ─────────────────────────────────────────────────────────────────────────────