This commit is contained in:
Guillem Hernandez Sola
2026-05-08 10:04:27 +02:00
parent 7deeec0a41
commit 233697cca1

View File

@@ -17,6 +17,8 @@ import sys
import time import time
import random import random
import re import re
import base64
import json
from dataclasses import dataclass from dataclasses import dataclass
from atproto import Client, client_utils, models from atproto import Client, client_utils, models
@@ -318,17 +320,36 @@ def upload_image(
logging.error(f"❌ Failed to upload image: {repr(e)}") logging.error(f"❌ Failed to upload image: {repr(e)}")
return None return None
def upload_video_and_wait( def upload_video_and_wait(
client: Client, client: Client,
video_data: bytes, video_data: bytes,
alt_text: str = "" alt_text: str = ""
) -> models.AppBskyEmbedVideo.Main | None: ) -> models.AppBskyEmbedVideo.Main | None:
# --- Helper class to mock the SDK's internal Session object ---
class VideoSession:
def __init__(self, token, handle, did):
self.access_jwt = token
self.refresh_jwt = ""
self.handle = handle
self.did = did
# The SDK needs this payload to check token expiration
self.access_jwt_payload = {"exp": int(time.time()) + 3600}
try:
parts = token.split('.')
if len(parts) == 3:
payload = parts[1]
# Pad base64 string if necessary
payload += '=' * (-len(payload) % 4)
self.access_jwt_payload = json.loads(base64.urlsafe_b64decode(payload).decode('utf-8'))
except Exception:
pass
try: try:
logging.info("🎬 Requesting Service Auth for Video Upload...") logging.info("🎬 Requesting Service Auth for Video Upload...")
# 1. Get a Service Auth token from your home PDS (eurosky.social) # 1. Get a Service Auth token from your home PDS
# The audience (aud) MUST be the DID of the Bluesky Video Service
VIDEO_SERVICE_DID = "did:web:video.bsky.app" VIDEO_SERVICE_DID = "did:web:video.bsky.app"
auth_response = client.com.atproto.server.get_service_auth({ auth_response = client.com.atproto.server.get_service_auth({
'aud': VIDEO_SERVICE_DID 'aud': VIDEO_SERVICE_DID
@@ -340,14 +361,11 @@ def upload_video_and_wait(
# 2. Create a temporary client pointing to the dedicated video service # 2. Create a temporary client pointing to the dedicated video service
video_client = Client(base_url="https://video.bsky.app") video_client = Client(base_url="https://video.bsky.app")
# 3. Inject the Service Auth token into the headers # 3. Inject the Service Auth token using our mock session
# We don't call .login() on this client; we just set the authorization header manually. video_client._session = VideoSession(
video_client._session = models.ComAtprotoServerCreateSession.Response( token=service_auth_token,
access_jwt=service_auth_token, handle=client.me.handle,
refresh_jwt="", did=client.me.did
handle=client.me.handle,
did=client.me.did,
active=True
) )
# 4. Upload the video to the official service # 4. Upload the video to the official service