#!/usr/bin/env python3
"""
tiktok2bsky.py
──────────────
Scrapes recent videos from a public TikTok profile and cross-posts them
to a Bluesky account.

Usage:
    python tiktok2bsky.py \
        --tiktok-handle jijantesfc \
        --bsky-handle jijantesfc.bsky.social \
        --bsky-app-password xxxx-xxxx-xxxx-xxxx \
        --bsky-base-url https://bsky.social \
        --bsky-langs es \
        --cookies-path tiktok_cookies.json

The handles and the app password may also be supplied through the
environment (or a .env file) as TIKTOK_HANDLE, BSKY_HANDLE and
BSKY_APP_PASSWORD; command-line flags take precedence.
"""

import argparse
import json
import logging
import os
import random
import re
import subprocess
import sys
import tempfile
import time
from datetime import datetime, timezone
from pathlib import Path

import arrow
import httpx
from atproto import Client
from dotenv import load_dotenv
from playwright.sync_api import sync_playwright

# playwright-stealth 1.x uses stealth_sync, 2.x uses Stealth class
try:
    from playwright_stealth import stealth_sync
    _STEALTH_V2 = False
except ImportError:
    from playwright_stealth import Stealth
    _STEALTH_V2 = True

# ─────────────────────────────────────────────────────────────────────────────
# Logging
# ─────────────────────────────────────────────────────────────────────────────
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler("tiktok2bsky.log", encoding="utf-8"),
    ],
    level=logging.INFO,
)

# ─────────────────────────────────────────────────────────────────────────────
# Constants & defaults
# ─────────────────────────────────────────────────────────────────────────────
DEFAULT_BSKY_BASE_URL = "https://bsky.social"
DEFAULT_BSKY_LANGS = ["es"]
TIKTOK_COOKIES_PATH = "tiktok_cookies.json"
STATE_FILE = "tiktok2bsky_state.json"
STATE_MAX_ENTRIES = 5000
SCRAPE_VIDEO_LIMIT = 30
VIDEO_MAX_AGE_DAYS = 3
VIDEO_MAX_DURATION_S = 179  # Bluesky hard limit is 180s
VIDEO_MAX_SIZE_BYTES = 45 * 1024 * 1024  # 45 MB
# Upper bound for the encoder's video bitrate. Without it, short clips are
# encoded at whatever rate fills the whole size budget (tens of Mbps).
VIDEO_MAX_VIDEO_KBPS = 5000

# Bluesky login retry config
BSKY_LOGIN_MAX_RETRIES = 4
BSKY_LOGIN_BASE_DELAY = 15.0
BSKY_LOGIN_MAX_DELAY = 120.0
BSKY_LOGIN_JITTER_MAX = 10.0

# Bluesky upload retry config
BSKY_UPLOAD_MAX_RETRIES = 5
BSKY_UPLOAD_BASE_DELAY = 10.0
BSKY_UPLOAD_MAX_DELAY = 120.0
BSKY_UPLOAD_JITTER_MAX = 5.0

# Playwright scraping config
PLAYWRIGHT_TIMEOUT_MS = 30_000
PLAYWRIGHT_SLOW_MO = 50
PLAYWRIGHT_MAX_RELOADS = 3

# TikTok selectors
TIKTOK_VIDEO_GRID_SEL = '[data-e2e="user-post-item-list"]'
TIKTOK_VIDEO_ITEM_SEL = '[data-e2e="user-post-item"]'
TIKTOK_BANNER_SELS = [
    '[id*="banner"]',
    '[class*="banner"]',
    '[data-e2e="recommend-modal-close"]',
    'button:has-text("Rechazar")',
    'button:has-text("Reject")',
    'button:has-text("Accept")',
    'button:has-text("Aceptar")',
    '[aria-label="Close"]',
    '[aria-label="Cerrar"]',
]
TIKTOK_COOKIE_MODAL_SELS = [
    'button:has-text("Decline all")',
    'button:has-text("Rechazar todo")',
    'button:has-text("Reject all")',
    'button:has-text("Accept all")',
    'button:has-text("Aceptar todo")',
    '[class*="cookie"] button',
    '[id*="cookie"] button',
]
TIKTOK_GDPR_SELS = [
    'button:has-text("Entendido")',
    'button:has-text("Understood")',
    'button:has-text("Got it")',
    '[class*="gdpr"] button',
    '[class*="privacy"] button:has-text("Entendido")',
]
TIKTOK_GRID_ERROR_SEL = '[data-e2e="user-post-item-list-error"]'
TIKTOK_REFRESH_BTN_SEL = 'button:has-text("Actualizar"), button:has-text("Refresh")'

# Playwright only accepts "Strict", "Lax" or "None" for sameSite; cookie
# exports from browser extensions use other spellings.
_SAME_SITE_CANONICAL = {
    "strict": "Strict",
    "lax": "Lax",
    "none": "None",
    "no_restriction": "None",
}


# ─────────────────────────────────────────────────────────────────────────────
# State management
# ─────────────────────────────────────────────────────────────────────────────
def load_state() -> dict:
    """Load the posted-video state from STATE_FILE.

    Returns a fresh ``{"posted": {}}`` state when the file is missing or
    cannot be read/parsed.
    """
    if os.path.exists(STATE_FILE):
        try:
            with open(STATE_FILE, "r", encoding="utf-8") as f:
                state = json.load(f)
            logging.info(
                f"📂 Loaded state: {len(state.get('posted', {}))} entries."
            )
            return state
        except Exception as e:
            logging.warning(f"⚠️ Could not load state file: {e}. Starting fresh.")
    return {"posted": {}}


def save_state(state: dict):
    """Prune the state to STATE_MAX_ENTRIES and persist it to STATE_FILE.

    The file is written to a sibling temp file and moved into place with
    os.replace so an interrupted write cannot leave a truncated state file
    (which would make the next run start fresh and re-post everything).
    Failures are logged, not raised.
    """
    # Prune to last STATE_MAX_ENTRIES (oldest "posted_at" first).
    posted = state.get("posted", {})
    if len(posted) > STATE_MAX_ENTRIES:
        oldest_first = sorted(
            posted,
            key=lambda k: posted[k].get("posted_at", ""),
        )
        for old_key in oldest_first[: len(posted) - STATE_MAX_ENTRIES]:
            del posted[old_key]
    state["posted"] = posted

    tmp_path = f"{STATE_FILE}.tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(state, f, indent=2, ensure_ascii=False)
        os.replace(tmp_path, STATE_FILE)
    except Exception as e:
        logging.error(f"❌ Could not save state: {e}")
        try:
            os.unlink(tmp_path)
        except OSError:
            pass


def is_already_posted(video_id: str, state: dict) -> bool:
    """Return True when video_id is recorded in the state's "posted" map."""
    return video_id in state.get("posted", {})


