jherve 1 year ago
parent
commit
ef7bae0e16
3 changed files with 152 additions and 471 deletions
  1. src/media_observer/__main__.py (+13 -299)
  2. src/media_observer/snapshots.py (+100 -121)
  3. src/media_observer/worker.py (+39 -51)

+13 -299  src/media_observer/__main__.py

@@ -1,314 +1,28 @@
 import asyncio
 import concurrent.futures
 from itertools import islice
-from datetime import date, datetime, time, timedelta
-import os
-from pathlib import Path
-import pickle
-import tempfile
-import traceback
-from typing import Any, ClassVar
-import urllib.parse
-from zoneinfo import ZoneInfo
+from typing import Any
 from loguru import logger
 from attrs import define, field, frozen
-from abc import ABC, abstractmethod
-from uuid import UUID, uuid1
 from hypercorn.asyncio import serve
 from hypercorn.config import Config
 
-from media_observer.article import ArchiveCollection, FrontPage
-from media_observer.internet_archive import (
-    InternetArchiveClient,
-    InternetArchiveSnapshot,
-    InternetArchiveSnapshotId,
-    SnapshotNotYetAvailable,
+from media_observer.worker import Worker
+from media_observer.internet_archive import InternetArchiveClient
+from media_observer.snapshots import (
+    FetchWorker,
+    ParseWorker,
+    SnapshotWorker,
+    SnapshotFetchJob,
+    SnapshotParseJob,
+    SnapshotStoreJob,
+    SnapshotWatchdog,
+    StoreWorker,
+    SnapshotSearchJob,
 )
 from media_observer.similarity_index import SimilaritySearch
 from media_observer.storage import Storage
-from media_observer.medias import media_collection
 from media_observer.web import app
-from config import settings
-
-
-tmpdir = Path(tempfile.mkdtemp(prefix="media_observer"))
-
-
-@frozen
-class Job(ABC):
-    id_: UUID
-    queue: ClassVar[asyncio.Queue]
-
-    @abstractmethod
-    async def execute(self, **kwargs): ...
-
-    def _log(self, level: str, msg: str):
-        logger.log(level, f"[{self.id_}] {msg}")
-
-
-def unique_id():
-    return uuid1()
-
-
-@frozen
-class SnapshotSearchJob(Job):
-    queue = asyncio.Queue()
-    collection: ArchiveCollection
-    dt: datetime
-
-    @classmethod
-    def create(cls, n_days: int, hours: list[int]):
-        return [
-            cls(unique_id(), c, d)
-            for c in media_collection.values()
-            for d in cls.last_n_days_at_hours(n_days, hours, c.tz)
-        ]
-
-    @staticmethod
-    def last_n_days_at_hours(n: int, hours: list[int], tz: ZoneInfo) -> list[datetime]:
-        now = datetime.now(tz)
-
-        return [
-            dt
-            for i in range(0, n)
-            for h in hours
-            if (
-                dt := datetime.combine(
-                    date.today() - timedelta(days=i), time(hour=h), tzinfo=tz
-                )
-            )
-            < now
-        ]
-
-    async def execute(self, *, storage: Storage, ia_client: InternetArchiveClient):
-        collection = self.collection
-        dt = self.dt
-
-        if await storage.exists_frontpage(collection.name, dt):
-            return None, []
-
-        self._log(
-            "DEBUG",
-            f"Start handling snap for collection {collection.name} @ {dt}",
-        )
-
-        try:
-            id_closest = await ia_client.get_snapshot_id_closest_to(
-                self.collection.url, self.dt
-            )
-
-            delta = self.dt - id_closest.timestamp
-            abs_delta = abs(delta)
-            if abs_delta.total_seconds() > 3600:
-                time = "after" if delta > timedelta(0) else "before"
-                self._log(
-                    "WARNING",
-                    f"Snapshot is {abs(delta)} {time} the required timestamp ({id_closest.timestamp} instead of {self.dt})",
-                )
-
-            self._log("INFO", f"Got snapshot {id_closest}")
-            return id_closest, [
-                SnapshotFetchJob(self.id_, id_closest, self.collection, self.dt)
-            ]
-
-        except SnapshotNotYetAvailable as e:
-            self._log(
-                "WARNING",
-                f"Snapshot for {collection.name} @ {dt} not yet available",
-            )
-            raise e
-
-        except Exception as e:
-            self._log(
-                "ERROR",
-                f"Error while trying to find snapshot for {collection.name} @ {dt}",
-            )
-            traceback.print_exception(e)
-            raise e
-
-
-@frozen
-class SnapshotFetchJob(Job):
-    queue = asyncio.Queue()
-    snap_id: InternetArchiveSnapshotId
-    collection: ArchiveCollection
-    dt: datetime
-
-    async def execute(self, ia_client: InternetArchiveClient):
-        try:
-            closest = await ia_client.fetch(self.snap_id)
-            return closest, [
-                SnapshotParseJob(self.id_, self.collection, closest, self.dt)
-            ]
-        except Exception as e:
-            self._log("ERROR", f"Error while fetching {self.snap_id}")
-            traceback.print_exception(e)
-            raise e
-
-
-@frozen
-class SnapshotParseJob(Job):
-    queue = asyncio.Queue()
-    collection: ArchiveCollection
-    snapshot: InternetArchiveSnapshot
-    dt: datetime
-
-    async def execute(self):
-        try:
-            main_page = await self.collection.FrontPageClass.from_snapshot(
-                self.snapshot
-            )
-            return main_page, [
-                SnapshotStoreJob(self.id_, main_page, self.collection, self.dt)
-            ]
-        except Exception as e:
-            snapshot = self.snapshot
-            sub_dir = (
-                tmpdir
-                / urllib.parse.quote_plus(snapshot.id.original)
-                / urllib.parse.quote_plus(str(snapshot.id.timestamp))
-            )
-            os.makedirs(sub_dir)
-
-            with open(sub_dir / "self.pickle", "wb") as f:
-                pickle.dump(self, f)
-            with open(sub_dir / "snapshot.html", "w") as f:
-                f.write(snapshot.text)
-            with open(sub_dir / "exception.txt", "w") as f:
-                f.writelines(traceback.format_exception(e))
-            with open(sub_dir / "url.txt", "w") as f:
-                f.write(snapshot.id.url)
-
-            self._log(
-                "ERROR",
-                f"Error while parsing snapshot from {snapshot.id.url}, details were written in directory {sub_dir}",
-            )
-            raise e
-
-
-@frozen
-class SnapshotStoreJob(Job):
-    queue = asyncio.Queue()
-    page: FrontPage
-    collection: ArchiveCollection
-    dt: datetime
-
-    async def execute(self, storage: Storage):
-        try:
-            return await storage.add_page(self.collection, self.page, self.dt), []
-        except Exception as e:
-            self._log(
-                "ERROR",
-                f"Error while attempting to store {self.page} from {self.collection.name} @ {self.dt}",
-            )
-            traceback.print_exception(e)
-            raise e
-
-
-class Worker(ABC):
-    @abstractmethod
-    async def run(self): ...
-
-    def _log(self, level: str, msg: str):
-        logger.log(level, f"[Worker {self.__class__.__name__}] {msg}")
-
-
-@frozen
-class QueueWorker(Worker):
-    inbound_queue: asyncio.Queue
-    outbound_queue: asyncio.Queue | None
-
-    async def run(self):
-        self._log("INFO", "booting..")
-        while True:
-            try:
-                await self.step()
-            except asyncio.CancelledError:
-                self._log("WARNING", "cancelled")
-                return
-            except Exception as e:
-                self._log("DEBUG", f"failed with {e.__class__.__name__}")
-
-    async def step(self):
-        job: Job = await self.inbound_queue.get()
-        assert isinstance(job, Job)
-
-        ret, further_jobs = await job.execute(**self.get_execution_context())
-        if self.outbound_queue is not None:
-            for j in further_jobs:
-                await self.outbound_queue.put(j)
-        elif further_jobs:
-            self._log(
-                "ERROR",
-                f"Could not push {len(further_jobs)} jobs because there is no outbound queue",
-            )
-        self.inbound_queue.task_done()
-
-    @abstractmethod
-    def get_execution_context(self) -> dict: ...
-
-
-@frozen
-class SnapshotWatchdog(Worker):
-    snapshot_queue: asyncio.Queue
-
-    async def run(self):
-        await self._push_new_jobs()
-
-        while True:
-            sleep_time_s = self._seconds_until_next_full_hour()
-            await asyncio.sleep(sleep_time_s)
-            self._log("INFO", f"Woke up at {datetime.now()}")
-            await self._push_new_jobs()
-
-    async def _push_new_jobs(self):
-        initial_jobs = SnapshotSearchJob.create(
-            settings.snapshots.days_in_past, settings.snapshots.hours
-        )
-        for j in initial_jobs:
-            await self.snapshot_queue.put(j)
-
-    @staticmethod
-    def _seconds_until_next_full_hour() -> float:
-        now = datetime.now()
-        next_tick = timedelta(
-            hours=1,
-            minutes=-now.minute,
-            seconds=-now.second,
-            microseconds=-now.microsecond,
-        )
-        return next_tick / timedelta(microseconds=1) / 1e6
-
-
-@frozen
-class SnapshotWorker(QueueWorker):
-    storage: Storage
-    ia_client: InternetArchiveClient
-
-    def get_execution_context(self):
-        return {"storage": self.storage, "ia_client": self.ia_client}
-
-
-@frozen
-class FetchWorker(QueueWorker):
-    ia_client: InternetArchiveClient
-
-    def get_execution_context(self):
-        return {"ia_client": self.ia_client}
-
-
-@frozen
-class ParseWorker(QueueWorker):
-    def get_execution_context(self):
-        return {}
-
-
-@frozen
-class StoreWorker(QueueWorker):
-    storage: Storage
-
-    def get_execution_context(self):
-        return {"storage": self.storage}
 
 
 def batched(iterable, n):
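
Note: the rest of the new __main__.py lies outside this hunk, so the actual wiring of the imported workers is not shown. A minimal sketch of how the pieces could be chained, assuming the constructor order implied by the attrs field declarations below (run_snapshot_pipeline and the wiring itself are illustrative, not the committed code):

    import asyncio

    from media_observer.internet_archive import InternetArchiveClient
    from media_observer.snapshots import (
        FetchWorker, ParseWorker, SnapshotWorker, StoreWorker, SnapshotWatchdog,
        SnapshotSearchJob, SnapshotFetchJob, SnapshotParseJob, SnapshotStoreJob,
    )
    from media_observer.storage import Storage

    async def run_snapshot_pipeline():
        # Each Job class owns its queue; every QueueWorker consumes one queue
        # and forwards follow-up jobs to the next stage's queue.
        storage = await Storage.create()
        async with InternetArchiveClient.create() as ia:
            workers = [
                SnapshotWatchdog(SnapshotSearchJob.queue),
                SnapshotWorker(SnapshotSearchJob.queue, SnapshotFetchJob.queue, storage, ia),
                FetchWorker(SnapshotFetchJob.queue, SnapshotParseJob.queue, ia),
                ParseWorker(SnapshotParseJob.queue, SnapshotStoreJob.queue),
                StoreWorker(SnapshotStoreJob.queue, None, storage),  # terminal stage, no outbound queue
            ]
            async with asyncio.TaskGroup() as tg:
                for w in workers:
                    tg.create_task(w.run())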

+100 -121  src/media_observer/snapshots.py

@@ -1,18 +1,16 @@
 import asyncio
-import sys
-from uuid import uuid1
-import pickle
-import traceback
+from datetime import date, datetime, time, timedelta
 import os
+from pathlib import Path
+import pickle
 import tempfile
+import traceback
 import urllib.parse
-from pathlib import Path
-from datetime import date, datetime, time, timedelta
 from zoneinfo import ZoneInfo
 from attrs import frozen
-from loguru import logger
-
+from uuid import uuid1
 
+from media_observer.worker import Job, Worker, QueueWorker
 from media_observer.article import ArchiveCollection, FrontPage
 from media_observer.internet_archive import (
     InternetArchiveClient,
@@ -20,14 +18,12 @@ from media_observer.internet_archive import (
     InternetArchiveSnapshotId,
     SnapshotNotYetAvailable,
 )
-from media_observer.medias import media_collection
 from media_observer.storage import Storage
-from media_observer.worker import Job, Worker, JobQueue
+from media_observer.medias import media_collection
 from config import settings
 
 
 tmpdir = Path(tempfile.mkdtemp(prefix="media_observer"))
-idx = 0
 
 
 def unique_id():
@@ -36,6 +32,7 @@ def unique_id():
 
 @frozen
 class SnapshotSearchJob(Job):
+    queue = asyncio.Queue()
     collection: ArchiveCollection
     dt: datetime
 
@@ -63,68 +60,40 @@ class SnapshotSearchJob(Job):
             < now
         ]
 
+    async def execute(self, *, storage: Storage, ia_client: InternetArchiveClient):
+        collection = self.collection
+        dt = self.dt
 
-@frozen
-class SnapshotFetchJob(Job):
-    snap_id: InternetArchiveSnapshotId
-    collection: ArchiveCollection
-    dt: datetime
-
-
-@frozen
-class SnapshotParseJob(Job):
-    collection: ArchiveCollection
-    snapshot: InternetArchiveSnapshot
-    dt: datetime
-
-
-@frozen
-class SnapshotStoreJob(Job):
-    page: FrontPage
-    collection: ArchiveCollection
-    dt: datetime
-
-
-@frozen
-class SearchWorker(Worker):
-    storage: Storage
-    ia_client: InternetArchiveClient
-    type_ = SnapshotSearchJob
-
-    async def execute(self, job: SnapshotSearchJob):
-        collection = job.collection
-        dt = job.dt
-
-        if await self.storage.exists_frontpage(collection.name, dt):
+        if await storage.exists_frontpage(collection.name, dt):
             return None, []
 
         self._log(
-            "DEBUG", job, f"Start handling snap for collection {collection.name} @ {dt}"
+            "DEBUG",
+            f"Start handling snap for collection {collection.name} @ {dt}",
         )
 
         try:
-            id_closest = await self.ia_client.get_snapshot_id_closest_to(
-                job.collection.url, job.dt
+            id_closest = await ia_client.get_snapshot_id_closest_to(
+                self.collection.url, self.dt
             )
 
-            delta = job.dt - id_closest.timestamp
+            delta = self.dt - id_closest.timestamp
             abs_delta = abs(delta)
             if abs_delta.total_seconds() > 3600:
                 time = "after" if delta > timedelta(0) else "before"
                 self._log(
                     "WARNING",
-                    job,
-                    f"Snapshot is {abs(delta)} {time} the required timestamp ({id_closest.timestamp} instead of {job.dt})",
+                    f"Snapshot is {abs(delta)} {time} the required timestamp ({id_closest.timestamp} instead of {self.dt})",
                 )
 
+            self._log("INFO", f"Got snapshot {id_closest}")
             return id_closest, [
-                SnapshotFetchJob(job.id_, id_closest, job.collection, job.dt)
+                SnapshotFetchJob(self.id_, id_closest, self.collection, self.dt)
             ]
 
         except SnapshotNotYetAvailable as e:
             self._log(
                 "WARNING",
-                job,
                 f"Snapshot for {collection.name} @ {dt} not yet available",
             )
             raise e
@@ -132,7 +101,6 @@ class SearchWorker(Worker):
         except Exception as e:
             self._log(
                 "ERROR",
-                job,
                 f"Error while trying to find snapshot for {collection.name} @ {dt}",
             )
             traceback.print_exception(e)
@@ -140,32 +108,41 @@ class SearchWorker(Worker):
 
 
 @frozen
-class FetchWorker(Worker):
-    ia_client: InternetArchiveClient
-    type_ = SnapshotFetchJob
+class SnapshotFetchJob(Job):
+    queue = asyncio.Queue()
+    snap_id: InternetArchiveSnapshotId
+    collection: ArchiveCollection
+    dt: datetime
 
-    async def execute(self, job: SnapshotFetchJob):
+    async def execute(self, ia_client: InternetArchiveClient):
         try:
-            closest = await self.ia_client.fetch(job.snap_id)
-            return closest, [SnapshotParseJob(job.id_, job.collection, closest, job.dt)]
+            closest = await ia_client.fetch(self.snap_id)
+            return closest, [
+                SnapshotParseJob(self.id_, self.collection, closest, self.dt)
+            ]
         except Exception as e:
-            self._log("ERROR", job, f"Error while fetching {job.snap_id}")
+            self._log("ERROR", f"Error while fetching {self.snap_id}")
             traceback.print_exception(e)
             raise e
 
 
 @frozen
-class ParseWorker(Worker):
-    type_ = SnapshotParseJob
+class SnapshotParseJob(Job):
+    queue = asyncio.Queue()
+    collection: ArchiveCollection
+    snapshot: InternetArchiveSnapshot
+    dt: datetime
 
-    async def execute(self, job: SnapshotParseJob):
+    async def execute(self):
         try:
-            main_page = await job.collection.FrontPageClass.from_snapshot(job.snapshot)
+            main_page = await self.collection.FrontPageClass.from_snapshot(
+                self.snapshot
+            )
             return main_page, [
-                SnapshotStoreJob(job.id_, main_page, job.collection, job.dt)
+                SnapshotStoreJob(self.id_, main_page, self.collection, self.dt)
             ]
         except Exception as e:
-            snapshot = job.snapshot
+            snapshot = self.snapshot
             sub_dir = (
                 tmpdir
                 / urllib.parse.quote_plus(snapshot.id.original)
@@ -173,8 +150,8 @@ class ParseWorker(Worker):
             )
             os.makedirs(sub_dir)
 
-            with open(sub_dir / "job.pickle", "wb") as f:
-                pickle.dump(job, f)
+            with open(sub_dir / "self.pickle", "wb") as f:
+                pickle.dump(self, f)
             with open(sub_dir / "snapshot.html", "w") as f:
                 f.write(snapshot.text)
             with open(sub_dir / "exception.txt", "w") as f:
@@ -184,86 +161,88 @@ class ParseWorker(Worker):
 
             self._log(
                 "ERROR",
-                job,
                 f"Error while parsing snapshot from {snapshot.id.url}, details were written in directory {sub_dir}",
             )
             raise e
 
 
 @frozen
-class StoreWorker(Worker):
-    storage: Storage
-    type_ = SnapshotStoreJob
+class SnapshotStoreJob(Job):
+    queue = asyncio.Queue()
+    page: FrontPage
+    collection: ArchiveCollection
+    dt: datetime
 
-    async def execute(self, job: SnapshotStoreJob):
+    async def execute(self, storage: Storage):
         try:
-            return await self.storage.add_page(job.collection, job.page, job.dt), []
+            return await storage.add_page(self.collection, self.page, self.dt), []
         except Exception as e:
             self._log(
                 "ERROR",
-                job,
-                f"Error while attempting to store {job.page} from {job.collection.name} @ {job.dt}",
+                f"Error while attempting to store {self.page} from {self.collection.name} @ {self.dt}",
             )
             traceback.print_exception(e)
             raise e
 
 
-async def main(jobs):
-    storage = await Storage.create()
+@frozen
+class SnapshotWatchdog(Worker):
+    snapshot_queue: asyncio.Queue
 
-    queue = JobQueue(
-        [
-            SnapshotSearchJob,
-            SnapshotFetchJob,
-            SnapshotParseJob,
-            SnapshotStoreJob,
-        ]
-    )
+    async def run(self):
+        await self._push_new_jobs()
 
-    logger.info("Starting snapshot service..")
+        while True:
+            sleep_time_s = self._seconds_until_next_full_hour()
+            await asyncio.sleep(sleep_time_s)
+            self._log("INFO", f"Woke up at {datetime.now()}")
+            await self._push_new_jobs()
 
-    for j in jobs:
-        queue.put_nowait(j)
+    async def _push_new_jobs(self):
+        initial_jobs = SnapshotSearchJob.create(
+            settings.snapshots.days_in_past, settings.snapshots.hours
+        )
+        for j in initial_jobs:
+            await self.snapshot_queue.put(j)
 
-    async with InternetArchiveClient.create() as ia:
-        workers = {
-            SearchWorker(queue, storage, ia): 3,
-            FetchWorker(queue, ia): 3,
-            ParseWorker(queue): 3,
-            StoreWorker(queue, storage): 1,
-        }
+    @staticmethod
+    def _seconds_until_next_full_hour() -> float:
+        now = datetime.now()
+        next_tick = timedelta(
+            hours=1,
+            minutes=-now.minute,
+            seconds=-now.second,
+            microseconds=-now.microsecond,
+        )
+        return next_tick / timedelta(microseconds=1) / 1e6
+
+
+@frozen
+class SnapshotWorker(QueueWorker):
+    storage: Storage
+    ia_client: InternetArchiveClient
 
-        async with asyncio.TaskGroup() as tg:
-            tasks = []
-            for w, nb in workers.items():
-                for _ in range(nb):
-                    tasks.append(tg.create_task(w.loop()))
+    def get_execution_context(self):
+        return {"storage": self.storage, "ia_client": self.ia_client}
 
-            # Wait until the queue is fully processed.
-            await queue.join()
 
-            for t in tasks:
-                t.cancel()
+@frozen
+class FetchWorker(QueueWorker):
+    ia_client: InternetArchiveClient
 
-    await storage.close()
-    logger.info("Snapshot service exiting")
+    def get_execution_context(self):
+        return {"ia_client": self.ia_client}
 
 
-async def replay(root_dir: Path):
-    jobs = []
-    for pickled_job in root_dir.glob("**/**/*.pickle"):
-        with open(pickled_job, "rb") as f:
-            jobs.append(pickle.load(f))
+@frozen
+class ParseWorker(QueueWorker):
+    def get_execution_context(self):
+        return {}
 
-    await main(jobs)
 
+@frozen
+class StoreWorker(QueueWorker):
+    storage: Storage
 
-if __name__ == "__main__":
-    try:
-        path = Path(sys.argv[1])
-        asyncio.run(replay(path))
-    except IndexError:
-        jobs = SnapshotSearchJob.create(
-            settings.snapshots.days_in_past, settings.snapshots.hours
-        )
-        asyncio.run(main(jobs))
+    def get_execution_context(self):
+        return {"storage": self.storage}
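
Note: the old main()/replay() entry points are removed from this module, but SnapshotParseJob.execute() above still pickles the failing job into the tmpdir dump. A hedged sketch of an equivalent replay helper under the new layout (replay_parse_failures is hypothetical, not part of this commit):

    import pickle
    from pathlib import Path

    from media_observer.snapshots import SnapshotParseJob

    async def replay_parse_failures(root_dir: Path):
        # "self.pickle" is the filename written by the error handler above;
        # re-running execute() reproduces the parse for debugging.
        for pickled in root_dir.glob("**/self.pickle"):
            with open(pickled, "rb") as f:
                job: SnapshotParseJob = pickle.load(f)
            page, follow_up = await job.execute()  # ParseJob needs no execution context
            print(f"re-parsed {job.collection.name} @ {job.dt}")

    # caller, e.g.: asyncio.run(replay_parse_failures(dump_dir))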

+39 -51  src/media_observer/worker.py

@@ -3,71 +3,59 @@ from uuid import UUID
 from attrs import frozen
 from loguru import logger
 from abc import ABC, abstractmethod
-from typing import Any, ClassVar
+from typing import ClassVar
 
 
 @frozen
 class Job(ABC):
     id_: UUID
+    queue: ClassVar[asyncio.Queue]
 
+    @abstractmethod
+    async def execute(self, **kwargs): ...
 
-class JobQueue:
-    def __init__(self, job_types) -> None:
-        self.job_types = job_types
-        self._finished = asyncio.locks.Event()
-        self._pending_tasks = 0
-        self.queues = {kls: asyncio.Queue() for kls in self.job_types}
-
-    async def get(self, job_kls):
-        return await self.queues[job_kls].get()
-
-    def task_done(self, job_kls):
-        self.queues[job_kls].task_done()
-
-        self._pending_tasks -= 1
-        if self._pending_tasks == 0:
-            self._finished.set()
+    def _log(self, level: str, msg: str):
+        logger.log(level, f"[{self.id_}] {msg}")
 
-    def put_nowait(self, job):
-        self._pending_tasks += 1
-        self._finished.clear()
-        return self.queues[type(job)].put_nowait(job)
 
-    async def join(self):
-        if self._pending_tasks > 0:
-            await self._finished.wait()
+class Worker(ABC):
+    @abstractmethod
+    async def run(self): ...
 
-    def qsize(self):
-        return {j: self.queues[j].qsize() for j in self.job_types}
+    def _log(self, level: str, msg: str):
+        logger.log(level, f"[Worker {self.__class__.__name__}] {msg}")
 
 
 @frozen
-class Worker(ABC):
-    queue: JobQueue
-    type_: ClassVar[type]
+class QueueWorker(Worker):
+    inbound_queue: asyncio.Queue
+    outbound_queue: asyncio.Queue | None
 
-    @abstractmethod
-    async def execute(self, job: Job) -> tuple[Any, list[Job]]: ...
-
-    async def loop(self):
+    async def run(self):
+        self._log("INFO", "booting..")
         while True:
-            # Get a "work item" out of the queue.
-            job = await self.queue.get(self.type_)
-
-            assert isinstance(job, self.type_)
-
             try:
-                res, further_jobs = await self.execute(job)
-
-                if res is not None:
-                    self._log("DEBUG", job, f"Completed job {job.__class__.__name__}")
+                await self.step()
+            except asyncio.CancelledError:
+                self._log("WARNING", "cancelled")
+                return
+            except Exception as e:
+                self._log("DEBUG", f"failed with {e.__class__.__name__}")
+
+    async def step(self):
+        job: Job = await self.inbound_queue.get()
+        assert isinstance(job, Job)
+
+        ret, further_jobs = await job.execute(**self.get_execution_context())
+        if self.outbound_queue is not None:
+            for j in further_jobs:
+                await self.outbound_queue.put(j)
+        elif further_jobs:
+            self._log(
+                "ERROR",
+                f"Could not push {len(further_jobs)} jobs because there is no outbound queue",
+            )
+        self.inbound_queue.task_done()
 
-                for j in further_jobs:
-                    self.queue.put_nowait(j)
-            except Exception:
-                ...
-
-            self.queue.task_done(self.type_)
-
-    def _log(self, level: str, job: Job, msg: str):
-        logger.log(level, f"[{job.id_}] {msg}")
+    @abstractmethod
+    def get_execution_context(self) -> dict: ...
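
Note: a minimal usage sketch of the new Job/QueueWorker contract (EchoJob, EchoWorker and demo are illustrative names only, not part of the repository):

    import asyncio
    from uuid import uuid1

    from attrs import frozen

    from media_observer.worker import Job, QueueWorker

    @frozen
    class EchoJob(Job):
        queue = asyncio.Queue()
        payload: str

        async def execute(self, *, prefix: str):
            # Dependencies arrive from the worker's execution context;
            # the return value is (result, follow-up jobs).
            print(f"{prefix}{self.payload}")
            return self.payload, []

    @frozen
    class EchoWorker(QueueWorker):
        def get_execution_context(self):
            return {"prefix": ">> "}

    async def demo():
        worker = EchoWorker(EchoJob.queue, None)           # no outbound queue: terminal stage
        await EchoJob.queue.put(EchoJob(uuid1(), "hello"))
        await worker.step()                                # pulls one job, executes it, marks it done

    asyncio.run(demo())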