#!/usr/bin/env python3
r"""
tiktok2bsky.py
──────────────
Scrapes recent videos from a public TikTok profile and cross-posts them
to a Bluesky account.

Usage:
    python tiktok2bsky.py \
        --tiktok-handle jijantesfc \
        --bsky-handle jijantesfc.bsky.social \
        --bsky-app-password xxxx-xxxx-xxxx-xxxx \
        --bsky-base-url https://bsky.social \
        --bsky-langs es \
        --cookies-path tiktok_cookies.json
"""

import argparse
import json
import logging
import os
import random
import re
import subprocess
import sys
import tempfile
import time
from datetime import datetime, timezone
from pathlib import Path

import arrow
import httpx
from atproto import Client
from dotenv import load_dotenv
from playwright.sync_api import sync_playwright

# playwright-stealth 1.x uses stealth_sync, 2.x uses Stealth class
try:
    from playwright_stealth import stealth_sync
    _STEALTH_V2 = False
except ImportError:
    from playwright_stealth import Stealth
    _STEALTH_V2 = True

# ─────────────────────────────────────────────────────────────────────────────
# Logging
# ─────────────────────────────────────────────────────────────────────────────
# Every record goes both to stdout and to tiktok2bsky.log in the working dir.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler("tiktok2bsky.log", encoding="utf-8"),
    ],
    level=logging.INFO,
)

# ─────────────────────────────────────────────────────────────────────────────
# Constants & defaults
# ─────────────────────────────────────────────────────────────────────────────
DEFAULT_BSKY_BASE_URL = "https://bsky.social"
DEFAULT_BSKY_LANGS = ["es"]
TIKTOK_COOKIES_PATH = "tiktok_cookies.json"
STATE_FILE = "tiktok2bsky_state.json"
STATE_MAX_ENTRIES = 5000
SCRAPE_VIDEO_LIMIT = 30
VIDEO_MAX_AGE_DAYS = 3
VIDEO_MAX_DURATION_S = 179  # Bluesky hard limit is 180s

# Bluesky login retry config
BSKY_LOGIN_MAX_RETRIES = 4
BSKY_LOGIN_BASE_DELAY = 15.0
BSKY_LOGIN_MAX_DELAY = 120.0
BSKY_LOGIN_JITTER_MAX = 10.0

# Bluesky upload retry config
BSKY_UPLOAD_MAX_RETRIES = 5
BSKY_UPLOAD_BASE_DELAY = 10.0
BSKY_UPLOAD_MAX_DELAY = 120.0
BSKY_UPLOAD_JITTER_MAX = 5.0

# Playwright scraping config
PLAYWRIGHT_TIMEOUT_MS = 30_000
PLAYWRIGHT_SLOW_MO = 50
PLAYWRIGHT_MAX_RELOADS = 3

# TikTok selectors
TIKTOK_VIDEO_GRID_SEL = '[data-e2e="user-post-item-list"]'
TIKTOK_VIDEO_ITEM_SEL = '[data-e2e="user-post-item"]'
TIKTOK_BANNER_SELS = [
    '[id*="banner"]',
    '[class*="banner"]',
    '[data-e2e="recommend-modal-close"]',
    'button:has-text("Rechazar")',
    'button:has-text("Reject")',
    'button:has-text("Accept")',
    'button:has-text("Aceptar")',
    '[aria-label="Close"]',
    '[aria-label="Cerrar"]',
]
TIKTOK_COOKIE_MODAL_SELS = [
    'button:has-text("Decline all")',
    'button:has-text("Rechazar todo")',
    'button:has-text("Reject all")',
    'button:has-text("Accept all")',
    'button:has-text("Aceptar todo")',
    '[class*="cookie"] button',
    '[id*="cookie"] button',
]
TIKTOK_GRID_ERROR_SEL = '[data-e2e="user-post-item-list-error"]'
TIKTOK_REFRESH_BTN_SEL = 'button:has-text("Actualizar"), button:has-text("Refresh")'

# Playwright only accepts these exact sameSite spellings; browser-extension
# exports use lower-case names and "no_restriction" for SameSite=None.
_SAMESITE_CANONICAL = {
    "strict": "Strict",
    "lax": "Lax",
    "none": "None",
    "no_restriction": "None",
}


# ─────────────────────────────────────────────────────────────────────────────
# Fix 2 — Dynamic video size limit based on PDS
# ─────────────────────────────────────────────────────────────────────────────

def get_video_size_limit(bsky_base_url: str) -> int:
    """
    Return the maximum video size, in bytes, to target for a given PDS.

    A base URL containing "bsky.social" gets a 20 MB budget; anything else
    (including an empty or None URL) gets a conservative 10 MB budget.
    """
    if "bsky.social" in (bsky_base_url or ""):
        return 20 * 1024 * 1024   # 20 MB — official PDS
    return 10 * 1024 * 1024       # 10 MB — safe for third-party PDS


# ─────────────────────────────────────────────────────────────────────────────
# State management
# ─────────────────────────────────────────────────────────────────────────────

def load_state() -> dict:
    """
    Load the posted-video state from STATE_FILE.

    Returns a fresh ``{"posted": {}}`` when the file is missing, unreadable,
    not valid JSON, or does not have the expected dict-of-dicts layout.
    """
    if os.path.exists(STATE_FILE):
        try:
            with open(STATE_FILE, "r", encoding="utf-8") as f:
                state = json.load(f)
            # Reject layouts that would only blow up later in mark_as_posted().
            if not isinstance(state, dict) or not isinstance(state.get("posted", {}), dict):
                raise ValueError("unexpected state file layout")
            logging.info(
                f"📂 Loaded state: {len(state.get('posted', {}))} entries."
            )
            return state
        except Exception as e:
            logging.warning(f"⚠️ Could not load state file: {e}. Starting fresh.")
    return {"posted": {}}