def mark_as_posted(video_id: str, state: dict, meta: dict | None = None):
    """Record video_id as posted (with optional metadata) and save the state."""
    state.setdefault("posted", {})[video_id] = {
        "posted_at": arrow.utcnow().isoformat(),
        **(meta or {}),
    }
    save_state(state)


# ─────────────────────────────────────────────────────────────────────────────
# Cookie helpers
# ─────────────────────────────────────────────────────────────────────────────
def load_cookies_from_file(path: str) -> list:
    """Load cookies from a JSON file (format produced by generate_tiktok_cookies.py).

    Accepts either a bare list of cookie dicts or an object with a
    "cookies" list. Returns [] when the file is missing, unreadable or has
    another shape.
    """
    if not os.path.exists(path):
        logging.warning(f"⚠️ Cookie file not found: {path}")
        return []
    try:
        with open(path, "r", encoding="utf-8") as f:
            cookies = json.load(f)
        if isinstance(cookies, dict):
            cookies = cookies.get("cookies", [])
        if not isinstance(cookies, list):
            logging.warning(
                f"⚠️ Could not load cookies from {path}: "
                f"expected a JSON list of cookies"
            )
            return []
        logging.info(f"🍪 Loaded {len(cookies)} cookies from {path}")
        return cookies
    except Exception as e:
        logging.warning(f"⚠️ Could not load cookies from {path}: {e}")
        return []


def _cookie_expiry(cookie: dict) -> float:
    """Return the cookie's expiry as a positive unix timestamp, or 0.0.

    Reads "expirationDate" then "expires". Missing, non-numeric,
    non-positive or non-finite values all yield 0.0 (treated as a session
    cookie by the callers).
    """
    raw = cookie.get("expirationDate") or cookie.get("expires") or 0
    try:
        expiry = float(raw)
    except (TypeError, ValueError):
        return 0.0
    # NaN fails both comparisons and so also maps to 0.0.
    return expiry if 0 < expiry < float("inf") else 0.0


def inject_cookies_into_context(context, cookies: list):
    """Inject a list of cookie dicts into a Playwright browser context.

    sameSite values are normalised to the spellings Playwright accepts
    (anything unrecognised falls back to "None", the previous default),
    and entries without a name are skipped, so that one odd entry does not
    make add_cookies reject the whole batch.
    """
    if not cookies:
        return
    playwright_cookies = []
    for c in cookies:
        if not isinstance(c, dict) or not c.get("name"):
            continue
        same_site = _SAME_SITE_CANONICAL.get(
            str(c.get("sameSite", "None")).strip().lower(), "None"
        )
        entry = {
            "name": c["name"],
            "value": c.get("value", ""),
            "domain": c.get("domain", ".tiktok.com"),
            "path": c.get("path", "/"),
            "secure": c.get("secure", False),
            "httpOnly": c.get("httpOnly", False),
            "sameSite": same_site,
        }
        expiry = _cookie_expiry(c)
        if expiry > 0:
            entry["expires"] = expiry
        playwright_cookies.append(entry)
    try:
        context.add_cookies(playwright_cookies)
        logging.info(f"🍪 Injected {len(playwright_cookies)} cookies into browser context.")
    except Exception as e:
        logging.warning(f"⚠️ Could not inject cookies: {e}")


# ─────────────────────────────────────────────────────────────────────────────
# Bluesky error classification helpers (ported from twitter2bsky.py)
# ─────────────────────────────────────────────────────────────────────────────
_NETWORK_ERROR_NAMES = (
    "ConnectError",
    "RemoteProtocolError",
    "ReadTimeout",
    "WriteTimeout",
    "TimeoutException",
    "ConnectionResetError",
)
_TRANSIENT_ERROR_NAMES = (
    "InvokeTimeoutError",
    "ReadTimeout",
    "WriteTimeout",
    "TimeoutException",
    "RemoteProtocolError",
    "ConnectError",
)
_GATEWAY_STATUS_CODES = (502, 503, 504)


def _error_status_code(error_obj) -> int | None:
    """Return an integer ``status_code`` found on the error or its ``.response``."""
    for holder in (error_obj, getattr(error_obj, "response", None)):
        code = getattr(holder, "status_code", None)
        if isinstance(code, int) and not isinstance(code, bool):
            return code
    return None


def _has_status(error_obj, text: str, codes: tuple) -> bool:
    """Return True when the error carries one of the given HTTP status codes.

    A structured status code is trusted when present. Otherwise the repr
    text is searched for the code as a standalone number, so digits inside
    a longer number (e.g. a ratelimit-reset timestamp) do not count.
    """
    status = _error_status_code(error_obj)
    if status is not None:
        return status in codes
    return any(re.search(rf"(?<!\d){code}(?!\d)", text) for code in codes)


def is_rate_limited_error(error_obj) -> bool:
    """Return True when the error looks like an HTTP 429 / rate-limit response."""
    text = repr(error_obj).lower()
    return (
        _has_status(error_obj, text, (429,))
        or "ratelimitexceeded" in text
        or "too many requests" in text
        or "rate limit" in text
    )


def is_auth_error(error_obj) -> bool:
    """Return True when the error looks like an authentication failure (401/403)."""
    text = repr(error_obj).lower()
    return (
        _has_status(error_obj, text, (401, 403))
        or "invalid identifier or password" in text
        or "authenticationrequired" in text
        or "invalidtoken" in text
    )


def is_network_error(error_obj) -> bool:
    """Return True for connection-level failures and 502/503/504 responses."""
    text = repr(error_obj)
    return (
        any(name in text for name in _NETWORK_ERROR_NAMES)
        or _has_status(error_obj, text, _GATEWAY_STATUS_CODES)
    )


def is_transient_error(error_obj) -> bool:
    """Return True for timeouts, protocol errors and 502/503/504 responses."""
    text = repr(error_obj)
    return (
        any(name in text for name in _TRANSIENT_ERROR_NAMES)
        or _has_status(error_obj, text, _GATEWAY_STATUS_CODES)
    )


def _error_headers(error_obj) -> dict:
    """Return response headers (keys lower-cased) from the error or its ``.response``."""
    for holder in (error_obj, getattr(error_obj, "response", None)):
        headers = getattr(holder, "headers", None)
        if not headers:
            continue
        try:
            return {str(k).lower(): v for k, v in dict(headers).items()}
        except Exception:
            continue
    return {}


