Files
post2bsky/tiktok2bsky.py
Guillem Hernandez Sola ee54aa4a25 Cookies 7
2026-05-19 11:18:48 +02:00

911 lines
37 KiB
Python

#!/usr/bin/env python3
"""
tiktok2bsky.py
──────────────
Scrapes recent videos from a public TikTok profile and cross-posts
them to a Bluesky account.

Usage:
    python tiktok2bsky.py \
        --tiktok-handle jijantesfc \
        --bsky-handle jijantesfc.eurosky.social \
        --bsky-app-password xxxx-xxxx-xxxx-xxxx \
        --bsky-base-url https://eurosky.social \
        --bsky-langs es \
        --cookies-path tiktok_cookies.json
"""
import argparse
import json
import logging
import os
import random
import re
import subprocess
import sys
import tempfile
import time
from datetime import datetime, timezone
from pathlib import Path
import arrow
import httpx
from atproto import Client
from dotenv import load_dotenv
from playwright.sync_api import sync_playwright
# playwright-stealth ships two incompatible APIs: 1.x exposes the function
# `stealth_sync`, 2.x exposes the `Stealth` class. Import whichever is
# installed and record the flavour in _STEALTH_V2 for later dispatch.
try:
    from playwright_stealth import stealth_sync
except ImportError:
    from playwright_stealth import Stealth
    _STEALTH_V2 = True
else:
    _STEALTH_V2 = False
# ─────────────────────────────────────────────────────────────────────────────
# Logging
# ─────────────────────────────────────────────────────────────────────────────
# Configure the root logger once, at import time: every record is written both
# to stdout and to the UTF-8 file "tiktok2bsky.log" (a relative path, so it is
# resolved against the current working directory).
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler("tiktok2bsky.log", encoding="utf-8"),
    ],
    level=logging.INFO,
)
# ─────────────────────────────────────────────────────────────────────────────
# Constants & defaults
# ─────────────────────────────────────────────────────────────────────────────
# Bluesky defaults used by the CLI when the matching flag is omitted.
DEFAULT_BSKY_BASE_URL = "https://bsky.social"
DEFAULT_BSKY_LANGS = ["es"]
# Default path of the TikTok cookie export; main() overwrites this global with
# the value of --cookies-path.
TIKTOK_COOKIES_PATH = "tiktok_cookies.json"
# JSON file recording which TikTok video IDs have already been cross-posted.
STATE_FILE = "tiktok2bsky_state.json"
# save_state() prunes the oldest entries once the state grows past this size.
STATE_MAX_ENTRIES = 5000
# Upper bound on the number of grid items examined per scrape.
SCRAPE_VIDEO_LIMIT = 30
# Age window in days. NOTE(review): confirm where (or whether) this is enforced.
VIDEO_MAX_AGE_DAYS = 3
VIDEO_MAX_DURATION_S = 179 # Bluesky hard limit is 180s
VIDEO_MAX_SIZE_BYTES = 45 * 1024 * 1024 # 45 MB
# Bluesky login retry config (delays are in seconds; the delay doubles per
# attempt starting from BASE_DELAY, gets random jitter, and is capped at
# MAX_DELAY)
BSKY_LOGIN_MAX_RETRIES = 4
BSKY_LOGIN_BASE_DELAY = 15.0
BSKY_LOGIN_MAX_DELAY = 120.0
BSKY_LOGIN_JITTER_MAX = 10.0
# Bluesky upload retry config (shared by blob upload and post creation; same
# backoff scheme as the login settings)
BSKY_UPLOAD_MAX_RETRIES = 5
BSKY_UPLOAD_BASE_DELAY = 10.0
BSKY_UPLOAD_MAX_DELAY = 120.0
BSKY_UPLOAD_JITTER_MAX = 5.0
# Playwright scraping config
PLAYWRIGHT_TIMEOUT_MS = 30_000
PLAYWRIGHT_SLOW_MO = 50
# Maximum number of profile page-load attempts per scrape.
PLAYWRIGHT_MAX_RELOADS = 3
# TikTok selectors
TIKTOK_VIDEO_GRID_SEL = '[data-e2e="user-post-item-list"]'
TIKTOK_VIDEO_ITEM_SEL = '[data-e2e="user-post-item"]'
# Generic banner / modal close controls, tried after the cookie-modal buttons.
TIKTOK_BANNER_SELS = [
    '[id*="banner"]',
    '[class*="banner"]',
    '[data-e2e="recommend-modal-close"]',
    'button:has-text("Rechazar")',
    'button:has-text("Reject")',
    'button:has-text("Accept")',
    'button:has-text("Aceptar")',
    '[aria-label="Close"]',
    '[aria-label="Cerrar"]',
]
# Cookie-consent buttons (English and Spanish); the decline/reject variants are
# listed before the accept variants, so they are tried first.
TIKTOK_COOKIE_MODAL_SELS = [
    'button:has-text("Decline all")',
    'button:has-text("Rechazar todo")',
    'button:has-text("Reject all")',
    'button:has-text("Accept all")',
    'button:has-text("Aceptar todo")',
    '[class*="cookie"] button',
    '[id*="cookie"] button',
]
# Error placeholder shown in place of the grid, and the button used to reload it.
TIKTOK_GRID_ERROR_SEL = '[data-e2e="user-post-item-list-error"]'
TIKTOK_REFRESH_BTN_SEL = 'button:has-text("Actualizar"), button:has-text("Refresh")'
# ─────────────────────────────────────────────────────────────────────────────
# State management
# ─────────────────────────────────────────────────────────────────────────────
def load_state() -> dict:
    """
    Read the persisted posting state from STATE_FILE.

    Returns the parsed JSON object, or a fresh ``{"posted": {}}`` state when
    the file is missing or cannot be read/parsed.
    """
    if not os.path.exists(STATE_FILE):
        return {"posted": {}}
    try:
        with open(STATE_FILE, "r", encoding="utf-8") as handle:
            loaded = json.load(handle)
        entry_count = len(loaded.get("posted", {}))
        logging.info(f"📂 Loaded state: {entry_count} entries.")
        return loaded
    except Exception as exc:
        # Best effort: a corrupt or unreadable state file must not stop the bot.
        logging.warning(f"⚠️ Could not load state file: {exc}. Starting fresh.")
    return {"posted": {}}
