feat: Add opt-in per-domain request throttling for HTTP 429 backoff #1762

MrAliHasan wants to merge 14 commits into apify:master from
Conversation
Add a new RequestThrottler component that handles HTTP 429 (Too Many Requests) responses on a per-domain basis, preventing the autoscaling death spiral where 429s cause concurrency to increase.

Key features:
- Per-domain tracking: rate limiting on domain A doesn't affect domain B
- Exponential backoff: 2s -> 4s -> 8s -> ... capped at 60s
- Retry-After header support (both seconds and HTTP-date formats)
- Throttled requests are reclaimed to the queue, not dropped
- Backoff resets on successful requests to that domain

The AutoscaledPool is completely untouched; throttling happens transparently in BasicCrawler.__run_task_function before processing.

Integration points:
- BasicCrawler: throttle check, 429 recording, success reset
- AbstractHttpCrawler: passes URL + Retry-After to detection
- PlaywrightCrawler: passes URL + Retry-After to detection

Closes apify#1437
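The exponential backoff schedule above (2s -> 4s -> 8s -> ... capped at 60s) boils down to a small pure function. A minimal sketch, assuming the crawler counts consecutive 429s per domain; compute_backoff is an illustrative name, not the PR's actual API:

```python
from datetime import timedelta

_BASE_DELAY = timedelta(seconds=2)   # delay after the first 429 from a domain
_MAX_DELAY = timedelta(seconds=60)   # cap on the per-domain delay

def compute_backoff(consecutive_429s: int) -> timedelta:
    """Return the delay after the n-th consecutive 429: 2s, 4s, 8s, ..., capped at 60s."""
    delay = _BASE_DELAY * 2 ** (consecutive_429s - 1)
    return min(delay, _MAX_DELAY)
```

A successful response to the domain would reset the counter, dropping the delay back to zero.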
Hi @MrAliHasan, thanks for your contribution! We'll try to review this soon.
As mentioned in #1762 (comment), the approach of reclaiming throttled requests is not optimal.
On top of that, the solution to #1437 should probably also be extensible enough to also cover #1396 without much tweaking.
I believe that such a solution could be implemented in crawlee-python quite easily. See the similar issue for crawlee-js. The Python version already supports multiple "unnamed queues" via RequestQueue.open(alias="..."), so you'd only need to implement a ThrottlingRequestManager (an implementation of the RequestManager interface) that would keep track of the per-domain queues and their delays.
Do you want to try it?
Thanks for the detailed review. That makes sense regarding the busy-wait behavior and queue writes.
Move per-domain throttling from execution layer (BasicCrawler.__run_task_function) to scheduling layer (ThrottlingRequestManager.fetch_next_request).

- ThrottlingRequestManager wraps RequestQueue, implements RequestManager interface
- fetch_next_request() buffers throttled requests and asyncio.sleep()s when all domains are throttled, eliminating busy-wait and unnecessary queue writes
- Unified delay mechanism supports both HTTP 429 backoff and robots.txt crawl-delay (apify#1396)
- parse_retry_after_header moved to crawlee._utils.http
- 23 new tests covering throttling, scheduling, delegation, and crawl-delay

Addresses apify#1437, apify#1396
…queues and update its integration across crawlers.
Heads up @janbuchar @vdusek @Mantisus: I've pushed a significant refactor based on the latest feedback.

Sub-queues over memory buffer: ThrottlingRequestManager now delegates to persistent per-domain sub-queues via RequestQueue.open(alias=f"throttled-{domain}") instead of keeping throttled requests in memory.

Test structure: Completely rewrote test_throttling_request_manager.py to drop the Test... classes and conform to Crawlee's standard test structure.

BasicCrawler fixes: Addressed all inline nits (used isinstance(), renamed url to request_url in _raise_for_session_blocked_status_code, updated docstrings/comments).

The tests track the routing origin and safely aggregate get_handled_count and is_empty metrics across the main queue and sub-queues. All 24 tests pass, and Ruff and Pytest issues have been resolved. Let me know if the updated delegation architecture feels right!
Update: I just pushed a small follow-up commit fixing the MyPy typing and Ruff linting errors in the test suite that were causing the CI to fail. All local checks for ThrottlingRequestManager are now passing 100%. Ready for review whenever you have time!
Pijukatel
left a comment
Hello, thanks for the work on this PR! I have just some annoying edge cases to think about. I am not sure myself what the best way is to deal with them.
```python
async def _get_or_create_sub_queue(self, domain: str) -> RequestQueue:
    """Get or create a per-domain sub-queue."""
    if domain not in self._sub_queues:
        self._sub_queues[domain] = await RequestQueue.open(alias=f'throttled-{domain}')
```
We should think this through. Calling self._sub_queues[domain] = await RequestQueue.open(alias=f'throttled-{domain}') will use the global service_locator to get the configuration and storage client that will be used to create this RQ. On the other hand, inner was created from the crawler-specific service locator:

```python
inner = await RequestQueue.open(
    storage_client=self._service_locator.get_storage_client(),
    configuration=self._service_locator.get_configuration(),
)
```

So this could lead to unexpected behavior when these two service locators are not the same, and thus they could use a different configuration and a different storage_client.
What are other options?
Hard-code MemoryStorageClient
Hard-code MemoryStorageClient for all self._sub_queues[domain] and use the crawler-specific configuration. This would probably work for the majority of scenarios; it would be fast and cheap. But what about a heavily throttled crawler? Imagine a crawler that is crawling only one site, has a massive RQ, and that one site has a crawl_delay of 60s. In such a scenario, the massive RQ would just be loaded into one of the self._sub_queues until the memory limit of the crawler is used. Would it then stay in a deadlock due to memory load?
Use crawler-specific service_locator as an init argument to ThrottlingRequestManager and create all self._sub_queues like this:

```python
self._sub_queues[domain] = await RequestQueue.open(
    alias=f'throttled-{domain}',
    storage_client=self._service_locator.get_storage_client(),
    configuration=self._service_locator.get_configuration(),
)
```
This seems to me like a safe choice, but it would probably not be the cheapest and fastest. For example, on the Apify platform, it would use Apify-based RQ, which is more expensive and slower than an in-memory one.
Use in-memory as default, but allow custom service_locator?
I would probably prefer this approach. By default, no service locator would be passed to the ThrottlingRequestManager and it would use MemoryStorageClient, and for special use cases you could use:

```python
BasicCrawler(request_manager=ThrottlingRequestManager(service_locator=custom_service_locator))
```
Use in-memory as default, but limit the max size of the in-memory self._sub_queues?
```python
async def mark_request_as_handled(self, request: Request) -> ProcessedRequest | None:
    origin = self._dispatched_origins.get(request.unique_key)
    if origin and origin != 'inner' and origin in self._sub_queues:
        return await self._sub_queues[origin].mark_request_as_handled(request)
```
Ideally, I think we do not want to just mark the request in self._sub_queues as handled, but completely delete it from the subqueue. There is no point in tracking such a request in a subqueue, as it can consume memory in some implementations.
It is enough that it is marked as handled in the inner. That one already handles all the deduplication needed and that is the only place where we need to store handled requests.
But RQ does not define a delete method, so I'm not sure what to do with this...
```python
if not self._is_domain_throttled(domain):
    req = await sq.fetch_next_request()
    if req:
        self._mark_domain_dispatched(req.url)
        self._dispatched_origins[req.unique_key] = domain
        return req
```
Imagine a scenario where multiple domains are throttled at the same time, but the first of self._sub_queues has a massive amount of requests. The current approach would prefer that sub-queue just due to its order in self._sub_queues.
I think that the mechanism that decides which subqueue should be used to fetch_next_request should be based on the lowest time in _DomainState.throttled_until. So fetch from the longest overdue one.
Which brings me to another point. Do we even need to track _DomainState.last_request_at? I think we should just track _DomainState.throttled_until and update it on ThrottlingRequestManager._mark_domain_dispatched call.
Hey @Pijukatel, thanks for looking deeply into this! Great catches all around on the edge cases. I spent some time analyzing these points, and here is how I propose we handle them. Let me know if you are aligned on this direction before I push the changes:

1. Good catch. I'll simplify.

2. Sub-queue storage strategy: I propose your "Option 2" as the default, but with the flexibility of "Option 3".
3. Fetch priority redesign: I completely agree. Relying on iteration order rather than the longest-overdue domain is a flaw in the scheduling logic.

4. Deleting handled requests.

Does this direction look solid to you? If so, I'll get it coded and pushed up!
...
We discussed it internally, and there are some open points. Please let me think about it over the weekend, so that I don't point you in the wrong direction.
...
It is going in a good direction. Just a few more edge cases we discussed: having ad hoc request queues being created for domains can lead to two undesired scenarios:
To deal with this, we agreed it would be best to have the

Preserve existing behavior and introduce
Hey @Pijukatel, I've pushed the refactor based on your latest feedback. Here's what changed:

- Explicit domain routing
- Opt-in only
- Simplified state
- Documentation

All local checks pass (1647 tests, 0 failures). Ready for review!
Pijukatel
left a comment
Nice.
Now I have just a few more code-related comments.
…ctor domain state management and sub-queue handling.
Hey @Pijukatel, I've pushed all the changes from your latest review. Here's what was addressed:

- Import cleanup
- Dedicated purge method
- Docstrings
- Tests

All checks pass (1648 tests, 0 failures). Ready for review!
@MrAliHasan good work! Please just fix the errors reported by
The CI type check (Python 3.10) fails with
It seems the newer
One possible fix would be to narrow the type explicitly:

```python
result = await sq.add_request(request, forefront=forefront)
if result is None:
    raise RuntimeError("Unexpected None from add_request()")
return result
```

However, this case should never occur in practice, so I wanted to ask first: Is there a preferred approach for handling this kind of
I also noticed that
Hi @MrAliHasan, thank you for your inspiring work on this PR.

Make sure you are using

Regarding
….14 wheels for brotlicffi.
Thanks for the guidance! I've pinned

Regarding #1775, understood. I'll keep the current
This is wrong. You should have only run
…one returns and updates the ty dependency constraint.
Thanks for the clarification. I've reverted the hard pin in pyproject.toml and restored the original specifier, then updated the dependency using uv so the resolved version is recorded in uv.lock. I also addressed the remaining CI issues:
Please let me know if anything else should be adjusted.
Only one last detail from me: please update the PR description to match the latest state of the PR, especially that the
Thanks! I’ve updated the PR description to reflect the latest state and clarified that the
vdusek
left a comment
Thanks for the contribution @MrAliHasan, I have a few comments 🙂...
```python
        f'Sleeping {sleep_duration:.1f}s until earliest domain is available.'
    )
    await asyncio.sleep(sleep_duration)
    return await self.fetch_next_request()
```
Could we please use a while loop rather than recursion (because of smaller overhead)?
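A possible shape of that change, with the queue and polling reduced to stand-ins (the real method has more branches); the point is only that the sleep-and-retry becomes one loop iteration instead of a recursive call:

```python
import asyncio

async def fetch_next_request(queue: list[str], poll_interval: float = 0.01) -> str:
    # Loop instead of `await asyncio.sleep(...); return await self.fetch_next_request()`:
    # same behavior, but no growing chain of nested coroutine frames.
    while True:
        if queue:
            return queue.pop(0)
        await asyncio.sleep(poll_interval)

async def demo() -> str:
    queue: list[str] = []

    async def producer() -> None:
        await asyncio.sleep(0.03)
        queue.append('req-1')

    task = asyncio.create_task(producer())
    result = await fetch_next_request(queue)
    await task
    return result
```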
```python
_BASE_DELAY = timedelta(seconds=2)
"""Initial delay after the first 429 response from a domain."""

_MAX_DELAY = timedelta(seconds=60)
"""Maximum delay between requests to a rate-limited domain."""
```
Why not make these options configurable via __init__ with these defaults?
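One way to do that while keeping the current values as defaults; a sketch only, showing just the delay options rather than the real class, and the parameter names are assumptions:

```python
from datetime import timedelta

class ThrottlingRequestManager:
    # Sketch: only the suggested delay options are shown here,
    # not the PR's actual constructor or class body.
    def __init__(
        self,
        *,
        base_delay: timedelta = timedelta(seconds=2),
        max_delay: timedelta = timedelta(seconds=60),
    ) -> None:
        self._base_delay = base_delay
        self._max_delay = max_delay
```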
| """ | ||
| await self.drop() | ||
|
|
||
| inner = await RequestQueue.open( |
There was a problem hiding this comment.
It assumes the inner request manager is always a RequestQueue, right? But if the original inner was a RequestManagerTandem or another RequestManager subclass, this recreates it as a RequestQueue.
Good point. The current implementation assumes the inner manager is a RequestQueue, which covers the expected use case. I've documented this in the docstring. If a more generic approach is needed in the future, it can be extended to preserve the original manager type.
I think we should resolve this right ahead in this PR. Your opinion @janbuchar?
Agreed. IMO the most practical way to achieve this would be to require a request_manager_opener callback in the __init__ method. Most often, this would be just RequestQueue.open. You'd need to make the RequestManagerTandem class generic, but that makes a lot of sense anyway.
@MrAliHasan I'm afraid this is still not resolved correctly
```toml
[[package]]
name = "ty"
version = "0.0.17"
```
There is an even newer version of ty on master; I suggest undoing all updates to the lock file.
Thanks for the detailed review @vdusek! All points have been addressed:
I've restored

I did notice ty 0.0.21 flags some pre-existing errors in other files (e.g., test_basic_crawler.py, _redis/_utils.py), but none from the throttling manager code. If the type check still fails on CI, then please guide me on how to handle it.
Hi @MrAliHasan, there are still type check issues, could you resolve them?
Added an explicit None guard in
There are still type errors. You know you can run the checks locally, right? 🙂
Yes, I do run checks locally! 😄 I run

It seems the CI (Linux, ubuntu-latest) resolves the type narrowing differently for

Could you suggest the preferred approach here? I see a few options:
👍 |
Could you please undo all changes of the uv.lock file?
| """ | ||
| await self.drop() | ||
|
|
||
| inner = await RequestQueue.open( |
There was a problem hiding this comment.
I think we should resolve this right ahead in this PR. Your opinion @janbuchar?
```python
if isinstance(self._request_manager, ThrottlingRequestManager):
    crawl_delay = robots_txt_file.get_crawl_delay()
    if crawl_delay is not None:
        self._request_manager.set_crawl_delay(url, crawl_delay)
```
IIUC, this is called for every request, but it's redundant after the first call for a given domain. Could you improve that? (caching or checking if it was already set)
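A simple way to remove the per-request redundancy is to remember which domains already had their crawl-delay applied. A sketch of that caching; the class and method names are illustrative, not the PR's code:

```python
from urllib.parse import urlparse

class CrawlDelaySetter:
    """Apply the crawl-delay at most once per domain."""

    def __init__(self) -> None:
        self._configured_domains: set[str] = set()
        self.calls: list[tuple[str, float]] = []  # stand-in for the real setter

    def set_crawl_delay_once(self, url: str, crawl_delay: float) -> bool:
        domain = urlparse(url).netloc
        if domain in self._configured_domains:
            return False  # already configured; skip the redundant call
        self._configured_domains.add(domain)
        self.calls.append((domain, crawl_delay))
        return True
```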
@MrAliHasan there are still some unresolved comments, mainly #1762 (comment) - can you please take care of those?
Yes, I'll work on it tomorrow.
…ard, crawl-delay caching, revert uv.lock
Thanks for the follow-up @vdusek @janbuchar! All comments have been addressed:
docs/guides/request_throttling.mdx
Outdated
```mdx
## Overview

The <ApiLink to="class/ThrottlingRequestManager">`ThrottlingRequestManager`</ApiLink> wraps a <ApiLink to="class/RequestQueue">`RequestQueue`</ApiLink> and manages per-domain throttling. You specify which domains to throttle at initialization, and the manager automatically:
```
I believe it can potentially wrap any RequestManager, no?
You're right, it can wrap any RequestManager. I'll update the docs to reflect that.
```python
if purge_request_queue and isinstance(request_manager, RequestQueue):
    await request_manager.drop()
    self._request_manager = await RequestQueue.open(
        storage_client=self._service_locator.get_storage_client(),
        configuration=self._service_locator.get_configuration(),
    )
```
Even in the state before the change, this was a code smell - shouldn't we add a "purge_on_start_hook"-like abstract method to RequestManager and implement it in RequestQueue? Or should we just call .drop on request manager?
This is aimed mostly at @vdusek and @Pijukatel. We definitely don't need to resolve it in this PR if you guys don't see an obvious way out.
Understood, happy to leave this for a follow-up.
```python
# Record successful request to reset rate limit backoff for this domain.
if isinstance(request_manager, ThrottlingRequestManager):
    request_manager.record_success(request.url)
```
Can't the "record success" part be implemented in ThrottlingRequestManager.mark_request_as_handled instead? It could probably look at request.state, couldn't it?
Done, moved record_success into ThrottlingRequestManager.mark_request_as_handled and removed the isinstance check from the crawler.
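The change described can be sketched like this: the backoff-reset bookkeeping lives inside mark_request_as_handled, so the crawler needs no isinstance check. The state shape and method bodies are assumptions, not the PR's code:

```python
from urllib.parse import urlparse

class ThrottleBookkeeping:
    """Reset a domain's backoff when one of its requests is marked handled."""

    def __init__(self) -> None:
        self.throttled_until: dict[str, float] = {}

    def record_domain_delay(self, url: str, until: float) -> None:
        self.throttled_until[urlparse(url).netloc] = until

    def mark_request_as_handled(self, url: str) -> None:
        # A handled request means the domain answered successfully,
        # so any recorded backoff for it can be dropped right here.
        self.throttled_until.pop(urlparse(url).netloc, None)
```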
| """ | ||
| await self.drop() | ||
|
|
||
| inner = await RequestQueue.open( |
There was a problem hiding this comment.
@MrAliHasan I'm afraid this is still not resolved correctly
…to mark_request_as_handled, fix docs
Added a
Fixes #1437
Problem
When target websites return HTTP 429 (Too Many Requests), requests get retried without any per-domain delay — potentially making rate limiting worse.
Solution
Introduces the ThrottlingRequestManager, an opt-in request manager wrapper that enforces per-domain delays at the scheduling layer.

Key features:
- record_domain_delay() sets a per-domain throttled_until timestamp based on Retry-After headers
- BasicCrawler automatically calls set_crawl_delay() when respect_robots_txt_file is enabled and the request manager is a ThrottlingRequestManager
- A warning is logged when respect_robots_txt_file is enabled but the request manager is not a ThrottlingRequestManager
- fetch_next_request() skips throttled domains, falls back to the inner queue, and sleeps only when all sub-queues are throttled and the inner queue is empty
- recreate_purged() handles queue reconstruction across crawler restarts

How it works
- When a domain is throttled (its throttled_until is in the future), fetch_next_request() skips it and falls back to the inner queue
- record_domain_delay() updates per-domain backoff on HTTP 429 responses, respecting Retry-After headers
- set_crawl_delay() integrates robots.txt crawl-delay when enabled

Usage
Files changed
- _throttling_request_manager.py
- http.py: parse_retry_after_header utility
- _basic_crawler.py: recreate_purged() integration, crawl-delay warning
- _playwright_crawler.py: Retry-After header
- test_throttling_request_manager.py: RequestQueue with MemoryStorageClient
Tests
- Throttling, scheduling, recreate_purged(), and edge cases

Future work
This is a focused first step toward a more complete RequestAnalyzer that may include: