#!/usr/bin/env python3
"""
bsky_post.py — Post text + optional image or video to Bluesky.

Usage examples:
    python3 bsky_post.py "DIVENDRES!!!!" --video friday.mp4 --username you --password app-pass
    python3 bsky_post.py "Dijous!!!!" --image thursday.jpg --username you --password app-pass
    python3 bsky_post.py "Bon dia!" --username you --password app-pass
    python3 bsky_post.py "Bon dia!" --image photo.jpg --lang ca --username you --password app-pass
    python3 bsky_post.py "Long video!" --video clip.mp4 --username you --password app-pass
"""

import argparse
import logging
import math
import mimetypes
import os
import random
import re
import sys
import time
from dataclasses import dataclass
from urllib.parse import urlparse

import requests
from atproto import Client, client_utils, models


# ============================================================
# Config
# ============================================================
@dataclass(frozen=True)
class RetryConfig:
    """Retry/backoff settings used by ``main`` for the login step."""

    login_max_attempts: int = 5
    login_base_delay_seconds: float = 10.0
    login_max_delay_seconds: float = 600.0
    login_jitter_seconds: float = 3.0


# Centralized video service used by upload_video_via_bsky_service().
VIDEO_HOST = "https://video.bsky.app"
VIDEO_DID = "did:web:video.bsky.app"
VIDEO_UPLOAD_LXM = "app.bsky.video.uploadVideo"
VIDEO_TOKEN_TTL_SECONDS = 60 * 30
VIDEO_UPLOAD_TIMEOUT_SECONDS = 180
VIDEO_JOB_TIMEOUT_SECONDS = 600  # 10 min for big videos
VIDEO_JOB_POLL_SECONDS = 3


# ============================================================
# Logging
# ============================================================
def setup_logging() -> None:
    """Configure root logging: INFO level, timestamped lines on stdout."""
    logging.basicConfig(
        format="%(asctime)s %(levelname)s %(message)s",
        level=logging.INFO,
        stream=sys.stdout,
    )


# ============================================================
# Text builder
# ============================================================
_TAG_TRAILING_PUNCTUATION = ".,;:!?)'\""
_URL_TRAILING_PUNCTUATION = ".,;:!?"


def _has_host(url: str) -> bool:
    """Return True when *url* parses and carries a non-empty network location."""
    try:
        return bool(urlparse(url).netloc)
    except ValueError:
        return False


def _add_word(text_builder: client_utils.TextBuilder, word: str) -> None:
    """Append one non-empty, space-delimited word as a link, tag or plain text.

    Trailing sentence punctuation is kept out of the facet and appended as
    plain text, so "#tag!!!" highlights only "#tag" and "https://x.org." links
    to "https://x.org".
    """
    if word.startswith(("http://", "https://")):
        url = word.rstrip(_URL_TRAILING_PUNCTUATION)
        if _has_host(url):
            text_builder.link(url, url)
            if len(url) < len(word):
                text_builder.text(word[len(url):])
            return
        # Nothing usable after the scheme: keep the word as ordinary text.
        text_builder.text(word)
        return

    if word.startswith("#") and len(word) > 1:
        tag_name = word[1:].rstrip(_TAG_TRAILING_PUNCTUATION)
        if tag_name:
            text_builder.tag(f"#{tag_name}", tag_name)
            trailing = word[1 + len(tag_name):]
            if trailing:
                text_builder.text(trailing)
            return

    text_builder.text(word)


def make_rich(content: str):
    """Build a ``TextBuilder`` from *content* with link and hashtag facets.

    The content is stripped, then processed line by line and split on single
    spaces so the original spacing is reproduced. Whitespace-only lines are
    emitted as a bare newline.

    Returns:
        The populated ``client_utils.TextBuilder``.
    """
    text_builder = client_utils.TextBuilder()
    lines = content.strip().splitlines()
    last_line = len(lines) - 1

    for line_idx, line in enumerate(lines):
        if line.strip():
            words = line.split(" ")
            last_word = len(words) - 1
            for i, word in enumerate(words):
                # Empty strings come from consecutive spaces; only the
                # separator itself needs to be written for them.
                if word:
                    _add_word(text_builder, word)
                if i < last_word:
                    text_builder.text(" ")
        if line_idx < last_line:
            text_builder.text("\n")

    return text_builder


# ============================================================
# Error helpers
# ============================================================
_RATE_LIMIT_TEXT_SIGNALS = ("ratelimitexceeded", "too many requests", "rate limit")
_AUTH_TEXT_SIGNALS = (
    "invalid identifier or password",
    "authenticationrequired",
    "invalidtoken",
)
_NETWORK_SIGNALS = (
    "ConnectError",
    "RemoteProtocolError",
    "ReadTimeout",
    "WriteTimeout",
    "TimeoutException",
    "ConnectionResetError",
)
_TRANSIENT_SIGNALS = (
    "InvokeTimeoutError",
    "ReadTimeout",
    "WriteTimeout",
    "TimeoutException",
    "RemoteProtocolError",
    "ConnectError",
)
_GATEWAY_STATUS_CODES = (502, 503, 504)


def _status_code(error_obj) -> int | None:
    """Return an integer ``status_code`` found on the error or its ``response``."""
    for holder in (error_obj, getattr(error_obj, "response", None)):
        code = getattr(holder, "status_code", None)
        if isinstance(code, int) and not isinstance(code, bool):
            return code
    return None


def _matches_status(error_obj, codes) -> bool:
    """Return True when the error carries one of the HTTP status *codes*.

    A structured status code is authoritative when present. Otherwise the repr
    is searched for the code as a standalone number, so digits embedded in
    longer numbers (for example a ``ratelimit-reset`` timestamp) do not match.
    """
    status = _status_code(error_obj)
    if status is not None:
        return status in codes
    text = repr(error_obj)
    return any(re.search(rf"(?<!\d){code}(?!\d)", text) for code in codes)


def is_rate_limited_error(error_obj) -> bool:
    """Return True when the error looks like an HTTP 429 / rate-limit response."""
    if _matches_status(error_obj, (429,)):
        return True
    text = repr(error_obj).lower()
    return any(signal in text for signal in _RATE_LIMIT_TEXT_SIGNALS)


def is_auth_error(error_obj) -> bool:
    """Return True when the error looks like an authentication failure (401/403)."""
    if _matches_status(error_obj, (401, 403)):
        return True
    text = repr(error_obj).lower()
    return any(signal in text for signal in _AUTH_TEXT_SIGNALS)


def is_network_error(error_obj) -> bool:
    """Return True for connection-level failures or 502/503/504 responses."""
    if _matches_status(error_obj, _GATEWAY_STATUS_CODES):
        return True
    text = repr(error_obj)
    return any(signal in text for signal in _NETWORK_SIGNALS)


def is_transient_error(error_obj) -> bool:
    """Return True for timeouts, protocol hiccups or 502/503/504 responses."""
    if _matches_status(error_obj, _GATEWAY_STATUS_CODES):
        return True
    text = repr(error_obj)
    return any(signal in text for signal in _TRANSIENT_SIGNALS)


def _to_finite_float(value) -> float | None:
    """Convert *value* to a finite float, or return None when that is not possible."""
    try:
        number = float(value)
    except (TypeError, ValueError):
        return None
    return number if math.isfinite(number) else None


