|
|
@@ -211,11 +211,20 @@ class SnapshotStoreJob(Job):
|
|
|
|
|
|
@frozen
|
|
|
class Worker(ABC):
|
|
|
- async def loop(self):
|
|
|
+ @abstractmethod
|
|
|
+ async def run(self): ...
|
|
|
+
|
|
|
+
|
|
|
+@frozen
|
|
|
+class QueueWorker(Worker):
|
|
|
+ inbound_queue: asyncio.Queue
|
|
|
+ outbound_queue: asyncio.Queue | None
|
|
|
+
|
|
|
+ async def run(self):
|
|
|
logger.info(f"Task {self.__class__.__name__} {id(self)} booting..")
|
|
|
while True:
|
|
|
try:
|
|
|
- await self.run()
|
|
|
+ await self.step()
|
|
|
except asyncio.CancelledError:
|
|
|
logger.warning(f"Task {self.__class__.__name__} {id(self)} cancelled")
|
|
|
return
|
|
|
@@ -225,18 +234,7 @@ class Worker(ABC):
|
|
|
f"Task {self.__class__.__name__} {id(self)} failed with {e}"
|
|
|
)
|
|
|
|
|
|
- @abstractmethod
|
|
|
- async def run(self): ...
|
|
|
-
|
|
|
- def get_execution_context(self) -> dict: ...
|
|
|
-
|
|
|
-
|
|
|
-@frozen
|
|
|
-class QueueWorker(Worker):
|
|
|
- inbound_queue: asyncio.Queue
|
|
|
- outbound_queue: asyncio.Queue | None
|
|
|
-
|
|
|
- async def run(self):
|
|
|
+ async def step(self):
|
|
|
logger.info(f"Task {self.__class__.__name__} {id(self)} waiting for job..")
|
|
|
job: Job = await self.inbound_queue.get()
|
|
|
assert isinstance(job, Job)
|
|
|
@@ -251,8 +249,8 @@ class QueueWorker(Worker):
|
|
|
)
|
|
|
self.inbound_queue.task_done()
|
|
|
|
|
|
- def get_execution_context(self):
|
|
|
- return {}
|
|
|
+ @abstractmethod
|
|
|
+ def get_execution_context(self) -> dict: ...
|
|
|
|
|
|
|
|
|
@frozen
|
|
|
@@ -273,7 +271,9 @@ class FetchWorker(QueueWorker):
|
|
|
|
|
|
|
|
|
@frozen
|
|
|
-class ParseWorker(QueueWorker): ...
|
|
|
+class ParseWorker(QueueWorker):
|
|
|
+ def get_execution_context(self):
|
|
|
+ return {}
|
|
|
|
|
|
|
|
|
@frozen
|
|
|
@@ -419,9 +419,13 @@ async def main():
|
|
|
app = await MediaObserverApplication.create(storage, ia)
|
|
|
|
|
|
async with asyncio.TaskGroup() as tg:
|
|
|
- for w in list(app.workers.values()) + [app.embeds, app.index]:
|
|
|
- tasks.append(tg.create_task(w.loop()))
|
|
|
- tasks.append(tg.create_task(app.web_server.run()))
|
|
|
+ for w in list(app.workers.values()) + [
|
|
|
+ app.embeds,
|
|
|
+ app.index,
|
|
|
+ app.web_server,
|
|
|
+ ]:
|
|
|
+ tasks.append(tg.create_task(w.run()))
|
|
|
+
|
|
|
for j in jobs:
|
|
|
await SnapshotSearchJob.queue.put(j)
|
|
|
|