Files
post2bsky/tiktok2bsky.py
Guillem Hernandez Sola 6d4cfbd4b5 Changes
2026-05-20 07:16:07 +02:00

1673 lines
66 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
tiktok2bsky.py
──────────────
Scrapes recent videos from a public TikTok profile and cross-posts
them to a Bluesky account.
Usage:
python tiktok2bsky.py \
--tiktok-handle jijantesfc \
--bsky-handle jijantesfc.bsky.social \
--bsky-app-password xxxx-xxxx-xxxx-xxxx \
--bsky-base-url https://bsky.social \
--bsky-langs es \
--cookies-path tiktok_cookies.json
"""
import argparse
import json
import logging
import os
import random
import re
import subprocess
import sys
import tempfile
import time
from datetime import datetime, timezone
from pathlib import Path
import arrow
import httpx
from atproto import Client
from dotenv import load_dotenv
from playwright.sync_api import sync_playwright
# ─────────────────────────────────────────────────────────────────────────────
# playwright-stealth: detect installed version
# ─────────────────────────────────────────────────────────────────────────────
_STEALTH_V2 = None # None = not available at all
try:
from playwright_stealth import stealth_sync
_STEALTH_V2 = False
except ImportError:
try:
from playwright_stealth import Stealth
_STEALTH_V2 = True
except ImportError:
pass # stealth disabled — warning emitted at runtime
# ─────────────────────────────────────────────────────────────────────────────
# Logging
# ─────────────────────────────────────────────────────────────────────────────
logging.basicConfig(
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=[
logging.StreamHandler(sys.stdout),
logging.FileHandler("tiktok2bsky.log", encoding="utf-8"),
],
level=logging.INFO,
)
# ─────────────────────────────────────────────────────────────────────────────
# Constants & defaults
# ─────────────────────────────────────────────────────────────────────────────
DEFAULT_BSKY_BASE_URL = "https://bsky.social"
DEFAULT_BSKY_LANGS = ["es"]
TIKTOK_COOKIES_PATH = "tiktok_cookies.json"
STATE_FILE = "tiktok2bsky_state.json"
STATE_MAX_ENTRIES = 5000
SCRAPE_VIDEO_LIMIT = 30
VIDEO_MAX_AGE_DAYS = 3
VIDEO_MAX_DURATION_S = 179 # Bluesky hard limit is 180s
# ── Bluesky login retry config (ported from twitter2bsky.py) ─────────────────
BSKY_LOGIN_MAX_RETRIES = 6
BSKY_LOGIN_BASE_DELAY = 15.0
BSKY_LOGIN_MAX_DELAY = 600.0
BSKY_LOGIN_JITTER_MAX = 5.0
BSKY_LOGIN_RATE_LIMIT_DELAY = 90.0 # minimum wait on 429
BSKY_LOGIN_RATE_LIMIT_MAX_DELAY = 600.0 # maximum wait on 429
# ── Bluesky upload retry config ───────────────────────────────────────────────
BSKY_UPLOAD_MAX_RETRIES = 5
BSKY_UPLOAD_BASE_DELAY = 10.0
BSKY_UPLOAD_MAX_DELAY = 120.0
BSKY_UPLOAD_JITTER_MAX = 5.0
# ── Playwright scraping config ────────────────────────────────────────────────
PLAYWRIGHT_TIMEOUT_MS = 30_000
PLAYWRIGHT_SLOW_MO = 50
PLAYWRIGHT_MAX_RELOADS = 3
<<<<<<< HEAD
# TikTok selectors
=======
# ── TikTok selectors ──────────────────────────────────────────────────────────
>>>>>>> 7cddbd0 (Fixes for today)
TIKTOK_VIDEO_GRID_SEL = '[data-e2e="user-post-item-list"]'
TIKTOK_VIDEO_ITEM_SEL = '[data-e2e="user-post-item"]'
TIKTOK_BANNER_SELS = [
'[id*="banner"]',
'[class*="banner"]',
'[data-e2e="recommend-modal-close"]',
'button:has-text("Rechazar")',
'button:has-text("Reject")',
'button:has-text("Accept")',
'button:has-text("Aceptar")',
'[aria-label="Close"]',
'[aria-label="Cerrar"]',
]
TIKTOK_COOKIE_MODAL_SELS = [
'button:has-text("Decline all")',
'button:has-text("Rechazar todo")',
'button:has-text("Reject all")',
'button:has-text("Accept all")',
'button:has-text("Aceptar todo")',
'[class*="cookie"] button',
'[id*="cookie"] button',
]
TIKTOK_GRID_ERROR_SEL = '[data-e2e="user-post-item-list-error"]'
TIKTOK_REFRESH_BTN_SEL = 'button:has-text("Actualizar"), button:has-text("Refresh")'
# ─────────────────────────────────────────────────────────────────────────────
# Fix 2 — Dynamic video size limit based on PDS
# ─────────────────────────────────────────────────────────────────────────────
def get_video_size_limit(bsky_base_url: str) -> int:
"""
bsky.social supports ~50 MB blobs. Third-party PDS instances
typically cap at 1020 MB. Use a conservative 10 MB for
anything that isn't the official PDS.
"""
if "bsky.social" in (bsky_base_url or ""):
return 20 * 1024 * 1024 # 20 MB — official PDS
return 10 * 1024 * 1024 # 10 MB — safe for third-party PDS
# ─────────────────────────────────────────────────────────────────────────────
# State management
# ─────────────────────────────────────────────────────────────────────────────
def load_state() -> dict:
if os.path.exists(STATE_FILE):
try:
with open(STATE_FILE, "r", encoding="utf-8") as f:
state = json.load(f)
logging.info(
f"📂 Loaded state: {len(state.get('posted', {}))} entries."
)
return state
except Exception as e:
logging.warning(f"⚠️ Could not load state file: {e}. Starting fresh.")
return {"posted": {}}
def save_state(state: dict):
posted = state.get("posted", {})
if len(posted) > STATE_MAX_ENTRIES:
sorted_keys = sorted(
posted.keys(),
key=lambda k: posted[k].get("posted_at", ""),
)
for old_key in sorted_keys[: len(posted) - STATE_MAX_ENTRIES]:
del posted[old_key]
state["posted"] = posted
try:
with open(STATE_FILE, "w", encoding="utf-8") as f:
json.dump(state, f, indent=2, ensure_ascii=False)
except Exception as e:
logging.error(f"❌ Could not save state: {e}")
def is_already_posted(video_id: str, state: dict) -> bool:
return video_id in state.get("posted", {})
def mark_as_posted(video_id: str, state: dict, meta: dict = None):
state.setdefault("posted", {})[video_id] = {
"posted_at": arrow.utcnow().isoformat(),
**(meta or {}),
}
save_state(state)
# ─────────────────────────────────────────────────────────────────────────────
# Cookie helpers
# ─────────────────────────────────────────────────────────────────────────────
def load_cookies_from_file(path: str) -> list:
"""Load cookies from a JSON file."""
if not os.path.exists(path):
logging.warning(f"⚠️ Cookie file not found: {path}")
return []
try:
with open(path, "r", encoding="utf-8") as f:
cookies = json.load(f)
logging.info(f"🍪 Loaded {len(cookies)} cookies from {path}")
return cookies
except Exception as e:
logging.warning(f"⚠️ Could not load cookies from {path}: {e}")
return []
def inject_cookies_into_context(context, cookies: list):
"""Inject a list of cookie dicts into a Playwright browser context."""
if not cookies:
return
playwright_cookies = []
for c in cookies:
entry = {
"name": c.get("name", ""),
"value": c.get("value", ""),
"domain": c.get("domain", ".tiktok.com"),
"path": c.get("path", "/"),
"secure": c.get("secure", False),
"httpOnly": c.get("httpOnly", False),
"sameSite": c.get("sameSite", "None"),
}
exp = c.get("expirationDate") or c.get("expires")
if exp and float(exp) > 0:
entry["expires"] = float(exp)
playwright_cookies.append(entry)
try:
context.add_cookies(playwright_cookies)
logging.info(f"🍪 Injected {len(playwright_cookies)} cookies into browser context.")
except Exception as e:
logging.warning(f"⚠️ Could not inject cookies: {e}")
def convert_json_cookies_to_netscape(json_path: str) -> str | None:
"""
Convert a JSON cookie file (browser extension format) to a Netscape
cookie file that yt-dlp can consume.
Returns the path to a temporary Netscape file, or None on failure.
The caller is responsible for deleting the file when done.
Netscape format columns (tab-separated):
domain include_subdomains path secure expiry name value
"""
try:
with open(json_path, "r", encoding="utf-8") as f:
cookies = json.load(f)
tmp = tempfile.NamedTemporaryFile(
mode="w",
suffix=".txt",
delete=False,
encoding="utf-8",
)
tmp.write("# Netscape HTTP Cookie File\n")
tmp.write("# Generated by tiktok2bsky.py\n\n")
for c in cookies:
domain = c.get("domain", ".tiktok.com")
<<<<<<< HEAD
# Netscape format requires domain to start with a dot for
# include_subdomains=TRUE to work correctly
include_sub = "TRUE" if domain.startswith(".") else "FALSE"
path = c.get("path", "/")
secure = "TRUE" if c.get("secure", False) else "FALSE"
expiry = int(
c.get("expirationDate") or c.get("expires") or 0
)
=======
include_sub = "TRUE" if domain.startswith(".") else "FALSE"
path = c.get("path", "/")
secure = "TRUE" if c.get("secure", False) else "FALSE"
expiry = int(c.get("expirationDate") or c.get("expires") or 0)
>>>>>>> 7cddbd0 (Fixes for today)
name = c.get("name", "")
value = c.get("value", "")
tmp.write(
f"{domain}\t{include_sub}\t{path}\t"
f"{secure}\t{expiry}\t{name}\t{value}\n"
)
tmp.close()
logging.info(
f"🍪 Converted {len(cookies)} cookies to Netscape format: {tmp.name}"
)
return tmp.name
except Exception as e:
logging.warning(
f"⚠️ Could not convert cookies to Netscape format: "
f"{type(e).__name__}: {e}"
)
return None
# ─────────────────────────────────────────────────────────────────────────────
<<<<<<< HEAD
# Bluesky error classification helpers
=======
# Bluesky error classification (ported from twitter2bsky.py)
>>>>>>> 7cddbd0 (Fixes for today)
# ─────────────────────────────────────────────────────────────────────────────
def _bsky_error_text(error_obj) -> str:
"""Normalised lowercase repr for pattern matching."""
return repr(error_obj).lower()
def is_rate_limited_error(error_obj) -> bool:
text = _bsky_error_text(error_obj)
return (
"429" in text
or "ratelimitexceeded" in text
or "too many requests" in text
or "rate limit" in text
or "ratelimit" in text
)
def is_auth_error(error_obj) -> bool:
text = _bsky_error_text(error_obj)
return (
"401" in text
or "403" in text
or "invalid identifier" in text
or "invalid password" in text
or "authenticationrequired" in text
or "invalidtoken" in text
or "expiredtoken" in text
or "accounttakedown" in text
or "invalid identifier or password" in text
)
def is_network_error(error_obj) -> bool:
text = repr(error_obj)
signals = [
"ConnectError", "RemoteProtocolError", "ReadTimeout",
"WriteTimeout", "TimeoutException", "ConnectionResetError",
"503", "502", "504",
]
return any(s in text for s in signals)
def is_transient_error(error_obj) -> bool:
text = repr(error_obj)
signals = [
"InvokeTimeoutError", "ReadTimeout", "WriteTimeout",
"TimeoutException", "RemoteProtocolError", "ConnectError",
"503", "502", "504",
]
return any(s in text for s in signals)
def get_rate_limit_wait_seconds(error_obj, default_delay: float) -> float:
"""
<<<<<<< HEAD
Parse rate-limit response headers and return a bounded wait time in seconds.
=======
Extract the server-requested wait time from rate-limit error headers.
Checks (in order):
1. error_obj.headers dict — Retry-After, X-RateLimit-After, RateLimit-Reset
2. repr(error_obj) text — same keys embedded as strings
3. Falls back to default_delay
Ported from twitter2bsky.py.
>>>>>>> 7cddbd0 (Fixes for today)
"""
now_ts = int(time.time())
# ── 1. Live headers object ────────────────────────────────────────────
try:
headers = getattr(error_obj, "headers", None) or {}
for key in ("retry-after", "Retry-After"):
val = headers.get(key)
if val:
return min(max(int(val), 1), BSKY_LOGIN_RATE_LIMIT_MAX_DELAY)
for key in ("x-ratelimit-after", "X-RateLimit-After"):
val = headers.get(key)
if val:
return min(max(int(val), 1), BSKY_LOGIN_RATE_LIMIT_MAX_DELAY)
for key in ("ratelimit-reset", "RateLimit-Reset"):
val = headers.get(key)
if val:
wait = max(int(val) - now_ts + 2, default_delay)
return min(wait, BSKY_LOGIN_RATE_LIMIT_MAX_DELAY)
except Exception:
pass
<<<<<<< HEAD
=======
# ── 2. repr() string fallback ─────────────────────────────────────────
>>>>>>> 7cddbd0 (Fixes for today)
text = repr(error_obj)
for pattern, is_ts in [
(r"['\"]retry-after['\"]\s*:\s*['\"](\d+)['\"]", False),
(r"['\"]x-ratelimit-after['\"]\s*:\s*['\"](\d+)['\"]", False),
(r"['\"]ratelimit-reset['\"]\s*:\s*['\"](\d+)['\"]", True),
(r"retry.?after[=:\s]+(\d+)", False),
]:
m = re.search(pattern, text, re.IGNORECASE)
if m:
val = int(m.group(1))
<<<<<<< HEAD
if is_timestamp:
wait = max(val - int(time.time()) + 1, default_delay)
return min(wait, BSKY_LOGIN_MAX_DELAY)
return min(max(val, 1), BSKY_LOGIN_MAX_DELAY)
=======
if is_ts:
wait = max(val - now_ts + 2, default_delay)
return min(wait, BSKY_LOGIN_RATE_LIMIT_MAX_DELAY)
return min(max(val, 1), BSKY_LOGIN_RATE_LIMIT_MAX_DELAY)
>>>>>>> 7cddbd0 (Fixes for today)
return default_delay
# ─────────────────────────────────────────────────────────────────────────────
<<<<<<< HEAD
# Bluesky client
# ─────────────────────────────────────────────────────────────────────────────
def connect_bluesky(handle: str, app_password: str, base_url: str) -> Client:
logging.info(f"🔐 Connecting Bluesky client via base URL: {base_url}")
client = Client(base_url=base_url)
=======
# Bluesky client — improved login (ported from twitter2bsky.py)
# ─────────────────────────────────────────────────────────────────────────────
def connect_bluesky(handle: str, app_password: str, base_url: str) -> Client:
"""
Authenticate with Bluesky with full retry logic ported from twitter2bsky.py:
• 429 / rate-limit → honour Retry-After header; wait up to 600s
• auth errors → fail immediately (retrying won't help)
• network/transient → exponential backoff with jitter
• other errors → exponential backoff with jitter
• exhausted retries → raise so Jenkins marks the build FAILURE
"""
logging.info(f"🔐 Connecting Bluesky client → {base_url}")
client = Client(base_url=base_url)
attempt = 0
last_error = None
while attempt < BSKY_LOGIN_MAX_RETRIES:
attempt += 1
logging.info(
f"🔐 Bluesky login attempt {attempt}/{BSKY_LOGIN_MAX_RETRIES} "
f"for {handle}"
)
>>>>>>> 7cddbd0 (Fixes for today)
try:
<<<<<<< HEAD
logging.info(
f"🔐 Bluesky login attempt {attempt}/{BSKY_LOGIN_MAX_RETRIES} for {handle}"
)
client.login(handle, app_password)
client.me = client.get_profile(handle)
logging.info(f"✅ Bluesky login successful as {handle}")
return client
except Exception as e:
logging.warning(
f"⚠️ Bluesky login {type(e).__name__}: {e} "
f"(attempt {attempt}/{BSKY_LOGIN_MAX_RETRIES})"
)
if is_rate_limited_error(e):
delay = get_rate_limit_wait_seconds(e, BSKY_LOGIN_BASE_DELAY)
jitter = random.uniform(0, BSKY_LOGIN_JITTER_MAX)
wait = delay + jitter
logging.warning(
f"⏳ Bluesky login rate-limited (attempt {attempt}/{BSKY_LOGIN_MAX_RETRIES}). "
f"Retrying in {wait:.1f}s."
)
time.sleep(wait)
elif attempt < BSKY_LOGIN_MAX_RETRIES:
delay = min(BSKY_LOGIN_BASE_DELAY * (2 ** (attempt - 1)), BSKY_LOGIN_MAX_DELAY)
jitter = random.uniform(0, BSKY_LOGIN_JITTER_MAX)
wait = delay + jitter
logging.warning(f"⏳ Retrying login in {wait:.1f}s.")
time.sleep(wait)
else:
logging.error(
f"❌ Bluesky login failed after {BSKY_LOGIN_MAX_RETRIES} attempts."
)
raise
raise RuntimeError("Bluesky login failed: exhausted all retries.")
=======
client.login(handle, app_password)
# Fetch profile to confirm the session is fully live
client.me = client.get_profile(handle)
logging.info(f"✅ Bluesky login successful as {handle}")
return client
except Exception as e:
last_error = e
err_detail = f"{type(e).__name__}: {e}"
# ── Auth errors: no point retrying ───────────────────────────
if is_auth_error(e):
logging.error(
f"❌ Bluesky login auth error (will not retry): {err_detail}"
)
raise
# ── Rate-limited (429) ────────────────────────────────────────
if is_rate_limited_error(e):
raw_wait = get_rate_limit_wait_seconds(e, BSKY_LOGIN_RATE_LIMIT_DELAY)
jitter = random.uniform(0.0, BSKY_LOGIN_JITTER_MAX)
wait = min(raw_wait + jitter, BSKY_LOGIN_RATE_LIMIT_MAX_DELAY)
logging.warning(
f"⏳ Bluesky login rate-limited (attempt {attempt}/"
f"{BSKY_LOGIN_MAX_RETRIES}). "
f"Waiting {wait:.1f}s (server requested {raw_wait:.0f}s)."
)
if attempt < BSKY_LOGIN_MAX_RETRIES:
time.sleep(wait)
continue
# ── Network / transient errors ────────────────────────────────
if is_network_error(e) or is_transient_error(e):
delay = min(
BSKY_LOGIN_BASE_DELAY * (2 ** (attempt - 1)),
BSKY_LOGIN_MAX_DELAY,
)
jitter = random.uniform(0.0, BSKY_LOGIN_JITTER_MAX)
wait = delay + jitter
logging.warning(
f"⚠️ Bluesky login network/transient error "
f"(attempt {attempt}/{BSKY_LOGIN_MAX_RETRIES}): "
f"{err_detail}. Retrying in {wait:.1f}s."
)
if attempt < BSKY_LOGIN_MAX_RETRIES:
time.sleep(wait)
continue
# ── Unknown errors ────────────────────────────────────────────
delay = min(
BSKY_LOGIN_BASE_DELAY * (2 ** (attempt - 1)),
BSKY_LOGIN_MAX_DELAY,
)
jitter = random.uniform(0.0, BSKY_LOGIN_JITTER_MAX)
wait = delay + jitter
logging.warning(
f"⚠️ Bluesky login failed "
f"(attempt {attempt}/{BSKY_LOGIN_MAX_RETRIES}): "
f"{err_detail}. Retrying in {wait:.1f}s."
)
if attempt < BSKY_LOGIN_MAX_RETRIES:
time.sleep(wait)
logging.error(
f"❌ Bluesky login failed after {BSKY_LOGIN_MAX_RETRIES} attempts. "
f"Last error: {type(last_error).__name__}: {last_error}"
)
raise RuntimeError(
f"Bluesky login failed after {BSKY_LOGIN_MAX_RETRIES} attempts: "
f"{last_error}"
)
>>>>>>> 7cddbd0 (Fixes for today)
# ─────────────────────────────────────────────────────────────────────────────
# Video helpers
# ─────────────────────────────────────────────────────────────────────────────
def get_video_duration(path: str) -> float:
"""Return video duration in seconds via ffprobe, or 0.0 on failure."""
try:
result = subprocess.run(
[
"ffprobe", "-v", "error",
"-show_entries", "format=duration",
"-of", "default=noprint_wrappers=1:nokey=1",
path,
],
capture_output=True,
text=True,
timeout=15,
)
return float(result.stdout.strip())
except Exception as e:
logging.warning(f"⚠️ ffprobe failed for {path}: {e}")
return 0.0
def compress_video(
input_path: str,
output_path: str,
max_duration: int = VIDEO_MAX_DURATION_S,
max_size_bytes: int = None,
) -> bool:
"""
Re-encode input_path → output_path using libx264, targeting max_size_bytes.
Fixes applied:
• pad=ceil(iw/2)*2:ceil(ih/2)*2 — ensures even dimensions (libx264 requirement)
• -maxrate == -b:v — hard ceiling, no burst above target
• post-encode size guard — rejects file if still over limit
"""
if max_size_bytes is None:
<<<<<<< HEAD
max_size_bytes = 20 * 1024 * 1024 # fallback
=======
max_size_bytes = 20 * 1024 * 1024
>>>>>>> 7cddbd0 (Fixes for today)
try:
duration = get_video_duration(input_path)
if duration <= 0:
logging.error(
f"❌ compress_video: invalid duration={duration} "
f"for {input_path} ({os.path.getsize(input_path)} bytes)"
)
return False
trim_to = min(duration, max_duration)
<<<<<<< HEAD
# Target 85% of the size budget to leave headroom for container overhead
=======
>>>>>>> 7cddbd0 (Fixes for today)
target_bits = max_size_bytes * 8 * 0.85
total_kbps = int(target_bits / trim_to / 1000)
audio_kbps = 96
video_kbps = max(200, total_kbps - audio_kbps)
logging.info(
f"🎬 Compressing: duration={duration:.1f}s → trim={trim_to:.1f}s, "
f"video_bitrate={video_kbps}k "
f"(target ≤ {max_size_bytes // 1024 // 1024}MB)"
)
cmd = [
"ffmpeg", "-y",
"-i", input_path,
"-t", str(trim_to),
<<<<<<< HEAD
# Scale to 720p max, then pad to even dimensions.
# The pad filter is required because libx264 needs width/height
# divisible by 2. Portrait TikTok videos (9:16) would otherwise
# produce odd widths like 405px and crash the encoder.
=======
>>>>>>> 7cddbd0 (Fixes for today)
"-vf", (
"scale='min(1280,iw)':'min(720,ih)'"
":force_original_aspect_ratio=decrease,"
"pad=ceil(iw/2)*2:ceil(ih/2)*2"
),
"-c:v", "libx264",
"-b:v", f"{video_kbps}k",
<<<<<<< HEAD
"-maxrate", f"{video_kbps}k", # hard ceiling — no burst above target
=======
"-maxrate", f"{video_kbps}k",
>>>>>>> 7cddbd0 (Fixes for today)
"-bufsize", f"{video_kbps * 2}k",
"-c:a", "aac",
"-b:a", f"{audio_kbps}k",
"-movflags", "+faststart",
"-pix_fmt", "yuv420p",
output_path,
]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=300)
if result.returncode != 0:
logging.error(f"❌ ffmpeg failed:\n{result.stderr}")
return False
final_size = os.path.getsize(output_path)
<<<<<<< HEAD
# Reject if still over the hard limit
=======
>>>>>>> 7cddbd0 (Fixes for today)
if final_size > max_size_bytes:
logging.error(
f"❌ Compressed file still too large: "
f"{final_size / 1024 / 1024:.1f} MB > "
f"{max_size_bytes / 1024 / 1024:.0f} MB limit. Skipping."
)
return False
logging.info(
f"✅ Compressed video: {final_size / 1024 / 1024:.1f} MB → {output_path}"
)
return True
except Exception as e:
logging.error(f"❌ compress_video error: {type(e).__name__}: {e}")
return False
# ─────────────────────────────────────────────────────────────────────────────
# yt-dlp helpers
# ─────────────────────────────────────────────────────────────────────────────
def get_best_impersonation_target() -> str | None:
"""
Dynamically select the best available curl_cffi impersonation target.
Returns None if curl_cffi is not installed or no target is available.
"""
try:
from curl_cffi.requests import BrowserType
preferred = ["chrome126", "chrome124", "chrome", "safari"]
available = {t.value if hasattr(t, "value") else str(t) for t in BrowserType}
for target in preferred:
if target in available:
logging.info(f"🎭 yt-dlp impersonation target: {target}")
return target
if available:
target = sorted(available)[0]
logging.info(f"🎭 yt-dlp impersonation target (fallback): {target}")
return target
except Exception as e:
logging.warning(f"⚠️ Could not check impersonation targets: {e}")
return None
def download_video_ytdlp(
url: str,
output_path: str,
netscape_cookies_path: str = None,
) -> bool:
"""
Download a TikTok video using yt-dlp with browser impersonation.
Accepts a Netscape-format cookie file path (not JSON).
<<<<<<< HEAD
Returns True on success, False on failure.
=======
>>>>>>> 7cddbd0 (Fixes for today)
"""
impersonate = get_best_impersonation_target()
ydl_opts = {
"outtmpl": output_path,
"format": "bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best",
"quiet": False,
"no_warnings": False,
"merge_output_format": "mp4",
}
if netscape_cookies_path and os.path.exists(netscape_cookies_path):
ydl_opts["cookiefile"] = netscape_cookies_path
if impersonate:
ydl_opts["impersonate"] = impersonate
try:
import yt_dlp
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
ydl.download([url])
if os.path.exists(output_path) and os.path.getsize(output_path) > 50 * 1024:
size_mb = os.path.getsize(output_path) / 1024 / 1024
logging.info(f"✅ yt-dlp download OK: {size_mb:.1f} MB")
return True
else:
logging.warning(
f"⚠️ yt-dlp output too small or missing: {output_path} "
f"({os.path.getsize(output_path) if os.path.exists(output_path) else 0} bytes)"
)
return False
except Exception as e:
logging.error(
f"❌ yt-dlp download failed for {url}: {type(e).__name__}: {e}"
)
return False
def download_video(
url: str,
output_path: str,
netscape_cookies_path: str = None,
) -> bool:
"""Download a TikTok video via yt-dlp with browser impersonation."""
logging.info(f"⬇️ Downloading: {url}")
return download_video_ytdlp(url, output_path, netscape_cookies_path=netscape_cookies_path)
# ─────────────────────────────────────────────────────────────────────────────
# Bluesky upload
# ─────────────────────────────────────────────────────────────────────────────
def upload_video_to_bluesky(
client: Client,
video_path: str,
video_id: str,
) -> object | None:
"""
Upload a video file to Bluesky as a blob.
<<<<<<< HEAD
Exception is always logged as type(e).__name__: e for full visibility.
=======
All exceptions logged as type(e).__name__: e for full visibility.
>>>>>>> 7cddbd0 (Fixes for today)
"""
size_mb = os.path.getsize(video_path) / 1024 / 1024
logging.info(f"⬆️ Uploading to Bluesky ({size_mb:.1f} MB)...")
with open(video_path, "rb") as f:
video_data = f.read()
delay = BSKY_UPLOAD_BASE_DELAY
for attempt in range(1, BSKY_UPLOAD_MAX_RETRIES + 1):
try:
blob = client.upload_blob(video_data)
logging.info(f"✅ Blob uploaded successfully for {video_id}")
return blob.blob
except Exception as e:
err_detail = f"{type(e).__name__}: {e}"
if attempt >= BSKY_UPLOAD_MAX_RETRIES:
logging.error(
f"❌ Blob upload failed after {BSKY_UPLOAD_MAX_RETRIES} attempts: "
f"{err_detail}"
)
return None
logging.warning(
f"⚠️ Blob upload attempt {attempt}/{BSKY_UPLOAD_MAX_RETRIES} "
f"failed: {err_detail}. Retrying in {delay:.1f}s..."
)
time.sleep(delay + random.uniform(0, BSKY_UPLOAD_JITTER_MAX))
delay = min(delay * 2, BSKY_UPLOAD_MAX_DELAY)
return None
# ─────────────────────────────────────────────────────────────────────────────
# Bluesky post
# ─────────────────────────────────────────────────────────────────────────────
def post_video_to_bluesky(
client: Client,
blob,
caption: str,
langs: list[str],
video_id: str,
) -> bool:
"""Create a Bluesky post embedding the uploaded video blob."""
from atproto import models
try:
video_embed = models.AppBskyEmbedVideo.Main(video=blob)
client.send_post(
text=caption,
embed=video_embed,
langs=langs,
)
logging.info(f"✅ Posted video {video_id} to Bluesky.")
return True
except Exception as e:
logging.error(
f"❌ Failed to post video {video_id} to Bluesky: "
f"{type(e).__name__}: {e}"
)
return False
# ─────────────────────────────────────────────────────────────────────────────
# TikTok scraping — Playwright
# ─────────────────────────────────────────────────────────────────────────────
def dismiss_overlays(page) -> None:
"""Try to dismiss cookie banners and modal overlays."""
all_sels = TIKTOK_COOKIE_MODAL_SELS + TIKTOK_BANNER_SELS
for sel in all_sels:
try:
el = page.locator(sel).first
if el.is_visible(timeout=1500):
el.click(timeout=1500)
logging.info(f"🚫 Dismissed overlay: {sel}")
time.sleep(0.5)
except Exception:
pass
<<<<<<< HEAD
=======
def _run_playwright_scrape_loop(page, profile_url: str, limit: int) -> list[dict]:
"""
Inner scraping loop shared by both the stealth and no-stealth paths.
Returns a list of video dicts.
"""
videos = []
for attempt in range(1, PLAYWRIGHT_MAX_RELOADS + 1):
try:
logging.info(
f"🌐 Loading profile (attempt {attempt}/{PLAYWRIGHT_MAX_RELOADS})..."
)
page.goto(
profile_url,
wait_until="domcontentloaded",
timeout=PLAYWRIGHT_TIMEOUT_MS,
)
time.sleep(3)
dismiss_overlays(page)
try:
page.wait_for_selector(
TIKTOK_VIDEO_GRID_SEL,
timeout=PLAYWRIGHT_TIMEOUT_MS,
)
except Exception:
pass
grid = page.locator(TIKTOK_VIDEO_GRID_SEL).first
if not grid.is_visible(timeout=5000):
logging.warning(f"⚠️ Video grid not found on attempt {attempt}.")
ts = int(time.time())
try:
page.screenshot(path=f"screenshot_no_grid_{attempt}_{ts}.png")
logging.info(
f"📸 Screenshot saved: screenshot_no_grid_{attempt}_{ts}.png"
)
except Exception:
pass
time.sleep(3)
continue
items = page.locator(TIKTOK_VIDEO_ITEM_SEL).all()
for item in items[:limit]:
try:
link = item.locator("a").first.get_attribute("href")
if link and "/video/" in link:
vid_match = re.search(r"/video/(\d+)", link)
if vid_match:
video_id = vid_match.group(1)
full_url = (
link if link.startswith("http")
else f"https://www.tiktok.com{link}"
)
videos.append({
"video_id": video_id,
"url": full_url,
"timestamp": None,
})
except Exception:
pass
if videos:
logging.info(f"✅ Playwright scraped {len(videos)} videos.")
break
except Exception as e:
logging.warning(
f"⚠️ Playwright attempt {attempt} error: "
f"{type(e).__name__}: {e}"
)
ts = int(time.time())
try:
page.screenshot(path=f"screenshot_error_{attempt}_{ts}.png")
except Exception:
pass
time.sleep(3)
return videos
>>>>>>> 7cddbd0 (Fixes for today)
def scrape_tiktok_profile_playwright(
handle: str,
cookies: list,
limit: int = SCRAPE_VIDEO_LIMIT,
) -> list[dict]:
"""
Scrape the most recent video URLs from a TikTok profile page using Playwright.
<<<<<<< HEAD
Returns a list of dicts with keys: video_id, url, timestamp.
Stealth fix: playwright-stealth v2.x must wrap the page via a context manager
on new_page(), not via .apply() or .use_sync() after the fact.
=======
Stealth handling:
v1.x → stealth_sync(page) after new_page()
v2.x → Stealth() used as context manager; page created inside it
none → plain page, no stealth
>>>>>>> 7cddbd0 (Fixes for today)
"""
profile_url = f"https://www.tiktok.com/@{handle}"
logging.info(f"🕷️ Scraping TikTok profile: {profile_url}")
videos = []
with sync_playwright() as p:
browser = p.chromium.launch(
headless=True,
slow_mo=PLAYWRIGHT_SLOW_MO,
args=[
"--disable-blink-features=AutomationControlled",
"--no-sandbox",
"--disable-setuid-sandbox",
],
)
context = browser.new_context(
user_agent=(
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/126.0.0.0 Safari/537.36"
),
viewport={"width": 1280, "height": 900},
locale="es-ES",
)
inject_cookies_into_context(context, cookies)
<<<<<<< HEAD
# ── Stealth application ───────────────────────────────────────────
# v1.x: stealth_sync(page) — called after new_page()
# v2.x: context manager on new_page — page must be created inside
# the Stealth() context, NOT wrapped after the fact.
# Stealth().use_sync(page) returns a SyncWrappingContextManager,
# not a Page — calling .goto() on it crashes.
# ─────────────────────────────────────────────────────────────────
page = None
if _STEALTH_V2 is None:
logging.warning("⚠️ playwright-stealth not installed. Skipping stealth.")
page = context.new_page()
elif _STEALTH_V2:
# v2.x — use as context manager so the page is created inside it
=======
# ── Stealth v2.x — page must be created inside the context manager ──
if _STEALTH_V2 is True:
>>>>>>> 7cddbd0 (Fixes for today)
try:
stealth_instance = Stealth()
with stealth_instance(context) as stealthy_context:
page = stealthy_context.new_page()
logging.info("🥷 playwright-stealth v2.x applied (context manager).")
<<<<<<< HEAD
# Run the scraping loop inside the context manager scope
for attempt in range(1, PLAYWRIGHT_MAX_RELOADS + 1):
try:
logging.info(
f"🌐 Loading profile "
f"(attempt {attempt}/{PLAYWRIGHT_MAX_RELOADS})..."
)
page.goto(
profile_url,
wait_until="domcontentloaded",
timeout=PLAYWRIGHT_TIMEOUT_MS,
)
time.sleep(3)
dismiss_overlays(page)
try:
page.wait_for_selector(
TIKTOK_VIDEO_GRID_SEL,
timeout=PLAYWRIGHT_TIMEOUT_MS,
)
except Exception:
pass
grid = page.locator(TIKTOK_VIDEO_GRID_SEL).first
if not grid.is_visible(timeout=5000):
logging.warning(
f"⚠️ Video grid not found on attempt {attempt}."
)
ts = int(time.time())
page.screenshot(
path=f"screenshot_no_grid_{attempt}_{ts}.png"
)
logging.info(
f"📸 Screenshot saved: "
f"screenshot_no_grid_{attempt}_{ts}.png"
)
time.sleep(3)
continue
items = page.locator(TIKTOK_VIDEO_ITEM_SEL).all()
for item in items[:limit]:
try:
link = item.locator("a").first.get_attribute("href")
if link and "/video/" in link:
vid_match = re.search(r"/video/(\d+)", link)
if vid_match:
video_id = vid_match.group(1)
full_url = (
link if link.startswith("http")
else f"https://www.tiktok.com{link}"
)
videos.append({
"video_id": video_id,
"url": full_url,
"timestamp": None,
})
except Exception:
pass
if videos:
logging.info(
f"✅ Playwright scraped {len(videos)} videos."
)
break
except Exception as e:
logging.warning(
f"⚠️ Playwright attempt {attempt} error: "
f"{type(e).__name__}: {e}"
)
ts = int(time.time())
try:
page.screenshot(
path=f"screenshot_error_{attempt}_{ts}.png"
)
except Exception:
pass
time.sleep(3)
except Exception as e:
logging.warning(
f"⚠️ playwright-stealth v2.x context manager failed: "
f"{type(e).__name__}: {e}. Falling back to no-stealth page."
)
page = context.new_page()
else:
# v1.x — create page then apply stealth
=======
videos = _run_playwright_scrape_loop(page, profile_url, limit)
except Exception as e:
logging.warning(
f"⚠️ playwright-stealth v2.x failed: {type(e).__name__}: {e}. "
f"Retrying without stealth."
)
# Fall through to no-stealth path below
page = context.new_page()
videos = _run_playwright_scrape_loop(page, profile_url, limit)
# ── Stealth v1.x ──────────────────────────────────────────────────
elif _STEALTH_V2 is False:
>>>>>>> 7cddbd0 (Fixes for today)
page = context.new_page()
try:
stealth_sync(page)
logging.info("🥷 playwright-stealth v1.x applied (stealth_sync).")
except Exception as e:
logging.warning(
<<<<<<< HEAD
f"⚠️ playwright-stealth v1.x failed: "
f"{type(e).__name__}: {e}. Continuing without stealth."
)
# ── Scraping loop for v1.x and no-stealth paths ───────────────────
# (v2.x runs its loop inside the context manager above)
if page is not None and not videos and _STEALTH_V2 is not True:
for attempt in range(1, PLAYWRIGHT_MAX_RELOADS + 1):
try:
logging.info(
f"🌐 Loading profile "
f"(attempt {attempt}/{PLAYWRIGHT_MAX_RELOADS})..."
)
page.goto(
profile_url,
wait_until="domcontentloaded",
timeout=PLAYWRIGHT_TIMEOUT_MS,
)
time.sleep(3)
dismiss_overlays(page)
try:
page.wait_for_selector(
TIKTOK_VIDEO_GRID_SEL,
timeout=PLAYWRIGHT_TIMEOUT_MS,
)
except Exception:
pass
grid = page.locator(TIKTOK_VIDEO_GRID_SEL).first
if not grid.is_visible(timeout=5000):
logging.warning(
f"⚠️ Video grid not found on attempt {attempt}."
)
ts = int(time.time())
page.screenshot(
path=f"screenshot_no_grid_{attempt}_{ts}.png"
)
logging.info(
f"📸 Screenshot saved: "
f"screenshot_no_grid_{attempt}_{ts}.png"
)
time.sleep(3)
continue
items = page.locator(TIKTOK_VIDEO_ITEM_SEL).all()
for item in items[:limit]:
try:
link = item.locator("a").first.get_attribute("href")
if link and "/video/" in link:
vid_match = re.search(r"/video/(\d+)", link)
if vid_match:
video_id = vid_match.group(1)
full_url = (
link if link.startswith("http")
else f"https://www.tiktok.com{link}"
)
videos.append({
"video_id": video_id,
"url": full_url,
"timestamp": None,
})
except Exception:
pass
if videos:
logging.info(f"✅ Playwright scraped {len(videos)} videos.")
break
except Exception as e:
logging.warning(
f"⚠️ Playwright attempt {attempt} error: "
f"{type(e).__name__}: {e}"
)
ts = int(time.time())
try:
page.screenshot(
path=f"screenshot_error_{attempt}_{ts}.png"
)
except Exception:
pass
time.sleep(3)
if not videos:
logging.warning(
f"⚠️ Video grid not found on attempt {PLAYWRIGHT_MAX_RELOADS}."
)
ts = int(time.time())
try:
if page:
page.screenshot(
path=f"screenshot_no_grid_{PLAYWRIGHT_MAX_RELOADS}_{ts}.png"
)
logging.info(
f"📸 Screenshot saved: "
f"screenshot_no_grid_{PLAYWRIGHT_MAX_RELOADS}_{ts}.png"
)
except Exception:
pass
# ── Cleanup ───────────────────────────────────────────────────────
=======
f"⚠️ playwright-stealth v1.x failed: {type(e).__name__}: {e}. "
f"Continuing without stealth."
)
videos = _run_playwright_scrape_loop(page, profile_url, limit)
# ── No stealth available ──────────────────────────────────────────
else:
logging.warning("⚠️ playwright-stealth not installed. Skipping stealth.")
page = context.new_page()
videos = _run_playwright_scrape_loop(page, profile_url, limit)
if not videos:
logging.warning(
f"⚠️ Video grid not found after {PLAYWRIGHT_MAX_RELOADS} attempts."
)
ts = int(time.time())
try:
page.screenshot(
path=f"screenshot_no_grid_{PLAYWRIGHT_MAX_RELOADS}_{ts}.png"
)
logging.info(
f"📸 Screenshot saved: "
f"screenshot_no_grid_{PLAYWRIGHT_MAX_RELOADS}_{ts}.png"
)
except Exception:
pass
>>>>>>> 7cddbd0 (Fixes for today)
for obj in (page, context, browser):
try:
if obj:
obj.close()
except Exception:
pass
return videos
# ─────────────────────────────────────────────────────────────────────────────
# TikTok scraping — yt-dlp fallback
# ─────────────────────────────────────────────────────────────────────────────
def scrape_tiktok_profile_ytdlp(
handle: str,
netscape_cookies_path: str = None,
limit: int = SCRAPE_VIDEO_LIMIT,
) -> list[dict]:
"""
Fallback: use yt-dlp to extract the video list from a TikTok profile.
Accepts a Netscape-format cookie file path (not JSON).
<<<<<<< HEAD
Returns a list of dicts with keys: video_id, url, timestamp.
=======
>>>>>>> 7cddbd0 (Fixes for today)
"""
import yt_dlp
profile_url = f"https://www.tiktok.com/@{handle}"
logging.info(f"📦 yt-dlp profile scrape fallback for @{handle}...")
impersonate = get_best_impersonation_target()
ydl_opts = {
"extract_flat": True,
"quiet": True,
"no_warnings": True,
"playlistend": limit,
}
if netscape_cookies_path and os.path.exists(netscape_cookies_path):
ydl_opts["cookiefile"] = netscape_cookies_path
if impersonate:
ydl_opts["impersonate"] = impersonate
try:
logging.info(f"🌐 yt-dlp extracting: {profile_url}")
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
info = ydl.extract_info(profile_url, download=False)
entries = info.get("entries", []) if info else []
logging.info(
f"✅ yt-dlp returned {len(entries)} entries "
f"(playlist: {info.get('title', '?') if info else '?'})"
)
videos = []
for entry in entries:
if not entry:
continue
url = entry.get("url") or entry.get("webpage_url") or ""
vid_match = re.search(r"/video/(\d+)", url)
if not vid_match:
vid_id = entry.get("id", "")
if vid_id:
url = f"https://www.tiktok.com/@{handle}/video/{vid_id}"
vid_match = re.search(r"/video/(\d+)", url)
if vid_match:
videos.append({
"video_id": vid_match.group(1),
"url": url,
"timestamp": entry.get("timestamp"),
})
logging.info(f"✅ yt-dlp fallback produced {len(videos)} usable videos.")
return videos[:limit]
except Exception as e:
logging.error(
f"❌ yt-dlp profile scrape failed: {type(e).__name__}: {e}"
)
return []
# ─────────────────────────────────────────────────────────────────────────────
# Caption builder
# ─────────────────────────────────────────────────────────────────────────────
def build_caption(video_info: dict, tiktok_handle: str, max_len: int = 290) -> str:
"""Build a Bluesky post caption from video metadata."""
desc = (video_info.get("description") or "").strip()
url = video_info.get("url", "")
if desc:
<<<<<<< HEAD
url_len = len(url) + 1 # +1 for newline
=======
url_len = len(url) + 1
>>>>>>> 7cddbd0 (Fixes for today)
max_desc = max_len - url_len
if len(desc) > max_desc:
desc = desc[: max_desc - 1] + ""
return f"{desc}\n{url}"
return url
# ─────────────────────────────────────────────────────────────────────────────
# Main processing loop
# ─────────────────────────────────────────────────────────────────────────────
def process_videos(
videos: list[dict],
state: dict,
client: Client,
tiktok_handle: str,
netscape_cookies_path: str,
langs: list[str],
max_age_days: int,
video_max_size_bytes: int,
) -> int:
"""
Download, compress, upload and post each new video.
Returns the count of successfully posted videos.
"""
posted_count = 0
now = arrow.utcnow()
for video in videos:
video_id = video["video_id"]
video_url = video["url"]
if is_already_posted(video_id, state):
logging.info(f"⏭️ Already posted: {video_id}")
continue
<<<<<<< HEAD
# Age filter (only when timestamp is available)
=======
>>>>>>> 7cddbd0 (Fixes for today)
ts = video.get("timestamp")
if ts:
try:
video_time = arrow.get(ts)
age_days = (now - video_time).days
if age_days > max_age_days:
logging.info(
f"⏭️ Video {video_id} too old "
f"({age_days}d > {max_age_days}d). Skipping."
)
continue
except Exception:
pass
logging.info(f"🎬 Processing video {video_id}: {video_url}")
with tempfile.TemporaryDirectory() as tmpdir:
raw_path = os.path.join(tmpdir, f"{video_id}_raw.mp4")
comp_path = os.path.join(tmpdir, f"{video_id}.mp4")
# 1. Download
ok = download_video(
video_url,
raw_path,
netscape_cookies_path=netscape_cookies_path,
)
if not ok:
logging.error(f"❌ Download failed for {video_id}. Skipping.")
continue
# 2. Compress
ok = compress_video(
raw_path,
comp_path,
max_size_bytes=video_max_size_bytes,
)
if not ok:
logging.error(f"❌ Compression failed for {video_id}. Skipping.")
continue
# 3. Upload blob
blob = upload_video_to_bluesky(client, comp_path, video_id)
if blob is None:
logging.error(f"❌ Blob upload failed for {video_id}.")
continue
# 4. Post
caption = build_caption(video, tiktok_handle)
ok = post_video_to_bluesky(client, blob, caption, langs, video_id)
if ok:
mark_as_posted(video_id, state, meta={"url": video_url})
posted_count += 1
<<<<<<< HEAD
# Brief pause between posts to avoid rate limiting
=======
>>>>>>> 7cddbd0 (Fixes for today)
time.sleep(random.uniform(2.0, 5.0))
return posted_count
# ─────────────────────────────────────────────────────────────────────────────
# Entry point
# ─────────────────────────────────────────────────────────────────────────────
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Cross-post TikTok videos to Bluesky."
<<<<<<< HEAD
)
parser.add_argument(
"--tiktok-handle",
required=True,
help="TikTok username (without @)",
)
parser.add_argument(
"--bsky-handle",
required=True,
help="Bluesky handle",
)
parser.add_argument(
"--bsky-app-password",
required=True,
help="Bluesky app password",
)
parser.add_argument(
"--bsky-base-url",
default=DEFAULT_BSKY_BASE_URL,
help=f"Bluesky PDS base URL (default: {DEFAULT_BSKY_BASE_URL})",
)
parser.add_argument(
"--bsky-langs",
nargs="+",
default=DEFAULT_BSKY_LANGS,
help="BCP-47 language tags for posts (default: es)",
)
parser.add_argument(
"--cookies-path",
default=TIKTOK_COOKIES_PATH,
help=f"Path to TikTok cookies JSON (default: {TIKTOK_COOKIES_PATH})",
)
parser.add_argument(
"--max-age-days",
type=int,
default=VIDEO_MAX_AGE_DAYS,
help=f"Skip videos older than N days (default: {VIDEO_MAX_AGE_DAYS})",
)
=======
)
parser.add_argument("--tiktok-handle", required=True)
parser.add_argument("--bsky-handle", required=True)
parser.add_argument("--bsky-app-password", required=True)
parser.add_argument(
"--bsky-base-url",
default=DEFAULT_BSKY_BASE_URL,
help=f"Bluesky PDS base URL (default: {DEFAULT_BSKY_BASE_URL})",
)
parser.add_argument(
"--bsky-langs",
nargs="+",
default=DEFAULT_BSKY_LANGS,
help="BCP-47 language tags for posts (default: es)",
)
parser.add_argument(
"--cookies-path",
default=TIKTOK_COOKIES_PATH,
help=f"Path to TikTok cookies JSON (default: {TIKTOK_COOKIES_PATH})",
)
parser.add_argument(
"--max-age-days",
type=int,
default=VIDEO_MAX_AGE_DAYS,
help=f"Skip videos older than N days (default: {VIDEO_MAX_AGE_DAYS})",
)
>>>>>>> 7cddbd0 (Fixes for today)
return parser.parse_args()
def main():
load_dotenv()
args = parse_args()
<<<<<<< HEAD
# Fix 2 — resolve video size limit based on PDS
=======
>>>>>>> 7cddbd0 (Fixes for today)
video_max_size_bytes = get_video_size_limit(args.bsky_base_url)
logging.info("=" * 60)
logging.info("🤖 TikTok→Bluesky bot started")
logging.info(f" TikTok handle : @{args.tiktok_handle}")
logging.info(f" Bluesky handle: {args.bsky_handle}")
logging.info(f" Bluesky PDS : {args.bsky_base_url}")
logging.info(f" Languages : {args.bsky_langs}")
logging.info(f" Video size cap: {video_max_size_bytes // 1024 // 1024} MB")
cookie_status = "✅ found" if os.path.exists(args.cookies_path) else "❌ NOT FOUND"
logging.info(f" Cookie file : {args.cookies_path} ({cookie_status})")
logging.info("=" * 60)
state = load_state()
# Connect to Bluesky
client = connect_bluesky(
args.bsky_handle,
args.bsky_app_password,
args.bsky_base_url,
)
<<<<<<< HEAD
# Convert JSON cookies → Netscape format for yt-dlp
# Playwright uses the JSON cookies directly via inject_cookies_into_context()
# yt-dlp requires Netscape .txt format — convert once and reuse
=======
# Convert JSON cookies → Netscape format once for all yt-dlp calls
>>>>>>> 7cddbd0 (Fixes for today)
netscape_cookies_path = convert_json_cookies_to_netscape(args.cookies_path)
if netscape_cookies_path:
logging.info(f"🍪 Netscape cookie file ready: {netscape_cookies_path}")
else:
<<<<<<< HEAD
logging.warning("⚠️ Could not create Netscape cookie file. yt-dlp will run without cookies.")
try:
# Scrape TikTok profile
=======
logging.warning(
"⚠️ Could not create Netscape cookie file. "
"yt-dlp will run without cookies."
)
try:
>>>>>>> 7cddbd0 (Fixes for today)
logging.info(f"🔄 Scraping @{args.tiktok_handle}...")
cookies = load_cookies_from_file(args.cookies_path)
videos = scrape_tiktok_profile_playwright(
args.tiktok_handle,
cookies,
limit=SCRAPE_VIDEO_LIMIT,
)
if not videos:
logging.warning(
"⚠️ Playwright grid scraping failed. Trying yt-dlp fallback..."
)
ts = int(time.time())
logging.info(f"📸 Screenshot saved: screenshot_playwright_failed_{ts}.png")
videos = scrape_tiktok_profile_ytdlp(
args.tiktok_handle,
netscape_cookies_path=netscape_cookies_path,
limit=SCRAPE_VIDEO_LIMIT,
)
if not videos:
logging.error("❌ No videos found. Exiting.")
sys.exit(0)
logging.info(f"📋 Found {len(videos)} video(s). Processing new ones...")
posted = process_videos(
videos=videos,
state=state,
client=client,
tiktok_handle=args.tiktok_handle,
netscape_cookies_path=netscape_cookies_path,
langs=args.bsky_langs,
max_age_days=args.max_age_days,
video_max_size_bytes=video_max_size_bytes,
)
logging.info("=" * 60)
logging.info(f"✅ Sync complete. Posted {posted} new video(s).")
logging.info("🤖 Bot finished.")
logging.info("=" * 60)
finally:
# Always clean up the temporary Netscape cookie file
if netscape_cookies_path and os.path.exists(netscape_cookies_path):
try:
os.remove(netscape_cookies_path)
<<<<<<< HEAD
logging.info(f"🧹 Removed temporary Netscape cookie file: {netscape_cookies_path}")
=======
logging.info(
f"🧹 Removed temporary Netscape cookie file: {netscape_cookies_path}"
)
>>>>>>> 7cddbd0 (Fixes for today)
except Exception as e:
logging.warning(f"⚠️ Could not remove Netscape cookie file: {e}")
if __name__ == "__main__":
main()