
Add proper worker/job/jobqueue abstractions

jherve · 1 year ago
parent
commit
f64bfbdb78
5 changed files with 231 additions and 94 deletions
  1. pyproject.toml (+1, -1)
  2. requirements-dev.lock (+0, -14)
  3. requirements.lock (+0, -14)
  4. src/de_quoi_parle_le_monde/snapshots.py (+155, -65)
  5. src/de_quoi_parle_le_monde/worker.py (+75, -0)

pyproject.toml (+1, -1)

@@ -28,7 +28,7 @@ dependencies = [
     "packaging>=24.0",
 ]
 readme = "README.md"
-requires-python = ">= 3.10"
+requires-python = ">= 3.11"
 
 [project.optional-dependencies]
 embeddings = [
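
Note: the bump to Python 3.11 is presumably motivated by the reworked main() in snapshots.py below, which relies on asyncio.TaskGroup (available only since 3.11); it would also explain why the pre-3.11 backport packages (async-timeout, exceptiongroup, taskgroup, tomli) disappear from the lock files. A minimal, purely illustrative sketch of the 3.11-only construct, using a hypothetical demo() coroutine:

import asyncio

async def demo():
    # asyncio.TaskGroup was added in Python 3.11; this fails on 3.10.
    async with asyncio.TaskGroup() as tg:
        tg.create_task(asyncio.sleep(0))

asyncio.run(demo())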

requirements-dev.lock (+0, -14)

@@ -35,9 +35,6 @@ anyio==4.3.0
     # via httpx
     # via starlette
     # via watchfiles
-async-timeout==4.0.3
-    # via aiohttp
-    # via redis
 attrs==23.2.0
     # via aiohttp
     # via aiohttp-client-cache
@@ -71,10 +68,6 @@ dynaconf==3.2.5
     # via de-quoi-parle-le-monde
 email-validator==2.1.1
     # via fastapi
-exceptiongroup==1.2.1
-    # via anyio
-    # via cattrs
-    # via taskgroup
 faiss-cpu==1.8.0
     # via de-quoi-parle-le-monde
 fastapi==0.111.0
@@ -188,21 +181,14 @@ soupsieve==2.5
     # via beautifulsoup4
 starlette==0.37.2
     # via fastapi
-taskgroup==0.0.0a4
-    # via hypercorn
-tomli==2.0.1
-    # via hypercorn
 typer==0.12.3
     # via fastapi-cli
 typing-extensions==4.11.0
     # via aiosqlite
-    # via anyio
-    # via cattrs
     # via fastapi
     # via pydantic
     # via pydantic-core
     # via typer
-    # via uvicorn
 ujson==5.9.0
     # via fastapi
 url-normalize==1.4.3

requirements.lock (+0, -14)

@@ -35,9 +35,6 @@ anyio==4.3.0
     # via httpx
     # via starlette
     # via watchfiles
-async-timeout==4.0.3
-    # via aiohttp
-    # via redis
 attrs==23.2.0
     # via aiohttp
     # via aiohttp-client-cache
@@ -71,10 +68,6 @@ dynaconf==3.2.5
     # via de-quoi-parle-le-monde
 email-validator==2.1.1
     # via fastapi
-exceptiongroup==1.2.1
-    # via anyio
-    # via cattrs
-    # via taskgroup
 faiss-cpu==1.8.0
     # via de-quoi-parle-le-monde
 fastapi==0.111.0
@@ -188,21 +181,14 @@ soupsieve==2.5
     # via beautifulsoup4
 starlette==0.37.2
     # via fastapi
-taskgroup==0.0.0a4
-    # via hypercorn
-tomli==2.0.1
-    # via hypercorn
 typer==0.12.3
     # via fastapi-cli
 typing-extensions==4.11.0
     # via aiosqlite
-    # via anyio
-    # via cattrs
     # via fastapi
     # via pydantic
     # via pydantic-core
     # via typer
-    # via uvicorn
 ujson==5.9.0
     # via fastapi
 url-normalize==1.4.3

src/de_quoi_parle_le_monde/snapshots.py (+155, -65)

@@ -8,13 +8,16 @@ from attrs import frozen
 from loguru import logger
 
 
-from de_quoi_parle_le_monde.article import ArchiveCollection
+from de_quoi_parle_le_monde.article import ArchiveCollection, MainPage
 from de_quoi_parle_le_monde.internet_archive import (
     InternetArchiveClient,
+    InternetArchiveSnapshot,
+    InternetArchiveSnapshotId,
     SnapshotNotYetAvailable,
 )
 from de_quoi_parle_le_monde.medias import media_collection
 from de_quoi_parle_le_monde.storage import Storage
+from de_quoi_parle_le_monde.worker import Job, Worker, JobQueue
 from config import settings
 
 idx = 0
@@ -27,8 +30,7 @@ def unique_id():
 
 
 @frozen
-class SnapshotJob:
-    id_: int
+class SnapshotSearchJob(Job):
     collection: ArchiveCollection
     dt: datetime
 
@@ -49,37 +51,127 @@ class SnapshotJob:
             < now
         ]
 
+    async def run(self, ia_client: InternetArchiveClient):
+        return await ia_client.get_snapshot_id_closest_to(self.collection.url, self.dt)
+
+
+@frozen
+class SnapshotFetchJob(Job):
+    snap_id: InternetArchiveSnapshotId
+    collection: ArchiveCollection
+    dt: datetime
+
+    async def run(self, ia_client: InternetArchiveClient):
+        return await ia_client.fetch(self.snap_id)
+
+
+@frozen
+class SnapshotParseJob(Job):
+    collection: ArchiveCollection
+    snapshot: InternetArchiveSnapshot
+    dt: datetime
+
+    async def run(self):
+        return await self.collection.MainPageClass.from_snapshot(self.snapshot)
+
+
+@frozen
+class SnapshotStoreJob(Job):
+    page: MainPage
+    collection: ArchiveCollection
+    dt: datetime
+
+    async def run(self, storage: Storage):
+        page, collection, dt = self.page, self.collection, self.dt
+        site_id = await storage.add_site(collection.name, collection.url)
+        snapshot_id = await storage.add_snapshot(site_id, page.snapshot.id, dt)
+
+        article_id = await storage.add_featured_article(
+            page.main_article.article.original
+        )
+        main_article_snap_id = await storage.add_featured_article_snapshot(
+            article_id, page.main_article.article
+        )
+        await storage.add_main_article(snapshot_id, main_article_snap_id)
+
+        for t in page.top_articles:
+            article_id = await storage.add_featured_article(t.article.original)
+            top_article_snap_id = await storage.add_featured_article_snapshot(
+                article_id, t.article
+            )
+            await storage.add_top_article(snapshot_id, top_article_snap_id, t)
+
+        return snapshot_id
+
 
 @frozen
-class SnapshotWorker:
+class SearchWorker(Worker):
     storage: Storage
     ia_client: InternetArchiveClient
+    type_ = SnapshotSearchJob
+
+    async def execute(self, job: SnapshotSearchJob):
+        collection = job.collection
+        dt = job.dt
+
+        if await self.storage.exists_snapshot(collection.name, dt):
+            return None, []
+
+        self._log(
+            "DEBUG", job, f"Start handling snap for collection {collection.name} @ {dt}"
+        )
 
-    async def find(self, collection, dt):
         try:
-            return await self.ia_client.get_snapshot_id_closest_to(collection.url, dt)
+            id_closest = await job.run(self.ia_client)
+            return id_closest, [
+                SnapshotFetchJob(job.id_, id_closest, job.collection, job.dt)
+            ]
+
         except SnapshotNotYetAvailable as e:
-            logger.warning(f"Snapshot for {collection.name} @ {dt} not yet available")
+            self._log(
+                "WARNING",
+                job,
+                f"Snapshot for {collection.name} @ {dt} not yet available",
+            )
             raise e
+
         except Exception as e:
-            logger.error(
-                f"Error while trying to find snapshot for {collection.name} @ {dt}"
+            self._log(
+                "ERROR",
+                job,
+                f"Error while trying to find snapshot for {collection.name} @ {dt}",
             )
             traceback.print_exception(e)
             raise e
 
-    async def fetch(self, snap_id):
+
+@frozen
+class FetchWorker(Worker):
+    ia_client: InternetArchiveClient
+    type_ = SnapshotFetchJob
+
+    async def execute(self, job: SnapshotFetchJob):
         try:
-            return await self.ia_client.fetch(snap_id)
+            closest = await job.run(self.ia_client)
+            return closest, [SnapshotParseJob(job.id_, job.collection, closest, job.dt)]
         except Exception as e:
-            logger.error(f"Error while fetching {snap_id}")
+            self._log("ERROR", job, f"Error while fetching {job.snap_id}")
             traceback.print_exception(e)
             raise e
 
-    async def parse(self, collection, snapshot):
+
+@frozen
+class ParseWorker(Worker):
+    type_ = SnapshotParseJob
+
+    async def execute(self, job: SnapshotParseJob):
         try:
-            return await collection.MainPageClass.from_snapshot(snapshot)
+            main_page = await job.run()
+            return main_page, [
+                SnapshotStoreJob(job.id_, main_page, job.collection, job.dt)
+            ]
         except Exception as e:
+            snapshot = job.snapshot
             tmpdir_prefix = urllib.parse.quote_plus(
                 f"le_monde_{snapshot.id.original}_{snapshot.id.timestamp}"
             )
@@ -92,74 +184,72 @@ class SnapshotWorker:
             with open(tmpdir / "url.txt", "w") as f:
                 f.write(snapshot.id.url)
 
-            logger.error(
-                f"Error while parsing snapshot from {snapshot.id.url}, details were written in directory {tmpdir}"
+            self._log(
+                "ERROR",
+                job,
+                f"Error while parsing snapshot from {snapshot.id.url}, details were written in directory {tmpdir}",
             )
-
             raise e
 
-    async def store(self, page, collection, dt):
-        try:
-            site_id = await self.storage.add_site(collection.name, collection.url)
-            snapshot_id = await self.storage.add_snapshot(site_id, page.snapshot.id, dt)
-
-            article_id = await self.storage.add_featured_article(
-                page.main_article.article.original
-            )
-            main_article_snap_id = await self.storage.add_featured_article_snapshot(
-                article_id, page.main_article.article
-            )
-            await self.storage.add_main_article(snapshot_id, main_article_snap_id)
 
-            for t in page.top_articles:
-                article_id = await self.storage.add_featured_article(t.article.original)
-                top_article_snap_id = await self.storage.add_featured_article_snapshot(
-                    article_id, t.article
-                )
-                await self.storage.add_top_article(snapshot_id, top_article_snap_id, t)
+@frozen
+class StoreWorker(Worker):
+    storage: Storage
+    type_ = SnapshotStoreJob
 
+    async def execute(self, job: SnapshotStoreJob):
+        try:
+            return await job.run(self.storage), []
         except Exception as e:
-            logger.error(
-                f"Error while attempting to store {page} from {collection.name} @ {dt}"
+            self._log(
+                "ERROR",
+                job,
+                f"Error while attempting to store {job.page} from {job.collection.name} @ {job.dt}",
             )
             traceback.print_exception(e)
             raise e
 
-    async def run(self, job: SnapshotJob):
-        collection = job.collection
-        dt = job.dt
-
-        if await self.storage.exists_snapshot(collection.name, dt):
-            # The snapshot is already stored, skipping
-            return
-
-        try:
-            logger.debug(
-                f"[{job.id_: <3}] Start handling snap for collection {collection.name} @ {dt}"
-            )
-            id_closest = await self.find(collection, dt)
-            logger.debug(f"[{job.id_: <3}] Found snapshot id")
-            closest = await self.ia_client.fetch(id_closest)
-            logger.debug(f"[{job.id_: <3}] Fetched snapshot")
-            main_page = await self.parse(collection, closest)
-            logger.debug(f"[{job.id_: <3}] Parsed snapshot")
-            await self.store(main_page, collection, dt)
-            logger.info(
-                f"[{job.id_: <3}] Snap for collection {collection.name} @ {dt} is stored"
-            )
-        except Exception:
-            return
-
 
 async def main():
     storage = await Storage.create()
 
+    queue = JobQueue(
+        [
+            SnapshotSearchJob,
+            SnapshotFetchJob,
+            SnapshotParseJob,
+            SnapshotStoreJob,
+        ]
+    )
+
     logger.info("Starting snapshot service..")
-    jobs = SnapshotJob.create(settings.snapshots.days_in_past, settings.snapshots.hours)
+    jobs = SnapshotSearchJob.create(
+        settings.snapshots.days_in_past, settings.snapshots.hours
+    )
+
+    for j in jobs:
+        queue.put_nowait(j)
 
     async with InternetArchiveClient.create() as ia:
-        worker = SnapshotWorker(storage, ia)
-        await asyncio.gather(*[worker.run(job) for job in jobs])
+        workers = {
+            SearchWorker(queue, storage, ia): 3,
+            FetchWorker(queue, ia): 3,
+            ParseWorker(queue): 3,
+            StoreWorker(queue, storage): 3,
+        }
+
+        async with asyncio.TaskGroup() as tg:
+            tasks = []
+            for w, nb in workers.items():
+                for _ in range(nb):
+                    tasks.append(tg.create_task(w.loop()))
+
+            # Wait until the queue is fully processed.
+            await queue.join()
+
+            for t in tasks:
+                t.cancel()
+
     logger.info("Snapshot service exiting")
 
 

src/de_quoi_parle_le_monde/worker.py (+75, -0)

@@ -0,0 +1,75 @@
+import asyncio
+from attrs import frozen
+from loguru import logger
+from abc import ABC, abstractmethod
+from typing import Any, ClassVar
+
+
+@frozen
+class Job(ABC):
+    id_: int
+
+    @abstractmethod
+    async def run(self, *args): ...
+
+
+class JobQueue:
+    def __init__(self, job_types) -> None:
+        self.job_types = job_types
+        self._finished = asyncio.locks.Event()
+        self._pending_tasks = 0
+        self.queues = {kls: asyncio.Queue() for kls in self.job_types}
+
+    async def get(self, job_kls):
+        return await self.queues[job_kls].get()
+
+    def task_done(self, job_kls):
+        self.queues[job_kls].task_done()
+
+        self._pending_tasks -= 1
+        if self._pending_tasks == 0:
+            self._finished.set()
+
+    def put_nowait(self, job):
+        self._pending_tasks += 1
+        self._finished.clear()
+        return self.queues[type(job)].put_nowait(job)
+
+    async def join(self):
+        if self._pending_tasks > 0:
+            await self._finished.wait()
+
+    def qsize(self):
+        return {j: self.queues[j].qsize() for j in self.job_types}
+
+
+@frozen
+class Worker(ABC):
+    queue: JobQueue
+    type_: ClassVar[type]
+
+    @abstractmethod
+    async def execute(self, job: Job) -> tuple[Any, list[Job]]: ...
+
+    async def loop(self):
+        while True:
+            # Get a "work item" out of the queue.
+            job = await self.queue.get(self.type_)
+
+            assert isinstance(job, self.type_)
+
+            try:
+                res, further_jobs = await self.execute(job)
+
+                if res is not None:
+                    self._log("DEBUG", job, f"Completed job {job.__class__.__name__}")
+
+                for j in further_jobs:
+                    self.queue.put_nowait(j)
+            except Exception:
+                ...
+
+            self.queue.task_done(self.type_)
+
+    def _log(self, level: str, job: Job, msg: str):
+        logger.log(level, f"[{job.id_: <3}] {msg}")