"""Cross-post recent TikTok videos from one profile to Bluesky.

The script scrapes a TikTok profile (Playwright first, yt-dlp as a fallback),
downloads each video that has not been posted yet, trims it if needed and
publishes it to Bluesky with a thumbnail, alt text and a link to the original.
"""

import argparse
import hashlib
import json
import logging
import os
import random
import re
import subprocess
import tempfile
import time
import uuid
from pathlib import Path

import arrow
import grapheme
from atproto import Client, client_utils, models
from dotenv import load_dotenv
from moviepy import VideoFileClip
from playwright.sync_api import sync_playwright

# ─────────────────────────────────────────────
# Configuration
# ─────────────────────────────────────────────
LOG_PATH = "tiktok2bsky.log"
STATE_PATH = "tiktok2bsky_state.json"
TIKTOK_COOKIES_PATH = "tiktok_cookies.json"  # ← export from your browser
SCRAPE_VIDEO_LIMIT = 30
DEDUPE_BSKY_LIMIT = 30
VIDEO_MAX_AGE_DAYS = 3
BSKY_TEXT_MAX_LENGTH = 300
DEFAULT_BSKY_LANGS = ["es"]
VIDEO_MAX_DURATION_SECONDS = 179
MAX_VIDEO_UPLOAD_SIZE_MB = 45
BSKY_BLOB_UPLOAD_MAX_RETRIES = 5
BSKY_BLOB_UPLOAD_BASE_DELAY = 10
BSKY_BLOB_UPLOAD_MAX_DELAY = 300
BSKY_BLOB_TRANSIENT_ERROR_RETRIES = 3
BSKY_BLOB_TRANSIENT_ERROR_DELAY = 15
BSKY_SEND_POST_MAX_RETRIES = 3
BSKY_SEND_POST_BASE_DELAY = 5
BSKY_SEND_POST_MAX_DELAY = 60
BSKY_LOGIN_MAX_RETRIES = 4
BSKY_LOGIN_BASE_DELAY = 10
BSKY_LOGIN_MAX_DELAY = 600
BSKY_LOGIN_JITTER_MAX = 1.5
SUBPROCESS_TIMEOUT_SECONDS = 180
FFPROBE_TIMEOUT_SECONDS = 15
DEFAULT_BSKY_BASE_URL = "https://bsky.social"
TIKTOK_PAGE_LOAD_WAIT_S = 5.0
TIKTOK_SCROLL_PAUSE_S = 2.5
TIKTOK_MAX_SCROLLS = 8
TIKTOK_BANNER_WAIT_S = 3.0
TIKTOK_MAX_LOAD_ATTEMPTS = 3
DYNAMIC_ALT_MAX_LENGTH = 150
TRUNCATE_MIN_PREFIX_CHARS = 20

# ─────────────────────────────────────────────
# Selectors
# ─────────────────────────────────────────────
GDPR_SELECTORS = [
    'button:has-text("Permitir todas")',
    'button:has-text("Rechazar cookies opcionales")',
    'button:has-text("Entendido")',
    'button:has-text("Aceptar todo")',
    'button:has-text("Accept all")',
    'button:has-text("Got it")',
    'button:has-text("Decline optional")',
    '[data-e2e="cookie-banner-accept"]',
    '[id*="accept"]',
    '[class*="accept-btn"]',
]

TOP_BANNER_SELECTORS = [
    'button:has-text("Entendido")',
    'button:has-text("Got it")',
    'button:has-text("Understood")',
    '[data-e2e="top-banner-close"]',
    '[class*="BannerContainer"] button',
    '[class*="DivBannerContainer"] button',
]

CAPTCHA_SELECTORS = [
    '[class*="captcha"]',
    '[id*="captcha"]',
    'div:has-text("Drag the puzzle")',
    'div:has-text("puzzle piece")',
    '[class*="secsdk-captcha"]',
    '[class*="tiktok-captcha"]',
]

GRID_SELECTORS = (
    '[data-e2e="user-post-item"], '
    '[class*="DivItemContainerV2"], '
    'a[href*="/video/"], '
    '[class*="video-feed"], '
    'div[class*="VideoFeed"], '
    '[class*="DivVideoFeedV2"]'
)

# Selector used to collect the links to individual videos on a profile page.
_VIDEO_LINK_SELECTOR = 'a[href*="/video/"]'

# Accepted spellings of the cookie "sameSite" attribute (compared lower-cased)
# mapped to the three values the cookie normaliser emits.
_SAME_SITE_ALIASES = {
    "strict": "Strict",
    "lax": "Lax",
    "none": "None",
    "no_restriction": "None",
}

# Script injected into every page before navigation to mask automation hints.
_STEALTH_INIT_SCRIPT = """
    Object.defineProperty(navigator, 'webdriver', {get: () => undefined});
    Object.defineProperty(navigator, 'plugins', {
        get: () => [
            {name:'Chrome PDF Plugin'},
            {name:'Chrome PDF Viewer'},
            {name:'Native Client'}
        ]
    });
    Object.defineProperty(navigator, 'languages', {
        get: () => ['es-ES','es','en']
    });
    window.chrome = {
        runtime:{},
        loadTimes:function(){},
        csi:function(){},
        app:{}
    };
"""

# ─────────────────────────────────────────────
# Logging
# ─────────────────────────────────────────────
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.FileHandler(LOG_PATH, encoding="utf-8"),
        logging.StreamHandler(),
    ],
    level=logging.INFO,
)


# ─────────────────────────────────────────────
# Data classes
# ─────────────────────────────────────────────
class ScrapedMedia:
    """One media attachment of a scraped post (URL plus media type)."""

    def __init__(self, url, media_type="video"):
        self.type = media_type
        self.media_url_https = url


class ScrapedTikTok:
    """A scraped TikTok post.

    ``media`` holds a single ``ScrapedMedia`` when ``video_url`` is truthy and
    is an empty list otherwise.
    """

    def __init__(self, created_on, text, video_url,
                 post_url=None, thumbnail_url=None):
        self.created_on = created_on
        self.text = text
        self.post_url = post_url
        self.thumbnail_url = thumbnail_url
        self.media = [ScrapedMedia(video_url, "video")] if video_url else []