def _wait_from_headers(headers, default_delay: float, max_delay: float) -> float | None:
    """Derive a wait time from rate-limit headers, or None when none is usable.

    Header names are matched case-insensitively. ``retry-after`` and
    ``x-ratelimit-after`` are read as a number of seconds (floor 1s);
    ``ratelimit-reset`` is read as an epoch timestamp (floor *default_delay*).
    A header that cannot be parsed as a number is skipped so the next one can
    still be used.
    """
    if not headers:
        return None
    try:
        lowered = {str(name).lower(): value for name, value in headers.items()}
    except AttributeError:
        return None

    for name in ("retry-after", "x-ratelimit-after"):
        seconds = _to_finite_float(lowered.get(name))
        if seconds is not None:
            return min(max(seconds, 1.0), max_delay)

    reset_at = _to_finite_float(lowered.get("ratelimit-reset"))
    if reset_at is not None:
        wait_seconds = max(reset_at - int(time.time()) + 1.0, default_delay)
        return min(wait_seconds, max_delay)
    return None


def get_rate_limit_wait_seconds(error_obj, default_delay: float, max_delay: float) -> float:
    """Work out how long to wait after a rate-limit error.

    Sources are tried in order: ``error_obj.headers``, then
    ``error_obj.response.headers``, then header-looking fragments in
    ``repr(error_obj)``.

    Args:
        error_obj: The exception raised by the failed request.
        default_delay: Returned when no hint is found; also the minimum wait
            when the hint is a reset timestamp.
        max_delay: Upper bound applied to any wait derived from a hint.

    Returns:
        Seconds to wait.
    """
    response = getattr(error_obj, "response", None)
    for headers in (
        getattr(error_obj, "headers", None),
        getattr(response, "headers", None),
    ):
        wait_seconds = _wait_from_headers(headers, default_delay, max_delay)
        if wait_seconds is not None:
            return wait_seconds

    # Last resort: the headers may only be visible inside the exception's repr.
    text = repr(error_obj)
    for name in ("retry-after", "x-ratelimit-after"):
        m = re.search(rf"'{name}': '(\d+)'", text, re.IGNORECASE)
        if m:
            return min(max(float(m.group(1)), 1.0), max_delay)

    m = re.search(r"'ratelimit-reset': '(\d+)'", text, re.IGNORECASE)
    if m:
        wait_seconds = max(float(m.group(1)) - int(time.time()) + 1.0, default_delay)
        return min(wait_seconds, max_delay)

    return default_delay


# ============================================================
# Login with backoff
# ============================================================
def login_with_backoff(
    client: Client,
    username: str,
    password: str,
    service_url: str,
    max_attempts: int = 5,
    base_delay: float = 10.0,
    max_delay: float = 600.0,
    jitter: float = 1.5,
) -> bool:
    """Log in, retrying on rate limits and transient failures.

    Args:
        client: Client to authenticate.
        username: Handle or email passed to ``client.login``.
        password: App password passed to ``client.login``.
        service_url: Only used in log messages.
        max_attempts: Total number of login attempts.
        base_delay: Base wait in seconds; multiplied by the attempt number for
            non-rate-limit errors.
        max_delay: Cap for the computed wait (before jitter).
        jitter: Upper bound of the random extra wait added to each retry.

    Returns:
        True on success; False on bad credentials or once attempts run out.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            logging.info(f"🔑 Login attempt {attempt}/{max_attempts} → {service_url} as {username}")
            client.login(username, password)
        except Exception as e:
            logging.exception("❌ Login exception")

            # Retrying cannot fix wrong credentials.
            if is_auth_error(e):
                logging.error("❌ Bad credentials. Check handle/password.")
                return False

            if is_rate_limited_error(e):
                label = "Rate-limited on login"
                exhausted = "❌ Exhausted login retries due to rate limiting."
                wait = get_rate_limit_wait_seconds(e, default_delay=base_delay, max_delay=max_delay)
            elif is_network_error(e) or is_transient_error(e):
                label = "Transient login error"
                exhausted = "❌ Exhausted login retries after transient/network errors."
                wait = min(base_delay * attempt, max_delay)
            else:
                label = "Unknown login error"
                exhausted = "❌ Exhausted login retries after unknown errors."
                wait = min(base_delay * attempt, max_delay)

            if attempt >= max_attempts:
                logging.error(exhausted)
                return False

            wait += random.uniform(0, jitter)
            logging.warning(
                f"⏳ {label} (attempt {attempt}/{max_attempts}). "
                f"Retrying in {wait:.1f}s..."
            )
            time.sleep(wait)
        else:
            logging.info("✅ Login successful.")
            return True

    # Only reached when max_attempts < 1.
    return False


# ============================================================
# Utility
# ============================================================
def is_official_bsky_pds(service_url: str) -> bool:
    """Return True when the URL's host is bsky.social, bsky.app or *.bsky.network."""
    try:
        host = (urlparse(service_url).hostname or "").lower()
    except ValueError:
        return False
    return host in {"bsky.social", "bsky.app"} or host.endswith(".bsky.network")


