diff --git a/twitter2bsky_daemon.py b/twitter2bsky_daemon.py index b6df1a6..5a75fbb 100644 --- a/twitter2bsky_daemon.py +++ b/twitter2bsky_daemon.py @@ -1,4 +1,22 @@ #!/usr/bin/env python3 +""" +bsky_post.py โ€” Post text + optional image/video to Bluesky/federated PDS. + +This script is designed to be robust across different atproto SDK versions and +federated PDS setups. It includes: + +- Login retry/backoff +- Video upload via video.bsky.app (primary, recommended for playback compatibility) +- Correct service auth parameters: + - aud = did:web: + - lxm = com.atproto.repo.uploadBlob +- Handles 409 already_exists from video service by reusing jobId +- Avoids hard dependency on models.BlobRef (cross-version compatibility) +- Optional direct PDS video fallback +- Optional ffmpeg compression (enabled by default) +- Explicit createRecord payload with guaranteed plain-string "text" +""" + import argparse import logging import mimetypes @@ -19,6 +37,9 @@ import requests from atproto import Client, models +# ----------------------------------------------------------------------------- +# Retry configuration for login/backoff behavior +# ----------------------------------------------------------------------------- @dataclass(frozen=True) class RetryConfig: login_max_attempts: int = 5 @@ -27,7 +48,11 @@ class RetryConfig: login_jitter_seconds: float = 3.0 +# ----------------------------------------------------------------------------- +# Logging setup +# ----------------------------------------------------------------------------- def setup_logging() -> None: + """Configure structured logging to stdout.""" logging.basicConfig( format="%(asctime)s %(levelname)s %(message)s", level=logging.INFO, @@ -35,17 +60,23 @@ def setup_logging() -> None: ) +# ----------------------------------------------------------------------------- +# Error classification helpers +# ----------------------------------------------------------------------------- def is_rate_limited_error(error_obj) -> bool: + """Detect common rate-limit signatures from exceptions.""" t = repr(error_obj).lower() return "429" in t or "ratelimit" in t or "too many requests" in t def is_auth_error(error_obj) -> bool: + """Detect authentication/authorization failures.""" t = repr(error_obj).lower() return "401" in t or "403" in t or "invalid identifier or password" in t def is_network_error(error_obj) -> bool: + """Detect transient network/transport/server errors.""" t = repr(error_obj) return any(x in t for x in [ "ConnectError", "RemoteProtocolError", "ReadTimeout", "WriteTimeout", @@ -55,6 +86,9 @@ def is_network_error(error_obj) -> bool: def get_rate_limit_wait_seconds(error_obj, default_delay: float, max_delay: float) -> float: + """ + Try to extract a smart retry delay from response headers, else return default. + """ try: now_ts = int(time.time()) headers = getattr(error_obj, "headers", None) or {} @@ -70,10 +104,16 @@ def get_rate_limit_wait_seconds(error_obj, default_delay: float, max_delay: floa return default_delay +# ----------------------------------------------------------------------------- +# Login with retries and backoff +# ----------------------------------------------------------------------------- def login_with_backoff( client: Client, username: str, password: str, service_url: str, max_attempts: int = 5, base_delay: float = 10.0, max_delay: float = 600.0, jitter: float = 1.5 ) -> bool: + """ + Attempt login with exponential-ish backoff and jitter for transient failures. + """ for attempt in range(1, max_attempts + 1): try: logging.info(f"๐Ÿ”‘ Login attempt {attempt}/{max_attempts} โ†’ {service_url} as {username}") @@ -102,7 +142,13 @@ def login_with_backoff( return False +# ----------------------------------------------------------------------------- +# Generic utility helpers +# ----------------------------------------------------------------------------- def wait_with_heartbeat(total_seconds: float, label: str) -> None: + """ + Sleep with periodic logging, useful for processing/indexing waits. + """ if total_seconds <= 0: return logging.info(f"โณ Waiting {total_seconds:.0f}s for {label}...") @@ -117,6 +163,10 @@ def wait_with_heartbeat(total_seconds: float, label: str) -> None: def pds_did_from_service_url(service_url: str) -> str: + """ + Convert service URL host to did:web: used as getServiceAuth.aud. + Example: https://eurosky.social -> did:web:eurosky.social + """ host = (urlparse(service_url).hostname or "").lower() if not host: raise ValueError(f"Invalid --service URL: {service_url}") @@ -124,24 +174,42 @@ def pds_did_from_service_url(service_url: str) -> str: def random_video_name(ext: str = ".mp4") -> str: + """Generate a random upload filename for video service requests.""" token = "".join(secrets.choice(string.ascii_lowercase + string.digits) for _ in range(12)) return f"{int(time.time())}_{token}{ext}" def detect_mime_type(path: str) -> str: + """Best-effort MIME type detection.""" mime, _ = mimetypes.guess_type(path) return mime or "application/octet-stream" +def _extract_service_auth_token(upload_auth) -> str | None: + """Extract token from typed/dict getServiceAuth responses.""" + token = getattr(upload_auth, "token", None) + if token: + return token + if isinstance(upload_auth, dict): + return upload_auth.get("token") + return None + + +# ----------------------------------------------------------------------------- +# ffmpeg compression helpers +# ----------------------------------------------------------------------------- def ffmpeg_exists() -> bool: + """Check if ffmpeg is installed.""" return shutil.which("ffmpeg") is not None def ffprobe_exists() -> bool: + """Check if ffprobe is installed.""" return shutil.which("ffprobe") is not None def get_video_duration_seconds(path: str) -> float | None: + """Read video duration via ffprobe; return None if unavailable.""" if not ffprobe_exists(): return None try: @@ -159,6 +227,13 @@ def get_video_duration_seconds(path: str) -> float | None: def compress_video_ffmpeg( input_path: str, max_size_mb: float = 45.0, crf: int = 28, preset: str = "veryfast", audio_bitrate_k: int = 96 ) -> str | None: + """ + Compress video to H.264/AAC MP4, attempting to fit target size. + Returns: + - compressed temp path (if smaller), + - original input path (if already small / compressed not smaller), + - None on failure. + """ if not ffmpeg_exists(): logging.error("โŒ ffmpeg not found. Install ffmpeg or use --no-compress-video.") return None @@ -173,12 +248,13 @@ def compress_video_ffmpeg( return input_path duration = get_video_duration_seconds(input_path) - target_video_k = 1200 + target_video_k = 1200 # fallback target bitrate if duration and duration > 0: total_kbps = (max_size_mb * 8192.0) / duration target_video_k = int(max(300, total_kbps - audio_bitrate_k)) target_video_k = min(max(target_video_k, 300), 5000) + # Create a temporary output file for compressed video fd, out_path = tempfile.mkstemp(prefix="bsky_vid_", suffix=".mp4") os.close(fd) @@ -203,8 +279,11 @@ def compress_video_ffmpeg( subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) out_size_mb = os.path.getsize(out_path) / (1024 * 1024) logging.info(f"โœ… Compressed size: {out_size_mb:.2f} MB") + + # Keep compressed only if it is truly smaller if out_size_mb < src_size_mb: return out_path + os.remove(out_path) return input_path except subprocess.CalledProcessError as e: @@ -218,16 +297,20 @@ def compress_video_ffmpeg( return None -def _extract_service_auth_token(upload_auth) -> str | None: - token = getattr(upload_auth, "token", None) - if token: - return token - if isinstance(upload_auth, dict): - return upload_auth.get("token") - return None - - +# ----------------------------------------------------------------------------- +# Video upload paths +# ----------------------------------------------------------------------------- def upload_video_via_bsky_service(client: Client, video_path: str, service_url: str, alt_text: str = "") -> dict | None: + """ + Primary video upload path through https://video.bsky.app. + + IMPORTANT: + - getServiceAuth.aud must be your PDS DID (did:web:) + - getServiceAuth.lxm must be "com.atproto.repo.uploadBlob" + + Returns raw embed dict: + {"$type":"app.bsky.embed.video","video":,"alt":"..."} + """ if not os.path.exists(video_path): logging.error(f"โŒ Video file not found: {video_path}") return None @@ -241,6 +324,7 @@ def upload_video_via_bsky_service(client: Client, video_path: str, service_url: video_host = "https://video.bsky.app" pds_did = pds_did_from_service_url(service_url) + # Request service-auth token from user's PDS for uploadBlob lexicon method try: params = models.ComAtprotoServerGetServiceAuth.Params( aud=pds_did, @@ -249,6 +333,7 @@ def upload_video_via_bsky_service(client: Client, video_path: str, service_url: ) upload_auth = client.com.atproto.server.get_service_auth(params) except Exception: + # Fallback for SDK variants expecting dict input upload_auth = client.com.atproto.server.get_service_auth({ "aud": pds_did, "lxm": "com.atproto.repo.uploadBlob", @@ -266,8 +351,10 @@ def upload_video_via_bsky_service(client: Client, video_path: str, service_url: upload_url = f"{video_host}/xrpc/app.bsky.video.uploadVideo?did={client.me.did}&name={upload_name}" headers = {"Authorization": f"Bearer {token}", "Content-Type": "video/mp4"} + # Upload video bytes to video service resp = requests.post(upload_url, headers=headers, data=video_bytes, timeout=240) + # 200 = accepted, 409 already_exists = dedupe hit (still usable with jobId) if resp.status_code not in (200, 409): logging.error(f"โŒ video.bsky.app upload failed: {resp.status_code} - {resp.text}") return None @@ -275,7 +362,7 @@ def upload_video_via_bsky_service(client: Client, video_path: str, service_url: body = resp.json() if resp.status_code == 409: if body.get("error") == "already_exists" and body.get("jobId"): - logging.info("โ„น๏ธ Video already processed on video.bsky.app. Reusing existing job.") + logging.info("โ„น๏ธ Video already processed on video.bsky.app. Reusing job.") else: logging.error(f"โŒ 409 without reusable jobId: {body}") return None @@ -285,6 +372,7 @@ def upload_video_via_bsky_service(client: Client, video_path: str, service_url: logging.error(f"โŒ Missing jobId in upload response: {body}") return None + # Poll processing status logging.info(f"โณ Job {job_id} accepted โ€” polling status...") status_url = f"{video_host}/xrpc/app.bsky.video.getJobStatus" deadline = time.time() + 600 @@ -307,7 +395,7 @@ def upload_video_via_bsky_service(client: Client, video_path: str, service_url: wait_with_heartbeat(8, "CDN propagation") - # Return RAW embed dict (no models.BlobRef dependency) + # Return RAW lexicon embed dict (no models.BlobRef dependency) return { "$type": "app.bsky.embed.video", "video": blob, @@ -326,6 +414,12 @@ def upload_video_via_bsky_service(client: Client, video_path: str, service_url: def upload_video_via_pds(client: Client, video_path: str, alt_text: str = "", settle_delay_seconds: float = 30.0) -> dict | None: + """ + Optional fallback: direct uploadBlob to PDS. + + Note: + - This can be less reliable for playback compatibility depending on client/AppView. + """ try: with open(video_path, "rb") as f: video_bytes = f.read() @@ -335,16 +429,14 @@ def upload_video_via_pds(client: Client, video_path: str, alt_text: str = "", se r = client.upload_blob(video_bytes) wait_with_heartbeat(settle_delay_seconds, "PDS/AppView indexing") - blob = getattr(r, "blob", None) - if blob is None: - d = r if isinstance(r, dict) else {} - blob = d.get("blob") + blob = getattr(r, "blob", None) + if blob is None and isinstance(r, dict): + blob = r.get("blob") if blob is None: logging.error("โŒ PDS uploadBlob returned no blob.") return None - # Also return raw embed dict return { "$type": "app.bsky.embed.video", "video": blob, @@ -359,6 +451,9 @@ def upload_video_smart( client: Client, video_path: str, service_url: str, alt_text: str, settle_delay_seconds: float, allow_pds_video_fallback: bool ) -> dict | None: + """ + Try video.bsky.app first. Optionally fallback to direct PDS upload. + """ logging.info(f"๐ŸŒ PDS ({service_url}). Trying video.bsky.app first.") embed = upload_video_via_bsky_service(client, video_path, service_url, alt_text) if embed: @@ -372,15 +467,23 @@ def upload_video_smart( return None +# ----------------------------------------------------------------------------- +# Image upload +# ----------------------------------------------------------------------------- def upload_image(client: Client, image_path: str, alt_text: str = "") -> dict | None: + """ + Upload image blob and return raw app.bsky.embed.images dict. + """ try: if not os.path.exists(image_path): logging.error(f"โŒ Image file not found: {image_path}") return None + mime = detect_mime_type(image_path) with open(image_path, "rb") as f: data = f.read() logging.info(f"๐Ÿ–ผ๏ธ Uploading image: {image_path} ({len(data)/1024:.1f} KB, {mime})") + r = client.upload_blob(data) blob = getattr(r, "blob", None) if blob is None and isinstance(r, dict): @@ -398,6 +501,9 @@ def upload_image(client: Client, image_path: str, alt_text: str = "") -> dict | return None +# ----------------------------------------------------------------------------- +# Post creation +# ----------------------------------------------------------------------------- def post_to_bsky( client: Client, text: str, @@ -409,12 +515,16 @@ def post_to_bsky( video_settle_delay: float, allow_pds_video_fallback: bool, ) -> bool: + """ + Build and send app.bsky.feed.post with optional media embed. + """ post_text = text.strip() if not post_text and not image_path and not video_path: logging.error("โŒ Empty post text with no media is not allowed.") return False embed_dict = None + if video_path: logging.info(f"๐ŸŽฌ Preparing video upload: {video_path}") embed_dict = upload_video_smart( @@ -428,12 +538,14 @@ def post_to_bsky( if not embed_dict: logging.error("โŒ Aborting post: video upload/processing failed.") return False + elif image_path: embed_dict = upload_image(client, image_path, alt_text) if not embed_dict: logging.error("โŒ Aborting post: image upload failed.") return False + # Explicit record: safest way to guarantee text and schema behavior record = { "$type": "app.bsky.feed.post", "text": post_text, @@ -446,6 +558,7 @@ def post_to_bsky( logging.info(f"๐Ÿงพ Final record text={record.get('text')!r}, has_embed={'embed' in record}") + # Typed call first; dict fallback for SDK differences try: data = models.ComAtprotoRepoCreateRecord.Data( repo=client.me.did, @@ -465,33 +578,44 @@ def post_to_bsky( return True +# ----------------------------------------------------------------------------- +# CLI entrypoint +# ----------------------------------------------------------------------------- def main(): setup_logging() + parser = argparse.ArgumentParser(description="Post text + optional image/video to Bluesky/federated PDS.") parser.add_argument("text", help="Post text content") - parser.add_argument("--username", required=True) - parser.add_argument("--password", required=True) - parser.add_argument("--service", default="https://bsky.social") - parser.add_argument("--lang", default="ca") - parser.add_argument("--image", default=None) - parser.add_argument("--video", default=None) - parser.add_argument("--alt", default="") - parser.add_argument("--video-settle-delay", type=float, default=30.0) - parser.add_argument("--allow-pds-video-fallback", action="store_true") + parser.add_argument("--username", required=True, help="Bluesky handle/email") + parser.add_argument("--password", required=True, help="Bluesky app password") + parser.add_argument("--service", default="https://bsky.social", help="PDS URL") + parser.add_argument("--lang", default="ca", help="Comma-separated language codes") + parser.add_argument("--image", default=None, help="Path to image file") + parser.add_argument("--video", default=None, help="Path to video file") + parser.add_argument("--alt", default="", help="Alt text for media") + parser.add_argument("--video-settle-delay", type=float, default=30.0, help="Fallback indexing wait seconds") + parser.add_argument("--allow-pds-video-fallback", action="store_true", help="Allow direct PDS video fallback") - parser.add_argument("--compress-video", dest="compress_video", action="store_true", default=True) - parser.add_argument("--no-compress-video", dest="compress_video", action="store_false") - parser.add_argument("--max-video-mb", type=float, default=45.0) - parser.add_argument("--ffmpeg-crf", type=int, default=28) - parser.add_argument("--ffmpeg-preset", default="veryfast") + # Compression options (enabled by default) + parser.add_argument("--compress-video", dest="compress_video", action="store_true", default=True, + help="Compress video before upload (default: enabled)") + parser.add_argument("--no-compress-video", dest="compress_video", action="store_false", + help="Disable compression") + parser.add_argument("--max-video-mb", type=float, default=45.0, help="Target max size after compression") + parser.add_argument("--ffmpeg-crf", type=int, default=28, help="CRF (lower=better quality/larger)") + parser.add_argument("--ffmpeg-preset", default="veryfast", help="x264 preset") args = parser.parse_args() + # Exactly one media type at a time if args.image and args.video: logging.error("โŒ Use either --image or --video, not both.") sys.exit(1) + # Initialize client against selected PDS client = Client(base_url=args.service) + + # Login with retries if not login_with_backoff( client, args.username, args.password, args.service, RetryConfig.login_max_attempts, @@ -501,8 +625,10 @@ def main(): ): sys.exit(1) + # Parse languages langs = [x.strip() for x in args.lang.split(",") if x.strip()] + # Optional pre-upload compression for videos video_path_for_upload = args.video temp_compressed_path = None if args.video and args.compress_video: @@ -520,6 +646,7 @@ def main(): if compressed != args.video: temp_compressed_path = compressed + # Build and publish post ok = post_to_bsky( client=client, text=args.text, @@ -532,6 +659,7 @@ def main(): allow_pds_video_fallback=args.allow_pds_video_fallback, ) + # Cleanup temp compressed file, if any try: if temp_compressed_path and os.path.exists(temp_compressed_path): os.remove(temp_compressed_path)