# ─────────────────────────────────────────────
# Generic helpers
# ─────────────────────────────────────────────
def sha256_file(path, chunk_size=1024 * 1024):
    """Return the hex SHA-256 digest of the file at *path*, read in chunks."""
    h = hashlib.sha256()
    with open(path, "rb") as f:
        while chunk := f.read(chunk_size):
            h.update(chunk)
    return h.hexdigest()


def take_error_screenshot(page, label):
    """Save a full-page screenshot named after *label* and the current time.

    Best effort: a failure to capture is logged as a warning, never raised.
    """
    timestamp = time.strftime("%Y%m%d_%H%M%S")
    name = f"screenshot_{label}_{timestamp}.png"
    try:
        page.screenshot(path=name, full_page=True)
        logging.info(f"📸 Screenshot saved: {name}")
    except Exception as e:
        logging.warning(f"⚠️ Could not save screenshot: {e}")


def canonicalize_tiktok_url(url):
    """Reduce a TikTok video URL to ``https://www.tiktok.com/@user/video/ID``.

    Returns ``None`` for a falsy *url* and the stripped input when it does not
    look like a TikTok video URL.
    """
    if not url:
        return None
    match = re.search(
        r"https?://(?:www\.)?tiktok\.com/@([^/]+)/video/(\d+)",
        url,
        re.IGNORECASE,
    )
    if match:
        return (f"https://www.tiktok.com/@{match.group(1)}"
                f"/video/{match.group(2)}")
    return url.strip()


def load_state():
    """Load the persisted state, or a fresh one when no state file exists.

    Raises whatever ``open``/``json.load`` raise for an unreadable or corrupt
    file; this is deliberate, because silently starting from an empty state
    would re-post every video.
    """
    if os.path.exists(STATE_PATH):
        with open(STATE_PATH, "r", encoding="utf-8") as f:
            return json.load(f)
    return {"posted_ids": []}


def save_state(state):
    """Persist *state* as JSON to ``STATE_PATH``.

    The data is written to a sibling temporary file and then moved into place
    with ``os.replace`` so an interrupted write cannot leave a truncated
    state file behind.
    """
    tmp_path = f"{STATE_PATH}.tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(state, f, indent=2, ensure_ascii=False)
    os.replace(tmp_path, STATE_PATH)


def tiktok_id_from_url(url):
    """Return the numeric video id found in *url*.

    Falls back to the URL itself when no ``/video/<digits>`` part is present,
    and to ``None`` for a falsy *url*.
    """
    if not url:
        return None
    match = re.search(r"/video/(\d+)", url)
    return match.group(1) if match else url


def truncate_grapheme(text, max_len, suffix="…"):
    """Shorten *text* to at most *max_len* grapheme clusters.

    Text that already fits is returned unchanged. Otherwise the text is cut
    and *suffix* appended, provided at least ``TRUNCATE_MIN_PREFIX_CHARS``
    graphemes of the original survive next to the suffix; when the budget is
    too small for that, the text is hard-cut to *max_len* without a suffix.
    The result never exceeds *max_len* graphemes.
    """
    clusters = list(grapheme.graphemes(text))
    if len(clusters) <= max_len:
        return text

    budget = max(max_len, 0)
    suffix_len = len(list(grapheme.graphemes(suffix)))
    keep = budget - suffix_len
    if keep < TRUNCATE_MIN_PREFIX_CHARS:
        # Not enough room for a meaningful prefix plus the suffix.
        return "".join(clusters[:budget])
    return "".join(clusters[:keep]) + suffix


# ─────────────────────────────────────────────
# Cookie helpers (Option 1)
# ─────────────────────────────────────────────
def _normalise_cookie(raw_cookie: dict) -> dict | None:
    """Convert one exported cookie record into the dict passed to add_cookies.

    Returns ``None`` for records without a name, which are treated as
    malformed export rows.
    """
    name = raw_cookie.get("name", "")
    if not name:
        return None

    entry = {
        "name": name,
        "value": raw_cookie.get("value", ""),
        "domain": raw_cookie.get("domain", ".tiktok.com"),
        "path": raw_cookie.get("path", "/"),
    }

    # sameSite must be one of "Strict" | "Lax" | "None". Exports may spell the
    # value in lower case or as "no_restriction", so compare case-insensitively;
    # anything unrecognised keeps the previous fallback of "None".
    same_site = str(raw_cookie.get("sameSite") or "").strip().lower()
    entry["sameSite"] = _SAME_SITE_ALIASES.get(same_site, "None")

    # Carry over the security flags when the export provides them.
    for flag in ("secure", "httpOnly"):
        if isinstance(raw_cookie.get(flag), bool):
            entry[flag] = raw_cookie[flag]

    for key in ("expirationDate", "expires"):
        if key in raw_cookie:
            try:
                entry["expires"] = int(raw_cookie[key])
            except (TypeError, ValueError):
                # Non-numeric expiry: leave it out rather than abort the run.
                pass
            break

    return entry


def load_tiktok_cookies() -> list:
    """
    Load TikTok session cookies exported from a real browser.

    Reads the JSON file at ``TIKTOK_COOKIES_PATH``; both a bare list of cookie
    dicts and a dict wrapping that list under ``"cookies"`` are accepted.

    Returns:
        A list of cookie dicts for ``context.add_cookies``. The list is empty
        when the file is missing, unreadable or not JSON.
    """
    if not os.path.exists(TIKTOK_COOKIES_PATH):
        logging.warning(
            f"⚠️ Cookie file not found at '{TIKTOK_COOKIES_PATH}'. "
            "Running without session — CAPTCHA risk is higher."
        )
        return []

    try:
        with open(TIKTOK_COOKIES_PATH, "r", encoding="utf-8") as f:
            raw = json.load(f)
    except (OSError, ValueError) as e:
        # ValueError covers both JSONDecodeError and UnicodeDecodeError.
        logging.warning(
            f"⚠️ Could not read '{TIKTOK_COOKIES_PATH}' as JSON cookies: {e}"
        )
        return []

    if isinstance(raw, dict):
        raw = raw.get("cookies", [])
    if not isinstance(raw, list):
        raw = []

    # Normalise to Playwright format
    cookies = [
        entry
        for record in raw
        if isinstance(record, dict)
        and (entry := _normalise_cookie(record)) is not None
    ]

    logging.info(
        f"🍪 Loaded {len(cookies)} TikTok cookies from {TIKTOK_COOKIES_PATH}"
    )
    return cookies


