#!/usr/bin/env python3
"""
bsky_post.py — Post text + optional image or video to a Bluesky instance.

Usage examples:
    python3 bsky_post.py "DIVENDRES!!!!" --username HANDLE --password APP_PASSWORD --video friday.mp4
    python3 bsky_post.py "Dijous!!!!" --username HANDLE --password APP_PASSWORD --image thursday.jpg
    python3 bsky_post.py "Bon dia!" --username HANDLE --password APP_PASSWORD
    python3 bsky_post.py "Bon dia!" --username HANDLE --password APP_PASSWORD --image photo.jpg --lang ca

``--username`` and ``--password`` are required on every invocation.
When both ``--video`` and ``--image`` are given, the video is posted and the
image is ignored.
"""

import argparse
import logging
import math
import mimetypes
import os
import random
import re
import sys
import time
from dataclasses import dataclass

from atproto import Client, client_utils, models


# ============================================================
# Config
# ============================================================
@dataclass(frozen=True)
class RetryConfig:
    """Login retry settings that main() forwards to login_with_backoff()."""

    login_max_attempts: int = 5
    login_base_delay_seconds: float = 10.0
    login_max_delay_seconds: float = 600.0
    login_jitter_seconds: float = 3.0


# ============================================================
# Logging
# ============================================================
def setup_logging() -> None:
    """Configure the root logger to write timestamped INFO lines to stdout."""
    logging.basicConfig(
        format="%(asctime)s %(message)s",
        level=logging.INFO,
        stream=sys.stdout,
    )


# ============================================================
# Text builder
# ============================================================
_URL_PREFIXES = ("http://", "https://")
# Punctuation that commonly follows a URL in prose and is not part of it.
_TRAILING_URL_PUNCTUATION = ".,;:!?"
# Punctuation stripped from the end of a hashtag.
_TRAILING_TAG_PUNCTUATION = ".,;:!?)'\""


def _add_word(text_builder, word: str) -> None:
    """Append one non-empty, space-free word as a link, a tag or plain text."""
    if word.startswith(_URL_PREFIXES):
        # Keep sentence punctuation out of the link target ("https://x.y." -> "https://x.y").
        url = word.rstrip(_TRAILING_URL_PUNCTUATION)
        text_builder.link(url, url)
        trailing = word[len(url):]
        if trailing:
            text_builder.text(trailing)
        return

    if word.startswith("#") and len(word) > 1:
        tag_name = word[1:].rstrip(_TRAILING_TAG_PUNCTUATION)
        if tag_name:
            # The tag facet covers "#tag" only; trailing punctuation stays plain text.
            text_builder.tag("#" + tag_name, tag_name)
            trailing = word[1 + len(tag_name):]
            if trailing:
                text_builder.text(trailing)
            return

    text_builder.text(word)


def make_rich(content: str):
    """
    Build a TextBuilder from plain text, marking URLs and hashtags.

    Leading/trailing whitespace of *content* is stripped. Words are split on
    single spaces, so runs of spaces and line breaks inside the text are
    preserved. Words starting with ``http://`` / ``https://`` become links and
    words starting with ``#`` become tags.

    Returns:
        The populated ``client_utils.TextBuilder``.
    """
    text_builder = client_utils.TextBuilder()
    lines = content.strip().splitlines()
    last_line_idx = len(lines) - 1

    for line_idx, line in enumerate(lines):
        # Whitespace-only lines contribute nothing but their line break.
        if line.strip():
            words = line.split(" ")
            last_word_idx = len(words) - 1
            for word_idx, word in enumerate(words):
                if word:
                    _add_word(text_builder, word)
                # Re-insert the separator that split(" ") consumed; empty
                # "words" come from consecutive spaces and still need it.
                if word_idx < last_word_idx:
                    text_builder.text(" ")

        if line_idx < last_line_idx:
            text_builder.text("\n")

    return text_builder


