#!/usr/bin/env python3
"""
Post text + optional image/video to Bluesky/federated PDS.

Strategy:
- 100% raw XRPC via `requests` — no atproto SDK dependency at all.
- Avoids all SDK BlobRef/typed-model serialization bugs.
- Video uploads go through https://video.bsky.app (official path).
- getServiceAuth uses aud=did:web:<pds-host>, lxm=com.atproto.repo.uploadBlob.
- Handles 409 already_exists by reusing jobId.
- ffmpeg compression enabled by default.
"""

import argparse
import json
import logging
import mimetypes
import os
import random
import secrets
import shutil
import string
import subprocess
import sys
import tempfile
import time
from urllib.parse import urlparse

import requests


# ---------------------------
# Logging
# ---------------------------

def setup_logging() -> None:
    """Send INFO-level logs to stdout and quiet the chatty HTTP client libraries."""
    logging.basicConfig(
        format="%(asctime)s %(levelname)s %(message)s",
        level=logging.INFO,
        stream=sys.stdout,
    )
    # Silence noisy third-party libs
    for noisy in ("httpx", "httpcore", "urllib3"):
        logging.getLogger(noisy).setLevel(logging.WARNING)


# ---------------------------
# Error classification
# ---------------------------

def is_auth_error_status(status: int) -> bool:
    """Return True for statuses that retrying will not fix (bad request / bad credentials)."""
    return status in (400, 401, 403)


def is_rate_limited_status(status: int) -> bool:
    """Return True when the server asked the client to slow down (HTTP 429)."""
    return status == 429


def is_transient_status(status: int) -> bool:
    """Return True for server/gateway errors that are usually worth retrying."""
    return status in (500, 502, 503, 504)


# ---------------------------
# Raw XRPC helpers
# ---------------------------

def _xrpc_url(pds_url: str, method: str) -> str:
    """Return the ``/xrpc/<method>`` endpoint on *pds_url* (trailing slashes tolerated)."""
    return f"{pds_url.rstrip('/')}/xrpc/{method}"


def _xrpc_headers(access_jwt: str | None, content_type: str | None = None) -> dict:
    """Build request headers, adding a Bearer token only when *access_jwt* is set."""
    headers = {}
    if content_type:
        headers["Content-Type"] = content_type
    if access_jwt:
        headers["Authorization"] = f"Bearer {access_jwt}"
    return headers


def _xrpc_result(r: requests.Response) -> tuple[int, dict | str]:
    """Return ``(status, body)``: body is the decoded JSON, or the raw text if not JSON."""
    try:
        return r.status_code, r.json()
    except ValueError:
        # requests' JSONDecodeError subclasses ValueError; error pages are often HTML/text.
        return r.status_code, r.text


def xrpc_get(pds_url: str, method: str, params: dict, access_jwt: str | None = None,
             timeout: int = 30) -> tuple[int, dict | str]:
    """Call an XRPC query (HTTP GET) and return ``(status, body)``.

    HTTP error statuses are returned, not raised.

    Raises:
        requests.RequestException: on connection errors or timeouts.
    """
    r = requests.get(
        _xrpc_url(pds_url, method),
        headers=_xrpc_headers(access_jwt),
        params=params,
        timeout=timeout,
    )
    return _xrpc_result(r)


def xrpc_post_json(pds_url: str, method: str, body: dict, access_jwt: str | None = None,
                   timeout: int = 60) -> tuple[int, dict | str]:
    """Call an XRPC procedure with a JSON body and return ``(status, body)``.

    Raises:
        requests.RequestException: on connection errors or timeouts.
    """
    r = requests.post(
        _xrpc_url(pds_url, method),
        headers=_xrpc_headers(access_jwt, "application/json"),
        data=json.dumps(body),
        timeout=timeout,
    )
    return _xrpc_result(r)


def xrpc_post_bytes(pds_url: str, method: str, data: bytes, content_type: str,
                    access_jwt: str | None = None, timeout: int = 600) -> tuple[int, dict | str]:
    """Call an XRPC procedure with a raw binary body (e.g. uploadBlob) and return ``(status, body)``.

    Raises:
        requests.RequestException: on connection errors or timeouts.
    """
    r = requests.post(
        _xrpc_url(pds_url, method),
        headers=_xrpc_headers(access_jwt, content_type),
        data=data,
        timeout=timeout,
    )
    return _xrpc_result(r)


# ---------------------------
# Login (raw XRPC, no SDK)
# ---------------------------

def _login_retry_delay(attempt: int, base_delay: float, max_delay: float) -> float:
    """Compute the wait before the next login attempt.

    The delay grows linearly with *attempt*, is capped at *max_delay*, and has jitter
    added so that several clients do not retry in lockstep.
    """
    return min(base_delay * attempt, max_delay) + random.uniform(0.1, 1.2)


def login_with_backoff(
    pds_url: str,
    identifier: str,
    password: str,
    max_attempts: int = 5,
    base_delay: float = 8.0,
    max_delay: float = 120.0,
) -> dict | None:
    """Create a session via com.atproto.server.createSession, retrying transient failures.

    Retries on network errors, HTTP 429 and HTTP 5xx (500/502/503/504). Gives up
    immediately on 400/401/403 and on any other unexpected status.

    Returns:
        The createSession response dict (contains at least ``did`` and ``accessJwt``),
        or None if login did not succeed.
    """
    for attempt in range(1, max_attempts + 1):
        logging.info(f"🔑 Login attempt {attempt}/{max_attempts} → {pds_url} as {identifier}")
        try:
            status, body = xrpc_post_json(
                pds_url=pds_url,
                method="com.atproto.server.createSession",
                body={"identifier": identifier, "password": password},
                timeout=30,
            )
        except requests.RequestException:
            logging.exception("❌ Login request error")
        else:
            if status == 200 and isinstance(body, dict) and body.get("accessJwt"):
                logging.info("✅ Login successful.")
                return body

            logging.error(f"❌ Login failed: HTTP {status} body={body}")
            if is_auth_error_status(status):
                logging.error("❌ Authentication failed. Check handle/app-password.")
                return None
            if not (is_rate_limited_status(status) or is_transient_status(status)):
                # Unexpected status: retrying is unlikely to help.
                return None

        if attempt >= max_attempts:
            return None
        wait = _login_retry_delay(attempt, base_delay, max_delay)
        logging.warning(f"⏳ Retrying login in {wait:.1f}s...")
        time.sleep(wait)

    return None


# ---------------------------
# Generic helpers
# ---------------------------

def wait_with_heartbeat(seconds: float, label: str) -> None:
    """Sleep for *seconds*, logging progress every 5 s so long waits don't look hung."""
    if seconds <= 0:
        return
    logging.info(f"⏳ Waiting {seconds:.0f}s for {label}...")
    remaining = seconds
    while remaining > 0:
        step = min(5.0, remaining)
        time.sleep(step)
        remaining -= step
        if remaining > 0:
            logging.info(f" ...still waiting ({remaining:.0f}s remaining)...")
    logging.info("✅ Wait complete.")


def pds_did_from_service_url(service_url: str) -> str:
    """Return ``did:web:<host>`` for a PDS base URL (the port, if any, is ignored).

    Raises:
        ValueError: if *service_url* has no hostname (e.g. the scheme is missing).
    """
    host = (urlparse(service_url).hostname or "").lower()
    if not host:
        raise ValueError(f"Invalid --service URL: {service_url}")
    return f"did:web:{host}"


def random_video_name(ext: str = ".mp4") -> str:
    """Return an upload name of the form ``<unix-seconds>_<12 random [a-z0-9]><ext>``."""
    token = "".join(secrets.choice(string.ascii_lowercase + string.digits) for _ in range(12))
    return f"{int(time.time())}_{token}{ext}"


def get_service_auth_token(pds_url: str, access_jwt: str, aud: str, lxm: str,
                           exp_seconds: int = 1800) -> str | None:
    """Obtain a service-auth JWT via com.atproto.server.getServiceAuth.

    Args:
        aud: DID the token is addressed to.
        lxm: the single XRPC method the token is scoped to.
        exp_seconds: lifetime, expressed as an absolute ``exp`` of now + exp_seconds.

    Returns:
        The token string, or None on any failure, including network errors. Callers
        only check for None, so errors are logged here rather than raised.
    """
    try:
        status, body = xrpc_get(
            pds_url=pds_url,
            method="com.atproto.server.getServiceAuth",
            params={"aud": aud, "lxm": lxm, "exp": int(time.time()) + exp_seconds},
            access_jwt=access_jwt,
            timeout=30,
        )
    except requests.RequestException as e:
        logging.error(f"❌ getServiceAuth request failed: {e!r}")
        return None
    if status != 200 or not isinstance(body, dict):
        logging.error(f"❌ getServiceAuth failed: HTTP {status} body={body}")
        return None
    token = body.get("token")
    if not token:
        logging.error(f"❌ getServiceAuth returned no token: {body}")
        return None
    return token