def _ytdlp_cookie_opts(work_dir: str) -> dict:
    """Build the yt-dlp ``cookiefile`` option for the configured cookie file.

    yt-dlp only reads Netscape-format cookie files and rejects JSON, so a JSON
    export is converted into a Netscape file written inside *work_dir*; the
    caller owns that directory and its cleanup. A file that does not look like
    JSON is assumed to be Netscape text already and is passed through as-is.

    Returns:
        ``{"cookiefile": path}``, or ``{}`` when no usable cookies exist.
    """
    if not os.path.exists(TIKTOK_COOKIES_PATH):
        return {}

    try:
        with open(TIKTOK_COOKIES_PATH, "r", encoding="utf-8") as f:
            head = f.read(64).lstrip()
    except (OSError, UnicodeDecodeError) as e:
        logging.warning(f"⚠️ Could not read cookie file for yt-dlp: {e}")
        return {}

    if not head.startswith(("[", "{")):
        return {"cookiefile": TIKTOK_COOKIES_PATH}

    cookies = load_tiktok_cookies()
    if not cookies:
        return {}

    lines = ["# Netscape HTTP Cookie File"]
    for cookie in cookies:
        domain = str(cookie["domain"])
        fields = [
            domain,
            # The "include subdomains" flag must agree with the leading dot.
            "TRUE" if domain.startswith(".") else "FALSE",
            str(cookie["path"]),
            "TRUE" if cookie.get("secure") else "FALSE",
            # Session cookies (missing or negative expiry) are written as 0.
            str(max(int(cookie.get("expires", 0)), 0)),
            str(cookie["name"]),
            str(cookie["value"]),
        ]
        # The format is tab/newline delimited; skip values that would break it.
        if any(ch in field for field in fields for ch in "\t\r\n"):
            continue
        lines.append("\t".join(fields))

    netscape_path = os.path.join(work_dir, "tiktok_cookies_netscape.txt")
    try:
        os.makedirs(work_dir, exist_ok=True)
        with open(netscape_path, "w", encoding="utf-8") as f:
            f.write("\n".join(lines) + "\n")
    except OSError as e:
        logging.warning(f"⚠️ Could not write cookie file for yt-dlp: {e}")
        return {}
    return {"cookiefile": netscape_path}


def _is_captcha_visible(page) -> bool:
    """Return True as soon as any CAPTCHA selector is visible on *page*.

    Errors raised while probing a selector are ignored and count as
    "not visible".
    """
    for sel in CAPTCHA_SELECTORS:
        try:
            if page.locator(sel).first.is_visible(timeout=1500):
                logging.warning(f"🚧 CAPTCHA detected via selector: {sel}")
                return True
        except Exception:
            pass
    return False


# ─────────────────────────────────────────────
# yt-dlp scraper (Option 2 — fallback)
# ─────────────────────────────────────────────
def scrape_tiktoks_via_ytdlp(target_handle: str) -> list:
    """
    Use yt-dlp as a fallback scraper when Playwright hits a CAPTCHA.

    Lists the videos of the profile without downloading them and without a
    browser. Requires: pip install yt-dlp

    Args:
        target_handle: TikTok handle, with or without the leading ``@``.

    Returns:
        Up to ``SCRAPE_VIDEO_LIMIT`` ``ScrapedTikTok`` objects. Empty when
        yt-dlp is missing or finds nothing; on an extraction error, whatever
        was collected before the error is returned.
    """
    logging.info(f"🔄 Falling back to yt-dlp scraper for @{target_handle}...")

    try:
        import yt_dlp
    except ImportError:
        logging.error(
            "❌ yt-dlp is not installed. Run: pip install yt-dlp\n"
            " Cannot scrape without Playwright session or yt-dlp."
        )
        return []

    tiktoks = []
    profile_url = f"https://www.tiktok.com/@{target_handle.lstrip('@')}"

    try:
        # The directory only hosts the converted cookie file (if any) and must
        # outlive the YoutubeDL object that reads it.
        with tempfile.TemporaryDirectory() as cookie_dir:
            ydl_opts = {
                "quiet": True,
                "no_warnings": True,
                "extract_flat": "in_playlist",  # don't download, just list URLs
                "playlistend": SCRAPE_VIDEO_LIMIT,
                "ignoreerrors": True,
                "socket_timeout": 30,
                # Pass cookies if available so yt-dlp also benefits from session
                **_ytdlp_cookie_opts(cookie_dir),
            }
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                logging.info(f"🌐 yt-dlp extracting profile: {profile_url}")
                info = ydl.extract_info(profile_url, download=False)

                if not info:
                    logging.error("❌ yt-dlp returned no info for profile.")
                    return []

                entries = info.get("entries", [])
                if not entries:
                    logging.warning("⚠️ yt-dlp found no video entries.")
                    return []

                seen_urls = set()
                for entry in entries:
                    if not entry:
                        continue
                    if len(tiktoks) >= SCRAPE_VIDEO_LIMIT:
                        break

                    url = entry.get("url") or entry.get("webpage_url") or ""
                    canonical = canonicalize_tiktok_url(url)
                    if not canonical or canonical in seen_urls:
                        continue
                    if "/video/" not in canonical:
                        continue
                    seen_urls.add(canonical)

                    # yt-dlp gives us rich metadata for free
                    title = entry.get("title", "")
                    timestamp = entry.get("timestamp")
                    thumbnail = entry.get("thumbnail", "")

                    # Use UTC on both branches so timestamps are comparable.
                    if timestamp:
                        created = arrow.Arrow.utcfromtimestamp(
                            timestamp
                        ).isoformat()
                    else:
                        created = arrow.utcnow().isoformat()

                    tiktoks.append(ScrapedTikTok(
                        created_on=created,
                        text=title,
                        video_url=canonical,
                        post_url=canonical,
                        thumbnail_url=thumbnail,
                    ))
                    logging.info(f"🎵 [yt-dlp] Scraped: {canonical}")

                logging.info(f"✅ yt-dlp scraped {len(tiktoks)} videos.")
    except Exception as e:
        logging.error(f"❌ yt-dlp scrape failed: {e}")

    return tiktoks


