#!/usr/bin/env python3
"""Publish a single Bluesky post: text plus an optional image OR video.

The script logs in through the ``atproto`` SDK client, optionally uploads one
media blob (images are recompressed with Pillow when too large, videos are
always transcoded with ffmpeg) and then sends the post. Login, blob upload
and post creation are each wrapped in their own retry loop.

Credentials and defaults can come from CLI flags or from the environment
(``BSKY_HANDLE``, ``BSKY_APP_PASSWORD``, ``BSKY_BASE_URL``, ``BSKY_LANGS``);
a ``.env`` file is loaded via ``python-dotenv``.
"""

import argparse
import io
import json
import logging
import os
import random
import re
import subprocess
import sys
import tempfile
import time

from dotenv import load_dotenv
from PIL import Image
from atproto import Client, client_utils, models

# --- Configuration ---
LOG_PATH = "bsky_single_post.log"
DEFAULT_BSKY_BASE_URL = "https://bsky.social"
DEFAULT_BSKY_LANGS = ["ca"]

BSKY_TEXT_MAX_LENGTH = 300

MAX_VIDEO_UPLOAD_SIZE_MB = 45
VIDEO_MAX_DURATION_SECONDS = 179
SUBPROCESS_TIMEOUT_SECONDS = 240
FFPROBE_TIMEOUT_SECONDS = 20

BSKY_IMAGE_MAX_BYTES = 950 * 1024
BSKY_IMAGE_MAX_DIMENSION = 2000
BSKY_IMAGE_MIN_JPEG_QUALITY = 45

BSKY_BLOB_UPLOAD_MAX_RETRIES = 5
BSKY_BLOB_UPLOAD_BASE_DELAY = 10
BSKY_BLOB_UPLOAD_MAX_DELAY = 300
BSKY_BLOB_TRANSIENT_ERROR_RETRIES = 3
BSKY_BLOB_TRANSIENT_ERROR_DELAY = 15

BSKY_SEND_POST_MAX_RETRIES = 3
BSKY_SEND_POST_BASE_DELAY = 5
BSKY_SEND_POST_MAX_DELAY = 60

BSKY_LOGIN_MAX_RETRIES = 5
BSKY_LOGIN_BASE_DELAY = 10
BSKY_LOGIN_MAX_DELAY = 600
BSKY_LOGIN_JITTER_MAX = 1.5

# --- Logging ---
# Every message goes both to LOG_PATH and to the console.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[logging.FileHandler(LOG_PATH, encoding="utf-8"), logging.StreamHandler()],
    level=logging.INFO,
)


# --- Text helpers ---
def clean_post_text(text: str) -> str:
    """Normalise whitespace in *text*.

    Line endings become ``\\n``, runs of spaces/tabs collapse to one space,
    three or more consecutive newlines collapse to two, and the result is
    stripped. Falsy input yields ``""``.
    """
    if not text:
        return ""
    # Handle CRLF before lone CR; replacing "\r" alone would turn every
    # "\r\n" into "\n\n" and double the line breaks of Windows-style input.
    text = text.replace("\r\n", "\n").replace("\r", "\n")
    text = re.sub(r"[ \t]+", " ", text)
    text = re.sub(r"\n{3,}", "\n\n", text)
    return text.strip()


def truncate_text_safely(text: str, max_length: int = BSKY_TEXT_MAX_LENGTH) -> str:
    """Cut *text* to at most *max_length* characters, preferring a word boundary.

    The cut falls on the last space inside the limit when that space lies
    beyond index 20; otherwise the text is cut hard at *max_length*.
    """
    if len(text) <= max_length:
        return text

    truncated = text[:max_length]
    last_space = truncated.rfind(" ")
    if last_space > 20:
        return truncated[:last_space]
    return truncated


def make_rich(content: str):
    """Return a ``client_utils.TextBuilder`` holding the cleaned *content*.

    The text is fed to the builder word by word, re-inserting the single
    spaces and newlines that separate words and lines.
    """
    text_builder = client_utils.TextBuilder()
    lines = clean_post_text(content).splitlines()

    for li, line in enumerate(lines):
        words = line.split(" ")
        for wi, word in enumerate(words):
            text_builder.text(word)
            if wi < len(words) - 1:
                text_builder.text(" ")
        if li < len(lines) - 1:
            text_builder.text("\n")

    return text_builder


# --- Error helpers ---
# All classifiers below work on substrings of repr(error), so they are
# heuristics rather than exact status-code checks.
def is_rate_limited_error(error_obj) -> bool:
    """Return True when repr(*error_obj*) looks like an HTTP 429 / rate limit."""
    t = repr(error_obj).lower()
    return "429" in t or "ratelimitexceeded" in t or "too many requests" in t or "rate limit" in t


def is_auth_error(error_obj) -> bool:
    """Return True when repr(*error_obj*) looks like a 401/403 or bad credentials."""
    t = repr(error_obj).lower()
    return "401" in t or "403" in t or "invalid identifier or password" in t


def is_network_error(error_obj) -> bool:
    """Return True when repr(*error_obj*) names a connection/timeout failure or a 502/503/504."""
    t = repr(error_obj)
    signals = [
        "ConnectError",
        "RemoteProtocolError",
        "ReadTimeout",
        "WriteTimeout",
        "TimeoutException",
        "503",
        "502",
        "504",
        "ConnectionResetError",
    ]
    return any(sig in t for sig in signals)


def is_transient_error(error_obj) -> bool:
    """Return True when repr(*error_obj*) names a timeout/protocol failure or a 502/503/504."""
    t = repr(error_obj)
    transient = [
        "InvokeTimeoutError",
        "ReadTimeout",
        "WriteTimeout",
        "TimeoutException",
        "RemoteProtocolError",
        "ConnectError",
        "503",
        "502",
        "504",
    ]
    return any(s in t for s in transient)


def get_rate_limit_wait_seconds(error_obj, default_delay):
    """Work out how long to wait after a rate-limit error.

    Looks for ``retry-after`` and then ``ratelimit-reset`` (treated as a Unix
    timestamp) in the error's headers, then falls back to scraping a
    ``retry-after`` value out of repr(*error_obj*). Header-derived waits are
    capped at ``BSKY_LOGIN_MAX_DELAY``. Returns *default_delay* when nothing
    usable is found.
    """
    try:
        now_ts = int(time.time())
        # NOTE(review): SDK request errors presumably carry the HTTP headers on
        # `.response.headers` rather than `.headers` -- confirm against atproto.
        headers = (
            getattr(error_obj, "headers", None)
            or getattr(getattr(error_obj, "response", None), "headers", None)
            or {}
        )
        retry_after = headers.get("retry-after") or headers.get("Retry-After")
        if retry_after:
            return min(max(int(retry_after), 1), BSKY_LOGIN_MAX_DELAY)
        reset_value = headers.get("ratelimit-reset") or headers.get("RateLimit-Reset")
        if reset_value:
            wait_seconds = max(int(reset_value) - now_ts + 1, default_delay)
            return min(wait_seconds, BSKY_LOGIN_MAX_DELAY)
    except (AttributeError, TypeError, ValueError):
        # Unusable headers (wrong type, non-numeric value): fall through to
        # the repr-based fallback below.
        pass

    m = re.search(r"'retry-after': '(\d+)'", repr(error_obj), re.IGNORECASE)
    if m:
        return min(max(int(m.group(1)), 1), BSKY_LOGIN_MAX_DELAY)

    return default_delay


# --- Bluesky client ---
def create_bsky_client(base_url, handle, password):
    """Create an SDK client for *base_url* and log in, retrying on soft failures.

    Rate-limit and network/transient errors are retried up to
    ``BSKY_LOGIN_MAX_RETRIES`` attempts with jittered delays; authentication
    errors and unclassified errors are re-raised immediately.

    Returns:
        The logged-in ``Client``.

    Raises:
        Exception: the last login error once it is not (or no longer) retryable.
        RuntimeError: if the loop ends without a result.
    """
    base = (base_url or DEFAULT_BSKY_BASE_URL).strip().rstrip("/")
    logging.info(f"πŸ” Connecting Bluesky client via base URL: {base}")
    client = Client(base_url=base)

    for attempt in range(1, BSKY_LOGIN_MAX_RETRIES + 1):
        try:
            logging.info(f"πŸ” Bluesky login attempt {attempt}/{BSKY_LOGIN_MAX_RETRIES} for {handle}")
            client.login(handle, password)
            logging.info("βœ… Bluesky login successful.")
            return client

        except Exception as e:
            logging.exception("❌ Bluesky login exception")
            can_retry = attempt < BSKY_LOGIN_MAX_RETRIES

            # Rate limiting is tested before auth: is_auth_error() matches the
            # bare substrings "401"/"403", which can also occur inside a 429
            # response (e.g. in a reset timestamp) and would otherwise abort
            # the retry loop as "invalid credentials".
            if is_rate_limited_error(e):
                if can_retry:
                    wait = get_rate_limit_wait_seconds(e, BSKY_LOGIN_BASE_DELAY) + random.uniform(0, BSKY_LOGIN_JITTER_MAX)
                    logging.warning(f"⏳ Login rate-limited. Retrying in {wait:.1f}s...")
                    time.sleep(wait)
                    continue
                raise

            if is_auth_error(e):
                logging.error("❌ Invalid Bluesky credentials.")
                raise

            if is_network_error(e) or is_transient_error(e):
                if can_retry:
                    wait = min(BSKY_LOGIN_BASE_DELAY * attempt, BSKY_LOGIN_MAX_DELAY) + random.uniform(0, BSKY_LOGIN_JITTER_MAX)
                    logging.warning(f"⏳ Login transient failure. Retrying in {wait:.1f}s...")
                    time.sleep(wait)
                    continue
                raise

            raise

    raise RuntimeError("Bluesky login failed after retries.")


# --- Blob upload retries ---
def upload_blob_with_retry(client, binary_data, media_label="media"):
    """Upload *binary_data* through ``client.upload_blob`` with retries.

    Rate-limit errors back off exponentially (or follow the server hint);
    transient errors get up to ``BSKY_BLOB_TRANSIENT_ERROR_RETRIES`` linearly
    spaced retries. *media_label* is only used in log messages.

    Returns:
        The ``blob`` attribute of the upload result, or ``None`` on failure.
    """
    last_exception = None
    transient_attempts = 0

    for attempt in range(1, BSKY_BLOB_UPLOAD_MAX_RETRIES + 1):
        try:
            result = client.upload_blob(binary_data)
            return result.blob
        except Exception as e:
            last_exception = e
            can_retry = attempt < BSKY_BLOB_UPLOAD_MAX_RETRIES

            if is_rate_limited_error(e):
                if not can_retry:
                    break
                backoff = min(BSKY_BLOB_UPLOAD_BASE_DELAY * (2 ** (attempt - 1)), BSKY_BLOB_UPLOAD_MAX_DELAY)
                wait = get_rate_limit_wait_seconds(e, backoff)
                logging.warning(f"⏳ Blob upload rate-limited for {media_label}. Retry in {wait}s.")
                time.sleep(wait)
                continue

            if is_transient_error(e) and transient_attempts < BSKY_BLOB_TRANSIENT_ERROR_RETRIES:
                if not can_retry:
                    # No attempt left: sleeping would only delay the failure.
                    break
                transient_attempts += 1
                wait = BSKY_BLOB_TRANSIENT_ERROR_DELAY * transient_attempts
                logging.warning(f"⏳ Blob transient error for {media_label}. Retry in {wait}s.")
                time.sleep(wait)
                continue

            logging.warning(f"⚠️ Blob upload failed for {media_label}: {repr(e)}")
            return None

    logging.warning(f"⚠️ Blob upload exhausted retries for {media_label}: {repr(last_exception)}")
    return None


