浏览代码

Add a proper job

jherve 1 年之前
父节点
当前提交
f6e0afb5f2
共有 1 个文件被更改,包括 136 次插入12 次删除
  1. 136 12
      src/media_observer/test.py

+ 136 - 12
src/media_observer/test.py

@@ -1,16 +1,31 @@
 import asyncio
+from datetime import date, datetime, time, timedelta
+import traceback
+from zoneinfo import ZoneInfo
 from loguru import logger
 from attrs import frozen
 from abc import ABC, abstractmethod
 from uuid import UUID, uuid1
 
+from media_observer.article import ArchiveCollection
+from media_observer.internet_archive import (
+    InternetArchiveClient,
+    SnapshotNotYetAvailable,
+)
+from media_observer.storage import Storage
+from media_observer.medias import media_collection
+from config import settings
+
 
 @frozen
 class Job(ABC):
     id_: UUID
 
     @abstractmethod
-    async def execute(self, *args, **kwargs): ...
+    async def execute(self, **kwargs): ...
+
+    def _log(self, level: str, job: "Job", msg: str):
+        logger.log(level, f"[{job.id_}] {msg}")
 
 
 class StupidJob(Job):
@@ -18,6 +33,88 @@ class StupidJob(Job):
         logger.info(f"Executing job {self.id_}..")
 
 
+def unique_id():
+    return uuid1()
+
+
+@frozen
+class SnapshotSearchJob(Job):
+    collection: ArchiveCollection
+    dt: datetime
+
+    @classmethod
+    def create(cls, n_days: int, hours: list[int]):
+        return [
+            cls(unique_id(), c, d)
+            for c in media_collection.values()
+            for d in cls.last_n_days_at_hours(n_days, hours, c.tz)
+        ]
+
+    @staticmethod
+    def last_n_days_at_hours(n: int, hours: list[int], tz: ZoneInfo) -> list[datetime]:
+        now = datetime.now(tz)
+
+        return [
+            dt
+            for i in range(0, n)
+            for h in hours
+            if (
+                dt := datetime.combine(
+                    date.today() - timedelta(days=i), time(hour=h), tzinfo=tz
+                )
+            )
+            < now
+        ]
+
+    async def execute(self, *, storage: Storage, ia_client: InternetArchiveClient):
+        collection = self.collection
+        dt = self.dt
+
+        if await storage.exists_frontpage(collection.name, dt):
+            return None, []
+
+        self._log(
+            "DEBUG",
+            self,
+            f"Start handling snap for collection {collection.name} @ {dt}",
+        )
+
+        try:
+            id_closest = await ia_client.get_snapshot_id_closest_to(
+                self.collection.url, self.dt
+            )
+
+            delta = self.dt - id_closest.timestamp
+            abs_delta = abs(delta)
+            if abs_delta.total_seconds() > 3600:
+                time = "after" if delta > timedelta(0) else "before"
+                self._log(
+                    "WARNING",
+                    self,
+                    f"Snapshot is {abs(delta)} {time} the required timestamp ({id_closest.timestamp} instead of {self.dt})",
+                )
+
+            self._log("INFO", self, f"Got snapshot {id_closest}")
+            return id_closest, [(self.id_, id_closest, self.collection, self.dt)]
+
+        except SnapshotNotYetAvailable as e:
+            self._log(
+                "WARNING",
+                self,
+                f"Snapshot for {collection.name} @ {dt} not yet available",
+            )
+            raise e
+
+        except Exception as e:
+            self._log(
+                "ERROR",
+                self,
+                f"Error while trying to find snapshot for {collection.name} @ {dt}",
+            )
+            traceback.print_exception(e)
+            raise e
+
+
 @frozen
 class Worker:
     i: int
@@ -30,11 +127,15 @@ class Worker:
             except asyncio.CancelledError:
                 logger.warning(f"Task #{self.i} cancelled")
                 return
+            except Exception as e:
+                logger.error(f"Task #{self.i} failed with #{e}")
 
     async def run(self):
         await asyncio.sleep(1)
         logger.info(f"Task #{self.i} doing stuff")
 
+    def get_execution_context(self): ...
+
 
 @frozen
 class QueueWorker(Worker):
@@ -44,26 +145,49 @@ class QueueWorker(Worker):
         logger.info(f"Task #{self.i} waiting for job..")
         job: Job = await self.queue.get()
         assert isinstance(job, Job)
-        await job.execute()
+        await job.execute(**self.get_execution_context())
         self.queue.task_done()
 
+    def get_execution_context(self):
+        return {}
+
+
+@frozen
+class SnapshotWorker(QueueWorker):
+    storage: Storage
+    ia_client: InternetArchiveClient
+
+    def get_execution_context(self):
+        return {"storage": self.storage, "ia_client": self.ia_client}
+
 
 queues = [asyncio.Queue() for _ in range(0, 2)]
+snap_queue = asyncio.Queue()
 
 
 async def main():
     logger.info("Hello world")
     tasks = []
-    async with asyncio.TaskGroup() as tg:
-        for i in range(0, 2):
-            w = Worker(i)
-            tasks.append(tg.create_task(w.loop()))
-        for i in range(0, 2):
-            qw = QueueWorker(i, queue=queues[i])
-            tasks.append(tg.create_task(qw.loop()))
-        for q in queues:
-            job = StupidJob(uuid1())
-            await q.put(job)
+    jobs = SnapshotSearchJob.create(
+        settings.snapshots.days_in_past, settings.snapshots.hours
+    )
+    storage = await Storage.create()
+    async with InternetArchiveClient.create() as ia:
+        worker = SnapshotWorker(15, snap_queue, storage, ia)
+        async with asyncio.TaskGroup() as tg:
+            for i in range(0, 2):
+                w = Worker(i)
+                tasks.append(tg.create_task(w.loop()))
+            for i in range(0, 2):
+                qw = QueueWorker(i, queue=queues[i])
+                tasks.append(tg.create_task(qw.loop()))
+            for q in queues:
+                job = StupidJob(uuid1())
+                await q.put(job)
+
+            tasks.append(tg.create_task(worker.loop()))
+            for j in jobs:
+                await snap_queue.put(j)
 
 
 if __name__ == "__main__":