# ─────────────────────────────────────────────
# Playwright scraper (Option 1 — primary)
# ─────────────────────────────────────────────
def _dismiss_banners(page):
    """Click the first visible banner/consent button, then stop.

    At most one banner is dismissed per call; selector errors are ignored.
    """
    for sel in TOP_BANNER_SELECTORS + GDPR_SELECTORS:
        try:
            btn = page.locator(sel).first
            if btn.is_visible(timeout=2000):
                btn.click()
                logging.info(f"✅ Dismissed banner: {sel}")
                time.sleep(1.0)
                return
        except Exception:
            pass


def _click_retry_button(page) -> bool:
    """Click the first visible refresh/retry button; return whether one was."""
    for label in ("Actualizar", "Refresh", "Retry", "Reintentar"):
        try:
            btn = page.locator(f'button:has-text("{label}")').first
            if btn.is_visible(timeout=1500):
                btn.click()
                logging.info(f"🔁 Clicked grid retry button: {label}")
                time.sleep(2.0)
                return True
        except Exception:
            pass
    return False


def _resolve_stealth():
    """Find an installed playwright-stealth flavour (2.x preferred over 1.x).

    Returns:
        ``(apply, label)`` where ``apply(page)`` patches a page and ``label``
        is ``"2.x"`` or ``"1.x"``; ``(None, None)`` when the package is absent.
    """
    try:
        from playwright_stealth import Stealth
    except ImportError:
        pass
    else:
        stealth = Stealth()
        logging.info("🥷 playwright-stealth 2.x — stealth ON")
        return stealth.apply_stealth_sync, "2.x"

    try:
        from playwright_stealth import stealth_sync
    except ImportError:
        logging.warning("⚠️ playwright-stealth not installed — no stealth.")
        return None, None

    logging.info("🥷 playwright-stealth 1.x — stealth ON (legacy)")
    return stealth_sync, "1.x"


def _wait_for_video_links(page):
    """Wait for the profile grid, reloading between attempts.

    Returns:
        The video link locators found (possibly an empty list once all
        attempts are used up), or ``None`` when a CAPTCHA is showing.
    """
    video_links = []
    for attempt in range(1, TIKTOK_MAX_LOAD_ATTEMPTS + 1):
        logging.info(
            f"🔁 Grid load attempt {attempt}/{TIKTOK_MAX_LOAD_ATTEMPTS}..."
        )

        # CAPTCHA check on every attempt
        if _is_captcha_visible(page):
            take_error_screenshot(page, f"captcha_attempt_{attempt}")
            logging.warning(
                f"🚧 CAPTCHA on attempt {attempt} — falling back to yt-dlp."
            )
            return None

        _dismiss_banners(page)

        try:
            page.wait_for_selector(GRID_SELECTORS, timeout=15000)
            logging.info(f"✅ Grid selector found on attempt {attempt}.")
        except Exception:
            logging.warning(
                f"⚠️ Grid selector timed out on attempt {attempt}."
            )
            take_error_screenshot(page, f"grid_timeout_attempt_{attempt}")
            _click_retry_button(page)
            try:
                page.wait_for_selector(GRID_SELECTORS, timeout=10000)
            except Exception:
                pass

        time.sleep(TIKTOK_PAGE_LOAD_WAIT_S)
        video_links = page.locator(_VIDEO_LINK_SELECTOR).all()
        logging.info(
            f"📊 Attempt {attempt}: found {len(video_links)} video links."
        )

        if video_links:
            logging.info(f"✅ Got video links on attempt {attempt}.")
            break

        if attempt < TIKTOK_MAX_LOAD_ATTEMPTS:
            logging.info(
                f"🔄 No videos — reloading "
                f"(attempt {attempt + 1}/{TIKTOK_MAX_LOAD_ATTEMPTS})..."
            )
            page.reload(wait_until="domcontentloaded", timeout=40000)
            time.sleep(TIKTOK_PAGE_LOAD_WAIT_S)

    return video_links


def _parse_video_links(video_links) -> list:
    """Turn video link locators into de-duplicated ``ScrapedTikTok`` objects.

    Stops at ``SCRAPE_VIDEO_LIMIT``. A link that fails to parse is logged and
    skipped; a missing caption or thumbnail is not an error.
    """
    tiktoks = []
    seen_urls = set()
    for link in video_links:
        if len(tiktoks) >= SCRAPE_VIDEO_LIMIT:
            break
        try:
            href = link.get_attribute("href")
            if not href:
                continue

            post_url = (
                f"https://www.tiktok.com{href}"
                if href.startswith("/") else href
            )
            canonical = canonicalize_tiktok_url(post_url)
            if not canonical or canonical in seen_urls:
                continue
            if "/video/" not in canonical:
                continue
            seen_urls.add(canonical)

            caption = ""
            try:
                # ".." selects the parent element of the link.
                card = link.locator("..").first
                cap_el = card.locator(
                    '[data-e2e="video-desc"], '
                    '[class*="SpanUniqueId"], '
                    'p[class*="caption"]'
                ).first
                if cap_el.is_visible(timeout=1000):
                    caption = cap_el.inner_text()
            except Exception:
                pass

            thumbnail_url = None
            try:
                img = link.locator("img").first
                if img.is_visible(timeout=1000):
                    thumbnail_url = img.get_attribute("src")
            except Exception:
                pass

            tiktoks.append(ScrapedTikTok(
                # The grid exposes no publish date, so scrape time is recorded.
                created_on=arrow.utcnow().isoformat(),
                text=caption,
                video_url=canonical,
                post_url=canonical,
                thumbnail_url=thumbnail_url,
            ))
            logging.info(f"🎵 [Playwright] Scraped: {canonical}")

        except Exception as e:
            logging.warning(f"⚠️ Failed to parse video card: {e}")

    return tiktoks


