#!/usr/bin/env python3
"""
bsky_post.py — Post text + optional image or video to a Bluesky instance.

``--username`` and ``--password`` are required on every invocation.

Usage examples:
    python3 bsky_post.py "DIVENDRES!!!!" --username HANDLE --password APP_PASSWORD --video friday.mp4
    python3 bsky_post.py "Dijous!!!!" --username HANDLE --password APP_PASSWORD --image thursday.jpg
    python3 bsky_post.py "Bon dia!" --username HANDLE --password APP_PASSWORD
    python3 bsky_post.py "Bon dia!" --username HANDLE --password APP_PASSWORD --image photo.jpg --lang ca
"""

import argparse
import base64
import json
import logging
import math
import mimetypes
import os
import random
import re
import sys
import time
from dataclasses import dataclass

from atproto import Client, client_utils, models


# ============================================================
# Config
# ============================================================
@dataclass(frozen=True)
class RetryConfig:
    """Default retry settings that ``main`` passes to ``login_with_backoff``."""

    # Maximum number of login attempts before giving up.
    login_max_attempts: int = 5
    # Base delay; transient/unknown errors wait ``base * attempt`` seconds.
    login_base_delay_seconds: float = 10.0
    # Upper bound for any computed wait (before jitter is added).
    login_max_delay_seconds: float = 600.0
    # Upper bound of the uniform random jitter added to every wait.
    login_jitter_seconds: float = 3.0


# ============================================================
# Logging
# ============================================================
def setup_logging() -> None:
    """Configure root logging: INFO level, timestamped lines, written to stdout."""
    logging.basicConfig(
        format="%(asctime)s %(message)s",
        level=logging.INFO,
        stream=sys.stdout,
    )


# ============================================================
# Text builder
# ============================================================
def make_rich(content: str):
    """Build a ``client_utils.TextBuilder`` from plain text.

    The text is stripped, then split into lines and, within each line, on
    single spaces. Words starting with ``http://`` or ``https://`` become
    links, words starting with ``#`` become tags (the tag name is the word
    minus the ``#`` and any trailing punctuation), and everything else is
    added as plain text. Separating spaces and newlines are re-emitted so
    the original layout is kept.

    Args:
        content: Raw post text.

    Returns:
        The populated ``TextBuilder``.
    """
    text_builder = client_utils.TextBuilder()
    lines = content.strip().splitlines()
    last_line_idx = len(lines) - 1

    for line_idx, line in enumerate(lines):
        if line.strip():
            words = line.split(" ")
            last_word_idx = len(words) - 1
            for word_idx, word in enumerate(words):
                if word:
                    if word.startswith(("http://", "https://")):
                        text_builder.link(word, word)
                    elif word.startswith("#") and len(word) > 1:
                        tag_name = word[1:].rstrip(".,;:!?)'\"")
                        if tag_name:
                            text_builder.tag(word, tag_name)
                        else:
                            # Only punctuation followed the '#': not a tag.
                            text_builder.text(word)
                    else:
                        text_builder.text(word)
                # An empty "word" comes from consecutive spaces; the separator
                # below reproduces them.
                if word_idx < last_word_idx:
                    text_builder.text(" ")

        if line_idx < last_line_idx:
            text_builder.text("\n")

    return text_builder


# ============================================================
# Error helpers
# ============================================================
_RATE_LIMIT_PHRASES = ("ratelimitexceeded", "too many requests", "rate limit")
_AUTH_PHRASES = (
    "invalid identifier or password",
    "authenticationrequired",
    "invalidtoken",
)
_AUTH_STATUSES = (401, 403)
_GATEWAY_STATUSES = (502, 503, 504)
_NETWORK_SIGNALS = (
    "ConnectError",
    "RemoteProtocolError",
    "ReadTimeout",
    "WriteTimeout",
    "TimeoutException",
    "ConnectionResetError",
)
_TRANSIENT_SIGNALS = (
    "InvokeTimeoutError",
    "ReadTimeout",
    "WriteTimeout",
    "TimeoutException",
    "RemoteProtocolError",
    "ConnectError",
)


def _http_status(error_obj) -> int | None:
    """Return ``error_obj.response.status_code`` when it is an int, else None.

    NOTE(review): assumes the SDK's error response exposes ``status_code``;
    when it does not, callers fall back to matching on ``repr(error_obj)``.
    """
    response = getattr(error_obj, "response", None)
    status = getattr(response, "status_code", None)
    if isinstance(status, int) and not isinstance(status, bool):
        return status
    return None


