Files
post2bsky/twitter2bsky_daemon.py
2026-05-08 16:57:21 +00:00

548 lines
18 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
import argparse
import logging
import mimetypes
import os
import random
import re
import secrets
import shutil
import string
import subprocess
import sys
import tempfile
import time
from dataclasses import dataclass
from urllib.parse import urlparse
import requests
from atproto import Client, models
@dataclass(frozen=True)
class RetryConfig:
    """Immutable bundle of the login retry/backoff defaults.

    Every field has a default, so the values are readable both from an
    instance (``RetryConfig().login_max_attempts``) and straight from the
    class (``RetryConfig.login_max_attempts``).
    """

    # How many times a login is tried before giving up.
    login_max_attempts: int = 5
    # Base delay, in seconds, between login retries.
    login_base_delay_seconds: float = 10.0
    # Upper bound, in seconds, for any single retry delay.
    login_max_delay_seconds: float = 600.0
    # Maximum random jitter, in seconds, added on top of a retry delay.
    login_jitter_seconds: float = 3.0
def setup_logging() -> None:
    """Configure the root logger: INFO level, timestamped lines, written to stdout."""
    line_format = "%(asctime)s %(levelname)s %(message)s"
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format=line_format)
def is_rate_limited_error(error_obj) -> bool:
    """Return True if the error's ``repr`` contains a rate-limit marker.

    The match is a case-insensitive substring test against ``"429"``,
    ``"ratelimit"`` and ``"too many requests"``.
    """
    rendered = repr(error_obj).lower()
    markers = ("429", "ratelimit", "too many requests")
    return any(marker in rendered for marker in markers)
def is_auth_error(error_obj) -> bool:
    """Return True if the error's ``repr`` contains an authentication-failure marker.

    The match is a case-insensitive substring test against ``"401"``,
    ``"403"`` and ``"invalid identifier or password"``.
    """
    rendered = repr(error_obj).lower()
    markers = ("401", "403", "invalid identifier or password")
    return any(marker in rendered for marker in markers)
def is_network_error(error_obj) -> bool:
    """Return True if the error's ``repr`` contains a transient/network marker.

    Unlike the rate-limit and auth checks, this match is case-sensitive:
    the markers are exception class names and gateway status codes.
    """
    rendered = repr(error_obj)
    transient_markers = (
        "ConnectError",
        "RemoteProtocolError",
        "ReadTimeout",
        "WriteTimeout",
        "TimeoutException",
        "503",
        "502",
        "504",
        "ConnectionResetError",
        "InvokeTimeoutError",
    )
    for marker in transient_markers:
        if marker in rendered:
            return True
    return False
def get_rate_limit_wait_seconds(error_obj, default_delay: float, max_delay: float) -> float:
    """Work out how long to wait after a rate-limit error.

    Headers are looked up on ``error_obj.headers`` first and, when that is
    missing or empty, on ``error_obj.response.headers`` (HTTP client errors
    commonly wrap the response instead of exposing headers directly).
    Header names are matched case-insensitively.

    Args:
        error_obj: The exception (or any object) raised by the failed call.
        default_delay: Delay returned when no usable header is present; also
            the lower bound applied to a ``ratelimit-reset`` derived wait.
        max_delay: Upper bound for any header-derived wait.

    Returns:
        ``retry-after`` clamped to ``[1.0, max_delay]`` when present;
        otherwise the time until ``ratelimit-reset`` (epoch seconds) plus one
        second, clamped to ``[default_delay, max_delay]``; otherwise
        ``default_delay``.
    """
    headers = getattr(error_obj, "headers", None)
    if not headers:
        # Fall back to the wrapped response, if the error carries one.
        headers = getattr(getattr(error_obj, "response", None), "headers", None)
    if not headers:
        return default_delay

    try:
        normalized = {str(name).lower(): value for name, value in headers.items()}

        retry_after = normalized.get("retry-after")
        if retry_after:
            return min(max(float(retry_after), 1.0), max_delay)

        reset_value = normalized.get("ratelimit-reset")
        if reset_value:
            now_ts = int(time.time())
            wait_seconds = max(float(reset_value) - now_ts + 1.0, default_delay)
            return min(wait_seconds, max_delay)
    except (AttributeError, TypeError, ValueError):
        # Best effort: a malformed or non-numeric header (e.g. an HTTP-date
        # Retry-After) simply means we use the default delay.
        pass
    return default_delay
