job_storage.py 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260
  1. import re
  2. import subprocess
  3. from pathlib import Path
  4. from urllib.parse import urlparse, ParseResult
  5. from dataclasses import dataclass, field, asdict
  6. from enum import Enum
  7. from datetime import date, datetime
  8. from email.utils import parsedate_to_datetime
  9. class JobOfferOrigin(Enum):
  10. LINKED_IN = "linked_in"
  11. OTHER = "other"
  12. class CompanyKind(Enum):
  13. SSII = "ssii"
  14. START_UP = "start_up"
  15. HEAD_HUNTER = "head_hunter"
  16. REGULAR = "regular"
  17. class ApplicationProcess(Enum):
  18. LINKED_IN_SIMPLIFIED = "linked_in_simplified"
  19. REGULAR = "regular"
  20. CAREER_SITE = "career_site"
  21. SPURIOUS = "spurious"
  22. class ContractType(Enum):
  23. CDI = "CDI"
  24. CDD = "CDD"
  25. NOT_A_JOB = "not_a_job"
  26. class Flexibility(Enum):
  27. ON_SITE = "on_site"
  28. HYBRID = "hybrid"
  29. FULL_REMOTE = "full_remote"
  30. def convert_to_parse_result(url):
  31. if isinstance(url, str):
  32. return urlparse(url)._replace(query=None)
  33. elif isinstance(url, ParseResult):
  34. return url
  35. @dataclass
  36. class JobOffer:
  37. id: str = field(init=False)
  38. url: str = field(repr=False)
  39. title: str
  40. company: str
  41. origin: JobOfferOrigin
  42. location: str
  43. application_process: ApplicationProcess | None = None
  44. company_url: str = ""
  45. description: str = ""
  46. company_kind: CompanyKind | None = None
  47. company_domain: str = ""
  48. comment: str = ""
  49. tags: list[str] = field(default_factory=list)
  50. skills: list[str] = field(default_factory=list)
  51. publication_date: date = None
  52. xp_required: int | None = None
  53. first_seen_date: datetime | None = None
  54. application_date: date | None = None
  55. application_rejection_date: date | None = None
  56. contract_type: ContractType | None = ContractType.CDI
  57. flexibility: Flexibility | None = None
  58. alternate_url: str = None
  59. _url: ParseResult = field(init=False, repr=False)
  60. _company_url: ParseResult = field(init=False, repr=False)
  61. _alternate_url: ParseResult = field(init=False, repr=False, default=None)
  62. def __post_init__(self):
  63. self._url = convert_to_parse_result(self.url)
  64. self.url = self._url.geturl()
  65. self._company_url = convert_to_parse_result(self.company_url)
  66. self.company_url = self._company_url.geturl()
  67. if self.alternate_url:
  68. self._alternate_url = convert_to_parse_result(self.alternate_url)
  69. self.alternate_url = self._alternate_url.geturl()
  70. if self.origin == JobOfferOrigin.LINKED_IN:
  71. path = Path(self._url.path)
  72. self.id = f"linked_in_{path.name}"
  73. def to_storage(self):
  74. return {
  75. k: v
  76. for k, v in asdict(self).items()
  77. if k not in ["_url", "_company_url", "_alternate_url"]
  78. }
  79. @staticmethod
  80. def from_storage(dict: dict):
  81. id = dict.pop("id")
  82. for field, converter in [
  83. ("origin", JobOfferOrigin),
  84. ("application_process", ApplicationProcess),
  85. ("company_kind", CompanyKind),
  86. ("contract_type", ContractType),
  87. ("flexibility", Flexibility),
  88. ("xp_required", int),
  89. ("first_seen_date", parsedate_to_datetime),
  90. ("publication_date", date.fromisoformat),
  91. ("application_date", date.fromisoformat),
  92. ("application_rejection_date", date.fromisoformat),
  93. ]:
  94. if field in dict:
  95. dict[field] = converter(dict[field])
  96. # For now we simply ignore application-related fields
  97. # read from the storage.
  98. for k in [
  99. "application_first_seen_date",
  100. "application_first_response_date",
  101. "application_cv_version",
  102. "application_appointments",
  103. "application_message",
  104. "application_questions",
  105. "application_url",
  106. "application_contacts",
  107. ]:
  108. try:
  109. del dict[k]
  110. except KeyError:
  111. pass
  112. return JobOffer(**dict)
  113. def remove_whitespace(s):
  114. s = re.sub(r"[^\w\s]", "", s)
  115. s = re.sub(r"\s+", "_", s)
  116. return s
  117. @dataclass
  118. class JobStorage:
  119. base_dir: Path
  120. rec_file_path: Path = field(init=False, repr=False)
  121. def __post_init__(self):
  122. if not self.base_dir.is_absolute():
  123. raise ValueError(
  124. f"The base dir path should be absolute, got '{self.base_dir}'"
  125. )
  126. self.rec_file_path = self.base_dir / "jobs.rec"
  127. # Create the rec file if it does not exist yet, otherwise
  128. # leave it as-is.
  129. try:
  130. f = open(self.rec_file_path)
  131. f.close()
  132. except FileNotFoundError:
  133. with open(self.rec_file_path, "w+") as f:
  134. f.write("%rec: job_offer\n")
  135. f.write("%key: id\n")
  136. f.write("%type: publication_date date\n")
  137. f.write("%type: company_kind enum regular head_hunter ssii start_up\n")
  138. f.write(
  139. "%type: application_process enum regular linked_in_simplified\n"
  140. )
  141. f.write("%type: xp_required range 0 MAX\n")
  142. f.write("%type: origin enum linked_in other\n")
  143. f.write("%type: contract_type enum CDI CDD not_a_job\n")
  144. f.write("%type: flexibility enum on_site hybrid full_remote\n")
  145. f.write("%type: first_seen_date date\n")
  146. f.write("%auto: first_seen_date\n")
  147. def read_all(self) -> dict[str, JobOffer]:
  148. return {r["id"]: JobOffer.from_storage(r) for r in self.select_all("job_offer")}
  149. def add_job(self, offer: JobOffer):
  150. self.insert(offer)
  151. def insert(self, offer: JobOffer):
  152. self.insert_record("job_offer", offer.to_storage())
  153. def insert_record(self, type_, fields):
  154. cmd_args = [
  155. arg
  156. for k, v in self.into_args(fields)
  157. for arg in ["-f", k, "-v", v]
  158. if v is not None and v != [] and v != ""
  159. ]
  160. cmd = (
  161. ["recins", "--verbose", "-t", type_] + cmd_args + [str(self.rec_file_path)]
  162. )
  163. process = subprocess.run(
  164. cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True
  165. )
  166. if (code := process.returncode) != 0:
  167. error_lines = process.stderr.splitlines()
  168. first = error_lines[0]
  169. if "error: invalid enum value" in first:
  170. raise ValueError(f"Found invalid enum value in {fields}")
  171. elif "error: duplicated key value in field 'id' in record" in first:
  172. raise FileExistsError(f"Duplicate value {fields['id']}")
  173. else:
  174. raise ValueError(
  175. f"insert command failed with code {code} :\n{process.stderr}"
  176. )
  177. @staticmethod
  178. def into_args(fields: dict) -> list[tuple]:
  179. args = []
  180. for k, v in fields.items():
  181. if isinstance(v, list):
  182. args += [(k, item) for item in v]
  183. elif isinstance(v, int):
  184. args += [(k, str(v))]
  185. elif isinstance(v, Enum):
  186. args += [(k, v.value)]
  187. elif isinstance(v, date):
  188. args += [(k, v.isoformat())]
  189. else:
  190. args += [(k, v)]
  191. return args
  192. def select_all(self, type_):
  193. cmd = ["recsel", "-t", type_, str(self.rec_file_path)]
  194. process = subprocess.run(cmd, stdout=subprocess.PIPE, universal_newlines=True)
  195. if (code := process.returncode) != 0:
  196. raise ValueError(f"select command failed with code {code}")
  197. dict = {}
  198. records = []
  199. for r in process.stdout.split("\n\n"):
  200. dict = {"skills": [], "tags": []}
  201. lines = re.split(r"\n(?!\+)", r)[:-1]
  202. for l in lines:
  203. # We assume fields are not empty
  204. [field, value] = l.split(": ", 1)
  205. # Handle multiline records. This will not work if the optional space if not present
  206. # after the PLUS sign.
  207. value = "\n".join(value.split("\n+ "))
  208. if field in ["skills", "tags"]:
  209. dict[field].append(value)
  210. else:
  211. dict[field] = value
  212. if lines != []:
  213. records.append(dict)
  214. return records