diff --git a/tiktok2bsky.py b/tiktok2bsky.py index b6ad65d..66d6e8b 100644 --- a/tiktok2bsky.py +++ b/tiktok2bsky.py @@ -44,7 +44,6 @@ _STEALTH_SYNC = None # will hold the stealth_sync callable if v1.x is present try: from playwright_stealth import stealth_sync as _stealth_sync_import _STEALTH_SYNC = _stealth_sync_import - logging.getLogger(__name__).debug("playwright-stealth v1.x detected (stealth_sync)") except ImportError: # v2.x is installed but its API is too unstable to use reliably — # browser launch args provide equivalent protection for our use case @@ -494,7 +493,7 @@ def compress_video( ) return False - trim_to = min(duration, max_duration) + trim_to = min(duration, max_duration) target_bits = max_size_bytes * 8 * 0.85 total_kbps = int(target_bits / trim_to / 1000) audio_kbps = 96 @@ -553,24 +552,18 @@ def compress_video( # ───────────────────────────────────────────────────────────────────────────── # yt-dlp helpers # ───────────────────────────────────────────────────────────────────────────── -def get_best_impersonation_target() -> str | None: +def get_best_impersonation_target(): """ Ask yt-dlp directly which impersonation targets are actually available - in the current environment. This is the only reliable method — - curl_cffi's BrowserType enum values change between versions and do not - map 1:1 to yt-dlp's target names. + in the current environment. Returns the best ImpersonateTarget object, + or None if none are available. - Returns the best available target string, or None if none are available. + This is the only reliable method — curl_cffi's BrowserType enum values + change between versions and do not map 1:1 to yt-dlp's target names. """ try: import yt_dlp - # yt-dlp exposes available impersonation targets via - # ImpersonateTarget.supported_targets() in newer builds, - # or via YoutubeDL._impersonate_target_key in older ones. 
def fetch_video_metadata_ytdlp(
    url: str,
    netscape_cookies_path: str = None,
) -> dict:
    """
    Fetch metadata for a single TikTok video URL without downloading it.

    The lookup is best-effort: every failure (yt-dlp missing, network or
    extractor error, empty result) is logged as a warning and reported to
    the caller as an empty dict, so callers can simply test the result for
    truthiness.

    TikTok captions (the text the creator wrote) live in the 'description'
    field of yt-dlp's info dict; 'title' is a shorter label. The returned
    'description' therefore prefers the info dict's description and falls
    back to its title when the description is empty.

    Args:
        url: URL of the video to inspect.
        netscape_cookies_path: Optional path to a Netscape-format cookie
            file. It is only passed to yt-dlp when the file exists.

    Returns:
        On success, a dict with the keys:
            description: caption text (stripped), or the title as fallback.
            title:       stripped title, possibly an empty string.
            timestamp:   value of the info dict's 'timestamp', or None.
            uploader:    'uploader', else 'channel', else an empty string.
        On any failure, an empty dict.
    """
    ydl_opts = {
        "quiet": True,
        "no_warnings": True,
        "skip_download": True,
    }
    if netscape_cookies_path and os.path.exists(netscape_cookies_path):
        ydl_opts["cookiefile"] = netscape_cookies_path

    impersonate = get_best_impersonation_target()
    if impersonate is not None:
        ydl_opts["impersonate"] = impersonate

    try:
        # Imported inside the try so that a missing or broken yt-dlp install
        # degrades to "no metadata" instead of raising out of this helper.
        import yt_dlp

        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(url, download=False)

        if not info:
            return {}

        raw_desc = (info.get("description") or "").strip()
        raw_title = (info.get("title") or "").strip()

        # Prefer description (full caption with hashtags) over title
        description = raw_desc or raw_title

        logging.info(
            f"📝 Fetched metadata for {url}: "
            f"description={description[:80]!r}"
            f"{'...' if len(description) > 80 else ''}"
        )

        return {
            "description": description,
            "title": raw_title,
            "timestamp": info.get("timestamp"),
            "uploader": info.get("uploader") or info.get("channel") or "",
        }

    except Exception as e:
        # Deliberate catch-all: metadata is optional, the caller can still
        # post the video with a URL-only caption.
        logging.warning(
            f"⚠️ Could not fetch metadata for {url}: {type(e).__name__}: {e}"
        )
        return {}
def build_caption(video_info: dict, tiktok_handle: str, max_len: int = 290) -> str:
    """
    Build a Bluesky post caption from video metadata.

    Format:
        <description>
        <url>

    The description is read from ``video_info["description"]`` and the link
    from ``video_info["url"]``; a missing key or a ``None`` value is treated
    as an empty string. When there is no description the caption is just the
    URL; when there is no URL the caption is just the description.

    If description + URL would exceed ``max_len`` characters, the description
    is shortened and terminated with "…". The cut is moved back to the last
    whitespace character (space, tab, newline, ...) before the limit so that
    words and hashtags are not split, unless that would discard more than
    about half of the available room, in which case a hard cut is used.

    Args:
        video_info: Video dict; only the 'description' and 'url' keys are read.
        tiktok_handle: Accepted for interface compatibility; not used when
            building the caption.
        max_len: Maximum caption length, measured with ``len()``.

    Returns:
        The caption string. It is at most ``max_len`` characters long except
        when the URL alone leaves no room for any description text; in that
        case the bare URL is returned unshortened.
    """
    desc = (video_info.get("description") or "").strip()
    # `or ""` also covers a key that is present but set to None.
    url = (video_info.get("url") or "").strip()

    if not desc:
        # No caption available — just post the URL
        return url

    # Reserve space for newline + URL (nothing to reserve without a URL)
    url_block = f"\n{url}" if url else ""
    max_desc = max_len - len(url_block)

    if len(desc) <= max_desc:
        return f"{desc}{url_block}"

    if max_desc < 2:
        # Not even one character plus the ellipsis fits next to the URL.
        # Slicing with a zero/negative bound here would count from the END
        # of the string and could return an over-long caption, so fall back
        # to the link alone (or a hard cut when there is no link at all).
        return url or desc[:max(max_len, 0)]

    # Keep one character free for the ellipsis.
    trimmed = desc[:max_desc - 1]
    # Index of the last whitespace character of any kind, or -1 if none.
    cut = max((i for i, ch in enumerate(trimmed) if ch.isspace()), default=-1)
    # Only use word boundary if it doesn't cut off too much
    if cut > max_desc // 2:
        trimmed = trimmed[:cut].rstrip()

    return f"{trimmed}…{url_block}"
