This commit is contained in:
Guillem Hernandez Sola
2026-05-20 09:01:37 +02:00
parent 33412c7469
commit 04384ec91c

View File

@@ -25,7 +25,6 @@ import subprocess
import sys
import tempfile
import time
from datetime import datetime, timezone
from pathlib import Path
import arrow
@@ -37,18 +36,19 @@ from playwright.sync_api import sync_playwright
# ─────────────────────────────────────────────────────────────────────────────
# playwright-stealth: detect installed version
# v2.x (2.0.x) has a completely unstable API — we skip stealth for it and
# rely on browser launch args instead. v1.x stealth_sync works fine.
# ─────────────────────────────────────────────────────────────────────────────
# Tri-state marker describing which playwright-stealth flavour is importable:
#   None  -> playwright-stealth is not importable at all
#   False -> v1.x is installed (stealth_sync is available and will be used)
#   True  -> only the v2.x API (Stealth class) is installed; it is detected
#            but deliberately NOT used because its API is unstable
_STEALTH_V2 = None
# Holds the v1.x stealth_sync callable, or None when v1.x is not present.
_STEALTH_SYNC = None
try:
    from playwright_stealth import stealth_sync as _stealth_sync_import
except ImportError:
    # No stealth_sync: either v2.x is installed or the package is missing.
    # Probe for the v2.x entry point only to tell those two cases apart.
    try:
        from playwright_stealth import Stealth  # noqa: F401
    except ImportError:
        # Package is not installed at all; both markers stay None.
        pass
    else:
        # v2.x is installed but skipped on purpose; browser launch args are
        # relied on for bot-detection evasion instead.
        _STEALTH_V2 = True
else:
    _STEALTH_SYNC = _stealth_sync_import
    _STEALTH_V2 = False
    logging.getLogger(__name__).debug("playwright-stealth v1.x detected (stealth_sync)")
