Przeglądaj źródła

Move queue definition to class attribute of Job

jherve 1 rok temu
rodzic
commit
754977a5b0
1 zmienionych plików z 5 dodań i 5 usunięć
  1. 5 5
      src/media_observer/test.py

+ 5 - 5
src/media_observer/test.py

@@ -1,6 +1,7 @@
 import asyncio
 import asyncio
 from datetime import date, datetime, time, timedelta
 from datetime import date, datetime, time, timedelta
 import traceback
 import traceback
+from typing import ClassVar
 from zoneinfo import ZoneInfo
 from zoneinfo import ZoneInfo
 from loguru import logger
 from loguru import logger
 from attrs import frozen
 from attrs import frozen
@@ -23,6 +24,7 @@ from config import settings
 @frozen
 @frozen
 class Job(ABC):
 class Job(ABC):
     id_: UUID
     id_: UUID
+    queue: ClassVar[asyncio.Queue]
 
 
     @abstractmethod
     @abstractmethod
     async def execute(self, **kwargs): ...
     async def execute(self, **kwargs): ...
@@ -42,6 +44,7 @@ def unique_id():
 
 
 @frozen
 @frozen
 class SnapshotSearchJob(Job):
 class SnapshotSearchJob(Job):
+    queue = asyncio.Queue()
     collection: ArchiveCollection
     collection: ArchiveCollection
     dt: datetime
     dt: datetime
 
 
@@ -173,9 +176,6 @@ class WebServer(Worker):
             return
             return
 
 
 
 
-snap_queue = asyncio.Queue()
-
-
 async def main():
 async def main():
     logger.info("Hello world")
     logger.info("Hello world")
     tasks = []
     tasks = []
@@ -185,13 +185,13 @@ async def main():
     storage = await Storage.create()
     storage = await Storage.create()
     try:
     try:
         async with InternetArchiveClient.create() as ia:
         async with InternetArchiveClient.create() as ia:
-            workers = {"snapshot": SnapshotWorker(snap_queue, storage, ia)}
+            workers = {"snapshot": SnapshotWorker(SnapshotSearchJob.queue, storage, ia)}
             web_server = WebServer()
             web_server = WebServer()
             async with asyncio.TaskGroup() as tg:
             async with asyncio.TaskGroup() as tg:
                 for w in workers.values():
                 for w in workers.values():
                     tasks.append(tg.create_task(w.loop()))
                     tasks.append(tg.create_task(w.loop()))
                 for j in jobs[:3]:
                 for j in jobs[:3]:
-                    await snap_queue.put(j)
+                    await SnapshotSearchJob.queue.put(j)
 
 
                 # THIS TASK DOES NOT HANDLE CANCELLATION SIGNAL
                 # THIS TASK DOES NOT HANDLE CANCELLATION SIGNAL
                 tasks.append(tg.create_task(web_server.run()))
                 tasks.append(tg.create_task(web_server.run()))