def send_post_with_retry(client, **kwargs):
    """Call ``client.send_post(**kwargs)``, retrying rate-limit and transient errors.

    Returns:
        Whatever ``client.send_post`` returns.

    Raises:
        Exception: the last error once attempts are exhausted or the error is
            not retryable.
    """
    last_exception = None

    for attempt in range(1, BSKY_SEND_POST_MAX_RETRIES + 1):
        try:
            return client.send_post(**kwargs)
        except Exception as e:
            last_exception = e
            if attempt >= BSKY_SEND_POST_MAX_RETRIES:
                raise

            if is_rate_limited_error(e):
                backoff = min(BSKY_SEND_POST_BASE_DELAY * (2 ** (attempt - 1)), BSKY_SEND_POST_MAX_DELAY)
                wait = get_rate_limit_wait_seconds(e, backoff)
                logging.warning(f"⏳ send_post rate-limited. Retry in {wait}s.")
                time.sleep(wait)
                continue

            if is_transient_error(e):
                wait = BSKY_SEND_POST_BASE_DELAY * attempt
                logging.warning(f"⏳ send_post transient error. Retry in {wait}s.")
                time.sleep(wait)
                continue

            raise

    # Only reachable when BSKY_SEND_POST_MAX_RETRIES < 1 (the loop never ran).
    if last_exception is None:
        raise RuntimeError("send_post was not attempted: BSKY_SEND_POST_MAX_RETRIES < 1.")
    raise last_exception


# --- Image helpers ---
def compress_post_image_to_limit(image_bytes, max_bytes=BSKY_IMAGE_MAX_BYTES):
    """Re-encode *image_bytes* as JPEG no larger than *max_bytes*.

    The image is converted to RGB, downscaled so its longest side is at most
    ``BSKY_IMAGE_MAX_DIMENSION``, then saved at decreasing quality levels
    until one fits.

    Returns:
        The JPEG bytes, or ``None`` if no quality level fits or the image
        cannot be processed.
    """
    try:
        with Image.open(io.BytesIO(image_bytes)) as img:
            img = img.convert("RGB")
            w, h = img.size
            max_dim = max(w, h)

            if max_dim > BSKY_IMAGE_MAX_DIMENSION:
                scale = BSKY_IMAGE_MAX_DIMENSION / max_dim
                img = img.resize((max(1, int(w * scale)), max(1, int(h * scale))), Image.LANCZOS)

            for q in [90, 82, 75, 68, 60, 52, BSKY_IMAGE_MIN_JPEG_QUALITY]:
                out = io.BytesIO()
                img.save(out, format="JPEG", quality=q, optimize=True, progressive=True)
                data = out.getvalue()
                if len(data) <= max_bytes:
                    return data
    except Exception as e:
        logging.warning(f"⚠️ Could not compress image: {repr(e)}")
    return None


def get_blob_from_file_image(client, path):
    """Read the image at *path*, compress it if needed, and upload it.

    Returns:
        The uploaded blob, or ``None`` if the file is missing, cannot be
        compressed under ``BSKY_IMAGE_MAX_BYTES``, or the upload fails.
    """
    if not os.path.exists(path):
        logging.error(f"❌ Image not found: {path}")
        return None

    with open(path, "rb") as f:
        content = f.read()

    upload_bytes = content
    if len(content) > BSKY_IMAGE_MAX_BYTES:
        logging.info("πŸ–ΌοΈ Image exceeds safe limit, compressing...")
        compressed = compress_post_image_to_limit(content, BSKY_IMAGE_MAX_BYTES)
        if not compressed:
            logging.error("❌ Could not compress image enough.")
            return None
        upload_bytes = compressed

    return upload_blob_with_retry(client, upload_bytes, media_label=path)


# --- Video helpers ---
def remove_file_quietly(path):
    """Delete *path* if it exists; removal failures are ignored (best effort)."""
    if path and os.path.exists(path):
        try:
            os.remove(path)
        except OSError:
            pass


def probe_video_duration(file_path):
    """Return the duration of *file_path* in seconds, as reported by ffprobe.

    Raises:
        RuntimeError: if ffprobe exits non-zero or prints nothing.
        subprocess.TimeoutExpired: after ``FFPROBE_TIMEOUT_SECONDS``.
    """
    cmd = [
        "ffprobe", "-v", "error",
        "-show_entries", "format=duration",
        "-of", "default=noprint_wrappers=1:nokey=1",
        file_path,
    ]
    result = subprocess.run(cmd, capture_output=True, text=True, timeout=FFPROBE_TIMEOUT_SECONDS)
    if result.returncode != 0 or not result.stdout.strip():
        raise RuntimeError("ffprobe failed to read duration")
    return float(result.stdout.strip())