def _scrape_profile_page(page, profile_url: str):
    """Run the scrape steps on an open page.

    Returns:
        The scraped ``ScrapedTikTok`` list, or ``None`` when the caller should
        fall back to yt-dlp (CAPTCHA shown or no video links found).
    """
    # ── 1. Navigate ────────────────────────────────────────────
    logging.info(f"🌐 Navigating to TikTok profile: {profile_url}")
    page.goto(profile_url, wait_until="domcontentloaded", timeout=40000)
    time.sleep(TIKTOK_PAGE_LOAD_WAIT_S)

    # ── 2. CAPTCHA check immediately after load ─────────────────
    if _is_captcha_visible(page):
        take_error_screenshot(page, "captcha_after_load")
        logging.warning(
            "🚧 CAPTCHA detected right after page load. "
            "Cookies may be expired — falling back to yt-dlp."
        )
        return None

    # ── 3. Dismiss banners ─────────────────────────────────────
    _dismiss_banners(page)

    # ── 4. Reload for clean grid ───────────────────────────────
    logging.info("🔄 Reloading page for clean grid render...")
    page.reload(wait_until="domcontentloaded", timeout=40000)
    time.sleep(TIKTOK_PAGE_LOAD_WAIT_S)

    # ── 5. Multi-attempt loop ──────────────────────────────────
    video_links = _wait_for_video_links(page)
    if video_links is None:
        return None

    # ── 6. Scroll to load more ─────────────────────────────────
    if video_links:
        for _ in range(TIKTOK_MAX_SCROLLS):
            page.evaluate("window.scrollBy(0, window.innerHeight * 2)")
            time.sleep(TIKTOK_SCROLL_PAUSE_S)
        video_links = page.locator(_VIDEO_LINK_SELECTOR).all()
        logging.info(f"📊 {len(video_links)} video links after scrolling.")

    # ── 7. Still nothing → yt-dlp fallback ────────────────────
    if not video_links:
        take_error_screenshot(page, "no_video_links_final")
        logging.warning(
            "⚠️ No video links found after all Playwright attempts. "
            "Falling back to yt-dlp."
        )
        return None

    # ── 8. Parse video links ───────────────────────────────────
    return _parse_video_links(video_links)


def scrape_tiktoks_via_playwright(target_handle: str) -> list:
    """
    Primary scraper: Playwright + session cookies.

    Automatically falls back to yt-dlp if a CAPTCHA is detected, no video
    links are found, or the page scrape raises. The browser is always closed
    and the Playwright driver stopped before the fallback starts.

    Args:
        target_handle: TikTok handle, with or without the leading ``@``.

    Returns:
        A list of ``ScrapedTikTok`` objects (possibly from the fallback).
    """
    profile_url = f"https://www.tiktok.com/@{target_handle.lstrip('@')}"
    cookies = load_tiktok_cookies()

    # ── Stealth: support both playwright-stealth 2.x and 1.x ──────────
    apply_stealth, stealth_label = _resolve_stealth()

    tiktoks = None
    with sync_playwright() as p:
        browser = p.chromium.launch(
            headless=True,
            args=[
                "--disable-blink-features=AutomationControlled",
                "--no-sandbox",
                "--disable-setuid-sandbox",
                "--disable-dev-shm-usage",
                "--disable-gpu",
                "--window-size=1366,768",
            ],
        )
        try:
            context = browser.new_context(
                user_agent=(
                    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                    "AppleWebKit/537.36 (KHTML, like Gecko) "
                    "Chrome/124.0.0.0 Safari/537.36"
                ),
                viewport={"width": 1366, "height": 768},
                locale="es-ES",
                timezone_id="Europe/Madrid",
                extra_http_headers={
                    "Accept-Language": "es-ES,es;q=0.9,en;q=0.8",
                    "Accept": (
                        "text/html,application/xhtml+xml,application/xml;"
                        "q=0.9,image/avif,image/webp,*/*;q=0.8"
                    ),
                    "Sec-Fetch-Dest": "document",
                    "Sec-Fetch-Mode": "navigate",
                    "Sec-Fetch-Site": "none",
                    "Sec-Ch-Ua": (
                        '"Chromium";v="124","Google Chrome";v="124",'
                        '"Not-A.Brand";v="99"'
                    ),
                    "Sec-Ch-Ua-Mobile": "?0",
                    "Sec-Ch-Ua-Platform": '"Windows"',
                },
            )

            # ── Inject session cookies BEFORE navigation ───────────────
            if cookies:
                try:
                    context.add_cookies(cookies)
                    logging.info(
                        f"🍪 Injected {len(cookies)} session cookies."
                    )
                except Exception as e:
                    # A rejected cookie must not abort the whole run.
                    logging.warning(
                        f"⚠️ Could not inject cookies ({e}) — "
                        "continuing without session."
                    )
            else:
                logging.warning(
                    "⚠️ No cookies loaded. "
                    f"Create '{TIKTOK_COOKIES_PATH}' to avoid CAPTCHAs."
                )

            page = context.new_page()

            # ── Apply stealth patches ──────────────────────────────────
            if apply_stealth is not None:
                apply_stealth(page)
                logging.info(f"🥷 Stealth patches applied ({stealth_label}).")

            page.add_init_script(_STEALTH_INIT_SCRIPT)

            try:
                tiktoks = _scrape_profile_page(page, profile_url)
            except Exception as e:
                take_error_screenshot(page, "playwright_scrape_failed")
                logging.error(f"❌ Playwright scrape failed: {e}")
                logging.info(
                    "🔄 Attempting yt-dlp fallback after Playwright error..."
                )
                tiktoks = None
        finally:
            browser.close()

    if tiktoks is None:
        return scrape_tiktoks_via_ytdlp(target_handle)

    logging.info(f"✅ [Playwright] Scraped {len(tiktoks)} videos.")
    return tiktoks


