From f4fb1a9af1ab8f543575a94500118be2f27a0f29 Mon Sep 17 00:00:00 2001 From: Guillem Hernandez Sola Date: Fri, 10 Apr 2026 19:51:20 +0000 Subject: [PATCH] Fixes for 429 - persist post and thumbnail cooldowns to shared JSON state - check global cooldown before login and before posting - stop parallel workers from repeatedly hitting createRecord after first 429 - add clear post start/success/failure/global-cooldown logs - reduce traceback noise for expected rate-limit failures --- rss2bsky.py | 310 ++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 213 insertions(+), 97 deletions(-) diff --git a/rss2bsky.py b/rss2bsky.py index 0cf6356..5b07a61 100644 --- a/rss2bsky.py +++ b/rss2bsky.py @@ -23,8 +23,11 @@ except ImportError: Image = None PIL_AVAILABLE = False + # --- Configuration --- STATE_PATH = "rss2bsky_state.json" +COOLDOWN_STATE_PATH = "rss2bsky_cooldowns.json" + DEDUPE_BSKY_LIMIT = 30 BSKY_TEXT_MAX_LENGTH = 275 @@ -41,11 +44,9 @@ BSKY_BLOB_TRANSIENT_ERROR_DELAY = 15 HTTP_TIMEOUT = 20 POST_RETRY_DELAY_SECONDS = 2 -# Thumbnail upload cooldown state -THUMB_UPLOAD_COOLDOWN_UNTIL = 0 +DEFAULT_POST_COOLDOWN_SECONDS = 3600 +DEFAULT_THUMB_COOLDOWN_SECONDS = 3600 -# Post creation cooldown state -POST_CREATION_COOLDOWN_UNTIL = 0 # --- Logging --- logging.basicConfig( @@ -58,6 +59,92 @@ if not PIL_AVAILABLE: logging.warning("Pillow is not installed. External card thumbnail compression is disabled.") +# --- Cooldown persistence --- +def default_cooldown_state(): + return { + "version": 1, + "post_creation_cooldown_until": 0, + "thumb_upload_cooldown_until": 0, + "updated_at": None, + } + + +def load_cooldown_state(path=COOLDOWN_STATE_PATH): + if not os.path.exists(path): + return default_cooldown_state() + + try: + with open(path, "r", encoding="utf-8") as f: + state = json.load(f) + + if not isinstance(state, dict): + return default_cooldown_state() + + state.setdefault("version", 1) + state.setdefault("post_creation_cooldown_until", 0) + state.setdefault("thumb_upload_cooldown_until", 0) + state.setdefault("updated_at", None) + return state + except Exception as e: + logging.warning(f"Could not load cooldown state {path}: {e}") + return default_cooldown_state() + + +def save_cooldown_state(state, path=COOLDOWN_STATE_PATH): + try: + state["updated_at"] = arrow.utcnow().isoformat() + temp_path = f"{path}.tmp" + + with open(temp_path, "w", encoding="utf-8") as f: + json.dump(state, f, ensure_ascii=False, indent=2, sort_keys=True) + + os.replace(temp_path, path) + except Exception as e: + logging.warning(f"Could not save cooldown state {path}: {e}") + + +def get_global_post_cooldown_until(): + state = load_cooldown_state() + return int(state.get("post_creation_cooldown_until", 0) or 0) + + +def get_global_thumb_cooldown_until(): + state = load_cooldown_state() + return int(state.get("thumb_upload_cooldown_until", 0) or 0) + + +def is_global_post_cooldown_active(): + return int(time.time()) < get_global_post_cooldown_until() + + +def is_global_thumb_cooldown_active(): + return int(time.time()) < get_global_thumb_cooldown_until() + + +def set_global_post_cooldown_until(reset_ts): + state = load_cooldown_state() + current = int(state.get("post_creation_cooldown_until", 0) or 0) + + if reset_ts > current: + state["post_creation_cooldown_until"] = int(reset_ts) + save_cooldown_state(state) + + final_ts = int(state.get("post_creation_cooldown_until", 0) or 0) + return final_ts + + +def set_global_thumb_cooldown_until(reset_ts): + state = load_cooldown_state() + current = int(state.get("thumb_upload_cooldown_until", 0) or 0) + + if reset_ts > current: + state["thumb_upload_cooldown_until"] = int(reset_ts) + save_cooldown_state(state) + + final_ts = int(state.get("thumb_upload_cooldown_until", 0) or 0) + return final_ts + + # --- Encoding / text helpers --- def fix_encoding(text): try: @@ -110,7 +197,7 @@ def normalize_text(text): def process_title(title): try: if is_html(title): - title_text = BeautifulSoup(title, "html.parser", from_encoding="utf-8").get_text().strip() + title_text = BeautifulSoup(title, "html.parser").get_text().strip() else: title_text = (title or "").strip() @@ -135,17 +222,6 @@ def truncate_text_safely(text, max_length=BSKY_TEXT_MAX_LENGTH): def build_post_text_variants(title_text, link): - """ - Build text variants from best to worst. - - Preferred: - 1. Full title + blank line + real URL - Fallbacks: - 2. Full title only - - Intentionally avoid: - - truncated title + URL - """ title_text = clean_whitespace(title_text) link = canonicalize_url(link) or link or "" @@ -455,23 +531,7 @@ def make_rich(content): return text_builder -# --- Rate limit helpers --- -def get_rate_limit_wait_seconds(error_obj, default_delay): - try: - headers = getattr(error_obj, "headers", None) - if headers: - reset_value = headers.get("ratelimit-reset") or headers.get("RateLimit-Reset") - if reset_value: - now_ts = int(time.time()) - reset_ts = int(reset_value) - wait_seconds = max(reset_ts - now_ts + 1, default_delay) - return min(wait_seconds, BSKY_BLOB_UPLOAD_MAX_DELAY) - except Exception: - pass - - return default_delay - - +# --- Error / cooldown helpers --- def get_rate_limit_reset_timestamp(error_obj): try: headers = getattr(error_obj, "headers", None) @@ -481,12 +541,34 @@ def get_rate_limit_reset_timestamp(error_obj): return int(reset_value) except Exception: pass + + try: + response = getattr(error_obj, "response", None) + headers = getattr(response, "headers", None) + if headers: + reset_value = headers.get("ratelimit-reset") or headers.get("RateLimit-Reset") + if reset_value: + return int(reset_value) + except Exception: + pass + + text = repr(error_obj) + match = re.search(r"'ratelimit-reset': '(\d+)'", text) + if match: + return int(match.group(1)) + return None def is_rate_limited_error(error_obj): error_text = str(error_obj) - return "429" in error_text or "RateLimitExceeded" in error_text + repr_text = repr(error_obj) + return ( + "429" in error_text or + "429" in repr_text or + "RateLimitExceeded" in error_text or + "RateLimitExceeded" in repr_text + ) def is_transient_blob_error(error_obj): @@ -505,53 +587,52 @@ def is_transient_blob_error(error_obj): return any(signal in error_text for signal in transient_signals) -# --- Post cooldown helpers --- +def is_timeout_error(error_obj): + text = repr(error_obj) + return any(signal in text for signal in [ + "InvokeTimeoutError", + "ReadTimeout", + "WriteTimeout", + "TimeoutException", + ]) + + def activate_post_creation_cooldown_from_error(error_obj): - global POST_CREATION_COOLDOWN_UNTIL - reset_ts = get_rate_limit_reset_timestamp(error_obj) - if reset_ts: - if reset_ts > POST_CREATION_COOLDOWN_UNTIL: - POST_CREATION_COOLDOWN_UNTIL = reset_ts - logging.error( - f"=== BSKY POST STOPPED: RATE LIMITED === Posting disabled until " - f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(reset_ts))}" - ) - else: - fallback_reset = int(time.time()) + 3600 - if fallback_reset > POST_CREATION_COOLDOWN_UNTIL: - POST_CREATION_COOLDOWN_UNTIL = fallback_reset - logging.error("=== BSKY POST STOPPED: RATE LIMITED === Posting disabled for 1 hour.") + if not reset_ts: + reset_ts = int(time.time()) + DEFAULT_POST_COOLDOWN_SECONDS + + final_ts = set_global_post_cooldown_until(reset_ts) + logging.error( + f"=== BSKY POST STOPPED: RATE LIMITED === Posting disabled until " + f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(final_ts))}" + ) + return final_ts -def is_post_creation_cooldown_active(): - return int(time.time()) < POST_CREATION_COOLDOWN_UNTIL - - -# --- Thumbnail cooldown helpers --- def activate_thumb_upload_cooldown_from_error(error_obj): - global THUMB_UPLOAD_COOLDOWN_UNTIL - reset_ts = get_rate_limit_reset_timestamp(error_obj) - if reset_ts: - if reset_ts > THUMB_UPLOAD_COOLDOWN_UNTIL: - THUMB_UPLOAD_COOLDOWN_UNTIL = reset_ts - logging.warning( - f"Thumbnail uploads disabled until rate-limit reset at " - f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(reset_ts))}." - ) - else: - fallback_reset = int(time.time()) + 3600 - if fallback_reset > THUMB_UPLOAD_COOLDOWN_UNTIL: - THUMB_UPLOAD_COOLDOWN_UNTIL = fallback_reset - logging.warning("Thumbnail uploads disabled temporarily for 1 hour due to rate limiting.") + if not reset_ts: + reset_ts = int(time.time()) + DEFAULT_THUMB_COOLDOWN_SECONDS - -def is_thumb_upload_cooldown_active(): - return int(time.time()) < THUMB_UPLOAD_COOLDOWN_UNTIL + final_ts = set_global_thumb_cooldown_until(reset_ts) + logging.warning( + f"Thumbnail uploads disabled until " + f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(final_ts))}." + ) + return final_ts # --- Blob / image upload helpers --- +def get_rate_limit_wait_seconds(error_obj, default_delay): + reset_ts = get_rate_limit_reset_timestamp(error_obj) + if reset_ts: + now_ts = int(time.time()) + wait_seconds = max(reset_ts - now_ts + 1, default_delay) + return min(wait_seconds, BSKY_BLOB_UPLOAD_MAX_DELAY) + return default_delay + + def upload_blob_with_retry(client, binary_data, media_label="media", optional=False, cooldown_on_rate_limit=False): last_exception = None transient_attempts = 0 @@ -563,18 +644,11 @@ def upload_blob_with_retry(client, binary_data, media_label="media", optional=Fa except Exception as e: last_exception = e - is_rate_limited = is_rate_limited_error(e) - if is_rate_limited: + if is_rate_limited_error(e): if cooldown_on_rate_limit: activate_thumb_upload_cooldown_from_error(e) - backoff_delay = min( - BSKY_BLOB_UPLOAD_BASE_DELAY * (2 ** (attempt - 1)), - BSKY_BLOB_UPLOAD_MAX_DELAY - ) - wait_seconds = get_rate_limit_wait_seconds(e, backoff_delay) - if optional and cooldown_on_rate_limit: logging.warning( f"Optional blob upload rate-limited for {media_label}. " @@ -582,6 +656,12 @@ def upload_blob_with_retry(client, binary_data, media_label="media", optional=Fa ) return None + backoff_delay = min( + BSKY_BLOB_UPLOAD_BASE_DELAY * (2 ** (attempt - 1)), + BSKY_BLOB_UPLOAD_MAX_DELAY + ) + wait_seconds = get_rate_limit_wait_seconds(e, backoff_delay) + if attempt < BSKY_BLOB_UPLOAD_MAX_RETRIES: logging.warning( f"Blob upload rate-limited for {media_label}. " @@ -589,9 +669,9 @@ def upload_blob_with_retry(client, binary_data, media_label="media", optional=Fa ) time.sleep(wait_seconds) continue - else: - logging.warning(f"Exhausted blob upload retries for {media_label}: {repr(e)}") - break + + logging.warning(f"Exhausted blob upload retries for {media_label}: {repr(e)}") + break if is_transient_blob_error(e) and transient_attempts < BSKY_BLOB_TRANSIENT_ERROR_RETRIES: transient_attempts += 1 @@ -657,8 +737,8 @@ def compress_external_thumb_to_limit(image_bytes, max_bytes=EXTERNAL_THUMB_MAX_B def get_external_thumb_blob_from_url(image_url, client, http_client): - if is_thumb_upload_cooldown_active(): - reset_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(THUMB_UPLOAD_COOLDOWN_UNTIL)) + if is_global_thumb_cooldown_active(): + reset_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(get_global_thumb_cooldown_until())) logging.info(f"Skipping external thumbnail upload due to active cooldown until {reset_str}") return None @@ -813,16 +893,18 @@ def is_probable_length_error(exc): def try_send_post_with_variants(client, text_variants, embed, post_lang): - global POST_CREATION_COOLDOWN_UNTIL - - if is_post_creation_cooldown_active(): - reset_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(POST_CREATION_COOLDOWN_UNTIL)) - raise RuntimeError(f"Posting skipped because post creation cooldown is active until {reset_str}") + if is_global_post_cooldown_active(): + reset_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(get_global_post_cooldown_until())) + raise RuntimeError(f"Posting skipped because global post cooldown is active until {reset_str}") last_exception = None for idx, variant in enumerate(text_variants, start=1): try: + if is_global_post_cooldown_active(): + reset_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(get_global_post_cooldown_until())) + raise RuntimeError(f"Posting skipped because global post cooldown is active until {reset_str}") + logging.info(f"Trying post text variant {idx}/{len(text_variants)} (length={len(variant)})") rich_text = make_rich(variant) result = client.send_post(text=rich_text, embed=embed, langs=[post_lang]) @@ -836,6 +918,9 @@ def try_send_post_with_variants(client, text_variants, embed, post_lang): activate_post_creation_cooldown_from_error(e) raise + if is_timeout_error(e): + raise + if not is_probable_length_error(e): raise @@ -847,7 +932,7 @@ def try_send_post_with_variants(client, text_variants, embed, post_lang): # --- Main --- def main(): - parser = argparse.ArgumentParser(description="Post RSS to Bluesky with JSON state tracking.") + parser = argparse.ArgumentParser(description="Post RSS to Bluesky with shared cooldown tracking.") parser.add_argument("rss_feed", help="RSS feed URL") parser.add_argument("bsky_handle", help="Bluesky handle") parser.add_argument("bsky_username", help="Bluesky username") @@ -855,8 +940,13 @@ def main(): parser.add_argument("--service", default="https://bsky.social", help="Bluesky server URL") parser.add_argument("--lang", default="ca", help="Language code for the post") parser.add_argument("--state-path", default=STATE_PATH, help="Path to local JSON state file") + parser.add_argument("--cooldown-path", default=COOLDOWN_STATE_PATH, help="Path to shared cooldown JSON state file") args = parser.parse_args() + global STATE_PATH, COOLDOWN_STATE_PATH + STATE_PATH = args.state_path + COOLDOWN_STATE_PATH = args.cooldown_path + feed_url = args.rss_feed bsky_handle = args.bsky_handle bsky_username = args.bsky_username @@ -865,11 +955,21 @@ def main(): post_lang = args.lang state_path = args.state_path + if is_global_post_cooldown_active(): + reset_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(get_global_post_cooldown_until())) + logging.warning(f"=== BSKY POST SKIPPED: GLOBAL COOLDOWN === Active until {reset_str}") + return + client = Client(base_url=service_url) backoff = 60 while True: try: + if is_global_post_cooldown_active(): + reset_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(get_global_post_cooldown_until())) + logging.warning(f"=== BSKY POST SKIPPED: GLOBAL COOLDOWN === Active until {reset_str}") + return + logging.info(f"Attempting login to server: {service_url} with user: {bsky_username}") client.login(bsky_username, bsky_password) logging.info(f"Login successful for user: {bsky_username}") @@ -926,13 +1026,18 @@ def main(): logging.info("â„šī¸ Execution finished: no new entries to publish.") return + if is_global_post_cooldown_active(): + reset_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(get_global_post_cooldown_until())) + logging.warning(f"=== BSKY POST SKIPPED: GLOBAL COOLDOWN === Active until {reset_str}") + return + noves_entrades = 0 with httpx.Client() as http_client: for candidate in entries_to_post: - if is_post_creation_cooldown_active(): - reset_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(POST_CREATION_COOLDOWN_UNTIL)) - logging.error(f"=== BSKY POST STOPPED: RATE LIMITED === Skipping remaining entries until {reset_str}") + if is_global_post_cooldown_active(): + reset_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(get_global_post_cooldown_until())) + logging.error(f"=== BSKY POST STOPPED: GLOBAL COOLDOWN === Skipping remaining entries until {reset_str}") break title_text = candidate["title_text"] @@ -980,13 +1085,24 @@ def main(): time.sleep(POST_RETRY_DELAY_SECONDS) except Exception as e: - logging.exception(f"=== BSKY POST FAILED === {canonical_link or title_text}") - - if is_rate_limited_error(e) or is_post_creation_cooldown_active(): - reset_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(POST_CREATION_COOLDOWN_UNTIL)) + if is_rate_limited_error(e): + reset_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(get_global_post_cooldown_until())) + logging.error(f"=== BSKY POST FAILED === {canonical_link or title_text}") logging.error(f"=== BSKY POST STOPPED: RATE LIMITED === Ending publish loop until {reset_str}") break + if "global post cooldown is active" in str(e).lower(): + reset_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(get_global_post_cooldown_until())) + logging.warning(f"=== BSKY POST SKIPPED: GLOBAL COOLDOWN === {canonical_link or title_text}") + logging.warning(f"=== BSKY POST STOPPED: GLOBAL COOLDOWN === Ending publish loop until {reset_str}") + break + + if is_timeout_error(e): + logging.error(f"=== BSKY POST FAILED === {canonical_link or title_text} :: timeout") + break + + logging.exception(f"=== BSKY POST FAILED === {canonical_link or title_text}") + if noves_entrades > 0: logging.info(f"🎉 Execution finished: published {noves_entrades} new entries to Bluesky.") else: