import argparse import os import sys import subprocess import socket from pathlib import Path, PurePosixPath from dataclasses import dataclass from typing import Any is_windows = os.name == "nt" def read_data_sources(file: Path) -> list[Path]: with open(file) as f: paths = f.readlines() return [Path(p_str.strip()).expanduser() for p_str in paths] @dataclass class KeePass: path: Path bin: str | Path def read_entry_attribute(self, key, attribute): return self._exec(["show", "-a", attribute, self.path, key]).strip() def read_entry_attachment(self, key, attachment): return self._exec( ["attachment-export", "--stdout", self.path, key, attachment, "/dev/null"] ) def _exec(self, args: list[Any]): try: return subprocess.check_output([self.bin] + args, text=True) except subprocess.CalledProcessError as e: print("\nThere was an error on call to keepass, please check the outout") exit(1) @classmethod def new(cls, path: Path): binary = ( Path("C:\\") / "Program Files" / "KeePassXC" / "keepassxc-cli.exe" if is_windows else "keepassxc-cli" ) return cls(path=path, bin=binary) @dataclass class Secret: name: str mode: int def create(self, keepass: KeePass): ... 
@classmethod def from_line(cls, line: str): name, type_, *args = line.split(",") match type_: case "file": sub_class = SecretFile case "keepass-attribute": sub_class = SecretKeepassAttribute case "keepass-attachment": sub_class = SecretKeepassAttachment return sub_class.from_line(name, *args) @classmethod def read_sources(cls, file: Path) -> list["Secret"]: with open(file) as f: lines = f.readlines() return [cls.from_line(l.strip()) for l in lines] @dataclass class SecretKeepassAttachment(Secret): key: str attachment: str def create(self, keepass: KeePass): value = keepass.read_entry_attachment(self.key, self.attachment) args = ["podman", "secret", "create", "--replace", self.name, "-"] print(args) subprocess.run(args, input=value.encode()) @classmethod def from_line(cls, name: str, key: str, attachment: str): return cls(name=name, key=key, mode=0o0400, attachment=attachment) @dataclass class SecretKeepassAttribute(Secret): key: str attribute: str def create(self, keepass: KeePass): value = keepass.read_entry_attribute(self.key, self.attribute) args = ["podman", "secret", "create", "--replace", self.name, "-"] print(args) subprocess.run(args, input=value.encode()) @classmethod def from_line(cls, name: str, key: str, attribute: str): return cls(name=name, key=key, mode=0o0400, attribute=attribute) @dataclass class SecretFile(Secret): host_path: Path def create(self, keepass: KeePass): args = ["podman", "secret", "create", "--replace", self.name, self.host_path] print(args) subprocess.run(args) @classmethod def from_line(cls, name: str, path: str): path = Path(path).expanduser() return cls(host_path=path, name=name, mode=0o0400) @dataclass class Configuration: secret_sources: list[Secret] data_sources: list[Path] borgmatic_d_path: Path borgmatic_path: Path history_file: Path ssh_auth_sock: Path | None @staticmethod def get_config_dir() -> Path: if is_windows: program_data = Path(os.getenv("ProgramData")) return program_data / "pc_backup" else: return Path.home() / 
".config" / "pc_backup" @classmethod def read(cls, hostname: str, login: str, config_dir: Path): secret_sources_file = config_dir / f"secret_sources_{hostname}_{login}" data_sources_file = config_dir / f"data_sources_{hostname}_{login}" ssh_auth_sock = os.getenv("SSH_AUTH_SOCK") return cls( secret_sources=Secret.read_sources(secret_sources_file), data_sources=read_data_sources(data_sources_file), borgmatic_d_path=config_dir / "borgmatic.d", borgmatic_path=config_dir / "borgmatic", history_file=config_dir / ".bash_history", ssh_auth_sock=Path(ssh_auth_sock) if ssh_auth_sock else None ) @dataclass class BorgmaticContainer: hostname: str login: str name: str image: str = "ghcr.io/borgmatic-collective/borgmatic" def run(self, config: Configuration): container_name = f"borgmatic_{self.login}" config.history_file.touch() volumes = [ f"{config.borgmatic_d_path}:/etc/borgmatic.d/", f"{config.borgmatic_path}:/etc/borgmatic/", f"{config.history_file}:/root/.bash_history", "borg_ssh_dir:/root/.ssh", "borg_config:/root/.config/borg", "borg_cache:/root/.cache/borg", "borgmatic_state:/root/.local/state/borgmatic", "borgmatic_log:/root/.local/share/borgmatic", ] if config.ssh_auth_sock: volumes += [f"{config.ssh_auth_sock}:{config.ssh_auth_sock}:Z"] volumes += [ f"{vol}:{self.to_source_path(vol)}:ro" for vol in config.data_sources ] volume_args = [a for vol in volumes for a in ["-v", vol]] secrets_args = [ a for s in config.secret_sources for a in ["--secret", f"{s.name},mode=0{s.mode:o}"] ] args = ( [ "podman", "run", "-h", self.hostname, "--detach", "--name", container_name, "-e", "SSH_AUTH_SOCK", "-e", "TZ=Europe/Paris", "-e", "SSH_KEY_NAME", "-e", f"HOST_LOGIN={self.login}", "--security-opt=label=disable", ] + volume_args + secrets_args + [self.image] ) print(args) subprocess.run(args) def rm(self): subprocess.run(["podman", "rm", "-f", self.name]) def exec(self, cmd: list[str], env_vars: list[str] = []): args = ["podman", "exec", "-ti"] args += [a for var in env_vars for a 
in ["-e", var]] subprocess.run(args + [self.name] + cmd) @staticmethod def to_source_path(path: Path): mount_base = PurePosixPath("/mnt") / "source" inner_path = PurePosixPath(path) with_drive = PurePosixPath(inner_path.parts[0].replace(":", "")).joinpath( *inner_path.parts[1:] ) return mount_base / with_drive.relative_to(with_drive.anchor) @classmethod def new(cls, hostname: str, login: str): return cls(hostname, login, f"borgmatic_{login}") class CliArguments: @staticmethod def read_command(parser): args = parser.parse_args() return args.command(args) @staticmethod def new() -> argparse.ArgumentParser: parser = argparse.ArgumentParser(prog=sys.argv[0]) subparsers = parser.add_subparsers() for sub in [ CommandStart, CommandRm, CommandBash, CommandCreateRepo, CommandExportKey, CommandCreateSecrets, ]: p = subparsers.add_parser(sub.command, help=sub.help) sub.init_subparser(p) p.set_defaults(command=sub) return parser class Command: def __init__(self, namespace) -> None: for k, v in vars(namespace).items(): if k != "type_": setattr(self, k, v) @classmethod def init_subparser(cls, p): ... 
class CommandStart(Command):
    """``start``: run the borgmatic container."""

    command = "start"
    help = "start container"

    def run(
        self,
        *,
        container: BorgmaticContainer,
        config: Configuration,
        **kwargs,
    ):
        """Start ``container`` with the volumes and secrets of ``config``."""
        container.run(config)