# ─────────────────────────────────────────────
# Video download (yt-dlp)
# ─────────────────────────────────────────────
def download_video_ytdlp(post_url: str, output_dir: str) -> str | None:
    """
    Download a single TikTok video using yt-dlp.

    Args:
        post_url: URL of the TikTok post.
        output_dir: Directory receiving the video (and, when a JSON cookie
            file is configured, its converted Netscape copy).

    Returns:
        The path to the downloaded file, or None on failure.
    """
    try:
        import yt_dlp
    except ImportError:
        logging.error("❌ yt-dlp not installed. Run: pip install yt-dlp")
        return None

    output_template = os.path.join(output_dir, "%(id)s.%(ext)s")
    ydl_opts = {
        "quiet": True,
        "no_warnings": True,
        "outtmpl": output_template,
        "format": (
            "mp4/bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best"
        ),
        "merge_output_format": "mp4",
        "socket_timeout": 30,
        "retries": 3,
        **_ytdlp_cookie_opts(output_dir),
    }

    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(post_url, download=True)
            if not info:
                return None
            filename = ydl.prepare_filename(info)
            # yt-dlp may change extension after merge
            for ext in ("mp4", "mkv", "webm"):
                candidate = re.sub(r"\.\w+$", f".{ext}", filename)
                if os.path.exists(candidate):
                    logging.info(f"📥 Downloaded via yt-dlp: {candidate}")
                    return candidate
            if os.path.exists(filename):
                return filename
    except Exception as e:
        logging.error(f"❌ yt-dlp download failed for {post_url}: {e}")

    return None


# ─────────────────────────────────────────────
# Video processing helpers
# ─────────────────────────────────────────────
def get_video_duration(path: str) -> float | None:
    """Return the duration in seconds reported by ffprobe, or None on failure."""
    try:
        result = subprocess.run(
            [
                "ffprobe", "-v", "error",
                "-show_entries", "format=duration",
                "-of", "default=noprint_wrappers=1:nokey=1",
                path,
            ],
            capture_output=True,
            text=True,
            timeout=FFPROBE_TIMEOUT_SECONDS,
        )
        # Empty or non-numeric output raises ValueError and yields None below.
        return float(result.stdout.strip())
    except Exception as e:
        logging.warning(f"⚠️ ffprobe failed: {e}")
        return None


def trim_video(input_path: str, output_path: str,
               max_seconds: int = VIDEO_MAX_DURATION_SECONDS) -> bool:
    """Copy the first *max_seconds* of the input into *output_path*.

    Streams are copied without re-encoding. Returns True when ffmpeg exits
    successfully, False otherwise.
    """
    try:
        subprocess.run(
            [
                "ffmpeg", "-y",
                "-i", input_path,
                "-t", str(max_seconds),
                "-c", "copy",
                output_path,
            ],
            capture_output=True,
            check=True,
            timeout=SUBPROCESS_TIMEOUT_SECONDS,
        )
        return True
    except Exception as e:
        logging.error(f"❌ ffmpeg trim failed: {e}")
        return False


def get_video_dimensions(path: str) -> tuple[int, int] | None:
    """Return ``(width, height)`` of the first video stream, or None."""
    try:
        result = subprocess.run(
            [
                "ffprobe", "-v", "error",
                "-select_streams", "v:0",
                "-show_entries", "stream=width,height",
                "-of", "csv=p=0",
                path,
            ],
            capture_output=True,
            text=True,
            timeout=FFPROBE_TIMEOUT_SECONDS,
        )
        parts = result.stdout.strip().split(",")
        if len(parts) == 2:
            return int(parts[0]), int(parts[1])
    except Exception as e:
        logging.warning(f"⚠️ Could not get video dimensions: {e}")
    return None


def extract_thumbnail(video_path: str, output_path: str) -> bool:
    """Grab the frame at the one-second mark into *output_path*.

    Returns True only when ffmpeg succeeded and the output file exists.
    """
    try:
        subprocess.run(
            [
                "ffmpeg", "-y",
                "-i", video_path,
                "-ss", "00:00:01",
                "-vframes", "1",
                "-q:v", "2",
                output_path,
            ],
            capture_output=True,
            check=True,
            timeout=FFPROBE_TIMEOUT_SECONDS,
        )
        return os.path.exists(output_path)
    except Exception as e:
        logging.warning(f"⚠️ Thumbnail extraction failed: {e}")
        return False


# ─────────────────────────────────────────────
# Bluesky helpers
# ─────────────────────────────────────────────
def bsky_login(client: Client, handle: str, password: str,
               base_url: str) -> bool:
    """Log *client* in to Bluesky, retrying with exponential backoff + jitter.

    Makes up to ``BSKY_LOGIN_MAX_RETRIES`` attempts and does not wait after
    the last one.

    Returns:
        True on success, False when every attempt failed.
    """
    for attempt in range(1, BSKY_LOGIN_MAX_RETRIES + 1):
        try:
            # NOTE(review): assumes the client honours an assigned `base_url`
            # attribute — confirm against the atproto SDK.
            client.base_url = base_url
            client.login(handle, password)
            logging.info(f"✅ Logged in to Bluesky as {handle}")
            return True
        except Exception as e:
            if attempt >= BSKY_LOGIN_MAX_RETRIES:
                logging.warning(
                    f"⚠️ Bluesky login attempt {attempt} failed: {e}."
                )
                break
            delay = min(
                BSKY_LOGIN_BASE_DELAY * (2 ** (attempt - 1))
                + random.uniform(0, BSKY_LOGIN_JITTER_MAX),
                BSKY_LOGIN_MAX_DELAY,
            )
            logging.warning(
                f"⚠️ Bluesky login attempt {attempt} failed: {e}. "
                f"Retrying in {delay:.1f}s..."
            )
            time.sleep(delay)

    logging.error("❌ All Bluesky login attempts failed.")
    return False