# ─────────────────────────────────────────────────────────────────────────────
@@ -120,12 +120,10 @@ TIKTOK_COOKIE_MODAL_SELS = [
'[class*="cookie"] button',
'[id*="cookie"] button',
]
TIKTOK_GRID_ERROR_SEL = '[data-e2e="user-post-item-list-error"]'
TIKTOK_REFRESH_BTN_SEL = 'button:has-text("Actualizar"), button:has-text("Refresh")'
# ─────────────────────────────────────────────────────────────────────────────
# Fix 2 — Dynamic video size limit based on PDS
# Dynamic video size limit based on PDS
# ─────────────────────────────────────────────────────────────────────────────
def get_video_size_limit(bsky_base_url: str) -> int:
"""
@@ -165,7 +163,6 @@ def save_state(state: dict):
for old_key in sorted_keys[: len(posted) - STATE_MAX_ENTRIES]:
del posted[old_key]
state["posted"] = posted
try:
with open(STATE_FILE, "w", encoding="utf-8") as f:
json.dump(state, f, indent=2, ensure_ascii=False)
@@ -189,7 +186,6 @@ def mark_as_posted(video_id: str, state: dict, meta: dict = None):
# Cookie helpers
# ─────────────────────────────────────────────────────────────────────────────
def load_cookies_from_file(path: str) -> list:
"""Load cookies from a JSON file."""
if not os.path.exists(path):
logging.warning(f"⚠️ Cookie file not found: {path}")
return []
@@ -204,7 +200,6 @@ def load_cookies_from_file(path: str) -> list:
def inject_cookies_into_context(context, cookies: list):
"""Inject a list of cookie dicts into a Playwright browser context."""
if not cookies:
return
playwright_cookies = []
@@ -224,7 +219,9 @@ def inject_cookies_into_context(context, cookies: list):
playwright_cookies.append(entry)
try:
context.add_cookies(playwright_cookies)
logging.info(f"🍪 Injected {len(playwright_cookies)} cookies into browser context.")
logging.info(
f"🍪 Injected {len(playwright_cookies)} cookies into browser context."
)
except Exception as e:
logging.warning(f"⚠️ Could not inject cookies: {e}")
@@ -232,25 +229,16 @@ def inject_cookies_into_context(context, cookies: list):
def convert_json_cookies_to_netscape(json_path: str) -> str | None:
"""
Convert a JSON cookie file (browser extension format) to a Netscape
cookie file that yt-dlp can consume.
Returns the path to a temporary Netscape file, or None on failure.
The caller is responsible for deleting the file when done.
Netscape format columns (tab-separated):
domain include_subdomains path secure expiry name value
cookie file that yt-dlp can consume. Returns temp file path or None.
Caller must delete the file when done.
"""
try:
with open(json_path, "r", encoding="utf-8") as f:
cookies = json.load(f)
tmp = tempfile.NamedTemporaryFile(
mode="w",
suffix=".txt",
delete=False,
encoding="utf-8",
mode="w", suffix=".txt", delete=False, encoding="utf-8"
)
tmp.write("# Netscape HTTP Cookie File\n")
tmp.write("# Generated by tiktok2bsky.py\n\n")
@@ -262,7 +250,6 @@ def convert_json_cookies_to_netscape(json_path: str) -> str | None:
expiry = int(c.get("expirationDate") or c.get("expires") or 0)
name = c.get("name", "")
value = c.get("value", "")
tmp.write(
f"{domain}\t{include_sub}\t{path}\t"
f"{secure}\t{expiry}\t{name}\t{value}\n"
@@ -286,7 +273,6 @@ def convert_json_cookies_to_netscape(json_path: str) -> str | None:
# Bluesky error classification (ported from twitter2bsky.py)
# ─────────────────────────────────────────────────────────────────────────────
def _bsky_error_text(error_obj) -> str:
"""Normalised lowercase repr for pattern matching."""
return repr(error_obj).lower()
@@ -318,61 +304,47 @@ def is_auth_error(error_obj) -> bool:
def is_network_error(error_obj) -> bool:
    """
    Report whether an error looks like a network-level failure.

    The check is textual: ``repr(error_obj)`` is searched for the names of
    transport exceptions (connect/read/write timeouts, protocol errors,
    connection resets) and for the gateway status codes 502, 503 and 504.

    Args:
        error_obj: Any exception (or other object) to classify.

    Returns:
        True when one of the network signals is present, False otherwise.
    """
    import re

    text = repr(error_obj)
    exception_names = (
        "ConnectError", "RemoteProtocolError", "ReadTimeout",
        "WriteTimeout", "TimeoutException", "ConnectionResetError",
    )
    if any(name in text for name in exception_names):
        return True
    # Status codes must stand alone as a number; a bare substring test would
    # also fire on ids, ports or timestamps that merely contain "503".
    return re.search(r"(?<!\d)50[234](?!\d)", text) is not None
def is_transient_error(error_obj) -> bool:
    """
    Report whether an error looks transient, i.e. worth retrying.

    The check is textual: ``repr(error_obj)`` is searched for the names of
    timeout / protocol / connect exceptions and for the gateway status codes
    502, 503 and 504.

    Args:
        error_obj: Any exception (or other object) to classify.

    Returns:
        True when one of the transient signals is present, False otherwise.
    """
    import re

    text = repr(error_obj)
    exception_names = (
        "InvokeTimeoutError", "ReadTimeout", "WriteTimeout",
        "TimeoutException", "RemoteProtocolError", "ConnectError",
    )
    if any(name in text for name in exception_names):
        return True
    # Status codes must stand alone as a number; a bare substring test would
    # also fire on ids, ports or timestamps that merely contain "504".
    return re.search(r"(?<!\d)50[234](?!\d)", text) is not None
def get_rate_limit_wait_seconds(error_obj, default_delay: float) -> float:
"""
Extract the server-requested wait time from rate-limit error headers.
Checks (in order):
1. error_obj.headers dict — Retry-After, X-RateLimit-After, RateLimit-Reset
2. repr(error_obj) text — same keys embedded as strings
3. Falls back to default_delay
Ported from twitter2bsky.py.
"""
now_ts = int(time.time())
# ── 1. Live headers object ────────────────────────────────────────────
try:
headers = getattr(error_obj, "headers", None) or {}
for key in ("retry-after", "Retry-After"):
val = headers.get(key)
if val:
return min(max(int(val), 1), BSKY_LOGIN_RATE_LIMIT_MAX_DELAY)
for key in ("x-ratelimit-after", "X-RateLimit-After"):
val = headers.get(key)
if val:
return min(max(int(val), 1), BSKY_LOGIN_RATE_LIMIT_MAX_DELAY)
for key in ("ratelimit-reset", "RateLimit-Reset"):
val = headers.get(key)
if val:
wait = max(int(val) - now_ts + 2, default_delay)
return min(wait, BSKY_LOGIN_RATE_LIMIT_MAX_DELAY)
except Exception:
pass
# ── 2. repr() string fallback ─────────────────────────────────────────
text = repr(error_obj)
for pattern, is_ts in [
(r"['\"]retry-after['\"]\s*:\s*['\"](\d+)['\"]", False),
@@ -392,34 +364,29 @@ def get_rate_limit_wait_seconds(error_obj, default_delay: float) -> float:
# ─────────────────────────────────────────────────────────────────────────────
# Bluesky client — improved login (ported from twitter2bsky.py)
# Bluesky client — robust login (ported from twitter2bsky.py)
# ─────────────────────────────────────────────────────────────────────────────
def connect_bluesky(handle: str, app_password: str, base_url: str) -> Client:
"""
Authenticate with Bluesky with full retry logic ported from twitter2bsky.py:
Authenticate with Bluesky with full retry logic:
• 429 / rate-limit → honour Retry-After header; wait up to 600s
• auth errors → fail immediately (retrying won't help)
• network/transient → exponential backoff with jitter
• other errors → exponential backoff with jitter
• exhausted retries → raise so Jenkins marks the build FAILURE
"""
logging.info(f"🔐 Connecting Bluesky client → {base_url}")
client = Client(base_url=base_url)
attempt = 0
last_error = None
while attempt < BSKY_LOGIN_MAX_RETRIES:
attempt += 1
logging.info(
f"🔐 Bluesky login attempt {attempt}/{BSKY_LOGIN_MAX_RETRIES} "
f"for {handle}"
f"🔐 Bluesky login attempt {attempt}/{BSKY_LOGIN_MAX_RETRIES} for {handle}"
)
try:
client.login(handle, app_password)
# Fetch profile to confirm the session is fully live
client.me = client.get_profile(handle)
logging.info(f"✅ Bluesky login successful as {handle}")
return client
@@ -428,14 +395,14 @@ def connect_bluesky(handle: str, app_password: str, base_url: str) -> Client:
last_error = e
err_detail = f"{type(e).__name__}: {e}"
# ── Auth errors: no point retrying ───────────────────────────
# Auth errors no point retrying
if is_auth_error(e):
logging.error(
f"❌ Bluesky login auth error (will not retry): {err_detail}"
)
raise
# ── Rate-limited (429) ────────────────────────────────────────
# Rate-limited (429)
if is_rate_limited_error(e):
raw_wait = get_rate_limit_wait_seconds(e, BSKY_LOGIN_RATE_LIMIT_DELAY)
jitter = random.uniform(0.0, BSKY_LOGIN_JITTER_MAX)
@@ -449,7 +416,7 @@ def connect_bluesky(handle: str, app_password: str, base_url: str) -> Client:
time.sleep(wait)
continue
# ── Network / transient errors ────────────────────────────────
# Network / transient errors
if is_network_error(e) or is_transient_error(e):
delay = min(
BSKY_LOGIN_BASE_DELAY * (2 ** (attempt - 1)),
@@ -466,7 +433,7 @@ def connect_bluesky(handle: str, app_password: str, base_url: str) -> Client:
time.sleep(wait)
continue
# ── Unknown errors ────────────────────────────────────────────
# Unknown errors
delay = min(
BSKY_LOGIN_BASE_DELAY * (2 ** (attempt - 1)),
BSKY_LOGIN_MAX_DELAY,
@@ -486,8 +453,7 @@ def connect_bluesky(handle: str, app_password: str, base_url: str) -> Client:
f"Last error: {type(last_error).__name__}: {last_error}"
)
raise RuntimeError(
f"Bluesky login failed after {BSKY_LOGIN_MAX_RETRIES} attempts: "
f"{last_error}"
f"Bluesky login failed after {BSKY_LOGIN_MAX_RETRIES} attempts: {last_error}"
)
@@ -495,7 +461,6 @@ def connect_bluesky(handle: str, app_password: str, base_url: str) -> Client:
# Video helpers
# ─────────────────────────────────────────────────────────────────────────────
def get_video_duration(path: str) -> float:
"""Return video duration in seconds via ffprobe, or 0.0 on failure."""
try:
result = subprocess.run(
[
@@ -504,9 +469,7 @@ def get_video_duration(path: str) -> float:
"-of", "default=noprint_wrappers=1:nokey=1",
path,
],
capture_output=True,
text=True,
timeout=15,
capture_output=True, text=True, timeout=15,
)
return float(result.stdout.strip())
except Exception as e:
@@ -520,29 +483,18 @@ def compress_video(
max_duration: int = VIDEO_MAX_DURATION_S,
max_size_bytes: int = None,
) -> bool:
"""
Re-encode input_path → output_path using libx264, targeting max_size_bytes.
Fixes applied:
• pad=ceil(iw/2)*2:ceil(ih/2)*2 — ensures even dimensions (libx264 requirement)
• -maxrate == -b:v — hard ceiling, no burst above target
• post-encode size guard — rejects file if still over limit
"""
if max_size_bytes is None:
max_size_bytes = 20 * 1024 * 1024
try:
duration = get_video_duration(input_path)
if duration <= 0:
logging.error(
f"❌ compress_video: invalid duration={duration} "
f"for {input_path} ({os.path.getsize(input_path)} bytes)"
f"❌ compress_video: invalid duration={duration} for {input_path}"
)
return False
trim_to = min(duration, max_duration)
target_bits = max_size_bytes * 8 * 0.85
total_kbps = int(target_bits / trim_to / 1000)
audio_kbps = 96
@@ -580,12 +532,11 @@ def compress_video(
return False
final_size = os.path.getsize(output_path)
if final_size > max_size_bytes:
logging.error(
f"❌ Compressed file still too large: "
f"{final_size / 1024 / 1024:.1f} MB > "
f"{max_size_bytes / 1024 / 1024:.0f} MB limit. Skipping."
f"{max_size_bytes / 1024 / 1024:.0f} MB. Skipping."
)
return False
@@ -604,23 +555,65 @@ def compress_video(
# ─────────────────────────────────────────────────────────────────────────────
def get_best_impersonation_target() -> str | None:
"""
Dynamically select the best available curl_cffi impersonation target.
Returns None if curl_cffi is not installed or no target is available.
Ask yt-dlp directly which impersonation targets are actually available
in the current environment. This is the only reliable method —
curl_cffi's BrowserType enum values change between versions and do not
map 1:1 to yt-dlp's target names.
Returns the best available target string, or None if none are available.
"""
try:
from curl_cffi.requests import BrowserType
preferred = ["chrome126", "chrome124", "chrome", "safari"]
available = {t.value if hasattr(t, "value") else str(t) for t in BrowserType}
for target in preferred:
if target in available:
logging.info(f"🎭 yt-dlp impersonation target: {target}")
return target
if available:
target = sorted(available)[0]
logging.info(f"🎭 yt-dlp impersonation target (fallback): {target}")
return target
import yt_dlp
# yt-dlp exposes available impersonation targets via
# ImpersonateTarget.supported_targets() in newer builds,
# or via YoutubeDL._impersonate_target_key in older ones.
# The safest cross-version approach is to instantiate a YoutubeDL
# object with quiet=True and inspect _impersonate_targets.
with yt_dlp.YoutubeDL({"quiet": True, "no_warnings": True}) as ydl:
# _impersonate_targets is a dict of {ImpersonateTarget: handler}
targets = getattr(ydl, "_impersonate_targets", None)
if not targets:
logging.warning(
"⚠️ yt-dlp: no impersonation targets available in this environment."
)
return None
# Convert to string representations and pick the best one
preferred = ["chrome", "safari", "firefox", "edge"]
available_strs = []
for t in targets.keys():
# ImpersonateTarget has .client and optionally .version
client = getattr(t, "client", None) or str(t)
version = getattr(t, "version", None)
label = f"{client}-{version}" if version else str(client)
available_strs.append((label.lower(), t))
logging.info(
f"🎭 yt-dlp available impersonation targets: "
f"{[s for s, _ in available_strs]}"
)
# Pick highest-versioned chrome first, then others
chrome_targets = sorted(
[(s, t) for s, t in available_strs if "chrome" in s],
key=lambda x: x[0],
reverse=True,
)
if chrome_targets:
best_label, best_target = chrome_targets[0]
logging.info(f"🎭 Selected impersonation target: {best_label}")
return best_target # return the actual ImpersonateTarget object
# Fallback to any available target
best_label, best_target = available_strs[0]
logging.info(f"🎭 Selected impersonation target (fallback): {best_label}")
return best_target
except Exception as e:
logging.warning(f"⚠️ Could not check impersonation targets: {e}")
logging.warning(
f"⚠️ Could not determine yt-dlp impersonation targets: "
f"{type(e).__name__}: {e}"
)
return None
@@ -629,10 +622,6 @@ def download_video_ytdlp(
output_path: str,
netscape_cookies_path: str = None,
) -> bool:
"""
Download a TikTok video using yt-dlp with browser impersonation.
Accepts a Netscape-format cookie file path (not JSON).
"""
impersonate = get_best_impersonation_target()
ydl_opts = {
@@ -646,7 +635,7 @@ def download_video_ytdlp(
if netscape_cookies_path and os.path.exists(netscape_cookies_path):
ydl_opts["cookiefile"] = netscape_cookies_path
if impersonate:
if impersonate is not None:
ydl_opts["impersonate"] = impersonate
try:
@@ -658,7 +647,7 @@ def download_video_ytdlp(
size_mb = os.path.getsize(output_path) / 1024 / 1024
logging.info(f"✅ yt-dlp download OK: {size_mb:.1f} MB")
return True
else:
logging.warning(
f"⚠️ yt-dlp output too small or missing: {output_path} "
f"({os.path.getsize(output_path) if os.path.exists(output_path) else 0} bytes)"
@@ -666,9 +655,7 @@ def download_video_ytdlp(
return False
except Exception as e:
logging.error(
f"❌ yt-dlp download failed for {url}: {type(e).__name__}: {e}"
)
logging.error(f"❌ yt-dlp download failed for {url}: {type(e).__name__}: {e}")
return False
@@ -677,7 +664,6 @@ def download_video(
output_path: str,
netscape_cookies_path: str = None,
) -> bool:
"""Download a TikTok video via yt-dlp with browser impersonation."""
logging.info(f"⬇️ Downloading: {url}")
return download_video_ytdlp(url, output_path, netscape_cookies_path=netscape_cookies_path)
@@ -690,10 +676,6 @@ def upload_video_to_bluesky(
video_path: str,
video_id: str,
) -> object | None:
"""
Upload a video file to Bluesky as a blob.
All exceptions logged as type(e).__name__: e for full visibility.
"""
size_mb = os.path.getsize(video_path) / 1024 / 1024
logging.info(f"⬆️ Uploading to Bluesky ({size_mb:.1f} MB)...")
@@ -738,19 +720,12 @@ def post_video_to_bluesky(
langs: list[str],
video_id: str,
) -> bool:
"""Create a Bluesky post embedding the uploaded video blob."""
from atproto import models
try:
video_embed = models.AppBskyEmbedVideo.Main(video=blob)
client.send_post(
text=caption,
embed=video_embed,
langs=langs,
)
client.send_post(text=caption, embed=video_embed, langs=langs)
logging.info(f"✅ Posted video {video_id} to Bluesky.")
return True
except Exception as e:
logging.error(
f"❌ Failed to post video {video_id} to Bluesky: "
@@ -763,7 +738,6 @@ def post_video_to_bluesky(
# TikTok scraping — Playwright
# ─────────────────────────────────────────────────────────────────────────────
def dismiss_overlays(page) -> None:
"""Try to dismiss cookie banners and modal overlays."""
all_sels = TIKTOK_COOKIE_MODAL_SELS + TIKTOK_BANNER_SELS
for sel in all_sels:
try:
@@ -777,10 +751,7 @@ def dismiss_overlays(page) -> None:
def _run_playwright_scrape_loop(page, profile_url: str, limit: int) -> list[dict]:
"""
Inner scraping loop shared by both the stealth and no-stealth paths.
Returns a list of video dicts.
"""
"""Inner scraping loop — shared by stealth and no-stealth paths."""
videos = []
for attempt in range(1, PLAYWRIGHT_MAX_RELOADS + 1):
@@ -798,8 +769,7 @@ def _run_playwright_scrape_loop(page, profile_url: str, limit: int) -> list[dict
try:
page.wait_for_selector(
TIKTOK_VIDEO_GRID_SEL,
timeout=PLAYWRIGHT_TIMEOUT_MS,
TIKTOK_VIDEO_GRID_SEL, timeout=PLAYWRIGHT_TIMEOUT_MS
)
except Exception:
pass
@@ -844,8 +814,7 @@ def _run_playwright_scrape_loop(page, profile_url: str, limit: int) -> list[dict
except Exception as e:
logging.warning(
f"⚠️ Playwright attempt {attempt} error: "
f"{type(e).__name__}: {e}"
f"⚠️ Playwright attempt {attempt} error: {type(e).__name__}: {e}"
)
ts = int(time.time())
try:
@@ -865,10 +834,10 @@ def scrape_tiktok_profile_playwright(
"""
Scrape the most recent video URLs from a TikTok profile page using Playwright.
Stealth handling:
v1.x → stealth_sync(page) after new_page()
v2.x → Stealth() used as context manager; page created inside it
none → plain page, no stealth
Stealth strategy:
v1.x → stealth_sync(page) after new_page() — works reliably
v2.x → skipped entirely; v2.0.x API is unstable across patch versions.
Browser launch args provide equivalent bot-detection evasion.
"""
profile_url = f"https://www.tiktok.com/@{handle}"
logging.info(f"🕷️ Scraping TikTok profile: {profile_url}")
@@ -883,6 +852,8 @@ def scrape_tiktok_profile_playwright(
"--disable-blink-features=AutomationControlled",
"--no-sandbox",
"--disable-setuid-sandbox",
"--disable-web-security",
"--disable-features=IsolateOrigins,site-per-process",
],
)
context = browser.new_context(
@@ -893,44 +864,31 @@ def scrape_tiktok_profile_playwright(
),
viewport={"width": 1280, "height": 900},
locale="es-ES",
# Mask automation signals at the context level
extra_http_headers={
"Accept-Language": "es-ES,es;q=0.9,en;q=0.8",
},
)
inject_cookies_into_context(context, cookies)
# ── Stealth v2.x — page must be created inside the context manager ──
if _STEALTH_V2 is True:
try:
stealth_instance = Stealth()
with stealth_instance(context) as stealthy_context:
page = stealthy_context.new_page()
logging.info("🥷 playwright-stealth v2.x applied (context manager).")
videos = _run_playwright_scrape_loop(page, profile_url, limit)
except Exception as e:
logging.warning(
f"⚠️ playwright-stealth v2.x failed: {type(e).__name__}: {e}. "
f"Retrying without stealth."
)
# Fall through to no-stealth path below
page = context.new_page()
videos = _run_playwright_scrape_loop(page, profile_url, limit)
# ── Stealth v1.x ──────────────────────────────────────────────────
elif _STEALTH_V2 is False:
page = context.new_page()
# Apply stealth v1.x if available; skip v2.x entirely
if _STEALTH_SYNC is not None:
try:
stealth_sync(page)
logging.info("🥷 playwright-stealth v1.x applied (stealth_sync).")
_STEALTH_SYNC(page)
logging.info("🥷 playwright-stealth v1.x applied.")
except Exception as e:
logging.warning(
f"⚠️ playwright-stealth v1.x failed: {type(e).__name__}: {e}. "
f"Continuing without stealth."
)
videos = _run_playwright_scrape_loop(page, profile_url, limit)
# ── No stealth available ──────────────────────────────────────────
else:
logging.warning("⚠️ playwright-stealth not installed. Skipping stealth.")
page = context.new_page()
logging.info(
" playwright-stealth v2.x detected — skipping (unstable API). "
"Using browser launch args for bot-detection evasion."
)
videos = _run_playwright_scrape_loop(page, profile_url, limit)
if not videos:
@@ -986,7 +944,7 @@ def scrape_tiktok_profile_ytdlp(
}
if netscape_cookies_path and os.path.exists(netscape_cookies_path):
ydl_opts["cookiefile"] = netscape_cookies_path
if impersonate:
if impersonate is not None:
ydl_opts["impersonate"] = impersonate
try:
@@ -1022,9 +980,7 @@ def scrape_tiktok_profile_ytdlp(
return videos[:limit]
except Exception as e:
logging.error(
f"❌ yt-dlp profile scrape failed: {type(e).__name__}: {e}"
)
logging.error(f"❌ yt-dlp profile scrape failed: {type(e).__name__}: {e}")
return []
@@ -1032,17 +988,14 @@ def scrape_tiktok_profile_ytdlp(
# Caption builder
# ─────────────────────────────────────────────────────────────────────────────
def build_caption(video_info: dict, tiktok_handle: str, max_len: int = 290) -> str:
    """
    Build the text of a Bluesky post from a video's metadata.

    The caption is the video description followed by a newline and the video
    URL. When the description does not fit, it is shortened and terminated
    with an ellipsis so the whole caption is at most ``max_len`` characters.

    Args:
        video_info: Mapping read for the ``"description"`` and ``"url"`` keys.
        tiktok_handle: Accepted for interface compatibility; not used here.
        max_len: Maximum caption length in characters (``len()`` units).

    Returns:
        ``"<description>\\n<url>"``, or just the URL when there is no
        description or the URL leaves no room for one.
    """
    desc = (video_info.get("description") or "").strip()
    url = video_info.get("url", "")
    if not desc:
        return url

    # The URL plus the newline separator are always kept in full.
    max_desc = max_len - (len(url) + 1)
    if max_desc <= 0:
        # No room for any description; a negative slice bound here would
        # otherwise produce a caption longer than max_len.
        return url

    if len(desc) > max_desc:
        # Reserve the final character for the ellipsis marking the cut.
        desc = desc[: max_desc - 1] + "…"
    return f"{desc}\n{url}"
@@ -1059,10 +1012,6 @@ def process_videos(
max_age_days: int,
video_max_size_bytes: int,
) -> int:
"""
Download, compress, upload and post each new video.
Returns the count of successfully posted videos.
"""
posted_count = 0
now = arrow.utcnow()
@@ -1096,8 +1045,7 @@ def process_videos(
# 1. Download
ok = download_video(
video_url,
raw_path,
video_url, raw_path,
netscape_cookies_path=netscape_cookies_path,
)
if not ok:
@@ -1105,11 +1053,7 @@ def process_videos(
continue
# 2. Compress
ok = compress_video(
raw_path,
comp_path,
max_size_bytes=video_max_size_bytes,
)
ok = compress_video(raw_path, comp_path, max_size_bytes=video_max_size_bytes)
if not ok:
logging.error(f"❌ Compression failed for {video_id}. Skipping.")
continue
@@ -1142,25 +1086,19 @@ def parse_args() -> argparse.Namespace:
parser.add_argument("--bsky-handle", required=True)
parser.add_argument("--bsky-app-password", required=True)
parser.add_argument(
"--bsky-base-url",
default=DEFAULT_BSKY_BASE_URL,
"--bsky-base-url", default=DEFAULT_BSKY_BASE_URL,
help=f"Bluesky PDS base URL (default: {DEFAULT_BSKY_BASE_URL})",
)
parser.add_argument(
"--bsky-langs",
nargs="+",
default=DEFAULT_BSKY_LANGS,
"--bsky-langs", nargs="+", default=DEFAULT_BSKY_LANGS,
help="BCP-47 language tags for posts (default: es)",
)
parser.add_argument(
"--cookies-path",
default=TIKTOK_COOKIES_PATH,
"--cookies-path", default=TIKTOK_COOKIES_PATH,
help=f"Path to TikTok cookies JSON (default: {TIKTOK_COOKIES_PATH})",
)
parser.add_argument(
"--max-age-days",
type=int,
default=VIDEO_MAX_AGE_DAYS,
"--max-age-days", type=int, default=VIDEO_MAX_AGE_DAYS,
help=f"Skip videos older than N days (default: {VIDEO_MAX_AGE_DAYS})",
)
return parser.parse_args()
@@ -1184,13 +1122,7 @@ def main():
logging.info("=" * 60)
state = load_state()
# Connect to Bluesky
client = connect_bluesky(
args.bsky_handle,
args.bsky_app_password,
args.bsky_base_url,
)
client = connect_bluesky(args.bsky_handle, args.bsky_app_password, args.bsky_base_url)
# Convert JSON cookies → Netscape format once for all yt-dlp calls
netscape_cookies_path = convert_json_cookies_to_netscape(args.cookies_path)
@@ -1207,9 +1139,7 @@ def main():
cookies = load_cookies_from_file(args.cookies_path)
videos = scrape_tiktok_profile_playwright(
args.tiktok_handle,
cookies,
limit=SCRAPE_VIDEO_LIMIT,
args.tiktok_handle, cookies, limit=SCRAPE_VIDEO_LIMIT,
)
if not videos:
@@ -1248,7 +1178,6 @@ def main():
logging.info("=" * 60)
finally:
# Always clean up the temporary Netscape cookie file
if netscape_cookies_path and os.path.exists(netscape_cookies_path):
try:
os.remove(netscape_cookies_path)