Files
post2bsky/tiktok2bsky.py
Guillem Hernandez Sola 1cf7a334c0 Tiktok example 5
2026-05-19 09:56:31 +02:00

1384 lines
48 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import argparse
import arrow
import hashlib
import html
import io
import json
import logging
import re
import httpx
import time
import os
import subprocess
import uuid
import random
from urllib.parse import urlparse
from dotenv import load_dotenv
from atproto import Client, client_utils, models
from playwright.sync_api import sync_playwright
from moviepy import VideoFileClip
from bs4 import BeautifulSoup
from PIL import Image
import grapheme
# --- Configuration ---
# Module-level constants read by the functions in this file.

# Output files, resolved relative to the current working directory.
LOG_PATH = "tiktok2bsky.log"  # target of the logging FileHandler
STATE_PATH = "tiktok2bsky_state.json"  # default path for load_state() / save_state()

# Scrape and de-duplication windows.
SCRAPE_VIDEO_LIMIT = 30  # max videos collected per profile scrape
DEDUPE_BSKY_LIMIT = 30  # recent Bluesky posts fetched by sync_feeds() for duplicate checks
VIDEO_MAX_AGE_DAYS = 3  # NOTE(review): no reader in the visible part of the file

# Post text defaults.
BSKY_TEXT_MAX_LENGTH = 300  # default grapheme budget of truncate_text_safely()
DEFAULT_BSKY_LANGS = ["es"]  # fallback when args.bsky_langs is unset

# Video limits.
VIDEO_MAX_DURATION_SECONDS = 179  # download_and_crop_video() trims clips to at most this
MAX_VIDEO_UPLOAD_SIZE_MB = 45  # get_blob_from_file() refuses larger files

# Image / external-thumbnail limits.
# NOTE(review): none of these six are referenced in the visible part of the file.
BSKY_IMAGE_MAX_BYTES = 950 * 1024
BSKY_IMAGE_MAX_DIMENSION = 2000
BSKY_IMAGE_MIN_JPEG_QUALITY = 45
EXTERNAL_THUMB_MAX_BYTES = 950 * 1024
EXTERNAL_THUMB_MAX_DIMENSION = 1200
EXTERNAL_THUMB_MIN_JPEG_QUALITY = 40

# Blob upload retry policy (upload_blob_with_retry). Delays are in seconds.
BSKY_BLOB_UPLOAD_MAX_RETRIES = 5  # total attempts of the upload loop
BSKY_BLOB_UPLOAD_BASE_DELAY = 10  # rate-limit backoff base; doubled per attempt
BSKY_BLOB_UPLOAD_MAX_DELAY = 300  # cap for the rate-limit backoff
BSKY_BLOB_TRANSIENT_ERROR_RETRIES = 3  # separate budget for transient errors
BSKY_BLOB_TRANSIENT_ERROR_DELAY = 15  # multiplied by the transient retry count

# send_post retry policy (send_post_with_retry). Delays are in seconds.
BSKY_SEND_POST_MAX_RETRIES = 3
BSKY_SEND_POST_BASE_DELAY = 5
BSKY_SEND_POST_MAX_DELAY = 60

# Login retry policy (create_bsky_client). Delays are in seconds.
BSKY_LOGIN_MAX_RETRIES = 4
BSKY_LOGIN_BASE_DELAY = 10  # multiplied by the attempt number (linear backoff)
BSKY_LOGIN_MAX_DELAY = 600  # cap for the backoff; also caps Retry-After values
BSKY_LOGIN_JITTER_MAX = 1.5  # upper bound of the random jitter added to each wait

# Network / subprocess timeouts.
MEDIA_DOWNLOAD_TIMEOUT = 30  # NOTE(review): not referenced in the visible part of the file
LINK_METADATA_TIMEOUT = 10  # NOTE(review): not referenced in the visible part of the file
SUBPROCESS_TIMEOUT_SECONDS = 180  # per ffmpeg invocation
FFPROBE_TIMEOUT_SECONDS = 15  # per ffprobe invocation

DEFAULT_BSKY_BASE_URL = "https://bsky.social"  # used when no base URL is supplied
SESSION_FILE_PERMISSIONS = 0o600  # NOTE(review): not referenced in the visible part of the file

# TikTok page pacing, in seconds (passed to time.sleep), plus the scroll count.
TIKTOK_PAGE_LOAD_WAIT_S = 5.0  # after navigation, reload and retry-button click
TIKTOK_SCROLL_PAUSE_S = 2.5  # between scroll steps
TIKTOK_MAX_SCROLLS = 8  # number of scroll steps per scrape
TIKTOK_BANNER_WAIT_S = 3.0  # after dismissing the cookie banner

# Text shaping.
DYNAMIC_ALT_MAX_LENGTH = 150  # max length of the alt text built by build_dynamic_alt()
TRUNCATE_MIN_PREFIX_CHARS = 20  # truncate_text_safely() only cuts at a space beyond this index
ORPHAN_DIGIT_MAX_DIGITS = 3  # NOTE(review): not referenced in the visible part of the file
# --- Top info/RGPD banner selectors (dismissed first) ---
# Tried in order by _dismiss_banners(); the first visible match is clicked.
TOP_BANNER_SELECTORS = [
    'button:has-text("Entendido")',
    'button:has-text("Got it")',
    'button:has-text("Understood")',
    '[data-e2e="top-banner-close"]',
    '[class*="BannerContainer"] button',
    '[class*="DivBannerContainer"] button',
]
# --- Cookie consent banner selectors (dismissed second) ---
# Tried in order by _dismiss_banners(); the first visible match is clicked.
# Note that '[id*="accept"]' matches any element whose id contains "accept".
GDPR_SELECTORS = [
    'button:has-text("Permitir todas")',
    'button:has-text("Rechazar cookies opcionales")',
    'button:has-text("Entendido")',
    'button:has-text("Aceptar todo")',
    'button:has-text("Accept all")',
    'button:has-text("Got it")',
    'button:has-text("Decline optional")',
    '[data-e2e="cookie-banner-accept"]',
    '[id*="accept"]',
    '[class*="accept-btn"]',
]
# --- Video grid selectors ---
# A single comma-separated selector string (adjacent literals are concatenated),
# so waiting on it succeeds as soon as any one alternative matches.
GRID_SELECTORS = (
    '[data-e2e="user-post-item"], '
    '[class*="DivItemContainerV2"], '
    'a[href*="/video/"], '
    '[class*="video-feed"], '
    'div[class*="VideoFeed"], '
    '[class*="DivVideoFeedV2"]'
)
# --- Grid error retry button selectors ---
# Tried in order by _click_retry_button(); the first visible match is clicked.
RETRY_BUTTON_SELECTORS = [
    'button:has-text("Actualizar")',
    'button:has-text("Refresh")',
    'button:has-text("Retry")',
    'button:has-text("Reintentar")',
    '[data-e2e="retry-button"]',
]
# --- Logging Setup ---
# Configures the root logger at import time: INFO and above go both to the
# UTF-8 log file at LOG_PATH and to the default stream handler.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        # UTF-8 is set explicitly because the log messages contain emoji.
        logging.FileHandler(LOG_PATH, encoding="utf-8"),
        logging.StreamHandler(),
    ],
    level=logging.INFO,
)
# --- Per-run caches ---
class _RunCache:
    """Container for the lookup tables that live for a single run.

    Holds three dictionaries plus a locale string. ``clear()`` empties the
    dictionaries in place and leaves ``locale`` untouched.
    """

    def __init__(self):
        # NOTE(review): the key/value meaning of each table is not visible
        # in this part of the file; the names suggest URL-validity results
        # and video hash/URL ownership.
        self.url_validity: dict = {}
        self.video_hash_owner: dict = {}
        self.video_url_owner: dict = {}
        self.locale: str = "es-ES"

    def clear(self):
        """Empty every lookup table in place (``locale`` is kept)."""
        tables = (self.url_validity, self.video_hash_owner, self.video_url_owner)
        for table in tables:
            table.clear()
# Single module-wide cache instance shared by the helpers in this file.
_cache = _RunCache()


def reset_caches():
    """Empty the module-wide run cache tables (delegates to ``_cache.clear()``)."""
    _cache.clear()
# --- Custom Classes ---
class ScrapedMedia:
    """A single media attachment of a scraped post.

    Attributes:
        type: media kind label, ``"video"`` unless another value is passed.
        media_url_https: the URL given at construction time.
    """

    def __init__(self, url, media_type="video"):
        self.media_url_https = url
        self.type = media_type
