"""Sync recent TikTok videos from a public profile to a Bluesky account.

Pipeline: scrape the profile grid with Playwright, de-duplicate against a
local JSON state file and the recent Bluesky feed, resolve and transcode
each video with ffmpeg/moviepy, then publish it (or a text+link fallback).
"""

import argparse
import hashlib
import html
import io
import json
import logging
import os
import random
import re
import subprocess
import time
import uuid
from urllib.parse import urlparse

import arrow
import grapheme
import httpx
from atproto import Client, client_utils, models
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from moviepy import VideoFileClip
from PIL import Image
from playwright.sync_api import sync_playwright

# --- Configuration ---
LOG_PATH = "tiktok2bsky.log"
STATE_PATH = "tiktok2bsky_state.json"
STATE_MAX_ENTRIES = 5000
DEDUPE_BSKY_LIMIT = 30
VIDEO_MAX_AGE_DAYS = 3
BSKY_TEXT_MAX_LENGTH = 300
DEFAULT_BSKY_LANGS = ["ca"]
TIKTOK_PAGE_LOAD_WAIT_S = 5.0  # was 3.0 — increased for slower grid render
TIKTOK_SCROLL_PAUSE_S = 2.5  # pause between scrolls to let videos load
TIKTOK_MAX_SCROLLS = 8  # was 5 — more scrolls = more videos discovered
SCRAPE_VIDEO_LIMIT = 30  # was 15
VIDEO_MAX_DURATION_SECONDS = 179
MAX_VIDEO_UPLOAD_SIZE_MB = 45
BSKY_IMAGE_MAX_BYTES = 950 * 1024
BSKY_IMAGE_MAX_DIMENSION = 2000
BSKY_IMAGE_MIN_JPEG_QUALITY = 45
EXTERNAL_THUMB_MAX_BYTES = 950 * 1024
EXTERNAL_THUMB_MAX_DIMENSION = 1200
EXTERNAL_THUMB_MIN_JPEG_QUALITY = 40
BSKY_BLOB_UPLOAD_MAX_RETRIES = 5
BSKY_BLOB_UPLOAD_BASE_DELAY = 10
BSKY_BLOB_UPLOAD_MAX_DELAY = 300
BSKY_BLOB_TRANSIENT_ERROR_RETRIES = 3
BSKY_BLOB_TRANSIENT_ERROR_DELAY = 15
BSKY_SEND_POST_MAX_RETRIES = 3
BSKY_SEND_POST_BASE_DELAY = 5
BSKY_SEND_POST_MAX_DELAY = 60
BSKY_LOGIN_MAX_RETRIES = 4
BSKY_LOGIN_BASE_DELAY = 10
BSKY_LOGIN_MAX_DELAY = 600
BSKY_LOGIN_JITTER_MAX = 1.5
MEDIA_DOWNLOAD_TIMEOUT = 30
LINK_METADATA_TIMEOUT = 10
SUBPROCESS_TIMEOUT_SECONDS = 180
FFPROBE_TIMEOUT_SECONDS = 15
DEFAULT_BSKY_BASE_URL = "https://bsky.social"
SESSION_FILE_PERMISSIONS = 0o600
DYNAMIC_ALT_MAX_LENGTH = 150
TRUNCATE_MIN_PREFIX_CHARS = 20
ORPHAN_DIGIT_MAX_DIGITS = 3

# --- Logging Setup ---
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.FileHandler(LOG_PATH, encoding="utf-8"),
        logging.StreamHandler(),
    ],
    level=logging.INFO,
)


# --- Per-run caches ---
class _RunCache:
    """In-memory lookups that live for a single bot run."""

    def __init__(self):
        self.url_validity: dict = {}
        self.video_hash_owner: dict = {}
        self.video_url_owner: dict = {}
        self.locale: str = "en-US"

    def clear(self):
        """Empty the three lookup dicts (``locale`` is left untouched)."""
        self.url_validity.clear()
        self.video_hash_owner.clear()
        self.video_url_owner.clear()


_cache = _RunCache()


def reset_caches():
    """Clear the module-level per-run cache."""
    _cache.clear()


# --- Custom Classes ---
class ScrapedMedia:
    """A single media item (type + URL) attached to a scraped post."""

    def __init__(self, url, media_type="video"):
        self.type = media_type
        self.media_url_https = url


class ScrapedTikTok:
    """Mirrors ScrapedTweet from twitter2bsky.py."""

    def __init__(self, created_on, text, video_url, post_url=None, thumbnail_url=None):
        self.created_on = created_on  # ISO8601 string or arrow-parseable
        self.text = text  # caption / description
        self.post_url = post_url  # https://www.tiktok.com/@user/video/123
        self.thumbnail_url = thumbnail_url
        self.media = [ScrapedMedia(video_url, "video")] if video_url else []