class CommandRm(Command):
    """``rm``: remove the borgmatic container."""

    command = "rm"
    help = "remove container"

    def run(self, *, container: BorgmaticContainer, **kwargs):
        """Remove ``container``."""
        container.rm()


class CommandBash(Command):
    """``bash``: open a shell inside the container."""

    command = "bash"
    help = "run shell in container"

    def run(self, *, container: BorgmaticContainer, **kwargs):
        """Execute ``bash`` in ``container``."""
        container.exec(["bash"])


class CommandCreateRepo(Command):
    """``create_repo``: create the repository through borgmatic."""

    command = "create_repo"
    help = "create repository"
    # Names of the environment variables passed to the executed command.
    env_vars = ["BORG_PASSPHRASE_NAME", "STORAGE_BOX_USER", "SSH_KEY_NAME"]

    def run(self, *, container: BorgmaticContainer, **kwargs):
        """Run ``borgmatic repo-create --encryption repokey`` in ``container``."""
        container.exec(
            ["borgmatic", "repo-create", "--encryption", "repokey"], self.env_vars
        )


class CommandExportKey(Command):
    """``export_key``: export the repository key through borgmatic."""

    command = "export_key"
    help = "export the repository key"
    # Names of the environment variables passed to the executed command.
    env_vars = ["BORG_PASSPHRASE_NAME", "STORAGE_BOX_USER", "SSH_KEY_NAME"]

    def run(self, *, container: BorgmaticContainer, **kwargs):
        """Run the borgmatic key export in ``container``."""
        # NOTE(review): borgmatic appears to document this action as
        # `borgmatic key export` - confirm the argument order.
        container.exec(["borgmatic", "export", "key"], self.env_vars)


class CommandCreateSecrets(Command):
    """``create_secrets``: create every configured podman secret."""

    command = "create_secrets"
    help = "create podman secrets"

    def run(self, *, secret_sources: list[Secret], **kwargs):
        """Create each secret of ``secret_sources`` using the given KeePass file."""
        keepass = KeePass.new(self.keepass_path)
        for s in secret_sources:
            s.create(keepass)

    @classmethod
    def init_subparser(cls, p):
        """Add the positional ``keepass_path`` argument to sub-parser ``p``."""
        p.add_argument("keepass_path", type=Path, help="Path to the keepass")


def main():
    """Entry point: parse the command line, load the configuration, run the command."""
    import getpass

    # Parse first so that --help and usage errors work even when the
    # configuration files are missing or invalid.
    parser = CliArguments.new()
    command = CliArguments.read_command(parser)

    try:
        login = os.getlogin()
    except OSError:
        # os.getlogin() needs a controlling terminal, which is absent under
        # cron or a service manager; fall back to the environment/passwd.
        login = getpass.getuser()
    hostname = socket.gethostname()

    config = Configuration.read(hostname, login, Configuration.get_config_dir())
    if not config.secret_sources:
        print("no secret required ?")

    container = BorgmaticContainer.new(hostname, login)
    command.run(
        config=config,
        secret_sources=config.secret_sources,
        data_sources=config.data_sources,
        container=container,
    )


if __name__ == "__main__":
    main()