start.py 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208
  1. import os
  2. import sys
  3. import subprocess
  4. import socket
  5. from pathlib import Path, PurePosixPath
  6. from dataclasses import dataclass
  7. from typing import Any
  8. is_windows = os.name == "nt"
  9. def read_data_sources(hostname: str, login: str) -> list[Path]:
  10. file = Path(f"./data_sources_{hostname}_{login}")
  11. with open(file) as f:
  12. paths = f.readlines()
  13. return [Path(p_str.strip()).expanduser() for p_str in paths]
  14. @dataclass
  15. class KeePass:
  16. path: Path
  17. bin: str | Path
  18. def read_entry_attribute(self, key, attribute):
  19. return self._exec(["show", "-a", attribute, self.path, key]).strip()
  20. def read_entry_attachment(self, key, attachment):
  21. return self._exec(["attachment-export", "--stdout", self.path, key, attachment, "/dev/null"])
  22. def _exec(self, args: list[Any]):
  23. try:
  24. return subprocess.check_output([self.bin] + args, text=True)
  25. except subprocess.CalledProcessError as e:
  26. print("\nThere was an error on call to keepass, please check the outout")
  27. exit(1)
  28. @classmethod
  29. def new(cls, path: Path):
  30. binary = Path("C:\\") / "Program Files" / "KeePassXC" / "keepassxc-cli.exe" if is_windows else "keepassxc-cli"
  31. return cls(path=path, bin=binary)
  32. @dataclass
  33. class Secret:
  34. name: str
  35. mode: int
  36. def create(self, keepass: KeePass):
  37. ...
  38. @classmethod
  39. def from_line(cls, line: str):
  40. type_, *args = line.split(",")
  41. match type_:
  42. case "file":
  43. return SecretFile.from_line(*args)
  44. case "keepass-attribute":
  45. return SecretKeepassAttribute.from_line(*args)
  46. case "keepass-attachment":
  47. return SecretKeepassAttachment.from_line(*args)
  48. @classmethod
  49. def read_sources(cls, hostname: str, login: str) -> list["Secret"]:
  50. file = Path(f"./secret_sources_{hostname}_{login}")
  51. with open(file) as f:
  52. lines = f.readlines()
  53. return [cls.from_line(l.strip()) for l in lines]
  54. @dataclass
  55. class SecretKeepassAttachment(Secret):
  56. key: str
  57. attachment: str
  58. def create(self, keepass: KeePass):
  59. value = keepass.read_entry_attachment(self.key, self.attachment)
  60. args = ["podman", "secret", "create", "--replace", self.name, "-"]
  61. print(args)
  62. subprocess.run(args, input=value.encode())
  63. @classmethod
  64. def from_line(cls, key, attachment):
  65. return cls(name=key, key=key, mode=0o0400, attachment=attachment)
  66. @dataclass
  67. class SecretKeepassAttribute(Secret):
  68. key: str
  69. attribute: str
  70. def create(self, keepass: KeePass):
  71. value = keepass.read_entry_attribute(self.key, self.attribute)
  72. args = ["podman", "secret", "create", "--replace", self.name, "-"]
  73. print(args)
  74. subprocess.run(args, input=value.encode())
  75. @classmethod
  76. def from_line(cls, key, attribute):
  77. return cls(name=key, key=key, mode=0o0400, attribute=attribute)
  78. @dataclass
  79. class SecretFile(Secret):
  80. host_path: Path
  81. def create(self, keepass: KeePass):
  82. args = ["podman", "secret", "create", "--replace", self.name, self.host_path]
  83. print(args)
  84. subprocess.run(args)
  85. @classmethod
  86. def from_line(cls, path: str):
  87. path = Path(path).expanduser()
  88. return cls(host_path=path, name=path.name, mode=0o0400)
  89. def to_source_path(path: Path):
  90. mount_base = PurePosixPath("/mnt") / "source"
  91. inner_path = PurePosixPath(path)
  92. with_drive = PurePosixPath(inner_path.parts[0].replace(":", "")).joinpath(*inner_path.parts[1:])
  93. return mount_base / with_drive.relative_to(with_drive.anchor)
  94. def start_borgmatic_container(hostname: str, login: str, secret_sources: list[Secret]):
  95. data_sources = read_data_sources(hostname, login)
  96. container_name = f"borgmatic_{login}"
  97. ssh_auth_sock = os.getenv("SSH_AUTH_SOCK")
  98. data_path = Path.cwd() / "data"
  99. config_d_path = data_path / "borgmatic.d"
  100. config_path = data_path / "borgmatic"
  101. history_file = data_path / ".bash_history"
  102. history_file.touch()
  103. ssh_config_path = Path.home() / ".ssh"
  104. volumes = [
  105. f"{config_d_path}:/etc/borgmatic.d/",
  106. f"{config_path}:/etc/borgmatic/",
  107. f"{ssh_config_path}:/root/.ssh",
  108. f"{history_file}:/root/.bash_history",
  109. "borg_config:/root/.config/borg",
  110. "borg_cache:/root/.cache/borg",
  111. "borgmatic_state:/root/.local/state/borgmatic",
  112. ]
  113. if ssh_auth_sock:
  114. volumes += [f"{ssh_auth_sock}:{ssh_auth_sock}:Z"]
  115. volumes += [
  116. f"{vol}:{to_source_path(vol)}:ro" for vol in data_sources
  117. ]
  118. volume_args = [a for vol in volumes for a in ["-v", vol]]
  119. secrets_args = [a for s in secret_sources for a in ["--secret", f"{s.name},mode=0{s.mode:o}"]]
  120. image_name = "ghcr.io/borgmatic-collective/borgmatic"
  121. args = [
  122. "podman",
  123. "run",
  124. "-h",
  125. hostname,
  126. "--detach",
  127. "--name",
  128. container_name,
  129. "-e",
  130. "SSH_AUTH_SOCK",
  131. "-e",
  132. "TZ=Europe/Paris",
  133. "-e",
  134. "SSH_KEY_NAME",
  135. "-e",
  136. f"HOST_LOGIN={login}",
  137. "--security-opt=label=disable"
  138. ] + volume_args + secrets_args + [image_name]
  139. print(args)
  140. subprocess.run(args)
  141. def main():
  142. login = os.getlogin()
  143. hostname = socket.gethostname()
  144. secret_sources = Secret.read_sources(hostname, login)
  145. if not secret_sources:
  146. print("no secret required ?")
  147. try:
  148. if sys.argv[1] == "create_secrets":
  149. keepass_path = Path(sys.argv[2])
  150. keepass = KeePass.new(keepass_path)
  151. for s in secret_sources:
  152. s.create(keepass)
  153. elif sys.argv[1] == "start":
  154. start_borgmatic_container(hostname, login, secret_sources)
  155. elif sys.argv[1] == "rm":
  156. subprocess.run(["podman", "rm", "-f", f"borgmatic_{login}"])
  157. elif sys.argv[1] == "bash":
  158. subprocess.run(["podman", "exec", "-ti", f"borgmatic_{login}", "bash"])
  159. except IndexError:
  160. print("You should provide an argument")
  161. exit(1)
  162. if __name__ == "__main__":
  163. main()