|
|
@@ -24,8 +24,8 @@ class Job(ABC):
|
|
|
@abstractmethod
|
|
|
async def execute(self, **kwargs): ...
|
|
|
|
|
|
- def _log(self, level: str, job: "Job", msg: str):
|
|
|
- logger.log(level, f"[{job.id_}] {msg}")
|
|
|
+ def _log(self, level: str, msg: str):
|
|
|
+ logger.log(level, f"[{self.id_}] {msg}")
|
|
|
|
|
|
|
|
|
class StupidJob(Job):
|
|
|
@@ -75,7 +75,6 @@ class SnapshotSearchJob(Job):
|
|
|
|
|
|
self._log(
|
|
|
"DEBUG",
|
|
|
- self,
|
|
|
f"Start handling snap for collection {collection.name} @ {dt}",
|
|
|
)
|
|
|
|
|
|
@@ -90,17 +89,15 @@ class SnapshotSearchJob(Job):
|
|
|
time = "after" if delta > timedelta(0) else "before"
|
|
|
self._log(
|
|
|
"WARNING",
|
|
|
- self,
|
|
|
f"Snapshot is {abs(delta)} {time} the required timestamp ({id_closest.timestamp} instead of {self.dt})",
|
|
|
)
|
|
|
|
|
|
- self._log("INFO", self, f"Got snapshot {id_closest}")
|
|
|
+ self._log("INFO", f"Got snapshot {id_closest}")
|
|
|
return id_closest, [(self.id_, id_closest, self.collection, self.dt)]
|
|
|
|
|
|
except SnapshotNotYetAvailable as e:
|
|
|
self._log(
|
|
|
"WARNING",
|
|
|
- self,
|
|
|
f"Snapshot for {collection.name} @ {dt} not yet available",
|
|
|
)
|
|
|
raise e
|
|
|
@@ -108,7 +105,6 @@ class SnapshotSearchJob(Job):
|
|
|
except Exception as e:
|
|
|
self._log(
|
|
|
"ERROR",
|
|
|
- self,
|
|
|
f"Error while trying to find snapshot for {collection.name} @ {dt}",
|
|
|
)
|
|
|
traceback.print_exception(e)
|
|
|
@@ -134,7 +130,7 @@ class Worker:
|
|
|
await asyncio.sleep(1)
|
|
|
logger.info(f"Task #{self.i} doing stuff")
|
|
|
|
|
|
- def get_execution_context(self): ...
|
|
|
+ def get_execution_context(self) -> dict: ...
|
|
|
|
|
|
|
|
|
@frozen
|