diff --git a/bsky_post.py b/bsky_post.py
index 3d4d2ef..3f4bc9e 100644
--- a/bsky_post.py
+++ b/bsky_post.py
@@ -7,6 +7,15 @@ Usage examples:
python3 bsky_post.py "Dijous!!!!" --image thursday.jpg
python3 bsky_post.py "Bon dia!"
python3 bsky_post.py "Bon dia!" --image photo.jpg --lang ca
+
+Notes:
+ * Self-hosted / federated PDSes (e.g. eurosky.social) upload videos
+ DIRECTLY to the user's PDS as a generic blob — no video.bsky.app
+ service auth is required.
+ * The official Bluesky network (bsky.social, *.bsky.network) uses the
+ centralized video.bsky.app pipeline with a service-auth token scoped
+ via `lxm` to `app.bsky.video.uploadVideo`.
+ * The script auto-detects which path to use based on --service.
"""
import argparse
@@ -273,26 +282,21 @@ def login_with_backoff(
# ============================================================
-# URL / DID helpers
+# PDS detection
# ============================================================
-def normalize_pds_base(service_url: str) -> str:
+def is_official_bsky_pds(service_url: str) -> bool:
"""
- Strip trailing slashes and any trailing /xrpc segment so callers can safely
- compose '/xrpc/' URLs.
+ Detect whether the configured PDS is part of the official Bluesky-operated
+ network. Self-hosted/federated PDSes (e.g. eurosky.social) return False.
"""
- base = service_url.rstrip("/")
- if base.endswith("/xrpc"):
- base = base[: -len("/xrpc")]
- return base
-
-
-def pds_did_from_base(pds_base: str) -> str:
- """
- Derive the did:web for a PDS from its base URL hostname.
- Works for typical atproto PDS deployments (e.g. https://eurosky.social → did:web:eurosky.social).
- """
- host = urlparse(pds_base).netloc
- return f"did:web:{host}"
+ try:
+ host = (urlparse(service_url).hostname or "").lower()
+ return (
+ host in {"bsky.social", "bsky.app"}
+ or host.endswith(".bsky.network")
+ )
+ except Exception:
+ return False
# ============================================================
@@ -338,100 +342,181 @@ def upload_image(
# ============================================================
-# Media upload — Video
+# Media upload — Video (PDS-direct)
# ============================================================
-
-def upload_video_and_wait(
+def upload_video_via_pds(
client: Client,
- video_data: bytes,
+ video_path: str,
alt_text: str = "",
- service_url: str = "https://bsky.social", # kept for signature stability
) -> models.AppBskyEmbedVideo.Main | None:
"""
- Upload a video to the Bluesky video service (video.bsky.app) and wait for processing.
+ Upload a video as a generic blob directly to the user's PDS.
- The video service is shared infrastructure across the atproto network — even
- federated PDSes (e.g. eurosky.social) use video.bsky.app for upload/processing.
- The resulting blob is stored back in the user's home PDS automatically.
+ Used for self-hosted / federated PDSes (e.g. eurosky.social) that don't
+ proxy to the centralized video.bsky.app service. The PDS stores the bytes
+ and returns a blob ref we can embed directly.
"""
try:
- VIDEO_SERVICE_HOST = "https://video.bsky.app"
- VIDEO_SERVICE_DID = "did:web:video.bsky.app"
+ if not os.path.exists(video_path):
+ logging.error(f"❌ Video file not found: {video_path}")
+ return None
- logging.info(f"🎬 Using shared video service at {VIDEO_SERVICE_HOST}")
+ with open(video_path, "rb") as f:
+ video_bytes = f.read()
- # --- Token #1: bound to uploadVideo ---
- logging.info("🎬 Requesting Service Auth for Video Upload...")
- upload_auth = client.com.atproto.server.get_service_auth({
- 'aud': VIDEO_SERVICE_DID,
- 'lxm': 'app.bsky.video.uploadVideo',
- 'exp': int(time.time()) + 60 * 30,
- })
- upload_token = upload_auth.token
+ size_mb = len(video_bytes) / (1024 * 1024)
+ logging.info(f"🎬 [PDS-direct] Uploading video to home PDS: {video_path} ({size_mb:.2f} MB)")
- # The video service needs the user's DID in the upload URL as a query param
- user_did = client.me.did
- upload_url = (
- f"{VIDEO_SERVICE_HOST}/xrpc/app.bsky.video.uploadVideo"
- f"?did={user_did}&name={int(time.time())}.mp4"
+ response = client.upload_blob(video_bytes)
+ blob = response.blob
+ logging.info(f"✅ [PDS-direct] Video blob uploaded successfully.")
+
+ return models.AppBskyEmbedVideo.Main(
+ video=blob,
+ alt=alt_text,
)
- upload_headers = {
- "Authorization": f"Bearer {upload_token}",
+ except Exception as e:
+ logging.error(f"❌ PDS-direct video upload failed: {repr(e)}")
+ return None
+
+
+# ============================================================
+# Media upload — Video (video.bsky.app shared service)
+# ============================================================
+def upload_video_via_bsky_service(
+ client: Client,
+ video_path: str,
+ alt_text: str = "",
+) -> models.AppBskyEmbedVideo.Main | None:
+ """
+ Upload a video via the centralized video.bsky.app service.
+ Used only when the configured PDS is on the official Bluesky network.
+ """
+ try:
+ if not os.path.exists(video_path):
+ logging.error(f"❌ Video file not found: {video_path}")
+ return None
+
+ with open(video_path, "rb") as f:
+ video_bytes = f.read()
+
+ size_mb = len(video_bytes) / (1024 * 1024)
+ logging.info(f"🎬 [video.bsky.app] Uploading via shared service: {video_path} ({size_mb:.2f} MB)")
+
+ VIDEO_HOST = "https://video.bsky.app"
+ VIDEO_DID = "did:web:video.bsky.app"
+
+ # --- Service auth token (lxm-scoped → 30 min OK) ---
+ upload_auth = client.com.atproto.server.get_service_auth({
+ "aud": VIDEO_DID,
+ "lxm": "app.bsky.video.uploadVideo",
+ "exp": int(time.time()) + 60 * 30,
+ })
+
+ user_did = client.me.did
+ upload_url = (
+ f"{VIDEO_HOST}/xrpc/app.bsky.video.uploadVideo"
+ f"?did={user_did}&name={int(time.time())}.mp4"
+ )
+ headers = {
+ "Authorization": f"Bearer {upload_auth.token}",
"Content-Type": "video/mp4",
}
- logging.info(f"🎬 Uploading video to {upload_url} ...")
- upload_resp = requests.post(upload_url, headers=upload_headers, data=video_data)
-
+ upload_resp = requests.post(upload_url, headers=headers, data=video_bytes, timeout=120)
if upload_resp.status_code != 200:
- logging.error(f"❌ Video upload failed: {upload_resp.status_code} - {upload_resp.text}")
+ logging.error(
+ f"❌ video.bsky.app upload failed: "
+ f"{upload_resp.status_code} - {upload_resp.text}"
+ )
return None
job_id = upload_resp.json().get("jobId")
if not job_id:
- logging.error("❌ No Job ID returned from video service.")
+ logging.error("❌ No jobId returned from video service.")
return None
- logging.info(f"⏳ Video uploaded! Job ID: {job_id}. Waiting for processing...")
+ logging.info(f"⏳ Job {job_id} accepted — polling status...")
- # --- getJobStatus is unauthenticated on video.bsky.app ---
- # (per the XRPC walkthrough, no auth header is required for status polling)
- status_url = f"{VIDEO_SERVICE_HOST}/xrpc/app.bsky.video.getJobStatus"
- params = {"jobId": job_id}
-
- while True:
- status_resp = requests.get(status_url, params=params)
+ # getJobStatus is unauthenticated on video.bsky.app
+ status_url = f"{VIDEO_HOST}/xrpc/app.bsky.video.getJobStatus"
+ deadline = time.time() + 300 # 5-minute ceiling
+ while time.time() < deadline:
+ status_resp = requests.get(status_url, params={"jobId": job_id}, timeout=30)
if status_resp.status_code != 200:
- logging.error(f"❌ Failed to get job status: {status_resp.status_code} - {status_resp.text}")
+ logging.error(
+ f"❌ Job status check failed: "
+ f"{status_resp.status_code} - {status_resp.text}"
+ )
return None
job_status = status_resp.json().get("jobStatus", {})
state = job_status.get("state")
- if state == 'JOB_STATE_COMPLETED':
+ if state == "JOB_STATE_COMPLETED":
logging.info("✅ Processing complete! Waiting 10s for CDN propagation...")
time.sleep(10)
blob_dict = job_status.get("blob")
- blob_ref = models.BlobRef.from_dict(blob_dict)
+ if not blob_dict:
+ logging.error("❌ No blob in completed job status.")
+ return None
+ blob_ref = models.BlobRef.from_dict(blob_dict)
return models.AppBskyEmbedVideo.Main(
video=blob_ref,
alt=alt_text,
)
- elif state == 'JOB_STATE_FAILED':
- logging.error("❌ Video processing failed on Bluesky's servers.")
+
+ if state == "JOB_STATE_FAILED":
+ logging.error(f"❌ Video processing failed on Bluesky's servers: {job_status}")
return None
logging.info(" ...still processing...")
time.sleep(3)
- except Exception as e:
- logging.error(f"❌ Failed to upload/process video: {repr(e)}")
+ logging.error("❌ Video processing timed out after 5 minutes.")
return None
+ except Exception as e:
+ logging.error(f"❌ video.bsky.app upload failed: {repr(e)}")
+ return None
+
+
+# ============================================================
+# Media upload — Video (smart dispatcher)
+# ============================================================
+def upload_video_smart(
+ client: Client,
+ video_path: str,
+ service_url: str,
+ alt_text: str = "",
+) -> models.AppBskyEmbedVideo.Main | None:
+ """
+ Smart dispatcher: pick PDS-direct or video.bsky.app based on the PDS.
+
+ * Self-hosted / federated PDSes (eurosky.social etc.) → direct PDS upload
+ (no service auth needed; PDS stores video itself).
+ * Official Bluesky network (bsky.social, *.bsky.network) → video.bsky.app
+ with lxm-scoped service auth, falling back to PDS-direct on failure.
+ """
+ if is_official_bsky_pds(service_url):
+ logging.info(f"🌐 Detected official Bluesky PDS ({service_url}) — using video.bsky.app")
+ embed = upload_video_via_bsky_service(client, video_path, alt_text=alt_text)
+ if embed:
+ return embed
+
+ logging.warning(
+ "⚠️ video.bsky.app upload failed; falling back to direct PDS upload."
+ )
+ return upload_video_via_pds(client, video_path, alt_text=alt_text)
+
+ logging.info(f"🌍 Detected self-hosted/federated PDS ({service_url}) — using direct upload")
+ return upload_video_via_pds(client, video_path, alt_text=alt_text)
+
+
# ============================================================
# Post
# ============================================================
@@ -442,7 +527,6 @@ def post_to_bsky(
image_path: str | None = None,
video_path: str | None = None,
alt_text: str = "",
- password: str = "",
service_url: str = "https://bsky.social",
) -> bool:
rich_text = make_rich(text)
@@ -451,14 +535,12 @@ def post_to_bsky(
# --- VIDEO POSTING ---
if video_path:
logging.info(f"🎬 Preparing video upload: {video_path}")
- with open(video_path, "rb") as f:
- video_data = f.read()
- video_embed = upload_video_and_wait(
+ video_embed = upload_video_smart(
client,
- video_data,
- alt_text=alt_text,
+ video_path,
service_url=service_url,
+ alt_text=alt_text,
)
if not video_embed:
@@ -539,7 +621,6 @@ def main():
image_path=args.image,
video_path=args.video,
alt_text=args.alt,
- password=args.password,
service_url=args.service,
)