This commit is contained in:
Guillem Hernandez Sola
2026-05-08 10:46:33 +02:00
parent c5338de043
commit 1991d98fa4

View File

@@ -19,6 +19,7 @@ import random
import re import re
import base64 import base64
import json import json
import requests
from dataclasses import dataclass from dataclasses import dataclass
from atproto import Client, client_utils, models from atproto import Client, client_utils, models
@@ -325,76 +326,66 @@ def upload_video_and_wait(
video_data: bytes, video_data: bytes,
alt_text: str = "" alt_text: str = ""
) -> models.AppBskyEmbedVideo.Main | None: ) -> models.AppBskyEmbedVideo.Main | None:
# --- Helper classes to mock the SDK's internal Session object ---
class MockPayload:
def __init__(self, exp):
self.exp = exp
class VideoSession:
def __init__(self, token, handle, did):
self.access_jwt = token
self.refresh_jwt = ""
self.handle = handle
self.did = did
# Default to 1 hour from now if decoding fails
exp_time = int(time.time()) + 3600
try:
parts = token.split('.')
if len(parts) == 3:
payload = parts[1]
# Pad base64 string if necessary
payload += '=' * (-len(payload) % 4)
decoded = json.loads(base64.urlsafe_b64decode(payload).decode('utf-8'))
if 'exp' in decoded:
exp_time = decoded['exp']
except Exception:
pass
# Assign as an object so the SDK can call .exp
self.access_jwt_payload = MockPayload(exp_time)
try: try:
logging.info("🎬 Requesting Service Auth for Video Upload...") logging.info("🎬 Requesting Service Auth for Video Upload...")
# 1. Get a Service Auth token from your home PDS # 1. Get a Service Auth token from your home PDS
VIDEO_SERVICE_DID = "did:web:video.bsky.app" VIDEO_SERVICE_DID = "did:web:video.bsky.app"
auth_response = client.com.atproto.server.get_service_auth({ auth_response = client.com.atproto.server.get_service_auth({
'aud': VIDEO_SERVICE_DID 'aud': VIDEO_SERVICE_DID,
'exp': int(time.time()) + 60 * 30, # Token valid for 30 mins
}) })
service_auth_token = auth_response.token service_auth_token = auth_response.token
logging.info("🎬 Uploading video to Bluesky Video Service...") logging.info("🎬 Uploading video to Bluesky Video Service...")
# 2. Create a temporary client pointing to the dedicated video service # 2. Upload the video using standard HTTP requests
video_client = Client(base_url="https://video.bsky.app") upload_url = "https://video.bsky.app/xrpc/app.bsky.video.uploadVideo"
headers = {
"Authorization": f"Bearer {service_auth_token}",
"Content-Type": "video/mp4" # Assuming mp4, adjust if needed
}
# 3. Inject the Service Auth token using our mock session # The endpoint expects the raw bytes in the body
video_client._session = VideoSession( upload_resp = requests.post(upload_url, headers=headers, data=video_data)
token=service_auth_token,
handle=client.me.handle,
did=client.me.did
)
# 4. Upload the video to the official service if upload_resp.status_code != 200:
response = video_client.app.bsky.video.upload_video(video_data) logging.error(f"❌ Video upload failed: {upload_resp.status_code} - {upload_resp.text}")
job_id = response.job_id return None
job_id = upload_resp.json().get("jobId")
if not job_id:
logging.error("❌ No Job ID returned from video service.")
return None
logging.info(f"⏳ Video uploaded! Job ID: {job_id}. Waiting for processing...") logging.info(f"⏳ Video uploaded! Job ID: {job_id}. Waiting for processing...")
# 5. Poll the job status using the video client # 3. Poll the job status
status_url = "https://video.bsky.app/xrpc/app.bsky.video.getJobStatus"
params = {"jobId": job_id}
while True: while True:
status_resp = video_client.app.bsky.video.get_job_status({'job_id': job_id}) status_resp = requests.get(status_url, headers=headers, params=params)
state = status_resp.job_status.state
if status_resp.status_code != 200:
logging.error(f"❌ Failed to get job status: {status_resp.status_code} - {status_resp.text}")
return None
job_status = status_resp.json().get("jobStatus", {})
state = job_status.get("state")
if state == 'JOB_STATE_COMPLETED': if state == 'JOB_STATE_COMPLETED':
logging.info("✅ Processing complete! Waiting 10s for CDN propagation...") logging.info("✅ Processing complete! Waiting 10s for CDN propagation...")
time.sleep(10) time.sleep(10)
# Construct the blob object exactly as the SDK expects it
blob_dict = job_status.get("blob")
# Convert the raw dictionary into the SDK's BlobRef model
blob_ref = models.BlobRef.from_dict(blob_dict)
return models.AppBskyEmbedVideo.Main( return models.AppBskyEmbedVideo.Main(
video=status_resp.job_status.blob, video=blob_ref,
alt=alt_text alt=alt_text
) )
elif state == 'JOB_STATE_FAILED': elif state == 'JOB_STATE_FAILED':