# ─────────────────────────────────────────────────────────────────────────────
# playwright-stealth: detect installed version
#
# Only the v1.x entry point (``stealth_sync``) is used. If it cannot be
# imported, stealth is skipped and the browser launch args are relied on
# instead. An ImportError here has two possible causes, which are logged
# separately so the runtime log does not claim "v2.x detected" when the
# package is simply missing:
#   • playwright-stealth is not installed at all
#   • a release without ``stealth_sync`` is installed (presumably v2.x —
#     NOTE(review): confirm against the pinned playwright-stealth version)
# ─────────────────────────────────────────────────────────────────────────────
_STEALTH_SYNC = None  # holds the stealth_sync callable when v1.x is importable

try:
    from playwright_stealth import stealth_sync as _stealth_sync_import
except ImportError:
    # Function-free local import: only needed on this failure path.
    import importlib.util

    if importlib.util.find_spec("playwright_stealth") is None:
        logging.getLogger(__name__).debug(
            "playwright-stealth not installed — stealth disabled"
        )
    else:
        logging.getLogger(__name__).debug(
            "playwright-stealth installed without stealth_sync (v2.x?) — stealth skipped"
        )
else:
    # Keep the try body minimal: only the import itself can raise ImportError.
    _STEALTH_SYNC = _stealth_sync_import
    logging.getLogger(__name__).debug("playwright-stealth v1.x detected (stealth_sync)")
get_video_size_limit(bsky_base_url: str) -> int: """ @@ -165,7 +163,6 @@ def save_state(state: dict): for old_key in sorted_keys[: len(posted) - STATE_MAX_ENTRIES]: del posted[old_key] state["posted"] = posted - try: with open(STATE_FILE, "w", encoding="utf-8") as f: json.dump(state, f, indent=2, ensure_ascii=False) @@ -189,7 +186,6 @@ def mark_as_posted(video_id: str, state: dict, meta: dict = None): # Cookie helpers # ───────────────────────────────────────────────────────────────────────────── def load_cookies_from_file(path: str) -> list: - """Load cookies from a JSON file.""" if not os.path.exists(path): logging.warning(f"⚠️ Cookie file not found: {path}") return [] @@ -204,7 +200,6 @@ def load_cookies_from_file(path: str) -> list: def inject_cookies_into_context(context, cookies: list): - """Inject a list of cookie dicts into a Playwright browser context.""" if not cookies: return playwright_cookies = [] @@ -224,7 +219,9 @@ def inject_cookies_into_context(context, cookies: list): playwright_cookies.append(entry) try: context.add_cookies(playwright_cookies) - logging.info(f"🍪 Injected {len(playwright_cookies)} cookies into browser context.") + logging.info( + f"🍪 Injected {len(playwright_cookies)} cookies into browser context." + ) except Exception as e: logging.warning(f"⚠️ Could not inject cookies: {e}") @@ -232,25 +229,16 @@ def inject_cookies_into_context(context, cookies: list): def convert_json_cookies_to_netscape(json_path: str) -> str | None: """ Convert a JSON cookie file (browser extension format) to a Netscape - cookie file that yt-dlp can consume. - - Returns the path to a temporary Netscape file, or None on failure. - The caller is responsible for deleting the file when done. - - Netscape format columns (tab-separated): - domain include_subdomains path secure expiry name value + cookie file that yt-dlp can consume. Returns temp file path or None. + Caller must delete the file when done. 
def _bsky_error_text(error_obj) -> str:
    """Return the lowercase ``repr()`` of *error_obj* for substring matching."""
    return repr(error_obj).lower()


def is_auth_error(error_obj) -> bool:
    """
    Report whether *error_obj* looks like an authentication failure.

    The check is a case-insensitive substring search over ``repr(error_obj)``;
    any single marker is enough to classify the error as an auth error.
    """
    haystack = _bsky_error_text(error_obj)
    auth_markers = (
        "401",
        "403",
        "invalid identifier",
        "invalid password",
        "authenticationrequired",
        "invalidtoken",
        "expiredtoken",
        "accounttakedown",
        "invalid identifier or password",
    )
    return any(marker in haystack for marker in auth_markers)
def is_transient_error(error_obj) -> bool:
    """
    Report whether *error_obj* looks like a transient (retry-worthy) failure.

    The check is a case-sensitive substring search over ``repr(error_obj)``
    for timeout / protocol / connection class names and 502/503/504 codes.
    """
    rendered = repr(error_obj)
    transient_markers = (
        "InvokeTimeoutError",
        "ReadTimeout",
        "WriteTimeout",
        "TimeoutException",
        "RemoteProtocolError",
        "ConnectError",
        "503",
        "502",
        "504",
    )
    for marker in transient_markers:
        if marker in rendered:
            return True
    return False
repr() string fallback ───────────────────────────────────────── text = repr(error_obj) for pattern, is_ts in [ (r"['\"]retry-after['\"]\s*:\s*['\"](\d+)['\"]", False), @@ -392,34 +364,29 @@ def get_rate_limit_wait_seconds(error_obj, default_delay: float) -> float: # ───────────────────────────────────────────────────────────────────────────── -# Bluesky client — improved login (ported from twitter2bsky.py) +# Bluesky client — robust login (ported from twitter2bsky.py) # ───────────────────────────────────────────────────────────────────────────── def connect_bluesky(handle: str, app_password: str, base_url: str) -> Client: """ - Authenticate with Bluesky with full retry logic ported from twitter2bsky.py: - + Authenticate with Bluesky with full retry logic: • 429 / rate-limit → honour Retry-After header; wait up to 600s • auth errors → fail immediately (retrying won't help) • network/transient → exponential backoff with jitter • other errors → exponential backoff with jitter - • exhausted retries → raise so Jenkins marks the build FAILURE """ logging.info(f"🔐 Connecting Bluesky client → {base_url}") - client = Client(base_url=base_url) - - attempt = 0 - last_error = None + client = Client(base_url=base_url) + attempt = 0 + last_error = None while attempt < BSKY_LOGIN_MAX_RETRIES: attempt += 1 logging.info( - f"🔐 Bluesky login attempt {attempt}/{BSKY_LOGIN_MAX_RETRIES} " - f"for {handle}" + f"🔐 Bluesky login attempt {attempt}/{BSKY_LOGIN_MAX_RETRIES} for {handle}" ) try: client.login(handle, app_password) - # Fetch profile to confirm the session is fully live client.me = client.get_profile(handle) logging.info(f"✅ Bluesky login successful as {handle}") return client @@ -428,14 +395,14 @@ def connect_bluesky(handle: str, app_password: str, base_url: str) -> Client: last_error = e err_detail = f"{type(e).__name__}: {e}" - # ── Auth errors: no point retrying ─────────────────────────── + # Auth errors — no point retrying if is_auth_error(e): logging.error( f"❌ Bluesky 
login auth error (will not retry): {err_detail}" ) raise - # ── Rate-limited (429) ──────────────────────────────────────── + # Rate-limited (429) if is_rate_limited_error(e): raw_wait = get_rate_limit_wait_seconds(e, BSKY_LOGIN_RATE_LIMIT_DELAY) jitter = random.uniform(0.0, BSKY_LOGIN_JITTER_MAX) @@ -449,7 +416,7 @@ def connect_bluesky(handle: str, app_password: str, base_url: str) -> Client: time.sleep(wait) continue - # ── Network / transient errors ──────────────────────────────── + # Network / transient errors if is_network_error(e) or is_transient_error(e): delay = min( BSKY_LOGIN_BASE_DELAY * (2 ** (attempt - 1)), @@ -466,7 +433,7 @@ def connect_bluesky(handle: str, app_password: str, base_url: str) -> Client: time.sleep(wait) continue - # ── Unknown errors ──────────────────────────────────────────── + # Unknown errors delay = min( BSKY_LOGIN_BASE_DELAY * (2 ** (attempt - 1)), BSKY_LOGIN_MAX_DELAY, @@ -486,8 +453,7 @@ def connect_bluesky(handle: str, app_password: str, base_url: str) -> Client: f"Last error: {type(last_error).__name__}: {last_error}" ) raise RuntimeError( - f"Bluesky login failed after {BSKY_LOGIN_MAX_RETRIES} attempts: " - f"{last_error}" + f"Bluesky login failed after {BSKY_LOGIN_MAX_RETRIES} attempts: {last_error}" ) @@ -495,7 +461,6 @@ def connect_bluesky(handle: str, app_password: str, base_url: str) -> Client: # Video helpers # ───────────────────────────────────────────────────────────────────────────── def get_video_duration(path: str) -> float: - """Return video duration in seconds via ffprobe, or 0.0 on failure.""" try: result = subprocess.run( [ @@ -504,9 +469,7 @@ def get_video_duration(path: str) -> float: "-of", "default=noprint_wrappers=1:nokey=1", path, ], - capture_output=True, - text=True, - timeout=15, + capture_output=True, text=True, timeout=15, ) return float(result.stdout.strip()) except Exception as e: @@ -520,29 +483,18 @@ def compress_video( max_duration: int = VIDEO_MAX_DURATION_S, max_size_bytes: int = None, ) -> 
bool: - """ - Re-encode input_path → output_path using libx264, targeting max_size_bytes. - - Fixes applied: - • pad=ceil(iw/2)*2:ceil(ih/2)*2 — ensures even dimensions (libx264 requirement) - • -maxrate == -b:v — hard ceiling, no burst above target - • post-encode size guard — rejects file if still over limit - """ if max_size_bytes is None: max_size_bytes = 20 * 1024 * 1024 try: duration = get_video_duration(input_path) - if duration <= 0: logging.error( - f"❌ compress_video: invalid duration={duration} " - f"for {input_path} ({os.path.getsize(input_path)} bytes)" + f"❌ compress_video: invalid duration={duration} for {input_path}" ) return False - trim_to = min(duration, max_duration) - + trim_to = min(duration, max_duration) target_bits = max_size_bytes * 8 * 0.85 total_kbps = int(target_bits / trim_to / 1000) audio_kbps = 96 @@ -580,12 +532,11 @@ def compress_video( return False final_size = os.path.getsize(output_path) - if final_size > max_size_bytes: logging.error( f"❌ Compressed file still too large: " f"{final_size / 1024 / 1024:.1f} MB > " - f"{max_size_bytes / 1024 / 1024:.0f} MB limit. Skipping." + f"{max_size_bytes / 1024 / 1024:.0f} MB. Skipping." ) return False @@ -604,23 +555,65 @@ def compress_video( # ───────────────────────────────────────────────────────────────────────────── def get_best_impersonation_target() -> str | None: """ - Dynamically select the best available curl_cffi impersonation target. - Returns None if curl_cffi is not installed or no target is available. + Ask yt-dlp directly which impersonation targets are actually available + in the current environment. This is the only reliable method — + curl_cffi's BrowserType enum values change between versions and do not + map 1:1 to yt-dlp's target names. + + Returns the best available target string, or None if none are available. 
""" try: - from curl_cffi.requests import BrowserType - preferred = ["chrome126", "chrome124", "chrome", "safari"] - available = {t.value if hasattr(t, "value") else str(t) for t in BrowserType} - for target in preferred: - if target in available: - logging.info(f"🎭 yt-dlp impersonation target: {target}") - return target - if available: - target = sorted(available)[0] - logging.info(f"🎭 yt-dlp impersonation target (fallback): {target}") - return target + import yt_dlp + # yt-dlp exposes available impersonation targets via + # ImpersonateTarget.supported_targets() in newer builds, + # or via YoutubeDL._impersonate_target_key in older ones. + # The safest cross-version approach is to instantiate a YoutubeDL + # object with quiet=True and inspect _impersonate_targets. + with yt_dlp.YoutubeDL({"quiet": True, "no_warnings": True}) as ydl: + # _impersonate_targets is a dict of {ImpersonateTarget: handler} + targets = getattr(ydl, "_impersonate_targets", None) + if not targets: + logging.warning( + "⚠️ yt-dlp: no impersonation targets available in this environment." 
+ ) + return None + + # Convert to string representations and pick the best one + preferred = ["chrome", "safari", "firefox", "edge"] + available_strs = [] + for t in targets.keys(): + # ImpersonateTarget has .client and optionally .version + client = getattr(t, "client", None) or str(t) + version = getattr(t, "version", None) + label = f"{client}-{version}" if version else str(client) + available_strs.append((label.lower(), t)) + + logging.info( + f"🎭 yt-dlp available impersonation targets: " + f"{[s for s, _ in available_strs]}" + ) + + # Pick highest-versioned chrome first, then others + chrome_targets = sorted( + [(s, t) for s, t in available_strs if "chrome" in s], + key=lambda x: x[0], + reverse=True, + ) + if chrome_targets: + best_label, best_target = chrome_targets[0] + logging.info(f"🎭 Selected impersonation target: {best_label}") + return best_target # return the actual ImpersonateTarget object + + # Fallback to any available target + best_label, best_target = available_strs[0] + logging.info(f"🎭 Selected impersonation target (fallback): {best_label}") + return best_target + except Exception as e: - logging.warning(f"⚠️ Could not check impersonation targets: {e}") + logging.warning( + f"⚠️ Could not determine yt-dlp impersonation targets: " + f"{type(e).__name__}: {e}" + ) return None @@ -629,10 +622,6 @@ def download_video_ytdlp( output_path: str, netscape_cookies_path: str = None, ) -> bool: - """ - Download a TikTok video using yt-dlp with browser impersonation. - Accepts a Netscape-format cookie file path (not JSON). 
- """ impersonate = get_best_impersonation_target() ydl_opts = { @@ -646,7 +635,7 @@ def download_video_ytdlp( if netscape_cookies_path and os.path.exists(netscape_cookies_path): ydl_opts["cookiefile"] = netscape_cookies_path - if impersonate: + if impersonate is not None: ydl_opts["impersonate"] = impersonate try: @@ -658,17 +647,15 @@ def download_video_ytdlp( size_mb = os.path.getsize(output_path) / 1024 / 1024 logging.info(f"✅ yt-dlp download OK: {size_mb:.1f} MB") return True - else: - logging.warning( - f"⚠️ yt-dlp output too small or missing: {output_path} " - f"({os.path.getsize(output_path) if os.path.exists(output_path) else 0} bytes)" - ) - return False + + logging.warning( + f"⚠️ yt-dlp output too small or missing: {output_path} " + f"({os.path.getsize(output_path) if os.path.exists(output_path) else 0} bytes)" + ) + return False except Exception as e: - logging.error( - f"❌ yt-dlp download failed for {url}: {type(e).__name__}: {e}" - ) + logging.error(f"❌ yt-dlp download failed for {url}: {type(e).__name__}: {e}") return False @@ -677,7 +664,6 @@ def download_video( output_path: str, netscape_cookies_path: str = None, ) -> bool: - """Download a TikTok video via yt-dlp with browser impersonation.""" logging.info(f"⬇️ Downloading: {url}") return download_video_ytdlp(url, output_path, netscape_cookies_path=netscape_cookies_path) @@ -690,10 +676,6 @@ def upload_video_to_bluesky( video_path: str, video_id: str, ) -> object | None: - """ - Upload a video file to Bluesky as a blob. - All exceptions logged as type(e).__name__: e for full visibility. 
- """ size_mb = os.path.getsize(video_path) / 1024 / 1024 logging.info(f"⬆️ Uploading to Bluesky ({size_mb:.1f} MB)...") @@ -738,19 +720,12 @@ def post_video_to_bluesky( langs: list[str], video_id: str, ) -> bool: - """Create a Bluesky post embedding the uploaded video blob.""" from atproto import models - try: video_embed = models.AppBskyEmbedVideo.Main(video=blob) - client.send_post( - text=caption, - embed=video_embed, - langs=langs, - ) + client.send_post(text=caption, embed=video_embed, langs=langs) logging.info(f"✅ Posted video {video_id} to Bluesky.") return True - except Exception as e: logging.error( f"❌ Failed to post video {video_id} to Bluesky: " @@ -763,7 +738,6 @@ def post_video_to_bluesky( # TikTok scraping — Playwright # ───────────────────────────────────────────────────────────────────────────── def dismiss_overlays(page) -> None: - """Try to dismiss cookie banners and modal overlays.""" all_sels = TIKTOK_COOKIE_MODAL_SELS + TIKTOK_BANNER_SELS for sel in all_sels: try: @@ -777,10 +751,7 @@ def dismiss_overlays(page) -> None: def _run_playwright_scrape_loop(page, profile_url: str, limit: int) -> list[dict]: - """ - Inner scraping loop shared by both the stealth and no-stealth paths. - Returns a list of video dicts. 
- """ + """Inner scraping loop — shared by stealth and no-stealth paths.""" videos = [] for attempt in range(1, PLAYWRIGHT_MAX_RELOADS + 1): @@ -798,8 +769,7 @@ def _run_playwright_scrape_loop(page, profile_url: str, limit: int) -> list[dict try: page.wait_for_selector( - TIKTOK_VIDEO_GRID_SEL, - timeout=PLAYWRIGHT_TIMEOUT_MS, + TIKTOK_VIDEO_GRID_SEL, timeout=PLAYWRIGHT_TIMEOUT_MS ) except Exception: pass @@ -844,8 +814,7 @@ def _run_playwright_scrape_loop(page, profile_url: str, limit: int) -> list[dict except Exception as e: logging.warning( - f"⚠️ Playwright attempt {attempt} error: " - f"{type(e).__name__}: {e}" + f"⚠️ Playwright attempt {attempt} error: {type(e).__name__}: {e}" ) ts = int(time.time()) try: @@ -865,10 +834,10 @@ def scrape_tiktok_profile_playwright( """ Scrape the most recent video URLs from a TikTok profile page using Playwright. - Stealth handling: - v1.x → stealth_sync(page) after new_page() - v2.x → Stealth() used as context manager; page created inside it - none → plain page, no stealth + Stealth strategy: + v1.x → stealth_sync(page) after new_page() — works reliably + v2.x → skipped entirely; v2.0.x API is unstable across patch versions. + Browser launch args provide equivalent bot-detection evasion. 
""" profile_url = f"https://www.tiktok.com/@{handle}" logging.info(f"🕷️ Scraping TikTok profile: {profile_url}") @@ -883,6 +852,8 @@ def scrape_tiktok_profile_playwright( "--disable-blink-features=AutomationControlled", "--no-sandbox", "--disable-setuid-sandbox", + "--disable-web-security", + "--disable-features=IsolateOrigins,site-per-process", ], ) context = browser.new_context( @@ -893,45 +864,32 @@ def scrape_tiktok_profile_playwright( ), viewport={"width": 1280, "height": 900}, locale="es-ES", + # Mask automation signals at the context level + extra_http_headers={ + "Accept-Language": "es-ES,es;q=0.9,en;q=0.8", + }, ) inject_cookies_into_context(context, cookies) + page = context.new_page() - # ── Stealth v2.x — page must be created inside the context manager ── - if _STEALTH_V2 is True: + # Apply stealth v1.x if available; skip v2.x entirely + if _STEALTH_SYNC is not None: try: - stealth_instance = Stealth() - with stealth_instance(context) as stealthy_context: - page = stealthy_context.new_page() - logging.info("🥷 playwright-stealth v2.x applied (context manager).") - videos = _run_playwright_scrape_loop(page, profile_url, limit) - except Exception as e: - logging.warning( - f"⚠️ playwright-stealth v2.x failed: {type(e).__name__}: {e}. " - f"Retrying without stealth." - ) - # Fall through to no-stealth path below - page = context.new_page() - videos = _run_playwright_scrape_loop(page, profile_url, limit) - - # ── Stealth v1.x ────────────────────────────────────────────────── - elif _STEALTH_V2 is False: - page = context.new_page() - try: - stealth_sync(page) - logging.info("🥷 playwright-stealth v1.x applied (stealth_sync).") + _STEALTH_SYNC(page) + logging.info("🥷 playwright-stealth v1.x applied.") except Exception as e: logging.warning( f"⚠️ playwright-stealth v1.x failed: {type(e).__name__}: {e}. " f"Continuing without stealth." 
def build_caption(video_info: dict, tiktok_handle: str, max_len: int = 290) -> str:
    """
    Build a post caption of the form ``"<description>\\n<url>"``.

    Args:
        video_info: Video metadata; the ``"description"`` and ``"url"`` keys
            are read, and a missing or None value is treated as empty.
        tiktok_handle: Not used by this function; kept so existing callers
            keep working.
        max_len: Maximum caption length in characters (``len()`` units).

    Returns:
        The description (truncated with "…" if needed) followed by the URL.
        Only the URL is returned when there is no description or when the
        URL leaves no room for any description text. Only the (possibly
        truncated) description is returned when there is no URL.
    """
    desc = (video_info.get("description") or "").strip()
    url = video_info.get("url") or ""

    if not desc:
        return url

    if not url:
        # No link to append: avoid emitting a trailing newline.
        if len(desc) > max_len:
            desc = desc[: max(max_len - 1, 0)] + "…"
        return desc

    # Reserve room for the URL plus the newline separating it from the text.
    max_desc = max_len - (len(url) + 1)
    if max_desc < 1:
        # A non-positive budget would make the slice below negative and
        # produce a caption longer than max_len; the link takes priority.
        return url

    if len(desc) > max_desc:
        desc = desc[: max_desc - 1] + "…"
    return f"{desc}\n{url}"
Download ok = download_video( - video_url, - raw_path, + video_url, raw_path, netscape_cookies_path=netscape_cookies_path, ) if not ok: @@ -1105,11 +1053,7 @@ def process_videos( continue # 2. Compress - ok = compress_video( - raw_path, - comp_path, - max_size_bytes=video_max_size_bytes, - ) + ok = compress_video(raw_path, comp_path, max_size_bytes=video_max_size_bytes) if not ok: logging.error(f"❌ Compression failed for {video_id}. Skipping.") continue @@ -1142,25 +1086,19 @@ def parse_args() -> argparse.Namespace: parser.add_argument("--bsky-handle", required=True) parser.add_argument("--bsky-app-password", required=True) parser.add_argument( - "--bsky-base-url", - default=DEFAULT_BSKY_BASE_URL, + "--bsky-base-url", default=DEFAULT_BSKY_BASE_URL, help=f"Bluesky PDS base URL (default: {DEFAULT_BSKY_BASE_URL})", ) parser.add_argument( - "--bsky-langs", - nargs="+", - default=DEFAULT_BSKY_LANGS, + "--bsky-langs", nargs="+", default=DEFAULT_BSKY_LANGS, help="BCP-47 language tags for posts (default: es)", ) parser.add_argument( - "--cookies-path", - default=TIKTOK_COOKIES_PATH, + "--cookies-path", default=TIKTOK_COOKIES_PATH, help=f"Path to TikTok cookies JSON (default: {TIKTOK_COOKIES_PATH})", ) parser.add_argument( - "--max-age-days", - type=int, - default=VIDEO_MAX_AGE_DAYS, + "--max-age-days", type=int, default=VIDEO_MAX_AGE_DAYS, help=f"Skip videos older than N days (default: {VIDEO_MAX_AGE_DAYS})", ) return parser.parse_args() @@ -1183,14 +1121,8 @@ def main(): logging.info(f" Cookie file : {args.cookies_path} ({cookie_status})") logging.info("=" * 60) - state = load_state() - - # Connect to Bluesky - client = connect_bluesky( - args.bsky_handle, - args.bsky_app_password, - args.bsky_base_url, - ) + state = load_state() + client = connect_bluesky(args.bsky_handle, args.bsky_app_password, args.bsky_base_url) # Convert JSON cookies → Netscape format once for all yt-dlp calls netscape_cookies_path = convert_json_cookies_to_netscape(args.cookies_path) @@ -1207,9 
+1139,7 @@ def main(): cookies = load_cookies_from_file(args.cookies_path) videos = scrape_tiktok_profile_playwright( - args.tiktok_handle, - cookies, - limit=SCRAPE_VIDEO_LIMIT, + args.tiktok_handle, cookies, limit=SCRAPE_VIDEO_LIMIT, ) if not videos: @@ -1248,7 +1178,6 @@ def main(): logging.info("=" * 60) finally: - # Always clean up the temporary Netscape cookie file if netscape_cookies_path and os.path.exists(netscape_cookies_path): try: os.remove(netscape_cookies_path)