#!/usr/bin/env python3 """ Post text + optional image/video to Bluesky/federated PDS. Key reliability choices: - Video uploads go through https://video.bsky.app first (best client playback compatibility). - getServiceAuth uses: aud = did:web: lxm = com.atproto.repo.uploadBlob - Handles 409 already_exists from video service by reusing jobId. - Uses raw lexicon dict embeds (NO AppBskyEmbedVideo typed model), avoiding BlobRef SDK mismatch. - Optional direct-PDS fallback for video. - ffmpeg compression enabled by default (disable with --no-compress-video). """ import argparse import logging import mimetypes import os import random import secrets import shutil import string import subprocess import sys import tempfile import time from urllib.parse import urlparse import requests from atproto import Client # --------------------------- # Logging # --------------------------- def setup_logging() -> None: logging.basicConfig( format="%(asctime)s %(levelname)s %(message)s", level=logging.INFO, stream=sys.stdout, ) # --------------------------- # Login helpers # --------------------------- def is_auth_error(exc: Exception) -> bool: t = repr(exc).lower() return "401" in t or "403" in t or "invalid identifier or password" in t def is_rate_limited_error(exc: Exception) -> bool: t = repr(exc).lower() return "429" in t or "ratelimit" in t or "too many requests" in t def is_transient_error(exc: Exception) -> bool: t = repr(exc) needles = [ "ConnectError", "RemoteProtocolError", "ReadTimeout", "WriteTimeout", "TimeoutException", "InvokeTimeoutError", "502", "503", "504", ] return any(n in t for n in needles) def login_with_backoff( client: Client, username: str, password: str, service_url: str, max_attempts: int = 5, base_delay: float = 8.0, max_delay: float = 120.0, ) -> bool: for attempt in range(1, max_attempts + 1): try: logging.info(f"πŸ”‘ Login attempt {attempt}/{max_attempts} β†’ {service_url} as {username}") client.login(username, password) logging.info("βœ… Login successful.") return True except Exception as e: logging.exception("❌ Login failed") if is_auth_error(e): logging.error("❌ Authentication failed. Check handle/app-password.") return False if attempt >= max_attempts: return False if is_rate_limited_error(e) or is_transient_error(e): wait = min(base_delay * attempt, max_delay) + random.uniform(0.1, 1.2) logging.warning(f"⏳ Retrying login in {wait:.1f}s...") time.sleep(wait) else: return False return False # --------------------------- # Generic helpers # --------------------------- def wait_with_heartbeat(seconds: float, label: str) -> None: if seconds <= 0: return logging.info(f"⏳ Waiting {seconds:.0f}s for {label}...") remaining = seconds while remaining > 0: step = min(5.0, remaining) time.sleep(step) remaining -= step if remaining > 0: logging.info(f" ...still waiting ({remaining:.0f}s remaining)...") logging.info("βœ… Wait complete.") def pds_did_from_service_url(service_url: str) -> str: host = (urlparse(service_url).hostname or "").lower() if not host: raise ValueError(f"Invalid --service URL: {service_url}") return f"did:web:{host}" def random_video_name(ext: str = ".mp4") -> str: token = "".join(secrets.choice(string.ascii_lowercase + string.digits) for _ in range(12)) return f"{int(time.time())}_{token}{ext}" def extract_token_from_service_auth(resp_obj) -> str | None: tok = getattr(resp_obj, "token", None) if tok: return tok if isinstance(resp_obj, dict): return resp_obj.get("token") return None def extract_blob_from_upload_blob_result(resp_obj): blob = getattr(resp_obj, "blob", None) if blob is not None: return blob if isinstance(resp_obj, dict): return resp_obj.get("blob") return None # --------------------------- # ffmpeg compression # --------------------------- def ffmpeg_exists() -> bool: return shutil.which("ffmpeg") is not None def ffprobe_exists() -> bool: return shutil.which("ffprobe") is not None def get_video_duration_seconds(path: str) -> float | None: if not ffprobe_exists(): return None try: out = subprocess.check_output( [ "ffprobe", "-v", "error", "-show_entries", "format=duration", "-of", "default=noprint_wrappers=1:nokey=1", path, ], stderr=subprocess.STDOUT, text=True, ).strip() return float(out) except Exception: return None def compress_video_ffmpeg( input_path: str, max_size_mb: float = 45.0, crf: int = 28, preset: str = "veryfast", audio_bitrate_k: int = 96, ) -> str | None: if not ffmpeg_exists(): logging.error("❌ ffmpeg not found in PATH. Install ffmpeg or use --no-compress-video.") return None if not os.path.exists(input_path): logging.error(f"❌ Video file not found: {input_path}") return None src_size_mb = os.path.getsize(input_path) / (1024 * 1024) logging.info(f"πŸ“¦ Source video size: {src_size_mb:.2f} MB") if src_size_mb <= max_size_mb: logging.info("βœ… Already below target size. Skipping compression.") return input_path duration = get_video_duration_seconds(input_path) target_video_k = 1200 if duration and duration > 0: total_kbps = (max_size_mb * 8192.0) / duration target_video_k = int(max(300, total_kbps - audio_bitrate_k)) target_video_k = min(max(target_video_k, 300), 5000) fd, out_path = tempfile.mkstemp(prefix="bsky_vid_", suffix=".mp4") os.close(fd) cmd = [ "ffmpeg", "-y", "-i", input_path, "-c:v", "libx264", "-preset", preset, "-crf", str(crf), "-b:v", f"{target_video_k}k", "-maxrate", f"{int(target_video_k * 1.3)}k", "-bufsize", f"{int(target_video_k * 2)}k", "-vf", "scale='min(1280,iw)':-2", "-c:a", "aac", "-b:a", f"{audio_bitrate_k}k", "-movflags", "+faststart", out_path, ] try: logging.info(f"πŸ› οΈ Compressing video (target≀{max_size_mb}MB, crf={crf}, preset={preset})...") subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) out_size_mb = os.path.getsize(out_path) / (1024 * 1024) logging.info(f"βœ… Compressed size: {out_size_mb:.2f} MB") if out_size_mb < src_size_mb: return out_path os.remove(out_path) logging.info("ℹ️ Compression not smaller than source. Using original.") return input_path except subprocess.CalledProcessError as e: logging.error("❌ ffmpeg compression failed.") if e.stderr: logging.error(e.stderr[-2000:]) try: os.remove(out_path) except Exception: pass return None # --------------------------- # Media upload: image # --------------------------- def upload_image_embed_dict(client: Client, image_path: str, alt_text: str = "") -> dict | None: if not os.path.exists(image_path): logging.error(f"❌ Image file not found: {image_path}") return None mime, _ = mimetypes.guess_type(image_path) with open(image_path, "rb") as f: data = f.read() logging.info(f"πŸ–ΌοΈ Uploading image: {image_path} ({len(data)/1024:.1f} KB, {mime or 'unknown'})") try: up = client.upload_blob(data) blob = extract_blob_from_upload_blob_result(up) if blob is None: logging.error("❌ uploadBlob returned no blob for image.") return None # Raw lexicon dict embed (cross-SDK safe) return { "$type": "app.bsky.embed.images", "images": [ { "alt": alt_text or "", "image": blob, } ], } except Exception as e: logging.error(f"❌ Image upload failed: {repr(e)}") return None # --------------------------- # Media upload: video via video.bsky.app (primary) # --------------------------- def upload_video_via_video_service_embed_dict( client: Client, video_path: str, service_url: str, alt_text: str = "", ) -> dict | None: if not os.path.exists(video_path): logging.error(f"❌ Video file not found: {video_path}") return None with open(video_path, "rb") as f: video_bytes = f.read() size_mb = len(video_bytes) / (1024 * 1024) logging.info(f"🎬 [video.bsky.app] Uploading: {video_path} ({size_mb:.2f} MB)") video_host = "https://video.bsky.app" pds_did = pds_did_from_service_url(service_url) # getServiceAuth from user's PDS with correct audience + method binding try: auth_resp = client.com.atproto.server.get_service_auth( {"aud": pds_did, "lxm": "com.atproto.repo.uploadBlob", "exp": int(time.time()) + 1800} ) except Exception as e: logging.error(f"❌ getServiceAuth failed: {repr(e)}") return None token = extract_token_from_service_auth(auth_resp) if not token: logging.error("❌ getServiceAuth returned no token.") return None upload_name = random_video_name(".mp4") logging.info(f"🎞️ Upload name: {upload_name}") upload_url = f"{video_host}/xrpc/app.bsky.video.uploadVideo?did={client.me.did}&name={upload_name}" headers = { "Authorization": f"Bearer {token}", "Content-Type": "video/mp4", } try: r = requests.post(upload_url, headers=headers, data=video_bytes, timeout=240) except Exception as e: logging.error(f"❌ video upload request failed: {repr(e)}") return None if r.status_code not in (200, 409): logging.error(f"❌ video.bsky.app upload failed: {r.status_code} - {r.text}") return None payload = r.json() # Dedupe path: reuse existing job if r.status_code == 409: if payload.get("error") == "already_exists" and payload.get("jobId"): logging.info("ℹ️ Video already processed on video.bsky.app. Reusing existing job.") else: logging.error(f"❌ 409 without reusable jobId: {payload}") return None job_id = payload.get("jobId") if not job_id: logging.error(f"❌ No jobId in video upload response: {payload}") return None logging.info(f"⏳ Job {job_id} accepted β€” polling status...") status_url = f"{video_host}/xrpc/app.bsky.video.getJobStatus" deadline = time.time() + 600 # 10 min max poll while time.time() < deadline: try: s = requests.get(status_url, params={"jobId": job_id}, timeout=30) except Exception as e: logging.warning(f"⚠️ Status poll request failed once: {repr(e)}") time.sleep(3) continue if s.status_code != 200: logging.error(f"❌ Job status failed: {s.status_code} - {s.text}") return None body = s.json() job_status = body.get("jobStatus", {}) state = job_status.get("state") if state == "JOB_STATE_COMPLETED": blob = job_status.get("blob") if not blob: logging.error(f"❌ Completed job without blob: {body}") return None wait_with_heartbeat(8, "CDN propagation") # RAW embed dict; no BlobRef conversion at all. return { "$type": "app.bsky.embed.video", "video": blob, "alt": alt_text or "", } if state == "JOB_STATE_FAILED": logging.error(f"❌ Video processing failed: {job_status}") return None logging.info(f" ...still processing (state={state})...") time.sleep(3) logging.error("❌ Video processing timed out.") return None # --------------------------- # Media upload: direct PDS fallback (optional) # --------------------------- def upload_video_via_pds_embed_dict( client: Client, video_path: str, alt_text: str = "", settle_delay_seconds: float = 30.0, ) -> dict | None: if not os.path.exists(video_path): logging.error(f"❌ Video file not found: {video_path}") return None try: with open(video_path, "rb") as f: b = f.read() size_mb = len(b) / (1024 * 1024) logging.warning(f"🎬 [PDS-direct fallback] Uploading: {video_path} ({size_mb:.2f} MB)") up = client.upload_blob(b) blob = extract_blob_from_upload_blob_result(up) if blob is None: logging.error("❌ PDS uploadBlob returned no blob.") return None wait_with_heartbeat(settle_delay_seconds, "PDS/AppView indexing") return { "$type": "app.bsky.embed.video", "video": blob, "alt": alt_text or "", } except Exception as e: logging.error(f"❌ PDS-direct video upload failed: {repr(e)}") return None def upload_video_smart_embed_dict( client: Client, video_path: str, service_url: str, alt_text: str = "", settle_delay_seconds: float = 30.0, allow_pds_video_fallback: bool = False, ) -> dict | None: logging.info(f"🌍 PDS ({service_url}). Trying video.bsky.app first.") embed = upload_video_via_video_service_embed_dict( client=client, video_path=video_path, service_url=service_url, alt_text=alt_text, ) if embed: return embed if allow_pds_video_fallback: logging.warning("⚠️ video.bsky.app failed; trying direct PDS fallback.") return upload_video_via_pds_embed_dict( client=client, video_path=video_path, alt_text=alt_text, settle_delay_seconds=settle_delay_seconds, ) logging.error("❌ video.bsky.app failed and fallback is disabled.") return None # --------------------------- # Create post # --------------------------- def create_post_record( text: str, langs: list[str], embed_dict: dict | None = None, ) -> dict: record = { "$type": "app.bsky.feed.post", "text": text.strip(), # must be plain string "createdAt": time.strftime("%Y-%m-%dT%H:%M:%S.000Z", time.gmtime()), } if langs: record["langs"] = langs if embed_dict is not None: record["embed"] = embed_dict return record def publish_post(client: Client, record: dict) -> bool: try: # Use dict payload directly for max cross-version compatibility. resp = client.com.atproto.repo.create_record( { "repo": client.me.did, "collection": "app.bsky.feed.post", "record": record, } ) uri = getattr(resp, "uri", None) or (resp.get("uri") if isinstance(resp, dict) else None) logging.info(f"βœ… Post published! URI: {uri}") return True except Exception as e: logging.error(f"❌ createRecord failed: {repr(e)}") return False # --------------------------- # Main # --------------------------- def main(): setup_logging() parser = argparse.ArgumentParser(description="Post text + optional image/video to Bluesky/federated PDS.") parser.add_argument("text", help="Post text") parser.add_argument("--username", required=True, help="Handle/email") parser.add_argument("--password", required=True, help="App password") parser.add_argument("--service", default="https://bsky.social", help="PDS URL") parser.add_argument("--lang", default="ca", help="Comma-separated language codes") parser.add_argument("--image", default=None, help="Image path") parser.add_argument("--video", default=None, help="Video path") parser.add_argument("--alt", default="", help="Alt text") parser.add_argument("--video-settle-delay", type=float, default=30.0, help="Fallback indexing wait") parser.add_argument("--allow-pds-video-fallback", action="store_true") # Compression ON by default parser.add_argument("--compress-video", dest="compress_video", action="store_true", default=True) parser.add_argument("--no-compress-video", dest="compress_video", action="store_false") parser.add_argument("--max-video-mb", type=float, default=45.0) parser.add_argument("--ffmpeg-crf", type=int, default=28) parser.add_argument("--ffmpeg-preset", default="veryfast") args = parser.parse_args() if args.image and args.video: logging.error("❌ Use either --image or --video, not both.") sys.exit(1) client = Client(base_url=args.service) if not login_with_backoff( client=client, username=args.username, password=args.password, service_url=args.service, ): sys.exit(1) langs = [x.strip() for x in args.lang.split(",") if x.strip()] video_path_for_upload = args.video temp_compressed_path = None if args.video and args.compress_video: compressed = compress_video_ffmpeg( input_path=args.video, max_size_mb=args.max_video_mb, crf=args.ffmpeg_crf, preset=args.ffmpeg_preset, audio_bitrate_k=96, ) if compressed is None: logging.error("❌ Compression failed; aborting.") sys.exit(1) video_path_for_upload = compressed if compressed != args.video: temp_compressed_path = compressed embed_dict = None if video_path_for_upload: logging.info(f"🎬 Preparing video upload: {video_path_for_upload}") embed_dict = upload_video_smart_embed_dict( client=client, video_path=video_path_for_upload, service_url=args.service, alt_text=args.alt, settle_delay_seconds=args.video_settle_delay, allow_pds_video_fallback=args.allow_pds_video_fallback, ) if embed_dict is None: logging.error("❌ Aborting post: video upload/processing failed.") if temp_compressed_path and os.path.exists(temp_compressed_path): os.remove(temp_compressed_path) sys.exit(1) elif args.image: embed_dict = upload_image_embed_dict(client=client, image_path=args.image, alt_text=args.alt) if embed_dict is None: logging.error("❌ Aborting post: image upload failed.") if temp_compressed_path and os.path.exists(temp_compressed_path): os.remove(temp_compressed_path) sys.exit(1) record = create_post_record(text=args.text, langs=langs, embed_dict=embed_dict) logging.info(f"🧾 Final record text={record.get('text')!r}, has_embed={'embed' in record}") ok = publish_post(client=client, record=record) if temp_compressed_path and os.path.exists(temp_compressed_path): try: os.remove(temp_compressed_path) logging.info(f"🧹 Removed temp file: {temp_compressed_path}") except Exception: pass if not ok: sys.exit(1) if __name__ == "__main__": main()