|
|
@@ -116,22 +116,20 @@ class SnapshotSearchJob(Job):
|
|
|
|
|
|
@frozen
|
|
|
class Worker:
|
|
|
- i: int
|
|
|
-
|
|
|
async def loop(self):
|
|
|
- logger.info(f"Hello from task #{self.i}")
|
|
|
+ logger.info(f"Hello from task #{id(self)}")
|
|
|
while True:
|
|
|
try:
|
|
|
await self.run()
|
|
|
except asyncio.CancelledError:
|
|
|
- logger.warning(f"Task #{self.i} cancelled")
|
|
|
+ logger.warning(f"Task #{id(self)} cancelled")
|
|
|
return
|
|
|
except Exception as e:
|
|
|
- logger.error(f"Task #{self.i} failed with #{e}")
|
|
|
+ logger.error(f"Task #{id(self)} failed with #{e}")
|
|
|
|
|
|
async def run(self):
|
|
|
await asyncio.sleep(1)
|
|
|
- logger.info(f"Task #{self.i} doing stuff")
|
|
|
+ logger.info(f"Task #{id(self)} doing stuff")
|
|
|
|
|
|
def get_execution_context(self) -> dict: ...
|
|
|
|
|
|
@@ -141,7 +139,7 @@ class QueueWorker(Worker):
|
|
|
queue: asyncio.Queue
|
|
|
|
|
|
async def run(self):
|
|
|
- logger.info(f"Task #{self.i} waiting for job..")
|
|
|
+ logger.info(f"Task #{id(self)} waiting for job..")
|
|
|
job: Job = await self.queue.get()
|
|
|
assert isinstance(job, Job)
|
|
|
await job.execute(**self.get_execution_context())
|
|
|
@@ -162,9 +160,6 @@ class SnapshotWorker(QueueWorker):
|
|
|
|
|
|
@frozen
|
|
|
class WebServer(Worker):
|
|
|
- # app: Any
|
|
|
- # config: Any
|
|
|
-
|
|
|
async def run(self):
|
|
|
shutdown_event = asyncio.Event()
|
|
|
|
|
|
@@ -191,17 +186,18 @@ async def main():
|
|
|
storage = await Storage.create()
|
|
|
try:
|
|
|
async with InternetArchiveClient.create() as ia:
|
|
|
- worker = SnapshotWorker(15, snap_queue, storage, ia)
|
|
|
- web_server = WebServer(16)
|
|
|
+ workers = {"snapshot": SnapshotWorker(snap_queue, storage, ia)}
|
|
|
+ web_server = WebServer()
|
|
|
async with asyncio.TaskGroup() as tg:
|
|
|
for i in range(0, 2):
|
|
|
- qw = QueueWorker(i, queue=queues[i])
|
|
|
+ qw = QueueWorker(queue=queues[i])
|
|
|
tasks.append(tg.create_task(qw.loop()))
|
|
|
for q in queues:
|
|
|
job = StupidJob(uuid1())
|
|
|
await q.put(job)
|
|
|
|
|
|
- tasks.append(tg.create_task(worker.loop()))
|
|
|
+ for w in workers.values():
|
|
|
+ tasks.append(tg.create_task(w.loop()))
|
|
|
for j in jobs[:3]:
|
|
|
await snap_queue.put(j)
|
|
|
|