class ScrapedTikTok:
    """One scraped TikTok post.

    ``media`` holds a single video ``ScrapedMedia`` when ``video_url`` is
    truthy and is an empty list otherwise.
    """

    def __init__(self, created_on, text, video_url, post_url=None, thumbnail_url=None):
        self.created_on = created_on
        self.text = text
        self.post_url = post_url
        self.thumbnail_url = thumbnail_url
        attachments = []
        if video_url:
            attachments.append(ScrapedMedia(video_url, "video"))
        self.media = attachments
# --- Helpers ---
def sha256_file(path, chunk_size=1024 * 1024):
    """Return the hex SHA-256 digest of the file at *path*.

    The file is read in binary mode in pieces of *chunk_size* bytes so the
    whole file never has to be held in memory.
    """
    digest = hashlib.sha256()
    with open(path, "rb") as stream:
        # iter() with a sentinel stops at the first empty read (EOF).
        for piece in iter(lambda: stream.read(chunk_size), b""):
            digest.update(piece)
    return digest.hexdigest()
def grapheme_len(text):
    """Return the length of *text* as reported by ``grapheme.length``."""
    cluster_count = grapheme.length(text)
    return cluster_count
def remove_file_quietly(path):
    """Delete *path* if it exists, logging the outcome instead of raising.

    Does nothing when *path* is falsy or does not exist. Any exception from
    the removal is caught and logged as a warning.
    """
    if not path or not os.path.exists(path):
        return
    try:
        os.remove(path)
        logging.info(f"🧹 Removed temp file: {path}")
    except Exception as e:
        logging.warning(f"⚠️ Could not remove temp file {path}: {e}")
def take_error_screenshot(page, label):
    """Save a timestamped PNG screenshot of *page* for post-mortem debugging.

    The file is named ``screenshot_<label>_<YYYYmmdd_HHMMSS>.png`` and written
    to the current working directory. Failures are logged, never raised.
    """
    timestamp = time.strftime("%Y%m%d_%H%M%S")
    name = f"screenshot_{label}_{timestamp}.png"
    try:
        page.screenshot(path=name)
    except Exception as e:
        logging.warning(f"⚠️ Could not save screenshot: {e}")
        return
    logging.info(f"📸 Screenshot saved: {name}")
def clean_post_text(text):
    """Return *text* trimmed, with normalized newlines and collapsed blank runs.

    * ``None`` / empty input yields ``""``.
    * ``\\r\\n`` and bare ``\\r`` both become a single ``\\n``.
    * Runs of three or more newlines are reduced to exactly two.
    """
    cleaned = (text or "").strip()
    # Consume an optional "\n" after "\r" so a Windows line ending maps to one
    # newline; replacing "\r" alone would turn every CRLF into a blank line.
    cleaned = re.sub(r"\r\n?", "\n", cleaned)
    cleaned = re.sub(r"\n{3,}", "\n\n", cleaned)
    return cleaned.strip()
def normalize_post_text(text):
    """Return a comparison-friendly form of *text*.

    The text is cleaned with ``clean_post_text``, every whitespace run is
    collapsed to one space, the result is trimmed and lower-cased. Falsy
    input yields ``""``.
    """
    if not text:
        return ""
    single_spaced = re.sub(r"\s+", " ", clean_post_text(text))
    return single_spaced.strip().lower()
def truncate_text_safely(text, max_length=BSKY_TEXT_MAX_LENGTH):
    """Shorten *text* to at most *max_length* grapheme clusters.

    Text already within the limit is returned unchanged. Otherwise the first
    *max_length* clusters are kept and, when the last space in that prefix
    lies beyond index ``TRUNCATE_MIN_PREFIX_CHARS``, the prefix is cut back
    to that space so the result does not end mid-word.
    """
    if grapheme_len(text) <= max_length:
        return text
    head = "".join(list(grapheme.graphemes(text))[:max_length])
    cut_at = head.rfind(" ")
    # Only back up to a word boundary when enough text remains before it.
    return head[:cut_at] if cut_at > TRUNCATE_MIN_PREFIX_CHARS else head
def extract_tiktok_video_id(post_url):
    """Return the digits following ``/video/`` in *post_url*, or ``None``.

    ``None`` is returned for falsy input and for URLs without a
    ``/video/<digits>`` segment.
    """
    found = re.search(r"/video/(\d+)", post_url) if post_url else None
    if found is None:
        return None
    return found.group(1)
def canonicalize_tiktok_url(url):
    """Return the canonical ``https://www.tiktok.com/@user/video/<id>`` form of *url*.

    Scheme, optional ``www.`` and anything after the numeric id (query string,
    trailing path) are normalized away. URLs that do not match the TikTok
    video pattern are returned stripped of surrounding whitespace; falsy
    input yields ``None``.
    """
    if not url:
        return None
    pattern = r"https?://(?:www\.)?tiktok\.com/@([^/]+)/video/(\d+)"
    found = re.search(pattern, url, re.IGNORECASE)
    if not found:
        return url.strip()
    return f"https://www.tiktok.com/@{found.group(1)}/video/{found.group(2)}"
def make_unique_video_temp_base(post_url=None):
    """Return a collision-resistant base name for temporary video files.

    The name combines the TikTok video id (or ``"unknown"``), the current
    time in milliseconds and eight random hex characters.
    """
    video_id = extract_tiktok_video_id(post_url)
    if not video_id:
        video_id = "unknown"
    ts_ms = int(time.time() * 1000)
    rand = uuid.uuid4().hex[:8]
    base = f"temp_tiktok_{video_id}_{ts_ms}_{rand}"
    logging.info(f"🎞️ Using unique temp video base: {base}")
    return base
def build_media_fingerprint(tiktok):
    """Return a SHA-256 hex fingerprint of the media attached to *tiktok*.

    Each media item contributes ``video:<stable>``, where ``<stable>`` is the
    canonical post URL when available and the item's own ``media_url_https``
    otherwise. The entries are sorted and joined with ``|`` before hashing.
    Returns the literal ``"no-media"`` when there is nothing attached.
    """
    if not tiktok or not tiktok.media:
        return "no-media"
    # The canonical post URL is the same for every item, so resolve it once.
    canonical = canonicalize_tiktok_url(tiktok.post_url)
    parts = []
    for media in tiktok.media:
        stable = canonical or (getattr(media, "media_url_https", "") or "")
        parts.append(f"video:{stable}")
    raw = "|".join(sorted(parts))
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()
def build_bsky_media_fingerprint(post_view):
    """Return a SHA-256 hex fingerprint of the video embedded in *post_view*.

    The fingerprint hashes ``video:<ref>``, where ``<ref>`` is the video's
    ``ref``, else its ``cid``, else ``str(video)``. Returns ``"no-media"``
    when the post has no embed, the embed has no video, or anything goes
    wrong while reading the attributes.
    """
    try:
        embed = getattr(post_view, "embed", None)
        video = getattr(embed, "video", None) if embed else None
        if not video:
            return "no-media"
        ref = getattr(video, "ref", None) or getattr(video, "cid", None) or str(video)
        return hashlib.sha256(f"video:{ref}".encode("utf-8")).hexdigest()
    except Exception as e:
        logging.debug(f"Could not build Bluesky media fingerprint: {e}")
        return "no-media"
def build_text_media_key(normalized_text, media_fingerprint):
    """Return the SHA-256 hex digest of ``<normalized_text>||<media_fingerprint>``."""
    combined = f"{normalized_text}||{media_fingerprint}"
    digest = hashlib.sha256(combined.encode("utf-8"))
    return digest.hexdigest()
# --- Bluesky login / retry helpers ---
def is_rate_limited_error(e):
    """Return True when ``repr(e)`` (case-insensitive) looks like a rate-limit error."""
    t = repr(e).lower()
    markers = ("429", "ratelimitexceeded", "too many requests")
    return any(marker in t for marker in markers)
def is_auth_error(e):
    """Return True when ``repr(e)`` (case-insensitive) looks like an auth failure.

    Matches the substrings ``401``, ``403`` and ``invalid identifier``.
    """
    t = repr(e).lower()
    markers = ("401", "403", "invalid identifier")
    return any(marker in t for marker in markers)
def is_transient_error(e):
    """Return True when ``repr(e)`` contains a known transient-failure marker.

    Matching is case-sensitive and covers timeout/connection exception names
    plus the HTTP status codes 502, 503 and 504.
    """
    description = repr(e)
    markers = (
        "InvokeTimeoutError", "ReadTimeout", "WriteTimeout",
        "RemoteProtocolError", "ConnectError", "503", "502", "504",
    )
    for marker in markers:
        if marker in description:
            return True
    return False