def transcode_video_for_bsky(input_path, output_path):
    """Transcode *input_path* to H.264/AAC MP4 at *output_path* with ffmpeg.

    Width is capped at 1280 px. Returns the ``CompletedProcess``; the caller
    inspects ``returncode`` and ``stderr``.
    """
    cmd = [
        "ffmpeg", "-y",
        "-i", input_path,
        "-vf", "scale='min(1280,iw)':-2",
        "-c:v", "libx264",
        "-preset", "veryfast",
        "-crf", "28",
        "-maxrate", "1800k",
        "-bufsize", "3600k",
        "-c:a", "aac",
        "-b:a", "128k",
        "-movflags", "+faststart",
        output_path,
    ]
    return subprocess.run(cmd, capture_output=True, text=True, timeout=SUBPROCESS_TIMEOUT_SECONDS)


def prepare_video_file(video_path):
    """Transcode *video_path* into a temporary MP4 suitable for upload.

    Returns:
        ``(temp_path, True)`` on success; the caller owns the temp file and
        must delete it. ``(None, None)`` if the source is missing, the
        transcode fails, or the result exceeds ``MAX_VIDEO_UPLOAD_SIZE_MB``.
    """
    if not os.path.exists(video_path):
        logging.error(f"❌ Video not found: {video_path}")
        return None, None

    size_mb = os.path.getsize(video_path) / (1024 * 1024)
    logging.info(f"🎬 Source video size: {size_mb:.2f} MB")

    # Always transcode to maximize compatibility.
    # mkstemp + close instead of NamedTemporaryFile(delete=False).name, which
    # leaves its file object open until garbage collection.
    fd, temp_out = tempfile.mkstemp(suffix=".mp4")
    os.close(fd)

    try:
        res = transcode_video_for_bsky(video_path, temp_out)
    except (OSError, subprocess.SubprocessError) as e:
        # ffmpeg missing from PATH or the transcode timed out.
        logging.error(f"❌ ffmpeg transcode failed:\n{repr(e)}")
        remove_file_quietly(temp_out)
        return None, None

    if res.returncode != 0:
        logging.error(f"❌ ffmpeg transcode failed:\n{res.stderr[-1200:]}")
        remove_file_quietly(temp_out)
        return None, None

    out_size_mb = os.path.getsize(temp_out) / (1024 * 1024)
    logging.info(f"βœ… Prepared video size: {out_size_mb:.2f} MB")

    if out_size_mb > MAX_VIDEO_UPLOAD_SIZE_MB:
        logging.error(f"❌ Video still too large after transcode ({out_size_mb:.2f} MB > {MAX_VIDEO_UPLOAD_SIZE_MB} MB).")
        remove_file_quietly(temp_out)
        return None, None

    # Optional duration check: advisory only, never blocks the upload.
    try:
        duration = probe_video_duration(temp_out)
        if duration > VIDEO_MAX_DURATION_SECONDS:
            logging.warning(f"⚠️ Video duration {duration:.1f}s exceeds recommended {VIDEO_MAX_DURATION_SECONDS}s.")
    except (OSError, ValueError, RuntimeError, subprocess.SubprocessError) as e:
        logging.warning(f"⚠️ Could not probe video duration: {repr(e)}")

    return temp_out, True


def build_video_embed_raw(video_blob, alt_text):
    """Build an ``app.bsky.embed.video`` embed as a plain dict.

    ``alt`` is only included when *alt_text* is truthy.
    """
    # RAW embed dict avoids SDK BlobRef issues
    embed = {
        "$type": "app.bsky.embed.video",
        "video": video_blob,
    }
    if alt_text:
        embed["alt"] = alt_text
    return embed


