diff --git a/bsky_post.py b/bsky_post.py
index 713644c..9d87461 100644
--- a/bsky_post.py
+++ b/bsky_post.py
@@ -17,11 +17,10 @@ import sys
import time
import random
import re
-import base64
-import json
import requests
from dataclasses import dataclass
+from urllib.parse import urlparse
from atproto import Client, client_utils, models
@@ -151,7 +150,6 @@ def get_rate_limit_wait_seconds(error_obj, default_delay: float, max_delay: floa
try:
now_ts = int(time.time())
- # Direct headers on exception
headers = getattr(error_obj, "headers", None) or {}
retry_after = headers.get("retry-after") or headers.get("Retry-After")
if retry_after:
@@ -169,7 +167,6 @@ def get_rate_limit_wait_seconds(error_obj, default_delay: float, max_delay: floa
pass
try:
- # Nested response headers
response = getattr(error_obj, "response", None)
headers = getattr(response, "headers", None) or {}
now_ts = int(time.time())
@@ -189,7 +186,6 @@ def get_rate_limit_wait_seconds(error_obj, default_delay: float, max_delay: floa
except Exception:
pass
- # repr fallback parsing
text = repr(error_obj)
m = re.search(r"'retry-after': '(\d+)'", text, re.IGNORECASE)
if m:
@@ -233,12 +229,10 @@ def login_with_backoff(
except Exception as e:
logging.exception("❌ Login exception")
- # Fail fast on invalid credentials
if is_auth_error(e):
logging.error("❌ Bad credentials. Check handle/password.")
return False
- # Respect explicit rate-limit timing
if is_rate_limited_error(e):
if attempt < max_attempts:
wait = get_rate_limit_wait_seconds(e, default_delay=base_delay, max_delay=max_delay)
@@ -253,7 +247,6 @@ def login_with_backoff(
logging.error("❌ Exhausted login retries due to rate limiting.")
return False
- # Retry transient/network problems
if is_network_error(e) or is_transient_error(e):
if attempt < max_attempts:
wait = min(base_delay * attempt, max_delay) + random.uniform(0, jitter)
@@ -267,7 +260,6 @@ def login_with_backoff(
logging.error("❌ Exhausted login retries after transient/network errors.")
return False
- # Unknown errors: bounded retry anyway
if attempt < max_attempts:
wait = min(base_delay * attempt, max_delay) + random.uniform(0, jitter)
logging.warning(
@@ -281,7 +273,30 @@ def login_with_backoff(
# ============================================================
-# Media upload
+# URL / DID helpers
+# ============================================================
+def normalize_pds_base(service_url: str) -> str:
+ """
+ Strip trailing slashes and any trailing /xrpc segment so callers can safely
+ compose '/xrpc/' URLs.
+ """
+ base = service_url.rstrip("/")
+ if base.endswith("/xrpc"):
+ base = base[: -len("/xrpc")]
+ return base
+
+
+def pds_did_from_base(pds_base: str) -> str:
+ """
+ Derive the did:web for a PDS from its base URL hostname.
+ Works for typical atproto PDS deployments (e.g. https://eurosky.social → did:web:eurosky.social).
+ """
+ host = urlparse(pds_base).netloc
+ return f"did:web:{host}"
+
+
+# ============================================================
+# Media upload — Image
# ============================================================
def detect_mime_type(path: str) -> str:
mime, _ = mimetypes.guess_type(path)
@@ -321,38 +336,46 @@ def upload_image(
logging.error(f"❌ Failed to upload image: {repr(e)}")
return None
+
+# ============================================================
+# Media upload — Video
+# ============================================================
def upload_video_and_wait(
client: Client,
video_data: bytes,
- alt_text: str = ""
+ alt_text: str = "",
+ service_url: str = "https://bsky.social",
) -> models.AppBskyEmbedVideo.Main | None:
- try:
- # --- Resolve PDS host + DID dynamically ---
- # The Client stores the service URL it logged into
- pds_base = str(client._base_url).rstrip("/") # e.g. https://eurosky.social
+ """
+ Upload a video to the user's PDS video service and wait for processing.
- # Derive the DID from the hostname (works for did:web PDSes)
- from urllib.parse import urlparse
- host = urlparse(pds_base).netloc
- service_did = f"did:web:{host}"
+ Notes on portability:
+ * Self-hosted/federated PDSes (e.g. eurosky.social) typically host the
+ video XRPC endpoints on the PDS itself, with `aud = did:web:`.
+ * The official Bluesky network uses a separate host (video.bsky.app),
+ but this implementation targets the PDS-hosted variant which works
+ for both eurosky-style PDSes and any conforming atproto deployment.
+ """
+ try:
+ pds_base = normalize_pds_base(service_url)
+ service_did = pds_did_from_base(pds_base)
logging.info(f"🎬 Using video service at {pds_base} (aud={service_did})")
- # --- Token #1: uploadVideo, scoped to *this* PDS ---
+ # --- Token #1: bound to uploadVideo ---
logging.info("🎬 Requesting Service Auth for Video Upload...")
upload_auth = client.com.atproto.server.get_service_auth({
'aud': service_did,
'lxm': 'app.bsky.video.uploadVideo',
- 'exp': int(time.time()) + 60 * 30,
+ 'exp': int(time.time()) + 60 * 30, # 30 min (allowed because lxm is set)
})
upload_headers = {
"Authorization": f"Bearer {upload_auth.token}",
"Content-Type": "video/mp4",
}
- # --- Upload to the PDS, not video.bsky.app ---
upload_url = f"{pds_base}/xrpc/app.bsky.video.uploadVideo"
- logging.info(f"🎬 Uploading video to {upload_url}...")
+ logging.info(f"🎬 Uploading video to {upload_url} ...")
upload_resp = requests.post(upload_url, headers=upload_headers, data=video_data)
if upload_resp.status_code != 200:
@@ -366,7 +389,7 @@ def upload_video_and_wait(
logging.info(f"⏳ Video uploaded! Job ID: {job_id}. Waiting for processing...")
- # --- Token #2: getJobStatus ---
+ # --- Token #2: bound to getJobStatus ---
status_auth = client.com.atproto.server.get_service_auth({
'aud': service_did,
'lxm': 'app.bsky.video.getJobStatus',
@@ -396,7 +419,7 @@ def upload_video_and_wait(
return models.AppBskyEmbedVideo.Main(
video=blob_ref,
- alt=alt_text
+ alt=alt_text,
)
elif state == 'JOB_STATE_FAILED':
logging.error("❌ Video processing failed on Bluesky's servers.")
@@ -409,6 +432,7 @@ def upload_video_and_wait(
logging.error(f"❌ Failed to upload/process video: {repr(e)}")
return None
+
# ============================================================
# Post
# ============================================================
@@ -420,6 +444,7 @@ def post_to_bsky(
video_path: str | None = None,
alt_text: str = "",
password: str = "",
+ service_url: str = "https://bsky.social",
) -> bool:
rich_text = make_rich(text)
@@ -429,21 +454,23 @@ def post_to_bsky(
logging.info(f"🎬 Preparing video upload: {video_path}")
with open(video_path, "rb") as f:
video_data = f.read()
-
- # Pass the password to our custom polling function
- # Use our custom polling function (no password needed)
- video_embed = upload_video_and_wait(client, video_data, alt_text)
-
+ video_embed = upload_video_and_wait(
+ client,
+ video_data,
+ alt_text=alt_text,
+ service_url=service_url,
+ )
+
if not video_embed:
logging.error("❌ Aborting post: video upload/processing failed.")
return False
-
- logging.info(f"🚀 Sending video post...")
+
+ logging.info("🚀 Sending video post...")
result = client.send_post(
- text=rich_text,
- embed=video_embed,
- langs=langs
+ text=rich_text,
+ embed=video_embed,
+ langs=langs,
)
# --- IMAGE POSTING ---
@@ -452,14 +479,14 @@ def post_to_bsky(
if not image:
logging.error("❌ Aborting post: image upload failed.")
return False
-
+
embed = models.AppBskyEmbedImages.Main(images=[image])
- logging.info(f"🚀 Sending image post...")
+ logging.info("🚀 Sending image post...")
result = client.send_post(text=rich_text, embed=embed, langs=langs)
# --- TEXT ONLY POSTING ---
else:
- logging.info(f"🚀 Sending text post...")
+ logging.info("🚀 Sending text post...")
result = client.send_post(text=rich_text, langs=langs)
uri = getattr(result, "uri", None)
@@ -514,6 +541,7 @@ def main():
video_path=args.video,
alt_text=args.alt,
password=args.password,
+ service_url=args.service,
)
if not post_success: