Files
post2bsky/bsky_post.py
2026-05-08 19:22:06 +00:00

646 lines
21 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Post text + optional image/video to Bluesky/federated PDS.
Reliability features:
- Raw XRPC via requests (no atproto SDK serialization pitfalls).
- Hardened HTTP transport with retry adapter + longer read timeouts.
- Login fallback hosts via --auth-hosts (comma-separated).
- Video upload through https://video.bsky.app with proper service auth.
- 409 already_exists support (reuses jobId).
- Optional direct PDS fallback for video.
- ffmpeg compression enabled by default.
"""
import argparse
import json
import logging
import mimetypes
import os
import random
import secrets
import shutil
import string
import subprocess
import sys
import tempfile
import time
from urllib.parse import urlparse
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
def setup_logging() -> None:
    """Send INFO-level, timestamped log records to stdout.

    The ``urllib3`` logger is raised to WARNING so its lower-level records
    do not show up in the INFO stream.
    """
    logging.basicConfig(
        stream=sys.stdout,
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(message)s",
    )
    logging.getLogger("urllib3").setLevel(logging.WARNING)
# -----------------------------------------------------------------------------
# HTTP session with retries
# -----------------------------------------------------------------------------
def build_http_session() -> requests.Session:
    """Create the shared ``requests`` session used for every HTTP call.

    One retry-enabled adapter is mounted for both ``https://`` and
    ``http://``. It retries up to 4 times (connect and read errors, plus
    the listed HTTP statuses) for GET and POST, and hands back the final
    response instead of raising once retries are exhausted.

    NOTE(review): POST is in the retry allow-list, so a request whose
    response was lost may be sent again — confirm this is acceptable for
    non-idempotent calls.
    """
    retry_policy = Retry(
        total=4,
        connect=4,
        read=4,
        backoff_factor=0.8,
        status_forcelist=[429, 500, 502, 503, 504],
        allowed_methods=frozenset(["GET", "POST"]),
        raise_on_status=False,
    )
    shared_adapter = HTTPAdapter(
        max_retries=retry_policy,
        pool_connections=20,
        pool_maxsize=20,
    )

    session = requests.Session()
    for scheme in ("https://", "http://"):
        session.mount(scheme, shared_adapter)
    session.headers.update({"User-Agent": "post2bsky/1.0"})
    return session
# -----------------------------------------------------------------------------
# Status helpers
# -----------------------------------------------------------------------------
def is_auth_error_status(status: int) -> bool:
    """Return True for the HTTP statuses this script treats as auth failures."""
    return status in {400, 401, 403}
def is_rate_limited_status(status: int) -> bool:
    """Return True when the HTTP status is 429."""
    rate_limited = status == 429
    return rate_limited
def is_transient_status(status: int) -> bool:
    """Return True for HTTP statuses this script considers worth retrying."""
    return status in {408, 425, 429, 500, 502, 503, 504}
# -----------------------------------------------------------------------------
# Raw XRPC helpers
# -----------------------------------------------------------------------------
def _xrpc_url(pds_url: str, method: str) -> str:
    """Build the ``/xrpc/<method>`` endpoint URL on *pds_url*."""
    return f"{pds_url.rstrip('/')}/xrpc/{method}"


def _xrpc_headers(access_jwt: str | None, content_type: str | None = None) -> dict:
    """Assemble request headers: optional Content-Type plus optional bearer token."""
    headers = {}
    if content_type is not None:
        headers["Content-Type"] = content_type
    if access_jwt:
        headers["Authorization"] = f"Bearer {access_jwt}"
    return headers


def _xrpc_result(response) -> tuple[int, dict | str]:
    """Return ``(status_code, parsed JSON)``, or the raw text when the body is not JSON."""
    try:
        return response.status_code, response.json()
    except ValueError:
        # JSON decoding errors are ValueError subclasses; anything else is
        # a real bug and should surface instead of being swallowed.
        return response.status_code, response.text


def xrpc_get(
    http: requests.Session,
    pds_url: str,
    method: str,
    params: dict,
    access_jwt: str | None = None,
    timeout=(10, 60),
) -> tuple[int, dict | str]:
    """Issue an XRPC query (HTTP GET) against *pds_url*.

    Args:
        http: Session-like object providing ``get``.
        pds_url: Base URL of the host; a trailing slash is tolerated.
        method: XRPC method name appended after ``/xrpc/``.
        params: Query-string parameters.
        access_jwt: Optional bearer token for the Authorization header.
        timeout: Passed through unchanged as the request timeout.

    Returns:
        ``(status_code, body)`` where body is the decoded JSON, or the raw
        response text when the body is not valid JSON.
    """
    response = http.get(
        _xrpc_url(pds_url, method),
        headers=_xrpc_headers(access_jwt),
        params=params,
        timeout=timeout,
    )
    return _xrpc_result(response)


def xrpc_post_json(
    http: requests.Session,
    pds_url: str,
    method: str,
    body: dict,
    access_jwt: str | None = None,
    timeout=(10, 90),
) -> tuple[int, dict | str]:
    """Issue an XRPC procedure (HTTP POST) with a JSON-encoded *body*.

    Returns:
        ``(status_code, body)`` where body is the decoded JSON, or the raw
        response text when the body is not valid JSON.
    """
    response = http.post(
        _xrpc_url(pds_url, method),
        headers=_xrpc_headers(access_jwt, "application/json"),
        data=json.dumps(body),
        timeout=timeout,
    )
    return _xrpc_result(response)


def xrpc_post_bytes(
    http: requests.Session,
    pds_url: str,
    method: str,
    data: bytes,
    content_type: str,
    access_jwt: str | None = None,
    timeout=(20, 900),
) -> tuple[int, dict | str]:
    """Issue an XRPC procedure (HTTP POST) with a raw binary payload.

    *content_type* is sent as the Content-Type header for *data*.

    Returns:
        ``(status_code, body)`` where body is the decoded JSON, or the raw
        response text when the body is not valid JSON.
    """
    response = http.post(
        _xrpc_url(pds_url, method),
        headers=_xrpc_headers(access_jwt, content_type),
        data=data,
        timeout=timeout,
    )
    return _xrpc_result(response)
# -----------------------------------------------------------------------------
# Login with fallback hosts
# -----------------------------------------------------------------------------
def _login_retry_delay(attempt: int, base_delay: float, max_delay: float) -> float:
    """Linear backoff (``base_delay * attempt``) capped at *max_delay*, plus 0.1-1.0s jitter."""
    return min(base_delay * attempt, max_delay) + random.uniform(0.1, 1.0)


def login_on_host(
    http: requests.Session,
    host_url: str,
    identifier: str,
    password: str,
    max_attempts: int = 5,
    base_delay: float = 6.0,
    max_delay: float = 60.0,
) -> dict | None:
    """Create a session on one host via ``com.atproto.server.createSession``.

    Args:
        http: Session used for the request.
        host_url: Base URL of the host to authenticate against.
        identifier: Account identifier sent as ``identifier``.
        password: Password sent as ``password``.
        max_attempts: Maximum number of tries on this host.
        base_delay: Base of the linear backoff, in seconds.
        max_delay: Upper bound of the backoff before jitter, in seconds.

    Returns:
        The session body (a dict containing ``accessJwt`` and ``did``) on
        success, otherwise None. Statuses classified by
        ``is_auth_error_status`` abort immediately without retrying; any
        other failure is retried until *max_attempts* is exhausted.
    """
    for attempt in range(1, max_attempts + 1):
        is_last_attempt = attempt >= max_attempts
        logging.info(f"🔑 Login attempt {attempt}/{max_attempts} on {host_url} as {identifier}")

        # Only the network call can raise RequestException, so keep the
        # try body down to that single call.
        try:
            status, body = xrpc_post_json(
                http=http,
                pds_url=host_url,
                method="com.atproto.server.createSession",
                body={"identifier": identifier, "password": password},
                timeout=(10, 75),
            )
        except requests.RequestException as e:
            logging.warning(f"⚠️ Login request error on {host_url}: {repr(e)}")
            if is_last_attempt:
                return None
            wait = _login_retry_delay(attempt, base_delay, max_delay)
            logging.warning(f"⏳ Retrying login on {host_url} in {wait:.1f}s...")
            time.sleep(wait)
            continue

        if status == 200 and isinstance(body, dict) and body.get("accessJwt") and body.get("did"):
            logging.info(f"✅ Login successful on {host_url}.")
            return body

        logging.error(f"❌ Login failed on {host_url}: HTTP {status} body={body}")
        if is_auth_error_status(status) or is_last_attempt:
            return None

        wait = _login_retry_delay(attempt, base_delay, max_delay)
        # The retry notice is only logged for rate-limit/transient statuses;
        # other failures are still retried, just silently.
        if is_rate_limited_status(status) or is_transient_status(status):
            logging.warning(f"⏳ Retrying login on {host_url} in {wait:.1f}s...")
        time.sleep(wait)
    return None
def login_with_fallback_hosts(
    http: requests.Session,
    auth_hosts: list[str],
    identifier: str,
    password: str,
) -> tuple[dict | None, str | None]:
    """Try each auth host in order and stop at the first that logs in.

    Returns:
        ``(session, host)`` for the first host where ``login_on_host``
        yields a session, or ``(None, None)`` when every host fails (or
        the list is empty).
    """
    for candidate in auth_hosts:
        result = login_on_host(http, candidate, identifier, password)
        if not result:
            logging.warning(f"⚠️ Auth host failed: {candidate}")
            continue
        return result, candidate
    return None, None
# -----------------------------------------------------------------------------
# Helpers
# -----------------------------------------------------------------------------
def wait_with_heartbeat(seconds: float, label: str) -> None:
    """Sleep for *seconds*, logging progress so long waits do not look hung.

    The wait is split into slices of at most five seconds, with a log line
    after each slice that still leaves time on the clock. Non-positive
    durations return immediately without logging.
    """
    if seconds <= 0:
        return

    logging.info(f"⏳ Waiting {seconds:.0f}s for {label}...")
    left = seconds
    while left > 0:
        chunk = left if left < 5.0 else 5.0
        time.sleep(chunk)
        left -= chunk
        if left > 0:
            logging.info(f" ...still waiting ({left:.0f}s remaining)...")
    logging.info("✅ Wait complete.")
def pds_did_from_service_url(service_url: str) -> str:
    """Derive a ``did:web:<host>`` identifier from a service URL.

    Only the lowercased hostname is used; scheme, port and path are
    ignored.

    Raises:
        ValueError: If no hostname can be parsed out of *service_url*.
    """
    parsed_host = urlparse(service_url).hostname
    if parsed_host:
        return f"did:web:{parsed_host.lower()}"
    raise ValueError(f"Invalid --service URL: {service_url}")
def random_video_name(ext: str = ".mp4") -> str:
    """Return ``<unix-seconds>_<12 random [a-z0-9] chars><ext>``."""
    alphabet = string.ascii_lowercase + string.digits
    suffix = "".join([secrets.choice(alphabet) for _ in range(12)])
    stamp = int(time.time())
    return f"{stamp}_{suffix}{ext}"
def get_service_auth_token(
    http: requests.Session,
    pds_url_for_auth: str,
    access_jwt: str,
    aud: str,
    lxm: str,
    exp_seconds: int = 1800,
) -> str | None:
    """Request a service-auth token via ``com.atproto.server.getServiceAuth``.

    Args:
        http: Session used for the request.
        pds_url_for_auth: Host that issues the token.
        access_jwt: Bearer token of the logged-in session.
        aud: Value sent as the ``aud`` query parameter.
        lxm: Value sent as the ``lxm`` query parameter.
        exp_seconds: Lifetime; sent as an absolute ``exp`` of now + this value.

    Returns:
        The ``token`` string from the response, or None (after logging)
        when the call fails or the response carries no token.
    """
    expires_at = int(time.time()) + exp_seconds
    status, body = xrpc_get(
        http=http,
        pds_url=pds_url_for_auth,
        method="com.atproto.server.getServiceAuth",
        params={"aud": aud, "lxm": lxm, "exp": expires_at},
        access_jwt=access_jwt,
        timeout=(10, 60),
    )

    if status == 200 and isinstance(body, dict):
        service_token = body.get("token")
        if service_token:
            return service_token
        logging.error(f"❌ getServiceAuth returned no token: {body}")
        return None

    logging.error(f"❌ getServiceAuth failed: HTTP {status} body={body}")
    return None
# -----------------------------------------------------------------------------
# ffmpeg
# -----------------------------------------------------------------------------
def ffmpeg_exists() -> bool:
    """Report whether an ``ffmpeg`` executable can be located on PATH."""
    located = shutil.which("ffmpeg")
    return located is not None
def ffprobe_exists() -> bool:
    """Report whether an ``ffprobe`` executable can be located on PATH."""
    located = shutil.which("ffprobe")
    return located is not None
def get_video_duration_seconds(path: str) -> float | None:
if not ffprobe_exists():
return None
try:
out = subprocess.check_output(
[
"ffprobe", "-v", "error",
"-show_entries", "format=duration",
"-of", "default=noprint_wrappers=1:nokey=1",
path,
],
stderr=subprocess.STDOUT,
text=True,
).strip()
return float(out)
except Exception:
return None
def compress_video_ffmpeg(
    input_path: str,
    max_size_mb: float = 45.0,
    crf: int = 28,
    preset: str = "veryfast",
    audio_bitrate_k: int = 96,
) -> str | None:
    """Re-encode *input_path* with ffmpeg when it is larger than *max_size_mb*.

    Args:
        input_path: Source video file.
        max_size_mb: Size threshold in MiB; smaller sources are returned as-is.
        crf: Value passed to ffmpeg's ``-crf``.
        preset: Value passed to ffmpeg's ``-preset``.
        audio_bitrate_k: AAC audio bitrate in kbit/s.

    Returns:
        * *input_path* when no compression was needed, or when the encode
          did not come out smaller than the source;
        * the path of a new temporary ``.mp4`` when compression helped
          (the caller is responsible for deleting it);
        * None when ffmpeg is missing, the input does not exist, or the
          encode failed.
    """
    if not ffmpeg_exists():
        logging.error("❌ ffmpeg not found in PATH.")
        return None
    if not os.path.exists(input_path):
        logging.error(f"❌ Video file not found: {input_path}")
        return None

    src_size_mb = os.path.getsize(input_path) / (1024 * 1024)
    logging.info(f"📦 Source video size: {src_size_mb:.2f} MB")
    if src_size_mb <= max_size_mb:
        logging.info("✅ Already below target size. Skipping compression.")
        return input_path

    # Derive a video bitrate that would fit the size budget (MiB -> kbit is
    # a factor of 8192); fall back to 1200k when the duration is unknown.
    duration = get_video_duration_seconds(input_path)
    target_video_k = 1200
    if duration and duration > 0:
        total_kbps = (max_size_mb * 8192.0) / duration
        target_video_k = int(total_kbps - audio_bitrate_k)
    target_video_k = min(max(target_video_k, 300), 5000)

    fd, out_path = tempfile.mkstemp(prefix="bsky_vid_", suffix=".mp4")
    os.close(fd)
    cmd = [
        "ffmpeg", "-y", "-i", input_path,
        "-c:v", "libx264", "-preset", preset, "-crf", str(crf),
        "-b:v", f"{target_video_k}k",
        "-maxrate", f"{int(target_video_k * 1.3)}k",
        "-bufsize", f"{int(target_video_k * 2)}k",
        "-vf", "scale='min(1280,iw)':-2",
        "-c:a", "aac", "-b:a", f"{audio_bitrate_k}k",
        "-movflags", "+faststart",
        out_path,
    ]

    keep_output = False
    try:
        logging.info(f"🛠️ Compressing video (target≤{max_size_mb}MB)...")
        subprocess.run(
            cmd,
            check=True,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.PIPE,
            text=True,
            # ffmpeg may echo non-UTF-8 metadata; never let decoding abort the run.
            errors="replace",
        )
        out_size_mb = os.path.getsize(out_path) / (1024 * 1024)
        logging.info(f"✅ Compressed size: {out_size_mb:.2f} MB")
        if out_size_mb < src_size_mb:
            keep_output = True
            return out_path
        logging.info(" Compression not smaller than source. Using original.")
        return input_path
    except subprocess.CalledProcessError as e:
        logging.error("❌ ffmpeg compression failed.")
        if e.stderr:
            logging.error(e.stderr[-1500:])
        return None
    except OSError as e:
        # e.g. ffmpeg vanished between the PATH check and the run, or the
        # output file could not be inspected.
        logging.error(f"❌ ffmpeg compression failed: {e!r}")
        return None
    finally:
        # Drop the temp file on every path that does not hand it to the caller.
        if not keep_output:
            try:
                os.remove(out_path)
            except OSError:
                pass  # best-effort cleanup; the file may already be gone
# -----------------------------------------------------------------------------
# Uploads
# -----------------------------------------------------------------------------
def upload_image_embed_dict(http: requests.Session, pds_url: str, access_jwt: str, image_path: str, alt_text: str) -> dict | None:
    """Upload one image as a blob and wrap it in an ``app.bsky.embed.images`` dict.

    The Content-Type is guessed from the file name and defaults to
    ``image/jpeg`` when it cannot be guessed.

    Returns:
        The embed dict, or None (after logging) when the file is missing
        or ``com.atproto.repo.uploadBlob`` does not return a blob.
    """
    if not os.path.exists(image_path):
        logging.error(f"❌ Image file not found: {image_path}")
        return None

    guessed_type = mimetypes.guess_type(image_path)[0]
    content_type = guessed_type if guessed_type else "image/jpeg"

    with open(image_path, "rb") as image_file:
        payload = image_file.read()

    status, body = xrpc_post_bytes(
        http=http,
        pds_url=pds_url,
        method="com.atproto.repo.uploadBlob",
        data=payload,
        content_type=content_type,
        access_jwt=access_jwt,
        timeout=(20, 240),
    )

    upload_ok = status == 200 and isinstance(body, dict) and bool(body.get("blob"))
    if not upload_ok:
        logging.error(f"❌ Image uploadBlob failed: HTTP {status} body={body}")
        return None

    image_entry = {"alt": alt_text or "", "image": body["blob"]}
    return {"$type": "app.bsky.embed.images", "images": [image_entry]}
def upload_video_via_video_service_embed_dict(
    http: requests.Session,
    did: str,
    access_jwt: str,
    pds_url_for_auth: str,
    service_url: str,
    video_path: str,
    alt_text: str,
) -> dict | None:
    """Upload a video through ``https://video.bsky.app`` and build its embed.

    Steps: read the file, obtain a service-auth token (audience derived
    from *service_url*, ``lxm`` = ``com.atproto.repo.uploadBlob``), POST
    the bytes to ``app.bsky.video.uploadVideo``, then poll
    ``app.bsky.video.getJobStatus`` for up to 600 seconds.

    Args:
        http: Session used for all requests.
        did: Account DID, sent as the ``did`` query parameter.
        access_jwt: Session token used to request the service-auth token.
        pds_url_for_auth: Host that issues the service-auth token.
        service_url: Service URL the token audience is derived from.
        video_path: Local file to upload (sent as ``video/mp4``).
        alt_text: Optional alt text; added to the embed only when non-empty.

    Returns:
        An ``app.bsky.embed.video`` dict, or None (after logging) on any
        failure or timeout. An HTTP 409 ``already_exists`` reply that
        carries a job id is treated as success and that job is polled.

    Raises:
        ValueError: Propagated from ``pds_did_from_service_url`` when
            *service_url* has no hostname.
    """
    if not os.path.exists(video_path):
        logging.error(f"❌ Video file not found: {video_path}")
        return None
    with open(video_path, "rb") as f:
        video_bytes = f.read()
    size_mb = len(video_bytes) / (1024 * 1024)
    logging.info(f"🎬 [video.bsky.app] Uploading: {video_path} ({size_mb:.2f} MB)")

    token = get_service_auth_token(
        http=http,
        pds_url_for_auth=pds_url_for_auth,
        access_jwt=access_jwt,
        aud=pds_did_from_service_url(service_url),
        lxm="com.atproto.repo.uploadBlob",
        exp_seconds=1800,
    )
    if not token:
        return None
    auth_header = {"Authorization": f"Bearer {token}"}

    try:
        r = http.post(
            "https://video.bsky.app/xrpc/app.bsky.video.uploadVideo",
            # Let the HTTP layer URL-encode the query instead of splicing
            # raw values into the URL.
            params={"did": did, "name": random_video_name(".mp4")},
            headers={**auth_header, "Content-Type": "video/mp4"},
            data=video_bytes,
            timeout=(20, 360),
        )
    except requests.RequestException as e:
        logging.error(f"❌ video.bsky.app upload request failed: {repr(e)}")
        return None
    if r.status_code not in (200, 409):
        logging.error(f"❌ video.bsky.app upload failed: {r.status_code} - {r.text}")
        return None

    try:
        payload = r.json()
    except ValueError:
        payload = None
    if not isinstance(payload, dict):
        logging.error(f"❌ Invalid JSON from upload response: {r.text[:400]}")
        return None

    # The job id is normally top-level; also accept it nested under
    # "jobStatus" (NOTE(review): lexicon-style shape — confirm against the
    # service's actual responses).
    nested_status = payload.get("jobStatus")
    job_id = payload.get("jobId") or (
        nested_status.get("jobId") if isinstance(nested_status, dict) else None
    )
    if r.status_code == 409:
        if payload.get("error") == "already_exists" and job_id:
            logging.info(" Video already exists. Reusing job.")
        else:
            logging.error(f"❌ 409 without reusable jobId: {payload}")
            return None
    if not job_id:
        logging.error(f"❌ Missing jobId: {payload}")
        return None

    status_url = "https://video.bsky.app/xrpc/app.bsky.video.getJobStatus"
    # Monotonic clock: the 600s budget must not be affected by wall-clock jumps.
    deadline = time.monotonic() + 600
    while time.monotonic() < deadline:
        try:
            s = http.get(
                status_url,
                params={"jobId": job_id},
                headers=auth_header,
                timeout=(10, 45),
            )
        except requests.RequestException as e:
            logging.warning(f"⚠️ Poll error: {repr(e)}")
            time.sleep(3)
            continue
        if s.status_code != 200:
            logging.error(f"❌ getJobStatus failed: {s.status_code} - {s.text}")
            return None

        try:
            body = s.json()
        except ValueError:
            body = None
        if not isinstance(body, dict):
            # A garbled poll response is treated like a transient poll error.
            logging.warning(f"⚠️ Poll error: invalid JSON from getJobStatus: {s.text[:400]}")
            time.sleep(3)
            continue

        job_status = body.get("jobStatus")
        if not isinstance(job_status, dict):
            job_status = {}
        st = job_status.get("state")
        if st == "JOB_STATE_COMPLETED":
            blob = job_status.get("blob")
            if not blob:
                logging.error(f"❌ Completed without blob: {body}")
                return None
            wait_with_heartbeat(8, "CDN propagation")
            embed = {"$type": "app.bsky.embed.video", "video": blob}
            if alt_text:
                embed["alt"] = alt_text
            return embed
        if st == "JOB_STATE_FAILED":
            logging.error(f"❌ Video processing failed: {body}")
            return None
        logging.info(f" ...still processing (state={st})...")
        time.sleep(3)

    logging.error("❌ Video processing timed out.")
    return None
# -----------------------------------------------------------------------------
# Post
# -----------------------------------------------------------------------------
def create_post_record(text: str, langs: list[str], embed_dict: dict | None) -> dict:
    """Build an ``app.bsky.feed.post`` record.

    The text is stripped of surrounding whitespace and ``createdAt`` is the
    current UTC time at second precision. ``langs`` and ``embed`` are only
    included when the corresponding argument is truthy.
    """
    created_at = time.strftime("%Y-%m-%dT%H:%M:%S.000Z", time.gmtime())
    record = {
        "$type": "app.bsky.feed.post",
        "text": text.strip(),
        "createdAt": created_at,
    }
    optional_fields = (("langs", langs), ("embed", embed_dict))
    for key, value in optional_fields:
        if value:
            record[key] = value
    return record
def publish_post(http: requests.Session, pds_url: str, access_jwt: str, did: str, record: dict) -> bool:
    """Create *record* in the ``app.bsky.feed.post`` collection of repo *did*.

    Returns:
        True when ``com.atproto.repo.createRecord`` answers HTTP 200 with a
        JSON object (the returned ``uri`` is logged), otherwise False.
    """
    request_body = {
        "repo": did,
        "collection": "app.bsky.feed.post",
        "record": record,
    }
    status, body = xrpc_post_json(
        http=http,
        pds_url=pds_url,
        method="com.atproto.repo.createRecord",
        body=request_body,
        access_jwt=access_jwt,
        timeout=(10, 90),
    )

    if status == 200 and isinstance(body, dict):
        logging.info(f"✅ Post published! URI: {body.get('uri')}")
        return True

    logging.error(f"❌ createRecord failed: HTTP {status} body={body}")
    return False
def _build_arg_parser() -> argparse.ArgumentParser:
    """Declare the command-line interface of the script."""
    # NOTE(review): --password is taken from argv, which is usually visible
    # in process listings — consider an environment-variable alternative.
    parser = argparse.ArgumentParser()
    parser.add_argument("text")
    parser.add_argument("--username", required=True)
    parser.add_argument("--password", required=True)
    parser.add_argument("--service", default="https://eurosky.social")
    parser.add_argument("--auth-hosts", default="", help="Comma-separated auth hosts fallback, e.g. https://eurosky.social,https://bsky.social")
    parser.add_argument("--lang", default="ca")
    parser.add_argument("--image", default=None)
    parser.add_argument("--video", default=None)
    parser.add_argument("--alt", default="")
    parser.add_argument("--allow-pds-video-fallback", action="store_true")
    parser.add_argument("--video-settle-delay", type=float, default=30.0)
    parser.add_argument("--compress-video", dest="compress_video", action="store_true", default=True)
    parser.add_argument("--no-compress-video", dest="compress_video", action="store_false")
    parser.add_argument("--max-video-mb", type=float, default=45.0)
    parser.add_argument("--ffmpeg-crf", type=int, default=28)
    parser.add_argument("--ffmpeg-preset", default="veryfast")
    return parser


def main():
    """CLI entry point: log in, upload optional media, publish the post.

    Exits with status 1 when both --image and --video are given, when login
    fails on every auth host, when compression or a media upload fails, or
    when the post cannot be created. A temporary compressed video, if one
    was produced, is removed on every exit path.
    """
    setup_logging()
    args = _build_arg_parser().parse_args()
    if args.image and args.video:
        logging.error("❌ Use either --image or --video, not both.")
        sys.exit(1)

    service_url = args.service.rstrip("/")
    auth_hosts = [x.strip().rstrip("/") for x in args.auth_hosts.split(",") if x.strip()]
    if not auth_hosts:
        auth_hosts = [service_url, "https://bsky.social"]

    http = build_http_session()
    session, auth_host_used = login_with_fallback_hosts(
        http=http,
        auth_hosts=auth_hosts,
        identifier=args.username,
        password=args.password,
    )
    if not session:
        logging.error("❌ Login failed on all auth hosts.")
        sys.exit(1)
    did = session["did"]
    access_jwt = session["accessJwt"]
    logging.info(f"🆔 DID: {did}")
    logging.info(f"🔐 Auth host used: {auth_host_used}")
    logging.info(f"📡 Service host for repo operations: {service_url}")

    langs = [x.strip() for x in args.lang.split(",") if x.strip()]
    video_path_for_upload = args.video
    tmp_compressed = None
    # sys.exit() raises SystemExit, so the finally clause below also covers
    # every early exit inside this block.
    try:
        if args.video and args.compress_video:
            c = compress_video_ffmpeg(args.video, args.max_video_mb, args.ffmpeg_crf, args.ffmpeg_preset, 96)
            if c is None:
                sys.exit(1)
            video_path_for_upload = c
            if c != args.video:
                tmp_compressed = c

        embed = None
        if video_path_for_upload:
            embed = upload_video_via_video_service_embed_dict(
                http=http,
                did=did,
                access_jwt=access_jwt,
                pds_url_for_auth=auth_host_used,
                service_url=service_url,
                video_path=video_path_for_upload,
                alt_text=args.alt,
            )
            if embed is None and args.allow_pds_video_fallback:
                logging.warning("⚠️ Falling back to direct PDS video upload.")
                try:
                    # Context manager so the file handle is always closed.
                    with open(video_path_for_upload, "rb") as video_file:
                        video_bytes = video_file.read()
                    status, body = xrpc_post_bytes(
                        http=http,
                        pds_url=service_url,
                        method="com.atproto.repo.uploadBlob",
                        data=video_bytes,
                        content_type="video/mp4",
                        access_jwt=access_jwt,
                        timeout=(20, 900),
                    )
                except (OSError, requests.RequestException) as e:
                    logging.error(f"❌ PDS video fallback failed: {e!r}")
                    status, body = None, None
                if status == 200 and isinstance(body, dict) and body.get("blob"):
                    wait_with_heartbeat(args.video_settle_delay, "PDS/AppView indexing")
                    embed = {"$type": "app.bsky.embed.video", "video": body["blob"]}
                    if args.alt:
                        embed["alt"] = args.alt
            if embed is None:
                logging.error("❌ Aborting post: video upload failed.")
                sys.exit(1)
        elif args.image:
            embed = upload_image_embed_dict(http, service_url, access_jwt, args.image, args.alt)
            if embed is None:
                sys.exit(1)

        record = create_post_record(args.text, langs, embed)
        ok = publish_post(http, service_url, access_jwt, did, record)
    finally:
        if tmp_compressed and os.path.exists(tmp_compressed):
            os.remove(tmp_compressed)

    if not ok:
        sys.exit(1)
# Run the CLI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()