def save_state(state: dict):
    """
    Persist *state* to STATE_FILE, keeping at most STATE_MAX_ENTRIES entries.

    The oldest entries (by their ``posted_at`` string) are pruned first. The
    file is written to a sibling temp file and then moved into place with
    os.replace so an interrupted write cannot leave a truncated state file.
    Failures are logged, never raised.
    """
    # Prune to last STATE_MAX_ENTRIES
    posted = state.get("posted", {})
    if len(posted) > STATE_MAX_ENTRIES:
        sorted_keys = sorted(
            posted.keys(),
            key=lambda k: posted[k].get("posted_at", ""),
        )
        for old_key in sorted_keys[: len(posted) - STATE_MAX_ENTRIES]:
            del posted[old_key]
        state["posted"] = posted

    tmp_path = f"{STATE_FILE}.tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(state, f, indent=2, ensure_ascii=False)
        os.replace(tmp_path, STATE_FILE)
    except Exception as e:
        logging.error(f"❌ Could not save state: {e}")


def is_already_posted(video_id: str, state: dict) -> bool:
    """Return True when *video_id* is recorded under ``state["posted"]``."""
    return video_id in state.get("posted", {})


def mark_as_posted(video_id: str, state: dict, meta: dict | None = None):
    """Record *video_id* as posted (UTC timestamp plus *meta*) and save the state."""
    state.setdefault("posted", {})[video_id] = {
        "posted_at": arrow.utcnow().isoformat(),
        **(meta or {}),
    }
    save_state(state)


# ─────────────────────────────────────────────────────────────────────────────
# Cookie helpers
# ─────────────────────────────────────────────────────────────────────────────

def load_cookies_from_file(path: str) -> list:
    """
    Load cookies from a JSON file (format produced by generate_tiktok_cookies.py).

    Returns an empty list when the file is missing, unreadable, or does not
    contain a JSON list.
    """
    if not os.path.exists(path):
        logging.warning(f"⚠️ Cookie file not found: {path}")
        return []
    try:
        with open(path, "r", encoding="utf-8") as f:
            cookies = json.load(f)
        if not isinstance(cookies, list):
            raise ValueError("expected a JSON list of cookie objects")
        logging.info(f"🍪 Loaded {len(cookies)} cookies from {path}")
        return cookies
    except Exception as e:
        logging.warning(f"⚠️ Could not load cookies from {path}: {e}")
        return []


def _cookie_expiry(cookie: dict) -> float | None:
    """Return the cookie's positive expiry timestamp, or None for session/invalid values."""
    raw = cookie.get("expirationDate") or cookie.get("expires")
    try:
        value = float(raw) if raw else 0.0
    except (TypeError, ValueError):
        return None
    return value if value > 0 else None


def inject_cookies_into_context(context, cookies: list):
    """
    Inject a list of cookie dicts into a Playwright browser context.

    Entries that are not dicts or have no name are skipped. ``sameSite`` is
    normalised to the spellings Playwright accepts: a missing value keeps the
    historical default "None", and an unrecognised value is omitted so the
    browser default applies. Injection failures are logged, never raised.
    """
    if not cookies:
        return
    playwright_cookies = []
    for c in cookies:
        if not isinstance(c, dict) or not c.get("name"):
            continue
        entry = {
            "name": c.get("name", ""),
            "value": c.get("value", ""),
            "domain": c.get("domain", ".tiktok.com"),
            "path": c.get("path", "/"),
            "secure": c.get("secure", False),
            "httpOnly": c.get("httpOnly", False),
        }
        same_site = _SAMESITE_CANONICAL.get(str(c.get("sameSite", "None")).lower())
        if same_site is not None:
            entry["sameSite"] = same_site
        exp = _cookie_expiry(c)
        if exp is not None:
            entry["expires"] = exp
        playwright_cookies.append(entry)
    try:
        context.add_cookies(playwright_cookies)
        logging.info(f"🍪 Injected {len(playwright_cookies)} cookies into browser context.")
    except Exception as e:
        logging.warning(f"⚠️ Could not inject cookies: {e}")


def _export_cookies_for_ytdlp(cookies_path: str | None, workdir: str) -> str | None:
    """
    Return a cookie file path that yt-dlp can load, or None when unavailable.

    yt-dlp's ``cookiefile`` option reads Netscape-format files. When
    *cookies_path* parses as a JSON list it is converted into a Netscape file
    inside *workdir*; a file that is not JSON is assumed to already be in
    Netscape format and is returned unchanged.
    """
    if not cookies_path or not os.path.exists(cookies_path):
        return None
    try:
        with open(cookies_path, "r", encoding="utf-8") as f:
            cookies = json.load(f)
    except (OSError, ValueError):
        return cookies_path
    if not isinstance(cookies, list):
        return None

    lines = ["# Netscape HTTP Cookie File"]
    for c in cookies:
        if not isinstance(c, dict) or not c.get("name"):
            continue
        domain = str(c.get("domain") or ".tiktok.com")
        exp = _cookie_expiry(c)
        fields = [
            # "#HttpOnly_" is the conventional marker for HttpOnly cookies.
            ("#HttpOnly_" if c.get("httpOnly") else "") + domain,
            "TRUE" if domain.startswith(".") else "FALSE",
            str(c.get("path") or "/"),
            "TRUE" if c.get("secure") else "FALSE",
            str(int(exp)) if exp is not None else "0",  # 0 = session cookie
            str(c.get("name")),
            str(c.get("value", "")),
        ]
        lines.append("\t".join(fields))

    netscape_path = os.path.join(workdir, "cookies.txt")
    with open(netscape_path, "w", encoding="utf-8") as f:
        f.write("\n".join(lines) + "\n")
    return netscape_path


