Files
post2bsky/bsky_post.py
Guillem Hernandez Sola 22b32b0b0e pst_bsky new
2026-05-08 14:24:17 +02:00

691 lines
23 KiB
Python

#!/usr/bin/env python3
"""
bsky_post.py — Post text + optional image or video to Bluesky/federated PDS.
Usage examples:
python3 bsky_post.py "DIVENDRES!!!!" --video friday.mp4 --username you --password app-pass --service https://eurosky.social
python3 bsky_post.py "Dijous!!!!" --image thursday.jpg --username you --password app-pass
python3 bsky_post.py "Bon dia!" --username you --password app-pass
python3 bsky_post.py "Bon dia!" --image photo.jpg --lang ca --username you --password app-pass
python3 bsky_post.py "Long video!" --video clip.mp4 --username you --password app-pass --allow-pds-video-fallback
"""
import argparse
import logging
import mimetypes
import os
import random
import re
import sys
import time
from dataclasses import dataclass
from urllib.parse import urlparse
import requests
from atproto import Client, client_utils, models
# ============================================================
# Config
# ============================================================
@dataclass(frozen=True)
class RetryConfig:
login_max_attempts: int = 5
login_base_delay_seconds: float = 10.0
login_max_delay_seconds: float = 600.0
login_jitter_seconds: float = 3.0
# ============================================================
# Logging
# ============================================================
def setup_logging() -> None:
logging.basicConfig(
format="%(asctime)s %(levelname)s %(message)s",
level=logging.INFO,
stream=sys.stdout,
)
# ============================================================
# Text builder
# ============================================================
def make_rich(content: str):
text_builder = client_utils.TextBuilder()
content = content.strip()
lines = content.splitlines()
for line_idx, line in enumerate(lines):
if not line.strip():
if line_idx < len(lines) - 1:
text_builder.text("\n")
continue
words = line.split(" ")
for i, word in enumerate(words):
if not word:
if i < len(words) - 1:
text_builder.text(" ")
continue
if word.startswith("http://") or word.startswith("https://"):
text_builder.link(word, word)
elif word.startswith("#") and len(word) > 1:
tag_name = word[1:].rstrip(".,;:!?)'\"")
if tag_name:
text_builder.tag(word, tag_name)
else:
text_builder.text(word)
else:
text_builder.text(word)
if i < len(words) - 1:
text_builder.text(" ")
if line_idx < len(lines) - 1:
text_builder.text("\n")
return text_builder
# ============================================================
# Error helpers
# ============================================================
def is_rate_limited_error(error_obj) -> bool:
text = repr(error_obj).lower()
return (
"429" in text
or "ratelimitexceeded" in text
or "too many requests" in text
or "rate limit" in text
)
def is_auth_error(error_obj) -> bool:
text = repr(error_obj).lower()
return (
"401" in text
or "403" in text
or "invalid identifier or password" in text
or "authenticationrequired" in text
or "invalidtoken" in text
)
def is_network_error(error_obj) -> bool:
text = repr(error_obj)
signals = [
"ConnectError",
"RemoteProtocolError",
"ReadTimeout",
"WriteTimeout",
"TimeoutException",
"503",
"502",
"504",
"ConnectionResetError",
]
return any(sig in text for sig in signals)
def is_transient_error(error_obj) -> bool:
error_text = repr(error_obj)
transient_signals = [
"InvokeTimeoutError",
"ReadTimeout",
"WriteTimeout",
"TimeoutException",
"RemoteProtocolError",
"ConnectError",
"503",
"502",
"504",
]
return any(signal in error_text for signal in transient_signals)
def get_rate_limit_wait_seconds(error_obj, default_delay: float, max_delay: float) -> float:
try:
now_ts = int(time.time())
headers = getattr(error_obj, "headers", None) or {}
retry_after = headers.get("retry-after") or headers.get("Retry-After")
if retry_after:
return min(max(float(retry_after), 1.0), max_delay)
x_after = headers.get("x-ratelimit-after") or headers.get("X-RateLimit-After")
if x_after:
return min(max(float(x_after), 1.0), max_delay)
reset_value = headers.get("ratelimit-reset") or headers.get("RateLimit-Reset")
if reset_value:
wait_seconds = max(float(reset_value) - now_ts + 1.0, default_delay)
return min(wait_seconds, max_delay)
except Exception:
pass
try:
response = getattr(error_obj, "response", None)
headers = getattr(response, "headers", None) or {}
now_ts = int(time.time())
retry_after = headers.get("retry-after") or headers.get("Retry-After")
if retry_after:
return min(max(float(retry_after), 1.0), max_delay)
x_after = headers.get("x-ratelimit-after") or headers.get("X-RateLimit-After")
if x_after:
return min(max(float(x_after), 1.0), max_delay)
reset_value = headers.get("ratelimit-reset") or headers.get("RateLimit-Reset")
if reset_value:
wait_seconds = max(float(reset_value) - now_ts + 1.0, default_delay)
return min(wait_seconds, max_delay)
except Exception:
pass
text = repr(error_obj)
m = re.search(r"'retry-after': '(\d+)'", text, re.IGNORECASE)
if m:
return min(max(float(m.group(1)), 1.0), max_delay)
m = re.search(r"'x-ratelimit-after': '(\d+)'", text, re.IGNORECASE)
if m:
return min(max(float(m.group(1)), 1.0), max_delay)
m = re.search(r"'ratelimit-reset': '(\d+)'", text, re.IGNORECASE)
if m:
now_ts = int(time.time())
wait_seconds = max(float(m.group(1)) - now_ts + 1.0, default_delay)
return min(wait_seconds, max_delay)
return default_delay
# ============================================================
# Login with backoff
# ============================================================
def login_with_backoff(
client: Client,
username: str,
password: str,
service_url: str,
max_attempts: int = 5,
base_delay: float = 10.0,
max_delay: float = 600.0,
jitter: float = 1.5,
) -> bool:
for attempt in range(1, max_attempts + 1):
try:
logging.info(f"🔑 Login attempt {attempt}/{max_attempts}{service_url} as {username}")
client.login(username, password)
logging.info("✅ Login successful.")
return True
except Exception as e:
logging.exception("❌ Login exception")
if is_auth_error(e):
logging.error("❌ Bad credentials. Check handle/password.")
return False
if is_rate_limited_error(e):
if attempt < max_attempts:
wait = get_rate_limit_wait_seconds(e, default_delay=base_delay, max_delay=max_delay)
wait = wait + random.uniform(0, jitter)
logging.warning(
f"⏳ Rate-limited on login (attempt {attempt}/{max_attempts}). "
f"Retrying in {wait:.1f}s..."
)
time.sleep(wait)
continue
logging.error("❌ Exhausted login retries due to rate limiting.")
return False
if is_network_error(e) or is_transient_error(e):
if attempt < max_attempts:
wait = min(base_delay * attempt, max_delay) + random.uniform(0, jitter)
logging.warning(
f"⏳ Transient login error (attempt {attempt}/{max_attempts}). "
f"Retrying in {wait:.1f}s..."
)
time.sleep(wait)
continue
logging.error("❌ Exhausted login retries after transient/network errors.")
return False
if attempt < max_attempts:
wait = min(base_delay * attempt, max_delay) + random.uniform(0, jitter)
logging.warning(
f"⏳ Unknown login error (attempt {attempt}/{max_attempts}). "
f"Retrying in {wait:.1f}s..."
)
time.sleep(wait)
continue
return False
# ============================================================
# Utility
# ============================================================
def detect_mime_type(path: str) -> str:
mime, _ = mimetypes.guess_type(path)
if mime:
return mime
ext = os.path.splitext(path)[1].lower()
fallbacks = {
".jpg": "image/jpeg",
".jpeg": "image/jpeg",
".png": "image/png",
".gif": "image/gif",
".webp": "image/webp",
".mp4": "video/mp4",
".mov": "video/quicktime",
".webm": "video/webm",
}
return fallbacks.get(ext, "application/octet-stream")
def wait_with_heartbeat(total_seconds: float, label: str = "indexing") -> None:
if total_seconds <= 0:
return
logging.info(f"⏳ Waiting {total_seconds:.0f}s for {label}...")
remaining = total_seconds
while remaining > 0:
step = min(5.0, remaining)
time.sleep(step)
remaining -= step
if remaining > 0:
logging.info(f" ...still waiting ({remaining:.0f}s remaining)...")
logging.info("✅ Wait complete.")
def pds_did_from_service_url(service_url: str) -> str:
host = (urlparse(service_url).hostname or "").lower()
if not host:
raise ValueError(f"Invalid --service URL: {service_url}")
return f"did:web:{host}"
# ============================================================
# Media upload — Image
# ============================================================
def upload_image(
client: Client,
image_path: str,
alt_text: str = "",
) -> models.AppBskyEmbedImages.Image | None:
try:
mime = detect_mime_type(image_path)
with open(image_path, "rb") as f:
data = f.read()
logging.info(f"🖼️ Uploading image: {image_path} ({len(data)/1024:.1f} KB, {mime})")
response = client.upload_blob(data)
logging.info("✅ Image uploaded successfully.")
return models.AppBskyEmbedImages.Image(image=response.blob, alt=alt_text)
except Exception as e:
logging.error(f"❌ Failed to upload image: {repr(e)}")
return None
# ============================================================
# Media upload — Video (PDS direct fallback only)
# ============================================================
def upload_video_via_pds(
client: Client,
video_path: str,
alt_text: str = "",
settle_delay_seconds: float = 30.0,
) -> models.AppBskyEmbedVideo.Main | None:
"""
Direct upload to home PDS via upload_blob.
Kept only as optional fallback; playback can be unreliable in clients.
"""
try:
if not os.path.exists(video_path):
logging.error(f"❌ Video file not found: {video_path}")
return None
with open(video_path, "rb") as f:
video_bytes = f.read()
size_mb = len(video_bytes) / (1024 * 1024)
logging.warning(f"🎬 [PDS-direct fallback] Uploading: {video_path} ({size_mb:.2f} MB)")
response = client.upload_blob(video_bytes)
blob = response.blob
logging.warning("⚠️ [PDS-direct fallback] Blob uploaded. Waiting for indexing...")
wait_with_heartbeat(settle_delay_seconds, label="PDS/AppView indexing")
return models.AppBskyEmbedVideo.Main(video=blob, alt=alt_text)
except Exception as e:
logging.error(f"❌ PDS-direct video upload failed: {repr(e)}")
return None
# ============================================================
# Media upload — Video (video.bsky.app primary)
# ============================================================
def _extract_service_auth_token(upload_auth) -> str | None:
token = getattr(upload_auth, "token", None)
if token:
return token
if isinstance(upload_auth, dict):
return upload_auth.get("token")
return None
def upload_video_via_bsky_service(
client: Client,
video_path: str,
service_url: str,
alt_text: str = "",
) -> models.AppBskyEmbedVideo.Main | None:
"""
Upload via centralized video.bsky.app service.
IMPORTANT FIX:
getServiceAuth(aud=...) must use the user's PDS DID (e.g. did:web:eurosky.social),
not did:web:video.bsky.app.
"""
try:
if not os.path.exists(video_path):
logging.error(f"❌ Video file not found: {video_path}")
return None
with open(video_path, "rb") as f:
video_bytes = f.read()
size_mb = len(video_bytes) / (1024 * 1024)
logging.info(f"🎬 [video.bsky.app] Uploading: {video_path} ({size_mb:.2f} MB)")
VIDEO_HOST = "https://video.bsky.app"
pds_did = pds_did_from_service_url(service_url)
# Robust for different atproto versions
try:
params = models.ComAtprotoServerGetServiceAuth.Params(
aud=pds_did, # <-- critical fix
lxm="app.bsky.video.uploadVideo",
exp=int(time.time()) + 60 * 30,
)
upload_auth = client.com.atproto.server.get_service_auth(params)
except Exception:
upload_auth = client.com.atproto.server.get_service_auth(
{
"aud": pds_did, # <-- critical fix
"lxm": "app.bsky.video.uploadVideo",
"exp": int(time.time()) + 60 * 30,
}
)
token = _extract_service_auth_token(upload_auth)
if not token:
logging.error("❌ Failed to extract service auth token.")
return None
user_did = client.me.did
upload_url = (
f"{VIDEO_HOST}/xrpc/app.bsky.video.uploadVideo"
f"?did={user_did}&name={int(time.time())}.mp4"
)
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "video/mp4",
}
upload_resp = requests.post(upload_url, headers=headers, data=video_bytes, timeout=180)
if upload_resp.status_code != 200:
logging.error(f"❌ video.bsky.app upload failed: {upload_resp.status_code} - {upload_resp.text}")
return None
body = upload_resp.json()
job_id = body.get("jobId")
if not job_id:
logging.error(f"❌ No jobId returned from video service. Response: {body}")
return None
logging.info(f"⏳ Job {job_id} accepted — polling status...")
status_url = f"{VIDEO_HOST}/xrpc/app.bsky.video.getJobStatus"
deadline = time.time() + 600
while time.time() < deadline:
status_resp = requests.get(status_url, params={"jobId": job_id}, timeout=30)
if status_resp.status_code != 200:
logging.error(f"❌ Job status check failed: {status_resp.status_code} - {status_resp.text}")
return None
status_json = status_resp.json()
job_status = status_json.get("jobStatus", {})
state = job_status.get("state")
if state == "JOB_STATE_COMPLETED":
blob_dict = job_status.get("blob")
if not blob_dict:
logging.error(f"❌ No blob in completed job status: {status_json}")
return None
wait_with_heartbeat(8, label="CDN propagation")
blob_ref = models.BlobRef.from_dict(blob_dict)
logging.info("✅ Video processed successfully.")
return models.AppBskyEmbedVideo.Main(video=blob_ref, alt=alt_text)
if state == "JOB_STATE_FAILED":
logging.error(f"❌ Video processing failed: {job_status}")
return None
logging.info(f" ...still processing (state={state})...")
time.sleep(3)
logging.error("❌ Video processing timed out.")
return None
except Exception as e:
logging.error(f"❌ video.bsky.app upload failed: {repr(e)}")
return None
# ============================================================
# Video dispatcher
# ============================================================
def upload_video_smart(
client: Client,
video_path: str,
service_url: str,
alt_text: str = "",
settle_delay_seconds: float = 30.0,
allow_pds_video_fallback: bool = False,
) -> models.AppBskyEmbedVideo.Main | None:
logging.info(
f"🌍 PDS ({service_url}). Trying video.bsky.app first for playback reliability."
)
embed = upload_video_via_bsky_service(
client=client,
video_path=video_path,
service_url=service_url,
alt_text=alt_text,
)
if embed:
return embed
if allow_pds_video_fallback:
logging.warning(
"⚠️ video.bsky.app failed; trying direct PDS fallback "
"(may be unplayable in some clients)."
)
return upload_video_via_pds(
client=client,
video_path=video_path,
alt_text=alt_text,
settle_delay_seconds=settle_delay_seconds,
)
logging.error(
"❌ video.bsky.app failed. Not posting unreliable direct-PDS video. "
"Use --allow-pds-video-fallback to override."
)
return None
# ============================================================
# Post
# ============================================================
def post_to_bsky(
client: Client,
text: str,
langs: list[str],
image_path: str | None = None,
video_path: str | None = None,
alt_text: str = "",
service_url: str = "https://bsky.social",
video_settle_delay: float = 30.0,
allow_pds_video_fallback: bool = False,
) -> bool:
post_text = text.strip()
# Allow empty text only if media exists
if not post_text and not image_path and not video_path:
logging.error("❌ Empty post text with no media is not allowed.")
return False
try:
embed_obj = None
if video_path:
logging.info(f"🎬 Preparing video upload: {video_path}")
video_embed = upload_video_smart(
client=client,
video_path=video_path,
service_url=service_url,
alt_text=alt_text,
settle_delay_seconds=video_settle_delay,
allow_pds_video_fallback=allow_pds_video_fallback,
)
if not video_embed:
logging.error("❌ Aborting post: video upload/processing failed.")
return False
embed_obj = video_embed
elif image_path:
image = upload_image(client, image_path, alt_text=alt_text)
if not image:
logging.error("❌ Aborting post: image upload failed.")
return False
embed_obj = models.AppBskyEmbedImages.Main(images=[image])
# Build record explicitly (most reliable)
record = {
"$type": "app.bsky.feed.post",
"text": post_text, # <-- guaranteed string
"createdAt": time.strftime("%Y-%m-%dT%H:%M:%S.000Z", time.gmtime()),
}
if langs:
record["langs"] = langs
if embed_obj is not None:
# atproto models -> plain dict
if hasattr(embed_obj, "model_dump"):
record["embed"] = embed_obj.model_dump(by_alias=True, exclude_none=True)
elif hasattr(embed_obj, "dict"):
record["embed"] = embed_obj.dict(by_alias=True, exclude_none=True)
else:
record["embed"] = embed_obj
logging.info(f"🧾 Final record text={record.get('text')!r}, has_embed={'embed' in record}")
resp = client.com.atproto.repo.create_record(
models.ComAtprotoRepoCreateRecord.Data(
repo=client.me.did,
collection="app.bsky.feed.post",
record=record,
)
)
uri = getattr(resp, "uri", None) or (resp.get("uri") if isinstance(resp, dict) else None)
logging.info(f"✅ Post published! URI: {uri}")
return True
except Exception as e:
logging.error(f"❌ Failed to send post: {repr(e)}")
return False
# ============================================================
# CLI
# ============================================================
def main():
setup_logging()
parser = argparse.ArgumentParser(
description="Post text + optional image or video to Bluesky/federated PDS."
)
parser.add_argument("text", help="Post text content")
parser.add_argument("--username", required=True, help="Bluesky handle or email")
parser.add_argument("--password", required=True, help="Bluesky app password")
parser.add_argument("--service", default="https://bsky.social", help="Bluesky PDS URL")
parser.add_argument("--lang", default="ca", help="Comma-separated language codes (e.g. ca,es)")
parser.add_argument("--image", default=None, help="Path to image file to attach")
parser.add_argument("--video", default=None, help="Path to video file to attach")
parser.add_argument("--alt", default="", help="Alt text for media")
parser.add_argument(
"--video-settle-delay",
type=float,
default=30.0,
help="Seconds to wait after direct-PDS fallback upload before posting (default: 30).",
)
parser.add_argument(
"--allow-pds-video-fallback",
action="store_true",
help="Allow direct-PDS fallback if video.bsky.app fails (less reliable playback).",
)
args = parser.parse_args()
if args.image and args.video:
logging.error("❌ Use either --image or --video, not both.")
sys.exit(1)
client = Client(base_url=args.service)
success = login_with_backoff(
client=client,
username=args.username,
password=args.password,
service_url=args.service,
max_attempts=RetryConfig.login_max_attempts,
base_delay=RetryConfig.login_base_delay_seconds,
max_delay=RetryConfig.login_max_delay_seconds,
jitter=RetryConfig.login_jitter_seconds,
)
if not success:
sys.exit(1)
langs = [l.strip() for l in args.lang.split(",") if l.strip()]
post_success = post_to_bsky(
client=client,
text=args.text,
langs=langs,
image_path=args.image,
video_path=args.video,
alt_text=args.alt,
service_url=args.service,
video_settle_delay=args.video_settle_delay,
allow_pds_video_fallback=args.allow_pds_video_fallback,
)
if not post_success:
sys.exit(1)
if __name__ == "__main__":
main()