Просмотр исходного кода

Use RateLimiter + Semaphore to prevent temporary ban from server

jherve 1 год назад
Родитель
Commit
c0e0c19064

+ 12 - 1
pdm.lock

@@ -5,7 +5,7 @@
 groups = ["default"]
 strategy = ["cross_platform", "inherit_metadata"]
 lock_version = "4.4.1"
-content_hash = "sha256:a88b3afbb37897d57ba23209bac9dc1c0bb0a8a63c8f942d6c9e31d683fb62ff"
+content_hash = "sha256:9cfacc65dfaf2b246f751386054056e6759a3cd53ddfd55dc3a206704e548cf4"
 
 [[package]]
 name = "aioboto3"
@@ -146,6 +146,17 @@ files = [
     {file = "aioitertools-0.11.0.tar.gz", hash = "sha256:42c68b8dd3a69c2bf7f2233bf7df4bb58b557bca5252ac02ed5187bbc67d6831"},
 ]
 
+[[package]]
+name = "aiolimiter"
+version = "1.1.0"
+requires_python = ">=3.7,<4.0"
+summary = "asyncio rate limiter, a leaky bucket implementation"
+groups = ["default"]
+files = [
+    {file = "aiolimiter-1.1.0-py3-none-any.whl", hash = "sha256:0b4997961fc58b8df40279e739f9cf0d3e255e63e9a44f64df567a8c17241e24"},
+    {file = "aiolimiter-1.1.0.tar.gz", hash = "sha256:461cf02f82a29347340d031626c92853645c099cb5ff85577b831a7bd21132b5"},
+]
+
 [[package]]
 name = "aiosignal"
 version = "1.3.1"

+ 1 - 0
pyproject.toml

@@ -15,6 +15,7 @@ dependencies = [
     "aiohttp>=3.9.3",
     "aiohttp-client-cache[all]>=0.11.0",
     "lxml>=5.1.0",
+    "aiolimiter>=1.1.0",
 ]
 requires-python = "==3.11.*"
 readme = "README.md"

+ 49 - 4
src/de_quoi_parle_le_monde/http.py

@@ -1,8 +1,53 @@
+from attrs import define
 from aiohttp_client_cache import CachedSession, SQLiteBackend
+from aiohttp_client_cache.session import CacheMixin
+from aiohttp.client import ClientSession
+from aiolimiter import AsyncLimiter
+import asyncio
+
+
+class SemaphoreMixin:
+    async def _request(self, *args, **kwargs):
+        await self.sem.acquire()
+        req = await super()._request(*args, **kwargs)
+        self.sem.release()
+        return req
+
+
+class RateLimiterMixin:
+    async def _request(self, *args, **kwargs):
+        await self.limiter.acquire()
+        return await super()._request(*args, **kwargs)
+
+
+@define
+class LimitedCachedSession(CacheMixin, SemaphoreMixin, RateLimiterMixin, ClientSession):
+    sem: asyncio.Semaphore
+    limiter: AsyncLimiter
+    cache: SQLiteBackend
+
+    def __init__(self):
+        self.sem = asyncio.Semaphore(5)
+        self.limiter = AsyncLimiter(2.0, 1.0)
+        super().__init__(cache=SQLiteBackend("http"))
+
+
+class HttpSession:
+    def __init__(self):
+        self.session = LimitedCachedSession()
+
+    async def get(self, url, params=None):
+        async with self.session.get(url, allow_redirects=True, params=params) as resp:
+            return await resp.text()
+
+    async def __aenter__(self):
+        await self.session.__aenter__()
+        return self
+
+    async def __aexit__(self, exc_type, exc, tb):
+        return await self.session.__aexit__(exc_type, exc, tb)
 
 
 class HttpClient:
-    async def aget(self, url, params=None):
-        async with CachedSession(cache=SQLiteBackend("http")) as session:
-            async with session.get(url, allow_redirects=True, params=params) as resp:
-                return await resp.text()
+    def session(self):
+        return HttpSession()

+ 8 - 5
src/de_quoi_parle_le_monde/internet_archive.py

@@ -2,9 +2,10 @@ from attrs import frozen
 from typing import Optional, ClassVar, NewType
 from datetime import date, datetime, timedelta
 import cattrs
+import asyncio
 from bs4 import BeautifulSoup
 
-from de_quoi_parle_le_monde.http import HttpClient
+from de_quoi_parle_le_monde.http import HttpSession
 
 Timestamp = NewType("Timestamp", datetime)
 datetime_format = "%Y%m%d%H%M%S"
@@ -87,7 +88,7 @@ class InternetArchiveSnapshot:
 @frozen
 class InternetArchiveClient:
     # https://github.com/internetarchive/wayback/tree/master/wayback-cdx-server
-    client: HttpClient
+    session: HttpSession
     search_url: ClassVar[str] = "http://web.archive.org/cdx/search/cdx"
 
     async def search_snapshots(self, req: CdxRequest) -> list[InternetArchiveSnapshot]:
@@ -95,15 +96,17 @@ class InternetArchiveClient:
             record = CdxRecord.parse_line(line)
             return InternetArchiveSnapshot.from_record(record)
 
-        resp = await self.client.aget(self.search_url, req.into_params())
+        resp = await self.session.get(self.search_url, req.into_params())
 
         return [to_snapshot(line) for line in resp.splitlines()]
 
     async def fetch_and_parse_snapshot(
         self, snap: InternetArchiveSnapshot
     ) -> BeautifulSoup:
-        resp = await self.client.aget(snap.url)
-        return BeautifulSoup(resp, "lxml")
+        resp = await self.session.get(snap.url)
+        loop = asyncio.get_event_loop()
+        soup = await loop.run_in_executor(None, BeautifulSoup, resp, "lxml")
+        return soup
 
     async def get_snapshot_closest_to(self, url, dt):
         req = CdxRequest(

+ 8 - 6
src/de_quoi_parle_le_monde/main.py

@@ -9,14 +9,16 @@ from de_quoi_parle_le_monde.le_monde import LeMondeArchive, LeMondeMainPage
 
 async def get_latest_snaps(dts):
     http_client = HttpClient()
-    ia = InternetArchiveClient(http_client)
 
-    async def req_and_parse_first_snap(dt):
-        closest = await ia.get_snapshot_closest_to(LeMondeArchive.url, dt)
-        closest_content = await ia.fetch_and_parse_snapshot(closest)
-        return LeMondeMainPage(closest, closest_content)
+    async with http_client.session() as session:
+        ia = InternetArchiveClient(session)
 
-    return await asyncio.gather(*[req_and_parse_first_snap(d) for d in dts])
+        async def req_and_parse_first_snap(dt):
+            closest = await ia.get_snapshot_closest_to(LeMondeArchive.url, dt)
+            closest_content = await ia.fetch_and_parse_snapshot(closest)
+            return LeMondeMainPage(closest, closest_content)
+
+        return await asyncio.gather(*[req_and_parse_first_snap(d) for d in dts])
 
 
 @frozen