Files
post2bsky/tiktok2bsky.py
Guillem Hernandez Sola 7fef3f3ab8 Tiktok example
2026-05-19 09:20:32 +02:00

1131 lines
41 KiB
Python

import argparse
import arrow
import hashlib
import html
import io
import json
import logging
import re
import httpx
import time
import os
import subprocess
import uuid
import random
from urllib.parse import urlparse
from dotenv import load_dotenv
from atproto import Client, client_utils, models
from playwright.sync_api import sync_playwright
from moviepy import VideoFileClip
from bs4 import BeautifulSoup
from PIL import Image
import grapheme
# --- Configuration ---
# Paths of the log file and of the JSON file that remembers what was posted.
LOG_PATH = "tiktok2bsky.log"
STATE_PATH = "tiktok2bsky_state.json"
# Maximum number of profile videos collected by one scrape.
SCRAPE_VIDEO_LIMIT = 15 # TikTok loads fewer items per scroll than Twitter
# How many recent Bluesky posts are fetched (and kept in memory) for dedup.
DEDUPE_BSKY_LIMIT = 30
# NOTE(review): no age filter is applied by the code shown in this file.
VIDEO_MAX_AGE_DAYS = 3
# Default post length limit, counted in grapheme clusters (see truncate_text_safely).
BSKY_TEXT_MAX_LENGTH = 300
DEFAULT_BSKY_LANGS = ["ca"]
# Videos are trimmed to at most this many seconds before upload.
VIDEO_MAX_DURATION_SECONDS = 179
# Files larger than this are not uploaded (see get_blob_from_file).
MAX_VIDEO_UPLOAD_SIZE_MB = 45
# Image / thumbnail limits: not referenced by the code shown in this file.
BSKY_IMAGE_MAX_BYTES = 950 * 1024
BSKY_IMAGE_MAX_DIMENSION = 2000
BSKY_IMAGE_MIN_JPEG_QUALITY = 45
EXTERNAL_THUMB_MAX_BYTES = 950 * 1024
EXTERNAL_THUMB_MAX_DIMENSION = 1200
EXTERNAL_THUMB_MIN_JPEG_QUALITY = 40
# Blob upload retry policy: exponential backoff on rate limits (base/max delay
# in seconds) plus a separate, smaller budget for transient network errors.
BSKY_BLOB_UPLOAD_MAX_RETRIES = 5
BSKY_BLOB_UPLOAD_BASE_DELAY = 10
BSKY_BLOB_UPLOAD_MAX_DELAY = 300
BSKY_BLOB_TRANSIENT_ERROR_RETRIES = 3
BSKY_BLOB_TRANSIENT_ERROR_DELAY = 15
# send_post retry policy (delays in seconds).
BSKY_SEND_POST_MAX_RETRIES = 3
BSKY_SEND_POST_BASE_DELAY = 5
BSKY_SEND_POST_MAX_DELAY = 60
# Login retry policy (delays in seconds); jitter is added on top of each wait.
BSKY_LOGIN_MAX_RETRIES = 4
BSKY_LOGIN_BASE_DELAY = 10
BSKY_LOGIN_MAX_DELAY = 600
BSKY_LOGIN_JITTER_MAX = 1.5
# Download / metadata timeouts: not referenced by the code shown in this file.
MEDIA_DOWNLOAD_TIMEOUT = 30
LINK_METADATA_TIMEOUT = 10
# Timeouts (seconds) passed to subprocess.run for ffmpeg and ffprobe.
SUBPROCESS_TIMEOUT_SECONDS = 180
FFPROBE_TIMEOUT_SECONDS = 15
DEFAULT_BSKY_BASE_URL = "https://bsky.social"
# Not referenced by the code shown in this file.
SESSION_FILE_PERMISSIONS = 0o600
TIKTOK_SCROLL_PAUSE_S = 2.5 # pause between scrolls to let videos load
TIKTOK_MAX_SCROLLS = 5 # how many times to scroll down the profile
TIKTOK_PAGE_LOAD_WAIT_S = 3.0 # initial wait after profile page loads
# Maximum length of the generated video alt text.
DYNAMIC_ALT_MAX_LENGTH = 150
# A truncated post is cut at its last space only if that space lies beyond
# this index; otherwise the hard cut is kept.
TRUNCATE_MIN_PREFIX_CHARS = 20
# Not referenced by the code shown in this file.
ORPHAN_DIGIT_MAX_DIGITS = 3
# --- Logging Setup ---
# Configure the root logger once at import time: INFO and above goes both to
# the UTF-8 log file at LOG_PATH and to the console stream.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.FileHandler(LOG_PATH, encoding="utf-8"),
        logging.StreamHandler(),
    ],
    level=logging.INFO,
)
# --- Per-run caches ---
class _RunCache:
    """Mutable lookup tables shared by the helpers during one bot run."""

    def __init__(self):
        # All three mappings start empty; the locale defaults to US English.
        self.url_validity: dict = {}
        self.video_hash_owner: dict = {}
        self.video_url_owner: dict = {}
        self.locale: str = "en-US"

    def clear(self):
        """Empty the three mappings in place; ``locale`` is left untouched."""
        for table in (
            self.url_validity,
            self.video_hash_owner,
            self.video_url_owner,
        ):
            table.clear()


# Module-level singleton used by the rest of the file.
_cache = _RunCache()


def reset_caches():
    """Empty the module-level run cache."""
    _cache.clear()
# --- Custom Classes ---
class ScrapedMedia:
    """A single media attachment: its URL and a type label (default "video")."""

    def __init__(self, url, media_type="video"):
        self.media_url_https = url
        self.type = media_type
class ScrapedTikTok:
    """One scraped TikTok post (counterpart of ScrapedTweet in twitter2bsky.py).

    ``media`` holds a single video ``ScrapedMedia`` when ``video_url`` is
    truthy and is an empty list otherwise.
    """

    def __init__(self, created_on, text, video_url, post_url=None, thumbnail_url=None):
        # ISO8601 string (or anything arrow can parse).
        self.created_on = created_on
        # Caption / description of the post.
        self.text = text
        # e.g. https://www.tiktok.com/@user/video/123
        self.post_url = post_url
        self.thumbnail_url = thumbnail_url
        self.media = []
        if video_url:
            self.media.append(ScrapedMedia(video_url, "video"))
# --- Helpers (shared with twitter2bsky.py pattern) ---
def sha256_file(path, chunk_size=1024 * 1024):
    """Return the hex SHA-256 digest of the file at *path*.

    The file is read in blocks of *chunk_size* bytes so it is never loaded
    into memory as a whole.
    """
    digest = hashlib.sha256()
    with open(path, "rb") as stream:
        # iter() with a sentinel stops at the first empty read (end of file).
        for block in iter(lambda: stream.read(chunk_size), b""):
            digest.update(block)
    return digest.hexdigest()
def grapheme_len(text):
    """Return the length of *text* as reported by ``grapheme.length``."""
    return grapheme.length(text)