def save_state(state: dict):
    """
    Persist the posting state to STATE_FILE.

    The "posted" mapping is first pruned in place to the STATE_MAX_ENTRIES
    most recent entries (ordered by their "posted_at" string). The JSON is
    then written to a sibling temporary file and moved over STATE_FILE with
    os.replace(), so an interrupted write cannot leave a truncated state file
    behind. Failures are logged, never raised.
    """
    posted = state.get("posted", {})
    overflow = len(posted) - STATE_MAX_ENTRIES
    if overflow > 0:
        # ISO-8601 timestamps sort chronologically as plain strings; entries
        # without "posted_at" sort first and are therefore pruned first.
        oldest_first = sorted(
            posted,
            key=lambda k: posted[k].get("posted_at", ""),
        )
        for old_key in oldest_first[:overflow]:
            del posted[old_key]
    state["posted"] = posted

    tmp_path = f"{STATE_FILE}.tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(state, f, indent=2, ensure_ascii=False)
        # Replace the old file only after the new content is fully written.
        os.replace(tmp_path, STATE_FILE)
    except Exception as e:
        logging.error(f"❌ Could not save state: {e}")
        try:
            os.unlink(tmp_path)
        except OSError:
            # Nothing to clean up (the temp file was never created).
            pass
def is_already_posted(video_id: str, state: dict) -> bool:
    """Return True when video_id is a key of the state's "posted" mapping."""
    posted_entries = state.get("posted", {})
    return video_id in posted_entries
def mark_as_posted(video_id: str, state: dict, meta: dict = None):
    """
    Record video_id as posted and persist the state immediately.

    The stored entry holds the current UTC time under "posted_at" plus every
    key of the optional meta dict (meta keys win on collision).
    """
    record = {"posted_at": arrow.utcnow().isoformat()}
    if meta:
        record.update(meta)
    posted_entries = state.setdefault("posted", {})
    posted_entries[video_id] = record
    save_state(state)
# ─────────────────────────────────────────────────────────────────────────────
# Cookie helpers
# ─────────────────────────────────────────────────────────────────────────────
def load_cookies_from_file(path: str) -> list:
    """
    Load cookies from a JSON file (format produced by generate_tiktok_cookies.py).

    Accepts either a JSON list of cookie objects or a JSON object that wraps
    such a list under a "cookies" key (the layout used e.g. by Playwright
    storage-state exports). Entries that are not JSON objects are dropped.

    Returns:
        A list of cookie dicts; an empty list when the file is missing,
        unreadable, not valid JSON, or has an unexpected shape.
    """
    if not os.path.exists(path):
        logging.warning(f"⚠️ Cookie file not found: {path}")
        return []
    try:
        with open(path, "r", encoding="utf-8") as f:
            payload = json.load(f)
    except Exception as e:
        logging.warning(f"⚠️ Could not load cookies from {path}: {e}")
        return []

    if isinstance(payload, dict) and isinstance(payload.get("cookies"), list):
        payload = payload["cookies"]
    if not isinstance(payload, list):
        # Callers iterate the result and call .get() on each entry, so any
        # other shape would crash them later; fail soft here instead.
        logging.warning(
            f"⚠️ Could not load cookies from {path}: "
            f"expected a JSON list of cookie objects."
        )
        return []

    cookies = [c for c in payload if isinstance(c, dict)]
    logging.info(f"🍪 Loaded {len(cookies)} cookies from {path}")
    return cookies
def inject_cookies_into_context(context, cookies: list):
    """
    Inject a list of cookie dicts into a Playwright browser context.

    Each cookie is normalised before injection:
      * missing fields get defaults (domain ".tiktok.com", path "/", ...);
      * "sameSite" is mapped case-insensitively onto "Strict" / "Lax" / "None",
        including the "no_restriction" / "unspecified" spellings used by
        browser-extension exports; unknown or missing values fall back to
        "None";
      * the expiry is read from "expirationDate" or "expires" and only kept
        when it parses to a positive number (otherwise the cookie is injected
        as a session cookie).

    Entries that are not dicts are skipped. Injection errors are logged and
    swallowed so scraping can continue without cookies.
    """
    if not cookies:
        return

    # NOTE(review): "unspecified" is mapped to "Lax" on the assumption that
    # browsers default cookies without a SameSite attribute to Lax — confirm.
    same_site_values = {
        "strict": "Strict",
        "lax": "Lax",
        "none": "None",
        "no_restriction": "None",
        "unspecified": "Lax",
    }

    playwright_cookies = []
    for c in cookies:
        if not isinstance(c, dict):
            logging.warning(f"⚠️ Skipping malformed cookie entry: {c!r}")
            continue
        raw_same_site = str(c.get("sameSite") or "None").strip().lower()
        entry = {
            "name": c.get("name", ""),
            "value": c.get("value", ""),
            "domain": c.get("domain", ".tiktok.com"),
            "path": c.get("path", "/"),
            "secure": c.get("secure", False),
            "httpOnly": c.get("httpOnly", False),
            "sameSite": same_site_values.get(raw_same_site, "None"),
        }
        exp = c.get("expirationDate") or c.get("expires")
        try:
            exp_value = float(exp) if exp else 0.0
        except (TypeError, ValueError):
            # An unparsable expiry must not abort the whole injection.
            exp_value = 0.0
        if exp_value > 0:
            entry["expires"] = exp_value
        playwright_cookies.append(entry)

    if not playwright_cookies:
        return
    try:
        context.add_cookies(playwright_cookies)
        logging.info(f"🍪 Injected {len(playwright_cookies)} cookies into browser context.")
    except Exception as e:
        logging.warning(f"⚠️ Could not inject cookies: {e}")
