# test.py
  1. import asyncio
  2. from datetime import date, datetime, time, timedelta
  3. import traceback
  4. from zoneinfo import ZoneInfo
  5. from loguru import logger
  6. from attrs import frozen
  7. from abc import ABC, abstractmethod
  8. from uuid import UUID, uuid1
  9. from media_observer.article import ArchiveCollection
  10. from media_observer.internet_archive import (
  11. InternetArchiveClient,
  12. SnapshotNotYetAvailable,
  13. )
  14. from media_observer.storage import Storage
  15. from media_observer.medias import media_collection
  16. from config import settings
  17. @frozen
  18. class Job(ABC):
  19. id_: UUID
  20. @abstractmethod
  21. async def execute(self, **kwargs): ...
  22. def _log(self, level: str, msg: str):
  23. logger.log(level, f"[{self.id_}] {msg}")
  24. class StupidJob(Job):
  25. async def execute(self, *args, **kwargs):
  26. logger.info(f"Executing job {self.id_}..")
  27. def unique_id():
  28. return uuid1()
  29. @frozen
  30. class SnapshotSearchJob(Job):
  31. collection: ArchiveCollection
  32. dt: datetime
  33. @classmethod
  34. def create(cls, n_days: int, hours: list[int]):
  35. return [
  36. cls(unique_id(), c, d)
  37. for c in media_collection.values()
  38. for d in cls.last_n_days_at_hours(n_days, hours, c.tz)
  39. ]
  40. @staticmethod
  41. def last_n_days_at_hours(n: int, hours: list[int], tz: ZoneInfo) -> list[datetime]:
  42. now = datetime.now(tz)
  43. return [
  44. dt
  45. for i in range(0, n)
  46. for h in hours
  47. if (
  48. dt := datetime.combine(
  49. date.today() - timedelta(days=i), time(hour=h), tzinfo=tz
  50. )
  51. )
  52. < now
  53. ]
  54. async def execute(self, *, storage: Storage, ia_client: InternetArchiveClient):
  55. collection = self.collection
  56. dt = self.dt
  57. if await storage.exists_frontpage(collection.name, dt):
  58. return None, []
  59. self._log(
  60. "DEBUG",
  61. f"Start handling snap for collection {collection.name} @ {dt}",
  62. )
  63. try:
  64. id_closest = await ia_client.get_snapshot_id_closest_to(
  65. self.collection.url, self.dt
  66. )
  67. delta = self.dt - id_closest.timestamp
  68. abs_delta = abs(delta)
  69. if abs_delta.total_seconds() > 3600:
  70. time = "after" if delta > timedelta(0) else "before"
  71. self._log(
  72. "WARNING",
  73. f"Snapshot is {abs(delta)} {time} the required timestamp ({id_closest.timestamp} instead of {self.dt})",
  74. )
  75. self._log("INFO", f"Got snapshot {id_closest}")
  76. return id_closest, [(self.id_, id_closest, self.collection, self.dt)]
  77. except SnapshotNotYetAvailable as e:
  78. self._log(
  79. "WARNING",
  80. f"Snapshot for {collection.name} @ {dt} not yet available",
  81. )
  82. raise e
  83. except Exception as e:
  84. self._log(
  85. "ERROR",
  86. f"Error while trying to find snapshot for {collection.name} @ {dt}",
  87. )
  88. traceback.print_exception(e)
  89. raise e
  90. @frozen
  91. class Worker:
  92. i: int
  93. async def loop(self):
  94. logger.info(f"Hello from task #{self.i}")
  95. while True:
  96. try:
  97. await self.run()
  98. except asyncio.CancelledError:
  99. logger.warning(f"Task #{self.i} cancelled")
  100. return
  101. except Exception as e:
  102. logger.error(f"Task #{self.i} failed with #{e}")
  103. async def run(self):
  104. await asyncio.sleep(1)
  105. logger.info(f"Task #{self.i} doing stuff")
  106. def get_execution_context(self) -> dict: ...
  107. @frozen
  108. class QueueWorker(Worker):
  109. queue: asyncio.Queue
  110. async def run(self):
  111. logger.info(f"Task #{self.i} waiting for job..")
  112. job: Job = await self.queue.get()
  113. assert isinstance(job, Job)
  114. await job.execute(**self.get_execution_context())
  115. self.queue.task_done()
  116. def get_execution_context(self):
  117. return {}
  118. @frozen
  119. class SnapshotWorker(QueueWorker):
  120. storage: Storage
  121. ia_client: InternetArchiveClient
  122. def get_execution_context(self):
  123. return {"storage": self.storage, "ia_client": self.ia_client}
  124. queues = [asyncio.Queue() for _ in range(0, 2)]
  125. snap_queue = asyncio.Queue()
  126. async def main():
  127. logger.info("Hello world")
  128. tasks = []
  129. jobs = SnapshotSearchJob.create(
  130. settings.snapshots.days_in_past, settings.snapshots.hours
  131. )
  132. storage = await Storage.create()
  133. try:
  134. async with InternetArchiveClient.create() as ia:
  135. worker = SnapshotWorker(15, snap_queue, storage, ia)
  136. async with asyncio.TaskGroup() as tg:
  137. for i in range(0, 2):
  138. w = Worker(i)
  139. tasks.append(tg.create_task(w.loop()))
  140. for i in range(0, 2):
  141. qw = QueueWorker(i, queue=queues[i])
  142. tasks.append(tg.create_task(qw.loop()))
  143. for q in queues:
  144. job = StupidJob(uuid1())
  145. await q.put(job)
  146. tasks.append(tg.create_task(worker.loop()))
  147. for j in jobs[:3]:
  148. await snap_queue.put(j)
  149. finally:
  150. await storage.close()
  151. if __name__ == "__main__":
  152. try:
  153. asyncio.run(main())
  154. except KeyboardInterrupt:
  155. logger.warning("Main kbinterrupt")