diff --git a/bsky_post.py b/bsky_post.py index 570f471..d8e7f34 100644 --- a/bsky_post.py +++ b/bsky_post.py @@ -19,6 +19,7 @@ import random import re import base64 import json +import requests from dataclasses import dataclass from atproto import Client, client_utils, models @@ -325,76 +326,66 @@ def upload_video_and_wait( video_data: bytes, alt_text: str = "" ) -> models.AppBskyEmbedVideo.Main | None: - - # --- Helper classes to mock the SDK's internal Session object --- - class MockPayload: - def __init__(self, exp): - self.exp = exp - - class VideoSession: - def __init__(self, token, handle, did): - self.access_jwt = token - self.refresh_jwt = "" - self.handle = handle - self.did = did - - # Default to 1 hour from now if decoding fails - exp_time = int(time.time()) + 3600 - - try: - parts = token.split('.') - if len(parts) == 3: - payload = parts[1] - # Pad base64 string if necessary - payload += '=' * (-len(payload) % 4) - decoded = json.loads(base64.urlsafe_b64decode(payload).decode('utf-8')) - if 'exp' in decoded: - exp_time = decoded['exp'] - except Exception: - pass - - # Assign as an object so the SDK can call .exp - self.access_jwt_payload = MockPayload(exp_time) - try: logging.info("🎬 Requesting Service Auth for Video Upload...") # 1. Get a Service Auth token from your home PDS VIDEO_SERVICE_DID = "did:web:video.bsky.app" auth_response = client.com.atproto.server.get_service_auth({ - 'aud': VIDEO_SERVICE_DID + 'aud': VIDEO_SERVICE_DID, + 'exp': int(time.time()) + 60 * 30, # Token valid for 30 mins }) service_auth_token = auth_response.token logging.info("🎬 Uploading video to Bluesky Video Service...") - # 2. Create a temporary client pointing to the dedicated video service - video_client = Client(base_url="https://video.bsky.app") + # 2. Upload the video using standard HTTP requests + upload_url = "https://video.bsky.app/xrpc/app.bsky.video.uploadVideo" + headers = { + "Authorization": f"Bearer {service_auth_token}", + "Content-Type": "video/mp4" # Assuming mp4, adjust if needed + } - # 3. Inject the Service Auth token using our mock session - video_client._session = VideoSession( - token=service_auth_token, - handle=client.me.handle, - did=client.me.did - ) - - # 4. Upload the video to the official service - response = video_client.app.bsky.video.upload_video(video_data) - job_id = response.job_id + # The endpoint expects the raw bytes in the body + upload_resp = requests.post(upload_url, headers=headers, data=video_data) + if upload_resp.status_code != 200: + logging.error(f"❌ Video upload failed: {upload_resp.status_code} - {upload_resp.text}") + return None + + job_id = upload_resp.json().get("jobId") + if not job_id: + logging.error("❌ No Job ID returned from video service.") + return None + logging.info(f"⏳ Video uploaded! Job ID: {job_id}. Waiting for processing...") - # 5. Poll the job status using the video client + # 3. Poll the job status + status_url = "https://video.bsky.app/xrpc/app.bsky.video.getJobStatus" + params = {"jobId": job_id} + while True: - status_resp = video_client.app.bsky.video.get_job_status({'job_id': job_id}) - state = status_resp.job_status.state + status_resp = requests.get(status_url, headers=headers, params=params) + + if status_resp.status_code != 200: + logging.error(f"❌ Failed to get job status: {status_resp.status_code} - {status_resp.text}") + return None + + job_status = status_resp.json().get("jobStatus", {}) + state = job_status.get("state") if state == 'JOB_STATE_COMPLETED': logging.info("✅ Processing complete! Waiting 10s for CDN propagation...") time.sleep(10) + # Construct the blob object exactly as the SDK expects it + blob_dict = job_status.get("blob") + + # Convert the raw dictionary into the SDK's BlobRef model + blob_ref = models.BlobRef.from_dict(blob_dict) + return models.AppBskyEmbedVideo.Main( - video=status_resp.job_status.blob, + video=blob_ref, alt=alt_text ) elif state == 'JOB_STATE_FAILED':