# ─────────────────────────────────────────────────────────────────────────────
# Bluesky helpers
# ─────────────────────────────────────────────────────────────────────────────
def bsky_login(client: Client, handle: str, password: str,
               base_url: str) -> bool:
    """
    Log in to Bluesky through the given PDS, retrying transient failures.

    Uses ``client.login()`` so that the session (tokens and profile) is stored
    on the client; a bare ``com.atproto.server.create_session`` call only
    returns the tokens and leaves the client unauthenticated for the later
    upload/post calls.

    Args:
        client: atproto client to authenticate.
        handle: Bluesky handle used as the login identifier.
        password: App password for the account.
        base_url: PDS base URL; a trailing slash is stripped.

    Returns:
        True on success. False when the credentials are rejected (no retry)
        or when all BSKY_LOGIN_MAX_RETRIES attempts failed.
    """
    pds_url = base_url.rstrip("/")
    for attempt in range(1, BSKY_LOGIN_MAX_RETRIES + 1):
        try:
            # Force the client to use the custom PDS for ALL requests
            # including identity resolution — must be set before login.
            # NOTE(review): this pokes the SDK's private `_base_url`; confirm
            # the exact URL form the installed atproto version expects.
            client._base_url = pds_url
            client.base_url = pds_url
            client.login(handle, password)
            logging.info(
                f"✅ Logged in to Bluesky as {handle} via {base_url}"
            )
            return True
        except Exception as e:
            err = str(e)
            # 401 = wrong credentials — no point retrying
            if any(x in err for x in ("401", "AuthenticationRequired",
                                      "Invalid identifier", "Invalid password")):
                logging.error(
                    f"❌ Bluesky login failed: invalid handle or app password.\n"
                    f" Handle : {handle}\n"
                    f" PDS : {base_url}\n"
                    f" Fix : regenerate app password at {base_url}/settings\n"
                    f" Detail : {err}"
                )
                return False
            if attempt == BSKY_LOGIN_MAX_RETRIES:
                logging.error(
                    f"❌ All {BSKY_LOGIN_MAX_RETRIES} login attempts failed."
                )
                return False
            # Exponential backoff with jitter, capped at the configured maximum.
            delay = min(
                BSKY_LOGIN_BASE_DELAY * (2 ** (attempt - 1))
                + random.uniform(0, BSKY_LOGIN_JITTER_MAX),
                BSKY_LOGIN_MAX_DELAY,
            )
            logging.warning(
                f"⚠️ Bluesky login attempt {attempt} failed: {e}. "
                f"Retrying in {delay:.1f}s..."
            )
            time.sleep(delay)
    return False
def bsky_get_recent_post_urls(client: Client, handle: str,
                              limit: int = 50) -> set:
    """
    Return a set of URLs recently posted to Bluesky (to avoid duplicates).

    Collects the external-embed URI of each post in the author's feed, plus
    every http(s) URL found in the post text. Any error while fetching or
    walking the feed is logged and the URLs gathered so far are returned.
    """
    collected: set = set()
    try:
        timeline = client.get_author_feed(actor=handle, limit=limit)
        for entry in timeline.feed:
            post = entry.post
            if not hasattr(post, "record"):
                continue
            record = post.record
            if hasattr(record, "embed"):
                embed = record.embed
                if hasattr(embed, "external") and hasattr(embed.external, "uri"):
                    collected.add(embed.external.uri)
            if hasattr(record, "text"):
                collected.update(re.findall(r"https?://\S+", record.text))
    except Exception as exc:
        logging.warning(f"⚠️ Could not fetch recent Bluesky posts: {exc}")
    return collected
def bsky_upload_blob_with_retry(client: Client, data: bytes,
                                mime_type: str) -> object:
    """
    Upload a blob to Bluesky with retry + exponential backoff.

    Returns the ``blob`` attribute of the upload response. Re-raises the last
    exception once BSKY_UPLOAD_MAX_RETRIES attempts have failed. ``mime_type``
    is accepted for interface compatibility but is not used by the upload.
    """
    final_attempt = BSKY_UPLOAD_MAX_RETRIES
    for attempt in range(1, final_attempt + 1):
        try:
            upload = client.upload_blob(data)
            logging.info(
                f"✅ Blob uploaded ({len(data) / 1024 / 1024:.1f} MB) "
                f"on attempt {attempt}."
            )
            return upload.blob
        except Exception as exc:
            message = str(exc)
            rate_limited = "429" in message or "RateLimitExceeded" in message
            if attempt == final_attempt:
                logging.error(
                    f"❌ Blob upload failed after {BSKY_UPLOAD_MAX_RETRIES} attempts: {exc}"
                )
                raise
            backoff = BSKY_UPLOAD_BASE_DELAY * (2 ** (attempt - 1))
            jitter = random.uniform(0, BSKY_UPLOAD_JITTER_MAX)
            wait_s = min(backoff + jitter, BSKY_UPLOAD_MAX_DELAY)
            if rate_limited:
                # Rate limits get at least a full minute of cool-down.
                wait_s = max(wait_s, 60.0)
            logging.warning(
                f"⚠️ Blob upload attempt {attempt} failed: {exc}. "
                f"Retrying in {wait_s:.1f}s..."
            )
            time.sleep(wait_s)