# ─────────────────────────────────────────────────────────────────────────────
# Bluesky error classification helpers
# ─────────────────────────────────────────────────────────────────────────────

def is_rate_limited_error(error_obj) -> bool:
    """Return True when the error's repr() contains a rate-limit marker."""
    text = repr(error_obj).lower()
    return (
        "429" in text
        or "ratelimitexceeded" in text
        or "too many requests" in text
        or "rate limit" in text
    )


def is_auth_error(error_obj) -> bool:
    """Return True when the error's repr() contains an authentication marker."""
    text = repr(error_obj).lower()
    return (
        "401" in text
        or "403" in text
        or "invalid identifier or password" in text
        or "authenticationrequired" in text
        or "invalidtoken" in text
    )


def is_network_error(error_obj) -> bool:
    """Return True when the error's repr() names a connection/timeout/5xx signal."""
    text = repr(error_obj)
    signals = [
        "ConnectError",
        "RemoteProtocolError",
        "ReadTimeout",
        "WriteTimeout",
        "TimeoutException",
        "503",
        "502",
        "504",
        "ConnectionResetError",
    ]
    return any(sig in text for sig in signals)


def is_transient_error(error_obj) -> bool:
    """Return True when the error's repr() names a timeout/protocol/5xx signal."""
    text = repr(error_obj)
    signals = [
        "InvokeTimeoutError",
        "ReadTimeout",
        "WriteTimeout",
        "TimeoutException",
        "RemoteProtocolError",
        "ConnectError",
        "503",
        "502",
        "504",
    ]
    return any(sig in text for sig in signals)


def get_rate_limit_wait_seconds(error_obj, default_delay: float) -> float:
    """
    Parse rate-limit response headers and return a bounded wait time in seconds.

    Supports retry-after, x-ratelimit-after, and ratelimit-reset (unix
    timestamp), read first from ``error_obj.headers`` and then from the
    error's repr(). The result is capped at BSKY_LOGIN_MAX_DELAY;
    *default_delay* is returned when no usable header is found.
    """
    try:
        now_ts = int(time.time())
        headers = getattr(error_obj, "headers", None) or {}

        for key in ("retry-after", "Retry-After"):
            if headers.get(key):
                return min(max(int(headers[key]), 1), BSKY_LOGIN_MAX_DELAY)

        for key in ("x-ratelimit-after", "X-RateLimit-After"):
            if headers.get(key):
                return min(max(int(headers[key]), 1), BSKY_LOGIN_MAX_DELAY)

        for key in ("ratelimit-reset", "RateLimit-Reset"):
            if headers.get(key):
                wait = max(int(headers[key]) - now_ts + 1, default_delay)
                return min(wait, BSKY_LOGIN_MAX_DELAY)
    except Exception:
        # Non-numeric header values etc. — fall through to the repr() parser.
        pass

    # repr() fallback — parse headers embedded in the exception string
    text = repr(error_obj)
    for pattern, is_timestamp in [
        (r"'retry-after':\s*'(\d+)'", False),
        (r"'x-ratelimit-after':\s*'(\d+)'", False),
        (r"'ratelimit-reset':\s*'(\d+)'", True),
    ]:
        m = re.search(pattern, text, re.IGNORECASE)
        if m:
            val = int(m.group(1))
            if is_timestamp:
                wait = max(val - int(time.time()) + 1, default_delay)
                return min(wait, BSKY_LOGIN_MAX_DELAY)
            return min(max(val, 1), BSKY_LOGIN_MAX_DELAY)

    return default_delay


# ─────────────────────────────────────────────────────────────────────────────
# Bluesky client
# ─────────────────────────────────────────────────────────────────────────────

def connect_bluesky(handle: str, app_password: str, base_url: str) -> Client:
    """
    Log in to Bluesky, retrying up to BSKY_LOGIN_MAX_RETRIES times.

    Rate-limited failures wait for the server-advertised delay; other failures
    use exponential backoff. Both add random jitter. No sleep happens after
    the final attempt.

    Raises:
        RuntimeError: the final attempt failed with a rate-limit error.
        Exception: the original login error when the final attempt failed
            for any other reason.
    """
    logging.info(f"🔐 Connecting Bluesky client via base URL: {base_url}")
    client = Client(base_url=base_url)

    for attempt in range(1, BSKY_LOGIN_MAX_RETRIES + 1):
        try:
            logging.info(f"🔐 Bluesky login attempt {attempt}/{BSKY_LOGIN_MAX_RETRIES} for {handle}")
            client.login(handle, app_password)
            client.me = client.get_profile(handle)
            logging.info(f"✅ Bluesky login successful as {handle}")
            return client

        except Exception as e:
            logging.warning(
                f"⚠️ Bluesky login {type(e).__name__}: {e} (attempt {attempt}/{BSKY_LOGIN_MAX_RETRIES})"
            )
            rate_limited = is_rate_limited_error(e)

            if attempt >= BSKY_LOGIN_MAX_RETRIES:
                logging.error(f"❌ Bluesky login failed after {BSKY_LOGIN_MAX_RETRIES} attempts.")
                if rate_limited:
                    # Same exception type as before, now chained to its cause.
                    raise RuntimeError("Bluesky login failed: exhausted all retries.") from e
                raise

            if rate_limited:
                delay = get_rate_limit_wait_seconds(e, BSKY_LOGIN_BASE_DELAY)
                wait = delay + random.uniform(0, BSKY_LOGIN_JITTER_MAX)
                logging.warning(
                    f"⏳ Bluesky login rate-limited (attempt {attempt}/{BSKY_LOGIN_MAX_RETRIES}). "
                    f"Retrying in {wait:.1f}s."
                )
            else:
                delay = min(
                    BSKY_LOGIN_BASE_DELAY * (2 ** (attempt - 1)),
                    BSKY_LOGIN_MAX_DELAY,
                )
                wait = delay + random.uniform(0, BSKY_LOGIN_JITTER_MAX)
                logging.warning(f"⏳ Retrying login in {wait:.1f}s.")
            time.sleep(wait)

    raise RuntimeError("Bluesky login failed: exhausted all retries.")


