|
|
@@ -208,22 +208,23 @@ class SnapshotStoreJob(Job):
|
|
|
|
|
|
|
|
|
@frozen
|
|
|
-class Worker:
|
|
|
+class Worker(ABC):
|
|
|
async def loop(self):
|
|
|
- logger.info(f"Hello from task #{id(self)}")
|
|
|
+ logger.info(f"Task {self.__class__.__name__} {id(self)} booting..")
|
|
|
while True:
|
|
|
try:
|
|
|
await self.run()
|
|
|
except asyncio.CancelledError:
|
|
|
- logger.warning(f"Task #{id(self)} cancelled")
|
|
|
+ logger.warning(f"Task {self.__class__.__name__} {id(self)} cancelled")
|
|
|
return
|
|
|
except Exception as e:
|
|
|
traceback.print_exception(e)
|
|
|
- logger.error(f"Task #{id(self)} failed with #{e}")
|
|
|
+ logger.error(
|
|
|
+ f"Task {self.__class__.__name__} {id(self)} failed with {e}"
|
|
|
+ )
|
|
|
|
|
|
- async def run(self):
|
|
|
- await asyncio.sleep(1)
|
|
|
- logger.info(f"Task #{id(self)} doing stuff")
|
|
|
+ @abstractmethod
|
|
|
+ async def run(self): ...
|
|
|
|
|
|
def get_execution_context(self) -> dict: ...
|
|
|
|
|
|
@@ -234,7 +235,7 @@ class QueueWorker(Worker):
|
|
|
outbound_queue: asyncio.Queue | None
|
|
|
|
|
|
async def run(self):
|
|
|
- logger.info(f"Task #{id(self)} waiting for job..")
|
|
|
+ logger.info(f"Task {self.__class__.__name__} {id(self)} waiting for job..")
|
|
|
job: Job = await self.inbound_queue.get()
|
|
|
assert isinstance(job, Job)
|
|
|
|
|
|
@@ -244,7 +245,7 @@ class QueueWorker(Worker):
|
|
|
await self.outbound_queue.put(j)
|
|
|
except AttributeError as e:
|
|
|
logger.error(
|
|
|
- f"Could not push jobs #{further_jobs} because there is no outbound queue"
|
|
|
+ f"Could not push jobs {further_jobs} because there is no outbound queue"
|
|
|
)
|
|
|
raise (e)
|
|
|
self.inbound_queue.task_done()
|
|
|
@@ -298,7 +299,6 @@ class WebServer(Worker):
|
|
|
|
|
|
|
|
|
async def main():
|
|
|
- logger.info("Hello world")
|
|
|
tasks = []
|
|
|
jobs = SnapshotSearchJob.create(
|
|
|
settings.snapshots.days_in_past, settings.snapshots.hours
|
|
|
@@ -323,7 +323,6 @@ async def main():
|
|
|
for j in jobs[:3]:
|
|
|
await SnapshotSearchJob.queue.put(j)
|
|
|
|
|
|
- # THIS TASK DOES NOT HANDLE CANCELLATION SIGNAL
|
|
|
tasks.append(tg.create_task(web_server.run()))
|
|
|
finally:
|
|
|
await storage.close()
|