start.py 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334
  1. import argparse
  2. import os
  3. import sys
  4. import subprocess
  5. import socket
  6. from pathlib import Path, PurePosixPath
  7. from dataclasses import dataclass
  8. from typing import Any
  9. is_windows = os.name == "nt"
  10. def read_data_sources(hostname: str, login: str) -> list[Path]:
  11. file = Path(f"./data_sources_{hostname}_{login}")
  12. with open(file) as f:
  13. paths = f.readlines()
  14. return [Path(p_str.strip()).expanduser() for p_str in paths]
  15. @dataclass
  16. class KeePass:
  17. path: Path
  18. bin: str | Path
  19. def read_entry_attribute(self, key, attribute):
  20. return self._exec(["show", "-a", attribute, self.path, key]).strip()
  21. def read_entry_attachment(self, key, attachment):
  22. return self._exec(
  23. ["attachment-export", "--stdout", self.path, key, attachment, "/dev/null"]
  24. )
  25. def _exec(self, args: list[Any]):
  26. try:
  27. return subprocess.check_output([self.bin] + args, text=True)
  28. except subprocess.CalledProcessError as e:
  29. print("\nThere was an error on call to keepass, please check the outout")
  30. exit(1)
  31. @classmethod
  32. def new(cls, path: Path):
  33. binary = (
  34. Path("C:\\") / "Program Files" / "KeePassXC" / "keepassxc-cli.exe"
  35. if is_windows
  36. else "keepassxc-cli"
  37. )
  38. return cls(path=path, bin=binary)
  39. @dataclass
  40. class Secret:
  41. name: str
  42. mode: int
  43. def create(self, keepass: KeePass): ...
  44. @classmethod
  45. def from_line(cls, line: str):
  46. type_, *args = line.split(",")
  47. match type_:
  48. case "file":
  49. sub_class = SecretFile
  50. case "keepass-attribute":
  51. sub_class = SecretKeepassAttribute
  52. case "keepass-attachment":
  53. sub_class = SecretKeepassAttachment
  54. return sub_class.from_line(*args)
  55. @classmethod
  56. def read_sources(cls, hostname: str, login: str) -> list["Secret"]:
  57. file = Path(f"./secret_sources_{hostname}_{login}")
  58. with open(file) as f:
  59. lines = f.readlines()
  60. return [cls.from_line(l.strip()) for l in lines]
  61. @dataclass
  62. class SecretKeepassAttachment(Secret):
  63. key: str
  64. attachment: str
  65. def create(self, keepass: KeePass):
  66. value = keepass.read_entry_attachment(self.key, self.attachment)
  67. args = ["podman", "secret", "create", "--replace", self.name, "-"]
  68. print(args)
  69. subprocess.run(args, input=value.encode())
  70. @classmethod
  71. def from_line(cls, key, attachment):
  72. return cls(name=key, key=key, mode=0o0400, attachment=attachment)
  73. @dataclass
  74. class SecretKeepassAttribute(Secret):
  75. key: str
  76. attribute: str
  77. def create(self, keepass: KeePass):
  78. value = keepass.read_entry_attribute(self.key, self.attribute)
  79. args = ["podman", "secret", "create", "--replace", self.name, "-"]
  80. print(args)
  81. subprocess.run(args, input=value.encode())
  82. @classmethod
  83. def from_line(cls, key, attribute):
  84. return cls(name=key, key=key, mode=0o0400, attribute=attribute)
  85. @dataclass
  86. class SecretFile(Secret):
  87. host_path: Path
  88. def create(self, keepass: KeePass):
  89. args = ["podman", "secret", "create", "--replace", self.name, self.host_path]
  90. print(args)
  91. subprocess.run(args)
  92. @classmethod
  93. def from_line(cls, path: str):
  94. path = Path(path).expanduser()
  95. return cls(host_path=path, name=path.name, mode=0o0400)
  96. @dataclass
  97. class BorgmaticContainer:
  98. hostname: str
  99. login: str
  100. name: str
  101. image: str = "ghcr.io/borgmatic-collective/borgmatic"
  102. def run(self, data_sources: list[Path], secret_sources: list[Secret]):
  103. container_name = f"borgmatic_{self.login}"
  104. ssh_auth_sock = os.getenv("SSH_AUTH_SOCK")
  105. data_path = Path.cwd() / "pc_backup" / "data"
  106. config_d_path = data_path / "borgmatic.d"
  107. config_path = data_path / "borgmatic"
  108. history_file = data_path / ".bash_history"
  109. history_file.touch()
  110. volumes = [
  111. f"{config_d_path}:/etc/borgmatic.d/",
  112. f"{config_path}:/etc/borgmatic/",
  113. f"{history_file}:/root/.bash_history",
  114. "borg_ssh_dir:/root/.ssh",
  115. "borg_config:/root/.config/borg",
  116. "borg_cache:/root/.cache/borg",
  117. "borgmatic_state:/root/.local/state/borgmatic",
  118. ]
  119. if ssh_auth_sock:
  120. volumes += [f"{ssh_auth_sock}:{ssh_auth_sock}:Z"]
  121. volumes += [f"{vol}:{self.to_source_path(vol)}:ro" for vol in data_sources]
  122. volume_args = [a for vol in volumes for a in ["-v", vol]]
  123. secrets_args = [
  124. a
  125. for s in secret_sources
  126. for a in ["--secret", f"{s.name},mode=0{s.mode:o}"]
  127. ]
  128. args = (
  129. [
  130. "podman",
  131. "run",
  132. "-h",
  133. self.hostname,
  134. "--detach",
  135. "--name",
  136. container_name,
  137. "-e",
  138. "SSH_AUTH_SOCK",
  139. "-e",
  140. "TZ=Europe/Paris",
  141. "-e",
  142. "SSH_KEY_NAME",
  143. "-e",
  144. f"HOST_LOGIN={self.login}",
  145. "--security-opt=label=disable",
  146. ]
  147. + volume_args
  148. + secrets_args
  149. + [self.image]
  150. )
  151. print(args)
  152. subprocess.run(args)
  153. def rm(self):
  154. subprocess.run(["podman", "rm", "-f", self.name])
  155. def exec(self, cmd: list[str], env_vars: list[str] = []):
  156. args = ["podman", "exec", "-ti"]
  157. args += [a for var in env_vars for a in ["-e", var]]
  158. subprocess.run(args + [self.name] + cmd)
  159. @staticmethod
  160. def to_source_path(path: Path):
  161. mount_base = PurePosixPath("/mnt") / "source"
  162. inner_path = PurePosixPath(path)
  163. with_drive = PurePosixPath(inner_path.parts[0].replace(":", "")).joinpath(
  164. *inner_path.parts[1:]
  165. )
  166. return mount_base / with_drive.relative_to(with_drive.anchor)
  167. @classmethod
  168. def new(cls, hostname: str, login: str):
  169. return cls(hostname, login, f"borgmatic_{login}")
  170. class CliArguments:
  171. @staticmethod
  172. def read_command(parser):
  173. args = parser.parse_args()
  174. return args.command(args)
  175. @staticmethod
  176. def new() -> argparse.ArgumentParser:
  177. parser = argparse.ArgumentParser(prog=sys.argv[0])
  178. subparsers = parser.add_subparsers()
  179. for sub in [
  180. CommandStart,
  181. CommandRm,
  182. CommandBash,
  183. CommandCreateRepo,
  184. CommandExportKey,
  185. CommandCreateSecrets,
  186. ]:
  187. p = subparsers.add_parser(sub.command, help=sub.help)
  188. sub.init_subparser(p)
  189. p.set_defaults(command=sub)
  190. return parser
  191. class Command:
  192. def __init__(self, namespace) -> None:
  193. for k, v in vars(namespace).items():
  194. if k != "type_":
  195. setattr(self, k, v)
  196. @classmethod
  197. def init_subparser(cls, p): ...
  198. class CommandStart(Command):
  199. command = "start"
  200. help = "start container"
  201. def run(
  202. self,
  203. *,
  204. container: BorgmaticContainer,
  205. data_sources: list[Path],
  206. secret_sources: list[Secret],
  207. **kwargs,
  208. ):
  209. container.run(data_sources, secret_sources)
  210. class CommandRm(Command):
  211. command = "rm"
  212. help = "remove container"
  213. def run(self, *, container: BorgmaticContainer, **kwargs):
  214. container.rm()
  215. class CommandBash(Command):
  216. command = "bash"
  217. help = "run shell in container"
  218. def run(self, *, container: BorgmaticContainer, **kwargs):
  219. container.exec(["bash"])
  220. class CommandCreateRepo(Command):
  221. command = "create_repo"
  222. help = "create repository"
  223. env_vars = ["BORG_PASSPHRASE_NAME", "STORAGE_BOX_USER", "SSH_KEY_NAME"]
  224. def run(self, *, container: BorgmaticContainer, **kwargs):
  225. container.exec(
  226. ["borgmatic", "repo-create", "--encryption", "repokey"], self.env_vars
  227. )
  228. class CommandExportKey(Command):
  229. command = "export_key"
  230. help = "export the repository key"
  231. env_vars = ["BORG_PASSPHRASE_NAME", "STORAGE_BOX_USER", "SSH_KEY_NAME"]
  232. def run(self, *, container: BorgmaticContainer, **kwargs):
  233. container.exec(["borgmatic", "export", "key"], self.env_vars)
  234. class CommandCreateSecrets(Command):
  235. command = "create_secrets"
  236. help = "create podman secrets"
  237. def run(self, *, secret_sources: list[Secret], **kwargs):
  238. keepass = KeePass.new(self.keepass_path)
  239. for s in secret_sources:
  240. s.create(keepass)
  241. @classmethod
  242. def init_subparser(cls, p):
  243. p.add_argument("keepass_path", type=Path, help="Path to the keepass")
  244. def main():
  245. login = os.getlogin()
  246. hostname = socket.gethostname()
  247. secret_sources = Secret.read_sources(hostname, login)
  248. data_sources = read_data_sources(hostname, login)
  249. if not secret_sources:
  250. print("no secret required ?")
  251. container = BorgmaticContainer.new(hostname, login)
  252. parser = CliArguments.new()
  253. command = CliArguments.read_command(parser)
  254. command.run(
  255. secret_sources=secret_sources,
  256. data_sources=data_sources,
  257. container=container,
  258. )
  259. if __name__ == "__main__":
  260. main()