# ============================================================
# Error helpers
# ============================================================
_GATEWAY_STATUS_CODES = (502, 503, 504)
_COMMON_TRANSIENT_SIGNALS = (
    "ConnectError",
    "RemoteProtocolError",
    "ReadTimeout",
    "WriteTimeout",
    "TimeoutException",
)


def _matches_status(error_obj, text: str, codes: tuple[int, ...]) -> bool:
    """
    Report whether *error_obj* carries one of the HTTP status *codes*.

    Prefers ``error_obj.response.status_code`` when it exists and is an int
    (NOTE(review): presumably set on atproto request errors — confirm).
    Otherwise falls back to searching *text* (the error's repr) for the code
    as a standalone number, so digits inside timestamps or lengths such as
    ``'ratelimit-reset': '1712340312'`` are not mistaken for a status code.
    """
    response = getattr(error_obj, "response", None)
    status = getattr(response, "status_code", None)
    if isinstance(status, int):
        return status in codes
    return any(re.search(rf"(?<!\d){code}(?!\d)", text) for code in codes)


def is_rate_limited_error(error_obj) -> bool:
    """Return True if the error looks like an HTTP 429 / rate-limit response."""
    text = repr(error_obj)
    lowered = text.lower()
    return _matches_status(error_obj, text, (429,)) or any(
        signal in lowered
        for signal in ("ratelimitexceeded", "too many requests", "rate limit")
    )


def is_auth_error(error_obj) -> bool:
    """Return True if the error looks like an authentication/authorization failure."""
    text = repr(error_obj)
    lowered = text.lower()
    return _matches_status(error_obj, text, (401, 403)) or any(
        signal in lowered
        for signal in (
            "invalid identifier or password",
            "authenticationrequired",
            "invalidtoken",
        )
    )


def is_network_error(error_obj) -> bool:
    """Return True if the error looks like a connection problem or a 502/503/504."""
    text = repr(error_obj)
    signals = _COMMON_TRANSIENT_SIGNALS + ("ConnectionResetError",)
    return any(signal in text for signal in signals) or _matches_status(
        error_obj, text, _GATEWAY_STATUS_CODES
    )


def is_transient_error(error_obj) -> bool:
    """Return True if the error looks like a timeout or a 502/503/504."""
    text = repr(error_obj)
    signals = _COMMON_TRANSIENT_SIGNALS + ("InvokeTimeoutError",)
    return any(signal in text for signal in signals) or _matches_status(
        error_obj, text, _GATEWAY_STATUS_CODES
    )


def _to_finite_float(value) -> float | None:
    """Convert a header value to a finite float, or None if it is not one."""
    if value is None:
        return None
    try:
        number = float(value)
    except (TypeError, ValueError):
        # e.g. an HTTP-date Retry-After value; the caller tries the next header.
        return None
    return number if math.isfinite(number) else None


def _wait_from_headers(headers, default_delay: float, max_delay: float) -> float | None:
    """Derive a bounded wait time from a header mapping, or None if it has none."""
    try:
        lowered = {str(name).lower(): value for name, value in headers.items()}
    except (AttributeError, TypeError, ValueError):
        # Not a mapping (or missing entirely): nothing to read.
        return None

    # Relative delays, in seconds.
    for name in ("retry-after", "x-ratelimit-after"):
        seconds = _to_finite_float(lowered.get(name))
        if seconds is not None:
            return min(max(seconds, 1.0), max_delay)

    # Absolute reset time, as a Unix timestamp.
    reset_at = _to_finite_float(lowered.get("ratelimit-reset"))
    if reset_at is not None:
        wait_seconds = max(reset_at - int(time.time()) + 1.0, default_delay)
        return min(wait_seconds, max_delay)

    return None


def get_rate_limit_wait_seconds(error_obj, default_delay: float, max_delay: float) -> float:
    """
    Parse common rate-limit headers and return a bounded wait time in seconds.

    Sources are tried in order: ``error_obj.headers``,
    ``error_obj.response.headers``, then ``repr(error_obj)``. In each, the
    headers ``retry-after`` and ``x-ratelimit-after`` (seconds, floored at 1)
    and ``ratelimit-reset`` (Unix timestamp, floored at *default_delay*) are
    considered. Every result is capped at *max_delay*.

    Returns:
        The wait time, or *default_delay* when no usable header is found.
    """
    response = getattr(error_obj, "response", None)
    for headers in (
        getattr(error_obj, "headers", None),
        getattr(response, "headers", None),
    ):
        wait_seconds = _wait_from_headers(headers, default_delay, max_delay)
        if wait_seconds is not None:
            return wait_seconds

    # repr fallback parsing
    text = repr(error_obj)
    for pattern in (r"'retry-after': '(\d+)'", r"'x-ratelimit-after': '(\d+)'"):
        match = re.search(pattern, text, re.IGNORECASE)
        if match:
            return min(max(float(match.group(1)), 1.0), max_delay)

    match = re.search(r"'ratelimit-reset': '(\d+)'", text, re.IGNORECASE)
    if match:
        wait_seconds = max(float(match.group(1)) - int(time.time()) + 1.0, default_delay)
        return min(wait_seconds, max_delay)

    return default_delay