# --- Helpers (shared with twitter2bsky.py pattern) ---
def sha256_file(path, chunk_size=1024 * 1024):
    """Return the hex SHA-256 digest of the file at *path*, read in chunks."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        while chunk := f.read(chunk_size):
            digest.update(chunk)
    return digest.hexdigest()


def grapheme_len(text):
    """Return the number of grapheme clusters in *text*."""
    return grapheme.length(text)


def remove_file_quietly(path):
    """Delete *path* if it exists; log (never raise) on failure."""
    if path and os.path.exists(path):
        try:
            os.remove(path)
            logging.info(f"🧹 Removed temp file: {path}")
        except Exception as e:
            logging.warning(f"⚠️ Could not remove temp file {path}: {e}")


def take_error_screenshot(page, label):
    """Save a timestamped PNG of *page* for debugging; failures are logged."""
    timestamp = time.strftime("%Y%m%d_%H%M%S")
    name = f"screenshot_{label}_{timestamp}.png"
    try:
        page.screenshot(path=name)
        logging.info(f"📸 Screenshot saved: {name}")
    except Exception as e:
        logging.warning(f"⚠️ Could not save screenshot: {e}")


def clean_post_text(text):
    """Strip *text*, turn ``\\r`` into ``\\n`` and collapse 3+ newlines to 2."""
    raw = (text or "").strip()
    raw = re.sub(r"\r", "\n", raw)
    raw = re.sub(r"\n{3,}", "\n\n", raw)
    return raw.strip()


def normalize_post_text(text):
    """Return a lower-cased, whitespace-collapsed form of *text* for dedupe."""
    if not text:
        return ""
    text = clean_post_text(text)
    text = re.sub(r"\s+", " ", text).strip()
    return text.lower()


def truncate_text_safely(text, max_length=BSKY_TEXT_MAX_LENGTH):
    """Cut *text* to at most *max_length* graphemes, preferring a word break.

    The cut backs up to the last space only when that space lies beyond
    ``TRUNCATE_MIN_PREFIX_CHARS``, so short prefixes are not thrown away.
    """
    if grapheme_len(text) <= max_length:
        return text
    clusters = list(grapheme.graphemes(text))
    truncated = "".join(clusters[:max_length])
    last_space = truncated.rfind(" ")
    if last_space > TRUNCATE_MIN_PREFIX_CHARS:
        return truncated[:last_space]
    return truncated


def extract_tiktok_video_id(post_url):
    """Extract numeric video ID from a TikTok URL."""
    if not post_url:
        return None
    match = re.search(r"/video/(\d+)", post_url)
    return match.group(1) if match else None


def canonicalize_tiktok_url(url):
    """Normalize TikTok URL to a stable canonical form."""
    if not url:
        return None
    match = re.search(
        r"https?://(?:www\.)?tiktok\.com/@([^/]+)/video/(\d+)",
        url,
        re.IGNORECASE,
    )
    if match:
        return f"https://www.tiktok.com/@{match.group(1)}/video/{match.group(2)}"
    return url.strip()


def make_unique_video_temp_base(post_url=None):
    """Return a collision-resistant temp-file base name for a video."""
    video_id = extract_tiktok_video_id(post_url) or "unknown"
    ts_ms = int(time.time() * 1000)
    rand = uuid.uuid4().hex[:8]
    base = f"temp_tiktok_{video_id}_{ts_ms}_{rand}"
    logging.info(f"🎞️ Using unique temp video base: {base}")
    return base


def build_media_fingerprint(tiktok):
    """Hash the media of a scraped TikTok (``"no-media"`` if it has none).

    The canonical post URL is preferred over the media URL as the stable key.
    """
    if not tiktok or not tiktok.media:
        return "no-media"
    parts = []
    for media in tiktok.media:
        media_url = getattr(media, "media_url_https", "") or ""
        stable = canonicalize_tiktok_url(tiktok.post_url) or media_url
        parts.append(f"video:{stable}")
    parts.sort()
    raw = "|".join(parts)
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()


def build_bsky_media_fingerprint(post_view):
    """Hash the video embed of a Bluesky post view (``"no-media"`` if none)."""
    try:
        embed = getattr(post_view, "embed", None)
        if not embed:
            return "no-media"
        parts = []
        video = getattr(embed, "video", None)
        if video:
            ref = getattr(video, "ref", None) or getattr(video, "cid", None) or str(video)
            parts.append(f"video:{ref}")
        if not parts:
            return "no-media"
        parts.sort()
        return hashlib.sha256("|".join(parts).encode("utf-8")).hexdigest()
    except Exception as e:
        logging.debug(f"Could not build Bluesky media fingerprint: {e}")
        return "no-media"


def build_text_media_key(normalized_text, media_fingerprint):
    """Combine normalized text and media fingerprint into one dedupe key."""
    return hashlib.sha256(
        f"{normalized_text}||{media_fingerprint}".encode("utf-8")
    ).hexdigest()


# --- Bluesky login / retry helpers (identical pattern to twitter2bsky.py) ---
def is_rate_limited_error(e):
    """Return True if the exception text looks like an HTTP 429."""
    t = repr(e).lower()
    return "429" in t or "ratelimitexceeded" in t or "too many requests" in t


def is_auth_error(e):
    """Return True if the exception text looks like an auth failure."""
    t = repr(e).lower()
    return "401" in t or "403" in t or "invalid identifier" in t


def is_transient_error(e):
    """Return True if the exception text matches a retryable failure."""
    signals = ["InvokeTimeoutError", "ReadTimeout", "WriteTimeout",
               "RemoteProtocolError", "ConnectError", "503", "502", "504"]
    return any(s in repr(e) for s in signals)


def is_network_error(e):
    """Return True if the exception text matches a network-level failure."""
    signals = ["ConnectError", "RemoteProtocolError", "ReadTimeout",
               "WriteTimeout", "TimeoutException", "503", "502", "504"]
    return any(s in repr(e) for s in signals)


def get_rate_limit_wait_seconds(e, default_delay):
    """Return the ``Retry-After`` delay carried by *e*, else *default_delay*.

    The header value is clamped to ``[1, BSKY_LOGIN_MAX_DELAY]``.
    """
    try:
        headers = getattr(e, "headers", None) or {}
        ra = headers.get("retry-after") or headers.get("Retry-After")
        if ra:
            return min(max(int(ra), 1), BSKY_LOGIN_MAX_DELAY)
    except Exception:
        # Malformed header: fall back to the caller's default.
        pass
    return default_delay


def create_bsky_client(base_url, handle, password):
    """Log in to Bluesky and return the client, retrying non-auth failures.

    Raises:
        Exception: the login error, immediately for auth errors or after
            ``BSKY_LOGIN_MAX_RETRIES`` attempts otherwise.
    """
    normalized = (base_url or DEFAULT_BSKY_BASE_URL).strip().rstrip("/")
    client = Client(base_url=normalized)
    for attempt in range(1, BSKY_LOGIN_MAX_RETRIES + 1):
        try:
            client.login(handle, password)
            logging.info("✅ Bluesky login successful.")
            return client
        except Exception as e:
            if is_auth_error(e):
                raise  # wrong credentials will not fix themselves
            if attempt < BSKY_LOGIN_MAX_RETRIES:
                wait = min(BSKY_LOGIN_BASE_DELAY * attempt, BSKY_LOGIN_MAX_DELAY)
                wait += random.uniform(0, BSKY_LOGIN_JITTER_MAX)
                logging.warning(f"⏳ Bluesky login retry {attempt} in {wait:.1f}s: {e}")
                time.sleep(wait)
                continue
            raise
    raise RuntimeError("Bluesky login failed after all retries.")


# --- State management (identical pattern) ---
def default_state():
    """Return a fresh, empty state dict."""
    return {"version": 1, "posted_videos": {},
            "posted_by_bsky_uri": {}, "updated_at": None}


def load_state(state_path=STATE_PATH):
    """Load state from *state_path*; fall back to a fresh state on any error."""
    if not os.path.exists(state_path):
        return default_state()
    try:
        with open(state_path, "r", encoding="utf-8") as f:
            state = json.load(f)
        state.setdefault("version", 1)
        state.setdefault("posted_videos", {})
        state.setdefault("posted_by_bsky_uri", {})
        state.setdefault("updated_at", None)
        return state
    except Exception as e:
        logging.warning(f"⚠️ Could not load state: {e}. Reinitializing.")
        return default_state()


def save_state(state, state_path=STATE_PATH):
    """Write *state* as JSON via a temp file + ``os.replace``; log on failure."""
    try:
        state["updated_at"] = arrow.utcnow().isoformat()
        temp = f"{state_path}.tmp"
        with open(temp, "w", encoding="utf-8") as f:
            json.dump(state, f, ensure_ascii=False, indent=2, sort_keys=True)
        os.replace(temp, state_path)
        logging.info(f"💾 State saved to {state_path}")
    except Exception as e:
        logging.error(f"❌ Failed to save state: {e}")


def remember_posted_video(state, candidate, bsky_uri=None):
    """Record *candidate* as posted in *state* (in memory only)."""
    key = candidate.get("canonical_post_url") or f"textmedia:{candidate['text_media_key']}"
    state["posted_videos"][key] = {
        "canonical_post_url": candidate.get("canonical_post_url"),
        "normalized_text": candidate["normalized_text"],
        "text_media_key": candidate["text_media_key"],
        "media_fingerprint": candidate["media_fingerprint"],
        "bsky_uri": bsky_uri,
        "video_created_on": candidate["tiktok"].created_on,
        "post_url": candidate["tiktok"].post_url,
        "video_id": candidate.get("video_id"),
        "posted_at": arrow.utcnow().isoformat(),
    }
    if bsky_uri:
        state["posted_by_bsky_uri"][bsky_uri] = key


def candidate_matches_state(candidate, state):
    """Return ``(True, reason)`` if *candidate* is already in *state*.

    Checks, in order: canonical URL, text+media key, then non-empty
    normalized text. Returns ``(False, None)`` when nothing matches.
    """
    canonical_url = candidate["canonical_post_url"]
    text_media_key = candidate["text_media_key"]
    normalized_text = candidate["normalized_text"]
    posted = state.get("posted_videos", {})
    if canonical_url and canonical_url in posted:
        return True, "state:post_url"
    for rec in posted.values():
        if rec.get("text_media_key") == text_media_key:
            return True, "state:text_media_fingerprint"
    for rec in posted.values():
        if rec.get("normalized_text") == normalized_text and normalized_text:
            return True, "state:normalized_text"
    return False, None


def prune_state(state, max_entries=STATE_MAX_ENTRIES):
    """Keep only the *max_entries* most recently posted records in *state*."""
    posted = state.get("posted_videos", {})
    if len(posted) <= max_entries:
        return state
    sortable = sorted(posted.items(), key=lambda x: x[1].get("posted_at", ""), reverse=True)
    keep = {k for k, _ in sortable[:max_entries]}
    state["posted_videos"] = {k: v for k, v in posted.items() if k in keep}
    state["posted_by_bsky_uri"] = {
        uri: k
        for uri, k in state.get("posted_by_bsky_uri", {}).items()
        if k in keep
    }
    return state


# --- Bluesky feed helpers ---
def get_recent_bsky_posts(client, handle, limit=30):
    """Return dedupe records for the author's recent original posts.

    Reposts (items with a ``reason``) and replies are skipped. Any fetch
    error is logged and yields the records collected so far.
    """
    recent = []
    try:
        timeline = client.get_author_feed(handle, limit=limit)
        for item in timeline.feed:
            try:
                if item.reason is not None:
                    continue
                record = item.post.record
                if getattr(record, "reply", None) is not None:
                    continue
                text = getattr(record, "text", "") or ""
                normalized = normalize_post_text(text)
                media_fp = build_bsky_media_fingerprint(item.post)
                recent.append({
                    "uri": getattr(item.post, "uri", None),
                    "normalized_text": normalized,
                    "media_fingerprint": media_fp,
                    "text_media_key": build_text_media_key(normalized, media_fp),
                })
            except Exception as e:
                logging.debug(f"Skipping feed item: {e}")
    except Exception as e:
        logging.warning(f"⚠️ Could not fetch recent Bluesky posts: {e}")
    return recent


def candidate_matches_existing_bsky(candidate, recent_bsky_posts):
    """Return ``(True, reason)`` if *candidate* duplicates a recent post."""
    for existing in recent_bsky_posts:
        if candidate["text_media_key"] == existing["text_media_key"]:
            return True, "bsky:text_media_fingerprint"
        if candidate["normalized_text"] and candidate["normalized_text"] == existing["normalized_text"]:
            return True, "bsky:normalized_text"
    return False, None


# --- Upload / blob helpers (same as twitter2bsky.py) ---
def upload_blob_with_retry(client, binary_data, media_label="media"):
    """Upload *binary_data* as a blob; return the blob or None on failure.

    Rate-limit errors back off exponentially (bounded by the attempt
    budget); transient errors get their own, smaller retry budget.
    """
    last_exception = None
    transient_attempts = 0
    for attempt in range(1, BSKY_BLOB_UPLOAD_MAX_RETRIES + 1):
        try:
            result = client.upload_blob(binary_data)
            return result.blob
        except Exception as e:
            last_exception = e
            if is_rate_limited_error(e):
                wait = min(BSKY_BLOB_UPLOAD_BASE_DELAY * (2 ** (attempt - 1)),
                           BSKY_BLOB_UPLOAD_MAX_DELAY)
                if attempt < BSKY_BLOB_UPLOAD_MAX_RETRIES:
                    logging.warning(f"⏳ Blob upload rate-limited. Retry {attempt} after {wait}s.")
                    time.sleep(wait)
                    continue
                break
            if is_transient_error(e) and transient_attempts < BSKY_BLOB_TRANSIENT_ERROR_RETRIES:
                transient_attempts += 1
                wait = BSKY_BLOB_TRANSIENT_ERROR_DELAY * transient_attempts
                logging.warning(f"⏳ Transient blob upload error. Retry {transient_attempts} after {wait}s.")
                time.sleep(wait)
                continue
            logging.warning(f"Could not upload {media_label}: {repr(e)}")
            return None
    logging.warning(f"Could not upload {media_label}: {repr(last_exception)}")
    return None


def send_post_with_retry(client, **kwargs):
    """Call ``client.send_post(**kwargs)``, retrying rate-limit/transient errors.

    Raises:
        Exception: the last error once retries are exhausted, or at once
            for errors that are neither rate-limit nor transient.
    """
    last_exception = None
    for attempt in range(1, BSKY_SEND_POST_MAX_RETRIES + 1):
        try:
            return client.send_post(**kwargs)
        except Exception as e:
            last_exception = e
            if is_rate_limited_error(e):
                wait = min(BSKY_SEND_POST_BASE_DELAY * (2 ** (attempt - 1)),
                           BSKY_SEND_POST_MAX_DELAY)
                if attempt < BSKY_SEND_POST_MAX_RETRIES:
                    time.sleep(wait)
                    continue
                raise
            if is_transient_error(e) and attempt < BSKY_SEND_POST_MAX_RETRIES:
                time.sleep(BSKY_SEND_POST_BASE_DELAY * attempt)
                continue
            raise
    raise last_exception


def get_blob_from_file(file_path, client):
    """Upload the file at *file_path* as a blob; return None if it fails.

    Files larger than ``MAX_VIDEO_UPLOAD_SIZE_MB`` are rejected up front.
    """
    try:
        if not os.path.exists(file_path):
            logging.warning(f"File not found: {file_path}")
            return None
        size_mb = os.path.getsize(file_path) / (1024 * 1024)
        if size_mb > MAX_VIDEO_UPLOAD_SIZE_MB:
            logging.warning(f"File too large: {size_mb:.2f} MB > {MAX_VIDEO_UPLOAD_SIZE_MB} MB")
            return None
        with open(file_path, "rb") as f:
            data = f.read()
        return upload_blob_with_retry(client, data, media_label=file_path)
    except Exception as e:
        logging.warning(f"Could not upload file {file_path}: {repr(e)}")
        return None


def build_video_embed(video_blob, alt_text):
    """Wrap *video_blob* in a video embed, or return None if unsupported."""
    try:
        return models.AppBskyEmbedVideo.Main(video=video_blob, alt=alt_text)
    except AttributeError:
        logging.error("❌ atproto version does not support AppBskyEmbedVideo. Upgrade atproto.")
        return None


def build_dynamic_alt(text):
    """Derive alt text from a caption: single line, URLs removed, capped."""
    alt = clean_post_text(text or "").replace("\n", " ").strip()
    alt = re.sub(r"(?:https?://|www\.)\S+", "", alt).strip()
    if not alt:
        alt = "TikTok video"
    return alt[:DYNAMIC_ALT_MAX_LENGTH]


def make_rich(content):
    """Build a Bluesky TextBuilder with hashtag and URL facets."""
    text_builder = client_utils.TextBuilder()
    content = clean_post_text(content)
    lines = content.splitlines()
    for line_idx, line in enumerate(lines):
        is_last_line = line_idx == len(lines) - 1
        if not line.strip():
            if not is_last_line:
                text_builder.text("\n")
            continue
        words = line.split(" ")
        for i, word in enumerate(words):
            is_last_word = i == len(words) - 1
            if not word:
                # Consecutive spaces yield empty tokens; keep the spacing.
                if not is_last_word:
                    text_builder.text(" ")
                continue
            if word.startswith("#") and len(word) > 1:
                # Trailing punctuation stays in the visible text but not the tag.
                tag = word[1:].rstrip(".,;:!?)'\"")
                if tag:
                    text_builder.tag(word, tag)
                else:
                    text_builder.text(word)
            elif word.startswith(("http://", "https://")):
                text_builder.link(word, word)
            else:
                text_builder.text(word)
            if not is_last_word:
                text_builder.text(" ")
        if not is_last_line:
            text_builder.text("\n")
    return text_builder


# --- TikTok Scraping ---
_CONSENT_SELECTORS = [
    'button:has-text("Entendido")',
    'button:has-text("Aceptar todo")',
    'button:has-text("Accept all")',
    'button:has-text("Got it")',
    'button:has-text("Decline optional")',
    '[data-e2e="cookie-banner-accept"]',
    '[id*="accept"]',
    '[class*="accept-btn"]',
]

_GRID_SELECTORS = (
    '[data-e2e="user-post-item"], '
    '[class*="DivItemContainerV2"], '
    'a[href*="/video/"], '
    '[class*="video-feed"], '
    'div[class*="VideoFeed"], '
    '[class*="DivVideoFeedV2"]'
)

_STEALTH_INIT_SCRIPT = """
Object.defineProperty(navigator, 'webdriver', { get: () => undefined });
Object.defineProperty(navigator, 'plugins', {
    get: () => [
        { name: 'Chrome PDF Plugin' },
        { name: 'Chrome PDF Viewer' },
        { name: 'Native Client' }
    ]
});
Object.defineProperty(navigator, 'languages', { get: () => ['es-ES', 'es', 'en'] });
window.chrome = { runtime: {}, loadTimes: function() {}, csi: function() {}, app: {} };
"""


def _dismiss_consent_banner(page):
    """Click the first visible GDPR/consent button (Spanish + English)."""
    for selector in _CONSENT_SELECTORS:
        try:
            btn = page.locator(selector).first
            if btn.is_visible(timeout=3000):
                btn.click()
                logging.info(f"✅ Dismissed banner: {selector}")
                time.sleep(2)
                return
        except Exception as e:
            logging.debug(f"Consent selector {selector} failed: {e}")


def _parse_video_card(link, seen_urls):
    """Turn one grid ``<a>`` locator into a ScrapedTikTok, or None to skip it.

    *seen_urls* is updated in place so duplicates in the grid are ignored.
    """
    href = link.get_attribute("href")
    if not href:
        return None
    post_url = f"https://www.tiktok.com{href}" if href.startswith("/") else href
    canonical = canonicalize_tiktok_url(post_url)
    if not canonical or canonical in seen_urls:
        return None
    if "/video/" not in canonical:
        return None
    seen_urls.add(canonical)

    # Caption and thumbnail are best-effort: the card markup varies.
    caption = ""
    try:
        card = link.locator("..").first
        caption_el = card.locator(
            '[data-e2e="video-desc"], '
            '[class*="SpanUniqueId"], '
            'p[class*="caption"]'
        ).first
        if caption_el.is_visible(timeout=1000):
            caption = caption_el.inner_text()
    except Exception as e:
        logging.debug(f"No caption for {canonical}: {e}")

    thumbnail_url = None
    try:
        img = link.locator("img").first
        if img.is_visible(timeout=1000):
            thumbnail_url = img.get_attribute("src")
    except Exception as e:
        logging.debug(f"No thumbnail for {canonical}: {e}")

    # The grid exposes no timestamp, so the scrape time stands in for it.
    return ScrapedTikTok(
        created_on=arrow.utcnow().isoformat(),
        text=caption,
        video_url=canonical,
        post_url=canonical,
        thumbnail_url=thumbnail_url,
    )


def scrape_tiktoks_via_playwright(target_handle: str, locale: str = "en-US") -> list:
    """Scrape recent TikTok videos from a public profile using Playwright.

    No login is used. The browser context imitates desktop Chrome on
    Windows with a Spanish locale and Madrid timezone; playwright-stealth
    is applied when installed.

    Args:
        target_handle: TikTok handle, with or without a leading ``@``.
        locale: Accepted for interface compatibility; currently not applied
            (the context locale is fixed to ``es-ES``).

    Returns:
        A list of ScrapedTikTok objects (empty on failure or when blocked).
    """
    tiktoks = []
    profile_url = f"https://www.tiktok.com/@{target_handle.lstrip('@')}"

    # playwright-stealth is optional but strongly recommended
    try:
        from playwright_stealth import stealth_sync
        use_stealth = True
        logging.info("🥷 playwright-stealth available — stealth mode ON")
    except ImportError:
        use_stealth = False
        logging.warning("⚠️ playwright-stealth not installed — running without stealth")

    with sync_playwright() as p:
        browser = p.chromium.launch(
            headless=True,
            args=[
                "--disable-blink-features=AutomationControlled",
                "--no-sandbox",
                "--disable-setuid-sandbox",
                "--disable-dev-shm-usage",
                "--disable-gpu",
                "--window-size=1366,768",
            ],
        )
        page = None
        try:
            # Fake a real Windows Chrome browser with Spanish locale + Madrid timezone
            context = browser.new_context(
                user_agent=(
                    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                    "AppleWebKit/537.36 (KHTML, like Gecko) "
                    "Chrome/124.0.0.0 Safari/537.36"
                ),
                viewport={"width": 1366, "height": 768},
                locale="es-ES",
                timezone_id="Europe/Madrid",
                extra_http_headers={
                    "Accept-Language": "es-ES,es;q=0.9,en;q=0.8",
                    "Accept": (
                        "text/html,application/xhtml+xml,application/xml;"
                        "q=0.9,image/avif,image/webp,*/*;q=0.8"
                    ),
                    "Sec-Fetch-Dest": "document",
                    "Sec-Fetch-Mode": "navigate",
                    "Sec-Fetch-Site": "none",
                    "Sec-Ch-Ua": '"Chromium";v="124", "Google Chrome";v="124", "Not-A.Brand";v="99"',
                    "Sec-Ch-Ua-Mobile": "?0",
                    "Sec-Ch-Ua-Platform": '"Windows"',
                },
            )
            page = context.new_page()

            # Stealth patches must be in place before any navigation.
            if use_stealth:
                stealth_sync(page)
                logging.info("🥷 Stealth patches applied.")

            # Hide webdriver flag + fake plugins/languages via init script
            page.add_init_script(_STEALTH_INIT_SCRIPT)

            logging.info(f"🌐 Navigating to TikTok profile: {profile_url}")
            page.goto(profile_url, wait_until="domcontentloaded", timeout=40000)
            time.sleep(TIKTOK_PAGE_LOAD_WAIT_S + 2)

            _dismiss_consent_banner(page)

            try:
                page.wait_for_selector(_GRID_SELECTORS, timeout=30000)
                logging.info("✅ TikTok video grid detected.")
            except Exception:
                # Don't give up immediately: try scrolling anyway
                logging.warning(
                    "⚠️ Grid selector timed out — attempting scroll anyway "
                    "(grid may still be partially loaded)"
                )

            # Scroll to load more videos
            for scroll_i in range(TIKTOK_MAX_SCROLLS):
                page.evaluate("window.scrollBy(0, window.innerHeight * 2)")
                time.sleep(TIKTOK_SCROLL_PAUSE_S)
                logging.info(f"📜 Scroll {scroll_i + 1}/{TIKTOK_MAX_SCROLLS}")

            # Collect video links
            video_links = page.locator('a[href*="/video/"]').all()
            logging.info(
                f"📊 Found {len(video_links)} video links. "
                f"Parsing up to {SCRAPE_VIDEO_LIMIT}..."
            )
            if not video_links:
                take_error_screenshot(page, "tiktok_no_video_links")
                logging.error("❌ No video links found after scroll. TikTok may be blocking.")
                return []

            seen_urls = set()
            for link in video_links:
                if len(tiktoks) >= SCRAPE_VIDEO_LIMIT:
                    break
                try:
                    scraped = _parse_video_card(link, seen_urls)
                except Exception as e:
                    logging.warning(f"⚠️ Failed to parse video card: {e}")
                    continue
                if scraped is not None:
                    tiktoks.append(scraped)
                    logging.info(f"🎵 Scraped TikTok: {scraped.post_url}")
        except Exception as e:
            if page is not None:
                take_error_screenshot(page, "tiktok_scrape_failed")
            logging.error(f"❌ Failed to scrape TikTok profile: {e}")
        finally:
            # Always release the browser, including on the early return above.
            browser.close()

    logging.info(f"✅ Scraped {len(tiktoks)} TikTok videos.")
    return tiktoks


# --- Video extraction ---
def extract_tiktok_video_url_isolated(browser, post_url: str, video_id: str = None) -> str | None:
    """Resolve the stream URL of one TikTok video from network responses.

    Opens *post_url* in a fresh browser context and watches responses for
    an MP4 (preferred) or HLS playlist URL. Mirrors
    extract_video_url_from_tweet_page_isolated() in twitter2bsky.py.

    Returns:
        The first matching stream URL, or None if none appeared or an
        error occurred.
    """
    ctx = None
    page = None
    best_mp4_url = None
    best_m3u8_url = None
    seen_urls = set()

    def current_best():
        return best_mp4_url or best_m3u8_url

    def handle_response(response):
        nonlocal best_mp4_url, best_m3u8_url
        try:
            url = response.url
            if not url or url in seen_urls:
                return
            seen_urls.add(url)
            content_type = (response.headers.get("content-type") or "").lower()
            url_l = url.lower()
            # Skip audio-only and segment files
            if ".m4s" in url_l or "/aud/" in url_l or "mp4a" in url_l:
                return
            if ".m3u8" in url_l or "mpegurl" in content_type:
                if best_m3u8_url is None:
                    best_m3u8_url = url
                return
            if ".mp4" in url_l or "video/mp4" in content_type:
                if best_mp4_url is None:
                    best_mp4_url = url
                return
        except Exception as e:
            logging.debug(f"Response parse error: {e}")

    try:
        ctx = browser.new_context(
            user_agent=(
                "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
                "AppleWebKit/537.36 (KHTML, like Gecko) "
                "Chrome/145.0.7632.6 Safari/537.36"
            ),
            viewport={"width": 1920, "height": 1080},
        )
        page = ctx.new_page()
        page.on("response", handle_response)
        logging.info(f"[video_id={video_id}] 🎬 Opening TikTok video page: {post_url}")
        page.goto(post_url, wait_until="domcontentloaded", timeout=40000)
        time.sleep(2)

        # Try clicking the video player to trigger stream loading
        for selector in ['[data-e2e="video-player"]', "video", '[class*="Video"]']:
            try:
                player = page.locator(selector).first
                if player.count() > 0:
                    player.click(force=True, timeout=3000)
                    break
            except Exception as e:
                logging.debug(f"[video_id={video_id}] Player click failed ({selector}): {e}")

        # Wait up to 10s for a stream URL to appear
        for _ in range(10):
            if current_best():
                break
            time.sleep(1)

        selected = current_best()
        if selected:
            logging.info(f"[video_id={video_id}] ✅ Resolved video URL: {selected}")
        else:
            logging.warning(f"[video_id={video_id}] ⚠️ No video stream URL observed.")
        return selected
    except Exception as e:
        logging.warning(f"[video_id={video_id}] ⚠️ Could not extract video URL: {e}")
        return None
    finally:
        # Cleanup is best-effort; a failure here must not mask the result.
        try:
            if page:
                page.remove_listener("response", handle_response)
                page.close()
        except Exception:
            pass
        try:
            if ctx:
                ctx.close()
        except Exception:
            pass


# --- Video download + compress (same ffmpeg pipeline as twitter2bsky.py) ---
def _probe_video_duration(file_path):
    """Return the duration of *file_path* in seconds using ffprobe.

    Raises:
        RuntimeError: if ffprobe fails or prints no duration.
    """
    result = subprocess.run(
        ["ffprobe", "-v", "error", "-show_entries", "format=duration",
         "-of", "default=noprint_wrappers=1:nokey=1", file_path],
        capture_output=True,
        text=True,
        timeout=FFPROBE_TIMEOUT_SECONDS,
    )
    if result.returncode != 0:
        raise RuntimeError(f"ffprobe error: {result.stderr.strip()}")
    duration_str = result.stdout.strip()
    if not duration_str:
        raise RuntimeError("ffprobe returned empty duration")
    return float(duration_str)


def download_and_crop_video(video_url: str, output_path: str) -> str | None:
    """Download, trim and compress a video; return *output_path* or None.

    Steps: ffmpeg stream-copy download, moviepy trim to at most
    ``VIDEO_MAX_DURATION_SECONDS``, then ffmpeg re-encode (max width 720).
    Intermediate files are always removed.
    """
    # splitext (not str.replace) so an output path without ".mp4" cannot
    # make an intermediate file collide with the final output.
    base, _ext = os.path.splitext(output_path)
    temp_input = f"{base}_source.mp4"
    temp_trimmed = f"{base}_trimmed.mp4"
    temp_output = f"{base}_compressed.mp4"
    try:
        logging.info(f"⬇️ Downloading TikTok video: {video_url}")
        if ".m3u8" in video_url.lower():
            download_cmd = [
                "ffmpeg", "-y",
                "-protocol_whitelist", "file,http,https,tcp,tls,crypto",
                "-allowed_extensions", "ALL",
                "-i", video_url,
                "-c", "copy",
                temp_input,
            ]
        else:
            download_cmd = [
                "ffmpeg", "-y",
                "-i", video_url,
                "-c", "copy",
                temp_input,
            ]
        result = subprocess.run(download_cmd, capture_output=True, text=True,
                                timeout=SUBPROCESS_TIMEOUT_SECONDS)
        if result.returncode != 0:
            logging.error(f"❌ ffmpeg download failed:\n{result.stderr}")
            return None
        if not os.path.exists(temp_input) or os.path.getsize(temp_input) == 0:
            logging.error("❌ Downloaded file is missing or empty.")
            return None

        duration = _probe_video_duration(temp_input)
        if duration <= 0:
            logging.error("❌ Invalid video duration.")
            return None
        # Stay just short of the real end so the trim never overshoots.
        end_time = min(VIDEO_MAX_DURATION_SECONDS, duration)
        end_time = min(end_time, duration - 0.05)
        end_time = max(end_time, 0.1)

        video_clip = VideoFileClip(temp_input)
        try:
            # moviepy 2.x renamed subclip() to subclipped(); support both.
            if hasattr(video_clip, "subclipped"):
                cropped = video_clip.subclipped(0, end_time)
            else:
                cropped = video_clip.subclip(0, end_time)
            try:
                cropped.write_videofile(
                    temp_trimmed,
                    codec="libx264",
                    audio_codec="aac",
                    preset="veryfast",
                    bitrate="1800k",
                    audio_bitrate="128k",
                    logger=None,
                )
            finally:
                cropped.close()
        finally:
            video_clip.close()

        if not os.path.exists(temp_trimmed) or os.path.getsize(temp_trimmed) == 0:
            logging.error("❌ Trimmed video is missing or empty.")
            return None

        compress_cmd = [
            "ffmpeg", "-y",
            "-i", temp_trimmed,
            "-vf", "scale='min(720,iw)':-2",
            "-c:v", "libx264",
            "-preset", "veryfast",
            "-crf", "30",
            "-maxrate", "1800k",
            "-bufsize", "3600k",
            "-c:a", "aac",
            "-b:a", "128k",
            "-movflags", "+faststart",
            temp_output,
        ]
        result = subprocess.run(compress_cmd, capture_output=True, text=True,
                                timeout=SUBPROCESS_TIMEOUT_SECONDS)
        if result.returncode != 0:
            logging.error(f"❌ ffmpeg compression failed:\n{result.stderr}")
            return None
        if not os.path.exists(temp_output) or os.path.getsize(temp_output) == 0:
            logging.error("❌ Compressed video is missing or empty.")
            return None

        os.replace(temp_output, output_path)
        size_mb = os.path.getsize(output_path) / (1024 * 1024)
        logging.info(f"✅ Video ready: {output_path} ({size_mb:.2f} MB)")
        return output_path
    except subprocess.TimeoutExpired:
        logging.error(f"❌ ffmpeg timed out after {SUBPROCESS_TIMEOUT_SECONDS}s")
        return None
    except Exception as e:
        logging.error(f"❌ Video processing error: {repr(e)}")
        return None
    finally:
        remove_file_quietly(temp_input)
        remove_file_quietly(temp_trimmed)
        remove_file_quietly(temp_output)


# --- Main sync logic ---
def _build_candidates(tiktoks, state, recent_bsky_posts):
    """Return the scraped videos that are new to both state and Bluesky."""
    candidates = []
    for tiktok in reversed(tiktoks):
        try:
            # TikTok grid doesn't expose timestamps reliably —
            # use state-based dedup as primary guard
            canonical_url = canonicalize_tiktok_url(tiktok.post_url)
            if canonical_url and canonical_url in state.get("posted_videos", {}):
                logging.info(f"⚡ Early skip (already in state): {canonical_url}")
                continue

            text = clean_post_text(tiktok.text or "")
            normalized_text = normalize_post_text(text)
            media_fp = build_media_fingerprint(tiktok)
            candidate = {
                "tiktok": tiktok,
                "raw_text": truncate_text_safely(text),
                "normalized_text": normalized_text,
                "media_fingerprint": media_fp,
                "text_media_key": build_text_media_key(normalized_text, media_fp),
                "canonical_post_url": canonical_url,
                "video_id": extract_tiktok_video_id(tiktok.post_url),
                "resolved_video_url": None,
                "resolved_video_hash": None,
            }

            is_dup_state, reason = candidate_matches_state(candidate, state)
            if is_dup_state:
                logging.info(f"⏭️ Skipping (state duplicate: {reason}): {canonical_url}")
                continue
            is_dup_bsky, reason = candidate_matches_existing_bsky(candidate, recent_bsky_posts)
            if is_dup_bsky:
                logging.info(f"⏭️ Skipping (Bluesky duplicate: {reason}): {canonical_url}")
                continue
            candidates.append(candidate)
        except Exception as e:
            logging.warning(f"⚠️ Failed to prepare candidate: {e}")
    return candidates


def _resolve_video_urls(candidates):
    """Fill ``resolved_video_url`` on every candidate using one browser."""
    with sync_playwright() as p_pre:
        pre_browser = p_pre.chromium.launch(
            headless=True,
            args=["--disable-blink-features=AutomationControlled"],
        )
        try:
            for c in candidates:
                c["resolved_video_url"] = extract_tiktok_video_url_isolated(
                    pre_browser,
                    c["tiktok"].post_url,
                    video_id=c.get("video_id"),
                )
        finally:
            pre_browser.close()


def _prepare_video_embed(candidate, bsky_client):
    """Download, upload and wrap the candidate's video; None if unavailable."""
    tiktok = candidate["tiktok"]
    real_video_url = candidate.get("resolved_video_url")
    if not real_video_url:
        logging.warning(f"⚠️ Could not resolve video URL for {tiktok.post_url}")
        return None

    temp_base = make_unique_video_temp_base(tiktok.post_url)
    temp_path = f"{temp_base}.mp4"
    try:
        cropped_path = download_and_crop_video(real_video_url, temp_path)
        if not cropped_path:
            return None
        video_hash = sha256_file(cropped_path)
        candidate["resolved_video_hash"] = video_hash
        # Identical bytes under a different video id: do not upload twice.
        owner = _cache.video_hash_owner.get(video_hash)
        if owner and owner != candidate["video_id"]:
            logging.warning("⚠️ Video hash owned by another video. Skipping.")
            return None
        _cache.video_hash_owner[video_hash] = candidate["video_id"]
        video_blob = get_blob_from_file(cropped_path, bsky_client)
        if not video_blob:
            return None
        return build_video_embed(video_blob, build_dynamic_alt(candidate["raw_text"]))
    finally:
        remove_file_quietly(temp_path)
        remove_file_quietly(f"{temp_base}_source.mp4")
        remove_file_quietly(f"{temp_base}_trimmed.mp4")
        remove_file_quietly(f"{temp_base}_compressed.mp4")


def _build_fallback_text(raw_text, post_url):
    """Return caption + link, trimmed so the whole post fits the limit."""
    link = post_url or ""
    # Reserve room for the link and the two separating newlines.
    room = BSKY_TEXT_MAX_LENGTH - grapheme_len(link) - 2
    caption = truncate_text_safely(raw_text, max(room, 0))
    return f"{caption}\n\n{link}".strip()


def sync_feeds(args):
    """Run one scrape → dedupe → post cycle.

    Reads ``tiktok_handle``, ``bsky_handle``, ``bsky_password``,
    ``bsky_base_url``, ``bsky_langs`` and ``dry_run`` from *args*. All
    errors are logged rather than raised. In dry-run mode nothing is
    posted and the state file is not modified.
    """
    logging.info("🔄 Starting TikTok → Bluesky sync cycle...")
    dry_run = getattr(args, "dry_run", False)
    bsky_langs = getattr(args, "bsky_langs", None) or DEFAULT_BSKY_LANGS
    if dry_run:
        logging.info("🧪 DRY RUN MODE — no posts will be created on Bluesky.")

    try:
        state = load_state(STATE_PATH)
        state = prune_state(state, max_entries=STATE_MAX_ENTRIES)

        tiktoks = scrape_tiktoks_via_playwright(
            args.tiktok_handle,
            locale=bsky_langs[0] if bsky_langs else "en-US",
        )
        if not tiktoks:
            logging.warning("⚠️ No TikTok videos found. Skipping sync.")
            return

        bsky_client = None
        recent_bsky_posts = []
        if not dry_run:
            bsky_client = create_bsky_client(
                args.bsky_base_url,
                args.bsky_handle,
                args.bsky_password,
            )
            recent_bsky_posts = get_recent_bsky_posts(
                bsky_client,
                args.bsky_handle,
                limit=DEDUPE_BSKY_LIMIT,
            )

        # --- Build candidates ---
        candidates = _build_candidates(tiktoks, state, recent_bsky_posts)
        logging.info(f"📬 {len(candidates)} new TikTok videos to post after dedup.")
        if not candidates:
            logging.info("✅ Nothing new to post.")
            return

        # --- Pre-resolve video URLs ---
        _resolve_video_urls(candidates)

        # --- Post to Bluesky ---
        new_posts = 0
        for candidate in candidates:
            tiktok = candidate["tiktok"]
            raw_text = candidate["raw_text"]
            logging.info(
                f"📝 {'[DRY RUN] Would post' if dry_run else 'Posting'} "
                f"TikTok video: {tiktok.post_url}"
            )

            if dry_run:
                # Deliberately not recorded in state: persisting here would
                # make the next real run skip the video as already posted.
                logging.info(f" 📄 Caption: {raw_text[:200]}")
                new_posts += 1
                continue

            video_embed = _prepare_video_embed(candidate, bsky_client)

            try:
                if video_embed:
                    post_result = send_post_with_retry(
                        bsky_client,
                        text=make_rich(raw_text),
                        embed=video_embed,
                        langs=bsky_langs,
                    )
                    post_mode = "video"
                else:
                    # Fallback: post caption as text-only with link to TikTok
                    fallback_text = make_rich(
                        _build_fallback_text(raw_text, tiktok.post_url)
                    )
                    post_result = send_post_with_retry(
                        bsky_client,
                        text=fallback_text,
                        langs=bsky_langs,
                    )
                    post_mode = "text_only_fallback"

                bsky_uri = getattr(post_result, "uri", None)
                remember_posted_video(state, candidate, bsky_uri=bsky_uri)
                state = prune_state(state, max_entries=STATE_MAX_ENTRIES)
                save_state(state, STATE_PATH)

                recent_bsky_posts.insert(0, {
                    "uri": bsky_uri,
                    "normalized_text": candidate["normalized_text"],
                    "media_fingerprint": candidate["media_fingerprint"],
                    "text_media_key": candidate["text_media_key"],
                })
                recent_bsky_posts = recent_bsky_posts[:DEDUPE_BSKY_LIMIT]

                new_posts += 1
                logging.info(f"✅ Posted TikTok to Bluesky [{post_mode}]: {raw_text[:80]}")
                time.sleep(5)
            except Exception as e:
                logging.error(f"❌ Failed to post to Bluesky: {e}")

        logging.info(f"✅ Sync complete. Posted {new_posts} new TikTok videos.")
    except Exception as e:
        logging.error(f"❌ Error during sync cycle: {e}")


