فهرست منبع

Add the missing jobs

jherve 1 سال پیش
والد
کامیت
4ff66da97d
1فایلهای تغییر یافته به همراه89 افزوده شده و 3 حذف شده
  1. 89 3
      src/media_observer/test.py

+ 89 - 3
src/media_observer/test.py

@@ -1,7 +1,12 @@
 import asyncio
 from datetime import date, datetime, time, timedelta
+import os
+from pathlib import Path
+import pickle
+import tempfile
 import traceback
 from typing import ClassVar
+import urllib.parse
 from zoneinfo import ZoneInfo
 from loguru import logger
 from attrs import frozen
@@ -10,9 +15,10 @@ from uuid import UUID, uuid1
 from hypercorn.asyncio import serve
 from hypercorn.config import Config
 
-from media_observer.article import ArchiveCollection
+from media_observer.article import ArchiveCollection, FrontPage
 from media_observer.internet_archive import (
     InternetArchiveClient,
+    InternetArchiveSnapshot,
     InternetArchiveSnapshotId,
     SnapshotNotYetAvailable,
 )
@@ -22,6 +28,9 @@ from media_observer.web import app
 from config import settings
 
 
+tmpdir = Path(tempfile.mkdtemp(prefix="media_observer"))
+
+
 @frozen
 class Job(ABC):
     id_: UUID
@@ -130,13 +139,74 @@ class SnapshotFetchJob(Job):
     async def execute(self, ia_client: InternetArchiveClient):
         try:
             closest = await ia_client.fetch(self.snap_id)
-            return closest, [(self.id_, self.collection, closest, self.dt)]
+            return closest, [
+                SnapshotParseJob(self.id_, self.collection, closest, self.dt)
+            ]
         except Exception as e:
             self._log("ERROR", f"Error while fetching {self.snap_id}")
             traceback.print_exception(e)
             raise e
 
 
+@frozen
+class SnapshotParseJob(Job):
+    queue = asyncio.Queue()
+    collection: ArchiveCollection
+    snapshot: InternetArchiveSnapshot
+    dt: datetime
+
+    async def execute(self):
+        try:
+            main_page = await self.collection.FrontPageClass.from_snapshot(
+                self.snapshot
+            )
+            return main_page, [
+                SnapshotStoreJob(self.id_, main_page, self.collection, self.dt)
+            ]
+        except Exception as e:
+            snapshot = self.snapshot
+            sub_dir = (
+                tmpdir
+                / urllib.parse.quote_plus(snapshot.id.original)
+                / urllib.parse.quote_plus(str(snapshot.id.timestamp))
+            )
+            os.makedirs(sub_dir)
+
+            with open(sub_dir / "self.pickle", "wb") as f:
+                pickle.dump(self, f)
+            with open(sub_dir / "snapshot.html", "w") as f:
+                f.write(snapshot.text)
+            with open(sub_dir / "exception.txt", "w") as f:
+                f.writelines(traceback.format_exception(e))
+            with open(sub_dir / "url.txt", "w") as f:
+                f.write(snapshot.id.url)
+
+            self._log(
+                "ERROR",
+                f"Error while parsing snapshot from {snapshot.id.url}, details were written in directory {sub_dir}",
+            )
+            raise e
+
+
+@frozen
+class SnapshotStoreJob(Job):
+    queue = asyncio.Queue()
+    page: FrontPage
+    collection: ArchiveCollection
+    dt: datetime
+
+    async def execute(self, storage: Storage):
+        try:
+            return await storage.add_page(self.collection, self.page, self.dt), []
+        except Exception as e:
+            self._log(
+                "ERROR",
+                f"Error while attempting to store {self.page} from {self.collection.name} @ {self.dt}",
+            )
+            traceback.print_exception(e)
+            raise e
+
+
 @frozen
 class Worker:
     async def loop(self):
@@ -200,6 +270,18 @@ class FetchWorker(QueueWorker):
         return {"ia_client": self.ia_client}
 
 
+@frozen
+class ParseWorker(QueueWorker): ...
+
+
+@frozen
+class StoreWorker(QueueWorker):
+    storage: Storage
+
+    def get_execution_context(self):
+        return {"storage": self.storage}
+
+
 @frozen
 class WebServer(Worker):
     async def run(self):
@@ -228,7 +310,11 @@ async def main():
                 "snapshot": SnapshotWorker(
                     SnapshotSearchJob.queue, SnapshotFetchJob.queue, storage, ia
                 ),
-                "fetch": FetchWorker(SnapshotFetchJob.queue, None, ia),
+                "fetch": FetchWorker(
+                    SnapshotFetchJob.queue, SnapshotParseJob.queue, ia
+                ),
+                "parse": ParseWorker(SnapshotParseJob.queue, SnapshotStoreJob.queue),
+                "store": StoreWorker(SnapshotStoreJob.queue, None, storage),
             }
             web_server = WebServer()
             async with asyncio.TaskGroup() as tg: