start.py 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296
  1. import argparse
  2. import os
  3. import sys
  4. import subprocess
  5. import socket
  6. from pathlib import Path, PurePosixPath
  7. from dataclasses import dataclass
  8. from typing import Any
# True when running on Windows (os.name is "nt"); KeePass.new uses it to pick
# the keepassxc-cli executable.
is_windows = os.name == "nt"
  10. def read_data_sources(hostname: str, login: str) -> list[Path]:
  11. file = Path(f"./data_sources_{hostname}_{login}")
  12. with open(file) as f:
  13. paths = f.readlines()
  14. return [Path(p_str.strip()).expanduser() for p_str in paths]
  15. @dataclass
  16. class KeePass:
  17. path: Path
  18. bin: str | Path
  19. def read_entry_attribute(self, key, attribute):
  20. return self._exec(["show", "-a", attribute, self.path, key]).strip()
  21. def read_entry_attachment(self, key, attachment):
  22. return self._exec(["attachment-export", "--stdout", self.path, key, attachment, "/dev/null"])
  23. def _exec(self, args: list[Any]):
  24. try:
  25. return subprocess.check_output([self.bin] + args, text=True)
  26. except subprocess.CalledProcessError as e:
  27. print("\nThere was an error on call to keepass, please check the outout")
  28. exit(1)
  29. @classmethod
  30. def new(cls, path: Path):
  31. binary = Path("C:\\") / "Program Files" / "KeePassXC" / "keepassxc-cli.exe" if is_windows else "keepassxc-cli"
  32. return cls(path=path, bin=binary)
  33. @dataclass
  34. class Secret:
  35. name: str
  36. mode: int
  37. def create(self, keepass: KeePass):
  38. ...
  39. @classmethod
  40. def from_line(cls, line: str):
  41. type_, *args = line.split(",")
  42. match type_:
  43. case "file":
  44. return SecretFile.from_line(*args)
  45. case "keepass-attribute":
  46. return SecretKeepassAttribute.from_line(*args)
  47. case "keepass-attachment":
  48. return SecretKeepassAttachment.from_line(*args)
  49. @classmethod
  50. def read_sources(cls, hostname: str, login: str) -> list["Secret"]:
  51. file = Path(f"./secret_sources_{hostname}_{login}")
  52. with open(file) as f:
  53. lines = f.readlines()
  54. return [cls.from_line(l.strip()) for l in lines]
  55. @dataclass
  56. class SecretKeepassAttachment(Secret):
  57. key: str
  58. attachment: str
  59. def create(self, keepass: KeePass):
  60. value = keepass.read_entry_attachment(self.key, self.attachment)
  61. args = ["podman", "secret", "create", "--replace", self.name, "-"]
  62. print(args)
  63. subprocess.run(args, input=value.encode())
  64. @classmethod
  65. def from_line(cls, key, attachment):
  66. return cls(name=key, key=key, mode=0o0400, attachment=attachment)
  67. @dataclass
  68. class SecretKeepassAttribute(Secret):
  69. key: str
  70. attribute: str
  71. def create(self, keepass: KeePass):
  72. value = keepass.read_entry_attribute(self.key, self.attribute)
  73. args = ["podman", "secret", "create", "--replace", self.name, "-"]
  74. print(args)
  75. subprocess.run(args, input=value.encode())
  76. @classmethod
  77. def from_line(cls, key, attribute):
  78. return cls(name=key, key=key, mode=0o0400, attribute=attribute)
  79. @dataclass
  80. class SecretFile(Secret):
  81. host_path: Path
  82. def create(self, keepass: KeePass):
  83. args = ["podman", "secret", "create", "--replace", self.name, self.host_path]
  84. print(args)
  85. subprocess.run(args)
  86. @classmethod
  87. def from_line(cls, path: str):
  88. path = Path(path).expanduser()
  89. return cls(host_path=path, name=path.name, mode=0o0400)
  90. def to_source_path(path: Path):
  91. mount_base = PurePosixPath("/mnt") / "source"
  92. inner_path = PurePosixPath(path)
  93. with_drive = PurePosixPath(inner_path.parts[0].replace(":", "")).joinpath(*inner_path.parts[1:])
  94. return mount_base / with_drive.relative_to(with_drive.anchor)
  95. @dataclass
  96. class BorgmaticContainer:
  97. hostname: str
  98. login: str
  99. name: str
  100. image: str = "ghcr.io/borgmatic-collective/borgmatic"
  101. def run(self, data_sources: list[Path], secret_sources: list[Secret]):
  102. container_name = f"borgmatic_{self.login}"
  103. ssh_auth_sock = os.getenv("SSH_AUTH_SOCK")
  104. data_path = Path.cwd() / "data"
  105. config_d_path = data_path / "borgmatic.d"
  106. config_path = data_path / "borgmatic"
  107. history_file = data_path / ".bash_history"
  108. history_file.touch()
  109. volumes = [
  110. f"{config_d_path}:/etc/borgmatic.d/",
  111. f"{config_path}:/etc/borgmatic/",
  112. f"{history_file}:/root/.bash_history",
  113. "borg_ssh_dir:/root/.ssh",
  114. "borg_config:/root/.config/borg",
  115. "borg_cache:/root/.cache/borg",
  116. "borgmatic_state:/root/.local/state/borgmatic",
  117. ]
  118. if ssh_auth_sock:
  119. volumes += [f"{ssh_auth_sock}:{ssh_auth_sock}:Z"]
  120. volumes += [
  121. f"{vol}:{to_source_path(vol)}:ro" for vol in data_sources
  122. ]
  123. volume_args = [a for vol in volumes for a in ["-v", vol]]
  124. secrets_args = [a for s in secret_sources for a in ["--secret", f"{s.name},mode=0{s.mode:o}"]]
  125. args = [
  126. "podman",
  127. "run",
  128. "-h",
  129. self.hostname,
  130. "--detach",
  131. "--name",
  132. container_name,
  133. "-e",
  134. "SSH_AUTH_SOCK",
  135. "-e",
  136. "TZ=Europe/Paris",
  137. "-e",
  138. "SSH_KEY_NAME",
  139. "-e",
  140. f"HOST_LOGIN={self.login}",
  141. "--security-opt=label=disable"
  142. ] + volume_args + secrets_args + [self.image]
  143. print(args)
  144. subprocess.run(args)
  145. def rm(self):
  146. subprocess.run(["podman", "rm", "-f", self.name])
  147. def exec(self, cmd: list[str], env_vars: list[str] = []):
  148. args = ["podman", "exec", "-ti"]
  149. args += [a for var in env_vars for a in ["-e", var]]
  150. subprocess.run(args + [self.name] + cmd)
  151. @classmethod
  152. def new(cls, hostname: str, login: str):
  153. return cls(hostname, login, f"borgmatic_{login}")
  154. class CliArguments:
  155. def __init__(self, namespace) -> None:
  156. for k, v in vars(namespace).items():
  157. if k != "type_":
  158. setattr(self, k, v)
  159. @classmethod
  160. def init_subparser(cls, p):
  161. ...
  162. @classmethod
  163. def new(cls) -> None:
  164. parser = argparse.ArgumentParser(prog=sys.argv[0])
  165. subparsers = parser.add_subparsers()
  166. for sub in [
  167. CliArgumentsStart,
  168. CliArgumentsRm,
  169. CliArgumentsBash,
  170. CliArgumentsCreateRepo,
  171. CliArgumentsExportKey,
  172. CliArgumentsCreateSecrets
  173. ]:
  174. p = subparsers.add_parser(sub.command, help=sub.help)
  175. sub.init_subparser(p)
  176. p.set_defaults(type_=sub)
  177. args = parser.parse_args()
  178. return args.type_(args)