def bsky_create_post_with_retry(client: Client, text: str,
                                embed=None, langs=None) -> bool:
    """
    Create a Bluesky post with retry + exponential backoff.

    ``embed`` and ``langs`` are only forwarded to ``send_post`` when truthy.
    Returns True once the post is created, False after
    BSKY_UPLOAD_MAX_RETRIES failed attempts.
    """
    final_attempt = BSKY_UPLOAD_MAX_RETRIES
    for attempt in range(1, final_attempt + 1):
        try:
            post_args = {"text": text}
            if embed:
                post_args["embed"] = embed
            if langs:
                post_args["langs"] = langs
            client.send_post(**post_args)
            logging.info(f"✅ Post created on attempt {attempt}.")
            return True
        except Exception as exc:
            message = str(exc)
            rate_limited = "429" in message or "RateLimitExceeded" in message
            if attempt == final_attempt:
                logging.error(
                    f"❌ Post creation failed after {BSKY_UPLOAD_MAX_RETRIES} attempts: {exc}"
                )
                return False
            backoff = BSKY_UPLOAD_BASE_DELAY * (2 ** (attempt - 1))
            jitter = random.uniform(0, BSKY_UPLOAD_JITTER_MAX)
            wait_s = min(backoff + jitter, BSKY_UPLOAD_MAX_DELAY)
            if rate_limited:
                # Rate limits get at least a full minute of cool-down.
                wait_s = max(wait_s, 60.0)
            logging.warning(
                f"⚠️ Post creation attempt {attempt} failed: {exc}. "
                f"Retrying in {wait_s:.1f}s..."
            )
            time.sleep(wait_s)
    return False
# ─────────────────────────────────────────────────────────────────────────────
# Video processing helpers
# ─────────────────────────────────────────────────────────────────────────────
def get_video_duration(path: str) -> float:
    """
    Return video duration in seconds using ffprobe.

    Returns 0.0 when ffprobe is unavailable, times out (30 s), or prints
    something that does not parse as a float.
    """
    probe_cmd = [
        "ffprobe", "-v", "error",
        "-show_entries", "format=duration",
        "-of", "default=noprint_wrappers=1:nokey=1",
        path,
    ]
    try:
        probe = subprocess.run(
            probe_cmd, capture_output=True, text=True, timeout=30,
        )
        return float(probe.stdout.strip())
    except Exception as exc:
        logging.warning(f"⚠️ ffprobe failed: {exc}")
    return 0.0
def compress_video(input_path: str, output_path: str,
                   max_duration: int = VIDEO_MAX_DURATION_S,
                   max_size_bytes: int = VIDEO_MAX_SIZE_BYTES) -> bool:
    """
    Trim to max_duration and compress to fit max_size_bytes.

    The video bitrate is derived from the size budget (with 10% headroom and
    128 kbit/s reserved for audio, never below 200 kbit/s). When the source
    duration cannot be determined, max_duration is assumed for the bitrate
    calculation instead of failing. The picture is scaled down to fit within
    1280x720 and then rounded down to even dimensions, which libx264 with
    yuv420p requires.

    Returns True on success, False when ffmpeg fails or any error occurs.
    """
    try:
        duration = get_video_duration(input_path)
        if duration > 0:
            trim_to = min(duration, max_duration)
        else:
            # ffprobe gave nothing usable; budget for the worst case rather
            # than dividing by zero below.
            logging.warning(
                f"⚠️ Unknown video duration; assuming {max_duration}s for bitrate."
            )
            trim_to = float(max_duration)
        if trim_to <= 0:
            logging.error(f"❌ compress_video error: invalid duration {trim_to}")
            return False
        # Target bitrate calculation (leave 10% headroom)
        target_bits = max_size_bytes * 8 * 0.90
        target_kbps = int(target_bits / trim_to / 1000)
        video_kbps = max(200, target_kbps - 128)  # reserve 128k for audio
        logging.info(
            f"🎬 Compressing: duration={duration:.1f}s → trim={trim_to:.1f}s, "
            f"video_bitrate={video_kbps}k"
        )
        # The first scale fits the frame into 1280x720 keeping aspect ratio,
        # which can yield odd sizes (e.g. 1080x1920 → 405x720); the second
        # scale truncates both sides to even numbers.
        scale_filter = (
            "scale='min(1280,iw)':'min(720,ih)':force_original_aspect_ratio=decrease,"
            "scale=trunc(iw/2)*2:trunc(ih/2)*2"
        )
        cmd = [
            "ffmpeg", "-y",
            "-i", input_path,
            "-t", str(trim_to),
            "-vf", scale_filter,
            "-c:v", "libx264",
            "-b:v", f"{video_kbps}k",
            "-maxrate", f"{video_kbps * 2}k",
            "-bufsize", f"{video_kbps * 4}k",
            "-c:a", "aac",
            "-b:a", "128k",
            "-movflags", "+faststart",
            "-pix_fmt", "yuv420p",
            output_path,
        ]
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=300)
        if result.returncode != 0:
            logging.error(f"❌ ffmpeg failed:\n{result.stderr}")
            return False
        final_size = os.path.getsize(output_path)
        logging.info(
            f"✅ Compressed video: {final_size / 1024 / 1024:.1f} MB → {output_path}"
        )
        return True
    except Exception as e:
        logging.error(f"❌ compress_video error: {e}")
        return False
def _download_direct(url: str, output_path: str) -> bool:
    """Stream url to output_path over HTTP; True only for a plausible video file."""
    try:
        headers = {
            "User-Agent": (
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                "AppleWebKit/537.36 (KHTML, like Gecko) "
                "Chrome/124.0.0.0 Safari/537.36"
            ),
            "Referer": "https://www.tiktok.com/",
        }
        with httpx.stream("GET", url, headers=headers,
                          follow_redirects=True, timeout=60) as r:
            r.raise_for_status()
            content_type = r.headers.get("content-type", "")
            media_type = content_type.split(";")[0].strip().lower()
            # A TikTok page URL answers with HTML that easily exceeds the
            # size check below, so reject clearly non-video responses early.
            if media_type.startswith("text/") or any(
                marker in media_type
                for marker in ("html", "json", "xml", "mpegurl")
            ):
                logging.warning(
                    f"⚠️ Direct download returned non-video content "
                    f"({media_type}), trying yt-dlp..."
                )
                return False
            with open(output_path, "wb") as f:
                for chunk in r.iter_bytes(chunk_size=1024 * 64):
                    f.write(chunk)
        size = os.path.getsize(output_path)
        if size > 10_000:
            logging.info(
                f"✅ Direct download OK: {size / 1024 / 1024:.1f} MB"
            )
            return True
        logging.warning(
            f"⚠️ Direct download too small ({size} bytes), trying yt-dlp..."
        )
    except Exception as e:
        logging.warning(f"⚠️ Direct download failed: {e}. Trying yt-dlp...")
    return False


def download_video(url: str, output_path: str,
                   cookies: list = None) -> bool:
    """
    Download a video from a URL (MP4 or M3U8) using httpx or yt-dlp.

    A direct HTTP download is tried first unless the URL path ends in
    ".m3u8". The direct result is only accepted when the response does not
    look like HTML/JSON/text and is larger than 10 kB. Otherwise any partial
    file is removed and the download falls back to yt-dlp (with cookies).

    Returns True when a video file was written to output_path.
    """
    from urllib.parse import urlparse

    # Inspect only the path so query strings do not hide an HLS playlist.
    is_hls = urlparse(url).path.lower().endswith(".m3u8")
    if not is_hls:
        if _download_direct(url, output_path):
            return True
        # Remove leftovers: an existing output file may make yt-dlp believe
        # the video was already downloaded.
        if os.path.exists(output_path):
            try:
                os.unlink(output_path)
            except OSError as e:
                logging.warning(f"⚠️ Could not remove partial download: {e}")
    # ── Fall back to yt-dlp ────────────────────────────────────────────
    return download_video_ytdlp(url, output_path, cookies=cookies)
def download_video_ytdlp(url: str, output_path: str,
                         cookies: list = None) -> bool:
    """
    Download a video using yt-dlp, optionally injecting cookies.

    Cookies, when given, are written to a temporary Netscape-format file that
    is removed again before returning. Returns True when output_path exists
    afterwards and is larger than 10 kB; every failure is logged and reported
    as False.
    """
    cookie_file = None
    try:
        import yt_dlp

        options = {
            "outtmpl": output_path,
            "format": "bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best",
            "quiet": True,
            "no_warnings": False,
            "merge_output_format": "mp4",
        }
        # Write cookies to a temp Netscape file if provided
        if cookies:
            cookie_file = _write_netscape_cookies(cookies)
            if cookie_file:
                options["cookiefile"] = cookie_file

        with yt_dlp.YoutubeDL(options) as downloader:
            downloader.download([url])

        downloaded_bytes = (
            os.path.getsize(output_path) if os.path.exists(output_path) else 0
        )
        if downloaded_bytes > 10_000:
            logging.info(
                f"✅ yt-dlp download OK: "
                f"{downloaded_bytes / 1024 / 1024:.1f} MB"
            )
            return True
        logging.error("❌ yt-dlp produced no output file or file too small.")
        return False
    except Exception as exc:
        logging.error(f"❌ yt-dlp download failed: {exc}")
        return False
    finally:
        if cookie_file and os.path.exists(cookie_file):
            os.unlink(cookie_file)
