瀏覽代碼

Add QueueWorker

jherve 1 年之前
父節點
當前提交
4626fa3588
共有 1 個文件被更改,包括 22 次插入0 次删除
  1. 22 0
      src/media_observer/test.py

+ 22 - 0
src/media_observer/test.py

@@ -21,6 +21,23 @@ class Worker:
         logger.info(f"Task #{self.i} doing stuff")
 
 
+@frozen
+class QueueWorker(Worker):
+    queue: asyncio.Queue
+
+    async def run(self):
+        logger.info(f"Task #{self.i} waiting for job..")
+        job = await self.queue.get()
+        await self.execute_job(job)
+        self.queue.task_done()
+
+    async def execute_job(self, job):
+        logger.info(f"Executing job {job}..")
+
+
+queues = [asyncio.Queue() for _ in range(0, 2)]
+
+
 async def main():
     logger.info("Hello world")
     tasks = []
@@ -28,6 +45,11 @@ async def main():
         for i in range(0, 2):
             w = Worker(i)
             tasks.append(tg.create_task(w.loop()))
+        for i in range(0, 2):
+            qw = QueueWorker(i, queue=queues[i])
+            tasks.append(tg.create_task(qw.loop()))
+        for idx, q in enumerate(queues):
+            await q.put({"test": idx})
 
 
 if __name__ == "__main__":