def main():
    """Parse CLI/env configuration and run a single sync cycle."""
    load_dotenv()
    parser = argparse.ArgumentParser(description="TikTok to Bluesky Sync")
    parser.add_argument("--tiktok-handle", help="TikTok account handle to scrape (without @)")
    parser.add_argument("--bsky-handle", help="Your Bluesky handle")
    parser.add_argument("--bsky-password", help="Your Bluesky app password")
    parser.add_argument("--bsky-base-url", help="Bluesky PDS base URL", default=None)
    parser.add_argument("--bsky-langs", help="Comma-separated language codes", default=None)
    parser.add_argument("--dry-run", action="store_true", default=False)
    args = parser.parse_args()

    # Command-line values win; environment variables fill the gaps.
    args.tiktok_handle = args.tiktok_handle or os.getenv("TIKTOK_HANDLE")
    args.bsky_handle = args.bsky_handle or os.getenv("BSKY_HANDLE")
    args.bsky_password = args.bsky_password or os.getenv("BSKY_APP_PASSWORD")
    args.bsky_base_url = args.bsky_base_url or os.getenv("BSKY_BASE_URL") or DEFAULT_BSKY_BASE_URL
    raw_langs = args.bsky_langs or os.getenv("BSKY_LANGS")
    args.bsky_langs = (
        [code.strip() for code in raw_langs.split(",") if code.strip()]
        if raw_langs
        else DEFAULT_BSKY_LANGS
    )

    missing = []
    if not args.tiktok_handle:
        missing.append("--tiktok-handle / TIKTOK_HANDLE")
    if not args.bsky_handle:
        missing.append("--bsky-handle / BSKY_HANDLE")
    if not args.bsky_password:
        missing.append("--bsky-password / BSKY_APP_PASSWORD")
    if missing:
        logging.error(f"❌ Missing: {', '.join(missing)}")
        return

    logging.info(f"🤖 TikTok→Bluesky bot started. Scraping @{args.tiktok_handle}")
    reset_caches()
    sync_feeds(args)
    logging.info("🤖 Bot finished.")


if __name__ == "__main__":
    main()