def is_network_error(e):
    """Return True when ``repr(e)`` contains a known network-failure marker.

    Matching is case-sensitive and covers connection/timeout exception names
    plus the HTTP status codes 502, 503 and 504.
    """
    description = repr(e)
    markers = (
        "ConnectError", "RemoteProtocolError", "ReadTimeout",
        "WriteTimeout", "TimeoutException", "503", "502", "504",
    )
    for marker in markers:
        if marker in description:
            return True
    return False
def get_rate_limit_wait_seconds(e, default_delay):
    """Return how long to wait after the rate-limit error *e*, in seconds.

    Reads a ``retry-after`` / ``Retry-After`` entry from ``e.headers`` when
    present, parses it as an integer and clamps it to the range
    ``1..BSKY_LOGIN_MAX_DELAY``. Falls back to *default_delay* when the header
    is missing, empty, or cannot be parsed.
    """
    try:
        header_map = getattr(e, "headers", None) or {}
        retry_after = header_map.get("retry-after") or header_map.get("Retry-After")
        if retry_after:
            seconds = max(int(retry_after), 1)
            return min(seconds, BSKY_LOGIN_MAX_DELAY)
    except Exception:
        # Best effort: any malformed header falls through to the default.
        pass
    return default_delay
def create_bsky_client(base_url, handle, password):
    """Create an atproto ``Client`` for *base_url* and log in, with retries.

    The base URL falls back to ``DEFAULT_BSKY_BASE_URL`` and is stripped of
    whitespace and trailing slashes. Login is attempted up to
    ``BSKY_LOGIN_MAX_RETRIES`` times with a linear backoff plus random jitter.

    Raises:
        Exception: the login error itself, immediately when it looks like an
            authentication failure, or after the last attempt otherwise.
        RuntimeError: when the loop finishes without returning or raising
            (only possible if ``BSKY_LOGIN_MAX_RETRIES`` is below 1).
    """
    service_url = (base_url or DEFAULT_BSKY_BASE_URL).strip().rstrip("/")
    client = Client(base_url=service_url)
    for attempt in range(1, BSKY_LOGIN_MAX_RETRIES + 1):
        try:
            client.login(handle, password)
        except Exception as e:
            # Bad credentials will not fix themselves, and the final attempt
            # has nothing left to retry: surface the error in both cases.
            if is_auth_error(e) or attempt >= BSKY_LOGIN_MAX_RETRIES:
                raise
            wait = min(BSKY_LOGIN_BASE_DELAY * attempt, BSKY_LOGIN_MAX_DELAY)
            wait += random.uniform(0, BSKY_LOGIN_JITTER_MAX)
            logging.warning(f"⏳ Bluesky login retry {attempt} in {wait:.1f}s: {e}")
            time.sleep(wait)
        else:
            logging.info("✅ Bluesky login successful.")
            return client
    raise RuntimeError("Bluesky login failed after all retries.")
# --- State management ---
def default_state():
    """Return a fresh, empty state dictionary (new containers on every call)."""
    return dict(
        version=1,
        posted_videos={},
        posted_by_bsky_uri={},
        updated_at=None,
    )
def load_state(state_path=STATE_PATH):
    """Load the persisted state from *state_path*.

    Missing top-level keys are filled with their defaults. When the file does
    not exist, or cannot be read/parsed, a fresh default state is returned
    (the failure is logged as a warning).
    """
    if os.path.exists(state_path):
        try:
            with open(state_path, "r", encoding="utf-8") as handle:
                loaded = json.load(handle)
            # Back-fill keys that older or hand-edited files may lack.
            fallbacks = (
                ("version", 1),
                ("posted_videos", {}),
                ("posted_by_bsky_uri", {}),
                ("updated_at", None),
            )
            for field, fallback in fallbacks:
                loaded.setdefault(field, fallback)
            return loaded
        except Exception as e:
            logging.warning(f"⚠️ Could not load state: {e}. Reinitializing.")
    return default_state()
def save_state(state, state_path=STATE_PATH):
    """Persist *state* to *state_path* as pretty-printed JSON.

    ``state["updated_at"]`` is stamped with the current UTC time first. The
    data is written to ``<state_path>.tmp`` and then moved over the target
    with ``os.replace`` so readers never see a half-written file. Failures
    are logged as errors and not raised.
    """
    try:
        state["updated_at"] = arrow.utcnow().isoformat()
        scratch_path = f"{state_path}.tmp"
        with open(scratch_path, "w", encoding="utf-8") as handle:
            json.dump(state, handle, ensure_ascii=False, indent=2, sort_keys=True)
        os.replace(scratch_path, state_path)
    except Exception as e:
        logging.error(f"❌ Failed to save state: {e}")
        return
    logging.info(f"💾 State saved to {state_path}")
def remember_posted_video(state, candidate, bsky_uri=None):
    """Record *candidate* in *state* as posted.

    The entry is keyed by the candidate's canonical post URL, or by
    ``textmedia:<text_media_key>`` when no canonical URL is available. When
    *bsky_uri* is given, a reverse lookup from that URI to the key is stored
    in ``state["posted_by_bsky_uri"]``.
    """
    canonical = candidate.get("canonical_post_url")
    key = canonical or f"textmedia:{candidate['text_media_key']}"
    record = {
        "canonical_post_url": canonical,
        "normalized_text": candidate["normalized_text"],
        "text_media_key": candidate["text_media_key"],
        "media_fingerprint": candidate["media_fingerprint"],
        "bsky_uri": bsky_uri,
        "video_created_on": candidate["tiktok"].created_on,
        "post_url": candidate["tiktok"].post_url,
        "video_id": candidate.get("video_id"),
        "posted_at": arrow.utcnow().isoformat(),
    }
    state["posted_videos"][key] = record
    if bsky_uri:
        state["posted_by_bsky_uri"][bsky_uri] = key
def candidate_matches_state(candidate, state):
    """Check whether *candidate* was already recorded in *state*.

    Checks run in priority order and the first hit wins:

    1. the canonical post URL is a key of ``posted_videos``;
    2. any record shares the candidate's ``text_media_key``;
    3. any record shares the candidate's non-empty ``normalized_text``.

    Returns:
        ``(True, reason)`` on a match, ``(False, None)`` otherwise.
    """
    canonical_url = candidate["canonical_post_url"]
    text_media_key = candidate["text_media_key"]
    normalized_text = candidate["normalized_text"]
    records = state.get("posted_videos", {})

    if canonical_url and canonical_url in records:
        return True, "state:post_url"
    if any(rec.get("text_media_key") == text_media_key for rec in records.values()):
        return True, "state:text_media_fingerprint"
    # Empty captions are never treated as a text match.
    if normalized_text and any(
        rec.get("normalized_text") == normalized_text for rec in records.values()
    ):
        return True, "state:normalized_text"
    return False, None
def prune_state(state, max_entries=5000):
    """Trim ``state["posted_videos"]`` to the *max_entries* newest records.

    Records are ranked by their ``posted_at`` string, newest first; records
    whose ``posted_at`` is missing or ``None`` rank oldest. Entries of
    ``posted_by_bsky_uri`` that point at a dropped record are removed too.
    *state* is modified in place and also returned; it is returned untouched
    when it already fits the limit.
    """
    posted = state.get("posted_videos", {})
    if len(posted) <= max_entries:
        return state

    def _posted_at(item):
        # "or" also covers an explicit None, which cannot be ordered against
        # timestamp strings and would make sorted() raise TypeError.
        return item[1].get("posted_at") or ""

    newest_first = sorted(posted.items(), key=_posted_at, reverse=True)
    keep = {key for key, _ in newest_first[:max_entries]}
    state["posted_videos"] = {k: v for k, v in posted.items() if k in keep}
    state["posted_by_bsky_uri"] = {
        uri: k
        for uri, k in state.get("posted_by_bsky_uri", {}).items()
        if k in keep
    }
    return state
# --- Bluesky feed helpers ---
def get_recent_bsky_posts(client, handle, limit=30):
    """Fetch the author's recent Bluesky posts as de-duplication records.

    Feed items that carry a ``reason`` and records that are replies are
    skipped. Each remaining post becomes a dict with ``uri``,
    ``normalized_text``, ``media_fingerprint`` and ``text_media_key``.

    Errors are never raised: a failing item is skipped (debug log), and a
    failing feed request yields whatever was collected so far (warning log).
    """
    collected = []
    try:
        feed_page = client.get_author_feed(handle, limit=limit)
        for item in feed_page.feed:
            try:
                # Items with a reason are not plain own posts
                # (presumably reposts — confirm against the feed schema).
                if item.reason is not None:
                    continue
                post = item.post
                if getattr(post.record, "reply", None) is not None:
                    continue
                normalized = normalize_post_text(getattr(post.record, "text", "") or "")
                media_fp = build_bsky_media_fingerprint(post)
                entry = {
                    "uri": getattr(post, "uri", None),
                    "normalized_text": normalized,
                    "media_fingerprint": media_fp,
                    "text_media_key": build_text_media_key(normalized, media_fp),
                }
                collected.append(entry)
            except Exception as e:
                logging.debug(f"Skipping feed item: {e}")
    except Exception as e:
        logging.warning(f"⚠️ Could not fetch recent Bluesky posts: {e}")
    return collected
