Procházet zdrojové kódy

Split snapshot service into job / worker

jherve před 1 rokem
rodič
revize
b844fd411a

+ 10 - 1
src/de_quoi_parle_le_monde/main.py

@@ -9,6 +9,7 @@ from attrs import frozen
 from de_quoi_parle_le_monde.web import app
 from de_quoi_parle_le_monde.http import HttpClient
 from de_quoi_parle_le_monde.storage import Storage
+from de_quoi_parle_le_monde.workers.snapshot import SnapshotJob, SnapshotWorker
 
 
 @frozen
@@ -19,13 +20,21 @@ class Application:
     web_config: Config
 
     async def run(self):
-        await asyncio.gather(self._run_web_server())
+        await asyncio.gather(self._run_web_server(), self._run_snapshot_worker())
         logger.info("Will quit now..")
 
     async def _run_web_server(self):
         logger.info("Starting web server..")
         await serve(self.web_app, self.web_config)
 
+    async def _run_snapshot_worker(self):
+        logger.info("Starting snapshot service..")
+        jobs = SnapshotJob.create(10, [18])
+
+        async with self.http_client.session() as session:
+            worker = SnapshotWorker.create(self.storage, session)
+            await asyncio.gather(*[worker.run(job) for job in jobs])
+
     @staticmethod
     async def create():
         http_client = HttpClient()

+ 23 - 18
src/de_quoi_parle_le_monde/workers/snapshot.py

@@ -1,10 +1,10 @@
 from datetime import date, datetime, time, timedelta
-import asyncio
 from attrs import frozen
 import traceback
 from loguru import logger
 
-from de_quoi_parle_le_monde.http import HttpClient
+from de_quoi_parle_le_monde.article import ArchiveCollection
+from de_quoi_parle_le_monde.http import HttpSession
 from de_quoi_parle_le_monde.internet_archive import (
     InternetArchiveClient,
     SnapshotNotYetAvailable,
@@ -14,9 +14,14 @@ from de_quoi_parle_le_monde.storage import Storage
 
 
 @frozen
-class SnapshotWorker:
-    storage: Storage
-    ia_client: InternetArchiveClient
+class SnapshotJob:
+    collection: ArchiveCollection
+    dt: datetime
+
+    @classmethod
+    def create(cls, n_days: int, hours: list[int]):
+        dts = cls.last_n_days_at_hours(n_days, hours)
+        return [cls(c, d) for d in dts for c in media_collection.values()]
 
     @staticmethod
     def last_n_days_at_hours(n: int, hours: list[int]) -> list[datetime]:
@@ -26,6 +31,12 @@ class SnapshotWorker:
             for h in hours
         ]
 
+
+@frozen
+class SnapshotWorker:
+    storage: Storage
+    ia_client: InternetArchiveClient
+
     async def find(self, collection, dt):
         try:
             return await self.ia_client.get_snapshot_id_closest_to(collection.url, dt)
@@ -82,7 +93,10 @@ class SnapshotWorker:
             traceback.print_exception(e)
             raise e
 
-    async def handle_snap(self, collection, dt):
+    async def run(self, job: SnapshotJob):
+        collection = job.collection
+        dt = job.dt
+
         try:
             logger.info(f"Start handling snap for collection {collection.name} @ {dt}")
             id_closest = await self.find(collection, dt)
@@ -93,16 +107,7 @@ class SnapshotWorker:
         except Exception as e:
             return
 
-
-async def download_all(
-    http_client: HttpClient, storage: Storage, n_days: int, hours: list[int]
-):
-    dts = SnapshotWorker.last_n_days_at_hours(n_days, hours)
-
-    async with http_client.session() as session:
+    @staticmethod
+    def create(storage: Storage, session: HttpSession):
         ia = InternetArchiveClient(session)
-        worker = SnapshotWorker(storage, ia)
-
-        return await asyncio.gather(
-            *[worker.handle_snap(c, d) for d in dts for c in media_collection.values()]
-        )
+        return SnapshotWorker(storage, ia)