Compare commits

...

3 Commits

Author SHA1 Message Date
Guillem Hernandez Sola
33412c7469 Added all 2026-05-20 07:17:06 +02:00
Guillem Hernandez Sola
6d4cfbd4b5 Changes 2026-05-20 07:16:07 +02:00
Guillem Hernandez Sola
7cddbd057a Fixes for today 2026-05-20 07:13:45 +02:00

View File

@@ -78,24 +78,26 @@ VIDEO_MAX_AGE_DAYS = 3
VIDEO_MAX_DURATION_S = 179 # Bluesky hard limit is 180s
# Bluesky login retry config
BSKY_LOGIN_MAX_RETRIES = 4
BSKY_LOGIN_BASE_DELAY = 15.0
BSKY_LOGIN_MAX_DELAY = 120.0
BSKY_LOGIN_JITTER_MAX = 10.0
# ── Bluesky login retry config (ported from twitter2bsky.py) ─────────────────
BSKY_LOGIN_MAX_RETRIES = 6
BSKY_LOGIN_BASE_DELAY = 15.0
BSKY_LOGIN_MAX_DELAY = 600.0
BSKY_LOGIN_JITTER_MAX = 5.0
BSKY_LOGIN_RATE_LIMIT_DELAY = 90.0 # minimum wait on 429
BSKY_LOGIN_RATE_LIMIT_MAX_DELAY = 600.0 # maximum wait on 429
# Bluesky upload retry config
# ── Bluesky upload retry config ───────────────────────────────────────────────
BSKY_UPLOAD_MAX_RETRIES = 5
BSKY_UPLOAD_BASE_DELAY = 10.0
BSKY_UPLOAD_MAX_DELAY = 120.0
BSKY_UPLOAD_JITTER_MAX = 5.0
# Playwright scraping config
# ── Playwright scraping config ────────────────────────────────────────────────
PLAYWRIGHT_TIMEOUT_MS = 30_000
PLAYWRIGHT_SLOW_MO = 50
PLAYWRIGHT_MAX_RELOADS = 3
# TikTok selectors
# ── TikTok selectors ──────────────────────────────────────────────────────────
TIKTOK_VIDEO_GRID_SEL = '[data-e2e="user-post-item-list"]'
TIKTOK_VIDEO_ITEM_SEL = '[data-e2e="user-post-item"]'
TIKTOK_BANNER_SELS = [
@@ -254,14 +256,10 @@ def convert_json_cookies_to_netscape(json_path: str) -> str | None:
for c in cookies:
domain = c.get("domain", ".tiktok.com")
# Netscape format requires domain to start with a dot for
# include_subdomains=TRUE to work correctly
include_sub = "TRUE" if domain.startswith(".") else "FALSE"
path = c.get("path", "/")
secure = "TRUE" if c.get("secure", False) else "FALSE"
expiry = int(
c.get("expirationDate") or c.get("expires") or 0
)
expiry = int(c.get("expirationDate") or c.get("expires") or 0)
name = c.get("name", "")
value = c.get("value", "")
@@ -285,144 +283,212 @@ def convert_json_cookies_to_netscape(json_path: str) -> str | None:
# ─────────────────────────────────────────────────────────────────────────────
# Bluesky error classification helpers
# Bluesky error classification (ported from twitter2bsky.py)
# ─────────────────────────────────────────────────────────────────────────────
def _bsky_error_text(error_obj) -> str:
    """Return the lower-cased ``repr()`` of *error_obj* for pattern matching.

    ``repr()`` is used instead of ``str()`` so the exception class name is
    part of the searchable text together with its arguments.
    """
    return repr(error_obj).lower()


# Status codes must stand alone as a token: a plain substring test also hits
# digits buried inside longer numbers (for example an epoch timestamp in a
# "ratelimit-reset" header that is part of the error repr).
_BSKY_RATE_LIMIT_RE = re.compile(
    r"\b429\b"
    r"|ratelimitexceeded"
    r"|too many requests"
    r"|rate limit"
    # "ratelimit-reset" / "x-ratelimit-after" are header *names* that
    # get_rate_limit_wait_seconds() parses out of the same repr; their mere
    # presence is not evidence of throttling, so a trailing "-" is excluded.
    r"|ratelimit(?!-)"
)

_BSKY_AUTH_RE = re.compile(
    r"\b40[13]\b"
    r"|invalid identifier"  # also covers "invalid identifier or password"
    r"|invalid password"
    r"|authenticationrequired"
    r"|invalidtoken"
    r"|expiredtoken"
    r"|accounttakedown"
)


def is_rate_limited_error(error_obj) -> bool:
    """Return True when *error_obj* looks like an HTTP 429 / rate-limit error.

    Matching is done on the lower-cased repr of the error: a standalone
    ``429`` token, ``ratelimitexceeded``, ``too many requests``,
    ``rate limit``, or ``ratelimit`` not followed by a hyphen.
    """
    return _BSKY_RATE_LIMIT_RE.search(_bsky_error_text(error_obj)) is not None


def is_auth_error(error_obj) -> bool:
    """Return True when *error_obj* looks like an authentication failure.

    Matching is done on the lower-cased repr of the error: a standalone
    ``401`` or ``403`` token, or one of the credential / token / takedown
    phrases listed in ``_BSKY_AUTH_RE``.
    """
    return _BSKY_AUTH_RE.search(_bsky_error_text(error_obj)) is not None
def is_network_error(error_obj) -> bool:
    """Return True when *error_obj* looks like a network-level failure.

    The check is a case-sensitive search of ``repr(error_obj)`` for known
    exception class names, plus the gateway status codes 502/503/504.
    Status codes must appear as standalone tokens so that digits inside a
    longer number (e.g. a timestamp) do not trigger a false positive.
    """
    text = repr(error_obj)
    class_names = (
        "ConnectError", "RemoteProtocolError", "ReadTimeout",
        "WriteTimeout", "TimeoutException", "ConnectionResetError",
    )
    if any(name in text for name in class_names):
        return True
    return re.search(r"\b50[234]\b", text) is not None
def is_transient_error(error_obj) -> bool:
    """Return True when *error_obj* looks like a transient, retryable failure.

    The check is a case-sensitive search of ``repr(error_obj)`` for known
    timeout / protocol exception class names, plus the gateway status codes
    502/503/504. Status codes must appear as standalone tokens so that
    digits inside a longer number do not trigger a false positive.
    """
    text = repr(error_obj)
    class_names = (
        "InvokeTimeoutError", "ReadTimeout", "WriteTimeout",
        "TimeoutException", "RemoteProtocolError", "ConnectError",
    )
    if any(name in text for name in class_names):
        return True
    return re.search(r"\b50[234]\b", text) is not None
def get_rate_limit_wait_seconds(error_obj, default_delay: float) -> float:
    """Return how many seconds to wait before retrying a rate-limited call.

    Sources are consulted in this order; the first usable value wins:

    1. A headers mapping on the error, looked up case-insensitively for
       ``Retry-After``, ``X-RateLimit-After`` (both relative seconds) and
       ``RateLimit-Reset`` (absolute epoch seconds).
    2. The same keys embedded as text in ``repr(error_obj)``.
    3. ``default_delay``.

    Relative values are clamped to ``[1, BSKY_LOGIN_RATE_LIMIT_MAX_DELAY]``.
    Epoch values are converted to "seconds from now" (+2s of slack), raised
    to at least ``default_delay`` and capped at the same maximum.

    Args:
        error_obj: The exception (or any object) describing the failure.
        default_delay: Wait to use when no server hint can be extracted.

    Returns:
        The wait time in seconds.
    """
    now_ts = int(time.time())
    ceiling = BSKY_LOGIN_RATE_LIMIT_MAX_DELAY

    def _bounded(value: int, is_epoch: bool) -> float:
        # Epoch hints are absolute; turn them into a relative wait first.
        if is_epoch:
            return float(min(max(value - now_ts + 2, default_delay), ceiling))
        return float(min(max(value, 1), ceiling))

    # ── 1. Live headers object ────────────────────────────────────────────
    headers = getattr(error_obj, "headers", None)
    if not headers:
        # NOTE(review): best-effort fallback for errors that keep headers on
        # a nested ``response`` object — confirm against the client library.
        headers = getattr(getattr(error_obj, "response", None), "headers", None)
    try:
        lowered = {str(k).lower(): v for k, v in dict(headers or {}).items()}
    except (TypeError, ValueError):
        lowered = {}

    for key, is_epoch in (
        ("retry-after", False),
        ("x-ratelimit-after", False),
        ("ratelimit-reset", True),
    ):
        raw = lowered.get(key)
        if not raw:
            continue
        try:
            return _bounded(int(raw), is_epoch)
        except (TypeError, ValueError):
            # e.g. an HTTP-date Retry-After; try the next header instead of
            # abandoning the whole header pass.
            continue

    # ── 2. repr() string fallback ─────────────────────────────────────────
    text = repr(error_obj)
    for pattern, is_epoch in (
        (r"['\"]retry-after['\"]\s*:\s*['\"](\d+)['\"]", False),
        (r"['\"]x-ratelimit-after['\"]\s*:\s*['\"](\d+)['\"]", False),
        (r"['\"]ratelimit-reset['\"]\s*:\s*['\"](\d+)['\"]", True),
        (r"retry.?after[=:\s]+(\d+)", False),
    ):
        m = re.search(pattern, text, re.IGNORECASE)
        if m:
            return _bounded(int(m.group(1)), is_epoch)

    # ── 3. Nothing usable found ───────────────────────────────────────────
    return default_delay
# ─────────────────────────────────────────────────────────────────────────────
# Bluesky client
# Bluesky client — improved login (ported from twitter2bsky.py)
# ─────────────────────────────────────────────────────────────────────────────
def connect_bluesky(handle: str, app_password: str, base_url: str) -> Client:
    """Log in to Bluesky and return the authenticated client.

    Up to ``BSKY_LOGIN_MAX_RETRIES`` attempts are made. After a failed
    attempt the error is classified and handled as follows:

    • auth error        → re-raised immediately (retrying won't help)
    • rate-limited      → wait as requested by the server (plus jitter),
                          capped at ``BSKY_LOGIN_RATE_LIMIT_MAX_DELAY``
    • anything else     → exponential backoff from ``BSKY_LOGIN_BASE_DELAY``
                          capped at ``BSKY_LOGIN_MAX_DELAY``, plus jitter

    No sleep is performed (and no "retrying" message is logged) after the
    final attempt.

    Args:
        handle: Bluesky handle to log in as.
        app_password: App password for that handle.
        base_url: Base URL of the service the client talks to.

    Returns:
        The logged-in client.

    Raises:
        Exception: The original login exception, when it is an auth error.
        RuntimeError: When every attempt failed; chained to the last error.
    """
    logging.info(f"🔐 Connecting Bluesky client → {base_url}")
    client = Client(base_url=base_url)
    last_error = None

    for attempt in range(1, BSKY_LOGIN_MAX_RETRIES + 1):
        logging.info(
            f"🔐 Bluesky login attempt {attempt}/{BSKY_LOGIN_MAX_RETRIES} "
            f"for {handle}"
        )
        try:
            client.login(handle, app_password)
            # Fetch profile to confirm the session is fully live
            client.me = client.get_profile(handle)
            logging.info(f"✅ Bluesky login successful as {handle}")
            return client
        except Exception as e:
            # Keep a reference: ``e`` is unbound once the except block ends.
            last_error = e
            err_detail = f"{type(e).__name__}: {e}"

            # ── Auth errors: no point retrying ───────────────────────────
            if is_auth_error(e):
                logging.error(
                    f"❌ Bluesky login auth error (will not retry): {err_detail}"
                )
                raise

            jitter = random.uniform(0.0, BSKY_LOGIN_JITTER_MAX)

            if is_rate_limited_error(e):
                # ── Rate-limited (429): honour the server's hint ─────────
                raw_wait = get_rate_limit_wait_seconds(
                    e, BSKY_LOGIN_RATE_LIMIT_DELAY
                )
                wait = min(raw_wait + jitter, BSKY_LOGIN_RATE_LIMIT_MAX_DELAY)
                head = (
                    f"⏳ Bluesky login rate-limited (attempt {attempt}/"
                    f"{BSKY_LOGIN_MAX_RETRIES})."
                )
                plan = (
                    f"Waiting {wait:.1f}s (server requested {raw_wait:.0f}s)."
                )
            else:
                # ── Network / transient / unknown: exponential backoff ───
                wait = jitter + min(
                    BSKY_LOGIN_BASE_DELAY * (2 ** (attempt - 1)),
                    BSKY_LOGIN_MAX_DELAY,
                )
                if is_network_error(e) or is_transient_error(e):
                    label = "network/transient error"
                else:
                    label = "failed"
                head = (
                    f"⚠️ Bluesky login {label} "
                    f"(attempt {attempt}/{BSKY_LOGIN_MAX_RETRIES}): "
                    f"{err_detail}."
                )
                plan = f"Retrying in {wait:.1f}s."

            if attempt >= BSKY_LOGIN_MAX_RETRIES:
                # Out of attempts: don't promise a retry or waste time sleeping.
                logging.warning(f"{head} No retries left.")
                break

            logging.warning(f"{head} {plan}")
            time.sleep(wait)

    logging.error(
        f"❌ Bluesky login failed after {BSKY_LOGIN_MAX_RETRIES} attempts. "
        f"Last error: {type(last_error).__name__}: {last_error}"
    )
    raise RuntimeError(
        f"Bluesky login failed after {BSKY_LOGIN_MAX_RETRIES} attempts: "
        f"{last_error}"
    ) from last_error
# ─────────────────────────────────────────────────────────────────────────────
@@ -463,7 +529,7 @@ def compress_video(
• post-encode size guard — rejects file if still over limit
"""
if max_size_bytes is None:
max_size_bytes = 20 * 1024 * 1024 # fallback
max_size_bytes = 20 * 1024 * 1024
try:
duration = get_video_duration(input_path)
@@ -477,7 +543,6 @@ def compress_video(
trim_to = min(duration, max_duration)
# Target 85% of the size budget to leave headroom for container overhead
target_bits = max_size_bytes * 8 * 0.85
total_kbps = int(target_bits / trim_to / 1000)
audio_kbps = 96
@@ -493,10 +558,6 @@ def compress_video(
"ffmpeg", "-y",
"-i", input_path,
"-t", str(trim_to),
# Scale to 720p max, then pad to even dimensions.
# The pad filter is required because libx264 needs width/height
# divisible by 2. Portrait TikTok videos (9:16) would otherwise
# produce odd widths like 405px and crash the encoder.
"-vf", (
"scale='min(1280,iw)':'min(720,ih)'"
":force_original_aspect_ratio=decrease,"
@@ -504,7 +565,7 @@ def compress_video(
),
"-c:v", "libx264",
"-b:v", f"{video_kbps}k",
"-maxrate", f"{video_kbps}k", # hard ceiling — no burst above target
"-maxrate", f"{video_kbps}k",
"-bufsize", f"{video_kbps * 2}k",
"-c:a", "aac",
"-b:a", f"{audio_kbps}k",
@@ -520,7 +581,6 @@ def compress_video(
final_size = os.path.getsize(output_path)
# Reject if still over the hard limit
if final_size > max_size_bytes:
logging.error(
f"❌ Compressed file still too large: "
@@ -572,7 +632,6 @@ def download_video_ytdlp(
"""
Download a TikTok video using yt-dlp with browser impersonation.
Accepts a Netscape-format cookie file path (not JSON).
Returns True on success, False on failure.
"""
impersonate = get_best_impersonation_target()
@@ -633,7 +692,7 @@ def upload_video_to_bluesky(
) -> object | None:
"""
Upload a video file to Bluesky as a blob.
Exception is always logged as type(e).__name__: e for full visibility.
All exceptions logged as type(e).__name__: e for full visibility.
"""
size_mb = os.path.getsize(video_path) / 1024 / 1024
logging.info(f"⬆️ Uploading to Bluesky ({size_mb:.1f} MB)...")
@@ -717,6 +776,87 @@ def dismiss_overlays(page) -> None:
pass
def _video_entry_from_href(href) -> dict | None:
    """Turn a profile-grid link into a video dict, or None if it isn't one."""
    if not href or "/video/" not in href:
        return None
    id_match = re.search(r"/video/(\d+)", href)
    if id_match is None:
        return None
    # Relative links are resolved against the TikTok origin.
    full_url = href if href.startswith("http") else f"https://www.tiktok.com{href}"
    return {
        "video_id": id_match.group(1),
        "url": full_url,
        "timestamp": None,
    }


def _run_playwright_scrape_loop(page, profile_url: str, limit: int) -> list[dict]:
    """
    Inner scraping loop shared by both the stealth and no-stealth paths.

    Loads ``profile_url`` in ``page`` up to ``PLAYWRIGHT_MAX_RELOADS`` times
    and stops at the first attempt that yields at least one video link.
    At most the first ``limit`` grid items are inspected per attempt.
    A screenshot is attempted when the grid is missing or an attempt raises.

    Returns a list of video dicts with keys: video_id, url, timestamp
    (timestamp is always None here). The list is empty if every attempt
    failed.
    """
    videos: list[dict] = []

    for attempt in range(1, PLAYWRIGHT_MAX_RELOADS + 1):
        # Pausing only makes sense when another attempt will follow.
        has_next_attempt = attempt < PLAYWRIGHT_MAX_RELOADS
        try:
            logging.info(
                f"🌐 Loading profile (attempt {attempt}/{PLAYWRIGHT_MAX_RELOADS})..."
            )
            page.goto(
                profile_url,
                wait_until="domcontentloaded",
                timeout=PLAYWRIGHT_TIMEOUT_MS,
            )
            time.sleep(3)
            dismiss_overlays(page)

            try:
                page.wait_for_selector(
                    TIKTOK_VIDEO_GRID_SEL,
                    timeout=PLAYWRIGHT_TIMEOUT_MS,
                )
            except Exception:
                # Deliberate best-effort: the visibility check below decides.
                pass

            grid = page.locator(TIKTOK_VIDEO_GRID_SEL).first
            if not grid.is_visible(timeout=5000):
                logging.warning(f"⚠️ Video grid not found on attempt {attempt}.")
                ts = int(time.time())
                try:
                    page.screenshot(path=f"screenshot_no_grid_{attempt}_{ts}.png")
                    logging.info(
                        f"📸 Screenshot saved: screenshot_no_grid_{attempt}_{ts}.png"
                    )
                except Exception:
                    pass
                if has_next_attempt:
                    time.sleep(3)
                continue

            for item in page.locator(TIKTOK_VIDEO_ITEM_SEL).all()[:limit]:
                try:
                    href = item.locator("a").first.get_attribute("href")
                    entry = _video_entry_from_href(href)
                except Exception as item_err:
                    # One bad grid item must not abort the whole scrape.
                    logging.debug(
                        f"Skipping grid item: {type(item_err).__name__}: {item_err}"
                    )
                    continue
                if entry is not None:
                    videos.append(entry)

            if videos:
                logging.info(f"✅ Playwright scraped {len(videos)} videos.")
                break

        except Exception as e:
            logging.warning(
                f"⚠️ Playwright attempt {attempt} error: "
                f"{type(e).__name__}: {e}"
            )
            ts = int(time.time())
            try:
                page.screenshot(path=f"screenshot_error_{attempt}_{ts}.png")
            except Exception:
                pass
            if has_next_attempt:
                time.sleep(3)

    return videos
def scrape_tiktok_profile_playwright(
handle: str,
cookies: list,
@@ -724,10 +864,11 @@ def scrape_tiktok_profile_playwright(
) -> list[dict]:
"""
Scrape the most recent video URLs from a TikTok profile page using Playwright.
Returns a list of dicts with keys: video_id, url, timestamp.
Stealth fix: playwright-stealth v2.x must wrap the page via a context manager
on new_page(), not via .apply() or .use_sync() after the fact.
Stealth handling:
v1.x → stealth_sync(page) after new_page()
v2.x → Stealth() used as context manager; page created inside it
none → plain page, no stealth
"""
profile_url = f"https://www.tiktok.com/@{handle}"
logging.info(f"🕷️ Scraping TikTok profile: {profile_url}")
@@ -756,221 +897,58 @@ def scrape_tiktok_profile_playwright(
inject_cookies_into_context(context, cookies)
# ── Stealth application ───────────────────────────────────────────
# v1.x: stealth_sync(page) — called after new_page()
# v2.x: context manager on new_page — page must be created inside
# the Stealth() context, NOT wrapped after the fact.
# Stealth().use_sync(page) returns a SyncWrappingContextManager,
# not a Page — calling .goto() on it crashes.
# ─────────────────────────────────────────────────────────────────
page = None
if _STEALTH_V2 is None:
logging.warning("⚠️ playwright-stealth not installed. Skipping stealth.")
page = context.new_page()
elif _STEALTH_V2:
# v2.x — use as context manager so the page is created inside it
# ── Stealth v2.x — page must be created inside the context manager ──
if _STEALTH_V2 is True:
try:
stealth_instance = Stealth()
with stealth_instance(context) as stealthy_context:
page = stealthy_context.new_page()
logging.info("🥷 playwright-stealth v2.x applied (context manager).")
# Run the scraping loop inside the context manager scope
for attempt in range(1, PLAYWRIGHT_MAX_RELOADS + 1):
try:
logging.info(
f"🌐 Loading profile "
f"(attempt {attempt}/{PLAYWRIGHT_MAX_RELOADS})..."
)
page.goto(
profile_url,
wait_until="domcontentloaded",
timeout=PLAYWRIGHT_TIMEOUT_MS,
)
time.sleep(3)
dismiss_overlays(page)
try:
page.wait_for_selector(
TIKTOK_VIDEO_GRID_SEL,
timeout=PLAYWRIGHT_TIMEOUT_MS,
)
except Exception:
pass
grid = page.locator(TIKTOK_VIDEO_GRID_SEL).first
if not grid.is_visible(timeout=5000):
logging.warning(
f"⚠️ Video grid not found on attempt {attempt}."
)
ts = int(time.time())
page.screenshot(
path=f"screenshot_no_grid_{attempt}_{ts}.png"
)
logging.info(
f"📸 Screenshot saved: "
f"screenshot_no_grid_{attempt}_{ts}.png"
)
time.sleep(3)
continue
items = page.locator(TIKTOK_VIDEO_ITEM_SEL).all()
for item in items[:limit]:
try:
link = item.locator("a").first.get_attribute("href")
if link and "/video/" in link:
vid_match = re.search(r"/video/(\d+)", link)
if vid_match:
video_id = vid_match.group(1)
full_url = (
link if link.startswith("http")
else f"https://www.tiktok.com{link}"
)
videos.append({
"video_id": video_id,
"url": full_url,
"timestamp": None,
})
except Exception:
pass
if videos:
logging.info(
f"✅ Playwright scraped {len(videos)} videos."
)
break
except Exception as e:
logging.warning(
f"⚠️ Playwright attempt {attempt} error: "
f"{type(e).__name__}: {e}"
)
ts = int(time.time())
try:
page.screenshot(
path=f"screenshot_error_{attempt}_{ts}.png"
)
except Exception:
pass
time.sleep(3)
videos = _run_playwright_scrape_loop(page, profile_url, limit)
except Exception as e:
logging.warning(
f"⚠️ playwright-stealth v2.x context manager failed: "
f"{type(e).__name__}: {e}. Falling back to no-stealth page."
f"⚠️ playwright-stealth v2.x failed: {type(e).__name__}: {e}. "
f"Retrying without stealth."
)
# Fall through to no-stealth path below
page = context.new_page()
videos = _run_playwright_scrape_loop(page, profile_url, limit)
else:
# v1.x — create page then apply stealth
# ── Stealth v1.x ──────────────────────────────────────────────────
elif _STEALTH_V2 is False:
page = context.new_page()
try:
stealth_sync(page)
logging.info("🥷 playwright-stealth v1.x applied (stealth_sync).")
except Exception as e:
logging.warning(
f"⚠️ playwright-stealth v1.x failed: "
f"{type(e).__name__}: {e}. Continuing without stealth."
f"⚠️ playwright-stealth v1.x failed: {type(e).__name__}: {e}. "
f"Continuing without stealth."
)
videos = _run_playwright_scrape_loop(page, profile_url, limit)
# ── Scraping loop for v1.x and no-stealth paths ───────────────────
# (v2.x runs its loop inside the context manager above)
if page is not None and not videos and _STEALTH_V2 is not True:
for attempt in range(1, PLAYWRIGHT_MAX_RELOADS + 1):
try:
logging.info(
f"🌐 Loading profile "
f"(attempt {attempt}/{PLAYWRIGHT_MAX_RELOADS})..."
)
page.goto(
profile_url,
wait_until="domcontentloaded",
timeout=PLAYWRIGHT_TIMEOUT_MS,
)
time.sleep(3)
dismiss_overlays(page)
try:
page.wait_for_selector(
TIKTOK_VIDEO_GRID_SEL,
timeout=PLAYWRIGHT_TIMEOUT_MS,
)
except Exception:
pass
grid = page.locator(TIKTOK_VIDEO_GRID_SEL).first
if not grid.is_visible(timeout=5000):
logging.warning(
f"⚠️ Video grid not found on attempt {attempt}."
)
ts = int(time.time())
page.screenshot(
path=f"screenshot_no_grid_{attempt}_{ts}.png"
)
logging.info(
f"📸 Screenshot saved: "
f"screenshot_no_grid_{attempt}_{ts}.png"
)
time.sleep(3)
continue
items = page.locator(TIKTOK_VIDEO_ITEM_SEL).all()
for item in items[:limit]:
try:
link = item.locator("a").first.get_attribute("href")
if link and "/video/" in link:
vid_match = re.search(r"/video/(\d+)", link)
if vid_match:
video_id = vid_match.group(1)
full_url = (
link if link.startswith("http")
else f"https://www.tiktok.com{link}"
)
videos.append({
"video_id": video_id,
"url": full_url,
"timestamp": None,
})
except Exception:
pass
if videos:
logging.info(f"✅ Playwright scraped {len(videos)} videos.")
break
except Exception as e:
logging.warning(
f"⚠️ Playwright attempt {attempt} error: "
f"{type(e).__name__}: {e}"
)
ts = int(time.time())
try:
page.screenshot(
path=f"screenshot_error_{attempt}_{ts}.png"
)
except Exception:
pass
time.sleep(3)
# ── No stealth available ──────────────────────────────────────────
else:
logging.warning("⚠️ playwright-stealth not installed. Skipping stealth.")
page = context.new_page()
videos = _run_playwright_scrape_loop(page, profile_url, limit)
if not videos:
logging.warning(
f"⚠️ Video grid not found on attempt {PLAYWRIGHT_MAX_RELOADS}."
f"⚠️ Video grid not found after {PLAYWRIGHT_MAX_RELOADS} attempts."
)
ts = int(time.time())
try:
if page:
page.screenshot(
path=f"screenshot_no_grid_{PLAYWRIGHT_MAX_RELOADS}_{ts}.png"
)
logging.info(
f"📸 Screenshot saved: "
f"screenshot_no_grid_{PLAYWRIGHT_MAX_RELOADS}_{ts}.png"
)
page.screenshot(
path=f"screenshot_no_grid_{PLAYWRIGHT_MAX_RELOADS}_{ts}.png"
)
logging.info(
f"📸 Screenshot saved: "
f"screenshot_no_grid_{PLAYWRIGHT_MAX_RELOADS}_{ts}.png"
)
except Exception:
pass
# ── Cleanup ───────────────────────────────────────────────────────
for obj in (page, context, browser):
try:
if obj:
@@ -992,7 +970,6 @@ def scrape_tiktok_profile_ytdlp(
"""
Fallback: use yt-dlp to extract the video list from a TikTok profile.
Accepts a Netscape-format cookie file path (not JSON).
Returns a list of dicts with keys: video_id, url, timestamp.
"""
import yt_dlp
@@ -1060,7 +1037,7 @@ def build_caption(video_info: dict, tiktok_handle: str, max_len: int = 290) -> s
url = video_info.get("url", "")
if desc:
url_len = len(url) + 1 # +1 for newline
url_len = len(url) + 1
max_desc = max_len - url_len
if len(desc) > max_desc:
desc = desc[: max_desc - 1] + ""
@@ -1097,7 +1074,6 @@ def process_videos(
logging.info(f"⏭️ Already posted: {video_id}")
continue
# Age filter (only when timestamp is available)
ts = video.get("timestamp")
if ts:
try:
@@ -1150,7 +1126,6 @@ def process_videos(
if ok:
mark_as_posted(video_id, state, meta={"url": video_url})
posted_count += 1
# Brief pause between posts to avoid rate limiting
time.sleep(random.uniform(2.0, 5.0))
return posted_count
@@ -1163,21 +1138,9 @@ def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Cross-post TikTok videos to Bluesky."
)
parser.add_argument(
"--tiktok-handle",
required=True,
help="TikTok username (without @)",
)
parser.add_argument(
"--bsky-handle",
required=True,
help="Bluesky handle",
)
parser.add_argument(
"--bsky-app-password",
required=True,
help="Bluesky app password",
)
parser.add_argument("--tiktok-handle", required=True)
parser.add_argument("--bsky-handle", required=True)
parser.add_argument("--bsky-app-password", required=True)
parser.add_argument(
"--bsky-base-url",
default=DEFAULT_BSKY_BASE_URL,
@@ -1207,7 +1170,6 @@ def main():
load_dotenv()
args = parse_args()
# Fix 2 — resolve video size limit based on PDS
video_max_size_bytes = get_video_size_limit(args.bsky_base_url)
logging.info("=" * 60)
@@ -1230,17 +1192,17 @@ def main():
args.bsky_base_url,
)
# Convert JSON cookies → Netscape format for yt-dlp
# Playwright uses the JSON cookies directly via inject_cookies_into_context()
# yt-dlp requires Netscape .txt format — convert once and reuse
# Convert JSON cookies → Netscape format once for all yt-dlp calls
netscape_cookies_path = convert_json_cookies_to_netscape(args.cookies_path)
if netscape_cookies_path:
logging.info(f"🍪 Netscape cookie file ready: {netscape_cookies_path}")
else:
logging.warning("⚠️ Could not create Netscape cookie file. yt-dlp will run without cookies.")
logging.warning(
"⚠️ Could not create Netscape cookie file. "
"yt-dlp will run without cookies."
)
try:
# Scrape TikTok profile
logging.info(f"🔄 Scraping @{args.tiktok_handle}...")
cookies = load_cookies_from_file(args.cookies_path)
@@ -1290,7 +1252,9 @@ def main():
if netscape_cookies_path and os.path.exists(netscape_cookies_path):
try:
os.remove(netscape_cookies_path)
logging.info(f"🧹 Removed temporary Netscape cookie file: {netscape_cookies_path}")
logging.info(
f"🧹 Removed temporary Netscape cookie file: {netscape_cookies_path}"
)
except Exception as e:
logging.warning(f"⚠️ Could not remove Netscape cookie file: {e}")