Files
post2bsky/tiktok2bsky.py
Guillem Hernandez Sola 96152194ea Cookies 3
2026-05-19 11:08:24 +02:00

1050 lines
40 KiB
Python

import argparse
import arrow
import hashlib
import json
import logging
import re
import time
import os
import subprocess
import uuid
import random
import tempfile
from pathlib import Path
from dotenv import load_dotenv
from atproto import Client, client_utils, models
from playwright.sync_api import sync_playwright
from moviepy import VideoFileClip
import grapheme
# ─────────────────────────────────────────────
# Configuration
# ─────────────────────────────────────────────
# File locations. LOG_PATH feeds the logging FileHandler, STATE_PATH is read
# and written by load_state()/save_state(), and TIKTOK_COOKIES_PATH is the
# default for --cookies-path (main() may rebind it at runtime).
LOG_PATH = "tiktok2bsky.log"
STATE_PATH = "tiktok2bsky_state.json"
TIKTOK_COOKIES_PATH = "tiktok_cookies.json" # ← export from your browser
# Upper bound on videos collected per run by either scraper.
SCRAPE_VIDEO_LIMIT = 30
# mark_posted() keeps the most recent DEDUPE_BSKY_LIMIT * 10 posted ids.
DEDUPE_BSKY_LIMIT = 30
# NOTE(review): not referenced in the visible part of this file.
VIDEO_MAX_AGE_DAYS = 3
# Budget for the post text (caption plus the appended TikTok URL).
BSKY_TEXT_MAX_LENGTH = 300
# Default for --bsky-langs.
DEFAULT_BSKY_LANGS = ["es"]
# Videos longer than this are trimmed with ffmpeg before upload.
VIDEO_MAX_DURATION_SECONDS = 179
# Videos larger than this (in MiB, after any trimming) are skipped.
MAX_VIDEO_UPLOAD_SIZE_MB = 45
# Video blob upload: attempt count and exponential back-off (seconds).
BSKY_BLOB_UPLOAD_MAX_RETRIES = 5
BSKY_BLOB_UPLOAD_BASE_DELAY = 10
BSKY_BLOB_UPLOAD_MAX_DELAY = 300
# Errors that do not look transient stop retrying once the attempt number
# reaches this value (see upload_video_blob).
BSKY_BLOB_TRANSIENT_ERROR_RETRIES = 3
# NOTE(review): not referenced in the visible part of this file.
BSKY_BLOB_TRANSIENT_ERROR_DELAY = 15
# send_post: attempt count and exponential back-off (seconds).
BSKY_SEND_POST_MAX_RETRIES = 3
BSKY_SEND_POST_BASE_DELAY = 5
BSKY_SEND_POST_MAX_DELAY = 60
# Login: attempt count, exponential back-off (seconds) and random jitter
# added to each delay.
BSKY_LOGIN_MAX_RETRIES = 4
BSKY_LOGIN_BASE_DELAY = 10
BSKY_LOGIN_MAX_DELAY = 600
BSKY_LOGIN_JITTER_MAX = 1.5
# Timeout for the ffmpeg trim subprocess.
SUBPROCESS_TIMEOUT_SECONDS = 180
# Timeout for the ffprobe calls; extract_thumbnail() also uses it for ffmpeg.
FFPROBE_TIMEOUT_SECONDS = 15
# Default for --bsky-base-url.
DEFAULT_BSKY_BASE_URL = "https://bsky.social"
# Playwright pacing: fixed sleeps after navigation/reload and between scrolls,
# number of scroll steps, and number of grid load attempts.
TIKTOK_PAGE_LOAD_WAIT_S = 5.0
TIKTOK_SCROLL_PAUSE_S = 2.5
TIKTOK_MAX_SCROLLS = 8
# NOTE(review): not referenced in the visible part of this file.
TIKTOK_BANNER_WAIT_S = 3.0
TIKTOK_MAX_LOAD_ATTEMPTS = 3
# Maximum length of the video alt text sent to Bluesky.
DYNAMIC_ALT_MAX_LENGTH = 150
# Minimum number of graphemes truncate_grapheme() keeps before the suffix.
TRUNCATE_MIN_PREFIX_CHARS = 20
# ─────────────────────────────────────────────
# Selectors
# ─────────────────────────────────────────────
# Cookie-consent buttons. Tried by _dismiss_banners() after the top-banner
# selectors; the first visible match is clicked.
GDPR_SELECTORS = [
    'button:has-text("Permitir todas")',
    'button:has-text("Rechazar cookies opcionales")',
    'button:has-text("Entendido")',
    'button:has-text("Aceptar todo")',
    'button:has-text("Accept all")',
    'button:has-text("Got it")',
    'button:has-text("Decline optional")',
    '[data-e2e="cookie-banner-accept"]',
    # NOTE(review): the two substring matches below are broad and could match
    # unrelated elements — confirm against the live page.
    '[id*="accept"]',
    '[class*="accept-btn"]',
]
# Close buttons of the banner at the top of the page. Tried first by
# _dismiss_banners().
TOP_BANNER_SELECTORS = [
    'button:has-text("Entendido")',
    'button:has-text("Got it")',
    'button:has-text("Understood")',
    '[data-e2e="top-banner-close"]',
    '[class*="BannerContainer"] button',
    '[class*="DivBannerContainer"] button',
]
# Any visible match makes _is_captcha_visible() report a CAPTCHA.
CAPTCHA_SELECTORS = [
    '[class*="captcha"]',
    '[id*="captcha"]',
    'div:has-text("Drag the puzzle")',
    'div:has-text("puzzle piece")',
    '[class*="secsdk-captcha"]',
    '[class*="tiktok-captcha"]',
]
# One comma-separated selector list (adjacent string literals are
# concatenated), passed to page.wait_for_selector() to detect the video grid.
GRID_SELECTORS = (
    '[data-e2e="user-post-item"], '
    '[class*="DivItemContainerV2"], '
    'a[href*="/video/"], '
    '[class*="video-feed"], '
    'div[class*="VideoFeed"], '
    '[class*="DivVideoFeedV2"]'
)
# ─────────────────────────────────────────────
# Logging
# ─────────────────────────────────────────────
# Configure the root logger at import time: every record at INFO or above is
# written both to LOG_PATH (UTF-8, so emoji in messages are preserved) and to
# the default stream of StreamHandler.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.FileHandler(LOG_PATH, encoding="utf-8"),
        logging.StreamHandler(),
    ],
    level=logging.INFO,
)
# ─────────────────────────────────────────────
# Data classes
# ─────────────────────────────────────────────
class ScrapedMedia:
    """One media attachment of a scraped post.

    Attributes:
        type: Media kind; defaults to ``"video"``.
        media_url_https: URL the media was scraped from.
    """

    def __init__(self, url, media_type="video"):
        # The attribute names intentionally differ from the parameter names.
        self.media_url_https = url
        self.type = media_type
class ScrapedTikTok:
    """A TikTok post as produced by either scraper.

    Attributes:
        created_on: Timestamp supplied by the scraper.
        text: Caption text.
        post_url: URL of the post, if known.
        thumbnail_url: URL of the preview image, if known.
        media: A one-element list holding a video ``ScrapedMedia`` when
            ``video_url`` is truthy, otherwise an empty list.
    """

    def __init__(self, created_on, text, video_url,
                 post_url=None, thumbnail_url=None):
        self.created_on = created_on
        self.text = text
        self.post_url = post_url
        self.thumbnail_url = thumbnail_url
        self.media = []
        if video_url:
            self.media.append(ScrapedMedia(video_url, "video"))