def upload_video_blob(client: Client, video_path: str):
    """Upload the video file as a blob, retrying with exponential backoff.

    Errors whose message mentions a rate limit, timeout or 5xx status are
    retried up to ``BSKY_BLOB_UPLOAD_MAX_RETRIES`` attempts; any other error
    stops after ``BSKY_BLOB_TRANSIENT_ERROR_RETRIES`` attempts. No wait
    happens after the final attempt.

    Returns:
        The uploaded blob reference, or None when the file is too large,
        unreadable, or every attempt failed.
    """
    size_mb = os.path.getsize(video_path) / (1024 * 1024)
    if size_mb > MAX_VIDEO_UPLOAD_SIZE_MB:
        logging.error(
            f"❌ Video too large: {size_mb:.1f} MB "
            f"(max {MAX_VIDEO_UPLOAD_SIZE_MB} MB)"
        )
        return None

    # Read once: the bytes do not change between attempts, and an unreadable
    # file will not become readable by retrying the upload.
    try:
        with open(video_path, "rb") as f:
            data = f.read()
    except OSError as e:
        logging.error(f"❌ Could not read video file {video_path}: {e}")
        return None

    for attempt in range(1, BSKY_BLOB_UPLOAD_MAX_RETRIES + 1):
        try:
            resp = client.upload_blob(data)
            logging.info(f"✅ Video blob uploaded on attempt {attempt}.")
            return resp.blob
        except Exception as e:
            err = str(e).lower()
            is_transient = any(
                k in err for k in ("rate", "timeout", "503", "502", "500")
            )
            max_attempts = (
                BSKY_BLOB_UPLOAD_MAX_RETRIES
                if is_transient
                else min(BSKY_BLOB_UPLOAD_MAX_RETRIES,
                         BSKY_BLOB_TRANSIENT_ERROR_RETRIES)
            )
            if attempt >= max_attempts:
                logging.warning(
                    f"⚠️ Blob upload attempt {attempt} failed: {e}."
                )
                break
            delay = min(
                BSKY_BLOB_UPLOAD_BASE_DELAY * (2 ** (attempt - 1)),
                BSKY_BLOB_UPLOAD_MAX_DELAY,
            )
            logging.warning(
                f"⚠️ Blob upload attempt {attempt} failed: {e}. "
                f"Retrying in {delay}s..."
            )
            time.sleep(delay)

    logging.error("❌ All blob upload attempts failed.")
    return None


def upload_thumb_blob(client: Client, thumb_path: str):
    """Upload the thumbnail as a blob; return its reference or None on error."""
    try:
        with open(thumb_path, "rb") as f:
            data = f.read()
        resp = client.upload_blob(data)
        logging.info("✅ Thumbnail blob uploaded.")
        return resp.blob
    except Exception as e:
        logging.warning(f"⚠️ Thumbnail upload failed: {e}")
        return None


def send_bsky_post(client: Client, text: str, video_blob, thumb_blob,
                   langs: list,
                   aspect_ratio: models.AppBskyEmbedDefs.AspectRatio | None,
                   alt_text: str = "") -> bool:
    """Publish a post with an embedded video, retrying with backoff.

    Args:
        client: Logged-in Bluesky client.
        text: Post text.
        video_blob: Blob reference of the uploaded video.
        thumb_blob: Blob reference of the thumbnail, or None.
        langs: Language codes attached to the post.
        aspect_ratio: Video aspect ratio, or None to omit it.
        alt_text: Alt text; shortened to ``DYNAMIC_ALT_MAX_LENGTH`` graphemes.

    Returns:
        True once the post is sent, False after
        ``BSKY_SEND_POST_MAX_RETRIES`` failed attempts.
    """
    tb = client_utils.TextBuilder()
    tb.text(text)

    video_embed = models.AppBskyEmbedVideo.Main(
        video=video_blob,
        # Cut on grapheme boundaries so no emoji/combining sequence is split.
        alt=truncate_grapheme(alt_text, DYNAMIC_ALT_MAX_LENGTH),
        thumbnail=thumb_blob,
        **({"aspectRatio": aspect_ratio} if aspect_ratio else {}),
    )

    for attempt in range(1, BSKY_SEND_POST_MAX_RETRIES + 1):
        try:
            client.send_post(
                text=tb,
                embed=video_embed,
                langs=langs,
            )
            logging.info("✅ Post sent to Bluesky.")
            return True
        except Exception as e:
            if attempt >= BSKY_SEND_POST_MAX_RETRIES:
                logging.warning(
                    f"⚠️ Send post attempt {attempt} failed: {e}."
                )
                break
            delay = min(
                BSKY_SEND_POST_BASE_DELAY * (2 ** (attempt - 1)),
                BSKY_SEND_POST_MAX_DELAY,
            )
            logging.warning(
                f"⚠️ Send post attempt {attempt} failed: {e}. "
                f"Retrying in {delay}s..."
            )
            time.sleep(delay)

    logging.error("❌ All send-post attempts failed.")
    return False


# ─────────────────────────────────────────────
# Core sync logic
# ─────────────────────────────────────────────
def already_posted(video_id: str, state: dict) -> bool:
    """Return whether *video_id* is recorded in ``state["posted_ids"]``."""
    return video_id in state.get("posted_ids", [])


def mark_posted(video_id: str, state: dict):
    """Record *video_id* in *state*, keeping only the most recent ids."""
    ids = state.setdefault("posted_ids", [])
    if video_id not in ids:
        ids.append(video_id)
    # Keep only the last N to avoid unbounded growth
    state["posted_ids"] = ids[-DEDUPE_BSKY_LIMIT * 10:]