--------------------------- # ffmpeg compression # --------------------------- def ffmpeg_exists() -> bool: return shutil.which("ffmpeg") is not None def ffprobe_exists() -> bool: return shutil.which("ffprobe") is not None def get_video_duration_seconds(path: str) -> float | None: if not ffprobe_exists(): return None try: out = subprocess.check_output( [ "ffprobe", "-v", "error", "-show_entries", "format=duration", "-of", "default=noprint_wrappers=1:nokey=1", path, ], stderr=subprocess.STDOUT, text=True, ).strip() return float(out) except Exception: return None def compress_video_ffmpeg( input_path: str, max_size_mb: float = 45.0, crf: int = 28, preset: str = "veryfast", audio_bitrate_k: int = 96, ) -> str | None: if not ffmpeg_exists(): logging.error("❌ ffmpeg not found in PATH. Install ffmpeg or use --no-compress-video.") return None if not os.path.exists(input_path): logging.error(f"❌ Video file not found: {input_path}") return None src_size_mb = os.path.getsize(input_path) / (1024 * 1024) logging.info(f"πŸ“¦ Source video size: {src_size_mb:.2f} MB") if src_size_mb <= max_size_mb: logging.info("βœ… Already below target size. 
Skipping compression.") return input_path duration = get_video_duration_seconds(input_path) target_video_k = 1200 if duration and duration > 0: total_kbps = (max_size_mb * 8192.0) / duration target_video_k = int(max(300, total_kbps - audio_bitrate_k)) target_video_k = min(max(target_video_k, 300), 5000) fd, out_path = tempfile.mkstemp(prefix="bsky_vid_", suffix=".mp4") os.close(fd) cmd = [ "ffmpeg", "-y", "-i", input_path, "-c:v", "libx264", "-preset", preset, "-crf", str(crf), "-b:v", f"{target_video_k}k", "-maxrate", f"{int(target_video_k * 1.3)}k", "-bufsize", f"{int(target_video_k * 2)}k", "-vf", "scale='min(1280,iw)':-2", "-c:a", "aac", "-b:a", f"{audio_bitrate_k}k", "-movflags", "+faststart", out_path, ] try: logging.info(f"πŸ› οΈ Compressing video (target≀{max_size_mb}MB, crf={crf}, preset={preset})...") subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) out_size_mb = os.path.getsize(out_path) / (1024 * 1024) logging.info(f"βœ… Compressed size: {out_size_mb:.2f} MB") if out_size_mb < src_size_mb: return out_path os.remove(out_path) logging.info("ℹ️ Compression not smaller than source. 
Using original.") return input_path except subprocess.CalledProcessError as e: logging.error("❌ ffmpeg compression failed.") if e.stderr: logging.error(e.stderr[-2000:]) try: os.remove(out_path) except Exception: pass return None # --------------------------- # Image upload # --------------------------- def upload_image_embed_dict( pds_url: str, access_jwt: str, image_path: str, alt_text: str = "", ) -> dict | None: if not os.path.exists(image_path): logging.error(f"❌ Image file not found: {image_path}") return None mime, _ = mimetypes.guess_type(image_path) mime = mime or "image/jpeg" with open(image_path, "rb") as f: data = f.read() logging.info(f"πŸ–ΌοΈ Uploading image: {image_path} ({len(data)/1024:.1f} KB, {mime})") status, body = xrpc_post_bytes( pds_url=pds_url, method="com.atproto.repo.uploadBlob", data=data, content_type=mime, access_jwt=access_jwt, timeout=180, ) if status != 200 or not isinstance(body, dict): logging.error(f"❌ Image uploadBlob failed: HTTP {status} body={body}") return None blob = body.get("blob") if not blob: logging.error(f"❌ uploadBlob returned no blob: {body}") return None return { "$type": "app.bsky.embed.images", "images": [{"alt": alt_text or "", "image": blob}], } # --------------------------- # Video upload via video.bsky.app # --------------------------- def upload_video_via_video_service_embed_dict( did: str, access_jwt: str, pds_url: str, video_path: str, service_url: str, alt_text: str = "", ) -> dict | None: if not os.path.exists(video_path): logging.error(f"❌ Video file not found: {video_path}") return None with open(video_path, "rb") as f: video_bytes = f.read() size_mb = len(video_bytes) / (1024 * 1024) logging.info(f"🎬 [video.bsky.app] Uploading: {video_path} ({size_mb:.2f} MB)") video_host = "https://video.bsky.app" pds_did = pds_did_from_service_url(service_url) token = get_service_auth_token( pds_url=pds_url, access_jwt=access_jwt, aud=pds_did, lxm="com.atproto.repo.uploadBlob", exp_seconds=1800, ) if not token: 
return None upload_name = random_video_name(".mp4") logging.info(f"🎞️ Upload name: {upload_name}") upload_url = f"{video_host}/xrpc/app.bsky.video.uploadVideo?did={did}&name={upload_name}" headers = { "Authorization": f"Bearer {token}", "Content-Type": "video/mp4", } try: r = requests.post(upload_url, headers=headers, data=video_bytes, timeout=300) except requests.RequestException as e: logging.error(f"❌ video upload request failed: {repr(e)}") return None if r.status_code not in (200, 409): logging.error(f"❌ video.bsky.app upload failed: {r.status_code} - {r.text}") return None try: payload = r.json() except Exception as e: logging.error(f"❌ Could not parse upload response JSON: {repr(e)} body={r.text[:500]}") return None if r.status_code == 409: if payload.get("error") == "already_exists" and payload.get("jobId"): logging.info("ℹ️ Video already processed on video.bsky.app. Reusing existing job.") else: logging.error(f"❌ 409 without reusable jobId: {payload}") return None job_id = payload.get("jobId") if not job_id: logging.error(f"❌ No jobId in video upload response: {payload}") return None logging.info(f"⏳ Job {job_id} accepted β€” polling status...") status_url = f"{video_host}/xrpc/app.bsky.video.getJobStatus" deadline = time.time() + 600 # 10 min max poll last_state = None completed_blob = None while time.time() < deadline: try: s = requests.get( status_url, params={"jobId": job_id}, headers={"Authorization": f"Bearer {token}"}, timeout=30, ) except requests.RequestException as e: logging.warning(f"⚠️ Status poll request failed once: {repr(e)}") time.sleep(3) continue if s.status_code != 200: logging.error(f"❌ Job status failed: {s.status_code} - {s.text}") return None try: body = s.json() except Exception as e: logging.error(f"❌ Could not parse status JSON: {repr(e)}") return None job_status = body.get("jobStatus", {}) or {} state = job_status.get("state") if state != last_state: logging.info(f" state β†’ {state}") last_state = state if state == 
"JOB_STATE_COMPLETED": completed_blob = job_status.get("blob") if not completed_blob: logging.error(f"❌ Completed job without blob: {body}") return None break if state == "JOB_STATE_FAILED": logging.error(f"❌ Video processing failed: {job_status}") return None time.sleep(3) if completed_blob is None: logging.error("❌ Video processing timed out.") return None # Wait AFTER successful poll β€” outside the loop, no SDK in scope wait_with_heartbeat(8, "CDN propagation") # Build pure dict embed embed = { "$type": "app.bsky.embed.video", "video": completed_blob, } if alt_text: embed["alt"] = alt_text return embed # --------------------------- # Direct PDS video fallback # --------------------------- def upload_video_via_pds_embed_dict( pds_url: str, access_jwt: str, video_path: str, alt_text: str = "", settle_delay_seconds: float = 30.0, ) -> dict | None: if not os.path.exists(video_path): logging.error(f"❌ Video file not found: {video_path}") return None with open(video_path, "rb") as f: b = f.read() size_mb = len(b) / (1024 * 1024) logging.warning(f"🎬 [PDS-direct fallback] Uploading: {video_path} ({size_mb:.2f} MB)") status, body = xrpc_post_bytes( pds_url=pds_url, method="com.atproto.repo.uploadBlob", data=b, content_type="video/mp4", access_jwt=access_jwt, timeout=900, ) if status != 200 or not isinstance(body, dict): logging.error(f"❌ PDS uploadBlob failed: HTTP {status} body={body}") return None blob = body.get("blob") if not blob: logging.error(f"❌ PDS uploadBlob returned no blob: {body}") return None wait_with_heartbeat(settle_delay_seconds, "PDS/AppView indexing") embed = { "$type": "app.bsky.embed.video", "video": blob, } if alt_text: embed["alt"] = alt_text return embed def upload_video_smart_embed_dict( did: str, access_jwt: str, pds_url: str, video_path: str, service_url: str, alt_text: str = "", settle_delay_seconds: float = 30.0, allow_pds_video_fallback: bool = False, ) -> dict | None: logging.info(f"🌍 PDS ({service_url}). 
Trying video.bsky.app first.") embed = upload_video_via_video_service_embed_dict( did=did, access_jwt=access_jwt, pds_url=pds_url, video_path=video_path, service_url=service_url, alt_text=alt_text, ) if embed: return embed if allow_pds_video_fallback: logging.warning("⚠️ video.bsky.app failed; trying direct PDS fallback.") return upload_video_via_pds_embed_dict( pds_url=pds_url, access_jwt=access_jwt, video_path=video_path, alt_text=alt_text, settle_delay_seconds=settle_delay_seconds, ) logging.error("❌ video.bsky.app failed and fallback is disabled.") return None # --------------------------- # Create post # --------------------------- def create_post_record(text: str, langs: list[str], embed_dict: dict | None = None) -> dict: record = { "$type": "app.bsky.feed.post", "text": text.strip(), "createdAt": time.strftime("%Y-%m-%dT%H:%M:%S.000Z", time.gmtime()), } if langs: record["langs"] = langs if embed_dict is not None: record["embed"] = embed_dict return record def publish_post(pds_url: str, access_jwt: str, did: str, record: dict) -> bool: status, body = xrpc_post_json( pds_url=pds_url, method="com.atproto.repo.createRecord", body={ "repo": did, "collection": "app.bsky.feed.post", "record": record, }, access_jwt=access_jwt, timeout=60, ) if status != 200 or not isinstance(body, dict): logging.error(f"❌ createRecord failed: HTTP {status} body={body}") return False uri = body.get("uri") logging.info(f"βœ… Post published! 
URI: {uri}") return True # --------------------------- # Main # --------------------------- def main(): setup_logging() parser = argparse.ArgumentParser(description="Post text + optional image/video to Bluesky/federated PDS.") parser.add_argument("text", help="Post text") parser.add_argument("--username", required=True, help="Handle/email") parser.add_argument("--password", required=True, help="App password") parser.add_argument("--service", default="https://bsky.social", help="PDS URL") parser.add_argument("--lang", default="ca", help="Comma-separated language codes") parser.add_argument("--image", default=None, help="Image path") parser.add_argument("--video", default=None, help="Video path") parser.add_argument("--alt", default="", help="Alt text") parser.add_argument("--video-settle-delay", type=float, default=30.0) parser.add_argument("--allow-pds-video-fallback", action="store_true") parser.add_argument("--compress-video", dest="compress_video", action="store_true", default=True) parser.add_argument("--no-compress-video", dest="compress_video", action="store_false") parser.add_argument("--max-video-mb", type=float, default=45.0) parser.add_argument("--ffmpeg-crf", type=int, default=28) parser.add_argument("--ffmpeg-preset", default="veryfast") args = parser.parse_args() if args.image and args.video: logging.error("❌ Use either --image or --video, not both.") sys.exit(1) pds_url = args.service.rstrip("/") session = login_with_backoff( pds_url=pds_url, identifier=args.username, password=args.password, ) if not session: sys.exit(1) did = session["did"] access_jwt = session["accessJwt"] logging.info(f"πŸ†” DID: {did} | PDS: {pds_url}") langs = [x.strip() for x in args.lang.split(",") if x.strip()] video_path_for_upload = args.video temp_compressed_path = None if args.video and args.compress_video: compressed = compress_video_ffmpeg( input_path=args.video, max_size_mb=args.max_video_mb, crf=args.ffmpeg_crf, preset=args.ffmpeg_preset, audio_bitrate_k=96, ) if 
compressed is None: logging.error("❌ Compression failed; aborting.") sys.exit(1) video_path_for_upload = compressed if compressed != args.video: temp_compressed_path = compressed embed_dict = None if video_path_for_upload: logging.info(f"🎬 Preparing video upload: {video_path_for_upload}") embed_dict = upload_video_smart_embed_dict( did=did, access_jwt=access_jwt, pds_url=pds_url, video_path=video_path_for_upload, service_url=pds_url, alt_text=args.alt, settle_delay_seconds=args.video_settle_delay, allow_pds_video_fallback=args.allow_pds_video_fallback, ) if embed_dict is None: logging.error("❌ Aborting post: video upload/processing failed.") if temp_compressed_path and os.path.exists(temp_compressed_path): os.remove(temp_compressed_path) sys.exit(1) elif args.image: embed_dict = upload_image_embed_dict( pds_url=pds_url, access_jwt=access_jwt, image_path=args.image, alt_text=args.alt, ) if embed_dict is None: logging.error("❌ Aborting post: image upload failed.") sys.exit(1) record = create_post_record(text=args.text, langs=langs, embed_dict=embed_dict) logging.info(f"🧾 Final record text={record.get('text')!r}, has_embed={'embed' in record}") ok = publish_post(pds_url=pds_url, access_jwt=access_jwt, did=did, record=record) if temp_compressed_path and os.path.exists(temp_compressed_path): try: os.remove(temp_compressed_path) logging.info(f"🧹 Removed temp file: {temp_compressed_path}") except Exception: pass if not ok: sys.exit(1) if __name__ == "__main__": main()