diff --git a/twitter2bsky_daemon.py b/twitter2bsky_daemon.py index 0c58522..9633bb6 100644 --- a/twitter2bsky_daemon.py +++ b/twitter2bsky_daemon.py @@ -2,14 +2,16 @@ """ bsky_post.py — Post text + optional image or video to Bluesky/federated PDS. -Includes: +Fixes included: - Robust login backoff -- Reliable video upload via video.bsky.app -- Correct service auth (aud + lxm) -- 409 already_exists handling (reuse jobId) -- SDK compatibility for blob refs -- Explicit record creation (guaranteed text field) -- ffmpeg video compression (enabled by default) +- video.bsky.app first for videos +- Correct getServiceAuth: + aud = did:web: + lxm = com.atproto.repo.uploadBlob +- Handles 409 already_exists by reusing returned jobId +- SDK compatibility: no hard dependency on models.BlobRef +- Explicit createRecord payload so text is always a plain string +- ffmpeg compression (enabled by default, disable with --no-compress-video) """ import argparse @@ -90,12 +92,13 @@ def is_network_error(error_obj) -> bool: "502", "504", "ConnectionResetError", + "InvokeTimeoutError", ] return any(sig in text for sig in signals) def is_transient_error(error_obj) -> bool: - error_text = repr(error_obj) + text = repr(error_obj) transient_signals = [ "InvokeTimeoutError", "ReadTimeout", @@ -107,7 +110,7 @@ def is_transient_error(error_obj) -> bool: "502", "504", ] - return any(signal in error_text for signal in transient_signals) + return any(signal in text for signal in transient_signals) def get_rate_limit_wait_seconds(error_obj, default_delay: float, max_delay: float) -> float: @@ -149,7 +152,7 @@ def get_rate_limit_wait_seconds(error_obj, default_delay: float, max_delay: floa # ============================================================ -# Login with backoff +# Login # ============================================================ def login_with_backoff( client: Client, @@ -167,36 +170,35 @@ def login_with_backoff( client.login(username, password) logging.info("✅ Login successful.") return True - except Exception as e: logging.exception("❌ Login exception") if is_auth_error(e): - logging.error("❌ Bad credentials. Check handle/password.") + logging.error("❌ Authentication failed. Check username/app-password.") return False if is_rate_limited_error(e): if attempt < max_attempts: wait = get_rate_limit_wait_seconds(e, default_delay=base_delay, max_delay=max_delay) wait += random.uniform(0, jitter) - logging.warning(f"⏳ Rate-limited. Retrying in {wait:.1f}s...") + logging.warning(f"⏳ Rate limited, retrying in {wait:.1f}s...") time.sleep(wait) continue - logging.error("❌ Exhausted login retries due to rate limiting.") + logging.error("❌ Exhausted retries due to rate limit.") return False if is_network_error(e) or is_transient_error(e): if attempt < max_attempts: wait = min(base_delay * attempt, max_delay) + random.uniform(0, jitter) - logging.warning(f"⏳ Transient error. Retrying in {wait:.1f}s...") + logging.warning(f"⏳ Transient/network error, retrying in {wait:.1f}s...") time.sleep(wait) continue - logging.error("❌ Exhausted login retries after transient/network errors.") + logging.error("❌ Exhausted retries after network/transient errors.") return False if attempt < max_attempts: wait = min(base_delay * attempt, max_delay) + random.uniform(0, jitter) - logging.warning(f"⏳ Unknown login error. Retrying in {wait:.1f}s...") + logging.warning(f"⏳ Unknown error, retrying in {wait:.1f}s...") time.sleep(wait) continue @@ -224,7 +226,7 @@ def detect_mime_type(path: str) -> str: return fallbacks.get(ext, "application/octet-stream") -def wait_with_heartbeat(total_seconds: float, label: str = "processing") -> None: +def wait_with_heartbeat(total_seconds: float, label: str) -> None: if total_seconds <= 0: return logging.info(f"⏳ Waiting {total_seconds:.0f}s for {label}...") @@ -261,6 +263,11 @@ def model_to_dict(obj): def normalize_blob_for_embed(blob_dict: dict): + """ + Cross-version blob normalization: + - Use BlobRef if available + - else return raw dict (accepted by older/newer serializers) + """ BlobRef = getattr(models, "BlobRef", None) if BlobRef is not None: try: @@ -270,6 +277,15 @@ def normalize_blob_for_embed(blob_dict: dict): return blob_dict +def _extract_service_auth_token(upload_auth) -> str | None: + token = getattr(upload_auth, "token", None) + if token: + return token + if isinstance(upload_auth, dict): + return upload_auth.get("token") + return None + + # ============================================================ # ffmpeg compression # ============================================================ @@ -286,8 +302,7 @@ def get_video_duration_seconds(path: str) -> float | None: return None try: cmd = [ - "ffprobe", - "-v", "error", + "ffprobe", "-v", "error", "-show_entries", "format=duration", "-of", "default=noprint_wrappers=1:nokey=1", path, @@ -306,7 +321,7 @@ def compress_video_ffmpeg( audio_bitrate_k: int = 96, ) -> str | None: if not ffmpeg_exists(): - logging.error("❌ ffmpeg not found in PATH. Install ffmpeg or run with --no-compress-video.") + logging.error("❌ ffmpeg not found in PATH. Install ffmpeg or use --no-compress-video.") return None if not os.path.exists(input_path): @@ -317,12 +332,11 @@ def compress_video_ffmpeg( logging.info(f"📦 Source video size: {src_size_mb:.2f} MB") if src_size_mb <= max_size_mb: - logging.info("✅ Source video already under target size. Skipping compression.") + logging.info("✅ Already below target size. Skipping compression.") return input_path duration = get_video_duration_seconds(input_path) target_video_k = 1200 - if duration and duration > 0: total_kbps = (max_size_mb * 8192.0) / duration target_video_k = int(max(300, total_kbps - audio_bitrate_k)) @@ -332,8 +346,7 @@ def compress_video_ffmpeg( os.close(fd) cmd = [ - "ffmpeg", - "-y", + "ffmpeg", "-y", "-i", input_path, "-c:v", "libx264", "-preset", preset, @@ -350,17 +363,16 @@ def compress_video_ffmpeg( try: logging.info( - f"🛠️ Compressing video (target≤{max_size_mb}MB, crf={crf}, preset={preset}, v_bitrate≈{target_video_k}k)..." + f"🛠️ Compressing video (target≤{max_size_mb}MB, crf={crf}, preset={preset}, v≈{target_video_k}k)..." ) subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) - out_size_mb = os.path.getsize(out_path) / (1024 * 1024) - logging.info(f"✅ Compressed video size: {out_size_mb:.2f} MB") + logging.info(f"✅ Compressed size: {out_size_mb:.2f} MB") if out_size_mb < src_size_mb: return out_path - logging.info("ℹ️ Compression not smaller than source. Using original.") + logging.info("ℹ️ Compressed file is not smaller. Using original.") try: os.remove(out_path) except Exception: @@ -379,13 +391,9 @@ def compress_video_ffmpeg( # ============================================================ -# Media upload — Image +# Uploads # ============================================================ -def upload_image( - client: Client, - image_path: str, - alt_text: str = "", -) -> models.AppBskyEmbedImages.Image | None: +def upload_image(client: Client, image_path: str, alt_text: str = ""): try: if not os.path.exists(image_path): logging.error(f"❌ Image file not found: {image_path}") @@ -397,24 +405,18 @@ def upload_image( logging.info(f"🖼️ Uploading image: {image_path} ({len(data)/1024:.1f} KB, {mime})") response = client.upload_blob(data) - logging.info("✅ Image uploaded successfully.") - return models.AppBskyEmbedImages.Image(image=response.blob, alt=alt_text) - except Exception as e: logging.error(f"❌ Failed to upload image: {repr(e)}") return None -# ============================================================ -# Media upload — Video via PDS direct fallback -# ============================================================ def upload_video_via_pds( client: Client, video_path: str, alt_text: str = "", settle_delay_seconds: float = 30.0, -) -> models.AppBskyEmbedVideo.Main | None: +): try: if not os.path.exists(video_path): logging.error(f"❌ Video file not found: {video_path}") @@ -425,33 +427,18 @@ def upload_video_via_pds( size_mb = len(video_bytes) / (1024 * 1024) logging.warning(f"🎬 [PDS-direct fallback] Uploading: {video_path} ({size_mb:.2f} MB)") - response = client.upload_blob(video_bytes) blob = response.blob + logging.warning("⚠️ [PDS-direct fallback] Blob uploaded. Waiting for indexing...") - - wait_with_heartbeat(settle_delay_seconds, label="PDS/AppView indexing") - + wait_with_heartbeat(settle_delay_seconds, "PDS/AppView indexing") return models.AppBskyEmbedVideo.Main(video=blob, alt=alt_text) - except Exception as e: logging.error(f"❌ PDS-direct video upload failed: {repr(e)}") return None -# ============================================================ -# Media upload — Video via video.bsky.app (primary) -# ============================================================ -def _extract_service_auth_token(upload_auth) -> str | None: - token = getattr(upload_auth, "token", None) - if token: - return token - if isinstance(upload_auth, dict): - return upload_auth.get("token") - return None - - -def _poll_video_job(video_host: str, job_id: str, alt_text: str) -> models.AppBskyEmbedVideo.Main | None: +def _poll_video_job(video_host: str, job_id: str, alt_text: str): status_url = f"{video_host}/xrpc/app.bsky.video.getJobStatus" deadline = time.time() + 600 @@ -471,8 +458,8 @@ def _poll_video_job(video_host: str, job_id: str, alt_text: str) -> models.AppBs logging.error(f"❌ No blob in completed job status: {status_json}") return None - wait_with_heartbeat(8, label="CDN propagation") - blob_obj = normalize_blob_for_embed(blob_dict) + wait_with_heartbeat(8, "CDN propagation") + blob_obj = normalize_blob_for_embed(blob_dict) # <- BlobRef-safe logging.info("✅ Video processed successfully.") return models.AppBskyEmbedVideo.Main(video=blob_obj, alt=alt_text) @@ -492,7 +479,7 @@ def upload_video_via_bsky_service( video_path: str, service_url: str, alt_text: str = "", -) -> models.AppBskyEmbedVideo.Main | None: +): try: if not os.path.exists(video_path): logging.error(f"❌ Video file not found: {video_path}") @@ -504,9 +491,10 @@ def upload_video_via_bsky_service( size_mb = len(video_bytes) / (1024 * 1024) logging.info(f"🎬 [video.bsky.app] Uploading: {video_path} ({size_mb:.2f} MB)") - VIDEO_HOST = "https://video.bsky.app" + video_host = "https://video.bsky.app" pds_did = pds_did_from_service_url(service_url) + # getServiceAuth with correct aud + lxm try: params = models.ComAtprotoServerGetServiceAuth.Params( aud=pds_did, @@ -528,20 +516,19 @@ def upload_video_via_bsky_service( logging.error("❌ Failed to extract service auth token.") return None - user_did = client.me.did upload_name = random_video_name(".mp4") logging.info(f"🎞️ Upload name: {upload_name}") upload_url = ( - f"{VIDEO_HOST}/xrpc/app.bsky.video.uploadVideo" - f"?did={user_did}&name={upload_name}" + f"{video_host}/xrpc/app.bsky.video.uploadVideo" + f"?did={client.me.did}&name={upload_name}" ) headers = { "Authorization": f"Bearer {token}", "Content-Type": "video/mp4", } - upload_resp = requests.post(upload_url, headers=headers, data=video_bytes, timeout=180) + upload_resp = requests.post(upload_url, headers=headers, data=video_bytes, timeout=240) if upload_resp.status_code not in (200, 409): logging.error(f"❌ video.bsky.app upload failed: {upload_resp.status_code} - {upload_resp.text}") @@ -553,25 +540,22 @@ def upload_video_via_bsky_service( if body.get("error") == "already_exists" and body.get("jobId"): logging.info("ℹ️ Video already processed on video.bsky.app. Reusing existing job.") else: - logging.error(f"❌ video.bsky.app returned 409 without reusable jobId: {body}") + logging.error(f"❌ 409 without reusable jobId: {body}") return None job_id = body.get("jobId") if not job_id: - logging.error(f"❌ No jobId returned from video service. Response: {body}") + logging.error(f"❌ No jobId in upload response: {body}") return None logging.info(f"⏳ Job {job_id} accepted — polling status...") - return _poll_video_job(VIDEO_HOST, job_id, alt_text=alt_text) + return _poll_video_job(video_host, job_id, alt_text) except Exception as e: logging.error(f"❌ video.bsky.app upload failed: {repr(e)}") return None -# ============================================================ -# Video dispatcher -# ============================================================ def upload_video_smart( client: Client, video_path: str, @@ -579,7 +563,7 @@ def upload_video_smart( alt_text: str = "", settle_delay_seconds: float = 30.0, allow_pds_video_fallback: bool = False, -) -> models.AppBskyEmbedVideo.Main | None: +): logging.info(f"🌍 PDS ({service_url}). Trying video.bsky.app first.") embed = upload_video_via_bsky_service( client=client, @@ -599,12 +583,12 @@ def upload_video_smart( settle_delay_seconds=settle_delay_seconds, ) - logging.error("❌ video.bsky.app failed. Not using direct fallback unless enabled.") + logging.error("❌ video.bsky.app failed and fallback disabled.") return None # ============================================================ -# Post creation +# Posting # ============================================================ def post_to_bsky( client: Client, @@ -641,7 +625,7 @@ def post_to_bsky( return False elif image_path: - image = upload_image(client, image_path, alt_text=alt_text) + image = upload_image(client, image_path, alt_text) if not image: logging.error("❌ Aborting post: image upload failed.") return False @@ -655,7 +639,6 @@ def post_to_bsky( if langs: record["langs"] = langs - if embed_obj is not None: record["embed"] = model_to_dict(embed_obj) @@ -671,11 +654,7 @@ def post_to_bsky( ) except Exception: resp = client.com.atproto.repo.create_record( - { - "repo": client.me.did, - "collection": "app.bsky.feed.post", - "record": record, - } + {"repo": client.me.did, "collection": "app.bsky.feed.post", "record": record} ) uri = getattr(resp, "uri", None) or (resp.get("uri") if isinstance(resp, dict) else None) @@ -695,42 +674,22 @@ def main(): parser = argparse.ArgumentParser(description="Post text + optional image/video to Bluesky/federated PDS.") parser.add_argument("text", help="Post text content") - parser.add_argument("--username", required=True, help="Bluesky handle or email") + parser.add_argument("--username", required=True, help="Bluesky handle/email") parser.add_argument("--password", required=True, help="Bluesky app password") parser.add_argument("--service", default="https://bsky.social", help="PDS URL") - parser.add_argument("--lang", default="ca", help="Comma-separated language codes (e.g. ca,es)") + parser.add_argument("--lang", default="ca", help="Comma-separated language codes") parser.add_argument("--image", default=None, help="Path to image file") parser.add_argument("--video", default=None, help="Path to video file") parser.add_argument("--alt", default="", help="Alt text for media") - parser.add_argument( - "--video-settle-delay", - type=float, - default=30.0, - help="Seconds to wait after direct-PDS fallback upload before posting.", - ) - parser.add_argument( - "--allow-pds-video-fallback", - action="store_true", - help="Allow direct PDS video fallback if video.bsky.app fails.", - ) + parser.add_argument("--video-settle-delay", type=float, default=30.0) + parser.add_argument("--allow-pds-video-fallback", action="store_true") # Compression defaults ON - parser.add_argument( - "--compress-video", - dest="compress_video", - action="store_true", - default=True, - help="Compress video with ffmpeg before upload (default: enabled).", - ) - parser.add_argument( - "--no-compress-video", - dest="compress_video", - action="store_false", - help="Disable ffmpeg compression.", - ) - parser.add_argument("--max-video-mb", type=float, default=45.0, help="Target max size (MB) after compression.") - parser.add_argument("--ffmpeg-crf", type=int, default=28, help="ffmpeg CRF (lower=better quality, larger file).") - parser.add_argument("--ffmpeg-preset", default="veryfast", help="ffmpeg preset (ultrafast..veryslow).") + parser.add_argument("--compress-video", dest="compress_video", action="store_true", default=True) + parser.add_argument("--no-compress-video", dest="compress_video", action="store_false") + parser.add_argument("--max-video-mb", type=float, default=45.0) + parser.add_argument("--ffmpeg-crf", type=int, default=28) + parser.add_argument("--ffmpeg-preset", default="veryfast") args = parser.parse_args() @@ -740,7 +699,7 @@ def main(): client = Client(base_url=args.service) - success = login_with_backoff( + if not login_with_backoff( client=client, username=args.username, password=args.password, @@ -749,8 +708,7 @@ def main(): base_delay=RetryConfig.login_base_delay_seconds, max_delay=RetryConfig.login_max_delay_seconds, jitter=RetryConfig.login_jitter_seconds, - ) - if not success: + ): sys.exit(1) langs = [l.strip() for l in args.lang.split(",") if l.strip()] @@ -773,7 +731,7 @@ def main(): if compressed != args.video: temp_compressed_path = compressed - post_success = post_to_bsky( + ok = post_to_bsky( client=client, text=args.text, langs=langs, @@ -792,7 +750,7 @@ def main(): except Exception: pass - if not post_success: + if not ok: sys.exit(1)