import argparse import arrow import hashlib import html import io import json import logging import re import httpx import time import os import subprocess import uuid import random from urllib.parse import urlparse from dotenv import load_dotenv from atproto import Client, client_utils, models from playwright.sync_api import sync_playwright from moviepy import VideoFileClip from bs4 import BeautifulSoup from PIL import Image import grapheme # --- Configuration --- LOG_PATH = "tiktok2bsky.log" STATE_PATH = "tiktok2bsky_state.json" SCRAPE_VIDEO_LIMIT = 30 DEDUPE_BSKY_LIMIT = 30 VIDEO_MAX_AGE_DAYS = 3 BSKY_TEXT_MAX_LENGTH = 300 DEFAULT_BSKY_LANGS = ["es"] VIDEO_MAX_DURATION_SECONDS = 179 MAX_VIDEO_UPLOAD_SIZE_MB = 45 BSKY_IMAGE_MAX_BYTES = 950 * 1024 BSKY_IMAGE_MAX_DIMENSION = 2000 BSKY_IMAGE_MIN_JPEG_QUALITY = 45 EXTERNAL_THUMB_MAX_BYTES = 950 * 1024 EXTERNAL_THUMB_MAX_DIMENSION = 1200 EXTERNAL_THUMB_MIN_JPEG_QUALITY = 40 BSKY_BLOB_UPLOAD_MAX_RETRIES = 5 BSKY_BLOB_UPLOAD_BASE_DELAY = 10 BSKY_BLOB_UPLOAD_MAX_DELAY = 300 BSKY_BLOB_TRANSIENT_ERROR_RETRIES = 3 BSKY_BLOB_TRANSIENT_ERROR_DELAY = 15 BSKY_SEND_POST_MAX_RETRIES = 3 BSKY_SEND_POST_BASE_DELAY = 5 BSKY_SEND_POST_MAX_DELAY = 60 BSKY_LOGIN_MAX_RETRIES = 4 BSKY_LOGIN_BASE_DELAY = 10 BSKY_LOGIN_MAX_DELAY = 600 BSKY_LOGIN_JITTER_MAX = 1.5 MEDIA_DOWNLOAD_TIMEOUT = 30 LINK_METADATA_TIMEOUT = 10 SUBPROCESS_TIMEOUT_SECONDS = 180 FFPROBE_TIMEOUT_SECONDS = 15 DEFAULT_BSKY_BASE_URL = "https://bsky.social" SESSION_FILE_PERMISSIONS = 0o600 TIKTOK_PAGE_LOAD_WAIT_S = 5.0 TIKTOK_SCROLL_PAUSE_S = 2.5 TIKTOK_MAX_SCROLLS = 8 TIKTOK_BANNER_WAIT_S = 3.0 DYNAMIC_ALT_MAX_LENGTH = 150 TRUNCATE_MIN_PREFIX_CHARS = 20 ORPHAN_DIGIT_MAX_DIGITS = 3 # --- Top info/RGPD banner selectors (dismissed first) --- TOP_BANNER_SELECTORS = [ 'button:has-text("Entendido")', 'button:has-text("Got it")', 'button:has-text("Understood")', '[data-e2e="top-banner-close"]', '[class*="BannerContainer"] button', '[class*="DivBannerContainer"] button', ] # --- Cookie consent banner selectors (dismissed second) --- GDPR_SELECTORS = [ 'button:has-text("Permitir todas")', 'button:has-text("Rechazar cookies opcionales")', 'button:has-text("Entendido")', 'button:has-text("Aceptar todo")', 'button:has-text("Accept all")', 'button:has-text("Got it")', 'button:has-text("Decline optional")', '[data-e2e="cookie-banner-accept"]', '[id*="accept"]', '[class*="accept-btn"]', ] # --- Video grid selectors --- GRID_SELECTORS = ( '[data-e2e="user-post-item"], ' '[class*="DivItemContainerV2"], ' 'a[href*="/video/"], ' '[class*="video-feed"], ' 'div[class*="VideoFeed"], ' '[class*="DivVideoFeedV2"]' ) # --- Logging Setup --- logging.basicConfig( format="%(asctime)s [%(levelname)s] %(message)s", handlers=[ logging.FileHandler(LOG_PATH, encoding="utf-8"), logging.StreamHandler(), ], level=logging.INFO, ) # --- Per-run caches --- class _RunCache: def __init__(self): self.url_validity: dict = {} self.video_hash_owner: dict = {} self.video_url_owner: dict = {} self.locale: str = "es-ES" def clear(self): self.url_validity.clear() self.video_hash_owner.clear() self.video_url_owner.clear() _cache = _RunCache() def reset_caches(): _cache.clear() # --- Custom Classes --- class ScrapedMedia: def __init__(self, url, media_type="video"): self.type = media_type self.media_url_https = url class ScrapedTikTok: def __init__(self, created_on, text, video_url, post_url=None, thumbnail_url=None): self.created_on = created_on self.text = text self.post_url = post_url self.thumbnail_url = thumbnail_url self.media = [ScrapedMedia(video_url, "video")] if video_url else [] # --- Helpers --- def sha256_file(path, chunk_size=1024 * 1024): h = hashlib.sha256() with open(path, "rb") as f: while True: chunk = f.read(chunk_size) if not chunk: break h.update(chunk) return h.hexdigest() def grapheme_len(text): return grapheme.length(text) def remove_file_quietly(path): if path and os.path.exists(path): try: os.remove(path) logging.info(f"🧹 Removed temp file: {path}") except Exception as e: logging.warning(f"⚠️ Could not remove temp file {path}: {e}") def take_error_screenshot(page, label): timestamp = time.strftime("%Y%m%d_%H%M%S") name = f"screenshot_{label}_{timestamp}.png" try: page.screenshot(path=name) logging.info(f"πŸ“Έ Screenshot saved: {name}") except Exception as e: logging.warning(f"⚠️ Could not save screenshot: {e}") def clean_post_text(text): raw = (text or "").strip() raw = re.sub(r"\r", "\n", raw) raw = re.sub(r"\n{3,}", "\n\n", raw) return raw.strip() def normalize_post_text(text): if not text: return "" text = clean_post_text(text) text = re.sub(r"\s+", " ", text).strip() return text.lower() def truncate_text_safely(text, max_length=BSKY_TEXT_MAX_LENGTH): if grapheme_len(text) <= max_length: return text clusters = list(grapheme.graphemes(text)) truncated = "".join(clusters[:max_length]) last_space = truncated.rfind(" ") if last_space > TRUNCATE_MIN_PREFIX_CHARS: return truncated[:last_space] return truncated def extract_tiktok_video_id(post_url): if not post_url: return None match = re.search(r"/video/(\d+)", post_url) return match.group(1) if match else None def canonicalize_tiktok_url(url): if not url: return None match = re.search( r"https?://(?:www\.)?tiktok\.com/@([^/]+)/video/(\d+)", url, re.IGNORECASE, ) if match: return f"https://www.tiktok.com/@{match.group(1)}/video/{match.group(2)}" return url.strip() def make_unique_video_temp_base(post_url=None): video_id = extract_tiktok_video_id(post_url) or "unknown" ts_ms = int(time.time() * 1000) rand = uuid.uuid4().hex[:8] base = f"temp_tiktok_{video_id}_{ts_ms}_{rand}" logging.info(f"🎞️ Using unique temp video base: {base}") return base def build_media_fingerprint(tiktok): if not tiktok or not tiktok.media: return "no-media" parts = [] for media in tiktok.media: media_url = getattr(media, "media_url_https", "") or "" stable = canonicalize_tiktok_url(tiktok.post_url) or media_url parts.append(f"video:{stable}") parts.sort() raw = "|".join(parts) return hashlib.sha256(raw.encode("utf-8")).hexdigest() def build_bsky_media_fingerprint(post_view): try: embed = getattr(post_view, "embed", None) if not embed: return "no-media" parts = [] video = getattr(embed, "video", None) if video: ref = getattr(video, "ref", None) or getattr(video, "cid", None) or str(video) parts.append(f"video:{ref}") if not parts: return "no-media" parts.sort() return hashlib.sha256("|".join(parts).encode("utf-8")).hexdigest() except Exception as e: logging.debug(f"Could not build Bluesky media fingerprint: {e}") return "no-media" def build_text_media_key(normalized_text, media_fingerprint): return hashlib.sha256( f"{normalized_text}||{media_fingerprint}".encode("utf-8") ).hexdigest() # --- Bluesky login / retry helpers --- def is_rate_limited_error(e): t = repr(e).lower() return "429" in t or "ratelimitexceeded" in t or "too many requests" in t def is_auth_error(e): t = repr(e).lower() return "401" in t or "403" in t or "invalid identifier" in t def is_transient_error(e): signals = [ "InvokeTimeoutError", "ReadTimeout", "WriteTimeout", "RemoteProtocolError", "ConnectError", "503", "502", "504", ] return any(s in repr(e) for s in signals) def is_network_error(e): signals = [ "ConnectError", "RemoteProtocolError", "ReadTimeout", "WriteTimeout", "TimeoutException", "503", "502", "504", ] return any(s in repr(e) for s in signals) def get_rate_limit_wait_seconds(e, default_delay): try: headers = getattr(e, "headers", None) or {} ra = headers.get("retry-after") or headers.get("Retry-After") if ra: return min(max(int(ra), 1), BSKY_LOGIN_MAX_DELAY) except Exception: pass return default_delay def create_bsky_client(base_url, handle, password): normalized = (base_url or DEFAULT_BSKY_BASE_URL).strip().rstrip("/") client = Client(base_url=normalized) for attempt in range(1, BSKY_LOGIN_MAX_RETRIES + 1): try: client.login(handle, password) logging.info("βœ… Bluesky login successful.") return client except Exception as e: if is_auth_error(e): raise if attempt < BSKY_LOGIN_MAX_RETRIES: wait = min(BSKY_LOGIN_BASE_DELAY * attempt, BSKY_LOGIN_MAX_DELAY) wait += random.uniform(0, BSKY_LOGIN_JITTER_MAX) logging.warning(f"⏳ Bluesky login retry {attempt} in {wait:.1f}s: {e}") time.sleep(wait) continue raise raise RuntimeError("Bluesky login failed after all retries.") # --- State management --- def default_state(): return { "version": 1, "posted_videos": {}, "posted_by_bsky_uri": {}, "updated_at": None, } def load_state(state_path=STATE_PATH): if not os.path.exists(state_path): return default_state() try: with open(state_path, "r", encoding="utf-8") as f: state = json.load(f) state.setdefault("version", 1) state.setdefault("posted_videos", {}) state.setdefault("posted_by_bsky_uri", {}) state.setdefault("updated_at", None) return state except Exception as e: logging.warning(f"⚠️ Could not load state: {e}. Reinitializing.") return default_state() def save_state(state, state_path=STATE_PATH): try: state["updated_at"] = arrow.utcnow().isoformat() temp = f"{state_path}.tmp" with open(temp, "w", encoding="utf-8") as f: json.dump(state, f, ensure_ascii=False, indent=2, sort_keys=True) os.replace(temp, state_path) logging.info(f"πŸ’Ύ State saved to {state_path}") except Exception as e: logging.error(f"❌ Failed to save state: {e}") def remember_posted_video(state, candidate, bsky_uri=None): key = candidate.get("canonical_post_url") or f"textmedia:{candidate['text_media_key']}" state["posted_videos"][key] = { "canonical_post_url": candidate.get("canonical_post_url"), "normalized_text": candidate["normalized_text"], "text_media_key": candidate["text_media_key"], "media_fingerprint": candidate["media_fingerprint"], "bsky_uri": bsky_uri, "video_created_on": candidate["tiktok"].created_on, "post_url": candidate["tiktok"].post_url, "video_id": candidate.get("video_id"), "posted_at": arrow.utcnow().isoformat(), } if bsky_uri: state["posted_by_bsky_uri"][bsky_uri] = key def candidate_matches_state(candidate, state): canonical_url = candidate["canonical_post_url"] text_media_key = candidate["text_media_key"] normalized_text = candidate["normalized_text"] posted = state.get("posted_videos", {}) if canonical_url and canonical_url in posted: return True, "state:post_url" for rec in posted.values(): if rec.get("text_media_key") == text_media_key: return True, "state:text_media_fingerprint" for rec in posted.values(): if rec.get("normalized_text") == normalized_text and normalized_text: return True, "state:normalized_text" return False, None def prune_state(state, max_entries=5000): posted = state.get("posted_videos", {}) if len(posted) <= max_entries: return state sortable = sorted( posted.items(), key=lambda x: x[1].get("posted_at", ""), reverse=True, ) keep = {k for k, _ in sortable[:max_entries]} state["posted_videos"] = {k: v for k, v in posted.items() if k in keep} state["posted_by_bsky_uri"] = { uri: k for uri, k in state.get("posted_by_bsky_uri", {}).items() if k in keep } return state # --- Bluesky feed helpers --- def get_recent_bsky_posts(client, handle, limit=30): recent = [] try: timeline = client.get_author_feed(handle, limit=limit) for item in timeline.feed: try: if item.reason is not None: continue record = item.post.record if getattr(record, "reply", None) is not None: continue text = getattr(record, "text", "") or "" normalized = normalize_post_text(text) media_fp = build_bsky_media_fingerprint(item.post) recent.append({ "uri": getattr(item.post, "uri", None), "normalized_text": normalized, "media_fingerprint": media_fp, "text_media_key": build_text_media_key(normalized, media_fp), }) except Exception as e: logging.debug(f"Skipping feed item: {e}") except Exception as e: logging.warning(f"⚠️ Could not fetch recent Bluesky posts: {e}") return recent def candidate_matches_existing_bsky(candidate, recent_bsky_posts): for existing in recent_bsky_posts: if candidate["text_media_key"] == existing["text_media_key"]: return True, "bsky:text_media_fingerprint" if ( candidate["normalized_text"] and candidate["normalized_text"] == existing["normalized_text"] ): return True, "bsky:normalized_text" return False, None # --- Upload / blob helpers --- def upload_blob_with_retry(client, binary_data, media_label="media"): last_exception = None transient_attempts = 0 for attempt in range(1, BSKY_BLOB_UPLOAD_MAX_RETRIES + 1): try: result = client.upload_blob(binary_data) return result.blob except Exception as e: last_exception = e if "429" in str(e) or "RateLimitExceeded" in str(e): wait = min( BSKY_BLOB_UPLOAD_BASE_DELAY * (2 ** (attempt - 1)), BSKY_BLOB_UPLOAD_MAX_DELAY, ) if attempt < BSKY_BLOB_UPLOAD_MAX_RETRIES: logging.warning( f"⏳ Blob upload rate-limited. Retry {attempt} after {wait}s." ) time.sleep(wait) continue break if ( is_transient_error(e) and transient_attempts < BSKY_BLOB_TRANSIENT_ERROR_RETRIES ): transient_attempts += 1 wait = BSKY_BLOB_TRANSIENT_ERROR_DELAY * transient_attempts logging.warning( f"⏳ Transient blob upload error. Retry {transient_attempts} after {wait}s." ) time.sleep(wait) continue logging.warning(f"Could not upload {media_label}: {repr(e)}") return None logging.warning(f"Could not upload {media_label}: {repr(last_exception)}") return None def send_post_with_retry(client, **kwargs): last_exception = None for attempt in range(1, BSKY_SEND_POST_MAX_RETRIES + 1): try: return client.send_post(**kwargs) except Exception as e: last_exception = e if "429" in str(e) or "RateLimitExceeded" in str(e): wait = min( BSKY_SEND_POST_BASE_DELAY * (2 ** (attempt - 1)), BSKY_SEND_POST_MAX_DELAY, ) if attempt < BSKY_SEND_POST_MAX_RETRIES: time.sleep(wait) continue raise if is_transient_error(e) and attempt < BSKY_SEND_POST_MAX_RETRIES: time.sleep(BSKY_SEND_POST_BASE_DELAY * attempt) continue raise raise last_exception def get_blob_from_file(file_path, client): try: if not os.path.exists(file_path): logging.warning(f"File not found: {file_path}") return None size_mb = os.path.getsize(file_path) / (1024 * 1024) if size_mb > MAX_VIDEO_UPLOAD_SIZE_MB: logging.warning( f"File too large: {size_mb:.2f} MB > {MAX_VIDEO_UPLOAD_SIZE_MB} MB" ) return None with open(file_path, "rb") as f: data = f.read() return upload_blob_with_retry(client, data, media_label=file_path) except Exception as e: logging.warning(f"Could not upload file {file_path}: {repr(e)}") return None def build_video_embed(video_blob, alt_text): try: return models.AppBskyEmbedVideo.Main(video=video_blob, alt=alt_text) except AttributeError: logging.error( "❌ atproto version does not support AppBskyEmbedVideo. Upgrade atproto." ) return None def build_dynamic_alt(text): alt = clean_post_text(text or "").replace("\n", " ").strip() alt = re.sub(r"(?:https?://|www\.)\S+", "", alt).strip() if not alt: alt = "TikTok video" return alt[:DYNAMIC_ALT_MAX_LENGTH] def make_rich(content): text_builder = client_utils.TextBuilder() content = clean_post_text(content) lines = content.splitlines() for line_idx, line in enumerate(lines): if not line.strip(): if line_idx < len(lines) - 1: text_builder.text("\n") continue words = line.split(" ") for i, word in enumerate(words): if not word: if i < len(words) - 1: text_builder.text(" ") continue if word.startswith("#") and len(word) > 1: tag = word[1:].rstrip(".,;:!?)'\"") if tag: text_builder.tag(word, tag) else: text_builder.text(word) elif word.startswith(("http://", "https://")): text_builder.link(word, word) else: text_builder.text(word) if i < len(words) - 1: text_builder.text(" ") if line_idx < len(lines) - 1: text_builder.text("\n") return text_builder # --- TikTok Scraping --- def _dismiss_banners(page): """ Dismiss all TikTok banners in the correct order: 1. Top RGPD/info banner ("Entendido") 2. Cookie consent modal ("Permitir todas" / "Accept all" / etc.) Returns True if at least one banner was dismissed. """ any_dismissed = False # ── Step 1: Top RGPD info banner ──────────────────────────────────── for selector in TOP_BANNER_SELECTORS: try: btn = page.locator(selector).first if btn.is_visible(timeout=2000): btn.click() logging.info(f"βœ… Dismissed top banner: {selector}") time.sleep(1) any_dismissed = True break except Exception: pass # ── Step 2: Cookie consent modal ──────────────────────────────────── for selector in GDPR_SELECTORS: try: btn = page.locator(selector).first if btn.is_visible(timeout=3000): btn.click() logging.info(f"βœ… Dismissed cookie banner: {selector}") time.sleep(TIKTOK_BANNER_WAIT_S) any_dismissed = True break except Exception: pass if not any_dismissed: logging.info("ℹ️ No banners found β€” continuing.") return any_dismissed def scrape_tiktoks_via_playwright(target_handle: str, locale: str = "es-ES") -> list: """ Scrape recent TikTok videos from a public profile using Playwright. No login required for public profiles. Banner-handling strategy (fixes applied): 1. Dismiss top RGPD info banner ("Entendido") first. 2. Dismiss cookie consent modal ("Permitir todas" / etc.) second. 3. Reload the page after all banners are dismissed so TikTok renders the video grid cleanly (avoids "Hubo un problema"). 4. playwright-stealth applied before navigation when available. 5. Broader grid selector list + 30 s timeout + soft-fail on timeout. """ tiktoks = [] profile_url = f"https://www.tiktok.com/@{target_handle.lstrip('@')}" # playwright-stealth β€” optional but strongly recommended try: from playwright_stealth import stealth_sync USE_STEALTH = True logging.info("πŸ₯· playwright-stealth available β€” stealth mode ON") except ImportError: USE_STEALTH = False logging.warning( "⚠️ playwright-stealth not installed β€” running without stealth. " "Run: pip install playwright-stealth" ) with sync_playwright() as p: browser = p.chromium.launch( headless=True, args=[ "--disable-blink-features=AutomationControlled", "--no-sandbox", "--disable-setuid-sandbox", "--disable-dev-shm-usage", "--disable-gpu", "--window-size=1366,768", ], ) context = browser.new_context( user_agent=( "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " "AppleWebKit/537.36 (KHTML, like Gecko) " "Chrome/124.0.0.0 Safari/537.36" ), viewport={"width": 1366, "height": 768}, locale="es-ES", timezone_id="Europe/Madrid", extra_http_headers={ "Accept-Language": "es-ES,es;q=0.9,en;q=0.8", "Accept": ( "text/html,application/xhtml+xml,application/xml;" "q=0.9,image/avif,image/webp,*/*;q=0.8" ), "Sec-Fetch-Dest": "document", "Sec-Fetch-Mode": "navigate", "Sec-Fetch-Site": "none", "Sec-Ch-Ua": ( '"Chromium";v="124", "Google Chrome";v="124", ' '"Not-A.Brand";v="99"' ), "Sec-Ch-Ua-Mobile": "?0", "Sec-Ch-Ua-Platform": '"Windows"', }, ) page = context.new_page() if USE_STEALTH: stealth_sync(page) logging.info("πŸ₯· Stealth patches applied.") page.add_init_script(""" Object.defineProperty(navigator, 'webdriver', { get: () => undefined }); Object.defineProperty(navigator, 'plugins', { get: () => [ { name: 'Chrome PDF Plugin' }, { name: 'Chrome PDF Viewer' }, { name: 'Native Client' } ] }); Object.defineProperty(navigator, 'languages', { get: () => ['es-ES', 'es', 'en'] }); window.chrome = { runtime: {}, loadTimes: function() {}, csi: function() {}, app: {} }; """) try: # ── Initial navigation ─────────────────────────────────────── logging.info(f"🌐 Navigating to TikTok profile: {profile_url}") page.goto(profile_url, wait_until="domcontentloaded", timeout=40000) time.sleep(TIKTOK_PAGE_LOAD_WAIT_S) # ── Dismiss all banners ────────────────────────────────────── _dismiss_banners(page) # ── Reload for a clean grid render ─────────────────────────── # TikTok renders "Hubo un problema" when the page first loaded # with banners present. A fresh reload after banner dismissal # gives TikTok a clean state and the grid renders correctly. logging.info("πŸ”„ Reloading page after banner dismissal for clean grid render...") page.reload(wait_until="domcontentloaded", timeout=40000) time.sleep(TIKTOK_PAGE_LOAD_WAIT_S) # ── Dismiss any banners that reappear after reload ─────────── _dismiss_banners(page) # ── Wait for video grid ────────────────────────────────────── try: page.wait_for_selector(GRID_SELECTORS, timeout=30000) logging.info("βœ… TikTok video grid detected.") except Exception: logging.warning( "⚠️ Grid selector timed out after 30s β€” " "attempting scroll anyway (grid may be partially loaded)" ) take_error_screenshot(page, "tiktok_grid_timeout") # ── Scroll to load more videos ─────────────────────────────── for scroll_i in range(TIKTOK_MAX_SCROLLS): page.evaluate("window.scrollBy(0, window.innerHeight * 2)") time.sleep(TIKTOK_SCROLL_PAUSE_S) logging.info(f"πŸ“œ Scroll {scroll_i + 1}/{TIKTOK_MAX_SCROLLS}") # ── Collect video links ────────────────────────────────────── video_links = page.locator('a[href*="/video/"]').all() logging.info( f"πŸ“Š Found {len(video_links)} video links. " f"Parsing up to {SCRAPE_VIDEO_LIMIT}..." ) if not video_links: take_error_screenshot(page, "tiktok_no_video_links") logging.error( "❌ No video links found after scroll. " "TikTok may still be blocking β€” check screenshot." ) browser.close() return [] seen_urls = set() for link in video_links: if len(tiktoks) >= SCRAPE_VIDEO_LIMIT: break try: href = link.get_attribute("href") if not href: continue post_url = ( f"https://www.tiktok.com{href}" if href.startswith("/") else href ) canonical = canonicalize_tiktok_url(post_url) if not canonical or canonical in seen_urls: continue if "/video/" not in canonical: continue seen_urls.add(canonical) # Caption caption = "" try: card = link.locator("..").first caption_el = card.locator( '[data-e2e="video-desc"], ' '[class*="SpanUniqueId"], ' 'p[class*="caption"]' ).first if caption_el.is_visible(timeout=1000): caption = caption_el.inner_text() except Exception: pass # Thumbnail thumbnail_url = None try: img = link.locator("img").first if img.is_visible(timeout=1000): thumbnail_url = img.get_attribute("src") except Exception: pass tiktoks.append( ScrapedTikTok( created_on=arrow.utcnow().isoformat(), text=caption, video_url=canonical, post_url=canonical, thumbnail_url=thumbnail_url, ) ) logging.info(f"🎡 Scraped TikTok: {canonical}") except Exception as e: logging.warning(f"⚠️ Failed to parse video card: {e}") continue except Exception as e: take_error_screenshot(page, "tiktok_scrape_failed") logging.error(f"❌ Failed to scrape TikTok profile: {e}") browser.close() logging.info(f"βœ… Scraped {len(tiktoks)} TikTok videos.") return tiktoks # --- Video URL extraction --- def extract_tiktok_video_url_isolated(browser, post_url: str, video_id: str = None): """ Open a single TikTok video page in an isolated context and intercept the actual MP4/HLS stream URL from network responses. """ ctx = None page = None best_mp4_url = None best_m3u8_url = None seen_urls = set() def current_best(): return best_mp4_url or best_m3u8_url def handle_response(response): nonlocal best_mp4_url, best_m3u8_url try: url = response.url if not url or url in seen_urls: return seen_urls.add(url) content_type = (response.headers.get("content-type") or "").lower() url_l = url.lower() if ".m4s" in url_l or "/aud/" in url_l or "mp4a" in url_l: return if ".m3u8" in url_l or "mpegurl" in content_type: if best_m3u8_url is None: best_m3u8_url = url return if ".mp4" in url_l or "video/mp4" in content_type: if best_mp4_url is None: best_mp4_url = url return except Exception as e: logging.debug(f"Response parse error: {e}") try: ctx = browser.new_context( user_agent=( "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " "AppleWebKit/537.36 (KHTML, like Gecko) " "Chrome/124.0.0.0 Safari/537.36" ), viewport={"width": 1366, "height": 768}, ) page = ctx.new_page() page.on("response", handle_response) logging.info(f"[video_id={video_id}] 🎬 Opening TikTok video page: {post_url}") page.goto(post_url, wait_until="domcontentloaded", timeout=40000) time.sleep(2) for selector in ['[data-e2e="video-player"]', "video", '[class*="Video"]']: try: player = page.locator(selector).first if player.count() > 0: player.click(force=True, timeout=3000) break except Exception: pass for _ in range(10): if current_best(): break time.sleep(1) selected = current_best() logging.info(f"[video_id={video_id}] βœ… Resolved video URL: {selected}") return selected except Exception as e: logging.warning(f"[video_id={video_id}] ⚠️ Could not extract video URL: {e}") return None finally: try: if page: page.remove_listener("response", handle_response) page.close() except Exception: pass try: if ctx: ctx.close() except Exception: pass # --- Video download + compress --- def _probe_video_duration(file_path): result = subprocess.run( [ "ffprobe", "-v", "error", "-show_entries", "format=duration", "-of", "default=noprint_wrappers=1:nokey=1", file_path, ], capture_output=True, text=True, timeout=FFPROBE_TIMEOUT_SECONDS, ) if result.returncode != 0: raise RuntimeError(f"ffprobe error: {result.stderr.strip()}") duration_str = result.stdout.strip() if not duration_str: raise RuntimeError("ffprobe returned empty duration") return float(duration_str) def download_and_crop_video(video_url: str, output_path: str): temp_input = output_path.replace(".mp4", "_source.mp4") temp_trimmed = output_path.replace(".mp4", "_trimmed.mp4") temp_output = output_path.replace(".mp4", "_compressed.mp4") try: logging.info(f"⬇️ Downloading TikTok video: {video_url}") url_l = video_url.lower() if ".m3u8" in url_l: download_cmd = [ "ffmpeg", "-y", "-protocol_whitelist", "file,http,https,tcp,tls,crypto", "-allowed_extensions", "ALL", "-i", video_url, "-c", "copy", temp_input, ] else: download_cmd = [ "ffmpeg", "-y", "-i", video_url, "-c", "copy", temp_input, ] result = subprocess.run( download_cmd, capture_output=True, text=True, timeout=SUBPROCESS_TIMEOUT_SECONDS, ) if result.returncode != 0: logging.error(f"❌ ffmpeg download failed:\n{result.stderr}") return None if not os.path.exists(temp_input) or os.path.getsize(temp_input) == 0: logging.error("❌ Downloaded file is missing or empty.") return None duration = _probe_video_duration(temp_input) if duration <= 0: logging.error("❌ Invalid video duration.") return None end_time = min(VIDEO_MAX_DURATION_SECONDS, duration) end_time = min(end_time, duration - 0.05) end_time = max(end_time, 0.1) video_clip = VideoFileClip(temp_input) try: if hasattr(video_clip, "subclipped"): cropped = video_clip.subclipped(0, end_time) else: cropped = video_clip.subclip(0, end_time) try: cropped.write_videofile( temp_trimmed, codec="libx264", audio_codec="aac", preset="veryfast", bitrate="1800k", audio_bitrate="128k", logger=None, ) finally: cropped.close() finally: video_clip.close() if not os.path.exists(temp_trimmed) or os.path.getsize(temp_trimmed) == 0: logging.error("❌ Trimmed video is missing or empty.") return None compress_cmd = [ "ffmpeg", "-y", "-i", temp_trimmed, "-vf", "scale='min(720,iw)':-2", "-c:v", "libx264", "-preset", "veryfast", "-crf", "30", "-maxrate", "1800k", "-bufsize", "3600k", "-c:a", "aac", "-b:a", "128k", "-movflags", "+faststart", temp_output, ] result = subprocess.run( compress_cmd, capture_output=True, text=True, timeout=SUBPROCESS_TIMEOUT_SECONDS, ) if result.returncode != 0: logging.error(f"❌ ffmpeg compression failed:\n{result.stderr}") return None if not os.path.exists(temp_output) or os.path.getsize(temp_output) == 0: logging.error("❌ Compressed video is missing or empty.") return None os.replace(temp_output, output_path) size_mb = os.path.getsize(output_path) / (1024 * 1024) logging.info(f"βœ… Video ready: {output_path} ({size_mb:.2f} MB)") return output_path except subprocess.TimeoutExpired: logging.error(f"❌ ffmpeg timed out after {SUBPROCESS_TIMEOUT_SECONDS}s") return None except Exception as e: logging.error(f"❌ Video processing error: {repr(e)}") return None finally: remove_file_quietly(temp_input) remove_file_quietly(temp_trimmed) remove_file_quietly(temp_output) # --- Main sync logic --- def sync_feeds(args): logging.info("πŸ”„ Starting TikTok β†’ Bluesky sync cycle...") dry_run = getattr(args, "dry_run", False) bsky_langs = getattr(args, "bsky_langs", None) or DEFAULT_BSKY_LANGS if dry_run: logging.info("πŸ§ͺ DRY RUN MODE β€” no posts will be created on Bluesky.") try: state = load_state(STATE_PATH) state = prune_state(state, max_entries=5000) tiktoks = scrape_tiktoks_via_playwright( args.tiktok_handle, locale=bsky_langs[0] if bsky_langs else "es-ES", ) if not tiktoks: logging.warning("⚠️ No TikTok videos found. Skipping sync.") return bsky_client = None if not dry_run: bsky_client = create_bsky_client( args.bsky_base_url, args.bsky_handle, args.bsky_password, ) recent_bsky_posts = [] if not dry_run: recent_bsky_posts = get_recent_bsky_posts( bsky_client, args.bsky_handle, limit=DEDUPE_BSKY_LIMIT, ) # --- Build candidates --- candidates = [] for tiktok in reversed(tiktoks): try: canonical_url = canonicalize_tiktok_url(tiktok.post_url) if canonical_url and canonical_url in state.get("posted_videos", {}): logging.info(f"⚑ Early skip (already in state): {canonical_url}") continue text = clean_post_text(tiktok.text or "") normalized_text = normalize_post_text(text) media_fp = build_media_fingerprint(tiktok) text_media_key = build_text_media_key(normalized_text, media_fp) video_id = extract_tiktok_video_id(tiktok.post_url) candidate = { "tiktok": tiktok, "raw_text": truncate_text_safely(text), "normalized_text": normalized_text, "media_fingerprint": media_fp, "text_media_key": text_media_key, "canonical_post_url": canonical_url, "video_id": video_id, "resolved_video_url": None, "resolved_video_hash": None, } is_dup_state, reason = candidate_matches_state(candidate, state) if is_dup_state: logging.info( f"⏭️ Skipping (state duplicate: {reason}): {canonical_url}" ) continue is_dup_bsky, reason = candidate_matches_existing_bsky( candidate, recent_bsky_posts ) if is_dup_bsky: logging.info( f"⏭️ Skipping (Bluesky duplicate: {reason}): {canonical_url}" ) continue candidates.append(candidate) except Exception as e: logging.warning(f"⚠️ Failed to prepare candidate: {e}") logging.info( f"πŸ“¬ {len(candidates)} new TikTok videos to post after dedup." ) if not candidates: logging.info("βœ… Nothing new to post.") return # --- Pre-resolve video URLs --- with sync_playwright() as p_pre: pre_browser = p_pre.chromium.launch( headless=True, args=["--disable-blink-features=AutomationControlled"], ) try: for c in candidates: c["resolved_video_url"] = extract_tiktok_video_url_isolated( pre_browser, c["tiktok"].post_url, video_id=c.get("video_id"), ) finally: pre_browser.close() # --- Post to Bluesky --- new_posts = 0 for candidate in candidates: tiktok = candidate["tiktok"] raw_text = candidate["raw_text"] logging.info( f"πŸ“ {'[DRY RUN] Would post' if dry_run else 'Posting'} " f"TikTok video: {tiktok.post_url}" ) if dry_run: logging.info(f" πŸ“„ Caption: {raw_text[:200]}") remember_posted_video( state, candidate, bsky_uri=f"dry-run:{arrow.utcnow().isoformat()}", ) save_state(state, STATE_PATH) new_posts += 1 continue real_video_url = candidate.get("resolved_video_url") video_embed = None if real_video_url: temp_base = make_unique_video_temp_base(tiktok.post_url) temp_path = f"{temp_base}.mp4" try: cropped_path = download_and_crop_video(real_video_url, temp_path) if cropped_path: video_hash = sha256_file(cropped_path) candidate["resolved_video_hash"] = video_hash owner = _cache.video_hash_owner.get(video_hash) if owner and owner != candidate["video_id"]: logging.warning( "⚠️ Video hash owned by another video. Skipping." ) else: _cache.video_hash_owner[video_hash] = candidate["video_id"] video_blob = get_blob_from_file(cropped_path, bsky_client) if video_blob: alt = build_dynamic_alt(raw_text) video_embed = build_video_embed(video_blob, alt) finally: remove_file_quietly(temp_path) remove_file_quietly(f"{temp_base}_source.mp4") remove_file_quietly(f"{temp_base}_trimmed.mp4") remove_file_quietly(f"{temp_base}_compressed.mp4") else: logging.warning( f"⚠️ Could not resolve video URL for {tiktok.post_url}" ) try: rich_text = make_rich(raw_text) if video_embed: post_result = send_post_with_retry( bsky_client, text=rich_text, embed=video_embed, langs=bsky_langs, ) post_mode = "video" else: fallback_text = make_rich( f"{raw_text}\n\n{tiktok.post_url}".strip() ) post_result = send_post_with_retry( bsky_client, text=fallback_text, langs=bsky_langs, ) post_mode = "text_only_fallback" bsky_uri = getattr(post_result, "uri", None) remember_posted_video(state, candidate, bsky_uri=bsky_uri) state = prune_state(state, max_entries=5000) save_state(state, STATE_PATH) recent_bsky_posts.insert(0, { "uri": bsky_uri, "normalized_text": candidate["normalized_text"], "media_fingerprint": candidate["media_fingerprint"], "text_media_key": candidate["text_media_key"], }) recent_bsky_posts = recent_bsky_posts[:DEDUPE_BSKY_LIMIT] new_posts += 1 logging.info( f"βœ… Posted TikTok to Bluesky [{post_mode}]: {raw_text[:80]}" ) time.sleep(5) except Exception as e: logging.error(f"❌ Failed to post to Bluesky: {e}") logging.info(f"βœ… Sync complete. Posted {new_posts} new TikTok videos.") except Exception as e: logging.error(f"❌ Error during sync cycle: {e}") def main(): load_dotenv() parser = argparse.ArgumentParser(description="TikTok to Bluesky Sync") parser.add_argument( "--tiktok-handle", help="TikTok account handle to scrape (without @)", ) parser.add_argument("--bsky-handle", help="Your Bluesky handle") parser.add_argument("--bsky-password", help="Your Bluesky app password") parser.add_argument( "--bsky-base-url", help="Bluesky PDS base URL", default=None, ) parser.add_argument( "--bsky-langs", help="Comma-separated language codes (e.g. es,en)", default=None, ) parser.add_argument("--dry-run", action="store_true", default=False) args = parser.parse_args() args.tiktok_handle = args.tiktok_handle or os.getenv("TIKTOK_HANDLE") args.bsky_handle = args.bsky_handle or os.getenv("BSKY_HANDLE") args.bsky_password = args.bsky_password or os.getenv("BSKY_APP_PASSWORD") args.bsky_base_url = ( args.bsky_base_url or os.getenv("BSKY_BASE_URL") or DEFAULT_BSKY_BASE_URL ) raw_langs = args.bsky_langs or os.getenv("BSKY_LANGS") args.bsky_langs = ( [l.strip() for l in raw_langs.split(",") if l.strip()] if raw_langs else DEFAULT_BSKY_LANGS ) missing = [] if not args.tiktok_handle: missing.append("--tiktok-handle / TIKTOK_HANDLE") if not args.bsky_handle: missing.append("--bsky-handle / BSKY_HANDLE") if not args.bsky_password: missing.append("--bsky-password / BSKY_APP_PASSWORD") if missing: logging.error(f"❌ Missing required arguments: {', '.join(missing)}") return logging.info(f"πŸ€– TikTokβ†’Bluesky bot started. Scraping @{args.tiktok_handle}") reset_caches() sync_feeds(args) logging.info("πŸ€– Bot finished.") if __name__ == "__main__": main()