# ─────────────────────────────────────────────────────────────────────────────
# Video helpers
# ─────────────────────────────────────────────────────────────────────────────

def get_video_duration(path: str) -> float:
    """Return video duration in seconds via ffprobe, or 0.0 on failure."""
    try:
        result = subprocess.run(
            [
                "ffprobe", "-v", "error",
                "-show_entries", "format=duration",
                "-of", "default=noprint_wrappers=1:nokey=1",
                path,
            ],
            capture_output=True,
            text=True,
            timeout=15,
        )
        return float(result.stdout.strip())
    except Exception as e:
        logging.warning(f"⚠️ ffprobe failed for {path}: {e}")
        return 0.0


def compress_video(
    input_path: str,
    output_path: str,
    max_duration: int = VIDEO_MAX_DURATION_S,
    max_size_bytes: int | None = None,   # resolved at call-time from get_video_size_limit()
) -> bool:
    """
    Re-encode input_path → output_path using libx264, targeting max_size_bytes.

    The clip is trimmed to *max_duration* seconds and the video bitrate is
    derived from 85% of the size budget. Returns True on success; returns
    False when the duration cannot be read, ffmpeg fails, the output is still
    over the limit, or any exception occurs.

    Key fixes applied:
      • pad=ceil(iw/2)*2:ceil(ih/2)*2  — ensures even dimensions (libx264 requirement)
      • -maxrate == -b:v               — hard ceiling, no burst above target
      • post-encode size guard         — rejects file if still over limit
    """
    if max_size_bytes is None:
        max_size_bytes = 20 * 1024 * 1024  # fallback

    try:
        duration = get_video_duration(input_path)
        if duration <= 0:
            logging.error(
                f"❌ compress_video: invalid duration={duration} "
                f"for {input_path} ({os.path.getsize(input_path)} bytes)"
            )
            return False

        trim_to = min(duration, max_duration)

        # Target 85% of the size budget to leave headroom for container overhead
        target_bits = max_size_bytes * 8 * 0.85
        total_kbps = int(target_bits / trim_to / 1000)
        audio_kbps = 96
        video_kbps = max(200, total_kbps - audio_kbps)

        logging.info(
            f"🎬 Compressing: duration={duration:.1f}s → trim={trim_to:.1f}s, "
            f"video_bitrate={video_kbps}k "
            f"(target ≤ {max_size_bytes // 1024 // 1024}MB)"
        )

        cmd = [
            "ffmpeg", "-y",
            "-i", input_path,
            "-t", str(trim_to),
            # Scale to 720p max, then pad to even dimensions.
            # The pad filter is required because libx264 needs width/height
            # divisible by 2. Portrait TikTok videos (9:16) would otherwise
            # produce odd widths like 405px and crash the encoder.
            "-vf", (
                "scale='min(1280,iw)':'min(720,ih)'"
                ":force_original_aspect_ratio=decrease,"
                "pad=ceil(iw/2)*2:ceil(ih/2)*2"
            ),
            "-c:v", "libx264",
            "-b:v", f"{video_kbps}k",
            "-maxrate", f"{video_kbps}k",   # hard ceiling — no burst above target
            "-bufsize", f"{video_kbps * 2}k",
            "-c:a", "aac",
            "-b:a", f"{audio_kbps}k",
            "-movflags", "+faststart",
            "-pix_fmt", "yuv420p",
            output_path,
        ]

        result = subprocess.run(cmd, capture_output=True, text=True, timeout=300)
        if result.returncode != 0:
            logging.error(f"❌ ffmpeg failed:\n{result.stderr}")
            return False

        final_size = os.path.getsize(output_path)

        # Reject if still over the hard limit
        if final_size > max_size_bytes:
            logging.error(
                f"❌ Compressed file still too large: "
                f"{final_size / 1024 / 1024:.1f} MB > "
                f"{max_size_bytes / 1024 / 1024:.0f} MB limit. Skipping."
            )
            return False

        logging.info(
            f"✅ Compressed video: {final_size / 1024 / 1024:.1f} MB → {output_path}"
        )
        return True

    except Exception as e:
        logging.error(f"❌ compress_video error: {e}")
        return False


# ─────────────────────────────────────────────────────────────────────────────
# yt-dlp download
# ─────────────────────────────────────────────────────────────────────────────

def get_best_impersonation_target() -> str | None:
    """
    Dynamically select the best available curl_cffi impersonation target.

    Returns the first preferred name present in curl_cffi's BrowserType,
    otherwise the alphabetically first available name. Returns None if
    curl_cffi is not installed or no target is available.
    """
    try:
        from curl_cffi.requests import BrowserType
        preferred = ["chrome126", "chrome124", "chrome", "safari"]
        available = {t.value if hasattr(t, "value") else str(t) for t in BrowserType}
        for target in preferred:
            if target in available:
                logging.info(f"🎭 yt-dlp impersonation target: {target}")
                return target
        # fallback: return first available
        if available:
            target = sorted(available)[0]
            logging.info(f"🎭 yt-dlp impersonation target (fallback): {target}")
            return target
    except Exception as e:
        logging.warning(f"⚠️ Could not check impersonation targets: {e}")
    return None


