#!/usr/bin/env python3
"""
bsky_post.py — Post text + optional image or video to Bluesky/federated PDS.

The app password may be passed with --password or, preferably, through the
BSKY_APP_PASSWORD environment variable so it does not show up in the process
list or shell history.

Examples:
  python3 bsky_post.py "DIVENDRES!!!!" --video media/divendres.mp4 --username you --password app-pass --service https://eurosky.social
  python3 bsky_post.py "Dijous!!!!" --image media/dijous.jpg --username you --password app-pass --service https://eurosky.social
  python3 bsky_post.py "Bon dia!" --username you --password app-pass --service https://eurosky.social
"""

import argparse
import logging
import mimetypes
import os
import random
import re
import sys
import time
from dataclasses import dataclass
from urllib.parse import urlparse

import requests
from atproto import Client, models


# ============================================================
# Config
# ============================================================

@dataclass(frozen=True)
class RetryConfig:
    """Default retry/backoff settings used for the login step."""

    login_max_attempts: int = 5            # total login attempts before giving up
    login_base_delay_seconds: float = 10.0  # backoff unit; multiplied by the attempt number
    login_max_delay_seconds: float = 600.0  # upper bound for any single wait
    login_jitter_seconds: float = 3.0       # max random extra delay added to each wait


# Environment variable consulted when --password is not given on the CLI.
PASSWORD_ENV_VAR = "BSKY_APP_PASSWORD"

# Centralized video processing service used as the primary video upload path.
VIDEO_SERVICE_HOST = "https://video.bsky.app"


# ============================================================
# Logging
# ============================================================

def setup_logging() -> None:
    """Configure root logging: INFO level, timestamped lines, written to stdout."""
    logging.basicConfig(
        format="%(asctime)s %(levelname)s %(message)s",
        level=logging.INFO,
        stream=sys.stdout,
    )


# ============================================================
# Error helpers
# ============================================================

_RATE_LIMIT_STATUS_CODES = (429,)
_AUTH_STATUS_CODES = (401, 403)
_GATEWAY_STATUS_CODES = (502, 503, 504)


def _status_code_of(error_obj) -> int | None:
    """Return the HTTP status carried by ``error_obj``, if it exposes one.

    Looks for an integer ``status_code`` on ``error_obj.response`` first and
    then on ``error_obj`` itself. Returns None when neither is present.
    """
    for holder in (getattr(error_obj, "response", None), error_obj):
        code = getattr(holder, "status_code", None)
        if isinstance(code, int) and not isinstance(code, bool):
            return code
    return None


def _has_status(error_obj, text: str, codes: tuple[int, ...]) -> bool:
    """Tell whether the error carries one of ``codes`` as its HTTP status.

    A structured status code wins when available. Otherwise ``text`` (the
    error's repr) is searched for the code as a standalone number, so digits
    embedded in longer numbers (e.g. an epoch timestamp in a rate-limit
    header) are not mistaken for a status code.
    """
    status = _status_code_of(error_obj)
    if status is not None:
        return status in codes
    return any(re.search(rf"(?<!\d){code}(?!\d)", text) for code in codes)


def _matches_failure(error_obj, class_names: tuple[str, ...]) -> bool:
    """True if the repr names one of ``class_names`` or the status is 502/503/504."""
    text = repr(error_obj)
    if any(name in text for name in class_names):
        return True
    return _has_status(error_obj, text, _GATEWAY_STATUS_CODES)


def is_rate_limited_error(error_obj) -> bool:
    """Return True when ``error_obj`` looks like an HTTP 429 / rate-limit failure."""
    text = repr(error_obj).lower()
    if _has_status(error_obj, text, _RATE_LIMIT_STATUS_CODES):
        return True
    phrases = ("ratelimitexceeded", "too many requests", "rate limit")
    return any(phrase in text for phrase in phrases)


def is_auth_error(error_obj) -> bool:
    """Return True when ``error_obj`` looks like a credential/authorization failure."""
    text = repr(error_obj).lower()
    if _has_status(error_obj, text, _AUTH_STATUS_CODES):
        return True
    phrases = (
        "invalid identifier or password",
        "authenticationrequired",
        "invalidtoken",
    )
    return any(phrase in text for phrase in phrases)


def is_network_error(error_obj) -> bool:
    """Return True for connection-level failures or 502/503/504 responses."""
    signals = (
        "ConnectError",
        "RemoteProtocolError",
        "ReadTimeout",
        "WriteTimeout",
        "TimeoutException",
        "ConnectionResetError",
        "NetworkError",
    )
    return _matches_failure(error_obj, signals)


def is_transient_error(error_obj) -> bool:
    """Return True for timeouts and other failures that are worth retrying."""
    signals = (
        "InvokeTimeoutError",
        "ReadTimeout",
        "WriteTimeout",
        "TimeoutException",
        "RemoteProtocolError",
        "ConnectError",
    )
    return _matches_failure(error_obj, signals)


def _parse_seconds(raw) -> float | None:
    """Convert a header value to a finite float, or None if it is not numeric."""
    try:
        value = float(raw)
    except (TypeError, ValueError):
        return None
    # Reject NaN (value != value) and infinities: they would poison min()/max().
    if value != value or value in (float("inf"), float("-inf")):
        return None
    return value


def _error_headers(error_obj) -> dict:
    """Return the error's response headers with lower-cased names ({} if none).

    Headers are looked up on ``error_obj`` itself and on ``error_obj.response``.
    """
    for holder in (error_obj, getattr(error_obj, "response", None)):
        headers = getattr(holder, "headers", None)
        if not headers:
            continue
        try:
            return {str(name).lower(): value for name, value in dict(headers).items()}
        except (TypeError, ValueError):
            continue
    return {}


def get_rate_limit_wait_seconds(error_obj, default_delay: float, max_delay: float) -> float:
    """Work out how long to wait after a rate-limit error.

    Sources, in priority order: ``retry-after``, ``x-ratelimit-after`` (both
    relative seconds, clamped to at least 1s) and ``ratelimit-reset`` (epoch
    seconds; the wait is never shorter than ``default_delay``). Each value is
    read from the error's headers when available, otherwise scraped from the
    error's repr. The result never exceeds ``max_delay``; ``default_delay`` is
    returned when no hint is found.
    """
    headers = _error_headers(error_obj)
    text = repr(error_obj)

    def lookup(name: str) -> float | None:
        value = _parse_seconds(headers.get(name))
        if value is not None:
            return value
        match = re.search(rf"'{re.escape(name)}': '(\d+)'", text, re.IGNORECASE)
        return float(match.group(1)) if match else None

    for name in ("retry-after", "x-ratelimit-after"):
        seconds = lookup(name)
        if seconds is not None:
            return min(max(seconds, 1.0), max_delay)

    reset_at = lookup("ratelimit-reset")
    if reset_at is not None:
        wait_seconds = max(reset_at - int(time.time()) + 1.0, default_delay)
        return min(wait_seconds, max_delay)

    return default_delay


# ============================================================
# Login with backoff
# ============================================================