""" import yt_dlp @@ -970,10 +1062,13 @@ def scrape_tiktok_profile_ytdlp( url = f"https://www.tiktok.com/@{handle}/video/{vid_id}" vid_match = re.search(r"/video/(\d+)", url) if vid_match: + # description from flat extraction is usually just the title — + # the full caption is fetched per-video in process_videos() videos.append({ - "video_id": vid_match.group(1), - "url": url, - "timestamp": entry.get("timestamp"), + "video_id": vid_match.group(1), + "url": url, + "timestamp": entry.get("timestamp"), + "description": (entry.get("description") or "").strip(), }) logging.info(f"✅ yt-dlp fallback produced {len(videos)} usable videos.") @@ -984,21 +1079,6 @@ def scrape_tiktok_profile_ytdlp( return [] -# ───────────────────────────────────────────────────────────────────────────── -# Caption builder -# ───────────────────────────────────────────────────────────────────────────── -def build_caption(video_info: dict, tiktok_handle: str, max_len: int = 290) -> str: - desc = (video_info.get("description") or "").strip() - url = video_info.get("url", "") - if desc: - url_len = len(url) + 1 - max_desc = max_len - url_len - if len(desc) > max_desc: - desc = desc[: max_desc - 1] + "…" - return f"{desc}\n{url}" - return url - - # ───────────────────────────────────────────────────────────────────────────── # Main processing loop # ───────────────────────────────────────────────────────────────────────────── @@ -1012,6 +1092,14 @@ def process_videos( max_age_days: int, video_max_size_bytes: int, ) -> int: + """ + For each new video: + 0. Fetch full metadata (description/caption) via yt-dlp + 1. Download the video file + 2. Compress to fit within the PDS size limit + 3. Upload blob to Bluesky + 4. 
Create the post with caption + URL + """ posted_count = 0 now = arrow.utcnow() @@ -1023,6 +1111,7 @@ def process_videos( logging.info(f"⏭️ Already posted: {video_id}") continue + # Age filter (only when timestamp is available) ts = video.get("timestamp") if ts: try: @@ -1039,6 +1128,19 @@ def process_videos( logging.info(f"🎬 Processing video {video_id}: {video_url}") + # ── 0. Fetch full metadata if description not already populated ─── + if not video.get("description"): + logging.info(f"🔍 Fetching metadata for {video_id}...") + meta = fetch_video_metadata_ytdlp( + video_url, + netscape_cookies_path=netscape_cookies_path, + ) + if meta: + video["description"] = meta.get("description", "") + # Backfill timestamp if we didn't have one from scraping + if not video.get("timestamp") and meta.get("timestamp"): + video["timestamp"] = meta["timestamp"] + with tempfile.TemporaryDirectory() as tmpdir: raw_path = os.path.join(tmpdir, f"{video_id}_raw.mp4") comp_path = os.path.join(tmpdir, f"{video_id}.mp4") @@ -1053,7 +1155,9 @@ def process_videos( continue # 2. Compress - ok = compress_video(raw_path, comp_path, max_size_bytes=video_max_size_bytes) + ok = compress_video( + raw_path, comp_path, max_size_bytes=video_max_size_bytes + ) if not ok: logging.error(f"❌ Compression failed for {video_id}. Skipping.") continue @@ -1066,6 +1170,7 @@ def process_videos( # 4. 
Post caption = build_caption(video, tiktok_handle) + logging.info(f"📝 Caption preview: {caption[:120]!r}") ok = post_video_to_bluesky(client, blob, caption, langs, video_id) if ok: mark_as_posted(video_id, state, meta={"url": video_url}) @@ -1122,7 +1227,9 @@ def main(): logging.info("=" * 60) state = load_state() - client = connect_bluesky(args.bsky_handle, args.bsky_app_password, args.bsky_base_url) + client = connect_bluesky( + args.bsky_handle, args.bsky_app_password, args.bsky_base_url + ) # Convert JSON cookies → Netscape format once for all yt-dlp calls netscape_cookies_path = convert_json_cookies_to_netscape(args.cookies_path) @@ -1178,6 +1285,7 @@ def main(): logging.info("=" * 60) finally: + # Always clean up the temporary Netscape cookie file if netscape_cookies_path and os.path.exists(netscape_cookies_path): try: os.remove(netscape_cookies_path) @@ -1185,7 +1293,9 @@ def main(): f"🧹 Removed temporary Netscape cookie file: {netscape_cookies_path}" ) except Exception as e: - logging.warning(f"⚠️ Could not remove Netscape cookie file: {e}") + logging.warning( + f"⚠️ Could not remove Netscape cookie file: {e}" + ) if __name__ == "__main__":