| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296 |
import argparse
import getpass
import os
import socket
import subprocess
import sys
from dataclasses import dataclass
from pathlib import Path, PurePosixPath
from typing import Any
# True when the script runs on Windows (os.name is "nt").
is_windows = "nt" == os.name
def read_data_sources(hostname: str, login: str) -> list[Path]:
    """Load the list of host paths to back up for this host/login pair.

    The list is read from ``./data_sources_<hostname>_<login>`` (relative to
    the current working directory), one path per line. Surrounding whitespace
    is stripped and a leading ``~`` is expanded.

    Blank lines are ignored.

    Raises:
        OSError: if the file cannot be opened (for example, it is missing).
    """
    file = Path(f"./data_sources_{hostname}_{login}")
    with open(file) as f:
        # Skip blank lines: Path("") is Path("."), which would silently add
        # the current directory to the list of backed-up locations.
        return [Path(entry).expanduser() for line in f if (entry := line.strip())]
- @dataclass
- class KeePass:
- path: Path
- bin: str | Path
- def read_entry_attribute(self, key, attribute):
- return self._exec(["show", "-a", attribute, self.path, key]).strip()
- def read_entry_attachment(self, key, attachment):
- return self._exec(["attachment-export", "--stdout", self.path, key, attachment, "/dev/null"])
- def _exec(self, args: list[Any]):
- try:
- return subprocess.check_output([self.bin] + args, text=True)
- except subprocess.CalledProcessError as e:
- print("\nThere was an error on call to keepass, please check the outout")
- exit(1)
- @classmethod
- def new(cls, path: Path):
- binary = Path("C:\\") / "Program Files" / "KeePassXC" / "keepassxc-cli.exe" if is_windows else "keepassxc-cli"
- return cls(path=path, bin=binary)
- @dataclass
- class Secret:
- name: str
- mode: int
- def create(self, keepass: KeePass):
- ...
- @classmethod
- def from_line(cls, line: str):
- type_, *args = line.split(",")
- match type_:
- case "file":
- return SecretFile.from_line(*args)
- case "keepass-attribute":
- return SecretKeepassAttribute.from_line(*args)
- case "keepass-attachment":
- return SecretKeepassAttachment.from_line(*args)
- @classmethod
- def read_sources(cls, hostname: str, login: str) -> list["Secret"]:
- file = Path(f"./secret_sources_{hostname}_{login}")
- with open(file) as f:
- lines = f.readlines()
- return [cls.from_line(l.strip()) for l in lines]
@dataclass
class SecretKeepassAttachment(Secret):
    """Podman secret whose value is an attachment of a KeePass entry."""

    # Entry key handed to KeePass.read_entry_attachment.
    key: str
    # Name of the attachment to read from that entry.
    attachment: str

    def create(self, keepass: KeePass):
        """Read the attachment and feed it to ``podman secret create`` on stdin."""
        command = ["podman", "secret", "create", "--replace", self.name, "-"]
        payload = keepass.read_entry_attachment(self.key, self.attachment)
        print(command)
        subprocess.run(command, input=payload.encode())

    @classmethod
    def from_line(cls, key, attachment):
        """Build the secret from its two fields; the secret is named after the entry key."""
        return cls(name=key, mode=0o400, key=key, attachment=attachment)
@dataclass
class SecretKeepassAttribute(Secret):
    """Podman secret whose value is an attribute of a KeePass entry."""

    # Entry key handed to KeePass.read_entry_attribute.
    key: str
    # Name of the attribute to read from that entry.
    attribute: str

    def create(self, keepass: KeePass):
        """Read the attribute and feed it to ``podman secret create`` on stdin."""
        command = ["podman", "secret", "create", "--replace", self.name, "-"]
        payload = keepass.read_entry_attribute(self.key, self.attribute)
        print(command)
        subprocess.run(command, input=payload.encode())

    @classmethod
    def from_line(cls, key, attribute):
        """Build the secret from its two fields; the secret is named after the entry key."""
        return cls(name=key, mode=0o400, key=key, attribute=attribute)
@dataclass
class SecretFile(Secret):
    """Podman secret whose value is the content of a file on the host."""

    # File passed to ``podman secret create`` as the secret source.
    host_path: Path

    def create(self, keepass: KeePass):
        """Create the secret from ``host_path``; ``keepass`` is not used."""
        command = ["podman", "secret", "create", "--replace", self.name, self.host_path]
        print(command)
        subprocess.run(command)

    @classmethod
    def from_line(cls, path: str):
        """Build the secret from a path; ``~`` is expanded and the file name becomes the secret name."""
        host_path = Path(path).expanduser()
        return cls(name=host_path.name, mode=0o400, host_path=host_path)
