#!/usr/bin/env python3 """ bsky_post.py — Post text + optional image or video to a Bluesky instance. Usage examples: python3 bsky_post.py "DIVENDRES!!!!" --video friday.mp4 python3 bsky_post.py "Dijous!!!!" --image thursday.jpg python3 bsky_post.py "Bon dia!" python3 bsky_post.py "Bon dia!" --image photo.jpg --lang ca python3 bsky_post.py "Long video!" --video clip.mp4 --video-settle-delay 25 """ import argparse import logging import mimetypes import os import sys import time import random import re import requests from dataclasses import dataclass from urllib.parse import urlparse from atproto import Client, client_utils, models # ============================================================ # Config # ============================================================ @dataclass(frozen=True) class RetryConfig: login_max_attempts: int = 5 login_base_delay_seconds: float = 10.0 login_max_delay_seconds: float = 600.0 login_jitter_seconds: float = 3 # ============================================================ # Logging # ============================================================ def setup_logging() -> None: logging.basicConfig( format="%(asctime)s %(message)s", level=logging.INFO, stream=sys.stdout, ) # ============================================================ # Text builder # ============================================================ def make_rich(content: str): text_builder = client_utils.TextBuilder() content = content.strip() lines = content.splitlines() for line_idx, line in enumerate(lines): if not line.strip(): if line_idx < len(lines) - 1: text_builder.text("\n") continue words = line.split(" ") for i, word in enumerate(words): if not word: if i < len(words) - 1: text_builder.text(" ") continue if word.startswith("http://") or word.startswith("https://"): text_builder.link(word, word) elif word.startswith("#") and len(word) > 1: tag_name = word[1:].rstrip(".,;:!?)'\"") if tag_name: text_builder.tag(word, tag_name) else: text_builder.text(word) else: text_builder.text(word) if i < len(words) - 1: text_builder.text(" ") if line_idx < len(lines) - 1: text_builder.text("\n") return text_builder # ============================================================ # Error helpers # ============================================================ def is_rate_limited_error(error_obj) -> bool: text = repr(error_obj).lower() return ( "429" in text or "ratelimitexceeded" in text or "too many requests" in text or "rate limit" in text ) def is_auth_error(error_obj) -> bool: text = repr(error_obj).lower() return ( "401" in text or "403" in text or "invalid identifier or password" in text or "authenticationrequired" in text or "invalidtoken" in text ) def is_network_error(error_obj) -> bool: text = repr(error_obj) signals = [ "ConnectError", "RemoteProtocolError", "ReadTimeout", "WriteTimeout", "TimeoutException", "503", "502", "504", "ConnectionResetError", ] return any(sig in text for sig in signals) def is_transient_error(error_obj) -> bool: error_text = repr(error_obj) transient_signals = [ "InvokeTimeoutError", "ReadTimeout", "WriteTimeout", "TimeoutException", "RemoteProtocolError", "ConnectError", "503", "502", "504", ] return any(signal in error_text for signal in transient_signals) def get_rate_limit_wait_seconds(error_obj, default_delay: float, max_delay: float) -> float: try: now_ts = int(time.time()) headers = getattr(error_obj, "headers", None) or {} retry_after = headers.get("retry-after") or headers.get("Retry-After") if retry_after: return min(max(float(retry_after), 1.0), max_delay) x_after = headers.get("x-ratelimit-after") or headers.get("X-RateLimit-After") if x_after: return min(max(float(x_after), 1.0), max_delay) reset_value = headers.get("ratelimit-reset") or headers.get("RateLimit-Reset") if reset_value: wait_seconds = max(float(reset_value) - now_ts + 1.0, default_delay) return min(wait_seconds, max_delay) except Exception: pass try: response = getattr(error_obj, "response", None) headers = getattr(response, "headers", None) or {} now_ts = int(time.time()) retry_after = headers.get("retry-after") or headers.get("Retry-After") if retry_after: return min(max(float(retry_after), 1.0), max_delay) x_after = headers.get("x-ratelimit-after") or headers.get("X-RateLimit-After") if x_after: return min(max(float(x_after), 1.0), max_delay) reset_value = headers.get("ratelimit-reset") or headers.get("RateLimit-Reset") if reset_value: wait_seconds = max(float(reset_value) - now_ts + 1.0, default_delay) return min(wait_seconds, max_delay) except Exception: pass text = repr(error_obj) m = re.search(r"'retry-after': '(\d+)'", text, re.IGNORECASE) if m: return min(max(float(m.group(1)), 1.0), max_delay) m = re.search(r"'x-ratelimit-after': '(\d+)'", text, re.IGNORECASE) if m: return min(max(float(m.group(1)), 1.0), max_delay) m = re.search(r"'ratelimit-reset': '(\d+)'", text, re.IGNORECASE) if m: now_ts = int(time.time()) wait_seconds = max(float(m.group(1)) - now_ts + 1.0, default_delay) return min(wait_seconds, max_delay) return default_delay # ============================================================ # Login with backoff # ============================================================ def login_with_backoff( client: Client, username: str, password: str, service_url: str, max_attempts: int = 5, base_delay: float = 10.0, max_delay: float = 600.0, jitter: float = 1.5, ) -> bool: for attempt in range(1, max_attempts + 1): try: logging.info( f"🔑 Login attempt {attempt}/{max_attempts} → {service_url} as {username}" ) client.login(username, password) logging.info("✅ Login successful.") return True except Exception as e: logging.exception("❌ Login exception") if is_auth_error(e): logging.error("❌ Bad credentials. Check handle/password.") return False if is_rate_limited_error(e): if attempt < max_attempts: wait = get_rate_limit_wait_seconds(e, default_delay=base_delay, max_delay=max_delay) wait = wait + random.uniform(0, jitter) logging.warning( f"⏳ Rate-limited on login (attempt {attempt}/{max_attempts}). " f"Retrying in {wait:.1f}s..." ) time.sleep(wait) continue logging.error("❌ Exhausted login retries due to rate limiting.") return False if is_network_error(e) or is_transient_error(e): if attempt < max_attempts: wait = min(base_delay * attempt, max_delay) + random.uniform(0, jitter) logging.warning( f"⏳ Transient login error (attempt {attempt}/{max_attempts}). " f"Retrying in {wait:.1f}s..." ) time.sleep(wait) continue logging.error("❌ Exhausted login retries after transient/network errors.") return False if attempt < max_attempts: wait = min(base_delay * attempt, max_delay) + random.uniform(0, jitter) logging.warning( f"⏳ Unknown login error (attempt {attempt}/{max_attempts}). " f"Retrying in {wait:.1f}s..." ) time.sleep(wait) continue return False # ============================================================ # PDS detection # ============================================================ def is_official_bsky_pds(service_url: str) -> bool: """ Detect whether the configured PDS is part of the official Bluesky-operated network. Self-hosted/federated PDSes (e.g. eurosky.social) return False. """ try: host = (urlparse(service_url).hostname or "").lower() return ( host in {"bsky.social", "bsky.app"} or host.endswith(".bsky.network") ) except Exception: return False # ============================================================ # Media upload — Image # ============================================================ def detect_mime_type(path: str) -> str: mime, _ = mimetypes.guess_type(path) if mime: return mime ext = os.path.splitext(path)[1].lower() fallbacks = { ".jpg": "image/jpeg", ".jpeg": "image/jpeg", ".png": "image/png", ".gif": "image/gif", ".webp": "image/webp", ".mp4": "video/mp4", ".mov": "video/quicktime", ".webm": "video/webm", } return fallbacks.get(ext, "application/octet-stream") def upload_image( client: Client, image_path: str, alt_text: str = "", ) -> models.AppBskyEmbedImages.Image | None: try: mime = detect_mime_type(image_path) with open(image_path, "rb") as f: data = f.read() logging.info(f"🖼️ Uploading image: {image_path} ({len(data)/1024:.1f} KB, {mime})") response = client.upload_blob(data) logging.info("✅ Image uploaded successfully.") return models.AppBskyEmbedImages.Image( image=response.blob, alt=alt_text, ) except Exception as e: logging.error(f"❌ Failed to upload image: {repr(e)}") return None # ============================================================ # Helpers — settle delay # ============================================================ def wait_with_heartbeat(total_seconds: float, label: str = "indexing") -> None: """ Sleep `total_seconds` while logging a heartbeat every 5s so the operator can see we're still alive and how much time is left. """ if total_seconds <= 0: return logging.info(f"⏳ Waiting {total_seconds:.0f}s for PDS {label}...") remaining = total_seconds while remaining > 0: step = min(5.0, remaining) time.sleep(step) remaining -= step if remaining > 0: logging.info(f" ...still waiting ({remaining:.0f}s remaining)...") logging.info("✅ Settle delay complete.") # ============================================================ # Media upload — Video (PDS-direct) # ============================================================ def upload_video_via_pds( client: Client, video_path: str, alt_text: str = "", settle_delay_seconds: float = 60.0, ) -> models.AppBskyEmbedVideo.Main | None: """ Upload a video as a generic blob directly to the user's PDS. Used for self-hosted / federated PDSes (e.g. eurosky.social) that don't proxy to the centralized video.bsky.app service. The PDS stores the bytes and returns a blob ref we can embed directly. A settle delay is applied AFTER the upload returns, because some PDSes (notably non-Bluesky-operated ones) index the blob asynchronously. If we publish the post immediately, the AppView can render "video not found" until indexing catches up — the delay ensures the post references a blob that's already retrievable. """ try: if not os.path.exists(video_path): logging.error(f"❌ Video file not found: {video_path}") return None with open(video_path, "rb") as f: video_bytes = f.read() size_mb = len(video_bytes) / (1024 * 1024) logging.info( f"🎬 [PDS-direct] Uploading video to home PDS: {video_path} ({size_mb:.2f} MB)" ) response = client.upload_blob(video_bytes) blob = response.blob logging.info("✅ [PDS-direct] Video blob uploaded successfully.") # Give PDS / AppView time to index the blob before we reference it. wait_with_heartbeat(settle_delay_seconds, label="to index the video blob") return models.AppBskyEmbedVideo.Main( video=blob, alt=alt_text, ) except Exception as e: logging.error(f"❌ PDS-direct video upload failed: {repr(e)}") return None # ============================================================ # Media upload — Video (video.bsky.app shared service) # ============================================================ def upload_video_via_bsky_service( client: Client, video_path: str, alt_text: str = "", ) -> models.AppBskyEmbedVideo.Main | None: """ Upload a video via the centralized video.bsky.app service. Used only when the configured PDS is on the official Bluesky network. """ try: if not os.path.exists(video_path): logging.error(f"❌ Video file not found: {video_path}") return None with open(video_path, "rb") as f: video_bytes = f.read() size_mb = len(video_bytes) / (1024 * 1024) logging.info( f"🎬 [video.bsky.app] Uploading via shared service: {video_path} ({size_mb:.2f} MB)" ) VIDEO_HOST = "https://video.bsky.app" VIDEO_DID = "did:web:video.bsky.app" upload_auth = client.com.atproto.server.get_service_auth({ "aud": VIDEO_DID, "lxm": "app.bsky.video.uploadVideo", "exp": int(time.time()) + 60 * 30, }) user_did = client.me.did upload_url = ( f"{VIDEO_HOST}/xrpc/app.bsky.video.uploadVideo" f"?did={user_did}&name={int(time.time())}.mp4" ) headers = { "Authorization": f"Bearer {upload_auth.token}", "Content-Type": "video/mp4", } upload_resp = requests.post(upload_url, headers=headers, data=video_bytes, timeout=120) if upload_resp.status_code != 200: logging.error( f"❌ video.bsky.app upload failed: " f"{upload_resp.status_code} - {upload_resp.text}" ) return None job_id = upload_resp.json().get("jobId") if not job_id: logging.error("❌ No jobId returned from video service.") return None logging.info(f"⏳ Job {job_id} accepted — polling status...") status_url = f"{VIDEO_HOST}/xrpc/app.bsky.video.getJobStatus" deadline = time.time() + 300 while time.time() < deadline: status_resp = requests.get(status_url, params={"jobId": job_id}, timeout=30) if status_resp.status_code != 200: logging.error( f"❌ Job status check failed: " f"{status_resp.status_code} - {status_resp.text}" ) return None job_status = status_resp.json().get("jobStatus", {}) state = job_status.get("state") if state == "JOB_STATE_COMPLETED": logging.info("✅ Processing complete! Waiting 10s for CDN propagation...") time.sleep(10) blob_dict = job_status.get("blob") if not blob_dict: logging.error("❌ No blob in completed job status.") return None blob_ref = models.BlobRef.from_dict(blob_dict) return models.AppBskyEmbedVideo.Main( video=blob_ref, alt=alt_text, ) if state == "JOB_STATE_FAILED": logging.error(f"❌ Video processing failed on Bluesky's servers: {job_status}") return None logging.info(" ...still processing...") time.sleep(3) logging.error("❌ Video processing timed out after 5 minutes.") return None except Exception as e: logging.error(f"❌ video.bsky.app upload failed: {repr(e)}") return None # ============================================================ # Media upload — Video (smart dispatcher) # ============================================================ def upload_video_smart( client: Client, video_path: str, service_url: str, alt_text: str = "", settle_delay_seconds: float = 15.0, ) -> models.AppBskyEmbedVideo.Main | None: """ Smart dispatcher: * Self-hosted / federated PDSes → direct PDS upload (with settle delay) * Official Bluesky network → video.bsky.app, fallback to PDS-direct """ if is_official_bsky_pds(service_url): logging.info(f"🌐 Detected official Bluesky PDS ({service_url}) — using video.bsky.app") embed = upload_video_via_bsky_service(client, video_path, alt_text=alt_text) if embed: return embed logging.warning( "⚠️ video.bsky.app upload failed; falling back to direct PDS upload." ) return upload_video_via_pds( client, video_path, alt_text=alt_text, settle_delay_seconds=settle_delay_seconds, ) logging.info(f"🌍 Detected self-hosted/federated PDS ({service_url}) — using direct upload") return upload_video_via_pds( client, video_path, alt_text=alt_text, settle_delay_seconds=settle_delay_seconds, ) # ============================================================ # Post # ============================================================ def post_to_bsky( client: Client, text: str, langs: list[str], image_path: str | None = None, video_path: str | None = None, alt_text: str = "", service_url: str = "https://bsky.social", video_settle_delay: float = 15.0, ) -> bool: rich_text = make_rich(text) try: if video_path: logging.info(f"🎬 Preparing video upload: {video_path}") video_embed = upload_video_smart( client, video_path, service_url=service_url, alt_text=alt_text, settle_delay_seconds=video_settle_delay, ) if not video_embed: logging.error("❌ Aborting post: video upload/processing failed.") return False logging.info("🚀 Sending video post...") result = client.send_post( text=rich_text, embed=video_embed, langs=langs, ) elif image_path: image = upload_image(client, image_path, alt_text=alt_text) if not image: logging.error("❌ Aborting post: image upload failed.") return False embed = models.AppBskyEmbedImages.Main(images=[image]) logging.info("🚀 Sending image post...") result = client.send_post(text=rich_text, embed=embed, langs=langs) else: logging.info("🚀 Sending text post...") result = client.send_post(text=rich_text, langs=langs) uri = getattr(result, "uri", None) logging.info(f"✅ Post published! URI: {uri}") return True except Exception as e: logging.error(f"❌ Failed to send post: {repr(e)}") return False # ============================================================ # CLI # ============================================================ def main(): setup_logging() parser = argparse.ArgumentParser( description="Post text + optional image or video to a Bluesky instance." ) parser.add_argument("text", help="Post text content") parser.add_argument("--username", required=True, help="Bluesky handle or email") parser.add_argument("--password", required=True, help="Bluesky app password") parser.add_argument("--service", default="https://bsky.social", help="Bluesky PDS URL") parser.add_argument("--lang", default="ca", help="Comma-separated language codes (e.g. ca,es)") parser.add_argument("--image", default=None, help="Path to image file to attach") parser.add_argument("--video", default=None, help="Path to video file to attach") parser.add_argument("--alt", default="", help="Alt text for media") parser.add_argument( "--video-settle-delay", type=float, default=15.0, help="Seconds to wait after PDS-direct video upload before posting (default: 15)", ) args = parser.parse_args() client = Client(base_url=args.service) success = login_with_backoff( client, args.username, args.password, args.service, max_attempts=RetryConfig.login_max_attempts, base_delay=RetryConfig.login_base_delay_seconds, max_delay=RetryConfig.login_max_delay_seconds, jitter=RetryConfig.login_jitter_seconds, ) if not success: sys.exit(1) langs = [l.strip() for l in args.lang.split(",") if l.strip()] post_success = post_to_bsky( client, text=args.text, langs=langs, image_path=args.image, video_path=args.video, alt_text=args.alt, service_url=args.service, video_settle_delay=args.video_settle_delay, ) if not post_success: sys.exit(1) if __name__ == "__main__": main()