#!/usr/bin/env python3 """ bsky_post.py β€” Post text + optional image/video to Bluesky/federated PDS. This script is designed to be robust across different atproto SDK versions and federated PDS setups. It includes: - Login retry/backoff - Video upload via video.bsky.app (primary, recommended for playback compatibility) - Correct service auth parameters: - aud = did:web: - lxm = com.atproto.repo.uploadBlob - Handles 409 already_exists from video service by reusing jobId - Avoids hard dependency on models.BlobRef (cross-version compatibility) - Optional direct PDS video fallback - Optional ffmpeg compression (enabled by default) - Explicit createRecord payload with guaranteed plain-string "text" """ import argparse import logging import mimetypes import os import random import re import secrets import shutil import string import subprocess import sys import tempfile import time from dataclasses import dataclass from urllib.parse import urlparse import requests from atproto import Client, models # ----------------------------------------------------------------------------- # Retry configuration for login/backoff behavior # ----------------------------------------------------------------------------- @dataclass(frozen=True) class RetryConfig: login_max_attempts: int = 5 login_base_delay_seconds: float = 10.0 login_max_delay_seconds: float = 600.0 login_jitter_seconds: float = 3.0 # ----------------------------------------------------------------------------- # Logging setup # ----------------------------------------------------------------------------- def setup_logging() -> None: """Configure structured logging to stdout.""" logging.basicConfig( format="%(asctime)s %(levelname)s %(message)s", level=logging.INFO, stream=sys.stdout, ) # ----------------------------------------------------------------------------- # Error classification helpers # ----------------------------------------------------------------------------- def is_rate_limited_error(error_obj) -> bool: """Detect common rate-limit signatures from exceptions.""" t = repr(error_obj).lower() return "429" in t or "ratelimit" in t or "too many requests" in t def is_auth_error(error_obj) -> bool: """Detect authentication/authorization failures.""" t = repr(error_obj).lower() return "401" in t or "403" in t or "invalid identifier or password" in t def is_network_error(error_obj) -> bool: """Detect transient network/transport/server errors.""" t = repr(error_obj) return any(x in t for x in [ "ConnectError", "RemoteProtocolError", "ReadTimeout", "WriteTimeout", "TimeoutException", "503", "502", "504", "ConnectionResetError", "InvokeTimeoutError" ]) def get_rate_limit_wait_seconds(error_obj, default_delay: float, max_delay: float) -> float: """ Try to extract a smart retry delay from response headers, else return default. """ try: now_ts = int(time.time()) headers = getattr(error_obj, "headers", None) or {} retry_after = headers.get("retry-after") or headers.get("Retry-After") if retry_after: return min(max(float(retry_after), 1.0), max_delay) reset_value = headers.get("ratelimit-reset") or headers.get("RateLimit-Reset") if reset_value: wait_seconds = max(float(reset_value) - now_ts + 1.0, default_delay) return min(wait_seconds, max_delay) except Exception: pass return default_delay # ----------------------------------------------------------------------------- # Login with retries and backoff # ----------------------------------------------------------------------------- def login_with_backoff( client: Client, username: str, password: str, service_url: str, max_attempts: int = 5, base_delay: float = 10.0, max_delay: float = 600.0, jitter: float = 1.5 ) -> bool: """ Attempt login with exponential-ish backoff and jitter for transient failures. """ for attempt in range(1, max_attempts + 1): try: logging.info(f"πŸ”‘ Login attempt {attempt}/{max_attempts} β†’ {service_url} as {username}") client.login(username, password) logging.info("βœ… Login successful.") return True except Exception as e: logging.exception("❌ Login exception") if is_auth_error(e): logging.error("❌ Authentication failed.") return False if is_rate_limited_error(e): if attempt < max_attempts: wait = get_rate_limit_wait_seconds(e, base_delay, max_delay) + random.uniform(0, jitter) logging.warning(f"⏳ Rate limited, retrying in {wait:.1f}s...") time.sleep(wait) continue return False if is_network_error(e): if attempt < max_attempts: wait = min(base_delay * attempt, max_delay) + random.uniform(0, jitter) logging.warning(f"⏳ Network/transient error, retrying in {wait:.1f}s...") time.sleep(wait) continue return False return False # ----------------------------------------------------------------------------- # Generic utility helpers # ----------------------------------------------------------------------------- def wait_with_heartbeat(total_seconds: float, label: str) -> None: """ Sleep with periodic logging, useful for processing/indexing waits. """ if total_seconds <= 0: return logging.info(f"⏳ Waiting {total_seconds:.0f}s for {label}...") remaining = total_seconds while remaining > 0: step = min(5.0, remaining) time.sleep(step) remaining -= step if remaining > 0: logging.info(f" ...still waiting ({remaining:.0f}s remaining)...") logging.info("βœ… Wait complete.") def pds_did_from_service_url(service_url: str) -> str: """ Convert service URL host to did:web: used as getServiceAuth.aud. Example: https://eurosky.social -> did:web:eurosky.social """ host = (urlparse(service_url).hostname or "").lower() if not host: raise ValueError(f"Invalid --service URL: {service_url}") return f"did:web:{host}" def random_video_name(ext: str = ".mp4") -> str: """Generate a random upload filename for video service requests.""" token = "".join(secrets.choice(string.ascii_lowercase + string.digits) for _ in range(12)) return f"{int(time.time())}_{token}{ext}" def detect_mime_type(path: str) -> str: """Best-effort MIME type detection.""" mime, _ = mimetypes.guess_type(path) return mime or "application/octet-stream" def _extract_service_auth_token(upload_auth) -> str | None: """Extract token from typed/dict getServiceAuth responses.""" token = getattr(upload_auth, "token", None) if token: return token if isinstance(upload_auth, dict): return upload_auth.get("token") return None # ----------------------------------------------------------------------------- # ffmpeg compression helpers # ----------------------------------------------------------------------------- def ffmpeg_exists() -> bool: """Check if ffmpeg is installed.""" return shutil.which("ffmpeg") is not None def ffprobe_exists() -> bool: """Check if ffprobe is installed.""" return shutil.which("ffprobe") is not None def get_video_duration_seconds(path: str) -> float | None: """Read video duration via ffprobe; return None if unavailable.""" if not ffprobe_exists(): return None try: out = subprocess.check_output([ "ffprobe", "-v", "error", "-show_entries", "format=duration", "-of", "default=noprint_wrappers=1:nokey=1", path ], stderr=subprocess.STDOUT, text=True).strip() return float(out) except Exception: return None def compress_video_ffmpeg( input_path: str, max_size_mb: float = 45.0, crf: int = 28, preset: str = "veryfast", audio_bitrate_k: int = 96 ) -> str | None: """ Compress video to H.264/AAC MP4, attempting to fit target size. Returns: - compressed temp path (if smaller), - original input path (if already small / compressed not smaller), - None on failure. """ if not ffmpeg_exists(): logging.error("❌ ffmpeg not found. Install ffmpeg or use --no-compress-video.") return None if not os.path.exists(input_path): logging.error(f"❌ Video file not found: {input_path}") return None src_size_mb = os.path.getsize(input_path) / (1024 * 1024) logging.info(f"πŸ“¦ Source video size: {src_size_mb:.2f} MB") if src_size_mb <= max_size_mb: logging.info("βœ… Already below size target. Skipping compression.") return input_path duration = get_video_duration_seconds(input_path) target_video_k = 1200 # fallback target bitrate if duration and duration > 0: total_kbps = (max_size_mb * 8192.0) / duration target_video_k = int(max(300, total_kbps - audio_bitrate_k)) target_video_k = min(max(target_video_k, 300), 5000) # Create a temporary output file for compressed video fd, out_path = tempfile.mkstemp(prefix="bsky_vid_", suffix=".mp4") os.close(fd) cmd = [ "ffmpeg", "-y", "-i", input_path, "-c:v", "libx264", "-preset", preset, "-crf", str(crf), "-b:v", f"{target_video_k}k", "-maxrate", f"{int(target_video_k * 1.3)}k", "-bufsize", f"{int(target_video_k * 2)}k", "-vf", "scale='min(1280,iw)':-2", "-c:a", "aac", "-b:a", f"{audio_bitrate_k}k", "-movflags", "+faststart", out_path, ] try: logging.info(f"πŸ› οΈ Compressing video (target≀{max_size_mb}MB, crf={crf}, preset={preset})...") subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) out_size_mb = os.path.getsize(out_path) / (1024 * 1024) logging.info(f"βœ… Compressed size: {out_size_mb:.2f} MB") # Keep compressed only if it is truly smaller if out_size_mb < src_size_mb: return out_path os.remove(out_path) return input_path except subprocess.CalledProcessError as e: logging.error("❌ ffmpeg compression failed.") if e.stderr: logging.error(e.stderr[-2000:]) try: os.remove(out_path) except Exception: pass return None # ----------------------------------------------------------------------------- # Video upload paths # ----------------------------------------------------------------------------- def upload_video_via_bsky_service(client: Client, video_path: str, service_url: str, alt_text: str = "") -> dict | None: """ Primary video upload path through https://video.bsky.app. IMPORTANT: - getServiceAuth.aud must be your PDS DID (did:web:) - getServiceAuth.lxm must be "com.atproto.repo.uploadBlob" Returns raw embed dict: {"$type":"app.bsky.embed.video","video":,"alt":"..."} """ if not os.path.exists(video_path): logging.error(f"❌ Video file not found: {video_path}") return None with open(video_path, "rb") as f: video_bytes = f.read() size_mb = len(video_bytes) / (1024 * 1024) logging.info(f"🎬 [video.bsky.app] Uploading: {video_path} ({size_mb:.2f} MB)") video_host = "https://video.bsky.app" pds_did = pds_did_from_service_url(service_url) # Request service-auth token from user's PDS for uploadBlob lexicon method try: params = models.ComAtprotoServerGetServiceAuth.Params( aud=pds_did, lxm="com.atproto.repo.uploadBlob", exp=int(time.time()) + 60 * 30, ) upload_auth = client.com.atproto.server.get_service_auth(params) except Exception: # Fallback for SDK variants expecting dict input upload_auth = client.com.atproto.server.get_service_auth({ "aud": pds_did, "lxm": "com.atproto.repo.uploadBlob", "exp": int(time.time()) + 60 * 30, }) token = _extract_service_auth_token(upload_auth) if not token: logging.error("❌ Could not get service auth token.") return None upload_name = random_video_name(".mp4") logging.info(f"🎞️ Upload name: {upload_name}") upload_url = f"{video_host}/xrpc/app.bsky.video.uploadVideo?did={client.me.did}&name={upload_name}" headers = {"Authorization": f"Bearer {token}", "Content-Type": "video/mp4"} # Upload video bytes to video service resp = requests.post(upload_url, headers=headers, data=video_bytes, timeout=240) # 200 = accepted, 409 already_exists = dedupe hit (still usable with jobId) if resp.status_code not in (200, 409): logging.error(f"❌ video.bsky.app upload failed: {resp.status_code} - {resp.text}") return None body = resp.json() if resp.status_code == 409: if body.get("error") == "already_exists" and body.get("jobId"): logging.info("ℹ️ Video already processed on video.bsky.app. Reusing job.") else: logging.error(f"❌ 409 without reusable jobId: {body}") return None job_id = body.get("jobId") if not job_id: logging.error(f"❌ Missing jobId in upload response: {body}") return None # Poll processing status logging.info(f"⏳ Job {job_id} accepted β€” polling status...") status_url = f"{video_host}/xrpc/app.bsky.video.getJobStatus" deadline = time.time() + 600 while time.time() < deadline: s = requests.get(status_url, params={"jobId": job_id}, timeout=30) if s.status_code != 200: logging.error(f"❌ Job status check failed: {s.status_code} - {s.text}") return None status_json = s.json() job_status = status_json.get("jobStatus", {}) state = job_status.get("state") if state == "JOB_STATE_COMPLETED": blob = job_status.get("blob") if not blob: logging.error(f"❌ Completed job without blob: {status_json}") return None wait_with_heartbeat(8, "CDN propagation") # Return RAW lexicon embed dict (no models.BlobRef dependency) return { "$type": "app.bsky.embed.video", "video": blob, "alt": alt_text or "", } if state == "JOB_STATE_FAILED": logging.error(f"❌ Video processing failed: {job_status}") return None logging.info(f" ...still processing (state={state})...") time.sleep(3) logging.error("❌ Video processing timed out.") return None def upload_video_via_pds(client: Client, video_path: str, alt_text: str = "", settle_delay_seconds: float = 30.0) -> dict | None: """ Optional fallback: direct uploadBlob to PDS. Note: - This can be less reliable for playback compatibility depending on client/AppView. """ try: with open(video_path, "rb") as f: video_bytes = f.read() size_mb = len(video_bytes) / (1024 * 1024) logging.warning(f"🎬 [PDS-direct fallback] Uploading: {video_path} ({size_mb:.2f} MB)") r = client.upload_blob(video_bytes) wait_with_heartbeat(settle_delay_seconds, "PDS/AppView indexing") blob = getattr(r, "blob", None) if blob is None and isinstance(r, dict): blob = r.get("blob") if blob is None: logging.error("❌ PDS uploadBlob returned no blob.") return None return { "$type": "app.bsky.embed.video", "video": blob, "alt": alt_text or "", } except Exception as e: logging.error(f"❌ PDS-direct video upload failed: {repr(e)}") return None def upload_video_smart( client: Client, video_path: str, service_url: str, alt_text: str, settle_delay_seconds: float, allow_pds_video_fallback: bool ) -> dict | None: """ Try video.bsky.app first. Optionally fallback to direct PDS upload. """ logging.info(f"🌍 PDS ({service_url}). Trying video.bsky.app first.") embed = upload_video_via_bsky_service(client, video_path, service_url, alt_text) if embed: return embed if allow_pds_video_fallback: logging.warning("⚠️ video.bsky.app failed; trying direct PDS fallback.") return upload_video_via_pds(client, video_path, alt_text, settle_delay_seconds) logging.error("❌ video.bsky.app failed and fallback disabled.") return None # ----------------------------------------------------------------------------- # Image upload # ----------------------------------------------------------------------------- def upload_image(client: Client, image_path: str, alt_text: str = "") -> dict | None: """ Upload image blob and return raw app.bsky.embed.images dict. """ try: if not os.path.exists(image_path): logging.error(f"❌ Image file not found: {image_path}") return None mime = detect_mime_type(image_path) with open(image_path, "rb") as f: data = f.read() logging.info(f"πŸ–ΌοΈ Uploading image: {image_path} ({len(data)/1024:.1f} KB, {mime})") r = client.upload_blob(data) blob = getattr(r, "blob", None) if blob is None and isinstance(r, dict): blob = r.get("blob") if blob is None: logging.error("❌ uploadBlob returned no blob for image.") return None return { "$type": "app.bsky.embed.images", "images": [{"alt": alt_text or "", "image": blob}], } except Exception as e: logging.error(f"❌ Failed to upload image: {repr(e)}") return None # ----------------------------------------------------------------------------- # Post creation # ----------------------------------------------------------------------------- def post_to_bsky( client: Client, text: str, langs: list[str], image_path: str | None, video_path: str | None, alt_text: str, service_url: str, video_settle_delay: float, allow_pds_video_fallback: bool, ) -> bool: """ Build and send app.bsky.feed.post with optional media embed. """ post_text = text.strip() if not post_text and not image_path and not video_path: logging.error("❌ Empty post text with no media is not allowed.") return False embed_dict = None if video_path: logging.info(f"🎬 Preparing video upload: {video_path}") embed_dict = upload_video_smart( client=client, video_path=video_path, service_url=service_url, alt_text=alt_text, settle_delay_seconds=video_settle_delay, allow_pds_video_fallback=allow_pds_video_fallback, ) if not embed_dict: logging.error("❌ Aborting post: video upload/processing failed.") return False elif image_path: embed_dict = upload_image(client, image_path, alt_text) if not embed_dict: logging.error("❌ Aborting post: image upload failed.") return False # Explicit record: safest way to guarantee text and schema behavior record = { "$type": "app.bsky.feed.post", "text": post_text, "createdAt": time.strftime("%Y-%m-%dT%H:%M:%S.000Z", time.gmtime()), } if langs: record["langs"] = langs if embed_dict: record["embed"] = embed_dict logging.info(f"🧾 Final record text={record.get('text')!r}, has_embed={'embed' in record}") # Typed call first; dict fallback for SDK differences try: data = models.ComAtprotoRepoCreateRecord.Data( repo=client.me.did, collection="app.bsky.feed.post", record=record, ) resp = client.com.atproto.repo.create_record(data) except Exception: resp = client.com.atproto.repo.create_record({ "repo": client.me.did, "collection": "app.bsky.feed.post", "record": record, }) uri = getattr(resp, "uri", None) or (resp.get("uri") if isinstance(resp, dict) else None) logging.info(f"βœ… Post published! URI: {uri}") return True # ----------------------------------------------------------------------------- # CLI entrypoint # ----------------------------------------------------------------------------- def main(): setup_logging() parser = argparse.ArgumentParser(description="Post text + optional image/video to Bluesky/federated PDS.") parser.add_argument("text", help="Post text content") parser.add_argument("--username", required=True, help="Bluesky handle/email") parser.add_argument("--password", required=True, help="Bluesky app password") parser.add_argument("--service", default="https://bsky.social", help="PDS URL") parser.add_argument("--lang", default="ca", help="Comma-separated language codes") parser.add_argument("--image", default=None, help="Path to image file") parser.add_argument("--video", default=None, help="Path to video file") parser.add_argument("--alt", default="", help="Alt text for media") parser.add_argument("--video-settle-delay", type=float, default=30.0, help="Fallback indexing wait seconds") parser.add_argument("--allow-pds-video-fallback", action="store_true", help="Allow direct PDS video fallback") # Compression options (enabled by default) parser.add_argument("--compress-video", dest="compress_video", action="store_true", default=True, help="Compress video before upload (default: enabled)") parser.add_argument("--no-compress-video", dest="compress_video", action="store_false", help="Disable compression") parser.add_argument("--max-video-mb", type=float, default=45.0, help="Target max size after compression") parser.add_argument("--ffmpeg-crf", type=int, default=28, help="CRF (lower=better quality/larger)") parser.add_argument("--ffmpeg-preset", default="veryfast", help="x264 preset") args = parser.parse_args() # Exactly one media type at a time if args.image and args.video: logging.error("❌ Use either --image or --video, not both.") sys.exit(1) # Initialize client against selected PDS client = Client(base_url=args.service) # Login with retries if not login_with_backoff( client, args.username, args.password, args.service, RetryConfig.login_max_attempts, RetryConfig.login_base_delay_seconds, RetryConfig.login_max_delay_seconds, RetryConfig.login_jitter_seconds, ): sys.exit(1) # Parse languages langs = [x.strip() for x in args.lang.split(",") if x.strip()] # Optional pre-upload compression for videos video_path_for_upload = args.video temp_compressed_path = None if args.video and args.compress_video: compressed = compress_video_ffmpeg( input_path=args.video, max_size_mb=args.max_video_mb, crf=args.ffmpeg_crf, preset=args.ffmpeg_preset, audio_bitrate_k=96, ) if compressed is None: logging.error("❌ Compression failed; aborting.") sys.exit(1) video_path_for_upload = compressed if compressed != args.video: temp_compressed_path = compressed # Build and publish post ok = post_to_bsky( client=client, text=args.text, langs=langs, image_path=args.image, video_path=video_path_for_upload, alt_text=args.alt, service_url=args.service, video_settle_delay=args.video_settle_delay, allow_pds_video_fallback=args.allow_pds_video_fallback, ) # Cleanup temp compressed file, if any try: if temp_compressed_path and os.path.exists(temp_compressed_path): os.remove(temp_compressed_path) logging.info(f"🧹 Removed temp file: {temp_compressed_path}") except Exception: pass if not ok: sys.exit(1) if __name__ == "__main__": main()