def detect_mime_type(path: str) -> str:
    """Guess the MIME type of *path*.

    Uses ``mimetypes`` first, then a small extension table, and finally
    "application/octet-stream".
    """
    mime, _ = mimetypes.guess_type(path)
    if mime:
        return mime

    ext = os.path.splitext(path)[1].lower()
    fallbacks = {
        ".jpg": "image/jpeg",
        ".jpeg": "image/jpeg",
        ".png": "image/png",
        ".gif": "image/gif",
        ".webp": "image/webp",
        ".mp4": "video/mp4",
        ".mov": "video/quicktime",
        ".webm": "video/webm",
    }
    return fallbacks.get(ext, "application/octet-stream")


def wait_with_heartbeat(total_seconds: float, label: str = "indexing") -> None:
    """Sleep for *total_seconds*, logging progress at most every 5 seconds.

    Returns immediately (without logging) when *total_seconds* is <= 0.
    """
    if total_seconds <= 0:
        return

    logging.info(f"⏳ Waiting {total_seconds:.0f}s for {label}...")
    remaining = total_seconds
    while remaining > 0:
        step = min(5.0, remaining)
        time.sleep(step)
        remaining -= step
        if remaining > 0:
            logging.info(f"   ...still waiting ({remaining:.0f}s remaining)...")
    logging.info("✅ Wait complete.")


# ============================================================
# Media upload — Image
# ============================================================
def upload_image(
    client: Client,
    image_path: str,
    alt_text: str = "",
) -> models.AppBskyEmbedImages.Image | None:
    """Upload an image blob and wrap it as an embed image.

    Returns:
        The embed image, or None when reading or uploading fails (the error
        is logged).
    """
    try:
        mime = detect_mime_type(image_path)
        with open(image_path, "rb") as f:
            data = f.read()

        logging.info(f"🖼️ Uploading image: {image_path} ({len(data)/1024:.1f} KB, {mime})")
        response = client.upload_blob(data)
        logging.info("✅ Image uploaded successfully.")

        return models.AppBskyEmbedImages.Image(
            image=response.blob,
            alt=alt_text,
        )
    except Exception as e:
        logging.error(f"❌ Failed to upload image: {repr(e)}")
        return None


# ============================================================
# Media upload — Video (PDS-direct fallback only)
# ============================================================
def upload_video_via_pds(
    client: Client,
    video_path: str,
    alt_text: str = "",
    settle_delay_seconds: float = 30.0,
) -> models.AppBskyEmbedVideo.Main | None:
    """
    Direct upload to home PDS using upload_blob.

    This can produce posts where blob exists but AppView playback is unreliable.
    Use only as explicit fallback.

    After the upload the function waits *settle_delay_seconds* before returning
    the embed. Returns None when the file is missing or the upload fails.
    """
    try:
        if not os.path.exists(video_path):
            logging.error(f"❌ Video file not found: {video_path}")
            return None

        with open(video_path, "rb") as f:
            video_bytes = f.read()

        size_mb = len(video_bytes) / (1024 * 1024)
        logging.warning(
            f"🎬 [PDS-direct fallback] Uploading to home PDS: {video_path} ({size_mb:.2f} MB)"
        )

        response = client.upload_blob(video_bytes)
        blob = response.blob
        logging.warning("⚠️ [PDS-direct fallback] Video blob uploaded.")

        wait_with_heartbeat(settle_delay_seconds, label="PDS/AppView indexing the video blob")

        return models.AppBskyEmbedVideo.Main(
            video=blob,
            alt=alt_text,
        )
    except Exception as e:
        logging.error(f"❌ PDS-direct video upload failed: {repr(e)}")
        return None


# ============================================================
# Media upload — Video (video.bsky.app primary path)
# ============================================================
def _extract_service_auth_token(upload_auth) -> str | None:
    """Read ``token`` from a service-auth response given as an object or a dict."""
    token = getattr(upload_auth, "token", None)
    if token:
        return token
    if isinstance(upload_auth, dict):
        return upload_auth.get("token")
    return None


def _request_video_service_token(client: Client) -> str | None:
    """Ask the PDS for a service-auth token scoped to the video upload call."""
    # NOTE(review): Bluesky's video upload tutorial appears to request this
    # token with aud=<the PDS's did:web> and lxm=com.atproto.repo.uploadBlob.
    # The values below are kept as the script had them; confirm them if the
    # video service rejects the token.
    auth_params = {
        "aud": VIDEO_DID,
        "lxm": VIDEO_UPLOAD_LXM,
        "exp": int(time.time()) + VIDEO_TOKEN_TTL_SECONDS,
    }

    # Robust params typing across atproto versions: typed model first, plain
    # dict if that raises.
    try:
        params = models.ComAtprotoServerGetServiceAuth.Params(**auth_params)
        upload_auth = client.com.atproto.server.get_service_auth(params)
    except Exception:
        upload_auth = client.com.atproto.server.get_service_auth(auth_params)

    return _extract_service_auth_token(upload_auth)


def _poll_video_job(job_id: str) -> dict | None:
    """Poll the video service until the job completes.

    Returns:
        The blob dict of the completed job, or None on a failed status
        request, a failed job, a completed job without a blob, or timeout.
    """
    status_url = f"{VIDEO_HOST}/xrpc/app.bsky.video.getJobStatus"
    deadline = time.time() + VIDEO_JOB_TIMEOUT_SECONDS

    while time.time() < deadline:
        status_resp = requests.get(status_url, params={"jobId": job_id}, timeout=30)
        if status_resp.status_code != 200:
            logging.error(
                f"❌ Job status check failed: "
                f"{status_resp.status_code} - {status_resp.text}"
            )
            return None

        status_json = status_resp.json()
        job_status = status_json.get("jobStatus", {})
        state = job_status.get("state")

        if state == "JOB_STATE_COMPLETED":
            blob_dict = job_status.get("blob")
            if not blob_dict:
                logging.error(f"❌ No blob in completed job status: {status_json}")
                return None
            return blob_dict

        if state == "JOB_STATE_FAILED":
            logging.error(f"❌ Video processing failed: {job_status}")
            return None

        logging.info(f"   ...still processing (state={state})...")
        time.sleep(VIDEO_JOB_POLL_SECONDS)

    logging.error("❌ Video processing timed out.")
    return None


