Files
post2bsky/twitter2bsky_daemon.py
2026-05-08 19:04:22 +00:00

733 lines
24 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Post text + optional image/video to Bluesky/federated PDS.
Reliability strategy:
- Login via atproto SDK (only SDK usage).
- ALL media + record operations via raw XRPC (requests) to avoid
atproto SDK BlobRef serialization bugs across versions.
- Video uploads go through https://video.bsky.app (official path).
- getServiceAuth uses aud=did:web:<pds-host>, lxm=com.atproto.repo.uploadBlob.
- Handles 409 already_exists by reusing jobId.
- ffmpeg compression enabled by default.
"""
import argparse
import json
import logging
import mimetypes
import os
import random
import secrets
import shutil
import string
import subprocess
import sys
import tempfile
import time
from urllib.parse import urlparse
import requests
from atproto import Client
# ---------------------------
# Logging
# ---------------------------
def setup_logging() -> None:
    """Configure root logging: INFO level, timestamped lines, written to stdout."""
    options = {
        "format": "%(asctime)s %(levelname)s %(message)s",
        "level": logging.INFO,
        "stream": sys.stdout,
    }
    logging.basicConfig(**options)
# ---------------------------
# Login helpers
# ---------------------------
def is_auth_error(exc: Exception) -> bool:
    """Return True when the lowercased repr of *exc* contains an auth-failure marker."""
    haystack = repr(exc).lower()
    markers = ("401", "403", "invalid identifier or password")
    return any(marker in haystack for marker in markers)
def is_rate_limited_error(exc: Exception) -> bool:
    """Return True when the lowercased repr of *exc* contains a rate-limit marker."""
    haystack = repr(exc).lower()
    markers = ("429", "ratelimit", "too many requests")
    return any(marker in haystack for marker in markers)
def is_transient_error(exc: Exception) -> bool:
    """
    Return True when repr(exc) contains a marker of a transient failure.
    Matching is case-sensitive: the markers are exception class names and
    gateway status codes.
    """
    rendered = repr(exc)
    for marker in (
        "ConnectError",
        "RemoteProtocolError",
        "ReadTimeout",
        "WriteTimeout",
        "TimeoutException",
        "InvokeTimeoutError",
        "502",
        "503",
        "504",
    ):
        if marker in rendered:
            return True
    return False
def login_with_backoff(
    client: Client,
    username: str,
    password: str,
    service_url: str,
    max_attempts: int = 5,
    base_delay: float = 8.0,
    max_delay: float = 120.0,
) -> bool:
    """
    Log in through the SDK client, retrying on rate-limit/transient errors.

    Args:
        client: SDK client whose ``login(username, password)`` is called.
        username: Handle or e-mail passed to ``client.login``.
        password: App password passed to ``client.login``.
        service_url: Only used in the log line for each attempt.
        max_attempts: Maximum number of login attempts.
        base_delay: Seconds multiplied by the attempt number for the wait.
        max_delay: Upper bound for the wait before jitter is added.

    Returns:
        True once a login succeeds. False on an authentication error, on an
        error that is neither rate-limit nor transient, or when the attempts
        are exhausted.
    """
    for attempt in range(1, max_attempts + 1):
        # Separator added: the attempt counter and the service URL used to be
        # glued together in the log line.
        logging.info(f"🔑 Login attempt {attempt}/{max_attempts} → {service_url} as {username}")
        try:
            client.login(username, password)
        except Exception as e:
            logging.exception("❌ Login failed")
            if is_auth_error(e):
                # Wrong credentials never fix themselves; do not retry.
                logging.error("❌ Authentication failed. Check handle/app-password.")
                return False
            if attempt >= max_attempts:
                return False
            if not (is_rate_limited_error(e) or is_transient_error(e)):
                return False
            # Linear growth capped at max_delay, plus a little jitter.
            wait = min(base_delay * attempt, max_delay) + random.uniform(0.1, 1.2)
            logging.warning(f"⏳ Retrying login in {wait:.1f}s...")
            time.sleep(wait)
        else:
            logging.info("✅ Login successful.")
            return True
    return False
# ---------------------------
# Generic helpers
# ---------------------------
def wait_with_heartbeat(seconds: float, label: str) -> None:
    """
    Sleep for *seconds* in slices of at most 5 s, logging progress after each
    slice. Does nothing when *seconds* is zero or negative.
    """
    if seconds > 0:
        logging.info(f"⏳ Waiting {seconds:.0f}s for {label}...")
        left = seconds
        while left > 0:
            nap = min(5.0, left)
            time.sleep(nap)
            left -= nap
            if left <= 0:
                break
            logging.info(f" ...still waiting ({left:.0f}s remaining)...")
        logging.info("✅ Wait complete.")
def pds_did_from_service_url(service_url: str) -> str:
    """
    Build the ``did:web:<host>`` identifier from the host part of *service_url*.
    Raises ValueError when the URL has no hostname.
    """
    hostname = urlparse(service_url).hostname
    if hostname:
        return f"did:web:{hostname.lower()}"
    raise ValueError(f"Invalid --service URL: {service_url}")
def random_video_name(ext: str = ".mp4") -> str:
    """Return ``<unix-seconds>_<12 random lowercase/digit chars><ext>``."""
    alphabet = string.ascii_lowercase + string.digits
    suffix = "".join(secrets.choice(alphabet) for _ in range(12))
    stamp = int(time.time())
    return f"{stamp}_{suffix}{ext}"
# ---------------------------
# Auth/session extraction (raw XRPC)
# ---------------------------
def get_session_info(client: Client) -> tuple[str, str, str]:
    """
    Extract (did, access_jwt, pds_url) from an already logged-in SDK client.

    The session object is looked up under several attribute names and may be
    either an object or a dict; ``client.me`` and client-level JWT attributes
    are used as fallbacks. Raises RuntimeError when either the DID or the
    access JWT cannot be found.
    """
    did = None
    access_jwt = None

    # Session object: private attribute first, public one second.
    session = getattr(client, "_session", None) or getattr(client, "session", None)
    if session is not None:
        session_is_dict = isinstance(session, dict)

        did = getattr(session, "did", None)
        if not did:
            did = session.get("did") if session_is_dict else None

        access_jwt = getattr(session, "access_jwt", None)
        if not access_jwt:
            access_jwt = getattr(session, "accessJwt", None)
        if not access_jwt:
            access_jwt = session.get("accessJwt") if session_is_dict else None
        if not access_jwt:
            access_jwt = session.get("access_jwt") if session_is_dict else None

    if not did:
        # Last resort: client.me, guarded because reading it may raise.
        try:
            did = getattr(client.me, "did", None)
        except Exception as e:
            logging.error(f"❌ Could not read client.me: {repr(e)}")

    if not access_jwt:
        # First truthy client-level attribute wins; otherwise keep the value.
        candidates = (getattr(client, name, None) for name in ("_access_jwt", "access_jwt"))
        access_jwt = next((value for value in candidates if value), access_jwt)

    if not did or not access_jwt:
        raise RuntimeError("Unable to extract DID/accessJwt from atproto Client session.")

    pds_url = getattr(client, "_base_url", None)
    if not pds_url:
        pds_url = getattr(client, "base_url", None) or "https://bsky.social"
    return did, access_jwt, pds_url
def xrpc_get(pds_url: str, access_jwt: str, method: str, params: dict, timeout: int = 30) -> dict:
    """
    Send an authenticated XRPC GET and return the decoded JSON body.

    Args:
        pds_url: Base URL of the server; a trailing slash is tolerated.
        access_jwt: Bearer token sent in the Authorization header.
        method: XRPC method name appended after ``/xrpc/``.
        params: Query-string parameters.
        timeout: Request timeout in seconds.

    Raises:
        RuntimeError: On any HTTP status >= 400. The message carries the
            status code and response body, matching the POST helpers, so the
            server's error text is not lost.
    """
    url = f"{pds_url.rstrip('/')}/xrpc/{method}"
    headers = {"Authorization": f"Bearer {access_jwt}"}
    r = requests.get(url, headers=headers, params=params, timeout=timeout)
    if r.status_code >= 400:
        raise RuntimeError(f"XRPC {method} failed: {r.status_code} {r.text}")
    return r.json()
def xrpc_post_json(pds_url: str, access_jwt: str, method: str, body: dict, timeout: int = 60) -> dict:
    """
    POST *body* as JSON to an XRPC method and return the decoded JSON reply.
    Raises RuntimeError (status code + response text) on HTTP status >= 400.
    """
    endpoint = f"{pds_url.rstrip('/')}/xrpc/{method}"
    response = requests.post(
        endpoint,
        headers={
            "Authorization": f"Bearer {access_jwt}",
            "Content-Type": "application/json",
        },
        data=json.dumps(body),
        timeout=timeout,
    )
    if response.status_code < 400:
        return response.json()
    raise RuntimeError(f"XRPC {method} failed: {response.status_code} {response.text}")
def xrpc_post_bytes(pds_url: str, access_jwt: str, method: str, data: bytes, content_type: str, timeout: int = 240) -> dict:
    """
    POST raw bytes with the given Content-Type to an XRPC method and return
    the decoded JSON reply. Raises RuntimeError (status code + response text)
    on HTTP status >= 400.
    """
    endpoint = f"{pds_url.rstrip('/')}/xrpc/{method}"
    response = requests.post(
        endpoint,
        headers={
            "Authorization": f"Bearer {access_jwt}",
            "Content-Type": content_type,
        },
        data=data,
        timeout=timeout,
    )
    if response.status_code < 400:
        return response.json()
    raise RuntimeError(f"XRPC {method} failed: {response.status_code} {response.text}")
def get_service_auth_token(pds_url: str, access_jwt: str, aud: str, lxm: str, exp_seconds: int = 1800) -> str:
    """
    Fetch a service-auth token via a raw com.atproto.server.getServiceAuth call.

    The ``exp`` parameter is sent as an absolute Unix timestamp, *exp_seconds*
    from now. Raises RuntimeError when the reply has no ``token`` field.
    """
    expires_at = int(time.time()) + exp_seconds
    query = {"aud": aud, "lxm": lxm, "exp": expires_at}
    reply = xrpc_get(
        pds_url=pds_url,
        access_jwt=access_jwt,
        method="com.atproto.server.getServiceAuth",
        params=query,
        timeout=30,
    )
    token = reply.get("token")
    if token:
        return token
    raise RuntimeError(f"getServiceAuth returned no token: {reply}")
# ---------------------------
# ffmpeg compression
# ---------------------------
def ffmpeg_exists() -> bool:
    """Report whether an ``ffmpeg`` executable can be found on PATH."""
    located = shutil.which("ffmpeg")
    return bool(located)
def ffprobe_exists() -> bool:
    """Report whether an ``ffprobe`` executable can be found on PATH."""
    located = shutil.which("ffprobe")
    return bool(located)
def get_video_duration_seconds(path: str) -> float | None:
    """
    Return the container duration of *path* in seconds as reported by ffprobe.

    Returns None when ffprobe is not installed, when it fails, or when its
    output is not a number.
    """
    if not ffprobe_exists():
        return None
    cmd = [
        "ffprobe", "-v", "error",
        "-show_entries", "format=duration",
        "-of", "default=noprint_wrappers=1:nokey=1",
        path,
    ]
    try:
        # stderr is discarded, not merged into stdout: any diagnostic ffprobe
        # prints would otherwise end up in the text handed to float() and turn
        # a valid duration into None.
        out = subprocess.check_output(cmd, stderr=subprocess.DEVNULL, text=True)
        return float(out.strip())
    except (subprocess.SubprocessError, OSError, ValueError):
        # Non-zero exit, launch failure, or non-numeric output.
        return None
def _target_video_bitrate_k(max_size_mb: float, duration: float | None, audio_bitrate_k: int) -> int:
    """Pick a video bitrate (kbit/s) so video + audio fit max_size_mb, clamped to 300..5000."""
    if not duration or duration <= 0:
        # Unknown duration: fall back to a fixed bitrate.
        return 1200
    total_kbps = (max_size_mb * 8192.0) / duration
    return int(min(max(total_kbps - audio_bitrate_k, 300), 5000))


def _discard_compressed_output(path: str) -> None:
    """Best-effort removal of a temporary ffmpeg output file."""
    try:
        os.remove(path)
    except FileNotFoundError:
        pass
    except OSError as e:
        logging.warning(f"⚠️ Could not remove temp file {path}: {e!r}")


def compress_video_ffmpeg(
    input_path: str,
    max_size_mb: float = 45.0,
    crf: int = 28,
    preset: str = "veryfast",
    audio_bitrate_k: int = 96,
) -> str | None:
    """
    Re-encode *input_path* with ffmpeg (libx264 + AAC) when it exceeds *max_size_mb*.

    Args:
        input_path: Source video file.
        max_size_mb: Size below which the source is returned untouched; also
            drives the bitrate budget when the duration is known.
        crf: Value passed to ffmpeg ``-crf``.
        preset: Value passed to ffmpeg ``-preset``.
        audio_bitrate_k: Audio bitrate in kbit/s.

    Returns:
        *input_path* when no compression is needed or the result is not
        smaller; the path of a new temporary ``.mp4`` when compression helped
        (the caller owns and must delete it); None on any failure.
    """
    if not ffmpeg_exists():
        logging.error("❌ ffmpeg not found in PATH. Install ffmpeg or use --no-compress-video.")
        return None
    if not os.path.exists(input_path):
        logging.error(f"❌ Video file not found: {input_path}")
        return None
    src_size_mb = os.path.getsize(input_path) / (1024 * 1024)
    logging.info(f"📦 Source video size: {src_size_mb:.2f} MB")
    if src_size_mb <= max_size_mb:
        logging.info("✅ Already below target size. Skipping compression.")
        return input_path

    duration = get_video_duration_seconds(input_path)
    target_video_k = _target_video_bitrate_k(max_size_mb, duration, audio_bitrate_k)

    fd, out_path = tempfile.mkstemp(prefix="bsky_vid_", suffix=".mp4")
    os.close(fd)
    cmd = [
        "ffmpeg", "-y",
        "-i", input_path,
        "-c:v", "libx264",
        "-preset", preset,
        "-crf", str(crf),
        "-b:v", f"{target_video_k}k",
        "-maxrate", f"{int(target_video_k * 1.3)}k",
        "-bufsize", f"{int(target_video_k * 2)}k",
        "-vf", "scale='min(1280,iw)':-2",
        "-c:a", "aac",
        "-b:a", f"{audio_bitrate_k}k",
        "-movflags", "+faststart",
        out_path,
    ]
    try:
        logging.info(f"🛠️ Compressing video (target≤{max_size_mb}MB, crf={crf}, preset={preset})...")
        # errors="replace": undecodable bytes in ffmpeg's output must not
        # abort the run with a decode error.
        subprocess.run(
            cmd,
            check=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            errors="replace",
        )
        out_size_mb = os.path.getsize(out_path) / (1024 * 1024)
    except (subprocess.CalledProcessError, OSError) as e:
        # OSError covers a failed ffmpeg launch or a missing output file.
        # Without catching it the temp file leaked and the caller saw an
        # exception instead of the None this function returns for failures.
        logging.error("❌ ffmpeg compression failed.")
        stderr_text = getattr(e, "stderr", None)
        if stderr_text:
            logging.error(stderr_text[-2000:])
        else:
            logging.error(repr(e))
        _discard_compressed_output(out_path)
        return None

    logging.info(f"✅ Compressed size: {out_size_mb:.2f} MB")
    if out_size_mb < src_size_mb:
        return out_path
    _discard_compressed_output(out_path)
    logging.info(" Compression not smaller than source. Using original.")
    return input_path
# ---------------------------
# Image upload (raw XRPC)
# ---------------------------
def upload_image_embed_dict(
pds_url: str,
access_jwt: str,
image_path: str,
alt_text: str = "",
) -> dict | None:
if not os.path.exists(image_path):
logging.error(f"❌ Image file not found: {image_path}")
return None
mime, _ = mimetypes.guess_type(image_path)
mime = mime or "image/jpeg"
with open(image_path, "rb") as f:
data = f.read()
logging.info(f"🖼️ Uploading image: {image_path} ({len(data)/1024:.1f} KB, {mime})")
try:
body = xrpc_post_bytes(
pds_url=pds_url,
access_jwt=access_jwt,
method="com.atproto.repo.uploadBlob",
data=data,
content_type=mime,
timeout=180,
)
blob = body.get("blob")
if not blob:
logging.error(f"❌ uploadBlob returned no blob: {body}")
return None
return {
"$type": "app.bsky.embed.images",
"images": [{"alt": alt_text or "", "image": blob}],
}
except Exception as e:
logging.error(f"❌ Image upload failed: {repr(e)}")
return None
# ---------------------------
# Video upload via video.bsky.app
# ---------------------------
def upload_video_via_video_service_embed_dict(
did: str,
access_jwt: str,
pds_url: str,
video_path: str,
service_url: str,
alt_text: str = "",
) -> dict | None:
if not os.path.exists(video_path):
logging.error(f"❌ Video file not found: {video_path}")
return None
with open(video_path, "rb") as f:
video_bytes = f.read()
size_mb = len(video_bytes) / (1024 * 1024)
logging.info(f"🎬 [video.bsky.app] Uploading: {video_path} ({size_mb:.2f} MB)")
video_host = "https://video.bsky.app"
pds_did = pds_did_from_service_url(service_url)
# Service auth token from user's PDS
try:
token = get_service_auth_token(
pds_url=pds_url,
access_jwt=access_jwt,
aud=pds_did,
lxm="com.atproto.repo.uploadBlob",
exp_seconds=1800,
)
except Exception as e:
logging.error(f"❌ getServiceAuth failed: {repr(e)}")
return None
upload_name = random_video_name(".mp4")
logging.info(f"🎞️ Upload name: {upload_name}")
upload_url = f"{video_host}/xrpc/app.bsky.video.uploadVideo?did={did}&name={upload_name}"
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "video/mp4",
}
try:
r = requests.post(upload_url, headers=headers, data=video_bytes, timeout=300)
except Exception as e:
logging.error(f"❌ video upload request failed: {repr(e)}")
return None
if r.status_code not in (200, 409):
logging.error(f"❌ video.bsky.app upload failed: {r.status_code} - {r.text}")
return None
try:
payload = r.json()
except Exception as e:
logging.error(f"❌ Could not parse upload response JSON: {repr(e)} body={r.text[:500]}")
return None
if r.status_code == 409:
if payload.get("error") == "already_exists" and payload.get("jobId"):
logging.info(" Video already processed on video.bsky.app. Reusing existing job.")
else:
logging.error(f"❌ 409 without reusable jobId: {payload}")
return None
job_id = payload.get("jobId")
if not job_id:
logging.error(f"❌ No jobId in video upload response: {payload}")
return None
logging.info(f"⏳ Job {job_id} accepted — polling status...")
status_url = f"{video_host}/xrpc/app.bsky.video.getJobStatus"
deadline = time.time() + 600 # 10 min max poll
last_state = None
while time.time() < deadline:
try:
s = requests.get(
status_url,
params={"jobId": job_id},
headers={"Authorization": f"Bearer {token}"},
timeout=30,
)
except Exception as e:
logging.warning(f"⚠️ Status poll request failed once: {repr(e)}")
time.sleep(3)
continue
if s.status_code != 200:
logging.error(f"❌ Job status failed: {s.status_code} - {s.text}")
return None
try:
body = s.json()
except Exception as e:
logging.error(f"❌ Could not parse status JSON: {repr(e)}")
return None
job_status = body.get("jobStatus", {}) or {}
state = job_status.get("state")
if state != last_state:
logging.info(f" state → {state}")
last_state = state
if state == "JOB_STATE_COMPLETED":
blob = job_status.get("blob")
if not blob:
logging.error(f"❌ Completed job without blob: {body}")
return None
wait_with_heartbeat(8, "CDN propagation")
# RAW embed dict — no BlobRef typed conversion anywhere.
return {
"$type": "app.bsky.embed.video",
"video": blob,
"alt": alt_text or "",
}
if state == "JOB_STATE_FAILED":
logging.error(f"❌ Video processing failed: {job_status}")
return None
time.sleep(3)
logging.error("❌ Video processing timed out.")
return None
# ---------------------------
# Direct PDS video fallback (rarely works on third-party PDS)
# ---------------------------
def upload_video_via_pds_embed_dict(
pds_url: str,
access_jwt: str,
video_path: str,
alt_text: str = "",
settle_delay_seconds: float = 30.0,
) -> dict | None:
if not os.path.exists(video_path):
logging.error(f"❌ Video file not found: {video_path}")
return None
try:
with open(video_path, "rb") as f:
b = f.read()
size_mb = len(b) / (1024 * 1024)
logging.warning(f"🎬 [PDS-direct fallback] Uploading: {video_path} ({size_mb:.2f} MB)")
body = xrpc_post_bytes(
pds_url=pds_url,
access_jwt=access_jwt,
method="com.atproto.repo.uploadBlob",
data=b,
content_type="video/mp4",
timeout=600, # large timeout for direct PDS upload
)
blob = body.get("blob")
if not blob:
logging.error(f"❌ PDS uploadBlob returned no blob: {body}")
return None
wait_with_heartbeat(settle_delay_seconds, "PDS/AppView indexing")
return {
"$type": "app.bsky.embed.video",
"video": blob,
"alt": alt_text or "",
}
except Exception as e:
logging.error(f"❌ PDS-direct video upload failed: {repr(e)}")
return None
def upload_video_smart_embed_dict(
    did: str,
    access_jwt: str,
    pds_url: str,
    video_path: str,
    service_url: str,
    alt_text: str = "",
    settle_delay_seconds: float = 30.0,
    allow_pds_video_fallback: bool = False,
) -> dict | None:
    """
    Upload a video via video.bsky.app, optionally falling back to the PDS.

    The direct PDS upload is attempted only when the video service yields no
    embed and *allow_pds_video_fallback* is true. Returns the embed dict, or
    None when every permitted route failed.
    """
    logging.info(f"🌍 PDS ({service_url}). Trying video.bsky.app first.")
    primary = upload_video_via_video_service_embed_dict(
        did=did,
        access_jwt=access_jwt,
        pds_url=pds_url,
        video_path=video_path,
        service_url=service_url,
        alt_text=alt_text,
    )
    if primary:
        return primary
    if not allow_pds_video_fallback:
        logging.error("❌ video.bsky.app failed and fallback is disabled.")
        return None
    logging.warning("⚠️ video.bsky.app failed; trying direct PDS fallback.")
    return upload_video_via_pds_embed_dict(
        pds_url=pds_url,
        access_jwt=access_jwt,
        video_path=video_path,
        alt_text=alt_text,
        settle_delay_seconds=settle_delay_seconds,
    )
# ---------------------------
# Create post (raw XRPC)
# ---------------------------
def create_post_record(text: str, langs: list[str], embed_dict: dict | None = None) -> dict:
record = {
"$type": "app.bsky.feed.post",
"text": text.strip(),
"createdAt": time.strftime("%Y-%m-%dT%H:%M:%S.000Z", time.gmtime()),
}
if langs:
record["langs"] = langs
if embed_dict is not None:
record["embed"] = embed_dict
return record
def publish_post(pds_url: str, access_jwt: str, did: str, record: dict) -> bool:
    """
    Create *record* in the ``app.bsky.feed.post`` collection of repo *did*
    through a raw com.atproto.repo.createRecord call.

    Returns True on success (the new URI is logged) and False when the call
    raises; the error is logged rather than propagated.
    """
    request_body = {
        "repo": did,
        "collection": "app.bsky.feed.post",
        "record": record,
    }
    try:
        reply = xrpc_post_json(
            pds_url=pds_url,
            access_jwt=access_jwt,
            method="com.atproto.repo.createRecord",
            body=request_body,
            timeout=60,
        )
        logging.info(f"✅ Post published! URI: {reply.get('uri')}")
    except Exception as e:
        logging.error(f"❌ createRecord failed: {repr(e)}")
        return False
    return True
# ---------------------------
# Main
# ---------------------------
def _build_arg_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for the poster script."""
    parser = argparse.ArgumentParser(description="Post text + optional image/video to Bluesky/federated PDS.")
    parser.add_argument("text", help="Post text")
    parser.add_argument("--username", required=True, help="Handle/email")
    parser.add_argument("--password", required=True, help="App password")
    parser.add_argument("--service", default="https://bsky.social", help="PDS URL")
    parser.add_argument("--lang", default="ca", help="Comma-separated language codes")
    parser.add_argument("--image", default=None, help="Image path")
    parser.add_argument("--video", default=None, help="Video path")
    parser.add_argument("--alt", default="", help="Alt text")
    parser.add_argument("--video-settle-delay", type=float, default=30.0)
    parser.add_argument("--allow-pds-video-fallback", action="store_true")
    parser.add_argument("--compress-video", dest="compress_video", action="store_true", default=True)
    parser.add_argument("--no-compress-video", dest="compress_video", action="store_false")
    parser.add_argument("--max-video-mb", type=float, default=45.0)
    parser.add_argument("--ffmpeg-crf", type=int, default=28)
    parser.add_argument("--ffmpeg-preset", default="veryfast")
    return parser