def remove_file_quietly(path):
    """Delete *path* if it exists, logging the outcome instead of raising.

    Falsy or non-existent paths are ignored.
    """
    if not path or not os.path.exists(path):
        return
    try:
        os.remove(path)
    except Exception as e:
        logging.warning(f"⚠️ Could not remove temp file {path}: {e}")
    else:
        logging.info(f"🧹 Removed temp file: {path}")
def take_error_screenshot(page, label):
    """Save a screenshot of *page* named after *label* and the current time.

    The file is written to ``screenshot_<label>_<YYYYmmdd_HHMMSS>.png``.
    Any failure is logged as a warning and never raised.
    """
    filename = "screenshot_{}_{}.png".format(label, time.strftime("%Y%m%d_%H%M%S"))
    try:
        page.screenshot(path=filename)
    except Exception as e:
        logging.warning(f"⚠️ Could not save screenshot: {e}")
    else:
        logging.info(f"📸 Screenshot saved: {filename}")
def clean_post_text(text):
    """Trim *text*, turn carriage returns into newlines, cap blank-line runs.

    ``None`` is treated as an empty string. Runs of three or more newlines
    are collapsed to exactly two.
    """
    cleaned = (text or "").strip()
    for pattern, replacement in ((r"\r", "\n"), (r"\n{3,}", "\n\n")):
        cleaned = re.sub(pattern, replacement, cleaned)
    return cleaned.strip()
def normalize_post_text(text):
    """Return a comparison form of *text*: cleaned, single-spaced, lowercase.

    Falsy input yields an empty string.
    """
    if not text:
        return ""
    single_spaced = re.sub(r"\s+", " ", clean_post_text(text))
    return single_spaced.strip().lower()
def truncate_text_safely(text, max_length=BSKY_TEXT_MAX_LENGTH):
    """Shorten *text* to at most *max_length* grapheme clusters.

    Text that already fits is returned unchanged. Otherwise the first
    *max_length* clusters are kept and, when the last space in that prefix
    lies beyond index TRUNCATE_MIN_PREFIX_CHARS, the prefix is cut back to
    that space so the result does not end in the middle of a word.
    """
    if grapheme_len(text) <= max_length:
        return text
    head = "".join(list(grapheme.graphemes(text))[:max_length])
    cut = head.rfind(" ")
    return head[:cut] if cut > TRUNCATE_MIN_PREFIX_CHARS else head
def extract_tiktok_video_id(post_url):
    """Return the digits following ``/video/`` in *post_url*, or ``None``."""
    found = re.search(r"/video/(\d+)", post_url) if post_url else None
    if found is None:
        return None
    return found.group(1)
def canonicalize_tiktok_url(url):
    """Reduce a TikTok video URL to ``https://www.tiktok.com/@user/video/id``.

    Falsy input yields ``None``. URLs that do not look like a TikTok video
    link are returned stripped of surrounding whitespace.
    """
    if not url:
        return None
    pattern = r"https?://(?:www\.)?tiktok\.com/@([^/]+)/video/(\d+)"
    found = re.search(pattern, url, re.IGNORECASE)
    if not found:
        return url.strip()
    user, video_id = found.group(1), found.group(2)
    return f"https://www.tiktok.com/@{user}/video/{video_id}"
def make_unique_video_temp_base(post_url=None):
    """Return a fresh temp-file base name (no extension) for one video.

    The name combines the TikTok video id (or "unknown"), the current time
    in milliseconds, and eight random hex characters.
    """
    parts = (
        extract_tiktok_video_id(post_url) or "unknown",
        int(time.time() * 1000),
        uuid.uuid4().hex[:8],
    )
    base = "temp_tiktok_{}_{}_{}".format(*parts)
    logging.info(f"🎞️ Using unique temp video base: {base}")
    return base
def build_media_fingerprint(tiktok):
    """Return a SHA-256 hex fingerprint of the media attached to *tiktok*.

    Every media item contributes ``video:<canonical post URL>``, falling back
    to the item's own ``media_url_https`` when the post URL is unavailable.
    Returns the literal "no-media" when there is nothing to fingerprint.
    """
    if not tiktok or not tiktok.media:
        return "no-media"
    canonical = canonicalize_tiktok_url(tiktok.post_url)
    labels = sorted(
        "video:{}".format(canonical or (getattr(item, "media_url_https", "") or ""))
        for item in tiktok.media
    )
    return hashlib.sha256("|".join(labels).encode("utf-8")).hexdigest()
def build_bsky_media_fingerprint(post_view):
    """Return a SHA-256 hex fingerprint of the video embedded in *post_view*.

    The fingerprint is derived from the video's ``ref``, else its ``cid``,
    else its string form. Posts without an embed or without a video — and
    any unexpected failure — yield the literal "no-media".
    """
    try:
        embed = getattr(post_view, "embed", None)
        if not embed:
            return "no-media"
        video = getattr(embed, "video", None)
        if not video:
            return "no-media"
        identifier = (
            getattr(video, "ref", None)
            or getattr(video, "cid", None)
            or str(video)
        )
        return hashlib.sha256(f"video:{identifier}".encode("utf-8")).hexdigest()
    except Exception as e:
        logging.debug(f"Could not build Bluesky media fingerprint: {e}")
        return "no-media"
def build_text_media_key(normalized_text, media_fingerprint):
    """Return the SHA-256 hex digest of ``<text>||<media fingerprint>``."""
    combined = "||".join((f"{normalized_text}", f"{media_fingerprint}"))
    return hashlib.sha256(combined.encode("utf-8")).hexdigest()
# --- Bluesky login / retry helpers (identical pattern to twitter2bsky.py) ---
def is_rate_limited_error(e):
    """Return True when ``repr(e)`` (case-insensitive) mentions a rate limit."""
    lowered = repr(e).lower()
    markers = ("429", "ratelimitexceeded", "too many requests")
    return any(marker in lowered for marker in markers)
def is_auth_error(e):
    """Return True when ``repr(e)`` (case-insensitive) looks like an auth failure."""
    lowered = repr(e).lower()
    markers = ("401", "403", "invalid identifier")
    return any(marker in lowered for marker in markers)
def is_transient_error(e):
    """Return True when ``repr(e)`` contains a known transient-failure marker.

    Matching is case-sensitive.
    """
    described = repr(e)
    for marker in (
        "InvokeTimeoutError", "ReadTimeout", "WriteTimeout",
        "RemoteProtocolError", "ConnectError", "503", "502", "504",
    ):
        if marker in described:
            return True
    return False
def is_network_error(e):
    """Return True when ``repr(e)`` contains a known network-failure marker.

    Matching is case-sensitive.
    """
    described = repr(e)
    for marker in (
        "ConnectError", "RemoteProtocolError", "ReadTimeout",
        "WriteTimeout", "TimeoutException", "503", "502", "504",
    ):
        if marker in described:
            return True
    return False
