This commit is contained in:
Guillem Hernandez Sola
2026-05-20 09:28:12 +02:00
parent 04384ec91c
commit c0524b76ee

View File

@@ -44,7 +44,6 @@ _STEALTH_SYNC = None # will hold the stealth_sync callable if v1.x is present
try:
from playwright_stealth import stealth_sync as _stealth_sync_import
_STEALTH_SYNC = _stealth_sync_import
logging.getLogger(__name__).debug("playwright-stealth v1.x detected (stealth_sync)")
except ImportError:
# v2.x is installed but its API is too unstable to use reliably —
# browser launch args provide equivalent protection for our use case
@@ -494,7 +493,7 @@ def compress_video(
)
return False
trim_to = min(duration, max_duration)
trim_to = min(duration, max_duration)
target_bits = max_size_bytes * 8 * 0.85
total_kbps = int(target_bits / trim_to / 1000)
audio_kbps = 96
@@ -553,24 +552,18 @@ def compress_video(
# ─────────────────────────────────────────────────────────────────────────────
# yt-dlp helpers
# ─────────────────────────────────────────────────────────────────────────────
def get_best_impersonation_target() -> str | None:
def get_best_impersonation_target():
"""
Ask yt-dlp directly which impersonation targets are actually available
in the current environment. This is the only reliable method —
curl_cffi's BrowserType enum values change between versions and do not
map 1:1 to yt-dlp's target names.
in the current environment. Returns the best ImpersonateTarget object,
or None if none are available.
Returns the best available target string, or None if none are available.
This is the only reliable method — curl_cffi's BrowserType enum values
change between versions and do not map 1:1 to yt-dlp's target names.
"""
try:
import yt_dlp
# yt-dlp exposes available impersonation targets via
# ImpersonateTarget.supported_targets() in newer builds,
# or via YoutubeDL._impersonate_target_key in older ones.
# The safest cross-version approach is to instantiate a YoutubeDL
# object with quiet=True and inspect _impersonate_targets.
with yt_dlp.YoutubeDL({"quiet": True, "no_warnings": True}) as ydl:
# _impersonate_targets is a dict of {ImpersonateTarget: handler}
targets = getattr(ydl, "_impersonate_targets", None)
if not targets:
logging.warning(
@@ -578,11 +571,8 @@ def get_best_impersonation_target() -> str | None:
)
return None
# Convert to string representations and pick the best one
preferred = ["chrome", "safari", "firefox", "edge"]
available_strs = []
for t in targets.keys():
# ImpersonateTarget has .client and optionally .version
client = getattr(t, "client", None) or str(t)
version = getattr(t, "version", None)
label = f"{client}-{version}" if version else str(client)
@@ -593,7 +583,7 @@ def get_best_impersonation_target() -> str | None:
f"{[s for s, _ in available_strs]}"
)
# Pick highest-versioned chrome first, then others
# Prefer highest-versioned chrome, then anything else
chrome_targets = sorted(
[(s, t) for s, t in available_strs if "chrome" in s],
key=lambda x: x[0],
@@ -602,9 +592,8 @@ def get_best_impersonation_target() -> str | None:
if chrome_targets:
best_label, best_target = chrome_targets[0]
logging.info(f"🎭 Selected impersonation target: {best_label}")
return best_target # return the actual ImpersonateTarget object
return best_target
# Fallback to any available target
best_label, best_target = available_strs[0]
logging.info(f"🎭 Selected impersonation target (fallback): {best_label}")
return best_target
@@ -617,11 +606,76 @@ def get_best_impersonation_target() -> str | None:
return None
def fetch_video_metadata_ytdlp(
    url: str,
    netscape_cookies_path: str | None = None,
) -> dict:
    """
    Fetch metadata for a single TikTok video URL using yt-dlp, without
    downloading the video file.

    The caption is taken from the 'description' field of yt-dlp's info
    dict; when that is empty the 'title' field is used instead.

    Args:
        url: Video page URL handed to yt-dlp's extract_info().
        netscape_cookies_path: Optional path to a cookie file. It is passed
            to yt-dlp as 'cookiefile' only if the path exists on disk.

    Returns:
        On success, a dict with the keys 'description', 'title',
        'timestamp' and 'uploader'.
        On any failure (yt-dlp raised, or returned no info) an EMPTY dict,
        so callers can simply test the result for truthiness.
    """
    # Imported lazily so the module can be loaded without yt-dlp installed.
    import yt_dlp

    impersonate = get_best_impersonation_target()

    ydl_opts = {
        "quiet": True,
        "no_warnings": True,
        "skip_download": True,
    }

    if netscape_cookies_path and os.path.exists(netscape_cookies_path):
        ydl_opts["cookiefile"] = netscape_cookies_path

    if impersonate is not None:
        ydl_opts["impersonate"] = impersonate

    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(url, download=False)

        if not info:
            return {}

        raw_desc = (info.get("description") or "").strip()
        raw_title = (info.get("title") or "").strip()

        # Prefer description (full caption with hashtags) over title
        description = raw_desc or raw_title

        logging.info(
            f"📝 Fetched metadata for {url}: "
            f"description={description[:80]!r}"
            f"{'...' if len(description) > 80 else ''}"
        )

        return {
            "description": description,
            "title": raw_title,
            "timestamp": info.get("timestamp"),
            "uploader": info.get("uploader") or info.get("channel") or "",
        }

    except Exception as e:
        # Metadata is best-effort: log and let the caller continue without it.
        logging.warning(
            f"⚠️ Could not fetch metadata for {url}: {type(e).__name__}: {e}"
        )
        return {}
def download_video_ytdlp(
url: str,
output_path: str,
netscape_cookies_path: str = None,
) -> bool:
"""
Download a TikTok video using yt-dlp with browser impersonation.
Accepts a Netscape-format cookie file path (not JSON).
"""
impersonate = get_best_impersonation_target()
ydl_opts = {
@@ -631,10 +685,8 @@ def download_video_ytdlp(
"no_warnings": False,
"merge_output_format": "mp4",
}
if netscape_cookies_path and os.path.exists(netscape_cookies_path):
ydl_opts["cookiefile"] = netscape_cookies_path
if impersonate is not None:
ydl_opts["impersonate"] = impersonate
@@ -665,7 +717,9 @@ def download_video(
netscape_cookies_path: str = None,
) -> bool:
logging.info(f"⬇️ Downloading: {url}")
return download_video_ytdlp(url, output_path, netscape_cookies_path=netscape_cookies_path)
return download_video_ytdlp(
url, output_path, netscape_cookies_path=netscape_cookies_path
)
# ─────────────────────────────────────────────────────────────────────────────
@@ -692,14 +746,12 @@ def upload_video_to_bluesky(
except Exception as e:
err_detail = f"{type(e).__name__}: {e}"
if attempt >= BSKY_UPLOAD_MAX_RETRIES:
logging.error(
f"❌ Blob upload failed after {BSKY_UPLOAD_MAX_RETRIES} attempts: "
f"{err_detail}"
)
return None
logging.warning(
f"⚠️ Blob upload attempt {attempt}/{BSKY_UPLOAD_MAX_RETRIES} "
f"failed: {err_detail}. Retrying in {delay:.1f}s..."
@@ -734,6 +786,43 @@ def post_video_to_bluesky(
return False
# ─────────────────────────────────────────────────────────────────────────────
# Caption builder
# ─────────────────────────────────────────────────────────────────────────────
def build_caption(video_info: dict, tiktok_handle: str, max_len: int = 290) -> str:
    """
    Build a Bluesky post caption from video metadata.

    Format:
        <description>
        <tiktok_url>

    If description + newline + URL exceeds ``max_len`` characters, the
    description is shortened and an ellipsis is appended. The cut is made at
    the last space before the limit (to avoid cutting mid-word or
    mid-hashtag) unless that would discard more than half of the available
    room, in which case a hard cut is used.

    Args:
        video_info: Dict read for the keys 'description' and 'url'; a missing
            or None value is treated as an empty string.
        tiktok_handle: Not used by this function; kept so existing callers
            keep working.
        max_len: Maximum length of the returned caption, in characters.

    Returns:
        The caption string. Only the URL is returned when there is no
        description, or when the URL leaves no room for any description.
    """
    desc = (video_info.get("description") or "").strip()
    # `or ""` also covers a key that is present but explicitly None.
    url = (video_info.get("url") or "").strip()

    if not desc:
        # No caption available — just post the URL
        return url

    # Reserve space for newline + URL
    url_block = f"\n{url}"
    max_desc = max_len - len(url_block)

    if len(desc) > max_desc:
        if max_desc < 2:
            # Not even one character plus the ellipsis fits: a zero or
            # negative slice bound would yield garbage, so drop the text.
            return url

        # Keep one character free for the ellipsis appended below.
        trimmed = desc[:max_desc - 1]
        cut = trimmed.rfind(" ")
        # Only use word boundary if it doesn't cut off too much
        if cut > max_desc // 2:
            trimmed = trimmed[:cut]
        # Written as an escape so the character survives re-encoding.
        desc = trimmed.rstrip() + "\u2026"

    return f"{desc}{url_block}"
# ─────────────────────────────────────────────────────────────────────────────
# TikTok scraping — Playwright
# ─────────────────────────────────────────────────────────────────────────────
@@ -801,9 +890,10 @@ def _run_playwright_scrape_loop(page, profile_url: str, limit: int) -> list[dict
else f"https://www.tiktok.com{link}"
)
videos.append({
"video_id": video_id,
"url": full_url,
"timestamp": None,
"video_id": video_id,
"url": full_url,
"timestamp": None,
"description": "",
})
except Exception:
pass
@@ -864,7 +954,6 @@ def scrape_tiktok_profile_playwright(
),
viewport={"width": 1280, "height": 900},
locale="es-ES",
# Mask automation signals at the context level
extra_http_headers={
"Accept-Language": "es-ES,es;q=0.9,en;q=0.8",
},
@@ -873,7 +962,7 @@ def scrape_tiktok_profile_playwright(
inject_cookies_into_context(context, cookies)
page = context.new_page()
# Apply stealth v1.x if available; skip v2.x entirely
# Apply stealth v1.x if available; skip v2.x entirely (unstable API)
if _STEALTH_SYNC is not None:
try:
_STEALTH_SYNC(page)
@@ -928,6 +1017,9 @@ def scrape_tiktok_profile_ytdlp(
"""
Fallback: use yt-dlp to extract the video list from a TikTok profile.
Accepts a Netscape-format cookie file path (not JSON).
Note: flat playlist extraction gives us basic metadata (title, timestamp)
but not the full description — that is fetched per-video in process_videos().
"""
import yt_dlp
@@ -970,10 +1062,13 @@ def scrape_tiktok_profile_ytdlp(
url = f"https://www.tiktok.com/@{handle}/video/{vid_id}"
vid_match = re.search(r"/video/(\d+)", url)
if vid_match:
# description from flat extraction is usually just the title —
# the full caption is fetched per-video in process_videos()
videos.append({
"video_id": vid_match.group(1),
"url": url,
"timestamp": entry.get("timestamp"),
"video_id": vid_match.group(1),
"url": url,
"timestamp": entry.get("timestamp"),
"description": (entry.get("description") or "").strip(),
})
logging.info(f"✅ yt-dlp fallback produced {len(videos)} usable videos.")
@@ -984,21 +1079,6 @@ def scrape_tiktok_profile_ytdlp(
return []
# ─────────────────────────────────────────────────────────────────────────────
# Caption builder
# ─────────────────────────────────────────────────────────────────────────────
def build_caption(video_info: dict, tiktok_handle: str, max_len: int = 290) -> str:
    """
    Build a post caption of the form "<description>\\n<url>".

    When the description does not fit within ``max_len`` characters together
    with the newline and the URL, it is hard-cut and an ellipsis is appended.

    Args:
        video_info: Dict read for the keys 'description' and 'url'; a missing
            or None value is treated as an empty string.
        tiktok_handle: Not used by this function; kept so existing callers
            keep working.
        max_len: Maximum length of the returned caption, in characters.

    Returns:
        The caption string. Only the URL is returned when there is no
        description, or when the URL leaves no room for any description.
    """
    desc = (video_info.get("description") or "").strip()
    # `or ""` also covers a key that is present but explicitly None.
    url = video_info.get("url") or ""

    if not desc:
        return url

    # One extra character is needed for the newline separating text and URL.
    max_desc = max_len - (len(url) + 1)
    if len(desc) > max_desc:
        if max_desc < 2:
            # No room for a character plus the ellipsis; avoid negative slicing.
            return url
        # Written as an escape so the character survives re-encoding.
        desc = desc[: max_desc - 1] + "\u2026"

    return f"{desc}\n{url}"
# ─────────────────────────────────────────────────────────────────────────────
# Main processing loop
# ─────────────────────────────────────────────────────────────────────────────
@@ -1012,6 +1092,14 @@ def process_videos(
max_age_days: int,
video_max_size_bytes: int,
) -> int:
"""
For each new video:
0. Fetch full metadata (description/caption) via yt-dlp
1. Download the video file
2. Compress to fit within the PDS size limit
3. Upload blob to Bluesky
4. Create the post with caption + URL
"""
posted_count = 0
now = arrow.utcnow()
@@ -1023,6 +1111,7 @@ def process_videos(
logging.info(f"⏭️ Already posted: {video_id}")
continue
# Age filter (only when timestamp is available)
ts = video.get("timestamp")
if ts:
try:
@@ -1039,6 +1128,19 @@ def process_videos(
logging.info(f"🎬 Processing video {video_id}: {video_url}")
# ── 0. Fetch full metadata if description not already populated ───
if not video.get("description"):
logging.info(f"🔍 Fetching metadata for {video_id}...")
meta = fetch_video_metadata_ytdlp(
video_url,
netscape_cookies_path=netscape_cookies_path,
)
if meta:
video["description"] = meta.get("description", "")
# Backfill timestamp if we didn't have one from scraping
if not video.get("timestamp") and meta.get("timestamp"):
video["timestamp"] = meta["timestamp"]
with tempfile.TemporaryDirectory() as tmpdir:
raw_path = os.path.join(tmpdir, f"{video_id}_raw.mp4")
comp_path = os.path.join(tmpdir, f"{video_id}.mp4")
@@ -1053,7 +1155,9 @@ def process_videos(
continue
# 2. Compress
ok = compress_video(raw_path, comp_path, max_size_bytes=video_max_size_bytes)
ok = compress_video(
raw_path, comp_path, max_size_bytes=video_max_size_bytes
)
if not ok:
logging.error(f"❌ Compression failed for {video_id}. Skipping.")
continue
@@ -1066,6 +1170,7 @@ def process_videos(
# 4. Post
caption = build_caption(video, tiktok_handle)
logging.info(f"📝 Caption preview: {caption[:120]!r}")
ok = post_video_to_bluesky(client, blob, caption, langs, video_id)
if ok:
mark_as_posted(video_id, state, meta={"url": video_url})
@@ -1122,7 +1227,9 @@ def main():
logging.info("=" * 60)
state = load_state()
client = connect_bluesky(args.bsky_handle, args.bsky_app_password, args.bsky_base_url)
client = connect_bluesky(
args.bsky_handle, args.bsky_app_password, args.bsky_base_url
)
# Convert JSON cookies → Netscape format once for all yt-dlp calls
netscape_cookies_path = convert_json_cookies_to_netscape(args.cookies_path)
@@ -1178,6 +1285,7 @@ def main():
logging.info("=" * 60)
finally:
# Always clean up the temporary Netscape cookie file
if netscape_cookies_path and os.path.exists(netscape_cookies_path):
try:
os.remove(netscape_cookies_path)
@@ -1185,7 +1293,9 @@ def main():
f"🧹 Removed temporary Netscape cookie file: {netscape_cookies_path}"
)
except Exception as e:
logging.warning(f"⚠️ Could not remove Netscape cookie file: {e}")
logging.warning(
f"⚠️ Could not remove Netscape cookie file: {e}"
)
if __name__ == "__main__":