Added all

This commit is contained in:
Guillem Hernandez Sola
2026-05-19 11:29:47 +02:00
parent ee54aa4a25
commit b41cbce242

View File

@@ -8,9 +8,9 @@ them to a Bluesky account.
Usage:
python tiktok2bsky.py \
--tiktok-handle jijantesfc \
--bsky-handle jijantesfc.eurosky.social \
--bsky-handle jijantesfc.bsky.social \
--bsky-app-password xxxx-xxxx-xxxx-xxxx \
--bsky-base-url https://eurosky.social \
--bsky-base-url https://bsky.social \
--bsky-langs es \
--cookies-path tiktok_cookies.json
"""
@@ -33,6 +33,7 @@ import httpx
from atproto import Client
from dotenv import load_dotenv
from playwright.sync_api import sync_playwright
# playwright-stealth 1.x uses stealth_sync, 2.x uses Stealth class
try:
from playwright_stealth import stealth_sync
@@ -113,6 +114,7 @@ TIKTOK_COOKIE_MODAL_SELS = [
TIKTOK_GRID_ERROR_SEL = '[data-e2e="user-post-item-list-error"]'
TIKTOK_REFRESH_BTN_SEL = 'button:has-text("Actualizar"), button:has-text("Refresh")'
# ─────────────────────────────────────────────────────────────────────────────
# State management
# ─────────────────────────────────────────────────────────────────────────────
@@ -160,6 +162,7 @@ def mark_as_posted(video_id: str, state: dict, meta: dict = None):
}
save_state(state)
# ─────────────────────────────────────────────────────────────────────────────
# Cookie helpers
# ─────────────────────────────────────────────────────────────────────────────
@@ -203,64 +206,215 @@ def inject_cookies_into_context(context, cookies: list):
except Exception as e:
logging.warning(f"⚠️ Could not inject cookies: {e}")
# ─────────────────────────────────────────────────────────────────────────────
# Bluesky error classification helpers (ported from twitter2bsky.py)
# ─────────────────────────────────────────────────────────────────────────────
def is_rate_limited_error(error_obj) -> bool:
    """Return True when *error_obj* looks like an HTTP 429 / rate-limit failure.

    The check is a case-insensitive text match against ``repr(error_obj)``,
    so it works for any exception type regardless of which library raised it.

    Args:
        error_obj: Any exception (or object) whose ``repr`` describes the failure.

    Returns:
        True if the representation contains a standalone ``429`` status code
        or one of the known rate-limit phrases, False otherwise.
    """
    text = repr(error_obj).lower()
    # Match 429 only as a standalone number. A plain substring test also
    # fires on digits embedded in longer values (e.g. a unix timestamp in a
    # ``ratelimit-reset`` header or a content length), which would
    # misclassify unrelated errors as rate limiting.
    if re.search(r"(?<!\d)429(?!\d)", text):
        return True
    phrases = ("ratelimitexceeded", "too many requests", "rate limit")
    return any(phrase in text for phrase in phrases)
def is_auth_error(error_obj) -> bool:
    """Return True when *error_obj* looks like an authentication failure.

    The check is a case-insensitive text match against ``repr(error_obj)``.

    Args:
        error_obj: Any exception (or object) whose ``repr`` describes the failure.

    Returns:
        True if the representation contains a standalone ``401`` or ``403``
        status code or one of the known authentication error phrases,
        False otherwise.
    """
    text = repr(error_obj).lower()
    # Match the status codes only as standalone numbers. A plain substring
    # test also fires on digits embedded in longer values — for example a
    # ``ratelimit-reset`` unix timestamp such as 1747403912 contains "403" —
    # which would turn a retryable rate-limit error into a fatal auth error.
    if re.search(r"(?<!\d)(?:401|403)(?!\d)", text):
        return True
    phrases = (
        "invalid identifier or password",
        "authenticationrequired",
        "invalidtoken",
    )
    return any(phrase in text for phrase in phrases)
def is_network_error(error_obj) -> bool:
    """Return True when *error_obj* looks like a network-level failure.

    The check is a case-sensitive text match against ``repr(error_obj)``:
    either one of the known exception class names appears in it, or it
    contains a standalone 502/503/504 status code.

    Args:
        error_obj: Any exception (or object) whose ``repr`` describes the failure.

    Returns:
        True if a network-failure signal is present, False otherwise.
    """
    text = repr(error_obj)
    class_names = (
        "ConnectError",
        "RemoteProtocolError",
        "ReadTimeout",
        "WriteTimeout",
        "TimeoutException",
        "ConnectionResetError",
    )
    if any(name in text for name in class_names):
        return True
    # Match gateway status codes only as standalone numbers so digits inside
    # longer values (timestamps, sizes) do not count as a network error.
    return re.search(r"(?<!\d)50[234](?!\d)", text) is not None
def is_transient_error(error_obj) -> bool:
    """Return True when *error_obj* looks like a transient, retryable failure.

    The check is a case-sensitive text match against ``repr(error_obj)``:
    either one of the known timeout/connection exception class names appears
    in it, or it contains a standalone 502/503/504 status code.

    Args:
        error_obj: Any exception (or object) whose ``repr`` describes the failure.

    Returns:
        True if a transient-failure signal is present, False otherwise.
    """
    text = repr(error_obj)
    class_names = (
        "InvokeTimeoutError",
        "ReadTimeout",
        "WriteTimeout",
        "TimeoutException",
        "RemoteProtocolError",
        "ConnectError",
    )
    if any(name in text for name in class_names):
        return True
    # Match gateway status codes only as standalone numbers so digits inside
    # longer values (timestamps, sizes) do not count as a transient error.
    return re.search(r"(?<!\d)50[234](?!\d)", text) is not None
def get_rate_limit_wait_seconds(error_obj, default_delay: float) -> float:
    """
    Derive a bounded wait time (seconds) from a rate-limit error.

    Headers are read from ``error_obj.headers`` or, failing that, from
    ``error_obj.response.headers``. Header names are compared
    case-insensitively. Three headers are consulted, in this order:

    * ``retry-after``        — seconds to wait
    * ``x-ratelimit-after``  — seconds to wait
    * ``ratelimit-reset``    — unix timestamp at which the limit resets

    If no usable header is found, the same three values are searched for in
    ``repr(error_obj)`` (some exceptions only expose headers that way).

    Args:
        error_obj: The exception raised by the failed request.
        default_delay: Fallback wait, also used as the lower bound for the
            ``ratelimit-reset`` calculation.

    Returns:
        The wait in seconds, never above BSKY_LOGIN_MAX_DELAY. Relative
        headers are floored at 1 second; ``default_delay`` is returned
        unchanged when nothing could be parsed.
    """
    def _bounded(value, floor):
        # Clamp into [floor, BSKY_LOGIN_MAX_DELAY]; the cap wins on conflict.
        return min(max(value, floor), BSKY_LOGIN_MAX_DELAY)

    # (header name, True when the value is an absolute unix timestamp)
    header_specs = (
        ("retry-after", False),
        ("x-ratelimit-after", False),
        ("ratelimit-reset", True),
    )

    # SDK exceptions may carry headers on a nested ``response`` object rather
    # than on the exception itself, so look in both places.
    response = getattr(error_obj, "response", None)
    raw_headers = (
        getattr(error_obj, "headers", None)
        or getattr(response, "headers", None)
        or {}
    )
    try:
        headers = {str(k).lower(): v for k, v in dict(raw_headers).items()}
    except Exception:
        # Not a mapping-like object — fall through to the repr() parsing.
        headers = {}

    for name, is_timestamp in header_specs:
        raw_value = headers.get(name)
        if not raw_value:
            continue
        try:
            value = int(raw_value)
        except (TypeError, ValueError):
            # e.g. an HTTP-date Retry-After; try the next header instead of
            # abandoning structured parsing altogether.
            continue
        if is_timestamp:
            return _bounded(value - int(time.time()) + 1, default_delay)
        return _bounded(value, 1)

    # repr() fallback — parse headers embedded in the exception string
    text = repr(error_obj)
    for name, is_timestamp in header_specs:
        m = re.search(rf"'{name}':\s*'(\d+)'", text, re.IGNORECASE)
        if not m:
            continue
        value = int(m.group(1))
        if is_timestamp:
            return _bounded(value - int(time.time()) + 1, default_delay)
        return _bounded(value, 1)

    return default_delay
# ─────────────────────────────────────────────────────────────────────────────
# Bluesky helpers
# ─────────────────────────────────────────────────────────────────────────────
def bsky_login(client: Client, handle: str, password: str,
               base_url: str = DEFAULT_BSKY_BASE_URL) -> bool:
    """
    Authenticate *client* against an AT Protocol PDS, retrying on failure.

    The client is re-initialised with the normalised base URL, then
    ``client.login`` is attempted up to BSKY_LOGIN_MAX_RETRIES times.
    Failures are classified and handled as follows:

    * authentication errors — logged and abandoned immediately (no retry);
    * rate limiting — wait taken from get_rate_limit_wait_seconds() plus jitter;
    * network / transient errors — linear backoff plus jitter;
    * anything else — linear backoff plus jitter, with the error logged.

    NOTE(review): the accompanying CLI help states that https://bsky.social
    is the correct base URL even for custom-domain handles — confirm this
    for accounts hosted on an independent PDS.

    Args:
        client: The atproto client to (re)configure and log in.
        handle: Bluesky handle used as the login identifier.
        password: Bluesky app password.
        base_url: PDS base URL; empty/None falls back to DEFAULT_BSKY_BASE_URL.

    Returns:
        True once login succeeds; False on an auth error or when all
        attempts are exhausted.
    """
    normalized_base_url = (base_url or DEFAULT_BSKY_BASE_URL).strip().rstrip("/")
    logging.info(f"🔐 Connecting Bluesky client via base URL: {normalized_base_url}")

    # Re-initialise the client so the base URL is baked in from the start.
    # Setting client.base_url after construction does not reliably override
    # the internal session resolver in the atproto SDK.
    client.__init__(base_url=normalized_base_url)

    for attempt in range(1, BSKY_LOGIN_MAX_RETRIES + 1):
        try:
            logging.info(
                f"🔐 Bluesky login attempt {attempt}/{BSKY_LOGIN_MAX_RETRIES} "
                f"for {handle}"
            )
            client.login(handle, password)
            logging.info(f"✅ Bluesky login successful as {handle}")
            return True

        except Exception as e:
            can_retry = attempt < BSKY_LOGIN_MAX_RETRIES

            # ── 401 / auth errors — no point retrying ─────────────────
            if is_auth_error(e):
                logging.error(
                    f"❌ Bluesky login failed: invalid handle or app password.\n"
                    f" Handle : {handle}\n"
                    f" PDS : {normalized_base_url}\n"
                    f" Fix : regenerate app password at "
                    f"https://bsky.app/settings/app-passwords\n"
                    f" Detail : {repr(e)}"
                )
                return False

            # ── Rate limit ─────────────────────────────────────────────
            if is_rate_limited_error(e):
                if can_retry:
                    wait = get_rate_limit_wait_seconds(
                        e, default_delay=BSKY_LOGIN_BASE_DELAY
                    )
                    wait += random.uniform(0, BSKY_LOGIN_JITTER_MAX)
                    logging.warning(
                        f"⏳ Bluesky login rate-limited "
                        f"(attempt {attempt}/{BSKY_LOGIN_MAX_RETRIES}). "
                        f"Retrying in {wait:.1f}s."
                    )
                    time.sleep(wait)
                    continue
                logging.error(
                    "Exhausted Bluesky login retries due to rate limiting."
                )
                return False

            # ── Transient / network errors ─────────────────────────────
            if is_network_error(e) or is_transient_error(e):
                if can_retry:
                    wait = min(
                        BSKY_LOGIN_BASE_DELAY * attempt,
                        BSKY_LOGIN_MAX_DELAY,
                    ) + random.uniform(0, BSKY_LOGIN_JITTER_MAX)
                    logging.warning(
                        f"⏳ Transient login failure "
                        f"(attempt {attempt}/{BSKY_LOGIN_MAX_RETRIES}). "
                        f"Retrying in {wait:.1f}s."
                    )
                    time.sleep(wait)
                    continue
                logging.error(
                    "❌ Exhausted Bluesky login retries after "
                    "transient/network errors."
                )
                return False

            # ── Unexpected error — retry with backoff ──────────────────
            if can_retry:
                wait = min(
                    BSKY_LOGIN_BASE_DELAY * attempt,
                    BSKY_LOGIN_MAX_DELAY,
                ) + random.uniform(0, BSKY_LOGIN_JITTER_MAX)
                logging.warning(
                    f"⏳ Unexpected login error "
                    f"(attempt {attempt}/{BSKY_LOGIN_MAX_RETRIES}): "
                    f"{repr(e)}. Retrying in {wait:.1f}s."
                )
                time.sleep(wait)
                continue

            logging.error(
                f"❌ All Bluesky login attempts failed. Last error: {repr(e)}"
            )
            return False

    # Only reached if BSKY_LOGIN_MAX_RETRIES < 1 (the loop body never ran).
    return False
def bsky_get_recent_post_urls(client: Client, handle: str,
limit: int = 50) -> set:
"""Return a set of URLs recently posted to Bluesky (to avoid duplicates)."""
@@ -274,7 +428,7 @@ def bsky_get_recent_post_urls(client: Client, handle: str,
if hasattr(embed, "external") and hasattr(embed.external, "uri"):
urls.add(embed.external.uri)
if hasattr(post, "record") and hasattr(post.record, "text"):
text = post.record.text
text = post.record.text
found = re.findall(r"https?://\S+", text)
urls.update(found)
except Exception as e:
@@ -294,12 +448,12 @@ def bsky_upload_blob_with_retry(client: Client, data: bytes,
)
return resp.blob
except Exception as e:
err = str(e)
is_rate_limit = "429" in err or "RateLimitExceeded" in err
is_rate_limit = is_rate_limited_error(e)
if attempt == BSKY_UPLOAD_MAX_RETRIES:
logging.error(
f"❌ Blob upload failed after {BSKY_UPLOAD_MAX_RETRIES} attempts: {e}"
f"❌ Blob upload failed after "
f"{BSKY_UPLOAD_MAX_RETRIES} attempts: {e}"
)
raise
@@ -309,7 +463,10 @@ def bsky_upload_blob_with_retry(client: Client, data: bytes,
BSKY_UPLOAD_MAX_DELAY,
)
if is_rate_limit:
delay = max(delay, 60.0)
delay = max(
get_rate_limit_wait_seconds(e, default_delay=delay),
60.0,
)
logging.warning(
f"⚠️ Blob upload attempt {attempt} failed: {e}. "
@@ -332,12 +489,12 @@ def bsky_create_post_with_retry(client: Client, text: str,
logging.info(f"✅ Post created on attempt {attempt}.")
return True
except Exception as e:
err = str(e)
is_rate_limit = "429" in err or "RateLimitExceeded" in err
is_rate_limit = is_rate_limited_error(e)
if attempt == BSKY_UPLOAD_MAX_RETRIES:
logging.error(
f"❌ Post creation failed after {BSKY_UPLOAD_MAX_RETRIES} attempts: {e}"
f"❌ Post creation failed after "
f"{BSKY_UPLOAD_MAX_RETRIES} attempts: {e}"
)
return False
@@ -347,7 +504,10 @@ def bsky_create_post_with_retry(client: Client, text: str,
BSKY_UPLOAD_MAX_DELAY,
)
if is_rate_limit:
delay = max(delay, 60.0)
delay = max(
get_rate_limit_wait_seconds(e, default_delay=delay),
60.0,
)
logging.warning(
f"⚠️ Post creation attempt {attempt} failed: {e}. "
@@ -357,6 +517,7 @@ def bsky_create_post_with_retry(client: Client, text: str,
return False
# ─────────────────────────────────────────────────────────────────────────────
# Video processing helpers
# ─────────────────────────────────────────────────────────────────────────────
@@ -389,10 +550,10 @@ def compress_video(input_path: str, output_path: str,
duration = get_video_duration(input_path)
trim_to = min(duration, max_duration)
# Target bitrate calculation (leave 10% headroom)
target_bits = max_size_bytes * 8 * 0.90
target_kbps = int(target_bits / trim_to / 1000)
video_kbps = max(200, target_kbps - 128) # reserve 128k for audio
# Target bitrate calculation (leave 10 % headroom)
target_bits = max_size_bytes * 8 * 0.90
target_kbps = int(target_bits / trim_to / 1000)
video_kbps = max(200, target_kbps - 128) # reserve 128 k for audio
logging.info(
f"🎬 Compressing: duration={duration:.1f}s → trim={trim_to:.1f}s, "
@@ -478,10 +639,10 @@ def download_video_ytdlp(url: str, output_path: str,
import yt_dlp
ydl_opts = {
"outtmpl": output_path,
"format": "bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best",
"quiet": True,
"no_warnings": False,
"outtmpl": output_path,
"format": "bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best",
"quiet": True,
"no_warnings": False,
"merge_output_format": "mp4",
}
@@ -519,19 +680,25 @@ def _write_netscape_cookies(cookies: list) -> str | None:
with os.fdopen(fd, "w", encoding="utf-8") as f:
f.write("# Netscape HTTP Cookie File\n")
for c in cookies:
domain = c.get("domain", ".tiktok.com")
flag = "TRUE" if domain.startswith(".") else "FALSE"
path_val = c.get("path", "/")
secure = "TRUE" if c.get("secure") else "FALSE"
exp = int(c.get("expirationDate", 0) or c.get("expires", 0) or 0)
name = c.get("name", "")
value = c.get("value", "")
f.write(f"{domain}\t{flag}\t{path_val}\t{secure}\t{exp}\t{name}\t{value}\n")
domain = c.get("domain", ".tiktok.com")
flag = "TRUE" if domain.startswith(".") else "FALSE"
path_val = c.get("path", "/")
secure = "TRUE" if c.get("secure") else "FALSE"
exp = int(
c.get("expirationDate", 0) or c.get("expires", 0) or 0
)
name = c.get("name", "")
value = c.get("value", "")
f.write(
f"{domain}\t{flag}\t{path_val}\t{secure}\t"
f"{exp}\t{name}\t{value}\n"
)
return path
except Exception as e:
logging.warning(f"⚠️ Could not write Netscape cookie file: {e}")
return None
# ─────────────────────────────────────────────────────────────────────────────
# TikTok scraping via Playwright
# ─────────────────────────────────────────────────────────────────────────────
@@ -599,7 +766,6 @@ def scrape_tiktoks_via_playwright(handle: str) -> list:
page = context.new_page()
# Stealth mode
# Stealth mode — compatible with both v1.x and v2.x
if _STEALTH_V2:
Stealth().apply_stealth_sync(page)
@@ -680,8 +846,6 @@ def scrape_tiktoks_via_playwright(handle: str) -> list:
items = page.locator(TIKTOK_VIDEO_ITEM_SEL).all()
logging.info(f"📋 Found {len(items)} video items in grid.")
cutoff = arrow.utcnow().shift(days=-VIDEO_MAX_AGE_DAYS)
for item in items[:SCRAPE_VIDEO_LIMIT]:
try:
# Get the link
@@ -705,7 +869,9 @@ def scrape_tiktoks_via_playwright(handle: str) -> list:
try:
desc = item.get_attribute("aria-label") or ""
if not desc:
desc_el = item.locator('[class*="desc"], [class*="title"]').first
desc_el = item.locator(
'[class*="desc"], [class*="title"]'
).first
desc = desc_el.inner_text(timeout=1000).strip()
except Exception:
pass
@@ -727,6 +893,7 @@ def scrape_tiktoks_via_playwright(handle: str) -> list:
logging.info(f"✅ Scraped {len(videos)} videos from @{handle}.")
return videos
# ─────────────────────────────────────────────────────────────────────────────
# Core: process a single TikTok video → post to Bluesky
# ─────────────────────────────────────────────────────────────────────────────
@@ -750,8 +917,8 @@ def process_tiktok(video: dict, client: Client,
cookies = load_cookies_from_file(TIKTOK_COOKIES_PATH)
with tempfile.TemporaryDirectory() as tmpdir:
raw_path = os.path.join(tmpdir, f"{video_id}_raw.mp4")
processed_path = os.path.join(tmpdir, f"{video_id}.mp4")
raw_path = os.path.join(tmpdir, f"{video_id}_raw.mp4")
processed_path = os.path.join(tmpdir, f"{video_id}.mp4")
# ── Download ───────────────────────────────────────────────────
logging.info(f"⬇️ Downloading: {video_url}")
@@ -825,6 +992,7 @@ def process_tiktok(video: dict, client: Client,
logging.error(f"❌ Failed to post video {video_id} to Bluesky.")
return False
# ─────────────────────────────────────────────────────────────────────────────
# Entry point
# ─────────────────────────────────────────────────────────────────────────────
@@ -836,25 +1004,42 @@ def main():
parser = argparse.ArgumentParser(
description="TikTok → Bluesky cross-poster"
)
parser.add_argument("--tiktok-handle", required=True,
help="TikTok handle to scrape (without @)")
parser.add_argument("--bsky-handle", required=True,
help="Bluesky handle (e.g. user.eurosky.social)")
parser.add_argument("--bsky-app-password", required=True,
help="Bluesky app password (not account password)")
parser.add_argument("--bsky-base-url", default=DEFAULT_BSKY_BASE_URL,
help=f"Bluesky PDS base URL (default: {DEFAULT_BSKY_BASE_URL})")
parser.add_argument("--bsky-langs", nargs="+", default=DEFAULT_BSKY_LANGS,
help="Post language codes (default: es)")
parser.add_argument("--cookies-path", default=TIKTOK_COOKIES_PATH,
help="Path to TikTok cookies JSON file")
parser.add_argument(
"--tiktok-handle", required=True,
help="TikTok handle to scrape (without @)",
)
parser.add_argument(
"--bsky-handle", required=True,
help="Bluesky handle (e.g. user.bsky.social)",
)
parser.add_argument(
"--bsky-app-password", required=True,
help="Bluesky app password (not account password)",
)
parser.add_argument(
"--bsky-base-url", default=DEFAULT_BSKY_BASE_URL,
help=(
"Bluesky AT Protocol PDS base URL. "
"Always https://bsky.social even for custom-domain users "
"(e.g. eurosky.social handles still authenticate via bsky.social). "
f"Default: {DEFAULT_BSKY_BASE_URL}"
),
)
parser.add_argument(
"--bsky-langs", nargs="+", default=DEFAULT_BSKY_LANGS,
help="Post language codes (default: es)",
)
parser.add_argument(
"--cookies-path", default=TIKTOK_COOKIES_PATH,
help="Path to TikTok cookies JSON file",
)
args = parser.parse_args()
# Override global cookie path from CLI
TIKTOK_COOKIES_PATH = args.cookies_path
logging.info("=" * 60)
logging.info(f"🤖 TikTok→Bluesky bot started")
logging.info("🤖 TikTok→Bluesky bot started")
logging.info(f" TikTok handle : @{args.tiktok_handle}")
logging.info(f" Bluesky handle: {args.bsky_handle}")
logging.info(f" Bluesky PDS : {args.bsky_base_url}")
@@ -866,12 +1051,17 @@ def main():
logging.info("=" * 60)
state = load_state()
# Instantiate client — base URL is baked in via bsky_login()
client = Client()
# ── Bluesky login ──────────────────────────────────────────────────
if not bsky_login(client, args.bsky_handle,
args.bsky_app_password,
args.bsky_base_url):
if not bsky_login(
client,
args.bsky_handle,
args.bsky_app_password,
args.bsky_base_url,
):
logging.error("❌ Cannot proceed without Bluesky login. Exiting.")
sys.exit(1)