# ============================================================
# Login with backoff
# ============================================================
def login_with_backoff(
    client: Client,
    username: str,
    password: str,
    service_url: str,
    max_attempts: int = 5,
    base_delay: float = 10.0,
    max_delay: float = 600.0,
    jitter: float = 1.5,
) -> bool:
    """
    Log *client* in, retrying on rate limits and transient failures.

    Args:
        client: Client to log in; it is expected to already target the service.
        username: Handle or email passed to ``client.login``.
        password: Password passed to ``client.login``.
        service_url: Used for log output only.
        max_attempts: Total number of login attempts.
        base_delay: Base wait in seconds; non-rate-limit retries wait
            ``base_delay * attempt``.
        max_delay: Upper bound for the wait before jitter is added.
        jitter: Maximum random extra wait in seconds added to every retry.

    Returns:
        True on success. False on an authentication error (no retry) or when
        all attempts are used up.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            logging.info(
                f"🔑 Login attempt {attempt}/{max_attempts} → {service_url} as {username}"
            )
            client.login(username, password)
        except Exception as e:
            logging.exception("❌ Login exception")

            # Fail fast on invalid credentials
            if is_auth_error(e):
                logging.error("❌ Bad credentials. Check handle/password.")
                return False

            if is_rate_limited_error(e):
                # Respect explicit rate-limit timing
                label = "Rate-limited on login"
                exhausted = "due to rate limiting"
                wait = get_rate_limit_wait_seconds(
                    e, default_delay=base_delay, max_delay=max_delay
                )
            else:
                if is_network_error(e) or is_transient_error(e):
                    label = "Transient login error"
                    exhausted = "after transient/network errors"
                else:
                    # Unknown errors: bounded retry anyway
                    label = "Unknown login error"
                    exhausted = "after unknown errors"
                wait = min(base_delay * attempt, max_delay)

            if attempt >= max_attempts:
                logging.error(f"❌ Exhausted login retries {exhausted}.")
                return False

            wait += random.uniform(0, jitter)
            logging.warning(
                f"⏳ {label} (attempt {attempt}/{max_attempts}). "
                f"Retrying in {wait:.1f}s..."
            )
            time.sleep(wait)
        else:
            logging.info("✅ Login successful.")
            return True

    # Only reached when max_attempts < 1.
    return False


# ============================================================
# Media upload
# ============================================================
_MIME_FALLBACKS = {
    ".jpg": "image/jpeg",
    ".jpeg": "image/jpeg",
    ".png": "image/png",
    ".gif": "image/gif",
    ".webp": "image/webp",
    ".mp4": "video/mp4",
    ".mov": "video/quicktime",
    ".webm": "video/webm",
}


def detect_mime_type(path: str) -> str:
    """
    Guess the MIME type of *path* from its file name.

    Uses ``mimetypes.guess_type`` first, then a small built-in extension table,
    and finally ``application/octet-stream``.
    """
    mime, _ = mimetypes.guess_type(path)
    if mime:
        return mime
    ext = os.path.splitext(path)[1].lower()
    return _MIME_FALLBACKS.get(ext, "application/octet-stream")


def upload_image(
    client: Client,
    image_path: str,
    alt_text: str = "",
) -> models.AppBskyEmbedImages.Image | None:
    """
    Read *image_path* and upload it as a blob.

    Returns:
        An image embed entry carrying the blob and *alt_text*, or None if
        reading or uploading fails (the error is logged).
    """
    try:
        # The MIME type is only reported in the log line below.
        mime = detect_mime_type(image_path)
        with open(image_path, "rb") as f:
            data = f.read()

        logging.info(f"🖼️ Uploading image: {image_path} ({len(data)/1024:.1f} KB, {mime})")
        response = client.upload_blob(data)
        logging.info("✅ Image uploaded successfully.")

        return models.AppBskyEmbedImages.Image(
            image=response.blob,
            alt=alt_text,
        )
    except Exception as e:
        logging.error(f"❌ Failed to upload image: {repr(e)}")
        return None


def upload_video_and_wait(
    client: Client,
    video_data: bytes,
    alt_text: str = "",
    *,
    poll_interval_seconds: float = 3.0,
    max_wait_seconds: float = 600.0,
) -> models.AppBskyEmbedVideo.Main | None:
    """
    Upload a video to the Bluesky video service and wait for processing.

    Args:
        client: Logged-in client; used to obtain a service-auth token and the
            account's handle/DID.
        video_data: Raw bytes of the video file.
        alt_text: Alt text stored on the resulting embed.
        poll_interval_seconds: Pause between job-status polls.
        max_wait_seconds: Give up when the job has not reached a final state
            after this long.

    Returns:
        A video embed on success; None if any step fails, the job reports
        ``JOB_STATE_FAILED``, or the wait times out (each case is logged).
    """
    try:
        logging.info("🎬 Requesting Service Auth for Video Upload...")

        # 1. Get a Service Auth token from the account's home PDS.
        #    The audience (aud) MUST be the DID of the Bluesky Video Service.
        VIDEO_SERVICE_DID = "did:web:video.bsky.app"
        auth_response = client.com.atproto.server.get_service_auth({
            'aud': VIDEO_SERVICE_DID
        })
        service_auth_token = auth_response.token

        logging.info("🎬 Uploading video to Bluesky Video Service...")

        # 2. Create a temporary client pointing to the dedicated video service
        video_client = Client(base_url="https://video.bsky.app")

        # 3. Inject the Service Auth token instead of calling .login().
        #    NOTE(review): `_session` is a private attribute of the SDK client
        #    and may change between atproto releases.
        video_client._session = models.ComAtprotoServerCreateSession.Response(
            access_jwt=service_auth_token,
            refresh_jwt="",
            handle=client.me.handle,
            did=client.me.did,
            active=True
        )

        # 4. Upload the video to the official service
        response = video_client.app.bsky.video.upload_video(video_data)
        job_id = response.job_id
        logging.info(f"⏳ Video uploaded! Job ID: {job_id}. Waiting for processing...")

        # 5. Poll the job status using the video client, bounded by a deadline
        #    so a job that never reaches a final state cannot hang the script.
        deadline = time.monotonic() + max_wait_seconds
        while True:
            status_resp = video_client.app.bsky.video.get_job_status({'job_id': job_id})
            state = status_resp.job_status.state

            if state == 'JOB_STATE_COMPLETED':
                logging.info("✅ Processing complete! Waiting 10s for CDN propagation...")
                time.sleep(10)
                return models.AppBskyEmbedVideo.Main(
                    video=status_resp.job_status.blob,
                    alt=alt_text
                )
            if state == 'JOB_STATE_FAILED':
                logging.error("❌ Video processing failed on Bluesky's servers.")
                return None
            if time.monotonic() >= deadline:
                logging.error(
                    f"❌ Video processing did not finish within {max_wait_seconds:.0f}s."
                )
                return None

            logging.info("   ...still processing...")
            time.sleep(poll_interval_seconds)

    except Exception as e:
        logging.error(f"❌ Failed to upload/process video: {repr(e)}")
        return None


# ============================================================
# Post
# ============================================================
def post_to_bsky(
    client: Client,
    text: str,
    langs: list[str],
    image_path: str | None = None,
    video_path: str | None = None,
    alt_text: str = "",
    password: str = "",
) -> bool:
    """
    Publish a post with optional video or image.

    Args:
        client: Logged-in client.
        text: Post text; URLs and hashtags are marked up via make_rich().
        langs: Language codes attached to the post.
        image_path: Image to attach; ignored when *video_path* is set.
        video_path: Video to attach; takes precedence over *image_path*.
        alt_text: Alt text for the attached media.
        password: Unused; kept so existing callers keep working.

    Returns:
        True if the post was sent, False on any failure (logged).
    """
    rich_text = make_rich(text)

    try:
        post_kwargs = {"text": rich_text, "langs": langs}

        if video_path:
            # --- VIDEO POSTING ---
            if image_path:
                logging.warning("⚠️ Both video and image given; ignoring the image.")
            logging.info(f"🎬 Preparing video upload: {video_path}")
            with open(video_path, "rb") as f:
                video_data = f.read()

            video_embed = upload_video_and_wait(client, video_data, alt_text)
            if not video_embed:
                logging.error("❌ Aborting post: video upload/processing failed.")
                return False
            post_kwargs["embed"] = video_embed
            logging.info("🚀 Sending video post...")

        elif image_path:
            # --- IMAGE POSTING ---
            image = upload_image(client, image_path, alt_text=alt_text)
            if not image:
                logging.error("❌ Aborting post: image upload failed.")
                return False
            post_kwargs["embed"] = models.AppBskyEmbedImages.Main(images=[image])
            logging.info("🚀 Sending image post...")

        else:
            # --- TEXT ONLY POSTING ---
            logging.info("🚀 Sending text post...")

        result = client.send_post(**post_kwargs)
        uri = getattr(result, "uri", None)
        logging.info(f"✅ Post published! URI: {uri}")
        return True

    except Exception as e:
        logging.error(f"❌ Failed to send post: {repr(e)}")
        return False


# ============================================================
# CLI
# ============================================================
def main():
    """Parse the command line, log in, publish the post; exit 1 on failure."""
    setup_logging()

    parser = argparse.ArgumentParser(
        description="Post text + optional image or video to a Bluesky instance."
    )
    parser.add_argument("text", help="Post text content")
    parser.add_argument("--username", required=True, help="Bluesky handle or email")
    parser.add_argument("--password", required=True, help="Bluesky app password")
    parser.add_argument("--service", default="https://bsky.social", help="Bluesky PDS URL")
    parser.add_argument("--lang", default="ca", help="Comma-separated language codes (e.g. ca,es)")
    parser.add_argument("--image", default=None, help="Path to image file to attach")
    parser.add_argument("--video", default=None, help="Path to video file to attach")
    parser.add_argument("--alt", default="", help="Alt text for media")
    args = parser.parse_args()

    retry = RetryConfig()
    client = Client(base_url=args.service)

    success = login_with_backoff(
        client,
        args.username,
        args.password,
        args.service,
        max_attempts=retry.login_max_attempts,
        base_delay=retry.login_base_delay_seconds,
        max_delay=retry.login_max_delay_seconds,
        jitter=retry.login_jitter_seconds,
    )
    if not success:
        sys.exit(1)

    langs = [code.strip() for code in args.lang.split(",") if code.strip()]

    post_success = post_to_bsky(
        client,
        text=args.text,
        langs=langs,
        image_path=args.image,
        video_path=args.video,
        alt_text=args.alt,
        password=args.password,
    )
    if not post_success:
        sys.exit(1)


if __name__ == "__main__":
    main()