#!/usr/bin/env python3 """ bsky_post.py β€” Post text + optional image or video to Bluesky/federated PDS. Fixes included: - Robust login backoff - video.bsky.app first for videos - Correct getServiceAuth: aud = did:web: lxm = com.atproto.repo.uploadBlob - Handles 409 already_exists by reusing returned jobId - SDK compatibility: no hard dependency on models.BlobRef - Explicit createRecord payload so text is always a plain string - ffmpeg compression (enabled by default, disable with --no-compress-video) """ import argparse import logging import mimetypes import os import random import re import secrets import shutil import string import subprocess import sys import tempfile import time from dataclasses import dataclass from urllib.parse import urlparse import requests from atproto import Client, models # ============================================================ # Config # ============================================================ @dataclass(frozen=True) class RetryConfig: login_max_attempts: int = 5 login_base_delay_seconds: float = 10.0 login_max_delay_seconds: float = 600.0 login_jitter_seconds: float = 3.0 # ============================================================ # Logging # ============================================================ def setup_logging() -> None: logging.basicConfig( format="%(asctime)s %(levelname)s %(message)s", level=logging.INFO, stream=sys.stdout, ) # ============================================================ # Error helpers # ============================================================ def is_rate_limited_error(error_obj) -> bool: text = repr(error_obj).lower() return ( "429" in text or "ratelimitexceeded" in text or "too many requests" in text or "rate limit" in text ) def is_auth_error(error_obj) -> bool: text = repr(error_obj).lower() return ( "401" in text or "403" in text or "invalid identifier or password" in text or "authenticationrequired" in text or "invalidtoken" in text ) def is_network_error(error_obj) -> bool: text = repr(error_obj) signals = [ "ConnectError", "RemoteProtocolError", "ReadTimeout", "WriteTimeout", "TimeoutException", "503", "502", "504", "ConnectionResetError", "InvokeTimeoutError", ] return any(sig in text for sig in signals) def is_transient_error(error_obj) -> bool: text = repr(error_obj) transient_signals = [ "InvokeTimeoutError", "ReadTimeout", "WriteTimeout", "TimeoutException", "RemoteProtocolError", "ConnectError", "503", "502", "504", ] return any(signal in text for signal in transient_signals) def get_rate_limit_wait_seconds(error_obj, default_delay: float, max_delay: float) -> float: try: now_ts = int(time.time()) headers = getattr(error_obj, "headers", None) or {} retry_after = headers.get("retry-after") or headers.get("Retry-After") if retry_after: return min(max(float(retry_after), 1.0), max_delay) x_after = headers.get("x-ratelimit-after") or headers.get("X-RateLimit-After") if x_after: return min(max(float(x_after), 1.0), max_delay) reset_value = headers.get("ratelimit-reset") or headers.get("RateLimit-Reset") if reset_value: wait_seconds = max(float(reset_value) - now_ts + 1.0, default_delay) return min(wait_seconds, max_delay) except Exception: pass text = repr(error_obj) m = re.search(r"'retry-after': '(\d+)'", text, re.IGNORECASE) if m: return min(max(float(m.group(1)), 1.0), max_delay) m = re.search(r"'x-ratelimit-after': '(\d+)'", text, re.IGNORECASE) if m: return min(max(float(m.group(1)), 1.0), max_delay) m = re.search(r"'ratelimit-reset': '(\d+)'", text, re.IGNORECASE) if m: now_ts = int(time.time()) wait_seconds = max(float(m.group(1)) - now_ts + 1.0, default_delay) return min(wait_seconds, max_delay) return default_delay # ============================================================ # Login # ============================================================ def login_with_backoff( client: Client, username: str, password: str, service_url: str, max_attempts: int = 5, base_delay: float = 10.0, max_delay: float = 600.0, jitter: float = 1.5, ) -> bool: for attempt in range(1, max_attempts + 1): try: logging.info(f"πŸ”‘ Login attempt {attempt}/{max_attempts} β†’ {service_url} as {username}") client.login(username, password) logging.info("βœ… Login successful.") return True except Exception as e: logging.exception("❌ Login exception") if is_auth_error(e): logging.error("❌ Authentication failed. Check username/app-password.") return False if is_rate_limited_error(e): if attempt < max_attempts: wait = get_rate_limit_wait_seconds(e, default_delay=base_delay, max_delay=max_delay) wait += random.uniform(0, jitter) logging.warning(f"⏳ Rate limited, retrying in {wait:.1f}s...") time.sleep(wait) continue logging.error("❌ Exhausted retries due to rate limit.") return False if is_network_error(e) or is_transient_error(e): if attempt < max_attempts: wait = min(base_delay * attempt, max_delay) + random.uniform(0, jitter) logging.warning(f"⏳ Transient/network error, retrying in {wait:.1f}s...") time.sleep(wait) continue logging.error("❌ Exhausted retries after network/transient errors.") return False if attempt < max_attempts: wait = min(base_delay * attempt, max_delay) + random.uniform(0, jitter) logging.warning(f"⏳ Unknown error, retrying in {wait:.1f}s...") time.sleep(wait) continue return False # ============================================================ # Utility # ============================================================ def detect_mime_type(path: str) -> str: mime, _ = mimetypes.guess_type(path) if mime: return mime ext = os.path.splitext(path)[1].lower() fallbacks = { ".jpg": "image/jpeg", ".jpeg": "image/jpeg", ".png": "image/png", ".gif": "image/gif", ".webp": "image/webp", ".mp4": "video/mp4", ".mov": "video/quicktime", ".webm": "video/webm", } return fallbacks.get(ext, "application/octet-stream") def wait_with_heartbeat(total_seconds: float, label: str) -> None: if total_seconds <= 0: return logging.info(f"⏳ Waiting {total_seconds:.0f}s for {label}...") remaining = total_seconds while remaining > 0: step = min(5.0, remaining) time.sleep(step) remaining -= step if remaining > 0: logging.info(f" ...still waiting ({remaining:.0f}s remaining)...") logging.info("βœ… Wait complete.") def pds_did_from_service_url(service_url: str) -> str: host = (urlparse(service_url).hostname or "").lower() if not host: raise ValueError(f"Invalid --service URL: {service_url}") return f"did:web:{host}" def random_video_name(ext: str = ".mp4") -> str: token = "".join(secrets.choice(string.ascii_lowercase + string.digits) for _ in range(12)) return f"{int(time.time())}_{token}{ext}" def model_to_dict(obj): if obj is None: return None if hasattr(obj, "model_dump"): return obj.model_dump(by_alias=True, exclude_none=True) if hasattr(obj, "dict"): return obj.dict(by_alias=True, exclude_none=True) return obj def normalize_blob_for_embed(blob_dict: dict): """ Cross-version blob normalization: - Use BlobRef if available - else return raw dict (accepted by older/newer serializers) """ BlobRef = getattr(models, "BlobRef", None) if BlobRef is not None: try: return BlobRef.from_dict(blob_dict) except Exception: pass return blob_dict def _extract_service_auth_token(upload_auth) -> str | None: token = getattr(upload_auth, "token", None) if token: return token if isinstance(upload_auth, dict): return upload_auth.get("token") return None # ============================================================ # ffmpeg compression # ============================================================ def ffmpeg_exists() -> bool: return shutil.which("ffmpeg") is not None def ffprobe_exists() -> bool: return shutil.which("ffprobe") is not None def get_video_duration_seconds(path: str) -> float | None: if not ffprobe_exists(): return None try: cmd = [ "ffprobe", "-v", "error", "-show_entries", "format=duration", "-of", "default=noprint_wrappers=1:nokey=1", path, ] out = subprocess.check_output(cmd, stderr=subprocess.STDOUT, text=True).strip() return float(out) except Exception: return None def compress_video_ffmpeg( input_path: str, max_size_mb: float = 45.0, crf: int = 28, preset: str = "veryfast", audio_bitrate_k: int = 96, ) -> str | None: if not ffmpeg_exists(): logging.error("❌ ffmpeg not found in PATH. Install ffmpeg or use --no-compress-video.") return None if not os.path.exists(input_path): logging.error(f"❌ Input video not found: {input_path}") return None src_size_mb = os.path.getsize(input_path) / (1024 * 1024) logging.info(f"πŸ“¦ Source video size: {src_size_mb:.2f} MB") if src_size_mb <= max_size_mb: logging.info("βœ… Already below target size. Skipping compression.") return input_path duration = get_video_duration_seconds(input_path) target_video_k = 1200 if duration and duration > 0: total_kbps = (max_size_mb * 8192.0) / duration target_video_k = int(max(300, total_kbps - audio_bitrate_k)) target_video_k = min(max(target_video_k, 300), 5000) fd, out_path = tempfile.mkstemp(prefix="bsky_vid_", suffix=".mp4") os.close(fd) cmd = [ "ffmpeg", "-y", "-i", input_path, "-c:v", "libx264", "-preset", preset, "-crf", str(crf), "-b:v", f"{target_video_k}k", "-maxrate", f"{int(target_video_k * 1.3)}k", "-bufsize", f"{int(target_video_k * 2)}k", "-vf", "scale='min(1280,iw)':-2", "-c:a", "aac", "-b:a", f"{audio_bitrate_k}k", "-movflags", "+faststart", out_path, ] try: logging.info( f"πŸ› οΈ Compressing video (target≀{max_size_mb}MB, crf={crf}, preset={preset}, vβ‰ˆ{target_video_k}k)..." ) subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) out_size_mb = os.path.getsize(out_path) / (1024 * 1024) logging.info(f"βœ… Compressed size: {out_size_mb:.2f} MB") if out_size_mb < src_size_mb: return out_path logging.info("ℹ️ Compressed file is not smaller. Using original.") try: os.remove(out_path) except Exception: pass return input_path except subprocess.CalledProcessError as e: logging.error("❌ ffmpeg compression failed.") if e.stderr: logging.error(e.stderr[-2000:]) try: os.remove(out_path) except Exception: pass return None # ============================================================ # Uploads # ============================================================ def upload_image(client: Client, image_path: str, alt_text: str = ""): try: if not os.path.exists(image_path): logging.error(f"❌ Image file not found: {image_path}") return None mime = detect_mime_type(image_path) with open(image_path, "rb") as f: data = f.read() logging.info(f"πŸ–ΌοΈ Uploading image: {image_path} ({len(data)/1024:.1f} KB, {mime})") response = client.upload_blob(data) return models.AppBskyEmbedImages.Image(image=response.blob, alt=alt_text) except Exception as e: logging.error(f"❌ Failed to upload image: {repr(e)}") return None def upload_video_via_pds( client: Client, video_path: str, alt_text: str = "", settle_delay_seconds: float = 30.0, ): try: if not os.path.exists(video_path): logging.error(f"❌ Video file not found: {video_path}") return None with open(video_path, "rb") as f: video_bytes = f.read() size_mb = len(video_bytes) / (1024 * 1024) logging.warning(f"🎬 [PDS-direct fallback] Uploading: {video_path} ({size_mb:.2f} MB)") response = client.upload_blob(video_bytes) blob = response.blob logging.warning("⚠️ [PDS-direct fallback] Blob uploaded. Waiting for indexing...") wait_with_heartbeat(settle_delay_seconds, "PDS/AppView indexing") return models.AppBskyEmbedVideo.Main(video=blob, alt=alt_text) except Exception as e: logging.error(f"❌ PDS-direct video upload failed: {repr(e)}") return None def _poll_video_job(video_host: str, job_id: str, alt_text: str): status_url = f"{video_host}/xrpc/app.bsky.video.getJobStatus" deadline = time.time() + 600 while time.time() < deadline: status_resp = requests.get(status_url, params={"jobId": job_id}, timeout=30) if status_resp.status_code != 200: logging.error(f"❌ Job status check failed: {status_resp.status_code} - {status_resp.text}") return None status_json = status_resp.json() job_status = status_json.get("jobStatus", {}) state = job_status.get("state") if state == "JOB_STATE_COMPLETED": blob_dict = job_status.get("blob") if not blob_dict: logging.error(f"❌ No blob in completed job status: {status_json}") return None wait_with_heartbeat(8, "CDN propagation") blob_obj = normalize_blob_for_embed(blob_dict) # <- BlobRef-safe logging.info("βœ… Video processed successfully.") return models.AppBskyEmbedVideo.Main(video=blob_obj, alt=alt_text) if state == "JOB_STATE_FAILED": logging.error(f"❌ Video processing failed: {job_status}") return None logging.info(f" ...still processing (state={state})...") time.sleep(3) logging.error("❌ Video processing timed out.") return None def upload_video_via_bsky_service( client: Client, video_path: str, service_url: str, alt_text: str = "", ): try: if not os.path.exists(video_path): logging.error(f"❌ Video file not found: {video_path}") return None with open(video_path, "rb") as f: video_bytes = f.read() size_mb = len(video_bytes) / (1024 * 1024) logging.info(f"🎬 [video.bsky.app] Uploading: {video_path} ({size_mb:.2f} MB)") video_host = "https://video.bsky.app" pds_did = pds_did_from_service_url(service_url) # getServiceAuth with correct aud + lxm try: params = models.ComAtprotoServerGetServiceAuth.Params( aud=pds_did, lxm="com.atproto.repo.uploadBlob", exp=int(time.time()) + 60 * 30, ) upload_auth = client.com.atproto.server.get_service_auth(params) except Exception: upload_auth = client.com.atproto.server.get_service_auth( { "aud": pds_did, "lxm": "com.atproto.repo.uploadBlob", "exp": int(time.time()) + 60 * 30, } ) token = _extract_service_auth_token(upload_auth) if not token: logging.error("❌ Failed to extract service auth token.") return None upload_name = random_video_name(".mp4") logging.info(f"🎞️ Upload name: {upload_name}") upload_url = ( f"{video_host}/xrpc/app.bsky.video.uploadVideo" f"?did={client.me.did}&name={upload_name}" ) headers = { "Authorization": f"Bearer {token}", "Content-Type": "video/mp4", } upload_resp = requests.post(upload_url, headers=headers, data=video_bytes, timeout=240) if upload_resp.status_code not in (200, 409): logging.error(f"❌ video.bsky.app upload failed: {upload_resp.status_code} - {upload_resp.text}") return None body = upload_resp.json() if upload_resp.status_code == 409: if body.get("error") == "already_exists" and body.get("jobId"): logging.info("ℹ️ Video already processed on video.bsky.app. Reusing existing job.") else: logging.error(f"❌ 409 without reusable jobId: {body}") return None job_id = body.get("jobId") if not job_id: logging.error(f"❌ No jobId in upload response: {body}") return None logging.info(f"⏳ Job {job_id} accepted β€” polling status...") return _poll_video_job(video_host, job_id, alt_text) except Exception as e: logging.error(f"❌ video.bsky.app upload failed: {repr(e)}") return None def upload_video_smart( client: Client, video_path: str, service_url: str, alt_text: str = "", settle_delay_seconds: float = 30.0, allow_pds_video_fallback: bool = False, ): logging.info(f"🌍 PDS ({service_url}). Trying video.bsky.app first.") embed = upload_video_via_bsky_service( client=client, video_path=video_path, service_url=service_url, alt_text=alt_text, ) if embed: return embed if allow_pds_video_fallback: logging.warning("⚠️ video.bsky.app failed; trying direct PDS fallback.") return upload_video_via_pds( client=client, video_path=video_path, alt_text=alt_text, settle_delay_seconds=settle_delay_seconds, ) logging.error("❌ video.bsky.app failed and fallback disabled.") return None # ============================================================ # Posting # ============================================================ def post_to_bsky( client: Client, text: str, langs: list[str], image_path: str | None = None, video_path: str | None = None, alt_text: str = "", service_url: str = "https://bsky.social", video_settle_delay: float = 30.0, allow_pds_video_fallback: bool = False, ) -> bool: post_text = text.strip() if not post_text and not image_path and not video_path: logging.error("❌ Empty post text with no media is not allowed.") return False try: embed_obj = None if video_path: logging.info(f"🎬 Preparing video upload: {video_path}") embed_obj = upload_video_smart( client=client, video_path=video_path, service_url=service_url, alt_text=alt_text, settle_delay_seconds=video_settle_delay, allow_pds_video_fallback=allow_pds_video_fallback, ) if not embed_obj: logging.error("❌ Aborting post: video upload/processing failed.") return False elif image_path: image = upload_image(client, image_path, alt_text) if not image: logging.error("❌ Aborting post: image upload failed.") return False embed_obj = models.AppBskyEmbedImages.Main(images=[image]) record = { "$type": "app.bsky.feed.post", "text": post_text, "createdAt": time.strftime("%Y-%m-%dT%H:%M:%S.000Z", time.gmtime()), } if langs: record["langs"] = langs if embed_obj is not None: record["embed"] = model_to_dict(embed_obj) logging.info(f"🧾 Final record text={record.get('text')!r}, has_embed={'embed' in record}") try: resp = client.com.atproto.repo.create_record( models.ComAtprotoRepoCreateRecord.Data( repo=client.me.did, collection="app.bsky.feed.post", record=record, ) ) except Exception: resp = client.com.atproto.repo.create_record( {"repo": client.me.did, "collection": "app.bsky.feed.post", "record": record} ) uri = getattr(resp, "uri", None) or (resp.get("uri") if isinstance(resp, dict) else None) logging.info(f"βœ… Post published! URI: {uri}") return True except Exception as e: logging.error(f"❌ Failed to send post: {repr(e)}") return False # ============================================================ # CLI # ============================================================ def main(): setup_logging() parser = argparse.ArgumentParser(description="Post text + optional image/video to Bluesky/federated PDS.") parser.add_argument("text", help="Post text content") parser.add_argument("--username", required=True, help="Bluesky handle/email") parser.add_argument("--password", required=True, help="Bluesky app password") parser.add_argument("--service", default="https://bsky.social", help="PDS URL") parser.add_argument("--lang", default="ca", help="Comma-separated language codes") parser.add_argument("--image", default=None, help="Path to image file") parser.add_argument("--video", default=None, help="Path to video file") parser.add_argument("--alt", default="", help="Alt text for media") parser.add_argument("--video-settle-delay", type=float, default=30.0) parser.add_argument("--allow-pds-video-fallback", action="store_true") # Compression defaults ON parser.add_argument("--compress-video", dest="compress_video", action="store_true", default=True) parser.add_argument("--no-compress-video", dest="compress_video", action="store_false") parser.add_argument("--max-video-mb", type=float, default=45.0) parser.add_argument("--ffmpeg-crf", type=int, default=28) parser.add_argument("--ffmpeg-preset", default="veryfast") args = parser.parse_args() if args.image and args.video: logging.error("❌ Use either --image or --video, not both.") sys.exit(1) client = Client(base_url=args.service) if not login_with_backoff( client=client, username=args.username, password=args.password, service_url=args.service, max_attempts=RetryConfig.login_max_attempts, base_delay=RetryConfig.login_base_delay_seconds, max_delay=RetryConfig.login_max_delay_seconds, jitter=RetryConfig.login_jitter_seconds, ): sys.exit(1) langs = [l.strip() for l in args.lang.split(",") if l.strip()] video_path_for_upload = args.video temp_compressed_path = None if args.video and args.compress_video: compressed = compress_video_ffmpeg( input_path=args.video, max_size_mb=args.max_video_mb, crf=args.ffmpeg_crf, preset=args.ffmpeg_preset, audio_bitrate_k=96, ) if compressed is None: logging.error("❌ Compression failed; aborting.") sys.exit(1) video_path_for_upload = compressed if compressed != args.video: temp_compressed_path = compressed ok = post_to_bsky( client=client, text=args.text, langs=langs, image_path=args.image, video_path=video_path_for_upload, alt_text=args.alt, service_url=args.service, video_settle_delay=args.video_settle_delay, allow_pds_video_fallback=args.allow_pds_video_fallback, ) try: if temp_compressed_path and os.path.exists(temp_compressed_path): os.remove(temp_compressed_path) logging.info(f"🧹 Removed temp file: {temp_compressed_path}") except Exception: pass if not ok: sys.exit(1) if __name__ == "__main__": main()