# --- Main post function ---
def post_single(client, text, langs, image_path=None, video_path=None, alt_text=""):
    """Publish one post with *text* and at most one media attachment.

    Args:
        client: a logged-in SDK client.
        text: post text; cleaned and truncated to ``BSKY_TEXT_MAX_LENGTH``.
        langs: list of language codes passed through to ``send_post``.
        image_path: optional local image to attach.
        video_path: optional local video to attach.
        alt_text: alt text for the media; defaults to "Image" / "Video".

    Returns:
        ``True`` if the post was sent, ``False`` if media preparation or
        upload failed.

    Raises:
        ValueError: if both *image_path* and *video_path* are given.
        Exception: whatever ``send_post_with_retry`` re-raises.
    """
    clean = truncate_text_safely(clean_post_text(text), BSKY_TEXT_MAX_LENGTH)
    rich_text = make_rich(clean)

    if image_path and video_path:
        raise ValueError("Use either image or video, not both.")

    # Text + image
    if image_path:
        blob = get_blob_from_file_image(client, image_path)
        if not blob:
            logging.error("❌ Image upload failed.")
            return False

        embed = models.AppBskyEmbedImages.Main(
            images=[models.AppBskyEmbedImages.Image(alt=alt_text or "Image", image=blob)]
        )
        resp = send_post_with_retry(client, text=rich_text, embed=embed, langs=langs)
        logging.info(f"βœ… Posted text+image: {getattr(resp, 'uri', None)}")
        return True

    # Text + video
    if video_path:
        prepared_path, is_temp = prepare_video_file(video_path)
        if not prepared_path:
            return False

        try:
            with open(prepared_path, "rb") as f:
                b = f.read()

            video_blob = upload_blob_with_retry(client, b, media_label=prepared_path)
            if not video_blob:
                logging.error("❌ Video blob upload failed.")
                return False

            raw_video_embed = build_video_embed_raw(video_blob, alt_text or "Video")
            resp = send_post_with_retry(client, text=rich_text, embed=raw_video_embed, langs=langs)
            logging.info(f"βœ… Posted text+video: {getattr(resp, 'uri', None)}")
            return True
        finally:
            # The transcoded copy is ours to clean up whether or not posting worked.
            if is_temp:
                remove_file_quietly(prepared_path)

    # Text only fallback
    resp = send_post_with_retry(client, text=rich_text, langs=langs)
    logging.info(f"βœ… Posted text only: {getattr(resp, 'uri', None)}")
    return True


def main():
    """CLI entry point: parse arguments, log in, publish, exit non-zero on failure.

    CLI flags take precedence over the ``BSKY_HANDLE``, ``BSKY_APP_PASSWORD``,
    ``BSKY_BASE_URL`` and ``BSKY_LANGS`` environment variables.
    """
    load_dotenv()

    parser = argparse.ArgumentParser(description="Single Bluesky post: text + image OR video")
    parser.add_argument("--text", required=True, help="Post text")
    parser.add_argument("--image", default=None, help="Local image path")
    parser.add_argument("--video", default=None, help="Local video path")
    parser.add_argument("--alt", default="", help="Alt text for media")
    parser.add_argument("--bsky-handle", default=None, help="Bluesky handle")
    parser.add_argument("--bsky-password", default=None, help="Bluesky app password")
    parser.add_argument("--bsky-base-url", default=None, help="PDS URL (e.g. https://eurosky.social)")
    parser.add_argument("--bsky-langs", default=None, help="Comma-separated langs, e.g. ca,es")
    args = parser.parse_args()

    handle = args.bsky_handle or os.getenv("BSKY_HANDLE")
    password = args.bsky_password or os.getenv("BSKY_APP_PASSWORD")
    base_url = (args.bsky_base_url or os.getenv("BSKY_BASE_URL") or DEFAULT_BSKY_BASE_URL).strip()

    raw_langs = args.bsky_langs or os.getenv("BSKY_LANGS")
    langs = [x.strip() for x in raw_langs.split(",") if x.strip()] if raw_langs else DEFAULT_BSKY_LANGS

    if not handle or not password:
        logging.error("❌ Missing credentials: --bsky-handle/BSKY_HANDLE and --bsky-password/BSKY_APP_PASSWORD are required.")
        sys.exit(1)

    if args.image and args.video:
        logging.error("❌ Use either --image or --video, not both.")
        sys.exit(1)

    client = create_bsky_client(base_url, handle, password)

    ok = post_single(
        client=client,
        text=args.text,
        langs=langs,
        image_path=args.image,
        video_path=args.video,
        alt_text=args.alt,
    )

    if not ok:
        sys.exit(1)


# NOTE(review): no `if __name__ == "__main__": main()` guard is visible in the
# reviewed text -- confirm it exists below, otherwise the script never runs.