Test Moar Fixes

This commit is contained in:
2026-05-08 18:59:24 +00:00
parent a04b5994d9
commit 83f8d28f3c

View File

@@ -1,20 +1,16 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
""" """
bsky_post.py — Post text + optional image/video to Bluesky/federated PDS. Post text + optional image/video to Bluesky/federated PDS.
This script is designed to be robust across different atproto SDK versions and Key reliability choices:
federated PDS setups. It includes: - Video uploads go through https://video.bsky.app first (best client playback compatibility).
- getServiceAuth uses:
- Login retry/backoff aud = did:web:<your-pds-host>
- Video upload via video.bsky.app (primary, recommended for playback compatibility) lxm = com.atproto.repo.uploadBlob
- Correct service auth parameters: - Handles 409 already_exists from video service by reusing jobId.
- aud = did:web:<your-pds-host> - Uses raw lexicon dict embeds (NO AppBskyEmbedVideo typed model), avoiding BlobRef SDK mismatch.
- lxm = com.atproto.repo.uploadBlob - Optional direct-PDS fallback for video.
- Handles 409 already_exists from video service by reusing jobId - ffmpeg compression enabled by default (disable with --no-compress-video).
- Avoids hard dependency on models.BlobRef (cross-version compatibility)
- Optional direct PDS video fallback
- Optional ffmpeg compression (enabled by default)
- Explicit createRecord payload with guaranteed plain-string "text"
""" """
import argparse import argparse
@@ -22,7 +18,6 @@ import logging
import mimetypes import mimetypes
import os import os
import random import random
import re
import secrets import secrets
import shutil import shutil
import string import string
@@ -30,29 +25,16 @@ import subprocess
import sys import sys
import tempfile import tempfile
import time import time
from dataclasses import dataclass
from urllib.parse import urlparse from urllib.parse import urlparse
import requests import requests
from atproto import Client, models from atproto import Client
# ----------------------------------------------------------------------------- # ---------------------------
# Retry configuration for login/backoff behavior # Logging
# ----------------------------------------------------------------------------- # ---------------------------
@dataclass(frozen=True)
class RetryConfig:
login_max_attempts: int = 5
login_base_delay_seconds: float = 10.0
login_max_delay_seconds: float = 600.0
login_jitter_seconds: float = 3.0
# -----------------------------------------------------------------------------
# Logging setup
# -----------------------------------------------------------------------------
def setup_logging() -> None: def setup_logging() -> None:
"""Configure structured logging to stdout."""
logging.basicConfig( logging.basicConfig(
format="%(asctime)s %(levelname)s %(message)s", format="%(asctime)s %(levelname)s %(message)s",
level=logging.INFO, level=logging.INFO,
@@ -60,60 +42,37 @@ def setup_logging() -> None:
) )
# ----------------------------------------------------------------------------- # ---------------------------
# Error classification helpers # Login helpers
# ----------------------------------------------------------------------------- # ---------------------------
def is_rate_limited_error(error_obj) -> bool: def is_auth_error(exc: Exception) -> bool:
"""Detect common rate-limit signatures from exceptions.""" t = repr(exc).lower()
t = repr(error_obj).lower()
return "429" in t or "ratelimit" in t or "too many requests" in t
def is_auth_error(error_obj) -> bool:
"""Detect authentication/authorization failures."""
t = repr(error_obj).lower()
return "401" in t or "403" in t or "invalid identifier or password" in t return "401" in t or "403" in t or "invalid identifier or password" in t
def is_network_error(error_obj) -> bool: def is_rate_limited_error(exc: Exception) -> bool:
"""Detect transient network/transport/server errors.""" t = repr(exc).lower()
t = repr(error_obj) return "429" in t or "ratelimit" in t or "too many requests" in t
return any(x in t for x in [
def is_transient_error(exc: Exception) -> bool:
t = repr(exc)
needles = [
"ConnectError", "RemoteProtocolError", "ReadTimeout", "WriteTimeout", "ConnectError", "RemoteProtocolError", "ReadTimeout", "WriteTimeout",
"TimeoutException", "503", "502", "504", "ConnectionResetError", "TimeoutException", "InvokeTimeoutError", "502", "503", "504",
"InvokeTimeoutError" ]
]) return any(n in t for n in needles)
def get_rate_limit_wait_seconds(error_obj, default_delay: float, max_delay: float) -> float:
"""
Try to extract a smart retry delay from response headers, else return default.
"""
try:
now_ts = int(time.time())
headers = getattr(error_obj, "headers", None) or {}
retry_after = headers.get("retry-after") or headers.get("Retry-After")
if retry_after:
return min(max(float(retry_after), 1.0), max_delay)
reset_value = headers.get("ratelimit-reset") or headers.get("RateLimit-Reset")
if reset_value:
wait_seconds = max(float(reset_value) - now_ts + 1.0, default_delay)
return min(wait_seconds, max_delay)
except Exception:
pass
return default_delay
# -----------------------------------------------------------------------------
# Login with retries and backoff
# -----------------------------------------------------------------------------
def login_with_backoff( def login_with_backoff(
client: Client, username: str, password: str, service_url: str, client: Client,
max_attempts: int = 5, base_delay: float = 10.0, max_delay: float = 600.0, jitter: float = 1.5 username: str,
password: str,
service_url: str,
max_attempts: int = 5,
base_delay: float = 8.0,
max_delay: float = 120.0,
) -> bool: ) -> bool:
"""
Attempt login with exponential-ish backoff and jitter for transient failures.
"""
for attempt in range(1, max_attempts + 1): for attempt in range(1, max_attempts + 1):
try: try:
logging.info(f"🔑 Login attempt {attempt}/{max_attempts}{service_url} as {username}") logging.info(f"🔑 Login attempt {attempt}/{max_attempts}{service_url} as {username}")
@@ -121,38 +80,29 @@ def login_with_backoff(
logging.info("✅ Login successful.") logging.info("✅ Login successful.")
return True return True
except Exception as e: except Exception as e:
logging.exception("❌ Login exception") logging.exception("❌ Login failed")
if is_auth_error(e): if is_auth_error(e):
logging.error("❌ Authentication failed.") logging.error("❌ Authentication failed. Check handle/app-password.")
return False return False
if is_rate_limited_error(e): if attempt >= max_attempts:
if attempt < max_attempts:
wait = get_rate_limit_wait_seconds(e, base_delay, max_delay) + random.uniform(0, jitter)
logging.warning(f"⏳ Rate limited, retrying in {wait:.1f}s...")
time.sleep(wait)
continue
return False return False
if is_network_error(e): if is_rate_limited_error(e) or is_transient_error(e):
if attempt < max_attempts: wait = min(base_delay * attempt, max_delay) + random.uniform(0.1, 1.2)
wait = min(base_delay * attempt, max_delay) + random.uniform(0, jitter) logging.warning(f"⏳ Retrying login in {wait:.1f}s...")
logging.warning(f"⏳ Network/transient error, retrying in {wait:.1f}s...")
time.sleep(wait) time.sleep(wait)
continue else:
return False return False
return False return False
# ----------------------------------------------------------------------------- # ---------------------------
# Generic utility helpers # Generic helpers
# ----------------------------------------------------------------------------- # ---------------------------
def wait_with_heartbeat(total_seconds: float, label: str) -> None: def wait_with_heartbeat(seconds: float, label: str) -> None:
""" if seconds <= 0:
Sleep with periodic logging, useful for processing/indexing waits.
"""
if total_seconds <= 0:
return return
logging.info(f"⏳ Waiting {total_seconds:.0f}s for {label}...") logging.info(f"⏳ Waiting {seconds:.0f}s for {label}...")
remaining = total_seconds remaining = seconds
while remaining > 0: while remaining > 0:
step = min(5.0, remaining) step = min(5.0, remaining)
time.sleep(step) time.sleep(step)
@@ -163,10 +113,6 @@ def wait_with_heartbeat(total_seconds: float, label: str) -> None:
def pds_did_from_service_url(service_url: str) -> str: def pds_did_from_service_url(service_url: str) -> str:
"""
Convert service URL host to did:web:<host> used as getServiceAuth.aud.
Example: https://eurosky.social -> did:web:eurosky.social
"""
host = (urlparse(service_url).hostname or "").lower() host = (urlparse(service_url).hostname or "").lower()
if not host: if not host:
raise ValueError(f"Invalid --service URL: {service_url}") raise ValueError(f"Invalid --service URL: {service_url}")
@@ -174,68 +120,67 @@ def pds_did_from_service_url(service_url: str) -> str:
def random_video_name(ext: str = ".mp4") -> str: def random_video_name(ext: str = ".mp4") -> str:
"""Generate a random upload filename for video service requests."""
token = "".join(secrets.choice(string.ascii_lowercase + string.digits) for _ in range(12)) token = "".join(secrets.choice(string.ascii_lowercase + string.digits) for _ in range(12))
return f"{int(time.time())}_{token}{ext}" return f"{int(time.time())}_{token}{ext}"
def detect_mime_type(path: str) -> str: def extract_token_from_service_auth(resp_obj) -> str | None:
"""Best-effort MIME type detection.""" tok = getattr(resp_obj, "token", None)
mime, _ = mimetypes.guess_type(path) if tok:
return mime or "application/octet-stream" return tok
if isinstance(resp_obj, dict):
return resp_obj.get("token")
def _extract_service_auth_token(upload_auth) -> str | None:
"""Extract token from typed/dict getServiceAuth responses."""
token = getattr(upload_auth, "token", None)
if token:
return token
if isinstance(upload_auth, dict):
return upload_auth.get("token")
return None return None
# ----------------------------------------------------------------------------- def extract_blob_from_upload_blob_result(resp_obj):
# ffmpeg compression helpers blob = getattr(resp_obj, "blob", None)
# ----------------------------------------------------------------------------- if blob is not None:
return blob
if isinstance(resp_obj, dict):
return resp_obj.get("blob")
return None
# ---------------------------
# ffmpeg compression
# ---------------------------
def ffmpeg_exists() -> bool: def ffmpeg_exists() -> bool:
"""Check if ffmpeg is installed."""
return shutil.which("ffmpeg") is not None return shutil.which("ffmpeg") is not None
def ffprobe_exists() -> bool: def ffprobe_exists() -> bool:
"""Check if ffprobe is installed."""
return shutil.which("ffprobe") is not None return shutil.which("ffprobe") is not None
def get_video_duration_seconds(path: str) -> float | None: def get_video_duration_seconds(path: str) -> float | None:
"""Read video duration via ffprobe; return None if unavailable."""
if not ffprobe_exists(): if not ffprobe_exists():
return None return None
try: try:
out = subprocess.check_output([ out = subprocess.check_output(
[
"ffprobe", "-v", "error", "ffprobe", "-v", "error",
"-show_entries", "format=duration", "-show_entries", "format=duration",
"-of", "default=noprint_wrappers=1:nokey=1", "-of", "default=noprint_wrappers=1:nokey=1",
path path,
], stderr=subprocess.STDOUT, text=True).strip() ],
stderr=subprocess.STDOUT,
text=True,
).strip()
return float(out) return float(out)
except Exception: except Exception:
return None return None
def compress_video_ffmpeg( def compress_video_ffmpeg(
input_path: str, max_size_mb: float = 45.0, crf: int = 28, preset: str = "veryfast", audio_bitrate_k: int = 96 input_path: str,
max_size_mb: float = 45.0,
crf: int = 28,
preset: str = "veryfast",
audio_bitrate_k: int = 96,
) -> str | None: ) -> str | None:
"""
Compress video to H.264/AAC MP4, attempting to fit target size.
Returns:
- compressed temp path (if smaller),
- original input path (if already small / compressed not smaller),
- None on failure.
"""
if not ffmpeg_exists(): if not ffmpeg_exists():
logging.error("❌ ffmpeg not found. Install ffmpeg or use --no-compress-video.") logging.error("❌ ffmpeg not found in PATH. Install ffmpeg or use --no-compress-video.")
return None return None
if not os.path.exists(input_path): if not os.path.exists(input_path):
logging.error(f"❌ Video file not found: {input_path}") logging.error(f"❌ Video file not found: {input_path}")
@@ -244,17 +189,16 @@ def compress_video_ffmpeg(
src_size_mb = os.path.getsize(input_path) / (1024 * 1024) src_size_mb = os.path.getsize(input_path) / (1024 * 1024)
logging.info(f"📦 Source video size: {src_size_mb:.2f} MB") logging.info(f"📦 Source video size: {src_size_mb:.2f} MB")
if src_size_mb <= max_size_mb: if src_size_mb <= max_size_mb:
logging.info("✅ Already below size target. Skipping compression.") logging.info("✅ Already below target size. Skipping compression.")
return input_path return input_path
duration = get_video_duration_seconds(input_path) duration = get_video_duration_seconds(input_path)
target_video_k = 1200 # fallback target bitrate target_video_k = 1200
if duration and duration > 0: if duration and duration > 0:
total_kbps = (max_size_mb * 8192.0) / duration total_kbps = (max_size_mb * 8192.0) / duration
target_video_k = int(max(300, total_kbps - audio_bitrate_k)) target_video_k = int(max(300, total_kbps - audio_bitrate_k))
target_video_k = min(max(target_video_k, 300), 5000) target_video_k = min(max(target_video_k, 300), 5000)
# Create a temporary output file for compressed video
fd, out_path = tempfile.mkstemp(prefix="bsky_vid_", suffix=".mp4") fd, out_path = tempfile.mkstemp(prefix="bsky_vid_", suffix=".mp4")
os.close(fd) os.close(fd)
@@ -279,12 +223,10 @@ def compress_video_ffmpeg(
subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
out_size_mb = os.path.getsize(out_path) / (1024 * 1024) out_size_mb = os.path.getsize(out_path) / (1024 * 1024)
logging.info(f"✅ Compressed size: {out_size_mb:.2f} MB") logging.info(f"✅ Compressed size: {out_size_mb:.2f} MB")
# Keep compressed only if it is truly smaller
if out_size_mb < src_size_mb: if out_size_mb < src_size_mb:
return out_path return out_path
os.remove(out_path) os.remove(out_path)
logging.info(" Compression not smaller than source. Using original.")
return input_path return input_path
except subprocess.CalledProcessError as e: except subprocess.CalledProcessError as e:
logging.error("❌ ffmpeg compression failed.") logging.error("❌ ffmpeg compression failed.")
@@ -297,20 +239,50 @@ def compress_video_ffmpeg(
return None return None
# ----------------------------------------------------------------------------- # ---------------------------
# Video upload paths # Media upload: image
# ----------------------------------------------------------------------------- # ---------------------------
def upload_video_via_bsky_service(client: Client, video_path: str, service_url: str, alt_text: str = "") -> dict | None: def upload_image_embed_dict(client: Client, image_path: str, alt_text: str = "") -> dict | None:
""" if not os.path.exists(image_path):
Primary video upload path through https://video.bsky.app. logging.error(f"❌ Image file not found: {image_path}")
return None
IMPORTANT: mime, _ = mimetypes.guess_type(image_path)
- getServiceAuth.aud must be your PDS DID (did:web:<pds-host>) with open(image_path, "rb") as f:
- getServiceAuth.lxm must be "com.atproto.repo.uploadBlob" data = f.read()
Returns raw embed dict: logging.info(f"🖼️ Uploading image: {image_path} ({len(data)/1024:.1f} KB, {mime or 'unknown'})")
{"$type":"app.bsky.embed.video","video":<blob>,"alt":"..."} try:
""" up = client.upload_blob(data)
blob = extract_blob_from_upload_blob_result(up)
if blob is None:
logging.error("❌ uploadBlob returned no blob for image.")
return None
# Raw lexicon dict embed (cross-SDK safe)
return {
"$type": "app.bsky.embed.images",
"images": [
{
"alt": alt_text or "",
"image": blob,
}
],
}
except Exception as e:
logging.error(f"❌ Image upload failed: {repr(e)}")
return None
# ---------------------------
# Media upload: video via video.bsky.app (primary)
# ---------------------------
def upload_video_via_video_service_embed_dict(
client: Client,
video_path: str,
service_url: str,
alt_text: str = "",
) -> dict | None:
if not os.path.exists(video_path): if not os.path.exists(video_path):
logging.error(f"❌ Video file not found: {video_path}") logging.error(f"❌ Video file not found: {video_path}")
return None return None
@@ -324,78 +296,84 @@ def upload_video_via_bsky_service(client: Client, video_path: str, service_url:
video_host = "https://video.bsky.app" video_host = "https://video.bsky.app"
pds_did = pds_did_from_service_url(service_url) pds_did = pds_did_from_service_url(service_url)
# Request service-auth token from user's PDS for uploadBlob lexicon method # getServiceAuth from user's PDS with correct audience + method binding
try: try:
params = models.ComAtprotoServerGetServiceAuth.Params( auth_resp = client.com.atproto.server.get_service_auth(
aud=pds_did, {"aud": pds_did, "lxm": "com.atproto.repo.uploadBlob", "exp": int(time.time()) + 1800}
lxm="com.atproto.repo.uploadBlob",
exp=int(time.time()) + 60 * 30,
) )
upload_auth = client.com.atproto.server.get_service_auth(params) except Exception as e:
except Exception: logging.error(f"❌ getServiceAuth failed: {repr(e)}")
# Fallback for SDK variants expecting dict input return None
upload_auth = client.com.atproto.server.get_service_auth({
"aud": pds_did,
"lxm": "com.atproto.repo.uploadBlob",
"exp": int(time.time()) + 60 * 30,
})
token = _extract_service_auth_token(upload_auth) token = extract_token_from_service_auth(auth_resp)
if not token: if not token:
logging.error(" Could not get service auth token.") logging.error("❌ getServiceAuth returned no token.")
return None return None
upload_name = random_video_name(".mp4") upload_name = random_video_name(".mp4")
logging.info(f"🎞️ Upload name: {upload_name}") logging.info(f"🎞️ Upload name: {upload_name}")
upload_url = f"{video_host}/xrpc/app.bsky.video.uploadVideo?did={client.me.did}&name={upload_name}" upload_url = f"{video_host}/xrpc/app.bsky.video.uploadVideo?did={client.me.did}&name={upload_name}"
headers = {"Authorization": f"Bearer {token}", "Content-Type": "video/mp4"} headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "video/mp4",
}
# Upload video bytes to video service try:
resp = requests.post(upload_url, headers=headers, data=video_bytes, timeout=240) r = requests.post(upload_url, headers=headers, data=video_bytes, timeout=240)
except Exception as e:
# 200 = accepted, 409 already_exists = dedupe hit (still usable with jobId) logging.error(f"❌ video upload request failed: {repr(e)}")
if resp.status_code not in (200, 409):
logging.error(f"❌ video.bsky.app upload failed: {resp.status_code} - {resp.text}")
return None return None
body = resp.json() if r.status_code not in (200, 409):
if resp.status_code == 409: logging.error(f"❌ video.bsky.app upload failed: {r.status_code} - {r.text}")
if body.get("error") == "already_exists" and body.get("jobId"): return None
logging.info(" Video already processed on video.bsky.app. Reusing job.")
payload = r.json()
# Dedupe path: reuse existing job
if r.status_code == 409:
if payload.get("error") == "already_exists" and payload.get("jobId"):
logging.info(" Video already processed on video.bsky.app. Reusing existing job.")
else: else:
logging.error(f"❌ 409 without reusable jobId: {body}") logging.error(f"❌ 409 without reusable jobId: {payload}")
return None return None
job_id = body.get("jobId") job_id = payload.get("jobId")
if not job_id: if not job_id:
logging.error(f"Missing jobId in upload response: {body}") logging.error(f"No jobId in video upload response: {payload}")
return None return None
# Poll processing status
logging.info(f"⏳ Job {job_id} accepted — polling status...") logging.info(f"⏳ Job {job_id} accepted — polling status...")
status_url = f"{video_host}/xrpc/app.bsky.video.getJobStatus" status_url = f"{video_host}/xrpc/app.bsky.video.getJobStatus"
deadline = time.time() + 600 deadline = time.time() + 600 # 10 min max poll
while time.time() < deadline: while time.time() < deadline:
try:
s = requests.get(status_url, params={"jobId": job_id}, timeout=30) s = requests.get(status_url, params={"jobId": job_id}, timeout=30)
except Exception as e:
logging.warning(f"⚠️ Status poll request failed once: {repr(e)}")
time.sleep(3)
continue
if s.status_code != 200: if s.status_code != 200:
logging.error(f"❌ Job status check failed: {s.status_code} - {s.text}") logging.error(f"❌ Job status failed: {s.status_code} - {s.text}")
return None return None
status_json = s.json() body = s.json()
job_status = status_json.get("jobStatus", {}) job_status = body.get("jobStatus", {})
state = job_status.get("state") state = job_status.get("state")
if state == "JOB_STATE_COMPLETED": if state == "JOB_STATE_COMPLETED":
blob = job_status.get("blob") blob = job_status.get("blob")
if not blob: if not blob:
logging.error(f"❌ Completed job without blob: {status_json}") logging.error(f"❌ Completed job without blob: {body}")
return None return None
wait_with_heartbeat(8, "CDN propagation") wait_with_heartbeat(8, "CDN propagation")
# Return RAW lexicon embed dict (no models.BlobRef dependency) # RAW embed dict; no BlobRef conversion at all.
return { return {
"$type": "app.bsky.embed.video", "$type": "app.bsky.embed.video",
"video": blob, "video": blob,
@@ -413,30 +391,33 @@ def upload_video_via_bsky_service(client: Client, video_path: str, service_url:
return None return None
def upload_video_via_pds(client: Client, video_path: str, alt_text: str = "", settle_delay_seconds: float = 30.0) -> dict | None: # ---------------------------
""" # Media upload: direct PDS fallback (optional)
Optional fallback: direct uploadBlob to PDS. # ---------------------------
def upload_video_via_pds_embed_dict(
client: Client,
video_path: str,
alt_text: str = "",
settle_delay_seconds: float = 30.0,
) -> dict | None:
if not os.path.exists(video_path):
logging.error(f"❌ Video file not found: {video_path}")
return None
Note:
- This can be less reliable for playback compatibility depending on client/AppView.
"""
try: try:
with open(video_path, "rb") as f: with open(video_path, "rb") as f:
video_bytes = f.read() b = f.read()
size_mb = len(video_bytes) / (1024 * 1024) size_mb = len(b) / (1024 * 1024)
logging.warning(f"🎬 [PDS-direct fallback] Uploading: {video_path} ({size_mb:.2f} MB)") logging.warning(f"🎬 [PDS-direct fallback] Uploading: {video_path} ({size_mb:.2f} MB)")
r = client.upload_blob(video_bytes) up = client.upload_blob(b)
blob = extract_blob_from_upload_blob_result(up)
wait_with_heartbeat(settle_delay_seconds, "PDS/AppView indexing")
blob = getattr(r, "blob", None)
if blob is None and isinstance(r, dict):
blob = r.get("blob")
if blob is None: if blob is None:
logging.error("❌ PDS uploadBlob returned no blob.") logging.error("❌ PDS uploadBlob returned no blob.")
return None return None
wait_with_heartbeat(settle_delay_seconds, "PDS/AppView indexing")
return { return {
"$type": "app.bsky.embed.video", "$type": "app.bsky.embed.video",
"video": blob, "video": blob,
@@ -447,190 +428,121 @@ def upload_video_via_pds(client: Client, video_path: str, alt_text: str = "", se
return None return None
def upload_video_smart( def upload_video_smart_embed_dict(
client: Client, video_path: str, service_url: str, alt_text: str, client: Client,
settle_delay_seconds: float, allow_pds_video_fallback: bool video_path: str,
service_url: str,
alt_text: str = "",
settle_delay_seconds: float = 30.0,
allow_pds_video_fallback: bool = False,
) -> dict | None: ) -> dict | None:
"""
Try video.bsky.app first. Optionally fallback to direct PDS upload.
"""
logging.info(f"🌍 PDS ({service_url}). Trying video.bsky.app first.") logging.info(f"🌍 PDS ({service_url}). Trying video.bsky.app first.")
embed = upload_video_via_bsky_service(client, video_path, service_url, alt_text) embed = upload_video_via_video_service_embed_dict(
client=client,
video_path=video_path,
service_url=service_url,
alt_text=alt_text,
)
if embed: if embed:
return embed return embed
if allow_pds_video_fallback: if allow_pds_video_fallback:
logging.warning("⚠️ video.bsky.app failed; trying direct PDS fallback.") logging.warning("⚠️ video.bsky.app failed; trying direct PDS fallback.")
return upload_video_via_pds(client, video_path, alt_text, settle_delay_seconds) return upload_video_via_pds_embed_dict(
logging.error("❌ video.bsky.app failed and fallback disabled.")
return None
# -----------------------------------------------------------------------------
# Image upload
# -----------------------------------------------------------------------------
def upload_image(client: Client, image_path: str, alt_text: str = "") -> dict | None:
"""
Upload image blob and return raw app.bsky.embed.images dict.
"""
try:
if not os.path.exists(image_path):
logging.error(f"❌ Image file not found: {image_path}")
return None
mime = detect_mime_type(image_path)
with open(image_path, "rb") as f:
data = f.read()
logging.info(f"🖼️ Uploading image: {image_path} ({len(data)/1024:.1f} KB, {mime})")
r = client.upload_blob(data)
blob = getattr(r, "blob", None)
if blob is None and isinstance(r, dict):
blob = r.get("blob")
if blob is None:
logging.error("❌ uploadBlob returned no blob for image.")
return None
return {
"$type": "app.bsky.embed.images",
"images": [{"alt": alt_text or "", "image": blob}],
}
except Exception as e:
logging.error(f"❌ Failed to upload image: {repr(e)}")
return None
# -----------------------------------------------------------------------------
# Post creation
# -----------------------------------------------------------------------------
def post_to_bsky(
client: Client,
text: str,
langs: list[str],
image_path: str | None,
video_path: str | None,
alt_text: str,
service_url: str,
video_settle_delay: float,
allow_pds_video_fallback: bool,
) -> bool:
"""
Build and send app.bsky.feed.post with optional media embed.
"""
post_text = text.strip()
if not post_text and not image_path and not video_path:
logging.error("❌ Empty post text with no media is not allowed.")
return False
embed_dict = None
if video_path:
logging.info(f"🎬 Preparing video upload: {video_path}")
embed_dict = upload_video_smart(
client=client, client=client,
video_path=video_path, video_path=video_path,
service_url=service_url,
alt_text=alt_text, alt_text=alt_text,
settle_delay_seconds=video_settle_delay, settle_delay_seconds=settle_delay_seconds,
allow_pds_video_fallback=allow_pds_video_fallback,
) )
if not embed_dict:
logging.error("❌ Aborting post: video upload/processing failed.")
return False
elif image_path: logging.error("❌ video.bsky.app failed and fallback is disabled.")
embed_dict = upload_image(client, image_path, alt_text) return None
if not embed_dict:
logging.error("❌ Aborting post: image upload failed.")
return False
# Explicit record: safest way to guarantee text and schema behavior
# ---------------------------
# Create post
# ---------------------------
def create_post_record(
text: str,
langs: list[str],
embed_dict: dict | None = None,
) -> dict:
record = { record = {
"$type": "app.bsky.feed.post", "$type": "app.bsky.feed.post",
"text": post_text, "text": text.strip(), # must be plain string
"createdAt": time.strftime("%Y-%m-%dT%H:%M:%S.000Z", time.gmtime()), "createdAt": time.strftime("%Y-%m-%dT%H:%M:%S.000Z", time.gmtime()),
} }
if langs: if langs:
record["langs"] = langs record["langs"] = langs
if embed_dict: if embed_dict is not None:
record["embed"] = embed_dict record["embed"] = embed_dict
return record
logging.info(f"🧾 Final record text={record.get('text')!r}, has_embed={'embed' in record}")
# Typed call first; dict fallback for SDK differences def publish_post(client: Client, record: dict) -> bool:
try: try:
data = models.ComAtprotoRepoCreateRecord.Data( # Use dict payload directly for max cross-version compatibility.
repo=client.me.did, resp = client.com.atproto.repo.create_record(
collection="app.bsky.feed.post", {
record=record,
)
resp = client.com.atproto.repo.create_record(data)
except Exception:
resp = client.com.atproto.repo.create_record({
"repo": client.me.did, "repo": client.me.did,
"collection": "app.bsky.feed.post", "collection": "app.bsky.feed.post",
"record": record, "record": record,
}) }
)
uri = getattr(resp, "uri", None) or (resp.get("uri") if isinstance(resp, dict) else None) uri = getattr(resp, "uri", None) or (resp.get("uri") if isinstance(resp, dict) else None)
logging.info(f"✅ Post published! URI: {uri}") logging.info(f"✅ Post published! URI: {uri}")
return True return True
except Exception as e:
logging.error(f"❌ createRecord failed: {repr(e)}")
return False
# ----------------------------------------------------------------------------- # ---------------------------
# CLI entrypoint # Main
# ----------------------------------------------------------------------------- # ---------------------------
def main(): def main():
setup_logging() setup_logging()
parser = argparse.ArgumentParser(description="Post text + optional image/video to Bluesky/federated PDS.") parser = argparse.ArgumentParser(description="Post text + optional image/video to Bluesky/federated PDS.")
parser.add_argument("text", help="Post text content") parser.add_argument("text", help="Post text")
parser.add_argument("--username", required=True, help="Bluesky handle/email") parser.add_argument("--username", required=True, help="Handle/email")
parser.add_argument("--password", required=True, help="Bluesky app password") parser.add_argument("--password", required=True, help="App password")
parser.add_argument("--service", default="https://bsky.social", help="PDS URL") parser.add_argument("--service", default="https://bsky.social", help="PDS URL")
parser.add_argument("--lang", default="ca", help="Comma-separated language codes") parser.add_argument("--lang", default="ca", help="Comma-separated language codes")
parser.add_argument("--image", default=None, help="Path to image file") parser.add_argument("--image", default=None, help="Image path")
parser.add_argument("--video", default=None, help="Path to video file") parser.add_argument("--video", default=None, help="Video path")
parser.add_argument("--alt", default="", help="Alt text for media") parser.add_argument("--alt", default="", help="Alt text")
parser.add_argument("--video-settle-delay", type=float, default=30.0, help="Fallback indexing wait seconds") parser.add_argument("--video-settle-delay", type=float, default=30.0, help="Fallback indexing wait")
parser.add_argument("--allow-pds-video-fallback", action="store_true", help="Allow direct PDS video fallback") parser.add_argument("--allow-pds-video-fallback", action="store_true")
# Compression options (enabled by default) # Compression ON by default
parser.add_argument("--compress-video", dest="compress_video", action="store_true", default=True, parser.add_argument("--compress-video", dest="compress_video", action="store_true", default=True)
help="Compress video before upload (default: enabled)") parser.add_argument("--no-compress-video", dest="compress_video", action="store_false")
parser.add_argument("--no-compress-video", dest="compress_video", action="store_false", parser.add_argument("--max-video-mb", type=float, default=45.0)
help="Disable compression") parser.add_argument("--ffmpeg-crf", type=int, default=28)
parser.add_argument("--max-video-mb", type=float, default=45.0, help="Target max size after compression") parser.add_argument("--ffmpeg-preset", default="veryfast")
parser.add_argument("--ffmpeg-crf", type=int, default=28, help="CRF (lower=better quality/larger)")
parser.add_argument("--ffmpeg-preset", default="veryfast", help="x264 preset")
args = parser.parse_args() args = parser.parse_args()
# Exactly one media type at a time
if args.image and args.video: if args.image and args.video:
logging.error("❌ Use either --image or --video, not both.") logging.error("❌ Use either --image or --video, not both.")
sys.exit(1) sys.exit(1)
# Initialize client against selected PDS
client = Client(base_url=args.service) client = Client(base_url=args.service)
# Login with retries
if not login_with_backoff( if not login_with_backoff(
client, args.username, args.password, args.service, client=client,
RetryConfig.login_max_attempts, username=args.username,
RetryConfig.login_base_delay_seconds, password=args.password,
RetryConfig.login_max_delay_seconds, service_url=args.service,
RetryConfig.login_jitter_seconds,
): ):
sys.exit(1) sys.exit(1)
# Parse languages
langs = [x.strip() for x in args.lang.split(",") if x.strip()] langs = [x.strip() for x in args.lang.split(",") if x.strip()]
# Optional pre-upload compression for videos
video_path_for_upload = args.video video_path_for_upload = args.video
temp_compressed_path = None temp_compressed_path = None
if args.video and args.compress_video: if args.video and args.compress_video:
compressed = compress_video_ffmpeg( compressed = compress_video_ffmpeg(
input_path=args.video, input_path=args.video,
@@ -646,23 +558,40 @@ def main():
if compressed != args.video: if compressed != args.video:
temp_compressed_path = compressed temp_compressed_path = compressed
# Build and publish post embed_dict = None
ok = post_to_bsky(
if video_path_for_upload:
logging.info(f"🎬 Preparing video upload: {video_path_for_upload}")
embed_dict = upload_video_smart_embed_dict(
client=client, client=client,
text=args.text,
langs=langs,
image_path=args.image,
video_path=video_path_for_upload, video_path=video_path_for_upload,
alt_text=args.alt,
service_url=args.service, service_url=args.service,
video_settle_delay=args.video_settle_delay, alt_text=args.alt,
settle_delay_seconds=args.video_settle_delay,
allow_pds_video_fallback=args.allow_pds_video_fallback, allow_pds_video_fallback=args.allow_pds_video_fallback,
) )
if embed_dict is None:
# Cleanup temp compressed file, if any logging.error("❌ Aborting post: video upload/processing failed.")
try:
if temp_compressed_path and os.path.exists(temp_compressed_path): if temp_compressed_path and os.path.exists(temp_compressed_path):
os.remove(temp_compressed_path) os.remove(temp_compressed_path)
sys.exit(1)
elif args.image:
embed_dict = upload_image_embed_dict(client=client, image_path=args.image, alt_text=args.alt)
if embed_dict is None:
logging.error("❌ Aborting post: image upload failed.")
if temp_compressed_path and os.path.exists(temp_compressed_path):
os.remove(temp_compressed_path)
sys.exit(1)
record = create_post_record(text=args.text, langs=langs, embed_dict=embed_dict)
logging.info(f"🧾 Final record text={record.get('text')!r}, has_embed={'embed' in record}")
ok = publish_post(client=client, record=record)
if temp_compressed_path and os.path.exists(temp_compressed_path):
try:
os.remove(temp_compressed_path)
logging.info(f"🧹 Removed temp file: {temp_compressed_path}") logging.info(f"🧹 Removed temp file: {temp_compressed_path}")
except Exception: except Exception:
pass pass