def get_rate_limit_wait_seconds(
    error_obj,
    default_delay: float,
    max_delay: float = BSKY_LOGIN_MAX_DELAY,
) -> float:
    """
    Parse rate-limit response headers and return a bounded wait time in seconds.
    Supports retry-after, x-ratelimit-after, and ratelimit-reset (unix timestamp).
    Ported from twitter2bsky.py.

    Headers are looked up on ``error_obj.headers`` and
    ``error_obj.response.headers``; if neither yields a usable value the
    exception's repr is searched instead. Returns ``default_delay`` when
    nothing is found. Results are capped at ``max_delay``.
    """
    try:
        headers = _error_headers(error_obj)
        for key in ("retry-after", "x-ratelimit-after"):
            if headers.get(key):
                return min(max(int(headers[key]), 1), max_delay)
        if headers.get("ratelimit-reset"):
            wait = max(
                int(headers["ratelimit-reset"]) - int(time.time()) + 1,
                default_delay,
            )
            return min(wait, max_delay)
    except Exception:
        # Non-integer header values (e.g. an HTTP-date retry-after) fall
        # through to the repr-based parsing below.
        pass

    # repr() fallback — parse headers embedded in the exception string
    text = repr(error_obj)
    for pattern, is_timestamp in [
        (r"'retry-after':\s*'(\d+)'", False),
        (r"'x-ratelimit-after':\s*'(\d+)'", False),
        (r"'ratelimit-reset':\s*'(\d+)'", True),
    ]:
        m = re.search(pattern, text, re.IGNORECASE)
        if m:
            val = int(m.group(1))
            if is_timestamp:
                return min(
                    max(val - int(time.time()) + 1, default_delay),
                    max_delay,
                )
            return min(max(val, 1), max_delay)
    return default_delay


# ─────────────────────────────────────────────────────────────────────────────
# Bluesky helpers
# ─────────────────────────────────────────────────────────────────────────────
def _login_backoff_seconds(attempt: int) -> float:
    """Linear backoff (capped) plus jitter for the given 1-based login attempt."""
    return min(
        BSKY_LOGIN_BASE_DELAY * attempt,
        BSKY_LOGIN_MAX_DELAY,
    ) + random.uniform(0, BSKY_LOGIN_JITTER_MAX)


def bsky_login(client: Client, handle: str, password: str,
               base_url: str = DEFAULT_BSKY_BASE_URL) -> bool:
    """
    Authenticate against the AT Protocol PDS.

    base_url is always https://bsky.social for standard Bluesky accounts —
    even when the user's handle lives on a custom domain like eurosky.social.
    The Client is re-initialised with the base URL baked in at construction
    time, which is the only reliable way to override the internal session
    resolver (mirrors create_bsky_client() in twitter2bsky.py).

    Returns True on success. Auth errors return False immediately; rate
    limits, transient/network errors and unexpected errors are retried up
    to BSKY_LOGIN_MAX_RETRIES attempts before returning False.
    """
    normalized_base_url = (base_url or DEFAULT_BSKY_BASE_URL).strip().rstrip("/")
    logging.info(f"🔐 Connecting Bluesky client via base URL: {normalized_base_url}")

    # Re-initialise the client so the base URL is baked in from the start.
    # Setting client.base_url after construction does not reliably override
    # the internal session resolver in the atproto SDK.
    client.__init__(base_url=normalized_base_url)

    for attempt in range(1, BSKY_LOGIN_MAX_RETRIES + 1):
        try:
            logging.info(
                f"🔐 Bluesky login attempt {attempt}/{BSKY_LOGIN_MAX_RETRIES} "
                f"for {handle}"
            )
            client.login(handle, password)
            logging.info(f"✅ Bluesky login successful as {handle}")
            return True
        except Exception as e:
            can_retry = attempt < BSKY_LOGIN_MAX_RETRIES

            # ── 401 / auth errors — no point retrying ─────────────────
            if is_auth_error(e):
                logging.error(
                    f"❌ Bluesky login failed: invalid handle or app password.\n"
                    f" Handle : {handle}\n"
                    f" PDS : {normalized_base_url}\n"
                    f" Fix : regenerate app password at "
                    f"https://bsky.app/settings/app-passwords\n"
                    f" Detail : {repr(e)}"
                )
                return False

            # ── Rate limit ─────────────────────────────────────────────
            if is_rate_limited_error(e):
                if can_retry:
                    wait = get_rate_limit_wait_seconds(
                        e, default_delay=BSKY_LOGIN_BASE_DELAY
                    )
                    wait += random.uniform(0, BSKY_LOGIN_JITTER_MAX)
                    logging.warning(
                        f"⏳ Bluesky login rate-limited "
                        f"(attempt {attempt}/{BSKY_LOGIN_MAX_RETRIES}). "
                        f"Retrying in {wait:.1f}s."
                    )
                    time.sleep(wait)
                    continue
                logging.error(
                    "❌ Exhausted Bluesky login retries due to rate limiting."
                )
                return False

            # ── Transient / network errors ─────────────────────────────
            if is_network_error(e) or is_transient_error(e):
                if can_retry:
                    wait = _login_backoff_seconds(attempt)
                    logging.warning(
                        f"⏳ Transient login failure "
                        f"(attempt {attempt}/{BSKY_LOGIN_MAX_RETRIES}). "
                        f"Retrying in {wait:.1f}s."
                    )
                    time.sleep(wait)
                    continue
                logging.error(
                    "❌ Exhausted Bluesky login retries after "
                    "transient/network errors."
                )
                return False

            # ── Unexpected error — retry with backoff ──────────────────
            if can_retry:
                wait = _login_backoff_seconds(attempt)
                logging.warning(
                    f"⏳ Unexpected login error "
                    f"(attempt {attempt}/{BSKY_LOGIN_MAX_RETRIES}): "
                    f"{repr(e)}. Retrying in {wait:.1f}s."
                )
                time.sleep(wait)
                continue
            logging.error(
                f"❌ All Bluesky login attempts failed. Last error: {repr(e)}"
            )
            return False
    return False


def bsky_get_recent_post_urls(client: Client, handle: str, limit: int = 50) -> set:
    """Return a set of URLs recently posted to Bluesky (to avoid duplicates).

    Collects external-embed URIs and any http(s) URLs found in post text
    from the author's feed. Returns whatever was collected if the feed
    request fails.
    """
    urls: set = set()
    try:
        feed = client.get_author_feed(actor=handle, limit=limit)
        for item in feed.feed:
            post = item.post
            if hasattr(post, "record") and hasattr(post.record, "embed"):
                embed = post.record.embed
                if hasattr(embed, "external") and hasattr(embed.external, "uri"):
                    urls.add(embed.external.uri)
            if hasattr(post, "record") and hasattr(post.record, "text"):
                urls.update(re.findall(r"https?://\S+", post.record.text))
    except Exception as e:
        logging.warning(f"⚠️ Could not fetch recent Bluesky posts: {e}")
    return urls


