| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196 |
- import asyncio
- from datetime import date, datetime, time, timedelta
- import traceback
- from zoneinfo import ZoneInfo
- from loguru import logger
- from attrs import frozen
- from abc import ABC, abstractmethod
- from uuid import UUID, uuid1
- from media_observer.article import ArchiveCollection
- from media_observer.internet_archive import (
- InternetArchiveClient,
- SnapshotNotYetAvailable,
- )
- from media_observer.storage import Storage
- from media_observer.medias import media_collection
- from config import settings
@frozen
class Job(ABC):
    """Abstract unit of work identified by a UUID.

    Concrete jobs implement ``execute``; ``_log`` is a shared helper that
    tags every log line with the job's identifier.
    """

    # Identifier of the job; used as the prefix of messages sent via ``_log``.
    id_: UUID

    @abstractmethod
    async def execute(self, **kwargs):
        """Run the job.

        Subclasses decide which keyword arguments they accept.
        """

    def _log(self, level: str, msg: str):
        """Log ``msg`` at ``level`` through loguru, prefixed with ``[<id_>]``."""
        prefixed = f"[{self.id_}] {msg}"
        logger.log(level, prefixed)
class StupidJob(Job):
    """Trivial job whose execution only writes one info log line."""

    async def execute(self, *args, **kwargs):
        """Log that the job is executing; every argument is ignored."""
        message = f"Executing job {self.id_}.."
        logger.info(message)
def unique_id():
    """Return a newly generated identifier produced by ``uuid.uuid1``."""
    fresh = uuid1()
    return fresh
@frozen
class SnapshotSearchJob(Job):
    """Job that asks the Internet Archive for the snapshot closest to a datetime.

    One job targets one collection at one point in time.
    """

    # Collection whose ``url`` is looked up and whose ``name`` keys the storage check.
    collection: ArchiveCollection
    # Target moment; the snapshot closest to it is requested.
    dt: datetime

    @classmethod
    def create(cls, n_days: int, hours: list[int]):
        """Build one job per collection in ``media_collection`` and per target datetime.

        Args:
            n_days: Number of days to look back, today included.
            hours: Hours of the day at which a snapshot is wanted.

        Returns:
            A list of jobs, each with its own id from ``unique_id()``. Target
            datetimes are computed in each collection's ``tz``.
        """
        return [
            cls(unique_id(), c, d)
            for c in media_collection.values()
            for d in cls.last_n_days_at_hours(n_days, hours, c.tz)
        ]

    @staticmethod
    def last_n_days_at_hours(n: int, hours: list[int], tz: ZoneInfo) -> list[datetime]:
        """List the past datetimes at the given hours over the last ``n`` days.

        Args:
            n: Number of days to cover, counting back from today in ``tz``.
            hours: Hours of the day to generate for each day.
            tz: Timezone attached to the results and used to decide what
                "today" and "now" are.

        Returns:
            Timezone-aware datetimes strictly earlier than the current time,
            most recent day first, hours in the order given.
        """
        now = datetime.now(tz)
        # Take the calendar day from the tz-aware "now": the host's local date
        # can be a different day than the one currently running in ``tz``.
        today = now.date()
        candidates = (
            datetime.combine(today - timedelta(days=i), time(hour=h), tzinfo=tz)
            for i in range(n)
            for h in hours
        )
        return [candidate for candidate in candidates if candidate < now]

    async def execute(self, *, storage: Storage, ia_client: InternetArchiveClient):
        """Find the snapshot closest to ``dt`` unless a frontpage is already stored.

        Args:
            storage: Checked through ``exists_frontpage(collection.name, dt)``.
            ia_client: Queried through ``get_snapshot_id_closest_to``.

        Returns:
            ``(None, [])`` when storage already has the frontpage; otherwise
            ``(id_closest, [(id_, id_closest, collection, dt)])``.

        Raises:
            SnapshotNotYetAvailable: Re-raised after a warning is logged.
            Exception: Any other error is logged, its traceback printed, and
                then re-raised.
        """
        collection = self.collection
        dt = self.dt

        if await storage.exists_frontpage(collection.name, dt):
            return None, []

        self._log(
            "DEBUG",
            f"Start handling snap for collection {collection.name} @ {dt}",
        )

        try:
            id_closest = await ia_client.get_snapshot_id_closest_to(
                collection.url, dt
            )

            delta = dt - id_closest.timestamp
            abs_delta = abs(delta)
            if abs_delta.total_seconds() > 3600:
                # delta = target - snapshot, so a positive delta means the
                # snapshot was taken before the requested moment.
                position = "before" if delta > timedelta(0) else "after"
                self._log(
                    "WARNING",
                    f"Snapshot is {abs_delta} {position} the required timestamp ({id_closest.timestamp} instead of {dt})",
                )

            self._log("INFO", f"Got snapshot {id_closest}")
            return id_closest, [(self.id_, id_closest, collection, dt)]
        except SnapshotNotYetAvailable:
            self._log(
                "WARNING",
                f"Snapshot for {collection.name} @ {dt} not yet available",
            )
            raise
        except Exception as e:
            self._log(
                "ERROR",
                f"Error while trying to find snapshot for {collection.name} @ {dt}",
            )
            traceback.print_exception(e)
            raise
