writer.py 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475
  1. import sys
  2. import traceback
  3. import asyncio
  4. from pathlib import Path
  5. from dataclasses import asdict
  6. from watchfiles import awatch
  7. from job_search.read_write import ReadWriter
  8. from job_search.job_storage import JobStorage
  9. from job_search.messages import (
  10. AddJobMessage,
  11. InitialConfigurationMessage,
  12. StorageReadyMessage,
  13. StorageUpdatedMessage,
  14. ListJobsRequestMessage,
  15. JobOfferListMessage,
  16. LogMessage,
  17. Message,
  18. JobAddedMessage,
  19. JobAlreadyExistsMessage,
  20. )
  21. class Application:
  22. def __init__(self, stdin, stdout):
  23. self.read_writer = ReadWriter(stdin, stdout)
  24. self.job_storage = None
  25. def handle_message(self, message: Message):
  26. match message:
  27. case AddJobMessage():
  28. try:
  29. self.job_storage.insert_record("job_offer", asdict(message))
  30. self.read_writer.send_message(JobAddedMessage(message.id))
  31. except FileExistsError as e:
  32. self.read_writer.send_message(JobAlreadyExistsMessage(message.id))
  33. case ListJobsRequestMessage():
  34. offers = list(self.job_storage.read_all().values())
  35. self.read_writer.send_message(JobOfferListMessage(offers))
  36. case InitialConfigurationMessage(jobs_path):
  37. self.job_storage = JobStorage(base_dir=Path(jobs_path))
  38. self.read_writer.send_message(StorageReadyMessage())
  39. async def loop_on_messages(self):
  40. loop = asyncio.get_running_loop()
  41. while True:
  42. try:
  43. received_message = await loop.run_in_executor(
  44. None, self.read_writer.get_message
  45. )
  46. self.handle_message(received_message)
  47. except Exception as e:
  48. exc_info = sys.exc_info()
  49. tb = "".join(traceback.format_exception(*exc_info))
  50. self.read_writer.send_message(LogMessage.error(content=tb))
  51. async def watch_changes(self):
  52. while True:
  53. await asyncio.sleep(1)
  54. if self.job_storage:
  55. async for changes in awatch(self.job_storage.rec_file_path):
  56. self.read_writer.send_message(StorageUpdatedMessage())
  57. async def aloop(self):
  58. async with asyncio.TaskGroup() as tg:
  59. task1 = tg.create_task(self.loop_on_messages())
  60. task2 = tg.create_task(self.watch_changes())
  if __name__ == "__main__":
      # Script entry point: bind the application to the process's standard
      # streams and run its async main loop until it completes.
      app = Application(stdin=sys.stdin, stdout=sys.stdout)
      asyncio.run(app.aloop())