start.py 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191
  1. import os
  2. import sys
  3. import subprocess
  4. import socket
  5. from pathlib import Path, PurePosixPath
  6. from dataclasses import dataclass
  7. from typing import Any
  8. from enum import StrEnum
# True when os.name is "nt" (Windows); KeePass.new uses it to choose the
# keepassxc-cli executable location.
is_windows: bool = os.name == "nt"
  10. def read_data_sources(hostname: str, login: str) -> list[Path]:
  11. file = Path(f"./data_sources_{hostname}_{login}")
  12. with open(file) as f:
  13. paths = f.readlines()
  14. return [Path(p_str.strip()).expanduser() for p_str in paths]
  15. @dataclass
  16. class KeePass:
  17. path: Path
  18. bin: str | Path
  19. def read_entry_attribute(self, key, attribute):
  20. return self._exec(["show", "-a", attribute, self.path, key]).strip()
  21. def read_entry_attachment(self, key, attachment):
  22. return self._exec(["attachment-export", "--stdout", self.path, key, attachment, "/dev/null"])
  23. def _exec(self, args: list[Any]):
  24. try:
  25. return subprocess.check_output([self.bin] + args, text=True)
  26. except subprocess.CalledProcessError as e:
  27. print("\nThere was an error on call to keepass, please check the outout")
  28. exit(1)
  29. @classmethod
  30. def new(cls, path: Path):
  31. binary = Path("C:\\") / "Program Files" / "KeePassXC" / "keepassxc-cli.exe" if is_windows else "keepassxc-cli"
  32. return cls(path=path, bin=binary)
  33. class SecretType(StrEnum):
  34. File="file"
  35. KeepassAttribute="keepass-attribute"
  36. KeepassAttachment="keepass-attachment"
  37. @dataclass
  38. class Secret:
  39. name: str
  40. mode: int
  41. type: SecretType
  42. host_path: Path | None = None
  43. key: str | None = None
  44. attribute: str | None = None
  45. attachment: str | None = None
  46. def create(self, keepass: KeePass):
  47. match self.type:
  48. case SecretType.File:
  49. args = ["podman", "secret", "create", "--replace", self.name, self.host_path]
  50. print(args)
  51. subprocess.run(args)
  52. case SecretType.KeepassAttribute:
  53. value = keepass.read_entry_attribute(self.key, self.attribute)
  54. args = ["podman", "secret", "create", "--replace", self.name, "-"]
  55. print(args)
  56. subprocess.run(args, input=value.encode())
  57. case SecretType.KeepassAttachment:
  58. value = keepass.read_entry_attachment(self.key, self.attachment)
  59. args = ["podman", "secret", "create", "--replace", self.name, "-"]
  60. print(args)
  61. subprocess.run(args, input=value.encode())
  62. @classmethod
  63. def from_line(cls, line: str):
  64. type_, *args = line.split(",")
  65. match (SecretType(type_), *args):
  66. case (SecretType.File, path):
  67. path = Path(path).expanduser()
  68. return cls(host_path=path, name=path.name, mode=0o0400, type=SecretType.File)
  69. case (SecretType.KeepassAttribute, key, attribute):
  70. return cls(name=key, key=key, mode=0o0400, type=SecretType.KeepassAttribute, attribute=attribute)
  71. case (SecretType.KeepassAttachment, key, attachment):
  72. return cls(name=key, key=key, mode=0o0400, type=SecretType.KeepassAttachment, attachment=attachment)
  73. @classmethod
  74. def read_sources(cls, hostname: str, login: str) -> list["Secret"]:
  75. file = Path(f"./secret_sources_{hostname}_{login}")
  76. with open(file) as f:
  77. lines = f.readlines()
  78. return [cls.from_line(l.strip()) for l in lines]
  79. def to_source_path(path: Path):
  80. mount_base = PurePosixPath("/mnt") / "source"
  81. inner_path = PurePosixPath(path)
  82. with_drive = PurePosixPath(inner_path.parts[0].replace(":", "")).joinpath(*inner_path.parts[1:])
  83. return mount_base / with_drive.relative_to(with_drive.anchor)
  84. def start_borgmatic_container(hostname: str, login: str, secret_sources: list[Secret]):
  85. data_sources = read_data_sources(hostname, login)
  86. container_name = f"borgmatic_{login}"
  87. ssh_auth_sock = os.getenv("SSH_AUTH_SOCK")
  88. data_path = Path.cwd() / "data"
  89. config_d_path = data_path / "borgmatic.d"
  90. config_path = data_path / "borgmatic"
  91. history_file = data_path / ".bash_history"
  92. history_file.touch()
  93. ssh_config_path = Path.home() / ".ssh"
  94. volumes = [
  95. f"{config_d_path}:/etc/borgmatic.d/",
  96. f"{config_path}:/etc/borgmatic/",
  97. f"{ssh_config_path}:/root/.ssh",
  98. f"{history_file}:/root/.bash_history",
  99. "borg_config:/root/.config/borg",
  100. "borg_cache:/root/.cache/borg",
  101. "borgmatic_state:/root/.local/state/borgmatic",
  102. ]
  103. if ssh_auth_sock:
  104. volumes += [f"{ssh_auth_sock}:{ssh_auth_sock}:Z"]
  105. volumes += [
  106. f"{vol}:{to_source_path(vol)}:ro" for vol in data_sources
  107. ]
  108. volume_args = [a for vol in volumes for a in ["-v", vol]]
  109. secrets_args = [a for s in secret_sources for a in ["--secret", f"{s.name},mode=0{s.mode:o}"]]
  110. image_name = "ghcr.io/borgmatic-collective/borgmatic"
  111. args = [
  112. "podman",
  113. "run",
  114. "-h",
  115. hostname,
  116. "--detach",
  117. "--name",
  118. container_name,
  119. "-e",
  120. "SSH_AUTH_SOCK",
  121. "-e",
  122. "TZ=Europe/Paris",
  123. "-e",
  124. "SSH_KEY_NAME",
  125. "-e",
  126. f"HOST_LOGIN={login}",
  127. "--security-opt=label=disable"
  128. ] + volume_args + secrets_args + [image_name]
  129. print(args)
  130. subprocess.run(args)
  131. def main():
  132. login = os.getlogin()
  133. hostname = socket.gethostname()
  134. secret_sources = Secret.read_sources(hostname, login)
  135. if not secret_sources:
  136. print("no secret required ?")
  137. try:
  138. if sys.argv[1] == "create_secrets":
  139. keepass_path = Path(sys.argv[2])
  140. keepass = KeePass.new(keepass_path)
  141. for s in secret_sources:
  142. s.create(keepass)
  143. elif sys.argv[1] == "start":
  144. start_borgmatic_container(hostname, login, secret_sources)
  145. elif sys.argv[1] == "rm":
  146. subprocess.run(["podman", "rm", "-f", f"borgmatic_{login}"])
  147. elif sys.argv[1] == "bash":
  148. subprocess.run(["podman", "exec", "-ti", f"borgmatic_{login}", "bash"])
  149. except IndexError:
  150. print("You should provide an argument")
  151. exit(1)
# Run the command-line interface only when this file is executed as a script,
# not when it is imported.
if __name__ == "__main__":
    main()