def _to_ytdlp_impersonate_target(name: str):
    """
    Convert a curl_cffi browser name (e.g. "chrome126") into a yt-dlp
    ImpersonateTarget, or return None when that is not possible.

    yt-dlp's Python API takes an ImpersonateTarget object for the
    ``impersonate`` option rather than a plain string.
    """
    try:
        from yt_dlp.networking.impersonate import ImpersonateTarget
    except ImportError:
        # NOTE(review): presumably a yt-dlp build without impersonation support.
        return None

    lowered = name.lower()
    exact = re.fullmatch(r"([a-z]+)(\d+)?", lowered)
    if exact:
        client, version = exact.group(1), exact.group(2)
    else:
        # Names such as "safari17_2_ios": keep only the browser family.
        family = re.match(r"[a-z]+", lowered)
        if not family:
            return None
        client, version = family.group(0), None
    try:
        return ImpersonateTarget(client, version)
    except Exception as e:
        logging.warning(f"⚠️ Could not build impersonation target from {name!r}: {e}")
        return None


def _apply_ytdlp_session_opts(ydl_opts: dict, cookies_path: str | None, workdir: str) -> dict:
    """Add the cookie file and impersonation target to *ydl_opts* (in place) and return it."""
    cookiefile = _export_cookies_for_ytdlp(cookies_path, workdir)
    if cookiefile:
        ydl_opts["cookiefile"] = cookiefile

    target_name = get_best_impersonation_target()
    target = _to_ytdlp_impersonate_target(target_name) if target_name else None
    if target is not None:
        ydl_opts["impersonate"] = target
    return ydl_opts


def download_video_ytdlp(url: str, output_path: str, cookies_path: str | None = None) -> bool:
    """
    Download a TikTok video using yt-dlp with browser impersonation.

    A JSON cookie export at *cookies_path* is converted to a temporary
    Netscape-format file for yt-dlp. A download counts as successful only
    when *output_path* exists and is larger than 50 KiB.

    Returns True on success, False on failure.
    """
    ydl_opts = {
        "outtmpl": output_path,
        "format": "bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best",
        "quiet": False,
        "no_warnings": False,
        "merge_output_format": "mp4",
    }

    try:
        import yt_dlp
        # The converted cookie file only needs to live as long as the session.
        with tempfile.TemporaryDirectory() as workdir:
            _apply_ytdlp_session_opts(ydl_opts, cookies_path, workdir)
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                ydl.download([url])

        size_bytes = os.path.getsize(output_path) if os.path.exists(output_path) else 0
        if size_bytes > 50 * 1024:
            size_mb = size_bytes / 1024 / 1024
            logging.info(f"✅ yt-dlp download OK: {size_mb:.1f} MB")
            return True

        logging.warning(
            f"⚠️ yt-dlp output too small or missing: {output_path} "
            f"({size_bytes} bytes)"
        )
        return False

    except Exception as e:
        logging.error(f"❌ yt-dlp download failed for {url}: {type(e).__name__}: {e}")
        return False


def download_video(url: str, output_path: str, cookies_path: str | None = None) -> bool:
    """
    Download a TikTok video.
    Routes directly to yt-dlp with browser impersonation.
    """
    logging.info(f"⬇️ Downloading: {url}")
    return download_video_ytdlp(url, output_path, cookies_path=cookies_path)


# ─────────────────────────────────────────────────────────────────────────────
# Bluesky upload
# ─────────────────────────────────────────────────────────────────────────────

def upload_video_to_bluesky(
    client: Client,
    video_path: str,
    video_id: str,
) -> object | None:
    """
    Upload a video file to Bluesky as a blob.

    Retries up to BSKY_UPLOAD_MAX_RETRIES times with exponential backoff plus
    jitter. Returns the blob reference from the upload response, or None when
    every attempt fails.

    Fix 1 applied: exception is logged as type(e).__name__: e so the actual
    error (413, 403, network error, etc.) is always visible.
    """
    size_mb = os.path.getsize(video_path) / 1024 / 1024
    logging.info(f"⬆️ Uploading to Bluesky ({size_mb:.1f} MB)...")

    with open(video_path, "rb") as f:
        video_data = f.read()

    delay = BSKY_UPLOAD_BASE_DELAY
    for attempt in range(1, BSKY_UPLOAD_MAX_RETRIES + 1):
        try:
            blob = client.upload_blob(video_data)
            logging.info(f"✅ Blob uploaded successfully for {video_id}")
            return blob.blob

        except Exception as e:
            # ── Fix 1: always log the full exception type and message ──────
            err_detail = f"{type(e).__name__}: {e}"

            if attempt >= BSKY_UPLOAD_MAX_RETRIES:
                logging.error(
                    f"❌ Blob upload failed after {BSKY_UPLOAD_MAX_RETRIES} attempts: "
                    f"{err_detail}"
                )
                return None

            # Log the delay that is actually slept (base delay plus jitter).
            wait = delay + random.uniform(0, BSKY_UPLOAD_JITTER_MAX)
            logging.warning(
                f"⚠️ Blob upload attempt {attempt}/{BSKY_UPLOAD_MAX_RETRIES} "
                f"failed: {err_detail}. Retrying in {wait:.1f}s..."
            )
            time.sleep(wait)
            delay = min(delay * 2, BSKY_UPLOAD_MAX_DELAY)

    return None