def _upload_retry_delay(attempt: int, error) -> float:
    """Return the sleep before the next upload/post retry.

    Exponential backoff with jitter, capped at BSKY_UPLOAD_MAX_DELAY; for
    rate-limit errors the server-advertised wait is used instead, with a
    60 s floor.
    """
    delay = min(
        BSKY_UPLOAD_BASE_DELAY * (2 ** (attempt - 1))
        + random.uniform(0, BSKY_UPLOAD_JITTER_MAX),
        BSKY_UPLOAD_MAX_DELAY,
    )
    if is_rate_limited_error(error):
        delay = max(
            get_rate_limit_wait_seconds(
                error,
                default_delay=delay,
                max_delay=BSKY_UPLOAD_MAX_DELAY,
            ),
            60.0,
        )
    return delay


def bsky_upload_blob_with_retry(client: Client, data: bytes, mime_type: str) -> object:
    """Upload a blob to Bluesky with retry + exponential backoff.

    Returns the uploaded blob reference. Re-raises the last exception once
    BSKY_UPLOAD_MAX_RETRIES attempts have failed. ``mime_type`` is accepted
    for interface compatibility but is not passed to the SDK call.
    """
    for attempt in range(1, BSKY_UPLOAD_MAX_RETRIES + 1):
        try:
            resp = client.upload_blob(data)
            logging.info(
                f"✅ Blob uploaded ({len(data) / 1024 / 1024:.1f} MB) "
                f"on attempt {attempt}."
            )
            return resp.blob
        except Exception as e:
            if attempt == BSKY_UPLOAD_MAX_RETRIES:
                logging.error(
                    f"❌ Blob upload failed after "
                    f"{BSKY_UPLOAD_MAX_RETRIES} attempts: {e}"
                )
                raise
            delay = _upload_retry_delay(attempt, e)
            logging.warning(
                f"⚠️ Blob upload attempt {attempt} failed: {e}. "
                f"Retrying in {delay:.1f}s..."
            )
            time.sleep(delay)


def bsky_create_post_with_retry(client: Client, text: str, embed=None, langs=None) -> bool:
    """Create a Bluesky post with retry + exponential backoff.

    Returns True once the post is created, False after
    BSKY_UPLOAD_MAX_RETRIES failed attempts.
    """
    for attempt in range(1, BSKY_UPLOAD_MAX_RETRIES + 1):
        try:
            kwargs = {"text": text}
            if embed:
                kwargs["embed"] = embed
            if langs:
                kwargs["langs"] = langs
            client.send_post(**kwargs)
            logging.info(f"✅ Post created on attempt {attempt}.")
            return True
        except Exception as e:
            if attempt == BSKY_UPLOAD_MAX_RETRIES:
                logging.error(
                    f"❌ Post creation failed after "
                    f"{BSKY_UPLOAD_MAX_RETRIES} attempts: {e}"
                )
                return False
            delay = _upload_retry_delay(attempt, e)
            logging.warning(
                f"⚠️ Post creation attempt {attempt} failed: {e}. "
                f"Retrying in {delay:.1f}s..."
            )
            time.sleep(delay)
    return False


# ─────────────────────────────────────────────────────────────────────────────
# Video processing helpers
# ─────────────────────────────────────────────────────────────────────────────
def get_video_duration(path: str) -> float:
    """Return video duration in seconds using ffprobe (0.0 on any failure)."""
    try:
        result = subprocess.run(
            [
                "ffprobe", "-v", "error",
                "-show_entries", "format=duration",
                "-of", "default=noprint_wrappers=1:nokey=1",
                path,
            ],
            capture_output=True,
            text=True,
            timeout=30,
        )
        return float(result.stdout.strip())
    except Exception as e:
        logging.warning(f"⚠️ ffprobe failed: {e}")
        return 0.0


def _target_video_kbps(duration_s: float, max_size_bytes: int) -> int:
    """Pick a video bitrate (kbit/s) that fits the size budget.

    Uses 90% of the budget, reserves 128 kbit/s for audio, and clamps the
    result to [200, VIDEO_MAX_VIDEO_KBPS] so short clips are not inflated
    to fill the whole budget.
    """
    target_bits = max_size_bytes * 8 * 0.90
    target_kbps = int(target_bits / duration_s / 1000)
    return min(VIDEO_MAX_VIDEO_KBPS, max(200, target_kbps - 128))


def compress_video(input_path: str, output_path: str,
                   max_duration: int = VIDEO_MAX_DURATION_S,
                   max_size_bytes: int = VIDEO_MAX_SIZE_BYTES) -> bool:
    """Re-encode input_path to H.264/AAC MP4 at output_path with ffmpeg.

    The video is trimmed to max_duration seconds, scaled down to fit
    within 1280x720 and encoded at a bitrate chosen to stay under
    max_size_bytes. Returns True on success, False when the input is not
    a valid video or ffmpeg fails.
    """
    try:
        duration = get_video_duration(input_path)

        # Guard: ffprobe returned 0 = file is not a valid video
        if duration <= 0:
            logging.error(
                f"❌ compress_video: ffprobe returned duration={duration} "
                f"— file is not a valid video: {input_path} "
                f"({os.path.getsize(input_path)} bytes)"
            )
            return False

        trim_to = min(duration, max_duration)
        video_kbps = _target_video_kbps(trim_to, max_size_bytes)

        logging.info(
            f"🎬 Compressing: duration={duration:.1f}s → trim={trim_to:.1f}s, "
            f"video_bitrate={video_kbps}k"
        )

        # The aspect-preserving fit can produce odd dimensions (a 9:16 clip
        # becomes 405x720), which libx264 with yuv420p rejects; the second
        # scale stage rounds both dimensions down to even numbers.
        scale_filter = (
            "scale='min(1280,iw)':'min(720,ih)'"
            ":force_original_aspect_ratio=decrease,"
            "scale=trunc(iw/2)*2:trunc(ih/2)*2"
        )
        cmd = [
            "ffmpeg", "-y",
            "-i", input_path,
            "-t", str(trim_to),
            "-vf", scale_filter,
            "-c:v", "libx264",
            "-b:v", f"{video_kbps}k",
            "-maxrate", f"{video_kbps * 2}k",
            "-bufsize", f"{video_kbps * 4}k",
            "-c:a", "aac",
            "-b:a", "128k",
            "-movflags", "+faststart",
            "-pix_fmt", "yuv420p",
            output_path,
        ]
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=300)
        if result.returncode != 0:
            logging.error(f"❌ ffmpeg failed:\n{result.stderr}")
            return False

        final_size = os.path.getsize(output_path)
        logging.info(
            f"✅ Compressed video: {final_size / 1024 / 1024:.1f} MB → {output_path}"
        )
        return True
    except Exception as e:
        logging.error(f"❌ compress_video error: {e}")
        return False