class CliArgumentsStart(CliArguments):
    """Arguments of the ``start`` subcommand (no extra options)."""
    command = "start"
    help = "start container"
class CliArgumentsRm(CliArguments):
    """Arguments of the ``rm`` subcommand (no extra options)."""
    command = "rm"
    help = "remove container"
class CliArgumentsBash(CliArguments):
    """Arguments of the ``bash`` subcommand (no extra options)."""
    command = "bash"
    help = "run shell in container"
class CliArgumentsCreateRepo(CliArguments):
    """Arguments of the ``create_repo`` subcommand (no extra options)."""
    command = "create_repo"
    help = "create repository"
class CliArgumentsExportKey(CliArguments):
    """Arguments of the ``export_key`` subcommand (no extra options)."""
    command = "export_key"
    help = "export the repository key"
  194. class CliArgumentsCreateSecrets(CliArguments):
  195. command="create_secrets"
  196. help="create podman secrets"
  197. @classmethod
  198. def init_subparser(cls, p):
  199. p.add_argument("keepass_path", type=Path, help="Path to the keepass")
  200. def main():
  201. login = os.getlogin()
  202. hostname = socket.gethostname()
  203. secret_sources = Secret.read_sources(hostname, login)
  204. data_sources = read_data_sources(hostname, login)
  205. args = CliArguments.new()
  206. if not secret_sources:
  207. print("no secret required ?")
  208. container = BorgmaticContainer.new(hostname, login)
  209. env_vars = ["BORG_PASSPHRASE_NAME", "STORAGE_BOX_USER", "SSH_KEY_NAME"]
  210. if isinstance(args, CliArgumentsCreateSecrets):
  211. keepass = KeePass.new(args.keepass_path)
  212. for s in secret_sources:
  213. s.create(keepass)
  214. elif isinstance(args, CliArgumentsStart):
  215. container.run(data_sources, secret_sources)
  216. elif isinstance(args, CliArgumentsRm):
  217. container.rm()
  218. elif isinstance(args, CliArgumentsBash):
  219. container.exec(["bash"])
  220. elif isinstance(args, CliArgumentsCreateRepo):
  221. container.exec(["borgmatic", "repo-create", "--encryption", "repokey"], env_vars)
  222. elif isinstance(args, CliArgumentsExportKey):
  223. container.exec(["borgmatic", "export", "key"], env_vars)
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()