def _remove_temp_video(path: str | None) -> None:
    """Delete the temporary compressed video, if any; failures are logged, not raised."""
    if not path:
        return
    try:
        os.remove(path)
    except FileNotFoundError:
        return
    except OSError as e:
        logging.warning(f"⚠️ Could not remove temp file {path}: {e!r}")
        return
    logging.info(f"🧹 Removed temp file: {path}")


def main() -> None:
    """
    CLI entry point: log in, upload optional media, publish the post.

    Exits with status 1 when the arguments conflict, login or session
    extraction fails, compression or media upload fails, or the record
    cannot be created.
    """
    setup_logging()
    args = _build_arg_parser().parse_args()
    if args.image and args.video:
        logging.error("❌ Use either --image or --video, not both.")
        sys.exit(1)

    client = Client(base_url=args.service)
    if not login_with_backoff(
        client=client,
        username=args.username,
        password=args.password,
        service_url=args.service,
    ):
        sys.exit(1)

    # Capture session info ONCE — avoid lazy SDK property access later.
    try:
        did, access_jwt, pds_url = get_session_info(client)
    except Exception as e:
        logging.error(f"❌ Could not extract session: {repr(e)}")
        sys.exit(1)
    # Override pds_url with the user-specified service (in case SDK normalized it)
    pds_url = args.service.rstrip("/")
    logging.info(f"🆔 DID: {did} | PDS: {pds_url}")

    langs = [x.strip() for x in args.lang.split(",") if x.strip()]
    video_path_for_upload = args.video
    temp_compressed_path = None
    try:
        if args.video and args.compress_video:
            compressed = compress_video_ffmpeg(
                input_path=args.video,
                max_size_mb=args.max_video_mb,
                crf=args.ffmpeg_crf,
                preset=args.ffmpeg_preset,
                audio_bitrate_k=96,
            )
            if compressed is None:
                logging.error("❌ Compression failed; aborting.")
                sys.exit(1)
            video_path_for_upload = compressed
            if compressed != args.video:
                # Only a file created by the compressor is ours to delete.
                temp_compressed_path = compressed

        embed_dict = None
        if video_path_for_upload:
            logging.info(f"🎬 Preparing video upload: {video_path_for_upload}")
            embed_dict = upload_video_smart_embed_dict(
                did=did,
                access_jwt=access_jwt,
                pds_url=pds_url,
                video_path=video_path_for_upload,
                service_url=args.service,
                alt_text=args.alt,
                settle_delay_seconds=args.video_settle_delay,
                allow_pds_video_fallback=args.allow_pds_video_fallback,
            )
            if embed_dict is None:
                logging.error("❌ Aborting post: video upload/processing failed.")
                sys.exit(1)
        elif args.image:
            embed_dict = upload_image_embed_dict(
                pds_url=pds_url,
                access_jwt=access_jwt,
                image_path=args.image,
                alt_text=args.alt,
            )
            if embed_dict is None:
                logging.error("❌ Aborting post: image upload failed.")
                sys.exit(1)

        record = create_post_record(text=args.text, langs=langs, embed_dict=embed_dict)
        logging.info(f"🧾 Final record text={record.get('text')!r}, has_embed={'embed' in record}")
        ok = publish_post(pds_url=pds_url, access_jwt=access_jwt, did=did, record=record)
    finally:
        # Runs on every exit path, including sys.exit() and unexpected
        # exceptions, so the temporary compressed video is never left behind.
        _remove_temp_video(temp_compressed_path)

    if not ok:
        sys.exit(1)


if __name__ == "__main__":
    main()