def to_source_path(path: Path):
    """Map a host path to its mount point below ``/mnt/source``.

    The path's root is dropped and any ``:`` in its first component is
    removed, so ``/home/u/data`` becomes ``/mnt/source/home/u/data`` and
    ``C:\\Users\\u`` becomes ``/mnt/source/C/Users/u``. A path with no
    components maps to ``/mnt/source`` itself.

    Returns:
        PurePosixPath: the corresponding path below ``/mnt/source``.
    """
    mount_base = PurePosixPath("/mnt") / "source"
    # Go through as_posix() so Windows separators become "/" on every Python
    # version; passing a Windows path object straight to PurePosixPath keeps
    # the backslash in the drive component on older versions.
    text = path.as_posix() if hasattr(path, "as_posix") else str(path)
    parts = PurePosixPath(text).parts
    if not parts:
        return mount_base
    first, *rest = parts
    with_drive = PurePosixPath(first.replace(":", ""), *rest)
    return mount_base / with_drive.relative_to(with_drive.anchor)
- @dataclass
- class BorgmaticContainer:
- hostname: str
- login: str
- name: str
- image: str = "ghcr.io/borgmatic-collective/borgmatic"
- def run(self, data_sources: list[Path], secret_sources: list[Secret]):
- container_name = f"borgmatic_{self.login}"
- ssh_auth_sock = os.getenv("SSH_AUTH_SOCK")
- data_path = Path.cwd() / "data"
- config_d_path = data_path / "borgmatic.d"
- config_path = data_path / "borgmatic"
- history_file = data_path / ".bash_history"
- history_file.touch()
- volumes = [
- f"{config_d_path}:/etc/borgmatic.d/",
- f"{config_path}:/etc/borgmatic/",
- f"{history_file}:/root/.bash_history",
- "borg_ssh_dir:/root/.ssh",
- "borg_config:/root/.config/borg",
- "borg_cache:/root/.cache/borg",
- "borgmatic_state:/root/.local/state/borgmatic",
- ]
- if ssh_auth_sock:
- volumes += [f"{ssh_auth_sock}:{ssh_auth_sock}:Z"]
- volumes += [
- f"{vol}:{to_source_path(vol)}:ro" for vol in data_sources
- ]
- volume_args = [a for vol in volumes for a in ["-v", vol]]
- secrets_args = [a for s in secret_sources for a in ["--secret", f"{s.name},mode=0{s.mode:o}"]]
- args = [
- "podman",
- "run",
- "-h",
- self.hostname,
- "--detach",
- "--name",
- container_name,
- "-e",
- "SSH_AUTH_SOCK",
- "-e",
- "TZ=Europe/Paris",
- "-e",
- "SSH_KEY_NAME",
- "-e",
- f"HOST_LOGIN={self.login}",
- "--security-opt=label=disable"
- ] + volume_args + secrets_args + [self.image]
- print(args)
- subprocess.run(args)
- def rm(self):
- subprocess.run(["podman", "rm", "-f", self.name])
- def exec(self, cmd: list[str], env_vars: list[str] = []):
- args = ["podman", "exec", "-ti"]
- args += [a for var in env_vars for a in ["-e", var]]
- subprocess.run(args + [self.name] + cmd)
- @classmethod
- def new(cls, hostname: str, login: str):
- return cls(hostname, login, f"borgmatic_{login}")
class CliArguments:
    """Base class for sub-commands; holds the parsed command-line values.

    Subclasses provide ``command`` and ``help`` class attributes and a
    ``run`` method, and may override ``init_subparser``.
    """

    def __init__(self, namespace) -> None:
        """Copy every parsed value onto the instance, except the internal ``type_`` marker."""
        for k, v in vars(namespace).items():
            if k != "type_":
                setattr(self, k, v)

    @classmethod
    def init_subparser(cls, p):
        """Hook for adding sub-command specific arguments to parser ``p``; no-op by default."""
        ...

    @classmethod
    def new(cls) -> "CliArguments":
        """Parse ``sys.argv`` and return an instance of the selected sub-command class.

        Exits through ``parser.error`` (usage message, status 2) when no
        sub-command is given.
        """
        parser = argparse.ArgumentParser(prog=sys.argv[0])
        subparsers = parser.add_subparsers()
        for sub in [
            CliArgumentsStart,
            CliArgumentsRm,
            CliArgumentsBash,
            CliArgumentsCreateRepo,
            CliArgumentsExportKey,
            CliArgumentsCreateSecrets,
        ]:
            p = subparsers.add_parser(sub.command, help=sub.help)
            sub.init_subparser(p)
            # Remember which class handles this sub-command.
            p.set_defaults(type_=sub)
        args = parser.parse_args()
        if not hasattr(args, "type_"):
            # No sub-command on the command line: report it as a usage error
            # rather than failing on the missing attribute.
            parser.error("a command is required")
        return args.type_(args)
class CliArgumentsStart(CliArguments):
    """``start`` sub-command."""

    command = "start"
    help = "start container"

    def run(
        self,
        *,
        container: BorgmaticContainer,
        data_sources: list[Path],
        secret_sources: list[Secret],
        **kwargs,
    ):
        """Start ``container`` with the given data sources and secrets; other keywords are ignored."""
        container.run(data_sources=data_sources, secret_sources=secret_sources)
class CliArgumentsRm(CliArguments):
    """``rm`` sub-command."""

    command = "rm"
    help = "remove container"

    def run(self, *, container: BorgmaticContainer, **kwargs):
        """Remove ``container``; other keywords are ignored."""
        container.rm()
class CliArgumentsBash(CliArguments):
    """``bash`` sub-command."""

    command = "bash"
    help = "run shell in container"

    def run(self, *, container: BorgmaticContainer, **kwargs):
        """Open ``bash`` inside ``container``; other keywords are ignored."""
        shell_command = ["bash"]
        container.exec(shell_command)
class CliArgumentsCreateRepo(CliArguments):
    """``create_repo`` sub-command."""

    command = "create_repo"
    help = "create repository"
    # Environment variable names passed to ``container.exec``.
    env_vars = ["BORG_PASSPHRASE_NAME", "STORAGE_BOX_USER", "SSH_KEY_NAME"]

    def run(self, *, container: BorgmaticContainer, **kwargs):
        """Run ``borgmatic repo-create --encryption repokey`` in ``container``."""
        borgmatic_command = ["borgmatic", "repo-create", "--encryption", "repokey"]
        container.exec(borgmatic_command, self.env_vars)
class CliArgumentsExportKey(CliArguments):
    """``export_key`` sub-command."""

    command = "export_key"
    help = "export the repository key"
    # Environment variable names passed to ``container.exec``.
    env_vars = ["BORG_PASSPHRASE_NAME", "STORAGE_BOX_USER", "SSH_KEY_NAME"]

    def run(self, *, container: BorgmaticContainer, **kwargs):
        """Run ``borgmatic export key`` in ``container``."""
        # NOTE(review): borgmatic's documentation appears to spell this action
        # "key export" - confirm the argument order against the image in use.
        borgmatic_command = ["borgmatic", "export", "key"]
        container.exec(borgmatic_command, self.env_vars)
class CliArgumentsCreateSecrets(CliArguments):
    """``create_secrets`` sub-command."""

    command = "create_secrets"
    help = "create podman secrets"

    @classmethod
    def init_subparser(cls, p):
        """Add the required positional ``keepass_path`` argument."""
        p.add_argument("keepass_path", type=Path, help="Path to the keepass")

    def run(self, *, secret_sources: list[Secret], **kwargs):
        """Create every secret in ``secret_sources`` using the KeePass database given on the command line."""
        keepass = KeePass.new(self.keepass_path)
        for secret in secret_sources:
            secret.create(keepass)
def main():
    """Entry point: parse the command line, load the per-host sources and run the sub-command."""
    # Parse argv first so that --help and usage errors do not require the
    # source files to exist.
    args = CliArguments.new()

    try:
        login = os.getlogin()
    except OSError:
        # os.getlogin() needs a controlling terminal; fall back to the
        # environment / password database when there is none.
        login = getpass.getuser()
    hostname = socket.gethostname()

    secret_sources = Secret.read_sources(hostname, login)
    data_sources = read_data_sources(hostname, login)
    if not secret_sources:
        print("no secret required ?")

    container = BorgmaticContainer.new(hostname, login)
    args.run(
        secret_sources=secret_sources,
        data_sources=data_sources,
        container=container,
    )


if __name__ == "__main__":
    main()
|