def get_rate_limit_wait_seconds(e, default_delay):
    """Return how long to wait after a rate-limit error, in seconds.

    A ``Retry-After`` header is looked up first on the exception itself and
    then on an attached ``e.response`` object, if there is one. A value that
    parses as an integer is clamped to the range 1..BSKY_LOGIN_MAX_DELAY.
    When no usable header is found, *default_delay* is returned unchanged.
    """
    try:
        # NOTE(review): atproto request errors appear to carry the HTTP
        # headers on ``e.response`` rather than on the exception — confirm
        # against the installed SDK version.
        for carrier in (e, getattr(e, "response", None)):
            headers = getattr(carrier, "headers", None) or {}
            ra = headers.get("retry-after") or headers.get("Retry-After")
            if ra:
                return min(max(int(ra), 1), BSKY_LOGIN_MAX_DELAY)
    except Exception as parse_error:
        # Best effort only: a malformed header falls back to the default.
        logging.debug(f"Could not read Retry-After header: {parse_error}")
    return default_delay
def create_bsky_client(base_url, handle, password):
    """Create an atproto ``Client`` and log in, retrying failed attempts.

    Args:
        base_url: Service URL; falls back to DEFAULT_BSKY_BASE_URL when
            falsy. Surrounding whitespace and trailing slashes are removed.
        handle: Bluesky handle to log in with.
        password: Password (app password) for that handle.

    Returns:
        The logged-in client.

    Raises:
        Exception: Whatever ``client.login`` raised, re-raised immediately
            for authentication errors and after the last attempt otherwise.
        RuntimeError: If the loop ends without a login, which only happens
            when BSKY_LOGIN_MAX_RETRIES is below 1.
    """
    normalized = (base_url or DEFAULT_BSKY_BASE_URL).strip().rstrip("/")
    client = Client(base_url=normalized)
    for attempt in range(1, BSKY_LOGIN_MAX_RETRIES + 1):
        try:
            client.login(handle, password)
        except Exception as e:
            # Wrong credentials will not fix themselves, and the last attempt
            # has nothing left to wait for.
            if is_auth_error(e) or attempt >= BSKY_LOGIN_MAX_RETRIES:
                raise
            wait = min(BSKY_LOGIN_BASE_DELAY * attempt, BSKY_LOGIN_MAX_DELAY)
            if is_rate_limited_error(e):
                # Prefer the server's Retry-After hint over the linear backoff.
                wait = get_rate_limit_wait_seconds(e, wait)
            # Jitter keeps parallel bots from retrying in lockstep.
            wait += random.uniform(0, BSKY_LOGIN_JITTER_MAX)
            logging.warning(f"⏳ Bluesky login retry {attempt} in {wait:.1f}s: {e}")
            time.sleep(wait)
        else:
            logging.info("✅ Bluesky login successful.")
            return client
    raise RuntimeError("Bluesky login failed after all retries.")
# --- State management (identical pattern) ---
def default_state():
    """Return a fresh, empty state dictionary (schema version 1)."""
    return {
        "version": 1,
        "posted_videos": {},
        "posted_by_bsky_uri": {},
        "updated_at": None,
    }
def load_state(state_path=STATE_PATH):
    """Load the state JSON from *state_path*, filling in missing keys.

    A missing file yields a fresh default state. Any error while reading or
    completing the state is logged and also yields a fresh default state.
    """
    if not os.path.exists(state_path):
        return default_state()
    try:
        with open(state_path, "r", encoding="utf-8") as handle:
            loaded = json.load(handle)
        # Older or partial files may lack keys; add them without overwriting.
        for key, fallback in (
            ("version", 1),
            ("posted_videos", {}),
            ("posted_by_bsky_uri", {}),
            ("updated_at", None),
        ):
            loaded.setdefault(key, fallback)
        return loaded
    except Exception as e:
        logging.warning(f"⚠️ Could not load state: {e}. Reinitializing.")
        return default_state()
def save_state(state, state_path=STATE_PATH):
    """Stamp *state* with the current UTC time and write it to *state_path*.

    The JSON goes to ``<state_path>.tmp`` first and is then moved over the
    target with ``os.replace``. Failures are logged, never raised.
    """
    scratch_path = f"{state_path}.tmp"
    try:
        state["updated_at"] = arrow.utcnow().isoformat()
        with open(scratch_path, "w", encoding="utf-8") as handle:
            json.dump(state, handle, ensure_ascii=False, indent=2, sort_keys=True)
        os.replace(scratch_path, state_path)
    except Exception as e:
        logging.error(f"❌ Failed to save state: {e}")
    else:
        logging.info(f"💾 State saved to {state_path}")
def remember_posted_video(state, candidate, bsky_uri=None):
    """Record *candidate* in ``state["posted_videos"]``.

    The entry is keyed by the canonical post URL, or by
    ``textmedia:<text_media_key>`` when the candidate has none. When
    *bsky_uri* is given, the reverse mapping ``posted_by_bsky_uri`` is
    updated as well.
    """
    canonical = candidate.get("canonical_post_url")
    key = canonical or f"textmedia:{candidate['text_media_key']}"
    source = candidate["tiktok"]
    record = {
        "canonical_post_url": canonical,
        "normalized_text": candidate["normalized_text"],
        "text_media_key": candidate["text_media_key"],
        "media_fingerprint": candidate["media_fingerprint"],
        "bsky_uri": bsky_uri,
        "video_created_on": source.created_on,
        "post_url": source.post_url,
        "video_id": candidate.get("video_id"),
        "posted_at": arrow.utcnow().isoformat(),
    }
    state["posted_videos"][key] = record
    if bsky_uri:
        state["posted_by_bsky_uri"][bsky_uri] = key
def candidate_matches_state(candidate, state):
    """Check whether *candidate* was already recorded in *state*.

    Returns ``(True, reason)`` for the first rule that matches, tried in this
    order: canonical post URL, text+media key, non-empty normalized text.
    Returns ``(False, None)`` when nothing matches.
    """
    url = candidate["canonical_post_url"]
    key = candidate["text_media_key"]
    text = candidate["normalized_text"]
    posted = state.get("posted_videos", {})
    records = posted.values()

    if url and url in posted:
        return True, "state:post_url"
    if any(rec.get("text_media_key") == key for rec in records):
        return True, "state:text_media_fingerprint"
    # Empty captions are common, so an empty text never counts as a match.
    if text and any(rec.get("normalized_text") == text for rec in records):
        return True, "state:normalized_text"
    return False, None
def prune_state(state, max_entries=5000):
    """Keep only the *max_entries* most recently posted records in *state*.

    Records are ranked by their ``posted_at`` string, newest first. Entries of
    ``posted_by_bsky_uri`` that point at dropped records are removed too.
    *state* is modified in place and returned.
    """
    posted = state.get("posted_videos", {})
    if len(posted) > max_entries:
        newest_first = sorted(
            posted,
            key=lambda name: posted[name].get("posted_at", ""),
            reverse=True,
        )
        survivors = set(newest_first[:max_entries])
        state["posted_videos"] = {
            name: record for name, record in posted.items() if name in survivors
        }
        state["posted_by_bsky_uri"] = {
            uri: name
            for uri, name in state.get("posted_by_bsky_uri", {}).items()
            if name in survivors
        }
    return state
# --- Bluesky feed helpers ---
def get_recent_bsky_posts(client, handle, limit=30):
    """Fetch the author's recent original posts as dedup records.

    Feed items that carry a ``reason`` and records that are replies are
    skipped. Each remaining post becomes a dict with its URI, normalized
    text, media fingerprint and combined text+media key. Errors are logged;
    whatever was collected so far is returned.
    """
    collected = []
    try:
        feed = client.get_author_feed(handle, limit=limit).feed
        for entry in feed:
            try:
                if entry.reason is not None:
                    continue
                record = entry.post.record
                if getattr(record, "reply", None) is not None:
                    continue
                normalized = normalize_post_text(getattr(record, "text", "") or "")
                fingerprint = build_bsky_media_fingerprint(entry.post)
                collected.append({
                    "uri": getattr(entry.post, "uri", None),
                    "normalized_text": normalized,
                    "media_fingerprint": fingerprint,
                    "text_media_key": build_text_media_key(normalized, fingerprint),
                })
            except Exception as e:
                # One malformed item must not discard the rest of the feed.
                logging.debug(f"Skipping feed item: {e}")
    except Exception as e:
        logging.warning(f"⚠️ Could not fetch recent Bluesky posts: {e}")
    return collected
def candidate_matches_existing_bsky(candidate, recent_bsky_posts):
    """Check *candidate* against recently fetched Bluesky posts.

    Posts are examined in order; for each one the text+media key is compared
    first, then the (non-empty) normalized text. Returns ``(True, reason)``
    on the first hit and ``(False, None)`` otherwise.
    """
    key = candidate["text_media_key"]
    text = candidate["normalized_text"]
    for seen in recent_bsky_posts:
        if seen["text_media_key"] == key:
            return True, "bsky:text_media_fingerprint"
        if text and seen["normalized_text"] == text:
            return True, "bsky:normalized_text"
    return False, None
# --- Upload / blob helpers (same as twitter2bsky.py) ---
def upload_blob_with_retry(client, binary_data, media_label="media"):
    """Upload *binary_data* as a blob, retrying rate limits and transient errors.

    Rate-limited attempts back off exponentially (capped at
    BSKY_BLOB_UPLOAD_MAX_DELAY). Transient errors have their own budget of
    BSKY_BLOB_TRANSIENT_ERROR_RETRIES with a linearly growing delay. Any
    other error aborts at once.

    Returns:
        The uploaded blob, or ``None`` when the upload did not succeed.
    """
    last_error = None
    transient_used = 0
    for attempt in range(1, BSKY_BLOB_UPLOAD_MAX_RETRIES + 1):
        try:
            return client.upload_blob(binary_data).blob
        except Exception as e:
            last_error = e
            message = str(e)
            if "429" in message or "RateLimitExceeded" in message:
                if attempt >= BSKY_BLOB_UPLOAD_MAX_RETRIES:
                    # Out of attempts: fall through to the final warning.
                    break
                delay = min(
                    BSKY_BLOB_UPLOAD_BASE_DELAY * (2 ** (attempt - 1)),
                    BSKY_BLOB_UPLOAD_MAX_DELAY,
                )
                logging.warning(f"⏳ Blob upload rate-limited. Retry {attempt} after {delay}s.")
                time.sleep(delay)
            elif is_transient_error(e) and transient_used < BSKY_BLOB_TRANSIENT_ERROR_RETRIES:
                transient_used += 1
                delay = BSKY_BLOB_TRANSIENT_ERROR_DELAY * transient_used
                logging.warning(f"⏳ Transient blob upload error. Retry {transient_used} after {delay}s.")
                time.sleep(delay)
            else:
                logging.warning(f"Could not upload {media_label}: {repr(e)}")
                return None
    logging.warning(f"Could not upload {media_label}: {repr(last_error)}")
    return None
def send_post_with_retry(client, **kwargs):
    """Call ``client.send_post(**kwargs)``, retrying recoverable failures.

    Rate-limit errors wait with exponential backoff (capped at
    BSKY_SEND_POST_MAX_DELAY); transient errors wait
    ``BSKY_SEND_POST_BASE_DELAY * attempt``. Anything else — and any error on
    the final attempt — is re-raised.
    """
    last_error = None
    for attempt in range(1, BSKY_SEND_POST_MAX_RETRIES + 1):
        try:
            return client.send_post(**kwargs)
        except Exception as e:
            last_error = e
            if attempt >= BSKY_SEND_POST_MAX_RETRIES:
                raise
            message = str(e)
            if "429" in message or "RateLimitExceeded" in message:
                delay = min(
                    BSKY_SEND_POST_BASE_DELAY * (2 ** (attempt - 1)),
                    BSKY_SEND_POST_MAX_DELAY,
                )
            elif is_transient_error(e):
                delay = BSKY_SEND_POST_BASE_DELAY * attempt
            else:
                raise
            time.sleep(delay)
    raise last_error
def get_blob_from_file(file_path, client):
    """Read *file_path* and upload its bytes as a blob.

    Returns ``None`` (after logging a warning) when the file is missing, is
    larger than MAX_VIDEO_UPLOAD_SIZE_MB, or anything else goes wrong;
    otherwise returns the result of ``upload_blob_with_retry``.
    """
    try:
        if not os.path.exists(file_path):
            logging.warning(f"File not found: {file_path}")
            return None
        size_mb = os.path.getsize(file_path) / (1024 * 1024)
        if size_mb > MAX_VIDEO_UPLOAD_SIZE_MB:
            logging.warning(f"File too large: {size_mb:.2f} MB > {MAX_VIDEO_UPLOAD_SIZE_MB} MB")
            return None
        with open(file_path, "rb") as handle:
            payload = handle.read()
        return upload_blob_with_retry(client, payload, media_label=file_path)
    except Exception as e:
        logging.warning(f"Could not upload file {file_path}: {repr(e)}")
        return None
def build_video_embed(video_blob, alt_text):
    """Wrap *video_blob* in an ``AppBskyEmbedVideo.Main`` embed.

    Returns ``None`` (after logging an error) when an ``AttributeError`` is
    raised while building the embed, e.g. because the installed atproto
    package has no video embed model.
    """
    try:
        embed_cls = models.AppBskyEmbedVideo.Main
        return embed_cls(video=video_blob, alt=alt_text)
    except AttributeError:
        logging.error("❌ atproto version does not support AppBskyEmbedVideo. Upgrade atproto.")
        return None
def build_dynamic_alt(text):
    """Derive alt text for a video from the post caption.

    The caption is cleaned, flattened to one line and stripped of URLs
    (``http(s)://…`` and ``www.…``). An empty result becomes "TikTok video".
    The text is limited to DYNAMIC_ALT_MAX_LENGTH grapheme clusters so that
    an emoji or other multi-code-point character is never cut in half.
    """
    alt = clean_post_text(text or "").replace("\n", " ").strip()
    alt = re.sub(r"(?:https?://|www\.)\S+", "", alt).strip()
    if not alt:
        alt = "TikTok video"
    # Slice by grapheme cluster, not by code point, to avoid emitting a
    # broken character at the cut.
    clusters = list(grapheme.graphemes(alt))
    if len(clusters) <= DYNAMIC_ALT_MAX_LENGTH:
        return alt
    return "".join(clusters[:DYNAMIC_ALT_MAX_LENGTH])
def make_rich(content):
    """Build a Bluesky ``TextBuilder`` with hashtag and URL facets.

    The cleaned text is processed line by line and split on single spaces.
    Words starting with ``#`` become tag facets (trailing punctuation is
    excluded from the tag value, not from the displayed text), words starting
    with ``http://`` or ``https://`` become link facets, everything else is
    plain text. Spaces and newlines between tokens are re-emitted as they
    appeared.
    """
    builder = client_utils.TextBuilder()
    lines = clean_post_text(content).splitlines()
    final_line = len(lines) - 1
    for line_no, line in enumerate(lines):
        if line.strip():
            tokens = line.split(" ")
            final_token = len(tokens) - 1
            for pos, token in enumerate(tokens):
                # Empty tokens come from consecutive spaces; they add nothing
                # but the separating space below.
                if token:
                    if token.startswith("#") and len(token) > 1:
                        label = token[1:].rstrip(".,;:!?)'\"")
                        if label:
                            builder.tag(token, label)
                        else:
                            builder.text(token)
                    elif token.startswith(("http://", "https://")):
                        builder.link(token, token)
                    else:
                        builder.text(token)
                if pos < final_token:
                    builder.text(" ")
        if line_no < final_line:
            builder.text("\n")
    return builder
# --- TikTok Scraping ---
def scrape_tiktoks_via_playwright(target_handle: str, locale: str = "en-US") -> list:
    """
    Scrape recent TikTok videos from a profile page using headless Chromium.

    The function opens ``https://www.tiktok.com/@<handle>`` (a leading "@" in
    *target_handle* is dropped), tries to dismiss a consent banner, waits for
    the video grid, scrolls TIKTOK_MAX_SCROLLS times and then reads every
    ``a[href*="/video/"]`` link. It performs no TikTok login.

    Args:
        target_handle: TikTok account handle, with or without "@".
        locale: Browser locale, also sent as the first Accept-Language entry.

    Returns:
        A list of at most SCRAPE_VIDEO_LIMIT ScrapedTikTok objects, one per
        distinct canonical video URL. The list is empty when the grid never
        appears and may be partial when scraping fails midway; errors are
        logged rather than raised.
    """
    tiktoks = []
    profile_url = f"https://www.tiktok.com/@{target_handle.lstrip('@')}"
    with sync_playwright() as p:
        browser = p.chromium.launch(
            headless=True,
            args=[
                "--disable-blink-features=AutomationControlled",
                "--no-sandbox",
                "--disable-setuid-sandbox",
            ],
        )
        context = browser.new_context(
            user_agent=(
                "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
                "AppleWebKit/537.36 (KHTML, like Gecko) "
                "Chrome/145.0.7632.6 Safari/537.36"
            ),
            viewport={"width": 1920, "height": 1080},
            locale=locale,
            # TikTok checks these headers — set them explicitly
            extra_http_headers={
                "Accept-Language": f"{locale},en;q=0.9",
                "Sec-Fetch-Dest": "document",
                "Sec-Fetch-Mode": "navigate",
                "Sec-Fetch-Site": "none",
            },
        )
        page = context.new_page()
        try:
            logging.info(f"🌐 Navigating to TikTok profile: {profile_url}")
            page.goto(profile_url, wait_until="domcontentloaded", timeout=40000)
            time.sleep(TIKTOK_PAGE_LOAD_WAIT_S)
            # Dismiss cookie/consent banners if present
            # (only the first visible button is clicked; lookup errors are ignored)
            for selector in [
                'button:has-text("Accept all")',
                'button:has-text("Decline optional")',
                '[data-e2e="cookie-banner-accept"]',
            ]:
                try:
                    btn = page.locator(selector).first
                    if btn.is_visible(timeout=2000):
                        btn.click()
                        time.sleep(1)
                        break
                except Exception:
                    pass
            # Wait for video grid to appear
            try:
                page.wait_for_selector(
                    '[data-e2e="user-post-item"], '
                    '[class*="DivItemContainerV2"], '
                    'a[href*="/video/"]',
                    timeout=20000,
                )
            except Exception:
                # Without a grid there is nothing to parse: keep a screenshot
                # for debugging and give up with an empty result.
                take_error_screenshot(page, "tiktok_profile_load_failed")
                logging.error("❌ TikTok video grid did not appear.")
                browser.close()
                return []
            # Scroll to load more videos
            for scroll_i in range(TIKTOK_MAX_SCROLLS):
                page.evaluate("window.scrollBy(0, window.innerHeight * 2)")
                time.sleep(TIKTOK_SCROLL_PAUSE_S)
                logging.info(f"📜 Scroll {scroll_i + 1}/{TIKTOK_MAX_SCROLLS}")
            # Collect video links
            video_links = page.locator('a[href*="/video/"]').all()
            logging.info(f"📊 Found {len(video_links)} video links. Parsing up to {SCRAPE_VIDEO_LIMIT}...")
            # Canonical URLs already turned into results, to drop duplicate links.
            seen_urls = set()
            for link in video_links:
                if len(tiktoks) >= SCRAPE_VIDEO_LIMIT:
                    break
                try:
                    href = link.get_attribute("href")
                    if not href:
                        continue
                    # Site-relative hrefs are made absolute.
                    post_url = (
                        f"https://www.tiktok.com{href}"
                        if href.startswith("/")
                        else href
                    )
                    canonical = canonicalize_tiktok_url(post_url)
                    if not canonical or canonical in seen_urls:
                        continue
                    if "/video/" not in canonical:
                        continue
                    seen_urls.add(canonical)
                    # Try to get caption from the card itself (avoids opening each video)
                    caption = ""
                    try:
                        # The caption is often in a sibling/child element
                        card = link.locator("..").first
                        caption_el = card.locator(
                            '[data-e2e="video-desc"], '
                            '[class*="SpanUniqueId"], '
                            'p[class*="caption"]'
                        ).first
                        if caption_el.is_visible(timeout=1000):
                            caption = caption_el.inner_text()
                    except Exception:
                        # Best effort: a missing caption leaves the text empty.
                        pass
                    # Thumbnail
                    thumbnail_url = None
                    try:
                        img = link.locator("img").first
                        if img.is_visible(timeout=1000):
                            thumbnail_url = img.get_attribute("src")
                    except Exception:
                        # Best effort: a missing thumbnail stays None.
                        pass
                    # TikTok doesn't expose post timestamps in the grid —
                    # use now as a conservative estimate; dedup prevents re-posting
                    created_on = arrow.utcnow().isoformat()
                    tiktoks.append(
                        ScrapedTikTok(
                            created_on=created_on,
                            text=caption,
                            video_url=canonical, # placeholder; real URL resolved later
                            post_url=canonical,
                            thumbnail_url=thumbnail_url,
                        )
                    )
                    logging.info(f"🎵 Scraped TikTok: {canonical}")
                except Exception as e:
                    logging.warning(f"⚠️ Failed to parse video card: {e}")
                    continue
        except Exception as e:
            # Anything collected before the failure is still returned.
            take_error_screenshot(page, "tiktok_scrape_failed")
            logging.error(f"❌ Failed to scrape TikTok profile: {e}")
        browser.close()
    logging.info(f"✅ Scraped {len(tiktoks)} TikTok videos.")
    return tiktoks
