Files
post2bsky/bsky_post.py
Guillem Hernandez Sola 2d868d0ad7 Opus Fixes
2026-05-08 11:17:23 +02:00

552 lines
18 KiB
Python

#!/usr/bin/env python3
"""
bsky_post.py — Post text + optional image or video to a Bluesky instance.
Usage examples:
python3 bsky_post.py "DIVENDRES!!!!" --video friday.mp4
python3 bsky_post.py "Dijous!!!!" --image thursday.jpg
python3 bsky_post.py "Bon dia!"
python3 bsky_post.py "Bon dia!" --image photo.jpg --lang ca
"""
import argparse
import logging
import mimetypes
import os
import sys
import time
import random
import re
import requests
from dataclasses import dataclass
from urllib.parse import urlparse
from atproto import Client, client_utils, models
# ============================================================
# Config
# ============================================================
@dataclass(frozen=True)
class RetryConfig:
login_max_attempts: int = 5
login_base_delay_seconds: float = 10.0
login_max_delay_seconds: float = 600.0
login_jitter_seconds: float = 3
# ============================================================
# Logging
# ============================================================
def setup_logging() -> None:
logging.basicConfig(
format="%(asctime)s %(message)s",
level=logging.INFO,
stream=sys.stdout,
)
# ============================================================
# Text builder
# ============================================================
def make_rich(content: str):
text_builder = client_utils.TextBuilder()
content = content.strip()
lines = content.splitlines()
for line_idx, line in enumerate(lines):
if not line.strip():
if line_idx < len(lines) - 1:
text_builder.text("\n")
continue
words = line.split(" ")
for i, word in enumerate(words):
if not word:
if i < len(words) - 1:
text_builder.text(" ")
continue
if word.startswith("http://") or word.startswith("https://"):
text_builder.link(word, word)
elif word.startswith("#") and len(word) > 1:
tag_name = word[1:].rstrip(".,;:!?)'\"")
if tag_name:
text_builder.tag(word, tag_name)
else:
text_builder.text(word)
else:
text_builder.text(word)
if i < len(words) - 1:
text_builder.text(" ")
if line_idx < len(lines) - 1:
text_builder.text("\n")
return text_builder
# ============================================================
# Error helpers
# ============================================================
def is_rate_limited_error(error_obj) -> bool:
text = repr(error_obj).lower()
return (
"429" in text
or "ratelimitexceeded" in text
or "too many requests" in text
or "rate limit" in text
)
def is_auth_error(error_obj) -> bool:
text = repr(error_obj).lower()
return (
"401" in text
or "403" in text
or "invalid identifier or password" in text
or "authenticationrequired" in text
or "invalidtoken" in text
)
def is_network_error(error_obj) -> bool:
text = repr(error_obj)
signals = [
"ConnectError",
"RemoteProtocolError",
"ReadTimeout",
"WriteTimeout",
"TimeoutException",
"503",
"502",
"504",
"ConnectionResetError",
]
return any(sig in text for sig in signals)
def is_transient_error(error_obj) -> bool:
error_text = repr(error_obj)
transient_signals = [
"InvokeTimeoutError",
"ReadTimeout",
"WriteTimeout",
"TimeoutException",
"RemoteProtocolError",
"ConnectError",
"503",
"502",
"504",
]
return any(signal in error_text for signal in transient_signals)
def get_rate_limit_wait_seconds(error_obj, default_delay: float, max_delay: float) -> float:
"""
Parse common rate-limit headers and return a bounded wait time in seconds.
"""
try:
now_ts = int(time.time())
headers = getattr(error_obj, "headers", None) or {}
retry_after = headers.get("retry-after") or headers.get("Retry-After")
if retry_after:
return min(max(float(retry_after), 1.0), max_delay)
x_after = headers.get("x-ratelimit-after") or headers.get("X-RateLimit-After")
if x_after:
return min(max(float(x_after), 1.0), max_delay)
reset_value = headers.get("ratelimit-reset") or headers.get("RateLimit-Reset")
if reset_value:
wait_seconds = max(float(reset_value) - now_ts + 1.0, default_delay)
return min(wait_seconds, max_delay)
except Exception:
pass
try:
response = getattr(error_obj, "response", None)
headers = getattr(response, "headers", None) or {}
now_ts = int(time.time())
retry_after = headers.get("retry-after") or headers.get("Retry-After")
if retry_after:
return min(max(float(retry_after), 1.0), max_delay)
x_after = headers.get("x-ratelimit-after") or headers.get("X-RateLimit-After")
if x_after:
return min(max(float(x_after), 1.0), max_delay)
reset_value = headers.get("ratelimit-reset") or headers.get("RateLimit-Reset")
if reset_value:
wait_seconds = max(float(reset_value) - now_ts + 1.0, default_delay)
return min(wait_seconds, max_delay)
except Exception:
pass
text = repr(error_obj)
m = re.search(r"'retry-after': '(\d+)'", text, re.IGNORECASE)
if m:
return min(max(float(m.group(1)), 1.0), max_delay)
m = re.search(r"'x-ratelimit-after': '(\d+)'", text, re.IGNORECASE)
if m:
return min(max(float(m.group(1)), 1.0), max_delay)
m = re.search(r"'ratelimit-reset': '(\d+)'", text, re.IGNORECASE)
if m:
now_ts = int(time.time())
wait_seconds = max(float(m.group(1)) - now_ts + 1.0, default_delay)
return min(wait_seconds, max_delay)
return default_delay
# ============================================================
# Login with backoff
# ============================================================
def login_with_backoff(
client: Client,
username: str,
password: str,
service_url: str,
max_attempts: int = 5,
base_delay: float = 10.0,
max_delay: float = 600.0,
jitter: float = 1.5,
) -> bool:
for attempt in range(1, max_attempts + 1):
try:
logging.info(
f"🔑 Login attempt {attempt}/{max_attempts}{service_url} as {username}"
)
client.login(username, password)
logging.info("✅ Login successful.")
return True
except Exception as e:
logging.exception("❌ Login exception")
if is_auth_error(e):
logging.error("❌ Bad credentials. Check handle/password.")
return False
if is_rate_limited_error(e):
if attempt < max_attempts:
wait = get_rate_limit_wait_seconds(e, default_delay=base_delay, max_delay=max_delay)
wait = wait + random.uniform(0, jitter)
logging.warning(
f"⏳ Rate-limited on login (attempt {attempt}/{max_attempts}). "
f"Retrying in {wait:.1f}s..."
)
time.sleep(wait)
continue
logging.error("❌ Exhausted login retries due to rate limiting.")
return False
if is_network_error(e) or is_transient_error(e):
if attempt < max_attempts:
wait = min(base_delay * attempt, max_delay) + random.uniform(0, jitter)
logging.warning(
f"⏳ Transient login error (attempt {attempt}/{max_attempts}). "
f"Retrying in {wait:.1f}s..."
)
time.sleep(wait)
continue
logging.error("❌ Exhausted login retries after transient/network errors.")
return False
if attempt < max_attempts:
wait = min(base_delay * attempt, max_delay) + random.uniform(0, jitter)
logging.warning(
f"⏳ Unknown login error (attempt {attempt}/{max_attempts}). "
f"Retrying in {wait:.1f}s..."
)
time.sleep(wait)
continue
return False
# ============================================================
# URL / DID helpers
# ============================================================
def normalize_pds_base(service_url: str) -> str:
"""
Strip trailing slashes and any trailing /xrpc segment so callers can safely
compose '<base>/xrpc/<method>' URLs.
"""
base = service_url.rstrip("/")
if base.endswith("/xrpc"):
base = base[: -len("/xrpc")]
return base
def pds_did_from_base(pds_base: str) -> str:
"""
Derive the did:web for a PDS from its base URL hostname.
Works for typical atproto PDS deployments (e.g. https://eurosky.social → did:web:eurosky.social).
"""
host = urlparse(pds_base).netloc
return f"did:web:{host}"
# ============================================================
# Media upload — Image
# ============================================================
def detect_mime_type(path: str) -> str:
mime, _ = mimetypes.guess_type(path)
if mime:
return mime
ext = os.path.splitext(path)[1].lower()
fallbacks = {
".jpg": "image/jpeg", ".jpeg": "image/jpeg",
".png": "image/png", ".gif": "image/gif",
".webp": "image/webp",
".mp4": "video/mp4", ".mov": "video/quicktime",
".webm": "video/webm",
}
return fallbacks.get(ext, "application/octet-stream")
def upload_image(
client: Client,
image_path: str,
alt_text: str = "",
) -> models.AppBskyEmbedImages.Image | None:
try:
mime = detect_mime_type(image_path)
with open(image_path, "rb") as f:
data = f.read()
logging.info(f"🖼️ Uploading image: {image_path} ({len(data)/1024:.1f} KB, {mime})")
response = client.upload_blob(data)
logging.info("✅ Image uploaded successfully.")
return models.AppBskyEmbedImages.Image(
image=response.blob,
alt=alt_text,
)
except Exception as e:
logging.error(f"❌ Failed to upload image: {repr(e)}")
return None
# ============================================================
# Media upload — Video
# ============================================================
def upload_video_and_wait(
client: Client,
video_data: bytes,
alt_text: str = "",
service_url: str = "https://bsky.social",
) -> models.AppBskyEmbedVideo.Main | None:
"""
Upload a video to the user's PDS video service and wait for processing.
Notes on portability:
* Self-hosted/federated PDSes (e.g. eurosky.social) typically host the
video XRPC endpoints on the PDS itself, with `aud = did:web:<pds-host>`.
* The official Bluesky network uses a separate host (video.bsky.app),
but this implementation targets the PDS-hosted variant which works
for both eurosky-style PDSes and any conforming atproto deployment.
"""
try:
pds_base = normalize_pds_base(service_url)
service_did = pds_did_from_base(pds_base)
logging.info(f"🎬 Using video service at {pds_base} (aud={service_did})")
# --- Token #1: bound to uploadVideo ---
logging.info("🎬 Requesting Service Auth for Video Upload...")
upload_auth = client.com.atproto.server.get_service_auth({
'aud': service_did,
'lxm': 'app.bsky.video.uploadVideo',
'exp': int(time.time()) + 60 * 30, # 30 min (allowed because lxm is set)
})
upload_headers = {
"Authorization": f"Bearer {upload_auth.token}",
"Content-Type": "video/mp4",
}
upload_url = f"{pds_base}/xrpc/app.bsky.video.uploadVideo"
logging.info(f"🎬 Uploading video to {upload_url} ...")
upload_resp = requests.post(upload_url, headers=upload_headers, data=video_data)
if upload_resp.status_code != 200:
logging.error(f"❌ Video upload failed: {upload_resp.status_code} - {upload_resp.text}")
return None
job_id = upload_resp.json().get("jobId")
if not job_id:
logging.error("❌ No Job ID returned from video service.")
return None
logging.info(f"⏳ Video uploaded! Job ID: {job_id}. Waiting for processing...")
# --- Token #2: bound to getJobStatus ---
status_auth = client.com.atproto.server.get_service_auth({
'aud': service_did,
'lxm': 'app.bsky.video.getJobStatus',
'exp': int(time.time()) + 60 * 30,
})
status_headers = {"Authorization": f"Bearer {status_auth.token}"}
status_url = f"{pds_base}/xrpc/app.bsky.video.getJobStatus"
params = {"jobId": job_id}
while True:
status_resp = requests.get(status_url, headers=status_headers, params=params)
if status_resp.status_code != 200:
logging.error(f"❌ Failed to get job status: {status_resp.status_code} - {status_resp.text}")
return None
job_status = status_resp.json().get("jobStatus", {})
state = job_status.get("state")
if state == 'JOB_STATE_COMPLETED':
logging.info("✅ Processing complete! Waiting 10s for CDN propagation...")
time.sleep(10)
blob_dict = job_status.get("blob")
blob_ref = models.BlobRef.from_dict(blob_dict)
return models.AppBskyEmbedVideo.Main(
video=blob_ref,
alt=alt_text,
)
elif state == 'JOB_STATE_FAILED':
logging.error("❌ Video processing failed on Bluesky's servers.")
return None
logging.info(" ...still processing...")
time.sleep(3)
except Exception as e:
logging.error(f"❌ Failed to upload/process video: {repr(e)}")
return None
# ============================================================
# Post
# ============================================================
def post_to_bsky(
client: Client,
text: str,
langs: list[str],
image_path: str | None = None,
video_path: str | None = None,
alt_text: str = "",
password: str = "",
service_url: str = "https://bsky.social",
) -> bool:
rich_text = make_rich(text)
try:
# --- VIDEO POSTING ---
if video_path:
logging.info(f"🎬 Preparing video upload: {video_path}")
with open(video_path, "rb") as f:
video_data = f.read()
video_embed = upload_video_and_wait(
client,
video_data,
alt_text=alt_text,
service_url=service_url,
)
if not video_embed:
logging.error("❌ Aborting post: video upload/processing failed.")
return False
logging.info("🚀 Sending video post...")
result = client.send_post(
text=rich_text,
embed=video_embed,
langs=langs,
)
# --- IMAGE POSTING ---
elif image_path:
image = upload_image(client, image_path, alt_text=alt_text)
if not image:
logging.error("❌ Aborting post: image upload failed.")
return False
embed = models.AppBskyEmbedImages.Main(images=[image])
logging.info("🚀 Sending image post...")
result = client.send_post(text=rich_text, embed=embed, langs=langs)
# --- TEXT ONLY POSTING ---
else:
logging.info("🚀 Sending text post...")
result = client.send_post(text=rich_text, langs=langs)
uri = getattr(result, "uri", None)
logging.info(f"✅ Post published! URI: {uri}")
return True
except Exception as e:
logging.error(f"❌ Failed to send post: {repr(e)}")
return False
# ============================================================
# CLI
# ============================================================
def main():
setup_logging()
parser = argparse.ArgumentParser(
description="Post text + optional image or video to a Bluesky instance."
)
parser.add_argument("text", help="Post text content")
parser.add_argument("--username", required=True, help="Bluesky handle or email")
parser.add_argument("--password", required=True, help="Bluesky app password")
parser.add_argument("--service", default="https://bsky.social", help="Bluesky PDS URL")
parser.add_argument("--lang", default="ca", help="Comma-separated language codes (e.g. ca,es)")
parser.add_argument("--image", default=None, help="Path to image file to attach")
parser.add_argument("--video", default=None, help="Path to video file to attach")
parser.add_argument("--alt", default="", help="Alt text for media")
args = parser.parse_args()
client = Client(base_url=args.service)
success = login_with_backoff(
client,
args.username,
args.password,
args.service,
max_attempts=RetryConfig.login_max_attempts,
base_delay=RetryConfig.login_base_delay_seconds,
max_delay=RetryConfig.login_max_delay_seconds,
jitter=RetryConfig.login_jitter_seconds,
)
if not success:
sys.exit(1)
langs = [l.strip() for l in args.lang.split(",") if l.strip()]
post_success = post_to_bsky(
client,
text=args.text,
langs=langs,
image_path=args.image,
video_path=args.video,
alt_text=args.alt,
password=args.password,
service_url=args.service,
)
if not post_success:
sys.exit(1)
if __name__ == "__main__":
main()