# ─────────────────────────────────────────────
# Generic helpers
# ─────────────────────────────────────────────
def sha256_file(path, chunk_size=1024 * 1024):
    """Return the hex SHA-256 digest of the file at *path*.

    The file is read in blocks of *chunk_size* bytes so large files are never
    loaded into memory at once.
    """
    digest = hashlib.sha256()
    with open(path, "rb") as stream:
        # iter() with a sentinel stops at the first empty read (end of file).
        for block in iter(lambda: stream.read(chunk_size), b""):
            digest.update(block)
    return digest.hexdigest()
def take_error_screenshot(page, label):
    """Save a full-page screenshot named after *label* and the current time.

    The file is written to the current working directory. Failures are logged
    as warnings and never raised, so this is safe to call from error paths.
    """
    timestamp = time.strftime("%Y%m%d_%H%M%S")
    name = f"screenshot_{label}_{timestamp}.png"
    try:
        page.screenshot(path=name, full_page=True)
    except Exception as e:
        logging.warning(f"⚠️ Could not save screenshot: {e}")
        return
    logging.info(f"📸 Screenshot saved: {name}")
def canonicalize_tiktok_url(url):
    """Reduce a TikTok video URL to ``https://www.tiktok.com/@user/video/<id>``.

    Returns ``None`` for a falsy *url*. URLs that do not look like a TikTok
    video link are returned unchanged apart from surrounding whitespace.
    """
    if not url:
        return None
    found = re.search(
        r"https?://(?:www\.)?tiktok\.com/@([^/]+)/video/(\d+)",
        url, re.IGNORECASE,
    )
    if found is None:
        return url.strip()
    handle, video_id = found.group(1), found.group(2)
    return f"https://www.tiktok.com/@{handle}/video/{video_id}"
def load_state():
    """Return the persisted bot state, or a fresh one if no state file exists.

    The state is the JSON document stored at STATE_PATH. A fresh state is
    ``{"posted_ids": []}``. A state file that exists but is not valid JSON
    raises, so previously posted ids are never silently forgotten.
    """
    if not os.path.exists(STATE_PATH):
        return {"posted_ids": []}
    with open(STATE_PATH, "r", encoding="utf-8") as handle:
        stored = json.load(handle)
    return stored
def save_state(state):
    """Persist *state* as JSON at STATE_PATH, replacing the file atomically.

    The document is first written to a temporary file in the same directory
    and then moved over STATE_PATH with ``os.replace``. If the process dies
    mid-write, the previous state file is left intact instead of a truncated
    JSON document that would make the next ``load_state()`` fail.

    Raises:
        OSError: If the temporary file cannot be created, written or moved.
        TypeError, ValueError: If *state* is not JSON-serialisable.
    """
    # The temp file must live on the same filesystem as the target for
    # os.replace to be atomic, hence the explicit directory.
    target_dir = os.path.dirname(os.path.abspath(STATE_PATH))
    fd, tmp_path = tempfile.mkstemp(
        prefix=".tiktok2bsky_state_", suffix=".tmp", dir=target_dir
    )
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(state, f, indent=2, ensure_ascii=False)
        os.replace(tmp_path, STATE_PATH)
    except Exception:
        # Best-effort cleanup of the partial temp file; the original error is
        # the one the caller needs to see.
        try:
            os.unlink(tmp_path)
        except OSError:
            pass
        raise
def tiktok_id_from_url(url):
    """Extract the numeric video id from a TikTok URL.

    Returns ``None`` for a falsy *url*. When the URL has no ``/video/<digits>``
    segment, the URL itself is returned so it can still serve as a dedupe key.
    """
    if not url:
        return None
    found = re.search(r"/video/(\d+)", url)
    if found:
        return found.group(1)
    return url
def truncate_grapheme(text, max_len, suffix=""):
    """Shorten *text* to at most *max_len* grapheme clusters.

    Text that already fits is returned unchanged. Otherwise the text is cut
    on a grapheme boundary and *suffix* is appended, with the result
    (suffix included) never exceeding *max_len* graphemes.

    If appending *suffix* would leave fewer than TRUNCATE_MIN_PREFIX_CHARS
    graphemes of the original text, the suffix is dropped and the text is
    simply cut at *max_len*. A negative *max_len* is treated as 0.

    Args:
        text: The string to shorten.
        max_len: Maximum number of grapheme clusters in the result.
        suffix: Marker appended to truncated text (for example an ellipsis).

    Returns:
        The original or shortened string.
    """
    clusters = list(grapheme.graphemes(text))
    # A negative budget can occur when the caller subtracts a long URL from
    # the post limit; clamp it so slicing below stays meaningful.
    budget = max(max_len, 0)
    if len(clusters) <= budget:
        return text
    # Measure the suffix in graphemes too, so it is counted in the same unit
    # as the text it replaces.
    suffix_len = sum(1 for _ in grapheme.graphemes(suffix))
    keep = budget - suffix_len
    if keep < TRUNCATE_MIN_PREFIX_CHARS:
        # Not enough room for a meaningful prefix plus the suffix: respect the
        # hard limit rather than the minimum prefix.
        return "".join(clusters[:budget])
    return "".join(clusters[:keep]) + suffix
# ─────────────────────────────────────────────
# Cookie helpers (Option 1)
# ─────────────────────────────────────────────
def load_tiktok_cookies() -> list:
    """Load TikTok session cookies from TIKTOK_COOKIES_PATH for Playwright.

    The file is expected to be a JSON export from a browser extension: either
    a list of cookie objects, or an object with a ``"cookies"`` list. Each
    cookie is normalised to the dict shape ``BrowserContext.add_cookies``
    accepts (``name``, ``value``, ``domain``, ``path``, ``secure``,
    ``httpOnly`` and, when known, ``sameSite`` and ``expires``).

    Returns:
        A list of cookie dicts. The list is empty when the file is missing,
        unreadable, not valid JSON, or has an unexpected layout; in those
        cases a warning is logged and the scraper runs without a session.
    """
    if not os.path.exists(TIKTOK_COOKIES_PATH):
        logging.warning(
            f"⚠️ Cookie file not found at '{TIKTOK_COOKIES_PATH}'. "
            "Running without session — CAPTCHA risk is higher."
        )
        return []
    try:
        with open(TIKTOK_COOKIES_PATH, "r", encoding="utf-8") as f:
            raw = json.load(f)
    except (OSError, ValueError) as e:
        # A broken cookie file should degrade to "no session", not abort the
        # whole run before any fallback scraper gets a chance.
        logging.warning(
            f"⚠️ Could not read cookie file '{TIKTOK_COOKIES_PATH}': {e}. "
            "Running without session — CAPTCHA risk is higher."
        )
        return []
    if isinstance(raw, dict):
        raw = raw.get("cookies", [])
    if not isinstance(raw, list):
        logging.warning(
            f"⚠️ Unexpected cookie file layout in '{TIKTOK_COOKIES_PATH}'. "
            "Running without session — CAPTCHA risk is higher."
        )
        return []

    # Browser extensions export lowercase values and "no_restriction";
    # Playwright only accepts "Strict" | "Lax" | "None".
    same_site_map = {
        "strict": "Strict",
        "lax": "Lax",
        "none": "None",
        "no_restriction": "None",
    }
    cookies = []
    for c in raw:
        if not isinstance(c, dict) or not c.get("name"):
            continue
        entry = {
            "name": c["name"],
            "value": c.get("value", ""),
            "domain": c.get("domain", ".tiktok.com"),
            "path": c.get("path", "/"),
            # Keep the security flags from the export instead of dropping
            # them, so cookies are restored as the browser had them.
            "secure": bool(c.get("secure", False)),
            "httpOnly": bool(c.get("httpOnly", False)),
        }
        same_site = same_site_map.get(str(c.get("sameSite", "")).lower())
        if same_site:
            # Unknown/unspecified values are omitted so the browser applies
            # its own default.
            entry["sameSite"] = same_site
        raw_expiry = c.get("expirationDate", c.get("expires"))
        if raw_expiry is not None:
            try:
                entry["expires"] = int(float(raw_expiry))
            except (TypeError, ValueError, OverflowError):
                # Unparseable expiry: leave it out (session cookie).
                pass
        cookies.append(entry)
    logging.info(f"🍪 Loaded {len(cookies)} TikTok cookies from {TIKTOK_COOKIES_PATH}")
    return cookies