def _write_netscape_cookies(cookies: list) -> str | None:
    """
    Write cookies list to a Netscape-format temp file for yt-dlp.

    One tab-separated line is written per cookie dict: domain, subdomain
    flag, path, secure flag, expiry, name, value. The expiry comes from
    "expirationDate" or "expires"; missing, unparsable, or negative values
    are written as 0 so one bad entry cannot abort the file and session
    cookies are not emitted with an already-expired timestamp. Entries that
    are not dicts are skipped.

    Returns:
        Path of the temp file (the caller must delete it), or None on
        failure, in which case any partially written file is removed.
    """
    path = None
    try:
        fd, path = tempfile.mkstemp(suffix=".txt", prefix="tiktok_cookies_")
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            f.write("# Netscape HTTP Cookie File\n")
            for c in cookies:
                if not isinstance(c, dict):
                    continue
                domain = c.get("domain", ".tiktok.com")
                flag = "TRUE" if domain.startswith(".") else "FALSE"
                path_val = c.get("path", "/")
                secure = "TRUE" if c.get("secure") else "FALSE"
                raw_exp = c.get("expirationDate", 0) or c.get("expires", 0) or 0
                try:
                    exp = max(int(float(raw_exp)), 0)
                except (TypeError, ValueError, OverflowError):
                    exp = 0
                name = c.get("name", "")
                value = c.get("value", "")
                f.write(f"{domain}\t{flag}\t{path_val}\t{secure}\t{exp}\t{name}\t{value}\n")
        return path
    except Exception as e:
        logging.warning(f"⚠️ Could not write Netscape cookie file: {e}")
        # Do not leave a half-written cookie file (it may hold credentials).
        if path and os.path.exists(path):
            try:
                os.unlink(path)
            except OSError:
                pass
        return None
# ─────────────────────────────────────────────────────────────────────────────
# TikTok scraping via Playwright
# ─────────────────────────────────────────────────────────────────────────────
def _dismiss_overlays(page):
    """
    Dismiss cookie banners and RGPD modals.

    Tries every cookie-modal selector, then every banner selector; the first
    match of each selector is clicked when visible. Any failure for one
    selector is ignored so the remaining selectors are still tried.
    """
    candidate_selectors = [*TIKTOK_COOKIE_MODAL_SELS, *TIKTOK_BANNER_SELS]
    for selector in candidate_selectors:
        try:
            target = page.locator(selector).first
            if not target.is_visible(timeout=1500):
                continue
            target.click(timeout=2000)
            logging.info(f"🚫 Dismissed overlay: {selector}")
            time.sleep(0.5)
        except Exception:
            # Best effort: a missing or unclickable overlay is not an error.
            continue
def _take_debug_screenshot(page, label: str):
    """
    Save a debug screenshot to workspace.

    The file is named ``screenshot_<label>_<unix time>.png`` (relative path).
    Failures are ignored: debugging aids must never break the scrape.
    """
    try:
        stamp = int(time.time())
        target = f"screenshot_{label}_{stamp}.png"
        page.screenshot(path=target)
        logging.info(f"📸 Screenshot saved: {target}")
    except Exception:
        return
def _load_profile_grid(page, profile_url: str) -> bool:
    """Open the profile page, retrying up to PLAYWRIGHT_MAX_RELOADS times; True once the video grid is present."""
    for attempt in range(1, PLAYWRIGHT_MAX_RELOADS + 1):
        has_retry_left = attempt < PLAYWRIGHT_MAX_RELOADS
        logging.info(
            f"🌐 Loading profile (attempt {attempt}/{PLAYWRIGHT_MAX_RELOADS})..."
        )
        try:
            page.goto(
                profile_url,
                wait_until="domcontentloaded",
                timeout=PLAYWRIGHT_TIMEOUT_MS,
            )
        except Exception as e:
            logging.warning(f"⚠️ page.goto failed on attempt {attempt}: {e}")
            _take_debug_screenshot(page, f"goto_fail_{attempt}")
            if has_retry_left:
                time.sleep(3.0)
            continue

        time.sleep(random.uniform(2.5, 4.0))
        _dismiss_overlays(page)
        time.sleep(1.5)

        # Check for grid error state (best effort — never fatal)
        try:
            if page.locator(TIKTOK_GRID_ERROR_SEL).is_visible(timeout=2000):
                logging.warning("⚠️ Grid error state detected. Clicking Refresh...")
                page.locator(TIKTOK_REFRESH_BTN_SEL).first.click(timeout=3000)
                time.sleep(3.0)
        except Exception as e:
            logging.debug(f"Grid error check/refresh skipped: {e}")

        # Wait for video grid
        try:
            page.wait_for_selector(
                TIKTOK_VIDEO_GRID_SEL,
                timeout=PLAYWRIGHT_TIMEOUT_MS,
            )
            logging.info("✅ Video grid found.")
            return True
        except Exception:
            logging.warning(
                f"⚠️ Video grid not found on attempt {attempt}."
            )
            _take_debug_screenshot(page, f"no_grid_{attempt}")
            if has_retry_left:
                time.sleep(3.0)
    return False


def _extract_grid_videos(page) -> list:
    """Turn the first SCRAPE_VIDEO_LIMIT grid items into video dicts, skipping duplicate IDs."""
    videos = []
    seen_ids = set()
    items = page.locator(TIKTOK_VIDEO_ITEM_SEL).all()
    logging.info(f"📋 Found {len(items)} video items in grid.")
    for item in items[:SCRAPE_VIDEO_LIMIT]:
        try:
            # Get the link
            href = item.locator("a").first.get_attribute("href") or ""
            if not href or "/video/" not in href:
                continue
            # Normalise URL
            if href.startswith("/"):
                href = "https://www.tiktok.com" + href
            # Extract video ID
            vid_match = re.search(r"/video/(\d+)", href)
            if not vid_match:
                continue
            video_id = vid_match.group(1)
            if video_id in seen_ids:
                # The same video appearing twice must not be processed twice.
                continue
            seen_ids.add(video_id)
            # Get description (best-effort)
            desc = ""
            try:
                desc = item.get_attribute("aria-label") or ""
                if not desc:
                    desc_el = item.locator('[class*="desc"], [class*="title"]').first
                    desc = desc_el.inner_text(timeout=1000).strip()
            except Exception:
                pass
            videos.append({
                "id": video_id,
                "url": href,
                "desc": desc,
                # Scrape time, not the upload time: this function does not
                # read a publication date from the grid.
                "timestamp": arrow.utcnow().isoformat(),
                "video_url": href,  # resolved later during download
            })
        except Exception as e:
            logging.warning(f"⚠️ Error parsing video item: {e}")
            continue
    return videos