def _has_status(error_obj, text: str, statuses) -> bool:
    """Check whether the error carries one of *statuses*.

    A structured status code is trusted when available. Otherwise the code
    is searched for as a substring of *text*, which can misfire on digits
    inside unrelated values (timestamps, lengths) and is only a fallback.
    """
    status = _http_status(error_obj)
    if status is not None:
        return status in statuses
    return any(str(code) in text for code in statuses)


def is_rate_limited_error(error_obj) -> bool:
    """Return True if the error looks like an HTTP 429 / rate-limit response."""
    text = repr(error_obj).lower()
    if any(phrase in text for phrase in _RATE_LIMIT_PHRASES):
        return True
    return _has_status(error_obj, text, (429,))


def is_auth_error(error_obj) -> bool:
    """Return True if the error looks like an authentication failure (401/403)."""
    text = repr(error_obj).lower()
    if any(phrase in text for phrase in _AUTH_PHRASES):
        return True
    return _has_status(error_obj, text, _AUTH_STATUSES)


def is_network_error(error_obj) -> bool:
    """Return True if the error looks like a connection problem or a 502/503/504."""
    text = repr(error_obj)
    if any(signal in text for signal in _NETWORK_SIGNALS):
        return True
    return _has_status(error_obj, text, _GATEWAY_STATUSES)


def is_transient_error(error_obj) -> bool:
    """Return True if the error looks like a timeout or a 502/503/504."""
    text = repr(error_obj)
    if any(signal in text for signal in _TRANSIENT_SIGNALS):
        return True
    return _has_status(error_obj, text, _GATEWAY_STATUSES)


_RELATIVE_WAIT_HEADERS = ("retry-after", "x-ratelimit-after")
_RESET_HEADER = "ratelimit-reset"
_REPR_HEADER_PATTERNS = {
    name: re.compile(rf"'{name}': '(\d+)'", re.IGNORECASE)
    for name in (*_RELATIVE_WAIT_HEADERS, _RESET_HEADER)
}


def _parse_seconds(value) -> float | None:
    """Convert a header value to float; None when missing, empty or not numeric."""
    if not value:
        return None
    try:
        seconds = float(value)
    except (TypeError, ValueError):
        # e.g. an HTTP-date Retry-After; let the caller try the next header.
        return None
    return None if math.isnan(seconds) else seconds


def _clamp_relative(seconds: float, max_delay: float) -> float:
    """Bound a relative wait to at least 1 second and at most *max_delay*."""
    return min(max(seconds, 1.0), max_delay)


def _clamp_reset(reset_epoch: float, default_delay: float, max_delay: float) -> float:
    """Turn an epoch reset time into a wait of at least *default_delay*, capped at *max_delay*."""
    wait_seconds = max(reset_epoch - int(time.time()) + 1.0, default_delay)
    return min(wait_seconds, max_delay)


def _wait_from_headers(headers, default_delay: float, max_delay: float) -> float | None:
    """Extract a bounded wait from a header mapping, or None if it has none.

    Header names are compared case-insensitively and each header is parsed
    independently, so one malformed value does not hide the others.
    """
    if not headers:
        return None
    try:
        lowered = {str(key).lower(): value for key, value in headers.items()}
    except Exception:
        # Not a mapping we understand; best effort, fall through to other sources.
        return None

    for name in _RELATIVE_WAIT_HEADERS:
        seconds = _parse_seconds(lowered.get(name))
        if seconds is not None:
            return _clamp_relative(seconds, max_delay)

    reset_epoch = _parse_seconds(lowered.get(_RESET_HEADER))
    if reset_epoch is not None:
        return _clamp_reset(reset_epoch, default_delay, max_delay)
    return None


def get_rate_limit_wait_seconds(error_obj, default_delay: float, max_delay: float) -> float:
    """Parse common rate-limit headers and return a bounded wait time in seconds.

    Sources are consulted in order: ``error_obj.headers``, then
    ``error_obj.response.headers``, then quoted header pairs found in
    ``repr(error_obj)``. Within each source the precedence is
    ``retry-after``, ``x-ratelimit-after`` (both relative seconds), then
    ``ratelimit-reset`` (epoch seconds).

    Args:
        error_obj: The exception raised by the client.
        default_delay: Returned when no header is found; also the minimum
            wait derived from ``ratelimit-reset``.
        max_delay: Upper bound for the returned value.

    Returns:
        Seconds to wait before retrying.
    """
    response = getattr(error_obj, "response", None)
    for headers in (
        getattr(error_obj, "headers", None),
        getattr(response, "headers", None),
    ):
        wait = _wait_from_headers(headers, default_delay, max_delay)
        if wait is not None:
            return wait

    # repr fallback parsing
    text = repr(error_obj)
    for name in _RELATIVE_WAIT_HEADERS:
        match = _REPR_HEADER_PATTERNS[name].search(text)
        if match:
            return _clamp_relative(float(match.group(1)), max_delay)

    match = _REPR_HEADER_PATTERNS[_RESET_HEADER].search(text)
    if match:
        return _clamp_reset(float(match.group(1)), default_delay, max_delay)

    return default_delay


# ============================================================
# Login with backoff
# ============================================================
def login_with_backoff(
    client: Client,
    username: str,
    password: str,
    service_url: str,
    max_attempts: int = 5,
    base_delay: float = 10.0,
    max_delay: float = 600.0,
    jitter: float = 1.5,
) -> bool:
    """Log *client* in, retrying with backoff on retryable failures.

    Authentication errors fail immediately. Rate-limit errors wait for the
    time advertised by the server (see ``get_rate_limit_wait_seconds``).
    Transient, network and unknown errors wait ``base_delay * attempt``
    seconds, capped at *max_delay*. A uniform random jitter in
    ``[0, jitter]`` is added to every wait.

    Args:
        client: Client to log in.
        username: Handle or email passed to ``client.login``.
        password: Password passed to ``client.login``.
        service_url: Service URL; used only in log messages here.
        max_attempts: Maximum number of login attempts.
        base_delay: Base delay in seconds.
        max_delay: Cap for the computed delay (before jitter).
        jitter: Upper bound of the random jitter in seconds.

    Returns:
        True on successful login, False otherwise.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            logging.info(
                f"🔑 Login attempt {attempt}/{max_attempts} → {service_url} as {username}"
            )
            client.login(username, password)
        except Exception as e:
            logging.exception("❌ Login exception")

            # Fail fast on invalid credentials
            if is_auth_error(e):
                logging.error("❌ Bad credentials. Check handle/password.")
                return False

            if is_rate_limited_error(e):
                # Respect explicit rate-limit timing
                label = "Rate-limited on login"
                exhausted_msg = "❌ Exhausted login retries due to rate limiting."
                base_wait = get_rate_limit_wait_seconds(
                    e, default_delay=base_delay, max_delay=max_delay
                )
            else:
                base_wait = min(base_delay * attempt, max_delay)
                if is_network_error(e) or is_transient_error(e):
                    label = "Transient login error"
                    exhausted_msg = "❌ Exhausted login retries after transient/network errors."
                else:
                    # Unknown errors: bounded retry anyway
                    label = "Unknown login error"
                    exhausted_msg = "❌ Exhausted login retries after unknown errors."

            if attempt >= max_attempts:
                logging.error(exhausted_msg)
                return False

            wait = base_wait + random.uniform(0, jitter)
            logging.warning(
                f"⏳ {label} (attempt {attempt}/{max_attempts}). "
                f"Retrying in {wait:.1f}s..."
            )
            time.sleep(wait)
        else:
            logging.info("✅ Login successful.")
            return True

    return False


# ============================================================
# Media upload
# ============================================================
_MIME_FALLBACKS = {
    ".jpg": "image/jpeg",
    ".jpeg": "image/jpeg",
    ".png": "image/png",
    ".gif": "image/gif",
    ".webp": "image/webp",
    ".mp4": "video/mp4",
    ".mov": "video/quicktime",
    ".webm": "video/webm",
}


def detect_mime_type(path: str) -> str:
    """Guess the MIME type of *path* from its name.

    Uses ``mimetypes.guess_type`` first, then a small extension table, and
    finally ``application/octet-stream``.
    """
    mime, _ = mimetypes.guess_type(path)
    if mime:
        return mime
    ext = os.path.splitext(path)[1].lower()
    return _MIME_FALLBACKS.get(ext, "application/octet-stream")


def upload_image(
    client: Client,
    image_path: str,
    alt_text: str = "",
) -> models.AppBskyEmbedImages.Image | None:
    """Upload an image file as a blob and wrap it for an image embed.

    Args:
        client: Logged-in client used for ``upload_blob``.
        image_path: Path of the image to read.
        alt_text: Alt text stored with the image.

    Returns:
        The embed image, or None if reading or uploading failed (the error
        is logged).
    """
    try:
        # The detected MIME type is only reported in the log line below.
        mime = detect_mime_type(image_path)
        with open(image_path, "rb") as f:
            data = f.read()

        logging.info(f"🖼️ Uploading image: {image_path} ({len(data)/1024:.1f} KB, {mime})")
        response = client.upload_blob(data)
        logging.info("✅ Image uploaded successfully.")

        return models.AppBskyEmbedImages.Image(
            image=response.blob,
            alt=alt_text,
        )
    except Exception as e:
        logging.error(f"❌ Failed to upload image: {repr(e)}")
        return None


class _JwtPayload:
    """Minimal stand-in for a decoded JWT payload; exposes only ``exp``."""

    def __init__(self, exp):
        self.exp = exp


class _VideoSession:
    """Stand-in for the SDK's internal session object, built from a service token.

    Carries the attributes assigned below so it can be injected into a
    ``Client`` as ``_session``.
    """

    def __init__(self, token, handle, did):
        self.access_jwt = token
        self.refresh_jwt = ""
        self.handle = handle
        self.did = did

        # Default to 1 hour from now if decoding fails
        exp_time = int(time.time()) + 3600
        try:
            parts = token.split(".")
            if len(parts) == 3:
                payload = parts[1]
                # Pad base64 string if necessary
                payload += "=" * (-len(payload) % 4)
                decoded = json.loads(base64.urlsafe_b64decode(payload).decode("utf-8"))
                if "exp" in decoded:
                    exp_time = decoded["exp"]
        except Exception:
            # Deliberately best-effort: keep the one-hour default.
            logging.debug("Could not decode token expiry; using default.", exc_info=True)

        # Assign as an object so the SDK can call .exp
        self.access_jwt_payload = _JwtPayload(exp_time)


def upload_video_and_wait(
    client: Client,
    video_data: bytes,
    alt_text: str = "",
    poll_interval_seconds: float = 3.0,
    max_wait_seconds: float = 600.0,
) -> models.AppBskyEmbedVideo.Main | None:
    """Upload a video to the video service and wait until it is processed.

    Requests a service-auth token through *client*, uploads *video_data*
    with a temporary client pointed at ``https://video.bsky.app``, then
    polls the job status until it completes, fails, or the deadline passes.

    Args:
        client: Logged-in client (provides the service token, handle and DID).
        video_data: Raw bytes of the video file.
        alt_text: Alt text stored with the video embed.
        poll_interval_seconds: Pause between job-status polls.
        max_wait_seconds: Give up when processing has not finished after
            this many seconds, so a stuck job cannot hang the script.

    Returns:
        The video embed, or None on failure or timeout (the error is logged).
    """
    try:
        logging.info("🎬 Requesting Service Auth for Video Upload...")

        # 1. Get a Service Auth token from your home PDS
        VIDEO_SERVICE_DID = "did:web:video.bsky.app"
        auth_response = client.com.atproto.server.get_service_auth({
            'aud': VIDEO_SERVICE_DID
        })
        service_auth_token = auth_response.token

        logging.info("🎬 Uploading video to Bluesky Video Service...")

        # 2. Create a temporary client pointing to the dedicated video service
        video_client = Client(base_url="https://video.bsky.app")

        # 3. Inject the Service Auth token using our stand-in session
        video_client._session = _VideoSession(
            token=service_auth_token,
            handle=client.me.handle,
            did=client.me.did,
        )

        # 4. Upload the video to the official service
        response = video_client.app.bsky.video.upload_video(video_data)
        job_id = response.job_id
        logging.info(f"⏳ Video uploaded! Job ID: {job_id}. Waiting for processing...")

        # 5. Poll the job status using the video client, with a deadline
        deadline = time.monotonic() + max_wait_seconds
        while True:
            status_resp = video_client.app.bsky.video.get_job_status({'job_id': job_id})
            state = status_resp.job_status.state

            if state == 'JOB_STATE_COMPLETED':
                logging.info("✅ Processing complete! Waiting 10s for CDN propagation...")
                time.sleep(10)
                return models.AppBskyEmbedVideo.Main(
                    video=status_resp.job_status.blob,
                    alt=alt_text,
                )
            if state == 'JOB_STATE_FAILED':
                logging.error("❌ Video processing failed on Bluesky's servers.")
                return None

            if time.monotonic() >= deadline:
                logging.error(
                    f"❌ Video processing did not finish within {max_wait_seconds:.0f}s; giving up."
                )
                return None

            logging.info(" ...still processing...")
            time.sleep(poll_interval_seconds)

    except Exception as e:
        logging.error(f"❌ Failed to upload/process video: {repr(e)}")
        return None


# ============================================================
# Post
# ============================================================
def post_to_bsky(
    client: Client,
    text: str,
    langs: list[str],
    image_path: str | None = None,
    video_path: str | None = None,
    alt_text: str = "",
    password: str = "",
) -> bool:
    """Publish a post with optional video or image attachment.

    When both *video_path* and *image_path* are given, the video is
    attached and the image is ignored (a warning is logged).

    Args:
        client: Logged-in client.
        text: Post text; converted with ``make_rich``.
        langs: Language codes passed to ``send_post``.
        image_path: Optional path of an image to attach.
        video_path: Optional path of a video to attach.
        alt_text: Alt text for the attached media.
        password: Unused; kept so existing callers keep working.

    Returns:
        True if the post was sent, False on any failure (the error is logged).
    """
    rich_text = make_rich(text)

    try:
        embed = None
        post_kind = "text"

        if video_path:
            if image_path:
                logging.warning("⚠️ Both video and image supplied; attaching the video only.")
            logging.info(f"🎬 Preparing video upload: {video_path}")
            with open(video_path, "rb") as f:
                video_data = f.read()

            embed = upload_video_and_wait(client, video_data, alt_text)
            if not embed:
                logging.error("❌ Aborting post: video upload/processing failed.")
                return False
            post_kind = "video"

        elif image_path:
            image = upload_image(client, image_path, alt_text=alt_text)
            if not image:
                logging.error("❌ Aborting post: image upload failed.")
                return False
            embed = models.AppBskyEmbedImages.Main(images=[image])
            post_kind = "image"

        logging.info(f"🚀 Sending {post_kind} post...")
        # Only pass ``embed`` when there is one, as the text-only path never did.
        post_kwargs = {"text": rich_text, "langs": langs}
        if embed is not None:
            post_kwargs["embed"] = embed
        result = client.send_post(**post_kwargs)

        uri = getattr(result, "uri", None)
        logging.info(f"✅ Post published! URI: {uri}")
        return True

    except Exception as e:
        logging.error(f"❌ Failed to send post: {repr(e)}")
        return False


# ============================================================
# CLI
# ============================================================
def main():
    """Parse command-line arguments, log in, and publish the post.

    Exits with status 1 when login or posting fails.
    """
    setup_logging()

    parser = argparse.ArgumentParser(
        description="Post text + optional image or video to a Bluesky instance."
    )
    parser.add_argument("text", help="Post text content")
    parser.add_argument("--username", required=True, help="Bluesky handle or email")
    parser.add_argument("--password", required=True, help="Bluesky app password")
    parser.add_argument("--service", default="https://bsky.social", help="Bluesky PDS URL")
    parser.add_argument("--lang", default="ca", help="Comma-separated language codes (e.g. ca,es)")
    parser.add_argument("--image", default=None, help="Path to image file to attach")
    parser.add_argument("--video", default=None, help="Path to video file to attach")
    parser.add_argument("--alt", default="", help="Alt text for media")
    args = parser.parse_args()

    client = Client(base_url=args.service)

    retry = RetryConfig()
    success = login_with_backoff(
        client,
        args.username,
        args.password,
        args.service,
        max_attempts=retry.login_max_attempts,
        base_delay=retry.login_base_delay_seconds,
        max_delay=retry.login_max_delay_seconds,
        jitter=retry.login_jitter_seconds,
    )
    if not success:
        sys.exit(1)

    langs = [code.strip() for code in args.lang.split(",") if code.strip()]

    post_success = post_to_bsky(
        client,
        text=args.text,
        langs=langs,
        image_path=args.image,
        video_path=args.video,
        alt_text=args.alt,
        password=args.password,
    )
    if not post_success:
        sys.exit(1)


if __name__ == "__main__":
    main()