def candidate_matches_existing_bsky(candidate, recent_bsky_posts):
    """Check *candidate* against already-published Bluesky posts.

    Posts are examined in order; for each one the combined text+media key is
    compared first, then the non-empty normalized text. The first post that
    matches either way decides the result.

    Returns:
        ``(True, reason)`` on a match, ``(False, None)`` otherwise.
    """
    for existing in recent_bsky_posts:
        same_key = candidate["text_media_key"] == existing["text_media_key"]
        if same_key:
            return True, "bsky:text_media_fingerprint"
        caption = candidate["normalized_text"]
        # An empty caption is never treated as a text match.
        if caption and caption == existing["normalized_text"]:
            return True, "bsky:normalized_text"
    return False, None
# --- Upload / blob helpers ---
def upload_blob_with_retry(client, binary_data, media_label="media"):
    """Upload *binary_data* through ``client.upload_blob`` with retries.

    Two failure classes are retried:

    * rate limiting (``429`` / ``RateLimitExceeded`` in the exception text):
      exponential backoff, capped at ``BSKY_BLOB_UPLOAD_MAX_DELAY``;
    * transient errors (per ``is_transient_error``): linear backoff, limited
      to ``BSKY_BLOB_TRANSIENT_ERROR_RETRIES`` retries.

    Both kinds consume iterations of the same loop, so the total number of
    upload attempts never exceeds ``BSKY_BLOB_UPLOAD_MAX_RETRIES``.

    Args:
        client: object exposing ``upload_blob(data)``.
        binary_data: payload handed to ``upload_blob`` unchanged.
        media_label: name used in the failure log messages.

    Returns:
        ``result.blob`` on success, ``None`` when every attempt failed or a
        non-retryable error occurred. Exceptions are logged, not raised.
    """
    last_exception = None
    transient_attempts = 0
    for attempt in range(1, BSKY_BLOB_UPLOAD_MAX_RETRIES + 1):
        try:
            result = client.upload_blob(binary_data)
            return result.blob
        except Exception as e:
            last_exception = e
            # Rate limiting is detected by substring match on the error text.
            if "429" in str(e) or "RateLimitExceeded" in str(e):
                # Exponential backoff: base * 2^(attempt-1), capped.
                wait = min(
                    BSKY_BLOB_UPLOAD_BASE_DELAY * (2 ** (attempt - 1)),
                    BSKY_BLOB_UPLOAD_MAX_DELAY,
                )
                if attempt < BSKY_BLOB_UPLOAD_MAX_RETRIES:
                    logging.warning(
                        f"⏳ Blob upload rate-limited. Retry {attempt} after {wait}s."
                    )
                    time.sleep(wait)
                    continue
                # Rate-limited on the final attempt: fall through to the
                # summary warning after the loop.
                break
            if (
                is_transient_error(e)
                and transient_attempts < BSKY_BLOB_TRANSIENT_ERROR_RETRIES
            ):
                transient_attempts += 1
                # Linear backoff based on the transient retry count.
                wait = BSKY_BLOB_TRANSIENT_ERROR_DELAY * transient_attempts
                logging.warning(
                    f"⏳ Transient blob upload error. Retry {transient_attempts} after {wait}s."
                )
                time.sleep(wait)
                continue
            # Neither rate-limited nor (still) retryable: give up right away.
            logging.warning(f"Could not upload {media_label}: {repr(e)}")
            return None
    # Reached when the loop ran out of attempts or broke out above.
    logging.warning(f"Could not upload {media_label}: {repr(last_exception)}")
    return None
def send_post_with_retry(client, **kwargs):
    """Call ``client.send_post(**kwargs)`` with retries and return its result.

    Rate-limit errors (``429`` / ``RateLimitExceeded`` in the exception text)
    are retried with a capped exponential backoff; transient errors (per
    ``is_transient_error``) with a linear backoff. Any other error, and any
    error on the final attempt, is re-raised to the caller.
    """
    last_exception = None
    for attempt in range(1, BSKY_SEND_POST_MAX_RETRIES + 1):
        try:
            return client.send_post(**kwargs)
        except Exception as e:
            last_exception = e
            retries_left = attempt < BSKY_SEND_POST_MAX_RETRIES
            message = str(e)
            if "429" in message or "RateLimitExceeded" in message:
                if not retries_left:
                    raise
                # Exponential backoff: base * 2^(attempt-1), capped.
                time.sleep(
                    min(
                        BSKY_SEND_POST_BASE_DELAY * (2 ** (attempt - 1)),
                        BSKY_SEND_POST_MAX_DELAY,
                    )
                )
                continue
            if retries_left and is_transient_error(e):
                time.sleep(BSKY_SEND_POST_BASE_DELAY * attempt)
                continue
            raise
    raise last_exception
def get_blob_from_file(file_path, client):
    """Read *file_path* and upload it as a blob through *client*.

    Returns the uploaded blob, or ``None`` when the file is missing, exceeds
    ``MAX_VIDEO_UPLOAD_SIZE_MB``, or the upload fails. Problems are logged as
    warnings and never raised.
    """
    bytes_per_mb = 1024 * 1024
    try:
        if not os.path.exists(file_path):
            logging.warning(f"File not found: {file_path}")
            return None
        size_mb = os.path.getsize(file_path) / bytes_per_mb
        if size_mb > MAX_VIDEO_UPLOAD_SIZE_MB:
            logging.warning(
                f"File too large: {size_mb:.2f} MB > {MAX_VIDEO_UPLOAD_SIZE_MB} MB"
            )
            return None
        with open(file_path, "rb") as stream:
            payload = stream.read()
        return upload_blob_with_retry(client, payload, media_label=file_path)
    except Exception as e:
        logging.warning(f"Could not upload file {file_path}: {repr(e)}")
        return None
def build_video_embed(video_blob, alt_text):
    """Wrap *video_blob* in an ``AppBskyEmbedVideo.Main`` embed with *alt_text*.

    Returns ``None`` (after logging an error) when building the embed raises
    ``AttributeError``, which the code treats as a too-old atproto release.
    """
    try:
        embed = models.AppBskyEmbedVideo.Main(video=video_blob, alt=alt_text)
    except AttributeError:
        logging.error(
            "❌ atproto version does not support AppBskyEmbedVideo. Upgrade atproto."
        )
        return None
    return embed
def build_dynamic_alt(text):
    """Build alt text for a video from the post caption *text*.

    The caption is cleaned, flattened to a single line and stripped of URLs
    (``http://``, ``https://`` or ``www.`` up to the next whitespace). An
    empty result falls back to ``"TikTok video"``. The text is then limited
    to ``DYNAMIC_ALT_MAX_LENGTH`` grapheme clusters.
    """
    alt = clean_post_text(text or "").replace("\n", " ").strip()
    alt = re.sub(r"(?:https?://|www\.)\S+", "", alt).strip()
    if not alt:
        alt = "TikTok video"
    # Cut on grapheme-cluster boundaries, as truncate_text_safely() does:
    # slicing by code point can split an emoji sequence or a base character
    # from its combining marks and leave a broken glyph at the end.
    clusters = list(grapheme.graphemes(alt))
    if len(clusters) <= DYNAMIC_ALT_MAX_LENGTH:
        return alt
    return "".join(clusters[:DYNAMIC_ALT_MAX_LENGTH])
