|
|
@@ -213,6 +213,9 @@ class Worker(ABC):
|
|
|
@abstractmethod
|
|
|
async def run(self): ...
|
|
|
|
|
|
+ def _log(self, level: str, msg: str):
|
|
|
+ logger.log(level, f"[Worker {self.__class__.__name__}] {msg}")
|
|
|
+
|
|
|
|
|
|
@frozen
|
|
|
class QueueWorker(Worker):
|
|
|
@@ -220,21 +223,18 @@ class QueueWorker(Worker):
|
|
|
outbound_queue: asyncio.Queue | None
|
|
|
|
|
|
async def run(self):
|
|
|
- logger.info(f"Task {self.__class__.__name__} {id(self)} booting..")
|
|
|
+ self._log("INFO", "booting..")
|
|
|
while True:
|
|
|
try:
|
|
|
await self.step()
|
|
|
except asyncio.CancelledError:
|
|
|
- logger.warning(f"Task {self.__class__.__name__} {id(self)} cancelled")
|
|
|
+ self._log("WARNING", "cancelled")
|
|
|
return
|
|
|
except Exception as e:
|
|
|
traceback.print_exception(e)
|
|
|
- logger.error(
|
|
|
- f"Task {self.__class__.__name__} {id(self)} failed with {e}"
|
|
|
- )
|
|
|
+ self._log("ERROR", f"failed with {e}")
|
|
|
|
|
|
async def step(self):
|
|
|
- logger.info(f"Task {self.__class__.__name__} {id(self)} waiting for job..")
|
|
|
job: Job = await self.inbound_queue.get()
|
|
|
assert isinstance(job, Job)
|
|
|
|
|
|
@@ -243,8 +243,9 @@ class QueueWorker(Worker):
|
|
|
for j in further_jobs:
|
|
|
await self.outbound_queue.put(j)
|
|
|
elif further_jobs:
|
|
|
- logger.error(
|
|
|
- f"Could not push {len(further_jobs)} jobs because there is no outbound queue"
|
|
|
+ self._log(
|
|
|
+ "ERROR",
|
|
|
+ f"Could not push {len(further_jobs)} jobs because there is no outbound queue",
|
|
|
)
|
|
|
self.inbound_queue.task_done()
|
|
|
|