fix(sync): preserve non-X links during truncation so Bluesky external cards are generated

This commit is contained in:
Guillem Hernandez Sola
2026-04-09 12:06:44 +02:00
parent 66b6ff1fbe
commit 19ec55717e

View File

@@ -35,14 +35,13 @@ EXTERNAL_THUMB_MIN_JPEG_QUALITY = 40
BSKY_BLOB_UPLOAD_MAX_RETRIES = 5
BSKY_BLOB_UPLOAD_BASE_DELAY = 10
BSKY_BLOB_UPLOAD_MAX_DELAY = 300
BSKY_BLOB_TRANSIENT_ERROR_RETRIES = 3
BSKY_BLOB_TRANSIENT_ERROR_DELAY = 15
MEDIA_DOWNLOAD_TIMEOUT = 30
LINK_METADATA_TIMEOUT = 10
DEFAULT_BSKY_BASE_URL = "https://bsky.social"
# Extra timeout retry tuning for transient blob upload failures
BSKY_BLOB_TRANSIENT_ERROR_RETRIES = 3
BSKY_BLOB_TRANSIENT_ERROR_DELAY = 15
# --- Logging Setup ---
logging.basicConfig(
format="%(asctime)s [%(levelname)s] %(message)s",
@@ -242,74 +241,295 @@ def remove_trailing_ellipsis_line(text):
return "\n".join(lines).strip()
def clean_url(url):
    """Normalize a raw URL string and return it, or None when invalid.

    Removes surrounding and embedded whitespace, strips trailing
    punctuation, then validates the result with is_valid_url().
    """
    candidate = re.sub(r"\s+", "", url.strip())
    candidate = strip_trailing_url_punctuation(candidate)
    return candidate if is_valid_url(candidate) else None
def clean_post_text(text):
    """Run the full cleanup pipeline over a tweet's raw text.

    Repairs broken URLs and mentions, trims per-line edge whitespace, and
    drops a trailing ellipsis-only line; None is treated as empty.
    """
    stages = (
        repair_broken_urls,
        repair_broken_mentions,
        strip_line_edge_whitespace,
        remove_trailing_ellipsis_line,
    )
    cleaned = (text or "").strip()
    for stage in stages:
        cleaned = stage(cleaned)
    return cleaned.strip()
def canonicalize_url(url):
    """Return *url* trimmed and stripped of trailing punctuation, or None for falsy input."""
    return strip_trailing_url_punctuation(url.strip()) if url else None
def truncate_text_safely(text, max_length=BSKY_TEXT_MAX_LENGTH):
    """Cap *text* at *max_length* characters, appending "..." when cut.

    Prefers cutting at the last word boundary inside the budget so the
    truncated post never ends mid-word.
    """
    if len(text) <= max_length:
        return text
    head = text[:max_length - 3]
    boundary = head.rfind(" ")
    return (head[:boundary] if boundary > 0 else head) + "..."
def canonicalize_tweet_url(url):
    """Map any x.com/twitter.com status URL onto its canonical form.

    Returns https://x.com/<handle>/status/<id> with a lowercased handle;
    non-status URLs are simply lowercased, and falsy input yields None.
    """
    if not url:
        return None
    stripped = url.strip()
    match = re.search(
        r"https?://(?:www\.)?(?:x\.com|twitter\.com)/([^/]+)/status/(\d+)",
        stripped,
        re.IGNORECASE,
    )
    if match is None:
        return stripped.lower()
    handle, tweet_id = match.groups()
    return f"https://x.com/{handle.lower()}/status/{tweet_id}"
def is_x_or_twitter_domain(url):
    """Return True when *url*'s hostname is a known X/Twitter host.

    Best effort: any parsing failure is treated as "not X/Twitter".
    """
    known_hosts = {
        "x.com",
        "www.x.com",
        "twitter.com",
        "www.twitter.com",
        "mobile.twitter.com",
    }
    try:
        host = (urlparse(url).hostname or "").lower()
    except Exception:
        return False
    return host in known_hosts
def extract_urls_from_text(text):
def prepare_post_text_for_bsky(full_clean_text, keep_url=None):
"""
Prepare final Bluesky post text.
If keep_url is provided and exists in the text, try to preserve it in the final output
by truncating the body before the URL instead of cutting the URL away.
"""
text = (full_clean_text or "").strip()
if not text:
return []
repaired = repair_broken_urls(text)
return re.findall(r"https?://[^\s]+", repaired)
return text
if len(text) <= BSKY_TEXT_MAX_LENGTH:
return text
def extract_non_x_urls_from_text(text):
urls = extract_urls_from_text(text)
result = []
if keep_url:
canonical_keep = canonicalize_url(keep_url)
urls = extract_ordered_non_x_urls(text)
matched_url = None
for url in urls:
cleaned = strip_trailing_url_punctuation(url)
if cleaned and not is_x_or_twitter_domain(cleaned):
result.append(cleaned)
if canonicalize_url(url) == canonical_keep:
matched_url = url
break
return result
if matched_url and matched_url in text:
idx = text.find(matched_url)
prefix = text[:idx].rstrip()
suffix = matched_url
reserve = len(suffix) + 1
available = BSKY_TEXT_MAX_LENGTH - reserve
if available > 10:
trimmed_prefix = prefix
if len(trimmed_prefix) > available:
trimmed_prefix = trimmed_prefix[:available - 3]
last_space = trimmed_prefix.rfind(" ")
if last_space > 0:
trimmed_prefix = trimmed_prefix[:last_space] + "..."
else:
trimmed_prefix = trimmed_prefix + "..."
final_text = f"{trimmed_prefix.rstrip()} {suffix}".strip()
if len(final_text) <= BSKY_TEXT_MAX_LENGTH:
logging.info("🔗 Preserved non-X URL in final Bluesky text for card generation")
return final_text
return truncate_text_safely(text, BSKY_TEXT_MAX_LENGTH)
def extract_ordered_non_x_urls(text):
seen = set()
ordered = []
def normalize_post_text(text):
if not text:
return ""
for url in extract_non_x_urls_from_text(text):
canonical = canonicalize_url(url)
if canonical and canonical not in seen:
seen.add(canonical)
ordered.append(canonical)
text = clean_post_text(text)
text = text.replace("\r", "\n")
text = re.sub(r"\s+", " ", text).strip()
return text.lower()
return ordered
def build_media_fingerprint(tweet):
    """Build a stable SHA-256 fingerprint of a tweet's attached media.

    Returns "no-media" when there is nothing attached. Photo URLs are
    normalized by dropping the size/format query params so different
    renditions of the same image match; video entries key on the canonical
    tweet URL. Parts are sorted so ordering never changes the digest.
    """
    if not tweet or not tweet.media:
        return "no-media"
    parts = []
    for item in tweet.media:
        kind = getattr(item, "type", "unknown")
        value = getattr(item, "media_url_https", "") or ""
        if kind == "photo":
            value = re.sub(r"[?&]name=\w+", "", value)
            value = re.sub(r"[?&]format=\w+", "", value)
        elif kind == "video":
            value = canonicalize_tweet_url(tweet.tweet_url or value or "")
        parts.append(f"{kind}:{value}")
    digest_input = "|".join(sorted(parts))
    return hashlib.sha256(digest_input.encode("utf-8")).hexdigest()
def build_bsky_media_fingerprint(post_view):
    """Fingerprint the media embedded in a Bluesky post view.

    Mirror of build_media_fingerprint() on the Bluesky side: returns
    "no-media" for posts without embeds (or on any error), otherwise a
    SHA-256 over the sorted, typed media references (photo / video /
    external card).
    """
    try:
        embed = getattr(post_view, "embed", None)
        if not embed:
            return "no-media"
        parts = []
        for img in getattr(embed, "images", None) or []:
            blob = getattr(img, "image", None)
            ref = getattr(blob, "ref", None) or getattr(blob, "cid", None) or str(blob)
            parts.append(f"photo:{ref}")
        video = getattr(embed, "video", None)
        if video:
            ref = getattr(video, "ref", None) or getattr(video, "cid", None) or str(video)
            parts.append(f"video:{ref}")
        external = getattr(embed, "external", None)
        if external:
            uri = getattr(external, "uri", None) or str(external)
            parts.append(f"external:{uri}")
        if not parts:
            return "no-media"
        digest_input = "|".join(sorted(parts))
        return hashlib.sha256(digest_input.encode("utf-8")).hexdigest()
    except Exception as e:
        logging.debug(f"Could not build Bluesky media fingerprint: {e}")
        return "no-media"
def build_text_media_key(normalized_text, media_fingerprint):
    """Combine normalized text and media fingerprint into one dedupe key."""
    combined = f"{normalized_text}||{media_fingerprint}"
    return hashlib.sha256(combined.encode("utf-8")).hexdigest()
def create_bsky_client(base_url, handle, password):
    """Create and authenticate an atproto Client.

    base_url: PDS base URL; falls back to DEFAULT_BSKY_BASE_URL when falsy.
    handle / password: Bluesky credentials passed to Client.login().
    Returns the logged-in client; propagates whatever login() raises.
    """
    # Normalize: trim whitespace and any trailing slash.
    normalized_base_url = (base_url or DEFAULT_BSKY_BASE_URL).strip().rstrip("/")
    logging.info(f"🔐 Connecting Bluesky client via base URL: {normalized_base_url}")
    try:
        client = Client(base_url=normalized_base_url)
    except TypeError:
        # Older atproto releases: constructor has no base_url parameter.
        logging.warning("⚠️ Your atproto Client does not accept base_url in constructor. Falling back.")
        client = Client()
    try:
        # Best-effort: apply the URL via whichever attribute this client version exposes.
        # NOTE(review): indentation was lost in this view — confirm whether this
        # try-block belongs inside the except-branch above or at function level.
        if hasattr(client, "base_url"):
            client.base_url = normalized_base_url
        elif hasattr(client, "_base_url"):
            client._base_url = normalized_base_url
    except Exception as e:
        logging.warning(f"⚠️ Could not apply custom base URL cleanly: {e}")
    client.login(handle, password)
    return client
def default_state():
    """Return a fresh, empty persistent-state mapping."""
    empty = {"version": 1, "updated_at": None}
    empty["posted_tweets"] = {}
    empty["posted_by_bsky_uri"] = {}
    return empty
def load_state(state_path=STATE_PATH):
    """Load the JSON state file, reinitializing on absence or corruption.

    Missing top-level keys are backfilled so state files written by older
    versions remain usable. Always returns a usable state dict.
    """
    if not os.path.exists(state_path):
        logging.info(f"🧠 No state file found at {state_path}. Starting with empty memory.")
        return default_state()
    try:
        with open(state_path, "r", encoding="utf-8") as f:
            state = json.load(f)
    except Exception as e:
        logging.warning(f"⚠️ Could not load state file {state_path}: {e}. Reinitializing.")
        return default_state()
    if not isinstance(state, dict):
        logging.warning("⚠️ State file is invalid. Reinitializing.")
        return default_state()
    for key, fallback in (
        ("version", 1),
        ("posted_tweets", {}),
        ("posted_by_bsky_uri", {}),
        ("updated_at", None),
    ):
        state.setdefault(key, fallback)
    return state
def save_state(state, state_path=STATE_PATH):
    """Atomically persist *state* as pretty-printed JSON.

    Stamps updated_at, writes to a .tmp sibling, then os.replace()s it into
    place so a crash mid-write never corrupts the existing file. Failures
    are logged, never raised.
    """
    temp_path = f"{state_path}.tmp"
    try:
        state["updated_at"] = arrow.utcnow().isoformat()
        with open(temp_path, "w", encoding="utf-8") as f:
            json.dump(state, f, ensure_ascii=False, indent=2, sort_keys=True)
        os.replace(temp_path, state_path)
        logging.info(f"💾 State saved to {state_path}")
    except Exception as e:
        logging.error(f"❌ Failed to save state file {state_path}: {e}")
def remember_posted_tweet(state, candidate, bsky_uri=None):
    """Record *candidate* in persistent state (plus the Bluesky URI index).

    Keyed by canonical tweet URL when available, otherwise by a
    text+media fingerprint so URL-less tweets still deduplicate.
    """
    tweet = candidate["tweet"]
    canonical_tweet_url = candidate.get("canonical_tweet_url")
    state_key = canonical_tweet_url or f"textmedia:{candidate['text_media_key']}"
    state["posted_tweets"][state_key] = {
        "canonical_tweet_url": canonical_tweet_url,
        "normalized_text": candidate["normalized_text"],
        "raw_text": candidate["raw_text"],
        "full_clean_text": candidate.get("full_clean_text", candidate["raw_text"]),
        "media_fingerprint": candidate["media_fingerprint"],
        "text_media_key": candidate["text_media_key"],
        "canonical_non_x_urls": sorted(candidate["canonical_non_x_urls"]),
        "ordered_non_x_urls": candidate.get("ordered_non_x_urls", []),
        "bsky_uri": bsky_uri,
        "tweet_created_on": tweet.created_on,
        "tweet_url": tweet.tweet_url,
        "posted_at": arrow.utcnow().isoformat(),
    }
    if bsky_uri:
        state["posted_by_bsky_uri"][bsky_uri] = state_key
def candidate_matches_state(candidate, state):
    """Decide whether *candidate* was already posted per the saved state.

    Returns (matched, reason). Signals are checked strongest-first:
    canonical tweet URL, then text+media fingerprint, then normalized text.
    """
    posted = state.get("posted_tweets", {})
    canonical_tweet_url = candidate["canonical_tweet_url"]
    if canonical_tweet_url and canonical_tweet_url in posted:
        return True, "state:tweet_url"
    records = list(posted.values())
    if any(rec.get("text_media_key") == candidate["text_media_key"] for rec in records):
        return True, "state:text_media_fingerprint"
    if any(rec.get("normalized_text") == candidate["normalized_text"] for rec in records):
        return True, "state:normalized_text"
    return False, None
def prune_state(state, max_entries=5000):
    """Cap posted_tweets at *max_entries*, keeping the most recent entries.

    Entries without posted_at sort as oldest and are dropped first. The
    posted_by_bsky_uri index is pruned to match. Mutates and returns state.
    """
    posted_tweets = state.get("posted_tweets", {})
    if len(posted_tweets) <= max_entries:
        return state
    ranked = sorted(
        posted_tweets.items(),
        key=lambda kv: kv[1].get("posted_at") or "",
        reverse=True,
    )
    keep_keys = {key for key, _ in ranked[:max_entries]}
    state["posted_tweets"] = {
        key: record for key, record in posted_tweets.items() if key in keep_keys
    }
    state["posted_by_bsky_uri"] = {
        uri: key
        for uri, key in state.get("posted_by_bsky_uri", {}).items()
        if key in keep_keys
    }
    return state
def extract_urls_from_facets(record):
@@ -329,21 +549,55 @@ def extract_urls_from_facets(record):
return urls
def looks_like_title_plus_url_post(text):
if not text:
return False
def get_recent_bsky_posts(client, handle, limit=30):
recent_posts = []
repaired = repair_broken_urls(text)
repaired = strip_line_edge_whitespace(repaired)
lines = [line.strip() for line in repaired.splitlines() if line.strip()]
if len(lines) < 2:
return False
try:
timeline = client.get_author_feed(handle, limit=limit)
last_line = lines[-1]
urls_in_last_line = extract_ordered_non_x_urls(last_line)
total_urls = extract_ordered_non_x_urls(repaired)
for item in timeline.feed:
try:
if item.reason is not None:
continue
return len(urls_in_last_line) == 1 and len(total_urls) == 1 and last_line.startswith(("http://", "https://"))
record = item.post.record
if getattr(record, "reply", None) is not None:
continue
text = getattr(record, "text", "") or ""
normalized_text = normalize_post_text(text)
urls = []
urls.extend(extract_non_x_urls_from_text(text))
urls.extend(extract_urls_from_facets(record))
canonical_non_x_urls = set()
for url in urls:
if not is_x_or_twitter_domain(url):
canonical = canonicalize_url(url)
if canonical:
canonical_non_x_urls.add(canonical)
media_fingerprint = build_bsky_media_fingerprint(item.post)
text_media_key = build_text_media_key(normalized_text, media_fingerprint)
recent_posts.append({
"uri": getattr(item.post, "uri", None),
"text": text,
"normalized_text": normalized_text,
"canonical_non_x_urls": canonical_non_x_urls,
"media_fingerprint": media_fingerprint,
"text_media_key": text_media_key,
"created_at": getattr(record, "created_at", None),
})
except Exception as e:
logging.debug(f"Skipping one Bluesky feed item during dedupe fetch: {e}")
except Exception as e:
logging.warning(f"⚠️ Could not fetch recent Bluesky posts for duplicate detection: {e}")
return recent_posts
def get_rate_limit_wait_seconds(error_obj, default_delay):
@@ -642,305 +896,9 @@ def build_external_link_embed(url, client, http_client, fallback_title="Link"):
return None
def prepare_post_text(text):
    """Clean a tweet's text and truncate it to the Bluesky length limit.

    Runs the standard repair pipeline, then cuts at the last word boundary
    within the budget and appends "..." when truncation happened.
    """
    cleaned = (text or "").strip()
    for stage in (
        repair_broken_urls,
        repair_broken_mentions,
        strip_line_edge_whitespace,
        remove_trailing_ellipsis_line,
    ):
        cleaned = stage(cleaned)
    if len(cleaned) > BSKY_TEXT_MAX_LENGTH:
        head = cleaned[:BSKY_TEXT_MAX_LENGTH - 3]
        boundary = head.rfind(" ")
        cleaned = (head[:boundary] if boundary > 0 else head) + "..."
    return cleaned.strip()
def normalize_post_text(text):
    """Reduce *text* to a lowercase, whitespace-collapsed comparison form.

    Used for duplicate detection, so cosmetic differences (line breaks,
    repeated spaces, case) must not matter. Falsy input yields "".
    """
    if not text:
        return ""
    for stage in (
        repair_broken_urls,
        repair_broken_mentions,
        strip_line_edge_whitespace,
        remove_trailing_ellipsis_line,
    ):
        text = stage(text)
    collapsed = re.sub(r"\s+", " ", text.replace("\r", "\n")).strip()
    return collapsed.lower()
def build_media_fingerprint(tweet):
    """Build a stable SHA-256 fingerprint of a tweet's attached media.

    Returns "no-media" when there is nothing attached. Photo URLs are
    normalized by dropping the size/format query params so different
    renditions of the same image match; video entries key on the canonical
    tweet URL. Parts are sorted so ordering never changes the digest.
    """
    if not tweet or not tweet.media:
        return "no-media"
    parts = []
    for item in tweet.media:
        kind = getattr(item, "type", "unknown")
        value = getattr(item, "media_url_https", "") or ""
        if kind == "photo":
            value = re.sub(r"[?&]name=\w+", "", value)
            value = re.sub(r"[?&]format=\w+", "", value)
        elif kind == "video":
            value = canonicalize_tweet_url(tweet.tweet_url or value or "")
        parts.append(f"{kind}:{value}")
    digest_input = "|".join(sorted(parts))
    return hashlib.sha256(digest_input.encode("utf-8")).hexdigest()
def build_bsky_media_fingerprint(post_view):
    """Fingerprint the media embedded in a Bluesky post view.

    Mirror of build_media_fingerprint() on the Bluesky side: returns
    "no-media" for posts without embeds (or on any error), otherwise a
    SHA-256 over the sorted, typed media references (photo / video /
    external card).
    """
    try:
        embed = getattr(post_view, "embed", None)
        if not embed:
            return "no-media"
        parts = []
        for img in getattr(embed, "images", None) or []:
            blob = getattr(img, "image", None)
            ref = getattr(blob, "ref", None) or getattr(blob, "cid", None) or str(blob)
            parts.append(f"photo:{ref}")
        video = getattr(embed, "video", None)
        if video:
            ref = getattr(video, "ref", None) or getattr(video, "cid", None) or str(video)
            parts.append(f"video:{ref}")
        external = getattr(embed, "external", None)
        if external:
            uri = getattr(external, "uri", None) or str(external)
            parts.append(f"external:{uri}")
        if not parts:
            return "no-media"
        digest_input = "|".join(sorted(parts))
        return hashlib.sha256(digest_input.encode("utf-8")).hexdigest()
    except Exception as e:
        logging.debug(f"Could not build Bluesky media fingerprint: {e}")
        return "no-media"
def build_text_media_key(normalized_text, media_fingerprint):
    """Combine normalized text and media fingerprint into one dedupe key."""
    combined = f"{normalized_text}||{media_fingerprint}"
    return hashlib.sha256(combined.encode("utf-8")).hexdigest()
def create_bsky_client(base_url, handle, password):
    """Create and authenticate an atproto Client.

    base_url: PDS base URL; falls back to DEFAULT_BSKY_BASE_URL when falsy.
    handle / password: Bluesky credentials passed to Client.login().
    Returns the logged-in client; propagates whatever login() raises.
    """
    # Normalize: trim whitespace and any trailing slash.
    normalized_base_url = (base_url or DEFAULT_BSKY_BASE_URL).strip().rstrip("/")
    logging.info(f"🔐 Connecting Bluesky client via base URL: {normalized_base_url}")
    try:
        client = Client(base_url=normalized_base_url)
    except TypeError:
        # Older atproto releases: constructor has no base_url parameter.
        logging.warning("⚠️ Your atproto Client does not accept base_url in constructor. Falling back.")
        client = Client()
    try:
        # Best-effort: apply the URL via whichever attribute this client version exposes.
        # NOTE(review): indentation was lost in this view — confirm whether this
        # try-block belongs inside the except-branch above or at function level.
        if hasattr(client, "base_url"):
            client.base_url = normalized_base_url
        elif hasattr(client, "_base_url"):
            client._base_url = normalized_base_url
    except Exception as e:
        logging.warning(f"⚠️ Could not apply custom base URL cleanly: {e}")
    client.login(handle, password)
    return client
def default_state():
    """Return a fresh, empty persistent-state mapping."""
    empty = {"version": 1, "updated_at": None}
    empty["posted_tweets"] = {}
    empty["posted_by_bsky_uri"] = {}
    return empty
def load_state(state_path=STATE_PATH):
    """Load the JSON state file, reinitializing on absence or corruption.

    Missing top-level keys are backfilled so state files written by older
    versions remain usable. Always returns a usable state dict.
    """
    if not os.path.exists(state_path):
        logging.info(f"🧠 No state file found at {state_path}. Starting with empty memory.")
        return default_state()
    try:
        with open(state_path, "r", encoding="utf-8") as f:
            state = json.load(f)
    except Exception as e:
        logging.warning(f"⚠️ Could not load state file {state_path}: {e}. Reinitializing.")
        return default_state()
    if not isinstance(state, dict):
        logging.warning("⚠️ State file is invalid. Reinitializing.")
        return default_state()
    for key, fallback in (
        ("version", 1),
        ("posted_tweets", {}),
        ("posted_by_bsky_uri", {}),
        ("updated_at", None),
    ):
        state.setdefault(key, fallback)
    return state
def save_state(state, state_path=STATE_PATH):
    """Atomically persist *state* as pretty-printed JSON.

    Stamps updated_at, writes to a .tmp sibling, then os.replace()s it into
    place so a crash mid-write never corrupts the existing file. Failures
    are logged, never raised.
    """
    temp_path = f"{state_path}.tmp"
    try:
        state["updated_at"] = arrow.utcnow().isoformat()
        with open(temp_path, "w", encoding="utf-8") as f:
            json.dump(state, f, ensure_ascii=False, indent=2, sort_keys=True)
        os.replace(temp_path, state_path)
        logging.info(f"💾 State saved to {state_path}")
    except Exception as e:
        logging.error(f"❌ Failed to save state file {state_path}: {e}")
def remember_posted_tweet(state, candidate, bsky_uri=None):
    """Record *candidate* in persistent state (plus the Bluesky URI index).

    Keyed by canonical tweet URL when available, otherwise by a
    text+media fingerprint so URL-less tweets still deduplicate.
    """
    tweet = candidate["tweet"]
    canonical_tweet_url = candidate.get("canonical_tweet_url")
    state_key = canonical_tweet_url or f"textmedia:{candidate['text_media_key']}"
    state["posted_tweets"][state_key] = {
        "canonical_tweet_url": canonical_tweet_url,
        "normalized_text": candidate["normalized_text"],
        "raw_text": candidate["raw_text"],
        "media_fingerprint": candidate["media_fingerprint"],
        "text_media_key": candidate["text_media_key"],
        "canonical_non_x_urls": sorted(candidate["canonical_non_x_urls"]),
        "ordered_non_x_urls": candidate.get("ordered_non_x_urls", []),
        "bsky_uri": bsky_uri,
        "tweet_created_on": tweet.created_on,
        "tweet_url": tweet.tweet_url,
        "posted_at": arrow.utcnow().isoformat(),
    }
    if bsky_uri:
        state["posted_by_bsky_uri"][bsky_uri] = state_key
def candidate_matches_state(candidate, state):
    """Decide whether *candidate* was already posted per the saved state.

    Returns (matched, reason). Signals are checked strongest-first:
    canonical tweet URL, then text+media fingerprint, then normalized text.
    """
    posted = state.get("posted_tweets", {})
    canonical_tweet_url = candidate["canonical_tweet_url"]
    if canonical_tweet_url and canonical_tweet_url in posted:
        return True, "state:tweet_url"
    records = list(posted.values())
    if any(rec.get("text_media_key") == candidate["text_media_key"] for rec in records):
        return True, "state:text_media_fingerprint"
    if any(rec.get("normalized_text") == candidate["normalized_text"] for rec in records):
        return True, "state:normalized_text"
    return False, None
def prune_state(state, max_entries=5000):
    """Cap posted_tweets at *max_entries*, keeping the most recent entries.

    Entries without posted_at sort as oldest and are dropped first. The
    posted_by_bsky_uri index is pruned to match. Mutates and returns state.
    """
    posted_tweets = state.get("posted_tweets", {})
    if len(posted_tweets) <= max_entries:
        return state
    ranked = sorted(
        posted_tweets.items(),
        key=lambda kv: kv[1].get("posted_at") or "",
        reverse=True,
    )
    keep_keys = {key for key, _ in ranked[:max_entries]}
    state["posted_tweets"] = {
        key: record for key, record in posted_tweets.items() if key in keep_keys
    }
    state["posted_by_bsky_uri"] = {
        uri: key
        for uri, key in state.get("posted_by_bsky_uri", {}).items()
        if key in keep_keys
    }
    return state
def get_recent_bsky_posts(client, handle, limit=30):
    """Fetch the account's recent Bluesky posts as dedupe-comparison records.

    Reposts (item.reason set) and replies are skipped. Each record carries
    the raw text, its normalized form, the canonical non-X URLs gathered
    from both the text and the facets, the media fingerprint, and the
    combined text+media key. All failures are logged and swallowed so the
    caller always gets a (possibly empty) list.
    """
    recent_posts = []
    try:
        timeline = client.get_author_feed(handle, limit=limit)
        for item in timeline.feed:
            try:
                if item.reason is not None:
                    continue  # repost
                record = item.post.record
                if getattr(record, "reply", None) is not None:
                    continue  # reply
                text = getattr(record, "text", "") or ""
                normalized_text = normalize_post_text(text)
                candidate_urls = extract_non_x_urls_from_text(text) + extract_urls_from_facets(record)
                canonical_non_x_urls = {
                    canonical
                    for canonical in (
                        canonicalize_url(url)
                        for url in candidate_urls
                        if not is_x_or_twitter_domain(url)
                    )
                    if canonical
                }
                media_fingerprint = build_bsky_media_fingerprint(item.post)
                recent_posts.append({
                    "uri": getattr(item.post, "uri", None),
                    "text": text,
                    "normalized_text": normalized_text,
                    "canonical_non_x_urls": canonical_non_x_urls,
                    "media_fingerprint": media_fingerprint,
                    "text_media_key": build_text_media_key(normalized_text, media_fingerprint),
                    "created_at": getattr(record, "created_at", None),
                })
            except Exception as e:
                logging.debug(f"Skipping one Bluesky feed item during dedupe fetch: {e}")
    except Exception as e:
        logging.warning(f"⚠️ Could not fetch recent Bluesky posts for duplicate detection: {e}")
    return recent_posts
def make_rich(content):
text_builder = client_utils.TextBuilder()
content = repair_broken_urls(content.strip())
content = repair_broken_mentions(content)
content = strip_line_edge_whitespace(content)
content = remove_trailing_ellipsis_line(content)
content = clean_post_text(content)
lines = content.splitlines()
for line_idx, line in enumerate(lines):
@@ -985,10 +943,7 @@ def make_rich(content):
def build_dynamic_alt(raw_text):
dynamic_alt = repair_broken_urls(raw_text)
dynamic_alt = repair_broken_mentions(dynamic_alt)
dynamic_alt = strip_line_edge_whitespace(dynamic_alt)
dynamic_alt = remove_trailing_ellipsis_line(dynamic_alt)
dynamic_alt = clean_post_text(raw_text)
dynamic_alt = dynamic_alt.replace("\n", " ").strip()
dynamic_alt = re.sub(r"https?://\S+", "", dynamic_alt).strip()
@@ -1469,33 +1424,38 @@ def sync_feeds(args):
logging.info(f"⏭️ Skipping old tweet from {tweet_time}")
continue
prepared_text = prepare_post_text(tweet.text)
normalized_text = normalize_post_text(prepared_text)
full_clean_text = clean_post_text(tweet.text)
normalized_text = normalize_post_text(full_clean_text)
if not normalized_text:
logging.info(f"⏭️ Skipping empty/blank tweet from {tweet_time}")
continue
ordered_non_x_urls = extract_ordered_non_x_urls(full_clean_text)
canonical_non_x_urls = set(ordered_non_x_urls)
primary_non_x_url = ordered_non_x_urls[0] if ordered_non_x_urls else None
raw_text = prepare_post_text_for_bsky(full_clean_text, keep_url=primary_non_x_url)
media_fingerprint = build_media_fingerprint(tweet)
text_media_key = build_text_media_key(normalized_text, media_fingerprint)
ordered_non_x_urls = extract_ordered_non_x_urls(prepared_text)
canonical_non_x_urls = set(ordered_non_x_urls)
has_video = any(getattr(m, "type", None) == "video" for m in (tweet.media or []))
has_photo = any(getattr(m, "type", None) == "photo" for m in (tweet.media or []))
candidate_tweets.append({
"tweet": tweet,
"tweet_time": tweet_time,
"raw_text": prepared_text,
"raw_text": raw_text,
"full_clean_text": full_clean_text,
"normalized_text": normalized_text,
"media_fingerprint": media_fingerprint,
"text_media_key": text_media_key,
"canonical_tweet_url": canonicalize_tweet_url(tweet.tweet_url),
"canonical_non_x_urls": canonical_non_x_urls,
"ordered_non_x_urls": ordered_non_x_urls,
"looks_like_title_plus_url": looks_like_title_plus_url_post(prepared_text),
"primary_non_x_url": primary_non_x_url,
"looks_like_title_plus_url": looks_like_title_plus_url_post(full_clean_text),
"has_video": has_video,
"has_photo": has_photo,
})
@@ -1550,11 +1510,12 @@ def sync_feeds(args):
tweet = candidate["tweet"]
tweet_time = candidate["tweet_time"]
raw_text = candidate["raw_text"]
full_clean_text = candidate["full_clean_text"]
logging.info(f"📝 Posting missing tweet from {tweet_time} to Bluesky...")
rich_text = make_rich(raw_text)
dynamic_alt = build_dynamic_alt(raw_text)
dynamic_alt = build_dynamic_alt(full_clean_text)
image_embeds = []
video_embed = None
@@ -1563,9 +1524,6 @@ def sync_feeds(args):
has_video = candidate.get("has_video", False)
# --- VIDEO-FIRST POLICY ---
# If the tweet contains video, try video first and do not degrade to photos
# from the same tweet if video processing/upload fails.
if has_video:
video_media = next((m for m in (tweet.media or []) if getattr(m, "type", None) == "video"), None)
@@ -1598,7 +1556,6 @@ def sync_feeds(args):
if os.path.exists(temp_video_path):
os.remove(temp_video_path)
# Important: if tweet had video, do NOT upload photos as fallback.
if not video_embed:
logging.warning(
"⚠️ Tweet contains video, but video could not be posted. "
@@ -1606,7 +1563,6 @@ def sync_feeds(args):
)
else:
# Photo-only tweets can post images normally.
if tweet.media:
for media in tweet.media:
if media.type == "photo":
@@ -1621,19 +1577,15 @@ def sync_feeds(args):
else:
media_upload_failures.append(f"photo:{media.media_url_https}")
# If nothing media-based is available, optionally degrade to external card / text-only
if not video_embed and not image_embeds:
candidate_url = None
if candidate.get("ordered_non_x_urls"):
candidate_url = candidate["ordered_non_x_urls"][0]
candidate_url = candidate.get("primary_non_x_url")
if candidate_url:
if candidate.get("looks_like_title_plus_url"):
logging.info(f"🔗 Detected title+URL post style. Using URL for external card: {candidate_url}")
else:
logging.info(f"🔗 Text-only post with non-X URL. Using first URL for external card: {candidate_url}")
logging.info(f"🔗 Using first non-X URL for external card: {candidate_url}")
if candidate_url:
external_embed = build_external_link_embed(
candidate_url,
bsky_client,