def login_with_backoff(
    client: Client,
    username: str,
    password: str,
    service_url: str,
    max_attempts: int = 5,
    base_delay: float = 10.0,
    max_delay: float = 600.0,
    jitter: float = 1.5,
) -> bool:
    """Log ``client`` in, retrying on rate limits and transient failures.

    Args:
        client: Client to authenticate (``client.login`` is called).
        username: Handle or email.
        password: App password.
        service_url: Only used in log messages.
        max_attempts: Total number of attempts.
        base_delay: Backoff unit in seconds; waits grow linearly with the attempt number.
        max_delay: Upper bound for a single wait (before jitter).
        jitter: Maximum random number of seconds added to every wait.

    Returns:
        True on success. False on bad credentials (no retry) or once all
        attempts are exhausted.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            logging.info(f"🔑 Login attempt {attempt}/{max_attempts} → {service_url} as {username}")
            client.login(username, password)
        except Exception as e:
            logging.exception("❌ Login exception")

            # Wrong credentials will not fix themselves: fail fast.
            if is_auth_error(e):
                logging.error("❌ Bad credentials. Check handle/password.")
                return False

            if is_rate_limited_error(e):
                label = "Rate-limited"
                exhausted = "❌ Exhausted login retries due to rate limiting."
                wait = get_rate_limit_wait_seconds(e, default_delay=base_delay, max_delay=max_delay)
            elif is_network_error(e) or is_transient_error(e):
                label = "Transient error"
                exhausted = "❌ Exhausted login retries after transient/network errors."
                wait = min(base_delay * attempt, max_delay)
            else:
                label = "Unknown login error"
                exhausted = "❌ Exhausted login retries after unknown errors."
                wait = min(base_delay * attempt, max_delay)

            if attempt >= max_attempts:
                logging.error(exhausted)
                return False

            wait += random.uniform(0, jitter)
            logging.warning(f"⏳ {label}. Retrying in {wait:.1f}s...")
            time.sleep(wait)
        else:
            logging.info("✅ Login successful.")
            return True

    return False


# ============================================================
# Utility
# ============================================================

def detect_mime_type(path: str) -> str:
    """Guess the MIME type of ``path`` from its extension.

    Uses ``mimetypes`` first, then a small built-in table, and finally
    ``application/octet-stream``.
    """
    mime, _ = mimetypes.guess_type(path)
    if mime:
        return mime

    ext = os.path.splitext(path)[1].lower()
    fallbacks = {
        ".jpg": "image/jpeg",
        ".jpeg": "image/jpeg",
        ".png": "image/png",
        ".gif": "image/gif",
        ".webp": "image/webp",
        ".mp4": "video/mp4",
        ".mov": "video/quicktime",
        ".webm": "video/webm",
    }
    return fallbacks.get(ext, "application/octet-stream")


def wait_with_heartbeat(total_seconds: float, label: str = "processing") -> None:
    """Sleep for ``total_seconds``, logging progress at most every 5 seconds.

    Returns immediately when ``total_seconds`` is zero or negative.
    """
    if total_seconds <= 0:
        return

    logging.info(f"⏳ Waiting {total_seconds:.0f}s for {label}...")
    remaining = total_seconds
    while remaining > 0:
        step = min(5.0, remaining)
        time.sleep(step)
        remaining -= step
        if remaining > 0:
            logging.info(f" ...still waiting ({remaining:.0f}s remaining)...")
    logging.info("✅ Wait complete.")


def pds_did_from_service_url(service_url: str) -> str:
    """Build a ``did:web:<host>`` identifier from the host of ``service_url``.

    Raises:
        ValueError: if no hostname can be parsed (e.g. the scheme is missing).
    """
    host = (urlparse(service_url).hostname or "").lower()
    if not host:
        raise ValueError(f"Invalid --service URL: {service_url}")
    return f"did:web:{host}"


def model_to_dict(obj):
    """Serialize a model to a plain dict (aliases kept, None fields dropped).

    Uses ``model_dump`` when available, else ``dict``. Objects exposing
    neither, and None, are returned unchanged.
    """
    if obj is None:
        return None
    if hasattr(obj, "model_dump"):
        return obj.model_dump(by_alias=True, exclude_none=True)
    if hasattr(obj, "dict"):
        return obj.dict(by_alias=True, exclude_none=True)
    return obj


def _read_media_bytes(path: str, kind: str) -> bytes | None:
    """Read ``path`` fully; log and return None when the file does not exist.

    ``kind`` ("Image" / "Video") is only used in the error message.
    """
    if not os.path.exists(path):
        logging.error(f"❌ {kind} file not found: {path}")
        return None
    with open(path, "rb") as f:
        return f.read()


# ============================================================
# Media upload — Image
# ============================================================

def upload_image(
    client: Client,
    image_path: str,
    alt_text: str = "",
) -> models.AppBskyEmbedImages.Image | None:
    """Upload an image blob and wrap it as an embeddable image.

    Returns the image entry (blob + alt text), or None if the file is missing
    or the upload raises.
    """
    try:
        data = _read_media_bytes(image_path, "Image")
        if data is None:
            return None

        mime = detect_mime_type(image_path)  # informational only (logged)
        logging.info(f"🖼️ Uploading image: {image_path} ({len(data)/1024:.1f} KB, {mime})")
        response = client.upload_blob(data)
        logging.info("✅ Image uploaded successfully.")
        return models.AppBskyEmbedImages.Image(image=response.blob, alt=alt_text)
    except Exception as e:
        logging.error(f"❌ Failed to upload image: {repr(e)}")
        return None


# ============================================================
# Media upload — Video via PDS direct fallback
# ============================================================

def upload_video_via_pds(
    client: Client,
    video_path: str,
    alt_text: str = "",
    settle_delay_seconds: float = 30.0,
) -> models.AppBskyEmbedVideo.Main | None:
    """
    Direct upload to home PDS via upload_blob.
    Fallback only. Can be less reliable for playback in clients.

    After the blob is uploaded the function waits ``settle_delay_seconds``
    before returning the embed. Returns None if the file is missing or the
    upload raises.
    """
    try:
        video_bytes = _read_media_bytes(video_path, "Video")
        if video_bytes is None:
            return None

        size_mb = len(video_bytes) / (1024 * 1024)
        logging.warning(f"🎬 [PDS-direct fallback] Uploading: {video_path} ({size_mb:.2f} MB)")
        response = client.upload_blob(video_bytes)
        blob = response.blob
        logging.warning("⚠️ [PDS-direct fallback] Blob uploaded. Waiting for indexing...")
        wait_with_heartbeat(settle_delay_seconds, label="PDS/AppView indexing")
        return models.AppBskyEmbedVideo.Main(video=blob, alt=alt_text)
    except Exception as e:
        logging.error(f"❌ PDS-direct video upload failed: {repr(e)}")
        return None


# ============================================================
# Media upload — Video via video.bsky.app (primary)
# ============================================================

def _extract_service_auth_token(upload_auth) -> str | None:
    """Pull the token out of a service-auth response (object attribute or dict key)."""
    token = getattr(upload_auth, "token", None)
    if token:
        return token
    if isinstance(upload_auth, dict):
        return upload_auth.get("token")
    return None


def _request_video_upload_token(client: Client, pds_did: str) -> str | None:
    """Ask the PDS for a 30-minute service-auth token scoped to blob upload."""
    request = {
        "aud": pds_did,
        "lxm": "com.atproto.repo.uploadBlob",  # <-- critical fix
        "exp": int(time.time()) + 60 * 30,
    }
    # Some atproto versions prefer typed params, others accept dict
    try:
        params = models.ComAtprotoServerGetServiceAuth.Params(**request)
        upload_auth = client.com.atproto.server.get_service_auth(params)
    except Exception:
        upload_auth = client.com.atproto.server.get_service_auth(request)
    return _extract_service_auth_token(upload_auth)


def _wait_for_video_job(
    job_id: str,
    alt_text: str,
    timeout_seconds: float = 600.0,
    poll_interval_seconds: float = 3.0,
) -> models.AppBskyEmbedVideo.Main | None:
    """Poll the video service until ``job_id`` completes, fails or times out.

    Returns the video embed on completion; None on a failed job, a non-200
    status response, a completed job without a blob, or timeout.
    """
    status_url = f"{VIDEO_SERVICE_HOST}/xrpc/app.bsky.video.getJobStatus"
    # Monotonic clock: the deadline is unaffected by wall-clock adjustments.
    deadline = time.monotonic() + timeout_seconds

    while time.monotonic() < deadline:
        status_resp = requests.get(status_url, params={"jobId": job_id}, timeout=30)
        if status_resp.status_code != 200:
            logging.error(f"❌ Job status check failed: {status_resp.status_code} - {status_resp.text}")
            return None

        status_json = status_resp.json()
        job_status = status_json.get("jobStatus") or {}  # tolerate an explicit null
        state = job_status.get("state")

        if state == "JOB_STATE_COMPLETED":
            blob_dict = job_status.get("blob")
            if not blob_dict:
                logging.error(f"❌ No blob in completed job status: {status_json}")
                return None
            wait_with_heartbeat(8, label="CDN propagation")
            blob_ref = models.BlobRef.from_dict(blob_dict)
            logging.info("✅ Video processed successfully.")
            return models.AppBskyEmbedVideo.Main(video=blob_ref, alt=alt_text)

        if state == "JOB_STATE_FAILED":
            logging.error(f"❌ Video processing failed: {job_status}")
            return None

        logging.info(f" ...still processing (state={state})...")
        time.sleep(poll_interval_seconds)

    logging.error("❌ Video processing timed out.")
    return None


def upload_video_via_bsky_service(
    client: Client,
    video_path: str,
    service_url: str,
    alt_text: str = "",
) -> models.AppBskyEmbedVideo.Main | None:
    """
    Upload via centralized video.bsky.app service.

    Critical compatibility fixes:
    - aud must be user's PDS DID (e.g. did:web:eurosky.social)
    - lxm must be com.atproto.repo.uploadBlob

    The PDS DID is derived from the host of ``service_url``. Returns the video
    embed once processing completes, or None on any failure (errors are
    logged, never raised).
    """
    try:
        video_bytes = _read_media_bytes(video_path, "Video")
        if video_bytes is None:
            return None

        size_mb = len(video_bytes) / (1024 * 1024)
        logging.info(f"🎬 [video.bsky.app] Uploading: {video_path} ({size_mb:.2f} MB)")

        pds_did = pds_did_from_service_url(service_url)
        token = _request_video_upload_token(client, pds_did)
        if not token:
            logging.error("❌ Failed to extract service auth token.")
            return None

        # NOTE(review): name/Content-Type are always sent as mp4, whatever the
        # real container of video_path is — confirm this is intended.
        upload_resp = requests.post(
            f"{VIDEO_SERVICE_HOST}/xrpc/app.bsky.video.uploadVideo",
            # Let requests build the query string so the values are URL-encoded.
            params={"did": client.me.did, "name": f"{int(time.time())}.mp4"},
            headers={
                "Authorization": f"Bearer {token}",
                "Content-Type": "video/mp4",
            },
            data=video_bytes,
            timeout=180,
        )
        if upload_resp.status_code != 200:
            logging.error(f"❌ video.bsky.app upload failed: {upload_resp.status_code} - {upload_resp.text}")
            return None

        body = upload_resp.json()
        job_id = body.get("jobId")
        if not job_id:
            logging.error(f"❌ No jobId returned from video service. Response: {body}")
            return None

        logging.info(f"⏳ Job {job_id} accepted — polling status...")
        return _wait_for_video_job(job_id, alt_text)
    except Exception as e:
        logging.error(f"❌ video.bsky.app upload failed: {repr(e)}")
        return None


# ============================================================
# Video dispatcher
# ============================================================

def upload_video_smart(
    client: Client,
    video_path: str,
    service_url: str,
    alt_text: str = "",
    settle_delay_seconds: float = 30.0,
    allow_pds_video_fallback: bool = False,
) -> models.AppBskyEmbedVideo.Main | None:
    """Upload a video via video.bsky.app, optionally falling back to the PDS.

    The direct-PDS path is only tried when ``allow_pds_video_fallback`` is
    True. Returns the embed, or None when every permitted path failed.
    """
    logging.info(f"🌐 PDS ({service_url}). Trying video.bsky.app first.")
    embed = upload_video_via_bsky_service(
        client=client,
        video_path=video_path,
        service_url=service_url,
        alt_text=alt_text,
    )
    if embed:
        return embed

    if allow_pds_video_fallback:
        logging.warning("⚠️ video.bsky.app failed; trying direct PDS fallback.")
        return upload_video_via_pds(
            client=client,
            video_path=video_path,
            alt_text=alt_text,
            settle_delay_seconds=settle_delay_seconds,
        )

    logging.error("❌ video.bsky.app failed. Not using direct fallback unless enabled.")
    return None


# ============================================================
# Post creation (explicit record to guarantee text string)
# ============================================================

def post_to_bsky(
    client: Client,
    text: str,
    langs: list[str],
    image_path: str | None = None,
    video_path: str | None = None,
    alt_text: str = "",
    service_url: str = "https://bsky.social",
    video_settle_delay: float = 30.0,
    allow_pds_video_fallback: bool = False,
) -> bool:
    """Create an ``app.bsky.feed.post`` record with optional media.

    When both are given, ``video_path`` takes precedence over ``image_path``.
    If the requested media cannot be uploaded the post is aborted rather than
    published without it.

    Returns:
        True when the record was created, False otherwise (errors are logged).
    """
    post_text = text.strip()
    if not post_text and not image_path and not video_path:
        logging.error("❌ Empty post text with no media is not allowed.")
        return False

    try:
        embed_obj = None

        if video_path:
            logging.info(f"🎬 Preparing video upload: {video_path}")
            embed_obj = upload_video_smart(
                client=client,
                video_path=video_path,
                service_url=service_url,
                alt_text=alt_text,
                settle_delay_seconds=video_settle_delay,
                allow_pds_video_fallback=allow_pds_video_fallback,
            )
            if not embed_obj:
                logging.error("❌ Aborting post: video upload/processing failed.")
                return False
        elif image_path:
            image = upload_image(client, image_path, alt_text=alt_text)
            if not image:
                logging.error("❌ Aborting post: image upload failed.")
                return False
            embed_obj = models.AppBskyEmbedImages.Main(images=[image])

        record = {
            "$type": "app.bsky.feed.post",
            "text": post_text,  # guaranteed plain string
            "createdAt": time.strftime("%Y-%m-%dT%H:%M:%S.000Z", time.gmtime()),
        }
        if langs:
            record["langs"] = langs
        if embed_obj is not None:
            record["embed"] = model_to_dict(embed_obj)

        logging.info(f"🧾 Final record text={record.get('text')!r}, has_embed={'embed' in record}")

        # typed first, dict fallback for compatibility
        try:
            resp = client.com.atproto.repo.create_record(
                models.ComAtprotoRepoCreateRecord.Data(
                    repo=client.me.did,
                    collection="app.bsky.feed.post",
                    record=record,
                )
            )
        except Exception:
            resp = client.com.atproto.repo.create_record(
                {
                    "repo": client.me.did,
                    "collection": "app.bsky.feed.post",
                    "record": record,
                }
            )

        uri = getattr(resp, "uri", None) or (resp.get("uri") if isinstance(resp, dict) else None)
        logging.info(f"✅ Post published! URI: {uri}")
        return True
    except Exception as e:
        logging.error(f"❌ Failed to send post: {repr(e)}")
        return False


# ============================================================
# CLI
# ============================================================

def main():
    """Parse CLI arguments, log in and publish the post; exit non-zero on failure."""
    setup_logging()

    parser = argparse.ArgumentParser(description="Post text + optional image/video to Bluesky/federated PDS.")
    parser.add_argument("text", help="Post text content")
    parser.add_argument("--username", required=True, help="Bluesky handle or email")
    parser.add_argument(
        "--password",
        default=os.environ.get(PASSWORD_ENV_VAR),
        help=f"Bluesky app password (defaults to the {PASSWORD_ENV_VAR} environment variable)",
    )
    parser.add_argument("--service", default="https://bsky.social", help="PDS URL")
    parser.add_argument("--lang", default="ca", help="Comma-separated language codes (e.g. ca,es)")
    parser.add_argument("--image", default=None, help="Path to image file")
    parser.add_argument("--video", default=None, help="Path to video file")
    parser.add_argument("--alt", default="", help="Alt text for media")
    parser.add_argument(
        "--video-settle-delay",
        type=float,
        default=30.0,
        help="Seconds to wait after direct-PDS fallback upload before posting.",
    )
    parser.add_argument(
        "--allow-pds-video-fallback",
        action="store_true",
        help="Allow direct PDS video fallback if video.bsky.app fails.",
    )
    args = parser.parse_args()

    if not args.password:
        parser.error(f"a password is required: pass --password or set {PASSWORD_ENV_VAR}")

    if args.image and args.video:
        logging.error("❌ Use either --image or --video, not both.")
        sys.exit(1)

    retry = RetryConfig()
    client = Client(base_url=args.service)
    success = login_with_backoff(
        client=client,
        username=args.username,
        password=args.password,
        service_url=args.service,
        max_attempts=retry.login_max_attempts,
        base_delay=retry.login_base_delay_seconds,
        max_delay=retry.login_max_delay_seconds,
        jitter=retry.login_jitter_seconds,
    )
    if not success:
        sys.exit(1)

    langs = [code.strip() for code in args.lang.split(",") if code.strip()]

    post_success = post_to_bsky(
        client=client,
        text=args.text,
        langs=langs,
        image_path=args.image,
        video_path=args.video,
        alt_text=args.alt,
        service_url=args.service,
        video_settle_delay=args.video_settle_delay,
        allow_pds_video_fallback=args.allow_pds_video_fallback,
    )
    if not post_success:
        sys.exit(1)


if __name__ == "__main__":
    main()