def make_rich(content):
    """Convert *content* into a ``TextBuilder`` with hashtag and link facets.

    The text is cleaned, then rebuilt line by line and word by word (split on
    single spaces, so repeated spaces are preserved):

    * ``#word`` becomes a tag facet; trailing punctuation is dropped from the
      tag value but kept in the displayed text;
    * words starting with ``http://`` or ``https://`` become link facets;
    * everything else is plain text.
    """
    text_builder = client_utils.TextBuilder()
    rows = clean_post_text(content).splitlines()
    last_row = len(rows) - 1
    for row_no, row in enumerate(rows):
        if row.strip():
            tokens = row.split(" ")
            last_token = len(tokens) - 1
            for pos, token in enumerate(tokens):
                if token:
                    if token.startswith("#") and len(token) > 1:
                        tag = token[1:].rstrip(".,;:!?)'\"")
                        if tag:
                            text_builder.tag(token, tag)
                        else:
                            # Only punctuation followed the "#": plain text.
                            text_builder.text(token)
                    elif token.startswith(("http://", "https://")):
                        text_builder.link(token, token)
                    else:
                        text_builder.text(token)
                # Restore the separator after every token but the last,
                # including the empty tokens produced by repeated spaces.
                if pos < last_token:
                    text_builder.text(" ")
        # Blank and non-blank rows alike are followed by a newline,
        # except for the final row.
        if row_no < last_row:
            text_builder.text("\n")
    return text_builder
# --- Banner helpers ---
def _dismiss_banners(page):
    """
    Close the overlays TikTok shows on a profile page, in a fixed order:
      1. the top info banner (TOP_BANNER_SELECTORS),
      2. the cookie consent modal (GDPR_SELECTORS).
    In each stage the selectors are probed in order and only the first
    visible match is clicked. Selector errors are ignored.
    Returns True if at least one banner was dismissed.
    """
    # Stage 1: top info banner.
    top_closed = False
    for selector in TOP_BANNER_SELECTORS:
        try:
            button = page.locator(selector).first
            if not button.is_visible(timeout=2000):
                continue
            button.click()
            logging.info(f"✅ Dismissed top banner: {selector}")
            time.sleep(1)
            top_closed = True
            break
        except Exception:
            # Best effort: a failing selector just means "try the next one".
            pass

    # Stage 2: cookie consent modal.
    cookie_closed = False
    for selector in GDPR_SELECTORS:
        try:
            button = page.locator(selector).first
            if not button.is_visible(timeout=3000):
                continue
            button.click()
            logging.info(f"✅ Dismissed cookie banner: {selector}")
            time.sleep(TIKTOK_BANNER_WAIT_S)
            cookie_closed = True
            break
        except Exception:
            pass

    any_dismissed = top_closed or cookie_closed
    if not any_dismissed:
        logging.info(" No banners found — continuing.")
    return any_dismissed
def _click_retry_button(page):
    """
    Click the first visible retry/refresh button (RETRY_BUTTON_SELECTORS)
    that TikTok renders inside the video grid in its error state, then wait
    TIKTOK_PAGE_LOAD_WAIT_S seconds. Selector errors are ignored.
    Returns True if a button was clicked, False otherwise.
    """
    for selector in RETRY_BUTTON_SELECTORS:
        try:
            button = page.locator(selector).first
            if not button.is_visible(timeout=2000):
                continue
            button.click()
            logging.info(f"🔁 Clicked grid retry button: {selector}")
            time.sleep(TIKTOK_PAGE_LOAD_WAIT_S)
            return True
        except Exception:
            # Best effort: move on to the next selector.
            pass
    return False
# --- TikTok Scraping ---
def scrape_tiktoks_via_playwright(target_handle: str, locale: str = "es-ES") -> list:
    """
    Scrape recent TikTok videos from a public profile using Playwright.
    No login is performed.

    Strategy:
    1. Navigate to the profile and wait for the page to settle.
    2. Dismiss the top info banner and the cookie modal.
    3. Reload the page so TikTok renders the grid cleanly.
    4. Dismiss any banners that reappear after the reload.
    5. Wait for a video grid selector (30 s, soft-fail).
    6. Click the retry button if TikTok shows the grid error state.
    7. Scroll to load more videos.
    8. Collect all a[href*="/video/"] links.

    Args:
        target_handle: TikTok handle, with or without a leading "@".
        locale: NOTE(review): accepted but never read in this function; the
            browser context is always created with locale "es-ES".

    Returns:
        A list of at most SCRAPE_VIDEO_LIMIT ScrapedTikTok objects, one per
        distinct canonical video URL. The list is empty when no links are
        found and may be partial when scraping fails midway (errors are
        logged and a screenshot is attempted, but nothing is raised from the
        scraping steps themselves).

    Notes:
        * ``created_on`` is the scrape time (``arrow.utcnow()``), not the
          time the video was published.
        * ``video_url`` is set to the canonical page URL, not a media stream.
    """
    tiktoks = []
    profile_url = f"https://www.tiktok.com/@{target_handle.lstrip('@')}"
    # playwright-stealth — optional but strongly recommended
    try:
        from playwright_stealth import stealth_sync
        USE_STEALTH = True
        logging.info("🥷 playwright-stealth available — stealth mode ON")
    except ImportError:
        USE_STEALTH = False
        logging.warning(
            "⚠️ playwright-stealth not installed — running without stealth. "
            "Run: pip install playwright-stealth"
        )
    with sync_playwright() as p:
        browser = p.chromium.launch(
            headless=True,
            args=[
                "--disable-blink-features=AutomationControlled",
                "--no-sandbox",
                "--disable-setuid-sandbox",
                "--disable-dev-shm-usage",
                "--disable-gpu",
                "--window-size=1366,768",
            ],
        )
        # Desktop Chrome fingerprint; user agent, viewport and client-hint
        # headers are kept consistent with each other.
        context = browser.new_context(
            user_agent=(
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                "AppleWebKit/537.36 (KHTML, like Gecko) "
                "Chrome/124.0.0.0 Safari/537.36"
            ),
            viewport={"width": 1366, "height": 768},
            locale="es-ES",
            timezone_id="Europe/Madrid",
            extra_http_headers={
                "Accept-Language": "es-ES,es;q=0.9,en;q=0.8",
                "Accept": (
                    "text/html,application/xhtml+xml,application/xml;"
                    "q=0.9,image/avif,image/webp,*/*;q=0.8"
                ),
                "Sec-Fetch-Dest": "document",
                "Sec-Fetch-Mode": "navigate",
                "Sec-Fetch-Site": "none",
                "Sec-Ch-Ua": (
                    '"Chromium";v="124", "Google Chrome";v="124", '
                    '"Not-A.Brand";v="99"'
                ),
                "Sec-Ch-Ua-Mobile": "?0",
                "Sec-Ch-Ua-Platform": '"Windows"',
            },
        )
        page = context.new_page()
        if USE_STEALTH:
            stealth_sync(page)
            logging.info("🥷 Stealth patches applied.")
        # Manual overrides of navigator.webdriver, navigator.plugins,
        # navigator.languages and window.chrome, installed before any page
        # script runs. The script text is kept verbatim.
        page.add_init_script("""
Object.defineProperty(navigator, 'webdriver', { get: () => undefined });
Object.defineProperty(navigator, 'plugins', {
get: () => [
{ name: 'Chrome PDF Plugin' },
{ name: 'Chrome PDF Viewer' },
{ name: 'Native Client' }
]
});
Object.defineProperty(navigator, 'languages', {
get: () => ['es-ES', 'es', 'en']
});
window.chrome = {
runtime: {},
loadTimes: function() {},
csi: function() {},
app: {}
};
""")
        try:
            # ── 1. Initial navigation ────────────────────────────────────
            logging.info(f"🌐 Navigating to TikTok profile: {profile_url}")
            page.goto(profile_url, wait_until="domcontentloaded", timeout=40000)
            time.sleep(TIKTOK_PAGE_LOAD_WAIT_S)
            # ── 2. Dismiss banners (first pass) ──────────────────────────
            _dismiss_banners(page)
            # ── 3. Reload for clean grid render ──────────────────────────
            # TikTok serves "Hubo un problema" when the page first loaded
            # while banners were blocking. A reload after dismissal gives
            # TikTok a clean cookie state so the grid renders correctly.
            logging.info("🔄 Reloading page after banner dismissal for clean grid render...")
            page.reload(wait_until="domcontentloaded", timeout=40000)
            time.sleep(TIKTOK_PAGE_LOAD_WAIT_S)
            # ── 4. Dismiss banners (second pass, post-reload) ────────────
            _dismiss_banners(page)
            # ── 5. Wait for video grid ───────────────────────────────────
            # Soft-fail: a timeout only logs and takes a screenshot, because
            # the links may still be collectable further down.
            try:
                page.wait_for_selector(GRID_SELECTORS, timeout=30000)
                logging.info("✅ TikTok video grid detected.")
            except Exception:
                logging.warning(
                    "⚠️ Grid selector timed out after 30s — continuing anyway."
                )
                take_error_screenshot(page, "tiktok_grid_timeout")
            # ── 6. Click "Actualizar" if grid shows error state ──────────
            # Even when the grid DOM node exists, TikTok may render an
            # error card inside it. Clicking the retry button triggers a
            # client-side reload of the video feed without a full page
            # reload, which often resolves the empty grid.
            if _click_retry_button(page):
                logging.info("⏳ Waiting for grid to reload after retry click...")
                try:
                    page.wait_for_selector(GRID_SELECTORS, timeout=15000)
                    logging.info("✅ Grid reloaded after retry.")
                except Exception:
                    logging.warning("⚠️ Grid still not visible after retry click.")
            # ── 7. Scroll to load more videos ────────────────────────────
            # Each step scrolls two viewport heights, then pauses.
            for scroll_i in range(TIKTOK_MAX_SCROLLS):
                page.evaluate("window.scrollBy(0, window.innerHeight * 2)")
                time.sleep(TIKTOK_SCROLL_PAUSE_S)
                logging.info(f"📜 Scroll {scroll_i + 1}/{TIKTOK_MAX_SCROLLS}")
            # ── 8. Collect video links ───────────────────────────────────
            video_links = page.locator('a[href*="/video/"]').all()
            logging.info(
                f"📊 Found {len(video_links)} video links. "
                f"Parsing up to {SCRAPE_VIDEO_LIMIT}..."
            )
            if not video_links:
                take_error_screenshot(page, "tiktok_no_video_links")
                logging.error(
                    "❌ No video links found after scroll. "
                    "TikTok may still be blocking — check screenshot."
                )
                browser.close()
                return []
            # Canonical URLs already turned into results, to skip duplicates.
            seen_urls = set()
            for link in video_links:
                if len(tiktoks) >= SCRAPE_VIDEO_LIMIT:
                    break
                try:
                    href = link.get_attribute("href")
                    if not href:
                        continue
                    # Site-relative hrefs are made absolute first.
                    post_url = (
                        f"https://www.tiktok.com{href}"
                        if href.startswith("/")
                        else href
                    )
                    canonical = canonicalize_tiktok_url(post_url)
                    if not canonical or canonical in seen_urls:
                        continue
                    if "/video/" not in canonical:
                        continue
                    seen_urls.add(canonical)
                    # Caption
                    # Looked up under the link's parent element; a missing or
                    # hidden caption leaves the text empty.
                    caption = ""
                    try:
                        card = link.locator("..").first
                        caption_el = card.locator(
                            '[data-e2e="video-desc"], '
                            '[class*="SpanUniqueId"], '
                            'p[class*="caption"]'
                        ).first
                        if caption_el.is_visible(timeout=1000):
                            caption = caption_el.inner_text()
                    except Exception:
                        pass
                    # Thumbnail
                    # First <img> inside the link, when visible.
                    thumbnail_url = None
                    try:
                        img = link.locator("img").first
                        if img.is_visible(timeout=1000):
                            thumbnail_url = img.get_attribute("src")
                    except Exception:
                        pass
                    tiktoks.append(
                        ScrapedTikTok(
                            # Scrape time, not the video's publish time.
                            created_on=arrow.utcnow().isoformat(),
                            text=caption,
                            # Page URL; the media stream is not resolved here.
                            video_url=canonical,
                            post_url=canonical,
                            thumbnail_url=thumbnail_url,
                        )
                    )
                    logging.info(f"🎵 Scraped TikTok: {canonical}")
                except Exception as e:
                    logging.warning(f"⚠️ Failed to parse video card: {e}")
                    continue
        except Exception as e:
            # Any failure in the steps above is logged; whatever was
            # collected so far is still returned.
            take_error_screenshot(page, "tiktok_scrape_failed")
            logging.error(f"❌ Failed to scrape TikTok profile: {e}")
        browser.close()
    logging.info(f"✅ Scraped {len(tiktoks)} TikTok videos.")
    return tiktoks