def upload_video_via_bsky_service(
    client: Client,
    video_path: str,
    alt_text: str = "",
) -> models.AppBskyEmbedVideo.Main | None:
    """
    Upload a video via centralized video.bsky.app service.
    This is the reliable playback path for Bluesky clients.

    Steps: obtain a service-auth token, POST the file to the upload endpoint,
    poll the job status until it completes, then build the embed from the
    returned blob.

    Returns:
        The video embed, or None on any failure (the error is logged).
    """
    try:
        if not os.path.exists(video_path):
            logging.error(f"❌ Video file not found: {video_path}")
            return None

        with open(video_path, "rb") as f:
            video_bytes = f.read()

        size_mb = len(video_bytes) / (1024 * 1024)
        logging.info(f"🎬 [video.bsky.app] Uploading: {video_path} ({size_mb:.2f} MB)")

        token = _request_video_service_token(client)
        if not token:
            logging.error("❌ Failed to get service auth token for video.bsky.app.")
            return None

        # Let requests build the query string so the values are URL-encoded.
        upload_resp = requests.post(
            f"{VIDEO_HOST}/xrpc/app.bsky.video.uploadVideo",
            params={"did": client.me.did, "name": f"{int(time.time())}.mp4"},
            headers={
                "Authorization": f"Bearer {token}",
                "Content-Type": "video/mp4",
            },
            data=video_bytes,
            timeout=VIDEO_UPLOAD_TIMEOUT_SECONDS,
        )
        if upload_resp.status_code != 200:
            logging.error(
                f"❌ video.bsky.app upload failed: "
                f"{upload_resp.status_code} - {upload_resp.text}"
            )
            return None

        payload = upload_resp.json()
        # Accept the job id at the top level or nested under "jobStatus".
        job_id = payload.get("jobId") or (payload.get("jobStatus") or {}).get("jobId")
        if not job_id:
            logging.error(f"❌ No jobId returned from video service. Response: {payload}")
            return None

        logging.info(f"⏳ Job {job_id} accepted — polling status...")
        blob_dict = _poll_video_job(job_id)
        if not blob_dict:
            return None

        # Small propagation cushion
        wait_with_heartbeat(8, label="CDN propagation")

        blob_ref = models.BlobRef.from_dict(blob_dict)
        logging.info("✅ Video processed and blob returned.")
        return models.AppBskyEmbedVideo.Main(
            video=blob_ref,
            alt=alt_text,
        )
    except Exception as e:
        logging.error(f"❌ video.bsky.app upload failed: {repr(e)}")
        return None


# ============================================================
# Media upload — Video dispatcher
# ============================================================
def upload_video_smart(
    client: Client,
    video_path: str,
    service_url: str,
    alt_text: str = "",
    settle_delay_seconds: float = 30.0,
    allow_pds_video_fallback: bool = False,
) -> models.AppBskyEmbedVideo.Main | None:
    """
    Reliable policy:
    1) Always try video.bsky.app first (best playback compatibility).
    2) Optional direct-PDS fallback only if explicitly enabled.

    *service_url* only affects which informational message is logged.
    Returns the embed, or None when every permitted path failed.
    """
    if is_official_bsky_pds(service_url):
        logging.info(f"🌐 PDS appears official ({service_url}). Using video.bsky.app.")
    else:
        logging.info(
            f"🌍 PDS is self-hosted/federated ({service_url}). "
            "Still trying video.bsky.app first for client playback reliability."
        )

    embed = upload_video_via_bsky_service(client, video_path, alt_text=alt_text)
    if embed:
        return embed

    if allow_pds_video_fallback:
        logging.warning(
            "⚠️ video.bsky.app upload failed; trying direct PDS fallback "
            "(may produce unplayable videos in some clients)."
        )
        return upload_video_via_pds(
            client,
            video_path,
            alt_text=alt_text,
            settle_delay_seconds=settle_delay_seconds,
        )

    logging.error(
        "❌ video.bsky.app upload failed. Not posting unreliable direct-PDS video "
        "(enable --allow-pds-video-fallback to override)."
    )
    return None