# ─────────────────────────────────────────────────────────────────────────────
# Bluesky post
# ─────────────────────────────────────────────────────────────────────────────

def post_video_to_bluesky(
    client: Client,
    blob,
    caption: str,
    langs: list[str],
    video_id: str,
) -> bool:
    """
    Create a Bluesky post embedding the uploaded video blob.

    Returns True when the post was sent, False (after logging) on any error.
    """
    from atproto import models

    try:
        video_embed = models.AppBskyEmbedVideo.Main(
            video=blob,
        )
        client.send_post(
            text=caption,
            embed=video_embed,
            langs=langs,
        )
        logging.info(f"✅ Posted video {video_id} to Bluesky.")
        return True
    except Exception as e:
        logging.error(
            f"❌ Failed to post video {video_id} to Bluesky: "
            f"{type(e).__name__}: {e}"
        )
        return False


# ─────────────────────────────────────────────────────────────────────────────
# TikTok scraping — Playwright
# ─────────────────────────────────────────────────────────────────────────────

def dismiss_overlays(page) -> None:
    """Try to dismiss cookie banners and modal overlays (best effort, never raises)."""
    all_sels = TIKTOK_COOKIE_MODAL_SELS + TIKTOK_BANNER_SELS
    for sel in all_sels:
        try:
            el = page.locator(sel).first
            if el.is_visible(timeout=1500):
                el.click(timeout=1500)
                logging.info(f"🚫 Dismissed overlay: {sel}")
                time.sleep(0.5)
        except Exception:
            # A selector that is absent or not clickable is expected; move on.
            pass


def _apply_stealth(page) -> None:
    """Apply playwright-stealth to *page*; a failure is logged and scraping continues."""
    try:
        if _STEALTH_V2:
            stealth = Stealth()
            # NOTE(review): 2.x appears to expose apply_stealth_sync(); the
            # older spelling is kept as a fallback — confirm against the
            # installed version.
            apply = getattr(stealth, "apply_stealth_sync", None) or getattr(stealth, "apply", None)
            if apply is None:
                raise AttributeError("Stealth object has no sync apply method")
            apply(page)
        else:
            stealth_sync(page)
    except Exception as e:
        logging.warning(f"⚠️ Could not apply playwright-stealth: {type(e).__name__}: {e}")


def _collect_profile_video_links(page, limit: int) -> list[dict]:
    """Read up to *limit* grid tiles from *page* and return unique video entries."""
    found = []
    seen_ids = set()
    for item in page.locator(TIKTOK_VIDEO_ITEM_SEL).all()[:limit]:
        try:
            link = item.locator("a").first.get_attribute("href")
        except Exception:
            continue  # tile without a readable anchor
        vid_match = re.search(r"/video/(\d+)", link) if link else None
        if not vid_match:
            continue
        video_id = vid_match.group(1)
        if video_id in seen_ids:
            continue
        seen_ids.add(video_id)
        full_url = link if link.startswith("http") else f"https://www.tiktok.com{link}"
        found.append({
            "video_id": video_id,
            "url": full_url,
            "timestamp": None,
        })
    return found


def scrape_tiktok_profile_playwright(
    handle: str,
    cookies: list,
    limit: int = SCRAPE_VIDEO_LIMIT,
) -> list[dict]:
    """
    Scrape the most recent video URLs from a TikTok profile page using Playwright.

    The profile is loaded up to PLAYWRIGHT_MAX_RELOADS times; a screenshot is
    saved in the working directory whenever the grid is missing or an attempt
    raises. The browser is always closed, even when an attempt fails.

    Returns a list of dicts with keys: video_id, url, timestamp (always None
    here). The list is empty when nothing could be scraped.
    """
    profile_url = f"https://www.tiktok.com/@{handle}"
    logging.info(f"🕷️ Scraping TikTok profile: {profile_url}")

    videos = []

    with sync_playwright() as p:
        browser = p.chromium.launch(
            headless=True,
            slow_mo=PLAYWRIGHT_SLOW_MO,
            args=[
                "--disable-blink-features=AutomationControlled",
                "--no-sandbox",
                "--disable-setuid-sandbox",
            ],
        )
        try:
            context = browser.new_context(
                user_agent=(
                    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                    "AppleWebKit/537.36 (KHTML, like Gecko) "
                    "Chrome/126.0.0.0 Safari/537.36"
                ),
                viewport={"width": 1280, "height": 900},
                locale="es-ES",
            )

            inject_cookies_into_context(context, cookies)

            page = context.new_page()
            _apply_stealth(page)

            for attempt in range(1, PLAYWRIGHT_MAX_RELOADS + 1):
                try:
                    logging.info(f"🌐 Loading profile (attempt {attempt}/{PLAYWRIGHT_MAX_RELOADS})...")
                    page.goto(
                        profile_url,
                        wait_until="domcontentloaded",
                        timeout=PLAYWRIGHT_TIMEOUT_MS,
                    )
                    time.sleep(3)
                    dismiss_overlays(page)

                    # Wait for video grid
                    try:
                        page.wait_for_selector(
                            TIKTOK_VIDEO_GRID_SEL,
                            timeout=PLAYWRIGHT_TIMEOUT_MS,
                        )
                    except Exception:
                        # Visibility is re-checked just below.
                        pass

                    grid = page.locator(TIKTOK_VIDEO_GRID_SEL).first
                    if not grid.is_visible(timeout=5000):
                        logging.warning(f"⚠️ Video grid not found on attempt {attempt}.")
                        ts = int(time.time())
                        page.screenshot(path=f"screenshot_no_grid_{attempt}_{ts}.png")
                        logging.info(f"📸 Screenshot saved: screenshot_no_grid_{attempt}_{ts}.png")
                        time.sleep(3)
                        continue

                    # Extract video links
                    videos = _collect_profile_video_links(page, limit)
                    if videos:
                        logging.info(f"✅ Playwright scraped {len(videos)} videos.")
                        break

                except Exception as e:
                    logging.warning(f"⚠️ Playwright attempt {attempt} error: {type(e).__name__}: {e}")
                    ts = int(time.time())
                    try:
                        page.screenshot(path=f"screenshot_error_{attempt}_{ts}.png")
                    except Exception:
                        pass
                    time.sleep(3)

            if not videos:
                logging.warning(
                    f"⚠️ No videos scraped after {PLAYWRIGHT_MAX_RELOADS} attempts."
                )
                ts = int(time.time())
                try:
                    page.screenshot(path=f"screenshot_no_grid_final_{ts}.png")
                    logging.info(f"📸 Screenshot saved: screenshot_no_grid_final_{ts}.png")
                except Exception:
                    pass
        finally:
            # Closing the browser also disposes of its contexts and pages.
            browser.close()

    return videos