# --- Video URL extraction ---
def extract_tiktok_video_url_isolated(browser, post_url: str, video_id: str = None):
    """
    Open a single TikTok video page in its own browser context and capture
    the video stream URL from the page's network responses.

    The first MP4 response seen is preferred; the first HLS playlist
    (.m3u8) is the fallback. Segment and audio-only responses are ignored.

    Args:
        browser: a Playwright sync-API browser used to create the context.
        post_url: URL of the TikTok video page.
        video_id: identifier used only as a prefix in log messages.

    Returns:
        The captured stream URL, or None when nothing was captured within
        roughly ten seconds or an error occurred. The page and context are
        always closed before returning.
    """
    ctx = None
    page = None
    best_mp4_url = None
    best_m3u8_url = None
    seen_urls = set()

    def current_best():
        # A direct MP4 wins over an HLS playlist.
        return best_mp4_url or best_m3u8_url

    def handle_response(response):
        nonlocal best_mp4_url, best_m3u8_url
        try:
            url = response.url
            if not url or url in seen_urls:
                return
            seen_urls.add(url)
            content_type = (response.headers.get("content-type") or "").lower()
            url_l = url.lower()
            # Skip media segments and audio-only streams.
            if ".m4s" in url_l or "/aud/" in url_l or "mp4a" in url_l:
                return
            if ".m3u8" in url_l or "mpegurl" in content_type:
                if best_m3u8_url is None:
                    best_m3u8_url = url
                return
            if ".mp4" in url_l or "video/mp4" in content_type:
                if best_mp4_url is None:
                    best_mp4_url = url
                return
        except Exception as e:
            logging.debug(f"Response parse error: {e}")

    try:
        ctx = browser.new_context(
            user_agent=(
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                "AppleWebKit/537.36 (KHTML, like Gecko) "
                "Chrome/124.0.0.0 Safari/537.36"
            ),
            viewport={"width": 1366, "height": 768},
        )
        page = ctx.new_page()
        page.on("response", handle_response)
        logging.info(f"[video_id={video_id}] 🎬 Opening TikTok video page: {post_url}")
        page.goto(post_url, wait_until="domcontentloaded", timeout=40000)
        # Wait through Playwright rather than time.sleep(): the sync API only
        # dispatches events (and therefore calls handle_response) while a
        # Playwright call is running, so a plain sleep would block delivery
        # of the very responses this function is waiting for.
        page.wait_for_timeout(2000)
        # Click the player to trigger playback and with it the stream request.
        for selector in ['[data-e2e="video-player"]', "video", '[class*="Video"]']:
            try:
                player = page.locator(selector).first
                if player.count() > 0:
                    player.click(force=True, timeout=3000)
                    break
            except Exception:
                pass
        # Poll for up to ~10 s, letting Playwright process events meanwhile.
        for _ in range(10):
            if current_best():
                break
            page.wait_for_timeout(1000)
        selected = current_best()
        if selected:
            logging.info(f"[video_id={video_id}] ✅ Resolved video URL: {selected}")
        else:
            logging.warning(f"[video_id={video_id}] ⚠️ No video stream URL was captured.")
        return selected
    except Exception as e:
        logging.warning(f"[video_id={video_id}] ⚠️ Could not extract video URL: {e}")
        return None
    finally:
        # Cleanup is best effort and must never mask the result.
        try:
            if page:
                page.remove_listener("response", handle_response)
                page.close()
        except Exception:
            pass
        try:
            if ctx:
                ctx.close()
        except Exception:
            pass
# --- Video download + compress ---
def _probe_video_duration(file_path):
    """Return the duration of *file_path* in seconds, as reported by ffprobe.

    Raises:
        RuntimeError: when ffprobe exits non-zero or prints no duration.
        subprocess.TimeoutExpired: when ffprobe exceeds
            ``FFPROBE_TIMEOUT_SECONDS``.
        ValueError: when the printed duration is not a number.
    """
    probe_cmd = [
        "ffprobe", "-v", "error",
        "-show_entries", "format=duration",
        "-of", "default=noprint_wrappers=1:nokey=1",
        file_path,
    ]
    result = subprocess.run(
        probe_cmd,
        capture_output=True,
        text=True,
        timeout=FFPROBE_TIMEOUT_SECONDS,
    )
    if result.returncode != 0:
        raise RuntimeError(f"ffprobe error: {result.stderr.strip()}")
    duration_str = result.stdout.strip()
    if duration_str:
        return float(duration_str)
    raise RuntimeError("ffprobe returned empty duration")