# ============================================================
# Post
# ============================================================
def post_to_bsky(
    client: Client,
    text: str,
    langs: list[str],
    image_path: str | None = None,
    video_path: str | None = None,
    alt_text: str = "",
    service_url: str = "https://bsky.social",
    video_settle_delay: float = 30.0,
    allow_pds_video_fallback: bool = False,
) -> bool:
    """Publish a post with optional media.

    A video takes precedence over an image when both paths are given. The post
    is not sent when the requested media upload fails.

    Returns:
        True when the post was sent, False otherwise (the error is logged).
    """
    rich_text = make_rich(text)

    try:
        if video_path:
            logging.info(f"🎬 Preparing video upload: {video_path}")
            video_embed = upload_video_smart(
                client,
                video_path,
                service_url=service_url,
                alt_text=alt_text,
                settle_delay_seconds=video_settle_delay,
                allow_pds_video_fallback=allow_pds_video_fallback,
            )
            if not video_embed:
                logging.error("❌ Aborting post: video upload/processing failed.")
                return False

            logging.info("🚀 Sending video post...")
            result = client.send_post(
                text=rich_text,
                embed=video_embed,
                langs=langs,
            )
        elif image_path:
            image = upload_image(client, image_path, alt_text=alt_text)
            if not image:
                logging.error("❌ Aborting post: image upload failed.")
                return False

            embed = models.AppBskyEmbedImages.Main(images=[image])
            logging.info("🚀 Sending image post...")
            result = client.send_post(text=rich_text, embed=embed, langs=langs)
        else:
            logging.info("🚀 Sending text post...")
            result = client.send_post(text=rich_text, langs=langs)

        uri = getattr(result, "uri", None)
        logging.info(f"✅ Post published! URI: {uri}")
        return True
    except Exception as e:
        logging.error(f"❌ Failed to send post: {repr(e)}")
        return False


# ============================================================
# CLI
# ============================================================
def main():
    """Parse CLI arguments, log in and publish the post; exit 1 on any failure."""
    setup_logging()

    parser = argparse.ArgumentParser(
        description="Post text + optional image or video to Bluesky."
    )
    parser.add_argument("text", help="Post text content")
    parser.add_argument("--username", required=True, help="Bluesky handle or email")
    parser.add_argument("--password", required=True, help="Bluesky app password")
    parser.add_argument("--service", default="https://bsky.social", help="Bluesky PDS URL")
    parser.add_argument("--lang", default="ca", help="Comma-separated language codes (e.g. ca,es)")
    parser.add_argument("--image", default=None, help="Path to image file to attach")
    parser.add_argument("--video", default=None, help="Path to video file to attach")
    parser.add_argument("--alt", default="", help="Alt text for media")
    parser.add_argument(
        "--video-settle-delay",
        type=float,
        default=30.0,
        help="Seconds to wait after direct-PDS video fallback upload before posting (default: 30)",
    )
    parser.add_argument(
        "--allow-pds-video-fallback",
        action="store_true",
        help="Allow direct-PDS fallback if video.bsky.app upload fails (less reliable playback).",
    )
    args = parser.parse_args()

    if args.image and args.video:
        logging.error("❌ Use either --image or --video, not both.")
        sys.exit(1)

    # Fail before logging in so a bad path does not cost a login attempt.
    media_path = args.video or args.image
    if media_path and not os.path.isfile(media_path):
        logging.error(f"❌ Media file not found: {media_path}")
        sys.exit(1)

    retry = RetryConfig()
    client = Client(base_url=args.service)
    success = login_with_backoff(
        client,
        args.username,
        args.password,
        args.service,
        max_attempts=retry.login_max_attempts,
        base_delay=retry.login_base_delay_seconds,
        max_delay=retry.login_max_delay_seconds,
        jitter=retry.login_jitter_seconds,
    )
    if not success:
        sys.exit(1)

    langs = [code.strip() for code in args.lang.split(",") if code.strip()]

    post_success = post_to_bsky(
        client,
        text=args.text,
        langs=langs,
        image_path=args.image,
        video_path=args.video,
        alt_text=args.alt,
        service_url=args.service,
        video_settle_delay=args.video_settle_delay,
        allow_pds_video_fallback=args.allow_pds_video_fallback,
    )
    if not post_success:
        sys.exit(1)


if __name__ == "__main__":
    main()