def _is_captcha_visible(page) -> bool:
    """Return True if any CAPTCHA_SELECTORS element is visible on *page*.

    Selector errors are ignored and treated as "not visible". The matching
    selector is logged as a warning.
    """
    for sel in CAPTCHA_SELECTORS:
        try:
            shown = page.locator(sel).first.is_visible(timeout=1500)
        except Exception:
            continue
        if shown:
            logging.warning(f"🚧 CAPTCHA detected via selector: {sel}")
            return True
    return False
# ─────────────────────────────────────────────
# yt-dlp scraper (Option 2 — fallback)
# ─────────────────────────────────────────────
def _export_netscape_cookies(source_path: str, work_dir: str) -> str | None:
    """Return a cookie file path yt-dlp can load, or None for "no cookies".

    yt-dlp's ``cookiefile`` option only understands the Netscape cookies.txt
    format, while this bot's cookie file is a JSON export. A JSON export is
    converted into ``work_dir/ytdlp_cookies.txt``. A file that is not JSON is
    assumed to already be a Netscape file and is returned as-is.
    """
    if not os.path.exists(source_path):
        return None
    try:
        with open(source_path, "r", encoding="utf-8") as f:
            raw = json.load(f)
    except ValueError:
        # Not JSON: assume the user supplied a cookies.txt directly.
        return source_path
    except OSError as e:
        logging.warning(f"⚠️ Could not read cookie file for yt-dlp: {e}")
        return None
    if isinstance(raw, dict):
        raw = raw.get("cookies")
    if not isinstance(raw, list):
        logging.warning(
            "⚠️ Unrecognised cookie JSON layout — yt-dlp will run without cookies."
        )
        return None

    lines = ["# Netscape HTTP Cookie File"]
    for c in raw:
        if not isinstance(c, dict) or not c.get("name"):
            continue
        domain = str(c.get("domain") or ".tiktok.com")
        try:
            expiry = max(int(float(c.get("expirationDate", c.get("expires")))), 0)
        except (TypeError, ValueError, OverflowError):
            expiry = 0  # 0 marks a session cookie
        fields = [
            ("#HttpOnly_" if c.get("httpOnly") else "") + domain,
            # The subdomain flag must agree with the leading dot of the domain.
            "TRUE" if domain.startswith(".") else "FALSE",
            str(c.get("path") or "/"),
            "TRUE" if c.get("secure") else "FALSE",
            str(expiry),
            str(c["name"]),
            str(c.get("value", "")),
        ]
        # The format is tab/line separated, so such characters cannot be
        # represented; skip the cookie rather than corrupt the file.
        if any(ch in field for field in fields for ch in "\t\r\n"):
            continue
        lines.append("\t".join(fields))

    dest = os.path.join(work_dir, "ytdlp_cookies.txt")
    with open(dest, "w", encoding="utf-8") as out:
        out.write("\n".join(lines) + "\n")
    return dest


def scrape_tiktoks_via_ytdlp(target_handle: str) -> list:
    """List recent videos of a TikTok profile with yt-dlp (no browser).

    Used as the fallback when the Playwright scraper hits a CAPTCHA, finds no
    videos, or fails. Only metadata is extracted; nothing is downloaded.
    Requires: pip install yt-dlp

    Args:
        target_handle: TikTok handle, with or without a leading ``@``.

    Returns:
        Up to SCRAPE_VIDEO_LIMIT ``ScrapedTikTok`` objects, de-duplicated by
        canonical URL. Empty when yt-dlp is not installed, finds nothing, or
        fails (errors are logged, not raised).
    """
    logging.info(f"🔄 Falling back to yt-dlp scraper for @{target_handle}...")
    tiktoks = []
    try:
        import yt_dlp
    except ImportError:
        logging.error(
            "❌ yt-dlp is not installed. Run: pip install yt-dlp\n"
            " Cannot scrape without Playwright session or yt-dlp."
        )
        return []
    profile_url = f"https://www.tiktok.com/@{target_handle.lstrip('@')}"
    ydl_opts = {
        "quiet": True,
        "no_warnings": True,
        "extract_flat": "in_playlist",  # don't download, just list URLs
        "playlistend": SCRAPE_VIDEO_LIMIT,
        "ignoreerrors": True,
        "socket_timeout": 30,
    }
    try:
        # The converted cookie file holds session secrets, so it lives in a
        # throw-away directory that is removed as soon as extraction is done.
        with tempfile.TemporaryDirectory() as cookie_dir:
            cookiefile = _export_netscape_cookies(TIKTOK_COOKIES_PATH, cookie_dir)
            if cookiefile:
                # Pass cookies so yt-dlp also benefits from the session.
                ydl_opts["cookiefile"] = cookiefile
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                logging.info(f"🌐 yt-dlp extracting profile: {profile_url}")
                info = ydl.extract_info(profile_url, download=False)
        if not info:
            logging.error("❌ yt-dlp returned no info for profile.")
            return []
        entries = info.get("entries", [])
        if not entries:
            logging.warning("⚠️ yt-dlp found no video entries.")
            return []
        seen_urls = set()
        for entry in entries:
            if not entry:
                continue
            if len(tiktoks) >= SCRAPE_VIDEO_LIMIT:
                break
            url = entry.get("url") or entry.get("webpage_url") or ""
            canonical = canonicalize_tiktok_url(url)
            if not canonical or canonical in seen_urls:
                continue
            if "/video/" not in canonical:
                continue
            seen_urls.add(canonical)
            # yt-dlp gives us rich metadata for free
            title = entry.get("title", "")
            timestamp = entry.get("timestamp")
            thumbnail = entry.get("thumbnail", "")
            # Fall back to "now" when the flat entry carries no timestamp.
            created = (arrow.Arrow.fromtimestamp(timestamp).isoformat()
                       if timestamp else arrow.utcnow().isoformat())
            tiktoks.append(ScrapedTikTok(
                created_on=created,
                text=title,
                video_url=canonical,
                post_url=canonical,
                thumbnail_url=thumbnail,
            ))
            logging.info(f"🎵 [yt-dlp] Scraped: {canonical}")
        logging.info(f"✅ yt-dlp scraped {len(tiktoks)} videos.")
    except Exception as e:
        logging.error(f"❌ yt-dlp scrape failed: {e}")
    return tiktoks
