fix(sync): preserve exact original tweet text, visible links, and hashtags when post fits Bluesky
This commit is contained in:
@@ -297,8 +297,9 @@ def is_x_or_twitter_domain(url):
|
||||
def extract_urls_from_text(text):
|
||||
if not text:
|
||||
return []
|
||||
|
||||
repaired = repair_broken_urls(text)
|
||||
return re.findall(r"https?://[^\s]+", repaired)
|
||||
return re.findall(r"https?://[^\s#]+", repaired)
|
||||
|
||||
|
||||
def extract_non_x_urls_from_text(text):
|
||||
@@ -326,6 +327,14 @@ def extract_ordered_non_x_urls(text):
|
||||
return ordered
|
||||
|
||||
|
||||
def extract_first_visible_non_x_url(text):
|
||||
for url in extract_non_x_urls_from_text(text or ""):
|
||||
canonical = canonicalize_url(url)
|
||||
if canonical:
|
||||
return canonical
|
||||
return None
|
||||
|
||||
|
||||
def remove_url_from_visible_text(text, url_to_remove):
|
||||
if not text or not url_to_remove:
|
||||
return text
|
||||
@@ -339,9 +348,12 @@ def remove_url_from_visible_text(text, url_to_remove):
|
||||
new_line = line
|
||||
|
||||
for url in line_urls:
|
||||
if canonicalize_url(strip_trailing_url_punctuation(url)) == canonical_target:
|
||||
new_line = new_line.replace(url, "").strip()
|
||||
cleaned_candidate = canonicalize_url(strip_trailing_url_punctuation(url))
|
||||
if cleaned_candidate == canonical_target:
|
||||
pattern = re.escape(url)
|
||||
new_line = re.sub(pattern, "", new_line)
|
||||
|
||||
new_line = re.sub(r"[ \t]+", " ", new_line).strip()
|
||||
cleaned_lines.append(new_line)
|
||||
|
||||
result = "\n".join(cleaned_lines)
|
||||
@@ -368,64 +380,6 @@ def looks_like_title_plus_url_post(text):
|
||||
return len(urls_in_last_line) == 1 and len(total_urls) == 1 and last_line.startswith(("http://", "https://"))
|
||||
|
||||
|
||||
def find_tail_preservation_start(text, primary_non_x_url):
|
||||
if not text or not primary_non_x_url:
|
||||
return None
|
||||
|
||||
url_pos = text.find(primary_non_x_url)
|
||||
if url_pos == -1:
|
||||
return None
|
||||
|
||||
hashtag_match = re.search(r"\s#[^\s#]+", text[url_pos:])
|
||||
has_hashtag_after_url = hashtag_match is not None
|
||||
|
||||
candidates = [url_pos]
|
||||
|
||||
# Prefer clause boundaries before the URL.
|
||||
clause_patterns = [
|
||||
r"\.\s+",
|
||||
r":\s+",
|
||||
r";\s+",
|
||||
r"!\s+",
|
||||
r"\?\s+",
|
||||
r",\s+",
|
||||
]
|
||||
|
||||
before = text[:url_pos]
|
||||
for pattern in clause_patterns:
|
||||
for match in re.finditer(pattern, before):
|
||||
candidates.append(match.end())
|
||||
|
||||
# Prefer previous line break if present.
|
||||
last_newline = before.rfind("\n")
|
||||
if last_newline != -1:
|
||||
candidates.append(last_newline + 1)
|
||||
|
||||
# If there are hashtags after the URL, preserve a more generous block before it.
|
||||
if has_hashtag_after_url:
|
||||
generous_start = max(0, url_pos - 120)
|
||||
while generous_start > 0 and text[generous_start] not in {" ", "\n"}:
|
||||
generous_start -= 1
|
||||
candidates.append(generous_start)
|
||||
|
||||
# Choose the closest reasonable boundary before the URL, but not too close.
|
||||
reasonable_candidates = [
|
||||
c for c in candidates
|
||||
if 0 <= c < url_pos and (url_pos - c) <= 180
|
||||
]
|
||||
|
||||
if reasonable_candidates:
|
||||
start = min(reasonable_candidates, key=lambda c: (url_pos - c))
|
||||
# If the nearest boundary is too close, fall back to a slightly earlier one.
|
||||
if url_pos - start < 35:
|
||||
farther = [c for c in reasonable_candidates if url_pos - c >= 35]
|
||||
if farther:
|
||||
start = min(farther, key=lambda c: (url_pos - c))
|
||||
return start
|
||||
|
||||
return url_pos
|
||||
|
||||
|
||||
def looks_like_url_and_tag_tail(text, primary_non_x_url=None):
|
||||
if not text or not primary_non_x_url:
|
||||
return False
|
||||
@@ -445,6 +399,59 @@ def looks_like_url_and_tag_tail(text, primary_non_x_url=None):
|
||||
return False
|
||||
|
||||
|
||||
def find_tail_preservation_start(text, primary_non_x_url):
|
||||
if not text or not primary_non_x_url:
|
||||
return None
|
||||
|
||||
url_pos = text.find(primary_non_x_url)
|
||||
if url_pos == -1:
|
||||
return None
|
||||
|
||||
hashtag_match = re.search(r"\s#[^\s#]+", text[url_pos:])
|
||||
has_hashtag_after_url = hashtag_match is not None
|
||||
|
||||
candidates = [url_pos]
|
||||
|
||||
clause_patterns = [
|
||||
r"\.\s+",
|
||||
r":\s+",
|
||||
r";\s+",
|
||||
r"!\s+",
|
||||
r"\?\s+",
|
||||
r",\s+",
|
||||
]
|
||||
|
||||
before = text[:url_pos]
|
||||
for pattern in clause_patterns:
|
||||
for match in re.finditer(pattern, before):
|
||||
candidates.append(match.end())
|
||||
|
||||
last_newline = before.rfind("\n")
|
||||
if last_newline != -1:
|
||||
candidates.append(last_newline + 1)
|
||||
|
||||
if has_hashtag_after_url:
|
||||
generous_start = max(0, url_pos - 120)
|
||||
while generous_start > 0 and text[generous_start] not in {" ", "\n"}:
|
||||
generous_start -= 1
|
||||
candidates.append(generous_start)
|
||||
|
||||
reasonable_candidates = [
|
||||
c for c in candidates
|
||||
if 0 <= c < url_pos and (url_pos - c) <= 180
|
||||
]
|
||||
|
||||
if reasonable_candidates:
|
||||
start = min(reasonable_candidates, key=lambda c: (url_pos - c))
|
||||
if url_pos - start < 35:
|
||||
farther = [c for c in reasonable_candidates if url_pos - c >= 35]
|
||||
if farther:
|
||||
start = min(farther, key=lambda c: (url_pos - c))
|
||||
return start
|
||||
|
||||
return url_pos
|
||||
|
||||
|
||||
def truncate_text_safely(text, max_length=BSKY_TEXT_MAX_LENGTH):
|
||||
if len(text) <= max_length:
|
||||
return text
|
||||
@@ -469,14 +476,13 @@ def truncate_text_preserving_tail(text, tail_start, max_length=BSKY_TEXT_MAX_LEN
|
||||
|
||||
reserve = len(tail) + 4
|
||||
if reserve >= max_length:
|
||||
# Tail too large; keep the tail itself and trim from its front carefully.
|
||||
shortened_tail = tail
|
||||
if len(shortened_tail) > max_length - 3:
|
||||
shortened_tail = shortened_tail[-(max_length - 3):]
|
||||
first_space = shortened_tail.find(" ")
|
||||
if first_space > 0 and first_space < 40:
|
||||
shortened_tail = shortened_tail[first_space + 1:]
|
||||
return "..." + shortened_tail[-(max_length - 3):]
|
||||
shortened_tail = tail[-(max_length - 3):].strip()
|
||||
|
||||
first_space = shortened_tail.find(" ")
|
||||
if 0 <= first_space <= 30:
|
||||
shortened_tail = shortened_tail[first_space + 1:].strip()
|
||||
|
||||
return f"...{shortened_tail}"
|
||||
|
||||
available_prefix = max_length - reserve
|
||||
prefix = text[:tail_start].rstrip()
|
||||
@@ -485,9 +491,12 @@ def truncate_text_preserving_tail(text, tail_start, max_length=BSKY_TEXT_MAX_LEN
|
||||
prefix = prefix[:available_prefix].rstrip()
|
||||
last_space = prefix.rfind(" ")
|
||||
if last_space > 20:
|
||||
prefix = prefix[:last_space]
|
||||
prefix = prefix[:last_space].rstrip()
|
||||
|
||||
final_text = f"{prefix}... {tail}".strip()
|
||||
final_text = re.sub(r"[ \t]+", " ", final_text)
|
||||
final_text = re.sub(r"\n{3,}", "\n\n", final_text).strip()
|
||||
|
||||
if len(final_text) <= max_length:
|
||||
return final_text
|
||||
|
||||
@@ -495,11 +504,13 @@ def truncate_text_preserving_tail(text, tail_start, max_length=BSKY_TEXT_MAX_LEN
|
||||
|
||||
|
||||
def choose_final_visible_text(full_clean_text, primary_non_x_url=None, prefer_full_text_without_url=True):
|
||||
text = (full_clean_text or "").strip()
|
||||
text = clean_post_text(full_clean_text or "")
|
||||
if not text:
|
||||
return text
|
||||
|
||||
# Golden rule: preserve exact original cleaned tweet text if it fits.
|
||||
if len(text) <= BSKY_TEXT_MAX_LENGTH:
|
||||
logging.info("🟢 Original cleaned tweet text fits in Bluesky. Preserving exact text.")
|
||||
return text
|
||||
|
||||
if primary_non_x_url:
|
||||
@@ -517,7 +528,9 @@ def choose_final_visible_text(full_clean_text, primary_non_x_url=None, prefer_fu
|
||||
logging.info("🔗 Keeping full visible text by removing long external URL from body and using external card")
|
||||
return text_without_url
|
||||
|
||||
return truncate_text_safely(text, BSKY_TEXT_MAX_LENGTH)
|
||||
truncated = truncate_text_safely(text, BSKY_TEXT_MAX_LENGTH)
|
||||
logging.info("✂️ Falling back to safe truncation for visible Bluesky text")
|
||||
return truncated
|
||||
|
||||
|
||||
def normalize_post_text(text):
|
||||
@@ -1203,21 +1216,31 @@ def make_rich(content):
|
||||
text_builder.text(" ")
|
||||
continue
|
||||
|
||||
if word.startswith("http://") or word.startswith("https://"):
|
||||
if word.startswith("http://"):
|
||||
word = word.replace("http://", "https://", 1)
|
||||
cleaned_word = strip_trailing_url_punctuation(word)
|
||||
|
||||
word = strip_trailing_url_punctuation(word)
|
||||
clean_url_value = clean_url(word)
|
||||
if cleaned_word.startswith("http://") or cleaned_word.startswith("https://"):
|
||||
if cleaned_word.startswith("http://"):
|
||||
cleaned_word = cleaned_word.replace("http://", "https://", 1)
|
||||
|
||||
clean_url_value = clean_url(cleaned_word)
|
||||
|
||||
if clean_url_value and is_valid_url(clean_url_value):
|
||||
text_builder.link(clean_url_value, clean_url_value)
|
||||
trailing = word[len(cleaned_word):]
|
||||
if trailing:
|
||||
text_builder.text(trailing)
|
||||
else:
|
||||
text_builder.text(word)
|
||||
|
||||
elif word.startswith("#"):
|
||||
clean_tag = word[1:].rstrip(".,;:!?)'\"…")
|
||||
text_builder.tag(word, clean_tag)
|
||||
elif cleaned_word.startswith("#") and len(cleaned_word) > 1:
|
||||
clean_tag = cleaned_word[1:].rstrip(".,;:!?)'\"…")
|
||||
if clean_tag:
|
||||
text_builder.tag(cleaned_word, clean_tag)
|
||||
trailing = word[len(cleaned_word):]
|
||||
if trailing:
|
||||
text_builder.text(trailing)
|
||||
else:
|
||||
text_builder.text(word)
|
||||
|
||||
else:
|
||||
text_builder.text(word)
|
||||
@@ -1722,7 +1745,10 @@ def sync_feeds(args):
|
||||
|
||||
ordered_non_x_urls = extract_ordered_non_x_urls(full_clean_text)
|
||||
canonical_non_x_urls = set(ordered_non_x_urls)
|
||||
primary_non_x_url = ordered_non_x_urls[0] if ordered_non_x_urls else None
|
||||
|
||||
primary_non_x_url = extract_first_visible_non_x_url(full_clean_text)
|
||||
if not primary_non_x_url and ordered_non_x_urls:
|
||||
primary_non_x_url = ordered_non_x_urls[0]
|
||||
|
||||
has_video = any(getattr(m, "type", None) == "video" for m in (tweet.media or []))
|
||||
has_photo = any(getattr(m, "type", None) == "photo" for m in (tweet.media or []))
|
||||
|
||||
Reference in New Issue
Block a user