|
@@ -1,362 +0,0 @@
|
|
|
-import argparse
|
|
|
|
|
-import os
|
|
|
|
|
-import sys
|
|
|
|
|
-import subprocess
|
|
|
|
|
-import socket
|
|
|
|
|
-from pathlib import Path, PurePosixPath
|
|
|
|
|
-from dataclasses import dataclass
|
|
|
|
|
-from typing import Any
|
|
|
|
|
-
|
|
|
|
|
-is_windows = os.name == "nt"
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
-def read_data_sources(file: Path) -> list[Path]:
|
|
|
|
|
- with open(file) as f:
|
|
|
|
|
- paths = f.readlines()
|
|
|
|
|
- return [Path(p_str.strip()).expanduser() for p_str in paths]
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
-@dataclass
|
|
|
|
|
-class KeePass:
|
|
|
|
|
- path: Path
|
|
|
|
|
- bin: str | Path
|
|
|
|
|
-
|
|
|
|
|
- def read_entry_attribute(self, key, attribute):
|
|
|
|
|
- return self._exec(["show", "-a", attribute, self.path, key]).strip()
|
|
|
|
|
-
|
|
|
|
|
- def read_entry_attachment(self, key, attachment):
|
|
|
|
|
- return self._exec(
|
|
|
|
|
- ["attachment-export", "--stdout", self.path, key, attachment, "/dev/null"]
|
|
|
|
|
- )
|
|
|
|
|
-
|
|
|
|
|
- def _exec(self, args: list[Any]):
|
|
|
|
|
- try:
|
|
|
|
|
- return subprocess.check_output([self.bin] + args, text=True)
|
|
|
|
|
- except subprocess.CalledProcessError as e:
|
|
|
|
|
- print("\nThere was an error on call to keepass, please check the outout")
|
|
|
|
|
- exit(1)
|
|
|
|
|
-
|
|
|
|
|
- @classmethod
|
|
|
|
|
- def new(cls, path: Path):
|
|
|
|
|
- binary = (
|
|
|
|
|
- Path("C:\\") / "Program Files" / "KeePassXC" / "keepassxc-cli.exe"
|
|
|
|
|
- if is_windows
|
|
|
|
|
- else "keepassxc-cli"
|
|
|
|
|
- )
|
|
|
|
|
- return cls(path=path, bin=binary)
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
-@dataclass
|
|
|
|
|
-class Secret:
|
|
|
|
|
- name: str
|
|
|
|
|
- mode: int
|
|
|
|
|
-
|
|
|
|
|
- def create(self, keepass: KeePass): ...
|
|
|
|
|
-
|
|
|
|
|
- @classmethod
|
|
|
|
|
- def from_line(cls, line: str):
|
|
|
|
|
- name, type_, *args = line.split(",")
|
|
|
|
|
- match type_:
|
|
|
|
|
- case "file":
|
|
|
|
|
- sub_class = SecretFile
|
|
|
|
|
- case "keepass-attribute":
|
|
|
|
|
- sub_class = SecretKeepassAttribute
|
|
|
|
|
- case "keepass-attachment":
|
|
|
|
|
- sub_class = SecretKeepassAttachment
|
|
|
|
|
-
|
|
|
|
|
- return sub_class.from_line(name, *args)
|
|
|
|
|
-
|
|
|
|
|
- @classmethod
|
|
|
|
|
- def read_sources(cls, file: Path) -> list["Secret"]:
|
|
|
|
|
- with open(file) as f:
|
|
|
|
|
- lines = f.readlines()
|
|
|
|
|
- return [cls.from_line(l.strip()) for l in lines]
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
-@dataclass
|
|
|
|
|
-class SecretKeepassAttachment(Secret):
|
|
|
|
|
- key: str
|
|
|
|
|
- attachment: str
|
|
|
|
|
-
|
|
|
|
|
- def create(self, keepass: KeePass):
|
|
|
|
|
- value = keepass.read_entry_attachment(self.key, self.attachment)
|
|
|
|
|
- args = ["podman", "secret", "create", "--replace", self.name, "-"]
|
|
|
|
|
- print(args)
|
|
|
|
|
- subprocess.run(args, input=value.encode())
|
|
|
|
|
-
|
|
|
|
|
- @classmethod
|
|
|
|
|
- def from_line(cls, name: str, key: str, attachment: str):
|
|
|
|
|
- return cls(name=name, key=key, mode=0o0400, attachment=attachment)
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
-@dataclass
|
|
|
|
|
-class SecretKeepassAttribute(Secret):
|
|
|
|
|
- key: str
|
|
|
|
|
- attribute: str
|
|
|
|
|
-
|
|
|
|
|
- def create(self, keepass: KeePass):
|
|
|
|
|
- value = keepass.read_entry_attribute(self.key, self.attribute)
|
|
|
|
|
- args = ["podman", "secret", "create", "--replace", self.name, "-"]
|
|
|
|
|
- print(args)
|
|
|
|
|
- subprocess.run(args, input=value.encode())
|
|
|
|
|
-
|
|
|
|
|
- @classmethod
|
|
|
|
|
- def from_line(cls, name: str, key: str, attribute: str):
|
|
|
|
|
- return cls(name=name, key=key, mode=0o0400, attribute=attribute)
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
-@dataclass
|
|
|
|
|
-class SecretFile(Secret):
|
|
|
|
|
- host_path: Path
|
|
|
|
|
-
|
|
|
|
|
- def create(self, keepass: KeePass):
|
|
|
|
|
- args = ["podman", "secret", "create", "--replace", self.name, self.host_path]
|
|
|
|
|
- print(args)
|
|
|
|
|
- subprocess.run(args)
|
|
|
|
|
-
|
|
|
|
|
- @classmethod
|
|
|
|
|
- def from_line(cls, name: str, path: str):
|
|
|
|
|
- path = Path(path).expanduser()
|
|
|
|
|
- return cls(host_path=path, name=name, mode=0o0400)
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
-@dataclass
|
|
|
|
|
-class Configuration:
|
|
|
|
|
- secret_sources: list[Secret]
|
|
|
|
|
- data_sources: list[Path]
|
|
|
|
|
- borgmatic_d_path: Path
|
|
|
|
|
- borgmatic_path: Path
|
|
|
|
|
- history_file: Path
|
|
|
|
|
- ssh_auth_sock: Path | None
|
|
|
|
|
-
|
|
|
|
|
- @staticmethod
|
|
|
|
|
- def get_config_dir() -> Path:
|
|
|
|
|
- if is_windows:
|
|
|
|
|
- program_data = Path(os.getenv("ProgramData"))
|
|
|
|
|
- return program_data / "pc_backup"
|
|
|
|
|
- else:
|
|
|
|
|
- return Path.home() / ".config" / "pc_backup"
|
|
|
|
|
-
|
|
|
|
|
- @classmethod
|
|
|
|
|
- def read(cls, hostname: str, login: str, config_dir: Path):
|
|
|
|
|
- secret_sources_file = config_dir / f"secret_sources_{hostname}_{login}"
|
|
|
|
|
- data_sources_file = config_dir / f"data_sources_{hostname}_{login}"
|
|
|
|
|
- ssh_auth_sock = os.getenv("SSH_AUTH_SOCK")
|
|
|
|
|
-
|
|
|
|
|
- return cls(
|
|
|
|
|
- secret_sources=Secret.read_sources(secret_sources_file),
|
|
|
|
|
- data_sources=read_data_sources(data_sources_file),
|
|
|
|
|
- borgmatic_d_path=config_dir / "borgmatic.d",
|
|
|
|
|
- borgmatic_path=config_dir / "borgmatic",
|
|
|
|
|
- history_file=config_dir / f".bash_history_{login}",
|
|
|
|
|
- ssh_auth_sock=Path(ssh_auth_sock) if ssh_auth_sock else None
|
|
|
|
|
- )
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
-@dataclass
|
|
|
|
|
-class BorgmaticContainer:
|
|
|
|
|
- hostname: str
|
|
|
|
|
- login: str
|
|
|
|
|
- name: str
|
|
|
|
|
- image: str = "ghcr.io/borgmatic-collective/borgmatic"
|
|
|
|
|
-
|
|
|
|
|
- def run(self, config: Configuration):
|
|
|
|
|
- container_name = f"borgmatic_{self.login}"
|
|
|
|
|
-
|
|
|
|
|
- config.history_file.touch()
|
|
|
|
|
- volumes = [
|
|
|
|
|
- f"{config.borgmatic_d_path}:/etc/borgmatic.d/",
|
|
|
|
|
- f"{config.borgmatic_path}:/etc/borgmatic/",
|
|
|
|
|
- f"{config.history_file}:/root/.bash_history",
|
|
|
|
|
- "borg_ssh_dir:/root/.ssh",
|
|
|
|
|
- "borg_config:/root/.config/borg",
|
|
|
|
|
- "borg_cache:/root/.cache/borg",
|
|
|
|
|
- "borgmatic_state:/root/.local/state/borgmatic",
|
|
|
|
|
- "borgmatic_log:/root/.local/share/borgmatic",
|
|
|
|
|
- ]
|
|
|
|
|
- if config.ssh_auth_sock:
|
|
|
|
|
- volumes += [f"{config.ssh_auth_sock}:{config.ssh_auth_sock}:Z"]
|
|
|
|
|
-
|
|
|
|
|
- volumes += [
|
|
|
|
|
- f"{vol}:{self.to_source_path(vol)}:ro" for vol in config.data_sources
|
|
|
|
|
- ]
|
|
|
|
|
-
|
|
|
|
|
- volume_args = [a for vol in volumes for a in ["-v", vol]]
|
|
|
|
|
-
|
|
|
|
|
- secrets_args = [
|
|
|
|
|
- a
|
|
|
|
|
- for s in config.secret_sources
|
|
|
|
|
- for a in ["--secret", f"{s.name},mode=0{s.mode:o}"]
|
|
|
|
|
- ]
|
|
|
|
|
-
|
|
|
|
|
- args = (
|
|
|
|
|
- [
|
|
|
|
|
- "podman",
|
|
|
|
|
- "run",
|
|
|
|
|
- "-h",
|
|
|
|
|
- self.hostname,
|
|
|
|
|
- "--detach",
|
|
|
|
|
- "--name",
|
|
|
|
|
- container_name,
|
|
|
|
|
- "-e",
|
|
|
|
|
- "SSH_AUTH_SOCK",
|
|
|
|
|
- "-e",
|
|
|
|
|
- "TZ=Europe/Paris",
|
|
|
|
|
- "-e",
|
|
|
|
|
- "SSH_KEY_NAME",
|
|
|
|
|
- "-e",
|
|
|
|
|
- f"HOST_LOGIN={self.login}",
|
|
|
|
|
- "--security-opt=label=disable",
|
|
|
|
|
- ]
|
|
|
|
|
- + volume_args
|
|
|
|
|
- + secrets_args
|
|
|
|
|
- + [self.image]
|
|
|
|
|
- )
|
|
|
|
|
- print(args)
|
|
|
|
|
- subprocess.run(args)
|
|
|
|
|
-
|
|
|
|
|
- def rm(self):
|
|
|
|
|
- subprocess.run(["podman", "rm", "-f", self.name])
|
|
|
|
|
-
|
|
|
|
|
- def exec(self, cmd: list[str], env_vars: list[str] = []):
|
|
|
|
|
- args = ["podman", "exec", "-ti"]
|
|
|
|
|
- args += [a for var in env_vars for a in ["-e", var]]
|
|
|
|
|
- subprocess.run(args + [self.name] + cmd)
|
|
|
|
|
-
|
|
|
|
|
- @staticmethod
|
|
|
|
|
- def to_source_path(path: Path):
|
|
|
|
|
- mount_base = PurePosixPath("/mnt") / "source"
|
|
|
|
|
- inner_path = PurePosixPath(path)
|
|
|
|
|
- with_drive = PurePosixPath(inner_path.parts[0].replace(":", "")).joinpath(
|
|
|
|
|
- *inner_path.parts[1:]
|
|
|
|
|
- )
|
|
|
|
|
- return mount_base / with_drive.relative_to(with_drive.anchor)
|
|
|
|
|
-
|
|
|
|
|
- @classmethod
|
|
|
|
|
- def new(cls, hostname: str, login: str):
|
|
|
|
|
- return cls(hostname, login, f"borgmatic_{login}")
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
-class CliArguments:
|
|
|
|
|
- @staticmethod
|
|
|
|
|
- def read_command(parser):
|
|
|
|
|
- args = parser.parse_args()
|
|
|
|
|
- return args.command(args)
|
|
|
|
|
-
|
|
|
|
|
- @staticmethod
|
|
|
|
|
- def new() -> argparse.ArgumentParser:
|
|
|
|
|
- parser = argparse.ArgumentParser(prog=sys.argv[0])
|
|
|
|
|
- subparsers = parser.add_subparsers()
|
|
|
|
|
-
|
|
|
|
|
- for sub in [
|
|
|
|
|
- CommandStart,
|
|
|
|
|
- CommandRm,
|
|
|
|
|
- CommandBash,
|
|
|
|
|
- CommandCreateRepo,
|
|
|
|
|
- CommandExportKey,
|
|
|
|
|
- CommandCreateSecrets,
|
|
|
|
|
- ]:
|
|
|
|
|
- p = subparsers.add_parser(sub.command, help=sub.help)
|
|
|
|
|
- sub.init_subparser(p)
|
|
|
|
|
- p.set_defaults(command=sub)
|
|
|
|
|
-
|
|
|
|
|
- return parser
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
-class Command:
|
|
|
|
|
- def __init__(self, namespace) -> None:
|
|
|
|
|
- for k, v in vars(namespace).items():
|
|
|
|
|
- if k != "type_":
|
|
|
|
|
- setattr(self, k, v)
|
|
|
|
|
-
|
|
|
|
|
- @classmethod
|
|
|
|
|
- def init_subparser(cls, p): ...
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
-class CommandStart(Command):
|
|
|
|
|
- command = "start"
|
|
|
|
|
- help = "start container"
|
|
|
|
|
-
|
|
|
|
|
- def run(
|
|
|
|
|
- self,
|
|
|
|
|
- *,
|
|
|
|
|
- container: BorgmaticContainer,
|
|
|
|
|
- config: Configuration,
|
|
|
|
|
- **kwargs,
|
|
|
|
|
- ):
|
|
|
|
|
- container.run(config)
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
-class CommandRm(Command):
|
|
|
|
|
- command = "rm"
|
|
|
|
|
- help = "remove container"
|
|
|
|
|
-
|
|
|
|
|
- def run(self, *, container: BorgmaticContainer, **kwargs):
|
|
|
|
|
- container.rm()
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
-class CommandBash(Command):
|
|
|
|
|
- command = "bash"
|
|
|
|
|
- help = "run shell in container"
|
|
|
|
|
-
|
|
|
|
|
- def run(self, *, container: BorgmaticContainer, **kwargs):
|
|
|
|
|
- container.exec(["bash"])
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
-class CommandCreateRepo(Command):
|
|
|
|
|
- command = "create_repo"
|
|
|
|
|
- help = "create repository"
|
|
|
|
|
- env_vars = ["BORG_PASSPHRASE_NAME", "STORAGE_BOX_USER", "SSH_KEY_NAME"]
|
|
|
|
|
-
|
|
|
|
|
- def run(self, *, container: BorgmaticContainer, **kwargs):
|
|
|
|
|
- container.exec(
|
|
|
|
|
- ["borgmatic", "repo-create", "--encryption", "repokey"], self.env_vars
|
|
|
|
|
- )
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
-class CommandExportKey(Command):
|
|
|
|
|
- command = "export_key"
|
|
|
|
|
- help = "export the repository key"
|
|
|
|
|
- env_vars = ["BORG_PASSPHRASE_NAME", "STORAGE_BOX_USER", "SSH_KEY_NAME"]
|
|
|
|
|
-
|
|
|
|
|
- def run(self, *, container: BorgmaticContainer, **kwargs):
|
|
|
|
|
- container.exec(["borgmatic", "export", "key"], self.env_vars)
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
-class CommandCreateSecrets(Command):
|
|
|
|
|
- command = "create_secrets"
|
|
|
|
|
- help = "create podman secrets"
|
|
|
|
|
-
|
|
|
|
|
- def run(self, *, secret_sources: list[Secret], **kwargs):
|
|
|
|
|
- keepass = KeePass.new(self.keepass_path)
|
|
|
|
|
- for s in secret_sources:
|
|
|
|
|
- s.create(keepass)
|
|
|
|
|
-
|
|
|
|
|
- @classmethod
|
|
|
|
|
- def init_subparser(cls, p):
|
|
|
|
|
- p.add_argument("keepass_path", type=Path, help="Path to the keepass")
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
-def main():
|
|
|
|
|
- login = os.getlogin()
|
|
|
|
|
- hostname = socket.gethostname()
|
|
|
|
|
-
|
|
|
|
|
- config = Configuration.read(hostname, login, Configuration.get_config_dir())
|
|
|
|
|
-
|
|
|
|
|
- if not config.secret_sources:
|
|
|
|
|
- print("no secret required ?")
|
|
|
|
|
-
|
|
|
|
|
- container = BorgmaticContainer.new(hostname, login)
|
|
|
|
|
-
|
|
|
|
|
- parser = CliArguments.new()
|
|
|
|
|
- command = CliArguments.read_command(parser)
|
|
|
|
|
- command.run(
|
|
|
|
|
- config=config,
|
|
|
|
|
- secret_sources=config.secret_sources,
|
|
|
|
|
- data_sources=config.data_sources,
|
|
|
|
|
- container=container,
|
|
|
|
|
- )
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
-if __name__ == "__main__":
|
|
|
|
|
- main()
|
|
|