TikTok example 3

This commit is contained in:
Guillem Hernandez Sola
2026-05-19 09:37:27 +02:00
parent ef32a15cbc
commit 23e87c17b7

View File

@@ -24,15 +24,11 @@ import grapheme
# --- Configuration ---
# Module-level tunables for the TikTok -> Bluesky sync.
# NOTE(review): SCRAPE_VIDEO_LIMIT and DEFAULT_BSKY_LANGS are each assigned
# twice in this span; if both assignments are live, the later one wins
# (30 and ["es"] respectively). Keep a single definition of each.
# NOTE(review): TIKTOK_PAGE_LOAD_WAIT_S and TIKTOK_MAX_SCROLLS are assigned
# again further down, next to TIKTOK_SCROLL_PAUSE_S — confirm which
# definition is meant to be authoritative.
LOG_PATH = "tiktok2bsky.log"
STATE_PATH = "tiktok2bsky_state.json"
SCRAPE_VIDEO_LIMIT = 30
# Passed as `limit=` when fetching recent Bluesky posts for dedup, and used to
# cap the in-memory list of recent posts after each successful post.
DEDUPE_BSKY_LIMIT = 30
# Used to derive `too_old_cutoff` (utcnow shifted back by this many days).
VIDEO_MAX_AGE_DAYS = 3
# Default `max_length` of truncate_text_safely().
BSKY_TEXT_MAX_LENGTH = 300
DEFAULT_BSKY_LANGS = ["ca"]
TIKTOK_PAGE_LOAD_WAIT_S = 5.0 # was 3.0 — increased for slower grid render
TIKTOK_MAX_SCROLLS = 8 # was 5 — more scrolls = more videos discovered
SCRAPE_VIDEO_LIMIT = 30 # was 15
DEFAULT_BSKY_LANGS = ["es"]
VIDEO_MAX_DURATION_SECONDS = 179
# Files larger than this (in MiB, computed as bytes / 1024 / 1024) are rejected
# by get_blob_from_file() before upload.
MAX_VIDEO_UPLOAD_SIZE_MB = 45
@@ -67,12 +63,39 @@ FFPROBE_TIMEOUT_SECONDS = 15
# Fallback PDS base URL used by main() when neither --bsky-base-url nor the
# BSKY_BASE_URL environment variable is set.
DEFAULT_BSKY_BASE_URL = "https://bsky.social"
SESSION_FILE_PERMISSIONS = 0o600
# NOTE(review): TIKTOK_SCROLL_PAUSE_S and TIKTOK_PAGE_LOAD_WAIT_S each appear
# twice in this span, and TIKTOK_PAGE_LOAD_WAIT_S / TIKTOK_MAX_SCROLLS are also
# assigned in the Configuration section earlier in the file. Later assignments
# win at import time; keep a single definition of each to avoid drift.
TIKTOK_SCROLL_PAUSE_S = 2.5 # pause between scrolls to let videos load
TIKTOK_PAGE_LOAD_WAIT_S = 3.0 # initial wait after profile page loads
TIKTOK_PAGE_LOAD_WAIT_S = 5.0 # increased from 3.0
TIKTOK_SCROLL_PAUSE_S = 2.5
TIKTOK_MAX_SCROLLS = 8 # increased from 5
TIKTOK_BANNER_WAIT_S = 3.0 # wait after dismissing cookie banner
DYNAMIC_ALT_MAX_LENGTH = 150
TRUNCATE_MIN_PREFIX_CHARS = 20
ORPHAN_DIGIT_MAX_DIGITS = 3
# --- Cookie banner selectors (Spanish + English) ---
# Candidate selectors for the consent-banner button. The scraper iterates this
# list in order, clicks the first selector whose element is visible, and then
# stops — so list order is priority order: localized button texts first,
# generic attribute matches last.
# NOTE(review): '[id*="accept"]' and '[class*="accept-btn"]' are substring
# attribute matches and could hit elements other than the banner button —
# confirm they are narrow enough on the live page.
GDPR_SELECTORS = [
'button:has-text("Permitir todas")', # ← exact text shown on screen
'button:has-text("Rechazar cookies opcionales")',
'button:has-text("Entendido")',
'button:has-text("Aceptar todo")',
'button:has-text("Accept all")',
'button:has-text("Got it")',
'button:has-text("Decline optional")',
'[data-e2e="cookie-banner-accept"]',
'[id*="accept"]',
'[class*="accept-btn"]',
]
# --- Video grid selectors ---
# The adjacent string literals below are concatenated into ONE comma-separated
# selector string (note the trailing ", " on every piece except the last).
# It is passed as a single argument to page.wait_for_selector(), so any one of
# the alternatives appearing is enough to consider the grid detected.
GRID_SELECTORS = (
'[data-e2e="user-post-item"], '
'[class*="DivItemContainerV2"], '
'a[href*="/video/"], '
'[class*="video-feed"], '
'div[class*="VideoFeed"], '
'[class*="DivVideoFeedV2"]'
)
# --- Logging Setup ---
logging.basicConfig(
format="%(asctime)s [%(levelname)s] %(message)s",
@@ -90,7 +113,7 @@ class _RunCache:
self.url_validity: dict = {}
self.video_hash_owner: dict = {}
self.video_url_owner: dict = {}
self.locale: str = "en-US"
self.locale: str = "es-ES"
def clear(self):
self.url_validity.clear()
@@ -115,14 +138,14 @@ class ScrapedMedia:
class ScrapedTikTok:
    """Plain container for one scraped TikTok post.

    Mirrors ScrapedTweet from twitter2bsky.py.

    Args:
        created_on: Creation timestamp (ISO8601 string or arrow-parseable).
        text: Caption / description of the post.
        video_url: URL of the video; when falsy, ``media`` is left empty.
        post_url: Post permalink, e.g.
            https://www.tiktok.com/@user/video/123.
        thumbnail_url: Optional thumbnail URL.

    The arguments are stored as given; no validation or normalization is
    performed here.
    """

    def __init__(self, created_on, text, video_url, post_url=None, thumbnail_url=None):
        # Each attribute is assigned exactly once (the earlier duplicated
        # assignments were dead stores).
        self.created_on = created_on
        self.text = text
        self.post_url = post_url
        self.thumbnail_url = thumbnail_url
        # A falsy video_url yields an empty list rather than a ScrapedMedia
        # wrapping None, so callers can simply test `if tiktok.media`.
        self.media = [ScrapedMedia(video_url, "video")] if video_url else []
# --- Helpers (shared with twitter2bsky.py pattern) ---
# --- Helpers ---
def sha256_file(path, chunk_size=1024 * 1024):
h = hashlib.sha256()
with open(path, "rb") as f:
@@ -184,7 +207,6 @@ def truncate_text_safely(text, max_length=BSKY_TEXT_MAX_LENGTH):
def extract_tiktok_video_id(post_url):
"""Extract numeric video ID from a TikTok URL."""
if not post_url:
return None
match = re.search(r"/video/(\d+)", post_url)
@@ -192,7 +214,6 @@ def extract_tiktok_video_id(post_url):
def canonicalize_tiktok_url(url):
"""Normalize TikTok URL to a stable canonical form."""
if not url:
return None
match = re.search(
@@ -251,25 +272,33 @@ def build_text_media_key(normalized_text, media_fingerprint):
).hexdigest()
# --- Bluesky login / retry helpers (identical pattern to twitter2bsky.py) ---
# --- Bluesky login / retry helpers ---
def is_rate_limited_error(e):
    """Return True if the lowercased repr of *e* looks like a rate-limit error.

    Matching is a plain substring test for "429", "ratelimitexceeded" or
    "too many requests".
    """
    description = repr(e).lower()
    markers = ("429", "ratelimitexceeded", "too many requests")
    return any(marker in description for marker in markers)
def is_auth_error(e):
    """Return True if the lowercased repr of *e* looks like an auth failure.

    Matching is a plain substring test for "401", "403" or
    "invalid identifier".
    """
    description = repr(e).lower()
    for marker in ("401", "403", "invalid identifier"):
        if marker in description:
            return True
    return False
def is_transient_error(e):
    """Return True if repr(e) contains a known transient-failure marker.

    Matching is case-sensitive substring search against exception class
    names (e.g. "ReadTimeout") and HTTP gateway status codes (502/503/504).
    """
    # Single definition of the marker set (the earlier duplicate assignment
    # was a dead store); a tuple because it is never mutated.
    signals = (
        "InvokeTimeoutError", "ReadTimeout", "WriteTimeout",
        "RemoteProtocolError", "ConnectError", "503", "502", "504",
    )
    # repr() is computed once instead of once per marker.
    description = repr(e)
    return any(s in description for s in signals)
def is_network_error(e):
    """Return True if repr(e) contains a known network-failure marker.

    Matching is case-sensitive substring search against exception class
    names (e.g. "ConnectError", "TimeoutException") and HTTP gateway status
    codes (502/503/504).
    """
    # Single definition of the marker set (the earlier duplicate assignment
    # was a dead store); a tuple because it is never mutated.
    signals = (
        "ConnectError", "RemoteProtocolError", "ReadTimeout",
        "WriteTimeout", "TimeoutException", "503", "502", "504",
    )
    # repr() is computed once instead of once per marker.
    description = repr(e)
    return any(s in description for s in signals)
def get_rate_limit_wait_seconds(e, default_delay):
try:
headers = getattr(e, "headers", None) or {}
@@ -302,9 +331,14 @@ def create_bsky_client(base_url, handle, password):
raise RuntimeError("Bluesky login failed after all retries.")
# --- State management (identical pattern) ---
# --- State management ---
def default_state():
    """Return a new, empty state dictionary.

    Keys:
        version: Schema version of the state (currently 1).
        posted_videos: Mapping of already-posted videos; the sync loop looks
            canonical TikTok URLs up in it.
        posted_by_bsky_uri: Mapping from a Bluesky URI to a key of
            ``posted_videos``.
        updated_at: ``None`` until set elsewhere.

    A fresh dict (with fresh nested dicts) is built on every call, so callers
    can mutate the result without affecting other states.
    """
    # Single return statement: the second, unreachable duplicate was removed.
    return {
        "version": 1,
        "posted_videos": {},
        "posted_by_bsky_uri": {},
        "updated_at": None,
    }
def load_state(state_path=STATE_PATH):
@@ -373,11 +407,17 @@ def prune_state(state, max_entries=5000):
posted = state.get("posted_videos", {})
if len(posted) <= max_entries:
return state
sortable = sorted(posted.items(), key=lambda x: x[1].get("posted_at", ""), reverse=True)
sortable = sorted(
posted.items(),
key=lambda x: x[1].get("posted_at", ""),
reverse=True,
)
keep = {k for k, _ in sortable[:max_entries]}
state["posted_videos"] = {k: v for k, v in posted.items() if k in keep}
state["posted_by_bsky_uri"] = {
uri: k for uri, k in state.get("posted_by_bsky_uri", {}).items() if k in keep
uri: k
for uri, k in state.get("posted_by_bsky_uri", {}).items()
if k in keep
}
return state
@@ -414,12 +454,15 @@ def candidate_matches_existing_bsky(candidate, recent_bsky_posts):
for existing in recent_bsky_posts:
if candidate["text_media_key"] == existing["text_media_key"]:
return True, "bsky:text_media_fingerprint"
if candidate["normalized_text"] and candidate["normalized_text"] == existing["normalized_text"]:
if (
candidate["normalized_text"]
and candidate["normalized_text"] == existing["normalized_text"]
):
return True, "bsky:normalized_text"
return False, None
# --- Upload / blob helpers (same as twitter2bsky.py) ---
# --- Upload / blob helpers ---
def upload_blob_with_retry(client, binary_data, media_label="media"):
last_exception = None
transient_attempts = 0
@@ -430,16 +473,26 @@ def upload_blob_with_retry(client, binary_data, media_label="media"):
except Exception as e:
last_exception = e
if "429" in str(e) or "RateLimitExceeded" in str(e):
wait = min(BSKY_BLOB_UPLOAD_BASE_DELAY * (2 ** (attempt - 1)), BSKY_BLOB_UPLOAD_MAX_DELAY)
wait = min(
BSKY_BLOB_UPLOAD_BASE_DELAY * (2 ** (attempt - 1)),
BSKY_BLOB_UPLOAD_MAX_DELAY,
)
if attempt < BSKY_BLOB_UPLOAD_MAX_RETRIES:
logging.warning(f"⏳ Blob upload rate-limited. Retry {attempt} after {wait}s.")
logging.warning(
f"⏳ Blob upload rate-limited. Retry {attempt} after {wait}s."
)
time.sleep(wait)
continue
break
if is_transient_error(e) and transient_attempts < BSKY_BLOB_TRANSIENT_ERROR_RETRIES:
if (
is_transient_error(e)
and transient_attempts < BSKY_BLOB_TRANSIENT_ERROR_RETRIES
):
transient_attempts += 1
wait = BSKY_BLOB_TRANSIENT_ERROR_DELAY * transient_attempts
logging.warning(f"⏳ Transient blob upload error. Retry {transient_attempts} after {wait}s.")
logging.warning(
f"⏳ Transient blob upload error. Retry {transient_attempts} after {wait}s."
)
time.sleep(wait)
continue
logging.warning(f"Could not upload {media_label}: {repr(e)}")
@@ -456,7 +509,10 @@ def send_post_with_retry(client, **kwargs):
except Exception as e:
last_exception = e
if "429" in str(e) or "RateLimitExceeded" in str(e):
wait = min(BSKY_SEND_POST_BASE_DELAY * (2 ** (attempt - 1)), BSKY_SEND_POST_MAX_DELAY)
wait = min(
BSKY_SEND_POST_BASE_DELAY * (2 ** (attempt - 1)),
BSKY_SEND_POST_MAX_DELAY,
)
if attempt < BSKY_SEND_POST_MAX_RETRIES:
time.sleep(wait)
continue
@@ -475,7 +531,9 @@ def get_blob_from_file(file_path, client):
return None
size_mb = os.path.getsize(file_path) / (1024 * 1024)
if size_mb > MAX_VIDEO_UPLOAD_SIZE_MB:
logging.warning(f"File too large: {size_mb:.2f} MB > {MAX_VIDEO_UPLOAD_SIZE_MB} MB")
logging.warning(
f"File too large: {size_mb:.2f} MB > {MAX_VIDEO_UPLOAD_SIZE_MB} MB"
)
return None
with open(file_path, "rb") as f:
data = f.read()
@@ -489,7 +547,9 @@ def build_video_embed(video_blob, alt_text):
try:
return models.AppBskyEmbedVideo.Main(video=video_blob, alt=alt_text)
except AttributeError:
logging.error("❌ atproto version does not support AppBskyEmbedVideo. Upgrade atproto.")
logging.error(
"❌ atproto version does not support AppBskyEmbedVideo. Upgrade atproto."
)
return None
@@ -535,30 +595,35 @@ def make_rich(content):
# --- TikTok Scraping ---
# --- TikTok Scraping ---
def scrape_tiktoks_via_playwright(target_handle: str, locale: str = "en-US") -> list:
def scrape_tiktoks_via_playwright(target_handle: str, locale: str = "es-ES") -> list:
"""
Scrape recent TikTok videos from a public profile using Playwright.
No login required for public profiles.
Returns a list of ScrapedTikTok objects.
Fixes applied:
1. Aggressive GDPR/consent banner dismissal (Spanish + English)
2. Stealth headers: timezone, locale, sec-ch-ua, webdriver flag hidden
3. playwright-stealth applied before navigation
4. Broader + longer grid selector wait (30s, more selectors)
1. Aggressive GDPR/cookie banner dismissal Spanish + English,
waits TIKTOK_BANNER_WAIT_S after click for grid to render.
2. Stealth headers: Windows Chrome UA, Europe/Madrid timezone,
es-ES locale, sec-ch-ua headers, navigator.webdriver hidden.
3. playwright-stealth applied before navigation (graceful fallback
if not installed).
4. Broader grid selector list + 30s timeout + continues with scroll
even if selector times out instead of hard-failing.
"""
tiktoks = []
profile_url = f"https://www.tiktok.com/@{target_handle.lstrip('@')}"
# playwright-stealth is optional but strongly recommended
# FIX 3 — playwright-stealth (optional but strongly recommended)
try:
from playwright_stealth import stealth_sync
USE_STEALTH = True
logging.info("🥷 playwright-stealth available — stealth mode ON")
except ImportError:
USE_STEALTH = False
logging.warning("⚠️ playwright-stealth not installed — running without stealth")
logging.warning(
"⚠️ playwright-stealth not installed — running without stealth. "
"Run: pip install playwright-stealth"
)
with sync_playwright() as p:
browser = p.chromium.launch(
@@ -573,7 +638,7 @@ def scrape_tiktoks_via_playwright(target_handle: str, locale: str = "en-US") ->
],
)
# FIX 2 — Fake a real Windows Chrome browser with Spanish locale + Madrid timezone
# FIX 2 — Fake a real Windows Chrome with Spanish locale + Madrid timezone
context = browser.new_context(
user_agent=(
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
@@ -592,7 +657,10 @@ def scrape_tiktoks_via_playwright(target_handle: str, locale: str = "en-US") ->
"Sec-Fetch-Dest": "document",
"Sec-Fetch-Mode": "navigate",
"Sec-Fetch-Site": "none",
"Sec-Ch-Ua": '"Chromium";v="124", "Google Chrome";v="124", "Not-A.Brand";v="99"',
"Sec-Ch-Ua": (
'"Chromium";v="124", "Google Chrome";v="124", '
'"Not-A.Brand";v="99"'
),
"Sec-Ch-Ua-Mobile": "?0",
"Sec-Ch-Ua-Platform": '"Windows"',
},
@@ -605,7 +673,7 @@ def scrape_tiktoks_via_playwright(target_handle: str, locale: str = "en-US") ->
stealth_sync(page)
logging.info("🥷 Stealth patches applied.")
# FIX 2 — Hide webdriver flag + fake plugins/languages via init script
# FIX 2 — Hide webdriver flag + fake plugins/languages
page.add_init_script("""
Object.defineProperty(navigator, 'webdriver', { get: () => undefined });
Object.defineProperty(navigator, 'plugins', {
@@ -630,49 +698,36 @@ def scrape_tiktoks_via_playwright(target_handle: str, locale: str = "en-US") ->
logging.info(f"🌐 Navigating to TikTok profile: {profile_url}")
page.goto(profile_url, wait_until="domcontentloaded", timeout=40000)
# FIX 1 — Wait longer for initial page render (was 3.0s)
time.sleep(TIKTOK_PAGE_LOAD_WAIT_S + 2)
# FIX 1 — Wait for page to settle before looking for banner
time.sleep(TIKTOK_PAGE_LOAD_WAIT_S)
# FIX 1 — Aggressive GDPR/consent banner dismissal (Spanish + English)
GDPR_SELECTORS = [
'button:has-text("Entendido")',
'button:has-text("Aceptar todo")',
'button:has-text("Accept all")',
'button:has-text("Got it")',
'button:has-text("Decline optional")',
'[data-e2e="cookie-banner-accept"]',
'[id*="accept"]',
'[class*="accept-btn"]',
]
# FIX 1 — Dismiss cookie/consent banner BEFORE waiting for grid
banner_dismissed = False
for selector in GDPR_SELECTORS:
try:
btn = page.locator(selector).first
if btn.is_visible(timeout=3000):
btn.click()
logging.info(f"✅ Dismissed banner: {selector}")
time.sleep(2)
logging.info(f"✅ Dismissed cookie banner: {selector}")
time.sleep(TIKTOK_BANNER_WAIT_S) # wait for grid to render
banner_dismissed = True
break
except Exception:
pass
# FIX 4 — Broader selector list + longer timeout (30s, was 20s)
GRID_SELECTORS = (
'[data-e2e="user-post-item"], '
'[class*="DivItemContainerV2"], '
'a[href*="/video/"], '
'[class*="video-feed"], '
'div[class*="VideoFeed"], '
'[class*="DivVideoFeedV2"]'
)
if not banner_dismissed:
logging.info(" No cookie banner found — continuing.")
# FIX 4 — Broader selector + longer timeout (30s) + soft fail
try:
page.wait_for_selector(GRID_SELECTORS, timeout=30000)
logging.info("✅ TikTok video grid detected.")
except Exception:
# FIX 4 — Don't give up immediately: try scrolling anyway
logging.warning(
"⚠️ Grid selector timed out — attempting scroll anyway "
"(grid may still be partially loaded)"
"⚠️ Grid selector timed out after 30s — "
"attempting scroll anyway (grid may be partially loaded)"
)
take_error_screenshot(page, "tiktok_grid_timeout")
# Scroll to load more videos
for scroll_i in range(TIKTOK_MAX_SCROLLS):
@@ -689,7 +744,10 @@ def scrape_tiktoks_via_playwright(target_handle: str, locale: str = "en-US") ->
if not video_links:
take_error_screenshot(page, "tiktok_no_video_links")
logging.error("❌ No video links found after scroll. TikTok may be blocking.")
logging.error(
"❌ No video links found after scroll. "
"TikTok may still be blocking — check screenshot."
)
browser.close()
return []
@@ -714,7 +772,7 @@ def scrape_tiktoks_via_playwright(target_handle: str, locale: str = "en-US") ->
continue
seen_urls.add(canonical)
# Try to get caption from the card itself
# Try to get caption from the card
caption = ""
try:
card = link.locator("..").first
@@ -763,12 +821,12 @@ def scrape_tiktoks_via_playwright(target_handle: str, locale: str = "en-US") ->
logging.info(f"✅ Scraped {len(tiktoks)} TikTok videos.")
return tiktoks
# --- Video extraction ---
def extract_tiktok_video_url_isolated(browser, post_url: str, video_id: str = None) -> str | None:
# --- Video URL extraction ---
def extract_tiktok_video_url_isolated(browser, post_url: str, video_id: str = None):
"""
Open a single TikTok video page in an isolated context and intercept
the actual MP4/HLS stream URL from network responses.
Mirrors extract_video_url_from_tweet_page_isolated() in twitter2bsky.py.
"""
ctx = None
page = None
@@ -789,7 +847,6 @@ def extract_tiktok_video_url_isolated(browser, post_url: str, video_id: str = No
content_type = (response.headers.get("content-type") or "").lower()
url_l = url.lower()
# Skip audio-only and segment files
if ".m4s" in url_l or "/aud/" in url_l or "mp4a" in url_l:
return
@@ -808,11 +865,11 @@ def extract_tiktok_video_url_isolated(browser, post_url: str, video_id: str = No
try:
ctx = browser.new_context(
user_agent=(
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/145.0.7632.6 Safari/537.36"
"Chrome/124.0.0.0 Safari/537.36"
),
viewport={"width": 1920, "height": 1080},
viewport={"width": 1366, "height": 768},
)
page = ctx.new_page()
page.on("response", handle_response)
@@ -821,7 +878,6 @@ def extract_tiktok_video_url_isolated(browser, post_url: str, video_id: str = No
page.goto(post_url, wait_until="domcontentloaded", timeout=40000)
time.sleep(2)
# Try clicking the video player to trigger stream loading
for selector in ['[data-e2e="video-player"]', "video", '[class*="Video"]']:
try:
player = page.locator(selector).first
@@ -831,7 +887,6 @@ def extract_tiktok_video_url_isolated(browser, post_url: str, video_id: str = No
except Exception:
pass
# Wait up to 10s for a stream URL to appear
for _ in range(10):
if current_best():
break
@@ -858,11 +913,15 @@ def extract_tiktok_video_url_isolated(browser, post_url: str, video_id: str = No
pass
# --- Video download + compress (same ffmpeg pipeline as twitter2bsky.py) ---
# --- Video download + compress ---
def _probe_video_duration(file_path):
result = subprocess.run(
["ffprobe", "-v", "error", "-show_entries", "format=duration",
"-of", "default=noprint_wrappers=1:nokey=1", file_path],
[
"ffprobe", "-v", "error",
"-show_entries", "format=duration",
"-of", "default=noprint_wrappers=1:nokey=1",
file_path,
],
capture_output=True, text=True, timeout=FFPROBE_TIMEOUT_SECONDS,
)
if result.returncode != 0:
@@ -873,8 +932,7 @@ def _probe_video_duration(file_path):
return float(duration_str)
def download_and_crop_video(video_url: str, output_path: str) -> str | None:
"""Identical ffmpeg pipeline to twitter2bsky.py."""
def download_and_crop_video(video_url: str, output_path: str):
temp_input = output_path.replace(".mp4", "_source.mp4")
temp_trimmed = output_path.replace(".mp4", "_trimmed.mp4")
temp_output = output_path.replace(".mp4", "_compressed.mp4")
@@ -895,8 +953,10 @@ def download_and_crop_video(video_url: str, output_path: str) -> str | None:
"ffmpeg", "-y", "-i", video_url, "-c", "copy", temp_input,
]
result = subprocess.run(download_cmd, capture_output=True, text=True,
timeout=SUBPROCESS_TIMEOUT_SECONDS)
result = subprocess.run(
download_cmd, capture_output=True, text=True,
timeout=SUBPROCESS_TIMEOUT_SECONDS,
)
if result.returncode != 0:
logging.error(f"❌ ffmpeg download failed:\n{result.stderr}")
return None
@@ -914,7 +974,6 @@ def download_and_crop_video(video_url: str, output_path: str) -> str | None:
end_time = min(end_time, duration - 0.05)
end_time = max(end_time, 0.1)
from moviepy import VideoFileClip
video_clip = VideoFileClip(temp_input)
try:
if hasattr(video_clip, "subclipped"):
@@ -923,8 +982,13 @@ def download_and_crop_video(video_url: str, output_path: str) -> str | None:
cropped = video_clip.subclip(0, end_time)
try:
cropped.write_videofile(
temp_trimmed, codec="libx264", audio_codec="aac",
preset="veryfast", bitrate="1800k", audio_bitrate="128k", logger=None,
temp_trimmed,
codec="libx264",
audio_codec="aac",
preset="veryfast",
bitrate="1800k",
audio_bitrate="128k",
logger=None,
)
finally:
cropped.close()
@@ -943,8 +1007,10 @@ def download_and_crop_video(video_url: str, output_path: str) -> str | None:
"-c:a", "aac", "-b:a", "128k",
"-movflags", "+faststart", temp_output,
]
result = subprocess.run(compress_cmd, capture_output=True, text=True,
timeout=SUBPROCESS_TIMEOUT_SECONDS)
result = subprocess.run(
compress_cmd, capture_output=True, text=True,
timeout=SUBPROCESS_TIMEOUT_SECONDS,
)
if result.returncode != 0:
logging.error(f"❌ ffmpeg compression failed:\n{result.stderr}")
return None
@@ -985,7 +1051,7 @@ def sync_feeds(args):
tiktoks = scrape_tiktoks_via_playwright(
args.tiktok_handle,
locale=bsky_langs[0] if bsky_langs else "en-US",
locale=bsky_langs[0] if bsky_langs else "es-ES",
)
if not tiktoks:
@@ -1004,14 +1070,10 @@ def sync_feeds(args):
bsky_client, args.bsky_handle, limit=DEDUPE_BSKY_LIMIT,
)
too_old_cutoff = arrow.utcnow().shift(days=-VIDEO_MAX_AGE_DAYS)
# --- Build candidates ---
candidates = []
for tiktok in reversed(tiktoks):
try:
# TikTok grid doesn't expose timestamps reliably —
# use state-based dedup as primary guard
canonical_url = canonicalize_tiktok_url(tiktok.post_url)
if canonical_url and canonical_url in state.get("posted_videos", {}):
logging.info(f"⚡ Early skip (already in state): {canonical_url}")
@@ -1037,12 +1099,18 @@ def sync_feeds(args):
is_dup_state, reason = candidate_matches_state(candidate, state)
if is_dup_state:
logging.info(f"⏭️ Skipping (state duplicate: {reason}): {canonical_url}")
logging.info(
f"⏭️ Skipping (state duplicate: {reason}): {canonical_url}"
)
continue
is_dup_bsky, reason = candidate_matches_existing_bsky(candidate, recent_bsky_posts)
is_dup_bsky, reason = candidate_matches_existing_bsky(
candidate, recent_bsky_posts
)
if is_dup_bsky:
logging.info(f"⏭️ Skipping (Bluesky duplicate: {reason}): {canonical_url}")
logging.info(
f"⏭️ Skipping (Bluesky duplicate: {reason}): {canonical_url}"
)
continue
candidates.append(candidate)
@@ -1050,7 +1118,9 @@ def sync_feeds(args):
except Exception as e:
logging.warning(f"⚠️ Failed to prepare candidate: {e}")
logging.info(f"📬 {len(candidates)} new TikTok videos to post after dedup.")
logging.info(
f"📬 {len(candidates)} new TikTok videos to post after dedup."
)
if not candidates:
logging.info("✅ Nothing new to post.")
@@ -1085,14 +1155,16 @@ def sync_feeds(args):
if dry_run:
logging.info(f" 📄 Caption: {raw_text[:200]}")
remember_posted_video(state, candidate, bsky_uri=f"dry-run:{arrow.utcnow().isoformat()}")
remember_posted_video(
state, candidate,
bsky_uri=f"dry-run:{arrow.utcnow().isoformat()}",
)
save_state(state, STATE_PATH)
new_posts += 1
continue
real_video_url = candidate.get("resolved_video_url")
video_embed = None
video_blob = None
if real_video_url:
temp_base = make_unique_video_temp_base(tiktok.post_url)
@@ -1104,7 +1176,9 @@ def sync_feeds(args):
candidate["resolved_video_hash"] = video_hash
owner = _cache.video_hash_owner.get(video_hash)
if owner and owner != candidate["video_id"]:
logging.warning(f"⚠️ Video hash owned by another video. Skipping.")
logging.warning(
"⚠️ Video hash owned by another video. Skipping."
)
else:
_cache.video_hash_owner[video_hash] = candidate["video_id"]
video_blob = get_blob_from_file(cropped_path, bsky_client)
@@ -1117,23 +1191,29 @@ def sync_feeds(args):
remove_file_quietly(f"{temp_base}_trimmed.mp4")
remove_file_quietly(f"{temp_base}_compressed.mp4")
else:
logging.warning(f"⚠️ Could not resolve video URL for {tiktok.post_url}")
logging.warning(
f"⚠️ Could not resolve video URL for {tiktok.post_url}"
)
try:
rich_text = make_rich(raw_text)
if video_embed:
post_result = send_post_with_retry(
bsky_client, text=rich_text, embed=video_embed, langs=bsky_langs,
bsky_client,
text=rich_text,
embed=video_embed,
langs=bsky_langs,
)
post_mode = "video"
else:
# Fallback: post caption as text-only with link to TikTok
fallback_text = make_rich(
f"{raw_text}\n\n{tiktok.post_url}".strip()
)
post_result = send_post_with_retry(
bsky_client, text=fallback_text, langs=bsky_langs,
bsky_client,
text=fallback_text,
langs=bsky_langs,
)
post_mode = "text_only_fallback"
@@ -1151,7 +1231,9 @@ def sync_feeds(args):
recent_bsky_posts = recent_bsky_posts[:DEDUPE_BSKY_LIMIT]
new_posts += 1
logging.info(f"✅ Posted TikTok to Bluesky [{post_mode}]: {raw_text[:80]}")
logging.info(
f"✅ Posted TikTok to Bluesky [{post_mode}]: {raw_text[:80]}"
)
time.sleep(5)
except Exception as e:
@@ -1167,19 +1249,34 @@ def main():
load_dotenv()
parser = argparse.ArgumentParser(description="TikTok to Bluesky Sync")
parser.add_argument("--tiktok-handle", help="TikTok account handle to scrape (without @)")
parser.add_argument(
"--tiktok-handle",
help="TikTok account handle to scrape (without @)",
)
parser.add_argument("--bsky-handle", help="Your Bluesky handle")
parser.add_argument("--bsky-password", help="Your Bluesky app password")
parser.add_argument("--bsky-base-url", help="Bluesky PDS base URL", default=None)
parser.add_argument("--bsky-langs", help="Comma-separated language codes", default=None)
parser.add_argument(
"--bsky-base-url",
help="Bluesky PDS base URL",
default=None,
)
parser.add_argument(
"--bsky-langs",
help="Comma-separated language codes (e.g. es,en)",
default=None,
)
parser.add_argument("--dry-run", action="store_true", default=False)
args = parser.parse_args()
args.tiktok_handle = args.tiktok_handle or os.getenv("TIKTOK_HANDLE")
args.bsky_handle = args.bsky_handle or os.getenv("BSKY_HANDLE")
args.bsky_handle = args.bsky_handle or os.getenv("BSKY_HANDLE")
args.bsky_password = args.bsky_password or os.getenv("BSKY_APP_PASSWORD")
args.bsky_base_url = args.bsky_base_url or os.getenv("BSKY_BASE_URL") or DEFAULT_BSKY_BASE_URL
args.bsky_base_url = (
args.bsky_base_url
or os.getenv("BSKY_BASE_URL")
or DEFAULT_BSKY_BASE_URL
)
raw_langs = args.bsky_langs or os.getenv("BSKY_LANGS")
args.bsky_langs = (
@@ -1195,7 +1292,7 @@ def main():
if not args.bsky_password:
missing.append("--bsky-password / BSKY_APP_PASSWORD")
if missing:
logging.error(f"❌ Missing: {', '.join(missing)}")
logging.error(f"❌ Missing required arguments: {', '.join(missing)}")
return
logging.info(f"🤖 TikTok→Bluesky bot started. Scraping @{args.tiktok_handle}")