# ─────────────────────────────────────────────────────────────────────────────
# TikTok scraping — yt-dlp fallback
# ─────────────────────────────────────────────────────────────────────────────

def scrape_tiktok_profile_ytdlp(
    handle: str,
    cookies_path: str | None = None,
    limit: int = SCRAPE_VIDEO_LIMIT,
) -> list[dict]:
    """
    Fallback: use yt-dlp to extract the video list from a TikTok profile.

    Returns a list of dicts with keys: video_id, url, timestamp. Returns an
    empty list when yt-dlp is unavailable or extraction fails.
    """
    profile_url = f"https://www.tiktok.com/@{handle}"
    logging.info(f"📦 yt-dlp profile scrape fallback for @{handle}...")

    ydl_opts = {
        "extract_flat": True,
        "quiet": True,
        "no_warnings": True,
        "playlistend": limit,
    }

    try:
        import yt_dlp
        logging.info(f"🌐 yt-dlp extracting: {profile_url}")
        with tempfile.TemporaryDirectory() as workdir:
            _apply_ytdlp_session_opts(ydl_opts, cookies_path, workdir)
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                info = ydl.extract_info(profile_url, download=False)

        entries = info.get("entries", []) if info else []
        logging.info(
            f"✅ yt-dlp returned {len(entries)} entries "
            f"(playlist: {info.get('title', '?') if info else '?'})"
        )

        videos = []
        for entry in entries:
            if not entry:
                continue
            url = entry.get("url") or entry.get("webpage_url") or ""
            vid_match = re.search(r"/video/(\d+)", url)
            if not vid_match:
                # No usable URL: rebuild the canonical one from the entry id.
                vid_id = entry.get("id", "")
                if vid_id:
                    url = f"https://www.tiktok.com/@{handle}/video/{vid_id}"
                    vid_match = re.search(r"/video/(\d+)", url)
            if vid_match:
                videos.append({
                    "video_id": vid_match.group(1),
                    "url": url,
                    "timestamp": entry.get("timestamp"),
                })

        logging.info(f"✅ yt-dlp fallback produced {len(videos)} usable videos.")
        return videos[:limit]

    except Exception as e:
        logging.error(f"❌ yt-dlp profile scrape failed: {type(e).__name__}: {e}")
        return []


# ─────────────────────────────────────────────────────────────────────────────
# Caption builder
# ─────────────────────────────────────────────────────────────────────────────

def build_caption(video_info: dict, tiktok_handle: str, max_len: int = 290) -> str:
    """
    Build a Bluesky post caption from video metadata.

    The caption is "<description>\\n<url>", with the description truncated
    (ending in "…") so the whole caption fits in *max_len* characters. When
    there is no description, or the URL leaves no room for one, the URL alone
    is returned. *tiktok_handle* is accepted but not used.
    """
    desc = (video_info.get("description") or "").strip()
    url = video_info.get("url", "")

    if not desc:
        return url

    # Truncate description to leave room for the URL
    max_desc = max_len - (len(url) + 1)  # +1 for newline
    if max_desc <= 0:
        return url
    if len(desc) > max_desc:
        desc = desc[: max_desc - 1] + "…"
    return f"{desc}\n{url}"


# ─────────────────────────────────────────────────────────────────────────────
# Main processing loop
# ─────────────────────────────────────────────────────────────────────────────

def process_videos(
    videos: list[dict],
    state: dict,
    client: Client,
    tiktok_handle: str,
    cookies_path: str,
    langs: list[str],
    max_age_days: int,
    video_max_size_bytes: int,
) -> int:
    """
    Download, compress, upload and post each new video.

    Videos already recorded in *state* are skipped, as are videos whose
    timestamp is older than *max_age_days*; videos without a timestamp are
    never age-filtered. A failure at any step skips that video only.

    Returns the count of successfully posted videos.
    """
    posted_count = 0
    now = arrow.utcnow()

    for video in videos:
        video_id = video["video_id"]
        video_url = video["url"]

        if is_already_posted(video_id, state):
            logging.info(f"⏭️ Already posted: {video_id}")
            continue

        # Age filter (only if timestamp is available)
        ts = video.get("timestamp")
        if ts:
            try:
                age_days = (now - arrow.get(ts)).days
            except Exception:
                age_days = None  # unparsable timestamp: treat the age as unknown
            if age_days is not None and age_days > max_age_days:
                logging.info(
                    f"⏭️ Video {video_id} too old ({age_days}d > {max_age_days}d). Skipping."
                )
                continue

        logging.info(f"🎬 Processing video {video_id}: {video_url}")

        with tempfile.TemporaryDirectory() as tmpdir:
            raw_path = os.path.join(tmpdir, f"{video_id}_raw.mp4")
            comp_path = os.path.join(tmpdir, f"{video_id}.mp4")

            # 1. Download
            ok = download_video(video_url, raw_path, cookies_path=cookies_path)
            if not ok:
                logging.error(f"❌ Download failed for {video_id}. Skipping.")
                continue

            # 2. Compress
            ok = compress_video(
                raw_path,
                comp_path,
                max_size_bytes=video_max_size_bytes,
            )
            if not ok:
                logging.error(f"❌ Compression failed for {video_id}. Skipping.")
                continue

            # 3. Upload blob
            blob = upload_video_to_bluesky(client, comp_path, video_id)
            if blob is None:
                logging.error(f"❌ Blob upload failed for {video_id}.")
                continue

            # 4. Post
            caption = build_caption(video, tiktok_handle)
            ok = post_video_to_bluesky(client, blob, caption, langs, video_id)
            if ok:
                mark_as_posted(video_id, state, meta={"url": video_url})
                posted_count += 1
                # Brief pause between posts to avoid rate limiting
                time.sleep(random.uniform(2.0, 5.0))

    return posted_count


# ─────────────────────────────────────────────────────────────────────────────
# Entry point
# ─────────────────────────────────────────────────────────────────────────────

def parse_args() -> argparse.Namespace:
    """Parse the command-line arguments for the cross-posting bot."""
    parser = argparse.ArgumentParser(
        description="Cross-post TikTok videos to Bluesky."
    )
    parser.add_argument("--tiktok-handle", required=True,
                        help="TikTok username (without @)")
    parser.add_argument("--bsky-handle", required=True,
                        help="Bluesky handle")
    parser.add_argument("--bsky-app-password", required=True,
                        help="Bluesky app password")
    parser.add_argument("--bsky-base-url", default=DEFAULT_BSKY_BASE_URL,
                        help=f"Bluesky PDS base URL (default: {DEFAULT_BSKY_BASE_URL})")
    parser.add_argument("--bsky-langs", nargs="+", default=DEFAULT_BSKY_LANGS,
                        help="BCP-47 language tags for posts (default: es)")
    parser.add_argument("--cookies-path", default=TIKTOK_COOKIES_PATH,
                        help=f"Path to TikTok cookies JSON (default: {TIKTOK_COOKIES_PATH})")
    parser.add_argument("--max-age-days", type=int, default=VIDEO_MAX_AGE_DAYS,
                        help=f"Skip videos older than N days (default: {VIDEO_MAX_AGE_DAYS})")
    return parser.parse_args()


def main():
    """
    Run one sync: log in to Bluesky, scrape the TikTok profile (Playwright
    first, yt-dlp as fallback) and cross-post every new video.

    Exits with status 0 when no videos could be found.
    """
    load_dotenv()
    args = parse_args()

    # ── Fix 2: resolve video size limit based on PDS ──────────────────────
    video_max_size_bytes = get_video_size_limit(args.bsky_base_url)

    logging.info("=" * 60)
    logging.info("🤖 TikTok→Bluesky bot started")
    logging.info(f"   TikTok handle : @{args.tiktok_handle}")
    logging.info(f"   Bluesky handle: {args.bsky_handle}")
    logging.info(f"   Bluesky PDS   : {args.bsky_base_url}")
    logging.info(f"   Languages     : {args.bsky_langs}")
    logging.info(f"   Video size cap: {video_max_size_bytes // 1024 // 1024} MB")
    cookie_status = "✅ found" if os.path.exists(args.cookies_path) else "❌ NOT FOUND"
    logging.info(f"   Cookie file   : {args.cookies_path} ({cookie_status})")
    logging.info("=" * 60)

    state = load_state()

    # Connect to Bluesky
    client = connect_bluesky(
        args.bsky_handle,
        args.bsky_app_password,
        args.bsky_base_url,
    )

    # Scrape TikTok profile
    logging.info(f"🔄 Scraping @{args.tiktok_handle}...")
    cookies = load_cookies_from_file(args.cookies_path)

    videos = scrape_tiktok_profile_playwright(
        args.tiktok_handle,
        cookies,
        limit=SCRAPE_VIDEO_LIMIT,
    )

    if not videos:
        # Diagnostic screenshots, if any, were already saved by the scraper.
        logging.warning("⚠️ Playwright grid scraping failed. Trying API fallback...")
        videos = scrape_tiktok_profile_ytdlp(
            args.tiktok_handle,
            cookies_path=args.cookies_path,
            limit=SCRAPE_VIDEO_LIMIT,
        )

    if not videos:
        logging.error("❌ No videos found. Exiting.")
        sys.exit(0)

    logging.info(f"📋 Found {len(videos)} video(s). Processing new ones...")

    posted = process_videos(
        videos=videos,
        state=state,
        client=client,
        tiktok_handle=args.tiktok_handle,
        cookies_path=args.cookies_path,
        langs=args.bsky_langs,
        max_age_days=args.max_age_days,
        video_max_size_bytes=video_max_size_bytes,
    )

    logging.info("=" * 60)
    logging.info(f"✅ Sync complete. Posted {posted} new video(s).")
    logging.info("🤖 Bot finished.")
    logging.info("=" * 60)


if __name__ == "__main__":
    main()