|
@@ -252,6 +252,38 @@ class QueueWorker(Worker):
|
|
|
def get_execution_context(self) -> dict: ...
|
|
def get_execution_context(self) -> dict: ...
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
+@frozen
|
|
|
|
|
+class SnapshotWatchdog(Worker):
|
|
|
|
|
+ snapshot_queue: asyncio.Queue
|
|
|
|
|
+
|
|
|
|
|
+ async def run(self):
|
|
|
|
|
+ await self._push_new_jobs()
|
|
|
|
|
+
|
|
|
|
|
+ while True:
|
|
|
|
|
+ sleep_time_s = self._seconds_until_next_full_hour()
|
|
|
|
|
+ await asyncio.sleep(sleep_time_s)
|
|
|
|
|
+ self._log("INFO", f"Woke up at {datetime.now()}")
|
|
|
|
|
+ await self._push_new_jobs()
|
|
|
|
|
+
|
|
|
|
|
+ async def _push_new_jobs(self):
|
|
|
|
|
+ initial_jobs = SnapshotSearchJob.create(
|
|
|
|
|
+ settings.snapshots.days_in_past, settings.snapshots.hours
|
|
|
|
|
+ )
|
|
|
|
|
+ for j in initial_jobs:
|
|
|
|
|
+ await self.snapshot_queue.put(j)
|
|
|
|
|
+
|
|
|
|
|
+ @staticmethod
|
|
|
|
|
+ def _seconds_until_next_full_hour() -> float:
|
|
|
|
|
+ now = datetime.now()
|
|
|
|
|
+ next_tick = timedelta(
|
|
|
|
|
+ hours=1,
|
|
|
|
|
+ minutes=-now.minute,
|
|
|
|
|
+ seconds=-now.second,
|
|
|
|
|
+ microseconds=-now.microsecond,
|
|
|
|
|
+ )
|
|
|
|
|
+ return next_tick / timedelta(microseconds=1) / 1e6
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
@frozen
|
|
@frozen
|
|
|
class SnapshotWorker(QueueWorker):
|
|
class SnapshotWorker(QueueWorker):
|
|
|
storage: Storage
|
|
storage: Storage
|
|
@@ -390,7 +422,8 @@ class MediaObserverApplication:
|
|
|
new_embeddings_event.set()
|
|
new_embeddings_event.set()
|
|
|
|
|
|
|
|
snapshots_workers = (
|
|
snapshots_workers = (
|
|
|
- [
|
|
|
|
|
|
|
+ [SnapshotWatchdog(SnapshotSearchJob.queue)]
|
|
|
|
|
+ + [
|
|
|
SnapshotWorker(
|
|
SnapshotWorker(
|
|
|
SnapshotSearchJob.queue, SnapshotFetchJob.queue, storage, ia
|
|
SnapshotSearchJob.queue, SnapshotFetchJob.queue, storage, ia
|
|
|
)
|
|
)
|
|
@@ -410,14 +443,12 @@ class MediaObserverApplication:
|
|
|
new_embeddings_event,
|
|
new_embeddings_event,
|
|
|
)
|
|
)
|
|
|
index = SimilarityIndexWorker(storage, new_embeddings_event)
|
|
index = SimilarityIndexWorker(storage, new_embeddings_event)
|
|
|
|
|
+ watchdog = SnapshotWatchdog(SnapshotSearchJob.queue)
|
|
|
return MediaObserverApplication(snapshots_workers, web_server, embeds, index)
|
|
return MediaObserverApplication(snapshots_workers, web_server, embeds, index)
|
|
|
|
|
|
|
|
|
|
|
|
|
async def main():
|
|
async def main():
|
|
|
tasks = []
|
|
tasks = []
|
|
|
- jobs = SnapshotSearchJob.create(
|
|
|
|
|
- settings.snapshots.days_in_past, settings.snapshots.hours
|
|
|
|
|
- )
|
|
|
|
|
storage = await Storage.create()
|
|
storage = await Storage.create()
|
|
|
|
|
|
|
|
try:
|
|
try:
|
|
@@ -428,9 +459,6 @@ async def main():
|
|
|
for w in app.workers:
|
|
for w in app.workers:
|
|
|
tasks.append(tg.create_task(w.run()))
|
|
tasks.append(tg.create_task(w.run()))
|
|
|
|
|
|
|
|
- for j in jobs:
|
|
|
|
|
- await SnapshotSearchJob.queue.put(j)
|
|
|
|
|
-
|
|
|
|
|
finally:
|
|
finally:
|
|
|
await storage.close()
|
|
await storage.close()
|
|
|
|
|
|