def download_and_crop_video(video_url: str, output_path: str):
    """Download *video_url*, trim and compress it, and write it to *output_path*.

    Pipeline:
    1. ffmpeg copies the stream (MP4 or HLS playlist) into a source file.
    2. The clip is trimmed to at most ``VIDEO_MAX_DURATION_SECONDS`` and
       re-encoded with moviepy.
    3. ffmpeg scales it to a maximum width of 720 px and compresses it.
    4. The result replaces *output_path*.

    Args:
        video_url: direct media URL; ``.m3u8`` URLs are treated as HLS.
        output_path: destination file path.

    Returns:
        *output_path* on success, ``None`` on any failure (which is logged).
        The intermediate files are always removed.
    """
    # Build the scratch names from the path stem. The previous
    # str.replace(".mp4", ...) left all three names equal to output_path when
    # it had no lowercase ".mp4" in it, and rewrote ".mp4" in directory names.
    stem, _ext = os.path.splitext(output_path)
    temp_input = f"{stem}_source.mp4"
    temp_trimmed = f"{stem}_trimmed.mp4"
    temp_output = f"{stem}_compressed.mp4"
    try:
        logging.info(f"⬇️ Downloading TikTok video: {video_url}")
        url_l = video_url.lower()
        if ".m3u8" in url_l:
            # HLS playlists need the network protocols whitelisted.
            download_cmd = [
                "ffmpeg", "-y",
                "-protocol_whitelist", "file,http,https,tcp,tls,crypto",
                "-allowed_extensions", "ALL",
                "-i", video_url, "-c", "copy", temp_input,
            ]
        else:
            download_cmd = [
                "ffmpeg", "-y", "-i", video_url, "-c", "copy", temp_input,
            ]
        result = subprocess.run(
            download_cmd, capture_output=True, text=True,
            timeout=SUBPROCESS_TIMEOUT_SECONDS,
        )
        if result.returncode != 0:
            logging.error(f"❌ ffmpeg download failed:\n{result.stderr}")
            return None
        if not os.path.exists(temp_input) or os.path.getsize(temp_input) == 0:
            logging.error("❌ Downloaded file is missing or empty.")
            return None
        duration = _probe_video_duration(temp_input)
        if duration <= 0:
            logging.error("❌ Invalid video duration.")
            return None
        # Stay slightly inside the real duration, but never below 0.1 s.
        end_time = min(VIDEO_MAX_DURATION_SECONDS, duration)
        end_time = min(end_time, duration - 0.05)
        end_time = max(end_time, 0.1)
        video_clip = VideoFileClip(temp_input)
        try:
            # Use whichever of the two method names this moviepy offers.
            if hasattr(video_clip, "subclipped"):
                cropped = video_clip.subclipped(0, end_time)
            else:
                cropped = video_clip.subclip(0, end_time)
            try:
                cropped.write_videofile(
                    temp_trimmed,
                    codec="libx264",
                    audio_codec="aac",
                    preset="veryfast",
                    bitrate="1800k",
                    audio_bitrate="128k",
                    logger=None,
                )
            finally:
                cropped.close()
        finally:
            video_clip.close()
        if not os.path.exists(temp_trimmed) or os.path.getsize(temp_trimmed) == 0:
            logging.error("❌ Trimmed video is missing or empty.")
            return None
        compress_cmd = [
            "ffmpeg", "-y", "-i", temp_trimmed,
            "-vf", "scale='min(720,iw)':-2",
            "-c:v", "libx264", "-preset", "veryfast", "-crf", "30",
            "-maxrate", "1800k", "-bufsize", "3600k",
            "-c:a", "aac", "-b:a", "128k",
            "-movflags", "+faststart", temp_output,
        ]
        result = subprocess.run(
            compress_cmd, capture_output=True, text=True,
            timeout=SUBPROCESS_TIMEOUT_SECONDS,
        )
        if result.returncode != 0:
            logging.error(f"❌ ffmpeg compression failed:\n{result.stderr}")
            return None
        if not os.path.exists(temp_output) or os.path.getsize(temp_output) == 0:
            logging.error("❌ Compressed video is missing or empty.")
            return None
        os.replace(temp_output, output_path)
        size_mb = os.path.getsize(output_path) / (1024 * 1024)
        logging.info(f"✅ Video ready: {output_path} ({size_mb:.2f} MB)")
        return output_path
    except subprocess.TimeoutExpired:
        logging.error(f"❌ ffmpeg timed out after {SUBPROCESS_TIMEOUT_SECONDS}s")
        return None
    except Exception as e:
        logging.error(f"❌ Video processing error: {repr(e)}")
        return None
    finally:
        # Runs on every path; the scratch names can no longer coincide with
        # output_path, so the finished file is never deleted here.
        remove_file_quietly(temp_input)
        remove_file_quietly(temp_trimmed)
        remove_file_quietly(temp_output)
# --- Main sync logic ---
# Maximum number of remembered entries handed to prune_state().
_SYNC_STATE_MAX_ENTRIES = 5000
# Pause after each successful Bluesky post, in seconds.
_SYNC_POST_PAUSE_SECONDS = 5


def _find_duplicate_reason(candidate, state, recent_bsky_posts):
    """Return a log-ready duplicate description for *candidate*, or None.

    The persisted state is checked first; the recent Bluesky posts are only
    consulted when the state check does not match.
    """
    is_dup_state, reason = candidate_matches_state(candidate, state)
    if is_dup_state:
        return f"state duplicate: {reason}"
    is_dup_bsky, reason = candidate_matches_existing_bsky(
        candidate, recent_bsky_posts
    )
    if is_dup_bsky:
        return f"Bluesky duplicate: {reason}"
    return None


def _build_sync_candidates(tiktoks, state, recent_bsky_posts):
    """Turn scraped TikToks into candidate dicts, dropping known duplicates.

    The scraped list is walked in reverse order. A failure while preparing one
    item is logged as a warning and does not stop the remaining items.
    """
    candidates = []
    for tiktok in reversed(tiktoks):
        try:
            canonical_url = canonicalize_tiktok_url(tiktok.post_url)
            # Cheap exit before any text/fingerprint work is done.
            if canonical_url and canonical_url in state.get("posted_videos", {}):
                logging.info(f"⚡ Early skip (already in state): {canonical_url}")
                continue

            text = clean_post_text(tiktok.text or "")
            normalized_text = normalize_post_text(text)
            media_fp = build_media_fingerprint(tiktok)
            text_media_key = build_text_media_key(normalized_text, media_fp)
            video_id = extract_tiktok_video_id(tiktok.post_url)
            candidate = {
                "tiktok": tiktok,
                "raw_text": truncate_text_safely(text),
                "normalized_text": normalized_text,
                "media_fingerprint": media_fp,
                "text_media_key": text_media_key,
                "canonical_post_url": canonical_url,
                "video_id": video_id,
                # Filled in later by the resolve / download steps.
                "resolved_video_url": None,
                "resolved_video_hash": None,
            }

            dup_reason = _find_duplicate_reason(candidate, state, recent_bsky_posts)
            if dup_reason:
                logging.info(f"⏭️ Skipping ({dup_reason}): {canonical_url}")
                continue
            candidates.append(candidate)
        except Exception as e:
            logging.warning(f"⚠️ Failed to prepare candidate: {e}")
    return candidates


def _resolve_candidate_video_urls(candidates):
    """Set ``resolved_video_url`` on every candidate using one shared browser.

    A single headless Chromium instance is launched for the whole batch and is
    always closed, even if resolution raises.
    """
    with sync_playwright() as playwright:
        browser = playwright.chromium.launch(
            headless=True,
            args=["--disable-blink-features=AutomationControlled"],
        )
        try:
            for candidate in candidates:
                candidate["resolved_video_url"] = extract_tiktok_video_url_isolated(
                    browser,
                    candidate["tiktok"].post_url,
                    video_id=candidate.get("video_id"),
                )
        finally:
            browser.close()


def _prepare_video_embed(candidate, bsky_client):
    """Download, hash and upload the candidate's video; return an embed or None.

    Returns None (so the caller falls back to a text-only post) when the video
    URL was not resolved, the download/crop step yields nothing, the file hash
    is already owned by a different video id, or the blob upload yields nothing.
    Temporary files derived from the unique temp base are removed in all cases.
    Exceptions from the download/hash/upload helpers propagate to the caller.
    """
    tiktok = candidate["tiktok"]
    video_url = candidate.get("resolved_video_url")
    if not video_url:
        logging.warning(f"⚠️ Could not resolve video URL for {tiktok.post_url}")
        return None

    temp_base = make_unique_video_temp_base(tiktok.post_url)
    temp_path = f"{temp_base}.mp4"
    try:
        cropped_path = download_and_crop_video(video_url, temp_path)
        if not cropped_path:
            return None

        video_hash = sha256_file(cropped_path)
        candidate["resolved_video_hash"] = video_hash

        # Refuse to attach a file whose hash is registered to another video id.
        owner = _cache.video_hash_owner.get(video_hash)
        if owner and owner != candidate["video_id"]:
            logging.warning("⚠️ Video hash owned by another video. Skipping.")
            return None
        _cache.video_hash_owner[video_hash] = candidate["video_id"]

        video_blob = get_blob_from_file(cropped_path, bsky_client)
        if not video_blob:
            return None
        alt = build_dynamic_alt(candidate["raw_text"])
        return build_video_embed(video_blob, alt)
    finally:
        remove_file_quietly(temp_path)
        remove_file_quietly(f"{temp_base}_source.mp4")
        remove_file_quietly(f"{temp_base}_trimmed.mp4")
        remove_file_quietly(f"{temp_base}_compressed.mp4")


def sync_feeds(args):
    """Run one TikTok -> Bluesky sync cycle.

    Steps: load and prune the persisted state, scrape the TikTok profile,
    drop items already known to the state or to recent Bluesky posts, resolve
    the real video URL of each remaining item, then post each one (video embed
    when available, otherwise caption plus the TikTok link).

    Args:
        args: Namespace providing ``tiktok_handle``, ``bsky_base_url``,
            ``bsky_handle`` and ``bsky_password``. ``dry_run`` and
            ``bsky_langs`` are optional and default to ``False`` and
            ``DEFAULT_BSKY_LANGS``.

    Returns:
        None. Every exception is caught and logged; nothing is re-raised.

    Note:
        In dry-run mode no Bluesky client is created and nothing is posted,
        but each candidate is still written to the state file with a
        ``dry-run:<timestamp>`` URI.
    """
    logging.info("🔄 Starting TikTok → Bluesky sync cycle...")
    dry_run = getattr(args, "dry_run", False)
    bsky_langs = getattr(args, "bsky_langs", None) or DEFAULT_BSKY_LANGS
    if dry_run:
        logging.info("🧪 DRY RUN MODE — no posts will be created on Bluesky.")

    try:
        state = load_state(STATE_PATH)
        state = prune_state(state, max_entries=_SYNC_STATE_MAX_ENTRIES)

        tiktoks = scrape_tiktoks_via_playwright(
            args.tiktok_handle,
            locale=bsky_langs[0] if bsky_langs else "es-ES",
        )
        if not tiktoks:
            logging.warning("⚠️ No TikTok videos found. Skipping sync.")
            return

        # Dry runs never talk to Bluesky, so there is no client and nothing
        # to dedupe against on the remote side.
        bsky_client = None
        recent_bsky_posts = []
        if not dry_run:
            bsky_client = create_bsky_client(
                args.bsky_base_url, args.bsky_handle, args.bsky_password,
            )
            recent_bsky_posts = get_recent_bsky_posts(
                bsky_client, args.bsky_handle, limit=DEDUPE_BSKY_LIMIT,
            )

        # --- Build candidates ---
        candidates = _build_sync_candidates(tiktoks, state, recent_bsky_posts)
        logging.info(
            f"📬 {len(candidates)} new TikTok videos to post after dedup."
        )
        if not candidates:
            logging.info("✅ Nothing new to post.")
            return

        # --- Pre-resolve video URLs ---
        _resolve_candidate_video_urls(candidates)

        # --- Post to Bluesky ---
        new_posts = 0
        for candidate in candidates:
            tiktok = candidate["tiktok"]
            raw_text = candidate["raw_text"]

            # Earlier iterations of this loop add entries to `state` and
            # `recent_bsky_posts`. Re-check here so that two scraped items
            # which collide with each other are not both posted in one cycle.
            dup_reason = _find_duplicate_reason(candidate, state, recent_bsky_posts)
            if dup_reason:
                canonical_url = candidate["canonical_post_url"]
                logging.info(f"⏭️ Skipping ({dup_reason}): {canonical_url}")
                continue

            action = "[DRY RUN] Would post" if dry_run else "Posting"
            logging.info(f"📝 {action} TikTok video: {tiktok.post_url}")

            if dry_run:
                logging.info(f" 📄 Caption: {raw_text[:200]}")
                remember_posted_video(
                    state, candidate,
                    bsky_uri=f"dry-run:{arrow.utcnow().isoformat()}",
                )
                save_state(state, STATE_PATH)
                new_posts += 1
                continue

            video_embed = _prepare_video_embed(candidate, bsky_client)

            try:
                if video_embed:
                    post_result = send_post_with_retry(
                        bsky_client,
                        text=make_rich(raw_text),
                        embed=video_embed,
                        langs=bsky_langs,
                    )
                    post_mode = "video"
                else:
                    # NOTE(review): raw_text was already passed through
                    # truncate_text_safely(); appending the URL may push the
                    # text past the post length limit — confirm downstream.
                    fallback_text = make_rich(
                        f"{raw_text}\n\n{tiktok.post_url}".strip()
                    )
                    post_result = send_post_with_retry(
                        bsky_client,
                        text=fallback_text,
                        langs=bsky_langs,
                    )
                    post_mode = "text_only_fallback"

                bsky_uri = getattr(post_result, "uri", None)
                remember_posted_video(state, candidate, bsky_uri=bsky_uri)
                state = prune_state(state, max_entries=_SYNC_STATE_MAX_ENTRIES)
                save_state(state, STATE_PATH)

                # Keep the in-memory view of recent posts current and bounded.
                recent_bsky_posts.insert(0, {
                    "uri": bsky_uri,
                    "normalized_text": candidate["normalized_text"],
                    "media_fingerprint": candidate["media_fingerprint"],
                    "text_media_key": candidate["text_media_key"],
                })
                recent_bsky_posts = recent_bsky_posts[:DEDUPE_BSKY_LIMIT]

                new_posts += 1
                logging.info(
                    f"✅ Posted TikTok to Bluesky [{post_mode}]: {raw_text[:80]}"
                )
                time.sleep(_SYNC_POST_PAUSE_SECONDS)
            except Exception as e:
                logging.error(f"❌ Failed to post to Bluesky: {e}")

        logging.info(f"✅ Sync complete. Posted {new_posts} new TikTok videos.")
    except Exception as e:
        # Top-level boundary: keep the traceback so failures can be diagnosed.
        logging.error(f"❌ Error during sync cycle: {e}", exc_info=True)
def main():
    """Command-line entry point for the TikTok -> Bluesky bot.

    Loads ``.env``, parses CLI flags, fills unset flags from environment
    variables, validates that the required values are present, then resets
    the caches and runs a single sync cycle. Logs an error and returns early
    when a required value is missing.
    """
    load_dotenv()

    cli = argparse.ArgumentParser(description="TikTok to Bluesky Sync")
    cli.add_argument(
        "--tiktok-handle",
        help="TikTok account handle to scrape (without @)",
    )
    cli.add_argument("--bsky-handle", help="Your Bluesky handle")
    cli.add_argument("--bsky-password", help="Your Bluesky app password")
    cli.add_argument("--bsky-base-url", default=None, help="Bluesky PDS base URL")
    cli.add_argument(
        "--bsky-langs",
        default=None,
        help="Comma-separated language codes (e.g. es,en)",
    )
    cli.add_argument("--dry-run", action="store_true", default=False)
    args = cli.parse_args()

    # A flag given on the command line wins; otherwise use the environment.
    env_fallbacks = (
        ("tiktok_handle", "TIKTOK_HANDLE"),
        ("bsky_handle", "BSKY_HANDLE"),
        ("bsky_password", "BSKY_APP_PASSWORD"),
    )
    for attr_name, env_name in env_fallbacks:
        if not getattr(args, attr_name):
            setattr(args, attr_name, os.getenv(env_name))

    if not args.bsky_base_url:
        args.bsky_base_url = os.getenv("BSKY_BASE_URL") or DEFAULT_BSKY_BASE_URL

    langs_csv = args.bsky_langs or os.getenv("BSKY_LANGS")
    if langs_csv:
        # Drop blank items such as those produced by "es,,en" or "es, ".
        args.bsky_langs = [
            code.strip() for code in langs_csv.split(",") if code.strip()
        ]
    else:
        args.bsky_langs = DEFAULT_BSKY_LANGS

    required = (
        (args.tiktok_handle, "--tiktok-handle / TIKTOK_HANDLE"),
        (args.bsky_handle, "--bsky-handle / BSKY_HANDLE"),
        (args.bsky_password, "--bsky-password / BSKY_APP_PASSWORD"),
    )
    missing = [label for value, label in required if not value]
    if missing:
        logging.error(f"❌ Missing required arguments: {', '.join(missing)}")
        return

    logging.info(f"🤖 TikTok→Bluesky bot started. Scraping @{args.tiktok_handle}")
    reset_caches()
    sync_feeds(args)
    logging.info("🤖 Bot finished.")
# Run the bot only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()