From 5d2cec8b3148e7e36c848066afdffa24c8552a3c Mon Sep 17 00:00:00 2001 From: Guillem Hernandez Sola Date: Fri, 8 May 2026 19:22:06 +0000 Subject: [PATCH] Test --- bsky_post.py | 575 +++++++++++++++++++++++---------------------------- 1 file changed, 258 insertions(+), 317 deletions(-) diff --git a/bsky_post.py b/bsky_post.py index 48c99d3..b374ec1 100644 --- a/bsky_post.py +++ b/bsky_post.py @@ -2,12 +2,13 @@ """ Post text + optional image/video to Bluesky/federated PDS. -Strategy: -- 100% raw XRPC via `requests` β€” no atproto SDK dependency at all. -- Avoids all SDK BlobRef/typed-model serialization bugs. -- Video uploads go through https://video.bsky.app (official path). -- getServiceAuth uses aud=did:web:, lxm=com.atproto.repo.uploadBlob. -- Handles 409 already_exists by reusing jobId. +Reliability features: +- Raw XRPC via requests (no atproto SDK serialization pitfalls). +- Hardened HTTP transport with retry adapter + longer read timeouts. +- Login fallback hosts via --auth-hosts (comma-separated). +- Video upload through https://video.bsky.app with proper service auth. +- 409 already_exists support (reuses jobId). +- Optional direct PDS fallback for video. - ffmpeg compression enabled by default. """ @@ -27,25 +28,47 @@ import time from urllib.parse import urlparse import requests +from requests.adapters import HTTPAdapter +from urllib3.util.retry import Retry -# --------------------------- +# ----------------------------------------------------------------------------- # Logging -# --------------------------- +# ----------------------------------------------------------------------------- def setup_logging() -> None: logging.basicConfig( format="%(asctime)s %(levelname)s %(message)s", level=logging.INFO, stream=sys.stdout, ) - # Silence noisy third-party libs - for noisy in ("httpx", "httpcore", "urllib3"): + for noisy in ("urllib3",): logging.getLogger(noisy).setLevel(logging.WARNING) -# --------------------------- -# Error classification -# --------------------------- +# ----------------------------------------------------------------------------- +# HTTP session with retries +# ----------------------------------------------------------------------------- +def build_http_session() -> requests.Session: + s = requests.Session() + retry = Retry( + total=4, + connect=4, + read=4, + backoff_factor=0.8, + status_forcelist=[429, 500, 502, 503, 504], + allowed_methods=frozenset(["GET", "POST"]), + raise_on_status=False, + ) + adapter = HTTPAdapter(max_retries=retry, pool_connections=20, pool_maxsize=20) + s.mount("https://", adapter) + s.mount("http://", adapter) + s.headers.update({"User-Agent": "post2bsky/1.0"}) + return s + + +# ----------------------------------------------------------------------------- +# Status helpers +# ----------------------------------------------------------------------------- def is_auth_error_status(status: int) -> bool: return status in (400, 401, 403) @@ -55,102 +78,143 @@ def is_rate_limited_status(status: int) -> bool: def is_transient_status(status: int) -> bool: - return status in (500, 502, 503, 504) + return status in (408, 425, 429, 500, 502, 503, 504) -# --------------------------- +# ----------------------------------------------------------------------------- # Raw XRPC helpers -# --------------------------- -def xrpc_get(pds_url: str, method: str, params: dict, access_jwt: str | None = None, timeout: int = 30) -> tuple[int, dict | str]: +# ----------------------------------------------------------------------------- +def xrpc_get( + http: requests.Session, + pds_url: str, + method: str, + params: dict, + access_jwt: str | None = None, + timeout=(10, 60), +) -> tuple[int, dict | str]: url = f"{pds_url.rstrip('/')}/xrpc/{method}" headers = {} if access_jwt: headers["Authorization"] = f"Bearer {access_jwt}" - r = requests.get(url, headers=headers, params=params, timeout=timeout) + r = http.get(url, headers=headers, params=params, timeout=timeout) try: return r.status_code, r.json() except Exception: return r.status_code, r.text -def xrpc_post_json(pds_url: str, method: str, body: dict, access_jwt: str | None = None, timeout: int = 60) -> tuple[int, dict | str]: +def xrpc_post_json( + http: requests.Session, + pds_url: str, + method: str, + body: dict, + access_jwt: str | None = None, + timeout=(10, 90), +) -> tuple[int, dict | str]: url = f"{pds_url.rstrip('/')}/xrpc/{method}" headers = {"Content-Type": "application/json"} if access_jwt: headers["Authorization"] = f"Bearer {access_jwt}" - r = requests.post(url, headers=headers, data=json.dumps(body), timeout=timeout) + r = http.post(url, headers=headers, data=json.dumps(body), timeout=timeout) try: return r.status_code, r.json() except Exception: return r.status_code, r.text -def xrpc_post_bytes(pds_url: str, method: str, data: bytes, content_type: str, access_jwt: str | None = None, timeout: int = 600) -> tuple[int, dict | str]: +def xrpc_post_bytes( + http: requests.Session, + pds_url: str, + method: str, + data: bytes, + content_type: str, + access_jwt: str | None = None, + timeout=(20, 900), +) -> tuple[int, dict | str]: url = f"{pds_url.rstrip('/')}/xrpc/{method}" headers = {"Content-Type": content_type} if access_jwt: headers["Authorization"] = f"Bearer {access_jwt}" - r = requests.post(url, headers=headers, data=data, timeout=timeout) + r = http.post(url, headers=headers, data=data, timeout=timeout) try: return r.status_code, r.json() except Exception: return r.status_code, r.text -# --------------------------- -# Login (raw XRPC, no SDK) -# --------------------------- -def login_with_backoff( - pds_url: str, +# ----------------------------------------------------------------------------- +# Login with fallback hosts +# ----------------------------------------------------------------------------- +def login_on_host( + http: requests.Session, + host_url: str, identifier: str, password: str, max_attempts: int = 5, - base_delay: float = 8.0, - max_delay: float = 120.0, + base_delay: float = 6.0, + max_delay: float = 60.0, ) -> dict | None: - """ - Returns session dict {did, accessJwt, refreshJwt, handle} or None. - """ for attempt in range(1, max_attempts + 1): try: - logging.info(f"πŸ”‘ Login attempt {attempt}/{max_attempts} β†’ {pds_url} as {identifier}") + logging.info(f"πŸ”‘ Login attempt {attempt}/{max_attempts} β†’ {host_url} as {identifier}") status, body = xrpc_post_json( - pds_url=pds_url, + http=http, + pds_url=host_url, method="com.atproto.server.createSession", body={"identifier": identifier, "password": password}, - timeout=30, + timeout=(10, 75), ) - if status == 200 and isinstance(body, dict) and body.get("accessJwt"): - logging.info("βœ… Login successful.") + if status == 200 and isinstance(body, dict) and body.get("accessJwt") and body.get("did"): + logging.info(f"βœ… Login successful on {host_url}.") return body - logging.error(f"❌ Login failed: HTTP {status} body={body}") + logging.error(f"❌ Login failed on {host_url}: HTTP {status} body={body}") if is_auth_error_status(status): - logging.error("❌ Authentication failed. Check handle/app-password.") return None - if attempt >= max_attempts: - return None - if is_rate_limited_status(status) or is_transient_status(status): - wait = min(base_delay * attempt, max_delay) + random.uniform(0.1, 1.2) - logging.warning(f"⏳ Retrying login in {wait:.1f}s...") + + if attempt < max_attempts and (is_rate_limited_status(status) or is_transient_status(status)): + wait = min(base_delay * attempt, max_delay) + random.uniform(0.1, 1.0) + logging.warning(f"⏳ Retrying login on {host_url} in {wait:.1f}s...") time.sleep(wait) - else: - return None + continue + + if attempt < max_attempts: + wait = min(base_delay * attempt, max_delay) + random.uniform(0.1, 1.0) + time.sleep(wait) + continue + + return None + except requests.RequestException as e: - logging.exception("❌ Login request error") + logging.warning(f"⚠️ Login request error on {host_url}: {repr(e)}") if attempt >= max_attempts: return None - wait = min(base_delay * attempt, max_delay) + random.uniform(0.1, 1.2) - logging.warning(f"⏳ Retrying login in {wait:.1f}s...") + wait = min(base_delay * attempt, max_delay) + random.uniform(0.1, 1.0) + logging.warning(f"⏳ Retrying login on {host_url} in {wait:.1f}s...") time.sleep(wait) + return None -# --------------------------- -# Generic helpers -# --------------------------- +def login_with_fallback_hosts( + http: requests.Session, + auth_hosts: list[str], + identifier: str, + password: str, +) -> tuple[dict | None, str | None]: + for host in auth_hosts: + session = login_on_host(http, host, identifier, password) + if session: + return session, host + logging.warning(f"⚠️ Auth host failed: {host}") + return None, None + + +# ----------------------------------------------------------------------------- +# Helpers +# ----------------------------------------------------------------------------- def wait_with_heartbeat(seconds: float, label: str) -> None: if seconds <= 0: return @@ -177,13 +241,21 @@ def random_video_name(ext: str = ".mp4") -> str: return f"{int(time.time())}_{token}{ext}" -def get_service_auth_token(pds_url: str, access_jwt: str, aud: str, lxm: str, exp_seconds: int = 1800) -> str | None: +def get_service_auth_token( + http: requests.Session, + pds_url_for_auth: str, + access_jwt: str, + aud: str, + lxm: str, + exp_seconds: int = 1800, +) -> str | None: status, body = xrpc_get( - pds_url=pds_url, + http=http, + pds_url=pds_url_for_auth, method="com.atproto.server.getServiceAuth", params={"aud": aud, "lxm": lxm, "exp": int(time.time()) + exp_seconds}, access_jwt=access_jwt, - timeout=30, + timeout=(10, 60), ) if status != 200 or not isinstance(body, dict): logging.error(f"❌ getServiceAuth failed: HTTP {status} body={body}") @@ -195,9 +267,9 @@ def get_service_auth_token(pds_url: str, access_jwt: str, aud: str, lxm: str, ex return token -# --------------------------- -# ffmpeg compression -# --------------------------- +# ----------------------------------------------------------------------------- +# ffmpeg +# ----------------------------------------------------------------------------- def ffmpeg_exists() -> bool: return shutil.which("ffmpeg") is not None @@ -233,7 +305,7 @@ def compress_video_ffmpeg( audio_bitrate_k: int = 96, ) -> str | None: if not ffmpeg_exists(): - logging.error("❌ ffmpeg not found in PATH. Install ffmpeg or use --no-compress-video.") + logging.error("❌ ffmpeg not found in PATH.") return None if not os.path.exists(input_path): logging.error(f"❌ Video file not found: {input_path}") @@ -256,23 +328,19 @@ def compress_video_ffmpeg( os.close(fd) cmd = [ - "ffmpeg", "-y", - "-i", input_path, - "-c:v", "libx264", - "-preset", preset, - "-crf", str(crf), + "ffmpeg", "-y", "-i", input_path, + "-c:v", "libx264", "-preset", preset, "-crf", str(crf), "-b:v", f"{target_video_k}k", "-maxrate", f"{int(target_video_k * 1.3)}k", "-bufsize", f"{int(target_video_k * 2)}k", "-vf", "scale='min(1280,iw)':-2", - "-c:a", "aac", - "-b:a", f"{audio_bitrate_k}k", + "-c:a", "aac", "-b:a", f"{audio_bitrate_k}k", "-movflags", "+faststart", out_path, ] try: - logging.info(f"πŸ› οΈ Compressing video (target≀{max_size_mb}MB, crf={crf}, preset={preset})...") + logging.info(f"πŸ› οΈ Compressing video (target≀{max_size_mb}MB)...") subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) out_size_mb = os.path.getsize(out_path) / (1024 * 1024) logging.info(f"βœ… Compressed size: {out_size_mb:.2f} MB") @@ -284,7 +352,7 @@ def compress_video_ffmpeg( except subprocess.CalledProcessError as e: logging.error("❌ ffmpeg compression failed.") if e.stderr: - logging.error(e.stderr[-2000:]) + logging.error(e.stderr[-1500:]) try: os.remove(out_path) except Exception: @@ -292,56 +360,45 @@ def compress_video_ffmpeg( return None -# --------------------------- -# Image upload -# --------------------------- -def upload_image_embed_dict( - pds_url: str, - access_jwt: str, - image_path: str, - alt_text: str = "", -) -> dict | None: +# ----------------------------------------------------------------------------- +# Uploads +# ----------------------------------------------------------------------------- +def upload_image_embed_dict(http: requests.Session, pds_url: str, access_jwt: str, image_path: str, alt_text: str) -> dict | None: if not os.path.exists(image_path): logging.error(f"❌ Image file not found: {image_path}") return None - mime, _ = mimetypes.guess_type(image_path) mime = mime or "image/jpeg" with open(image_path, "rb") as f: data = f.read() - logging.info(f"πŸ–ΌοΈ Uploading image: {image_path} ({len(data)/1024:.1f} KB, {mime})") status, body = xrpc_post_bytes( + http=http, pds_url=pds_url, method="com.atproto.repo.uploadBlob", data=data, content_type=mime, access_jwt=access_jwt, - timeout=180, + timeout=(20, 240), ) - if status != 200 or not isinstance(body, dict): + if status != 200 or not isinstance(body, dict) or not body.get("blob"): logging.error(f"❌ Image uploadBlob failed: HTTP {status} body={body}") return None - blob = body.get("blob") - if not blob: - logging.error(f"❌ uploadBlob returned no blob: {body}") - return None + return { "$type": "app.bsky.embed.images", - "images": [{"alt": alt_text or "", "image": blob}], + "images": [{"alt": alt_text or "", "image": body["blob"]}], } -# --------------------------- -# Video upload via video.bsky.app -# --------------------------- def upload_video_via_video_service_embed_dict( + http: requests.Session, did: str, access_jwt: str, - pds_url: str, - video_path: str, + pds_url_for_auth: str, service_url: str, - alt_text: str = "", + video_path: str, + alt_text: str, ) -> dict | None: if not os.path.exists(video_path): logging.error(f"❌ Video file not found: {video_path}") @@ -353,11 +410,10 @@ def upload_video_via_video_service_embed_dict( size_mb = len(video_bytes) / (1024 * 1024) logging.info(f"🎬 [video.bsky.app] Uploading: {video_path} ({size_mb:.2f} MB)") - video_host = "https://video.bsky.app" pds_did = pds_did_from_service_url(service_url) - token = get_service_auth_token( - pds_url=pds_url, + http=http, + pds_url_for_auth=pds_url_for_auth, access_jwt=access_jwt, aud=pds_did, lxm="com.atproto.repo.uploadBlob", @@ -367,18 +423,13 @@ def upload_video_via_video_service_embed_dict( return None upload_name = random_video_name(".mp4") - logging.info(f"🎞️ Upload name: {upload_name}") - - upload_url = f"{video_host}/xrpc/app.bsky.video.uploadVideo?did={did}&name={upload_name}" - headers = { - "Authorization": f"Bearer {token}", - "Content-Type": "video/mp4", - } + upload_url = f"https://video.bsky.app/xrpc/app.bsky.video.uploadVideo?did={did}&name={upload_name}" + headers = {"Authorization": f"Bearer {token}", "Content-Type": "video/mp4"} try: - r = requests.post(upload_url, headers=headers, data=video_bytes, timeout=300) + r = http.post(upload_url, headers=headers, data=video_bytes, timeout=(20, 360)) except requests.RequestException as e: - logging.error(f"❌ video upload request failed: {repr(e)}") + logging.error(f"❌ video.bsky.app upload request failed: {repr(e)}") return None if r.status_code not in (200, 409): @@ -387,314 +438,204 @@ def upload_video_via_video_service_embed_dict( try: payload = r.json() - except Exception as e: - logging.error(f"❌ Could not parse upload response JSON: {repr(e)} body={r.text[:500]}") + except Exception: + logging.error(f"❌ Invalid JSON from upload response: {r.text[:400]}") return None if r.status_code == 409: if payload.get("error") == "already_exists" and payload.get("jobId"): - logging.info("ℹ️ Video already processed on video.bsky.app. Reusing existing job.") + logging.info("ℹ️ Video already exists. Reusing job.") else: logging.error(f"❌ 409 without reusable jobId: {payload}") return None job_id = payload.get("jobId") if not job_id: - logging.error(f"❌ No jobId in video upload response: {payload}") + logging.error(f"❌ Missing jobId: {payload}") return None - logging.info(f"⏳ Job {job_id} accepted β€” polling status...") - - status_url = f"{video_host}/xrpc/app.bsky.video.getJobStatus" - deadline = time.time() + 600 # 10 min max poll - last_state = None - completed_blob = None + status_url = "https://video.bsky.app/xrpc/app.bsky.video.getJobStatus" + deadline = time.time() + 600 while time.time() < deadline: try: - s = requests.get( + s = http.get( status_url, params={"jobId": job_id}, headers={"Authorization": f"Bearer {token}"}, - timeout=30, + timeout=(10, 45), ) except requests.RequestException as e: - logging.warning(f"⚠️ Status poll request failed once: {repr(e)}") + logging.warning(f"⚠️ Poll error: {repr(e)}") time.sleep(3) continue if s.status_code != 200: - logging.error(f"❌ Job status failed: {s.status_code} - {s.text}") + logging.error(f"❌ getJobStatus failed: {s.status_code} - {s.text}") return None - try: - body = s.json() - except Exception as e: - logging.error(f"❌ Could not parse status JSON: {repr(e)}") - return None - - job_status = body.get("jobStatus", {}) or {} - state = job_status.get("state") - - if state != last_state: - logging.info(f" state β†’ {state}") - last_state = state - - if state == "JOB_STATE_COMPLETED": - completed_blob = job_status.get("blob") - if not completed_blob: - logging.error(f"❌ Completed job without blob: {body}") + body = s.json() + st = (body.get("jobStatus") or {}).get("state") + if st == "JOB_STATE_COMPLETED": + blob = (body.get("jobStatus") or {}).get("blob") + if not blob: + logging.error(f"❌ Completed without blob: {body}") return None - break - - if state == "JOB_STATE_FAILED": - logging.error(f"❌ Video processing failed: {job_status}") + wait_with_heartbeat(8, "CDN propagation") + embed = {"$type": "app.bsky.embed.video", "video": blob} + if alt_text: + embed["alt"] = alt_text + return embed + if st == "JOB_STATE_FAILED": + logging.error(f"❌ Video processing failed: {body}") return None + logging.info(f" ...still processing (state={st})...") time.sleep(3) - if completed_blob is None: - logging.error("❌ Video processing timed out.") - return None - - # Wait AFTER successful poll β€” outside the loop, no SDK in scope - wait_with_heartbeat(8, "CDN propagation") - - # Build pure dict embed - embed = { - "$type": "app.bsky.embed.video", - "video": completed_blob, - } - if alt_text: - embed["alt"] = alt_text - return embed - - -# --------------------------- -# Direct PDS video fallback -# --------------------------- -def upload_video_via_pds_embed_dict( - pds_url: str, - access_jwt: str, - video_path: str, - alt_text: str = "", - settle_delay_seconds: float = 30.0, -) -> dict | None: - if not os.path.exists(video_path): - logging.error(f"❌ Video file not found: {video_path}") - return None - - with open(video_path, "rb") as f: - b = f.read() - size_mb = len(b) / (1024 * 1024) - logging.warning(f"🎬 [PDS-direct fallback] Uploading: {video_path} ({size_mb:.2f} MB)") - - status, body = xrpc_post_bytes( - pds_url=pds_url, - method="com.atproto.repo.uploadBlob", - data=b, - content_type="video/mp4", - access_jwt=access_jwt, - timeout=900, - ) - if status != 200 or not isinstance(body, dict): - logging.error(f"❌ PDS uploadBlob failed: HTTP {status} body={body}") - return None - - blob = body.get("blob") - if not blob: - logging.error(f"❌ PDS uploadBlob returned no blob: {body}") - return None - - wait_with_heartbeat(settle_delay_seconds, "PDS/AppView indexing") - - embed = { - "$type": "app.bsky.embed.video", - "video": blob, - } - if alt_text: - embed["alt"] = alt_text - return embed - - -def upload_video_smart_embed_dict( - did: str, - access_jwt: str, - pds_url: str, - video_path: str, - service_url: str, - alt_text: str = "", - settle_delay_seconds: float = 30.0, - allow_pds_video_fallback: bool = False, -) -> dict | None: - logging.info(f"🌍 PDS ({service_url}). Trying video.bsky.app first.") - embed = upload_video_via_video_service_embed_dict( - did=did, - access_jwt=access_jwt, - pds_url=pds_url, - video_path=video_path, - service_url=service_url, - alt_text=alt_text, - ) - if embed: - return embed - - if allow_pds_video_fallback: - logging.warning("⚠️ video.bsky.app failed; trying direct PDS fallback.") - return upload_video_via_pds_embed_dict( - pds_url=pds_url, - access_jwt=access_jwt, - video_path=video_path, - alt_text=alt_text, - settle_delay_seconds=settle_delay_seconds, - ) - - logging.error("❌ video.bsky.app failed and fallback is disabled.") + logging.error("❌ Video processing timed out.") return None -# --------------------------- -# Create post -# --------------------------- -def create_post_record(text: str, langs: list[str], embed_dict: dict | None = None) -> dict: - record = { +# ----------------------------------------------------------------------------- +# Post +# ----------------------------------------------------------------------------- +def create_post_record(text: str, langs: list[str], embed_dict: dict | None) -> dict: + r = { "$type": "app.bsky.feed.post", "text": text.strip(), "createdAt": time.strftime("%Y-%m-%dT%H:%M:%S.000Z", time.gmtime()), } if langs: - record["langs"] = langs - if embed_dict is not None: - record["embed"] = embed_dict - return record + r["langs"] = langs + if embed_dict: + r["embed"] = embed_dict + return r -def publish_post(pds_url: str, access_jwt: str, did: str, record: dict) -> bool: +def publish_post(http: requests.Session, pds_url: str, access_jwt: str, did: str, record: dict) -> bool: status, body = xrpc_post_json( + http=http, pds_url=pds_url, method="com.atproto.repo.createRecord", - body={ - "repo": did, - "collection": "app.bsky.feed.post", - "record": record, - }, + body={"repo": did, "collection": "app.bsky.feed.post", "record": record}, access_jwt=access_jwt, - timeout=60, + timeout=(10, 90), ) if status != 200 or not isinstance(body, dict): logging.error(f"❌ createRecord failed: HTTP {status} body={body}") return False - uri = body.get("uri") - logging.info(f"βœ… Post published! URI: {uri}") + logging.info(f"βœ… Post published! URI: {body.get('uri')}") return True -# --------------------------- -# Main -# --------------------------- def main(): setup_logging() - - parser = argparse.ArgumentParser(description="Post text + optional image/video to Bluesky/federated PDS.") - parser.add_argument("text", help="Post text") - parser.add_argument("--username", required=True, help="Handle/email") - parser.add_argument("--password", required=True, help="App password") - parser.add_argument("--service", default="https://bsky.social", help="PDS URL") - parser.add_argument("--lang", default="ca", help="Comma-separated language codes") - parser.add_argument("--image", default=None, help="Image path") - parser.add_argument("--video", default=None, help="Video path") - parser.add_argument("--alt", default="", help="Alt text") - parser.add_argument("--video-settle-delay", type=float, default=30.0) + parser = argparse.ArgumentParser() + parser.add_argument("text") + parser.add_argument("--username", required=True) + parser.add_argument("--password", required=True) + parser.add_argument("--service", default="https://eurosky.social") + parser.add_argument("--auth-hosts", default="", help="Comma-separated auth hosts fallback, e.g. https://eurosky.social,https://bsky.social") + parser.add_argument("--lang", default="ca") + parser.add_argument("--image", default=None) + parser.add_argument("--video", default=None) + parser.add_argument("--alt", default="") parser.add_argument("--allow-pds-video-fallback", action="store_true") + parser.add_argument("--video-settle-delay", type=float, default=30.0) parser.add_argument("--compress-video", dest="compress_video", action="store_true", default=True) parser.add_argument("--no-compress-video", dest="compress_video", action="store_false") parser.add_argument("--max-video-mb", type=float, default=45.0) parser.add_argument("--ffmpeg-crf", type=int, default=28) parser.add_argument("--ffmpeg-preset", default="veryfast") - args = parser.parse_args() if args.image and args.video: logging.error("❌ Use either --image or --video, not both.") sys.exit(1) - pds_url = args.service.rstrip("/") + service_url = args.service.rstrip("/") + auth_hosts = [x.strip().rstrip("/") for x in args.auth_hosts.split(",") if x.strip()] + if not auth_hosts: + auth_hosts = [service_url, "https://bsky.social"] - session = login_with_backoff( - pds_url=pds_url, + http = build_http_session() + + session, auth_host_used = login_with_fallback_hosts( + http=http, + auth_hosts=auth_hosts, identifier=args.username, password=args.password, ) if not session: + logging.error("❌ Login failed on all auth hosts.") sys.exit(1) did = session["did"] access_jwt = session["accessJwt"] - logging.info(f"πŸ†” DID: {did} | PDS: {pds_url}") + logging.info(f"πŸ†” DID: {did}") + logging.info(f"πŸ” Auth host used: {auth_host_used}") + logging.info(f"πŸ“‘ Service host for repo operations: {service_url}") langs = [x.strip() for x in args.lang.split(",") if x.strip()] video_path_for_upload = args.video - temp_compressed_path = None - + tmp_compressed = None if args.video and args.compress_video: - compressed = compress_video_ffmpeg( - input_path=args.video, - max_size_mb=args.max_video_mb, - crf=args.ffmpeg_crf, - preset=args.ffmpeg_preset, - audio_bitrate_k=96, - ) - if compressed is None: - logging.error("❌ Compression failed; aborting.") + c = compress_video_ffmpeg(args.video, args.max_video_mb, args.ffmpeg_crf, args.ffmpeg_preset, 96) + if c is None: sys.exit(1) - video_path_for_upload = compressed - if compressed != args.video: - temp_compressed_path = compressed - - embed_dict = None + video_path_for_upload = c + if c != args.video: + tmp_compressed = c + embed = None if video_path_for_upload: - logging.info(f"🎬 Preparing video upload: {video_path_for_upload}") - embed_dict = upload_video_smart_embed_dict( + embed = upload_video_via_video_service_embed_dict( + http=http, did=did, access_jwt=access_jwt, - pds_url=pds_url, + pds_url_for_auth=auth_host_used, + service_url=service_url, video_path=video_path_for_upload, - service_url=pds_url, alt_text=args.alt, - settle_delay_seconds=args.video_settle_delay, - allow_pds_video_fallback=args.allow_pds_video_fallback, ) - if embed_dict is None: - logging.error("❌ Aborting post: video upload/processing failed.") - if temp_compressed_path and os.path.exists(temp_compressed_path): - os.remove(temp_compressed_path) + if embed is None and args.allow_pds_video_fallback: + logging.warning("⚠️ Falling back to direct PDS video upload.") + status, body = xrpc_post_bytes( + http=http, + pds_url=service_url, + method="com.atproto.repo.uploadBlob", + data=open(video_path_for_upload, "rb").read(), + content_type="video/mp4", + access_jwt=access_jwt, + timeout=(20, 900), + ) + if status == 200 and isinstance(body, dict) and body.get("blob"): + wait_with_heartbeat(args.video_settle_delay, "PDS/AppView indexing") + embed = {"$type": "app.bsky.embed.video", "video": body["blob"]} + if args.alt: + embed["alt"] = args.alt + + if embed is None: + logging.error("❌ Aborting post: video upload failed.") + if tmp_compressed and os.path.exists(tmp_compressed): + os.remove(tmp_compressed) sys.exit(1) elif args.image: - embed_dict = upload_image_embed_dict( - pds_url=pds_url, - access_jwt=access_jwt, - image_path=args.image, - alt_text=args.alt, - ) - if embed_dict is None: - logging.error("❌ Aborting post: image upload failed.") + embed = upload_image_embed_dict(http, service_url, access_jwt, args.image, args.alt) + if embed is None: + if tmp_compressed and os.path.exists(tmp_compressed): + os.remove(tmp_compressed) sys.exit(1) - record = create_post_record(text=args.text, langs=langs, embed_dict=embed_dict) - logging.info(f"🧾 Final record text={record.get('text')!r}, has_embed={'embed' in record}") + record = create_post_record(args.text, langs, embed) + ok = publish_post(http, service_url, access_jwt, did, record) - ok = publish_post(pds_url=pds_url, access_jwt=access_jwt, did=did, record=record) - - if temp_compressed_path and os.path.exists(temp_compressed_path): - try: - os.remove(temp_compressed_path) - logging.info(f"🧹 Removed temp file: {temp_compressed_path}") - except Exception: - pass + if tmp_compressed and os.path.exists(tmp_compressed): + os.remove(tmp_compressed) if not ok: sys.exit(1)