From ef46ac5fd6ad776b4988b4b0b8490c0fb056e673 Mon Sep 17 00:00:00 2001 From: Guillem Hernandez Sola Date: Fri, 8 May 2026 19:34:02 +0000 Subject: [PATCH] revert 1f642299e51da8d31611ed8c548fc3cd02685b7f revert Text! --- twitter2bsky_daemon.py | 304 ++++++++++++----------------------------- 1 file changed, 88 insertions(+), 216 deletions(-) diff --git a/twitter2bsky_daemon.py b/twitter2bsky_daemon.py index 75d3a29..f2234ed 100644 --- a/twitter2bsky_daemon.py +++ b/twitter2bsky_daemon.py @@ -2,18 +2,18 @@ """ Post text + optional image/video to Bluesky/federated PDS. -Reliability strategy: -- Login via atproto SDK (only SDK usage). -- ALL media + record operations via raw XRPC (requests) to avoid - atproto SDK BlobRef serialization bugs across versions. -- Video uploads go through https://video.bsky.app (official path). -- getServiceAuth uses aud=did:web:, lxm=com.atproto.repo.uploadBlob. -- Handles 409 already_exists by reusing jobId. -- ffmpeg compression enabled by default. +Key reliability choices: +- Video uploads go through https://video.bsky.app first (best client playback compatibility). +- getServiceAuth uses: + aud = did:web: + lxm = com.atproto.repo.uploadBlob +- Handles 409 already_exists from video service by reusing jobId. +- Uses raw lexicon dict embeds (NO AppBskyEmbedVideo typed model), avoiding BlobRef SDK mismatch. +- Optional direct-PDS fallback for video. +- ffmpeg compression enabled by default (disable with --no-compress-video). """ import argparse -import json import logging import mimetypes import os @@ -124,99 +124,22 @@ def random_video_name(ext: str = ".mp4") -> str: return f"{int(time.time())}_{token}{ext}" -# --------------------------- -# Auth/session extraction (raw XRPC) -# --------------------------- -def get_session_info(client: Client) -> tuple[str, str, str]: - """ - Returns (did, access_jwt, pds_url) from the SDK client's session, - avoiding any lazy-loaded properties that may trigger BlobRef bugs. - """ - # atproto SDK exposes session via _session or me; pull safely. - did = None - access_jwt = None - - # Try common attribute paths across SDK versions - sess = getattr(client, "_session", None) or getattr(client, "session", None) - if sess is not None: - did = getattr(sess, "did", None) or (sess.get("did") if isinstance(sess, dict) else None) - access_jwt = ( - getattr(sess, "access_jwt", None) - or getattr(sess, "accessJwt", None) - or (sess.get("accessJwt") if isinstance(sess, dict) else None) - or (sess.get("access_jwt") if isinstance(sess, dict) else None) - ) - - if not did: - # Last resort: client.me โ€” but wrap to suppress BlobRef issues - try: - me = client.me - did = getattr(me, "did", None) - except Exception as e: - logging.error(f"โŒ Could not read client.me: {repr(e)}") - - if not access_jwt: - # Some SDK versions: client._session_dispatcher or similar - for attr in ("_access_jwt", "access_jwt"): - v = getattr(client, attr, None) - if v: - access_jwt = v - break - - if not did or not access_jwt: - raise RuntimeError("Unable to extract DID/accessJwt from atproto Client session.") - - pds_url = getattr(client, "_base_url", None) or getattr(client, "base_url", None) or "https://bsky.social" - return did, access_jwt, pds_url +def extract_token_from_service_auth(resp_obj) -> str | None: + tok = getattr(resp_obj, "token", None) + if tok: + return tok + if isinstance(resp_obj, dict): + return resp_obj.get("token") + return None -def xrpc_get(pds_url: str, access_jwt: str, method: str, params: dict, timeout: int = 30) -> dict: - url = f"{pds_url.rstrip('/')}/xrpc/{method}" - headers = {"Authorization": f"Bearer {access_jwt}"} - r = requests.get(url, headers=headers, params=params, timeout=timeout) - r.raise_for_status() - return r.json() - - -def xrpc_post_json(pds_url: str, access_jwt: str, method: str, body: dict, timeout: int = 60) -> dict: - url = f"{pds_url.rstrip('/')}/xrpc/{method}" - headers = { - "Authorization": f"Bearer {access_jwt}", - "Content-Type": "application/json", - } - r = requests.post(url, headers=headers, data=json.dumps(body), timeout=timeout) - if r.status_code >= 400: - raise RuntimeError(f"XRPC {method} failed: {r.status_code} {r.text}") - return r.json() - - -def xrpc_post_bytes(pds_url: str, access_jwt: str, method: str, data: bytes, content_type: str, timeout: int = 240) -> dict: - url = f"{pds_url.rstrip('/')}/xrpc/{method}" - headers = { - "Authorization": f"Bearer {access_jwt}", - "Content-Type": content_type, - } - r = requests.post(url, headers=headers, data=data, timeout=timeout) - if r.status_code >= 400: - raise RuntimeError(f"XRPC {method} failed: {r.status_code} {r.text}") - return r.json() - - -def get_service_auth_token(pds_url: str, access_jwt: str, aud: str, lxm: str, exp_seconds: int = 1800) -> str: - """ - Raw XRPC call to com.atproto.server.getServiceAuth โ€” avoids SDK typed coercion. - """ - body = xrpc_get( - pds_url=pds_url, - access_jwt=access_jwt, - method="com.atproto.server.getServiceAuth", - params={"aud": aud, "lxm": lxm, "exp": int(time.time()) + exp_seconds}, - timeout=30, - ) - token = body.get("token") - if not token: - raise RuntimeError(f"getServiceAuth returned no token: {body}") - return token +def extract_blob_from_upload_blob_result(resp_obj): + blob = getattr(resp_obj, "blob", None) + if blob is not None: + return blob + if isinstance(resp_obj, dict): + return resp_obj.get("blob") + return None # --------------------------- @@ -317,40 +240,34 @@ def compress_video_ffmpeg( # --------------------------- -# Image upload (raw XRPC) +# Media upload: image # --------------------------- -def upload_image_embed_dict( - pds_url: str, - access_jwt: str, - image_path: str, - alt_text: str = "", -) -> dict | None: +def upload_image_embed_dict(client: Client, image_path: str, alt_text: str = "") -> dict | None: if not os.path.exists(image_path): logging.error(f"โŒ Image file not found: {image_path}") return None mime, _ = mimetypes.guess_type(image_path) - mime = mime or "image/jpeg" with open(image_path, "rb") as f: data = f.read() - logging.info(f"๐Ÿ–ผ๏ธ Uploading image: {image_path} ({len(data)/1024:.1f} KB, {mime})") + logging.info(f"๐Ÿ–ผ๏ธ Uploading image: {image_path} ({len(data)/1024:.1f} KB, {mime or 'unknown'})") try: - body = xrpc_post_bytes( - pds_url=pds_url, - access_jwt=access_jwt, - method="com.atproto.repo.uploadBlob", - data=data, - content_type=mime, - timeout=180, - ) - blob = body.get("blob") - if not blob: - logging.error(f"โŒ uploadBlob returned no blob: {body}") + up = client.upload_blob(data) + blob = extract_blob_from_upload_blob_result(up) + if blob is None: + logging.error("โŒ uploadBlob returned no blob for image.") return None + + # Raw lexicon dict embed (cross-SDK safe) return { "$type": "app.bsky.embed.images", - "images": [{"alt": alt_text or "", "image": blob}], + "images": [ + { + "alt": alt_text or "", + "image": blob, + } + ], } except Exception as e: logging.error(f"โŒ Image upload failed: {repr(e)}") @@ -358,12 +275,10 @@ def upload_image_embed_dict( # --------------------------- -# Video upload via video.bsky.app +# Media upload: video via video.bsky.app (primary) # --------------------------- def upload_video_via_video_service_embed_dict( - did: str, - access_jwt: str, - pds_url: str, + client: Client, video_path: str, service_url: str, alt_text: str = "", @@ -381,30 +296,31 @@ def upload_video_via_video_service_embed_dict( video_host = "https://video.bsky.app" pds_did = pds_did_from_service_url(service_url) - # Service auth token from user's PDS + # getServiceAuth from user's PDS with correct audience + method binding try: - token = get_service_auth_token( - pds_url=pds_url, - access_jwt=access_jwt, - aud=pds_did, - lxm="com.atproto.repo.uploadBlob", - exp_seconds=1800, + auth_resp = client.com.atproto.server.get_service_auth( + {"aud": pds_did, "lxm": "com.atproto.repo.uploadBlob", "exp": int(time.time()) + 1800} ) except Exception as e: logging.error(f"โŒ getServiceAuth failed: {repr(e)}") return None + token = extract_token_from_service_auth(auth_resp) + if not token: + logging.error("โŒ getServiceAuth returned no token.") + return None + upload_name = random_video_name(".mp4") logging.info(f"๐ŸŽž๏ธ Upload name: {upload_name}") - upload_url = f"{video_host}/xrpc/app.bsky.video.uploadVideo?did={did}&name={upload_name}" + upload_url = f"{video_host}/xrpc/app.bsky.video.uploadVideo?did={client.me.did}&name={upload_name}" headers = { "Authorization": f"Bearer {token}", "Content-Type": "video/mp4", } try: - r = requests.post(upload_url, headers=headers, data=video_bytes, timeout=300) + r = requests.post(upload_url, headers=headers, data=video_bytes, timeout=240) except Exception as e: logging.error(f"โŒ video upload request failed: {repr(e)}") return None @@ -413,12 +329,9 @@ def upload_video_via_video_service_embed_dict( logging.error(f"โŒ video.bsky.app upload failed: {r.status_code} - {r.text}") return None - try: - payload = r.json() - except Exception as e: - logging.error(f"โŒ Could not parse upload response JSON: {repr(e)} body={r.text[:500]}") - return None + payload = r.json() + # Dedupe path: reuse existing job if r.status_code == 409: if payload.get("error") == "already_exists" and payload.get("jobId"): logging.info("โ„น๏ธ Video already processed on video.bsky.app. Reusing existing job.") @@ -435,16 +348,10 @@ def upload_video_via_video_service_embed_dict( status_url = f"{video_host}/xrpc/app.bsky.video.getJobStatus" deadline = time.time() + 600 # 10 min max poll - last_state = None while time.time() < deadline: try: - s = requests.get( - status_url, - params={"jobId": job_id}, - headers={"Authorization": f"Bearer {token}"}, - timeout=30, - ) + s = requests.get(status_url, params={"jobId": job_id}, timeout=30) except Exception as e: logging.warning(f"โš ๏ธ Status poll request failed once: {repr(e)}") time.sleep(3) @@ -454,19 +361,10 @@ def upload_video_via_video_service_embed_dict( logging.error(f"โŒ Job status failed: {s.status_code} - {s.text}") return None - try: - body = s.json() - except Exception as e: - logging.error(f"โŒ Could not parse status JSON: {repr(e)}") - return None - - job_status = body.get("jobStatus", {}) or {} + body = s.json() + job_status = body.get("jobStatus", {}) state = job_status.get("state") - if state != last_state: - logging.info(f" state โ†’ {state}") - last_state = state - if state == "JOB_STATE_COMPLETED": blob = job_status.get("blob") if not blob: @@ -475,7 +373,7 @@ def upload_video_via_video_service_embed_dict( wait_with_heartbeat(8, "CDN propagation") - # RAW embed dict โ€” no BlobRef typed conversion anywhere. + # RAW embed dict; no BlobRef conversion at all. return { "$type": "app.bsky.embed.video", "video": blob, @@ -486,6 +384,7 @@ def upload_video_via_video_service_embed_dict( logging.error(f"โŒ Video processing failed: {job_status}") return None + logging.info(f" ...still processing (state={state})...") time.sleep(3) logging.error("โŒ Video processing timed out.") @@ -493,11 +392,10 @@ def upload_video_via_video_service_embed_dict( # --------------------------- -# Direct PDS video fallback (rarely works on third-party PDS) +# Media upload: direct PDS fallback (optional) # --------------------------- def upload_video_via_pds_embed_dict( - pds_url: str, - access_jwt: str, + client: Client, video_path: str, alt_text: str = "", settle_delay_seconds: float = 30.0, @@ -509,20 +407,13 @@ def upload_video_via_pds_embed_dict( try: with open(video_path, "rb") as f: b = f.read() + size_mb = len(b) / (1024 * 1024) logging.warning(f"๐ŸŽฌ [PDS-direct fallback] Uploading: {video_path} ({size_mb:.2f} MB)") - - body = xrpc_post_bytes( - pds_url=pds_url, - access_jwt=access_jwt, - method="com.atproto.repo.uploadBlob", - data=b, - content_type="video/mp4", - timeout=600, # large timeout for direct PDS upload - ) - blob = body.get("blob") - if not blob: - logging.error(f"โŒ PDS uploadBlob returned no blob: {body}") + up = client.upload_blob(b) + blob = extract_blob_from_upload_blob_result(up) + if blob is None: + logging.error("โŒ PDS uploadBlob returned no blob.") return None wait_with_heartbeat(settle_delay_seconds, "PDS/AppView indexing") @@ -538,9 +429,7 @@ def upload_video_via_pds_embed_dict( def upload_video_smart_embed_dict( - did: str, - access_jwt: str, - pds_url: str, + client: Client, video_path: str, service_url: str, alt_text: str = "", @@ -549,9 +438,7 @@ def upload_video_smart_embed_dict( ) -> dict | None: logging.info(f"๐ŸŒ PDS ({service_url}). Trying video.bsky.app first.") embed = upload_video_via_video_service_embed_dict( - did=did, - access_jwt=access_jwt, - pds_url=pds_url, + client=client, video_path=video_path, service_url=service_url, alt_text=alt_text, @@ -562,8 +449,7 @@ def upload_video_smart_embed_dict( if allow_pds_video_fallback: logging.warning("โš ๏ธ video.bsky.app failed; trying direct PDS fallback.") return upload_video_via_pds_embed_dict( - pds_url=pds_url, - access_jwt=access_jwt, + client=client, video_path=video_path, alt_text=alt_text, settle_delay_seconds=settle_delay_seconds, @@ -574,12 +460,16 @@ def upload_video_smart_embed_dict( # --------------------------- -# Create post (raw XRPC) +# Create post # --------------------------- -def create_post_record(text: str, langs: list[str], embed_dict: dict | None = None) -> dict: +def create_post_record( + text: str, + langs: list[str], + embed_dict: dict | None = None, +) -> dict: record = { "$type": "app.bsky.feed.post", - "text": text.strip(), + "text": text.strip(), # must be plain string "createdAt": time.strftime("%Y-%m-%dT%H:%M:%S.000Z", time.gmtime()), } if langs: @@ -589,20 +479,17 @@ def create_post_record(text: str, langs: list[str], embed_dict: dict | None = No return record -def publish_post(pds_url: str, access_jwt: str, did: str, record: dict) -> bool: +def publish_post(client: Client, record: dict) -> bool: try: - body = xrpc_post_json( - pds_url=pds_url, - access_jwt=access_jwt, - method="com.atproto.repo.createRecord", - body={ - "repo": did, + # Use dict payload directly for max cross-version compatibility. + resp = client.com.atproto.repo.create_record( + { + "repo": client.me.did, "collection": "app.bsky.feed.post", "record": record, - }, - timeout=60, + } ) - uri = body.get("uri") + uri = getattr(resp, "uri", None) or (resp.get("uri") if isinstance(resp, dict) else None) logging.info(f"โœ… Post published! URI: {uri}") return True except Exception as e: @@ -625,9 +512,10 @@ def main(): parser.add_argument("--image", default=None, help="Image path") parser.add_argument("--video", default=None, help="Video path") parser.add_argument("--alt", default="", help="Alt text") - parser.add_argument("--video-settle-delay", type=float, default=30.0) + parser.add_argument("--video-settle-delay", type=float, default=30.0, help="Fallback indexing wait") parser.add_argument("--allow-pds-video-fallback", action="store_true") + # Compression ON by default parser.add_argument("--compress-video", dest="compress_video", action="store_true", default=True) parser.add_argument("--no-compress-video", dest="compress_video", action="store_false") parser.add_argument("--max-video-mb", type=float, default=45.0) @@ -650,17 +538,6 @@ def main(): ): sys.exit(1) - # Capture session info ONCE โ€” avoid lazy SDK property access later. - try: - did, access_jwt, pds_url = get_session_info(client) - except Exception as e: - logging.error(f"โŒ Could not extract session: {repr(e)}") - sys.exit(1) - - # Override pds_url with the user-specified service (in case SDK normalized it) - pds_url = args.service.rstrip("/") - logging.info(f"๐Ÿ†” DID: {did} | PDS: {pds_url}") - langs = [x.strip() for x in args.lang.split(",") if x.strip()] video_path_for_upload = args.video @@ -686,9 +563,7 @@ def main(): if video_path_for_upload: logging.info(f"๐ŸŽฌ Preparing video upload: {video_path_for_upload}") embed_dict = upload_video_smart_embed_dict( - did=did, - access_jwt=access_jwt, - pds_url=pds_url, + client=client, video_path=video_path_for_upload, service_url=args.service, alt_text=args.alt, @@ -702,20 +577,17 @@ def main(): sys.exit(1) elif args.image: - embed_dict = upload_image_embed_dict( - pds_url=pds_url, - access_jwt=access_jwt, - image_path=args.image, - alt_text=args.alt, - ) + embed_dict = upload_image_embed_dict(client=client, image_path=args.image, alt_text=args.alt) if embed_dict is None: logging.error("โŒ Aborting post: image upload failed.") + if temp_compressed_path and os.path.exists(temp_compressed_path): + os.remove(temp_compressed_path) sys.exit(1) record = create_post_record(text=args.text, langs=langs, embed_dict=embed_dict) logging.info(f"๐Ÿงพ Final record text={record.get('text')!r}, has_embed={'embed' in record}") - ok = publish_post(pds_url=pds_url, access_jwt=access_jwt, did=did, record=record) + ok = publish_post(client=client, record=record) if temp_compressed_path and os.path.exists(temp_compressed_path): try: