From 233697cca19778b910939458f7d86e03432b234c Mon Sep 17 00:00:00 2001 From: Guillem Hernandez Sola Date: Fri, 8 May 2026 10:04:27 +0200 Subject: [PATCH] fe --- bsky_post.py | 40 +++++++++++++++++++++++++++++----------- 1 file changed, 29 insertions(+), 11 deletions(-) diff --git a/bsky_post.py b/bsky_post.py index ceaf0f0..68097da 100644 --- a/bsky_post.py +++ b/bsky_post.py @@ -17,6 +17,8 @@ import sys import time import random import re +import base64 +import json from dataclasses import dataclass from atproto import Client, client_utils, models @@ -318,17 +320,36 @@ def upload_image( logging.error(f"❌ Failed to upload image: {repr(e)}") return None - def upload_video_and_wait( client: Client, video_data: bytes, alt_text: str = "" ) -> models.AppBskyEmbedVideo.Main | None: + + # --- Helper class to mock the SDK's internal Session object --- + class VideoSession: + def __init__(self, token, handle, did): + self.access_jwt = token + self.refresh_jwt = "" + self.handle = handle + self.did = did + + # The SDK needs this payload to check token expiration + self.access_jwt_payload = {"exp": int(time.time()) + 3600} + try: + parts = token.split('.') + if len(parts) == 3: + payload = parts[1] + # Pad base64 string if necessary + payload += '=' * (-len(payload) % 4) + self.access_jwt_payload = json.loads(base64.urlsafe_b64decode(payload).decode('utf-8')) + except Exception: + pass + try: logging.info("🎬 Requesting Service Auth for Video Upload...") - # 1. Get a Service Auth token from your home PDS (eurosky.social) - # The audience (aud) MUST be the DID of the Bluesky Video Service + # 1. Get a Service Auth token from your home PDS VIDEO_SERVICE_DID = "did:web:video.bsky.app" auth_response = client.com.atproto.server.get_service_auth({ 'aud': VIDEO_SERVICE_DID @@ -340,14 +361,11 @@ def upload_video_and_wait( # 2. Create a temporary client pointing to the dedicated video service video_client = Client(base_url="https://video.bsky.app") - # 3. Inject the Service Auth token into the headers - # We don't call .login() on this client; we just set the authorization header manually. - video_client._session = models.ComAtprotoServerCreateSession.Response( - access_jwt=service_auth_token, - refresh_jwt="", - handle=client.me.handle, - did=client.me.did, - active=True + # 3. Inject the Service Auth token using our mock session + video_client._session = VideoSession( + token=service_auth_token, + handle=client.me.handle, + did=client.me.did ) # 4. Upload the video to the official service