# --- Video extraction ---
def extract_tiktok_video_url_isolated(browser, post_url: str, video_id: str | None = None) -> str | None:
    """
    Open one TikTok video page in its own browser context and capture the
    media stream URL from the network responses it triggers.

    Mirrors extract_video_url_from_tweet_page_isolated() in twitter2bsky.py.

    Args:
        browser: A launched Playwright browser; a new context is created on
            it and closed again before returning.
        post_url: URL of the TikTok video page.
        video_id: Only used as a prefix in log messages.

    Returns:
        The first MP4 URL seen, else the first HLS (.m3u8) URL seen, else
        ``None``. Errors are logged and also result in ``None``.
    """
    ctx = None
    page = None
    best_mp4_url = None
    best_m3u8_url = None
    # Response URLs already inspected, so each one is classified only once.
    seen_urls = set()

    def current_best():
        # A direct MP4 is preferred over an HLS playlist.
        return best_mp4_url or best_m3u8_url

    def handle_response(response):
        nonlocal best_mp4_url, best_m3u8_url
        try:
            url = response.url
            if not url or url in seen_urls:
                return
            seen_urls.add(url)
            content_type = (response.headers.get("content-type") or "").lower()
            url_l = url.lower()
            # Skip audio-only and segment files
            if ".m4s" in url_l or "/aud/" in url_l or "mp4a" in url_l:
                return
            if ".m3u8" in url_l or "mpegurl" in content_type:
                if best_m3u8_url is None:
                    best_m3u8_url = url
                return
            if ".mp4" in url_l or "video/mp4" in content_type:
                if best_mp4_url is None:
                    best_mp4_url = url
                return
        except Exception as e:
            logging.debug(f"Response parse error: {e}")

    try:
        ctx = browser.new_context(
            user_agent=(
                "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
                "AppleWebKit/537.36 (KHTML, like Gecko) "
                "Chrome/145.0.7632.6 Safari/537.36"
            ),
            viewport={"width": 1920, "height": 1080},
        )
        page = ctx.new_page()
        page.on("response", handle_response)
        logging.info(f"[video_id={video_id}] 🎬 Opening TikTok video page: {post_url}")
        page.goto(post_url, wait_until="domcontentloaded", timeout=40000)
        time.sleep(2)
        # Try clicking the video player to trigger stream loading
        for selector in ['[data-e2e="video-player"]', "video", '[class*="Video"]']:
            try:
                player = page.locator(selector).first
                if player.count() > 0:
                    player.click(force=True, timeout=3000)
                    break
            except Exception:
                # Best effort: try the next selector.
                pass
        # Wait up to 10s for a stream URL to appear
        for _ in range(10):
            if current_best():
                break
            time.sleep(1)
        selected = current_best()
        if selected:
            logging.info(f"[video_id={video_id}] ✅ Resolved video URL: {selected}")
        else:
            # Do not report success when nothing was captured.
            logging.warning(f"[video_id={video_id}] ⚠️ No video stream URL was captured.")
        return selected
    except Exception as e:
        logging.warning(f"[video_id={video_id}] ⚠️ Could not extract video URL: {e}")
        return None
    finally:
        # Tear down page and context separately so a failure in one does not
        # prevent the other from being closed.
        try:
            if page:
                page.remove_listener("response", handle_response)
                page.close()
        except Exception:
            pass
        try:
            if ctx:
                ctx.close()
        except Exception:
            pass
# --- Video download + compress (same ffmpeg pipeline as twitter2bsky.py) ---
def _probe_video_duration(file_path):
    """Return the duration of *file_path* in seconds, as reported by ffprobe.

    Raises:
        RuntimeError: If ffprobe exits with a non-zero status or prints no
            duration.
        subprocess.TimeoutExpired: If ffprobe runs longer than
            FFPROBE_TIMEOUT_SECONDS.
    """
    command = [
        "ffprobe", "-v", "error", "-show_entries", "format=duration",
        "-of", "default=noprint_wrappers=1:nokey=1", file_path,
    ]
    probe = subprocess.run(
        command, capture_output=True, text=True, timeout=FFPROBE_TIMEOUT_SECONDS,
    )
    if probe.returncode != 0:
        raise RuntimeError(f"ffprobe error: {probe.stderr.strip()}")
    reported = probe.stdout.strip()
    if not reported:
        raise RuntimeError("ffprobe returned empty duration")
    return float(reported)
