import os import sys import subprocess import socket from pathlib import Path, PurePosixPath from dataclasses import dataclass from typing import Any from enum import StrEnum is_windows = os.name == "nt" def read_data_sources(hostname: str, login: str) -> list[Path]: file = Path(f"./data_sources_{hostname}_{login}") with open(file) as f: paths = f.readlines() return [Path(p_str.strip()).expanduser() for p_str in paths] @dataclass class KeePass: path: Path bin: str | Path def read_entry_attribute(self, key, attribute): return self._exec(["show", "-a", attribute, self.path, key]).strip() def _exec(self, args: list[Any]): return subprocess.check_output([self.bin] + args, text=True) @classmethod def new(cls, path: Path): binary = Path("C:\\") / "Program Files" / "KeePassXC" / "keepassxc-cli.exe" if is_windows else "keepassxc-cli" return cls(path=path, bin=binary) class SecretType(StrEnum): File="file" KeepassAttribute="keepass-attribute" @dataclass class Secret: name: str mode: int type: SecretType host_path: Path | None = None key: str | None = None attribute: str | None = None def create(self, keepass: KeePass): match self.type: case SecretType.File: args = ["podman", "secret", "create", "--replace", self.name, self.host_path] print(args) subprocess.run(args) case SecretType.KeepassAttribute: value = keepass.read_entry_attribute(self.key, self.attribute) args = ["podman", "secret", "create", "--replace", self.name, "-"] print(args) subprocess.run(args, input=value.encode()) @classmethod def from_line(cls, line: str): type_, *args = line.split(",") match (SecretType(type_), *args): case (SecretType.File, path): path = Path(path).expanduser() return cls(host_path=path, name=path.name, mode=0o0400, type=SecretType.File) case (SecretType.KeepassAttribute, key, attribute): return cls(name=key, key=key, mode=0o0400, type=SecretType.KeepassAttribute, attribute=attribute) @classmethod def read_sources(cls, hostname: str, login: str) -> list["Secret"]: file = Path(f"./secret_sources_{hostname}_{login}") with open(file) as f: lines = f.readlines() return [cls.from_line(l.strip()) for l in lines] def to_source_path(path: Path): mount_base = PurePosixPath("/mnt") / "source" inner_path = PurePosixPath(path) with_drive = PurePosixPath(inner_path.parts[0].replace(":", "")).joinpath(*inner_path.parts[1:]) return mount_base / with_drive.relative_to(with_drive.anchor) def start_borgmatic_container(hostname: str, login: str, secret_sources: list[Secret]): data_sources = read_data_sources(hostname, login) container_name = f"borgmatic_{login}" ssh_auth_sock = os.getenv("SSH_AUTH_SOCK") data_path = Path.cwd() / "data" config_d_path = data_path / "borgmatic.d" config_path = data_path / "borgmatic" history_file = data_path / ".bash_history" history_file.touch() ssh_config_path = Path.home() / ".ssh" volumes = [ f"{config_d_path}:/etc/borgmatic.d/", f"{config_path}:/etc/borgmatic/", f"{ssh_config_path}:/root/.ssh", f"{history_file}:/root/.bash_history", "borg_config:/root/.config/borg", "borg_cache:/root/.cache/borg", "borgmatic_state:/root/.local/state/borgmatic", ] if ssh_auth_sock: volumes += [f"{ssh_auth_sock}:{ssh_auth_sock}:Z"] volumes += [ f"{vol}:{to_source_path(vol)}:ro" for vol in data_sources ] volume_args = [a for vol in volumes for a in ["-v", vol]] secrets_args = [a for s in secret_sources for a in ["--secret", f"{s.name},mode=0{s.mode:o}"]] image_name = "ghcr.io/borgmatic-collective/borgmatic" args = [ "podman", "run", "-h", hostname, "--detach", "--name", container_name, "-e", "SSH_AUTH_SOCK", "-e", "TZ=Europe/Paris", "-e", "SSH_KEY_NAME", "-e", f"HOST_LOGIN={login}", "--security-opt=label=disable" ] + volume_args + secrets_args + [image_name] print(args) subprocess.run(args) def main(): login = os.getlogin() hostname = socket.gethostname() secret_sources = Secret.read_sources(hostname, login) if not secret_sources: print("no secret required ?") try: if sys.argv[1] == "create_secrets": keepass_path = Path(sys.argv[2]) keepass = KeePass.new(keepass_path) for s in secret_sources: s.create(keepass) elif sys.argv[1] == "start": start_borgmatic_container(hostname, login, secret_sources) elif sys.argv[1] == "rm": subprocess.run(["podman", "rm", "-f", f"borgmatic_{login}"]) elif sys.argv[1] == "bash": subprocess.run(["podman", "exec", "-ti", f"borgmatic_{login}", "bash"]) except IndexError: print("You should provide an argument") exit(1) if __name__ == "__main__": main()