# ─────────────────────────────────────────────
# Playwright scraper (Option 1 — primary)
# ─────────────────────────────────────────────
def _dismiss_banners(page):
    """Click the first visible top-banner or cookie-consent button, if any.

    Top-banner selectors are tried before the consent selectors. At most one
    button is clicked per call; selector errors are ignored.
    """
    candidates = TOP_BANNER_SELECTORS + GDPR_SELECTORS
    for sel in candidates:
        try:
            button = page.locator(sel).first
            if not button.is_visible(timeout=2000):
                continue
            button.click()
            logging.info(f"✅ Dismissed banner: {sel}")
            # Give the page a moment to remove the overlay.
            time.sleep(1.0)
            return
        except Exception:
            continue
def _click_retry_button(page) -> bool:
for label in ("Actualizar", "Refresh", "Retry", "Reintentar"):
try:
btn = page.locator(f'button:has-text("{label}")').first
if btn.is_visible(timeout=1500):
btn.click()
logging.info(f"🔁 Clicked grid retry button: {label}")
time.sleep(2.0)
return True
except Exception:
pass
return False
def scrape_tiktoks_via_playwright(target_handle: str) -> list:
    """
    Primary scraper: headless Chromium via Playwright, with session cookies.

    Flow: load cookies, launch the browser, inject cookies and stealth
    patches, open the profile page, dismiss banners, reload, then try up to
    TIKTOK_MAX_LOAD_ATTEMPTS times to find video links, scroll to load more,
    and parse up to SCRAPE_VIDEO_LIMIT unique video URLs.

    Falls back to scrape_tiktoks_via_ytdlp() (and returns its result) when a
    CAPTCHA is detected, when no video links are found after all attempts,
    or when any exception is raised while scraping.

    Args:
        target_handle: TikTok handle, with or without a leading "@".

    Returns:
        A list of ScrapedTikTok objects. For items scraped here, created_on
        is the scrape time (the grid's upload date is not read).
    """
    tiktoks = []
    profile_url = f"https://www.tiktok.com/@{target_handle.lstrip('@')}"
    cookies = load_tiktok_cookies()
    # ── Stealth: support both playwright-stealth 2.x and 1.x ──────────
    # USE_STEALTH ends up as "v2", "v1" or False depending on which import
    # succeeds; the package is optional.
    try:
        from playwright_stealth import Stealth
        USE_STEALTH = "v2"
        _stealth = Stealth()
        logging.info("🥷 playwright-stealth 2.x — stealth ON")
    except ImportError:
        try:
            from playwright_stealth import stealth_sync
            USE_STEALTH = "v1"
            logging.info("🥷 playwright-stealth 1.x — stealth ON (legacy)")
        except ImportError:
            USE_STEALTH = False
            logging.warning("⚠️ playwright-stealth not installed — no stealth.")
    with sync_playwright() as p:
        browser = p.chromium.launch(
            headless=True,
            args=[
                "--disable-blink-features=AutomationControlled",
                "--no-sandbox",
                "--disable-setuid-sandbox",
                "--disable-dev-shm-usage",
                "--disable-gpu",
                "--window-size=1366,768",
            ],
        )
        # Present a consistent desktop Chrome profile: the user agent, the
        # client-hint headers, the viewport and the locale all describe the
        # same Spanish Windows browser.
        context = browser.new_context(
            user_agent=(
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                "AppleWebKit/537.36 (KHTML, like Gecko) "
                "Chrome/124.0.0.0 Safari/537.36"
            ),
            viewport={"width": 1366, "height": 768},
            locale="es-ES",
            timezone_id="Europe/Madrid",
            extra_http_headers={
                "Accept-Language": "es-ES,es;q=0.9,en;q=0.8",
                "Accept": (
                    "text/html,application/xhtml+xml,application/xml;"
                    "q=0.9,image/avif,image/webp,*/*;q=0.8"
                ),
                "Sec-Fetch-Dest": "document",
                "Sec-Fetch-Mode": "navigate",
                "Sec-Fetch-Site": "none",
                "Sec-Ch-Ua": (
                    '"Chromium";v="124","Google Chrome";v="124",'
                    '"Not-A.Brand";v="99"'
                ),
                "Sec-Ch-Ua-Mobile": "?0",
                "Sec-Ch-Ua-Platform": '"Windows"',
            },
        )
        # ── Inject session cookies BEFORE navigation ───────────────────
        if cookies:
            context.add_cookies(cookies)
            logging.info(f"🍪 Injected {len(cookies)} session cookies.")
        else:
            logging.warning(
                "⚠️ No cookies loaded. "
                f"Create '{TIKTOK_COOKIES_PATH}' to avoid CAPTCHAs."
            )
        page = context.new_page()
        # ── Apply stealth patches ──────────────────────────────────────
        if USE_STEALTH == "v2":
            _stealth.apply_stealth_sync(page)
            logging.info("🥷 Stealth patches applied (2.x).")
        elif USE_STEALTH == "v1":
            stealth_sync(page)
            logging.info("🥷 Stealth patches applied (1.x).")
        # Extra fingerprint overrides, applied whether or not the stealth
        # package is installed. The script text is kept verbatim.
        page.add_init_script("""
Object.defineProperty(navigator, 'webdriver', {get: () => undefined});
Object.defineProperty(navigator, 'plugins', {
get: () => [
{name:'Chrome PDF Plugin'},
{name:'Chrome PDF Viewer'},
{name:'Native Client'}
]
});
Object.defineProperty(navigator, 'languages', {
get: () => ['es-ES','es','en']
});
window.chrome = {
runtime:{}, loadTimes:function(){},
csi:function(){}, app:{}
};
""")
        try:
            # ── 1. Navigate ────────────────────────────────────────────
            logging.info(f"🌐 Navigating to TikTok profile: {profile_url}")
            page.goto(profile_url, wait_until="domcontentloaded", timeout=40000)
            # Fixed wait: the page is only at DOMContentLoaded here.
            time.sleep(TIKTOK_PAGE_LOAD_WAIT_S)
            # ── 2. CAPTCHA check immediately after load ─────────────────
            if _is_captcha_visible(page):
                take_error_screenshot(page, "captcha_after_load")
                logging.warning(
                    "🚧 CAPTCHA detected right after page load. "
                    "Cookies may be expired — falling back to yt-dlp."
                )
                browser.close()
                return scrape_tiktoks_via_ytdlp(target_handle)
            # ── 3. Dismiss banners ─────────────────────────────────────
            _dismiss_banners(page)
            # ── 4. Reload for clean grid ───────────────────────────────
            logging.info("🔄 Reloading page for clean grid render...")
            page.reload(wait_until="domcontentloaded", timeout=40000)
            time.sleep(TIKTOK_PAGE_LOAD_WAIT_S)
            # ── 5. Multi-attempt loop ──────────────────────────────────
            video_links = []
            for attempt in range(1, TIKTOK_MAX_LOAD_ATTEMPTS + 1):
                logging.info(
                    f"🔁 Grid load attempt {attempt}/{TIKTOK_MAX_LOAD_ATTEMPTS}..."
                )
                # CAPTCHA check on every attempt
                if _is_captcha_visible(page):
                    take_error_screenshot(page, f"captcha_attempt_{attempt}")
                    logging.warning(
                        f"🚧 CAPTCHA on attempt {attempt} — falling back to yt-dlp."
                    )
                    browser.close()
                    return scrape_tiktoks_via_ytdlp(target_handle)
                _dismiss_banners(page)
                try:
                    page.wait_for_selector(GRID_SELECTORS, timeout=15000)
                    logging.info(f"✅ Grid selector found on attempt {attempt}.")
                except Exception:
                    logging.warning(
                        f"⚠️ Grid selector timed out on attempt {attempt}."
                    )
                    take_error_screenshot(
                        page, f"grid_timeout_attempt_{attempt}"
                    )
                    # Try the page's own retry button, then give the grid a
                    # second, shorter chance to appear. A timeout here is
                    # not fatal: the link count below decides what happens.
                    _click_retry_button(page)
                    try:
                        page.wait_for_selector(GRID_SELECTORS, timeout=10000)
                    except Exception:
                        pass
                time.sleep(TIKTOK_PAGE_LOAD_WAIT_S)
                video_links = page.locator('a[href*="/video/"]').all()
                logging.info(
                    f"📊 Attempt {attempt}: found {len(video_links)} video links."
                )
                if video_links:
                    logging.info(f"✅ Got video links on attempt {attempt}.")
                    break
                if attempt < TIKTOK_MAX_LOAD_ATTEMPTS:
                    logging.info(
                        f"🔄 No videos — reloading "
                        f"(attempt {attempt + 1}/{TIKTOK_MAX_LOAD_ATTEMPTS})..."
                    )
                    page.reload(wait_until="domcontentloaded", timeout=40000)
                    time.sleep(TIKTOK_PAGE_LOAD_WAIT_S)
            # ── 6. Scroll to load more ─────────────────────────────────
            if video_links:
                for i in range(TIKTOK_MAX_SCROLLS):
                    page.evaluate("window.scrollBy(0, window.innerHeight * 2)")
                    time.sleep(TIKTOK_SCROLL_PAUSE_S)
                # Re-query after scrolling so newly loaded cards are included.
                video_links = page.locator('a[href*="/video/"]').all()
                logging.info(
                    f"📊 {len(video_links)} video links after scrolling."
                )
            # ── 7. Still nothing → yt-dlp fallback ────────────────────
            if not video_links:
                take_error_screenshot(page, "no_video_links_final")
                logging.warning(
                    "⚠️ No video links found after all Playwright attempts. "
                    "Falling back to yt-dlp."
                )
                browser.close()
                return scrape_tiktoks_via_ytdlp(target_handle)
            # ── 8. Parse video links ───────────────────────────────────
            seen_urls = set()
            for link in video_links:
                if len(tiktoks) >= SCRAPE_VIDEO_LIMIT:
                    break
                try:
                    href = link.get_attribute("href")
                    if not href:
                        continue
                    # Relative hrefs are resolved against the TikTok origin.
                    post_url = (
                        f"https://www.tiktok.com{href}"
                        if href.startswith("/") else href
                    )
                    canonical = canonicalize_tiktok_url(post_url)
                    if not canonical or canonical in seen_urls:
                        continue
                    if "/video/" not in canonical:
                        continue
                    seen_urls.add(canonical)
                    # Caption is best-effort: looked up in the link's parent
                    # element and left empty when nothing is visible.
                    caption = ""
                    try:
                        card = link.locator("..").first
                        cap_el = card.locator(
                            '[data-e2e="video-desc"], '
                            '[class*="SpanUniqueId"], '
                            'p[class*="caption"]'
                        ).first
                        if cap_el.is_visible(timeout=1000):
                            caption = cap_el.inner_text()
                    except Exception:
                        pass
                    # Thumbnail is best-effort too.
                    thumbnail_url = None
                    try:
                        img = link.locator("img").first
                        if img.is_visible(timeout=1000):
                            thumbnail_url = img.get_attribute("src")
                    except Exception:
                        pass
                    # created_on is the scrape time, not the upload time.
                    tiktoks.append(ScrapedTikTok(
                        created_on = arrow.utcnow().isoformat(),
                        text = caption,
                        video_url = canonical,
                        post_url = canonical,
                        thumbnail_url = thumbnail_url,
                    ))
                    logging.info(f"🎵 [Playwright] Scraped: {canonical}")
                except Exception as e:
                    # One broken card must not abort the whole scrape.
                    logging.warning(f"⚠️ Failed to parse video card: {e}")
        except Exception as e:
            # Any failure above (navigation, timeouts, browser errors) ends
            # the Playwright attempt and hands over to yt-dlp.
            take_error_screenshot(page, "playwright_scrape_failed")
            logging.error(f"❌ Playwright scrape failed: {e}")
            browser.close()
            logging.info("🔄 Attempting yt-dlp fallback after Playwright error...")
            return scrape_tiktoks_via_ytdlp(target_handle)
        browser.close()
        logging.info(f"✅ [Playwright] Scraped {len(tiktoks)} videos.")
        return tiktoks
# ─────────────────────────────────────────────
# Video download (yt-dlp)
# ─────────────────────────────────────────────
def _netscape_cookies_for_download(source_path: str, work_dir: str) -> str | None:
    """Return a cookie file path yt-dlp can load, or None for "no cookies".

    yt-dlp's ``cookiefile`` option only understands the Netscape cookies.txt
    format, while this bot's cookie file is a JSON export. A JSON export is
    converted into ``work_dir/ytdlp_cookies.txt``. A file that is not JSON is
    assumed to already be a Netscape file and is returned as-is.
    """
    if not os.path.exists(source_path):
        return None
    try:
        with open(source_path, "r", encoding="utf-8") as f:
            raw = json.load(f)
    except ValueError:
        # Not JSON: assume the user supplied a cookies.txt directly.
        return source_path
    except OSError as e:
        logging.warning(f"⚠️ Could not read cookie file for yt-dlp: {e}")
        return None
    if isinstance(raw, dict):
        raw = raw.get("cookies")
    if not isinstance(raw, list):
        logging.warning(
            "⚠️ Unrecognised cookie JSON layout — yt-dlp will run without cookies."
        )
        return None

    lines = ["# Netscape HTTP Cookie File"]
    for c in raw:
        if not isinstance(c, dict) or not c.get("name"):
            continue
        domain = str(c.get("domain") or ".tiktok.com")
        try:
            expiry = max(int(float(c.get("expirationDate", c.get("expires")))), 0)
        except (TypeError, ValueError, OverflowError):
            expiry = 0  # 0 marks a session cookie
        fields = [
            ("#HttpOnly_" if c.get("httpOnly") else "") + domain,
            # The subdomain flag must agree with the leading dot of the domain.
            "TRUE" if domain.startswith(".") else "FALSE",
            str(c.get("path") or "/"),
            "TRUE" if c.get("secure") else "FALSE",
            str(expiry),
            str(c["name"]),
            str(c.get("value", "")),
        ]
        # The format is tab/line separated, so such characters cannot be
        # represented; skip the cookie rather than corrupt the file.
        if any(ch in field for field in fields for ch in "\t\r\n"):
            continue
        lines.append("\t".join(fields))

    dest = os.path.join(work_dir, "ytdlp_cookies.txt")
    with open(dest, "w", encoding="utf-8") as out:
        out.write("\n".join(lines) + "\n")
    return dest


def download_video_ytdlp(post_url: str, output_dir: str) -> str | None:
    """
    Download a single TikTok video using yt-dlp.

    The file is saved in *output_dir* as ``<video id>.<ext>``, preferring an
    MP4 container. Session cookies from TIKTOK_COOKIES_PATH are passed to
    yt-dlp when available.

    Args:
        post_url: URL of the TikTok post.
        output_dir: Existing directory to download into.

    Returns:
        The path of the downloaded file, or None when yt-dlp is missing, the
        download fails, or the output file cannot be found. Errors are
        logged, not raised.
    """
    try:
        import yt_dlp
    except ImportError:
        logging.error("❌ yt-dlp not installed. Run: pip install yt-dlp")
        return None
    output_template = os.path.join(output_dir, "%(id)s.%(ext)s")
    ydl_opts = {
        "quiet": True,
        "no_warnings": True,
        "outtmpl": output_template,
        "format": "mp4/bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best",
        "merge_output_format": "mp4",
        "socket_timeout": 30,
        "retries": 3,
    }
    try:
        # The converted cookie file holds session secrets, so it is kept in
        # its own throw-away directory rather than next to the video.
        with tempfile.TemporaryDirectory() as cookie_dir:
            cookiefile = _netscape_cookies_for_download(
                TIKTOK_COOKIES_PATH, cookie_dir
            )
            if cookiefile:
                ydl_opts["cookiefile"] = cookiefile
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                info = ydl.extract_info(post_url, download=True)
                if not info:
                    return None
                filename = ydl.prepare_filename(info)
        # yt-dlp may change extension after merge
        for ext in ("mp4", "mkv", "webm"):
            candidate = re.sub(r"\.\w+$", f".{ext}", filename)
            if os.path.exists(candidate):
                logging.info(f"📥 Downloaded via yt-dlp: {candidate}")
                return candidate
        if os.path.exists(filename):
            return filename
    except Exception as e:
        logging.error(f"❌ yt-dlp download failed for {post_url}: {e}")
    return None
# ─────────────────────────────────────────────
# Video processing helpers
# ─────────────────────────────────────────────
def get_video_duration(path: str) -> float | None:
    """Return the container duration of *path* in seconds, via ffprobe.

    Returns None (after logging a warning) when ffprobe is unavailable, times
    out, or prints something that is not a number.
    """
    probe_cmd = [
        "ffprobe", "-v", "error",
        "-show_entries", "format=duration",
        "-of", "default=noprint_wrappers=1:nokey=1",
        path,
    ]
    try:
        probe = subprocess.run(
            probe_cmd,
            capture_output=True,
            text=True,
            timeout=FFPROBE_TIMEOUT_SECONDS,
        )
        seconds = float(probe.stdout.strip())
    except Exception as e:
        logging.warning(f"⚠️ ffprobe failed: {e}")
        return None
    return seconds
def trim_video(input_path: str, output_path: str,
               max_seconds: int = VIDEO_MAX_DURATION_SECONDS) -> bool:
    """Copy the first *max_seconds* seconds of a video into *output_path*.

    Streams are copied without re-encoding and an existing output file is
    overwritten. Returns True on success, False (after logging an error) when
    ffmpeg fails, is missing, or times out.
    """
    try:
        ffmpeg_cmd = [
            "ffmpeg", "-y", "-i", input_path,
            "-t", str(max_seconds),
            "-c", "copy", output_path,
        ]
        subprocess.run(
            ffmpeg_cmd,
            capture_output=True,
            check=True,
            timeout=SUBPROCESS_TIMEOUT_SECONDS,
        )
    except Exception as e:
        logging.error(f"❌ ffmpeg trim failed: {e}")
        return False
    return True
def get_video_dimensions(path: str) -> tuple[int, int] | None:
    """Return ``(width, height)`` of the first video stream, via ffprobe.

    Returns None when ffprobe fails (a warning is logged) or when its output
    is not exactly two comma-separated fields.
    """
    probe_cmd = [
        "ffprobe", "-v", "error",
        "-select_streams", "v:0",
        "-show_entries", "stream=width,height",
        "-of", "csv=p=0",
        path,
    ]
    try:
        probe = subprocess.run(
            probe_cmd,
            capture_output=True,
            text=True,
            timeout=FFPROBE_TIMEOUT_SECONDS,
        )
        fields = probe.stdout.strip().split(",")
        if len(fields) != 2:
            return None
        width, height = fields
        return int(width), int(height)
    except Exception as e:
        logging.warning(f"⚠️ Could not get video dimensions: {e}")
        return None
def extract_thumbnail(video_path: str, output_path: str) -> bool:
    """Write a single frame taken at the 1-second mark to *output_path*.

    Returns True only if ffmpeg succeeded and the output file exists. Any
    failure is logged as a warning and reported as False.
    """
    ffmpeg_cmd = [
        "ffmpeg", "-y", "-i", video_path,
        "-ss", "00:00:01",
        "-vframes", "1",
        "-q:v", "2",
        output_path,
    ]
    try:
        subprocess.run(
            ffmpeg_cmd,
            capture_output=True,
            check=True,
            timeout=FFPROBE_TIMEOUT_SECONDS,
        )
        created = os.path.exists(output_path)
    except Exception as e:
        logging.warning(f"⚠️ Thumbnail extraction failed: {e}")
        return False
    return created
# ─────────────────────────────────────────────
# Bluesky helpers
# ─────────────────────────────────────────────
def bsky_login(client: Client, handle: str, password: str,
               base_url: str) -> bool:
    """Log *client* in to Bluesky, retrying with exponential back-off.

    Up to BSKY_LOGIN_MAX_RETRIES attempts are made. Between attempts the
    function sleeps for BSKY_LOGIN_BASE_DELAY * 2**(attempt - 1) seconds plus
    random jitter, capped at BSKY_LOGIN_MAX_DELAY. No sleep happens after the
    final failed attempt.

    Args:
        client: The atproto client to authenticate.
        handle: Bluesky handle to log in as.
        password: Password for that handle.
        base_url: Service URL assigned to the client before each login.

    Returns:
        True once a login succeeds, False if every attempt failed.
    """
    for attempt in range(1, BSKY_LOGIN_MAX_RETRIES + 1):
        try:
            # NOTE(review): confirm that assigning ``base_url`` is honoured by
            # the installed atproto Client version.
            client.base_url = base_url
            client.login(handle, password)
            logging.info(f"✅ Logged in to Bluesky as {handle}")
            return True
        except Exception as e:
            if attempt >= BSKY_LOGIN_MAX_RETRIES:
                # Nothing left to retry: report and fall through without
                # wasting a full back-off delay.
                logging.warning(
                    f"⚠️ Bluesky login attempt {attempt} failed: {e}."
                )
                break
            delay = min(
                BSKY_LOGIN_BASE_DELAY * (2 ** (attempt - 1))
                + random.uniform(0, BSKY_LOGIN_JITTER_MAX),
                BSKY_LOGIN_MAX_DELAY,
            )
            logging.warning(
                f"⚠️ Bluesky login attempt {attempt} failed: {e}. "
                f"Retrying in {delay:.1f}s..."
            )
            time.sleep(delay)
    logging.error("❌ All Bluesky login attempts failed.")
    return False
def upload_video_blob(client: Client, video_path: str):
    """Upload the video at *video_path* as a Bluesky blob, with retries.

    Files larger than MAX_VIDEO_UPLOAD_SIZE_MB are rejected up front. The
    file is read once and uploaded up to BSKY_BLOB_UPLOAD_MAX_RETRIES times
    with exponential back-off. Errors whose message does not look transient
    (rate limit, timeout, 5xx) stop retrying once the attempt number reaches
    BSKY_BLOB_TRANSIENT_ERROR_RETRIES. No sleep happens after the attempt
    that gives up.

    Returns:
        The blob reference from the upload response, or None when the file is
        too large, cannot be read, or every attempt failed.

    Raises:
        OSError: If the file size cannot be determined.
    """
    size_mb = os.path.getsize(video_path) / (1024 * 1024)
    if size_mb > MAX_VIDEO_UPLOAD_SIZE_MB:
        logging.error(
            f"❌ Video too large: {size_mb:.1f} MB "
            f"(max {MAX_VIDEO_UPLOAD_SIZE_MB} MB)"
        )
        return None
    # Read once: the payload does not change between attempts, and a local
    # read error will not be cured by retrying the upload.
    try:
        with open(video_path, "rb") as f:
            data = f.read()
    except OSError as e:
        logging.error(f"❌ Could not read video file {video_path}: {e}")
        return None
    for attempt in range(1, BSKY_BLOB_UPLOAD_MAX_RETRIES + 1):
        try:
            resp = client.upload_blob(data)
            logging.info(f"✅ Video blob uploaded on attempt {attempt}.")
            return resp.blob
        except Exception as e:
            err = str(e).lower()
            is_transient = any(
                k in err for k in ("rate", "timeout", "503", "502", "500")
            )
            out_of_attempts = attempt >= BSKY_BLOB_UPLOAD_MAX_RETRIES
            give_up_early = (
                not is_transient
                and attempt >= BSKY_BLOB_TRANSIENT_ERROR_RETRIES
            )
            if out_of_attempts or give_up_early:
                logging.warning(
                    f"⚠️ Blob upload attempt {attempt} failed: {e}."
                )
                break
            delay = min(
                BSKY_BLOB_UPLOAD_BASE_DELAY * (2 ** (attempt - 1)),
                BSKY_BLOB_UPLOAD_MAX_DELAY,
            )
            logging.warning(
                f"⚠️ Blob upload attempt {attempt} failed: {e}. "
                f"Retrying in {delay}s..."
            )
            time.sleep(delay)
    logging.error("❌ All blob upload attempts failed.")
    return None
def upload_thumb_blob(client: Client, thumb_path: str):
    """Upload the image at *thumb_path* as a Bluesky blob (single attempt).

    Returns the blob reference from the upload response, or None after
    logging a warning if the file cannot be read or the upload fails.
    """
    try:
        with open(thumb_path, "rb") as image:
            payload = image.read()
        uploaded = client.upload_blob(payload)
        logging.info("✅ Thumbnail blob uploaded.")
    except Exception as e:
        logging.warning(f"⚠️ Thumbnail upload failed: {e}")
        return None
    return uploaded.blob
def _linkified_text_builder(text: str):
    """Build a TextBuilder for *text* in which http(s) URLs are real links.

    Bluesky only renders a URL as clickable when the post carries a link
    facet for it, so each URL found in *text* is added with ``link()`` and
    everything else with ``text()``. Trailing punctuation is left outside
    the link.
    """
    builder = client_utils.TextBuilder()
    cursor = 0
    for found in re.finditer(r"https?://\S+", text):
        url = found.group(0).rstrip(".,;:!?)]}'\"")
        if url.endswith("://"):
            # Only the scheme survived stripping: not a usable link.
            continue
        if found.start() > cursor:
            builder.text(text[cursor:found.start()])
        builder.link(url, url)
        cursor = found.start() + len(url)
    # Emit the remaining plain text. When no link was added this is the whole
    # (possibly empty) text, matching a plain text-only builder.
    if cursor == 0 or cursor < len(text):
        builder.text(text[cursor:])
    return builder


def send_bsky_post(client: Client, text: str, video_blob,
                   thumb_blob, langs: list,
                   aspect_ratio: models.AppBskyEmbedDefs.AspectRatio | None,
                   alt_text: str = "") -> bool:
    """Publish a video post on Bluesky, retrying with exponential back-off.

    Args:
        client: Logged-in atproto client.
        text: Post text. http(s) URLs in it are sent as clickable links.
        video_blob: Blob reference of the uploaded video.
        thumb_blob: Blob reference of the thumbnail, or None.
        langs: Language codes attached to the post.
        aspect_ratio: Video aspect ratio, or None to omit it.
        alt_text: Alt text; cut to DYNAMIC_ALT_MAX_LENGTH characters.

    Returns:
        True once the post is sent, False after BSKY_SEND_POST_MAX_RETRIES
        failed attempts. No sleep happens after the final failed attempt.
    """
    tb = _linkified_text_builder(text)
    video_embed = models.AppBskyEmbedVideo.Main(
        video=video_blob,
        alt=alt_text[:DYNAMIC_ALT_MAX_LENGTH],
        thumbnail=thumb_blob,
        # Only pass the field when a ratio is known.
        **({"aspectRatio": aspect_ratio} if aspect_ratio else {}),
    )
    for attempt in range(1, BSKY_SEND_POST_MAX_RETRIES + 1):
        try:
            client.send_post(
                text=tb,
                embed=video_embed,
                langs=langs,
            )
            logging.info("✅ Post sent to Bluesky.")
            return True
        except Exception as e:
            if attempt >= BSKY_SEND_POST_MAX_RETRIES:
                # Last attempt: report and stop without a pointless delay.
                logging.warning(
                    f"⚠️ Send post attempt {attempt} failed: {e}."
                )
                break
            delay = min(
                BSKY_SEND_POST_BASE_DELAY * (2 ** (attempt - 1)),
                BSKY_SEND_POST_MAX_DELAY,
            )
            logging.warning(
                f"⚠️ Send post attempt {attempt} failed: {e}. "
                f"Retrying in {delay}s..."
            )
            time.sleep(delay)
    logging.error("❌ All send-post attempts failed.")
    return False
# ─────────────────────────────────────────────
# Core sync logic
# ─────────────────────────────────────────────
def already_posted(video_id: str, state: dict) -> bool:
    """Return True if *video_id* is recorded in ``state["posted_ids"]``.

    A state without a ``"posted_ids"`` entry is treated as empty.
    """
    posted_ids = state.get("posted_ids", [])
    return video_id in posted_ids
def mark_posted(video_id: str, state: dict):
    """Record *video_id* as posted in *state* (mutated in place).

    The id is appended only if it is not already present, and the list is
    then cut to its most recent DEDUPE_BSKY_LIMIT * 10 entries so the state
    file cannot grow without bound.
    """
    posted_ids = state.setdefault("posted_ids", [])
    if video_id not in posted_ids:
        posted_ids.append(video_id)
    keep_last = DEDUPE_BSKY_LIMIT * 10
    state["posted_ids"] = posted_ids[-keep_last:]
def process_tiktok(tiktok: ScrapedTikTok, client: Client,
                   langs: list, state: dict) -> bool:
    """Download, prepare and post one TikTok video to Bluesky.

    Steps: skip posts without media or already recorded in *state*; download
    the video into a temporary directory; trim it if it is longer than
    VIDEO_MAX_DURATION_SECONDS; reject it if it is still larger than
    MAX_VIDEO_UPLOAD_SIZE_MB; extract a thumbnail and the aspect ratio;
    upload the blobs; build the post text (caption plus post URL) and send
    it. On success the video id is recorded and the state is saved.

    Returns:
        True if the post was sent, False if the video was skipped or any
        step failed.
    """
    if not tiktok.media:
        logging.warning("⚠️ TikTok has no media — skipping.")
        return False

    post_url = tiktok.post_url or tiktok.media[0].media_url_https
    video_id = tiktok_id_from_url(post_url)
    if already_posted(video_id, state):
        logging.info(f"⏭️ Already posted {video_id} — skipping.")
        return False

    # Everything downloaded or generated below is discarded with workdir.
    with tempfile.TemporaryDirectory() as workdir:
        # ── Download video ────────────────────────────────────────────
        local_video = download_video_ytdlp(post_url, workdir)
        if not (local_video and os.path.exists(local_video)):
            logging.error(f"❌ Could not download video: {post_url}")
            return False

        # ── Check / trim duration ─────────────────────────────────────
        duration = get_video_duration(local_video)
        if duration and duration > VIDEO_MAX_DURATION_SECONDS:
            logging.info(
                f"✂️ Video {duration:.0f}s > {VIDEO_MAX_DURATION_SECONDS}s — trimming."
            )
            clipped = os.path.join(workdir, "trimmed.mp4")
            if not trim_video(local_video, clipped):
                logging.error("❌ Trim failed — skipping.")
                return False
            local_video = clipped

        # ── Check file size ───────────────────────────────────────────
        size_mb = os.path.getsize(local_video) / (1024 * 1024)
        if size_mb > MAX_VIDEO_UPLOAD_SIZE_MB:
            logging.error(
                f"❌ Video still too large after trim: {size_mb:.1f} MB — skipping."
            )
            return False

        # ── Thumbnail ─────────────────────────────────────────────────
        poster = os.path.join(workdir, "thumb.jpg")
        if not extract_thumbnail(local_video, poster):
            poster = None

        # ── Aspect ratio ──────────────────────────────────────────────
        size = get_video_dimensions(local_video)
        if size:
            width, height = size
            aspect_ratio = models.AppBskyEmbedDefs.AspectRatio(
                width=width, height=height
            )
        else:
            aspect_ratio = None

        # ── Upload blobs ──────────────────────────────────────────────
        video_blob = upload_video_blob(client, local_video)
        if not video_blob:
            return False
        if poster and os.path.exists(poster):
            thumb_blob = upload_thumb_blob(client, poster)
        else:
            thumb_blob = None

        # ── Build post text ───────────────────────────────────────────
        caption = tiktok.text or ""
        text_budget = BSKY_TEXT_MAX_LENGTH
        if post_url:
            # Reserve room for the URL and the separator before it.
            text_budget -= len(post_url) + 2
        post_text = truncate_grapheme(caption, text_budget)
        if post_url:
            post_text = f"{post_text}\n{post_url}".strip()
        alt_text = truncate_grapheme(caption, DYNAMIC_ALT_MAX_LENGTH)

        # ── Send post ─────────────────────────────────────────────────
        success = send_bsky_post(
            client, post_text, video_blob,
            thumb_blob, langs, aspect_ratio, alt_text,
        )
        if success:
            # Persist immediately so a later crash cannot cause a re-post.
            mark_posted(video_id, state)
            save_state(state)
            logging.info(f"🎉 Posted {video_id} to Bluesky.")
        return success
# ─────────────────────────────────────────────
# Entry point
# ─────────────────────────────────────────────
def _build_arg_parser():
    """Create the command-line parser for the cross-poster."""
    parser = argparse.ArgumentParser(
        description="TikTok → Bluesky cross-poster"
    )
    for flag in ("--tiktok-handle", "--bsky-handle", "--bsky-password"):
        parser.add_argument(flag, required=True)
    parser.add_argument("--bsky-base-url", default=DEFAULT_BSKY_BASE_URL)
    parser.add_argument("--bsky-langs", nargs="+", default=DEFAULT_BSKY_LANGS)
    parser.add_argument(
        "--cookies-path",
        default=TIKTOK_COOKIES_PATH,
        help="Path to exported TikTok cookies JSON file.",
    )
    return parser


def main():
    """Run one sync cycle: scrape the TikTok profile and post new videos.

    Parses the command line, logs in to Bluesky, scrapes the profile
    (Playwright first, yt-dlp as fallback) and processes every scraped video.
    A failure on one video is logged and does not stop the others.
    """
    # The cookie path is a module-level setting read by the scrapers, so the
    # command-line value has to replace the global.
    global TIKTOK_COOKIES_PATH
    load_dotenv()
    args = _build_arg_parser().parse_args()
    TIKTOK_COOKIES_PATH = args.cookies_path

    logging.info(f"🤖 TikTok→Bluesky bot started. Scraping @{args.tiktok_handle}")
    logging.info(f"🍪 Cookie file: {TIKTOK_COOKIES_PATH} "
                 f"({'found' if os.path.exists(TIKTOK_COOKIES_PATH) else 'NOT FOUND'})")

    state = load_state()
    client = Client()
    logged_in = bsky_login(
        client, args.bsky_handle, args.bsky_password, args.bsky_base_url
    )
    if not logged_in:
        logging.error("❌ Cannot proceed without Bluesky login.")
        return

    logging.info("🔄 Starting TikTok → Bluesky sync cycle...")
    tiktoks = scrape_tiktoks_via_playwright(args.tiktok_handle)
    if not tiktoks:
        logging.warning("⚠️ No TikTok videos found. Skipping sync.")
        logging.info("🤖 Bot finished.")
        return

    logging.info(f"📋 Found {len(tiktoks)} videos. Processing new ones...")
    posted = 0
    for tiktok in tiktoks:
        try:
            was_posted = process_tiktok(tiktok, client, args.bsky_langs, state)
        except Exception as e:
            logging.error(f"❌ Unexpected error processing video: {e}")
            continue
        if was_posted:
            posted += 1
            # Random pause between posts to avoid bursts.
            time.sleep(random.uniform(3.0, 7.0))
    logging.info(f"✅ Sync complete. Posted {posted} new video(s).")
    logging.info("🤖 Bot finished.")
# Run a sync cycle only when executed as a script, not when imported.
if __name__ == "__main__":
    main()