def download_and_crop_video(video_url: str, output_path: str) -> str | None:
    """Download a video, trim it, compress it, and store it at *output_path*.

    Steps: ffmpeg stream-copies *video_url* (MP4 or HLS playlist) to a local
    file, moviepy trims it to at most VIDEO_MAX_DURATION_SECONDS, and ffmpeg
    re-encodes the result to H.264/AAC with a width of at most 720 px.

    Intermediate files are named ``<stem>_source.mp4``, ``<stem>_trimmed.mp4``
    and ``<stem>_compressed.mp4``, where ``<stem>`` is *output_path* without
    its extension; they are always removed before returning.

    Returns:
        *output_path* on success, ``None`` on any failure (which is logged).
    """
    # Derive sibling temp names from the extension-less stem. Replacing the
    # ".mp4" substring instead would also rewrite directory names containing
    # it, and would collapse every temp path onto output_path when that path
    # does not contain ".mp4" at all.
    stem, _ext = os.path.splitext(output_path)
    temp_input = f"{stem}_source.mp4"
    temp_trimmed = f"{stem}_trimmed.mp4"
    temp_output = f"{stem}_compressed.mp4"
    try:
        logging.info(f"⬇️ Downloading TikTok video: {video_url}")
        url_l = video_url.lower()
        if ".m3u8" in url_l:
            # HLS playlists need the extra protocols/extensions whitelisted.
            download_cmd = [
                "ffmpeg", "-y",
                "-protocol_whitelist", "file,http,https,tcp,tls,crypto",
                "-allowed_extensions", "ALL",
                "-i", video_url, "-c", "copy", temp_input,
            ]
        else:
            download_cmd = [
                "ffmpeg", "-y", "-i", video_url, "-c", "copy", temp_input,
            ]
        result = subprocess.run(download_cmd, capture_output=True, text=True,
                                timeout=SUBPROCESS_TIMEOUT_SECONDS)
        if result.returncode != 0:
            logging.error(f"❌ ffmpeg download failed:\n{result.stderr}")
            return None
        if not os.path.exists(temp_input) or os.path.getsize(temp_input) == 0:
            logging.error("❌ Downloaded file is missing or empty.")
            return None
        duration = _probe_video_duration(temp_input)
        if duration <= 0:
            logging.error("❌ Invalid video duration.")
            return None
        # Stay just inside the clip so the end never exceeds the real
        # duration, but never ask for less than 0.1 s.
        end_time = min(VIDEO_MAX_DURATION_SECONDS, duration)
        end_time = min(end_time, duration - 0.05)
        end_time = max(end_time, 0.1)
        video_clip = VideoFileClip(temp_input)
        try:
            # The trimming method is named differently across moviepy versions.
            if hasattr(video_clip, "subclipped"):
                cropped = video_clip.subclipped(0, end_time)
            else:
                cropped = video_clip.subclip(0, end_time)
            try:
                cropped.write_videofile(
                    temp_trimmed, codec="libx264", audio_codec="aac",
                    preset="veryfast", bitrate="1800k", audio_bitrate="128k", logger=None,
                )
            finally:
                cropped.close()
        finally:
            video_clip.close()
        if not os.path.exists(temp_trimmed) or os.path.getsize(temp_trimmed) == 0:
            logging.error("❌ Trimmed video is missing or empty.")
            return None
        compress_cmd = [
            "ffmpeg", "-y", "-i", temp_trimmed,
            "-vf", "scale='min(720,iw)':-2",
            "-c:v", "libx264", "-preset", "veryfast", "-crf", "30",
            "-maxrate", "1800k", "-bufsize", "3600k",
            "-c:a", "aac", "-b:a", "128k",
            "-movflags", "+faststart", temp_output,
        ]
        result = subprocess.run(compress_cmd, capture_output=True, text=True,
                                timeout=SUBPROCESS_TIMEOUT_SECONDS)
        if result.returncode != 0:
            logging.error(f"❌ ffmpeg compression failed:\n{result.stderr}")
            return None
        if not os.path.exists(temp_output) or os.path.getsize(temp_output) == 0:
            logging.error("❌ Compressed video is missing or empty.")
            return None
        os.replace(temp_output, output_path)
        size_mb = os.path.getsize(output_path) / (1024 * 1024)
        logging.info(f"✅ Video ready: {output_path} ({size_mb:.2f} MB)")
        return output_path
    except subprocess.TimeoutExpired:
        logging.error(f"❌ ffmpeg timed out after {SUBPROCESS_TIMEOUT_SECONDS}s")
        return None
    except Exception as e:
        logging.error(f"❌ Video processing error: {repr(e)}")
        return None
    finally:
        remove_file_quietly(temp_input)
        remove_file_quietly(temp_trimmed)
        remove_file_quietly(temp_output)
# --- Main sync logic ---
def _build_fallback_text(raw_text, post_url):
    """Return ``<caption>\\n\\n<post_url>`` fitted into BSKY_TEXT_MAX_LENGTH.

    The caption is shortened (never the URL) so the combined text stays within
    the post length limit; a caption that already fits is left unchanged.
    """
    suffix = f"\n\n{post_url}"
    budget = max(BSKY_TEXT_MAX_LENGTH - grapheme_len(suffix), 0)
    caption = truncate_text_safely(raw_text, max_length=budget)
    return f"{caption}{suffix}".strip()


def sync_feeds(args):
    """Run one TikTok → Bluesky sync cycle.

    Loads and prunes the state file, scrapes the TikTok profile, drops videos
    already known from the state or from recent Bluesky posts, resolves the
    real stream URL of each remaining video, and posts them oldest-scraped
    first. A video that cannot be downloaded or uploaded is posted as text
    with a link to the TikTok page instead. The state file is saved after
    every post.

    Args:
        args: Namespace providing ``tiktok_handle``, ``bsky_handle``,
            ``bsky_password`` and ``bsky_base_url``, plus optional
            ``dry_run`` and ``bsky_langs``.

    All errors are logged; nothing is raised to the caller.
    """
    logging.info("🔄 Starting TikTok → Bluesky sync cycle...")
    dry_run = getattr(args, "dry_run", False)
    bsky_langs = getattr(args, "bsky_langs", None) or DEFAULT_BSKY_LANGS
    if dry_run:
        logging.info("🧪 DRY RUN MODE — no posts will be created on Bluesky.")
    try:
        state = load_state(STATE_PATH)
        state = prune_state(state, max_entries=5000)
        tiktoks = scrape_tiktoks_via_playwright(
            args.tiktok_handle,
            locale=bsky_langs[0] if bsky_langs else "en-US",
        )
        if not tiktoks:
            logging.warning("⚠️ No TikTok videos found. Skipping sync.")
            return
        # A dry run never talks to Bluesky, so it has no client and no
        # recent-post list to compare against.
        bsky_client = None
        recent_bsky_posts = []
        if not dry_run:
            bsky_client = create_bsky_client(
                args.bsky_base_url, args.bsky_handle, args.bsky_password,
            )
            recent_bsky_posts = get_recent_bsky_posts(
                bsky_client, args.bsky_handle, limit=DEDUPE_BSKY_LIMIT,
            )
        # --- Build candidates ---
        candidates = []
        for tiktok in reversed(tiktoks):
            try:
                # TikTok grid doesn't expose timestamps reliably —
                # use state-based dedup as primary guard
                canonical_url = canonicalize_tiktok_url(tiktok.post_url)
                if canonical_url and canonical_url in state.get("posted_videos", {}):
                    logging.info(f"⚡ Early skip (already in state): {canonical_url}")
                    continue
                text = clean_post_text(tiktok.text or "")
                normalized_text = normalize_post_text(text)
                media_fp = build_media_fingerprint(tiktok)
                text_media_key = build_text_media_key(normalized_text, media_fp)
                video_id = extract_tiktok_video_id(tiktok.post_url)
                candidate = {
                    "tiktok": tiktok,
                    "raw_text": truncate_text_safely(text),
                    "normalized_text": normalized_text,
                    "media_fingerprint": media_fp,
                    "text_media_key": text_media_key,
                    "canonical_post_url": canonical_url,
                    "video_id": video_id,
                    "resolved_video_url": None,
                    "resolved_video_hash": None,
                }
                is_dup_state, reason = candidate_matches_state(candidate, state)
                if is_dup_state:
                    logging.info(f"⏭️ Skipping (state duplicate: {reason}): {canonical_url}")
                    continue
                is_dup_bsky, reason = candidate_matches_existing_bsky(candidate, recent_bsky_posts)
                if is_dup_bsky:
                    logging.info(f"⏭️ Skipping (Bluesky duplicate: {reason}): {canonical_url}")
                    continue
                candidates.append(candidate)
            except Exception as e:
                logging.warning(f"⚠️ Failed to prepare candidate: {e}")
        logging.info(f"📬 {len(candidates)} new TikTok videos to post after dedup.")
        if not candidates:
            logging.info("✅ Nothing new to post.")
            return
        # --- Pre-resolve video URLs ---
        with sync_playwright() as p_pre:
            pre_browser = p_pre.chromium.launch(
                headless=True,
                args=["--disable-blink-features=AutomationControlled"],
            )
            try:
                for c in candidates:
                    c["resolved_video_url"] = extract_tiktok_video_url_isolated(
                        pre_browser,
                        c["tiktok"].post_url,
                        video_id=c.get("video_id"),
                    )
            finally:
                pre_browser.close()
        # --- Post to Bluesky ---
        new_posts = 0
        for candidate in candidates:
            tiktok = candidate["tiktok"]
            raw_text = candidate["raw_text"]
            logging.info(
                f"📝 {'[DRY RUN] Would post' if dry_run else 'Posting'} "
                f"TikTok video: {tiktok.post_url}"
            )
            if dry_run:
                logging.info(f" 📄 Caption: {raw_text[:200]}")
                # NOTE(review): a dry run still records the video in the state
                # file, so a later real run will skip it. Kept as-is because
                # it may be relied on to seed the state — confirm the intent.
                remember_posted_video(state, candidate, bsky_uri=f"dry-run:{arrow.utcnow().isoformat()}")
                save_state(state, STATE_PATH)
                new_posts += 1
                continue
            real_video_url = candidate.get("resolved_video_url")
            video_embed = None
            video_blob = None
            if real_video_url:
                temp_base = make_unique_video_temp_base(tiktok.post_url)
                temp_path = f"{temp_base}.mp4"
                try:
                    cropped_path = download_and_crop_video(real_video_url, temp_path)
                    if cropped_path:
                        video_hash = sha256_file(cropped_path)
                        candidate["resolved_video_hash"] = video_hash
                        owner = _cache.video_hash_owner.get(video_hash)
                        if owner and owner != candidate["video_id"]:
                            # The same bytes were already used for another
                            # video in this run: no embed is built, so the
                            # post below falls back to text with a link.
                            logging.warning("⚠️ Video hash owned by another video. Skipping.")
                        else:
                            _cache.video_hash_owner[video_hash] = candidate["video_id"]
                            video_blob = get_blob_from_file(cropped_path, bsky_client)
                            if video_blob:
                                alt = build_dynamic_alt(raw_text)
                                video_embed = build_video_embed(video_blob, alt)
                finally:
                    remove_file_quietly(temp_path)
                    remove_file_quietly(f"{temp_base}_source.mp4")
                    remove_file_quietly(f"{temp_base}_trimmed.mp4")
                    remove_file_quietly(f"{temp_base}_compressed.mp4")
            else:
                logging.warning(f"⚠️ Could not resolve video URL for {tiktok.post_url}")
            try:
                if video_embed:
                    post_result = send_post_with_retry(
                        bsky_client, text=make_rich(raw_text), embed=video_embed, langs=bsky_langs,
                    )
                    post_mode = "video"
                else:
                    # Fallback: post caption as text-only with link to TikTok.
                    # The caption is shortened to leave room for the link, so
                    # the post as a whole stays within the length limit.
                    fallback_text = make_rich(
                        _build_fallback_text(raw_text, tiktok.post_url)
                    )
                    post_result = send_post_with_retry(
                        bsky_client, text=fallback_text, langs=bsky_langs,
                    )
                    post_mode = "text_only_fallback"
                bsky_uri = getattr(post_result, "uri", None)
                remember_posted_video(state, candidate, bsky_uri=bsky_uri)
                state = prune_state(state, max_entries=5000)
                save_state(state, STATE_PATH)
                # Keep the in-memory dedup list in step with what was posted.
                recent_bsky_posts.insert(0, {
                    "uri": bsky_uri,
                    "normalized_text": candidate["normalized_text"],
                    "media_fingerprint": candidate["media_fingerprint"],
                    "text_media_key": candidate["text_media_key"],
                })
                recent_bsky_posts = recent_bsky_posts[:DEDUPE_BSKY_LIMIT]
                new_posts += 1
                logging.info(f"✅ Posted TikTok to Bluesky [{post_mode}]: {raw_text[:80]}")
                # Pause between posts.
                time.sleep(5)
            except Exception as e:
                logging.error(f"❌ Failed to post to Bluesky: {e}")
        logging.info(f"✅ Sync complete. Posted {new_posts} new TikTok videos.")
    except Exception as e:
        logging.error(f"❌ Error during sync cycle: {e}")
def main():
    """Command-line entry point: read settings, validate them, run one sync.

    Every setting is taken from its command-line option first and from the
    matching environment variable (after ``load_dotenv``) second. When a
    required setting is missing, an error is logged and nothing else runs.
    """
    load_dotenv()
    parser = argparse.ArgumentParser(description="TikTok to Bluesky Sync")
    parser.add_argument("--tiktok-handle", help="TikTok account handle to scrape (without @)")
    parser.add_argument("--bsky-handle", help="Your Bluesky handle")
    parser.add_argument("--bsky-password", help="Your Bluesky app password")
    parser.add_argument("--bsky-base-url", help="Bluesky PDS base URL", default=None)
    parser.add_argument("--bsky-langs", help="Comma-separated language codes", default=None)
    parser.add_argument("--dry-run", action="store_true", default=False)
    args = parser.parse_args()

    args.tiktok_handle = args.tiktok_handle or os.getenv("TIKTOK_HANDLE")
    args.bsky_handle = args.bsky_handle or os.getenv("BSKY_HANDLE")
    args.bsky_password = args.bsky_password or os.getenv("BSKY_APP_PASSWORD")
    args.bsky_base_url = args.bsky_base_url or os.getenv("BSKY_BASE_URL") or DEFAULT_BSKY_BASE_URL

    raw_langs = args.bsky_langs or os.getenv("BSKY_LANGS")
    if raw_langs:
        # Split on commas and drop empty pieces.
        args.bsky_langs = [code.strip() for code in raw_langs.split(",") if code.strip()]
    else:
        args.bsky_langs = DEFAULT_BSKY_LANGS

    required = (
        (args.tiktok_handle, "--tiktok-handle / TIKTOK_HANDLE"),
        (args.bsky_handle, "--bsky-handle / BSKY_HANDLE"),
        (args.bsky_password, "--bsky-password / BSKY_APP_PASSWORD"),
    )
    missing = [label for value, label in required if not value]
    if missing:
        logging.error(f"❌ Missing: {', '.join(missing)}")
        return

    logging.info(f"🤖 TikTok→Bluesky bot started. Scraping @{args.tiktok_handle}")
    reset_caches()
    sync_feeds(args)
    logging.info("🤖 Bot finished.")
# Run the bot only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()