This commit is contained in:
Guillem Hernandez Sola
2026-05-08 11:42:37 +02:00
parent 1072b1e621
commit 337a59039a

View File

@@ -7,6 +7,15 @@ Usage examples:
python3 bsky_post.py "Dijous!!!!" --image thursday.jpg python3 bsky_post.py "Dijous!!!!" --image thursday.jpg
python3 bsky_post.py "Bon dia!" python3 bsky_post.py "Bon dia!"
python3 bsky_post.py "Bon dia!" --image photo.jpg --lang ca python3 bsky_post.py "Bon dia!" --image photo.jpg --lang ca
Notes:
* Self-hosted / federated PDSes (e.g. eurosky.social) upload videos
DIRECTLY to the user's PDS as a generic blob — no video.bsky.app
service auth is required.
* The official Bluesky network (bsky.social, *.bsky.network) uses the
centralized video.bsky.app pipeline with a service-auth token scoped
via `lxm` to `app.bsky.video.uploadVideo`.
* The script auto-detects which path to use based on --service.
""" """
import argparse import argparse
@@ -273,26 +282,21 @@ def login_with_backoff(
# ============================================================ # ============================================================
# URL / DID helpers # PDS detection
# ============================================================ # ============================================================
def normalize_pds_base(service_url: str) -> str: def is_official_bsky_pds(service_url: str) -> bool:
""" """
Strip trailing slashes and any trailing /xrpc segment so callers can safely Detect whether the configured PDS is part of the official Bluesky-operated
compose '<base>/xrpc/<method>' URLs. network. Self-hosted/federated PDSes (e.g. eurosky.social) return False.
""" """
base = service_url.rstrip("/") try:
if base.endswith("/xrpc"): host = (urlparse(service_url).hostname or "").lower()
base = base[: -len("/xrpc")] return (
return base host in {"bsky.social", "bsky.app"}
or host.endswith(".bsky.network")
)
def pds_did_from_base(pds_base: str) -> str: except Exception:
""" return False
Derive the did:web for a PDS from its base URL hostname.
Works for typical atproto PDS deployments (e.g. https://eurosky.social → did:web:eurosky.social).
"""
host = urlparse(pds_base).netloc
return f"did:web:{host}"
# ============================================================ # ============================================================
@@ -338,100 +342,181 @@ def upload_image(
# ============================================================ # ============================================================
# Media upload — Video # Media upload — Video (PDS-direct)
# ============================================================ # ============================================================
def upload_video_via_pds(
def upload_video_and_wait(
client: Client, client: Client,
video_data: bytes, video_path: str,
alt_text: str = "", alt_text: str = "",
service_url: str = "https://bsky.social", # kept for signature stability
) -> models.AppBskyEmbedVideo.Main | None: ) -> models.AppBskyEmbedVideo.Main | None:
""" """
Upload a video to the Bluesky video service (video.bsky.app) and wait for processing. Upload a video as a generic blob directly to the user's PDS.
The video service is shared infrastructure across the atproto network — even Used for self-hosted / federated PDSes (e.g. eurosky.social) that don't
federated PDSes (e.g. eurosky.social) use video.bsky.app for upload/processing. proxy to the centralized video.bsky.app service. The PDS stores the bytes
The resulting blob is stored back in the user's home PDS automatically. and returns a blob ref we can embed directly.
""" """
try: try:
VIDEO_SERVICE_HOST = "https://video.bsky.app" if not os.path.exists(video_path):
VIDEO_SERVICE_DID = "did:web:video.bsky.app" logging.error(f"❌ Video file not found: {video_path}")
return None
logging.info(f"🎬 Using shared video service at {VIDEO_SERVICE_HOST}") with open(video_path, "rb") as f:
video_bytes = f.read()
# --- Token #1: bound to uploadVideo --- size_mb = len(video_bytes) / (1024 * 1024)
logging.info("🎬 Requesting Service Auth for Video Upload...") logging.info(f"🎬 [PDS-direct] Uploading video to home PDS: {video_path} ({size_mb:.2f} MB)")
upload_auth = client.com.atproto.server.get_service_auth({
'aud': VIDEO_SERVICE_DID,
'lxm': 'app.bsky.video.uploadVideo',
'exp': int(time.time()) + 60 * 30,
})
upload_token = upload_auth.token
# The video service needs the user's DID in the upload URL as a query param response = client.upload_blob(video_bytes)
user_did = client.me.did blob = response.blob
upload_url = ( logging.info(f"✅ [PDS-direct] Video blob uploaded successfully.")
f"{VIDEO_SERVICE_HOST}/xrpc/app.bsky.video.uploadVideo"
f"?did={user_did}&name={int(time.time())}.mp4" return models.AppBskyEmbedVideo.Main(
video=blob,
alt=alt_text,
) )
upload_headers = { except Exception as e:
"Authorization": f"Bearer {upload_token}", logging.error(f"❌ PDS-direct video upload failed: {repr(e)}")
return None
# ============================================================
# Media upload — Video (video.bsky.app shared service)
# ============================================================
def upload_video_via_bsky_service(
client: Client,
video_path: str,
alt_text: str = "",
) -> models.AppBskyEmbedVideo.Main | None:
"""
Upload a video via the centralized video.bsky.app service.
Used only when the configured PDS is on the official Bluesky network.
"""
try:
if not os.path.exists(video_path):
logging.error(f"❌ Video file not found: {video_path}")
return None
with open(video_path, "rb") as f:
video_bytes = f.read()
size_mb = len(video_bytes) / (1024 * 1024)
logging.info(f"🎬 [video.bsky.app] Uploading via shared service: {video_path} ({size_mb:.2f} MB)")
VIDEO_HOST = "https://video.bsky.app"
VIDEO_DID = "did:web:video.bsky.app"
# --- Service auth token (lxm-scoped → 30 min OK) ---
upload_auth = client.com.atproto.server.get_service_auth({
"aud": VIDEO_DID,
"lxm": "app.bsky.video.uploadVideo",
"exp": int(time.time()) + 60 * 30,
})
user_did = client.me.did
upload_url = (
f"{VIDEO_HOST}/xrpc/app.bsky.video.uploadVideo"
f"?did={user_did}&name={int(time.time())}.mp4"
)
headers = {
"Authorization": f"Bearer {upload_auth.token}",
"Content-Type": "video/mp4", "Content-Type": "video/mp4",
} }
logging.info(f"🎬 Uploading video to {upload_url} ...") upload_resp = requests.post(upload_url, headers=headers, data=video_bytes, timeout=120)
upload_resp = requests.post(upload_url, headers=upload_headers, data=video_data)
if upload_resp.status_code != 200: if upload_resp.status_code != 200:
logging.error(f"❌ Video upload failed: {upload_resp.status_code} - {upload_resp.text}") logging.error(
f"❌ video.bsky.app upload failed: "
f"{upload_resp.status_code} - {upload_resp.text}"
)
return None return None
job_id = upload_resp.json().get("jobId") job_id = upload_resp.json().get("jobId")
if not job_id: if not job_id:
logging.error("❌ No Job ID returned from video service.") logging.error("❌ No jobId returned from video service.")
return None return None
logging.info(f"Video uploaded! Job ID: {job_id}. Waiting for processing...") logging.info(f"Job {job_id} accepted — polling status...")
# --- getJobStatus is unauthenticated on video.bsky.app --- # getJobStatus is unauthenticated on video.bsky.app
# (per the XRPC walkthrough, no auth header is required for status polling) status_url = f"{VIDEO_HOST}/xrpc/app.bsky.video.getJobStatus"
status_url = f"{VIDEO_SERVICE_HOST}/xrpc/app.bsky.video.getJobStatus" deadline = time.time() + 300 # 5-minute ceiling
params = {"jobId": job_id}
while True:
status_resp = requests.get(status_url, params=params)
while time.time() < deadline:
status_resp = requests.get(status_url, params={"jobId": job_id}, timeout=30)
if status_resp.status_code != 200: if status_resp.status_code != 200:
logging.error(f"❌ Failed to get job status: {status_resp.status_code} - {status_resp.text}") logging.error(
f"❌ Job status check failed: "
f"{status_resp.status_code} - {status_resp.text}"
)
return None return None
job_status = status_resp.json().get("jobStatus", {}) job_status = status_resp.json().get("jobStatus", {})
state = job_status.get("state") state = job_status.get("state")
if state == 'JOB_STATE_COMPLETED': if state == "JOB_STATE_COMPLETED":
logging.info("✅ Processing complete! Waiting 10s for CDN propagation...") logging.info("✅ Processing complete! Waiting 10s for CDN propagation...")
time.sleep(10) time.sleep(10)
blob_dict = job_status.get("blob") blob_dict = job_status.get("blob")
blob_ref = models.BlobRef.from_dict(blob_dict) if not blob_dict:
logging.error("❌ No blob in completed job status.")
return None
blob_ref = models.BlobRef.from_dict(blob_dict)
return models.AppBskyEmbedVideo.Main( return models.AppBskyEmbedVideo.Main(
video=blob_ref, video=blob_ref,
alt=alt_text, alt=alt_text,
) )
elif state == 'JOB_STATE_FAILED':
logging.error("❌ Video processing failed on Bluesky's servers.") if state == "JOB_STATE_FAILED":
logging.error(f"❌ Video processing failed on Bluesky's servers: {job_status}")
return None return None
logging.info(" ...still processing...") logging.info(" ...still processing...")
time.sleep(3) time.sleep(3)
except Exception as e: logging.error("❌ Video processing timed out after 5 minutes.")
logging.error(f"❌ Failed to upload/process video: {repr(e)}")
return None return None
except Exception as e:
logging.error(f"❌ video.bsky.app upload failed: {repr(e)}")
return None
# ============================================================
# Media upload — Video (smart dispatcher)
# ============================================================
def upload_video_smart(
client: Client,
video_path: str,
service_url: str,
alt_text: str = "",
) -> models.AppBskyEmbedVideo.Main | None:
"""
Smart dispatcher: pick PDS-direct or video.bsky.app based on the PDS.
* Self-hosted / federated PDSes (eurosky.social etc.) → direct PDS upload
(no service auth needed; PDS stores video itself).
* Official Bluesky network (bsky.social, *.bsky.network) → video.bsky.app
with lxm-scoped service auth, falling back to PDS-direct on failure.
"""
if is_official_bsky_pds(service_url):
logging.info(f"🌐 Detected official Bluesky PDS ({service_url}) — using video.bsky.app")
embed = upload_video_via_bsky_service(client, video_path, alt_text=alt_text)
if embed:
return embed
logging.warning(
"⚠️ video.bsky.app upload failed; falling back to direct PDS upload."
)
return upload_video_via_pds(client, video_path, alt_text=alt_text)
logging.info(f"🌍 Detected self-hosted/federated PDS ({service_url}) — using direct upload")
return upload_video_via_pds(client, video_path, alt_text=alt_text)
# ============================================================ # ============================================================
# Post # Post
# ============================================================ # ============================================================
@@ -442,7 +527,6 @@ def post_to_bsky(
image_path: str | None = None, image_path: str | None = None,
video_path: str | None = None, video_path: str | None = None,
alt_text: str = "", alt_text: str = "",
password: str = "",
service_url: str = "https://bsky.social", service_url: str = "https://bsky.social",
) -> bool: ) -> bool:
rich_text = make_rich(text) rich_text = make_rich(text)
@@ -451,14 +535,12 @@ def post_to_bsky(
# --- VIDEO POSTING --- # --- VIDEO POSTING ---
if video_path: if video_path:
logging.info(f"🎬 Preparing video upload: {video_path}") logging.info(f"🎬 Preparing video upload: {video_path}")
with open(video_path, "rb") as f:
video_data = f.read()
video_embed = upload_video_and_wait( video_embed = upload_video_smart(
client, client,
video_data, video_path,
alt_text=alt_text,
service_url=service_url, service_url=service_url,
alt_text=alt_text,
) )
if not video_embed: if not video_embed:
@@ -539,7 +621,6 @@ def main():
image_path=args.image, image_path=args.image,
video_path=args.video, video_path=args.video,
alt_text=args.alt, alt_text=args.alt,
password=args.password,
service_url=args.service, service_url=args.service,
) )