job_storage.py 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267
  1. import re
  2. import subprocess
  3. from pathlib import Path
  4. from urllib.parse import urlparse, ParseResult
  5. from dataclasses import dataclass, field, asdict
  6. from enum import Enum
  7. from datetime import date, datetime
  8. from email.utils import parsedate_to_datetime
  class JobOfferOrigin(Enum):
      """Where a job offer was found."""

      # Offer found on LinkedIn.
      LINKED_IN = "linked_in"

      # Offer found anywhere else.
      OTHER = "other"
  class CompanyKind(Enum):
      """Category of the company publishing an offer.

      The values are the strings stored in the ``company_kind`` field.
      """

      SSII = "ssii"

      START_UP = "start_up"

      HEAD_HUNTER = "head_hunter"

      REGULAR = "regular"
  class ApplicationProcess(Enum):
      """How one applies to an offer.

      The values are the strings stored in the ``application_process`` field.
      """

      LINKED_IN_SIMPLIFIED = "linked_in_simplified"

      REGULAR = "regular"

      CAREER_SITE = "career_site"

      SPURIOUS = "spurious"
  class ContractType(Enum):
      """Kind of contract attached to an offer.

      The values are the strings stored in the ``contract_type`` field.
      """

      CDI = "CDI"

      CDD = "CDD"

      # Marks an entry that is not an actual job.
      NOT_A_JOB = "not_a_job"
  class Flexibility(Enum):
      """Remote-work policy of an offer.

      The values are the strings stored in the ``flexibility`` field.
      """

      ON_SITE = "on_site"

      HYBRID = "hybrid"

      FULL_REMOTE = "full_remote"
  def convert_to_parse_result(url):
      """Return *url* as a ``urllib.parse.ParseResult``.

      Args:
          url: Either a URL string or an already parsed ``ParseResult``.

      Returns:
          For a string, the parsed URL with its query string removed.
          A ``ParseResult`` is returned unchanged (its query, if any, is kept).

      Raises:
          TypeError: If *url* is neither a ``str`` nor a ``ParseResult``.
      """
      if isinstance(url, ParseResult):
          return url
      if isinstance(url, str):
          # Use an empty string rather than None so that every component of
          # the result stays a str.
          return urlparse(url)._replace(query="")
      # Fail loudly here instead of returning None and letting the caller
      # crash later on ``.geturl()``.
      raise TypeError(
          f"Expected a str or a ParseResult, got {type(url).__name__}"
      )
  def convert_to_bool(s: str) -> bool:
      """Interpret a stored textual flag as a boolean.

      The comparison ignores case and surrounding whitespace, so values such
      as ``"True"`` (what ``str(True)`` produces) are recognised as well.

      Args:
          s: The text to interpret; non-string values are converted with
              ``str`` first.

      Returns:
          True if the text is ``"true"``, ``"yes"`` or ``"1"``, else False.
      """
      return str(s).strip().lower() in {"true", "yes", "1"}
  @dataclass
  class JobOffer:
      """A job offer and the state of the related application.

      ``url``, ``company_url`` and ``alternate_url`` are normalised through
      ``convert_to_parse_result`` when the instance is created; the parsed
      forms are kept in the private ``_url``, ``_company_url`` and
      ``_alternate_url`` fields, which are never written to storage.

      ``id`` is computed and cannot be passed to the constructor.
      """

      id: str = field(init=False)
      url: str = field(repr=False)
      title: str
      company: str
      origin: JobOfferOrigin
      location: str
      application_process: ApplicationProcess | None = None
      company_url: str = ""
      description: str = ""
      company_kind: CompanyKind | None = None
      company_domain: str = ""
      comment: str = ""
      tags: list[str] = field(default_factory=list)
      skills: list[str] = field(default_factory=list)
      publication_date: date | None = None
      xp_required: int | None = None
      first_seen_date: datetime | None = None
      application_considered: bool | None = None
      application_date: date | None = None
      application_rejection_date: date | None = None
      contract_type: ContractType | None = ContractType.CDI
      flexibility: Flexibility | None = None
      alternate_url: str | None = None
      _url: ParseResult = field(init=False, repr=False)
      _company_url: ParseResult = field(init=False, repr=False)
      _alternate_url: ParseResult | None = field(init=False, repr=False, default=None)

      def __post_init__(self):
          """Normalise the URLs and compute the offer id."""
          self._url = convert_to_parse_result(self.url)
          self.url = self._url.geturl()
          self._company_url = convert_to_parse_result(self.company_url)
          self.company_url = self._company_url.geturl()
          if self.alternate_url:
              self._alternate_url = convert_to_parse_result(self.alternate_url)
              self.alternate_url = self._alternate_url.geturl()

          if self.origin == JobOfferOrigin.LINKED_IN:
              # The last path component of the offer URL identifies the offer.
              self.id = f"linked_in_{Path(self._url.path).name}"
          else:
              # Always define an id: without one, ``to_storage`` and ``repr``
              # raise AttributeError for offers that do not come from LinkedIn.
              # NOTE(review): this fallback scheme (origin + slug of the URL
              # host and path) is a choice made here; confirm it suits the
              # stored data.
              slug = re.sub(r"\W+", "_", f"{self._url.netloc}{self._url.path}")
              self.id = f"{self.origin.value}_{slug.strip('_')}"

      def to_storage(self):
          """Return the fields as a dict, without the private parsed URLs."""
          private = ("_url", "_company_url", "_alternate_url")
          return {
              name: value
              for name, value in asdict(self).items()
              if name not in private
          }

      @staticmethod
      def _parse_datetime(value: str) -> datetime:
          """Parse a stored timestamp.

          The RFC 2822 form is tried first; ISO 8601 is accepted as a
          fallback because ``date.isoformat``-style values may have been
          written for datetimes.
          """
          try:
              return parsedate_to_datetime(value)
          except (TypeError, ValueError):
              return datetime.fromisoformat(value)

      @staticmethod
      def from_storage(dict: dict):
          """Build a ``JobOffer`` from a record read from the storage.

          Args:
              dict: Mapping of field names to their stored (string) values.
                  It is not modified.

          Returns:
              The offer, with typed fields converted from their textual form
              and with the stored ``id`` (when present) restored.
          """
          # Work on a copy so the caller's record is left untouched.
          record = {**dict}
          stored_id = record.pop("id", None)

          converters = (
              ("origin", JobOfferOrigin),
              ("application_process", ApplicationProcess),
              ("company_kind", CompanyKind),
              ("contract_type", ContractType),
              ("flexibility", Flexibility),
              ("xp_required", int),
              ("first_seen_date", JobOffer._parse_datetime),
              ("publication_date", date.fromisoformat),
              ("application_considered", convert_to_bool),
              ("application_date", date.fromisoformat),
              ("application_rejection_date", date.fromisoformat),
          )
          for name, convert in converters:
              if name in record:
                  record[name] = convert(record[name])

          # For now we simply ignore application-related fields
          # read from the storage.
          ignored = (
              "application_first_seen_date",
              "application_first_response_date",
              "application_cv_version",
              "application_appointments",
              "application_message",
              "application_questions",
              "application_url",
              "application_contacts",
          )
          for name in ignored:
              record.pop(name, None)

          offer = JobOffer(**record)
          if stored_id:
              # The stored key is authoritative: keep it rather than the id
              # recomputed by ``__post_init__``.
              offer.id = stored_id
          return offer
  def remove_whitespace(s):
      """Strip punctuation from *s* and turn whitespace runs into underscores.

      Every character that is neither a word character nor whitespace is
      dropped, then each run of whitespace becomes a single ``_``.
      """
      without_punctuation = re.sub(r"[^\w\s]", "", s)
      # Splitting on whitespace runs and re-joining with "_" is the same as
      # substituting each run with a single underscore.
      return "_".join(re.split(r"\s+", without_punctuation))
  @dataclass
  class JobStorage:
      """Store job offers in a ``jobs.rec`` file driven by ``recins``/``recsel``.

      Attributes are ``base_dir`` (absolute directory holding the file) and
      ``rec_file_path`` (computed as ``base_dir / "jobs.rec"``).
      """

      base_dir: Path
      rec_file_path: Path = field(init=False, repr=False)

      # Record descriptor written to a freshly created rec file. The enum
      # lists must contain every value of the matching Python enums
      # (CompanyKind, ApplicationProcess, JobOfferOrigin, ContractType,
      # Flexibility), otherwise inserting such a value is rejected.
      _SCHEMA_HEADER = (
          "%rec: job_offer\n",
          "%key: id\n",
          "%type: publication_date date\n",
          "%type: company_kind enum regular head_hunter ssii start_up\n",
          "%type: application_process enum regular linked_in_simplified"
          " career_site spurious\n",
          "%type: xp_required range 0 MAX\n",
          "%type: origin enum linked_in other\n",
          "%type: contract_type enum CDI CDD not_a_job\n",
          "%type: flexibility enum on_site hybrid full_remote\n",
          "%type: first_seen_date date\n",
          "%auto: first_seen_date\n",
      )

      def __post_init__(self):
          """Validate ``base_dir`` and create the rec file if it is missing.

          Raises:
              ValueError: If ``base_dir`` is not an absolute path.
          """
          self.base_dir = Path(self.base_dir)
          if not self.base_dir.is_absolute():
              raise ValueError(
                  f"The base dir path should be absolute, got '{self.base_dir}'"
              )
          self.rec_file_path = self.base_dir / "jobs.rec"

          # Create the rec file if it does not exist yet, otherwise
          # leave it as-is. Mode "x" makes check-and-create a single step.
          try:
              with open(self.rec_file_path, "x", encoding="utf-8") as rec_file:
                  rec_file.writelines(self._SCHEMA_HEADER)
          except FileExistsError:
              pass

      def read_all(self) -> "dict[str, JobOffer]":
          """Return every stored offer, keyed by its stored id."""
          return {
              r["id"]: JobOffer.from_storage(r) for r in self.select_all("job_offer")
          }

      def add_job(self, offer: "JobOffer"):
          """Store *offer*; alias of ``insert``."""
          self.insert(offer)

      def insert(self, offer: "JobOffer"):
          """Insert *offer* as a ``job_offer`` record."""
          self.insert_record("job_offer", offer.to_storage())

      def insert_record(self, type_, fields):
          """Insert a record of type *type_* by running ``recins``.

          Args:
              type_: Record type passed to ``recins -t``.
              fields: Mapping of field names to values; ``None``, empty-list
                  and empty-string values are not written.

          Raises:
              FileExistsError: If ``recins`` reports a duplicated ``id`` key.
              ValueError: If ``recins`` reports an invalid enum value, or
                  fails for any other reason.
          """
          cmd_args = [
              arg
              for name, value in self.into_args(fields)
              if value is not None and value != [] and value != ""
              for arg in ("-f", name, "-v", value)
          ]
          cmd = (
              ["recins", "--verbose", "-t", type_] + cmd_args + [str(self.rec_file_path)]
          )
          process = subprocess.run(cmd, capture_output=True, text=True)
          if (code := process.returncode) != 0:
              error_lines = process.stderr.splitlines()
              # stderr may be empty even though the command failed.
              first = error_lines[0] if error_lines else ""
              if "error: invalid enum value" in first:
                  raise ValueError(f"Found invalid enum value in {fields}")
              elif "error: duplicated key value in field 'id' in record" in first:
                  raise FileExistsError(f"Duplicate value {fields.get('id')}")
              else:
                  raise ValueError(
                      f"insert command failed with code {code} :\n{process.stderr}"
                  )

      @staticmethod
      def into_args(fields: dict) -> list[tuple]:
          """Flatten *fields* into ``(name, text)`` pairs for ``recins``.

          Lists yield one pair per item. Enums are written as their value,
          booleans as ``"true"``/``"false"``, integers via ``str``, datetimes
          in RFC 2822 form (readable by ``parsedate_to_datetime``), dates in
          ISO form. Any other value is passed through unchanged.
          """
          from email.utils import format_datetime

          args = []
          for name, value in fields.items():
              if isinstance(value, list):
                  args.extend((name, item) for item in value)
              elif isinstance(value, Enum):
                  args.append((name, value.value))
              elif isinstance(value, bool):
                  # bool is a subclass of int: handle it first so it is not
                  # written as "True"/"False".
                  args.append((name, "true" if value else "false"))
              elif isinstance(value, int):
                  args.append((name, str(value)))
              elif isinstance(value, datetime):
                  # datetime is a subclass of date: handle it first.
                  args.append((name, format_datetime(value)))
              elif isinstance(value, date):
                  args.append((name, value.isoformat()))
              else:
                  args.append((name, value))
          return args

      def select_all(self, type_):
          """Return all records of type *type_* as a list of dicts.

          Raises:
              ValueError: If ``recsel`` exits with a non-zero code, or if its
                  output contains a line that is not a ``name: value`` field.
          """
          cmd = ["recsel", "-t", type_, str(self.rec_file_path)]
          process = subprocess.run(cmd, capture_output=True, text=True)
          if (code := process.returncode) != 0:
              raise ValueError(
                  f"select command failed with code {code} :\n{process.stderr}"
              )
          return self._parse_records(process.stdout)

      @staticmethod
      def _parse_records(output: str) -> list[dict]:
          """Parse ``recsel`` output into one dict per record.

          ``skills`` and ``tags`` are always present and collected as lists;
          every other field maps to its (possibly multi-line) string value.
          """
          records = []
          # Records are separated by a blank line.
          for chunk in output.split("\n\n"):
              # A physical line starting with "+" continues the previous
              # field, so only split on newlines not followed by "+".
              lines = [line for line in re.split(r"\n(?!\+)", chunk) if line]
              if not lines:
                  # Trailing line breaks produce empty chunks.
                  continue
              record = {"skills": [], "tags": []}
              for line in lines:
                  name, separator, raw = line.partition(":")
                  if not separator:
                      raise ValueError(f"Malformed record line: {line!r}")
                  # Drop the single space after the colon, then unfold
                  # continuation lines ("+" with an optional space).
                  value = re.sub(r"\n\+ ?", "\n", raw.removeprefix(" "))
                  if name in ("skills", "tags"):
                      record[name].append(value)
                  else:
                      record[name] = value
              records.append(record)
          return records