Fixes for 429

- persist post and thumbnail cooldowns to shared JSON state
- check global cooldown before login and before posting
- stop parallel workers from repeatedly hitting createRecord after first 429
- add clear post start/success/failure/global-cooldown logs
- reduce traceback noise for expected rate-limit failures
This commit is contained in:
2026-04-10 19:51:20 +00:00
parent ba742965bd
commit f4fb1a9af1

View File

@@ -23,8 +23,11 @@ except ImportError:
Image = None Image = None
PIL_AVAILABLE = False PIL_AVAILABLE = False
# --- Configuration --- # --- Configuration ---
STATE_PATH = "rss2bsky_state.json" STATE_PATH = "rss2bsky_state.json"
COOLDOWN_STATE_PATH = "rss2bsky_cooldowns.json"
DEDUPE_BSKY_LIMIT = 30 DEDUPE_BSKY_LIMIT = 30
BSKY_TEXT_MAX_LENGTH = 275 BSKY_TEXT_MAX_LENGTH = 275
@@ -41,11 +44,9 @@ BSKY_BLOB_TRANSIENT_ERROR_DELAY = 15
HTTP_TIMEOUT = 20 HTTP_TIMEOUT = 20
POST_RETRY_DELAY_SECONDS = 2 POST_RETRY_DELAY_SECONDS = 2
# Thumbnail upload cooldown state DEFAULT_POST_COOLDOWN_SECONDS = 3600
THUMB_UPLOAD_COOLDOWN_UNTIL = 0 DEFAULT_THUMB_COOLDOWN_SECONDS = 3600
# Post creation cooldown state
POST_CREATION_COOLDOWN_UNTIL = 0
# --- Logging --- # --- Logging ---
logging.basicConfig( logging.basicConfig(
@@ -58,6 +59,92 @@ if not PIL_AVAILABLE:
logging.warning("Pillow is not installed. External card thumbnail compression is disabled.") logging.warning("Pillow is not installed. External card thumbnail compression is disabled.")
# --- Cooldown persistence ---
def default_cooldown_state():
return {
"version": 1,
"post_creation_cooldown_until": 0,
"thumb_upload_cooldown_until": 0,
"updated_at": None,
}
def load_cooldown_state(path=COOLDOWN_STATE_PATH):
if not os.path.exists(path):
return default_cooldown_state()
try:
with open(path, "r", encoding="utf-8") as f:
state = json.load(f)
if not isinstance(state, dict):
return default_cooldown_state()
state.setdefault("version", 1)
state.setdefault("post_creation_cooldown_until", 0)
state.setdefault("thumb_upload_cooldown_until", 0)
state.setdefault("updated_at", None)
return state
except Exception as e:
logging.warning(f"Could not load cooldown state {path}: {e}")
return default_cooldown_state()
def save_cooldown_state(state, path=COOLDOWN_STATE_PATH):
try:
state["updated_at"] = arrow.utcnow().isoformat()
temp_path = f"{path}.tmp"
with open(temp_path, "w", encoding="utf-8") as f:
json.dump(state, f, ensure_ascii=False, indent=2, sort_keys=True)
os.replace(temp_path, path)
except Exception as e:
logging.warning(f"Could not save cooldown state {path}: {e}")
def get_global_post_cooldown_until():
state = load_cooldown_state()
return int(state.get("post_creation_cooldown_until", 0) or 0)
def get_global_thumb_cooldown_until():
state = load_cooldown_state()
return int(state.get("thumb_upload_cooldown_until", 0) or 0)
def is_global_post_cooldown_active():
return int(time.time()) < get_global_post_cooldown_until()
def is_global_thumb_cooldown_active():
return int(time.time()) < get_global_thumb_cooldown_until()
def set_global_post_cooldown_until(reset_ts):
state = load_cooldown_state()
current = int(state.get("post_creation_cooldown_until", 0) or 0)
if reset_ts > current:
state["post_creation_cooldown_until"] = int(reset_ts)
save_cooldown_state(state)
final_ts = int(state.get("post_creation_cooldown_until", 0) or 0)
return final_ts
def set_global_thumb_cooldown_until(reset_ts):
state = load_cooldown_state()
current = int(state.get("thumb_upload_cooldown_until", 0) or 0)
if reset_ts > current:
state["thumb_upload_cooldown_until"] = int(reset_ts)
save_cooldown_state(state)
final_ts = int(state.get("thumb_upload_cooldown_until", 0) or 0)
return final_ts
# --- Encoding / text helpers --- # --- Encoding / text helpers ---
def fix_encoding(text): def fix_encoding(text):
try: try:
@@ -110,7 +197,7 @@ def normalize_text(text):
def process_title(title): def process_title(title):
try: try:
if is_html(title): if is_html(title):
title_text = BeautifulSoup(title, "html.parser", from_encoding="utf-8").get_text().strip() title_text = BeautifulSoup(title, "html.parser").get_text().strip()
else: else:
title_text = (title or "").strip() title_text = (title or "").strip()
@@ -135,17 +222,6 @@ def truncate_text_safely(text, max_length=BSKY_TEXT_MAX_LENGTH):
def build_post_text_variants(title_text, link): def build_post_text_variants(title_text, link):
"""
Build text variants from best to worst.
Preferred:
1. Full title + blank line + real URL
Fallbacks:
2. Full title only
Intentionally avoid:
- truncated title + URL
"""
title_text = clean_whitespace(title_text) title_text = clean_whitespace(title_text)
link = canonicalize_url(link) or link or "" link = canonicalize_url(link) or link or ""
@@ -455,23 +531,7 @@ def make_rich(content):
return text_builder return text_builder
# --- Rate limit helpers --- # --- Error / cooldown helpers ---
def get_rate_limit_wait_seconds(error_obj, default_delay):
try:
headers = getattr(error_obj, "headers", None)
if headers:
reset_value = headers.get("ratelimit-reset") or headers.get("RateLimit-Reset")
if reset_value:
now_ts = int(time.time())
reset_ts = int(reset_value)
wait_seconds = max(reset_ts - now_ts + 1, default_delay)
return min(wait_seconds, BSKY_BLOB_UPLOAD_MAX_DELAY)
except Exception:
pass
return default_delay
def get_rate_limit_reset_timestamp(error_obj): def get_rate_limit_reset_timestamp(error_obj):
try: try:
headers = getattr(error_obj, "headers", None) headers = getattr(error_obj, "headers", None)
@@ -481,12 +541,34 @@ def get_rate_limit_reset_timestamp(error_obj):
return int(reset_value) return int(reset_value)
except Exception: except Exception:
pass pass
try:
response = getattr(error_obj, "response", None)
headers = getattr(response, "headers", None)
if headers:
reset_value = headers.get("ratelimit-reset") or headers.get("RateLimit-Reset")
if reset_value:
return int(reset_value)
except Exception:
pass
text = repr(error_obj)
match = re.search(r"'ratelimit-reset': '(\d+)'", text)
if match:
return int(match.group(1))
return None return None
def is_rate_limited_error(error_obj): def is_rate_limited_error(error_obj):
error_text = str(error_obj) error_text = str(error_obj)
return "429" in error_text or "RateLimitExceeded" in error_text repr_text = repr(error_obj)
return (
"429" in error_text or
"429" in repr_text or
"RateLimitExceeded" in error_text or
"RateLimitExceeded" in repr_text
)
def is_transient_blob_error(error_obj): def is_transient_blob_error(error_obj):
@@ -505,53 +587,52 @@ def is_transient_blob_error(error_obj):
return any(signal in error_text for signal in transient_signals) return any(signal in error_text for signal in transient_signals)
# --- Post cooldown helpers --- def is_timeout_error(error_obj):
text = repr(error_obj)
return any(signal in text for signal in [
"InvokeTimeoutError",
"ReadTimeout",
"WriteTimeout",
"TimeoutException",
])
def activate_post_creation_cooldown_from_error(error_obj): def activate_post_creation_cooldown_from_error(error_obj):
global POST_CREATION_COOLDOWN_UNTIL
reset_ts = get_rate_limit_reset_timestamp(error_obj) reset_ts = get_rate_limit_reset_timestamp(error_obj)
if reset_ts: if not reset_ts:
if reset_ts > POST_CREATION_COOLDOWN_UNTIL: reset_ts = int(time.time()) + DEFAULT_POST_COOLDOWN_SECONDS
POST_CREATION_COOLDOWN_UNTIL = reset_ts
logging.error( final_ts = set_global_post_cooldown_until(reset_ts)
f"=== BSKY POST STOPPED: RATE LIMITED === Posting disabled until " logging.error(
f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(reset_ts))}" f"=== BSKY POST STOPPED: RATE LIMITED === Posting disabled until "
) f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(final_ts))}"
else: )
fallback_reset = int(time.time()) + 3600 return final_ts
if fallback_reset > POST_CREATION_COOLDOWN_UNTIL:
POST_CREATION_COOLDOWN_UNTIL = fallback_reset
logging.error("=== BSKY POST STOPPED: RATE LIMITED === Posting disabled for 1 hour.")
def is_post_creation_cooldown_active():
return int(time.time()) < POST_CREATION_COOLDOWN_UNTIL
# --- Thumbnail cooldown helpers ---
def activate_thumb_upload_cooldown_from_error(error_obj): def activate_thumb_upload_cooldown_from_error(error_obj):
global THUMB_UPLOAD_COOLDOWN_UNTIL
reset_ts = get_rate_limit_reset_timestamp(error_obj) reset_ts = get_rate_limit_reset_timestamp(error_obj)
if reset_ts: if not reset_ts:
if reset_ts > THUMB_UPLOAD_COOLDOWN_UNTIL: reset_ts = int(time.time()) + DEFAULT_THUMB_COOLDOWN_SECONDS
THUMB_UPLOAD_COOLDOWN_UNTIL = reset_ts
logging.warning(
f"Thumbnail uploads disabled until rate-limit reset at "
f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(reset_ts))}."
)
else:
fallback_reset = int(time.time()) + 3600
if fallback_reset > THUMB_UPLOAD_COOLDOWN_UNTIL:
THUMB_UPLOAD_COOLDOWN_UNTIL = fallback_reset
logging.warning("Thumbnail uploads disabled temporarily for 1 hour due to rate limiting.")
final_ts = set_global_thumb_cooldown_until(reset_ts)
def is_thumb_upload_cooldown_active(): logging.warning(
return int(time.time()) < THUMB_UPLOAD_COOLDOWN_UNTIL f"Thumbnail uploads disabled until "
f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(final_ts))}."
)
return final_ts
# --- Blob / image upload helpers --- # --- Blob / image upload helpers ---
def get_rate_limit_wait_seconds(error_obj, default_delay):
reset_ts = get_rate_limit_reset_timestamp(error_obj)
if reset_ts:
now_ts = int(time.time())
wait_seconds = max(reset_ts - now_ts + 1, default_delay)
return min(wait_seconds, BSKY_BLOB_UPLOAD_MAX_DELAY)
return default_delay
def upload_blob_with_retry(client, binary_data, media_label="media", optional=False, cooldown_on_rate_limit=False): def upload_blob_with_retry(client, binary_data, media_label="media", optional=False, cooldown_on_rate_limit=False):
last_exception = None last_exception = None
transient_attempts = 0 transient_attempts = 0
@@ -563,18 +644,11 @@ def upload_blob_with_retry(client, binary_data, media_label="media", optional=Fa
except Exception as e: except Exception as e:
last_exception = e last_exception = e
is_rate_limited = is_rate_limited_error(e)
if is_rate_limited: if is_rate_limited_error(e):
if cooldown_on_rate_limit: if cooldown_on_rate_limit:
activate_thumb_upload_cooldown_from_error(e) activate_thumb_upload_cooldown_from_error(e)
backoff_delay = min(
BSKY_BLOB_UPLOAD_BASE_DELAY * (2 ** (attempt - 1)),
BSKY_BLOB_UPLOAD_MAX_DELAY
)
wait_seconds = get_rate_limit_wait_seconds(e, backoff_delay)
if optional and cooldown_on_rate_limit: if optional and cooldown_on_rate_limit:
logging.warning( logging.warning(
f"Optional blob upload rate-limited for {media_label}. " f"Optional blob upload rate-limited for {media_label}. "
@@ -582,6 +656,12 @@ def upload_blob_with_retry(client, binary_data, media_label="media", optional=Fa
) )
return None return None
backoff_delay = min(
BSKY_BLOB_UPLOAD_BASE_DELAY * (2 ** (attempt - 1)),
BSKY_BLOB_UPLOAD_MAX_DELAY
)
wait_seconds = get_rate_limit_wait_seconds(e, backoff_delay)
if attempt < BSKY_BLOB_UPLOAD_MAX_RETRIES: if attempt < BSKY_BLOB_UPLOAD_MAX_RETRIES:
logging.warning( logging.warning(
f"Blob upload rate-limited for {media_label}. " f"Blob upload rate-limited for {media_label}. "
@@ -589,9 +669,9 @@ def upload_blob_with_retry(client, binary_data, media_label="media", optional=Fa
) )
time.sleep(wait_seconds) time.sleep(wait_seconds)
continue continue
else:
logging.warning(f"Exhausted blob upload retries for {media_label}: {repr(e)}") logging.warning(f"Exhausted blob upload retries for {media_label}: {repr(e)}")
break break
if is_transient_blob_error(e) and transient_attempts < BSKY_BLOB_TRANSIENT_ERROR_RETRIES: if is_transient_blob_error(e) and transient_attempts < BSKY_BLOB_TRANSIENT_ERROR_RETRIES:
transient_attempts += 1 transient_attempts += 1
@@ -657,8 +737,8 @@ def compress_external_thumb_to_limit(image_bytes, max_bytes=EXTERNAL_THUMB_MAX_B
def get_external_thumb_blob_from_url(image_url, client, http_client): def get_external_thumb_blob_from_url(image_url, client, http_client):
if is_thumb_upload_cooldown_active(): if is_global_thumb_cooldown_active():
reset_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(THUMB_UPLOAD_COOLDOWN_UNTIL)) reset_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(get_global_thumb_cooldown_until()))
logging.info(f"Skipping external thumbnail upload due to active cooldown until {reset_str}") logging.info(f"Skipping external thumbnail upload due to active cooldown until {reset_str}")
return None return None
@@ -813,16 +893,18 @@ def is_probable_length_error(exc):
def try_send_post_with_variants(client, text_variants, embed, post_lang): def try_send_post_with_variants(client, text_variants, embed, post_lang):
global POST_CREATION_COOLDOWN_UNTIL if is_global_post_cooldown_active():
reset_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(get_global_post_cooldown_until()))
if is_post_creation_cooldown_active(): raise RuntimeError(f"Posting skipped because global post cooldown is active until {reset_str}")
reset_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(POST_CREATION_COOLDOWN_UNTIL))
raise RuntimeError(f"Posting skipped because post creation cooldown is active until {reset_str}")
last_exception = None last_exception = None
for idx, variant in enumerate(text_variants, start=1): for idx, variant in enumerate(text_variants, start=1):
try: try:
if is_global_post_cooldown_active():
reset_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(get_global_post_cooldown_until()))
raise RuntimeError(f"Posting skipped because global post cooldown is active until {reset_str}")
logging.info(f"Trying post text variant {idx}/{len(text_variants)} (length={len(variant)})") logging.info(f"Trying post text variant {idx}/{len(text_variants)} (length={len(variant)})")
rich_text = make_rich(variant) rich_text = make_rich(variant)
result = client.send_post(text=rich_text, embed=embed, langs=[post_lang]) result = client.send_post(text=rich_text, embed=embed, langs=[post_lang])
@@ -836,6 +918,9 @@ def try_send_post_with_variants(client, text_variants, embed, post_lang):
activate_post_creation_cooldown_from_error(e) activate_post_creation_cooldown_from_error(e)
raise raise
if is_timeout_error(e):
raise
if not is_probable_length_error(e): if not is_probable_length_error(e):
raise raise
@@ -847,7 +932,7 @@ def try_send_post_with_variants(client, text_variants, embed, post_lang):
# --- Main --- # --- Main ---
def main(): def main():
parser = argparse.ArgumentParser(description="Post RSS to Bluesky with JSON state tracking.") parser = argparse.ArgumentParser(description="Post RSS to Bluesky with shared cooldown tracking.")
parser.add_argument("rss_feed", help="RSS feed URL") parser.add_argument("rss_feed", help="RSS feed URL")
parser.add_argument("bsky_handle", help="Bluesky handle") parser.add_argument("bsky_handle", help="Bluesky handle")
parser.add_argument("bsky_username", help="Bluesky username") parser.add_argument("bsky_username", help="Bluesky username")
@@ -855,8 +940,13 @@ def main():
parser.add_argument("--service", default="https://bsky.social", help="Bluesky server URL") parser.add_argument("--service", default="https://bsky.social", help="Bluesky server URL")
parser.add_argument("--lang", default="ca", help="Language code for the post") parser.add_argument("--lang", default="ca", help="Language code for the post")
parser.add_argument("--state-path", default=STATE_PATH, help="Path to local JSON state file") parser.add_argument("--state-path", default=STATE_PATH, help="Path to local JSON state file")
parser.add_argument("--cooldown-path", default=COOLDOWN_STATE_PATH, help="Path to shared cooldown JSON state file")
args = parser.parse_args() args = parser.parse_args()
global STATE_PATH, COOLDOWN_STATE_PATH
STATE_PATH = args.state_path
COOLDOWN_STATE_PATH = args.cooldown_path
feed_url = args.rss_feed feed_url = args.rss_feed
bsky_handle = args.bsky_handle bsky_handle = args.bsky_handle
bsky_username = args.bsky_username bsky_username = args.bsky_username
@@ -865,11 +955,21 @@ def main():
post_lang = args.lang post_lang = args.lang
state_path = args.state_path state_path = args.state_path
if is_global_post_cooldown_active():
reset_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(get_global_post_cooldown_until()))
logging.warning(f"=== BSKY POST SKIPPED: GLOBAL COOLDOWN === Active until {reset_str}")
return
client = Client(base_url=service_url) client = Client(base_url=service_url)
backoff = 60 backoff = 60
while True: while True:
try: try:
if is_global_post_cooldown_active():
reset_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(get_global_post_cooldown_until()))
logging.warning(f"=== BSKY POST SKIPPED: GLOBAL COOLDOWN === Active until {reset_str}")
return
logging.info(f"Attempting login to server: {service_url} with user: {bsky_username}") logging.info(f"Attempting login to server: {service_url} with user: {bsky_username}")
client.login(bsky_username, bsky_password) client.login(bsky_username, bsky_password)
logging.info(f"Login successful for user: {bsky_username}") logging.info(f"Login successful for user: {bsky_username}")
@@ -926,13 +1026,18 @@ def main():
logging.info(" Execution finished: no new entries to publish.") logging.info(" Execution finished: no new entries to publish.")
return return
if is_global_post_cooldown_active():
reset_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(get_global_post_cooldown_until()))
logging.warning(f"=== BSKY POST SKIPPED: GLOBAL COOLDOWN === Active until {reset_str}")
return
noves_entrades = 0 noves_entrades = 0
with httpx.Client() as http_client: with httpx.Client() as http_client:
for candidate in entries_to_post: for candidate in entries_to_post:
if is_post_creation_cooldown_active(): if is_global_post_cooldown_active():
reset_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(POST_CREATION_COOLDOWN_UNTIL)) reset_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(get_global_post_cooldown_until()))
logging.error(f"=== BSKY POST STOPPED: RATE LIMITED === Skipping remaining entries until {reset_str}") logging.error(f"=== BSKY POST STOPPED: GLOBAL COOLDOWN === Skipping remaining entries until {reset_str}")
break break
title_text = candidate["title_text"] title_text = candidate["title_text"]
@@ -980,13 +1085,24 @@ def main():
time.sleep(POST_RETRY_DELAY_SECONDS) time.sleep(POST_RETRY_DELAY_SECONDS)
except Exception as e: except Exception as e:
logging.exception(f"=== BSKY POST FAILED === {canonical_link or title_text}") if is_rate_limited_error(e):
reset_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(get_global_post_cooldown_until()))
if is_rate_limited_error(e) or is_post_creation_cooldown_active(): logging.error(f"=== BSKY POST FAILED === {canonical_link or title_text}")
reset_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(POST_CREATION_COOLDOWN_UNTIL))
logging.error(f"=== BSKY POST STOPPED: RATE LIMITED === Ending publish loop until {reset_str}") logging.error(f"=== BSKY POST STOPPED: RATE LIMITED === Ending publish loop until {reset_str}")
break break
if "global post cooldown is active" in str(e).lower():
reset_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(get_global_post_cooldown_until()))
logging.warning(f"=== BSKY POST SKIPPED: GLOBAL COOLDOWN === {canonical_link or title_text}")
logging.warning(f"=== BSKY POST STOPPED: GLOBAL COOLDOWN === Ending publish loop until {reset_str}")
break
if is_timeout_error(e):
logging.error(f"=== BSKY POST FAILED === {canonical_link or title_text} :: timeout")
break
logging.exception(f"=== BSKY POST FAILED === {canonical_link or title_text}")
if noves_entrades > 0: if noves_entrades > 0:
logging.info(f"🎉 Execution finished: published {noves_entrades} new entries to Bluesky.") logging.info(f"🎉 Execution finished: published {noves_entrades} new entries to Bluesky.")
else: else: