Explorar el Código

Add fetch stage and outbound queue notion

jherve hace 1 año
padre
commit
71a5c08c32
Se han modificado 1 ficheros con 47 adiciones y 3 borrados
  1. 47 3
      src/media_observer/test.py

+ 47 - 3
src/media_observer/test.py

@@ -13,6 +13,7 @@ from hypercorn.config import Config
 from media_observer.article import ArchiveCollection
 from media_observer.internet_archive import (
     InternetArchiveClient,
+    InternetArchiveSnapshotId,
     SnapshotNotYetAvailable,
 )
 from media_observer.storage import Storage
@@ -99,7 +100,9 @@ class SnapshotSearchJob(Job):
                 )
 
             self._log("INFO", f"Got snapshot {id_closest}")
-            return id_closest, [(self.id_, id_closest, self.collection, self.dt)]
+            return id_closest, [
+                SnapshotFetchJob(self.id_, id_closest, self.collection, self.dt)
+            ]
 
         except SnapshotNotYetAvailable as e:
             self._log(
@@ -117,6 +120,23 @@ class SnapshotSearchJob(Job):
             raise e
 
 
+@frozen
+class SnapshotFetchJob(Job):
+    queue = asyncio.Queue()
+    snap_id: InternetArchiveSnapshotId
+    collection: ArchiveCollection
+    dt: datetime
+
+    async def execute(self, ia_client: InternetArchiveClient):
+        try:
+            closest = await ia_client.fetch(self.snap_id)
+            return closest, [(self.id_, self.collection, closest, self.dt)]
+        except Exception as e:
+            self._log("ERROR", f"Error while fetching {self.snap_id}")
+            traceback.print_exception(e)
+            raise e
+
+
 @frozen
 class Worker:
     async def loop(self):
@@ -128,6 +148,7 @@ class Worker:
                 logger.warning(f"Task #{id(self)} cancelled")
                 return
             except Exception as e:
+                traceback.print_exception(e)
                 logger.error(f"Task #{id(self)} failed with #{e}")
 
     async def run(self):
@@ -140,12 +161,22 @@ class Worker:
 @frozen
 class QueueWorker(Worker):
     inbound_queue: asyncio.Queue
+    outbound_queue: asyncio.Queue | None
 
     async def run(self):
         logger.info(f"Task #{id(self)} waiting for job..")
         job: Job = await self.inbound_queue.get()
         assert isinstance(job, Job)
-        await job.execute(**self.get_execution_context())
+
+        ret, further_jobs = await job.execute(**self.get_execution_context())
+        try:
+            for j in further_jobs:
+                await self.outbound_queue.put(j)
+        except AttributeError as e:
+            logger.error(
+                f"Could not push jobs #{further_jobs} because there is no outbound queue"
+            )
+            raise (e)
         self.inbound_queue.task_done()
 
     def get_execution_context(self):
@@ -161,6 +192,14 @@ class SnapshotWorker(QueueWorker):
         return {"storage": self.storage, "ia_client": self.ia_client}
 
 
+@frozen
+class FetchWorker(QueueWorker):
+    ia_client: InternetArchiveClient
+
+    def get_execution_context(self):
+        return {"ia_client": self.ia_client}
+
+
 @frozen
 class WebServer(Worker):
     async def run(self):
@@ -185,7 +224,12 @@ async def main():
     storage = await Storage.create()
     try:
         async with InternetArchiveClient.create() as ia:
-            workers = {"snapshot": SnapshotWorker(SnapshotSearchJob.queue, storage, ia)}
+            workers = {
+                "snapshot": SnapshotWorker(
+                    SnapshotSearchJob.queue, SnapshotFetchJob.queue, storage, ia
+                ),
+                "fetch": FetchWorker(SnapshotFetchJob.queue, None, ia),
+            }
             web_server = WebServer()
             async with asyncio.TaskGroup() as tg:
                 for w in workers.values():