@frozen
class Worker:
    """Long-running task that calls ``run`` repeatedly until it is cancelled."""

    # Number identifying this worker in its log messages.
    i: int

    async def loop(self):
        """Invoke ``run`` over and over.

        Cancellation is logged as a warning and ends the loop; any other
        exception is logged as an error and the loop carries on.
        """
        logger.info(f"Hello from task #{self.i}")
        keep_going = True
        while keep_going:
            try:
                await self.run()
            except asyncio.CancelledError:
                logger.warning(f"Task #{self.i} cancelled")
                keep_going = False
            except Exception as err:
                logger.error(f"Task #{self.i} failed with #{err}")

    async def run(self):
        """Perform one iteration: sleep for one second, then log an info line."""
        await asyncio.sleep(1)
        logger.info(f"Task #{self.i} doing stuff")

    def get_execution_context(self) -> dict:
        """Placeholder for subclasses; this base version has no body and returns None."""
@frozen
class QueueWorker(Worker):
    """Worker whose iteration takes one job from a queue and executes it."""

    # Queue the jobs are read from; one item is consumed per ``run`` call.
    queue: asyncio.Queue

    async def run(self):
        """Wait for a job, execute it, and mark the queue item as done.

        The job receives the keyword arguments returned by
        ``get_execution_context()``.

        Raises:
            AssertionError: If the dequeued item is not a ``Job``.
            Exception: Whatever ``job.execute`` raises is propagated.
        """
        logger.info(f"Task #{self.i} waiting for job..")
        job: Job = await self.queue.get()
        try:
            assert isinstance(job, Job)
            await job.execute(**self.get_execution_context())
        finally:
            # Every successful get() must be matched by task_done(), even when
            # the job fails or the task is cancelled; otherwise the queue's
            # unfinished-task count never drops and queue.join() would block.
            self.queue.task_done()

    def get_execution_context(self):
        """Return the keyword arguments passed to ``Job.execute``; none here."""
        return {}
@frozen
class SnapshotWorker(QueueWorker):
    """Queue worker that supplies storage and an Internet Archive client to jobs."""

    # Handed to jobs under the "storage" keyword.
    storage: Storage
    # Handed to jobs under the "ia_client" keyword.
    ia_client: InternetArchiveClient

    def get_execution_context(self):
        """Return the ``storage`` and ``ia_client`` keyword arguments for jobs."""
        context = dict(storage=self.storage, ia_client=self.ia_client)
        return context
# Two independent queues, indexed 0 and 1 by the QueueWorker instances in main().
queues = [asyncio.Queue() for _ in range(2)]
# Queue that main() fills with snapshot search jobs for the SnapshotWorker.
snap_queue = asyncio.Queue()
async def main():
    """Start the workers inside a task group and feed them their first jobs.

    Two plain ``Worker`` tasks and two ``QueueWorker`` tasks (one per entry of
    ``queues``) are started, each queue receives one ``StupidJob``, and a
    ``SnapshotWorker`` is started on ``snap_queue`` with the first three
    snapshot search jobs. The storage is closed on the way out, whatever
    happens inside.
    """
    logger.info("Hello world")
    tasks = []
    jobs = SnapshotSearchJob.create(
        settings.snapshots.days_in_past, settings.snapshots.hours
    )
    storage = await Storage.create()

    try:
        async with InternetArchiveClient.create() as ia:
            worker = SnapshotWorker(15, snap_queue, storage, ia)

            async with asyncio.TaskGroup() as tg:
                # Plain workers that only sleep and log.
                for idx in range(2):
                    plain_worker = Worker(idx)
                    tasks.append(tg.create_task(plain_worker.loop()))

                # One queue worker per demo queue.
                for idx in range(2):
                    queue_worker = QueueWorker(idx, queue=queues[idx])
                    tasks.append(tg.create_task(queue_worker.loop()))

                # Give each demo queue a single trivial job.
                for demo_queue in queues:
                    await demo_queue.put(StupidJob(uuid1()))

                tasks.append(tg.create_task(worker.loop()))

                # Only the first three snapshot jobs are enqueued.
                for snapshot_job in jobs[:3]:
                    await snap_queue.put(snapshot_job)
    finally:
        await storage.close()
if __name__ == "__main__":
    # Script entry point: run main() on a fresh event loop and turn Ctrl-C
    # into a single warning line.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logger.warning("Main kbinterrupt")
|