def scrape_tiktoks_via_playwright(handle: str) -> list:
    """
    Scrape recent videos from a public TikTok profile.

    Launches headless Chromium with saved cookies and stealth patches, loads
    the profile (with retries), scrolls to trigger lazy loading, and reads
    the video grid.

    Returns a list of dicts: {id, url, desc, timestamp, video_url}. The list
    is empty when the grid never loads. "timestamp" is the scrape time.

    NOTE(review): VIDEO_MAX_AGE_DAYS is not enforced here because no upload
    date is extracted; every scraped item is returned regardless of age.
    """
    profile_url = f"https://www.tiktok.com/@{handle.lstrip('@')}"
    cookies = load_cookies_from_file(TIKTOK_COOKIES_PATH)
    videos = []
    logging.info(f"🕷️ Scraping TikTok profile: {profile_url}")
    with sync_playwright() as p:
        browser = p.chromium.launch(
            headless=True,
            slow_mo=PLAYWRIGHT_SLOW_MO,
            args=[
                "--no-sandbox",
                "--disable-setuid-sandbox",
                "--disable-blink-features=AutomationControlled",
                "--disable-dev-shm-usage",
                "--disable-gpu",
            ],
        )
        try:
            context = browser.new_context(
                user_agent=(
                    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                    "AppleWebKit/537.36 (KHTML, like Gecko) "
                    "Chrome/124.0.0.0 Safari/537.36"
                ),
                viewport={"width": 1280, "height": 900},
                locale="es-ES",
                timezone_id="Europe/Madrid",
            )
            # Inject saved cookies
            if cookies:
                inject_cookies_into_context(context, cookies)
            page = context.new_page()
            # Stealth mode — compatible with both v1.x and v2.x
            if _STEALTH_V2:
                Stealth().apply_stealth_sync(page)
            else:
                stealth_sync(page)
            # Mask automation signals
            page.add_init_script("""
Object.defineProperty(navigator, 'webdriver', {get: () => undefined});
window.chrome = { runtime: {} };
Object.defineProperty(navigator, 'plugins', {get: () => [1, 2, 3]});
Object.defineProperty(navigator, 'languages', {get: () => ['es-ES', 'es', 'en']});
""")
            # ── Multi-attempt page load ────────────────────────────────
            if not _load_profile_grid(page, profile_url):
                logging.error("❌ Video grid never loaded after all attempts.")
                _take_debug_screenshot(page, "final_fail")
                return []
            # ── Scroll to load more videos ─────────────────────────────
            logging.info("📜 Scrolling to load videos...")
            for _ in range(5):
                page.evaluate("window.scrollBy(0, window.innerHeight * 2)")
                time.sleep(random.uniform(1.0, 2.0))
            # ── Extract video items ────────────────────────────────────
            videos = _extract_grid_videos(page)
        finally:
            # Always release the browser, including on unexpected errors.
            browser.close()
    logging.info(f"✅ Scraped {len(videos)} videos from @{handle}.")
    return videos
# ─────────────────────────────────────────────────────────────────────────────
# Core: process a single TikTok video → post to Bluesky
# ─────────────────────────────────────────────────────────────────────────────
def process_tiktok(video: dict, client: Client,
                   langs: list, state: dict) -> bool:
    """
    Download, compress, and post a single TikTok video to Bluesky.

    Steps: skip if already recorded in state → download → compress/trim →
    size guard → blob upload → build text and video embed → create post →
    record in state. All temporary files live in a directory that is removed
    when the function returns.

    Returns True if successfully posted; False when skipped or on any failure.
    """
    video_id = video["id"]
    video_url = video["url"]
    desc = video.get("desc", "")

    # ── Deduplication ──────────────────────────────────────────────────
    if is_already_posted(video_id, state):
        logging.info(f"⏭️ Skipping already-posted video: {video_id}")
        return False

    logging.info(f"🎬 Processing video {video_id}: {video_url}")
    cookies = load_cookies_from_file(TIKTOK_COOKIES_PATH)

    with tempfile.TemporaryDirectory() as workdir:
        source_file = os.path.join(workdir, f"{video_id}_raw.mp4")
        encoded_file = os.path.join(workdir, f"{video_id}.mp4")

        # ── Download ───────────────────────────────────────────────────
        logging.info(f"⬇️ Downloading: {video_url}")
        downloaded = download_video(video_url, source_file, cookies=cookies)
        if not downloaded:
            logging.error(f"❌ Download failed for {video_id}. Skipping.")
            return False

        # ── Compress / trim ────────────────────────────────────────────
        compressed = compress_video(source_file, encoded_file)
        if not compressed:
            logging.error(f"❌ Compression failed for {video_id}. Skipping.")
            return False

        # ── Size guard ─────────────────────────────────────────────────
        final_size = os.path.getsize(encoded_file)
        size_mb = final_size / 1024 / 1024
        if final_size > VIDEO_MAX_SIZE_BYTES:
            logging.error(
                f"❌ Compressed video still too large: "
                f"{size_mb:.1f} MB > "
                f"{VIDEO_MAX_SIZE_BYTES / 1024 / 1024:.0f} MB. Skipping."
            )
            return False

        # ── Upload to Bluesky ──────────────────────────────────────────
        logging.info(
            f"⬆️ Uploading to Bluesky "
            f"({size_mb:.1f} MB)..."
        )
        with open(encoded_file, "rb") as stream:
            payload = stream.read()
        try:
            blob = bsky_upload_blob_with_retry(client, payload, "video/mp4")
        except Exception as exc:
            logging.error(f"❌ Blob upload failed for {video_id}: {exc}")
            return False

        # ── Build post text ────────────────────────────────────────────
        caption = (desc or "").strip()
        if len(caption) > 280:
            caption = f"{caption[:277]}..."
        # Fall back to the video link when there is no description at all.
        post_text = caption or f"🎬 {video_url}"

        # ── Build video embed ──────────────────────────────────────────
        try:
            from atproto import models

            alt_text = desc[:1000] if desc else ""
            video_embed = models.AppBskyEmbedVideo.Main(
                video=blob,
                alt=alt_text,
            )
        except Exception as exc:
            logging.error(f"❌ Could not build video embed: {exc}")
            return False

        # ── Create post ────────────────────────────────────────────────
        created = bsky_create_post_with_retry(
            client,
            text=post_text,
            embed=video_embed,
            langs=langs,
        )
        if not created:
            logging.error(f"❌ Failed to post video {video_id} to Bluesky.")
            return False

        mark_as_posted(video_id, state, {
            "tiktok_url": video_url,
            "desc": desc[:200] if desc else "",
        })
        logging.info(f"✅ Posted video {video_id} to Bluesky.")
        return True
# ─────────────────────────────────────────────────────────────────────────────
# Entry point
# ─────────────────────────────────────────────────────────────────────────────
def main():
    """
    Command-line entry point.

    Parses the CLI flags, logs in to Bluesky (exiting with status 1 on
    failure), scrapes the TikTok profile, and processes every scraped video,
    pausing a few seconds after each successful post.
    """
    global TIKTOK_COOKIES_PATH  # must be first line in function
    load_dotenv()

    cli = argparse.ArgumentParser(
        description="TikTok → Bluesky cross-poster"
    )
    option_specs = [
        ("--tiktok-handle", {
            "required": True,
            "help": "TikTok handle to scrape (without @)",
        }),
        ("--bsky-handle", {
            "required": True,
            "help": "Bluesky handle (e.g. user.eurosky.social)",
        }),
        ("--bsky-app-password", {
            "required": True,
            "help": "Bluesky app password (not account password)",
        }),
        ("--bsky-base-url", {
            "default": DEFAULT_BSKY_BASE_URL,
            "help": f"Bluesky PDS base URL (default: {DEFAULT_BSKY_BASE_URL})",
        }),
        ("--bsky-langs", {
            "nargs": "+",
            "default": DEFAULT_BSKY_LANGS,
            "help": "Post language codes (default: es)",
        }),
        ("--cookies-path", {
            "default": TIKTOK_COOKIES_PATH,
            "help": "Path to TikTok cookies JSON file",
        }),
    ]
    for flag, spec in option_specs:
        cli.add_argument(flag, **spec)
    args = cli.parse_args()

    # Override global cookie path from CLI
    TIKTOK_COOKIES_PATH = args.cookies_path

    separator = "=" * 60
    cookie_status = "✅ found" if os.path.exists(TIKTOK_COOKIES_PATH) else "❌ NOT FOUND"
    logging.info(separator)
    logging.info("🤖 TikTok→Bluesky bot started")
    logging.info(f" TikTok handle : @{args.tiktok_handle}")
    logging.info(f" Bluesky handle: {args.bsky_handle}")
    logging.info(f" Bluesky PDS : {args.bsky_base_url}")
    logging.info(f" Languages : {args.bsky_langs}")
    logging.info(
        f" Cookie file : {TIKTOK_COOKIES_PATH} "
        f"({cookie_status})"
    )
    logging.info(separator)

    state = load_state()
    client = Client()

    # ── Bluesky login ──────────────────────────────────────────────────
    logged_in = bsky_login(
        client,
        args.bsky_handle,
        args.bsky_app_password,
        args.bsky_base_url,
    )
    if not logged_in:
        logging.error("❌ Cannot proceed without Bluesky login. Exiting.")
        sys.exit(1)

    # ── Scrape TikTok ──────────────────────────────────────────────────
    logging.info(f"🔄 Scraping @{args.tiktok_handle}...")
    tiktoks = scrape_tiktoks_via_playwright(args.tiktok_handle)
    if not tiktoks:
        logging.warning("⚠️ No TikTok videos found. Skipping sync.")
        logging.info("🤖 Bot finished.")
        return
    logging.info(f"📋 Found {len(tiktoks)} video(s). Processing new ones...")

    # ── Process each video ─────────────────────────────────────────────
    posted = 0
    for tiktok in tiktoks:
        try:
            was_posted = process_tiktok(tiktok, client, args.bsky_langs, state)
            if was_posted:
                posted += 1
                # Polite delay between posts
                time.sleep(random.uniform(3.0, 7.0))
        except Exception as exc:
            # One broken video must not abort the rest of the batch.
            logging.error(
                f"❌ Unexpected error processing video "
                f"{tiktok.get('id', '?')}: {exc}"
            )
            continue

    logging.info(separator)
    logging.info(f"✅ Sync complete. Posted {posted} new video(s).")
    logging.info("🤖 Bot finished.")
    logging.info(separator)


if __name__ == "__main__":
    main()