def process_tiktok(tiktok: ScrapedTikTok, client: Client,
                   langs: list, state: dict) -> bool:
    """Download, process, and post a single TikTok video to Bluesky.

    Args:
        tiktok: The scraped post.
        client: Logged-in Bluesky client.
        langs: Language codes for the Bluesky post.
        state: Mutable state dict; updated and saved on success.

    Returns:
        True when the video was posted, False when it was skipped or any
        step failed.
    """
    if not tiktok.media:
        logging.warning("⚠️ TikTok has no media — skipping.")
        return False

    post_url = tiktok.post_url or tiktok.media[0].media_url_https
    video_id = tiktok_id_from_url(post_url)

    if already_posted(video_id, state):
        logging.info(f"⏭️ Already posted {video_id} — skipping.")
        return False

    with tempfile.TemporaryDirectory() as tmpdir:
        # ── Download video ────────────────────────────────────────────
        video_path = download_video_ytdlp(post_url, tmpdir)
        if not video_path or not os.path.exists(video_path):
            logging.error(f"❌ Could not download video: {post_url}")
            return False

        # ── Check / trim duration ─────────────────────────────────────
        duration = get_video_duration(video_path)
        if duration and duration > VIDEO_MAX_DURATION_SECONDS:
            logging.info(
                f"✂️ Video {duration:.0f}s > "
                f"{VIDEO_MAX_DURATION_SECONDS}s — trimming."
            )
            trimmed = os.path.join(tmpdir, "trimmed.mp4")
            if not trim_video(video_path, trimmed):
                logging.error("❌ Trim failed — skipping.")
                return False
            video_path = trimmed

        # ── Check file size ───────────────────────────────────────────
        size_mb = os.path.getsize(video_path) / (1024 * 1024)
        if size_mb > MAX_VIDEO_UPLOAD_SIZE_MB:
            logging.error(
                f"❌ Video still too large after trim: "
                f"{size_mb:.1f} MB — skipping."
            )
            return False

        # ── Thumbnail ─────────────────────────────────────────────────
        thumb_path = os.path.join(tmpdir, "thumb.jpg")
        if not extract_thumbnail(video_path, thumb_path):
            thumb_path = None

        # ── Aspect ratio ──────────────────────────────────────────────
        aspect_ratio = None
        dims = get_video_dimensions(video_path)
        if dims:
            w, h = dims
            aspect_ratio = models.AppBskyEmbedDefs.AspectRatio(
                width=w, height=h
            )

        # ── Upload blobs ──────────────────────────────────────────────
        video_blob = upload_video_blob(client, video_path)
        if not video_blob:
            return False

        thumb_blob = None
        if thumb_path and os.path.exists(thumb_path):
            thumb_blob = upload_thumb_blob(client, thumb_path)

        # ── Build post text ───────────────────────────────────────────
        raw_text = tiktok.text or ""
        max_chars = BSKY_TEXT_MAX_LENGTH
        if post_url:
            # Reserve room for the link plus its separator.
            max_chars -= len(post_url) + 2
        post_text = truncate_grapheme(raw_text, max_chars)
        if post_url:
            post_text = f"{post_text}\n{post_url}".strip()

        alt_text = truncate_grapheme(raw_text, DYNAMIC_ALT_MAX_LENGTH)

        # ── Send post ─────────────────────────────────────────────────
        success = send_bsky_post(
            client, post_text, video_blob, thumb_blob,
            langs, aspect_ratio, alt_text,
        )

        if success:
            mark_posted(video_id, state)
            save_state(state)
            logging.info(f"🎉 Posted {video_id} to Bluesky.")

        return success


# ─────────────────────────────────────────────
# Entry point
# ─────────────────────────────────────────────
def main():
    """Parse CLI arguments, log in to Bluesky, scrape and cross-post."""
    # Declared before any use of the name: Python rejects a `global`
    # statement that follows a use of the same name inside the function.
    global TIKTOK_COOKIES_PATH

    load_dotenv()

    parser = argparse.ArgumentParser(
        description="TikTok → Bluesky cross-poster"
    )
    parser.add_argument("--tiktok-handle", required=True)
    parser.add_argument("--bsky-handle", required=True)
    parser.add_argument("--bsky-password", required=True)
    parser.add_argument("--bsky-base-url", default=DEFAULT_BSKY_BASE_URL)
    parser.add_argument("--bsky-langs", nargs="+", default=DEFAULT_BSKY_LANGS)
    parser.add_argument(
        "--cookies-path",
        default=TIKTOK_COOKIES_PATH,
        help="Path to exported TikTok cookies JSON file.",
    )
    args = parser.parse_args()

    # Allow overriding cookie path via CLI
    TIKTOK_COOKIES_PATH = args.cookies_path

    logging.info(
        f"🤖 TikTok→Bluesky bot started. Scraping @{args.tiktok_handle}"
    )
    logging.info(
        f"🍪 Cookie file: {TIKTOK_COOKIES_PATH} "
        f"({'found' if os.path.exists(TIKTOK_COOKIES_PATH) else 'NOT FOUND'})"
    )

    state = load_state()
    client = Client()

    if not bsky_login(client, args.bsky_handle, args.bsky_password,
                      args.bsky_base_url):
        logging.error("❌ Cannot proceed without Bluesky login.")
        return

    logging.info("🔄 Starting TikTok → Bluesky sync cycle...")

    # ── Scrape (Playwright first, yt-dlp fallback is automatic) ───────
    tiktoks = scrape_tiktoks_via_playwright(args.tiktok_handle)

    if not tiktoks:
        logging.warning("⚠️ No TikTok videos found. Skipping sync.")
        logging.info("🤖 Bot finished.")
        return

    logging.info(f"📋 Found {len(tiktoks)} videos. Processing new ones...")

    posted = 0
    for tiktok in tiktoks:
        try:
            if process_tiktok(tiktok, client, args.bsky_langs, state):
                posted += 1
                # Small courtesy delay between posts
                time.sleep(random.uniform(3.0, 7.0))
        except Exception as e:
            logging.error(f"❌ Unexpected error processing video: {e}")
            continue

    logging.info(f"✅ Sync complete. Posted {posted} new video(s).")
    logging.info("🤖 Bot finished.")


if __name__ == "__main__":
    main()