start.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362
  1. import argparse
  2. import os
  3. import sys
  4. import subprocess
  5. import socket
  6. from pathlib import Path, PurePosixPath
  7. from dataclasses import dataclass
  8. from typing import Any
  9. is_windows = os.name == "nt"
  10. def read_data_sources(file: Path) -> list[Path]:
  11. with open(file) as f:
  12. paths = f.readlines()
  13. return [Path(p_str.strip()).expanduser() for p_str in paths]
  14. @dataclass
  15. class KeePass:
  16. path: Path
  17. bin: str | Path
  18. def read_entry_attribute(self, key, attribute):
  19. return self._exec(["show", "-a", attribute, self.path, key]).strip()
  20. def read_entry_attachment(self, key, attachment):
  21. return self._exec(
  22. ["attachment-export", "--stdout", self.path, key, attachment, "/dev/null"]
  23. )
  24. def _exec(self, args: list[Any]):
  25. try:
  26. return subprocess.check_output([self.bin] + args, text=True)
  27. except subprocess.CalledProcessError as e:
  28. print("\nThere was an error on call to keepass, please check the outout")
  29. exit(1)
  30. @classmethod
  31. def new(cls, path: Path):
  32. binary = (
  33. Path("C:\\") / "Program Files" / "KeePassXC" / "keepassxc-cli.exe"
  34. if is_windows
  35. else "keepassxc-cli"
  36. )
  37. return cls(path=path, bin=binary)
  38. @dataclass
  39. class Secret:
  40. name: str
  41. mode: int
  42. def create(self, keepass: KeePass): ...
  43. @classmethod
  44. def from_line(cls, line: str):
  45. name, type_, *args = line.split(",")
  46. match type_:
  47. case "file":
  48. sub_class = SecretFile
  49. case "keepass-attribute":
  50. sub_class = SecretKeepassAttribute
  51. case "keepass-attachment":
  52. sub_class = SecretKeepassAttachment
  53. return sub_class.from_line(name, *args)
  54. @classmethod
  55. def read_sources(cls, file: Path) -> list["Secret"]:
  56. with open(file) as f:
  57. lines = f.readlines()
  58. return [cls.from_line(l.strip()) for l in lines]
  59. @dataclass
  60. class SecretKeepassAttachment(Secret):
  61. key: str
  62. attachment: str
  63. def create(self, keepass: KeePass):
  64. value = keepass.read_entry_attachment(self.key, self.attachment)
  65. args = ["podman", "secret", "create", "--replace", self.name, "-"]
  66. print(args)
  67. subprocess.run(args, input=value.encode())
  68. @classmethod
  69. def from_line(cls, name: str, key: str, attachment: str):
  70. return cls(name=name, key=key, mode=0o0400, attachment=attachment)
  71. @dataclass
  72. class SecretKeepassAttribute(Secret):
  73. key: str
  74. attribute: str
  75. def create(self, keepass: KeePass):
  76. value = keepass.read_entry_attribute(self.key, self.attribute)
  77. args = ["podman", "secret", "create", "--replace", self.name, "-"]
  78. print(args)
  79. subprocess.run(args, input=value.encode())
  80. @classmethod
  81. def from_line(cls, name: str, key: str, attribute: str):
  82. return cls(name=name, key=key, mode=0o0400, attribute=attribute)
  83. @dataclass
  84. class SecretFile(Secret):
  85. host_path: Path
  86. def create(self, keepass: KeePass):
  87. args = ["podman", "secret", "create", "--replace", self.name, self.host_path]
  88. print(args)
  89. subprocess.run(args)
  90. @classmethod
  91. def from_line(cls, name: str, path: str):
  92. path = Path(path).expanduser()
  93. return cls(host_path=path, name=name, mode=0o0400)
  94. @dataclass
  95. class Configuration:
  96. secret_sources: list[Secret]
  97. data_sources: list[Path]
  98. borgmatic_d_path: Path
  99. borgmatic_path: Path
  100. history_file: Path
  101. ssh_auth_sock: Path | None
  102. @staticmethod
  103. def get_config_dir() -> Path:
  104. if is_windows:
  105. program_data = Path(os.getenv("ProgramData"))
  106. return program_data / "pc_backup"
  107. else:
  108. return Path.home() / ".config" / "pc_backup"
  109. @classmethod
  110. def read(cls, hostname: str, login: str, config_dir: Path):
  111. secret_sources_file = config_dir / f"secret_sources_{hostname}_{login}"
  112. data_sources_file = config_dir / f"data_sources_{hostname}_{login}"
  113. ssh_auth_sock = os.getenv("SSH_AUTH_SOCK")
  114. return cls(
  115. secret_sources=Secret.read_sources(secret_sources_file),
  116. data_sources=read_data_sources(data_sources_file),
  117. borgmatic_d_path=config_dir / "borgmatic.d",
  118. borgmatic_path=config_dir / "borgmatic",
  119. history_file=config_dir / f".bash_history_{login}",
  120. ssh_auth_sock=Path(ssh_auth_sock) if ssh_auth_sock else None
  121. )
  122. @dataclass
  123. class BorgmaticContainer:
  124. hostname: str
  125. login: str
  126. name: str
  127. image: str = "ghcr.io/borgmatic-collective/borgmatic"
  128. def run(self, config: Configuration):
  129. container_name = f"borgmatic_{self.login}"
  130. config.history_file.touch()
  131. volumes = [
  132. f"{config.borgmatic_d_path}:/etc/borgmatic.d/",
  133. f"{config.borgmatic_path}:/etc/borgmatic/",
  134. f"{config.history_file}:/root/.bash_history",
  135. "borg_ssh_dir:/root/.ssh",
  136. "borg_config:/root/.config/borg",
  137. "borg_cache:/root/.cache/borg",
  138. "borgmatic_state:/root/.local/state/borgmatic",
  139. "borgmatic_log:/root/.local/share/borgmatic",
  140. ]
  141. if config.ssh_auth_sock:
  142. volumes += [f"{config.ssh_auth_sock}:{config.ssh_auth_sock}:Z"]
  143. volumes += [
  144. f"{vol}:{self.to_source_path(vol)}:ro" for vol in config.data_sources
  145. ]
  146. volume_args = [a for vol in volumes for a in ["-v", vol]]
  147. secrets_args = [
  148. a
  149. for s in config.secret_sources
  150. for a in ["--secret", f"{s.name},mode=0{s.mode:o}"]
  151. ]
  152. args = (
  153. [
  154. "podman",
  155. "run",
  156. "-h",
  157. self.hostname,
  158. "--detach",
  159. "--name",
  160. container_name,
  161. "-e",
  162. "SSH_AUTH_SOCK",
  163. "-e",
  164. "TZ=Europe/Paris",
  165. "-e",
  166. "SSH_KEY_NAME",
  167. "-e",
  168. f"HOST_LOGIN={self.login}",
  169. "--security-opt=label=disable",
  170. ]
  171. + volume_args
  172. + secrets_args
  173. + [self.image]
  174. )
  175. print(args)
  176. subprocess.run(args)
  177. def rm(self):
  178. subprocess.run(["podman", "rm", "-f", self.name])
  179. def exec(self, cmd: list[str], env_vars: list[str] = []):
  180. args = ["podman", "exec", "-ti"]
  181. args += [a for var in env_vars for a in ["-e", var]]
  182. subprocess.run(args + [self.name] + cmd)
  183. @staticmethod
  184. def to_source_path(path: Path):
  185. mount_base = PurePosixPath("/mnt") / "source"
  186. inner_path = PurePosixPath(path)
  187. with_drive = PurePosixPath(inner_path.parts[0].replace(":", "")).joinpath(
  188. *inner_path.parts[1:]
  189. )
  190. return mount_base / with_drive.relative_to(with_drive.anchor)
  191. @classmethod
  192. def new(cls, hostname: str, login: str):
  193. return cls(hostname, login, f"borgmatic_{login}")
  194. class CliArguments:
  195. @staticmethod
  196. def read_command(parser):
  197. args = parser.parse_args()
  198. return args.command(args)
  199. @staticmethod
  200. def new() -> argparse.ArgumentParser:
  201. parser = argparse.ArgumentParser(prog=sys.argv[0])
  202. subparsers = parser.add_subparsers()
  203. for sub in [
  204. CommandStart,
  205. CommandRm,
  206. CommandBash,
  207. CommandCreateRepo,
  208. CommandExportKey,
  209. CommandCreateSecrets,
  210. ]:
  211. p = subparsers.add_parser(sub.command, help=sub.help)
  212. sub.init_subparser(p)
  213. p.set_defaults(command=sub)
  214. return parser
  215. class Command:
  216. def __init__(self, namespace) -> None:
  217. for k, v in vars(namespace).items():
  218. if k != "type_":
  219. setattr(self, k, v)
  220. @classmethod
  221. def init_subparser(cls, p): ...
  222. class CommandStart(Command):
  223. command = "start"
  224. help = "start container"
  225. def run(
  226. self,
  227. *,
  228. container: BorgmaticContainer,
  229. config: Configuration,
  230. **kwargs,
  231. ):
  232. container.run(config)
  233. class CommandRm(Command):
  234. command = "rm"
  235. help = "remove container"
  236. def run(self, *, container: BorgmaticContainer, **kwargs):
  237. container.rm()
  238. class CommandBash(Command):
  239. command = "bash"
  240. help = "run shell in container"
  241. def run(self, *, container: BorgmaticContainer, **kwargs):
  242. container.exec(["bash"])
  243. class CommandCreateRepo(Command):
  244. command = "create_repo"
  245. help = "create repository"
  246. env_vars = ["BORG_PASSPHRASE_NAME", "STORAGE_BOX_USER", "SSH_KEY_NAME"]
  247. def run(self, *, container: BorgmaticContainer, **kwargs):
  248. container.exec(
  249. ["borgmatic", "repo-create", "--encryption", "repokey"], self.env_vars
  250. )
  251. class CommandExportKey(Command):
  252. command = "export_key"
  253. help = "export the repository key"
  254. env_vars = ["BORG_PASSPHRASE_NAME", "STORAGE_BOX_USER", "SSH_KEY_NAME"]
  255. def run(self, *, container: BorgmaticContainer, **kwargs):
  256. container.exec(["borgmatic", "export", "key"], self.env_vars)
  257. class CommandCreateSecrets(Command):
  258. command = "create_secrets"
  259. help = "create podman secrets"
  260. def run(self, *, secret_sources: list[Secret], **kwargs):
  261. keepass = KeePass.new(self.keepass_path)
  262. for s in secret_sources:
  263. s.create(keepass)
  264. @classmethod
  265. def init_subparser(cls, p):
  266. p.add_argument("keepass_path", type=Path, help="Path to the keepass")
  267. def main():
  268. login = os.getlogin()
  269. hostname = socket.gethostname()
  270. config = Configuration.read(hostname, login, Configuration.get_config_dir())
  271. if not config.secret_sources:
  272. print("no secret required ?")
  273. container = BorgmaticContainer.new(hostname, login)
  274. parser = CliArguments.new()
  275. command = CliArguments.read_command(parser)
  276. command.run(
  277. config=config,
  278. secret_sources=config.secret_sources,
  279. data_sources=config.data_sources,
  280. container=container,
  281. )
  282. if __name__ == "__main__":
  283. main()