def download_video(url: str, output_path: str, cookies: list | None = None) -> bool:
    """
    Download a TikTok video using yt-dlp with impersonation.
    Direct HTTP download is skipped — TikTok always returns HTML
    for video page URLs, never a raw MP4.
    """
    return download_video_ytdlp(url, output_path, cookies=cookies)


def download_video_ytdlp(url: str, output_path: str, cookies: list | None = None) -> bool:
    """
    Download a video using yt-dlp with TikTok impersonation.
    Requires curl-cffi: pip install curl-cffi

    Returns True only when output_path exists afterwards and is larger
    than 50 KB. The temporary cookie file, if any, is always removed.
    """
    cookie_file = None
    try:
        import yt_dlp

        ydl_opts = {
            "outtmpl": output_path,
            "format": "bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best",
            "quiet": True,
            "no_warnings": False,
            "merge_output_format": "mp4",
        }

        # ── Impersonation: try targets in order of preference ──────────
        # curl_cffi must be installed: pip install curl-cffi
        # NOTE(review): the target-discovery call and passing the target as
        # a plain string should be confirmed against the installed yt-dlp
        # version; any failure here is logged and the download proceeds
        # without impersonation.
        impersonate_targets = ["chrome126", "chrome124", "chrome", "safari"]
        impersonate_set = False
        try:
            import yt_dlp.networking.impersonate as _imp

            available = {str(t) for t in _imp.ImpersonateTarget.supported_targets()}
            for target in impersonate_targets:
                if any(target in a for a in available):
                    ydl_opts["impersonate"] = target
                    logging.info(f"🎭 yt-dlp impersonation target: {target}")
                    impersonate_set = True
                    break
            if not impersonate_set:
                logging.warning(
                    f"⚠️ No impersonation target available. "
                    f"Available: {available}. "
                    f"Install curl-cffi: pip install curl-cffi"
                )
        except Exception as e:
            logging.warning(f"⚠️ Could not check impersonation targets: {e}")

        if cookies:
            cookie_file = _write_netscape_cookies(cookies)
            if cookie_file:
                ydl_opts["cookiefile"] = cookie_file

        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.download([url])

        # Validate: must exist AND be a real video (> 50 KB)
        if os.path.exists(output_path):
            size = os.path.getsize(output_path)
            if size > 50_000:
                logging.info(
                    f"✅ yt-dlp download OK: {size / 1024 / 1024:.1f} MB"
                )
                return True
            logging.error(
                f"❌ yt-dlp output too small ({size} bytes) — "
                f"likely an HTML error page, not a video."
            )
            return False

        logging.error("❌ yt-dlp produced no output file.")
        return False
    except Exception as e:
        logging.error(f"❌ yt-dlp download failed: {e}")
        return False
    finally:
        if cookie_file and os.path.exists(cookie_file):
            os.unlink(cookie_file)


def _write_netscape_cookies(cookies: list) -> str | None:
    """Write cookies list to a Netscape-format temp file for yt-dlp.

    Returns the temp file path, or None on failure (in which case the
    partially written file is removed). Cookies without a positive,
    numeric expiry are written with expiry 0, matching how
    inject_cookies_into_context treats them as session cookies.
    """
    path = None
    try:
        fd, path = tempfile.mkstemp(suffix=".txt", prefix="tiktok_cookies_")
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            f.write("# Netscape HTTP Cookie File\n")
            for c in cookies or []:
                domain = c.get("domain", ".tiktok.com")
                flag = "TRUE" if domain.startswith(".") else "FALSE"
                path_val = c.get("path", "/")
                secure = "TRUE" if c.get("secure") else "FALSE"
                exp = int(_cookie_expiry(c))
                name = c.get("name", "")
                value = c.get("value", "")
                f.write(
                    f"{domain}\t{flag}\t{path_val}\t{secure}\t"
                    f"{exp}\t{name}\t{value}\n"
                )
        return path
    except Exception as e:
        logging.warning(f"⚠️ Could not write Netscape cookie file: {e}")
        # Do not leave a half-written cookie file behind in the temp dir.
        if path and os.path.exists(path):
            try:
                os.unlink(path)
            except OSError:
                pass
        return None


# ─────────────────────────────────────────────────────────────────────────────
# TikTok scraping via Playwright
# ─────────────────────────────────────────────────────────────────────────────
def _click_visible_overlays(page, selectors: list, pause_s: float):
    """Click the first match of each selector that is visible; ignore failures."""
    for sel in selectors:
        try:
            el = page.locator(sel).first
            if el.is_visible(timeout=1500):
                el.click(timeout=2000)
                logging.info(f"🚫 Dismissed overlay: {sel}")
                time.sleep(pause_s)
        except Exception:
            # Best effort: a selector that cannot be evaluated or clicked
            # must not abort the scrape.
            pass


def _dismiss_overlays(page):
    """Dismiss cookie banners and RGPD modals."""
    _click_visible_overlays(
        page, TIKTOK_COOKIE_MODAL_SELS + TIKTOK_BANNER_SELS, 0.5
    )


def _take_debug_screenshot(page, label: str):
    """Save a debug screenshot to workspace (failures are ignored)."""
    try:
        path = f"screenshot_{label}_{int(time.time())}.png"
        page.screenshot(path=path)
        logging.info(f"📸 Screenshot saved: {path}")
    except Exception:
        pass


def _dismiss_all_overlays(page):
    """Dismiss GDPR notices, cookie banners and any other modals."""
    _click_visible_overlays(
        page,
        TIKTOK_GDPR_SELS + TIKTOK_COOKIE_MODAL_SELS + TIKTOK_BANNER_SELS,
        0.6,
    )


def _try_refresh_grid(page, max_attempts: int = 4) -> bool:
    """
    Click the Actualizar / Refresh button up to max_attempts times,
    waiting progressively longer each time.
    Returns True if the video grid eventually appears.
    """
    for i in range(1, max_attempts + 1):
        wait_s = 4.0 * i
        logging.info(
            f"🔄 Grid error detected — clicking Actualizar "
            f"(attempt {i}/{max_attempts}, waiting {wait_s:.0f}s)..."
        )
        try:
            page.locator(TIKTOK_REFRESH_BTN_SEL).first.click(timeout=3000)
        except Exception:
            pass
        time.sleep(wait_s)
        _dismiss_all_overlays(page)
        try:
            page.wait_for_selector(TIKTOK_VIDEO_GRID_SEL, timeout=6000)
            logging.info("✅ Video grid appeared after refresh.")
            return True
        except Exception:
            pass
    return False


def _scrape_via_api(handle: str, cookies: list) -> list:
    """
    Fallback scraper using yt-dlp to list videos from a TikTok profile.
    yt-dlp handles TikTok's request signing internally — no raw API needed.
    Returns same list-of-dicts format as the Playwright scraper.

    A leading "@" on the handle is ignored. The "timestamp" field is the
    time of scraping, not the video's publication time.
    """
    handle = handle.lstrip("@")
    logging.info(f"📦 yt-dlp profile scrape fallback for @{handle}...")
    cookie_file = None
    videos = []
    try:
        import yt_dlp

        cookie_file = _write_netscape_cookies(cookies)
        ydl_opts = {
            "quiet": True,
            "no_warnings": False,
            "extract_flat": True,  # metadata only — no video download yet
            "playlistend": SCRAPE_VIDEO_LIMIT,
            "ignoreerrors": True,
        }
        if cookie_file:
            ydl_opts["cookiefile"] = cookie_file

        profile_url = f"https://www.tiktok.com/@{handle}"
        logging.info(f"🌐 yt-dlp extracting: {profile_url}")

        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(profile_url, download=False)

        if not info:
            logging.warning("⚠️ yt-dlp returned no info for profile.")
            return []

        entries = info.get("entries") or []
        logging.info(
            f"✅ yt-dlp returned {len(entries)} entries "
            f"(playlist: {info.get('title', '?')})"
        )

        for entry in entries[:SCRAPE_VIDEO_LIMIT]:
            try:
                if not entry:
                    continue
                vid_id = str(entry.get("id") or "")
                url = (
                    entry.get("webpage_url")
                    or entry.get("url")
                    or ""
                )
                desc = (
                    entry.get("title")
                    or entry.get("description")
                    or ""
                )

                # Normalise URL
                if vid_id and not url:
                    url = f"https://www.tiktok.com/@{handle}/video/{vid_id}"

                # Extract ID from URL if missing
                if not vid_id and url:
                    m = re.search(r"/video/(\d+)", url)
                    if m:
                        vid_id = m.group(1)

                if not vid_id:
                    logging.debug(f"⏭️ Skipping entry with no ID: {entry}")
                    continue

                videos.append({
                    "id": vid_id,
                    "url": url,
                    "desc": desc,
                    "timestamp": arrow.utcnow().isoformat(),
                    "video_url": url,
                })
                logging.debug(f" 📹 {vid_id}: {desc[:60]}")
            except Exception as e:
                logging.warning(f"⚠️ yt-dlp entry parse error: {e}")

        logging.info(f"✅ yt-dlp fallback produced {len(videos)} usable videos.")
    except Exception as e:
        logging.error(f"❌ yt-dlp profile scrape failed: {e}")
    finally:
        if cookie_file and os.path.exists(cookie_file):
            os.unlink(cookie_file)
    return videos


def _resolve_tiktok_ids(handle: str, headers: dict) -> tuple[str | None, str | None]:
    """
    Extract both the numeric user ID and secUid from the profile page HTML.
    Returns (user_id, sec_uid) — either may be None.
    """
    user_id = None
    sec_uid = None
    try:
        resp = httpx.get(
            f"https://www.tiktok.com/@{handle}",
            headers=headers,
            timeout=15,
            follow_redirects=True,
        )
        html = resp.text

        # ── Numeric user ID ────────────────────────────────────────────
        id_patterns = [
            r'"authorId"\s*:\s*"(\d{15,25})"',
            r'"author"\s*:\s*\{[^}]*"id"\s*:\s*"(\d{15,25})"',
            r'"userId"\s*:\s*"(\d{15,25})"',
            r'"uid"\s*:\s*"(\d{15,25})"',
            r'"ownerUid"\s*:\s*"(\d{15,25})"',
            r',"id":"(\d{15,25})","uniqueId":"' + re.escape(handle) + r'"',
            r'"uniqueId":"' + re.escape(handle) + r'","id":"(\d{15,25})"',
        ]
        for pattern in id_patterns:
            m = re.search(pattern, html, re.IGNORECASE)
            if m:
                user_id = m.group(1)
                logging.info(f"✅ Resolved TikTok user ID: {user_id}")
                break

        # ── secUid ─────────────────────────────────────────────────────
        sec_patterns = [
            r'"secUid"\s*:\s*"([A-Za-z0-9_\-]{20,})"',
            r'"authorSecId"\s*:\s*"([A-Za-z0-9_\-]{20,})"',
        ]
        for pattern in sec_patterns:
            m = re.search(pattern, html, re.IGNORECASE)
            if m:
                sec_uid = m.group(1)
                logging.info(f"✅ Resolved TikTok secUid: {sec_uid[:30]}...")
                break

        if not user_id and not sec_uid:
            # Window search fallback: look for the IDs in the 300 characters
            # on either side of the handle's "uniqueId" entry.
            handle_pos = html.find(f'"uniqueId":"{handle}"')
            if handle_pos != -1:
                window = html[max(0, handle_pos - 300): handle_pos + 300]
                m = re.search(r'"id"\s*:\s*"(\d{15,25})"', window)
                if m:
                    user_id = m.group(1)
                    logging.info(f"✅ Resolved TikTok user ID (window): {user_id}")
                m = re.search(r'"secUid"\s*:\s*"([A-Za-z0-9_\-]{20,})"', window)
                if m:
                    sec_uid = m.group(1)
                    logging.info(f"✅ Resolved TikTok secUid (window): {sec_uid[:30]}...")

        if not user_id and not sec_uid:
            logging.warning(
                f"⚠️ Could not resolve any TikTok ID for @{handle}. "
                f"HTML length: {len(html)} chars."
            )
    except Exception as e:
        logging.warning(f"⚠️ Could not resolve TikTok IDs: {e}")
    return user_id, sec_uid


def _open_stealth_page(browser, cookies: list):
    """Create a browser context + page with cookies and stealth patches applied."""
    context = browser.new_context(
        user_agent=(
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
            "AppleWebKit/537.36 (KHTML, like Gecko) "
            "Chrome/124.0.0.0 Safari/537.36"
        ),
        viewport={"width": 1280, "height": 900},
        locale="es-ES",
        timezone_id="Europe/Madrid",
    )
    if cookies:
        inject_cookies_into_context(context, cookies)

    page = context.new_page()

    # Stealth mode — compatible with both v1.x and v2.x
    if _STEALTH_V2:
        Stealth().apply_stealth_sync(page)
    else:
        stealth_sync(page)

    page.add_init_script("""
        Object.defineProperty(navigator, 'webdriver', {get: () => undefined});
        window.chrome = { runtime: {} };
        Object.defineProperty(navigator, 'plugins', {get: () => [1, 2, 3]});
        Object.defineProperty(navigator, 'languages', {get: () => ['es-ES', 'es', 'en']});
    """)
    return page


def _load_profile_grid(page, profile_url: str) -> bool:
    """Navigate to the profile and wait for the video grid.

    Makes up to PLAYWRIGHT_MAX_RELOADS attempts, dismissing overlays and
    using the in-page refresh button when TikTok shows its grid-error
    placeholder. Returns True once the grid selector is present.
    """
    for attempt in range(1, PLAYWRIGHT_MAX_RELOADS + 1):
        can_reload = attempt < PLAYWRIGHT_MAX_RELOADS
        logging.info(
            f"🌐 Loading profile (attempt {attempt}/{PLAYWRIGHT_MAX_RELOADS})..."
        )
        try:
            page.goto(
                profile_url,
                wait_until="domcontentloaded",
                timeout=PLAYWRIGHT_TIMEOUT_MS,
            )
        except Exception as e:
            logging.warning(f"⚠️ page.goto failed on attempt {attempt}: {e}")
            _take_debug_screenshot(page, f"goto_fail_{attempt}")
            if can_reload:
                time.sleep(3.0)
                continue
            break

        time.sleep(random.uniform(2.5, 4.0))

        # ── Dismiss ALL overlays including GDPR ────────────────────
        _dismiss_all_overlays(page)
        time.sleep(1.5)

        # ── Check for grid error and retry with Actualizar ─────────
        try:
            if page.locator(TIKTOK_GRID_ERROR_SEL).is_visible(timeout=2000):
                if _try_refresh_grid(page, max_attempts=4):
                    return True
                # Grid still broken — try a full page reload
                logging.warning(
                    "⚠️ Grid still broken after Actualizar retries. "
                    "Reloading page..."
                )
                if can_reload:
                    time.sleep(3.0)
                    continue
        except Exception:
            pass

        # ── Wait for video grid normally ───────────────────────────
        try:
            page.wait_for_selector(
                TIKTOK_VIDEO_GRID_SEL,
                timeout=PLAYWRIGHT_TIMEOUT_MS,
            )
            logging.info("✅ Video grid found.")
            return True
        except Exception:
            logging.warning(
                f"⚠️ Video grid not found on attempt {attempt}."
            )
            _take_debug_screenshot(page, f"no_grid_{attempt}")
            if can_reload:
                time.sleep(3.0)
    return False


def _extract_grid_videos(page) -> list:
    """Read up to SCRAPE_VIDEO_LIMIT video entries from the loaded profile grid."""
    items = page.locator(TIKTOK_VIDEO_ITEM_SEL).all()
    logging.info(f"📋 Found {len(items)} video items in grid.")

    videos = []
    for item in items[:SCRAPE_VIDEO_LIMIT]:
        try:
            href = item.locator("a").first.get_attribute("href") or ""
            if not href or "/video/" not in href:
                continue
            if href.startswith("/"):
                href = "https://www.tiktok.com" + href
            vid_match = re.search(r"/video/(\d+)", href)
            if not vid_match:
                continue
            video_id = vid_match.group(1)

            desc = ""
            try:
                desc = item.get_attribute("aria-label") or ""
                if not desc:
                    desc_el = item.locator(
                        '[class*="desc"], [class*="title"]'
                    ).first
                    desc = desc_el.inner_text(timeout=1000).strip()
            except Exception:
                # A missing description is fine; the post falls back to the URL.
                pass

            videos.append({
                "id": video_id,
                "url": href,
                "desc": desc,
                "timestamp": arrow.utcnow().isoformat(),
                "video_url": href,
            })
        except Exception as e:
            logging.warning(f"⚠️ Error parsing video item: {e}")
            continue
    return videos


def scrape_tiktoks_via_playwright(handle: str) -> list:
    """
    Scrape recent videos from a public TikTok profile.
    Returns a list of dicts: {id, url, desc, timestamp, video_url}

    A leading "@" on the handle is ignored. When the grid cannot be
    loaded, yields no videos, or Playwright itself fails, the yt-dlp
    fallback (_scrape_via_api) is used instead. The "timestamp" field is
    the time of scraping, not the video's publication time.
    """
    handle = handle.lstrip("@")
    profile_url = f"https://www.tiktok.com/@{handle}"
    cookies = load_cookies_from_file(TIKTOK_COOKIES_PATH)
    videos = []
    grid_loaded = False

    logging.info(f"🕷️ Scraping TikTok profile: {profile_url}")

    try:
        with sync_playwright() as p:
            browser = p.chromium.launch(
                headless=True,
                slow_mo=PLAYWRIGHT_SLOW_MO,
                args=[
                    "--no-sandbox",
                    "--disable-setuid-sandbox",
                    "--disable-blink-features=AutomationControlled",
                    "--disable-dev-shm-usage",
                    "--disable-gpu",
                ],
            )
            try:
                page = _open_stealth_page(browser, cookies)
                grid_loaded = _load_profile_grid(page, profile_url)

                if grid_loaded:
                    # ── Scroll to load more videos ─────────────────────
                    logging.info("📜 Scrolling to load videos...")
                    for _ in range(5):
                        page.evaluate("window.scrollBy(0, window.innerHeight * 2)")
                        time.sleep(random.uniform(1.0, 2.0))

                    # ── Extract video items ────────────────────────────
                    videos = _extract_grid_videos(page)
                else:
                    logging.warning(
                        "⚠️ Playwright grid scraping failed. "
                        "Trying API fallback..."
                    )
                    _take_debug_screenshot(page, "playwright_failed")
            finally:
                # Close the browser even when scraping raised part-way.
                browser.close()
    except Exception as e:
        logging.error(f"❌ Playwright scraping crashed: {e}")

    # ── If Playwright found nothing, try API fallback ──────────────────
    if not videos:
        if grid_loaded:
            logging.warning(
                "⚠️ Playwright returned 0 videos. Trying API fallback..."
            )
        return _scrape_via_api(handle, cookies)

    logging.info(f"✅ Scraped {len(videos)} videos from @{handle}.")
    return videos