def login_with_backoff(
    client: Client, username: str, password: str, service_url: str,
    max_attempts: int = 5, base_delay: float = 10.0, max_delay: float = 600.0, jitter: float = 1.5
) -> bool:
    """Log in, retrying on rate-limit and transient network errors.

    Args:
        client: Client whose ``login(username, password)`` is called.
        username: Account identifier passed to ``client.login``.
        password: Password passed to ``client.login``.
        service_url: Service URL; used only in the log message.
        max_attempts: Total number of login attempts.
        base_delay: Base delay in seconds. Network retries wait
            ``base_delay * attempt``; it is also the default rate-limit wait.
        max_delay: Cap in seconds applied to the delay before jitter.
        jitter: Upper bound of the uniform random jitter added to each wait.

    Returns:
        True on a successful login. False on an authentication error, on an
        error that is neither rate-limit nor network related, or once the
        attempts are exhausted.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            logging.info(f"🔑 Login attempt {attempt}/{max_attempts} at {service_url} as {username}")
            client.login(username, password)
        except Exception as e:
            logging.exception("❌ Login exception")

            # Wrong credentials never get better by retrying.
            if is_auth_error(e):
                logging.error("❌ Authentication failed.")
                return False

            rate_limited = is_rate_limited_error(e)
            if not rate_limited and not is_network_error(e):
                # Unclassified failure: fail fast rather than hammer the server.
                return False
            if attempt >= max_attempts:
                return False

            if rate_limited:
                wait = get_rate_limit_wait_seconds(e, base_delay, max_delay) + random.uniform(0, jitter)
                logging.warning(f"⏳ Rate limited, retrying in {wait:.1f}s...")
            else:
                # Linear backoff for transient transport errors.
                wait = min(base_delay * attempt, max_delay) + random.uniform(0, jitter)
                logging.warning(f"⏳ Network/transient error, retrying in {wait:.1f}s...")
            time.sleep(wait)
        else:
            logging.info("✅ Login successful.")
            return True
    return False
def wait_with_heartbeat(total_seconds: float, label: str) -> None:
    """Sleep for ``total_seconds``, logging progress between 5-second slices.

    Does nothing (and logs nothing) when ``total_seconds`` is zero or
    negative. ``label`` only appears in the initial log line.
    """
    if total_seconds <= 0:
        return

    logging.info(f"⏳ Waiting {total_seconds:.0f}s for {label}...")
    seconds_left = total_seconds
    while seconds_left > 0:
        # Sleep in slices of at most five seconds so the log shows activity.
        slice_seconds = min(5.0, seconds_left)
        time.sleep(slice_seconds)
        seconds_left -= slice_seconds
        if seconds_left > 0:
            logging.info(f" ...still waiting ({seconds_left:.0f}s remaining)...")
    logging.info("✅ Wait complete.")
def pds_did_from_service_url(service_url: str) -> str:
    """Build a ``did:web:<host>`` identifier from a service URL.

    Only the lower-cased hostname is used; scheme, port and path are ignored.

    Raises:
        ValueError: If no hostname can be parsed from ``service_url``.
    """
    parsed_host = urlparse(service_url).hostname
    if parsed_host:
        return "did:web:" + parsed_host.lower()
    raise ValueError(f"Invalid --service URL: {service_url}")
def random_video_name(ext: str = ".mp4") -> str:
    """Return ``<unix-seconds>_<12 random [a-z0-9] chars><ext>``."""
    alphabet = string.ascii_lowercase + string.digits
    suffix = "".join(secrets.choice(alphabet) for _ in range(12))
    timestamp = int(time.time())
    return f"{timestamp}_{suffix}{ext}"
def detect_mime_type(path: str) -> str:
    """Guess the MIME type from the file name, defaulting to ``application/octet-stream``."""
    guessed = mimetypes.guess_type(path)[0]
    if not guessed:
        return "application/octet-stream"
    return guessed
def ffmpeg_exists() -> bool:
    """Return True when an ``ffmpeg`` executable is found on PATH."""
    located = shutil.which("ffmpeg")
    return bool(located)
def ffprobe_exists() -> bool:
    """Return True when an ``ffprobe`` executable is found on PATH."""
    located = shutil.which("ffprobe")
    return bool(located)
def get_video_duration_seconds(path: str) -> float | None:
    """Return the container duration of ``path`` in seconds, via ffprobe.

    Returns:
        The duration as a float, or None when ffprobe is not installed,
        exits with an error, or prints something that is not a number.
    """
    if not ffprobe_exists():
        return None
    try:
        out = subprocess.check_output(
            [
                "ffprobe", "-v", "error",
                "-show_entries", "format=duration",
                "-of", "default=noprint_wrappers=1:nokey=1",
                path,
            ],
            # Keep diagnostics out of the captured text: anything ffprobe
            # writes to stderr would otherwise be mixed into the value
            # handed to float() below.
            stderr=subprocess.DEVNULL,
            text=True,
        ).strip()
        return float(out)
    except (subprocess.SubprocessError, OSError, ValueError, TypeError):
        # Best effort: the caller falls back to a default bitrate.
        return None
def compress_video_ffmpeg(
    input_path: str, max_size_mb: float = 45.0, crf: int = 28, preset: str = "veryfast", audio_bitrate_k: int = 96
) -> str | None:
    """Re-encode a video with ffmpeg (libx264 + AAC) to shrink it.

    Args:
        input_path: Path of the source video.
        max_size_mb: Size target in MiB; sources at or below it are returned
            untouched. Also used to derive the video bitrate when the
            duration is known.
        crf: Value passed to ffmpeg's ``-crf``.
        preset: Value passed to ffmpeg's ``-preset``.
        audio_bitrate_k: Audio bitrate in kbit/s (``-b:a``).

    Returns:
        ``input_path`` when no compression is needed or the re-encode did not
        come out smaller; the path of a new temporary ``.mp4`` when it did
        (the caller owns and must delete that file); None when ffmpeg is
        missing, the input does not exist, or the encode failed.
    """
    if not ffmpeg_exists():
        logging.error("❌ ffmpeg not found. Install ffmpeg or use --no-compress-video.")
        return None
    if not os.path.exists(input_path):
        logging.error(f"❌ Video file not found: {input_path}")
        return None

    src_size_mb = os.path.getsize(input_path) / (1024 * 1024)
    logging.info(f"📦 Source video size: {src_size_mb:.2f} MB")
    if src_size_mb <= max_size_mb:
        logging.info("✅ Already below size target. Skipping compression.")
        return input_path

    # Derive a video bitrate from the size budget when the duration is known;
    # otherwise use a fixed default. Either way clamp to 300..5000 kbit/s.
    duration = get_video_duration_seconds(input_path)
    target_video_k = 1200
    if duration and duration > 0:
        total_kbps = (max_size_mb * 8192.0) / duration
        target_video_k = int(total_kbps - audio_bitrate_k)
    target_video_k = min(max(target_video_k, 300), 5000)

    fd, out_path = tempfile.mkstemp(prefix="bsky_vid_", suffix=".mp4")
    os.close(fd)
    cmd = [
        "ffmpeg", "-y",
        "-i", input_path,
        "-c:v", "libx264",
        "-preset", preset,
        "-crf", str(crf),
        "-b:v", f"{target_video_k}k",
        "-maxrate", f"{int(target_video_k * 1.3)}k",
        "-bufsize", f"{int(target_video_k * 2)}k",
        "-vf", "scale='min(1280,iw)':-2",
        "-c:a", "aac",
        "-b:a", f"{audio_bitrate_k}k",
        "-movflags", "+faststart",
        out_path,
    ]

    keep_output = False
    try:
        logging.info(f"🛠️ Compressing video (target≤{max_size_mb}MB, crf={crf}, preset={preset})...")
        subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
        out_size_mb = os.path.getsize(out_path) / (1024 * 1024)
        logging.info(f"✅ Compressed size: {out_size_mb:.2f} MB")
        if out_size_mb < src_size_mb:
            if out_size_mb > max_size_mb:
                logging.warning(f"⚠️ Compressed video is still above the {max_size_mb} MB target.")
            keep_output = True
            return out_path
        # Re-encode did not help; hand back the original.
        return input_path
    except subprocess.CalledProcessError as e:
        logging.error("❌ ffmpeg compression failed.")
        if e.stderr:
            # Only the tail of ffmpeg's output is useful and loggable.
            logging.error(e.stderr[-2000:])
        return None
    except OSError as e:
        logging.error(f"❌ ffmpeg compression failed: {e!r}")
        return None
    finally:
        # Remove the temp file on every path that does not return it,
        # including unexpected failures.
        if not keep_output:
            try:
                os.remove(out_path)
            except OSError:
                pass
def _extract_service_auth_token(upload_auth) -> str | None:
token = getattr(upload_auth, "token", None)
if token:
return token
if isinstance(upload_auth, dict):
return upload_auth.get("token")
return None
def upload_video_via_bsky_service(client: Client, video_path: str, service_url: str, alt_text: str = "") -> dict | None:
    """Upload a video through video.bsky.app and wait for processing.

    Steps: obtain a service-auth token from the PDS, POST the raw bytes to
    ``app.bsky.video.uploadVideo``, then poll ``app.bsky.video.getJobStatus``
    for up to 600 seconds.

    Args:
        client: Logged-in client; ``client.me.did`` identifies the uploader.
        video_path: Path of the video file to upload.
        service_url: PDS base URL, used to derive the ``did:web:`` audience.
        alt_text: Alt text stored in the returned embed.

    Returns:
        A raw ``app.bsky.embed.video`` embed dict on success, or None on any
        failure (missing file, auth, transport error, bad response, failed or
        timed-out job). Failures are logged, never raised, so the caller can
        fall back to another upload path.
    """
    if not os.path.exists(video_path):
        logging.error(f"❌ Video file not found: {video_path}")
        return None
    try:
        with open(video_path, "rb") as f:
            video_bytes = f.read()
    except OSError as e:
        logging.error(f"❌ Could not read video file: {e!r}")
        return None
    size_mb = len(video_bytes) / (1024 * 1024)
    logging.info(f"🎬 [video.bsky.app] Uploading: {video_path} ({size_mb:.2f} MB)")

    video_host = "https://video.bsky.app"
    try:
        pds_did = pds_did_from_service_url(service_url)
    except ValueError as e:
        logging.error(f"❌ {e}")
        return None

    # Token is scoped to uploadBlob on the PDS and valid for 30 minutes.
    auth_fields = {
        "aud": pds_did,
        "lxm": "com.atproto.repo.uploadBlob",
        "exp": int(time.time()) + 60 * 30,
    }
    try:
        params = models.ComAtprotoServerGetServiceAuth.Params(**auth_fields)
        upload_auth = client.com.atproto.server.get_service_auth(params)
    except Exception:
        # Retry with a plain dict in case the typed params are not accepted.
        try:
            upload_auth = client.com.atproto.server.get_service_auth(dict(auth_fields))
        except Exception as e:
            logging.error(f"❌ Could not get service auth token: {e!r}")
            return None

    token = _extract_service_auth_token(upload_auth)
    if not token:
        logging.error("❌ Could not get service auth token.")
        return None

    upload_name = random_video_name(".mp4")
    logging.info(f"🎞️ Upload name: {upload_name}")
    headers = {"Authorization": f"Bearer {token}", "Content-Type": "video/mp4"}
    try:
        resp = requests.post(
            f"{video_host}/xrpc/app.bsky.video.uploadVideo",
            # Let requests URL-encode the query values.
            params={"did": client.me.did, "name": upload_name},
            headers=headers,
            data=video_bytes,
            timeout=240,
        )
    except requests.RequestException as e:
        logging.error(f"❌ video.bsky.app upload failed: {e!r}")
        return None

    if resp.status_code not in (200, 409):
        logging.error(f"❌ video.bsky.app upload failed: {resp.status_code} - {resp.text}")
        return None
    try:
        body = resp.json()
    except ValueError:
        logging.error(f"❌ video.bsky.app returned a non-JSON response: {resp.text}")
        return None

    if resp.status_code == 409:
        # 409 is acceptable only when it points at an existing job to reuse.
        if body.get("error") == "already_exists" and body.get("jobId"):
            logging.info(" Video already processed on video.bsky.app. Reusing existing job.")
        else:
            logging.error(f"❌ 409 without reusable jobId: {body}")
            return None

    job_id = body.get("jobId")
    if not job_id:
        logging.error(f"❌ Missing jobId in upload response: {body}")
        return None
    logging.info(f"⏳ Job {job_id} accepted — polling status...")

    status_url = f"{video_host}/xrpc/app.bsky.video.getJobStatus"
    deadline = time.time() + 600
    while time.time() < deadline:
        try:
            s = requests.get(status_url, params={"jobId": job_id}, timeout=30)
        except requests.RequestException as e:
            logging.error(f"❌ Job status check failed: {e!r}")
            return None
        if s.status_code != 200:
            logging.error(f"❌ Job status check failed: {s.status_code} - {s.text}")
            return None
        try:
            status_json = s.json()
        except ValueError:
            logging.error(f"❌ Job status check returned a non-JSON response: {s.text}")
            return None

        job_status = status_json.get("jobStatus") or {}
        state = job_status.get("state")
        if state == "JOB_STATE_COMPLETED":
            blob = job_status.get("blob")
            if not blob:
                logging.error(f"❌ Completed job without blob: {status_json}")
                return None
            wait_with_heartbeat(8, "CDN propagation")
            # Return RAW embed dict (no models.BlobRef dependency)
            return {
                "$type": "app.bsky.embed.video",
                "video": blob,
                "alt": alt_text or "",
            }
        if state == "JOB_STATE_FAILED":
            logging.error(f"❌ Video processing failed: {job_status}")
            return None
        logging.info(f" ...still processing (state={state})...")
        time.sleep(3)

    logging.error("❌ Video processing timed out.")
    return None
def upload_video_via_pds(client: Client, video_path: str, alt_text: str = "", settle_delay_seconds: float = 30.0) -> dict | None:
    """Upload a video straight to the PDS as a blob (fallback path).

    Args:
        client: Logged-in client; ``client.upload_blob`` receives the bytes.
        video_path: Path of the video file to upload.
        alt_text: Alt text stored in the returned embed.
        settle_delay_seconds: How long to wait after a successful upload
            before returning.

    Returns:
        A raw ``app.bsky.embed.video`` embed dict, or None when reading or
        uploading fails or the response carries no blob. Errors are logged,
        not raised.
    """
    try:
        with open(video_path, "rb") as f:
            video_bytes = f.read()
        size_mb = len(video_bytes) / (1024 * 1024)
        logging.warning(f"🎬 [PDS-direct fallback] Uploading: {video_path} ({size_mb:.2f} MB)")

        r = client.upload_blob(video_bytes)
        blob = getattr(r, "blob", None)
        if blob is None and isinstance(r, dict):
            blob = r.get("blob")
        if blob is None:
            # Check before the settle delay: no point waiting on a failed upload.
            logging.error("❌ PDS uploadBlob returned no blob.")
            return None

        wait_with_heartbeat(settle_delay_seconds, "PDS/AppView indexing")
        # Also return raw embed dict
        return {
            "$type": "app.bsky.embed.video",
            "video": blob,
            "alt": alt_text or "",
        }
    except Exception as e:
        logging.error(f"❌ PDS-direct video upload failed: {repr(e)}")
        return None
def upload_video_smart(
    client: Client, video_path: str, service_url: str, alt_text: str,
    settle_delay_seconds: float, allow_pds_video_fallback: bool
) -> dict | None:
    """Upload a video, preferring video.bsky.app over a direct PDS upload.

    The direct PDS upload is attempted only when the first path yields
    nothing and ``allow_pds_video_fallback`` is true.

    Returns:
        The embed dict from whichever path succeeded, or None.
    """
    logging.info(f"🌍 PDS ({service_url}). Trying video.bsky.app first.")
    primary_embed = upload_video_via_bsky_service(client, video_path, service_url, alt_text)
    if primary_embed:
        return primary_embed

    if not allow_pds_video_fallback:
        logging.error("❌ video.bsky.app failed and fallback disabled.")
        return None

    logging.warning("⚠️ video.bsky.app failed; trying direct PDS fallback.")
    return upload_video_via_pds(client, video_path, alt_text, settle_delay_seconds)
def upload_image(client: Client, image_path: str, alt_text: str = "") -> dict | None:
    """Upload one image as a blob and wrap it in an images embed.

    Args:
        client: Logged-in client; ``client.upload_blob`` receives the bytes.
        image_path: Path of the image file.
        alt_text: Alt text for the image.

    Returns:
        A raw ``app.bsky.embed.images`` embed dict holding a single image, or
        None when the file is missing, the upload raises, or no blob comes
        back. Errors are logged, not raised.
    """
    try:
        if not os.path.exists(image_path):
            logging.error(f"❌ Image file not found: {image_path}")
            return None

        # The MIME type is only used for the log line below.
        mime = detect_mime_type(image_path)
        with open(image_path, "rb") as handle:
            payload = handle.read()
        size_kb = len(payload) / 1024
        logging.info(f"🖼️ Uploading image: {image_path} ({size_kb:.1f} KB, {mime})")

        response = client.upload_blob(payload)
        uploaded = getattr(response, "blob", None)
        if uploaded is None:
            uploaded = response.get("blob") if isinstance(response, dict) else None
        if uploaded is None:
            logging.error("❌ uploadBlob returned no blob for image.")
            return None

        image_entry = {"alt": alt_text or "", "image": uploaded}
        return {
            "$type": "app.bsky.embed.images",
            "images": [image_entry],
        }
    except Exception as e:
        logging.error(f"❌ Failed to upload image: {repr(e)}")
        return None
def post_to_bsky(
    client: Client,
    text: str,
    langs: list[str],
    image_path: str | None,
    video_path: str | None,
    alt_text: str,
    service_url: str,
    video_settle_delay: float,
    allow_pds_video_fallback: bool,
) -> bool:
    """Create an ``app.bsky.feed.post`` record with optional media.

    A video takes precedence over an image when both paths are given.

    Args:
        client: Logged-in client; the record is created in ``client.me.did``.
        text: Post text; surrounding whitespace is stripped.
        langs: Language tags stored under ``langs`` (omitted when empty).
        image_path: Optional image to attach.
        video_path: Optional video to attach.
        alt_text: Alt text for the attached media.
        service_url: PDS base URL, forwarded to the video upload.
        video_settle_delay: Settle delay forwarded to the video upload.
        allow_pds_video_fallback: Whether a direct PDS video upload may be
            tried when video.bsky.app fails.

    Returns:
        True when the record was created; False when the post is empty, a
        media upload failed, or record creation raised.
    """
    post_text = text.strip()
    if not post_text and not image_path and not video_path:
        logging.error("❌ Empty post text with no media is not allowed.")
        return False

    embed_dict = None
    if video_path:
        logging.info(f"🎬 Preparing video upload: {video_path}")
        embed_dict = upload_video_smart(
            client=client,
            video_path=video_path,
            service_url=service_url,
            alt_text=alt_text,
            settle_delay_seconds=video_settle_delay,
            allow_pds_video_fallback=allow_pds_video_fallback,
        )
        if not embed_dict:
            logging.error("❌ Aborting post: video upload/processing failed.")
            return False
    elif image_path:
        embed_dict = upload_image(client, image_path, alt_text)
        if not embed_dict:
            logging.error("❌ Aborting post: image upload failed.")
            return False

    record = {
        "$type": "app.bsky.feed.post",
        "text": post_text,
        "createdAt": time.strftime("%Y-%m-%dT%H:%M:%S.000Z", time.gmtime()),
    }
    if langs:
        record["langs"] = langs
    if embed_dict:
        record["embed"] = embed_dict
    logging.info(f"🧾 Final record text={record.get('text')!r}, has_embed={'embed' in record}")

    # Prefer the typed request model; fall back to a plain dict only when the
    # model itself cannot be built.
    try:
        payload = models.ComAtprotoRepoCreateRecord.Data(
            repo=client.me.did,
            collection="app.bsky.feed.post",
            record=record,
        )
    except Exception:
        payload = {
            "repo": client.me.did,
            "collection": "app.bsky.feed.post",
            "record": record,
        }

    # Send exactly once: blindly re-sending after an exception could publish
    # the same post twice if the first request had already reached the server.
    try:
        resp = client.com.atproto.repo.create_record(payload)
    except Exception as e:
        logging.error(f"❌ Failed to publish post: {e!r}")
        return False

    uri = getattr(resp, "uri", None) or (resp.get("uri") if isinstance(resp, dict) else None)
    logging.info(f"✅ Post published! URI: {uri}")
    return True
def main():
    """Command-line entry point: parse arguments, log in, optionally compress, post.

    Exits with status 1 when both ``--image`` and ``--video`` are given, when
    login fails, when video compression fails, or when posting fails. A
    temporary compressed video is removed on every exit path.
    """
    setup_logging()
    parser = argparse.ArgumentParser(description="Post text + optional image/video to Bluesky/federated PDS.")
    parser.add_argument("text", help="Post text content")
    parser.add_argument("--username", required=True)
    parser.add_argument("--password", required=True)
    parser.add_argument("--service", default="https://bsky.social")
    parser.add_argument("--lang", default="ca")
    parser.add_argument("--image", default=None)
    parser.add_argument("--video", default=None)
    parser.add_argument("--alt", default="")
    parser.add_argument("--video-settle-delay", type=float, default=30.0)
    parser.add_argument("--allow-pds-video-fallback", action="store_true")
    parser.add_argument("--compress-video", dest="compress_video", action="store_true", default=True)
    parser.add_argument("--no-compress-video", dest="compress_video", action="store_false")
    parser.add_argument("--max-video-mb", type=float, default=45.0)
    parser.add_argument("--ffmpeg-crf", type=int, default=28)
    parser.add_argument("--ffmpeg-preset", default="veryfast")
    args = parser.parse_args()

    if args.image and args.video:
        logging.error("❌ Use either --image or --video, not both.")
        sys.exit(1)

    client = Client(base_url=args.service)
    retry = RetryConfig()
    if not login_with_backoff(
        client, args.username, args.password, args.service,
        retry.login_max_attempts,
        retry.login_base_delay_seconds,
        retry.login_max_delay_seconds,
        retry.login_jitter_seconds,
    ):
        sys.exit(1)

    # --lang accepts a comma-separated list; blank entries are dropped.
    langs = [x.strip() for x in args.lang.split(",") if x.strip()]

    video_path_for_upload = args.video
    temp_compressed_path = None
    ok = False
    try:
        if args.video and args.compress_video:
            compressed = compress_video_ffmpeg(
                input_path=args.video,
                max_size_mb=args.max_video_mb,
                crf=args.ffmpeg_crf,
                preset=args.ffmpeg_preset,
                audio_bitrate_k=96,
            )
            if compressed is None:
                logging.error("❌ Compression failed; aborting.")
                sys.exit(1)
            video_path_for_upload = compressed
            # Only a newly created file is ours to delete afterwards.
            if compressed != args.video:
                temp_compressed_path = compressed

        ok = post_to_bsky(
            client=client,
            text=args.text,
            langs=langs,
            image_path=args.image,
            video_path=video_path_for_upload,
            alt_text=args.alt,
            service_url=args.service,
            video_settle_delay=args.video_settle_delay,
            allow_pds_video_fallback=args.allow_pds_video_fallback,
        )
    finally:
        # Runs on success, on failure and on sys.exit(), so the temp file
        # never outlives the process.
        if temp_compressed_path and os.path.exists(temp_compressed_path):
            try:
                os.remove(temp_compressed_path)
                logging.info(f"🧹 Removed temp file: {temp_compressed_path}")
            except OSError as e:
                logging.warning(f"⚠️ Could not remove temp file {temp_compressed_path}: {e!r}")

    if not ok:
        sys.exit(1)
# Run the CLI only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()