diff --git a/tiktok2bsky.py b/tiktok2bsky.py index 2df98fb..e94eeb2 100644 --- a/tiktok2bsky.py +++ b/tiktok2bsky.py @@ -24,15 +24,11 @@ import grapheme # --- Configuration --- LOG_PATH = "tiktok2bsky.log" STATE_PATH = "tiktok2bsky_state.json" +SCRAPE_VIDEO_LIMIT = 30 DEDUPE_BSKY_LIMIT = 30 VIDEO_MAX_AGE_DAYS = 3 BSKY_TEXT_MAX_LENGTH = 300 -DEFAULT_BSKY_LANGS = ["ca"] - -TIKTOK_PAGE_LOAD_WAIT_S = 5.0 # was 3.0 — increased for slower grid render -TIKTOK_MAX_SCROLLS = 8 # was 5 — more scrolls = more videos discovered -SCRAPE_VIDEO_LIMIT = 30 # was 15 - +DEFAULT_BSKY_LANGS = ["es"] VIDEO_MAX_DURATION_SECONDS = 179 MAX_VIDEO_UPLOAD_SIZE_MB = 45 @@ -67,12 +63,39 @@ FFPROBE_TIMEOUT_SECONDS = 15 DEFAULT_BSKY_BASE_URL = "https://bsky.social" SESSION_FILE_PERMISSIONS = 0o600 -TIKTOK_SCROLL_PAUSE_S = 2.5 # pause between scrolls to let videos load -TIKTOK_PAGE_LOAD_WAIT_S = 3.0 # initial wait after profile page loads +TIKTOK_PAGE_LOAD_WAIT_S = 5.0 # increased from 3.0 +TIKTOK_SCROLL_PAUSE_S = 2.5 +TIKTOK_MAX_SCROLLS = 8 # increased from 5 +TIKTOK_BANNER_WAIT_S = 3.0 # wait after dismissing cookie banner + DYNAMIC_ALT_MAX_LENGTH = 150 TRUNCATE_MIN_PREFIX_CHARS = 20 ORPHAN_DIGIT_MAX_DIGITS = 3 +# --- Cookie banner selectors (Spanish + English) --- +GDPR_SELECTORS = [ + 'button:has-text("Permitir todas")', # ← exact text shown on screen + 'button:has-text("Rechazar cookies opcionales")', + 'button:has-text("Entendido")', + 'button:has-text("Aceptar todo")', + 'button:has-text("Accept all")', + 'button:has-text("Got it")', + 'button:has-text("Decline optional")', + '[data-e2e="cookie-banner-accept"]', + '[id*="accept"]', + '[class*="accept-btn"]', +] + +# --- Video grid selectors --- +GRID_SELECTORS = ( + '[data-e2e="user-post-item"], ' + '[class*="DivItemContainerV2"], ' + 'a[href*="/video/"], ' + '[class*="video-feed"], ' + 'div[class*="VideoFeed"], ' + '[class*="DivVideoFeedV2"]' +) + # --- Logging Setup --- logging.basicConfig( format="%(asctime)s [%(levelname)s] %(message)s", @@ -90,7 +113,7 @@ class _RunCache: self.url_validity: dict = {} self.video_hash_owner: dict = {} self.video_url_owner: dict = {} - self.locale: str = "en-US" + self.locale: str = "es-ES" def clear(self): self.url_validity.clear() @@ -115,14 +138,14 @@ class ScrapedMedia: class ScrapedTikTok: """Mirrors ScrapedTweet from twitter2bsky.py.""" def __init__(self, created_on, text, video_url, post_url=None, thumbnail_url=None): - self.created_on = created_on # ISO8601 string or arrow-parseable - self.text = text # caption / description - self.post_url = post_url # https://www.tiktok.com/@user/video/123 + self.created_on = created_on + self.text = text + self.post_url = post_url self.thumbnail_url = thumbnail_url self.media = [ScrapedMedia(video_url, "video")] if video_url else [] -# --- Helpers (shared with twitter2bsky.py pattern) --- +# --- Helpers --- def sha256_file(path, chunk_size=1024 * 1024): h = hashlib.sha256() with open(path, "rb") as f: @@ -184,7 +207,6 @@ def truncate_text_safely(text, max_length=BSKY_TEXT_MAX_LENGTH): def extract_tiktok_video_id(post_url): - """Extract numeric video ID from a TikTok URL.""" if not post_url: return None match = re.search(r"/video/(\d+)", post_url) @@ -192,7 +214,6 @@ def extract_tiktok_video_id(post_url): def canonicalize_tiktok_url(url): - """Normalize TikTok URL to a stable canonical form.""" if not url: return None match = re.search( @@ -251,25 +272,33 @@ def build_text_media_key(normalized_text, media_fingerprint): ).hexdigest() -# --- Bluesky login / retry helpers (identical pattern to twitter2bsky.py) --- +# --- Bluesky login / retry helpers --- def is_rate_limited_error(e): t = repr(e).lower() return "429" in t or "ratelimitexceeded" in t or "too many requests" in t + def is_auth_error(e): t = repr(e).lower() return "401" in t or "403" in t or "invalid identifier" in t + def is_transient_error(e): - signals = ["InvokeTimeoutError","ReadTimeout","WriteTimeout", - "RemoteProtocolError","ConnectError","503","502","504"] + signals = [ + "InvokeTimeoutError", "ReadTimeout", "WriteTimeout", + "RemoteProtocolError", "ConnectError", "503", "502", "504", + ] return any(s in repr(e) for s in signals) + def is_network_error(e): - signals = ["ConnectError","RemoteProtocolError","ReadTimeout", - "WriteTimeout","TimeoutException","503","502","504"] + signals = [ + "ConnectError", "RemoteProtocolError", "ReadTimeout", + "WriteTimeout", "TimeoutException", "503", "502", "504", + ] return any(s in repr(e) for s in signals) + def get_rate_limit_wait_seconds(e, default_delay): try: headers = getattr(e, "headers", None) or {} @@ -302,9 +331,14 @@ def create_bsky_client(base_url, handle, password): raise RuntimeError("Bluesky login failed after all retries.") -# --- State management (identical pattern) --- +# --- State management --- def default_state(): - return {"version": 1, "posted_videos": {}, "posted_by_bsky_uri": {}, "updated_at": None} + return { + "version": 1, + "posted_videos": {}, + "posted_by_bsky_uri": {}, + "updated_at": None, + } def load_state(state_path=STATE_PATH): @@ -373,11 +407,17 @@ def prune_state(state, max_entries=5000): posted = state.get("posted_videos", {}) if len(posted) <= max_entries: return state - sortable = sorted(posted.items(), key=lambda x: x[1].get("posted_at", ""), reverse=True) + sortable = sorted( + posted.items(), + key=lambda x: x[1].get("posted_at", ""), + reverse=True, + ) keep = {k for k, _ in sortable[:max_entries]} state["posted_videos"] = {k: v for k, v in posted.items() if k in keep} state["posted_by_bsky_uri"] = { - uri: k for uri, k in state.get("posted_by_bsky_uri", {}).items() if k in keep + uri: k + for uri, k in state.get("posted_by_bsky_uri", {}).items() + if k in keep } return state @@ -414,12 +454,15 @@ def candidate_matches_existing_bsky(candidate, recent_bsky_posts): for existing in recent_bsky_posts: if candidate["text_media_key"] == existing["text_media_key"]: return True, "bsky:text_media_fingerprint" - if candidate["normalized_text"] and candidate["normalized_text"] == existing["normalized_text"]: + if ( + candidate["normalized_text"] + and candidate["normalized_text"] == existing["normalized_text"] + ): return True, "bsky:normalized_text" return False, None -# --- Upload / blob helpers (same as twitter2bsky.py) --- +# --- Upload / blob helpers --- def upload_blob_with_retry(client, binary_data, media_label="media"): last_exception = None transient_attempts = 0 @@ -430,16 +473,26 @@ def upload_blob_with_retry(client, binary_data, media_label="media"): except Exception as e: last_exception = e if "429" in str(e) or "RateLimitExceeded" in str(e): - wait = min(BSKY_BLOB_UPLOAD_BASE_DELAY * (2 ** (attempt - 1)), BSKY_BLOB_UPLOAD_MAX_DELAY) + wait = min( + BSKY_BLOB_UPLOAD_BASE_DELAY * (2 ** (attempt - 1)), + BSKY_BLOB_UPLOAD_MAX_DELAY, + ) if attempt < BSKY_BLOB_UPLOAD_MAX_RETRIES: - logging.warning(f"⏳ Blob upload rate-limited. Retry {attempt} after {wait}s.") + logging.warning( + f"⏳ Blob upload rate-limited. Retry {attempt} after {wait}s." + ) time.sleep(wait) continue break - if is_transient_error(e) and transient_attempts < BSKY_BLOB_TRANSIENT_ERROR_RETRIES: + if ( + is_transient_error(e) + and transient_attempts < BSKY_BLOB_TRANSIENT_ERROR_RETRIES + ): transient_attempts += 1 wait = BSKY_BLOB_TRANSIENT_ERROR_DELAY * transient_attempts - logging.warning(f"⏳ Transient blob upload error. Retry {transient_attempts} after {wait}s.") + logging.warning( + f"⏳ Transient blob upload error. Retry {transient_attempts} after {wait}s." + ) time.sleep(wait) continue logging.warning(f"Could not upload {media_label}: {repr(e)}") @@ -456,7 +509,10 @@ def send_post_with_retry(client, **kwargs): except Exception as e: last_exception = e if "429" in str(e) or "RateLimitExceeded" in str(e): - wait = min(BSKY_SEND_POST_BASE_DELAY * (2 ** (attempt - 1)), BSKY_SEND_POST_MAX_DELAY) + wait = min( + BSKY_SEND_POST_BASE_DELAY * (2 ** (attempt - 1)), + BSKY_SEND_POST_MAX_DELAY, + ) if attempt < BSKY_SEND_POST_MAX_RETRIES: time.sleep(wait) continue @@ -475,7 +531,9 @@ def get_blob_from_file(file_path, client): return None size_mb = os.path.getsize(file_path) / (1024 * 1024) if size_mb > MAX_VIDEO_UPLOAD_SIZE_MB: - logging.warning(f"File too large: {size_mb:.2f} MB > {MAX_VIDEO_UPLOAD_SIZE_MB} MB") + logging.warning( + f"File too large: {size_mb:.2f} MB > {MAX_VIDEO_UPLOAD_SIZE_MB} MB" + ) return None with open(file_path, "rb") as f: data = f.read() @@ -489,7 +547,9 @@ def build_video_embed(video_blob, alt_text): try: return models.AppBskyEmbedVideo.Main(video=video_blob, alt=alt_text) except AttributeError: - logging.error("❌ atproto version does not support AppBskyEmbedVideo. Upgrade atproto.") + logging.error( + "❌ atproto version does not support AppBskyEmbedVideo. Upgrade atproto." + ) return None @@ -535,30 +595,35 @@ def make_rich(content): # --- TikTok Scraping --- -# --- TikTok Scraping --- -def scrape_tiktoks_via_playwright(target_handle: str, locale: str = "en-US") -> list: +def scrape_tiktoks_via_playwright(target_handle: str, locale: str = "es-ES") -> list: """ Scrape recent TikTok videos from a public profile using Playwright. No login required for public profiles. - Returns a list of ScrapedTikTok objects. Fixes applied: - 1. Aggressive GDPR/consent banner dismissal (Spanish + English) - 2. Stealth headers: timezone, locale, sec-ch-ua, webdriver flag hidden - 3. playwright-stealth applied before navigation - 4. Broader + longer grid selector wait (30s, more selectors) + 1. Aggressive GDPR/cookie banner dismissal — Spanish + English, + waits TIKTOK_BANNER_WAIT_S after click for grid to render. + 2. Stealth headers: Windows Chrome UA, Europe/Madrid timezone, + es-ES locale, sec-ch-ua headers, navigator.webdriver hidden. + 3. playwright-stealth applied before navigation (graceful fallback + if not installed). + 4. Broader grid selector list + 30s timeout + continues with scroll + even if selector times out instead of hard-failing. """ tiktoks = [] profile_url = f"https://www.tiktok.com/@{target_handle.lstrip('@')}" - # playwright-stealth is optional but strongly recommended + # FIX 3 — playwright-stealth (optional but strongly recommended) try: from playwright_stealth import stealth_sync USE_STEALTH = True logging.info("🥷 playwright-stealth available — stealth mode ON") except ImportError: USE_STEALTH = False - logging.warning("⚠️ playwright-stealth not installed — running without stealth") + logging.warning( + "⚠️ playwright-stealth not installed — running without stealth. " + "Run: pip install playwright-stealth" + ) with sync_playwright() as p: browser = p.chromium.launch( @@ -573,7 +638,7 @@ def scrape_tiktoks_via_playwright(target_handle: str, locale: str = "en-US") -> ], ) - # FIX 2 — Fake a real Windows Chrome browser with Spanish locale + Madrid timezone + # FIX 2 — Fake a real Windows Chrome with Spanish locale + Madrid timezone context = browser.new_context( user_agent=( "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " @@ -592,7 +657,10 @@ def scrape_tiktoks_via_playwright(target_handle: str, locale: str = "en-US") -> "Sec-Fetch-Dest": "document", "Sec-Fetch-Mode": "navigate", "Sec-Fetch-Site": "none", - "Sec-Ch-Ua": '"Chromium";v="124", "Google Chrome";v="124", "Not-A.Brand";v="99"', + "Sec-Ch-Ua": ( + '"Chromium";v="124", "Google Chrome";v="124", ' + '"Not-A.Brand";v="99"' + ), "Sec-Ch-Ua-Mobile": "?0", "Sec-Ch-Ua-Platform": '"Windows"', }, @@ -605,7 +673,7 @@ def scrape_tiktoks_via_playwright(target_handle: str, locale: str = "en-US") -> stealth_sync(page) logging.info("🥷 Stealth patches applied.") - # FIX 2 — Hide webdriver flag + fake plugins/languages via init script + # FIX 2 — Hide webdriver flag + fake plugins/languages page.add_init_script(""" Object.defineProperty(navigator, 'webdriver', { get: () => undefined }); Object.defineProperty(navigator, 'plugins', { @@ -630,49 +698,36 @@ def scrape_tiktoks_via_playwright(target_handle: str, locale: str = "en-US") -> logging.info(f"🌐 Navigating to TikTok profile: {profile_url}") page.goto(profile_url, wait_until="domcontentloaded", timeout=40000) - # FIX 1 — Wait longer for initial page render (was 3.0s) - time.sleep(TIKTOK_PAGE_LOAD_WAIT_S + 2) + # FIX 1 — Wait for page to settle before looking for banner + time.sleep(TIKTOK_PAGE_LOAD_WAIT_S) - # FIX 1 — Aggressive GDPR/consent banner dismissal (Spanish + English) - GDPR_SELECTORS = [ - 'button:has-text("Entendido")', - 'button:has-text("Aceptar todo")', - 'button:has-text("Accept all")', - 'button:has-text("Got it")', - 'button:has-text("Decline optional")', - '[data-e2e="cookie-banner-accept"]', - '[id*="accept"]', - '[class*="accept-btn"]', - ] + # FIX 1 — Dismiss cookie/consent banner BEFORE waiting for grid + banner_dismissed = False for selector in GDPR_SELECTORS: try: btn = page.locator(selector).first if btn.is_visible(timeout=3000): btn.click() - logging.info(f"✅ Dismissed banner: {selector}") - time.sleep(2) + logging.info(f"✅ Dismissed cookie banner: {selector}") + time.sleep(TIKTOK_BANNER_WAIT_S) # wait for grid to render + banner_dismissed = True break except Exception: pass - # FIX 4 — Broader selector list + longer timeout (30s, was 20s) - GRID_SELECTORS = ( - '[data-e2e="user-post-item"], ' - '[class*="DivItemContainerV2"], ' - 'a[href*="/video/"], ' - '[class*="video-feed"], ' - 'div[class*="VideoFeed"], ' - '[class*="DivVideoFeedV2"]' - ) + if not banner_dismissed: + logging.info("ℹ️ No cookie banner found — continuing.") + + # FIX 4 — Broader selector + longer timeout (30s) + soft fail try: page.wait_for_selector(GRID_SELECTORS, timeout=30000) logging.info("✅ TikTok video grid detected.") except Exception: - # FIX 4 — Don't give up immediately: try scrolling anyway logging.warning( - "⚠️ Grid selector timed out — attempting scroll anyway " - "(grid may still be partially loaded)" + "⚠️ Grid selector timed out after 30s — " + "attempting scroll anyway (grid may be partially loaded)" ) + take_error_screenshot(page, "tiktok_grid_timeout") # Scroll to load more videos for scroll_i in range(TIKTOK_MAX_SCROLLS): @@ -689,7 +744,10 @@ def scrape_tiktoks_via_playwright(target_handle: str, locale: str = "en-US") -> if not video_links: take_error_screenshot(page, "tiktok_no_video_links") - logging.error("❌ No video links found after scroll. TikTok may be blocking.") + logging.error( + "❌ No video links found after scroll. " + "TikTok may still be blocking — check screenshot." + ) browser.close() return [] @@ -714,7 +772,7 @@ def scrape_tiktoks_via_playwright(target_handle: str, locale: str = "en-US") -> continue seen_urls.add(canonical) - # Try to get caption from the card itself + # Try to get caption from the card caption = "" try: card = link.locator("..").first @@ -763,12 +821,12 @@ def scrape_tiktoks_via_playwright(target_handle: str, locale: str = "en-US") -> logging.info(f"✅ Scraped {len(tiktoks)} TikTok videos.") return tiktoks -# --- Video extraction --- -def extract_tiktok_video_url_isolated(browser, post_url: str, video_id: str = None) -> str | None: + +# --- Video URL extraction --- +def extract_tiktok_video_url_isolated(browser, post_url: str, video_id: str = None): """ Open a single TikTok video page in an isolated context and intercept the actual MP4/HLS stream URL from network responses. - Mirrors extract_video_url_from_tweet_page_isolated() in twitter2bsky.py. """ ctx = None page = None @@ -789,7 +847,6 @@ def extract_tiktok_video_url_isolated(browser, post_url: str, video_id: str = No content_type = (response.headers.get("content-type") or "").lower() url_l = url.lower() - # Skip audio-only and segment files if ".m4s" in url_l or "/aud/" in url_l or "mp4a" in url_l: return @@ -808,11 +865,11 @@ def extract_tiktok_video_url_isolated(browser, post_url: str, video_id: str = No try: ctx = browser.new_context( user_agent=( - "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) " + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " "AppleWebKit/537.36 (KHTML, like Gecko) " - "Chrome/145.0.7632.6 Safari/537.36" + "Chrome/124.0.0.0 Safari/537.36" ), - viewport={"width": 1920, "height": 1080}, + viewport={"width": 1366, "height": 768}, ) page = ctx.new_page() page.on("response", handle_response) @@ -821,7 +878,6 @@ def extract_tiktok_video_url_isolated(browser, post_url: str, video_id: str = No page.goto(post_url, wait_until="domcontentloaded", timeout=40000) time.sleep(2) - # Try clicking the video player to trigger stream loading for selector in ['[data-e2e="video-player"]', "video", '[class*="Video"]']: try: player = page.locator(selector).first @@ -831,7 +887,6 @@ def extract_tiktok_video_url_isolated(browser, post_url: str, video_id: str = No except Exception: pass - # Wait up to 10s for a stream URL to appear for _ in range(10): if current_best(): break @@ -858,11 +913,15 @@ def extract_tiktok_video_url_isolated(browser, post_url: str, video_id: str = No pass -# --- Video download + compress (same ffmpeg pipeline as twitter2bsky.py) --- +# --- Video download + compress --- def _probe_video_duration(file_path): result = subprocess.run( - ["ffprobe", "-v", "error", "-show_entries", "format=duration", - "-of", "default=noprint_wrappers=1:nokey=1", file_path], + [ + "ffprobe", "-v", "error", + "-show_entries", "format=duration", + "-of", "default=noprint_wrappers=1:nokey=1", + file_path, + ], capture_output=True, text=True, timeout=FFPROBE_TIMEOUT_SECONDS, ) if result.returncode != 0: @@ -873,8 +932,7 @@ def _probe_video_duration(file_path): return float(duration_str) -def download_and_crop_video(video_url: str, output_path: str) -> str | None: - """Identical ffmpeg pipeline to twitter2bsky.py.""" +def download_and_crop_video(video_url: str, output_path: str): temp_input = output_path.replace(".mp4", "_source.mp4") temp_trimmed = output_path.replace(".mp4", "_trimmed.mp4") temp_output = output_path.replace(".mp4", "_compressed.mp4") @@ -895,8 +953,10 @@ def download_and_crop_video(video_url: str, output_path: str) -> str | None: "ffmpeg", "-y", "-i", video_url, "-c", "copy", temp_input, ] - result = subprocess.run(download_cmd, capture_output=True, text=True, - timeout=SUBPROCESS_TIMEOUT_SECONDS) + result = subprocess.run( + download_cmd, capture_output=True, text=True, + timeout=SUBPROCESS_TIMEOUT_SECONDS, + ) if result.returncode != 0: logging.error(f"❌ ffmpeg download failed:\n{result.stderr}") return None @@ -914,7 +974,6 @@ def download_and_crop_video(video_url: str, output_path: str) -> str | None: end_time = min(end_time, duration - 0.05) end_time = max(end_time, 0.1) - from moviepy import VideoFileClip video_clip = VideoFileClip(temp_input) try: if hasattr(video_clip, "subclipped"): @@ -923,8 +982,13 @@ def download_and_crop_video(video_url: str, output_path: str) -> str | None: cropped = video_clip.subclip(0, end_time) try: cropped.write_videofile( - temp_trimmed, codec="libx264", audio_codec="aac", - preset="veryfast", bitrate="1800k", audio_bitrate="128k", logger=None, + temp_trimmed, + codec="libx264", + audio_codec="aac", + preset="veryfast", + bitrate="1800k", + audio_bitrate="128k", + logger=None, ) finally: cropped.close() @@ -943,8 +1007,10 @@ def download_and_crop_video(video_url: str, output_path: str) -> str | None: "-c:a", "aac", "-b:a", "128k", "-movflags", "+faststart", temp_output, ] - result = subprocess.run(compress_cmd, capture_output=True, text=True, - timeout=SUBPROCESS_TIMEOUT_SECONDS) + result = subprocess.run( + compress_cmd, capture_output=True, text=True, + timeout=SUBPROCESS_TIMEOUT_SECONDS, + ) if result.returncode != 0: logging.error(f"❌ ffmpeg compression failed:\n{result.stderr}") return None @@ -985,7 +1051,7 @@ def sync_feeds(args): tiktoks = scrape_tiktoks_via_playwright( args.tiktok_handle, - locale=bsky_langs[0] if bsky_langs else "en-US", + locale=bsky_langs[0] if bsky_langs else "es-ES", ) if not tiktoks: @@ -1004,14 +1070,10 @@ def sync_feeds(args): bsky_client, args.bsky_handle, limit=DEDUPE_BSKY_LIMIT, ) - too_old_cutoff = arrow.utcnow().shift(days=-VIDEO_MAX_AGE_DAYS) - # --- Build candidates --- candidates = [] for tiktok in reversed(tiktoks): try: - # TikTok grid doesn't expose timestamps reliably — - # use state-based dedup as primary guard canonical_url = canonicalize_tiktok_url(tiktok.post_url) if canonical_url and canonical_url in state.get("posted_videos", {}): logging.info(f"⚡ Early skip (already in state): {canonical_url}") @@ -1037,12 +1099,18 @@ def sync_feeds(args): is_dup_state, reason = candidate_matches_state(candidate, state) if is_dup_state: - logging.info(f"⏭️ Skipping (state duplicate: {reason}): {canonical_url}") + logging.info( + f"⏭️ Skipping (state duplicate: {reason}): {canonical_url}" + ) continue - is_dup_bsky, reason = candidate_matches_existing_bsky(candidate, recent_bsky_posts) + is_dup_bsky, reason = candidate_matches_existing_bsky( + candidate, recent_bsky_posts + ) if is_dup_bsky: - logging.info(f"⏭️ Skipping (Bluesky duplicate: {reason}): {canonical_url}") + logging.info( + f"⏭️ Skipping (Bluesky duplicate: {reason}): {canonical_url}" + ) continue candidates.append(candidate) @@ -1050,7 +1118,9 @@ def sync_feeds(args): except Exception as e: logging.warning(f"⚠️ Failed to prepare candidate: {e}") - logging.info(f"📬 {len(candidates)} new TikTok videos to post after dedup.") + logging.info( + f"📬 {len(candidates)} new TikTok videos to post after dedup." + ) if not candidates: logging.info("✅ Nothing new to post.") @@ -1085,14 +1155,16 @@ def sync_feeds(args): if dry_run: logging.info(f" 📄 Caption: {raw_text[:200]}") - remember_posted_video(state, candidate, bsky_uri=f"dry-run:{arrow.utcnow().isoformat()}") + remember_posted_video( + state, candidate, + bsky_uri=f"dry-run:{arrow.utcnow().isoformat()}", + ) save_state(state, STATE_PATH) new_posts += 1 continue real_video_url = candidate.get("resolved_video_url") video_embed = None - video_blob = None if real_video_url: temp_base = make_unique_video_temp_base(tiktok.post_url) @@ -1104,7 +1176,9 @@ def sync_feeds(args): candidate["resolved_video_hash"] = video_hash owner = _cache.video_hash_owner.get(video_hash) if owner and owner != candidate["video_id"]: - logging.warning(f"⚠️ Video hash owned by another video. Skipping.") + logging.warning( + "⚠️ Video hash owned by another video. Skipping." + ) else: _cache.video_hash_owner[video_hash] = candidate["video_id"] video_blob = get_blob_from_file(cropped_path, bsky_client) @@ -1117,23 +1191,29 @@ def sync_feeds(args): remove_file_quietly(f"{temp_base}_trimmed.mp4") remove_file_quietly(f"{temp_base}_compressed.mp4") else: - logging.warning(f"⚠️ Could not resolve video URL for {tiktok.post_url}") + logging.warning( + f"⚠️ Could not resolve video URL for {tiktok.post_url}" + ) try: rich_text = make_rich(raw_text) if video_embed: post_result = send_post_with_retry( - bsky_client, text=rich_text, embed=video_embed, langs=bsky_langs, + bsky_client, + text=rich_text, + embed=video_embed, + langs=bsky_langs, ) post_mode = "video" else: - # Fallback: post caption as text-only with link to TikTok fallback_text = make_rich( f"{raw_text}\n\n{tiktok.post_url}".strip() ) post_result = send_post_with_retry( - bsky_client, text=fallback_text, langs=bsky_langs, + bsky_client, + text=fallback_text, + langs=bsky_langs, ) post_mode = "text_only_fallback" @@ -1151,7 +1231,9 @@ def sync_feeds(args): recent_bsky_posts = recent_bsky_posts[:DEDUPE_BSKY_LIMIT] new_posts += 1 - logging.info(f"✅ Posted TikTok to Bluesky [{post_mode}]: {raw_text[:80]}") + logging.info( + f"✅ Posted TikTok to Bluesky [{post_mode}]: {raw_text[:80]}" + ) time.sleep(5) except Exception as e: @@ -1167,19 +1249,34 @@ def main(): load_dotenv() parser = argparse.ArgumentParser(description="TikTok to Bluesky Sync") - parser.add_argument("--tiktok-handle", help="TikTok account handle to scrape (without @)") + parser.add_argument( + "--tiktok-handle", + help="TikTok account handle to scrape (without @)", + ) parser.add_argument("--bsky-handle", help="Your Bluesky handle") parser.add_argument("--bsky-password", help="Your Bluesky app password") - parser.add_argument("--bsky-base-url", help="Bluesky PDS base URL", default=None) - parser.add_argument("--bsky-langs", help="Comma-separated language codes", default=None) + parser.add_argument( + "--bsky-base-url", + help="Bluesky PDS base URL", + default=None, + ) + parser.add_argument( + "--bsky-langs", + help="Comma-separated language codes (e.g. es,en)", + default=None, + ) parser.add_argument("--dry-run", action="store_true", default=False) args = parser.parse_args() args.tiktok_handle = args.tiktok_handle or os.getenv("TIKTOK_HANDLE") - args.bsky_handle = args.bsky_handle or os.getenv("BSKY_HANDLE") + args.bsky_handle = args.bsky_handle or os.getenv("BSKY_HANDLE") args.bsky_password = args.bsky_password or os.getenv("BSKY_APP_PASSWORD") - args.bsky_base_url = args.bsky_base_url or os.getenv("BSKY_BASE_URL") or DEFAULT_BSKY_BASE_URL + args.bsky_base_url = ( + args.bsky_base_url + or os.getenv("BSKY_BASE_URL") + or DEFAULT_BSKY_BASE_URL + ) raw_langs = args.bsky_langs or os.getenv("BSKY_LANGS") args.bsky_langs = ( @@ -1195,7 +1292,7 @@ def main(): if not args.bsky_password: missing.append("--bsky-password / BSKY_APP_PASSWORD") if missing: - logging.error(f"❌ Missing: {', '.join(missing)}") + logging.error(f"❌ Missing required arguments: {', '.join(missing)}") return logging.info(f"🤖 TikTok→Bluesky bot started. Scraping @{args.tiktok_handle}")