# ─────────────────────────────────────────────────────────────────────────────
# Core: process a single TikTok video → post to Bluesky
# ─────────────────────────────────────────────────────────────────────────────
def process_tiktok(video: dict, client: Client, langs: list, state: dict) -> bool:
    """
    Download, compress, and post a single TikTok video to Bluesky.
    Returns True if successfully posted.

    Returns False when the video was already posted or when any step
    (download, compression, size check, upload, post) fails. On success
    the video is recorded in ``state`` and the state file is saved.
    """
    video_id = video["id"]
    video_url = video["url"]
    desc = video.get("desc", "")

    # ── Deduplication ──────────────────────────────────────────────────
    if is_already_posted(video_id, state):
        logging.info(f"⏭️ Skipping already-posted video: {video_id}")
        return False

    logging.info(f"🎬 Processing video {video_id}: {video_url}")
    cookies = load_cookies_from_file(TIKTOK_COOKIES_PATH)

    with tempfile.TemporaryDirectory() as tmpdir:
        raw_path = os.path.join(tmpdir, f"{video_id}_raw.mp4")
        processed_path = os.path.join(tmpdir, f"{video_id}.mp4")

        # ── Download ───────────────────────────────────────────────────
        logging.info(f"⬇️ Downloading: {video_url}")
        if not download_video(video_url, raw_path, cookies=cookies):
            logging.error(f"❌ Download failed for {video_id}. Skipping.")
            return False

        # ── Compress / trim ────────────────────────────────────────────
        if not compress_video(raw_path, processed_path):
            logging.error(f"❌ Compression failed for {video_id}. Skipping.")
            return False

        # ── Size guard ─────────────────────────────────────────────────
        final_size = os.path.getsize(processed_path)
        if final_size > VIDEO_MAX_SIZE_BYTES:
            logging.error(
                f"❌ Compressed video still too large: "
                f"{final_size / 1024 / 1024:.1f} MB > "
                f"{VIDEO_MAX_SIZE_BYTES / 1024 / 1024:.0f} MB. Skipping."
            )
            return False

        # ── Upload to Bluesky ──────────────────────────────────────────
        logging.info(
            f"⬆️ Uploading to Bluesky "
            f"({final_size / 1024 / 1024:.1f} MB)..."
        )
        with open(processed_path, "rb") as f:
            video_data = f.read()

        try:
            blob = bsky_upload_blob_with_retry(client, video_data, "video/mp4")
        except Exception as e:
            logging.error(f"❌ Blob upload failed for {video_id}: {e}")
            return False

        # ── Build post text ────────────────────────────────────────────
        post_text = desc.strip() if desc else ""
        if len(post_text) > 280:
            post_text = post_text[:277] + "..."
        if not post_text:
            post_text = f"🎬 {video_url}"

        # ── Build video embed ──────────────────────────────────────────
        try:
            from atproto import models

            video_embed = models.AppBskyEmbedVideo.Main(
                video=blob,
                alt=desc[:1000] if desc else "",
            )
        except Exception as e:
            logging.error(f"❌ Could not build video embed: {e}")
            return False

        # ── Create post ────────────────────────────────────────────────
        success = bsky_create_post_with_retry(
            client,
            text=post_text,
            embed=video_embed,
            langs=langs,
        )
        if success:
            mark_as_posted(video_id, state, {
                "tiktok_url": video_url,
                "desc": desc[:200] if desc else "",
            })
            logging.info(f"✅ Posted video {video_id} to Bluesky.")
            return True

        logging.error(f"❌ Failed to post video {video_id} to Bluesky.")
        return False


# ─────────────────────────────────────────────────────────────────────────────
# Entry point
# ─────────────────────────────────────────────────────────────────────────────
def main():
    """Parse arguments, log in to Bluesky, scrape TikTok and cross-post new videos.

    Exits with status 1 when the Bluesky login fails.
    """
    global TIKTOK_COOKIES_PATH  # must be first line in function

    load_dotenv()

    # Values loaded from the environment / .env act as defaults so secrets
    # need not be passed on the command line; explicit flags still win.
    env_tiktok_handle = os.environ.get("TIKTOK_HANDLE")
    env_bsky_handle = os.environ.get("BSKY_HANDLE")
    env_bsky_password = os.environ.get("BSKY_APP_PASSWORD")

    parser = argparse.ArgumentParser(
        description="TikTok → Bluesky cross-poster"
    )
    parser.add_argument(
        "--tiktok-handle",
        default=env_tiktok_handle,
        required=not env_tiktok_handle,
        help="TikTok handle to scrape (without @)",
    )
    parser.add_argument(
        "--bsky-handle",
        default=env_bsky_handle,
        required=not env_bsky_handle,
        help="Bluesky handle (e.g. user.bsky.social)",
    )
    parser.add_argument(
        "--bsky-app-password",
        default=env_bsky_password,
        required=not env_bsky_password,
        help="Bluesky app password (not account password)",
    )
    parser.add_argument(
        "--bsky-base-url",
        default=DEFAULT_BSKY_BASE_URL,
        help=(
            "Bluesky AT Protocol PDS base URL. "
            "Always https://bsky.social even for custom-domain users "
            "(e.g. eurosky.social handles still authenticate via bsky.social). "
            f"Default: {DEFAULT_BSKY_BASE_URL}"
        ),
    )
    parser.add_argument(
        "--bsky-langs",
        nargs="+",
        default=DEFAULT_BSKY_LANGS,
        help="Post language codes (default: es)",
    )
    parser.add_argument(
        "--cookies-path",
        default=TIKTOK_COOKIES_PATH,
        help="Path to TikTok cookies JSON file",
    )
    args = parser.parse_args()

    # Override global cookie path from CLI
    TIKTOK_COOKIES_PATH = args.cookies_path

    logging.info("=" * 60)
    logging.info("🤖 TikTok→Bluesky bot started")
    logging.info(f" TikTok handle : @{args.tiktok_handle}")
    logging.info(f" Bluesky handle: {args.bsky_handle}")
    logging.info(f" Bluesky PDS : {args.bsky_base_url}")
    logging.info(f" Languages : {args.bsky_langs}")
    logging.info(
        f" Cookie file : {TIKTOK_COOKIES_PATH} "
        f"({'✅ found' if os.path.exists(TIKTOK_COOKIES_PATH) else '❌ NOT FOUND'})"
    )
    logging.info("=" * 60)

    state = load_state()

    # Instantiate client — base URL is baked in via bsky_login()
    client = Client()

    # ── Bluesky login ──────────────────────────────────────────────────
    if not bsky_login(
        client,
        args.bsky_handle,
        args.bsky_app_password,
        args.bsky_base_url,
    ):
        logging.error("❌ Cannot proceed without Bluesky login. Exiting.")
        sys.exit(1)

    # ── Scrape TikTok ──────────────────────────────────────────────────
    logging.info(f"🔄 Scraping @{args.tiktok_handle}...")
    tiktoks = scrape_tiktoks_via_playwright(args.tiktok_handle)

    if not tiktoks:
        logging.warning("⚠️ No TikTok videos found. Skipping sync.")
        logging.info("🤖 Bot finished.")
        return

    logging.info(f"📋 Found {len(tiktoks)} video(s). Processing new ones...")

    # ── Process each video ─────────────────────────────────────────────
    posted = 0
    for tiktok in tiktoks:
        try:
            if process_tiktok(tiktok, client, args.bsky_langs, state):
                posted += 1
                # Polite delay between posts
                time.sleep(random.uniform(3.0, 7.0))
        except Exception as e:
            logging.error(
                f"❌ Unexpected error processing video "
                f"{tiktok.get('id', '?')}: {e}"
            )
            continue

    logging.info("=" * 60)
    logging.info(f"✅ Sync complete. Posted {posted} new video(s).")
    logging.info("🤖 Bot finished.")
    logging.info("=" * 60)


if __name__ == "__main__":
    main()