This commit is contained in:
Guillem Hernandez Sola
2026-05-08 14:53:31 +02:00
parent 022af846a4
commit 1e49bd2c09

View File

@@ -14,6 +14,8 @@ import mimetypes
import os import os
import random import random
import re import re
import secrets
import string
import sys import sys
import time import time
from dataclasses import dataclass from dataclasses import dataclass
@@ -236,6 +238,11 @@ def pds_did_from_service_url(service_url: str) -> str:
return f"did:web:{host}" return f"did:web:{host}"
def random_video_name(ext: str = ".mp4") -> str:
token = "".join(secrets.choice(string.ascii_lowercase + string.digits) for _ in range(12))
return f"{int(time.time())}_{token}{ext}"
def model_to_dict(obj): def model_to_dict(obj):
if obj is None: if obj is None:
return None return None
@@ -323,6 +330,42 @@ def _extract_service_auth_token(upload_auth) -> str | None:
return None return None
def _poll_video_job(video_host: str, job_id: str) -> models.AppBskyEmbedVideo.Main | None:
status_url = f"{video_host}/xrpc/app.bsky.video.getJobStatus"
deadline = time.time() + 600 # up to 10 minutes
while time.time() < deadline:
status_resp = requests.get(status_url, params={"jobId": job_id}, timeout=30)
if status_resp.status_code != 200:
logging.error(f"❌ Job status check failed: {status_resp.status_code} - {status_resp.text}")
return None
status_json = status_resp.json()
job_status = status_json.get("jobStatus", {})
state = job_status.get("state")
if state == "JOB_STATE_COMPLETED":
blob_dict = job_status.get("blob")
if not blob_dict:
logging.error(f"❌ No blob in completed job status: {status_json}")
return None
wait_with_heartbeat(8, label="CDN propagation")
blob_ref = models.BlobRef.from_dict(blob_dict)
logging.info("✅ Video processed successfully.")
return models.AppBskyEmbedVideo.Main(video=blob_ref, alt="")
if state == "JOB_STATE_FAILED":
logging.error(f"❌ Video processing failed: {job_status}")
return None
logging.info(f" ...still processing (state={state})...")
time.sleep(3)
logging.error("❌ Video processing timed out.")
return None
def upload_video_via_bsky_service( def upload_video_via_bsky_service(
client: Client, client: Client,
video_path: str, video_path: str,
@@ -335,6 +378,7 @@ def upload_video_via_bsky_service(
Critical compatibility fixes: Critical compatibility fixes:
- aud must be user's PDS DID (e.g. did:web:eurosky.social) - aud must be user's PDS DID (e.g. did:web:eurosky.social)
- lxm must be com.atproto.repo.uploadBlob - lxm must be com.atproto.repo.uploadBlob
- handle 409 already_exists by reusing returned jobId
""" """
try: try:
if not os.path.exists(video_path): if not os.path.exists(video_path):
@@ -350,11 +394,10 @@ def upload_video_via_bsky_service(
VIDEO_HOST = "https://video.bsky.app" VIDEO_HOST = "https://video.bsky.app"
pds_did = pds_did_from_service_url(service_url) pds_did = pds_did_from_service_url(service_url)
# Some atproto versions prefer typed params, others accept dict
try: try:
params = models.ComAtprotoServerGetServiceAuth.Params( params = models.ComAtprotoServerGetServiceAuth.Params(
aud=pds_did, aud=pds_did,
lxm="com.atproto.repo.uploadBlob", # <-- critical fix lxm="com.atproto.repo.uploadBlob",
exp=int(time.time()) + 60 * 30, exp=int(time.time()) + 60 * 30,
) )
upload_auth = client.com.atproto.server.get_service_auth(params) upload_auth = client.com.atproto.server.get_service_auth(params)
@@ -362,7 +405,7 @@ def upload_video_via_bsky_service(
upload_auth = client.com.atproto.server.get_service_auth( upload_auth = client.com.atproto.server.get_service_auth(
{ {
"aud": pds_did, "aud": pds_did,
"lxm": "com.atproto.repo.uploadBlob", # <-- critical fix "lxm": "com.atproto.repo.uploadBlob",
"exp": int(time.time()) + 60 * 30, "exp": int(time.time()) + 60 * 30,
} }
) )
@@ -373,9 +416,12 @@ def upload_video_via_bsky_service(
return None return None
user_did = client.me.did user_did = client.me.did
upload_name = random_video_name(".mp4")
logging.info(f"🎞️ Upload name: {upload_name}")
upload_url = ( upload_url = (
f"{VIDEO_HOST}/xrpc/app.bsky.video.uploadVideo" f"{VIDEO_HOST}/xrpc/app.bsky.video.uploadVideo"
f"?did={user_did}&name={int(time.time())}.mp4" f"?did={user_did}&name={upload_name}"
) )
headers = { headers = {
"Authorization": f"Bearer {token}", "Authorization": f"Bearer {token}",
@@ -383,50 +429,32 @@ def upload_video_via_bsky_service(
} }
upload_resp = requests.post(upload_url, headers=headers, data=video_bytes, timeout=180) upload_resp = requests.post(upload_url, headers=headers, data=video_bytes, timeout=180)
if upload_resp.status_code != 200:
if upload_resp.status_code not in (200, 409):
logging.error(f"❌ video.bsky.app upload failed: {upload_resp.status_code} - {upload_resp.text}") logging.error(f"❌ video.bsky.app upload failed: {upload_resp.status_code} - {upload_resp.text}")
return None return None
body = upload_resp.json() body = upload_resp.json()
if upload_resp.status_code == 409:
if body.get("error") == "already_exists" and body.get("jobId"):
logging.info(" Video already processed on video.bsky.app. Reusing existing job.")
else:
logging.error(f"❌ video.bsky.app returned 409 without reusable jobId: {body}")
return None
job_id = body.get("jobId") job_id = body.get("jobId")
if not job_id: if not job_id:
logging.error(f"❌ No jobId returned from video service. Response: {body}") logging.error(f"❌ No jobId returned from video service. Response: {body}")
return None return None
logging.info(f"⏳ Job {job_id} accepted — polling status...") logging.info(f"⏳ Job {job_id} accepted — polling status...")
status_url = f"{VIDEO_HOST}/xrpc/app.bsky.video.getJobStatus" embed = _poll_video_job(VIDEO_HOST, job_id)
deadline = time.time() + 600 if not embed:
return None
while time.time() < deadline: # inject alt text after job result
status_resp = requests.get(status_url, params={"jobId": job_id}, timeout=30) return models.AppBskyEmbedVideo.Main(video=embed.video, alt=alt_text)
if status_resp.status_code != 200:
logging.error(f"❌ Job status check failed: {status_resp.status_code} - {status_resp.text}")
return None
status_json = status_resp.json()
job_status = status_json.get("jobStatus", {})
state = job_status.get("state")
if state == "JOB_STATE_COMPLETED":
blob_dict = job_status.get("blob")
if not blob_dict:
logging.error(f"❌ No blob in completed job status: {status_json}")
return None
wait_with_heartbeat(8, label="CDN propagation")
blob_ref = models.BlobRef.from_dict(blob_dict)
logging.info("✅ Video processed successfully.")
return models.AppBskyEmbedVideo.Main(video=blob_ref, alt=alt_text)
if state == "JOB_STATE_FAILED":
logging.error(f"❌ Video processing failed: {job_status}")
return None
logging.info(f" ...still processing (state={state})...")
time.sleep(3)
logging.error("❌ Video processing timed out.")
return None
except Exception as e: except Exception as e:
logging.error(f"❌ video.bsky.app upload failed: {repr(e)}") logging.error(f"❌ video.bsky.app upload failed: {repr(e)}")
@@ -513,7 +541,7 @@ def post_to_bsky(
record = { record = {
"$type": "app.bsky.feed.post", "$type": "app.bsky.feed.post",
"text": post_text, # guaranteed plain string "text": post_text,
"createdAt": time.strftime("%Y-%m-%dT%H:%M:%S.000Z", time.gmtime()), "createdAt": time.strftime("%Y-%m-%dT%H:%M:%S.000Z", time.gmtime()),
} }
@@ -525,7 +553,6 @@ def post_to_bsky(
logging.info(f"🧾 Final record text={record.get('text')!r}, has_embed={'embed' in record}") logging.info(f"🧾 Final record text={record.get('text')!r}, has_embed={'embed' in record}")
# typed first, dict fallback for compatibility
try: try:
resp = client.com.atproto.repo.create_record( resp = client.com.atproto.repo.create_record(
models.ComAtprotoRepoCreateRecord.Data( models.ComAtprotoRepoCreateRecord.Data(