Procházet zdrojové kódy

Ensure index worker is triggered on addition of embeddings

jherve před 1 rokem
rodič
revize
299feac56b
Změněn 1 soubor: 12 přidání a 2 odebrání
  1. 12 2
      src/media_observer/test.py

+ 12 - 2
src/media_observer/test.py

@@ -298,6 +298,9 @@ def batched(iterable, n):
         yield batch
 
 
+new_embeddings_event = asyncio.Event()
+
+
 @frozen
 class EmbeddingsWorker(Worker):
     storage: Storage
@@ -337,6 +340,9 @@ class EmbeddingsWorker(Worker):
 
             logger.debug(f"Stored {len(embeddings)} embeddings")
 
+            if embeddings:
+                new_embeddings_event.set()
+
         await asyncio.sleep(5)
 
 
@@ -345,13 +351,16 @@ class SimilarityIndexWorker(Worker):
     storage: Storage
 
     async def run(self):
-        sim_index = SimilaritySearch.create(self.storage)
+        await new_embeddings_event.wait()
 
+        sim_index = SimilaritySearch.create(self.storage)
         logger.info("Starting index..")
         await sim_index.add_embeddings()
         await sim_index.save()
         logger.info("Similarity index ready")
 
+        new_embeddings_event.clear()
+
 
 @frozen
 class WebServer(Worker):
@@ -374,6 +383,7 @@ async def main():
         settings.snapshots.days_in_past, settings.snapshots.hours
     )
     storage = await Storage.create()
+    new_embeddings_event.set()
     try:
         async with InternetArchiveClient.create() as ia:
             workers = {
@@ -401,7 +411,7 @@ async def main():
 
                 tasks.append(tg.create_task(web_server.run()))
                 tasks.append(tg.create_task(embeds.loop()))
-                tasks.append(tg.create_task(index.run()))
+                tasks.append(tg.create_task(index.loop()))
     finally:
         await storage.close()