diff --git a/bsky_post.py b/bsky_post.py index 680fe93..48c99d3 100644 --- a/bsky_post.py +++ b/bsky_post.py @@ -1,227 +1,161 @@ #!/usr/bin/env python3 """ -bsky_post.py β€” Post text + optional image or video to Bluesky/federated PDS. +Post text + optional image/video to Bluesky/federated PDS. -Examples: - python3 bsky_post.py "DIVENDRES!!!!" --video media/divendres.mp4 --username you --password app-pass --service https://eurosky.social - python3 bsky_post.py "Dijous!!!!" --image media/dijous.jpg --username you --password app-pass --service https://eurosky.social - python3 bsky_post.py "Bon dia!" --username you --password app-pass --service https://eurosky.social +Strategy: +- 100% raw XRPC via `requests` β€” no atproto SDK dependency at all. +- Avoids all SDK BlobRef/typed-model serialization bugs. +- Video uploads go through https://video.bsky.app (official path). +- getServiceAuth uses aud=did:web:, lxm=com.atproto.repo.uploadBlob. +- Handles 409 already_exists by reusing jobId. +- ffmpeg compression enabled by default. """ import argparse +import json import logging import mimetypes import os import random -import re import secrets +import shutil import string +import subprocess import sys +import tempfile import time -from dataclasses import dataclass from urllib.parse import urlparse import requests -from atproto import Client, models -# ============================================================ -# Config -# ============================================================ -@dataclass(frozen=True) -class RetryConfig: - login_max_attempts: int = 5 - login_base_delay_seconds: float = 10.0 - login_max_delay_seconds: float = 600.0 - login_jitter_seconds: float = 3.0 - - -# ============================================================ +# --------------------------- # Logging -# ============================================================ +# --------------------------- def setup_logging() -> None: logging.basicConfig( format="%(asctime)s %(levelname)s %(message)s", level=logging.INFO, stream=sys.stdout, ) + # Silence noisy third-party libs + for noisy in ("httpx", "httpcore", "urllib3"): + logging.getLogger(noisy).setLevel(logging.WARNING) -# ============================================================ -# Error helpers -# ============================================================ -def is_rate_limited_error(error_obj) -> bool: - text = repr(error_obj).lower() - return ( - "429" in text - or "ratelimitexceeded" in text - or "too many requests" in text - or "rate limit" in text - ) +# --------------------------- +# Error classification +# --------------------------- +def is_auth_error_status(status: int) -> bool: + return status in (400, 401, 403) -def is_auth_error(error_obj) -> bool: - text = repr(error_obj).lower() - return ( - "401" in text - or "403" in text - or "invalid identifier or password" in text - or "authenticationrequired" in text - or "invalidtoken" in text - ) +def is_rate_limited_status(status: int) -> bool: + return status == 429 -def is_network_error(error_obj) -> bool: - text = repr(error_obj) - signals = [ - "ConnectError", - "RemoteProtocolError", - "ReadTimeout", - "WriteTimeout", - "TimeoutException", - "503", - "502", - "504", - "ConnectionResetError", - ] - return any(sig in text for sig in signals) +def is_transient_status(status: int) -> bool: + return status in (500, 502, 503, 504) -def is_transient_error(error_obj) -> bool: - error_text = repr(error_obj) - transient_signals = [ - "InvokeTimeoutError", - "ReadTimeout", - "WriteTimeout", - "TimeoutException", - "RemoteProtocolError", - "ConnectError", - "503", - "502", - "504", - ] - return any(signal in error_text for signal in transient_signals) - - -def get_rate_limit_wait_seconds(error_obj, default_delay: float, max_delay: float) -> float: +# --------------------------- +# Raw XRPC helpers +# --------------------------- +def xrpc_get(pds_url: str, method: str, params: dict, access_jwt: str | None = None, timeout: int = 30) -> tuple[int, dict | str]: + url = f"{pds_url.rstrip('/')}/xrpc/{method}" + headers = {} + if access_jwt: + headers["Authorization"] = f"Bearer {access_jwt}" + r = requests.get(url, headers=headers, params=params, timeout=timeout) try: - now_ts = int(time.time()) - headers = getattr(error_obj, "headers", None) or {} - - retry_after = headers.get("retry-after") or headers.get("Retry-After") - if retry_after: - return min(max(float(retry_after), 1.0), max_delay) - - x_after = headers.get("x-ratelimit-after") or headers.get("X-RateLimit-After") - if x_after: - return min(max(float(x_after), 1.0), max_delay) - - reset_value = headers.get("ratelimit-reset") or headers.get("RateLimit-Reset") - if reset_value: - wait_seconds = max(float(reset_value) - now_ts + 1.0, default_delay) - return min(wait_seconds, max_delay) + return r.status_code, r.json() except Exception: - pass - - text = repr(error_obj) - m = re.search(r"'retry-after': '(\d+)'", text, re.IGNORECASE) - if m: - return min(max(float(m.group(1)), 1.0), max_delay) - - m = re.search(r"'x-ratelimit-after': '(\d+)'", text, re.IGNORECASE) - if m: - return min(max(float(m.group(1)), 1.0), max_delay) - - m = re.search(r"'ratelimit-reset': '(\d+)'", text, re.IGNORECASE) - if m: - now_ts = int(time.time()) - wait_seconds = max(float(m.group(1)) - now_ts + 1.0, default_delay) - return min(wait_seconds, max_delay) - - return default_delay + return r.status_code, r.text -# ============================================================ -# Login with backoff -# ============================================================ +def xrpc_post_json(pds_url: str, method: str, body: dict, access_jwt: str | None = None, timeout: int = 60) -> tuple[int, dict | str]: + url = f"{pds_url.rstrip('/')}/xrpc/{method}" + headers = {"Content-Type": "application/json"} + if access_jwt: + headers["Authorization"] = f"Bearer {access_jwt}" + r = requests.post(url, headers=headers, data=json.dumps(body), timeout=timeout) + try: + return r.status_code, r.json() + except Exception: + return r.status_code, r.text + + +def xrpc_post_bytes(pds_url: str, method: str, data: bytes, content_type: str, access_jwt: str | None = None, timeout: int = 600) -> tuple[int, dict | str]: + url = f"{pds_url.rstrip('/')}/xrpc/{method}" + headers = {"Content-Type": content_type} + if access_jwt: + headers["Authorization"] = f"Bearer {access_jwt}" + r = requests.post(url, headers=headers, data=data, timeout=timeout) + try: + return r.status_code, r.json() + except Exception: + return r.status_code, r.text + + +# --------------------------- +# Login (raw XRPC, no SDK) +# --------------------------- def login_with_backoff( - client: Client, - username: str, + pds_url: str, + identifier: str, password: str, - service_url: str, max_attempts: int = 5, - base_delay: float = 10.0, - max_delay: float = 600.0, - jitter: float = 1.5, -) -> bool: + base_delay: float = 8.0, + max_delay: float = 120.0, +) -> dict | None: + """ + Returns session dict {did, accessJwt, refreshJwt, handle} or None. + """ for attempt in range(1, max_attempts + 1): try: - logging.info(f"πŸ”‘ Login attempt {attempt}/{max_attempts} β†’ {service_url} as {username}") - client.login(username, password) - logging.info("βœ… Login successful.") - return True + logging.info(f"πŸ”‘ Login attempt {attempt}/{max_attempts} β†’ {pds_url} as {identifier}") + status, body = xrpc_post_json( + pds_url=pds_url, + method="com.atproto.server.createSession", + body={"identifier": identifier, "password": password}, + timeout=30, + ) - except Exception as e: - logging.exception("❌ Login exception") + if status == 200 and isinstance(body, dict) and body.get("accessJwt"): + logging.info("βœ… Login successful.") + return body - if is_auth_error(e): - logging.error("❌ Bad credentials. Check handle/password.") - return False + logging.error(f"❌ Login failed: HTTP {status} body={body}") - if is_rate_limited_error(e): - if attempt < max_attempts: - wait = get_rate_limit_wait_seconds(e, default_delay=base_delay, max_delay=max_delay) - wait += random.uniform(0, jitter) - logging.warning(f"⏳ Rate-limited. Retrying in {wait:.1f}s...") - time.sleep(wait) - continue - logging.error("❌ Exhausted login retries due to rate limiting.") - return False - - if is_network_error(e) or is_transient_error(e): - if attempt < max_attempts: - wait = min(base_delay * attempt, max_delay) + random.uniform(0, jitter) - logging.warning(f"⏳ Transient error. Retrying in {wait:.1f}s...") - time.sleep(wait) - continue - logging.error("❌ Exhausted login retries after transient/network errors.") - return False - - if attempt < max_attempts: - wait = min(base_delay * attempt, max_delay) + random.uniform(0, jitter) - logging.warning(f"⏳ Unknown login error. Retrying in {wait:.1f}s...") + if is_auth_error_status(status): + logging.error("❌ Authentication failed. Check handle/app-password.") + return None + if attempt >= max_attempts: + return None + if is_rate_limited_status(status) or is_transient_status(status): + wait = min(base_delay * attempt, max_delay) + random.uniform(0.1, 1.2) + logging.warning(f"⏳ Retrying login in {wait:.1f}s...") time.sleep(wait) - continue - - return False + else: + return None + except requests.RequestException as e: + logging.exception("❌ Login request error") + if attempt >= max_attempts: + return None + wait = min(base_delay * attempt, max_delay) + random.uniform(0.1, 1.2) + logging.warning(f"⏳ Retrying login in {wait:.1f}s...") + time.sleep(wait) + return None -# ============================================================ -# Utility -# ============================================================ -def detect_mime_type(path: str) -> str: - mime, _ = mimetypes.guess_type(path) - if mime: - return mime - ext = os.path.splitext(path)[1].lower() - fallbacks = { - ".jpg": "image/jpeg", - ".jpeg": "image/jpeg", - ".png": "image/png", - ".gif": "image/gif", - ".webp": "image/webp", - ".mp4": "video/mp4", - ".mov": "video/quicktime", - ".webm": "video/webm", - } - return fallbacks.get(ext, "application/octet-stream") - - -def wait_with_heartbeat(total_seconds: float, label: str = "processing") -> None: - if total_seconds <= 0: +# --------------------------- +# Generic helpers +# --------------------------- +def wait_with_heartbeat(seconds: float, label: str) -> None: + if seconds <= 0: return - logging.info(f"⏳ Waiting {total_seconds:.0f}s for {label}...") - remaining = total_seconds + logging.info(f"⏳ Waiting {seconds:.0f}s for {label}...") + remaining = seconds while remaining > 0: step = min(5.0, remaining) time.sleep(step) @@ -243,238 +177,361 @@ def random_video_name(ext: str = ".mp4") -> str: return f"{int(time.time())}_{token}{ext}" -def model_to_dict(obj): - if obj is None: +def get_service_auth_token(pds_url: str, access_jwt: str, aud: str, lxm: str, exp_seconds: int = 1800) -> str | None: + status, body = xrpc_get( + pds_url=pds_url, + method="com.atproto.server.getServiceAuth", + params={"aud": aud, "lxm": lxm, "exp": int(time.time()) + exp_seconds}, + access_jwt=access_jwt, + timeout=30, + ) + if status != 200 or not isinstance(body, dict): + logging.error(f"❌ getServiceAuth failed: HTTP {status} body={body}") return None - if hasattr(obj, "model_dump"): - return obj.model_dump(by_alias=True, exclude_none=True) - if hasattr(obj, "dict"): - return obj.dict(by_alias=True, exclude_none=True) - return obj + token = body.get("token") + if not token: + logging.error(f"❌ getServiceAuth returned no token: {body}") + return None + return token -# ============================================================ -# Media upload β€” Image -# ============================================================ -def upload_image( - client: Client, +# --------------------------- +# ffmpeg compression +# --------------------------- +def ffmpeg_exists() -> bool: + return shutil.which("ffmpeg") is not None + + +def ffprobe_exists() -> bool: + return shutil.which("ffprobe") is not None + + +def get_video_duration_seconds(path: str) -> float | None: + if not ffprobe_exists(): + return None + try: + out = subprocess.check_output( + [ + "ffprobe", "-v", "error", + "-show_entries", "format=duration", + "-of", "default=noprint_wrappers=1:nokey=1", + path, + ], + stderr=subprocess.STDOUT, + text=True, + ).strip() + return float(out) + except Exception: + return None + + +def compress_video_ffmpeg( + input_path: str, + max_size_mb: float = 45.0, + crf: int = 28, + preset: str = "veryfast", + audio_bitrate_k: int = 96, +) -> str | None: + if not ffmpeg_exists(): + logging.error("❌ ffmpeg not found in PATH. Install ffmpeg or use --no-compress-video.") + return None + if not os.path.exists(input_path): + logging.error(f"❌ Video file not found: {input_path}") + return None + + src_size_mb = os.path.getsize(input_path) / (1024 * 1024) + logging.info(f"πŸ“¦ Source video size: {src_size_mb:.2f} MB") + if src_size_mb <= max_size_mb: + logging.info("βœ… Already below target size. Skipping compression.") + return input_path + + duration = get_video_duration_seconds(input_path) + target_video_k = 1200 + if duration and duration > 0: + total_kbps = (max_size_mb * 8192.0) / duration + target_video_k = int(max(300, total_kbps - audio_bitrate_k)) + target_video_k = min(max(target_video_k, 300), 5000) + + fd, out_path = tempfile.mkstemp(prefix="bsky_vid_", suffix=".mp4") + os.close(fd) + + cmd = [ + "ffmpeg", "-y", + "-i", input_path, + "-c:v", "libx264", + "-preset", preset, + "-crf", str(crf), + "-b:v", f"{target_video_k}k", + "-maxrate", f"{int(target_video_k * 1.3)}k", + "-bufsize", f"{int(target_video_k * 2)}k", + "-vf", "scale='min(1280,iw)':-2", + "-c:a", "aac", + "-b:a", f"{audio_bitrate_k}k", + "-movflags", "+faststart", + out_path, + ] + + try: + logging.info(f"πŸ› οΈ Compressing video (target≀{max_size_mb}MB, crf={crf}, preset={preset})...") + subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) + out_size_mb = os.path.getsize(out_path) / (1024 * 1024) + logging.info(f"βœ… Compressed size: {out_size_mb:.2f} MB") + if out_size_mb < src_size_mb: + return out_path + os.remove(out_path) + logging.info("ℹ️ Compression not smaller than source. Using original.") + return input_path + except subprocess.CalledProcessError as e: + logging.error("❌ ffmpeg compression failed.") + if e.stderr: + logging.error(e.stderr[-2000:]) + try: + os.remove(out_path) + except Exception: + pass + return None + + +# --------------------------- +# Image upload +# --------------------------- +def upload_image_embed_dict( + pds_url: str, + access_jwt: str, image_path: str, alt_text: str = "", -) -> models.AppBskyEmbedImages.Image | None: - try: - if not os.path.exists(image_path): - logging.error(f"❌ Image file not found: {image_path}") - return None - - mime = detect_mime_type(image_path) - with open(image_path, "rb") as f: - data = f.read() - - logging.info(f"πŸ–ΌοΈ Uploading image: {image_path} ({len(data)/1024:.1f} KB, {mime})") - response = client.upload_blob(data) - logging.info("βœ… Image uploaded successfully.") - - return models.AppBskyEmbedImages.Image(image=response.blob, alt=alt_text) - - except Exception as e: - logging.error(f"❌ Failed to upload image: {repr(e)}") +) -> dict | None: + if not os.path.exists(image_path): + logging.error(f"❌ Image file not found: {image_path}") return None + mime, _ = mimetypes.guess_type(image_path) + mime = mime or "image/jpeg" + with open(image_path, "rb") as f: + data = f.read() -# ============================================================ -# Media upload β€” Video via PDS direct fallback -# ============================================================ -def upload_video_via_pds( - client: Client, + logging.info(f"πŸ–ΌοΈ Uploading image: {image_path} ({len(data)/1024:.1f} KB, {mime})") + status, body = xrpc_post_bytes( + pds_url=pds_url, + method="com.atproto.repo.uploadBlob", + data=data, + content_type=mime, + access_jwt=access_jwt, + timeout=180, + ) + if status != 200 or not isinstance(body, dict): + logging.error(f"❌ Image uploadBlob failed: HTTP {status} body={body}") + return None + blob = body.get("blob") + if not blob: + logging.error(f"❌ uploadBlob returned no blob: {body}") + return None + return { + "$type": "app.bsky.embed.images", + "images": [{"alt": alt_text or "", "image": blob}], + } + + +# --------------------------- +# Video upload via video.bsky.app +# --------------------------- +def upload_video_via_video_service_embed_dict( + did: str, + access_jwt: str, + pds_url: str, video_path: str, + service_url: str, alt_text: str = "", - settle_delay_seconds: float = 30.0, -) -> models.AppBskyEmbedVideo.Main | None: - """ - Direct upload to home PDS via upload_blob. - Fallback only. Can be less reliable for playback in clients. - """ - try: - if not os.path.exists(video_path): - logging.error(f"❌ Video file not found: {video_path}") - return None - - with open(video_path, "rb") as f: - video_bytes = f.read() - - size_mb = len(video_bytes) / (1024 * 1024) - logging.warning(f"🎬 [PDS-direct fallback] Uploading: {video_path} ({size_mb:.2f} MB)") - - response = client.upload_blob(video_bytes) - blob = response.blob - logging.warning("⚠️ [PDS-direct fallback] Blob uploaded. Waiting for indexing...") - - wait_with_heartbeat(settle_delay_seconds, label="PDS/AppView indexing") - - return models.AppBskyEmbedVideo.Main(video=blob, alt=alt_text) - - except Exception as e: - logging.error(f"❌ PDS-direct video upload failed: {repr(e)}") +) -> dict | None: + if not os.path.exists(video_path): + logging.error(f"❌ Video file not found: {video_path}") return None + with open(video_path, "rb") as f: + video_bytes = f.read() -# ============================================================ -# Media upload β€” Video via video.bsky.app (primary) -# ============================================================ -def _extract_service_auth_token(upload_auth) -> str | None: - token = getattr(upload_auth, "token", None) - if token: - return token - if isinstance(upload_auth, dict): - return upload_auth.get("token") - return None + size_mb = len(video_bytes) / (1024 * 1024) + logging.info(f"🎬 [video.bsky.app] Uploading: {video_path} ({size_mb:.2f} MB)") + video_host = "https://video.bsky.app" + pds_did = pds_did_from_service_url(service_url) + + token = get_service_auth_token( + pds_url=pds_url, + access_jwt=access_jwt, + aud=pds_did, + lxm="com.atproto.repo.uploadBlob", + exp_seconds=1800, + ) + if not token: + return None + + upload_name = random_video_name(".mp4") + logging.info(f"🎞️ Upload name: {upload_name}") + + upload_url = f"{video_host}/xrpc/app.bsky.video.uploadVideo?did={did}&name={upload_name}" + headers = { + "Authorization": f"Bearer {token}", + "Content-Type": "video/mp4", + } + + try: + r = requests.post(upload_url, headers=headers, data=video_bytes, timeout=300) + except requests.RequestException as e: + logging.error(f"❌ video upload request failed: {repr(e)}") + return None + + if r.status_code not in (200, 409): + logging.error(f"❌ video.bsky.app upload failed: {r.status_code} - {r.text}") + return None + + try: + payload = r.json() + except Exception as e: + logging.error(f"❌ Could not parse upload response JSON: {repr(e)} body={r.text[:500]}") + return None + + if r.status_code == 409: + if payload.get("error") == "already_exists" and payload.get("jobId"): + logging.info("ℹ️ Video already processed on video.bsky.app. Reusing existing job.") + else: + logging.error(f"❌ 409 without reusable jobId: {payload}") + return None + + job_id = payload.get("jobId") + if not job_id: + logging.error(f"❌ No jobId in video upload response: {payload}") + return None + + logging.info(f"⏳ Job {job_id} accepted β€” polling status...") -def _poll_video_job(video_host: str, job_id: str) -> models.AppBskyEmbedVideo.Main | None: status_url = f"{video_host}/xrpc/app.bsky.video.getJobStatus" - deadline = time.time() + 600 # up to 10 minutes + deadline = time.time() + 600 # 10 min max poll + last_state = None + completed_blob = None while time.time() < deadline: - status_resp = requests.get(status_url, params={"jobId": job_id}, timeout=30) - if status_resp.status_code != 200: - logging.error(f"❌ Job status check failed: {status_resp.status_code} - {status_resp.text}") + try: + s = requests.get( + status_url, + params={"jobId": job_id}, + headers={"Authorization": f"Bearer {token}"}, + timeout=30, + ) + except requests.RequestException as e: + logging.warning(f"⚠️ Status poll request failed once: {repr(e)}") + time.sleep(3) + continue + + if s.status_code != 200: + logging.error(f"❌ Job status failed: {s.status_code} - {s.text}") return None - status_json = status_resp.json() - job_status = status_json.get("jobStatus", {}) + try: + body = s.json() + except Exception as e: + logging.error(f"❌ Could not parse status JSON: {repr(e)}") + return None + + job_status = body.get("jobStatus", {}) or {} state = job_status.get("state") - if state == "JOB_STATE_COMPLETED": - blob_dict = job_status.get("blob") - if not blob_dict: - logging.error(f"❌ No blob in completed job status: {status_json}") - return None + if state != last_state: + logging.info(f" state β†’ {state}") + last_state = state - wait_with_heartbeat(8, label="CDN propagation") - blob_ref = models.BlobRef.from_dict(blob_dict) - logging.info("βœ… Video processed successfully.") - return models.AppBskyEmbedVideo.Main(video=blob_ref, alt="") + if state == "JOB_STATE_COMPLETED": + completed_blob = job_status.get("blob") + if not completed_blob: + logging.error(f"❌ Completed job without blob: {body}") + return None + break if state == "JOB_STATE_FAILED": logging.error(f"❌ Video processing failed: {job_status}") return None - logging.info(f" ...still processing (state={state})...") time.sleep(3) - logging.error("❌ Video processing timed out.") - return None - - -def upload_video_via_bsky_service( - client: Client, - video_path: str, - service_url: str, - alt_text: str = "", -) -> models.AppBskyEmbedVideo.Main | None: - """ - Upload via centralized video.bsky.app service. - - Critical compatibility fixes: - - aud must be user's PDS DID (e.g. did:web:eurosky.social) - - lxm must be com.atproto.repo.uploadBlob - - handle 409 already_exists by reusing returned jobId - """ - try: - if not os.path.exists(video_path): - logging.error(f"❌ Video file not found: {video_path}") - return None - - with open(video_path, "rb") as f: - video_bytes = f.read() - - size_mb = len(video_bytes) / (1024 * 1024) - logging.info(f"🎬 [video.bsky.app] Uploading: {video_path} ({size_mb:.2f} MB)") - - VIDEO_HOST = "https://video.bsky.app" - pds_did = pds_did_from_service_url(service_url) - - try: - params = models.ComAtprotoServerGetServiceAuth.Params( - aud=pds_did, - lxm="com.atproto.repo.uploadBlob", - exp=int(time.time()) + 60 * 30, - ) - upload_auth = client.com.atproto.server.get_service_auth(params) - except Exception: - upload_auth = client.com.atproto.server.get_service_auth( - { - "aud": pds_did, - "lxm": "com.atproto.repo.uploadBlob", - "exp": int(time.time()) + 60 * 30, - } - ) - - token = _extract_service_auth_token(upload_auth) - if not token: - logging.error("❌ Failed to extract service auth token.") - return None - - user_did = client.me.did - upload_name = random_video_name(".mp4") - logging.info(f"🎞️ Upload name: {upload_name}") - - upload_url = ( - f"{VIDEO_HOST}/xrpc/app.bsky.video.uploadVideo" - f"?did={user_did}&name={upload_name}" - ) - headers = { - "Authorization": f"Bearer {token}", - "Content-Type": "video/mp4", - } - - upload_resp = requests.post(upload_url, headers=headers, data=video_bytes, timeout=180) - - if upload_resp.status_code not in (200, 409): - logging.error(f"❌ video.bsky.app upload failed: {upload_resp.status_code} - {upload_resp.text}") - return None - - body = upload_resp.json() - - if upload_resp.status_code == 409: - if body.get("error") == "already_exists" and body.get("jobId"): - logging.info("ℹ️ Video already processed on video.bsky.app. Reusing existing job.") - else: - logging.error(f"❌ video.bsky.app returned 409 without reusable jobId: {body}") - return None - - job_id = body.get("jobId") - if not job_id: - logging.error(f"❌ No jobId returned from video service. Response: {body}") - return None - - logging.info(f"⏳ Job {job_id} accepted β€” polling status...") - embed = _poll_video_job(VIDEO_HOST, job_id) - if not embed: - return None - - # inject alt text after job result - return models.AppBskyEmbedVideo.Main(video=embed.video, alt=alt_text) - - except Exception as e: - logging.error(f"❌ video.bsky.app upload failed: {repr(e)}") + if completed_blob is None: + logging.error("❌ Video processing timed out.") return None + # Wait AFTER successful poll β€” outside the loop, no SDK in scope + wait_with_heartbeat(8, "CDN propagation") -# ============================================================ -# Video dispatcher -# ============================================================ -def upload_video_smart( - client: Client, + # Build pure dict embed + embed = { + "$type": "app.bsky.embed.video", + "video": completed_blob, + } + if alt_text: + embed["alt"] = alt_text + return embed + + +# --------------------------- +# Direct PDS video fallback +# --------------------------- +def upload_video_via_pds_embed_dict( + pds_url: str, + access_jwt: str, + video_path: str, + alt_text: str = "", + settle_delay_seconds: float = 30.0, +) -> dict | None: + if not os.path.exists(video_path): + logging.error(f"❌ Video file not found: {video_path}") + return None + + with open(video_path, "rb") as f: + b = f.read() + size_mb = len(b) / (1024 * 1024) + logging.warning(f"🎬 [PDS-direct fallback] Uploading: {video_path} ({size_mb:.2f} MB)") + + status, body = xrpc_post_bytes( + pds_url=pds_url, + method="com.atproto.repo.uploadBlob", + data=b, + content_type="video/mp4", + access_jwt=access_jwt, + timeout=900, + ) + if status != 200 or not isinstance(body, dict): + logging.error(f"❌ PDS uploadBlob failed: HTTP {status} body={body}") + return None + + blob = body.get("blob") + if not blob: + logging.error(f"❌ PDS uploadBlob returned no blob: {body}") + return None + + wait_with_heartbeat(settle_delay_seconds, "PDS/AppView indexing") + + embed = { + "$type": "app.bsky.embed.video", + "video": blob, + } + if alt_text: + embed["alt"] = alt_text + return embed + + +def upload_video_smart_embed_dict( + did: str, + access_jwt: str, + pds_url: str, video_path: str, service_url: str, alt_text: str = "", settle_delay_seconds: float = 30.0, allow_pds_video_fallback: bool = False, -) -> models.AppBskyEmbedVideo.Main | None: +) -> dict | None: logging.info(f"🌍 PDS ({service_url}). Trying video.bsky.app first.") - embed = upload_video_via_bsky_service( - client=client, + embed = upload_video_via_video_service_embed_dict( + did=did, + access_jwt=access_jwt, + pds_url=pds_url, video_path=video_path, service_url=service_url, alt_text=alt_text, @@ -484,127 +541,77 @@ def upload_video_smart( if allow_pds_video_fallback: logging.warning("⚠️ video.bsky.app failed; trying direct PDS fallback.") - return upload_video_via_pds( - client=client, + return upload_video_via_pds_embed_dict( + pds_url=pds_url, + access_jwt=access_jwt, video_path=video_path, alt_text=alt_text, settle_delay_seconds=settle_delay_seconds, ) - logging.error("❌ video.bsky.app failed. Not using direct fallback unless enabled.") + logging.error("❌ video.bsky.app failed and fallback is disabled.") return None -# ============================================================ -# Post creation (explicit record to guarantee text string) -# ============================================================ -def post_to_bsky( - client: Client, - text: str, - langs: list[str], - image_path: str | None = None, - video_path: str | None = None, - alt_text: str = "", - service_url: str = "https://bsky.social", - video_settle_delay: float = 30.0, - allow_pds_video_fallback: bool = False, -) -> bool: - post_text = text.strip() +# --------------------------- +# Create post +# --------------------------- +def create_post_record(text: str, langs: list[str], embed_dict: dict | None = None) -> dict: + record = { + "$type": "app.bsky.feed.post", + "text": text.strip(), + "createdAt": time.strftime("%Y-%m-%dT%H:%M:%S.000Z", time.gmtime()), + } + if langs: + record["langs"] = langs + if embed_dict is not None: + record["embed"] = embed_dict + return record - if not post_text and not image_path and not video_path: - logging.error("❌ Empty post text with no media is not allowed.") - return False - - try: - embed_obj = None - - if video_path: - logging.info(f"🎬 Preparing video upload: {video_path}") - embed_obj = upload_video_smart( - client=client, - video_path=video_path, - service_url=service_url, - alt_text=alt_text, - settle_delay_seconds=video_settle_delay, - allow_pds_video_fallback=allow_pds_video_fallback, - ) - if not embed_obj: - logging.error("❌ Aborting post: video upload/processing failed.") - return False - - elif image_path: - image = upload_image(client, image_path, alt_text=alt_text) - if not image: - logging.error("❌ Aborting post: image upload failed.") - return False - embed_obj = models.AppBskyEmbedImages.Main(images=[image]) - - record = { - "$type": "app.bsky.feed.post", - "text": post_text, - "createdAt": time.strftime("%Y-%m-%dT%H:%M:%S.000Z", time.gmtime()), - } - - if langs: - record["langs"] = langs - - if embed_obj is not None: - record["embed"] = model_to_dict(embed_obj) - - logging.info(f"🧾 Final record text={record.get('text')!r}, has_embed={'embed' in record}") - - try: - resp = client.com.atproto.repo.create_record( - models.ComAtprotoRepoCreateRecord.Data( - repo=client.me.did, - collection="app.bsky.feed.post", - record=record, - ) - ) - except Exception: - resp = client.com.atproto.repo.create_record( - { - "repo": client.me.did, - "collection": "app.bsky.feed.post", - "record": record, - } - ) - - uri = getattr(resp, "uri", None) or (resp.get("uri") if isinstance(resp, dict) else None) - logging.info(f"βœ… Post published! URI: {uri}") - return True - - except Exception as e: - logging.error(f"❌ Failed to send post: {repr(e)}") + +def publish_post(pds_url: str, access_jwt: str, did: str, record: dict) -> bool: + status, body = xrpc_post_json( + pds_url=pds_url, + method="com.atproto.repo.createRecord", + body={ + "repo": did, + "collection": "app.bsky.feed.post", + "record": record, + }, + access_jwt=access_jwt, + timeout=60, + ) + if status != 200 or not isinstance(body, dict): + logging.error(f"❌ createRecord failed: HTTP {status} body={body}") return False + uri = body.get("uri") + logging.info(f"βœ… Post published! URI: {uri}") + return True -# ============================================================ -# CLI -# ============================================================ +# --------------------------- +# Main +# --------------------------- def main(): setup_logging() parser = argparse.ArgumentParser(description="Post text + optional image/video to Bluesky/federated PDS.") - parser.add_argument("text", help="Post text content") - parser.add_argument("--username", required=True, help="Bluesky handle or email") - parser.add_argument("--password", required=True, help="Bluesky app password") + parser.add_argument("text", help="Post text") + parser.add_argument("--username", required=True, help="Handle/email") + parser.add_argument("--password", required=True, help="App password") parser.add_argument("--service", default="https://bsky.social", help="PDS URL") - parser.add_argument("--lang", default="ca", help="Comma-separated language codes (e.g. ca,es)") - parser.add_argument("--image", default=None, help="Path to image file") - parser.add_argument("--video", default=None, help="Path to video file") - parser.add_argument("--alt", default="", help="Alt text for media") - parser.add_argument( - "--video-settle-delay", - type=float, - default=30.0, - help="Seconds to wait after direct-PDS fallback upload before posting.", - ) - parser.add_argument( - "--allow-pds-video-fallback", - action="store_true", - help="Allow direct PDS video fallback if video.bsky.app fails.", - ) + parser.add_argument("--lang", default="ca", help="Comma-separated language codes") + parser.add_argument("--image", default=None, help="Image path") + parser.add_argument("--video", default=None, help="Video path") + parser.add_argument("--alt", default="", help="Alt text") + parser.add_argument("--video-settle-delay", type=float, default=30.0) + parser.add_argument("--allow-pds-video-fallback", action="store_true") + + parser.add_argument("--compress-video", dest="compress_video", action="store_true", default=True) + parser.add_argument("--no-compress-video", dest="compress_video", action="store_false") + parser.add_argument("--max-video-mb", type=float, default=45.0) + parser.add_argument("--ffmpeg-crf", type=int, default=28) + parser.add_argument("--ffmpeg-preset", default="veryfast") args = parser.parse_args() @@ -612,38 +619,86 @@ def main(): logging.error("❌ Use either --image or --video, not both.") sys.exit(1) - client = Client(base_url=args.service) + pds_url = args.service.rstrip("/") - success = login_with_backoff( - client=client, - username=args.username, + session = login_with_backoff( + pds_url=pds_url, + identifier=args.username, password=args.password, - service_url=args.service, - max_attempts=RetryConfig.login_max_attempts, - base_delay=RetryConfig.login_base_delay_seconds, - max_delay=RetryConfig.login_max_delay_seconds, - jitter=RetryConfig.login_jitter_seconds, ) - if not success: + if not session: sys.exit(1) - langs = [l.strip() for l in args.lang.split(",") if l.strip()] + did = session["did"] + access_jwt = session["accessJwt"] + logging.info(f"πŸ†” DID: {did} | PDS: {pds_url}") - post_success = post_to_bsky( - client=client, - text=args.text, - langs=langs, - image_path=args.image, - video_path=args.video, - alt_text=args.alt, - service_url=args.service, - video_settle_delay=args.video_settle_delay, - allow_pds_video_fallback=args.allow_pds_video_fallback, - ) + langs = [x.strip() for x in args.lang.split(",") if x.strip()] - if not post_success: + video_path_for_upload = args.video + temp_compressed_path = None + + if args.video and args.compress_video: + compressed = compress_video_ffmpeg( + input_path=args.video, + max_size_mb=args.max_video_mb, + crf=args.ffmpeg_crf, + preset=args.ffmpeg_preset, + audio_bitrate_k=96, + ) + if compressed is None: + logging.error("❌ Compression failed; aborting.") + sys.exit(1) + video_path_for_upload = compressed + if compressed != args.video: + temp_compressed_path = compressed + + embed_dict = None + + if video_path_for_upload: + logging.info(f"🎬 Preparing video upload: {video_path_for_upload}") + embed_dict = upload_video_smart_embed_dict( + did=did, + access_jwt=access_jwt, + pds_url=pds_url, + video_path=video_path_for_upload, + service_url=pds_url, + alt_text=args.alt, + settle_delay_seconds=args.video_settle_delay, + allow_pds_video_fallback=args.allow_pds_video_fallback, + ) + if embed_dict is None: + logging.error("❌ Aborting post: video upload/processing failed.") + if temp_compressed_path and os.path.exists(temp_compressed_path): + os.remove(temp_compressed_path) + sys.exit(1) + + elif args.image: + embed_dict = upload_image_embed_dict( + pds_url=pds_url, + access_jwt=access_jwt, + image_path=args.image, + alt_text=args.alt, + ) + if embed_dict is None: + logging.error("❌ Aborting post: image upload failed.") + sys.exit(1) + + record = create_post_record(text=args.text, langs=langs, embed_dict=embed_dict) + logging.info(f"🧾 Final record text={record.get('text')!r}, has_embed={'embed' in record}") + + ok = publish_post(pds_url=pds_url, access_jwt=access_jwt, did=did, record=record) + + if temp_compressed_path and os.path.exists(temp_compressed_path): + try: + os.remove(temp_compressed_path) + logging.info(f"🧹 Removed temp file: {temp_compressed_path}") + except Exception: + pass + + if not ok: sys.exit(1) if __name__ == "__main__": - main() \ No newline at end of file + main()